From 0dc6cfc78fa0412f83a5a9f211c145fa0dac23b1 Mon Sep 17 00:00:00 2001
From: "Daniel A. Wozniak"
Date: Fri, 30 Jun 2023 02:28:42 -0700
Subject: [PATCH] Fix minion coming up without master

---
Review notes (everything between the "---" line above and the first
"diff --git" line is ignored by `git am`):

Formatting
  * The line structure of this patch was restored from a copy whose
    newlines had been collapsed. Hunk line counts were re-checked against
    the "@@" headers, but the indentation of context lines and the
    position of blank context lines are reconstructed. If a hunk does
    not match, retry with `git apply --ignore-whitespace`.
  * The post-image blob ids on the "index" lines of tcp.py and zeromq.py
    predate the changes listed below.

Changed relative to the original submission
  * Subject: "comming" -> "coming".
  * tcp.py, on_recv_handler: bare `except:` narrowed to
    `except Exception:` so task cancellation and KeyboardInterrupt are
    not swallowed.
  * zeromq.py, close() in AsyncReqMessageClient and RequestClient:
    guard against `self.context` still being None (it is now None from
    __init__ until _init_socket() runs).
  * zeromq.py, AsyncReqMessageClient.send: also catch
    asyncio.TimeoutError, which is a distinct class before Python 3.11.
  * zeromq.py, RequestClient.send: re-raise after closing the socket,
    as AsyncReqMessageClient.send does; previously every failure made
    send() return None.

Still open (not changed here)
  * The log.error("ON RECV HANDLER") / "CONNECT" / "TEST - RUN PUBLISH"
    style markers look like debugging aids and should be removed or
    lowered to debug level before merging.
  * tcp.py, on_recv_handler: after the generic except branch the code
    still falls through to callback(msg) with msg unset or stale.
  * zeromq.py, AsyncReqMessageClient._init_socket: the context is only
    created inside `if self.socket is not None:`; please confirm it
    exists on the first connect.
  * zeromq.py, RequestClient.send: the signature changes from
    (load, timeout=60) to (message, timeout=None, callback=None);
    callers passing load= by keyword or relying on the 60s default
    are affected.

 salt/minion.py                                |   2 +-
 salt/transport/tcp.py                         |  10 ++
 salt/transport/zeromq.py                      | 115 ++++++++++++++----
 .../pytests/functional/channel/test_server.py |   6 +-
 4 files changed, 107 insertions(+), 26 deletions(-)

diff --git a/salt/minion.py b/salt/minion.py
index 037fe84b184..a8c4b793fee 100644
--- a/salt/minion.py
+++ b/salt/minion.py
@@ -1062,7 +1062,7 @@ class MinionManager(MinionBase):
                 pub_path=epub_sock_path,
                 pull_path=epull_sock_path,
             )
-        self.io_loop.add_callback(
+        self.io_loop.spawn_callback(
             ipc_publisher.publisher, ipc_publisher.publish_payload, self.io_loop
         )
         self.event = salt.utils.event.get_event(
diff --git a/salt/transport/tcp.py b/salt/transport/tcp.py
index 08827ed879e..beeb67d5dca 100644
--- a/salt/transport/tcp.py
+++ b/salt/transport/tcp.py
@@ -390,13 +390,17 @@ class TCPPubClient(salt.transport.base.PublishClient):
                     self._read_in_progress.release()
 
     async def on_recv_handler(self, callback):
+        log.error("ON RECV HANDLER")
         while not self._stream:
             await asyncio.sleep(0.003)
         while True:
             try:
+                log.error("ON RECV HANDLER - RECV")
                 msg = await self.recv()
+                log.error("ON RECV HANDLER - RECVED")
                 logit = True
             except tornado.iostream.StreamClosedError:
+                log.error("Stream Closed")
                 self._stream.close()
                 self._stream = None
                 await self._connect()
@@ -404,6 +408,8 @@ class TCPPubClient(salt.transport.base.PublishClient):
                     self.disconnect_callback()
                 self.unpacker = salt.utils.msgpack.Unpacker()
                 continue
+            except Exception:
+                log.error("Stream Closed", exc_info=True)
             callback(msg)
 
     def on_recv(self, callback):
@@ -1432,6 +1438,7 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
         process_manager.add_process(self.publish_daemon, name=self.__class__.__name__)
 
     async def publish_payload(self, payload, *args):
+        log.error("publisher - publish payload")
         return await self.pub_server.publish_payload(payload)
 
     def connect(self):
@@ -1448,6 +1455,7 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
         Publish "load" to minions
         """
         if not self.pub_sock:
+            log.error("CONNECT")
             self.connect()
         # if self.opts.get("ipc_mode", "") == "tcp":
         #     pull_uri = int(self.opts.get("tcp_master_publish_pull", 4514))
@@ -1461,6 +1469,7 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
         #     )
         # self.pub_sock.connect()
         # await self.pub_sock.send(payload)
+        log.error("publish payload")
         self.pub_sock.send(payload)
 
     def close(self):
@@ -1694,6 +1703,7 @@ class TCPReqClient(salt.transport.base.RequestClient):
         self._connecting_future = tornado.concurrent.Future()
         self._stream_return_running = False
         self._stream = None
+        self.disconnect_callback = None
 
     async def getstream(self, **kwargs):
         if self.source_ip or self.source_port:
diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py
index ce64265cb8b..2104a99a874 100644
--- a/salt/transport/zeromq.py
+++ b/salt/transport/zeromq.py
@@ -254,6 +254,8 @@ class PublishClient(salt.transport.base.PublishClient):
     # TODO: this is the time to see if we are connected, maybe use the req channel to guess?
     async def connect(self, port=None, connect_callback=None, disconnect_callback=None):
         self.connect_called = True
+        if port is not None:
+            self.port = port
         if self.path:
             pub_uri = f"ipc://{self.path}"
             log.debug("Connecting the publisher client to: %s", pub_uri)
@@ -635,8 +637,7 @@ class AsyncReqMessageClient:
             self.io_loop = tornado.ioloop.IOLoop.current()
         else:
             self.io_loop = io_loop
-
-        self.context = zmq.asyncio.Context()
+        self.context = None
 
         self.send_queue = []
         # mapping of message -> future
@@ -644,7 +645,7 @@ class AsyncReqMessageClient:
 
         self._closing = False
         self.socket = None
-        self.sending = False
+        self.sending = asyncio.Lock()
 
     async def connect(self):
         if self.socket is None:
@@ -658,12 +659,15 @@ class AsyncReqMessageClient:
         self._closing = True
         if self.socket:
             self.socket.close()
+            self.socket = None
-        if self.context.closed is False:
+        if self.context is not None and self.context.closed is False:
             # This hangs if closing the stream causes an import error
             self.context.term()
+            self.context = None
 
     def _init_socket(self):
         if self.socket is not None:
+            self.context = zmq.asyncio.Context()
             self.socket.close()  # pylint: disable=E0203
             del self.socket
 
@@ -698,28 +702,28 @@ class AsyncReqMessageClient:
             future.set_exception(SaltReqTimeoutError("Message timed out"))
 
     async def _send_recv(self, message):
-        if not self.socket:
-            await self.connect()
         message = salt.payload.dumps(message)
         await self.socket.send(message)
         ret = await self.socket.recv()
-        data = salt.payload.loads(ret)
-        return data
+        return salt.payload.loads(ret)
 
     async def send(self, message, timeout=None, callback=None):
         """
         Return a future which will be completed when the message has a response
         """
-        while self.sending:
-            await asyncio.sleep(0.03)
-        self.sending = True
+        if not self.socket:
+            await self.connect()
+        await self.sending.acquire()
         try:
             response = await asyncio.wait_for(self._send_recv(message), timeout=timeout)
             if callback:
                 callback(response)
             return response
+        except (TimeoutError, asyncio.TimeoutError):
+            self.close()
+            raise
         finally:
-            self.sending = False
+            self.sending.release()
 
 
 class ZeroMQSocketMonitor:
@@ -1043,24 +1047,87 @@ class RequestClient(salt.transport.base.RequestClient):
 
     ttype = "zeromq"
 
-    def __init__(self, opts, io_loop):  # pylint: disable=W0231
+    def __init__(self, opts, io_loop, linger=0):  # pylint: disable=W0231
         self.opts = opts
-        master_uri = self.get_master_uri(opts)
-        self.message_client = AsyncReqMessageClient(
-            self.opts,
-            master_uri,
-            io_loop=io_loop,
-        )
+        self.master_uri = self.get_master_uri(opts)
+        self.linger = linger
+        if io_loop is None:
+            self.io_loop = tornado.ioloop.IOLoop.current()
+        else:
+            self.io_loop = io_loop
+        self.context = None
+        self.send_queue = []
+        # mapping of message -> future
+        self.send_future_map = {}
+        self._closing = False
+        self.socket = None
+        self.sending = asyncio.Lock()
 
     async def connect(self):
-        await self.message_client.connect()
+        if self.socket is None:
+            # wire up sockets
+            self._init_socket()
 
-    async def send(self, load, timeout=60):
-        await self.connect()
-        return await self.message_client.send(load, timeout=timeout)
+    def _init_socket(self):
+        if self.socket is not None:
+            self.context = zmq.asyncio.Context()
+            self.socket.close()  # pylint: disable=E0203
+            del self.socket
+        self.context = zmq.asyncio.Context()
+        self.socket = self.context.socket(zmq.REQ)
+        self.socket.setsockopt(zmq.LINGER, -1)
+        # socket options
+        if hasattr(zmq, "RECONNECT_IVL_MAX"):
+            self.socket.setsockopt(zmq.RECONNECT_IVL_MAX, 5000)
+
+        _set_tcp_keepalive(self.socket, self.opts)
+        if self.master_uri.startswith("tcp://["):
+            # Hint PF type if bracket enclosed IPv6 address
+            if hasattr(zmq, "IPV6"):
+                self.socket.setsockopt(zmq.IPV6, 1)
+            elif hasattr(zmq, "IPV4ONLY"):
+                self.socket.setsockopt(zmq.IPV4ONLY, 0)
+        self.socket.linger = self.linger
+        self.socket.connect(self.master_uri)
+
+    # TODO: timeout all in-flight sessions, or error
 
     def close(self):
-        self.message_client.close()
+        if self._closing:
+            return
+        self._closing = True
+        if self.socket:
+            self.socket.close()
+            self.socket = None
+        if self.context is not None and self.context.closed is False:
+            # This hangs if closing the stream causes an import error
+            self.context.term()
+            self.context = None
+
+    async def _send_recv(self, message):
+        message = salt.payload.dumps(message)
+        await self.socket.send(message)
+        ret = await self.socket.recv()
+        return salt.payload.loads(ret)
+
+    async def send(self, message, timeout=None, callback=None):
+        """
+        Return a future which will be completed when the message has a response
+        """
+        if not self.socket:
+            await self.connect()
+        await self.sending.acquire()
+        try:
+            response = await asyncio.wait_for(self._send_recv(message), timeout=timeout)
+            if callback:
+                callback(response)
+            return response
+        except Exception:
+            # Timeouts included: drop the socket, then let the caller see the error.
+            self.close()
+            raise
+        finally:
+            self.sending.release()
 
     @staticmethod
     def get_master_uri(opts):
diff --git a/tests/pytests/functional/channel/test_server.py b/tests/pytests/functional/channel/test_server.py
index cda4d1ba6f4..f13284504ab 100644
--- a/tests/pytests/functional/channel/test_server.py
+++ b/tests/pytests/functional/channel/test_server.py
@@ -137,7 +137,11 @@ def _connect_and_publish(
         io_loop.stop()
 
     channel.on_recv(cb)
-    server.publish({"tgt_type": "glob", "tgt": [channel_minion_id], "WTF": "SON"})
+    log.error("TEST - RUN PUBLISH")
+    io_loop.spawn_callback(
+        server.publish, {"tgt_type": "glob", "tgt": [channel_minion_id], "WTF": "SON"}
+    )
+    # server.publish({"tgt_type": "glob", "tgt": [channel_minion_id], "WTF": "SON"})
     start = time.time()
     while time.time() - start < timeout:
         yield tornado.gen.sleep(1)