mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Revert change to zmq transport
This commit is contained in:
parent
10b714cf55
commit
e39246b1ef
1 changed files with 55 additions and 17 deletions
|
@ -342,28 +342,66 @@ class PublishClient(salt.transport.base.PublishClient):
|
|||
# raise Exception("Send not supported")
|
||||
# await self._socket.send(msg)
|
||||
|
||||
async def on_recv_handler(self, callback):
|
||||
while not self._socket:
|
||||
# Retry quickly, we may want to increase this if it's hogging cpu.
|
||||
await asyncio.sleep(0.003)
|
||||
while True:
|
||||
msg = await self.recv()
|
||||
if msg:
|
||||
await callback(msg)
|
||||
#async def on_recv_handler(self, callback):
|
||||
# while not self._socket:
|
||||
# # Retry quickly, we may want to increase this if it's hogging cpu.
|
||||
# await asyncio.sleep(0.003)
|
||||
# while True:
|
||||
# msg = await self.recv()
|
||||
# if msg:
|
||||
# await callback(msg)
|
||||
|
||||
#def on_recv(self, callback):
|
||||
# """
|
||||
# Register a callback for received messages (that we didn't initiate)
|
||||
# """
|
||||
# if self.on_recv_task:
|
||||
# # XXX: We are not awaiting this canceled task. This still needs to
|
||||
# # be addressed.
|
||||
# self.on_recv_task.cancel()
|
||||
# if callback is None:
|
||||
# self.on_recv_task = None
|
||||
# else:
|
||||
# self.on_recv_task = asyncio.create_task(self.on_recv_handler(callback))
|
||||
|
||||
def on_recv(self, callback):
|
||||
|
||||
"""
|
||||
Register a callback for received messages (that we didn't initiate)
|
||||
"""
|
||||
if self.on_recv_task:
|
||||
# XXX: We are not awaiting this canceled task. This still needs to
|
||||
# be addressed.
|
||||
self.on_recv_task.cancel()
|
||||
if callback is None:
|
||||
self.on_recv_task = None
|
||||
else:
|
||||
self.on_recv_task = asyncio.create_task(self.on_recv_handler(callback))
|
||||
|
||||
:param func callback: A function which should be called when data is received
|
||||
"""
|
||||
if callback is None:
|
||||
callbacks = self.callbacks
|
||||
self.callbacks = {}
|
||||
for callback, (running, task) in callbacks.items():
|
||||
running.clear()
|
||||
return
|
||||
|
||||
running = asyncio.Event()
|
||||
running.set()
|
||||
|
||||
async def consume(running):
|
||||
try:
|
||||
while running.is_set():
|
||||
try:
|
||||
msg = await self.recv(timeout=None)
|
||||
except zmq.error.ZMQError as exc:
|
||||
# We've disconnected just die
|
||||
break
|
||||
if msg:
|
||||
try:
|
||||
await callback(msg)
|
||||
except Exception: # pylint: disable=broad-except
|
||||
log.error("Exception while running callback", exc_info=True)
|
||||
# log.debug("Callback done %r", callback)
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
log.error(
|
||||
"Exception while consuming%s %s", self.uri, exc, exc_info=True
|
||||
)
|
||||
|
||||
task = self.io_loop.spawn_callback(consume, running)
|
||||
self.callbacks[callback] = running, task
|
||||
|
||||
|
||||
class RequestServer(salt.transport.base.DaemonizedRequestServer):
|
||||
|
|
Loading…
Add table
Reference in a new issue