Merge pull request #50130 from garethgreenaway/49957_master_schedule_ignoring_maxrunning

[2018.3] Fixes to schedule maxrunning on master
This commit is contained in:
Mike Place 2018-11-26 13:35:34 -05:00 committed by GitHub
commit 5b7ab35dca
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 251 additions and 6 deletions

View file

@ -39,6 +39,98 @@ from salt.utils.zeromq import zmq
log = logging.getLogger(__name__)
def get_running_jobs(opts):
    '''
    Return the running jobs on this minion
    '''
    jobs = []
    proc_dir = os.path.join(opts['cachedir'], 'proc')
    if not os.path.isdir(proc_dir):
        # No proc directory yet means no jobs have ever been started
        return jobs
    for entry in os.listdir(proc_dir):
        proc_file = os.path.join(proc_dir, entry)
        try:
            job = _read_proc_file(proc_file, opts)
        except (IOError, OSError):
            # proc files may be removed at any time during this process by
            # the master process that is executing the JID in question, so
            # we must ignore ENOENT during this process
            log.trace('%s removed during processing by master process', proc_file)
            continue
        if job is not None:
            jobs.append(job)
    return jobs
def _read_proc_file(path, opts):
    '''
    Read one proc file and return a dict of JID metadata, or None if the
    file does not describe a live salt job.  Stale files (empty, dead PID,
    or PID recycled by a non-salt process) are removed as a side effect.

    path -- full path to the proc file
    opts -- opts dict, used to build the payload serializer
    '''
    serial = salt.payload.Serial(opts)
    # The context manager closes the file; the explicit fp_.close() the
    # original carried here was redundant.
    with salt.utils.files.fopen(path, 'rb') as fp_:
        buf = fp_.read()

    if not buf:
        # Proc file is empty, remove
        _remove_proc_file(path)
        return None

    data = serial.loads(buf)
    if not isinstance(data, dict):
        # Invalid serial object
        return None
    if not salt.utils.process.os_is_running(data['pid']):
        # The process is no longer running, clear out the file and
        # continue
        _remove_proc_file(path)
        return None
    if not _check_cmdline(data):
        # PID exists but belongs to a non-salt process: the PID was
        # recycled, so the proc file is stale
        pid = data.get('pid')
        if pid:
            log.warning(
                'PID %s exists but does not appear to be a salt process.', pid
            )
        _remove_proc_file(path)
        return None
    return data


def _remove_proc_file(path):
    '''
    Best-effort removal of a stale proc file; failure is only logged.
    '''
    try:
        os.remove(path)
    except OSError:
        # os.remove raises OSError (IOError is merely its alias on
        # Python 3); the original IOError-only clause let OSError escape
        # on Python 2.
        log.debug('Unable to remove proc file %s.', path)
def _check_cmdline(data):
    '''
    In some cases where there are an insane number of processes being created
    on a system a PID can get recycled or assigned to a non-Salt process.
    On Linux this fn checks to make sure the PID we are checking on is actually
    a Salt process.

    For non-Linux systems we punt and just return True

    data -- proc-file metadata dict; only the ``pid`` key is consulted
    '''
    if not salt.utils.platform.is_linux():
        return True
    pid = data.get('pid')
    if not pid:
        # No PID recorded: cannot be verified, treat as not a salt process
        return False
    if not os.path.isdir('/proc'):
        # /proc not mounted; nothing to check against, assume OK
        return True
    # os.path.join() with a single argument was a no-op; format the
    # path directly.
    path = '/proc/{0}/cmdline'.format(pid)
    if not os.path.isfile(path):
        return False
    try:
        with salt.utils.files.fopen(path, 'rb') as fp_:
            # cmdline is NUL-delimited bytes; a salt process mentions 'salt'
            return b'salt' in fp_.read()
    except (OSError, IOError):
        return False
class MasterPillarUtil(object):
'''
Helper utility for easy access to targeted minion grain and
@ -721,6 +813,7 @@ def get_values_of_matching_keys(pattern_dict, user_name):
ret.extend(pattern_dict[expr])
return ret
# test code for the ConCache class
if __name__ == '__main__':

View file

@ -42,6 +42,9 @@ def running(opts):
def cache_jobs(opts, jid, ret):
'''
Write job information to cache
'''
serial = salt.payload.Serial(opts=opts)
fn_ = os.path.join(
@ -73,7 +76,7 @@ def _read_proc_file(path, opts):
try:
os.remove(path)
except IOError:
pass
log.debug('Unable to remove proc file %s.', path)
return None
if not isinstance(data, dict):
# Invalid serial object
@ -84,7 +87,7 @@ def _read_proc_file(path, opts):
try:
os.remove(path)
except IOError:
pass
log.debug('Unable to remove proc file %s.', path)
return None
if opts.get('multiprocessing'):
if data.get('pid') == pid:
@ -94,7 +97,7 @@ def _read_proc_file(path, opts):
try:
os.remove(path)
except IOError:
pass
log.debug('Unable to remove proc file %s.', path)
return None
if data.get('jid') == current_thread:
return None
@ -102,7 +105,7 @@ def _read_proc_file(path, opts):
try:
os.remove(path)
except IOError:
pass
log.debug('Unable to remove proc file %s.', path)
return None
if not _check_cmdline(data):
@ -114,7 +117,7 @@ def _read_proc_file(path, opts):
try:
os.remove(path)
except IOError:
pass
log.debug('Unable to remove proc file %s.', path)
return None
return data

View file

@ -32,6 +32,7 @@ import salt.utils.error
import salt.utils.event
import salt.utils.files
import salt.utils.jid
import salt.utils.master
import salt.utils.minion
import salt.utils.platform
import salt.utils.process
@ -177,7 +178,11 @@ class Schedule(object):
data['run'] = True
if 'jid_include' not in data or data['jid_include']:
jobcount = 0
for job in salt.utils.minion.running(self.opts):
if self.opts['__role'] == 'master':
current_jobs = salt.utils.master.get_running_jobs(self.opts)
else:
current_jobs = salt.utils.minion.running(self.opts)
for job in current_jobs:
if 'schedule' in job:
log.debug(
'schedule.handle_func: Checking job against fun '

View file

@ -219,6 +219,7 @@ salt/utils/schedule.py:
- integration.scheduler.test_eval
- integration.scheduler.test_postpone
- integration.scheduler.test_skip
- integration.scheduler.test_maxrunning
salt/utils/vt.py:
- integration.cli.test_custom_module

View file

@ -0,0 +1,142 @@
# -*- coding: utf-8 -*-
# Import Python libs
from __future__ import absolute_import
import copy
import logging
import os

# Import Salt Testing libs
from tests.support.case import ModuleCase
from tests.support.mixins import SaltReturnAssertsMixin

# Import Salt Testing Libs
from tests.support.mock import MagicMock, patch
import tests.integration as integration

# Import Salt libs
import salt.config
import salt.utils.schedule
from salt.modules.test import ping as ping
try:
import croniter # pylint: disable=W0611
HAS_CRONITER = True
except ImportError:
HAS_CRONITER = False
log = logging.getLogger(__name__)

# Sandbox tree for these tests; everything lives under the integration TMP
# dir so nothing leaks outside the test run.
ROOT_DIR = os.path.join(integration.TMP, 'schedule-unit-tests')
SOCK_DIR = os.path.join(ROOT_DIR, 'test-socks')

# Start from a stock minion config and point every path-like option at the
# sandbox above.  Each test deep-copies this dict, so tests never share
# mutable config state.
DEFAULT_CONFIG = salt.config.minion_config(None)
DEFAULT_CONFIG['conf_dir'] = ROOT_DIR
DEFAULT_CONFIG['root_dir'] = ROOT_DIR
DEFAULT_CONFIG['sock_dir'] = SOCK_DIR
DEFAULT_CONFIG['pki_dir'] = os.path.join(ROOT_DIR, 'pki')
DEFAULT_CONFIG['cachedir'] = os.path.join(ROOT_DIR, 'cache')
class SchedulerMaxRunningTest(ModuleCase, SaltReturnAssertsMixin):
    '''
    Exercise the scheduler's ``maxrunning`` enforcement for both the
    minion and the master code paths.
    '''
    def setUp(self):
        # Patch out proc-dir cleanup so Schedule() does not touch disk.
        with patch('salt.utils.schedule.clean_proc_dir', MagicMock(return_value=None)):
            functions = {'test.ping': ping}
            self.schedule = salt.utils.schedule.Schedule(copy.deepcopy(DEFAULT_CONFIG), functions, returners={})
            self.schedule.opts['loop_interval'] = 1

    def tearDown(self):
        self.schedule.reset()

    def test_maxrunning_minion(self):
        '''
        A minion-side job already at its maxrunning limit is skipped
        with ``_skip_reason`` set to ``maxrunning``.
        '''
        self.schedule.opts['__role'] = 'minion'
        job = {
            'schedule': {
                'maxrunning_minion': {
                    'function': 'test.ping',
                    'seconds': 10,
                    'maxrunning': 1
                }
            }
        }
        # Add the job to the scheduler
        self.schedule.opts.update(job)

        job_data = {
            'function': 'test.ping',
            'run': False,
            'name': 'maxrunning_minion',
            'seconds': 10,
            '_seconds': 10,
            'jid_include': True,
            'maxrunning': 1,
        }
        # One matching job already running -> the limit of 1 is reached.
        running_data = [{
            'fun_args': [],
            'jid': '20181018165923360935',
            'schedule': 'maxrunning_minion',
            'pid': 15338,
            'fun': 'test.sleep',
            'id': 'host',
        }]
        with patch('salt.utils.minion.running', MagicMock(return_value=running_data)), \
                patch('salt.utils.process.os_is_running', MagicMock(return_value=True)):
            ret = self.schedule._check_max_running('test.ping',
                                                  job_data,
                                                  self.schedule.opts)
            self.assertIn('_skip_reason', ret)
            self.assertEqual(ret['_skip_reason'], 'maxrunning')
            self.assertEqual(ret['run'], False)

    def test_maxrunning_master(self):
        '''
        A master-side job already at its maxrunning limit is skipped
        with ``_skip_reason`` set to ``maxrunning``.
        '''
        self.schedule.opts['__role'] = 'master'
        job = {
            'schedule': {
                'maxrunning_master': {
                    'function': 'state.orch',
                    'args': ['test.orch_test'],
                    'minutes': 1,
                    'maxrunning': 1
                }
            }
        }
        # Add the job to the scheduler
        self.schedule.opts.update(job)

        job_data = {
            'function': 'state.orch',
            'fun_args': ['test.orch_test'],
            'run': False,
            'name': 'maxrunning_master',
            'minutes': 1,
            'jid_include': True,
            'maxrunning': 1,
        }
        # One matching orchestration already running on the master.
        running_data = [{
            'fun_args': ['test.orch_test'],
            'jid': '20181018165923360935',
            'schedule': 'maxrunning_master',
            'pid': 15338,
            'fun': 'state.orch',
            'id': 'host',
        }]
        with patch('salt.utils.master.get_running_jobs', MagicMock(return_value=running_data)), \
                patch('salt.utils.process.os_is_running', MagicMock(return_value=True)):
            ret = self.schedule._check_max_running('state.orch',
                                                  job_data,
                                                  self.schedule.opts)
            self.assertIn('_skip_reason', ret)
            self.assertEqual(ret['_skip_reason'], 'maxrunning')
            self.assertEqual(ret['run'], False)

View file

@ -146,6 +146,7 @@ class BadTestModuleNamesTestCase(TestCase):
'integration.scheduler.test_eval',
'integration.scheduler.test_postpone',
'integration.scheduler.test_skip',
'integration.scheduler.test_maxrunning',
'integration.shell.test_spm',
'integration.shell.test_cp',
'integration.shell.test_syndic',