diff --git a/changelog/63356.added b/changelog/63356.added new file mode 100644 index 00000000000..994e7a70d57 --- /dev/null +++ b/changelog/63356.added @@ -0,0 +1 @@ +Allow max queue size setting for state runs to prevent performance problems from queue growth diff --git a/salt/modules/state.py b/salt/modules/state.py index 387b2bd9127..a83810fb1a4 100644 --- a/salt/modules/state.py +++ b/salt/modules/state.py @@ -119,16 +119,19 @@ def _get_pillar_errors(kwargs, pillar=None): return None if kwargs.get("force") else (pillar or __pillar__).get("_errors") -def _wait(jid): +def _wait(jid, max_queue=0): """ Wait for all previously started state jobs to finish running """ if jid is None: jid = salt.utils.jid.gen_jid(__opts__) states = _prior_running_states(jid) - while states: - time.sleep(1) - states = _prior_running_states(jid) + if not max_queue or len(states) < max_queue: + while states: + time.sleep(1) + states = _prior_running_states(jid) + return True + return False def _snapper_pre(opts, jid): @@ -413,13 +416,23 @@ def _check_queue(queue, kwargs): Utility function to queue the state run if requested and to check for conflicts in currently running states """ - if queue: + if queue is None: + queue = __salt__["config.option"]("state_queue", False) + + if queue is True: _wait(kwargs.get("__pub_jid")) else: - conflict = running(concurrent=kwargs.get("concurrent", False)) - if conflict: - __context__["retcode"] = salt.defaults.exitcodes.EX_STATE_COMPILER_ERROR - return conflict + queue_ret = False + if not isinstance(queue, bool) and isinstance(queue, int): + queue_ret = _wait(kwargs.get("__pub_jid"), max_queue=queue) + + if not queue_ret: + conflict = running(concurrent=kwargs.get("concurrent", False)) + if conflict: + __context__["retcode"] = salt.defaults.exitcodes.EX_STATE_COMPILER_ERROR + return conflict + + return def _get_initial_pillar(opts): @@ -431,7 +444,7 @@ def _get_initial_pillar(opts): ) -def low(data, queue=False, **kwargs): +def low(data, queue=None, **kwargs): """ Execute a single low data call @@ -480,7 +493,7 @@ def _get_test_value(test=None, **kwargs): return ret -def high(data, test=None, queue=False, **kwargs): +def high(data, test=None, queue=None, **kwargs): """ Execute the compound calls stored in a single set of high data @@ -533,7 +546,7 @@ def high(data, test=None, queue=False, **kwargs): return ret -def template(tem, queue=False, **kwargs): +def template(tem, queue=None, **kwargs): """ Execute the information stored in a template file on the minion. @@ -586,7 +599,7 @@ def template(tem, queue=False, **kwargs): return ret -def template_str(tem, queue=False, **kwargs): +def template_str(tem, queue=None, **kwargs): """ Execute the information stored in a string from an sls template @@ -668,11 +681,19 @@ def apply_(mods=None, **kwargs): queue : False Instead of failing immediately when another state run is in progress, - queue the new state run to begin running once the other has finished. + a value of ``True`` will queue the new state run to begin running once + the other has finished. This option starts a new thread for each queued state run, so use this option sparingly. + .. versionchanged:: 3007.0 + This parameter can also be set via the ``state_queue`` configuration + option. Additionally, it can now be set to an integer representing + the maximum queue size which can be attained before the state runs + will fail to be queued. This can prevent runaway conditions where + new threads are started until system performance is hampered. + localconfig Optionally, instead of using the minion config, load minion opts from the file specified by this argument, and then merge them with the @@ -727,11 +748,19 @@ def apply_(mods=None, **kwargs): queue : False Instead of failing immediately when another state run is in progress, - queue the new state run to begin running once the other has finished. + a value of ``True`` will queue the new state run to begin running once + the other has finished. This option starts a new thread for each queued state run, so use this option sparingly. + .. versionchanged:: 3007.0 + This parameter can also be set via the ``state_queue`` configuration + option. Additionally, it can now be set to an integer representing + the maximum queue size which can be attained before the state runs + will fail to be queued. This can prevent runaway conditions where + new threads are started until system performance is hampered. + concurrent : False Execute state runs concurrently instead of serially @@ -945,7 +974,7 @@ def run_request(name="default", **kwargs): return {} -def highstate(test=None, queue=False, **kwargs): +def highstate(test=None, queue=None, **kwargs): """ Retrieve the state data from the salt master for this minion and execute it @@ -1007,11 +1036,19 @@ def highstate(test=None, queue=False, **kwargs): queue : False Instead of failing immediately when another state run is in progress, - queue the new state run to begin running once the other has finished. + a value of ``True`` will queue the new state run to begin running once + the other has finished. This option starts a new thread for each queued state run, so use this option sparingly. + .. versionchanged:: 3007.0 + This parameter can also be set via the ``state_queue`` configuration + option. Additionally, it can now be set to an integer representing + the maximum queue size which can be attained before the state runs + will fail to be queued. This can prevent runaway conditions where + new threads are started until system performance is hampered. + concurrent : False Execute state runs concurrently instead of serially @@ -1061,15 +1098,9 @@ def highstate(test=None, queue=False, **kwargs): } return ret - concurrent = kwargs.get("concurrent", False) - - if queue: - _wait(kwargs.get("__pub_jid")) - else: - conflict = running(concurrent) - if conflict: - __context__["retcode"] = salt.defaults.exitcodes.EX_STATE_COMPILER_ERROR - return conflict + conflict = _check_queue(queue, kwargs) + if conflict is not None: + return conflict orig_test = __opts__.get("test", None) opts = salt.utils.state.get_sls_opts(__opts__, **kwargs) @@ -1155,7 +1186,7 @@ def highstate(test=None, queue=False, **kwargs): return ret -def sls(mods, test=None, exclude=None, queue=False, sync_mods=None, **kwargs): +def sls(mods, test=None, exclude=None, queue=None, sync_mods=None, **kwargs): """ Execute the states in one or more SLS files @@ -1198,11 +1229,19 @@ def sls(mods, test=None, exclude=None, queue=False, sync_mods=None, **kwargs): queue : False Instead of failing immediately when another state run is in progress, - queue the new state run to begin running once the other has finished. + a value of ``True`` will queue the new state run to begin running once + the other has finished. This option starts a new thread for each queued state run, so use this option sparingly. + .. versionchanged:: 3007.0 + This parameter can also be set via the ``state_queue`` configuration + option. Additionally, it can now be set to an integer representing + the maximum queue size which can be attained before the state runs + will fail to be queued. This can prevent runaway conditions where + new threads are started until system performance is hampered. + concurrent : False Execute state runs concurrently instead of serially @@ -1279,14 +1318,10 @@ def sls(mods, test=None, exclude=None, queue=False, sync_mods=None, **kwargs): # "env" is not supported; Use "saltenv". kwargs.pop("env") - # Modification to __opts__ lost after this if-else - if queue: - _wait(kwargs.get("__pub_jid")) - else: - conflict = running(concurrent) - if conflict: - __context__["retcode"] = salt.defaults.exitcodes.EX_STATE_COMPILER_ERROR - return conflict + # Modification to __opts__ lost after this + conflict = _check_queue(queue, kwargs) + if conflict is not None: + return conflict if isinstance(mods, list): disabled = _disabled(mods) @@ -1442,7 +1477,7 @@ def sls(mods, test=None, exclude=None, queue=False, sync_mods=None, **kwargs): return ret -def top(topfn, test=None, queue=False, **kwargs): +def top(topfn, test=None, queue=None, **kwargs): """ Execute a specific top file instead of the default. This is useful to apply configurations from a different environment (for example, dev or prod), without @@ -1450,11 +1485,19 @@ def top(topfn, test=None, queue=False, **kwargs): queue : False Instead of failing immediately when another state run is in progress, - queue the new state run to begin running once the other has finished. + a value of ``True`` will queue the new state run to begin running once + the other has finished. This option starts a new thread for each queued state run, so use this option sparingly. + .. versionchanged:: 3007.0 + This parameter can also be set via the ``state_queue`` configuration + option. Additionally, it can now be set to an integer representing + the maximum queue size which can be attained before the state runs + will fail to be queued. This can prevent runaway conditions where + new threads are started until system performance is hampered. + saltenv Specify a salt fileserver environment to be used when applying states @@ -1542,7 +1585,7 @@ def top(topfn, test=None, queue=False, **kwargs): return ret -def show_highstate(queue=False, **kwargs): +def show_highstate(queue=None, **kwargs): """ Retrieve the highstate data from the salt master and display it @@ -1601,7 +1644,7 @@ def show_highstate(queue=False, **kwargs): return ret -def show_lowstate(queue=False, **kwargs): +def show_lowstate(queue=None, **kwargs): """ List out the low data that will be applied to this minion @@ -1638,7 +1681,7 @@ def show_lowstate(queue=False, **kwargs): return ret -def show_state_usage(queue=False, **kwargs): +def show_state_usage(queue=None, **kwargs): """ Retrieve the highstate data from the salt master to analyse used and unused states @@ -1672,7 +1715,7 @@ def show_state_usage(queue=False, **kwargs): return ret -def show_states(queue=False, **kwargs): +def show_states(queue=None, **kwargs): """ Returns the list of states that will be applied on highstate. @@ -1722,7 +1765,7 @@ def show_states(queue=False, **kwargs): return list(states.keys()) -def sls_id(id_, mods, test=None, queue=False, **kwargs): +def sls_id(id_, mods, test=None, queue=None, **kwargs): """ Call a single ID from the named module(s) and handle all requisites @@ -1849,7 +1892,7 @@ def sls_id(id_, mods, test=None, queue=False, **kwargs): return ret -def show_low_sls(mods, test=None, queue=False, **kwargs): +def show_low_sls(mods, test=None, queue=None, **kwargs): """ Display the low data from a specific sls. The default environment is ``base``, use ``saltenv`` to specify a different environment. @@ -1945,7 +1988,7 @@ def show_low_sls(mods, test=None, queue=False, **kwargs): return ret -def show_sls(mods, test=None, queue=False, **kwargs): +def show_sls(mods, test=None, queue=None, **kwargs): """ Display the state data from a specific sls or list of sls files on the master. The default environment is ``base``, use ``saltenv`` to specify a @@ -2039,7 +2082,7 @@ def show_sls(mods, test=None, queue=False, **kwargs): return high_ -def sls_exists(mods, test=None, queue=False, **kwargs): +def sls_exists(mods, test=None, queue=None, **kwargs): """ Tests for the existence the of a specific SLS or list of SLS files on the master. Similar to :py:func:`state.show_sls `, @@ -2061,7 +2104,7 @@ def sls_exists(mods, test=None, queue=False, **kwargs): return isinstance(show_sls(mods, test=test, queue=queue, **kwargs), dict) -def id_exists(ids, mods, test=None, queue=False, **kwargs): +def id_exists(ids, mods, test=None, queue=None, **kwargs): """ Tests for the existence of a specific ID or list of IDs within the specified SLS file(s). Similar to :py:func:`state.sls_exists @@ -2088,7 +2131,7 @@ def id_exists(ids, mods, test=None, queue=False, **kwargs): return ids.issubset(sls_ids) -def show_top(queue=False, **kwargs): +def show_top(queue=None, **kwargs): """ Return the top data that the minion will use for a highstate @@ -2130,7 +2173,7 @@ def show_top(queue=False, **kwargs): return matches -def single(fun, name, test=None, queue=False, **kwargs): +def single(fun, name, test=None, queue=None, **kwargs): """ Execute a single state function with the named kwargs, returns False if insufficient data is sent to the command diff --git a/tests/pytests/unit/modules/state/test_state.py b/tests/pytests/unit/modules/state/test_state.py index 0497f122d95..7c42646bcf7 100644 --- a/tests/pytests/unit/modules/state/test_state.py +++ b/tests/pytests/unit/modules/state/test_state.py @@ -1274,3 +1274,41 @@ def test_event(): if _expected in x.args[0]: found = True assert found is True + + +@pytest.mark.parametrize( + "max_queue,call_count,ret_value", + [(0, 2, True), (3, 2, True), (2, 0, False), (1, 0, False)], +) +def test__wait(max_queue, call_count, ret_value): + mock_jid = 8675309 + mock_sleep = MagicMock() + mock_prior = MagicMock( + side_effect=[ + ["one", "two"], + ["one"], + [], + ] + ) + with patch("time.sleep", mock_sleep), patch( + "salt.modules.state._prior_running_states", mock_prior + ): + ret = state._wait(mock_jid, max_queue=max_queue) + assert mock_sleep.call_count == call_count + assert ret is ret_value + + +@pytest.mark.parametrize( + "queue,wait_called,ret_value", + [(True, True, None), (False, False, True), (1, True, None)], +) +def test__check_queue(queue, wait_called, ret_value): + mock_wait = MagicMock() + with patch("salt.modules.state._wait", mock_wait), patch( + "salt.modules.state.running", MagicMock(return_value=True) + ), patch.dict(state.__context__, {"retcode": "banana"}): + ret = state._check_queue(queue, {}) + assert mock_wait.called is wait_called + assert ret is ret_value + if ret_value is True: + assert state.__context__["retcode"] == 1