mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Merge pull request #42036 from twangboy/win_unit_test_file
Fix `unit.modules.test_file` for Windows
This commit is contained in:
commit
7fbbea3806
2 changed files with 257 additions and 194 deletions
|
@ -2179,14 +2179,14 @@ def replace(path,
|
|||
if not_found_content is None:
|
||||
not_found_content = repl
|
||||
if prepend_if_not_found:
|
||||
new_file.insert(0, not_found_content + b'\n')
|
||||
new_file.insert(0, not_found_content + salt.utils.to_bytes(os.linesep))
|
||||
else:
|
||||
# append_if_not_found
|
||||
# Make sure we have a newline at the end of the file
|
||||
if 0 != len(new_file):
|
||||
if not new_file[-1].endswith(b'\n'):
|
||||
new_file[-1] += b'\n'
|
||||
new_file.append(not_found_content + b'\n')
|
||||
if not new_file[-1].endswith(salt.utils.to_bytes(os.linesep)):
|
||||
new_file[-1] += salt.utils.to_bytes(os.linesep)
|
||||
new_file.append(not_found_content + salt.utils.to_bytes(os.linesep))
|
||||
has_changes = True
|
||||
if not dry_run:
|
||||
try:
|
||||
|
@ -2197,9 +2197,9 @@ def replace(path,
|
|||
raise CommandExecutionError("Exception: {0}".format(exc))
|
||||
# write new content in the file while avoiding partial reads
|
||||
try:
|
||||
fh_ = salt.utils.atomicfile.atomic_open(path, 'w')
|
||||
fh_ = salt.utils.atomicfile.atomic_open(path, 'wb')
|
||||
for line in new_file:
|
||||
fh_.write(salt.utils.to_str(line))
|
||||
fh_.write(salt.utils.to_bytes(line))
|
||||
finally:
|
||||
fh_.close()
|
||||
|
||||
|
@ -2369,9 +2369,10 @@ def blockreplace(path,
|
|||
try:
|
||||
fi_file = fileinput.input(path,
|
||||
inplace=False, backup=False,
|
||||
bufsize=1, mode='r')
|
||||
bufsize=1, mode='rb')
|
||||
for line in fi_file:
|
||||
|
||||
line = salt.utils.to_str(line)
|
||||
result = line
|
||||
|
||||
if marker_start in line:
|
||||
|
@ -2384,14 +2385,24 @@ def blockreplace(path,
|
|||
# end of block detected
|
||||
in_block = False
|
||||
|
||||
# Check for multi-line '\n' terminated content as split will
|
||||
# introduce an unwanted additional new line.
|
||||
if content and content[-1] == '\n':
|
||||
content = content[:-1]
|
||||
# Handle situations where there may be multiple types
|
||||
# of line endings in the same file. Separate the content
|
||||
# into lines. Account for Windows-style line endings
|
||||
# using os.linesep, then by linux-style line endings
|
||||
# using '\n'
|
||||
split_content = []
|
||||
for linesep_line in content.split(os.linesep):
|
||||
for content_line in linesep_line.split('\n'):
|
||||
split_content.append(content_line)
|
||||
|
||||
# Trim any trailing new lines to avoid unwanted
|
||||
# additional new lines
|
||||
while not split_content[-1]:
|
||||
split_content.pop()
|
||||
|
||||
# push new block content in file
|
||||
for cline in content.split('\n'):
|
||||
new_file.append(cline + '\n')
|
||||
for content_line in split_content:
|
||||
new_file.append(content_line + os.linesep)
|
||||
|
||||
done = True
|
||||
|
||||
|
@ -2419,25 +2430,25 @@ def blockreplace(path,
|
|||
if not done:
|
||||
if prepend_if_not_found:
|
||||
# add the markers and content at the beginning of file
|
||||
new_file.insert(0, marker_end + '\n')
|
||||
new_file.insert(0, marker_end + os.linesep)
|
||||
if append_newline is True:
|
||||
new_file.insert(0, content + '\n')
|
||||
new_file.insert(0, content + os.linesep)
|
||||
else:
|
||||
new_file.insert(0, content)
|
||||
new_file.insert(0, marker_start + '\n')
|
||||
new_file.insert(0, marker_start + os.linesep)
|
||||
done = True
|
||||
elif append_if_not_found:
|
||||
# Make sure we have a newline at the end of the file
|
||||
if 0 != len(new_file):
|
||||
if not new_file[-1].endswith('\n'):
|
||||
new_file[-1] += '\n'
|
||||
if not new_file[-1].endswith(os.linesep):
|
||||
new_file[-1] += os.linesep
|
||||
# add the markers and content at the end of file
|
||||
new_file.append(marker_start + '\n')
|
||||
new_file.append(marker_start + os.linesep)
|
||||
if append_newline is True:
|
||||
new_file.append(content + '\n')
|
||||
new_file.append(content + os.linesep)
|
||||
else:
|
||||
new_file.append(content)
|
||||
new_file.append(marker_end + '\n')
|
||||
new_file.append(marker_end + os.linesep)
|
||||
done = True
|
||||
else:
|
||||
raise CommandExecutionError(
|
||||
|
@ -2468,9 +2479,9 @@ def blockreplace(path,
|
|||
|
||||
# write new content in the file while avoiding partial reads
|
||||
try:
|
||||
fh_ = salt.utils.atomicfile.atomic_open(path, 'w')
|
||||
fh_ = salt.utils.atomicfile.atomic_open(path, 'wb')
|
||||
for line in new_file:
|
||||
fh_.write(line)
|
||||
fh_.write(salt.utils.to_bytes(line))
|
||||
finally:
|
||||
fh_.close()
|
||||
|
||||
|
@ -3609,6 +3620,14 @@ def source_list(source, source_hash, saltenv):
|
|||
single_src = next(iter(single))
|
||||
single_hash = single[single_src] if single[single_src] else source_hash
|
||||
urlparsed_single_src = _urlparse(single_src)
|
||||
# Fix this for Windows
|
||||
if salt.utils.is_windows():
|
||||
# urlparse doesn't handle a local Windows path without the
|
||||
# protocol indicator (file://). The scheme will be the
|
||||
# drive letter instead of the protocol. So, we'll add the
|
||||
# protocol and re-parse
|
||||
if urlparsed_single_src.scheme.lower() in string.ascii_lowercase:
|
||||
urlparsed_single_src = _urlparse('file://' + single_src)
|
||||
proto = urlparsed_single_src.scheme
|
||||
if proto == 'salt':
|
||||
path, senv = salt.utils.url.parse(single_src)
|
||||
|
@ -3620,10 +3639,15 @@ def source_list(source, source_hash, saltenv):
|
|||
elif proto.startswith('http') or proto == 'ftp':
|
||||
ret = (single_src, single_hash)
|
||||
break
|
||||
elif proto == 'file' and os.path.exists(urlparsed_single_src.path):
|
||||
elif proto == 'file' and (
|
||||
os.path.exists(urlparsed_single_src.netloc) or
|
||||
os.path.exists(urlparsed_single_src.path) or
|
||||
os.path.exists(os.path.join(
|
||||
urlparsed_single_src.netloc,
|
||||
urlparsed_single_src.path))):
|
||||
ret = (single_src, single_hash)
|
||||
break
|
||||
elif single_src.startswith('/') and os.path.exists(single_src):
|
||||
elif single_src.startswith(os.sep) and os.path.exists(single_src):
|
||||
ret = (single_src, single_hash)
|
||||
break
|
||||
elif isinstance(single, six.string_types):
|
||||
|
@ -3634,14 +3658,26 @@ def source_list(source, source_hash, saltenv):
|
|||
ret = (single, source_hash)
|
||||
break
|
||||
urlparsed_src = _urlparse(single)
|
||||
if salt.utils.is_windows():
|
||||
# urlparse doesn't handle a local Windows path without the
|
||||
# protocol indicator (file://). The scheme will be the
|
||||
# drive letter instead of the protocol. So, we'll add the
|
||||
# protocol and re-parse
|
||||
if urlparsed_src.scheme.lower() in string.ascii_lowercase:
|
||||
urlparsed_src = _urlparse('file://' + single)
|
||||
proto = urlparsed_src.scheme
|
||||
if proto == 'file' and os.path.exists(urlparsed_src.path):
|
||||
if proto == 'file' and (
|
||||
os.path.exists(urlparsed_src.netloc) or
|
||||
os.path.exists(urlparsed_src.path) or
|
||||
os.path.exists(os.path.join(
|
||||
urlparsed_src.netloc,
|
||||
urlparsed_src.path))):
|
||||
ret = (single, source_hash)
|
||||
break
|
||||
elif proto.startswith('http') or proto == 'ftp':
|
||||
ret = (single, source_hash)
|
||||
break
|
||||
elif single.startswith('/') and os.path.exists(single):
|
||||
elif single.startswith(os.sep) and os.path.exists(single):
|
||||
ret = (single, source_hash)
|
||||
break
|
||||
if ret is None:
|
||||
|
|
|
@ -10,7 +10,7 @@ import textwrap
|
|||
# Import Salt Testing libs
|
||||
from tests.support.mixins import LoaderModuleMockMixin
|
||||
from tests.support.paths import TMP
|
||||
from tests.support.unit import TestCase
|
||||
from tests.support.unit import TestCase, skipIf
|
||||
from tests.support.mock import MagicMock, patch
|
||||
|
||||
# Import Salt libs
|
||||
|
@ -89,45 +89,56 @@ class FileReplaceTestCase(TestCase, LoaderModuleMockMixin):
|
|||
'repl': 'baz=\\g<value>',
|
||||
'append_if_not_found': True,
|
||||
}
|
||||
base = 'foo=1\nbar=2'
|
||||
expected = '{base}\n{repl}\n'.format(base=base, **args)
|
||||
base = os.linesep.join(['foo=1', 'bar=2'])
|
||||
|
||||
# File ending with a newline, no match
|
||||
with tempfile.NamedTemporaryFile(mode='w+') as tfile:
|
||||
tfile.write(base + '\n')
|
||||
with tempfile.NamedTemporaryFile('w+b', delete=False) as tfile:
|
||||
tfile.write(salt.utils.to_bytes(base + os.linesep))
|
||||
tfile.flush()
|
||||
filemod.replace(tfile.name, **args)
|
||||
with salt.utils.fopen(tfile.name) as tfile2:
|
||||
self.assertEqual(tfile2.read(), expected)
|
||||
filemod.replace(tfile.name, **args)
|
||||
expected = os.linesep.join([base, 'baz=\\g<value>']) + os.linesep
|
||||
with salt.utils.fopen(tfile.name) as tfile2:
|
||||
self.assertEqual(tfile2.read(), expected)
|
||||
os.remove(tfile.name)
|
||||
|
||||
# File not ending with a newline, no match
|
||||
with tempfile.NamedTemporaryFile('w+') as tfile:
|
||||
tfile.write(base)
|
||||
with tempfile.NamedTemporaryFile('w+b', delete=False) as tfile:
|
||||
tfile.write(salt.utils.to_bytes(base))
|
||||
tfile.flush()
|
||||
filemod.replace(tfile.name, **args)
|
||||
with salt.utils.fopen(tfile.name) as tfile2:
|
||||
self.assertEqual(tfile2.read(), expected)
|
||||
filemod.replace(tfile.name, **args)
|
||||
with salt.utils.fopen(tfile.name) as tfile2:
|
||||
self.assertEqual(tfile2.read(), expected)
|
||||
os.remove(tfile.name)
|
||||
|
||||
# A newline should not be added in empty files
|
||||
with tempfile.NamedTemporaryFile('w+') as tfile:
|
||||
filemod.replace(tfile.name, **args)
|
||||
with salt.utils.fopen(tfile.name) as tfile2:
|
||||
self.assertEqual(tfile2.read(), args['repl'] + '\n')
|
||||
with tempfile.NamedTemporaryFile('w+b', delete=False) as tfile:
|
||||
pass
|
||||
filemod.replace(tfile.name, **args)
|
||||
expected = args['repl'] + os.linesep
|
||||
with salt.utils.fopen(tfile.name) as tfile2:
|
||||
self.assertEqual(tfile2.read(), expected)
|
||||
os.remove(tfile.name)
|
||||
|
||||
# Using not_found_content, rather than repl
|
||||
with tempfile.NamedTemporaryFile('w+') as tfile:
|
||||
args['not_found_content'] = 'baz=3'
|
||||
expected = '{base}\n{not_found_content}\n'.format(base=base, **args)
|
||||
tfile.write(base)
|
||||
with tempfile.NamedTemporaryFile('w+b', delete=False) as tfile:
|
||||
tfile.write(salt.utils.to_bytes(base))
|
||||
tfile.flush()
|
||||
filemod.replace(tfile.name, **args)
|
||||
with salt.utils.fopen(tfile.name) as tfile2:
|
||||
self.assertEqual(tfile2.read(), expected)
|
||||
args['not_found_content'] = 'baz=3'
|
||||
expected = os.linesep.join([base, 'baz=3']) + os.linesep
|
||||
filemod.replace(tfile.name, **args)
|
||||
with salt.utils.fopen(tfile.name) as tfile2:
|
||||
self.assertEqual(tfile2.read(), expected)
|
||||
os.remove(tfile.name)
|
||||
|
||||
# not appending if matches
|
||||
with tempfile.NamedTemporaryFile('w+') as tfile:
|
||||
base = 'foo=1\n#baz=42\nbar=2\n'
|
||||
expected = 'foo=1\nbaz=42\nbar=2\n'
|
||||
tfile.write(base)
|
||||
with tempfile.NamedTemporaryFile('w+b', delete=False) as tfile:
|
||||
base = os.linesep.join(['foo=1', 'baz=42', 'bar=2'])
|
||||
tfile.write(salt.utils.to_bytes(base))
|
||||
tfile.flush()
|
||||
filemod.replace(tfile.name, **args)
|
||||
with salt.utils.fopen(tfile.name) as tfile2:
|
||||
self.assertEqual(tfile2.read(), expected)
|
||||
expected = base
|
||||
filemod.replace(tfile.name, **args)
|
||||
with salt.utils.fopen(tfile.name) as tfile2:
|
||||
self.assertEqual(tfile2.read(), expected)
|
||||
|
||||
def test_backup(self):
|
||||
fext = '.bak'
|
||||
|
@ -246,25 +257,26 @@ class FileBlockReplaceTestCase(TestCase, LoaderModuleMockMixin):
|
|||
del self.tfile
|
||||
|
||||
def test_replace_multiline(self):
|
||||
new_multiline_content = (
|
||||
"Who's that then?\nWell, how'd you become king,"
|
||||
"then?\nWe found them. I'm not a witch.\nWe shall"
|
||||
"say 'Ni' again to you, if you do not appease us."
|
||||
)
|
||||
new_multiline_content = os.linesep.join([
|
||||
"Who's that then?",
|
||||
"Well, how'd you become king, then?",
|
||||
"We found them. I'm not a witch.",
|
||||
"We shall say 'Ni' again to you, if you do not appease us."
|
||||
])
|
||||
filemod.blockreplace(self.tfile.name,
|
||||
'#-- START BLOCK 1',
|
||||
'#-- END BLOCK 1',
|
||||
new_multiline_content,
|
||||
backup=False)
|
||||
|
||||
with salt.utils.fopen(self.tfile.name, 'r') as fp:
|
||||
with salt.utils.fopen(self.tfile.name, 'rb') as fp:
|
||||
filecontent = fp.read()
|
||||
self.assertIn('#-- START BLOCK 1'
|
||||
+ "\n" + new_multiline_content
|
||||
+ "\n"
|
||||
+ '#-- END BLOCK 1', filecontent)
|
||||
self.assertNotIn('old content part 1', filecontent)
|
||||
self.assertNotIn('old content part 2', filecontent)
|
||||
self.assertIn(salt.utils.to_bytes(
|
||||
os.linesep.join([
|
||||
'#-- START BLOCK 1', new_multiline_content, '#-- END BLOCK 1'])),
|
||||
filecontent)
|
||||
self.assertNotIn(b'old content part 1', filecontent)
|
||||
self.assertNotIn(b'old content part 2', filecontent)
|
||||
|
||||
def test_replace_append(self):
|
||||
new_content = "Well, I didn't vote for you."
|
||||
|
@ -291,10 +303,12 @@ class FileBlockReplaceTestCase(TestCase, LoaderModuleMockMixin):
|
|||
backup=False,
|
||||
append_if_not_found=True)
|
||||
|
||||
with salt.utils.fopen(self.tfile.name, 'r') as fp:
|
||||
self.assertIn('#-- START BLOCK 2'
|
||||
+ "\n" + new_content
|
||||
+ '#-- END BLOCK 2', fp.read())
|
||||
with salt.utils.fopen(self.tfile.name, 'rb') as fp:
|
||||
self.assertIn(salt.utils.to_bytes(
|
||||
os.linesep.join([
|
||||
'#-- START BLOCK 2',
|
||||
'{0}#-- END BLOCK 2'.format(new_content)])),
|
||||
fp.read())
|
||||
|
||||
def test_replace_append_newline_at_eof(self):
|
||||
'''
|
||||
|
@ -308,27 +322,33 @@ class FileBlockReplaceTestCase(TestCase, LoaderModuleMockMixin):
|
|||
'content': 'baz',
|
||||
'append_if_not_found': True,
|
||||
}
|
||||
block = '{marker_start}\n{content}{marker_end}\n'.format(**args)
|
||||
expected = base + '\n' + block
|
||||
block = os.linesep.join(['#start', 'baz#stop']) + os.linesep
|
||||
# File ending with a newline
|
||||
with tempfile.NamedTemporaryFile(mode='w+') as tfile:
|
||||
tfile.write(base + '\n')
|
||||
with tempfile.NamedTemporaryFile(mode='w+b', delete=False) as tfile:
|
||||
tfile.write(salt.utils.to_bytes(base + os.linesep))
|
||||
tfile.flush()
|
||||
filemod.blockreplace(tfile.name, **args)
|
||||
with salt.utils.fopen(tfile.name) as tfile2:
|
||||
self.assertEqual(tfile2.read(), expected)
|
||||
filemod.blockreplace(tfile.name, **args)
|
||||
expected = os.linesep.join([base, block])
|
||||
with salt.utils.fopen(tfile.name) as tfile2:
|
||||
self.assertEqual(tfile2.read(), expected)
|
||||
os.remove(tfile.name)
|
||||
|
||||
# File not ending with a newline
|
||||
with tempfile.NamedTemporaryFile(mode='w+') as tfile:
|
||||
tfile.write(base)
|
||||
with tempfile.NamedTemporaryFile(mode='w+b', delete=False) as tfile:
|
||||
tfile.write(salt.utils.to_bytes(base))
|
||||
tfile.flush()
|
||||
filemod.blockreplace(tfile.name, **args)
|
||||
with salt.utils.fopen(tfile.name) as tfile2:
|
||||
self.assertEqual(tfile2.read(), expected)
|
||||
filemod.blockreplace(tfile.name, **args)
|
||||
with salt.utils.fopen(tfile.name) as tfile2:
|
||||
self.assertEqual(tfile2.read(), expected)
|
||||
os.remove(tfile.name)
|
||||
|
||||
# A newline should not be added in empty files
|
||||
with tempfile.NamedTemporaryFile(mode='w+') as tfile:
|
||||
filemod.blockreplace(tfile.name, **args)
|
||||
with salt.utils.fopen(tfile.name) as tfile2:
|
||||
self.assertEqual(tfile2.read(), block)
|
||||
with tempfile.NamedTemporaryFile(mode='w+b', delete=False) as tfile:
|
||||
pass
|
||||
filemod.blockreplace(tfile.name, **args)
|
||||
with salt.utils.fopen(tfile.name) as tfile2:
|
||||
self.assertEqual(tfile2.read(), block)
|
||||
os.remove(tfile.name)
|
||||
|
||||
def test_replace_prepend(self):
|
||||
new_content = "Well, I didn't vote for you."
|
||||
|
@ -343,10 +363,11 @@ class FileBlockReplaceTestCase(TestCase, LoaderModuleMockMixin):
|
|||
prepend_if_not_found=False,
|
||||
backup=False
|
||||
)
|
||||
with salt.utils.fopen(self.tfile.name, 'r') as fp:
|
||||
self.assertNotIn(
|
||||
'#-- START BLOCK 2' + "\n"
|
||||
+ new_content + '#-- END BLOCK 2',
|
||||
with salt.utils.fopen(self.tfile.name, 'rb') as fp:
|
||||
self.assertNotIn(salt.utils.to_bytes(
|
||||
os.linesep.join([
|
||||
'#-- START BLOCK 2',
|
||||
'{0}#-- END BLOCK 2'.format(new_content)])),
|
||||
fp.read())
|
||||
|
||||
filemod.blockreplace(self.tfile.name,
|
||||
|
@ -355,12 +376,12 @@ class FileBlockReplaceTestCase(TestCase, LoaderModuleMockMixin):
|
|||
backup=False,
|
||||
prepend_if_not_found=True)
|
||||
|
||||
with salt.utils.fopen(self.tfile.name, 'r') as fp:
|
||||
with salt.utils.fopen(self.tfile.name, 'rb') as fp:
|
||||
self.assertTrue(
|
||||
fp.read().startswith(
|
||||
'#-- START BLOCK 2'
|
||||
+ "\n" + new_content
|
||||
+ '#-- END BLOCK 2'))
|
||||
fp.read().startswith(salt.utils.to_bytes(
|
||||
os.linesep.join([
|
||||
'#-- START BLOCK 2',
|
||||
'{0}#-- END BLOCK 2'.format(new_content)]))))
|
||||
|
||||
def test_replace_partial_marked_lines(self):
|
||||
filemod.blockreplace(self.tfile.name,
|
||||
|
@ -477,6 +498,7 @@ class FileModuleTestCase(TestCase, LoaderModuleMockMixin):
|
|||
}
|
||||
}
|
||||
|
||||
@skipIf(salt.utils.is_windows(), 'SED is not available on Windows')
|
||||
def test_sed_limit_escaped(self):
|
||||
with tempfile.NamedTemporaryFile(mode='w+') as tfile:
|
||||
tfile.write(SED_CONTENT)
|
||||
|
@ -501,127 +523,131 @@ class FileModuleTestCase(TestCase, LoaderModuleMockMixin):
|
|||
newlines at end of file.
|
||||
'''
|
||||
# File ending with a newline
|
||||
with tempfile.NamedTemporaryFile(mode='w+') as tfile:
|
||||
tfile.write('foo\n')
|
||||
with tempfile.NamedTemporaryFile(mode='w+b', delete=False) as tfile:
|
||||
tfile.write(salt.utils.to_bytes('foo' + os.linesep))
|
||||
tfile.flush()
|
||||
filemod.append(tfile.name, 'bar')
|
||||
with salt.utils.fopen(tfile.name) as tfile2:
|
||||
self.assertEqual(tfile2.read(), 'foo\nbar\n')
|
||||
filemod.append(tfile.name, 'bar')
|
||||
expected = os.linesep.join(['foo', 'bar']) + os.linesep
|
||||
with salt.utils.fopen(tfile.name) as tfile2:
|
||||
self.assertEqual(tfile2.read(), expected)
|
||||
|
||||
# File not ending with a newline
|
||||
with tempfile.NamedTemporaryFile(mode='w+') as tfile:
|
||||
tfile.write('foo')
|
||||
with tempfile.NamedTemporaryFile(mode='w+b', delete=False) as tfile:
|
||||
tfile.write(salt.utils.to_bytes('foo'))
|
||||
tfile.flush()
|
||||
filemod.append(tfile.name, 'bar')
|
||||
with salt.utils.fopen(tfile.name) as tfile2:
|
||||
self.assertEqual(tfile2.read(), expected)
|
||||
|
||||
# A newline should be added in empty files
|
||||
with tempfile.NamedTemporaryFile(mode='w+b', delete=False) as tfile:
|
||||
filemod.append(tfile.name, 'bar')
|
||||
with salt.utils.fopen(tfile.name) as tfile2:
|
||||
self.assertEqual(tfile2.read(), 'foo\nbar\n')
|
||||
# A newline should not be added in empty files
|
||||
with tempfile.NamedTemporaryFile(mode='w+') as tfile:
|
||||
filemod.append(tfile.name, 'bar')
|
||||
with salt.utils.fopen(tfile.name) as tfile2:
|
||||
self.assertEqual(tfile2.read(), 'bar\n')
|
||||
with salt.utils.fopen(tfile.name) as tfile2:
|
||||
self.assertEqual(tfile2.read(), 'bar' + os.linesep)
|
||||
|
||||
def test_extract_hash(self):
|
||||
'''
|
||||
Check various hash file formats.
|
||||
'''
|
||||
# With file name
|
||||
with tempfile.NamedTemporaryFile(mode='w+') as tfile:
|
||||
tfile.write(
|
||||
with tempfile.NamedTemporaryFile(mode='w+b', delete=False) as tfile:
|
||||
tfile.write(salt.utils.to_bytes(
|
||||
'rc.conf ef6e82e4006dee563d98ada2a2a80a27\n'
|
||||
'ead48423703509d37c4a90e6a0d53e143b6fc268 example.tar.gz\n'
|
||||
'fe05bcdcdc4928012781a5f1a2a77cbb5398e106 ./subdir/example.tar.gz\n'
|
||||
'ad782ecdac770fc6eb9a62e44f90873fb97fb26b foo.tar.bz2\n'
|
||||
)
|
||||
))
|
||||
tfile.flush()
|
||||
|
||||
result = filemod.extract_hash(tfile.name, '', '/rc.conf')
|
||||
self.assertEqual(result, {
|
||||
'hsum': 'ef6e82e4006dee563d98ada2a2a80a27',
|
||||
'hash_type': 'md5'
|
||||
})
|
||||
result = filemod.extract_hash(tfile.name, '', '/rc.conf')
|
||||
self.assertEqual(result, {
|
||||
'hsum': 'ef6e82e4006dee563d98ada2a2a80a27',
|
||||
'hash_type': 'md5'
|
||||
})
|
||||
|
||||
result = filemod.extract_hash(tfile.name, '', '/example.tar.gz')
|
||||
self.assertEqual(result, {
|
||||
result = filemod.extract_hash(tfile.name, '', '/example.tar.gz')
|
||||
self.assertEqual(result, {
|
||||
'hsum': 'ead48423703509d37c4a90e6a0d53e143b6fc268',
|
||||
'hash_type': 'sha1'
|
||||
})
|
||||
|
||||
# All the checksums in this test file are sha1 sums. We run this
|
||||
# loop three times. The first pass tests auto-detection of hash
|
||||
# type by length of the hash. The second tests matching a specific
|
||||
# type. The third tests a failed attempt to match a specific type,
|
||||
# since sha256 was requested but sha1 is what is in the file.
|
||||
for hash_type in ('', 'sha1', 'sha256'):
|
||||
# Test the source_hash_name argument. Even though there are
|
||||
# matches in the source_hash file for both the file_name and
|
||||
# source params, they should be ignored in favor of the
|
||||
# source_hash_name.
|
||||
file_name = '/example.tar.gz'
|
||||
source = 'https://mydomain.tld/foo.tar.bz2?key1=val1&key2=val2'
|
||||
source_hash_name = './subdir/example.tar.gz'
|
||||
result = filemod.extract_hash(
|
||||
tfile.name,
|
||||
hash_type,
|
||||
file_name,
|
||||
source,
|
||||
source_hash_name)
|
||||
expected = {
|
||||
'hsum': 'fe05bcdcdc4928012781a5f1a2a77cbb5398e106',
|
||||
'hash_type': 'sha1'
|
||||
} if hash_type != 'sha256' else None
|
||||
self.assertEqual(result, expected)
|
||||
|
||||
# Test both a file_name and source but no source_hash_name.
|
||||
# Even though there are matches for both file_name and
|
||||
# source_hash_name, file_name should be preferred.
|
||||
file_name = '/example.tar.gz'
|
||||
source = 'https://mydomain.tld/foo.tar.bz2?key1=val1&key2=val2'
|
||||
source_hash_name = None
|
||||
result = filemod.extract_hash(
|
||||
tfile.name,
|
||||
hash_type,
|
||||
file_name,
|
||||
source,
|
||||
source_hash_name)
|
||||
expected = {
|
||||
'hsum': 'ead48423703509d37c4a90e6a0d53e143b6fc268',
|
||||
'hash_type': 'sha1'
|
||||
})
|
||||
} if hash_type != 'sha256' else None
|
||||
self.assertEqual(result, expected)
|
||||
|
||||
# All the checksums in this test file are sha1 sums. We run this
|
||||
# loop three times. The first pass tests auto-detection of hash
|
||||
# type by length of the hash. The second tests matching a specific
|
||||
# type. The third tests a failed attempt to match a specific type,
|
||||
# since sha256 was requested but sha1 is what is in the file.
|
||||
for hash_type in ('', 'sha1', 'sha256'):
|
||||
# Test the source_hash_name argument. Even though there are
|
||||
# matches in the source_hash file for both the file_name and
|
||||
# source params, they should be ignored in favor of the
|
||||
# source_hash_name.
|
||||
file_name = '/example.tar.gz'
|
||||
source = 'https://mydomain.tld/foo.tar.bz2?key1=val1&key2=val2'
|
||||
source_hash_name = './subdir/example.tar.gz'
|
||||
result = filemod.extract_hash(
|
||||
tfile.name,
|
||||
hash_type,
|
||||
file_name,
|
||||
source,
|
||||
source_hash_name)
|
||||
expected = {
|
||||
'hsum': 'fe05bcdcdc4928012781a5f1a2a77cbb5398e106',
|
||||
'hash_type': 'sha1'
|
||||
} if hash_type != 'sha256' else None
|
||||
self.assertEqual(result, expected)
|
||||
|
||||
# Test both a file_name and source but no source_hash_name.
|
||||
# Even though there are matches for both file_name and
|
||||
# source_hash_name, file_name should be preferred.
|
||||
file_name = '/example.tar.gz'
|
||||
source = 'https://mydomain.tld/foo.tar.bz2?key1=val1&key2=val2'
|
||||
source_hash_name = None
|
||||
result = filemod.extract_hash(
|
||||
tfile.name,
|
||||
hash_type,
|
||||
file_name,
|
||||
source,
|
||||
source_hash_name)
|
||||
expected = {
|
||||
'hsum': 'ead48423703509d37c4a90e6a0d53e143b6fc268',
|
||||
'hash_type': 'sha1'
|
||||
} if hash_type != 'sha256' else None
|
||||
self.assertEqual(result, expected)
|
||||
|
||||
# Test both a file_name and source but no source_hash_name.
|
||||
# Since there is no match for the file_name, the source is
|
||||
# matched.
|
||||
file_name = '/somefile.tar.gz'
|
||||
source = 'https://mydomain.tld/foo.tar.bz2?key1=val1&key2=val2'
|
||||
source_hash_name = None
|
||||
result = filemod.extract_hash(
|
||||
tfile.name,
|
||||
hash_type,
|
||||
file_name,
|
||||
source,
|
||||
source_hash_name)
|
||||
expected = {
|
||||
'hsum': 'ad782ecdac770fc6eb9a62e44f90873fb97fb26b',
|
||||
'hash_type': 'sha1'
|
||||
} if hash_type != 'sha256' else None
|
||||
self.assertEqual(result, expected)
|
||||
# Test both a file_name and source but no source_hash_name.
|
||||
# Since there is no match for the file_name, the source is
|
||||
# matched.
|
||||
file_name = '/somefile.tar.gz'
|
||||
source = 'https://mydomain.tld/foo.tar.bz2?key1=val1&key2=val2'
|
||||
source_hash_name = None
|
||||
result = filemod.extract_hash(
|
||||
tfile.name,
|
||||
hash_type,
|
||||
file_name,
|
||||
source,
|
||||
source_hash_name)
|
||||
expected = {
|
||||
'hsum': 'ad782ecdac770fc6eb9a62e44f90873fb97fb26b',
|
||||
'hash_type': 'sha1'
|
||||
} if hash_type != 'sha256' else None
|
||||
self.assertEqual(result, expected)
|
||||
|
||||
# Hash only, no file name (Maven repo checksum format)
|
||||
# Since there is no name match, the first checksum in the file will
|
||||
# always be returned, never the second.
|
||||
with tempfile.NamedTemporaryFile(mode='w+') as tfile:
|
||||
tfile.write('ead48423703509d37c4a90e6a0d53e143b6fc268\n'
|
||||
'ad782ecdac770fc6eb9a62e44f90873fb97fb26b\n')
|
||||
with tempfile.NamedTemporaryFile(mode='w+b', delete=False) as tfile:
|
||||
tfile.write(salt.utils.to_bytes(
|
||||
'ead48423703509d37c4a90e6a0d53e143b6fc268\n'
|
||||
'ad782ecdac770fc6eb9a62e44f90873fb97fb26b\n'))
|
||||
tfile.flush()
|
||||
|
||||
for hash_type in ('', 'sha1', 'sha256'):
|
||||
result = filemod.extract_hash(tfile.name, hash_type, '/testfile')
|
||||
expected = {
|
||||
'hsum': 'ead48423703509d37c4a90e6a0d53e143b6fc268',
|
||||
'hash_type': 'sha1'
|
||||
} if hash_type != 'sha256' else None
|
||||
self.assertEqual(result, expected)
|
||||
for hash_type in ('', 'sha1', 'sha256'):
|
||||
result = filemod.extract_hash(tfile.name, hash_type, '/testfile')
|
||||
expected = {
|
||||
'hsum': 'ead48423703509d37c4a90e6a0d53e143b6fc268',
|
||||
'hash_type': 'sha1'
|
||||
} if hash_type != 'sha256' else None
|
||||
self.assertEqual(result, expected)
|
||||
|
||||
def test_user_to_uid_int(self):
|
||||
'''
|
||||
|
@ -774,6 +800,7 @@ class FileBasicsTestCase(TestCase, LoaderModuleMockMixin):
|
|||
self.addCleanup(os.remove, self.myfile)
|
||||
self.addCleanup(delattr, self, 'myfile')
|
||||
|
||||
@skipIf(salt.utils.is_windows(), 'os.symlink is not available on Windows')
|
||||
def test_symlink_already_in_desired_state(self):
|
||||
os.symlink(self.tfile.name, self.directory + '/a_link')
|
||||
self.addCleanup(os.remove, self.directory + '/a_link')
|
||||
|
|
Loading…
Add table
Reference in a new issue