Only return job results to originating master.

This commit is contained in:
Daniel A. Wozniak 2023-08-24 15:38:42 -07:00 committed by Pedro Algarvio
parent daaa2d4ac5
commit 442796b011
3 changed files with 130 additions and 10 deletions

1
changelog/62834.fixed.md Normal file
View file

@ -0,0 +1 @@
Job returns are only sent to originating master

View file

@ -1607,7 +1607,9 @@ class Minion(MinionBase):
"minion", opts=self.opts, listen=False
) as event:
return event.fire_event(
load, "__master_req_channel_payload", timeout=timeout
load,
f"__master_req_channel_payload/{self.opts['master']}",
timeout=timeout,
)
@salt.ext.tornado.gen.coroutine
@ -1624,7 +1626,9 @@ class Minion(MinionBase):
"minion", opts=self.opts, listen=False
) as event:
ret = yield event.fire_event_async(
load, "__master_req_channel_payload", timeout=timeout
load,
f"__master_req_channel_payload/{self.opts['master']}",
timeout=timeout,
)
raise salt.ext.tornado.gen.Return(ret)
@ -2717,14 +2721,22 @@ class Minion(MinionBase):
notify=data.get("notify", False),
)
elif tag.startswith("__master_req_channel_payload"):
try:
yield _minion.req_channel.send(
data,
timeout=_minion._return_retry_timer(),
tries=_minion.opts["return_retry_tries"],
job_master = tag.rsplit("/", 1)[1]
if job_master == self.opts["master"]:
try:
yield _minion.req_channel.send(
data,
timeout=_minion._return_retry_timer(),
tries=_minion.opts["return_retry_tries"],
)
except salt.exceptions.SaltReqTimeoutError:
log.error("Timeout encountered while sending %r request", data)
else:
log.debug(
"Skipping job return for other master: jid=%s master=%s",
data["jid"],
job_master,
)
except salt.exceptions.SaltReqTimeoutError:
log.error("Timeout encountered while sending %r request", data)
elif tag.startswith("pillar_refresh"):
yield _minion.pillar_refresh(
force_refresh=data.get("force_refresh", False),
@ -3328,7 +3340,7 @@ class Syndic(Minion):
data["to"],
io_loop=self.io_loop,
callback=lambda _: None,
**kwargs
**kwargs,
)
def _send_req_sync(self, load, timeout):

View file

@ -0,0 +1,107 @@
import os
import shutil
import subprocess
import pytest
import salt.utils.platform
@pytest.fixture
def salt_master_1(request, salt_factories):
config_defaults = {
"open_mode": True,
"transport": request.config.getoption("--transport"),
}
config_overrides = {
"interface": "127.0.0.1",
}
factory = salt_factories.salt_master_daemon(
"master-1",
defaults=config_defaults,
overrides=config_overrides,
extra_cli_arguments_after_first_start_failure=["--log-level=info"],
)
with factory.started(start_timeout=120):
yield factory
@pytest.fixture
def salt_master_2(salt_factories, salt_master_1):
if salt.utils.platform.is_darwin() or salt.utils.platform.is_freebsd():
subprocess.check_output(["ifconfig", "lo0", "alias", "127.0.0.2", "up"])
config_defaults = {
"open_mode": True,
"transport": salt_master_1.config["transport"],
}
config_overrides = {
"interface": "127.0.0.2",
}
# Use the same ports for both masters, they are binding to different interfaces
for key in (
"ret_port",
"publish_port",
):
config_overrides[key] = salt_master_1.config[key]
factory = salt_factories.salt_master_daemon(
"master-2",
defaults=config_defaults,
overrides=config_overrides,
extra_cli_arguments_after_first_start_failure=["--log-level=info"],
)
# The secondary salt master depends on the primarily salt master fixture
# because we need to clone the keys
for keyfile in ("master.pem", "master.pub"):
shutil.copyfile(
os.path.join(salt_master_1.config["pki_dir"], keyfile),
os.path.join(factory.config["pki_dir"], keyfile),
)
with factory.started(start_timeout=120):
yield factory
@pytest.fixture
def salt_minion_1(salt_master_1, salt_master_2):
config_defaults = {
"transport": salt_master_1.config["transport"],
}
master_1_port = salt_master_1.config["ret_port"]
master_1_addr = salt_master_1.config["interface"]
master_2_port = salt_master_2.config["ret_port"]
master_2_addr = salt_master_2.config["interface"]
config_overrides = {
"master": [
"{}:{}".format(master_1_addr, master_1_port),
"{}:{}".format(master_2_addr, master_2_port),
],
"test.foo": "baz",
}
factory = salt_master_1.salt_minion_daemon(
"minion-1",
defaults=config_defaults,
overrides=config_overrides,
extra_cli_arguments_after_first_start_failure=["--log-level=info"],
)
with factory.started(start_timeout=120):
yield factory
def test_job_resturn(salt_master_1, salt_master_2, salt_minion_1):
cli = salt_master_1.salt_cli(timeout=120)
ret = cli.run("test.ping", "-v", minion_tgt="minion-1")
for line in ret.stdout.splitlines():
if "with jid" in line:
jid = line.split("with jid")[1].strip()
run_1 = salt_master_1.salt_run_cli(timeout=120)
ret = run_1.run("jobs.lookup_jid", jid)
assert ret.data == {"minion-1": True}
run_2 = salt_master_2.salt_run_cli(timeout=120)
ret = run_2.run("jobs.lookup_jid", jid)
assert ret.data == {}