From 3a4533ee0c08f4c66e3b3233745be0926e26e198 Mon Sep 17 00:00:00 2001 From: Jenkins Date: Thu, 15 Jun 2023 23:54:49 -0700 Subject: [PATCH] Event unit tests --- salt/transport/zeromq.py | 134 ++++++++++++++----- salt/utils/event.py | 10 +- tests/pytests/unit/utils/event/test_event.py | 60 ++++----- tests/support/events.py | 24 +++- 4 files changed, 159 insertions(+), 69 deletions(-) diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py index 679544de6db..f94533508a4 100644 --- a/salt/transport/zeromq.py +++ b/salt/transport/zeromq.py @@ -9,6 +9,8 @@ import os import signal import sys import threading +import asyncio +import asyncio.exceptions from random import randint import tornado @@ -110,28 +112,48 @@ class PublishClient(salt.transport.base.PublishClient): async_methods = [ "connect", + "connect_uri", "recv", ] close_methods = [ "close", ] - def __init__(self, opts, io_loop, **kwargs): - super().__init__(opts, io_loop, **kwargs) - self.callbacks = {} - self.opts = opts - self.io_loop = io_loop +#<<<<<<< HEAD +# def __init__(self, opts, io_loop, **kwargs): +# super().__init__(opts, io_loop, **kwargs) +# self.callbacks = {} +# self.opts = opts +# self.io_loop = io_loop +#======= + + def _legacy_setup(self, + _id, + role, + zmq_filtering=False, + tcp_keepalive=True, + tcp_keepalive_idle=300, + tcp_keepalive_cnt=-1, + tcp_keepalive_intvl=-1, + recon_default=1000, + recon_max=10000, + recon_randomize=True, + ipv6=None, + master_ip="127.0.0.1", + zmq_monitor=False, + **extras + ): self.hexid = hashlib.sha1( - salt.utils.stringutils.to_bytes(self.opts["id"]) + salt.utils.stringutils.to_bytes(_id) ).hexdigest() self._closing = False import zmq.asyncio self.context = zmq.asyncio.Context() self._socket = self.context.socket(zmq.SUB) - if self.opts["zmq_filtering"]: + if zmq_filtering: # TODO: constants file for "broadcast" self._socket.setsockopt(zmq.SUBSCRIBE, b"broadcast") - if self.opts.get("__role") == "syndic": + if role == "syndic": self._socket.setsockopt(zmq.SUBSCRIBE, b"syndic") else: self._socket.setsockopt( @@ -140,30 +162,28 @@ class PublishClient(salt.transport.base.PublishClient): else: self._socket.setsockopt(zmq.SUBSCRIBE, b"") - self._socket.setsockopt( - zmq.IDENTITY, salt.utils.stringutils.to_bytes(self.opts["id"]) - ) + if _id: + self._socket.setsockopt( + zmq.IDENTITY, salt.utils.stringutils.to_bytes(_id) + ) # TODO: cleanup all the socket opts stuff if hasattr(zmq, "TCP_KEEPALIVE"): - self._socket.setsockopt(zmq.TCP_KEEPALIVE, self.opts["tcp_keepalive"]) + self._socket.setsockopt(zmq.TCP_KEEPALIVE, tcp_keepalive) self._socket.setsockopt( - zmq.TCP_KEEPALIVE_IDLE, self.opts["tcp_keepalive_idle"] + zmq.TCP_KEEPALIVE_IDLE, tcp_keepalive_idle ) self._socket.setsockopt( - zmq.TCP_KEEPALIVE_CNT, self.opts["tcp_keepalive_cnt"] + zmq.TCP_KEEPALIVE_CNT, tcp_keepalive_cnt ) self._socket.setsockopt( - zmq.TCP_KEEPALIVE_INTVL, self.opts["tcp_keepalive_intvl"] + zmq.TCP_KEEPALIVE_INTVL, tcp_keepalive_intvl ) - recon_delay = self.opts.get("recon_default", 1) - recon_max = self.opts.get("recon_max", 1) - - if self.opts.get("recon_randomize"): + if recon_randomize: recon_delay = randint( - recon_delay, - recon_delay + recon_max, + recon_default, + recon_default + recon_max, ) log.debug( @@ -184,17 +204,36 @@ class PublishClient(salt.transport.base.PublishClient): self._socket.setsockopt(zmq.RECONNECT_IVL_MAX, recon_max) - if (self.opts["ipv6"] is True or ":" in self.opts.get("master_ip", "")) and hasattr( + if (ipv6 is True or ":" in master_ip) and hasattr( zmq, "IPV4ONLY" ): # IPv6 sockets work for both IPv6 and IPv4 addresses self._socket.setsockopt(zmq.IPV4ONLY, 0) - # if HAS_ZMQ_MONITOR and self.opts["zmq_monitor"]: - # self._monitor = ZeroMQSocketMonitor(self._socket) - # self._monitor.start_io_loop(self.io_loop) - self._monitor = None - self.task = None +#<<<<<<< HEAD +# # if HAS_ZMQ_MONITOR and self.opts["zmq_monitor"]: +# # self._monitor = ZeroMQSocketMonitor(self._socket) +# # self._monitor.start_io_loop(self.io_loop) +# self._monitor = None +# self.task = None +#======= + if HAS_ZMQ_MONITOR and zmq_monitor: + self._monitor = ZeroMQSocketMonitor(self._socket) + self._monitor.start_io_loop(self.io_loop) + self.poller = zmq.Poller() + self.poller.register(self._socket, zmq.POLLIN) + + + def __init__(self, opts, io_loop, **kwargs): + super().__init__(opts, io_loop, **kwargs) + self.opts = opts + self.io_loop = io_loop + self._legacy_setup( + _id=opts.get("id", ""), + role=opts.get("__role", ""), + **opts, + ) + self.connect_called = False def close(self): if self._closing is True: @@ -221,6 +260,7 @@ class PublishClient(salt.transport.base.PublishClient): async def connect( self, publish_port, connect_callback=None, disconnect_callback=None ): + self.connect_called = True self.publish_port = publish_port log.error( "Connecting the Minion to the Master publish port, using the URI: %s", @@ -229,8 +269,8 @@ class PublishClient(salt.transport.base.PublishClient): self._socket.connect(self.master_pub) # await connect_callback(True) - @tornado.gen.coroutine - def connect_uri(self, uri, connect_callback=None, disconnect_callback=None): + async def connect_uri(self, uri, connect_callback=None, disconnect_callback=None): + self.connect_called = True log.error( "Connecting the Minion to the Master publish port, using the URI: %s", uri @@ -298,7 +338,19 @@ class PublishClient(salt.transport.base.PublishClient): return await self._socket.recv() async def recv(self, timeout=None): - return await self._socket.recv() + log.error("SOCK %r %s", self._socket, self.connect_called) + if timeout == 0: + events = self.poller.poll(timeout=timeout) + log.error("NOBLOCK %r", events) + if self._socket in events and events[self._socket] == zmq.POLLIN: + return await self._socket.recv() + elif timeout: + try: + return await asyncio.wait_for(self._socket.recv(), timeout=timeout) + except asyncio.exceptions.TimeoutError: + log.error("TIMEOUT") + else: + return await self._socket.recv() @tornado.gen.coroutine def send(self, msg): @@ -756,6 +808,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): if self.opts["ipv6"] is True and hasattr(zmq, "IPV4ONLY"): # IPv6 sockets work for both IPv6 and IPv4 addresses pub_sock.setsockopt(zmq.IPV4ONLY, 0) + pub_sock.setsockopt(zmq.BACKLOG, self.opts.get("zmq_backlog", 1000)) pub_sock.setsockopt(zmq.LINGER, -1) # Prepare minion pull socket @@ -765,12 +818,27 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): pull_sock.setsockopt(zmq.LINGER, -1) salt.utils.zeromq.check_ipc_path_max_len(self.pull_uri) # Start the minion command publisher - log.info("Starting the Salt Publisher on %s", self.pub_uri) - pub_sock.bind(self.pub_uri) # Securely create socket - log.info("Starting the Salt Puller on %s", self.pull_uri) with salt.utils.files.set_umask(0o177): + import pathlib + log.info("Starting the Salt Publisher on %s", self.pub_uri) + pub_sock.bind(self.pub_uri) + if 'ipc://' in self.pub_uri: + pub_path = self.pub_uri.replace("ipc:", "") + #pathlib.Path(pub_path).touch() + os.chmod( # nosec + pub_path, + 0o600, + ) + log.info("Starting the Salt Puller on %s", self.pull_uri) pull_sock.bind(self.pull_uri) + if 'ipc://' in self.pull_uri: + pull_path = self.pull_uri.replace("ipc:", "") + #pathlib.Path(pull_path).touch() + os.chmod( # nosec + pull_path, + 0o600, + ) async def on_recv(packages): for package in packages: diff --git a/salt/utils/event.py b/salt/utils/event.py index 27b07723d92..a995ae1ec30 100644 --- a/salt/utils/event.py +++ b/salt/utils/event.py @@ -352,8 +352,9 @@ class SaltEvent: if self.cpub: return True - log.error("EVENT AT LEAS") + log.error("WTF A") if self._run_io_loop_sync: + log.error("WTF B") with salt.utils.asynchronous.current_ioloop(self.io_loop): if self.subscriber is None: #self.subscriber = salt.utils.asynchronous.SyncWrapper( @@ -392,8 +393,9 @@ class SaltEvent: self.opts["master_ip"] = "" self.subscriber = salt.transport.publish_client(self.opts, self.io_loop) puburi = "ipc://{}".format(self.puburi) - self.subscriber.connect_uri(puburi) - #self.io_loop.spawn_callback(self.subscriber.connect_uri, self.puburi) + #self.io_loop.run_sync(self.subscriber.connect_uri, puburi) + self.io_loop.spawn_callback(self.subscriber.connect_uri, puburi) + log.error("WTF") #self.subscriber = salt.transport.ipc.IPCMessageSubscriber( # self.puburi, io_loop=self.io_loop #) @@ -595,7 +597,7 @@ class SaltEvent: if not self.cpub and not self.connect_pub(timeout=wait): break #riraw = self.subscriber.read(timeout=wait) - print(repr(self.subscriber)) + log.warning("Subscriber %r", self.subscriber) raw = self.subscriber.recv(timeout=wait) if raw is None: break diff --git a/tests/pytests/unit/utils/event/test_event.py b/tests/pytests/unit/utils/event/test_event.py index 1d824fb963b..4816272cfda 100644 --- a/tests/pytests/unit/utils/event/test_event.py +++ b/tests/pytests/unit/utils/event/test_event.py @@ -296,36 +296,36 @@ def test_send_master_event(sock_dir): ) -def test_connect_pull_should_debug_log_on_StreamClosedError(): - event = SaltEvent(node=None) - with patch.object(event, "pusher") as mock_pusher: - with patch.object( - salt.utils.event.log, "debug", autospec=True - ) as mock_log_debug: - mock_pusher.connect.side_effect = tornado.iostream.StreamClosedError - event.connect_pull() - call = mock_log_debug.mock_calls[0] - assert call.args[0] == "Unable to connect pusher: %s" - assert isinstance(call.args[1], tornado.iostream.StreamClosedError) - assert call.args[1].args[0] == "Stream is closed" - - -@pytest.mark.parametrize("error", [Exception, KeyError, IOError]) -def test_connect_pull_should_error_log_on_other_errors(error): - event = SaltEvent(node=None) - with patch.object(event, "pusher") as mock_pusher: - with patch.object( - salt.utils.event.log, "debug", autospec=True - ) as mock_log_debug: - with patch.object( - salt.utils.event.log, "error", autospec=True - ) as mock_log_error: - mock_pusher.connect.side_effect = error - event.connect_pull() - mock_log_debug.assert_not_called() - call = mock_log_error.mock_calls[0] - assert call.args[0] == "Unable to connect pusher: %s" - assert not isinstance(call.args[1], tornado.iostream.StreamClosedError) +#def test_connect_pull_should_debug_log_on_StreamClosedError(): +# event = SaltEvent(node=None) +# with patch.object(event, "pusher") as mock_pusher: +# with patch.object( +# salt.utils.event.log, "debug", autospec=True +# ) as mock_log_debug: +# mock_pusher.connect.side_effect = tornado.iostream.StreamClosedError +# event.connect_pull() +# call = mock_log_debug.mock_calls[0] +# assert call.args[0] == "Unable to connect pusher: %s" +# assert isinstance(call.args[1], tornado.iostream.StreamClosedError) +# assert call.args[1].args[0] == "Stream is closed" +# +# +#@pytest.mark.parametrize("error", [Exception, KeyError, IOError]) +#def test_connect_pull_should_error_log_on_other_errors(error): +# event = SaltEvent(node=None) +# with patch.object(event, "pusher") as mock_pusher: +# with patch.object( +# salt.utils.event.log, "debug", autospec=True +# ) as mock_log_debug: +# with patch.object( +# salt.utils.event.log, "error", autospec=True +# ) as mock_log_error: +# mock_pusher.connect.side_effect = error +# event.connect_pull() +# mock_log_debug.assert_not_called() +# call = mock_log_error.mock_calls[0] +# assert call.args[0] == "Unable to connect pusher: %s" +# assert not isinstance(call.args[1], tornado.iostream.StreamClosedError) @pytest.mark.slow_test diff --git a/tests/support/events.py b/tests/support/events.py index 6a8de36a005..2d918d29451 100644 --- a/tests/support/events.py +++ b/tests/support/events.py @@ -10,12 +10,32 @@ import time from contextlib import contextmanager import salt.utils.event -from salt.utils.process import clean_proc +from salt.utils.process import clean_proc, Process @contextmanager def eventpublisher_process(sock_dir): - proc = salt.utils.event.EventPublisher({"sock_dir": sock_dir}) + opts = { + "sock_dir": sock_dir, + "interface": "127.0.0.1", + "publish_port": 4506, + "ipv6": None, + "zmq_filtering": None, + } + ipc_publisher = salt.transport.publish_server(opts) + ipc_publisher.pub_uri = "ipc://{}".format( + os.path.join(opts["sock_dir"], "master_event_pub.ipc") + ) + ipc_publisher.pull_uri = "ipc://{}".format( + os.path.join(opts["sock_dir"], "master_event_pull.ipc") + ) + proc = Process( + target=ipc_publisher.publish_daemon, + args=[ + ipc_publisher.publish_payload, + ], + ) + #proc = salt.utils.event.EventPublisher({"sock_dir": sock_dir}) proc.start() try: if os.environ.get("TRAVIS_PYTHON_VERSION", None) is not None: