From 3a84dadfd26eb295e4c7ff885e70ba5f505f7b75 Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Mon, 31 Jul 2023 14:37:56 -0700 Subject: [PATCH] Revert changes to un-used deprecated class --- salt/transport/zeromq.py | 116 +++++++++++++++++++++++---------------- 1 file changed, 70 insertions(+), 46 deletions(-) diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py index 50988f6218b..6cbd345aeba 100644 --- a/salt/transport/zeromq.py +++ b/salt/transport/zeromq.py @@ -598,7 +598,6 @@ def _set_tcp_keepalive(zmq_socket, opts): zmq_socket.setsockopt(zmq.TCP_KEEPALIVE_INTVL, opts["tcp_keepalive_intvl"]) -# TODO: unit tests! class AsyncReqMessageClient: """ This class wraps the underlying zeromq REQ socket and gives a future-based @@ -629,42 +628,55 @@ class AsyncReqMessageClient: self.io_loop = tornado.ioloop.IOLoop.current() else: self.io_loop = io_loop - self.context = None + + self.context = zmq.Context() self.send_queue = [] # mapping of message -> future self.send_future_map = {} self._closing = False - self.socket = None - self.sending = asyncio.Lock() - async def connect(self): - if self.socket is None: - # wire up sockets - self._init_socket() + def connect(self): + # wire up sockets + self._init_socket() # TODO: timeout all in-flight sessions, or error def close(self): - if self._closing: + try: + if self._closing: + return + except AttributeError: + # We must have been called from __del__ + # The python interpreter has nuked most attributes already return - self._closing = True - if self.socket: - self.socket.close() - self.socket = None - if self.context.closed is False: - # This hangs if closing the stream causes an import error - self.context.term() - self.context = None + else: + self._closing = True + if hasattr(self, "stream") and self.stream is not None: + if ZMQ_VERSION_INFO < (14, 3, 0): + # stream.close() doesn't work properly on pyzmq < 14.3.0 + if self.stream.socket: + self.stream.socket.close() + self.stream.io_loop.remove_handler(self.stream.socket) + # set this to None, more hacks for messed up pyzmq + self.stream.socket = None + self.socket.close() + else: + self.stream.close(1) + self.socket = None + self.stream = None + if self.context.closed is False: + # This hangs if closing the stream causes an import error + self.context.term() def _init_socket(self): - if self.socket is not None: - self.context = zmq.asyncio.Context() + if hasattr(self, "stream"): + self.stream.close() # pylint: disable=E0203 self.socket.close() # pylint: disable=E0203 + del self.stream del self.socket self.socket = self.context.socket(zmq.REQ) - self.socket.setsockopt(zmq.LINGER, -1) # socket options if hasattr(zmq, "RECONNECT_IVL_MAX"): @@ -679,6 +691,9 @@ class AsyncReqMessageClient: self.socket.setsockopt(zmq.IPV4ONLY, 0) self.socket.linger = self.linger self.socket.connect(self.addr) + self.stream = zmq.eventloop.zmqstream.ZMQStream( + self.socket, io_loop=self.io_loop + ) def timeout_message(self, message): """ @@ -693,35 +708,44 @@ class AsyncReqMessageClient: if future is not None: future.set_exception(SaltReqTimeoutError("Message timed out")) - async def _send_recv(self, message): - message = salt.payload.dumps(message) - await self.sending.acquire() - try: - await self.socket.send(message) - ret = await self.socket.recv() - except zmq.error.ZMQError: - self.close() - await self.connect() - await self.socket.send(message) - ret = await self.socket.recv() - finally: - self.sending.release() - return salt.payload.loads(ret) - - async def send(self, message, timeout=None, callback=None): + @tornado.gen.coroutine + def send(self, message, timeout=None, callback=None): """ Return a future which will be completed when the message has a response """ - if not self.socket: - await self.connect() - try: - response = await asyncio.wait_for(self._send_recv(message), timeout=timeout) - if callback: - callback(response) - return response - except TimeoutError: - self.close() - raise + future = tornado.concurrent.Future() + + message = salt.payload.dumps(message) + + if callback is not None: + + def handle_future(future): + response = future.result() + self.io_loop.add_callback(callback, response) + + future.add_done_callback(handle_future) + + # Add this future to the mapping + self.send_future_map[message] = future + + if self.opts.get("detect_mode") is True: + timeout = 1 + + if timeout is not None: + send_timeout = self.io_loop.call_later( + timeout, self.timeout_message, message + ) + + def mark_future(msg): + if not future.done(): + data = salt.payload.loads(msg[0]) + future.set_result(data) + self.send_future_map.pop(message) + + self.stream.on_recv(mark_future) + yield self.stream.send(message) + recv = yield future + raise tornado.gen.Return(recv) class ZeroMQSocketMonitor: