Merge pull request #48166 from terminalmage/salt-jenkins-1000

Add trace logging and optimize file handling in grains.core.os_data
This commit is contained in:
Nicole Thomas 2018-06-21 14:02:32 -04:00 committed by GitHub
commit 51928ff050
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 1615 additions and 753 deletions

View file

@ -1371,6 +1371,18 @@ _OS_FAMILY_MAP = {
'AIX': 'AIX'
}
# Matches any possible format:
# DISTRIB_ID="Ubuntu"
# DISTRIB_ID='Mageia'
# DISTRIB_ID=Fedora
# DISTRIB_RELEASE='10.10'
# DISTRIB_CODENAME='squeeze'
# DISTRIB_DESCRIPTION='Ubuntu 10.10'
_LSB_REGEX = re.compile((
'^(DISTRIB_(?:ID|RELEASE|CODENAME|DESCRIPTION))=(?:\'|")?'
'([\\w\\s\\.\\-_]+)(?:\'|")?'
))
def _linux_bin_exists(binary):
'''
@ -1403,32 +1415,49 @@ def _get_interfaces():
return _INTERFACES
def _parse_os_release(os_release_files):
def _parse_lsb_release():
ret = {}
try:
log.trace('Attempting to parse /etc/lsb-release')
with salt.utils.files.fopen('/etc/lsb-release') as ifile:
for line in ifile:
try:
key, value = _LSB_REGEX.match(line.rstrip('\n')).groups()[:2]
except AttributeError:
pass
else:
# Adds lsb_distrib_{id,release,codename,description}
ret['lsb_{0}'.format(key.lower())] = value.rstrip()
except (IOError, OSError) as exc:
log.trace('Failed to parse /etc/lsb-release: %s', exc)
return ret
def _parse_os_release(*os_release_files):
'''
Parse os-release and return a parameter dictionary
See http://www.freedesktop.org/software/systemd/man/os-release.html
for specification of the file format.
'''
data = dict()
ret = {}
for filename in os_release_files:
if os.path.isfile(filename):
try:
with salt.utils.files.fopen(filename) as ifile:
regex = re.compile('^([\\w]+)=(?:\'|")?(.*?)(?:\'|")?$')
for line in ifile:
match = regex.match(line.strip())
if match:
# Shell special characters ("$", quotes, backslash,
# backtick) are escaped with backslashes
ret[match.group(1)] = re.sub(
r'\\([$"\'\\`])', r'\1', match.group(2)
)
break
else:
# None of the specified os-release files exist
return data
except (IOError, OSError):
pass
with salt.utils.files.fopen(filename) as ifile:
regex = re.compile('^([\\w]+)=(?:\'|")?(.*?)(?:\'|")?$')
for line in ifile:
match = regex.match(line.strip())
if match:
# Shell special characters ("$", quotes, backslash, backtick)
# are escaped with backslashes
data[match.group(1)] = re.sub(r'\\([$"\'\\`])', r'\1', match.group(2))
return data
return ret
def os_data():
@ -1491,6 +1520,7 @@ def os_data():
elif salt.utils.platform.is_linux():
# Add SELinux grain, if you have it
if _linux_bin_exists('selinuxenabled'):
log.trace('Adding selinux grains')
grains['selinux'] = {}
grains['selinux']['enabled'] = __salt__['cmd.retcode'](
'selinuxenabled'
@ -1502,6 +1532,7 @@ def os_data():
# Add systemd grain, if you have it
if _linux_bin_exists('systemctl') and _linux_bin_exists('localectl'):
log.trace('Adding systemd grains')
grains['systemd'] = {}
systemd_info = __salt__['cmd.run'](
'systemctl --version'
@ -1511,68 +1542,72 @@ def os_data():
# Add init grain
grains['init'] = 'unknown'
log.trace('Adding init grain')
try:
os.stat('/run/systemd/system')
grains['init'] = 'systemd'
except (OSError, IOError):
if os.path.exists('/proc/1/cmdline'):
try:
with salt.utils.files.fopen('/proc/1/cmdline') as fhr:
init_cmdline = fhr.read().replace('\x00', ' ').split()
except (IOError, OSError):
pass
else:
try:
init_bin = salt.utils.path.which(init_cmdline[0])
except IndexError:
# Emtpy init_cmdline
init_bin = None
log.warning('Unable to fetch data from /proc/1/cmdline')
if init_bin is not None and init_bin.endswith('bin/init'):
supported_inits = (b'upstart', b'sysvinit', b'systemd')
edge_len = max(len(x) for x in supported_inits) - 1
try:
init_bin = salt.utils.path.which(init_cmdline[0])
except IndexError:
# Emtpy init_cmdline
init_bin = None
log.warning(
"Unable to fetch data from /proc/1/cmdline"
)
if init_bin is not None and init_bin.endswith('bin/init'):
supported_inits = (b'upstart', b'sysvinit', b'systemd')
edge_len = max(len(x) for x in supported_inits) - 1
try:
buf_size = __opts__['file_buffer_size']
except KeyError:
# Default to the value of file_buffer_size for the minion
buf_size = 262144
try:
with salt.utils.files.fopen(init_bin, 'rb') as fp_:
buf = True
edge = b''
buf_size = __opts__['file_buffer_size']
except KeyError:
# Default to the value of file_buffer_size for the minion
buf_size = 262144
try:
with salt.utils.files.fopen(init_bin, 'rb') as fp_:
buf = True
edge = b''
buf = fp_.read(buf_size).lower()
while buf:
buf = edge + buf
for item in supported_inits:
if item in buf:
if six.PY3:
item = item.decode('utf-8')
grains['init'] = item
buf = b''
break
edge = buf[-edge_len:]
buf = fp_.read(buf_size).lower()
while buf:
buf = edge + buf
for item in supported_inits:
if item in buf:
if six.PY3:
item = item.decode('utf-8')
grains['init'] = item
buf = b''
break
edge = buf[-edge_len:]
buf = fp_.read(buf_size).lower()
except (IOError, OSError) as exc:
log.error(
'Unable to read from init_bin (%s): %s',
init_bin, exc
)
elif salt.utils.path.which('supervisord') in init_cmdline:
grains['init'] = 'supervisord'
elif init_cmdline == ['runit']:
grains['init'] = 'runit'
elif '/sbin/my_init' in init_cmdline:
#Phusion Base docker container use runit for srv mgmt, but my_init as pid1
grains['init'] = 'runit'
else:
log.info(
'Could not determine init system from command line: (%s)',
' '.join(init_cmdline)
except (IOError, OSError) as exc:
log.error(
'Unable to read from init_bin (%s): %s',
init_bin, exc
)
elif salt.utils.path.which('supervisord') in init_cmdline:
grains['init'] = 'supervisord'
elif init_cmdline == ['runit']:
grains['init'] = 'runit'
elif '/sbin/my_init' in init_cmdline:
# Phusion Base docker container use runit for srv mgmt, but
# my_init as pid1
grains['init'] = 'runit'
else:
log.info(
'Could not determine init system from command line: (%s)',
' '.join(init_cmdline)
)
# Add lsb grains on any distro with lsb-release. Note that this import
# can fail on systems with lsb-release installed if the system package
# does not install the python package for the python interpreter used by
# Salt (i.e. python2 or python3)
try:
log.trace('Getting lsb_release distro information')
import lsb_release # pylint: disable=import-error
release = lsb_release.get_distro_information()
for key, value in six.iteritems(release):
@ -1585,35 +1620,21 @@ def os_data():
# Catch a NameError to workaround possible breakage in lsb_release
# See https://github.com/saltstack/salt/issues/37867
except (ImportError, NameError):
# if the python library isn't available, default to regex
if os.path.isfile('/etc/lsb-release'):
# Matches any possible format:
# DISTRIB_ID="Ubuntu"
# DISTRIB_ID='Mageia'
# DISTRIB_ID=Fedora
# DISTRIB_RELEASE='10.10'
# DISTRIB_CODENAME='squeeze'
# DISTRIB_DESCRIPTION='Ubuntu 10.10'
regex = re.compile((
'^(DISTRIB_(?:ID|RELEASE|CODENAME|DESCRIPTION))=(?:\'|")?'
'([\\w\\s\\.\\-_]+)(?:\'|")?'
))
with salt.utils.files.fopen('/etc/lsb-release') as ifile:
for line in ifile:
match = regex.match(line.rstrip('\n'))
if match:
# Adds:
# lsb_distrib_{id,release,codename,description}
grains[
'lsb_{0}'.format(match.groups()[0].lower())
] = match.groups()[1].rstrip()
# if the python library isn't available, try to parse
# /etc/lsb-release using regex
log.trace('lsb_release python bindings not available')
grains.update(_parse_lsb_release())
if grains.get('lsb_distrib_description', '').lower().startswith('antergos'):
# Antergos incorrectly configures their /etc/lsb-release,
# setting the DISTRIB_ID to "Arch". This causes the "os" grain
# to be incorrectly set to "Arch".
grains['osfullname'] = 'Antergos Linux'
elif 'lsb_distrib_id' not in grains:
os_release = _parse_os_release(['/etc/os-release', '/usr/lib/os-release'])
log.trace(
'Failed to get lsb_distrib_id, trying to parse os-release'
)
os_release = _parse_os_release('/etc/os-release', '/usr/lib/os-release')
if os_release:
if 'NAME' in os_release:
grains['lsb_distrib_id'] = os_release['NAME'].strip()
@ -1638,6 +1659,7 @@ def os_data():
elif os_release.get("VERSION") == "Tumbleweed":
grains['osfullname'] = os_release["VERSION"]
elif os.path.isfile('/etc/SuSE-release'):
log.trace('Parsing distrib info from /etc/SuSE-release')
grains['lsb_distrib_id'] = 'SUSE'
version = ''
patch = ''
@ -1659,6 +1681,7 @@ def os_data():
if not grains.get('lsb_distrib_codename'):
grains['lsb_distrib_codename'] = 'n.a'
elif os.path.isfile('/etc/altlinux-release'):
log.trace('Parsing distrib info from /etc/altlinux-release')
# ALT Linux
grains['lsb_distrib_id'] = 'altlinux'
with salt.utils.files.fopen('/etc/altlinux-release') as ifile:
@ -1674,6 +1697,7 @@ def os_data():
grains['lsb_distrib_codename'] = \
comps[3].replace('(', '').replace(')', '')
elif os.path.isfile('/etc/centos-release'):
log.trace('Parsing distrib info from /etc/centos-release')
# CentOS Linux
grains['lsb_distrib_id'] = 'CentOS'
with salt.utils.files.fopen('/etc/centos-release') as ifile:
@ -1691,6 +1715,9 @@ def os_data():
elif os.path.isfile('/etc.defaults/VERSION') \
and os.path.isfile('/etc.defaults/synoinfo.conf'):
grains['osfullname'] = 'Synology'
log.trace(
'Parsing Synology distrib info from /etc/.defaults/VERSION'
)
with salt.utils.files.fopen('/etc.defaults/VERSION', 'r') as fp_:
synoinfo = {}
for line in fp_:
@ -1714,6 +1741,10 @@ def os_data():
# Use the already intelligent platform module to get distro info
# (though apparently it's not intelligent enough to strip quotes)
log.trace(
'Getting OS name, release, and codename from '
'platform.linux_distribution()'
)
(osname, osrelease, oscodename) = \
[x.strip('"').strip("'") for x in
linux_distribution(supported_dists=_supported_dists)]
@ -1723,8 +1754,7 @@ def os_data():
# so that linux_distribution() does the /etc/lsb-release parsing, but
# we do it anyway here for the sake for full portability.
if 'osfullname' not in grains:
grains['osfullname'] = \
grains.get('lsb_distrib_id', osname).strip()
grains['osfullname'] = grains.get('lsb_distrib_id', osname).strip()
if 'osrelease' not in grains:
# NOTE: This is a workaround for CentOS 7 os-release bug
# https://bugs.centos.org/view.php?id=8359

View file

@ -10,6 +10,7 @@ id: minion
open_mode: True
log_file: minion.log
log_level_logfile: debug
log_fmt_console: '%(asctime)s,%(msecs)03d [%(levelname)-8s] %(message)s'
pidfile: minion.pid
# Give the minion extra attempts to find the master

View file

@ -15,6 +15,8 @@
# pylint: disable=unused-import,function-redefined,blacklisted-module,blacklisted-external-module
from __future__ import absolute_import
import collections
import copy
import errno
import fnmatch
import sys
@ -94,119 +96,347 @@ if NO_MOCK is False:
NO_MOCK_REASON = 'you need to upgrade your mock version to >= 0.8.0'
# backport mock_open from the python 3 unittest.mock library so that we can
# mock read, readline, readlines, and file iteration properly
class MockFH(object):
def __init__(self, filename, read_data, *args, **kwargs):
self.filename = filename
self.call = MockCall(filename, *args, **kwargs)
self.empty_string = b'' if isinstance(read_data, six.binary_type) else ''
self.read_data = self._iterate_read_data(read_data)
self.read = Mock(side_effect=self._read)
self.readlines = Mock(side_effect=self._readlines)
self.readline = Mock(side_effect=self._readline)
self.close = Mock()
self.write = Mock()
self.writelines = Mock()
self.seek = Mock()
self._loc = 0
file_spec = None
def _iterate_read_data(self, read_data):
'''
Helper for mock_open:
Retrieve lines from read_data via a generator so that separate calls to
readline, read, and readlines are properly interleaved
'''
# Newline will always be a bytestring on PY2 because mock_open will have
# normalized it to one.
newline = b'\n' if isinstance(read_data, six.binary_type) else '\n'
read_data = [line + newline for line in read_data.split(newline)]
def _iterate_read_data(read_data):
'''
Helper for mock_open:
Retrieve lines from read_data via a generator so that separate calls to
readline, read, and readlines are properly interleaved
'''
# Newline will always be a bytestring on PY2 because mock_open will have
# normalized it to one.
newline = b'\n' if isinstance(read_data, six.binary_type) else '\n'
if read_data[-1] == newline:
# If the last line ended in a newline, the list comprehension will have an
# extra entry that's just a newline. Remove this.
read_data = read_data[:-1]
else:
# If there wasn't an extra newline by itself, then the file being
# emulated doesn't have a newline to end the last line, so remove the
# newline that we added in the list comprehension.
read_data[-1] = read_data[-1][:-1]
read_data = [line + newline for line in read_data.split(newline)]
if read_data[-1] == newline:
# If the last line ended in a newline, the list comprehension will have an
# extra entry that's just a newline. Remove this.
read_data = read_data[:-1]
else:
# If there wasn't an extra newline by itself, then the file being
# emulated doesn't have a newline to end the last line, so remove the
# newline that we added in the list comprehension.
read_data[-1] = read_data[-1][:-1]
for line in read_data:
yield line
def mock_open(mock=None, read_data='', match=None):
'''
A helper function to create a mock to replace the use of `open`. It works
for `open` called directly or used as a context manager.
The `mock` argument is the mock object to configure. If `None` (the
default) then a `MagicMock` will be created for you, with the API limited
to methods or attributes available on standard file handles.
`read_data` is a string for the `read` methoddline`, and `readlines` of the
file handle to return. This is an empty string by default.
If passed, `match` can be either a string or an iterable containing
patterns to attempt to match using fnmatch.fnmatch(). A side_effect will be
added to the mock object returned, which will cause an IOError(2, 'No such
file or directory') to be raised when the file path is not a match. This
allows you to make your mocked filehandle only work for certain file paths.
'''
# Normalize read_data, Python 2 filehandles should never produce unicode
# types on read.
if six.PY2:
read_data = salt.utils.stringutils.to_str(read_data)
def _readlines_side_effect(*args, **kwargs):
if handle.readlines.return_value is not None:
return handle.readlines.return_value
return list(_data)
def _read_side_effect(*args, **kwargs):
if handle.read.return_value is not None:
return handle.read.return_value
joiner = b'' if isinstance(read_data, six.binary_type) else ''
return joiner.join(_data)
def _readline_side_effect():
if handle.readline.return_value is not None:
while True:
yield handle.readline.return_value
for line in _data:
for line in read_data:
yield line
global file_spec
if file_spec is None:
if six.PY3:
import _io
file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO))))
@property
def write_calls(self):
'''
Return a list of all calls to the .write() mock
'''
return [x[1][0] for x in self.write.mock_calls]
@property
def writelines_calls(self):
'''
Return a list of all calls to the .writelines() mock
'''
return [x[1][0] for x in self.writelines.mock_calls]
def tell(self):
return self._loc
def _read(self, size=0):
if not isinstance(size, six.integer_types) or size < 0:
raise TypeError('a positive integer is required')
joined = self.empty_string.join(self.read_data)
if not size:
# read() called with no args, return everything
self._loc += len(joined)
return joined
else:
file_spec = file # pylint: disable=undefined-variable
# read() called with an explicit size. Return a slice matching the
# requested size, but before doing so, reset read_data to reflect
# what we read.
self.read_data = self._iterate_read_data(joined[size:])
ret = joined[:size]
self._loc += len(ret)
return ret
if mock is None:
mock = MagicMock(name='open', spec=open)
def _readlines(self, size=None): # pylint: disable=unused-argument
# TODO: Implement "size" argument
ret = list(self.read_data)
self._loc += sum(len(x) for x in ret)
return ret
handle = MagicMock(spec=file_spec)
handle.__enter__.return_value = handle
def _readline(self, size=None): # pylint: disable=unused-argument
# TODO: Implement "size" argument
try:
ret = next(self.read_data)
self._loc += len(ret)
return ret
except StopIteration:
return self.empty_string
_data = _iterate_read_data(read_data)
def __iter__(self):
while True:
try:
ret = next(self.read_data)
self._loc += len(ret)
yield ret
except StopIteration:
break
handle.write.return_value = None
handle.read.return_value = None
handle.readline.return_value = None
handle.readlines.return_value = None
def __enter__(self):
return self
# Support iteration via for loop
handle.__iter__ = lambda x: _readline_side_effect()
def __exit__(self, exc_type, exc_val, exc_tb): # pylint: disable=unused-argument
pass
# This is salt specific and not in the upstream mock
handle.read.side_effect = _read_side_effect
handle.readline.side_effect = _readline_side_effect()
handle.readlines.side_effect = _readlines_side_effect
if match is not None:
if isinstance(match, six.string_types):
match = [match]
class MockCall(object):
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def fopen_side_effect(name, *args, **kwargs):
for pat in match:
if fnmatch.fnmatch(name, pat):
return DEFAULT
def __repr__(self):
# future lint: disable=blacklisted-function
ret = str('MockCall(')
for arg in self.args:
ret += repr(arg) + str(', ')
if not self.kwargs:
if self.args:
# Remove trailing ', '
ret = ret[:-2]
else:
for key, val in six.iteritems(self.kwargs):
ret += str('{0}={1}').format(
salt.utils.stringutils.to_str(key),
repr(val)
)
ret += str(')')
return ret
# future lint: enable=blacklisted-function
def __str__(self):
return self.__repr__()
def __eq__(self, other):
return self.args == other.args and self.kwargs == other.kwargs
class MockOpen(object):
r'''
This class can be used to mock the use of ``open()``.
``read_data`` is a string representing the contents of the file to be read.
By default, this is an empty string.
Optionally, ``read_data`` can be a dictionary mapping ``fnmatch.fnmatch()``
patterns to strings (or optionally, exceptions). This allows the mocked
filehandle to serve content for more than one file path.
.. code-block:: python
data = {
'/etc/foo.conf': textwrap.dedent("""\
Foo
Bar
Baz
"""),
'/etc/bar.conf': textwrap.dedent("""\
A
B
C
"""),
}
with patch('salt.utils.files.fopen', mock_open(read_data=data):
do stuff
If the file path being opened does not match any of the glob expressions,
an IOError will be raised to simulate the file not existing.
Passing ``read_data`` as a string is equivalent to passing it with a glob
expression of "*". That is to say, the below two invocations are
equivalent:
.. code-block:: python
mock_open(read_data='foo\n')
mock_open(read_data={'*': 'foo\n'})
Instead of a string representing file contents, ``read_data`` can map to an
exception, and that exception will be raised if a file matching that
pattern is opened:
.. code-block:: python
data = {
'/etc/*': IOError(errno.EACCES, 'Permission denied'),
'*': 'Hello world!\n',
}
with patch('salt.utils.files.fopen', mock_open(read_data=data)):
do stuff
The above would raise an exception if any files within /etc are opened, but
would produce a mocked filehandle if any other file is opened.
To simulate file contents changing upon subsequent opens, the file contents
can be a list of strings/exceptions. For example:
.. code-block:: python
data = {
'/etc/foo.conf': [
'before\n',
'after\n',
],
'/etc/bar.conf': [
IOError(errno.ENOENT, 'No such file or directory', '/etc/bar.conf'),
'Hey, the file exists now!',
],
}
with patch('salt.utils.files.fopen', mock_open(read_data=data):
do stuff
The first open of ``/etc/foo.conf`` would return "before\n" when read,
while the second would return "after\n" when read. For ``/etc/bar.conf``,
the first read would raise an exception, while the second would open
successfully and read the specified string.
Expressions will be attempted in dictionary iteration order (the exception
being ``*`` which is tried last), so if a file path matches more than one
fnmatch expression then the first match "wins". If your use case calls for
overlapping expressions, then an OrderedDict can be used to ensure that the
desired matching behavior occurs:
.. code-block:: python
data = OrderedDict()
data['/etc/foo.conf'] = 'Permission granted!'
data['/etc/*'] = IOError(errno.EACCES, 'Permission denied')
data['*'] = '*': 'Hello world!\n'
with patch('salt.utils.files.fopen', mock_open(read_data=data):
do stuff
The following attributes are tracked for the life of a mock object:
* call_count - Tracks how many fopen calls were attempted
* filehandles - This is a dictionary mapping filenames to lists of MockFH
objects, representing the individual times that a given file was opened.
'''
def __init__(self, read_data=''):
# If the read_data contains lists, we will be popping it. So, don't
# modify the original value passed.
read_data = copy.copy(read_data)
# Normalize read_data, Python 2 filehandles should never produce unicode
# types on read.
if not isinstance(read_data, dict):
read_data = {'*': read_data}
if six.PY2:
# .__class__() used here to preserve the dict class in the event that
# an OrderedDict was used.
new_read_data = read_data.__class__()
for key, val in six.iteritems(read_data):
try:
val = salt.utils.data.decode(val, to_str=True)
except TypeError:
if not isinstance(val, BaseException):
raise
new_read_data[key] = val
read_data = new_read_data
del new_read_data
self.read_data = read_data
self.filehandles = {}
self.calls = []
self.call_count = 0
def __call__(self, name, *args, **kwargs):
'''
Match the file being opened to the patterns in the read_data and spawn
a mocked filehandle with the corresponding file contents.
'''
call = MockCall(name, *args, **kwargs)
self.calls.append(call)
self.call_count += 1
for pat in self.read_data:
if pat == '*':
continue
if fnmatch.fnmatch(name, pat):
matched_pattern = pat
break
else:
# No non-glob match in read_data, fall back to '*'
matched_pattern = '*'
try:
matched_contents = self.read_data[matched_pattern]
try:
# Assuming that the value for the matching expression is a
# list, pop the first element off of it.
file_contents = matched_contents.pop(0)
except AttributeError:
# The value for the matching expression is a string (or exception)
file_contents = matched_contents
except IndexError:
# We've run out of file contents, abort!
raise RuntimeError(
'File matching expression \'{0}\' opened more times than '
'expected'.format(matched_pattern)
)
try:
# Raise the exception if the matched file contents are an
# instance of an exception class.
raise file_contents
except TypeError:
# Contents were not an exception, so proceed with creating the
# mocked filehandle.
pass
ret = MockFH(name, file_contents, *args, **kwargs)
self.filehandles.setdefault(name, []).append(ret)
return ret
except KeyError:
# No matching glob in read_data, treat this as a file that does
# not exist and raise the appropriate exception.
raise IOError(errno.ENOENT, 'No such file or directory', name)
mock.side_effect = fopen_side_effect
def write_calls(self, path=None):
'''
Returns the contents passed to all .write() calls. Use `path` to narrow
the results to files matching a given pattern.
'''
ret = []
for filename, handles in six.iteritems(self.filehandles):
if path is None or fnmatch.fnmatch(filename, path):
for fh_ in handles:
ret.extend(fh_.write_calls)
return ret
mock.return_value = handle
return mock
def writelines_calls(self, path=None):
'''
Returns the contents passed to all .writelines() calls. Use `path` to
narrow the results to files matching a given pattern.
'''
ret = []
for filename, handles in six.iteritems(self.filehandles):
if path is None or fnmatch.fnmatch(filename, path):
for fh_ in handles:
ret.extend(fh_.writelines_calls)
return ret
# reimplement mock_open to support multiple filehandles
mock_open = MockOpen

View file

@ -11,6 +11,7 @@ from tests.support.mixins import LoaderModuleMockMixin
# Salt libs
import salt.beacons.btmp as btmp
from salt.ext import six
raw = b'\x06\x00\x00\x00Nt\x00\x00ssh:notty\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00garet\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00::1\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xdd\xc7\xc2Y\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
pack = (6, 29774, b'ssh:notty\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', b'\x00\x00\x00\x00', b'garet\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', b'::1\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', 0, 0, 0, 1505937373, 0, 0, 0, 0, 16777216)
@ -57,8 +58,9 @@ class BTMPBeaconTestCase(TestCase, LoaderModuleMockMixin):
with patch('salt.utils.files.fopen', mock_open()) as m_open:
ret = btmp.beacon(config)
m_open.assert_called_with(btmp.BTMP, 'rb')
self.assertEqual(ret, [])
call_args = next(six.itervalues(m_open.filehandles))[0].call.args
assert call_args == (btmp.BTMP, 'rb'), call_args
assert ret == [], ret
def test_match(self):
with patch('salt.utils.files.fopen',

View file

@ -11,6 +11,7 @@ from tests.support.mixins import LoaderModuleMockMixin
# Salt libs
import salt.beacons.wtmp as wtmp
from salt.ext import six
raw = b'\x07\x00\x00\x00H\x18\x00\x00pts/14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00s/14gareth\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00::1\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x13I\xc5YZf\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
pack = (7, 6216, b'pts/14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', b's/14', b'gareth\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', b'::1\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', 0, 0, 0, 1506101523, 353882, 0, 0, 0, 16777216)
@ -57,8 +58,9 @@ class WTMPBeaconTestCase(TestCase, LoaderModuleMockMixin):
with patch('salt.utils.files.fopen', mock_open()) as m_open:
ret = wtmp.beacon(config)
m_open.assert_called_with(wtmp.WTMP, 'rb')
self.assertEqual(ret, [])
call_args = next(six.itervalues(m_open.filehandles))[0].call.args
assert call_args == (wtmp.WTMP, 'rb'), call_args
assert ret == [], ret
def test_match(self):
with patch('salt.utils.files.fopen',

View file

@ -69,7 +69,9 @@ class CoreGrainsTestCase(TestCase, LoaderModuleMockMixin):
with salt.utils.files.fopen(os.path.join(OS_RELEASE_DIR, "ubuntu-17.10")) as os_release_file:
os_release_content = os_release_file.read()
with patch("salt.utils.files.fopen", mock_open(read_data=os_release_content)):
os_release = core._parse_os_release(["/etc/os-release", "/usr/lib/os-release"])
os_release = core._parse_os_release(
'/etc/os-release',
'/usr/lib/os-release')
self.assertEqual(os_release, {
"NAME": "Ubuntu",
"VERSION": "17.10 (Artful Aardvark)",
@ -85,10 +87,9 @@ class CoreGrainsTestCase(TestCase, LoaderModuleMockMixin):
"UBUNTU_CODENAME": "artful",
})
@patch("os.path.isfile")
def test_missing_os_release(self, path_isfile_mock):
path_isfile_mock.return_value = False
os_release = core._parse_os_release(["/etc/os-release", "/usr/lib/os-release"])
def test_missing_os_release(self):
with patch('salt.utils.files.fopen', mock_open(read_data={})):
os_release = core._parse_os_release('/etc/os-release', '/usr/lib/os-release')
self.assertEqual(os_release, {})
@skipIf(not salt.utils.platform.is_linux(), 'System is not Linux')
@ -101,7 +102,7 @@ class CoreGrainsTestCase(TestCase, LoaderModuleMockMixin):
}
_path_isfile_map = {}
_cmd_run_map = {
'dpkg --print-architecture': 'amd64'
'dpkg --print-architecture': 'amd64',
}
path_exists_mock = MagicMock(side_effect=lambda x: _path_exists_map[x])
@ -124,60 +125,35 @@ class CoreGrainsTestCase(TestCase, LoaderModuleMockMixin):
raise ImportError('No module named lsb_release')
return orig_import(name, *args)
# Skip the first if statement
# - Skip the first if statement
# - Skip the selinux/systemd stuff (not pertinent)
# - Skip the init grain compilation (not pertinent)
# - Ensure that lsb_release fails to import
# - Skip all the /etc/*-release stuff (not pertinent)
# - Mock linux_distribution to give us the OS name that we want
# - Make a bunch of functions return empty dicts, we don't care about
# these grains for the purposes of this test.
# - Mock the osarch
distro_mock = MagicMock(return_value=('Debian GNU/Linux', '8.3', ''))
with patch.object(salt.utils.platform, 'is_proxy',
MagicMock(return_value=False)):
# Skip the selinux/systemd stuff (not pertinent)
with patch.object(core, '_linux_bin_exists',
MagicMock(return_value=False)):
# Skip the init grain compilation (not pertinent)
with patch.object(os.path, 'exists', path_exists_mock):
# Ensure that lsb_release fails to import
with patch('{0}.__import__'.format(built_in),
side_effect=_import_mock):
# Skip all the /etc/*-release stuff (not pertinent)
with patch.object(os.path, 'isfile', path_isfile_mock):
# Mock linux_distribution to give us the OS name
# that we want.
distro_mock = MagicMock(
return_value=('Debian GNU/Linux', '8.3', '')
)
with patch.object(
core,
'linux_distribution',
distro_mock):
# Make a bunch of functions return empty dicts,
# we don't care about these grains for the
# purposes of this test.
with patch.object(
core,
'_linux_cpudata',
empty_mock):
with patch.object(
core,
'_linux_gpu_data',
empty_mock):
with patch.object(
core,
'_memdata',
empty_mock):
with patch.object(
core,
'_hw_data',
empty_mock):
with patch.object(
core,
'_virtual',
empty_mock):
with patch.object(
core,
'_ps',
empty_mock):
# Mock the osarch
with patch.dict(
core.__salt__,
{'cmd.run': cmd_run_mock}):
os_grains = core.os_data()
MagicMock(return_value=False)), \
patch.object(core, '_linux_bin_exists',
MagicMock(return_value=False)), \
patch.object(os.path, 'exists', path_exists_mock), \
patch('{0}.__import__'.format(built_in), side_effect=_import_mock), \
patch.object(os.path, 'isfile', path_isfile_mock), \
patch.object(core, '_parse_lsb_release', empty_mock), \
patch.object(core, '_parse_os_release', empty_mock), \
patch.object(core, '_parse_lsb_release', empty_mock), \
patch.object(core, 'linux_distribution', distro_mock), \
patch.object(core, '_linux_cpudata', empty_mock), \
patch.object(core, '_linux_gpu_data', empty_mock), \
patch.object(core, '_memdata', empty_mock), \
patch.object(core, '_hw_data', empty_mock), \
patch.object(core, '_virtual', empty_mock), \
patch.object(core, '_ps', empty_mock), \
patch.dict(core.__salt__, {'cmd.run': cmd_run_mock}):
os_grains = core.os_data()
self.assertEqual(os_grains.get('os_family'), 'Debian')
@ -215,33 +191,34 @@ class CoreGrainsTestCase(TestCase, LoaderModuleMockMixin):
raise ImportError('No module named lsb_release')
return orig_import(name, *args)
# Skip the first if statement
distro_mock = MagicMock(
return_value=('SUSE Linux Enterprise Server ', '12', 'x86_64')
)
# - Skip the first if statement
# - Skip the selinux/systemd stuff (not pertinent)
# - Skip the init grain compilation (not pertinent)
# - Ensure that lsb_release fails to import
# - Skip all the /etc/*-release stuff (not pertinent)
# - Mock linux_distribution to give us the OS name that we want
# - Mock the osarch
with patch.object(salt.utils.platform, 'is_proxy',
MagicMock(return_value=False)):
# Skip the selinux/systemd stuff (not pertinent)
with patch.object(core, '_linux_bin_exists',
MagicMock(return_value=False)):
# Skip the init grain compilation (not pertinent)
with patch.object(os.path, 'exists', path_exists_mock):
# Ensure that lsb_release fails to import
with patch('{0}.__import__'.format(built_in),
side_effect=_import_mock):
# Skip all the /etc/*-release stuff (not pertinent)
with patch.object(os.path, 'isfile', MagicMock(return_value=False)):
with patch.object(core, '_parse_os_release', os_release_mock):
# Mock linux_distribution to give us the OS
# name that we want.
distro_mock = MagicMock(
return_value=('SUSE Linux Enterprise Server ', '12', 'x86_64')
)
with patch.object(core, 'linux_distribution', distro_mock):
with patch.object(core, '_linux_gpu_data', empty_mock):
with patch.object(core, '_hw_data', empty_mock):
with patch.object(core, '_linux_cpudata', empty_mock):
with patch.object(core, '_virtual', empty_mock):
# Mock the osarch
with patch.dict(core.__salt__, {'cmd.run': osarch_mock}):
os_grains = core.os_data()
MagicMock(return_value=False)), \
patch.object(core, '_linux_bin_exists',
MagicMock(return_value=False)), \
patch.object(os.path, 'exists', path_exists_mock), \
patch('{0}.__import__'.format(built_in),
side_effect=_import_mock), \
patch.object(os.path, 'isfile', MagicMock(return_value=False)), \
patch.object(core, '_parse_os_release', os_release_mock), \
patch.object(core, '_parse_lsb_release', empty_mock), \
patch.object(core, 'linux_distribution', distro_mock), \
patch.object(core, '_linux_gpu_data', empty_mock), \
patch.object(core, '_hw_data', empty_mock), \
patch.object(core, '_linux_cpudata', empty_mock), \
patch.object(core, '_virtual', empty_mock), \
patch.dict(core.__salt__, {'cmd.run': osarch_mock}):
os_grains = core.os_data()
self.assertEqual(os_grains.get('os_family'), 'Suse')
self.assertEqual(os_grains.get('os'), 'SUSE')
@ -252,7 +229,8 @@ class CoreGrainsTestCase(TestCase, LoaderModuleMockMixin):
osarch_mock = MagicMock(return_value="amd64")
if os_release_filename:
os_release_data = core._parse_os_release(
[os.path.join(OS_RELEASE_DIR, os_release_filename)])
os.path.join(OS_RELEASE_DIR, os_release_filename)
)
else:
os_release_data = os_release_map.get('os_release_file', {})
os_release_mock = MagicMock(return_value=os_release_data)
@ -268,13 +246,19 @@ class CoreGrainsTestCase(TestCase, LoaderModuleMockMixin):
raise ImportError('No module named lsb_release')
return orig_import(name, *args)
# Skip the first if statement
# Skip the selinux/systemd stuff (not pertinent)
# Skip the init grain compilation (not pertinent)
# Ensure that lsb_release fails to import
# Skip all the /etc/*-release stuff (not pertinent)
# - Mock linux_distribution to give us the OS name that we want.
# Mock the osarch
suse_release_file = os_release_map.get('suse_release_file')
file_contents = {'/proc/1/cmdline': ''}
if suse_release_file:
file_contents['/etc/SuSE-release'] = suse_release_file
# - Skip the first if statement
# - Skip the selinux/systemd stuff (not pertinent)
# - Skip the init grain compilation (not pertinent)
# - Ensure that lsb_release fails to import
# - Skip all the /etc/*-release stuff (not pertinent)
# - Mock linux_distribution to give us the OS name that we want
# - Mock the osarch
distro_mock = MagicMock(return_value=os_release_map['linux_distribution'])
with patch.object(salt.utils.platform, 'is_proxy', MagicMock(return_value=False)), \
patch.object(core, '_linux_bin_exists', MagicMock(return_value=False)), \
@ -282,7 +266,8 @@ class CoreGrainsTestCase(TestCase, LoaderModuleMockMixin):
patch('{0}.__import__'.format(built_in), side_effect=_import_mock), \
patch.object(os.path, 'isfile', path_isfile_mock), \
patch.object(core, '_parse_os_release', os_release_mock), \
patch('salt.utils.files.fopen', mock_open(read_data=os_release_map.get('suse_release_file', ''))), \
patch.object(core, '_parse_lsb_release', empty_mock), \
patch('salt.utils.files.fopen', mock_open(read_data=file_contents)), \
patch.object(core, 'linux_distribution', distro_mock), \
patch.object(core, '_linux_gpu_data', empty_mock), \
patch.object(core, '_linux_cpudata', empty_mock), \
@ -559,67 +544,13 @@ class CoreGrainsTestCase(TestCase, LoaderModuleMockMixin):
'''
Test memdata on Linux systems
'''
_path_exists_map = {
'/proc/1/cmdline': False,
'/proc/meminfo': True
}
_path_isfile_map = {
'/proc/meminfo': True
}
_cmd_run_map = {
'dpkg --print-architecture': 'amd64',
'rpm --eval %{_host_cpu}': 'x86_64'
}
path_exists_mock = MagicMock(side_effect=lambda x: _path_exists_map[x])
path_isfile_mock = MagicMock(
side_effect=lambda x: _path_isfile_map.get(x, False)
)
cmd_run_mock = MagicMock(
side_effect=lambda x: _cmd_run_map[x]
)
empty_mock = MagicMock(return_value={})
_proc_meminfo = textwrap.dedent('''\
MemTotal: 16277028 kB
SwapTotal: 4789244 kB''')
orig_import = __import__
if six.PY2:
built_in = '__builtin__'
else:
built_in = 'builtins'
def _import_mock(name, *args):
if name == 'lsb_release':
raise ImportError('No module named lsb_release')
return orig_import(name, *args)
# Mock a bunch of stuff so we can isolate the mem stuff:
# - Skip the first if statement
# - Skip the init grain compilation (not pertinent)
# - Ensure that lsb_release fails to import
# - Skip all the /etc/*-release stuff (not pertinent)
# - Make a bunch of functions return empty dicts, we don't care
# about these grains for the purposes of this test.
# - Mock the osarch
# - And most importantly, mock the contents of /proc/meminfo
with patch.object(salt.utils.platform, 'is_proxy', MagicMock(return_value=False)), \
patch.object(core, '_linux_bin_exists', MagicMock(return_value=False)), \
patch.object(os.path, 'exists', path_exists_mock), \
patch('{0}.__import__'.format(built_in), side_effect=_import_mock), \
patch.object(os.path, 'isfile', path_isfile_mock), \
patch.object(core, '_linux_cpudata', empty_mock), \
patch.object(core, '_linux_gpu_data', empty_mock), \
patch.object(core, '_hw_data', empty_mock), \
patch.object(core, '_virtual', empty_mock), \
patch.object(core, '_ps', empty_mock), \
patch.dict(core.__salt__, {'cmd.run': cmd_run_mock}), \
patch('salt.utils.files.fopen', mock_open(read_data=_proc_meminfo)):
os_grains = core.os_data()
self.assertEqual(os_grains.get('mem_total'), 15895)
self.assertEqual(os_grains.get('swap_total'), 4676)
with patch('salt.utils.files.fopen', mock_open(read_data=_proc_meminfo)):
memdata = core._linux_memdata()
self.assertEqual(memdata.get('mem_total'), 15895)
self.assertEqual(memdata.get('swap_total'), 4676)
@skipIf(salt.utils.platform.is_windows(), 'System is Windows')
def test_bsd_memdata(self):
@ -910,39 +841,41 @@ class CoreGrainsTestCase(TestCase, LoaderModuleMockMixin):
'''
import platform
path_isfile_mock = MagicMock(side_effect=lambda x: x in ['/etc/release'])
with patch.object(platform, 'uname',
MagicMock(return_value=('SunOS', 'testsystem', '5.11', '11.3', 'sunv4', 'sparc'))):
with patch.object(salt.utils.platform, 'is_proxy',
MagicMock(return_value=False)):
with patch.object(salt.utils.platform, 'is_linux',
MagicMock(return_value=False)):
with patch.object(salt.utils.platform, 'is_windows',
MagicMock(return_value=False)):
with patch.object(salt.utils.platform, 'is_smartos',
MagicMock(return_value=False)):
with patch.object(salt.utils.path, 'which_bin',
MagicMock(return_value=None)):
with patch.object(os.path, 'isfile', path_isfile_mock):
with salt.utils.files.fopen(os.path.join(OS_RELEASE_DIR, "solaris-11.3")) as os_release_file:
os_release_content = os_release_file.readlines()
with patch("salt.utils.files.fopen", mock_open()) as os_release_file:
os_release_file.return_value.__iter__.return_value = os_release_content
with patch.object(core, '_sunos_cpudata',
MagicMock(return_value={'cpuarch': 'sparcv9',
'num_cpus': '1',
'cpu_model': 'MOCK_CPU_MODEL',
'cpu_flags': []})):
with patch.object(core, '_memdata',
MagicMock(return_value={'mem_total': 16384})):
with patch.object(core, '_virtual',
MagicMock(return_value={})):
with patch.object(core, '_ps',
MagicMock(return_value={})):
with patch.object(salt.utils.path, 'which',
MagicMock(return_value=True)):
sparc_return_mock = MagicMock(return_value=prtdata)
with patch.dict(core.__salt__, {'cmd.run': sparc_return_mock}):
os_grains = core.os_data()
with salt.utils.files.fopen(os.path.join(OS_RELEASE_DIR, "solaris-11.3")) as os_release_file:
os_release_content = os_release_file.readlines()
uname_mock = MagicMock(return_value=(
'SunOS', 'testsystem', '5.11', '11.3', 'sunv4', 'sparc'
))
with patch.object(platform, 'uname', uname_mock), \
patch.object(salt.utils.platform, 'is_proxy',
MagicMock(return_value=False)), \
patch.object(salt.utils.platform, 'is_linux',
MagicMock(return_value=False)), \
patch.object(salt.utils.platform, 'is_windows',
MagicMock(return_value=False)), \
patch.object(salt.utils.platform, 'is_smartos',
MagicMock(return_value=False)), \
patch.object(salt.utils.path, 'which_bin',
MagicMock(return_value=None)), \
patch.object(os.path, 'isfile', path_isfile_mock), \
patch('salt.utils.files.fopen',
mock_open(read_data=os_release_content)) as os_release_file, \
patch.object(core, '_sunos_cpudata',
MagicMock(return_value={
'cpuarch': 'sparcv9',
'num_cpus': '1',
'cpu_model': 'MOCK_CPU_MODEL',
'cpu_flags': []})), \
patch.object(core, '_memdata',
MagicMock(return_value={'mem_total': 16384})), \
patch.object(core, '_virtual',
MagicMock(return_value={})), \
patch.object(core, '_ps', MagicMock(return_value={})), \
patch.object(salt.utils.path, 'which',
MagicMock(return_value=True)), \
patch.dict(core.__salt__,
{'cmd.run': MagicMock(return_value=prtdata)}):
os_grains = core.os_data()
grains = {k: v for k, v in os_grains.items()
if k in set(['product', 'productname'])}
self.assertEqual(grains, expectation)

View file

@ -32,20 +32,14 @@ class FibreChannelGrainsTestCase(TestCase):
cmd_run_mock = MagicMock(return_value=wwns)
with patch('salt.modules.cmdmod.powershell', cmd_run_mock):
ret = fibre_channel._windows_wwns()
self.assertEqual(ret, wwns)
assert ret == wwns, ret
def test_linux_fibre_channel_wwns_grains(self):
def multi_mock_open(*file_contents):
mock_files = [mock_open(read_data=content).return_value for content in file_contents]
mock_opener = mock_open()
mock_opener.side_effect = mock_files
return mock_opener
contents = ['0x500143802426baf4', '0x500143802426baf5']
files = ['file1', 'file2']
with patch('glob.glob', MagicMock(return_value=files)):
with patch('salt.utils.files.fopen', multi_mock_open('0x500143802426baf4', '0x500143802426baf5')):
ret = fibre_channel._linux_wwns()
with patch('glob.glob', MagicMock(return_value=files)), \
patch('salt.utils.files.fopen', mock_open(read_data=contents)):
ret = fibre_channel._linux_wwns()
self.assertEqual(ret, ['500143802426baf4', '500143802426baf5'])
assert ret == ['500143802426baf4', '500143802426baf5'], ret

View file

@ -131,19 +131,23 @@ class CpTestCase(TestCase, LoaderModuleMockMixin):
'''
Test if push works with good posix path.
'''
filename = '/saltines/test.file'
with patch('salt.modules.cp.os.path',
MagicMock(isfile=Mock(return_value=True), wraps=cp.os.path)), \
patch.multiple('salt.modules.cp',
_auth=MagicMock(**{'return_value.gen_token.return_value': 'token'}),
__opts__={'id': 'abc', 'file_buffer_size': 10}), \
patch('salt.utils.files.fopen', mock_open(read_data=b'content')), \
patch('salt.utils.files.fopen', mock_open(read_data=b'content')) as m_open, \
patch('salt.transport.Channel.factory', MagicMock()):
response = cp.push('/saltines/test.file')
self.assertEqual(response, True)
self.assertEqual(salt.utils.files.fopen().read.call_count, 2) # pylint: disable=resource-leakage
response = cp.push(filename)
assert response, response
num_opens = len(m_open.filehandles[filename])
assert num_opens == 1, num_opens
fh_ = m_open.filehandles[filename][0]
assert fh_.read.call_count == 2, fh_.read.call_count
salt.transport.Channel.factory({}).send.assert_called_once_with(
dict(
loc=salt.utils.files.fopen().tell(), # pylint: disable=resource-leakage
loc=fh_.tell(), # pylint: disable=resource-leakage
cmd='_file_recv',
tok='token',
path=['saltines', 'test.file'],

View file

@ -13,7 +13,6 @@ from tests.support.mock import (
MagicMock,
patch,
mock_open,
call,
NO_MOCK,
NO_MOCK_REASON
)
@ -51,24 +50,23 @@ mock_soa_zone = salt.utils.stringutils.to_str(
'1 PTR localhost.')
if NO_MOCK is False:
mock_calls_list = [
call.read(),
call.write(salt.utils.stringutils.to_str('##\n')),
call.write(salt.utils.stringutils.to_str('# Host Database\n')),
call.write(salt.utils.stringutils.to_str('#\n')),
call.write(salt.utils.stringutils.to_str('# localhost is used to configure the '
'loopback interface\n')),
call.write(salt.utils.stringutils.to_str('# when the system is booting. Do not '
'change this entry.\n')),
call.write(salt.utils.stringutils.to_str('##\n')),
call.write(salt.utils.stringutils.to_str('127.0.0.1 localhost')),
call.write(salt.utils.stringutils.to_str('\n')),
call.write(salt.utils.stringutils.to_str('255.255.255.255 broadcasthost')),
call.write(salt.utils.stringutils.to_str('\n')),
call.write(salt.utils.stringutils.to_str('::1 localhost')),
call.write(salt.utils.stringutils.to_str('\n')),
call.write(salt.utils.stringutils.to_str('fe80::1%lo0 localhost')),
call.write(salt.utils.stringutils.to_str('\n'))]
mock_writes_list = salt.utils.data.decode([
'##\n',
'# Host Database\n',
'#\n',
'# localhost is used to configure the loopback interface\n',
'# when the system is booting. Do not change this entry.\n',
'##\n',
'127.0.0.1 localhost',
'\n',
'255.255.255.255 broadcasthost',
'\n',
'::1 localhost',
'\n',
'fe80::1%lo0 localhost',
'\n'
], to_str=True
)
@skipIf(NO_MOCK, NO_MOCK_REASON)
@ -84,18 +82,21 @@ class DNSUtilTestCase(TestCase):
with patch('salt.utils.files.fopen', mock_open(read_data=mock_hosts_file)) as m_open, \
patch('salt.modules.dnsutil.parse_hosts', MagicMock(return_value=mock_hosts_file_rtn)):
dnsutil.hosts_append('/etc/hosts', '127.0.0.1', 'ad1.yuk.co,ad2.yuk.co')
helper_open = m_open()
helper_open.write.assert_called_once_with(
salt.utils.stringutils.to_str('\n127.0.0.1 ad1.yuk.co ad2.yuk.co'))
writes = m_open.write_calls()
# We should have called .write() only once, with the expected
# content
num_writes = len(writes)
assert num_writes == 1, num_writes
expected = salt.utils.stringutils.to_str('\n127.0.0.1 ad1.yuk.co ad2.yuk.co')
assert writes[0] == expected, writes[0]
def test_hosts_remove(self):
to_remove = 'ad1.yuk.co'
new_mock_file = mock_hosts_file + '\n127.0.0.1 ' + to_remove + '\n'
with patch('salt.utils.files.fopen', mock_open(read_data=new_mock_file)) as m_open:
dnsutil.hosts_remove('/etc/hosts', to_remove)
helper_open = m_open()
calls_list = helper_open.method_calls
self.assertEqual(calls_list, mock_calls_list)
writes = m_open.write_calls()
assert writes == mock_writes_list, writes
@skipIf(True, 'Waiting on bug report fixes')
def test_parse_zone(self):

View file

@ -8,10 +8,11 @@ import tempfile
import textwrap
# Import Salt Testing libs
from tests.support.helpers import with_tempfile
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.paths import TMP
from tests.support.unit import TestCase, skipIf
from tests.support.mock import MagicMock, Mock, patch, mock_open
from tests.support.mock import MagicMock, Mock, patch, mock_open, DEFAULT
try:
import pytest
@ -39,6 +40,10 @@ here
'''
class DummyStat(object):
st_size = 123
class FileReplaceTestCase(TestCase, LoaderModuleMockMixin):
def setup_loader_modules(self):
@ -990,10 +995,8 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
self.assertEqual(
filemod._starts_till(src=src, probe='and here is something'), -1)
@patch('os.path.realpath', MagicMock())
@patch('os.path.isfile', MagicMock(return_value=True))
@patch('os.stat', MagicMock())
def test_line_insert_after_no_pattern(self):
@with_tempfile()
def test_line_insert_after_no_pattern(self, name):
'''
Test for file.line for insertion after specific line, using no pattern.
@ -1012,19 +1015,26 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
' - /srv/custom'
])
cfg_content = '- /srv/custom'
files_fopen = mock_open(read_data=file_content)
with patch('salt.utils.files.fopen', files_fopen):
atomic_opener = mock_open()
with patch('salt.utils.atomicfile.atomic_open', atomic_opener):
filemod.line('foo', content=cfg_content, after='- /srv/salt', mode='insert')
self.assertEqual(len(atomic_opener().write.call_args_list), 1)
self.assertEqual(atomic_opener().write.call_args_list[0][0][0],
file_modified)
@patch('os.path.realpath', MagicMock())
@patch('os.path.isfile', MagicMock(return_value=True))
@patch('os.stat', MagicMock())
def test_line_insert_after_pattern(self):
isfile_mock = MagicMock(side_effect=lambda x: True if x == name else DEFAULT)
with patch('os.path.isfile', isfile_mock), \
patch('os.stat', MagicMock(return_value=DummyStat())), \
patch('salt.utils.files.fopen', mock_open(read_data=file_content)), \
patch('salt.utils.atomicfile.atomic_open', mock_open()) as atomic_open_mock:
filemod.line(name, content=cfg_content, after='- /srv/salt', mode='insert')
handles = atomic_open_mock.filehandles[name]
# We should only have opened the file once
open_count = len(handles)
assert open_count == 1, open_count
# We should only have invoked .write() once...
write_count = len(handles[0].write.call_args_list)
assert write_count == 1, write_count
# ... with the updated content
write_content = handles[0].write.call_args_list[0][0][0]
assert write_content == file_modified, write_content
@with_tempfile()
def test_line_insert_after_pattern(self, name):
'''
Test for file.line for insertion after specific line, using pattern.
@ -1053,20 +1063,28 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
' custom:',
' - /srv/custom'
])
isfile_mock = MagicMock(side_effect=lambda x: True if x == name else DEFAULT)
for after_line in ['file_r.*', '.*roots']:
files_fopen = mock_open(read_data=file_content)
with patch('salt.utils.files.fopen', files_fopen):
atomic_opener = mock_open()
with patch('salt.utils.atomicfile.atomic_open', atomic_opener):
filemod.line('foo', content=cfg_content, after=after_line, mode='insert', indent=False)
self.assertEqual(len(atomic_opener().write.call_args_list), 1)
self.assertEqual(atomic_opener().write.call_args_list[0][0][0],
file_modified)
with patch('os.path.isfile', isfile_mock), \
patch('os.stat', MagicMock(return_value=DummyStat())), \
patch('salt.utils.files.fopen',
mock_open(read_data=file_content)), \
patch('salt.utils.atomicfile.atomic_open',
mock_open()) as atomic_open_mock:
filemod.line(name, content=cfg_content, after=after_line, mode='insert', indent=False)
handles = atomic_open_mock.filehandles[name]
# We should only have opened the file once
open_count = len(handles)
assert open_count == 1, open_count
# We should only have invoked .write() once...
write_count = len(handles[0].write.call_args_list)
assert write_count == 1, write_count
# ... with the updated content
write_content = handles[0].write.call_args_list[0][0][0]
assert write_content == file_modified, write_content
@patch('os.path.realpath', MagicMock())
@patch('os.path.isfile', MagicMock(return_value=True))
@patch('os.stat', MagicMock())
def test_line_insert_multi_line_content_after_unicode(self):
@with_tempfile()
def test_line_insert_multi_line_content_after_unicode(self, name):
'''
Test for file.line for insertion after specific line with Unicode
@ -1076,20 +1094,28 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
file_content = ("This is a line\nThis is another line")
file_modified = salt.utils.stringutils.to_str("This is a line\nThis is another line\nThis is a line with unicode Ŷ")
cfg_content = "This is a line with unicode Ŷ"
isfile_mock = MagicMock(side_effect=lambda x: True if x == name else DEFAULT)
for after_line in ['This is another line']:
files_fopen = mock_open(read_data=file_content)
with patch('salt.utils.files.fopen', files_fopen):
atomic_opener = mock_open()
with patch('salt.utils.atomicfile.atomic_open', atomic_opener):
filemod.line('foo', content=cfg_content, after=after_line, mode='insert', indent=False)
self.assertEqual(len(atomic_opener().write.call_args_list), 1)
self.assertEqual(atomic_opener().write.call_args_list[0][0][0],
file_modified)
with patch('os.path.isfile', isfile_mock), \
patch('os.stat', MagicMock(return_value=DummyStat())), \
patch('salt.utils.files.fopen',
mock_open(read_data=file_content)), \
patch('salt.utils.atomicfile.atomic_open',
mock_open()) as atomic_open_mock:
filemod.line(name, content=cfg_content, after=after_line, mode='insert', indent=False)
handles = atomic_open_mock.filehandles[name]
# We should only have opened the file once
open_count = len(handles)
assert open_count == 1, open_count
# We should only have invoked .write() once...
write_count = len(handles[0].write.call_args_list)
assert write_count == 1, write_count
# ... with the updated content
write_content = handles[0].write.call_args_list[0][0][0]
assert write_content == file_modified, write_content
@patch('os.path.realpath', MagicMock())
@patch('os.path.isfile', MagicMock(return_value=True))
@patch('os.stat', MagicMock())
def test_line_insert_before(self):
@with_tempfile()
def test_line_insert_before(self, name):
'''
Test for file.line for insertion before specific line, using pattern and no patterns.
@ -1110,20 +1136,29 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
' - /srv/sugar'
])
cfg_content = '- /srv/custom'
for before_line in ['/srv/salt', '/srv/sa.*t', '/sr.*']:
files_fopen = mock_open(read_data=file_content)
with patch('salt.utils.files.fopen', files_fopen):
atomic_opener = mock_open()
with patch('salt.utils.atomicfile.atomic_open', atomic_opener):
filemod.line('foo', content=cfg_content, before=before_line, mode='insert')
self.assertEqual(len(atomic_opener().write.call_args_list), 1)
self.assertEqual(atomic_opener().write.call_args_list[0][0][0],
file_modified)
@patch('os.path.realpath', MagicMock())
@patch('os.path.isfile', MagicMock(return_value=True))
@patch('os.stat', MagicMock())
def test_line_insert_before_after(self):
isfile_mock = MagicMock(side_effect=lambda x: True if x == name else DEFAULT)
for before_line in ['/srv/salt', '/srv/sa.*t', '/sr.*']:
with patch('os.path.isfile', isfile_mock), \
patch('os.stat', MagicMock(return_value=DummyStat())), \
patch('salt.utils.files.fopen',
mock_open(read_data=file_content)), \
patch('salt.utils.atomicfile.atomic_open',
mock_open()) as atomic_open_mock:
filemod.line(name, content=cfg_content, before=before_line, mode='insert')
handles = atomic_open_mock.filehandles[name]
# We should only have opened the file once
open_count = len(handles)
assert open_count == 1, open_count
# We should only have invoked .write() once...
write_count = len(handles[0].write.call_args_list)
assert write_count == 1, write_count
# ... with the updated content
write_content = handles[0].write.call_args_list[0][0][0]
assert write_content == file_modified, write_content
@with_tempfile()
def test_line_insert_before_after(self, name):
'''
Test for file.line for insertion before specific line, using pattern and no patterns.
@ -1146,20 +1181,29 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
' - /srv/sugar'
])
cfg_content = '- /srv/coriander'
for b_line, a_line in [('/srv/sugar', '/srv/salt')]:
files_fopen = mock_open(read_data=file_content)
with patch('salt.utils.files.fopen', files_fopen):
atomic_opener = mock_open()
with patch('salt.utils.atomicfile.atomic_open', atomic_opener):
filemod.line('foo', content=cfg_content, before=b_line, after=a_line, mode='insert')
self.assertEqual(len(atomic_opener().write.call_args_list), 1)
self.assertEqual(atomic_opener().write.call_args_list[0][0][0],
file_modified)
@patch('os.path.realpath', MagicMock())
@patch('os.path.isfile', MagicMock(return_value=True))
@patch('os.stat', MagicMock())
def test_line_insert_start(self):
isfile_mock = MagicMock(side_effect=lambda x: True if x == name else DEFAULT)
for b_line, a_line in [('/srv/sugar', '/srv/salt')]:
with patch('os.path.isfile', isfile_mock), \
patch('os.stat', MagicMock(return_value=DummyStat())), \
patch('salt.utils.files.fopen',
mock_open(read_data=file_content)), \
patch('salt.utils.atomicfile.atomic_open',
mock_open()) as atomic_open_mock:
filemod.line(name, content=cfg_content, before=b_line, after=a_line, mode='insert')
handles = atomic_open_mock.filehandles[name]
# We should only have opened the file once
open_count = len(handles)
assert open_count == 1, open_count
# We should only have invoked .write() once...
write_count = len(handles[0].write.call_args_list)
assert write_count == 1, write_count
# ... with the updated content
write_content = handles[0].write.call_args_list[0][0][0]
assert write_content == file_modified, write_content
@with_tempfile()
def test_line_insert_start(self, name):
'''
Test for file.line for insertion at the beginning of the file
:return:
@ -1178,19 +1222,28 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
' - /srv/salt',
' - /srv/sugar'
])
files_fopen = mock_open(read_data=file_content)
with patch('salt.utils.files.fopen', files_fopen):
atomic_opener = mock_open()
with patch('salt.utils.atomicfile.atomic_open', atomic_opener):
filemod.line('foo', content=cfg_content, location='start', mode='insert')
self.assertEqual(len(atomic_opener().write.call_args_list), 1)
self.assertEqual(atomic_opener().write.call_args_list[0][0][0],
file_modified)
@patch('os.path.realpath', MagicMock())
@patch('os.path.isfile', MagicMock(return_value=True))
@patch('os.stat', MagicMock())
def test_line_insert_end(self):
isfile_mock = MagicMock(side_effect=lambda x: True if x == name else DEFAULT)
with patch('os.path.isfile', isfile_mock), \
patch('os.stat', MagicMock(return_value=DummyStat())), \
patch('salt.utils.files.fopen',
mock_open(read_data=file_content)), \
patch('salt.utils.atomicfile.atomic_open',
mock_open()) as atomic_open_mock:
filemod.line(name, content=cfg_content, location='start', mode='insert')
handles = atomic_open_mock.filehandles[name]
# We should only have opened the file once
open_count = len(handles)
assert open_count == 1, open_count
# We should only have invoked .write() once...
write_count = len(handles[0].write.call_args_list)
assert write_count == 1, write_count
# ... with the updated content
write_content = handles[0].write.call_args_list[0][0][0]
assert write_content == file_modified, write_content
@with_tempfile()
def test_line_insert_end(self, name):
'''
Test for file.line for insertion at the end of the file (append)
:return:
@ -1209,19 +1262,28 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
' - /srv/sugar',
cfg_content
])
files_fopen = mock_open(read_data=file_content)
with patch('salt.utils.files.fopen', files_fopen):
atomic_opener = mock_open()
with patch('salt.utils.atomicfile.atomic_open', atomic_opener):
filemod.line('foo', content=cfg_content, location='end', mode='insert')
self.assertEqual(len(atomic_opener().write.call_args_list), 1)
self.assertEqual(atomic_opener().write.call_args_list[0][0][0],
file_modified)
@patch('os.path.realpath', MagicMock())
@patch('os.path.isfile', MagicMock(return_value=True))
@patch('os.stat', MagicMock())
def test_line_insert_ensure_before(self):
isfile_mock = MagicMock(side_effect=lambda x: True if x == name else DEFAULT)
with patch('os.path.isfile', isfile_mock), \
patch('os.stat', MagicMock(return_value=DummyStat())), \
patch('salt.utils.files.fopen',
mock_open(read_data=file_content)), \
patch('salt.utils.atomicfile.atomic_open',
mock_open()) as atomic_open_mock:
filemod.line(name, content=cfg_content, location='end', mode='insert')
handles = atomic_open_mock.filehandles[name]
# We should only have opened the file once
open_count = len(handles)
assert open_count == 1, open_count
# We should only have invoked .write() once...
write_count = len(handles[0].write.call_args_list)
assert write_count == 1, write_count
# ... with the updated content
write_content = handles[0].write.call_args_list[0][0][0]
assert write_content == file_modified, write_content
@with_tempfile()
def test_line_insert_ensure_before(self, name):
'''
Test for file.line for insertion ensuring the line is before
:return:
@ -1238,19 +1300,28 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
cfg_content,
'exit 0'
])
files_fopen = mock_open(read_data=file_content)
with patch('salt.utils.files.fopen', files_fopen):
atomic_opener = mock_open()
with patch('salt.utils.atomicfile.atomic_open', atomic_opener):
filemod.line('foo', content=cfg_content, before='exit 0', mode='ensure')
self.assertEqual(len(atomic_opener().write.call_args_list), 1)
self.assertEqual(atomic_opener().write.call_args_list[0][0][0],
file_modified)
@patch('os.path.realpath', MagicMock())
@patch('os.path.isfile', MagicMock(return_value=True))
@patch('os.stat', MagicMock())
def test_line_insert_ensure_after(self):
isfile_mock = MagicMock(side_effect=lambda x: True if x == name else DEFAULT)
with patch('os.path.isfile', isfile_mock), \
patch('os.stat', MagicMock(return_value=DummyStat())), \
patch('salt.utils.files.fopen',
mock_open(read_data=file_content)), \
patch('salt.utils.atomicfile.atomic_open',
mock_open()) as atomic_open_mock:
filemod.line(name, content=cfg_content, before='exit 0', mode='ensure')
handles = atomic_open_mock.filehandles[name]
# We should only have opened the file once
open_count = len(handles)
assert open_count == 1, open_count
# We should only have invoked .write() once...
write_count = len(handles[0].write.call_args_list)
assert write_count == 1, write_count
# ... with the updated content
write_content = handles[0].write.call_args_list[0][0][0]
assert write_content == file_modified, write_content
@with_tempfile()
def test_line_insert_ensure_after(self, name):
'''
Test for file.line for insertion ensuring the line is after
:return:
@ -1265,19 +1336,28 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
'/etc/init.d/someservice restart',
cfg_content
])
files_fopen = mock_open(read_data=file_content)
with patch('salt.utils.files.fopen', files_fopen):
atomic_opener = mock_open()
with patch('salt.utils.atomicfile.atomic_open', atomic_opener):
filemod.line('foo', content=cfg_content, after='/etc/init.d/someservice restart', mode='ensure')
self.assertEqual(len(atomic_opener().write.call_args_list), 1)
self.assertEqual(atomic_opener().write.call_args_list[0][0][0],
file_modified)
@patch('os.path.realpath', MagicMock())
@patch('os.path.isfile', MagicMock(return_value=True))
@patch('os.stat', MagicMock())
def test_line_insert_ensure_beforeafter_twolines(self):
isfile_mock = MagicMock(side_effect=lambda x: True if x == name else DEFAULT)
with patch('os.path.isfile', isfile_mock), \
patch('os.stat', MagicMock(return_value=DummyStat())), \
patch('salt.utils.files.fopen',
mock_open(read_data=file_content)), \
patch('salt.utils.atomicfile.atomic_open',
mock_open()) as atomic_open_mock:
filemod.line(name, content=cfg_content, after='/etc/init.d/someservice restart', mode='ensure')
handles = atomic_open_mock.filehandles[name]
# We should only have opened the file once
open_count = len(handles)
assert open_count == 1, open_count
# We should only have invoked .write() once...
write_count = len(handles[0].write.call_args_list)
assert write_count == 1, write_count
# ... with the updated content
write_content = handles[0].write.call_args_list[0][0][0]
assert write_content == file_modified, write_content
@with_tempfile()
def test_line_insert_ensure_beforeafter_twolines(self, name):
'''
Test for file.line for insertion ensuring the line is between two lines
:return:
@ -1291,23 +1371,32 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
# pylint: enable=W1401
after, before = file_content.split(os.linesep)
file_modified = os.linesep.join([after, cfg_content, before])
for (_after, _before) in [(after, before), ('NAME_.*', 'SKEL_.*')]:
files_fopen = mock_open(read_data=file_content)
with patch('salt.utils.files.fopen', files_fopen):
atomic_opener = mock_open()
with patch('salt.utils.atomicfile.atomic_open', atomic_opener):
filemod.line('foo', content=cfg_content, after=_after, before=_before, mode='ensure')
self.assertEqual(len(atomic_opener().write.call_args_list), 1)
self.assertEqual(atomic_opener().write.call_args_list[0][0][0],
file_modified)
@patch('os.path.realpath', MagicMock())
@patch('os.path.isfile', MagicMock(return_value=True))
@patch('os.stat', MagicMock())
def test_line_insert_ensure_beforeafter_twolines_exists(self):
isfile_mock = MagicMock(side_effect=lambda x: True if x == name else DEFAULT)
for (_after, _before) in [(after, before), ('NAME_.*', 'SKEL_.*')]:
with patch('os.path.isfile', isfile_mock), \
patch('os.stat', MagicMock(return_value=DummyStat())), \
patch('salt.utils.files.fopen',
mock_open(read_data=file_content)), \
patch('salt.utils.atomicfile.atomic_open',
mock_open()) as atomic_open_mock:
filemod.line(name, content=cfg_content, after=_after, before=_before, mode='ensure')
handles = atomic_open_mock.filehandles[name]
# We should only have opened the file once
open_count = len(handles)
assert open_count == 1, open_count
# We should only have invoked .write() once...
write_count = len(handles[0].write.call_args_list)
assert write_count == 1, write_count
# ... with the updated content
write_content = handles[0].write.call_args_list[0][0][0]
assert write_content == file_modified, write_content
@with_tempfile()
def test_line_insert_ensure_beforeafter_twolines_exists(self, name):
'''
Test for file.line for insertion ensuring the line is between two lines where content already exists
:return:
Test for file.line for insertion ensuring the line is between two lines
where content already exists
'''
cfg_content = 'EXTRA_GROUPS="dialout"'
# pylint: disable=W1401
@ -1318,24 +1407,28 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
])
# pylint: enable=W1401
after, before = file_content.split(os.linesep)[0], file_content.split(os.linesep)[2]
isfile_mock = MagicMock(side_effect=lambda x: True if x == name else DEFAULT)
for (_after, _before) in [(after, before), ('NAME_.*', 'SKEL_.*')]:
files_fopen = mock_open(read_data=file_content)
with patch('salt.utils.files.fopen', files_fopen):
atomic_opener = mock_open()
with patch('salt.utils.atomicfile.atomic_open', atomic_opener):
result = filemod.line('foo', content=cfg_content, after=_after, before=_before, mode='ensure')
self.assertEqual(len(atomic_opener().write.call_args_list), 0)
self.assertEqual(result, False)
with patch('os.path.isfile', isfile_mock), \
patch('os.stat', MagicMock(return_value=DummyStat())), \
patch('salt.utils.files.fopen',
mock_open(read_data=file_content)), \
patch('salt.utils.atomicfile.atomic_open',
mock_open()) as atomic_open_mock:
result = filemod.line('foo', content=cfg_content, after=_after, before=_before, mode='ensure')
# We should not have opened the file
assert not atomic_open_mock.filehandles
# No changes should have been made
assert result is False
@patch('os.path.realpath', MagicMock())
@patch('os.path.isfile', MagicMock(return_value=True))
@patch('os.stat', MagicMock())
def test_line_insert_ensure_beforeafter_rangelines(self):
'''
Test for file.line for insertion ensuring the line is between two lines within the range.
This expected to bring no changes.
:return:
Test for file.line for insertion ensuring the line is between two lines
within the range. This expected to bring no changes.
'''
cfg_content = 'EXTRA_GROUPS="dialout cdrom floppy audio video plugdev users"'
# pylint: disable=W1401
@ -1354,10 +1447,8 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
'Found more than one line between boundaries "before" and "after"',
six.text_type(cmd_err))
@patch('os.path.realpath', MagicMock())
@patch('os.path.isfile', MagicMock(return_value=True))
@patch('os.stat', MagicMock())
def test_line_delete(self):
@with_tempfile()
def test_line_delete(self, name):
'''
Test for file.line for deletion of specific line
:return:
@ -1375,20 +1466,28 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
' - /srv/salt',
' - /srv/sugar'
])
isfile_mock = MagicMock(side_effect=lambda x: True if x == name else DEFAULT)
for content in ['/srv/pepper', '/srv/pepp*', '/srv/p.*', '/sr.*pe.*']:
files_fopen = mock_open(read_data=file_content)
with patch('salt.utils.files.fopen', files_fopen):
atomic_opener = mock_open()
with patch('salt.utils.atomicfile.atomic_open', atomic_opener):
filemod.line('foo', content=content, mode='delete')
self.assertEqual(len(atomic_opener().write.call_args_list), 1)
self.assertEqual(atomic_opener().write.call_args_list[0][0][0],
file_modified)
with patch('os.path.isfile', isfile_mock), \
patch('os.stat', MagicMock(return_value=DummyStat())), \
patch('salt.utils.files.fopen', files_fopen), \
patch('salt.utils.atomicfile.atomic_open', mock_open()) as atomic_open_mock:
filemod.line(name, content=content, mode='delete')
handles = atomic_open_mock.filehandles[name]
# We should only have opened the file once
open_count = len(handles)
assert open_count == 1, open_count
# We should only have invoked .write() once...
write_count = len(handles[0].write.call_args_list)
assert write_count == 1, write_count
# ... with the updated content
write_content = handles[0].write.call_args_list[0][0][0]
assert write_content == file_modified, write_content
@patch('os.path.realpath', MagicMock())
@patch('os.path.isfile', MagicMock(return_value=True))
@patch('os.stat', MagicMock())
def test_line_replace(self):
@with_tempfile()
def test_line_replace(self, name):
'''
Test for file.line for replacement of specific line
:return:
@ -1407,15 +1506,25 @@ class FilemodLineTests(TestCase, LoaderModuleMockMixin):
' - /srv/natrium-chloride',
' - /srv/sugar'
])
isfile_mock = MagicMock(side_effect=lambda x: True if x == name else DEFAULT)
for match in ['/srv/pepper', '/srv/pepp*', '/srv/p.*', '/sr.*pe.*']:
files_fopen = mock_open(read_data=file_content)
with patch('salt.utils.files.fopen', files_fopen):
atomic_opener = mock_open()
with patch('salt.utils.atomicfile.atomic_open', atomic_opener):
filemod.line('foo', content='- /srv/natrium-chloride', match=match, mode='replace')
self.assertEqual(len(atomic_opener().write.call_args_list), 1)
self.assertEqual(atomic_opener().write.call_args_list[0][0][0],
file_modified)
with patch('os.path.isfile', isfile_mock), \
patch('os.stat', MagicMock(return_value=DummyStat())), \
patch('salt.utils.files.fopen', files_fopen), \
patch('salt.utils.atomicfile.atomic_open', mock_open()) as atomic_open_mock:
filemod.line(name, content='- /srv/natrium-chloride', match=match, mode='replace')
handles = atomic_open_mock.filehandles[name]
# We should only have opened the file once
open_count = len(handles)
assert open_count == 1, open_count
# We should only have invoked .write() once...
write_count = len(handles[0].write.call_args_list)
assert write_count == 1, write_count
# ... with the updated content
write_content = handles[0].write.call_args_list[0][0][0]
assert write_content == file_modified, write_content
class FileBasicsTestCase(TestCase, LoaderModuleMockMixin):

View file

@ -5,6 +5,7 @@
# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
import errno
# Import Salt Testing Libs
from tests.support.mixins import LoaderModuleMockMixin
@ -43,15 +44,13 @@ class GrublegacyTestCase(TestCase, LoaderModuleMockMixin):
'''
Test for Parse GRUB conf file
'''
mock = MagicMock(side_effect=IOError('foo'))
with patch('salt.utils.files.fopen', mock):
with patch.object(grub_legacy, '_detect_conf', return_value='A'):
self.assertRaises(CommandExecutionError, grub_legacy.conf)
file_data = IOError(errno.EACCES, 'Permission denied')
with patch('salt.utils.files.fopen', mock_open(read_data=file_data)), \
patch.object(grub_legacy, '_detect_conf', return_value='A'):
self.assertRaises(CommandExecutionError, grub_legacy.conf)
file_data = salt.utils.stringutils.to_str('\n'.join(['#', 'A B C D,E,F G H']))
with patch('salt.utils.files.fopen',
mock_open(read_data=file_data), create=True) as f_mock:
f_mock.return_value.__iter__.return_value = file_data.splitlines()
with patch.object(grub_legacy, '_detect_conf', return_value='A'):
self.assertEqual(grub_legacy.conf(),
{'A': 'B C D,E,F G H', 'stanzas': []})
with patch('salt.utils.files.fopen', mock_open(read_data=file_data)), \
patch.object(grub_legacy, '_detect_conf', return_value='A'):
conf = grub_legacy.conf()
assert conf == {'A': 'B C D,E,F G H', 'stanzas': []}, conf

View file

@ -1473,29 +1473,26 @@ class Test_Junos_Module(TestCase, LoaderModuleMockMixin, XMLEqualityMixin):
with patch('jnpr.junos.device.Device.execute') as mock_execute:
mock_execute.return_value = etree.XML(
'<rpc-reply>text rpc reply</rpc-reply>')
m = mock_open()
with patch('salt.utils.files.fopen', m, create=True):
with patch('salt.utils.files.fopen', mock_open(), create=True) as m_open:
junos.rpc('get-chassis-inventory', '/path/to/file', format='text')
handle = m()
handle.write.assert_called_with('text rpc reply')
writes = m_open.write_calls()
assert writes == ['text rpc reply'], writes
def test_rpc_write_file_format_json(self):
with patch('jnpr.junos.device.Device.execute') as mock_execute, \
patch('salt.utils.json.dumps') as mock_dumps:
mock_dumps.return_value = 'json rpc reply'
m = mock_open()
with patch('salt.utils.files.fopen', m, create=True):
with patch('salt.utils.files.fopen', mock_open(), create=True) as m_open:
junos.rpc('get-chassis-inventory', '/path/to/file', format='json')
handle = m()
handle.write.assert_called_with('json rpc reply')
writes = m_open.write_calls()
assert writes == ['json rpc reply'], writes
def test_rpc_write_file(self):
with patch('salt.modules.junos.jxmlease.parse') as mock_parse, \
patch('salt.modules.junos.etree.tostring') as mock_tostring, \
patch('jnpr.junos.device.Device.execute') as mock_execute:
mock_tostring.return_value = 'xml rpc reply'
m = mock_open()
with patch('salt.utils.files.fopen', m, create=True):
with patch('salt.utils.files.fopen', mock_open(), create=True) as m_open:
junos.rpc('get-chassis-inventory', '/path/to/file')
handle = m()
handle.write.assert_called_with('xml rpc reply')
writes = m_open.write_calls()
assert writes == ['xml rpc reply'], writes

View file

@ -101,6 +101,7 @@ class LinuxSysctlTestCase(TestCase, LoaderModuleMockMixin):
'''
Tests successful add of config file when previously not one
'''
config = '/etc/sysctl.conf'
with patch('os.path.isfile', MagicMock(return_value=False)), \
patch('os.path.exists', MagicMock(return_value=True)):
asn_cmd = {'pid': 1337, 'retcode': 0, 'stderr': '',
@ -110,18 +111,20 @@ class LinuxSysctlTestCase(TestCase, LoaderModuleMockMixin):
sys_cmd = 'systemd 208\n+PAM +LIBWRAP'
mock_sys_cmd = MagicMock(return_value=sys_cmd)
with patch('salt.utils.files.fopen', mock_open()) as m_open:
with patch.dict(linux_sysctl.__context__, {'salt.utils.systemd.version': 232}):
with patch.dict(linux_sysctl.__salt__,
{'cmd.run_stdout': mock_sys_cmd,
'cmd.run_all': mock_asn_cmd}):
with patch.dict(systemd.__context__,
{'salt.utils.systemd.booted': True,
'salt.utils.systemd.version': 232}):
linux_sysctl.persist('net.ipv4.ip_forward', 1)
helper_open = m_open()
helper_open.write.assert_called_once_with(
'#\n# Kernel sysctl configuration\n#\n')
with patch('salt.utils.files.fopen', mock_open()) as m_open, \
patch.dict(linux_sysctl.__context__,
{'salt.utils.systemd.version': 232}), \
patch.dict(linux_sysctl.__salt__,
{'cmd.run_stdout': mock_sys_cmd,
'cmd.run_all': mock_asn_cmd}), \
patch.dict(systemd.__context__,
{'salt.utils.systemd.booted': True,
'salt.utils.systemd.version': 232}):
linux_sysctl.persist('net.ipv4.ip_forward', 1, config=config)
writes = m_open.write_calls()
assert writes == [
'#\n# Kernel sysctl configuration\n#\n'
], writes
def test_persist_read_conf_success(self):
'''

View file

@ -17,9 +17,9 @@ from tests.support.mock import (
MagicMock,
mock_open,
patch,
call,
NO_MOCK,
NO_MOCK_REASON
NO_MOCK_REASON,
DEFAULT
)
@ -67,9 +67,9 @@ class DarwinSysctlTestCase(TestCase, LoaderModuleMockMixin):
'''
Tests adding of config file failure
'''
with patch('salt.utils.files.fopen', mock_open()) as m_open, \
read_data = IOError(13, 'Permission denied', '/file')
with patch('salt.utils.files.fopen', mock_open(read_data=read_data)), \
patch('os.path.isfile', MagicMock(return_value=False)):
m_open.side_effect = IOError(13, 'Permission denied', '/file')
self.assertRaises(CommandExecutionError,
mac_sysctl.persist,
'net.inet.icmp.icmplim',
@ -77,29 +77,45 @@ class DarwinSysctlTestCase(TestCase, LoaderModuleMockMixin):
def test_persist_no_conf_success(self):
'''
Tests successful add of config file when previously not one
Tests successful add of config file when it did not already exist
'''
config = '/etc/sysctl.conf'
isfile_mock = MagicMock(
side_effect=lambda x: False if x == config else DEFAULT
)
with patch('salt.utils.files.fopen', mock_open()) as m_open, \
patch('os.path.isfile', MagicMock(return_value=False)):
mac_sysctl.persist('net.inet.icmp.icmplim', 50)
helper_open = m_open()
helper_open.write.assert_called_once_with(
'#\n# Kernel sysctl configuration\n#\n')
patch('os.path.isfile', isfile_mock):
mac_sysctl.persist('net.inet.icmp.icmplim', 50, config=config)
# We only should have opened the one file
num_handles = len(m_open.filehandles)
assert num_handles == 1, num_handles
writes = m_open.write_calls()
# We should have called .write() only once, with the expected
# content
num_writes = len(writes)
assert num_writes == 1, num_writes
assert writes[0] == '#\n# Kernel sysctl configuration\n#\n', writes[0]
def test_persist_success(self):
'''
Tests successful write to existing sysctl file
'''
config = '/etc/sysctl.conf'
to_write = '#\n# Kernel sysctl configuration\n#\n'
m_calls_list = [call.writelines([
writelines_calls = [[
'#\n',
'# Kernel sysctl configuration\n',
'#\n',
'net.inet.icmp.icmplim=50\n',
])]
]]
isfile_mock = MagicMock(
side_effect=lambda x: True if x == config else DEFAULT
)
with patch('salt.utils.files.fopen', mock_open(read_data=to_write)) as m_open, \
patch('os.path.isfile', MagicMock(return_value=True)):
mac_sysctl.persist('net.inet.icmp.icmplim', 50, config=to_write)
helper_open = m_open()
calls_list = helper_open.method_calls
self.assertEqual(calls_list, m_calls_list)
patch('os.path.isfile', isfile_mock):
mac_sysctl.persist('net.inet.icmp.icmplim', 50, config=config)
# We only should have opened the one file
num_handles = len(m_open.filehandles)
assert num_handles == 1, num_handles
writes = m_open.writelines_calls()
assert writes == writelines_calls, writes

View file

@ -87,20 +87,19 @@ class MountTestCase(TestCase, LoaderModuleMockMixin):
with patch.object(os.path, 'isfile', mock):
self.assertEqual(mount.fstab(), {})
file_data = '\n'.join(['#', 'A B C D,E,F G H'])
mock = MagicMock(return_value=True)
with patch.dict(mount.__grains__, {'kernel': ''}):
with patch.object(os.path, 'isfile', mock):
file_data = '\n'.join(['#',
'A B C D,E,F G H'])
with patch('salt.utils.files.fopen',
mock_open(read_data=file_data),
create=True) as m:
m.return_value.__iter__.return_value = file_data.splitlines()
self.assertEqual(mount.fstab(), {'B': {'device': 'A',
'dump': 'G',
'fstype': 'C',
'opts': ['D', 'E', 'F'],
'pass': 'H'}})
with patch.dict(mount.__grains__, {'kernel': ''}), \
patch.object(os.path, 'isfile', mock), \
patch('salt.utils.files.fopen', mock_open(read_data=file_data)):
fstab = mount.fstab()
assert fstab == {
'B': {'device': 'A',
'dump': 'G',
'fstype': 'C',
'opts': ['D', 'E', 'F'],
'pass': 'H'}
}, fstab
def test_vfstab(self):
'''
@ -110,21 +109,23 @@ class MountTestCase(TestCase, LoaderModuleMockMixin):
with patch.object(os.path, 'isfile', mock):
self.assertEqual(mount.vfstab(), {})
file_data = textwrap.dedent('''\
#
swap - /tmp tmpfs - yes size=2048m
''')
mock = MagicMock(return_value=True)
with patch.dict(mount.__grains__, {'kernel': 'SunOS'}):
with patch.object(os.path, 'isfile', mock):
file_data = '\n'.join(['#',
'swap - /tmp tmpfs - yes size=2048m'])
with patch('salt.utils.files.fopen',
mock_open(read_data=file_data),
create=True) as m:
m.return_value.__iter__.return_value = file_data.splitlines()
self.assertEqual(mount.fstab(), {'/tmp': {'device': 'swap',
'device_fsck': '-',
'fstype': 'tmpfs',
'mount_at_boot': 'yes',
'opts': ['size=2048m'],
'pass_fsck': '-'}})
with patch.dict(mount.__grains__, {'kernel': 'SunOS'}), \
patch.object(os.path, 'isfile', mock), \
patch('salt.utils.files.fopen', mock_open(read_data=file_data)):
vfstab = mount.vfstab()
assert vfstab == {
'/tmp': {'device': 'swap',
'device_fsck': '-',
'fstype': 'tmpfs',
'mount_at_boot': 'yes',
'opts': ['size=2048m'],
'pass_fsck': '-'}
}, vfstab
def test_rm_fstab(self):
'''
@ -274,30 +275,36 @@ class MountTestCase(TestCase, LoaderModuleMockMixin):
Return a dict containing information on active swap
'''
file_data = '\n'.join(['Filename Type Size Used Priority',
'/dev/sda1 partition 31249404 4100 -1'])
file_data = textwrap.dedent('''\
Filename Type Size Used Priority
/dev/sda1 partition 31249404 4100 -1
''')
with patch.dict(mount.__grains__, {'os': '', 'kernel': ''}):
with patch('salt.utils.files.fopen',
mock_open(read_data=file_data),
create=True) as m:
m.return_value.__iter__.return_value = file_data.splitlines()
with patch('salt.utils.files.fopen', mock_open(read_data=file_data)):
swaps = mount.swaps()
assert swaps == {
'/dev/sda1': {
'priority': '-1',
'size': '31249404',
'type': 'partition',
'used': '4100'}
}, swaps
self.assertDictEqual(mount.swaps(), {'/dev/sda1':
{'priority': '-1',
'size': '31249404',
'type': 'partition',
'used': '4100'}})
file_data = '\n'.join(['Device Size Used Unknown Unknown Priority',
'/dev/sda1 31249404 4100 unknown unknown -1'])
file_data = textwrap.dedent('''\
Device Size Used Unknown Unknown Priority
/dev/sda1 31249404 4100 unknown unknown -1
''')
mock = MagicMock(return_value=file_data)
with patch.dict(mount.__grains__, {'os': 'OpenBSD', 'kernel': 'OpenBSD'}):
with patch.dict(mount.__salt__, {'cmd.run_stdout': mock}):
self.assertDictEqual(mount.swaps(), {'/dev/sda1':
{'priority': '-1',
'size': '31249404',
'type': 'partition',
'used': '4100'}})
with patch.dict(mount.__grains__, {'os': 'OpenBSD', 'kernel': 'OpenBSD'}), \
patch.dict(mount.__salt__, {'cmd.run_stdout': mock}):
swaps = mount.swaps()
assert swaps == {
'/dev/sda1': {
'priority': '-1',
'size': '31249404',
'type': 'partition',
'used': '4100'}
}, swaps
def test_swapon(self):
'''

View file

@ -230,16 +230,14 @@ class NetworkTestCase(TestCase, LoaderModuleMockMixin):
Test for Modify hostname
'''
self.assertFalse(network.mod_hostname(None))
file_d = '\n'.join(['#', 'A B C D,E,F G H'])
with patch.object(salt.utils.path, 'which', return_value='hostname'):
with patch.dict(network.__salt__,
{'cmd.run': MagicMock(return_value=None)}):
file_d = '\n'.join(['#', 'A B C D,E,F G H'])
with patch('salt.utils.files.fopen', mock_open(read_data=file_d),
create=True) as mfi:
mfi.return_value.__iter__.return_value = file_d.splitlines()
with patch.dict(network.__grains__, {'os_family': 'A'}):
self.assertTrue(network.mod_hostname('hostname'))
with patch.object(salt.utils.path, 'which', return_value='hostname'), \
patch.dict(network.__salt__,
{'cmd.run': MagicMock(return_value=None)}), \
patch.dict(network.__grains__, {'os_family': 'A'}), \
patch('salt.utils.files.fopen', mock_open(read_data=file_d)):
self.assertTrue(network.mod_hostname('hostname'))
def test_connect(self):
'''

View file

@ -10,6 +10,7 @@ from __future__ import absolute_import, print_function, unicode_literals
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.unit import TestCase, skipIf
from tests.support.mock import (
MagicMock,
mock_open,
patch,
NO_MOCK,
@ -32,21 +33,21 @@ class NfsTestCase(TestCase, LoaderModuleMockMixin):
'''
Test for List configured exports
'''
file_d = '\n'.join(['A B1(23'])
with patch('salt.utils.files.fopen',
mock_open(read_data=file_d), create=True) as mfi:
mfi.return_value.__iter__.return_value = file_d.splitlines()
self.assertDictEqual(nfs3.list_exports(),
{'A': [{'hosts': 'B1', 'options': ['23']}]})
with patch('salt.utils.files.fopen', mock_open(read_data='A B1(23')):
exports = nfs3.list_exports()
assert exports == {'A': [{'hosts': 'B1', 'options': ['23']}]}, exports
def test_del_export(self):
'''
Test for Remove an export
'''
with patch.object(nfs3,
'list_exports',
return_value={'A':
[{'hosts':
['B1'], 'options': ['23']}]}):
with patch.object(nfs3, '_write_exports', return_value=None):
self.assertDictEqual(nfs3.del_export(path='A'), {})
list_exports_mock = MagicMock(return_value={
'A': [
{'hosts': ['B1'],
'options': ['23']},
],
})
with patch.object(nfs3, 'list_exports', list_exports_mock), \
patch.object(nfs3, '_write_exports', MagicMock(return_value=None)):
result = nfs3.del_export(path='A')
assert result == {}, result

View file

@ -140,8 +140,9 @@ class PuppetTestCase(TestCase, LoaderModuleMockMixin):
mock_open(read_data="resources: 1")):
self.assertDictEqual(puppet.summary(), {'resources': 1})
with patch('salt.utils.files.fopen', mock_open()) as m_open:
m_open.side_effect = IOError(13, 'Permission denied:', '/file')
permission_error = IOError(os.errno.EACCES, 'Permission denied:', '/file')
with patch('salt.utils.files.fopen',
mock_open(read_data=permission_error)) as m_open:
self.assertRaises(CommandExecutionError, puppet.summary)
def test_plugin_sync(self):

View file

@ -358,13 +358,14 @@ class SnapperTestCase(TestCase, LoaderModuleMockMixin):
patch('os.path.isfile', MagicMock(side_effect=[True, True, False, True])), \
patch('os.path.isdir', MagicMock(return_value=False)), \
patch('salt.modules.snapper.snapper.ListConfigs', MagicMock(return_value=DBUS_RET['ListConfigs'])):
fopen_effect = [
mock_open(read_data=FILE_CONTENT["/tmp/foo"]['pre']).return_value,
mock_open(read_data=FILE_CONTENT["/tmp/foo"]['post']).return_value,
mock_open(read_data=FILE_CONTENT["/tmp/foo2"]['post']).return_value,
]
with patch('salt.utils.files.fopen') as fopen_mock:
fopen_mock.side_effect = fopen_effect
contents = {
'*/tmp/foo': [
FILE_CONTENT['/tmp/foo']['pre'],
FILE_CONTENT['/tmp/foo']['post'],
],
'*/tmp/foo2': FILE_CONTENT['/tmp/foo2']['post'],
}
with patch('salt.utils.files.fopen', mock_open(read_data=contents)):
module_ret = {
"/tmp/foo": MODULE_RET['DIFF']["/tmp/foo"],
"/tmp/foo2": MODULE_RET['DIFF']["/tmp/foo2"],
@ -387,12 +388,7 @@ class SnapperTestCase(TestCase, LoaderModuleMockMixin):
"f18f971f1517449208a66589085ddd3723f7f6cefb56c141e3d97ae49e1d87fa",
])
}):
fopen_effect = [
mock_open(read_data="dummy binary").return_value,
mock_open(read_data="dummy binary").return_value,
]
with patch('salt.utils.files.fopen') as fopen_mock:
fopen_mock.side_effect = fopen_effect
with patch('salt.utils.files.fopen', mock_open(read_data='dummy binary')):
module_ret = {
"/tmp/foo3": MODULE_RET['DIFF']["/tmp/foo3"],
}

View file

@ -203,13 +203,11 @@ class TimezoneModuleTestCase(TestCase, LoaderModuleMockMixin):
:return:
'''
with patch.dict(timezone.__grains__, {'os_family': ['Gentoo']}):
_fopen = mock_open()
with patch('salt.utils.files.fopen', _fopen):
with patch('salt.utils.files.fopen', mock_open()) as m_open:
assert timezone.set_zone(self.TEST_TZ)
name, args, kwargs = _fopen.mock_calls[0]
assert args == ('/etc/timezone', 'w')
name, args, kwargs = _fopen.return_value.__enter__.return_value.write.mock_calls[0]
assert args == ('UTC',)
fh_ = m_open.filehandles['/etc/timezone'][0]
assert fh_.call.args == ('/etc/timezone', 'w'), fh_.call.args
assert fh_.write_calls == ['UTC', '\n'], fh_.write_calls
@skipIf(salt.utils.platform.is_windows(), 'os.symlink not available in Windows')
@patch('salt.utils.path.which', MagicMock(return_value=False))
@ -222,13 +220,11 @@ class TimezoneModuleTestCase(TestCase, LoaderModuleMockMixin):
:return:
'''
with patch.dict(timezone.__grains__, {'os_family': ['Debian']}):
_fopen = mock_open()
with patch('salt.utils.files.fopen', _fopen):
with patch('salt.utils.files.fopen', mock_open()) as m_open:
assert timezone.set_zone(self.TEST_TZ)
name, args, kwargs = _fopen.mock_calls[0]
assert args == ('/etc/timezone', 'w')
name, args, kwargs = _fopen.return_value.__enter__.return_value.write.mock_calls[0]
assert args == ('UTC',)
fh_ = m_open.filehandles['/etc/timezone'][0]
assert fh_.call.args == ('/etc/timezone', 'w'), fh_.call.args
assert fh_.write_calls == ['UTC', '\n'], fh_.write_calls
@skipIf(salt.utils.platform.is_windows(), 'os.symlink not available in Windows')
@patch('salt.utils.path.which', MagicMock(return_value=True))

View file

@ -8,7 +8,14 @@ import shutil
# salt testing libs
from tests.support.unit import TestCase, skipIf
from tests.support.mock import patch, call, mock_open, NO_MOCK, NO_MOCK_REASON, MagicMock
from tests.support.mock import(
patch,
mock_open,
NO_MOCK,
NO_MOCK_REASON,
MagicMock,
MockCall,
)
# salt libs
from salt.ext import six
@ -96,19 +103,23 @@ SIG = (
@skipIf(HAS_M2, 'm2crypto is used by salt.crypt if installed')
class CryptTestCase(TestCase):
def test_gen_keys(self):
open_priv_wb = MockCall('/keydir{0}keyname.pem'.format(os.sep), 'wb+')
open_pub_wb = MockCall('/keydir{0}keyname.pub'.format(os.sep), 'wb+')
with patch.multiple(os, umask=MagicMock(), chmod=MagicMock(),
access=MagicMock(return_value=True)):
with patch('salt.utils.files.fopen', mock_open()):
open_priv_wb = call('/keydir{0}keyname.pem'.format(os.sep), 'wb+')
open_pub_wb = call('/keydir{0}keyname.pub'.format(os.sep), 'wb+')
with patch('os.path.isfile', return_value=True):
self.assertEqual(crypt.gen_keys('/keydir', 'keyname', 2048), '/keydir{0}keyname.pem'.format(os.sep))
self.assertNotIn(open_priv_wb, salt.utils.files.fopen.mock_calls)
self.assertNotIn(open_pub_wb, salt.utils.files.fopen.mock_calls)
with patch('os.path.isfile', return_value=False):
with patch('salt.utils.files.fopen', mock_open()):
crypt.gen_keys('/keydir', 'keyname', 2048)
salt.utils.files.fopen.assert_has_calls([open_priv_wb, open_pub_wb], any_order=True)
with patch('salt.utils.files.fopen', mock_open()) as m_open, \
patch('os.path.isfile', return_value=True):
result = crypt.gen_keys('/keydir', 'keyname', 2048)
assert result == '/keydir{0}keyname.pem'.format(os.sep), result
assert open_priv_wb not in m_open.calls
assert open_pub_wb not in m_open.calls
with patch('salt.utils.files.fopen', mock_open()) as m_open, \
patch('os.path.isfile', return_value=False):
crypt.gen_keys('/keydir', 'keyname', 2048)
assert open_priv_wb in m_open.calls
assert open_pub_wb in m_open.calls
@patch('os.umask', MagicMock())
@patch('os.chmod', MagicMock())
@ -116,17 +127,23 @@ class CryptTestCase(TestCase):
@patch('os.access', MagicMock(return_value=True))
def test_gen_keys_with_passphrase(self):
key_path = os.path.join(os.sep, 'keydir')
with patch('salt.utils.files.fopen', mock_open()):
open_priv_wb = call(os.path.join(key_path, 'keyname.pem'), 'wb+')
open_pub_wb = call(os.path.join(key_path, 'keyname.pub'), 'wb+')
with patch('os.path.isfile', return_value=True):
self.assertEqual(crypt.gen_keys(key_path, 'keyname', 2048, passphrase='password'), os.path.join(key_path, 'keyname.pem'))
self.assertNotIn(open_priv_wb, salt.utils.files.fopen.mock_calls)
self.assertNotIn(open_pub_wb, salt.utils.files.fopen.mock_calls)
with patch('os.path.isfile', return_value=False):
with patch('salt.utils.files.fopen', mock_open()):
crypt.gen_keys(key_path, 'keyname', 2048)
salt.utils.files.fopen.assert_has_calls([open_priv_wb, open_pub_wb], any_order=True)
open_priv_wb = MockCall(os.path.join(key_path, 'keyname.pem'), 'wb+')
open_pub_wb = MockCall(os.path.join(key_path, 'keyname.pub'), 'wb+')
with patch('salt.utils.files.fopen', mock_open()) as m_open, \
patch('os.path.isfile', return_value=True):
self.assertEqual(crypt.gen_keys(key_path, 'keyname', 2048, passphrase='password'), os.path.join(key_path, 'keyname.pem'))
result = crypt.gen_keys(key_path, 'keyname', 2048,
passphrase='password')
assert result == os.path.join(key_path, 'keyname.pem'), result
assert open_priv_wb not in m_open.calls
assert open_pub_wb not in m_open.calls
with patch('salt.utils.files.fopen', mock_open()) as m_open, \
patch('os.path.isfile', return_value=False):
crypt.gen_keys(key_path, 'keyname', 2048)
assert open_priv_wb in m_open.calls
assert open_pub_wb in m_open.calls
def test_sign_message(self):
key = RSA.importKey(PRIVKEY_DATA)

525
tests/unit/test_mock.py Normal file
View file

@ -0,0 +1,525 @@
# -*- coding: utf-8 -*-
'''
Tests for our mock_open helper
'''
# Import Python Libs
from __future__ import absolute_import, unicode_literals, print_function
import errno
import logging
# Import Salt libs
import salt.utils.data
import salt.utils.files
# Import Salt Testing Libs
from tests.support.mock import patch, mock_open, NO_MOCK, NO_MOCK_REASON
from tests.support.unit import TestCase, skipIf
QUESTIONS = '''\
What is your name?
What is your quest?
What is the airspeed velocity of an unladen swallow?
'''
ANSWERS = '''\
It is Arthur, King of the Britons.
To seek the Holy Grail.
What do you mean? An African or European swallow?
'''
log = logging.getLogger(__name__)
@skipIf(NO_MOCK, NO_MOCK_REASON)
class MockOpenTestCase(TestCase):
'''
Tests for our mock_open helper to ensure that it behaves as closely as
possible to a real filehandle.
'''
@classmethod
def setUpClass(cls):
cls.contents = {'foo.txt': QUESTIONS, 'b*.txt': ANSWERS}
cls.read_data_as_list = [
'foo', 'bar', 'спам',
IOError(errno.EACCES, 'Permission denied')
]
cls.normalized_read_data_as_list = salt.utils.data.decode(
cls.read_data_as_list,
to_str=True
)
cls.read_data_as_list_bytes = salt.utils.data.encode(cls.read_data_as_list)
def tearDown(self):
'''
Each test should read the entire contents of the mocked filehandle(s).
This confirms that the other read functions return empty strings/lists,
to simulate being at EOF.
'''
for handle_name in ('fh', 'fh2', 'fh3'):
try:
fh = getattr(self, handle_name)
except AttributeError:
continue
log.debug('Running tearDown tests for self.%s', handle_name)
result = fh.read(5)
assert not result, result
result = fh.read()
assert not result, result
result = fh.readline()
assert not result, result
result = fh.readlines()
assert not result, result
# Last but not least, try to read using a for loop. This should not
# read anything as we should hit EOF immediately, before the generator
# in the mocked filehandle has a chance to yield anything. So the
# exception will only be raised if we aren't at EOF already.
for line in fh:
raise Exception(
'Instead of EOF, read the following from {0}: {1}'.format(
handle_name,
line
)
)
del fh
def test_read(self):
'''
Test reading the entire file
'''
with patch('salt.utils.files.fopen', mock_open(read_data=QUESTIONS)):
with salt.utils.files.fopen('foo.txt') as self.fh:
result = self.fh.read()
assert result == QUESTIONS, result
def test_read_multifile(self):
'''
Same as test_read, but using multifile support
'''
with patch('salt.utils.files.fopen', mock_open(read_data=self.contents)):
with salt.utils.files.fopen('foo.txt') as self.fh:
result = self.fh.read()
assert result == QUESTIONS, result
with salt.utils.files.fopen('bar.txt') as self.fh2:
result = self.fh2.read()
assert result == ANSWERS, result
with salt.utils.files.fopen('baz.txt') as self.fh3:
result = self.fh3.read()
assert result == ANSWERS, result
try:
with salt.utils.files.fopen('helloworld.txt'):
raise Exception('No globs should have matched')
except IOError:
# An IOError is expected here
pass
def test_read_explicit_size(self):
'''
Test reading with explicit sizes
'''
with patch('salt.utils.files.fopen', mock_open(read_data=QUESTIONS)):
with salt.utils.files.fopen('foo.txt') as self.fh:
# Read 10 bytes
result = self.fh.read(10)
assert result == QUESTIONS[:10], result
# Read another 10 bytes
result = self.fh.read(10)
assert result == QUESTIONS[10:20], result
# Read the rest
result = self.fh.read()
assert result == QUESTIONS[20:], result
def test_read_explicit_size_multifile(self):
'''
Same as test_read_explicit_size, but using multifile support
'''
with patch('salt.utils.files.fopen', mock_open(read_data=self.contents)):
with salt.utils.files.fopen('foo.txt') as self.fh:
# Read 10 bytes
result = self.fh.read(10)
assert result == QUESTIONS[:10], result
# Read another 10 bytes
result = self.fh.read(10)
assert result == QUESTIONS[10:20], result
# Read the rest
result = self.fh.read()
assert result == QUESTIONS[20:], result
with salt.utils.files.fopen('bar.txt') as self.fh2:
# Read 10 bytes
result = self.fh2.read(10)
assert result == ANSWERS[:10], result
# Read another 10 bytes
result = self.fh2.read(10)
assert result == ANSWERS[10:20], result
# Read the rest
result = self.fh2.read()
assert result == ANSWERS[20:], result
with salt.utils.files.fopen('baz.txt') as self.fh3:
# Read 10 bytes
result = self.fh3.read(10)
assert result == ANSWERS[:10], result
# Read another 10 bytes
result = self.fh3.read(10)
assert result == ANSWERS[10:20], result
# Read the rest
result = self.fh3.read()
assert result == ANSWERS[20:], result
try:
with salt.utils.files.fopen('helloworld.txt'):
raise Exception('No globs should have matched')
except IOError:
# An IOError is expected here
pass
def test_read_explicit_size_larger_than_file_size(self):
'''
Test reading with an explicit size larger than the size of read_data.
This ensures that we just return the contents up until EOF and that we
don't raise any errors due to the desired size being larger than the
mocked file's size.
'''
with patch('salt.utils.files.fopen', mock_open(read_data=QUESTIONS)):
with salt.utils.files.fopen('foo.txt') as self.fh:
result = self.fh.read(999999)
assert result == QUESTIONS, result
def test_read_explicit_size_larger_than_file_size_multifile(self):
'''
Same as test_read_explicit_size_larger_than_file_size, but using
multifile support
'''
with patch('salt.utils.files.fopen', mock_open(read_data=self.contents)):
with salt.utils.files.fopen('foo.txt') as self.fh:
result = self.fh.read(999999)
assert result == QUESTIONS, result
with salt.utils.files.fopen('bar.txt') as self.fh2:
result = self.fh2.read(999999)
assert result == ANSWERS, result
with salt.utils.files.fopen('baz.txt') as self.fh3:
result = self.fh3.read(999999)
assert result == ANSWERS, result
try:
with salt.utils.files.fopen('helloworld.txt'):
raise Exception('No globs should have matched')
except IOError:
# An IOError is expected here
pass
def test_read_for_loop(self):
'''
Test reading the contents of the file line by line in a for loop
'''
with patch('salt.utils.files.fopen', mock_open(read_data=QUESTIONS)):
lines = QUESTIONS.splitlines(True)
with salt.utils.files.fopen('foo.txt') as self.fh:
index = 0
for line in self.fh:
assert line == lines[index], 'Line {0}: {1}'.format(index, line)
index += 1
def test_read_for_loop_multifile(self):
'''
Same as test_read_for_loop, but using multifile support
'''
with patch('salt.utils.files.fopen', mock_open(read_data=self.contents)):
lines = QUESTIONS.splitlines(True)
with salt.utils.files.fopen('foo.txt') as self.fh:
index = 0
for line in self.fh:
assert line == lines[index], 'Line {0}: {1}'.format(index, line)
index += 1
lines = ANSWERS.splitlines(True)
with salt.utils.files.fopen('bar.txt') as self.fh2:
index = 0
for line in self.fh2:
assert line == lines[index], 'Line {0}: {1}'.format(index, line)
index += 1
with salt.utils.files.fopen('baz.txt') as self.fh3:
index = 0
for line in self.fh3:
assert line == lines[index], 'Line {0}: {1}'.format(index, line)
index += 1
try:
with salt.utils.files.fopen('helloworld.txt'):
raise Exception('No globs should have matched')
except IOError:
# An IOError is expected here
pass
def test_read_readline(self):
'''
Test reading part of a line using .read(), then reading the rest of the
line (and subsequent lines) using .readline().
'''
with patch('salt.utils.files.fopen', mock_open(read_data=QUESTIONS)):
with salt.utils.files.fopen('foo.txt') as self.fh:
# Read the first 4 chars of line 1
result = self.fh.read(4)
assert result == 'What', result
# Use .readline() to read the remainder of the line
result = self.fh.readline()
assert result == ' is your name?\n', result
# Read and check the other two lines
result = self.fh.readline()
assert result == 'What is your quest?\n', result
result = self.fh.readline()
assert result == 'What is the airspeed velocity of an unladen swallow?\n', result
def test_read_readline_multifile(self):
'''
Same as test_read_readline, but using multifile support
'''
with patch('salt.utils.files.fopen', mock_open(read_data=self.contents)):
with salt.utils.files.fopen('foo.txt') as self.fh:
# Read the first 4 chars of line 1
result = self.fh.read(4)
assert result == 'What', result
# Use .readline() to read the remainder of the line
result = self.fh.readline()
assert result == ' is your name?\n', result
# Read and check the other two lines
result = self.fh.readline()
assert result == 'What is your quest?\n', result
result = self.fh.readline()
assert result == 'What is the airspeed velocity of an unladen swallow?\n', result
with salt.utils.files.fopen('bar.txt') as self.fh2:
# Read the first 4 chars of line 1
result = self.fh2.read(14)
assert result == 'It is Arthur, ', result
# Use .readline() to read the remainder of the line
result = self.fh2.readline()
assert result == 'King of the Britons.\n', result
# Read and check the other two lines
result = self.fh2.readline()
assert result == 'To seek the Holy Grail.\n', result
result = self.fh2.readline()
assert result == 'What do you mean? An African or European swallow?\n', result
with salt.utils.files.fopen('baz.txt') as self.fh3:
# Read the first 4 chars of line 1
result = self.fh3.read(14)
assert result == 'It is Arthur, ', result
# Use .readline() to read the remainder of the line
result = self.fh3.readline()
assert result == 'King of the Britons.\n', result
# Read and check the other two lines
result = self.fh3.readline()
assert result == 'To seek the Holy Grail.\n', result
result = self.fh3.readline()
assert result == 'What do you mean? An African or European swallow?\n', result
try:
with salt.utils.files.fopen('helloworld.txt'):
raise Exception('No globs should have matched')
except IOError:
# An IOError is expected here
pass
def test_readline_readlines(self):
'''
Test reading the first line using .readline(), then reading the rest of
the file using .readlines().
'''
with patch('salt.utils.files.fopen', mock_open(read_data=QUESTIONS)):
with salt.utils.files.fopen('foo.txt') as self.fh:
# Read the first line
result = self.fh.readline()
assert result == 'What is your name?\n', result
# Use .readlines() to read the remainder of the file
result = self.fh.readlines()
assert result == [
'What is your quest?\n',
'What is the airspeed velocity of an unladen swallow?\n'
], result
def test_readline_readlines_multifile(self):
'''
Same as test_readline_readlines, but using multifile support
'''
with patch('salt.utils.files.fopen', mock_open(read_data=self.contents)):
with salt.utils.files.fopen('foo.txt') as self.fh:
# Read the first line
result = self.fh.readline()
assert result == 'What is your name?\n', result
# Use .readlines() to read the remainder of the file
result = self.fh.readlines()
assert result == [
'What is your quest?\n',
'What is the airspeed velocity of an unladen swallow?\n'
], result
with salt.utils.files.fopen('bar.txt') as self.fh2:
# Read the first line
result = self.fh2.readline()
assert result == 'It is Arthur, King of the Britons.\n', result
# Use .readlines() to read the remainder of the file
result = self.fh2.readlines()
assert result == [
'To seek the Holy Grail.\n',
'What do you mean? An African or European swallow?\n'
], result
with salt.utils.files.fopen('baz.txt') as self.fh3:
# Read the first line
result = self.fh3.readline()
assert result == 'It is Arthur, King of the Britons.\n', result
# Use .readlines() to read the remainder of the file
result = self.fh3.readlines()
assert result == [
'To seek the Holy Grail.\n',
'What do you mean? An African or European swallow?\n'
], result
try:
with salt.utils.files.fopen('helloworld.txt'):
raise Exception('No globs should have matched')
except IOError:
# An IOError is expected here
pass
def test_readlines(self):
'''
Test reading the entire file using .readlines
'''
with patch('salt.utils.files.fopen', mock_open(read_data=QUESTIONS)):
with salt.utils.files.fopen('foo.txt') as self.fh:
result = self.fh.readlines()
assert result == QUESTIONS.splitlines(True), result
def test_readlines_multifile(self):
'''
Same as test_readlines, but using multifile support
'''
with patch('salt.utils.files.fopen', mock_open(read_data=self.contents)):
with salt.utils.files.fopen('foo.txt') as self.fh:
result = self.fh.readlines()
assert result == QUESTIONS.splitlines(True), result
with salt.utils.files.fopen('bar.txt') as self.fh2:
result = self.fh2.readlines()
assert result == ANSWERS.splitlines(True), result
with salt.utils.files.fopen('baz.txt') as self.fh3:
result = self.fh3.readlines()
assert result == ANSWERS.splitlines(True), result
try:
with salt.utils.files.fopen('helloworld.txt'):
raise Exception('No globs should have matched')
except IOError:
# An IOError is expected here
pass
def test_read_data_converted_to_dict(self):
'''
Test that a non-dict value for read_data is converted to a dict mapping
'*' to that value.
'''
contents = 'спам'
normalized = salt.utils.stringutils.to_str(contents)
with patch('salt.utils.files.fopen',
mock_open(read_data=contents)) as m_open:
assert m_open.read_data == {'*': normalized}, m_open.read_data
with patch('salt.utils.files.fopen',
mock_open(read_data=self.read_data_as_list)) as m_open:
assert m_open.read_data == {
'*': self.normalized_read_data_as_list,
}, m_open.read_data
def test_read_data_list(self):
'''
Test read_data when it is a list
'''
with patch('salt.utils.files.fopen',
mock_open(read_data=self.read_data_as_list)):
for value in self.normalized_read_data_as_list:
try:
with salt.utils.files.fopen('foo.txt') as self.fh:
result = self.fh.read()
assert result == value, result
except IOError:
# Don't raise the exception if it was expected
if not isinstance(value, IOError):
raise
def test_read_data_list_bytes(self):
'''
Test read_data when it is a list and the value is a bytestring
'''
with patch('salt.utils.files.fopen',
mock_open(read_data=self.read_data_as_list_bytes)):
for value in self.read_data_as_list_bytes:
try:
with salt.utils.files.fopen('foo.txt') as self.fh:
result = self.fh.read()
assert result == value, result
except IOError:
# Don't raise the exception if it was expected
if not isinstance(value, IOError):
raise
def test_tell(self):
'''
Test the implementation of tell
'''
lines = QUESTIONS.splitlines(True)
with patch('salt.utils.files.fopen',
mock_open(read_data=self.contents)):
# Try with reading explicit sizes and then reading the rest of the
# file.
with salt.utils.files.fopen('foo.txt') as self.fh:
self.fh.read(5)
loc = self.fh.tell()
assert loc == 5, loc
self.fh.read(12)
loc = self.fh.tell()
assert loc == 17, loc
self.fh.read()
loc = self.fh.tell()
assert loc == len(QUESTIONS), loc
# Try reading way more content then actually exists in the file,
# tell() should return a value equal to the length of the content
with salt.utils.files.fopen('foo.txt') as self.fh:
self.fh.read(999999)
loc = self.fh.tell()
assert loc == len(QUESTIONS), loc
# Try reading a few bytes using .read(), then the rest of the line
# using .readline(), then the rest of the file using .readlines(),
# and check the location after each read.
with salt.utils.files.fopen('foo.txt') as self.fh:
# Read a few bytes
self.fh.read(5)
loc = self.fh.tell()
assert loc == 5, loc
# Read the rest of the line. Location should then be at the end
# of the first line.
self.fh.readline()
loc = self.fh.tell()
assert loc == len(lines[0]), loc
# Read the rest of the file using .readlines()
self.fh.readlines()
loc = self.fh.tell()
assert loc == len(QUESTIONS), loc
# Check location while iterating through the filehandle
with salt.utils.files.fopen('foo.txt') as self.fh:
index = 0
for _ in self.fh:
index += 1
loc = self.fh.tell()
assert loc == sum(len(x) for x in lines[:index]), loc

View file

@ -164,7 +164,7 @@ class NetworkTestCase(TestCase):
## ccc
127.0.0.1 localhost thisismyhostname # 本机
''')
fopen_mock = mock_open(read_data=content, match='/etc/hosts')
fopen_mock = mock_open(read_data={'/etc/hosts': content})
with patch('salt.utils.files.fopen', fopen_mock):
assert 'thisismyhostname' in network._generate_minion_id()