From 8a14947270031c62c1d68e46901ba694b0fa607e Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Tue, 25 Jul 2023 00:08:45 -0700 Subject: [PATCH] More cleanup and test fixes after refactor --- salt/utils/event.py | 70 ------------------- tests/pytests/conftest.py | 6 -- .../pytests/functional/channel/test_server.py | 1 - tests/pytests/unit/transport/test_ipc.py | 65 +++++++++++++++++ tests/pytests/unit/utils/event/test_event.py | 31 -------- tests/support/pytest/transport.py | 1 - 6 files changed, 65 insertions(+), 109 deletions(-) diff --git a/salt/utils/event.py b/salt/utils/event.py index fe150f32399..c3c474158f7 100644 --- a/salt/utils/event.py +++ b/salt/utils/event.py @@ -276,76 +276,6 @@ class SaltEvent: # the default 'startswith' and the optional 'regex' match_type cls.cache_regex = salt.utils.cache.CacheRegex(prepend="^") - def __load_uri(self, sock_dir, node): - """ - Return the string URI for the location of the pull and pub sockets to - use for firing and listening to events - """ - pub_host = None - pub_port = None - pub_path = None - pull_host = None - pull_port = None - pull_path = None - if node == "master": - if self.opts["ipc_mode"] == "tcp": - pub_host = "127.0.0.1" - pub_port = int(self.opts["tcp_master_pub_port"]) - pull_host = "127.0.0.1" - pull_port = int(self.opts["tcp_master_pull_port"]) - log.debug( - "%s PUB socket URI: %s:%s", - self.__class__.__name__, - pub_host, - pub_port, - ) - log.debug( - "%s PULL socket URI: %s:%s", - self.__class__.__name__, - pull_host, - pull_port, - ) - else: - pub_path = os.path.join(sock_dir, "master_event_pub.ipc") - pull_path = os.path.join(sock_dir, "master_event_pull.ipc") - log.debug("%s PUB socket URI: %s", self.__class__.__name__, pub_path) - log.debug("%s PULL socket URI: %s", self.__class__.__name__, pull_path) - else: - if self.opts["ipc_mode"] == "tcp": - pub_host = "127.0.0.1" - pub_port = int(self.opts["tcp_pub_port"]) - pull_host = "127.0.0.1" - pull_port = int(self.opts["tcp_pull_port"]) - log.debug( - "%s PUB socket URI: %s:%s", - self.__class__.__name__, - pub_host, - pub_port, - ) - log.debug( - "%s PULL socket URI: %s:%s", - self.__class__.__name__, - pull_host, - pull_port, - ) - else: - hash_type = getattr(hashlib, self.opts["hash_type"]) - # Only use the first 10 chars to keep longer hashes from exceeding the - # max socket path length. - minion_id = self.opts.get("hash_id", self.opts["id"]) - id_hash = hash_type( - salt.utils.stringutils.to_bytes(minion_id) - ).hexdigest()[:10] - pub_path = os.path.join( - sock_dir, "minion_event_{}_pub.ipc".format(id_hash) - ) - pull_path = os.path.join( - sock_dir, "minion_event_{}_pull.ipc".format(id_hash) - ) - log.debug("%s PUB socket URI: %s", self.__class__.__name__, pub_path) - log.debug("%s PULL socket URI: %s", self.__class__.__name__, pull_path) - return pub_host, pub_port, pub_path, pull_host, pull_port, pull_path - def subscribe(self, tag=None, match_type=None): """ Subscribe to events matching the passed tag. diff --git a/tests/pytests/conftest.py b/tests/pytests/conftest.py index fc0cb403057..721972b7337 100644 --- a/tests/pytests/conftest.py +++ b/tests/pytests/conftest.py @@ -607,18 +607,12 @@ def pytest_pyfunc_call(pyfuncitem): __tracebackhide__ = True - # loop.run_sync( - # CoroTestFunction(pyfuncitem.obj, testargs), timeout=get_test_timeout(pyfuncitem) - # ) - # try: loop.asyncio_loop.run_until_complete( asyncio.wait_for( CoroTestFunction(pyfuncitem.obj, testargs)(), timeout=get_test_timeout(pyfuncitem), ) ) - # except RuntimeError as exc: - # log.warning("WTFSON %r", dir(exc)) return True diff --git a/tests/pytests/functional/channel/test_server.py b/tests/pytests/functional/channel/test_server.py index 8db1f64c057..58bf654704a 100644 --- a/tests/pytests/functional/channel/test_server.py +++ b/tests/pytests/functional/channel/test_server.py @@ -141,7 +141,6 @@ def _connect_and_publish( io_loop.spawn_callback( server.publish, {"tgt_type": "glob", "tgt": [channel_minion_id], "WTF": "SON"} ) - # server.publish({"tgt_type": "glob", "tgt": [channel_minion_id], "WTF": "SON"}) start = time.time() while time.time() - start < timeout: yield tornado.gen.sleep(1) diff --git a/tests/pytests/unit/transport/test_ipc.py b/tests/pytests/unit/transport/test_ipc.py index 77c0e6f2964..289dc10e81c 100644 --- a/tests/pytests/unit/transport/test_ipc.py +++ b/tests/pytests/unit/transport/test_ipc.py @@ -1,7 +1,11 @@ +import hashlib + import pytest import tornado.iostream from pytestshellutils.utils import ports +import salt.config +import salt.transport import salt.transport.ipc import salt.utils.asynchronous import salt.utils.platform @@ -11,6 +15,20 @@ pytestmark = [ ] +@pytest.fixture +def sock_dir(tmp_path): + sock_dir_path = tmp_path / "test-socks" + sock_dir_path.mkdir(parents=True, exist_ok=True) + yield sock_dir_path + + +@pytest.fixture +def minion_config(sock_dir): + minion_config = salt.config.minion_config("") + minion_config["sock_dir"] = sock_dir + yield minion_config + + def test_ipc_connect_in_async_methods(): "The connect method is in IPCMessageSubscriber's async_methods property" assert "connect" in salt.transport.ipc.IPCMessageSubscriber.async_methods @@ -34,3 +52,50 @@ async def test_ipc_connect_sync_wrapped(io_loop, tmp_path): with pytest.raises(tornado.iostream.StreamClosedError): # Don't `await subscriber.connect()`, that's the purpose of the SyncWrapper subscriber.connect() + + +@pytest.fixture +def master_config(sock_dir): + conf = salt.config.master_config("") + conf["sock_dir"] = sock_dir + yield conf + + +def test_master_ipc_server_unix(master_config, sock_dir): + assert master_config.get("ipc_mode") != "tcp" + server = salt.transport.ipc_publish_server("master", master_config) + assert server.pub_path == str(sock_dir / "master_event_pub.ipc") + assert server.pull_path == str(sock_dir / "master_event_pull.ipc") + + +def test_minion_ipc_server_unix(minion_config, sock_dir): + minion_config["id"] = "foo" + id_hash = hashlib.sha256( + salt.utils.stringutils.to_bytes(minion_config["id"]) + ).hexdigest()[:10] + assert minion_config.get("ipc_mode") != "tcp" + server = salt.transport.ipc_publish_server("minion", minion_config) + assert server.pub_path == str(sock_dir / f"minion_event_{id_hash}_pub.ipc") + assert server.pull_path == str(sock_dir / f"minion_event_{id_hash}_pull.ipc") + + +def test_master_ipc_server_tcp(master_config, sock_dir): + master_config["ipc_mode"] = "tcp" + server = salt.transport.ipc_publish_server("master", master_config) + assert server.pub_host == "127.0.0.1" + assert server.pub_port == int(master_config["tcp_master_pub_port"]) + assert server.pub_path is None + assert server.pull_host == "127.0.0.1" + assert server.pull_port == int(master_config["tcp_master_pull_port"]) + assert server.pull_path is None + + +def test_minion_ipc_server_tcp(minion_config, sock_dir): + minion_config["ipc_mode"] = "tcp" + server = salt.transport.ipc_publish_server("minion", minion_config) + assert server.pub_host == "127.0.0.1" + assert server.pub_port == int(minion_config["tcp_pub_port"]) + assert server.pub_path is None + assert server.pull_host == "127.0.0.1" + assert server.pull_port == int(minion_config["tcp_pull_port"]) + assert server.pull_path is None diff --git a/tests/pytests/unit/utils/event/test_event.py b/tests/pytests/unit/utils/event/test_event.py index fc42053f4b4..0268b50de4a 100644 --- a/tests/pytests/unit/utils/event/test_event.py +++ b/tests/pytests/unit/utils/event/test_event.py @@ -1,4 +1,3 @@ -import hashlib import os import stat import time @@ -46,36 +45,6 @@ def _assert_got_event(evt, data, msg=None, expected_failure=False): assert data[key] != evt[key] -def test_master_event(sock_dir): - with salt.utils.event.MasterEvent(str(sock_dir), listen=False) as me: - assert me.pub_path == str(sock_dir / "master_event_pub.ipc") - assert me.pull_path == str(sock_dir / "master_event_pull.ipc") - - -def test_minion_event(sock_dir): - opts = dict(id="foo", sock_dir=str(sock_dir)) - id_hash = hashlib.sha256(salt.utils.stringutils.to_bytes(opts["id"])).hexdigest()[ - :10 - ] - with salt.utils.event.MinionEvent(opts, listen=False) as me: - assert me.pub_path == str(sock_dir / f"minion_event_{id_hash}_pub.ipc") - assert me.pull_path == str(sock_dir / f"minion_event_{id_hash}_pull.ipc") - - -def test_minion_event_tcp_ipc_mode(): - opts = dict(id="foo", ipc_mode="tcp") - with salt.utils.event.MinionEvent(opts, listen=False) as me: - assert me.pub_port == 4510 - assert me.pull_port == 4511 - - -def test_minion_event_no_id(sock_dir): - with salt.utils.event.MinionEvent(dict(sock_dir=str(sock_dir)), listen=False) as me: - id_hash = hashlib.sha256(salt.utils.stringutils.to_bytes("")).hexdigest()[:10] - assert me.pub_path == str(sock_dir / f"minion_event_{id_hash}_pub.ipc") - assert me.pull_path == str(sock_dir / f"minion_event_{id_hash}_pull.ipc") - - @pytest.mark.slow_test def test_event_single(sock_dir): """Test a single event is received""" diff --git a/tests/support/pytest/transport.py b/tests/support/pytest/transport.py index 3c5e95a9a0d..335dba37e6d 100644 --- a/tests/support/pytest/transport.py +++ b/tests/support/pytest/transport.py @@ -117,7 +117,6 @@ class Collector(salt.utils.process.SignalHandlingProcess): else: for msg in self.unpacker: serial_payload = salt.payload.loads(msg["body"]) - # raise tornado.gen.Return(msg["body"]) raise tornado.gen.Return(serial_payload) byts = yield self.sock.read_bytes(8096, partial=True) self.unpacker.feed(byts)