Merge pull request #54001 from garethgreenaway/add_metaproxy_2019_2_1

[2019.2.1] Porting metaproxy changes in #50183 to 2019.2.1
Authored by Daniel Wozniak on 2019-08-07 21:24:30 -07:00, committed by GitHub
commit 584e3f2460
25 changed files with 1171 additions and 315 deletions

View file

@ -310,6 +310,18 @@ def raw_mod(opts, name, functions, mod='modules'):
return dict(loader._dict) # return a copy of *just* the funcs for `name`
def metaproxy(opts):
'''
Return functions used in the meta proxy
'''
return LazyLoader(
_module_dirs(opts, 'metaproxy'),
opts,
tag='metaproxy'
)
def matchers(opts):
'''
Return the matcher services plugins

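The metaproxy() loader added above returns a LazyLoader whose entries are keyed '<module>.<function>'. A minimal sketch of how it could be exercised, assuming a populated opts dictionary (the config path and the use of minion_config() are assumptions, not part of this change):

# Sketch only: load the metaproxy plugins and pull out one entry point.
import salt.config
import salt.loader

opts = salt.config.minion_config('/etc/salt/proxy')
metaproxy = salt.loader.metaproxy(opts)

# Entries follow the '<module>.<function>' convention, e.g. the default
# implementation added in salt/metaproxy/proxy.py further down:
target_load = metaproxy['proxy.target_load']
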
View file

@ -17,13 +17,17 @@ def mmatch(expr,
greedy,
search_type,
regex_match=False,
exact_match=False):
exact_match=False,
opts=None):
'''
Helper function to search for minions in master caches
If 'greedy' return accepted minions that matched by the condition or absent in the cache.
If not 'greedy' return the only minions have cache data and matched by the condition.
'''
ckminions = salt.utils.minions.CkMinions(__opts__)
if not opts:
opts = __opts__
ckminions = salt.utils.minions.CkMinions(opts)
return ckminions._check_cache_minions(expr, delimiter, greedy,
search_type, regex_match=regex_match,

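This is the first of many matcher hunks in this diff that follow the same convention: the matcher gains an optional opts argument and falls back to the loader-injected __opts__ when nothing is passed, so metaproxy and proxy-minion callers can supply their own configuration. A minimal sketch of the pattern (the matcher itself is hypothetical):

# Sketch of the opts-fallback convention used throughout salt/matchers/.
# __opts__ is injected by the Salt loader; this example matcher is made up.
def match(tgt, opts=None):
    '''
    Return True if tgt equals this minion's id.
    '''
    if not opts:
        opts = __opts__  # fall back to the loader-injected dunder
    minion_id = opts.get('minion_id', opts['id'])
    return tgt == minion_id
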
View file

@ -19,13 +19,15 @@ except ImportError:
log = logging.getLogger(__name__)
def match(tgt):
def match(tgt, opts=None):
'''
Runs the compound target check
'''
nodegroups = __opts__.get('nodegroups', {})
matchers = salt.loader.matchers(__opts__)
minion_id = __opts__.get('minion_id', __opts__['id'])
if not opts:
opts = __opts__
nodegroups = opts.get('nodegroups', {})
matchers = salt.loader.matchers(opts)
minion_id = opts.get('minion_id', opts['id'])
if not isinstance(tgt, six.string_types) and not isinstance(tgt, (list, tuple)):
log.error('Compound target received that is neither string, list nor tuple')
@ -90,7 +92,7 @@ def match(tgt):
return False
engine_args = [target_info['pattern']]
engine_kwargs = {}
engine_kwargs = {'opts': opts}
if target_info['delimiter']:
engine_kwargs['delimiter'] = target_info['delimiter']
@ -100,7 +102,7 @@ def match(tgt):
else:
# The match is not explicitly defined, evaluate it as a glob
results.append(six.text_type(matchers['glob_match.match'](word)))
results.append(six.text_type(matchers['glob_match.match'](word, opts)))
results = ' '.join(results)
log.debug('compound_match %s ? "%s" => "%s"', minion_id, tgt, results)
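
Beyond the usual fallback, the compound matcher now threads opts into every sub-matcher it dispatches to (engine_kwargs = {'opts': opts}) and into the glob fallback for bare words. A rough illustration of that dispatch; the prefix-to-matcher mapping is an assumption based on Salt's documented compound targeting letters, and the helper below is not part of this change:

# Illustration only: route one word of a compound expression to a matcher.
ref = {'G': 'grain', 'P': 'grain_pcre', 'I': 'pillar', 'J': 'pillar_pcre',
       'L': 'list', 'S': 'ipcidr', 'E': 'pcre', 'R': 'range'}

def dispatch_word(word, matchers, opts):
    if len(word) > 2 and word[1] == '@' and word[0] in ref:
        engine = ref[word[0]]
        pattern = word[2:]
        # explicit engine, e.g. 'G@os:Ubuntu' -> grain_match.match
        return matchers['{0}_match.match'.format(engine)](pattern, opts=opts)
    # no explicit engine: evaluate the word as a glob, passing opts along
    return matchers['glob_match.match'](word, opts)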

View file

@ -14,10 +14,13 @@ import salt.utils.minions # pylint: disable=3rd-party-module-not-gated
log = logging.getLogger(__name__)
def mmatch(expr, delimiter, greedy):
def mmatch(expr, delimiter, greedy, opts=None):
'''
Return the minions found by looking via pillar
'''
ckminions = salt.utils.minions.CkMinions(__opts__)
if not opts:
opts = __opts__
ckminions = salt.utils.minions.CkMinions(opts)
return ckminions._check_compound_minions(expr, delimiter, greedy,
pillar_exact=True)

View file

@ -16,13 +16,15 @@ import salt.loader # pylint: disable=3rd-party-module-not-gated
log = logging.getLogger(__name__)
def match(tgt, functions=None):
def match(tgt, functions=None, opts=None):
'''
Match based on the local data store on the minion
'''
if not opts:
opts = __opts__
if functions is None:
utils = salt.loader.utils(__opts__)
functions = salt.loader.minion_mods(__opts__, utils=utils)
utils = salt.loader.utils(opts)
functions = salt.loader.minion_mods(opts, utils=utils)
comps = tgt.split(':')
if len(comps) < 2:
return False

View file

@ -8,11 +8,13 @@ import fnmatch
from salt.ext import six # pylint: disable=3rd-party-module-not-gated
def match(tgt):
def match(tgt, opts=None):
'''
Returns true if the passed glob matches the id
'''
minion_id = __opts__.get('minion_id', __opts__['id'])
if not opts:
opts = __opts__
minion_id = opts.get('minion_id', opts['id'])
if not isinstance(tgt, six.string_types):
return False

View file

@ -12,15 +12,19 @@ import salt.utils.data # pylint: disable=3rd-party-module-not-gated
log = logging.getLogger(__name__)
def match(tgt, delimiter=DEFAULT_TARGET_DELIM):
def match(tgt, delimiter=DEFAULT_TARGET_DELIM, opts=None):
'''
Reads in the grains glob match
'''
if not opts:
opts = __opts__
log.debug('grains target: %s', tgt)
if delimiter not in tgt:
log.error('Got insufficient arguments for grains match '
'statement from master')
return False
return salt.utils.data.subdict_match(
__opts__['grains'], tgt, delimiter=delimiter
opts['grains'], tgt, delimiter=delimiter
)

View file

@ -12,14 +12,17 @@ import salt.utils.data # pylint: disable=3rd-party-module-not-gated
log = logging.getLogger(__name__)
def match(tgt, delimiter=DEFAULT_TARGET_DELIM):
def match(tgt, delimiter=DEFAULT_TARGET_DELIM, opts=None):
'''
Matches a grain based on regex
'''
if not opts:
opts = __opts__
log.debug('grains pcre target: %s', tgt)
if delimiter not in tgt:
log.error('Got insufficient arguments for grains pcre match '
'statement from master')
return False
return salt.utils.data.subdict_match(
__opts__['grains'], tgt, delimiter=delimiter, regex_match=True)
opts['grains'], tgt, delimiter=delimiter, regex_match=True)

View file

@ -17,10 +17,13 @@ else:
log = logging.getLogger(__name__)
def match(tgt):
def match(tgt, opts=None):
'''
Matches based on IP address or CIDR notation
'''
if not opts:
opts = __opts__
try:
# Target is an address?
tgt = ipaddress.ip_address(tgt)
@ -33,7 +36,7 @@ def match(tgt):
return []
proto = 'ipv{0}'.format(tgt.version)
grains = __opts__['grains']
grains = opts['grains']
if proto not in grains:
match = False

View file

@ -8,23 +8,26 @@ import logging
log = logging.getLogger(__name__)
def match(tgt):
def match(tgt, opts=None):
'''
Determines if this host is on the list
'''
if not opts:
opts = __opts__
try:
if ',' + __opts__['id'] + ',' in tgt \
or tgt.startswith(__opts__['id'] + ',') \
or tgt.endswith(',' + __opts__['id']):
if ',' + opts['id'] + ',' in tgt \
or tgt.startswith(opts['id'] + ',') \
or tgt.endswith(',' + opts['id']):
return True
# tgt is a string, which we know because the if statement above did not
# cause one of the exceptions being caught. Therefore, look for an
# exact match. (e.g. salt -L foo test.ping)
return __opts__['id'] == tgt
return opts['id'] == tgt
except (AttributeError, TypeError):
# tgt is not a string, maybe it's a sequence type?
try:
return __opts__['id'] in tgt
return opts['id'] in tgt
except Exception:
# tgt was likely some invalid type
return False

View file

@ -6,16 +6,24 @@ from __future__ import absolute_import, print_function, unicode_literals
import salt.utils.minions # pylint: disable=3rd-party-module-not-gated
import salt.loader
import logging
log = logging.getLogger(__name__)
def match(tgt, nodegroups):
def match(tgt, nodegroups=None, opts=None):
'''
This is a compatibility matcher and is NOT called when using
nodegroups for remote execution, but is called when the nodegroups
matcher is used in states
'''
if not opts:
opts = __opts__
if not nodegroups:
log.debug('Nodegroup matcher called with no nodegroups.')
return False
if tgt in nodegroups:
matchers = salt.loader.matchers(__opts__)
matchers = salt.loader.matchers(opts)
return matchers['compound_match.match'](
salt.utils.minions.nodegroup_comp(tgt, nodegroups)
)
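
The nodegroup matcher only expands the named group into its compound expression and defers to the compound matcher; note that in this hunk the expanded expression is still passed without opts, so the compound matcher will fall back to __opts__. A small usage sketch with an invented nodegroup definition and config path:

# Sketch only: the group name, its definition and the config path are made up.
import salt.config
import salt.loader

opts = salt.config.minion_config('/etc/salt/minion')
opts['nodegroups'] = {'webservers': 'L@web01,web02 or G@role:web'}

matchers = salt.loader.matchers(opts)
matched = matchers['nodegroup_match.match']('webservers',
                                            nodegroups=opts['nodegroups'],
                                            opts=opts)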

View file

@ -7,8 +7,11 @@ from __future__ import absolute_import, print_function, unicode_literals
import re
def match(tgt):
def match(tgt, opts=None):
'''
Returns true if the passed pcre regex matches
'''
return bool(re.match(tgt, __opts__['id']))
if not opts:
return bool(re.match(tgt, __opts__['id']))
else:
return bool(re.match(tgt, opts['id']))

View file

@ -10,16 +10,19 @@ import salt.utils.data # pylint: disable=3rd-party-module-not-gated
log = logging.getLogger(__name__)
def match(tgt, delimiter=':'):
def match(tgt, delimiter=':', opts=None):
'''
Reads in the pillar match, no globbing, no PCRE
'''
if not opts:
opts = __opts__
log.debug('pillar target: %s', tgt)
if delimiter not in tgt:
log.error('Got insufficient arguments for pillar match '
'statement from master')
return False
return salt.utils.data.subdict_match(__opts__['pillar'],
return salt.utils.data.subdict_match(opts['pillar'],
tgt,
delimiter=delimiter,
exact_match=True)

View file

@ -11,15 +11,17 @@ import salt.utils.data # pylint: disable=3rd-party-module-not-gated
log = logging.getLogger(__name__)
def match(tgt, delimiter=DEFAULT_TARGET_DELIM):
def match(tgt, delimiter=DEFAULT_TARGET_DELIM, opts=None):
'''
Reads in the pillar glob match
'''
if not opts:
opts = __opts__
log.debug('pillar target: %s', tgt)
if delimiter not in tgt:
log.error('Got insufficient arguments for pillar match '
'statement from master')
return False
return salt.utils.data.subdict_match(
__opts__['pillar'], tgt, delimiter=delimiter
opts['pillar'], tgt, delimiter=delimiter
)

View file

@ -11,15 +11,18 @@ import salt.utils.data # pylint: disable=3rd-party-module-not-gated
log = logging.getLogger(__name__)
def match(tgt, delimiter=DEFAULT_TARGET_DELIM):
def match(tgt, delimiter=DEFAULT_TARGET_DELIM, opts=None):
'''
Reads in the pillar pcre match
'''
if not opts:
opts = __opts__
log.debug('pillar PCRE target: %s', tgt)
if delimiter not in tgt:
log.error('Got insufficient arguments for pillar PCRE match '
'statement from master')
return False
return salt.utils.data.subdict_match(
__opts__['pillar'], tgt, delimiter=delimiter, regex_match=True
opts['pillar'], tgt, delimiter=delimiter, regex_match=True
)

View file

@ -16,14 +16,16 @@ except ImportError:
log = logging.getLogger(__name__)
def match(tgt):
def match(tgt, opts=None):
'''
Matches based on range cluster
'''
if not opts:
opts = __opts__
if HAS_RANGE:
range_ = seco.range.Range(__opts__['range_server'])
range_ = seco.range.Range(opts['range_server'])
try:
return __opts__['grains']['fqdn'] in range_.expand(tgt)
return opts['grains']['fqdn'] in range_.expand(tgt)
except seco.range.RangeException as exc:
log.debug('Range exception in compound match: %s', exc)
return False

salt/metaproxy/proxy.py (new file, 799 additions)
View file

@ -0,0 +1,799 @@
# -*- coding: utf-8 -*-
#
# Proxy minion metaproxy modules
#
from __future__ import absolute_import, print_function, with_statement, unicode_literals
import os
import signal
import sys
import types
import logging
import threading
import traceback
# Import Salt Libs
# pylint: disable=3rd-party-module-not-gated
import salt
import salt.client
import salt.crypt
import salt.loader
import salt.beacons
import salt.engines
import salt.payload
import salt.pillar
import salt.syspaths
import salt.utils.args
import salt.utils.context
import salt.utils.data
import salt.utils.error
import salt.utils.event
import salt.utils.files
import salt.utils.jid
import salt.utils.minion
import salt.utils.minions
import salt.utils.network
import salt.utils.platform
import salt.utils.process
import salt.utils.schedule
import salt.utils.ssdp
import salt.utils.user
import salt.utils.zeromq
import salt.defaults.exitcodes
import salt.cli.daemons
import salt.log.setup
import salt.serializers.msgpack
import salt.minion
import salt.defaults.exitcodes
import salt.utils.dictupdate
from salt.utils.event import tagify
from salt.exceptions import (
CommandExecutionError,
CommandNotFoundError,
SaltInvocationError,
SaltSystemExit,
)
from salt.ext import six
from salt.ext.six.moves import range
from salt.minion import ProxyMinion
from salt.defaults import DEFAULT_TARGET_DELIM
from salt.utils.process import (default_signals,
SignalHandlingMultiprocessingProcess)
import tornado.gen # pylint: disable=F0401
import tornado.ioloop # pylint: disable=F0401
log = logging.getLogger(__name__)
def post_master_init(self, master):
log.debug("subclassed LazyLoaded _post_master_init")
if self.connected:
self.opts['master'] = master
self.opts['pillar'] = yield salt.pillar.get_async_pillar(
self.opts,
self.opts['grains'],
self.opts['id'],
saltenv=self.opts['saltenv'],
pillarenv=self.opts.get('pillarenv'),
).compile_pillar()
if 'proxy' not in self.opts['pillar'] and 'proxy' not in self.opts:
errmsg = 'No proxy key found in pillar or opts for id ' + self.opts['id'] + '. ' + \
'Check your pillar/opts configuration and contents. Salt-proxy aborted.'
log.error(errmsg)
self._running = False
raise SaltSystemExit(code=-1, msg=errmsg)
if 'proxy' not in self.opts:
self.opts['proxy'] = self.opts['pillar']['proxy']
if self.opts.get('proxy_merge_pillar_in_opts'):
# Override proxy opts with pillar data when the user requires it.
self.opts = salt.utils.dictupdate.merge(self.opts,
self.opts['pillar'],
strategy=self.opts.get('proxy_merge_pillar_in_opts_strategy'),
merge_lists=self.opts.get('proxy_deep_merge_pillar_in_opts', False))
elif self.opts.get('proxy_mines_pillar'):
# Even when not required, some details such as mine configuration
# should be merged anyway whenever possible.
if 'mine_interval' in self.opts['pillar']:
self.opts['mine_interval'] = self.opts['pillar']['mine_interval']
if 'mine_functions' in self.opts['pillar']:
general_proxy_mines = self.opts.get('mine_functions', [])
specific_proxy_mines = self.opts['pillar']['mine_functions']
try:
self.opts['mine_functions'] = general_proxy_mines + specific_proxy_mines
except TypeError as terr:
log.error('Unable to merge mine functions from the pillar in the opts, for proxy {}'.format(
self.opts['id']))
fq_proxyname = self.opts['proxy']['proxytype']
# Need to load the modules so they get all the dunder variables
self.functions, self.returners, self.function_errors, self.executors = self._load_modules()
# we can then sync any proxymodules down from the master
# we do a sync_all here in case proxy code was installed by
# SPM or was manually placed in /srv/salt/_modules etc.
self.functions['saltutil.sync_all'](saltenv=self.opts['saltenv'])
# Pull in the utils
self.utils = salt.loader.utils(self.opts)
# Then load the proxy module
self.proxy = salt.loader.proxy(self.opts, utils=self.utils)
# And re-load the modules so the __proxy__ variable gets injected
self.functions, self.returners, self.function_errors, self.executors = self._load_modules()
self.functions.pack['__proxy__'] = self.proxy
self.proxy.pack['__salt__'] = self.functions
self.proxy.pack['__ret__'] = self.returners
self.proxy.pack['__pillar__'] = self.opts['pillar']
# Reload utils as well (chicken and egg: __utils__ needs __proxy__ and __proxy__ needs __utils__)
self.utils = salt.loader.utils(self.opts, proxy=self.proxy)
self.proxy.pack['__utils__'] = self.utils
# Reload all modules so all dunder variables are injected
self.proxy.reload_modules()
# Start engines here instead of in the Minion superclass __init__
# This is because we need to inject the __proxy__ variable but
# it is not setup until now.
self.io_loop.spawn_callback(salt.engines.start_engines, self.opts,
self.process_manager, proxy=self.proxy)
if ('{0}.init'.format(fq_proxyname) not in self.proxy
or '{0}.shutdown'.format(fq_proxyname) not in self.proxy):
errmsg = 'Proxymodule {0} is missing an init() or a shutdown() or both. '.format(fq_proxyname) + \
'Check your proxymodule. Salt-proxy aborted.'
log.error(errmsg)
self._running = False
raise SaltSystemExit(code=-1, msg=errmsg)
self.module_executors = self.proxy.get('{0}.module_executors'.format(fq_proxyname), lambda: [])()
proxy_init_fn = self.proxy[fq_proxyname + '.init']
proxy_init_fn(self.opts)
self.opts['grains'] = salt.loader.grains(self.opts, proxy=self.proxy)
self.serial = salt.payload.Serial(self.opts)
self.mod_opts = self._prep_mod_opts()
self.matchers = salt.loader.matchers(self.opts)
self.beacons = salt.beacons.Beacon(self.opts, self.functions)
uid = salt.utils.user.get_uid(user=self.opts.get('user', None))
self.proc_dir = salt.minion.get_proc_dir(self.opts['cachedir'], uid=uid)
if self.connected and self.opts['pillar']:
# The pillar has changed due to the connection to the master.
# Reload the functions so that they can use the new pillar data.
self.functions, self.returners, self.function_errors, self.executors = self._load_modules()
if hasattr(self, 'schedule'):
self.schedule.functions = self.functions
self.schedule.returners = self.returners
if not hasattr(self, 'schedule'):
self.schedule = salt.utils.schedule.Schedule(
self.opts,
self.functions,
self.returners,
cleanup=[salt.minion.master_event(type='alive')],
proxy=self.proxy)
# add default scheduling jobs to the minions scheduler
if self.opts['mine_enabled'] and 'mine.update' in self.functions:
self.schedule.add_job({
'__mine_interval':
{
'function': 'mine.update',
'minutes': self.opts['mine_interval'],
'jid_include': True,
'maxrunning': 2,
'run_on_start': True,
'return_job': self.opts.get('mine_return_job', False)
}
}, persist=True)
log.info('Added mine.update to scheduler')
else:
self.schedule.delete_job('__mine_interval', persist=True)
# add master_alive job if enabled
if (self.opts['transport'] != 'tcp' and
self.opts['master_alive_interval'] > 0):
self.schedule.add_job({
salt.minion.master_event(type='alive', master=self.opts['master']):
{
'function': 'status.master',
'seconds': self.opts['master_alive_interval'],
'jid_include': True,
'maxrunning': 1,
'return_job': False,
'kwargs': {'master': self.opts['master'],
'connected': True}
}
}, persist=True)
if self.opts['master_failback'] and \
'master_list' in self.opts and \
self.opts['master'] != self.opts['master_list'][0]:
self.schedule.add_job({
salt.minion.master_event(type='failback'):
{
'function': 'status.ping_master',
'seconds': self.opts['master_failback_interval'],
'jid_include': True,
'maxrunning': 1,
'return_job': False,
'kwargs': {'master': self.opts['master_list'][0]}
}
}, persist=True)
else:
self.schedule.delete_job(salt.minion.master_event(type='failback'), persist=True)
else:
self.schedule.delete_job(salt.minion.master_event(type='alive', master=self.opts['master']), persist=True)
self.schedule.delete_job(salt.minion.master_event(type='failback'), persist=True)
# proxy keepalive
proxy_alive_fn = fq_proxyname+'.alive'
if (proxy_alive_fn in self.proxy
and 'status.proxy_reconnect' in self.functions
and self.opts.get('proxy_keep_alive', True)):
# schedule the reconnect check unless `proxy_keep_alive` is explicitly set to False (it defaults to True)
self.schedule.add_job({
'__proxy_keepalive':
{
'function': 'status.proxy_reconnect',
'minutes': self.opts.get('proxy_keep_alive_interval', 1), # by default, check once per minute
'jid_include': True,
'maxrunning': 1,
'return_job': False,
'kwargs': {
'proxy_name': fq_proxyname
}
}
}, persist=True)
self.schedule.enable_schedule()
else:
self.schedule.delete_job('__proxy_keepalive', persist=True)
# Sync the grains here so the proxy can communicate them to the master
self.functions['saltutil.sync_grains'](saltenv='base')
self.grains_cache = self.opts['grains']
self.ready = True
def target(cls, minion_instance, opts, data, connected):
if not minion_instance:
minion_instance = cls(opts)
minion_instance.connected = connected
if not hasattr(minion_instance, 'functions'):
# Need to load the modules so they get all the dunder variables
functions, returners, function_errors, executors = (
minion_instance._load_modules(grains=opts['grains'])
)
minion_instance.functions = functions
minion_instance.returners = returners
minion_instance.function_errors = function_errors
minion_instance.executors = executors
# Pull in the utils
minion_instance.utils = salt.loader.utils(minion_instance.opts)
# Then load the proxy module
minion_instance.proxy = salt.loader.proxy(minion_instance.opts, utils=minion_instance.utils)
# And re-load the modules so the __proxy__ variable gets injected
functions, returners, function_errors, executors = (
minion_instance._load_modules(grains=opts['grains'])
)
minion_instance.functions = functions
minion_instance.returners = returners
minion_instance.function_errors = function_errors
minion_instance.executors = executors
minion_instance.functions.pack['__proxy__'] = minion_instance.proxy
minion_instance.proxy.pack['__salt__'] = minion_instance.functions
minion_instance.proxy.pack['__ret__'] = minion_instance.returners
minion_instance.proxy.pack['__pillar__'] = minion_instance.opts['pillar']
# Reload utils as well (chicken and egg: __utils__ needs __proxy__ and __proxy__ needs __utils__)
minion_instance.utils = salt.loader.utils(minion_instance.opts, proxy=minion_instance.proxy)
minion_instance.proxy.pack['__utils__'] = minion_instance.utils
# Reload all modules so all dunder variables are injected
minion_instance.proxy.reload_modules()
fq_proxyname = opts['proxy']['proxytype']
minion_instance.module_executors = minion_instance.proxy.get('{0}.module_executors'.format(fq_proxyname), lambda: [])()
proxy_init_fn = minion_instance.proxy[fq_proxyname + '.init']
proxy_init_fn(opts)
if not hasattr(minion_instance, 'serial'):
minion_instance.serial = salt.payload.Serial(opts)
if not hasattr(minion_instance, 'proc_dir'):
uid = salt.utils.user.get_uid(user=opts.get('user', None))
minion_instance.proc_dir = (
salt.minion.get_proc_dir(opts['cachedir'], uid=uid)
)
with tornado.stack_context.StackContext(minion_instance.ctx):
if isinstance(data['fun'], tuple) or isinstance(data['fun'], list):
ProxyMinion._thread_multi_return(minion_instance, opts, data)
else:
ProxyMinion._thread_return(minion_instance, opts, data)
def thread_return(cls, minion_instance, opts, data):
'''
This method should be used as a threading target, start the actual
minion side execution.
'''
fn_ = os.path.join(minion_instance.proc_dir, data['jid'])
if opts['multiprocessing'] and not salt.utils.platform.is_windows():
# Shutdown the multiprocessing before daemonizing
salt.log.setup.shutdown_multiprocessing_logging()
salt.utils.process.daemonize_if(opts)
# Reconfigure multiprocessing logging after daemonizing
salt.log.setup.setup_multiprocessing_logging()
salt.utils.process.appendproctitle('{0}._thread_return {1}'.format(cls.__name__, data['jid']))
sdata = {'pid': os.getpid()}
sdata.update(data)
log.info('Starting a new job with PID %s', sdata['pid'])
with salt.utils.files.fopen(fn_, 'w+b') as fp_:
fp_.write(minion_instance.serial.dumps(sdata))
ret = {'success': False}
function_name = data['fun']
executors = data.get('module_executors') or \
getattr(minion_instance, 'module_executors', []) or \
opts.get('module_executors', ['direct_call'])
allow_missing_funcs = any([
minion_instance.executors['{0}.allow_missing_func'.format(executor)](function_name)
for executor in executors
if '{0}.allow_missing_func'.format(executor) in minion_instance.executors
])
if function_name in minion_instance.functions or allow_missing_funcs is True:
try:
minion_blackout_violation = False
if minion_instance.connected and minion_instance.opts['pillar'].get('minion_blackout', False):
whitelist = minion_instance.opts['pillar'].get('minion_blackout_whitelist', [])
# this minion is blacked out. Only allow saltutil.refresh_pillar and the whitelist
if function_name != 'saltutil.refresh_pillar' and function_name not in whitelist:
minion_blackout_violation = True
# use minion_blackout_whitelist from grains if it exists
if minion_instance.opts['grains'].get('minion_blackout', False):
whitelist = minion_instance.opts['grains'].get('minion_blackout_whitelist', [])
if function_name != 'saltutil.refresh_pillar' and function_name not in whitelist:
minion_blackout_violation = True
if minion_blackout_violation:
raise SaltInvocationError('Minion in blackout mode. Set \'minion_blackout\' '
'to False in pillar or grains to resume operations. Only '
'saltutil.refresh_pillar allowed in blackout mode.')
if function_name in minion_instance.functions:
func = minion_instance.functions[function_name]
args, kwargs = salt.minion.load_args_and_kwargs(
func,
data['arg'],
data)
else:
# only run if function_name is not in minion_instance.functions and allow_missing_funcs is True
func = function_name
args, kwargs = data['arg'], data
minion_instance.functions.pack['__context__']['retcode'] = 0
if isinstance(executors, six.string_types):
executors = [executors]
elif not isinstance(executors, list) or not executors:
raise SaltInvocationError("Wrong executors specification: {0}. String or non-empty list expected".
format(executors))
if opts.get('sudo_user', '') and executors[-1] != 'sudo':
executors[-1] = 'sudo' # replace the last one with sudo
log.trace('Executors list %s', executors) # pylint: disable=no-member
for name in executors:
fname = '{0}.execute'.format(name)
if fname not in minion_instance.executors:
raise SaltInvocationError("Executor '{0}' is not available".format(name))
return_data = minion_instance.executors[fname](opts, data, func, args, kwargs)
if return_data is not None:
break
if isinstance(return_data, types.GeneratorType):
ind = 0
iret = {}
for single in return_data:
if isinstance(single, dict) and isinstance(iret, dict):
iret.update(single)
else:
if not iret:
iret = []
iret.append(single)
tag = tagify([data['jid'], 'prog', opts['id'], six.text_type(ind)], 'job')
event_data = {'return': single}
minion_instance._fire_master(event_data, tag)
ind += 1
ret['return'] = iret
else:
ret['return'] = return_data
retcode = minion_instance.functions.pack['__context__'].get(
'retcode',
salt.defaults.exitcodes.EX_OK
)
if retcode == salt.defaults.exitcodes.EX_OK:
# No nonzero retcode in __context__ dunder. Check if return
# is a dictionary with a "result" or "success" key.
try:
func_result = all(return_data.get(x, True)
for x in ('result', 'success'))
except Exception:
# return data is not a dict
func_result = True
if not func_result:
retcode = salt.defaults.exitcodes.EX_GENERIC
ret['retcode'] = retcode
ret['success'] = retcode == salt.defaults.exitcodes.EX_OK
except CommandNotFoundError as exc:
msg = 'Command required for \'{0}\' not found'.format(
function_name
)
log.debug(msg, exc_info=True)
ret['return'] = '{0}: {1}'.format(msg, exc)
ret['out'] = 'nested'
ret['retcode'] = salt.defaults.exitcodes.EX_GENERIC
except CommandExecutionError as exc:
log.error(
'A command in \'%s\' had a problem: %s',
function_name, exc,
exc_info_on_loglevel=logging.DEBUG
)
ret['return'] = 'ERROR: {0}'.format(exc)
ret['out'] = 'nested'
ret['retcode'] = salt.defaults.exitcodes.EX_GENERIC
except SaltInvocationError as exc:
log.error(
'Problem executing \'%s\': %s',
function_name, exc,
exc_info_on_loglevel=logging.DEBUG
)
ret['return'] = 'ERROR executing \'{0}\': {1}'.format(
function_name, exc
)
ret['out'] = 'nested'
ret['retcode'] = salt.defaults.exitcodes.EX_GENERIC
except TypeError as exc:
msg = 'Passed invalid arguments to {0}: {1}\n{2}'.format(
function_name, exc, func.__doc__ or ''
)
log.warning(msg, exc_info_on_loglevel=logging.DEBUG)
ret['return'] = msg
ret['out'] = 'nested'
ret['retcode'] = salt.defaults.exitcodes.EX_GENERIC
except Exception:
msg = 'The minion function caused an exception'
log.warning(msg, exc_info_on_loglevel=True)
salt.utils.error.fire_exception(salt.exceptions.MinionError(msg), opts, job=data)
ret['return'] = '{0}: {1}'.format(msg, traceback.format_exc())
ret['out'] = 'nested'
ret['retcode'] = salt.defaults.exitcodes.EX_GENERIC
else:
docs = minion_instance.functions['sys.doc']('{0}*'.format(function_name))
if docs:
docs[function_name] = minion_instance.functions.missing_fun_string(function_name)
ret['return'] = docs
else:
ret['return'] = minion_instance.functions.missing_fun_string(function_name)
mod_name = function_name.split('.')[0]
if mod_name in minion_instance.function_errors:
ret['return'] += ' Possible reasons: \'{0}\''.format(
minion_instance.function_errors[mod_name]
)
ret['success'] = False
ret['retcode'] = salt.defaults.exitcodes.EX_GENERIC
ret['out'] = 'nested'
ret['jid'] = data['jid']
ret['fun'] = data['fun']
ret['fun_args'] = data['arg']
if 'master_id' in data:
ret['master_id'] = data['master_id']
if 'metadata' in data:
if isinstance(data['metadata'], dict):
ret['metadata'] = data['metadata']
else:
log.warning('The metadata parameter must be a dictionary. Ignoring.')
if minion_instance.connected:
minion_instance._return_pub(
ret,
timeout=minion_instance._return_retry_timer()
)
# Add default returners from minion config
# Should have been converted to a comma-delimited string already
if isinstance(opts.get('return'), six.string_types):
if data['ret']:
data['ret'] = ','.join((data['ret'], opts['return']))
else:
data['ret'] = opts['return']
log.debug('minion return: %s', ret)
# TODO: make a list? Seems odd to split it this late :/
if data['ret'] and isinstance(data['ret'], six.string_types):
if 'ret_config' in data:
ret['ret_config'] = data['ret_config']
if 'ret_kwargs' in data:
ret['ret_kwargs'] = data['ret_kwargs']
ret['id'] = opts['id']
for returner in set(data['ret'].split(',')):
try:
returner_str = '{0}.returner'.format(returner)
if returner_str in minion_instance.returners:
minion_instance.returners[returner_str](ret)
else:
returner_err = minion_instance.returners.missing_fun_string(returner_str)
log.error(
'Returner %s could not be loaded: %s',
returner_str, returner_err
)
except Exception as exc:
log.exception(
'The return failed for job %s: %s', data['jid'], exc
)
def thread_multi_return(cls, minion_instance, opts, data):
'''
This method should be used as a threading target, start the actual
minion side execution.
'''
fn_ = os.path.join(minion_instance.proc_dir, data['jid'])
if opts['multiprocessing'] and not salt.utils.platform.is_windows():
# Shutdown the multiprocessing before daemonizing
salt.log.setup.shutdown_multiprocessing_logging()
salt.utils.process.daemonize_if(opts)
# Reconfigure multiprocessing logging after daemonizing
salt.log.setup.setup_multiprocessing_logging()
salt.utils.process.appendproctitle('{0}._thread_multi_return {1}'.format(cls.__name__, data['jid']))
sdata = {'pid': os.getpid()}
sdata.update(data)
log.info('Starting a new job with PID %s', sdata['pid'])
with salt.utils.files.fopen(fn_, 'w+b') as fp_:
fp_.write(minion_instance.serial.dumps(sdata))
multifunc_ordered = opts.get('multifunc_ordered', False)
num_funcs = len(data['fun'])
if multifunc_ordered:
ret = {
'return': [None] * num_funcs,
'retcode': [None] * num_funcs,
'success': [False] * num_funcs
}
else:
ret = {
'return': {},
'retcode': {},
'success': {}
}
for ind in range(0, num_funcs):
if not multifunc_ordered:
ret['success'][data['fun'][ind]] = False
try:
minion_blackout_violation = False
if minion_instance.connected and minion_instance.opts['pillar'].get('minion_blackout', False):
whitelist = minion_instance.opts['pillar'].get('minion_blackout_whitelist', [])
# this minion is blacked out. Only allow saltutil.refresh_pillar and the whitelist
if data['fun'][ind] != 'saltutil.refresh_pillar' and data['fun'][ind] not in whitelist:
minion_blackout_violation = True
elif minion_instance.opts['grains'].get('minion_blackout', False):
whitelist = minion_instance.opts['grains'].get('minion_blackout_whitelist', [])
if data['fun'][ind] != 'saltutil.refresh_pillar' and data['fun'][ind] not in whitelist:
minion_blackout_violation = True
if minion_blackout_violation:
raise SaltInvocationError('Minion in blackout mode. Set \'minion_blackout\' '
'to False in pillar or grains to resume operations. Only '
'saltutil.refresh_pillar allowed in blackout mode.')
func = minion_instance.functions[data['fun'][ind]]
args, kwargs = salt.minion.load_args_and_kwargs(
func,
data['arg'][ind],
data)
minion_instance.functions.pack['__context__']['retcode'] = 0
key = ind if multifunc_ordered else data['fun'][ind]
ret['return'][key] = func(*args, **kwargs)
retcode = minion_instance.functions.pack['__context__'].get(
'retcode',
0
)
if retcode == 0:
# No nonzero retcode in __context__ dunder. Check if return
# is a dictionary with a "result" or "success" key.
try:
func_result = all(ret['return'][key].get(x, True)
for x in ('result', 'success'))
except Exception:
# return data is not a dict
func_result = True
if not func_result:
retcode = 1
ret['retcode'][key] = retcode
ret['success'][key] = retcode == 0
except Exception as exc:
trb = traceback.format_exc()
log.warning('The minion function caused an exception: %s', exc)
if multifunc_ordered:
ret['return'][ind] = trb
else:
ret['return'][data['fun'][ind]] = trb
ret['jid'] = data['jid']
ret['fun'] = data['fun']
ret['fun_args'] = data['arg']
if 'metadata' in data:
ret['metadata'] = data['metadata']
if minion_instance.connected:
minion_instance._return_pub(
ret,
timeout=minion_instance._return_retry_timer()
)
if data['ret']:
if 'ret_config' in data:
ret['ret_config'] = data['ret_config']
if 'ret_kwargs' in data:
ret['ret_kwargs'] = data['ret_kwargs']
for returner in set(data['ret'].split(',')):
ret['id'] = opts['id']
try:
minion_instance.returners['{0}.returner'.format(
returner
)](ret)
except Exception as exc:
log.error(
'The return failed for job %s: %s',
data['jid'], exc
)
def handle_payload(self, payload):
if payload is not None and payload['enc'] == 'aes':
if self._target_load(payload['load']):
self._handle_decoded_payload(payload['load'])
elif self.opts['zmq_filtering']:
# In the filtering enabled case, we'd like to know when the minion sees something it shouldn't
log.trace(
'Broadcast message received not for this minion, Load: %s',
payload['load']
)
# If it's not AES, and thus has not been verified, we do nothing.
# In the future, we could add support for some clearfuncs, but
# the minion currently has no need.
def handle_decoded_payload(self, data):
'''
Override this method if you wish to handle the decoded data
differently.
'''
# Ensure payload is unicode. Disregard failure to decode binary blobs.
if six.PY2:
data = salt.utils.data.decode(data, keep=True)
if 'user' in data:
log.info(
'User %s Executing command %s with jid %s',
data['user'], data['fun'], data['jid']
)
else:
log.info(
'Executing command %s with jid %s',
data['fun'], data['jid']
)
log.debug('Command details %s', data)
# Don't duplicate jobs
log.trace('Started JIDs: %s', self.jid_queue)
if self.jid_queue is not None:
if data['jid'] in self.jid_queue:
return
else:
self.jid_queue.append(data['jid'])
if len(self.jid_queue) > self.opts['minion_jid_queue_hwm']:
self.jid_queue.pop(0)
if isinstance(data['fun'], six.string_types):
if data['fun'] == 'sys.reload_modules':
self.functions, self.returners, self.function_errors, self.executors = self._load_modules()
self.schedule.functions = self.functions
self.schedule.returners = self.returners
process_count_max = self.opts.get('process_count_max')
if process_count_max > 0:
process_count = len(salt.utils.minion.running(self.opts))
while process_count >= process_count_max:
log.warning("Maximum number of processes reached while executing jid {0}, waiting...".format(data['jid']))
yield tornado.gen.sleep(10)
process_count = len(salt.utils.minion.running(self.opts))
# We stash an instance references to allow for the socket
# communication in Windows. You can't pickle functions, and thus
# python needs to be able to reconstruct the reference on the other
# side.
instance = self
multiprocessing_enabled = self.opts.get('multiprocessing', True)
if multiprocessing_enabled:
if sys.platform.startswith('win'):
# let python reconstruct the minion on the other side if we're
# running on windows
instance = None
with default_signals(signal.SIGINT, signal.SIGTERM):
process = SignalHandlingMultiprocessingProcess(
target=self._target, args=(instance, self.opts, data, self.connected)
)
else:
process = threading.Thread(
target=self._target,
args=(instance, self.opts, data, self.connected),
name=data['jid']
)
if multiprocessing_enabled:
with default_signals(signal.SIGINT, signal.SIGTERM):
# Reset current signals before starting the process in
# order not to inherit the current signal handlers
process.start()
else:
process.start()
# TODO: remove the windows specific check?
if multiprocessing_enabled and not salt.utils.platform.is_windows():
# we only want to join() immediately if we are daemonizing a process
process.join()
else:
self.win_proc.append(process)
def target_load(self, load):
# Verify that the publication is valid
if 'tgt' not in load or 'jid' not in load or 'fun' not in load \
or 'arg' not in load:
return False
# Verify that the publication applies to this minion
# It's important to note that the master does some pre-processing
# to determine which minions to send a request to. So for example,
# a "salt -G 'grain_key:grain_val' test.ping" will invoke some
# pre-processing on the master and this minion should not see the
# publication if the master does not determine that it should.
if 'tgt_type' in load:
match_func = self.matchers.get('{0}_match.match'.format(load['tgt_type']), None)
if match_func is None:
return False
if load['tgt_type'] in ('grain', 'grain_pcre', 'pillar'):
delimiter = load.get('delimiter', DEFAULT_TARGET_DELIM)
if not match_func(load['tgt'], delimiter=delimiter):
return False
elif not match_func(load['tgt']):
return False
else:
if not self.matchers['glob_match.match'](load['tgt']):
return False
return True
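
salt/metaproxy/proxy.py above is the stock metaproxy implementation; the ProxyMinion changes further down resolve each of these entry points by name through the metaproxy loader. A custom metaproxy would therefore be another module in a metaproxy loader directory exposing the same callables. The skeleton below is a hedged sketch of that interface, not a documented contract; the 'lite' name and all bodies are invented:

# -*- coding: utf-8 -*-
'''
Hypothetical alternative metaproxy module ("lite"). Function names mirror
salt/metaproxy/proxy.py and what ProxyMinion looks up via _metaproxy_call().
'''
import logging

log = logging.getLogger(__name__)


def post_master_init(self, master):
    # the default implementation is a coroutine that compiles pillar and
    # loads the proxymodule; this sketch only marks the minion ready
    log.debug('lite metaproxy: post_master_init for %s', self.opts['id'])
    self.ready = True


def target_load(self, load):
    # accept any publication that carries the required keys
    return all(key in load for key in ('tgt', 'jid', 'fun', 'arg'))


def handle_payload(self, payload):
    if payload is not None and payload['enc'] == 'aes':
        if self._target_load(payload['load']):
            self._handle_decoded_payload(payload['load'])


def handle_decoded_payload(self, data):
    log.debug('lite metaproxy: received job %s', data.get('jid'))


def target(cls, minion_instance, opts, data, connected):
    log.debug('lite metaproxy: target called for jid %s', data.get('jid'))


def thread_return(cls, minion_instance, opts, data):
    pass


def thread_multi_return(cls, minion_instance, opts, data):
    pass

Selecting such a module would presumably come down to setting metaproxy: lite in the proxy opts, since _metaproxy_call() below reads opts['metaproxy'] and falls back to 'proxy'.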

View file

@ -21,7 +21,6 @@ from random import randint, shuffle
from stat import S_IMODE
import salt.serializers.msgpack
from binascii import crc32
# Import Salt Libs
# pylint: disable=import-error,no-name-in-module,redefined-builtin
from salt.ext import six
@ -1329,26 +1328,30 @@ class Minion(MinionBase):
mod_opts[key] = val
return mod_opts
def _load_modules(self, force_refresh=False, notify=False, grains=None):
def _load_modules(self, force_refresh=False, notify=False, grains=None, opts=None):
'''
Return the functions and the returners loaded up from the loader
module
'''
opt_in = True
if not opts:
opts = self.opts
opt_in = False
# if this is a *nix system AND modules_max_memory is set, lets enforce
# a memory limit on module imports
# this feature ONLY works on *nix like OSs (resource module doesn't work on windows)
modules_max_memory = False
if self.opts.get('modules_max_memory', -1) > 0 and HAS_PSUTIL and HAS_RESOURCE:
if opts.get('modules_max_memory', -1) > 0 and HAS_PSUTIL and HAS_RESOURCE:
log.debug(
'modules_max_memory set, enforcing a maximum of %s',
self.opts['modules_max_memory']
opts['modules_max_memory']
)
modules_max_memory = True
old_mem_limit = resource.getrlimit(resource.RLIMIT_AS)
rss, vms = psutil.Process(os.getpid()).memory_info()[:2]
mem_limit = rss + vms + self.opts['modules_max_memory']
mem_limit = rss + vms + opts['modules_max_memory']
resource.setrlimit(resource.RLIMIT_AS, (mem_limit, mem_limit))
elif self.opts.get('modules_max_memory', -1) > 0:
elif opts.get('modules_max_memory', -1) > 0:
if not HAS_PSUTIL:
log.error('Unable to enforce modules_max_memory because psutil is missing')
if not HAS_RESOURCE:
@ -1361,16 +1364,16 @@ class Minion(MinionBase):
proxy = None
if grains is None:
self.opts['grains'] = salt.loader.grains(self.opts, force_refresh, proxy=proxy)
self.utils = salt.loader.utils(self.opts, proxy=proxy)
opts['grains'] = salt.loader.grains(opts, force_refresh, proxy=proxy)
self.utils = salt.loader.utils(opts, proxy=proxy)
if self.opts.get('multimaster', False):
s_opts = copy.deepcopy(self.opts)
if opts.get('multimaster', False):
s_opts = copy.deepcopy(opts)
functions = salt.loader.minion_mods(s_opts, utils=self.utils, proxy=proxy,
loaded_base_name=self.loaded_base_name, notify=notify)
else:
functions = salt.loader.minion_mods(self.opts, utils=self.utils, notify=notify, proxy=proxy)
returners = salt.loader.returners(self.opts, functions, proxy=proxy)
functions = salt.loader.minion_mods(opts, utils=self.utils, notify=notify, proxy=proxy)
returners = salt.loader.returners(opts, functions, proxy=proxy)
errors = {}
if '_errors' in functions:
errors = functions['_errors']
@ -1380,7 +1383,10 @@ class Minion(MinionBase):
if modules_max_memory is True:
resource.setrlimit(resource.RLIMIT_AS, old_mem_limit)
executors = salt.loader.executors(self.opts, functions, proxy=proxy)
executors = salt.loader.executors(opts, functions, proxy=proxy)
if opt_in:
self.opts = opts
return functions, returners, errors, executors
@ -3301,6 +3307,21 @@ class ProxyMinionManager(MinionManager):
jid_queue=jid_queue)
def _metaproxy_call(opts, fn_name):
log.debug('=== here ====')
metaproxy = salt.loader.metaproxy(opts)
try:
metaproxy_name = opts['metaproxy']
except KeyError:
metaproxy_name = 'proxy'
errmsg = 'No metaproxy key found in opts for id ' + opts['id'] + '. ' + \
'Defaulting to standard proxy minion'
log.trace(errmsg)
metaproxy_fn = metaproxy_name + '.' + fn_name
return metaproxy[metaproxy_fn]
class ProxyMinion(Minion):
'''
This class instantiates a 'proxy' minion--a minion that does not manipulate
@ -3323,265 +3344,40 @@ class ProxyMinion(Minion):
which is why the differences are not factored out into separate helper
functions.
'''
log.debug("subclassed _post_master_init")
mp_call = _metaproxy_call(self.opts, 'post_master_init')
return mp_call(self, master)
if self.connected:
self.opts['master'] = master
def _target_load(self, load):
'''
Verify that the publication is valid and applies to this minion
'''
mp_call = _metaproxy_call(self.opts, 'target_load')
return mp_call(self, load)
async_pillar = salt.pillar.get_async_pillar(
self.opts,
self.opts['grains'],
self.opts['id'],
saltenv=self.opts['saltenv'],
pillarenv=self.opts.get('pillarenv'),
)
self.opts['pillar'] = yield async_pillar.compile_pillar()
async_pillar.destroy()
def _handle_payload(self, payload):
mp_call = _metaproxy_call(self.opts, 'handle_payload')
return mp_call(self, payload)
if 'proxy' not in self.opts['pillar'] and 'proxy' not in self.opts:
errmsg = 'No proxy key found in pillar or opts for id ' + self.opts['id'] + '. ' + \
'Check your pillar/opts configuration and contents. Salt-proxy aborted.'
log.error(errmsg)
self._running = False
raise SaltSystemExit(code=-1, msg=errmsg)
if 'proxy' not in self.opts:
self.opts['proxy'] = self.opts['pillar']['proxy']
if self.opts.get('proxy_merge_pillar_in_opts'):
# Override proxy opts with pillar data when the user required.
self.opts = salt.utils.dictupdate.merge(self.opts,
self.opts['pillar'],
strategy=self.opts.get('proxy_merge_pillar_in_opts_strategy'),
merge_lists=self.opts.get('proxy_deep_merge_pillar_in_opts', False))
elif self.opts.get('proxy_mines_pillar'):
# Even when not required, some details such as mine configuration
# should be merged anyway whenever possible.
if 'mine_interval' in self.opts['pillar']:
self.opts['mine_interval'] = self.opts['pillar']['mine_interval']
if 'mine_functions' in self.opts['pillar']:
general_proxy_mines = self.opts.get('mine_functions', [])
specific_proxy_mines = self.opts['pillar']['mine_functions']
try:
self.opts['mine_functions'] = general_proxy_mines + specific_proxy_mines
except TypeError as terr:
log.error('Unable to merge mine functions from the pillar in the opts, for proxy {}'.format(
self.opts['id']))
fq_proxyname = self.opts['proxy']['proxytype']
# Need to load the modules so they get all the dunder variables
self.functions, self.returners, self.function_errors, self.executors = self._load_modules()
# we can then sync any proxymodules down from the master
# we do a sync_all here in case proxy code was installed by
# SPM or was manually placed in /srv/salt/_modules etc.
self.functions['saltutil.sync_all'](saltenv=self.opts['saltenv'])
# Pull in the utils
self.utils = salt.loader.utils(self.opts)
# Then load the proxy module
self.proxy = salt.loader.proxy(self.opts, utils=self.utils)
# And re-load the modules so the __proxy__ variable gets injected
self.functions, self.returners, self.function_errors, self.executors = self._load_modules()
self.functions.pack['__proxy__'] = self.proxy
self.proxy.pack['__salt__'] = self.functions
self.proxy.pack['__ret__'] = self.returners
self.proxy.pack['__pillar__'] = self.opts['pillar']
# Reload utils as well (chicken and egg, __utils__ needs __proxy__ and __proxy__ needs __utils__
self.utils = salt.loader.utils(self.opts, proxy=self.proxy)
self.proxy.pack['__utils__'] = self.utils
# Reload all modules so all dunder variables are injected
self.proxy.reload_modules()
# Start engines here instead of in the Minion superclass __init__
# This is because we need to inject the __proxy__ variable but
# it is not setup until now.
self.io_loop.spawn_callback(salt.engines.start_engines, self.opts,
self.process_manager, proxy=self.proxy)
if ('{0}.init'.format(fq_proxyname) not in self.proxy
or '{0}.shutdown'.format(fq_proxyname) not in self.proxy):
errmsg = 'Proxymodule {0} is missing an init() or a shutdown() or both. '.format(fq_proxyname) + \
'Check your proxymodule. Salt-proxy aborted.'
log.error(errmsg)
self._running = False
raise SaltSystemExit(code=-1, msg=errmsg)
self.module_executors = self.proxy.get('{0}.module_executors'.format(fq_proxyname), lambda: [])()
proxy_init_fn = self.proxy[fq_proxyname + '.init']
proxy_init_fn(self.opts)
self.opts['grains'] = salt.loader.grains(self.opts, proxy=self.proxy)
self.serial = salt.payload.Serial(self.opts)
self.mod_opts = self._prep_mod_opts()
self.matchers = salt.loader.matchers(self.opts)
self.beacons = salt.beacons.Beacon(self.opts, self.functions)
uid = salt.utils.user.get_uid(user=self.opts.get('user', None))
self.proc_dir = get_proc_dir(self.opts['cachedir'], uid=uid)
if self.connected and self.opts['pillar']:
# The pillar has changed due to the connection to the master.
# Reload the functions so that they can use the new pillar data.
self.functions, self.returners, self.function_errors, self.executors = self._load_modules()
if hasattr(self, 'schedule'):
self.schedule.functions = self.functions
self.schedule.returners = self.returners
if not hasattr(self, 'schedule'):
self.schedule = salt.utils.schedule.Schedule(
self.opts,
self.functions,
self.returners,
cleanup=[master_event(type='alive')],
proxy=self.proxy)
# add default scheduling jobs to the minions scheduler
if self.opts['mine_enabled'] and 'mine.update' in self.functions:
self.schedule.add_job({
'__mine_interval':
{
'function': 'mine.update',
'minutes': self.opts['mine_interval'],
'jid_include': True,
'maxrunning': 2,
'run_on_start': True,
'return_job': self.opts.get('mine_return_job', False)
}
}, persist=True)
log.info('Added mine.update to scheduler')
else:
self.schedule.delete_job('__mine_interval', persist=True)
# add master_alive job if enabled
if (self.opts['transport'] != 'tcp' and
self.opts['master_alive_interval'] > 0):
self.schedule.add_job({
master_event(type='alive', master=self.opts['master']):
{
'function': 'status.master',
'seconds': self.opts['master_alive_interval'],
'jid_include': True,
'maxrunning': 1,
'return_job': False,
'kwargs': {'master': self.opts['master'],
'connected': True}
}
}, persist=True)
if self.opts['master_failback'] and \
'master_list' in self.opts and \
self.opts['master'] != self.opts['master_list'][0]:
self.schedule.add_job({
master_event(type='failback'):
{
'function': 'status.ping_master',
'seconds': self.opts['master_failback_interval'],
'jid_include': True,
'maxrunning': 1,
'return_job': False,
'kwargs': {'master': self.opts['master_list'][0]}
}
}, persist=True)
else:
self.schedule.delete_job(master_event(type='failback'), persist=True)
else:
self.schedule.delete_job(master_event(type='alive', master=self.opts['master']), persist=True)
self.schedule.delete_job(master_event(type='failback'), persist=True)
# proxy keepalive
proxy_alive_fn = fq_proxyname+'.alive'
if (proxy_alive_fn in self.proxy
and 'status.proxy_reconnect' in self.functions
and self.opts.get('proxy_keep_alive', True)):
# if `proxy_keep_alive` is either not specified, either set to False does not retry reconnecting
self.schedule.add_job({
'__proxy_keepalive':
{
'function': 'status.proxy_reconnect',
'minutes': self.opts.get('proxy_keep_alive_interval', 1), # by default, check once per minute
'jid_include': True,
'maxrunning': 1,
'return_job': False,
'kwargs': {
'proxy_name': fq_proxyname
}
}
}, persist=True)
self.schedule.enable_schedule()
else:
self.schedule.delete_job('__proxy_keepalive', persist=True)
# Sync the grains here so the proxy can communicate them to the master
self.functions['saltutil.sync_grains'](saltenv='base')
self.grains_cache = self.opts['grains']
self.ready = True
@tornado.gen.coroutine
def _handle_decoded_payload(self, data):
mp_call = _metaproxy_call(self.opts, 'handle_decoded_payload')
return mp_call(self, data)
@classmethod
def _target(cls, minion_instance, opts, data, connected):
if not minion_instance:
minion_instance = cls(opts)
minion_instance.connected = connected
if not hasattr(minion_instance, 'functions'):
# Need to load the modules so they get all the dunder variables
functions, returners, function_errors, executors = (
minion_instance._load_modules(grains=opts['grains'])
)
minion_instance.functions = functions
minion_instance.returners = returners
minion_instance.function_errors = function_errors
minion_instance.executors = executors
# Pull in the utils
minion_instance.utils = salt.loader.utils(minion_instance.opts)
mp_call = _metaproxy_call(opts, 'target')
return mp_call(cls, minion_instance, opts, data, connected)
# Then load the proxy module
minion_instance.proxy = salt.loader.proxy(minion_instance.opts, utils=minion_instance.utils)
@classmethod
def _thread_return(cls, minion_instance, opts, data):
mp_call = _metaproxy_call(opts, 'thread_return')
return mp_call(cls, minion_instance, opts, data)
# And re-load the modules so the __proxy__ variable gets injected
functions, returners, function_errors, executors = (
minion_instance._load_modules(grains=opts['grains'])
)
minion_instance.functions = functions
minion_instance.returners = returners
minion_instance.function_errors = function_errors
minion_instance.executors = executors
minion_instance.functions.pack['__proxy__'] = minion_instance.proxy
minion_instance.proxy.pack['__salt__'] = minion_instance.functions
minion_instance.proxy.pack['__ret__'] = minion_instance.returners
minion_instance.proxy.pack['__pillar__'] = minion_instance.opts['pillar']
# Reload utils as well (chicken and egg, __utils__ needs __proxy__ and __proxy__ needs __utils__
minion_instance.utils = salt.loader.utils(minion_instance.opts, proxy=minion_instance.proxy)
minion_instance.proxy.pack['__utils__'] = minion_instance.utils
# Reload all modules so all dunder variables are injected
minion_instance.proxy.reload_modules()
fq_proxyname = opts['proxy']['proxytype']
minion_instance.module_executors = minion_instance.proxy.get('{0}.module_executors'.format(fq_proxyname), lambda: [])()
proxy_init_fn = minion_instance.proxy[fq_proxyname + '.init']
proxy_init_fn(opts)
if not hasattr(minion_instance, 'serial'):
minion_instance.serial = salt.payload.Serial(opts)
if not hasattr(minion_instance, 'proc_dir'):
uid = salt.utils.user.get_uid(user=opts.get('user', None))
minion_instance.proc_dir = (
get_proc_dir(opts['cachedir'], uid=uid)
)
with tornado.stack_context.StackContext(minion_instance.ctx):
if isinstance(data['fun'], tuple) or isinstance(data['fun'], list):
Minion._thread_multi_return(minion_instance, opts, data)
else:
Minion._thread_return(minion_instance, opts, data)
@classmethod
def _thread_multi_return(cls, minion_instance, opts, data):
mp_call = _metaproxy_call(opts, 'thread_multi_return')
return mp_call(cls, minion_instance, opts, data)
class SProxyMinion(SMinion):

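The net effect of the minion.py hunks above is that every ProxyMinion entry point collapses into a lookup-and-delegate: _metaproxy_call(opts, name) builds the metaproxy loader, picks the implementation named by opts['metaproxy'] (defaulting to 'proxy'), and returns the '<metaproxy>.<name>' function. A condensed restatement of that dispatch, simplified for illustration (logging and the exact error message are omitted):

import salt.loader

def metaproxy_call(opts, fn_name):
    metaproxy = salt.loader.metaproxy(opts)
    # fall back to the stock 'proxy' metaproxy when the key is absent
    metaproxy_name = opts.get('metaproxy', 'proxy')
    return metaproxy[metaproxy_name + '.' + fn_name]

# e.g. ProxyMinion._thread_return(cls, minion_instance, opts, data) becomes:
#   metaproxy_call(opts, 'thread_return')(cls, minion_instance, opts, data)
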
View file

@ -47,7 +47,7 @@ def compound(tgt, minion_id=None):
opts = __opts__
matchers = salt.loader.matchers(opts)
try:
return matchers['compound_match.match'](tgt)
return matchers['compound_match.match'](tgt, opts=opts)
except Exception as exc:
log.exception(exc)
return False
@ -75,7 +75,7 @@ def ipcidr(tgt):
'''
matchers = salt.loader.matchers(__opts__)
try:
return matchers['ipcidr_match.match'](tgt)
return matchers['ipcidr_match.match'](tgt, opts=__opts__)
except Exception as exc:
log.exception(exc)
return False
@ -106,7 +106,7 @@ def pillar_pcre(tgt, delimiter=DEFAULT_TARGET_DELIM):
'''
matchers = salt.loader.matchers(__opts__)
try:
return matchers['pillar_pcre_match.match'](tgt, delimiter=delimiter)
return matchers['pillar_pcre_match.match'](tgt, delimiter=delimiter, opts=__opts__)
except Exception as exc:
log.exception(exc)
return False
@ -137,7 +137,7 @@ def pillar(tgt, delimiter=DEFAULT_TARGET_DELIM):
'''
matchers = salt.loader.matchers(__opts__)
try:
return matchers['pillar_match.match'](tgt, delimiter=delimiter)
return matchers['pillar_match.match'](tgt, delimiter=delimiter, opts=__opts__)
except Exception as exc:
log.exception(exc)
return False
@ -155,7 +155,7 @@ def data(tgt):
'''
matchers = salt.loader.matchers(__opts__)
try:
return matchers['data_match.match'](tgt)
return matchers['data_match.match'](tgt, opts=__opts__)
except Exception as exc:
log.exception(exc)
return False
@ -186,7 +186,7 @@ def grain_pcre(tgt, delimiter=DEFAULT_TARGET_DELIM):
'''
matchers = salt.loader.matchers(__opts__)
try:
return matchers['grain_pcre_match.match'](tgt, delimiter=delimiter)
return matchers['grain_pcre_match.match'](tgt, delimiter=delimiter, opts=__opts__)
except Exception as exc:
log.exception(exc)
return False
@ -217,7 +217,7 @@ def grain(tgt, delimiter=DEFAULT_TARGET_DELIM):
'''
matchers = salt.loader.matchers(__opts__)
try:
return matchers['grain_match.match'](tgt, delimiter=delimiter)
return matchers['grain_match.match'](tgt, delimiter=delimiter, opts=__opts__)
except Exception as exc:
log.exception(exc)
return False
@ -247,7 +247,7 @@ def list_(tgt, minion_id=None):
opts = __opts__
matchers = salt.loader.matchers(opts)
try:
return matchers['list_match.match'](tgt)
return matchers['list_match.match'](tgt, opts=__opts__)
except Exception as exc:
log.exception(exc)
return False
@ -277,7 +277,7 @@ def pcre(tgt, minion_id=None):
opts = __opts__
matchers = salt.loader.matchers(opts)
try:
return matchers['pcre_match.match'](tgt)
return matchers['pcre_match.match'](tgt, opts=__opts__)
except Exception as exc:
log.exception(exc)
return False
@ -308,7 +308,7 @@ def glob(tgt, minion_id=None):
matchers = salt.loader.matchers(opts)
try:
return matchers['glob_match.match'](tgt)
return matchers['glob_match.match'](tgt, opts=__opts__)
except Exception as exc:
log.exception(exc)
return False
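
With the execution module wrappers above now handing opts=__opts__ to the underlying matchers, callers of match.* are unaffected; the plumbing only matters inside the matcher. A sketch of exercising the wrappers directly through the loader (the config path and targets are invented):

# Sketch only: drive the match execution module outside of a running minion.
import salt.config
import salt.loader

opts = salt.config.minion_config('/etc/salt/minion')
opts['grains'] = salt.loader.grains(opts)
funcs = salt.loader.minion_mods(opts)

# Both wrappers now forward opts=__opts__ to the matcher they call.
print(funcs['match.glob']('web*'))
print(funcs['match.grain']('os:Ubuntu'))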

View file

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
'''
This is a dummy proxy-minion designed for testing the proxy minion subsystem.
This is the a dummy proxy-minion designed for testing the proxy minion subsystem.
'''
from __future__ import absolute_import, print_function, unicode_literals

View file

@ -63,7 +63,7 @@ def alive(opts):
log.debug('=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-')
log.debug('proxys alive() fn called')
log.debug('=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-')
return False
return ping()
def id(opts):

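This dummy proxymodule change ties back to the __proxy_keepalive job scheduled in post_master_init: status.proxy_reconnect consults the proxymodule's alive(), so returning ping() lets the dummy report its actual state instead of always appearing dead. A bare-bones sketch of the keepalive-related functions a proxymodule exposes; the DETAILS flag is modelled loosely on the dummy module and is not a required API:

# Sketch of a proxymodule's keepalive surface; everything here is illustrative.
DETAILS = {'initialized': False}


def init(opts):
    DETAILS['initialized'] = True


def ping():
    # report whether the (fake) connection is still usable
    return DETAILS['initialized']


def alive(opts):
    # consulted via the __proxy_keepalive schedule job (status.proxy_reconnect)
    return ping()


def shutdown(opts):
    DETAILS['initialized'] = False
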
View file

@ -736,6 +736,8 @@ class TestDaemon(object):
os.makedirs(RUNTIME_VARS.TMP_SUB_MINION_CONF_DIR)
os.makedirs(RUNTIME_VARS.TMP_SYNDIC_MASTER_CONF_DIR)
os.makedirs(RUNTIME_VARS.TMP_SYNDIC_MINION_CONF_DIR)
if not os.path.exists(RUNTIME_VARS.TMP):
os.makedirs(RUNTIME_VARS.TMP)
print(' * Transplanting configuration files to \'{0}\''.format(RUNTIME_VARS.TMP_CONF_DIR))
tests_known_hosts_file = os.path.join(RUNTIME_VARS.TMP_CONF_DIR, 'salt_ssh_known_hosts')
with salt.utils.files.fopen(tests_known_hosts_file, 'w') as known_hosts:
@ -823,10 +825,14 @@ class TestDaemon(object):
# This proxy connects to master
proxy_opts = salt.config._read_conf_file(os.path.join(CONF_DIR, 'proxy'))
proxy_opts['cachedir'] = os.path.join(TMP, 'rootdir-proxy', 'cache')
if not os.path.exists(proxy_opts['cachedir']):
os.makedirs(proxy_opts['cachedir'])
# proxy_opts['user'] = running_tests_user
proxy_opts['config_dir'] = RUNTIME_VARS.TMP_CONF_DIR
proxy_opts['root_dir'] = os.path.join(TMP, 'rootdir-proxy')
proxy_opts['pki_dir'] = os.path.join(TMP, 'rootdir-proxy', 'pki')
if not os.path.exists(proxy_opts['pki_dir']):
os.makedirs(proxy_opts['pki_dir'])
proxy_opts['hosts.file'] = os.path.join(TMP, 'rootdir-proxy', 'hosts')
proxy_opts['aliases.file'] = os.path.join(TMP, 'rootdir-proxy', 'aliases')

View file

@ -0,0 +1,120 @@
# -*- coding: utf-8 -*-
'''
:codeauthor: Oleg Lipovchenko <oleg.lipovchenko@gmail.com>
'''
# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
# Import Salt Testing libs
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.unit import TestCase, skipIf
from tests.support.mock import (
MagicMock,
patch,
NO_MOCK,
NO_MOCK_REASON,
)
# Import Salt Libs
import salt.modules.match as match
import salt.matchers.compound_match as compound_match
import salt.matchers.glob_match as glob_match
import salt.matchers.list_match as list_match
MATCHERS_DICT = {
'compound_match.match': compound_match.match,
'glob_match.match': glob_match.match,
'list_match.match': list_match.match
}
# the name of the minion to be used for tests
MINION_ID = 'bar03'
@skipIf(NO_MOCK, NO_MOCK_REASON)
@patch('salt.loader.matchers', MagicMock(return_value=MATCHERS_DICT))
class MatchTestCase(TestCase, LoaderModuleMockMixin):
'''
This class contains a set of functions that test salt.modules.match.
'''
def setup_loader_modules(self):
return {
match: {
'__opts__': {
'extension_modules': '',
'id': MINION_ID
}
},
compound_match: {
'__opts__': {
'id': MINION_ID
}
},
glob_match: {
'__opts__': {'id': MINION_ID}
},
list_match: {
'__opts__': {'id': MINION_ID}
}
}
def test_filter_by(self):
'''
Tests if filter_by returns the correct dictionary.
'''
lookup = {
'foo*': {
'key1': 'fooval1', 'key2': 'fooval2'
},
'bar*': {
'key1': 'barval1', 'key2': 'barval2'
}
}
result = {'key1': 'barval1', 'key2': 'barval2'}
self.assertDictEqual(match.filter_by(lookup), result)
def test_watch_for_opts_mismatch_glob_match(self):
'''
Tests for situations where the glob matcher might reference __opts__ directly
instead of the local opts variable.
When metaproxies/proxy minions are in use, matchers get called with a different `opts`
dictionary. Inside the matchers we check to see if `opts` was passed
and use it instead of `__opts__`. If sometime in the future we update the matchers
and use `__opts__` directly this breaks proxy matching.
'''
self.assertTrue(glob_match.match('bar03'))
self.assertTrue(glob_match.match('rest03', {'id': 'rest03'}))
self.assertFalse(glob_match.match('rest03'))
def test_watch_for_opts_mismatch_list_match(self):
'''
Tests for situations where the list matcher might reference __opts__ directly
instead of the local opts variable
When metaproxies/proxy minions are in use, matchers get called with a different `opts`
dictionary. Inside the matchers we check to see if `opts` was passed
and use it instead of `__opts__`. If sometime in the future we update the matchers
and use `__opts__` directly this breaks proxy matching.
'''
self.assertTrue(list_match.match('bar03'))
self.assertTrue(list_match.match('rest03', {'id': 'rest03'}))
self.assertFalse(list_match.match('rest03'))
def test_watch_for_opts_mismatch_compound_match(self):
'''
Tests for situations where the compound matcher might reference __opts__ directly
instead of the local opts variable
When metaproxies/proxy minions are in use, matchers get called with a different `opts`
dictionary. Inside the matchers we check to see if `opts` was passed
and use it instead of `__opts__`. If sometime in the future we update the matchers
and use `__opts__` directly this breaks proxy matching.
'''
self.assertTrue(compound_match.match('L@bar03'))
self.assertTrue(compound_match.match('L@rest03', {'id': 'rest03'}))
self.assertFalse(compound_match.match('L@rest03'))

View file

@ -115,6 +115,7 @@ class BadTestModuleNamesTestCase(TestCase):
'unit.test_virtualname',
'unit.test_simple',
'unit.test_zypp_plugins',
'unit.test_proxy_minion',
'unit.cache.test_cache',
'unit.serializers.test_serializers',
'unit.states.test_postgres',

View file

@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
'''
:codeauthor: Gareth J. Greenaway <gareth@saltstack.com>
'''
# Import python libs
from __future__ import absolute_import
import copy
import logging
import tornado
import tornado.testing
# Import Salt Testing libs
from tests.support.unit import TestCase, skipIf
from tests.support.mock import NO_MOCK, NO_MOCK_REASON
from tests.support.mixins import AdaptedConfigurationTestCaseMixin
# Import salt libs
import salt.minion
import salt.syspaths
log = logging.getLogger(__name__)
__opts__ = {}
@skipIf(NO_MOCK, NO_MOCK_REASON)
class ProxyMinionTestCase(TestCase, AdaptedConfigurationTestCaseMixin):
def test_post_master_init_metaproxy_called(self):
'''
Tests that when the _post_master_init function is called, _metaproxy_call is also called.
'''
mock_opts = salt.config.DEFAULT_MINION_OPTS
mock_jid_queue = [123]
proxy_minion = salt.minion.ProxyMinion(mock_opts, jid_queue=copy.copy(mock_jid_queue), io_loop=tornado.ioloop.IOLoop())
try:
ret = proxy_minion._post_master_init('dummy_master')
self.assert_called_once(salt.minion._metaproxy_call)
finally:
proxy_minion.destroy()
def test_handle_decoded_payload_metaproxy_called(self):
'''
Tests that when the _handle_decoded_payload function is called, _metaproxy_call is also called.
'''
mock_opts = salt.config.DEFAULT_MINION_OPTS
mock_data = {'fun': 'foo.bar',
'jid': 123}
mock_jid_queue = [123]
proxy_minion = salt.minion.ProxyMinion(mock_opts, jid_queue=copy.copy(mock_jid_queue), io_loop=tornado.ioloop.IOLoop())
try:
ret = proxy_minion._handle_decoded_payload(mock_data).result()
self.assertEqual(proxy_minion.jid_queue, mock_jid_queue)
self.assertIsNone(ret)
self.assert_called_once(salt.minion._metaproxy_call)
finally:
proxy_minion.destroy()
def test_handle_payload_metaproxy_called(self):
'''
Tests that when the _handle_payload function is called, _metaproxy_call is also called.
'''
mock_opts = salt.config.DEFAULT_MINION_OPTS
mock_data = {'fun': 'foo.bar',
'jid': 123}
mock_jid_queue = [123]
proxy_minion = salt.minion.ProxyMinion(mock_opts, jid_queue=copy.copy(mock_jid_queue), io_loop=tornado.ioloop.IOLoop())
try:
ret = proxy_minion._handle_decoded_payload(mock_data).result()
self.assertEqual(proxy_minion.jid_queue, mock_jid_queue)
self.assertIsNone(ret)
self.assert_called_once(salt.minion._metaproxy_call)
finally:
proxy_minion.destroy()
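
The assertions above call self.assert_called_once(...) without patching salt.minion._metaproxy_call; a stricter variant would replace it with a MagicMock and assert on the mock directly. A hedged sketch for the decoded-payload case, reusing the opts and data shapes from the tests above:

# Sketch only: not part of this change.
import copy

import tornado.ioloop

from tests.support.unit import TestCase
from tests.support.mock import MagicMock, patch

import salt.config
import salt.minion


class ProxyMinionMetaproxyMockTestCase(TestCase):

    def test_handle_decoded_payload_calls_metaproxy(self):
        mock_opts = salt.config.DEFAULT_MINION_OPTS
        mock_data = {'fun': 'foo.bar', 'jid': 123}
        proxy_minion = salt.minion.ProxyMinion(
            mock_opts, jid_queue=copy.copy([123]),
            io_loop=tornado.ioloop.IOLoop())
        mocked = MagicMock(return_value=MagicMock(return_value=None))
        try:
            with patch('salt.minion._metaproxy_call', mocked):
                proxy_minion._handle_decoded_payload(mock_data)
            self.assertEqual(mocked.call_count, 1)
            self.assertEqual(mocked.call_args[0][1], 'handle_decoded_payload')
        finally:
            proxy_minion.destroy()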