More cleanup and test fixes after refactor

This commit is contained in:
Daniel A. Wozniak 2023-07-25 00:08:45 -07:00 committed by Gareth J. Greenaway
parent 16fad0baef
commit 8a14947270
6 changed files with 65 additions and 109 deletions

View file

@ -276,76 +276,6 @@ class SaltEvent:
# the default 'startswith' and the optional 'regex' match_type
cls.cache_regex = salt.utils.cache.CacheRegex(prepend="^")
def __load_uri(self, sock_dir, node):
"""
Return the string URI for the location of the pull and pub sockets to
use for firing and listening to events
"""
pub_host = None
pub_port = None
pub_path = None
pull_host = None
pull_port = None
pull_path = None
if node == "master":
if self.opts["ipc_mode"] == "tcp":
pub_host = "127.0.0.1"
pub_port = int(self.opts["tcp_master_pub_port"])
pull_host = "127.0.0.1"
pull_port = int(self.opts["tcp_master_pull_port"])
log.debug(
"%s PUB socket URI: %s:%s",
self.__class__.__name__,
pub_host,
pub_port,
)
log.debug(
"%s PULL socket URI: %s:%s",
self.__class__.__name__,
pull_host,
pull_port,
)
else:
pub_path = os.path.join(sock_dir, "master_event_pub.ipc")
pull_path = os.path.join(sock_dir, "master_event_pull.ipc")
log.debug("%s PUB socket URI: %s", self.__class__.__name__, pub_path)
log.debug("%s PULL socket URI: %s", self.__class__.__name__, pull_path)
else:
if self.opts["ipc_mode"] == "tcp":
pub_host = "127.0.0.1"
pub_port = int(self.opts["tcp_pub_port"])
pull_host = "127.0.0.1"
pull_port = int(self.opts["tcp_pull_port"])
log.debug(
"%s PUB socket URI: %s:%s",
self.__class__.__name__,
pub_host,
pub_port,
)
log.debug(
"%s PULL socket URI: %s:%s",
self.__class__.__name__,
pull_host,
pull_port,
)
else:
hash_type = getattr(hashlib, self.opts["hash_type"])
# Only use the first 10 chars to keep longer hashes from exceeding the
# max socket path length.
minion_id = self.opts.get("hash_id", self.opts["id"])
id_hash = hash_type(
salt.utils.stringutils.to_bytes(minion_id)
).hexdigest()[:10]
pub_path = os.path.join(
sock_dir, "minion_event_{}_pub.ipc".format(id_hash)
)
pull_path = os.path.join(
sock_dir, "minion_event_{}_pull.ipc".format(id_hash)
)
log.debug("%s PUB socket URI: %s", self.__class__.__name__, pub_path)
log.debug("%s PULL socket URI: %s", self.__class__.__name__, pull_path)
return pub_host, pub_port, pub_path, pull_host, pull_port, pull_path
def subscribe(self, tag=None, match_type=None):
"""
Subscribe to events matching the passed tag.

View file

@ -607,18 +607,12 @@ def pytest_pyfunc_call(pyfuncitem):
__tracebackhide__ = True
# loop.run_sync(
# CoroTestFunction(pyfuncitem.obj, testargs), timeout=get_test_timeout(pyfuncitem)
# )
# try:
loop.asyncio_loop.run_until_complete(
asyncio.wait_for(
CoroTestFunction(pyfuncitem.obj, testargs)(),
timeout=get_test_timeout(pyfuncitem),
)
)
# except RuntimeError as exc:
# log.warning("WTFSON %r", dir(exc))
return True

View file

@ -141,7 +141,6 @@ def _connect_and_publish(
io_loop.spawn_callback(
server.publish, {"tgt_type": "glob", "tgt": [channel_minion_id], "WTF": "SON"}
)
# server.publish({"tgt_type": "glob", "tgt": [channel_minion_id], "WTF": "SON"})
start = time.time()
while time.time() - start < timeout:
yield tornado.gen.sleep(1)

View file

@ -1,7 +1,11 @@
import hashlib
import pytest
import tornado.iostream
from pytestshellutils.utils import ports
import salt.config
import salt.transport
import salt.transport.ipc
import salt.utils.asynchronous
import salt.utils.platform
@ -11,6 +15,20 @@ pytestmark = [
]
@pytest.fixture
def sock_dir(tmp_path):
sock_dir_path = tmp_path / "test-socks"
sock_dir_path.mkdir(parents=True, exist_ok=True)
yield sock_dir_path
@pytest.fixture
def minion_config(sock_dir):
minion_config = salt.config.minion_config("")
minion_config["sock_dir"] = sock_dir
yield minion_config
def test_ipc_connect_in_async_methods():
"The connect method is in IPCMessageSubscriber's async_methods property"
assert "connect" in salt.transport.ipc.IPCMessageSubscriber.async_methods
@ -34,3 +52,50 @@ async def test_ipc_connect_sync_wrapped(io_loop, tmp_path):
with pytest.raises(tornado.iostream.StreamClosedError):
# Don't `await subscriber.connect()`, that's the purpose of the SyncWrapper
subscriber.connect()
@pytest.fixture
def master_config(sock_dir):
conf = salt.config.master_config("")
conf["sock_dir"] = sock_dir
yield conf
def test_master_ipc_server_unix(master_config, sock_dir):
assert master_config.get("ipc_mode") != "tcp"
server = salt.transport.ipc_publish_server("master", master_config)
assert server.pub_path == str(sock_dir / "master_event_pub.ipc")
assert server.pull_path == str(sock_dir / "master_event_pull.ipc")
def test_minion_ipc_server_unix(minion_config, sock_dir):
minion_config["id"] = "foo"
id_hash = hashlib.sha256(
salt.utils.stringutils.to_bytes(minion_config["id"])
).hexdigest()[:10]
assert minion_config.get("ipc_mode") != "tcp"
server = salt.transport.ipc_publish_server("minion", minion_config)
assert server.pub_path == str(sock_dir / f"minion_event_{id_hash}_pub.ipc")
assert server.pull_path == str(sock_dir / f"minion_event_{id_hash}_pull.ipc")
def test_master_ipc_server_tcp(master_config, sock_dir):
master_config["ipc_mode"] = "tcp"
server = salt.transport.ipc_publish_server("master", master_config)
assert server.pub_host == "127.0.0.1"
assert server.pub_port == int(master_config["tcp_master_pub_port"])
assert server.pub_path is None
assert server.pull_host == "127.0.0.1"
assert server.pull_port == int(master_config["tcp_master_pull_port"])
assert server.pull_path is None
def test_minion_ipc_server_tcp(minion_config, sock_dir):
minion_config["ipc_mode"] = "tcp"
server = salt.transport.ipc_publish_server("minion", minion_config)
assert server.pub_host == "127.0.0.1"
assert server.pub_port == int(minion_config["tcp_pub_port"])
assert server.pub_path is None
assert server.pull_host == "127.0.0.1"
assert server.pull_port == int(minion_config["tcp_pull_port"])
assert server.pull_path is None

View file

@ -1,4 +1,3 @@
import hashlib
import os
import stat
import time
@ -46,36 +45,6 @@ def _assert_got_event(evt, data, msg=None, expected_failure=False):
assert data[key] != evt[key]
def test_master_event(sock_dir):
with salt.utils.event.MasterEvent(str(sock_dir), listen=False) as me:
assert me.pub_path == str(sock_dir / "master_event_pub.ipc")
assert me.pull_path == str(sock_dir / "master_event_pull.ipc")
def test_minion_event(sock_dir):
opts = dict(id="foo", sock_dir=str(sock_dir))
id_hash = hashlib.sha256(salt.utils.stringutils.to_bytes(opts["id"])).hexdigest()[
:10
]
with salt.utils.event.MinionEvent(opts, listen=False) as me:
assert me.pub_path == str(sock_dir / f"minion_event_{id_hash}_pub.ipc")
assert me.pull_path == str(sock_dir / f"minion_event_{id_hash}_pull.ipc")
def test_minion_event_tcp_ipc_mode():
opts = dict(id="foo", ipc_mode="tcp")
with salt.utils.event.MinionEvent(opts, listen=False) as me:
assert me.pub_port == 4510
assert me.pull_port == 4511
def test_minion_event_no_id(sock_dir):
with salt.utils.event.MinionEvent(dict(sock_dir=str(sock_dir)), listen=False) as me:
id_hash = hashlib.sha256(salt.utils.stringutils.to_bytes("")).hexdigest()[:10]
assert me.pub_path == str(sock_dir / f"minion_event_{id_hash}_pub.ipc")
assert me.pull_path == str(sock_dir / f"minion_event_{id_hash}_pull.ipc")
@pytest.mark.slow_test
def test_event_single(sock_dir):
"""Test a single event is received"""

View file

@ -117,7 +117,6 @@ class Collector(salt.utils.process.SignalHandlingProcess):
else:
for msg in self.unpacker:
serial_payload = salt.payload.loads(msg["body"])
# raise tornado.gen.Return(msg["body"])
raise tornado.gen.Return(serial_payload)
byts = yield self.sock.read_bytes(8096, partial=True)
self.unpacker.feed(byts)