Adding some master-specific functions to utils/master.py to determine if a Salt process is running. Updating utils/schedule.py to use the appropriate running function, either from utils/master.py or utils/minion.py, depending on where the scheduled job is running. Adding tests to test maxrunning in scheduled jobs for both the minion and master.

This commit is contained in:
Gareth J. Greenaway 2018-10-19 08:12:07 -07:00
parent 0d65304350
commit 65b44214d4
No known key found for this signature in database
GPG key ID: 10B62F8A7CAD7A41
3 changed files with 259 additions and 1 deletions

View file

@@ -39,6 +39,114 @@ from salt.utils.zeromq import zmq
log = logging.getLogger(__name__)
def running(opts):
    '''
    Return the list of currently-running job metadata dicts for this
    minion, read from the proc files under ``<cachedir>/proc``.
    '''
    jobs = []
    proc_dir = os.path.join(opts['cachedir'], 'proc')
    if not os.path.isdir(proc_dir):
        # No proc dir means nothing is (or ever was) running
        return jobs
    for proc_file in os.listdir(proc_dir):
        full_path = os.path.join(proc_dir, proc_file)
        try:
            job_data = _read_proc_file(full_path, opts)
        except (IOError, OSError):
            # Proc files may be removed at any moment by the minion
            # process executing the JID in question, so ENOENT-style
            # errors during the scan are expected and ignored.
            continue
        if job_data is not None:
            jobs.append(job_data)
    return jobs
def cache_jobs(opts, jid, ret):
    '''
    Serialize and write the return data ``ret`` for job ``jid`` into the
    minion job cache (``<cachedir>/minion_jobs/<jid>/return.p``).
    '''
    serial = salt.payload.Serial(opts=opts)
    cache_file = os.path.join(opts['cachedir'], 'minion_jobs', jid, 'return.p')
    cache_dir = os.path.dirname(cache_file)
    # Create the per-JID directory on first write
    if not os.path.isdir(cache_dir):
        os.makedirs(cache_dir)
    with salt.utils.files.fopen(cache_file, 'w+b') as fp_:
        fp_.write(serial.dumps(ret))
def _read_proc_file(path, opts):
    '''
    Return a dict of JID metadata read from the proc file at ``path``,
    or None if the file is empty, contains invalid data, or describes a
    process that is no longer a running Salt process.

    Stale proc files (empty, dead PID, or PID recycled by a non-Salt
    process) are removed as a side effect.
    '''
    serial = salt.payload.Serial(opts)
    with salt.utils.files.fopen(path, 'rb') as fp_:
        # The context manager closes the file; no explicit close() needed
        buf = fp_.read()
    if buf:
        data = serial.loads(buf)
    else:
        # Proc file is empty, remove it.
        # NOTE: os.remove raises OSError (not IOError) on Python 2, so we
        # must catch both; on Python 3 they are the same class.
        try:
            os.remove(path)
        except (IOError, OSError):
            pass
        return None
    if not isinstance(data, dict):
        # Invalid serialized payload
        return None
    if not salt.utils.process.os_is_running(data['pid']):
        # The process is no longer running, clear out the stale file and
        # continue
        try:
            os.remove(path)
        except (IOError, OSError):
            pass
        return None
    if not _check_cmdline(data):
        # PID exists but was recycled by a non-Salt process
        pid = data.get('pid')
        if pid:
            log.warning(
                'PID %s exists but does not appear to be a salt process.', pid
            )
        try:
            os.remove(path)
        except (IOError, OSError):
            pass
        return None
    return data
def _check_cmdline(data):
    '''
    In some cases where there are an insane number of processes being
    created on a system a PID can get recycled or assigned to a non-Salt
    process. On Linux this fn checks /proc/<pid>/cmdline to make sure the
    PID we are checking on is actually a Salt process.

    For non-Linux systems we punt and just return True.

    :param dict data: proc-file metadata; only the ``pid`` key is read.
    :return: True if the PID appears to belong to a Salt process (or the
             check cannot be performed), False otherwise.
    '''
    if not salt.utils.platform.is_linux():
        return True
    pid = data.get('pid')
    if not pid:
        return False
    if not os.path.isdir('/proc'):
        # /proc not mounted; give the PID the benefit of the doubt
        return True
    path = '/proc/{0}/cmdline'.format(pid)
    if not os.path.isfile(path):
        return False
    try:
        with salt.utils.files.fopen(path, 'rb') as fp_:
            # Explicit False when 'salt' is absent (the original fell
            # through and returned None implicitly)
            return b'salt' in fp_.read()
    except (OSError, IOError):
        return False
class MasterPillarUtil(object):
'''
Helper utility for easy access to targeted minion grain and

View file

@ -32,6 +32,7 @@ import salt.utils.error
import salt.utils.event
import salt.utils.files
import salt.utils.jid
import salt.utils.master
import salt.utils.minion
import salt.utils.platform
import salt.utils.process
@ -177,7 +178,11 @@ class Schedule(object):
data['run'] = True
if 'jid_include' not in data or data['jid_include']:
jobcount = 0
for job in salt.utils.minion.running(self.opts):
if self.opts['__role'] == 'master':
current_jobs = salt.utils.master.running(self.opts)
else:
current_jobs = salt.utils.minion.running(self.opts)
for job in current_jobs:
if 'schedule' in job:
log.debug(
'schedule.handle_func: Checking job against fun '

View file

@ -0,0 +1,145 @@
# -*- coding: utf-8 -*-
# Import Python libs
from __future__ import absolute_import
import copy
import datetime
import logging
import os
import random
import dateutil.parser as dateutil_parser
# Import Salt Testing libs
from tests.support.case import ModuleCase
from tests.support.mixins import SaltReturnAssertsMixin
# Import Salt Testing Libs
from tests.support.mock import MagicMock, patch
from tests.support.unit import skipIf
import tests.integration as integration
# Import Salt libs
import salt.utils.schedule
from salt.modules.test import ping as ping
try:
import croniter # pylint: disable=W0611
HAS_CRONITER = True
except ImportError:
HAS_CRONITER = False
log = logging.getLogger(__name__)

# Scratch area for scheduler test artifacts, isolated under the
# integration temp dir so test runs do not collide with a real minion.
ROOT_DIR = os.path.join(integration.TMP, 'schedule-unit-tests')
SOCK_DIR = os.path.join(ROOT_DIR, 'test-socks')

# Start from a stock minion config and point every writable path at the
# scratch area above.
# NOTE(review): salt.config is used here but only salt.utils.schedule is
# visibly imported — confirm an `import salt.config` exists in this file.
DEFAULT_CONFIG = salt.config.minion_config(None)
DEFAULT_CONFIG['conf_dir'] = ROOT_DIR
DEFAULT_CONFIG['root_dir'] = ROOT_DIR
DEFAULT_CONFIG['sock_dir'] = SOCK_DIR
DEFAULT_CONFIG['pki_dir'] = os.path.join(ROOT_DIR, 'pki')
DEFAULT_CONFIG['cachedir'] = os.path.join(ROOT_DIR, 'cache')
class SchedulerMaxRunningTest(ModuleCase, SaltReturnAssertsMixin):
    '''
    Validate the scheduler's ``maxrunning`` handling: a scheduled job must
    be skipped when the number of already-running instances of that job
    has reached its ``maxrunning`` limit, for both minion- and master-side
    schedules.
    '''
    def setUp(self):
        # clean_proc_dir is patched out so Schedule construction does not
        # touch the (nonexistent) proc dir under the scratch cachedir.
        with patch('salt.utils.schedule.clean_proc_dir', MagicMock(return_value=None)):
            functions = {'test.ping': ping}
            self.schedule = salt.utils.schedule.Schedule(copy.deepcopy(DEFAULT_CONFIG), functions, returners={})
            self.schedule.opts['loop_interval'] = 1

    def tearDown(self):
        self.schedule.reset()

    def test_maxrunning_minion(self):
        '''
        Verify that a minion-side scheduled job is skipped (not run) when
        an instance of the same schedule entry is already running and
        maxrunning is 1.
        '''
        job = {
          'schedule': {
            'maxrunning_minion': {
              'function': 'test.ping',
              'seconds': 10,
              'maxrunning': 1
            }
          }
        }

        job_data = {'function': 'test.ping',
                    'run': False,
                    'name': 'maxrunning_minion',
                    'seconds': 10,
                    '_seconds': 10,
                    'jid_include': True,
                    'maxrunning': 1}

        # Add the job to the scheduler
        self.schedule.opts.update(job)

        # One instance of this schedule entry is already "running"
        running_data = [{'fun_args': [],
                         'jid': '20181018165923360935',
                         'schedule': 'maxrunning_minion',
                         'pid': 15338,
                         'fun': 'test.sleep',
                         'id': 'host'}]

        # Minion role: _check_max_running should consult
        # salt.utils.minion.running for the in-flight job list.
        with patch('salt.utils.minion.running',
                   MagicMock(return_value=running_data)):
            with patch('salt.utils.process.os_is_running',
                       MagicMock(return_value=True)):
                ret = self.schedule._check_max_running('test.ping',
                                                       job_data,
                                                       self.schedule.opts)
        self.assertIn('_skip_reason', ret)
        self.assertEqual('maxrunning', ret['_skip_reason'])
        self.assertEqual(False, ret['run'])

    def test_maxrunning_master(self):
        '''
        Verify that a master-side scheduled job (e.g. an orchestration) is
        skipped when an instance of the same schedule entry is already
        running and maxrunning is 1.
        '''
        self.schedule.opts['__role'] = 'master'
        job = {
          'schedule': {
            'maxrunning_master': {
              'function': 'state.orch',
              'args': ['test.orch_test'],
              'minutes': 1,
              'maxrunning': 1
            }
          }
        }

        job_data = {'function': 'state.orch',
                    'fun_args': ['test.orch_test'],
                    'run': False,
                    'name': 'maxrunning_master',
                    'minutes': 1,
                    'jid_include': True,
                    'maxrunning': 1}

        # Add the job to the scheduler
        self.schedule.opts.update(job)

        # One instance of this schedule entry is already "running"
        running_data = [{'fun_args': ['test.orch_test'],
                         'jid': '20181018165923360935',
                         'schedule': 'maxrunning_master',
                         'pid': 15338,
                         'fun': 'state.orch',
                         'id': 'host'}]

        # Master role: _check_max_running should consult
        # salt.utils.master.running instead of the minion variant.
        with patch('salt.utils.master.running',
                   MagicMock(return_value=running_data)):
            with patch('salt.utils.process.os_is_running',
                       MagicMock(return_value=True)):
                ret = self.schedule._check_max_running('state.orch',
                                                       job_data,
                                                       self.schedule.opts)
        self.assertIn('_skip_reason', ret)
        self.assertEqual('maxrunning', ret['_skip_reason'])
        self.assertEqual(False, ret['run'])