From cec5aa517d75344d9506a8e9960341c354f595f0 Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Fri, 16 Jun 2023 02:03:24 -0700 Subject: [PATCH] extras --- salt/minion.py | 4 +-- salt/transport/zeromq.py | 58 +++++++++++++++++++++++----------------- salt/utils/event.py | 9 ++++--- 3 files changed, 42 insertions(+), 29 deletions(-) diff --git a/salt/minion.py b/salt/minion.py index fea37abe23d..cd838667c1f 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -285,7 +285,7 @@ def get_proc_dir(cachedir, **kwargs): mode = kwargs.pop("mode", None) if mode is None: - mode = {} + eode = {} else: mode = {"mode": mode} @@ -1068,7 +1068,7 @@ class MinionManager(MinionBase): ipc_publisher.pull_uri = epull_sock_path ipc_publisher.publish_daemon(ipc_publisher.publish_payload) - thread = threading.Thread(target=target) + thread = salt.utils.process.Process(target=target) thread.start() self.event = salt.utils.event.get_event( "minion", opts=self.opts, io_loop=self.io_loop diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py index d84f682f2ee..de8e79a17b4 100644 --- a/salt/transport/zeromq.py +++ b/salt/transport/zeromq.py @@ -234,6 +234,7 @@ class PublishClient(salt.transport.base.PublishClient): **opts, ) self.connect_called = False + self.callbacks = {} def close(self): if self._closing is True: @@ -325,17 +326,21 @@ class PublishClient(salt.transport.base.PublishClient): # the decoded payload to 'ret' and resume operation return payload - def on_recv(self, callback): - """ - Register a callback for received messages (that we didn't initiate) + #@property + #def stream(self): + # """ + # Return the current zmqstream, creating one if necessary + # """ + # if not hasattr(self, "_stream"): + # self._stream = zmq.eventloop.zmqstream.ZMQStream( + # self._socket, io_loop=self.io_loop + # ) + # return self._stream - :param func callback: A function which should be called when data is received - """ - running = asyncio.Event() - running.set() + #def on_recv(self, callback): + # """ + # Register a callback for received messages (that we didn't initiate) - async def recv(self, timeout=None): - return await self._socket.recv() async def recv(self, timeout=None): log.error("SOCK %r %s", self._socket, self.connect_called) @@ -352,9 +357,20 @@ class PublishClient(salt.transport.base.PublishClient): else: return await self._socket.recv() - @tornado.gen.coroutine - def send(self, msg): - self.stream.send(msg, noblock=True) + async def send(self, msg): + return + await self._socket.send(msg) + + def on_recv(self, callback): + + """ + Register a callback for received messages (that we didn't initiate) + + :param func callback: A function which should be called when data is received + """ + running = asyncio.Event() + running.set() + async def consume(running): while running.is_set(): try: @@ -371,13 +387,9 @@ class PublishClient(salt.transport.base.PublishClient): log.error("Exception while running callback", exc_info=True) log.debug("Callback done %r", callback) - task = self.io_loop.create_task(consume(running)) + task = self.io_loop.spawn_callback(consume, running) self.callbacks[callback] = running, task - async def send(self, msg): - await self._socket.send(msg) - - class RequestServer(salt.transport.base.DaemonizedRequestServer): def __init__(self, opts): # pylint: disable=W0231 self.opts = opts @@ -773,7 +785,9 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): self.pull_uri = "ipc://{}".format( os.path.join(self.opts["sock_dir"], "publish_pull.ipc") ) - self.pub_uri = "tcp://{interface}:{publish_port}".format(**self.opts) + interface = self.opts.get("interface", "127.0.0.1") + publish_port = self.opts.get("publish_port", 4560) + self.pub_uri = f"tcp://{interface}:{publish_port}" def connect(self): return tornado.gen.sleep(5) @@ -788,7 +802,8 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): This method represents the Publish Daemon process. It is intended to be run in a thread or process as it creates and runs an it's own ioloop. """ - ioloop = tornado.ioloop.IOLoop.current() + ioloop = tornado.ioloop.IOLoop() + ioloop.asyncio_loop.set_debug(True) self.io_loop = ioloop context = zmq.asyncio.Context() pub_sock = context.socket(zmq.PUB) @@ -999,11 +1014,6 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): if not self.pub_sock: self.pub_connect() log.error("Payload %r", payload) -# if "noserial" not in kwargs: -# serialized = salt.payload.dumps(payload) -# log.error("Serialized %r", serialized) -# self.pub_sock.send(serialized) -# else: self.pub_sock.send(payload) log.debug("Sent payload to publish daemon.") diff --git a/salt/utils/event.py b/salt/utils/event.py index a995ae1ec30..18e423600cc 100644 --- a/salt/utils/event.py +++ b/salt/utils/event.py @@ -722,6 +722,7 @@ class SaltEvent: if not self.cpub: if not self.connect_pub(): return None + log.error("GET EVENT NOBLOCK %r", self.subscriber) raw = self.subscriber.recv(timeout=0) if raw is None: return None @@ -738,6 +739,7 @@ class SaltEvent: if not self.cpub: if not self.connect_pub(): return None + log.error("GET EVENT BLOCK %r", self.subscriber) raw = self.subscriber.recv(timeout=None) if raw is None: return None @@ -858,12 +860,12 @@ class SaltEvent: ] ) msg = salt.utils.stringutils.to_bytes(event, "utf-8") - log.error("FIRE EVENT %r", msg) if self._run_io_loop_sync: + log.error("FIRE EVENT A %r %r", msg, self.pusher) with salt.utils.asynchronous.current_ioloop(self.io_loop): try: #self.pusher.send(msg) - self.pusher.publish(msg, noserial=True) + self.pusher.publish(msg) except Exception as exc: # pylint: disable=broad-except log.debug( "Publisher send failed with exception: %s", @@ -872,7 +874,8 @@ class SaltEvent: ) raise else: - self.pusher.publish(msg, noserial=True) + log.error("FIRE EVENT B %r %r", msg, self.pusher) + self.pusher.publish(msg) #self.io_loop.spawn_callback(self.pusher.send, msg) return True