From d52df08f220d7f3cb7ddfb2118cc9bf6196725f7 Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Mon, 3 Jul 2023 00:01:58 -0700 Subject: [PATCH] Preserve ipc_mod logic --- salt/master.py | 26 ++++-- salt/minion.py | 38 +++++--- salt/transport/tcp.py | 53 +++++++---- salt/utils/event.py | 92 +++++++++++++++----- tests/pytests/unit/utils/event/test_event.py | 16 ++-- 5 files changed, 159 insertions(+), 66 deletions(-) diff --git a/salt/master.py b/salt/master.py index 6ce1765fa2e..fe5d39293ba 100644 --- a/salt/master.py +++ b/salt/master.py @@ -723,12 +723,26 @@ class Master(SMaster): pub_channels.append(chan) log.info("Creating master event publisher process") - ipc_publisher = salt.transport.publish_server( - self.opts, - pub_path=os.path.join(self.opts["sock_dir"], "master_event_pub.ipc"), - pull_path=os.path.join(self.opts["sock_dir"], "master_event_pull.ipc"), - transport="tcp", - ) + if self.opts["ipc_mode"] == "tcp": + ipc_publisher = salt.transport.publish_server( + self.opts, + transport="tcp", + pub_host="127.0.0.1", + pub_port=int(self.opts["tcp_master_pub_port"]), + pull_host="127.0.0.1", + pull_port=int(self.opts["tcp_master_pull_port"]), + ) + else: + ipc_publisher = salt.transport.publish_server( + self.opts, + transport="tcp", + pub_path=os.path.join( + self.opts["sock_dir"], "master_event_pub.ipc" + ), + pull_path=os.path.join( + self.opts["sock_dir"], "master_event_pull.ipc" + ), + ) self.process_manager.add_process( ipc_publisher.publish_daemon, args=[ diff --git a/salt/minion.py b/salt/minion.py index 5e6b8c652df..55b95c7e764 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -1049,20 +1049,30 @@ class MinionManager(MinionBase): id_hash = hash_type( salt.utils.stringutils.to_bytes(self.opts["id"]) ).hexdigest()[:10] - epub_sock_path = os.path.join( - self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash) - ) - epull_sock_path = os.path.join( - self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash) - ) - if os.path.exists(epub_sock_path): - os.unlink(epub_sock_path) - ipc_publisher = salt.transport.publish_server( - self.opts, - pub_path=epub_sock_path, - pull_path=epull_sock_path, - transport="tcp", - ) + if self.opts["ipc_mode"] == "tcp": + ipc_publisher = salt.transport.publish_server( + self.opts, + pub_host="127.0.0.1", + pub_port=int(self.opts["tcp_pub_port"]), + pull_host="127.0.0.1", + pull_port=int(self.opts["tcp_pull_port"]), + transport="tcp", + ) + else: + epub_sock_path = os.path.join( + self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash) + ) + epull_sock_path = os.path.join( + self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash) + ) + if os.path.exists(epub_sock_path): + os.unlink(epub_sock_path) + ipc_publisher = salt.transport.publish_server( + self.opts, + pub_path=epub_sock_path, + pull_path=epull_sock_path, + transport="tcp", + ) self.io_loop.spawn_callback( ipc_publisher.publisher, ipc_publisher.publish_payload, self.io_loop ) diff --git a/salt/transport/tcp.py b/salt/transport/tcp.py index 09ae4099d14..6c57362fb63 100644 --- a/salt/transport/tcp.py +++ b/salt/transport/tcp.py @@ -298,7 +298,9 @@ class TCPPubClient(salt.transport.base.PublishClient): ssl_options=self.opts.get("ssl"), **kwargs, ) - log.error("PubClient conencted to %r %r:%r", self, self.host, self.port) + log.error( + "PubClient conencted to %r %r:%r", self, self.host, self.port + ) else: sock_type = socket.AF_UNIX stream = tornado.iostream.IOStream( @@ -398,7 +400,11 @@ class TCPPubClient(salt.transport.base.PublishClient): elif timeout: try: return await asyncio.wait_for(self.recv(), timeout=timeout) - except (TimeoutError, asyncio.exceptions.TimeoutError, asyncio.exceptions.CancelledError): + except ( + TimeoutError, + asyncio.exceptions.TimeoutError, + asyncio.exceptions.CancelledError, + ): self.close() await self.connect() return @@ -416,9 +422,9 @@ class TCPPubClient(salt.transport.base.PublishClient): self.close() await self.connect() continue - #except AttributeError: + # except AttributeError: # return - #except Exception: + # except Exception: # raise finally: self._read_in_progress.release() @@ -1325,9 +1331,7 @@ class TCPPuller: # pylint: disable=W1701 def __del__(self): if not self._closing: - warnings.warn( - "unclosed tcp puller {self!r}", ResourceWarning, source=self - ) + warnings.warn("unclosed tcp puller {self!r}", ResourceWarning, source=self) # pylint: enable=W1701 @@ -1368,10 +1372,18 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer): return not self.opts.get("order_masters", False) def __setstate__(self, state): - self.__init__(state["opts"]) + self.__init__(**state) def __getstate__(self): - return {"opts": self.opts} + return { + "opts": self.opts, + "pub_host": self.pub_host, + "pub_port": self.pub_port, + "pub_path": self.pub_path, + "pull_host": self.pull_host, + "pull_port": self.pull_port, + "pull_path": self.pull_path, + } def publish_daemon( self, @@ -1459,7 +1471,11 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer): log.debug("Connect pusher %s", self.pull_path) self.pub_sock = salt.utils.asynchronous.SyncWrapper( _TCPPubServerPublisher, - (self.pull_path,), + ( + self.pull_host, + self.pull_port, + self.pull_path, + ), loop_kwarg="io_loop", ) self.pub_sock.connect() @@ -1519,7 +1535,7 @@ class _TCPPubServerPublisher: "close", ] - def __init__(self, socket_path, io_loop=None): + def __init__(self, host, port, path, io_loop=None): """ Create a new IPC client @@ -1529,7 +1545,9 @@ class _TCPPubServerPublisher: """ self.io_loop = io_loop or tornado.ioloop.IOLoop.current() - self.socket_path = socket_path + self.host = host + self.port = port + self.path = path self._closing = False self.stream = None # msgpack deprecated `encoding` starting with version 0.5.2 @@ -1573,12 +1591,12 @@ class _TCPPubServerPublisher: """ Connect to a running IPCServer """ - if isinstance(self.socket_path, int): + if isinstance(self.path, int): sock_type = socket.AF_INET - sock_addr = ("127.0.0.1", self.socket_path) + sock_addr = (self.host, self.port) else: sock_type = socket.AF_UNIX - sock_addr = self.socket_path + sock_addr = self.path self.stream = None if timeout is not None: @@ -1595,7 +1613,10 @@ class _TCPPubServerPublisher: ) try: log.trace( - "TCPMessageClient: Connecting to socket: %s", self.socket_path + "TCPMessageClient: Connecting to socket: %s:%s %s", + self.host, + self.port, + self.path, ) await self.stream.connect(sock_addr) self._connecting_future.set_result(True) diff --git a/salt/utils/event.py b/salt/utils/event.py index 61273d9854c..32c37ed3293 100644 --- a/salt/utils/event.py +++ b/salt/utils/event.py @@ -253,7 +253,14 @@ class SaltEvent: if salt.utils.platform.is_windows() and "ipc_mode" not in opts: self.opts["ipc_mode"] = "tcp" - self.puburi, self.pulluri = self.__load_uri(sock_dir, node) + ( + self.pub_host, + self.pub_port, + self.pub_path, + self.pull_host, + self.pull_port, + self.pull_path, + ) = self.__load_uri(sock_dir, node) self.pending_tags = [] self.pending_events = [] self.__load_cache_regex() @@ -281,17 +288,53 @@ class SaltEvent: Return the string URI for the location of the pull and pub sockets to use for firing and listening to events """ + pub_host = None + pub_port = None + pub_path = None + pull_host = None + pull_port = None + pull_path = None if node == "master": if self.opts["ipc_mode"] == "tcp": - puburi = int(self.opts["tcp_master_pub_port"]) - pulluri = int(self.opts["tcp_master_pull_port"]) + pub_host = "127.0.0.1" + pub_port = int(self.opts["tcp_master_pub_port"]) + pull_host = "127.0.0.1" + pull_port = int(self.opts["tcp_master_pull_port"]) + log.debug( + "%s PUB socket URI: %s:%s", + self.__class__.__name__, + pub_host, + pub_port, + ) + log.debug( + "%s PULL socket URI: %s:%s", + self.__class__.__name__, + pull_host, + pull_port, + ) else: - puburi = os.path.join(sock_dir, "master_event_pub.ipc") - pulluri = os.path.join(sock_dir, "master_event_pull.ipc") + pub_path = os.path.join(sock_dir, "master_event_pub.ipc") + pull_path = os.path.join(sock_dir, "master_event_pull.ipc") + log.debug("%s PUB socket URI: %s", self.__class__.__name__, pub_path) + log.debug("%s PULL socket URI: %s", self.__class__.__name__, pull_path) else: if self.opts["ipc_mode"] == "tcp": - puburi = int(self.opts["tcp_pub_port"]) - pulluri = int(self.opts["tcp_pull_port"]) + pub_host = "127.0.0.1" + pub_port = int(self.opts["tcp_pub_port"]) + pull_host = "127.0.0.1" + pull_port = int(self.opts["tcp_pull_port"]) + log.debug( + "%s PUB socket URI: %s:%s", + self.__class__.__name__, + pub_host, + pub_port, + ) + log.debug( + "%s PULL socket URI: %s:%s", + self.__class__.__name__, + pull_host, + pull_port, + ) else: hash_type = getattr(hashlib, self.opts["hash_type"]) # Only use the first 10 chars to keep longer hashes from exceeding the @@ -300,15 +343,15 @@ class SaltEvent: id_hash = hash_type( salt.utils.stringutils.to_bytes(minion_id) ).hexdigest()[:10] - puburi = os.path.join( + pub_path = os.path.join( sock_dir, "minion_event_{}_pub.ipc".format(id_hash) ) - pulluri = os.path.join( + pull_path = os.path.join( sock_dir, "minion_event_{}_pull.ipc".format(id_hash) ) - log.debug("%s PUB socket URI: %s", self.__class__.__name__, puburi) - log.debug("%s PULL socket URI: %s", self.__class__.__name__, pulluri) - return puburi, pulluri + log.debug("%s PUB socket URI: %s", self.__class__.__name__, pub_path) + log.debug("%s PULL socket URI: %s", self.__class__.__name__, pull_path) + return pub_host, pub_port, pub_path, pull_host, pull_port, pull_path def subscribe(self, tag=None, match_type=None): """ @@ -354,10 +397,7 @@ class SaltEvent: return True kwargs = {"transport": "tcp"} - if isinstance(self.puburi, int): - kwargs.update(host="127.0.0.1", port=self.puburi) - else: - kwargs.update(path=self.puburi) + kwargs.update(host=self.pub_host, port=self.pub_port, path=self.pub_path) if self._run_io_loop_sync: if self.subscriber is None: self.subscriber = salt.utils.asynchronous.SyncWrapper( @@ -368,7 +408,7 @@ class SaltEvent: ) try: # self.subscriber.connect(timeout=timeout) - log.debug("Event connect subscriber %r", self.puburi) + log.debug("Event connect subscriber %r", self.pub_path) self.subscriber.connect() self.cpub = True except tornado.iostream.StreamClosedError: @@ -389,7 +429,7 @@ class SaltEvent: self.opts["master_ip"] = "" kwargs["io_loop"] = self.io_loop self.subscriber = salt.transport.publish_client(self.opts, **kwargs) - log.debug("Event connect subscriber %r", self.puburi) + log.debug("Event connect subscriber %r", self.pub_path) self.io_loop.spawn_callback(self.subscriber.connect) # self.subscriber = salt.transport.ipc.IPCMessageSubscriber( # self.puburi, io_loop=self.io_loop @@ -429,8 +469,12 @@ class SaltEvent: salt.transport.publish_server, args=(self.opts,), kwargs={ - "pub_path": self.puburi, - "pull_path": self.pulluri, + "pub_host": self.pub_host, + "pub_port": self.pub_port, + "pub_path": self.pub_path, + "pull_host": self.pull_host, + "pull_port": self.pull_port, + "pull_path": self.pull_path, "transport": "tcp", }, ) @@ -450,8 +494,12 @@ class SaltEvent: if self.pusher is None: self.pusher = salt.transport.publish_server( self.opts, - pub_path=self.puburi, - pull_path=self.pulluri, + pub_host=self.pub_host, + pub_port=self.pub_port, + pub_path=self.pub_path, + pull_host=self.pull_host, + pull_port=self.pull_port, + pull_path=self.pull_path, transport="tcp", ) # For the asynchronous case, the connect will be deferred to when diff --git a/tests/pytests/unit/utils/event/test_event.py b/tests/pytests/unit/utils/event/test_event.py index bf52b9b717d..fc42053f4b4 100644 --- a/tests/pytests/unit/utils/event/test_event.py +++ b/tests/pytests/unit/utils/event/test_event.py @@ -48,8 +48,8 @@ def _assert_got_event(evt, data, msg=None, expected_failure=False): def test_master_event(sock_dir): with salt.utils.event.MasterEvent(str(sock_dir), listen=False) as me: - assert me.puburi == str(sock_dir / "master_event_pub.ipc") - assert me.pulluri == str(sock_dir / "master_event_pull.ipc") + assert me.pub_path == str(sock_dir / "master_event_pub.ipc") + assert me.pull_path == str(sock_dir / "master_event_pull.ipc") def test_minion_event(sock_dir): @@ -58,22 +58,22 @@ def test_minion_event(sock_dir): :10 ] with salt.utils.event.MinionEvent(opts, listen=False) as me: - assert me.puburi == str(sock_dir / f"minion_event_{id_hash}_pub.ipc") - assert me.pulluri == str(sock_dir / f"minion_event_{id_hash}_pull.ipc") + assert me.pub_path == str(sock_dir / f"minion_event_{id_hash}_pub.ipc") + assert me.pull_path == str(sock_dir / f"minion_event_{id_hash}_pull.ipc") def test_minion_event_tcp_ipc_mode(): opts = dict(id="foo", ipc_mode="tcp") with salt.utils.event.MinionEvent(opts, listen=False) as me: - assert me.puburi == 4510 - assert me.pulluri == 4511 + assert me.pub_port == 4510 + assert me.pull_port == 4511 def test_minion_event_no_id(sock_dir): with salt.utils.event.MinionEvent(dict(sock_dir=str(sock_dir)), listen=False) as me: id_hash = hashlib.sha256(salt.utils.stringutils.to_bytes("")).hexdigest()[:10] - assert me.puburi == str(sock_dir / f"minion_event_{id_hash}_pub.ipc") - assert me.pulluri == str(sock_dir / f"minion_event_{id_hash}_pull.ipc") + assert me.pub_path == str(sock_dir / f"minion_event_{id_hash}_pub.ipc") + assert me.pull_path == str(sock_dir / f"minion_event_{id_hash}_pull.ipc") @pytest.mark.slow_test