mock_open: rewrite multi-file support

The initial approach tied all opened files to the same mocked
filehandle. This new approach moves all the read functions into a class,
and then uses a separate instance of that class for each opened file.
This commit is contained in:
Erik Johnson 2018-06-16 18:34:21 -05:00
parent 5e6b539770
commit 1861e9b944
No known key found for this signature in database
GPG key ID: 5E5583C437808F3F

View file

@ -94,39 +94,81 @@ if NO_MOCK is False:
NO_MOCK_REASON = 'you need to upgrade your mock version to >= 0.8.0'
# backport mock_open from the python 3 unittest.mock library so that we can
# mock read, readline, readlines, and file iteration properly
class MockFH(object):
def __init__(self, filename, read_data):
self.filename = filename
self.empty_string = b'' if isinstance(read_data, six.binary_type) else ''
self.read_data = self._iterate_read_data(read_data)
self.write = Mock()
self.writelines = Mock()
file_spec = None
def _iterate_read_data(self, read_data):
'''
Helper for mock_open:
Retrieve lines from read_data via a generator so that separate calls to
readline, read, and readlines are properly interleaved
'''
# Newline will always be a bytestring on PY2 because mock_open will have
# normalized it to one.
newline = b'\n' if isinstance(read_data, six.binary_type) else '\n'
read_data = [line + newline for line in read_data.split(newline)]
if read_data[-1] == newline:
# If the last line ended in a newline, the list comprehension will have an
# extra entry that's just a newline. Remove this.
read_data = read_data[:-1]
else:
# If there wasn't an extra newline by itself, then the file being
# emulated doesn't have a newline to end the last line, so remove the
# newline that we added in the list comprehension.
read_data[-1] = read_data[-1][:-1]
for line in read_data:
yield line
def read(self, size=0):
if not isinstance(size, six.integer_types) or size < 0:
raise TypeError('a positive integer is required')
joined = self.empty_string.join(self.read_data)
if not size:
# read() called with no args, return everything
return joined
else:
# read() called with an explicit size. Return a slice matching the
# requested size, but before doing so, reset read_data to reflect
# what we read.
self.read_data = self._iterate_read_data(joined[size:])
return joined[:size]
def readlines(self, size=None): # pylint: disable=unused-argument
# TODO: Implement "size" argument
return list(self.read_data)
def readline(self, size=None): # pylint: disable=unused-argument
# TODO: Implement "size" argument
try:
return next(self.read_data)
except StopIteration:
return self.empty_string
def __iter__(self):
while True:
try:
yield next(self.read_data)
except StopIteration:
break
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb): # pylint: disable=unused-argument
pass
def _iterate_read_data(read_data):
'''
Helper for mock_open:
Retrieve lines from read_data via a generator so that separate calls to
readline, read, and readlines are properly interleaved
'''
# Newline will always be a bytestring on PY2 because mock_open will have
# normalized it to one.
newline = b'\n' if isinstance(read_data, six.binary_type) else '\n'
read_data = [line + newline for line in read_data.split(newline)]
if read_data[-1] == newline:
# If the last line ended in a newline, the list comprehension will have an
# extra entry that's just a newline. Remove this.
read_data = read_data[:-1]
else:
# If there wasn't an extra newline by itself, then the file being
# emulated doesn't have a newline to end the last line, so remove the
# newline that we added in the list comprehension.
read_data[-1] = read_data[-1][:-1]
for line in read_data:
yield line
def mock_open(mock=None, read_data=''):
# reimplement mock_open to support multiple filehandles
def mock_open(read_data=''):
'''
A helper function to create a mock to replace the use of `open`. It works
for "open" called directly or used as a context manager.
@ -138,7 +180,7 @@ def mock_open(mock=None, read_data=''):
"read_data" is a string representing the contents of the file to be read.
By default, this is an empty string.
Optionally, `read_data` can be a dictionary mapping fnmatch.fnmatch()
Optionally, "read_data" can be a dictionary mapping fnmatch.fnmatch()
patterns to strings. This allows the mocked filehandle to serve content for
more than one file path.
@ -159,12 +201,13 @@ def mock_open(mock=None, read_data=''):
with patch('salt.utils.files.fopen', mock_open(read_data=data):
do stuff
If the file path being opened does not match the glob expression, an
IOError will be raised to simulate the file not existing.
If the file path being opened does not match any of the glob expressions,
an IOError will be raised to simulate the file not existing.
Glob expressions will be attempted in iteration order, so if a file path
matches more than one glob expression it will match whichever is iterated
first.
first. If a specifc iteration order is desired (and you are not running
Python >= 3.6), consider passing "read_data" as an OrderedDict.
Passing "read_data" as a string is equivalent to passing it with a glob
expression of "*".
@ -175,131 +218,28 @@ def mock_open(mock=None, read_data=''):
read_data = {'*': read_data}
if six.PY2:
read_data = {x: salt.utils.stringutils.to_str(y)
for x, y in six.iteritems(read_data)}
global file_spec
if file_spec is None:
if six.PY3:
import _io
file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO))))
else:
file_spec = file # pylint: disable=undefined-variable
if mock is None:
mock = MagicMock(name='open', spec=open)
# We're using a dict here so that we can access both the mock object and
# the generator from the closures below. This also allows us to replace the
# genreator with an updated one after we've read from it, which allows
# the mocked read funcs to more closely emulate an actual filehandle. The
# .__class__() function is used to preserve the dict class, in case
# read_data is an OrderedDict.
data = {
'filehandle': read_data.__class__(),
'mock': mock,
}
def _filename(data):
return data['mock'].call_args[0][0]
def _match_glob(filename):
for key in read_data:
if key == '*':
continue
if fnmatch.fnmatch(filename, key):
return key
return '*'
def _match_fn(data):
filename = _filename(data)
try:
return data['glob_map'][filename]
except KeyError:
data.setdefault('glob_map', {})[filename] = _match_glob(filename)
return data['glob_map'][filename]
def _empty_string(data):
filename = _filename(data)
try:
return data['empty_string'][filename]
except KeyError:
data.setdefault('empty_string', {})[filename] = (
b'' if isinstance(read_data[_match_fn(data)], six.binary_type)
else ''
)
return data['empty_string'][filename]
def _readlines_side_effect(*args, **kwargs):
filename = _filename(data)
ret = list(data['filehandle'][filename])
# We've read everything in the file. Clear its contents so that further
# reads behave as expected.
data['filehandle'][filename] = _iterate_read_data('')
return ret
def _read_side_effect(*args, **kwargs):
filename = _filename(data)
joined = _empty_string(data).join(data['filehandle'][filename])
if not args:
# read() called with no args, we want to return everything. If
# anything was in the generator, clear it
if joined:
# If there were any contents, clear them
data['filehandle'][filename] = _iterate_read_data('')
return joined
else:
# read() called with an explicit size. Return a slice matching the
# requested size, but before doing so, reset data to reflect what
# we read.
size = args[0]
if not isinstance(size, six.integer_types):
raise TypeError('an integer is required')
data['filehandle'][filename] = _iterate_read_data(joined[size:])
return joined[:size]
def _readline_side_effect():
filename = _filename(data)
try:
return next(data['filehandle'][filename])
except StopIteration:
return _empty_string(data)
def _iter_side_effect():
filename = _filename(data)
while True:
try:
yield next(data['filehandle'][filename])
except StopIteration:
break
# .__class__() used here to preserve the dict class in the event that
# an OrderedDict was used.
read_data = read_data.__class__(
[(x, salt.utils.stringutils.to_str(y))
for x, y in six.iteritems(read_data)]
)
def _fopen_side_effect(name, *args, **kwargs):
key = _match_glob(name)
for pat in read_data:
if pat == '*':
continue
if fnmatch.fnmatch(name, pat):
matched_pattern = pat
break
else:
# No non-glob match in read_data, fall back to '*'
matched_pattern = '*'
try:
data['filehandle'][name] = _iterate_read_data(read_data[key])
return DEFAULT
return MockFH(name, read_data[matched_pattern])
except KeyError:
# No matching glob in read_data, treat this as a file that does
# not exist and raise the appropriate exception.
raise IOError(errno.ENOENT, 'No such file or directory', name)
handle = MagicMock(spec=file_spec)
handle.__enter__.return_value = handle
handle.write.return_value = None
handle.read.return_value = None
handle.readline.return_value = None
handle.readlines.return_value = None
# Support iteration via for loop
handle.__iter__ = lambda x: _iter_side_effect()
# This is salt specific and not in the upstream mock
handle.read.side_effect = _read_side_effect
handle.readline.side_effect = _readline_side_effect
handle.readlines.side_effect = _readlines_side_effect
mock.side_effect = _fopen_side_effect
mock.return_value = handle
return mock
return MagicMock(name='open', spec=open, side_effect=_fopen_side_effect)