From 344a3d8c2fedcbaf6924c16821a96c98e95d7108 Mon Sep 17 00:00:00 2001 From: Shane Lee Date: Tue, 23 Apr 2024 11:04:23 -0600 Subject: [PATCH] Use Powershell instead of netsh for firewall settings --- changelog/61534.fixed.md | 2 + salt/utils/win_lgpo_netsh.py | 434 ++++++++++++------ salt/utils/win_pwsh.py | 67 +++ .../pytests/unit/utils/win_lgpo/test_netsh.py | 41 +- 4 files changed, 363 insertions(+), 181 deletions(-) create mode 100644 changelog/61534.fixed.md create mode 100644 salt/utils/win_pwsh.py diff --git a/changelog/61534.fixed.md b/changelog/61534.fixed.md new file mode 100644 index 00000000000..ed6c4401140 --- /dev/null +++ b/changelog/61534.fixed.md @@ -0,0 +1,2 @@ +Fixed the win_lgpo_netsh salt util to handle non-English systems. This was a +rewrite to use PowerShell instead of netsh to make the changes on the system diff --git a/salt/utils/win_lgpo_netsh.py b/salt/utils/win_lgpo_netsh.py index 30ea51fc10d..1a0ab75911e 100644 --- a/salt/utils/win_lgpo_netsh.py +++ b/salt/utils/win_lgpo_netsh.py @@ -6,16 +6,24 @@ A salt util for modifying firewall settings. This util allows you to modify firewall settings in the local group policy in addition to the normal firewall settings. Parameters are taken from the -netsh advfirewall prompt. +netsh advfirewall prompt. This utility has been adapted to use powershell +instead of the ``netsh`` command to make it compatible with non-English systems. +It maintains the ``netsh`` commands and parameters, but it is using powershell +under the hood. + +.. versionchanged:: 3008.0 .. note:: More information can be found in the advfirewall context in netsh. This can - be access by opening a netsh prompt. At a command prompt type the following: + be accessed by opening a netsh prompt. At a command prompt type the + following: - c:\>netsh - netsh>advfirewall - netsh advfirewall>set help - netsh advfirewall>set domain help + .. code-block:: powershell + + c:\>netsh + netsh>advfirewall + netsh advfirewall>set help + netsh advfirewall>set domain help Usage: @@ -66,19 +74,40 @@ Usage: store='lgpo') """ -import logging -import os -import re import socket -import tempfile -from textwrap import dedent -import salt.modules.cmdmod +import salt.utils.platform +import salt.utils.win_pwsh from salt.exceptions import CommandExecutionError -log = logging.getLogger(__name__) -__hostname__ = socket.gethostname() +ON_OFF = { + 0: "OFF", + 1: "ON", + 2: "NotConfigured", + "off": "False", + "on": "True", + "notconfigured": "NotConfigured", +} + +ENABLE_DISABLE = { + 0: "Disable", + 1: "Enable", + 2: "NotConfigured", + "disable": 0, + "enable": 1, + "notconfigured": 2, +} +OUTBOUND = { + 0: "NotConfigured", + 2: "AllowOutbound", + 4: "BlockOutbound", + "notconfigured": "NotConfigured", + "allowoutbound": "Allow", + "blockoutbound": "Block", +} + __virtualname__ = "netsh" +__hostname__ = socket.gethostname() # Although utils are often directly imported, it is also possible to use the @@ -93,60 +122,42 @@ def __virtual__(): return __virtualname__ -def _netsh_file(content): +def _get_inbound_text(rule, action): """ - helper function to get the results of ``netsh -f content.txt`` + The "Inbound connections" setting is a combination of 2 parameters: - Running ``netsh`` will drop you into a ``netsh`` prompt where you can issue - ``netsh`` commands. You can put a series of commands in an external file and - run them as if from a ``netsh`` prompt using the ``-f`` switch. That's what - this function does. + - AllowInboundRules + - DefaultInboundAction - Args: + The settings are as follows: - content (str): - The contents of the file that will be run by the ``netsh -f`` - command - - Returns: - str: The text returned by the netsh command + Rules Action + 2 2 AllowInbound + 2 4 BlockInbound + 0 4 BlockInboundAlways + 2 0 NotConfigured """ - with tempfile.NamedTemporaryFile( - mode="w", prefix="salt-", suffix=".netsh", delete=False, encoding="utf-8" - ) as fp: - fp.write(content) - try: - log.debug("%s:\n%s", fp.name, content) - return salt.modules.cmdmod.run(f"netsh -f {fp.name}", python_shell=True) - finally: - os.remove(fp.name) + settings = { + 0: { + 4: "BlockInboundAlways", + }, + 2: { + 0: "NotConfigured", + 2: "AllowInbound", + 4: "BlockInbound", + }, + } + return settings[rule][action] -def _netsh_command(command, store): - if store.lower() not in ("local", "lgpo"): - raise ValueError(f"Incorrect store: {store}") - # set the store for local or lgpo - if store.lower() == "local": - netsh_script = dedent( - """\ - advfirewall - set store local - {} - """.format( - command - ) - ) - else: - netsh_script = dedent( - """\ - advfirewall - set store gpo = {} - {} - """.format( - __hostname__, command - ) - ) - return _netsh_file(content=netsh_script).splitlines() +def _get_inbound_settings(text): + settings = { + "allowinbound": (2, 2), + "blockinbound": (2, 4), + "blockinboundalways": (0, 4), + "notconfigured": (2, 0), + } + return settings[text.lower()] def get_settings(profile, section, store="local"): @@ -195,70 +206,54 @@ def get_settings(profile, section, store="local"): raise ValueError(f"Incorrect section: {section}") if store.lower() not in ("local", "lgpo"): raise ValueError(f"Incorrect store: {store}") - command = f"show {profile}profile {section}" - # run it - results = _netsh_command(command=command, store=store) - # sample output: - # Domain Profile Settings: - # ---------------------------------------------------------------------- - # LocalFirewallRules N/A (GPO-store only) - # LocalConSecRules N/A (GPO-store only) - # InboundUserNotification Disable - # RemoteManagement Disable - # UnicastResponseToMulticast Enable - # if it's less than 3 lines it failed - if len(results) < 3: - raise CommandExecutionError(f"Invalid results: {results}") - ret = {} - # Skip the first 2 lines. Add everything else to a dictionary - for line in results[3:]: - ret.update(dict(list(zip(*[iter(re.split(r"\s{2,}", line))] * 2)))) + # Build the powershell command + cmd = ["Get-NetFirewallProfile"] + if profile: + cmd.append(profile) + if store and store.lower() == "lgpo": + cmd.extend(["-PolicyStore", "localhost"]) - # Remove spaces from the values so that `Not Configured` is detected - # correctly - for item in ret: - ret[item] = ret[item].replace(" ", "") + # Run the command + settings = salt.utils.win_pwsh.run_dict(cmd) - # special handling for firewallpolicy - if section == "firewallpolicy": - inbound, outbound = ret["Firewall Policy"].split(",") - return {"Inbound": inbound, "Outbound": outbound} + # A successful run should return a dictionary + if not settings: + raise CommandExecutionError("LGPO NETSH: An unknown error occurred") - return ret + # Remove the junk + for setting in list(settings.keys()): + if setting.startswith("Cim"): + settings.pop(setting) + # Make it look like netsh output + ret_settings = { + "firewallpolicy": { + "Inbound": _get_inbound_text( + settings["AllowInboundRules"], settings["DefaultInboundAction"] + ), + "Outbound": OUTBOUND[settings["DefaultOutboundAction"]], + }, + "state": { + "State": ON_OFF[settings["Enabled"]], + }, + "logging": { + "FileName": settings["LogFileName"], + "LogAllowedConnections": ENABLE_DISABLE[settings["LogAllowed"]], + "LogDroppedConnections": ENABLE_DISABLE[settings["LogBlocked"]], + "MaxFileSize": settings["LogMaxSizeKilobytes"], + }, + "settings": { + "InboundUserNotification": ENABLE_DISABLE[settings["NotifyOnListen"]], + "LocalConSecRules": ENABLE_DISABLE[settings["AllowLocalIPsecRules"]], + "LocalFirewallRules": ENABLE_DISABLE[settings["AllowLocalFirewallRules"]], + "UnicastResponseToMulticast": ENABLE_DISABLE[ + settings["AllowUnicastResponseToMulticast"] + ], + }, + } -def get_all_settings(profile, store="local"): - """ - Gets all the properties for the specified profile in the specified store - - Args: - - profile (str): - The firewall profile to query. Valid options are: - - - domain - - public - - private - - store (str): - The store to use. This is either the local firewall policy or the - policy defined by local group policy. Valid options are: - - - lgpo - - local - - Default is ``local`` - - Returns: - dict: A dictionary containing the specified settings - """ - ret = dict() - ret.update(get_settings(profile=profile, section="state", store=store)) - ret.update(get_settings(profile=profile, section="firewallpolicy", store=store)) - ret.update(get_settings(profile=profile, section="settings", store=store)) - ret.update(get_settings(profile=profile, section="logging", store=store)) - return ret + return ret_settings[section.lower()] def get_all_profiles(store="local"): @@ -286,6 +281,82 @@ def get_all_profiles(store="local"): } +def get_all_settings(profile, store="local"): + """ + Gets all the properties for the specified profile in the specified store + + Args: + + profile (str): + The firewall profile to query. Valid options are: + + - domain + - public + - private + + store (str): + The store to use. This is either the local firewall policy or the + policy defined by local group policy. Valid options are: + + - lgpo + - local + + Default is ``local`` + + Returns: + dict: A dictionary containing the specified settings + + Raises: + CommandExecutionError: If an error occurs + ValueError: If the parameters are incorrect + """ + # validate input + if profile.lower() not in ("domain", "public", "private"): + raise ValueError(f"Incorrect profile: {profile}") + if store.lower() not in ("local", "lgpo"): + raise ValueError(f"Incorrect store: {store}") + + # Build the powershell command + cmd = ["Get-NetFirewallProfile"] + if profile: + cmd.append(profile) + if store and store.lower() == "lgpo": + cmd.extend(["-PolicyStore", "localhost"]) + + # Run the command + settings = salt.utils.win_pwsh.run_dict(cmd) + + # A successful run should return a dictionary + if not settings: + raise CommandExecutionError("LGPO NETSH: An unknown error occurred") + + # Remove the junk + for setting in list(settings.keys()): + if setting.startswith("Cim"): + settings.pop(setting) + + # Make it look like netsh output + ret_settings = { + "FileName": settings["LogFileName"], + "Inbound": _get_inbound_text( + settings["AllowInboundRules"], settings["DefaultInboundAction"] + ), + "InboundUserNotification": ENABLE_DISABLE[settings["NotifyOnListen"]], + "LocalConSecRules": ENABLE_DISABLE[settings["AllowLocalIPsecRules"]], + "LocalFirewallRules": ENABLE_DISABLE[settings["AllowLocalFirewallRules"]], + "LogAllowedConnections": ENABLE_DISABLE[settings["LogAllowed"]], + "LogDroppedConnections": ENABLE_DISABLE[settings["LogBlocked"]], + "MaxFileSize": settings["LogMaxSizeKilobytes"], + "Outbound": OUTBOUND[settings["DefaultOutboundAction"]], + "State": ON_OFF[settings["Enabled"]], + "UnicastResponseToMulticast": ON_OFF[ + settings["AllowUnicastResponseToMulticast"] + ], + } + + return ret_settings + + def set_firewall_settings(profile, inbound=None, outbound=None, store="local"): """ Set the firewall inbound/outbound settings for the specified profile and @@ -307,7 +378,7 @@ def set_firewall_settings(profile, inbound=None, outbound=None, store="local"): - blockinbound - blockinboundalways - allowinbound - - notconfigured + - notconfigured <=== lgpo only Default is ``None`` @@ -317,7 +388,7 @@ def set_firewall_settings(profile, inbound=None, outbound=None, store="local"): - allowoutbound - blockoutbound - - notconfigured + - notconfigured <=== lgpo only Default is ``None`` @@ -355,21 +426,34 @@ def set_firewall_settings(profile, inbound=None, outbound=None, store="local"): raise ValueError(f"Incorrect outbound value: {outbound}") if not inbound and not outbound: raise ValueError("Must set inbound or outbound") + if store == "local": + if inbound and inbound.lower() == "notconfigured": + msg = "Cannot set local inbound policies as NotConfigured" + raise CommandExecutionError(msg) + if outbound and outbound.lower() == "notconfigured": + msg = "Cannot set local outbound policies as NotConfigured" + raise CommandExecutionError(msg) - # You have to specify inbound and outbound setting at the same time - # If you're only specifying one, you have to get the current setting for the - # other - if not inbound or not outbound: - ret = get_settings(profile=profile, section="firewallpolicy", store=store) - if not inbound: - inbound = ret["Inbound"] - if not outbound: - outbound = ret["Outbound"] + # Build the powershell command + cmd = ["Set-NetFirewallProfile"] + if profile: + cmd.append(profile) + if store and store.lower() == "lgpo": + cmd.extend(["-PolicyStore", "localhost"]) - command = f"set {profile}profile firewallpolicy {inbound},{outbound}" + # Get inbound settings + if inbound: + in_rule, in_action = _get_inbound_settings(inbound.lower()) + cmd.extend(["-AllowInboundRules", in_rule, "-DefaultInboundAction", in_action]) - results = _netsh_command(command=command, store=store) + if outbound: + out_rule = OUTBOUND[outbound.lower()] + cmd.extend(["-DefaultOutboundAction", out_rule]) + # Run the command + results = salt.utils.win_pwsh.run_dict(cmd) + + # A successful run should return an empty list if results: raise CommandExecutionError(f"An error occurred: {results}") @@ -442,6 +526,10 @@ def set_logging_settings(profile, setting, value, store="local"): # Input validation if profile.lower() not in ("domain", "public", "private"): raise ValueError(f"Incorrect profile: {profile}") + if store == "local": + if str(value).lower() == "notconfigured": + msg = "Cannot set local policies as NotConfigured" + raise CommandExecutionError(msg) if setting.lower() not in ( "allowedconnections", "droppedconnections", @@ -449,13 +537,21 @@ def set_logging_settings(profile, setting, value, store="local"): "maxfilesize", ): raise ValueError(f"Incorrect setting: {setting}") + settings = {"filename": ["-LogFileName", value]} if setting.lower() in ("allowedconnections", "droppedconnections"): if value.lower() not in ("enable", "disable", "notconfigured"): raise ValueError(f"Incorrect value: {value}") + settings.update( + { + "allowedconnections": ["-LogAllowed", ENABLE_DISABLE[value.lower()]], + "droppedconnections": ["-LogBlocked", ENABLE_DISABLE[value.lower()]], + } + ) + # TODO: Consider adding something like the following to validate filename # https://stackoverflow.com/questions/9532499/check-whether-a-path-is-valid-in-python-without-creating-a-file-at-the-paths-ta if setting.lower() == "maxfilesize": - if value.lower() != "notconfigured": + if str(value).lower() != "notconfigured": # Must be a number between 1 and 32767 try: int(value) @@ -463,9 +559,18 @@ def set_logging_settings(profile, setting, value, store="local"): raise ValueError(f"Incorrect value: {value}") if not 1 <= int(value) <= 32767: raise ValueError(f"Incorrect value: {value}") - # Run the command - command = f"set {profile}profile logging {setting} {value}" - results = _netsh_command(command=command, store=store) + settings.update({"maxfilesize": ["-LogMaxSizeKilobytes", value]}) + + # Build the powershell command + cmd = ["Set-NetFirewallProfile"] + if profile: + cmd.append(profile) + if store and store.lower() == "lgpo": + cmd.extend(["-PolicyStore", "localhost"]) + + cmd.extend(settings[setting.lower()]) + + results = salt.utils.win_pwsh.run_dict(cmd) # A successful run should return an empty list if results: @@ -493,7 +598,6 @@ def set_settings(profile, setting, value, store="local"): - localfirewallrules - localconsecrules - inboundusernotification - - remotemanagement - unicastresponsetomulticast value (str): @@ -526,16 +630,42 @@ def set_settings(profile, setting, value, store="local"): "localfirewallrules", "localconsecrules", "inboundusernotification", - "remotemanagement", "unicastresponsetomulticast", ): raise ValueError(f"Incorrect setting: {setting}") if value.lower() not in ("enable", "disable", "notconfigured"): raise ValueError(f"Incorrect value: {value}") + if setting.lower() in ["localfirewallrules", "localconsecrules"]: + if store.lower() != "lgpo": + msg = f"{setting} can only be set using Group Policy" + raise CommandExecutionError(msg) + if setting.lower() == "inboundusernotification" and store.lower() != "lgpo": + if value.lower() == "notconfigured": + msg = "NotConfigured is only valid when setting group policy" + raise CommandExecutionError(msg) - # Run the command - command = f"set {profile}profile settings {setting} {value}" - results = _netsh_command(command=command, store=store) + # Build the powershell command + cmd = ["Set-NetFirewallProfile"] + if profile: + cmd.append(profile) + if store and store.lower() == "lgpo": + cmd.extend(["-PolicyStore", "localhost"]) + + settings = { + "localfirewallrules": [ + "-AllowLocalFirewallRules", + ENABLE_DISABLE[value.lower()], + ], + "localconsecrules": ["-AllowLocalIPsecRules", ENABLE_DISABLE[value.lower()]], + "inboundusernotification": ["-NotifyOnListen", ENABLE_DISABLE[value.lower()]], + "unicastresponsetomulticast": [ + "-AllowUnicastResponseToMulticast", + ENABLE_DISABLE[value.lower()], + ], + } + cmd.extend(settings[setting.lower()]) + + results = salt.utils.win_pwsh.run_dict(cmd) # A successful run should return an empty list if results: @@ -546,7 +676,7 @@ def set_settings(profile, setting, value, store="local"): def set_state(profile, state, store="local"): """ - Configure the firewall state. + Enable or disable the firewall profile. Args: @@ -583,12 +713,22 @@ def set_state(profile, state, store="local"): # Input validation if profile.lower() not in ("domain", "public", "private"): raise ValueError(f"Incorrect profile: {profile}") - if state.lower() not in ("on", "off", "notconfigured"): - raise ValueError(f"Incorrect state: {state}") + if not isinstance(state, bool): + if state.lower() not in ("on", "off", "notconfigured"): + raise ValueError(f"Incorrect state: {state}") + else: + state = "On" if state else "Off" - # Run the command - command = f"set {profile}profile state {state}" - results = _netsh_command(command=command, store=store) + # Build the powershell command + cmd = ["Set-NetFirewallProfile"] + if profile: + cmd.append(profile) + if store and store.lower() == "lgpo": + cmd.extend(["-PolicyStore", "localhost"]) + + cmd.extend(["-Enabled", ON_OFF[state.lower()]]) + + results = salt.utils.win_pwsh.run_dict(cmd) # A successful run should return an empty list if results: diff --git a/salt/utils/win_pwsh.py b/salt/utils/win_pwsh.py new file mode 100644 index 00000000000..7cf119165fa --- /dev/null +++ b/salt/utils/win_pwsh.py @@ -0,0 +1,67 @@ +import salt.modules.cmdmod +import salt.utils.json +import salt.utils.platform +from salt.exceptions import CommandExecutionError + +__virtualname__ = "win_pwsh" + + +def __virtual__(): + """ + Only load if windows + """ + if not salt.utils.platform.is_windows(): + return False, "This utility will only run on Windows" + + return __virtualname__ + + +def run_dict(cmd, cwd=None): + """ + Execute the powershell command and return the data as a dictionary + + .. versionadded:: 3006.9 + + Args: + + cmd (str,list): The powershell command to run + + cwd (str): The current working directory + + Returns: + dict: A dictionary containing the output of the powershell command + + Raises: + CommandExecutionError: + If an error is encountered or the command does not complete + successfully + """ + if isinstance(cmd, list): + cmd = " ".join(map(str, cmd)) + if "convertto-json" not in cmd.lower(): + cmd = f"{cmd} | ConvertTo-Json" + if "progresspreference" not in cmd.lower(): + cmd = f"$ProgressPreference = 'SilentlyContinue'; {cmd}" + ret = salt.modules.cmdmod.run_all(cmd=cmd, shell="powershell", cwd=cwd) + + if "pid" in ret: + del ret["pid"] + + if ret.get("stderr", ""): + error = ret["stderr"].splitlines()[0] + raise CommandExecutionError(error, info=ret) + + if "retcode" not in ret or ret["retcode"] != 0: + # run_all logs an error to log.error, fail hard back to the user + raise CommandExecutionError("Issue executing PowerShell cmd", info=ret) + + # Sometimes Powershell returns an empty string, which isn't valid JSON + if ret["stdout"] == "": + ret["stdout"] = "{}" + + try: + ret = salt.utils.json.loads(ret["stdout"], strict=False) + except ValueError: + raise CommandExecutionError("No JSON results from PowerShell", info=ret) + + return ret diff --git a/tests/pytests/unit/utils/win_lgpo/test_netsh.py b/tests/pytests/unit/utils/win_lgpo/test_netsh.py index e77b64fd61a..f6785d498ff 100644 --- a/tests/pytests/unit/utils/win_lgpo/test_netsh.py +++ b/tests/pytests/unit/utils/win_lgpo/test_netsh.py @@ -63,7 +63,6 @@ def test_get_settings_settings_local(): assert "InboundUserNotification" in ret assert "LocalConSecRules" in ret assert "LocalFirewallRules" in ret - assert "RemoteManagement" in ret assert "UnicastResponseToMulticast" in ret @@ -74,7 +73,6 @@ def test_get_settings_settings_lgpo(): assert "InboundUserNotification" in ret assert "LocalConSecRules" in ret assert "LocalFirewallRules" in ret - assert "RemoteManagement" in ret assert "UnicastResponseToMulticast" in ret @@ -99,7 +97,6 @@ def test_get_all_settings_local(): assert "InboundUserNotification" in ret assert "LocalConSecRules" in ret assert "LocalFirewallRules" in ret - assert "RemoteManagement" in ret assert "UnicastResponseToMulticast" in ret assert "State" in ret @@ -115,7 +112,6 @@ def test_get_all_settings_lgpo(): assert "InboundUserNotification" in ret assert "LocalConSecRules" in ret assert "LocalFirewallRules" in ret - assert "RemoteManagement" in ret assert "UnicastResponseToMulticast" in ret assert "State" in ret @@ -356,7 +352,7 @@ def test_set_firewall_logging_maxfilesize_local(): new = win_lgpo_netsh.get_settings( profile="domain", section="logging", store="local" )["MaxFileSize"] - assert new == "16384" + assert new == 16384 finally: ret = win_lgpo_netsh.set_logging_settings( profile="domain", setting="maxfilesize", value=current, store="local" @@ -491,32 +487,6 @@ def test_set_firewall_settings_notification_lgpo_notconfigured(): assert ret is True -def test_set_firewall_settings_remotemgmt_local_enable(): - current = win_lgpo_netsh.get_settings( - profile="domain", section="settings", store="local" - )["RemoteManagement"] - try: - ret = win_lgpo_netsh.set_settings( - profile="domain", - setting="remotemanagement", - value="enable", - store="local", - ) - assert ret is True - new = win_lgpo_netsh.get_settings( - profile="domain", section="settings", store="local" - )["RemoteManagement"] - assert new == "Enable" - finally: - ret = win_lgpo_netsh.set_settings( - profile="domain", - setting="remotemanagement", - value=current, - store="local", - ) - assert ret is True - - def test_set_firewall_settings_unicast_local_disable(): current = win_lgpo_netsh.get_settings( profile="domain", section="settings", store="local" @@ -566,13 +536,16 @@ def test_set_firewall_state_local_notconfigured(): profile="domain", section="state", store="local" )["State"] try: - pytest.raises( - CommandExecutionError, - win_lgpo_netsh.set_state, + ret = win_lgpo_netsh.set_state( profile="domain", state="notconfigured", store="local", ) + assert ret is True + new = win_lgpo_netsh.get_settings( + profile="domain", section="state", store="local" + )["State"] + assert new == "NotConfigured" finally: ret = win_lgpo_netsh.set_state(profile="domain", state=current, store="local") assert ret is True