Merge branch 'master' into backport_49981

This commit is contained in:
Daniel Wozniak 2020-04-21 23:03:42 -07:00 committed by GitHub
commit 65b9caa690
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 846 additions and 332 deletions

View file

@ -15,6 +15,9 @@ Versions are `MAJOR.PATCH`.
### Changed
- [#56751](https://github.com/saltstack/salt/pull/56751) - Backport 49981
- [#56731](https://github.com/saltstack/salt/pull/56731) - Backport #53994
- [#56753](https://github.com/saltstack/salt/pull/56753) - Backport 51095
### Fixed
- [#56237](https://github.com/saltstack/salt/pull/56237) - Fix alphabetical ordering and remove duplicates across all documentation indexes - [@myii](https://github.com/myii)
- [#56325](https://github.com/saltstack/salt/pull/56325) - Fix hyperlinks to `salt.serializers` and other documentation issues - [@myii](https://github.com/myii)

View file

@ -87,7 +87,7 @@ the context into the included file is required:
.. code-block:: jinja
{% from 'lib.sls' import test with context %}
Includes must use full paths, like so:
.. code-block:: jinja
@ -649,6 +649,56 @@ Returns:
1, 4
.. jinja_ref:: method_call
``method_call``
---------------
.. versionadded:: Sodium
Returns a result of object's method call.
Example #1:
.. code-block:: jinja
{{ [1, 2, 1, 3, 4] | method_call('index', 1, 1, 3) }}
Returns:
.. code-block:: text
2
This filter can be used with the `map filter`_ to apply object methods without
using loop constructs or temporary variables.
Example #2:
.. code-block:: jinja
{% set host_list = ['web01.example.com', 'db01.example.com'] %}
{% set host_list_split = [] %}
{% for item in host_list %}
{% do host_list_split.append(item.split('.', 1)) %}
{% endfor %}
{{ host_list_split }}
Example #3:
.. code-block:: jinja
{{ host_list|map('method_call', 'split', '.', 1)|list }}
Return of examples #2 and #3:
.. code-block:: text
[[web01, example.com], [db01, example.com]]
.. _`map filter`: http://jinja.pocoo.org/docs/2.10/templates/#map
.. jinja_ref:: is_sorted
``is_sorted``

View file

@ -79,6 +79,11 @@ def communicator(func):
queue.put("ERROR")
queue.put("Exception")
queue.put("{0}\n{1}\n".format(ex, trace))
except SystemExit as ex:
trace = traceback.format_exc()
queue.put("ERROR")
queue.put("System exit")
queue.put("{0}\n{1}\n".format(ex, trace))
return ret
return _call

View file

@ -268,6 +268,10 @@ def create(vm_):
"deploy", vm_, __opts__, default=False
)
# If ssh_host is not set, default to the minion name
if not config.get_cloud_config_value("ssh_host", vm_, __opts__, default=""):
vm_["ssh_host"] = vm_["name"]
if deploy_config:
wol_mac = config.get_cloud_config_value(
"wake_on_lan_mac", vm_, __opts__, default=""

View file

@ -116,6 +116,14 @@ def describe_topic(name, region=None, key=None, keyid=None, profile=None):
ret["Attributes"] = get_topic_attributes(
arn, region=region, key=key, keyid=keyid, profile=profile
)
# Grab extended attributes for the above subscriptions
for sub in range(len(ret["Subscriptions"])):
sub_arn = ret["Subscriptions"][sub]["SubscriptionArn"]
if not sub_arn.startswith("arn:aws:sns:"):
# Sometimes a sub is in e.g. PendingAccept or other
# wierd states and doesn't have an ARN yet
log.debug("Subscription with invalid ARN %s skipped...", sub_arn)
continue
return ret
@ -382,6 +390,17 @@ def unsubscribe(SubscriptionArn, region=None, key=None, keyid=None, profile=None
salt myminion boto3_sns.unsubscribe my_subscription_arn region=us-east-1
"""
if not SubscriptionArn.startswith("arn:aws:sns:"):
# Grrr, AWS sent us an ARN that's NOT and ARN....
# This can happen if, for instance, a subscription is left in PendingAcceptance or similar
# Note that anything left in PendingConfirmation will be auto-deleted by AWS after 30 days
# anyway, so this isn't as ugly a hack as it might seem at first...
log.info(
"Invalid subscription ARN `%s` passed - likely a PendingConfirmaton or such. "
"Skipping unsubscribe attempt as it would almost certainly fail...",
SubscriptionArn,
)
return True
subs = list_subscriptions(region=region, key=key, keyid=keyid, profile=profile)
sub = [s for s in subs if s.get("SubscriptionArn") == SubscriptionArn]
if not sub:

View file

@ -264,6 +264,7 @@ def create_function(
.. code-block:: bash
salt myminion boto_lamba.create_function my_function python2.7 my_role my_file.my_function my_function.zip
salt myminion boto_lamba.create_function my_function python2.7 my_role my_file.my_function salt://files/my_function.zip
"""
@ -276,6 +277,13 @@ def create_function(
"Either ZipFile must be specified, or "
"S3Bucket and S3Key must be provided."
)
if "://" in ZipFile: # Looks like a remote URL to me...
dlZipFile = __salt__["cp.cache_file"](path=ZipFile)
if dlZipFile is False:
ret["result"] = False
ret["comment"] = "Failed to cache ZipFile `{0}`.".format(ZipFile)
return ret
ZipFile = dlZipFile
code = {
"ZipFile": _filedata(ZipFile),
}

View file

@ -398,10 +398,20 @@ def convert_to_group_ids(
)
if not group_id:
# Security groups are a big deal - need to fail if any can't be resolved...
raise CommandExecutionError(
"Could not resolve Security Group name "
"{0} to a Group ID".format(group)
)
# But... if we're running in test mode, it may just be that the SG is scheduled
# to be created, and thus WOULD have been there if running "for real"...
if __opts__["test"]:
log.warn(
"Security Group `%s` could not be resolved to an ID. This may "
"cause a failure when not running in test mode.",
group,
)
return []
else:
raise CommandExecutionError(
"Could not resolve Security Group name "
"{0} to a Group ID".format(group)
)
else:
group_ids.append(six.text_type(group_id))
log.debug("security group contents %s post-conversion", group_ids)

View file

@ -289,14 +289,91 @@ def refresh_db(failhard=False, **kwargs): # pylint: disable=unused-argument
return ret
def _is_testmode(**kwargs):
"""
Returns whether a test mode (noaction) operation was requested.
"""
return bool(kwargs.get("test") or __opts__.get("test"))
def _append_noaction_if_testmode(cmd, **kwargs):
"""
Adds the --noaction flag to the command if it's running in the test mode.
"""
if bool(kwargs.get("test") or __opts__.get("test")):
if _is_testmode(**kwargs):
cmd.append("--noaction")
def _build_install_command_list(cmd_prefix, to_install, to_downgrade, to_reinstall):
"""
Builds a list of install commands to be executed in sequence in order to process
each of the to_install, to_downgrade, and to_reinstall lists.
"""
cmds = []
if to_install:
cmd = copy.deepcopy(cmd_prefix)
cmd.extend(to_install)
cmds.append(cmd)
if to_downgrade:
cmd = copy.deepcopy(cmd_prefix)
cmd.append("--force-downgrade")
cmd.extend(to_downgrade)
cmds.append(cmd)
if to_reinstall:
cmd = copy.deepcopy(cmd_prefix)
cmd.append("--force-reinstall")
cmd.extend(to_reinstall)
cmds.append(cmd)
return cmds
def _parse_reported_packages_from_install_output(output):
"""
Parses the output of "opkg install" to determine what packages would have been
installed by an operation run with the --noaction flag.
We are looking for lines like:
Installing <package> (<version>) on <target>
or
Upgrading <package> from <oldVersion> to <version> on root
"""
reported_pkgs = {}
install_pattern = re.compile(
r"Installing\s(?P<package>.*?)\s\((?P<version>.*?)\)\son\s(?P<target>.*?)"
)
upgrade_pattern = re.compile(
r"Upgrading\s(?P<package>.*?)\sfrom\s(?P<oldVersion>.*?)\sto\s(?P<version>.*?)\son\s(?P<target>.*?)"
)
for line in salt.utils.itertools.split(output, "\n"):
match = install_pattern.match(line)
if match is None:
match = upgrade_pattern.match(line)
if match:
reported_pkgs[match.group("package")] = match.group("version")
return reported_pkgs
def _execute_install_command(cmd, parse_output, errors, parsed_packages):
"""
Executes a command for the install operation.
If the command fails, its error output will be appended to the errors list.
If the command succeeds and parse_output is true, updated packages will be appended
to the parsed_packages dictionary.
"""
out = __salt__["cmd.run_all"](cmd, output_loglevel="trace", python_shell=False)
if out["retcode"] != 0:
if out["stderr"]:
errors.append(out["stderr"])
else:
errors.append(out["stdout"])
elif parse_output:
parsed_packages.update(
_parse_reported_packages_from_install_output(out["stdout"])
)
def install(
name=None, refresh=False, pkgs=None, sources=None, reinstall=False, **kwargs
):
@ -440,24 +517,9 @@ def install(
# This should cause the command to fail.
to_install.append(pkgstr)
cmds = []
if to_install:
cmd = copy.deepcopy(cmd_prefix)
cmd.extend(to_install)
cmds.append(cmd)
if to_downgrade:
cmd = copy.deepcopy(cmd_prefix)
cmd.append("--force-downgrade")
cmd.extend(to_downgrade)
cmds.append(cmd)
if to_reinstall:
cmd = copy.deepcopy(cmd_prefix)
cmd.append("--force-reinstall")
cmd.extend(to_reinstall)
cmds.append(cmd)
cmds = _build_install_command_list(
cmd_prefix, to_install, to_downgrade, to_reinstall
)
if not cmds:
return {}
@ -466,16 +528,17 @@ def install(
refresh_db()
errors = []
is_testmode = _is_testmode(**kwargs)
test_packages = {}
for cmd in cmds:
out = __salt__["cmd.run_all"](cmd, output_loglevel="trace", python_shell=False)
if out["retcode"] != 0:
if out["stderr"]:
errors.append(out["stderr"])
else:
errors.append(out["stdout"])
_execute_install_command(cmd, is_testmode, errors, test_packages)
__context__.pop("pkg.list_pkgs", None)
new = list_pkgs()
if is_testmode:
new = copy.deepcopy(new)
new.update(test_packages)
ret = salt.utils.data.compare_dicts(old, new)
if pkg_type == "file" and reinstall:
@ -513,6 +576,26 @@ def install(
return ret
def _parse_reported_packages_from_remove_output(output):
"""
Parses the output of "opkg remove" to determine what packages would have been
removed by an operation run with the --noaction flag.
We are looking for lines like
Removing <package> (<version>) from <Target>...
"""
reported_pkgs = {}
remove_pattern = re.compile(
r"Removing\s(?P<package>.*?)\s\((?P<version>.*?)\)\sfrom\s(?P<target>.*?)..."
)
for line in salt.utils.itertools.split(output, "\n"):
match = remove_pattern.match(line)
if match:
reported_pkgs[match.group("package")] = ""
return reported_pkgs
def remove(name=None, pkgs=None, **kwargs): # pylint: disable=unused-argument
"""
Remove packages using ``opkg remove``.
@ -576,6 +659,9 @@ def remove(name=None, pkgs=None, **kwargs): # pylint: disable=unused-argument
__context__.pop("pkg.list_pkgs", None)
new = list_pkgs()
if _is_testmode(**kwargs):
reportedPkgs = _parse_reported_packages_from_remove_output(out["stdout"])
new = {k: v for k, v in new.items() if k not in reportedPkgs}
ret = salt.utils.data.compare_dicts(old, new)
rs_result = _get_restartcheck_result(errors)

View file

@ -6,6 +6,9 @@ Utility functions for use with or in SLS files
# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
import os
import textwrap
# Import Salt libs
import salt.exceptions
import salt.loader
@ -243,3 +246,184 @@ def deserialize(serializer, stream_or_string, **mod_kwargs):
"""
kwargs = salt.utils.args.clean_kwargs(**mod_kwargs)
return _get_serialize_fn(serializer, "deserialize")(stream_or_string, **kwargs)
def banner(
width=72,
commentchar="#",
borderchar="#",
blockstart=None,
blockend=None,
title=None,
text=None,
newline=False,
):
"""
Create a standardized comment block to include in a templated file.
A common technique in configuration management is to include a comment
block in managed files, warning users not to modify the file. This
function simplifies and standardizes those comment blocks.
:param width: The width, in characters, of the banner. Default is 72.
:param commentchar: The character to be used in the starting position of
each line. This value should be set to a valid line comment character
for the syntax of the file in which the banner is being inserted.
Multiple character sequences, like '//' are supported.
If the file's syntax does not support line comments (such as XML),
use the ``blockstart`` and ``blockend`` options.
:param borderchar: The character to use in the top and bottom border of
the comment box. Must be a single character.
:param blockstart: The character sequence to use at the beginning of a
block comment. Should be used in conjunction with ``blockend``
:param blockend: The character sequence to use at the end of a
block comment. Should be used in conjunction with ``blockstart``
:param title: The first field of the comment block. This field appears
centered at the top of the box.
:param text: The second filed of the comment block. This field appears
left-justifed at the bottom of the box.
:param newline: Boolean value to indicate whether the comment block should
end with a newline. Default is ``False``.
**Example 1 - the default banner:**
.. code-block:: jinja
{{ salt['slsutil.banner']() }}
.. code-block:: none
########################################################################
# #
# THIS FILE IS MANAGED BY SALT - DO NOT EDIT #
# #
# The contents of this file are managed by Salt. Any changes to this #
# file may be overwritten automatically and without warning. #
########################################################################
**Example 2 - a Javadoc-style banner:**
.. code-block:: jinja
{{ salt['slsutil.banner'](commentchar=' *', borderchar='*', blockstart='/**', blockend=' */') }}
.. code-block:: none
/**
***********************************************************************
* *
* THIS FILE IS MANAGED BY SALT - DO NOT EDIT *
* *
* The contents of this file are managed by Salt. Any changes to this *
* file may be overwritten automatically and without warning. *
***********************************************************************
*/
**Example 3 - custom text:**
.. code-block:: jinja
{{ set copyright='This file may not be copied or distributed without permission of SaltStack, Inc.' }}
{{ salt['slsutil.banner'](title='Copyright 2019 SaltStack, Inc.', text=copyright, width=60) }}
.. code-block:: none
############################################################
# #
# Copyright 2019 SaltStack, Inc. #
# #
# This file may not be copied or distributed without #
# permission of SaltStack, Inc. #
############################################################
"""
if title is None:
title = "THIS FILE IS MANAGED BY SALT - DO NOT EDIT"
if text is None:
text = (
"The contents of this file are managed by Salt. "
"Any changes to this file may be overwritten "
"automatically and without warning."
)
# Set up some typesetting variables
ledge = commentchar.rstrip()
redge = commentchar.strip()
lgutter = ledge + " "
rgutter = " " + redge
textwidth = width - len(lgutter) - len(rgutter)
# Check the width
if textwidth <= 0:
raise salt.exceptions.ArgumentValueError("Width is too small to render banner")
# Define the static elements
border_line = (
commentchar + borderchar[:1] * (width - len(ledge) - len(redge)) + redge
)
spacer_line = commentchar + " " * (width - len(commentchar) * 2) + commentchar
# Create the banner
wrapper = textwrap.TextWrapper(width=textwidth)
block = list()
if blockstart is not None:
block.append(blockstart)
block.append(border_line)
block.append(spacer_line)
for line in wrapper.wrap(title):
block.append(lgutter + line.center(textwidth) + rgutter)
block.append(spacer_line)
for line in wrapper.wrap(text):
block.append(lgutter + line + " " * (textwidth - len(line)) + rgutter)
block.append(border_line)
if blockend is not None:
block.append(blockend)
# Convert list to multi-line string
result = os.linesep.join(block)
# Add a newline character to the end of the banner
if newline:
return result + os.linesep
return result
def boolstr(value, true="true", false="false"):
"""
Convert a boolean value into a string. This function is
intended to be used from within file templates to provide
an easy way to take boolean values stored in Pillars or
Grains, and write them out in the apprpriate syntax for
a particular file template.
:param value: The boolean value to be converted
:param true: The value to return if ``value`` is ``True``
:param false: The value to return if ``value`` is ``False``
In this example, a pillar named ``smtp:encrypted`` stores a boolean
value, but the template that uses that value needs ``yes`` or ``no``
to be written, based on the boolean value.
*Note: this is written on two lines for clarity. The same result
could be achieved in one line.*
.. code-block:: jinja
{% set encrypted = salt[pillar.get]('smtp:encrypted', false) %}
use_tls: {{ salt['slsutil.boolstr'](encrypted, 'yes', 'no') }}
Result (assuming the value is ``True``):
.. code-block:: none
use_tls: yes
"""
if value:
return true
return false

View file

@ -209,24 +209,22 @@ def get_zone():
Returns:
str: Timezone in unix format
Raises:
CommandExecutionError: If timezone could not be gathered
CLI Example:
.. code-block:: bash
salt '*' timezone.get_zone
"""
win_zone = __utils__["reg.read_value"](
hive="HKLM",
key="SYSTEM\\CurrentControlSet\\Control\\TimeZoneInformation",
vname="TimeZoneKeyName",
)["vdata"]
# Some data may have null characters. We only need the first portion up to
# the first null character. See the following:
# https://github.com/saltstack/salt/issues/51940
# https://stackoverflow.com/questions/27716746/hklm-system-currentcontrolset-control-timezoneinformation-timezonekeyname-corrup
if "\0" in win_zone:
win_zone = win_zone.split("\0")[0]
return mapper.get_unix(win_zone.lower(), "Unknown")
cmd = ["tzutil", "/g"]
res = __salt__["cmd.run_all"](cmd, python_shell=False)
if res["retcode"] or not res["stdout"]:
raise CommandExecutionError(
"tzutil encountered an error getting timezone", info=res
)
return mapper.get_unix(res["stdout"].lower(), "Unknown")
def get_offset():

View file

@ -233,7 +233,9 @@ def topic_present(
subscribe += [sub]
for sub in current_subs:
minimal = {"Protocol": sub["Protocol"], "Endpoint": sub["Endpoint"]}
if minimal not in obfuscated_subs:
if minimal not in obfuscated_subs and sub["SubscriptionArn"].startswith(
"arn:aws:sns:"
):
unsubscribe += [sub["SubscriptionArn"]]
for sub in subscribe:
prot = sub["Protocol"]

View file

@ -429,23 +429,20 @@ def _function_config_present(
func = __salt__["boto_lambda.describe_function"](
FunctionName, region=region, key=key, keyid=keyid, profile=profile
)["function"]
# pylint: disable=possibly-unused-variable
role_arn = _get_role_arn(Role, region, key, keyid, profile)
# pylint: enable=possibly-unused-variable
need_update = False
options = {
"Role": "role_arn",
"Handler": "Handler",
"Description": "Description",
"Timeout": "Timeout",
"MemorySize": "MemorySize",
"Role": _get_role_arn(Role, region, key, keyid, profile),
"Handler": Handler,
"Description": Description,
"Timeout": Timeout,
"MemorySize": MemorySize,
}
for val, var in six.iteritems(options):
if func[val] != locals()[var]:
for key, val in six.iteritems(options):
if func[key] != val:
need_update = True
ret["changes"].setdefault("new", {})[var] = locals()[var]
ret["changes"].setdefault("old", {})[var] = func[val]
ret["changes"].setdefault("old", {})[key] = func[key]
ret["changes"].setdefault("new", {})[key] = val
# VpcConfig returns the extra value 'VpcId' so do a special compare
oldval = func.get("VpcConfig")
if oldval is not None:
@ -508,6 +505,13 @@ def _function_code_present(
)["function"]
update = False
if ZipFile:
if "://" in ZipFile: # Looks like a remote URL to me...
dlZipFile = __salt__["cp.cache_file"](path=ZipFile)
if dlZipFile is False:
ret["result"] = False
ret["comment"] = "Failed to cache ZipFile `{0}`.".format(ZipFile)
return ret
ZipFile = dlZipFile
size = os.path.getsize(ZipFile)
if size == func["CodeSize"]:
sha = hashlib.sha256()
@ -787,13 +791,13 @@ def alias_present(
)["alias"]
need_update = False
options = {"FunctionVersion": "FunctionVersion", "Description": "Description"}
options = {"FunctionVersion": FunctionVersion, "Description": Description}
for val, var in six.iteritems(options):
if _describe[val] != locals()[var]:
for key, val in six.iteritems(options):
if _describe[key] != val:
need_update = True
ret["changes"].setdefault("new", {})[var] = locals()[var]
ret["changes"].setdefault("old", {})[var] = _describe[val]
ret["changes"].setdefault("old", {})[key] = _describe[key]
ret["changes"].setdefault("new", {})[key] = val
if need_update:
ret["comment"] = os.linesep.join(
[ret["comment"], "Alias config to be modified"]
@ -1026,13 +1030,13 @@ def event_source_mapping_present(
)["event_source_mapping"]
need_update = False
options = {"BatchSize": "BatchSize"}
options = {"BatchSize": BatchSize}
for val, var in six.iteritems(options):
if _describe[val] != locals()[var]:
for key, val in six.iteritems(options):
if _describe[key] != val:
need_update = True
ret["changes"].setdefault("new", {})[var] = locals()[var]
ret["changes"].setdefault("old", {})[var] = _describe[val]
ret["changes"].setdefault("old", {})[key] = _describe[key]
ret["changes"].setdefault("new", {})[key] = val
# verify FunctionName against FunctionArn
function_arn = _get_function_arn(
FunctionName, region=region, key=key, keyid=keyid, profile=profile

View file

@ -47,7 +47,6 @@ try:
UserPassCredentials,
ServicePrincipalCredentials,
)
from msrestazure.azure_active_directory import MSIAuthentication
from msrestazure.azure_cloud import (
MetadataEndpointError,
get_cloud_from_metadata_endpoint,
@ -123,7 +122,14 @@ def _determine_auth(**kwargs):
kwargs["username"], kwargs["password"], cloud_environment=cloud_env
)
elif "subscription_id" in kwargs:
credentials = MSIAuthentication(cloud_environment=cloud_env)
try:
from msrestazure.azure_active_directory import MSIAuthentication
credentials = MSIAuthentication(cloud_environment=cloud_env)
except ImportError:
raise SaltSystemExit(
msg="MSI authentication support not availabe (requires msrestazure >= 0.4.14)"
)
else:
raise SaltInvocationError(
@ -161,7 +167,7 @@ def get_client(client_type, **kwargs):
if client_type not in client_map:
raise SaltSystemExit(
"The Azure ARM client_type {0} specified can not be found.".format(
msg="The Azure ARM client_type {0} specified can not be found.".format(
client_type
)
)

View file

@ -670,6 +670,11 @@ def symmetric_difference(lst1, lst2):
)
@jinja_filter("method_call")
def method_call(obj, f_name, *f_args, **f_kwargs):
return getattr(obj, f_name, lambda *args, **kwargs: None)(*f_args, **f_kwargs)
@jinja2.contextfunction
def show_full_context(ctx):
return salt.utils.data.simple_types_filter(

View file

@ -3,44 +3,43 @@
Integration tests for the vault execution module
"""
# Import Python Libs
from __future__ import absolute_import, print_function, unicode_literals
import inspect
import logging
import time
# Import Salt Libs
import salt.utils.path
from tests.support.case import ModuleCase
from tests.support.helpers import destructiveTest
from tests.support.paths import FILES
# Import Salt Testing Libs
from tests.support.unit import skipIf
from tests.support.runtests import RUNTIME_VARS
from tests.support.sminion import create_sminion
from tests.support.unit import SkipTest, skipIf
log = logging.getLogger(__name__)
VAULT_BINARY_PATH = salt.utils.path.which("vault")
@destructiveTest
@skipIf(not salt.utils.path.which("dockerd"), "Docker not installed")
@skipIf(not salt.utils.path.which("vault"), "Vault not installed")
@skipIf(not VAULT_BINARY_PATH, "Vault not installed")
class VaultTestCase(ModuleCase):
"""
Test vault module
"""
count = 0
def setUp(self):
"""
SetUp vault container
"""
if self.count == 0:
config = '{"backend": {"file": {"path": "/vault/file"}}, "default_lease_ttl": "168h", "max_lease_ttl": "720h", "disable_mlock": true}'
self.run_state("docker_image.present", name="vault", tag="0.9.6")
self.run_state(
"docker_container.running",
@classmethod
def setUpClass(cls):
cls.sminion = sminion = create_sminion()
config = '{"backend": {"file": {"path": "/vault/file"}}, "default_lease_ttl": "168h", "max_lease_ttl": "720h", "disable_mlock": true}'
sminion.states.docker_image.present(name="vault", tag="0.9.6")
login_attempts = 1
container_created = False
while True:
if container_created:
sminion.states.docker_container.stopped(name="vault")
sminion.states.docker_container.absent(name="vault")
ret = sminion.states.docker_container.running(
name="vault",
image="vault:0.9.6",
port_bindings="8200:8200",
@ -49,38 +48,37 @@ class VaultTestCase(ModuleCase):
"VAULT_LOCAL_CONFIG": config,
},
)
log.debug("docker_container.running return: %s", ret)
container_created = ret["result"]
time.sleep(5)
ret = self.run_function(
"cmd.retcode",
cmd="/usr/local/bin/vault login token=testsecret",
ret = sminion.functions.cmd.run_all(
cmd="{} login token=testsecret".format(VAULT_BINARY_PATH),
env={"VAULT_ADDR": "http://127.0.0.1:8200"},
hide_output=False,
)
if ret != 0:
self.skipTest("unable to login to vault")
ret = self.run_function(
"cmd.retcode",
cmd="/usr/local/bin/vault policy write testpolicy {0}/vault.hcl".format(
FILES
),
env={"VAULT_ADDR": "http://127.0.0.1:8200"},
)
if ret != 0:
self.skipTest("unable to assign policy to vault")
self.count += 1
if ret["retcode"] == 0:
break
log.debug("Vault login failed. Return: %s", ret)
login_attempts += 1
def tearDown(self):
"""
TearDown vault container
"""
if login_attempts >= 3:
raise SkipTest("unable to login to vault")
def count_tests(funcobj):
return inspect.ismethod(funcobj) and funcobj.__name__.startswith("test_")
ret = sminion.functions.cmd.retcode(
cmd="{} policy write testpolicy {}/vault.hcl".format(
VAULT_BINARY_PATH, RUNTIME_VARS.FILES
),
env={"VAULT_ADDR": "http://127.0.0.1:8200"},
)
if ret != 0:
raise SkipTest("unable to assign policy to vault")
numtests = len(inspect.getmembers(VaultTestCase, predicate=count_tests))
if self.count >= numtests:
self.run_state("docker_container.stopped", name="vault")
self.run_state("docker_container.absent", name="vault")
self.run_state("docker_image.absent", name="vault", force=True)
@classmethod
def tearDownClass(cls):
cls.sminion.states.docker_container.stopped(name="vault")
cls.sminion.states.docker_container.absent(name="vault")
cls.sminion.states.docker_image.absent(name="vault", force=True)
cls.sminion = None
@skipIf(True, "SLOWTEST skip")
def test_write_read_secret(self):
@ -151,17 +149,18 @@ class VaultTestCaseCurrent(ModuleCase):
Test vault module against current vault
"""
count = 0
def setUp(self):
"""
SetUp vault container
"""
if self.count == 0:
config = '{"backend": {"file": {"path": "/vault/file"}}, "default_lease_ttl": "168h", "max_lease_ttl": "720h", "disable_mlock": true}'
self.run_state("docker_image.present", name="vault", tag="1.3.1")
self.run_state(
"docker_container.running",
@classmethod
def setUpClass(cls):
cls.sminion = sminion = create_sminion()
config = '{"backend": {"file": {"path": "/vault/file"}}, "default_lease_ttl": "168h", "max_lease_ttl": "720h", "disable_mlock": true}'
sminion.states.docker_image.present(name="vault", tag="1.3.1")
login_attempts = 1
container_created = False
while True:
if container_created:
sminion.states.docker_container.stopped(name="vault")
sminion.states.docker_container.absent(name="vault")
ret = sminion.states.docker_container.running(
name="vault",
image="vault:1.3.1",
port_bindings="8200:8200",
@ -170,38 +169,37 @@ class VaultTestCaseCurrent(ModuleCase):
"VAULT_LOCAL_CONFIG": config,
},
)
log.debug("docker_container.running return: %s", ret)
container_created = ret["result"]
time.sleep(5)
ret = self.run_function(
"cmd.retcode",
cmd="/usr/local/bin/vault login token=testsecret",
ret = sminion.functions.cmd.run_all(
cmd="{} login token=testsecret".format(VAULT_BINARY_PATH),
env={"VAULT_ADDR": "http://127.0.0.1:8200"},
hide_output=False,
)
if ret != 0:
self.skipTest("unable to login to vault")
ret = self.run_function(
"cmd.retcode",
cmd="/usr/local/bin/vault policy write testpolicy {0}/vault.hcl".format(
FILES
),
env={"VAULT_ADDR": "http://127.0.0.1:8200"},
)
if ret != 0:
self.skipTest("unable to assign policy to vault")
self.count += 1
if ret["retcode"] == 0:
break
log.debug("Vault login failed. Return: %s", ret)
login_attempts += 1
def tearDown(self):
"""
TearDown vault container
"""
if login_attempts >= 3:
raise SkipTest("unable to login to vault")
def count_tests(funcobj):
return inspect.ismethod(funcobj) and funcobj.__name__.startswith("test_")
ret = sminion.functions.cmd.retcode(
cmd="{} policy write testpolicy {}/vault.hcl".format(
VAULT_BINARY_PATH, RUNTIME_VARS.FILES
),
env={"VAULT_ADDR": "http://127.0.0.1:8200"},
)
if ret != 0:
raise SkipTest("unable to assign policy to vault")
numtests = len(inspect.getmembers(VaultTestCaseCurrent, predicate=count_tests))
if self.count >= numtests:
self.run_state("docker_container.stopped", name="vault")
self.run_state("docker_container.absent", name="vault")
self.run_state("docker_image.absent", name="vault", force=True)
@classmethod
def tearDownClass(cls):
cls.sminion.states.docker_container.stopped(name="vault")
cls.sminion.states.docker_container.absent(name="vault")
cls.sminion.states.docker_image.absent(name="vault", force=True)
cls.sminion = None
@skipIf(True, "SLOWTEST skip")
def test_write_read_secret_kv2(self):

View file

@ -917,6 +917,14 @@ class ModuleCase(TestCase, SaltClientTestCaseMixin):
if "f_timeout" in kwargs:
kwargs["timeout"] = kwargs.pop("f_timeout")
client = self.client if master_tgt is None else self.clients[master_tgt]
log.debug(
"Running client.cmd(minion_tgt=%r, function=%r, arg=%r, timeout=%r, kwarg=%r)",
minion_tgt,
function,
arg,
timeout,
kwargs,
)
orig = client.cmd(minion_tgt, function, arg, timeout=timeout, kwarg=kwargs)
if RUNTIME_VARS.PYTEST_SESSION:

View file

@ -82,6 +82,31 @@ class SaltifyTestCase(TestCase, LoaderModuleMockMixin):
mock_cmd.assert_called_once_with(vm_, ANY)
self.assertTrue(result)
def test_create_no_ssh_host(self):
"""
Test that ssh_host is set to the vm name if not defined
"""
mock_cmd = MagicMock(return_value=True)
with patch.dict(
"salt.cloud.clouds.saltify.__utils__", {"cloud.bootstrap": mock_cmd}
):
vm_ = {
"deploy": True,
"driver": "saltify",
"name": "new2",
"profile": "testprofile2",
}
result = saltify.create(vm_)
mock_cmd.assert_called_once_with(vm_, ANY)
assert result
# Make sure that ssh_host was added to the vm. Note that this is
# done in two asserts so that the failure is more explicit about
# what is wrong. If ssh_host wasn't inserted in the vm_ dict, the
# failure would be a KeyError, which would be harder to
# troubleshoot.
assert "ssh_host" in vm_
assert vm_["ssh_host"] == "new2"
def test_create_wake_on_lan(self):
"""
Test if wake on lan works

View file

@ -105,7 +105,8 @@ class AutoKeyTest(TestCase):
@patch_check_permissions()
def test_check_permissions_group_can_write_not_permissive(self):
"""
Assert that a file is accepted, when group can write to it and perkissive_pki_access=False
Assert that a file is accepted, when group can write to it and
permissive_pki_access=False
"""
self.stats["testfile"] = {"mode": gen_permissions("w", "w", ""), "gid": 1}
if salt.utils.platform.is_windows():
@ -116,7 +117,8 @@ class AutoKeyTest(TestCase):
@patch_check_permissions(permissive_pki=True)
def test_check_permissions_group_can_write_permissive(self):
"""
Assert that a file is accepted, when group can write to it and perkissive_pki_access=True
Assert that a file is accepted, when group can write to it and
permissive_pki_access=True
"""
self.stats["testfile"] = {"mode": gen_permissions("w", "w", ""), "gid": 1}
self.assertTrue(self.auto_key.check_permissions("testfile"))
@ -124,8 +126,8 @@ class AutoKeyTest(TestCase):
@patch_check_permissions(uid=0, permissive_pki=True)
def test_check_permissions_group_can_write_permissive_root_in_group(self):
"""
Assert that a file is accepted, when group can write to it, perkissive_pki_access=False,
salt is root and in the file owning group
Assert that a file is accepted, when group can write to it,
permissive_pki_access=False, salt is root and in the file owning group
"""
self.stats["testfile"] = {"mode": gen_permissions("w", "w", ""), "gid": 0}
self.assertTrue(self.auto_key.check_permissions("testfile"))
@ -133,8 +135,9 @@ class AutoKeyTest(TestCase):
@patch_check_permissions(uid=0, permissive_pki=True)
def test_check_permissions_group_can_write_permissive_root_not_in_group(self):
"""
Assert that no file is accepted, when group can write to it, perkissive_pki_access=False,
salt is root and **not** in the file owning group
Assert that no file is accepted, when group can write to it,
permissive_pki_access=False, salt is root and **not** in the file owning
group
"""
self.stats["testfile"] = {"mode": gen_permissions("w", "w", ""), "gid": 1}
if salt.utils.platform.is_windows():

View file

@ -3,58 +3,55 @@
:synopsis: Unit Tests for Package Management module 'module.opkg'
:platform: Linux
"""
# pylint: disable=import-error,3rd-party-module-not-gated
# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
import collections
import copy
import salt.modules.opkg as opkg
# Import Salt Libs
from salt.ext import six
# Import Salt Testing Libs
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.mock import MagicMock, patch
from tests.support.unit import TestCase
# pylint: disable=import-error,3rd-party-module-not-gated
OPKG_VIM_INFO = {
"vim": {
"Package": "vim",
"Version": "7.4.769-r0.31",
"Status": "install ok installed",
}
}
OPKG_VIM_FILES = {
"errors": [],
"packages": {
"vim": [
"/usr/bin/view",
"/usr/bin/vim.vim",
"/usr/bin/xxd",
"/usr/bin/vimdiff",
"/usr/bin/rview",
"/usr/bin/rvim",
"/usr/bin/ex",
]
},
}
INSTALLED = {"vim": {"new": "7.4", "old": six.text_type()}}
REMOVED = {"vim": {"new": six.text_type(), "old": "7.4"}}
PACKAGES = {"vim": "7.4"}
class OpkgTestCase(TestCase, LoaderModuleMockMixin):
"""
Test cases for salt.modules.opkg
"""
@classmethod
def setUpClass(cls):
cls.opkg_vim_info = {
"vim": {
"Package": "vim",
"Version": "7.4.769-r0.31",
"Status": "install ok installed",
}
}
cls.opkg_vim_files = {
"errors": [],
"packages": {
"vim": [
"/usr/bin/view",
"/usr/bin/vim.vim",
"/usr/bin/xxd",
"/usr/bin/vimdiff",
"/usr/bin/rview",
"/usr/bin/rvim",
"/usr/bin/ex",
]
},
}
cls.installed = {"vim": {"new": "7.4", "old": ""}}
cls.removed = {"vim": {"new": "", "old": "7.4"}}
cls.packages = {"vim": "7.4"}
@classmethod
def tearDownClass(cls):
cls.opkg_vim_info = (
cls.opkg_vim_files
) = cls.installed = cls.removed = cls.packages = None
def setup_loader_modules(self): # pylint: disable=no-self-use
"""
Tested modules
@ -66,7 +63,7 @@ class OpkgTestCase(TestCase, LoaderModuleMockMixin):
Test - Returns a string representing the package version or an empty string if
not installed.
"""
version = OPKG_VIM_INFO["vim"]["Version"]
version = self.opkg_vim_info["vim"]["Version"]
mock = MagicMock(return_value=version)
with patch.dict(opkg.__salt__, {"pkg_resource.version": mock}):
self.assertEqual(opkg.version(*["vim"]), version)
@ -82,22 +79,22 @@ class OpkgTestCase(TestCase, LoaderModuleMockMixin):
"""
Test - List the files that belong to a package, grouped by package.
"""
std_out = "\n".join(OPKG_VIM_FILES["packages"]["vim"])
std_out = "\n".join(self.opkg_vim_files["packages"]["vim"])
ret_value = {"stdout": std_out}
mock = MagicMock(return_value=ret_value)
with patch.dict(opkg.__salt__, {"cmd.run_all": mock}):
self.assertEqual(opkg.file_dict("vim"), OPKG_VIM_FILES)
self.assertEqual(opkg.file_dict("vim"), self.opkg_vim_files)
def test_file_list(self):
"""
Test - List the files that belong to a package.
"""
std_out = "\n".join(OPKG_VIM_FILES["packages"]["vim"])
std_out = "\n".join(self.opkg_vim_files["packages"]["vim"])
ret_value = {"stdout": std_out}
mock = MagicMock(return_value=ret_value)
files = {
"errors": OPKG_VIM_FILES["errors"],
"files": OPKG_VIM_FILES["packages"]["vim"],
"errors": self.opkg_vim_files["errors"],
"files": self.opkg_vim_files["packages"]["vim"],
}
with patch.dict(opkg.__salt__, {"cmd.run_all": mock}):
self.assertEqual(opkg.file_list("vim"), files)
@ -116,7 +113,7 @@ class OpkgTestCase(TestCase, LoaderModuleMockMixin):
Test - Install packages.
"""
with patch(
"salt.modules.opkg.list_pkgs", MagicMock(side_effect=[{}, PACKAGES])
"salt.modules.opkg.list_pkgs", MagicMock(side_effect=[{}, self.packages])
):
ret_value = {"retcode": 0}
mock = MagicMock(return_value=ret_value)
@ -132,14 +129,15 @@ class OpkgTestCase(TestCase, LoaderModuleMockMixin):
}
}
with patch.multiple(opkg, **patch_kwargs):
self.assertEqual(opkg.install("vim:7.4"), INSTALLED)
self.assertEqual(opkg.install("vim:7.4"), self.installed)
def test_install_noaction(self):
"""
Test - Install packages.
"""
with patch("salt.modules.opkg.list_pkgs", MagicMock(return_value=({}))):
ret_value = {"retcode": 0}
with patch("salt.modules.opkg.list_pkgs", MagicMock(side_effect=({}, {}))):
std_out = "Downloading http://feedserver/feeds/test/vim_7.4_arch.ipk.\n\nInstalling vim (7.4) on root\n"
ret_value = {"retcode": 0, "stdout": std_out}
mock = MagicMock(return_value=ret_value)
patch_kwargs = {
"__salt__": {
@ -153,14 +151,14 @@ class OpkgTestCase(TestCase, LoaderModuleMockMixin):
}
}
with patch.multiple(opkg, **patch_kwargs):
self.assertEqual(opkg.install("vim:7.4", test=True), {})
self.assertEqual(opkg.install("vim:7.4", test=True), self.installed)
def test_remove(self):
"""
Test - Remove packages.
"""
with patch(
"salt.modules.opkg.list_pkgs", MagicMock(side_effect=[PACKAGES, {}])
"salt.modules.opkg.list_pkgs", MagicMock(side_effect=[self.packages, {}])
):
ret_value = {"retcode": 0}
mock = MagicMock(return_value=ret_value)
@ -176,14 +174,18 @@ class OpkgTestCase(TestCase, LoaderModuleMockMixin):
}
}
with patch.multiple(opkg, **patch_kwargs):
self.assertEqual(opkg.remove("vim"), REMOVED)
self.assertEqual(opkg.remove("vim"), self.removed)
def test_remove_noaction(self):
"""
Test - Remove packages.
"""
with patch("salt.modules.opkg.list_pkgs", MagicMock(return_value=({}))):
ret_value = {"retcode": 0}
with patch(
"salt.modules.opkg.list_pkgs",
MagicMock(side_effect=[self.packages, self.packages]),
):
std_out = "\nRemoving vim (7.4) from root...\n"
ret_value = {"retcode": 0, "stdout": std_out}
mock = MagicMock(return_value=ret_value)
patch_kwargs = {
"__salt__": {
@ -197,17 +199,19 @@ class OpkgTestCase(TestCase, LoaderModuleMockMixin):
}
}
with patch.multiple(opkg, **patch_kwargs):
self.assertEqual(opkg.remove("vim:7.4", test=True), {})
self.assertEqual(opkg.remove("vim:7.4", test=True), self.removed)
def test_info_installed(self):
"""
Test - Return the information of the named package(s) installed on the system.
"""
installed = copy.deepcopy(OPKG_VIM_INFO["vim"])
installed = copy.deepcopy(self.opkg_vim_info["vim"])
del installed["Package"]
ordered_info = collections.OrderedDict(sorted(installed.items()))
expected_dict = {"vim": {k.lower(): v for k, v in ordered_info.items()}}
std_out = "\n".join([k + ": " + v for k, v in OPKG_VIM_INFO["vim"].items()])
std_out = "\n".join(
[k + ": " + v for k, v in self.opkg_vim_info["vim"].items()]
)
ret_value = {"stdout": std_out, "retcode": 0}
mock = MagicMock(return_value=ret_value)
with patch.dict(opkg.__salt__, {"cmd.run_all": mock}):

View file

@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
import logging
import salt.modules.slsutil as slsutil
from tests.support.unit import TestCase
log = logging.getLogger(__name__)
class SlsUtilTestCase(TestCase):
"""
Test cases for salt.modules.slsutil
"""
def test_banner(self):
"""
Test banner function
"""
self.check_banner()
self.check_banner(width=81)
self.check_banner(width=20)
self.check_banner(commentchar="//", borderchar="-")
self.check_banner(title="title here", text="text here")
self.check_banner(commentchar=" *")
def check_banner(
self,
width=72,
commentchar="#",
borderchar="#",
blockstart=None,
blockend=None,
title=None,
text=None,
newline=True,
):
result = slsutil.banner(
width=width,
commentchar=commentchar,
borderchar=borderchar,
blockstart=blockstart,
blockend=blockend,
title=title,
text=text,
newline=newline,
).splitlines()
for line in result:
self.assertEqual(len(line), width)
self.assertTrue(line.startswith(commentchar))
self.assertTrue(line.endswith(commentchar.strip()))
def test_boolstr(self):
"""
Test boolstr function
"""
self.assertEqual("yes", slsutil.boolstr(True, true="yes", false="no"))
self.assertEqual("no", slsutil.boolstr(False, true="yes", false="no"))

View file

@ -7,6 +7,7 @@ from __future__ import absolute_import, print_function, unicode_literals
# Import Salt Libs
import salt.modules.win_timezone as win_timezone
from salt.exceptions import CommandExecutionError
# Import Salt Testing Libs
from tests.support.mixins import LoaderModuleMockMixin
@ -23,37 +24,45 @@ class WinTimezoneTestCase(TestCase, LoaderModuleMockMixin):
def setup_loader_modules(self):
return {win_timezone: {}}
# 'get_zone' function tests: 3
def test_get_zone(self):
def test_get_zone_normal(self):
"""
Test if it gets current timezone (i.e. Asia/Calcutta)
Test if it get current timezone (i.e. Asia/Calcutta)
"""
mock_read = MagicMock(
side_effect=[
{"vdata": "India Standard Time"},
{"vdata": "Indian Standard Time"},
]
mock_read_ok = MagicMock(
return_value={
"pid": 78,
"retcode": 0,
"stderr": "",
"stdout": "India Standard Time",
}
)
with patch.dict(win_timezone.__utils__, {"reg.read_value": mock_read}):
with patch.dict(win_timezone.__salt__, {"cmd.run_all": mock_read_ok}):
self.assertEqual(win_timezone.get_zone(), "Asia/Calcutta")
def test_get_zone_unknown(self):
"""
Test get_zone with unknown timezone (i.e. Indian Standard Time)
"""
mock_read_error = MagicMock(
return_value={
"pid": 78,
"retcode": 0,
"stderr": "",
"stdout": "Indian Standard Time",
}
)
with patch.dict(win_timezone.__salt__, {"cmd.run_all": mock_read_error}):
self.assertEqual(win_timezone.get_zone(), "Unknown")
def test_get_zone_null_terminated(self):
def test_get_zone_error(self):
"""
Test if it handles instances where the registry contains null values
Test get_zone when it encounters an error
"""
mock_read = MagicMock(
side_effect=[
{"vdata": "India Standard Time\0\0\0\0"},
{"vdata": "Indian Standard Time\0\0some more junk data\0\0"},
]
mock_read_fatal = MagicMock(
return_value={"pid": 78, "retcode": 1, "stderr": "", "stdout": ""}
)
with patch.dict(win_timezone.__utils__, {"reg.read_value": mock_read}):
self.assertEqual(win_timezone.get_zone(), "Asia/Calcutta")
self.assertEqual(win_timezone.get_zone(), "Unknown")
with patch.dict(win_timezone.__salt__, {"cmd.run_all": mock_read_fatal}):
self.assertRaises(CommandExecutionError, win_timezone.get_zone)
# 'get_offset' function tests: 1
@ -61,9 +70,16 @@ class WinTimezoneTestCase(TestCase, LoaderModuleMockMixin):
"""
Test if it get current numeric timezone offset from UCT (i.e. +0530)
"""
mock_read = MagicMock(return_value={"vdata": "India Standard Time"})
mock_read = MagicMock(
return_value={
"pid": 78,
"retcode": 0,
"stderr": "",
"stdout": "India Standard Time",
}
)
with patch.dict(win_timezone.__utils__, {"reg.read_value": mock_read}):
with patch.dict(win_timezone.__salt__, {"cmd.run_all": mock_read}):
self.assertEqual(win_timezone.get_offset(), "+0530")
# 'get_zonecode' function tests: 1
@ -72,9 +88,16 @@ class WinTimezoneTestCase(TestCase, LoaderModuleMockMixin):
"""
Test if it get current timezone (i.e. PST, MDT, etc)
"""
mock_read = MagicMock(return_value={"vdata": "India Standard Time"})
mock_read = MagicMock(
return_value={
"pid": 78,
"retcode": 0,
"stderr": "",
"stdout": "India Standard Time",
}
)
with patch.dict(win_timezone.__utils__, {"reg.read_value": mock_read}):
with patch.dict(win_timezone.__salt__, {"cmd.run_all": mock_read}):
self.assertEqual(win_timezone.get_zonecode(), "IST")
# 'set_zone' function tests: 1
@ -83,13 +106,20 @@ class WinTimezoneTestCase(TestCase, LoaderModuleMockMixin):
"""
Test if it unlinks, then symlinks /etc/localtime to the set timezone.
"""
mock_cmd = MagicMock(
mock_write = MagicMock(
return_value={"pid": 78, "retcode": 0, "stderr": "", "stdout": ""}
)
mock_read = MagicMock(return_value={"vdata": "India Standard Time"})
mock_read = MagicMock(
return_value={
"pid": 78,
"retcode": 0,
"stderr": "",
"stdout": "India Standard Time",
}
)
with patch.dict(win_timezone.__salt__, {"cmd.run_all": mock_cmd}), patch.dict(
win_timezone.__utils__, {"reg.read_value": mock_read}
with patch.dict(win_timezone.__salt__, {"cmd.run_all": mock_write}), patch.dict(
win_timezone.__salt__, {"cmd.run_all": mock_read}
):
self.assertTrue(win_timezone.set_zone("Asia/Calcutta"))
@ -102,9 +132,16 @@ class WinTimezoneTestCase(TestCase, LoaderModuleMockMixin):
the one set in /etc/localtime. Returns True if they match,
and False if not. Mostly useful for running state checks.
"""
mock_read = MagicMock(return_value={"vdata": "India Standard Time"})
mock_read = MagicMock(
return_value={
"pid": 78,
"retcode": 0,
"stderr": "",
"stdout": "India Standard Time",
}
)
with patch.dict(win_timezone.__utils__, {"reg.read_value": mock_read}):
with patch.dict(win_timezone.__salt__, {"cmd.run_all": mock_read}):
self.assertTrue(win_timezone.zone_compare("Asia/Calcutta"))
# 'get_hwclock' function tests: 1

View file

@ -2,7 +2,6 @@
"""
Tests for salt.utils.jinja
"""
# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
import ast
@ -13,7 +12,6 @@ import pprint
import re
import tempfile
# Import Salt libs
import salt.config
import salt.loader
@ -39,12 +37,9 @@ from salt.utils.templates import JINJA, render_jinja_tmpl
from tests.support.case import ModuleCase
from tests.support.helpers import flaky
from tests.support.mock import MagicMock, Mock, patch
# Import Salt Testing libs
from tests.support.runtests import RUNTIME_VARS
from tests.support.unit import TestCase, skipIf
# Import 3rd party libs
try:
import timelib # pylint: disable=W0611
@ -127,6 +122,7 @@ class TestSaltCacheLoader(TestCase):
def tearDown(self):
salt.utils.files.rm_rf(self.tempdir)
self.tempdir = self.template_dir = self.opts
def test_searchpath(self):
"""
@ -284,6 +280,7 @@ class TestGetTemplate(TestCase):
def tearDown(self):
salt.utils.files.rm_rf(self.tempdir)
self.tempdir = self.template_dir = self.local_opts = self.local_salt = None
def test_fallback(self):
"""
@ -559,19 +556,6 @@ class TestGetTemplate(TestCase):
dict(opts=self.local_opts, saltenv="test", salt=self.local_salt),
)
@skipIf(six.PY3, "Not applicable to Python 3")
def test_render_with_unicode_syntax_error(self):
with patch.object(builtins, "__salt_system_encoding__", "utf-8"):
template = "hello\n\n{{ bad\n\nfoo한"
expected = r".*---\nhello\n\n{{ bad\n\nfoo\xed\x95\x9c <======================\n---"
self.assertRaisesRegex(
SaltRenderError,
expected,
render_jinja_tmpl,
template,
dict(opts=self.local_opts, saltenv="test", salt=self.local_salt),
)
def test_render_with_utf8_syntax_error(self):
with patch.object(builtins, "__salt_system_encoding__", "utf-8"):
template = "hello\n\n{{ bad\n\nfoo한"
@ -621,9 +605,9 @@ class TestGetTemplate(TestCase):
class TestJinjaDefaultOptions(TestCase):
def __init__(self, *args, **kws):
TestCase.__init__(self, *args, **kws)
self.local_opts = {
@classmethod
def setUpClass(cls):
cls.local_opts = {
"cachedir": os.path.join(RUNTIME_VARS.TMP, "jinja-template-cache"),
"file_buffer_size": 1048576,
"file_client": "local",
@ -642,11 +626,15 @@ class TestJinjaDefaultOptions(TestCase):
),
"jinja_env": {"line_comment_prefix": "##", "line_statement_prefix": "%"},
}
self.local_salt = {
cls.local_salt = {
"myvar": "zero",
"mylist": [0, 1, 2, 3],
}
@classmethod
def tearDownClass(cls):
cls.local_opts = cls.local_salt = None
def test_comment_prefix(self):
template = """
@ -681,9 +669,9 @@ class TestJinjaDefaultOptions(TestCase):
class TestCustomExtensions(TestCase):
def __init__(self, *args, **kws):
super(TestCustomExtensions, self).__init__(*args, **kws)
self.local_opts = {
@classmethod
def setUpClass(cls):
cls.local_opts = {
"cachedir": os.path.join(RUNTIME_VARS.TMP, "jinja-template-cache"),
"file_buffer_size": 1048576,
"file_client": "local",
@ -701,7 +689,7 @@ class TestCustomExtensions(TestCase):
os.path.dirname(os.path.abspath(__file__)), "extmods"
),
}
self.local_salt = {
cls.local_salt = {
# 'dns.A': dnsutil.A,
# 'dns.AAAA': dnsutil.AAAA,
# 'file.exists': filemod.file_exists,
@ -709,6 +697,10 @@ class TestCustomExtensions(TestCase):
# 'file.dirname': filemod.dirname
}
@classmethod
def tearDownClass(cls):
cls.local_opts = cls.local_salt = None
def test_regex_escape(self):
dataset = "foo?:.*/\\bar"
env = Environment(extensions=[SerializerExtension])
@ -721,51 +713,39 @@ class TestCustomExtensions(TestCase):
unique = set(dataset)
env = Environment(extensions=[SerializerExtension])
env.filters.update(JinjaFilter.salt_jinja_filters)
if six.PY3:
rendered = (
env.from_string("{{ dataset|unique }}")
.render(dataset=dataset)
.strip("'{}")
.split("', '")
)
self.assertEqual(sorted(rendered), sorted(list(unique)))
else:
rendered = env.from_string("{{ dataset|unique }}").render(dataset=dataset)
self.assertEqual(rendered, "{0}".format(unique))
rendered = (
env.from_string("{{ dataset|unique }}")
.render(dataset=dataset)
.strip("'{}")
.split("', '")
)
self.assertEqual(sorted(rendered), sorted(list(unique)))
def test_unique_tuple(self):
dataset = ("foo", "foo", "bar")
unique = set(dataset)
env = Environment(extensions=[SerializerExtension])
env.filters.update(JinjaFilter.salt_jinja_filters)
if six.PY3:
rendered = (
env.from_string("{{ dataset|unique }}")
.render(dataset=dataset)
.strip("'{}")
.split("', '")
)
self.assertEqual(sorted(rendered), sorted(list(unique)))
else:
rendered = env.from_string("{{ dataset|unique }}").render(dataset=dataset)
self.assertEqual(rendered, "{0}".format(unique))
rendered = (
env.from_string("{{ dataset|unique }}")
.render(dataset=dataset)
.strip("'{}")
.split("', '")
)
self.assertEqual(sorted(rendered), sorted(list(unique)))
def test_unique_list(self):
dataset = ["foo", "foo", "bar"]
unique = ["foo", "bar"]
env = Environment(extensions=[SerializerExtension])
env.filters.update(JinjaFilter.salt_jinja_filters)
if six.PY3:
rendered = (
env.from_string("{{ dataset|unique }}")
.render(dataset=dataset)
.strip("'[]")
.split("', '")
)
self.assertEqual(rendered, unique)
else:
rendered = env.from_string("{{ dataset|unique }}").render(dataset=dataset)
self.assertEqual(rendered, "{0}".format(unique))
rendered = (
env.from_string("{{ dataset|unique }}")
.render(dataset=dataset)
.strip("'[]")
.split("', '")
)
self.assertEqual(rendered, unique)
def test_serialize_json(self):
dataset = {"foo": True, "bar": 42, "baz": [1, 2, 3], "qux": 2.0}
@ -795,17 +775,7 @@ class TestCustomExtensions(TestCase):
dataset = "str value"
env = Environment(extensions=[SerializerExtension])
rendered = env.from_string("{{ dataset|yaml }}").render(dataset=dataset)
if six.PY3:
self.assertEqual("str value", rendered)
else:
# Due to a bug in the equality handler, this check needs to be split
# up into several different assertions. We need to check that the various
# string segments are present in the rendered value, as well as the
# type of the rendered variable (should be unicode, which is the same as
# six.text_type). This should cover all use cases but also allow the test
# to pass on CentOS 6 running Python 2.7.
self.assertIn("str value", rendered)
self.assertIsInstance(rendered, six.text_type)
self.assertEqual("str value", rendered)
def test_serialize_python(self):
dataset = {"foo": True, "bar": 42, "baz": [1, 2, 3], "qux": 2.0}
@ -976,20 +946,14 @@ class TestCustomExtensions(TestCase):
rendered = env.from_string("{{ data }}").render(data=data)
self.assertEqual(
rendered,
"{u'foo': {u'bar': u'baz', u'qux': 42}}"
if six.PY2
else "{'foo': {'bar': 'baz', 'qux': 42}}",
rendered, "{'foo': {'bar': 'baz', 'qux': 42}}",
)
rendered = env.from_string("{{ data }}").render(
data=[OrderedDict(foo="bar",), OrderedDict(baz=42,)]
)
self.assertEqual(
rendered,
"[{'foo': u'bar'}, {'baz': 42}]"
if six.PY2
else "[{'foo': 'bar'}, {'baz': 42}]",
rendered, "[{'foo': 'bar'}, {'baz': 42}]",
)
def test_set_dict_key_value(self):
@ -1031,10 +995,7 @@ class TestCustomExtensions(TestCase):
),
)
self.assertEqual(
rendered,
"{u'bar': {u'baz': {u'qux': 1, u'quux': 3}}}"
if six.PY2
else "{'bar': {'baz': {'qux': 1, 'quux': 3}}}",
rendered, "{'bar': {'baz': {'qux': 1, 'quux': 3}}}",
)
# Test incorrect usage
@ -1076,10 +1037,7 @@ class TestCustomExtensions(TestCase):
),
)
self.assertEqual(
rendered,
"{u'bar': {u'baz': [1, 2, 42]}}"
if six.PY2
else "{'bar': {'baz': [1, 2, 42]}}",
rendered, "{'bar': {'baz': [1, 2, 42]}}",
)
def test_extend_dict_key_value(self):
@ -1102,10 +1060,7 @@ class TestCustomExtensions(TestCase):
),
)
self.assertEqual(
rendered,
"{u'bar': {u'baz': [1, 2, 42, 43]}}"
if six.PY2
else "{'bar': {'baz': [1, 2, 42, 43]}}",
rendered, "{'bar': {'baz': [1, 2, 42, 43]}}",
)
# Edge cases
rendered = render_jinja_tmpl(
@ -1576,6 +1531,45 @@ class TestCustomExtensions(TestCase):
)
self.assertEqual(rendered, "1, 4")
def test_method_call(self):
"""
Test the `method_call` Jinja filter.
"""
rendered = render_jinja_tmpl(
"{{ 6|method_call('bit_length') }}",
dict(opts=self.local_opts, saltenv="test", salt=self.local_salt),
)
self.assertEqual(rendered, "3")
rendered = render_jinja_tmpl(
"{{ 6.7|method_call('is_integer') }}",
dict(opts=self.local_opts, saltenv="test", salt=self.local_salt),
)
self.assertEqual(rendered, "False")
rendered = render_jinja_tmpl(
"{{ 'absaltba'|method_call('strip', 'ab') }}",
dict(opts=self.local_opts, saltenv="test", salt=self.local_salt),
)
self.assertEqual(rendered, "salt")
rendered = render_jinja_tmpl(
"{{ [1, 2, 1, 3, 4]|method_call('index', 1, 1, 3) }}",
dict(opts=self.local_opts, saltenv="test", salt=self.local_salt),
)
self.assertEqual(rendered, "2")
# have to use `dictsort` to keep test result deterministic
rendered = render_jinja_tmpl(
"{{ {}|method_call('fromkeys', ['a', 'b', 'c'], 0)|dictsort }}",
dict(opts=self.local_opts, saltenv="test", salt=self.local_salt),
)
self.assertEqual(rendered, "[('a', 0), ('b', 0), ('c', 0)]")
# missing object method test
rendered = render_jinja_tmpl(
"{{ 6|method_call('bit_width') }}",
dict(opts=self.local_opts, saltenv="test", salt=self.local_salt),
)
self.assertEqual(rendered, "None")
def test_md5(self):
"""
Test the `md5` Jinja filter.