Use Powershell instead of netsh for firewall settings

This commit is contained in:
Shane Lee 2024-04-23 11:04:23 -06:00 committed by Pedro Algarvio
parent 5028305cd3
commit 344a3d8c2f
4 changed files with 363 additions and 181 deletions

2
changelog/61534.fixed.md Normal file
View file

@ -0,0 +1,2 @@
Fixed the win_lgpo_netsh salt util to handle non-English systems. This was a
rewrite to use PowerShell instead of netsh to make the changes on the system

View file

@ -6,16 +6,24 @@ A salt util for modifying firewall settings.
This util allows you to modify firewall settings in the local group policy in
addition to the normal firewall settings. Parameters are taken from the
netsh advfirewall prompt.
netsh advfirewall prompt. This utility has been adapted to use powershell
instead of the ``netsh`` command to make it compatible with non-English systems.
It maintains the ``netsh`` commands and parameters, but it is using powershell
under the hood.
.. versionchanged:: 3008.0
.. note::
More information can be found in the advfirewall context in netsh. This can
be access by opening a netsh prompt. At a command prompt type the following:
be accessed by opening a netsh prompt. At a command prompt type the
following:
c:\>netsh
netsh>advfirewall
netsh advfirewall>set help
netsh advfirewall>set domain help
.. code-block:: powershell
c:\>netsh
netsh>advfirewall
netsh advfirewall>set help
netsh advfirewall>set domain help
Usage:
@ -66,19 +74,40 @@ Usage:
store='lgpo')
"""
import logging
import os
import re
import socket
import tempfile
from textwrap import dedent
import salt.modules.cmdmod
import salt.utils.platform
import salt.utils.win_pwsh
from salt.exceptions import CommandExecutionError
log = logging.getLogger(__name__)
__hostname__ = socket.gethostname()
ON_OFF = {
0: "OFF",
1: "ON",
2: "NotConfigured",
"off": "False",
"on": "True",
"notconfigured": "NotConfigured",
}
ENABLE_DISABLE = {
0: "Disable",
1: "Enable",
2: "NotConfigured",
"disable": 0,
"enable": 1,
"notconfigured": 2,
}
OUTBOUND = {
0: "NotConfigured",
2: "AllowOutbound",
4: "BlockOutbound",
"notconfigured": "NotConfigured",
"allowoutbound": "Allow",
"blockoutbound": "Block",
}
__virtualname__ = "netsh"
__hostname__ = socket.gethostname()
# Although utils are often directly imported, it is also possible to use the
@ -93,60 +122,42 @@ def __virtual__():
return __virtualname__
def _netsh_file(content):
def _get_inbound_text(rule, action):
"""
helper function to get the results of ``netsh -f content.txt``
The "Inbound connections" setting is a combination of 2 parameters:
Running ``netsh`` will drop you into a ``netsh`` prompt where you can issue
``netsh`` commands. You can put a series of commands in an external file and
run them as if from a ``netsh`` prompt using the ``-f`` switch. That's what
this function does.
- AllowInboundRules
- DefaultInboundAction
Args:
The settings are as follows:
content (str):
The contents of the file that will be run by the ``netsh -f``
command
Returns:
str: The text returned by the netsh command
Rules Action
2 2 AllowInbound
2 4 BlockInbound
0 4 BlockInboundAlways
2 0 NotConfigured
"""
with tempfile.NamedTemporaryFile(
mode="w", prefix="salt-", suffix=".netsh", delete=False, encoding="utf-8"
) as fp:
fp.write(content)
try:
log.debug("%s:\n%s", fp.name, content)
return salt.modules.cmdmod.run(f"netsh -f {fp.name}", python_shell=True)
finally:
os.remove(fp.name)
settings = {
0: {
4: "BlockInboundAlways",
},
2: {
0: "NotConfigured",
2: "AllowInbound",
4: "BlockInbound",
},
}
return settings[rule][action]
def _netsh_command(command, store):
if store.lower() not in ("local", "lgpo"):
raise ValueError(f"Incorrect store: {store}")
# set the store for local or lgpo
if store.lower() == "local":
netsh_script = dedent(
"""\
advfirewall
set store local
{}
""".format(
command
)
)
else:
netsh_script = dedent(
"""\
advfirewall
set store gpo = {}
{}
""".format(
__hostname__, command
)
)
return _netsh_file(content=netsh_script).splitlines()
def _get_inbound_settings(text):
settings = {
"allowinbound": (2, 2),
"blockinbound": (2, 4),
"blockinboundalways": (0, 4),
"notconfigured": (2, 0),
}
return settings[text.lower()]
def get_settings(profile, section, store="local"):
@ -195,70 +206,54 @@ def get_settings(profile, section, store="local"):
raise ValueError(f"Incorrect section: {section}")
if store.lower() not in ("local", "lgpo"):
raise ValueError(f"Incorrect store: {store}")
command = f"show {profile}profile {section}"
# run it
results = _netsh_command(command=command, store=store)
# sample output:
# Domain Profile Settings:
# ----------------------------------------------------------------------
# LocalFirewallRules N/A (GPO-store only)
# LocalConSecRules N/A (GPO-store only)
# InboundUserNotification Disable
# RemoteManagement Disable
# UnicastResponseToMulticast Enable
# if it's less than 3 lines it failed
if len(results) < 3:
raise CommandExecutionError(f"Invalid results: {results}")
ret = {}
# Skip the first 2 lines. Add everything else to a dictionary
for line in results[3:]:
ret.update(dict(list(zip(*[iter(re.split(r"\s{2,}", line))] * 2))))
# Build the powershell command
cmd = ["Get-NetFirewallProfile"]
if profile:
cmd.append(profile)
if store and store.lower() == "lgpo":
cmd.extend(["-PolicyStore", "localhost"])
# Remove spaces from the values so that `Not Configured` is detected
# correctly
for item in ret:
ret[item] = ret[item].replace(" ", "")
# Run the command
settings = salt.utils.win_pwsh.run_dict(cmd)
# special handling for firewallpolicy
if section == "firewallpolicy":
inbound, outbound = ret["Firewall Policy"].split(",")
return {"Inbound": inbound, "Outbound": outbound}
# A successful run should return a dictionary
if not settings:
raise CommandExecutionError("LGPO NETSH: An unknown error occurred")
return ret
# Remove the junk
for setting in list(settings.keys()):
if setting.startswith("Cim"):
settings.pop(setting)
# Make it look like netsh output
ret_settings = {
"firewallpolicy": {
"Inbound": _get_inbound_text(
settings["AllowInboundRules"], settings["DefaultInboundAction"]
),
"Outbound": OUTBOUND[settings["DefaultOutboundAction"]],
},
"state": {
"State": ON_OFF[settings["Enabled"]],
},
"logging": {
"FileName": settings["LogFileName"],
"LogAllowedConnections": ENABLE_DISABLE[settings["LogAllowed"]],
"LogDroppedConnections": ENABLE_DISABLE[settings["LogBlocked"]],
"MaxFileSize": settings["LogMaxSizeKilobytes"],
},
"settings": {
"InboundUserNotification": ENABLE_DISABLE[settings["NotifyOnListen"]],
"LocalConSecRules": ENABLE_DISABLE[settings["AllowLocalIPsecRules"]],
"LocalFirewallRules": ENABLE_DISABLE[settings["AllowLocalFirewallRules"]],
"UnicastResponseToMulticast": ENABLE_DISABLE[
settings["AllowUnicastResponseToMulticast"]
],
},
}
def get_all_settings(profile, store="local"):
"""
Gets all the properties for the specified profile in the specified store
Args:
profile (str):
The firewall profile to query. Valid options are:
- domain
- public
- private
store (str):
The store to use. This is either the local firewall policy or the
policy defined by local group policy. Valid options are:
- lgpo
- local
Default is ``local``
Returns:
dict: A dictionary containing the specified settings
"""
ret = dict()
ret.update(get_settings(profile=profile, section="state", store=store))
ret.update(get_settings(profile=profile, section="firewallpolicy", store=store))
ret.update(get_settings(profile=profile, section="settings", store=store))
ret.update(get_settings(profile=profile, section="logging", store=store))
return ret
return ret_settings[section.lower()]
def get_all_profiles(store="local"):
@ -286,6 +281,82 @@ def get_all_profiles(store="local"):
}
def get_all_settings(profile, store="local"):
"""
Gets all the properties for the specified profile in the specified store
Args:
profile (str):
The firewall profile to query. Valid options are:
- domain
- public
- private
store (str):
The store to use. This is either the local firewall policy or the
policy defined by local group policy. Valid options are:
- lgpo
- local
Default is ``local``
Returns:
dict: A dictionary containing the specified settings
Raises:
CommandExecutionError: If an error occurs
ValueError: If the parameters are incorrect
"""
# validate input
if profile.lower() not in ("domain", "public", "private"):
raise ValueError(f"Incorrect profile: {profile}")
if store.lower() not in ("local", "lgpo"):
raise ValueError(f"Incorrect store: {store}")
# Build the powershell command
cmd = ["Get-NetFirewallProfile"]
if profile:
cmd.append(profile)
if store and store.lower() == "lgpo":
cmd.extend(["-PolicyStore", "localhost"])
# Run the command
settings = salt.utils.win_pwsh.run_dict(cmd)
# A successful run should return a dictionary
if not settings:
raise CommandExecutionError("LGPO NETSH: An unknown error occurred")
# Remove the junk
for setting in list(settings.keys()):
if setting.startswith("Cim"):
settings.pop(setting)
# Make it look like netsh output
ret_settings = {
"FileName": settings["LogFileName"],
"Inbound": _get_inbound_text(
settings["AllowInboundRules"], settings["DefaultInboundAction"]
),
"InboundUserNotification": ENABLE_DISABLE[settings["NotifyOnListen"]],
"LocalConSecRules": ENABLE_DISABLE[settings["AllowLocalIPsecRules"]],
"LocalFirewallRules": ENABLE_DISABLE[settings["AllowLocalFirewallRules"]],
"LogAllowedConnections": ENABLE_DISABLE[settings["LogAllowed"]],
"LogDroppedConnections": ENABLE_DISABLE[settings["LogBlocked"]],
"MaxFileSize": settings["LogMaxSizeKilobytes"],
"Outbound": OUTBOUND[settings["DefaultOutboundAction"]],
"State": ON_OFF[settings["Enabled"]],
"UnicastResponseToMulticast": ON_OFF[
settings["AllowUnicastResponseToMulticast"]
],
}
return ret_settings
def set_firewall_settings(profile, inbound=None, outbound=None, store="local"):
"""
Set the firewall inbound/outbound settings for the specified profile and
@ -307,7 +378,7 @@ def set_firewall_settings(profile, inbound=None, outbound=None, store="local"):
- blockinbound
- blockinboundalways
- allowinbound
- notconfigured
- notconfigured <=== lgpo only
Default is ``None``
@ -317,7 +388,7 @@ def set_firewall_settings(profile, inbound=None, outbound=None, store="local"):
- allowoutbound
- blockoutbound
- notconfigured
- notconfigured <=== lgpo only
Default is ``None``
@ -355,21 +426,34 @@ def set_firewall_settings(profile, inbound=None, outbound=None, store="local"):
raise ValueError(f"Incorrect outbound value: {outbound}")
if not inbound and not outbound:
raise ValueError("Must set inbound or outbound")
if store == "local":
if inbound and inbound.lower() == "notconfigured":
msg = "Cannot set local inbound policies as NotConfigured"
raise CommandExecutionError(msg)
if outbound and outbound.lower() == "notconfigured":
msg = "Cannot set local outbound policies as NotConfigured"
raise CommandExecutionError(msg)
# You have to specify inbound and outbound setting at the same time
# If you're only specifying one, you have to get the current setting for the
# other
if not inbound or not outbound:
ret = get_settings(profile=profile, section="firewallpolicy", store=store)
if not inbound:
inbound = ret["Inbound"]
if not outbound:
outbound = ret["Outbound"]
# Build the powershell command
cmd = ["Set-NetFirewallProfile"]
if profile:
cmd.append(profile)
if store and store.lower() == "lgpo":
cmd.extend(["-PolicyStore", "localhost"])
command = f"set {profile}profile firewallpolicy {inbound},{outbound}"
# Get inbound settings
if inbound:
in_rule, in_action = _get_inbound_settings(inbound.lower())
cmd.extend(["-AllowInboundRules", in_rule, "-DefaultInboundAction", in_action])
results = _netsh_command(command=command, store=store)
if outbound:
out_rule = OUTBOUND[outbound.lower()]
cmd.extend(["-DefaultOutboundAction", out_rule])
# Run the command
results = salt.utils.win_pwsh.run_dict(cmd)
# A successful run should return an empty list
if results:
raise CommandExecutionError(f"An error occurred: {results}")
@ -442,6 +526,10 @@ def set_logging_settings(profile, setting, value, store="local"):
# Input validation
if profile.lower() not in ("domain", "public", "private"):
raise ValueError(f"Incorrect profile: {profile}")
if store == "local":
if str(value).lower() == "notconfigured":
msg = "Cannot set local policies as NotConfigured"
raise CommandExecutionError(msg)
if setting.lower() not in (
"allowedconnections",
"droppedconnections",
@ -449,13 +537,21 @@ def set_logging_settings(profile, setting, value, store="local"):
"maxfilesize",
):
raise ValueError(f"Incorrect setting: {setting}")
settings = {"filename": ["-LogFileName", value]}
if setting.lower() in ("allowedconnections", "droppedconnections"):
if value.lower() not in ("enable", "disable", "notconfigured"):
raise ValueError(f"Incorrect value: {value}")
settings.update(
{
"allowedconnections": ["-LogAllowed", ENABLE_DISABLE[value.lower()]],
"droppedconnections": ["-LogBlocked", ENABLE_DISABLE[value.lower()]],
}
)
# TODO: Consider adding something like the following to validate filename
# https://stackoverflow.com/questions/9532499/check-whether-a-path-is-valid-in-python-without-creating-a-file-at-the-paths-ta
if setting.lower() == "maxfilesize":
if value.lower() != "notconfigured":
if str(value).lower() != "notconfigured":
# Must be a number between 1 and 32767
try:
int(value)
@ -463,9 +559,18 @@ def set_logging_settings(profile, setting, value, store="local"):
raise ValueError(f"Incorrect value: {value}")
if not 1 <= int(value) <= 32767:
raise ValueError(f"Incorrect value: {value}")
# Run the command
command = f"set {profile}profile logging {setting} {value}"
results = _netsh_command(command=command, store=store)
settings.update({"maxfilesize": ["-LogMaxSizeKilobytes", value]})
# Build the powershell command
cmd = ["Set-NetFirewallProfile"]
if profile:
cmd.append(profile)
if store and store.lower() == "lgpo":
cmd.extend(["-PolicyStore", "localhost"])
cmd.extend(settings[setting.lower()])
results = salt.utils.win_pwsh.run_dict(cmd)
# A successful run should return an empty list
if results:
@ -493,7 +598,6 @@ def set_settings(profile, setting, value, store="local"):
- localfirewallrules
- localconsecrules
- inboundusernotification
- remotemanagement
- unicastresponsetomulticast
value (str):
@ -526,16 +630,42 @@ def set_settings(profile, setting, value, store="local"):
"localfirewallrules",
"localconsecrules",
"inboundusernotification",
"remotemanagement",
"unicastresponsetomulticast",
):
raise ValueError(f"Incorrect setting: {setting}")
if value.lower() not in ("enable", "disable", "notconfigured"):
raise ValueError(f"Incorrect value: {value}")
if setting.lower() in ["localfirewallrules", "localconsecrules"]:
if store.lower() != "lgpo":
msg = f"{setting} can only be set using Group Policy"
raise CommandExecutionError(msg)
if setting.lower() == "inboundusernotification" and store.lower() != "lgpo":
if value.lower() == "notconfigured":
msg = "NotConfigured is only valid when setting group policy"
raise CommandExecutionError(msg)
# Run the command
command = f"set {profile}profile settings {setting} {value}"
results = _netsh_command(command=command, store=store)
# Build the powershell command
cmd = ["Set-NetFirewallProfile"]
if profile:
cmd.append(profile)
if store and store.lower() == "lgpo":
cmd.extend(["-PolicyStore", "localhost"])
settings = {
"localfirewallrules": [
"-AllowLocalFirewallRules",
ENABLE_DISABLE[value.lower()],
],
"localconsecrules": ["-AllowLocalIPsecRules", ENABLE_DISABLE[value.lower()]],
"inboundusernotification": ["-NotifyOnListen", ENABLE_DISABLE[value.lower()]],
"unicastresponsetomulticast": [
"-AllowUnicastResponseToMulticast",
ENABLE_DISABLE[value.lower()],
],
}
cmd.extend(settings[setting.lower()])
results = salt.utils.win_pwsh.run_dict(cmd)
# A successful run should return an empty list
if results:
@ -546,7 +676,7 @@ def set_settings(profile, setting, value, store="local"):
def set_state(profile, state, store="local"):
"""
Configure the firewall state.
Enable or disable the firewall profile.
Args:
@ -583,12 +713,22 @@ def set_state(profile, state, store="local"):
# Input validation
if profile.lower() not in ("domain", "public", "private"):
raise ValueError(f"Incorrect profile: {profile}")
if state.lower() not in ("on", "off", "notconfigured"):
raise ValueError(f"Incorrect state: {state}")
if not isinstance(state, bool):
if state.lower() not in ("on", "off", "notconfigured"):
raise ValueError(f"Incorrect state: {state}")
else:
state = "On" if state else "Off"
# Run the command
command = f"set {profile}profile state {state}"
results = _netsh_command(command=command, store=store)
# Build the powershell command
cmd = ["Set-NetFirewallProfile"]
if profile:
cmd.append(profile)
if store and store.lower() == "lgpo":
cmd.extend(["-PolicyStore", "localhost"])
cmd.extend(["-Enabled", ON_OFF[state.lower()]])
results = salt.utils.win_pwsh.run_dict(cmd)
# A successful run should return an empty list
if results:

67
salt/utils/win_pwsh.py Normal file
View file

@ -0,0 +1,67 @@
import salt.modules.cmdmod
import salt.utils.json
import salt.utils.platform
from salt.exceptions import CommandExecutionError
__virtualname__ = "win_pwsh"
def __virtual__():
"""
Only load if windows
"""
if not salt.utils.platform.is_windows():
return False, "This utility will only run on Windows"
return __virtualname__
def run_dict(cmd, cwd=None):
"""
Execute the powershell command and return the data as a dictionary
.. versionadded:: 3006.9
Args:
cmd (str,list): The powershell command to run
cwd (str): The current working directory
Returns:
dict: A dictionary containing the output of the powershell command
Raises:
CommandExecutionError:
If an error is encountered or the command does not complete
successfully
"""
if isinstance(cmd, list):
cmd = " ".join(map(str, cmd))
if "convertto-json" not in cmd.lower():
cmd = f"{cmd} | ConvertTo-Json"
if "progresspreference" not in cmd.lower():
cmd = f"$ProgressPreference = 'SilentlyContinue'; {cmd}"
ret = salt.modules.cmdmod.run_all(cmd=cmd, shell="powershell", cwd=cwd)
if "pid" in ret:
del ret["pid"]
if ret.get("stderr", ""):
error = ret["stderr"].splitlines()[0]
raise CommandExecutionError(error, info=ret)
if "retcode" not in ret or ret["retcode"] != 0:
# run_all logs an error to log.error, fail hard back to the user
raise CommandExecutionError("Issue executing PowerShell cmd", info=ret)
# Sometimes Powershell returns an empty string, which isn't valid JSON
if ret["stdout"] == "":
ret["stdout"] = "{}"
try:
ret = salt.utils.json.loads(ret["stdout"], strict=False)
except ValueError:
raise CommandExecutionError("No JSON results from PowerShell", info=ret)
return ret

View file

@ -63,7 +63,6 @@ def test_get_settings_settings_local():
assert "InboundUserNotification" in ret
assert "LocalConSecRules" in ret
assert "LocalFirewallRules" in ret
assert "RemoteManagement" in ret
assert "UnicastResponseToMulticast" in ret
@ -74,7 +73,6 @@ def test_get_settings_settings_lgpo():
assert "InboundUserNotification" in ret
assert "LocalConSecRules" in ret
assert "LocalFirewallRules" in ret
assert "RemoteManagement" in ret
assert "UnicastResponseToMulticast" in ret
@ -99,7 +97,6 @@ def test_get_all_settings_local():
assert "InboundUserNotification" in ret
assert "LocalConSecRules" in ret
assert "LocalFirewallRules" in ret
assert "RemoteManagement" in ret
assert "UnicastResponseToMulticast" in ret
assert "State" in ret
@ -115,7 +112,6 @@ def test_get_all_settings_lgpo():
assert "InboundUserNotification" in ret
assert "LocalConSecRules" in ret
assert "LocalFirewallRules" in ret
assert "RemoteManagement" in ret
assert "UnicastResponseToMulticast" in ret
assert "State" in ret
@ -356,7 +352,7 @@ def test_set_firewall_logging_maxfilesize_local():
new = win_lgpo_netsh.get_settings(
profile="domain", section="logging", store="local"
)["MaxFileSize"]
assert new == "16384"
assert new == 16384
finally:
ret = win_lgpo_netsh.set_logging_settings(
profile="domain", setting="maxfilesize", value=current, store="local"
@ -491,32 +487,6 @@ def test_set_firewall_settings_notification_lgpo_notconfigured():
assert ret is True
def test_set_firewall_settings_remotemgmt_local_enable():
current = win_lgpo_netsh.get_settings(
profile="domain", section="settings", store="local"
)["RemoteManagement"]
try:
ret = win_lgpo_netsh.set_settings(
profile="domain",
setting="remotemanagement",
value="enable",
store="local",
)
assert ret is True
new = win_lgpo_netsh.get_settings(
profile="domain", section="settings", store="local"
)["RemoteManagement"]
assert new == "Enable"
finally:
ret = win_lgpo_netsh.set_settings(
profile="domain",
setting="remotemanagement",
value=current,
store="local",
)
assert ret is True
def test_set_firewall_settings_unicast_local_disable():
current = win_lgpo_netsh.get_settings(
profile="domain", section="settings", store="local"
@ -566,13 +536,16 @@ def test_set_firewall_state_local_notconfigured():
profile="domain", section="state", store="local"
)["State"]
try:
pytest.raises(
CommandExecutionError,
win_lgpo_netsh.set_state,
ret = win_lgpo_netsh.set_state(
profile="domain",
state="notconfigured",
store="local",
)
assert ret is True
new = win_lgpo_netsh.get_settings(
profile="domain", section="state", store="local"
)["State"]
assert new == "NotConfigured"
finally:
ret = win_lgpo_netsh.set_state(profile="domain", state=current, store="local")
assert ret is True