Publish new job directly to publisher and also to event bus

This commit is contained in:
Daniel A. Wozniak 2023-08-10 14:03:55 -07:00 committed by Gareth J. Greenaway
parent 39fa242248
commit 6ba1b273ba

View file

@ -2,6 +2,7 @@
This module contains all of the routines needed to set up a master server, this
involves preparing the three listeners and the workers needed by the master.
"""
import asyncio
import collections
import copy
import ctypes
@ -894,12 +895,12 @@ class EventMonitor(salt.utils.process.SignalHandlingProcess):
if tag.startswith("salt/job") and tag.endswith("/publish"):
data.pop("_stamp", None)
log.trace("Forward job event to publisher server: %r", data)
if not self.channels:
for transport, opts in iter_transport_opts(self.opts):
chan = salt.channel.server.PubServerChannel.factory(opts)
self.channels.append(chan)
for chan in self.channels:
yield chan.publish(data)
# if not self.channels:
# for transport, opts in iter_transport_opts(self.opts):
# chan = salt.channel.server.PubServerChannel.factory(opts)
# self.channels.append(chan)
# for chan in self.channels:
# yield chan.publish(data)
elif tag == "rotate_aes_key":
SMaster.rotate_secrets(self.opts, owner=False)
@ -2382,6 +2383,7 @@ class ClearFuncs(TransportMethods):
# Send it!
self.event.fire_event(payload, tagify([jid, "publish"], "job"))
self._send_ssh_pub(payload, ssh_minions=ssh_minions)
await self._send_pub(payload)
return {
"enc": "clear",
@ -2430,6 +2432,19 @@ class ClearFuncs(TransportMethods):
return {"error": msg}
return jid
async def _send_pub(self, load):
"""
Take a load and send it across the network to connected minions
"""
if not self.channels:
for transport, opts in iter_transport_opts(self.opts):
chan = salt.channel.server.PubServerChannel.factory(opts)
self.channels.append(chan)
tasks = set()
for chan in self.channels:
tasks.add(asyncio.create_task(chan.publish(load)))
await asyncio.gather(*tasks)
@property
def ssh_client(self):
if not hasattr(self, "_ssh_client"):