Add file pruned state (#62179)

* add file.rmdir tests for original function

* update file.rmdir exec module for recursive operation and verbose output

* fixes saltstack/salt#62178 add file.rmdir state

* add older_than capability to file.rmdir exec and state modules

* change test to use direct import of file module

* Revert "change test to use direct import of file module"

This reverts commit ff8c666e3b.

* revert previous test modification and add import to win_file

* rename file.rmdir state to file.pruned
This commit is contained in:
Nicholas Hughes 2022-08-03 09:16:22 -04:00 committed by GitHub
parent 254fc48b1f
commit a210dbc25f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 484 additions and 6 deletions

1
changelog/62178.added Normal file
View file

@ -0,0 +1 @@
Add file.pruned state and expanded file.rmdir exec module functionality

View file

@ -4128,18 +4128,43 @@ def stats(path, hash_type=None, follow_symlinks=True):
return ret
def rmdir(path):
def rmdir(path, recurse=False, verbose=False, older_than=None):
"""
.. versionadded:: 2014.1.0
.. versionchanged:: 3006.0
Changed return value for failure to a boolean.
Remove the specified directory. Fails if a directory is not empty.
recurse
When ``recurse`` is set to ``True``, all empty directories
within the path are pruned.
.. versionadded:: 3006.0
verbose
When ``verbose`` is set to ``True``, a dictionary is returned
which contains more information about the removal process.
.. versionadded:: 3006.0
older_than
When ``older_than`` is set to a number, it is used to determine the
**number of days** which must have passed since the last modification
timestamp before a directory will be allowed to be removed. Setting
the value to 0 is equivalent to leaving it at the default of ``None``.
.. versionadded:: 3006.0
CLI Example:
.. code-block:: bash
salt '*' file.rmdir /tmp/foo/
"""
ret = False
deleted = []
errors = []
path = os.path.expanduser(path)
if not os.path.isabs(path):
@ -4148,11 +4173,46 @@ def rmdir(path):
if not os.path.isdir(path):
raise SaltInvocationError("A valid directory was not specified.")
try:
os.rmdir(path)
return True
except OSError as exc:
return exc.strerror
if older_than:
now = time.time()
try:
older_than = now - (int(older_than) * 86400)
log.debug("Now (%s) looking for directories older than %s", now, older_than)
except (TypeError, ValueError) as exc:
older_than = 0
log.error("Unable to set 'older_than'. Defaulting to 0 days. (%s)", exc)
if recurse:
for root, dirs, _ in os.walk(path, topdown=False):
for subdir in dirs:
subdir_path = os.path.join(root, subdir)
if (
older_than and os.path.getmtime(subdir_path) < older_than
) or not older_than:
try:
log.debug("Removing '%s'", subdir_path)
os.rmdir(subdir_path)
deleted.append(subdir_path)
except OSError as exc:
errors.append([subdir_path, str(exc)])
log.error("Could not remove '%s': %s", subdir_path, exc)
ret = not errors
if (older_than and os.path.getmtime(path) < older_than) or not older_than:
try:
log.debug("Removing '%s'", path)
os.rmdir(path)
deleted.append(path)
ret = True if ret or not recurse else False
except OSError as exc:
ret = False
errors.append([path, str(exc)])
log.error("Could not remove '%s': %s", path, exc)
if verbose:
return {"deleted": deleted, "errors": errors, "result": ret}
else:
return ret
def remove(path):

View file

@ -9050,3 +9050,77 @@ def mod_beacon(name, **kwargs):
),
"result": False,
}
def pruned(name, recurse=False, ignore_errors=False, older_than=None):
"""
.. versionadded:: 3006.0
Ensure that the named directory is absent. If it exists and is empty, it
will be deleted. An entire directory tree can be pruned of empty
directories as well, by using the ``recurse`` option.
name
The directory which should be deleted if empty.
recurse
If set to ``True``, this option will recursive deletion of empty
directories. This is useful if nested paths are all empty, and would
be the only items preventing removal of the named root directory.
ignore_errors
If set to ``True``, any errors encountered while attempting to delete a
directory are ignored. This **AUTOMATICALLY ENABLES** the ``recurse``
option since it's not terribly useful to ignore errors on the removal of
a single directory. Useful for pruning only the empty directories in a
tree which contains non-empty directories as well.
older_than
When ``older_than`` is set to a number, it is used to determine the
**number of days** which must have passed since the last modification
timestamp before a directory will be allowed to be removed. Setting
the value to 0 is equivalent to leaving it at the default of ``None``.
"""
name = os.path.expanduser(name)
ret = {"name": name, "changes": {}, "comment": "", "result": True}
if ignore_errors:
recurse = True
if os.path.isdir(name):
if __opts__["test"]:
ret["result"] = None
ret["changes"]["deleted"] = name
ret["comment"] = "Directory {} is set for removal".format(name)
return ret
res = __salt__["file.rmdir"](
name, recurse=recurse, verbose=True, older_than=older_than
)
result = res.pop("result")
if result:
if recurse and res["deleted"]:
ret[
"comment"
] = "Recursively removed empty directories under {}".format(name)
ret["changes"]["deleted"] = sorted(res["deleted"])
elif not recurse:
ret["comment"] = "Removed directory {}".format(name)
ret["changes"]["deleted"] = name
return ret
elif ignore_errors and res["deleted"]:
ret["comment"] = "Recursively removed empty directories under {}".format(
name
)
ret["changes"]["deleted"] = sorted(res["deleted"])
return ret
ret["result"] = result
ret["changes"] = res
ret["comment"] = "Failed to remove directory {}".format(name)
return ret
ret["comment"] = "Directory {} is not present".format(name)
return ret

View file

@ -0,0 +1,151 @@
import os
import time
import pytest
pytestmark = [
pytest.mark.windows_whitelisted,
]
@pytest.fixture(scope="module")
def file(modules):
return modules.file
@pytest.fixture(scope="function")
def single_empty_dir(tmp_path):
yield str(tmp_path)
@pytest.fixture(scope="function")
def single_dir_with_file(tmp_path):
file = tmp_path / "stuff.txt"
file.write_text("things")
yield str(tmp_path)
@pytest.fixture(scope="function")
def nested_empty_dirs(tmp_path):
num_root = 2
num_mid = 4
num_last = 2
for root in range(1, num_root + 1):
for mid in range(1, num_mid + 1):
for last in range(1, num_last + 1):
nest = (
tmp_path
/ "root{}".format(root)
/ "mid{}".format(mid)
/ "last{}".format(last)
)
nest.mkdir(parents=True, exist_ok=True)
if last % 2:
now = time.time()
old = now - (2 * 86400)
os.utime(str(nest), (old, old))
yield str(tmp_path)
@pytest.fixture(scope="function")
def nested_dirs_with_files(tmp_path):
num_root = 2
num_mid = 4
num_last = 2
for root in range(1, num_root + 1):
for mid in range(1, num_mid + 1):
for last in range(1, num_last + 1):
nest = (
tmp_path
/ "root{}".format(root)
/ "mid{}".format(mid)
/ "last{}".format(last)
)
nest.mkdir(parents=True, exist_ok=True)
if last % 2:
last_file = nest / "stuff.txt"
last_file.write_text("things")
yield str(tmp_path)
def test_rmdir_success_with_default_options(file, single_empty_dir):
assert file.rmdir(single_empty_dir) is True
assert not os.path.isdir(single_empty_dir)
assert not os.path.exists(single_empty_dir)
def test_rmdir_failure_with_default_options(file, single_dir_with_file):
assert file.rmdir(single_dir_with_file) is False
assert os.path.isdir(single_dir_with_file)
def test_rmdir_single_dir_success_with_recurse(file, single_empty_dir):
assert file.rmdir(single_empty_dir, recurse=True) is True
assert not os.path.isdir(single_empty_dir)
assert not os.path.exists(single_empty_dir)
def test_rmdir_single_dir_failure_with_recurse(file, single_dir_with_file):
assert file.rmdir(single_dir_with_file, recurse=True) is False
assert os.path.isdir(single_dir_with_file)
def test_rmdir_nested_empty_dirs_failure_with_default_options(file, nested_empty_dirs):
assert file.rmdir(nested_empty_dirs) is False
assert os.path.isdir(nested_empty_dirs)
def test_rmdir_nested_empty_dirs_success_with_recurse(file, nested_empty_dirs):
assert file.rmdir(nested_empty_dirs, recurse=True) is True
assert not os.path.isdir(nested_empty_dirs)
assert not os.path.exists(nested_empty_dirs)
def test_rmdir_nested_dirs_with_files_failure_with_recurse(
file, nested_dirs_with_files
):
assert file.rmdir(nested_dirs_with_files, recurse=True) is False
assert os.path.isdir(nested_dirs_with_files)
def test_rmdir_verbose_nested_dirs_with_files_failure_with_recurse(
file, nested_dirs_with_files
):
ret = file.rmdir(nested_dirs_with_files, recurse=True, verbose=True)
assert ret["result"] is False
assert len(ret["deleted"]) == 8
assert len(ret["errors"]) == 19
assert os.path.isdir(nested_dirs_with_files)
def test_rmdir_verbose_success(file, single_empty_dir):
ret = file.rmdir(single_empty_dir, verbose=True)
assert ret["result"] is True
assert ret["deleted"][0] == single_empty_dir
assert not ret["errors"]
assert not os.path.isdir(single_empty_dir)
assert not os.path.exists(single_empty_dir)
def test_rmdir_verbose_failure(file, single_dir_with_file):
ret = file.rmdir(single_dir_with_file, verbose=True)
assert ret["result"] is False
assert not ret["deleted"]
assert ret["errors"][0][0] == single_dir_with_file
assert os.path.isdir(single_dir_with_file)
def test_rmdir_nested_empty_dirs_recurse_older_than(file, nested_empty_dirs):
ret = file.rmdir(nested_empty_dirs, recurse=True, verbose=True, older_than=1)
assert ret["result"] is True
assert len(ret["deleted"]) == 8
assert len(ret["errors"]) == 0
assert os.path.isdir(nested_empty_dirs)
def test_rmdir_nested_empty_dirs_recurse_not_older_than(file, nested_empty_dirs):
ret = file.rmdir(nested_empty_dirs, recurse=True, verbose=True, older_than=3)
assert ret["result"] is True
assert len(ret["deleted"]) == 0
assert len(ret["errors"]) == 0
assert os.path.isdir(nested_empty_dirs)

View file

@ -0,0 +1,82 @@
import pytest
pytestmark = [
pytest.mark.windows_whitelisted,
]
@pytest.fixture(scope="module")
def file(states):
return states.file
@pytest.fixture(scope="function")
def single_dir_with_file(tmp_path):
file = tmp_path / "stuff.txt"
file.write_text("things")
yield str(tmp_path)
@pytest.fixture(scope="function")
def nested_empty_dirs(tmp_path):
num_root = 2
num_mid = 4
num_last = 2
for root in range(1, num_root + 1):
for mid in range(1, num_mid + 1):
for last in range(1, num_last + 1):
nest = (
tmp_path
/ "root{}".format(root)
/ "mid{}".format(mid)
/ "last{}".format(last)
)
nest.mkdir(parents=True, exist_ok=True)
yield str(tmp_path)
@pytest.fixture(scope="function")
def nested_dirs_with_files(tmp_path):
num_root = 2
num_mid = 4
num_last = 2
for root in range(1, num_root + 1):
for mid in range(1, num_mid + 1):
for last in range(1, num_last + 1):
nest = (
tmp_path
/ "root{}".format(root)
/ "mid{}".format(mid)
/ "last{}".format(last)
)
nest.mkdir(parents=True, exist_ok=True)
if last % 2:
last_file = nest / "stuff.txt"
last_file.write_text("things")
yield str(tmp_path)
def test_pruned_failure(file, single_dir_with_file):
ret = file.pruned(name=single_dir_with_file)
assert ret.result is False
assert not ret.changes["deleted"]
assert len(ret.changes["errors"]) == 1
assert ret.comment == "Failed to remove directory {}".format(single_dir_with_file)
def test_pruned_success_recurse_and_deleted(file, nested_empty_dirs):
ret = file.pruned(name=nested_empty_dirs, recurse=True)
assert ret.result is True
assert len(ret.changes["deleted"]) == 27
assert ret.comment == "Recursively removed empty directories under {}".format(
nested_empty_dirs
)
def test_pruned_success_ignore_errors_and_deleted(file, nested_dirs_with_files):
ret = file.pruned(name=nested_dirs_with_files, ignore_errors=True)
assert ret.result is True
assert len(ret.changes["deleted"]) == 8
assert ret.comment == "Recursively removed empty directories under {}".format(
nested_dirs_with_files
)

View file

@ -0,0 +1,50 @@
import logging
import pytest
import salt.modules.file as filemod
from salt.exceptions import SaltInvocationError
from tests.support.mock import MagicMock, patch
log = logging.getLogger(__name__)
@pytest.fixture
def configure_loader_modules():
return {
filemod: {
"__salt__": {},
"__opts__": {
"test": False,
"file_roots": {"base": "tmp"},
"pillar_roots": {"base": "tmp"},
"cachedir": "tmp",
"grains": {},
},
"__grains__": {},
"__utils__": {},
}
}
def test_file_rmdir_not_absolute_path_exception():
with pytest.raises(SaltInvocationError):
filemod.rmdir("not_absolute")
def test_file_rmdir_not_found_exception():
with pytest.raises(SaltInvocationError):
filemod.rmdir("/tmp/not_there")
def test_file_rmdir_success_return():
with patch("os.rmdir", MagicMock(return_value=True)), patch(
"os.path.isdir", MagicMock(return_value=True)
):
assert filemod.rmdir("/tmp/salt_test_return") is True
def test_file_rmdir_failure_return():
with patch(
"os.rmdir", MagicMock(side_effect=OSError(39, "Directory not empty"))
), patch("os.path.isdir", MagicMock(return_value=True)):
assert filemod.rmdir("/tmp/salt_test_return") is False

View file

@ -0,0 +1,60 @@
import logging
import os
import pytest
import salt.states.file as filestate
import salt.utils.platform
from tests.support.mock import MagicMock, patch
log = logging.getLogger(__name__)
@pytest.fixture
def configure_loader_modules():
return {filestate: {"__salt__": {}, "__opts__": {}}}
@pytest.fixture
def directory_name():
name = os.sep + "test"
if salt.utils.platform.is_windows():
name = "c:" + name
return name
def test_pruned_clean(directory_name):
with patch("os.path.isdir", return_value=False):
ret = filestate.pruned(name=directory_name)
assert ret == {
"changes": {},
"comment": "Directory {} is not present".format(directory_name),
"name": directory_name,
"result": True,
}
def test_pruned_test(directory_name):
with patch("os.path.isdir", return_value=True), patch.dict(
filestate.__opts__, {"test": True}
):
ret = filestate.pruned(name=directory_name)
assert ret == {
"changes": {"deleted": directory_name},
"comment": "Directory {} is set for removal".format(directory_name),
"name": directory_name,
"result": None,
}
def test_pruned_success(directory_name):
rmdir = MagicMock(return_value={"result": True})
with patch("os.path.isdir", return_value=True), patch.dict(
filestate.__opts__, {"test": False}
), patch.dict(filestate.__salt__, {"file.rmdir": rmdir}):
ret = filestate.pruned(name=directory_name)
assert ret == {
"changes": {"deleted": directory_name},
"comment": "Removed directory {}".format(directory_name),
"name": directory_name,
"result": True,
}