Only process events that are job returns

This commit is contained in:
Daniel A. Wozniak 2023-10-13 13:51:38 -07:00 committed by Daniel Wozniak
parent 4fc11cd53e
commit 334c5bac25

View file

@ -299,7 +299,7 @@ class LocalClient:
tgt_type=tgt_type,
timeout=timeout,
listen=listen,
**kwargs
**kwargs,
)
if "jid" in pub_data:
@ -365,7 +365,7 @@ class LocalClient:
jid="",
kwarg=None,
listen=False,
**kwargs
**kwargs,
):
"""
Asynchronously send a command to connected minions
@ -393,7 +393,7 @@ class LocalClient:
jid=jid,
timeout=self._get_timeout(timeout),
listen=listen,
**kwargs
**kwargs,
)
except SaltClientError:
# Re-raise error with specific message
@ -429,7 +429,7 @@ class LocalClient:
kwarg=None,
listen=True,
io_loop=None,
**kwargs
**kwargs,
):
"""
Asynchronously send a command to connected minions
@ -458,7 +458,7 @@ class LocalClient:
timeout=self._get_timeout(timeout),
io_loop=io_loop,
listen=listen,
**kwargs
**kwargs,
)
except SaltClientError:
# Re-raise error with specific message
@ -511,7 +511,7 @@ class LocalClient:
cli=False,
progress=False,
full_return=False,
**kwargs
**kwargs,
):
"""
Execute a command on a random subset of the targeted systems
@ -553,7 +553,7 @@ class LocalClient:
kwarg=kwarg,
progress=progress,
full_return=full_return,
**kwargs
**kwargs,
)
def cmd_batch(
@ -565,7 +565,7 @@ class LocalClient:
ret="",
kwarg=None,
batch="10%",
**kwargs
**kwargs,
):
"""
Iteratively execute a command on subsets of minions at a time
@ -641,7 +641,7 @@ class LocalClient:
jid="",
full_return=False,
kwarg=None,
**kwargs
**kwargs,
):
"""
Synchronously execute a command on targeted minions
@ -759,7 +759,7 @@ class LocalClient:
jid,
kwarg=kwarg,
listen=True,
**kwargs
**kwargs,
)
if not pub_data:
@ -772,7 +772,7 @@ class LocalClient:
self._get_timeout(timeout),
tgt,
tgt_type,
**kwargs
**kwargs,
):
if fn_ret:
@ -797,7 +797,7 @@ class LocalClient:
verbose=False,
kwarg=None,
progress=False,
**kwargs
**kwargs,
):
"""
Used by the :command:`salt` CLI. This method returns minion returns as
@ -821,7 +821,7 @@ class LocalClient:
timeout,
kwarg=kwarg,
listen=True,
**kwargs
**kwargs,
)
if not self.pub_data:
yield self.pub_data
@ -835,7 +835,7 @@ class LocalClient:
tgt_type,
verbose,
progress,
**kwargs
**kwargs,
):
if not fn_ret:
@ -866,7 +866,7 @@ class LocalClient:
tgt_type="glob",
ret="",
kwarg=None,
**kwargs
**kwargs,
):
"""
Yields the individual minion returns as they come in
@ -901,7 +901,7 @@ class LocalClient:
timeout,
kwarg=kwarg,
listen=True,
**kwargs
**kwargs,
)
if not pub_data:
@ -915,7 +915,7 @@ class LocalClient:
timeout=self._get_timeout(timeout),
tgt=tgt,
tgt_type=tgt_type,
**kwargs
**kwargs,
):
if not fn_ret:
continue
@ -936,7 +936,7 @@ class LocalClient:
kwarg=None,
show_jid=False,
verbose=False,
**kwargs
**kwargs,
):
"""
Yields the individual minion returns as they come in, or None
@ -972,7 +972,7 @@ class LocalClient:
timeout,
kwarg=kwarg,
listen=True,
**kwargs
**kwargs,
)
if not pub_data:
@ -985,7 +985,7 @@ class LocalClient:
tgt=tgt,
tgt_type=tgt_type,
block=False,
**kwargs
**kwargs,
):
if fn_ret and any([show_jid, verbose]):
for minion in fn_ret:
@ -1007,7 +1007,7 @@ class LocalClient:
ret="",
verbose=False,
kwarg=None,
**kwargs
**kwargs,
):
"""
Execute a salt command and return
@ -1024,7 +1024,7 @@ class LocalClient:
timeout,
kwarg=kwarg,
listen=True,
**kwargs
**kwargs,
)
if not pub_data:
@ -1046,7 +1046,7 @@ class LocalClient:
tgt_type="glob",
verbose=False,
show_jid=False,
**kwargs
**kwargs,
):
"""
Starts a watcher looking at the return data for a specified JID
@ -1123,7 +1123,7 @@ class LocalClient:
tgt_type="glob",
expect_minions=False,
block=True,
**kwargs
**kwargs,
):
"""
Watch the event system and return job data as it comes in
@ -1202,7 +1202,13 @@ class LocalClient:
if "missing" in raw.get("data", {}):
missing.update(raw["data"]["missing"])
continue
# Anything below this point is expected to be a job return event.
if not raw["tag"].startswith(f"salt/job/{jid}/ret"):
log.debug("Skipping non return event: %s", raw["tag"])
continue
if "return" not in raw["data"]:
log.warning("Malformed event return: %s", raw["tag"])
continue
if kwargs.get("raw", False):
found.add(raw["data"]["id"])
@ -1628,7 +1634,7 @@ class LocalClient:
progress=False,
show_timeout=False,
show_jid=False,
**kwargs
**kwargs,
):
"""
Get the returns for the command line interface via the event system
@ -1658,7 +1664,7 @@ class LocalClient:
expect_minions=(
kwargs.pop("expect_minions", False) or verbose or show_timeout
),
**kwargs
**kwargs,
):
log.debug("return event: %s", ret)
return_count = return_count + 1
@ -1851,7 +1857,7 @@ class LocalClient:
jid="",
timeout=5,
listen=False,
**kwargs
**kwargs,
):
"""
Take the required arguments and publish the given command.
@ -1953,7 +1959,7 @@ class LocalClient:
timeout=5,
io_loop=None,
listen=True,
**kwargs
**kwargs,
):
"""
Take the required arguments and publish the given command.