Initial commit of large event fix

This commit is contained in:
Daniel A. Wozniak 2024-12-12 14:13:18 -07:00 committed by Daniel Wozniak
parent 66e46a32d8
commit f6aa6ff807
2 changed files with 177 additions and 91 deletions

View file

@ -17,6 +17,7 @@ import threading
import time
import traceback
import types
import uuid
import salt
import salt.beacons
@ -1072,7 +1073,7 @@ class MinionManager(MinionBase):
@salt.ext.tornado.gen.coroutine
def handle_event(self, package):
for minion in self.minions:
minion.handle_event(package)
yield minion.handle_event(package)
def _create_minion_object(
self,
@ -1397,13 +1398,8 @@ class Minion(MinionBase):
self.req_channel = salt.channel.client.AsyncReqChannel.factory(
self.opts, io_loop=self.io_loop
)
if hasattr(
self.req_channel, "connect"
): # TODO: consider generalizing this for all channels
log.debug("Connecting minion's long-running req channel")
yield self.req_channel.connect()
log.debug("Connecting minion's long-running req channel")
yield self.req_channel.connect()
yield self._post_master_init(master)
@salt.ext.tornado.gen.coroutine
@ -1626,6 +1622,7 @@ class Minion(MinionBase):
return functions, returners, errors, executors
def _send_req_sync(self, load, timeout):
# XXX: Signing should happen in RequestChannel to be fixed in 3008
if self.opts["minion_sign_messages"]:
log.trace("Signing event to be published onto the bus.")
minion_privkey_path = os.path.join(self.opts["pki_dir"], "minion.pem")
@ -1633,18 +1630,25 @@ class Minion(MinionBase):
minion_privkey_path, salt.serializers.msgpack.serialize(load)
)
load["sig"] = sig
with salt.utils.event.get_event(
"minion", opts=self.opts, listen=False
) as event:
return event.fire_event(
with salt.utils.event.get_event("minion", opts=self.opts, listen=True) as event:
request_id = str(uuid.uuid4())
log.debug("Send request to main id=%s", request_id)
event.fire_event(
load,
f"__master_req_channel_payload/{self.opts['master']}",
f"__master_req_channel_payload/{request_id}/{self.opts['master']}",
timeout=timeout,
)
ret = event.get_event(
tag=f"__master_req_channel_return/{request_id}",
wait=timeout,
)
log.trace("Reply from main %s", request_id)
return ret["ret"]
@salt.ext.tornado.gen.coroutine
def _send_req_async(self, load, timeout):
# XXX: Signing should happen in RequestChannel to be fixed in 3008
# XXX: This is only used by syndic
if self.opts["minion_sign_messages"]:
log.trace("Signing event to be published onto the bus.")
minion_privkey_path = os.path.join(self.opts["pki_dir"], "minion.pem")
@ -1652,31 +1656,52 @@ class Minion(MinionBase):
minion_privkey_path, salt.serializers.msgpack.serialize(load)
)
load["sig"] = sig
with salt.utils.event.get_event(
"minion", opts=self.opts, listen=False
) as event:
ret = yield event.fire_event_async(
with salt.utils.event.get_event("minion", opts=self.opts, listen=True) as event:
request_id = str(uuid.uuid4())
log.debug(
"Sending req to main thread. id=%s",
request_id,
)
yield event.fire_event_async(
load,
f"__master_req_channel_payload/{self.opts['master']}",
f"__master_req_channel_payload/{request_id}/{self.opts['master']}",
timeout=timeout,
)
raise salt.ext.tornado.gen.Return(ret)
start = time.time()
while time.time() - start < timeout:
ret = event.get_event(
tag=f"__master_req_channel_return/{request_id}", no_block=True
)
if ret:
break
yield salt.ext.tornado.gen.sleep(0.3)
else:
raise TimeoutError("Did not recieve return event")
log.debug("Recieved request reply %s %r", request_id, ret)
raise salt.ext.tornado.gen.Return(ret["ret"])
def _fire_master(
self,
data=None,
tag=None,
events=None,
pretag=None,
timeout=60,
sync=True,
timeout_handler=None,
include_startup_grains=False,
@salt.ext.tornado.gen.coroutine
def _send_req_async_main(self, load, timeout):
"""
Send a request to the master's request server. To be called from the
top level process in the main thread only. Worker threads and
processess should call _send_req_sync or _send_req_async as nessecery.
"""
if self.opts["minion_sign_messages"]:
log.trace("Signing event to be published onto the bus.")
minion_privkey_path = os.path.join(self.opts["pki_dir"], "minion.pem")
sig = salt.crypt.sign_message(
minion_privkey_path, salt.serializers.msgpack.serialize(load)
)
load["sig"] = sig
ret = yield self.req_channel.send(
load, timeout=timeout, tries=self.opts["return_retry_tries"]
)
raise salt.ext.tornado.gen.Return(ret)
def _fire_master_prepare(
self, data, tag, events, pretag, include_startup_grains=False
):
"""
Fire an event on the master, or drop message if unable to send.
"""
load = {
"id": self.opts["id"],
"cmd": "_minion_event",
@ -1701,35 +1726,62 @@ class Minion(MinionBase):
if k in self.opts["start_event_grains"]
}
load["grains"] = grains_to_add
return load
if sync:
try:
self._send_req_sync(load, timeout)
except salt.exceptions.SaltReqTimeoutError:
@salt.ext.tornado.gen.coroutine
def _fire_master_main(
self,
data=None,
tag=None,
events=None,
pretag=None,
timeout=60,
timeout_handler=None,
include_startup_grains=False,
):
load = self._fire_master_prepare(
data, tag, events, pretag, include_startup_grains
)
if timeout_handler is None:
def handle_timeout(*_):
log.info(
"fire_master failed: master could not be contacted. Request timed"
" out."
"fire_master failed: master could not be contacted. Request"
" timed out."
)
return False
except Exception: # pylint: disable=broad-except
log.info("fire_master failed: %s", traceback.format_exc())
return False
else:
if timeout_handler is None:
return True
def handle_timeout(*_):
log.info(
"fire_master failed: master could not be contacted. Request"
" timed out."
)
return True
timeout_handler = handle_timeout
timeout_handler = handle_timeout
yield self._send_req_async_main(load, timeout)
with salt.ext.tornado.stack_context.ExceptionStackContext(timeout_handler):
# pylint: disable=unexpected-keyword-arg
self._send_req_async(load, timeout, callback=lambda f: None)
# pylint: enable=unexpected-keyword-arg
def _fire_master(
self,
data=None,
tag=None,
events=None,
pretag=None,
timeout=60,
timeout_handler=None,
include_startup_grains=False,
):
"""
Fire an event on the master, or drop message if unable to send.
"""
load = self._fire_master_prepare(
data, tag, events, pretag, include_startup_grains
)
try:
self._send_req_sync(load, timeout)
except salt.exceptions.SaltReqTimeoutError:
log.info(
"fire_master failed: master could not be contacted. Request timed"
" out."
)
return False
except Exception: # pylint: disable=broad-except
log.info("fire_master failed: %s", traceback.format_exc())
return False
return True
@salt.ext.tornado.gen.coroutine
@ -2228,10 +2280,7 @@ class Minion(MinionBase):
except Exception as exc: # pylint: disable=broad-except
log.error("The return failed for job %s: %s", data["jid"], exc)
def _return_pub(self, ret, ret_cmd="_return", timeout=60, sync=True):
"""
Return the data from the executed command to the master server
"""
def _prepare_return_pub(self, ret, ret_cmd="_return"):
jid = ret.get("jid", ret.get("__jid__"))
fun = ret.get("fun", ret.get("__fun__"))
if self.opts["multiprocessing"]:
@ -2285,7 +2334,12 @@ class Minion(MinionBase):
if ret["jid"] == "req":
ret["jid"] = salt.utils.jid.gen_jid(self.opts)
salt.utils.minion.cache_jobs(self.opts, ret["jid"], ret)
return load
@salt.ext.tornado.gen.coroutine
def _return_pub_main(self, ret, ret_cmd="_return", timeout=60):
jid = ret.get("jid", ret.get("__jid__"))
load = self._prepare_return_pub(ret, ret_cmd)
if not self.opts["pub_ret"]:
return ""
@ -2299,20 +2353,38 @@ class Minion(MinionBase):
)
return True
if sync:
try:
ret_val = self._send_req_sync(load, timeout=timeout)
except SaltReqTimeoutError:
timeout_handler()
return ""
else:
with salt.ext.tornado.stack_context.ExceptionStackContext(timeout_handler):
# pylint: disable=unexpected-keyword-arg
ret_val = self._send_req_async(
load, timeout=timeout, callback=lambda f: None
)
# pylint: enable=unexpected-keyword-arg
try:
ret_val = yield self._send_req_async_main(load, timeout=timeout)
except SaltReqTimeoutError:
timeout_handler()
ret_val = ""
log.trace("ret_val = %s", ret_val) # pylint: disable=no-member
raise salt.ext.tornado.gen.Return(ret_val)
def _return_pub(self, ret, ret_cmd="_return", timeout=60):
"""
Return the data from the executed command to the master server
"""
jid = ret.get("jid", ret.get("__jid__"))
load = self._prepare_return_pub(ret, ret_cmd)
if not self.opts["pub_ret"]:
return ""
def timeout_handler(*_):
log.warning(
"The minion failed to return the job information for job %s. "
"This is often due to the master being shut down or "
"overloaded. If the master is running, consider increasing "
"the worker_threads value.",
jid,
)
return True
try:
ret_val = self._send_req_sync(load, timeout=timeout)
except SaltReqTimeoutError:
timeout_handler()
return ""
log.trace("ret_val = %s", ret_val) # pylint: disable=no-member
return ret_val
@ -2320,6 +2392,9 @@ class Minion(MinionBase):
"""
Return the data from the executed command to the master server
"""
# XXX: This is only used by syndic and should be moved to the Syndic class.
# XXX: The sync flag is only called with sync=False. Which also means
# deprecating sync means we can remove Minion._send_req_async.
if not isinstance(rets, list):
rets = [rets]
jids = {}
@ -2460,13 +2535,13 @@ class Minion(MinionBase):
# Send an event to the master that the minion is live
if self.opts["enable_legacy_startup_events"]:
# Old style event. Defaults to False in 3001 release.
self._fire_master(
self._fire_master_main(
"Minion {} started at {}".format(self.opts["id"], time.asctime()),
"minion_start",
include_startup_grains=include_grains,
)
# send name spaced event
self._fire_master(
self._fire_master_main(
"Minion {} started at {}".format(self.opts["id"], time.asctime()),
tagify([self.opts["id"], "start"], "minion"),
include_startup_grains=include_grains,
@ -2749,21 +2824,35 @@ class Minion(MinionBase):
notify=data.get("notify", False),
)
elif tag.startswith("__master_req_channel_payload"):
job_master = tag.rsplit("/", 1)[1]
request_id, job_master = tag.rsplit("/", 2)[1:]
if job_master == self.opts["master"]:
ret = None
try:
yield _minion.req_channel.send(
ret = yield _minion.req_channel.send(
data,
timeout=_minion._return_retry_timer(),
tries=_minion.opts["return_retry_tries"],
)
except salt.exceptions.SaltReqTimeoutError:
log.error("Timeout encountered while sending %r request", data)
log.error(
"Timeout encountered while sending %r request. id=%s",
data,
request_id,
)
raise salt.ext.tornado.gen.Return()
with salt.utils.event.get_event(
"minion", opts=self.opts, listen=False
) as event:
yield event.fire_event_async(
{"ret": ret},
f"__master_req_channel_return/{request_id}",
)
else:
log.debug(
"Skipping req for other master: cmd=%s master=%s",
"Skipping req for other master: cmd=%s master=%s id=%s",
data["cmd"],
job_master,
request_id,
)
elif tag.startswith("pillar_refresh"):
yield _minion.pillar_refresh(
@ -2792,12 +2881,11 @@ class Minion(MinionBase):
elif tag.startswith("fire_master"):
if self.connected:
log.debug("Forwarding master event tag=%s", data["tag"])
self._fire_master(
yield self._fire_master_main(
data["data"],
data["tag"],
data["events"],
data["pretag"],
sync=False,
)
elif tag.startswith(master_event(type="disconnected")) or tag.startswith(
master_event(type="failback")
@ -2865,6 +2953,7 @@ class Minion(MinionBase):
self.req_channel = salt.channel.client.AsyncReqChannel.factory(
self.opts, io_loop=self.io_loop
)
yield self.req_channel.connect()
# put the current schedule into the new loaders
self.opts["schedule"] = self.schedule.option("schedule")
@ -2954,11 +3043,11 @@ class Minion(MinionBase):
1
],
)
self._return_pub(data, ret_cmd="_return", sync=False)
yield self._return_pub_main(data, ret_cmd="_return")
elif tag.startswith("_salt_error"):
if self.connected:
log.debug("Forwarding salt error event tag=%s", tag)
self._fire_master(data, tag, sync=False)
yield self._fire_master_main(data, tag)
elif tag.startswith("salt/auth/creds"):
key = tuple(data["key"])
log.debug(
@ -2971,7 +3060,7 @@ class Minion(MinionBase):
elif tag.startswith("__beacons_return"):
if self.connected:
log.debug("Firing beacons to master")
self._fire_master(events=data["beacons"])
yield self._fire_master_main(events=data["beacons"])
def cleanup_subprocesses(self):
"""
@ -3169,10 +3258,9 @@ class Minion(MinionBase):
"minion is running under an init system."
)
self._fire_master(
self._fire_master_main(
"ping",
"minion_ping",
sync=False,
timeout_handler=ping_timeout_handler,
)
except Exception: # pylint: disable=broad-except
@ -3373,12 +3461,10 @@ class Syndic(Minion):
self._fire_master(
"Syndic {} started at {}".format(self.opts["id"], time.asctime()),
"syndic_start",
sync=False,
)
self._fire_master(
"Syndic {} started at {}".format(self.opts["id"], time.asctime()),
tagify([self.opts["id"], "start"], "syndic"),
sync=False,
)
# TODO: clean up docs
@ -3769,7 +3855,7 @@ class SyndicManager(MinionBase):
"events": events,
"pretag": tagify(self.opts["id"], base="syndic"),
"timeout": self._return_retry_timer(),
"sync": False,
"sync": True, # Sync needs to be true unless being called from a coroutine
},
)
if self.delayed:

View file

@ -50,7 +50,7 @@ class SyncWrapper:
close_methods=None,
loop_kwarg=None,
):
self.io_loop = salt.ext.tornado.ioloop.IOLoop()
self.io_loop = salt.ext.tornado.ioloop.IOLoop(make_current=False)
if args is None:
args = []
if kwargs is None: