Fix issues with the cmd module on Windows

stderr is no longer piped to stdout by default for cmd.run
Scripts are called using the -File paramter for powershell.exe
stderr is cleared if it contains CLIXML (only for encoded commands)
cmd.powershell now accepts lists as commands
Makes sure returned JSON data is valid before trying to load it
Strips whitespace from the stdout in win_runas
This commit is contained in:
Shane Lee 2024-04-26 12:03:45 -06:00 committed by Pedro Algarvio
parent b6a9cc90b6
commit 45c56b0033
7 changed files with 322 additions and 129 deletions

6
changelog/61166.fixed.md Normal file
View file

@ -0,0 +1,6 @@
Fixes multiple issues with the cmd module on Windows. ``stderr`` is no
longer piped to ``stdout`` by default on ``cmd.run``. Scripts are called
using the ``-File`` parameter to the ``powershell.exe`` binary. ``CLIXML``
data in stderr is now removed (only applies to encoded commands). Commands can
now be sent to ``cmd.powershell`` as a list. Makes sure JSON data returned is
valid. Strips whitespace from the return when using ``runas``.

View file

@ -256,32 +256,39 @@ def _check_avail(cmd):
return bret and wret
def _prep_powershell_cmd(shell, cmd, stack, encoded_cmd):
def _prep_powershell_cmd(win_shell, cmd, encoded_cmd):
"""
Prep cmd when shell is powershell
Prep cmd when shell is powershell. If we were called by script(), then fake
out the Windows shell to run a Powershell script. Otherwise, just run a
Powershell command.
"""
# Find the full path to the shell
win_shell = salt.utils.path.which(win_shell)
# If this is running on Windows wrap
# the shell in quotes in case there are
# spaces in the paths.
if salt.utils.platform.is_windows():
shell = f'"{shell}"'
if not win_shell:
raise CommandExecutionError("PowerShell binary not found")
new_cmd = [win_shell, "-NonInteractive", "-NoProfile", "-ExecutionPolicy", "Bypass"]
# extract_stack() returns a list of tuples.
# The last item in the list [-1] is the current method.
# The third item[2] in each tuple is the name of that method.
if stack[-2][2] == "script":
cmd = (
"{} -NonInteractive -NoProfile -ExecutionPolicy Bypass -Command {}".format(
shell, cmd
)
)
stack = traceback.extract_stack(limit=3)
if stack[-3][2] == "script":
# If this is cmd.script, then we're running a file
new_cmd.extend(["-File", f"{cmd}"])
elif encoded_cmd:
cmd = f"{shell} -NonInteractive -NoProfile -EncodedCommand {cmd}"
new_cmd.extend(["-EncodedCommand", f"{cmd}"])
else:
cmd = f'{shell} -NonInteractive -NoProfile -Command "{cmd}"'
# Strip whitespace
if isinstance(cmd, str):
cmd = cmd.strip()
elif isinstance(cmd, list):
cmd = " ".join(cmd).strip()
new_cmd.extend(["-Command", f"& {{{cmd}}}"])
return cmd
log.debug(new_cmd)
return new_cmd
def _run(
@ -384,19 +391,7 @@ def _run(
# The powershell core binary is "pwsh"
# you can also pass a path here as long as the binary name is one of the two
if any(word in shell.lower().strip() for word in ["powershell", "pwsh"]):
# Strip whitespace
if isinstance(cmd, str):
cmd = cmd.strip()
elif isinstance(cmd, list):
cmd = " ".join(cmd).strip()
cmd = cmd.replace('"', '\\"')
# If we were called by script(), then fakeout the Windows
# shell to run a Powershell script.
# Else just run a Powershell command.
stack = traceback.extract_stack(limit=2)
cmd = _prep_powershell_cmd(shell, cmd, stack, encoded_cmd)
cmd = _prep_powershell_cmd(shell, cmd, encoded_cmd)
# munge the cmd and cwd through the template
(cmd, cwd) = _render_cmd(cmd, cwd, template, saltenv, pillarenv, pillar_override)
@ -809,6 +804,9 @@ def _run(
_log_cmd(cmd),
)
# Encoded commands dump CLIXML data in stderr. It's not an actual error
if encoded_cmd and "CLIXML" in err:
err = ""
if rstrip:
if out is not None:
out = out.rstrip()
@ -1055,6 +1053,7 @@ def run(
ignore_retcode=False,
saltenv=None,
use_vt=False,
redirect_stderr=False,
bg=False,
password=None,
encoded_cmd=False,
@ -1190,6 +1189,12 @@ def run(
:param bool use_vt: Use VT utils (saltstack) to stream the command output
more interactively to the console and the logs. This is experimental.
:param bool redirect_stderr: If set to ``True``, then stderr will be
redirected to stdout. This is helpful for cases where obtaining both
the retcode and output is desired.
.. versionadded:: 3006.9
:param bool encoded_cmd: Specify if the supplied command is encoded.
Only applies to shell 'powershell' and 'pwsh'.
@ -1301,6 +1306,7 @@ def run(
salt '*' cmd.run cmd='sed -e s/=/:/g'
"""
python_shell = _python_shell_default(python_shell, kwargs.get("__pub_jid", ""))
stderr = subprocess.STDOUT if redirect_stderr else subprocess.PIPE
ret = _run(
cmd,
runas=runas,
@ -1309,7 +1315,7 @@ def run(
python_shell=python_shell,
cwd=cwd,
stdin=stdin,
stderr=subprocess.STDOUT,
stderr=stderr,
env=env,
clean_env=clean_env,
prepend_path=prepend_path,
@ -4057,6 +4063,9 @@ def powershell(
else:
python_shell = True
if isinstance(cmd, list):
cmd = " ".join(cmd)
# Append PowerShell Object formatting
# ConvertTo-JSON is only available on PowerShell 3.0 and later
psversion = shell_info("powershell")["psversion"]
@ -4085,7 +4094,7 @@ def powershell(
encoded_cmd = False
# Retrieve the response, while overriding shell with 'powershell'
response = run(
response = run_stdout(
cmd,
cwd=cwd,
stdin=stdin,
@ -4113,9 +4122,8 @@ def powershell(
**kwargs,
)
# Sometimes Powershell returns an empty string, which isn't valid JSON
if response == "":
response = "{}"
response = _prep_powershell_json(response)
try:
return salt.utils.json.loads(response)
except Exception: # pylint: disable=broad-except
@ -4419,10 +4427,16 @@ def powershell_all(
else:
python_shell = True
if isinstance(cmd, list):
cmd = " ".join(cmd)
# Append PowerShell Object formatting
cmd += " | ConvertTo-JSON"
if depth is not None:
cmd += f" -Depth {depth}"
# ConvertTo-JSON is only available on PowerShell 3.0 and later
psversion = shell_info("powershell")["psversion"]
if salt.utils.versions.version_cmp(psversion, "2.0") == 1:
cmd += " | ConvertTo-JSON"
if depth is not None:
cmd += f" -Depth {depth}"
if encode_cmd:
# Convert the cmd to UTF-16LE without a BOM and base64 encode.
@ -4474,6 +4488,8 @@ def powershell_all(
response["result"] = []
return response
stdoutput = _prep_powershell_json(stdoutput)
# If we fail to parse stdoutput we will raise an exception
try:
result = salt.utils.json.loads(stdoutput)
@ -4492,9 +4508,30 @@ def powershell_all(
else:
# result type is list so the force_list param has no effect
response["result"] = result
# Encoded commands dump CLIXML data in stderr. It's not an actual error
if "CLIXML" in response["stderr"]:
response["stderr"] = ""
return response
def _prep_powershell_json(text):
"""
Try to fix the output from OutputTo-JSON in powershell commands to make it
valid JSON
"""
# An empty string just needs to be an empty quote
if text == "":
text = '""'
else:
# Raw text needs to be quoted
starts_with = ['"', "{", "["]
if not any(text.startswith(x) for x in starts_with):
text = f'"{text}"'
return text
def run_bg(
cmd,
cwd=None,

View file

@ -52,6 +52,9 @@ def __virtual__():
def split_username(username):
"""
Splits out the username from the domain name and returns both.
"""
domain = "."
user_name = username
if "@" in username:
@ -234,7 +237,7 @@ def runas(cmdLine, username, password=None, cwd=None):
fd_out = msvcrt.open_osfhandle(stdout_read.handle, os.O_RDONLY | os.O_TEXT)
with os.fdopen(fd_out, "r") as f_out:
stdout = f_out.read()
ret["stdout"] = stdout
ret["stdout"] = stdout.strip()
# Read standard error
fd_err = msvcrt.open_osfhandle(stderr_read.handle, os.O_RDONLY | os.O_TEXT)

View file

@ -1,10 +1,7 @@
import base64
import pytest
import salt.modules.cmdmod as cmdmod
import salt.utils.path
import salt.utils.stringutils
pytestmark = [
pytest.mark.windows_whitelisted,
@ -23,83 +20,189 @@ def shell(request):
return request.param
def test_powershell(shell):
@pytest.fixture(scope="module")
def account():
with pytest.helpers.create_account() as _account:
yield _account
@pytest.mark.parametrize(
"cmd, expected, encode_cmd",
[
("Write-Output Foo", "Foo", False),
(["Write-Output", "Foo"], "Foo", False),
('Write-Output "Encoded Foo"', "Encoded Foo", True),
(["Write-Output", '"Encoded Foo"'], "Encoded Foo", True),
],
)
def test_powershell(shell, cmd, expected, encode_cmd):
"""
Test cmd.powershell
"""
ret = cmdmod.powershell("Write-Output foo", shell=shell)
assert ret == "foo"
ret = cmdmod.powershell(cmd=cmd, encode_cmd=encode_cmd, shell=shell)
assert ret == expected
def test_powershell_encode_cmd(shell):
@pytest.mark.parametrize(
"cmd, expected, encode_cmd",
[
("Write-Output Foo", "Foo", False),
(["Write-Output", "Foo"], "Foo", False),
('Write-Output "Encoded Foo"', "Encoded Foo", True),
(["Write-Output", '"Encoded Foo"'], "Encoded Foo", True),
],
)
def test_powershell_runas(shell, account, cmd, expected, encode_cmd):
"""
Test cmd.powershell with encode_cmd
Test cmd.powershell with runas
"""
ret = cmdmod.powershell('Write-Output "encoded foo"', encode_cmd=True, shell=shell)
assert ret == "encoded foo"
ret = cmdmod.powershell(
cmd=cmd,
encode_cmd=encode_cmd,
shell=shell,
runas=account.username,
password=account.password,
)
assert ret == expected
def test_powershell_all(shell):
@pytest.mark.parametrize(
"cmd, expected, encode_cmd",
[
("Write-Output Foo", "Foo", False),
(["Write-Output", "Foo"], "Foo", False),
('Write-Output "Encoded Foo"', "Encoded Foo", True),
(["Write-Output", '"Encoded Foo"'], "Encoded Foo", True),
],
)
def test_powershell_all(shell, cmd, expected, encode_cmd):
"""
Test cmd.powershell_all
Test cmd.powershell_all. `encode_cmd` takes the passed command and encodes
it. Different from encoded_command where it's receiving an already encoded
command
"""
ret = cmdmod.powershell_all("Write-Output foo", shell=shell)
ret = cmdmod.powershell_all(cmd=cmd, encode_cmd=encode_cmd, shell=shell)
assert isinstance(ret["pid"], int)
assert ret["retcode"] == 0
assert ret["stderr"] == ""
assert ret["result"] == "foo"
assert ret["result"] == expected
def test_powershell_all_encode_cmd(shell):
@pytest.mark.parametrize(
"cmd, expected, encode_cmd",
[
("Write-Output Foo", "Foo", False),
(["Write-Output", "Foo"], "Foo", False),
('Write-Output "Encoded Foo"', "Encoded Foo", True),
(["Write-Output", '"Encoded Foo"'], "Encoded Foo", True),
],
)
def test_powershell_all_runas(shell, account, cmd, expected, encode_cmd):
"""
Test cmd.powershell_all with encode_cmd
Test cmd.powershell_all with runas. `encode_cmd` takes the passed command
and encodes it. Different from encoded_command where it's receiving an
already encoded command
"""
ret = cmdmod.powershell_all(
'Write-Output "encoded foo"', encode_cmd=True, shell=shell
cmd=cmd,
encode_cmd=encode_cmd,
shell=shell,
runas=account.username,
password=account.password,
)
assert isinstance(ret["pid"], int)
assert ret["retcode"] == 0
assert ret["stderr"] == ""
assert ret["result"] == "encoded foo"
assert ret["result"] == expected
def test_cmd_run_all_powershell_list():
@pytest.mark.parametrize(
"cmd, expected, encoded_cmd",
[
("Write-Output Foo", "Foo", False),
(["Write-Output", "Foo"], "Foo", False),
(
"VwByAGkAdABlAC0ASABvAHMAdAAgACcARQBuAGMAbwBkAGUAZAAgAEYAbwBvACcA",
"Encoded Foo",
True,
),
],
)
def test_cmd_run_all_powershell(shell, cmd, expected, encoded_cmd):
"""
Ensure that cmd.run_all supports running shell='powershell' with cmd passed
as a list
Ensure that cmd.run_all supports running shell='powershell'
"""
ret = cmdmod.run_all(cmd=cmd, shell=shell, encoded_cmd=encoded_cmd)
assert ret["stdout"] == expected
@pytest.mark.parametrize(
"cmd, expected, encoded_cmd",
[
("Write-Output Foo", "Foo", False),
(["Write-Output", "Foo"], "Foo", False),
(
"VwByAGkAdABlAC0ASABvAHMAdAAgACcARQBuAGMAbwBkAGUAZAAgAEYAbwBvACcA",
"Encoded Foo",
True,
),
],
)
def test_cmd_run_all_powershell_runas(shell, account, cmd, expected, encoded_cmd):
"""
Ensure that cmd.run_all with runas supports running shell='powershell'
"""
ret = cmdmod.run_all(
cmd=["Write-Output", "salt"], python_shell=False, shell="powershell"
cmd=cmd,
shell=shell,
encoded_cmd=encoded_cmd,
runas=account.username,
password=account.password,
)
assert ret["stdout"] == "salt"
assert ret["stdout"] == expected
def test_cmd_run_all_powershell_string():
@pytest.mark.parametrize(
"cmd, expected, encoded_cmd",
[
("Write-Output Foo", "Foo", False),
(["Write-Output", "Foo"], "Foo", False),
(
"VwByAGkAdABlAC0ASABvAHMAdAAgACcARQBuAGMAbwBkAGUAZAAgAEYAbwBvACcA",
"Encoded Foo",
True,
),
],
)
def test_cmd_run_encoded_cmd(shell, cmd, expected, encoded_cmd):
"""
Ensure that cmd.run_all supports running shell='powershell' with cmd passed
as a string
Ensure that cmd.run supports running shell='powershell'
"""
ret = cmdmod.run_all(
cmd="Write-Output salt", python_shell=False, shell="powershell"
ret = cmdmod.run(cmd=cmd, shell=shell, encoded_cmd=encoded_cmd)
assert ret == expected
@pytest.mark.parametrize(
"cmd, expected, encoded_cmd",
[
("Write-Output Foo", "Foo", False),
(["Write-Output", "Foo"], "Foo", False),
(
"VwByAGkAdABlAC0ASABvAHMAdAAgACcARQBuAGMAbwBkAGUAZAAgAEYAbwBvACcA",
"Encoded Foo",
True,
),
],
)
def test_cmd_run_encoded_cmd_runas(shell, account, cmd, expected, encoded_cmd):
"""
Ensure that cmd.run with runas supports running shell='powershell'
"""
ret = cmdmod.run(
cmd=cmd,
shell=shell,
encoded_cmd=encoded_cmd,
runas=account.username,
password=account.password,
)
assert ret["stdout"] == "salt"
def test_cmd_run_encoded_cmd(shell):
cmd = "Write-Output 'encoded command'"
cmd = f"$ProgressPreference='SilentlyContinue'; {cmd}"
cmd_utf16 = cmd.encode("utf-16-le")
encoded_cmd = base64.standard_b64encode(cmd_utf16)
encoded_cmd = salt.utils.stringutils.to_str(encoded_cmd)
ret = cmdmod.run(cmd=encoded_cmd, shell=shell, encoded_cmd=True)
assert ret == "encoded command"
def test_cmd_run_all_encoded_cmd(shell):
cmd = "Write-Output 'encoded command'"
cmd = f"$ProgressPreference='SilentlyContinue'; {cmd}"
cmd_utf16 = cmd.encode("utf-16-le")
encoded_cmd = base64.standard_b64encode(cmd_utf16)
encoded_cmd = salt.utils.stringutils.to_str(encoded_cmd)
ret = cmdmod.run_all(cmd=encoded_cmd, shell=shell, encoded_cmd=True)
assert ret["stdout"] == "encoded command"
assert ret == expected

View file

@ -310,7 +310,7 @@ def test_powershell_empty():
mock_run = {"pid": 1234, "retcode": 0, "stderr": "", "stdout": ""}
with patch("salt.modules.cmdmod._run", return_value=mock_run):
ret = cmdmod.powershell("Set-ExecutionPolicy RemoteSigned")
assert ret == {}
assert ret == ""
def test_is_valid_shell_windows():
@ -1052,57 +1052,97 @@ def test_runas_env_sudo_group(bundled):
)
def test_prep_powershell_cmd_no_powershell():
with pytest.raises(CommandExecutionError):
cmdmod._prep_powershell_cmd(
win_shell="unk_bin", cmd="Some-Command", encoded_cmd=False
)
def test_prep_powershell_cmd():
"""
Tests _prep_powershell_cmd returns correct cmd
"""
with patch("salt.utils.platform.is_windows", MagicMock(return_value=False)):
stack = [["", "", ""], ["", "", ""], ["", "", ""]]
stack = [["", "", ""], ["", "", ""], ["", "", ""], ["", "", ""]]
with patch("traceback.extract_stack", return_value=stack), patch(
"salt.utils.path.which", return_value="C:\\powershell.exe"
):
ret = cmdmod._prep_powershell_cmd(
shell="powershell", cmd="$PSVersionTable", stack=stack, encoded_cmd=False
win_shell="powershell", cmd="$PSVersionTable", encoded_cmd=False
)
assert ret == 'powershell -NonInteractive -NoProfile -Command "$PSVersionTable"'
expected = [
"C:\\powershell.exe",
"-NonInteractive",
"-NoProfile",
"-ExecutionPolicy",
"Bypass",
"-Command",
"& {$PSVersionTable}",
]
assert ret == expected
def test_prep_powershell_cmd_encoded():
"""
Tests _prep_powershell_cmd returns correct cmd when encoded_cmd=True
"""
stack = [["", "", ""], ["", "", ""], ["", "", ""], ["", "", ""]]
# This is the encoded command for 'Write-Host "Encoded HOLO"'
e_cmd = "VwByAGkAdABlAC0ASABvAHMAdAAgACIARQBuAGMAbwBkAGUAZAAgAEgATwBMAE8AIgA="
with patch("traceback.extract_stack", return_value=stack), patch(
"salt.utils.path.which", return_value="C:\\powershell.exe"
):
ret = cmdmod._prep_powershell_cmd(
shell="powershell", cmd="$PSVersionTable", stack=stack, encoded_cmd=True
)
assert (
ret
== "powershell -NonInteractive -NoProfile -EncodedCommand $PSVersionTable"
win_shell="powershell", cmd=e_cmd, encoded_cmd=True
)
expected = [
"C:\\powershell.exe",
"-NonInteractive",
"-NoProfile",
"-ExecutionPolicy",
"Bypass",
"-EncodedCommand",
f"{e_cmd}",
]
assert ret == expected
stack = [["", "", ""], ["", "", "script"], ["", "", ""]]
def test_prep_powershell_cmd_script():
"""
Tests _prep_powershell_cmd returns correct cmd when called from cmd.script
"""
stack = [["", "", ""], ["", "", "script"], ["", "", ""], ["", "", ""]]
script = r"C:\some\script.ps1"
with patch("traceback.extract_stack", return_value=stack), patch(
"salt.utils.path.which", return_value="C:\\powershell.exe"
):
ret = cmdmod._prep_powershell_cmd(
shell="powershell", cmd="$PSVersionTable", stack=stack, encoded_cmd=False
)
assert (
ret
== "powershell -NonInteractive -NoProfile -ExecutionPolicy Bypass -Command $PSVersionTable"
win_shell="powershell", cmd=script, encoded_cmd=False
)
expected = [
"C:\\powershell.exe",
"-NonInteractive",
"-NoProfile",
"-ExecutionPolicy",
"Bypass",
"-File",
script,
]
assert ret == expected
with patch("salt.utils.platform.is_windows", MagicMock(return_value=True)):
stack = [["", "", ""], ["", "", ""], ["", "", ""]]
ret = cmdmod._prep_powershell_cmd(
shell="powershell", cmd="$PSVersionTable", stack=stack, encoded_cmd=False
)
assert (
ret == '"powershell" -NonInteractive -NoProfile -Command "$PSVersionTable"'
)
ret = cmdmod._prep_powershell_cmd(
shell="powershell", cmd="$PSVersionTable", stack=stack, encoded_cmd=True
)
assert (
ret
== '"powershell" -NonInteractive -NoProfile -EncodedCommand $PSVersionTable'
)
stack = [["", "", ""], ["", "", "script"], ["", "", ""]]
ret = cmdmod._prep_powershell_cmd(
shell="powershell", cmd="$PSVersionTable", stack=stack, encoded_cmd=False
)
assert (
ret
== '"powershell" -NonInteractive -NoProfile -ExecutionPolicy Bypass -Command $PSVersionTable'
)
@pytest.mark.parametrize(
"text, expected",
[
("", '""'), # Should quote an empty string
("Foo", '"Foo"'), # Should quote a string
('["foo", "bar"]', '["foo", "bar"]'), # Should leave unchanged
('{"foo": "bar"}', '{"foo": "bar"}'), # Should leave unchanged
],
)
def test_prep_powershell_json(text, expected):
"""
Make sure the output is valid json
"""
result = cmdmod._prep_powershell_json(text)
assert result == expected

View file

@ -329,6 +329,11 @@ class TestAccount:
ret = self.sminion.functions.user.add(self.username)
assert ret is True
self._delete_account = True
if salt.utils.platform.is_windows():
log.debug("Configuring system account: %s", self)
ret = self.sminion.functions.user.update(
self.username, password_never_expires=True
)
if salt.utils.platform.is_darwin() or salt.utils.platform.is_windows():
password = self.password
else:

View file

@ -361,7 +361,6 @@ MISSING_DOCSTRINGS = {
"machine_get_machinestate_tuple",
],
"salt/utils/win_osinfo.py": ["get_os_version_info"],
"salt/utils/win_runas.py": ["split_username"],
"salt/utils/yamldumper.py": [
"represent_undefined",
"represent_ordereddict",