Use closure to preserve message future

This commit is contained in:
Daniel A. Wozniak 2023-09-20 14:57:14 -07:00 committed by Pedro Algarvio
parent 84d58f717b
commit 529d27061e

View file

@ -519,6 +519,7 @@ class AsyncReqMessageClient:
self._closing = False
self._future = None
self._send_future_map = {}
self.lock = salt.ext.tornado.locks.Lock()
def connect(self):
@ -577,18 +578,6 @@ class AsyncReqMessageClient:
self.stream = zmq.eventloop.zmqstream.ZMQStream(
self.socket, io_loop=self.io_loop
)
self.stream.on_recv(self.handle_reply)
def timeout_message(self, future):
"""
Handle a message timeout by removing it from the sending queue
and informing the caller
:raises: SaltReqTimeoutError
"""
if self._future == future:
self._future = None
future.set_exception(SaltReqTimeoutError("Message timed out"))
@salt.ext.tornado.gen.coroutine
def send(self, message, timeout=None, callback=None):
@ -610,22 +599,26 @@ class AsyncReqMessageClient:
if self.opts.get("detect_mode") is True:
timeout = 1
def timeout_message(future):
if not future.done():
future.set_exception(SaltReqTimeoutError("Message timed out"))
if timeout is not None:
send_timeout = self.io_loop.call_later(
timeout, self.timeout_message, future
timeout, timeout_message, future
)
def mark_future(msg):
if not future.done():
data = salt.payload.loads(msg[0])
future.set_result(data)
with (yield self.lock.acquire()):
self._future = future
self.stream.on_recv(mark_future)
yield self.stream.send(message)
recv = yield future
raise salt.ext.tornado.gen.Return(recv)
def handle_reply(self, msg):
data = salt.payload.loads(msg[0])
future = self._future
self._future = None
future.set_result(data)
raise salt.ext.tornado.gen.Return(recv)
class ZeroMQSocketMonitor: