From 6320f769ea807e4ab196ebf2ccc18c49ac5d477e Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Sun, 23 Jul 2023 22:35:20 -0700 Subject: [PATCH] Revert windows test fix --- salt/master.py | 21 +----- salt/minion.py | 30 +------- salt/transport/__init__.py | 12 +++- salt/transport/base.py | 111 ++++++++++++++++++++++++------ salt/utils/event.py | 57 +++++---------- tests/pytests/unit/test_minion.py | 3 +- 6 files changed, 121 insertions(+), 113 deletions(-) diff --git a/salt/master.py b/salt/master.py index fe5d39293ba..276e945a0fc 100644 --- a/salt/master.py +++ b/salt/master.py @@ -723,26 +723,7 @@ class Master(SMaster): pub_channels.append(chan) log.info("Creating master event publisher process") - if self.opts["ipc_mode"] == "tcp": - ipc_publisher = salt.transport.publish_server( - self.opts, - transport="tcp", - pub_host="127.0.0.1", - pub_port=int(self.opts["tcp_master_pub_port"]), - pull_host="127.0.0.1", - pull_port=int(self.opts["tcp_master_pull_port"]), - ) - else: - ipc_publisher = salt.transport.publish_server( - self.opts, - transport="tcp", - pub_path=os.path.join( - self.opts["sock_dir"], "master_event_pub.ipc" - ), - pull_path=os.path.join( - self.opts["sock_dir"], "master_event_pull.ipc" - ), - ) + ipc_publisher = salt.transport.ipc_publish_server("master", self.opts) self.process_manager.add_process( ipc_publisher.publish_daemon, args=[ diff --git a/salt/minion.py b/salt/minion.py index 22760b5cafd..0a717808886 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -5,7 +5,6 @@ import asyncio import binascii import contextlib import copy -import hashlib import logging import multiprocessing import os @@ -1045,34 +1044,7 @@ class MinionManager(MinionBase): def _bind(self): # start up the event publisher, so we can see events during startup - hash_type = getattr(hashlib, self.opts["hash_type"]) - id_hash = hash_type( - salt.utils.stringutils.to_bytes(self.opts["id"]) - ).hexdigest()[:10] - if self.opts["ipc_mode"] == "tcp": - ipc_publisher = salt.transport.publish_server( - self.opts, - pub_host="127.0.0.1", - pub_port=int(self.opts["tcp_pub_port"]), - pull_host="127.0.0.1", - pull_port=int(self.opts["tcp_pull_port"]), - transport="tcp", - ) - else: - epub_sock_path = os.path.join( - self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash) - ) - epull_sock_path = os.path.join( - self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash) - ) - if os.path.exists(epub_sock_path): - os.unlink(epub_sock_path) - ipc_publisher = salt.transport.publish_server( - self.opts, - pub_path=epub_sock_path, - pull_path=epull_sock_path, - transport="tcp", - ) + ipc_publisher = salt.transport.ipc_publish_server("minion", self.opts) self.io_loop.spawn_callback( ipc_publisher.publisher, ipc_publisher.publish_payload, self.io_loop ) diff --git a/salt/transport/__init__.py b/salt/transport/__init__.py index 3c7e2326efa..e8ae20f8da2 100644 --- a/salt/transport/__init__.py +++ b/salt/transport/__init__.py @@ -1,8 +1,6 @@ """ Encapsulate the different transports available to Salt. """ - - import logging import warnings @@ -12,6 +10,7 @@ from salt.transport.base import ( publish_server, request_client, request_server, + ipc_publish_server, ) log = logging.getLogger(__name__) @@ -21,3 +20,12 @@ log = logging.getLogger(__name__) warnings.filterwarnings( "ignore", message="IOLoop.current expected instance.*", category=RuntimeWarning ) + +__all__ = ( + "TRANSPORTS", + "publish_client", + "publish_server", + "request_client", + "request_server", + "ipc_publish_server", +) diff --git a/salt/transport/base.py b/salt/transport/base.py index 93a4ed57c53..bcfd0958f3b 100644 --- a/salt/transport/base.py +++ b/salt/transport/base.py @@ -1,4 +1,8 @@ import os +import hashlib + +import salt.utils.stringutils + TRANSPORTS = ( "zeromq", @@ -90,26 +94,6 @@ def publish_server(opts, **kwargs): raise Exception("Transport type not found: {}".format(ttype)) -def ipc_publish_client(opts, io_loop): - # Default to ZeroMQ for now - ttype = "zeromq" - # determine the ttype - if "transport" in opts: - ttype = opts["transport"] - elif "transport" in opts.get("pillar", {}).get("master", {}): - ttype = opts["pillar"]["master"]["transport"] - # switch on available ttypes - if ttype == "zeromq": - import salt.transport.zeromq - - return salt.transport.zeromq.PublishClient(opts, io_loop) - elif ttype == "tcp": - import salt.transport.tcp - - return salt.transport.tcp.TCPPubClient(opts, io_loop) - raise Exception("Transport type not found: {}".format(ttype)) - - def publish_client(opts, io_loop, host=None, port=None, path=None, transport=None): # Default to ZeroMQ for now ttype = "zeromq" @@ -138,6 +122,93 @@ def publish_client(opts, io_loop, host=None, port=None, path=None, transport=Non raise Exception("Transport type not found: {}".format(ttype)) +def _minion_hash(hash_type, minion_id): + """ + Generate a hash string for the minion id + """ + hasher = getattr(hashlib, hash_type) + return hasher(salt.utils.stringutils.to_bytes(minion_id)).hexdigest()[:10] + + +def ipc_publish_client(node, opts, io_loop): + # Default to ZeroMQ for now + ttype = "tcp" + + kwargs = {} + if opts["ipc_mode"] == "tcp": + if node == "master": + kwargs.update( + host="127.0.0.1", + port=int(opts["tcp_master_pub_port"]), + ) + else: + kwargs.update( + host="127.0.0.1", + port=int(opts["tcp_pub_port"]), + ) + else: + if node == "master": + kwargs.update( + path=os.path.join(opts["sock_dir"], "master_event_pub.ipc"), + ) + else: + id_hash = _minion_hash( + hash_type=opts["hash_type"], + minion_id=opts.get("hash_id", opts["id"]), + ) + kwargs.update( + path=os.path.join( + opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash) + ) + ) + return publish_client(opts, io_loop, **kwargs) + + +def ipc_publish_server(node, opts): + # Default to TCP for now + kwargs = {"transport": "tcp"} + if opts["ipc_mode"] == "tcp": + if node == "master": + kwargs.update( + pub_host="127.0.0.1", + pub_port=int(opts["tcp_master_pub_port"]), + pull_host="127.0.0.1", + pull_port=int(opts["tcp_master_pull_port"]), + ) + else: + kwargs.update( + pub_host="127.0.0.1", + pub_port=int(opts["tcp_pub_port"]), + pull_host="127.0.0.1", + pull_port=int(opts["tcp_pull_port"]), + ) + else: + if node == "master": + kwargs.update( + pub_path=os.path.join(opts["sock_dir"], "master_event_pub.ipc"), + pull_path=os.path.join(opts["sock_dir"], "master_event_pull.ipc"), + ) + else: + id_hash = _minion_hash( + hash_type=opts["hash_type"], + minion_id=opts.get("hash_id", opts["id"]), + ) + pub_path = ( + os.path.join( + opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash) + ), + ) + kwargs.update( + pub_path=pub_path, + pull_path=os.path.join( + opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash) + ), + ) + if os.path.exists(pub_path): + os.unlink(pub_path) + return publish_server(opts, **kwargs) + + class RequestClient: """ The RequestClient transport is used to make requests and get corresponding diff --git a/salt/utils/event.py b/salt/utils/event.py index b1438ce12b0..1490a59d362 100644 --- a/salt/utils/event.py +++ b/salt/utils/event.py @@ -225,6 +225,7 @@ class SaltEvent: is destroyed. This is useful when using event loops from within third party asynchronous code """ + self.node = node self.keep_loop = keep_loop if io_loop is not None: self.io_loop = io_loop @@ -253,14 +254,6 @@ class SaltEvent: if salt.utils.platform.is_windows() and "ipc_mode" not in opts: self.opts["ipc_mode"] = "tcp" - ( - self.pub_host, - self.pub_port, - self.pub_path, - self.pull_host, - self.pull_port, - self.pull_path, - ) = self.__load_uri(sock_dir, node) self.pending_tags = [] self.pending_events = [] self.__load_cache_regex() @@ -395,19 +388,17 @@ class SaltEvent: """ if self.cpub: return True - - kwargs = {"transport": "tcp"} - kwargs.update(host=self.pub_host, port=self.pub_port, path=self.pub_path) if self._run_io_loop_sync: if self.subscriber is None: self.subscriber = salt.utils.asynchronous.SyncWrapper( - salt.transport.publish_client, - args=(self.opts,), - kwargs=kwargs, + salt.transport.ipc_publish_client, + args=( + self.node, + self.opts, + ), loop_kwarg="io_loop", ) try: - # self.subscriber.connect(timeout=timeout) log.debug("Event connect subscriber %r", self.pub_path) self.subscriber.connect(timeout=timeout) self.cpub = True @@ -425,15 +416,11 @@ class SaltEvent: ) else: if self.subscriber is None: - if "master_ip" not in self.opts: - self.opts["master_ip"] = "" - kwargs["io_loop"] = self.io_loop - self.subscriber = salt.transport.publish_client(self.opts, **kwargs) + self.subscriber = salt.transport.ipc_publish_client( + self.node, self.opts, io_loop=self.io_loop + ) log.debug("Event connect subscriber %r", self.pub_path) self.io_loop.spawn_callback(self.subscriber.connect) - # self.subscriber = salt.transport.ipc.IPCMessageSubscriber( - # self.puburi, io_loop=self.io_loop - # ) # For the asynchronous case, the connect will be defered to when # set_event_handler() is invoked. @@ -466,17 +453,11 @@ class SaltEvent: if self._run_io_loop_sync: if self.pusher is None: self.pusher = salt.utils.asynchronous.SyncWrapper( - salt.transport.publish_server, - args=(self.opts,), - kwargs={ - "pub_host": self.pub_host, - "pub_port": self.pub_port, - "pub_path": self.pub_path, - "pull_host": self.pull_host, - "pull_port": self.pull_port, - "pull_path": self.pull_path, - "transport": "tcp", - }, + salt.transport.ipc_publish_server, + args=( + self.node, + self.opts, + ), ) try: # self.pusher.connect(timeout=timeout) @@ -492,15 +473,9 @@ class SaltEvent: ) else: if self.pusher is None: - self.pusher = salt.transport.publish_server( + self.pusher = salt.transport.ipc_publish_server( + self.node, self.opts, - pub_host=self.pub_host, - pub_port=self.pub_port, - pub_path=self.pub_path, - pull_host=self.pull_host, - pull_port=self.pull_port, - pull_path=self.pull_path, - transport="tcp", ) # For the asynchronous case, the connect will be deferred to when # fire_event() is invoked. diff --git a/tests/pytests/unit/test_minion.py b/tests/pytests/unit/test_minion.py index 7a4214a4aa3..a98e96456cc 100644 --- a/tests/pytests/unit/test_minion.py +++ b/tests/pytests/unit/test_minion.py @@ -143,6 +143,7 @@ async def test_send_req_async_regression_62453(minion_opts): minion_opts["random_startup_delay"] = 0 minion_opts["return_retry_tries"] = 30 minion_opts["grains"] = {} + minion_opts["ipc_mode"] = "tcp" with patch("salt.loader.grains"): minion = salt.minion.Minion(minion_opts) @@ -151,7 +152,7 @@ async def test_send_req_async_regression_62453(minion_opts): # We are just validating no exception is raised rtn = await minion._send_req_async(load, timeout) - assert rtn is None + assert rtn is False def test_mine_send_tries():