Merge branch '2019.2.1' into add_pypsexec

This commit is contained in:
Megan Wilhite 2019-06-17 11:32:32 -04:00 committed by GitHub
commit 8f7cbfcc45
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 478 additions and 139 deletions

View file

@ -22,6 +22,7 @@ from salt.ext.six.moves import map
# Get logging started
log = logging.getLogger(__name__)
default_event_wait = 30
__func_alias__ = {
'list_': 'list',
'reload_': 'reload'
@ -56,7 +57,7 @@ def list_(return_yaml=True,
beacons = None
try:
eventer = salt.utils.event.get_event('minion', opts=__opts__)
eventer = salt.utils.event.get_event('minion', opts=__opts__, listen=True)
res = __salt__['event.fire']({'func': 'list',
'include_pillar': include_pillar,
'include_opts': include_opts},
@ -64,7 +65,7 @@ def list_(return_yaml=True,
if res:
event_ret = eventer.get_event(
tag='/salt/minion/minion_beacons_list_complete',
wait=kwargs.get('timeout', 30))
wait=kwargs.get('timeout', default_event_wait))
log.debug('event_ret %s', event_ret)
if event_ret and event_ret['complete']:
beacons = event_ret['beacons']
@ -103,12 +104,12 @@ def list_available(return_yaml=True, **kwargs):
beacons = None
try:
eventer = salt.utils.event.get_event('minion', opts=__opts__)
eventer = salt.utils.event.get_event('minion', opts=__opts__, listen=True)
res = __salt__['event.fire']({'func': 'list_available'}, 'manage_beacons')
if res:
event_ret = eventer.get_event(
tag='/salt/minion/minion_beacons_list_available_complete',
wait=kwargs.get('timeout', 30))
wait=kwargs.get('timeout', default_event_wait))
if event_ret and event_ret['complete']:
beacons = event_ret['beacons']
except KeyError:
@ -160,7 +161,7 @@ def add(name, beacon_data, **kwargs):
else:
try:
# Attempt to load the beacon module so we have access to the validate function
eventer = salt.utils.event.get_event('minion', opts=__opts__)
eventer = salt.utils.event.get_event('minion', opts=__opts__, listen=True)
res = __salt__['event.fire']({'name': name,
'beacon_data': beacon_data,
'func': 'validate_beacon'},
@ -168,7 +169,7 @@ def add(name, beacon_data, **kwargs):
if res:
event_ret = eventer.get_event(
tag='/salt/minion/minion_beacon_validation_complete',
wait=kwargs.get('timeout', 30))
wait=kwargs.get('timeout', default_event_wait))
valid = event_ret['valid']
vcomment = event_ret['vcomment']
@ -189,7 +190,7 @@ def add(name, beacon_data, **kwargs):
if res:
event_ret = eventer.get_event(
tag='/salt/minion/minion_beacon_add_complete',
wait=kwargs.get('timeout', 30))
wait=kwargs.get('timeout', default_event_wait))
if event_ret and event_ret['complete']:
beacons = event_ret['beacons']
if name in beacons and beacons[name] == beacon_data:
@ -234,7 +235,7 @@ def modify(name, beacon_data, **kwargs):
else:
try:
# Attempt to load the beacon module so we have access to the validate function
eventer = salt.utils.event.get_event('minion', opts=__opts__)
eventer = salt.utils.event.get_event('minion', opts=__opts__, listen=True)
res = __salt__['event.fire']({'name': name,
'beacon_data': beacon_data,
'func': 'validate_beacon'},
@ -242,7 +243,7 @@ def modify(name, beacon_data, **kwargs):
if res:
event_ret = eventer.get_event(
tag='/salt/minion/minion_beacon_validation_complete',
wait=kwargs.get('timeout', 30))
wait=kwargs.get('timeout', default_event_wait))
valid = event_ret['valid']
vcomment = event_ret['vcomment']
@ -283,12 +284,12 @@ def modify(name, beacon_data, **kwargs):
ret['changes']['diff'] = ''.join(_diff)
try:
eventer = salt.utils.event.get_event('minion', opts=__opts__)
eventer = salt.utils.event.get_event('minion', opts=__opts__, listen=True)
res = __salt__['event.fire']({'name': name, 'beacon_data': beacon_data, 'func': 'modify'}, 'manage_beacons')
if res:
event_ret = eventer.get_event(
tag='/salt/minion/minion_beacon_modify_complete',
wait=kwargs.get('timeout', 30))
wait=kwargs.get('timeout', default_event_wait))
if event_ret and event_ret['complete']:
beacons = event_ret['beacons']
if name in beacons and beacons[name] == beacon_data:
@ -329,12 +330,12 @@ def delete(name, **kwargs):
ret['comment'] = 'Beacon: {0} would be deleted.'.format(name)
else:
try:
eventer = salt.utils.event.get_event('minion', opts=__opts__)
eventer = salt.utils.event.get_event('minion', opts=__opts__, listen=True)
res = __salt__['event.fire']({'name': name, 'func': 'delete'}, 'manage_beacons')
if res:
event_ret = eventer.get_event(
tag='/salt/minion/minion_beacon_delete_complete',
wait=kwargs.get('timeout', 30))
wait=kwargs.get('timeout', default_event_wait))
if event_ret and event_ret['complete']:
beacons = event_ret['beacons']
if name not in beacons:
@ -410,12 +411,12 @@ def enable(**kwargs):
ret['comment'] = 'Beacons would be enabled.'
else:
try:
eventer = salt.utils.event.get_event('minion', opts=__opts__)
eventer = salt.utils.event.get_event('minion', opts=__opts__, listen=True)
res = __salt__['event.fire']({'func': 'enable'}, 'manage_beacons')
if res:
event_ret = eventer.get_event(
tag='/salt/minion/minion_beacons_enabled_complete',
wait=kwargs.get('timeout', 30))
wait=kwargs.get('timeout', default_event_wait))
if event_ret and event_ret['complete']:
beacons = event_ret['beacons']
if 'enabled' in beacons and beacons['enabled']:
@ -451,12 +452,12 @@ def disable(**kwargs):
ret['comment'] = 'Beacons would be disabled.'
else:
try:
eventer = salt.utils.event.get_event('minion', opts=__opts__)
eventer = salt.utils.event.get_event('minion', opts=__opts__, listen=True)
res = __salt__['event.fire']({'func': 'disable'}, 'manage_beacons')
if res:
event_ret = eventer.get_event(
tag='/salt/minion/minion_beacons_disabled_complete',
wait=kwargs.get('timeout', 30))
wait=kwargs.get('timeout', default_event_wait))
log.debug('event_ret %s', event_ret)
if event_ret and event_ret['complete']:
beacons = event_ret['beacons']
@ -515,12 +516,12 @@ def enable_beacon(name, **kwargs):
return ret
try:
eventer = salt.utils.event.get_event('minion', opts=__opts__)
eventer = salt.utils.event.get_event('minion', opts=__opts__, listen=True)
res = __salt__['event.fire']({'func': 'enable_beacon', 'name': name}, 'manage_beacons')
if res:
event_ret = eventer.get_event(
tag='/salt/minion/minion_beacon_enabled_complete',
wait=kwargs.get('timeout', 30))
wait=kwargs.get('timeout', default_event_wait))
if event_ret and event_ret['complete']:
beacons = event_ret['beacons']
beacon_config_dict = _get_beacon_config_dict(beacons[name])
@ -573,12 +574,12 @@ def disable_beacon(name, **kwargs):
return ret
try:
eventer = salt.utils.event.get_event('minion', opts=__opts__)
eventer = salt.utils.event.get_event('minion', opts=__opts__, listen=True)
res = __salt__['event.fire']({'func': 'disable_beacon', 'name': name}, 'manage_beacons')
if res:
event_ret = eventer.get_event(
tag='/salt/minion/minion_beacon_disabled_complete',
wait=kwargs.get('timeout', 30))
wait=kwargs.get('timeout', default_event_wait))
if event_ret and event_ret['complete']:
beacons = event_ret['beacons']
beacon_config_dict = _get_beacon_config_dict(beacons[name])
@ -617,12 +618,12 @@ def reset(**kwargs):
ret['comment'] = 'Beacons would be reset.'
else:
try:
eventer = salt.utils.event.get_event('minion', opts=__opts__)
eventer = salt.utils.event.get_event('minion', opts=__opts__, listen=True)
res = __salt__['event.fire']({'func': 'reset'}, 'manage_beacons')
if res:
event_ret = eventer.get_event(
tag='/salt/minion/minion_beacon_reset_complete',
wait=kwargs.get('timeout', 30))
wait=kwargs.get('timeout', default_event_wait))
if event_ret and event_ret['complete']:
ret['result'] = True
ret['comment'] = 'Beacon configuration reset.'

View file

@ -30,7 +30,7 @@ import time
import glob
import hashlib
import mmap
from collections import Iterable, Mapping
from collections import Iterable, Mapping, namedtuple
from functools import reduce # pylint: disable=redefined-builtin
# pylint: disable=import-error,no-name-in-module,redefined-builtin
@ -73,6 +73,9 @@ __func_alias__ = {
}
AttrChanges = namedtuple('AttrChanges', 'added,removed')
def __virtual__():
'''
Only work on POSIX-like systems
@ -161,33 +164,49 @@ def _splitlines_preserving_trailing_newline(str):
return lines
def _get_chattr_man():
'''
Get the contents of the chattr man page
'''
return subprocess.check_output(['man', 'chattr'])
def _parse_chattr_man(man):
'''
Parse the contents of a chattr man page to find the E2fsprogs version
'''
match = re.search(
r'E2fsprogs version [0-9\.]+',
salt.utils.stringutils.to_str(man),
)
if match:
version = match.group().strip('E2fsprogs version ')
else:
version = None
return version
def _chattr_version():
'''
Return the version of chattr installed
'''
return _parse_chattr_man(_get_chattr_man())
# There's no really *good* way to get the version of chattr installed.
# It's part of the e2fsprogs package - we could try to parse the version
# from the package manager, but there's no guarantee that it was
# installed that way.
#
# The most reliable approach is to just check tune2fs, since that should
# be installed with chattr, at least if it was installed in a conventional
# manner.
#
# See https://unix.stackexchange.com/a/520399/5788 for discussion.
tune2fs = salt.utils.path.which('tune2fs')
if not tune2fs or salt.utils.platform.is_aix():
return None
cmd = [tune2fs]
result = __salt__['cmd.run'](cmd, ignore_retcode=True, python_shell=False)
match = re.search(
r'tune2fs (?P<version>[0-9\.]+)',
salt.utils.stringutils.to_str(result),
)
if match is None:
version = None
else:
version = match.group('version')
return version
def _chattr_has_extended_attrs():
'''
Return ``True`` if chattr supports extended attributes, that is,
the version is >1.41.22. Otherwise, ``False``
'''
ver = _chattr_version()
if ver is None:
return False
needed_version = salt.utils.versions.LooseVersion('1.41.12')
chattr_version = salt.utils.versions.LooseVersion(ver)
return chattr_version > needed_version
def gid_to_group(gid):
@ -551,8 +570,6 @@ def _cmp_attrs(path, attrs):
attrs
string of attributes to compare against a given file
'''
diff = [None, None]
# lsattr for AIX is not the same thing as lsattr for linux.
if salt.utils.platform.is_aix():
return None
@ -563,15 +580,13 @@ def _cmp_attrs(path, attrs):
# lsattr not installed
return None
old = [chr for chr in lattrs if chr not in attrs]
if len(old) > 0:
diff[1] = ''.join(old)
new = set(attrs)
old = set(lattrs)
new = [chr for chr in attrs if chr not in lattrs]
if len(new) > 0:
diff[0] = ''.join(new)
return diff
return AttrChanges(
added=''.join(new-old) or None,
removed=''.join(old-new) or None,
)
def lsattr(path):
@ -607,15 +622,12 @@ def lsattr(path):
results = {}
for line in result.splitlines():
if not line.startswith('lsattr: '):
vals = line.split(None, 1)
needed_version = salt.utils.versions.LooseVersion('1.41.12')
chattr_version = salt.utils.versions.LooseVersion(_chattr_version())
# The version of chattr on Centos 6 does not support extended
# attributes.
if chattr_version > needed_version:
results[vals[1]] = re.findall(r"[aAcCdDeijPsStTu]", vals[0])
attrs, file = line.split(None, 1)
if _chattr_has_extended_attrs():
pattern = r"[aAcCdDeijPsStTu]"
else:
results[vals[1]] = re.findall(r"[acdijstuADST]", vals[0])
pattern = r"[acdijstuADST]"
results[file] = re.findall(pattern, attrs)
return results
@ -680,8 +692,7 @@ def chattr(*files, **kwargs):
result = __salt__['cmd.run'](cmd, python_shell=False)
if bool(result):
raise CommandExecutionError(
"chattr failed to run, possibly due to bad parameters.")
return False
return True
@ -4516,19 +4527,6 @@ def check_perms(name, ret, user, group, mode, attrs=None, follow_symlinks=False)
is_dir = os.path.isdir(name)
is_link = os.path.islink(name)
if attrs is not None \
and not salt.utils.platform.is_windows() \
and not is_dir and not is_link:
try:
lattrs = lsattr(name)
except SaltInvocationError:
lattrs = None
if lattrs is not None:
# List attributes on file
perms['lattrs'] = ''.join(lattrs.get(name, ''))
# Remove attributes on file so changes can be enforced.
if perms['lattrs']:
chattr(name, operator='remove', attributes=perms['lattrs'])
# user/group changes if needed, then check if it worked
if user:
@ -4610,11 +4608,6 @@ def check_perms(name, ret, user, group, mode, attrs=None, follow_symlinks=False)
elif 'cgroup' in perms and user != '':
ret['changes']['group'] = group
if not salt.utils.platform.is_windows() and not is_dir:
# Replace attributes on file if it had been removed
if perms.get('lattrs', ''):
chattr(name, operator='add', attributes=perms['lattrs'])
# Mode changes if needed
if mode is not None:
# File is a symlink, ignore the mode setting
@ -4644,23 +4637,37 @@ def check_perms(name, ret, user, group, mode, attrs=None, follow_symlinks=False)
pass
else:
diff_attrs = _cmp_attrs(name, attrs)
if diff_attrs is not None:
if diff_attrs[0] is not None or diff_attrs[1] is not None:
if __opts__['test'] is True:
ret['changes']['attrs'] = attrs
if diff_attrs and any(attr for attr in diff_attrs):
changes = {
'old': ''.join(lsattr(name)[name]),
'new': None,
}
if __opts__['test'] is True:
changes['new'] = attrs
else:
if diff_attrs.added:
chattr(
name,
operator="add",
attributes=diff_attrs.added,
)
if diff_attrs.removed:
chattr(
name,
operator="remove",
attributes=diff_attrs.removed,
)
cmp_attrs = _cmp_attrs(name, attrs)
if any(attr for attr in cmp_attrs):
ret['result'] = False
ret['comment'].append(
'Failed to change attributes to {0}'.format(attrs)
)
changes['new'] = ''.join(lsattr(name)[name])
else:
if diff_attrs[0] is not None:
chattr(name, operator="add", attributes=diff_attrs[0])
if diff_attrs[1] is not None:
chattr(name, operator="remove", attributes=diff_attrs[1])
cmp_attrs = _cmp_attrs(name, attrs)
if cmp_attrs[0] is not None or cmp_attrs[1] is not None:
ret['result'] = False
ret['comment'].append(
'Failed to change attributes to {0}'.format(attrs)
)
else:
ret['changes']['attrs'] = attrs
changes['new'] = attrs
if changes['old'] != changes['new']:
ret['changes']['attrs'] = changes
# Only combine the comment list into a string
# after all comments are added above

View file

@ -757,14 +757,6 @@ class TestDaemon(object):
}
master_opts['ext_pillar'].append({'file_tree': file_tree})
# This is the syndic for master
# Let's start with a copy of the syndic master configuration
syndic_opts = copy.deepcopy(master_opts)
# Let's update with the syndic configuration
syndic_opts.update(salt.config._read_conf_file(os.path.join(RUNTIME_VARS.CONF_DIR, 'syndic')))
syndic_opts['cachedir'] = os.path.join(TMP, 'rootdir', 'cache')
syndic_opts['config_dir'] = RUNTIME_VARS.TMP_SYNDIC_MINION_CONF_DIR
# Under windows we can't seem to properly create a virtualenv off of another
# virtualenv, we can on linux but we will still point to the virtualenv binary
# outside the virtualenv running the test suite, if that's the case.
@ -854,6 +846,14 @@ class TestDaemon(object):
syndic_master_opts['transport'] = 'tcp'
proxy_opts['transport'] = 'tcp'
# This is the syndic for master
# Let's start with a copy of the syndic master configuration
syndic_opts = copy.deepcopy(master_opts)
# Let's update with the syndic configuration
syndic_opts.update(salt.config._read_conf_file(os.path.join(RUNTIME_VARS.CONF_DIR, 'syndic')))
syndic_opts['cachedir'] = os.path.join(TMP, 'rootdir', 'cache')
syndic_opts['config_dir'] = RUNTIME_VARS.TMP_SYNDIC_MINION_CONF_DIR
# Set up config options that require internal data
master_opts['pillar_roots'] = syndic_master_opts['pillar_roots'] = {
'base': [

View file

@ -23,7 +23,6 @@ from multiprocessing import Queue
import msgpack
# Import Salt libs
from salt.ext import six
from salt.utils.platform import is_darwin
import salt.log.setup
@ -35,8 +34,6 @@ __virtualname__ = 'runtests_log_handler'
def __virtual__():
if 'runtests_log_port' not in __opts__:
return False, "'runtests_log_port' not in options"
if six.PY3:
return False, "runtests external logging handler is temporarily disabled for Python 3 tests"
return True

View file

@ -2064,32 +2064,366 @@ class FileBasicsTestCase(TestCase, LoaderModuleMockMixin):
self.assertEqual(list(ret), ['file://' + self.myfile, 'filehash'])
class ChattrVersionTests(TestCase):
CHATTR_MAN = salt.utils.stringutils.to_bytes((
'AVAILABILITY\n'
'chattr is part of the e2fsprogs package and is available '
'from http://e2fsprogs.sourceforge.net.\n'
'SEE ALSO\n'
' lsattr(1), btrfs(5), ext4(5), xfs(5).\n\n'
'E2fsprogs version 1.43.4 '
' '
'January 2017 '
' '
' CHATTR(1)'
))
class LsattrTests(TestCase, LoaderModuleMockMixin):
def setup_loader_modules(self):
return {
filemod: {
'__salt__': {
'cmd.run': cmdmod.run,
},
},
}
def test__parse_chattr_version(self):
'''
Validate we can parse the E2fsprogs version from the chattr man page
'''
man_out = dedent(self.CHATTR_MAN)
parsed_version = filemod._parse_chattr_man(man_out)
assert parsed_version == '1.43.4', parsed_version
def run(self, result=None):
patch_aix = patch(
'salt.utils.platform.is_aix',
Mock(return_value=False),
)
patch_exists = patch(
'os.path.exists',
Mock(return_value=True),
)
patch_which = patch(
'salt.utils.path.which',
Mock(return_value='fnord'),
)
with patch_aix, patch_exists, patch_which:
super(LsattrTests, self).run(result)
def test__chattr_version(self):
'''
The _chattr_version method works
'''
with patch('subprocess.check_output', return_value=self.CHATTR_MAN):
parsed_version = filemod._chattr_version()
assert parsed_version == '1.43.4', parsed_version
def test_if_lsattr_is_missing_it_should_return_None(self):
patch_which = patch(
'salt.utils.path.which',
Mock(return_value=None),
)
with patch_which:
actual = filemod.lsattr('foo')
assert actual is None, actual
def test_on_aix_lsattr_should_be_None(self):
patch_aix = patch(
'salt.utils.platform.is_aix',
Mock(return_value=True),
)
with patch_aix:
# SaltInvocationError will be raised if filemod.lsattr
# doesn't early exit
actual = filemod.lsattr('foo')
self.assertIsNone(actual)
def test_SaltInvocationError_should_be_raised_when_file_is_missing(self):
patch_exists = patch(
'os.path.exists',
Mock(return_value=False),
)
with patch_exists, self.assertRaises(SaltInvocationError):
filemod.lsattr('foo')
def test_if_chattr_version_is_less_than_required_flags_should_ignore_extended(self):
fname = '/path/to/fnord'
with_extended = textwrap.dedent(
'''
aAcCdDeijPsStTu---- {}
'''
).strip().format(fname)
expected = set('acdijstuADST')
patch_has_ext = patch(
'salt.modules.file._chattr_has_extended_attrs',
Mock(return_value=False),
)
patch_run = patch.dict(
filemod.__salt__,
{'cmd.run': Mock(return_value=with_extended)},
)
with patch_has_ext, patch_run:
actual = set(filemod.lsattr(fname)[fname])
msg = 'Actual: {!r} Expected: {!r}'.format(actual, expected) # pylint: disable=E1322
assert actual == expected, msg
def test_if_chattr_version_is_high_enough_then_extended_flags_should_be_returned(self):
fname = '/path/to/fnord'
with_extended = textwrap.dedent(
'''
aAcCdDeijPsStTu---- {}
'''
).strip().format(fname)
expected = set('aAcCdDeijPsStTu')
patch_has_ext = patch(
'salt.modules.file._chattr_has_extended_attrs',
Mock(return_value=True),
)
patch_run = patch.dict(
filemod.__salt__,
{'cmd.run': Mock(return_value=with_extended)},
)
with patch_has_ext, patch_run:
actual = set(filemod.lsattr(fname)[fname])
msg = 'Actual: {!r} Expected: {!r}'.format(actual, expected) # pylint: disable=E1322
assert actual == expected, msg
def test_if_supports_extended_but_there_are_no_flags_then_none_should_be_returned(self):
fname = '/path/to/fnord'
with_extended = textwrap.dedent(
'''
------------------- {}
'''
).strip().format(fname)
expected = set('')
patch_has_ext = patch(
'salt.modules.file._chattr_has_extended_attrs',
Mock(return_value=True),
)
patch_run = patch.dict(
filemod.__salt__,
{'cmd.run': Mock(return_value=with_extended)},
)
with patch_has_ext, patch_run:
actual = set(filemod.lsattr(fname)[fname])
msg = 'Actual: {!r} Expected: {!r}'.format(actual, expected) # pylint: disable=E1322
assert actual == expected, msg
# This should create a merge conflict with ChattrVersionTests when
# a merge forward to develop happens. Develop's changes are made
# obsolete by this ChattrTests class, and should be removed in favor
# of this change.
class ChattrTests(TestCase, LoaderModuleMockMixin):
def setup_loader_modules(self):
return {
filemod: {
'__salt__': {
'cmd.run': cmdmod.run,
},
'__opts__': {
'test': False,
},
},
}
def run(self, result=None):
patch_aix = patch(
'salt.utils.platform.is_aix',
Mock(return_value=False),
)
patch_exists = patch(
'os.path.exists',
Mock(return_value=True),
)
patch_which = patch(
'salt.utils.path.which',
Mock(return_value='some/tune2fs'),
)
with patch_aix, patch_exists, patch_which:
super(ChattrTests, self).run(result)
def test_chattr_version_returns_None_if_no_tune2fs_exists(self):
patch_which = patch(
'salt.utils.path.which',
Mock(return_value=''),
)
with patch_which:
actual = filemod._chattr_version()
self.assertIsNone(actual)
def test_on_aix_chattr_version_should_be_None_even_if_tune2fs_exists(self):
patch_which = patch(
'salt.utils.path.which',
Mock(return_value='fnord'),
)
patch_aix = patch(
'salt.utils.platform.is_aix',
Mock(return_value=True),
)
mock_run = MagicMock(return_value='fnord')
patch_run = patch.dict(filemod.__salt__, {'cmd.run': mock_run})
with patch_which, patch_aix, patch_run:
actual = filemod._chattr_version()
self.assertIsNone(actual)
mock_run.assert_not_called()
def test_chattr_version_should_return_version_from_tune2fs(self):
expected = '1.43.4'
sample_output = textwrap.dedent(
'''
tune2fs 1.43.4 (31-Jan-2017)
Usage: tune2fs [-c max_mounts_count] [-e errors_behavior] [-f] [-g group]
[-i interval[d|m|w]] [-j] [-J journal_options] [-l]
[-m reserved_blocks_percent] [-o [^]mount_options[,...]]
[-p mmp_update_interval] [-r reserved_blocks_count] [-u user]
[-C mount_count] [-L volume_label] [-M last_mounted_dir]
[-O [^]feature[,...]] [-Q quota_options]
[-E extended-option[,...]] [-T last_check_time] [-U UUID]
[-I new_inode_size] [-z undo_file] device
'''
)
patch_which = patch(
'salt.utils.path.which',
Mock(return_value='fnord'),
)
patch_run = patch.dict(
filemod.__salt__,
{'cmd.run': MagicMock(return_value=sample_output)},
)
with patch_which, patch_run:
actual = filemod._chattr_version()
self.assertEqual(actual, expected)
def test_if_tune2fs_has_no_version_version_should_be_None(self):
patch_which = patch(
'salt.utils.path.which',
Mock(return_value='fnord'),
)
patch_run = patch.dict(
filemod.__salt__,
{'cmd.run': MagicMock(return_value='fnord')},
)
with patch_which, patch_run:
actual = filemod._chattr_version()
self.assertIsNone(actual)
def test_chattr_has_extended_attrs_should_return_False_if_chattr_version_is_None(self):
patch_chattr = patch(
'salt.modules.file._chattr_version',
Mock(return_value=None),
)
with patch_chattr:
actual = filemod._chattr_has_extended_attrs()
assert not actual, actual
def test_chattr_has_extended_attrs_should_return_False_if_version_is_too_low(self):
below_expected = '0.1.1'
patch_chattr = patch(
'salt.modules.file._chattr_version',
Mock(return_value=below_expected),
)
with patch_chattr:
actual = filemod._chattr_has_extended_attrs()
assert not actual, actual
def test_chattr_has_extended_attrs_should_return_False_if_version_is_equal_threshold(self):
threshold = '1.41.12'
patch_chattr = patch(
'salt.modules.file._chattr_version',
Mock(return_value=threshold),
)
with patch_chattr:
actual = filemod._chattr_has_extended_attrs()
assert not actual, actual
def test_chattr_has_extended_attrs_should_return_True_if_version_is_above_threshold(self):
higher_than = '1.41.13'
patch_chattr = patch(
'salt.modules.file._chattr_version',
Mock(return_value=higher_than),
)
with patch_chattr:
actual = filemod._chattr_has_extended_attrs()
assert actual, actual
def test_check_perms_should_report_no_attr_changes_if_there_are_none(self):
filename = '/path/to/fnord'
attrs = 'aAcCdDeijPsStTu'
higher_than = '1.41.13'
patch_chattr = patch(
'salt.modules.file._chattr_version',
Mock(return_value=higher_than),
)
patch_exists = patch(
'os.path.exists',
Mock(return_value=True),
)
patch_stats = patch(
'salt.modules.file.stats',
Mock(return_value={
'user': 'foo',
'group': 'bar',
'mode': '123',
}),
)
patch_run = patch.dict(
filemod.__salt__,
{'cmd.run': MagicMock(return_value='--------- '+filename)},
)
with patch_chattr, patch_exists, patch_stats, patch_run:
actual_ret, actual_perms = filemod.check_perms(
name=filename,
ret=None,
user='foo',
group='bar',
mode='123',
attrs=attrs,
follow_symlinks=False,
)
assert actual_ret.get('changes', {}).get('attrs')is None, actual_ret
def test_check_perms_should_report_attrs_new_and_old_if_they_changed(self):
filename = '/path/to/fnord'
attrs = 'aAcCdDeijPsStTu'
existing_attrs = 'aeiu'
expected = {
'attrs': {
'old': existing_attrs,
'new': attrs,
},
}
higher_than = '1.41.13'
patch_chattr = patch(
'salt.modules.file._chattr_version',
Mock(return_value=higher_than),
)
patch_stats = patch(
'salt.modules.file.stats',
Mock(return_value={
'user': 'foo',
'group': 'bar',
'mode': '123',
}),
)
patch_cmp = patch(
'salt.modules.file._cmp_attrs',
MagicMock(side_effect=[
filemod.AttrChanges(
added='aAcCdDeijPsStTu',
removed='',
),
filemod.AttrChanges(
None,
None,
),
]),
)
patch_chattr = patch(
'salt.modules.file.chattr',
MagicMock(),
)
def fake_cmd(cmd, *args, **kwargs):
if cmd == ['lsattr', '/path/to/fnord']:
return textwrap.dedent(
'''
{}---- {}
'''.format(existing_attrs, filename)
).strip()
else:
assert False, "not sure how to handle {}".format(cmd)
patch_run = patch.dict(
filemod.__salt__,
{'cmd.run': MagicMock(side_effect=fake_cmd)},
)
patch_ver = patch(
'salt.modules.file._chattr_has_extended_attrs',
MagicMock(return_value=True),
)
with patch_chattr, patch_stats, patch_cmp, patch_run, patch_ver:
actual_ret, actual_perms = filemod.check_perms(
name=filename,
ret=None,
user='foo',
group='bar',
mode='123',
attrs=attrs,
follow_symlinks=False,
)
self.assertDictEqual(actual_ret['changes'], expected)