fixes saltstack/salt#63356 allow setting max queue size for state runs

This commit is contained in:
nicholasmhughes 2022-12-21 14:15:33 -05:00
parent d8c5180bf5
commit 1e5721ad52
No known key found for this signature in database
GPG key ID: CB87254A2EA67E01
3 changed files with 131 additions and 49 deletions

1
changelog/63356.added Normal file
View file

@ -0,0 +1 @@
Allow max queue size setting for state runs to prevent performance problems from queue growth

View file

@ -119,16 +119,19 @@ def _get_pillar_errors(kwargs, pillar=None):
return None if kwargs.get("force") else (pillar or __pillar__).get("_errors")
def _wait(jid):
def _wait(jid, max_queue=0):
"""
Wait for all previously started state jobs to finish running
"""
if jid is None:
jid = salt.utils.jid.gen_jid(__opts__)
states = _prior_running_states(jid)
while states:
time.sleep(1)
states = _prior_running_states(jid)
if not max_queue or len(states) < max_queue:
while states:
time.sleep(1)
states = _prior_running_states(jid)
return True
return False
def _snapper_pre(opts, jid):
@ -413,13 +416,23 @@ def _check_queue(queue, kwargs):
Utility function to queue the state run if requested
and to check for conflicts in currently running states
"""
if queue:
if queue is None:
queue = __salt__["config.option"]("state_queue", False)
if queue is True:
_wait(kwargs.get("__pub_jid"))
else:
conflict = running(concurrent=kwargs.get("concurrent", False))
if conflict:
__context__["retcode"] = salt.defaults.exitcodes.EX_STATE_COMPILER_ERROR
return conflict
queue_ret = False
if not isinstance(queue, bool) and isinstance(queue, int):
queue_ret = _wait(kwargs.get("__pub_jid"), max_queue=queue)
if not queue_ret:
conflict = running(concurrent=kwargs.get("concurrent", False))
if conflict:
__context__["retcode"] = salt.defaults.exitcodes.EX_STATE_COMPILER_ERROR
return conflict
return
def _get_initial_pillar(opts):
@ -431,7 +444,7 @@ def _get_initial_pillar(opts):
)
def low(data, queue=False, **kwargs):
def low(data, queue=None, **kwargs):
"""
Execute a single low data call
@ -480,7 +493,7 @@ def _get_test_value(test=None, **kwargs):
return ret
def high(data, test=None, queue=False, **kwargs):
def high(data, test=None, queue=None, **kwargs):
"""
Execute the compound calls stored in a single set of high data
@ -533,7 +546,7 @@ def high(data, test=None, queue=False, **kwargs):
return ret
def template(tem, queue=False, **kwargs):
def template(tem, queue=None, **kwargs):
"""
Execute the information stored in a template file on the minion.
@ -586,7 +599,7 @@ def template(tem, queue=False, **kwargs):
return ret
def template_str(tem, queue=False, **kwargs):
def template_str(tem, queue=None, **kwargs):
"""
Execute the information stored in a string from an sls template
@ -668,11 +681,19 @@ def apply_(mods=None, **kwargs):
queue : False
Instead of failing immediately when another state run is in progress,
queue the new state run to begin running once the other has finished.
a value of ``True`` will queue the new state run to begin running once
the other has finished.
This option starts a new thread for each queued state run, so use this
option sparingly.
.. versionchanged:: 3007.0
This parameter can also be set via the ``state_queue`` configuration
option. Additionally, it can now be set to an integer representing
the maximum queue size which can be attained before the state runs
will fail to be queued. This can prevent runaway conditions where
new threads are started until system performance is hampered.
localconfig
Optionally, instead of using the minion config, load minion opts from
the file specified by this argument, and then merge them with the
@ -727,11 +748,19 @@ def apply_(mods=None, **kwargs):
queue : False
Instead of failing immediately when another state run is in progress,
queue the new state run to begin running once the other has finished.
a value of ``True`` will queue the new state run to begin running once
the other has finished.
This option starts a new thread for each queued state run, so use this
option sparingly.
.. versionchanged:: 3007.0
This parameter can also be set via the ``state_queue`` configuration
option. Additionally, it can now be set to an integer representing
the maximum queue size which can be attained before the state runs
will fail to be queued. This can prevent runaway conditions where
new threads are started until system performance is hampered.
concurrent : False
Execute state runs concurrently instead of serially
@ -945,7 +974,7 @@ def run_request(name="default", **kwargs):
return {}
def highstate(test=None, queue=False, **kwargs):
def highstate(test=None, queue=None, **kwargs):
"""
Retrieve the state data from the salt master for this minion and execute it
@ -1007,11 +1036,19 @@ def highstate(test=None, queue=False, **kwargs):
queue : False
Instead of failing immediately when another state run is in progress,
queue the new state run to begin running once the other has finished.
a value of ``True`` will queue the new state run to begin running once
the other has finished.
This option starts a new thread for each queued state run, so use this
option sparingly.
.. versionchanged:: 3007.0
This parameter can also be set via the ``state_queue`` configuration
option. Additionally, it can now be set to an integer representing
the maximum queue size which can be attained before the state runs
will fail to be queued. This can prevent runaway conditions where
new threads are started until system performance is hampered.
concurrent : False
Execute state runs concurrently instead of serially
@ -1061,15 +1098,9 @@ def highstate(test=None, queue=False, **kwargs):
}
return ret
concurrent = kwargs.get("concurrent", False)
if queue:
_wait(kwargs.get("__pub_jid"))
else:
conflict = running(concurrent)
if conflict:
__context__["retcode"] = salt.defaults.exitcodes.EX_STATE_COMPILER_ERROR
return conflict
conflict = _check_queue(queue, kwargs)
if conflict is not None:
return conflict
orig_test = __opts__.get("test", None)
opts = salt.utils.state.get_sls_opts(__opts__, **kwargs)
@ -1155,7 +1186,7 @@ def highstate(test=None, queue=False, **kwargs):
return ret
def sls(mods, test=None, exclude=None, queue=False, sync_mods=None, **kwargs):
def sls(mods, test=None, exclude=None, queue=None, sync_mods=None, **kwargs):
"""
Execute the states in one or more SLS files
@ -1198,11 +1229,19 @@ def sls(mods, test=None, exclude=None, queue=False, sync_mods=None, **kwargs):
queue : False
Instead of failing immediately when another state run is in progress,
queue the new state run to begin running once the other has finished.
a value of ``True`` will queue the new state run to begin running once
the other has finished.
This option starts a new thread for each queued state run, so use this
option sparingly.
.. versionchanged:: 3007.0
This parameter can also be set via the ``state_queue`` configuration
option. Additionally, it can now be set to an integer representing
the maximum queue size which can be attained before the state runs
will fail to be queued. This can prevent runaway conditions where
new threads are started until system performance is hampered.
concurrent : False
Execute state runs concurrently instead of serially
@ -1279,14 +1318,10 @@ def sls(mods, test=None, exclude=None, queue=False, sync_mods=None, **kwargs):
# "env" is not supported; Use "saltenv".
kwargs.pop("env")
# Modification to __opts__ lost after this if-else
if queue:
_wait(kwargs.get("__pub_jid"))
else:
conflict = running(concurrent)
if conflict:
__context__["retcode"] = salt.defaults.exitcodes.EX_STATE_COMPILER_ERROR
return conflict
# Modification to __opts__ lost after this
conflict = _check_queue(queue, kwargs)
if conflict is not None:
return conflict
if isinstance(mods, list):
disabled = _disabled(mods)
@ -1442,7 +1477,7 @@ def sls(mods, test=None, exclude=None, queue=False, sync_mods=None, **kwargs):
return ret
def top(topfn, test=None, queue=False, **kwargs):
def top(topfn, test=None, queue=None, **kwargs):
"""
Execute a specific top file instead of the default. This is useful to apply
configurations from a different environment (for example, dev or prod), without
@ -1450,11 +1485,19 @@ def top(topfn, test=None, queue=False, **kwargs):
queue : False
Instead of failing immediately when another state run is in progress,
queue the new state run to begin running once the other has finished.
a value of ``True`` will queue the new state run to begin running once
the other has finished.
This option starts a new thread for each queued state run, so use this
option sparingly.
.. versionchanged:: 3007.0
This parameter can also be set via the ``state_queue`` configuration
option. Additionally, it can now be set to an integer representing
the maximum queue size which can be attained before the state runs
will fail to be queued. This can prevent runaway conditions where
new threads are started until system performance is hampered.
saltenv
Specify a salt fileserver environment to be used when applying states
@ -1542,7 +1585,7 @@ def top(topfn, test=None, queue=False, **kwargs):
return ret
def show_highstate(queue=False, **kwargs):
def show_highstate(queue=None, **kwargs):
"""
Retrieve the highstate data from the salt master and display it
@ -1601,7 +1644,7 @@ def show_highstate(queue=False, **kwargs):
return ret
def show_lowstate(queue=False, **kwargs):
def show_lowstate(queue=None, **kwargs):
"""
List out the low data that will be applied to this minion
@ -1638,7 +1681,7 @@ def show_lowstate(queue=False, **kwargs):
return ret
def show_state_usage(queue=False, **kwargs):
def show_state_usage(queue=None, **kwargs):
"""
Retrieve the highstate data from the salt master to analyse used and unused states
@ -1672,7 +1715,7 @@ def show_state_usage(queue=False, **kwargs):
return ret
def show_states(queue=False, **kwargs):
def show_states(queue=None, **kwargs):
"""
Returns the list of states that will be applied on highstate.
@ -1722,7 +1765,7 @@ def show_states(queue=False, **kwargs):
return list(states.keys())
def sls_id(id_, mods, test=None, queue=False, **kwargs):
def sls_id(id_, mods, test=None, queue=None, **kwargs):
"""
Call a single ID from the named module(s) and handle all requisites
@ -1849,7 +1892,7 @@ def sls_id(id_, mods, test=None, queue=False, **kwargs):
return ret
def show_low_sls(mods, test=None, queue=False, **kwargs):
def show_low_sls(mods, test=None, queue=None, **kwargs):
"""
Display the low data from a specific sls. The default environment is
``base``, use ``saltenv`` to specify a different environment.
@ -1945,7 +1988,7 @@ def show_low_sls(mods, test=None, queue=False, **kwargs):
return ret
def show_sls(mods, test=None, queue=False, **kwargs):
def show_sls(mods, test=None, queue=None, **kwargs):
"""
Display the state data from a specific sls or list of sls files on the
master. The default environment is ``base``, use ``saltenv`` to specify a
@ -2039,7 +2082,7 @@ def show_sls(mods, test=None, queue=False, **kwargs):
return high_
def sls_exists(mods, test=None, queue=False, **kwargs):
def sls_exists(mods, test=None, queue=None, **kwargs):
"""
Tests for the existence the of a specific SLS or list of SLS files on the
master. Similar to :py:func:`state.show_sls <salt.modules.state.show_sls>`,
@ -2061,7 +2104,7 @@ def sls_exists(mods, test=None, queue=False, **kwargs):
return isinstance(show_sls(mods, test=test, queue=queue, **kwargs), dict)
def id_exists(ids, mods, test=None, queue=False, **kwargs):
def id_exists(ids, mods, test=None, queue=None, **kwargs):
"""
Tests for the existence of a specific ID or list of IDs within the
specified SLS file(s). Similar to :py:func:`state.sls_exists
@ -2088,7 +2131,7 @@ def id_exists(ids, mods, test=None, queue=False, **kwargs):
return ids.issubset(sls_ids)
def show_top(queue=False, **kwargs):
def show_top(queue=None, **kwargs):
"""
Return the top data that the minion will use for a highstate
@ -2130,7 +2173,7 @@ def show_top(queue=False, **kwargs):
return matches
def single(fun, name, test=None, queue=False, **kwargs):
def single(fun, name, test=None, queue=None, **kwargs):
"""
Execute a single state function with the named kwargs, returns False if
insufficient data is sent to the command

View file

@ -1274,3 +1274,41 @@ def test_event():
if _expected in x.args[0]:
found = True
assert found is True
@pytest.mark.parametrize(
"max_queue,call_count,ret_value",
[(0, 2, True), (3, 2, True), (2, 0, False), (1, 0, False)],
)
def test__wait(max_queue, call_count, ret_value):
mock_jid = 8675309
mock_sleep = MagicMock()
mock_prior = MagicMock(
side_effect=[
["one", "two"],
["one"],
[],
]
)
with patch("time.sleep", mock_sleep), patch(
"salt.modules.state._prior_running_states", mock_prior
):
ret = state._wait(mock_jid, max_queue=max_queue)
assert mock_sleep.call_count == call_count
assert ret is ret_value
@pytest.mark.parametrize(
"queue,wait_called,ret_value",
[(True, True, None), (False, False, True), (1, True, None)],
)
def test__check_queue(queue, wait_called, ret_value):
mock_wait = MagicMock()
with patch("salt.modules.state._wait", mock_wait), patch(
"salt.modules.state.running", MagicMock(return_value=True)
), patch.dict(state.__context__, {"retcode": "banana"}):
ret = state._check_queue(queue, {})
assert mock_wait.called is wait_called
assert ret is ret_value
if ret_value is True:
assert state.__context__["retcode"] == 1