mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Allow external_auth
limit funs/args for runner/wheel modules.
Example: external_auth: pam: thatch: - '@runner': - 'mod.fun': args: ['a', 'b'] kwargs: aa: bb
This commit is contained in:
parent
a67db1be3c
commit
7ec398eb72
5 changed files with 450 additions and 144 deletions
|
@ -93,6 +93,26 @@ By user, by minion:
|
|||
<minion compound target>:
|
||||
- <regex to match function>
|
||||
|
||||
By user, by runner/wheel:
|
||||
|
||||
.. code-block:: yaml
|
||||
|
||||
external_auth:
|
||||
<eauth backend>:
|
||||
<user or group%>:
|
||||
<@runner or @wheel>:
|
||||
- <regex to match function>
|
||||
|
||||
By user, by runner+wheel module:
|
||||
|
||||
.. code-block:: yaml
|
||||
|
||||
external_auth:
|
||||
<eauth backend>:
|
||||
<user or group%>:
|
||||
<@module_name>:
|
||||
- <regex to match function without module_name>
|
||||
|
||||
Groups
|
||||
------
|
||||
|
||||
|
@ -127,6 +147,14 @@ Positional arguments or keyword arguments to functions can also be whitelisted.
|
|||
kwargs:
|
||||
'kwa': 'kwa.*'
|
||||
'kwb': 'kwb'
|
||||
- '@runner':
|
||||
- 'runner_mod.*':
|
||||
args:
|
||||
- 'a.*'
|
||||
- 'b.*'
|
||||
kwargs:
|
||||
'kwa': 'kwa.*'
|
||||
'kwb': 'kwb'
|
||||
|
||||
The rules:
|
||||
|
||||
|
|
|
@ -1071,7 +1071,7 @@ class LocalFuncs(object):
|
|||
'for user {0}.').format(username)))
|
||||
auth_list = self.loadauth.get_auth_list(load)
|
||||
|
||||
if not self.ckminions.runner_check(auth_list, load['fun']):
|
||||
if not self.ckminions.runner_check(auth_list, load['fun'], load['kwarg']):
|
||||
return dict(error=dict(name=err_name,
|
||||
message=('Authentication failure of type "{0}" occurred '
|
||||
'for user {1}.').format(auth_type, username)))
|
||||
|
@ -1127,7 +1127,7 @@ class LocalFuncs(object):
|
|||
'user {0}.').format(username)))
|
||||
|
||||
if auth_type != 'user':
|
||||
if not self.ckminions.wheel_check(auth_list, load['fun']):
|
||||
if not self.ckminions.wheel_check(auth_list, load['fun'], load['kwarg']):
|
||||
return dict(error=dict(name=err_name,
|
||||
message=('Authentication failure of type "{0}" occurred for '
|
||||
'user {1}.').format(auth_type, username)))
|
||||
|
|
|
@ -1684,7 +1684,7 @@ class ClearFuncs(object):
|
|||
clear_load[u'username'] = token[u'name']
|
||||
auth_list = self.loadauth.get_auth_list(clear_load)
|
||||
|
||||
if not self.ckminions.runner_check(auth_list, clear_load[u'fun']):
|
||||
if not self.ckminions.runner_check(auth_list, clear_load[u'fun'], clear_load[u'kwarg']):
|
||||
return dict(error=dict(name=u'TokenAuthenticationError',
|
||||
message=(u'Authentication failure of type "token" occurred for '
|
||||
u'user {0}.').format(token[u'name'])))
|
||||
|
@ -1697,7 +1697,7 @@ class ClearFuncs(object):
|
|||
u'user {0}.').format(clear_load.get(u'username', u'UNKNOWN'))))
|
||||
|
||||
auth_list = self.loadauth.get_auth_list(clear_load)
|
||||
if not self.ckminions.runner_check(auth_list, clear_load[u'fun']):
|
||||
if not self.ckminions.runner_check(auth_list, clear_load[u'fun'], clear_load[u'kwarg']):
|
||||
return dict(error=dict(name=u'EauthAuthenticationError',
|
||||
message=(u'Authentication failure of type "eauth" occurred for '
|
||||
u'user {0}.').format(clear_load.get(u'username', u'UNKNOWN'))))
|
||||
|
@ -1751,7 +1751,7 @@ class ClearFuncs(object):
|
|||
clear_load[u'eauth'] = token[u'eauth']
|
||||
clear_load[u'username'] = token[u'name']
|
||||
auth_list = self.loadauth.get_auth_list(clear_load)
|
||||
if not self.ckminions.wheel_check(auth_list, clear_load[u'fun']):
|
||||
if not self.ckminions.wheel_check(auth_list, clear_load[u'fun'], clear_load[u'kwarg']):
|
||||
return dict(error=dict(name=u'TokenAuthenticationError',
|
||||
message=(u'Authentication failure of type "token" occurred for '
|
||||
u'user {0}.').format(token[u'name'])))
|
||||
|
@ -1764,7 +1764,7 @@ class ClearFuncs(object):
|
|||
u'user {0}.').format(clear_load.get(u'username', u'UNKNOWN'))))
|
||||
|
||||
auth_list = self.loadauth.get_auth_list(clear_load)
|
||||
if not self.ckminions.wheel_check(auth_list, clear_load[u'fun']):
|
||||
if not self.ckminions.wheel_check(auth_list, clear_load[u'fun'], clear_load[u'kwarg']):
|
||||
return dict(error=dict(name=u'EauthAuthenticationError',
|
||||
message=(u'Authentication failure of type "eauth" occurred for '
|
||||
u'user {0}.').format(clear_load.get(u'username', u'UNKNOWN'))))
|
||||
|
|
|
@ -733,6 +733,7 @@ class CkMinions(object):
|
|||
return self.spec_check(
|
||||
auth_list,
|
||||
fun,
|
||||
arg,
|
||||
form)
|
||||
|
||||
def auth_check_expanded(self,
|
||||
|
@ -902,73 +903,15 @@ class CkMinions(object):
|
|||
tgt_type,
|
||||
minions=minions):
|
||||
# Minions are allowed, verify function in allowed list
|
||||
if isinstance(ind[valid], six.string_types):
|
||||
if self.match_check(ind[valid], fun):
|
||||
return True
|
||||
elif isinstance(ind[valid], list):
|
||||
for cond in ind[valid]:
|
||||
# Function name match
|
||||
if isinstance(cond, six.string_types):
|
||||
if self.match_check(cond, fun):
|
||||
return True
|
||||
# Function and args match
|
||||
elif isinstance(cond, dict):
|
||||
if len(cond) != 1:
|
||||
# Invalid argument
|
||||
continue
|
||||
fcond = next(six.iterkeys(cond))
|
||||
# cond: {
|
||||
# 'mod.func': {
|
||||
# 'args': [
|
||||
# 'one.*', 'two\\|three'],
|
||||
# 'kwargs': {
|
||||
# 'functioin': 'teach\\|feed',
|
||||
# 'user': 'mother\\|father'
|
||||
# }
|
||||
# }
|
||||
# }
|
||||
if self.match_check(fcond,
|
||||
fun): # check key that is function name match
|
||||
acond = cond[fcond]
|
||||
if not isinstance(acond, dict):
|
||||
# Invalid argument
|
||||
continue
|
||||
# whitelist args, kwargs
|
||||
arg_list = args[num]
|
||||
cond_args = acond.get('args', [])
|
||||
good = True
|
||||
for i, cond_arg in enumerate(cond_args):
|
||||
if len(arg_list) <= i:
|
||||
good = False
|
||||
break
|
||||
if cond_arg is None: # None == '.*' i.e. allow any
|
||||
continue
|
||||
if not self.match_check(cond_arg,
|
||||
arg_list[i]):
|
||||
good = False
|
||||
break
|
||||
if not good:
|
||||
continue
|
||||
# Check kwargs
|
||||
cond_kwargs = acond.get('kwargs', {})
|
||||
arg_kwargs = {}
|
||||
for a in arg_list:
|
||||
if isinstance(a,
|
||||
dict) and '__kwarg__' in a:
|
||||
arg_kwargs = a
|
||||
break
|
||||
for k, v in six.iteritems(cond_kwargs):
|
||||
if k not in arg_kwargs:
|
||||
good = False
|
||||
break
|
||||
if v is None: # None == '.*' i.e. allow any
|
||||
continue
|
||||
if not self.match_check(v,
|
||||
arg_kwargs[k]):
|
||||
good = False
|
||||
break
|
||||
if good:
|
||||
return True
|
||||
fun_args = args[num]
|
||||
fun_kwargs = fun_args[-1] if fun_args else None
|
||||
if isinstance(fun_kwargs, dict) and '__kwarg__' in fun_kwargs:
|
||||
fun_args = list(fun_args) # copy on modify
|
||||
del fun_args[-1]
|
||||
else:
|
||||
fun_kwargs = None
|
||||
if self.__fun_check(ind[valid], fun, fun_args, fun_kwargs):
|
||||
return True
|
||||
except TypeError:
|
||||
return False
|
||||
return False
|
||||
|
@ -987,69 +930,19 @@ class CkMinions(object):
|
|||
auth_list.append(matcher)
|
||||
return auth_list
|
||||
|
||||
def wheel_check(self, auth_list, fun):
|
||||
def wheel_check(self, auth_list, fun, args):
|
||||
'''
|
||||
Check special API permissions
|
||||
'''
|
||||
comps = fun.split('.')
|
||||
if len(comps) != 2:
|
||||
return False
|
||||
mod = comps[0]
|
||||
fun = comps[1]
|
||||
for ind in auth_list:
|
||||
if isinstance(ind, six.string_types):
|
||||
if ind.startswith('@') and ind[1:] == mod:
|
||||
return True
|
||||
if ind == '@wheel':
|
||||
return True
|
||||
if ind == '@wheels':
|
||||
return True
|
||||
elif isinstance(ind, dict):
|
||||
if len(ind) != 1:
|
||||
continue
|
||||
valid = next(six.iterkeys(ind))
|
||||
if valid.startswith('@') and valid[1:] == mod:
|
||||
if isinstance(ind[valid], six.string_types):
|
||||
if self.match_check(ind[valid], fun):
|
||||
return True
|
||||
elif isinstance(ind[valid], list):
|
||||
for regex in ind[valid]:
|
||||
if self.match_check(regex, fun):
|
||||
return True
|
||||
return False
|
||||
return self.spec_check(auth_list, fun, args, 'wheel')
|
||||
|
||||
def runner_check(self, auth_list, fun):
|
||||
def runner_check(self, auth_list, fun, args):
|
||||
'''
|
||||
Check special API permissions
|
||||
'''
|
||||
comps = fun.split('.')
|
||||
if len(comps) != 2:
|
||||
return False
|
||||
mod = comps[0]
|
||||
fun = comps[1]
|
||||
for ind in auth_list:
|
||||
if isinstance(ind, six.string_types):
|
||||
if ind.startswith('@') and ind[1:] == mod:
|
||||
return True
|
||||
if ind == '@runners':
|
||||
return True
|
||||
if ind == '@runner':
|
||||
return True
|
||||
elif isinstance(ind, dict):
|
||||
if len(ind) != 1:
|
||||
continue
|
||||
valid = next(six.iterkeys(ind))
|
||||
if valid.startswith('@') and valid[1:] == mod:
|
||||
if isinstance(ind[valid], six.string_types):
|
||||
if self.match_check(ind[valid], fun):
|
||||
return True
|
||||
elif isinstance(ind[valid], list):
|
||||
for regex in ind[valid]:
|
||||
if self.match_check(regex, fun):
|
||||
return True
|
||||
return False
|
||||
return self.spec_check(auth_list, fun, args, 'runner')
|
||||
|
||||
def spec_check(self, auth_list, fun, form):
|
||||
def spec_check(self, auth_list, fun, args, form):
|
||||
'''
|
||||
Check special API permissions
|
||||
'''
|
||||
|
@ -1057,30 +950,87 @@ class CkMinions(object):
|
|||
comps = fun.split('.')
|
||||
if len(comps) != 2:
|
||||
return False
|
||||
mod = comps[0]
|
||||
fun = comps[1]
|
||||
mod_name = comps[0]
|
||||
fun_name = comps[1]
|
||||
else:
|
||||
mod = fun
|
||||
fun_name = mod_name = fun
|
||||
for ind in auth_list:
|
||||
if isinstance(ind, six.string_types):
|
||||
if ind.startswith('@') and ind[1:] == mod:
|
||||
return True
|
||||
if ind == '@{0}'.format(form):
|
||||
return True
|
||||
if ind == '@{0}s'.format(form):
|
||||
return True
|
||||
if ind[0] == '@':
|
||||
if ind[1:] == mod_name or ind[1:] == form or ind == '@{0}s'.format(form):
|
||||
return True
|
||||
elif isinstance(ind, dict):
|
||||
if len(ind) != 1:
|
||||
continue
|
||||
valid = next(six.iterkeys(ind))
|
||||
if valid.startswith('@') and valid[1:] == mod:
|
||||
if isinstance(ind[valid], six.string_types):
|
||||
if self.match_check(ind[valid], fun):
|
||||
if valid[0] == '@':
|
||||
if valid[1:] == mod_name:
|
||||
if self.__fun_check(ind[valid], fun_name, args.get('arg'), args.get('kwarg')):
|
||||
return True
|
||||
elif isinstance(ind[valid], list):
|
||||
for regex in ind[valid]:
|
||||
if self.match_check(regex, fun):
|
||||
return True
|
||||
if valid[1:] == form or valid == '@{0}s'.format(form):
|
||||
if self.__fun_check(ind[valid], fun, args.get('arg'), args.get('kwarg')):
|
||||
return True
|
||||
return False
|
||||
|
||||
def __fun_check(self, valid, fun, args=None, kwargs=None):
|
||||
'''
|
||||
Check the given function name (fun) and its arguments (args) against the list of conditions.
|
||||
'''
|
||||
if not isinstance(valid, list):
|
||||
valid = [valid]
|
||||
for cond in valid:
|
||||
# Function name match
|
||||
if isinstance(cond, six.string_types):
|
||||
if self.match_check(cond, fun):
|
||||
return True
|
||||
# Function and args match
|
||||
elif isinstance(cond, dict):
|
||||
if len(cond) != 1:
|
||||
# Invalid argument
|
||||
continue
|
||||
fname_cond = next(six.iterkeys(cond))
|
||||
if self.match_check(fname_cond, fun): # check key that is function name match
|
||||
if self.__args_check(cond[fname_cond], args, kwargs):
|
||||
return True
|
||||
return False
|
||||
|
||||
def __args_check(self, valid, args=None, kwargs=None):
|
||||
'''
|
||||
valid is a dicts: {'args': [...], 'kwargs': {...}} or a list of such dicts.
|
||||
'''
|
||||
if not isinstance(valid, list):
|
||||
valid = [valid]
|
||||
for cond in valid:
|
||||
if not isinstance(cond, dict):
|
||||
# Invalid argument
|
||||
continue
|
||||
# whitelist args, kwargs
|
||||
cond_args = cond.get('args', [])
|
||||
good = True
|
||||
for i, cond_arg in enumerate(cond_args):
|
||||
if args is None or len(args) <= i:
|
||||
good = False
|
||||
break
|
||||
if cond_arg is None: # None == '.*' i.e. allow any
|
||||
continue
|
||||
if not self.match_check(cond_arg, str(args[i])):
|
||||
good = False
|
||||
break
|
||||
if not good:
|
||||
continue
|
||||
# Check kwargs
|
||||
cond_kwargs = cond.get('kwargs', {})
|
||||
for k, v in six.iteritems(cond_kwargs):
|
||||
if kwargs is None or k not in kwargs:
|
||||
good = False
|
||||
break
|
||||
if v is None: # None == '.*' i.e. allow any
|
||||
continue
|
||||
if not self.match_check(v, str(kwargs[k])):
|
||||
good = False
|
||||
break
|
||||
if good:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
|
|
|
@ -8,6 +8,10 @@ import salt.utils.minions as minions
|
|||
|
||||
# Import Salt Testing Libs
|
||||
from tests.support.unit import TestCase
|
||||
from tests.support.mock import (
|
||||
patch,
|
||||
MagicMock,
|
||||
)
|
||||
|
||||
NODEGROUPS = {
|
||||
'group1': 'L@host1,host2,host3',
|
||||
|
@ -36,3 +40,327 @@ class MinionsTestCase(TestCase):
|
|||
expected = EXPECTED[nodegroup]
|
||||
ret = minions.nodegroup_comp(nodegroup, NODEGROUPS)
|
||||
self.assertEqual(ret, expected)
|
||||
|
||||
|
||||
class CkMinionsTestCase(TestCase):
|
||||
'''
|
||||
TestCase for salt.utils.minions.CkMinions class
|
||||
'''
|
||||
def setUp(self):
|
||||
self.ckminions = minions.CkMinions({})
|
||||
|
||||
def test_spec_check(self):
|
||||
# Test spec-only rule
|
||||
auth_list = ['@runner']
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', {}, 'runner')
|
||||
self.assertTrue(ret)
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', {}, 'wheel')
|
||||
self.assertFalse(ret)
|
||||
ret = self.ckminions.spec_check(auth_list, 'testarg', {}, 'runner')
|
||||
self.assertFalse(ret)
|
||||
|
||||
# Test spec in plural form
|
||||
auth_list = ['@runners']
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', {}, 'runner')
|
||||
self.assertTrue(ret)
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', {}, 'wheel')
|
||||
self.assertFalse(ret)
|
||||
|
||||
# Test spec with module.function restriction
|
||||
auth_list = [{'@runner': 'test.arg'}]
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', {}, 'runner')
|
||||
self.assertTrue(ret)
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', {}, 'wheel')
|
||||
self.assertFalse(ret)
|
||||
ret = self.ckminions.spec_check(auth_list, 'tes.arg', {}, 'runner')
|
||||
self.assertFalse(ret)
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.ar', {}, 'runner')
|
||||
self.assertFalse(ret)
|
||||
|
||||
# Test function name is a regex
|
||||
auth_list = [{'@runner': 'test.arg.*some'}]
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', {}, 'runner')
|
||||
self.assertFalse(ret)
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.argsome', {}, 'runner')
|
||||
self.assertTrue(ret)
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg_aaa_some', {}, 'runner')
|
||||
self.assertTrue(ret)
|
||||
|
||||
# Test a list of funcs
|
||||
auth_list = [{'@runner': ['test.arg', 'jobs.active']}]
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', {}, 'runner')
|
||||
self.assertTrue(ret)
|
||||
ret = self.ckminions.spec_check(auth_list, 'jobs.active', {}, 'runner')
|
||||
self.assertTrue(ret)
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.active', {}, 'runner')
|
||||
self.assertFalse(ret)
|
||||
ret = self.ckminions.spec_check(auth_list, 'jobs.arg', {}, 'runner')
|
||||
self.assertFalse(ret)
|
||||
|
||||
# Test args-kwargs rules
|
||||
auth_list = [{
|
||||
'@runner': {
|
||||
'test.arg': {
|
||||
'args': ['1', '2'],
|
||||
'kwargs': {
|
||||
'aaa': 'bbb',
|
||||
'ccc': 'ddd'
|
||||
}
|
||||
}
|
||||
}
|
||||
}]
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', {}, 'runner')
|
||||
self.assertFalse(ret)
|
||||
args = {
|
||||
'arg': ['1', '2'],
|
||||
'kwarg': {'aaa': 'bbb', 'ccc': 'ddd'}
|
||||
}
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
|
||||
self.assertTrue(ret)
|
||||
args = {
|
||||
'arg': ['1', '2', '3'],
|
||||
'kwarg': {'aaa': 'bbb', 'ccc': 'ddd'}
|
||||
}
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
|
||||
self.assertTrue(ret)
|
||||
args = {
|
||||
'arg': ['1', '2'],
|
||||
'kwarg': {'aaa': 'bbb', 'ccc': 'ddd', 'zzz': 'zzz'}
|
||||
}
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
|
||||
self.assertTrue(ret)
|
||||
args = {
|
||||
'arg': ['1', '2'],
|
||||
'kwarg': {'aaa': 'bbb', 'ccc': 'ddc'}
|
||||
}
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
|
||||
self.assertFalse(ret)
|
||||
args = {
|
||||
'arg': ['1', '2'],
|
||||
'kwarg': {'aaa': 'bbb'}
|
||||
}
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
|
||||
self.assertFalse(ret)
|
||||
args = {
|
||||
'arg': ['1', '3'],
|
||||
'kwarg': {'aaa': 'bbb', 'ccc': 'ddd'}
|
||||
}
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
|
||||
self.assertFalse(ret)
|
||||
args = {
|
||||
'arg': ['1'],
|
||||
'kwarg': {'aaa': 'bbb', 'ccc': 'ddd'}
|
||||
}
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
|
||||
self.assertFalse(ret)
|
||||
args = {
|
||||
'kwarg': {'aaa': 'bbb', 'ccc': 'ddd'}
|
||||
}
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
|
||||
self.assertFalse(ret)
|
||||
args = {
|
||||
'arg': ['1', '2'],
|
||||
}
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
|
||||
self.assertFalse(ret)
|
||||
|
||||
# Test kwargs only
|
||||
auth_list = [{
|
||||
'@runner': {
|
||||
'test.arg': {
|
||||
'kwargs': {
|
||||
'aaa': 'bbb',
|
||||
'ccc': 'ddd'
|
||||
}
|
||||
}
|
||||
}
|
||||
}]
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', {}, 'runner')
|
||||
self.assertFalse(ret)
|
||||
args = {
|
||||
'arg': ['1', '2'],
|
||||
'kwarg': {'aaa': 'bbb', 'ccc': 'ddd'}
|
||||
}
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
|
||||
self.assertTrue(ret)
|
||||
|
||||
# Test args only
|
||||
auth_list = [{
|
||||
'@runner': {
|
||||
'test.arg': {
|
||||
'args': ['1', '2']
|
||||
}
|
||||
}
|
||||
}]
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', {}, 'runner')
|
||||
self.assertFalse(ret)
|
||||
args = {
|
||||
'arg': ['1', '2'],
|
||||
'kwarg': {'aaa': 'bbb', 'ccc': 'ddd'}
|
||||
}
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
|
||||
self.assertTrue(ret)
|
||||
|
||||
# Test list of args
|
||||
auth_list = [{'@runner': [{'test.arg': [{'args': ['1', '2'],
|
||||
'kwargs': {'aaa': 'bbb',
|
||||
'ccc': 'ddd'
|
||||
}
|
||||
},
|
||||
{'args': ['2', '3'],
|
||||
'kwargs': {'aaa': 'aaa',
|
||||
'ccc': 'ccc'
|
||||
}
|
||||
}]
|
||||
}]
|
||||
}]
|
||||
args = {
|
||||
'arg': ['1', '2'],
|
||||
'kwarg': {'aaa': 'bbb', 'ccc': 'ddd'}
|
||||
}
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
|
||||
self.assertTrue(ret)
|
||||
args = {
|
||||
'arg': ['2', '3'],
|
||||
'kwarg': {'aaa': 'aaa', 'ccc': 'ccc'}
|
||||
}
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
|
||||
self.assertTrue(ret)
|
||||
|
||||
# Test @module form
|
||||
auth_list = ['@jobs']
|
||||
ret = self.ckminions.spec_check(auth_list, 'jobs.active', {}, 'runner')
|
||||
self.assertTrue(ret)
|
||||
ret = self.ckminions.spec_check(auth_list, 'jobs.active', {}, 'wheel')
|
||||
self.assertTrue(ret)
|
||||
ret = self.ckminions.spec_check(auth_list, 'test.arg', {}, 'runner')
|
||||
self.assertFalse(ret)
|
||||
ret = self.ckminions.spec_check(auth_list, 'job.arg', {}, 'runner')
|
||||
self.assertFalse(ret)
|
||||
|
||||
# Test @module: function
|
||||
auth_list = [{'@jobs': 'active'}]
|
||||
ret = self.ckminions.spec_check(auth_list, 'jobs.active', {}, 'runner')
|
||||
self.assertTrue(ret)
|
||||
ret = self.ckminions.spec_check(auth_list, 'jobs.active', {}, 'wheel')
|
||||
self.assertTrue(ret)
|
||||
ret = self.ckminions.spec_check(auth_list, 'jobs.active_jobs', {}, 'runner')
|
||||
self.assertTrue(ret)
|
||||
ret = self.ckminions.spec_check(auth_list, 'jobs.activ', {}, 'runner')
|
||||
self.assertFalse(ret)
|
||||
|
||||
# Test @module: [functions]
|
||||
auth_list = [{'@jobs': ['active', 'li']}]
|
||||
ret = self.ckminions.spec_check(auth_list, 'jobs.active', {}, 'runner')
|
||||
self.assertTrue(ret)
|
||||
ret = self.ckminions.spec_check(auth_list, 'jobs.list_jobs', {}, 'runner')
|
||||
self.assertTrue(ret)
|
||||
ret = self.ckminions.spec_check(auth_list, 'jobs.last_run', {}, 'runner')
|
||||
self.assertFalse(ret)
|
||||
|
||||
# Test @module: function with args
|
||||
auth_list = [{'@jobs': {'active': {'args': ['1', '2'],
|
||||
'kwargs': {'a': 'b', 'c': 'd'}}}}]
|
||||
args = {'arg': ['1', '2'],
|
||||
'kwarg': {'a': 'b', 'c': 'd'}}
|
||||
ret = self.ckminions.spec_check(auth_list, 'jobs.active', args, 'runner')
|
||||
self.assertTrue(ret)
|
||||
ret = self.ckminions.spec_check(auth_list, 'jobs.active', {}, 'runner')
|
||||
self.assertFalse(ret)
|
||||
|
||||
@patch('salt.utils.minions.CkMinions._pki_minions', MagicMock(return_value=['alpha', 'beta', 'gamma']))
|
||||
def test_auth_check(self):
|
||||
# Test function-only rule
|
||||
auth_list = ['test.ping']
|
||||
ret = self.ckminions.auth_check(auth_list, 'test.ping', None, 'alpha')
|
||||
self.assertTrue(ret)
|
||||
ret = self.ckminions.auth_check(auth_list, 'test.arg', None, 'alpha')
|
||||
self.assertFalse(ret)
|
||||
|
||||
# Test minion and function
|
||||
auth_list = [{'alpha': 'test.ping'}]
|
||||
ret = self.ckminions.auth_check(auth_list, 'test.ping', None, 'alpha')
|
||||
self.assertTrue(ret)
|
||||
ret = self.ckminions.auth_check(auth_list, 'test.arg', None, 'alpha')
|
||||
self.assertFalse(ret)
|
||||
ret = self.ckminions.auth_check(auth_list, 'test.ping', None, 'beta')
|
||||
self.assertFalse(ret)
|
||||
|
||||
# Test function list
|
||||
auth_list = [{'*': ['test.*', 'saltutil.cmd']}]
|
||||
ret = self.ckminions.auth_check(auth_list, 'test.arg', None, 'alpha')
|
||||
self.assertTrue(ret)
|
||||
ret = self.ckminions.auth_check(auth_list, 'test.ping', None, 'beta')
|
||||
self.assertTrue(ret)
|
||||
ret = self.ckminions.auth_check(auth_list, 'saltutil.cmd', None, 'gamma')
|
||||
self.assertTrue(ret)
|
||||
ret = self.ckminions.auth_check(auth_list, 'saltutil.running', None, 'gamma')
|
||||
self.assertFalse(ret)
|
||||
|
||||
# Test an args and kwargs rule
|
||||
auth_list = [{
|
||||
'alpha': {
|
||||
'test.arg': {
|
||||
'args': ['1', '2'],
|
||||
'kwargs': {
|
||||
'aaa': 'bbb',
|
||||
'ccc': 'ddd'
|
||||
}
|
||||
}
|
||||
}
|
||||
}]
|
||||
ret = self.ckminions.auth_check(auth_list, 'test.arg', None, 'runner')
|
||||
self.assertFalse(ret)
|
||||
ret = self.ckminions.auth_check(auth_list, 'test.arg', [], 'runner')
|
||||
self.assertFalse(ret)
|
||||
args = ['1', '2', {'aaa': 'bbb', 'ccc': 'ddd', '__kwarg__': True}]
|
||||
ret = self.ckminions.auth_check(auth_list, 'test.arg', args, 'runner')
|
||||
self.assertTrue(ret)
|
||||
args = ['1', '2', '3', {'aaa': 'bbb', 'ccc': 'ddd', 'eee': 'fff', '__kwarg__': True}]
|
||||
ret = self.ckminions.auth_check(auth_list, 'test.arg', args, 'runner')
|
||||
self.assertTrue(ret)
|
||||
args = ['1', {'aaa': 'bbb', 'ccc': 'ddd', '__kwarg__': True}]
|
||||
ret = self.ckminions.auth_check(auth_list, 'test.arg', args, 'runner')
|
||||
self.assertFalse(ret)
|
||||
args = ['1', '2', {'aaa': 'bbb', '__kwarg__': True}]
|
||||
ret = self.ckminions.auth_check(auth_list, 'test.arg', args, 'runner')
|
||||
self.assertFalse(ret)
|
||||
args = ['1', '3', {'aaa': 'bbb', 'ccc': 'ddd', '__kwarg__': True}]
|
||||
ret = self.ckminions.auth_check(auth_list, 'test.arg', args, 'runner')
|
||||
self.assertFalse(ret)
|
||||
args = ['1', '2', {'aaa': 'bbb', 'ccc': 'fff', '__kwarg__': True}]
|
||||
ret = self.ckminions.auth_check(auth_list, 'test.arg', args, 'runner')
|
||||
self.assertFalse(ret)
|
||||
|
||||
# Test kwargs only rule
|
||||
auth_list = [{
|
||||
'alpha': {
|
||||
'test.arg': {
|
||||
'kwargs': {
|
||||
'aaa': 'bbb',
|
||||
'ccc': 'ddd'
|
||||
}
|
||||
}
|
||||
}
|
||||
}]
|
||||
args = ['1', '2', {'aaa': 'bbb', 'ccc': 'ddd', '__kwarg__': True}]
|
||||
ret = self.ckminions.auth_check(auth_list, 'test.arg', args, 'runner')
|
||||
self.assertTrue(ret)
|
||||
args = [{'aaa': 'bbb', 'ccc': 'ddd', 'eee': 'fff', '__kwarg__': True}]
|
||||
ret = self.ckminions.auth_check(auth_list, 'test.arg', args, 'runner')
|
||||
self.assertTrue(ret)
|
||||
|
||||
# Test args only rule
|
||||
auth_list = [{
|
||||
'alpha': {
|
||||
'test.arg': {
|
||||
'args': ['1', '2'],
|
||||
}
|
||||
}
|
||||
}]
|
||||
args = ['1', '2', {'aaa': 'bbb', 'ccc': 'ddd', '__kwarg__': True}]
|
||||
ret = self.ckminions.auth_check(auth_list, 'test.arg', args, 'runner')
|
||||
self.assertTrue(ret)
|
||||
args = ['1', '2']
|
||||
ret = self.ckminions.auth_check(auth_list, 'test.arg', args, 'runner')
|
||||
self.assertTrue(ret)
|
||||
|
|
Loading…
Add table
Reference in a new issue