mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Merge 3006.x into 3007.x
This commit is contained in:
commit
0f2c3b53b6
24 changed files with 918 additions and 435 deletions
5
changelog/61166.fixed.md
Normal file
5
changelog/61166.fixed.md
Normal file
|
@ -0,0 +1,5 @@
|
|||
Fixes multiple issues with the cmd module on Windows. Scripts are called using
|
||||
the ``-File`` parameter to the ``powershell.exe`` binary. ``CLIXML`` data in
|
||||
stderr is now removed (only applies to encoded commands). Commands can now be
|
||||
sent to ``cmd.powershell`` as a list. Makes sure JSON data returned is valid.
|
||||
Strips whitespace from the return when using ``runas``.
|
2
changelog/61534.fixed.md
Normal file
2
changelog/61534.fixed.md
Normal file
|
@ -0,0 +1,2 @@
|
|||
Fixed the win_lgpo_netsh salt util to handle non-English systems. This was a
|
||||
rewrite to use PowerShell instead of netsh to make the changes on the system
|
1
changelog/65295.fixed.md
Normal file
1
changelog/65295.fixed.md
Normal file
|
@ -0,0 +1 @@
|
|||
Fix typo in nftables module to ensure unique nft family values
|
1
changelog/65837.fixed.md
Normal file
1
changelog/65837.fixed.md
Normal file
|
@ -0,0 +1 @@
|
|||
Corrected x509_v2 CRL creation `last_update` and `next_update` values when system timezone is not UTC
|
1
changelog/66382.fixed.md
Normal file
1
changelog/66382.fixed.md
Normal file
|
@ -0,0 +1 @@
|
|||
Fixed nftables.build_rule breaks ipv6 rules by using the wrong syntax for source and destination addresses
|
|
@ -256,32 +256,44 @@ def _check_avail(cmd):
|
|||
return bret and wret
|
||||
|
||||
|
||||
def _prep_powershell_cmd(shell, cmd, stack, encoded_cmd):
|
||||
def _prep_powershell_cmd(win_shell, cmd, encoded_cmd):
|
||||
"""
|
||||
Prep cmd when shell is powershell
|
||||
Prep cmd when shell is powershell. If we were called by script(), then fake
|
||||
out the Windows shell to run a Powershell script. Otherwise, just run a
|
||||
Powershell command.
|
||||
"""
|
||||
# Find the full path to the shell
|
||||
win_shell = salt.utils.path.which(win_shell)
|
||||
|
||||
# If this is running on Windows wrap
|
||||
# the shell in quotes in case there are
|
||||
# spaces in the paths.
|
||||
if salt.utils.platform.is_windows():
|
||||
shell = f'"{shell}"'
|
||||
if not win_shell:
|
||||
raise CommandExecutionError("PowerShell binary not found")
|
||||
|
||||
new_cmd = [win_shell, "-NonInteractive", "-NoProfile", "-ExecutionPolicy", "Bypass"]
|
||||
|
||||
# extract_stack() returns a list of tuples.
|
||||
# The last item in the list [-1] is the current method.
|
||||
# The third item[2] in each tuple is the name of that method.
|
||||
if stack[-2][2] == "script":
|
||||
cmd = (
|
||||
"{} -NonInteractive -NoProfile -ExecutionPolicy Bypass -Command {}".format(
|
||||
shell, cmd
|
||||
)
|
||||
)
|
||||
stack = traceback.extract_stack(limit=3)
|
||||
if stack[-3][2] == "script":
|
||||
# If this is cmd.script, then we're running a file
|
||||
# You might be tempted to use -File here instead of -Command
|
||||
# The problem with using -File is that any arguments that contain
|
||||
# powershell commands themselves will not be evaluated
|
||||
# See GitHub issue #56195
|
||||
new_cmd.append("-Command")
|
||||
if isinstance(cmd, list):
|
||||
cmd = " ".join(cmd)
|
||||
new_cmd.append(f"& {cmd.strip()}")
|
||||
elif encoded_cmd:
|
||||
cmd = f"{shell} -NonInteractive -NoProfile -EncodedCommand {cmd}"
|
||||
new_cmd.extend(["-EncodedCommand", f"{cmd}"])
|
||||
else:
|
||||
cmd = f'{shell} -NonInteractive -NoProfile -Command "{cmd}"'
|
||||
# Strip whitespace
|
||||
if isinstance(cmd, list):
|
||||
cmd = " ".join(cmd)
|
||||
new_cmd.extend(["-Command", f"& {{{cmd.strip()}}}"])
|
||||
|
||||
return cmd
|
||||
log.debug(new_cmd)
|
||||
return new_cmd
|
||||
|
||||
|
||||
def _run(
|
||||
|
@ -384,19 +396,7 @@ def _run(
|
|||
# The powershell core binary is "pwsh"
|
||||
# you can also pass a path here as long as the binary name is one of the two
|
||||
if any(word in shell.lower().strip() for word in ["powershell", "pwsh"]):
|
||||
# Strip whitespace
|
||||
if isinstance(cmd, str):
|
||||
cmd = cmd.strip()
|
||||
elif isinstance(cmd, list):
|
||||
cmd = " ".join(cmd).strip()
|
||||
cmd = cmd.replace('"', '\\"')
|
||||
|
||||
# If we were called by script(), then fakeout the Windows
|
||||
# shell to run a Powershell script.
|
||||
# Else just run a Powershell command.
|
||||
stack = traceback.extract_stack(limit=2)
|
||||
|
||||
cmd = _prep_powershell_cmd(shell, cmd, stack, encoded_cmd)
|
||||
cmd = _prep_powershell_cmd(shell, cmd, encoded_cmd)
|
||||
|
||||
# munge the cmd and cwd through the template
|
||||
(cmd, cwd) = _render_cmd(cmd, cwd, template, saltenv, pillarenv, pillar_override)
|
||||
|
@ -809,6 +809,9 @@ def _run(
|
|||
_log_cmd(cmd),
|
||||
)
|
||||
|
||||
# Encoded commands dump CLIXML data in stderr. It's not an actual error
|
||||
if encoded_cmd and "CLIXML" in err:
|
||||
err = ""
|
||||
if rstrip:
|
||||
if out is not None:
|
||||
out = out.rstrip()
|
||||
|
@ -1055,6 +1058,7 @@ def run(
|
|||
ignore_retcode=False,
|
||||
saltenv=None,
|
||||
use_vt=False,
|
||||
redirect_stderr=True,
|
||||
bg=False,
|
||||
password=None,
|
||||
encoded_cmd=False,
|
||||
|
@ -1190,6 +1194,12 @@ def run(
|
|||
:param bool use_vt: Use VT utils (saltstack) to stream the command output
|
||||
more interactively to the console and the logs. This is experimental.
|
||||
|
||||
:param bool redirect_stderr: If set to ``True``, then stderr will be
|
||||
redirected to stdout. This is helpful for cases where obtaining both
|
||||
the retcode and output is desired. Default is ``True``
|
||||
|
||||
.. versionadded:: 3006.9
|
||||
|
||||
:param bool encoded_cmd: Specify if the supplied command is encoded.
|
||||
Only applies to shell 'powershell' and 'pwsh'.
|
||||
|
||||
|
@ -1301,6 +1311,7 @@ def run(
|
|||
salt '*' cmd.run cmd='sed -e s/=/:/g'
|
||||
"""
|
||||
python_shell = _python_shell_default(python_shell, kwargs.get("__pub_jid", ""))
|
||||
stderr = subprocess.STDOUT if redirect_stderr else subprocess.PIPE
|
||||
ret = _run(
|
||||
cmd,
|
||||
runas=runas,
|
||||
|
@ -1309,7 +1320,7 @@ def run(
|
|||
python_shell=python_shell,
|
||||
cwd=cwd,
|
||||
stdin=stdin,
|
||||
stderr=subprocess.STDOUT,
|
||||
stderr=stderr,
|
||||
env=env,
|
||||
clean_env=clean_env,
|
||||
prepend_path=prepend_path,
|
||||
|
@ -4057,6 +4068,9 @@ def powershell(
|
|||
else:
|
||||
python_shell = True
|
||||
|
||||
if isinstance(cmd, list):
|
||||
cmd = " ".join(cmd)
|
||||
|
||||
# Append PowerShell Object formatting
|
||||
# ConvertTo-JSON is only available on PowerShell 3.0 and later
|
||||
psversion = shell_info("powershell")["psversion"]
|
||||
|
@ -4085,7 +4099,7 @@ def powershell(
|
|||
encoded_cmd = False
|
||||
|
||||
# Retrieve the response, while overriding shell with 'powershell'
|
||||
response = run(
|
||||
response = run_stdout(
|
||||
cmd,
|
||||
cwd=cwd,
|
||||
stdin=stdin,
|
||||
|
@ -4113,9 +4127,8 @@ def powershell(
|
|||
**kwargs,
|
||||
)
|
||||
|
||||
# Sometimes Powershell returns an empty string, which isn't valid JSON
|
||||
if response == "":
|
||||
response = "{}"
|
||||
response = _prep_powershell_json(response)
|
||||
|
||||
try:
|
||||
return salt.utils.json.loads(response)
|
||||
except Exception: # pylint: disable=broad-except
|
||||
|
@ -4419,10 +4432,16 @@ def powershell_all(
|
|||
else:
|
||||
python_shell = True
|
||||
|
||||
if isinstance(cmd, list):
|
||||
cmd = " ".join(cmd)
|
||||
|
||||
# Append PowerShell Object formatting
|
||||
cmd += " | ConvertTo-JSON"
|
||||
if depth is not None:
|
||||
cmd += f" -Depth {depth}"
|
||||
# ConvertTo-JSON is only available on PowerShell 3.0 and later
|
||||
psversion = shell_info("powershell")["psversion"]
|
||||
if salt.utils.versions.version_cmp(psversion, "2.0") == 1:
|
||||
cmd += " | ConvertTo-JSON"
|
||||
if depth is not None:
|
||||
cmd += f" -Depth {depth}"
|
||||
|
||||
if encode_cmd:
|
||||
# Convert the cmd to UTF-16LE without a BOM and base64 encode.
|
||||
|
@ -4474,6 +4493,8 @@ def powershell_all(
|
|||
response["result"] = []
|
||||
return response
|
||||
|
||||
stdoutput = _prep_powershell_json(stdoutput)
|
||||
|
||||
# If we fail to parse stdoutput we will raise an exception
|
||||
try:
|
||||
result = salt.utils.json.loads(stdoutput)
|
||||
|
@ -4492,9 +4513,30 @@ def powershell_all(
|
|||
else:
|
||||
# result type is list so the force_list param has no effect
|
||||
response["result"] = result
|
||||
|
||||
# Encoded commands dump CLIXML data in stderr. It's not an actual error
|
||||
if "CLIXML" in response["stderr"]:
|
||||
response["stderr"] = ""
|
||||
|
||||
return response
|
||||
|
||||
|
||||
def _prep_powershell_json(text):
|
||||
"""
|
||||
Try to fix the output from OutputTo-JSON in powershell commands to make it
|
||||
valid JSON
|
||||
"""
|
||||
# An empty string just needs to be an empty quote
|
||||
if text == "":
|
||||
text = '""'
|
||||
else:
|
||||
# Raw text needs to be quoted
|
||||
starts_with = ['"', "{", "["]
|
||||
if not any(text.startswith(x) for x in starts_with):
|
||||
text = f'"{text}"'
|
||||
return text
|
||||
|
||||
|
||||
def run_bg(
|
||||
cmd,
|
||||
cwd=None,
|
||||
|
|
|
@ -165,14 +165,18 @@ def build_rule(
|
|||
del kwargs["counter"]
|
||||
|
||||
if "saddr" in kwargs or "source" in kwargs:
|
||||
rule += "ip saddr {} ".format(kwargs.get("saddr") or kwargs.get("source"))
|
||||
rule += "{} saddr {} ".format(
|
||||
nft_family, kwargs.get("saddr") or kwargs.get("source")
|
||||
)
|
||||
if "saddr" in kwargs:
|
||||
del kwargs["saddr"]
|
||||
if "source" in kwargs:
|
||||
del kwargs["source"]
|
||||
|
||||
if "daddr" in kwargs or "destination" in kwargs:
|
||||
rule += "ip daddr {} ".format(kwargs.get("daddr") or kwargs.get("destination"))
|
||||
rule += "{} daddr {} ".format(
|
||||
nft_family, kwargs.get("daddr") or kwargs.get("destination")
|
||||
)
|
||||
if "daddr" in kwargs:
|
||||
del kwargs["daddr"]
|
||||
if "destination" in kwargs:
|
||||
|
|
|
@ -450,7 +450,9 @@ def get_config():
|
|||
raise
|
||||
|
||||
config = dict()
|
||||
if raw_config:
|
||||
if not raw_config:
|
||||
raise CommandExecutionError("Not Configured")
|
||||
else:
|
||||
# Does this Configuration contain a single resource
|
||||
if "ConfigurationName" in raw_config:
|
||||
# Load the single resource
|
||||
|
@ -606,11 +608,13 @@ def test_config():
|
|||
"""
|
||||
cmd = "Test-DscConfiguration"
|
||||
try:
|
||||
_pshell(cmd, ignore_retcode=True)
|
||||
result = _pshell(cmd, ignore_retcode=True)
|
||||
except CommandExecutionError as exc:
|
||||
if "Current configuration does not exist" in exc.info["stderr"]:
|
||||
raise CommandExecutionError("Not Configured")
|
||||
raise
|
||||
if not result:
|
||||
raise CommandExecutionError("Not Configured")
|
||||
return True
|
||||
|
||||
|
||||
|
@ -635,11 +639,14 @@ def get_config_status():
|
|||
"Type, Mode, RebootRequested, NumberofResources"
|
||||
)
|
||||
try:
|
||||
return _pshell(cmd, ignore_retcode=True)
|
||||
result = _pshell(cmd, ignore_retcode=True)
|
||||
except CommandExecutionError as exc:
|
||||
if "No status information available" in exc.info["stderr"]:
|
||||
raise CommandExecutionError("Not Configured")
|
||||
raise
|
||||
if not result:
|
||||
raise CommandExecutionError("Not Configured")
|
||||
return result
|
||||
|
||||
|
||||
def get_lcm_config():
|
||||
|
|
|
@ -168,12 +168,12 @@ Note that when a ``ca_server`` is involved, both peers must use the updated modu
|
|||
|
||||
import base64
|
||||
import copy
|
||||
import datetime
|
||||
import glob
|
||||
import logging
|
||||
import os.path
|
||||
import re
|
||||
import sys
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
try:
|
||||
import cryptography.x509 as cx509
|
||||
|
@ -1409,10 +1409,12 @@ def expires(certificate, days=0):
|
|||
Defaults to ``0``, which checks for the current time.
|
||||
"""
|
||||
cert = x509util.load_cert(certificate)
|
||||
# dates are encoded in UTC/GMT, they are returned as a naive datetime object
|
||||
return cert.not_valid_after <= datetime.datetime.utcnow() + datetime.timedelta(
|
||||
days=days
|
||||
)
|
||||
try:
|
||||
not_after = cert.not_valid_after_utc
|
||||
except AttributeError:
|
||||
# naive datetime object, release <42 (it's always UTC)
|
||||
not_after = cert.not_valid_after.replace(tzinfo=timezone.utc)
|
||||
return not_after <= datetime.now(tz=timezone.utc) + timedelta(days=days)
|
||||
|
||||
|
||||
def expired(certificate):
|
||||
|
@ -1692,6 +1694,13 @@ def read_certificate(certificate):
|
|||
cert = x509util.load_cert(certificate)
|
||||
key_type = x509util.get_key_type(cert.public_key(), as_string=True)
|
||||
|
||||
try:
|
||||
not_before = cert.not_valid_before_utc
|
||||
not_after = cert.not_valid_after_utc
|
||||
except AttributeError:
|
||||
# naive datetime object, release <42 (it's always UTC)
|
||||
not_before = cert.not_valid_before.replace(tzinfo=timezone.utc)
|
||||
not_after = cert.not_valid_after.replace(tzinfo=timezone.utc)
|
||||
ret = {
|
||||
"version": cert.version.value + 1, # 0-indexed
|
||||
"key_size": cert.public_key().key_size if key_type in ["ec", "rsa"] else None,
|
||||
|
@ -1707,8 +1716,8 @@ def read_certificate(certificate):
|
|||
"issuer": _parse_dn(cert.issuer),
|
||||
"issuer_hash": x509util.pretty_hex(_get_name_hash(cert.issuer)),
|
||||
"issuer_str": cert.issuer.rfc4514_string(),
|
||||
"not_before": cert.not_valid_before.strftime(x509util.TIME_FMT),
|
||||
"not_after": cert.not_valid_after.strftime(x509util.TIME_FMT),
|
||||
"not_before": not_before.strftime(x509util.TIME_FMT),
|
||||
"not_after": not_after.strftime(x509util.TIME_FMT),
|
||||
"public_key": get_public_key(cert),
|
||||
"extensions": _parse_extensions(cert.extensions),
|
||||
}
|
||||
|
@ -1774,10 +1783,16 @@ def read_crl(crl):
|
|||
The certificate revocation list to read.
|
||||
"""
|
||||
crl = x509util.load_crl(crl)
|
||||
try:
|
||||
last_update = crl.last_update_utc
|
||||
next_update = crl.next_update_utc
|
||||
except AttributeError:
|
||||
last_update = crl.last_update.replace(tzinfo=timezone.utc)
|
||||
next_update = crl.next_update.replace(tzinfo=timezone.utc)
|
||||
ret = {
|
||||
"issuer": _parse_dn(crl.issuer),
|
||||
"last_update": crl.last_update.strftime(x509util.TIME_FMT),
|
||||
"next_update": crl.next_update.strftime(x509util.TIME_FMT),
|
||||
"last_update": last_update.strftime(x509util.TIME_FMT),
|
||||
"next_update": next_update.strftime(x509util.TIME_FMT),
|
||||
"revoked_certificates": {},
|
||||
"extensions": _parse_extensions(crl.extensions),
|
||||
}
|
||||
|
@ -1797,12 +1812,15 @@ def read_crl(crl):
|
|||
ret["signature_algorithm"] = crl.signature_algorithm_oid.dotted_string
|
||||
|
||||
for revoked in crl:
|
||||
try:
|
||||
revocation_date = revoked.revocation_date_utc
|
||||
except AttributeError:
|
||||
# naive datetime object, release <42 (it's always UTC)
|
||||
revocation_date = revoked.revocation_date.replace(tzinfo=timezone.utc)
|
||||
ret["revoked_certificates"].update(
|
||||
{
|
||||
x509util.dec2hex(revoked.serial_number).replace(":", ""): {
|
||||
"revocation_date": revoked.revocation_date.strftime(
|
||||
x509util.TIME_FMT
|
||||
),
|
||||
"revocation_date": revocation_date.strftime(x509util.TIME_FMT),
|
||||
"extensions": _parse_crl_entry_extensions(revoked.extensions),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -183,9 +183,9 @@ according to the www policy.
|
|||
|
||||
import base64
|
||||
import copy
|
||||
import datetime
|
||||
import logging
|
||||
import os.path
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
import salt.utils.files
|
||||
from salt.exceptions import CommandExecutionError, SaltInvocationError
|
||||
|
@ -487,11 +487,16 @@ def certificate_managed(
|
|||
else None
|
||||
):
|
||||
changes["pkcs12_friendlyname"] = pkcs12_friendlyname
|
||||
try:
|
||||
curr_not_after = current.not_valid_after_utc
|
||||
except AttributeError:
|
||||
# naive datetime object, release <42 (it's always UTC)
|
||||
curr_not_after = current.not_valid_after.replace(
|
||||
tzinfo=timezone.utc
|
||||
)
|
||||
|
||||
if (
|
||||
current.not_valid_after
|
||||
< datetime.datetime.utcnow()
|
||||
+ datetime.timedelta(days=days_remaining)
|
||||
if curr_not_after < datetime.now(tz=timezone.utc) + timedelta(
|
||||
days=days_remaining
|
||||
):
|
||||
changes["expiration"] = True
|
||||
|
||||
|
@ -896,10 +901,14 @@ def crl_managed(
|
|||
|
||||
if encoding != current_encoding:
|
||||
changes["encoding"] = encoding
|
||||
try:
|
||||
curr_next_update = current.next_update_utc
|
||||
except AttributeError:
|
||||
# naive datetime object, release <42 (it's always UTC)
|
||||
curr_next_update = current.next_update.replace(tzinfo=timezone.utc)
|
||||
if days_remaining and (
|
||||
current.next_update
|
||||
< datetime.datetime.utcnow()
|
||||
+ datetime.timedelta(days=days_remaining)
|
||||
curr_next_update
|
||||
< datetime.now(tz=timezone.utc) + timedelta(days=days_remaining)
|
||||
):
|
||||
changes["expiration"] = True
|
||||
|
||||
|
|
|
@ -6,16 +6,24 @@ A salt util for modifying firewall settings.
|
|||
|
||||
This util allows you to modify firewall settings in the local group policy in
|
||||
addition to the normal firewall settings. Parameters are taken from the
|
||||
netsh advfirewall prompt.
|
||||
netsh advfirewall prompt. This utility has been adapted to use powershell
|
||||
instead of the ``netsh`` command to make it compatible with non-English systems.
|
||||
It maintains the ``netsh`` commands and parameters, but it is using powershell
|
||||
under the hood.
|
||||
|
||||
.. versionchanged:: 3008.0
|
||||
|
||||
.. note::
|
||||
More information can be found in the advfirewall context in netsh. This can
|
||||
be access by opening a netsh prompt. At a command prompt type the following:
|
||||
be accessed by opening a netsh prompt. At a command prompt type the
|
||||
following:
|
||||
|
||||
c:\>netsh
|
||||
netsh>advfirewall
|
||||
netsh advfirewall>set help
|
||||
netsh advfirewall>set domain help
|
||||
.. code-block:: powershell
|
||||
|
||||
c:\>netsh
|
||||
netsh>advfirewall
|
||||
netsh advfirewall>set help
|
||||
netsh advfirewall>set domain help
|
||||
|
||||
Usage:
|
||||
|
||||
|
@ -66,87 +74,73 @@ Usage:
|
|||
store='lgpo')
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import socket
|
||||
import tempfile
|
||||
from textwrap import dedent
|
||||
|
||||
import salt.modules.cmdmod
|
||||
import salt.utils.platform
|
||||
import salt.utils.win_pwsh
|
||||
from salt.exceptions import CommandExecutionError
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
__hostname__ = socket.gethostname()
|
||||
__virtualname__ = "netsh"
|
||||
ON_OFF = {
|
||||
0: "OFF",
|
||||
1: "ON",
|
||||
2: "NotConfigured",
|
||||
"off": "False",
|
||||
"on": "True",
|
||||
"notconfigured": "NotConfigured",
|
||||
}
|
||||
|
||||
ENABLE_DISABLE = {
|
||||
0: "Disable",
|
||||
1: "Enable",
|
||||
2: "NotConfigured",
|
||||
"disable": 0,
|
||||
"enable": 1,
|
||||
"notconfigured": 2,
|
||||
}
|
||||
OUTBOUND = {
|
||||
0: "NotConfigured",
|
||||
2: "AllowOutbound",
|
||||
4: "BlockOutbound",
|
||||
"notconfigured": "NotConfigured",
|
||||
"allowoutbound": "Allow",
|
||||
"blockoutbound": "Block",
|
||||
}
|
||||
|
||||
|
||||
# Although utils are often directly imported, it is also possible to use the
|
||||
# loader.
|
||||
def __virtual__():
|
||||
def _get_inbound_text(rule, action):
|
||||
"""
|
||||
Only load if on a Windows system
|
||||
The "Inbound connections" setting is a combination of 2 parameters:
|
||||
|
||||
- AllowInboundRules
|
||||
- DefaultInboundAction
|
||||
|
||||
The settings are as follows:
|
||||
|
||||
Rules Action
|
||||
2 2 AllowInbound
|
||||
2 4 BlockInbound
|
||||
0 4 BlockInboundAlways
|
||||
2 0 NotConfigured
|
||||
"""
|
||||
if not salt.utils.platform.is_windows():
|
||||
return False, "This utility only available on Windows"
|
||||
|
||||
return __virtualname__
|
||||
settings = {
|
||||
0: {
|
||||
4: "BlockInboundAlways",
|
||||
},
|
||||
2: {
|
||||
0: "NotConfigured",
|
||||
2: "AllowInbound",
|
||||
4: "BlockInbound",
|
||||
},
|
||||
}
|
||||
return settings[rule][action]
|
||||
|
||||
|
||||
def _netsh_file(content):
|
||||
"""
|
||||
helper function to get the results of ``netsh -f content.txt``
|
||||
|
||||
Running ``netsh`` will drop you into a ``netsh`` prompt where you can issue
|
||||
``netsh`` commands. You can put a series of commands in an external file and
|
||||
run them as if from a ``netsh`` prompt using the ``-f`` switch. That's what
|
||||
this function does.
|
||||
|
||||
Args:
|
||||
|
||||
content (str):
|
||||
The contents of the file that will be run by the ``netsh -f``
|
||||
command
|
||||
|
||||
Returns:
|
||||
str: The text returned by the netsh command
|
||||
"""
|
||||
with tempfile.NamedTemporaryFile(
|
||||
mode="w", prefix="salt-", suffix=".netsh", delete=False, encoding="utf-8"
|
||||
) as fp:
|
||||
fp.write(content)
|
||||
try:
|
||||
log.debug("%s:\n%s", fp.name, content)
|
||||
return salt.modules.cmdmod.run(f"netsh -f {fp.name}", python_shell=True)
|
||||
finally:
|
||||
os.remove(fp.name)
|
||||
|
||||
|
||||
def _netsh_command(command, store):
|
||||
if store.lower() not in ("local", "lgpo"):
|
||||
raise ValueError(f"Incorrect store: {store}")
|
||||
# set the store for local or lgpo
|
||||
if store.lower() == "local":
|
||||
netsh_script = dedent(
|
||||
"""\
|
||||
advfirewall
|
||||
set store local
|
||||
{}
|
||||
""".format(
|
||||
command
|
||||
)
|
||||
)
|
||||
else:
|
||||
netsh_script = dedent(
|
||||
"""\
|
||||
advfirewall
|
||||
set store gpo = {}
|
||||
{}
|
||||
""".format(
|
||||
__hostname__, command
|
||||
)
|
||||
)
|
||||
return _netsh_file(content=netsh_script).splitlines()
|
||||
def _get_inbound_settings(text):
|
||||
settings = {
|
||||
"allowinbound": (2, 2),
|
||||
"blockinbound": (2, 4),
|
||||
"blockinboundalways": (0, 4),
|
||||
"notconfigured": (2, 0),
|
||||
}
|
||||
return settings[text.lower()]
|
||||
|
||||
|
||||
def get_settings(profile, section, store="local"):
|
||||
|
@ -195,70 +189,54 @@ def get_settings(profile, section, store="local"):
|
|||
raise ValueError(f"Incorrect section: {section}")
|
||||
if store.lower() not in ("local", "lgpo"):
|
||||
raise ValueError(f"Incorrect store: {store}")
|
||||
command = f"show {profile}profile {section}"
|
||||
# run it
|
||||
results = _netsh_command(command=command, store=store)
|
||||
# sample output:
|
||||
# Domain Profile Settings:
|
||||
# ----------------------------------------------------------------------
|
||||
# LocalFirewallRules N/A (GPO-store only)
|
||||
# LocalConSecRules N/A (GPO-store only)
|
||||
# InboundUserNotification Disable
|
||||
# RemoteManagement Disable
|
||||
# UnicastResponseToMulticast Enable
|
||||
|
||||
# if it's less than 3 lines it failed
|
||||
if len(results) < 3:
|
||||
raise CommandExecutionError(f"Invalid results: {results}")
|
||||
ret = {}
|
||||
# Skip the first 2 lines. Add everything else to a dictionary
|
||||
for line in results[3:]:
|
||||
ret.update(dict(list(zip(*[iter(re.split(r"\s{2,}", line))] * 2))))
|
||||
# Build the powershell command
|
||||
cmd = ["Get-NetFirewallProfile"]
|
||||
if profile:
|
||||
cmd.append(profile)
|
||||
if store and store.lower() == "lgpo":
|
||||
cmd.extend(["-PolicyStore", "localhost"])
|
||||
|
||||
# Remove spaces from the values so that `Not Configured` is detected
|
||||
# correctly
|
||||
for item in ret:
|
||||
ret[item] = ret[item].replace(" ", "")
|
||||
# Run the command
|
||||
settings = salt.utils.win_pwsh.run_dict(cmd)
|
||||
|
||||
# special handling for firewallpolicy
|
||||
if section == "firewallpolicy":
|
||||
inbound, outbound = ret["Firewall Policy"].split(",")
|
||||
return {"Inbound": inbound, "Outbound": outbound}
|
||||
# A successful run should return a dictionary
|
||||
if not settings:
|
||||
raise CommandExecutionError("LGPO NETSH: An unknown error occurred")
|
||||
|
||||
return ret
|
||||
# Remove the junk
|
||||
for setting in list(settings.keys()):
|
||||
if setting.startswith("Cim"):
|
||||
settings.pop(setting)
|
||||
|
||||
# Make it look like netsh output
|
||||
ret_settings = {
|
||||
"firewallpolicy": {
|
||||
"Inbound": _get_inbound_text(
|
||||
settings["AllowInboundRules"], settings["DefaultInboundAction"]
|
||||
),
|
||||
"Outbound": OUTBOUND[settings["DefaultOutboundAction"]],
|
||||
},
|
||||
"state": {
|
||||
"State": ON_OFF[settings["Enabled"]],
|
||||
},
|
||||
"logging": {
|
||||
"FileName": settings["LogFileName"],
|
||||
"LogAllowedConnections": ENABLE_DISABLE[settings["LogAllowed"]],
|
||||
"LogDroppedConnections": ENABLE_DISABLE[settings["LogBlocked"]],
|
||||
"MaxFileSize": settings["LogMaxSizeKilobytes"],
|
||||
},
|
||||
"settings": {
|
||||
"InboundUserNotification": ENABLE_DISABLE[settings["NotifyOnListen"]],
|
||||
"LocalConSecRules": ENABLE_DISABLE[settings["AllowLocalIPsecRules"]],
|
||||
"LocalFirewallRules": ENABLE_DISABLE[settings["AllowLocalFirewallRules"]],
|
||||
"UnicastResponseToMulticast": ENABLE_DISABLE[
|
||||
settings["AllowUnicastResponseToMulticast"]
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
def get_all_settings(profile, store="local"):
|
||||
"""
|
||||
Gets all the properties for the specified profile in the specified store
|
||||
|
||||
Args:
|
||||
|
||||
profile (str):
|
||||
The firewall profile to query. Valid options are:
|
||||
|
||||
- domain
|
||||
- public
|
||||
- private
|
||||
|
||||
store (str):
|
||||
The store to use. This is either the local firewall policy or the
|
||||
policy defined by local group policy. Valid options are:
|
||||
|
||||
- lgpo
|
||||
- local
|
||||
|
||||
Default is ``local``
|
||||
|
||||
Returns:
|
||||
dict: A dictionary containing the specified settings
|
||||
"""
|
||||
ret = dict()
|
||||
ret.update(get_settings(profile=profile, section="state", store=store))
|
||||
ret.update(get_settings(profile=profile, section="firewallpolicy", store=store))
|
||||
ret.update(get_settings(profile=profile, section="settings", store=store))
|
||||
ret.update(get_settings(profile=profile, section="logging", store=store))
|
||||
return ret
|
||||
return ret_settings[section.lower()]
|
||||
|
||||
|
||||
def get_all_profiles(store="local"):
|
||||
|
@ -286,6 +264,82 @@ def get_all_profiles(store="local"):
|
|||
}
|
||||
|
||||
|
||||
def get_all_settings(profile, store="local"):
|
||||
"""
|
||||
Gets all the properties for the specified profile in the specified store
|
||||
|
||||
Args:
|
||||
|
||||
profile (str):
|
||||
The firewall profile to query. Valid options are:
|
||||
|
||||
- domain
|
||||
- public
|
||||
- private
|
||||
|
||||
store (str):
|
||||
The store to use. This is either the local firewall policy or the
|
||||
policy defined by local group policy. Valid options are:
|
||||
|
||||
- lgpo
|
||||
- local
|
||||
|
||||
Default is ``local``
|
||||
|
||||
Returns:
|
||||
dict: A dictionary containing the specified settings
|
||||
|
||||
Raises:
|
||||
CommandExecutionError: If an error occurs
|
||||
ValueError: If the parameters are incorrect
|
||||
"""
|
||||
# validate input
|
||||
if profile.lower() not in ("domain", "public", "private"):
|
||||
raise ValueError(f"Incorrect profile: {profile}")
|
||||
if store.lower() not in ("local", "lgpo"):
|
||||
raise ValueError(f"Incorrect store: {store}")
|
||||
|
||||
# Build the powershell command
|
||||
cmd = ["Get-NetFirewallProfile"]
|
||||
if profile:
|
||||
cmd.append(profile)
|
||||
if store and store.lower() == "lgpo":
|
||||
cmd.extend(["-PolicyStore", "localhost"])
|
||||
|
||||
# Run the command
|
||||
settings = salt.utils.win_pwsh.run_dict(cmd)
|
||||
|
||||
# A successful run should return a dictionary
|
||||
if not settings:
|
||||
raise CommandExecutionError("LGPO NETSH: An unknown error occurred")
|
||||
|
||||
# Remove the junk
|
||||
for setting in list(settings.keys()):
|
||||
if setting.startswith("Cim"):
|
||||
settings.pop(setting)
|
||||
|
||||
# Make it look like netsh output
|
||||
ret_settings = {
|
||||
"FileName": settings["LogFileName"],
|
||||
"Inbound": _get_inbound_text(
|
||||
settings["AllowInboundRules"], settings["DefaultInboundAction"]
|
||||
),
|
||||
"InboundUserNotification": ENABLE_DISABLE[settings["NotifyOnListen"]],
|
||||
"LocalConSecRules": ENABLE_DISABLE[settings["AllowLocalIPsecRules"]],
|
||||
"LocalFirewallRules": ENABLE_DISABLE[settings["AllowLocalFirewallRules"]],
|
||||
"LogAllowedConnections": ENABLE_DISABLE[settings["LogAllowed"]],
|
||||
"LogDroppedConnections": ENABLE_DISABLE[settings["LogBlocked"]],
|
||||
"MaxFileSize": settings["LogMaxSizeKilobytes"],
|
||||
"Outbound": OUTBOUND[settings["DefaultOutboundAction"]],
|
||||
"State": ON_OFF[settings["Enabled"]],
|
||||
"UnicastResponseToMulticast": ON_OFF[
|
||||
settings["AllowUnicastResponseToMulticast"]
|
||||
],
|
||||
}
|
||||
|
||||
return ret_settings
|
||||
|
||||
|
||||
def set_firewall_settings(profile, inbound=None, outbound=None, store="local"):
|
||||
"""
|
||||
Set the firewall inbound/outbound settings for the specified profile and
|
||||
|
@ -307,7 +361,7 @@ def set_firewall_settings(profile, inbound=None, outbound=None, store="local"):
|
|||
- blockinbound
|
||||
- blockinboundalways
|
||||
- allowinbound
|
||||
- notconfigured
|
||||
- notconfigured <=== lgpo only
|
||||
|
||||
Default is ``None``
|
||||
|
||||
|
@ -317,7 +371,7 @@ def set_firewall_settings(profile, inbound=None, outbound=None, store="local"):
|
|||
|
||||
- allowoutbound
|
||||
- blockoutbound
|
||||
- notconfigured
|
||||
- notconfigured <=== lgpo only
|
||||
|
||||
Default is ``None``
|
||||
|
||||
|
@ -355,21 +409,34 @@ def set_firewall_settings(profile, inbound=None, outbound=None, store="local"):
|
|||
raise ValueError(f"Incorrect outbound value: {outbound}")
|
||||
if not inbound and not outbound:
|
||||
raise ValueError("Must set inbound or outbound")
|
||||
if store == "local":
|
||||
if inbound and inbound.lower() == "notconfigured":
|
||||
msg = "Cannot set local inbound policies as NotConfigured"
|
||||
raise CommandExecutionError(msg)
|
||||
if outbound and outbound.lower() == "notconfigured":
|
||||
msg = "Cannot set local outbound policies as NotConfigured"
|
||||
raise CommandExecutionError(msg)
|
||||
|
||||
# You have to specify inbound and outbound setting at the same time
|
||||
# If you're only specifying one, you have to get the current setting for the
|
||||
# other
|
||||
if not inbound or not outbound:
|
||||
ret = get_settings(profile=profile, section="firewallpolicy", store=store)
|
||||
if not inbound:
|
||||
inbound = ret["Inbound"]
|
||||
if not outbound:
|
||||
outbound = ret["Outbound"]
|
||||
# Build the powershell command
|
||||
cmd = ["Set-NetFirewallProfile"]
|
||||
if profile:
|
||||
cmd.append(profile)
|
||||
if store and store.lower() == "lgpo":
|
||||
cmd.extend(["-PolicyStore", "localhost"])
|
||||
|
||||
command = f"set {profile}profile firewallpolicy {inbound},{outbound}"
|
||||
# Get inbound settings
|
||||
if inbound:
|
||||
in_rule, in_action = _get_inbound_settings(inbound.lower())
|
||||
cmd.extend(["-AllowInboundRules", in_rule, "-DefaultInboundAction", in_action])
|
||||
|
||||
results = _netsh_command(command=command, store=store)
|
||||
if outbound:
|
||||
out_rule = OUTBOUND[outbound.lower()]
|
||||
cmd.extend(["-DefaultOutboundAction", out_rule])
|
||||
|
||||
# Run the command
|
||||
results = salt.utils.win_pwsh.run_dict(cmd)
|
||||
|
||||
# A successful run should return an empty list
|
||||
if results:
|
||||
raise CommandExecutionError(f"An error occurred: {results}")
|
||||
|
||||
|
@ -442,6 +509,10 @@ def set_logging_settings(profile, setting, value, store="local"):
|
|||
# Input validation
|
||||
if profile.lower() not in ("domain", "public", "private"):
|
||||
raise ValueError(f"Incorrect profile: {profile}")
|
||||
if store == "local":
|
||||
if str(value).lower() == "notconfigured":
|
||||
msg = "Cannot set local policies as NotConfigured"
|
||||
raise CommandExecutionError(msg)
|
||||
if setting.lower() not in (
|
||||
"allowedconnections",
|
||||
"droppedconnections",
|
||||
|
@ -449,13 +520,21 @@ def set_logging_settings(profile, setting, value, store="local"):
|
|||
"maxfilesize",
|
||||
):
|
||||
raise ValueError(f"Incorrect setting: {setting}")
|
||||
settings = {"filename": ["-LogFileName", value]}
|
||||
if setting.lower() in ("allowedconnections", "droppedconnections"):
|
||||
if value.lower() not in ("enable", "disable", "notconfigured"):
|
||||
raise ValueError(f"Incorrect value: {value}")
|
||||
settings.update(
|
||||
{
|
||||
"allowedconnections": ["-LogAllowed", ENABLE_DISABLE[value.lower()]],
|
||||
"droppedconnections": ["-LogBlocked", ENABLE_DISABLE[value.lower()]],
|
||||
}
|
||||
)
|
||||
|
||||
# TODO: Consider adding something like the following to validate filename
|
||||
# https://stackoverflow.com/questions/9532499/check-whether-a-path-is-valid-in-python-without-creating-a-file-at-the-paths-ta
|
||||
if setting.lower() == "maxfilesize":
|
||||
if value.lower() != "notconfigured":
|
||||
if str(value).lower() != "notconfigured":
|
||||
# Must be a number between 1 and 32767
|
||||
try:
|
||||
int(value)
|
||||
|
@ -463,9 +542,18 @@ def set_logging_settings(profile, setting, value, store="local"):
|
|||
raise ValueError(f"Incorrect value: {value}")
|
||||
if not 1 <= int(value) <= 32767:
|
||||
raise ValueError(f"Incorrect value: {value}")
|
||||
# Run the command
|
||||
command = f"set {profile}profile logging {setting} {value}"
|
||||
results = _netsh_command(command=command, store=store)
|
||||
settings.update({"maxfilesize": ["-LogMaxSizeKilobytes", value]})
|
||||
|
||||
# Build the powershell command
|
||||
cmd = ["Set-NetFirewallProfile"]
|
||||
if profile:
|
||||
cmd.append(profile)
|
||||
if store and store.lower() == "lgpo":
|
||||
cmd.extend(["-PolicyStore", "localhost"])
|
||||
|
||||
cmd.extend(settings[setting.lower()])
|
||||
|
||||
results = salt.utils.win_pwsh.run_dict(cmd)
|
||||
|
||||
# A successful run should return an empty list
|
||||
if results:
|
||||
|
@ -493,7 +581,6 @@ def set_settings(profile, setting, value, store="local"):
|
|||
- localfirewallrules
|
||||
- localconsecrules
|
||||
- inboundusernotification
|
||||
- remotemanagement
|
||||
- unicastresponsetomulticast
|
||||
|
||||
value (str):
|
||||
|
@ -526,16 +613,42 @@ def set_settings(profile, setting, value, store="local"):
|
|||
"localfirewallrules",
|
||||
"localconsecrules",
|
||||
"inboundusernotification",
|
||||
"remotemanagement",
|
||||
"unicastresponsetomulticast",
|
||||
):
|
||||
raise ValueError(f"Incorrect setting: {setting}")
|
||||
if value.lower() not in ("enable", "disable", "notconfigured"):
|
||||
raise ValueError(f"Incorrect value: {value}")
|
||||
if setting.lower() in ["localfirewallrules", "localconsecrules"]:
|
||||
if store.lower() != "lgpo":
|
||||
msg = f"{setting} can only be set using Group Policy"
|
||||
raise CommandExecutionError(msg)
|
||||
if setting.lower() == "inboundusernotification" and store.lower() != "lgpo":
|
||||
if value.lower() == "notconfigured":
|
||||
msg = "NotConfigured is only valid when setting group policy"
|
||||
raise CommandExecutionError(msg)
|
||||
|
||||
# Run the command
|
||||
command = f"set {profile}profile settings {setting} {value}"
|
||||
results = _netsh_command(command=command, store=store)
|
||||
# Build the powershell command
|
||||
cmd = ["Set-NetFirewallProfile"]
|
||||
if profile:
|
||||
cmd.append(profile)
|
||||
if store and store.lower() == "lgpo":
|
||||
cmd.extend(["-PolicyStore", "localhost"])
|
||||
|
||||
settings = {
|
||||
"localfirewallrules": [
|
||||
"-AllowLocalFirewallRules",
|
||||
ENABLE_DISABLE[value.lower()],
|
||||
],
|
||||
"localconsecrules": ["-AllowLocalIPsecRules", ENABLE_DISABLE[value.lower()]],
|
||||
"inboundusernotification": ["-NotifyOnListen", ENABLE_DISABLE[value.lower()]],
|
||||
"unicastresponsetomulticast": [
|
||||
"-AllowUnicastResponseToMulticast",
|
||||
ENABLE_DISABLE[value.lower()],
|
||||
],
|
||||
}
|
||||
cmd.extend(settings[setting.lower()])
|
||||
|
||||
results = salt.utils.win_pwsh.run_dict(cmd)
|
||||
|
||||
# A successful run should return an empty list
|
||||
if results:
|
||||
|
@ -546,7 +659,7 @@ def set_settings(profile, setting, value, store="local"):
|
|||
|
||||
def set_state(profile, state, store="local"):
|
||||
"""
|
||||
Configure the firewall state.
|
||||
Enable or disable the firewall profile.
|
||||
|
||||
Args:
|
||||
|
||||
|
@ -583,12 +696,22 @@ def set_state(profile, state, store="local"):
|
|||
# Input validation
|
||||
if profile.lower() not in ("domain", "public", "private"):
|
||||
raise ValueError(f"Incorrect profile: {profile}")
|
||||
if state.lower() not in ("on", "off", "notconfigured"):
|
||||
raise ValueError(f"Incorrect state: {state}")
|
||||
if not isinstance(state, bool):
|
||||
if state.lower() not in ("on", "off", "notconfigured"):
|
||||
raise ValueError(f"Incorrect state: {state}")
|
||||
else:
|
||||
state = "On" if state else "Off"
|
||||
|
||||
# Run the command
|
||||
command = f"set {profile}profile state {state}"
|
||||
results = _netsh_command(command=command, store=store)
|
||||
# Build the powershell command
|
||||
cmd = ["Set-NetFirewallProfile"]
|
||||
if profile:
|
||||
cmd.append(profile)
|
||||
if store and store.lower() == "lgpo":
|
||||
cmd.extend(["-PolicyStore", "localhost"])
|
||||
|
||||
cmd.extend(["-Enabled", ON_OFF[state.lower()]])
|
||||
|
||||
results = salt.utils.win_pwsh.run_dict(cmd)
|
||||
|
||||
# A successful run should return an empty list
|
||||
if results:
|
||||
|
|
|
@ -3,26 +3,16 @@ import salt.utils.json
|
|||
import salt.utils.platform
|
||||
from salt.exceptions import CommandExecutionError
|
||||
|
||||
__virtualname__ = "win_pwsh"
|
||||
|
||||
|
||||
def __virtual__():
|
||||
"""
|
||||
Only load if windows
|
||||
"""
|
||||
if not salt.utils.platform.is_windows():
|
||||
return False, "This utility will only run on Windows"
|
||||
|
||||
return __virtualname__
|
||||
|
||||
|
||||
def run_dict(cmd, cwd=None):
|
||||
"""
|
||||
Execute the powershell command and return the data as a dictionary
|
||||
|
||||
.. versionadded:: 3006.9
|
||||
|
||||
Args:
|
||||
|
||||
cmd (str): The powershell command to run
|
||||
cmd (str,list): The powershell command to run
|
||||
|
||||
cwd (str): The current working directory
|
||||
|
||||
|
@ -34,6 +24,8 @@ def run_dict(cmd, cwd=None):
|
|||
If an error is encountered or the command does not complete
|
||||
successfully
|
||||
"""
|
||||
if isinstance(cmd, list):
|
||||
cmd = " ".join(map(str, cmd))
|
||||
if "convertto-json" not in cmd.lower():
|
||||
cmd = f"{cmd} | ConvertTo-Json"
|
||||
if "progresspreference" not in cmd.lower():
|
||||
|
|
|
@ -52,6 +52,9 @@ def __virtual__():
|
|||
|
||||
|
||||
def split_username(username):
|
||||
"""
|
||||
Splits out the username from the domain name and returns both.
|
||||
"""
|
||||
domain = "."
|
||||
user_name = username
|
||||
if "@" in username:
|
||||
|
@ -234,7 +237,7 @@ def runas(cmdLine, username, password=None, cwd=None):
|
|||
fd_out = msvcrt.open_osfhandle(stdout_read.handle, os.O_RDONLY | os.O_TEXT)
|
||||
with os.fdopen(fd_out, "r") as f_out:
|
||||
stdout = f_out.read()
|
||||
ret["stdout"] = stdout
|
||||
ret["stdout"] = stdout.strip()
|
||||
|
||||
# Read standard error
|
||||
fd_err = msvcrt.open_osfhandle(stderr_read.handle, os.O_RDONLY | os.O_TEXT)
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
import base64
|
||||
import copy
|
||||
import datetime
|
||||
import ipaddress
|
||||
import logging
|
||||
import os.path
|
||||
import re
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from enum import Enum
|
||||
from urllib.parse import urlparse, urlunparse
|
||||
|
||||
|
@ -313,14 +313,14 @@ def build_crt(
|
|||
)
|
||||
|
||||
not_before = (
|
||||
datetime.datetime.strptime(not_before, TIME_FMT)
|
||||
datetime.strptime(not_before, TIME_FMT).replace(tzinfo=timezone.utc)
|
||||
if not_before
|
||||
else datetime.datetime.utcnow()
|
||||
else datetime.now(tz=timezone.utc)
|
||||
)
|
||||
not_after = (
|
||||
datetime.datetime.strptime(not_after, TIME_FMT)
|
||||
datetime.strptime(not_after, TIME_FMT).replace(tzinfo=timezone.utc)
|
||||
if not_after
|
||||
else datetime.datetime.utcnow() + datetime.timedelta(days=days_valid)
|
||||
else datetime.now(tz=timezone.utc) + timedelta(days=days_valid)
|
||||
)
|
||||
builder = builder.not_valid_before(not_before).not_valid_after(not_after)
|
||||
|
||||
|
@ -422,32 +422,38 @@ def build_crl(
|
|||
builder = cx509.CertificateRevocationListBuilder()
|
||||
if signing_cert:
|
||||
builder = builder.issuer_name(signing_cert.subject)
|
||||
builder = builder.last_update(datetime.datetime.today())
|
||||
builder = builder.last_update(datetime.now(tz=timezone.utc))
|
||||
builder = builder.next_update(
|
||||
datetime.datetime.today() + datetime.timedelta(days=days_valid)
|
||||
datetime.now(tz=timezone.utc) + timedelta(days=days_valid)
|
||||
)
|
||||
for rev in revoked:
|
||||
serial_number = not_after = revocation_date = None
|
||||
if "not_after" in rev:
|
||||
not_after = datetime.datetime.strptime(rev["not_after"], TIME_FMT)
|
||||
not_after = datetime.strptime(rev["not_after"], TIME_FMT).replace(
|
||||
tzinfo=timezone.utc
|
||||
)
|
||||
if "serial_number" in rev:
|
||||
serial_number = rev["serial_number"]
|
||||
if "certificate" in rev:
|
||||
rev_cert = load_cert(rev["certificate"])
|
||||
serial_number = rev_cert.serial_number
|
||||
not_after = rev_cert.not_valid_after
|
||||
try:
|
||||
not_after = rev_cert.not_valid_after_utc
|
||||
except AttributeError:
|
||||
# naive datetime object, release <42 (it's always UTC)
|
||||
not_after = rev_cert.not_valid_after.replace(tzinfo=timezone.utc)
|
||||
if not serial_number:
|
||||
raise SaltInvocationError("Need serial_number or certificate")
|
||||
serial_number = _get_serial_number(serial_number)
|
||||
if not_after and not include_expired:
|
||||
if datetime.datetime.utcnow() > not_after:
|
||||
if datetime.now(tz=timezone.utc) > not_after:
|
||||
continue
|
||||
if "revocation_date" in rev:
|
||||
revocation_date = datetime.datetime.strptime(
|
||||
revocation_date = datetime.strptime(
|
||||
rev["revocation_date"], TIME_FMT
|
||||
)
|
||||
).replace(tzinfo=timezone.utc)
|
||||
else:
|
||||
revocation_date = datetime.datetime.utcnow()
|
||||
revocation_date = datetime.now(tz=timezone.utc)
|
||||
|
||||
revoked_cert = cx509.RevokedCertificateBuilder(
|
||||
serial_number=serial_number, revocation_date=revocation_date
|
||||
|
@ -1624,8 +1630,9 @@ def _create_invalidity_date(val, **kwargs):
|
|||
if critical:
|
||||
val = val.split(" ", maxsplit=1)[1]
|
||||
try:
|
||||
# InvalidityDate deals in naive datetime objects only currently
|
||||
return (
|
||||
cx509.InvalidityDate(datetime.datetime.strptime(val, TIME_FMT)),
|
||||
cx509.InvalidityDate(datetime.strptime(val, TIME_FMT)),
|
||||
critical,
|
||||
)
|
||||
except ValueError as err:
|
||||
|
|
|
@ -596,39 +596,3 @@ class CMDModuleTest(ModuleCase):
|
|||
).splitlines()
|
||||
self.assertIn("abc=123", out)
|
||||
self.assertIn("ABC=456", out)
|
||||
|
||||
@pytest.mark.slow_test
|
||||
@pytest.mark.skip_unless_on_windows(reason="Minion is not Windows")
|
||||
def test_windows_powershell_script_args(self):
|
||||
"""
|
||||
Ensure that powershell processes inline script in args
|
||||
"""
|
||||
val = "i like cheese"
|
||||
args = (
|
||||
'-SecureString (ConvertTo-SecureString -String "{}" -AsPlainText -Force)'
|
||||
" -ErrorAction Stop".format(val)
|
||||
)
|
||||
script = "salt://issue-56195/test.ps1"
|
||||
ret = self.run_function(
|
||||
"cmd.script", [script], args=args, shell="powershell", saltenv="base"
|
||||
)
|
||||
self.assertEqual(ret["stdout"], val)
|
||||
|
||||
@pytest.mark.slow_test
|
||||
@pytest.mark.skip_unless_on_windows(reason="Minion is not Windows")
|
||||
@pytest.mark.skip_if_binaries_missing("pwsh")
|
||||
def test_windows_powershell_script_args_pwsh(self):
|
||||
"""
|
||||
Ensure that powershell processes inline script in args with powershell
|
||||
core
|
||||
"""
|
||||
val = "i like cheese"
|
||||
args = (
|
||||
'-SecureString (ConvertTo-SecureString -String "{}" -AsPlainText -Force)'
|
||||
" -ErrorAction Stop".format(val)
|
||||
)
|
||||
script = "salt://issue-56195/test.ps1"
|
||||
ret = self.run_function(
|
||||
"cmd.script", [script], args=args, shell="pwsh", saltenv="base"
|
||||
)
|
||||
self.assertEqual(ret["stdout"], val)
|
||||
|
|
|
@ -1,10 +1,7 @@
|
|||
import base64
|
||||
|
||||
import pytest
|
||||
|
||||
import salt.modules.cmdmod as cmdmod
|
||||
import salt.utils.path
|
||||
import salt.utils.stringutils
|
||||
|
||||
pytestmark = [
|
||||
pytest.mark.windows_whitelisted,
|
||||
|
@ -18,88 +15,197 @@ def shell(request):
|
|||
This will run the test on powershell and powershell core (pwsh). If
|
||||
powershell core is not installed that test run will be skipped
|
||||
"""
|
||||
|
||||
if request.param == "pwsh" and salt.utils.path.which("pwsh") is None:
|
||||
pytest.skip("Powershell 7 Not Present")
|
||||
return request.param
|
||||
|
||||
|
||||
def test_powershell(shell):
|
||||
@pytest.fixture(scope="module")
|
||||
def account():
|
||||
with pytest.helpers.create_account() as _account:
|
||||
yield _account
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"cmd, expected, encode_cmd",
|
||||
[
|
||||
("Write-Output Foo", "Foo", False),
|
||||
(["Write-Output", "Foo"], "Foo", False),
|
||||
('Write-Output "Encoded Foo"', "Encoded Foo", True),
|
||||
(["Write-Output", '"Encoded Foo"'], "Encoded Foo", True),
|
||||
],
|
||||
)
|
||||
def test_powershell(shell, cmd, expected, encode_cmd):
|
||||
"""
|
||||
Test cmd.powershell
|
||||
"""
|
||||
ret = cmdmod.powershell("Write-Output foo", shell=shell)
|
||||
assert ret == "foo"
|
||||
ret = cmdmod.powershell(cmd=cmd, encode_cmd=encode_cmd, shell=shell)
|
||||
assert ret == expected
|
||||
|
||||
|
||||
def test_powershell_encode_cmd(shell):
|
||||
@pytest.mark.parametrize(
|
||||
"cmd, expected, encode_cmd",
|
||||
[
|
||||
("Write-Output Foo", "Foo", False),
|
||||
(["Write-Output", "Foo"], "Foo", False),
|
||||
('Write-Output "Encoded Foo"', "Encoded Foo", True),
|
||||
(["Write-Output", '"Encoded Foo"'], "Encoded Foo", True),
|
||||
],
|
||||
)
|
||||
def test_powershell_runas(shell, account, cmd, expected, encode_cmd):
|
||||
"""
|
||||
Test cmd.powershell with encode_cmd
|
||||
Test cmd.powershell with runas
|
||||
"""
|
||||
ret = cmdmod.powershell('Write-Output "encoded foo"', encode_cmd=True, shell=shell)
|
||||
assert ret == "encoded foo"
|
||||
ret = cmdmod.powershell(
|
||||
cmd=cmd,
|
||||
encode_cmd=encode_cmd,
|
||||
shell=shell,
|
||||
runas=account.username,
|
||||
password=account.password,
|
||||
)
|
||||
assert ret == expected
|
||||
|
||||
|
||||
def test_powershell_all(shell):
|
||||
@pytest.mark.parametrize(
|
||||
"cmd, expected, encode_cmd",
|
||||
[
|
||||
("Write-Output Foo", "Foo", False),
|
||||
(["Write-Output", "Foo"], "Foo", False),
|
||||
('Write-Output "Encoded Foo"', "Encoded Foo", True),
|
||||
(["Write-Output", '"Encoded Foo"'], "Encoded Foo", True),
|
||||
],
|
||||
)
|
||||
def test_powershell_all(shell, cmd, expected, encode_cmd):
|
||||
"""
|
||||
Test cmd.powershell_all
|
||||
Test cmd.powershell_all. `encode_cmd` takes the passed command and encodes
|
||||
it. Different from encoded_command where it's receiving an already encoded
|
||||
command
|
||||
"""
|
||||
ret = cmdmod.powershell_all("Write-Output foo", shell=shell)
|
||||
ret = cmdmod.powershell_all(cmd=cmd, encode_cmd=encode_cmd, shell=shell)
|
||||
assert isinstance(ret["pid"], int)
|
||||
assert ret["retcode"] == 0
|
||||
assert ret["stderr"] == ""
|
||||
assert ret["result"] == "foo"
|
||||
assert ret["result"] == expected
|
||||
|
||||
|
||||
def test_powershell_all_encode_cmd(shell):
|
||||
@pytest.mark.parametrize(
|
||||
"cmd, expected, encode_cmd",
|
||||
[
|
||||
("Write-Output Foo", "Foo", False),
|
||||
(["Write-Output", "Foo"], "Foo", False),
|
||||
('Write-Output "Encoded Foo"', "Encoded Foo", True),
|
||||
(["Write-Output", '"Encoded Foo"'], "Encoded Foo", True),
|
||||
],
|
||||
)
|
||||
def test_powershell_all_runas(shell, account, cmd, expected, encode_cmd):
|
||||
"""
|
||||
Test cmd.powershell_all with encode_cmd
|
||||
Test cmd.powershell_all with runas. `encode_cmd` takes the passed command
|
||||
and encodes it. Different from encoded_command where it's receiving an
|
||||
already encoded command
|
||||
"""
|
||||
ret = cmdmod.powershell_all(
|
||||
'Write-Output "encoded foo"', encode_cmd=True, shell=shell
|
||||
cmd=cmd,
|
||||
encode_cmd=encode_cmd,
|
||||
shell=shell,
|
||||
runas=account.username,
|
||||
password=account.password,
|
||||
)
|
||||
assert isinstance(ret["pid"], int)
|
||||
assert ret["retcode"] == 0
|
||||
assert ret["stderr"] == ""
|
||||
assert ret["result"] == "encoded foo"
|
||||
assert ret["result"] == expected
|
||||
|
||||
|
||||
def test_cmd_run_all_powershell_list():
|
||||
@pytest.mark.parametrize(
|
||||
"cmd, expected, encoded_cmd",
|
||||
[
|
||||
("Write-Output Foo", "Foo", False),
|
||||
(["Write-Output", "Foo"], "Foo", False),
|
||||
(
|
||||
"VwByAGkAdABlAC0ASABvAHMAdAAgACcARQBuAGMAbwBkAGUAZAAgAEYAbwBvACcA",
|
||||
"Encoded Foo",
|
||||
True,
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_cmd_run_all_powershell(shell, cmd, expected, encoded_cmd):
|
||||
"""
|
||||
Ensure that cmd.run_all supports running shell='powershell' with cmd passed
|
||||
as a list
|
||||
Ensure that cmd.run_all supports running shell='powershell'
|
||||
"""
|
||||
ret = cmdmod.run_all(cmd=cmd, shell=shell, encoded_cmd=encoded_cmd)
|
||||
assert ret["stdout"] == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"cmd, expected, encoded_cmd",
|
||||
[
|
||||
("Write-Output Foo", "Foo", False),
|
||||
(["Write-Output", "Foo"], "Foo", False),
|
||||
(
|
||||
"VwByAGkAdABlAC0ASABvAHMAdAAgACcARQBuAGMAbwBkAGUAZAAgAEYAbwBvACcA",
|
||||
"Encoded Foo",
|
||||
True,
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_cmd_run_all_powershell_runas(shell, account, cmd, expected, encoded_cmd):
|
||||
"""
|
||||
Ensure that cmd.run_all with runas supports running shell='powershell'
|
||||
"""
|
||||
ret = cmdmod.run_all(
|
||||
cmd=["Write-Output", "salt"], python_shell=False, shell="powershell"
|
||||
cmd=cmd,
|
||||
shell=shell,
|
||||
encoded_cmd=encoded_cmd,
|
||||
runas=account.username,
|
||||
password=account.password,
|
||||
)
|
||||
assert ret["stdout"] == "salt"
|
||||
assert ret["stdout"] == expected
|
||||
|
||||
|
||||
def test_cmd_run_all_powershell_string():
|
||||
@pytest.mark.parametrize(
|
||||
"cmd, expected, encoded_cmd",
|
||||
[
|
||||
("Write-Output Foo", "Foo", False),
|
||||
(["Write-Output", "Foo"], "Foo", False),
|
||||
(
|
||||
"VwByAGkAdABlAC0ASABvAHMAdAAgACcARQBuAGMAbwBkAGUAZAAgAEYAbwBvACcA",
|
||||
"Encoded Foo",
|
||||
True,
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_cmd_run_encoded_cmd(shell, cmd, expected, encoded_cmd):
|
||||
"""
|
||||
Ensure that cmd.run_all supports running shell='powershell' with cmd passed
|
||||
as a string
|
||||
Ensure that cmd.run supports running shell='powershell'
|
||||
"""
|
||||
ret = cmdmod.run_all(
|
||||
cmd="Write-Output salt", python_shell=False, shell="powershell"
|
||||
ret = cmdmod.run(
|
||||
cmd=cmd, shell=shell, encoded_cmd=encoded_cmd, redirect_stderr=False
|
||||
)
|
||||
assert ret["stdout"] == "salt"
|
||||
assert ret == expected
|
||||
|
||||
|
||||
def test_cmd_run_encoded_cmd(shell):
|
||||
cmd = "Write-Output 'encoded command'"
|
||||
cmd = f"$ProgressPreference='SilentlyContinue'; {cmd}"
|
||||
cmd_utf16 = cmd.encode("utf-16-le")
|
||||
encoded_cmd = base64.standard_b64encode(cmd_utf16)
|
||||
encoded_cmd = salt.utils.stringutils.to_str(encoded_cmd)
|
||||
ret = cmdmod.run(cmd=encoded_cmd, shell=shell, encoded_cmd=True)
|
||||
assert ret == "encoded command"
|
||||
|
||||
|
||||
def test_cmd_run_all_encoded_cmd(shell):
|
||||
cmd = "Write-Output 'encoded command'"
|
||||
cmd = f"$ProgressPreference='SilentlyContinue'; {cmd}"
|
||||
cmd_utf16 = cmd.encode("utf-16-le")
|
||||
encoded_cmd = base64.standard_b64encode(cmd_utf16)
|
||||
encoded_cmd = salt.utils.stringutils.to_str(encoded_cmd)
|
||||
ret = cmdmod.run_all(cmd=encoded_cmd, shell=shell, encoded_cmd=True)
|
||||
assert ret["stdout"] == "encoded command"
|
||||
@pytest.mark.parametrize(
|
||||
"cmd, expected, encoded_cmd",
|
||||
[
|
||||
("Write-Output Foo", "Foo", False),
|
||||
(["Write-Output", "Foo"], "Foo", False),
|
||||
(
|
||||
"VwByAGkAdABlAC0ASABvAHMAdAAgACcARQBuAGMAbwBkAGUAZAAgAEYAbwBvACcA",
|
||||
"Encoded Foo",
|
||||
True,
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_cmd_run_encoded_cmd_runas(shell, account, cmd, expected, encoded_cmd):
|
||||
"""
|
||||
Ensure that cmd.run with runas supports running shell='powershell'
|
||||
"""
|
||||
ret = cmdmod.run(
|
||||
cmd=cmd,
|
||||
shell=shell,
|
||||
encoded_cmd=encoded_cmd,
|
||||
runas=account.username,
|
||||
password=account.password,
|
||||
)
|
||||
assert ret == expected
|
||||
|
|
87
tests/pytests/functional/modules/cmd/test_script.py
Normal file
87
tests/pytests/functional/modules/cmd/test_script.py
Normal file
|
@ -0,0 +1,87 @@
|
|||
import pytest
|
||||
|
||||
import salt.utils.path
|
||||
|
||||
pytestmark = [
|
||||
pytest.mark.core_test,
|
||||
pytest.mark.windows_whitelisted,
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def cmd(modules):
|
||||
return modules.cmd
|
||||
|
||||
|
||||
@pytest.fixture(params=["powershell", "pwsh"])
|
||||
def shell(request):
|
||||
"""
|
||||
This will run the test on powershell and powershell core (pwsh). If
|
||||
powershell core is not installed that test run will be skipped
|
||||
"""
|
||||
if request.param == "pwsh" and salt.utils.path.which("pwsh") is None:
|
||||
pytest.skip("Powershell 7 Not Present")
|
||||
return request.param
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def account():
|
||||
with pytest.helpers.create_account() as _account:
|
||||
yield _account
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def issue_56195(state_tree):
|
||||
contents = """
|
||||
[CmdLetBinding()]
|
||||
Param(
|
||||
[SecureString] $SecureString
|
||||
)
|
||||
$Credential = New-Object System.Net.NetworkCredential("DummyId", $SecureString)
|
||||
$Credential.Password
|
||||
"""
|
||||
with pytest.helpers.temp_file("test.ps1", contents, state_tree / "issue-56195"):
|
||||
yield
|
||||
|
||||
|
||||
@pytest.mark.skip_unless_on_windows(reason="Minion is not Windows")
|
||||
def test_windows_script_args_powershell(cmd, shell, issue_56195):
|
||||
"""
|
||||
Ensure that powershell processes an inline script with args where the args
|
||||
contain powershell that needs to be rendered
|
||||
"""
|
||||
password = "i like cheese"
|
||||
args = (
|
||||
"-SecureString (ConvertTo-SecureString -String '{}' -AsPlainText -Force)"
|
||||
" -ErrorAction Stop".format(password)
|
||||
)
|
||||
script = "salt://issue-56195/test.ps1"
|
||||
|
||||
ret = cmd.script(source=script, args=args, shell="powershell", saltenv="base")
|
||||
|
||||
assert ret["stdout"] == password
|
||||
|
||||
|
||||
@pytest.mark.skip_unless_on_windows(reason="Minion is not Windows")
|
||||
def test_windows_script_args_powershell_runas(cmd, shell, account, issue_56195):
|
||||
"""
|
||||
Ensure that powershell processes an inline script with args where the args
|
||||
contain powershell that needs to be rendered
|
||||
"""
|
||||
password = "i like cheese"
|
||||
args = (
|
||||
"-SecureString (ConvertTo-SecureString -String '{}' -AsPlainText -Force)"
|
||||
" -ErrorAction Stop".format(password)
|
||||
)
|
||||
script = "salt://issue-56195/test.ps1"
|
||||
|
||||
ret = cmd.script(
|
||||
source=script,
|
||||
args=args,
|
||||
shell="powershell",
|
||||
saltenv="base",
|
||||
runas=account.username,
|
||||
password=account.password,
|
||||
)
|
||||
|
||||
assert ret["stdout"] == password
|
|
@ -320,19 +320,22 @@ def test_setpassword_int(user, account_int):
|
|||
("logonscript", "\\\\server\\script.cmd", "", None),
|
||||
("expiration_date", "3/19/2024", "", "2024-03-19 00:00:00"),
|
||||
("expiration_date", "Never", "", None),
|
||||
("expired", True, "", None),
|
||||
("expired", False, "", None),
|
||||
("account_disabled", True, "", None),
|
||||
("account_disabled", False, "", None),
|
||||
("unlock_account", True, "account_locked", False),
|
||||
("password_never_expires", True, "", None),
|
||||
("password_never_expires", False, "", None),
|
||||
("expired", True, "", None),
|
||||
("expired", False, "", None),
|
||||
("disallow_change_password", True, "", None),
|
||||
("disallow_change_password", False, "", None),
|
||||
],
|
||||
)
|
||||
def test_update_str(user, value_name, new_value, info_field, expected, account_str):
|
||||
setting = {value_name: new_value}
|
||||
# You can't expire an account if the password never expires
|
||||
if value_name == "expired":
|
||||
setting.update({"password_never_expires": not new_value})
|
||||
ret = user.update(account_str.username, **setting)
|
||||
assert ret is True
|
||||
ret = user.info(account_str.username)
|
||||
|
|
|
@ -310,7 +310,7 @@ def test_powershell_empty():
|
|||
mock_run = {"pid": 1234, "retcode": 0, "stderr": "", "stdout": ""}
|
||||
with patch("salt.modules.cmdmod._run", return_value=mock_run):
|
||||
ret = cmdmod.powershell("Set-ExecutionPolicy RemoteSigned")
|
||||
assert ret == {}
|
||||
assert ret == ""
|
||||
|
||||
|
||||
def test_is_valid_shell_windows():
|
||||
|
@ -1052,57 +1052,97 @@ def test_runas_env_sudo_group(bundled):
|
|||
)
|
||||
|
||||
|
||||
def test_prep_powershell_cmd_no_powershell():
|
||||
with pytest.raises(CommandExecutionError):
|
||||
cmdmod._prep_powershell_cmd(
|
||||
win_shell="unk_bin", cmd="Some-Command", encoded_cmd=False
|
||||
)
|
||||
|
||||
|
||||
def test_prep_powershell_cmd():
|
||||
"""
|
||||
Tests _prep_powershell_cmd returns correct cmd
|
||||
"""
|
||||
with patch("salt.utils.platform.is_windows", MagicMock(return_value=False)):
|
||||
stack = [["", "", ""], ["", "", ""], ["", "", ""]]
|
||||
stack = [["", "", ""], ["", "", ""], ["", "", ""], ["", "", ""]]
|
||||
with patch("traceback.extract_stack", return_value=stack), patch(
|
||||
"salt.utils.path.which", return_value="C:\\powershell.exe"
|
||||
):
|
||||
ret = cmdmod._prep_powershell_cmd(
|
||||
shell="powershell", cmd="$PSVersionTable", stack=stack, encoded_cmd=False
|
||||
win_shell="powershell", cmd="$PSVersionTable", encoded_cmd=False
|
||||
)
|
||||
assert ret == 'powershell -NonInteractive -NoProfile -Command "$PSVersionTable"'
|
||||
expected = [
|
||||
"C:\\powershell.exe",
|
||||
"-NonInteractive",
|
||||
"-NoProfile",
|
||||
"-ExecutionPolicy",
|
||||
"Bypass",
|
||||
"-Command",
|
||||
"& {$PSVersionTable}",
|
||||
]
|
||||
assert ret == expected
|
||||
|
||||
|
||||
def test_prep_powershell_cmd_encoded():
|
||||
"""
|
||||
Tests _prep_powershell_cmd returns correct cmd when encoded_cmd=True
|
||||
"""
|
||||
stack = [["", "", ""], ["", "", ""], ["", "", ""], ["", "", ""]]
|
||||
# This is the encoded command for 'Write-Host "Encoded HOLO"'
|
||||
e_cmd = "VwByAGkAdABlAC0ASABvAHMAdAAgACIARQBuAGMAbwBkAGUAZAAgAEgATwBMAE8AIgA="
|
||||
with patch("traceback.extract_stack", return_value=stack), patch(
|
||||
"salt.utils.path.which", return_value="C:\\powershell.exe"
|
||||
):
|
||||
ret = cmdmod._prep_powershell_cmd(
|
||||
shell="powershell", cmd="$PSVersionTable", stack=stack, encoded_cmd=True
|
||||
)
|
||||
assert (
|
||||
ret
|
||||
== "powershell -NonInteractive -NoProfile -EncodedCommand $PSVersionTable"
|
||||
win_shell="powershell", cmd=e_cmd, encoded_cmd=True
|
||||
)
|
||||
expected = [
|
||||
"C:\\powershell.exe",
|
||||
"-NonInteractive",
|
||||
"-NoProfile",
|
||||
"-ExecutionPolicy",
|
||||
"Bypass",
|
||||
"-EncodedCommand",
|
||||
f"{e_cmd}",
|
||||
]
|
||||
assert ret == expected
|
||||
|
||||
stack = [["", "", ""], ["", "", "script"], ["", "", ""]]
|
||||
|
||||
def test_prep_powershell_cmd_script():
|
||||
"""
|
||||
Tests _prep_powershell_cmd returns correct cmd when called from cmd.script
|
||||
"""
|
||||
stack = [["", "", ""], ["", "", "script"], ["", "", ""], ["", "", ""]]
|
||||
script = r"C:\some\script.ps1"
|
||||
with patch("traceback.extract_stack", return_value=stack), patch(
|
||||
"salt.utils.path.which", return_value="C:\\powershell.exe"
|
||||
):
|
||||
ret = cmdmod._prep_powershell_cmd(
|
||||
shell="powershell", cmd="$PSVersionTable", stack=stack, encoded_cmd=False
|
||||
)
|
||||
assert (
|
||||
ret
|
||||
== "powershell -NonInteractive -NoProfile -ExecutionPolicy Bypass -Command $PSVersionTable"
|
||||
win_shell="powershell", cmd=script, encoded_cmd=False
|
||||
)
|
||||
expected = [
|
||||
"C:\\powershell.exe",
|
||||
"-NonInteractive",
|
||||
"-NoProfile",
|
||||
"-ExecutionPolicy",
|
||||
"Bypass",
|
||||
"-Command",
|
||||
f"& {script}",
|
||||
]
|
||||
assert ret == expected
|
||||
|
||||
with patch("salt.utils.platform.is_windows", MagicMock(return_value=True)):
|
||||
stack = [["", "", ""], ["", "", ""], ["", "", ""]]
|
||||
|
||||
ret = cmdmod._prep_powershell_cmd(
|
||||
shell="powershell", cmd="$PSVersionTable", stack=stack, encoded_cmd=False
|
||||
)
|
||||
assert (
|
||||
ret == '"powershell" -NonInteractive -NoProfile -Command "$PSVersionTable"'
|
||||
)
|
||||
|
||||
ret = cmdmod._prep_powershell_cmd(
|
||||
shell="powershell", cmd="$PSVersionTable", stack=stack, encoded_cmd=True
|
||||
)
|
||||
assert (
|
||||
ret
|
||||
== '"powershell" -NonInteractive -NoProfile -EncodedCommand $PSVersionTable'
|
||||
)
|
||||
|
||||
stack = [["", "", ""], ["", "", "script"], ["", "", ""]]
|
||||
ret = cmdmod._prep_powershell_cmd(
|
||||
shell="powershell", cmd="$PSVersionTable", stack=stack, encoded_cmd=False
|
||||
)
|
||||
assert (
|
||||
ret
|
||||
== '"powershell" -NonInteractive -NoProfile -ExecutionPolicy Bypass -Command $PSVersionTable'
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
"text, expected",
|
||||
[
|
||||
("", '""'), # Should quote an empty string
|
||||
("Foo", '"Foo"'), # Should quote a string
|
||||
('["foo", "bar"]', '["foo", "bar"]'), # Should leave unchanged
|
||||
('{"foo": "bar"}', '{"foo": "bar"}'), # Should leave unchanged
|
||||
],
|
||||
)
|
||||
def test_prep_powershell_json(text, expected):
|
||||
"""
|
||||
Make sure the output is valid json
|
||||
"""
|
||||
result = cmdmod._prep_powershell_json(text)
|
||||
assert result == expected
|
||||
|
|
|
@ -103,6 +103,26 @@ def test_build_rule():
|
|||
"comment": "Successfully built rule",
|
||||
}
|
||||
|
||||
assert nftables.build_rule(
|
||||
table="filter",
|
||||
chain="input",
|
||||
family="ip6",
|
||||
command="insert",
|
||||
position="3",
|
||||
full="True",
|
||||
connstate="related,established",
|
||||
saddr="::/0",
|
||||
daddr="fe80:cafe::1",
|
||||
jump="accept",
|
||||
) == {
|
||||
"result": True,
|
||||
"rule": (
|
||||
"nft insert rule ip6 filter input position 3 ct state {"
|
||||
" related,established } ip6 saddr ::/0 ip6 daddr fe80:cafe::1 accept"
|
||||
),
|
||||
"comment": "Successfully built rule",
|
||||
}
|
||||
|
||||
assert nftables.build_rule() == {"result": True, "rule": "", "comment": ""}
|
||||
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import datetime
|
||||
import ipaddress
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
import pytest
|
||||
|
||||
|
@ -11,6 +11,9 @@ cryptography = pytest.importorskip(
|
|||
"cryptography", reason="Needs cryptography library", minversion="37.0"
|
||||
)
|
||||
cx509 = pytest.importorskip("cryptography.x509", reason="Needs cryptography library")
|
||||
cprim = pytest.importorskip(
|
||||
"cryptography.hazmat.primitives", reason="Needs cryptography library"
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
@ -1019,12 +1022,12 @@ class TestCreateExtension:
|
|||
[
|
||||
(
|
||||
"critical, 2022-10-11 13:37:42",
|
||||
datetime.datetime.strptime("2022-10-11 13:37:42", "%Y-%m-%d %H:%M:%S"),
|
||||
datetime.strptime("2022-10-11 13:37:42", "%Y-%m-%d %H:%M:%S"),
|
||||
True,
|
||||
),
|
||||
(
|
||||
"2022-10-11 13:37:42",
|
||||
datetime.datetime.strptime("2022-10-11 13:37:42", "%Y-%m-%d %H:%M:%S"),
|
||||
datetime.strptime("2022-10-11 13:37:42", "%Y-%m-%d %H:%M:%S"),
|
||||
False,
|
||||
),
|
||||
],
|
||||
|
@ -1875,9 +1878,7 @@ def test_get_dn(inpt, expected):
|
|||
cx509.Extension(
|
||||
cx509.InvalidityDate.oid,
|
||||
value=cx509.InvalidityDate(
|
||||
datetime.datetime.strptime(
|
||||
"2022-10-11 13:37:42", "%Y-%m-%d %H:%M:%S"
|
||||
)
|
||||
datetime.strptime("2022-10-11 13:37:42", "%Y-%m-%d %H:%M:%S")
|
||||
),
|
||||
critical=False,
|
||||
),
|
||||
|
@ -1888,3 +1889,86 @@ def test_get_dn(inpt, expected):
|
|||
def test_render_extension(inpt, expected):
|
||||
ret = x509.render_extension(inpt)
|
||||
assert ret == expected
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def ca_cert():
|
||||
return """\
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIIDODCCAiCgAwIBAgIIbfpgqP0VGPgwDQYJKoZIhvcNAQELBQAwKzELMAkGA1UE
|
||||
BhMCVVMxDTALBgNVBAMMBFRlc3QxDTALBgNVBAoMBFNhbHQwHhcNMjIxMTE1MTQw
|
||||
NDMzWhcNMzIxMTEyMTQwNDMzWjArMQswCQYDVQQGEwJVUzENMAsGA1UEAwwEVGVz
|
||||
dDENMAsGA1UECgwEU2FsdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEB
|
||||
AOGTScvrjcEt6vsJcG9RUp6fKaDNDWZnJET0omanK9ZwaoGpJPp8UDYe/8ADeI7N
|
||||
10wdyB4oDM9gRDjInBtdQO/PsrmKZF6LzqVFgLMxu2up+PHMi9z6B2P4esIAzMu9
|
||||
PYxc9zH4HzLImHqscVD2HCabsjp9X134Af7hVY5NN/W/4qTP7uOM20wSG2TPI6+B
|
||||
tA9VyPbEPMPRzXzrqc45rVYe6kb2bT84GE93Vcu/e5JZ/k2AKD8Hoa2cxLPsTLq5
|
||||
igl+D+k+dfUtiABiKPvVQiYBsD1fyHDn2m7B6pCgvrGqHjsoAKufgFnXy6PJRg7n
|
||||
vQfaxSiusM5s+VS+fjlvgwsCAwEAAaNgMF4wDwYDVR0TBAgwBgEB/wIBATALBgNV
|
||||
HQ8EBAMCAQYwHQYDVR0OBBYEFFzy8fRTKSOe7kBakqO0Ki71potnMB8GA1UdIwQY
|
||||
MBaAFFzy8fRTKSOe7kBakqO0Ki71potnMA0GCSqGSIb3DQEBCwUAA4IBAQBZS4MP
|
||||
fXYPoGZ66seM+0eikScZHirbRe8vHxHkujnTBUjQITKm86WeQgeBCD2pobgBGZtt
|
||||
5YFozM4cERqY7/1BdemUxFvPmMFFznt0TM5w+DfGWVK8un6SYwHnmBbnkWgX4Srm
|
||||
GsL0HHWxVXkGnFGFk6Sbo3vnN7CpkpQTWFqeQQ5rHOw91pt7KnNZwc6I3ZjrCUHJ
|
||||
+UmKKrga16a4Q+8FBpYdphQU609npo/0zuaE6FyiJYlW3tG+mlbbNgzY/+eUaxt2
|
||||
9Bp9mtA+Hkox551Mfpq45Oi+ehwMt0xjZCjuFCM78oiUdHCGO+EmcT7ogiYALiOF
|
||||
LN1w5sybsYwIw6QN
|
||||
-----END CERTIFICATE-----
|
||||
"""
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def ca_key():
|
||||
return """\
|
||||
-----BEGIN RSA PRIVATE KEY-----
|
||||
MIIEowIBAAKCAQEA4ZNJy+uNwS3q+wlwb1FSnp8poM0NZmckRPSiZqcr1nBqgakk
|
||||
+nxQNh7/wAN4js3XTB3IHigMz2BEOMicG11A78+yuYpkXovOpUWAszG7a6n48cyL
|
||||
3PoHY/h6wgDMy709jFz3MfgfMsiYeqxxUPYcJpuyOn1fXfgB/uFVjk039b/ipM/u
|
||||
44zbTBIbZM8jr4G0D1XI9sQ8w9HNfOupzjmtVh7qRvZtPzgYT3dVy797kln+TYAo
|
||||
PwehrZzEs+xMurmKCX4P6T519S2IAGIo+9VCJgGwPV/IcOfabsHqkKC+saoeOygA
|
||||
q5+AWdfLo8lGDue9B9rFKK6wzmz5VL5+OW+DCwIDAQABAoIBAFfImc9hu6iR1gAb
|
||||
jEXFwAE6r1iEc9KGEPdEvG52X/jzhn8u89UGy7BEIAL5VtE8Caz1agtSSqnpLKNs
|
||||
blO31q18hnDuCmFAxwpKIeuaTvV3EAoJL+Su6HFfIWaeKRSgcHNPOmOXy4xXw/75
|
||||
XJ/FJu9fZ9ybLaHEAgLObh0Sr9RSPQbZ72ZawPP8+5WCbR+2w90RApHXQL0piSbW
|
||||
lIx1NE6o5wQb3vik8z/k5FqLCY2a8++WNyfvS+WWFY5WXGI7ZiDDQk46gnslquH2
|
||||
Lon5CEn3JlTGQFhxaaa2ivssscf2lA2Rvm2E8o1rdZJS2OpSE0ai4TXY9XnyjZj1
|
||||
5usWIwECgYEA+3Mwu03A7PyLEBksS/u3MSo/176S9lF/uXcecQNdhAIalUZ8AgV3
|
||||
7HP2yI9ZC0ekA809ZzFjGFostXm9VfUOEZ549jLOMzvBtCdaI0aBUE8icu52fX4r
|
||||
fT2NY6hYgz5/fxD8sq1XH/fqNNexABwtViH6YAly/9A1/8M3BOWt72UCgYEA5ag8
|
||||
sIfiBUoWd1sS6qHDuugWlpx4ZWYC/59XEJyCN2wioP8qFji/aNZxF1wLfyQe/zaa
|
||||
YBFusjsBnSfBU1p4UKCRHWQ9/CnC0DzqTkyKC4Fv8GuxgywNm5W9gPKk7idHP7mw
|
||||
e+7Uvf1pOQccqEPh7yltpW+Xw27gfsC2DMAIGa8CgYByv/q5P56PiCCeVB6W/mR3
|
||||
l2RTPLEsn7y+EtJdmL+QgrVG8kedVImJ6tHwbRqhvyvmYD9pXGxwrJZCqy/wjkjB
|
||||
WaSyFjVrxBV99Yd5Ga/hyntaH+ELHA0UtoZTuHvMSTU9866ei+R6vlSvkM9B0ZoO
|
||||
+KqeMTG99HLwKVJudbKO0QKBgQCd33U49XBOqoufKSBr4yAmUH2Ws6GgMuxExUiY
|
||||
xr5NUyzK+B36gLA0ZZYAtOnCURZt4x9kgxdRtnZ5jma74ilrY7XeOpbRzfN6KyX3
|
||||
BW6wUh6da6rvvUztc5Z+Gk9+18mG6SOFTr04jgfTiCwPD/s06YnSfFAbrRDukZOU
|
||||
WD45SQKBgBvjSwl3AbPoJnRjZjGuCUMKQKrLm30xCeorxasu+di/4YV5Yd8VUjaO
|
||||
mYyqXW6bQndKLuXT+AXtCd/Xt2sI96z8mc0G5fImDUxQjMUuS3RyQK357cEOu8Zy
|
||||
HdI7Pfaf/l0HozAw/Al+LXbpmSBdfmz0U/EGAKRqXMW5+vQ7XHXD
|
||||
-----END RSA PRIVATE KEY-----"""
|
||||
|
||||
|
||||
def test_build_crl_accounts_for_local_time_zone(ca_key, ca_cert):
|
||||
curr_time = datetime.now(tz=timezone(timedelta(hours=1)))
|
||||
curr_time_naive = curr_time.replace(tzinfo=None)
|
||||
|
||||
def dtn(tz=None):
|
||||
if tz is None:
|
||||
return curr_time_naive
|
||||
return curr_time
|
||||
|
||||
curr_time_utc = curr_time.astimezone(timezone.utc).replace(microsecond=0)
|
||||
curr_time_utc_naive = curr_time_utc.replace(tzinfo=None)
|
||||
privkey = cprim.serialization.load_pem_private_key(ca_key.encode(), password=None)
|
||||
cert = cx509.load_pem_x509_certificate(ca_cert.encode())
|
||||
with patch("salt.utils.x509.datetime") as fakedate:
|
||||
fakedate.today.return_value = curr_time_naive
|
||||
fakedate.now.side_effect = dtn
|
||||
fakedate.utcnow.return_value = curr_time_utc_naive
|
||||
builder, _ = x509.build_crl(privkey, [], signing_cert=cert)
|
||||
crl = builder.sign(privkey, algorithm=cprim.hashes.SHA256())
|
||||
try:
|
||||
assert crl.last_update_utc == curr_time_utc
|
||||
except AttributeError:
|
||||
assert crl.last_update == curr_time_utc_naive
|
||||
|
|
|
@ -2,7 +2,6 @@ import pytest
|
|||
|
||||
import salt.utils.win_lgpo_netsh as win_lgpo_netsh
|
||||
from salt.exceptions import CommandExecutionError
|
||||
from tests.support.mock import patch
|
||||
|
||||
pytestmark = [
|
||||
pytest.mark.windows_whitelisted,
|
||||
|
@ -26,18 +25,6 @@ def test_get_settings_firewallpolicy_lgpo():
|
|||
assert "Outbound" in ret
|
||||
|
||||
|
||||
def test_get_settings_firewallpolicy_lgpo_issue_57591():
|
||||
"""
|
||||
Should not stacktrace when the hostname contains unicode characters
|
||||
"""
|
||||
with patch.object(win_lgpo_netsh, "__hostname__", return_value="kомпьютер"):
|
||||
ret = win_lgpo_netsh.get_settings(
|
||||
profile="domain", section="firewallpolicy", store="lgpo"
|
||||
)
|
||||
assert "Inbound" in ret
|
||||
assert "Outbound" in ret
|
||||
|
||||
|
||||
def test_get_settings_logging_local():
|
||||
ret = win_lgpo_netsh.get_settings(
|
||||
profile="domain", section="logging", store="local"
|
||||
|
@ -63,7 +50,6 @@ def test_get_settings_settings_local():
|
|||
assert "InboundUserNotification" in ret
|
||||
assert "LocalConSecRules" in ret
|
||||
assert "LocalFirewallRules" in ret
|
||||
assert "RemoteManagement" in ret
|
||||
assert "UnicastResponseToMulticast" in ret
|
||||
|
||||
|
||||
|
@ -74,7 +60,6 @@ def test_get_settings_settings_lgpo():
|
|||
assert "InboundUserNotification" in ret
|
||||
assert "LocalConSecRules" in ret
|
||||
assert "LocalFirewallRules" in ret
|
||||
assert "RemoteManagement" in ret
|
||||
assert "UnicastResponseToMulticast" in ret
|
||||
|
||||
|
||||
|
@ -99,7 +84,6 @@ def test_get_all_settings_local():
|
|||
assert "InboundUserNotification" in ret
|
||||
assert "LocalConSecRules" in ret
|
||||
assert "LocalFirewallRules" in ret
|
||||
assert "RemoteManagement" in ret
|
||||
assert "UnicastResponseToMulticast" in ret
|
||||
assert "State" in ret
|
||||
|
||||
|
@ -115,7 +99,6 @@ def test_get_all_settings_lgpo():
|
|||
assert "InboundUserNotification" in ret
|
||||
assert "LocalConSecRules" in ret
|
||||
assert "LocalFirewallRules" in ret
|
||||
assert "RemoteManagement" in ret
|
||||
assert "UnicastResponseToMulticast" in ret
|
||||
assert "State" in ret
|
||||
|
||||
|
@ -356,7 +339,7 @@ def test_set_firewall_logging_maxfilesize_local():
|
|||
new = win_lgpo_netsh.get_settings(
|
||||
profile="domain", section="logging", store="local"
|
||||
)["MaxFileSize"]
|
||||
assert new == "16384"
|
||||
assert new == 16384
|
||||
finally:
|
||||
ret = win_lgpo_netsh.set_logging_settings(
|
||||
profile="domain", setting="maxfilesize", value=current, store="local"
|
||||
|
@ -491,32 +474,6 @@ def test_set_firewall_settings_notification_lgpo_notconfigured():
|
|||
assert ret is True
|
||||
|
||||
|
||||
def test_set_firewall_settings_remotemgmt_local_enable():
|
||||
current = win_lgpo_netsh.get_settings(
|
||||
profile="domain", section="settings", store="local"
|
||||
)["RemoteManagement"]
|
||||
try:
|
||||
ret = win_lgpo_netsh.set_settings(
|
||||
profile="domain",
|
||||
setting="remotemanagement",
|
||||
value="enable",
|
||||
store="local",
|
||||
)
|
||||
assert ret is True
|
||||
new = win_lgpo_netsh.get_settings(
|
||||
profile="domain", section="settings", store="local"
|
||||
)["RemoteManagement"]
|
||||
assert new == "Enable"
|
||||
finally:
|
||||
ret = win_lgpo_netsh.set_settings(
|
||||
profile="domain",
|
||||
setting="remotemanagement",
|
||||
value=current,
|
||||
store="local",
|
||||
)
|
||||
assert ret is True
|
||||
|
||||
|
||||
def test_set_firewall_settings_unicast_local_disable():
|
||||
current = win_lgpo_netsh.get_settings(
|
||||
profile="domain", section="settings", store="local"
|
||||
|
@ -566,13 +523,16 @@ def test_set_firewall_state_local_notconfigured():
|
|||
profile="domain", section="state", store="local"
|
||||
)["State"]
|
||||
try:
|
||||
pytest.raises(
|
||||
CommandExecutionError,
|
||||
win_lgpo_netsh.set_state,
|
||||
ret = win_lgpo_netsh.set_state(
|
||||
profile="domain",
|
||||
state="notconfigured",
|
||||
store="local",
|
||||
)
|
||||
assert ret is True
|
||||
new = win_lgpo_netsh.get_settings(
|
||||
profile="domain", section="state", store="local"
|
||||
)["State"]
|
||||
assert new == "NotConfigured"
|
||||
finally:
|
||||
ret = win_lgpo_netsh.set_state(profile="domain", state=current, store="local")
|
||||
assert ret is True
|
||||
|
|
|
@ -330,6 +330,11 @@ class TestAccount:
|
|||
ret = self.sminion.functions.user.add(self.username)
|
||||
assert ret is True
|
||||
self._delete_account = True
|
||||
if salt.utils.platform.is_windows():
|
||||
log.debug("Configuring system account: %s", self)
|
||||
ret = self.sminion.functions.user.update(
|
||||
self.username, password_never_expires=True
|
||||
)
|
||||
if salt.utils.platform.is_darwin() or salt.utils.platform.is_windows():
|
||||
password = self.password
|
||||
else:
|
||||
|
|
|
@ -361,7 +361,6 @@ MISSING_DOCSTRINGS = {
|
|||
"machine_get_machinestate_tuple",
|
||||
],
|
||||
"salt/utils/win_osinfo.py": ["get_os_version_info"],
|
||||
"salt/utils/win_runas.py": ["split_username"],
|
||||
"salt/utils/yamldumper.py": [
|
||||
"represent_undefined",
|
||||
"represent_ordereddict",
|
||||
|
|
Loading…
Add table
Reference in a new issue