mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Merge pull request #50130 from garethgreenaway/49957_master_schedule_ignoring_maxrunning
[2018.3] Fixes to schedule maxrunning on master
This commit is contained in:
commit
5b7ab35dca
6 changed files with 251 additions and 6 deletions
|
@ -39,6 +39,98 @@ from salt.utils.zeromq import zmq
|
|||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_running_jobs(opts):
    '''
    Return the list of job metadata dicts for jobs currently running on
    this master, as recorded by the proc files under ``cachedir/proc``.
    '''
    proc_dir = os.path.join(opts['cachedir'], 'proc')
    jobs = []
    if not os.path.isdir(proc_dir):
        # No proc dir yet means nothing has ever run; nothing to report
        return jobs
    for entry in os.listdir(proc_dir):
        proc_path = os.path.join(proc_dir, entry)
        try:
            job_data = _read_proc_file(proc_path, opts)
        except (IOError, OSError):
            # proc files may be removed at any time during this process by
            # the master process that is executing the JID in question, so
            # we must ignore ENOENT during this process
            log.trace('%s removed during processing by master process', proc_path)
            continue
        if job_data is not None:
            jobs.append(job_data)
    return jobs
|
||||
|
||||
|
||||
def _read_proc_file(path, opts):
    '''
    Read a single proc file and return its deserialized JID metadata.

    path
        Full path to a proc file under ``cachedir/proc``.

    opts
        The options dictionary, used to construct the payload serializer.

    Returns the metadata dict for a live salt process, or ``None`` when the
    file is empty, contains an invalid payload, or describes a process that
    is no longer a running salt process — in which case the stale proc file
    is removed on a best-effort basis.
    '''
    serial = salt.payload.Serial(opts)
    # The context manager closes the file; the original explicit
    # fp_.close() inside the with-block was redundant and has been removed.
    with salt.utils.files.fopen(path, 'rb') as fp_:
        buf = fp_.read()
    if not buf:
        # Proc file is empty, remove
        _remove_proc_file(path)
        return None
    data = serial.loads(buf)
    if not isinstance(data, dict):
        # Invalid serial object
        return None
    # NOTE(review): assumes the payload always carries a 'pid' key; a
    # malformed proc file would raise KeyError here (propagated to caller)
    if not salt.utils.process.os_is_running(data['pid']):
        # The process is no longer running, clear out the file and
        # continue
        _remove_proc_file(path)
        return None

    if not _check_cmdline(data):
        # The PID exists but was recycled by a non-salt process
        pid = data.get('pid')
        if pid:
            log.warning(
                'PID %s exists but does not appear to be a salt process.', pid
            )
        _remove_proc_file(path)
        return None
    return data


def _remove_proc_file(path):
    '''
    Best-effort removal of a stale proc file; failure is only logged
    because another process may have already cleaned it up.
    '''
    try:
        os.remove(path)
    except IOError:
        log.debug('Unable to remove proc file %s.', path)
|
||||
|
||||
|
||||
def _check_cmdline(data):
    '''
    In some cases where there are an insane number of processes being created
    on a system a PID can get recycled or assigned to a non-Salt process.
    On Linux this fn checks to make sure the PID we are checking on is actually
    a Salt process.

    For non-Linux systems we punt and just return True.

    data
        A proc-file metadata dict; only the ``pid`` key is consulted.
    '''
    if not salt.utils.platform.is_linux():
        return True
    pid = data.get('pid')
    if not pid:
        # No PID recorded: cannot be a live salt process
        return False
    if not os.path.isdir('/proc'):
        # /proc not mounted; give the process the benefit of the doubt
        return True
    # /proc/<pid>/cmdline holds the NUL-separated argv of the process.
    # (Fix: the original wrapped this in a pointless single-argument
    # os.path.join call.)
    path = '/proc/{0}/cmdline'.format(pid)
    if not os.path.isfile(path):
        return False
    try:
        with salt.utils.files.fopen(path, 'rb') as fp_:
            return b'salt' in fp_.read()
    except (OSError, IOError):
        # Process exited between the isfile check and the open
        return False
|
||||
|
||||
|
||||
class MasterPillarUtil(object):
|
||||
'''
|
||||
Helper utility for easy access to targeted minion grain and
|
||||
|
@ -721,6 +813,7 @@ def get_values_of_matching_keys(pattern_dict, user_name):
|
|||
ret.extend(pattern_dict[expr])
|
||||
return ret
|
||||
|
||||
|
||||
# test code for the ConCache class
|
||||
if __name__ == '__main__':
|
||||
|
||||
|
|
|
@ -42,6 +42,9 @@ def running(opts):
|
|||
|
||||
|
||||
def cache_jobs(opts, jid, ret):
|
||||
'''
|
||||
Write job information to cache
|
||||
'''
|
||||
serial = salt.payload.Serial(opts=opts)
|
||||
|
||||
fn_ = os.path.join(
|
||||
|
@ -73,7 +76,7 @@ def _read_proc_file(path, opts):
|
|||
try:
|
||||
os.remove(path)
|
||||
except IOError:
|
||||
pass
|
||||
log.debug('Unable to remove proc file %s.', path)
|
||||
return None
|
||||
if not isinstance(data, dict):
|
||||
# Invalid serial object
|
||||
|
@ -84,7 +87,7 @@ def _read_proc_file(path, opts):
|
|||
try:
|
||||
os.remove(path)
|
||||
except IOError:
|
||||
pass
|
||||
log.debug('Unable to remove proc file %s.', path)
|
||||
return None
|
||||
if opts.get('multiprocessing'):
|
||||
if data.get('pid') == pid:
|
||||
|
@ -94,7 +97,7 @@ def _read_proc_file(path, opts):
|
|||
try:
|
||||
os.remove(path)
|
||||
except IOError:
|
||||
pass
|
||||
log.debug('Unable to remove proc file %s.', path)
|
||||
return None
|
||||
if data.get('jid') == current_thread:
|
||||
return None
|
||||
|
@ -102,7 +105,7 @@ def _read_proc_file(path, opts):
|
|||
try:
|
||||
os.remove(path)
|
||||
except IOError:
|
||||
pass
|
||||
log.debug('Unable to remove proc file %s.', path)
|
||||
return None
|
||||
|
||||
if not _check_cmdline(data):
|
||||
|
@ -114,7 +117,7 @@ def _read_proc_file(path, opts):
|
|||
try:
|
||||
os.remove(path)
|
||||
except IOError:
|
||||
pass
|
||||
log.debug('Unable to remove proc file %s.', path)
|
||||
return None
|
||||
return data
|
||||
|
||||
|
|
|
@ -32,6 +32,7 @@ import salt.utils.error
|
|||
import salt.utils.event
|
||||
import salt.utils.files
|
||||
import salt.utils.jid
|
||||
import salt.utils.master
|
||||
import salt.utils.minion
|
||||
import salt.utils.platform
|
||||
import salt.utils.process
|
||||
|
@ -177,7 +178,11 @@ class Schedule(object):
|
|||
data['run'] = True
|
||||
if 'jid_include' not in data or data['jid_include']:
|
||||
jobcount = 0
|
||||
for job in salt.utils.minion.running(self.opts):
|
||||
if self.opts['__role'] == 'master':
|
||||
current_jobs = salt.utils.master.get_running_jobs(self.opts)
|
||||
else:
|
||||
current_jobs = salt.utils.minion.running(self.opts)
|
||||
for job in current_jobs:
|
||||
if 'schedule' in job:
|
||||
log.debug(
|
||||
'schedule.handle_func: Checking job against fun '
|
||||
|
|
|
@ -219,6 +219,7 @@ salt/utils/schedule.py:
|
|||
- integration.scheduler.test_eval
|
||||
- integration.scheduler.test_postpone
|
||||
- integration.scheduler.test_skip
|
||||
- integration.scheduler.test_maxrunning
|
||||
|
||||
salt/utils/vt.py:
|
||||
- integration.cli.test_custom_module
|
||||
|
|
142
tests/integration/scheduler/test_maxrunning.py
Normal file
142
tests/integration/scheduler/test_maxrunning.py
Normal file
|
@ -0,0 +1,142 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Import Python libs
|
||||
from __future__ import absolute_import
|
||||
import copy
|
||||
import logging
|
||||
import os
|
||||
|
||||
# Import Salt Testing libs
|
||||
from tests.support.case import ModuleCase
|
||||
from tests.support.mixins import SaltReturnAssertsMixin
|
||||
|
||||
# Import Salt Testing Libs
|
||||
from tests.support.mock import MagicMock, patch
|
||||
import tests.integration as integration
|
||||
|
||||
# Import Salt libs
|
||||
import salt.utils.schedule
|
||||
|
||||
from salt.modules.test import ping as ping
|
||||
|
||||
try:
|
||||
import croniter # pylint: disable=W0611
|
||||
HAS_CRONITER = True
|
||||
except ImportError:
|
||||
HAS_CRONITER = False
|
||||
|
||||
# Explicit import: minion_config below previously resolved only because
# `import salt.utils.schedule` pulls in salt.config as a side effect.
import salt.config

log = logging.getLogger(__name__)

# Sandbox all test state under the integration TMP dir
ROOT_DIR = os.path.join(integration.TMP, 'schedule-unit-tests')
SOCK_DIR = os.path.join(ROOT_DIR, 'test-socks')

# Base minion configuration shared by every test; each test deep-copies it
# so mutations never leak between cases.
DEFAULT_CONFIG = salt.config.minion_config(None)
DEFAULT_CONFIG['conf_dir'] = ROOT_DIR
DEFAULT_CONFIG['root_dir'] = ROOT_DIR
DEFAULT_CONFIG['sock_dir'] = SOCK_DIR
DEFAULT_CONFIG['pki_dir'] = os.path.join(ROOT_DIR, 'pki')
DEFAULT_CONFIG['cachedir'] = os.path.join(ROOT_DIR, 'cache')
|
||||
class SchedulerMaxRunningTest(ModuleCase, SaltReturnAssertsMixin):
    '''
    Validate the scheduler's ``maxrunning`` handling for both minion and
    master schedules.  (Fix: the original docstring said "Validate the pkg
    module" — a copy-paste error from another test module.)
    '''
    def setUp(self):
        # Stub out clean_proc_dir so Schedule() does not touch cachedir state
        with patch('salt.utils.schedule.clean_proc_dir', MagicMock(return_value=None)):
            functions = {'test.ping': ping}
            self.schedule = salt.utils.schedule.Schedule(copy.deepcopy(DEFAULT_CONFIG), functions, returners={})
        self.schedule.opts['loop_interval'] = 1

    def tearDown(self):
        self.schedule.reset()

    def test_maxrunning_minion(self):
        '''
        Verify that a minion schedule job already at its maxrunning limit
        is skipped with _skip_reason set to 'maxrunning'.
        '''
        self.schedule.opts['__role'] = 'minion'

        job = {
            'schedule': {
                'maxrunning_minion': {
                    'function': 'test.ping',
                    'seconds': 10,
                    'maxrunning': 1
                }
            }
        }

        job_data = {'function': 'test.ping',
                    'run': False,
                    'name': 'maxrunning_minion',
                    'seconds': 10,
                    '_seconds': 10,
                    'jid_include': True,
                    'maxrunning': 1}

        # Add the job to the scheduler
        self.schedule.opts.update(job)

        # Fake one already-running instance of this schedule entry
        running_data = [{'fun_args': [],
                         'jid': '20181018165923360935',
                         'schedule': 'maxrunning_minion',
                         'pid': 15338,
                         'fun': 'test.sleep',
                         'id': 'host'}]

        with patch('salt.utils.minion.running',
                   MagicMock(return_value=running_data)):
            with patch('salt.utils.process.os_is_running',
                       MagicMock(return_value=True)):
                ret = self.schedule._check_max_running('test.ping',
                                                       job_data,
                                                       self.schedule.opts)
                self.assertIn('_skip_reason', ret)
                self.assertEqual('maxrunning', ret['_skip_reason'])
                self.assertEqual(False, ret['run'])

    def test_maxrunning_master(self):
        '''
        Verify that a master schedule job already at its maxrunning limit
        is skipped with _skip_reason set to 'maxrunning'.
        '''
        self.schedule.opts['__role'] = 'master'

        job = {
            'schedule': {
                'maxrunning_master': {
                    'function': 'state.orch',
                    'args': ['test.orch_test'],
                    'minutes': 1,
                    'maxrunning': 1
                }
            }
        }

        job_data = {'function': 'state.orch',
                    'fun_args': ['test.orch_test'],
                    'run': False,
                    'name': 'maxrunning_master',
                    'minutes': 1,
                    'jid_include': True,
                    'maxrunning': 1}

        # Add the job to the scheduler
        self.schedule.opts.update(job)

        # Fake one already-running instance of this schedule entry; the
        # master role must consult salt.utils.master.get_running_jobs
        running_data = [{'fun_args': ['test.orch_test'],
                         'jid': '20181018165923360935',
                         'schedule': 'maxrunning_master',
                         'pid': 15338,
                         'fun': 'state.orch',
                         'id': 'host'}]

        with patch('salt.utils.master.get_running_jobs',
                   MagicMock(return_value=running_data)):
            with patch('salt.utils.process.os_is_running',
                       MagicMock(return_value=True)):
                ret = self.schedule._check_max_running('state.orch',
                                                       job_data,
                                                       self.schedule.opts)
                self.assertIn('_skip_reason', ret)
                self.assertEqual('maxrunning', ret['_skip_reason'])
                self.assertEqual(False, ret['run'])
|
|
@ -146,6 +146,7 @@ class BadTestModuleNamesTestCase(TestCase):
|
|||
'integration.scheduler.test_eval',
|
||||
'integration.scheduler.test_postpone',
|
||||
'integration.scheduler.test_skip',
|
||||
'integration.scheduler.test_maxrunning',
|
||||
'integration.shell.test_spm',
|
||||
'integration.shell.test_cp',
|
||||
'integration.shell.test_syndic',
|
||||
|
|
Loading…
Add table
Reference in a new issue