This commit is contained in:
Daniel A. Wozniak 2023-06-16 02:03:24 -07:00 committed by Gareth J. Greenaway
parent 149502ebfc
commit cec5aa517d
3 changed files with 42 additions and 29 deletions

View file

@ -285,7 +285,7 @@ def get_proc_dir(cachedir, **kwargs):
mode = kwargs.pop("mode", None)
if mode is None:
mode = {}
eode = {}
else:
mode = {"mode": mode}
@ -1068,7 +1068,7 @@ class MinionManager(MinionBase):
ipc_publisher.pull_uri = epull_sock_path
ipc_publisher.publish_daemon(ipc_publisher.publish_payload)
thread = threading.Thread(target=target)
thread = salt.utils.process.Process(target=target)
thread.start()
self.event = salt.utils.event.get_event(
"minion", opts=self.opts, io_loop=self.io_loop

View file

@ -234,6 +234,7 @@ class PublishClient(salt.transport.base.PublishClient):
**opts,
)
self.connect_called = False
self.callbacks = {}
def close(self):
if self._closing is True:
@ -325,17 +326,21 @@ class PublishClient(salt.transport.base.PublishClient):
# the decoded payload to 'ret' and resume operation
return payload
def on_recv(self, callback):
"""
Register a callback for received messages (that we didn't initiate)
#@property
#def stream(self):
# """
# Return the current zmqstream, creating one if necessary
# """
# if not hasattr(self, "_stream"):
# self._stream = zmq.eventloop.zmqstream.ZMQStream(
# self._socket, io_loop=self.io_loop
# )
# return self._stream
:param func callback: A function which should be called when data is received
"""
running = asyncio.Event()
running.set()
#def on_recv(self, callback):
# """
# Register a callback for received messages (that we didn't initiate)
async def recv(self, timeout=None):
return await self._socket.recv()
async def recv(self, timeout=None):
log.error("SOCK %r %s", self._socket, self.connect_called)
@ -352,9 +357,20 @@ class PublishClient(salt.transport.base.PublishClient):
else:
return await self._socket.recv()
@tornado.gen.coroutine
def send(self, msg):
self.stream.send(msg, noblock=True)
async def send(self, msg):
return
await self._socket.send(msg)
def on_recv(self, callback):
"""
Register a callback for received messages (that we didn't initiate)
:param func callback: A function which should be called when data is received
"""
running = asyncio.Event()
running.set()
async def consume(running):
while running.is_set():
try:
@ -371,13 +387,9 @@ class PublishClient(salt.transport.base.PublishClient):
log.error("Exception while running callback", exc_info=True)
log.debug("Callback done %r", callback)
task = self.io_loop.create_task(consume(running))
task = self.io_loop.spawn_callback(consume, running)
self.callbacks[callback] = running, task
async def send(self, msg):
await self._socket.send(msg)
class RequestServer(salt.transport.base.DaemonizedRequestServer):
def __init__(self, opts): # pylint: disable=W0231
self.opts = opts
@ -773,7 +785,9 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
self.pull_uri = "ipc://{}".format(
os.path.join(self.opts["sock_dir"], "publish_pull.ipc")
)
self.pub_uri = "tcp://{interface}:{publish_port}".format(**self.opts)
interface = self.opts.get("interface", "127.0.0.1")
publish_port = self.opts.get("publish_port", 4560)
self.pub_uri = f"tcp://{interface}:{publish_port}"
def connect(self):
return tornado.gen.sleep(5)
@ -788,7 +802,8 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
This method represents the Publish Daemon process. It is intended to be
run in a thread or process as it creates and runs an it's own ioloop.
"""
ioloop = tornado.ioloop.IOLoop.current()
ioloop = tornado.ioloop.IOLoop()
ioloop.asyncio_loop.set_debug(True)
self.io_loop = ioloop
context = zmq.asyncio.Context()
pub_sock = context.socket(zmq.PUB)
@ -999,11 +1014,6 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
if not self.pub_sock:
self.pub_connect()
log.error("Payload %r", payload)
# if "noserial" not in kwargs:
# serialized = salt.payload.dumps(payload)
# log.error("Serialized %r", serialized)
# self.pub_sock.send(serialized)
# else:
self.pub_sock.send(payload)
log.debug("Sent payload to publish daemon.")

View file

@ -722,6 +722,7 @@ class SaltEvent:
if not self.cpub:
if not self.connect_pub():
return None
log.error("GET EVENT NOBLOCK %r", self.subscriber)
raw = self.subscriber.recv(timeout=0)
if raw is None:
return None
@ -738,6 +739,7 @@ class SaltEvent:
if not self.cpub:
if not self.connect_pub():
return None
log.error("GET EVENT BLOCK %r", self.subscriber)
raw = self.subscriber.recv(timeout=None)
if raw is None:
return None
@ -858,12 +860,12 @@ class SaltEvent:
]
)
msg = salt.utils.stringutils.to_bytes(event, "utf-8")
log.error("FIRE EVENT %r", msg)
if self._run_io_loop_sync:
log.error("FIRE EVENT A %r %r", msg, self.pusher)
with salt.utils.asynchronous.current_ioloop(self.io_loop):
try:
#self.pusher.send(msg)
self.pusher.publish(msg, noserial=True)
self.pusher.publish(msg)
except Exception as exc: # pylint: disable=broad-except
log.debug(
"Publisher send failed with exception: %s",
@ -872,7 +874,8 @@ class SaltEvent:
)
raise
else:
self.pusher.publish(msg, noserial=True)
log.error("FIRE EVENT B %r %r", msg, self.pusher)
self.pusher.publish(msg)
#self.io_loop.spawn_callback(self.pusher.send, msg)
return True