mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
1007 lines
38 KiB
Python
1007 lines
38 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
:codeauthor: Nicole Thomas <nicole@saltstack.com>
|
|
"""
|
|
from __future__ import absolute_import, print_function, unicode_literals
|
|
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
|
|
import salt.exceptions
|
|
import salt.state
|
|
import salt.utils.files
|
|
import salt.utils.platform
|
|
from salt.exceptions import CommandExecutionError
|
|
from salt.utils.decorators import state as statedecorators
|
|
from salt.utils.odict import OrderedDict
|
|
from tests.support.helpers import slowTest, with_tempfile
|
|
from tests.support.mixins import AdaptedConfigurationTestCaseMixin
|
|
from tests.support.mock import MagicMock, patch
|
|
from tests.support.runtests import RUNTIME_VARS
|
|
from tests.support.unit import TestCase, skipIf
|
|
|
|
try:
|
|
import pytest
|
|
except ImportError as err:
|
|
pytest = None
|
|
|
|
|
|
class StateCompilerTestCase(TestCase, AdaptedConfigurationTestCaseMixin):
|
|
"""
|
|
TestCase for the state compiler.
|
|
"""
|
|
|
|
def test_format_log_non_ascii_character(self):
|
|
"""
|
|
Tests running a non-ascii character through the state.format_log
|
|
function. See Issue #33605.
|
|
"""
|
|
# There is no return to test against as the format_log
|
|
# function doesn't return anything. However, we do want
|
|
# to make sure that the function doesn't stacktrace when
|
|
# called.
|
|
ret = {
|
|
"changes": {"Français": {"old": "something old", "new": "something new"}},
|
|
"result": True,
|
|
}
|
|
salt.state.format_log(ret)
|
|
|
|
@slowTest
|
|
def test_render_error_on_invalid_requisite(self):
|
|
"""
|
|
Test that the state compiler correctly deliver a rendering
|
|
exception when a requisite cannot be resolved
|
|
"""
|
|
with patch("salt.state.State._gather_pillar") as state_patch:
|
|
high_data = {
|
|
"git": OrderedDict(
|
|
[
|
|
(
|
|
"pkg",
|
|
[
|
|
OrderedDict(
|
|
[
|
|
(
|
|
"require",
|
|
[
|
|
OrderedDict(
|
|
[
|
|
(
|
|
"file",
|
|
OrderedDict(
|
|
[("test1", "test")]
|
|
),
|
|
)
|
|
]
|
|
)
|
|
],
|
|
)
|
|
]
|
|
),
|
|
"installed",
|
|
{"order": 10000},
|
|
],
|
|
),
|
|
("__sls__", "issue_35226"),
|
|
("__env__", "base"),
|
|
]
|
|
)
|
|
}
|
|
minion_opts = self.get_temp_config("minion")
|
|
minion_opts["pillar"] = {"git": OrderedDict([("test1", "test")])}
|
|
state_obj = salt.state.State(minion_opts)
|
|
with self.assertRaises(salt.exceptions.SaltRenderError):
|
|
state_obj.call_high(high_data)
|
|
|
|
def test_verify_onlyif_parse(self):
|
|
low_data = {
|
|
"onlyif": [{"fun": "test.arg", "args": ["arg1", "arg2"]}],
|
|
"name": "mysql-server-5.7",
|
|
"state": "debconf",
|
|
"__id__": "set root password",
|
|
"fun": "set",
|
|
"__env__": "base",
|
|
"__sls__": "debconf",
|
|
"data": {
|
|
"mysql-server/root_password": {"type": "password", "value": "temp123"}
|
|
},
|
|
"order": 10000,
|
|
}
|
|
expected_result = {"comment": "onlyif condition is true", "result": False}
|
|
|
|
with patch("salt.state.State._gather_pillar") as state_patch:
|
|
minion_opts = self.get_temp_config("minion")
|
|
state_obj = salt.state.State(minion_opts)
|
|
return_result = state_obj._run_check_onlyif(low_data, "")
|
|
self.assertEqual(expected_result, return_result)
|
|
|
|
def test_verify_onlyif_cmd_error(self):
|
|
"""
|
|
Simulates a failure in cmd.retcode from onlyif
|
|
This could occur is runas is specified with a user that does not exist
|
|
"""
|
|
low_data = {
|
|
"onlyif": "somecommand",
|
|
"runas" "doesntexist" "name": "echo something",
|
|
"state": "cmd",
|
|
"__id__": "this is just a test",
|
|
"fun": "run",
|
|
"__env__": "base",
|
|
"__sls__": "sometest",
|
|
"order": 10000,
|
|
}
|
|
expected_result = {
|
|
"comment": "onlyif condition is false",
|
|
"result": True,
|
|
"skip_watch": True,
|
|
}
|
|
|
|
with patch("salt.state.State._gather_pillar") as state_patch:
|
|
minion_opts = self.get_temp_config("minion")
|
|
state_obj = salt.state.State(minion_opts)
|
|
mock = MagicMock(side_effect=CommandExecutionError("Boom!"))
|
|
with patch.dict(state_obj.functions, {"cmd.retcode": mock}):
|
|
# The mock handles the exception, but the runas dict is being passed as it would actually be
|
|
return_result = state_obj._run_check_onlyif(
|
|
low_data, {"runas": "doesntexist"}
|
|
)
|
|
self.assertEqual(expected_result, return_result)
|
|
|
|
def test_verify_unless_cmd_error(self):
|
|
"""
|
|
Simulates a failure in cmd.retcode from unless
|
|
This could occur is runas is specified with a user that does not exist
|
|
"""
|
|
low_data = {
|
|
"unless": "somecommand",
|
|
"runas" "doesntexist" "name": "echo something",
|
|
"state": "cmd",
|
|
"__id__": "this is just a test",
|
|
"fun": "run",
|
|
"__env__": "base",
|
|
"__sls__": "sometest",
|
|
"order": 10000,
|
|
}
|
|
expected_result = {
|
|
"comment": "unless condition is true",
|
|
"result": True,
|
|
"skip_watch": True,
|
|
}
|
|
|
|
with patch("salt.state.State._gather_pillar") as state_patch:
|
|
minion_opts = self.get_temp_config("minion")
|
|
state_obj = salt.state.State(minion_opts)
|
|
mock = MagicMock(side_effect=CommandExecutionError("Boom!"))
|
|
with patch.dict(state_obj.functions, {"cmd.retcode": mock}):
|
|
# The mock handles the exception, but the runas dict is being passed as it would actually be
|
|
return_result = state_obj._run_check_unless(
|
|
low_data, {"runas": "doesntexist"}
|
|
)
|
|
self.assertEqual(expected_result, return_result)
|
|
|
|
def test_verify_unless_parse(self):
|
|
low_data = {
|
|
"unless": [{"fun": "test.arg", "args": ["arg1", "arg2"]}],
|
|
"name": "mysql-server-5.7",
|
|
"state": "debconf",
|
|
"__id__": "set root password",
|
|
"fun": "set",
|
|
"__env__": "base",
|
|
"__sls__": "debconf",
|
|
"data": {
|
|
"mysql-server/root_password": {"type": "password", "value": "temp123"}
|
|
},
|
|
"order": 10000,
|
|
}
|
|
expected_result = {
|
|
"comment": "unless condition is true",
|
|
"result": True,
|
|
"skip_watch": True,
|
|
}
|
|
|
|
with patch("salt.state.State._gather_pillar") as state_patch:
|
|
minion_opts = self.get_temp_config("minion")
|
|
state_obj = salt.state.State(minion_opts)
|
|
return_result = state_obj._run_check_unless(low_data, "")
|
|
self.assertEqual(expected_result, return_result)
|
|
|
|
def test_verify_creates(self):
|
|
low_data = {
|
|
"state": "cmd",
|
|
"name": 'echo "something"',
|
|
"__sls__": "tests.creates",
|
|
"__env__": "base",
|
|
"__id__": "do_a_thing",
|
|
"creates": "/tmp/thing",
|
|
"order": 10000,
|
|
"fun": "run",
|
|
}
|
|
|
|
with patch("salt.state.State._gather_pillar") as state_patch:
|
|
minion_opts = self.get_temp_config("minion")
|
|
state_obj = salt.state.State(minion_opts)
|
|
with patch("os.path.exists") as path_mock:
|
|
path_mock.return_value = True
|
|
expected_result = {
|
|
"comment": "/tmp/thing exists",
|
|
"result": True,
|
|
"skip_watch": True,
|
|
}
|
|
return_result = state_obj._run_check_creates(low_data)
|
|
self.assertEqual(expected_result, return_result)
|
|
|
|
path_mock.return_value = False
|
|
expected_result = {
|
|
"comment": "Creates files not found",
|
|
"result": False,
|
|
}
|
|
return_result = state_obj._run_check_creates(low_data)
|
|
self.assertEqual(expected_result, return_result)
|
|
|
|
def test_verify_creates_list(self):
|
|
low_data = {
|
|
"state": "cmd",
|
|
"name": 'echo "something"',
|
|
"__sls__": "tests.creates",
|
|
"__env__": "base",
|
|
"__id__": "do_a_thing",
|
|
"creates": ["/tmp/thing", "/tmp/thing2"],
|
|
"order": 10000,
|
|
"fun": "run",
|
|
}
|
|
|
|
with patch("salt.state.State._gather_pillar") as state_patch:
|
|
minion_opts = self.get_temp_config("minion")
|
|
state_obj = salt.state.State(minion_opts)
|
|
with patch("os.path.exists") as path_mock:
|
|
path_mock.return_value = True
|
|
expected_result = {
|
|
"comment": "All files in creates exist",
|
|
"result": True,
|
|
"skip_watch": True,
|
|
}
|
|
return_result = state_obj._run_check_creates(low_data)
|
|
self.assertEqual(expected_result, return_result)
|
|
|
|
path_mock.return_value = False
|
|
expected_result = {
|
|
"comment": "Creates files not found",
|
|
"result": False,
|
|
}
|
|
return_result = state_obj._run_check_creates(low_data)
|
|
self.assertEqual(expected_result, return_result)
|
|
|
|
def _expand_win_path(self, path):
|
|
"""
|
|
Expand C:/users/admini~1/appdata/local/temp/salt-tests-tmpdir/...
|
|
into C:/users/adminitrator/appdata/local/temp/salt-tests-tmpdir/...
|
|
to prevent file.search from expanding the "~" with os.path.expanduser
|
|
"""
|
|
if salt.utils.platform.is_windows():
|
|
import win32file
|
|
|
|
return win32file.GetLongPathName(path).replace("\\", "/")
|
|
else:
|
|
return path
|
|
|
|
@with_tempfile()
|
|
def test_verify_onlyif_parse_slots(self, name):
|
|
with salt.utils.files.fopen(name, "w") as fp:
|
|
fp.write("file-contents")
|
|
low_data = {
|
|
"onlyif": [
|
|
{
|
|
"fun": "file.search",
|
|
"args": [
|
|
"__slot__:salt:test.echo({})".format(
|
|
self._expand_win_path(name)
|
|
),
|
|
],
|
|
"pattern": "__slot__:salt:test.echo(file-contents)",
|
|
}
|
|
],
|
|
"name": "mysql-server-5.7",
|
|
"state": "debconf",
|
|
"__id__": "set root password",
|
|
"fun": "set",
|
|
"__env__": "base",
|
|
"__sls__": "debconf",
|
|
"data": {
|
|
"mysql-server/root_password": {"type": "password", "value": "temp123"}
|
|
},
|
|
"order": 10000,
|
|
}
|
|
expected_result = {"comment": "onlyif condition is true", "result": False}
|
|
with patch("salt.state.State._gather_pillar") as state_patch:
|
|
minion_opts = self.get_temp_config("minion")
|
|
state_obj = salt.state.State(minion_opts)
|
|
return_result = state_obj._run_check_onlyif(low_data, "")
|
|
self.assertEqual(expected_result, return_result)
|
|
|
|
def test_verify_onlyif_list_cmd(self):
|
|
low_data = {
|
|
"state": "cmd",
|
|
"name": 'echo "something"',
|
|
"__sls__": "tests.cmd",
|
|
"__env__": "base",
|
|
"__id__": "check onlyif",
|
|
"onlyif": ["/bin/true", "/bin/false"],
|
|
"order": 10001,
|
|
"fun": "run",
|
|
}
|
|
expected_result = {
|
|
"comment": "onlyif condition is false",
|
|
"result": True,
|
|
"skip_watch": True,
|
|
}
|
|
with patch("salt.state.State._gather_pillar") as state_patch:
|
|
minion_opts = self.get_temp_config("minion")
|
|
state_obj = salt.state.State(minion_opts)
|
|
return_result = state_obj._run_check_onlyif(low_data, {})
|
|
self.assertEqual(expected_result, return_result)
|
|
|
|
@with_tempfile()
|
|
def test_verify_unless_parse_slots(self, name):
|
|
with salt.utils.files.fopen(name, "w") as fp:
|
|
fp.write("file-contents")
|
|
low_data = {
|
|
"unless": [
|
|
{
|
|
"fun": "file.search",
|
|
"args": [
|
|
"__slot__:salt:test.echo({})".format(
|
|
self._expand_win_path(name)
|
|
),
|
|
],
|
|
"pattern": "__slot__:salt:test.echo(file-contents)",
|
|
}
|
|
],
|
|
"name": "mysql-server-5.7",
|
|
"state": "debconf",
|
|
"__id__": "set root password",
|
|
"fun": "set",
|
|
"__env__": "base",
|
|
"__sls__": "debconf",
|
|
"data": {
|
|
"mysql-server/root_password": {"type": "password", "value": "temp123"}
|
|
},
|
|
"order": 10000,
|
|
}
|
|
expected_result = {
|
|
"comment": "unless condition is true",
|
|
"result": True,
|
|
"skip_watch": True,
|
|
}
|
|
|
|
with patch("salt.state.State._gather_pillar") as state_patch:
|
|
minion_opts = self.get_temp_config("minion")
|
|
state_obj = salt.state.State(minion_opts)
|
|
return_result = state_obj._run_check_unless(low_data, "")
|
|
self.assertEqual(expected_result, return_result)
|
|
|
|
def test_verify_retry_parsing(self):
|
|
low_data = {
|
|
"state": "file",
|
|
"name": "/tmp/saltstack.README.rst",
|
|
"__sls__": "demo.download",
|
|
"__env__": "base",
|
|
"__id__": "download sample data",
|
|
"retry": {"attempts": 5, "interval": 5},
|
|
"unless": ["test -f /tmp/saltstack.README.rst"],
|
|
"source": [
|
|
"https://raw.githubusercontent.com/saltstack/salt/develop/README.rst"
|
|
],
|
|
"source_hash": "f2bc8c0aa2ae4f5bb5c2051686016b48",
|
|
"order": 10000,
|
|
"fun": "managed",
|
|
}
|
|
expected_result = {
|
|
"__id__": "download sample data",
|
|
"__run_num__": 0,
|
|
"__sls__": "demo.download",
|
|
"changes": {},
|
|
"comment": "['unless condition is true'] The state would be retried every 5 "
|
|
"seconds (with a splay of up to 0 seconds) a maximum of 5 times or "
|
|
"until a result of True is returned",
|
|
"name": "/tmp/saltstack.README.rst",
|
|
"result": True,
|
|
"skip_watch": True,
|
|
}
|
|
|
|
with patch("salt.state.State._gather_pillar") as state_patch:
|
|
minion_opts = self.get_temp_config("minion")
|
|
minion_opts["test"] = True
|
|
minion_opts["file_client"] = "local"
|
|
state_obj = salt.state.State(minion_opts)
|
|
mock = {
|
|
"result": True,
|
|
"comment": ["unless condition is true"],
|
|
"skip_watch": True,
|
|
}
|
|
with patch.object(state_obj, "_run_check", return_value=mock):
|
|
self.assertDictContainsSubset(expected_result, state_obj.call(low_data))
|
|
|
|
def test_render_requisite_require_disabled(self):
|
|
"""
|
|
Test that the state compiler correctly deliver a rendering
|
|
exception when a requisite cannot be resolved
|
|
"""
|
|
with patch("salt.state.State._gather_pillar") as state_patch:
|
|
high_data = {
|
|
"step_one": OrderedDict(
|
|
[
|
|
(
|
|
"test",
|
|
[
|
|
OrderedDict(
|
|
[("require", [OrderedDict([("test", "step_two")])])]
|
|
),
|
|
"succeed_with_changes",
|
|
{"order": 10000},
|
|
],
|
|
),
|
|
("__sls__", "test.disable_require"),
|
|
("__env__", "base"),
|
|
]
|
|
),
|
|
"step_two": {
|
|
"test": ["succeed_with_changes", {"order": 10001}],
|
|
"__env__": "base",
|
|
"__sls__": "test.disable_require",
|
|
},
|
|
}
|
|
|
|
minion_opts = self.get_temp_config("minion")
|
|
minion_opts["disabled_requisites"] = ["require"]
|
|
state_obj = salt.state.State(minion_opts)
|
|
ret = state_obj.call_high(high_data)
|
|
run_num = ret["test_|-step_one_|-step_one_|-succeed_with_changes"][
|
|
"__run_num__"
|
|
]
|
|
self.assertEqual(run_num, 0)
|
|
|
|
def test_render_requisite_require_in_disabled(self):
|
|
"""
|
|
Test that the state compiler correctly deliver a rendering
|
|
exception when a requisite cannot be resolved
|
|
"""
|
|
with patch("salt.state.State._gather_pillar") as state_patch:
|
|
high_data = {
|
|
"step_one": {
|
|
"test": ["succeed_with_changes", {"order": 10000}],
|
|
"__env__": "base",
|
|
"__sls__": "test.disable_require_in",
|
|
},
|
|
"step_two": OrderedDict(
|
|
[
|
|
(
|
|
"test",
|
|
[
|
|
OrderedDict(
|
|
[
|
|
(
|
|
"require_in",
|
|
[OrderedDict([("test", "step_one")])],
|
|
)
|
|
]
|
|
),
|
|
"succeed_with_changes",
|
|
{"order": 10001},
|
|
],
|
|
),
|
|
("__sls__", "test.disable_require_in"),
|
|
("__env__", "base"),
|
|
]
|
|
),
|
|
}
|
|
|
|
minion_opts = self.get_temp_config("minion")
|
|
minion_opts["disabled_requisites"] = ["require_in"]
|
|
state_obj = salt.state.State(minion_opts)
|
|
ret = state_obj.call_high(high_data)
|
|
run_num = ret["test_|-step_one_|-step_one_|-succeed_with_changes"][
|
|
"__run_num__"
|
|
]
|
|
self.assertEqual(run_num, 0)
|
|
|
|
|
|
class HighStateTestCase(TestCase, AdaptedConfigurationTestCaseMixin):
|
|
def setUp(self):
|
|
root_dir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
|
|
self.state_tree_dir = os.path.join(root_dir, "state_tree")
|
|
cache_dir = os.path.join(root_dir, "cachedir")
|
|
for dpath in (root_dir, self.state_tree_dir, cache_dir):
|
|
if not os.path.isdir(dpath):
|
|
os.makedirs(dpath)
|
|
|
|
overrides = {}
|
|
overrides["root_dir"] = root_dir
|
|
overrides["state_events"] = False
|
|
overrides["id"] = "match"
|
|
overrides["file_client"] = "local"
|
|
overrides["file_roots"] = dict(base=[self.state_tree_dir])
|
|
overrides["cachedir"] = cache_dir
|
|
overrides["test"] = False
|
|
self.config = self.get_temp_config("minion", **overrides)
|
|
self.addCleanup(delattr, self, "config")
|
|
self.highstate = salt.state.HighState(self.config)
|
|
self.addCleanup(delattr, self, "highstate")
|
|
self.highstate.push_active()
|
|
|
|
def tearDown(self):
|
|
self.highstate.pop_active()
|
|
|
|
def test_top_matches_with_list(self):
|
|
top = {"env": {"match": ["state1", "state2"], "nomatch": ["state3"]}}
|
|
matches = self.highstate.top_matches(top)
|
|
self.assertEqual(matches, {"env": ["state1", "state2"]})
|
|
|
|
def test_top_matches_with_string(self):
|
|
top = {"env": {"match": "state1", "nomatch": "state2"}}
|
|
matches = self.highstate.top_matches(top)
|
|
self.assertEqual(matches, {"env": ["state1"]})
|
|
|
|
def test_matches_whitelist(self):
|
|
matches = {"env": ["state1", "state2", "state3"]}
|
|
matches = self.highstate.matches_whitelist(matches, ["state2"])
|
|
self.assertEqual(matches, {"env": ["state2"]})
|
|
|
|
def test_matches_whitelist_with_string(self):
|
|
matches = {"env": ["state1", "state2", "state3"]}
|
|
matches = self.highstate.matches_whitelist(matches, "state2,state3")
|
|
self.assertEqual(matches, {"env": ["state2", "state3"]})
|
|
|
|
def test_show_state_usage(self):
|
|
# monkey patch sub methods
|
|
self.highstate.avail = {"base": ["state.a", "state.b", "state.c"]}
|
|
|
|
def verify_tops(*args, **kwargs):
|
|
return []
|
|
|
|
def get_top(*args, **kwargs):
|
|
return None
|
|
|
|
def top_matches(*args, **kwargs):
|
|
return {"base": ["state.a", "state.b"]}
|
|
|
|
self.highstate.verify_tops = verify_tops
|
|
self.highstate.get_top = get_top
|
|
self.highstate.top_matches = top_matches
|
|
|
|
# get compile_state_usage() result
|
|
state_usage_dict = self.highstate.compile_state_usage()
|
|
|
|
self.assertEqual(state_usage_dict["base"]["count_unused"], 1)
|
|
self.assertEqual(state_usage_dict["base"]["count_used"], 2)
|
|
self.assertEqual(state_usage_dict["base"]["count_all"], 3)
|
|
self.assertEqual(state_usage_dict["base"]["used"], ["state.a", "state.b"])
|
|
self.assertEqual(state_usage_dict["base"]["unused"], ["state.c"])
|
|
|
|
def test_find_sls_ids_with_exclude(self):
|
|
"""
|
|
See https://github.com/saltstack/salt/issues/47182
|
|
"""
|
|
sls_dir = "issue-47182"
|
|
shutil.copytree(
|
|
os.path.join(RUNTIME_VARS.BASE_FILES, sls_dir),
|
|
os.path.join(self.state_tree_dir, sls_dir),
|
|
)
|
|
shutil.move(
|
|
os.path.join(self.state_tree_dir, sls_dir, "top.sls"), self.state_tree_dir
|
|
)
|
|
# Manually compile the high data. We don't have to worry about all of
|
|
# the normal error checking we do here since we know that all the SLS
|
|
# files exist and there is no whitelist/blacklist being used.
|
|
top = self.highstate.get_top() # pylint: disable=assignment-from-none
|
|
matches = self.highstate.top_matches(top)
|
|
high, _ = self.highstate.render_highstate(matches)
|
|
ret = salt.state.find_sls_ids("issue-47182.stateA.newer", high)
|
|
self.assertEqual(ret, [("somestuff", "cmd")])
|
|
|
|
|
|
class MultiEnvHighStateTestCase(TestCase, AdaptedConfigurationTestCaseMixin):
|
|
def setUp(self):
|
|
root_dir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
|
|
self.base_state_tree_dir = os.path.join(root_dir, "base")
|
|
self.other_state_tree_dir = os.path.join(root_dir, "other")
|
|
cache_dir = os.path.join(root_dir, "cachedir")
|
|
for dpath in (
|
|
root_dir,
|
|
self.base_state_tree_dir,
|
|
self.other_state_tree_dir,
|
|
cache_dir,
|
|
):
|
|
if not os.path.isdir(dpath):
|
|
os.makedirs(dpath)
|
|
shutil.copy(
|
|
os.path.join(RUNTIME_VARS.BASE_FILES, "top.sls"), self.base_state_tree_dir
|
|
)
|
|
shutil.copy(
|
|
os.path.join(RUNTIME_VARS.BASE_FILES, "core.sls"), self.base_state_tree_dir
|
|
)
|
|
shutil.copy(
|
|
os.path.join(RUNTIME_VARS.BASE_FILES, "test.sls"), self.other_state_tree_dir
|
|
)
|
|
overrides = {}
|
|
overrides["root_dir"] = root_dir
|
|
overrides["state_events"] = False
|
|
overrides["id"] = "match"
|
|
overrides["file_client"] = "local"
|
|
overrides["file_roots"] = dict(
|
|
base=[self.base_state_tree_dir], other=[self.other_state_tree_dir]
|
|
)
|
|
overrides["cachedir"] = cache_dir
|
|
overrides["test"] = False
|
|
self.config = self.get_temp_config("minion", **overrides)
|
|
self.addCleanup(delattr, self, "config")
|
|
self.highstate = salt.state.HighState(self.config)
|
|
self.addCleanup(delattr, self, "highstate")
|
|
self.highstate.push_active()
|
|
|
|
def tearDown(self):
|
|
self.highstate.pop_active()
|
|
|
|
def test_lazy_avail_states_base(self):
|
|
# list_states not called yet
|
|
self.assertEqual(self.highstate.avail._filled, False)
|
|
self.assertEqual(self.highstate.avail._avail, {"base": None})
|
|
# After getting 'base' env available states
|
|
self.highstate.avail["base"] # pylint: disable=pointless-statement
|
|
self.assertEqual(self.highstate.avail._filled, False)
|
|
self.assertEqual(self.highstate.avail._avail, {"base": ["core", "top"]})
|
|
|
|
def test_lazy_avail_states_other(self):
|
|
# list_states not called yet
|
|
self.assertEqual(self.highstate.avail._filled, False)
|
|
self.assertEqual(self.highstate.avail._avail, {"base": None})
|
|
# After getting 'other' env available states
|
|
self.highstate.avail["other"] # pylint: disable=pointless-statement
|
|
self.assertEqual(self.highstate.avail._filled, True)
|
|
self.assertEqual(self.highstate.avail._avail, {"base": None, "other": ["test"]})
|
|
|
|
def test_lazy_avail_states_multi(self):
|
|
# list_states not called yet
|
|
self.assertEqual(self.highstate.avail._filled, False)
|
|
self.assertEqual(self.highstate.avail._avail, {"base": None})
|
|
# After getting 'base' env available states
|
|
self.highstate.avail["base"] # pylint: disable=pointless-statement
|
|
self.assertEqual(self.highstate.avail._filled, False)
|
|
self.assertEqual(self.highstate.avail._avail, {"base": ["core", "top"]})
|
|
# After getting 'other' env available states
|
|
self.highstate.avail["other"] # pylint: disable=pointless-statement
|
|
self.assertEqual(self.highstate.avail._filled, True)
|
|
self.assertEqual(
|
|
self.highstate.avail._avail, {"base": ["core", "top"], "other": ["test"]}
|
|
)
|
|
|
|
|
|
@skipIf(pytest is None, "PyTest is missing")
|
|
class StateReturnsTestCase(TestCase):
|
|
"""
|
|
TestCase for code handling state returns.
|
|
"""
|
|
|
|
def test_state_output_check_changes_is_dict(self):
|
|
"""
|
|
Test that changes key contains a dictionary.
|
|
:return:
|
|
"""
|
|
data = {"changes": []}
|
|
out = statedecorators.OutputUnifier("content_check")(lambda: data)()
|
|
assert "'Changes' should be a dictionary" in out["comment"]
|
|
assert not out["result"]
|
|
|
|
def test_state_output_check_return_is_dict(self):
|
|
"""
|
|
Test for the entire return is a dictionary
|
|
:return:
|
|
"""
|
|
data = ["whatever"]
|
|
out = statedecorators.OutputUnifier("content_check")(lambda: data)()
|
|
assert (
|
|
"Malformed state return. Data must be a dictionary type" in out["comment"]
|
|
)
|
|
assert not out["result"]
|
|
|
|
def test_state_output_check_return_has_nrc(self):
|
|
"""
|
|
Test for name/result/comment keys are inside the return.
|
|
:return:
|
|
"""
|
|
data = {"arbitrary": "data", "changes": {}}
|
|
out = statedecorators.OutputUnifier("content_check")(lambda: data)()
|
|
assert (
|
|
" The following keys were not present in the state return: name, result, comment"
|
|
in out["comment"]
|
|
)
|
|
assert not out["result"]
|
|
|
|
def test_state_output_unifier_comment_is_not_list(self):
|
|
"""
|
|
Test for output is unified so the comment is converted to a multi-line string
|
|
:return:
|
|
"""
|
|
data = {
|
|
"comment": ["data", "in", "the", "list"],
|
|
"changes": {},
|
|
"name": None,
|
|
"result": "fantastic!",
|
|
}
|
|
expected = {
|
|
"comment": "data\nin\nthe\nlist",
|
|
"changes": {},
|
|
"name": None,
|
|
"result": True,
|
|
}
|
|
assert statedecorators.OutputUnifier("unify")(lambda: data)() == expected
|
|
|
|
data = {
|
|
"comment": ["data", "in", "the", "list"],
|
|
"changes": {},
|
|
"name": None,
|
|
"result": None,
|
|
}
|
|
expected = "data\nin\nthe\nlist"
|
|
assert (
|
|
statedecorators.OutputUnifier("unify")(lambda: data)()["comment"]
|
|
== expected
|
|
)
|
|
|
|
def test_state_output_unifier_result_converted_to_true(self):
|
|
"""
|
|
Test for output is unified so the result is converted to True
|
|
:return:
|
|
"""
|
|
data = {
|
|
"comment": ["data", "in", "the", "list"],
|
|
"changes": {},
|
|
"name": None,
|
|
"result": "Fantastic",
|
|
}
|
|
assert statedecorators.OutputUnifier("unify")(lambda: data)()["result"] is True
|
|
|
|
def test_state_output_unifier_result_converted_to_false(self):
|
|
"""
|
|
Test for output is unified so the result is converted to False
|
|
:return:
|
|
"""
|
|
data = {
|
|
"comment": ["data", "in", "the", "list"],
|
|
"changes": {},
|
|
"name": None,
|
|
"result": "",
|
|
}
|
|
assert statedecorators.OutputUnifier("unify")(lambda: data)()["result"] is False
|
|
|
|
|
|
class StateFormatSlotsTestCase(TestCase, AdaptedConfigurationTestCaseMixin):
|
|
"""
|
|
TestCase for code handling slots
|
|
"""
|
|
|
|
def setUp(self):
|
|
with patch("salt.state.State._gather_pillar"):
|
|
minion_opts = self.get_temp_config("minion")
|
|
self.state_obj = salt.state.State(minion_opts)
|
|
|
|
def test_format_slots_no_slots(self):
|
|
"""
|
|
Test the format slots keeps data without slots untouched.
|
|
"""
|
|
cdata = {"args": ["arg"], "kwargs": {"key": "val"}}
|
|
self.state_obj.format_slots(cdata)
|
|
self.assertEqual(cdata, {"args": ["arg"], "kwargs": {"key": "val"}})
|
|
|
|
@slowTest
|
|
def test_format_slots_arg(self):
|
|
"""
|
|
Test the format slots is calling a slot specified in args with corresponding arguments.
|
|
"""
|
|
cdata = {
|
|
"args": ["__slot__:salt:mod.fun(fun_arg, fun_key=fun_val)"],
|
|
"kwargs": {"key": "val"},
|
|
}
|
|
mock = MagicMock(return_value="fun_return")
|
|
with patch.dict(self.state_obj.functions, {"mod.fun": mock}):
|
|
self.state_obj.format_slots(cdata)
|
|
mock.assert_called_once_with("fun_arg", fun_key="fun_val")
|
|
self.assertEqual(cdata, {"args": ["fun_return"], "kwargs": {"key": "val"}})
|
|
|
|
@slowTest
|
|
def test_format_slots_dict_arg(self):
|
|
"""
|
|
Test the format slots is calling a slot specified in dict arg.
|
|
"""
|
|
cdata = {
|
|
"args": [{"subarg": "__slot__:salt:mod.fun(fun_arg, fun_key=fun_val)"}],
|
|
"kwargs": {"key": "val"},
|
|
}
|
|
mock = MagicMock(return_value="fun_return")
|
|
with patch.dict(self.state_obj.functions, {"mod.fun": mock}):
|
|
self.state_obj.format_slots(cdata)
|
|
mock.assert_called_once_with("fun_arg", fun_key="fun_val")
|
|
self.assertEqual(
|
|
cdata, {"args": [{"subarg": "fun_return"}], "kwargs": {"key": "val"}}
|
|
)
|
|
|
|
@slowTest
|
|
def test_format_slots_listdict_arg(self):
|
|
"""
|
|
Test the format slots is calling a slot specified in list containing a dict.
|
|
"""
|
|
cdata = {
|
|
"args": [[{"subarg": "__slot__:salt:mod.fun(fun_arg, fun_key=fun_val)"}]],
|
|
"kwargs": {"key": "val"},
|
|
}
|
|
mock = MagicMock(return_value="fun_return")
|
|
with patch.dict(self.state_obj.functions, {"mod.fun": mock}):
|
|
self.state_obj.format_slots(cdata)
|
|
mock.assert_called_once_with("fun_arg", fun_key="fun_val")
|
|
self.assertEqual(
|
|
cdata, {"args": [[{"subarg": "fun_return"}]], "kwargs": {"key": "val"}}
|
|
)
|
|
|
|
@slowTest
|
|
def test_format_slots_liststr_arg(self):
|
|
"""
|
|
Test the format slots is calling a slot specified in list containing a dict.
|
|
"""
|
|
cdata = {
|
|
"args": [["__slot__:salt:mod.fun(fun_arg, fun_key=fun_val)"]],
|
|
"kwargs": {"key": "val"},
|
|
}
|
|
mock = MagicMock(return_value="fun_return")
|
|
with patch.dict(self.state_obj.functions, {"mod.fun": mock}):
|
|
self.state_obj.format_slots(cdata)
|
|
mock.assert_called_once_with("fun_arg", fun_key="fun_val")
|
|
self.assertEqual(cdata, {"args": [["fun_return"]], "kwargs": {"key": "val"}})
|
|
|
|
@slowTest
|
|
def test_format_slots_kwarg(self):
|
|
"""
|
|
Test the format slots is calling a slot specified in kwargs with corresponding arguments.
|
|
"""
|
|
cdata = {
|
|
"args": ["arg"],
|
|
"kwargs": {"key": "__slot__:salt:mod.fun(fun_arg, fun_key=fun_val)"},
|
|
}
|
|
mock = MagicMock(return_value="fun_return")
|
|
with patch.dict(self.state_obj.functions, {"mod.fun": mock}):
|
|
self.state_obj.format_slots(cdata)
|
|
mock.assert_called_once_with("fun_arg", fun_key="fun_val")
|
|
self.assertEqual(cdata, {"args": ["arg"], "kwargs": {"key": "fun_return"}})
|
|
|
|
@slowTest
|
|
def test_format_slots_multi(self):
|
|
"""
|
|
Test the format slots is calling all slots with corresponding arguments when multiple slots
|
|
specified.
|
|
"""
|
|
cdata = {
|
|
"args": [
|
|
"__slot__:salt:test_mod.fun_a(a_arg, a_key=a_kwarg)",
|
|
"__slot__:salt:test_mod.fun_b(b_arg, b_key=b_kwarg)",
|
|
],
|
|
"kwargs": {
|
|
"kw_key_1": "__slot__:salt:test_mod.fun_c(c_arg, c_key=c_kwarg)",
|
|
"kw_key_2": "__slot__:salt:test_mod.fun_d(d_arg, d_key=d_kwarg)",
|
|
},
|
|
}
|
|
mock_a = MagicMock(return_value="fun_a_return")
|
|
mock_b = MagicMock(return_value="fun_b_return")
|
|
mock_c = MagicMock(return_value="fun_c_return")
|
|
mock_d = MagicMock(return_value="fun_d_return")
|
|
with patch.dict(
|
|
self.state_obj.functions,
|
|
{
|
|
"test_mod.fun_a": mock_a,
|
|
"test_mod.fun_b": mock_b,
|
|
"test_mod.fun_c": mock_c,
|
|
"test_mod.fun_d": mock_d,
|
|
},
|
|
):
|
|
self.state_obj.format_slots(cdata)
|
|
mock_a.assert_called_once_with("a_arg", a_key="a_kwarg")
|
|
mock_b.assert_called_once_with("b_arg", b_key="b_kwarg")
|
|
mock_c.assert_called_once_with("c_arg", c_key="c_kwarg")
|
|
mock_d.assert_called_once_with("d_arg", d_key="d_kwarg")
|
|
self.assertEqual(
|
|
cdata,
|
|
{
|
|
"args": ["fun_a_return", "fun_b_return"],
|
|
"kwargs": {"kw_key_1": "fun_c_return", "kw_key_2": "fun_d_return"},
|
|
},
|
|
)
|
|
|
|
@slowTest
|
|
def test_format_slots_malformed(self):
|
|
"""
|
|
Test the format slots keeps malformed slots untouched.
|
|
"""
|
|
sls_data = {
|
|
"args": [
|
|
"__slot__:NOT_SUPPORTED:not.called()",
|
|
"__slot__:salt:not.called(",
|
|
"__slot__:salt:",
|
|
"__slot__:salt",
|
|
"__slot__:",
|
|
"__slot__",
|
|
],
|
|
"kwargs": {
|
|
"key3": "__slot__:NOT_SUPPORTED:not.called()",
|
|
"key4": "__slot__:salt:not.called(",
|
|
"key5": "__slot__:salt:",
|
|
"key6": "__slot__:salt",
|
|
"key7": "__slot__:",
|
|
"key8": "__slot__",
|
|
},
|
|
}
|
|
cdata = sls_data.copy()
|
|
mock = MagicMock(return_value="return")
|
|
with patch.dict(self.state_obj.functions, {"not.called": mock}):
|
|
self.state_obj.format_slots(cdata)
|
|
mock.assert_not_called()
|
|
self.assertEqual(cdata, sls_data)
|
|
|
|
@slowTest
|
|
def test_slot_traverse_dict(self):
|
|
"""
|
|
Test the slot parsing of dict response.
|
|
"""
|
|
cdata = {
|
|
"args": ["arg"],
|
|
"kwargs": {"key": "__slot__:salt:mod.fun(fun_arg, fun_key=fun_val).key1"},
|
|
}
|
|
return_data = {"key1": "value1"}
|
|
mock = MagicMock(return_value=return_data)
|
|
with patch.dict(self.state_obj.functions, {"mod.fun": mock}):
|
|
self.state_obj.format_slots(cdata)
|
|
mock.assert_called_once_with("fun_arg", fun_key="fun_val")
|
|
self.assertEqual(cdata, {"args": ["arg"], "kwargs": {"key": "value1"}})
|
|
|
|
@slowTest
|
|
def test_slot_append(self):
|
|
"""
|
|
Test the slot parsing of dict response.
|
|
"""
|
|
cdata = {
|
|
"args": ["arg"],
|
|
"kwargs": {
|
|
"key": "__slot__:salt:mod.fun(fun_arg, fun_key=fun_val).key1 ~ thing~",
|
|
},
|
|
}
|
|
return_data = {"key1": "value1"}
|
|
mock = MagicMock(return_value=return_data)
|
|
with patch.dict(self.state_obj.functions, {"mod.fun": mock}):
|
|
self.state_obj.format_slots(cdata)
|
|
mock.assert_called_once_with("fun_arg", fun_key="fun_val")
|
|
self.assertEqual(cdata, {"args": ["arg"], "kwargs": {"key": "value1thing~"}})
|
|
|
|
# Skip on windows like integration.modules.test_state.StateModuleTest.test_parallel_state_with_long_tag
|
|
@skipIf(
|
|
salt.utils.platform.is_windows(),
|
|
"Skipped until parallel states can be fixed on Windows",
|
|
)
|
|
def test_format_slots_parallel(self):
|
|
"""
|
|
Test if slots work with "parallel: true".
|
|
"""
|
|
high_data = {
|
|
"always-changes-and-succeeds": {
|
|
"test": [
|
|
{"changes": True},
|
|
{"comment": "__slot__:salt:test.echo(fun_return)"},
|
|
{"parallel": True},
|
|
"configurable_test_state",
|
|
{"order": 10000},
|
|
],
|
|
"__env__": "base",
|
|
"__sls__": "parallel_slots",
|
|
}
|
|
}
|
|
self.state_obj.jid = "123"
|
|
res = self.state_obj.call_high(high_data)
|
|
self.state_obj.jid = None
|
|
[(_, data)] = res.items()
|
|
self.assertEqual(data["comment"], "fun_return")
|