mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Revert changes to un-used deprecated class
This commit is contained in:
parent
528134ec18
commit
3a84dadfd2
1 changed files with 70 additions and 46 deletions
|
@ -598,7 +598,6 @@ def _set_tcp_keepalive(zmq_socket, opts):
|
|||
zmq_socket.setsockopt(zmq.TCP_KEEPALIVE_INTVL, opts["tcp_keepalive_intvl"])
|
||||
|
||||
|
||||
# TODO: unit tests!
|
||||
class AsyncReqMessageClient:
|
||||
"""
|
||||
This class wraps the underlying zeromq REQ socket and gives a future-based
|
||||
|
@ -629,42 +628,55 @@ class AsyncReqMessageClient:
|
|||
self.io_loop = tornado.ioloop.IOLoop.current()
|
||||
else:
|
||||
self.io_loop = io_loop
|
||||
self.context = None
|
||||
|
||||
self.context = zmq.Context()
|
||||
|
||||
self.send_queue = []
|
||||
# mapping of message -> future
|
||||
self.send_future_map = {}
|
||||
|
||||
self._closing = False
|
||||
self.socket = None
|
||||
self.sending = asyncio.Lock()
|
||||
|
||||
async def connect(self):
|
||||
if self.socket is None:
|
||||
# wire up sockets
|
||||
self._init_socket()
|
||||
def connect(self):
|
||||
# wire up sockets
|
||||
self._init_socket()
|
||||
|
||||
# TODO: timeout all in-flight sessions, or error
|
||||
def close(self):
|
||||
if self._closing:
|
||||
try:
|
||||
if self._closing:
|
||||
return
|
||||
except AttributeError:
|
||||
# We must have been called from __del__
|
||||
# The python interpreter has nuked most attributes already
|
||||
return
|
||||
self._closing = True
|
||||
if self.socket:
|
||||
self.socket.close()
|
||||
self.socket = None
|
||||
if self.context.closed is False:
|
||||
# This hangs if closing the stream causes an import error
|
||||
self.context.term()
|
||||
self.context = None
|
||||
else:
|
||||
self._closing = True
|
||||
if hasattr(self, "stream") and self.stream is not None:
|
||||
if ZMQ_VERSION_INFO < (14, 3, 0):
|
||||
# stream.close() doesn't work properly on pyzmq < 14.3.0
|
||||
if self.stream.socket:
|
||||
self.stream.socket.close()
|
||||
self.stream.io_loop.remove_handler(self.stream.socket)
|
||||
# set this to None, more hacks for messed up pyzmq
|
||||
self.stream.socket = None
|
||||
self.socket.close()
|
||||
else:
|
||||
self.stream.close(1)
|
||||
self.socket = None
|
||||
self.stream = None
|
||||
if self.context.closed is False:
|
||||
# This hangs if closing the stream causes an import error
|
||||
self.context.term()
|
||||
|
||||
def _init_socket(self):
|
||||
if self.socket is not None:
|
||||
self.context = zmq.asyncio.Context()
|
||||
if hasattr(self, "stream"):
|
||||
self.stream.close() # pylint: disable=E0203
|
||||
self.socket.close() # pylint: disable=E0203
|
||||
del self.stream
|
||||
del self.socket
|
||||
|
||||
self.socket = self.context.socket(zmq.REQ)
|
||||
self.socket.setsockopt(zmq.LINGER, -1)
|
||||
|
||||
# socket options
|
||||
if hasattr(zmq, "RECONNECT_IVL_MAX"):
|
||||
|
@ -679,6 +691,9 @@ class AsyncReqMessageClient:
|
|||
self.socket.setsockopt(zmq.IPV4ONLY, 0)
|
||||
self.socket.linger = self.linger
|
||||
self.socket.connect(self.addr)
|
||||
self.stream = zmq.eventloop.zmqstream.ZMQStream(
|
||||
self.socket, io_loop=self.io_loop
|
||||
)
|
||||
|
||||
def timeout_message(self, message):
|
||||
"""
|
||||
|
@ -693,35 +708,44 @@ class AsyncReqMessageClient:
|
|||
if future is not None:
|
||||
future.set_exception(SaltReqTimeoutError("Message timed out"))
|
||||
|
||||
async def _send_recv(self, message):
|
||||
message = salt.payload.dumps(message)
|
||||
await self.sending.acquire()
|
||||
try:
|
||||
await self.socket.send(message)
|
||||
ret = await self.socket.recv()
|
||||
except zmq.error.ZMQError:
|
||||
self.close()
|
||||
await self.connect()
|
||||
await self.socket.send(message)
|
||||
ret = await self.socket.recv()
|
||||
finally:
|
||||
self.sending.release()
|
||||
return salt.payload.loads(ret)
|
||||
|
||||
async def send(self, message, timeout=None, callback=None):
|
||||
@tornado.gen.coroutine
|
||||
def send(self, message, timeout=None, callback=None):
|
||||
"""
|
||||
Return a future which will be completed when the message has a response
|
||||
"""
|
||||
if not self.socket:
|
||||
await self.connect()
|
||||
try:
|
||||
response = await asyncio.wait_for(self._send_recv(message), timeout=timeout)
|
||||
if callback:
|
||||
callback(response)
|
||||
return response
|
||||
except TimeoutError:
|
||||
self.close()
|
||||
raise
|
||||
future = tornado.concurrent.Future()
|
||||
|
||||
message = salt.payload.dumps(message)
|
||||
|
||||
if callback is not None:
|
||||
|
||||
def handle_future(future):
|
||||
response = future.result()
|
||||
self.io_loop.add_callback(callback, response)
|
||||
|
||||
future.add_done_callback(handle_future)
|
||||
|
||||
# Add this future to the mapping
|
||||
self.send_future_map[message] = future
|
||||
|
||||
if self.opts.get("detect_mode") is True:
|
||||
timeout = 1
|
||||
|
||||
if timeout is not None:
|
||||
send_timeout = self.io_loop.call_later(
|
||||
timeout, self.timeout_message, message
|
||||
)
|
||||
|
||||
def mark_future(msg):
|
||||
if not future.done():
|
||||
data = salt.payload.loads(msg[0])
|
||||
future.set_result(data)
|
||||
self.send_future_map.pop(message)
|
||||
|
||||
self.stream.on_recv(mark_future)
|
||||
yield self.stream.send(message)
|
||||
recv = yield future
|
||||
raise tornado.gen.Return(recv)
|
||||
|
||||
|
||||
class ZeroMQSocketMonitor:
|
||||
|
|
Loading…
Add table
Reference in a new issue