mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Merge pull request #48503 from rallytime/bp-46291
Back-port #46291 to 2018.3
This commit is contained in:
commit
ee257a1f91
3 changed files with 297 additions and 167 deletions
|
@ -61,6 +61,7 @@ import salt.utils.stringutils
|
|||
import salt.utils.templates
|
||||
import salt.utils.url
|
||||
import salt.utils.user
|
||||
import salt.utils.data
|
||||
from salt.exceptions import CommandExecutionError, MinionError, SaltInvocationError, get_error_message as _get_error_message
|
||||
from salt.utils.files import HASHES, HASHES_REVMAP
|
||||
|
||||
|
@ -1718,18 +1719,19 @@ def _regex_to_static(src, regex):
|
|||
return None
|
||||
|
||||
try:
|
||||
src = re.search(regex, src, re.M)
|
||||
compiled = re.compile(regex, re.DOTALL)
|
||||
src = [line for line in src if compiled.search(line) or line.count(regex)]
|
||||
except Exception as ex:
|
||||
raise CommandExecutionError("{0}: '{1}'".format(_get_error_message(ex), regex))
|
||||
|
||||
return src and src.group().rstrip('\r') or regex
|
||||
return src and src or []
|
||||
|
||||
|
||||
def _assert_occurrence(src, probe, target, amount=1):
|
||||
def _assert_occurrence(probe, target, amount=1):
|
||||
'''
|
||||
Raise an exception, if there are different amount of specified occurrences in src.
|
||||
'''
|
||||
occ = src.count(probe)
|
||||
occ = len(probe)
|
||||
if occ > amount:
|
||||
msg = 'more than'
|
||||
elif occ < amount:
|
||||
|
@ -1745,7 +1747,7 @@ def _assert_occurrence(src, probe, target, amount=1):
|
|||
return occ
|
||||
|
||||
|
||||
def _get_line_indent(src, line, indent):
|
||||
def _set_line_indent(src, line, indent):
|
||||
'''
|
||||
Indent the line with the source line.
|
||||
'''
|
||||
|
@ -1758,7 +1760,36 @@ def _get_line_indent(src, line, indent):
|
|||
break
|
||||
idt.append(c)
|
||||
|
||||
return ''.join(idt) + line.strip()
|
||||
return ''.join(idt) + line.lstrip()
|
||||
|
||||
|
||||
def _get_eol(line):
|
||||
match = re.search('((?<!\r)\n|\r(?!\n)|\r\n)$', line)
|
||||
return match and match.group() or ''
|
||||
|
||||
|
||||
def _set_line_eol(src, line):
|
||||
'''
|
||||
Add line ending
|
||||
'''
|
||||
line_ending = _get_eol(src) or os.linesep
|
||||
return line.rstrip() + line_ending
|
||||
|
||||
|
||||
def _insert_line_before(idx, body, content, indent):
|
||||
if not idx or (idx and _starts_till(body[idx - 1], content) < 0):
|
||||
cnd = _set_line_indent(body[idx], content, indent)
|
||||
body.insert(idx, cnd)
|
||||
return body
|
||||
|
||||
|
||||
def _insert_line_after(idx, body, content, indent):
|
||||
# No duplicates or append, if "after" is the last line
|
||||
next_line = idx + 1 < len(body) and body[idx + 1] or None
|
||||
if next_line is None or _starts_till(next_line, content) < 0:
|
||||
cnd = _set_line_indent(body[idx], content, indent)
|
||||
body.insert(idx + 1, cnd)
|
||||
return body
|
||||
|
||||
|
||||
def line(path, content=None, match=None, mode=None, location=None,
|
||||
|
@ -1889,132 +1920,110 @@ def line(path, content=None, match=None, mode=None, location=None,
|
|||
match = content
|
||||
|
||||
with salt.utils.files.fopen(path, mode='r') as fp_:
|
||||
body = salt.utils.stringutils.to_unicode(fp_.read())
|
||||
body_before = hashlib.sha256(salt.utils.stringutils.to_bytes(body)).hexdigest()
|
||||
body = salt.utils.data.decode_list(fp_.readlines())
|
||||
body_before = hashlib.sha256(salt.utils.stringutils.to_bytes(''.join(body))).hexdigest()
|
||||
# Add empty line at the end if last line ends with eol.
|
||||
# Allows simpler code
|
||||
if body and _get_eol(body[-1]):
|
||||
body.append('')
|
||||
|
||||
after = _regex_to_static(body, after)
|
||||
before = _regex_to_static(body, before)
|
||||
match = _regex_to_static(body, match)
|
||||
|
||||
if os.stat(path).st_size == 0 and mode in ('delete', 'replace'):
|
||||
log.warning('Cannot find text to {0}. File \'{1}\' is empty.'.format(mode, path))
|
||||
body = ''
|
||||
elif mode == 'delete':
|
||||
body = os.linesep.join([line for line in body.split(os.linesep) if line.find(match) < 0])
|
||||
elif mode == 'replace':
|
||||
body = os.linesep.join([(_get_line_indent(file_line, content, indent)
|
||||
if (file_line.find(match) > -1 and not file_line == content) else file_line)
|
||||
for file_line in body.split(os.linesep)])
|
||||
body = []
|
||||
elif mode == 'delete' and match:
|
||||
body = [line for line in body if line != match[0]]
|
||||
elif mode == 'replace' and match:
|
||||
idx = body.index(match[0])
|
||||
file_line = body.pop(idx)
|
||||
body.insert(idx, _set_line_indent(file_line, content, indent))
|
||||
elif mode == 'insert':
|
||||
if not location and not before and not after:
|
||||
raise CommandExecutionError('On insert must be defined either "location" or "before/after" conditions.')
|
||||
|
||||
if not location:
|
||||
if before and after:
|
||||
_assert_occurrence(body, before, 'before')
|
||||
_assert_occurrence(body, after, 'after')
|
||||
_assert_occurrence(before, 'before')
|
||||
_assert_occurrence(after, 'after')
|
||||
|
||||
out = []
|
||||
lines = body.split(os.linesep)
|
||||
in_range = False
|
||||
for line in lines:
|
||||
if line.find(after) > -1:
|
||||
for line in body:
|
||||
if line == after[0]:
|
||||
in_range = True
|
||||
elif line.find(before) > -1 and in_range:
|
||||
out.append(_get_line_indent(line, content, indent))
|
||||
elif line == before[0] and in_range:
|
||||
cnd = _set_line_indent(line, content, indent)
|
||||
out.append(cnd)
|
||||
out.append(line)
|
||||
body = os.linesep.join(out)
|
||||
body = out
|
||||
|
||||
if before and not after:
|
||||
_assert_occurrence(body, before, 'before')
|
||||
out = []
|
||||
lines = body.split(os.linesep)
|
||||
for idx in range(len(lines)):
|
||||
_line = lines[idx]
|
||||
if _line.find(before) > -1:
|
||||
cnd = _get_line_indent(_line, content, indent)
|
||||
if not idx or (idx and _starts_till(lines[idx - 1], cnd) < 0): # Job for replace instead
|
||||
out.append(cnd)
|
||||
out.append(_line)
|
||||
body = os.linesep.join(out)
|
||||
_assert_occurrence(before, 'before')
|
||||
|
||||
idx = body.index(before[0])
|
||||
body = _insert_line_before(idx, body, content, indent)
|
||||
|
||||
elif after and not before:
|
||||
_assert_occurrence(body, after, 'after')
|
||||
out = []
|
||||
lines = body.split(os.linesep)
|
||||
for idx, _line in enumerate(lines):
|
||||
out.append(_line)
|
||||
cnd = _get_line_indent(_line, content, indent)
|
||||
# No duplicates or append, if "after" is the last line
|
||||
if (_line.find(after) > -1 and
|
||||
(lines[((idx + 1) < len(lines)) and idx + 1 or idx].strip() != cnd or
|
||||
idx + 1 == len(lines))):
|
||||
out.append(cnd)
|
||||
body = os.linesep.join(out)
|
||||
_assert_occurrence(after, 'after')
|
||||
|
||||
idx = body.index(after[0])
|
||||
body = _insert_line_after(idx, body, content, indent)
|
||||
|
||||
else:
|
||||
if location == 'start':
|
||||
body = os.linesep.join((content, body))
|
||||
if body:
|
||||
body.insert(0, _set_line_eol(body[0], content))
|
||||
else:
|
||||
body.append(content + os.linesep)
|
||||
elif location == 'end':
|
||||
body = os.linesep.join((body, _get_line_indent(body[-1], content, indent) if body else content))
|
||||
body.append(_set_line_indent(body[-1], content, indent) if body else content)
|
||||
|
||||
elif mode == 'ensure':
|
||||
after = after and after.strip()
|
||||
before = before and before.strip()
|
||||
|
||||
if before and after:
|
||||
_assert_occurrence(body, before, 'before')
|
||||
_assert_occurrence(body, after, 'after')
|
||||
_assert_occurrence(before, 'before')
|
||||
_assert_occurrence(after, 'after')
|
||||
|
||||
is_there = bool(body.count(content))
|
||||
is_there = bool([l for l in body if l.count(content)])
|
||||
if not is_there:
|
||||
out = []
|
||||
body = body.split(os.linesep)
|
||||
for idx, line in enumerate(body):
|
||||
out.append(line)
|
||||
if line.find(content) > -1:
|
||||
is_there = True
|
||||
if not is_there:
|
||||
if idx < (len(body) - 1) and line.find(after) > -1 and body[idx + 1].find(before) > -1:
|
||||
out.append(content)
|
||||
elif line.find(after) > -1:
|
||||
raise CommandExecutionError('Found more than one line between '
|
||||
'boundaries "before" and "after".')
|
||||
body = os.linesep.join(out)
|
||||
idx = body.index(after[0])
|
||||
if idx < (len(body) - 1) and body[idx + 1] == before[0]:
|
||||
cnd = _set_line_indent(body[idx], content, indent)
|
||||
body.insert(idx + 1, cnd)
|
||||
else:
|
||||
raise CommandExecutionError('Found more than one line between '
|
||||
'boundaries "before" and "after".')
|
||||
|
||||
elif before and not after:
|
||||
_assert_occurrence(body, before, 'before')
|
||||
body = body.split(os.linesep)
|
||||
out = []
|
||||
for idx in range(len(body)):
|
||||
if body[idx].find(before) > -1:
|
||||
prev = (idx > 0 and idx or 1) - 1
|
||||
out.append(_get_line_indent(body[idx], content, indent))
|
||||
if _starts_till(out[prev], content) > -1:
|
||||
del out[prev]
|
||||
out.append(body[idx])
|
||||
body = os.linesep.join(out)
|
||||
_assert_occurrence(before, 'before')
|
||||
|
||||
idx = body.index(before[0])
|
||||
body = _insert_line_before(idx, body, content, indent)
|
||||
|
||||
elif not before and after:
|
||||
_assert_occurrence(body, after, 'after')
|
||||
body = body.split(os.linesep)
|
||||
skip = None
|
||||
out = []
|
||||
for idx in range(len(body)):
|
||||
if skip != body[idx]:
|
||||
out.append(body[idx])
|
||||
_assert_occurrence(after, 'after')
|
||||
|
||||
if body[idx].find(after) > -1:
|
||||
next_line = idx + 1 < len(body) and body[idx + 1] or None
|
||||
if next_line is not None and _starts_till(next_line, content) > -1:
|
||||
skip = next_line
|
||||
out.append(_get_line_indent(body[idx], content, indent))
|
||||
body = os.linesep.join(out)
|
||||
idx = body.index(after[0])
|
||||
body = _insert_line_after(idx, body, content, indent)
|
||||
|
||||
else:
|
||||
raise CommandExecutionError("Wrong conditions? "
|
||||
"Unable to ensure line without knowing "
|
||||
"where to put it before and/or after.")
|
||||
|
||||
changed = body_before != hashlib.sha256(salt.utils.stringutils.to_bytes(body)).hexdigest()
|
||||
if body:
|
||||
for idx, line in enumerate(body):
|
||||
if not _get_eol(line) and idx+1 < len(body):
|
||||
prev = idx and idx-1 or 1
|
||||
body[idx] = _set_line_eol(body[prev], line)
|
||||
# We do not need empty line at the end anymore
|
||||
if '' == body[-1]:
|
||||
body.pop()
|
||||
|
||||
changed = body_before != hashlib.sha256(salt.utils.stringutils.to_bytes(''.join(body))).hexdigest()
|
||||
|
||||
if backup and changed and __opts__['test'] is False:
|
||||
try:
|
||||
|
@ -2028,12 +2037,9 @@ def line(path, content=None, match=None, mode=None, location=None,
|
|||
if changed:
|
||||
if show_changes:
|
||||
with salt.utils.files.fopen(path, 'r') as fp_:
|
||||
path_content = [salt.utils.stringutils.to_unicode(x)
|
||||
for x in fp_.read().splitlines(True)]
|
||||
path_content = salt.utils.data.decode_list(fp_.read().splitlines(True))
|
||||
changes_diff = ''.join(difflib.unified_diff(
|
||||
path_content,
|
||||
[salt.utils.stringutils.to_unicode(x)
|
||||
for x in body.splitlines(True)]
|
||||
path_content, body
|
||||
))
|
||||
if __opts__['test'] is False:
|
||||
fh_ = None
|
||||
|
@ -2041,12 +2047,12 @@ def line(path, content=None, match=None, mode=None, location=None,
|
|||
# Make sure we match the file mode from salt.utils.files.fopen
|
||||
if six.PY2 and salt.utils.platform.is_windows():
|
||||
mode = 'wb'
|
||||
body = salt.utils.stringutils.to_bytes(body)
|
||||
body = salt.utils.data.encode_list(body)
|
||||
else:
|
||||
mode = 'w'
|
||||
body = salt.utils.stringutils.to_str(body)
|
||||
body = salt.utils.data.decode_list(body, to_str=True)
|
||||
fh_ = salt.utils.atomicfile.atomic_open(path, mode)
|
||||
fh_.write(body)
|
||||
fh_.writelines(body)
|
||||
finally:
|
||||
if fh_:
|
||||
fh_.close()
|
||||
|
|
|
@ -58,8 +58,9 @@ from salt.modules.file import (check_hash, # pylint: disable=W0611
|
|||
RE_FLAG_TABLE, blockreplace, prepend, seek_read, seek_write, rename,
|
||||
lstat, path_exists_glob, write, pardir, join, HASHES, HASHES_REVMAP,
|
||||
comment, uncomment, _add_flags, comment_line, _regex_to_static,
|
||||
_get_line_indent, apply_template_on_contents, dirname, basename,
|
||||
list_backups_dir, _assert_occurrence, _starts_till)
|
||||
_set_line_indent, apply_template_on_contents, dirname, basename,
|
||||
list_backups_dir, _assert_occurrence, _starts_till, _set_line_eol, _get_eol,
|
||||
_insert_line_after, _insert_line_before)
|
||||
from salt.modules.file import normpath as normpath_
|
||||
|
||||
from salt.utils.functools import namespaced_function as _namespaced_function
|
||||
|
@ -116,8 +117,9 @@ def __virtual__():
|
|||
global blockreplace, prepend, seek_read, seek_write, rename, lstat
|
||||
global write, pardir, join, _add_flags, apply_template_on_contents
|
||||
global path_exists_glob, comment, uncomment, _mkstemp_copy
|
||||
global _regex_to_static, _get_line_indent, dirname, basename
|
||||
global _regex_to_static, _set_line_indent, dirname, basename
|
||||
global list_backups_dir, normpath_, _assert_occurrence, _starts_till
|
||||
global _insert_line_before, _insert_line_after, _set_line_eol, _get_eol
|
||||
|
||||
replace = _namespaced_function(replace, globals())
|
||||
search = _namespaced_function(search, globals())
|
||||
|
@ -173,7 +175,11 @@ def __virtual__():
|
|||
uncomment = _namespaced_function(uncomment, globals())
|
||||
comment_line = _namespaced_function(comment_line, globals())
|
||||
_regex_to_static = _namespaced_function(_regex_to_static, globals())
|
||||
_get_line_indent = _namespaced_function(_get_line_indent, globals())
|
||||
_set_line_indent = _namespaced_function(_set_line_indent, globals())
|
||||
_set_line_eol = _namespaced_function(_set_line_eol, globals())
|
||||
_get_eol = _namespaced_function(_get_eol, globals())
|
||||
_insert_line_after = _namespaced_function(_insert_line_after, globals())
|
||||
_insert_line_before = _namespaced_function(_insert_line_before, globals())
|
||||
_mkstemp_copy = _namespaced_function(_mkstemp_copy, globals())
|
||||
_add_flags = _namespaced_function(_add_flags, globals())
|
||||
apply_template_on_contents = _namespaced_function(apply_template_on_contents, globals())
|
||||
|
|
|
@ -23,6 +23,7 @@ except ImportError:
|
|||
from salt.ext import six
|
||||
import salt.config
|
||||
import salt.loader
|
||||
import salt.utils.data
|
||||
import salt.utils.files
|
||||
import salt.utils.platform
|
||||
import salt.utils.stringutils
|
||||
|
@ -921,6 +922,19 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
|
|||
}
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _get_body(content):
|
||||
'''
|
||||
The body is written as bytestrings or strings depending on platform.
|
||||
This func accepts a string of content and returns the appropriate list
|
||||
of strings back.
|
||||
'''
|
||||
ret = content.splitlines(True)
|
||||
if six.PY2 and salt.utils.platform.is_windows():
|
||||
return salt.utils.data.encode_list(ret)
|
||||
else:
|
||||
return salt.utils.data.decode_list(ret, to_str=True)
|
||||
|
||||
@patch('os.path.realpath', MagicMock())
|
||||
@patch('os.path.isfile', MagicMock(return_value=True))
|
||||
def test_delete_line_in_empty_file(self):
|
||||
|
@ -940,6 +954,29 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
|
|||
self.assertIn('Cannot find text to {0}'.format(mode),
|
||||
_log.warning.call_args_list[0][0][0])
|
||||
|
||||
@patch('os.path.realpath', MagicMock())
|
||||
@patch('os.path.isfile', MagicMock(return_value=True))
|
||||
@patch('os.stat', MagicMock())
|
||||
def test_line_delete_no_match(self):
|
||||
'''
|
||||
Tests that when calling file.line with ``mode=delete``,
|
||||
with not matching pattern to delete returns False
|
||||
:return:
|
||||
'''
|
||||
file_content = os.linesep.join([
|
||||
'file_roots:',
|
||||
' base:',
|
||||
' - /srv/salt',
|
||||
' - /srv/custom'
|
||||
])
|
||||
match = 'not matching'
|
||||
for mode in ['delete', 'replace']:
|
||||
files_fopen = mock_open(read_data=file_content)
|
||||
with patch('salt.utils.files.fopen', files_fopen):
|
||||
atomic_opener = mock_open()
|
||||
with patch('salt.utils.atomicfile.atomic_open', atomic_opener):
|
||||
self.assertFalse(filemod.line('foo', content='foo', match=match, mode=mode))
|
||||
|
||||
@patch('os.path.realpath', MagicMock())
|
||||
@patch('os.path.isfile', MagicMock(return_value=True))
|
||||
def test_line_modecheck_failure(self):
|
||||
|
@ -1026,12 +1063,13 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
|
|||
# We should only have opened the file once
|
||||
open_count = len(handles)
|
||||
assert open_count == 1, open_count
|
||||
# We should only have invoked .write() once...
|
||||
write_count = len(handles[0].write.call_args_list)
|
||||
assert write_count == 1, write_count
|
||||
# We should only have invoked .writelines() once...
|
||||
writelines_content = handles[0].writelines_calls
|
||||
writelines_count = len(writelines_content)
|
||||
assert writelines_count == 1, writelines_count
|
||||
# ... with the updated content
|
||||
write_content = handles[0].write.call_args_list[0][0][0]
|
||||
assert write_content == file_modified, write_content
|
||||
expected = self._get_body(file_modified)
|
||||
assert writelines_content[0] == expected, (writelines_content[0], expected)
|
||||
|
||||
@with_tempfile()
|
||||
def test_line_insert_after_pattern(self, name):
|
||||
|
@ -1076,12 +1114,17 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
|
|||
# We should only have opened the file once
|
||||
open_count = len(handles)
|
||||
assert open_count == 1, open_count
|
||||
# We should only have invoked .write() once...
|
||||
write_count = len(handles[0].write.call_args_list)
|
||||
assert write_count == 1, write_count
|
||||
# We should only have invoked .writelines() once...
|
||||
writelines_content = handles[0].writelines_calls
|
||||
writelines_count = len(writelines_content)
|
||||
assert writelines_count == 1, writelines_count
|
||||
# ... with the updated content
|
||||
write_content = handles[0].write.call_args_list[0][0][0]
|
||||
assert write_content == file_modified, write_content
|
||||
expected = self._get_body(file_modified)
|
||||
# We passed cfg_content with a newline in the middle, so it
|
||||
# will be written as two lines in the same element of the list
|
||||
# passed to .writelines()
|
||||
expected[3] = expected[3] + expected.pop(4)
|
||||
assert writelines_content[0] == expected, (writelines_content[0], expected)
|
||||
|
||||
@with_tempfile()
|
||||
def test_line_insert_multi_line_content_after_unicode(self, name):
|
||||
|
@ -1091,8 +1134,10 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
|
|||
See issue #48113
|
||||
:return:
|
||||
'''
|
||||
file_content = ("This is a line\nThis is another line")
|
||||
file_modified = salt.utils.stringutils.to_str("This is a line\nThis is another line\nThis is a line with unicode Ŷ")
|
||||
file_content = 'This is a line\nThis is another line'
|
||||
file_modified = salt.utils.stringutils.to_str('This is a line\n'
|
||||
'This is another line\n'
|
||||
'This is a line with unicode Ŷ')
|
||||
cfg_content = "This is a line with unicode Ŷ"
|
||||
isfile_mock = MagicMock(side_effect=lambda x: True if x == name else DEFAULT)
|
||||
for after_line in ['This is another line']:
|
||||
|
@ -1107,12 +1152,13 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
|
|||
# We should only have opened the file once
|
||||
open_count = len(handles)
|
||||
assert open_count == 1, open_count
|
||||
# We should only have invoked .write() once...
|
||||
write_count = len(handles[0].write.call_args_list)
|
||||
assert write_count == 1, write_count
|
||||
# We should only have invoked .writelines() once...
|
||||
writelines_content = handles[0].writelines_calls
|
||||
writelines_count = len(writelines_content)
|
||||
assert writelines_count == 1, writelines_count
|
||||
# ... with the updated content
|
||||
write_content = handles[0].write.call_args_list[0][0][0]
|
||||
assert write_content == file_modified, write_content
|
||||
expected = self._get_body(file_modified)
|
||||
assert writelines_content[0] == expected, (writelines_content[0], expected)
|
||||
|
||||
@with_tempfile()
|
||||
def test_line_insert_before(self, name):
|
||||
|
@ -1138,7 +1184,7 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
|
|||
cfg_content = '- /srv/custom'
|
||||
|
||||
isfile_mock = MagicMock(side_effect=lambda x: True if x == name else DEFAULT)
|
||||
for before_line in ['/srv/salt', '/srv/sa.*t', '/sr.*']:
|
||||
for before_line in ['/srv/salt', '/srv/sa.*t']:
|
||||
with patch('os.path.isfile', isfile_mock), \
|
||||
patch('os.stat', MagicMock(return_value=DummyStat())), \
|
||||
patch('salt.utils.files.fopen',
|
||||
|
@ -1150,12 +1196,39 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
|
|||
# We should only have opened the file once
|
||||
open_count = len(handles)
|
||||
assert open_count == 1, open_count
|
||||
# We should only have invoked .write() once...
|
||||
write_count = len(handles[0].write.call_args_list)
|
||||
assert write_count == 1, write_count
|
||||
# We should only have invoked .writelines() once...
|
||||
writelines_content = handles[0].writelines_calls
|
||||
writelines_count = len(writelines_content)
|
||||
assert writelines_count == 1, writelines_count
|
||||
# ... with the updated content
|
||||
write_content = handles[0].write.call_args_list[0][0][0]
|
||||
assert write_content == file_modified, write_content
|
||||
expected = self._get_body(file_modified)
|
||||
assert writelines_content[0] == expected, (writelines_content[0], expected)
|
||||
|
||||
@patch('os.path.realpath', MagicMock())
|
||||
@patch('os.path.isfile', MagicMock(return_value=True))
|
||||
@patch('os.stat', MagicMock())
|
||||
def test_line_assert_exception_pattern(self):
|
||||
'''
|
||||
Test for file.line for exception on insert with too general pattern.
|
||||
|
||||
:return:
|
||||
'''
|
||||
file_content = os.linesep.join([
|
||||
'file_roots:',
|
||||
' base:',
|
||||
' - /srv/salt',
|
||||
' - /srv/sugar'
|
||||
])
|
||||
cfg_content = '- /srv/custom'
|
||||
for before_line in ['/sr.*']:
|
||||
files_fopen = mock_open(read_data=file_content)
|
||||
with patch('salt.utils.files.fopen', files_fopen):
|
||||
atomic_opener = mock_open()
|
||||
with patch('salt.utils.atomicfile.atomic_open', atomic_opener):
|
||||
with self.assertRaises(CommandExecutionError) as cm:
|
||||
filemod.line('foo', content=cfg_content, before=before_line, mode='insert')
|
||||
self.assertEqual(cm.exception.strerror,
|
||||
'Found more than expected occurrences in "before" expression')
|
||||
|
||||
@with_tempfile()
|
||||
def test_line_insert_before_after(self, name):
|
||||
|
@ -1195,12 +1268,13 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
|
|||
# We should only have opened the file once
|
||||
open_count = len(handles)
|
||||
assert open_count == 1, open_count
|
||||
# We should only have invoked .write() once...
|
||||
write_count = len(handles[0].write.call_args_list)
|
||||
assert write_count == 1, write_count
|
||||
# We should only have invoked .writelines() once...
|
||||
writelines_content = handles[0].writelines_calls
|
||||
writelines_count = len(writelines_content)
|
||||
assert writelines_count == 1, writelines_count
|
||||
# ... with the updated content
|
||||
write_content = handles[0].write.call_args_list[0][0][0]
|
||||
assert write_content == file_modified, write_content
|
||||
expected = self._get_body(file_modified)
|
||||
assert writelines_content[0] == expected, (writelines_content[0], expected)
|
||||
|
||||
@with_tempfile()
|
||||
def test_line_insert_start(self, name):
|
||||
|
@ -1235,12 +1309,13 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
|
|||
# We should only have opened the file once
|
||||
open_count = len(handles)
|
||||
assert open_count == 1, open_count
|
||||
# We should only have invoked .write() once...
|
||||
write_count = len(handles[0].write.call_args_list)
|
||||
assert write_count == 1, write_count
|
||||
# We should only have invoked .writelines() once...
|
||||
writelines_content = handles[0].writelines_calls
|
||||
writelines_count = len(writelines_content)
|
||||
assert writelines_count == 1, writelines_count
|
||||
# ... with the updated content
|
||||
write_content = handles[0].write.call_args_list[0][0][0]
|
||||
assert write_content == file_modified, write_content
|
||||
expected = self._get_body(file_modified)
|
||||
assert writelines_content[0] == expected, (writelines_content[0], expected)
|
||||
|
||||
@with_tempfile()
|
||||
def test_line_insert_end(self, name):
|
||||
|
@ -1260,7 +1335,7 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
|
|||
' base:',
|
||||
' - /srv/salt',
|
||||
' - /srv/sugar',
|
||||
cfg_content
|
||||
' ' + cfg_content
|
||||
])
|
||||
|
||||
isfile_mock = MagicMock(side_effect=lambda x: True if x == name else DEFAULT)
|
||||
|
@ -1275,12 +1350,13 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
|
|||
# We should only have opened the file once
|
||||
open_count = len(handles)
|
||||
assert open_count == 1, open_count
|
||||
# We should only have invoked .write() once...
|
||||
write_count = len(handles[0].write.call_args_list)
|
||||
assert write_count == 1, write_count
|
||||
# We should only have invoked .writelines() once...
|
||||
writelines_content = handles[0].writelines_calls
|
||||
writelines_count = len(writelines_content)
|
||||
assert writelines_count == 1, writelines_count
|
||||
# ... with the updated content
|
||||
write_content = handles[0].write.call_args_list[0][0][0]
|
||||
assert write_content == file_modified, write_content
|
||||
expected = self._get_body(file_modified)
|
||||
assert writelines_content[0] == expected, (writelines_content[0], expected)
|
||||
|
||||
@with_tempfile()
|
||||
def test_line_insert_ensure_before(self, name):
|
||||
|
@ -1313,12 +1389,50 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
|
|||
# We should only have opened the file once
|
||||
open_count = len(handles)
|
||||
assert open_count == 1, open_count
|
||||
# We should only have invoked .write() once...
|
||||
write_count = len(handles[0].write.call_args_list)
|
||||
assert write_count == 1, write_count
|
||||
# We should only have invoked .writelines() once...
|
||||
writelines_content = handles[0].writelines_calls
|
||||
writelines_count = len(writelines_content)
|
||||
assert writelines_count == 1, writelines_count
|
||||
# ... with the updated content
|
||||
write_content = handles[0].write.call_args_list[0][0][0]
|
||||
assert write_content == file_modified, write_content
|
||||
expected = self._get_body(file_modified)
|
||||
assert writelines_content[0] == expected, (writelines_content[0], expected)
|
||||
|
||||
@with_tempfile()
|
||||
def test_line_insert_ensure_before_first_line(self, name):
|
||||
'''
|
||||
Test for file.line for insertion ensuring the line is before first line
|
||||
:return:
|
||||
'''
|
||||
cfg_content = '#!/bin/bash'
|
||||
file_content = os.linesep.join([
|
||||
'/etc/init.d/someservice restart',
|
||||
'exit 0'
|
||||
])
|
||||
file_modified = os.linesep.join([
|
||||
cfg_content,
|
||||
'/etc/init.d/someservice restart',
|
||||
'exit 0'
|
||||
])
|
||||
|
||||
isfile_mock = MagicMock(side_effect=lambda x: True if x == name else DEFAULT)
|
||||
with patch('os.path.isfile', isfile_mock), \
|
||||
patch('os.stat', MagicMock(return_value=DummyStat())), \
|
||||
patch('salt.utils.files.fopen',
|
||||
mock_open(read_data=file_content)), \
|
||||
patch('salt.utils.atomicfile.atomic_open',
|
||||
mock_open()) as atomic_open_mock:
|
||||
filemod.line(name, content=cfg_content, before='/etc/init.d/someservice restart', mode='ensure')
|
||||
handles = atomic_open_mock.filehandles[name]
|
||||
# We should only have opened the file once
|
||||
open_count = len(handles)
|
||||
assert open_count == 1, open_count
|
||||
# We should only have invoked .writelines() once...
|
||||
writelines_content = handles[0].writelines_calls
|
||||
writelines_count = len(writelines_content)
|
||||
assert writelines_count == 1, writelines_count
|
||||
# ... with the updated content
|
||||
expected = self._get_body(file_modified)
|
||||
assert writelines_content[0] == expected, (writelines_content[0], expected)
|
||||
|
||||
@with_tempfile()
|
||||
def test_line_insert_ensure_after(self, name):
|
||||
|
@ -1349,12 +1463,13 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
|
|||
# We should only have opened the file once
|
||||
open_count = len(handles)
|
||||
assert open_count == 1, open_count
|
||||
# We should only have invoked .write() once...
|
||||
write_count = len(handles[0].write.call_args_list)
|
||||
assert write_count == 1, write_count
|
||||
# We should only have invoked .writelines() once...
|
||||
writelines_content = handles[0].writelines_calls
|
||||
writelines_count = len(writelines_content)
|
||||
assert writelines_count == 1, writelines_count
|
||||
# ... with the updated content
|
||||
write_content = handles[0].write.call_args_list[0][0][0]
|
||||
assert write_content == file_modified, write_content
|
||||
expected = self._get_body(file_modified)
|
||||
assert writelines_content[0] == expected, (writelines_content[0], expected)
|
||||
|
||||
@with_tempfile()
|
||||
def test_line_insert_ensure_beforeafter_twolines(self, name):
|
||||
|
@ -1385,12 +1500,13 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
|
|||
# We should only have opened the file once
|
||||
open_count = len(handles)
|
||||
assert open_count == 1, open_count
|
||||
# We should only have invoked .write() once...
|
||||
write_count = len(handles[0].write.call_args_list)
|
||||
assert write_count == 1, write_count
|
||||
# We should only have invoked .writelines() once...
|
||||
writelines_content = handles[0].writelines_calls
|
||||
writelines_count = len(writelines_content)
|
||||
assert writelines_count == 1, writelines_count
|
||||
# ... with the updated content
|
||||
write_content = handles[0].write.call_args_list[0][0][0]
|
||||
assert write_content == file_modified, write_content
|
||||
expected = self._get_body(file_modified)
|
||||
assert writelines_content[0] == expected, (writelines_content[0], expected)
|
||||
|
||||
@with_tempfile()
|
||||
def test_line_insert_ensure_beforeafter_twolines_exists(self, name):
|
||||
|
@ -1479,12 +1595,13 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
|
|||
# We should only have opened the file once
|
||||
open_count = len(handles)
|
||||
assert open_count == 1, open_count
|
||||
# We should only have invoked .write() once...
|
||||
write_count = len(handles[0].write.call_args_list)
|
||||
assert write_count == 1, write_count
|
||||
# We should only have invoked .writelines() once...
|
||||
writelines_content = handles[0].writelines_calls
|
||||
writelines_count = len(writelines_content)
|
||||
assert writelines_count == 1, writelines_count
|
||||
# ... with the updated content
|
||||
write_content = handles[0].write.call_args_list[0][0][0]
|
||||
assert write_content == file_modified, write_content
|
||||
expected = self._get_body(file_modified)
|
||||
assert writelines_content[0] == expected, (writelines_content[0], expected)
|
||||
|
||||
@with_tempfile()
|
||||
def test_line_replace(self, name):
|
||||
|
@ -1519,12 +1636,13 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
|
|||
# We should only have opened the file once
|
||||
open_count = len(handles)
|
||||
assert open_count == 1, open_count
|
||||
# We should only have invoked .write() once...
|
||||
write_count = len(handles[0].write.call_args_list)
|
||||
assert write_count == 1, write_count
|
||||
# We should only have invoked .writelines() once...
|
||||
writelines_content = handles[0].writelines_calls
|
||||
writelines_count = len(writelines_content)
|
||||
assert writelines_count == 1, writelines_count
|
||||
# ... with the updated content
|
||||
write_content = handles[0].write.call_args_list[0][0][0]
|
||||
assert write_content == file_modified, write_content
|
||||
expected = self._get_body(file_modified)
|
||||
assert writelines_content[0] == expected, (writelines_content[0], expected)
|
||||
|
||||
|
||||
class FileBasicsTestCase(TestCase, LoaderModuleMockMixin):
|
||||
|
|
Loading…
Add table
Reference in a new issue