mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Fix salt proxy tests
- The loader can now handle async methods on loaded modules - Fix async proxy methods: handle_decoded_payload and handle_payload
This commit is contained in:
parent
11a06ce0da
commit
0ff43842cf
7 changed files with 212 additions and 177 deletions
|
@ -166,6 +166,40 @@ class LoadedFunc:
|
|||
return f"<{self.__class__.__name__} name={self.name!r}>"
|
||||
|
||||
|
||||
class LoadedCoro(LoadedFunc):
|
||||
"""
|
||||
Coroutine functions loaded by LazyLoader instances using subscript notation
|
||||
'a[k]' will be wrapped with LoadedCoro.
|
||||
|
||||
- Makes sure functions are called with the correct loader's context.
|
||||
- Provides access to a wrapped func's __global__ attribute
|
||||
|
||||
:param func str: The function name to wrap
|
||||
:param LazyLoader loader: The loader instance to use in the context when the wrapped callable is called.
|
||||
"""
|
||||
|
||||
async def __call__(
|
||||
self, *args, **kwargs
|
||||
): # pylint: disable=invalid-overridden-method
|
||||
run_func = self.func
|
||||
mod = sys.modules[run_func.__module__]
|
||||
# All modules we've imported should have __opts__ defined. There are
|
||||
# cases in the test suite where mod ends up being something other than
|
||||
# a module we've loaded.
|
||||
set_test = False
|
||||
if hasattr(mod, "__opts__"):
|
||||
if not isinstance(mod.__opts__, salt.loader.context.NamedLoaderContext):
|
||||
if "test" in self.loader.opts:
|
||||
mod.__opts__["test"] = self.loader.opts["test"]
|
||||
set_test = True
|
||||
if self.loader.inject_globals:
|
||||
run_func = global_injector_decorator(self.loader.inject_globals)(run_func)
|
||||
ret = await self.loader.run(run_func, *args, **kwargs)
|
||||
if set_test:
|
||||
self.loader.opts["test"] = mod.__opts__["test"]
|
||||
return ret
|
||||
|
||||
|
||||
class LoadedMod:
|
||||
"""
|
||||
This class is used as a proxy to a loaded module
|
||||
|
@ -347,7 +381,9 @@ class LazyLoader(salt.utils.lazy.LazyDict):
|
|||
Override the __getitem__ in order to decorate the returned function if we need
|
||||
to last-minute inject globals
|
||||
"""
|
||||
super().__getitem__(item) # try to get the item from the dictionary
|
||||
_ = super().__getitem__(item) # try to get the item from the dictionary
|
||||
if inspect.iscoroutinefunction(_):
|
||||
return LoadedCoro(item, self)
|
||||
return LoadedFunc(item, self)
|
||||
|
||||
def __getattr__(self, mod_name):
|
||||
|
|
|
@ -974,7 +974,7 @@ def thread_multi_return(cls, minion_instance, opts, data):
|
|||
log.error("The return failed for job %s: %s", data["jid"], exc)
|
||||
|
||||
|
||||
def handle_payload(self, payload):
|
||||
async def handle_payload(self, payload):
|
||||
"""
|
||||
Verify the publication and then pass
|
||||
the payload along to _handle_decoded_payload.
|
||||
|
@ -982,7 +982,7 @@ def handle_payload(self, payload):
|
|||
if payload is not None and payload["enc"] == "aes":
|
||||
# First handle payload for the "control" proxy
|
||||
if self._target_load(payload["load"]):
|
||||
self._handle_decoded_payload(payload["load"])
|
||||
await self._handle_decoded_payload(payload["load"])
|
||||
|
||||
# The following handles the sub-proxies
|
||||
sub_ids = self.opts["proxy"].get("ids", [self.opts["id"]])
|
||||
|
@ -990,7 +990,7 @@ def handle_payload(self, payload):
|
|||
if _id in self.deltaproxy_objs:
|
||||
instance = self.deltaproxy_objs[_id]
|
||||
if instance._target_load(payload["load"]):
|
||||
instance._handle_decoded_payload(payload["load"])
|
||||
await instance._handle_decoded_payload(payload["load"])
|
||||
else:
|
||||
log.warning("Proxy minion %s is not loaded, skipping.", _id)
|
||||
|
||||
|
@ -1004,7 +1004,7 @@ def handle_payload(self, payload):
|
|||
# the minion currently has no need.
|
||||
|
||||
|
||||
def handle_decoded_payload(self, data):
|
||||
async def handle_decoded_payload(self, data):
|
||||
"""
|
||||
Override this method if you wish to handle the decoded data
|
||||
differently.
|
||||
|
@ -1052,7 +1052,8 @@ def handle_decoded_payload(self, data):
|
|||
data["jid"],
|
||||
)
|
||||
once_logged = True
|
||||
yield tornado.gen.sleep(0.5)
|
||||
# yield tornado.gen.sleep(0.5)
|
||||
await asyncio.sleep(0.5)
|
||||
process_count = self.subprocess_list.count
|
||||
|
||||
# We stash an instance references to allow for the socket
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
# Proxy minion metaproxy modules
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import copy
|
||||
import logging
|
||||
import os
|
||||
|
@ -741,7 +742,7 @@ def thread_multi_return(cls, minion_instance, opts, data):
|
|||
log.error("The return failed for job %s: %s", data["jid"], exc)
|
||||
|
||||
|
||||
def handle_payload(self, payload):
|
||||
async def handle_payload(self, payload):
|
||||
"""
|
||||
Verify the publication and then pass
|
||||
the payload along to _handle_decoded_payload.
|
||||
|
@ -749,7 +750,7 @@ def handle_payload(self, payload):
|
|||
if payload is not None and payload["enc"] == "aes":
|
||||
if self._target_load(payload["load"]):
|
||||
|
||||
self._handle_decoded_payload(payload["load"])
|
||||
await self._handle_decoded_payload(payload["load"])
|
||||
elif self.opts["zmq_filtering"]:
|
||||
# In the filtering enabled case, we'd like to know when minion sees something it shouldnt
|
||||
log.trace(
|
||||
|
@ -761,7 +762,7 @@ def handle_payload(self, payload):
|
|||
# the minion currently has no need.
|
||||
|
||||
|
||||
def handle_decoded_payload(self, data):
|
||||
async def handle_decoded_payload(self, data):
|
||||
"""
|
||||
Override this method if you wish to handle the decoded data
|
||||
differently.
|
||||
|
@ -807,7 +808,8 @@ def handle_decoded_payload(self, data):
|
|||
"Maximum number of processes reached while executing jid %s, waiting...",
|
||||
data["jid"],
|
||||
)
|
||||
yield tornado.gen.sleep(10)
|
||||
# yield tornado.gen.sleep(10)
|
||||
await asyncio.sleep(10)
|
||||
process_count = len(salt.utils.minion.running(self.opts))
|
||||
|
||||
# We stash an instance references to allow for the socket
|
||||
|
|
|
@ -3920,13 +3920,14 @@ class ProxyMinion(Minion):
|
|||
mp_call = _metaproxy_call(self.opts, "target_load")
|
||||
return mp_call(self, load)
|
||||
|
||||
# @tornado.gen.coroutine
|
||||
async def _handle_payload(self, payload):
|
||||
mp_call = _metaproxy_call(self.opts, "handle_payload")
|
||||
return mp_call(self, payload)
|
||||
return await mp_call(self, payload)
|
||||
|
||||
async def _handle_decoded_payload(self, data):
|
||||
mp_call = _metaproxy_call(self.opts, "handle_decoded_payload")
|
||||
return mp_call(self, data)
|
||||
return await mp_call(self, data)
|
||||
|
||||
@classmethod
|
||||
def _target(cls, minion_instance, opts, data, connected, creds_map):
|
||||
|
|
|
@ -27,6 +27,9 @@ def loader_dir(tmp_path):
|
|||
|
||||
def get_context(key):
|
||||
return __context__[key]
|
||||
|
||||
async def myasync(foo):
|
||||
return foo
|
||||
"""
|
||||
with pytest.helpers.temp_file(
|
||||
"mod_a.py", directory=tmp_path, contents=mod_contents
|
||||
|
@ -140,3 +143,11 @@ def test_loader_pack_opts_not_overwritten(loader_dir):
|
|||
assert "foo" not in loader.pack["__opts__"]
|
||||
assert "baz" in loader.pack["__opts__"]
|
||||
assert loader.pack["__opts__"]["baz"] == "bif"
|
||||
|
||||
|
||||
async def test_loader_async(loader_dir):
|
||||
opts = {"optimization_order": [0, 1, 2]}
|
||||
loader = salt.loader.lazy.LazyLoader([loader_dir], opts)
|
||||
myasync = loader["mod_a.myasync"]
|
||||
ret = await myasync("foo")
|
||||
assert ret == "foo"
|
||||
|
|
149
tests/pytests/unit/test_proxy_minion.py
Normal file
149
tests/pytests/unit/test_proxy_minion.py
Normal file
|
@ -0,0 +1,149 @@
|
|||
import copy
|
||||
import textwrap
|
||||
|
||||
import pytest
|
||||
from saltfactories.utils import random_string
|
||||
|
||||
import salt.config
|
||||
from tests.support.mock import MagicMock, patch
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_post_master_init_metaproxy_called(io_loop):
|
||||
"""
|
||||
Tests that when the _post_master_ini function is called, _metaproxy_call is also called.
|
||||
"""
|
||||
|
||||
mock_opts = salt.config.DEFAULT_MINION_OPTS.copy()
|
||||
mock_opts.update(salt.config.DEFAULT_PROXY_MINION_OPTS)
|
||||
mock_jid_queue = [123]
|
||||
proxy_minion = salt.minion.ProxyMinion(
|
||||
mock_opts,
|
||||
jid_queue=copy.copy(mock_jid_queue),
|
||||
io_loop=io_loop,
|
||||
)
|
||||
mock_metaproxy_call = MagicMock()
|
||||
with patch(
|
||||
"salt.minion._metaproxy_call",
|
||||
return_value=mock_metaproxy_call,
|
||||
autospec=True,
|
||||
):
|
||||
try:
|
||||
ret = proxy_minion._post_master_init("dummy_master")
|
||||
salt.minion._metaproxy_call.assert_called_once()
|
||||
finally:
|
||||
proxy_minion.destroy()
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
async def test_handle_decoded_payload_metaproxy_called(io_loop):
|
||||
"""
|
||||
Tests that when the _handle_decoded_payload function is called, _metaproxy_call is also called.
|
||||
"""
|
||||
mock_opts = salt.config.DEFAULT_MINION_OPTS.copy()
|
||||
mock_opts.update(salt.config.DEFAULT_PROXY_MINION_OPTS)
|
||||
|
||||
mock_data = {"fun": "foo.bar", "jid": 123}
|
||||
mock_jid_queue = [123]
|
||||
proxy_minion = salt.minion.ProxyMinion(
|
||||
mock_opts,
|
||||
jid_queue=copy.copy(mock_jid_queue),
|
||||
io_loop=io_loop,
|
||||
)
|
||||
mock_metaproxy_call = MagicMock()
|
||||
with patch(
|
||||
"salt.minion._metaproxy_call",
|
||||
return_value=mock_metaproxy_call,
|
||||
autospec=True,
|
||||
):
|
||||
try:
|
||||
ret = await proxy_minion._handle_decoded_payload(mock_data)
|
||||
assert proxy_minion.jid_queue, mock_jid_queue
|
||||
salt.minion._metaproxy_call.assert_called_once()
|
||||
finally:
|
||||
proxy_minion.destroy()
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
async def test_handle_payload_metaproxy_called(io_loop):
|
||||
"""
|
||||
Tests that when the _handle_payload function is called, _metaproxy_call is also called.
|
||||
"""
|
||||
mock_opts = salt.config.DEFAULT_MINION_OPTS.copy()
|
||||
mock_opts.update(salt.config.DEFAULT_PROXY_MINION_OPTS)
|
||||
|
||||
mock_data = {"fun": "foo.bar", "jid": 123}
|
||||
mock_jid_queue = [123]
|
||||
proxy_minion = salt.minion.ProxyMinion(
|
||||
mock_opts,
|
||||
jid_queue=copy.copy(mock_jid_queue),
|
||||
io_loop=io_loop,
|
||||
)
|
||||
mock_metaproxy_call = MagicMock()
|
||||
with patch(
|
||||
"salt.minion._metaproxy_call",
|
||||
return_value=mock_metaproxy_call,
|
||||
autospec=True,
|
||||
):
|
||||
try:
|
||||
ret = await proxy_minion._handle_decoded_payload(mock_data)
|
||||
assert proxy_minion.jid_queue == mock_jid_queue
|
||||
mock_metaproxy_call.assert_called_once()
|
||||
finally:
|
||||
proxy_minion.destroy()
|
||||
|
||||
|
||||
def test_proxy_config_default_include(tmp_path):
|
||||
"""
|
||||
Tests that when the proxy_config function is called,
|
||||
for the proxy minion, eg. /etc/salt/proxy.d/<The-Proxy-ID>/*.conf
|
||||
"""
|
||||
proxyid = random_string("proxy-")
|
||||
root_dir = tmp_path / "root"
|
||||
conf_dir = root_dir / "conf"
|
||||
conf_file = conf_dir / "proxy"
|
||||
conf_d_dir = conf_dir / "proxy.d"
|
||||
proxy_conf_d = conf_d_dir / proxyid
|
||||
proxy_conf_d.mkdir(parents=True)
|
||||
|
||||
with salt.utils.files.fopen(str(conf_file), "w") as wfh:
|
||||
wfh.write(
|
||||
textwrap.dedent(
|
||||
"""\
|
||||
id: {id}
|
||||
root_dir: {root_dir}
|
||||
pidfile: run/proxy.pid
|
||||
pki_dir: pki
|
||||
cachedir: cache
|
||||
sock_dir: run/proxy
|
||||
log_file: logs/proxy.log
|
||||
""".format(
|
||||
id=proxyid, root_dir=root_dir
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
with salt.utils.files.fopen(str(proxy_conf_d / "_schedule.conf"), "w") as wfh:
|
||||
wfh.write(
|
||||
textwrap.dedent(
|
||||
"""\
|
||||
schedule:
|
||||
test_job:
|
||||
args: [arg1, arg2]
|
||||
enabled: true
|
||||
function: test.arg
|
||||
jid_include: true
|
||||
kwargs: {key1: value1, key2: value2}
|
||||
maxrunning: 1
|
||||
name: test_job
|
||||
return_job: false
|
||||
"""
|
||||
)
|
||||
)
|
||||
opts = salt.config.proxy_config(
|
||||
str(conf_file),
|
||||
minion_id=proxyid,
|
||||
cache_minion_id=False,
|
||||
)
|
||||
assert "schedule" in opts
|
||||
assert "test_job" in opts["schedule"]
|
|
@ -1,165 +0,0 @@
|
|||
"""
|
||||
:codeauthor: Gareth J. Greenaway <gareth@saltstack.com>
|
||||
"""
|
||||
|
||||
import copy
|
||||
import logging
|
||||
import pathlib
|
||||
import shutil
|
||||
import tempfile
|
||||
import textwrap
|
||||
|
||||
import pytest
|
||||
import tornado
|
||||
import tornado.testing
|
||||
from saltfactories.utils import random_string
|
||||
|
||||
import salt.config
|
||||
import salt.metaproxy.proxy
|
||||
import salt.minion
|
||||
import salt.syspaths
|
||||
from tests.support.mock import MagicMock, patch
|
||||
from tests.support.runtests import RUNTIME_VARS
|
||||
from tests.support.unit import TestCase
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ProxyMinionTestCase(TestCase):
|
||||
@pytest.mark.slow_test
|
||||
def test_post_master_init_metaproxy_called(self):
|
||||
"""
|
||||
Tests that when the _post_master_ini function is called, _metaproxy_call is also called.
|
||||
"""
|
||||
|
||||
mock_opts = salt.config.DEFAULT_MINION_OPTS.copy()
|
||||
mock_opts.update(salt.config.DEFAULT_PROXY_MINION_OPTS)
|
||||
mock_jid_queue = [123]
|
||||
proxy_minion = salt.minion.ProxyMinion(
|
||||
mock_opts,
|
||||
jid_queue=copy.copy(mock_jid_queue),
|
||||
io_loop=tornado.ioloop.IOLoop(),
|
||||
)
|
||||
mock_metaproxy_call = MagicMock()
|
||||
with patch(
|
||||
"salt.minion._metaproxy_call",
|
||||
return_value=mock_metaproxy_call,
|
||||
autospec=True,
|
||||
):
|
||||
try:
|
||||
ret = proxy_minion._post_master_init("dummy_master")
|
||||
salt.minion._metaproxy_call.assert_called_once()
|
||||
finally:
|
||||
proxy_minion.destroy()
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_handle_decoded_payload_metaproxy_called(self):
|
||||
"""
|
||||
Tests that when the _handle_decoded_payload function is called, _metaproxy_call is also called.
|
||||
"""
|
||||
mock_opts = salt.config.DEFAULT_MINION_OPTS.copy()
|
||||
mock_opts.update(salt.config.DEFAULT_PROXY_MINION_OPTS)
|
||||
|
||||
mock_data = {"fun": "foo.bar", "jid": 123}
|
||||
mock_jid_queue = [123]
|
||||
proxy_minion = salt.minion.ProxyMinion(
|
||||
mock_opts,
|
||||
jid_queue=copy.copy(mock_jid_queue),
|
||||
io_loop=tornado.ioloop.IOLoop(),
|
||||
)
|
||||
mock_metaproxy_call = MagicMock()
|
||||
with patch(
|
||||
"salt.minion._metaproxy_call",
|
||||
return_value=mock_metaproxy_call,
|
||||
autospec=True,
|
||||
):
|
||||
try:
|
||||
ret = proxy_minion._handle_decoded_payload(mock_data).result()
|
||||
self.assertEqual(proxy_minion.jid_queue, mock_jid_queue)
|
||||
salt.minion._metaproxy_call.assert_called_once()
|
||||
finally:
|
||||
proxy_minion.destroy()
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_handle_payload_metaproxy_called(self):
|
||||
"""
|
||||
Tests that when the _handle_payload function is called, _metaproxy_call is also called.
|
||||
"""
|
||||
mock_opts = salt.config.DEFAULT_MINION_OPTS.copy()
|
||||
mock_opts.update(salt.config.DEFAULT_PROXY_MINION_OPTS)
|
||||
|
||||
mock_data = {"fun": "foo.bar", "jid": 123}
|
||||
mock_jid_queue = [123]
|
||||
proxy_minion = salt.minion.ProxyMinion(
|
||||
mock_opts,
|
||||
jid_queue=copy.copy(mock_jid_queue),
|
||||
io_loop=tornado.ioloop.IOLoop(),
|
||||
)
|
||||
mock_metaproxy_call = MagicMock()
|
||||
with patch(
|
||||
"salt.minion._metaproxy_call",
|
||||
return_value=mock_metaproxy_call,
|
||||
autospec=True,
|
||||
):
|
||||
try:
|
||||
ret = proxy_minion._handle_decoded_payload(mock_data).result()
|
||||
self.assertEqual(proxy_minion.jid_queue, mock_jid_queue)
|
||||
mock_metaproxy_call.assert_called_once()
|
||||
finally:
|
||||
proxy_minion.destroy()
|
||||
|
||||
def test_proxy_config_default_include(self):
|
||||
"""
|
||||
Tests that when the proxy_config function is called,
|
||||
for the proxy minion, eg. /etc/salt/proxy.d/<The-Proxy-ID>/*.conf
|
||||
"""
|
||||
proxyid = random_string("proxy-")
|
||||
root_dir = pathlib.Path(tempfile.mkdtemp(dir=RUNTIME_VARS.TMP))
|
||||
self.addCleanup(shutil.rmtree, str(root_dir), ignore_errors=True)
|
||||
conf_dir = root_dir / "conf"
|
||||
conf_file = conf_dir / "proxy"
|
||||
conf_d_dir = conf_dir / "proxy.d"
|
||||
proxy_conf_d = conf_d_dir / proxyid
|
||||
proxy_conf_d.mkdir(parents=True)
|
||||
|
||||
with salt.utils.files.fopen(str(conf_file), "w") as wfh:
|
||||
wfh.write(
|
||||
textwrap.dedent(
|
||||
"""\
|
||||
id: {id}
|
||||
root_dir: {root_dir}
|
||||
pidfile: run/proxy.pid
|
||||
pki_dir: pki
|
||||
cachedir: cache
|
||||
sock_dir: run/proxy
|
||||
log_file: logs/proxy.log
|
||||
""".format(
|
||||
id=proxyid, root_dir=root_dir
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
with salt.utils.files.fopen(str(proxy_conf_d / "_schedule.conf"), "w") as wfh:
|
||||
wfh.write(
|
||||
textwrap.dedent(
|
||||
"""\
|
||||
schedule:
|
||||
test_job:
|
||||
args: [arg1, arg2]
|
||||
enabled: true
|
||||
function: test.arg
|
||||
jid_include: true
|
||||
kwargs: {key1: value1, key2: value2}
|
||||
maxrunning: 1
|
||||
name: test_job
|
||||
return_job: false
|
||||
"""
|
||||
)
|
||||
)
|
||||
opts = salt.config.proxy_config(
|
||||
str(conf_file),
|
||||
minion_id=proxyid,
|
||||
cache_minion_id=False,
|
||||
)
|
||||
self.assertIn("schedule", opts)
|
||||
self.assertIn("test_job", opts["schedule"])
|
Loading…
Add table
Reference in a new issue