Unify reactor configuration, fix caller reactors

There are 4 types of reactor jobs, and 3 different config schemas for
passing arguments:

1. local - positional and keyword args passed in arg/kwarg params,
   respectively.
2. runner/wheel - passed as individual params directly under the
   function name.
3. caller - only positional args supported, passed under an "args"
   param.

In addition to being wildly inconsistent, there are several problems
with each of the above approaches:

- For local jobs, having to know which are positional and keyword
  arguments is not user-friendly.
- For runner/wheel jobs, the fact that the arguments are all passed in
  the level directly below the function name means that they are dumped
  directly into the low chunk. This means that if any arguments are
  passed which conflict with the reserved keywords in the low chunk
  (name, order, etc.), they will override their counterparts in the low
  chunk, which may make the Reactor behave unpredictably.

To solve these issues, this commit makes the following changes:

1. A new, unified configuration schema has been added, so that arguments
   are passed identically across all types of reactions. In this new
   schema, all arguments are passed as named arguments underneath an
   "args" parameter. Those named arguments are then passed as keyword
   arguments to the desired function. This works even for positional
   arguments because Python will automagically pass a keyword argument
   as its positional counterpart when the name of a positional argument
   is found in the kwargs.
2. The caller jobs now support both positional and keyword arguments.

Backward-compatibility with the old configuration schema has been
preserved, so old Reactor SLS files do not break. In addition, you've
probably already said to yourself "Hey, caller jobs were _already_
passing their arguments under an "args" param. What gives?" Well, using
the old config schema, only positional arguments were supported. So if
we detect a list of positional arguments, we treat the input as
positional arguments (i.e. old schema), while if the input is a
dictionary (or "dictlist"), we treat the input as kwargs (i.e. new
schema).
This commit is contained in:
Erik Johnson 2017-08-31 00:24:11 -05:00 committed by rallytime
parent 4afb179bad
commit 2a35ab7f39

View file

@ -7,12 +7,14 @@ import glob
import logging
# Import salt libs
import salt.client
import salt.runner
import salt.state
import salt.utils
import salt.utils.cache
import salt.utils.event
import salt.utils.process
import salt.wheel
import salt.defaults.exitcodes
# Import 3rd-party libs
@ -21,6 +23,15 @@ import salt.ext.six as six
log = logging.getLogger(__name__)
REACTOR_INTERNAL_KEYWORDS = frozenset([
'__id__',
'__sls__',
'name',
'order',
'fun',
'state',
])
class Reactor(salt.utils.process.SignalHandlingMultiprocessingProcess, salt.state.Compiler):
'''
@ -29,6 +40,10 @@ class Reactor(salt.utils.process.SignalHandlingMultiprocessingProcess, salt.stat
The reactor has the capability to execute pre-programmed executions
as reactions to events
'''
aliases = {
'cmd': 'local',
}
def __init__(self, opts, log_queue=None):
super(Reactor, self).__init__(log_queue=log_queue)
local_minion_opts = opts.copy()
@ -171,6 +186,16 @@ class Reactor(salt.utils.process.SignalHandlingMultiprocessingProcess, salt.stat
return {'status': False, 'comment': 'Reactor does not exists.'}
def resolve_aliases(self, chunks):
'''
Preserve backward compatibility by rewriting the 'state' key in the low
chunks if it is using a legacy type.
'''
for idx, _ in enumerate(chunks):
new_state = self.aliases.get(chunks[idx]['state'])
if new_state is not None:
chunks[idx]['state'] = new_state
def reactions(self, tag, data, reactors):
'''
Render a list of reactor files and returns a reaction struct
@ -191,6 +216,7 @@ class Reactor(salt.utils.process.SignalHandlingMultiprocessingProcess, salt.stat
except Exception as exc:
log.error('Exception trying to compile reactions: {0}'.format(exc), exc_info=True)
self.resolve_aliases(chunks)
return chunks
def call_reactions(self, chunks):
@ -248,12 +274,19 @@ class Reactor(salt.utils.process.SignalHandlingMultiprocessingProcess, salt.stat
class ReactWrap(object):
'''
Create a wrapper that executes low data for the reaction system
Wrapper that executes low data for the Reactor System
'''
# class-wide cache of clients
client_cache = None
event_user = 'Reactor'
reaction_class = {
'local': salt.client.LocalClient,
'runner': salt.runner.RunnerClient,
'wheel': salt.wheel.Wheel,
'caller': salt.client.Caller,
}
def __init__(self, opts):
self.opts = opts
if ReactWrap.client_cache is None:
@ -264,21 +297,49 @@ class ReactWrap(object):
queue_size=self.opts['reactor_worker_hwm'] # queue size for those workers
)
def populate_client_cache(self, low):
'''
Populate the client cache with an instance of the specified type
'''
reaction_type = low['state']
if reaction_type not in self.client_cache:
log.debug('Reactor is populating %s client cache', reaction_type)
if reaction_type in ('runner', 'wheel'):
# Reaction types that run locally on the master want the full
# opts passed.
self.client_cache[reaction_type] = \
self.reaction_class[reaction_type](self.opts)
# The len() function will cause the module functions to load if
# they aren't already loaded. We want to load them so that the
# spawned threads don't need to load them. Loading in the
# spawned threads creates race conditions such as sometimes not
# finding the required function because another thread is in
# the middle of loading the functions.
len(self.client_cache[reaction_type].functions)
else:
# Reactions which use remote pubs only need the conf file when
# instantiating a client instance.
self.client_cache[reaction_type] = \
self.reaction_class[reaction_type](self.opts['conf_file'])
def run(self, low):
'''
Execute the specified function in the specified state by passing the
low data
Execute a reaction by invoking the proper wrapper func
'''
l_fun = getattr(self, low['state'])
self.populate_client_cache(low)
try:
f_call = salt.utils.format_call(l_fun, low)
kwargs = f_call.get('kwargs', {})
if 'arg' not in kwargs:
kwargs['arg'] = []
if 'kwarg' not in kwargs:
kwargs['kwarg'] = {}
l_fun = getattr(self, low['state'])
except AttributeError:
log.error(
'ReactWrap is missing a wrapper function for \'%s\'',
low['state']
)
# TODO: Setting the user doesn't seem to work for actual remote publishes
try:
wrap_call = salt.utils.format_call(l_fun, low)
args = wrap_call.get('args', ())
kwargs = wrap_call.get('kwargs', {})
# TODO: Setting user doesn't seem to work for actual remote pubs
if low['state'] in ('runner', 'wheel'):
# Update called function's low data with event user to
# segregate events fired by reactor and avoid reaction loops
@ -286,80 +347,106 @@ class ReactWrap(object):
# Replace ``state`` kwarg which comes from high data compiler.
# It breaks some runner functions and seems unnecessary.
kwargs['__state__'] = kwargs.pop('state')
# NOTE: if any additional keys are added here, they will also
# need to be added to filter_kwargs()
l_fun(*f_call.get('args', ()), **kwargs)
if 'args' in kwargs:
# New configuration
reactor_args = kwargs.pop('args')
for item in ('arg', 'kwarg'):
if item in low:
log.warning(
'Reactor \'%s\' is ignoring \'%s\' param %s due to '
'presence of \'args\' param. Check the Reactor System '
'documentation for the correct argument format.',
low['__id__'], item, low[item]
)
if low['state'] == 'caller' \
and isinstance(reactor_args, list) \
and not salt.utils.is_dictlist(reactor_args):
# Legacy 'caller' reactors were already using the 'args'
# param, but only supported a list of positional arguments.
# If low['args'] is a list but is *not* a dictlist, then
# this is actually using the legacy configuration. So, put
# the reactor args into kwarg['arg'] so that the wrapper
# interprets them as positional args.
kwargs['arg'] = reactor_args
kwargs['kwarg'] = {}
else:
kwargs['arg'] = ()
kwargs['kwarg'] = reactor_args
if not isinstance(kwargs['kwarg'], dict):
kwargs['kwarg'] = salt.utils.repack_dictlist(kwargs['kwarg'])
if not kwargs['kwarg']:
log.error(
'Reactor \'%s\' failed to execute %s \'%s\': '
'Incorrect argument format, check the Reactor System '
'documentation for the correct format.',
low['__id__'], low['state'], low['fun']
)
return
else:
# Legacy configuration
react_call = {}
if low['state'] in ('runner', 'wheel'):
if 'arg' not in kwargs or 'kwarg' not in kwargs:
# Runner/wheel execute on the master, so we can use
# format_call to get the functions args/kwargs
react_fun = self.client_cache[low['state']].functions.get(low['fun'])
if react_fun is None:
log.error(
'Reactor \'%s\' failed to execute %s \'%s\': '
'function not available',
low['__id__'], low['state'], low['fun']
)
return
react_call = salt.utils.format_call(
react_fun,
low,
expected_extra_kws=REACTOR_INTERNAL_KEYWORDS
)
if 'arg' not in kwargs:
kwargs['arg'] = react_call.get('args', ())
if 'kwarg' not in kwargs:
kwargs['kwarg'] = react_call.get('kwargs', {})
# Execute the wrapper with the proper args/kwargs. kwargs['arg']
# and kwargs['kwarg'] contain the positional and keyword arguments
# that will be passed to the client interface to execute the
# desired runner/wheel/remote-exec/etc. function.
l_fun(*args, **kwargs)
except SystemExit:
log.warning(
'Reactor \'%s\' attempted to exit. Ignored.', low['__id__']
)
except Exception:
log.error(
'Failed to execute {0}: {1}\n'.format(low['state'], l_fun),
exc_info=True
)
def local(self, *args, **kwargs):
'''
Wrap LocalClient for running :ref:`execution modules <all-salt.modules>`
'''
if 'local' not in self.client_cache:
self.client_cache['local'] = salt.client.LocalClient(self.opts['conf_file'])
try:
self.client_cache['local'].cmd_async(*args, **kwargs)
except SystemExit:
log.warning('Attempt to exit reactor. Ignored.')
except Exception as exc:
log.warning('Exception caught by reactor: {0}'.format(exc))
cmd = local
'Reactor \'%s\' failed to execute %s \'%s\'',
low['__id__'], low['state'], low['fun'], exc_info=True
)
def runner(self, fun, **kwargs):
'''
Wrap RunnerClient for executing :ref:`runner modules <all-salt.runners>`
'''
if 'runner' not in self.client_cache:
self.client_cache['runner'] = salt.runner.RunnerClient(self.opts)
# The len() function will cause the module functions to load if
# they aren't already loaded. We want to load them so that the
# spawned threads don't need to load them. Loading in the spawned
# threads creates race conditions such as sometimes not finding
# the required function because another thread is in the middle
# of loading the functions.
len(self.client_cache['runner'].functions)
try:
self.pool.fire_async(self.client_cache['runner'].low, args=(fun, kwargs))
except SystemExit:
log.warning('Attempt to exit in reactor by runner. Ignored')
except Exception as exc:
log.warning('Exception caught by reactor: {0}'.format(exc))
self.pool.fire_async(self.client_cache['runner'].low, args=(fun, kwargs))
def wheel(self, fun, **kwargs):
'''
Wrap Wheel to enable executing :ref:`wheel modules <all-salt.wheel>`
'''
if 'wheel' not in self.client_cache:
self.client_cache['wheel'] = salt.wheel.Wheel(self.opts)
# The len() function will cause the module functions to load if
# they aren't already loaded. We want to load them so that the
# spawned threads don't need to load them. Loading in the spawned
# threads creates race conditions such as sometimes not finding
# the required function because another thread is in the middle
# of loading the functions.
len(self.client_cache['wheel'].functions)
try:
self.pool.fire_async(self.client_cache['wheel'].low, args=(fun, kwargs))
except SystemExit:
log.warning('Attempt to in reactor by whell. Ignored.')
except Exception as exc:
log.warning('Exception caught by reactor: {0}'.format(exc))
self.pool.fire_async(self.client_cache['wheel'].low, args=(fun, kwargs))
def caller(self, fun, *args, **kwargs):
def local(self, fun, tgt, **kwargs):
'''
Wrap Caller to enable executing :ref:`caller modules <all-salt.caller>`
Wrap LocalClient for running :ref:`execution modules <all-salt.modules>`
'''
log.debug("in caller with fun {0} args {1} kwargs {2}".format(fun, args, kwargs))
args = kwargs.get('args', [])
if 'caller' not in self.client_cache:
self.client_cache['caller'] = salt.client.Caller(self.opts['conf_file'])
try:
self.client_cache['caller'].function(fun, *args)
except SystemExit:
log.warning('Attempt to exit reactor. Ignored.')
except Exception as exc:
log.warning('Exception caught by reactor: {0}'.format(exc))
self.client_cache['local'].cmd_async(tgt, fun, **kwargs)
def caller(self, fun, **kwargs):
'''
Wrap LocalCaller to execute remote exec functions locally on the Minion
'''
self.client_cache['caller'].cmd(fun, *kwargs['arg'], **kwargs['kwarg'])