Merge pull request #45861 from rallytime/merge-oxygen

[oxygen] Merge forward from oxygen.rc1 to oxygen
This commit is contained in:
Nicole Thomas 2018-02-08 08:39:58 -05:00 committed by GitHub
commit 048c18ea42
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
33 changed files with 2213 additions and 628 deletions

View file

@ -33,7 +33,7 @@ at midnight.
may not include the path for any scripts or commands used by Salt, and it
may be necessary to set the PATH accordingly in the crontab:
.. code-block:: cron
.. code-block:: bash
PATH=/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/local/sbin:/opt/bin

View file

@ -130,7 +130,7 @@ command line.
The Windows Firewall rule can be created by issuing a single command. Run the
following command from the command line or a run prompt:
.. code-block:: cmd
.. code-block:: bash
netsh advfirewall firewall add rule name="Salt" dir=in action=allow protocol=TCP localport=4505-4506

View file

@ -40,7 +40,7 @@ event <tag> [<extra>, <data>]
Example of usage
.. code-block:: txt
.. code-block:: text
08:33:57 @gtmanfred > !ping
08:33:57 gtmanbot > gtmanfred: pong
@ -49,7 +49,7 @@ Example of usage
08:34:17 @gtmanfred > !event test/tag/ircbot irc is usefull
08:34:17 gtmanbot > gtmanfred: TaDa!
.. code-block:: txt
.. code-block:: text
[DEBUG ] Sending event: tag = salt/engines/ircbot/test/tag/ircbot; data = {'_stamp': '2016-11-28T14:34:16.633623', 'data': ['irc', 'is', 'useful']}

View file

@ -1273,8 +1273,8 @@ class RemoteClient(Client):
load = {'saltenv': saltenv,
'prefix': prefix,
'cmd': '_file_list'}
return [sdecode(fn_) for fn_ in self.channel.send(load)]
return salt.utils.data.decode(self.channel.send(load)) if six.PY2 \
else self.channel.send(load)
def file_list_emptydirs(self, saltenv='base', prefix=''):
'''
@ -1283,7 +1283,8 @@ class RemoteClient(Client):
load = {'saltenv': saltenv,
'prefix': prefix,
'cmd': '_file_list_emptydirs'}
self.channel.send(load)
return salt.utils.data.decode(self.channel.send(load)) if six.PY2 \
else self.channel.send(load)
def dir_list(self, saltenv='base', prefix=''):
'''
@ -1292,7 +1293,8 @@ class RemoteClient(Client):
load = {'saltenv': saltenv,
'prefix': prefix,
'cmd': '_dir_list'}
return self.channel.send(load)
return salt.utils.data.decode(self.channel.send(load)) if six.PY2 \
else self.channel.send(load)
def symlink_list(self, saltenv='base', prefix=''):
'''
@ -1301,7 +1303,8 @@ class RemoteClient(Client):
load = {'saltenv': saltenv,
'prefix': prefix,
'cmd': '_symlink_list'}
return self.channel.send(load)
return salt.utils.data.decode(self.channel.send(load)) if six.PY2 \
else self.channel.send(load)
def __hash_and_stat_file(self, path, saltenv='base'):
'''
@ -1367,21 +1370,24 @@ class RemoteClient(Client):
'''
load = {'saltenv': saltenv,
'cmd': '_file_list'}
return self.channel.send(load)
return salt.utils.data.decode(self.channel.send(load)) if six.PY2 \
else self.channel.send(load)
def envs(self):
'''
Return a list of available environments
'''
load = {'cmd': '_file_envs'}
return self.channel.send(load)
return salt.utils.data.decode(self.channel.send(load)) if six.PY2 \
else self.channel.send(load)
def master_opts(self):
'''
Return the master opts data
'''
load = {'cmd': '_master_opts'}
return self.channel.send(load)
return salt.utils.data.decode(self.channel.send(load)) if six.PY2 \
else self.channel.send(load)
def master_tops(self):
'''
@ -1392,7 +1398,8 @@ class RemoteClient(Client):
'opts': self.opts}
if self.auth:
load['tok'] = self.auth.gen_token(b'salt')
return self.channel.send(load)
return salt.utils.data.decode(self.channel.send(load)) if six.PY2 \
else self.channel.send(load)
class FSClient(RemoteClient):

View file

@ -17,11 +17,11 @@ import time
import salt.loader
import salt.utils.data
import salt.utils.files
import salt.utils.locales
import salt.utils.path
import salt.utils.url
import salt.utils.versions
from salt.utils.args import get_function_argspec as _argspec
from salt.utils.decorators import ensure_unicode_args
# Import 3rd-party libs
from salt.ext import six
@ -546,8 +546,8 @@ class Fileserver(object):
Find the path and return the fnd structure, this structure is passed
to other backend interfaces.
'''
path = salt.utils.locales.sdecode(path)
saltenv = salt.utils.locales.sdecode(saltenv)
path = salt.utils.stringutils.to_unicode(path)
saltenv = salt.utils.stringutils.to_unicode(saltenv)
back = self.backends(back)
kwargs = {}
fnd = {'path': '',
@ -626,7 +626,7 @@ class Fileserver(object):
if not isinstance(load['saltenv'], six.string_types):
load['saltenv'] = six.text_type(load['saltenv'])
fnd = self.find_file(salt.utils.locales.sdecode(load['path']),
fnd = self.find_file(salt.utils.stringutils.to_unicode(load['path']),
load['saltenv'])
if not fnd.get('back'):
return '', None
@ -731,6 +731,7 @@ class Fileserver(object):
)
return ret
@ensure_unicode_args
def file_list(self, load):
'''
Return a list of files from the dominant environment
@ -749,14 +750,13 @@ class Fileserver(object):
fstr = '{0}.file_list'.format(fsb)
if fstr in self.servers:
ret.update(self.servers[fstr](load))
# upgrade all set elements to a common encoding
ret = [salt.utils.locales.sdecode(f) for f in ret]
# some *fs do not handle prefix. Ensure it is filtered
prefix = load.get('prefix', '').strip('/')
if prefix != '':
ret = [f for f in ret if f.startswith(prefix)]
return sorted(ret)
@ensure_unicode_args
def file_list_emptydirs(self, load):
'''
List all emptydirs in the given environment
@ -775,14 +775,13 @@ class Fileserver(object):
fstr = '{0}.file_list_emptydirs'.format(fsb)
if fstr in self.servers:
ret.update(self.servers[fstr](load))
# upgrade all set elements to a common encoding
ret = [salt.utils.locales.sdecode(f) for f in ret]
# some *fs do not handle prefix. Ensure it is filtered
prefix = load.get('prefix', '').strip('/')
if prefix != '':
ret = [f for f in ret if f.startswith(prefix)]
return sorted(ret)
@ensure_unicode_args
def dir_list(self, load):
'''
List all directories in the given environment
@ -801,14 +800,13 @@ class Fileserver(object):
fstr = '{0}.dir_list'.format(fsb)
if fstr in self.servers:
ret.update(self.servers[fstr](load))
# upgrade all set elements to a common encoding
ret = [salt.utils.locales.sdecode(f) for f in ret]
# some *fs do not handle prefix. Ensure it is filtered
prefix = load.get('prefix', '').strip('/')
if prefix != '':
ret = [f for f in ret if f.startswith(prefix)]
return sorted(ret)
@ensure_unicode_args
def symlink_list(self, load):
'''
Return a list of symlinked files and dirs
@ -827,10 +825,6 @@ class Fileserver(object):
symlstr = '{0}.symlink_list'.format(fsb)
if symlstr in self.servers:
ret = self.servers[symlstr](load)
# upgrade all set elements to a common encoding
ret = dict([
(salt.utils.locales.sdecode(x), salt.utils.locales.sdecode(y)) for x, y in ret.items()
])
# some *fs do not handle prefix. Ensure it is filtered
prefix = load.get('prefix', '').strip('/')
if prefix != '':

View file

@ -132,20 +132,23 @@ def _linux_disks():
ret = {'disks': [], 'SSDs': []}
for entry in glob.glob('/sys/block/*/queue/rotational'):
with salt.utils.files.fopen(entry) as entry_fp:
device = entry.split('/')[3]
flag = entry_fp.read(1)
if flag == '0':
ret['SSDs'].append(device)
log.trace('Device %s reports itself as an SSD', device)
elif flag == '1':
ret['disks'].append(device)
log.trace('Device %s reports itself as an HDD', device)
else:
log.trace(
'Unable to identify device %s as an SSD or HDD. It does '
'not report 0 or 1', device
)
try:
with salt.utils.files.fopen(entry) as entry_fp:
device = entry.split('/')[3]
flag = entry_fp.read(1)
if flag == '0':
ret['SSDs'].append(device)
log.trace('Device %s reports itself as an SSD', device)
elif flag == '1':
ret['disks'].append(device)
log.trace('Device %s reports itself as an HDD', device)
else:
log.trace(
'Unable to identify device %s as an SSD or HDD. It does '
'not report 0 or 1', device
)
except IOError:
pass
return ret

View file

@ -174,7 +174,12 @@ if sys.version_info < (3, 2):
this method if you want to use blocking, timeouts or custom queue
implementations.
'''
self.queue.put_nowait(record)
try:
self.queue.put_nowait(record)
except self.queue.Full:
sys.stderr.write('[WARNING ] Message queue is full, '
'unable to write "{0}" to log', record
)
def prepare(self, record):
'''

View file

@ -1437,6 +1437,9 @@ class Minion(MinionBase):
Override this method if you wish to handle the decoded data
differently.
'''
# Ensure payload is unicode. Disregard failure to decode binary blobs.
if six.PY2:
data = salt.utils.data.decode(data, keep=True)
if 'user' in data:
log.info(
'User %s Executing command %s with jid %s',

View file

@ -8,12 +8,11 @@ from __future__ import absolute_import, print_function, unicode_literals
import logging
import re
import os
HAS_DBUS = False
try:
import dbus
HAS_DBUS = True
except ImportError:
pass
dbus = None
# Import Salt libs
import salt.utils.locales
@ -31,10 +30,10 @@ __virtualname__ = 'locale'
def __virtual__():
'''
Only work on POSIX-like systems
Exclude Windows OS.
'''
if salt.utils.platform.is_windows():
return (False, 'Cannot load locale module: windows platforms are unsupported')
return False, 'Cannot load locale module: windows platforms are unsupported'
return __virtualname__
@ -51,7 +50,8 @@ def _parse_dbus_locale():
ret = {}
for env_var in system_locale:
match = re.match('^([A-Z_]+)=(.*)$', env_var)
env_var = six.text_type(env_var)
match = re.match(r'^([A-Z_]+)=(.*)$', env_var)
if match:
ret[match.group(1)] = match.group(2).replace('"', '')
else:
@ -62,28 +62,35 @@ def _parse_dbus_locale():
return ret
def _parse_localectl():
def _localectl_status():
'''
Get the 'System Locale' parameters from localectl
Parse localectl status into a dict.
:return: dict
'''
if salt.utils.which('localectl') is None:
raise CommandExecutionError('Unable to find "localectl"')
ret = {}
localectl_out = __salt__['cmd.run']('localectl')
reading_locale = False
for line in localectl_out.splitlines():
if 'System Locale:' in line:
line = line.replace('System Locale:', '')
reading_locale = True
if not reading_locale:
continue
match = re.match('^([A-Z_]+)=(.*)$', line.strip())
if not match:
break
ret[match.group(1)] = match.group(2).replace('"', '')
else:
raise CommandExecutionError('Could not find system locale - could not '
'parse localectl output\n{0}'.format(localectl_out))
locale_ctl_out = (__salt__['cmd.run']('localectl status') or '').strip()
ctl_key = None
for line in locale_ctl_out.splitlines():
if ': ' in line: # Keys are separate with ":" and a space (!).
ctl_key, ctl_data = line.split(': ')
ctl_key = ctl_key.strip().lower().replace(' ', '_')
else:
ctl_data = line.strip()
if ctl_key:
if '=' in ctl_data:
loc_set = ctl_data.split('=')
if len(loc_set) == 2:
if ctl_key not in ret:
ret[ctl_key] = {}
ret[ctl_key][loc_set[0]] = loc_set[1]
else:
ret[ctl_key] = ctl_data
if not ret:
log.debug("Unable to find any locale information inside the following data:\n%s", locale_ctl_out)
raise CommandExecutionError('Unable to parse result of "localectl"')
return ret
@ -93,12 +100,10 @@ def _localectl_set(locale=''):
Use systemd's localectl command to set the LANG locale parameter, making
sure not to trample on other params that have been set.
'''
locale_params = _parse_dbus_locale() if HAS_DBUS else _parse_localectl()
locale_params = _parse_dbus_locale() if dbus is not None else _localectl_status().get('system_locale', {})
locale_params['LANG'] = six.text_type(locale)
args = ' '.join(['{0}="{1}"'.format(k, v)
for k, v in six.iteritems(locale_params)])
cmd = 'localectl set-locale {0}'.format(args)
return __salt__['cmd.retcode'](cmd, python_shell=False) == 0
args = ' '.join(['{0}="{1}"'.format(k, v) for k, v in six.iteritems(locale_params)])
return not __salt__['cmd.retcode']('localectl set-locale {0}'.format(args), python_shell=False)
def list_avail():
@ -111,9 +116,7 @@ def list_avail():
salt '*' locale.list_avail
'''
cmd = 'locale -a'
out = __salt__['cmd.run'](cmd).split('\n')
return out
return __salt__['cmd.run']('locale -a').split('\n')
def get_locale():
@ -126,30 +129,36 @@ def get_locale():
salt '*' locale.get_locale
'''
cmd = ''
if 'Suse' in __grains__['os_family']:
# this block applies to all SUSE systems - also with systemd
cmd = 'grep "^RC_LANG" /etc/sysconfig/language'
elif salt.utils.systemd.booted(__context__):
params = _parse_dbus_locale() if HAS_DBUS else _parse_localectl()
return params.get('LANG', '')
elif 'RedHat' in __grains__['os_family']:
cmd = 'grep "^LANG=" /etc/sysconfig/i18n'
elif 'Debian' in __grains__['os_family']:
# this block only applies to Debian without systemd
cmd = 'grep "^LANG=" /etc/default/locale'
elif 'Gentoo' in __grains__['os_family']:
cmd = 'eselect --brief locale show'
return __salt__['cmd.run'](cmd).strip()
elif 'Solaris' in __grains__['os_family']:
cmd = 'grep "^LANG=" /etc/default/init'
else: # don't waste time on a failing cmd.run
raise CommandExecutionError('Error: Unsupported platform!')
ret = ''
lc_ctl = salt.utils.systemd.booted(__context__)
# localectl on SLE12 is installed but the integration is still broken in latest SP3 due to
# config is rewritten by by many %post installation hooks in the older packages.
# If you use it -- you will break your config. This is not the case in SLE15 anymore.
if lc_ctl and not (__grains__['os_family'] in ['Suse'] and __grains__['osmajorrelease'] in [12]):
ret = (_parse_dbus_locale() if dbus is not None else _localectl_status()['system_locale']).get('LANG', '')
else:
if 'Suse' in __grains__['os_family'] and __grains__['osmajorrelease'] == 12:
cmd = 'grep "^RC_LANG" /etc/sysconfig/language'
elif 'RedHat' in __grains__['os_family']:
cmd = 'grep "^LANG=" /etc/sysconfig/i18n'
elif 'Debian' in __grains__['os_family']:
# this block only applies to Debian without systemd
cmd = 'grep "^LANG=" /etc/default/locale'
elif 'Gentoo' in __grains__['os_family']:
cmd = 'eselect --brief locale show'
return __salt__['cmd.run'](cmd).strip()
elif 'Solaris' in __grains__['os_family']:
cmd = 'grep "^LANG=" /etc/default/init'
else: # don't waste time on a failing cmd.run
raise CommandExecutionError('Error: "{0}" is unsupported!'.format(__grains__['oscodename']))
try:
return __salt__['cmd.run'](cmd).split('=')[1].replace('"', '')
except IndexError:
return ''
if cmd:
try:
ret = __salt__['cmd.run'](cmd).split('=')[1].replace('"', '')
except IndexError as err:
log.error('Error occurred while running "%s": %s', cmd, err)
return ret
def set_locale(locale):
@ -162,6 +171,11 @@ def set_locale(locale):
salt '*' locale.set_locale 'en_US.UTF-8'
'''
lc_ctl = salt.utils.systemd.booted(__context__)
# localectl on SLE12 is installed but the integration is broken -- config is rewritten by YaST2
if lc_ctl and not (__grains__['os_family'] in ['Suse'] and __grains__['osmajorrelease'] in [12]):
return _localectl_set(locale)
if 'Suse' in __grains__['os_family']:
# this block applies to all SUSE systems - also with systemd
if not __salt__['file.file_exists']('/etc/sysconfig/language'):
@ -172,8 +186,6 @@ def set_locale(locale):
'RC_LANG="{0}"'.format(locale),
append_if_not_found=True
)
elif salt.utils.systemd.booted(__context__):
return _localectl_set(locale)
elif 'RedHat' in __grains__['os_family']:
if not __salt__['file.file_exists']('/etc/sysconfig/i18n'):
__salt__['file.touch']('/etc/sysconfig/i18n')
@ -190,8 +202,6 @@ def set_locale(locale):
raise CommandExecutionError(
'Cannot set locale: "update-locale" was not found.')
__salt__['cmd.run'](update_locale) # (re)generate /etc/default/locale
# FIXME: why are we writing to a file that is dynamically generated?
__salt__['file.replace'](
'/etc/default/locale',
'^LANG=.*',
@ -235,7 +245,7 @@ def avail(locale):
return False
avail_locales = __salt__['locale.list_avail']()
locale_exists = next((True for x in avail_locales
if salt.utils.locales.normalize_locale(x.strip()) == normalized_locale), False)
if salt.utils.locales.normalize_locale(x.strip()) == normalized_locale), False)
return locale_exists

View file

@ -369,7 +369,7 @@ def search_by(lookup, tgt_type='compound', minion_id=None):
CLI Example:
.. code-block:: base
.. code-block:: bash
salt '*' match.search_by '{web: [node1, node2], db: [node2, node]}'

View file

@ -84,7 +84,7 @@ without extra parameters:
salt-run nacl.enc 'asecretpass'
salt-run nacl.dec 'tqXzeIJnTAM9Xf0mdLcpEdklMbfBGPj2oTKmlgrm3S1DTVVHNnh9h8mU1GKllGq/+cYsk6m5WhGdk58='
.. code-block:: yam
.. code-block:: yaml
# a salt developers minion could have pillar data that includes a nacl public key
nacl.config:

View file

@ -19,6 +19,7 @@ import logging
# Import Salt libs
import salt.utils.args
import salt.utils.data
import salt.utils.path
import salt.utils.platform
from salt.exceptions import SaltInvocationError
@ -380,7 +381,15 @@ def do(cmdline, runas=None, env=None):
if not env:
env = {}
env['PATH'] = '{0}/shims:{1}'.format(path, os.environ['PATH'])
# NOTE: Env vars (and their values) need to be str type on both Python 2
# and 3. The code below first normalizes all path components to unicode to
# stitch them together, and then converts the result back to a str type.
env[str('PATH')] = salt.utils.stringutils.to_str( # future lint: disable=blacklisted-function
os.pathsep.join((
salt.utils.path.join(path, 'shims'),
salt.utils.stringutils.to_unicode(os.environ['PATH'])
))
)
try:
cmdline = salt.utils.args.shlex_split(cmdline)

View file

@ -99,6 +99,15 @@ def __virtual__():
'RedHat-based distros >= version 7 use systemd, will not '
'load rh_service.py as virtual \'service\''
)
if __grains__['os'] == 'Amazon':
if int(osrelease_major) in (2016, 2017):
return __virtualname__
else:
return (
False,
'Amazon Linux >= version 2 uses systemd. Will not '
'load rh_service.py as virtual \'service\''
)
return __virtualname__
return (False, 'Cannot load rh_service module: OS not in {0}'.format(enable))

View file

@ -735,14 +735,14 @@ def get_config_file():
return __SYSLOG_NG_CONFIG_FILE
def _run_command(cmd, options=()):
def _run_command(cmd, options=(), env=None):
'''
Runs the command cmd with options as its CLI parameters and returns the
result as a dictionary.
'''
params = [cmd]
params.extend(options)
return __salt__['cmd.run_all'](params, python_shell=False)
return __salt__['cmd.run_all'](params, env=env, python_shell=False)
def _determine_config_version(syslog_ng_sbin_dir):
@ -785,49 +785,26 @@ def set_parameters(version=None,
return _format_return_data(0)
def _add_to_path_envvar(directory):
'''
Adds directory to the PATH environment variable and returns the original
one.
'''
orig_path = os.environ.get('PATH', '')
if directory:
if not os.path.isdir(directory):
log.error('The given parameter is not a directory')
os.environ['PATH'] = '{0}{1}{2}'.format(orig_path,
os.pathsep,
directory)
return orig_path
def _restore_path_envvar(original):
'''
Sets the PATH environment variable to the parameter.
'''
if original:
os.environ['PATH'] = original
def _run_command_in_extended_path(syslog_ng_sbin_dir, command, params):
'''
Runs the given command in an environment, where the syslog_ng_sbin_dir is
added then removed from the PATH.
Runs the specified command with the syslog_ng_sbin_dir in the PATH
'''
orig_path = _add_to_path_envvar(syslog_ng_sbin_dir)
if not salt.utils.path.which(command):
error_message = (
'Unable to execute the command \'{0}\'. It is not in the PATH.'
.format(command)
)
log.error(error_message)
_restore_path_envvar(orig_path)
raise CommandExecutionError(error_message)
ret = _run_command(command, options=params)
_restore_path_envvar(orig_path)
return ret
orig_path = os.environ.get('PATH', '')
env = None
if syslog_ng_sbin_dir:
# Custom environment variables should be str types. This code
# normalizes the paths to unicode to join them together, and then
# converts back to a str type.
env = {
str('PATH'): salt.utils.stringutils.to_str( # future lint: disable=blacklisted-function
os.pathsep.join(
salt.utils.data.decode(
(orig_path, syslog_ng_sbin_dir)
)
)
)
}
return _run_command(command, options=params, env=env)
def _format_return_data(retcode, stdout=None, stderr=None):

View file

@ -25,9 +25,6 @@ import salt.utils.path
import salt.utils.platform
from salt.exceptions import CommandExecutionError, SaltInvocationError
# Import 3rd-party libs
from salt.ext import six
__virtualname__ = 'system'
@ -509,7 +506,6 @@ def get_computer_desc():
salt '*' system.get_computer_desc
'''
desc = None
hostname_cmd = salt.utils.path.which('hostnamectl')
if hostname_cmd:
desc = __salt__['cmd.run'](
@ -517,6 +513,7 @@ def get_computer_desc():
python_shell=False
)
else:
desc = None
pattern = re.compile(r'^\s*PRETTY_HOSTNAME=(.*)$')
try:
with salt.utils.files.fopen('/etc/machine-info', 'r') as mach_info:
@ -528,12 +525,12 @@ def get_computer_desc():
desc = _strip_quotes(match.group(1).strip())
# no break so we get the last occurance
except IOError:
pass
if desc is None:
return False
if six.PY3:
desc = desc.replace('\\"', '"')
else:
desc = desc.replace('\\"', '"').decode('string_escape')
return desc
return desc.replace(r'\"', r'"').replace(r'\n', '\n').replace(r'\t', '\t')
def set_computer_desc(desc):
@ -551,10 +548,9 @@ def set_computer_desc(desc):
salt '*' system.set_computer_desc "Michael's laptop"
'''
if six.PY3:
desc = desc.replace('"', '\\"')
else:
desc = desc.encode('string_escape').replace('"', '\\"')
desc = salt.utils.stringutils.to_unicode(
desc).replace('"', r'\"').replace('\n', r'\n').replace('\t', r'\t')
hostname_cmd = salt.utils.path.which('hostnamectl')
if hostname_cmd:
result = __salt__['cmd.retcode'](
@ -567,23 +563,22 @@ def set_computer_desc(desc):
with salt.utils.files.fopen('/etc/machine-info', 'w'):
pass
is_pretty_hostname_found = False
pattern = re.compile(r'^\s*PRETTY_HOSTNAME=(.*)$')
new_line = 'PRETTY_HOSTNAME="{0}"'.format(desc)
new_line = salt.utils.stringutils.to_str('PRETTY_HOSTNAME="{0}"'.format(desc))
try:
with salt.utils.files.fopen('/etc/machine-info', 'r+') as mach_info:
lines = mach_info.readlines()
for i, line in enumerate(lines):
if pattern.match(line):
is_pretty_hostname_found = True
if pattern.match(salt.utils.stringutils.to_unicode(line)):
lines[i] = new_line
if not is_pretty_hostname_found:
break
else:
# PRETTY_HOSTNAME line was not found, add it to the end
lines.append(new_line)
# time to write our changes to the file
mach_info.seek(0, 0)
mach_info.truncate()
mach_info.write(salt.utils.stringutils.to_str(''.join(lines)))
mach_info.write(salt.utils.stringutils.to_str('\n'))
mach_info.writelines(lines)
return True
except IOError:
return False

View file

@ -6,7 +6,7 @@ Note that not all Windows applications will rehash the PATH environment variable
Only the ones that listen to the WM_SETTINGCHANGE message
http://support.microsoft.com/kb/104011
'''
from __future__ import absolute_import, unicode_literals, print_function
from __future__ import absolute_import, print_function, unicode_literals
# Import Python libs
import logging
@ -14,7 +14,10 @@ import os
import re
# Import Salt libs
import salt.utils.args
import salt.utils.data
import salt.utils.platform
import salt.utils.stringutils
# Import 3rd-party libs
from salt.ext.six.moves import map
@ -28,6 +31,12 @@ except ImportError:
# Settings
log = logging.getLogger(__name__)
HIVE = 'HKEY_LOCAL_MACHINE'
KEY = 'SYSTEM\\CurrentControlSet\\Control\\Session Manager\\Environment'
VNAME = 'PATH'
VTYPE = 'REG_EXPAND_SZ'
PATHSEP = str(os.pathsep) # future lint: disable=blacklisted-function
def __virtual__():
'''
@ -38,16 +47,17 @@ def __virtual__():
return (False, "Module win_path: module only works on Windows systems")
def _normalize_dir(string):
def _normalize_dir(string_):
'''
Normalize the directory to make comparison possible
'''
return re.sub(r'\\$', '', string.lower())
return re.sub(r'\\$', '', salt.utils.stringutils.to_unicode(string_))
def rehash():
'''
Send a WM_SETTINGCHANGE Broadcast to Windows to refresh the Environment variables
Send a WM_SETTINGCHANGE Broadcast to Windows to refresh the Environment
variables
CLI Example:
@ -68,9 +78,12 @@ def get_path():
salt '*' win_path.get_path
'''
ret = __salt__['reg.read_value']('HKEY_LOCAL_MACHINE',
'SYSTEM\\CurrentControlSet\\Control\\Session Manager\\Environment',
'PATH')['vdata'].split(';')
ret = salt.utils.stringutils.to_unicode(
__salt__['reg.read_value'](
'HKEY_LOCAL_MACHINE',
'SYSTEM\\CurrentControlSet\\Control\\Session Manager\\Environment',
'PATH')['vdata']
).split(';')
# Trim ending backslash
return list(map(_normalize_dir, ret))
@ -95,17 +108,30 @@ def exists(path):
path = _normalize_dir(path)
sysPath = get_path()
return path in sysPath
return path.lower() in (x.lower() for x in sysPath)
def add(path, index=0):
def _update_local_path(local_path):
os.environ[str('PATH')] = PATHSEP.join(local_path) # future lint: disable=blacklisted-function
def add(path, index=None, **kwargs):
'''
Add the directory to the SYSTEM path in the index location
Add the directory to the SYSTEM path in the index location. Returns
``True`` if successful, otherwise ``False``.
Returns:
boolean True if successful, False if unsuccessful
path
Directory to add to path
CLI Example:
index
Optionally specify an index at which to insert the directory
rehash : True
If the registry was updated, and this value is set to ``True``, sends a
WM_SETTINGCHANGE broadcast to refresh the environment variables. Set
this to ``False`` to skip this broadcast.
CLI Examples:
.. code-block:: bash
@ -115,56 +141,157 @@ def add(path, index=0):
# Will add to the end of the path
salt '*' win_path.add 'c:\\python27' index='-1'
'''
currIndex = -1
sysPath = get_path()
kwargs = salt.utils.args.clean_kwargs(**kwargs)
rehash_ = kwargs.pop('rehash', True)
if kwargs:
salt.utils.args.invalid_kwargs(kwargs)
path = _normalize_dir(path)
index = int(index)
path_str = salt.utils.stringutils.to_str(path)
system_path = get_path()
# validate index boundaries
if index < 0:
index = len(sysPath) + index + 1
if index > len(sysPath):
index = len(sysPath)
# The current path should not have any unicode in it, but don't take any
# chances.
local_path = [
salt.utils.stringutils.to_str(x)
for x in os.environ['PATH'].split(PATHSEP)
]
localPath = os.environ["PATH"].split(os.pathsep)
if path not in localPath:
localPath.append(path)
os.environ["PATH"] = os.pathsep.join(localPath)
if index is not None:
try:
index = int(index)
except (TypeError, ValueError):
index = None
# Check if we are in the system path at the right location
try:
currIndex = sysPath.index(path)
if currIndex != index:
sysPath.pop(currIndex)
def _check_path(dirs, path, index):
'''
Check the dir list for the specified path, at the specified index, and
make changes to the list if needed. Return True if changes were made to
the list, otherwise return False.
'''
dirs_lc = [x.lower() for x in dirs]
try:
# Check index with case normalized
cur_index = dirs_lc.index(path.lower())
except ValueError:
cur_index = None
num_dirs = len(dirs)
# if pos is None, we don't care about where the directory is in the
# PATH. If it is a number, then that number is the index to be used for
# insertion (this number will be different from the index if the index
# is less than -1, for reasons explained in the comments below). If it
# is the string 'END', then the directory must be at the end of the
# PATH, so it should be removed before appending if it is anywhere but
# the end.
pos = index
if index is not None:
if index >= num_dirs or index == -1:
# Set pos to 'END' so we know that we're moving the directory
# if it exists and isn't already at the end.
pos = 'END'
elif index <= -num_dirs:
# Negative index is too large, shift index to beginning of list
index = pos = 0
elif index <= 0:
# Negative indexes (other than -1 which is handled above) must
# be inserted at index + 1 for the item to end up in the
# position you want, since list.insert() inserts before the
# index passed to it. For example:
#
# >>> x = ['one', 'two', 'four', 'five']
# >>> x.insert(-3, 'three')
# >>> x
# ['one', 'three', 'two', 'four', 'five']
# >>> x = ['one', 'two', 'four', 'five']
# >>> x.insert(-2, 'three')
# >>> x
# ['one', 'two', 'three', 'four', 'five']
pos += 1
if pos == 'END':
if cur_index is not None:
if cur_index == num_dirs - 1:
# Directory is already in the desired location, no changes
# need to be made.
return False
else:
# Remove from current location and add it to the end
dirs.pop(cur_index)
dirs.append(path)
return True
else:
# Doesn't exist in list, add it to the end
dirs.append(path)
return True
elif index is None:
# If index is None, that means that if the path is not already in
# list, we will be appending it to the end instead of inserting it
# somewhere in the middle.
if cur_index is not None:
# Directory is already in the PATH, no changes need to be made.
return False
else:
# Directory not in the PATH, and we're not enforcing the index.
# Append it to the list.
dirs.append(path)
return True
else:
return True
except ValueError:
pass
# Add it to the Path
sysPath.insert(index, path)
regedit = __salt__['reg.set_value'](
'HKEY_LOCAL_MACHINE',
'SYSTEM\\CurrentControlSet\\Control\\Session Manager\\Environment',
'PATH',
';'.join(sysPath),
'REG_EXPAND_SZ'
)
# Broadcast WM_SETTINGCHANGE to Windows
if regedit:
return rehash()
else:
if cur_index is not None:
if (index < 0 and cur_index != (num_dirs + index)) \
or (index >= 0 and cur_index != index):
# Directory is present, but not at the desired index.
# Remove it from the non-normalized path list and insert it
# at the correct postition.
dirs.pop(cur_index)
dirs.insert(pos, path)
return True
else:
# Directory is present and its position matches the desired
# index. No changes need to be made.
return False
else:
# Insert the path at the desired index.
dirs.insert(pos, path)
return True
return False
if _check_path(local_path, path_str, index):
_update_local_path(local_path)
def remove(path):
if not _check_path(system_path, path, index):
# No changes necessary
return True
# Move forward with registry update
result = __salt__['reg.set_value'](
HIVE,
KEY,
VNAME,
';'.join(salt.utils.data.decode(system_path)),
VTYPE
)
if result and rehash_:
# Broadcast WM_SETTINGCHANGE to Windows if registry was updated
return rehash()
else:
return result
def remove(path, **kwargs):
r'''
Remove the directory from the SYSTEM path
Returns:
boolean True if successful, False if unsuccessful
rehash : True
If the registry was updated, and this value is set to ``True``, sends a
WM_SETTINGCHANGE broadcast to refresh the environment variables. Set
this to ``False`` to skip this broadcast.
CLI Example:
.. code-block:: bash
@ -172,27 +299,58 @@ def remove(path):
# Will remove C:\Python27 from the path
salt '*' win_path.remove 'c:\\python27'
'''
kwargs = salt.utils.args.clean_kwargs(**kwargs)
rehash_ = kwargs.pop('rehash', True)
if kwargs:
salt.utils.args.invalid_kwargs(kwargs)
path = _normalize_dir(path)
sysPath = get_path()
path_str = salt.utils.stringutils.to_str(path)
system_path = get_path()
localPath = os.environ["PATH"].split(os.pathsep)
if path in localPath:
localPath.remove(path)
os.environ["PATH"] = os.pathsep.join(localPath)
# The current path should not have any unicode in it, but don't take any
# chances.
local_path = [
salt.utils.stringutils.to_str(x)
for x in os.environ['PATH'].split(PATHSEP)
]
try:
sysPath.remove(path)
except ValueError:
def _check_path(dirs, path):
'''
Check the dir list for the specified path, and make changes to the list
if needed. Return True if changes were made to the list, otherwise
return False.
'''
dirs_lc = [x.lower() for x in dirs]
path_lc = path.lower()
new_dirs = []
for index, dirname in enumerate(dirs_lc):
if path_lc != dirname:
new_dirs.append(dirs[index])
if len(new_dirs) != len(dirs):
dirs[:] = new_dirs[:]
return True
else:
return False
if _check_path(local_path, path_str):
_update_local_path(local_path)
if not _check_path(system_path, path):
# No changes necessary
return True
regedit = __salt__['reg.set_value'](
'HKEY_LOCAL_MACHINE',
'SYSTEM\\CurrentControlSet\\Control\\Session Manager\\Environment',
'PATH',
';'.join(sysPath),
'REG_EXPAND_SZ'
result = __salt__['reg.set_value'](
HIVE,
KEY,
VNAME,
';'.join(salt.utils.data.decode(system_path)),
VTYPE
)
if regedit:
if result and rehash_:
# Broadcast WM_SETTINGCHANGE to Windows if registry was updated
return rehash()
else:
return False
return result

View file

@ -84,7 +84,7 @@ without extra parameters:
salt-run nacl.enc 'asecretpass'
salt-run nacl.dec 'tqXzeIJnTAM9Xf0mdLcpEdklMbfBGPj2oTKmlgrm3S1DTVVHNnh9h8mU1GKllGq/+cYsk6m5WhGdk58='
.. code-block:: yam
.. code-block:: yaml
# a salt developers minion could have pillar data that includes a nacl public key
nacl.config:

View file

@ -2,11 +2,13 @@
'''
Manage the Windows System PATH
'''
from __future__ import absolute_import, unicode_literals, print_function
from __future__ import absolute_import, print_function, unicode_literals
# Python Libs
import re
import os
# Import Salt libs
import salt.utils.stringutils
# Import 3rd-party libs
from salt.ext import six
def __virtual__():
@ -16,11 +18,9 @@ def __virtual__():
return 'win_path' if 'win_path.rehash' in __salt__ else False
def _normalize_dir(string):
'''
Normalize the directory to make comparison possible
'''
return re.sub(r'\\$', '', string.lower())
def _format_comments(ret, comments):
ret['comment'] = ' '.join(comments)
return ret
def absent(name):
@ -41,23 +41,24 @@ def absent(name):
'changes': {},
'comment': ''}
localPath = os.environ["PATH"].split(os.pathsep)
if name in localPath:
localPath.remove(name)
os.environ["PATH"] = os.pathsep.join(localPath)
if __salt__['win_path.exists'](name):
ret['changes']['removed'] = name
else:
if not __salt__['win_path.exists'](name):
ret['comment'] = '{0} is not in the PATH'.format(name)
return ret
if __opts__['test']:
ret['comment'] = '{0} would be removed from the PATH'.format(name)
ret['result'] = None
return ret
ret['result'] = __salt__['win_path.remove'](name)
if not ret['result']:
ret['comment'] = 'could not remove {0} from the PATH'.format(name)
__salt__['win_path.remove'](name)
if __salt__['win_path.exists'](name):
ret['comment'] = 'Failed to remove {0} from the PATH'.format(name)
ret['result'] = False
else:
ret['comment'] = 'Removed {0} from the PATH'.format(name)
ret['changes']['removed'] = name
return ret
@ -65,12 +66,16 @@ def exists(name, index=None):
'''
Add the directory to the system PATH at index location
index: where the directory should be placed in the PATH (default: None).
This is 0-indexed, so 0 means to prepend at the very start of the PATH.
[Note: Providing no index will append directory to PATH and
will not enforce its location within the PATH.]
index
Position where the directory should be placed in the PATH. This is
0-indexed, so 0 means to prepend at the very start of the PATH.
Example:
.. note::
If the index is not specified, and the directory needs to be added
to the PATH, then the directory will be appended to the PATH, and
this state will not enforce its location within the PATH.
Examples:
.. code-block:: yaml
@ -80,53 +85,149 @@ def exists(name, index=None):
'C:\\sysinternals':
win_path.exists:
- index: 0
'C:\\mystuff':
win_path.exists:
- index: -1
'''
try:
name = salt.utils.stringutils.to_unicode(name)
except TypeError:
name = six.text_type(name)
ret = {'name': name,
'result': True,
'changes': {},
'comment': ''}
# determine what to do
sysPath = __salt__['win_path.get_path']()
path = _normalize_dir(name)
localPath = os.environ["PATH"].split(os.pathsep)
if path not in localPath:
localPath.append(path)
os.environ["PATH"] = os.pathsep.join(localPath)
try:
currIndex = sysPath.index(path)
if index is not None:
index = int(index)
if index < 0:
index = len(sysPath) + index + 1
if index > len(sysPath):
index = len(sysPath)
# check placement within PATH
if currIndex != index:
sysPath.pop(currIndex)
ret['changes']['removed'] = '{0} was removed from index {1}'.format(name, currIndex)
else:
ret['comment'] = '{0} is already present in the PATH at the right location'.format(name)
return ret
else: # path is in system PATH; don't care where
ret['comment'] = '{0} is already present in the PATH at the right location'.format(name)
return ret
except ValueError:
pass
if index is None:
index = len(sysPath) # put it at the end
ret['changes']['added'] = '{0} will be added at index {1}'.format(name, index)
if __opts__['test']:
ret['result'] = None
if index is not None and not isinstance(index, six.integer_types):
ret['comment'] = 'Index must be an integer'
ret['result'] = False
return ret
# Add it
ret['result'] = __salt__['win_path.add'](path, index)
if not ret['result']:
ret['comment'] = 'could not add {0} to the PATH'.format(name)
def _get_path_lowercase():
return [x.lower() for x in __salt__['win_path.get_path']()]
def _index(path=None):
if path is None:
path = _get_path_lowercase()
try:
pos = path.index(name.lower())
except ValueError:
return None
else:
if index is not None and index < 0:
# Since a negative index was used, convert the index to a
# negative index to make the changes dict easier to read, as
# well as making comparisons manageable.
return -(len(path) - pos)
else:
return pos
def _changes(old, new):
return {'index': {'old': old, 'new': new}}
pre_path = _get_path_lowercase()
num_dirs = len(pre_path)
if index is not None:
if index > num_dirs:
ret.setdefault('warnings', []).append(
'There are only {0} directories in the PATH, using an index '
'of {0} instead of {1}.'.format(num_dirs, index)
)
index = num_dirs
elif index <= -num_dirs:
ret.setdefault('warnings', []).append(
'There are only {0} directories in the PATH, using an index '
'of 0 instead of {1}.'.format(num_dirs, index)
)
index = 0
old_index = _index(pre_path)
comments = []
if old_index is not None:
# Directory exists in PATH
if index is None:
# We're not enforcing the index, and the directory is in the PATH.
# There's nothing to do here.
comments.append('{0} already exists in the PATH.'.format(name))
return _format_comments(ret, comments)
else:
if index == old_index:
comments.append(
'{0} already exists in the PATH at index {1}.'.format(
name, index
)
)
return _format_comments(ret, comments)
else:
if __opts__['test']:
ret['result'] = None
comments.append(
'{0} would be moved from index {1} to {2}.'.format(
name, old_index, index
)
)
ret['changes'] = _changes(old_index, index)
return _format_comments(ret, comments)
else:
ret['changes']['added'] = '{0} was added at index {1}'.format(name, index)
return ret
# Directory does not exist in PATH
if __opts__['test']:
ret['result'] = None
comments.append(
'{0} would be added to the PATH{1}.'.format(
name,
' at index {0}'.format(index) if index is not None else ''
)
)
ret['changes'] = _changes(old_index, index)
return _format_comments(ret, comments)
try:
ret['result'] = __salt__['win_path.add'](name, index=index, rehash=False)
except Exception as exc:
comments.append('Encountered error: {0}.'.format(exc))
ret['result'] = False
if ret['result']:
ret['result'] = __salt__['win_path.rehash']()
if not ret['result']:
comments.append(
'Updated registry with new PATH, but failed to rehash.'
)
new_index = _index()
if ret['result']:
# If we have not already determined a False result based on the return
# from either win_path.add or win_path.rehash, check the new_index.
ret['result'] = new_index is not None \
if index is None \
else index == new_index
if index is not None and old_index is not None:
comments.append(
'{0} {1} from index {2} to {3}.'.format(
'Moved' if ret['result'] else 'Failed to move',
name,
old_index,
index
)
)
else:
comments.append(
'{0} {1} to the PATH{2}.'.format(
'Added' if ret['result'] else 'Failed to add',
name,
' at index {0}'.format(index) if index else ''
)
)
if old_index != new_index:
ret['changes'] = _changes(old_index, new_index)
return _format_comments(ret, comments)

View file

@ -67,26 +67,41 @@ def compare_lists(old=None, new=None):
return ret
def decode(data, encoding=None, errors='strict', preserve_dict_class=False, preserve_tuples=False):
def decode(data, encoding=None, errors='strict', keep=False,
preserve_dict_class=False, preserve_tuples=False):
'''
Generic function which will decode whichever type is passed, if necessary
If `strict` is True, and `keep` is False, and we fail to decode, a
UnicodeDecodeError will be raised. Passing `keep` as True allows for the
original value to silently be returned in cases where decoding fails. This
can be useful for cases where the data passed to this function is likely to
contain binary blobs, such as in the case of cp.recv.
'''
if isinstance(data, collections.Mapping):
return decode_dict(data, encoding, errors, preserve_dict_class, preserve_tuples)
return decode_dict(data, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
elif isinstance(data, list):
return decode_list(data, encoding, errors, preserve_dict_class, preserve_tuples)
return decode_list(data, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
elif isinstance(data, tuple):
return decode_tuple(data, encoding, errors, preserve_dict_class) \
return decode_tuple(data, encoding, errors, keep, preserve_dict_class) \
if preserve_tuples \
else decode_list(data, encoding, errors, preserve_dict_class, preserve_tuples)
else decode_list(data, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
else:
try:
return salt.utils.stringutils.to_unicode(data, encoding, errors)
except TypeError:
return data
pass
except UnicodeDecodeError:
if not keep:
raise
return data
def decode_dict(data, encoding=None, errors='strict', preserve_dict_class=False, preserve_tuples=False):
def decode_dict(data, encoding=None, errors='strict', keep=False,
preserve_dict_class=False, preserve_tuples=False):
'''
Decode all string values to Unicode
'''
@ -94,114 +109,158 @@ def decode_dict(data, encoding=None, errors='strict', preserve_dict_class=False,
rv = data.__class__() if preserve_dict_class else {}
for key, value in six.iteritems(data):
if isinstance(key, tuple):
key = decode_tuple(key, encoding, errors, preserve_dict_class) \
key = decode_tuple(key, encoding, errors, keep, preserve_dict_class) \
if preserve_tuples \
else decode_list(key, encoding, errors, preserve_dict_class, preserve_tuples)
else decode_list(key, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
else:
try:
key = salt.utils.stringutils.to_unicode(key, encoding, errors)
except TypeError:
pass
except UnicodeDecodeError:
if not keep:
raise
if isinstance(value, list):
value = decode_list(value, encoding, errors, preserve_dict_class, preserve_tuples)
value = decode_list(value, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
elif isinstance(value, tuple):
value = decode_tuple(value, encoding, errors, preserve_dict_class) \
value = decode_tuple(value, encoding, errors, keep, preserve_dict_class) \
if preserve_tuples \
else decode_list(value, encoding, errors, preserve_dict_class, preserve_tuples)
else decode_list(value, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
elif isinstance(value, collections.Mapping):
value = decode_dict(value, encoding, errors, preserve_dict_class, preserve_tuples)
value = decode_dict(value, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
else:
try:
value = salt.utils.stringutils.to_unicode(value, encoding, errors)
except TypeError:
pass
except UnicodeDecodeError:
if not keep:
raise
rv[key] = value
return rv
def decode_list(data, encoding=None, errors='strict', preserve_dict_class=False, preserve_tuples=False):
def decode_list(data, encoding=None, errors='strict', keep=False,
preserve_dict_class=False, preserve_tuples=False):
'''
Decode all string values to Unicode
'''
rv = []
for item in data:
if isinstance(item, list):
item = decode_list(item, encoding, errors, preserve_dict_class, preserve_tuples)
item = decode_list(item, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
elif isinstance(item, tuple):
item = decode_tuple(item, encoding, errors, preserve_dict_class) \
item = decode_tuple(item, encoding, errors, keep, preserve_dict_class) \
if preserve_tuples \
else decode_list(item, encoding, errors, preserve_dict_class, preserve_tuples)
else decode_list(item, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
elif isinstance(item, collections.Mapping):
item = decode_dict(item, encoding, errors, preserve_dict_class, preserve_tuples)
item = decode_dict(item, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
else:
try:
item = salt.utils.stringutils.to_unicode(item, encoding, errors)
except TypeError:
pass
except UnicodeDecodeError:
if not keep:
raise
rv.append(item)
return rv
def decode_tuple(data, encoding=None, errors='strict', preserve_dict_class=False):
def decode_tuple(data, encoding=None, errors='strict', keep=False,
preserve_dict_class=False):
'''
Decode all string values to Unicode
'''
return tuple(decode_list(data, encoding, errors, preserve_dict_class, True))
return tuple(
decode_list(data, encoding, errors, keep, preserve_dict_class, True))
def encode(data, encoding=None, errors='strict', preserve_dict_class=False, preserve_tuples=False):
def encode(data, encoding=None, errors='strict', keep=False,
preserve_dict_class=False, preserve_tuples=False):
'''
Generic function which will encode whichever type is passed, if necessary
If `strict` is True, and `keep` is False, and we fail to encode, a
UnicodeEncodeError will be raised. Passing `keep` as True allows for the
original value to silently be returned in cases where encoding fails. This
can be useful for cases where the data passed to this function is likely to
contain binary blobs.
'''
if isinstance(data, collections.Mapping):
return encode_dict(data, encoding, errors, preserve_dict_class, preserve_tuples)
return encode_dict(data, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
elif isinstance(data, list):
return encode_list(data, encoding, errors, preserve_dict_class, preserve_tuples)
return encode_list(data, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
elif isinstance(data, tuple):
return encode_tuple(data, encoding, errors, preserve_dict_class) \
return encode_tuple(data, encoding, errors, keep, preserve_dict_class) \
if preserve_tuples \
else encode_list(data, encoding, errors, preserve_dict_class, preserve_tuples)
else encode_list(data, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
else:
try:
return salt.utils.stringutils.to_bytes(data, encoding, errors)
except TypeError:
return data
pass
except UnicodeEncodeError:
if not keep:
raise
return data
@jinja_filter('json_decode_dict') # Remove this for Neon
@jinja_filter('json_encode_dict')
def encode_dict(data, encoding=None, errors='strict', preserve_dict_class=False, preserve_tuples=False):
def encode_dict(data, encoding=None, errors='strict', keep=False,
preserve_dict_class=False, preserve_tuples=False):
'''
Encode all string values to bytes
'''
rv = data.__class__() if preserve_dict_class else {}
for key, value in six.iteritems(data):
if isinstance(key, tuple):
key = encode_tuple(key, encoding, errors, preserve_dict_class) \
key = encode_tuple(key, encoding, errors, keep, preserve_dict_class) \
if preserve_tuples \
else encode_list(key, encoding, errors, preserve_dict_class, preserve_tuples)
else encode_list(key, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
else:
try:
key = salt.utils.stringutils.to_bytes(key, encoding, errors)
except TypeError:
pass
except UnicodeEncodeError:
if not keep:
raise
if isinstance(value, list):
value = encode_list(value, encoding, errors, preserve_dict_class, preserve_tuples)
value = encode_list(value, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
elif isinstance(value, tuple):
value = encode_tuple(value, encoding, errors, preserve_dict_class) \
value = encode_tuple(value, encoding, errors, keep, preserve_dict_class) \
if preserve_tuples \
else encode_list(value, encoding, errors, preserve_dict_class, preserve_tuples)
else encode_list(value, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
elif isinstance(value, collections.Mapping):
value = encode_dict(value, encoding, errors, preserve_dict_class, preserve_tuples)
value = encode_dict(value, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
else:
try:
value = salt.utils.stringutils.to_bytes(value, encoding, errors)
except TypeError:
pass
except UnicodeEncodeError:
if not keep:
raise
rv[key] = value
return rv
@ -209,35 +268,44 @@ def encode_dict(data, encoding=None, errors='strict', preserve_dict_class=False,
@jinja_filter('json_decode_list') # Remove this for Neon
@jinja_filter('json_encode_list')
def encode_list(data, encoding=None, errors='strict', preserve_dict_class=False, preserve_tuples=False):
def encode_list(data, encoding=None, errors='strict', keep=False,
preserve_dict_class=False, preserve_tuples=False):
'''
Encode all string values to bytes
'''
rv = []
for item in data:
if isinstance(item, list):
item = encode_list(item, encoding, errors, preserve_dict_class, preserve_tuples)
item = encode_list(item, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
elif isinstance(item, tuple):
item = encode_tuple(item, encoding, errors, preserve_dict_class) \
item = encode_tuple(item, encoding, errors, keep, preserve_dict_class) \
if preserve_tuples \
else encode_list(item, encoding, errors, preserve_dict_class, preserve_tuples)
else encode_list(item, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
elif isinstance(item, collections.Mapping):
item = encode_dict(item, encoding, errors, preserve_dict_class, preserve_tuples)
item = encode_dict(item, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
else:
try:
item = salt.utils.stringutils.to_bytes(item, encoding, errors)
except TypeError:
pass
except UnicodeEncodeError:
if not keep:
raise
rv.append(item)
return rv
def encode_tuple(data, encoding=None, errors='strict', preserve_dict_class=False):
def encode_tuple(data, encoding=None, errors='strict', keep=False,
preserve_dict_class=False):
'''
Encode all string values to Unicode
'''
return tuple(encode_list(data, encoding, errors, preserve_dict_class, True))
return tuple(
encode_list(data, encoding, errors, keep, preserve_dict_class, True))
@jinja_filter('exactly_n_true')

View file

@ -14,6 +14,7 @@ from collections import defaultdict
# Import salt libs
import salt.utils.args
import salt.utils.data
from salt.exceptions import CommandExecutionError, SaltConfigurationError
from salt.log import LOG_LEVELS
@ -579,3 +580,19 @@ def ignores_kwargs(*kwarg_names):
return fn(*args, **kwargs_filtered)
return __ignores_kwargs
return _ignores_kwargs
def ensure_unicode_args(function):
'''
Decodes all arguments passed to the wrapped function
'''
@wraps(function)
def wrapped(*args, **kwargs):
if six.PY2:
return function(
*salt.utils.data.decode_list(args),
**salt.utils.data.decode_dict(kwargs)
)
else:
return function(*args, **kwargs)
return wrapped

View file

@ -14,7 +14,6 @@ import posixpath
import re
import string
import struct
import sys
# Import Salt libs
import salt.utils.args
@ -202,7 +201,9 @@ def which(exe=None):
# executable in cwd or fullpath
return exe
ext_list = os.environ.get('PATHEXT', '.EXE').split(';')
ext_list = salt.utils.stringutils.to_str(
os.environ.get('PATHEXT', str('.EXE'))
).split(str(';'))
@real_memoize
def _exe_has_ext():
@ -212,8 +213,13 @@ def which(exe=None):
'''
for ext in ext_list:
try:
pattern = r'.*\.' + ext.lstrip('.') + r'$'
re.match(pattern, exe, re.I).groups()
pattern = r'.*\.{0}$'.format(
salt.utils.stringutils.to_unicode(ext).lstrip('.')
)
re.match(
pattern,
salt.utils.stringutils.to_unicode(exe),
re.I).groups()
return True
except AttributeError:
continue
@ -221,13 +227,17 @@ def which(exe=None):
# Enhance POSIX path for the reliability at some environments, when $PATH is changing
# This also keeps order, where 'first came, first win' for cases to find optional alternatives
search_path = os.environ.get('PATH') and os.environ['PATH'].split(os.pathsep) or list()
for default_path in ['/bin', '/sbin', '/usr/bin', '/usr/sbin', '/usr/local/bin']:
if default_path not in search_path:
search_path.append(default_path)
os.environ['PATH'] = os.pathsep.join(search_path)
system_path = salt.utils.stringutils.to_unicode(os.environ.get('PATH', ''))
search_path = system_path.split(os.pathsep)
if not salt.utils.platform.is_windows():
search_path.extend([
x for x in ('/bin', '/sbin', '/usr/bin',
'/usr/sbin', '/usr/local/bin')
if x not in search_path
])
for path in search_path:
full_path = os.path.join(path, exe)
full_path = join(path, exe)
if _is_executable_file_or_link(full_path):
return full_path
elif salt.utils.platform.is_windows() and not _exe_has_ext():
@ -296,27 +306,12 @@ def join(*parts, **kwargs):
# No args passed to func
return ''
root = salt.utils.stringutils.to_unicode(root)
if not parts:
ret = root
else:
stripped = [p.lstrip(os.sep) for p in parts]
try:
ret = pathlib.join(root, *stripped)
except UnicodeDecodeError:
# This is probably Python 2 and one of the parts contains unicode
# characters in a bytestring. First try to decode to the system
# encoding.
try:
enc = __salt_system_encoding__
except NameError:
enc = sys.stdin.encoding or sys.getdefaultencoding()
try:
ret = pathlib.join(root.decode(enc),
*[x.decode(enc) for x in stripped])
except UnicodeDecodeError:
# Last resort, try UTF-8
ret = pathlib.join(root.decode('UTF-8'),
*[x.decode('UTF-8') for x in stripped])
ret = pathlib.join(root, *salt.utils.data.decode(stripped))
return pathlib.normpath(ret)

View file

@ -7,7 +7,6 @@ Set up the version of Salt
from __future__ import absolute_import, print_function, unicode_literals
import re
import sys
import locale
import platform
# linux_distribution depreacted in py3.7
@ -674,7 +673,7 @@ def system_information():
('release', release),
('machine', platform.machine()),
('version', version),
('locale', locale.getpreferredencoding()),
('locale', __salt_system_encoding__),
]
for name, attr in system:

View file

@ -522,7 +522,8 @@ class Sdist(sdist):
self.run_command('write_salt_ssh_packaging_file')
self.filelist.files.append(os.path.basename(PACKAGED_FOR_SALT_SSH_FILE))
sdist.make_release_tree(self, base_dir, files)
pkgfiles = [pkgfile if IS_PY3 else pkgfile.decode(__salt_system_encoding__) for pkgfile in files]
sdist.make_release_tree(self, base_dir, pkgfiles)
# Let's generate salt/_version.py to include in the sdist tarball
self.distribution.running_salt_sdist = True

View file

@ -6,6 +6,9 @@ from __future__ import absolute_import, print_function, unicode_literals
# Import Salt Testing libs
from tests.support.case import ModuleCase
# Import 3rd-party libs
from salt.ext import six
class StdTest(ModuleCase):
'''
@ -85,10 +88,10 @@ class StdTest(ModuleCase):
'inner': 'value'}
)
data = ret['minion']['ret']
self.assertIn('str', data['args'][0])
self.assertIn(six.text_type.__name__, data['args'][0])
self.assertIn('int', data['args'][1])
self.assertIn('dict', data['kwargs']['outer'])
self.assertIn('str', data['kwargs']['inner'])
self.assertIn(six.text_type.__name__, data['kwargs']['inner'])
def test_full_return_kwarg(self):
ret = self.client.cmd('minion', 'test.ping', full_return=True)

View file

@ -0,0 +1 @@

View file

@ -7,6 +7,7 @@ import logging
import os
import signal
import subprocess
import textwrap
# Import Salt Testing libs
from tests.support.case import ModuleCase
@ -318,7 +319,7 @@ class SystemModuleTest(ModuleCase):
@skip_if_not_root
def test_set_computer_desc(self):
'''
Test setting the system hostname
Test setting the computer description
'''
self._save_machine_info()
desc = "test"
@ -328,6 +329,28 @@ class SystemModuleTest(ModuleCase):
self.assertTrue(ret)
self.assertIn(desc, computer_desc)
@destructiveTest
@skip_if_not_root
def test_set_computer_desc_multiline(self):
'''
Test setting the computer description with a multiline string with tabs
and double-quotes.
'''
self._save_machine_info()
desc = textwrap.dedent('''\
'First Line
\tSecond Line: 'single-quoted string'
\t\tThird Line: "double-quoted string with unicode: питон"''')
ret = self.run_function('system.set_computer_desc', [desc])
# self.run_function returns the serialized return, we need to convert
# back to unicode to compare to desc. in the assertIn below.
computer_desc = salt.utils.stringutils.to_unicode(
self.run_function('system.get_computer_desc')
)
self.assertTrue(ret)
self.assertIn(desc, computer_desc)
@skip_if_not_root
def test_has_hwclock(self):
'''

View file

@ -1170,6 +1170,27 @@ class FileTest(ModuleCase, SaltReturnAssertsMixin):
finally:
shutil.rmtree(name, ignore_errors=True)
def test_recurse_issue_40578(self):
'''
This ensures that the state doesn't raise an exception when it
encounters a file with a unicode filename in the process of invoking
file.source_list.
'''
issue_dir = 'issue-40578'
name = os.path.join(TMP, issue_dir)
try:
ret = self.run_state('file.recurse',
name=name,
source='salt://соль')
self.assertSaltTrueReturn(ret)
self.assertEqual(
sorted(salt.utils.data.decode(os.listdir(name))),
sorted(['foo.txt', 'спам.txt', 'яйца.txt'])
)
finally:
shutil.rmtree(name, ignore_errors=True)
def test_replace(self):
'''
file.replace
@ -2214,8 +2235,6 @@ class FileTest(ModuleCase, SaltReturnAssertsMixin):
'+마지막 행\n'
)
diff = salt.utils.stringutils.to_str(diff)
# using unicode.encode('utf-8') we should get the same as
# an utf-8 string
# future_lint: disable=blacklisted-function
expected = {
str('file_|-some-utf8-file-create_|-{0}_|-managed').format(test_file_encoded): {

View file

@ -16,17 +16,43 @@ from tests.support.mock import (
NO_MOCK,
NO_MOCK_REASON
)
try:
import pytest
except ImportError as import_error:
pytest = None
# Import Salt Libs
import salt.modules.localemod as localemod
from salt.exceptions import CommandExecutionError
from salt.ext import six
@skipIf(not pytest, False)
@skipIf(NO_MOCK, NO_MOCK_REASON)
class LocalemodTestCase(TestCase, LoaderModuleMockMixin):
'''
Test cases for salt.modules.localemod
'''
locale_ctl_out = '''
System Locale: LANG=de_DE.utf8
LANGUAGE=de_DE.utf8
VC Keymap: n/a
X11 Layout: us
X11 Model: pc105
'''
locale_ctl_out_empty = ''
locale_ctl_out_broken = '''
System error:Recursive traversal of loopback mount points
'''
locale_ctl_out_structure = '''
Main: printers=We're upgrading /dev/null
racks=hardware stress fractures
failure=Ionisation from the air-conditioning
Cow say: errors=We're out of slots on the server
hardware=high pressure system failure
Reason: The vendor put the bug there.
'''
def setup_loader_modules(self):
return {localemod: {}}
@ -36,87 +62,451 @@ class LocalemodTestCase(TestCase, LoaderModuleMockMixin):
'''
with patch.dict(localemod.__salt__,
{'cmd.run': MagicMock(return_value='A\nB')}):
self.assertEqual(localemod.list_avail(), ['A', 'B'])
assert localemod.list_avail() == ['A', 'B']
def test_get_locale(self):
@patch('salt.utils.which', MagicMock(return_value="/usr/bin/localctl"))
@patch('salt.modules.localemod.__salt__', {'cmd.run': MagicMock(return_value=locale_ctl_out)})
def test_localectl_status_parser(self):
'''
Test for Get the current system locale
Test localectl status parser.
:return:
'''
with patch.dict(localemod.__context__, {'salt.utils.systemd.booted': True}):
with patch.dict(localemod.__grains__, {'os_family': ['Unknown']}):
with patch.multiple(localemod,
_parse_dbus_locale=MagicMock(return_value={'LANG': 'A'}),
HAS_DBUS=True):
self.assertEqual('A', localemod.get_locale())
localemod._parse_dbus_locale.assert_called_once_with()
out = localemod._localectl_status()
assert isinstance(out, dict)
for key in ['system_locale', 'vc_keymap', 'x11_layout', 'x11_model']:
assert key in out
assert isinstance(out['system_locale'], dict)
assert 'LANG' in out['system_locale']
assert 'LANGUAGE' in out['system_locale']
assert out['system_locale']['LANG'] == out['system_locale']['LANGUAGE'] == 'de_DE.utf8'
assert out['vc_keymap'] == 'n/a'
assert out['x11_layout'] == 'us'
assert out['x11_model'] == 'pc105'
with patch.multiple(localemod,
_parse_localectl=MagicMock(return_value={'LANG': 'A'}),
HAS_DBUS=False):
self.assertEqual('A', localemod.get_locale())
localemod._parse_localectl.assert_called_once_with()
with patch.dict(localemod.__context__, {'salt.utils.systemd.booted': False}):
with patch.dict(localemod.__grains__, {'os_family': ['Gentoo']}):
with patch.dict(localemod.__salt__, {'cmd.run': MagicMock(return_value='A')}):
with patch.object(localemod,
'_parse_localectl',
return_value={'LANG': 'A'}):
self.assertEqual(localemod.get_locale(), 'A')
with patch.dict(localemod.__grains__, {'os_family': ['RedHat']}):
with patch.dict(localemod.__salt__, {'cmd.run': MagicMock(return_value='A=B')}):
with patch.object(localemod,
'_parse_localectl',
return_value={'LANG': 'B'}):
self.assertEqual(localemod.get_locale(), 'B')
with patch.dict(localemod.__grains__, {'os_family': ['Unknown']}):
with patch.dict(localemod.__salt__, {'cmd.run': MagicMock(return_value='A=B')}):
self.assertRaises(CommandExecutionError, localemod.get_locale)
def test_set_locale(self):
@patch('salt.modules.localemod.dbus', MagicMock())
def test_dbus_locale_parser_matches(self):
'''
Test for Sets the current system locale
Test dbus locale status parser matching the results.
:return:
'''
with patch.dict(localemod.__context__, {'salt.utils.systemd.booted': True}):
with patch.dict(localemod.__grains__, {'os_family': ['Unknown']}):
with patch.object(localemod, '_localectl_set', return_value=True):
self.assertTrue(localemod.set_locale('l'))
i_dbus = MagicMock()
i_dbus.Get = MagicMock(return_value=['LANG=de_DE.utf8'])
dbus = MagicMock(return_value=i_dbus)
with patch.dict(localemod.__context__, {'salt.utils.systemd.booted': False}):
with patch.dict(localemod.__grains__, {'os_family': ['Gentoo']}):
with patch.dict(localemod.__salt__, {'cmd.retcode': MagicMock(return_value='A')}):
with patch.object(localemod,
'_parse_localectl',
return_value={'LANG': 'B'}):
self.assertFalse(localemod.set_locale('l'))
with patch('salt.modules.localemod.dbus.Interface', dbus):
out = localemod._parse_dbus_locale()
assert isinstance(out, dict)
assert 'LANG' in out
assert out['LANG'] == 'de_DE.utf8'
with patch.dict(localemod.__grains__, {'os_family': ['A']}):
with patch.dict(localemod.__salt__, {'cmd.retcode': MagicMock(return_value=0)}):
with patch('salt.utils.systemd.booted', return_value=False):
self.assertRaises(CommandExecutionError, localemod.set_locale, 'A')
@patch('salt.modules.localemod.dbus', MagicMock())
@patch('salt.modules.localemod.log', MagicMock())
def test_dbus_locale_parser_doesnot_matches(self):
'''
Test dbus locale status parser does not matching the results.
:return:
'''
i_dbus = MagicMock()
i_dbus.Get = MagicMock(return_value=['Fatal error right in front of screen'])
dbus = MagicMock(return_value=i_dbus)
with patch('salt.modules.localemod.dbus.Interface', dbus):
out = localemod._parse_dbus_locale()
assert isinstance(out, dict)
assert 'LANG' not in out
assert localemod.log.error.called
msg = localemod.log.error.call_args[0][0] % localemod.log.error.call_args[0][1]
assert msg == ('Odd locale parameter "Fatal error right in front of screen" detected in dbus locale output.'
' This should not happen. You should probably investigate what caused this.')
@patch('salt.utils.which', MagicMock(return_value=None))
@patch('salt.modules.localemod.log', MagicMock())
def test_localectl_status_parser_no_systemd(self):
'''
Test localectl status parser raises an exception if no systemd installed.
:return:
'''
with pytest.raises(CommandExecutionError) as err:
localemod._localectl_status()
assert 'Unable to find "localectl"' in six.text_type(err)
assert not localemod.log.debug.called
@patch('salt.utils.which', MagicMock(return_value="/usr/bin/localctl"))
@patch('salt.modules.localemod.__salt__', {'cmd.run': MagicMock(return_value=locale_ctl_out_empty)})
def test_localectl_status_parser_empty(self):
with pytest.raises(CommandExecutionError) as err:
localemod._localectl_status()
assert 'Unable to parse result of "localectl"' in six.text_type(err)
@patch('salt.utils.which', MagicMock(return_value="/usr/bin/localctl"))
@patch('salt.modules.localemod.__salt__', {'cmd.run': MagicMock(return_value=locale_ctl_out_broken)})
def test_localectl_status_parser_broken(self):
with pytest.raises(CommandExecutionError) as err:
localemod._localectl_status()
assert 'Unable to parse result of "localectl"' in six.text_type(err)
@patch('salt.utils.which', MagicMock(return_value="/usr/bin/localctl"))
@patch('salt.modules.localemod.__salt__', {'cmd.run': MagicMock(return_value=locale_ctl_out_structure)})
def test_localectl_status_parser_structure(self):
out = localemod._localectl_status()
assert isinstance(out, dict)
for key in ['main', 'cow_say']:
assert isinstance(out[key], dict)
for in_key in out[key]:
assert isinstance(out[key][in_key], six.text_type)
assert isinstance(out['reason'], six.text_type)
@patch('salt.utils.which', MagicMock(return_value="/usr/bin/localctl"))
@patch('salt.modules.localemod.__grains__', {'os_family': 'Ubuntu', 'osmajorrelease': 42})
@patch('salt.modules.localemod.dbus', None)
@patch('salt.modules.localemod._parse_dbus_locale', MagicMock(return_value={'LANG': 'en_US.utf8'}))
@patch('salt.modules.localemod._localectl_status', MagicMock(return_value={'system_locale': {'LANG': 'de_DE.utf8'}}))
@patch('salt.utils.systemd.booted', MagicMock(return_value=True))
def test_get_locale_with_systemd_nodbus(self):
'''
Test getting current system locale with systemd but no dbus available.
:return:
'''
assert localemod.get_locale() == 'de_DE.utf8'
@patch('salt.utils.which', MagicMock(return_value="/usr/bin/localctl"))
@patch('salt.modules.localemod.__grains__', {'os_family': 'Ubuntu', 'osmajorrelease': 42})
@patch('salt.modules.localemod.dbus', True)
@patch('salt.modules.localemod._parse_dbus_locale', MagicMock(return_value={'LANG': 'en_US.utf8'}))
@patch('salt.modules.localemod._localectl_status', MagicMock(return_value={'system_locale': {'LANG': 'de_DE.utf8'}}))
@patch('salt.utils.systemd.booted', MagicMock(return_value=True))
def test_get_locale_with_systemd_and_dbus(self):
'''
Test getting current system locale with systemd and dbus available.
:return:
'''
assert localemod.get_locale() == 'en_US.utf8'
@patch('salt.utils.which', MagicMock(return_value="/usr/bin/localctl"))
@patch('salt.modules.localemod.__grains__', {'os_family': 'Suse', 'osmajorrelease': 12})
@patch('salt.modules.localemod.dbus', True)
@patch('salt.modules.localemod._parse_dbus_locale', MagicMock(return_value={'LANG': 'en_US.utf8'}))
@patch('salt.modules.localemod._localectl_status', MagicMock(return_value={'system_locale': {'LANG': 'de_DE.utf8'}}))
@patch('salt.modules.localemod.__salt__', {'cmd.run': MagicMock()})
@patch('salt.utils.systemd.booted', MagicMock(return_value=True))
def test_get_locale_with_systemd_and_dbus_sle12(self):
'''
Test getting current system locale with systemd and dbus available on SLE12.
:return:
'''
localemod.get_locale()
assert localemod.__salt__['cmd.run'].call_args[0][0] == 'grep "^RC_LANG" /etc/sysconfig/language'
@patch('salt.utils.which', MagicMock(return_value=None))
@patch('salt.modules.localemod.__grains__', {'os_family': 'RedHat', 'osmajorrelease': 12})
@patch('salt.modules.localemod.dbus', None)
@patch('salt.modules.localemod.__salt__', {'cmd.run': MagicMock()})
@patch('salt.utils.systemd.booted', MagicMock(return_value=False))
def test_get_locale_with_no_systemd_redhat(self):
'''
Test getting current system locale with systemd and dbus available on RedHat.
:return:
'''
localemod.get_locale()
assert localemod.__salt__['cmd.run'].call_args[0][0] == 'grep "^LANG=" /etc/sysconfig/i18n'
@patch('salt.utils.which', MagicMock(return_value=None))
@patch('salt.modules.localemod.__grains__', {'os_family': 'Debian', 'osmajorrelease': 12})
@patch('salt.modules.localemod.dbus', None)
@patch('salt.modules.localemod.__salt__', {'cmd.run': MagicMock()})
@patch('salt.utils.systemd.booted', MagicMock(return_value=False))
def test_get_locale_with_no_systemd_debian(self):
'''
Test getting current system locale with systemd and dbus available on Debian.
:return:
'''
localemod.get_locale()
assert localemod.__salt__['cmd.run'].call_args[0][0] == 'grep "^LANG=" /etc/default/locale'
@patch('salt.utils.which', MagicMock(return_value=None))
@patch('salt.modules.localemod.__grains__', {'os_family': 'Gentoo', 'osmajorrelease': 12})
@patch('salt.modules.localemod.dbus', None)
@patch('salt.modules.localemod.__salt__', {'cmd.run': MagicMock()})
@patch('salt.utils.systemd.booted', MagicMock(return_value=False))
def test_get_locale_with_no_systemd_gentoo(self):
'''
Test getting current system locale with systemd and dbus available on Gentoo.
:return:
'''
localemod.get_locale()
assert localemod.__salt__['cmd.run'].call_args[0][0] == 'eselect --brief locale show'
@patch('salt.utils.which', MagicMock(return_value=None))
@patch('salt.modules.localemod.__grains__', {'os_family': 'Solaris', 'osmajorrelease': 12})
@patch('salt.modules.localemod.dbus', None)
@patch('salt.modules.localemod.__salt__', {'cmd.run': MagicMock()})
@patch('salt.utils.systemd.booted', MagicMock(return_value=False))
def test_get_locale_with_no_systemd_slowlaris(self):
'''
Test getting current system locale with systemd and dbus available on Solaris.
:return:
'''
localemod.get_locale()
assert localemod.__salt__['cmd.run'].call_args[0][0] == 'grep "^LANG=" /etc/default/init'
@patch('salt.utils.which', MagicMock(return_value=None))
@patch('salt.modules.localemod.__grains__', {'os_family': 'BSD', 'osmajorrelease': 8, 'oscodename': 'DrunkDragon'})
@patch('salt.modules.localemod.dbus', None)
@patch('salt.modules.localemod.__salt__', {'cmd.run': MagicMock()})
@patch('salt.utils.systemd.booted', MagicMock(return_value=False))
def test_get_locale_with_no_systemd_unknown(self):
'''
Test getting current system locale with systemd and dbus available on Gentoo.
:return:
'''
with pytest.raises(CommandExecutionError) as err:
localemod.get_locale()
assert '"DrunkDragon" is unsupported' in six.text_type(err)
@patch('salt.utils.which', MagicMock(return_value="/usr/bin/localctl"))
@patch('salt.modules.localemod.__grains__', {'os_family': 'Ubuntu', 'osmajorrelease': 42})
@patch('salt.modules.localemod.dbus', None)
@patch('salt.utils.systemd.booted', MagicMock(return_value=True))
@patch('salt.modules.localemod._localectl_set', MagicMock())
def test_set_locale_with_systemd_nodbus(self):
'''
Test setting current system locale with systemd but no dbus available.
:return:
'''
loc = 'de_DE.utf8'
localemod.set_locale(loc)
assert localemod._localectl_set.call_args[0][0] == 'de_DE.utf8'
@patch('salt.utils.which', MagicMock(return_value="/usr/bin/localctl"))
@patch('salt.modules.localemod.__grains__', {'os_family': 'Ubuntu', 'osmajorrelease': 42})
@patch('salt.modules.localemod.dbus', True)
@patch('salt.utils.systemd.booted', MagicMock(return_value=True))
@patch('salt.modules.localemod._localectl_set', MagicMock())
def test_set_locale_with_systemd_and_dbus(self):
'''
Test setting current system locale with systemd and dbus available.
:return:
'''
loc = 'de_DE.utf8'
localemod.set_locale(loc)
assert localemod._localectl_set.call_args[0][0] == 'de_DE.utf8'
@patch('salt.utils.which', MagicMock(return_value="/usr/bin/localctl"))
@patch('salt.modules.localemod.__grains__', {'os_family': 'Suse', 'osmajorrelease': 12})
@patch('salt.modules.localemod.dbus', True)
@patch('salt.modules.localemod.__salt__', MagicMock())
@patch('salt.modules.localemod._localectl_set', MagicMock())
@patch('salt.utils.systemd.booted', MagicMock(return_value=True))
def test_set_locale_with_systemd_and_dbus_sle12(self):
'''
Test setting current system locale with systemd and dbus available on SLE12.
:return:
'''
loc = 'de_DE.utf8'
localemod.set_locale(loc)
assert not localemod._localectl_set.called
assert localemod.__salt__['file.replace'].called
assert localemod.__salt__['file.replace'].call_args[0][0] == '/etc/sysconfig/language'
assert localemod.__salt__['file.replace'].call_args[0][1] == '^RC_LANG=.*'
assert localemod.__salt__['file.replace'].call_args[0][2] == 'RC_LANG="{}"'.format(loc)
@patch('salt.utils.which', MagicMock(return_value=None))
@patch('salt.modules.localemod.__grains__', {'os_family': 'RedHat', 'osmajorrelease': 42})
@patch('salt.modules.localemod.dbus', None)
@patch('salt.modules.localemod.__salt__', MagicMock())
@patch('salt.modules.localemod._localectl_set', MagicMock())
@patch('salt.utils.systemd.booted', MagicMock(return_value=False))
def test_set_locale_with_no_systemd_redhat(self):
'''
Test setting current system locale with systemd and dbus available on RedHat.
:return:
'''
loc = 'de_DE.utf8'
localemod.set_locale(loc)
assert not localemod._localectl_set.called
assert localemod.__salt__['file.replace'].called
assert localemod.__salt__['file.replace'].call_args[0][0] == '/etc/sysconfig/i18n'
assert localemod.__salt__['file.replace'].call_args[0][1] == '^LANG=.*'
assert localemod.__salt__['file.replace'].call_args[0][2] == 'LANG="{}"'.format(loc)
@patch('salt.utils.which', MagicMock(return_value=None))
@patch('salt.utils.path.which', MagicMock(return_value='/usr/sbin/update-locale'))
@patch('salt.modules.localemod.__grains__', {'os_family': 'Debian', 'osmajorrelease': 42})
@patch('salt.modules.localemod.dbus', None)
@patch('salt.modules.localemod.__salt__', MagicMock())
@patch('salt.modules.localemod._localectl_set', MagicMock())
@patch('salt.utils.systemd.booted', MagicMock(return_value=False))
def test_set_locale_with_no_systemd_debian(self):
'''
Test setting current system locale with systemd and dbus available on Debian.
:return:
'''
loc = 'de_DE.utf8'
localemod.set_locale(loc)
assert not localemod._localectl_set.called
assert localemod.__salt__['file.replace'].called
assert localemod.__salt__['file.replace'].call_args[0][0] == '/etc/default/locale'
assert localemod.__salt__['file.replace'].call_args[0][1] == '^LANG=.*'
assert localemod.__salt__['file.replace'].call_args[0][2] == 'LANG="{}"'.format(loc)
@patch('salt.utils.which', MagicMock(return_value=None))
@patch('salt.utils.path.which', MagicMock(return_value=None))
@patch('salt.modules.localemod.__grains__', {'os_family': 'Debian', 'osmajorrelease': 42})
@patch('salt.modules.localemod.dbus', None)
@patch('salt.modules.localemod.__salt__', MagicMock())
@patch('salt.modules.localemod._localectl_set', MagicMock())
@patch('salt.utils.systemd.booted', MagicMock(return_value=False))
def test_set_locale_with_no_systemd_debian_no_update_locale(self):
'''
Test setting current system locale with systemd and dbus available on Debian but update-locale is not installed.
:return:
'''
loc = 'de_DE.utf8'
with pytest.raises(CommandExecutionError) as err:
localemod.set_locale(loc)
assert not localemod._localectl_set.called
assert 'Cannot set locale: "update-locale" was not found.' in six.text_type(err)
@patch('salt.utils.which', MagicMock(return_value=None))
@patch('salt.modules.localemod.__grains__', {'os_family': 'Gentoo', 'osmajorrelease': 42})
@patch('salt.modules.localemod.dbus', None)
@patch('salt.modules.localemod.__salt__', MagicMock())
@patch('salt.modules.localemod._localectl_set', MagicMock())
@patch('salt.utils.systemd.booted', MagicMock(return_value=False))
def test_set_locale_with_no_systemd_gentoo(self):
'''
Test setting current system locale with systemd and dbus available on Gentoo.
:return:
'''
loc = 'de_DE.utf8'
localemod.set_locale(loc)
assert not localemod._localectl_set.called
assert localemod.__salt__['cmd.retcode'].call_args[0][0] == 'eselect --brief locale set de_DE.utf8'
@patch('salt.utils.which', MagicMock(return_value=None))
@patch('salt.modules.localemod.__grains__', {'os_family': 'Solaris', 'osmajorrelease': 42})
@patch('salt.modules.localemod.dbus', None)
@patch('salt.modules.localemod.__salt__', {'locale.list_avail': MagicMock(return_value=['de_DE.utf8']),
'file.replace': MagicMock()})
@patch('salt.modules.localemod._localectl_set', MagicMock())
@patch('salt.utils.systemd.booted', MagicMock(return_value=False))
def test_set_locale_with_no_systemd_slowlaris_with_list_avail(self):
'''
Test setting current system locale with systemd and dbus available on Slowlaris.
The list_avail returns the proper locale.
:return:
'''
loc = 'de_DE.utf8'
localemod.set_locale(loc)
assert not localemod._localectl_set.called
assert localemod.__salt__['file.replace'].called
assert localemod.__salt__['file.replace'].call_args[0][0] == '/etc/default/init'
assert localemod.__salt__['file.replace'].call_args[0][1] == '^LANG=.*'
assert localemod.__salt__['file.replace'].call_args[0][2] == 'LANG="{}"'.format(loc)
@patch('salt.utils.which', MagicMock(return_value=None))
@patch('salt.modules.localemod.__grains__', {'os_family': 'Solaris', 'osmajorrelease': 42})
@patch('salt.modules.localemod.dbus', None)
@patch('salt.modules.localemod.__salt__', {'locale.list_avail': MagicMock(return_value=['en_GB.utf8']),
'file.replace': MagicMock()})
@patch('salt.modules.localemod._localectl_set', MagicMock())
@patch('salt.utils.systemd.booted', MagicMock(return_value=False))
def test_set_locale_with_no_systemd_slowlaris_without_list_avail(self):
'''
Test setting current system locale with systemd and dbus is not available on Slowlaris.
The list_avail does not return the proper locale.
:return:
'''
loc = 'de_DE.utf8'
assert not localemod.set_locale(loc)
assert not localemod._localectl_set.called
assert not localemod.__salt__['file.replace'].called
@patch('salt.utils.which', MagicMock(return_value=None))
@patch('salt.modules.localemod.__grains__', {'os_family': 'BSD', 'osmajorrelease': 42})
@patch('salt.modules.localemod.dbus', None)
@patch('salt.modules.localemod.__salt__', {'locale.list_avail': MagicMock(return_value=['en_GB.utf8']),
'file.replace': MagicMock()})
@patch('salt.modules.localemod._localectl_set', MagicMock())
@patch('salt.utils.systemd.booted', MagicMock(return_value=False))
def test_set_locale_with_no_systemd_unknown(self):
'''
Test setting current system locale without systemd on unknown system.
:return:
'''
with pytest.raises(CommandExecutionError) as err:
localemod.set_locale('de_DE.utf8')
assert 'Unsupported platform' in six.text_type(err)
@patch('salt.utils.locales.normalize_locale', MagicMock(return_value='en_US.UTF-8 UTF-8'))
@patch('salt.modules.localemod.__salt__', {'locale.list_avail': MagicMock(return_value=['A', 'B'])})
def test_avail(self):
'''
Test for Check if a locale is available
'''
with patch('salt.utils.locales.normalize_locale',
MagicMock(return_value='en_US.UTF-8 UTF-8')):
with patch.dict(localemod.__salt__,
{'locale.list_avail':
MagicMock(return_value=['A', 'B'])}):
self.assertTrue(localemod.avail('locale'))
assert localemod.avail('locale')
@patch('salt.modules.localemod.log', MagicMock())
@patch('salt.utils.path.which', MagicMock(return_value='/some/dir/path'))
@patch('salt.modules.localemod.__grains__', {'os': 'Debian'})
@patch('salt.modules.localemod.__salt__', {'file.search': MagicMock(return_value=False)})
def test_gen_locale_not_valid(self):
'''
Tests the return of gen_locale when the provided locale is not found
'''
with patch.dict(localemod.__grains__, {'os': 'Debian'}), \
patch('salt.utils.path.which', MagicMock(return_value='/some/dir/path')), \
patch.dict(localemod.__salt__,
{'file.search': MagicMock(return_value=False)}):
self.assertFalse(localemod.gen_locale('foo'))
assert not localemod.gen_locale('foo')
assert localemod.log.error.called
msg = localemod.log.error.call_args[0][0] % (localemod.log.error.call_args[0][1],
localemod.log.error.call_args[0][2])
assert msg == 'The provided locale "foo" is not found in /usr/share/i18n/SUPPORTED'
@patch('salt.modules.localemod.log', MagicMock())
@patch('salt.modules.localemod.__grains__', {'os_family': 'Suse'})
@patch('os.listdir', MagicMock(return_value=[]))
@patch('salt.utils.locales.join_locale', MagicMock(return_value='en_GB.utf8'))
def test_gen_locale_suse_invalid(self):
'''
Tests the location where gen_locale is searching for generated paths.
:return:
'''
assert not localemod.gen_locale('de_DE.utf8')
assert localemod.log.error.called
msg = localemod.log.error.call_args[0][0] % (localemod.log.error.call_args[0][1],
localemod.log.error.call_args[0][2])
assert localemod.os.listdir.call_args[0][0] == '/usr/share/locale'
assert msg == 'The provided locale "en_GB.utf8" is not found in /usr/share/locale'
@patch('salt.modules.localemod.log', MagicMock())
@patch('salt.modules.localemod.__grains__', {'os_family': 'Suse'})
@patch('salt.modules.localemod.__salt__', {'cmd.run_all': MagicMock(return_value={'retcode': 0})})
@patch('os.listdir', MagicMock(return_value=['de_DE']))
@patch('os.path.exists', MagicMock(return_value=False))
@patch('salt.utils.locales.join_locale', MagicMock(return_value='de_DE.utf8'))
@patch('salt.utils.path.which', MagicMock(side_effect=[None, '/usr/bin/localedef']))
def test_gen_locale_suse_valid(self):
'''
Tests the location where gen_locale is calling localedef on Suse os-family.
:return:
'''
localemod.gen_locale('de_DE.utf8')
assert localemod.__salt__['cmd.run_all'].call_args[0][0] == ['localedef', '--force', '-i', 'de_DE',
'-f', 'utf8', 'de_DE.utf8', '--quiet']
@patch('salt.modules.localemod.log', MagicMock())
@patch('salt.modules.localemod.__grains__', {'os_family': 'Suse'})
@patch('salt.modules.localemod.__salt__', {'cmd.run_all': MagicMock(return_value={'retcode': 0})})
@patch('os.listdir', MagicMock(return_value=['de_DE']))
@patch('os.path.exists', MagicMock(return_value=False))
@patch('salt.utils.locales.join_locale', MagicMock(return_value='de_DE.utf8'))
@patch('salt.utils.path.which', MagicMock(return_value=None))
def test_gen_locale_suse_localedef_error_handling(self):
'''
Tests the location where gen_locale is handling error while calling not installed localedef on Suse os-family.
:return:
'''
with pytest.raises(CommandExecutionError) as err:
localemod.gen_locale('de_DE.utf8')
assert 'Command "locale-gen" or "localedef" was not found on this system.' in six.text_type(err)
def test_gen_locale_debian(self):
'''
@ -129,29 +519,20 @@ class LocalemodTestCase(TestCase, LoaderModuleMockMixin):
{'file.search': MagicMock(return_value=True),
'file.replace': MagicMock(return_value=True),
'cmd.run_all': MagicMock(return_value=ret)}):
self.assertTrue(localemod.gen_locale('en_US.UTF-8 UTF-8'))
assert localemod.gen_locale('en_US.UTF-8 UTF-8')
def test_gen_locale_debian_no_charmap(self):
'''
Tests the return of successful gen_locale on Debian system without a charmap
'''
def file_search(search, pattern, flags):
'''
mock file.search
'''
if len(pattern.split()) == 1:
return False
else: # charmap was supplied
return True
ret = {'stdout': 'saltines', 'stderr': 'biscuits', 'retcode': 0, 'pid': 1337}
with patch.dict(localemod.__grains__, {'os': 'Debian'}), \
patch('salt.utils.path.which', MagicMock(return_value='/some/dir/path')), \
patch.dict(localemod.__salt__,
{'file.search': file_search,
{'file.search': lambda s, p, flags: not len(p.split()) == 1,
'file.replace': MagicMock(return_value=True),
'cmd.run_all': MagicMock(return_value=ret)}):
self.assertTrue(localemod.gen_locale('en_US.UTF-8'))
assert localemod.gen_locale('en_US.UTF-8')
def test_gen_locale_ubuntu(self):
'''
@ -166,7 +547,7 @@ class LocalemodTestCase(TestCase, LoaderModuleMockMixin):
patch('salt.utils.path.which', MagicMock(return_value='/some/dir/path')), \
patch('os.listdir', MagicMock(return_value=['en_US'])), \
patch.dict(localemod.__grains__, {'os': 'Ubuntu'}):
self.assertTrue(localemod.gen_locale('en_US.UTF-8'))
assert localemod.gen_locale('en_US.UTF-8')
def test_gen_locale_gentoo(self):
'''
@ -180,7 +561,7 @@ class LocalemodTestCase(TestCase, LoaderModuleMockMixin):
{'file.search': MagicMock(return_value=True),
'file.replace': MagicMock(return_value=True),
'cmd.run_all': MagicMock(return_value=ret)}):
self.assertTrue(localemod.gen_locale('en_US.UTF-8 UTF-8'))
assert localemod.gen_locale('en_US.UTF-8 UTF-8')
def test_gen_locale_gentoo_no_charmap(self):
'''
@ -203,7 +584,7 @@ class LocalemodTestCase(TestCase, LoaderModuleMockMixin):
{'file.search': file_search,
'file.replace': MagicMock(return_value=True),
'cmd.run_all': MagicMock(return_value=ret)}):
self.assertTrue(localemod.gen_locale('en_US.UTF-8'))
assert localemod.gen_locale('en_US.UTF-8')
def test_gen_locale(self):
'''
@ -215,7 +596,7 @@ class LocalemodTestCase(TestCase, LoaderModuleMockMixin):
'file.replace': MagicMock()}), \
patch('salt.utils.path.which', MagicMock(return_value='/some/dir/path')), \
patch('os.listdir', MagicMock(return_value=['en_US'])):
self.assertTrue(localemod.gen_locale('en_US.UTF-8'))
assert localemod.gen_locale('en_US.UTF-8')
def test_gen_locale_verbose(self):
'''
@ -227,7 +608,7 @@ class LocalemodTestCase(TestCase, LoaderModuleMockMixin):
'file.replace': MagicMock()}), \
patch('salt.utils.path.which', MagicMock(return_value='/some/dir/path')), \
patch('os.listdir', MagicMock(return_value=['en_US'])):
self.assertEqual(localemod.gen_locale('en_US.UTF-8', verbose=True), ret)
assert localemod.gen_locale('en_US.UTF-8', verbose=True) == ret
def test_parse_localectl(self):
localectl_out = (' System Locale: LANG=en_US.UTF-8\n'
@ -235,5 +616,5 @@ class LocalemodTestCase(TestCase, LoaderModuleMockMixin):
' VC Keymap: n/a')
mock_cmd = Mock(return_value=localectl_out)
with patch.dict(localemod.__salt__, {'cmd.run': mock_cmd}):
ret = localemod._parse_localectl()
self.assertEqual({'LANG': 'en_US.UTF-8', 'LANGUAGE': 'en_US:en'}, ret)
ret = localemod._localectl_status()['system_locale']
assert {'LANG': 'en_US.UTF-8', 'LANGUAGE': 'en_US:en'} == ret

View file

@ -5,6 +5,7 @@ Test module for syslog_ng
# Import Python modules
from __future__ import absolute_import, unicode_literals, print_function
import os
from textwrap import dedent
# Import Salt Testing libs
@ -13,7 +14,6 @@ from tests.support.unit import skipIf, TestCase
from tests.support.mock import NO_MOCK, NO_MOCK_REASON, MagicMock, patch
# Import Salt libs
import salt.utils.path
import salt.modules.syslog_ng as syslog_ng
_VERSION = "3.6.0alpha0"
@ -58,6 +58,12 @@ _SYSLOG_NG_CTL_NOT_INSTALLED_RETURN_VALUE = {
@skipIf(NO_MOCK, NO_MOCK_REASON)
class SyslogNGTestCase(TestCase, LoaderModuleMockMixin):
# pylint: disable=blacklisted-function
orig_env = {str('PATH'): str('/foo:/bar')}
bin_dir = str('/baz')
mocked_env = {str('PATH'): str('/foo:/bar:/baz')}
# pylint: enable=blacklisted-function
def setup_loader_modules(self):
return {syslog_ng: {}}
@ -199,83 +205,139 @@ class SyslogNGTestCase(TestCase, LoaderModuleMockMixin):
'''), b)
def test_version(self):
mock_return_value = {"retcode": 0, 'stdout': VERSION_OUTPUT}
expected_output = {"retcode": 0, "stdout": "3.6.0alpha0"}
mock_args = "syslog-ng -V"
self._assert_template(mock_args,
mock_return_value,
function_to_call=syslog_ng.version,
expected_output=expected_output)
cmd_ret = {'retcode': 0, 'stdout': VERSION_OUTPUT}
expected_output = {'retcode': 0, 'stdout': _VERSION}
cmd_args = ['syslog-ng', '-V']
cmd_mock = MagicMock(return_value=cmd_ret)
with patch.dict(syslog_ng.__salt__, {'cmd.run_all': cmd_mock}), \
patch.dict(os.environ, self.orig_env):
result = syslog_ng.version()
self.assertEqual(result, expected_output)
cmd_mock.assert_called_once_with(
cmd_args,
env=None,
python_shell=False
)
cmd_mock = MagicMock(return_value=cmd_ret)
with patch.dict(syslog_ng.__salt__, {'cmd.run_all': cmd_mock}), \
patch.dict(os.environ, self.orig_env):
result = syslog_ng.version(syslog_ng_sbin_dir=self.bin_dir)
self.assertEqual(result, expected_output)
cmd_mock.assert_called_once_with(
cmd_args,
env=self.mocked_env,
python_shell=False
)
def test_stats(self):
mock_return_value = {"retcode": 0, 'stdout': STATS_OUTPUT}
expected_output = {"retcode": 0, "stdout": STATS_OUTPUT}
mock_args = "syslog-ng-ctl stats"
self._assert_template(mock_args,
mock_return_value,
function_to_call=syslog_ng.stats,
expected_output=expected_output)
cmd_ret = {'retcode': 0, 'stdout': STATS_OUTPUT}
cmd_args = ['syslog-ng-ctl', 'stats']
cmd_mock = MagicMock(return_value=cmd_ret)
with patch.dict(syslog_ng.__salt__, {'cmd.run_all': cmd_mock}), \
patch.dict(os.environ, self.orig_env):
result = syslog_ng.stats()
self.assertEqual(result, cmd_ret)
cmd_mock.assert_called_once_with(
cmd_args,
env=None,
python_shell=False
)
cmd_mock = MagicMock(return_value=cmd_ret)
with patch.dict(syslog_ng.__salt__, {'cmd.run_all': cmd_mock}), \
patch.dict(os.environ, self.orig_env):
result = syslog_ng.stats(syslog_ng_sbin_dir=self.bin_dir)
self.assertEqual(result, cmd_ret)
cmd_mock.assert_called_once_with(
cmd_args,
env=self.mocked_env,
python_shell=False
)
def test_modules(self):
mock_return_value = {"retcode": 0, 'stdout': VERSION_OUTPUT}
expected_output = {"retcode": 0, "stdout": _MODULES}
mock_args = "syslog-ng -V"
self._assert_template(mock_args,
mock_return_value,
function_to_call=syslog_ng.modules,
expected_output=expected_output)
cmd_ret = {'retcode': 0, 'stdout': VERSION_OUTPUT}
expected_output = {'retcode': 0, 'stdout': _MODULES}
cmd_args = ['syslog-ng', '-V']
def test_config_test_ok(self):
mock_return_value = {"retcode": 0, "stderr": "", "stdout": "Syslog-ng startup text..."}
mock_args = "syslog-ng --syntax-only"
self._assert_template(mock_args,
mock_return_value,
function_to_call=syslog_ng.config_test,
expected_output=mock_return_value)
cmd_mock = MagicMock(return_value=cmd_ret)
with patch.dict(syslog_ng.__salt__, {'cmd.run_all': cmd_mock}), \
patch.dict(os.environ, self.orig_env):
result = syslog_ng.modules()
self.assertEqual(result, expected_output)
cmd_mock.assert_called_once_with(
cmd_args,
env=None,
python_shell=False
)
def test_config_test_fails(self):
mock_return_value = {"retcode": 1, 'stderr': "Syntax error...", "stdout": ""}
mock_args = "syslog-ng --syntax-only"
self._assert_template(mock_args,
mock_return_value,
function_to_call=syslog_ng.config_test,
expected_output=mock_return_value)
cmd_mock = MagicMock(return_value=cmd_ret)
with patch.dict(syslog_ng.__salt__, {'cmd.run_all': cmd_mock}), \
patch.dict(os.environ, self.orig_env):
result = syslog_ng.modules(syslog_ng_sbin_dir=self.bin_dir)
self.assertEqual(result, expected_output)
cmd_mock.assert_called_once_with(
cmd_args,
env=self.mocked_env,
python_shell=False
)
def test_config_test(self):
cmd_ret = {'retcode': 0, 'stderr': '', 'stdout': 'Foo'}
cmd_args = ['syslog-ng', '--syntax-only']
cmd_mock = MagicMock(return_value=cmd_ret)
with patch.dict(syslog_ng.__salt__, {'cmd.run_all': cmd_mock}), \
patch.dict(os.environ, self.orig_env):
result = syslog_ng.config_test()
self.assertEqual(result, cmd_ret)
cmd_mock.assert_called_once_with(
cmd_args,
env=None,
python_shell=False
)
cmd_mock = MagicMock(return_value=cmd_ret)
with patch.dict(syslog_ng.__salt__, {'cmd.run_all': cmd_mock}), \
patch.dict(os.environ, self.orig_env):
result = syslog_ng.config_test(syslog_ng_sbin_dir=self.bin_dir)
self.assertEqual(result, cmd_ret)
cmd_mock.assert_called_once_with(
cmd_args,
env=self.mocked_env,
python_shell=False
)
def test_config_test_cfgfile(self):
cfgfile = "/path/to/syslog-ng.conf"
mock_return_value = {"retcode": 1, 'stderr': "Syntax error...", "stdout": ""}
mock_args = "syslog-ng --syntax-only --cfgfile={0}".format(cfgfile)
self._assert_template(mock_args,
mock_return_value,
function_to_call=syslog_ng.config_test,
function_args={"cfgfile": cfgfile},
expected_output=mock_return_value)
cfgfile = '/path/to/syslog-ng.conf'
cmd_ret = {'retcode': 1, 'stderr': 'Syntax error...', 'stdout': ''}
cmd_args = ['syslog-ng', '--syntax-only',
'--cfgfile={0}'.format(cfgfile)]
def _assert_template(self,
mock_function_args,
mock_return_value,
function_to_call,
expected_output,
function_args=None):
if function_args is None:
function_args = {}
cmd_mock = MagicMock(return_value=cmd_ret)
with patch.dict(syslog_ng.__salt__, {'cmd.run_all': cmd_mock}), \
patch.dict(os.environ, self.orig_env):
self.assertEqual(syslog_ng.config_test(cfgfile=cfgfile), cmd_ret)
cmd_mock.assert_called_once_with(
cmd_args,
env=None,
python_shell=False
)
installed = True
if not salt.utils.path.which("syslog-ng"):
installed = False
if "syslog-ng-ctl" in mock_function_args:
expected_output = _SYSLOG_NG_CTL_NOT_INSTALLED_RETURN_VALUE
else:
expected_output = _SYSLOG_NG_NOT_INSTALLED_RETURN_VALUE
mock_function = MagicMock(return_value=mock_return_value)
with patch.dict(syslog_ng.__salt__, {'cmd.run_all': mock_function}):
got = function_to_call(**function_args)
self.assertEqual(expected_output, got)
if installed:
self.assertTrue(mock_function.called)
self.assertEqual(len(mock_function.call_args), 2)
mock_param = mock_function.call_args
self.assertEqual(mock_param[0][0], mock_function_args.split())
cmd_mock = MagicMock(return_value=cmd_ret)
with patch.dict(syslog_ng.__salt__, {'cmd.run_all': cmd_mock}), \
patch.dict(os.environ, self.orig_env):
self.assertEqual(
syslog_ng.config_test(
syslog_ng_sbin_dir=self.bin_dir,
cfgfile=cfgfile
),
cmd_ret
)
cmd_mock.assert_called_once_with(
cmd_args,
env=self.mocked_env,
python_shell=False
)

View file

@ -5,6 +5,7 @@
# Import Python Libs
from __future__ import absolute_import, unicode_literals, print_function
import os
# Import Salt Testing Libs
from tests.support.mixins import LoaderModuleMockMixin
@ -18,6 +19,7 @@ from tests.support.mock import (
# Import Salt Libs
import salt.modules.win_path as win_path
import salt.utils.stringutils
class MockWin32API(object):
@ -58,54 +60,223 @@ class WinPathTestCase(TestCase, LoaderModuleMockMixin):
'HWND_BROADCAST': MagicMock,
'WM_SETTINGCHANGE': MagicMock}}
def __init__(self, *args, **kwargs):
super(WinPathTestCase, self).__init__(*args, **kwargs)
self.pathsep = str(';') # future lint: disable=blacklisted-function
def assert_call_matches(self, mock_obj, new_path):
mock_obj.assert_called_once_with(
win_path.HIVE,
win_path.KEY,
win_path.VNAME,
self.pathsep.join(new_path),
win_path.VTYPE
)
def assert_path_matches(self, env, new_path):
self.assertEqual(
env['PATH'],
salt.utils.stringutils.to_str(self.pathsep.join(new_path))
)
def test_rehash(self):
'''
Test to rehash the Environment variables
Test to rehash the Environment variables
'''
self.assertTrue(win_path.rehash())
def test_get_path(self):
'''
Test to Returns the system path
Test to Returns the system path
'''
mock = MagicMock(return_value={'vdata': 'c:\\salt'})
mock = MagicMock(return_value={'vdata': 'C:\\Salt'})
with patch.dict(win_path.__salt__, {'reg.read_value': mock}):
self.assertListEqual(win_path.get_path(), ['c:\\salt'])
self.assertListEqual(win_path.get_path(), ['C:\\Salt'])
def test_exists(self):
'''
Test to check if the directory is configured
Test to check if the directory is configured
'''
mock = MagicMock(return_value='c:\\salt')
with patch.object(win_path, 'get_path', mock):
self.assertTrue(win_path.exists("c:\\salt"))
get_mock = MagicMock(return_value=['C:\\Foo', 'C:\\Bar'])
with patch.object(win_path, 'get_path', get_mock):
# Ensure case insensitivity respected
self.assertTrue(win_path.exists('C:\\FOO'))
self.assertTrue(win_path.exists('c:\\foo'))
self.assertFalse(win_path.exists('c:\\mystuff'))
def test_add(self):
'''
Test to add the directory to the SYSTEM path
Test to add the directory to the SYSTEM path
'''
mock_get = MagicMock(return_value=['c:\\salt'])
with patch.object(win_path, 'get_path', mock_get):
mock_set = MagicMock(return_value=True)
with patch.dict(win_path.__salt__, {'reg.set_value': mock_set}):
mock_rehash = MagicMock(side_effect=[True, False])
with patch.object(win_path, 'rehash', mock_rehash):
self.assertTrue(win_path.add("c:\\salt", 1))
orig_path = ('C:\\Foo', 'C:\\Bar')
self.assertFalse(win_path.add("c:\\salt", 1))
def _env(path):
return {
str('PATH'): salt.utils.stringutils.to_str( # future lint: disable=blacklisted-function
self.pathsep.join(path)
)
}
def _run(name, index=None, retval=True, path=None):
if path is None:
path = orig_path
env = _env(path)
mock_get = MagicMock(return_value=list(path))
mock_set = MagicMock(return_value=retval)
with patch.object(win_path, 'PATHSEP', self.pathsep), \
patch.object(win_path, 'get_path', mock_get), \
patch.object(os, 'environ', env), \
patch.dict(win_path.__salt__, {'reg.set_value': mock_set}), \
patch.object(win_path, 'rehash', MagicMock(return_value=True)):
return win_path.add(name, index), env, mock_set
# Test a successful reg update
ret, env, mock_set = _run('c:\\salt', retval=True)
new_path = ('C:\\Foo', 'C:\\Bar', 'c:\\salt')
self.assertTrue(ret)
self.assert_call_matches(mock_set, new_path)
self.assert_path_matches(env, new_path)
# Test an unsuccessful reg update
ret, env, mock_set = _run('c:\\salt', retval=False)
new_path = ('C:\\Foo', 'C:\\Bar', 'c:\\salt')
self.assertFalse(ret)
self.assert_call_matches(mock_set, new_path)
# The local path should still have been modified even
# though reg.set_value failed.
self.assert_path_matches(env, new_path)
# Test adding with a custom index
ret, env, mock_set = _run('c:\\salt', index=1, retval=True)
new_path = ('C:\\Foo', 'c:\\salt', 'C:\\Bar')
self.assertTrue(ret)
self.assert_call_matches(mock_set, new_path)
self.assert_path_matches(env, new_path)
# Test adding path with a case-insensitive match already present, and
# no index provided. The path should remain unchanged and we should not
# update the registry.
ret, env, mock_set = _run('c:\\foo', retval=True)
self.assertTrue(ret)
mock_set.assert_not_called()
self.assert_path_matches(env, orig_path)
# Test adding path with a case-insensitive match already present, and a
# negative index provided which does not match the current index. The
# match should be removed, and the path should be added to the end of
# the list.
ret, env, mock_set = _run('c:\\foo', index=-1, retval=True)
new_path = ('C:\\Bar', 'c:\\foo')
self.assertTrue(ret)
self.assert_call_matches(mock_set, new_path)
self.assert_path_matches(env, new_path)
# Test adding path with a case-insensitive match already present, and a
# negative index provided which matches the current index. No changes
# should be made.
ret, env, mock_set = _run('c:\\foo', index=-2, retval=True)
self.assertTrue(ret)
mock_set.assert_not_called()
self.assert_path_matches(env, orig_path)
# Test adding path with a case-insensitive match already present, and a
# negative index provided which is larger than the size of the list. No
# changes should be made, since in these cases we assume an index of 0,
# and the case-insensitive match is also at index 0.
ret, env, mock_set = _run('c:\\foo', index=-5, retval=True)
self.assertTrue(ret)
mock_set.assert_not_called()
self.assert_path_matches(env, orig_path)
# Test adding path with a case-insensitive match already present, and a
# negative index provided which is larger than the size of the list.
# The match should be removed from its current location and inserted at
# the beginning, since when a negative index is larger than the list,
# we put it at the beginning of the list.
ret, env, mock_set = _run('c:\\bar', index=-5, retval=True)
new_path = ('c:\\bar', 'C:\\Foo')
self.assertTrue(ret)
self.assert_call_matches(mock_set, new_path)
self.assert_path_matches(env, new_path)
# Test adding path with a case-insensitive match already present, and a
# negative index provided which matches the current index. The path
# should remain unchanged and we should not update the registry.
ret, env, mock_set = _run('c:\\bar', index=-1, retval=True)
self.assertTrue(ret)
mock_set.assert_not_called()
self.assert_path_matches(env, orig_path)
# Test adding path with a case-insensitive match already present, and
# an index provided which does not match the current index, and is also
# larger than the size of the PATH list. The match should be removed,
# and the path should be added to the end of the list.
ret, env, mock_set = _run('c:\\foo', index=5, retval=True)
new_path = ('C:\\Bar', 'c:\\foo')
self.assertTrue(ret)
self.assert_call_matches(mock_set, new_path)
self.assert_path_matches(env, new_path)
def test_remove(self):
'''
Test to remove the directory from the SYSTEM path
Test win_path.remove
'''
mock_get = MagicMock(side_effect=[[1], ['c:\\salt'], ['c:\\salt']])
with patch.object(win_path, 'get_path', mock_get):
self.assertTrue(win_path.remove("c:\\salt"))
orig_path = ('C:\\Foo', 'C:\\Bar', 'C:\\Baz')
mock_set = MagicMock(side_effect=[True, False])
with patch.dict(win_path.__salt__, {'reg.set_value': mock_set}):
mock_rehash = MagicMock(return_value="Salt")
with patch.object(win_path, 'rehash', mock_rehash):
self.assertEqual(win_path.remove("c:\\salt"), "Salt")
def _env(path):
return {
str('PATH'): salt.utils.stringutils.to_str( # future lint: disable=blacklisted-function
self.pathsep.join(path)
)
}
self.assertFalse(win_path.remove("c:\\salt"))
def _run(name='c:\\salt', index=None, retval=True, path=None):
if path is None:
path = orig_path
env = _env(path)
mock_get = MagicMock(return_value=list(path))
mock_set = MagicMock(return_value=retval)
with patch.object(win_path, 'PATHSEP', self.pathsep), \
patch.object(win_path, 'get_path', mock_get), \
patch.object(os, 'environ', env), \
patch.dict(win_path.__salt__, {'reg.set_value': mock_set}), \
patch.object(win_path, 'rehash', MagicMock(return_value=True)):
return win_path.remove(name), env, mock_set
# Test a successful reg update
ret, env, mock_set = _run('C:\\Bar', retval=True)
new_path = ('C:\\Foo', 'C:\\Baz')
self.assertTrue(ret)
self.assert_call_matches(mock_set, new_path)
self.assert_path_matches(env, new_path)
# Test a successful reg update with a case-insensitive match
ret, env, mock_set = _run('c:\\bar', retval=True)
new_path = ('C:\\Foo', 'C:\\Baz')
self.assertTrue(ret)
self.assert_call_matches(mock_set, new_path)
self.assert_path_matches(env, new_path)
# Test a successful reg update with multiple case-insensitive matches.
# All matches should be removed.
old_path = orig_path + ('C:\\BAR',)
ret, env, mock_set = _run('c:\\bar', retval=True)
new_path = ('C:\\Foo', 'C:\\Baz')
self.assertTrue(ret)
self.assert_call_matches(mock_set, new_path)
self.assert_path_matches(env, new_path)
# Test an unsuccessful reg update
ret, env, mock_set = _run('c:\\bar', retval=False)
new_path = ('C:\\Foo', 'C:\\Baz')
self.assertFalse(ret)
self.assert_call_matches(mock_set, new_path)
# The local path should still have been modified even
# though reg.set_value failed.
self.assert_path_matches(env, new_path)
# Test when no match found
ret, env, mock_set = _run('C:\\NotThere', retval=True)
self.assertTrue(ret)
mock_set.assert_not_called()
self.assert_path_matches(env, orig_path)

View file

@ -1,15 +1,17 @@
# -*- coding: utf-8 -*-
'''
:codeauthor: :email:`Rahul Handay <rahulha@saltstack.com>`
Tests for win_path states
'''
# Import Python Libs
from __future__ import absolute_import, unicode_literals, print_function
from __future__ import absolute_import, print_function, unicode_literals
import copy
# Import Salt Testing Libs
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.unit import TestCase, skipIf
from tests.support.mock import (
Mock,
MagicMock,
patch,
NO_MOCK,
@ -19,64 +21,512 @@ from tests.support.mock import (
# Import Salt Libs
import salt.states.win_path as win_path
NAME = 'salt'
@skipIf(NO_MOCK, NO_MOCK_REASON)
class WinPathTestCase(TestCase, LoaderModuleMockMixin):
'''
Validate the win_path state
Validate the win_path state
'''
def setup_loader_modules(self):
return {win_path: {}}
def test_absent(self):
'''
Test to remove the directory from the SYSTEM path
Test various cases for win_path.absent
'''
ret = {'name': 'salt',
'changes': {},
'result': None,
'comment': ''}
mock = MagicMock(return_value=False)
with patch.dict(win_path.__salt__, {"win_path.exists": mock}):
with patch.dict(win_path.__opts__, {"test": True}):
ret.update({'comment': 'salt is not in the PATH'})
self.assertDictEqual(win_path.absent('salt'), ret)
ret_base = {'name': NAME, 'result': True, 'changes': {}}
with patch.dict(win_path.__opts__, {"test": False}):
mock = MagicMock(return_value=True)
with patch.dict(win_path.__salt__, {"win_path.remove": mock}):
ret.update({'result': True})
self.assertDictEqual(win_path.absent('salt'), ret)
def _mock(retval):
# Return a new MagicMock for each test case
return MagicMock(side_effect=retval)
def test_exists(self):
# We don't really want to run the remove func
with patch.dict(win_path.__salt__, {'win_path.remove': Mock()}):
# Test mode OFF
with patch.dict(win_path.__opts__, {'test': False}):
# Test already absent
with patch.dict(win_path.__salt__, {'win_path.exists': _mock([False])}):
ret = copy.deepcopy(ret_base)
ret['comment'] = '{0} is not in the PATH'.format(NAME)
ret['result'] = True
self.assertDictEqual(win_path.absent(NAME), ret)
# Test successful removal
with patch.dict(win_path.__salt__, {'win_path.exists': _mock([True, False])}):
ret = copy.deepcopy(ret_base)
ret['comment'] = 'Removed {0} from the PATH'.format(NAME)
ret['changes']['removed'] = NAME
ret['result'] = True
self.assertDictEqual(win_path.absent(NAME), ret)
# Test unsucessful removal
with patch.dict(win_path.__salt__, {'win_path.exists': _mock([True, True])}):
ret = copy.deepcopy(ret_base)
ret['comment'] = 'Failed to remove {0} from the PATH'.format(NAME)
ret['result'] = False
self.assertDictEqual(win_path.absent(NAME), ret)
# Test mode ON
with patch.dict(win_path.__opts__, {'test': True}):
# Test already absent
with patch.dict(win_path.__salt__, {'win_path.exists': _mock([False])}):
ret = copy.deepcopy(ret_base)
ret['comment'] = '{0} is not in the PATH'.format(NAME)
ret['result'] = True
self.assertDictEqual(win_path.absent(NAME), ret)
# Test the test-mode return
with patch.dict(win_path.__salt__, {'win_path.exists': _mock([True])}):
ret = copy.deepcopy(ret_base)
ret['comment'] = '{0} would be removed from the PATH'.format(NAME)
ret['result'] = None
self.assertDictEqual(win_path.absent(NAME), ret)
def test_exists_invalid_index(self):
'''
Test to add the directory to the system PATH at index location
Tests win_path.exists when a non-integer index is specified.
'''
ret = {'name': 'salt',
'changes': {},
'result': True,
'comment': ''}
mock = MagicMock(return_value=['Salt', 'Saltdude'])
with patch.dict(win_path.__salt__, {"win_path.get_path": mock}):
mock = MagicMock(side_effect=['Saltdude', 'Saltdude', '/Saltdude',
'Saltdude'])
with patch.object(win_path, '_normalize_dir', mock):
ret.update({'comment': 'salt is already present in the'
' PATH at the right location'})
self.assertDictEqual(win_path.exists('salt', 1), ret)
ret = win_path.exists(NAME, index='foo')
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {},
'result': False,
'comment': 'Index must be an integer'
}
)
self.assertDictEqual(win_path.exists('salt'), ret)
def test_exists_add_no_index_success(self):
'''
Tests win_path.exists when the directory isn't already in the PATH and
no index is specified (successful run).
'''
add_mock = MagicMock(return_value=True)
rehash_mock = MagicMock(return_value=True)
dunder_salt = {
'win_path.get_path': MagicMock(side_effect=[
['foo', 'bar', 'baz'],
['foo', 'bar', 'baz', NAME]
]),
'win_path.add': add_mock,
'win_path.rehash': rehash_mock,
}
dunder_opts = {'test': False}
with patch.dict(win_path.__opts__, {"test": True}):
ret.update({'comment': '', 'result': None,
'changes': {'added': 'salt will be'
' added at index 2'}})
self.assertDictEqual(win_path.exists('salt'), ret)
with patch.dict(win_path.__salt__, dunder_salt), \
patch.dict(win_path.__opts__, dunder_opts):
ret = win_path.exists(NAME)
with patch.dict(win_path.__opts__, {"test": False}):
mock = MagicMock(return_value=False)
with patch.dict(win_path.__salt__, {"win_path.add": mock}):
ret.update({'comment': 'salt is already present in the'
' PATH at the right location',
'result': True, 'changes': {}})
self.assertDictEqual(win_path.exists('salt'), ret)
add_mock.assert_called_once_with(NAME, index=None, rehash=False)
self.assert_called_once(rehash_mock)
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {'index': {'old': None, 'new': 3}},
'result': True,
'comment': 'Added {0} to the PATH.'.format(NAME)
}
)
def test_exists_add_no_index_failure(self):
'''
Tests win_path.exists when the directory isn't already in the PATH and
no index is specified (failed run).
'''
add_mock = MagicMock(return_value=False)
rehash_mock = MagicMock(return_value=True)
dunder_salt = {
'win_path.get_path': MagicMock(side_effect=[
['foo', 'bar', 'baz'],
['foo', 'bar', 'baz']
]),
'win_path.add': add_mock,
'win_path.rehash': rehash_mock,
}
dunder_opts = {'test': False}
with patch.dict(win_path.__salt__, dunder_salt), \
patch.dict(win_path.__opts__, dunder_opts):
ret = win_path.exists(NAME)
add_mock.assert_called_once_with(NAME, index=None, rehash=False)
rehash_mock.assert_not_called()
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {},
'result': False,
'comment': 'Failed to add {0} to the PATH.'.format(NAME)
}
)
def test_exists_add_no_index_failure_exception(self):
'''
Tests win_path.exists when the directory isn't already in the PATH and
no index is specified (failed run due to exception).
'''
add_mock = MagicMock(side_effect=Exception('Global Thermonuclear War'))
rehash_mock = MagicMock(return_value=True)
dunder_salt = {
'win_path.get_path': MagicMock(side_effect=[
['foo', 'bar', 'baz'],
['foo', 'bar', 'baz']
]),
'win_path.add': add_mock,
'win_path.rehash': rehash_mock,
}
dunder_opts = {'test': False}
with patch.dict(win_path.__salt__, dunder_salt), \
patch.dict(win_path.__opts__, dunder_opts):
ret = win_path.exists(NAME)
add_mock.assert_called_once_with(NAME, index=None, rehash=False)
rehash_mock.assert_not_called()
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {},
'result': False,
'comment': 'Encountered error: Global Thermonuclear War. '
'Failed to add {0} to the PATH.'.format(NAME)
}
)
def test_exists_change_index_success(self):
'''
Tests win_path.exists when the directory is already in the PATH and
needs to be moved to a different position (successful run).
'''
add_mock = MagicMock(return_value=True)
rehash_mock = MagicMock(return_value=True)
dunder_salt = {
'win_path.get_path': MagicMock(side_effect=[
['foo', 'bar', 'baz', NAME],
[NAME, 'foo', 'bar', 'baz']
]),
'win_path.add': add_mock,
'win_path.rehash': rehash_mock,
}
dunder_opts = {'test': False}
with patch.dict(win_path.__salt__, dunder_salt), \
patch.dict(win_path.__opts__, dunder_opts):
ret = win_path.exists(NAME, index=0)
add_mock.assert_called_once_with(NAME, index=0, rehash=False)
self.assert_called_once(rehash_mock)
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {'index': {'old': 3, 'new': 0}},
'result': True,
'comment': 'Moved {0} from index 3 to 0.'.format(NAME)
}
)
def test_exists_change_negative_index_success(self):
'''
Tests win_path.exists when the directory is already in the PATH and
needs to be moved to a different position (successful run).
This tests a negative index.
'''
add_mock = MagicMock(return_value=True)
rehash_mock = MagicMock(return_value=True)
dunder_salt = {
'win_path.get_path': MagicMock(side_effect=[
['foo', 'bar', NAME, 'baz'],
['foo', 'bar', 'baz', NAME]
]),
'win_path.add': add_mock,
'win_path.rehash': rehash_mock,
}
dunder_opts = {'test': False}
with patch.dict(win_path.__salt__, dunder_salt), \
patch.dict(win_path.__opts__, dunder_opts):
ret = win_path.exists(NAME, index=-1)
add_mock.assert_called_once_with(NAME, index=-1, rehash=False)
self.assert_called_once(rehash_mock)
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {'index': {'old': -2, 'new': -1}},
'result': True,
'comment': 'Moved {0} from index -2 to -1.'.format(NAME)
}
)
def test_exists_change_index_add_exception(self):
'''
Tests win_path.exists when the directory is already in the PATH but an
exception is raised when we attempt to add the key to its new location.
'''
add_mock = MagicMock(side_effect=Exception('Global Thermonuclear War'))
rehash_mock = MagicMock(return_value=True)
dunder_salt = {
'win_path.get_path': MagicMock(side_effect=[
['foo', 'bar', 'baz', NAME],
['foo', 'bar', 'baz', NAME],
]),
'win_path.add': add_mock,
'win_path.rehash': rehash_mock,
}
dunder_opts = {'test': False}
with patch.dict(win_path.__salt__, dunder_salt), \
patch.dict(win_path.__opts__, dunder_opts):
ret = win_path.exists(NAME, index=0)
add_mock.assert_called_once_with(NAME, index=0, rehash=False)
rehash_mock.assert_not_called()
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {},
'result': False,
'comment': 'Encountered error: Global Thermonuclear War. '
'Failed to move {0} from index 3 to 0.'.format(NAME)
}
)
def test_exists_change_negative_index_add_exception(self):
'''
Tests win_path.exists when the directory is already in the PATH but an
exception is raised when we attempt to add the key to its new location.
This tests a negative index.
'''
add_mock = MagicMock(side_effect=Exception('Global Thermonuclear War'))
rehash_mock = MagicMock(return_value=True)
dunder_salt = {
'win_path.get_path': MagicMock(side_effect=[
['foo', 'bar', NAME, 'baz'],
['foo', 'bar', NAME, 'baz'],
]),
'win_path.add': add_mock,
'win_path.rehash': rehash_mock,
}
dunder_opts = {'test': False}
with patch.dict(win_path.__salt__, dunder_salt), \
patch.dict(win_path.__opts__, dunder_opts):
ret = win_path.exists(NAME, index=-1)
add_mock.assert_called_once_with(NAME, index=-1, rehash=False)
rehash_mock.assert_not_called()
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {},
'result': False,
'comment': 'Encountered error: Global Thermonuclear War. '
'Failed to move {0} from index -2 to -1.'.format(NAME)
}
)
def test_exists_change_index_failure(self):
'''
Tests win_path.exists when the directory is already in the PATH and
needs to be moved to a different position (failed run).
'''
add_mock = MagicMock(return_value=False)
rehash_mock = MagicMock(return_value=True)
dunder_salt = {
'win_path.get_path': MagicMock(side_effect=[
['foo', 'bar', 'baz', NAME],
['foo', 'bar', 'baz', NAME]
]),
'win_path.add': add_mock,
'win_path.rehash': rehash_mock,
}
dunder_opts = {'test': False}
with patch.dict(win_path.__salt__, dunder_salt), \
patch.dict(win_path.__opts__, dunder_opts):
ret = win_path.exists(NAME, index=0)
add_mock.assert_called_once_with(NAME, index=0, rehash=False)
rehash_mock.assert_not_called()
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {},
'result': False,
'comment': 'Failed to move {0} from index 3 to 0.'.format(NAME)
}
)
def test_exists_change_negative_index_failure(self):
'''
Tests win_path.exists when the directory is already in the PATH and
needs to be moved to a different position (failed run).
This tests a negative index.
'''
add_mock = MagicMock(return_value=False)
rehash_mock = MagicMock(return_value=True)
dunder_salt = {
'win_path.get_path': MagicMock(side_effect=[
['foo', 'bar', NAME, 'baz'],
['foo', 'bar', NAME, 'baz']
]),
'win_path.add': add_mock,
'win_path.rehash': rehash_mock,
}
dunder_opts = {'test': False}
with patch.dict(win_path.__salt__, dunder_salt), \
patch.dict(win_path.__opts__, dunder_opts):
ret = win_path.exists(NAME, index=-1)
add_mock.assert_called_once_with(NAME, index=-1, rehash=False)
rehash_mock.assert_not_called()
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {},
'result': False,
'comment': 'Failed to move {0} from index -2 to -1.'.format(NAME)
}
)
def test_exists_change_index_test_mode(self):
'''
Tests win_path.exists when the directory is already in the PATH and
needs to be moved to a different position (test mode enabled).
'''
add_mock = Mock()
rehash_mock = MagicMock(return_value=True)
dunder_salt = {
'win_path.get_path': MagicMock(side_effect=[
['foo', 'bar', 'baz', NAME],
]),
'win_path.add': add_mock,
'win_path.rehash': rehash_mock,
}
dunder_opts = {'test': True}
with patch.dict(win_path.__salt__, dunder_salt), \
patch.dict(win_path.__opts__, dunder_opts):
ret = win_path.exists(NAME, index=0)
add_mock.assert_not_called()
rehash_mock.assert_not_called()
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {'index': {'old': 3, 'new': 0}},
'result': None,
'comment': '{0} would be moved from index 3 to 0.'.format(NAME)
}
)
def test_exists_change_negative_index_test_mode(self):
'''
Tests win_path.exists when the directory is already in the PATH and
needs to be moved to a different position (test mode enabled).
'''
add_mock = Mock()
rehash_mock = MagicMock(return_value=True)
dunder_salt = {
'win_path.get_path': MagicMock(side_effect=[
['foo', 'bar', NAME, 'baz'],
]),
'win_path.add': add_mock,
'win_path.rehash': rehash_mock,
}
dunder_opts = {'test': True}
with patch.dict(win_path.__salt__, dunder_salt), \
patch.dict(win_path.__opts__, dunder_opts):
ret = win_path.exists(NAME, index=-1)
add_mock.assert_not_called()
rehash_mock.assert_not_called()
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {'index': {'old': -2, 'new': -1}},
'result': None,
'comment': '{0} would be moved from index -2 to -1.'.format(NAME)
}
)
def _test_exists_add_already_present(self, index, test_mode):
'''
Tests win_path.exists when the directory already exists in the PATH.
Helper function to test both with and without and index, and with test
mode both disabled and enabled.
'''
current_path = ['foo', 'bar', 'baz']
if index is None:
current_path.append(NAME)
else:
pos = index if index >= 0 else len(current_path) + index + 1
current_path.insert(pos, NAME)
add_mock = Mock()
rehash_mock = MagicMock(return_value=True)
dunder_salt = {
'win_path.get_path': MagicMock(side_effect=[current_path]),
'win_path.add': add_mock,
'win_path.rehash': rehash_mock,
}
dunder_opts = {'test': test_mode}
with patch.dict(win_path.__salt__, dunder_salt), \
patch.dict(win_path.__opts__, dunder_opts):
ret = win_path.exists(NAME, index=index)
add_mock.assert_not_called()
rehash_mock.assert_not_called()
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {},
'result': True,
'comment': '{0} already exists in the PATH{1}.'.format(
NAME,
' at index {0}'.format(index) if index is not None else ''
)
}
)
def test_exists_add_no_index_already_present(self):
self._test_exists_add_already_present(None, False)
def test_exists_add_no_index_already_present_test_mode(self):
self._test_exists_add_already_present(None, True)
def test_exists_add_index_already_present(self):
self._test_exists_add_already_present(1, False)
self._test_exists_add_already_present(2, False)
self._test_exists_add_already_present(-1, False)
self._test_exists_add_already_present(-2, False)
def test_exists_add_index_already_present_test_mode(self):
self._test_exists_add_already_present(1, True)
self._test_exists_add_already_present(2, True)
self._test_exists_add_already_present(-1, True)
self._test_exists_add_already_present(-2, True)

View file

@ -9,6 +9,7 @@ import logging
# Import Salt libs
import salt.utils.data
import salt.utils.data
from salt.utils.odict import OrderedDict
from tests.support.unit import TestCase, skipIf, LOREM_IPSUM
from tests.support.mock import patch, NO_MOCK, NO_MOCK_REASON
@ -16,6 +17,8 @@ from salt.ext.six.moves import builtins # pylint: disable=import-error,redefine
log = logging.getLogger(__name__)
_b = lambda x: x.encode('utf-8')
# Some randomized data that will not decode
BYTES = b'\x9c\xb1\xf7\xa3'
class DataTestCase(TestCase):
@ -27,14 +30,17 @@ class DataTestCase(TestCase):
True,
False,
None,
[123, 456.789, _b('спам'), True, False, None],
(987, 654.321, _b('яйца'), None, (True, False)),
BYTES,
[123, 456.789, _b('спам'), True, False, None, BYTES],
(987, 654.321, _b('яйца'), None, (True, False, BYTES)),
{_b('str_key'): _b('str_val'),
None: True, 123: 456.789,
None: True,
123: 456.789,
'blob': BYTES,
_b('subdict'): {'unicode_key': 'unicode_val',
_b('tuple'): (123, 'hello', _b('world'), True),
_b('list'): [456, _b('спам'), False]}},
OrderedDict([(_b('foo'), 'bar'), (123, 456)])
_b('tuple'): (123, 'hello', _b('world'), True, BYTES),
_b('list'): [456, _b('спам'), False, BYTES]}},
OrderedDict([(_b('foo'), 'bar'), (123, 456), ('blob', BYTES)])
]
def test_sorted_ignorecase(self):
@ -214,28 +220,49 @@ class DataTestCase(TestCase):
True,
False,
None,
[123, 456.789, 'спам', True, False, None],
(987, 654.321, 'яйца', None, (True, False)),
BYTES,
[123, 456.789, 'спам', True, False, None, BYTES],
(987, 654.321, 'яйца', None, (True, False, BYTES)),
{'str_key': 'str_val',
None: True, 123: 456.789,
None: True,
123: 456.789,
'blob': BYTES,
'subdict': {'unicode_key': 'unicode_val',
'tuple': (123, 'hello', 'world', True),
'list': [456, 'спам', False]}},
OrderedDict([('foo', 'bar'), (123, 456)])
'tuple': (123, 'hello', 'world', True, BYTES),
'list': [456, 'спам', False, BYTES]}},
OrderedDict([('foo', 'bar'), (123, 456), ('blob', BYTES)])
]
ret = salt.utils.data.decode(
self.test_data, encoding='utf-8', preserve_dict_class=True,
self.test_data,
encoding='utf-8',
keep=True,
preserve_dict_class=True,
preserve_tuples=True)
self.assertEqual(ret, expected)
# The binary data in the data structure should fail to decode, even
# using the fallback, and raise an exception.
self.assertRaises(
UnicodeDecodeError,
salt.utils.data.decode,
self.test_data,
encoding='utf-8',
keep=False,
preserve_dict_class=True,
preserve_tuples=True)
# Now munge the expected data so that we get what we would expect if we
# disable preservation of dict class and tuples
expected[8] = [987, 654.321, 'яйца', None, [True, False]]
expected[9]['subdict']['tuple'] = [123, 'hello', 'world', True]
expected[10] = {'foo': 'bar', 123: 456}
expected[9] = [987, 654.321, 'яйца', None, [True, False, BYTES]]
expected[10]['subdict']['tuple'] = [123, 'hello', 'world', True, BYTES]
expected[11] = {'foo': 'bar', 123: 456, 'blob': BYTES}
ret = salt.utils.data.decode(
self.test_data, encoding='utf-8', preserve_dict_class=False,
self.test_data,
encoding='utf-8',
keep=True,
preserve_dict_class=False,
preserve_tuples=False)
self.assertEqual(ret, expected)
@ -249,6 +276,14 @@ class DataTestCase(TestCase):
self.assertEqual(salt.utils.data.decode('foo'), 'foo')
self.assertEqual(salt.utils.data.decode(_b('bar')), 'bar')
# Test binary blob
self.assertEqual(salt.utils.data.decode(BYTES, keep=True), BYTES)
self.assertRaises(
UnicodeDecodeError,
salt.utils.data.decode,
BYTES,
keep=False)
@skipIf(NO_MOCK, NO_MOCK_REASON)
def test_decode_fallback(self):
'''
@ -270,27 +305,52 @@ class DataTestCase(TestCase):
True,
False,
None,
[123, 456.789, _b('спам'), True, False, None],
(987, 654.321, _b('яйца'), None, (True, False)),
BYTES,
[123, 456.789, _b('спам'), True, False, None, BYTES],
(987, 654.321, _b('яйца'), None, (True, False, BYTES)),
{_b('str_key'): _b('str_val'),
None: True, 123: 456.789,
None: True,
123: 456.789,
_b('blob'): BYTES,
_b('subdict'): {_b('unicode_key'): _b('unicode_val'),
_b('tuple'): (123, _b('hello'), _b('world'), True),
_b('list'): [456, _b('спам'), False]}},
OrderedDict([(_b('foo'), _b('bar')), (123, 456)])
_b('tuple'): (123, _b('hello'), _b('world'), True, BYTES),
_b('list'): [456, _b('спам'), False, BYTES]}},
OrderedDict([(_b('foo'), _b('bar')), (123, 456), (_b('blob'), BYTES)])
]
# Both keep=True and keep=False should work because the BYTES data is
# already bytes.
ret = salt.utils.data.encode(
self.test_data, preserve_dict_class=True, preserve_tuples=True)
self.test_data,
keep=True,
preserve_dict_class=True,
preserve_tuples=True)
self.assertEqual(ret, expected)
ret = salt.utils.data.encode(
self.test_data,
keep=False,
preserve_dict_class=True,
preserve_tuples=True)
self.assertEqual(ret, expected)
# Now munge the expected data so that we get what we would expect if we
# disable preservation of dict class and tuples
expected[8] = [987, 654.321, _b('яйца'), None, [True, False]]
expected[9][_b('subdict')][_b('tuple')] = [123, _b('hello'), _b('world'), True]
expected[10] = {_b('foo'): _b('bar'), 123: 456}
expected[9] = [987, 654.321, _b('яйца'), None, [True, False, BYTES]]
expected[10][_b('subdict')][_b('tuple')] = [
123, _b('hello'), _b('world'), True, BYTES
]
expected[11] = {_b('foo'): _b('bar'), 123: 456, _b('blob'): BYTES}
ret = salt.utils.data.encode(
self.test_data, preserve_dict_class=False, preserve_tuples=False)
self.test_data,
keep=True,
preserve_dict_class=False,
preserve_tuples=False)
self.assertEqual(ret, expected)
ret = salt.utils.data.encode(
self.test_data,
keep=False,
preserve_dict_class=False,
preserve_tuples=False)
self.assertEqual(ret, expected)
# Now test single non-string, non-data-structure items, these should
@ -303,6 +363,70 @@ class DataTestCase(TestCase):
self.assertEqual(salt.utils.data.encode('foo'), _b('foo'))
self.assertEqual(salt.utils.data.encode(_b('bar')), _b('bar'))
# Test binary blob, nothing should happen even when keep=False since
# the data is already bytes
self.assertEqual(salt.utils.data.encode(BYTES, keep=True), BYTES)
self.assertEqual(salt.utils.data.encode(BYTES, keep=False), BYTES)
def test_encode_keep(self):
'''
Whereas we tested the keep argument in test_decode, it is much easier
to do a more comprehensive test of keep in its own function where we
can force the encoding.
'''
unicode_str = 'питон'
encoding = 'ascii'
# Test single string
self.assertEqual(
salt.utils.data.encode(unicode_str, encoding, keep=True),
unicode_str)
self.assertRaises(
UnicodeEncodeError,
salt.utils.data.encode,
unicode_str,
encoding,
keep=False)
data = [
unicode_str,
[b'foo', [unicode_str], {b'key': unicode_str}, (unicode_str,)],
{b'list': [b'foo', unicode_str],
b'dict': {b'key': unicode_str},
b'tuple': (b'foo', unicode_str)},
([b'foo', unicode_str], {b'key': unicode_str}, (unicode_str,))
]
# Since everything was a bytestring aside from the bogus data, the
# return data should be identical. We don't need to test recursive
# decoding, that has already been tested in test_encode.
self.assertEqual(
salt.utils.data.encode(data, encoding,
keep=True, preserve_tuples=True),
data
)
self.assertRaises(
UnicodeEncodeError,
salt.utils.data.encode,
data,
encoding,
keep=False,
preserve_tuples=True)
for index, item in enumerate(data):
self.assertEqual(
salt.utils.data.encode(data[index], encoding,
keep=True, preserve_tuples=True),
data[index]
)
self.assertRaises(
UnicodeEncodeError,
salt.utils.data.encode,
data[index],
encoding,
keep=False,
preserve_tuples=True)
@skipIf(NO_MOCK, NO_MOCK_REASON)
def test_encode_fallback(self):
'''