Merge pull request #39670 from smarsching/feature-parallel-runners

Added the saltmod.parallel_runners state.
This commit is contained in:
Nicole Thomas 2017-09-28 17:30:08 -04:00 committed by GitHub
commit 63821381a0

View file

@ -26,10 +26,15 @@ from __future__ import absolute_import
# Import python libs
import fnmatch
import logging
import sys
import threading
import time
# Import salt libs
import salt.syspaths
import salt.exceptions
import salt.output
import salt.utils
import salt.utils.event
import salt.utils.versions
from salt.ext import six
@ -59,6 +64,48 @@ def _fire_args(tag_data):
)
def _parallel_map(func, inputs):
'''
Applies a function to each element of a list, returning the resulting list.
A separate thread is created for each element in the input list and the
passed function is called for each of the elements. When all threads have
finished execution a list with the results corresponding to the inputs is
returned.
If one of the threads fails (because the function throws an exception),
that exception is reraised. If more than one thread fails, the exception
from the first thread (according to the index of the input element) is
reraised.
func:
function that is applied on each input element.
inputs:
list of elements that shall be processed. The length of this list also
defines the number of threads created.
'''
outputs = len(inputs) * [None]
errors = len(inputs) * [None]
def create_thread(index):
def run_thread():
try:
outputs[index] = func(inputs[index])
except: # pylint: disable=bare-except
errors[index] = sys.exc_info()
thread = threading.Thread(target=run_thread)
thread.start()
return thread
threads = list(six.moves.map(create_thread, six.moves.range(len(inputs))))
for thread in threads:
thread.join()
for error in errors:
if error is not None:
exc_type, exc_value, exc_traceback = error
six.reraise(exc_type, exc_value, exc_traceback)
return outputs
def state(name,
tgt,
ssh=False,
@ -770,6 +817,190 @@ def runner(name, **kwargs):
return ret
def parallel_runners(name, runners):
'''
Executes multiple runner modules on the master in parallel.
.. versionadded:: 2017.x.0 (Nitrogen)
A separate thread is spawned for each runner. This state is intended to be
used with the orchestrate runner in place of the ``saltmod.runner`` state
when different tasks should be run in parallel. In general, Salt states are
not safe when used concurrently, so ensure that they are used in a safe way
(e.g. by only targeting separate minions in parallel tasks).
name:
name identifying this state. The name is provided as part of the
output, but not used for anything else.
runners:
list of runners that should be run in parallel. Each element of the
list has to be a dictionary. This dictionary's name entry stores the
name of the runner function that shall be invoked. The optional kwarg
entry stores a dictionary of named arguments that are passed to the
runner function.
.. code-block:: yaml
parallel-state:
salt.parallel_runners:
- runners:
my_runner_1:
- name: state.orchestrate
- kwarg:
mods: orchestrate_state_1
my_runner_2:
- name: state.orchestrate
- kwarg:
mods: orchestrate_state_2
'''
# For the sake of consistency, we treat a single string in the same way as
# a key without a value. This allows something like
# salt.parallel_runners:
# - runners:
# state.orchestrate
# Obviously, this will only work if the specified runner does not need any
# arguments.
if isinstance(runners, six.string_types):
runners = {runners: [{name: runners}]}
# If the runners argument is not a string, it must be a dict. Everything
# else is considered an error.
if not isinstance(runners, dict):
return {
'name': name,
'result': False,
'changes': {},
'comment': 'The runners parameter must be a string or dict.'
}
# The configuration for each runner is given as a list of key-value pairs.
# This is not very useful for what we want to do, but it is the typical
# style used in Salt. For further processing, we convert each of these
# lists to a dict. This also makes it easier to check whether a name has
# been specified explicitly.
for runner_id, runner_config in six.iteritems(runners):
if runner_config is None:
runner_config = {}
else:
runner_config = salt.utils.repack_dictlist(runner_config)
if 'name' not in runner_config:
runner_config['name'] = runner_id
runners[runner_id] = runner_config
try:
jid = __orchestration_jid__
except NameError:
log.debug(
'Unable to fire args event due to missing __orchestration_jid__')
jid = None
def call_runner(runner_config):
return __salt__['saltutil.runner'](runner_config['name'],
__orchestration_jid__=jid,
__env__=__env__,
full_return=True,
**(runner_config.get('kwarg', {})))
try:
outputs = _parallel_map(call_runner, list(six.itervalues(runners)))
except salt.exceptions.SaltException as exc:
return {
'name': name,
'result': False,
'success': False,
'changes': {},
'comment': 'One of the runners raised an exception: {0}'.format(
exc)
}
# We bundle the results of the runners with the IDs of the runners so that
# we can easily identify which output belongs to which runner. At the same
# time we exctract the actual return value of the runner (saltutil.runner
# adds some extra information that is not interesting to us).
outputs = {
runner_id: out['return']for runner_id, out in
six.moves.zip(six.iterkeys(runners), outputs)
}
# If each of the runners returned its output in the format compatible with
# the 'highstate' outputter, we can leverage this fact when merging the
# outputs.
highstate_output = all(
[out.get('outputter', '') == 'highstate' and 'data' in out for out in
six.itervalues(outputs)]
)
# The following helper function is used to extract changes from highstate
# output.
def extract_changes(obj):
if not isinstance(obj, dict):
return {}
elif 'changes' in obj:
if (isinstance(obj['changes'], dict)
and obj['changes'].get('out', '') == 'highstate'
and 'ret' in obj['changes']):
return obj['changes']['ret']
else:
return obj['changes']
else:
found_changes = {}
for key, value in six.iteritems(obj):
change = extract_changes(value)
if change:
found_changes[key] = change
return found_changes
if highstate_output:
failed_runners = [runner_id for runner_id, out in
six.iteritems(outputs) if
out['data'].get('retcode', 0) != 0]
all_successful = not failed_runners
if all_successful:
comment = 'All runner functions executed successfully.'
else:
runner_comments = [
'Runner {0} failed with return value:\n{1}'.format(
runner_id,
salt.output.out_format(outputs[runner_id],
'nested',
__opts__,
nested_indent=2)
) for runner_id in failed_runners
]
comment = '\n'.join(runner_comments)
changes = {}
for runner_id, out in six.iteritems(outputs):
runner_changes = extract_changes(out['data'])
if runner_changes:
changes[runner_id] = runner_changes
else:
failed_runners = [runner_id for runner_id, out in
six.iteritems(outputs) if
out.get('exit_code', 0) != 0]
all_successful = not failed_runners
if all_successful:
comment = 'All runner functions executed successfully.'
else:
if len(failed_runners) == 1:
comment = 'Runner {0} failed.'.format(failed_runners[0])
else:
comment =\
'Runners {0} failed.'.format(', '.join(failed_runners))
changes = {'ret': {
runner_id: out for runner_id, out in six.iteritems(outputs)
}}
ret = {
'name': name,
'result': all_successful,
'changes': changes,
'comment': comment
}
# The 'runner' function includes out['jid'] as '__jid__' in the returned
# dict, but we cannot do this here because we have more than one JID if
# we have more than one runner.
return ret
def wheel(name, **kwargs):
'''
Execute a wheel module on the master