Merge pull request #43425 from garethgreenaway/42135_isolate_salt_scheduler

Isolate Salt Scheduler
This commit is contained in:
Nicole Thomas 2017-09-12 09:05:06 -04:00 committed by GitHub
commit f1a93401d1

View file

@ -385,7 +385,7 @@ class Schedule(object):
'''
instance = None
def __new__(cls, opts, functions, returners=None, intervals=None, cleanup=None, proxy=None):
def __new__(cls, opts, functions, returners=None, intervals=None, cleanup=None, proxy=None, standalone=False):
'''
Only create one instance of Schedule
'''
@ -395,33 +395,36 @@ class Schedule(object):
# it in a WeakValueDictionary-- which will remove the item if no one
# references it-- this forces a reference while we return to the caller
cls.instance = object.__new__(cls)
cls.instance.__singleton_init__(opts, functions, returners, intervals, cleanup, proxy)
cls.instance.__singleton_init__(opts, functions, returners, intervals, cleanup, proxy, standalone)
else:
log.debug('Re-using Schedule')
return cls.instance
# has to remain empty for singletons, since __init__ will *always* be called
def __init__(self, opts, functions, returners=None, intervals=None, cleanup=None, proxy=None):
def __init__(self, opts, functions, returners=None, intervals=None, cleanup=None, proxy=None, standalone=False):
pass
# an init for the singleton instance to call
def __singleton_init__(self, opts, functions, returners=None, intervals=None, cleanup=None, proxy=None):
def __singleton_init__(self, opts, functions, returners=None, intervals=None, cleanup=None, proxy=None, standalone=False):
self.opts = opts
self.proxy = proxy
self.functions = functions
self.standalone = standalone
if isinstance(intervals, dict):
self.intervals = intervals
else:
self.intervals = {}
if hasattr(returners, '__getitem__'):
self.returners = returners
else:
self.returners = returners.loader.gen_functions()
if not self.standalone:
if hasattr(returners, '__getitem__'):
self.returners = returners
else:
self.returners = returners.loader.gen_functions()
self.time_offset = self.functions.get('timezone.get_offset', lambda: '0000')()
self.schedule_returner = self.option('schedule_returner')
# Keep track of the lowest loop interval needed in this variable
self.loop_interval = six.MAXSIZE
clean_proc_dir(opts)
if not self.standalone:
clean_proc_dir(opts)
if cleanup:
for prefix in cleanup:
self.delete_job_prefix(prefix)
@ -778,36 +781,37 @@ class Schedule(object):
salt.utils.appendproctitle('{0} {1}'.format(self.__class__.__name__, ret['jid']))
proc_fn = os.path.join(
salt.minion.get_proc_dir(self.opts['cachedir']),
ret['jid']
)
if not self.standalone:
proc_fn = os.path.join(
salt.minion.get_proc_dir(self.opts['cachedir']),
ret['jid']
)
# Check to see if there are other jobs with this
# signature running. If there are more than maxrunning
# jobs present then don't start another.
# If jid_include is False for this job we can ignore all this
# NOTE--jid_include defaults to True, thus if it is missing from the data
# dict we treat it like it was there and is True
if 'jid_include' not in data or data['jid_include']:
jobcount = 0
for job in salt.utils.minion.running(self.opts):
if 'schedule' in job:
log.debug('schedule.handle_func: Checking job against '
'fun {0}: {1}'.format(ret['fun'], job))
if ret['schedule'] == job['schedule'] \
and salt.utils.process.os_is_running(job['pid']):
jobcount += 1
log.debug(
'schedule.handle_func: Incrementing jobcount, now '
'{0}, maxrunning is {1}'.format(
jobcount, data['maxrunning']))
if jobcount >= data['maxrunning']:
# Check to see if there are other jobs with this
# signature running. If there are more than maxrunning
# jobs present then don't start another.
# If jid_include is False for this job we can ignore all this
# NOTE--jid_include defaults to True, thus if it is missing from the data
# dict we treat it like it was there and is True
if 'jid_include' not in data or data['jid_include']:
jobcount = 0
for job in salt.utils.minion.running(self.opts):
if 'schedule' in job:
log.debug('schedule.handle_func: Checking job against '
'fun {0}: {1}'.format(ret['fun'], job))
if ret['schedule'] == job['schedule'] \
and salt.utils.process.os_is_running(job['pid']):
jobcount += 1
log.debug(
'schedule.handle_func: The scheduled job {0} '
'was not started, {1} already running'.format(
ret['schedule'], data['maxrunning']))
return False
'schedule.handle_func: Incrementing jobcount, now '
'{0}, maxrunning is {1}'.format(
jobcount, data['maxrunning']))
if jobcount >= data['maxrunning']:
log.debug(
'schedule.handle_func: The scheduled job {0} '
'was not started, {1} already running'.format(
ret['schedule'], data['maxrunning']))
return False
if multiprocessing_enabled and not salt.utils.platform.is_windows():
# Reconfigure multiprocessing logging after daemonizing
@ -820,12 +824,13 @@ class Schedule(object):
try:
ret['pid'] = os.getpid()
if 'jid_include' not in data or data['jid_include']:
log.debug('schedule.handle_func: adding this job to the jobcache '
'with data {0}'.format(ret))
# write this to /var/cache/salt/minion/proc
with salt.utils.files.fopen(proc_fn, 'w+b') as fp_:
fp_.write(salt.payload.Serial(self.opts).dumps(ret))
if not self.standalone:
if 'jid_include' not in data or data['jid_include']:
log.debug('schedule.handle_func: adding this job to the jobcache '
'with data {0}'.format(ret))
# write this to /var/cache/salt/minion/proc
with salt.utils.files.fopen(proc_fn, 'w+b') as fp_:
fp_.write(salt.payload.Serial(self.opts).dumps(ret))
args = tuple()
if 'args' in data:
@ -853,35 +858,36 @@ class Schedule(object):
ret['return'] = self.functions[func](*args, **kwargs)
# runners do not provide retcode
if 'retcode' in self.functions.pack['__context__']:
ret['retcode'] = self.functions.pack['__context__']['retcode']
if not self.standalone:
# runners do not provide retcode
if 'retcode' in self.functions.pack['__context__']:
ret['retcode'] = self.functions.pack['__context__']['retcode']
ret['success'] = True
ret['success'] = True
data_returner = data.get('returner', None)
if data_returner or self.schedule_returner:
if 'return_config' in data:
ret['ret_config'] = data['return_config']
if 'return_kwargs' in data:
ret['ret_kwargs'] = data['return_kwargs']
rets = []
for returner in [data_returner, self.schedule_returner]:
if isinstance(returner, six.string_types):
rets.append(returner)
elif isinstance(returner, list):
rets.extend(returner)
# simple de-duplication with order retained
for returner in OrderedDict.fromkeys(rets):
ret_str = '{0}.returner'.format(returner)
if ret_str in self.returners:
self.returners[ret_str](ret)
else:
log.info(
'Job {0} using invalid returner: {1}. Ignoring.'.format(
func, returner
data_returner = data.get('returner', None)
if data_returner or self.schedule_returner:
if 'return_config' in data:
ret['ret_config'] = data['return_config']
if 'return_kwargs' in data:
ret['ret_kwargs'] = data['return_kwargs']
rets = []
for returner in [data_returner, self.schedule_returner]:
if isinstance(returner, six.string_types):
rets.append(returner)
elif isinstance(returner, list):
rets.extend(returner)
# simple de-duplication with order retained
for returner in OrderedDict.fromkeys(rets):
ret_str = '{0}.returner'.format(returner)
if ret_str in self.returners:
self.returners[ret_str](ret)
else:
log.info(
'Job {0} using invalid returner: {1}. Ignoring.'.format(
func, returner
)
)
)
except Exception:
log.exception("Unhandled exception running {0}".format(ret['fun']))
@ -923,24 +929,25 @@ class Schedule(object):
except Exception as exc:
log.exception("Unhandled exception firing event: {0}".format(exc))
log.debug('schedule.handle_func: Removing {0}'.format(proc_fn))
if not self.standalone:
log.debug('schedule.handle_func: Removing {0}'.format(proc_fn))
try:
os.unlink(proc_fn)
except OSError as exc:
if exc.errno == errno.EEXIST or exc.errno == errno.ENOENT:
# EEXIST and ENOENT are OK because the file is gone and that's what
# we wanted
pass
else:
log.error("Failed to delete '{0}': {1}".format(proc_fn, exc.errno))
# Otherwise, failing to delete this file is not something
# we can cleanly handle.
raise
finally:
if multiprocessing_enabled:
# Let's make sure we exit the process!
sys.exit(salt.defaults.exitcodes.EX_GENERIC)
try:
os.unlink(proc_fn)
except OSError as exc:
if exc.errno == errno.EEXIST or exc.errno == errno.ENOENT:
# EEXIST and ENOENT are OK because the file is gone and that's what
# we wanted
pass
else:
log.error("Failed to delete '{0}': {1}".format(proc_fn, exc.errno))
# Otherwise, failing to delete this file is not something
# we can cleanly handle.
raise
finally:
if multiprocessing_enabled:
# Let's make sure we exit the process!
sys.exit(salt.defaults.exitcodes.EX_GENERIC)
def eval(self):
'''