mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Adding salt/metaproxy/proxy.py and tests/unit/modules/test_match.py
This commit is contained in:
parent
51726b174e
commit
d162a88629
2 changed files with 1044 additions and 0 deletions
799
salt/metaproxy/proxy.py
Normal file
799
salt/metaproxy/proxy.py
Normal file
|
@ -0,0 +1,799 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Proxy minion metaproxy modules
|
||||
#
|
||||
from __future__ import absolute_import, print_function, with_statement, unicode_literals
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
import types
|
||||
import logging
|
||||
import threading
|
||||
import traceback
|
||||
|
||||
# Import Salt Libs
|
||||
# pylint: disable=3rd-party-module-not-gated
|
||||
import salt
|
||||
import salt.client
|
||||
import salt.crypt
|
||||
import salt.loader
|
||||
import salt.beacons
|
||||
import salt.engines
|
||||
import salt.payload
|
||||
import salt.pillar
|
||||
import salt.syspaths
|
||||
import salt.utils.args
|
||||
import salt.utils.context
|
||||
import salt.utils.data
|
||||
import salt.utils.error
|
||||
import salt.utils.event
|
||||
import salt.utils.files
|
||||
import salt.utils.jid
|
||||
import salt.utils.minion
|
||||
import salt.utils.minions
|
||||
import salt.utils.network
|
||||
import salt.utils.platform
|
||||
import salt.utils.process
|
||||
import salt.utils.schedule
|
||||
import salt.utils.ssdp
|
||||
import salt.utils.user
|
||||
import salt.utils.zeromq
|
||||
import salt.defaults.exitcodes
|
||||
import salt.cli.daemons
|
||||
import salt.log.setup
|
||||
import salt.serializers.msgpack
|
||||
import salt.minion
|
||||
import salt.defaults.exitcodes
|
||||
|
||||
import salt.utils.dictupdate
|
||||
from salt.utils.event import tagify
|
||||
from salt.exceptions import (
|
||||
CommandExecutionError,
|
||||
CommandNotFoundError,
|
||||
SaltInvocationError,
|
||||
SaltSystemExit,
|
||||
)
|
||||
from salt.ext import six
|
||||
from salt.ext.six.moves import range
|
||||
from salt.minion import ProxyMinion
|
||||
|
||||
from salt.defaults import DEFAULT_TARGET_DELIM
|
||||
from salt.utils.process import (default_signals,
|
||||
SignalHandlingMultiprocessingProcess)
|
||||
|
||||
|
||||
import tornado.gen # pylint: disable=F0401
|
||||
import tornado.ioloop # pylint: disable=F0401
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def post_master_init(self, master):
|
||||
|
||||
log.debug("subclassed LazyLoaded _post_master_init")
|
||||
if self.connected:
|
||||
self.opts['master'] = master
|
||||
|
||||
self.opts['pillar'] = yield salt.pillar.get_async_pillar(
|
||||
self.opts,
|
||||
self.opts['grains'],
|
||||
self.opts['id'],
|
||||
saltenv=self.opts['saltenv'],
|
||||
pillarenv=self.opts.get('pillarenv'),
|
||||
).compile_pillar()
|
||||
|
||||
if 'proxy' not in self.opts['pillar'] and 'proxy' not in self.opts:
|
||||
errmsg = 'No proxy key found in pillar or opts for id ' + self.opts['id'] + '. ' + \
|
||||
'Check your pillar/opts configuration and contents. Salt-proxy aborted.'
|
||||
log.error(errmsg)
|
||||
self._running = False
|
||||
raise SaltSystemExit(code=-1, msg=errmsg)
|
||||
|
||||
if 'proxy' not in self.opts:
|
||||
self.opts['proxy'] = self.opts['pillar']['proxy']
|
||||
|
||||
if self.opts.get('proxy_merge_pillar_in_opts'):
|
||||
# Override proxy opts with pillar data when the user required.
|
||||
self.opts = salt.utils.dictupdate.merge(self.opts,
|
||||
self.opts['pillar'],
|
||||
strategy=self.opts.get('proxy_merge_pillar_in_opts_strategy'),
|
||||
merge_lists=self.opts.get('proxy_deep_merge_pillar_in_opts', False))
|
||||
elif self.opts.get('proxy_mines_pillar'):
|
||||
# Even when not required, some details such as mine configuration
|
||||
# should be merged anyway whenever possible.
|
||||
if 'mine_interval' in self.opts['pillar']:
|
||||
self.opts['mine_interval'] = self.opts['pillar']['mine_interval']
|
||||
if 'mine_functions' in self.opts['pillar']:
|
||||
general_proxy_mines = self.opts.get('mine_functions', [])
|
||||
specific_proxy_mines = self.opts['pillar']['mine_functions']
|
||||
try:
|
||||
self.opts['mine_functions'] = general_proxy_mines + specific_proxy_mines
|
||||
except TypeError as terr:
|
||||
log.error('Unable to merge mine functions from the pillar in the opts, for proxy {}'.format(
|
||||
self.opts['id']))
|
||||
|
||||
fq_proxyname = self.opts['proxy']['proxytype']
|
||||
|
||||
# Need to load the modules so they get all the dunder variables
|
||||
self.functions, self.returners, self.function_errors, self.executors = self._load_modules()
|
||||
|
||||
# we can then sync any proxymodules down from the master
|
||||
# we do a sync_all here in case proxy code was installed by
|
||||
# SPM or was manually placed in /srv/salt/_modules etc.
|
||||
self.functions['saltutil.sync_all'](saltenv=self.opts['saltenv'])
|
||||
|
||||
# Pull in the utils
|
||||
self.utils = salt.loader.utils(self.opts)
|
||||
|
||||
# Then load the proxy module
|
||||
self.proxy = salt.loader.proxy(self.opts, utils=self.utils)
|
||||
|
||||
# And re-load the modules so the __proxy__ variable gets injected
|
||||
self.functions, self.returners, self.function_errors, self.executors = self._load_modules()
|
||||
self.functions.pack['__proxy__'] = self.proxy
|
||||
self.proxy.pack['__salt__'] = self.functions
|
||||
self.proxy.pack['__ret__'] = self.returners
|
||||
self.proxy.pack['__pillar__'] = self.opts['pillar']
|
||||
|
||||
# Reload utils as well (chicken and egg, __utils__ needs __proxy__ and __proxy__ needs __utils__
|
||||
self.utils = salt.loader.utils(self.opts, proxy=self.proxy)
|
||||
self.proxy.pack['__utils__'] = self.utils
|
||||
|
||||
# Reload all modules so all dunder variables are injected
|
||||
self.proxy.reload_modules()
|
||||
|
||||
# Start engines here instead of in the Minion superclass __init__
|
||||
# This is because we need to inject the __proxy__ variable but
|
||||
# it is not setup until now.
|
||||
self.io_loop.spawn_callback(salt.engines.start_engines, self.opts,
|
||||
self.process_manager, proxy=self.proxy)
|
||||
|
||||
if ('{0}.init'.format(fq_proxyname) not in self.proxy
|
||||
or '{0}.shutdown'.format(fq_proxyname) not in self.proxy):
|
||||
errmsg = 'Proxymodule {0} is missing an init() or a shutdown() or both. '.format(fq_proxyname) + \
|
||||
'Check your proxymodule. Salt-proxy aborted.'
|
||||
log.error(errmsg)
|
||||
self._running = False
|
||||
raise SaltSystemExit(code=-1, msg=errmsg)
|
||||
|
||||
self.module_executors = self.proxy.get('{0}.module_executors'.format(fq_proxyname), lambda: [])()
|
||||
proxy_init_fn = self.proxy[fq_proxyname + '.init']
|
||||
proxy_init_fn(self.opts)
|
||||
|
||||
self.opts['grains'] = salt.loader.grains(self.opts, proxy=self.proxy)
|
||||
|
||||
self.serial = salt.payload.Serial(self.opts)
|
||||
self.mod_opts = self._prep_mod_opts()
|
||||
self.matchers = salt.loader.matchers(self.opts)
|
||||
self.beacons = salt.beacons.Beacon(self.opts, self.functions)
|
||||
uid = salt.utils.user.get_uid(user=self.opts.get('user', None))
|
||||
self.proc_dir = salt.minion.get_proc_dir(self.opts['cachedir'], uid=uid)
|
||||
|
||||
if self.connected and self.opts['pillar']:
|
||||
# The pillar has changed due to the connection to the master.
|
||||
# Reload the functions so that they can use the new pillar data.
|
||||
self.functions, self.returners, self.function_errors, self.executors = self._load_modules()
|
||||
if hasattr(self, 'schedule'):
|
||||
self.schedule.functions = self.functions
|
||||
self.schedule.returners = self.returners
|
||||
|
||||
if not hasattr(self, 'schedule'):
|
||||
self.schedule = salt.utils.schedule.Schedule(
|
||||
self.opts,
|
||||
self.functions,
|
||||
self.returners,
|
||||
cleanup=[salt.minion.master_event(type='alive')],
|
||||
proxy=self.proxy)
|
||||
|
||||
# add default scheduling jobs to the minions scheduler
|
||||
if self.opts['mine_enabled'] and 'mine.update' in self.functions:
|
||||
self.schedule.add_job({
|
||||
'__mine_interval':
|
||||
{
|
||||
'function': 'mine.update',
|
||||
'minutes': self.opts['mine_interval'],
|
||||
'jid_include': True,
|
||||
'maxrunning': 2,
|
||||
'run_on_start': True,
|
||||
'return_job': self.opts.get('mine_return_job', False)
|
||||
}
|
||||
}, persist=True)
|
||||
log.info('Added mine.update to scheduler')
|
||||
else:
|
||||
self.schedule.delete_job('__mine_interval', persist=True)
|
||||
|
||||
# add master_alive job if enabled
|
||||
if (self.opts['transport'] != 'tcp' and
|
||||
self.opts['master_alive_interval'] > 0):
|
||||
self.schedule.add_job({
|
||||
salt.minion.master_event(type='alive', master=self.opts['master']):
|
||||
{
|
||||
'function': 'status.master',
|
||||
'seconds': self.opts['master_alive_interval'],
|
||||
'jid_include': True,
|
||||
'maxrunning': 1,
|
||||
'return_job': False,
|
||||
'kwargs': {'master': self.opts['master'],
|
||||
'connected': True}
|
||||
}
|
||||
}, persist=True)
|
||||
if self.opts['master_failback'] and \
|
||||
'master_list' in self.opts and \
|
||||
self.opts['master'] != self.opts['master_list'][0]:
|
||||
self.schedule.add_job({
|
||||
salt.minion.master_event(type='failback'):
|
||||
{
|
||||
'function': 'status.ping_master',
|
||||
'seconds': self.opts['master_failback_interval'],
|
||||
'jid_include': True,
|
||||
'maxrunning': 1,
|
||||
'return_job': False,
|
||||
'kwargs': {'master': self.opts['master_list'][0]}
|
||||
}
|
||||
}, persist=True)
|
||||
else:
|
||||
self.schedule.delete_job(salt.minion.master_event(type='failback'), persist=True)
|
||||
else:
|
||||
self.schedule.delete_job(salt.minion.master_event(type='alive', master=self.opts['master']), persist=True)
|
||||
self.schedule.delete_job(salt.minion.master_event(type='failback'), persist=True)
|
||||
|
||||
# proxy keepalive
|
||||
proxy_alive_fn = fq_proxyname+'.alive'
|
||||
if (proxy_alive_fn in self.proxy
|
||||
and 'status.proxy_reconnect' in self.functions
|
||||
and self.opts.get('proxy_keep_alive', True)):
|
||||
# if `proxy_keep_alive` is either not specified, either set to False does not retry reconnecting
|
||||
self.schedule.add_job({
|
||||
'__proxy_keepalive':
|
||||
{
|
||||
'function': 'status.proxy_reconnect',
|
||||
'minutes': self.opts.get('proxy_keep_alive_interval', 1), # by default, check once per minute
|
||||
'jid_include': True,
|
||||
'maxrunning': 1,
|
||||
'return_job': False,
|
||||
'kwargs': {
|
||||
'proxy_name': fq_proxyname
|
||||
}
|
||||
}
|
||||
}, persist=True)
|
||||
self.schedule.enable_schedule()
|
||||
else:
|
||||
self.schedule.delete_job('__proxy_keepalive', persist=True)
|
||||
|
||||
# Sync the grains here so the proxy can communicate them to the master
|
||||
self.functions['saltutil.sync_grains'](saltenv='base')
|
||||
self.grains_cache = self.opts['grains']
|
||||
self.ready = True
|
||||
|
||||
|
||||
def target(cls, minion_instance, opts, data, connected):
|
||||
|
||||
if not minion_instance:
|
||||
minion_instance = cls(opts)
|
||||
minion_instance.connected = connected
|
||||
if not hasattr(minion_instance, 'functions'):
|
||||
# Need to load the modules so they get all the dunder variables
|
||||
functions, returners, function_errors, executors = (
|
||||
minion_instance._load_modules(grains=opts['grains'])
|
||||
)
|
||||
minion_instance.functions = functions
|
||||
minion_instance.returners = returners
|
||||
minion_instance.function_errors = function_errors
|
||||
minion_instance.executors = executors
|
||||
|
||||
# Pull in the utils
|
||||
minion_instance.utils = salt.loader.utils(minion_instance.opts)
|
||||
|
||||
# Then load the proxy module
|
||||
minion_instance.proxy = salt.loader.proxy(minion_instance.opts, utils=minion_instance.utils)
|
||||
|
||||
# And re-load the modules so the __proxy__ variable gets injected
|
||||
functions, returners, function_errors, executors = (
|
||||
minion_instance._load_modules(grains=opts['grains'])
|
||||
)
|
||||
minion_instance.functions = functions
|
||||
minion_instance.returners = returners
|
||||
minion_instance.function_errors = function_errors
|
||||
minion_instance.executors = executors
|
||||
|
||||
minion_instance.functions.pack['__proxy__'] = minion_instance.proxy
|
||||
minion_instance.proxy.pack['__salt__'] = minion_instance.functions
|
||||
minion_instance.proxy.pack['__ret__'] = minion_instance.returners
|
||||
minion_instance.proxy.pack['__pillar__'] = minion_instance.opts['pillar']
|
||||
|
||||
# Reload utils as well (chicken and egg, __utils__ needs __proxy__ and __proxy__ needs __utils__
|
||||
minion_instance.utils = salt.loader.utils(minion_instance.opts, proxy=minion_instance.proxy)
|
||||
minion_instance.proxy.pack['__utils__'] = minion_instance.utils
|
||||
|
||||
# Reload all modules so all dunder variables are injected
|
||||
minion_instance.proxy.reload_modules()
|
||||
|
||||
fq_proxyname = opts['proxy']['proxytype']
|
||||
|
||||
minion_instance.module_executors = minion_instance.proxy.get('{0}.module_executors'.format(fq_proxyname), lambda: [])()
|
||||
|
||||
proxy_init_fn = minion_instance.proxy[fq_proxyname + '.init']
|
||||
proxy_init_fn(opts)
|
||||
if not hasattr(minion_instance, 'serial'):
|
||||
minion_instance.serial = salt.payload.Serial(opts)
|
||||
if not hasattr(minion_instance, 'proc_dir'):
|
||||
uid = salt.utils.user.get_uid(user=opts.get('user', None))
|
||||
minion_instance.proc_dir = (
|
||||
salt.minion.get_proc_dir(opts['cachedir'], uid=uid)
|
||||
)
|
||||
|
||||
with tornado.stack_context.StackContext(minion_instance.ctx):
|
||||
if isinstance(data['fun'], tuple) or isinstance(data['fun'], list):
|
||||
ProxyMinion._thread_multi_return(minion_instance, opts, data)
|
||||
else:
|
||||
ProxyMinion._thread_return(minion_instance, opts, data)
|
||||
|
||||
|
||||
def thread_return(cls, minion_instance, opts, data):
|
||||
'''
|
||||
This method should be used as a threading target, start the actual
|
||||
minion side execution.
|
||||
'''
|
||||
fn_ = os.path.join(minion_instance.proc_dir, data['jid'])
|
||||
|
||||
if opts['multiprocessing'] and not salt.utils.platform.is_windows():
|
||||
# Shutdown the multiprocessing before daemonizing
|
||||
salt.log.setup.shutdown_multiprocessing_logging()
|
||||
|
||||
salt.utils.process.daemonize_if(opts)
|
||||
|
||||
# Reconfigure multiprocessing logging after daemonizing
|
||||
salt.log.setup.setup_multiprocessing_logging()
|
||||
|
||||
salt.utils.process.appendproctitle('{0}._thread_return {1}'.format(cls.__name__, data['jid']))
|
||||
|
||||
sdata = {'pid': os.getpid()}
|
||||
sdata.update(data)
|
||||
log.info('Starting a new job with PID %s', sdata['pid'])
|
||||
with salt.utils.files.fopen(fn_, 'w+b') as fp_:
|
||||
fp_.write(minion_instance.serial.dumps(sdata))
|
||||
ret = {'success': False}
|
||||
function_name = data['fun']
|
||||
executors = data.get('module_executors') or \
|
||||
getattr(minion_instance, 'module_executors', []) or \
|
||||
opts.get('module_executors', ['direct_call'])
|
||||
allow_missing_funcs = any([
|
||||
minion_instance.executors['{0}.allow_missing_func'.format(executor)](function_name)
|
||||
for executor in executors
|
||||
if '{0}.allow_missing_func' in minion_instance.executors
|
||||
])
|
||||
if function_name in minion_instance.functions or allow_missing_funcs is True:
|
||||
try:
|
||||
minion_blackout_violation = False
|
||||
if minion_instance.connected and minion_instance.opts['pillar'].get('minion_blackout', False):
|
||||
whitelist = minion_instance.opts['pillar'].get('minion_blackout_whitelist', [])
|
||||
# this minion is blacked out. Only allow saltutil.refresh_pillar and the whitelist
|
||||
if function_name != 'saltutil.refresh_pillar' and function_name not in whitelist:
|
||||
minion_blackout_violation = True
|
||||
# use minion_blackout_whitelist from grains if it exists
|
||||
if minion_instance.opts['grains'].get('minion_blackout', False):
|
||||
whitelist = minion_instance.opts['grains'].get('minion_blackout_whitelist', [])
|
||||
if function_name != 'saltutil.refresh_pillar' and function_name not in whitelist:
|
||||
minion_blackout_violation = True
|
||||
if minion_blackout_violation:
|
||||
raise SaltInvocationError('Minion in blackout mode. Set \'minion_blackout\' '
|
||||
'to False in pillar or grains to resume operations. Only '
|
||||
'saltutil.refresh_pillar allowed in blackout mode.')
|
||||
|
||||
if function_name in minion_instance.functions:
|
||||
func = minion_instance.functions[function_name]
|
||||
args, kwargs = salt.minion.load_args_and_kwargs(
|
||||
func,
|
||||
data['arg'],
|
||||
data)
|
||||
else:
|
||||
# only run if function_name is not in minion_instance.functions and allow_missing_funcs is True
|
||||
func = function_name
|
||||
args, kwargs = data['arg'], data
|
||||
minion_instance.functions.pack['__context__']['retcode'] = 0
|
||||
if isinstance(executors, six.string_types):
|
||||
executors = [executors]
|
||||
elif not isinstance(executors, list) or not executors:
|
||||
raise SaltInvocationError("Wrong executors specification: {0}. String or non-empty list expected".
|
||||
format(executors))
|
||||
if opts.get('sudo_user', '') and executors[-1] != 'sudo':
|
||||
executors[-1] = 'sudo' # replace the last one with sudo
|
||||
log.trace('Executors list %s', executors) # pylint: disable=no-member
|
||||
|
||||
for name in executors:
|
||||
fname = '{0}.execute'.format(name)
|
||||
if fname not in minion_instance.executors:
|
||||
raise SaltInvocationError("Executor '{0}' is not available".format(name))
|
||||
return_data = minion_instance.executors[fname](opts, data, func, args, kwargs)
|
||||
if return_data is not None:
|
||||
break
|
||||
|
||||
if isinstance(return_data, types.GeneratorType):
|
||||
ind = 0
|
||||
iret = {}
|
||||
for single in return_data:
|
||||
if isinstance(single, dict) and isinstance(iret, dict):
|
||||
iret.update(single)
|
||||
else:
|
||||
if not iret:
|
||||
iret = []
|
||||
iret.append(single)
|
||||
tag = tagify([data['jid'], 'prog', opts['id'], six.text_type(ind)], 'job')
|
||||
event_data = {'return': single}
|
||||
minion_instance._fire_master(event_data, tag)
|
||||
ind += 1
|
||||
ret['return'] = iret
|
||||
else:
|
||||
ret['return'] = return_data
|
||||
|
||||
retcode = minion_instance.functions.pack['__context__'].get(
|
||||
'retcode',
|
||||
salt.defaults.exitcodes.EX_OK
|
||||
)
|
||||
if retcode == salt.defaults.exitcodes.EX_OK:
|
||||
# No nonzero retcode in __context__ dunder. Check if return
|
||||
# is a dictionary with a "result" or "success" key.
|
||||
try:
|
||||
func_result = all(return_data.get(x, True)
|
||||
for x in ('result', 'success'))
|
||||
except Exception:
|
||||
# return data is not a dict
|
||||
func_result = True
|
||||
if not func_result:
|
||||
retcode = salt.defaults.exitcodes.EX_GENERIC
|
||||
|
||||
ret['retcode'] = retcode
|
||||
ret['success'] = retcode == salt.defaults.exitcodes.EX_OK
|
||||
except CommandNotFoundError as exc:
|
||||
msg = 'Command required for \'{0}\' not found'.format(
|
||||
function_name
|
||||
)
|
||||
log.debug(msg, exc_info=True)
|
||||
ret['return'] = '{0}: {1}'.format(msg, exc)
|
||||
ret['out'] = 'nested'
|
||||
ret['retcode'] = salt.defaults.exitcodes.EX_GENERIC
|
||||
except CommandExecutionError as exc:
|
||||
log.error(
|
||||
'A command in \'%s\' had a problem: %s',
|
||||
function_name, exc,
|
||||
exc_info_on_loglevel=logging.DEBUG
|
||||
)
|
||||
ret['return'] = 'ERROR: {0}'.format(exc)
|
||||
ret['out'] = 'nested'
|
||||
ret['retcode'] = salt.defaults.exitcodes.EX_GENERIC
|
||||
except SaltInvocationError as exc:
|
||||
log.error(
|
||||
'Problem executing \'%s\': %s',
|
||||
function_name, exc,
|
||||
exc_info_on_loglevel=logging.DEBUG
|
||||
)
|
||||
ret['return'] = 'ERROR executing \'{0}\': {1}'.format(
|
||||
function_name, exc
|
||||
)
|
||||
ret['out'] = 'nested'
|
||||
ret['retcode'] = salt.defaults.exitcodes.EX_GENERIC
|
||||
except TypeError as exc:
|
||||
msg = 'Passed invalid arguments to {0}: {1}\n{2}'.format(
|
||||
function_name, exc, func.__doc__ or ''
|
||||
)
|
||||
log.warning(msg, exc_info_on_loglevel=logging.DEBUG)
|
||||
ret['return'] = msg
|
||||
ret['out'] = 'nested'
|
||||
ret['retcode'] = salt.defaults.exitcodes.EX_GENERIC
|
||||
except Exception:
|
||||
msg = 'The minion function caused an exception'
|
||||
log.warning(msg, exc_info_on_loglevel=True)
|
||||
salt.utils.error.fire_exception(salt.exceptions.MinionError(msg), opts, job=data)
|
||||
ret['return'] = '{0}: {1}'.format(msg, traceback.format_exc())
|
||||
ret['out'] = 'nested'
|
||||
ret['retcode'] = salt.defaults.exitcodes.EX_GENERIC
|
||||
else:
|
||||
docs = minion_instance.functions['sys.doc']('{0}*'.format(function_name))
|
||||
if docs:
|
||||
docs[function_name] = minion_instance.functions.missing_fun_string(function_name)
|
||||
ret['return'] = docs
|
||||
else:
|
||||
ret['return'] = minion_instance.functions.missing_fun_string(function_name)
|
||||
mod_name = function_name.split('.')[0]
|
||||
if mod_name in minion_instance.function_errors:
|
||||
ret['return'] += ' Possible reasons: \'{0}\''.format(
|
||||
minion_instance.function_errors[mod_name]
|
||||
)
|
||||
ret['success'] = False
|
||||
ret['retcode'] = salt.defaults.exitcodes.EX_GENERIC
|
||||
ret['out'] = 'nested'
|
||||
|
||||
ret['jid'] = data['jid']
|
||||
ret['fun'] = data['fun']
|
||||
ret['fun_args'] = data['arg']
|
||||
if 'master_id' in data:
|
||||
ret['master_id'] = data['master_id']
|
||||
if 'metadata' in data:
|
||||
if isinstance(data['metadata'], dict):
|
||||
ret['metadata'] = data['metadata']
|
||||
else:
|
||||
log.warning('The metadata parameter must be a dictionary. Ignoring.')
|
||||
if minion_instance.connected:
|
||||
minion_instance._return_pub(
|
||||
ret,
|
||||
timeout=minion_instance._return_retry_timer()
|
||||
)
|
||||
|
||||
# Add default returners from minion config
|
||||
# Should have been coverted to comma-delimited string already
|
||||
if isinstance(opts.get('return'), six.string_types):
|
||||
if data['ret']:
|
||||
data['ret'] = ','.join((data['ret'], opts['return']))
|
||||
else:
|
||||
data['ret'] = opts['return']
|
||||
|
||||
log.debug('minion return: %s', ret)
|
||||
# TODO: make a list? Seems odd to split it this late :/
|
||||
if data['ret'] and isinstance(data['ret'], six.string_types):
|
||||
if 'ret_config' in data:
|
||||
ret['ret_config'] = data['ret_config']
|
||||
if 'ret_kwargs' in data:
|
||||
ret['ret_kwargs'] = data['ret_kwargs']
|
||||
ret['id'] = opts['id']
|
||||
for returner in set(data['ret'].split(',')):
|
||||
try:
|
||||
returner_str = '{0}.returner'.format(returner)
|
||||
if returner_str in minion_instance.returners:
|
||||
minion_instance.returners[returner_str](ret)
|
||||
else:
|
||||
returner_err = minion_instance.returners.missing_fun_string(returner_str)
|
||||
log.error(
|
||||
'Returner %s could not be loaded: %s',
|
||||
returner_str, returner_err
|
||||
)
|
||||
except Exception as exc:
|
||||
log.exception(
|
||||
'The return failed for job %s: %s', data['jid'], exc
|
||||
)
|
||||
|
||||
|
||||
def thread_multi_return(cls, minion_instance, opts, data):
|
||||
'''
|
||||
This method should be used as a threading target, start the actual
|
||||
minion side execution.
|
||||
'''
|
||||
fn_ = os.path.join(minion_instance.proc_dir, data['jid'])
|
||||
|
||||
if opts['multiprocessing'] and not salt.utils.platform.is_windows():
|
||||
# Shutdown the multiprocessing before daemonizing
|
||||
salt.log.setup.shutdown_multiprocessing_logging()
|
||||
|
||||
salt.utils.process.daemonize_if(opts)
|
||||
|
||||
# Reconfigure multiprocessing logging after daemonizing
|
||||
salt.log.setup.setup_multiprocessing_logging()
|
||||
|
||||
salt.utils.process.appendproctitle('{0}._thread_multi_return {1}'.format(cls.__name__, data['jid']))
|
||||
|
||||
sdata = {'pid': os.getpid()}
|
||||
sdata.update(data)
|
||||
log.info('Starting a new job with PID %s', sdata['pid'])
|
||||
with salt.utils.files.fopen(fn_, 'w+b') as fp_:
|
||||
fp_.write(minion_instance.serial.dumps(sdata))
|
||||
|
||||
multifunc_ordered = opts.get('multifunc_ordered', False)
|
||||
num_funcs = len(data['fun'])
|
||||
if multifunc_ordered:
|
||||
ret = {
|
||||
'return': [None] * num_funcs,
|
||||
'retcode': [None] * num_funcs,
|
||||
'success': [False] * num_funcs
|
||||
}
|
||||
else:
|
||||
ret = {
|
||||
'return': {},
|
||||
'retcode': {},
|
||||
'success': {}
|
||||
}
|
||||
|
||||
for ind in range(0, num_funcs):
|
||||
if not multifunc_ordered:
|
||||
ret['success'][data['fun'][ind]] = False
|
||||
try:
|
||||
minion_blackout_violation = False
|
||||
if minion_instance.connected and minion_instance.opts['pillar'].get('minion_blackout', False):
|
||||
whitelist = minion_instance.opts['pillar'].get('minion_blackout_whitelist', [])
|
||||
# this minion is blacked out. Only allow saltutil.refresh_pillar and the whitelist
|
||||
if data['fun'][ind] != 'saltutil.refresh_pillar' and data['fun'][ind] not in whitelist:
|
||||
minion_blackout_violation = True
|
||||
elif minion_instance.opts['grains'].get('minion_blackout', False):
|
||||
whitelist = minion_instance.opts['grains'].get('minion_blackout_whitelist', [])
|
||||
if data['fun'][ind] != 'saltutil.refresh_pillar' and data['fun'][ind] not in whitelist:
|
||||
minion_blackout_violation = True
|
||||
if minion_blackout_violation:
|
||||
raise SaltInvocationError('Minion in blackout mode. Set \'minion_blackout\' '
|
||||
'to False in pillar or grains to resume operations. Only '
|
||||
'saltutil.refresh_pillar allowed in blackout mode.')
|
||||
|
||||
func = minion_instance.functions[data['fun'][ind]]
|
||||
|
||||
args, kwargs = salt.minion.load_args_and_kwargs(
|
||||
func,
|
||||
data['arg'][ind],
|
||||
data)
|
||||
minion_instance.functions.pack['__context__']['retcode'] = 0
|
||||
key = ind if multifunc_ordered else data['fun'][ind]
|
||||
ret['return'][key] = func(*args, **kwargs)
|
||||
retcode = minion_instance.functions.pack['__context__'].get(
|
||||
'retcode',
|
||||
0
|
||||
)
|
||||
if retcode == 0:
|
||||
# No nonzero retcode in __context__ dunder. Check if return
|
||||
# is a dictionary with a "result" or "success" key.
|
||||
try:
|
||||
func_result = all(ret['return'][key].get(x, True)
|
||||
for x in ('result', 'success'))
|
||||
except Exception:
|
||||
# return data is not a dict
|
||||
func_result = True
|
||||
if not func_result:
|
||||
retcode = 1
|
||||
|
||||
ret['retcode'][key] = retcode
|
||||
ret['success'][key] = retcode == 0
|
||||
except Exception as exc:
|
||||
trb = traceback.format_exc()
|
||||
log.warning('The minion function caused an exception: %s', exc)
|
||||
if multifunc_ordered:
|
||||
ret['return'][ind] = trb
|
||||
else:
|
||||
ret['return'][data['fun'][ind]] = trb
|
||||
ret['jid'] = data['jid']
|
||||
ret['fun'] = data['fun']
|
||||
ret['fun_args'] = data['arg']
|
||||
if 'metadata' in data:
|
||||
ret['metadata'] = data['metadata']
|
||||
if minion_instance.connected:
|
||||
minion_instance._return_pub(
|
||||
ret,
|
||||
timeout=minion_instance._return_retry_timer()
|
||||
)
|
||||
if data['ret']:
|
||||
if 'ret_config' in data:
|
||||
ret['ret_config'] = data['ret_config']
|
||||
if 'ret_kwargs' in data:
|
||||
ret['ret_kwargs'] = data['ret_kwargs']
|
||||
for returner in set(data['ret'].split(',')):
|
||||
ret['id'] = opts['id']
|
||||
try:
|
||||
minion_instance.returners['{0}.returner'.format(
|
||||
returner
|
||||
)](ret)
|
||||
except Exception as exc:
|
||||
log.error(
|
||||
'The return failed for job %s: %s',
|
||||
data['jid'], exc
|
||||
)
|
||||
|
||||
|
||||
def handle_payload(self, payload):
|
||||
if payload is not None and payload['enc'] == 'aes':
|
||||
if self._target_load(payload['load']):
|
||||
|
||||
self._handle_decoded_payload(payload['load'])
|
||||
elif self.opts['zmq_filtering']:
|
||||
# In the filtering enabled case, we'd like to know when minion sees something it shouldnt
|
||||
log.trace(
|
||||
'Broadcast message received not for this minion, Load: %s',
|
||||
payload['load']
|
||||
)
|
||||
# If it's not AES, and thus has not been verified, we do nothing.
|
||||
# In the future, we could add support for some clearfuncs, but
|
||||
# the minion currently has no need.
|
||||
|
||||
|
||||
def handle_decoded_payload(self, data):
|
||||
'''
|
||||
Override this method if you wish to handle the decoded data
|
||||
differently.
|
||||
'''
|
||||
# Ensure payload is unicode. Disregard failure to decode binary blobs.
|
||||
if six.PY2:
|
||||
data = salt.utils.data.decode(data, keep=True)
|
||||
if 'user' in data:
|
||||
log.info(
|
||||
'User %s Executing command %s with jid %s',
|
||||
data['user'], data['fun'], data['jid']
|
||||
)
|
||||
else:
|
||||
log.info(
|
||||
'Executing command %s with jid %s',
|
||||
data['fun'], data['jid']
|
||||
)
|
||||
log.debug('Command details %s', data)
|
||||
|
||||
# Don't duplicate jobs
|
||||
log.trace('Started JIDs: %s', self.jid_queue)
|
||||
if self.jid_queue is not None:
|
||||
if data['jid'] in self.jid_queue:
|
||||
return
|
||||
else:
|
||||
self.jid_queue.append(data['jid'])
|
||||
if len(self.jid_queue) > self.opts['minion_jid_queue_hwm']:
|
||||
self.jid_queue.pop(0)
|
||||
|
||||
if isinstance(data['fun'], six.string_types):
|
||||
if data['fun'] == 'sys.reload_modules':
|
||||
self.functions, self.returners, self.function_errors, self.executors = self._load_modules()
|
||||
self.schedule.functions = self.functions
|
||||
self.schedule.returners = self.returners
|
||||
|
||||
process_count_max = self.opts.get('process_count_max')
|
||||
if process_count_max > 0:
|
||||
process_count = len(salt.utils.minion.running(self.opts))
|
||||
while process_count >= process_count_max:
|
||||
log.warning("Maximum number of processes reached while executing jid {0}, waiting...".format(data['jid']))
|
||||
yield tornado.gen.sleep(10)
|
||||
process_count = len(salt.utils.minion.running(self.opts))
|
||||
|
||||
# We stash an instance references to allow for the socket
|
||||
# communication in Windows. You can't pickle functions, and thus
|
||||
# python needs to be able to reconstruct the reference on the other
|
||||
# side.
|
||||
instance = self
|
||||
multiprocessing_enabled = self.opts.get('multiprocessing', True)
|
||||
if multiprocessing_enabled:
|
||||
if sys.platform.startswith('win'):
|
||||
# let python reconstruct the minion on the other side if we're
|
||||
# running on windows
|
||||
instance = None
|
||||
with default_signals(signal.SIGINT, signal.SIGTERM):
|
||||
process = SignalHandlingMultiprocessingProcess(
|
||||
target=self._target, args=(instance, self.opts, data, self.connected)
|
||||
)
|
||||
else:
|
||||
process = threading.Thread(
|
||||
target=self._target,
|
||||
args=(instance, self.opts, data, self.connected),
|
||||
name=data['jid']
|
||||
)
|
||||
|
||||
if multiprocessing_enabled:
|
||||
with default_signals(signal.SIGINT, signal.SIGTERM):
|
||||
# Reset current signals before starting the process in
|
||||
# order not to inherit the current signal handlers
|
||||
process.start()
|
||||
else:
|
||||
process.start()
|
||||
|
||||
# TODO: remove the windows specific check?
|
||||
if multiprocessing_enabled and not salt.utils.platform.is_windows():
|
||||
# we only want to join() immediately if we are daemonizing a process
|
||||
process.join()
|
||||
else:
|
||||
self.win_proc.append(process)
|
||||
|
||||
|
||||
def target_load(self, load):
|
||||
# Verify that the publication is valid
|
||||
if 'tgt' not in load or 'jid' not in load or 'fun' not in load \
|
||||
or 'arg' not in load:
|
||||
return False
|
||||
# Verify that the publication applies to this minion
|
||||
|
||||
# It's important to note that the master does some pre-processing
|
||||
# to determine which minions to send a request to. So for example,
|
||||
# a "salt -G 'grain_key:grain_val' test.ping" will invoke some
|
||||
# pre-processing on the master and this minion should not see the
|
||||
# publication if the master does not determine that it should.
|
||||
if 'tgt_type' in load:
|
||||
match_func = self.matchers.get('{0}_match.match'.format(load['tgt_type']), None)
|
||||
if match_func is None:
|
||||
return False
|
||||
if load['tgt_type'] in ('grain', 'grain_pcre', 'pillar'):
|
||||
delimiter = load.get('delimiter', DEFAULT_TARGET_DELIM)
|
||||
if not match_func(load['tgt'], delimiter=delimiter):
|
||||
return False
|
||||
elif not match_func(load['tgt']):
|
||||
return False
|
||||
else:
|
||||
if not self.matchers['glob_match.match'](load['tgt']):
|
||||
return False
|
||||
|
||||
return True
|
245
tests/unit/modules/test_match.py
Normal file
245
tests/unit/modules/test_match.py
Normal file
|
@ -0,0 +1,245 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
'''
|
||||
:codeauthor: Oleg Lipovchenko <oleg.lipovchenko@gmail.com>
|
||||
'''
|
||||
|
||||
# Import python libs
|
||||
from __future__ import absolute_import, print_function, unicode_literals
|
||||
|
||||
|
||||
# Import Salt Testing libs
|
||||
from tests.support.mixins import LoaderModuleMockMixin
|
||||
from tests.support.unit import TestCase, skipIf
|
||||
from tests.support.mock import (
|
||||
MagicMock,
|
||||
patch,
|
||||
NO_MOCK,
|
||||
NO_MOCK_REASON,
|
||||
)
|
||||
|
||||
# Import Salt Libs
|
||||
from salt.exceptions import SaltException
|
||||
import salt.modules.match as match
|
||||
import salt.matchers.compound_match as compound_match
|
||||
import salt.matchers.glob_match as glob_match
|
||||
import salt.matchers.list_match as list_match
|
||||
|
||||
MATCHERS_DICT = {
|
||||
'compound_match.match': compound_match.match,
|
||||
'glob_match.match': glob_match.match,
|
||||
'list_match.match': list_match.match
|
||||
}
|
||||
|
||||
# the name of the minion to be used for tests
|
||||
MINION_ID = 'bar03'
|
||||
|
||||
|
||||
@skipIf(NO_MOCK, NO_MOCK_REASON)
|
||||
@patch('salt.loader.matchers', MagicMock(return_value=MATCHERS_DICT))
|
||||
class MatchTestCase(TestCase, LoaderModuleMockMixin):
|
||||
'''
|
||||
This class contains a set of functions that test salt.modules.match.
|
||||
'''
|
||||
|
||||
def setup_loader_modules(self):
|
||||
return {
|
||||
match: {
|
||||
'__opts__': {
|
||||
'extension_modules': '',
|
||||
'id': MINION_ID
|
||||
}
|
||||
},
|
||||
compound_match: {
|
||||
'__opts__': {
|
||||
'id': MINION_ID
|
||||
}
|
||||
},
|
||||
glob_match: {
|
||||
'__opts__': {'id': MINION_ID}
|
||||
},
|
||||
list_match: {
|
||||
'__opts__': {'id': MINION_ID}
|
||||
}
|
||||
}
|
||||
|
||||
def test_filter_by(self):
|
||||
'''
|
||||
Tests if filter_by returns the correct dictionary.
|
||||
'''
|
||||
lookup = {
|
||||
'foo*': {
|
||||
'key1': 'fooval1', 'key2': 'fooval2'
|
||||
},
|
||||
'bar*': {
|
||||
'key1': 'barval1', 'key2': 'barval2'
|
||||
}
|
||||
}
|
||||
result = {'key1': 'barval1', 'key2': 'barval2'}
|
||||
|
||||
self.assertDictEqual(match.filter_by(lookup), result)
|
||||
|
||||
def test_filter_by_merge(self):
|
||||
'''
|
||||
Tests if filter_by returns a dictionary merged with another dictionary.
|
||||
'''
|
||||
lookup = {
|
||||
'foo*': {
|
||||
'key1': 'fooval1', 'key2': 'fooval2'
|
||||
},
|
||||
'bar*': {
|
||||
'key1': 'barval1', 'key2': 'barval2'
|
||||
}
|
||||
}
|
||||
mdict = {'key1': 'mergeval1'}
|
||||
result = {'key1': 'mergeval1', 'key2': 'barval2'}
|
||||
|
||||
self.assertDictEqual(match.filter_by(lookup, merge=mdict), result)
|
||||
|
||||
def test_filter_by_merge_lists_rep(self):
|
||||
'''
|
||||
Tests if filter_by merges list values by replacing the original list
|
||||
values with the merged list values.
|
||||
'''
|
||||
lookup = {
|
||||
'foo*': {
|
||||
'list_key': []
|
||||
},
|
||||
'bar*': {
|
||||
'list_key': [
|
||||
'val1',
|
||||
'val2'
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
mdict = {
|
||||
'list_key': [
|
||||
'val3',
|
||||
'val4'
|
||||
]
|
||||
}
|
||||
|
||||
# list replacement specified by the merge_lists=False option
|
||||
result = {
|
||||
'list_key': [
|
||||
'val3',
|
||||
'val4'
|
||||
]
|
||||
}
|
||||
|
||||
self.assertDictEqual(match.filter_by(lookup, merge=mdict, merge_lists=False), result)
|
||||
|
||||
def test_filter_by_merge_lists_agg(self):
|
||||
'''
|
||||
Tests if filter_by merges list values by aggregating them.
|
||||
'''
|
||||
lookup = {
|
||||
'foo*': {
|
||||
'list_key': []
|
||||
},
|
||||
'bar*': {
|
||||
'list_key': [
|
||||
'val1',
|
||||
'val2'
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
mdict = {
|
||||
'list_key': [
|
||||
'val3',
|
||||
'val4'
|
||||
]
|
||||
}
|
||||
|
||||
# list aggregation specified by the merge_lists=True option
|
||||
result = {
|
||||
'list_key': [
|
||||
'val1',
|
||||
'val2',
|
||||
'val3',
|
||||
'val4'
|
||||
]
|
||||
}
|
||||
|
||||
self.assertDictEqual(match.filter_by(lookup, merge=mdict, merge_lists=True), result)
|
||||
|
||||
def test_filter_by_merge_with_none(self):
|
||||
'''
|
||||
Tests if filter_by merges a None object with a merge dictionary.
|
||||
'''
|
||||
lookup = {
|
||||
'foo*': {
|
||||
'key1': 'fooval1', 'key2': 'fooval2'
|
||||
},
|
||||
'bar*': None
|
||||
}
|
||||
|
||||
# mdict should also be the returned dictionary
|
||||
# since a merge is done with None
|
||||
mdict = {'key1': 'mergeval1'}
|
||||
|
||||
self.assertDictEqual(match.filter_by(lookup, merge=mdict), mdict)
|
||||
|
||||
def test_filter_by_merge_fail(self):
|
||||
'''
|
||||
Tests for an exception if a merge is done without a dictionary.
|
||||
'''
|
||||
lookup = {
|
||||
'foo*': {
|
||||
'key1': 'fooval1', 'key2': 'fooval2'
|
||||
},
|
||||
'bar*': {
|
||||
'key1': 'barval1', 'key2': 'barval2'
|
||||
}
|
||||
}
|
||||
mdict = 'notadict'
|
||||
|
||||
self.assertRaises(
|
||||
SaltException,
|
||||
match.filter_by,
|
||||
lookup,
|
||||
merge=mdict
|
||||
)
|
||||
|
||||
def test_watch_for_opts_mismatch_glob_match(self):
|
||||
'''
|
||||
Tests for situations where the glob matcher might reference __opts__ directly
|
||||
instead of the local opts variable.
|
||||
|
||||
When metaproxies/proxy minions are in use, matchers get called with a different `opts`
|
||||
dictionary. Inside the matchers we check to see if `opts` was passed
|
||||
and use it instead of `__opts__`. If sometime in the future we update the matchers
|
||||
and use `__opts__` directly this breaks proxy matching.
|
||||
'''
|
||||
self.assertTrue(glob_match.match('bar03'))
|
||||
self.assertTrue(glob_match.match('rest03', {'id': 'rest03'}))
|
||||
self.assertFalse(glob_match.match('rest03'))
|
||||
|
||||
def test_watch_for_opts_mismatch_list_match(self):
|
||||
'''
|
||||
Tests for situations where the list matcher might reference __opts__ directly
|
||||
instead of the local opts variable
|
||||
|
||||
When metaproxies/proxy minions are in use, matchers get called with a different `opts`
|
||||
dictionary. Inside the matchers we check to see if `opts` was passed
|
||||
and use it instead of `__opts__`. If sometime in the future we update the matchers
|
||||
and use `__opts__` directly this breaks proxy matching.
|
||||
'''
|
||||
self.assertTrue(list_match.match('bar03'))
|
||||
self.assertTrue(list_match.match('rest03', {'id': 'rest03'}))
|
||||
self.assertFalse(list_match.match('rest03'))
|
||||
|
||||
def test_watch_for_opts_mismatch_compound_match(self):
|
||||
'''
|
||||
Tests for situations where the compound matcher might reference __opts__ directly
|
||||
instead of the local opts variable
|
||||
|
||||
When metaproxies/proxy minions are in use, matchers get called with a different `opts`
|
||||
dictionary. Inside the matchers we check to see if `opts` was passed
|
||||
and use it instead of `__opts__`. If sometime in the future we update the matchers
|
||||
and use `__opts__` directly this breaks proxy matching.
|
||||
'''
|
||||
self.assertTrue(compound_match.match('L@bar03'))
|
||||
self.assertTrue(compound_match.match('L@rest03', {'id': 'rest03'}))
|
||||
self.assertFalse(compound_match.match('L@rest03'))
|
Loading…
Add table
Reference in a new issue