mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Moving tests/integration/modules/test_cmdmod.py to pytest, Gareth J Greenaway original author
This commit is contained in:
parent
c5cfe214cd
commit
2c1040b4c2
3 changed files with 654 additions and 634 deletions
|
@ -1,634 +0,0 @@
|
|||
import os
|
||||
import random
|
||||
import sys
|
||||
import tempfile
|
||||
from contextlib import contextmanager
|
||||
|
||||
import pytest
|
||||
|
||||
import salt.utils.path
|
||||
import salt.utils.platform
|
||||
import salt.utils.user
|
||||
from tests.support.case import ModuleCase
|
||||
from tests.support.helpers import SKIP_INITIAL_PHOTONOS_FAILURES, dedent
|
||||
from tests.support.runtests import RUNTIME_VARS
|
||||
|
||||
AVAILABLE_PYTHON_EXECUTABLE = salt.utils.path.which_bin(
|
||||
["python", "python2", "python2.6", "python2.7"]
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.windows_whitelisted
|
||||
class CMDModuleTest(ModuleCase):
|
||||
"""
|
||||
Validate the cmd module
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
self.runas_usr = "nobody"
|
||||
if salt.utils.platform.is_darwin():
|
||||
self.runas_usr = "macsalttest"
|
||||
|
||||
@contextmanager
|
||||
def _ensure_user_exists(self, name):
|
||||
if name in self.run_function("user.info", [name]).values():
|
||||
# User already exists; don't touch
|
||||
yield
|
||||
else:
|
||||
# Need to create user for test
|
||||
self.run_function("user.add", [name])
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
self.run_function("user.delete", [name], remove=True)
|
||||
|
||||
@pytest.mark.slow_test
|
||||
@pytest.mark.skip_on_windows
|
||||
def test_run(self):
|
||||
"""
|
||||
cmd.run
|
||||
"""
|
||||
shell = os.environ.get("SHELL")
|
||||
if shell is None:
|
||||
# Failed to get the SHELL var, don't run
|
||||
self.skipTest("Unable to get the SHELL environment variable")
|
||||
|
||||
self.assertTrue(self.run_function("cmd.run", ["echo $SHELL"]))
|
||||
self.assertEqual(
|
||||
self.run_function(
|
||||
"cmd.run", ["echo $SHELL", "shell={}".format(shell)], python_shell=True
|
||||
).rstrip(),
|
||||
shell,
|
||||
)
|
||||
self.assertEqual(
|
||||
self.run_function("cmd.run", ["ls / | grep etc"], python_shell=True), "etc"
|
||||
)
|
||||
self.assertEqual(
|
||||
self.run_function(
|
||||
"cmd.run",
|
||||
['echo {{grains.id}} | awk "{print $1}"'],
|
||||
template="jinja",
|
||||
python_shell=True,
|
||||
),
|
||||
"minion",
|
||||
)
|
||||
self.assertEqual(
|
||||
self.run_function(
|
||||
"cmd.run", ["grep f"], stdin="one\ntwo\nthree\nfour\nfive\n"
|
||||
),
|
||||
"four\nfive",
|
||||
)
|
||||
self.assertEqual(
|
||||
self.run_function(
|
||||
"cmd.run", ['echo "a=b" | sed -e s/=/:/g'], python_shell=True
|
||||
),
|
||||
"a:b",
|
||||
)
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_stdout(self):
|
||||
"""
|
||||
cmd.run_stdout
|
||||
"""
|
||||
self.assertEqual(
|
||||
self.run_function("cmd.run_stdout", ['echo "cheese"']).rstrip(),
|
||||
"cheese" if not salt.utils.platform.is_windows() else '"cheese"',
|
||||
)
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_stderr(self):
|
||||
"""
|
||||
cmd.run_stderr
|
||||
"""
|
||||
if sys.platform.startswith(("freebsd", "openbsd")):
|
||||
shell = "/bin/sh"
|
||||
else:
|
||||
shell = "/bin/bash"
|
||||
|
||||
self.assertEqual(
|
||||
self.run_function(
|
||||
"cmd.run_stderr",
|
||||
['echo "cheese" 1>&2', "shell={}".format(shell)],
|
||||
python_shell=True,
|
||||
).rstrip(),
|
||||
"cheese" if not salt.utils.platform.is_windows() else '"cheese"',
|
||||
)
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_run_all(self):
|
||||
"""
|
||||
cmd.run_all
|
||||
"""
|
||||
if sys.platform.startswith(("freebsd", "openbsd")):
|
||||
shell = "/bin/sh"
|
||||
else:
|
||||
shell = "/bin/bash"
|
||||
|
||||
ret = self.run_function(
|
||||
"cmd.run_all",
|
||||
['echo "cheese" 1>&2', "shell={}".format(shell)],
|
||||
python_shell=True,
|
||||
)
|
||||
self.assertTrue("pid" in ret)
|
||||
self.assertTrue("retcode" in ret)
|
||||
self.assertTrue("stdout" in ret)
|
||||
self.assertTrue("stderr" in ret)
|
||||
self.assertTrue(isinstance(ret.get("pid"), int))
|
||||
self.assertTrue(isinstance(ret.get("retcode"), int))
|
||||
self.assertTrue(isinstance(ret.get("stdout"), str))
|
||||
self.assertTrue(isinstance(ret.get("stderr"), str))
|
||||
self.assertEqual(
|
||||
ret.get("stderr").rstrip(),
|
||||
"cheese" if not salt.utils.platform.is_windows() else '"cheese"',
|
||||
)
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_retcode(self):
|
||||
"""
|
||||
cmd.retcode
|
||||
"""
|
||||
self.assertEqual(
|
||||
self.run_function("cmd.retcode", ["exit 0"], python_shell=True), 0
|
||||
)
|
||||
self.assertEqual(
|
||||
self.run_function("cmd.retcode", ["exit 1"], python_shell=True), 1
|
||||
)
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_run_all_with_success_retcodes(self):
|
||||
"""
|
||||
cmd.run with success_retcodes
|
||||
"""
|
||||
ret = self.run_function(
|
||||
"cmd.run_all", ["exit 42"], success_retcodes=[42], python_shell=True
|
||||
)
|
||||
|
||||
self.assertTrue("retcode" in ret)
|
||||
self.assertEqual(ret.get("retcode"), 0)
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_retcode_with_success_retcodes(self):
|
||||
"""
|
||||
cmd.run with success_retcodes
|
||||
"""
|
||||
ret = self.run_function(
|
||||
"cmd.retcode", ["exit 42"], success_retcodes=[42], python_shell=True
|
||||
)
|
||||
|
||||
self.assertEqual(ret, 0)
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_run_all_with_success_stderr(self):
|
||||
"""
|
||||
cmd.run with success_retcodes
|
||||
"""
|
||||
random_file = "{}{}{}".format(
|
||||
RUNTIME_VARS.TMP_ROOT_DIR, os.path.sep, random.random()
|
||||
)
|
||||
|
||||
if salt.utils.platform.is_windows():
|
||||
func = "type"
|
||||
expected_stderr = "cannot find the file specified"
|
||||
else:
|
||||
func = "cat"
|
||||
expected_stderr = "No such file or directory"
|
||||
ret = self.run_function(
|
||||
"cmd.run_all",
|
||||
["{} {}".format(func, random_file)],
|
||||
success_stderr=[expected_stderr],
|
||||
python_shell=True,
|
||||
)
|
||||
|
||||
self.assertTrue("retcode" in ret)
|
||||
self.assertEqual(ret.get("retcode"), 0)
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_blacklist_glob(self):
|
||||
"""
|
||||
cmd_blacklist_glob
|
||||
"""
|
||||
self.assertEqual(
|
||||
self.run_function("cmd.run", ["bad_command --foo"]).rstrip(),
|
||||
'ERROR: The shell command "bad_command --foo" is not permitted',
|
||||
)
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_script(self):
|
||||
"""
|
||||
cmd.script
|
||||
"""
|
||||
args = "saltines crackers biscuits=yes"
|
||||
script = "salt://script.py"
|
||||
ret = self.run_function("cmd.script", [script, args], saltenv="base")
|
||||
self.assertEqual(ret["stdout"], args)
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_script_query_string(self):
|
||||
"""
|
||||
cmd.script
|
||||
"""
|
||||
args = "saltines crackers biscuits=yes"
|
||||
script = "salt://script.py?saltenv=base"
|
||||
ret = self.run_function("cmd.script", [script, args], saltenv="base")
|
||||
self.assertEqual(ret["stdout"], args)
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_script_retcode(self):
|
||||
"""
|
||||
cmd.script_retcode
|
||||
"""
|
||||
script = "salt://script.py"
|
||||
ret = self.run_function("cmd.script_retcode", [script], saltenv="base")
|
||||
self.assertEqual(ret, 0)
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_script_cwd(self):
|
||||
"""
|
||||
cmd.script with cwd
|
||||
"""
|
||||
tmp_cwd = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
|
||||
args = "saltines crackers biscuits=yes"
|
||||
script = "salt://script.py"
|
||||
ret = self.run_function(
|
||||
"cmd.script", [script, args], cwd=tmp_cwd, saltenv="base"
|
||||
)
|
||||
self.assertEqual(ret["stdout"], args)
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_script_cwd_with_space(self):
|
||||
"""
|
||||
cmd.script with cwd
|
||||
"""
|
||||
tmp_cwd = "{}{}test 2".format(
|
||||
tempfile.mkdtemp(dir=RUNTIME_VARS.TMP), os.path.sep
|
||||
)
|
||||
os.mkdir(tmp_cwd)
|
||||
|
||||
args = "saltines crackers biscuits=yes"
|
||||
script = "salt://script.py"
|
||||
ret = self.run_function(
|
||||
"cmd.script", [script, args], cwd=tmp_cwd, saltenv="base"
|
||||
)
|
||||
self.assertEqual(ret["stdout"], args)
|
||||
|
||||
@pytest.mark.destructive_test
|
||||
def test_tty(self):
|
||||
"""
|
||||
cmd.tty
|
||||
"""
|
||||
for tty in ("tty0", "pts3"):
|
||||
if os.path.exists(os.path.join("/dev", tty)):
|
||||
ret = self.run_function("cmd.tty", [tty, "apply salt liberally"])
|
||||
self.assertTrue("Success" in ret)
|
||||
|
||||
@pytest.mark.skip_on_windows
|
||||
@pytest.mark.skip_if_binaries_missing("which")
|
||||
def test_which(self):
|
||||
"""
|
||||
cmd.which
|
||||
"""
|
||||
cmd_which = self.run_function("cmd.which", ["cat"])
|
||||
self.assertIsInstance(cmd_which, str)
|
||||
cmd_run = self.run_function("cmd.run", ["which cat"])
|
||||
self.assertIsInstance(cmd_run, str)
|
||||
self.assertEqual(cmd_which.rstrip(), cmd_run.rstrip())
|
||||
|
||||
@pytest.mark.skip_on_windows
|
||||
@pytest.mark.skip_if_binaries_missing("which")
|
||||
def test_which_bin(self):
|
||||
"""
|
||||
cmd.which_bin
|
||||
"""
|
||||
cmds = ["pip3", "pip2", "pip", "pip-python"]
|
||||
ret = self.run_function("cmd.which_bin", [cmds])
|
||||
self.assertTrue(os.path.split(ret)[1] in cmds)
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_has_exec(self):
|
||||
"""
|
||||
cmd.has_exec
|
||||
"""
|
||||
self.assertTrue(
|
||||
self.run_function("cmd.has_exec", [AVAILABLE_PYTHON_EXECUTABLE])
|
||||
)
|
||||
self.assertFalse(
|
||||
self.run_function("cmd.has_exec", ["alllfsdfnwieulrrh9123857ygf"])
|
||||
)
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_exec_code(self):
|
||||
"""
|
||||
cmd.exec_code
|
||||
"""
|
||||
code = dedent(
|
||||
"""
|
||||
import sys
|
||||
sys.stdout.write('cheese')
|
||||
"""
|
||||
)
|
||||
self.assertEqual(
|
||||
self.run_function(
|
||||
"cmd.exec_code", [AVAILABLE_PYTHON_EXECUTABLE, code]
|
||||
).rstrip(),
|
||||
"cheese",
|
||||
)
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_exec_code_with_single_arg(self):
|
||||
"""
|
||||
cmd.exec_code
|
||||
"""
|
||||
code = dedent(
|
||||
"""
|
||||
import sys
|
||||
sys.stdout.write(sys.argv[1])
|
||||
"""
|
||||
)
|
||||
arg = "cheese"
|
||||
self.assertEqual(
|
||||
self.run_function(
|
||||
"cmd.exec_code", [AVAILABLE_PYTHON_EXECUTABLE, code], args=arg
|
||||
).rstrip(),
|
||||
arg,
|
||||
)
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_exec_code_with_multiple_args(self):
|
||||
"""
|
||||
cmd.exec_code
|
||||
"""
|
||||
code = dedent(
|
||||
"""
|
||||
import sys
|
||||
sys.stdout.write(sys.argv[1])
|
||||
"""
|
||||
)
|
||||
arg = "cheese"
|
||||
self.assertEqual(
|
||||
self.run_function(
|
||||
"cmd.exec_code", [AVAILABLE_PYTHON_EXECUTABLE, code], args=[arg, "test"]
|
||||
).rstrip(),
|
||||
arg,
|
||||
)
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_quotes(self):
|
||||
"""
|
||||
cmd.run with quoted command
|
||||
"""
|
||||
cmd = """echo 'SELECT * FROM foo WHERE bar="baz"' """
|
||||
expected_result = 'SELECT * FROM foo WHERE bar="baz"'
|
||||
if salt.utils.platform.is_windows():
|
||||
expected_result = "'SELECT * FROM foo WHERE bar=\"baz\"'"
|
||||
result = self.run_function("cmd.run_stdout", [cmd]).strip()
|
||||
self.assertEqual(result, expected_result)
|
||||
|
||||
@pytest.mark.skip_if_not_root
|
||||
@pytest.mark.skip_on_windows(reason="Skip on Windows, requires password")
|
||||
def test_quotes_runas(self):
|
||||
"""
|
||||
cmd.run with quoted command
|
||||
"""
|
||||
cmd = """echo 'SELECT * FROM foo WHERE bar="baz"' """
|
||||
expected_result = 'SELECT * FROM foo WHERE bar="baz"'
|
||||
result = self.run_function(
|
||||
"cmd.run_all", [cmd], runas=RUNTIME_VARS.RUNNING_TESTS_USER
|
||||
)
|
||||
errmsg = "The command returned: {}".format(result)
|
||||
self.assertEqual(result["retcode"], 0, errmsg)
|
||||
self.assertEqual(result["stdout"], expected_result, errmsg)
|
||||
|
||||
@pytest.mark.destructive_test
|
||||
@pytest.mark.skip_if_not_root
|
||||
@pytest.mark.skip_on_windows(reason="Skip on Windows, uses unix commands")
|
||||
@pytest.mark.slow_test
|
||||
def test_avoid_injecting_shell_code_as_root(self):
|
||||
"""
|
||||
cmd.run should execute the whole command as the "runas" user, not
|
||||
running substitutions as root.
|
||||
"""
|
||||
cmd = "echo $(id -u)"
|
||||
|
||||
root_id = self.run_function("cmd.run_stdout", [cmd])
|
||||
runas_root_id = self.run_function(
|
||||
"cmd.run_stdout", [cmd], runas=RUNTIME_VARS.RUNNING_TESTS_USER
|
||||
)
|
||||
with self._ensure_user_exists(self.runas_usr):
|
||||
user_id = self.run_function("cmd.run_stdout", [cmd], runas=self.runas_usr)
|
||||
|
||||
self.assertNotEqual(user_id, root_id)
|
||||
self.assertNotEqual(user_id, runas_root_id)
|
||||
self.assertEqual(root_id, runas_root_id)
|
||||
|
||||
@pytest.mark.destructive_test
|
||||
@pytest.mark.skip_if_not_root
|
||||
@pytest.mark.skip_on_windows(reason="Skip on Windows, uses unix commands")
|
||||
@pytest.mark.slow_test
|
||||
def test_cwd_runas(self):
|
||||
"""
|
||||
cmd.run should be able to change working directory correctly, whether
|
||||
or not runas is in use.
|
||||
"""
|
||||
cmd = "pwd"
|
||||
tmp_cwd = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
|
||||
os.chmod(tmp_cwd, 0o711)
|
||||
|
||||
cwd_normal = self.run_function("cmd.run_stdout", [cmd], cwd=tmp_cwd).rstrip(
|
||||
"\n"
|
||||
)
|
||||
self.assertEqual(tmp_cwd, cwd_normal)
|
||||
|
||||
with self._ensure_user_exists(self.runas_usr):
|
||||
cwd_runas = self.run_function(
|
||||
"cmd.run_stdout", [cmd], cwd=tmp_cwd, runas=self.runas_usr
|
||||
).rstrip("\n")
|
||||
self.assertEqual(tmp_cwd, cwd_runas)
|
||||
|
||||
@pytest.mark.destructive_test
|
||||
@pytest.mark.skip_if_not_root
|
||||
@pytest.mark.skip_unless_on_darwin(reason="Applicable to MacOS only")
|
||||
@pytest.mark.slow_test
|
||||
def test_runas_env(self):
|
||||
"""
|
||||
cmd.run should be able to change working directory correctly, whether
|
||||
or not runas is in use.
|
||||
"""
|
||||
with self._ensure_user_exists(self.runas_usr):
|
||||
user_path = self.run_function(
|
||||
"cmd.run_stdout", ['printf %s "$PATH"'], runas=self.runas_usr
|
||||
)
|
||||
# XXX: Not sure of a better way. Environment starts out with
|
||||
# /bin:/usr/bin and should be populated by path helper and the bash
|
||||
# profile.
|
||||
self.assertNotEqual("/bin:/usr/bin", user_path)
|
||||
|
||||
@pytest.mark.destructive_test
|
||||
@pytest.mark.skip_if_not_root
|
||||
@pytest.mark.skip_unless_on_darwin(reason="Applicable to MacOS only")
|
||||
@pytest.mark.slow_test
|
||||
def test_runas_complex_command_bad_cwd(self):
|
||||
"""
|
||||
cmd.run should not accidentally run parts of a complex command when
|
||||
given a cwd which cannot be used by the user the command is run as.
|
||||
|
||||
Due to the need to use `su -l` to login to another user on MacOS, we
|
||||
cannot cd into directories that the target user themselves does not
|
||||
have execute permission for. To an extent, this test is testing that
|
||||
buggy behaviour, but its purpose is to ensure that the greater bug of
|
||||
running commands after failing to cd does not occur.
|
||||
"""
|
||||
tmp_cwd = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
|
||||
os.chmod(tmp_cwd, 0o700)
|
||||
|
||||
with self._ensure_user_exists(self.runas_usr):
|
||||
cmd_result = self.run_function(
|
||||
"cmd.run_all",
|
||||
['pwd; pwd; : $(echo "You have failed the test" >&2)'],
|
||||
cwd=tmp_cwd,
|
||||
runas=self.runas_usr,
|
||||
)
|
||||
|
||||
self.assertEqual("", cmd_result["stdout"])
|
||||
self.assertNotIn("You have failed the test", cmd_result["stderr"])
|
||||
self.assertNotEqual(0, cmd_result["retcode"])
|
||||
|
||||
@SKIP_INITIAL_PHOTONOS_FAILURES
|
||||
@pytest.mark.skip_on_windows
|
||||
@pytest.mark.skip_if_not_root
|
||||
@pytest.mark.destructive_test
|
||||
@pytest.mark.slow_test
|
||||
def test_runas(self):
|
||||
"""
|
||||
Ensure that the env is the runas user's
|
||||
"""
|
||||
with self._ensure_user_exists(self.runas_usr):
|
||||
out = self.run_function(
|
||||
"cmd.run", ["env"], runas=self.runas_usr
|
||||
).splitlines()
|
||||
self.assertIn("USER={}".format(self.runas_usr), out)
|
||||
|
||||
@pytest.mark.skip_if_binaries_missing("sleep", reason="sleep cmd not installed")
|
||||
def test_timeout(self):
|
||||
"""
|
||||
cmd.run trigger timeout
|
||||
"""
|
||||
out = self.run_function(
|
||||
"cmd.run", ["sleep 2 && echo hello"], f_timeout=1, python_shell=True
|
||||
)
|
||||
self.assertTrue("Timed out" in out)
|
||||
|
||||
@pytest.mark.skip_if_binaries_missing("sleep", reason="sleep cmd not installed")
|
||||
def test_timeout_success(self):
|
||||
"""
|
||||
cmd.run sufficient timeout to succeed
|
||||
"""
|
||||
out = self.run_function(
|
||||
"cmd.run", ["sleep 1 && echo hello"], f_timeout=2, python_shell=True
|
||||
)
|
||||
self.assertEqual(out, "hello")
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_hide_output(self):
|
||||
"""
|
||||
Test the hide_output argument
|
||||
"""
|
||||
ls_command = (
|
||||
["ls", "/"] if not salt.utils.platform.is_windows() else ["dir", "c:\\"]
|
||||
)
|
||||
|
||||
error_command = ["thiscommanddoesnotexist"]
|
||||
|
||||
# cmd.run
|
||||
out = self.run_function("cmd.run", ls_command, hide_output=True)
|
||||
self.assertEqual(out, "")
|
||||
|
||||
# cmd.shell
|
||||
out = self.run_function("cmd.shell", ls_command, hide_output=True)
|
||||
self.assertEqual(out, "")
|
||||
|
||||
# cmd.run_stdout
|
||||
out = self.run_function("cmd.run_stdout", ls_command, hide_output=True)
|
||||
self.assertEqual(out, "")
|
||||
|
||||
# cmd.run_stderr
|
||||
out = self.run_function("cmd.shell", error_command, hide_output=True)
|
||||
self.assertEqual(out, "")
|
||||
|
||||
# cmd.run_all (command should have produced stdout)
|
||||
out = self.run_function("cmd.run_all", ls_command, hide_output=True)
|
||||
self.assertEqual(out["stdout"], "")
|
||||
self.assertEqual(out["stderr"], "")
|
||||
|
||||
# cmd.run_all (command should have produced stderr)
|
||||
out = self.run_function("cmd.run_all", error_command, hide_output=True)
|
||||
self.assertEqual(out["stdout"], "")
|
||||
self.assertEqual(out["stderr"], "")
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_cmd_run_whoami(self):
|
||||
"""
|
||||
test return of whoami
|
||||
"""
|
||||
if not salt.utils.platform.is_windows():
|
||||
user = RUNTIME_VARS.RUNTIME_CONFIGS["master"]["user"]
|
||||
else:
|
||||
user = salt.utils.user.get_specific_user()
|
||||
if user.startswith("sudo_"):
|
||||
user = user.replace("sudo_", "")
|
||||
cmd = self.run_function("cmd.run", ["whoami"])
|
||||
try:
|
||||
self.assertEqual(user.lower(), cmd.lower())
|
||||
except AssertionError as exc:
|
||||
if not salt.utils.platform.is_windows():
|
||||
raise exc from None
|
||||
if "\\" in user:
|
||||
user = user.split("\\")[-1]
|
||||
self.assertEqual(user.lower(), cmd.lower())
|
||||
|
||||
@pytest.mark.skip_unless_on_windows(reason="Minion is not Windows")
|
||||
@pytest.mark.slow_test
|
||||
def test_windows_env_handling(self):
|
||||
"""
|
||||
Ensure that nt.environ is used properly with cmd.run*
|
||||
"""
|
||||
out = self.run_function(
|
||||
"cmd.run", ["set"], env={"abc": "123", "ABC": "456"}
|
||||
).splitlines()
|
||||
self.assertIn("abc=123", out)
|
||||
self.assertIn("ABC=456", out)
|
||||
|
||||
@pytest.mark.slow_test
|
||||
@pytest.mark.skip_unless_on_windows(reason="Minion is not Windows")
|
||||
def test_windows_powershell_script_args(self):
|
||||
"""
|
||||
Ensure that powershell processes inline script in args
|
||||
"""
|
||||
val = "i like cheese"
|
||||
args = (
|
||||
'-SecureString (ConvertTo-SecureString -String "{}" -AsPlainText -Force)'
|
||||
" -ErrorAction Stop".format(val)
|
||||
)
|
||||
script = "salt://issue-56195/test.ps1"
|
||||
ret = self.run_function(
|
||||
"cmd.script", [script], args=args, shell="powershell", saltenv="base"
|
||||
)
|
||||
self.assertEqual(ret["stdout"], val)
|
||||
|
||||
@pytest.mark.slow_test
|
||||
@pytest.mark.skip_unless_on_windows(reason="Minion is not Windows")
|
||||
@pytest.mark.skip_if_binaries_missing("pwsh")
|
||||
def test_windows_powershell_script_args_pwsh(self):
|
||||
"""
|
||||
Ensure that powershell processes inline script in args with powershell
|
||||
core
|
||||
"""
|
||||
val = "i like cheese"
|
||||
args = (
|
||||
'-SecureString (ConvertTo-SecureString -String "{}" -AsPlainText -Force)'
|
||||
" -ErrorAction Stop".format(val)
|
||||
)
|
||||
script = "salt://issue-56195/test.ps1"
|
||||
ret = self.run_function(
|
||||
"cmd.script", [script], args=args, shell="pwsh", saltenv="base"
|
||||
)
|
||||
self.assertEqual(ret["stdout"], val)
|
561
tests/pytests/functional/modules/test_cmdmod.py
Normal file
561
tests/pytests/functional/modules/test_cmdmod.py
Normal file
|
@ -0,0 +1,561 @@
|
|||
import os
|
||||
import random
|
||||
import sys
|
||||
from contextlib import contextmanager
|
||||
|
||||
import pytest
|
||||
|
||||
import salt.config
|
||||
import salt.utils.path
|
||||
import salt.utils.platform
|
||||
import salt.utils.user
|
||||
from tests.support.helpers import SKIP_INITIAL_PHOTONOS_FAILURES, dedent
|
||||
|
||||
pytestmark = [pytest.mark.windows_whitelisted]
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def cmdmod(modules):
|
||||
return modules.cmd
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def usermod(modules):
|
||||
return modules.user
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def available_python_executable():
|
||||
yield salt.utils.path.which_bin(["python", "python2", "python2.6", "python2.7"])
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def runas_usr():
|
||||
runas_usr = "nobody"
|
||||
if salt.utils.platform.is_darwin():
|
||||
runas_usr = "macsalttest"
|
||||
yield runas_usr
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def running_username():
|
||||
"""
|
||||
Return the username that is running the code.
|
||||
"""
|
||||
return salt.utils.user.get_user()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def script_contents(state_tree):
|
||||
_contents = """
|
||||
#!/usr/bin/env python
|
||||
import sys
|
||||
print(" ".join(sys.argv[1:]))
|
||||
"""
|
||||
|
||||
with pytest.helpers.temp_file("script.py", _contents, state_tree):
|
||||
yield
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def issue_56195_test_ps1(state_tree):
|
||||
_contents = """
|
||||
[CmdLetBinding()]
|
||||
Param(
|
||||
[SecureString] $SecureString
|
||||
)
|
||||
$Credential = New-Object System.Net.NetworkCredential("DummyId", $SecureString)
|
||||
$Credential.Password
|
||||
"""
|
||||
|
||||
with pytest.helpers.temp_file("issue_56195_test.ps1", _contents, state_tree):
|
||||
yield
|
||||
|
||||
|
||||
@contextmanager
|
||||
def _ensure_user_exists(name, usermod):
|
||||
if name in usermod.info(name).values():
|
||||
# User already exists; don't touch
|
||||
yield
|
||||
else:
|
||||
# Need to create user for test
|
||||
usermod.add(name)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
usermod.delete(name, remove=True)
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_run(cmdmod):
|
||||
"""
|
||||
cmd.run
|
||||
"""
|
||||
shell = os.environ.get("SHELL")
|
||||
if shell is None:
|
||||
# Failed to get the SHELL var, don't run
|
||||
pytest.skip("Unable to get the SHELL environment variable")
|
||||
|
||||
assert cmdmod.run("echo $SHELL")
|
||||
assert cmdmod.run("echo $SHELL", shell=shell, python_shell=True).rstrip() == shell
|
||||
assert cmdmod.run("ls / | grep etc", python_shell=True) == "etc"
|
||||
assert (
|
||||
cmdmod.run(
|
||||
'echo {{grains.id}} | awk "{print $1}"',
|
||||
template="jinja",
|
||||
python_shell=True,
|
||||
)
|
||||
== "func-tests-minion"
|
||||
)
|
||||
assert cmdmod.run("grep f", stdin="one\ntwo\nthree\nfour\nfive\n") == "four\nfive"
|
||||
assert cmdmod.run('echo "a=b" | sed -e s/=/:/g', python_shell=True) == "a:b"
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_stdout(cmdmod):
|
||||
"""
|
||||
cmd.run_stdout
|
||||
"""
|
||||
assert (
|
||||
cmdmod.run_stdout('echo "cheese"').rstrip() == "cheese"
|
||||
if not salt.utils.platform.is_windows()
|
||||
else '"cheese"'
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_stderr(cmdmod):
|
||||
"""
|
||||
cmd.run_stderr
|
||||
"""
|
||||
if sys.platform.startswith(("freebsd", "openbsd")):
|
||||
shell = "/bin/sh"
|
||||
else:
|
||||
shell = "/bin/bash"
|
||||
|
||||
assert (
|
||||
cmdmod.run_stderr(
|
||||
'echo "cheese" 1>&2',
|
||||
shell=shell,
|
||||
python_shell=True,
|
||||
).rstrip()
|
||||
== "cheese"
|
||||
if not salt.utils.platform.is_windows()
|
||||
else '"cheese"'
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_run_all(cmdmod):
|
||||
"""
|
||||
cmd.run_all
|
||||
"""
|
||||
if sys.platform.startswith(("freebsd", "openbsd")):
|
||||
shell = "/bin/sh"
|
||||
else:
|
||||
shell = "/bin/bash"
|
||||
|
||||
ret = cmdmod.run_all(
|
||||
'echo "cheese" 1>&2',
|
||||
shell=shell,
|
||||
python_shell=True,
|
||||
)
|
||||
assert "pid" in ret
|
||||
assert "retcode" in ret
|
||||
assert "stdout" in ret
|
||||
assert "stderr" in ret
|
||||
assert isinstance(ret.get("pid"), int)
|
||||
assert isinstance(ret.get("retcode"), int)
|
||||
assert isinstance(ret.get("stdout"), str)
|
||||
assert isinstance(ret.get("stderr"), str)
|
||||
assert (
|
||||
ret.get("stderr").rstrip() == "cheese"
|
||||
if not salt.utils.platform.is_windows()
|
||||
else '"cheese"'
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_retcode(cmdmod):
|
||||
"""
|
||||
cmd.retcode
|
||||
"""
|
||||
assert cmdmod.retcode("exit 0", python_shell=True) == 0
|
||||
assert cmdmod.retcode("exit 1", python_shell=True) == 1
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_run_all_with_success_retcodes(cmdmod):
|
||||
"""
|
||||
cmd.run with success_retcodes
|
||||
"""
|
||||
ret = cmdmod.run_all("exit 42", success_retcodes=[42], python_shell=True)
|
||||
|
||||
assert "retcode" in ret
|
||||
assert ret.get("retcode") == 0
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_retcode_with_success_retcodes(cmdmod):
|
||||
"""
|
||||
cmd.run with success_retcodes
|
||||
"""
|
||||
ret = cmdmod.retcode("exit 42", success_retcodes=[42], python_shell=True)
|
||||
|
||||
assert ret == 0
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_run_all_with_success_stderr(cmdmod, tmp_path):
|
||||
"""
|
||||
cmd.run with success_retcodes
|
||||
"""
|
||||
random_file = str(tmp_path / f"{random.random()}")
|
||||
|
||||
if salt.utils.platform.is_windows():
|
||||
func = "type"
|
||||
expected_stderr = "cannot find the file specified"
|
||||
else:
|
||||
func = "cat"
|
||||
expected_stderr = "No such file or directory"
|
||||
ret = cmdmod.run_all(
|
||||
f"{func} {random_file}",
|
||||
success_stderr=[expected_stderr],
|
||||
python_shell=True,
|
||||
)
|
||||
|
||||
assert "retcode" in ret
|
||||
assert ret.get("retcode") == 0
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_script(cmdmod, script_contents):
|
||||
"""
|
||||
cmd.script
|
||||
"""
|
||||
args = "saltines crackers biscuits=yes"
|
||||
script = "salt://script.py"
|
||||
ret = cmdmod.script(script, args, saltenv="base")
|
||||
assert ret["stdout"] == args
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_script_query_string(cmdmod, script_contents):
|
||||
"""
|
||||
cmd.script
|
||||
"""
|
||||
args = "saltines crackers biscuits=yes"
|
||||
script = "salt://script.py?saltenv=base"
|
||||
ret = cmdmod.script(script, args, saltenv="base")
|
||||
assert ret["stdout"] == args
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_script_retcode(cmdmod, script_contents):
|
||||
"""
|
||||
cmd.script_retcode
|
||||
"""
|
||||
script = "salt://script.py"
|
||||
ret = cmdmod.script_retcode(script, saltenv="base")
|
||||
assert ret == 0
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_script_cwd(cmdmod, script_contents, tmp_path):
|
||||
"""
|
||||
cmd.script with cwd
|
||||
"""
|
||||
tmp_cwd = str(tmp_path)
|
||||
args = "saltines crackers biscuits=yes"
|
||||
script = "salt://script.py"
|
||||
ret = cmdmod.script(script, args, cwd=tmp_cwd, saltenv="base")
|
||||
assert ret["stdout"] == args
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_script_cwd_with_space(cmdmod, script_contents, tmp_path):
|
||||
"""
|
||||
cmd.script with cwd
|
||||
"""
|
||||
tmp_cwd = str(tmp_path / "test 2")
|
||||
os.mkdir(tmp_cwd)
|
||||
|
||||
args = "saltines crackers biscuits=yes"
|
||||
script = "salt://script.py"
|
||||
ret = cmdmod.script(script, args, cwd=tmp_cwd, saltenv="base")
|
||||
assert ret["stdout"] == args
|
||||
|
||||
|
||||
@pytest.mark.destructive_test
|
||||
def test_tty(cmdmod):
|
||||
"""
|
||||
cmd.tty
|
||||
"""
|
||||
for tty in ("tty0", "pts3"):
|
||||
if os.path.exists(os.path.join("/dev", tty)):
|
||||
ret = cmdmod.tty(tty, "apply salt liberally")
|
||||
assert "Success" in ret
|
||||
|
||||
|
||||
@pytest.mark.skip_on_windows
|
||||
@pytest.mark.skip_if_binaries_missing("which")
|
||||
def test_which(cmdmod):
|
||||
"""
|
||||
cmd.which
|
||||
"""
|
||||
cmd_which = cmdmod.which("cat")
|
||||
assert isinstance(cmd_which, str)
|
||||
cmd_run = cmdmod.run("which cat")
|
||||
assert isinstance(cmd_run, str)
|
||||
assert cmd_which.rstrip() == cmd_run.rstrip()
|
||||
|
||||
|
||||
@pytest.mark.skip_on_windows
|
||||
@pytest.mark.skip_if_binaries_missing("which")
|
||||
def test_which_bin(cmdmod):
|
||||
"""
|
||||
cmd.which_bin
|
||||
"""
|
||||
cmds = ["pip3", "pip2", "pip", "pip-python"]
|
||||
ret = cmdmod.which_bin(cmds)
|
||||
assert os.path.split(ret)[1] in cmds
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_has_exec(cmdmod, available_python_executable):
|
||||
"""
|
||||
cmd.has_exec
|
||||
"""
|
||||
assert cmdmod.has_exec(available_python_executable)
|
||||
assert not cmdmod.has_exec("alllfsdfnwieulrrh9123857ygf")
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_exec_code(cmdmod, available_python_executable):
|
||||
"""
|
||||
cmd.exec_code
|
||||
"""
|
||||
code = dedent(
|
||||
"""
|
||||
import sys
|
||||
sys.stdout.write('cheese')
|
||||
"""
|
||||
)
|
||||
assert cmdmod.exec_code(available_python_executable, code).rstrip() == "cheese"
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_exec_code_with_single_arg(cmdmod, available_python_executable):
|
||||
"""
|
||||
cmd.exec_code
|
||||
"""
|
||||
code = dedent(
|
||||
"""
|
||||
import sys
|
||||
sys.stdout.write(sys.argv[1])
|
||||
"""
|
||||
)
|
||||
arg = "cheese"
|
||||
assert cmdmod.exec_code(available_python_executable, code, args=arg).rstrip() == arg
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_exec_code_with_multiple_args(cmdmod, available_python_executable):
|
||||
"""
|
||||
cmd.exec_code
|
||||
"""
|
||||
code = dedent(
|
||||
"""
|
||||
import sys
|
||||
sys.stdout.write(sys.argv[1])
|
||||
"""
|
||||
)
|
||||
arg = "cheese"
|
||||
assert (
|
||||
cmdmod.exec_code(available_python_executable, code, args=[arg, "test"]).rstrip()
|
||||
== arg
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_quotes(cmdmod):
|
||||
"""
|
||||
cmd.run with quoted command
|
||||
"""
|
||||
cmd = """echo 'SELECT * FROM foo WHERE bar="baz"' """
|
||||
expected_result = 'SELECT * FROM foo WHERE bar="baz"'
|
||||
result = cmdmod.run_stdout(cmd).strip()
|
||||
assert result == expected_result
|
||||
|
||||
|
||||
@pytest.mark.skip_if_not_root
|
||||
@pytest.mark.skip_on_windows(reason="Skip on Windows, requires password")
|
||||
def test_quotes_runas(cmdmod, running_username):
|
||||
"""
|
||||
cmd.run with quoted command
|
||||
"""
|
||||
cmd = """echo 'SELECT * FROM foo WHERE bar="baz"' """
|
||||
expected_result = 'SELECT * FROM foo WHERE bar="baz"'
|
||||
result = cmdmod.run_all(cmd, runas=running_username)
|
||||
errmsg = f"The command returned: {result}"
|
||||
assert result["retcode"] == 0, errmsg
|
||||
assert result["stdout"] == expected_result, errmsg
|
||||
|
||||
|
||||
@pytest.mark.destructive_test
|
||||
@pytest.mark.skip_if_not_root
|
||||
@pytest.mark.skip_on_windows(reason="Skip on Windows, uses unix commands")
|
||||
@pytest.mark.slow_test
|
||||
def test_cwd_runas(cmdmod, usermod, runas_usr, tmp_path):
|
||||
"""
|
||||
cmd.run should be able to change working directory correctly, whether
|
||||
or not runas is in use.
|
||||
"""
|
||||
cmd = "pwd"
|
||||
tmp_cwd = str(tmp_path)
|
||||
os.chmod(tmp_cwd, 0o711)
|
||||
|
||||
cwd_normal = cmdmod.run_stdout(cmd, cwd=tmp_cwd).rstrip("\n")
|
||||
assert tmp_cwd == cwd_normal
|
||||
|
||||
with _ensure_user_exists(runas_usr, usermod):
|
||||
cwd_runas = cmdmod.run_stdout(cmd, cwd=tmp_cwd, runas=runas_usr).rstrip("\n")
|
||||
assert tmp_cwd == cwd_runas
|
||||
|
||||
|
||||
@pytest.mark.destructive_test
|
||||
@pytest.mark.skip_if_not_root
|
||||
@pytest.mark.skip_unless_on_darwin(reason="Applicable to MacOS only")
|
||||
@pytest.mark.slow_test
|
||||
def test_runas_env(cmdmod, usermod, runas_usr):
|
||||
"""
|
||||
cmd.run should be able to change working directory correctly, whether
|
||||
or not runas is in use.
|
||||
"""
|
||||
with _ensure_user_exists(runas_usr, usermod):
|
||||
user_path = cmdmod.run_stdout('printf %s "$PATH"', runas=runas_usr)
|
||||
# XXX: Not sure of a better way. Environment starts out with
|
||||
# /bin:/usr/bin and should be populated by path helper and the bash
|
||||
# profile.
|
||||
assert "/bin:/usr/bin" != user_path
|
||||
|
||||
|
||||
@pytest.mark.destructive_test
|
||||
@pytest.mark.skip_if_not_root
|
||||
@pytest.mark.skip_unless_on_darwin(reason="Applicable to MacOS only")
|
||||
@pytest.mark.slow_test
|
||||
def test_runas_complex_command_bad_cwd(cmdmod, usermod, runas_usr, tmp_path):
|
||||
"""
|
||||
cmd.run should not accidentally run parts of a complex command when
|
||||
given a cwd which cannot be used by the user the command is run as.
|
||||
Due to the need to use `su -l` to login to another user on MacOS, we
|
||||
cannot cd into directories that the target user themselves does not
|
||||
have execute permission for. To an extent, this test is testing that
|
||||
buggy behaviour, but its purpose is to ensure that the greater bug of
|
||||
running commands after failing to cd does not occur.
|
||||
"""
|
||||
tmp_cwd = str(tmp_path)
|
||||
os.chmod(tmp_cwd, 0o700)
|
||||
|
||||
with _ensure_user_exists(runas_usr, usermod):
|
||||
cmd_result = cmdmod.run_all(
|
||||
'pwd; pwd; : $(echo "You have failed the test" >&2)',
|
||||
cwd=tmp_cwd,
|
||||
runas=runas_usr,
|
||||
)
|
||||
|
||||
assert "" == cmd_result["stdout"]
|
||||
assert "You have failed the test" not in cmd_result["stderr"]
|
||||
assert 0 != cmd_result["retcode"]
|
||||
|
||||
|
||||
@SKIP_INITIAL_PHOTONOS_FAILURES
|
||||
@pytest.mark.skip_on_windows
|
||||
@pytest.mark.skip_if_not_root
|
||||
@pytest.mark.destructive_test
|
||||
@pytest.mark.slow_test
|
||||
def test_runas(cmdmod, usermod, runas_usr):
|
||||
"""
|
||||
Ensure that the env is the runas user's
|
||||
"""
|
||||
with _ensure_user_exists(runas_usr, usermod):
|
||||
out = cmdmod.run("env", runas=runas_usr).splitlines()
|
||||
assert f"USER={runas_usr}" in out
|
||||
|
||||
|
||||
@pytest.mark.skip_if_binaries_missing("sleep", reason="sleep cmd not installed")
|
||||
def test_timeout(cmdmod):
|
||||
"""
|
||||
cmd.run trigger timeout
|
||||
"""
|
||||
out = cmdmod.run("sleep 2 && echo hello", timeout=1, python_shell=True)
|
||||
assert "Timed out" in out
|
||||
|
||||
|
||||
@pytest.mark.skip_if_binaries_missing("sleep", reason="sleep cmd not installed")
|
||||
def test_timeout_success(cmdmod):
|
||||
"""
|
||||
cmd.run sufficient timeout to succeed
|
||||
"""
|
||||
out = cmdmod.run("sleep 1 && echo hello", timeout=2, python_shell=True)
|
||||
assert out == "hello"
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_cmd_run_whoami(cmdmod, running_username):
|
||||
"""
|
||||
test return of whoami
|
||||
"""
|
||||
if not salt.utils.platform.is_windows():
|
||||
user = running_username
|
||||
else:
|
||||
user = salt.utils.user.get_specific_user()
|
||||
if user.startswith("sudo_"):
|
||||
user = user.replace("sudo_", "")
|
||||
cmd = cmdmod.run("whoami")
|
||||
assert user.lower() == cmd.lower()
|
||||
|
||||
|
||||
@pytest.mark.skip_unless_on_windows(reason="Minion is not Windows")
|
||||
@pytest.mark.slow_test
|
||||
def test_windows_env_handling(cmdmod):
|
||||
"""
|
||||
Ensure that nt.environ is used properly with cmd.run*
|
||||
"""
|
||||
out = cmdmod.run("set", env={"abc": "123", "ABC": "456"}).splitlines()
|
||||
assert "abc=123" in out
|
||||
assert "ABC=456" in out
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
@pytest.mark.skip_unless_on_windows(reason="Minion is not Windows")
|
||||
def test_windows_powershell_script_args(cmdmod, issue_56195_test_ps1):
|
||||
"""
|
||||
Ensure that powershell processes inline script in args
|
||||
"""
|
||||
val = "i like cheese"
|
||||
args = (
|
||||
'-SecureString (ConvertTo-SecureString -String "{}" -AsPlainText -Force)'
|
||||
" -ErrorAction Stop".format(val)
|
||||
)
|
||||
script = "salt://issue_56195_test.ps1"
|
||||
ret = cmdmod.script(script, args=args, shell="powershell", saltenv="base")
|
||||
assert ret["stdout"] == val
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
@pytest.mark.skip_unless_on_windows(reason="Minion is not Windows")
|
||||
@pytest.mark.skip_if_binaries_missing("pwsh")
|
||||
def test_windows_powershell_script_args_pwsh(cmdmod, issue_56195_test_ps1):
|
||||
"""
|
||||
Ensure that powershell processes inline script in args with powershell
|
||||
core
|
||||
"""
|
||||
val = "i like cheese"
|
||||
args = (
|
||||
'-SecureString (ConvertTo-SecureString -String "{}" -AsPlainText -Force)'
|
||||
" -ErrorAction Stop".format(val)
|
||||
)
|
||||
script = "salt://issue_56195_test.ps1"
|
||||
ret = cmdmod.script(script, args=args, shell="pwsh", saltenv="base")
|
||||
assert ret["stdout"] == val
|
|
@ -1,5 +1,11 @@
|
|||
import logging
|
||||
|
||||
import pytest
|
||||
|
||||
import salt.utils.user
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def non_root_account():
|
||||
|
@ -7,6 +13,14 @@ def non_root_account():
|
|||
yield account
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def running_username():
|
||||
"""
|
||||
Return the username that is running the code.
|
||||
"""
|
||||
return salt.utils.user.get_user()
|
||||
|
||||
|
||||
@pytest.mark.skip_if_not_root
|
||||
def test_exec_code_all(salt_call_cli, non_root_account):
|
||||
ret = salt_call_cli.run(
|
||||
|
@ -22,3 +36,82 @@ def test_long_stdout(salt_cli, salt_minion):
|
|||
)
|
||||
assert ret.returncode == 0
|
||||
assert len(ret.data.strip()) == len(echo_str)
|
||||
|
||||
|
||||
@pytest.mark.skip_if_not_root
|
||||
@pytest.mark.skip_on_windows(reason="Skip on Windows, uses unix commands")
|
||||
def test_avoid_injecting_shell_code_as_root(
|
||||
salt_call_cli, non_root_account, running_username
|
||||
):
|
||||
"""
|
||||
cmd.run should execute the whole command as the "runas" user, not
|
||||
running substitutions as root.
|
||||
"""
|
||||
cmd = "echo $(id -u)"
|
||||
|
||||
ret = salt_call_cli.run("cmd.run_stdout", cmd)
|
||||
root_id = ret.json
|
||||
ret = salt_call_cli.run("cmd.run_stdout", cmd, runas=running_username)
|
||||
runas_root_id = ret.json
|
||||
|
||||
ret = salt_call_cli.run("cmd.run_stdout", cmd, runas=non_root_account.username)
|
||||
user_id = ret.json
|
||||
|
||||
assert user_id != root_id
|
||||
assert user_id != runas_root_id
|
||||
assert root_id == runas_root_id
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_blacklist_glob(salt_call_cli):
|
||||
"""
|
||||
cmd_blacklist_glob
|
||||
"""
|
||||
cmd = "bad_command --foo"
|
||||
ret = salt_call_cli.run(
|
||||
"cmd.run",
|
||||
cmd,
|
||||
)
|
||||
|
||||
assert (
|
||||
ret.stderr.rstrip()
|
||||
== "Error running 'cmd.run': The shell command \"bad_command --foo\" is not permitted"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_hide_output(salt_call_cli):
|
||||
"""
|
||||
Test the hide_output argument
|
||||
"""
|
||||
ls_command = (
|
||||
["ls", "/"] if not salt.utils.platform.is_windows() else ["dir", "c:\\"]
|
||||
)
|
||||
|
||||
error_command = ["thiscommanddoesnotexist"]
|
||||
|
||||
# cmd.run
|
||||
ret = salt_call_cli.run("cmd.run", ls_command, hide_output=True)
|
||||
assert ret.data == ""
|
||||
|
||||
# cmd.shell
|
||||
ret = salt_call_cli.run("cmd.shell", ls_command, hide_output=True)
|
||||
assert ret.data == ""
|
||||
|
||||
# cmd.run_stdout
|
||||
ret = salt_call_cli.run("cmd.run_stdout", ls_command, hide_output=True)
|
||||
assert ret.data == ""
|
||||
|
||||
# cmd.run_stderr
|
||||
ret = salt_call_cli.run("cmd.shell", error_command, hide_output=True)
|
||||
assert ret.data == ""
|
||||
|
||||
# cmd.run_all (command should have produced stdout)
|
||||
ret = salt_call_cli.run("cmd.run_all", ls_command, hide_output=True)
|
||||
assert ret.data["stdout"] == ""
|
||||
assert ret.data["stderr"] == ""
|
||||
|
||||
# cmd.run_all (command should have produced stderr)
|
||||
ret = salt_call_cli.run("cmd.run_all", error_command, hide_output=True)
|
||||
assert ret.data["stdout"] == ""
|
||||
assert ret.data["stderr"] == ""
|
||||
|
|
Loading…
Add table
Reference in a new issue