Merge pull request #66521 from s0undt3ch/hotfix/merge-forward-into-3007.x

[3007.x] Merge 3006.x into 3007.x
This commit is contained in:
Pedro Algarvio 2024-05-15 15:25:57 +01:00 committed by GitHub
commit 70324fab55
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
24 changed files with 918 additions and 435 deletions

5
changelog/61166.fixed.md Normal file
View file

@ -0,0 +1,5 @@
Fixes multiple issues with the cmd module on Windows. Scripts are called using
the ``-File`` parameter to the ``powershell.exe`` binary. ``CLIXML`` data in
stderr is now removed (only applies to encoded commands). Commands can now be
sent to ``cmd.powershell`` as a list. Makes sure JSON data returned is valid.
Strips whitespace from the return when using ``runas``.

2
changelog/61534.fixed.md Normal file
View file

@ -0,0 +1,2 @@
Fixed the win_lgpo_netsh salt util to handle non-English systems. This was a
rewrite to use PowerShell instead of netsh to make the changes on the system

1
changelog/65295.fixed.md Normal file
View file

@ -0,0 +1 @@
Fix typo in nftables module to ensure unique nft family values

1
changelog/65837.fixed.md Normal file
View file

@ -0,0 +1 @@
Corrected x509_v2 CRL creation `last_update` and `next_update` values when system timezone is not UTC

1
changelog/66382.fixed.md Normal file
View file

@ -0,0 +1 @@
Fixed nftables.build_rule breaks ipv6 rules by using the wrong syntax for source and destination addresses

View file

@ -256,32 +256,44 @@ def _check_avail(cmd):
return bret and wret
def _prep_powershell_cmd(shell, cmd, stack, encoded_cmd):
def _prep_powershell_cmd(win_shell, cmd, encoded_cmd):
"""
Prep cmd when shell is powershell
Prep cmd when shell is powershell. If we were called by script(), then fake
out the Windows shell to run a Powershell script. Otherwise, just run a
Powershell command.
"""
# Find the full path to the shell
win_shell = salt.utils.path.which(win_shell)
# If this is running on Windows wrap
# the shell in quotes in case there are
# spaces in the paths.
if salt.utils.platform.is_windows():
shell = f'"{shell}"'
if not win_shell:
raise CommandExecutionError("PowerShell binary not found")
new_cmd = [win_shell, "-NonInteractive", "-NoProfile", "-ExecutionPolicy", "Bypass"]
# extract_stack() returns a list of tuples.
# The last item in the list [-1] is the current method.
# The third item[2] in each tuple is the name of that method.
if stack[-2][2] == "script":
cmd = (
"{} -NonInteractive -NoProfile -ExecutionPolicy Bypass -Command {}".format(
shell, cmd
)
)
stack = traceback.extract_stack(limit=3)
if stack[-3][2] == "script":
# If this is cmd.script, then we're running a file
# You might be tempted to use -File here instead of -Command
# The problem with using -File is that any arguments that contain
# powershell commands themselves will not be evaluated
# See GitHub issue #56195
new_cmd.append("-Command")
if isinstance(cmd, list):
cmd = " ".join(cmd)
new_cmd.append(f"& {cmd.strip()}")
elif encoded_cmd:
cmd = f"{shell} -NonInteractive -NoProfile -EncodedCommand {cmd}"
new_cmd.extend(["-EncodedCommand", f"{cmd}"])
else:
cmd = f'{shell} -NonInteractive -NoProfile -Command "{cmd}"'
# Strip whitespace
if isinstance(cmd, list):
cmd = " ".join(cmd)
new_cmd.extend(["-Command", f"& {{{cmd.strip()}}}"])
return cmd
log.debug(new_cmd)
return new_cmd
def _run(
@ -384,19 +396,7 @@ def _run(
# The powershell core binary is "pwsh"
# you can also pass a path here as long as the binary name is one of the two
if any(word in shell.lower().strip() for word in ["powershell", "pwsh"]):
# Strip whitespace
if isinstance(cmd, str):
cmd = cmd.strip()
elif isinstance(cmd, list):
cmd = " ".join(cmd).strip()
cmd = cmd.replace('"', '\\"')
# If we were called by script(), then fakeout the Windows
# shell to run a Powershell script.
# Else just run a Powershell command.
stack = traceback.extract_stack(limit=2)
cmd = _prep_powershell_cmd(shell, cmd, stack, encoded_cmd)
cmd = _prep_powershell_cmd(shell, cmd, encoded_cmd)
# munge the cmd and cwd through the template
(cmd, cwd) = _render_cmd(cmd, cwd, template, saltenv, pillarenv, pillar_override)
@ -809,6 +809,9 @@ def _run(
_log_cmd(cmd),
)
# Encoded commands dump CLIXML data in stderr. It's not an actual error
if encoded_cmd and "CLIXML" in err:
err = ""
if rstrip:
if out is not None:
out = out.rstrip()
@ -1055,6 +1058,7 @@ def run(
ignore_retcode=False,
saltenv=None,
use_vt=False,
redirect_stderr=True,
bg=False,
password=None,
encoded_cmd=False,
@ -1190,6 +1194,12 @@ def run(
:param bool use_vt: Use VT utils (saltstack) to stream the command output
more interactively to the console and the logs. This is experimental.
:param bool redirect_stderr: If set to ``True``, then stderr will be
redirected to stdout. This is helpful for cases where obtaining both
the retcode and output is desired. Default is ``True``
.. versionadded:: 3006.9
:param bool encoded_cmd: Specify if the supplied command is encoded.
Only applies to shell 'powershell' and 'pwsh'.
@ -1301,6 +1311,7 @@ def run(
salt '*' cmd.run cmd='sed -e s/=/:/g'
"""
python_shell = _python_shell_default(python_shell, kwargs.get("__pub_jid", ""))
stderr = subprocess.STDOUT if redirect_stderr else subprocess.PIPE
ret = _run(
cmd,
runas=runas,
@ -1309,7 +1320,7 @@ def run(
python_shell=python_shell,
cwd=cwd,
stdin=stdin,
stderr=subprocess.STDOUT,
stderr=stderr,
env=env,
clean_env=clean_env,
prepend_path=prepend_path,
@ -4057,6 +4068,9 @@ def powershell(
else:
python_shell = True
if isinstance(cmd, list):
cmd = " ".join(cmd)
# Append PowerShell Object formatting
# ConvertTo-JSON is only available on PowerShell 3.0 and later
psversion = shell_info("powershell")["psversion"]
@ -4085,7 +4099,7 @@ def powershell(
encoded_cmd = False
# Retrieve the response, while overriding shell with 'powershell'
response = run(
response = run_stdout(
cmd,
cwd=cwd,
stdin=stdin,
@ -4113,9 +4127,8 @@ def powershell(
**kwargs,
)
# Sometimes Powershell returns an empty string, which isn't valid JSON
if response == "":
response = "{}"
response = _prep_powershell_json(response)
try:
return salt.utils.json.loads(response)
except Exception: # pylint: disable=broad-except
@ -4419,10 +4432,16 @@ def powershell_all(
else:
python_shell = True
if isinstance(cmd, list):
cmd = " ".join(cmd)
# Append PowerShell Object formatting
cmd += " | ConvertTo-JSON"
if depth is not None:
cmd += f" -Depth {depth}"
# ConvertTo-JSON is only available on PowerShell 3.0 and later
psversion = shell_info("powershell")["psversion"]
if salt.utils.versions.version_cmp(psversion, "2.0") == 1:
cmd += " | ConvertTo-JSON"
if depth is not None:
cmd += f" -Depth {depth}"
if encode_cmd:
# Convert the cmd to UTF-16LE without a BOM and base64 encode.
@ -4474,6 +4493,8 @@ def powershell_all(
response["result"] = []
return response
stdoutput = _prep_powershell_json(stdoutput)
# If we fail to parse stdoutput we will raise an exception
try:
result = salt.utils.json.loads(stdoutput)
@ -4492,9 +4513,30 @@ def powershell_all(
else:
# result type is list so the force_list param has no effect
response["result"] = result
# Encoded commands dump CLIXML data in stderr. It's not an actual error
if "CLIXML" in response["stderr"]:
response["stderr"] = ""
return response
def _prep_powershell_json(text):
"""
Try to fix the output from OutputTo-JSON in powershell commands to make it
valid JSON
"""
# An empty string just needs to be an empty quote
if text == "":
text = '""'
else:
# Raw text needs to be quoted
starts_with = ['"', "{", "["]
if not any(text.startswith(x) for x in starts_with):
text = f'"{text}"'
return text
def run_bg(
cmd,
cwd=None,

View file

@ -165,14 +165,18 @@ def build_rule(
del kwargs["counter"]
if "saddr" in kwargs or "source" in kwargs:
rule += "ip saddr {} ".format(kwargs.get("saddr") or kwargs.get("source"))
rule += "{} saddr {} ".format(
nft_family, kwargs.get("saddr") or kwargs.get("source")
)
if "saddr" in kwargs:
del kwargs["saddr"]
if "source" in kwargs:
del kwargs["source"]
if "daddr" in kwargs or "destination" in kwargs:
rule += "ip daddr {} ".format(kwargs.get("daddr") or kwargs.get("destination"))
rule += "{} daddr {} ".format(
nft_family, kwargs.get("daddr") or kwargs.get("destination")
)
if "daddr" in kwargs:
del kwargs["daddr"]
if "destination" in kwargs:

View file

@ -450,7 +450,9 @@ def get_config():
raise
config = dict()
if raw_config:
if not raw_config:
raise CommandExecutionError("Not Configured")
else:
# Does this Configuration contain a single resource
if "ConfigurationName" in raw_config:
# Load the single resource
@ -606,11 +608,13 @@ def test_config():
"""
cmd = "Test-DscConfiguration"
try:
_pshell(cmd, ignore_retcode=True)
result = _pshell(cmd, ignore_retcode=True)
except CommandExecutionError as exc:
if "Current configuration does not exist" in exc.info["stderr"]:
raise CommandExecutionError("Not Configured")
raise
if not result:
raise CommandExecutionError("Not Configured")
return True
@ -635,11 +639,14 @@ def get_config_status():
"Type, Mode, RebootRequested, NumberofResources"
)
try:
return _pshell(cmd, ignore_retcode=True)
result = _pshell(cmd, ignore_retcode=True)
except CommandExecutionError as exc:
if "No status information available" in exc.info["stderr"]:
raise CommandExecutionError("Not Configured")
raise
if not result:
raise CommandExecutionError("Not Configured")
return result
def get_lcm_config():

View file

@ -168,12 +168,12 @@ Note that when a ``ca_server`` is involved, both peers must use the updated modu
import base64
import copy
import datetime
import glob
import logging
import os.path
import re
import sys
from datetime import datetime, timedelta, timezone
try:
import cryptography.x509 as cx509
@ -1409,10 +1409,12 @@ def expires(certificate, days=0):
Defaults to ``0``, which checks for the current time.
"""
cert = x509util.load_cert(certificate)
# dates are encoded in UTC/GMT, they are returned as a naive datetime object
return cert.not_valid_after <= datetime.datetime.utcnow() + datetime.timedelta(
days=days
)
try:
not_after = cert.not_valid_after_utc
except AttributeError:
# naive datetime object, release <42 (it's always UTC)
not_after = cert.not_valid_after.replace(tzinfo=timezone.utc)
return not_after <= datetime.now(tz=timezone.utc) + timedelta(days=days)
def expired(certificate):
@ -1692,6 +1694,13 @@ def read_certificate(certificate):
cert = x509util.load_cert(certificate)
key_type = x509util.get_key_type(cert.public_key(), as_string=True)
try:
not_before = cert.not_valid_before_utc
not_after = cert.not_valid_after_utc
except AttributeError:
# naive datetime object, release <42 (it's always UTC)
not_before = cert.not_valid_before.replace(tzinfo=timezone.utc)
not_after = cert.not_valid_after.replace(tzinfo=timezone.utc)
ret = {
"version": cert.version.value + 1, # 0-indexed
"key_size": cert.public_key().key_size if key_type in ["ec", "rsa"] else None,
@ -1707,8 +1716,8 @@ def read_certificate(certificate):
"issuer": _parse_dn(cert.issuer),
"issuer_hash": x509util.pretty_hex(_get_name_hash(cert.issuer)),
"issuer_str": cert.issuer.rfc4514_string(),
"not_before": cert.not_valid_before.strftime(x509util.TIME_FMT),
"not_after": cert.not_valid_after.strftime(x509util.TIME_FMT),
"not_before": not_before.strftime(x509util.TIME_FMT),
"not_after": not_after.strftime(x509util.TIME_FMT),
"public_key": get_public_key(cert),
"extensions": _parse_extensions(cert.extensions),
}
@ -1774,10 +1783,16 @@ def read_crl(crl):
The certificate revocation list to read.
"""
crl = x509util.load_crl(crl)
try:
last_update = crl.last_update_utc
next_update = crl.next_update_utc
except AttributeError:
last_update = crl.last_update.replace(tzinfo=timezone.utc)
next_update = crl.next_update.replace(tzinfo=timezone.utc)
ret = {
"issuer": _parse_dn(crl.issuer),
"last_update": crl.last_update.strftime(x509util.TIME_FMT),
"next_update": crl.next_update.strftime(x509util.TIME_FMT),
"last_update": last_update.strftime(x509util.TIME_FMT),
"next_update": next_update.strftime(x509util.TIME_FMT),
"revoked_certificates": {},
"extensions": _parse_extensions(crl.extensions),
}
@ -1797,12 +1812,15 @@ def read_crl(crl):
ret["signature_algorithm"] = crl.signature_algorithm_oid.dotted_string
for revoked in crl:
try:
revocation_date = revoked.revocation_date_utc
except AttributeError:
# naive datetime object, release <42 (it's always UTC)
revocation_date = revoked.revocation_date.replace(tzinfo=timezone.utc)
ret["revoked_certificates"].update(
{
x509util.dec2hex(revoked.serial_number).replace(":", ""): {
"revocation_date": revoked.revocation_date.strftime(
x509util.TIME_FMT
),
"revocation_date": revocation_date.strftime(x509util.TIME_FMT),
"extensions": _parse_crl_entry_extensions(revoked.extensions),
}
}

View file

@ -183,9 +183,9 @@ according to the www policy.
import base64
import copy
import datetime
import logging
import os.path
from datetime import datetime, timedelta, timezone
import salt.utils.files
from salt.exceptions import CommandExecutionError, SaltInvocationError
@ -487,11 +487,16 @@ def certificate_managed(
else None
):
changes["pkcs12_friendlyname"] = pkcs12_friendlyname
try:
curr_not_after = current.not_valid_after_utc
except AttributeError:
# naive datetime object, release <42 (it's always UTC)
curr_not_after = current.not_valid_after.replace(
tzinfo=timezone.utc
)
if (
current.not_valid_after
< datetime.datetime.utcnow()
+ datetime.timedelta(days=days_remaining)
if curr_not_after < datetime.now(tz=timezone.utc) + timedelta(
days=days_remaining
):
changes["expiration"] = True
@ -896,10 +901,14 @@ def crl_managed(
if encoding != current_encoding:
changes["encoding"] = encoding
try:
curr_next_update = current.next_update_utc
except AttributeError:
# naive datetime object, release <42 (it's always UTC)
curr_next_update = current.next_update.replace(tzinfo=timezone.utc)
if days_remaining and (
current.next_update
< datetime.datetime.utcnow()
+ datetime.timedelta(days=days_remaining)
curr_next_update
< datetime.now(tz=timezone.utc) + timedelta(days=days_remaining)
):
changes["expiration"] = True

View file

@ -6,16 +6,24 @@ A salt util for modifying firewall settings.
This util allows you to modify firewall settings in the local group policy in
addition to the normal firewall settings. Parameters are taken from the
netsh advfirewall prompt.
netsh advfirewall prompt. This utility has been adapted to use powershell
instead of the ``netsh`` command to make it compatible with non-English systems.
It maintains the ``netsh`` commands and parameters, but it is using powershell
under the hood.
.. versionchanged:: 3008.0
.. note::
More information can be found in the advfirewall context in netsh. This can
be access by opening a netsh prompt. At a command prompt type the following:
be accessed by opening a netsh prompt. At a command prompt type the
following:
c:\>netsh
netsh>advfirewall
netsh advfirewall>set help
netsh advfirewall>set domain help
.. code-block:: powershell
c:\>netsh
netsh>advfirewall
netsh advfirewall>set help
netsh advfirewall>set domain help
Usage:
@ -66,87 +74,73 @@ Usage:
store='lgpo')
"""
import logging
import os
import re
import socket
import tempfile
from textwrap import dedent
import salt.modules.cmdmod
import salt.utils.platform
import salt.utils.win_pwsh
from salt.exceptions import CommandExecutionError
log = logging.getLogger(__name__)
__hostname__ = socket.gethostname()
__virtualname__ = "netsh"
ON_OFF = {
0: "OFF",
1: "ON",
2: "NotConfigured",
"off": "False",
"on": "True",
"notconfigured": "NotConfigured",
}
ENABLE_DISABLE = {
0: "Disable",
1: "Enable",
2: "NotConfigured",
"disable": 0,
"enable": 1,
"notconfigured": 2,
}
OUTBOUND = {
0: "NotConfigured",
2: "AllowOutbound",
4: "BlockOutbound",
"notconfigured": "NotConfigured",
"allowoutbound": "Allow",
"blockoutbound": "Block",
}
# Although utils are often directly imported, it is also possible to use the
# loader.
def __virtual__():
def _get_inbound_text(rule, action):
"""
Only load if on a Windows system
The "Inbound connections" setting is a combination of 2 parameters:
- AllowInboundRules
- DefaultInboundAction
The settings are as follows:
Rules Action
2 2 AllowInbound
2 4 BlockInbound
0 4 BlockInboundAlways
2 0 NotConfigured
"""
if not salt.utils.platform.is_windows():
return False, "This utility only available on Windows"
return __virtualname__
settings = {
0: {
4: "BlockInboundAlways",
},
2: {
0: "NotConfigured",
2: "AllowInbound",
4: "BlockInbound",
},
}
return settings[rule][action]
def _netsh_file(content):
"""
helper function to get the results of ``netsh -f content.txt``
Running ``netsh`` will drop you into a ``netsh`` prompt where you can issue
``netsh`` commands. You can put a series of commands in an external file and
run them as if from a ``netsh`` prompt using the ``-f`` switch. That's what
this function does.
Args:
content (str):
The contents of the file that will be run by the ``netsh -f``
command
Returns:
str: The text returned by the netsh command
"""
with tempfile.NamedTemporaryFile(
mode="w", prefix="salt-", suffix=".netsh", delete=False, encoding="utf-8"
) as fp:
fp.write(content)
try:
log.debug("%s:\n%s", fp.name, content)
return salt.modules.cmdmod.run(f"netsh -f {fp.name}", python_shell=True)
finally:
os.remove(fp.name)
def _netsh_command(command, store):
if store.lower() not in ("local", "lgpo"):
raise ValueError(f"Incorrect store: {store}")
# set the store for local or lgpo
if store.lower() == "local":
netsh_script = dedent(
"""\
advfirewall
set store local
{}
""".format(
command
)
)
else:
netsh_script = dedent(
"""\
advfirewall
set store gpo = {}
{}
""".format(
__hostname__, command
)
)
return _netsh_file(content=netsh_script).splitlines()
def _get_inbound_settings(text):
settings = {
"allowinbound": (2, 2),
"blockinbound": (2, 4),
"blockinboundalways": (0, 4),
"notconfigured": (2, 0),
}
return settings[text.lower()]
def get_settings(profile, section, store="local"):
@ -195,70 +189,54 @@ def get_settings(profile, section, store="local"):
raise ValueError(f"Incorrect section: {section}")
if store.lower() not in ("local", "lgpo"):
raise ValueError(f"Incorrect store: {store}")
command = f"show {profile}profile {section}"
# run it
results = _netsh_command(command=command, store=store)
# sample output:
# Domain Profile Settings:
# ----------------------------------------------------------------------
# LocalFirewallRules N/A (GPO-store only)
# LocalConSecRules N/A (GPO-store only)
# InboundUserNotification Disable
# RemoteManagement Disable
# UnicastResponseToMulticast Enable
# if it's less than 3 lines it failed
if len(results) < 3:
raise CommandExecutionError(f"Invalid results: {results}")
ret = {}
# Skip the first 2 lines. Add everything else to a dictionary
for line in results[3:]:
ret.update(dict(list(zip(*[iter(re.split(r"\s{2,}", line))] * 2))))
# Build the powershell command
cmd = ["Get-NetFirewallProfile"]
if profile:
cmd.append(profile)
if store and store.lower() == "lgpo":
cmd.extend(["-PolicyStore", "localhost"])
# Remove spaces from the values so that `Not Configured` is detected
# correctly
for item in ret:
ret[item] = ret[item].replace(" ", "")
# Run the command
settings = salt.utils.win_pwsh.run_dict(cmd)
# special handling for firewallpolicy
if section == "firewallpolicy":
inbound, outbound = ret["Firewall Policy"].split(",")
return {"Inbound": inbound, "Outbound": outbound}
# A successful run should return a dictionary
if not settings:
raise CommandExecutionError("LGPO NETSH: An unknown error occurred")
return ret
# Remove the junk
for setting in list(settings.keys()):
if setting.startswith("Cim"):
settings.pop(setting)
# Make it look like netsh output
ret_settings = {
"firewallpolicy": {
"Inbound": _get_inbound_text(
settings["AllowInboundRules"], settings["DefaultInboundAction"]
),
"Outbound": OUTBOUND[settings["DefaultOutboundAction"]],
},
"state": {
"State": ON_OFF[settings["Enabled"]],
},
"logging": {
"FileName": settings["LogFileName"],
"LogAllowedConnections": ENABLE_DISABLE[settings["LogAllowed"]],
"LogDroppedConnections": ENABLE_DISABLE[settings["LogBlocked"]],
"MaxFileSize": settings["LogMaxSizeKilobytes"],
},
"settings": {
"InboundUserNotification": ENABLE_DISABLE[settings["NotifyOnListen"]],
"LocalConSecRules": ENABLE_DISABLE[settings["AllowLocalIPsecRules"]],
"LocalFirewallRules": ENABLE_DISABLE[settings["AllowLocalFirewallRules"]],
"UnicastResponseToMulticast": ENABLE_DISABLE[
settings["AllowUnicastResponseToMulticast"]
],
},
}
def get_all_settings(profile, store="local"):
"""
Gets all the properties for the specified profile in the specified store
Args:
profile (str):
The firewall profile to query. Valid options are:
- domain
- public
- private
store (str):
The store to use. This is either the local firewall policy or the
policy defined by local group policy. Valid options are:
- lgpo
- local
Default is ``local``
Returns:
dict: A dictionary containing the specified settings
"""
ret = dict()
ret.update(get_settings(profile=profile, section="state", store=store))
ret.update(get_settings(profile=profile, section="firewallpolicy", store=store))
ret.update(get_settings(profile=profile, section="settings", store=store))
ret.update(get_settings(profile=profile, section="logging", store=store))
return ret
return ret_settings[section.lower()]
def get_all_profiles(store="local"):
@ -286,6 +264,82 @@ def get_all_profiles(store="local"):
}
def get_all_settings(profile, store="local"):
"""
Gets all the properties for the specified profile in the specified store
Args:
profile (str):
The firewall profile to query. Valid options are:
- domain
- public
- private
store (str):
The store to use. This is either the local firewall policy or the
policy defined by local group policy. Valid options are:
- lgpo
- local
Default is ``local``
Returns:
dict: A dictionary containing the specified settings
Raises:
CommandExecutionError: If an error occurs
ValueError: If the parameters are incorrect
"""
# validate input
if profile.lower() not in ("domain", "public", "private"):
raise ValueError(f"Incorrect profile: {profile}")
if store.lower() not in ("local", "lgpo"):
raise ValueError(f"Incorrect store: {store}")
# Build the powershell command
cmd = ["Get-NetFirewallProfile"]
if profile:
cmd.append(profile)
if store and store.lower() == "lgpo":
cmd.extend(["-PolicyStore", "localhost"])
# Run the command
settings = salt.utils.win_pwsh.run_dict(cmd)
# A successful run should return a dictionary
if not settings:
raise CommandExecutionError("LGPO NETSH: An unknown error occurred")
# Remove the junk
for setting in list(settings.keys()):
if setting.startswith("Cim"):
settings.pop(setting)
# Make it look like netsh output
ret_settings = {
"FileName": settings["LogFileName"],
"Inbound": _get_inbound_text(
settings["AllowInboundRules"], settings["DefaultInboundAction"]
),
"InboundUserNotification": ENABLE_DISABLE[settings["NotifyOnListen"]],
"LocalConSecRules": ENABLE_DISABLE[settings["AllowLocalIPsecRules"]],
"LocalFirewallRules": ENABLE_DISABLE[settings["AllowLocalFirewallRules"]],
"LogAllowedConnections": ENABLE_DISABLE[settings["LogAllowed"]],
"LogDroppedConnections": ENABLE_DISABLE[settings["LogBlocked"]],
"MaxFileSize": settings["LogMaxSizeKilobytes"],
"Outbound": OUTBOUND[settings["DefaultOutboundAction"]],
"State": ON_OFF[settings["Enabled"]],
"UnicastResponseToMulticast": ON_OFF[
settings["AllowUnicastResponseToMulticast"]
],
}
return ret_settings
def set_firewall_settings(profile, inbound=None, outbound=None, store="local"):
"""
Set the firewall inbound/outbound settings for the specified profile and
@ -307,7 +361,7 @@ def set_firewall_settings(profile, inbound=None, outbound=None, store="local"):
- blockinbound
- blockinboundalways
- allowinbound
- notconfigured
- notconfigured <=== lgpo only
Default is ``None``
@ -317,7 +371,7 @@ def set_firewall_settings(profile, inbound=None, outbound=None, store="local"):
- allowoutbound
- blockoutbound
- notconfigured
- notconfigured <=== lgpo only
Default is ``None``
@ -355,21 +409,34 @@ def set_firewall_settings(profile, inbound=None, outbound=None, store="local"):
raise ValueError(f"Incorrect outbound value: {outbound}")
if not inbound and not outbound:
raise ValueError("Must set inbound or outbound")
if store == "local":
if inbound and inbound.lower() == "notconfigured":
msg = "Cannot set local inbound policies as NotConfigured"
raise CommandExecutionError(msg)
if outbound and outbound.lower() == "notconfigured":
msg = "Cannot set local outbound policies as NotConfigured"
raise CommandExecutionError(msg)
# You have to specify inbound and outbound setting at the same time
# If you're only specifying one, you have to get the current setting for the
# other
if not inbound or not outbound:
ret = get_settings(profile=profile, section="firewallpolicy", store=store)
if not inbound:
inbound = ret["Inbound"]
if not outbound:
outbound = ret["Outbound"]
# Build the powershell command
cmd = ["Set-NetFirewallProfile"]
if profile:
cmd.append(profile)
if store and store.lower() == "lgpo":
cmd.extend(["-PolicyStore", "localhost"])
command = f"set {profile}profile firewallpolicy {inbound},{outbound}"
# Get inbound settings
if inbound:
in_rule, in_action = _get_inbound_settings(inbound.lower())
cmd.extend(["-AllowInboundRules", in_rule, "-DefaultInboundAction", in_action])
results = _netsh_command(command=command, store=store)
if outbound:
out_rule = OUTBOUND[outbound.lower()]
cmd.extend(["-DefaultOutboundAction", out_rule])
# Run the command
results = salt.utils.win_pwsh.run_dict(cmd)
# A successful run should return an empty list
if results:
raise CommandExecutionError(f"An error occurred: {results}")
@ -442,6 +509,10 @@ def set_logging_settings(profile, setting, value, store="local"):
# Input validation
if profile.lower() not in ("domain", "public", "private"):
raise ValueError(f"Incorrect profile: {profile}")
if store == "local":
if str(value).lower() == "notconfigured":
msg = "Cannot set local policies as NotConfigured"
raise CommandExecutionError(msg)
if setting.lower() not in (
"allowedconnections",
"droppedconnections",
@ -449,13 +520,21 @@ def set_logging_settings(profile, setting, value, store="local"):
"maxfilesize",
):
raise ValueError(f"Incorrect setting: {setting}")
settings = {"filename": ["-LogFileName", value]}
if setting.lower() in ("allowedconnections", "droppedconnections"):
if value.lower() not in ("enable", "disable", "notconfigured"):
raise ValueError(f"Incorrect value: {value}")
settings.update(
{
"allowedconnections": ["-LogAllowed", ENABLE_DISABLE[value.lower()]],
"droppedconnections": ["-LogBlocked", ENABLE_DISABLE[value.lower()]],
}
)
# TODO: Consider adding something like the following to validate filename
# https://stackoverflow.com/questions/9532499/check-whether-a-path-is-valid-in-python-without-creating-a-file-at-the-paths-ta
if setting.lower() == "maxfilesize":
if value.lower() != "notconfigured":
if str(value).lower() != "notconfigured":
# Must be a number between 1 and 32767
try:
int(value)
@ -463,9 +542,18 @@ def set_logging_settings(profile, setting, value, store="local"):
raise ValueError(f"Incorrect value: {value}")
if not 1 <= int(value) <= 32767:
raise ValueError(f"Incorrect value: {value}")
# Run the command
command = f"set {profile}profile logging {setting} {value}"
results = _netsh_command(command=command, store=store)
settings.update({"maxfilesize": ["-LogMaxSizeKilobytes", value]})
# Build the powershell command
cmd = ["Set-NetFirewallProfile"]
if profile:
cmd.append(profile)
if store and store.lower() == "lgpo":
cmd.extend(["-PolicyStore", "localhost"])
cmd.extend(settings[setting.lower()])
results = salt.utils.win_pwsh.run_dict(cmd)
# A successful run should return an empty list
if results:
@ -493,7 +581,6 @@ def set_settings(profile, setting, value, store="local"):
- localfirewallrules
- localconsecrules
- inboundusernotification
- remotemanagement
- unicastresponsetomulticast
value (str):
@ -526,16 +613,42 @@ def set_settings(profile, setting, value, store="local"):
"localfirewallrules",
"localconsecrules",
"inboundusernotification",
"remotemanagement",
"unicastresponsetomulticast",
):
raise ValueError(f"Incorrect setting: {setting}")
if value.lower() not in ("enable", "disable", "notconfigured"):
raise ValueError(f"Incorrect value: {value}")
if setting.lower() in ["localfirewallrules", "localconsecrules"]:
if store.lower() != "lgpo":
msg = f"{setting} can only be set using Group Policy"
raise CommandExecutionError(msg)
if setting.lower() == "inboundusernotification" and store.lower() != "lgpo":
if value.lower() == "notconfigured":
msg = "NotConfigured is only valid when setting group policy"
raise CommandExecutionError(msg)
# Run the command
command = f"set {profile}profile settings {setting} {value}"
results = _netsh_command(command=command, store=store)
# Build the powershell command
cmd = ["Set-NetFirewallProfile"]
if profile:
cmd.append(profile)
if store and store.lower() == "lgpo":
cmd.extend(["-PolicyStore", "localhost"])
settings = {
"localfirewallrules": [
"-AllowLocalFirewallRules",
ENABLE_DISABLE[value.lower()],
],
"localconsecrules": ["-AllowLocalIPsecRules", ENABLE_DISABLE[value.lower()]],
"inboundusernotification": ["-NotifyOnListen", ENABLE_DISABLE[value.lower()]],
"unicastresponsetomulticast": [
"-AllowUnicastResponseToMulticast",
ENABLE_DISABLE[value.lower()],
],
}
cmd.extend(settings[setting.lower()])
results = salt.utils.win_pwsh.run_dict(cmd)
# A successful run should return an empty list
if results:
@ -546,7 +659,7 @@ def set_settings(profile, setting, value, store="local"):
def set_state(profile, state, store="local"):
"""
Configure the firewall state.
Enable or disable the firewall profile.
Args:
@ -583,12 +696,22 @@ def set_state(profile, state, store="local"):
# Input validation
if profile.lower() not in ("domain", "public", "private"):
raise ValueError(f"Incorrect profile: {profile}")
if state.lower() not in ("on", "off", "notconfigured"):
raise ValueError(f"Incorrect state: {state}")
if not isinstance(state, bool):
if state.lower() not in ("on", "off", "notconfigured"):
raise ValueError(f"Incorrect state: {state}")
else:
state = "On" if state else "Off"
# Run the command
command = f"set {profile}profile state {state}"
results = _netsh_command(command=command, store=store)
# Build the powershell command
cmd = ["Set-NetFirewallProfile"]
if profile:
cmd.append(profile)
if store and store.lower() == "lgpo":
cmd.extend(["-PolicyStore", "localhost"])
cmd.extend(["-Enabled", ON_OFF[state.lower()]])
results = salt.utils.win_pwsh.run_dict(cmd)
# A successful run should return an empty list
if results:

View file

@ -3,26 +3,16 @@ import salt.utils.json
import salt.utils.platform
from salt.exceptions import CommandExecutionError
__virtualname__ = "win_pwsh"
def __virtual__():
"""
Only load if windows
"""
if not salt.utils.platform.is_windows():
return False, "This utility will only run on Windows"
return __virtualname__
def run_dict(cmd, cwd=None):
"""
Execute the powershell command and return the data as a dictionary
.. versionadded:: 3006.9
Args:
cmd (str): The powershell command to run
cmd (str,list): The powershell command to run
cwd (str): The current working directory
@ -34,6 +24,8 @@ def run_dict(cmd, cwd=None):
If an error is encountered or the command does not complete
successfully
"""
if isinstance(cmd, list):
cmd = " ".join(map(str, cmd))
if "convertto-json" not in cmd.lower():
cmd = f"{cmd} | ConvertTo-Json"
if "progresspreference" not in cmd.lower():

View file

@ -52,6 +52,9 @@ def __virtual__():
def split_username(username):
"""
Splits out the username from the domain name and returns both.
"""
domain = "."
user_name = username
if "@" in username:
@ -234,7 +237,7 @@ def runas(cmdLine, username, password=None, cwd=None):
fd_out = msvcrt.open_osfhandle(stdout_read.handle, os.O_RDONLY | os.O_TEXT)
with os.fdopen(fd_out, "r") as f_out:
stdout = f_out.read()
ret["stdout"] = stdout
ret["stdout"] = stdout.strip()
# Read standard error
fd_err = msvcrt.open_osfhandle(stderr_read.handle, os.O_RDONLY | os.O_TEXT)

View file

@ -1,10 +1,10 @@
import base64
import copy
import datetime
import ipaddress
import logging
import os.path
import re
from datetime import datetime, timedelta, timezone
from enum import Enum
from urllib.parse import urlparse, urlunparse
@ -313,14 +313,14 @@ def build_crt(
)
not_before = (
datetime.datetime.strptime(not_before, TIME_FMT)
datetime.strptime(not_before, TIME_FMT).replace(tzinfo=timezone.utc)
if not_before
else datetime.datetime.utcnow()
else datetime.now(tz=timezone.utc)
)
not_after = (
datetime.datetime.strptime(not_after, TIME_FMT)
datetime.strptime(not_after, TIME_FMT).replace(tzinfo=timezone.utc)
if not_after
else datetime.datetime.utcnow() + datetime.timedelta(days=days_valid)
else datetime.now(tz=timezone.utc) + timedelta(days=days_valid)
)
builder = builder.not_valid_before(not_before).not_valid_after(not_after)
@ -422,32 +422,38 @@ def build_crl(
builder = cx509.CertificateRevocationListBuilder()
if signing_cert:
builder = builder.issuer_name(signing_cert.subject)
builder = builder.last_update(datetime.datetime.today())
builder = builder.last_update(datetime.now(tz=timezone.utc))
builder = builder.next_update(
datetime.datetime.today() + datetime.timedelta(days=days_valid)
datetime.now(tz=timezone.utc) + timedelta(days=days_valid)
)
for rev in revoked:
serial_number = not_after = revocation_date = None
if "not_after" in rev:
not_after = datetime.datetime.strptime(rev["not_after"], TIME_FMT)
not_after = datetime.strptime(rev["not_after"], TIME_FMT).replace(
tzinfo=timezone.utc
)
if "serial_number" in rev:
serial_number = rev["serial_number"]
if "certificate" in rev:
rev_cert = load_cert(rev["certificate"])
serial_number = rev_cert.serial_number
not_after = rev_cert.not_valid_after
try:
not_after = rev_cert.not_valid_after_utc
except AttributeError:
# naive datetime object, release <42 (it's always UTC)
not_after = rev_cert.not_valid_after.replace(tzinfo=timezone.utc)
if not serial_number:
raise SaltInvocationError("Need serial_number or certificate")
serial_number = _get_serial_number(serial_number)
if not_after and not include_expired:
if datetime.datetime.utcnow() > not_after:
if datetime.now(tz=timezone.utc) > not_after:
continue
if "revocation_date" in rev:
revocation_date = datetime.datetime.strptime(
revocation_date = datetime.strptime(
rev["revocation_date"], TIME_FMT
)
).replace(tzinfo=timezone.utc)
else:
revocation_date = datetime.datetime.utcnow()
revocation_date = datetime.now(tz=timezone.utc)
revoked_cert = cx509.RevokedCertificateBuilder(
serial_number=serial_number, revocation_date=revocation_date
@ -1624,8 +1630,9 @@ def _create_invalidity_date(val, **kwargs):
if critical:
val = val.split(" ", maxsplit=1)[1]
try:
# InvalidityDate deals in naive datetime objects only currently
return (
cx509.InvalidityDate(datetime.datetime.strptime(val, TIME_FMT)),
cx509.InvalidityDate(datetime.strptime(val, TIME_FMT)),
critical,
)
except ValueError as err:

View file

@ -596,39 +596,3 @@ class CMDModuleTest(ModuleCase):
).splitlines()
self.assertIn("abc=123", out)
self.assertIn("ABC=456", out)
@pytest.mark.slow_test
@pytest.mark.skip_unless_on_windows(reason="Minion is not Windows")
def test_windows_powershell_script_args(self):
"""
Ensure that powershell processes inline script in args
"""
val = "i like cheese"
args = (
'-SecureString (ConvertTo-SecureString -String "{}" -AsPlainText -Force)'
" -ErrorAction Stop".format(val)
)
script = "salt://issue-56195/test.ps1"
ret = self.run_function(
"cmd.script", [script], args=args, shell="powershell", saltenv="base"
)
self.assertEqual(ret["stdout"], val)
@pytest.mark.slow_test
@pytest.mark.skip_unless_on_windows(reason="Minion is not Windows")
@pytest.mark.skip_if_binaries_missing("pwsh")
def test_windows_powershell_script_args_pwsh(self):
"""
Ensure that powershell processes inline script in args with powershell
core
"""
val = "i like cheese"
args = (
'-SecureString (ConvertTo-SecureString -String "{}" -AsPlainText -Force)'
" -ErrorAction Stop".format(val)
)
script = "salt://issue-56195/test.ps1"
ret = self.run_function(
"cmd.script", [script], args=args, shell="pwsh", saltenv="base"
)
self.assertEqual(ret["stdout"], val)

View file

@ -1,10 +1,7 @@
import base64
import pytest
import salt.modules.cmdmod as cmdmod
import salt.utils.path
import salt.utils.stringutils
pytestmark = [
pytest.mark.windows_whitelisted,
@ -18,88 +15,197 @@ def shell(request):
This will run the test on powershell and powershell core (pwsh). If
powershell core is not installed that test run will be skipped
"""
if request.param == "pwsh" and salt.utils.path.which("pwsh") is None:
pytest.skip("Powershell 7 Not Present")
return request.param
def test_powershell(shell):
@pytest.fixture(scope="module")
def account():
with pytest.helpers.create_account() as _account:
yield _account
@pytest.mark.parametrize(
"cmd, expected, encode_cmd",
[
("Write-Output Foo", "Foo", False),
(["Write-Output", "Foo"], "Foo", False),
('Write-Output "Encoded Foo"', "Encoded Foo", True),
(["Write-Output", '"Encoded Foo"'], "Encoded Foo", True),
],
)
def test_powershell(shell, cmd, expected, encode_cmd):
"""
Test cmd.powershell
"""
ret = cmdmod.powershell("Write-Output foo", shell=shell)
assert ret == "foo"
ret = cmdmod.powershell(cmd=cmd, encode_cmd=encode_cmd, shell=shell)
assert ret == expected
def test_powershell_encode_cmd(shell):
@pytest.mark.parametrize(
"cmd, expected, encode_cmd",
[
("Write-Output Foo", "Foo", False),
(["Write-Output", "Foo"], "Foo", False),
('Write-Output "Encoded Foo"', "Encoded Foo", True),
(["Write-Output", '"Encoded Foo"'], "Encoded Foo", True),
],
)
def test_powershell_runas(shell, account, cmd, expected, encode_cmd):
"""
Test cmd.powershell with encode_cmd
Test cmd.powershell with runas
"""
ret = cmdmod.powershell('Write-Output "encoded foo"', encode_cmd=True, shell=shell)
assert ret == "encoded foo"
ret = cmdmod.powershell(
cmd=cmd,
encode_cmd=encode_cmd,
shell=shell,
runas=account.username,
password=account.password,
)
assert ret == expected
def test_powershell_all(shell):
@pytest.mark.parametrize(
"cmd, expected, encode_cmd",
[
("Write-Output Foo", "Foo", False),
(["Write-Output", "Foo"], "Foo", False),
('Write-Output "Encoded Foo"', "Encoded Foo", True),
(["Write-Output", '"Encoded Foo"'], "Encoded Foo", True),
],
)
def test_powershell_all(shell, cmd, expected, encode_cmd):
"""
Test cmd.powershell_all
Test cmd.powershell_all. `encode_cmd` takes the passed command and encodes
it. Different from encoded_command where it's receiving an already encoded
command
"""
ret = cmdmod.powershell_all("Write-Output foo", shell=shell)
ret = cmdmod.powershell_all(cmd=cmd, encode_cmd=encode_cmd, shell=shell)
assert isinstance(ret["pid"], int)
assert ret["retcode"] == 0
assert ret["stderr"] == ""
assert ret["result"] == "foo"
assert ret["result"] == expected
def test_powershell_all_encode_cmd(shell):
@pytest.mark.parametrize(
"cmd, expected, encode_cmd",
[
("Write-Output Foo", "Foo", False),
(["Write-Output", "Foo"], "Foo", False),
('Write-Output "Encoded Foo"', "Encoded Foo", True),
(["Write-Output", '"Encoded Foo"'], "Encoded Foo", True),
],
)
def test_powershell_all_runas(shell, account, cmd, expected, encode_cmd):
"""
Test cmd.powershell_all with encode_cmd
Test cmd.powershell_all with runas. `encode_cmd` takes the passed command
and encodes it. Different from encoded_command where it's receiving an
already encoded command
"""
ret = cmdmod.powershell_all(
'Write-Output "encoded foo"', encode_cmd=True, shell=shell
cmd=cmd,
encode_cmd=encode_cmd,
shell=shell,
runas=account.username,
password=account.password,
)
assert isinstance(ret["pid"], int)
assert ret["retcode"] == 0
assert ret["stderr"] == ""
assert ret["result"] == "encoded foo"
assert ret["result"] == expected
def test_cmd_run_all_powershell_list():
@pytest.mark.parametrize(
"cmd, expected, encoded_cmd",
[
("Write-Output Foo", "Foo", False),
(["Write-Output", "Foo"], "Foo", False),
(
"VwByAGkAdABlAC0ASABvAHMAdAAgACcARQBuAGMAbwBkAGUAZAAgAEYAbwBvACcA",
"Encoded Foo",
True,
),
],
)
def test_cmd_run_all_powershell(shell, cmd, expected, encoded_cmd):
"""
Ensure that cmd.run_all supports running shell='powershell' with cmd passed
as a list
Ensure that cmd.run_all supports running shell='powershell'
"""
ret = cmdmod.run_all(cmd=cmd, shell=shell, encoded_cmd=encoded_cmd)
assert ret["stdout"] == expected
@pytest.mark.parametrize(
"cmd, expected, encoded_cmd",
[
("Write-Output Foo", "Foo", False),
(["Write-Output", "Foo"], "Foo", False),
(
"VwByAGkAdABlAC0ASABvAHMAdAAgACcARQBuAGMAbwBkAGUAZAAgAEYAbwBvACcA",
"Encoded Foo",
True,
),
],
)
def test_cmd_run_all_powershell_runas(shell, account, cmd, expected, encoded_cmd):
"""
Ensure that cmd.run_all with runas supports running shell='powershell'
"""
ret = cmdmod.run_all(
cmd=["Write-Output", "salt"], python_shell=False, shell="powershell"
cmd=cmd,
shell=shell,
encoded_cmd=encoded_cmd,
runas=account.username,
password=account.password,
)
assert ret["stdout"] == "salt"
assert ret["stdout"] == expected
def test_cmd_run_all_powershell_string():
@pytest.mark.parametrize(
"cmd, expected, encoded_cmd",
[
("Write-Output Foo", "Foo", False),
(["Write-Output", "Foo"], "Foo", False),
(
"VwByAGkAdABlAC0ASABvAHMAdAAgACcARQBuAGMAbwBkAGUAZAAgAEYAbwBvACcA",
"Encoded Foo",
True,
),
],
)
def test_cmd_run_encoded_cmd(shell, cmd, expected, encoded_cmd):
"""
Ensure that cmd.run_all supports running shell='powershell' with cmd passed
as a string
Ensure that cmd.run supports running shell='powershell'
"""
ret = cmdmod.run_all(
cmd="Write-Output salt", python_shell=False, shell="powershell"
ret = cmdmod.run(
cmd=cmd, shell=shell, encoded_cmd=encoded_cmd, redirect_stderr=False
)
assert ret["stdout"] == "salt"
assert ret == expected
def test_cmd_run_encoded_cmd(shell):
cmd = "Write-Output 'encoded command'"
cmd = f"$ProgressPreference='SilentlyContinue'; {cmd}"
cmd_utf16 = cmd.encode("utf-16-le")
encoded_cmd = base64.standard_b64encode(cmd_utf16)
encoded_cmd = salt.utils.stringutils.to_str(encoded_cmd)
ret = cmdmod.run(cmd=encoded_cmd, shell=shell, encoded_cmd=True)
assert ret == "encoded command"
def test_cmd_run_all_encoded_cmd(shell):
cmd = "Write-Output 'encoded command'"
cmd = f"$ProgressPreference='SilentlyContinue'; {cmd}"
cmd_utf16 = cmd.encode("utf-16-le")
encoded_cmd = base64.standard_b64encode(cmd_utf16)
encoded_cmd = salt.utils.stringutils.to_str(encoded_cmd)
ret = cmdmod.run_all(cmd=encoded_cmd, shell=shell, encoded_cmd=True)
assert ret["stdout"] == "encoded command"
@pytest.mark.parametrize(
"cmd, expected, encoded_cmd",
[
("Write-Output Foo", "Foo", False),
(["Write-Output", "Foo"], "Foo", False),
(
"VwByAGkAdABlAC0ASABvAHMAdAAgACcARQBuAGMAbwBkAGUAZAAgAEYAbwBvACcA",
"Encoded Foo",
True,
),
],
)
def test_cmd_run_encoded_cmd_runas(shell, account, cmd, expected, encoded_cmd):
"""
Ensure that cmd.run with runas supports running shell='powershell'
"""
ret = cmdmod.run(
cmd=cmd,
shell=shell,
encoded_cmd=encoded_cmd,
runas=account.username,
password=account.password,
)
assert ret == expected

View file

@ -0,0 +1,87 @@
import pytest
import salt.utils.path
pytestmark = [
pytest.mark.core_test,
pytest.mark.windows_whitelisted,
]
@pytest.fixture(scope="module")
def cmd(modules):
return modules.cmd
@pytest.fixture(params=["powershell", "pwsh"])
def shell(request):
"""
This will run the test on powershell and powershell core (pwsh). If
powershell core is not installed that test run will be skipped
"""
if request.param == "pwsh" and salt.utils.path.which("pwsh") is None:
pytest.skip("Powershell 7 Not Present")
return request.param
@pytest.fixture(scope="module")
def account():
with pytest.helpers.create_account() as _account:
yield _account
@pytest.fixture
def issue_56195(state_tree):
contents = """
[CmdLetBinding()]
Param(
[SecureString] $SecureString
)
$Credential = New-Object System.Net.NetworkCredential("DummyId", $SecureString)
$Credential.Password
"""
with pytest.helpers.temp_file("test.ps1", contents, state_tree / "issue-56195"):
yield
@pytest.mark.skip_unless_on_windows(reason="Minion is not Windows")
def test_windows_script_args_powershell(cmd, shell, issue_56195):
"""
Ensure that powershell processes an inline script with args where the args
contain powershell that needs to be rendered
"""
password = "i like cheese"
args = (
"-SecureString (ConvertTo-SecureString -String '{}' -AsPlainText -Force)"
" -ErrorAction Stop".format(password)
)
script = "salt://issue-56195/test.ps1"
ret = cmd.script(source=script, args=args, shell="powershell", saltenv="base")
assert ret["stdout"] == password
@pytest.mark.skip_unless_on_windows(reason="Minion is not Windows")
def test_windows_script_args_powershell_runas(cmd, shell, account, issue_56195):
"""
Ensure that powershell processes an inline script with args where the args
contain powershell that needs to be rendered
"""
password = "i like cheese"
args = (
"-SecureString (ConvertTo-SecureString -String '{}' -AsPlainText -Force)"
" -ErrorAction Stop".format(password)
)
script = "salt://issue-56195/test.ps1"
ret = cmd.script(
source=script,
args=args,
shell="powershell",
saltenv="base",
runas=account.username,
password=account.password,
)
assert ret["stdout"] == password

View file

@ -320,19 +320,22 @@ def test_setpassword_int(user, account_int):
("logonscript", "\\\\server\\script.cmd", "", None),
("expiration_date", "3/19/2024", "", "2024-03-19 00:00:00"),
("expiration_date", "Never", "", None),
("expired", True, "", None),
("expired", False, "", None),
("account_disabled", True, "", None),
("account_disabled", False, "", None),
("unlock_account", True, "account_locked", False),
("password_never_expires", True, "", None),
("password_never_expires", False, "", None),
("expired", True, "", None),
("expired", False, "", None),
("disallow_change_password", True, "", None),
("disallow_change_password", False, "", None),
],
)
def test_update_str(user, value_name, new_value, info_field, expected, account_str):
setting = {value_name: new_value}
# You can't expire an account if the password never expires
if value_name == "expired":
setting.update({"password_never_expires": not new_value})
ret = user.update(account_str.username, **setting)
assert ret is True
ret = user.info(account_str.username)

View file

@ -310,7 +310,7 @@ def test_powershell_empty():
mock_run = {"pid": 1234, "retcode": 0, "stderr": "", "stdout": ""}
with patch("salt.modules.cmdmod._run", return_value=mock_run):
ret = cmdmod.powershell("Set-ExecutionPolicy RemoteSigned")
assert ret == {}
assert ret == ""
def test_is_valid_shell_windows():
@ -1052,57 +1052,97 @@ def test_runas_env_sudo_group(bundled):
)
def test_prep_powershell_cmd_no_powershell():
with pytest.raises(CommandExecutionError):
cmdmod._prep_powershell_cmd(
win_shell="unk_bin", cmd="Some-Command", encoded_cmd=False
)
def test_prep_powershell_cmd():
"""
Tests _prep_powershell_cmd returns correct cmd
"""
with patch("salt.utils.platform.is_windows", MagicMock(return_value=False)):
stack = [["", "", ""], ["", "", ""], ["", "", ""]]
stack = [["", "", ""], ["", "", ""], ["", "", ""], ["", "", ""]]
with patch("traceback.extract_stack", return_value=stack), patch(
"salt.utils.path.which", return_value="C:\\powershell.exe"
):
ret = cmdmod._prep_powershell_cmd(
shell="powershell", cmd="$PSVersionTable", stack=stack, encoded_cmd=False
win_shell="powershell", cmd="$PSVersionTable", encoded_cmd=False
)
assert ret == 'powershell -NonInteractive -NoProfile -Command "$PSVersionTable"'
expected = [
"C:\\powershell.exe",
"-NonInteractive",
"-NoProfile",
"-ExecutionPolicy",
"Bypass",
"-Command",
"& {$PSVersionTable}",
]
assert ret == expected
def test_prep_powershell_cmd_encoded():
"""
Tests _prep_powershell_cmd returns correct cmd when encoded_cmd=True
"""
stack = [["", "", ""], ["", "", ""], ["", "", ""], ["", "", ""]]
# This is the encoded command for 'Write-Host "Encoded HOLO"'
e_cmd = "VwByAGkAdABlAC0ASABvAHMAdAAgACIARQBuAGMAbwBkAGUAZAAgAEgATwBMAE8AIgA="
with patch("traceback.extract_stack", return_value=stack), patch(
"salt.utils.path.which", return_value="C:\\powershell.exe"
):
ret = cmdmod._prep_powershell_cmd(
shell="powershell", cmd="$PSVersionTable", stack=stack, encoded_cmd=True
)
assert (
ret
== "powershell -NonInteractive -NoProfile -EncodedCommand $PSVersionTable"
win_shell="powershell", cmd=e_cmd, encoded_cmd=True
)
expected = [
"C:\\powershell.exe",
"-NonInteractive",
"-NoProfile",
"-ExecutionPolicy",
"Bypass",
"-EncodedCommand",
f"{e_cmd}",
]
assert ret == expected
stack = [["", "", ""], ["", "", "script"], ["", "", ""]]
def test_prep_powershell_cmd_script():
"""
Tests _prep_powershell_cmd returns correct cmd when called from cmd.script
"""
stack = [["", "", ""], ["", "", "script"], ["", "", ""], ["", "", ""]]
script = r"C:\some\script.ps1"
with patch("traceback.extract_stack", return_value=stack), patch(
"salt.utils.path.which", return_value="C:\\powershell.exe"
):
ret = cmdmod._prep_powershell_cmd(
shell="powershell", cmd="$PSVersionTable", stack=stack, encoded_cmd=False
)
assert (
ret
== "powershell -NonInteractive -NoProfile -ExecutionPolicy Bypass -Command $PSVersionTable"
win_shell="powershell", cmd=script, encoded_cmd=False
)
expected = [
"C:\\powershell.exe",
"-NonInteractive",
"-NoProfile",
"-ExecutionPolicy",
"Bypass",
"-Command",
f"& {script}",
]
assert ret == expected
with patch("salt.utils.platform.is_windows", MagicMock(return_value=True)):
stack = [["", "", ""], ["", "", ""], ["", "", ""]]
ret = cmdmod._prep_powershell_cmd(
shell="powershell", cmd="$PSVersionTable", stack=stack, encoded_cmd=False
)
assert (
ret == '"powershell" -NonInteractive -NoProfile -Command "$PSVersionTable"'
)
ret = cmdmod._prep_powershell_cmd(
shell="powershell", cmd="$PSVersionTable", stack=stack, encoded_cmd=True
)
assert (
ret
== '"powershell" -NonInteractive -NoProfile -EncodedCommand $PSVersionTable'
)
stack = [["", "", ""], ["", "", "script"], ["", "", ""]]
ret = cmdmod._prep_powershell_cmd(
shell="powershell", cmd="$PSVersionTable", stack=stack, encoded_cmd=False
)
assert (
ret
== '"powershell" -NonInteractive -NoProfile -ExecutionPolicy Bypass -Command $PSVersionTable'
)
@pytest.mark.parametrize(
"text, expected",
[
("", '""'), # Should quote an empty string
("Foo", '"Foo"'), # Should quote a string
('["foo", "bar"]', '["foo", "bar"]'), # Should leave unchanged
('{"foo": "bar"}', '{"foo": "bar"}'), # Should leave unchanged
],
)
def test_prep_powershell_json(text, expected):
"""
Make sure the output is valid json
"""
result = cmdmod._prep_powershell_json(text)
assert result == expected

View file

@ -103,6 +103,26 @@ def test_build_rule():
"comment": "Successfully built rule",
}
assert nftables.build_rule(
table="filter",
chain="input",
family="ip6",
command="insert",
position="3",
full="True",
connstate="related,established",
saddr="::/0",
daddr="fe80:cafe::1",
jump="accept",
) == {
"result": True,
"rule": (
"nft insert rule ip6 filter input position 3 ct state {"
" related,established } ip6 saddr ::/0 ip6 daddr fe80:cafe::1 accept"
),
"comment": "Successfully built rule",
}
assert nftables.build_rule() == {"result": True, "rule": "", "comment": ""}

View file

@ -1,5 +1,5 @@
import datetime
import ipaddress
from datetime import datetime, timedelta, timezone
import pytest
@ -11,6 +11,9 @@ cryptography = pytest.importorskip(
"cryptography", reason="Needs cryptography library", minversion="37.0"
)
cx509 = pytest.importorskip("cryptography.x509", reason="Needs cryptography library")
cprim = pytest.importorskip(
"cryptography.hazmat.primitives", reason="Needs cryptography library"
)
@pytest.fixture
@ -1019,12 +1022,12 @@ class TestCreateExtension:
[
(
"critical, 2022-10-11 13:37:42",
datetime.datetime.strptime("2022-10-11 13:37:42", "%Y-%m-%d %H:%M:%S"),
datetime.strptime("2022-10-11 13:37:42", "%Y-%m-%d %H:%M:%S"),
True,
),
(
"2022-10-11 13:37:42",
datetime.datetime.strptime("2022-10-11 13:37:42", "%Y-%m-%d %H:%M:%S"),
datetime.strptime("2022-10-11 13:37:42", "%Y-%m-%d %H:%M:%S"),
False,
),
],
@ -1875,9 +1878,7 @@ def test_get_dn(inpt, expected):
cx509.Extension(
cx509.InvalidityDate.oid,
value=cx509.InvalidityDate(
datetime.datetime.strptime(
"2022-10-11 13:37:42", "%Y-%m-%d %H:%M:%S"
)
datetime.strptime("2022-10-11 13:37:42", "%Y-%m-%d %H:%M:%S")
),
critical=False,
),
@ -1888,3 +1889,86 @@ def test_get_dn(inpt, expected):
def test_render_extension(inpt, expected):
ret = x509.render_extension(inpt)
assert ret == expected
@pytest.fixture
def ca_cert():
return """\
-----BEGIN CERTIFICATE-----
MIIDODCCAiCgAwIBAgIIbfpgqP0VGPgwDQYJKoZIhvcNAQELBQAwKzELMAkGA1UE
BhMCVVMxDTALBgNVBAMMBFRlc3QxDTALBgNVBAoMBFNhbHQwHhcNMjIxMTE1MTQw
NDMzWhcNMzIxMTEyMTQwNDMzWjArMQswCQYDVQQGEwJVUzENMAsGA1UEAwwEVGVz
dDENMAsGA1UECgwEU2FsdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEB
AOGTScvrjcEt6vsJcG9RUp6fKaDNDWZnJET0omanK9ZwaoGpJPp8UDYe/8ADeI7N
10wdyB4oDM9gRDjInBtdQO/PsrmKZF6LzqVFgLMxu2up+PHMi9z6B2P4esIAzMu9
PYxc9zH4HzLImHqscVD2HCabsjp9X134Af7hVY5NN/W/4qTP7uOM20wSG2TPI6+B
tA9VyPbEPMPRzXzrqc45rVYe6kb2bT84GE93Vcu/e5JZ/k2AKD8Hoa2cxLPsTLq5
igl+D+k+dfUtiABiKPvVQiYBsD1fyHDn2m7B6pCgvrGqHjsoAKufgFnXy6PJRg7n
vQfaxSiusM5s+VS+fjlvgwsCAwEAAaNgMF4wDwYDVR0TBAgwBgEB/wIBATALBgNV
HQ8EBAMCAQYwHQYDVR0OBBYEFFzy8fRTKSOe7kBakqO0Ki71potnMB8GA1UdIwQY
MBaAFFzy8fRTKSOe7kBakqO0Ki71potnMA0GCSqGSIb3DQEBCwUAA4IBAQBZS4MP
fXYPoGZ66seM+0eikScZHirbRe8vHxHkujnTBUjQITKm86WeQgeBCD2pobgBGZtt
5YFozM4cERqY7/1BdemUxFvPmMFFznt0TM5w+DfGWVK8un6SYwHnmBbnkWgX4Srm
GsL0HHWxVXkGnFGFk6Sbo3vnN7CpkpQTWFqeQQ5rHOw91pt7KnNZwc6I3ZjrCUHJ
+UmKKrga16a4Q+8FBpYdphQU609npo/0zuaE6FyiJYlW3tG+mlbbNgzY/+eUaxt2
9Bp9mtA+Hkox551Mfpq45Oi+ehwMt0xjZCjuFCM78oiUdHCGO+EmcT7ogiYALiOF
LN1w5sybsYwIw6QN
-----END CERTIFICATE-----
"""
@pytest.fixture
def ca_key():
return """\
-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEA4ZNJy+uNwS3q+wlwb1FSnp8poM0NZmckRPSiZqcr1nBqgakk
+nxQNh7/wAN4js3XTB3IHigMz2BEOMicG11A78+yuYpkXovOpUWAszG7a6n48cyL
3PoHY/h6wgDMy709jFz3MfgfMsiYeqxxUPYcJpuyOn1fXfgB/uFVjk039b/ipM/u
44zbTBIbZM8jr4G0D1XI9sQ8w9HNfOupzjmtVh7qRvZtPzgYT3dVy797kln+TYAo
PwehrZzEs+xMurmKCX4P6T519S2IAGIo+9VCJgGwPV/IcOfabsHqkKC+saoeOygA
q5+AWdfLo8lGDue9B9rFKK6wzmz5VL5+OW+DCwIDAQABAoIBAFfImc9hu6iR1gAb
jEXFwAE6r1iEc9KGEPdEvG52X/jzhn8u89UGy7BEIAL5VtE8Caz1agtSSqnpLKNs
blO31q18hnDuCmFAxwpKIeuaTvV3EAoJL+Su6HFfIWaeKRSgcHNPOmOXy4xXw/75
XJ/FJu9fZ9ybLaHEAgLObh0Sr9RSPQbZ72ZawPP8+5WCbR+2w90RApHXQL0piSbW
lIx1NE6o5wQb3vik8z/k5FqLCY2a8++WNyfvS+WWFY5WXGI7ZiDDQk46gnslquH2
Lon5CEn3JlTGQFhxaaa2ivssscf2lA2Rvm2E8o1rdZJS2OpSE0ai4TXY9XnyjZj1
5usWIwECgYEA+3Mwu03A7PyLEBksS/u3MSo/176S9lF/uXcecQNdhAIalUZ8AgV3
7HP2yI9ZC0ekA809ZzFjGFostXm9VfUOEZ549jLOMzvBtCdaI0aBUE8icu52fX4r
fT2NY6hYgz5/fxD8sq1XH/fqNNexABwtViH6YAly/9A1/8M3BOWt72UCgYEA5ag8
sIfiBUoWd1sS6qHDuugWlpx4ZWYC/59XEJyCN2wioP8qFji/aNZxF1wLfyQe/zaa
YBFusjsBnSfBU1p4UKCRHWQ9/CnC0DzqTkyKC4Fv8GuxgywNm5W9gPKk7idHP7mw
e+7Uvf1pOQccqEPh7yltpW+Xw27gfsC2DMAIGa8CgYByv/q5P56PiCCeVB6W/mR3
l2RTPLEsn7y+EtJdmL+QgrVG8kedVImJ6tHwbRqhvyvmYD9pXGxwrJZCqy/wjkjB
WaSyFjVrxBV99Yd5Ga/hyntaH+ELHA0UtoZTuHvMSTU9866ei+R6vlSvkM9B0ZoO
+KqeMTG99HLwKVJudbKO0QKBgQCd33U49XBOqoufKSBr4yAmUH2Ws6GgMuxExUiY
xr5NUyzK+B36gLA0ZZYAtOnCURZt4x9kgxdRtnZ5jma74ilrY7XeOpbRzfN6KyX3
BW6wUh6da6rvvUztc5Z+Gk9+18mG6SOFTr04jgfTiCwPD/s06YnSfFAbrRDukZOU
WD45SQKBgBvjSwl3AbPoJnRjZjGuCUMKQKrLm30xCeorxasu+di/4YV5Yd8VUjaO
mYyqXW6bQndKLuXT+AXtCd/Xt2sI96z8mc0G5fImDUxQjMUuS3RyQK357cEOu8Zy
HdI7Pfaf/l0HozAw/Al+LXbpmSBdfmz0U/EGAKRqXMW5+vQ7XHXD
-----END RSA PRIVATE KEY-----"""
def test_build_crl_accounts_for_local_time_zone(ca_key, ca_cert):
curr_time = datetime.now(tz=timezone(timedelta(hours=1)))
curr_time_naive = curr_time.replace(tzinfo=None)
def dtn(tz=None):
if tz is None:
return curr_time_naive
return curr_time
curr_time_utc = curr_time.astimezone(timezone.utc).replace(microsecond=0)
curr_time_utc_naive = curr_time_utc.replace(tzinfo=None)
privkey = cprim.serialization.load_pem_private_key(ca_key.encode(), password=None)
cert = cx509.load_pem_x509_certificate(ca_cert.encode())
with patch("salt.utils.x509.datetime") as fakedate:
fakedate.today.return_value = curr_time_naive
fakedate.now.side_effect = dtn
fakedate.utcnow.return_value = curr_time_utc_naive
builder, _ = x509.build_crl(privkey, [], signing_cert=cert)
crl = builder.sign(privkey, algorithm=cprim.hashes.SHA256())
try:
assert crl.last_update_utc == curr_time_utc
except AttributeError:
assert crl.last_update == curr_time_utc_naive

View file

@ -2,7 +2,6 @@ import pytest
import salt.utils.win_lgpo_netsh as win_lgpo_netsh
from salt.exceptions import CommandExecutionError
from tests.support.mock import patch
pytestmark = [
pytest.mark.windows_whitelisted,
@ -26,18 +25,6 @@ def test_get_settings_firewallpolicy_lgpo():
assert "Outbound" in ret
def test_get_settings_firewallpolicy_lgpo_issue_57591():
"""
Should not stacktrace when the hostname contains unicode characters
"""
with patch.object(win_lgpo_netsh, "__hostname__", return_value="kомпьютер"):
ret = win_lgpo_netsh.get_settings(
profile="domain", section="firewallpolicy", store="lgpo"
)
assert "Inbound" in ret
assert "Outbound" in ret
def test_get_settings_logging_local():
ret = win_lgpo_netsh.get_settings(
profile="domain", section="logging", store="local"
@ -63,7 +50,6 @@ def test_get_settings_settings_local():
assert "InboundUserNotification" in ret
assert "LocalConSecRules" in ret
assert "LocalFirewallRules" in ret
assert "RemoteManagement" in ret
assert "UnicastResponseToMulticast" in ret
@ -74,7 +60,6 @@ def test_get_settings_settings_lgpo():
assert "InboundUserNotification" in ret
assert "LocalConSecRules" in ret
assert "LocalFirewallRules" in ret
assert "RemoteManagement" in ret
assert "UnicastResponseToMulticast" in ret
@ -99,7 +84,6 @@ def test_get_all_settings_local():
assert "InboundUserNotification" in ret
assert "LocalConSecRules" in ret
assert "LocalFirewallRules" in ret
assert "RemoteManagement" in ret
assert "UnicastResponseToMulticast" in ret
assert "State" in ret
@ -115,7 +99,6 @@ def test_get_all_settings_lgpo():
assert "InboundUserNotification" in ret
assert "LocalConSecRules" in ret
assert "LocalFirewallRules" in ret
assert "RemoteManagement" in ret
assert "UnicastResponseToMulticast" in ret
assert "State" in ret
@ -356,7 +339,7 @@ def test_set_firewall_logging_maxfilesize_local():
new = win_lgpo_netsh.get_settings(
profile="domain", section="logging", store="local"
)["MaxFileSize"]
assert new == "16384"
assert new == 16384
finally:
ret = win_lgpo_netsh.set_logging_settings(
profile="domain", setting="maxfilesize", value=current, store="local"
@ -491,32 +474,6 @@ def test_set_firewall_settings_notification_lgpo_notconfigured():
assert ret is True
def test_set_firewall_settings_remotemgmt_local_enable():
current = win_lgpo_netsh.get_settings(
profile="domain", section="settings", store="local"
)["RemoteManagement"]
try:
ret = win_lgpo_netsh.set_settings(
profile="domain",
setting="remotemanagement",
value="enable",
store="local",
)
assert ret is True
new = win_lgpo_netsh.get_settings(
profile="domain", section="settings", store="local"
)["RemoteManagement"]
assert new == "Enable"
finally:
ret = win_lgpo_netsh.set_settings(
profile="domain",
setting="remotemanagement",
value=current,
store="local",
)
assert ret is True
def test_set_firewall_settings_unicast_local_disable():
current = win_lgpo_netsh.get_settings(
profile="domain", section="settings", store="local"
@ -566,13 +523,16 @@ def test_set_firewall_state_local_notconfigured():
profile="domain", section="state", store="local"
)["State"]
try:
pytest.raises(
CommandExecutionError,
win_lgpo_netsh.set_state,
ret = win_lgpo_netsh.set_state(
profile="domain",
state="notconfigured",
store="local",
)
assert ret is True
new = win_lgpo_netsh.get_settings(
profile="domain", section="state", store="local"
)["State"]
assert new == "NotConfigured"
finally:
ret = win_lgpo_netsh.set_state(profile="domain", state=current, store="local")
assert ret is True

View file

@ -330,6 +330,11 @@ class TestAccount:
ret = self.sminion.functions.user.add(self.username)
assert ret is True
self._delete_account = True
if salt.utils.platform.is_windows():
log.debug("Configuring system account: %s", self)
ret = self.sminion.functions.user.update(
self.username, password_never_expires=True
)
if salt.utils.platform.is_darwin() or salt.utils.platform.is_windows():
password = self.password
else:

View file

@ -361,7 +361,6 @@ MISSING_DOCSTRINGS = {
"machine_get_machinestate_tuple",
],
"salt/utils/win_osinfo.py": ["get_os_version_info"],
"salt/utils/win_runas.py": ["split_username"],
"salt/utils/yamldumper.py": [
"represent_undefined",
"represent_ordereddict",