Merge remote-tracking branch 'upstream/master' into issue_49427

This commit is contained in:
ch3ll 2020-01-06 14:49:29 -05:00
commit 7d3d823a59
No known key found for this signature in database
GPG key ID: 1124C6796EBDBD8D
34 changed files with 2185 additions and 304 deletions

1
.gitignore vendored
View file

@ -14,6 +14,7 @@ MANIFEST
*.DS_Store
.pytest_cache
Pipfile.lock
.mypy_cache/*
# virtualenv
# - ignores directories of a virtualenv when you create it right on

View file

@ -512,6 +512,7 @@ execution modules
xapi_virt
xbpspkg
xfs
xml
xmpp
yumpkg
zabbix

View file

@ -4,3 +4,4 @@ salt.modules.saltcheck
.. automodule:: salt.modules.saltcheck
:members:
:exclude-members: SaltCheck, StateTestLoader

View file

@ -0,0 +1,6 @@
================
salt.modules.xml
================
.. automodule:: salt.modules.xml
:members:

View file

@ -317,6 +317,7 @@ state modules
win_wusa
winrepo
x509
xml
xmpp
zabbix_action
zabbix_host

View file

@ -0,0 +1,6 @@
===============
salt.states.xml
===============
.. automodule:: salt.states.xml
:members:

View file

@ -4,6 +4,171 @@
Salt Release Notes - Codename Neon
==================================
Saltcheck Updates
=================
Available since 2018.3, the :py:func:`saltcheck module <salt.modules.saltcheck>`
has been enhanced to:
* Support saltenv environments
* Associate tests with states by naming convention
* Adds report_highstate_tests function
* Adds empty and notempty assertions
* Adds skip keyword
* Adds print_result keyword
* Adds assertion_section keyword
* Use saltcheck.state_apply to run state.apply for test setup or teardown
* Changes output to display test time
* Works with salt-ssh
Saltcheck provides unittest like functionality requiring only the knowledge of
salt module execution and yaml. Saltcheck uses salt modules to return data, then
runs an assertion against that return. This allows for testing with all the
features included in salt modules.
In order to run state and highstate saltcheck tests, a sub-folder in the state directory
must be created and named ``saltcheck-tests``. Tests for a state should be created in files
ending in ``*.tst`` and placed in the ``saltcheck-tests`` folder. ``tst`` files are run
through the salt rendering system, enabling tests to be written in yaml (or renderer of choice),
and include jinja, as well as the usual grain and pillar information. Like states, multiple tests can
be specified in a ``tst`` file. Multiple ``tst`` files can be created in the ``saltcheck-tests``
folder, and should be named the same as the associated state. The ``id`` of a test works in the
same manner as in salt state files and should be unique and descriptive.
Usage
-----
Example file system layout:
.. code-block:: text
/srv/salt/apache/
init.sls
config.sls
saltcheck-tests/
init.tst
config.tst
deployment_validation.tst
Tests can be run for each state by name, for all ``apache/saltcheck/*.tst`` files,
or for all states assigned to the minion in top.sls. Tests may also be created
with no associated state. These tests will be run through the use of
``saltcheck.run_state_tests``, but will not be automatically run by
``saltcheck.run_highstate_tests``.
.. code-block:: bash
salt '*' saltcheck.run_state_tests apache,apache.config
salt '*' saltcheck.run_state_tests apache check_all=True
salt '*' saltcheck.run_highstate_tests
salt '*' saltcheck.run_state_tests apache.deployment_validation
Example Tests
-------------
.. code-block:: jinja
{# will run the common salt state before further testing #}
setup_test_environment:
module_and_function: saltcheck.state_apply
args:
- common
pillar-data:
data: value
{% for package in ["apache2", "openssh"] %}
{# or another example #}
{# for package in salt['pillar.get']("packages") #}
jinja_test_{{ package }}_latest:
module_and_function: pkg.upgrade_available
args:
- {{ package }}
assertion: assertFalse
{% endfor %}
validate_user_present_and_shell:
module_and_function: user.info
args:
- root
assertion: assertEqual
expected-return: /bin/bash
assertion_section: shell
print_result: False
skip_test:
module_and_function: pkg.upgrade_available
args:
- apache2
assertion: assertFalse
skip: True
Output Format Changes
---------------------
Saltcheck output has been enhanced to display the time taken per test. This results
in a change to the output format.
Previous Output:
.. code-block:: text
local:
|_
----------
ntp:
----------
ntp-client-installed:
Pass
ntp-service-status:
Pass
|_
----------
TEST RESULTS:
----------
Failed:
0
Missing Tests:
0
Passed:
2
New output:
.. code-block:: text
local:
|_
----------
ntp:
----------
ntp-client-installed:
----------
duration:
1.0408
status:
Pass
ntp-service-status:
----------
duration:
1.464
status:
Pass
|_
----------
TEST RESULTS:
----------
Execution Time:
2.5048
Failed:
0
Missing Tests:
0
Passed:
2
Skipped:
0
Unless and onlyif Enhancements
==============================
@ -81,6 +246,41 @@ as well as managing keystore files.
Hn+GmxZA
-----END CERTIFICATE-----
XML Module
==========
A new state and execution module for editing XML files is now included. Currently it allows for
editing values from an xpath query, or editing XML IDs.
.. code-block:: bash
# salt-call xml.set_attribute /tmp/test.xml ".//actor[@id='3']" editedby "Jane Doe"
local:
True
# salt-call xml.get_attribute /tmp/test.xml ".//actor[@id='3']"
local:
----------
editedby:
Jane Doe
id:
3
# salt-call xml.get_value /tmp/test.xml ".//actor[@id='2']"
local:
Liam Neeson
# salt-call xml.set_value /tmp/test.xml ".//actor[@id='2']" "Patrick Stewart"
local:
True
# salt-call xml.get_value /tmp/test.xml ".//actor[@id='2']"
local:
Patrick Stewart
.. code-block:: yaml
ensure_value_true:
xml.value_present:
- name: /tmp/test.xml
- xpath: .//actor[@id='1']
- value: William Shatner
Jinja enhancements
==================
@ -111,7 +311,6 @@ The module can be also used to test ``json`` and ``yaml`` maps:
salt myminion jinja.import_json myformula/defaults.json
json_query filter
-----------------

163
salt/beacons/cert_info.py Normal file
View file

@ -0,0 +1,163 @@
# -*- coding: utf-8 -*-
'''
Beacon to monitor certificate expiration dates from files on the filesystem.
.. versionadded:: Sodium
:maintainer: <devops@eitr.tech>
:maturity: new
:depends: OpenSSL
'''
# Import Python libs
from __future__ import absolute_import, unicode_literals
from datetime import datetime
import logging
# Import salt libs
# pylint: disable=import-error,no-name-in-module,redefined-builtin,3rd-party-module-not-gated
from salt.ext.six.moves import map as _map
from salt.ext.six.moves import range as _range
# pylint: enable=import-error,no-name-in-module,redefined-builtin,3rd-party-module-not-gated
import salt.utils.files
# Import Third Party Libs
try:
from OpenSSL import crypto
HAS_OPENSSL = True
except ImportError:
HAS_OPENSSL = False
log = logging.getLogger(__name__)
DEFAULT_NOTIFY_DAYS = 45
__virtualname__ = 'cert_info'
def __virtual__():
if HAS_OPENSSL is False:
return False
return __virtualname__
def validate(config):
'''
Validate the beacon configuration
'''
_config = {}
list(_map(_config.update, config))
# Configuration for cert_info beacon should be a list of dicts
if not isinstance(config, list):
return False, ('Configuration for cert_info beacon must be a list.')
if 'files' not in _config:
return False, ('Configuration for cert_info beacon '
'must contain files option.')
return True, 'Valid beacon configuration'
def beacon(config):
'''
Monitor the certificate files on the minion.
Specify a notification threshold in days and only emit a beacon if any certificates are
expiring within that timeframe or if `notify_days` equals `-1` (always report information).
The default notification threshold is 45 days and can be overridden at the beacon level and
at an individual certificate level.
.. code-block:: yaml
beacons:
cert_info:
- files:
- /etc/pki/tls/certs/mycert.pem
- /etc/pki/tls/certs/yourcert.pem:
notify_days: 15
- /etc/pki/tls/certs/ourcert.pem
- notify_days: 45
- interval: 86400
'''
ret = []
certificates = []
CryptoError = crypto.Error # pylint: disable=invalid-name
_config = {}
list(_map(_config.update, config))
global_notify_days = _config.get('notify_days', DEFAULT_NOTIFY_DAYS)
for cert_path in _config.get('files', []):
notify_days = global_notify_days
if isinstance(cert_path, dict):
try:
notify_days = cert_path[cert_path.keys()[0]].get('notify_days', global_notify_days)
cert_path = cert_path.keys()[0]
except IndexError as exc:
log.error('Unable to load certificate %s (%s)', cert_path, exc)
continue
try:
with salt.utils.files.fopen(cert_path) as fp_:
cert = crypto.load_certificate(crypto.FILETYPE_PEM, fp_.read())
except (IOError, CryptoError) as exc:
log.error('Unable to load certificate %s (%s)', cert_path, exc)
continue
cert_date = datetime.strptime(cert.get_notAfter().decode(encoding='UTF-8'), "%Y%m%d%H%M%SZ")
date_diff = (cert_date - datetime.today()).days
log.debug('Certificate %s expires in %s days.', cert_path, date_diff)
if notify_days < 0 or date_diff <= notify_days:
log.debug('Certificate %s triggered beacon due to %s day notification threshold.', cert_path, notify_days)
extensions = []
for ext in _range(0, cert.get_extension_count()):
extensions.append(
{
'ext_name': cert.get_extension(ext).get_short_name().decode(encoding='UTF-8'),
'ext_data': str(cert.get_extension(ext))
}
)
certificates.append(
{
'cert_path': cert_path,
'issuer': ','.join(
['{0}="{1}"'.format(
t[0].decode(encoding='UTF-8'),
t[1].decode(encoding='UTF-8')
) for t in cert.get_issuer().get_components()]),
'issuer_dict': {
k.decode('UTF-8'): v.decode('UTF-8') for k, v in cert.get_issuer().get_components()
},
'notAfter_raw': cert.get_notAfter().decode(encoding='UTF-8'),
'notAfter': cert_date.strftime("%Y-%m-%d %H:%M:%SZ"),
'notBefore_raw': cert.get_notBefore().decode(encoding='UTF-8'),
'notBefore': datetime.strptime(
cert.get_notBefore().decode(encoding='UTF-8'), "%Y%m%d%H%M%SZ"
).strftime("%Y-%m-%d %H:%M:%SZ"),
'serial_number': cert.get_serial_number(),
'signature_algorithm': cert.get_signature_algorithm().decode(encoding='UTF-8'),
'subject': ','.join(
['{0}="{1}"'.format(
t[0].decode(encoding='UTF-8'),
t[1].decode(encoding='UTF-8')
) for t in cert.get_subject().get_components()]),
'subject_dict': {
k.decode('UTF-8'): v.decode('UTF-8') for k, v in cert.get_subject().get_components()
},
'version': cert.get_version(),
'extensions': extensions,
'has_expired': cert.has_expired()
}
)
if certificates:
ret.append({'certificates': certificates})
return ret

View file

@ -0,0 +1,145 @@
# -*- coding: utf-8 -*-
'''
Wrap the saltcheck module to copy files to ssh minion before running tests
'''
# Import Python libs
from __future__ import absolute_import, print_function
import logging
import tempfile
import os
import shutil
import tarfile
from contextlib import closing
# Import salt libs
import salt.utils.url
import salt.utils.files
import salt.utils.json
log = logging.getLogger(__name__)
def update_master_cache(states, saltenv='base'):
'''
Replace standard saltcheck version with similar logic but replacing cp.cache_dir with
generating files, tar'ing them up, copying tarball to remote host, extracting tar
to state cache directory, and cleanup of files
'''
cache = __opts__['cachedir']
state_cache = os.path.join(cache, 'files', saltenv)
# Setup for copying states to gendir
gendir = tempfile.mkdtemp()
trans_tar = salt.utils.files.mkstemp()
if 'cp.fileclient_{0}'.format(id(__opts__)) not in __context__:
__context__['cp.fileclient_{0}'.format(id(__opts__))] = \
salt.fileclient.get_file_client(__opts__)
# generate cp.list_states output and save to gendir
cp_output = salt.utils.json.dumps(__salt__['cp.list_states']())
cp_output_file = os.path.join(gendir, 'cp_output.txt')
with salt.utils.files.fopen(cp_output_file, 'w') as fp:
fp.write(cp_output)
# cp state directories to gendir
already_processed = []
sls_list = salt.utils.args.split_input(states)
for state_name in sls_list:
# generate low data for each state and save to gendir
state_low_file = os.path.join(gendir, state_name + '.low')
state_low_output = salt.utils.json.dumps(__salt__['state.show_low_sls'](state_name))
with salt.utils.files.fopen(state_low_file, 'w') as fp:
fp.write(state_low_output)
state_name = state_name.replace(".", os.sep)
if state_name in already_processed:
log.debug("Already cached state for %s", state_name)
else:
file_copy_file = os.path.join(gendir, state_name + '.copy')
log.debug('copying %s to %s', state_name, gendir)
qualified_name = salt.utils.url.create(state_name, saltenv)
# Duplicate cp.get_dir to gendir
copy_result = __context__['cp.fileclient_{0}'.format(id(__opts__))].get_dir(
qualified_name, gendir, saltenv)
if copy_result:
copy_result = [dir.replace(gendir, state_cache) for dir in copy_result]
copy_result_output = salt.utils.json.dumps(copy_result)
with salt.utils.files.fopen(file_copy_file, 'w') as fp:
fp.write(copy_result_output)
already_processed.append(state_name)
else:
# If files were not copied, assume state.file.sls was given and just copy state
state_name = os.path.dirname(state_name)
file_copy_file = os.path.join(gendir, state_name + '.copy')
if state_name in already_processed:
log.debug('Already cached state for %s', state_name)
else:
qualified_name = salt.utils.url.create(state_name, saltenv)
copy_result = __context__['cp.fileclient_{0}'.format(id(__opts__))].get_dir(
qualified_name, gendir, saltenv)
if copy_result:
copy_result = [dir.replace(gendir, state_cache) for dir in copy_result]
copy_result_output = salt.utils.json.dumps(copy_result)
with salt.utils.files.fopen(file_copy_file, 'w') as fp:
fp.write(copy_result_output)
already_processed.append(state_name)
# turn gendir into tarball and remove gendir
try:
# cwd may not exist if it was removed but salt was run from it
cwd = os.getcwd()
except OSError:
cwd = None
os.chdir(gendir)
with closing(tarfile.open(trans_tar, 'w:gz')) as tfp:
for root, dirs, files in salt.utils.path.os_walk(gendir):
for name in files:
full = os.path.join(root, name)
tfp.add(full[len(gendir):].lstrip(os.sep))
if cwd:
os.chdir(cwd)
shutil.rmtree(gendir)
# Copy tarfile to ssh host
single = salt.client.ssh.Single(
__opts__,
'',
**__salt__.kwargs)
thin_dir = __opts__['thin_dir']
ret = single.shell.send(trans_tar, thin_dir)
# Clean up local tar
try:
os.remove(trans_tar)
except (OSError, IOError):
pass
tar_path = os.path.join(thin_dir, os.path.basename(trans_tar))
# Extract remote tarball to cache directory and remove tar file
# TODO this could be better handled by a single state/connection due to ssh overhead
ret = __salt__['file.mkdir'](state_cache)
ret = __salt__['archive.tar']('xf', tar_path, dest=state_cache)
ret = __salt__['file.remove'](tar_path)
return ret
def run_state_tests(states, saltenv='base', check_all=False):
'''
Define common functions to activite this wrapping module and tar copy.
After file copies are finished, run the usual local saltcheck function
'''
ret = update_master_cache(states, saltenv)
ret = __salt__['saltcheck.run_state_tests_ssh'](states, saltenv=saltenv, check_all=check_all)
return ret
def run_highstate_tests(saltenv='base'):
'''
Lookup top files for minion, pass results to wrapped run_state_tests for copy and run
'''
top_states = __salt__['state.show_top']().get(saltenv)
state_string = ','.join(top_states)
ret = run_state_tests(state_string, saltenv)
return ret

View file

@ -55,7 +55,7 @@ def _get_serializer(output):
try:
return getattr(serializers, output)
except AttributeError:
raise CommandExecutionError("Unknown serializer '%s' found for output option", output)
raise CommandExecutionError('Unknown serializer `{}` found for output option'.format(output))
def start(cmd, output='json', interval=1):

View file

@ -61,6 +61,11 @@ def mount(location, access='rw', root=None):
location.lstrip(os.sep).replace('/', '.') + rand
)
log.debug('Establishing new root as %s', root)
if not os.path.isdir(root):
try:
os.makedirs(root)
except OSError:
log.info('Path already existing: %s', root)
else:
break
cmd = 'guestmount -i -a {0} --{1} {2}'.format(location, access, root)

File diff suppressed because it is too large Load diff

View file

@ -131,9 +131,17 @@ def create(path,
CLI Example:
.. code-block:: bash
.. code-block:: console
salt '*' virtualenv.create /path/to/new/virtualenv
Example of using --always-copy environment variable (in case your fs doesn't support symlinks).
This will copy files into the virtualenv instead of symlinking them.
.. code-block:: yaml
- env:
- VIRTUALENV_ALWAYS_COPY: 1
'''
if venv_bin is None:
venv_bin = __opts__.get('venv_bin') or __pillar__.get('venv_bin')

View file

@ -11,7 +11,6 @@ from __future__ import absolute_import, print_function, unicode_literals
# Import Python libs
import logging
import os
import re
# Import Salt libs
import salt.utils.args
@ -50,7 +49,7 @@ def _normalize_dir(string_):
'''
Normalize the directory to make comparison possible
'''
return re.sub(r'\\$', '', salt.utils.stringutils.to_unicode(string_))
return os.path.normpath(salt.utils.stringutils.to_unicode(string_))
def rehash():
@ -200,7 +199,7 @@ def add(path, index=None, **kwargs):
elif index <= -num_dirs:
# Negative index is too large, shift index to beginning of list
index = pos = 0
elif index <= 0:
elif index < 0:
# Negative indexes (other than -1 which is handled above) must
# be inserted at index + 1 for the item to end up in the
# position you want, since list.insert() inserts before the

103
salt/modules/xml.py Normal file
View file

@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
'''
XML file manager
.. versionadded:: Neon
'''
from __future__ import absolute_import, print_function, unicode_literals
import logging
import xml.etree.ElementTree as ET
log = logging.getLogger(__name__)
# Define the module's virtual name
__virtualname__ = 'xml'
def __virtual__():
'''
Only load the module if all modules are imported correctly.
'''
return __virtualname__
def get_value(file, element):
'''
Returns the value of the matched xpath element
CLI Example:
.. code-block:: bash
salt '*' xml.get_value /tmp/test.xml ".//element"
'''
try:
root = ET.parse(file)
element = root.find(element)
return element.text
except AttributeError:
log.error("Unable to find element matching %s", element)
return False
def set_value(file, element, value):
'''
Sets the value of the matched xpath element
CLI Example:
.. code-block:: bash
salt '*' xml.set_value /tmp/test.xml ".//element" "new value"
'''
try:
root = ET.parse(file)
relement = root.find(element)
except AttributeError:
log.error("Unable to find element matching %s", element)
return False
relement.text = str(value)
root.write(file)
return True
def get_attribute(file, element):
'''
Return the attributes of the matched xpath element.
CLI Example:
.. code-block:: bash
salt '*' xml.get_attribute /tmp/test.xml ".//element[@id='3']"
'''
try:
root = ET.parse(file)
element = root.find(element)
return element.attrib
except AttributeError:
log.error("Unable to find element matching %s", element)
return False
def set_attribute(file, element, key, value):
'''
Set the requested attribute key and value for matched xpath element.
CLI Example:
.. code-block:: bash
salt '*' xml.set_attribute /tmp/test.xml ".//element[@id='3']" editedby "gal"
'''
try:
root = ET.parse(file)
element = root.find(element)
except AttributeError:
log.error("Unable to find element matching %s", element)
return False
element.set(key, str(value))
root.write(file)
return True

View file

@ -401,9 +401,9 @@ def running(name,
type: network
source: admin
- graphics:
- type: spice
type: spice
listen:
- type: address
type: address
address: 192.168.0.125
'''

View file

@ -4,11 +4,12 @@ Manage the Windows System PATH
'''
from __future__ import absolute_import, print_function, unicode_literals
# Import Salt libs
import salt.utils.stringutils
# Import Python libs
import os
# Import 3rd-party libs
# Import Salt libs
from salt.ext import six
import salt.utils.stringutils
def __virtual__():
@ -89,7 +90,7 @@ def exists(name, index=None):
- index: -1
'''
try:
name = salt.utils.stringutils.to_unicode(name)
name = os.path.normpath(salt.utils.stringutils.to_unicode(name))
except TypeError:
name = six.text_type(name)
@ -221,7 +222,7 @@ def exists(name, index=None):
'{0} {1} to the PATH{2}.'.format(
'Added' if ret['result'] else 'Failed to add',
name,
' at index {0}'.format(index) if index else ''
' at index {0}'.format(index) if index is not None else ''
)
)

76
salt/states/xml.py Normal file
View file

@ -0,0 +1,76 @@
# -*- coding: utf-8 -*-
'''
XML Manager
===========
State managment of XML files
'''
from __future__ import absolute_import, print_function, unicode_literals
# Import Python libs
import logging
log = logging.getLogger(__name__)
def __virtual__():
'''
Only load if the XML execution module is available.
'''
if 'xml.get_value' in __salt__:
return 'xml'
else:
return False, "The xml execution module is not available"
def value_present(name, xpath, value, **kwargs):
'''
.. versionadded:: Neon
Manages a given XML file
name : string
The location of the XML file to manage, as an absolute path.
xpath : string
xpath location to manage
value : string
value to ensure present
.. code-block:: yaml
ensure_value_true:
xml.value_present:
- name: /tmp/test.xml
- xpath: .//playwright[@id='1']
- value: William Shakespeare
'''
ret = {'name': name,
'changes': {},
'result': True,
'comment': ''}
if 'test' not in kwargs:
kwargs['test'] = __opts__.get('test', False)
current_value = __salt__['xml.get_value'](name, xpath)
if not current_value:
ret['result'] = False
ret['comment'] = 'xpath query {0} not found in {1}'.format(xpath, name)
return ret
if current_value != value:
if kwargs['test']:
ret['result'] = None
ret['comment'] = '{0} will be updated'.format(name)
ret['changes'] = {name: {'old': current_value, 'new': value}}
else:
results = __salt__['xml.set_value'](name, xpath, value)
ret['result'] = results
ret['comment'] = '{0} updated'.format(name)
ret['changes'] = {name: {'old': current_value, 'new': value}}
else:
ret['comment'] = '{0} is already present'.format(value)
return ret

View file

@ -0,0 +1,3 @@
saltcheck-test-pass:
test.succeed_without_changes:
- name: testing-saltcheck

View file

@ -0,0 +1,7 @@
check_all_validate:
module_and_function: test.echo
args:
- "check"
kwargs:
assertion: assertEqual
expected_return: 'check'

View file

@ -0,0 +1,7 @@
echo_test_hello:
module_and_function: test.echo
args:
- "hello"
kwargs:
assertion: assertEqual
expected_return: 'hello'

View file

@ -0,0 +1,3 @@
saltcheck-prod-test-pass:
test.succeed_without_changes:
- name: testing-saltcheck-prodenv

View file

@ -0,0 +1,7 @@
check_all_validate_prod:
module_and_function: test.echo
args:
- "check-prod"
kwargs:
assertion: assertEqual
expected_return: 'check-prod'

View file

@ -0,0 +1,7 @@
echo_test_prod_env:
module_and_function: test.echo
args:
- "test-prod"
kwargs:
assertion: assertEqual
expected_return: 'test-prod'

View file

@ -0,0 +1,65 @@
# -*- coding: utf-8 -*-
'''
Test the saltcheck module
'''
# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
# Import Salt Testing libs
from tests.support.case import ModuleCase
class SaltcheckModuleTest(ModuleCase):
'''
Test the saltcheck module
'''
def test_saltcheck_run(self):
'''
saltcheck.run_test
'''
saltcheck_test = {"module_and_function": "test.echo",
"assertion": "assertEqual",
"expected_return": "This works!",
"args": ["This works!"]}
ret = self.run_function('saltcheck.run_test', test=saltcheck_test)
self.assertDictContainsSubset({'status': 'Pass'}, ret)
def test_saltcheck_state(self):
'''
saltcheck.run_state_tests
'''
saltcheck_test = 'validate-saltcheck'
ret = self.run_function('saltcheck.run_state_tests', [saltcheck_test])
self.assertDictContainsSubset({'status': 'Pass'}, ret[0]['validate-saltcheck']['echo_test_hello'])
def test_topfile_validation(self):
'''
saltcheck.run_highstate_tests
'''
expected_top_states = self.run_function('state.show_top').get('base', [])
expected_top_states.append('TEST RESULTS')
ret = self.run_function('saltcheck.run_highstate_tests')
for top_state_dict in ret:
self.assertIn(list(top_state_dict)[0], expected_top_states)
def test_saltcheck_checkall(self):
'''
Validate saltcheck.run_state_tests check_all for the default saltenv of base.
validate-saltcheck state hosts a saltcheck-tests directory with 2 .tst files. By running
check_all=True, both files should be found and show passed results.
'''
saltcheck_test = 'validate-saltcheck'
ret = self.run_function('saltcheck.run_state_tests', [saltcheck_test], check_all=True)
self.assertDictContainsSubset({'status': 'Pass'}, ret[0]['validate-saltcheck']['echo_test_hello'])
self.assertDictContainsSubset({'status': 'Pass'}, ret[0]['validate-saltcheck']['check_all_validate'])
def test_saltcheck_checkall_saltenv(self):
'''
Validate saltcheck.run_state_tests check_all for the prod saltenv
validate-saltcheck state hosts a saltcheck-tests directory with 2 .tst files. By running
check_all=True, both files should be found and show passed results.
'''
saltcheck_test = 'validate-saltcheck'
ret = self.run_function('saltcheck.run_state_tests', [saltcheck_test], saltenv='prod', check_all=True)
self.assertDictContainsSubset({'status': 'Pass'}, ret[0]['validate-saltcheck']['echo_test_prod_env'])
self.assertDictContainsSubset({'status': 'Pass'}, ret[0]['validate-saltcheck']['check_all_validate_prod'])

View file

@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
# Import Salt Testing Libs
from tests.support.case import SSHCase
from tests.support.unit import skipIf
# Import Salt Libs
import salt.utils.platform
@skipIf(salt.utils.platform.is_windows(), 'salt-ssh not available on Windows')
class SSHSaltcheckTest(SSHCase):
'''
testing saltcheck with salt-ssh
'''
def test_saltcheck_run_test(self):
'''
test saltcheck.run_test with salt-ssh
'''
saltcheck_test = {"module_and_function": "test.echo",
"assertion": "assertEqual",
"expected-return": "Test Works",
"args": ["Test Works"]}
ret = self.run_function('saltcheck.run_test', test=saltcheck_test)
self.assertDictContainsSubset({'status': 'Pass'}, ret)
def test_saltcheck_state(self):
'''
saltcheck.run_state_tests
'''
saltcheck_test = 'validate-saltcheck'
ret = self.run_function('saltcheck.run_state_tests', [saltcheck_test])
self.assertDictContainsSubset({'status': 'Pass'}, ret[0]['validate-saltcheck']['echo_test_hello'])

View file

@ -72,7 +72,7 @@ class ShellTestCase(TestCase, AdaptedConfigurationTestCaseMixin, ScriptPathMixin
return self.run_script('salt', arg_str, with_retcode=with_retcode, catch_stderr=catch_stderr, timeout=timeout)
def run_ssh(self, arg_str, with_retcode=False, timeout=25,
catch_stderr=False, wipe=False, raw=False):
catch_stderr=False, wipe=False, raw=False, **kwargs):
'''
Execute salt-ssh
'''
@ -84,7 +84,12 @@ class ShellTestCase(TestCase, AdaptedConfigurationTestCaseMixin, ScriptPathMixin
os.path.join(RUNTIME_VARS.TMP_CONF_DIR, 'roster'),
arg_str
)
return self.run_script('salt-ssh', arg_str, with_retcode=with_retcode, catch_stderr=catch_stderr, raw=True, timeout=timeout)
return self.run_script('salt-ssh',
arg_str, with_retcode=with_retcode,
catch_stderr=catch_stderr,
raw=True,
timeout=timeout,
**kwargs)
def run_run(self,
arg_str,
@ -197,7 +202,8 @@ class ShellTestCase(TestCase, AdaptedConfigurationTestCaseMixin, ScriptPathMixin
timeout=15,
raw=False,
popen_kwargs=None,
log_output=None):
log_output=None,
**kwargs):
'''
Execute a script with the given argument string
@ -237,6 +243,11 @@ class ShellTestCase(TestCase, AdaptedConfigurationTestCaseMixin, ScriptPathMixin
cmd += 'python{0}.{1} '.format(*sys.version_info)
cmd += '{0} '.format(script_path)
cmd += '{0} '.format(arg_str)
if kwargs:
# late import
import salt.utils.json
for key, value in kwargs.items():
cmd += "'{0}={1} '".format(key, salt.utils.json.dumps(value))
tmp_file = tempfile.SpooledTemporaryFile()
@ -453,7 +464,7 @@ class ShellCase(ShellTestCase, AdaptedConfigurationTestCaseMixin, ScriptPathMixi
return ret
def run_ssh(self, arg_str, with_retcode=False, catch_stderr=False, # pylint: disable=W0221
timeout=RUN_TIMEOUT, wipe=True, raw=False):
timeout=RUN_TIMEOUT, wipe=True, raw=False, **kwargs):
'''
Execute salt-ssh
'''
@ -469,8 +480,9 @@ class ShellCase(ShellTestCase, AdaptedConfigurationTestCaseMixin, ScriptPathMixi
with_retcode=with_retcode,
catch_stderr=catch_stderr,
timeout=timeout,
raw=True)
log.debug('Result of run_ssh for command \'%s\': %s', arg_str, ret)
raw=True,
**kwargs)
log.debug('Result of run_ssh for command \'%s %s\': %s', arg_str, kwargs, ret)
return ret
def run_run(self, arg_str, with_retcode=False, catch_stderr=False,
@ -945,8 +957,8 @@ class SSHCase(ShellCase):
We use a 180s timeout here, which some slower systems do end up needing
'''
ret = self.run_ssh(self._arg_str(function, arg), timeout=timeout,
wipe=wipe, raw=raw)
log.debug('SSHCase run_function executed %s with arg %s', function, arg)
wipe=wipe, raw=raw, **kwargs)
log.debug('SSHCase run_function executed %s with arg %s and kwargs %s', function, arg, kwargs)
log.debug('SSHCase JSON return: %s', ret)
# Late import

View file

@ -0,0 +1,107 @@
# coding: utf-8
# Python libs
from __future__ import absolute_import
import logging
# Salt testing libs
from tests.support.unit import TestCase
from tests.support.mock import patch, mock_open
from tests.support.mixins import LoaderModuleMockMixin
# Salt libs
import salt.beacons.cert_info as cert_info
log = logging.getLogger(__name__)
_TEST_CERT = '''
-----BEGIN CERTIFICATE-----
MIIC/jCCAeagAwIBAgIJAIQMfu6ShHvfMA0GCSqGSIb3DQEBCwUAMCQxIjAgBgNV
BAMMGXNhbHR0ZXN0LTAxLmV4YW1wbGUubG9jYWwwHhcNMTkwNjAzMjA1OTIyWhcN
MjkwNTMxMjA1OTIyWjAkMSIwIAYDVQQDDBlzYWx0dGVzdC0wMS5leGFtcGxlLmxv
Y2FsMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAv5UxxKGsOO8n2hUk
KjL8r2Rjt0om4wwdXUu0R1fQUlaSO0g+vk0wHHaovoVcEU6uZlhDPw1qZ4C+cp9Z
rDzSfwI2Njg813I5jzTBgox+3pJ+82vgXZ14xpqZ+f0ACMo4uRPjBkyQpHqYiDJ3
VockZSxm5s7RT05xDnedDfPgu1WAvzQovWO6slCs+Hlp8sh6QAy/hIwOZ0hT8y3J
NV6PSPqK7BEypOPak36+ogtiuPxxat4da74SUVS8Ffupnr40BjqVqEXBvfIIHiQt
3r5gpjoBjrWX2ccgQlHQP8gFaToFxWLSSYVT6E8Oj5UEywpmvPDRjJsJ5epscblT
oFyVXQIDAQABozMwMTAJBgNVHRMEAjAAMCQGA1UdEQQdMBuCGXNhbHR0ZXN0LTAx
LmV4YW1wbGUubG9jYWwwDQYJKoZIhvcNAQELBQADggEBABPqQlkaZDV5dPwNO/s2
PBT/19LroOwQ+fBJgZpbGha5/ZaSr+jcYZf2jAicPajWGlY/rXAdBSuxpmUYCC12
23tI4stwGyB8Quuoyg2Z+5LQJSDA1LxNJ1kxQfDUnS3tVQa0wJVtq8W9wNryNONL
noaQaDcdbGx3V15W+Bx0as5NfIWqz1uVi4MGGxI6hMBuDD7E7M+k1db8EaS+tI4u
seZBENjwjJA6zZmTXvYyzV5OBP4JyOhYuG9aqr7e6/yjPBEtZv0TJ9KMMbcywvE9
9FF+l4Y+wgKR/icrpDEpPlC4wYn64sy5vk7EGVagnVyhkjLJ52rn4trzyPox8FmO
2Zw=
-----END CERTIFICATE-----
'''
class CertInfoBeaconTestCase(TestCase, LoaderModuleMockMixin):
'''
Test case for salt.beacons.cert_info
'''
def setup_loader_modules(self):
return {
cert_info: {
'__context__': {},
'__salt__': {},
}
}
def test_non_list_config(self):
config = {}
ret = cert_info.validate(config)
self.assertEqual(ret, (False, 'Configuration for cert_info beacon must'
' be a list.'))
def test_empty_config(self):
config = [{}]
ret = cert_info.validate(config)
self.assertEqual(ret, (False, 'Configuration for cert_info beacon '
'must contain files option.'))
def test_cert_information(self):
with patch('salt.utils.files.fopen',
mock_open(read_data=_TEST_CERT)):
config = [{'files': ['/etc/pki/tls/certs/mycert.pem'],
'notify_days': -1
}]
ret = cert_info.validate(config)
self.assertEqual(ret, (True, 'Valid beacon configuration'))
_expected_return = [
{
'certificates': [
{
'cert_path': '/etc/pki/tls/certs/mycert.pem',
'extensions': [{'ext_data': 'CA:FALSE',
'ext_name': 'basicConstraints'},
{'ext_data': 'DNS:salttest-01.example.local',
'ext_name': 'subjectAltName'}],
'has_expired': False,
'issuer': 'CN="salttest-01.example.local"',
'issuer_dict': {'CN': 'salttest-01.example.local'},
'notAfter': '2029-05-31 20:59:22Z',
'notAfter_raw': '20290531205922Z',
'notBefore': '2019-06-03 20:59:22Z',
'notBefore_raw': '20190603205922Z',
'serial_number': 9515119675852487647,
'signature_algorithm': 'sha256WithRSAEncryption',
'subject': 'CN="salttest-01.example.local"',
'subject_dict': {'CN': 'salttest-01.example.local'},
'version': 2
}
]
}
]
ret = cert_info.beacon(config)
self.assertEqual(ret, _expected_return)

View file

@ -28,10 +28,40 @@ class GuestfsTestCase(TestCase, LoaderModuleMockMixin):
# 'mount' function tests: 1
def test_mount(self):
'''
Test if it mount an image
Test if it mounts an image
'''
with patch('os.path.join', MagicMock(return_value=True)), \
patch('os.path.isdir', MagicMock(return_value=True)), \
# Test case with non-existing mount folder
run_mock = MagicMock(return_value='')
with patch('os.path.join', MagicMock(return_value='/tmp/guest/fedora.qcow')), \
patch('os.path.isdir', MagicMock(return_value=False)), \
patch('os.makedirs', MagicMock()) as makedirs_mock, \
patch('os.listdir', MagicMock(return_value=False)), \
patch.dict(guestfs.__salt__, {'cmd.run': MagicMock(return_value='')}):
patch.dict(guestfs.__salt__, {'cmd.run': run_mock}):
self.assertTrue(guestfs.mount('/srv/images/fedora.qcow'))
run_mock.assert_called_once_with('guestmount -i -a /srv/images/fedora.qcow --rw /tmp/guest/fedora.qcow',
python_shell=False)
makedirs_mock.assert_called_once()
# Test case with existing but empty mount folder
run_mock.reset_mock()
with patch('os.path.join', MagicMock(return_value='/tmp/guest/fedora.qcow')), \
patch('os.path.isdir', MagicMock(return_value=True)), \
patch('os.makedirs', MagicMock()) as makedirs_mock, \
patch('os.listdir', MagicMock(return_value=False)), \
patch.dict(guestfs.__salt__, {'cmd.run': run_mock}):
self.assertTrue(guestfs.mount('/srv/images/fedora.qcow'))
run_mock.assert_called_once_with('guestmount -i -a /srv/images/fedora.qcow --rw /tmp/guest/fedora.qcow',
python_shell=False)
makedirs_mock.assert_not_called()
# Test case with existing but not empty mount folder
run_mock.reset_mock()
with patch('os.path.join', MagicMock(side_effect=['/tmp/guest/fedora.qcow', '/tmp/guest/fedora.qcowabc'])), \
patch('os.path.isdir', MagicMock(side_effect=[True, False])), \
patch('os.makedirs', MagicMock()) as makedirs_mock, \
patch('os.listdir', MagicMock(side_effect=[True, False])), \
patch.dict(guestfs.__salt__, {'cmd.run': run_mock}):
self.assertTrue(guestfs.mount('/srv/images/fedora.qcow'))
run_mock.assert_called_once_with('guestmount -i -a /srv/images/fedora.qcow --rw /tmp/guest/fedora.qcowabc',
python_shell=False)
makedirs_mock.assert_called_once()

View file

@ -15,7 +15,7 @@ from tests.support.unit import TestCase
from tests.support.mock import MagicMock, patch
class LinuxSysctlTestCase(TestCase, LoaderModuleMockMixin):
class SaltcheckTestCase(TestCase, LoaderModuleMockMixin):
'''
TestCase for salt.modules.saltcheck module
'''
@ -25,11 +25,12 @@ class LinuxSysctlTestCase(TestCase, LoaderModuleMockMixin):
local_opts = salt.config.minion_config(
os.path.join(syspaths.CONFIG_DIR, 'minion'))
local_opts['file_client'] = 'local'
local_opts['conf_file'] = '/etc/salt/minion'
patcher = patch('salt.config.minion_config',
MagicMock(return_value=local_opts))
patcher.start()
self.addCleanup(patcher.stop)
return {saltcheck: {}}
return {saltcheck: {'__opts__': local_opts}}
def test_call_salt_command(self):
'''test simple test.echo module'''
@ -38,13 +39,9 @@ class LinuxSysctlTestCase(TestCase, LoaderModuleMockMixin):
'cp.cache_master': MagicMock(return_value=[True])
}):
sc_instance = saltcheck.SaltCheck()
returned = sc_instance.call_salt_command(fun="test.echo", args=['hello'], kwargs=None)
returned = sc_instance._call_salt_command(fun="test.echo", args=['hello'], kwargs=None)
self.assertEqual(returned, 'hello')
def test_update_master_cache(self):
'''test master cache'''
self.assertTrue(saltcheck.update_master_cache)
def test_call_salt_command2(self):
'''test simple test.echo module again'''
with patch.dict(saltcheck.__salt__, {'config.get': MagicMock(return_value=True),
@ -52,7 +49,7 @@ class LinuxSysctlTestCase(TestCase, LoaderModuleMockMixin):
'cp.cache_master': MagicMock(return_value=[True])
}):
sc_instance = saltcheck.SaltCheck()
returned = sc_instance.call_salt_command(fun="test.echo", args=['hello'], kwargs=None)
returned = sc_instance._call_salt_command(fun="test.echo", args=['hello'], kwargs=None)
self.assertNotEqual(returned, 'not-hello')
def test__assert_equal1(self):
@ -309,6 +306,42 @@ class LinuxSysctlTestCase(TestCase, LoaderModuleMockMixin):
mybool = sc_instance._SaltCheck__assert_less_equal(aaa, bbb)
self.assertEqual(mybool, 'Pass')
def test__assert_empty(self):
'''test'''
with patch.dict(saltcheck.__salt__, {'config.get': MagicMock(return_value=True),
'cp.cache_master': MagicMock(return_value=[True])
}):
sc_instance = saltcheck.SaltCheck()
mybool = sc_instance._SaltCheck__assert_empty("")
self.assertEqual(mybool, 'Pass')
def test__assert_empty_fail(self):
'''test'''
with patch.dict(saltcheck.__salt__, {'config.get': MagicMock(return_value=True),
'cp.cache_master': MagicMock(return_value=[True])
}):
sc_instance = saltcheck.SaltCheck()
mybool = sc_instance._SaltCheck__assert_empty("data")
self.assertNotEqual(mybool, 'Pass')
def test__assert__not_empty(self):
'''test'''
with patch.dict(saltcheck.__salt__, {'config.get': MagicMock(return_value=True),
'cp.cache_master': MagicMock(return_value=[True])
}):
sc_instance = saltcheck.SaltCheck()
mybool = sc_instance._SaltCheck__assert_not_empty("data")
self.assertEqual(mybool, 'Pass')
def test__assert__not_empty_fail(self):
'''test'''
with patch.dict(saltcheck.__salt__, {'config.get': MagicMock(return_value=True),
'cp.cache_master': MagicMock(return_value=[True])
}):
sc_instance = saltcheck.SaltCheck()
mybool = sc_instance._SaltCheck__assert_not_empty("")
self.assertNotEqual(mybool, 'Pass')
def test_run_test_1(self):
'''test'''
with patch.dict(saltcheck.__salt__, {'config.get': MagicMock(return_value=True),
@ -317,7 +350,252 @@ class LinuxSysctlTestCase(TestCase, LoaderModuleMockMixin):
'cp.cache_master': MagicMock(return_value=[True])}):
returned = saltcheck.run_test(test={"module_and_function": "test.echo",
"assertion": "assertEqual",
"expected-return": "This works!",
"expected_return": "This works!",
"args": ["This works!"]
})
self.assertEqual(returned, 'Pass')
self.assertEqual(returned['status'], 'Pass')
def test_report_highstate_tests(self):
'''test report_highstate_tests'''
expected_output = {'TEST REPORT RESULTS': {
'States missing tests': ['state1'],
'Missing Tests': 1,
'States with tests': ['found']
}}
with patch('salt.modules.saltcheck._get_top_states') as mocked_get_top:
mocked_get_top.return_value = ['state1', 'found']
with patch('salt.modules.saltcheck.StateTestLoader') as mocked_stl:
instance = mocked_stl.return_value
instance.found_states = ['found']
returned = saltcheck.report_highstate_tests()
self.assertEqual(returned, expected_output)
def test_validation(self):
'''test validation of tests'''
sc_instance = saltcheck.SaltCheck()
# Fail on empty test
test_dict = {}
expected_return = False
val_ret = sc_instance._SaltCheck__is_valid_test(test_dict)
self.assertEqual(val_ret, expected_return)
# Succeed on standard test
test_dict = {
'module_and_function': 'test.echo',
'args': ["hello"],
'kwargs': {},
'assertion': 'assertEqual',
'expected_return': 'hello'
}
expected_return = True
with patch.dict(saltcheck.__salt__, {'sys.list_modules': MagicMock(return_value=['test']),
'sys.list_functions': MagicMock(return_value=['test.echo'])
}):
val_ret = sc_instance._SaltCheck__is_valid_test(test_dict)
self.assertEqual(val_ret, expected_return)
# Succeed on standard test with older expected-return syntax
test_dict = {
'module_and_function': 'test.echo',
'args': ["hello"],
'kwargs': {},
'assertion': 'assertEqual',
'expected-return': 'hello'
}
expected_return = True
with patch.dict(saltcheck.__salt__, {'sys.list_modules': MagicMock(return_value=['test']),
'sys.list_functions': MagicMock(return_value=['test.echo'])
}):
val_ret = sc_instance._SaltCheck__is_valid_test(test_dict)
self.assertEqual(val_ret, expected_return)
# Do not require expected_return for some assertions
assertions = ["assertEmpty",
"assertNotEmpty",
"assertTrue",
"assertFalse"]
for assertion in assertions:
test_dict = {
'module_and_function': 'test.echo',
'args': ["hello"]
}
test_dict['assertion'] = assertion
expected_return = True
with patch.dict(saltcheck.__salt__, {'sys.list_modules': MagicMock(return_value=['test']),
'sys.list_functions': MagicMock(return_value=['test.echo'])
}):
val_ret = sc_instance._SaltCheck__is_valid_test(test_dict)
self.assertEqual(val_ret, expected_return)
# Fail on invalid module
test_dict = {
'module_and_function': 'broken.echo',
'args': ["hello"],
'kwargs': {},
'assertion': 'assertEqual',
'expected_return': 'hello'
}
expected_return = False
with patch.dict(saltcheck.__salt__, {'sys.list_modules': MagicMock(return_value=['test']),
'sys.list_functions': MagicMock(return_value=['test.echo'])
}):
val_ret = sc_instance._SaltCheck__is_valid_test(test_dict)
self.assertEqual(val_ret, expected_return)
# Fail on invalid function
test_dict = {
'module_and_function': 'test.broken',
'args': ["hello"],
'kwargs': {},
'assertion': 'assertEqual',
'expected_return': 'hello'
}
expected_return = False
with patch.dict(saltcheck.__salt__, {'sys.list_modules': MagicMock(return_value=['test']),
'sys.list_functions': MagicMock(return_value=['test.echo'])
}):
val_ret = sc_instance._SaltCheck__is_valid_test(test_dict)
self.assertEqual(val_ret, expected_return)
# Fail on missing expected_return
test_dict = {
'module_and_function': 'test.echo',
'args': ["hello"],
'kwargs': {},
'assertion': 'assertEqual'
}
expected_return = False
with patch.dict(saltcheck.__salt__, {'sys.list_modules': MagicMock(return_value=['test']),
'sys.list_functions': MagicMock(return_value=['test.echo'])
}):
val_ret = sc_instance._SaltCheck__is_valid_test(test_dict)
self.assertEqual(val_ret, expected_return)
# Fail on empty expected_return
test_dict = {
'module_and_function': 'test.echo',
'args': ["hello"],
'kwargs': {},
'assertion': 'assertEqual',
'expected_return': None
}
expected_return = False
with patch.dict(saltcheck.__salt__, {'sys.list_modules': MagicMock(return_value=['test']),
'sys.list_functions': MagicMock(return_value=['test.echo'])
}):
val_ret = sc_instance._SaltCheck__is_valid_test(test_dict)
self.assertEqual(val_ret, expected_return)
# Succeed on m_and_f saltcheck.state_apply with only args
test_dict = {
'module_and_function': 'saltcheck.state_apply',
'args': ["common"]
}
expected_return = True
with patch.dict(saltcheck.__salt__, {'sys.list_modules': MagicMock(return_value=['saltcheck']),
'sys.list_functions': MagicMock(return_value=['saltcheck.state_apply'])
}):
val_ret = sc_instance._SaltCheck__is_valid_test(test_dict)
self.assertEqual(val_ret, expected_return)
def test_sls_path_generation(self):
'''test generation of sls paths'''
with patch.dict(saltcheck.__salt__, {'config.get': MagicMock(return_value='saltcheck-tests')}):
testLoader = saltcheck.StateTestLoader()
state_name = 'teststate'
expected_return = ['salt://teststate/saltcheck-tests',
'salt:///saltcheck-tests']
ret = testLoader._generate_sls_path(state_name)
self.assertEqual(ret, expected_return)
state_name = 'teststate.long.path'
expected_return = ['salt://teststate/long/path/saltcheck-tests',
'salt://teststate/long/saltcheck-tests',
'salt://teststate/saltcheck-tests']
ret = testLoader._generate_sls_path(state_name)
self.assertEqual(ret, expected_return)
state_name = 'teststate.really.long.path'
expected_return = ['salt://teststate/really/long/path/saltcheck-tests',
'salt://teststate/really/long/saltcheck-tests',
'salt://teststate/saltcheck-tests']
ret = testLoader._generate_sls_path(state_name)
self.assertEqual(ret, expected_return)
def test_generate_output(self):
# passing states
sc_results = {'a_state':
{'test_id1': {'status': 'Pass', 'duration': 1.987},
'test_id2': {'status': 'Pass', 'duration': 1.123}}}
expected_output = [{'a_state':
{'test_id1': {'status': 'Pass', 'duration': 1.987},
'test_id2': {'status': 'Pass', 'duration': 1.123}}},
{'TEST RESULTS':
{'Execution Time': 3.11,
'Passed': 2,
'Failed': 0,
'Skipped': 0,
'Missing Tests': 0
}
}]
ret = saltcheck._generate_out_list(sc_results)
self.assertEqual(ret, expected_output)
# Skipped
sc_results = {'a_state':
{'test_id1': {'status': 'Skip', 'duration': 1.987},
'test_id2': {'status': 'Pass', 'duration': 1.123}}}
expected_output = [{'a_state':
{'test_id1': {'status': 'Skip', 'duration': 1.987},
'test_id2': {'status': 'Pass', 'duration': 1.123}}},
{'TEST RESULTS':
{'Execution Time': 3.11,
'Passed': 1,
'Failed': 0,
'Skipped': 1,
'Missing Tests': 0
}
}]
ret = saltcheck._generate_out_list(sc_results)
self.assertEqual(ret, expected_output)
# Failed (does not test setting __context__)
sc_results = {'a_state':
{'test_id1': {'status': 'Failed', 'duration': 1.987},
'test_id2': {'status': 'Pass', 'duration': 1.123}}}
expected_output = [{'a_state':
{'test_id1': {'status': 'Failed', 'duration': 1.987},
'test_id2': {'status': 'Pass', 'duration': 1.123}}},
{'TEST RESULTS':
{'Execution Time': 3.11,
'Passed': 1,
'Failed': 1,
'Skipped': 0,
'Missing Tests': 0
}
}]
ret = saltcheck._generate_out_list(sc_results)
self.assertEqual(ret, expected_output)
# missing states
sc_results = {'a_state':
{'test_id1': {'status': 'Pass', 'duration': 1.987},
'test_id2': {'status': 'Pass', 'duration': 1.123}},
'b_state': {}
}
expected_output = [{'a_state':
{'test_id1': {'status': 'Pass', 'duration': 1.987},
'test_id2': {'status': 'Pass', 'duration': 1.123}}},
{'b_state': {}},
{'TEST RESULTS':
{'Execution Time': 3.11,
'Passed': 2,
'Failed': 0,
'Skipped': 0,
'Missing Tests': 1
}
}]
ret = saltcheck._generate_out_list(sc_results)
self.assertEqual(ret, expected_output)

View file

@ -114,6 +114,13 @@ class WinPathTestCase(TestCase, LoaderModuleMockMixin):
self.assert_call_matches(mock_set, new_path)
self.assert_path_matches(env, new_path)
# Test adding with a custom index of 0
ret, env, mock_set = _run('c:\\salt', index=0, retval=True)
new_path = ('c:\\salt', 'C:\\Foo', 'C:\\Bar')
self.assertTrue(ret)
self.assert_call_matches(mock_set, new_path)
self.assert_path_matches(env, new_path)
# Test adding path with a case-insensitive match already present, and
# no index provided. The path should remain unchanged and we should not
# update the registry.

View file

@ -0,0 +1,96 @@
# -*- coding: utf-8 -*-
'''
Tests for xml module
'''
from __future__ import absolute_import, print_function, unicode_literals
import os
import tempfile
from salt.modules import xml
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.unit import TestCase
XML_STRING = '''
<root xmlns:foo="http://www.foo.org/" xmlns:bar="http://www.bar.org">
<actors>
<actor id="1">Christian Bale</actor>
<actor id="2">Liam Neeson</actor>
<actor id="3">Michael Caine</actor>
</actors>
<foo:singers>
<foo:singer id="4">Tom Waits</foo:singer>
<foo:singer id="5">B.B. King</foo:singer>
<foo:singer id="6">Ray Charles</foo:singer>
</foo:singers>
</root>
'''
class XmlTestCase(TestCase, LoaderModuleMockMixin):
'''
Test cases for salt.modules.xml
'''
def setup_loader_modules(self):
return {xml: {}}
def test_get_value(self):
'''
Verify xml.get_value
'''
with tempfile.NamedTemporaryFile('w+', delete=False) as xml_file:
xml_file.write(XML_STRING)
xml_file.flush()
xml_result = xml.get_value(xml_file.name, ".//actor[@id='2']")
self.assertEqual(xml_result, "Liam Neeson")
os.remove(xml_file.name)
def test_set_value(self):
'''
Verify xml.set_value
'''
with tempfile.NamedTemporaryFile('w+', delete=False) as xml_file:
xml_file.write(XML_STRING)
xml_file.flush()
xml_result = xml.set_value(xml_file.name, ".//actor[@id='2']", "Patrick Stewart")
assert xml_result is True
xml_result = xml.get_value(xml_file.name, ".//actor[@id='2']")
self.assertEqual(xml_result, "Patrick Stewart")
os.remove(xml_file.name)
def test_get_attribute(self):
'''
Verify xml.get_attribute
'''
with tempfile.NamedTemporaryFile('w+', delete=False) as xml_file:
xml_file.write(XML_STRING)
xml_file.flush()
xml_result = xml.get_attribute(xml_file.name, ".//actor[@id='3']")
self.assertEqual(xml_result, {"id": "3"})
os.remove(xml_file.name)
def test_set_attribute(self):
'''
Verify xml.set_value
'''
with tempfile.NamedTemporaryFile('w+', delete=False) as xml_file:
xml_file.write(XML_STRING)
xml_file.flush()
xml_result = xml.set_attribute(xml_file.name, ".//actor[@id='3']", "edited", "uh-huh")
assert xml_result is True
xml_result = xml.get_attribute(xml_file.name, ".//actor[@id='3']")
self.assertEqual(xml_result, {'edited': 'uh-huh', 'id': '3'})
os.remove(xml_file.name)

View file

@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-
'''
Test cases for xml state
'''
# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
# Import Salt Testing Libs
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.unit import TestCase
from tests.support.mock import (
MagicMock,
patch)
# Import Salt Libs
import salt.states.xml as xml
class XMLTestCase(TestCase, LoaderModuleMockMixin):
'''
Test cases for salt.states.xml
'''
def setup_loader_modules(self):
return {xml: {}}
def test_value_already_present(self):
'''
Test for existing value_present
'''
name = "testfile.xml"
xpath = ".//list[@id='1']"
value = "test value"
state_return = {
'name': name,
'changes': {},
'result': True,
'comment': '{0} is already present'.format(value)
}
with patch.dict(xml.__salt__, {'xml.get_value': MagicMock(return_value=value)}):
self.assertDictEqual(xml.value_present(name, xpath, value), state_return)
def test_value_update(self):
'''
Test for updating value_present
'''
name = "testfile.xml"
xpath = ".//list[@id='1']"
value = "test value"
old_value = "not test value"
state_return = {
'name': name,
'changes': {name: {'new': value, 'old': old_value}},
'result': True,
'comment': '{0} updated'.format(name)
}
with patch.dict(xml.__salt__, {'xml.get_value': MagicMock(return_value=old_value)}):
with patch.dict(xml.__salt__, {'xml.set_value': MagicMock(return_value=True)}):
self.assertDictEqual(xml.value_present(name, xpath, value), state_return)
def test_value_update_test(self):
'''
Test for value_present test=True
'''
name = "testfile.xml"
xpath = ".//list[@id='1']"
value = "test value"
old_value = "not test value"
state_return = {
'name': name,
'changes': {name: {'old': old_value, 'new': value}},
'result': None,
'comment': '{0} will be updated'.format(name)
}
with patch.dict(xml.__salt__, {'xml.get_value': MagicMock(return_value=old_value)}):
self.assertDictEqual(xml.value_present(name, xpath, value, test=True), state_return)
def test_value_update_invalid_xpath(self):
'''
Test for value_present invalid xpath
'''
name = "testfile.xml"
xpath = ".//list[@id='1']"
value = "test value"
state_return = {
'name': name,
'changes': {},
'result': False,
'comment': 'xpath query {0} not found in {1}'.format(xpath, name)
}
with patch.dict(xml.__salt__, {'xml.get_value': MagicMock(return_value=False)}):
self.assertDictEqual(xml.value_present(name, xpath, value, test=True), state_return)

View file

@ -186,6 +186,7 @@ class BadTestModuleNamesTestCase(TestCase):
'integration.ssh.test_mine',
'integration.ssh.test_pillar',
'integration.ssh.test_raw',
'integration.ssh.test_saltcheck',
'integration.ssh.test_state',
'integration.states.test_compiler',
'integration.states.test_handle_error',