Apply scheduler fixes and tests from #49104 to 2018.3

This commit is contained in:
Gareth J. Greenaway 2018-10-18 08:27:35 -07:00
parent 5123488dc2
commit adb1e79723
No known key found for this signature in database
GPG key ID: 10B62F8A7CAD7A41
3 changed files with 318 additions and 27 deletions

View file

@ -828,6 +828,12 @@ class Schedule(object):
splay_ = random.randint(1, splaytime)
return splay_
def _chop_ms(dt):
'''
Remove the microseconds from a datetime object
'''
return dt - datetime.timedelta(microseconds=dt.microsecond)
schedule = self._get_schedule()
if not isinstance(schedule, dict):
raise ValueError('Schedule must be of type dict.')
@ -846,6 +852,11 @@ class Schedule(object):
if job in _hidden:
continue
# Clear these out between runs
for item in ['_skipped',
'_skip_reason']:
if item in data:
del data[item]
# Clear out _skip_reason from previous runs
if '_skip_reason' in data:
del data['_skip_reason']
@ -1145,7 +1156,7 @@ class Schedule(object):
else:
continue
seconds = int((data['_next_fire_time'] - now).total_seconds())
seconds = int((_chop_ms(data['_next_fire_time']) - _chop_ms(now)).total_seconds())
if 'splay' in data:
# Got "splay" configured, make decision to run a job based on that
@ -1419,38 +1430,45 @@ class Schedule(object):
utils = self.utils
self.utils = {}
try:
# Job is disabled, continue
if 'enabled' in data and not data['enabled']:
log.debug('Job: %s is disabled', job)
data['_skip_reason'] = 'disabled'
continue
else:
if not self.standalone:
data = self._check_max_running(func, data, self.opts)
run = data['run']
if run:
# Job is disabled, continue
if 'enabled' in data and not data['enabled']:
log.debug('Job: %s is disabled', job)
data['_skip_reason'] = 'disabled'
continue
else:
if not self.standalone:
data = self._check_max_running(func, data, self.opts)
run = data['run']
if run:
if multiprocessing_enabled:
thread_cls = salt.utils.process.SignalHandlingMultiprocessingProcess
else:
thread_cls = threading.Thread
proc = thread_cls(target=self.handle_func, args=(multiprocessing_enabled, func, data))
if run:
if multiprocessing_enabled:
thread_cls = salt.utils.process.SignalHandlingMultiprocessingProcess
else:
thread_cls = threading.Thread
proc = thread_cls(target=self.handle_func, args=(multiprocessing_enabled, func, data))
if multiprocessing_enabled:
with salt.utils.process.default_signals(signal.SIGINT, signal.SIGTERM):
# Reset current signals before starting the process in
# order not to inherit the current signal handlers
if multiprocessing_enabled:
with salt.utils.process.default_signals(signal.SIGINT, signal.SIGTERM):
# Reset current signals before starting the process in
# order not to inherit the current signal handlers
proc.start()
else:
proc.start()
else:
proc.start()
if multiprocessing_enabled:
proc.join()
if multiprocessing_enabled:
proc.join()
finally:
if run:
data['_last_run'] = now
data['_splay'] = None
if '_seconds' in data:
data['_next_fire_time'] = now + datetime.timedelta(seconds=data['_seconds'])
data['_last_run'] = now
data['_splay'] = None
if self.standalone:
data['_next_fire_time'] = now + datetime.timedelta(seconds=data['_seconds'])
elif '_skipped' in data and data['_skipped']:
data['_next_fire_time'] = now + datetime.timedelta(seconds=data['_seconds'])
elif run:
data['_next_fire_time'] = now + datetime.timedelta(seconds=data['_seconds'])
if salt.utils.platform.is_windows():
# Restore our function references.
self.functions = functions

View file

@ -9,6 +9,7 @@ import os
import random
import dateutil.parser as dateutil_parser
import datetime
# Import Salt Testing libs
from tests.support.case import ModuleCase
@ -428,3 +429,233 @@ class SchedulerEvalTest(ModuleCase, SaltReturnAssertsMixin):
self.schedule.eval(now=run_time2)
ret = self.schedule.job_status('job1')
self.assertEqual(ret['_last_run'], run_time2)
def test_eval_seconds(self):
'''
verify that scheduled job run mutiple times with seconds
'''
job_name = 'job_eval_seconds'
job = {
'schedule': {
job_name: {
'function': 'test.ping',
'seconds': '30',
}
}
}
if salt.utils.platform.is_darwin():
job['schedule'][job_name]['dry_run'] = True
# Add job to schedule
self.schedule.opts.update(job)
# eval at 2:00pm to prime, simulate minion start up.
run_time = dateutil_parser.parse('11/29/2017 2:00pm')
next_run_time = run_time + datetime.timedelta(seconds=30)
self.schedule.eval(now=run_time)
ret = self.schedule.job_status(job_name)
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 2:00:01pm, will not run.
run_time = dateutil_parser.parse('11/29/2017 2:00:01pm')
self.schedule.eval(now=run_time)
ret = self.schedule.job_status(job_name)
self.assertNotIn('_last_run', ret)
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 2:00:30pm, will run.
run_time = dateutil_parser.parse('11/29/2017 2:00:30pm')
next_run_time = run_time + datetime.timedelta(seconds=30)
self.schedule.eval(now=run_time)
ret = self.schedule.job_status(job_name)
self.assertEqual(ret['_last_run'], run_time)
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 2:01:00pm, will run.
run_time = dateutil_parser.parse('11/29/2017 2:01:00pm')
next_run_time = run_time + datetime.timedelta(seconds=30)
self.schedule.eval(now=run_time)
ret = self.schedule.job_status(job_name)
self.assertEqual(ret['_last_run'], run_time)
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 2:01:30pm, will run.
run_time = dateutil_parser.parse('11/29/2017 2:01:30pm')
next_run_time = run_time + datetime.timedelta(seconds=30)
self.schedule.eval(now=run_time)
ret = self.schedule.job_status(job_name)
self.assertEqual(ret['_last_run'], run_time)
self.assertEqual(ret['_next_fire_time'], next_run_time)
def test_eval_minutes(self):
'''
verify that scheduled job run mutiple times with minutes
'''
job_name = 'job_eval_minutes'
job = {
'schedule': {
job_name: {
'function': 'test.ping',
'minutes': '30',
}
}
}
if salt.utils.platform.is_darwin():
job['schedule'][job_name]['dry_run'] = True
# Add job to schedule
self.schedule.opts.update(job)
# eval at 2:00pm to prime, simulate minion start up.
run_time = dateutil_parser.parse('11/29/2017 2:00pm')
next_run_time = run_time + datetime.timedelta(minutes=30)
self.schedule.eval(now=run_time)
ret = self.schedule.job_status(job_name)
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 2:00:01pm, will not run.
run_time = dateutil_parser.parse('11/29/2017 2:00:01pm')
self.schedule.eval(now=run_time)
ret = self.schedule.job_status(job_name)
self.assertNotIn('_last_run', ret)
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 2:30:00pm, will run.
run_time = dateutil_parser.parse('11/29/2017 2:30:00pm')
self.schedule.eval(now=run_time)
ret = self.schedule.job_status(job_name)
self.assertEqual(ret['_last_run'], run_time)
# eval at 3:00:00pm, will run.
run_time = dateutil_parser.parse('11/29/2017 3:00:00pm')
self.schedule.eval(now=run_time)
ret = self.schedule.job_status(job_name)
self.assertEqual(ret['_last_run'], run_time)
# eval at 3:30:00pm, will run.
run_time = dateutil_parser.parse('11/29/2017 3:30:00pm')
self.schedule.eval(now=run_time)
ret = self.schedule.job_status(job_name)
self.assertEqual(ret['_last_run'], run_time)
def test_eval_hours(self):
'''
verify that scheduled job run mutiple times with hours
'''
job_name = 'job_eval_hours'
job = {
'schedule': {
job_name: {
'function': 'test.ping',
'hours': '2',
}
}
}
if salt.utils.platform.is_darwin():
job['schedule'][job_name]['dry_run'] = True
# Add job to schedule
self.schedule.opts.update(job)
# eval at 2:00pm to prime, simulate minion start up.
run_time = dateutil_parser.parse('11/29/2017 2:00pm')
next_run_time = run_time + datetime.timedelta(hours=2)
self.schedule.eval(now=run_time)
ret = self.schedule.job_status(job_name)
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 2:00:01pm, will not run.
run_time = dateutil_parser.parse('11/29/2017 2:00:01pm')
self.schedule.eval(now=run_time)
ret = self.schedule.job_status(job_name)
self.assertNotIn('_last_run', ret)
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 4:00:00pm, will run.
run_time = dateutil_parser.parse('11/29/2017 4:00:00pm')
self.schedule.eval(now=run_time)
ret = self.schedule.job_status(job_name)
self.assertEqual(ret['_last_run'], run_time)
# eval at 6:00:00pm, will run.
run_time = dateutil_parser.parse('11/29/2017 6:00:00pm')
self.schedule.eval(now=run_time)
ret = self.schedule.job_status(job_name)
self.assertEqual(ret['_last_run'], run_time)
# eval at 8:00:00pm, will run.
run_time = dateutil_parser.parse('11/29/2017 8:00:00pm')
self.schedule.eval(now=run_time)
ret = self.schedule.job_status(job_name)
self.assertEqual(ret['_last_run'], run_time)
def test_eval_days(self):
'''
verify that scheduled job run mutiple times with days
'''
job_name = 'job_eval_days'
job = {
'schedule': {
job_name: {
'function': 'test.ping',
'days': '2',
'dry_run': True
}
}
}
if salt.utils.platform.is_darwin():
job['schedule'][job_name]['dry_run'] = True
# Add job to schedule
self.schedule.opts.update(job)
# eval at 11/23/2017 2:00pm to prime, simulate minion start up.
run_time = dateutil_parser.parse('11/23/2017 2:00pm')
next_run_time = run_time + datetime.timedelta(days=2)
self.schedule.eval(now=run_time)
ret = self.schedule.job_status(job_name)
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 11/25/2017 2:00:00pm, will run.
run_time = dateutil_parser.parse('11/25/2017 2:00:00pm')
next_run_time = run_time + datetime.timedelta(days=2)
self.schedule.eval(now=run_time)
ret = self.schedule.job_status(job_name)
self.assertEqual(ret['_last_run'], run_time)
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 11/26/2017 2:00:00pm, will not run.
run_time = dateutil_parser.parse('11/26/2017 2:00:00pm')
last_run_time = run_time - datetime.timedelta(days=1)
self.schedule.eval(now=run_time)
ret = self.schedule.job_status(job_name)
self.assertEqual(ret['_last_run'], last_run_time)
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 11/27/2017 2:00:00pm, will run.
run_time = dateutil_parser.parse('11/27/2017 2:00:00pm')
next_run_time = run_time + datetime.timedelta(days=2)
self.schedule.eval(now=run_time)
ret = self.schedule.job_status(job_name)
self.assertEqual(ret['_last_run'], run_time)
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 11/28/2017 2:00:00pm, will not run.
run_time = dateutil_parser.parse('11/28/2017 2:00:00pm')
last_run_time = run_time - datetime.timedelta(days=1)
self.schedule.eval(now=run_time)
ret = self.schedule.job_status(job_name)
self.assertEqual(ret['_last_run'], last_run_time)
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 11/29/2017 2:00:00pm, will run.
run_time = dateutil_parser.parse('11/29/2017 2:00:00pm')
next_run_time = run_time + datetime.timedelta(days=2)
self.schedule.eval(now=run_time)
ret = self.schedule.job_status(job_name)
self.assertEqual(ret['_last_run'], run_time)
self.assertEqual(ret['_next_fire_time'], next_run_time)

View file

@ -191,3 +191,45 @@ class SchedulerSkipTest(ModuleCase, SaltReturnAssertsMixin):
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job1')
self.assertEqual(ret['_last_run'], run_time)
def test_run_seconds_skip(self):
'''
verify that scheduled job is skipped during the specified range
'''
job = {
'schedule': {
'job1': {
'function': 'test.ping',
'seconds': '10',
}
}
}
# Add job to schedule
self.schedule.opts.update(job)
# eval at 2:00pm, to prime the scheduler
run_time = dateutil_parser.parse('11/29/2017 2:00pm')
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job1')
# eval at 2:00:10pm
run_time = dateutil_parser.parse('11/29/2017 2:00:10pm')
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job1')
# Skip at 2:00:20pm
run_time = dateutil_parser.parse('11/29/2017 2:00:20pm')
self.schedule.skip_job('job1', {'time': run_time.strftime('%Y-%m-%dT%H:%M:%S'),
'time_fmt': '%Y-%m-%dT%H:%M:%S'})
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job1')
self.assertIn('_next_fire_time', ret)
self.assertEqual(ret['_skip_reason'], 'skip_explicit')
self.assertEqual(ret['_skipped_time'], run_time)
# Run at 2:00:30pm
run_time = dateutil_parser.parse('11/29/2017 2:00:30pm')
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job1')
self.assertIn('_last_run', ret)