mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
mock_open: rewrite multi-file support
The initial approach tied all opened files to the same mocked filehandle. This new approach moves all the read functions into a class, and then uses a separate instance of that class for each opened file.
This commit is contained in:
parent
5e6b539770
commit
1861e9b944
1 changed files with 94 additions and 154 deletions
|
@ -94,39 +94,81 @@ if NO_MOCK is False:
|
|||
NO_MOCK_REASON = 'you need to upgrade your mock version to >= 0.8.0'
|
||||
|
||||
|
||||
# backport mock_open from the python 3 unittest.mock library so that we can
|
||||
# mock read, readline, readlines, and file iteration properly
|
||||
class MockFH(object):
|
||||
def __init__(self, filename, read_data):
|
||||
self.filename = filename
|
||||
self.empty_string = b'' if isinstance(read_data, six.binary_type) else ''
|
||||
self.read_data = self._iterate_read_data(read_data)
|
||||
self.write = Mock()
|
||||
self.writelines = Mock()
|
||||
|
||||
file_spec = None
|
||||
def _iterate_read_data(self, read_data):
|
||||
'''
|
||||
Helper for mock_open:
|
||||
Retrieve lines from read_data via a generator so that separate calls to
|
||||
readline, read, and readlines are properly interleaved
|
||||
'''
|
||||
# Newline will always be a bytestring on PY2 because mock_open will have
|
||||
# normalized it to one.
|
||||
newline = b'\n' if isinstance(read_data, six.binary_type) else '\n'
|
||||
|
||||
read_data = [line + newline for line in read_data.split(newline)]
|
||||
|
||||
if read_data[-1] == newline:
|
||||
# If the last line ended in a newline, the list comprehension will have an
|
||||
# extra entry that's just a newline. Remove this.
|
||||
read_data = read_data[:-1]
|
||||
else:
|
||||
# If there wasn't an extra newline by itself, then the file being
|
||||
# emulated doesn't have a newline to end the last line, so remove the
|
||||
# newline that we added in the list comprehension.
|
||||
read_data[-1] = read_data[-1][:-1]
|
||||
|
||||
for line in read_data:
|
||||
yield line
|
||||
|
||||
def read(self, size=0):
|
||||
if not isinstance(size, six.integer_types) or size < 0:
|
||||
raise TypeError('a positive integer is required')
|
||||
|
||||
joined = self.empty_string.join(self.read_data)
|
||||
if not size:
|
||||
# read() called with no args, return everything
|
||||
return joined
|
||||
else:
|
||||
# read() called with an explicit size. Return a slice matching the
|
||||
# requested size, but before doing so, reset read_data to reflect
|
||||
# what we read.
|
||||
self.read_data = self._iterate_read_data(joined[size:])
|
||||
return joined[:size]
|
||||
|
||||
def readlines(self, size=None): # pylint: disable=unused-argument
|
||||
# TODO: Implement "size" argument
|
||||
return list(self.read_data)
|
||||
|
||||
def readline(self, size=None): # pylint: disable=unused-argument
|
||||
# TODO: Implement "size" argument
|
||||
try:
|
||||
return next(self.read_data)
|
||||
except StopIteration:
|
||||
return self.empty_string
|
||||
|
||||
def __iter__(self):
|
||||
while True:
|
||||
try:
|
||||
yield next(self.read_data)
|
||||
except StopIteration:
|
||||
break
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb): # pylint: disable=unused-argument
|
||||
pass
|
||||
|
||||
|
||||
def _iterate_read_data(read_data):
|
||||
'''
|
||||
Helper for mock_open:
|
||||
Retrieve lines from read_data via a generator so that separate calls to
|
||||
readline, read, and readlines are properly interleaved
|
||||
'''
|
||||
# Newline will always be a bytestring on PY2 because mock_open will have
|
||||
# normalized it to one.
|
||||
newline = b'\n' if isinstance(read_data, six.binary_type) else '\n'
|
||||
|
||||
read_data = [line + newline for line in read_data.split(newline)]
|
||||
|
||||
if read_data[-1] == newline:
|
||||
# If the last line ended in a newline, the list comprehension will have an
|
||||
# extra entry that's just a newline. Remove this.
|
||||
read_data = read_data[:-1]
|
||||
else:
|
||||
# If there wasn't an extra newline by itself, then the file being
|
||||
# emulated doesn't have a newline to end the last line, so remove the
|
||||
# newline that we added in the list comprehension.
|
||||
read_data[-1] = read_data[-1][:-1]
|
||||
|
||||
for line in read_data:
|
||||
yield line
|
||||
|
||||
|
||||
def mock_open(mock=None, read_data=''):
|
||||
# reimplement mock_open to support multiple filehandles
|
||||
def mock_open(read_data=''):
|
||||
'''
|
||||
A helper function to create a mock to replace the use of `open`. It works
|
||||
for "open" called directly or used as a context manager.
|
||||
|
@ -138,7 +180,7 @@ def mock_open(mock=None, read_data=''):
|
|||
"read_data" is a string representing the contents of the file to be read.
|
||||
By default, this is an empty string.
|
||||
|
||||
Optionally, `read_data` can be a dictionary mapping fnmatch.fnmatch()
|
||||
Optionally, "read_data" can be a dictionary mapping fnmatch.fnmatch()
|
||||
patterns to strings. This allows the mocked filehandle to serve content for
|
||||
more than one file path.
|
||||
|
||||
|
@ -159,12 +201,13 @@ def mock_open(mock=None, read_data=''):
|
|||
with patch('salt.utils.files.fopen', mock_open(read_data=data):
|
||||
do stuff
|
||||
|
||||
If the file path being opened does not match the glob expression, an
|
||||
IOError will be raised to simulate the file not existing.
|
||||
If the file path being opened does not match any of the glob expressions,
|
||||
an IOError will be raised to simulate the file not existing.
|
||||
|
||||
Glob expressions will be attempted in iteration order, so if a file path
|
||||
matches more than one glob expression it will match whichever is iterated
|
||||
first.
|
||||
first. If a specifc iteration order is desired (and you are not running
|
||||
Python >= 3.6), consider passing "read_data" as an OrderedDict.
|
||||
|
||||
Passing "read_data" as a string is equivalent to passing it with a glob
|
||||
expression of "*".
|
||||
|
@ -175,131 +218,28 @@ def mock_open(mock=None, read_data=''):
|
|||
read_data = {'*': read_data}
|
||||
|
||||
if six.PY2:
|
||||
read_data = {x: salt.utils.stringutils.to_str(y)
|
||||
for x, y in six.iteritems(read_data)}
|
||||
|
||||
global file_spec
|
||||
if file_spec is None:
|
||||
if six.PY3:
|
||||
import _io
|
||||
file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO))))
|
||||
else:
|
||||
file_spec = file # pylint: disable=undefined-variable
|
||||
|
||||
if mock is None:
|
||||
mock = MagicMock(name='open', spec=open)
|
||||
|
||||
# We're using a dict here so that we can access both the mock object and
|
||||
# the generator from the closures below. This also allows us to replace the
|
||||
# genreator with an updated one after we've read from it, which allows
|
||||
# the mocked read funcs to more closely emulate an actual filehandle. The
|
||||
# .__class__() function is used to preserve the dict class, in case
|
||||
# read_data is an OrderedDict.
|
||||
data = {
|
||||
'filehandle': read_data.__class__(),
|
||||
'mock': mock,
|
||||
}
|
||||
|
||||
def _filename(data):
|
||||
return data['mock'].call_args[0][0]
|
||||
|
||||
def _match_glob(filename):
|
||||
for key in read_data:
|
||||
if key == '*':
|
||||
continue
|
||||
if fnmatch.fnmatch(filename, key):
|
||||
return key
|
||||
return '*'
|
||||
|
||||
def _match_fn(data):
|
||||
filename = _filename(data)
|
||||
try:
|
||||
return data['glob_map'][filename]
|
||||
except KeyError:
|
||||
data.setdefault('glob_map', {})[filename] = _match_glob(filename)
|
||||
return data['glob_map'][filename]
|
||||
|
||||
def _empty_string(data):
|
||||
filename = _filename(data)
|
||||
try:
|
||||
return data['empty_string'][filename]
|
||||
except KeyError:
|
||||
data.setdefault('empty_string', {})[filename] = (
|
||||
b'' if isinstance(read_data[_match_fn(data)], six.binary_type)
|
||||
else ''
|
||||
)
|
||||
return data['empty_string'][filename]
|
||||
|
||||
def _readlines_side_effect(*args, **kwargs):
|
||||
filename = _filename(data)
|
||||
ret = list(data['filehandle'][filename])
|
||||
# We've read everything in the file. Clear its contents so that further
|
||||
# reads behave as expected.
|
||||
data['filehandle'][filename] = _iterate_read_data('')
|
||||
return ret
|
||||
|
||||
def _read_side_effect(*args, **kwargs):
|
||||
filename = _filename(data)
|
||||
joined = _empty_string(data).join(data['filehandle'][filename])
|
||||
if not args:
|
||||
# read() called with no args, we want to return everything. If
|
||||
# anything was in the generator, clear it
|
||||
if joined:
|
||||
# If there were any contents, clear them
|
||||
data['filehandle'][filename] = _iterate_read_data('')
|
||||
return joined
|
||||
else:
|
||||
# read() called with an explicit size. Return a slice matching the
|
||||
# requested size, but before doing so, reset data to reflect what
|
||||
# we read.
|
||||
size = args[0]
|
||||
if not isinstance(size, six.integer_types):
|
||||
raise TypeError('an integer is required')
|
||||
data['filehandle'][filename] = _iterate_read_data(joined[size:])
|
||||
return joined[:size]
|
||||
|
||||
def _readline_side_effect():
|
||||
filename = _filename(data)
|
||||
try:
|
||||
return next(data['filehandle'][filename])
|
||||
except StopIteration:
|
||||
return _empty_string(data)
|
||||
|
||||
def _iter_side_effect():
|
||||
filename = _filename(data)
|
||||
while True:
|
||||
try:
|
||||
yield next(data['filehandle'][filename])
|
||||
except StopIteration:
|
||||
break
|
||||
# .__class__() used here to preserve the dict class in the event that
|
||||
# an OrderedDict was used.
|
||||
read_data = read_data.__class__(
|
||||
[(x, salt.utils.stringutils.to_str(y))
|
||||
for x, y in six.iteritems(read_data)]
|
||||
)
|
||||
|
||||
def _fopen_side_effect(name, *args, **kwargs):
|
||||
key = _match_glob(name)
|
||||
for pat in read_data:
|
||||
if pat == '*':
|
||||
continue
|
||||
if fnmatch.fnmatch(name, pat):
|
||||
matched_pattern = pat
|
||||
break
|
||||
else:
|
||||
# No non-glob match in read_data, fall back to '*'
|
||||
matched_pattern = '*'
|
||||
try:
|
||||
data['filehandle'][name] = _iterate_read_data(read_data[key])
|
||||
return DEFAULT
|
||||
return MockFH(name, read_data[matched_pattern])
|
||||
except KeyError:
|
||||
# No matching glob in read_data, treat this as a file that does
|
||||
# not exist and raise the appropriate exception.
|
||||
raise IOError(errno.ENOENT, 'No such file or directory', name)
|
||||
|
||||
handle = MagicMock(spec=file_spec)
|
||||
handle.__enter__.return_value = handle
|
||||
|
||||
handle.write.return_value = None
|
||||
handle.read.return_value = None
|
||||
handle.readline.return_value = None
|
||||
handle.readlines.return_value = None
|
||||
|
||||
# Support iteration via for loop
|
||||
handle.__iter__ = lambda x: _iter_side_effect()
|
||||
|
||||
# This is salt specific and not in the upstream mock
|
||||
handle.read.side_effect = _read_side_effect
|
||||
handle.readline.side_effect = _readline_side_effect
|
||||
handle.readlines.side_effect = _readlines_side_effect
|
||||
|
||||
mock.side_effect = _fopen_side_effect
|
||||
|
||||
mock.return_value = handle
|
||||
return mock
|
||||
return MagicMock(name='open', spec=open, side_effect=_fopen_side_effect)
|
||||
|
|
Loading…
Add table
Reference in a new issue