Merge remote-tracking branch 'upstream/2015.2' into merge-forward-develop

Conflicts:
    .pylintrc
    .testing.pylintrc
    salt/auth/pam.py
    salt/modules/win_status.py
    salt/utils/network.py
    salt/utils/schedule.py
    tests/unit/renderers/gpg_test.py
    tests/unit/states/boto_secgroup.py
This commit is contained in:
Colton Myers 2015-04-29 12:01:52 -06:00
commit cca8ddc886
29 changed files with 1154 additions and 144 deletions

View file

@ -31,7 +31,8 @@ load-plugins=saltpylint.pep8,
saltpylint.strings,
saltpylint.fileperms,
saltpylint.py3modernize,
salttesting.pylintplugins.smartup
salttesting.pylintplugins.smartup,
saltpylint.pyminver
# Use multiple processes to speed up Pylint.
# Don't bump this values on PyLint 1.4.0 - Know bug that ignores the passed --rcfile
@ -53,6 +54,8 @@ fileperms-ignore-paths=tests/runtests.py,tests/jenkins*.py,tests/saltsh.py,tests
# Py3 Modernize PyLint Plugin Settings
modernize-nofix = libmodernize.fixes.fix_dict_six
# Minimum Python Version To Enforce
minimum-python-version = 2.6
[MESSAGES CONTROL]
# Only show warnings with the listed confidence levels. Leave empty to show

View file

@ -31,7 +31,8 @@ load-plugins=saltpylint.pep8,
saltpylint.strings,
saltpylint.fileperms,
saltpylint.py3modernize,
saltpylint.smartup
saltpylint.smartup,
saltpylint.minpyver
# Use multiple processes to speed up Pylint.
# Don't bump this values on PyLint 1.4.0 - Know bug that ignores the passed --rcfile
@ -53,6 +54,8 @@ fileperms-ignore-paths=tests/runtests.py,tests/jenkins*.py,tests/saltsh.py,tests
# Py3 Modernize PyLint Plugin Settings
modernize-nofix = libmodernize.fixes.fix_dict_six
# Minimum Python Version To Enforce
minimum-python-version = 2.6
[MESSAGES CONTROL]
# Only show warnings with the listed confidence levels. Leave empty to show

View file

@ -97,45 +97,9 @@ Some core components are packaged separately in the Ubuntu repositories. These
ZeroMQ 4
========
We recommend using ZeroMQ 4 where available. ZeroMQ 4 is already available for
Ubuntu 14.04 and Ubuntu 14.10 and nothing additional needs to be done. However,
the **chris-lea/zeromq** PPA can be used to provide ZeroMQ 4 on Ubuntu 12.04 LTS.
Adding this PPA can be done with a :mod:`pkgrepo.managed <salt.states.pkgrepo.managed>`
state.
.. code-block:: yaml
zeromq-ppa:
pkgrepo.managed:
- ppa: chris-lea/zeromq
The following states can be used to upgrade ZeroMQ and pyzmq, and then restart
the minion:
.. code-block:: yaml
update_zmq:
pkg.latest:
- pkgs:
- zeromq
- python-zmq
- order: last
cmd.wait:
- name: |
echo service salt-minion restart | at now + 1 minute
- watch:
- pkg: update_zmq
.. note::
This example assumes that atd is installed and running, see here_ for a more
detailed explanation.
.. _here: http://docs.saltstack.com/en/latest/faq.html#what-is-the-best-way-to-restart-a-salt-daemon-using-salt
If this repo is added *before* Salt is installed, then installing either
``salt-master`` or ``salt-minion`` will automatically pull in ZeroMQ 4.0.4, and
additional states to upgrade ZeroMQ and pyzmq are unnecessary.
ZeroMQ 4 is available by default for Ubuntu 14.04 and newer. However, for Ubuntu
12.04 LTS, starting with Salt version ``2014.7.5``, ZeroMQ 4 is included with the
Salt installation package and nothing additional needs to be done.
Post-installation tasks

View file

@ -170,6 +170,19 @@ rendered SLS file (or any errors generated while rendering the SLS file).
salt-master -l debug
3. Look for log entries in the form:
.. code-block:: text
[DEBUG ] Gathering reactors for tag foo/bar
[DEBUG ] Compiling reactions for tag foo/bar
[DEBUG ] Rendered data from file: /path/to/the/reactor_file.sls:
<... Rendered output appears here. ...>
The rendered output is the result of the Jinja parsing and is a good way to
view the result of referencing Jinja variables. If the result is empty then
Jinja produced an empty result and the Reactor will ignore it.
Understanding the Structure of Reactor Formulas
===============================================

View file

@ -8,6 +8,7 @@ Salt compatibility code
from __future__ import absolute_import
import sys
import types
import subprocess
# Import 3rd-party libs
from salt.ext.six import binary_type, string_types, text_type
@ -129,3 +130,42 @@ def string_io(data=None): # cStringIO can't handle unicode
return cStringIO(bytes(data))
except (UnicodeEncodeError, TypeError):
return StringIO(data)
if sys.version_info < (2, 7):
# Backport of Python's 2.7 subprocess methods not found in 2.6
# This code comes directly from the 2.7 subprocess module
def check_output(*popenargs, **kwargs):
r"""Run command with arguments and return its output as a byte string.
If the exit code was non-zero it raises a CalledProcessError. The
CalledProcessError object will have the return code in the returncode
attribute and output in the output attribute.
The arguments are the same as for the Popen constructor. Example:
>>> check_output(["ls", "-l", "/dev/null"])
'crw-rw-rw- 1 root root 1, 3 Oct 18 2007 /dev/null\n'
The stdout argument is not allowed as it is used internally.
To capture standard error in the result, use stderr=STDOUT.
>>> check_output(["/bin/sh", "-c",
... "ls -l non_existent_file ; exit 0"],
... stderr=STDOUT)
'ls: non_existent_file: No such file or directory\n'
"""
if 'stdout' in kwargs:
raise ValueError('stdout argument not allowed, it will be overridden.')
process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs)
output, unused_err = process.communicate()
retcode = process.poll()
if retcode:
cmd = kwargs.get("args")
if cmd is None:
cmd = popenargs[0]
raise subprocess.CalledProcessError(retcode, cmd, output=output)
return output
subprocess.check_output = check_output

View file

@ -24,12 +24,12 @@ from ctypes import CDLL, POINTER, Structure, CFUNCTYPE, cast, pointer, sizeof
from ctypes import c_void_p, c_uint, c_char_p, c_char, c_int
from ctypes.util import find_library
# Import Salt Libs
# Import Salt libs
from salt.utils import get_group_list
from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
LIBPAM = CDLL(find_library("pam"))
LIBC = CDLL(find_library("c"))
LIBPAM = CDLL(find_library('pam'))
LIBC = CDLL(find_library('c'))
CALLOC = LIBC.calloc
CALLOC.restype = c_void_p
@ -51,7 +51,7 @@ class PamHandle(Structure):
Wrapper class for pam_handle_t
'''
_fields_ = [
("handle", c_void_p)
('handle', c_void_p)
]
def __init__(self):
@ -65,11 +65,11 @@ class PamMessage(Structure):
'''
_fields_ = [
("msg_style", c_int),
("msg", POINTER(c_char)),
("msg", c_char_p),
]
def __repr__(self):
return "<PamMessage {0:d} '{1}'>".format(self.msg_style, self.msg)
return '<PamMessage {0} {1!r}>'.format(self.msg_style, self.msg)
class PamResponse(Structure):
@ -77,12 +77,13 @@ class PamResponse(Structure):
Wrapper class for pam_response structure
'''
_fields_ = [
("resp", POINTER(c_char)),
("resp_retcode", c_int),
('resp', c_char_p),
('resp_retcode', c_int),
]
def __repr__(self):
return "<PamResponse {0:d} '{1}'>".format(self.resp_retcode, self.resp)
return '<PamResponse {0} {1!r}>'.format(self.resp_retcode, self.resp)
CONV_FUNC = CFUNCTYPE(c_int,
c_int, POINTER(POINTER(PamMessage)),
@ -94,35 +95,24 @@ class PamConv(Structure):
Wrapper class for pam_conv structure
'''
_fields_ = [
("conv", CONV_FUNC),
("appdata_ptr", c_void_p)
('conv', CONV_FUNC),
('appdata_ptr', c_void_p)
]
try:
PAM_START = LIBPAM.pam_start
PAM_START.restype = c_int
PAM_START.argtypes = [c_char_p, c_char_p, POINTER(PamConv),
POINTER(PamHandle)]
PAM_END = LIBPAM.pam_end
PAM_END.restpe = c_int
PAM_END.argtypes = [PamHandle, c_int]
POINTER(PamHandle)]
PAM_AUTHENTICATE = LIBPAM.pam_authenticate
PAM_AUTHENTICATE.restype = c_int
PAM_AUTHENTICATE.argtypes = [PamHandle, c_int]
PAM_SETCRED = LIBPAM.pam_setcred
PAM_SETCRED.restype = c_int
PAM_SETCRED.argtypes = [PamHandle, c_int]
PAM_OPEN_SESSION = LIBPAM.pam_open_session
PAM_OPEN_SESSION.restype = c_int
PAM_OPEN_SESSION.argtypes = [PamHandle, c_int]
PAM_CLOSE_SESSION = LIBPAM.pam_close_session
PAM_CLOSE_SESSION.restype = c_int
PAM_CLOSE_SESSION.argtypes = [PamHandle, c_int]
PAM_END = LIBPAM.pam_end
PAM_END.restype = c_int
PAM_END.argtypes = [PamHandle, c_int]
except Exception:
HAS_PAM = False
else:
@ -160,7 +150,7 @@ def authenticate(username, password, service='login'):
for i in range(n_messages):
if messages[i].contents.msg_style == PAM_PROMPT_ECHO_OFF:
pw_copy = STRDUP(str(password))
p_response.contents[i].resp = pw_copy
p_response.contents[i].resp = cast(pw_copy, c_char_p)
p_response.contents[i].resp_retcode = 0
return 0
@ -175,26 +165,7 @@ def authenticate(username, password, service='login'):
return False
retval = PAM_AUTHENTICATE(handle, 0)
if retval != 0:
PAM_END(handle, retval)
return False
retval = PAM_SETCRED(handle, 0)
if retval != 0:
PAM_END(handle, retval)
return False
retval = PAM_OPEN_SESSION(handle, 0)
if retval != 0:
PAM_END(handle, retval)
return False
retval = PAM_CLOSE_SESSION(handle, 0)
if retval != 0:
PAM_END(handle, retval)
return False
retval = PAM_END(handle, retval)
PAM_END(handle, 0)
return retval == 0

View file

@ -508,12 +508,13 @@ def _get_tree_dulwich(repo, tgt_env):
return None
def _clean_stale(repo_obj, local_refs=None):
def _clean_stale(repo, local_refs=None):
'''
Clean stale local refs so they don't appear as fileserver environments
'''
provider = _get_provider()
cleaned = []
repo_obj = repo['repo']
if provider == 'gitpython':
for ref in repo_obj.remotes[0].stale_refs:
if ref.name.startswith('refs/tags/'):
@ -524,6 +525,15 @@ def _clean_stale(repo_obj, local_refs=None):
ref.delete(repo_obj, ref)
cleaned.append(ref)
elif provider == 'pygit2':
if isinstance(repo.get('credentials'),
(pygit2.Keypair, pygit2.UserPass)):
log.debug(
'pygit2 does not support detecting stale refs for '
'authenticated remotes, saltenvs will not reflect '
'branches/tags removed from remote repository {0}'
.format(repo['url'])
)
return []
if local_refs is None:
local_refs = repo_obj.listall_references()
remote_refs = []
@ -1141,7 +1151,7 @@ def update():
fetch_results = origin.fetch()
except AssertionError:
fetch_results = origin.fetch()
cleaned = _clean_stale(repo['repo'])
cleaned = _clean_stale(repo)
if fetch_results or cleaned:
data['changed'] = True
elif provider == 'pygit2':
@ -1186,7 +1196,7 @@ def update():
)
# Clean up any stale refs
refs_post = repo['repo'].listall_references()
cleaned = _clean_stale(repo['repo'], refs_post)
cleaned = _clean_stale(repo, refs_post)
if received_objects or refs_pre != refs_post or cleaned:
data['changed'] = True
elif provider == 'dulwich':
@ -1740,7 +1750,9 @@ def _file_list_gitpython(repo, tgt_env):
tree = tree / repo['root']
except KeyError:
return files, symlinks
relpath = lambda path: os.path.relpath(path, repo['root'])
relpath = lambda path: os.path.relpath(path, repo['root'])
else:
relpath = lambda path: path
add_mountpoint = lambda path: os.path.join(repo['mountpoint'], path)
for file_blob in tree.traverse():
if not isinstance(file_blob, git.Blob):
@ -1796,10 +1808,12 @@ def _file_list_pygit2(repo, tgt_env):
return files, symlinks
if not isinstance(tree, pygit2.Tree):
return files, symlinks
relpath = lambda path: os.path.relpath(path, repo['root'])
else:
relpath = lambda path: path
blobs = {}
if len(tree):
_traverse(tree, repo['repo'], blobs, repo['root'])
relpath = lambda path: os.path.relpath(path, repo['root'])
add_mountpoint = lambda path: os.path.join(repo['mountpoint'], path)
for repo_path in blobs.get('files', []):
files.add(add_mountpoint(relpath(repo_path)))
@ -1842,7 +1856,10 @@ def _file_list_dulwich(repo, tgt_env):
blobs = {}
if len(tree):
_traverse(tree, repo['repo'], blobs, repo['root'])
relpath = lambda path: os.path.relpath(path, repo['root'])
if repo['root']:
relpath = lambda path: os.path.relpath(path, repo['root'])
else:
relpath = lambda path: path
add_mountpoint = lambda path: os.path.join(repo['mountpoint'], path)
for repo_path in blobs.get('files', []):
files.add(add_mountpoint(relpath(repo_path)))
@ -1913,7 +1930,9 @@ def _dir_list_gitpython(repo, tgt_env):
tree = tree / repo['root']
except KeyError:
return ret
relpath = lambda path: os.path.relpath(path, repo['root'])
relpath = lambda path: os.path.relpath(path, repo['root'])
else:
relpath = lambda path: path
add_mountpoint = lambda path: os.path.join(repo['mountpoint'], path)
for blob in tree.traverse():
if isinstance(blob, git.Tree):
@ -1956,10 +1975,12 @@ def _dir_list_pygit2(repo, tgt_env):
return ret
if not isinstance(tree, pygit2.Tree):
return ret
relpath = lambda path: os.path.relpath(path, repo['root'])
else:
relpath = lambda path: path
blobs = []
if len(tree):
_traverse(tree, repo['repo'], blobs, repo['root'])
relpath = lambda path: os.path.relpath(path, repo['root'])
add_mountpoint = lambda path: os.path.join(repo['mountpoint'], path)
for blob in blobs:
ret.add(add_mountpoint(relpath(blob)))
@ -1997,7 +2018,10 @@ def _dir_list_dulwich(repo, tgt_env):
blobs = []
if len(tree):
_traverse(tree, repo['repo'], blobs, repo['root'])
relpath = lambda path: os.path.relpath(path, repo['root'])
if repo['root']:
relpath = lambda path: os.path.relpath(path, repo['root'])
else:
relpath = lambda path: path
add_mountpoint = lambda path: os.path.join(repo['mountpoint'], path)
for blob in blobs:
ret.add(add_mountpoint(relpath(blob)))

View file

@ -825,8 +825,13 @@ class LazyLoader(salt.utils.lazy.LazyDict):
self.file_mapping = {}
for mod_dir in self.module_dirs:
files = []
try:
for filename in os.listdir(mod_dir):
files = os.listdir(mod_dir)
except OSError:
continue
for filename in files:
try:
if filename.startswith('_'):
# skip private modules
# log messages omitted for obviousness
@ -864,8 +869,8 @@ class LazyLoader(salt.utils.lazy.LazyDict):
curr_ext = self.file_mapping[f_noext][1]
if suffix_order.index(ext) < suffix_order.index(curr_ext):
self.file_mapping[f_noext] = (fpath, ext)
except OSError:
continue
except OSError:
continue
def clear(self):
'''

View file

@ -2,16 +2,16 @@
'''
The jail module for FreeBSD
'''
from __future__ import absolute_import
# Import python libs
from __future__ import absolute_import
import os
import subprocess
import shlex
import re
# Import salt libs
import salt.utils
from salt._compat import subprocess
# Define the module's virtual name
__virtualname__ = 'jail'
@ -121,7 +121,7 @@ def show_config(jail):
'''
ret = {}
if subprocess.call(["jls", "-nq", "-j", jail]) == 0:
jls = subprocess.check_output(["jls", "-nq", "-j", jail])
jls = subprocess.check_output(["jls", "-nq", "-j", jail]) # pylint: disable=minimum-python-version
jailopts = shlex.split(jls)
for jailopt in jailopts:
if '=' not in jailopt:

View file

@ -826,7 +826,7 @@ def _parser():
parser = optparse.OptionParser()
add_arg = parser.add_option
else:
import argparse
import argparse # pylint: disable=minimum-python-version
parser = argparse.ArgumentParser()
add_arg = parser.add_argument

View file

@ -2568,7 +2568,6 @@ def set_dns(name, dnsservers=None, searchdomains=None):
salt myminion lxc.set_dns ubuntu "['8.8.8.8', '4.4.4.4']"
'''
ret = {'result': False}
if dnsservers is None:
dnsservers = ['8.8.8.8', '4.4.4.4']
elif not isinstance(dnsservers, list):
@ -2590,10 +2589,47 @@ def set_dns(name, dnsservers=None, searchdomains=None):
dns = ['nameserver {0}'.format(x) for x in dnsservers]
dns.extend(['search {0}'.format(x) for x in searchdomains])
dns = '\n'.join(dns) + '\n'
result = run_all(name,
'tee /etc/resolv.conf',
stdin=dns,
python_shell=False)
# we may be using resolvconf in the container
# We need to handle that case with care:
# - we create the resolv.conf runtime directory (the
# linked directory) as anyway it will be shadowed when the real
# runned tmpfs mountpoint will be mounted.
# ( /etc/resolv.conf -> ../run/resolvconf/resolv.conf)
# Indeed, it can save us in any other case (running, eg, in a
# bare chroot when repairing or preparing the container for
# operation.
# - We also teach resolvconf to use the aforementioned dns.
# - We finally also set /etc/resolv.conf in all cases
rstr = __salt__['test.rand_str']()
# no tmp here, apparmor wont let us execute !
script = '/sbin/{0}_dns.sh'.format(rstr)
DNS_SCRIPT = "\n".join([
# 'set -x',
'if [ -h /etc/resolv.conf ];then',
' if [ "x$(readlink /etc/resolv.conf)"'
' = "x../run/resolvconf/resolv.conf" ];then',
' if [ ! -d /run/resolvconf/ ];then',
' mkdir -p /run/resolvconf',
' fi',
' cat > /etc/resolvconf/resolv.conf.d/head <<EOF',
dns,
'EOF',
'',
' fi',
'fi',
'cat > /etc/resolv.conf <<EOF',
dns,
'EOF',
''])
result = run_all(
name, 'tee {0}'.format(script), stdin=DNS_SCRIPT, python_shell=True)
if result['retcode'] == 0:
result = run_all(
name, 'sh -c "chmod +x {0};{0}"'.format(script), python_shell=True)
# blindly delete the setter file
run_all(name,
'if [ -f "{0}" ];then rm -f "{0}";fi'.format(script),
python_shell=True)
if result['retcode'] != 0:
error = ('Unable to write to /etc/resolv.conf in container \'{0}\''
.format(name))

View file

@ -9,13 +9,16 @@ or for problem solving if your minion is having problems.
:depends: - pythoncom
- wmi
'''
from __future__ import absolute_import
import subprocess
# Import Python Libs
from __future__ import absolute_import
import logging
# Import Salt Libs
import salt.utils
import salt.ext.six as six
import salt.utils.event
from salt._compat import subprocess
from salt.utils.network import host_to_ip as _host_to_ip
import os
@ -336,7 +339,7 @@ def master(master=None, connected=True):
'''
remotes = set()
try:
data = subprocess.check_output(['netstat', '-n', '-p', 'TCP'])
data = subprocess.check_output(['netstat', '-n', '-p', 'TCP']) # pylint: disable=minimum-python-version
except subprocess.CalledProcessError:
log.error('Failed netstat')
raise

View file

@ -26,7 +26,7 @@ from salt.ext.six.moves import range
from salt.ext.six.moves import map
try:
import importlib
import importlib # pylint: disable=minimum-python-version
HAS_IMPORTLIB = True
except ImportError:
# Python < 2.7 does not have importlib

View file

@ -289,7 +289,7 @@ import salt.utils
# Only used when called from a terminal
log = None
if __name__ == '__main__':
import argparse
import argparse # pylint: disable=minimum-python-version
parser = argparse.ArgumentParser()
parser.add_argument('hostname', help='Hostname')

View file

@ -121,10 +121,10 @@ def prep_jid(nocache=False, passed_jid=None):
if passed_jid is None:
return prep_jid(nocache=nocache)
with salt.utils.fopen(os.path.join(jid_dir_, 'jid'), 'w+') as fn_:
with salt.utils.fopen(os.path.join(jid_dir_, 'jid'), 'wb+') as fn_:
fn_.write(jid)
if nocache:
with salt.utils.fopen(os.path.join(jid_dir_, 'nocache'), 'w+') as fn_:
with salt.utils.fopen(os.path.join(jid_dir_, 'nocache'), 'wb+') as fn_:
fn_.write('')
return jid
@ -305,7 +305,7 @@ def clean_old_jobs():
# No jid file means corrupted cache entry, scrub it
shutil.rmtree(f_path)
else:
with salt.utils.fopen(jid_file, 'r') as fn_:
with salt.utils.fopen(jid_file, 'rb') as fn_:
jid = fn_.read()
if len(jid) < 18:
# Invalid jid, scrub the dir

View file

@ -124,9 +124,15 @@ sls = orchestrate # pylint: disable=invalid-name
def orchestrate_single(fun, name, test=None, queue=False, pillar=None, **kwargs):
'''
versionadded:: 2015.2
Execute a single state orchestration routine
.. versionadded:: 2015.2.0
CLI Example:
.. code-block:: bash
salt-run state.orchestrate_single fun=salt.wheel name=key.list_all
'''
if pillar is not None and not isinstance(pillar, dict):
raise SaltInvocationError(
@ -148,9 +154,22 @@ def orchestrate_single(fun, name, test=None, queue=False, pillar=None, **kwargs)
def orchestrate_high(data, test=None, queue=False, pillar=None, **kwargs):
'''
versionadded:: 2015.2
Execute a single state orchestration routine
.. versionadded:: 2015.2.0
CLI Example:
.. code-block:: bash
salt-run state.orchestrate_high '{
stage_one:
{salt.state: [{tgt: "db*"}, {sls: postgres_setup}]},
stage_two:
{salt.state: [{tgt: "web*"}, {sls: apache_setup}, {
require: [{salt: stage_one}],
}]},
}'
'''
if pillar is not None and not isinstance(pillar, dict):
raise SaltInvocationError(

View file

@ -2291,7 +2291,7 @@ class BaseHighState(object):
envs = set(['base'])
if 'file_roots' in self.opts:
envs.update(list(self.opts['file_roots']))
return envs
return envs.union(set(self.client.envs()))
def get_tops(self):
'''

View file

@ -390,8 +390,9 @@ def volume_attached(name, server_name, provider=None, **kwargs):
if name in volumes and volumes[name]['attachments']:
volume = volumes[name]
ret['comment'] = ('Volume {name} is already'
'attached: {attachments}').format(**volumes[name])
ret['comment'] = (
'Volume {name} is already attached: {attachments}'
).format(**volumes[name])
ret['result'] = True
return ret
elif name not in volumes:

View file

@ -245,6 +245,9 @@ def state(
m_ret = False
if 'return' in mdata and 'ret' not in mdata:
mdata['ret'] = mdata.pop('return')
if mdata.get('failed', False):
m_state = False
else:

View file

@ -25,6 +25,7 @@ except ImportError:
# Import salt libs
import salt.utils
from salt._compat import subprocess
log = logging.getLogger(__name__)
@ -1059,7 +1060,7 @@ def _sunos_remotes_on(port, which_end):
'''
remotes = set()
try:
data = subprocess.check_output(['netstat', '-f', 'inet', '-n'])
data = subprocess.check_output(['netstat', '-f', 'inet', '-n']) # pylint: disable=minimum-python-version
except subprocess.CalledProcessError:
log.error('Failed netstat')
raise
@ -1105,7 +1106,7 @@ def _freebsd_remotes_on(port, which_end):
try:
cmd = shlex.split('sockstat -4 -c -p {0}'.format(port))
data = subprocess.check_output(cmd)
data = subprocess.check_output(cmd) # pylint: disable=minimum-python-version
except subprocess.CalledProcessError as ex:
log.error('Failed "sockstat" with returncode = {0}'.format(ex.returncode))
raise
@ -1201,7 +1202,7 @@ def remotes_on_local_tcp_port(port):
return _windows_remotes_on(port, 'local_port')
try:
data = subprocess.check_output(['lsof', '-i4TCP:{0:d}'.format(port), '-n'])
data = subprocess.check_output(['lsof', '-i4TCP:{0:d}'.format(port), '-n']) # pylint: disable=minimum-python-version
except subprocess.CalledProcessError as ex:
log.error('Failed "lsof" with returncode = {0}'.format(ex.returncode))
raise
@ -1254,7 +1255,7 @@ def remotes_on_remote_tcp_port(port):
return _windows_remotes_on(port, 'remote_port')
try:
data = subprocess.check_output(['lsof', '-i4TCP:{0:d}'.format(port), '-n'])
data = subprocess.check_output(['lsof', '-i4TCP:{0:d}'.format(port), '-n']) # pylint: disable=minimum-python-version
except subprocess.CalledProcessError as ex:
log.error('Failed "lsof" with returncode = {0}'.format(ex.returncode))
raise

View file

@ -28,7 +28,7 @@ from collections import Callable
import salt.ext.six as six
try:
from collections import OrderedDict # pylint: disable=E0611
from collections import OrderedDict # pylint: disable=E0611,minimum-python-version
except ImportError:
try:
from ordereddict import OrderedDict

View file

@ -341,7 +341,7 @@ class Schedule(object):
'minion.d',
'_schedule.conf')
try:
with salt.utils.fopen(schedule_conf, 'w+') as fp_:
with salt.utils.fopen(schedule_conf, 'wb+') as fp_:
fp_.write(yaml.dump({'schedule': self.opts['schedule']}))
except (IOError, OSError):
log.error('Failed to persist the updated schedule')

View file

@ -298,7 +298,7 @@ def check_user(user):
pwuser = pwd.getpwnam(user)
try:
if hasattr(os, 'initgroups'):
os.initgroups(user, pwuser.pw_gid)
os.initgroups(user, pwuser.pw_gid) # pylint: disable=minimum-python-version
else:
os.setgroups(salt.utils.get_gid_list(user, include_default=False))
os.setgid(pwuser.pw_gid)

View file

@ -40,7 +40,7 @@ import socket
from struct import unpack
import pcapy
import sys
import argparse
import argparse # pylint: disable=minimum-python-version
import time

View file

@ -0,0 +1,206 @@
# -*- coding: utf-8 -*-
'''
:codeauthor: :email:`Jayesh Kariya <jayeshk@saltstack.com>`
'''
# Import Python libs
from __future__ import absolute_import
# Import Salt Testing Libs
from salttesting import skipIf, TestCase
from salttesting.mock import (
NO_MOCK,
NO_MOCK_REASON,
MagicMock,
patch)
from salttesting.helpers import ensure_in_syspath
ensure_in_syspath('../../')
# Import Salt Libs
from salt.modules import rh_ip
import jinja2.exceptions
import os
# Globals
rh_ip.__grains__ = {}
rh_ip.__salt__ = {}
@skipIf(NO_MOCK, NO_MOCK_REASON)
class RhipTestCase(TestCase):
'''
Test cases for salt.modules.rh_ip
'''
def test_build_bond(self):
'''
Test to create a bond script in /etc/modprobe.d with the passed
settings and load the bonding kernel module.
'''
with patch.dict(rh_ip.__grains__, {'osrelease': 'osrelease'}):
with patch.object(rh_ip, '_parse_settings_bond', MagicMock()):
mock = jinja2.exceptions.TemplateNotFound('foo')
with patch.object(jinja2.Environment, 'get_template',
MagicMock(side_effect=mock)):
self.assertEqual(rh_ip.build_bond('iface'), '')
with patch.dict(rh_ip.__salt__, {'kmod.load':
MagicMock(return_value=None)}):
with patch.object(rh_ip, '_read_temp', return_value='A'):
self.assertEqual(rh_ip.build_bond('iface', test='A'),
'A')
with patch.object(rh_ip, '_read_file', return_value='A'):
self.assertEqual(rh_ip.build_bond('iface', test=None),
'A')
def test_build_interface(self):
'''
Test to build an interface script for a network interface.
'''
with patch.dict(rh_ip.__grains__, {'os': 'Fedora'}):
with patch.object(rh_ip, '_raise_error_iface', return_value=None):
self.assertRaises(AttributeError,
rh_ip.build_interface,
'iface', 'slave', True)
with patch.object(rh_ip, '_parse_settings_bond', MagicMock()):
mock = jinja2.exceptions.TemplateNotFound('foo')
with patch.object(jinja2.Environment,
'get_template',
MagicMock(side_effect=mock)):
self.assertEqual(rh_ip.build_interface('iface',
'vlan',
True), '')
with patch.object(rh_ip, '_read_temp', return_value='A'):
with patch.object(jinja2.Environment,
'get_template', MagicMock()):
self.assertEqual(rh_ip.build_interface('iface',
'vlan',
True,
test='A'),
'A')
with patch.object(rh_ip, '_write_file_iface',
return_value=None):
with patch.object(os.path, 'join',
return_value='A'):
with patch.object(rh_ip, '_read_file',
return_value='A'):
self.assertEqual(rh_ip.build_interface
('iface', 'vlan',
True), 'A')
def test_build_routes(self):
'''
Test to build a route script for a network interface.
'''
with patch.object(rh_ip, '_parse_routes', MagicMock()):
mock = jinja2.exceptions.TemplateNotFound('foo')
with patch.object(jinja2.Environment,
'get_template', MagicMock(side_effect=mock)):
self.assertEqual(rh_ip.build_routes('iface'), '')
with patch.object(jinja2.Environment,
'get_template', MagicMock()):
with patch.object(rh_ip, '_read_temp', return_value='A'):
self.assertEqual(rh_ip.build_routes('i', test='t'), 'A')
with patch.object(rh_ip, '_read_file', return_value='A'):
with patch.object(os.path, 'join', return_value='A'):
with patch.object(rh_ip, '_write_file_iface',
return_value=None):
self.assertEqual(rh_ip.build_routes('i',
test=None),
'A')
def test_down(self):
'''
Test to shutdown a network interface
'''
with patch.dict(rh_ip.__salt__, {'cmd.run':
MagicMock(return_value='A')}):
self.assertEqual(rh_ip.down('iface', 'iface_type'), 'A')
self.assertEqual(rh_ip.down('iface', 'slave'), None)
def test_get_bond(self):
'''
Test to return the content of a bond script
'''
with patch.object(os.path, 'join', return_value='A'):
with patch.object(rh_ip, '_read_file', return_value='A'):
self.assertEqual(rh_ip.get_bond('iface'), 'A')
def test_get_interface(self):
'''
Test to return the contents of an interface script
'''
with patch.object(os.path, 'join', return_value='A'):
with patch.object(rh_ip, '_read_file', return_value='A'):
self.assertEqual(rh_ip.get_interface('iface'), 'A')
def test_up(self):
'''
Test to start up a network interface
'''
with patch.dict(rh_ip.__salt__, {'cmd.run':
MagicMock(return_value='A')}):
self.assertEqual(rh_ip.up('iface', 'iface_type'), 'A')
self.assertEqual(rh_ip.up('iface', 'slave'), None)
def test_get_routes(self):
'''
Test to return the contents of the interface routes script.
'''
with patch.object(os.path, 'join', return_value='A'):
with patch.object(rh_ip, '_read_file', return_value='A'):
self.assertEqual(rh_ip.get_routes('iface'), 'A')
def test_get_network_settings(self):
'''
Test to return the contents of the global network script.
'''
with patch.object(rh_ip, '_read_file', return_value='A'):
self.assertEqual(rh_ip.get_network_settings(), 'A')
def test_apply_network_settings(self):
'''
Test to apply global network configuration.
'''
with patch.dict(rh_ip.__salt__, {'service.restart':
MagicMock(return_value=True)}):
self.assertTrue(rh_ip.apply_network_settings())
def test_build_network_settings(self):
'''
Test to build the global network script.
'''
with patch.object(rh_ip, '_parse_rh_config', MagicMock()):
with patch.object(rh_ip, '_parse_network_settings', MagicMock()):
mock = jinja2.exceptions.TemplateNotFound('foo')
with patch.object(jinja2.Environment,
'get_template', MagicMock(side_effect=mock)):
self.assertEqual(rh_ip.build_network_settings(), '')
with patch.object(jinja2.Environment,
'get_template', MagicMock()):
with patch.object(rh_ip, '_read_temp', return_value='A'):
self.assertEqual(rh_ip.build_network_settings
(test='t'), 'A')
with patch.object(rh_ip, '_write_file_network',
return_value=None):
with patch.object(rh_ip, '_read_file',
return_value='A'):
self.assertEqual(rh_ip.build_network_settings
(test=None), 'A')
if __name__ == '__main__':
from integration import run_tests
run_tests(RhipTestCase, needs_daemon=False)

View file

@ -17,15 +17,10 @@ import salt.loader
import salt.config
import salt.utils
from integration import TMP, ModuleCase
from salt.utils.odict import OrderedDict
import copy
try:
from collections import OrderedDict
OD_AVAILABLE = True
except ImportError:
OD_AVAILABLE = False
GPG_KEYDIR = os.path.join(TMP, 'gpg-keydir')
# The keyring library uses `getcwd()`, let's make sure we in a good directory
@ -84,7 +79,6 @@ class GPGTestCase(ModuleCase):
decrypted_data_mock.__str__ = lambda x: DECRYPTED_STRING
return decrypted_data_mock
@skipIf(not OD_AVAILABLE, 'OrderedDict not available. Skipping.')
def make_nested_object(self, s):
return OrderedDict([
('array_key', [1, False, s]),

View file

@ -2,13 +2,16 @@
# import Python Libs
from __future__ import absolute_import
from collections import OrderedDict
# Import Salt Testing Libs
from salttesting.case import TestCase
from salttesting.helpers import ensure_in_syspath
ensure_in_syspath('../../')
# Import Salt Libs
from salt.states import boto_secgroup
# Import Salt Testing Libs
from salttesting import TestCase
from salt.utils.odict import OrderedDict
class Boto_SecgroupTestCase(TestCase):

View file

@ -0,0 +1,390 @@
# -*- coding: utf-8 -*-
'''
:codeauthor: :email:`Jayesh Kariya <jayeshk@saltstack.com>`
'''
# Import Python libs
from __future__ import absolute_import
# Import Salt Testing Libs
from salttesting import skipIf, TestCase
from salttesting.mock import (
NO_MOCK,
NO_MOCK_REASON,
MagicMock,
patch)
from salttesting.helpers import ensure_in_syspath
ensure_in_syspath('../../')
# Import Salt Libs
from salt.states import cloud
import salt.utils.cloud
cloud.__salt__ = {}
cloud.__opts__ = {}
@skipIf(NO_MOCK, NO_MOCK_REASON)
class CloudTestCase(TestCase):
'''
Test cases for salt.states.cloud
'''
# 'present' function tests: 1
def test_present(self):
'''
Test to spin up a single instance on a cloud provider, using salt-cloud.
'''
name = 'mycloud'
cloud_provider = 'my_cloud_provider'
ret = {'name': name,
'result': True,
'changes': {},
'comment': ''}
mock = MagicMock(side_effect=[True, False])
mock_bool = MagicMock(side_effect=[True, False, False])
mock_dict = MagicMock(return_value={'cloud': 'saltcloud'})
with patch.dict(cloud.__salt__, {'cmd.retcode': mock,
'cloud.has_instance': mock_bool,
'cloud.create': mock_dict}):
comt = ('onlyif execution failed')
ret.update({'comment': comt})
self.assertDictEqual(cloud.present(name, cloud_provider,
onlyif=False), ret)
self.assertDictEqual(cloud.present(name, cloud_provider, onlyif=''),
ret)
comt = ('unless execution succeeded')
ret.update({'comment': comt})
self.assertDictEqual(cloud.present(name, cloud_provider,
unless=True), ret)
self.assertDictEqual(cloud.present(name, cloud_provider, unless=''),
ret)
comt = ('Already present instance {0}'.format(name))
ret.update({'comment': comt})
self.assertDictEqual(cloud.present(name, cloud_provider), ret)
with patch.dict(cloud.__opts__, {'test': True}):
comt = ('Instance {0} needs to be created'.format(name))
ret.update({'comment': comt, 'result': None})
self.assertDictEqual(cloud.present(name, cloud_provider), ret)
with patch.dict(cloud.__opts__, {'test': False}):
comt = ('Created instance mycloud using provider '
'my_cloud_provider and the following options: {}')
ret.update({'comment': comt, 'result': True,
'changes': {'cloud': 'saltcloud'}})
self.assertDictEqual(cloud.present(name, cloud_provider), ret)
# 'absent' function tests: 1
def test_absent(self):
'''
Test to ensure that no instances with the specified names exist.
'''
name = 'mycloud'
ret = {'name': name,
'result': True,
'changes': {},
'comment': ''}
mock = MagicMock(side_effect=[True, False])
mock_bool = MagicMock(side_effect=[False, True, True])
mock_dict = MagicMock(return_value={'cloud': 'saltcloud'})
with patch.dict(cloud.__salt__, {'cmd.retcode': mock,
'cloud.has_instance': mock_bool,
'cloud.destroy': mock_dict}):
comt = ('onlyif execution failed')
ret.update({'comment': comt})
self.assertDictEqual(cloud.absent(name, onlyif=False), ret)
self.assertDictEqual(cloud.absent(name, onlyif=''), ret)
comt = ('unless execution succeeded')
ret.update({'comment': comt})
self.assertDictEqual(cloud.absent(name, unless=True), ret)
self.assertDictEqual(cloud.absent(name, unless=''), ret)
comt = ('Already absent instance {0}'.format(name))
ret.update({'comment': comt})
self.assertDictEqual(cloud.absent(name), ret)
with patch.dict(cloud.__opts__, {'test': True}):
comt = ('Instance {0} needs to be destroyed'.format(name))
ret.update({'comment': comt, 'result': None})
self.assertDictEqual(cloud.absent(name), ret)
with patch.dict(cloud.__opts__, {'test': False}):
comt = (('Destroyed instance {0}').format(name))
ret.update({'comment': comt, 'result': True,
'changes': {'cloud': 'saltcloud'}})
self.assertDictEqual(cloud.absent(name), ret)
# 'profile' function tests: 1
def test_profile(self):
'''
Test to create a single instance on a cloud provider,
using a salt-cloud profile.
'''
name = 'mycloud'
profile = 'myprofile'
ret = {'name': name,
'result': True,
'changes': {},
'comment': ''}
mock = MagicMock(side_effect=[True, False])
mock_dict = MagicMock(side_effect=[{'cloud': 'saltcloud'},
{'Not Actioned': True},
{'Not Actioned': True}])
mock_d = MagicMock(return_value={})
with patch.dict(cloud.__salt__, {'cmd.retcode': mock,
'cloud.profile': mock_d,
'cloud.action': mock_dict}):
comt = ('onlyif execution failed')
ret.update({'comment': comt})
self.assertDictEqual(cloud.profile(name, profile, onlyif=False),
ret)
self.assertDictEqual(cloud.profile(name, profile, onlyif=''), ret)
comt = ('unless execution succeeded')
ret.update({'comment': comt})
self.assertDictEqual(cloud.profile(name, profile, unless=True), ret)
self.assertDictEqual(cloud.profile(name, profile, unless=''), ret)
comt = ('Already present instance {0}'.format(name))
ret.update({'comment': comt})
self.assertDictEqual(cloud.profile(name, profile), ret)
with patch.dict(cloud.__opts__, {'test': True}):
comt = ('Instance {0} needs to be created'.format(name))
ret.update({'comment': comt, 'result': None})
self.assertDictEqual(cloud.profile(name, profile), ret)
with patch.dict(cloud.__opts__, {'test': False}):
comt = (('Failed to create instance {0}'
'using profile {1}').format(name, profile))
ret.update({'comment': comt, 'result': False})
self.assertDictEqual(cloud.profile(name, profile), ret)
# 'volume_present' function tests: 1
def test_volume_present(self):
'''
Test to check that a block volume exists.
'''
name = 'mycloud'
ret = {'name': name,
'result': False,
'changes': {},
'comment': ''}
mock = MagicMock(return_value=name)
mock_lst = MagicMock(side_effect=[[name], [], []])
with patch.dict(cloud.__salt__, {'cloud.volume_list': mock_lst,
'cloud.volume_create': mock}):
with patch.object(salt.utils.cloud, 'check_name',
MagicMock(return_value=True)):
comt = ('Invalid characters in name.')
ret.update({'comment': comt})
self.assertDictEqual(cloud.volume_present(name), ret)
comt = ('Volume exists: {0}'.format(name))
ret.update({'comment': comt, 'result': True})
self.assertDictEqual(cloud.volume_present(name), ret)
with patch.dict(cloud.__opts__, {'test': True}):
comt = ('Volume {0} will be created.'.format(name))
ret.update({'comment': comt, 'result': None})
self.assertDictEqual(cloud.volume_present(name), ret)
with patch.dict(cloud.__opts__, {'test': False}):
comt = ('Volume {0} was created'.format(name))
ret.update({'comment': comt, 'result': True,
'changes': {'old': None, 'new': name}})
self.assertDictEqual(cloud.volume_present(name), ret)
# 'volume_absent' function tests: 1
def test_volume_absent(self):
'''
Test to check that a block volume exists.
'''
name = 'mycloud'
ret = {'name': name,
'result': False,
'changes': {},
'comment': ''}
mock = MagicMock(return_value=False)
mock_lst = MagicMock(side_effect=[[], [name], [name]])
with patch.dict(cloud.__salt__, {'cloud.volume_list': mock_lst,
'cloud.volume_delete': mock}):
with patch.object(salt.utils.cloud, 'check_name',
MagicMock(return_value=True)):
comt = ('Invalid characters in name.')
ret.update({'comment': comt})
self.assertDictEqual(cloud.volume_absent(name), ret)
comt = ('Volume is absent.')
ret.update({'comment': comt, 'result': True})
self.assertDictEqual(cloud.volume_absent(name), ret)
with patch.dict(cloud.__opts__, {'test': True}):
comt = ('Volume {0} will be deleted.'.format(name))
ret.update({'comment': comt, 'result': None})
self.assertDictEqual(cloud.volume_absent(name), ret)
with patch.dict(cloud.__opts__, {'test': False}):
comt = ('Volume {0} failed to delete.'.format(name))
ret.update({'comment': comt, 'result': False})
self.assertDictEqual(cloud.volume_absent(name), ret)
# 'volume_attached' function tests: 1
def test_volume_attached(self):
'''
Test to check if a block volume is attached.
'''
name = 'mycloud'
server_name = 'mycloud_server'
disk_name = 'trogdor'
ret = {'name': name,
'result': False,
'changes': {},
'comment': ''}
mock = MagicMock(return_value=False)
mock_dict = MagicMock(side_effect=[{name: {'name': disk_name, 'attachments': True}}, {},
{name: {'name': disk_name, 'attachments': False}},
{name: {'name': disk_name, 'attachments': False}},
{name: {'name': disk_name, 'attachments': False}}])
with patch.dict(cloud.__salt__, {'cloud.volume_list': mock_dict,
'cloud.action': mock}):
with patch.object(salt.utils.cloud, 'check_name',
MagicMock(side_effect=[True, False, True])):
comt = ('Invalid characters in name.')
ret.update({'comment': comt})
self.assertDictEqual(cloud.volume_attached(name, server_name),
ret)
ret.update({'name': server_name})
self.assertDictEqual(cloud.volume_attached(name, server_name),
ret)
comt = ('Volume {0} is already attached: True'.format(disk_name))
ret.update({'comment': comt, 'result': True})
self.assertDictEqual(cloud.volume_attached(name, server_name), ret)
comt = ('Volume {0} does not exist'.format(name))
ret.update({'comment': comt, 'result': False})
self.assertDictEqual(cloud.volume_attached(name, server_name), ret)
comt = ('Server {0} does not exist'.format(server_name))
ret.update({'comment': comt, 'result': False})
self.assertDictEqual(cloud.volume_attached(name, server_name), ret)
mock = MagicMock(return_value=True)
with patch.dict(cloud.__salt__, {'cloud.action': mock,
'cloud.volume_attach': mock}):
with patch.dict(cloud.__opts__, {'test': True}):
comt = ('Volume {0} will be will be attached.'.format(name))
ret.update({'comment': comt, 'result': None})
self.assertDictEqual(cloud.volume_attached(name,
server_name),
ret)
with patch.dict(cloud.__opts__, {'test': False}):
comt = ('Volume {0} was created'.format(name))
ret.update({'comment': comt, 'result': True,
'changes': {'new': True,
'old': {'name': disk_name,
'attachments': False}}})
self.assertDictEqual(cloud.volume_attached(name,
server_name),
ret)
# 'volume_detached' function tests: 1
def test_volume_detached(self):
'''
Test to check if a block volume is detached.
'''
name = 'mycloud'
server_name = 'mycloud_server'
disk_name = 'trogdor'
ret = {'name': name,
'result': False,
'changes': {},
'comment': ''}
mock = MagicMock(return_value=False)
mock_dict = MagicMock(side_effect=[{name: {'name': disk_name, 'attachments': False}}, {},
{name: {'name': disk_name, 'attachments': True}},
{name: {'name': disk_name, 'attachments': True}},
{name: {'name': disk_name, 'attachments': True}}])
with patch.dict(cloud.__salt__, {'cloud.volume_list': mock_dict,
'cloud.action': mock}):
with patch.object(salt.utils.cloud, 'check_name',
MagicMock(side_effect=[True, False, True])):
comt = ('Invalid characters in name.')
ret.update({'comment': comt})
self.assertDictEqual(cloud.volume_detached(name, server_name),
ret)
ret.update({'name': server_name})
self.assertDictEqual(cloud.volume_detached(name, server_name),
ret)
comt = ('Volume {0} is not currently attached to anything.'.format(disk_name))
ret.update({'comment': comt, 'result': True})
self.assertDictEqual(cloud.volume_detached(name, server_name), ret)
comt = ('Volume {0} does not exist'.format(name))
ret.update({'comment': comt})
self.assertDictEqual(cloud.volume_detached(name, server_name), ret)
comt = ('Server {0} does not exist'.format(server_name))
ret.update({'comment': comt})
self.assertDictEqual(cloud.volume_detached(name, server_name), ret)
mock = MagicMock(return_value=True)
with patch.dict(cloud.__salt__, {'cloud.action': mock,
'cloud.volume_detach': mock}):
with patch.dict(cloud.__opts__, {'test': True}):
comt = ('Volume {0} will be will be detached.'.format(name))
ret.update({'comment': comt, 'result': None})
self.assertDictEqual(cloud.volume_detached(name,
server_name),
ret)
with patch.dict(cloud.__opts__, {'test': False}):
comt = ('Volume {0} was created'.format(name))
ret.update({'comment': comt, 'result': True,
'changes': {'new': True,
'old': {'name': disk_name,
'attachments': True}}})
self.assertDictEqual(cloud.volume_detached(name,
server_name),
ret)
if __name__ == '__main__':
from integration import run_tests
run_tests(CloudTestCase, needs_daemon=False)

View file

@ -0,0 +1,331 @@
# -*- coding: utf-8 -*-
'''
:codeauthor: :email:`Jayesh Kariya <jayeshk@saltstack.com>`
'''
# Import Python libs
from __future__ import absolute_import
import os.path
# Import Salt Testing Libs
from salttesting import skipIf, TestCase
from salttesting.mock import (
NO_MOCK,
NO_MOCK_REASON,
MagicMock,
patch)
from salttesting.helpers import ensure_in_syspath
from salt.exceptions import CommandExecutionError
ensure_in_syspath('../../')
# Import Salt Libs
from salt.states import cmd
cmd.__salt__ = {}
cmd.__opts__ = {}
cmd.__grains__ = {}
cmd.__env__ = {}
@skipIf(NO_MOCK, NO_MOCK_REASON)
class CmdTestCase(TestCase):
'''
Test cases for salt.states.cmd
'''
# 'mod_run_check' function tests: 1
def test_mod_run_check(self):
'''
Test to execute the onlyif and unless logic.
'''
cmd_kwargs = {}
group = 'saltgrp'
creates = '/tmp'
ret = {'comment': 'The group saltgrp is not available', 'result': False}
self.assertDictEqual(cmd.mod_run_check(cmd_kwargs, '', '', group,
creates), ret)
mock = MagicMock(return_value=1)
with patch.dict(cmd.__salt__, {'cmd.retcode': mock}):
with patch.dict(cmd.__opts__, {'test': True}):
ret = {'comment': 'onlyif execution failed', 'result': True,
'skip_watch': True}
self.assertDictEqual(cmd.mod_run_check(cmd_kwargs, '', '',
False, creates), ret)
self.assertDictEqual(cmd.mod_run_check(cmd_kwargs, [''], '',
False, creates), ret)
self.assertDictEqual(cmd.mod_run_check(cmd_kwargs, {}, '',
False, creates), ret)
mock = MagicMock(return_value=0)
with patch.dict(cmd.__salt__, {'cmd.retcode': mock}):
ret = {'comment': 'unless execution succeeded', 'result': True,
'skip_watch': True}
self.assertDictEqual(cmd.mod_run_check(cmd_kwargs, None, '', False,
creates), ret)
self.assertDictEqual(cmd.mod_run_check(cmd_kwargs, None, [''],
False, creates), ret)
self.assertDictEqual(cmd.mod_run_check(cmd_kwargs, None, True,
False, creates), ret)
with patch.object(os.path, 'exists',
MagicMock(sid_effect=[True, True, False])):
ret = {'comment': '/tmp exists', 'result': True}
self.assertDictEqual(cmd.mod_run_check(cmd_kwargs, None, None,
False, creates), ret)
ret = {'comment': 'All files in creates exist', 'result': True}
self.assertDictEqual(cmd.mod_run_check(cmd_kwargs, None, None,
False, [creates]), ret)
self.assertTrue(cmd.mod_run_check(cmd_kwargs, None, None, False,
{}))
# 'wait' function tests: 1
def test_wait(self):
'''
Test to run the given command only if the watch statement calls it.
'''
name = 'cmd.script'
ret = {'name': name,
'result': True,
'changes': {},
'comment': ''}
self.assertDictEqual(cmd.wait(name), ret)
# 'wait_script' function tests: 1
def test_wait_script(self):
'''
Test to download a script from a remote source and execute it
only if a watch statement calls it.
'''
name = 'cmd.script'
ret = {'name': name,
'result': True,
'changes': {},
'comment': ''}
self.assertDictEqual(cmd.wait_script(name), ret)
# 'run' function tests: 1
def test_run(self):
'''
Test to run a command if certain circumstances are met.
'''
name = 'cmd.script'
ret = {'name': name,
'result': False,
'changes': {},
'comment': ''}
with patch.object(os.path, 'isdir', MagicMock(return_value=False)):
comt = ('Desired working directory "/tmp/salt" is not available')
ret.update({'comment': comt})
self.assertDictEqual(cmd.run(name, cwd='/tmp/salt'), ret)
comt = ("Invalidly-formatted 'env' parameter. See documentation.")
ret.update({'comment': comt})
self.assertDictEqual(cmd.run(name, env='salt'), ret)
with patch.dict(cmd.__grains__, {'shell': 'shell'}):
with patch.dict(cmd.__opts__, {'test': False}):
mock = MagicMock(side_effect=[CommandExecutionError,
{'retcode': 1}])
with patch.dict(cmd.__salt__, {'cmd.run_all': mock}):
ret.update({'comment': '', 'result': False})
self.assertDictEqual(cmd.run(name), ret)
ret.update({'comment': 'Command "cmd.script" run',
'result': False, 'changes': {'retcode': 1}})
self.assertDictEqual(cmd.run(name), ret)
with patch.dict(cmd.__opts__, {'test': True}):
comt = ('Command "cmd.script" would have been executed')
ret.update({'comment': comt, 'result': None, 'changes': {}})
self.assertDictEqual(cmd.run(name), ret)
mock = MagicMock(return_value=1)
with patch.dict(cmd.__salt__, {'cmd.retcode': mock}):
comt = ('onlyif execution failed')
ret.update({'comment': comt, 'result': True,
'skip_watch': True})
self.assertDictEqual(cmd.run(name, onlyif=''), ret)
# 'script' function tests: 1
def test_script(self):
'''
Test to download a script and execute it with specified arguments.
'''
name = 'cmd.script'
ret = {'name': name,
'result': False,
'changes': {},
'comment': ''}
with patch.object(os.path, 'isdir', MagicMock(return_value=False)):
comt = ('Desired working directory "/tmp/salt" is not available')
ret.update({'comment': comt})
self.assertDictEqual(cmd.script(name, cwd='/tmp/salt'), ret)
comt = ("Invalidly-formatted 'env' parameter. See documentation.")
ret.update({'comment': comt})
self.assertDictEqual(cmd.script(name, env='salt'), ret)
with patch.dict(cmd.__grains__, {'shell': 'shell'}):
with patch.dict(cmd.__opts__, {'test': True}):
comt = ("Command 'cmd.script' would have been executed")
ret.update({'comment': comt, 'result': None, 'changes': {}})
self.assertDictEqual(cmd.script(name), ret)
with patch.dict(cmd.__opts__, {'test': False}):
mock = MagicMock(side_effect=[CommandExecutionError,
{'retcode': 1}])
with patch.dict(cmd.__salt__, {'cmd.script': mock}):
ret.update({'comment': '', 'result': False})
self.assertDictEqual(cmd.script(name), ret)
ret.update({'comment': "Command 'cmd.script' run",
'result': False, 'changes': {'retcode': 1}})
self.assertDictEqual(cmd.script(name), ret)
mock = MagicMock(return_value=1)
with patch.dict(cmd.__salt__, {'cmd.retcode': mock}):
comt = ('onlyif execution failed')
ret.update({'comment': comt, 'result': True,
'skip_watch': True, 'changes': {}})
self.assertDictEqual(cmd.script(name, onlyif=''), ret)
# 'call' function tests: 1
def test_call(self):
'''
Test to invoke a pre-defined Python function with arguments
specified in the state declaration.
'''
name = 'cmd.script'
# func = 'myfunc'
ret = {'name': name,
'result': False,
'changes': {},
'comment': ''}
flag = None
def func():
'''
Mock func method
'''
if flag:
return {}
else:
return []
with patch.dict(cmd.__grains__, {'shell': 'shell'}):
flag = True
self.assertDictEqual(cmd.call(name, func), ret)
flag = False
comt = ('onlyif execution failed')
ret.update({'comment': '', 'result': False,
'changes': {'retval': []}})
self.assertDictEqual(cmd.call(name, func), ret)
mock = MagicMock(return_value=1)
with patch.dict(cmd.__salt__, {'cmd.retcode': mock}):
with patch.dict(cmd.__opts__, {'test': True}):
comt = ('onlyif execution failed')
ret.update({'comment': comt, 'skip_watch': True,
'result': True, 'changes': {}})
self.assertDictEqual(cmd.call(name, func, onlyif=''), ret)
# 'wait_call' function tests: 1
def test_wait_call(self):
'''
Test to run wait_call.
'''
name = 'cmd.script'
func = 'myfunc'
ret = {'name': name,
'result': True,
'changes': {},
'comment': ''}
self.assertDictEqual(cmd.wait_call(name, func), ret)
# 'mod_watch' function tests: 1
def test_mod_watch(self):
'''
Test to execute a cmd function based on a watch call
'''
name = 'cmd.script'
ret = {'name': name,
'result': False,
'changes': {},
'comment': ''}
def func():
'''
Mock func method
'''
return {}
with patch.dict(cmd.__grains__, {'shell': 'shell'}):
with patch.dict(cmd.__opts__, {'test': False}):
mock = MagicMock(return_value={'retcode': 1})
with patch.dict(cmd.__salt__, {'cmd.run_all': mock}):
self.assertDictEqual(cmd.mod_watch(name, sfun='wait',
stateful=True), ret)
comt = ('Command "cmd.script" run')
ret.update({'comment': comt, 'changes': {'retcode': 1}})
self.assertDictEqual(cmd.mod_watch(name, sfun='wait',
stateful=False), ret)
with patch.dict(cmd.__salt__, {'cmd.script': mock}):
ret.update({'comment': '', 'changes': {}})
self.assertDictEqual(cmd.mod_watch(name, sfun='script',
stateful=True), ret)
comt = ("Command 'cmd.script' run")
ret.update({'comment': comt, 'changes': {'retcode': 1}})
self.assertDictEqual(cmd.mod_watch(name, sfun='script',
stateful=False), ret)
with patch.dict(cmd.__salt__, {'cmd.script': mock}):
ret.update({'comment': '', 'changes': {}})
self.assertDictEqual(cmd.mod_watch(name, sfun='call',
func=func), ret)
comt = ('cmd.call needs a named parameter func')
ret.update({'comment': comt})
self.assertDictEqual(cmd.mod_watch(name, sfun='call'), ret)
comt = ('cmd.salt does not work with the watch requisite,'
' please use cmd.wait or cmd.wait_script')
ret.update({'comment': comt})
self.assertDictEqual(cmd.mod_watch(name, sfun='salt'), ret)
if __name__ == '__main__':
from integration import run_tests
run_tests(CmdTestCase, needs_daemon=False)