Adding nftables functionality for use in EL8 (#56259)

* adding nftables functionality for use in EL8

* default to current functionality

* updates for black

* updates for black

* updates for black

* Update test_nftables.py

* updates for black

* updates for black

* updates for black

* updates for black

* updates for black

* updates for black

* fix spacing for test comment returns to match state

* changed versionadded, black, and lint due to time-travelling PR

* fixing test lint failure

* more lint fixes

Co-authored-by: Sage the Rage <36676171+sagetherage@users.noreply.github.com>
This commit is contained in:
Nicholas Hughes 2020-08-27 12:59:36 -04:00 committed by GitHub
parent 737e3e0215
commit 4fef467b64
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 956 additions and 205 deletions

View file

@ -1,8 +1,6 @@
# -*- coding: utf-8 -*-
"""
Support for nftables
"""
from __future__ import absolute_import, print_function, unicode_literals
# Import python libs
import json
@ -15,7 +13,6 @@ import salt.utils.path
from salt.exceptions import CommandExecutionError
# Import salt libs
from salt.ext import six
from salt.state import STATE_INTERNAL_KEYWORDS as _STATE_INTERNAL_KEYWORDS
# Set up logging
@ -80,7 +77,7 @@ def version():
salt '*' nftables.version
"""
cmd = "{0} --version".format(_nftables_cmd())
cmd = "{} --version".format(_nftables_cmd())
out = __salt__["cmd.run"](cmd).split()
return out[1]
@ -142,11 +139,11 @@ def build_rule(
nft_family = _NFTABLES_FAMILIES[family]
if "if" in kwargs:
rule += "meta iifname {0} ".format(kwargs["if"])
rule += "meta iifname {} ".format(kwargs["if"])
del kwargs["if"]
if "of" in kwargs:
rule += "meta oifname {0} ".format(kwargs["of"])
rule += "meta oifname {} ".format(kwargs["of"])
del kwargs["of"]
if "proto" in kwargs:
@ -156,18 +153,44 @@ def build_rule(
del kwargs["state"]
if "connstate" in kwargs:
rule += "ct state {{ {0}}} ".format(kwargs["connstate"])
rule += "ct state {{ {0} }} ".format(kwargs["connstate"])
del kwargs["connstate"]
if "icmp-type" in kwargs:
rule += "icmp type {{ {0} }} ".format(kwargs["icmp-type"])
del kwargs["icmp-type"]
if "pkttype" in kwargs:
rule += "meta pkttype {{ {0} }} ".format(kwargs["pkttype"])
del kwargs["pkttype"]
if "counter" in kwargs:
rule += "counter "
del kwargs["counter"]
if "saddr" in kwargs or "source" in kwargs:
rule += "ip saddr {}".format(kwargs.get("saddr") or kwargs.get("source"))
if "saddr" in kwargs:
del kwargs["saddr"]
if "source" in kwargs:
del kwargs["source"]
if "daddr" in kwargs or "destination" in kwargs:
rule += "ip daddr {}".format(kwargs.get("daddr") or kwargs.get("destination"))
if "daddr" in kwargs:
del kwargs["daddr"]
if "destination" in kwargs:
del kwargs["destination"]
if "dport" in kwargs:
kwargs["dport"] = six.text_type(kwargs["dport"])
kwargs["dport"] = str(kwargs["dport"])
if ":" in kwargs["dport"]:
kwargs["dport"] = kwargs["dport"].replace(":", "-")
rule += "dport {{ {0} }} ".format(kwargs["dport"])
del kwargs["dport"]
if "sport" in kwargs:
kwargs["sport"] = six.text_type(kwargs["sport"])
kwargs["sport"] = str(kwargs["sport"])
if ":" in kwargs["sport"]:
kwargs["sport"] = kwargs["sport"].replace(":", "-")
rule += "sport {{ {0} }} ".format(kwargs["sport"])
@ -180,7 +203,7 @@ def build_rule(
_dports = kwargs["dports"].split(",")
_dports = [int(x) for x in _dports]
_dports.sort(reverse=True)
kwargs["dports"] = ", ".join(six.text_type(x) for x in _dports)
kwargs["dports"] = ", ".join(str(x) for x in _dports)
rule += "dport {{ {0} }} ".format(kwargs["dports"])
del kwargs["dports"]
@ -192,7 +215,7 @@ def build_rule(
_sports = kwargs["sports"].split(",")
_sports = [int(x) for x in _sports]
_sports.sort(reverse=True)
kwargs["sports"] = ", ".join(six.text_type(x) for x in _sports)
kwargs["sports"] = ", ".join(str(x) for x in _sports)
rule += "sport {{ {0} }} ".format(kwargs["sports"])
del kwargs["sports"]
@ -202,27 +225,36 @@ def build_rule(
after_jump = []
if "jump" in kwargs:
after_jump.append("{0} ".format(kwargs["jump"]))
after_jump.append("{} ".format(kwargs["jump"]))
del kwargs["jump"]
if "j" in kwargs:
after_jump.append("{0} ".format(kwargs["j"]))
after_jump.append("{} ".format(kwargs["j"]))
del kwargs["j"]
if "to-port" in kwargs:
after_jump.append("--to-port {0} ".format(kwargs["to-port"]))
del kwargs["to-port"]
if "redirect-to" in kwargs or "to-port" in kwargs:
after_jump.append(
"redirect to {} ".format(kwargs.get("redirect-to") or kwargs.get("to-port"))
)
if "redirect-to" in kwargs:
del kwargs["redirect-to"]
if "to-port" in kwargs:
del kwargs["to-port"]
if "to-ports" in kwargs:
after_jump.append("--to-ports {0} ".format(kwargs["to-ports"]))
after_jump.append("--to-ports {} ".format(kwargs["to-ports"]))
del kwargs["to-ports"]
if "to-source" in kwargs:
after_jump.append("{} ".format(kwargs["to-source"]))
del kwargs["to-source"]
if "to-destination" in kwargs:
after_jump.append("--to-destination {0} ".format(kwargs["to-destination"]))
after_jump.append("{} ".format(kwargs["to-destination"]))
del kwargs["to-destination"]
if "reject-with" in kwargs:
after_jump.append("--reject-with {0} ".format(kwargs["reject-with"]))
after_jump.append("reject with {} ".format(kwargs["reject-with"]))
del kwargs["reject-with"]
for item in after_jump:
@ -232,8 +264,8 @@ def build_rule(
rule = rule.strip()
# Insert the protocol prior to dport or sport
rule = rule.replace("dport", "{0} dport".format(proto))
rule = rule.replace("sport", "{0} sport".format(proto))
rule = rule.replace("dport", "{} dport".format(proto))
rule = rule.replace("sport", "{} sport".format(proto))
ret["rule"] = rule
@ -253,15 +285,15 @@ def build_rule(
if command in ["Insert", "insert", "INSERT"]:
if position:
ret["rule"] = "{0} insert rule {1} {2} {3} " "position {4} {5}".format(
ret["rule"] = "{} insert rule {} {} {} " "position {} {}".format(
_nftables_cmd(), nft_family, table, chain, position, rule
)
else:
ret["rule"] = "{0} insert rule " "{1} {2} {3} {4}".format(
ret["rule"] = "{} insert rule " "{} {} {} {}".format(
_nftables_cmd(), nft_family, table, chain, rule
)
else:
ret["rule"] = "{0} {1} rule {2} {3} {4} {5}".format(
ret["rule"] = "{} {} rule {} {} {} {}".format(
_nftables_cmd(), command, nft_family, table, chain, rule
)
@ -313,13 +345,20 @@ def list_tables(family="ipv4"):
"""
nft_family = _NFTABLES_FAMILIES[family]
tables = []
cmd = "{0} --json --numeric --numeric --numeric " "list tables {1}".format(
cmd = "{} --json --numeric --numeric --numeric list tables {}".format(
_nftables_cmd(), nft_family
)
out = __salt__["cmd.run"](cmd, python_shell=False)
if not out:
return tables
data = json.loads(out)
try:
data = json.loads(out)
except ValueError:
return tables
if not data or not data.get("nftables"):
return tables
for item in data.get("nftables", []):
if "metainfo" not in item:
@ -347,7 +386,7 @@ def get_rules(family="ipv4"):
rules = []
for table in tables:
table_name = table["name"]
cmd = "{0} --numeric --numeric --numeric " "list table {1} {2}".format(
cmd = "{} --numeric --numeric --numeric " "list table {} {}".format(
_nftables_cmd(), nft_family, table_name
)
out = __salt__["cmd.run"](cmd, python_shell=False)
@ -355,9 +394,48 @@ def get_rules(family="ipv4"):
return rules
def get_rules_json(family="ipv4"):
"""
.. versionadded:: Magnesium
Return a list of dictionaries comprising the current, in-memory rules
family
Networking family, either ipv4 or ipv6
CLI Example:
.. code-block:: bash
salt '*' nftables.get_rules_json
salt '*' nftables.get_rules_json family=ipv6
"""
nft_family = _NFTABLES_FAMILIES[family]
rules = []
cmd = "{} --numeric --numeric --numeric --json list ruleset {}".format(
_nftables_cmd(), nft_family
)
out = __salt__["cmd.run"](cmd, python_shell=False)
if not out:
return rules
try:
rules = (json.loads(out))["nftables"]
except (KeyError, ValueError):
return rules
return rules
def save(filename=None, family="ipv4"):
"""
Save the current in-memory rules to disk
.. versionchanged:: Magnesium
Save the current in-memory rules to disk. On systems where /etc/nftables is
a directory, a file named salt-all-in-one.nft will be dropped inside by default.
The main nftables configuration will need to include this file.
CLI Example:
@ -368,8 +446,12 @@ def save(filename=None, family="ipv4"):
if _conf() and not filename:
filename = _conf()
nft_families = ["ip", "ip6", "arp", "bridge"]
# Not a typo. Invert the dictionary twice to get unique values only.
nft_families = {v: k for k, v in _NFTABLES_FAMILIES.items()}
nft_families = {v: k for k, v in _NFTABLES_FAMILIES.items()}
rules = "#! nft -f\n"
for family in nft_families:
out = get_rules(family)
if out:
@ -377,13 +459,16 @@ def save(filename=None, family="ipv4"):
rules = rules + "\n".join(out)
rules = rules + "\n"
if __salt__["file.directory_exists"](filename):
filename = "{}/salt-all-in-one.nft".format(filename)
try:
with salt.utils.files.fopen(filename, "wb") as _fh:
# Write out any changes
_fh.write(salt.utils.data.encode(rules))
except (IOError, OSError) as exc:
except OSError as exc:
raise CommandExecutionError(
"Problem writing to configuration file: {0}".format(exc)
"Problem writing to configuration file: {}".format(exc)
)
return rules
@ -432,18 +517,18 @@ def get_rule_handle(table="filter", chain=None, rule=None, family="ipv4"):
return res
nft_family = _NFTABLES_FAMILIES[family]
cmd = "{0} --numeric --numeric --numeric --handle list chain {1} {2} {3}".format(
cmd = "{} --numeric --numeric --numeric --handle list chain {} {} {}".format(
_nftables_cmd(), nft_family, table, chain
)
out = __salt__["cmd.run"](cmd, python_shell=False)
rules = re.split("\n+", out)
pat = re.compile(r"{0} # handle (?P<handle>\d+)".format(rule))
pat = re.compile(r"{} # handle (?P<handle>\d+)".format(rule))
for r in rules:
match = pat.search(r)
if match:
return {"result": True, "handle": match.group("handle")}
return {"result": False, "comment": "Could not find rule {0}".format(rule)}
return {"result": False, "comment": "Could not find rule {}".format(rule)}
def check(table="filter", chain=None, rule=None, family="ipv4"):
@ -486,22 +571,20 @@ def check(table="filter", chain=None, rule=None, family="ipv4"):
return res
nft_family = _NFTABLES_FAMILIES[family]
cmd = "{0} --handle --numeric --numeric --numeric list chain {1} {2} {3}".format(
cmd = "{} --handle --numeric --numeric --numeric list chain {} {} {}".format(
_nftables_cmd(), nft_family, table, chain
)
search_rule = "{0} #".format(rule)
search_rule = "{} #".format(rule)
out = __salt__["cmd.run"](cmd, python_shell=False).find(search_rule)
if out == -1:
ret[
"comment"
] = "Rule {0} in chain {1} in table {2} in family {3} does not exist".format(
] = "Rule {} in chain {} in table {} in family {} does not exist".format(
rule, chain, table, family
)
else:
ret[
"comment"
] = "Rule {0} in chain {1} in table {2} in family {3} exists".format(
ret["comment"] = "Rule {} in chain {} in table {} in family {} exists".format(
rule, chain, table, family
)
ret["result"] = True
@ -531,17 +614,17 @@ def check_chain(table="filter", chain=None, family="ipv4"):
return ret
nft_family = _NFTABLES_FAMILIES[family]
cmd = "{0} list table {1} {2}".format(_nftables_cmd(), nft_family, table)
cmd = "{} list table {} {}".format(_nftables_cmd(), nft_family, table)
out = __salt__["cmd.run"](cmd, python_shell=False).find(
"chain {0} {{".format(chain)
)
if out == -1:
ret["comment"] = "Chain {0} in table {1} in family {2} does not exist".format(
ret["comment"] = "Chain {} in table {} in family {} does not exist".format(
chain, table, family
)
else:
ret["comment"] = "Chain {0} in table {1} in family {2} exists".format(
ret["comment"] = "Chain {} in table {} in family {} exists".format(
chain, table, family
)
ret["result"] = True
@ -563,15 +646,15 @@ def check_table(table=None, family="ipv4"):
return ret
nft_family = _NFTABLES_FAMILIES[family]
cmd = "{0} list tables {1}".format(_nftables_cmd(), nft_family)
cmd = "{} list tables {}".format(_nftables_cmd(), nft_family)
out = __salt__["cmd.run"](cmd, python_shell=False).find(
"table {0} {1}".format(nft_family, table)
"table {} {}".format(nft_family, table)
)
if out == -1:
ret["comment"] = "Table {0} in family {1} does not exist".format(table, family)
ret["comment"] = "Table {} in family {} does not exist".format(table, family)
else:
ret["comment"] = "Table {0} in family {1} exists".format(table, family)
ret["comment"] = "Table {} in family {} exists".format(table, family)
ret["result"] = True
return ret
@ -602,14 +685,14 @@ def new_table(table, family="ipv4"):
return res
nft_family = _NFTABLES_FAMILIES[family]
cmd = "{0} add table {1} {2}".format(_nftables_cmd(), nft_family, table)
cmd = "{} add table {} {}".format(_nftables_cmd(), nft_family, table)
out = __salt__["cmd.run"](cmd, python_shell=False)
if not out:
ret["comment"] = "Table {0} in family {1} created".format(table, family)
ret["comment"] = "Table {} in family {} created".format(table, family)
ret["result"] = True
else:
ret["comment"] = "Table {0} in family {1} could not be created".format(
ret["comment"] = "Table {} in family {} could not be created".format(
table, family
)
return ret
@ -641,14 +724,14 @@ def delete_table(table, family="ipv4"):
return res
nft_family = _NFTABLES_FAMILIES[family]
cmd = "{0} delete table {1} {2}".format(_nftables_cmd(), nft_family, table)
cmd = "{} delete table {} {}".format(_nftables_cmd(), nft_family, table)
out = __salt__["cmd.run"](cmd, python_shell=False)
if not out:
ret["comment"] = "Table {0} in family {1} deleted".format(table, family)
ret["comment"] = "Table {} in family {} deleted".format(table, family)
ret["result"] = True
else:
ret["comment"] = "Table {0} in family {1} could not be deleted".format(
ret["comment"] = "Table {} in family {} could not be deleted".format(
table, family
)
return ret
@ -693,15 +776,15 @@ def new_chain(
res = check_chain(table, chain, family=family)
if res["result"]:
ret["comment"] = "Chain {0} in table {1} in family {2} already exists".format(
ret["comment"] = "Chain {} in table {} in family {} already exists".format(
chain, table, family
)
return ret
nft_family = _NFTABLES_FAMILIES[family]
cmd = "{0} add chain {1} {2} {3}".format(_nftables_cmd(), nft_family, table, chain)
cmd = "{} -- add chain {} {} {}".format(_nftables_cmd(), nft_family, table, chain)
if table_type or hook or priority:
if table_type and hook and six.text_type(priority):
if table_type and hook and str(priority):
cmd = r"{0} \{{ type {1} hook {2} priority {3}\; \}}".format(
cmd, table_type, hook, priority
)
@ -713,14 +796,14 @@ def new_chain(
out = __salt__["cmd.run"](cmd, python_shell=False)
if not out:
ret["comment"] = "Chain {0} in table {1} in family {2} created".format(
ret["comment"] = "Chain {} in table {} in family {} created".format(
chain, table, family
)
ret["result"] = True
else:
ret[
"comment"
] = "Chain {0} in table {1} in family {2} could not be created".format(
] = "Chain {} in table {} in family {} could not be created".format(
chain, table, family
)
return ret
@ -760,20 +843,18 @@ def delete_chain(table="filter", chain=None, family="ipv4"):
return res
nft_family = _NFTABLES_FAMILIES[family]
cmd = "{0} delete chain {1} {2} {3}".format(
_nftables_cmd(), nft_family, table, chain
)
cmd = "{} delete chain {} {} {}".format(_nftables_cmd(), nft_family, table, chain)
out = __salt__["cmd.run"](cmd, python_shell=False)
if not out:
ret["comment"] = "Chain {0} in table {1} in family {2} deleted".format(
ret["comment"] = "Chain {} in table {} in family {} deleted".format(
chain, table, family
)
ret["result"] = True
else:
ret[
"comment"
] = "Chain {0} in table {1} in family {2} could not be deleted".format(
] = "Chain {} in table {} in family {} could not be deleted".format(
chain, table, family
)
return ret
@ -801,7 +882,7 @@ def append(table="filter", chain=None, rule=None, family="ipv4"):
family=ipv6
"""
ret = {
"comment": "Failed to append rule {0} to chain {1} in table {2}.".format(
"comment": "Failed to append rule {} to chain {} in table {}.".format(
rule, chain, table
),
"result": False,
@ -827,28 +908,26 @@ def append(table="filter", chain=None, rule=None, family="ipv4"):
if res["result"]:
ret[
"comment"
] = "Rule {0} chain {1} in table {2} in family {3} already exists".format(
] = "Rule {} chain {} in table {} in family {} already exists".format(
rule, chain, table, family
)
return ret
nft_family = _NFTABLES_FAMILIES[family]
cmd = "{0} add rule {1} {2} {3} {4}".format(
cmd = "{} add rule {} {} {} {}".format(
_nftables_cmd(), nft_family, table, chain, rule
)
out = __salt__["cmd.run"](cmd, python_shell=False)
if len(out) == 0:
if not out:
ret["result"] = True
ret[
"comment"
] = 'Added rule "{0}" chain {1} in table {2} in family {3}.'.format(
ret["comment"] = 'Added rule "{}" chain {} in table {} in family {}.'.format(
rule, chain, table, family
)
else:
ret[
"comment"
] = 'Failed to add rule "{0}" chain {1} in table {2} in family {3}.'.format(
] = 'Failed to add rule "{}" chain {} in table {} in family {}.'.format(
rule, chain, table, family
)
return ret
@ -885,7 +964,7 @@ def insert(table="filter", chain=None, position=None, rule=None, family="ipv4"):
family=ipv6
"""
ret = {
"comment": "Failed to insert rule {0} to table {1}.".format(rule, table),
"comment": "Failed to insert rule {} to table {}.".format(rule, table),
"result": False,
}
@ -909,33 +988,31 @@ def insert(table="filter", chain=None, position=None, rule=None, family="ipv4"):
if res["result"]:
ret[
"comment"
] = "Rule {0} chain {1} in table {2} in family {3} already exists".format(
] = "Rule {} chain {} in table {} in family {} already exists".format(
rule, chain, table, family
)
return ret
nft_family = _NFTABLES_FAMILIES[family]
if position:
cmd = "{0} insert rule {1} {2} {3} position {4} {5}".format(
cmd = "{} insert rule {} {} {} position {} {}".format(
_nftables_cmd(), nft_family, table, chain, position, rule
)
else:
cmd = "{0} insert rule {1} {2} {3} {4}".format(
cmd = "{} insert rule {} {} {} {}".format(
_nftables_cmd(), nft_family, table, chain, rule
)
out = __salt__["cmd.run"](cmd, python_shell=False)
if len(out) == 0:
if not out:
ret["result"] = True
ret[
"comment"
] = 'Added rule "{0}" chain {1} in table {2} in family {3}.'.format(
ret["comment"] = 'Added rule "{}" chain {} in table {} in family {}.'.format(
rule, chain, table, family
)
else:
ret[
"comment"
] = 'Failed to add rule "{0}" chain {1} in table {2} in family {3}.'.format(
] = 'Failed to add rule "{}" chain {} in table {} in family {}.'.format(
rule, chain, table, family
)
return ret
@ -968,7 +1045,7 @@ def delete(table, chain=None, position=None, rule=None, family="ipv4"):
family=ipv6
"""
ret = {
"comment": "Failed to delete rule {0} in table {1}.".format(rule, table),
"comment": "Failed to delete rule {} in table {}.".format(rule, table),
"result": False,
}
@ -988,7 +1065,7 @@ def delete(table, chain=None, position=None, rule=None, family="ipv4"):
if not res["result"]:
ret[
"comment"
] = "Rule {0} chain {1} in table {2} in family {3} does not exist".format(
] = "Rule {} chain {} in table {} in family {} does not exist".format(
rule, chain, table, family
)
return ret
@ -999,22 +1076,22 @@ def delete(table, chain=None, position=None, rule=None, family="ipv4"):
position = get_rule_handle(table, chain, rule, family)
nft_family = _NFTABLES_FAMILIES[family]
cmd = "{0} delete rule {1} {2} {3} handle {4}".format(
cmd = "{} delete rule {} {} {} handle {}".format(
_nftables_cmd(), nft_family, table, chain, position
)
out = __salt__["cmd.run"](cmd, python_shell=False)
if len(out) == 0:
if not out:
ret["result"] = True
ret[
"comment"
] = 'Deleted rule "{0}" in chain {1} in table {2} in family {3}.'.format(
] = 'Deleted rule "{}" in chain {} in table {} in family {}.'.format(
rule, chain, table, family
)
else:
ret[
"comment"
] = 'Failed to delete rule "{0}" in chain {1} table {2} in family {3}'.format(
] = 'Failed to delete rule "{}" in chain {} table {} in family {}'.format(
rule, chain, table, family
)
return ret
@ -1037,7 +1114,7 @@ def flush(table="filter", chain="", family="ipv4"):
salt '*' nftables.flush filter input family=ipv6
"""
ret = {
"comment": "Failed to flush rules from chain {0} in table {1}.".format(
"comment": "Failed to flush rules from chain {} in table {}.".format(
chain, table
),
"result": False,
@ -1053,20 +1130,129 @@ def flush(table="filter", chain="", family="ipv4"):
res = check_chain(table, chain, family=family)
if not res["result"]:
return res
cmd = "{0} flush chain {1} {2} {3}".format(
cmd = "{} flush chain {} {} {}".format(
_nftables_cmd(), nft_family, table, chain
)
comment = "from chain {0} in table {1} in family {2}.".format(
chain, table, family
)
comment = "from chain {} in table {} in family {}.".format(chain, table, family)
else:
cmd = "{0} flush table {1} {2}".format(_nftables_cmd(), nft_family, table)
comment = "from table {0} in family {1}.".format(table, family)
cmd = "{} flush table {} {}".format(_nftables_cmd(), nft_family, table)
comment = "from table {} in family {}.".format(table, family)
out = __salt__["cmd.run"](cmd, python_shell=False)
if len(out) == 0:
if not out:
ret["result"] = True
ret["comment"] = "Flushed rules {0}".format(comment)
ret["comment"] = "Flushed rules {}".format(comment)
else:
ret["comment"] = "Failed to flush rules {0}".format(comment)
ret["comment"] = "Failed to flush rules {}".format(comment)
return ret
def get_policy(table="filter", chain=None, family="ipv4"):
"""
.. versionadded:: Magnesium
Return the current policy for the specified table/chain
table
Name of the table containing the chain to check
chain
Name of the chain to get the policy for
family
Networking family, either ipv4 or ipv6
CLI Example:
.. code-block:: bash
salt '*' nftables.get_policy filter input
IPv6:
salt '*' nftables.get_policy filter input family=ipv6
"""
if not chain:
return "Error: Chain needs to be specified"
nft_family = _NFTABLES_FAMILIES[family]
rules = get_rules_json(family=nft_family)
try:
for rule in rules["nftables"]:
if (
rule.get("chain", {}).get("name") == chain
and rule.get("chain", {}).get("type") == table
):
return rule["chain"]["policy"]
except (KeyError, TypeError, ValueError):
return None
def set_policy(table="filter", chain=None, policy=None, family="ipv4"):
"""
.. versionadded:: Magnesium
Set the current policy for the specified table/chain. This only works on
chains with an existing base chain.
table
Name of the table containing the chain to modify
chain
Name of the chain to set the policy for
policy
accept or drop
family
Networking family, either ipv4 or ipv6
CLI Example:
.. code-block:: bash
salt '*' nftables.set_policy filter input accept
IPv6:
salt '*' nftables.set_policy filter input accept family=ipv6
"""
if not chain:
return "Error: Chain needs to be specified"
if not policy:
return "Error: Policy needs to be specified"
nft_family = _NFTABLES_FAMILIES[family]
chain_info = {}
rules = get_rules_json(family=nft_family)
if not rules:
return False
for rule in rules:
try:
if rule["chain"]["table"] == table and rule["chain"]["name"] == chain:
chain_info = rule["chain"]
break
except KeyError:
continue
if not chain_info:
return False
cmd = "{} add chain {} {} {}".format(_nftables_cmd(), nft_family, table, chain)
# We can't infer the base chain parameters. Bail out if they're not present.
if "type" not in chain_info or "hook" not in chain_info or "prio" not in chain_info:
return False
params = "type {} hook {} priority {};".format(
chain_info["type"], chain_info["hook"], chain_info["prio"]
)
cmd = '{0} "{{ {1} policy {2}; }}"'.format(cmd, params, policy)
out = __salt__["cmd.run_all"](cmd, python_shell=False)
return not out["retcode"]

View file

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""
Management of nftables
======================
@ -110,7 +109,6 @@ at some point be deprecated in favor of a more generic `firewall` state.
- table: filter
"""
from __future__ import absolute_import, print_function, unicode_literals
import logging
@ -135,7 +133,9 @@ def chain_present(
"""
.. versionadded:: 2014.7.0
Verify the chain is exist.
.. versionchanged:: Magnesium
Verify a chain exists in a table.
name
A user-defined chain name.
@ -152,9 +152,15 @@ def chain_present(
chain_check = __salt__["nftables.check_chain"](table, name, family=family)
if chain_check["result"] is True:
ret["result"] = True
ret["comment"] = "nftables {} chain is already exist in {} table for {}".format(
name, table, family
)
return ret
if __opts__["test"]:
ret[
"comment"
] = "nftables {0} chain is already exist in {1} table for {2}".format(
] = "nftables chain {} would be created in table {} for family {}".format(
name, table, family
)
return ret
@ -166,15 +172,13 @@ def chain_present(
if res["result"] is True:
ret["changes"] = {"locale": name}
ret["result"] = True
ret[
"comment"
] = "nftables {0} chain in {1} table create success for {2}".format(
ret["comment"] = "nftables {} chain in {} table create success for {}".format(
name, table, family
)
return ret
else:
ret["result"] = False
ret["comment"] = "Failed to create {0} chain in {1} table: {2} for {3}".format(
ret["comment"] = "Failed to create {} chain in {} table: {} for {}".format(
name, table, res["comment"].strip(), family
)
return ret
@ -197,7 +201,7 @@ def chain_absent(name, table="filter", family="ipv4"):
ret["result"] = True
ret[
"comment"
] = "nftables {0} chain is already absent in {1} table for {2}".format(
] = "nftables {} chain is already absent in {} table for {}".format(
name, table, family
)
return ret
@ -210,19 +214,17 @@ def chain_absent(name, table="filter", family="ipv4"):
ret["result"] = True
ret[
"comment"
] = "nftables {0} chain in {1} table delete success for {2}".format(
] = "nftables {} chain in {} table delete success for {}".format(
name, table, family
)
else:
ret["result"] = False
ret[
"comment"
] = "Failed to delete {0} chain in {1} table: {2} for {3}".format(
ret["comment"] = "Failed to delete {} chain in {} table: {} for {}".format(
name, table, command.strip(), family
)
else:
ret["result"] = False
ret["comment"] = "Failed to flush {0} chain in {1} table: {2} for {3}".format(
ret["comment"] = "Failed to flush {} chain in {} table: {} for {}".format(
name, table, flush_chain.strip(), family
)
return ret
@ -266,12 +268,12 @@ def append(name, family="ipv4", **kwargs):
res = __salt__["nftables.check"](kwargs["table"], kwargs["chain"], rule, family)
if res["result"]:
ret["result"] = True
ret["comment"] = "nftables rule for {0} already set ({1}) for {2}".format(
ret["comment"] = "nftables rule for {} already set ({}) for {}".format(
name, command.strip(), family
)
return ret
if "test" in __opts__ and __opts__["test"]:
ret["comment"] = "nftables rule for {0} needs to be set ({1}) for {2}".format(
ret["comment"] = "nftables rule for {} needs to be set ({}) for {}".format(
name, command.strip(), family
)
return ret
@ -279,23 +281,23 @@ def append(name, family="ipv4", **kwargs):
if res["result"]:
ret["changes"] = {"locale": name}
ret["result"] = True
ret["comment"] = "Set nftables rule for {0} to: {1} for {2}".format(
ret["comment"] = "Set nftables rule for {} to: {} for {}".format(
name, command.strip(), family
)
if "save" in kwargs:
if kwargs["save"]:
__salt__["nftables.save"](filename=None, family=family)
ret["comment"] = (
"Set and Saved nftables rule for {0} to: "
"{1} for {2}".format(name, command.strip(), family)
"Set and Saved nftables rule for {} to: "
"{} for {}".format(name, command.strip(), family)
)
return ret
else:
ret["result"] = False
ret["comment"] = (
"Failed to set nftables rule for {0}.\n"
"Attempted rule was {1} for {2}.\n"
"{3}"
"Failed to set nftables rule for {}.\n"
"Attempted rule was {} for {}.\n"
"{}"
).format(name, command.strip(), family, res["comment"])
return ret
@ -338,12 +340,12 @@ def insert(name, family="ipv4", **kwargs):
res = __salt__["nftables.check"](kwargs["table"], kwargs["chain"], rule, family)
if res["result"]:
ret["result"] = True
ret["comment"] = "nftables rule for {0} already set for {1} ({2})".format(
ret["comment"] = "nftables rule for {} already set for {} ({})".format(
name, family, command.strip()
)
return ret
if "test" in __opts__ and __opts__["test"]:
ret["comment"] = "nftables rule for {0} needs to be set for {1} ({2})".format(
ret["comment"] = "nftables rule for {} needs to be set for {} ({})".format(
name, family, command.strip()
)
return ret
@ -353,21 +355,21 @@ def insert(name, family="ipv4", **kwargs):
if res["result"]:
ret["changes"] = {"locale": name}
ret["result"] = True
ret["comment"] = "Set nftables rule for {0} to: {1} for {2}".format(
ret["comment"] = "Set nftables rule for {} to: {} for {}".format(
name, command.strip(), family
)
if "save" in kwargs:
if kwargs["save"]:
__salt__["nftables.save"](filename=None, family=family)
ret["comment"] = (
"Set and Saved nftables rule for {0} to: "
"{1} for {2}".format(name, command.strip(), family)
"Set and Saved nftables rule for {} to: "
"{} for {}".format(name, command.strip(), family)
)
return ret
else:
ret["result"] = False
ret["comment"] = (
"Failed to set nftables rule for {0}.\n" "Attempted rule was {1}"
"Failed to set nftables rule for {}.\n" "Attempted rule was {}"
).format(name, command.strip())
return ret
@ -411,14 +413,12 @@ def delete(name, family="ipv4", **kwargs):
if not res["result"]:
ret["result"] = True
ret["comment"] = "nftables rule for {0} already absent for {1} ({2})".format(
ret["comment"] = "nftables rule for {} already absent for {} ({})".format(
name, family, command.strip()
)
return ret
if "test" in __opts__ and __opts__["test"]:
ret[
"comment"
] = "nftables rule for {0} needs to be deleted for {1} ({2})".format(
ret["comment"] = "nftables rule for {} needs to be deleted for {} ({})".format(
name, family, command.strip()
)
return ret
@ -435,37 +435,47 @@ def delete(name, family="ipv4", **kwargs):
if res["result"]:
ret["changes"] = {"locale": name}
ret["result"] = True
ret["comment"] = "Delete nftables rule for {0} {1}".format(
name, command.strip()
)
ret["comment"] = "Delete nftables rule for {} {}".format(name, command.strip())
if "save" in kwargs:
if kwargs["save"]:
__salt__["nftables.save"](filename=None, family=family)
ret["comment"] = (
"Deleted and Saved nftables rule for {0} for {1}"
"{2}".format(name, command.strip(), family)
"Deleted and Saved nftables rule for {} for {}"
"{}".format(name, command.strip(), family)
)
return ret
else:
ret["result"] = False
ret["comment"] = (
"Failed to delete nftables rule for {0}.\n" "Attempted rule was {1}"
"Failed to delete nftables rule for {}.\n" "Attempted rule was {}"
).format(name, command.strip())
return ret
def flush(name, family="ipv4", **kwargs):
def flush(name, family="ipv4", ignore_absence=False, **kwargs):
"""
.. versionadded:: 2014.7.0
.. versionchanged:: Magnesium
Flush current nftables state
family
Networking family, either ipv4 or ipv6
ignore_absence
If set to True, attempts to flush a non-existent table will not
result in a failed state.
.. versionadded:: Magnesium
"""
ret = {"name": name, "changes": {}, "result": None, "comment": ""}
if __opts__["test"]:
ret["comment"] = "nftables flush not performed in test mode."
return ret
for ignore in _STATE_INTERNAL_KEYWORDS:
if ignore in kwargs:
del kwargs[ignore]
@ -473,12 +483,12 @@ def flush(name, family="ipv4", **kwargs):
if "table" not in kwargs:
kwargs["table"] = "filter"
res = __salt__["nftables.check_table"](kwargs["table"], family=family)
if not res["result"]:
check_table = __salt__["nftables.check_table"](kwargs["table"], family=family)
if not ignore_absence and not check_table["result"]:
ret["result"] = False
ret[
"comment"
] = "Failed to flush table {0} in family {1}, table does not exist.".format(
] = "Failed to flush table {} in family {}, table does not exist.".format(
kwargs["table"], family
)
return ret
@ -486,25 +496,25 @@ def flush(name, family="ipv4", **kwargs):
if "chain" not in kwargs:
kwargs["chain"] = ""
else:
res = __salt__["nftables.check_chain"](
check_chain = __salt__["nftables.check_chain"](
kwargs["table"], kwargs["chain"], family=family
)
if not res["result"]:
if not ignore_absence and not check_chain["result"]:
ret["result"] = False
ret[
"comment"
] = "Failed to flush chain {0} in table {1} in family {2}, chain does not exist.".format(
] = "Failed to flush chain {} in table {} in family {}, chain does not exist.".format(
kwargs["chain"], kwargs["table"], family
)
return ret
res = __salt__["nftables.flush"](kwargs["table"], kwargs["chain"], family)
if res["result"]:
if res["result"] or (
ignore_absence and (not check_table["result"] or not check_chain["result"])
):
ret["changes"] = {"locale": name}
ret["result"] = True
ret[
"comment"
] = "Flush nftables rules in {0} table {1} chain {2} family".format(
ret["comment"] = "Flush nftables rules in {} table {} chain {} family".format(
kwargs["table"], kwargs["chain"], family
)
return ret
@ -512,3 +522,167 @@ def flush(name, family="ipv4", **kwargs):
ret["result"] = False
ret["comment"] = "Failed to flush nftables rules"
return ret
def set_policy(name, table="filter", family="ipv4", **kwargs):
"""
.. versionadded:: Magnesium
Sets the default policy for nftables chains
table
The table that owns the chain that should be modified
family
Networking family, either ipv4 or ipv6
policy
The requested table policy (accept or drop)
save
Boolean to save the in-memory nftables settings to a file.
save_filename
The filename to save the nftables settings (default: /etc/nftables
or /etc/nftables/salt-all-in-one.nft if the former is a directory)
"""
ret = {"name": name, "changes": {}, "result": None, "comment": ""}
for ignore in _STATE_INTERNAL_KEYWORDS:
if ignore in kwargs:
del kwargs[ignore]
policy = __salt__["nftables.get_policy"](table, kwargs["chain"], family)
if (policy or "").lower() == kwargs["policy"].lower():
ret["result"] = True
ret[
"comment"
] = "nftables default policy for chain {} on table {} for {} already set to {}".format(
kwargs["chain"], table, family, kwargs["policy"]
)
return ret
if __opts__["test"]:
ret[
"comment"
] = "nftables default policy for chain {} on table {} for {} needs to be set to {}".format(
kwargs["chain"], table, family, kwargs["policy"]
)
return ret
if __salt__["nftables.set_policy"](
table, kwargs["chain"], kwargs["policy"].lower(), family
):
ret["changes"] = {"locale": name}
ret["result"] = True
ret["comment"] = "Set default policy for {} to {} family {}".format(
kwargs["chain"], kwargs["policy"], family
)
if "save" in kwargs:
if kwargs["save"]:
__salt__["nftables.save"](
filename=kwargs.get("save_filename"), family=family
)
ret[
"comment"
] = "Set and saved default policy for {} to {} family {}".format(
kwargs["chain"], kwargs["policy"], family
)
else:
ret["result"] = False
ret["comment"] = "Failed to set nftables default policy"
return ret
def table_present(name, family="ipv4", **kwargs):
"""
.. versionadded:: Magnesium
Ensure an nftables table is present
name
A user-defined table name.
family
Networking family, either ipv4 or ipv6
"""
ret = {"name": name, "changes": {}, "result": None, "comment": ""}
table_check = __salt__["nftables.check_table"](name, family=family)
if table_check["result"] is True:
ret["result"] = True
ret["comment"] = "nftables table {} already exists in family {}".format(
name, family
)
return ret
if __opts__["test"]:
ret["comment"] = "nftables table {} would be created in family {}".format(
name, family
)
return ret
res = __salt__["nftables.new_table"](name, family=family)
if res["result"] is True:
ret["changes"] = {"locale": name}
ret["result"] = True
ret["comment"] = "nftables table {} successfully created in family {}".format(
name, family
)
else:
ret["result"] = False
ret["comment"] = "Failed to create table {} for family {}".format(name, family)
return ret
def table_absent(name, family="ipv4", **kwargs):
"""
.. versionadded:: Magnesium
Ensure an nftables table is absent
name
Name of the table to ensure is absent
family
Networking family, either ipv4 or ipv6
"""
ret = {"name": name, "changes": {}, "result": None, "comment": ""}
table_check = __salt__["nftables.check_table"](name, family)
if table_check["result"] is False:
ret["result"] = True
ret["comment"] = "nftables table {} is already absent from family {}".format(
name, family
)
return ret
if __opts__["test"]:
ret["comment"] = "nftables table {} would be deleted from family {}".format(
name, family
)
return ret
res = __salt__["nftables.delete_table"](name, family=family)
if res["result"] is True:
ret["changes"] = {"locale": name}
ret["result"] = True
ret["comment"] = "nftables table {} successfully deleted from family {}".format(
name, family
)
else:
ret["result"] = False
ret["comment"] = "Failed to delete table {} from family {}".format(name, family)
return ret

View file

@ -1,10 +1,10 @@
# -*- coding: utf-8 -*-
"""
:codeauthor: Jayesh Kariya <jayeshk@saltstack.com>
"""
# Import Python Libs
from __future__ import absolute_import, print_function, unicode_literals
import json
# Import Salt Libs
import salt.modules.nftables as nftables
@ -156,6 +156,71 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
with patch.object(nftables, "list_tables", list_tables_mock):
self.assertListEqual(nftables.get_rules(), [])
# 'get_rules_json' function tests: 1
def test_get_rules_json(self):
"""
Test if it return a data structure of the current, in-memory rules
"""
list_rules_return = """
{
"nftables": [
{
"table": {
"family": "ip",
"name": "filter",
"handle": 47
}
},
{
"chain": {
"family": "ip",
"table": "filter",
"name": "input",
"handle": 1,
"type": "filter",
"hook": "input",
"prio": 0,
"policy": "accept"
}
},
{
"chain": {
"family": "ip",
"table": "filter",
"name": "forward",
"handle": 2,
"type": "filter",
"hook": "forward",
"prio": 0,
"policy": "accept"
}
},
{
"chain": {
"family": "ip",
"table": "filter",
"name": "output",
"handle": 3,
"type": "filter",
"hook": "output",
"prio": 0,
"policy": "accept"
}
}
]
}
"""
list_rules_mock = MagicMock(return_value=list_rules_return)
expected = json.loads(list_rules_return)["nftables"]
with patch.dict(nftables.__salt__, {"cmd.run": list_rules_mock}):
self.assertListEqual(nftables.get_rules_json(), expected)
list_rules_mock = MagicMock(return_value=[])
with patch.dict(nftables.__salt__, {"cmd.run": list_rules_mock}):
self.assertListEqual(nftables.get_rules_json(), [])
# 'save' function tests: 1
def test_save(self):
@ -164,14 +229,17 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
"""
with patch.dict(nftables.__grains__, {"os_family": "Debian"}):
mock = MagicMock(return_value=False)
with patch.dict(nftables.__salt__, {"cmd.run": mock}):
with patch.object(salt.utils.files, "fopen", MagicMock(mock_open())):
self.assertEqual(nftables.save(), "#! nft -f\n\n")
with patch.dict(nftables.__salt__, {"file.directory_exists": mock}):
with patch.dict(nftables.__salt__, {"cmd.run": mock}):
with patch.object(
salt.utils.files, "fopen", MagicMock(mock_open())
):
self.assertEqual(nftables.save(), "#! nft -f\n\n")
with patch.object(
salt.utils.files, "fopen", MagicMock(side_effect=IOError)
):
self.assertRaises(CommandExecutionError, nftables.save)
with patch.object(
salt.utils.files, "fopen", MagicMock(side_effect=IOError)
):
self.assertRaises(CommandExecutionError, nftables.save)
# 'get_rule_handle' function tests: 1
@ -578,14 +646,14 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
MagicMock(return_value={"result": True, "comment": ""}),
):
_expected = {
"comment": 'Failed to add rule "{0}" chain input in table filter in family ipv4.'.format(
"comment": 'Failed to add rule "{}" chain input in table filter in family ipv4.'.format(
_ru
),
"result": False,
}
self.assertEqual(nftables.append(chain="input", rule=_ru), _expected)
_expected = {
"comment": 'Added rule "{0}" chain input in table filter in family ipv4.'.format(
"comment": 'Added rule "{}" chain input in table filter in family ipv4.'.format(
_ru
),
"result": True,
@ -652,14 +720,14 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
):
_expected = {
"result": False,
"comment": 'Failed to add rule "{0}" chain input in table filter in family ipv4.'.format(
"comment": 'Failed to add rule "{}" chain input in table filter in family ipv4.'.format(
_ru
),
}
self.assertEqual(nftables.insert(chain="input", rule=_ru), _expected)
_expected = {
"result": True,
"comment": 'Added rule "{0}" chain input in table filter in family ipv4.'.format(
"comment": 'Added rule "{}" chain input in table filter in family ipv4.'.format(
_ru
),
}
@ -778,3 +846,154 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
"comment": "Flushed rules from chain input in table filter in family ipv4.",
}
self.assertEqual(nftables.flush(table="filter", chain="input"), _expected)
# 'get_policy' function tests: 1
def test_get_policy(self):
"""
Test the current policy for the specified table/chain
"""
list_rules_return = """
{
"nftables": [
{
"table": {
"family": "ip",
"name": "filter",
"handle": 47
}
},
{
"chain": {
"family": "ip",
"table": "filter",
"name": "input",
"handle": 1,
"type": "filter",
"hook": "input",
"prio": 0,
"policy": "accept"
}
},
{
"chain": {
"family": "ip",
"table": "filter",
"name": "forward",
"handle": 2,
"type": "filter",
"hook": "forward",
"prio": 0,
"policy": "accept"
}
},
{
"chain": {
"family": "ip",
"table": "filter",
"name": "output",
"handle": 3,
"type": "filter",
"hook": "output",
"prio": 0,
"policy": "accept"
}
}
]
}
"""
expected = json.loads(list_rules_return)
self.assertEqual(
nftables.get_policy(table="filter", chain=None, family="ipv4"),
"Error: Chain needs to be specified",
)
with patch.object(nftables, "get_rules_json", MagicMock(return_value=expected)):
self.assertEqual(
nftables.get_policy(table="filter", chain="input", family="ipv4"),
"accept",
)
with patch.object(nftables, "get_rules_json", MagicMock(return_value=expected)):
self.assertIsNone(
nftables.get_policy(table="filter", chain="missing", family="ipv4")
)
# 'set_policy' function tests: 1
def test_set_policy(self):
"""
Test set the current policy for the specified table/chain
"""
list_rules_return = """
{
"nftables": [
{
"table": {
"family": "ip",
"name": "filter",
"handle": 47
}
},
{
"chain": {
"family": "ip",
"table": "filter",
"name": "input",
"handle": 1,
"type": "filter",
"hook": "input",
"prio": 0,
"policy": "accept"
}
},
{
"chain": {
"family": "ip",
"table": "filter",
"name": "forward",
"handle": 2,
"type": "filter",
"hook": "forward",
"prio": 0,
"policy": "accept"
}
},
{
"chain": {
"family": "ip",
"table": "filter",
"name": "output",
"handle": 3,
"type": "filter",
"hook": "output",
"prio": 0,
"policy": "accept"
}
}
]
}
"""
expected = json.loads(list_rules_return)["nftables"]
self.assertEqual(
nftables.set_policy(table="filter", chain=None, policy=None, family="ipv4"),
"Error: Chain needs to be specified",
)
self.assertEqual(
nftables.set_policy(
table="filter", chain="input", policy=None, family="ipv4"
),
"Error: Policy needs to be specified",
)
mock = MagicMock(return_value={"retcode": 0})
with patch.object(nftables, "get_rules_json", MagicMock(return_value=expected)):
with patch.dict(nftables.__salt__, {"cmd.run_all": mock}):
self.assertTrue(
nftables.set_policy(
table="filter", chain="input", policy="accept", family="ipv4"
)
)

View file

@ -1,10 +1,8 @@
# -*- coding: utf-8 -*-
"""
:codeauthor: Rahul Handay <rahulha@saltstack.com>
"""
# Import Python Libs
from __future__ import absolute_import, print_function, unicode_literals
# Import Salt Libs
import salt.states.nftables as nftables
@ -51,24 +49,25 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
]
)
with patch.dict(nftables.__salt__, {"nftables.new_chain": mock}):
ret.update(
{
"changes": {"locale": "salt"},
"comment": "nftables salt chain in filter"
" table create success for ipv4",
}
)
self.assertDictEqual(nftables.chain_present("salt"), ret)
with patch.dict(nftables.__opts__, {"test": False}):
ret.update(
{
"changes": {"locale": "salt"},
"comment": "nftables salt chain in filter"
" table create success for ipv4",
}
)
self.assertDictEqual(nftables.chain_present("salt"), ret)
ret.update(
{
"changes": {},
"comment": "Failed to create salt chain"
" in filter table: for ipv4",
"result": False,
}
)
self.assertDictEqual(nftables.chain_present("salt"), ret)
ret.update(
{
"changes": {},
"comment": "Failed to create salt chain"
" in filter table: for ipv4",
"result": False,
}
)
self.assertDictEqual(nftables.chain_present("salt"), ret)
def test_chain_absent(self):
"""
@ -343,59 +342,232 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
]
)
with patch.dict(nftables.__salt__, {"nftables.check_table": mock}):
ret.update(
{
"comment": "Failed to flush table in family"
" ipv4, table does not exist.",
"result": False,
}
)
self.assertDictEqual(nftables.flush("salt", table="", chain=""), ret)
mock = MagicMock(
side_effect=[
{"result": False, "comment": ""},
{"result": True, "comment": ""},
{"result": True, "comment": ""},
]
)
with patch.dict(nftables.__salt__, {"nftables.check_chain": mock}):
with patch.dict(nftables.__opts__, {"test": False}):
ret.update(
{
"comment": "Failed to flush chain in table"
" in family ipv4, chain does not exist."
"comment": "Failed to flush table in family"
" ipv4, table does not exist.",
"result": False,
}
)
self.assertDictEqual(
nftables.flush("salt", table="", chain=""), ret
nftables.flush(
"salt", table="", chain="", ignore_absence=False
),
ret,
)
mock = MagicMock(
side_effect=[
{"result": True, "comment": ""},
{"result": False, "comment": ""},
{"result": True, "comment": ""},
{"result": True, "comment": ""},
]
)
with patch.dict(nftables.__salt__, {"nftables.flush": mock}):
with patch.dict(nftables.__salt__, {"nftables.check_chain": mock}):
ret.update(
{
"comment": "Failed to flush chain in table"
" in family ipv4, chain does not exist."
}
)
self.assertDictEqual(
nftables.flush(
"salt", table="", chain="", ignore_absence=False
),
ret,
)
mock = MagicMock(
side_effect=[
{"result": True, "comment": ""},
{"result": False, "comment": ""},
]
)
with patch.dict(nftables.__salt__, {"nftables.flush": mock}):
ret.update(
{
"changes": {"locale": "salt"},
"comment": "Flush nftables rules in table chain ipv4 family",
"result": True,
}
)
self.assertDictEqual(
nftables.flush("salt", table="", chain=""), ret
)
ret.update(
{
"changes": {},
"comment": "Failed to flush nftables rules",
"result": False,
}
)
self.assertDictEqual(
nftables.flush("salt", table="", chain=""), ret
)
def test_set_policy(self):
"""
Test to sets the default policy for nftables firewall tables
"""
ret = {"name": "salt", "changes": {}, "result": True, "comment": ""}
mock = MagicMock(return_value=[])
with patch.object(nftables, "_STATE_INTERNAL_KEYWORDS", mock):
mock = MagicMock(return_value="stack")
with patch.dict(nftables.__salt__, {"nftables.get_policy": mock}):
ret.update(
{
"comment": "nftables default policy for chain"
" on table for ipv4 already set to stack"
}
)
self.assertDictEqual(
nftables.set_policy("salt", table="", chain="", policy="stack"),
ret,
)
with patch.dict(nftables.__opts__, {"test": True}):
ret.update(
{
"comment": "nftables default policy for chain"
" on table for ipv4 needs to be set to sal",
"result": None,
}
)
self.assertDictEqual(
nftables.set_policy("salt", table="", chain="", policy="sal"),
ret,
)
with patch.dict(nftables.__opts__, {"test": False}):
mock = MagicMock(side_effect=[True, False])
with patch.dict(nftables.__salt__, {"nftables.set_policy": mock}):
ret.update(
{
"changes": {"locale": "salt"},
"comment": "Flush nftables rules in table"
" chain ipv4 family",
"comment": "Set default policy for to sal family ipv4",
"result": True,
}
)
self.assertDictEqual(
nftables.flush("salt", table="", chain=""), ret
nftables.set_policy(
"salt", table="", chain="", policy="sal"
),
ret,
)
ret.update(
{
"changes": {},
"comment": "Failed to flush" " nftables rules",
"comment": "Failed to set nftables default policy",
"result": False,
"changes": {},
}
)
self.assertDictEqual(
nftables.flush("salt", table="", chain=""), ret
nftables.set_policy(
"salt", table="", chain="", policy="sal"
),
ret,
)
def test_table_present(self):
"""
Test to verify a table exists.
"""
ret = {"name": "salt", "changes": {}, "result": True, "comment": ""}
mock = MagicMock(
side_effect=[
{"result": True},
{"result": False},
{"result": False},
{"result": False},
]
)
with patch.dict(nftables.__salt__, {"nftables.check_table": mock}):
ret.update({"comment": "nftables table salt already exists in family ipv4"})
self.assertDictEqual(nftables.table_present("salt"), ret)
with patch.dict(nftables.__opts__, {"test": True}):
ret.update(
{
"comment": "nftables table salt would be created in family ipv4",
"result": None,
}
)
self.assertDictEqual(nftables.table_present("salt"), ret)
with patch.dict(nftables.__opts__, {"test": False}):
mock = MagicMock(side_effect=[{"result": True}, {"result": False}])
with patch.dict(nftables.__salt__, {"nftables.new_table": mock}):
ret.update(
{
"result": True,
"comment": "nftables table salt successfully created in family ipv4",
"changes": {"locale": "salt"},
}
)
self.assertDictEqual(nftables.table_present("salt"), ret)
ret.update(
{
"changes": {},
"result": False,
"comment": "Failed to create table salt for family ipv4",
}
)
self.assertDictEqual(nftables.table_present("salt"), ret)
def test_table_absent(self):
"""
Test to verify a table is absent.
"""
ret = {"name": "salt", "changes": {}, "result": True, "comment": ""}
mock = MagicMock(
side_effect=[
{"result": False},
{"result": True},
{"result": True},
{"result": True},
]
)
with patch.dict(nftables.__salt__, {"nftables.check_table": mock}):
ret.update(
{"comment": "nftables table salt is already absent from family ipv4"}
)
self.assertDictEqual(nftables.table_absent("salt"), ret)
with patch.dict(nftables.__opts__, {"test": True}):
ret.update(
{
"comment": "nftables table salt would be deleted from family ipv4",
"result": None,
}
)
self.assertDictEqual(nftables.table_absent("salt"), ret)
with patch.dict(nftables.__opts__, {"test": False}):
mock = MagicMock(side_effect=[False, "a"])
with patch.dict(nftables.__salt__, {"nftables.flush": mock}):
mock = MagicMock(side_effect=[{"result": True}, {"result": False}])
with patch.dict(nftables.__salt__, {"nftables.delete_table": mock}):
ret.update(
{
"changes": {"locale": "salt"},
"comment": "nftables table salt successfully deleted from family ipv4",
"result": True,
}
)
self.assertDictEqual(nftables.table_absent("salt"), ret)
ret.update(
{
"changes": {},
"result": False,
"comment": "Failed to delete table salt from family ipv4",
}
)
self.assertDictEqual(nftables.table_absent("salt"), ret)