Merge pull request #51738 from garethgreenaway/nftables_cleanup

[2018.3] Nftables cleanup
This commit is contained in:
Daniel Wozniak 2019-02-21 10:25:43 -07:00 committed by GitHub
commit c7136cbd3f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 705 additions and 314 deletions

View file

@ -117,6 +117,10 @@ def build_rule(table=None, chain=None, command=None, position='', full=None, fam
family=ipv6
'''
ret = {'comment': '',
'rule': '',
'result': False}
if 'target' in kwargs:
kwargs['jump'] = kwargs['target']
del kwargs['target']
@ -152,14 +156,14 @@ def build_rule(table=None, chain=None, command=None, position='', full=None, fam
kwargs['dport'] = six.text_type(kwargs['dport'])
if ':' in kwargs['dport']:
kwargs['dport'] = kwargs['dport'].replace(':', '-')
rule += 'dport {{ {0}}} '.format(kwargs['dport'])
rule += 'dport {{ {0} }} '.format(kwargs['dport'])
del kwargs['dport']
if 'sport' in kwargs:
kwargs['sport'] = six.text_type(kwargs['sport'])
if ':' in kwargs['sport']:
kwargs['sport'] = kwargs['sport'].replace(':', '-')
rule += 'sport {{ {0}}} '.format(kwargs['sport'])
rule += 'sport {{ {0} }} '.format(kwargs['sport'])
del kwargs['sport']
if 'dports' in kwargs:
@ -171,7 +175,7 @@ def build_rule(table=None, chain=None, command=None, position='', full=None, fam
_dports.sort(reverse=True)
kwargs['dports'] = ', '.join(six.text_type(x) for x in _dports)
rule += 'dport {{ {0}}} '.format(kwargs['dports'])
rule += 'dport {{ {0} }} '.format(kwargs['dports'])
del kwargs['dports']
if 'sports' in kwargs:
@ -207,8 +211,7 @@ def build_rule(table=None, chain=None, command=None, position='', full=None, fam
del kwargs['to-ports']
if 'to-destination' in kwargs:
after_jump.append('--to-destination {0} '.
format(kwargs['to-destination']))
after_jump.append('--to-destination {0} '.format(kwargs['to-destination']))
del kwargs['to-destination']
if 'reject-with' in kwargs:
@ -225,31 +228,53 @@ def build_rule(table=None, chain=None, command=None, position='', full=None, fam
rule = rule.replace('dport', '{0} dport'.format(proto))
rule = rule.replace('sport', '{0} sport'.format(proto))
ret['rule'] = rule
if full in ['True', 'true']:
if not table:
return 'Error: Table needs to be specified'
ret['comment'] = 'Table needs to be specified'
return ret
if not chain:
return 'Error: Chain needs to be specified'
ret['comment'] = 'Chain needs to be specified'
return ret
if not command:
return 'Error: Command needs to be specified'
ret['comment'] = 'Command needs to be specified'
return ret
if command in ['Insert', 'insert', 'INSERT']:
if position:
return '{0} insert rule {1} {2} {3} position {4} {5}'.\
format(_nftables_cmd(), nft_family, table,
chain, position, rule)
ret['rule'] = '{0} insert rule {1} {2} {3} ' \
'position {4} {5}'.format(_nftables_cmd(),
nft_family,
table,
chain,
position,
rule)
else:
return '{0} insert rule {1} {2} {3} {4}'.\
format(_nftables_cmd(), nft_family, table,
chain, rule)
ret['rule'] = '{0} insert rule ' \
'{1} {2} {3} {4}'.format(_nftables_cmd(),
nft_family,
table,
chain,
rule)
else:
ret['rule'] = '{0} {1} rule {2} {3} {4} {5}'.format(_nftables_cmd(),
command,
nft_family,
table,
chain,
rule)
return '{0} {1} rule {2} {3} {4} {5}'.format(_nftables_cmd(),
command, nft_family, table, chain, rule)
return rule
if ret['rule']:
ret['comment'] = 'Successfully built rule'
ret['result'] = True
return ret
def get_saved_rules(conf_file=None, family='ipv4'):
def get_saved_rules(conf_file=None):
'''
Return a data structure of the rules in the conf file
@ -291,8 +316,9 @@ def get_rules(family='ipv4'):
'''
nft_family = _NFTABLES_FAMILIES[family]
rules = []
cmd = '{0} --numeric --numeric --numeric list tables {1}'.\
format(_nftables_cmd(), nft_family)
cmd = '{0} --numeric --numeric --numeric ' \
'list tables {1}'. format(_nftables_cmd(),
nft_family)
out = __salt__['cmd.run'](cmd, python_shell=False)
if not out:
return rules
@ -300,8 +326,9 @@ def get_rules(family='ipv4'):
tables = re.split('\n+', out)
for table in tables:
table_name = table.split(' ')[1]
cmd = '{0} --numeric --numeric --numeric list table {1} {2}'.format(_nftables_cmd(),
nft_family, table_name)
cmd = '{0} --numeric --numeric --numeric ' \
'list table {1} {2}'.format(_nftables_cmd(),
nft_family, table_name)
out = __salt__['cmd.run'](cmd, python_shell=False)
rules.append(out)
return rules
@ -354,29 +381,35 @@ def get_rule_handle(table='filter', chain=None, rule=None, family='ipv4'):
.. code-block:: bash
salt '*' nftables.get_rule_handle filter input \\
rule='input tcp dport 22 log accept'
rule='tcp dport 22 log accept'
IPv6:
salt '*' nftables.get_rule_handle filter input \\
rule='input tcp dport 22 log accept' \\
rule='tcp dport 22 log accept' \\
family=ipv6
'''
ret = {'comment': '',
'result': False}
if not chain:
return 'Error: Chain needs to be specified'
ret['comment'] = 'Chain needs to be specified'
return ret
if not rule:
return 'Error: Rule needs to be specified'
ret['comment'] = 'Rule needs to be specified'
return ret
if not check_table(table, family=family):
return 'Error: table {0} in family {1} does not exist'.\
format(table, family)
res = check_table(table, family=family)
if not res['result']:
return res
if not check_chain(table, chain, family=family):
return 'Error: chain {0} in table {1} in family {2} does not exist'.\
format(chain, table, family)
res = check_chain(table, chain, family=family)
if not res['result']:
return res
if not check(table, chain, rule, family=family):
return 'Error: rule {0} chain {1} in table {2} in family {3} does not exist'.\
format(rule, chain, table, family)
res = check(table, chain, rule, family=family)
if not res['result']:
return res
nft_family = _NFTABLES_FAMILIES[family]
cmd = '{0} --numeric --numeric --numeric --handle list chain {1} {2} {3}'.\
@ -388,8 +421,9 @@ def get_rule_handle(table='filter', chain=None, rule=None, family='ipv4'):
for r in rules:
match = pat.search(r)
if match:
return match.group('handle')
return 'Error: could not find rule {0}'.format(rule)
return {'result': True, 'handle': match.group('handle')}
return {'result': False,
'comment': 'Could not find rule {0}'.format(rule)}
def check(table='filter', chain=None, rule=None, family='ipv4'):
@ -406,40 +440,46 @@ def check(table='filter', chain=None, rule=None, family='ipv4'):
.. code-block:: bash
salt '*' nftables.check filter input \\
rule='input tcp dport 22 log accept'
rule='tcp dport 22 log accept'
IPv6:
salt '*' nftables.check filter input \\
rule='input tcp dport 22 log accept' \\
rule='tcp dport 22 log accept' \\
family=ipv6
'''
ret = {'comment': '',
'result': False}
if not chain:
return 'Error: Chain needs to be specified'
ret['comment'] = 'Chain needs to be specified'
return ret
if not rule:
return 'Error: Rule needs to be specified'
ret['comment'] = 'Rule needs to be specified'
return ret
if not check_table(table, family=family):
return 'Error: table {0} in family {1} does not exist'.\
format(table, family)
res = check_table(table, family=family)
if not res['result']:
return res
if not check_chain(table, chain, family=family):
return 'Error: chain {0} in table {1} in family {2} does not exist'.\
format(chain, table, family)
res = check_chain(table, chain, family=family)
if not res['result']:
return res
nft_family = _NFTABLES_FAMILIES[family]
cmd = '{0} --handle --numeric --numeric --numeric list chain {1} {2} {3}'.\
format(_nftables_cmd(), nft_family, table, chain)
format(_nftables_cmd(), nft_family, table, chain)
search_rule = '{0} #'.format(rule)
out = __salt__['cmd.run'](cmd, python_shell=False).find(search_rule)
if out != -1:
out = ''
if out == -1:
ret['comment'] = 'Rule {0} in chain {1} in table {2} in family {3} does not exist'.\
format(rule, chain, table, family)
else:
return False
if not out:
return True
return out
ret['comment'] = 'Rule {0} in chain {1} in table {2} in family {3} exists'.\
format(rule, chain, table, family)
ret['result'] = True
return ret
def check_chain(table='filter', chain=None, family='ipv4'):
@ -458,21 +498,25 @@ def check_chain(table='filter', chain=None, family='ipv4'):
salt '*' nftables.check_chain filter input family=ipv6
'''
ret = {'comment': '',
'result': False}
if not chain:
return 'Error: Chain needs to be specified'
ret['comment'] = 'Chain needs to be specified'
return ret
nft_family = _NFTABLES_FAMILIES[family]
cmd = '{0} list table {1} {2}' . format(_nftables_cmd(), nft_family, table)
out = __salt__['cmd.run'](cmd, python_shell=False).find('chain {0} {{'.format(chain))
if out != -1:
out = ''
if out == -1:
ret['comment'] = 'Chain {0} in table {1} in family {2} does not exist'.\
format(chain, table, family)
else:
return False
if not out:
return True
return out
ret['comment'] = 'Chain {0} in table {1} in family {2} exists'.\
format(chain, table, family)
ret['result'] = True
return ret
def check_table(table=None, family='ipv4'):
@ -483,21 +527,25 @@ def check_table(table=None, family='ipv4'):
salt '*' nftables.check_table nat
'''
ret = {'comment': '',
'result': False}
if not table:
return 'Error: table needs to be specified'
ret['comment'] = 'Table needs to be specified'
return ret
nft_family = _NFTABLES_FAMILIES[family]
cmd = '{0} list tables {1}' . format(_nftables_cmd(), nft_family)
out = __salt__['cmd.run'](cmd, python_shell=False).find('table {0} {1}'.format(nft_family, table))
if out != -1:
out = ''
if out == -1:
ret['comment'] = 'Table {0} in family {1} does not exist'.\
format(table, family)
else:
return False
if not out:
return True
return out
ret['comment'] = 'Table {0} in family {1} exists'.\
format(table, family)
ret['result'] = True
return ret
def new_table(table, family='ipv4'):
@ -515,21 +563,29 @@ def new_table(table, family='ipv4'):
IPv6:
salt '*' nftables.new_table filter family=ipv6
'''
ret = {'comment': '',
'result': False}
if not table:
return 'Error: table needs to be specified'
ret['comment'] = 'Table needs to be specified'
return ret
if check_table(table, family=family):
return 'Error: table {0} in family {1} already exists'.\
format(table, family)
res = check_table(table, family=family)
if res['result']:
return res
nft_family = _NFTABLES_FAMILIES[family]
cmd = '{0} add table {1} {2}'.format(_nftables_cmd(), nft_family, table)
out = __salt__['cmd.run'](cmd, python_shell=False)
if not out:
out = True
return out
ret['comment'] = 'Table {0} in family {1} created'.\
format(table, family)
ret['result'] = True
else:
ret['comment'] = 'Table {0} in family {1} could not be created'.\
format(table, family)
return ret
def delete_table(table, family='ipv4'):
@ -547,20 +603,29 @@ def delete_table(table, family='ipv4'):
IPv6:
salt '*' nftables.delete_table filter family=ipv6
'''
ret = {'comment': '',
'result': False}
if not table:
return 'Error: table needs to be specified'
ret['comment'] = 'Table needs to be specified'
return ret
if not check_table(table, family=family):
return 'Error: table {0} in family {1} does not exist' . format(table, family)
res = check_table(table, family=family)
if not res['result']:
return res
nft_family = _NFTABLES_FAMILIES[family]
cmd = '{0} delete table {1} {2}'.format(_nftables_cmd(), nft_family, table)
out = __salt__['cmd.run'](cmd, python_shell=False)
if not out:
out = True
return out
ret['comment'] = 'Table {0} in family {1} deleted'.\
format(table, family)
ret['result'] = True
else:
ret['comment'] = 'Table {0} in family {1} could not be deleted'.\
format(table, family)
return ret
def new_chain(table='filter', chain=None, table_type=None, hook=None, priority=None, family='ipv4'):
@ -588,34 +653,45 @@ def new_chain(table='filter', chain=None, table_type=None, hook=None, priority=N
salt '*' nftables.new_chain filter foo family=ipv6
'''
ret = {'comment': '',
'result': False}
if not chain:
return 'Error: Chain needs to be specified'
ret['comment'] = 'Chain needs to be specified'
return ret
if not check_table(table, family=family):
return 'Error: table {0} in family {1} does not exist'.\
format(table, family)
res = check_table(table, family=family)
if not res['result']:
return res
if check_chain(table, chain, family=family):
return 'Error: chain {0} in table {1} in family {2} already exists'.\
res = check_chain(table, chain, family=family)
if res['result']:
ret['comment'] = 'Chain {0} in table {1} in family {2} already exists'.\
format(chain, table, family)
return ret
nft_family = _NFTABLES_FAMILIES[family]
cmd = '{0} add chain {1} {2} {3}'.\
format(_nftables_cmd(), nft_family, table, chain)
format(_nftables_cmd(), nft_family, table, chain)
if table_type or hook or priority:
if table_type and hook and six.text_type(priority):
cmd = r'{0} \{{ type {1} hook {2} priority {3}\; \}}'.\
format(cmd, table_type, hook, priority)
else:
# Specify one, rqeuire all
return 'Error: table_type hook and priority required'
# Specify one, require all
ret['comment'] = 'Table_type, hook, and priority required.'
return ret
out = __salt__['cmd.run'](cmd, python_shell=False)
if not out:
out = True
return out
ret['comment'] = 'Chain {0} in table {1} in family {2} created'.\
format(chain, table, family)
ret['result'] = True
else:
ret['comment'] = 'Chain {0} in table {1} in family {2} could not be created'.\
format(chain, table, family)
return ret
def delete_chain(table='filter', chain=None, family='ipv4'):
@ -637,26 +713,34 @@ def delete_chain(table='filter', chain=None, family='ipv4'):
salt '*' nftables.delete_chain filter foo family=ipv6
'''
ret = {'comment': '',
'result': False}
if not chain:
return 'Error: Chain needs to be specified'
ret['comment'] = 'Chain needs to be specified'
return ret
if not check_table(table, family=family):
return 'Error: table {0} in family {1} does not exist'.\
format(table, family)
res = check_table(table, family=family)
if not res['result']:
return res
if not check_chain(table, chain, family=family):
return 'Error: chain {0} in table {1} in family {2} does not exist'.\
format(chain, table, family)
res = check_chain(table, chain, family=family)
if not res['result']:
return res
nft_family = _NFTABLES_FAMILIES[family]
cmd = '{0} delete chain {1} {2} {3}'.\
format(_nftables_cmd(), nft_family, table, chain)
format(_nftables_cmd(), nft_family, table, chain)
out = __salt__['cmd.run'](cmd, python_shell=False)
if not out:
out = True
return out
ret['comment'] = 'Chain {0} in table {1} in family {2} deleted'.\
format(chain, table, family)
ret['result'] = True
else:
ret['comment'] = 'Chain {0} in table {1} in family {2} could not be deleted'.\
format(chain, table, family)
return ret
def append(table='filter', chain=None, rule=None, family='ipv4'):
@ -673,38 +757,51 @@ def append(table='filter', chain=None, rule=None, family='ipv4'):
.. code-block:: bash
salt '*' nftables.append filter input \\
rule='input tcp dport 22 log accept'
rule='tcp dport 22 log accept'
IPv6:
salt '*' nftables.append filter input \\
rule='input tcp dport 22 log accept' \\
rule='tcp dport 22 log accept' \\
family=ipv6
'''
ret = {'comment': 'Failed to append rule {0} to chain {1} in table {2}.'.format(rule, chain, table),
'result': False}
if not chain:
return 'Error: Chain needs to be specified'
ret['comment'] = 'Chain needs to be specified'
return ret
if not rule:
return 'Error: Rule needs to be specified'
ret['comment'] = 'Rule needs to be specified'
return ret
if not check_table(table, family=family):
return 'Error: table {0} in family {1} does not exist'.\
format(table, family)
res = check_table(table, family=family)
if not res['result']:
return res
if not check_chain(table, chain, family=family):
return 'Error: chain {0} in table {1} in family {2} does not exist'.\
format(chain, table, family)
res = check_chain(table, chain, family=family)
if not res['result']:
return res
if check(table, chain, rule, family=family):
return 'Error: rule {0} chain {1} in table {2} in family {3} already exists'.\
format(rule, chain, table, family)
res = check(table, chain, rule, family=family)
if res['result']:
ret['comment'] = 'Rule {0} chain {1} in table {2} in family {3} already exists'.\
format(rule, chain, table, family)
return ret
nft_family = _NFTABLES_FAMILIES[family]
cmd = '{0} add rule {1} {2} {3} {4}'.\
format(_nftables_cmd(), nft_family, table, chain, rule)
format(_nftables_cmd(), nft_family, table, chain, rule)
out = __salt__['cmd.run'](cmd, python_shell=False)
if len(out) == 0:
return True
ret['result'] = True
ret['comment'] = 'Added rule "{0}" chain {1} in table {2} in family {3}.'.\
format(rule, chain, table, family)
else:
return False
ret['comment'] = 'Failed to add rule "{0}" chain {1} in table {2} in family {3}.'.\
format(rule, chain, table, family)
return ret
def insert(table='filter', chain=None, position=None, rule=None, family='ipv4'):
@ -723,36 +820,44 @@ def insert(table='filter', chain=None, position=None, rule=None, family='ipv4'):
.. code-block:: bash
salt '*' nftables.insert filter input \\
rule='input tcp dport 22 log accept'
rule='tcp dport 22 log accept'
salt '*' nftables.insert filter input position=3 \\
rule='input tcp dport 22 log accept'
rule='tcp dport 22 log accept'
IPv6:
salt '*' nftables.insert filter input \\
rule='input tcp dport 22 log accept' \\
rule='tcp dport 22 log accept' \\
family=ipv6
salt '*' nftables.insert filter input position=3 \\
rule='input tcp dport 22 log accept' \\
rule='tcp dport 22 log accept' \\
family=ipv6
'''
ret = {'comment': 'Failed to insert rule {0} to table {1}.'.format(rule, table),
'result': False}
if not chain:
return 'Error: Chain needs to be specified'
ret['comment'] = 'Chain needs to be specified'
return ret
if not rule:
return 'Error: Rule needs to be specified'
ret['comment'] = 'Rule needs to be specified'
return ret
if not check_table(table, family=family):
return 'Error: table {0} in family {1} does not exist'.\
format(table, family)
res = check_table(table, family=family)
if not res['result']:
return res
if not check_chain(table, chain, family=family):
return 'Error: chain {0} in table {1} in family {2} does not exist'.\
format(chain, table, family)
res = check_chain(table, chain, family=family)
if not res['result']:
return res
if check(table, chain, rule, family=family):
return 'Error: rule {0} chain {1} in table {2} in family {3} already exists'.\
format(rule, chain, table, family)
res = check(table, chain, rule, family=family)
if res['result']:
ret['comment'] = 'Rule {0} chain {1} in table {2} in family {3} already exists'.\
format(rule, chain, table, family)
return ret
nft_family = _NFTABLES_FAMILIES[family]
if position:
@ -764,9 +869,13 @@ def insert(table='filter', chain=None, position=None, rule=None, family='ipv4'):
out = __salt__['cmd.run'](cmd, python_shell=False)
if len(out) == 0:
return True
ret['result'] = True
ret['comment'] = 'Added rule "{0}" chain {1} in table {2} in family {3}.'.\
format(rule, chain, table, family)
else:
return False
ret['comment'] = 'Failed to add rule "{0}" chain {1} in table {2} in family {3}.'.\
format(rule, chain, table, family)
return ret
def delete(table, chain=None, position=None, rule=None, family='ipv4'):
@ -786,30 +895,35 @@ def delete(table, chain=None, position=None, rule=None, family='ipv4'):
salt '*' nftables.delete filter input position=3
salt '*' nftables.delete filter input \\
rule='input tcp dport 22 log accept'
rule='tcp dport 22 log accept'
IPv6:
salt '*' nftables.delete filter input position=3 family=ipv6
salt '*' nftables.delete filter input \\
rule='input tcp dport 22 log accept' \\
rule='tcp dport 22 log accept' \\
family=ipv6
'''
ret = {'comment': 'Failed to delete rule {0} in table {1}.'.format(rule, table),
'result': False}
if position and rule:
return 'Error: Only specify a position or a rule, not both'
ret['comment'] = 'Only specify a position or a rule, not both'
return ret
if not check_table(table, family=family):
return 'Error: table {0} in family {1} does not exist'.\
format(table, family)
res = check_table(table, family=family)
if not res['result']:
return res
if not check_chain(table, chain, family=family):
return 'Error: chain {0} in table {1} in family {2} does not exist'.\
format(chain, table, family)
res = check_chain(table, chain, family=family)
if not res['result']:
return res
if not check(table, chain, rule, family=family):
return 'Error: rule {0} chain {1} in table {2} in family {3} does not exist'.\
format(rule, chain, table, family)
res = check(table, chain, rule, family=family)
if not res['result']:
ret['comment'] = 'Rule {0} chain {1} in table {2} in family {3} does not exist'.\
format(rule, chain, table, family)
return ret
# nftables rules can only be deleted using the handle
# if we don't have it, find it.
@ -818,13 +932,17 @@ def delete(table, chain=None, position=None, rule=None, family='ipv4'):
nft_family = _NFTABLES_FAMILIES[family]
cmd = '{0} delete rule {1} {2} {3} handle {4}'.\
format(_nftables_cmd(), nft_family, table, chain, position)
format(_nftables_cmd(), nft_family, table, chain, position)
out = __salt__['cmd.run'](cmd, python_shell=False)
if len(out) == 0:
return True
ret['result'] = True
ret['comment'] = 'Deleted rule "{0}" in chain {1} in table {2} in family {3}.'.\
format(rule, chain, table, family)
else:
return False
ret['comment'] = 'Failed to delete rule "{0}" in chain {1} table {2} in family {3}'.\
format(rule, chain, table, family)
return ret
def flush(table='filter', chain='', family='ipv4'):
@ -843,24 +961,33 @@ def flush(table='filter', chain='', family='ipv4'):
IPv6:
salt '*' nftables.flush filter input family=ipv6
'''
if not check_table(table, family=family):
return 'Error: table {0} in family {1} does not exist'.\
format(table, family)
ret = {'comment': 'Failed to flush rules from chain {0} in table {1}.'.format(chain, table),
'result': False}
res = check_table(table, family=family)
if not res['result']:
return res
nft_family = _NFTABLES_FAMILIES[family]
if chain:
if not check_chain(table, chain, family=family):
return 'Error: chain {0} in table {1} in family {2} does not exist'.\
format(chain, table, nft_family)
res = check_chain(table, chain, family=family)
if not res['result']:
return res
cmd = '{0} flush chain {1} {2} {3}'.\
format(_nftables_cmd(), nft_family, table, chain)
format(_nftables_cmd(), nft_family, table, chain)
comment = 'from chain {0} in table {1} in family {2}.'.\
format(chain, table, family)
else:
cmd = '{0} flush table {1} {2}'.\
format(_nftables_cmd(), nft_family, table)
comment = 'from table {0} in family {1}.'.\
format(table, family)
out = __salt__['cmd.run'](cmd, python_shell=False)
if len(out) == 0:
return True
ret['result'] = True
ret['comment'] = 'Flushed rules {0}'.format(comment)
else:
return False
ret['comment'] = 'Failed to flush rules {0}'.format(comment)
return ret

View file

@ -99,6 +99,15 @@ at some point be deprecated in favor of a more generic `firewall` state.
- sport: 1025:65535
- save: True
output:
nftables.chain_present:
- family: ip
- table: filter
output:
nftables.chain_absent:
- family: ip
- table: filter
'''
from __future__ import absolute_import, print_function, unicode_literals
@ -106,6 +115,9 @@ from __future__ import absolute_import, print_function, unicode_literals
# Import salt libs
from salt.state import STATE_INTERNAL_KEYWORDS as _STATE_INTERNAL_KEYWORDS
import logging
log = logging.getLogger(__name__)
def __virtual__():
'''
@ -136,13 +148,13 @@ def chain_present(name, table='filter', table_type=None, hook=None, priority=Non
'comment': ''}
chain_check = __salt__['nftables.check_chain'](table, name, family=family)
if chain_check is True:
if chain_check['result'] is True:
ret['result'] = True
ret['comment'] = ('nftables {0} chain is already exist in {1} table for {2}'
.format(name, table, family))
return ret
command = __salt__['nftables.new_chain'](
res = __salt__['nftables.new_chain'](
table,
name,
table_type=table_type,
@ -151,7 +163,7 @@ def chain_present(name, table='filter', table_type=None, hook=None, priority=Non
family=family
)
if command is True:
if res['result'] is True:
ret['changes'] = {'locale': name}
ret['result'] = True
ret['comment'] = ('nftables {0} chain in {1} table create success for {2}'
@ -162,7 +174,7 @@ def chain_present(name, table='filter', table_type=None, hook=None, priority=Non
ret['comment'] = 'Failed to create {0} chain in {1} table: {2} for {3}'.format(
name,
table,
command.strip(),
res['comment'].strip(),
family
)
return ret
@ -239,26 +251,38 @@ def append(name, family='ipv4', **kwargs):
for ignore in _STATE_INTERNAL_KEYWORDS:
if ignore in kwargs:
del kwargs[ignore]
rule = __salt__['nftables.build_rule'](family=family, **kwargs)
command = __salt__['nftables.build_rule'](full=True, family=family, command='add', **kwargs)
res = __salt__['nftables.build_rule'](family=family, **kwargs)
if not res['result']:
return res
rule = res['rule']
if __salt__['nftables.check'](kwargs['table'],
kwargs['chain'],
rule,
family) is True:
res = __salt__['nftables.build_rule'](full=True, family=family, command='add', **kwargs)
if not res['result']:
return res
command = res['rule']
res = __salt__['nftables.check'](kwargs['table'],
kwargs['chain'],
rule,
family)
if res['result']:
ret['result'] = True
ret['comment'] = 'nftables rule for {0} already set ({1}) for {2}'.format(
name,
command.strip(),
family)
return ret
if __opts__['test']:
if 'test' in __opts__ and __opts__['test']:
ret['comment'] = 'nftables rule for {0} needs to be set ({1}) for {2}'.format(
name,
command.strip(),
family)
return ret
if __salt__['nftables.append'](kwargs['table'], kwargs['chain'], rule, family):
res = __salt__['nftables.append'](kwargs['table'],
kwargs['chain'],
rule,
family)
if res['result']:
ret['changes'] = {'locale': name}
ret['result'] = True
ret['comment'] = 'Set nftables rule for {0} to: {1} for {2}'.format(
@ -274,9 +298,10 @@ def append(name, family='ipv4', **kwargs):
else:
ret['result'] = False
ret['comment'] = ('Failed to set nftables rule for {0}.\n'
'Attempted rule was {1} for {2}').format(
'Attempted rule was {1} for {2}.\n'
'{3}').format(
name,
command.strip(), family)
command.strip(), family, res['comment'])
return ret
@ -306,25 +331,42 @@ def insert(name, family='ipv4', **kwargs):
for ignore in _STATE_INTERNAL_KEYWORDS:
if ignore in kwargs:
del kwargs[ignore]
rule = __salt__['nftables.build_rule'](family=family, **kwargs)
command = __salt__['nftables.build_rule'](full=True, family=family, command='insert', **kwargs)
if __salt__['nftables.check'](kwargs['table'],
kwargs['chain'],
rule,
family) is True:
res = __salt__['nftables.build_rule'](family=family, **kwargs)
if not res['result']:
return res
rule = res['rule']
res = __salt__['nftables.build_rule'](full=True,
family=family,
command='insert',
**kwargs)
if not res['result']:
return res
command = res['rule']
res = __salt__['nftables.check'](kwargs['table'],
kwargs['chain'],
rule,
family)
if res['result']:
ret['result'] = True
ret['comment'] = 'nftables rule for {0} already set for {1} ({2})'.format(
name,
family,
command.strip())
return ret
if __opts__['test']:
if 'test' in __opts__ and __opts__['test']:
ret['comment'] = 'nftables rule for {0} needs to be set for {1} ({2})'.format(
name,
family,
command.strip())
return ret
if __salt__['nftables.insert'](kwargs['table'], kwargs['chain'], kwargs['position'], rule, family):
res = __salt__['nftables.insert'](kwargs['table'],
kwargs['chain'],
kwargs['position'],
rule,
family)
if res['result']:
ret['changes'] = {'locale': name}
ret['result'] = True
ret['comment'] = 'Set nftables rule for {0} to: {1} for {2}'.format(
@ -372,19 +414,29 @@ def delete(name, family='ipv4', **kwargs):
for ignore in _STATE_INTERNAL_KEYWORDS:
if ignore in kwargs:
del kwargs[ignore]
rule = __salt__['nftables.build_rule'](family=family, **kwargs)
command = __salt__['nftables.build_rule'](full=True, family=family, command='D', **kwargs)
if not __salt__['nftables.check'](kwargs['table'],
kwargs['chain'],
rule,
family) is True:
res = __salt__['nftables.build_rule'](family=family, **kwargs)
if not res['result']:
return res
rule = res['rule']
res = __salt__['nftables.build_rule'](full=True, family=family, command='D', **kwargs)
if not res['result']:
return res
command = res['rule']
res = __salt__['nftables.check'](kwargs['table'],
kwargs['chain'],
rule,
family)
if not res['result']:
ret['result'] = True
ret['comment'] = 'nftables rule for {0} already absent for {1} ({2})'.format(
name,
family,
command.strip())
return ret
if __opts__['test']:
if 'test' in __opts__ and __opts__['test']:
ret['comment'] = 'nftables rule for {0} needs to be deleted for {1} ({2})'.format(
name,
family,
@ -392,19 +444,19 @@ def delete(name, family='ipv4', **kwargs):
return ret
if 'position' in kwargs:
result = __salt__['nftables.delete'](
res = __salt__['nftables.delete'](
kwargs['table'],
kwargs['chain'],
family=family,
position=kwargs['position'])
else:
result = __salt__['nftables.delete'](
res = __salt__['nftables.delete'](
kwargs['table'],
kwargs['chain'],
family=family,
rule=rule)
if result:
if res['result']:
ret['changes'] = {'locale': name}
ret['result'] = True
ret['comment'] = 'Delete nftables rule for {0} {1}'.format(
@ -447,7 +499,8 @@ def flush(name, family='ipv4', **kwargs):
if 'table' not in kwargs:
kwargs['table'] = 'filter'
if not __salt__['nftables.check_table'](kwargs['table'], family=family):
res = __salt__['nftables.check_table'](kwargs['table'], family=family)
if not res['result']:
ret['result'] = False
ret['comment'] = 'Failed to flush table {0} in family {1}, table does not exist.'.format(
kwargs['table'],
@ -458,7 +511,10 @@ def flush(name, family='ipv4', **kwargs):
if 'chain' not in kwargs:
kwargs['chain'] = ''
else:
if not __salt__['nftables.check_chain'](kwargs['table'], kwargs['chain'], family=family):
res = __salt__['nftables.check_chain'](kwargs['table'],
kwargs['chain'],
family=family)
if not res['result']:
ret['result'] = False
ret['comment'] = 'Failed to flush chain {0} in table {1} in family {2}, chain does not exist.'.format(
kwargs['chain'],
@ -467,7 +523,10 @@ def flush(name, family='ipv4', **kwargs):
)
return ret
if __salt__['nftables.flush'](kwargs['table'], kwargs['chain'], family):
res = __salt__['nftables.flush'](kwargs['table'],
kwargs['chain'],
family)
if res['result']:
ret['changes'] = {'locale': name}
ret['result'] = True
ret['comment'] = 'Flush nftables rules in {0} table {1} chain {2} family'.format(

View file

@ -48,29 +48,44 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
Test if it build a well-formatted nftables rule based on kwargs.
'''
self.assertEqual(nftables.build_rule(full='True'),
'Error: Table needs to be specified')
{'result': False,
'rule': '',
'comment': 'Table needs to be specified'})
self.assertEqual(nftables.build_rule(table='filter', full='True'),
'Error: Chain needs to be specified')
{'result': False,
'rule': '',
'comment': 'Chain needs to be specified'})
self.assertEqual(nftables.build_rule(table='filter', chain='input',
full='True'),
'Error: Command needs to be specified')
{'result': False,
'rule': '',
'comment': 'Command needs to be specified'})
self.assertEqual(nftables.build_rule(table='filter', chain='input',
command='insert', position='3',
full='True'),
'nft insert rule ip filter input position 3 ')
{'result': True,
'rule': 'nft insert rule ip filter input position 3 ',
'comment': 'Successfully built rule'})
self.assertEqual(nftables.build_rule(table='filter', chain='input',
command='insert', full='True'),
'nft insert rule ip filter input ')
{'result': True,
'rule': 'nft insert rule ip filter input ',
'comment': 'Successfully built rule'})
self.assertEqual(nftables.build_rule(table='filter', chain='input',
command='halt', full='True'),
'nft halt rule ip filter input ')
{'result': True,
'rule': 'nft halt rule ip filter input ',
'comment': 'Successfully built rule'})
self.assertEqual(nftables.build_rule(), '')
self.assertEqual(nftables.build_rule(),
{'result': True,
'rule': '',
'comment': ''})
# 'get_saved_rules' function tests: 1
@ -119,33 +134,46 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
Test if it get the handle for a particular rule
'''
self.assertEqual(nftables.get_rule_handle(),
'Error: Chain needs to be specified')
{'result': False,
'comment': 'Chain needs to be specified'})
self.assertEqual(nftables.get_rule_handle(chain='input'),
'Error: Rule needs to be specified')
{'result': False,
'comment': 'Rule needs to be specified'})
_ru = 'input tcp dport 22 log accept'
ret = 'Error: table filter in family ipv4 does not exist'
ret = {'result': False,
'comment': 'Table filter in family ipv4 does not exist'}
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.get_rule_handle(chain='input', rule=_ru),
ret)
ret = 'Error: chain input in table filter in family ipv4 does not exist'
ret = {'result': False,
'comment': 'Chain input in table filter in family ipv4 does not exist'}
mock = MagicMock(return_value='table ip filter')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.get_rule_handle(chain='input', rule=_ru),
ret)
ret = ('Error: rule input tcp dport 22 log accept chain input'
' in table filter in family ipv4 does not exist')
ret1 = 'Error: could not find rule input tcp dport 22 log accept'
ret = {'result': False,
'comment': ('Rule input tcp dport 22 log accept chain'
' input in table filter in family ipv4 does not exist')}
ret1 = {'result': False,
'comment': 'Could not find rule input tcp dport 22 log accept'}
with patch.object(nftables, 'check_table',
MagicMock(return_value=True)):
MagicMock(return_value={'result': True,
'comment': ''})):
with patch.object(nftables, 'check_chain',
MagicMock(return_value=True)):
MagicMock(return_value={'result': True,
'comment': ''})):
_ret1 = {'result': False,
'comment': ('Rule input tcp dport 22 log accept'
' chain input in table filter in'
' family ipv4 does not exist')}
_ret2 = {'result': True, 'comment': ''}
with patch.object(nftables, 'check',
MagicMock(side_effect=[False, True])):
MagicMock(side_effect=[_ret1, _ret2])):
self.assertEqual(nftables.get_rule_handle(chain='input',
rule=_ru), ret)
@ -163,30 +191,38 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
Test if it check for the existence of a rule in the table and chain
'''
self.assertEqual(nftables.check(),
'Error: Chain needs to be specified')
{'result': False,
'comment': 'Chain needs to be specified'})
self.assertEqual(nftables.check(chain='input'),
'Error: Rule needs to be specified')
{'result': False,
'comment': 'Rule needs to be specified'})
_ru = 'input tcp dport 22 log accept'
ret = 'Error: table filter in family ipv4 does not exist'
_ru = 'tcp dport 22 log accept'
ret = {'result': False,
'comment': 'Table filter in family ipv4 does not exist'}
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.check(chain='input', rule=_ru), ret)
mock = MagicMock(return_value='table ip filter')
ret = 'Error: chain input in table filter in family ipv4 does not exist'
ret = {'result': False,
'comment': 'Chain input in table filter in family ipv4 does not exist'}
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.check(chain='input', rule=_ru), ret)
mock = MagicMock(return_value='table ip filter chain input {{')
ret = {'result': False, 'comment':
'Rule tcp dport 22 log accept in chain input in table filter in family ipv4 does not exist'}
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertFalse(nftables.check(chain='input', rule=_ru))
self.assertEqual(nftables.check(chain='input', rule=_ru), ret)
r_val = 'table ip filter chain input {{ input tcp dport 22 log accept #'
mock = MagicMock(return_value=r_val)
ret = {'result': True,
'comment': 'Rule tcp dport 22 log accept in chain input in table filter in family ipv4 exists'}
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertTrue(nftables.check(chain='input', rule=_ru))
self.assertEqual(nftables.check(chain='input', rule=_ru), ret)
# 'check_chain' function tests: 1
@ -195,15 +231,20 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
Test if it check for the existence of a chain in the table
'''
self.assertEqual(nftables.check_chain(),
'Error: Chain needs to be specified')
{'result': False,
'comment': 'Chain needs to be specified'})
mock = MagicMock(return_value='')
ret = {'comment': 'Chain input in table filter in family ipv4 does not exist',
'result': False}
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertFalse(nftables.check_chain(chain='input'))
self.assertEqual(nftables.check_chain(chain='input'), ret)
mock = MagicMock(return_value='chain input {{')
ret = {'comment': 'Chain input in table filter in family ipv4 exists',
'result': True}
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertTrue(nftables.check_chain(chain='input'))
self.assertEqual(nftables.check_chain(chain='input'), ret)
# 'check_table' function tests: 1
@ -212,15 +253,20 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
Test if it check for the existence of a table
'''
self.assertEqual(nftables.check_table(),
'Error: table needs to be specified')
{'result': False,
'comment': 'Table needs to be specified'})
mock = MagicMock(return_value='')
ret = {'comment': 'Table nat in family ipv4 does not exist',
'result': False}
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertFalse(nftables.check_table(table='nat'))
self.assertEqual(nftables.check_table(table='nat'), ret)
mock = MagicMock(return_value='table ip nat')
ret = {'comment': 'Table nat in family ipv4 exists',
'result': True}
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertTrue(nftables.check_table(table='nat'))
self.assertEqual(nftables.check_table(table='nat'), ret)
# 'new_table' function tests: 1
@ -229,16 +275,19 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
Test if it create new custom table.
'''
self.assertEqual(nftables.new_table(table=None),
'Error: table needs to be specified')
{'result': False,
'comment': 'Table needs to be specified'})
mock = MagicMock(return_value='')
ret = {'comment': 'Table nat in family ipv4 created', 'result': True}
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.new_table(table='nat'), True)
self.assertEqual(nftables.new_table(table='nat'), ret)
mock = MagicMock(return_value='table ip nat')
ret = {'comment': 'Table nat in family ipv4 exists', 'result': True}
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.new_table(table='nat'),
'Error: table nat in family ipv4 already exists')
ret)
# 'delete_table' function tests: 1
@ -247,16 +296,35 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
Test if it delete custom table.
'''
self.assertEqual(nftables.delete_table(table=None),
'Error: table needs to be specified')
{'result': False,
'comment': 'Table needs to be specified'})
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.delete_table(table='nat'),
'Error: table nat in family ipv4 does not exist')
mock_ret = {'result': False,
'comment': 'Table nat in family ipv4 does not exist'}
with patch('salt.modules.nftables.check_table',
MagicMock(return_value=mock_ret)):
ret = nftables.delete_table(table='nat')
self.assertEqual(ret,
{'result': False,
'comment': 'Table nat in family ipv4 does not exist'})
mock = MagicMock(return_value='table ip nat')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.delete_table(table='nat'), 'table ip nat')
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
patch('salt.modules.nftables.check_table',
MagicMock(return_value={'result': True,
'comment': ''})):
self.assertEqual(nftables.delete_table(table='nat'),
{'comment': 'Table nat in family ipv4 could not be deleted',
'result': False})
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
patch('salt.modules.nftables.check_table',
MagicMock(return_value={'result': True,
'comment': ''})):
self.assertEqual(nftables.delete_table(table='nat'),
{'comment': 'Table nat in family ipv4 deleted',
'result': True})
# 'new_chain' function tests: 2
@ -265,14 +333,17 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
Test if it create new chain to the specified table.
'''
self.assertEqual(nftables.new_chain(),
'Error: Chain needs to be specified')
{'result': False,
'comment': 'Chain needs to be specified'})
ret = 'Error: table filter in family ipv4 does not exist'
ret = {'result': False,
'comment': 'Table filter in family ipv4 does not exist'}
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.new_chain(chain='input'), ret)
ret = 'Error: chain input in table filter in family ipv4 already exists'
ret = {'result': False,
'comment': 'Chain input in table filter in family ipv4 already exists'}
mock = MagicMock(return_value='table ip filter chain input {{')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.new_chain(chain='input'), ret)
@ -283,11 +354,16 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
'''
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
patch('salt.modules.nftables.check_chain', MagicMock(return_value=False)), \
patch('salt.modules.nftables.check_table', MagicMock(return_value=True)):
patch('salt.modules.nftables.check_chain',
MagicMock(return_value={'result': False,
'comment': ''})), \
patch('salt.modules.nftables.check_table',
MagicMock(return_value={'result': True,
'comment': ''})):
self.assertEqual(nftables.new_chain(chain='input',
table_type='filter'),
'Error: table_type hook and priority required')
{'result': False,
'comment': 'Table_type, hook, and priority required.'})
self.assertTrue(nftables.new_chain(chain='input',
table_type='filter',
@ -300,16 +376,37 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
Test if it delete the chain from the specified table.
'''
self.assertEqual(nftables.delete_chain(),
'Error: Chain needs to be specified')
{'result': False,
'comment': 'Chain needs to be specified'})
ret = 'Error: table filter in family ipv4 does not exist'
ret = {'result': False,
'comment': 'Table filter in family ipv4 does not exist'}
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.delete_chain(chain='input'), ret)
ret = 'Error: chain input in table filter in family ipv4 does not exist'
ret = {'result': False,
'comment': 'Chain input in table filter in family ipv4 could not be deleted'}
mock = MagicMock(return_value='table ip filter')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
patch('salt.modules.nftables.check_table',
MagicMock(return_value={'result': True,
'comment': ''})), \
patch('salt.modules.nftables.check_chain',
MagicMock(return_value={'result': True,
'comment': ''})):
self.assertEqual(nftables.delete_chain(chain='input'), ret)
ret = {'result': True,
'comment': 'Chain input in table filter in family ipv4 deleted'}
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
patch('salt.modules.nftables.check_table',
MagicMock(return_value={'result': True,
'comment': ''})), \
patch('salt.modules.nftables.check_chain',
MagicMock(return_value={'result': True,
'comment': ''})):
self.assertEqual(nftables.delete_chain(chain='input'), ret)
def test_delete_chain_variables(self):
@ -318,9 +415,15 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
'''
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
patch('salt.modules.nftables.check_chain', MagicMock(return_value=True)), \
patch('salt.modules.nftables.check_table', MagicMock(return_value=True)):
self.assertTrue(nftables.delete_chain(chain='input'))
patch('salt.modules.nftables.check_chain',
MagicMock(return_value={'result': True,
'comment': ''})), \
patch('salt.modules.nftables.check_table',
MagicMock(return_value={'result': True,
'comment': ''})):
_expected = {'comment': 'Chain input in table filter in family ipv4 deleted',
'result': True}
self.assertEqual(nftables.delete_chain(chain='input'), _expected)
# 'append' function tests: 2
@ -329,26 +432,34 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
Test if it append a rule to the specified table & chain.
'''
self.assertEqual(nftables.append(),
'Error: Chain needs to be specified')
{'result': False,
'comment': 'Chain needs to be specified'})
self.assertEqual(nftables.append(chain='input'),
'Error: Rule needs to be specified')
{'result': False,
'comment': 'Rule needs to be specified'})
_ru = 'input tcp dport 22 log accept'
ret = 'Error: table filter in family ipv4 does not exist'
ret = {'comment': 'Table filter in family ipv4 does not exist',
'result': False}
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.append(chain='input', rule=_ru), ret)
ret = 'Error: chain input in table filter in family ipv4 does not exist'
ret = {'comment': 'Chain input in table filter in family ipv4 does not exist',
'result': False}
mock = MagicMock(return_value='table ip filter')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.append(chain='input', rule=_ru), ret)
r_val = 'table ip filter chain input {{ input tcp dport 22 log accept #'
mock = MagicMock(return_value=r_val)
_expected = {'comment': 'Rule input tcp dport 22 log accept chain input in table filter in family ipv4 already exists',
'result': False}
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertTrue(nftables.append(chain='input', rule=_ru))
self.assertEqual(nftables.append(chain='input',
rule=_ru),
_expected)
def test_append_rule(self):
'''
@ -357,11 +468,19 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
_ru = 'input tcp dport 22 log accept'
mock = MagicMock(side_effect=['1', ''])
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
patch('salt.modules.nftables.check', MagicMock(return_value=False)), \
patch('salt.modules.nftables.check_chain', MagicMock(return_value=True)), \
patch('salt.modules.nftables.check_table', MagicMock(return_value=True)):
self.assertFalse(nftables.append(chain='input', rule=_ru))
self.assertTrue(nftables.append(chain='input', rule=_ru))
patch('salt.modules.nftables.check',
MagicMock(return_value={'result': False,
'comment': ''})), \
patch('salt.modules.nftables.check_chain',
MagicMock(return_value={'result': True,
'comment': ''})), \
patch('salt.modules.nftables.check_table',
MagicMock(return_value={'result': True,
'comment': ''})):
_expected = {'comment': 'Failed to add rule "{0}" chain input in table filter in family ipv4.'.format(_ru), 'result': False}
self.assertEqual(nftables.append(chain='input', rule=_ru), _expected)
_expected = {'comment': 'Added rule "{0}" chain input in table filter in family ipv4.'.format(_ru), 'result': True}
self.assertEqual(nftables.append(chain='input', rule=_ru), _expected)
# 'insert' function tests: 2
@ -371,18 +490,22 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
at the specified position.
'''
self.assertEqual(nftables.insert(),
'Error: Chain needs to be specified')
{'result': False,
'comment': 'Chain needs to be specified'})
self.assertEqual(nftables.insert(chain='input'),
'Error: Rule needs to be specified')
{'result': False,
'comment': 'Rule needs to be specified'})
_ru = 'input tcp dport 22 log accept'
ret = 'Error: table filter in family ipv4 does not exist'
ret = {'result': False,
'comment': 'Table filter in family ipv4 does not exist'}
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.insert(chain='input', rule=_ru), ret)
ret = 'Error: chain input in table filter in family ipv4 does not exist'
ret = {'result': False,
'comment': 'Chain input in table filter in family ipv4 does not exist'}
mock = MagicMock(return_value='table ip filter')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.insert(chain='input', rule=_ru), ret)
@ -390,6 +513,10 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
r_val = 'table ip filter chain input {{ input tcp dport 22 log accept #'
mock = MagicMock(return_value=r_val)
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
res = nftables.insert(chain='input', rule=_ru)
import logging
log = logging.getLogger(__name__)
log.debug('=== res %s ===', res)
self.assertTrue(nftables.insert(chain='input', rule=_ru))
def test_insert_rule(self):
@ -400,11 +527,23 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
_ru = 'input tcp dport 22 log accept'
mock = MagicMock(side_effect=['1', ''])
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
patch('salt.modules.nftables.check', MagicMock(return_value=False)), \
patch('salt.modules.nftables.check_chain', MagicMock(return_value=True)), \
patch('salt.modules.nftables.check_table', MagicMock(return_value=True)):
self.assertFalse(nftables.insert(chain='input', rule=_ru))
self.assertTrue(nftables.insert(chain='input', rule=_ru))
patch('salt.modules.nftables.check',
MagicMock(return_value={'result': False,
'comment': ''})), \
patch('salt.modules.nftables.check_chain',
MagicMock(return_value={'result': True,
'comment': ''})), \
patch('salt.modules.nftables.check_table',
MagicMock(return_value={'result': True,
'comment': ''})):
_expected = {'result': False,
'comment': 'Failed to add rule "{0}" chain input in table filter in family ipv4.'.format(_ru)}
self.assertEqual(nftables.insert(chain='input', rule=_ru),
_expected)
_expected = {'result': True,
'comment': 'Added rule "{0}" chain input in table filter in family ipv4.'.format(_ru)}
self.assertEqual(nftables.insert(chain='input', rule=_ru),
_expected)
# 'delete' function tests: 2
@ -415,17 +554,21 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
the rule's position in the chain.
'''
_ru = 'input tcp dport 22 log accept'
ret = {'result': False,
'comment': 'Only specify a position or a rule, not both'}
self.assertEqual(nftables.delete(table='filter', chain='input',
position='3', rule=_ru),
'Error: Only specify a position or a rule, not both')
ret)
ret = 'Error: table filter in family ipv4 does not exist'
ret = {'result': False,
'comment': 'Table filter in family ipv4 does not exist'}
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.delete(table='filter', chain='input',
rule=_ru), ret)
ret = 'Error: chain input in table filter in family ipv4 does not exist'
ret = {'result': False,
'comment': 'Chain input in table filter in family ipv4 does not exist'}
mock = MagicMock(return_value='table ip filter')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.delete(table='filter', chain='input',
@ -444,14 +587,27 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
'''
mock = MagicMock(side_effect=['1', ''])
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
patch('salt.modules.nftables.check', MagicMock(return_value=True)), \
patch('salt.modules.nftables.check_chain', MagicMock(return_value=True)), \
patch('salt.modules.nftables.check_table', MagicMock(return_value=True)):
self.assertFalse(nftables.delete(table='filter', chain='input',
position='3'))
self.assertTrue(nftables.delete(table='filter', chain='input',
position='3'))
patch('salt.modules.nftables.check',
MagicMock(return_value={'result': True,
'comment': ''})), \
patch('salt.modules.nftables.check_chain',
MagicMock(return_value={'result': True,
'comment': ''})), \
patch('salt.modules.nftables.check_table',
MagicMock(return_value={'result': True,
'comment': ''})):
_expected = {'result': False,
'comment': 'Failed to delete rule "None" in chain input table filter in family ipv4'}
self.assertEqual(nftables.delete(table='filter',
chain='input',
position='3'),
_expected)
_expected = {'result': True,
'comment': 'Deleted rule "None" in chain input in table filter in family ipv4.'}
self.assertEqual(nftables.delete(table='filter',
chain='input',
position='3'),
_expected)
# 'flush' function tests: 2
def test_flush(self):
@ -459,12 +615,14 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
Test if it flush the chain in the specified table, flush all chains
in the specified table if chain is not specified.
'''
ret = 'Error: table filter in family ipv4 does not exist'
ret = {'result': False,
'comment': 'Table filter in family ipv4 does not exist'}
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.flush(table='filter', chain='input'), ret)
ret = 'Error: chain input in table filter in family ip does not exist'
ret = {'result': False,
'comment': 'Chain input in table filter in family ipv4 does not exist'}
mock = MagicMock(return_value='table ip filter')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.flush(table='filter', chain='input'), ret)
@ -476,7 +634,19 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
'''
mock = MagicMock(side_effect=['1', ''])
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
patch('salt.modules.nftables.check_chain', MagicMock(return_value=True)), \
patch('salt.modules.nftables.check_table', MagicMock(return_value=True)):
self.assertFalse(nftables.flush(table='filter', chain='input'))
self.assertTrue(nftables.flush(table='filter', chain='input'))
patch('salt.modules.nftables.check_chain',
MagicMock(return_value={'result': True,
'comment': ''})), \
patch('salt.modules.nftables.check_table',
MagicMock(return_value={'result': True,
'comment': ''})):
_expected = {'result': False,
'comment': 'Failed to flush rules from chain input in table filter in family ipv4.'}
self.assertEqual(nftables.flush(table='filter',
chain='input'),
_expected)
_expected = {'result': True,
'comment': 'Flushed rules from chain input in table filter in family ipv4.'}
self.assertEqual(nftables.flush(table='filter',
chain='input'),
_expected)

View file

@ -36,14 +36,17 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
'changes': {},
'result': True,
'comment': ''}
mock = MagicMock(side_effect=[True, False, False])
with patch.dict(nftables.__salt__, {"nftables.check_chain": mock}):
mock = MagicMock(side_effect=[{'result': True, 'comment': ''},
{'result': False, 'comment': ''},
{'result': False, 'comment': ''}])
with patch.dict(nftables.__salt__, {'nftables.check_chain': mock}):
ret.update({'comment': 'nftables salt chain is already'
' exist in filter table for ipv4'})
self.assertDictEqual(nftables.chain_present('salt'), ret)
mock = MagicMock(side_effect=[True, ''])
with patch.dict(nftables.__salt__, {"nftables.new_chain": mock}):
mock = MagicMock(side_effect=[{'result': True, 'comment': ''},
{'result': False, 'comment': ''}])
with patch.dict(nftables.__salt__, {'nftables.new_chain': mock}):
ret.update({'changes': {'locale': 'salt'},
'comment': 'nftables salt chain in filter'
' table create success for ipv4'})
@ -64,13 +67,13 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
'result': True,
'comment': ''}
mock = MagicMock(side_effect=[False, True])
with patch.dict(nftables.__salt__, {"nftables.check_chain": mock}):
with patch.dict(nftables.__salt__, {'nftables.check_chain': mock}):
ret.update({'comment': 'nftables salt chain is already absent'
' in filter table for ipv4'})
self.assertDictEqual(nftables.chain_absent('salt'), ret)
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {"nftables.flush": mock}):
with patch.dict(nftables.__salt__, {'nftables.flush': mock}):
ret.update({'result': False,
'comment': 'Failed to flush salt chain'
' in filter table: for ipv4'})
@ -86,26 +89,34 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
'comment': ''}
mock = MagicMock(return_value=[])
with patch.object(nftables, '_STATE_INTERNAL_KEYWORDS', mock):
mock = MagicMock(return_value='a')
with patch.dict(nftables.__salt__, {"nftables.build_rule": mock}):
mock = MagicMock(side_effect=[True, False, False, False])
with patch.dict(nftables.__salt__, {"nftables.check": mock}):
mock = MagicMock(return_value={'result': True,
'comment': '',
'rule': 'a'})
with patch.dict(nftables.__salt__, {'nftables.build_rule': mock}):
mock = MagicMock(side_effect=[{'result': True, 'comment': ''},
{'result': False, 'comment': ''},
{'result': False, 'comment': ''},
{'result': False, 'comment': ''}])
with patch.dict(nftables.__salt__, {'nftables.check': mock}):
ret.update({'comment': 'nftables rule for salt'
' already set (a) for ipv4'})
self.assertDictEqual(nftables.append('salt', table='',
chain=''), ret)
with patch.dict(nftables.__opts__, {"test": True}):
with patch.dict(nftables.__opts__, {'test': True}):
ret.update({'result': None,
'comment': 'nftables rule for salt needs'
' to be set (a) for ipv4'})
self.assertDictEqual(nftables.append('salt', table='',
chain=''), ret)
with patch.dict(nftables.__opts__, {"test": False}):
mock = MagicMock(side_effect=[True, False])
with patch.dict(nftables.__opts__, {'test': False}):
mock = MagicMock(side_effect=[{'result': True,
'comment': ''},
{'result': False,
'comment': ''}])
with patch.dict(nftables.__salt__,
{"nftables.append": mock}):
{'nftables.append': mock}):
ret.update({'changes': {'locale': 'salt'},
'comment': 'Set nftables rule for salt'
' to: a for ipv4',
@ -118,7 +129,7 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
ret.update({'changes': {},
'comment': 'Failed to set nftables'
' rule for salt.\nAttempted rule was'
' a for ipv4', 'result': False})
' a for ipv4.\n', 'result': False})
self.assertDictEqual(nftables.append('salt',
table='',
chain=''),
@ -134,26 +145,34 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
'comment': ''}
mock = MagicMock(return_value=[])
with patch.object(nftables, '_STATE_INTERNAL_KEYWORDS', mock):
mock = MagicMock(return_value='a')
with patch.dict(nftables.__salt__, {"nftables.build_rule": mock}):
mock = MagicMock(side_effect=[True, False, False, False])
with patch.dict(nftables.__salt__, {"nftables.check": mock}):
mock = MagicMock(return_value={'result': True,
'comment': '',
'rule': 'a'})
with patch.dict(nftables.__salt__, {'nftables.build_rule': mock}):
mock = MagicMock(side_effect=[{'result': True, 'comment': ''},
{'result': False, 'comment': ''},
{'result': False, 'comment': ''},
{'result': False, 'comment': ''}])
with patch.dict(nftables.__salt__, {'nftables.check': mock}):
ret.update({'comment': 'nftables rule for salt already'
' set for ipv4 (a)'})
self.assertDictEqual(nftables.insert('salt', table='',
chain=''), ret)
with patch.dict(nftables.__opts__, {"test": True}):
with patch.dict(nftables.__opts__, {'test': True}):
ret.update({'result': None,
'comment': 'nftables rule for salt'
' needs to be set for ipv4 (a)'})
self.assertDictEqual(nftables.insert('salt', table='',
chain=''), ret)
with patch.dict(nftables.__opts__, {"test": False}):
mock = MagicMock(side_effect=[True, False])
with patch.dict(nftables.__opts__, {'test': False}):
mock = MagicMock(side_effect=[{'result': True,
'comment': ''},
{'result': False,
'comment': ''}])
with patch.dict(nftables.__salt__,
{"nftables.insert": mock}):
{'nftables.insert': mock}):
ret.update({'changes': {'locale': 'salt'},
'comment': 'Set nftables rule for'
' salt to: a for ipv4',
@ -185,9 +204,14 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
mock = MagicMock(return_value=[])
with patch.object(nftables, '_STATE_INTERNAL_KEYWORDS', mock):
mock = MagicMock(return_value='a')
mock = MagicMock(return_value={'result': True,
'comment': '',
'rule': 'a'})
with patch.dict(nftables.__salt__, {'nftables.build_rule': mock}):
mock = MagicMock(side_effect=[False, True, True, True])
mock = MagicMock(side_effect=[{'result': False, 'comment': ''},
{'result': True, 'comment': ''},
{'result': True, 'comment': ''},
{'result': True, 'comment': ''}])
with patch.dict(nftables.__salt__, {'nftables.check': mock}):
ret.update({'comment': 'nftables rule for salt'
' already absent for ipv4 (a)',
@ -205,7 +229,10 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
chain=''), ret)
with patch.dict(nftables.__opts__, {'test': False}):
mock = MagicMock(side_effect=[True, False])
mock = MagicMock(side_effect=[{'result': True,
'comment': ''},
{'result': False,
'comment': ''}])
with patch.dict(nftables.__salt__,
{'nftables.delete': mock}):
ret.update({'result': True,
@ -239,7 +266,10 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
'comment': ''}
mock = MagicMock(return_value=[])
with patch.object(nftables, '_STATE_INTERNAL_KEYWORDS', mock):
mock = MagicMock(side_effect=[False, True, True, True])
mock = MagicMock(side_effect=[{'result': False, 'comment': ''},
{'result': True, 'comment': ''},
{'result': True, 'comment': ''},
{'result': True, 'comment': ''}])
with patch.dict(nftables.__salt__, {'nftables.check_table': mock}):
ret.update({'comment': 'Failed to flush table in family'
' ipv4, table does not exist.',
@ -248,7 +278,9 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
table='', chain=''),
ret)
mock = MagicMock(side_effect=[False, True, True])
mock = MagicMock(side_effect=[{'result': False, 'comment': ''},
{'result': True, 'comment': ''},
{'result': True, 'comment': ''}])
with patch.dict(nftables.__salt__,
{'nftables.check_chain': mock}):
ret.update({'comment': 'Failed to flush chain in table'
@ -256,7 +288,10 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
self.assertDictEqual(nftables.flush('salt', table='',
chain=''), ret)
mock = MagicMock(side_effect=[True, False])
mock = MagicMock(side_effect=[{'result': True,
'comment': ''},
{'result': False,
'comment': ''}])
with patch.dict(nftables.__salt__,
{'nftables.flush': mock}):
ret.update({'changes': {'locale': 'salt'},