mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Unify reactor configuration, fix caller reactors
There are 4 types of reactor jobs, and 3 different config schemas for passing arguments: 1. local - positional and keyword args passed in arg/kwarg params, respectively. 2. runner/wheel - passed as individual params directly under the function name. 3. caller - only positional args supported, passed under an "args" param. In addition to being wildly inconsistent, there are several problems with each of the above approaches: - For local jobs, having to know which are positional and keyword arguments is not user-friendly. - For runner/wheel jobs, the fact that the arguments are all passed in the level directly below the function name means that they are dumped directly into the low chunk. This means that if any arguments are passed which conflict with the reserved keywords in the low chunk (name, order, etc.), they will override their counterparts in the low chunk, which may make the Reactor behave unpredictably. To solve these issues, this commit makes the following changes: 1. A new, unified configuration schema has been added, so that arguments are passed identically across all types of reactions. In this new schema, all arguments are passed as named arguments underneath an "args" parameter. Those named arguments are then passed as keyword arguments to the desired function. This works even for positional arguments because Python will automagically pass a keyword argument as its positional counterpart when the name of a positional argument is found in the kwargs. 2. The caller jobs now support both positional and keyword arguments. Backward-compatibility with the old configuration schema has been preserved, so old Reactor SLS files do not break. In addition, you've probably already said to yourself "Hey, caller jobs were _already_ passing their arguments under an "args" param. What gives?" Well, using the old config schema, only positional arguments were supported. So if we detect a list of positional arguments, we treat the input as positional arguments (i.e. old schema), while if the input is a dictionary (or "dictlist"), we treat the input as kwargs (i.e. new schema).
This commit is contained in:
parent
4afb179bad
commit
2a35ab7f39
1 changed files with 159 additions and 72 deletions
|
@ -7,12 +7,14 @@ import glob
|
|||
import logging
|
||||
|
||||
# Import salt libs
|
||||
import salt.client
|
||||
import salt.runner
|
||||
import salt.state
|
||||
import salt.utils
|
||||
import salt.utils.cache
|
||||
import salt.utils.event
|
||||
import salt.utils.process
|
||||
import salt.wheel
|
||||
import salt.defaults.exitcodes
|
||||
|
||||
# Import 3rd-party libs
|
||||
|
@ -21,6 +23,15 @@ import salt.ext.six as six
|
|||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
REACTOR_INTERNAL_KEYWORDS = frozenset([
|
||||
'__id__',
|
||||
'__sls__',
|
||||
'name',
|
||||
'order',
|
||||
'fun',
|
||||
'state',
|
||||
])
|
||||
|
||||
|
||||
class Reactor(salt.utils.process.SignalHandlingMultiprocessingProcess, salt.state.Compiler):
|
||||
'''
|
||||
|
@ -29,6 +40,10 @@ class Reactor(salt.utils.process.SignalHandlingMultiprocessingProcess, salt.stat
|
|||
The reactor has the capability to execute pre-programmed executions
|
||||
as reactions to events
|
||||
'''
|
||||
aliases = {
|
||||
'cmd': 'local',
|
||||
}
|
||||
|
||||
def __init__(self, opts, log_queue=None):
|
||||
super(Reactor, self).__init__(log_queue=log_queue)
|
||||
local_minion_opts = opts.copy()
|
||||
|
@ -171,6 +186,16 @@ class Reactor(salt.utils.process.SignalHandlingMultiprocessingProcess, salt.stat
|
|||
|
||||
return {'status': False, 'comment': 'Reactor does not exists.'}
|
||||
|
||||
def resolve_aliases(self, chunks):
|
||||
'''
|
||||
Preserve backward compatibility by rewriting the 'state' key in the low
|
||||
chunks if it is using a legacy type.
|
||||
'''
|
||||
for idx, _ in enumerate(chunks):
|
||||
new_state = self.aliases.get(chunks[idx]['state'])
|
||||
if new_state is not None:
|
||||
chunks[idx]['state'] = new_state
|
||||
|
||||
def reactions(self, tag, data, reactors):
|
||||
'''
|
||||
Render a list of reactor files and returns a reaction struct
|
||||
|
@ -191,6 +216,7 @@ class Reactor(salt.utils.process.SignalHandlingMultiprocessingProcess, salt.stat
|
|||
except Exception as exc:
|
||||
log.error('Exception trying to compile reactions: {0}'.format(exc), exc_info=True)
|
||||
|
||||
self.resolve_aliases(chunks)
|
||||
return chunks
|
||||
|
||||
def call_reactions(self, chunks):
|
||||
|
@ -248,12 +274,19 @@ class Reactor(salt.utils.process.SignalHandlingMultiprocessingProcess, salt.stat
|
|||
|
||||
class ReactWrap(object):
|
||||
'''
|
||||
Create a wrapper that executes low data for the reaction system
|
||||
Wrapper that executes low data for the Reactor System
|
||||
'''
|
||||
# class-wide cache of clients
|
||||
client_cache = None
|
||||
event_user = 'Reactor'
|
||||
|
||||
reaction_class = {
|
||||
'local': salt.client.LocalClient,
|
||||
'runner': salt.runner.RunnerClient,
|
||||
'wheel': salt.wheel.Wheel,
|
||||
'caller': salt.client.Caller,
|
||||
}
|
||||
|
||||
def __init__(self, opts):
|
||||
self.opts = opts
|
||||
if ReactWrap.client_cache is None:
|
||||
|
@ -264,21 +297,49 @@ class ReactWrap(object):
|
|||
queue_size=self.opts['reactor_worker_hwm'] # queue size for those workers
|
||||
)
|
||||
|
||||
def populate_client_cache(self, low):
|
||||
'''
|
||||
Populate the client cache with an instance of the specified type
|
||||
'''
|
||||
reaction_type = low['state']
|
||||
if reaction_type not in self.client_cache:
|
||||
log.debug('Reactor is populating %s client cache', reaction_type)
|
||||
if reaction_type in ('runner', 'wheel'):
|
||||
# Reaction types that run locally on the master want the full
|
||||
# opts passed.
|
||||
self.client_cache[reaction_type] = \
|
||||
self.reaction_class[reaction_type](self.opts)
|
||||
# The len() function will cause the module functions to load if
|
||||
# they aren't already loaded. We want to load them so that the
|
||||
# spawned threads don't need to load them. Loading in the
|
||||
# spawned threads creates race conditions such as sometimes not
|
||||
# finding the required function because another thread is in
|
||||
# the middle of loading the functions.
|
||||
len(self.client_cache[reaction_type].functions)
|
||||
else:
|
||||
# Reactions which use remote pubs only need the conf file when
|
||||
# instantiating a client instance.
|
||||
self.client_cache[reaction_type] = \
|
||||
self.reaction_class[reaction_type](self.opts['conf_file'])
|
||||
|
||||
def run(self, low):
|
||||
'''
|
||||
Execute the specified function in the specified state by passing the
|
||||
low data
|
||||
Execute a reaction by invoking the proper wrapper func
|
||||
'''
|
||||
l_fun = getattr(self, low['state'])
|
||||
self.populate_client_cache(low)
|
||||
try:
|
||||
f_call = salt.utils.format_call(l_fun, low)
|
||||
kwargs = f_call.get('kwargs', {})
|
||||
if 'arg' not in kwargs:
|
||||
kwargs['arg'] = []
|
||||
if 'kwarg' not in kwargs:
|
||||
kwargs['kwarg'] = {}
|
||||
l_fun = getattr(self, low['state'])
|
||||
except AttributeError:
|
||||
log.error(
|
||||
'ReactWrap is missing a wrapper function for \'%s\'',
|
||||
low['state']
|
||||
)
|
||||
|
||||
# TODO: Setting the user doesn't seem to work for actual remote publishes
|
||||
try:
|
||||
wrap_call = salt.utils.format_call(l_fun, low)
|
||||
args = wrap_call.get('args', ())
|
||||
kwargs = wrap_call.get('kwargs', {})
|
||||
# TODO: Setting user doesn't seem to work for actual remote pubs
|
||||
if low['state'] in ('runner', 'wheel'):
|
||||
# Update called function's low data with event user to
|
||||
# segregate events fired by reactor and avoid reaction loops
|
||||
|
@ -286,80 +347,106 @@ class ReactWrap(object):
|
|||
# Replace ``state`` kwarg which comes from high data compiler.
|
||||
# It breaks some runner functions and seems unnecessary.
|
||||
kwargs['__state__'] = kwargs.pop('state')
|
||||
# NOTE: if any additional keys are added here, they will also
|
||||
# need to be added to filter_kwargs()
|
||||
|
||||
l_fun(*f_call.get('args', ()), **kwargs)
|
||||
if 'args' in kwargs:
|
||||
# New configuration
|
||||
reactor_args = kwargs.pop('args')
|
||||
for item in ('arg', 'kwarg'):
|
||||
if item in low:
|
||||
log.warning(
|
||||
'Reactor \'%s\' is ignoring \'%s\' param %s due to '
|
||||
'presence of \'args\' param. Check the Reactor System '
|
||||
'documentation for the correct argument format.',
|
||||
low['__id__'], item, low[item]
|
||||
)
|
||||
if low['state'] == 'caller' \
|
||||
and isinstance(reactor_args, list) \
|
||||
and not salt.utils.is_dictlist(reactor_args):
|
||||
# Legacy 'caller' reactors were already using the 'args'
|
||||
# param, but only supported a list of positional arguments.
|
||||
# If low['args'] is a list but is *not* a dictlist, then
|
||||
# this is actually using the legacy configuration. So, put
|
||||
# the reactor args into kwarg['arg'] so that the wrapper
|
||||
# interprets them as positional args.
|
||||
kwargs['arg'] = reactor_args
|
||||
kwargs['kwarg'] = {}
|
||||
else:
|
||||
kwargs['arg'] = ()
|
||||
kwargs['kwarg'] = reactor_args
|
||||
if not isinstance(kwargs['kwarg'], dict):
|
||||
kwargs['kwarg'] = salt.utils.repack_dictlist(kwargs['kwarg'])
|
||||
if not kwargs['kwarg']:
|
||||
log.error(
|
||||
'Reactor \'%s\' failed to execute %s \'%s\': '
|
||||
'Incorrect argument format, check the Reactor System '
|
||||
'documentation for the correct format.',
|
||||
low['__id__'], low['state'], low['fun']
|
||||
)
|
||||
return
|
||||
else:
|
||||
# Legacy configuration
|
||||
react_call = {}
|
||||
if low['state'] in ('runner', 'wheel'):
|
||||
if 'arg' not in kwargs or 'kwarg' not in kwargs:
|
||||
# Runner/wheel execute on the master, so we can use
|
||||
# format_call to get the functions args/kwargs
|
||||
react_fun = self.client_cache[low['state']].functions.get(low['fun'])
|
||||
if react_fun is None:
|
||||
log.error(
|
||||
'Reactor \'%s\' failed to execute %s \'%s\': '
|
||||
'function not available',
|
||||
low['__id__'], low['state'], low['fun']
|
||||
)
|
||||
return
|
||||
|
||||
react_call = salt.utils.format_call(
|
||||
react_fun,
|
||||
low,
|
||||
expected_extra_kws=REACTOR_INTERNAL_KEYWORDS
|
||||
)
|
||||
|
||||
if 'arg' not in kwargs:
|
||||
kwargs['arg'] = react_call.get('args', ())
|
||||
if 'kwarg' not in kwargs:
|
||||
kwargs['kwarg'] = react_call.get('kwargs', {})
|
||||
|
||||
# Execute the wrapper with the proper args/kwargs. kwargs['arg']
|
||||
# and kwargs['kwarg'] contain the positional and keyword arguments
|
||||
# that will be passed to the client interface to execute the
|
||||
# desired runner/wheel/remote-exec/etc. function.
|
||||
l_fun(*args, **kwargs)
|
||||
except SystemExit:
|
||||
log.warning(
|
||||
'Reactor \'%s\' attempted to exit. Ignored.', low['__id__']
|
||||
)
|
||||
except Exception:
|
||||
log.error(
|
||||
'Failed to execute {0}: {1}\n'.format(low['state'], l_fun),
|
||||
exc_info=True
|
||||
)
|
||||
|
||||
def local(self, *args, **kwargs):
|
||||
'''
|
||||
Wrap LocalClient for running :ref:`execution modules <all-salt.modules>`
|
||||
'''
|
||||
if 'local' not in self.client_cache:
|
||||
self.client_cache['local'] = salt.client.LocalClient(self.opts['conf_file'])
|
||||
try:
|
||||
self.client_cache['local'].cmd_async(*args, **kwargs)
|
||||
except SystemExit:
|
||||
log.warning('Attempt to exit reactor. Ignored.')
|
||||
except Exception as exc:
|
||||
log.warning('Exception caught by reactor: {0}'.format(exc))
|
||||
|
||||
cmd = local
|
||||
'Reactor \'%s\' failed to execute %s \'%s\'',
|
||||
low['__id__'], low['state'], low['fun'], exc_info=True
|
||||
)
|
||||
|
||||
def runner(self, fun, **kwargs):
|
||||
'''
|
||||
Wrap RunnerClient for executing :ref:`runner modules <all-salt.runners>`
|
||||
'''
|
||||
if 'runner' not in self.client_cache:
|
||||
self.client_cache['runner'] = salt.runner.RunnerClient(self.opts)
|
||||
# The len() function will cause the module functions to load if
|
||||
# they aren't already loaded. We want to load them so that the
|
||||
# spawned threads don't need to load them. Loading in the spawned
|
||||
# threads creates race conditions such as sometimes not finding
|
||||
# the required function because another thread is in the middle
|
||||
# of loading the functions.
|
||||
len(self.client_cache['runner'].functions)
|
||||
try:
|
||||
self.pool.fire_async(self.client_cache['runner'].low, args=(fun, kwargs))
|
||||
except SystemExit:
|
||||
log.warning('Attempt to exit in reactor by runner. Ignored')
|
||||
except Exception as exc:
|
||||
log.warning('Exception caught by reactor: {0}'.format(exc))
|
||||
self.pool.fire_async(self.client_cache['runner'].low, args=(fun, kwargs))
|
||||
|
||||
def wheel(self, fun, **kwargs):
|
||||
'''
|
||||
Wrap Wheel to enable executing :ref:`wheel modules <all-salt.wheel>`
|
||||
'''
|
||||
if 'wheel' not in self.client_cache:
|
||||
self.client_cache['wheel'] = salt.wheel.Wheel(self.opts)
|
||||
# The len() function will cause the module functions to load if
|
||||
# they aren't already loaded. We want to load them so that the
|
||||
# spawned threads don't need to load them. Loading in the spawned
|
||||
# threads creates race conditions such as sometimes not finding
|
||||
# the required function because another thread is in the middle
|
||||
# of loading the functions.
|
||||
len(self.client_cache['wheel'].functions)
|
||||
try:
|
||||
self.pool.fire_async(self.client_cache['wheel'].low, args=(fun, kwargs))
|
||||
except SystemExit:
|
||||
log.warning('Attempt to in reactor by whell. Ignored.')
|
||||
except Exception as exc:
|
||||
log.warning('Exception caught by reactor: {0}'.format(exc))
|
||||
self.pool.fire_async(self.client_cache['wheel'].low, args=(fun, kwargs))
|
||||
|
||||
def caller(self, fun, *args, **kwargs):
|
||||
def local(self, fun, tgt, **kwargs):
|
||||
'''
|
||||
Wrap Caller to enable executing :ref:`caller modules <all-salt.caller>`
|
||||
Wrap LocalClient for running :ref:`execution modules <all-salt.modules>`
|
||||
'''
|
||||
log.debug("in caller with fun {0} args {1} kwargs {2}".format(fun, args, kwargs))
|
||||
args = kwargs.get('args', [])
|
||||
if 'caller' not in self.client_cache:
|
||||
self.client_cache['caller'] = salt.client.Caller(self.opts['conf_file'])
|
||||
try:
|
||||
self.client_cache['caller'].function(fun, *args)
|
||||
except SystemExit:
|
||||
log.warning('Attempt to exit reactor. Ignored.')
|
||||
except Exception as exc:
|
||||
log.warning('Exception caught by reactor: {0}'.format(exc))
|
||||
self.client_cache['local'].cmd_async(tgt, fun, **kwargs)
|
||||
|
||||
def caller(self, fun, **kwargs):
|
||||
'''
|
||||
Wrap LocalCaller to execute remote exec functions locally on the Minion
|
||||
'''
|
||||
self.client_cache['caller'].cmd(fun, *kwargs['arg'], **kwargs['kwarg'])
|
||||
|
|
Loading…
Add table
Reference in a new issue