mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
include the 'success' field in scheduled jobs return data (part of #24237)
* Instead of ignoring the execution of jobs with invalid function names, continue, so that some data is always returned. * Move the code that returns data to the master into the 'finally' section, so that data is also returned in case of error. * Set a default 'success' value, and overwrite it with the value from the function context.
This commit is contained in:
parent
f72a4ca42d
commit
5a1b2ca486
1 changed file with 68 additions and 62 deletions
|
@ -482,24 +482,24 @@ class Schedule(object):
|
|||
func = None
|
||||
if func not in self.functions:
|
||||
log.info(
|
||||
'Invalid function: {0} in job {1}. Ignoring.'.format(
|
||||
'Invalid function: {0} in scheduled job {1}.'.format(
|
||||
func, name
|
||||
)
|
||||
)
|
||||
|
||||
if 'name' not in data:
|
||||
data['name'] = name
|
||||
log.info(
|
||||
'Running Job: {0}.'.format(name)
|
||||
)
|
||||
if self.opts.get('multiprocessing', True):
|
||||
thread_cls = multiprocessing.Process
|
||||
else:
|
||||
if 'name' not in data:
|
||||
data['name'] = name
|
||||
log.info(
|
||||
'Running Job: {0}.'.format(name)
|
||||
)
|
||||
if self.opts.get('multiprocessing', True):
|
||||
thread_cls = multiprocessing.Process
|
||||
else:
|
||||
thread_cls = threading.Thread
|
||||
proc = thread_cls(target=self.handle_func, args=(func, data))
|
||||
proc.start()
|
||||
if self.opts.get('multiprocessing', True):
|
||||
proc.join()
|
||||
thread_cls = threading.Thread
|
||||
proc = thread_cls(target=self.handle_func, args=(func, data))
|
||||
proc.start()
|
||||
if self.opts.get('multiprocessing', True):
|
||||
proc.join()
|
||||
|
||||
def enable_schedule(self):
|
||||
'''
|
||||
|
@ -642,33 +642,39 @@ class Schedule(object):
|
|||
except OSError:
|
||||
log.info('Unable to remove file: {0}.'.format(fn_))
|
||||
|
||||
salt.utils.daemonize_if(self.opts)
|
||||
|
||||
ret['pid'] = os.getpid()
|
||||
|
||||
if 'jid_include' not in data or data['jid_include']:
|
||||
log.debug('schedule.handle_func: adding this job to the jobcache '
|
||||
'with data {0}'.format(ret))
|
||||
# write this to /var/cache/salt/minion/proc
|
||||
with salt.utils.fopen(proc_fn, 'w+b') as fp_:
|
||||
fp_.write(salt.payload.Serial(self.opts).dumps(ret))
|
||||
|
||||
args = tuple()
|
||||
if 'args' in data:
|
||||
args = data['args']
|
||||
|
||||
kwargs = {}
|
||||
if 'kwargs' in data:
|
||||
kwargs = data['kwargs']
|
||||
# if the func support **kwargs, lets pack in the pub data we have
|
||||
# TODO: pack the *same* pub data as a minion?
|
||||
argspec = salt.utils.args.get_function_argspec(self.functions[func])
|
||||
if argspec.keywords:
|
||||
# this function accepts **kwargs, pack in the publish data
|
||||
for key, val in six.iteritems(ret):
|
||||
kwargs['__pub_{0}'.format(key)] = val
|
||||
|
||||
try:
|
||||
salt.utils.daemonize_if(self.opts)
|
||||
|
||||
ret['pid'] = os.getpid()
|
||||
|
||||
if 'jid_include' not in data or data['jid_include']:
|
||||
log.debug('schedule.handle_func: adding this job to the jobcache '
|
||||
'with data {0}'.format(ret))
|
||||
# write this to /var/cache/salt/minion/proc
|
||||
with salt.utils.fopen(proc_fn, 'w+b') as fp_:
|
||||
fp_.write(salt.payload.Serial(self.opts).dumps(ret))
|
||||
|
||||
args = tuple()
|
||||
if 'args' in data:
|
||||
args = data['args']
|
||||
|
||||
kwargs = {}
|
||||
if 'kwargs' in data:
|
||||
kwargs = data['kwargs']
|
||||
|
||||
if func not in self.functions:
|
||||
ret['return'] = self.functions.missing_fun_string(func)
|
||||
salt.utils.error.raise_error(
|
||||
message=self.functions.missing_fun_string(func))
|
||||
|
||||
# if the func support **kwargs, lets pack in the pub data we have
|
||||
# TODO: pack the *same* pub data as a minion?
|
||||
argspec = salt.utils.args.get_function_argspec(self.functions[func])
|
||||
if argspec.keywords:
|
||||
# this function accepts **kwargs, pack in the publish data
|
||||
for key, val in six.iteritems(ret):
|
||||
kwargs['__pub_{0}'.format(key)] = val
|
||||
|
||||
ret['return'] = self.functions[func](*args, **kwargs)
|
||||
|
||||
data_returner = data.get('returner', None)
|
||||
|
@ -694,33 +700,34 @@ class Schedule(object):
|
|||
)
|
||||
)
|
||||
|
||||
# Only attempt to return data to the master
|
||||
# if the scheduled job is running on a minion.
|
||||
if '__role' in self.opts and self.opts['__role'] == 'minion':
|
||||
if 'return_job' in data and not data['return_job']:
|
||||
pass
|
||||
else:
|
||||
# Send back to master so the job is included in the job list
|
||||
mret = ret.copy()
|
||||
mret['jid'] = 'req'
|
||||
channel = salt.transport.Channel.factory(self.opts, usage='salt_schedule')
|
||||
load = {
|
||||
'cmd': '_return',
|
||||
'id': self.opts['id'],
|
||||
'retcode': self.functions.pack['__context__']['retcode']
|
||||
}
|
||||
|
||||
for key, value in six.iteritems(mret):
|
||||
load[key] = value
|
||||
channel.send(load)
|
||||
|
||||
ret['retcode'] = self.functions.pack['__context__']['retcode']
|
||||
ret['success'] = True
|
||||
except Exception:
|
||||
log.exception("Unhandled exception running {0}".format(ret['fun']))
|
||||
# Although catch-all exception handlers are bad, the exception here
|
||||
# is to let the exception bubble up to the top of the thread context,
|
||||
# where the thread will die silently, which is worse.
|
||||
if not 'return' in ret:
|
||||
ret['return'] = "Unhandled exception running {0}".format(ret['fun'])
|
||||
ret['success'] = False
|
||||
ret['retcode'] = 254
|
||||
finally:
|
||||
try:
|
||||
# Only attempt to return data to the master
|
||||
# if the scheduled job is running on a minion.
|
||||
if '__role' in self.opts and self.opts['__role'] == 'minion':
|
||||
if 'return_job' in data and not data['return_job']:
|
||||
pass
|
||||
else:
|
||||
# Send back to master so the job is included in the job list
|
||||
mret = ret.copy()
|
||||
mret['jid'] = 'req'
|
||||
channel = salt.transport.Channel.factory(self.opts, usage='salt_schedule')
|
||||
load = {'cmd': '_return', 'id': self.opts['id']}
|
||||
for key, value in six.iteritems(mret):
|
||||
load[key] = value
|
||||
channel.send(load)
|
||||
|
||||
log.debug('schedule.handle_func: Removing {0}'.format(proc_fn))
|
||||
os.unlink(proc_fn)
|
||||
except OSError as exc:
|
||||
|
@ -762,11 +769,10 @@ class Schedule(object):
|
|||
func = None
|
||||
if func not in self.functions:
|
||||
log.info(
|
||||
'Invalid function: {0} in job {1}. Ignoring.'.format(
|
||||
'Invalid function: {0} in scheduled job {1}.'.format(
|
||||
func, job
|
||||
)
|
||||
)
|
||||
continue
|
||||
if 'name' not in data:
|
||||
data['name'] = job
|
||||
# Add up how many seconds between now and then
|
||||
|
|
Loading…
Add table
Reference in a new issue