mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Forward publish through master evetn bus.
Forward new jobs being published to minions through the master's event bus. This will facilitate multiple masters being notified of publish events.
This commit is contained in:
parent
a4e60cae7e
commit
e407cb99ae
3 changed files with 59 additions and 20 deletions
|
@ -219,7 +219,13 @@ def access_keys(opts):
|
|||
acl_users.add(salt.utils.user.get_user())
|
||||
for user in acl_users:
|
||||
log.info("Preparing the %s key for local communication", user)
|
||||
key = mk_key(opts, user)
|
||||
|
||||
keyfile = os.path.join(opts["cachedir"], ".{}_key".format(user))
|
||||
if os.path.exists(keyfile):
|
||||
with salt.utils.files.fopen(keyfile, "r") as fp:
|
||||
key = salt.utils.stringutils.to_unicode(fp.read())
|
||||
else:
|
||||
key = mk_key(opts, user)
|
||||
if key is not None:
|
||||
keys[user] = key
|
||||
|
||||
|
@ -231,7 +237,11 @@ def access_keys(opts):
|
|||
if user not in keys and salt.utils.stringutils.check_whitelist_blacklist(
|
||||
user, whitelist=acl_users
|
||||
):
|
||||
keys[user] = mk_key(opts, user)
|
||||
if os.path.exists(keyfile):
|
||||
with salt.utils.files.fopen(keyfile, "r") as fp:
|
||||
keys[user] = salt.utils.stringutils.to_unicode(fp.read())
|
||||
else:
|
||||
keys[user] = mk_key(opts, user)
|
||||
log.profile("End pwd.getpwall() call in masterapi access_keys function")
|
||||
|
||||
return keys
|
||||
|
|
|
@ -2,7 +2,6 @@
|
|||
This module contains all of the routines needed to set up a master server, this
|
||||
involves preparing the three listeners and the workers needed by the master.
|
||||
"""
|
||||
import asyncio
|
||||
import collections
|
||||
import copy
|
||||
import ctypes
|
||||
|
@ -732,6 +731,12 @@ class Master(SMaster):
|
|||
name="EventPublisher",
|
||||
)
|
||||
|
||||
self.process_manager.add_process(
|
||||
PublishForwarder,
|
||||
args=[self.opts],
|
||||
name="PublishForwarder",
|
||||
)
|
||||
|
||||
if self.opts.get("reactor"):
|
||||
if isinstance(self.opts["engines"], list):
|
||||
rine = False
|
||||
|
@ -843,6 +848,44 @@ class Master(SMaster):
|
|||
sys.exit(0)
|
||||
|
||||
|
||||
class PublishForwarder(salt.utils.process.SignalHandlingProcess):
|
||||
"""
|
||||
Forward events from the event bus to the publish server.
|
||||
"""
|
||||
|
||||
def __init__(self, opts, channels=None, name="PublishForwarder"):
|
||||
super().__init__(name=name)
|
||||
self.opts = opts
|
||||
if channels is None:
|
||||
channels = []
|
||||
self.channels = channels
|
||||
|
||||
@tornado.gen.coroutine
|
||||
def handle_event(self, package):
|
||||
"""
|
||||
Event handler for publish forwarder
|
||||
"""
|
||||
tag, data = salt.utils.event.SaltEvent.unpack(package)
|
||||
log.error("event tag is %r", tag)
|
||||
if tag.startswith("salt/job") and tag.endswith("/publish"):
|
||||
log.info("Forward job event to publisher server: %r", package)
|
||||
if not self.channels:
|
||||
for transport, opts in iter_transport_opts(self.opts):
|
||||
chan = salt.channel.server.PubServerChannel.factory(opts)
|
||||
self.channels.append(chan)
|
||||
for chan in self.channels:
|
||||
yield chan.publish(data)
|
||||
|
||||
def run(self):
|
||||
io_loop = tornado.ioloop.IOLoop()
|
||||
with salt.utils.event.get_master_event(
|
||||
self.opts, self.opts["sock_dir"], io_loop=io_loop, listen=True
|
||||
) as event_bus:
|
||||
event_bus.subscribe("")
|
||||
event_bus.set_event_handler(self.handle_event)
|
||||
io_loop.start()
|
||||
|
||||
|
||||
class ReqServer(salt.utils.process.SignalHandlingProcess):
|
||||
"""
|
||||
Starts up the master request server, minions send results to this
|
||||
|
@ -2310,8 +2353,8 @@ class ClearFuncs(TransportMethods):
|
|||
payload["auth_list"] = auth_list
|
||||
|
||||
# Send it!
|
||||
self.event.fire_event(payload, tagify([jid, "publish"], "job"))
|
||||
self._send_ssh_pub(payload, ssh_minions=ssh_minions)
|
||||
await self._send_pub(payload)
|
||||
|
||||
return {
|
||||
"enc": "clear",
|
||||
|
@ -2360,19 +2403,6 @@ class ClearFuncs(TransportMethods):
|
|||
return {"error": msg}
|
||||
return jid
|
||||
|
||||
async def _send_pub(self, load):
|
||||
"""
|
||||
Take a load and send it across the network to connected minions
|
||||
"""
|
||||
if not self.channels:
|
||||
for transport, opts in iter_transport_opts(self.opts):
|
||||
chan = salt.channel.server.PubServerChannel.factory(opts)
|
||||
self.channels.append(chan)
|
||||
tasks = set()
|
||||
for chan in self.channels:
|
||||
tasks.add(asyncio.create_task(chan.publish(load)))
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
@property
|
||||
def ssh_client(self):
|
||||
if not hasattr(self, "_ssh_client"):
|
||||
|
|
|
@ -315,7 +315,6 @@ def test_clear_funcs_black(master_opts):
|
|||
"_prep_auth_info",
|
||||
"_prep_jid",
|
||||
"_prep_pub",
|
||||
"_send_pub",
|
||||
"_send_ssh_pub",
|
||||
"connect",
|
||||
"destroy",
|
||||
|
@ -333,8 +332,8 @@ def test_clear_funcs_black(master_opts):
|
|||
|
||||
|
||||
def test_clear_funcs_get_method(clear_funcs):
|
||||
assert getattr(clear_funcs, "_send_pub", None) is not None
|
||||
assert clear_funcs.get_method("_send_pub") is None
|
||||
assert getattr(clear_funcs, "_prep_pub", None) is not None
|
||||
assert clear_funcs.get_method("_prep_pub") is None
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
|
|
Loading…
Add table
Reference in a new issue