include the 'success' field in scheduled jobs return data (part of #24237)

* Instead of ignoring the execution of jobs with invalid function names,
  continue, so that always some data is returned.

* Move the code that returns data to the master to the 'finally' section, so
  that data is returned also in case of error.

* Set a default 'success' value, and overwrite it with the value from
  the function context.
This commit is contained in:
Duncan Mac-Vicar P 2016-01-08 22:16:18 +01:00
parent f72a4ca42d
commit 5a1b2ca486

View file

@ -482,24 +482,24 @@ class Schedule(object):
func = None
if func not in self.functions:
log.info(
'Invalid function: {0} in job {1}. Ignoring.'.format(
'Invalid function: {0} in scheduled job {1}.'.format(
func, name
)
)
if 'name' not in data:
data['name'] = name
log.info(
'Running Job: {0}.'.format(name)
)
if self.opts.get('multiprocessing', True):
thread_cls = multiprocessing.Process
else:
if 'name' not in data:
data['name'] = name
log.info(
'Running Job: {0}.'.format(name)
)
if self.opts.get('multiprocessing', True):
thread_cls = multiprocessing.Process
else:
thread_cls = threading.Thread
proc = thread_cls(target=self.handle_func, args=(func, data))
proc.start()
if self.opts.get('multiprocessing', True):
proc.join()
thread_cls = threading.Thread
proc = thread_cls(target=self.handle_func, args=(func, data))
proc.start()
if self.opts.get('multiprocessing', True):
proc.join()
def enable_schedule(self):
'''
@ -642,33 +642,39 @@ class Schedule(object):
except OSError:
log.info('Unable to remove file: {0}.'.format(fn_))
salt.utils.daemonize_if(self.opts)
ret['pid'] = os.getpid()
if 'jid_include' not in data or data['jid_include']:
log.debug('schedule.handle_func: adding this job to the jobcache '
'with data {0}'.format(ret))
# write this to /var/cache/salt/minion/proc
with salt.utils.fopen(proc_fn, 'w+b') as fp_:
fp_.write(salt.payload.Serial(self.opts).dumps(ret))
args = tuple()
if 'args' in data:
args = data['args']
kwargs = {}
if 'kwargs' in data:
kwargs = data['kwargs']
# if the func support **kwargs, lets pack in the pub data we have
# TODO: pack the *same* pub data as a minion?
argspec = salt.utils.args.get_function_argspec(self.functions[func])
if argspec.keywords:
# this function accepts **kwargs, pack in the publish data
for key, val in six.iteritems(ret):
kwargs['__pub_{0}'.format(key)] = val
try:
salt.utils.daemonize_if(self.opts)
ret['pid'] = os.getpid()
if 'jid_include' not in data or data['jid_include']:
log.debug('schedule.handle_func: adding this job to the jobcache '
'with data {0}'.format(ret))
# write this to /var/cache/salt/minion/proc
with salt.utils.fopen(proc_fn, 'w+b') as fp_:
fp_.write(salt.payload.Serial(self.opts).dumps(ret))
args = tuple()
if 'args' in data:
args = data['args']
kwargs = {}
if 'kwargs' in data:
kwargs = data['kwargs']
if func not in self.functions:
ret['return'] = self.functions.missing_fun_string(func)
salt.utils.error.raise_error(
message=self.functions.missing_fun_string(func))
# if the func support **kwargs, lets pack in the pub data we have
# TODO: pack the *same* pub data as a minion?
argspec = salt.utils.args.get_function_argspec(self.functions[func])
if argspec.keywords:
# this function accepts **kwargs, pack in the publish data
for key, val in six.iteritems(ret):
kwargs['__pub_{0}'.format(key)] = val
ret['return'] = self.functions[func](*args, **kwargs)
data_returner = data.get('returner', None)
@ -694,33 +700,34 @@ class Schedule(object):
)
)
# Only attempt to return data to the master
# if the scheduled job is running on a minion.
if '__role' in self.opts and self.opts['__role'] == 'minion':
if 'return_job' in data and not data['return_job']:
pass
else:
# Send back to master so the job is included in the job list
mret = ret.copy()
mret['jid'] = 'req'
channel = salt.transport.Channel.factory(self.opts, usage='salt_schedule')
load = {
'cmd': '_return',
'id': self.opts['id'],
'retcode': self.functions.pack['__context__']['retcode']
}
for key, value in six.iteritems(mret):
load[key] = value
channel.send(load)
ret['retcode'] = self.functions.pack['__context__']['retcode']
ret['success'] = True
except Exception:
log.exception("Unhandled exception running {0}".format(ret['fun']))
# Although catch-all exception handlers are bad, the exception here
# is to let the exception bubble up to the top of the thread context,
# where the thread will die silently, which is worse.
if not 'return' in ret:
ret['return'] = "Unhandled exception running {0}".format(ret['fun'])
ret['success'] = False
ret['retcode'] = 254
finally:
try:
# Only attempt to return data to the master
# if the scheduled job is running on a minion.
if '__role' in self.opts and self.opts['__role'] == 'minion':
if 'return_job' in data and not data['return_job']:
pass
else:
# Send back to master so the job is included in the job list
mret = ret.copy()
mret['jid'] = 'req'
channel = salt.transport.Channel.factory(self.opts, usage='salt_schedule')
load = {'cmd': '_return', 'id': self.opts['id']}
for key, value in six.iteritems(mret):
load[key] = value
channel.send(load)
log.debug('schedule.handle_func: Removing {0}'.format(proc_fn))
os.unlink(proc_fn)
except OSError as exc:
@ -762,11 +769,10 @@ class Schedule(object):
func = None
if func not in self.functions:
log.info(
'Invalid function: {0} in job {1}. Ignoring.'.format(
'Invalid function: {0} in scheduled job {1}.'.format(
func, job
)
)
continue
if 'name' not in data:
data['name'] = job
# Add up how many seconds between now and then