Merge pull request #37064 from cachedout/issue_35097

Unify job check in scheduler
This commit is contained in:
Mike Place 2016-10-19 11:08:05 +09:00 committed by GitHub
commit 67faee1f94

View file

@ -295,6 +295,7 @@ import salt.utils
import salt.utils.jid
import salt.utils.process
import salt.utils.args
import salt.utils.minion
import salt.loader
import salt.minion
import salt.payload
@ -665,37 +666,22 @@ class Schedule(object):
# dict we treat it like it was there and is True
if 'jid_include' not in data or data['jid_include']:
jobcount = 0
for basefilename in os.listdir(salt.minion.get_proc_dir(self.opts['cachedir'])):
fn_ = os.path.join(salt.minion.get_proc_dir(self.opts['cachedir']), basefilename)
if not os.path.exists(fn_):
log.debug('schedule.handle_func: {0} was processed '
'in another thread, skipping.'.format(
basefilename))
continue
with salt.utils.fopen(fn_, 'rb') as fp_:
job = salt.payload.Serial(self.opts).load(fp_)
if job:
if 'schedule' in job:
log.debug('schedule.handle_func: Checking job against '
'fun {0}: {1}'.format(ret['fun'], job))
if ret['schedule'] == job['schedule'] and os_is_running(job['pid']):
jobcount += 1
log.debug(
'schedule.handle_func: Incrementing jobcount, now '
'{0}, maxrunning is {1}'.format(
jobcount, data['maxrunning']))
if jobcount >= data['maxrunning']:
log.debug(
'schedule.handle_func: The scheduled job {0} '
'was not started, {1} already running'.format(
ret['schedule'], data['maxrunning']))
return False
else:
try:
log.info('Invalid job file found. Removing.')
os.remove(fn_)
except OSError:
log.info('Unable to remove file: {0}.'.format(fn_))
for job in salt.utils.minion.running(self.opts):
if 'schedule' in job:
log.debug('schedule.handle_func: Checking job against '
'fun {0}: {1}'.format(ret['fun'], job))
if ret['schedule'] == job['schedule'] and os_is_running(job['pid']):
jobcount += 1
log.debug(
'schedule.handle_func: Incrementing jobcount, now '
'{0}, maxrunning is {1}'.format(
jobcount, data['maxrunning']))
if jobcount >= data['maxrunning']:
log.debug(
'schedule.handle_func: The scheduled job {0} '
'was not started, {1} already running'.format(
ret['schedule'], data['maxrunning']))
return False
if multiprocessing_enabled and not salt.utils.is_windows():
# Reconfigure multiprocessing logging after daemonizing