From 7e3a5b10f1da7aa76f41fcaa0290034d6cb2e5fc Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Mon, 19 Jun 2023 15:26:16 -0700 Subject: [PATCH] Minion uses tcp transport for ipc --- salt/channel/client.py | 4 ++ salt/minion.py | 68 ++++++++++++---------- salt/transport/base.py | 1 - salt/transport/zeromq.py | 121 +++++++++++++++++++++++++++++---------- salt/utils/event.py | 28 ++++----- 5 files changed, 148 insertions(+), 74 deletions(-) diff --git a/salt/channel/client.py b/salt/channel/client.py index 18999c78e03..814968a19e3 100644 --- a/salt/channel/client.py +++ b/salt/channel/client.py @@ -376,6 +376,7 @@ class AsyncPubChannel: async_methods = [ "connect", "_decode_messages", + # "close", ] close_methods = [ "close", @@ -455,8 +456,11 @@ class AsyncPubChannel: """ Close the channel """ + log.error("AsyncPubChannel.close called") self.transport.close() + log.error("Transport closed") if self.event is not None: + log.error("Event destroy called") self.event.destroy() self.event = None diff --git a/salt/minion.py b/salt/minion.py index ffeacc40640..821d7c673ff 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -1048,33 +1048,27 @@ class MinionManager(MinionBase): # self.opts, # io_loop=self.io_loop, # ) - def target(): - import hashlib - - self.opts["publish_port"] = 12321 - hash_type = getattr(hashlib, self.opts["hash_type"]) - ipc_publisher = salt.transport.publish_server(self.opts) - id_hash = hash_type( - salt.utils.stringutils.to_bytes(self.opts["id"]) - ).hexdigest()[:10] - epub_sock_path = "ipc://{}".format( - os.path.join( - self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash) - ) + import hashlib + ipc_publisher = salt.transport.publish_server(self.opts) + hash_type = getattr(hashlib, self.opts["hash_type"]) + id_hash = hash_type( + salt.utils.stringutils.to_bytes(self.opts["id"]) + ).hexdigest()[:10] + epub_sock_path = "ipc://{}".format( + os.path.join( + self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash) ) - if os.path.exists(epub_sock_path): - os.unlink(epub_sock_path) - epull_sock_path = "ipc://{}".format( - os.path.join( - self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash) - ) + ) + if os.path.exists(epub_sock_path): + os.unlink(epub_sock_path) + epull_sock_path = "ipc://{}".format( + os.path.join( + self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash) ) - ipc_publisher.pub_uri = epub_sock_path - ipc_publisher.pull_uri = epull_sock_path - ipc_publisher.publish_daemon(ipc_publisher.publish_payload) - - thread = salt.utils.process.Process(target=target) - thread.start() + ) + ipc_publisher.pub_uri = epub_sock_path + ipc_publisher.pull_uri = epull_sock_path + self.io_loop.add_callback(ipc_publisher.publisher, ipc_publisher.publish_payload, self.io_loop) self.event = salt.utils.event.get_event( "minion", opts=self.opts, io_loop=self.io_loop ) @@ -3265,17 +3259,29 @@ class Minion(MinionBase): if self._running is False: return + log.error("Loop status %r %r", self, self.io_loop.asyncio_loop.is_running()) + self.io_loop.asyncio_loop.stop() + log.error("Loop status %r %r", self, self.io_loop.asyncio_loop.is_running()) self._running = False if hasattr(self, "schedule"): del self.schedule if hasattr(self, "pub_channel") and self.pub_channel is not None: self.pub_channel.on_recv(None) - if hasattr(self.pub_channel, "close"): - self.pub_channel.close() - del self.pub_channel - if hasattr(self, "periodic_callbacks"): - for cb in self.periodic_callbacks.values(): - cb.stop() + log.error("create pub_channel.close task %r", 
self)
+            self.pub_channel.close()
+            #self.io_loop.asyncio_loop.run_until_complete(self.pub_channel.close())
+            #if hasattr(self.pub_channel, "close"):
+            #    asyncio.create_task(
+            #        self.pub_channel.close()
+            #    )
+            #    #self.pub_channel.close()
+            #del self.pub_channel
+        if hasattr(self, "event"):
+            log.error("HAS EVENT")
+        #if hasattr(self, "periodic_callbacks"):
+        #    for cb in self.periodic_callbacks.values():
+        #        cb.stop()
+        log.error("%r destroy method finished", self)
 
     # pylint: disable=W1701
     def __del__(self):
diff --git a/salt/transport/base.py b/salt/transport/base.py
index 86a2c882deb..55ebae765f0 100644
--- a/salt/transport/base.py
+++ b/salt/transport/base.py
@@ -85,7 +85,6 @@ def publish_client(opts, io_loop):
     # switch on available ttypes
     if ttype == "zeromq":
         import salt.transport.zeromq
-
         return salt.transport.zeromq.PublishClient(opts, io_loop)
     elif ttype == "tcp":
         import salt.transport.tcp
diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py
index 1148041951c..5df0522e820 100644
--- a/salt/transport/zeromq.py
+++ b/salt/transport/zeromq.py
@@ -113,7 +113,7 @@ class PublishClient(salt.transport.base.PublishClient):
         "connect",
         "connect_uri",
         "recv",
-        "close",
+        #"close",
     ]
     close_methods = [
         "close",
@@ -139,7 +139,9 @@ class PublishClient(salt.transport.base.PublishClient):
         self.hexid = hashlib.sha1(salt.utils.stringutils.to_bytes(_id)).hexdigest()
         self._closing = False
         self.context = zmq.asyncio.Context()
+        log.error("ZMQ Context create %r", self)
         self._socket = self.context.socket(zmq.SUB)
+        self._socket.setsockopt(zmq.LINGER, -1)
         if zmq_filtering:
             # TODO: constants file for "broadcast"
             self._socket.setsockopt(zmq.SUBSCRIBE, b"broadcast")
@@ -221,7 +223,29 @@ class PublishClient(salt.transport.base.PublishClient):
         elif hasattr(self, "_socket"):
             self._socket.close(0)
         if hasattr(self, "context") and self.context.closed is False:
+            log.error("ZMQ Context term %r", self)
             self.context.term()
+            log.error("ZMQ Context after term %r", self)
+        if self.callbacks:
+            for cb in self.callbacks:
+                running, task = self.callbacks[cb]
+                task.cancel()
+
+    def close(self):
+        if self._closing is True:
+            return
+        self._closing = True
+        if hasattr(self, "_monitor") and self._monitor is not None:
+            self._monitor.stop()
+            self._monitor = None
+        if hasattr(self, "_stream"):
+            self._stream.close(0)
+        elif hasattr(self, "_socket"):
+            self._socket.close(0)
+        if hasattr(self, "context") and self.context.closed is False:
+            log.error("ZMQ Context term %r", self)
+            self.context.term()
+            log.error("ZMQ Context after term %r", self)
 
     # pylint: enable=W1701
     def __enter__(self):
@@ -335,18 +359,24 @@ class PublishClient(salt.transport.base.PublishClient):
         try:
             while running.is_set():
                 try:
-                    msg = await self._socket.recv()
+                    log.error("Waiting for payload from %r", self.uri)
+                    msg = await self.recv(timeout=None)
+                    log.error("Got payload from %r", self.uri)
                 except zmq.error.ZMQError as exc:
                     log.error("ZMQERROR, %s", exc)
                     # We've disconnected just die
                     break
                 except Exception:  # pylint: disable=broad-except
+                    log.error("Unhandled exception while receiving payload", exc_info=True)
                     break
-                try:
-                    await callback(msg)
-                except Exception:  # pylint: disable=broad-except
-                    log.error("Exception while running callback", exc_info=True)
-                log.debug("Callback done %r", callback)
+                if msg:
+                    try:
+                        log.error("Running callback for payload from %r", self.uri)
+                        await callback(msg)
+                        log.error("Finished callback for payload from %r", self.uri)
+                    except Exception:  # pylint: disable=broad-except
+                        log.error("Exception while running callback", exc_info=True)
+                    #log.debug("Callback done %r", callback)
#log.debug("Callback done %r", callback) except Exception as exc: # pylint: disable=broad-except log.error("CONSUME Exception %s %s", self.uri, exc, exc_info=True) log.error("CONSUME ENDING %s", self.uri) @@ -370,6 +400,7 @@ class RequestServer(salt.transport.base.DaemonizedRequestServer): """ self.__setup_signals() context = zmq.Context(self.opts["worker_threads"]) + log.error("ZMQ Context create %r", self) # Prepare the zeromq sockets self.uri = "tcp://{interface}:{ret_port}".format(**self.opts) self.clients = context.socket(zmq.ROUTER) @@ -417,7 +448,9 @@ class RequestServer(salt.transport.base.DaemonizedRequestServer): raise except (KeyboardInterrupt, SystemExit): break + log.error("ZMQ Context term %r", self) context.term() + log.error("ZMQ Context after term %r", self) def close(self): """ @@ -443,7 +476,9 @@ class RequestServer(salt.transport.base.DaemonizedRequestServer): if hasattr(self, "_socket") and self._socket.closed is False: self._socket.close() if hasattr(self, "context") and self.context.closed is False: + log.error("ZMQ Context term %r", self) self.context.term() + log.error("ZMQ Context after term %r", self) def pre_fork(self, process_manager): """ @@ -477,7 +512,7 @@ class RequestServer(salt.transport.base.DaemonizedRequestServer): :param IOLoop io_loop: An instance of a Tornado IOLoop, to handle event scheduling """ # context = zmq.Context(1) - context = zmq.asyncio.Context() + context = zmq.asyncio.Context(1) self._socket = context.socket(zmq.REP) # Linger -1 means we'll never discard messages. self._socket.setsockopt(zmq.LINGER, -1) @@ -561,6 +596,7 @@ def _set_tcp_keepalive(zmq_socket, opts): if "tcp_keepalive_intvl" in opts: zmq_socket.setsockopt(zmq.TCP_KEEPALIVE_INTVL, opts["tcp_keepalive_intvl"]) +ctx = zmq.asyncio.Context() # TODO: unit tests! class AsyncReqMessageClient: @@ -591,6 +627,7 @@ class AsyncReqMessageClient: self.io_loop = io_loop self.context = zmq.asyncio.Context() + log.error("ZMQ Context create %r", self) self.send_queue = [] # mapping of message -> future @@ -616,7 +653,9 @@ class AsyncReqMessageClient: self.socket.close() if self.context.closed is False: # This hangs if closing the stream causes an import error + log.error("ZMQ Context term %r", self) self.context.term() + log.error("ZMQ Context after term %r", self) def _init_socket(self): if hasattr(self, "socket"): @@ -624,6 +663,7 @@ class AsyncReqMessageClient: del self.socket self.socket = self.context.socket(zmq.REQ) + self.socket.setsockopt(zmq.LINGER, -1) # socket options if hasattr(zmq, "RECONNECT_IVL_MAX"): @@ -753,7 +793,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): # _sock_data = threading.local() async_methods = [ "publish", - "close", + #"close", ] close_methods = [ "close", @@ -772,11 +812,15 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): interface = self.opts.get("interface", "127.0.0.1") publish_port = self.opts.get("publish_port", 4560) self.pub_uri = f"tcp://{interface}:{publish_port}" - self.ctx = zmq.asyncio.Context() + self.ctx = None self.sock = None + self.deamon_context = None + self.deamon_pub_sock = None + self.deamon_pull_sock = None + self.deamon_monitor = None def __repr__(self): - return f"" def publish_daemon( self, @@ -789,9 +833,14 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): run in a thread or process as it creates and runs an it's own ioloop. 
""" ioloop = tornado.ioloop.IOLoop() - ioloop.asyncio_loop.set_debug(True) - self.io_loop = ioloop - context = zmq.asyncio.Context() + ioloop.add_callback(self.publisher, publish_payload) + try: + ioloop.start() + finally: + self.close() + + def _get_sockets(self, context, ioloop): + log.error("ZMQ Context create %r", self) pub_sock = context.socket(zmq.PUB) monitor = ZeroMQSocketMonitor(pub_sock) monitor.start_io_loop(ioloop) @@ -837,22 +886,19 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): pull_path, 0o600, ) + return pull_sock, pub_sock, monitor - async def run_publisher(): - await self.publisher(pull_sock, publish_payload) - - ioloop.add_callback(self.publisher, pull_sock, publish_payload) - try: - ioloop.start() - finally: - pub_sock.close() - pull_sock.close() - - async def publisher(self, pull_sock, publish_payload): + async def publisher(self, publish_payload, ioloop=None): + if ioloop is None: + ioloop = tornado.ioloop.IOLoop.current() + ioloop.asyncio_loop.set_debug(True) + self.daemon_context = zmq.asyncio.Context() + self.daemon_pull_sock, self.daemon_pub_sock, self.deamon_monitor = self._get_sockets(self.daemon_context, ioloop) while True: try: - package = await pull_sock.recv() - log.error("Publisher got package %r %s", package, self.pull_uri) + log.error("Publisher wait package %s", self.pull_uri) + package = await self.daemon_pull_sock.recv() + log.error("Publisher got package %s %r", self.pull_uri, package) # payload = salt.payload.loads(package) await publish_payload(package) except Exception as exc: # pylint: disable=broad-except @@ -922,19 +968,36 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): new socket. """ log.debug("Connecting to pub server: %s", self.pull_uri) + self.ctx = zmq.asyncio.Context() self.sock = self.ctx.socket(zmq.PUSH) - self.sock.setsockopt(zmq.LINGER, -1) + self.sock.setsockopt(zmq.LINGER, 300) self.sock.connect(self.pull_uri) return self.sock - async def close(self): + def close(self): """ Disconnect an existing publisher socket and remove it from the local thread's cache. 
""" sock = self.sock self.sock = None + log.error("Socket close %r", self) sock.close() + log.error("Socket closed %r", self) + if self.ctx and self.ctx.closed is False: + ctx = self.ctx + self.ctx = None + log.error("Context term %r", self) + ctx.term() + log.error("After context term %r", self) + if self.deamon_pub_sock: + self.deamon_pub_sock.close() + if self.deamon_pull_sock: + self.deamon_pull_sock.close() + if self.daemon_monitor: + self.daemon_monitor.close() + if self.deamon_context: + self.deamon_context.term() async def publish(self, payload, **kwargs): """ diff --git a/salt/utils/event.py b/salt/utils/event.py index a3eb1f71f0a..06fea02926c 100644 --- a/salt/utils/event.py +++ b/salt/utils/event.py @@ -410,7 +410,10 @@ class SaltEvent: """ if not self.cpub: return - + #if isinstance(self.subscriber, salt.utils.asynchronous.SyncWrapper): + # self.subscriber.close() + #else: + # asyncio.create_task(self.subscriber.close()) self.subscriber.close() self.subscriber = None self.pending_events = [] @@ -862,18 +865,17 @@ class SaltEvent: ) msg = salt.utils.stringutils.to_bytes(event, "utf-8") if self._run_io_loop_sync: - log.error("FIRE EVENT A %r %r", msg, self.pusher) - with salt.utils.asynchronous.current_ioloop(self.io_loop): - try: - # self.pusher.send(msg) - self.pusher.publish(msg) - except Exception as exc: # pylint: disable=broad-except - log.debug( - "Publisher send failed with exception: %s", - exc, - exc_info_on_loglevel=logging.DEBUG, - ) - raise + log.error("FIRE EVENT A %r %r", msg, self.pusher.obj) + try: + # self.pusher.send(msg) + self.pusher.publish(msg) + except Exception as exc: # pylint: disable=broad-except + log.debug( + "Publisher send failed with exception: %s", + exc, + exc_info_on_loglevel=logging.DEBUG, + ) + raise else: log.error("FIRE EVENT B %r %r", msg, self.pusher) asyncio.create_task(self.pusher.publish(msg))