Merge pull request #43669 from moio/develop-limit-minion-processes

Introduce process_count_max minion configuration parameter (develop)
This commit is contained in:
Thomas S Hatch 2017-09-26 05:56:11 -06:00 committed by GitHub
commit d8f371b8b1
5 changed files with 87 additions and 3 deletions

View file

@ -689,6 +689,12 @@
# for a full explanation.
#multiprocessing: True
# Limit the maximum number of processes or threads created by salt-minion.
# This is useful to avoid resource exhaustion in case the minion receives more
# publications than it is able to handle, as it limits the number of spawned
# processes or threads. -1 is the default and disables the limit.
#process_count_max: -1
##### Logging settings #####
##########################################

View file

@ -2419,6 +2419,23 @@ executed in a thread.
multiprocessing: True
.. conf_minion:: process_count_max
``process_count_max``
---------------------
.. versionadded:: Oxygen
Default: ``-1``
Limit the maximum number of processes or threads created by ``salt-minion``.
This is useful to avoid resource exhaustion in case the minion receives more
publications than it is able to handle, as it limits the number of spawned
processes or threads. ``-1`` is the default and disables the limit.
.. code-block:: yaml
process_count_max: -1
.. _minion-logging-settings:

View file

@ -337,6 +337,9 @@ VALID_OPTS = {
# Whether or not processes should be forked when needed. The alternative is to use threading.
'multiprocessing': bool,
# Maximum number of concurrently active processes at any given point in time
'process_count_max': int,
# Whether or not the salt minion should run scheduled mine updates
'mine_enabled': bool,
@ -1258,6 +1261,7 @@ DEFAULT_MINION_OPTS = {
'auto_accept': True,
'autosign_timeout': 120,
'multiprocessing': True,
'process_count_max': -1,
'mine_enabled': True,
'mine_return_job': False,
'mine_interval': 60,

View file

@ -1333,6 +1333,7 @@ class Minion(MinionBase):
self._send_req_async(load, timeout, callback=lambda f: None) # pylint: disable=unexpected-keyword-arg
return True
@tornado.gen.coroutine
def _handle_decoded_payload(self, data):
'''
Override this method if you wish to handle the decoded data
@ -1365,6 +1366,15 @@ class Minion(MinionBase):
self.functions, self.returners, self.function_errors, self.executors = self._load_modules()
self.schedule.functions = self.functions
self.schedule.returners = self.returners
process_count_max = self.opts.get('process_count_max')
if process_count_max > 0:
process_count = len(salt.utils.minion.running(self.opts))
while process_count >= process_count_max:
log.warn("Maximum number of processes reached while executing jid {0}, waiting...".format(data['jid']))
yield tornado.gen.sleep(10)
process_count = len(salt.utils.minion.running(self.opts))
# We stash an instance reference to allow for the socket
# communication in Windows. You can't pickle functions, and thus
# python needs to be able to reconstruct the reference on the other

View file

@ -18,6 +18,7 @@ import salt.utils.event as event
from salt.exceptions import SaltSystemExit
import salt.syspaths
import tornado
from salt.ext.six.moves import range
__opts__ = {}
@ -69,7 +70,7 @@ class MinionTestCase(TestCase):
mock_jid_queue = [123]
try:
minion = salt.minion.Minion(mock_opts, jid_queue=copy.copy(mock_jid_queue), io_loop=tornado.ioloop.IOLoop())
ret = minion._handle_decoded_payload(mock_data)
ret = minion._handle_decoded_payload(mock_data).result()
self.assertEqual(minion.jid_queue, mock_jid_queue)
self.assertIsNone(ret)
finally:
@ -98,7 +99,7 @@ class MinionTestCase(TestCase):
# Call the _handle_decoded_payload function and update the mock_jid_queue to include the new
# mock_jid. The mock_jid should have been added to the jid_queue since the mock_jid wasn't
# previously included. The minion's jid_queue attribute and the mock_jid_queue should be equal.
minion._handle_decoded_payload(mock_data)
minion._handle_decoded_payload(mock_data).result()
mock_jid_queue.append(mock_jid)
self.assertEqual(minion.jid_queue, mock_jid_queue)
finally:
@ -126,8 +127,54 @@ class MinionTestCase(TestCase):
# Call the _handle_decoded_payload function and check that the queue is smaller by one item
# and contains the new jid
minion._handle_decoded_payload(mock_data)
minion._handle_decoded_payload(mock_data).result()
self.assertEqual(len(minion.jid_queue), 2)
self.assertEqual(minion.jid_queue, [456, 789])
finally:
minion.destroy()
def test_process_count_max(self):
    '''
    Tests that the _handle_decoded_payload function does not spawn more than the configured number of
    processes, as per process_count_max.

    ``salt.utils.minion.running`` is patched to return a list that this test grows by one entry per
    started job, and ``tornado.gen.sleep`` is patched to return a Future that carries a sentinel
    exception. As long as the running list is shorter than ``process_count_max`` the payload handler
    must start a process without sleeping; once the limit is reached it must call ``gen.sleep``
    (surfacing the sentinel exception) and must not start another process.
    '''
    with patch('salt.minion.Minion.ctx', MagicMock(return_value={})), \
            patch('salt.utils.process.SignalHandlingMultiprocessingProcess.start',
                  MagicMock(return_value=True)), \
            patch('salt.utils.process.SignalHandlingMultiprocessingProcess.join',
                  MagicMock(return_value=True)), \
            patch('salt.utils.minion.running', MagicMock(return_value=[])), \
            patch('tornado.gen.sleep', MagicMock(return_value=tornado.concurrent.Future())):
        process_count_max = 10

        # Work on a copy: assigning into the module-level DEFAULT_MINION_OPTS dict would leak
        # these overrides into every test that runs afterwards.
        mock_opts = salt.config.DEFAULT_MINION_OPTS.copy()
        mock_opts['minion_jid_queue_hwm'] = 100
        mock_opts['process_count_max'] = process_count_max

        # Build the minion before entering the try block so that a constructor failure is
        # reported as-is instead of being masked by a NameError in the finally clause.
        io_loop = tornado.ioloop.IOLoop()
        minion = salt.minion.Minion(mock_opts, jid_queue=[], io_loop=io_loop)
        try:
            # mock gen.sleep to throw a special Exception when called, so that we detect it
            class SleepCalledException(Exception):
                """Thrown when sleep is called"""

            tornado.gen.sleep.return_value.set_exception(SleepCalledException())

            # up until process_count_max: gen.sleep does not get called, processes are started normally
            for i in range(process_count_max):
                mock_data = {'fun': 'foo.bar',
                             'jid': i}
                # Bind mock_data as a default argument so the callable keeps this iteration's payload
                io_loop.run_sync(lambda data=mock_data: minion._handle_decoded_payload(data))
                self.assertEqual(salt.utils.process.SignalHandlingMultiprocessingProcess.start.call_count,
                                 i + 1)
                self.assertEqual(len(minion.jid_queue), i + 1)
                # Simulate one more running job for the next iteration's limit check
                salt.utils.minion.running.return_value += [i]

            # above process_count_max: gen.sleep does get called, JIDs are created but no new processes
            # are started
            mock_data = {'fun': 'foo.bar',
                         'jid': process_count_max + 1}

            with self.assertRaises(SleepCalledException):
                io_loop.run_sync(lambda: minion._handle_decoded_payload(mock_data))
            self.assertEqual(salt.utils.process.SignalHandlingMultiprocessingProcess.start.call_count,
                             process_count_max)
            self.assertEqual(len(minion.jid_queue), process_count_max + 1)
        finally:
            minion.destroy()