mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Merge pull request #51738 from garethgreenaway/nftables_cleanup
[2018.3] Nftables cleanup
This commit is contained in:
commit
c7136cbd3f
4 changed files with 705 additions and 314 deletions
|
@ -117,6 +117,10 @@ def build_rule(table=None, chain=None, command=None, position='', full=None, fam
|
|||
family=ipv6
|
||||
|
||||
'''
|
||||
ret = {'comment': '',
|
||||
'rule': '',
|
||||
'result': False}
|
||||
|
||||
if 'target' in kwargs:
|
||||
kwargs['jump'] = kwargs['target']
|
||||
del kwargs['target']
|
||||
|
@ -152,14 +156,14 @@ def build_rule(table=None, chain=None, command=None, position='', full=None, fam
|
|||
kwargs['dport'] = six.text_type(kwargs['dport'])
|
||||
if ':' in kwargs['dport']:
|
||||
kwargs['dport'] = kwargs['dport'].replace(':', '-')
|
||||
rule += 'dport {{ {0}}} '.format(kwargs['dport'])
|
||||
rule += 'dport {{ {0} }} '.format(kwargs['dport'])
|
||||
del kwargs['dport']
|
||||
|
||||
if 'sport' in kwargs:
|
||||
kwargs['sport'] = six.text_type(kwargs['sport'])
|
||||
if ':' in kwargs['sport']:
|
||||
kwargs['sport'] = kwargs['sport'].replace(':', '-')
|
||||
rule += 'sport {{ {0}}} '.format(kwargs['sport'])
|
||||
rule += 'sport {{ {0} }} '.format(kwargs['sport'])
|
||||
del kwargs['sport']
|
||||
|
||||
if 'dports' in kwargs:
|
||||
|
@ -171,7 +175,7 @@ def build_rule(table=None, chain=None, command=None, position='', full=None, fam
|
|||
_dports.sort(reverse=True)
|
||||
kwargs['dports'] = ', '.join(six.text_type(x) for x in _dports)
|
||||
|
||||
rule += 'dport {{ {0}}} '.format(kwargs['dports'])
|
||||
rule += 'dport {{ {0} }} '.format(kwargs['dports'])
|
||||
del kwargs['dports']
|
||||
|
||||
if 'sports' in kwargs:
|
||||
|
@ -207,8 +211,7 @@ def build_rule(table=None, chain=None, command=None, position='', full=None, fam
|
|||
del kwargs['to-ports']
|
||||
|
||||
if 'to-destination' in kwargs:
|
||||
after_jump.append('--to-destination {0} '.
|
||||
format(kwargs['to-destination']))
|
||||
after_jump.append('--to-destination {0} '.format(kwargs['to-destination']))
|
||||
del kwargs['to-destination']
|
||||
|
||||
if 'reject-with' in kwargs:
|
||||
|
@ -225,31 +228,53 @@ def build_rule(table=None, chain=None, command=None, position='', full=None, fam
|
|||
rule = rule.replace('dport', '{0} dport'.format(proto))
|
||||
rule = rule.replace('sport', '{0} sport'.format(proto))
|
||||
|
||||
ret['rule'] = rule
|
||||
|
||||
if full in ['True', 'true']:
|
||||
|
||||
if not table:
|
||||
return 'Error: Table needs to be specified'
|
||||
ret['comment'] = 'Table needs to be specified'
|
||||
return ret
|
||||
|
||||
if not chain:
|
||||
return 'Error: Chain needs to be specified'
|
||||
ret['comment'] = 'Chain needs to be specified'
|
||||
return ret
|
||||
|
||||
if not command:
|
||||
return 'Error: Command needs to be specified'
|
||||
ret['comment'] = 'Command needs to be specified'
|
||||
return ret
|
||||
|
||||
if command in ['Insert', 'insert', 'INSERT']:
|
||||
if position:
|
||||
return '{0} insert rule {1} {2} {3} position {4} {5}'.\
|
||||
format(_nftables_cmd(), nft_family, table,
|
||||
chain, position, rule)
|
||||
ret['rule'] = '{0} insert rule {1} {2} {3} ' \
|
||||
'position {4} {5}'.format(_nftables_cmd(),
|
||||
nft_family,
|
||||
table,
|
||||
chain,
|
||||
position,
|
||||
rule)
|
||||
else:
|
||||
return '{0} insert rule {1} {2} {3} {4}'.\
|
||||
format(_nftables_cmd(), nft_family, table,
|
||||
chain, rule)
|
||||
ret['rule'] = '{0} insert rule ' \
|
||||
'{1} {2} {3} {4}'.format(_nftables_cmd(),
|
||||
nft_family,
|
||||
table,
|
||||
chain,
|
||||
rule)
|
||||
else:
|
||||
ret['rule'] = '{0} {1} rule {2} {3} {4} {5}'.format(_nftables_cmd(),
|
||||
command,
|
||||
nft_family,
|
||||
table,
|
||||
chain,
|
||||
rule)
|
||||
|
||||
return '{0} {1} rule {2} {3} {4} {5}'.format(_nftables_cmd(),
|
||||
command, nft_family, table, chain, rule)
|
||||
|
||||
return rule
|
||||
if ret['rule']:
|
||||
ret['comment'] = 'Successfully built rule'
|
||||
ret['result'] = True
|
||||
return ret
|
||||
|
||||
|
||||
def get_saved_rules(conf_file=None, family='ipv4'):
|
||||
def get_saved_rules(conf_file=None):
|
||||
'''
|
||||
Return a data structure of the rules in the conf file
|
||||
|
||||
|
@ -291,8 +316,9 @@ def get_rules(family='ipv4'):
|
|||
'''
|
||||
nft_family = _NFTABLES_FAMILIES[family]
|
||||
rules = []
|
||||
cmd = '{0} --numeric --numeric --numeric list tables {1}'.\
|
||||
format(_nftables_cmd(), nft_family)
|
||||
cmd = '{0} --numeric --numeric --numeric ' \
|
||||
'list tables {1}'. format(_nftables_cmd(),
|
||||
nft_family)
|
||||
out = __salt__['cmd.run'](cmd, python_shell=False)
|
||||
if not out:
|
||||
return rules
|
||||
|
@ -300,8 +326,9 @@ def get_rules(family='ipv4'):
|
|||
tables = re.split('\n+', out)
|
||||
for table in tables:
|
||||
table_name = table.split(' ')[1]
|
||||
cmd = '{0} --numeric --numeric --numeric list table {1} {2}'.format(_nftables_cmd(),
|
||||
nft_family, table_name)
|
||||
cmd = '{0} --numeric --numeric --numeric ' \
|
||||
'list table {1} {2}'.format(_nftables_cmd(),
|
||||
nft_family, table_name)
|
||||
out = __salt__['cmd.run'](cmd, python_shell=False)
|
||||
rules.append(out)
|
||||
return rules
|
||||
|
@ -354,29 +381,35 @@ def get_rule_handle(table='filter', chain=None, rule=None, family='ipv4'):
|
|||
.. code-block:: bash
|
||||
|
||||
salt '*' nftables.get_rule_handle filter input \\
|
||||
rule='input tcp dport 22 log accept'
|
||||
rule='tcp dport 22 log accept'
|
||||
|
||||
IPv6:
|
||||
salt '*' nftables.get_rule_handle filter input \\
|
||||
rule='input tcp dport 22 log accept' \\
|
||||
rule='tcp dport 22 log accept' \\
|
||||
family=ipv6
|
||||
'''
|
||||
ret = {'comment': '',
|
||||
'result': False}
|
||||
|
||||
if not chain:
|
||||
return 'Error: Chain needs to be specified'
|
||||
ret['comment'] = 'Chain needs to be specified'
|
||||
return ret
|
||||
|
||||
if not rule:
|
||||
return 'Error: Rule needs to be specified'
|
||||
ret['comment'] = 'Rule needs to be specified'
|
||||
return ret
|
||||
|
||||
if not check_table(table, family=family):
|
||||
return 'Error: table {0} in family {1} does not exist'.\
|
||||
format(table, family)
|
||||
res = check_table(table, family=family)
|
||||
if not res['result']:
|
||||
return res
|
||||
|
||||
if not check_chain(table, chain, family=family):
|
||||
return 'Error: chain {0} in table {1} in family {2} does not exist'.\
|
||||
format(chain, table, family)
|
||||
res = check_chain(table, chain, family=family)
|
||||
if not res['result']:
|
||||
return res
|
||||
|
||||
if not check(table, chain, rule, family=family):
|
||||
return 'Error: rule {0} chain {1} in table {2} in family {3} does not exist'.\
|
||||
format(rule, chain, table, family)
|
||||
res = check(table, chain, rule, family=family)
|
||||
if not res['result']:
|
||||
return res
|
||||
|
||||
nft_family = _NFTABLES_FAMILIES[family]
|
||||
cmd = '{0} --numeric --numeric --numeric --handle list chain {1} {2} {3}'.\
|
||||
|
@ -388,8 +421,9 @@ def get_rule_handle(table='filter', chain=None, rule=None, family='ipv4'):
|
|||
for r in rules:
|
||||
match = pat.search(r)
|
||||
if match:
|
||||
return match.group('handle')
|
||||
return 'Error: could not find rule {0}'.format(rule)
|
||||
return {'result': True, 'handle': match.group('handle')}
|
||||
return {'result': False,
|
||||
'comment': 'Could not find rule {0}'.format(rule)}
|
||||
|
||||
|
||||
def check(table='filter', chain=None, rule=None, family='ipv4'):
|
||||
|
@ -406,40 +440,46 @@ def check(table='filter', chain=None, rule=None, family='ipv4'):
|
|||
.. code-block:: bash
|
||||
|
||||
salt '*' nftables.check filter input \\
|
||||
rule='input tcp dport 22 log accept'
|
||||
rule='tcp dport 22 log accept'
|
||||
|
||||
IPv6:
|
||||
salt '*' nftables.check filter input \\
|
||||
rule='input tcp dport 22 log accept' \\
|
||||
rule='tcp dport 22 log accept' \\
|
||||
family=ipv6
|
||||
'''
|
||||
ret = {'comment': '',
|
||||
'result': False}
|
||||
|
||||
if not chain:
|
||||
return 'Error: Chain needs to be specified'
|
||||
ret['comment'] = 'Chain needs to be specified'
|
||||
return ret
|
||||
|
||||
if not rule:
|
||||
return 'Error: Rule needs to be specified'
|
||||
ret['comment'] = 'Rule needs to be specified'
|
||||
return ret
|
||||
|
||||
if not check_table(table, family=family):
|
||||
return 'Error: table {0} in family {1} does not exist'.\
|
||||
format(table, family)
|
||||
res = check_table(table, family=family)
|
||||
if not res['result']:
|
||||
return res
|
||||
|
||||
if not check_chain(table, chain, family=family):
|
||||
return 'Error: chain {0} in table {1} in family {2} does not exist'.\
|
||||
format(chain, table, family)
|
||||
res = check_chain(table, chain, family=family)
|
||||
if not res['result']:
|
||||
return res
|
||||
|
||||
nft_family = _NFTABLES_FAMILIES[family]
|
||||
cmd = '{0} --handle --numeric --numeric --numeric list chain {1} {2} {3}'.\
|
||||
format(_nftables_cmd(), nft_family, table, chain)
|
||||
format(_nftables_cmd(), nft_family, table, chain)
|
||||
search_rule = '{0} #'.format(rule)
|
||||
out = __salt__['cmd.run'](cmd, python_shell=False).find(search_rule)
|
||||
|
||||
if out != -1:
|
||||
out = ''
|
||||
if out == -1:
|
||||
ret['comment'] = 'Rule {0} in chain {1} in table {2} in family {3} does not exist'.\
|
||||
format(rule, chain, table, family)
|
||||
else:
|
||||
return False
|
||||
|
||||
if not out:
|
||||
return True
|
||||
return out
|
||||
ret['comment'] = 'Rule {0} in chain {1} in table {2} in family {3} exists'.\
|
||||
format(rule, chain, table, family)
|
||||
ret['result'] = True
|
||||
return ret
|
||||
|
||||
|
||||
def check_chain(table='filter', chain=None, family='ipv4'):
|
||||
|
@ -458,21 +498,25 @@ def check_chain(table='filter', chain=None, family='ipv4'):
|
|||
salt '*' nftables.check_chain filter input family=ipv6
|
||||
'''
|
||||
|
||||
ret = {'comment': '',
|
||||
'result': False}
|
||||
|
||||
if not chain:
|
||||
return 'Error: Chain needs to be specified'
|
||||
ret['comment'] = 'Chain needs to be specified'
|
||||
return ret
|
||||
|
||||
nft_family = _NFTABLES_FAMILIES[family]
|
||||
cmd = '{0} list table {1} {2}' . format(_nftables_cmd(), nft_family, table)
|
||||
out = __salt__['cmd.run'](cmd, python_shell=False).find('chain {0} {{'.format(chain))
|
||||
|
||||
if out != -1:
|
||||
out = ''
|
||||
if out == -1:
|
||||
ret['comment'] = 'Chain {0} in table {1} in family {2} does not exist'.\
|
||||
format(chain, table, family)
|
||||
else:
|
||||
return False
|
||||
|
||||
if not out:
|
||||
return True
|
||||
return out
|
||||
ret['comment'] = 'Chain {0} in table {1} in family {2} exists'.\
|
||||
format(chain, table, family)
|
||||
ret['result'] = True
|
||||
return ret
|
||||
|
||||
|
||||
def check_table(table=None, family='ipv4'):
|
||||
|
@ -483,21 +527,25 @@ def check_table(table=None, family='ipv4'):
|
|||
|
||||
salt '*' nftables.check_table nat
|
||||
'''
|
||||
ret = {'comment': '',
|
||||
'result': False}
|
||||
|
||||
if not table:
|
||||
return 'Error: table needs to be specified'
|
||||
ret['comment'] = 'Table needs to be specified'
|
||||
return ret
|
||||
|
||||
nft_family = _NFTABLES_FAMILIES[family]
|
||||
cmd = '{0} list tables {1}' . format(_nftables_cmd(), nft_family)
|
||||
out = __salt__['cmd.run'](cmd, python_shell=False).find('table {0} {1}'.format(nft_family, table))
|
||||
|
||||
if out != -1:
|
||||
out = ''
|
||||
if out == -1:
|
||||
ret['comment'] = 'Table {0} in family {1} does not exist'.\
|
||||
format(table, family)
|
||||
else:
|
||||
return False
|
||||
|
||||
if not out:
|
||||
return True
|
||||
return out
|
||||
ret['comment'] = 'Table {0} in family {1} exists'.\
|
||||
format(table, family)
|
||||
ret['result'] = True
|
||||
return ret
|
||||
|
||||
|
||||
def new_table(table, family='ipv4'):
|
||||
|
@ -515,21 +563,29 @@ def new_table(table, family='ipv4'):
|
|||
IPv6:
|
||||
salt '*' nftables.new_table filter family=ipv6
|
||||
'''
|
||||
ret = {'comment': '',
|
||||
'result': False}
|
||||
|
||||
if not table:
|
||||
return 'Error: table needs to be specified'
|
||||
ret['comment'] = 'Table needs to be specified'
|
||||
return ret
|
||||
|
||||
if check_table(table, family=family):
|
||||
return 'Error: table {0} in family {1} already exists'.\
|
||||
format(table, family)
|
||||
res = check_table(table, family=family)
|
||||
if res['result']:
|
||||
return res
|
||||
|
||||
nft_family = _NFTABLES_FAMILIES[family]
|
||||
cmd = '{0} add table {1} {2}'.format(_nftables_cmd(), nft_family, table)
|
||||
out = __salt__['cmd.run'](cmd, python_shell=False)
|
||||
|
||||
if not out:
|
||||
out = True
|
||||
return out
|
||||
ret['comment'] = 'Table {0} in family {1} created'.\
|
||||
format(table, family)
|
||||
ret['result'] = True
|
||||
else:
|
||||
ret['comment'] = 'Table {0} in family {1} could not be created'.\
|
||||
format(table, family)
|
||||
return ret
|
||||
|
||||
|
||||
def delete_table(table, family='ipv4'):
|
||||
|
@ -547,20 +603,29 @@ def delete_table(table, family='ipv4'):
|
|||
IPv6:
|
||||
salt '*' nftables.delete_table filter family=ipv6
|
||||
'''
|
||||
ret = {'comment': '',
|
||||
'result': False}
|
||||
|
||||
if not table:
|
||||
return 'Error: table needs to be specified'
|
||||
ret['comment'] = 'Table needs to be specified'
|
||||
return ret
|
||||
|
||||
if not check_table(table, family=family):
|
||||
return 'Error: table {0} in family {1} does not exist' . format(table, family)
|
||||
res = check_table(table, family=family)
|
||||
if not res['result']:
|
||||
return res
|
||||
|
||||
nft_family = _NFTABLES_FAMILIES[family]
|
||||
cmd = '{0} delete table {1} {2}'.format(_nftables_cmd(), nft_family, table)
|
||||
out = __salt__['cmd.run'](cmd, python_shell=False)
|
||||
|
||||
if not out:
|
||||
out = True
|
||||
return out
|
||||
ret['comment'] = 'Table {0} in family {1} deleted'.\
|
||||
format(table, family)
|
||||
ret['result'] = True
|
||||
else:
|
||||
ret['comment'] = 'Table {0} in family {1} could not be deleted'.\
|
||||
format(table, family)
|
||||
return ret
|
||||
|
||||
|
||||
def new_chain(table='filter', chain=None, table_type=None, hook=None, priority=None, family='ipv4'):
|
||||
|
@ -588,34 +653,45 @@ def new_chain(table='filter', chain=None, table_type=None, hook=None, priority=N
|
|||
|
||||
salt '*' nftables.new_chain filter foo family=ipv6
|
||||
'''
|
||||
ret = {'comment': '',
|
||||
'result': False}
|
||||
|
||||
if not chain:
|
||||
return 'Error: Chain needs to be specified'
|
||||
ret['comment'] = 'Chain needs to be specified'
|
||||
return ret
|
||||
|
||||
if not check_table(table, family=family):
|
||||
return 'Error: table {0} in family {1} does not exist'.\
|
||||
format(table, family)
|
||||
res = check_table(table, family=family)
|
||||
if not res['result']:
|
||||
return res
|
||||
|
||||
if check_chain(table, chain, family=family):
|
||||
return 'Error: chain {0} in table {1} in family {2} already exists'.\
|
||||
res = check_chain(table, chain, family=family)
|
||||
if res['result']:
|
||||
ret['comment'] = 'Chain {0} in table {1} in family {2} already exists'.\
|
||||
format(chain, table, family)
|
||||
return ret
|
||||
|
||||
nft_family = _NFTABLES_FAMILIES[family]
|
||||
cmd = '{0} add chain {1} {2} {3}'.\
|
||||
format(_nftables_cmd(), nft_family, table, chain)
|
||||
format(_nftables_cmd(), nft_family, table, chain)
|
||||
if table_type or hook or priority:
|
||||
if table_type and hook and six.text_type(priority):
|
||||
cmd = r'{0} \{{ type {1} hook {2} priority {3}\; \}}'.\
|
||||
format(cmd, table_type, hook, priority)
|
||||
else:
|
||||
# Specify one, rqeuire all
|
||||
return 'Error: table_type hook and priority required'
|
||||
# Specify one, require all
|
||||
ret['comment'] = 'Table_type, hook, and priority required.'
|
||||
return ret
|
||||
|
||||
out = __salt__['cmd.run'](cmd, python_shell=False)
|
||||
|
||||
if not out:
|
||||
out = True
|
||||
return out
|
||||
ret['comment'] = 'Chain {0} in table {1} in family {2} created'.\
|
||||
format(chain, table, family)
|
||||
ret['result'] = True
|
||||
else:
|
||||
ret['comment'] = 'Chain {0} in table {1} in family {2} could not be created'.\
|
||||
format(chain, table, family)
|
||||
return ret
|
||||
|
||||
|
||||
def delete_chain(table='filter', chain=None, family='ipv4'):
|
||||
|
@ -637,26 +713,34 @@ def delete_chain(table='filter', chain=None, family='ipv4'):
|
|||
|
||||
salt '*' nftables.delete_chain filter foo family=ipv6
|
||||
'''
|
||||
ret = {'comment': '',
|
||||
'result': False}
|
||||
|
||||
if not chain:
|
||||
return 'Error: Chain needs to be specified'
|
||||
ret['comment'] = 'Chain needs to be specified'
|
||||
return ret
|
||||
|
||||
if not check_table(table, family=family):
|
||||
return 'Error: table {0} in family {1} does not exist'.\
|
||||
format(table, family)
|
||||
res = check_table(table, family=family)
|
||||
if not res['result']:
|
||||
return res
|
||||
|
||||
if not check_chain(table, chain, family=family):
|
||||
return 'Error: chain {0} in table {1} in family {2} does not exist'.\
|
||||
format(chain, table, family)
|
||||
res = check_chain(table, chain, family=family)
|
||||
if not res['result']:
|
||||
return res
|
||||
|
||||
nft_family = _NFTABLES_FAMILIES[family]
|
||||
cmd = '{0} delete chain {1} {2} {3}'.\
|
||||
format(_nftables_cmd(), nft_family, table, chain)
|
||||
format(_nftables_cmd(), nft_family, table, chain)
|
||||
out = __salt__['cmd.run'](cmd, python_shell=False)
|
||||
|
||||
if not out:
|
||||
out = True
|
||||
return out
|
||||
ret['comment'] = 'Chain {0} in table {1} in family {2} deleted'.\
|
||||
format(chain, table, family)
|
||||
ret['result'] = True
|
||||
else:
|
||||
ret['comment'] = 'Chain {0} in table {1} in family {2} could not be deleted'.\
|
||||
format(chain, table, family)
|
||||
return ret
|
||||
|
||||
|
||||
def append(table='filter', chain=None, rule=None, family='ipv4'):
|
||||
|
@ -673,38 +757,51 @@ def append(table='filter', chain=None, rule=None, family='ipv4'):
|
|||
.. code-block:: bash
|
||||
|
||||
salt '*' nftables.append filter input \\
|
||||
rule='input tcp dport 22 log accept'
|
||||
rule='tcp dport 22 log accept'
|
||||
|
||||
IPv6:
|
||||
salt '*' nftables.append filter input \\
|
||||
rule='input tcp dport 22 log accept' \\
|
||||
rule='tcp dport 22 log accept' \\
|
||||
family=ipv6
|
||||
'''
|
||||
ret = {'comment': 'Failed to append rule {0} to chain {1} in table {2}.'.format(rule, chain, table),
|
||||
'result': False}
|
||||
|
||||
if not chain:
|
||||
return 'Error: Chain needs to be specified'
|
||||
ret['comment'] = 'Chain needs to be specified'
|
||||
return ret
|
||||
|
||||
if not rule:
|
||||
return 'Error: Rule needs to be specified'
|
||||
ret['comment'] = 'Rule needs to be specified'
|
||||
return ret
|
||||
|
||||
if not check_table(table, family=family):
|
||||
return 'Error: table {0} in family {1} does not exist'.\
|
||||
format(table, family)
|
||||
res = check_table(table, family=family)
|
||||
if not res['result']:
|
||||
return res
|
||||
|
||||
if not check_chain(table, chain, family=family):
|
||||
return 'Error: chain {0} in table {1} in family {2} does not exist'.\
|
||||
format(chain, table, family)
|
||||
res = check_chain(table, chain, family=family)
|
||||
if not res['result']:
|
||||
return res
|
||||
|
||||
if check(table, chain, rule, family=family):
|
||||
return 'Error: rule {0} chain {1} in table {2} in family {3} already exists'.\
|
||||
format(rule, chain, table, family)
|
||||
res = check(table, chain, rule, family=family)
|
||||
if res['result']:
|
||||
ret['comment'] = 'Rule {0} chain {1} in table {2} in family {3} already exists'.\
|
||||
format(rule, chain, table, family)
|
||||
return ret
|
||||
|
||||
nft_family = _NFTABLES_FAMILIES[family]
|
||||
cmd = '{0} add rule {1} {2} {3} {4}'.\
|
||||
format(_nftables_cmd(), nft_family, table, chain, rule)
|
||||
format(_nftables_cmd(), nft_family, table, chain, rule)
|
||||
out = __salt__['cmd.run'](cmd, python_shell=False)
|
||||
|
||||
if len(out) == 0:
|
||||
return True
|
||||
ret['result'] = True
|
||||
ret['comment'] = 'Added rule "{0}" chain {1} in table {2} in family {3}.'.\
|
||||
format(rule, chain, table, family)
|
||||
else:
|
||||
return False
|
||||
ret['comment'] = 'Failed to add rule "{0}" chain {1} in table {2} in family {3}.'.\
|
||||
format(rule, chain, table, family)
|
||||
return ret
|
||||
|
||||
|
||||
def insert(table='filter', chain=None, position=None, rule=None, family='ipv4'):
|
||||
|
@ -723,36 +820,44 @@ def insert(table='filter', chain=None, position=None, rule=None, family='ipv4'):
|
|||
.. code-block:: bash
|
||||
|
||||
salt '*' nftables.insert filter input \\
|
||||
rule='input tcp dport 22 log accept'
|
||||
rule='tcp dport 22 log accept'
|
||||
|
||||
salt '*' nftables.insert filter input position=3 \\
|
||||
rule='input tcp dport 22 log accept'
|
||||
rule='tcp dport 22 log accept'
|
||||
|
||||
IPv6:
|
||||
salt '*' nftables.insert filter input \\
|
||||
rule='input tcp dport 22 log accept' \\
|
||||
rule='tcp dport 22 log accept' \\
|
||||
family=ipv6
|
||||
|
||||
salt '*' nftables.insert filter input position=3 \\
|
||||
rule='input tcp dport 22 log accept' \\
|
||||
rule='tcp dport 22 log accept' \\
|
||||
family=ipv6
|
||||
'''
|
||||
ret = {'comment': 'Failed to insert rule {0} to table {1}.'.format(rule, table),
|
||||
'result': False}
|
||||
|
||||
if not chain:
|
||||
return 'Error: Chain needs to be specified'
|
||||
ret['comment'] = 'Chain needs to be specified'
|
||||
return ret
|
||||
|
||||
if not rule:
|
||||
return 'Error: Rule needs to be specified'
|
||||
ret['comment'] = 'Rule needs to be specified'
|
||||
return ret
|
||||
|
||||
if not check_table(table, family=family):
|
||||
return 'Error: table {0} in family {1} does not exist'.\
|
||||
format(table, family)
|
||||
res = check_table(table, family=family)
|
||||
if not res['result']:
|
||||
return res
|
||||
|
||||
if not check_chain(table, chain, family=family):
|
||||
return 'Error: chain {0} in table {1} in family {2} does not exist'.\
|
||||
format(chain, table, family)
|
||||
res = check_chain(table, chain, family=family)
|
||||
if not res['result']:
|
||||
return res
|
||||
|
||||
if check(table, chain, rule, family=family):
|
||||
return 'Error: rule {0} chain {1} in table {2} in family {3} already exists'.\
|
||||
format(rule, chain, table, family)
|
||||
res = check(table, chain, rule, family=family)
|
||||
if res['result']:
|
||||
ret['comment'] = 'Rule {0} chain {1} in table {2} in family {3} already exists'.\
|
||||
format(rule, chain, table, family)
|
||||
return ret
|
||||
|
||||
nft_family = _NFTABLES_FAMILIES[family]
|
||||
if position:
|
||||
|
@ -764,9 +869,13 @@ def insert(table='filter', chain=None, position=None, rule=None, family='ipv4'):
|
|||
out = __salt__['cmd.run'](cmd, python_shell=False)
|
||||
|
||||
if len(out) == 0:
|
||||
return True
|
||||
ret['result'] = True
|
||||
ret['comment'] = 'Added rule "{0}" chain {1} in table {2} in family {3}.'.\
|
||||
format(rule, chain, table, family)
|
||||
else:
|
||||
return False
|
||||
ret['comment'] = 'Failed to add rule "{0}" chain {1} in table {2} in family {3}.'.\
|
||||
format(rule, chain, table, family)
|
||||
return ret
|
||||
|
||||
|
||||
def delete(table, chain=None, position=None, rule=None, family='ipv4'):
|
||||
|
@ -786,30 +895,35 @@ def delete(table, chain=None, position=None, rule=None, family='ipv4'):
|
|||
salt '*' nftables.delete filter input position=3
|
||||
|
||||
salt '*' nftables.delete filter input \\
|
||||
rule='input tcp dport 22 log accept'
|
||||
rule='tcp dport 22 log accept'
|
||||
|
||||
IPv6:
|
||||
salt '*' nftables.delete filter input position=3 family=ipv6
|
||||
|
||||
salt '*' nftables.delete filter input \\
|
||||
rule='input tcp dport 22 log accept' \\
|
||||
rule='tcp dport 22 log accept' \\
|
||||
family=ipv6
|
||||
'''
|
||||
ret = {'comment': 'Failed to delete rule {0} in table {1}.'.format(rule, table),
|
||||
'result': False}
|
||||
|
||||
if position and rule:
|
||||
return 'Error: Only specify a position or a rule, not both'
|
||||
ret['comment'] = 'Only specify a position or a rule, not both'
|
||||
return ret
|
||||
|
||||
if not check_table(table, family=family):
|
||||
return 'Error: table {0} in family {1} does not exist'.\
|
||||
format(table, family)
|
||||
res = check_table(table, family=family)
|
||||
if not res['result']:
|
||||
return res
|
||||
|
||||
if not check_chain(table, chain, family=family):
|
||||
return 'Error: chain {0} in table {1} in family {2} does not exist'.\
|
||||
format(chain, table, family)
|
||||
res = check_chain(table, chain, family=family)
|
||||
if not res['result']:
|
||||
return res
|
||||
|
||||
if not check(table, chain, rule, family=family):
|
||||
return 'Error: rule {0} chain {1} in table {2} in family {3} does not exist'.\
|
||||
format(rule, chain, table, family)
|
||||
res = check(table, chain, rule, family=family)
|
||||
if not res['result']:
|
||||
ret['comment'] = 'Rule {0} chain {1} in table {2} in family {3} does not exist'.\
|
||||
format(rule, chain, table, family)
|
||||
return ret
|
||||
|
||||
# nftables rules can only be deleted using the handle
|
||||
# if we don't have it, find it.
|
||||
|
@ -818,13 +932,17 @@ def delete(table, chain=None, position=None, rule=None, family='ipv4'):
|
|||
|
||||
nft_family = _NFTABLES_FAMILIES[family]
|
||||
cmd = '{0} delete rule {1} {2} {3} handle {4}'.\
|
||||
format(_nftables_cmd(), nft_family, table, chain, position)
|
||||
format(_nftables_cmd(), nft_family, table, chain, position)
|
||||
out = __salt__['cmd.run'](cmd, python_shell=False)
|
||||
|
||||
if len(out) == 0:
|
||||
return True
|
||||
ret['result'] = True
|
||||
ret['comment'] = 'Deleted rule "{0}" in chain {1} in table {2} in family {3}.'.\
|
||||
format(rule, chain, table, family)
|
||||
else:
|
||||
return False
|
||||
ret['comment'] = 'Failed to delete rule "{0}" in chain {1} table {2} in family {3}'.\
|
||||
format(rule, chain, table, family)
|
||||
return ret
|
||||
|
||||
|
||||
def flush(table='filter', chain='', family='ipv4'):
|
||||
|
@ -843,24 +961,33 @@ def flush(table='filter', chain='', family='ipv4'):
|
|||
IPv6:
|
||||
salt '*' nftables.flush filter input family=ipv6
|
||||
'''
|
||||
if not check_table(table, family=family):
|
||||
return 'Error: table {0} in family {1} does not exist'.\
|
||||
format(table, family)
|
||||
ret = {'comment': 'Failed to flush rules from chain {0} in table {1}.'.format(chain, table),
|
||||
'result': False}
|
||||
|
||||
res = check_table(table, family=family)
|
||||
if not res['result']:
|
||||
return res
|
||||
|
||||
nft_family = _NFTABLES_FAMILIES[family]
|
||||
|
||||
if chain:
|
||||
if not check_chain(table, chain, family=family):
|
||||
return 'Error: chain {0} in table {1} in family {2} does not exist'.\
|
||||
format(chain, table, nft_family)
|
||||
res = check_chain(table, chain, family=family)
|
||||
if not res['result']:
|
||||
return res
|
||||
cmd = '{0} flush chain {1} {2} {3}'.\
|
||||
format(_nftables_cmd(), nft_family, table, chain)
|
||||
format(_nftables_cmd(), nft_family, table, chain)
|
||||
comment = 'from chain {0} in table {1} in family {2}.'.\
|
||||
format(chain, table, family)
|
||||
else:
|
||||
cmd = '{0} flush table {1} {2}'.\
|
||||
format(_nftables_cmd(), nft_family, table)
|
||||
comment = 'from table {0} in family {1}.'.\
|
||||
format(table, family)
|
||||
out = __salt__['cmd.run'](cmd, python_shell=False)
|
||||
|
||||
if len(out) == 0:
|
||||
return True
|
||||
ret['result'] = True
|
||||
ret['comment'] = 'Flushed rules {0}'.format(comment)
|
||||
else:
|
||||
return False
|
||||
ret['comment'] = 'Failed to flush rules {0}'.format(comment)
|
||||
return ret
|
||||
|
|
|
@ -99,6 +99,15 @@ at some point be deprecated in favor of a more generic `firewall` state.
|
|||
- sport: 1025:65535
|
||||
- save: True
|
||||
|
||||
output:
|
||||
nftables.chain_present:
|
||||
- family: ip
|
||||
- table: filter
|
||||
|
||||
output:
|
||||
nftables.chain_absent:
|
||||
- family: ip
|
||||
- table: filter
|
||||
|
||||
'''
|
||||
from __future__ import absolute_import, print_function, unicode_literals
|
||||
|
@ -106,6 +115,9 @@ from __future__ import absolute_import, print_function, unicode_literals
|
|||
# Import salt libs
|
||||
from salt.state import STATE_INTERNAL_KEYWORDS as _STATE_INTERNAL_KEYWORDS
|
||||
|
||||
import logging
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def __virtual__():
|
||||
'''
|
||||
|
@ -136,13 +148,13 @@ def chain_present(name, table='filter', table_type=None, hook=None, priority=Non
|
|||
'comment': ''}
|
||||
|
||||
chain_check = __salt__['nftables.check_chain'](table, name, family=family)
|
||||
if chain_check is True:
|
||||
if chain_check['result'] is True:
|
||||
ret['result'] = True
|
||||
ret['comment'] = ('nftables {0} chain is already exist in {1} table for {2}'
|
||||
.format(name, table, family))
|
||||
return ret
|
||||
|
||||
command = __salt__['nftables.new_chain'](
|
||||
res = __salt__['nftables.new_chain'](
|
||||
table,
|
||||
name,
|
||||
table_type=table_type,
|
||||
|
@ -151,7 +163,7 @@ def chain_present(name, table='filter', table_type=None, hook=None, priority=Non
|
|||
family=family
|
||||
)
|
||||
|
||||
if command is True:
|
||||
if res['result'] is True:
|
||||
ret['changes'] = {'locale': name}
|
||||
ret['result'] = True
|
||||
ret['comment'] = ('nftables {0} chain in {1} table create success for {2}'
|
||||
|
@ -162,7 +174,7 @@ def chain_present(name, table='filter', table_type=None, hook=None, priority=Non
|
|||
ret['comment'] = 'Failed to create {0} chain in {1} table: {2} for {3}'.format(
|
||||
name,
|
||||
table,
|
||||
command.strip(),
|
||||
res['comment'].strip(),
|
||||
family
|
||||
)
|
||||
return ret
|
||||
|
@ -239,26 +251,38 @@ def append(name, family='ipv4', **kwargs):
|
|||
for ignore in _STATE_INTERNAL_KEYWORDS:
|
||||
if ignore in kwargs:
|
||||
del kwargs[ignore]
|
||||
rule = __salt__['nftables.build_rule'](family=family, **kwargs)
|
||||
command = __salt__['nftables.build_rule'](full=True, family=family, command='add', **kwargs)
|
||||
res = __salt__['nftables.build_rule'](family=family, **kwargs)
|
||||
if not res['result']:
|
||||
return res
|
||||
rule = res['rule']
|
||||
|
||||
if __salt__['nftables.check'](kwargs['table'],
|
||||
kwargs['chain'],
|
||||
rule,
|
||||
family) is True:
|
||||
res = __salt__['nftables.build_rule'](full=True, family=family, command='add', **kwargs)
|
||||
if not res['result']:
|
||||
return res
|
||||
command = res['rule']
|
||||
|
||||
res = __salt__['nftables.check'](kwargs['table'],
|
||||
kwargs['chain'],
|
||||
rule,
|
||||
family)
|
||||
if res['result']:
|
||||
ret['result'] = True
|
||||
ret['comment'] = 'nftables rule for {0} already set ({1}) for {2}'.format(
|
||||
name,
|
||||
command.strip(),
|
||||
family)
|
||||
return ret
|
||||
if __opts__['test']:
|
||||
if 'test' in __opts__ and __opts__['test']:
|
||||
ret['comment'] = 'nftables rule for {0} needs to be set ({1}) for {2}'.format(
|
||||
name,
|
||||
command.strip(),
|
||||
family)
|
||||
return ret
|
||||
if __salt__['nftables.append'](kwargs['table'], kwargs['chain'], rule, family):
|
||||
res = __salt__['nftables.append'](kwargs['table'],
|
||||
kwargs['chain'],
|
||||
rule,
|
||||
family)
|
||||
if res['result']:
|
||||
ret['changes'] = {'locale': name}
|
||||
ret['result'] = True
|
||||
ret['comment'] = 'Set nftables rule for {0} to: {1} for {2}'.format(
|
||||
|
@ -274,9 +298,10 @@ def append(name, family='ipv4', **kwargs):
|
|||
else:
|
||||
ret['result'] = False
|
||||
ret['comment'] = ('Failed to set nftables rule for {0}.\n'
|
||||
'Attempted rule was {1} for {2}').format(
|
||||
'Attempted rule was {1} for {2}.\n'
|
||||
'{3}').format(
|
||||
name,
|
||||
command.strip(), family)
|
||||
command.strip(), family, res['comment'])
|
||||
return ret
|
||||
|
||||
|
||||
|
@ -306,25 +331,42 @@ def insert(name, family='ipv4', **kwargs):
|
|||
for ignore in _STATE_INTERNAL_KEYWORDS:
|
||||
if ignore in kwargs:
|
||||
del kwargs[ignore]
|
||||
rule = __salt__['nftables.build_rule'](family=family, **kwargs)
|
||||
command = __salt__['nftables.build_rule'](full=True, family=family, command='insert', **kwargs)
|
||||
if __salt__['nftables.check'](kwargs['table'],
|
||||
kwargs['chain'],
|
||||
rule,
|
||||
family) is True:
|
||||
res = __salt__['nftables.build_rule'](family=family, **kwargs)
|
||||
if not res['result']:
|
||||
return res
|
||||
rule = res['rule']
|
||||
|
||||
res = __salt__['nftables.build_rule'](full=True,
|
||||
family=family,
|
||||
command='insert',
|
||||
**kwargs)
|
||||
if not res['result']:
|
||||
return res
|
||||
command = res['rule']
|
||||
|
||||
res = __salt__['nftables.check'](kwargs['table'],
|
||||
kwargs['chain'],
|
||||
rule,
|
||||
family)
|
||||
if res['result']:
|
||||
ret['result'] = True
|
||||
ret['comment'] = 'nftables rule for {0} already set for {1} ({2})'.format(
|
||||
name,
|
||||
family,
|
||||
command.strip())
|
||||
return ret
|
||||
if __opts__['test']:
|
||||
if 'test' in __opts__ and __opts__['test']:
|
||||
ret['comment'] = 'nftables rule for {0} needs to be set for {1} ({2})'.format(
|
||||
name,
|
||||
family,
|
||||
command.strip())
|
||||
return ret
|
||||
if __salt__['nftables.insert'](kwargs['table'], kwargs['chain'], kwargs['position'], rule, family):
|
||||
res = __salt__['nftables.insert'](kwargs['table'],
|
||||
kwargs['chain'],
|
||||
kwargs['position'],
|
||||
rule,
|
||||
family)
|
||||
if res['result']:
|
||||
ret['changes'] = {'locale': name}
|
||||
ret['result'] = True
|
||||
ret['comment'] = 'Set nftables rule for {0} to: {1} for {2}'.format(
|
||||
|
@ -372,19 +414,29 @@ def delete(name, family='ipv4', **kwargs):
|
|||
for ignore in _STATE_INTERNAL_KEYWORDS:
|
||||
if ignore in kwargs:
|
||||
del kwargs[ignore]
|
||||
rule = __salt__['nftables.build_rule'](family=family, **kwargs)
|
||||
command = __salt__['nftables.build_rule'](full=True, family=family, command='D', **kwargs)
|
||||
if not __salt__['nftables.check'](kwargs['table'],
|
||||
kwargs['chain'],
|
||||
rule,
|
||||
family) is True:
|
||||
res = __salt__['nftables.build_rule'](family=family, **kwargs)
|
||||
if not res['result']:
|
||||
return res
|
||||
rule = res['rule']
|
||||
|
||||
res = __salt__['nftables.build_rule'](full=True, family=family, command='D', **kwargs)
|
||||
if not res['result']:
|
||||
return res
|
||||
command = res['rule']
|
||||
|
||||
res = __salt__['nftables.check'](kwargs['table'],
|
||||
kwargs['chain'],
|
||||
rule,
|
||||
family)
|
||||
|
||||
if not res['result']:
|
||||
ret['result'] = True
|
||||
ret['comment'] = 'nftables rule for {0} already absent for {1} ({2})'.format(
|
||||
name,
|
||||
family,
|
||||
command.strip())
|
||||
return ret
|
||||
if __opts__['test']:
|
||||
if 'test' in __opts__ and __opts__['test']:
|
||||
ret['comment'] = 'nftables rule for {0} needs to be deleted for {1} ({2})'.format(
|
||||
name,
|
||||
family,
|
||||
|
@ -392,19 +444,19 @@ def delete(name, family='ipv4', **kwargs):
|
|||
return ret
|
||||
|
||||
if 'position' in kwargs:
|
||||
result = __salt__['nftables.delete'](
|
||||
res = __salt__['nftables.delete'](
|
||||
kwargs['table'],
|
||||
kwargs['chain'],
|
||||
family=family,
|
||||
position=kwargs['position'])
|
||||
else:
|
||||
result = __salt__['nftables.delete'](
|
||||
res = __salt__['nftables.delete'](
|
||||
kwargs['table'],
|
||||
kwargs['chain'],
|
||||
family=family,
|
||||
rule=rule)
|
||||
|
||||
if result:
|
||||
if res['result']:
|
||||
ret['changes'] = {'locale': name}
|
||||
ret['result'] = True
|
||||
ret['comment'] = 'Delete nftables rule for {0} {1}'.format(
|
||||
|
@ -447,7 +499,8 @@ def flush(name, family='ipv4', **kwargs):
|
|||
if 'table' not in kwargs:
|
||||
kwargs['table'] = 'filter'
|
||||
|
||||
if not __salt__['nftables.check_table'](kwargs['table'], family=family):
|
||||
res = __salt__['nftables.check_table'](kwargs['table'], family=family)
|
||||
if not res['result']:
|
||||
ret['result'] = False
|
||||
ret['comment'] = 'Failed to flush table {0} in family {1}, table does not exist.'.format(
|
||||
kwargs['table'],
|
||||
|
@ -458,7 +511,10 @@ def flush(name, family='ipv4', **kwargs):
|
|||
if 'chain' not in kwargs:
|
||||
kwargs['chain'] = ''
|
||||
else:
|
||||
if not __salt__['nftables.check_chain'](kwargs['table'], kwargs['chain'], family=family):
|
||||
res = __salt__['nftables.check_chain'](kwargs['table'],
|
||||
kwargs['chain'],
|
||||
family=family)
|
||||
if not res['result']:
|
||||
ret['result'] = False
|
||||
ret['comment'] = 'Failed to flush chain {0} in table {1} in family {2}, chain does not exist.'.format(
|
||||
kwargs['chain'],
|
||||
|
@ -467,7 +523,10 @@ def flush(name, family='ipv4', **kwargs):
|
|||
)
|
||||
return ret
|
||||
|
||||
if __salt__['nftables.flush'](kwargs['table'], kwargs['chain'], family):
|
||||
res = __salt__['nftables.flush'](kwargs['table'],
|
||||
kwargs['chain'],
|
||||
family)
|
||||
if res['result']:
|
||||
ret['changes'] = {'locale': name}
|
||||
ret['result'] = True
|
||||
ret['comment'] = 'Flush nftables rules in {0} table {1} chain {2} family'.format(
|
||||
|
|
|
@ -48,29 +48,44 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
Test if it build a well-formatted nftables rule based on kwargs.
|
||||
'''
|
||||
self.assertEqual(nftables.build_rule(full='True'),
|
||||
'Error: Table needs to be specified')
|
||||
{'result': False,
|
||||
'rule': '',
|
||||
'comment': 'Table needs to be specified'})
|
||||
|
||||
self.assertEqual(nftables.build_rule(table='filter', full='True'),
|
||||
'Error: Chain needs to be specified')
|
||||
{'result': False,
|
||||
'rule': '',
|
||||
'comment': 'Chain needs to be specified'})
|
||||
|
||||
self.assertEqual(nftables.build_rule(table='filter', chain='input',
|
||||
full='True'),
|
||||
'Error: Command needs to be specified')
|
||||
{'result': False,
|
||||
'rule': '',
|
||||
'comment': 'Command needs to be specified'})
|
||||
|
||||
self.assertEqual(nftables.build_rule(table='filter', chain='input',
|
||||
command='insert', position='3',
|
||||
full='True'),
|
||||
'nft insert rule ip filter input position 3 ')
|
||||
{'result': True,
|
||||
'rule': 'nft insert rule ip filter input position 3 ',
|
||||
'comment': 'Successfully built rule'})
|
||||
|
||||
self.assertEqual(nftables.build_rule(table='filter', chain='input',
|
||||
command='insert', full='True'),
|
||||
'nft insert rule ip filter input ')
|
||||
{'result': True,
|
||||
'rule': 'nft insert rule ip filter input ',
|
||||
'comment': 'Successfully built rule'})
|
||||
|
||||
self.assertEqual(nftables.build_rule(table='filter', chain='input',
|
||||
command='halt', full='True'),
|
||||
'nft halt rule ip filter input ')
|
||||
{'result': True,
|
||||
'rule': 'nft halt rule ip filter input ',
|
||||
'comment': 'Successfully built rule'})
|
||||
|
||||
self.assertEqual(nftables.build_rule(), '')
|
||||
self.assertEqual(nftables.build_rule(),
|
||||
{'result': True,
|
||||
'rule': '',
|
||||
'comment': ''})
|
||||
|
||||
# 'get_saved_rules' function tests: 1
|
||||
|
||||
|
@ -119,33 +134,46 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
Test if it get the handle for a particular rule
|
||||
'''
|
||||
self.assertEqual(nftables.get_rule_handle(),
|
||||
'Error: Chain needs to be specified')
|
||||
{'result': False,
|
||||
'comment': 'Chain needs to be specified'})
|
||||
|
||||
self.assertEqual(nftables.get_rule_handle(chain='input'),
|
||||
'Error: Rule needs to be specified')
|
||||
{'result': False,
|
||||
'comment': 'Rule needs to be specified'})
|
||||
|
||||
_ru = 'input tcp dport 22 log accept'
|
||||
ret = 'Error: table filter in family ipv4 does not exist'
|
||||
ret = {'result': False,
|
||||
'comment': 'Table filter in family ipv4 does not exist'}
|
||||
mock = MagicMock(return_value='')
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertEqual(nftables.get_rule_handle(chain='input', rule=_ru),
|
||||
ret)
|
||||
|
||||
ret = 'Error: chain input in table filter in family ipv4 does not exist'
|
||||
ret = {'result': False,
|
||||
'comment': 'Chain input in table filter in family ipv4 does not exist'}
|
||||
mock = MagicMock(return_value='table ip filter')
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertEqual(nftables.get_rule_handle(chain='input', rule=_ru),
|
||||
ret)
|
||||
|
||||
ret = ('Error: rule input tcp dport 22 log accept chain input'
|
||||
' in table filter in family ipv4 does not exist')
|
||||
ret1 = 'Error: could not find rule input tcp dport 22 log accept'
|
||||
ret = {'result': False,
|
||||
'comment': ('Rule input tcp dport 22 log accept chain'
|
||||
' input in table filter in family ipv4 does not exist')}
|
||||
ret1 = {'result': False,
|
||||
'comment': 'Could not find rule input tcp dport 22 log accept'}
|
||||
with patch.object(nftables, 'check_table',
|
||||
MagicMock(return_value=True)):
|
||||
MagicMock(return_value={'result': True,
|
||||
'comment': ''})):
|
||||
with patch.object(nftables, 'check_chain',
|
||||
MagicMock(return_value=True)):
|
||||
MagicMock(return_value={'result': True,
|
||||
'comment': ''})):
|
||||
_ret1 = {'result': False,
|
||||
'comment': ('Rule input tcp dport 22 log accept'
|
||||
' chain input in table filter in'
|
||||
' family ipv4 does not exist')}
|
||||
_ret2 = {'result': True, 'comment': ''}
|
||||
with patch.object(nftables, 'check',
|
||||
MagicMock(side_effect=[False, True])):
|
||||
MagicMock(side_effect=[_ret1, _ret2])):
|
||||
self.assertEqual(nftables.get_rule_handle(chain='input',
|
||||
rule=_ru), ret)
|
||||
|
||||
|
@ -163,30 +191,38 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
Test if it check for the existence of a rule in the table and chain
|
||||
'''
|
||||
self.assertEqual(nftables.check(),
|
||||
'Error: Chain needs to be specified')
|
||||
{'result': False,
|
||||
'comment': 'Chain needs to be specified'})
|
||||
|
||||
self.assertEqual(nftables.check(chain='input'),
|
||||
'Error: Rule needs to be specified')
|
||||
{'result': False,
|
||||
'comment': 'Rule needs to be specified'})
|
||||
|
||||
_ru = 'input tcp dport 22 log accept'
|
||||
ret = 'Error: table filter in family ipv4 does not exist'
|
||||
_ru = 'tcp dport 22 log accept'
|
||||
ret = {'result': False,
|
||||
'comment': 'Table filter in family ipv4 does not exist'}
|
||||
mock = MagicMock(return_value='')
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertEqual(nftables.check(chain='input', rule=_ru), ret)
|
||||
|
||||
mock = MagicMock(return_value='table ip filter')
|
||||
ret = 'Error: chain input in table filter in family ipv4 does not exist'
|
||||
ret = {'result': False,
|
||||
'comment': 'Chain input in table filter in family ipv4 does not exist'}
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertEqual(nftables.check(chain='input', rule=_ru), ret)
|
||||
|
||||
mock = MagicMock(return_value='table ip filter chain input {{')
|
||||
ret = {'result': False, 'comment':
|
||||
'Rule tcp dport 22 log accept in chain input in table filter in family ipv4 does not exist'}
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertFalse(nftables.check(chain='input', rule=_ru))
|
||||
self.assertEqual(nftables.check(chain='input', rule=_ru), ret)
|
||||
|
||||
r_val = 'table ip filter chain input {{ input tcp dport 22 log accept #'
|
||||
mock = MagicMock(return_value=r_val)
|
||||
ret = {'result': True,
|
||||
'comment': 'Rule tcp dport 22 log accept in chain input in table filter in family ipv4 exists'}
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertTrue(nftables.check(chain='input', rule=_ru))
|
||||
self.assertEqual(nftables.check(chain='input', rule=_ru), ret)
|
||||
|
||||
# 'check_chain' function tests: 1
|
||||
|
||||
|
@ -195,15 +231,20 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
Test if it check for the existence of a chain in the table
|
||||
'''
|
||||
self.assertEqual(nftables.check_chain(),
|
||||
'Error: Chain needs to be specified')
|
||||
{'result': False,
|
||||
'comment': 'Chain needs to be specified'})
|
||||
|
||||
mock = MagicMock(return_value='')
|
||||
ret = {'comment': 'Chain input in table filter in family ipv4 does not exist',
|
||||
'result': False}
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertFalse(nftables.check_chain(chain='input'))
|
||||
self.assertEqual(nftables.check_chain(chain='input'), ret)
|
||||
|
||||
mock = MagicMock(return_value='chain input {{')
|
||||
ret = {'comment': 'Chain input in table filter in family ipv4 exists',
|
||||
'result': True}
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertTrue(nftables.check_chain(chain='input'))
|
||||
self.assertEqual(nftables.check_chain(chain='input'), ret)
|
||||
|
||||
# 'check_table' function tests: 1
|
||||
|
||||
|
@ -212,15 +253,20 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
Test if it check for the existence of a table
|
||||
'''
|
||||
self.assertEqual(nftables.check_table(),
|
||||
'Error: table needs to be specified')
|
||||
{'result': False,
|
||||
'comment': 'Table needs to be specified'})
|
||||
|
||||
mock = MagicMock(return_value='')
|
||||
ret = {'comment': 'Table nat in family ipv4 does not exist',
|
||||
'result': False}
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertFalse(nftables.check_table(table='nat'))
|
||||
self.assertEqual(nftables.check_table(table='nat'), ret)
|
||||
|
||||
mock = MagicMock(return_value='table ip nat')
|
||||
ret = {'comment': 'Table nat in family ipv4 exists',
|
||||
'result': True}
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertTrue(nftables.check_table(table='nat'))
|
||||
self.assertEqual(nftables.check_table(table='nat'), ret)
|
||||
|
||||
# 'new_table' function tests: 1
|
||||
|
||||
|
@ -229,16 +275,19 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
Test if it create new custom table.
|
||||
'''
|
||||
self.assertEqual(nftables.new_table(table=None),
|
||||
'Error: table needs to be specified')
|
||||
{'result': False,
|
||||
'comment': 'Table needs to be specified'})
|
||||
|
||||
mock = MagicMock(return_value='')
|
||||
ret = {'comment': 'Table nat in family ipv4 created', 'result': True}
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertEqual(nftables.new_table(table='nat'), True)
|
||||
self.assertEqual(nftables.new_table(table='nat'), ret)
|
||||
|
||||
mock = MagicMock(return_value='table ip nat')
|
||||
ret = {'comment': 'Table nat in family ipv4 exists', 'result': True}
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertEqual(nftables.new_table(table='nat'),
|
||||
'Error: table nat in family ipv4 already exists')
|
||||
ret)
|
||||
|
||||
# 'delete_table' function tests: 1
|
||||
|
||||
|
@ -247,16 +296,35 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
Test if it delete custom table.
|
||||
'''
|
||||
self.assertEqual(nftables.delete_table(table=None),
|
||||
'Error: table needs to be specified')
|
||||
{'result': False,
|
||||
'comment': 'Table needs to be specified'})
|
||||
|
||||
mock = MagicMock(return_value='')
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertEqual(nftables.delete_table(table='nat'),
|
||||
'Error: table nat in family ipv4 does not exist')
|
||||
mock_ret = {'result': False,
|
||||
'comment': 'Table nat in family ipv4 does not exist'}
|
||||
with patch('salt.modules.nftables.check_table',
|
||||
MagicMock(return_value=mock_ret)):
|
||||
ret = nftables.delete_table(table='nat')
|
||||
self.assertEqual(ret,
|
||||
{'result': False,
|
||||
'comment': 'Table nat in family ipv4 does not exist'})
|
||||
|
||||
mock = MagicMock(return_value='table ip nat')
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertEqual(nftables.delete_table(table='nat'), 'table ip nat')
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
|
||||
patch('salt.modules.nftables.check_table',
|
||||
MagicMock(return_value={'result': True,
|
||||
'comment': ''})):
|
||||
self.assertEqual(nftables.delete_table(table='nat'),
|
||||
{'comment': 'Table nat in family ipv4 could not be deleted',
|
||||
'result': False})
|
||||
|
||||
mock = MagicMock(return_value='')
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
|
||||
patch('salt.modules.nftables.check_table',
|
||||
MagicMock(return_value={'result': True,
|
||||
'comment': ''})):
|
||||
self.assertEqual(nftables.delete_table(table='nat'),
|
||||
{'comment': 'Table nat in family ipv4 deleted',
|
||||
'result': True})
|
||||
|
||||
# 'new_chain' function tests: 2
|
||||
|
||||
|
@ -265,14 +333,17 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
Test if it create new chain to the specified table.
|
||||
'''
|
||||
self.assertEqual(nftables.new_chain(),
|
||||
'Error: Chain needs to be specified')
|
||||
{'result': False,
|
||||
'comment': 'Chain needs to be specified'})
|
||||
|
||||
ret = 'Error: table filter in family ipv4 does not exist'
|
||||
ret = {'result': False,
|
||||
'comment': 'Table filter in family ipv4 does not exist'}
|
||||
mock = MagicMock(return_value='')
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertEqual(nftables.new_chain(chain='input'), ret)
|
||||
|
||||
ret = 'Error: chain input in table filter in family ipv4 already exists'
|
||||
ret = {'result': False,
|
||||
'comment': 'Chain input in table filter in family ipv4 already exists'}
|
||||
mock = MagicMock(return_value='table ip filter chain input {{')
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertEqual(nftables.new_chain(chain='input'), ret)
|
||||
|
@ -283,11 +354,16 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
'''
|
||||
mock = MagicMock(return_value='')
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
|
||||
patch('salt.modules.nftables.check_chain', MagicMock(return_value=False)), \
|
||||
patch('salt.modules.nftables.check_table', MagicMock(return_value=True)):
|
||||
patch('salt.modules.nftables.check_chain',
|
||||
MagicMock(return_value={'result': False,
|
||||
'comment': ''})), \
|
||||
patch('salt.modules.nftables.check_table',
|
||||
MagicMock(return_value={'result': True,
|
||||
'comment': ''})):
|
||||
self.assertEqual(nftables.new_chain(chain='input',
|
||||
table_type='filter'),
|
||||
'Error: table_type hook and priority required')
|
||||
{'result': False,
|
||||
'comment': 'Table_type, hook, and priority required.'})
|
||||
|
||||
self.assertTrue(nftables.new_chain(chain='input',
|
||||
table_type='filter',
|
||||
|
@ -300,16 +376,37 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
Test if it delete the chain from the specified table.
|
||||
'''
|
||||
self.assertEqual(nftables.delete_chain(),
|
||||
'Error: Chain needs to be specified')
|
||||
{'result': False,
|
||||
'comment': 'Chain needs to be specified'})
|
||||
|
||||
ret = 'Error: table filter in family ipv4 does not exist'
|
||||
ret = {'result': False,
|
||||
'comment': 'Table filter in family ipv4 does not exist'}
|
||||
mock = MagicMock(return_value='')
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertEqual(nftables.delete_chain(chain='input'), ret)
|
||||
|
||||
ret = 'Error: chain input in table filter in family ipv4 does not exist'
|
||||
ret = {'result': False,
|
||||
'comment': 'Chain input in table filter in family ipv4 could not be deleted'}
|
||||
mock = MagicMock(return_value='table ip filter')
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
|
||||
patch('salt.modules.nftables.check_table',
|
||||
MagicMock(return_value={'result': True,
|
||||
'comment': ''})), \
|
||||
patch('salt.modules.nftables.check_chain',
|
||||
MagicMock(return_value={'result': True,
|
||||
'comment': ''})):
|
||||
self.assertEqual(nftables.delete_chain(chain='input'), ret)
|
||||
|
||||
ret = {'result': True,
|
||||
'comment': 'Chain input in table filter in family ipv4 deleted'}
|
||||
mock = MagicMock(return_value='')
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
|
||||
patch('salt.modules.nftables.check_table',
|
||||
MagicMock(return_value={'result': True,
|
||||
'comment': ''})), \
|
||||
patch('salt.modules.nftables.check_chain',
|
||||
MagicMock(return_value={'result': True,
|
||||
'comment': ''})):
|
||||
self.assertEqual(nftables.delete_chain(chain='input'), ret)
|
||||
|
||||
def test_delete_chain_variables(self):
|
||||
|
@ -318,9 +415,15 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
'''
|
||||
mock = MagicMock(return_value='')
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
|
||||
patch('salt.modules.nftables.check_chain', MagicMock(return_value=True)), \
|
||||
patch('salt.modules.nftables.check_table', MagicMock(return_value=True)):
|
||||
self.assertTrue(nftables.delete_chain(chain='input'))
|
||||
patch('salt.modules.nftables.check_chain',
|
||||
MagicMock(return_value={'result': True,
|
||||
'comment': ''})), \
|
||||
patch('salt.modules.nftables.check_table',
|
||||
MagicMock(return_value={'result': True,
|
||||
'comment': ''})):
|
||||
_expected = {'comment': 'Chain input in table filter in family ipv4 deleted',
|
||||
'result': True}
|
||||
self.assertEqual(nftables.delete_chain(chain='input'), _expected)
|
||||
|
||||
# 'append' function tests: 2
|
||||
|
||||
|
@ -329,26 +432,34 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
Test if it append a rule to the specified table & chain.
|
||||
'''
|
||||
self.assertEqual(nftables.append(),
|
||||
'Error: Chain needs to be specified')
|
||||
{'result': False,
|
||||
'comment': 'Chain needs to be specified'})
|
||||
|
||||
self.assertEqual(nftables.append(chain='input'),
|
||||
'Error: Rule needs to be specified')
|
||||
{'result': False,
|
||||
'comment': 'Rule needs to be specified'})
|
||||
|
||||
_ru = 'input tcp dport 22 log accept'
|
||||
ret = 'Error: table filter in family ipv4 does not exist'
|
||||
ret = {'comment': 'Table filter in family ipv4 does not exist',
|
||||
'result': False}
|
||||
mock = MagicMock(return_value='')
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertEqual(nftables.append(chain='input', rule=_ru), ret)
|
||||
|
||||
ret = 'Error: chain input in table filter in family ipv4 does not exist'
|
||||
ret = {'comment': 'Chain input in table filter in family ipv4 does not exist',
|
||||
'result': False}
|
||||
mock = MagicMock(return_value='table ip filter')
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertEqual(nftables.append(chain='input', rule=_ru), ret)
|
||||
|
||||
r_val = 'table ip filter chain input {{ input tcp dport 22 log accept #'
|
||||
mock = MagicMock(return_value=r_val)
|
||||
_expected = {'comment': 'Rule input tcp dport 22 log accept chain input in table filter in family ipv4 already exists',
|
||||
'result': False}
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertTrue(nftables.append(chain='input', rule=_ru))
|
||||
self.assertEqual(nftables.append(chain='input',
|
||||
rule=_ru),
|
||||
_expected)
|
||||
|
||||
def test_append_rule(self):
|
||||
'''
|
||||
|
@ -357,11 +468,19 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
_ru = 'input tcp dport 22 log accept'
|
||||
mock = MagicMock(side_effect=['1', ''])
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
|
||||
patch('salt.modules.nftables.check', MagicMock(return_value=False)), \
|
||||
patch('salt.modules.nftables.check_chain', MagicMock(return_value=True)), \
|
||||
patch('salt.modules.nftables.check_table', MagicMock(return_value=True)):
|
||||
self.assertFalse(nftables.append(chain='input', rule=_ru))
|
||||
self.assertTrue(nftables.append(chain='input', rule=_ru))
|
||||
patch('salt.modules.nftables.check',
|
||||
MagicMock(return_value={'result': False,
|
||||
'comment': ''})), \
|
||||
patch('salt.modules.nftables.check_chain',
|
||||
MagicMock(return_value={'result': True,
|
||||
'comment': ''})), \
|
||||
patch('salt.modules.nftables.check_table',
|
||||
MagicMock(return_value={'result': True,
|
||||
'comment': ''})):
|
||||
_expected = {'comment': 'Failed to add rule "{0}" chain input in table filter in family ipv4.'.format(_ru), 'result': False}
|
||||
self.assertEqual(nftables.append(chain='input', rule=_ru), _expected)
|
||||
_expected = {'comment': 'Added rule "{0}" chain input in table filter in family ipv4.'.format(_ru), 'result': True}
|
||||
self.assertEqual(nftables.append(chain='input', rule=_ru), _expected)
|
||||
|
||||
# 'insert' function tests: 2
|
||||
|
||||
|
@ -371,18 +490,22 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
at the specified position.
|
||||
'''
|
||||
self.assertEqual(nftables.insert(),
|
||||
'Error: Chain needs to be specified')
|
||||
{'result': False,
|
||||
'comment': 'Chain needs to be specified'})
|
||||
|
||||
self.assertEqual(nftables.insert(chain='input'),
|
||||
'Error: Rule needs to be specified')
|
||||
{'result': False,
|
||||
'comment': 'Rule needs to be specified'})
|
||||
|
||||
_ru = 'input tcp dport 22 log accept'
|
||||
ret = 'Error: table filter in family ipv4 does not exist'
|
||||
ret = {'result': False,
|
||||
'comment': 'Table filter in family ipv4 does not exist'}
|
||||
mock = MagicMock(return_value='')
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertEqual(nftables.insert(chain='input', rule=_ru), ret)
|
||||
|
||||
ret = 'Error: chain input in table filter in family ipv4 does not exist'
|
||||
ret = {'result': False,
|
||||
'comment': 'Chain input in table filter in family ipv4 does not exist'}
|
||||
mock = MagicMock(return_value='table ip filter')
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertEqual(nftables.insert(chain='input', rule=_ru), ret)
|
||||
|
@ -390,6 +513,10 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
r_val = 'table ip filter chain input {{ input tcp dport 22 log accept #'
|
||||
mock = MagicMock(return_value=r_val)
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
res = nftables.insert(chain='input', rule=_ru)
|
||||
import logging
|
||||
log = logging.getLogger(__name__)
|
||||
log.debug('=== res %s ===', res)
|
||||
self.assertTrue(nftables.insert(chain='input', rule=_ru))
|
||||
|
||||
def test_insert_rule(self):
|
||||
|
@ -400,11 +527,23 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
_ru = 'input tcp dport 22 log accept'
|
||||
mock = MagicMock(side_effect=['1', ''])
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
|
||||
patch('salt.modules.nftables.check', MagicMock(return_value=False)), \
|
||||
patch('salt.modules.nftables.check_chain', MagicMock(return_value=True)), \
|
||||
patch('salt.modules.nftables.check_table', MagicMock(return_value=True)):
|
||||
self.assertFalse(nftables.insert(chain='input', rule=_ru))
|
||||
self.assertTrue(nftables.insert(chain='input', rule=_ru))
|
||||
patch('salt.modules.nftables.check',
|
||||
MagicMock(return_value={'result': False,
|
||||
'comment': ''})), \
|
||||
patch('salt.modules.nftables.check_chain',
|
||||
MagicMock(return_value={'result': True,
|
||||
'comment': ''})), \
|
||||
patch('salt.modules.nftables.check_table',
|
||||
MagicMock(return_value={'result': True,
|
||||
'comment': ''})):
|
||||
_expected = {'result': False,
|
||||
'comment': 'Failed to add rule "{0}" chain input in table filter in family ipv4.'.format(_ru)}
|
||||
self.assertEqual(nftables.insert(chain='input', rule=_ru),
|
||||
_expected)
|
||||
_expected = {'result': True,
|
||||
'comment': 'Added rule "{0}" chain input in table filter in family ipv4.'.format(_ru)}
|
||||
self.assertEqual(nftables.insert(chain='input', rule=_ru),
|
||||
_expected)
|
||||
|
||||
# 'delete' function tests: 2
|
||||
|
||||
|
@ -415,17 +554,21 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
the rule's position in the chain.
|
||||
'''
|
||||
_ru = 'input tcp dport 22 log accept'
|
||||
ret = {'result': False,
|
||||
'comment': 'Only specify a position or a rule, not both'}
|
||||
self.assertEqual(nftables.delete(table='filter', chain='input',
|
||||
position='3', rule=_ru),
|
||||
'Error: Only specify a position or a rule, not both')
|
||||
ret)
|
||||
|
||||
ret = 'Error: table filter in family ipv4 does not exist'
|
||||
ret = {'result': False,
|
||||
'comment': 'Table filter in family ipv4 does not exist'}
|
||||
mock = MagicMock(return_value='')
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertEqual(nftables.delete(table='filter', chain='input',
|
||||
rule=_ru), ret)
|
||||
|
||||
ret = 'Error: chain input in table filter in family ipv4 does not exist'
|
||||
ret = {'result': False,
|
||||
'comment': 'Chain input in table filter in family ipv4 does not exist'}
|
||||
mock = MagicMock(return_value='table ip filter')
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertEqual(nftables.delete(table='filter', chain='input',
|
||||
|
@ -444,14 +587,27 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
'''
|
||||
mock = MagicMock(side_effect=['1', ''])
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
|
||||
patch('salt.modules.nftables.check', MagicMock(return_value=True)), \
|
||||
patch('salt.modules.nftables.check_chain', MagicMock(return_value=True)), \
|
||||
patch('salt.modules.nftables.check_table', MagicMock(return_value=True)):
|
||||
self.assertFalse(nftables.delete(table='filter', chain='input',
|
||||
position='3'))
|
||||
self.assertTrue(nftables.delete(table='filter', chain='input',
|
||||
position='3'))
|
||||
|
||||
patch('salt.modules.nftables.check',
|
||||
MagicMock(return_value={'result': True,
|
||||
'comment': ''})), \
|
||||
patch('salt.modules.nftables.check_chain',
|
||||
MagicMock(return_value={'result': True,
|
||||
'comment': ''})), \
|
||||
patch('salt.modules.nftables.check_table',
|
||||
MagicMock(return_value={'result': True,
|
||||
'comment': ''})):
|
||||
_expected = {'result': False,
|
||||
'comment': 'Failed to delete rule "None" in chain input table filter in family ipv4'}
|
||||
self.assertEqual(nftables.delete(table='filter',
|
||||
chain='input',
|
||||
position='3'),
|
||||
_expected)
|
||||
_expected = {'result': True,
|
||||
'comment': 'Deleted rule "None" in chain input in table filter in family ipv4.'}
|
||||
self.assertEqual(nftables.delete(table='filter',
|
||||
chain='input',
|
||||
position='3'),
|
||||
_expected)
|
||||
# 'flush' function tests: 2
|
||||
|
||||
def test_flush(self):
|
||||
|
@ -459,12 +615,14 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
Test if it flush the chain in the specified table, flush all chains
|
||||
in the specified table if chain is not specified.
|
||||
'''
|
||||
ret = 'Error: table filter in family ipv4 does not exist'
|
||||
ret = {'result': False,
|
||||
'comment': 'Table filter in family ipv4 does not exist'}
|
||||
mock = MagicMock(return_value='')
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertEqual(nftables.flush(table='filter', chain='input'), ret)
|
||||
|
||||
ret = 'Error: chain input in table filter in family ip does not exist'
|
||||
ret = {'result': False,
|
||||
'comment': 'Chain input in table filter in family ipv4 does not exist'}
|
||||
mock = MagicMock(return_value='table ip filter')
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
||||
self.assertEqual(nftables.flush(table='filter', chain='input'), ret)
|
||||
|
@ -476,7 +634,19 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
'''
|
||||
mock = MagicMock(side_effect=['1', ''])
|
||||
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
|
||||
patch('salt.modules.nftables.check_chain', MagicMock(return_value=True)), \
|
||||
patch('salt.modules.nftables.check_table', MagicMock(return_value=True)):
|
||||
self.assertFalse(nftables.flush(table='filter', chain='input'))
|
||||
self.assertTrue(nftables.flush(table='filter', chain='input'))
|
||||
patch('salt.modules.nftables.check_chain',
|
||||
MagicMock(return_value={'result': True,
|
||||
'comment': ''})), \
|
||||
patch('salt.modules.nftables.check_table',
|
||||
MagicMock(return_value={'result': True,
|
||||
'comment': ''})):
|
||||
_expected = {'result': False,
|
||||
'comment': 'Failed to flush rules from chain input in table filter in family ipv4.'}
|
||||
self.assertEqual(nftables.flush(table='filter',
|
||||
chain='input'),
|
||||
_expected)
|
||||
_expected = {'result': True,
|
||||
'comment': 'Flushed rules from chain input in table filter in family ipv4.'}
|
||||
self.assertEqual(nftables.flush(table='filter',
|
||||
chain='input'),
|
||||
_expected)
|
||||
|
|
|
@ -36,14 +36,17 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
'changes': {},
|
||||
'result': True,
|
||||
'comment': ''}
|
||||
mock = MagicMock(side_effect=[True, False, False])
|
||||
with patch.dict(nftables.__salt__, {"nftables.check_chain": mock}):
|
||||
mock = MagicMock(side_effect=[{'result': True, 'comment': ''},
|
||||
{'result': False, 'comment': ''},
|
||||
{'result': False, 'comment': ''}])
|
||||
with patch.dict(nftables.__salt__, {'nftables.check_chain': mock}):
|
||||
ret.update({'comment': 'nftables salt chain is already'
|
||||
' exist in filter table for ipv4'})
|
||||
self.assertDictEqual(nftables.chain_present('salt'), ret)
|
||||
|
||||
mock = MagicMock(side_effect=[True, ''])
|
||||
with patch.dict(nftables.__salt__, {"nftables.new_chain": mock}):
|
||||
mock = MagicMock(side_effect=[{'result': True, 'comment': ''},
|
||||
{'result': False, 'comment': ''}])
|
||||
with patch.dict(nftables.__salt__, {'nftables.new_chain': mock}):
|
||||
ret.update({'changes': {'locale': 'salt'},
|
||||
'comment': 'nftables salt chain in filter'
|
||||
' table create success for ipv4'})
|
||||
|
@ -64,13 +67,13 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
'result': True,
|
||||
'comment': ''}
|
||||
mock = MagicMock(side_effect=[False, True])
|
||||
with patch.dict(nftables.__salt__, {"nftables.check_chain": mock}):
|
||||
with patch.dict(nftables.__salt__, {'nftables.check_chain': mock}):
|
||||
ret.update({'comment': 'nftables salt chain is already absent'
|
||||
' in filter table for ipv4'})
|
||||
self.assertDictEqual(nftables.chain_absent('salt'), ret)
|
||||
|
||||
mock = MagicMock(return_value='')
|
||||
with patch.dict(nftables.__salt__, {"nftables.flush": mock}):
|
||||
with patch.dict(nftables.__salt__, {'nftables.flush': mock}):
|
||||
ret.update({'result': False,
|
||||
'comment': 'Failed to flush salt chain'
|
||||
' in filter table: for ipv4'})
|
||||
|
@ -86,26 +89,34 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
'comment': ''}
|
||||
mock = MagicMock(return_value=[])
|
||||
with patch.object(nftables, '_STATE_INTERNAL_KEYWORDS', mock):
|
||||
mock = MagicMock(return_value='a')
|
||||
with patch.dict(nftables.__salt__, {"nftables.build_rule": mock}):
|
||||
mock = MagicMock(side_effect=[True, False, False, False])
|
||||
with patch.dict(nftables.__salt__, {"nftables.check": mock}):
|
||||
mock = MagicMock(return_value={'result': True,
|
||||
'comment': '',
|
||||
'rule': 'a'})
|
||||
with patch.dict(nftables.__salt__, {'nftables.build_rule': mock}):
|
||||
mock = MagicMock(side_effect=[{'result': True, 'comment': ''},
|
||||
{'result': False, 'comment': ''},
|
||||
{'result': False, 'comment': ''},
|
||||
{'result': False, 'comment': ''}])
|
||||
with patch.dict(nftables.__salt__, {'nftables.check': mock}):
|
||||
ret.update({'comment': 'nftables rule for salt'
|
||||
' already set (a) for ipv4'})
|
||||
self.assertDictEqual(nftables.append('salt', table='',
|
||||
chain=''), ret)
|
||||
|
||||
with patch.dict(nftables.__opts__, {"test": True}):
|
||||
with patch.dict(nftables.__opts__, {'test': True}):
|
||||
ret.update({'result': None,
|
||||
'comment': 'nftables rule for salt needs'
|
||||
' to be set (a) for ipv4'})
|
||||
self.assertDictEqual(nftables.append('salt', table='',
|
||||
chain=''), ret)
|
||||
|
||||
with patch.dict(nftables.__opts__, {"test": False}):
|
||||
mock = MagicMock(side_effect=[True, False])
|
||||
with patch.dict(nftables.__opts__, {'test': False}):
|
||||
mock = MagicMock(side_effect=[{'result': True,
|
||||
'comment': ''},
|
||||
{'result': False,
|
||||
'comment': ''}])
|
||||
with patch.dict(nftables.__salt__,
|
||||
{"nftables.append": mock}):
|
||||
{'nftables.append': mock}):
|
||||
ret.update({'changes': {'locale': 'salt'},
|
||||
'comment': 'Set nftables rule for salt'
|
||||
' to: a for ipv4',
|
||||
|
@ -118,7 +129,7 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
ret.update({'changes': {},
|
||||
'comment': 'Failed to set nftables'
|
||||
' rule for salt.\nAttempted rule was'
|
||||
' a for ipv4', 'result': False})
|
||||
' a for ipv4.\n', 'result': False})
|
||||
self.assertDictEqual(nftables.append('salt',
|
||||
table='',
|
||||
chain=''),
|
||||
|
@ -134,26 +145,34 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
'comment': ''}
|
||||
mock = MagicMock(return_value=[])
|
||||
with patch.object(nftables, '_STATE_INTERNAL_KEYWORDS', mock):
|
||||
mock = MagicMock(return_value='a')
|
||||
with patch.dict(nftables.__salt__, {"nftables.build_rule": mock}):
|
||||
mock = MagicMock(side_effect=[True, False, False, False])
|
||||
with patch.dict(nftables.__salt__, {"nftables.check": mock}):
|
||||
mock = MagicMock(return_value={'result': True,
|
||||
'comment': '',
|
||||
'rule': 'a'})
|
||||
with patch.dict(nftables.__salt__, {'nftables.build_rule': mock}):
|
||||
mock = MagicMock(side_effect=[{'result': True, 'comment': ''},
|
||||
{'result': False, 'comment': ''},
|
||||
{'result': False, 'comment': ''},
|
||||
{'result': False, 'comment': ''}])
|
||||
with patch.dict(nftables.__salt__, {'nftables.check': mock}):
|
||||
ret.update({'comment': 'nftables rule for salt already'
|
||||
' set for ipv4 (a)'})
|
||||
self.assertDictEqual(nftables.insert('salt', table='',
|
||||
chain=''), ret)
|
||||
|
||||
with patch.dict(nftables.__opts__, {"test": True}):
|
||||
with patch.dict(nftables.__opts__, {'test': True}):
|
||||
ret.update({'result': None,
|
||||
'comment': 'nftables rule for salt'
|
||||
' needs to be set for ipv4 (a)'})
|
||||
self.assertDictEqual(nftables.insert('salt', table='',
|
||||
chain=''), ret)
|
||||
|
||||
with patch.dict(nftables.__opts__, {"test": False}):
|
||||
mock = MagicMock(side_effect=[True, False])
|
||||
with patch.dict(nftables.__opts__, {'test': False}):
|
||||
mock = MagicMock(side_effect=[{'result': True,
|
||||
'comment': ''},
|
||||
{'result': False,
|
||||
'comment': ''}])
|
||||
with patch.dict(nftables.__salt__,
|
||||
{"nftables.insert": mock}):
|
||||
{'nftables.insert': mock}):
|
||||
ret.update({'changes': {'locale': 'salt'},
|
||||
'comment': 'Set nftables rule for'
|
||||
' salt to: a for ipv4',
|
||||
|
@ -185,9 +204,14 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
|
||||
mock = MagicMock(return_value=[])
|
||||
with patch.object(nftables, '_STATE_INTERNAL_KEYWORDS', mock):
|
||||
mock = MagicMock(return_value='a')
|
||||
mock = MagicMock(return_value={'result': True,
|
||||
'comment': '',
|
||||
'rule': 'a'})
|
||||
with patch.dict(nftables.__salt__, {'nftables.build_rule': mock}):
|
||||
mock = MagicMock(side_effect=[False, True, True, True])
|
||||
mock = MagicMock(side_effect=[{'result': False, 'comment': ''},
|
||||
{'result': True, 'comment': ''},
|
||||
{'result': True, 'comment': ''},
|
||||
{'result': True, 'comment': ''}])
|
||||
with patch.dict(nftables.__salt__, {'nftables.check': mock}):
|
||||
ret.update({'comment': 'nftables rule for salt'
|
||||
' already absent for ipv4 (a)',
|
||||
|
@ -205,7 +229,10 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
chain=''), ret)
|
||||
|
||||
with patch.dict(nftables.__opts__, {'test': False}):
|
||||
mock = MagicMock(side_effect=[True, False])
|
||||
mock = MagicMock(side_effect=[{'result': True,
|
||||
'comment': ''},
|
||||
{'result': False,
|
||||
'comment': ''}])
|
||||
with patch.dict(nftables.__salt__,
|
||||
{'nftables.delete': mock}):
|
||||
ret.update({'result': True,
|
||||
|
@ -239,7 +266,10 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
'comment': ''}
|
||||
mock = MagicMock(return_value=[])
|
||||
with patch.object(nftables, '_STATE_INTERNAL_KEYWORDS', mock):
|
||||
mock = MagicMock(side_effect=[False, True, True, True])
|
||||
mock = MagicMock(side_effect=[{'result': False, 'comment': ''},
|
||||
{'result': True, 'comment': ''},
|
||||
{'result': True, 'comment': ''},
|
||||
{'result': True, 'comment': ''}])
|
||||
with patch.dict(nftables.__salt__, {'nftables.check_table': mock}):
|
||||
ret.update({'comment': 'Failed to flush table in family'
|
||||
' ipv4, table does not exist.',
|
||||
|
@ -248,7 +278,9 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
table='', chain=''),
|
||||
ret)
|
||||
|
||||
mock = MagicMock(side_effect=[False, True, True])
|
||||
mock = MagicMock(side_effect=[{'result': False, 'comment': ''},
|
||||
{'result': True, 'comment': ''},
|
||||
{'result': True, 'comment': ''}])
|
||||
with patch.dict(nftables.__salt__,
|
||||
{'nftables.check_chain': mock}):
|
||||
ret.update({'comment': 'Failed to flush chain in table'
|
||||
|
@ -256,7 +288,10 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
|
|||
self.assertDictEqual(nftables.flush('salt', table='',
|
||||
chain=''), ret)
|
||||
|
||||
mock = MagicMock(side_effect=[True, False])
|
||||
mock = MagicMock(side_effect=[{'result': True,
|
||||
'comment': ''},
|
||||
{'result': False,
|
||||
'comment': ''}])
|
||||
with patch.dict(nftables.__salt__,
|
||||
{'nftables.flush': mock}):
|
||||
ret.update({'changes': {'locale': 'salt'},
|
||||
|
|
Loading…
Add table
Reference in a new issue