Fix windows file recurse with clean true (#60415)

* fix windows file recurse with clean true

* fixs logic to handle windows case separately

* update logic

* pre commit

* update with changes

* remove param and let the helper function handle everything

* ooops forgot this

* ad changelog file

* let these changes also reflect for darwin systems

* pre commit

* remove check and have every system double check for files to keep

* new changes

* pre commit

* more changes

* support py35 with str over Path object

* remove tests

* more pytest changes

* remove not needed test

* pre commit

* add test

* pre commit and lint

* change name of utility function and update docs

Co-authored-by: Joe Eacott <jeacott@vmware.com>
This commit is contained in:
Joe Eacott 2021-08-18 22:27:13 -06:00 committed by GitHub
parent 6417cd7f83
commit 09efe7206e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 212 additions and 135 deletions

1
changelog/47969.fixed Normal file
View file

@ -0,0 +1 @@
fix issue that allows case sensitive files to be carried through

View file

@ -664,6 +664,12 @@ def _clean_dir(root, keep, exclude_pat):
Clean out all of the files and directories in a directory (root) while
preserving the files in a list (keep) and part of exclude_pat
"""
case_keep = None
if salt.utils.files.case_insensitive_filesystem():
# Create a case-sensitive dict before doing comparisons
# if file system is case sensitive
case_keep = keep
root = os.path.normcase(root)
real_keep = _find_keep_files(root, keep)
removed = set()
@ -676,6 +682,14 @@ def _clean_dir(root, keep, exclude_pat):
os.path.relpath(nfn, root), None, exclude_pat
):
return
# Before we can accurately assess the removal of a file, we must
# check for systems with case sensitive files. If we originally
# meant to keep a file, but due to case sensitivity python would
# otherwise remove the file, check against the original list.
if case_keep:
for item in case_keep:
if item.casefold() == nfn.casefold():
return
removed.add(nfn)
if not __opts__["test"]:
try:

View file

@ -785,6 +785,20 @@ def backup_minion(path, bkroot):
os.chmod(bkpath, fstat.st_mode)
def case_insensitive_filesystem(path=None):
"""
Detect case insensitivity on a system.
Returns:
bool: Flag to indicate case insensitivity
.. versionadded:: 3004
"""
with tempfile.NamedTemporaryFile(prefix="TmP", dir=path, delete=True) as tmp_file:
return os.path.exists(tmp_file.name.lower())
def get_encoding(path):
"""
Detect a file's encoding using the following:

View file

@ -0,0 +1,144 @@
"""
Unit Tests for functions located in salt/utils/files.py
"""
import copy
import os
import pytest
import salt.utils.files
from tests.support.mock import patch
def test_safe_rm():
with patch("os.remove") as os_remove_mock:
salt.utils.files.safe_rm("dummy_tgt")
assert os_remove_mock.called is True
def test_safe_rm_exceptions(tmp_path):
assert (
salt.utils.files.safe_rm(str(tmp_path / "no_way_this_is_a_file_nope.sh"))
is None
)
def test_safe_walk_symlink_recursion(tmp_path):
if tmp_path.stat().st_ino == 0:
pytest.xfail(reason="inodes not supported in {}".format(tmp_path))
tmp_path = str(tmp_path)
os.mkdir(os.path.join(tmp_path, "fax"))
os.makedirs(os.path.join(tmp_path, "foo", "bar"))
os.symlink(os.path.join("..", ".."), os.path.join(tmp_path, "foo", "bar", "baz"))
os.symlink("foo", os.path.join(tmp_path, "root"))
expected = [
(os.path.join(tmp_path, "root"), ["bar"], []),
(os.path.join(tmp_path, "root", "bar"), ["baz"], []),
(os.path.join(tmp_path, "root", "bar", "baz"), ["fax", "foo", "root"], []),
(os.path.join(tmp_path, "root", "bar", "baz", "fax"), [], []),
]
paths = []
for root, dirs, names in salt.utils.files.safe_walk(os.path.join(tmp_path, "root")):
paths.append((root, sorted(dirs), names))
assert paths == expected
def test_fopen_with_disallowed_fds():
"""
This is safe to have as a unit test since we aren't going to actually
try to read or write. We want to ensure that we are raising a
TypeError. Python 3's open() builtin will treat the booleans as file
descriptor numbers and try to open stdin/stdout. We also want to test
fd 2 which is stderr.
"""
for invalid_fn in (False, True, 0, 1, 2):
try:
with salt.utils.files.fopen(invalid_fn):
pass
except TypeError:
# This is expected. We aren't using an assertRaises here
# because we want to ensure that if we did somehow open the
# filehandle, that it doesn't remain open.
pass
else:
# We probably won't even get this far if we actually opened
# stdin/stdout as a file descriptor. It is likely to cause the
# integration suite to die since, news flash, closing
# stdin/stdout/stderr is usually not a wise thing to do in the
# middle of a program's execution.
pytest.fail(
"fopen() should have been prevented from opening a file "
"using {} as the filename".format(invalid_fn)
)
def _create_temp_structure(temp_directory, structure):
for folder, files in structure.items():
current_directory = os.path.join(temp_directory, folder)
os.makedirs(current_directory)
for name, content in files.items():
path = os.path.join(temp_directory, folder, name)
with salt.utils.files.fopen(path, "w+") as fh:
fh.write(content)
def _validate_folder_structure_and_contents(target_directory, desired_structure):
for folder, files in desired_structure.items():
for name, content in files.items():
path = os.path.join(target_directory, folder, name)
with salt.utils.files.fopen(path) as fh:
assert fh.read().strip() == content
def test_recursive_copy(tmp_path):
src = str(tmp_path / "src")
dest = str(tmp_path / "dest")
src_structure = {
"foo": {"foofile.txt": "fooSTRUCTURE"},
"bar": {"barfile.txt": "barSTRUCTURE"},
}
dest_structure = {
"foo": {"foo.txt": "fooTARGET_STRUCTURE"},
"baz": {"baz.txt": "bazTARGET_STRUCTURE"},
}
# Create the file structures in both src and dest dirs
_create_temp_structure(src, src_structure)
_create_temp_structure(dest, dest_structure)
# Perform the recursive copy
salt.utils.files.recursive_copy(src, dest)
# Confirm results match expected results
desired_structure = copy.copy(dest_structure)
desired_structure.update(src_structure)
_validate_folder_structure_and_contents(dest, desired_structure)
@pytest.mark.skip_unless_on_windows
def test_case_sensitive_filesystem_win():
"""
Test case insensitivity on Windows.
"""
result = salt.utils.files.case_insensitive_filesystem()
assert result is True
@pytest.mark.skip_unless_on_linux
def test_case_sensitive_filesystem_lin():
"""
Test case insensitivity on Linux.
"""
result = salt.utils.files.case_insensitive_filesystem()
assert result is False
@pytest.mark.skip_unless_on_darwin
def test_case_sensitive_filesystem_dar():
"""
Test case insensitivity on Darwin.
"""
result = salt.utils.files.case_insensitive_filesystem()
assert result is True

View file

@ -2895,7 +2895,7 @@ class TestFindKeepFiles(TestCase):
def test__find_keep_files_win32(self):
"""
Test _find_keep_files. The `_find_keep_files` function is only called by
_clean_dir, so case doesn't matter. Should return all lower case.
_clean_dir.
"""
keep = filestate._find_keep_files(
"c:\\test\\parent_folder",
@ -2914,6 +2914,44 @@ class TestFindKeepFiles(TestCase):
actual = sorted(list(keep))
self.assertListEqual(actual, expected)
@pytest.mark.skip_unless_on_windows
def test__clean_dir_win32(self):
"""
Test _clean_dir to ensure that regardless of case, we keep all files
requested and do not delete any. Therefore, the expected list should
be empty for this test.
"""
keep = filestate._clean_dir(
"c:\\test\\parent_folder",
[
"C:\\test\\parent_folder\\meh-1.txt",
"C:\\Test\\Parent_folder\\Meh-2.txt",
],
exclude_pat=None,
)
actual = sorted(list(keep))
expected = []
self.assertListEqual(actual, expected)
@pytest.mark.skip_unless_on_darwin
def test__find_keep_files_darwin(self):
"""
Test _clean_dir to ensure that regardless of case, we keep all files
requested and do not delete any. Therefore, the expected list should
be empty for this test.
"""
keep = filestate._clean_dir(
"/test/parent_folder",
[
"/test/folder/parent_folder/meh-1.txt",
"/Test/folder/Parent_Folder/Meh-2.txt",
],
exclude_pat=None,
)
actual = sorted(list(keep))
expected = []
self.assertListEqual(actual, expected)
class TestFileTidied(TestCase):
def setUp(self):

View file

@ -1,134 +0,0 @@
"""
Unit Tests for functions located in salt/utils/files.py
"""
import copy
import os
import salt.utils.files
from tests.support.helpers import with_tempdir
from tests.support.mock import patch
from tests.support.unit import TestCase, skipIf
class FilesTestCase(TestCase):
"""
Test case for files util.
"""
def test_safe_rm(self):
with patch("os.remove") as os_remove_mock:
salt.utils.files.safe_rm("dummy_tgt")
self.assertTrue(os_remove_mock.called)
@skipIf(
os.path.exists("/tmp/no_way_this_is_a_file_nope.sh"),
"Test file exists! Skipping safe_rm_exceptions test!",
)
def test_safe_rm_exceptions(self):
error = False
try:
salt.utils.files.safe_rm("/tmp/no_way_this_is_a_file_nope.sh")
except OSError:
error = True
self.assertFalse(
error, "salt.utils.files.safe_rm raised exception when it should not have"
)
@with_tempdir()
def test_safe_walk_symlink_recursion(self, tmp):
if os.stat(tmp).st_ino == 0:
self.skipTest("inodes not supported in {}".format(tmp))
os.mkdir(os.path.join(tmp, "fax"))
os.makedirs(os.path.join(tmp, "foo", "bar"))
os.symlink(os.path.join("..", ".."), os.path.join(tmp, "foo", "bar", "baz"))
os.symlink("foo", os.path.join(tmp, "root"))
expected = [
(os.path.join(tmp, "root"), ["bar"], []),
(os.path.join(tmp, "root", "bar"), ["baz"], []),
(os.path.join(tmp, "root", "bar", "baz"), ["fax", "foo", "root"], []),
(os.path.join(tmp, "root", "bar", "baz", "fax"), [], []),
]
paths = []
for root, dirs, names in salt.utils.files.safe_walk(os.path.join(tmp, "root")):
paths.append((root, sorted(dirs), names))
if paths != expected:
raise AssertionError(
"\n".join(
["got:"]
+ [repr(p) for p in paths]
+ ["", "expected:"]
+ [repr(p) for p in expected]
)
)
def test_fopen_with_disallowed_fds(self):
"""
This is safe to have as a unit test since we aren't going to actually
try to read or write. We want to ensure that we are raising a
TypeError. Python 3's open() builtin will treat the booleans as file
descriptor numbers and try to open stdin/stdout. We also want to test
fd 2 which is stderr.
"""
for invalid_fn in (False, True, 0, 1, 2):
try:
with salt.utils.files.fopen(invalid_fn):
pass
except TypeError:
# This is expected. We aren't using an assertRaises here
# because we want to ensure that if we did somehow open the
# filehandle, that it doesn't remain open.
pass
else:
# We probably won't even get this far if we actually opened
# stdin/stdout as a file descriptor. It is likely to cause the
# integration suite to die since, news flash, closing
# stdin/stdout/stderr is usually not a wise thing to do in the
# middle of a program's execution.
self.fail(
"fopen() should have been prevented from opening a file "
"using {} as the filename".format(invalid_fn)
)
def _create_temp_structure(self, temp_directory, structure):
for folder, files in structure.items():
current_directory = os.path.join(temp_directory, folder)
os.makedirs(current_directory)
for name, content in files.items():
path = os.path.join(temp_directory, folder, name)
with salt.utils.files.fopen(path, "w+") as fh:
fh.write(content)
def _validate_folder_structure_and_contents(
self, target_directory, desired_structure
):
for folder, files in desired_structure.items():
for name, content in files.items():
path = os.path.join(target_directory, folder, name)
with salt.utils.files.fopen(path) as fh:
assert fh.read().strip() == content
@with_tempdir()
@with_tempdir()
def test_recursive_copy(self, src, dest):
src_structure = {
"foo": {"foofile.txt": "fooSTRUCTURE"},
"bar": {"barfile.txt": "barSTRUCTURE"},
}
dest_structure = {
"foo": {"foo.txt": "fooTARGET_STRUCTURE"},
"baz": {"baz.txt": "bazTARGET_STRUCTURE"},
}
# Create the file structures in both src and dest dirs
self._create_temp_structure(src, src_structure)
self._create_temp_structure(dest, dest_structure)
# Perform the recursive copy
salt.utils.files.recursive_copy(src, dest)
# Confirm results match expected results
desired_structure = copy.copy(dest_structure)
desired_structure.update(src_structure)
self._validate_folder_structure_and_contents(dest, desired_structure)