From 0ff43842cf0141c946c58ac9b3af18ca744c0ff9 Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Mon, 29 Apr 2024 15:27:15 -0700 Subject: [PATCH] Fix salt proxy tests - The loader can now handle async methods on loaded modules - Fix async proxy methods: handle_decoded_payload and handle_payload --- salt/loader/lazy.py | 38 +++++- salt/metaproxy/deltaproxy.py | 11 +- salt/metaproxy/proxy.py | 10 +- salt/minion.py | 5 +- tests/pytests/unit/loader/test_lazy.py | 11 ++ tests/pytests/unit/test_proxy_minion.py | 149 +++++++++++++++++++++ tests/unit/test_proxy_minion.py | 165 ------------------------ 7 files changed, 212 insertions(+), 177 deletions(-) create mode 100644 tests/pytests/unit/test_proxy_minion.py delete mode 100644 tests/unit/test_proxy_minion.py diff --git a/salt/loader/lazy.py b/salt/loader/lazy.py index 4afba4b5721..54bd02d650a 100644 --- a/salt/loader/lazy.py +++ b/salt/loader/lazy.py @@ -166,6 +166,40 @@ class LoadedFunc: return f"<{self.__class__.__name__} name={self.name!r}>" +class LoadedCoro(LoadedFunc): + """ + Coroutine functions loaded by LazyLoader instances using subscript notation + 'a[k]' will be wrapped with LoadedCoro. + + - Makes sure functions are called with the correct loader's context. + - Provides access to a wrapped func's __global__ attribute + + :param func str: The function name to wrap + :param LazyLoader loader: The loader instance to use in the context when the wrapped callable is called. + """ + + async def __call__( + self, *args, **kwargs + ): # pylint: disable=invalid-overridden-method + run_func = self.func + mod = sys.modules[run_func.__module__] + # All modules we've imported should have __opts__ defined. There are + # cases in the test suite where mod ends up being something other than + # a module we've loaded. + set_test = False + if hasattr(mod, "__opts__"): + if not isinstance(mod.__opts__, salt.loader.context.NamedLoaderContext): + if "test" in self.loader.opts: + mod.__opts__["test"] = self.loader.opts["test"] + set_test = True + if self.loader.inject_globals: + run_func = global_injector_decorator(self.loader.inject_globals)(run_func) + ret = await self.loader.run(run_func, *args, **kwargs) + if set_test: + self.loader.opts["test"] = mod.__opts__["test"] + return ret + + class LoadedMod: """ This class is used as a proxy to a loaded module @@ -347,7 +381,9 @@ class LazyLoader(salt.utils.lazy.LazyDict): Override the __getitem__ in order to decorate the returned function if we need to last-minute inject globals """ - super().__getitem__(item) # try to get the item from the dictionary + _ = super().__getitem__(item) # try to get the item from the dictionary + if inspect.iscoroutinefunction(_): + return LoadedCoro(item, self) return LoadedFunc(item, self) def __getattr__(self, mod_name): diff --git a/salt/metaproxy/deltaproxy.py b/salt/metaproxy/deltaproxy.py index 50bdaa0b3c3..d87ea5b3215 100644 --- a/salt/metaproxy/deltaproxy.py +++ b/salt/metaproxy/deltaproxy.py @@ -974,7 +974,7 @@ def thread_multi_return(cls, minion_instance, opts, data): log.error("The return failed for job %s: %s", data["jid"], exc) -def handle_payload(self, payload): +async def handle_payload(self, payload): """ Verify the publication and then pass the payload along to _handle_decoded_payload. @@ -982,7 +982,7 @@ def handle_payload(self, payload): if payload is not None and payload["enc"] == "aes": # First handle payload for the "control" proxy if self._target_load(payload["load"]): - self._handle_decoded_payload(payload["load"]) + await self._handle_decoded_payload(payload["load"]) # The following handles the sub-proxies sub_ids = self.opts["proxy"].get("ids", [self.opts["id"]]) @@ -990,7 +990,7 @@ def handle_payload(self, payload): if _id in self.deltaproxy_objs: instance = self.deltaproxy_objs[_id] if instance._target_load(payload["load"]): - instance._handle_decoded_payload(payload["load"]) + await instance._handle_decoded_payload(payload["load"]) else: log.warning("Proxy minion %s is not loaded, skipping.", _id) @@ -1004,7 +1004,7 @@ def handle_payload(self, payload): # the minion currently has no need. -def handle_decoded_payload(self, data): +async def handle_decoded_payload(self, data): """ Override this method if you wish to handle the decoded data differently. @@ -1052,7 +1052,8 @@ def handle_decoded_payload(self, data): data["jid"], ) once_logged = True - yield tornado.gen.sleep(0.5) + # yield tornado.gen.sleep(0.5) + await asyncio.sleep(0.5) process_count = self.subprocess_list.count # We stash an instance references to allow for the socket diff --git a/salt/metaproxy/proxy.py b/salt/metaproxy/proxy.py index 514c28dba69..25a3efc4ff8 100644 --- a/salt/metaproxy/proxy.py +++ b/salt/metaproxy/proxy.py @@ -2,6 +2,7 @@ # Proxy minion metaproxy modules # +import asyncio import copy import logging import os @@ -741,7 +742,7 @@ def thread_multi_return(cls, minion_instance, opts, data): log.error("The return failed for job %s: %s", data["jid"], exc) -def handle_payload(self, payload): +async def handle_payload(self, payload): """ Verify the publication and then pass the payload along to _handle_decoded_payload. @@ -749,7 +750,7 @@ def handle_payload(self, payload): if payload is not None and payload["enc"] == "aes": if self._target_load(payload["load"]): - self._handle_decoded_payload(payload["load"]) + await self._handle_decoded_payload(payload["load"]) elif self.opts["zmq_filtering"]: # In the filtering enabled case, we'd like to know when minion sees something it shouldnt log.trace( @@ -761,7 +762,7 @@ def handle_payload(self, payload): # the minion currently has no need. -def handle_decoded_payload(self, data): +async def handle_decoded_payload(self, data): """ Override this method if you wish to handle the decoded data differently. @@ -807,7 +808,8 @@ def handle_decoded_payload(self, data): "Maximum number of processes reached while executing jid %s, waiting...", data["jid"], ) - yield tornado.gen.sleep(10) + # yield tornado.gen.sleep(10) + await asyncio.sleep(10) process_count = len(salt.utils.minion.running(self.opts)) # We stash an instance references to allow for the socket diff --git a/salt/minion.py b/salt/minion.py index 3f52e9672b5..d4c5ddee289 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -3920,13 +3920,14 @@ class ProxyMinion(Minion): mp_call = _metaproxy_call(self.opts, "target_load") return mp_call(self, load) + # @tornado.gen.coroutine async def _handle_payload(self, payload): mp_call = _metaproxy_call(self.opts, "handle_payload") - return mp_call(self, payload) + return await mp_call(self, payload) async def _handle_decoded_payload(self, data): mp_call = _metaproxy_call(self.opts, "handle_decoded_payload") - return mp_call(self, data) + return await mp_call(self, data) @classmethod def _target(cls, minion_instance, opts, data, connected, creds_map): diff --git a/tests/pytests/unit/loader/test_lazy.py b/tests/pytests/unit/loader/test_lazy.py index 8e461b454b4..65e0e5427dc 100644 --- a/tests/pytests/unit/loader/test_lazy.py +++ b/tests/pytests/unit/loader/test_lazy.py @@ -27,6 +27,9 @@ def loader_dir(tmp_path): def get_context(key): return __context__[key] + + async def myasync(foo): + return foo """ with pytest.helpers.temp_file( "mod_a.py", directory=tmp_path, contents=mod_contents @@ -140,3 +143,11 @@ def test_loader_pack_opts_not_overwritten(loader_dir): assert "foo" not in loader.pack["__opts__"] assert "baz" in loader.pack["__opts__"] assert loader.pack["__opts__"]["baz"] == "bif" + + +async def test_loader_async(loader_dir): + opts = {"optimization_order": [0, 1, 2]} + loader = salt.loader.lazy.LazyLoader([loader_dir], opts) + myasync = loader["mod_a.myasync"] + ret = await myasync("foo") + assert ret == "foo" diff --git a/tests/pytests/unit/test_proxy_minion.py b/tests/pytests/unit/test_proxy_minion.py new file mode 100644 index 00000000000..e8b4c7decaf --- /dev/null +++ b/tests/pytests/unit/test_proxy_minion.py @@ -0,0 +1,149 @@ +import copy +import textwrap + +import pytest +from saltfactories.utils import random_string + +import salt.config +from tests.support.mock import MagicMock, patch + + +@pytest.mark.slow_test +def test_post_master_init_metaproxy_called(io_loop): + """ + Tests that when the _post_master_ini function is called, _metaproxy_call is also called. + """ + + mock_opts = salt.config.DEFAULT_MINION_OPTS.copy() + mock_opts.update(salt.config.DEFAULT_PROXY_MINION_OPTS) + mock_jid_queue = [123] + proxy_minion = salt.minion.ProxyMinion( + mock_opts, + jid_queue=copy.copy(mock_jid_queue), + io_loop=io_loop, + ) + mock_metaproxy_call = MagicMock() + with patch( + "salt.minion._metaproxy_call", + return_value=mock_metaproxy_call, + autospec=True, + ): + try: + ret = proxy_minion._post_master_init("dummy_master") + salt.minion._metaproxy_call.assert_called_once() + finally: + proxy_minion.destroy() + + +@pytest.mark.slow_test +async def test_handle_decoded_payload_metaproxy_called(io_loop): + """ + Tests that when the _handle_decoded_payload function is called, _metaproxy_call is also called. + """ + mock_opts = salt.config.DEFAULT_MINION_OPTS.copy() + mock_opts.update(salt.config.DEFAULT_PROXY_MINION_OPTS) + + mock_data = {"fun": "foo.bar", "jid": 123} + mock_jid_queue = [123] + proxy_minion = salt.minion.ProxyMinion( + mock_opts, + jid_queue=copy.copy(mock_jid_queue), + io_loop=io_loop, + ) + mock_metaproxy_call = MagicMock() + with patch( + "salt.minion._metaproxy_call", + return_value=mock_metaproxy_call, + autospec=True, + ): + try: + ret = await proxy_minion._handle_decoded_payload(mock_data) + assert proxy_minion.jid_queue, mock_jid_queue + salt.minion._metaproxy_call.assert_called_once() + finally: + proxy_minion.destroy() + + +@pytest.mark.slow_test +async def test_handle_payload_metaproxy_called(io_loop): + """ + Tests that when the _handle_payload function is called, _metaproxy_call is also called. + """ + mock_opts = salt.config.DEFAULT_MINION_OPTS.copy() + mock_opts.update(salt.config.DEFAULT_PROXY_MINION_OPTS) + + mock_data = {"fun": "foo.bar", "jid": 123} + mock_jid_queue = [123] + proxy_minion = salt.minion.ProxyMinion( + mock_opts, + jid_queue=copy.copy(mock_jid_queue), + io_loop=io_loop, + ) + mock_metaproxy_call = MagicMock() + with patch( + "salt.minion._metaproxy_call", + return_value=mock_metaproxy_call, + autospec=True, + ): + try: + ret = await proxy_minion._handle_decoded_payload(mock_data) + assert proxy_minion.jid_queue == mock_jid_queue + mock_metaproxy_call.assert_called_once() + finally: + proxy_minion.destroy() + + +def test_proxy_config_default_include(tmp_path): + """ + Tests that when the proxy_config function is called, + for the proxy minion, eg. /etc/salt/proxy.d//*.conf + """ + proxyid = random_string("proxy-") + root_dir = tmp_path / "root" + conf_dir = root_dir / "conf" + conf_file = conf_dir / "proxy" + conf_d_dir = conf_dir / "proxy.d" + proxy_conf_d = conf_d_dir / proxyid + proxy_conf_d.mkdir(parents=True) + + with salt.utils.files.fopen(str(conf_file), "w") as wfh: + wfh.write( + textwrap.dedent( + """\ + id: {id} + root_dir: {root_dir} + pidfile: run/proxy.pid + pki_dir: pki + cachedir: cache + sock_dir: run/proxy + log_file: logs/proxy.log + """.format( + id=proxyid, root_dir=root_dir + ) + ) + ) + + with salt.utils.files.fopen(str(proxy_conf_d / "_schedule.conf"), "w") as wfh: + wfh.write( + textwrap.dedent( + """\ + schedule: + test_job: + args: [arg1, arg2] + enabled: true + function: test.arg + jid_include: true + kwargs: {key1: value1, key2: value2} + maxrunning: 1 + name: test_job + return_job: false + """ + ) + ) + opts = salt.config.proxy_config( + str(conf_file), + minion_id=proxyid, + cache_minion_id=False, + ) + assert "schedule" in opts + assert "test_job" in opts["schedule"] diff --git a/tests/unit/test_proxy_minion.py b/tests/unit/test_proxy_minion.py deleted file mode 100644 index 92241cdef71..00000000000 --- a/tests/unit/test_proxy_minion.py +++ /dev/null @@ -1,165 +0,0 @@ -""" - :codeauthor: Gareth J. Greenaway -""" - -import copy -import logging -import pathlib -import shutil -import tempfile -import textwrap - -import pytest -import tornado -import tornado.testing -from saltfactories.utils import random_string - -import salt.config -import salt.metaproxy.proxy -import salt.minion -import salt.syspaths -from tests.support.mock import MagicMock, patch -from tests.support.runtests import RUNTIME_VARS -from tests.support.unit import TestCase - -log = logging.getLogger(__name__) - - -class ProxyMinionTestCase(TestCase): - @pytest.mark.slow_test - def test_post_master_init_metaproxy_called(self): - """ - Tests that when the _post_master_ini function is called, _metaproxy_call is also called. - """ - - mock_opts = salt.config.DEFAULT_MINION_OPTS.copy() - mock_opts.update(salt.config.DEFAULT_PROXY_MINION_OPTS) - mock_jid_queue = [123] - proxy_minion = salt.minion.ProxyMinion( - mock_opts, - jid_queue=copy.copy(mock_jid_queue), - io_loop=tornado.ioloop.IOLoop(), - ) - mock_metaproxy_call = MagicMock() - with patch( - "salt.minion._metaproxy_call", - return_value=mock_metaproxy_call, - autospec=True, - ): - try: - ret = proxy_minion._post_master_init("dummy_master") - salt.minion._metaproxy_call.assert_called_once() - finally: - proxy_minion.destroy() - - @pytest.mark.slow_test - def test_handle_decoded_payload_metaproxy_called(self): - """ - Tests that when the _handle_decoded_payload function is called, _metaproxy_call is also called. - """ - mock_opts = salt.config.DEFAULT_MINION_OPTS.copy() - mock_opts.update(salt.config.DEFAULT_PROXY_MINION_OPTS) - - mock_data = {"fun": "foo.bar", "jid": 123} - mock_jid_queue = [123] - proxy_minion = salt.minion.ProxyMinion( - mock_opts, - jid_queue=copy.copy(mock_jid_queue), - io_loop=tornado.ioloop.IOLoop(), - ) - mock_metaproxy_call = MagicMock() - with patch( - "salt.minion._metaproxy_call", - return_value=mock_metaproxy_call, - autospec=True, - ): - try: - ret = proxy_minion._handle_decoded_payload(mock_data).result() - self.assertEqual(proxy_minion.jid_queue, mock_jid_queue) - salt.minion._metaproxy_call.assert_called_once() - finally: - proxy_minion.destroy() - - @pytest.mark.slow_test - def test_handle_payload_metaproxy_called(self): - """ - Tests that when the _handle_payload function is called, _metaproxy_call is also called. - """ - mock_opts = salt.config.DEFAULT_MINION_OPTS.copy() - mock_opts.update(salt.config.DEFAULT_PROXY_MINION_OPTS) - - mock_data = {"fun": "foo.bar", "jid": 123} - mock_jid_queue = [123] - proxy_minion = salt.minion.ProxyMinion( - mock_opts, - jid_queue=copy.copy(mock_jid_queue), - io_loop=tornado.ioloop.IOLoop(), - ) - mock_metaproxy_call = MagicMock() - with patch( - "salt.minion._metaproxy_call", - return_value=mock_metaproxy_call, - autospec=True, - ): - try: - ret = proxy_minion._handle_decoded_payload(mock_data).result() - self.assertEqual(proxy_minion.jid_queue, mock_jid_queue) - mock_metaproxy_call.assert_called_once() - finally: - proxy_minion.destroy() - - def test_proxy_config_default_include(self): - """ - Tests that when the proxy_config function is called, - for the proxy minion, eg. /etc/salt/proxy.d//*.conf - """ - proxyid = random_string("proxy-") - root_dir = pathlib.Path(tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)) - self.addCleanup(shutil.rmtree, str(root_dir), ignore_errors=True) - conf_dir = root_dir / "conf" - conf_file = conf_dir / "proxy" - conf_d_dir = conf_dir / "proxy.d" - proxy_conf_d = conf_d_dir / proxyid - proxy_conf_d.mkdir(parents=True) - - with salt.utils.files.fopen(str(conf_file), "w") as wfh: - wfh.write( - textwrap.dedent( - """\ - id: {id} - root_dir: {root_dir} - pidfile: run/proxy.pid - pki_dir: pki - cachedir: cache - sock_dir: run/proxy - log_file: logs/proxy.log - """.format( - id=proxyid, root_dir=root_dir - ) - ) - ) - - with salt.utils.files.fopen(str(proxy_conf_d / "_schedule.conf"), "w") as wfh: - wfh.write( - textwrap.dedent( - """\ - schedule: - test_job: - args: [arg1, arg2] - enabled: true - function: test.arg - jid_include: true - kwargs: {key1: value1, key2: value2} - maxrunning: 1 - name: test_job - return_job: false - """ - ) - ) - opts = salt.config.proxy_config( - str(conf_file), - minion_id=proxyid, - cache_minion_id=False, - ) - self.assertIn("schedule", opts) - self.assertIn("test_job", opts["schedule"])