More tests

Daniel A. Wozniak 2023-08-21 22:28:51 -07:00 committed by Gareth J. Greenaway
parent de45a7bed0
commit dec16149b2
12 changed files with 377 additions and 163 deletions


@@ -224,7 +224,6 @@ class AsyncReqChannel:
tries,
timeout,
)
log.error("WTF %r", ret)
if HAS_M2:
aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding)
else:


@@ -126,6 +126,51 @@ class Master(
self.master.process_manager._handle_signals(signum, sigframe)
super()._handle_signals(signum, sigframe)
def verify_environment(self):
if not self.config["verify_env"]:
return
v_dirs = [
self.config["pki_dir"],
os.path.join(self.config["pki_dir"], "minions"),
os.path.join(self.config["pki_dir"], "minions_pre"),
os.path.join(self.config["pki_dir"], "minions_denied"),
os.path.join(self.config["pki_dir"], "minions_autosign"),
os.path.join(self.config["pki_dir"], "minions_rejected"),
self.config["cachedir"],
os.path.join(self.config["cachedir"], "jobs"),
os.path.join(self.config["cachedir"], "proc"),
self.config["sock_dir"],
self.config["token_dir"],
self.config["syndic_dir"],
self.config["sqlite_queue_dir"],
]
pki_dir = self.config["pki_dir"]
if (
self.config["cluster_id"]
and self.config["cluster_pki_dir"]
and self.config["cluster_pki_dir"] != self.config["pki_dir"]
):
v_dirs.extend(
[
self.config["cluster_pki_dir"],
os.path.join(self.config["cluster_pki_dir"], "peers"),
os.path.join(self.config["cluster_pki_dir"], "minions"),
os.path.join(self.config["cluster_pki_dir"], "minions_pre"),
os.path.join(self.config["cluster_pki_dir"], "minions_denied"),
os.path.join(self.config["cluster_pki_dir"], "minions_autosign"),
os.path.join(self.config["cluster_pki_dir"], "minions_rejected"),
]
)
pki_dir = [self.config["pki_dir"], self.config["cluster_pki_dir"]]
verify_env(
v_dirs,
self.config["user"],
permissive=self.config["permissive_pki_access"],
root_dir=self.config["root_dir"],
pki_dir=pki_dir,
)
def prepare(self):
"""
Run the preparation sequence required to start a salt master server.
@@ -137,55 +182,7 @@ class Master(
super().prepare()
try:
if self.config["verify_env"]:
v_dirs = [
self.config["pki_dir"],
os.path.join(self.config["pki_dir"], "minions"),
os.path.join(self.config["pki_dir"], "minions_pre"),
os.path.join(self.config["pki_dir"], "minions_denied"),
os.path.join(self.config["pki_dir"], "minions_autosign"),
os.path.join(self.config["pki_dir"], "minions_rejected"),
self.config["cachedir"],
os.path.join(self.config["cachedir"], "jobs"),
os.path.join(self.config["cachedir"], "proc"),
self.config["sock_dir"],
self.config["token_dir"],
self.config["syndic_dir"],
self.config["sqlite_queue_dir"],
]
pki_dir = self.config["pki_dir"]
if (
self.config["cluster_pki_dir"]
and self.config["cluster_pki_dir"] != self.config["pki_dir"]
):
v_dirs.extend(
[
self.config["cluster_pki_dir"],
os.path.join(self.config["cluster_pki_dir"], "minions"),
os.path.join(self.config["cluster_pki_dir"], "minions_pre"),
os.path.join(
self.config["cluster_pki_dir"], "minions_denied"
),
os.path.join(
self.config["cluster_pki_dir"], "minions_autosign"
),
os.path.join(
self.config["cluster_pki_dir"], "minions_rejected"
),
]
)
pki_dir = [self.config["pki_dir"], self.config["cluster_pki_dir"]]
verify_env(
v_dirs,
self.config["user"],
permissive=self.config["permissive_pki_access"],
root_dir=self.config["root_dir"],
pki_dir=pki_dir,
)
# Clear out syndics from cachedir
for syndic_file in os.listdir(self.config["syndic_dir"]):
os.remove(os.path.join(self.config["syndic_dir"], syndic_file))
self.verify_environment()
except OSError as error:
self.environment_failure(error)
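
The hunk above extracts the environment checks from prepare() into a standalone verify_environment() method, which the new unit tests at the bottom of this commit drive directly. A minimal sketch of that call, mirroring those tests (the scratch directory and assertion are illustrative, not part of the commit):

import pathlib
import tempfile

import salt.cli.daemons

# Scratch tree for the demo; tmp_path plays this role in the new tests.
base = pathlib.Path(tempfile.mkdtemp())

master = salt.cli.daemons.Master()
master.config = {
    "verify_env": True,
    "pki_dir": str(base / "pki"),
    "cachedir": str(base / "cache"),
    "sock_dir": str(base / "socket"),
    "token_dir": str(base / "token"),
    "syndic_dir": str(base / "syndic"),
    "sqlite_queue_dir": str(base / "sqlite_queue_dir"),
    "cluster_id": None,  # falsy, so the cluster branch and cluster_pki_dir are skipped
    "user": "root",
    "permissive_pki_access": False,
    "root_dir": str(base / "root"),
}
master.verify_environment()  # delegates to verify_env(), creating the directory tree
assert (base / "pki" / "minions").exists()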


@@ -920,12 +920,15 @@ class EventMonitor(salt.utils.process.SignalHandlingProcess):
Event handler for publish forwarder
"""
tag, data = salt.utils.event.SaltEvent.unpack(package)
log.error("got event %s %r", tag, data)
log.debug("Event monitor got event %s %r", tag, data)
if tag.startswith("salt/job") and tag.endswith("/publish"):
peer_id = data.pop("__peer_id", None)
if peer_id:
data.pop("_stamp", None)
log.error("Forward job event to publisher server: %r", data)
log.debug(
"Event monitor forward job to publish server: jid=%s",
data.get("jid", "no jid"),
)
if not self.channels:
for transport, opts in iter_transport_opts(self.opts):
chan = salt.channel.server.PubServerChannel.factory(opts)
@@ -935,7 +938,7 @@ class EventMonitor(salt.utils.process.SignalHandlingProcess):
tasks.append(asyncio.create_task(chan.publish(data)))
await asyncio.gather(*tasks)
elif tag == "rotate_aes_key":
log.error("Recieved rotate_aes_key event")
log.debug("Event monitor recieved rotate aes key event, rotating key.")
SMaster.rotate_secrets(self.opts, owner=False)
def run(self):
@@ -2424,7 +2427,6 @@ class ClearFuncs(TransportMethods):
# An alternative to copy may be to pop it
# payload.pop("_stamp")
self._send_ssh_pub(payload, ssh_minions=ssh_minions)
log.error("SEND JOB PAYLOAD %r", payload)
await self._send_pub(payload)
return {
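
For context, the EventMonitor changes above are part of the cluster event forwarding path: a salt/job/<jid>/publish event carrying a __peer_id marker originated on a peer master and is republished through the local publish server, while a rotate_aes_key event triggers a key rotation. A minimal synchronous sketch of that dispatch (the real handler is async and publishes through PubServerChannel; publish and rotate here are stand-in callables):

def handle_event(tag, data, publish, rotate):
    # Mirrors EventMonitor.handle_event: republish peer job events locally,
    # rotate the AES key on request.
    if tag.startswith("salt/job") and tag.endswith("/publish"):
        peer_id = data.pop("__peer_id", None)
        if peer_id:  # only events stamped by a peer master are forwarded
            data.pop("_stamp", None)  # drop the bus timestamp before republishing
            publish(data)
    elif tag == "rotate_aes_key":
        rotate()

sent = []
handle_event(
    "salt/job/20230821000000000001/publish",
    {"__peer_id": "master-2", "_stamp": "2023-08-21T22:28:51", "jid": "20230821000000000001"},
    publish=sent.append,
    rotate=lambda: None,
)
assert sent == [{"jid": "20230821000000000001"}]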


@@ -438,7 +438,7 @@ class TCPPubClient(salt.transport.base.PublishClient):
try:
# XXX This is handled better in the websocket transport work
await callback(msg)
except Exception:
except Exception as exc: # pylint: disable=broad-except
log.error(
"Unhandled exception while running callback %r",
self,
@@ -1380,7 +1380,7 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
log.debug("Publish server binding pub to %s", self.pub_path)
sock = tornado.netutil.bind_unix_socket(self.pub_path)
else:
log.info(
log.debug(
"Publish server binding pub to %s:%s", self.pub_host, self.pub_port
)
sock = _get_socket(self.opts)
@@ -1398,7 +1398,7 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
log.debug("Publish server binding pull to %s", self.pull_path)
pull_path = self.pull_path
else:
log.info(
log.debug(
"Publish server binding pull to %s:%s", self.pull_host, self.pull_port
)
pull_host = self.pull_host


@@ -1299,7 +1299,11 @@ class EventReturn(salt.utils.process.SignalHandlingProcess):
if event["tag"] == "salt/event/exit":
# We're done eventing
self.stop = True
if self._filter(event):
if self._filter(
event,
allow=self.opts["event_return_whitelist"],
deny=self.opts["event_return_blacklist"],
):
# This event passed the filter, add it to the queue
self.event_queue.append(event)
too_long_in_queue = False
@@ -1347,23 +1351,40 @@
self.flush_events()
def _filter(self, event):
@staticmethod
def _filter(event, allow=None, deny=None):
"""
Take an event and run it through configured filters.
Returns True if event should be stored, else False
Returns True if event should be stored, else False.
Any event that has a "__peer_id" key defined is denied outright
because it did not originate from this master in a clustered
configuration.
If no allow or deny list is given, the event is allowed. Otherwise the
event's tag is checked against the allow list first, then the deny list.
"""
if "__peer_id" in event:
return False
if allow is None:
allow = []
if deny is None:
deny = []
tag = event["tag"]
if self.opts["event_return_whitelist"]:
if allow:
ret = False
else:
ret = True
for whitelist_match in self.opts["event_return_whitelist"]:
if fnmatch.fnmatch(tag, whitelist_match):
for allow_match in allow:
if fnmatch.fnmatch(tag, allow_match):
ret = True
break
for blacklist_match in self.opts["event_return_blacklist"]:
if fnmatch.fnmatch(tag, blacklist_match):
for deny_match in deny:
if fnmatch.fnmatch(tag, deny_match):
ret = False
break
return ret
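
Making _filter a staticmethod with explicit allow/deny arguments makes its semantics easy to demonstrate in isolation: peer-originated events are always dropped, a non-empty allow list flips the default to deny-unless-matched, and the deny list is checked last so it wins. A few illustrative calls, mirroring the new tests at the end of this commit:

import salt.utils.event

_filter = salt.utils.event.EventReturn._filter

# Peer-originated events never pass, regardless of the lists.
assert _filter({"__peer_id": "m2", "tag": "salt/test"}) is False
# No lists: everything else passes.
assert _filter({"tag": "salt/test"}) is True
# An allow list flips the default to "deny unless matched"...
assert _filter({"tag": "salt/test"}, allow=["salt/*"]) is True
assert _filter({"tag": "other/test"}, allow=["salt/*"]) is False
# ...and the deny list is checked last, so a deny match wins.
assert _filter({"tag": "salt/test"}, allow=["salt/*"], deny=["salt/test"]) is False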


@@ -36,7 +36,7 @@ class UserInfo:
@key_file.default
def _default_key_file(self):
return ".{}_key".format(self.username)
return f".{self.username}_key"
@key_path.default
def _default_key_path(self):
@@ -103,6 +103,7 @@ def test_pub_not_allowed(
user_info,
caplog,
):
assert hasattr(salt.master.ClearFuncs, "_send_pub")
tempfile = tmp_path / "evil_file"
assert not tempfile.exists()
jid = "202003100000000001"
@@ -143,11 +144,11 @@ def test_pub_not_allowed(
time.sleep(0.5)
# If we got the log message, we shouldn't get anything from the event bus
expected_tag = "salt/job/{}/*".format(jid)
expected_tag = f"salt/job/{jid}/*"
event_pattern = (salt_master.id, expected_tag)
events = event_listener.get_events([event_pattern], after_time=start_time)
for event in events:
pytest.fail("This event should't have gone through: {}".format(event))
pytest.fail(f"This event should't have gone through: {event}")
assert not tempfile.exists(), "Evil file created"


@@ -35,7 +35,7 @@ def test_publish_retry(salt_master, salt_minion_retry, salt_cli, salt_run_cli):
# stop the salt master for some time
with salt_master.stopped():
# verify we don't yet have the result and sleep
assert salt_run_cli.run("jobs.lookup_jid", jid, _timeout=60).data == {}
assert salt_run_cli.run("jobs.lookup_jid", jid, _timeout=10).data == {}
# the 70s sleep (and 60s timer value) is to reduce flakiness due to slower test runs
and should be addressed once the number of tries is configurable through minion opts


@@ -1,8 +1,7 @@
import asyncio
from functools import partial
import asyncio
import pytest
import tornado.gen
from salt.netapi.rest_tornado import saltnado


@@ -56,11 +56,6 @@ def cluster_master_1(request, salt_factories, cluster_pki_path, cluster_cache_pa
yield factory
# @pytest.fixture(scope="package")
# def cluster_master_1_salt_cli(cluster_master_1):
# return cluster__master_1.salt_cli(timeout=120)
#
#
@pytest.fixture
def cluster_master_2(salt_factories, cluster_master_1):
if salt.utils.platform.is_darwin() or salt.utils.platform.is_freebsd():
@@ -133,12 +128,6 @@ def cluster_master_3(salt_factories, cluster_master_1):
yield factory
#
# @pytest.fixture(scope="package")
# def cluster_master_2_salt_cli(cluster_master_2):
# return cluster_master_2.salt_cli(timeout=120)
#
#
@pytest.fixture
def cluster_minion_1(cluster_master_1):
config_defaults = {
@@ -159,74 +148,3 @@ def cluster_minion_1(cluster_master_1):
)
with factory.started(start_timeout=120):
yield factory
#
#
# @pytest.fixture(scope="package")
# def cluster_minion_2(cluster_master_2):
# config_defaults = {
# "transport": cluster_master_2.config["transport"],
# }
#
# port = cluster_master_2.config["ret_port"]
# addr = cluster_master_2.config["interface"]
# config_overrides = {
# "master": f"{port}:{addr}",
# "test.foo": "baz",
# }
# factory = salt_mm_master_1.salt_minion_daemon(
# "cluster-minion-2",
# defaults=config_defaults,
# overrides=config_overrides,
# extra_cli_arguments_after_first_start_failure=["--log-level=info"],
# )
# with factory.started(start_timeout=120):
# yield factory
#
#
#
# @pytest.fixture(scope="package")
# def run_salt_cmds():
# def _run_salt_cmds_fn(clis, minions):
# """
# Run test.ping from all clis to all minions
# """
# returned_minions = []
# minion_instances = {minion.id: minion for minion in minions}
# clis_to_check = {minion.id: list(clis) for minion in minions}
#
# attempts = 6
# timeout = 5
# if salt.utils.platform.spawning_platform():
# timeout *= 2
# while attempts:
# if not clis_to_check:
# break
# for minion in list(clis_to_check):
# if not clis_to_check[minion]:
# clis_to_check.pop(minion)
# continue
# for cli in list(clis_to_check[minion]):
# try:
# ret = cli.run(
# "--timeout={}".format(timeout),
# "test.ping",
# minion_tgt=minion,
# _timeout=2 * timeout,
# )
# if ret.returncode == 0 and ret.data is True:
# returned_minions.append((cli, minion_instances[minion]))
# clis_to_check[minion].remove(cli)
# except FactoryTimeout:
# log.debug(
# "Failed to execute test.ping from %s to %s.",
# cli.get_display_name(),
# minion,
# )
# time.sleep(1)
# attempts -= 1
#
# return returned_minions
#
# return _run_salt_cmds_fn


@ -5,7 +5,7 @@ Unit test for the daemons starter classes.
import logging
import multiprocessing
import salt.cli.daemons as daemons
import salt.cli.daemons
from tests.support.mock import MagicMock, patch
log = logging.getLogger(__name__)
@@ -76,7 +76,7 @@ def _master_exec_test(child_pipe):
Create master instance
:return:
"""
obj = daemons.Master()
obj = salt.cli.daemons.Master()
obj.config = {"user": "dummy", "hash_type": alg}
for attr in ["start_log_info", "prepare", "shutdown", "master"]:
setattr(obj, attr, MagicMock())
@@ -91,7 +91,7 @@ def _master_exec_test(child_pipe):
for alg in ["md5", "sha1"]:
_create_master().start()
ret = ret and _logger.has_message(
"Do not use {alg}".format(alg=alg), log_type="warning"
f"Do not use {alg}", log_type="warning"
)
_logger.reset()
@@ -116,7 +116,7 @@ def _minion_exec_test(child_pipe):
Create minion instance
:return:
"""
obj = daemons.Minion()
obj = salt.cli.daemons.Minion()
obj.config = {"user": "dummy", "hash_type": alg}
for attr in ["start_log_info", "prepare", "shutdown"]:
setattr(obj, attr, MagicMock())
@@ -132,7 +132,7 @@ def _minion_exec_test(child_pipe):
for alg in ["md5", "sha1"]:
_create_minion().start()
ret = ret and _logger.has_message(
"Do not use {alg}".format(alg=alg), log_type="warning"
f"Do not use {alg}", log_type="warning"
)
_logger.reset()
@@ -156,7 +156,7 @@ def _proxy_exec_test(child_pipe):
Create proxy minion instance
:return:
"""
obj = daemons.ProxyMinion()
obj = salt.cli.daemons.ProxyMinion()
obj.config = {"user": "dummy", "hash_type": alg}
for attr in ["minion", "start_log_info", "prepare", "shutdown", "tune_in"]:
setattr(obj, attr, MagicMock())
@@ -172,7 +172,7 @@ def _proxy_exec_test(child_pipe):
for alg in ["md5", "sha1"]:
_create_proxy_minion().start()
ret = ret and _logger.has_message(
"Do not use {alg}".format(alg=alg), log_type="warning"
f"Do not use {alg}", log_type="warning"
)
_logger.reset()
@@ -197,7 +197,7 @@ def _syndic_exec_test(child_pipe):
Create syndic instance
:return:
"""
obj = daemons.Syndic()
obj = salt.cli.daemons.Syndic()
obj.config = {"user": "dummy", "hash_type": alg}
for attr in ["syndic", "start_log_info", "prepare", "shutdown"]:
setattr(obj, attr, MagicMock())
@@ -212,7 +212,7 @@ def _syndic_exec_test(child_pipe):
for alg in ["md5", "sha1"]:
_create_syndic().start()
ret = ret and _logger.has_message(
"Do not use {alg}".format(alg=alg), log_type="warning"
f"Do not use {alg}", log_type="warning"
)
_logger.reset()
@@ -265,3 +265,149 @@ def test_syndic_daemon_hash_type_verified():
Verify if Syndic is verifying hash_type config option.
"""
_multiproc_exec_test(_syndic_exec_test)
def test_master_skip_prepare(tmp_path):
root_dir = tmp_path / "root"
pki_dir = tmp_path / "pki"
sock_dir = tmp_path / "socket"
cache_dir = tmp_path / "cache"
token_dir = tmp_path / "token"
syndic_dir = tmp_path / "syndic"
sqlite_dir = tmp_path / "sqlite_queue_dir"
assert not pki_dir.exists()
assert not sock_dir.exists()
assert not cache_dir.exists()
assert not token_dir.exists()
assert not syndic_dir.exists()
assert not sqlite_dir.exists()
master = salt.cli.daemons.Master()
master.config = {
"verify_env": True,
"pki_dir": str(pki_dir),
"cachedir": str(cache_dir),
"sock_dir": str(sock_dir),
"token_dir": str(token_dir),
"syndic_dir": str(syndic_dir),
"sqlite_queue_dir": str(sqlite_dir),
"cluster_id": None,
"user": "root",
"permissive_pki_access": False,
"root_dir": str(root_dir),
}
master.verify_environment()
assert not pki_dir.exists()
assert not sock_dir.exists()
assert not cache_dir.exists()
assert not token_dir.exists()
assert not syndic_dir.exists()
assert not sqlite_dir.exists()
def test_master_prepare(tmp_path):
root_dir = tmp_path / "root"
pki_dir = tmp_path / "pki"
sock_dir = tmp_path / "socket"
cache_dir = tmp_path / "cache"
token_dir = tmp_path / "token"
syndic_dir = tmp_path / "syndic"
sqlite_dir = tmp_path / "sqlite_queue_dir"
assert not pki_dir.exists()
assert not sock_dir.exists()
assert not cache_dir.exists()
assert not token_dir.exists()
assert not syndic_dir.exists()
assert not sqlite_dir.exists()
master = salt.cli.daemons.Master()
master.config = {
"verify_env": True,
"pki_dir": str(pki_dir),
"cachedir": str(cache_dir),
"sock_dir": str(sock_dir),
"token_dir": str(token_dir),
"syndic_dir": str(syndic_dir),
"sqlite_queue_dir": str(sqlite_dir),
"cluster_id": None,
"user": "root",
"permissive_pki_access": False,
"root_dir": str(root_dir),
}
master.verify_environment()
assert pki_dir.exists()
assert (pki_dir / "minions").exists()
assert (pki_dir / "minions_pre").exists()
assert (pki_dir / "minions_denied").exists()
assert (pki_dir / "minions_autosign").exists()
assert (pki_dir / "minions_rejected").exists()
assert sock_dir.exists()
assert cache_dir.exists()
assert (cache_dir / "jobs").exists()
assert (cache_dir / "proc").exists()
assert token_dir.exists()
assert syndic_dir.exists()
assert sqlite_dir.exists()
def test_master_prepare_cluster(tmp_path):
root_dir = tmp_path / "root"
pki_dir = tmp_path / "pki"
sock_dir = tmp_path / "socket"
cache_dir = tmp_path / "cache"
token_dir = tmp_path / "token"
syndic_dir = tmp_path / "syndic"
sqlite_dir = tmp_path / "sqlite_queue_dir"
cluster_dir = tmp_path / "cluster"
assert not pki_dir.exists()
assert not sock_dir.exists()
assert not cache_dir.exists()
assert not token_dir.exists()
assert not syndic_dir.exists()
assert not sqlite_dir.exists()
assert not cluster_dir.exists()
master = salt.cli.daemons.Master()
master.config = {
"verify_env": True,
"cluster_id": "cluster-test",
"cluster_pki_dir": str(cluster_dir),
"pki_dir": str(pki_dir),
"cachedir": str(cache_dir),
"sock_dir": str(sock_dir),
"token_dir": str(token_dir),
"syndic_dir": str(syndic_dir),
"sqlite_queue_dir": str(sqlite_dir),
"user": "root",
"permissive_pki_access": False,
"root_dir": str(root_dir),
}
master.verify_environment()
assert pki_dir.exists()
assert (pki_dir / "minions").exists()
assert (pki_dir / "minions_pre").exists()
assert (pki_dir / "minions_denied").exists()
assert (pki_dir / "minions_autosign").exists()
assert (pki_dir / "minions_rejected").exists()
assert sock_dir.exists()
assert cache_dir.exists()
assert (cache_dir / "jobs").exists()
assert (cache_dir / "proc").exists()
assert token_dir.exists()
assert syndic_dir.exists()
assert sqlite_dir.exists()
assert cluster_dir.exists()
assert (cluster_dir / "peers").exists()
assert (cluster_dir / "minions").exists()
assert (cluster_dir / "minions_pre").exists()
assert (cluster_dir / "minions_denied").exists()
assert (cluster_dir / "minions_autosign").exists()
assert (cluster_dir / "minions_rejected").exists()


@@ -0,0 +1,62 @@
import salt.config
def test_apply_no_cluster_id():
defaults = salt.config.DEFAULT_MASTER_OPTS.copy()
assert "cluster_id" not in defaults
overrides = {}
opts = salt.config.apply_master_config(overrides, defaults)
assert "cluster_id" in opts
assert opts["cluster_id"] is None
assert "cluster_pki_dir" in opts
assert opts["cluster_pki_dir"] is None
def test_apply_default_for_cluster():
defaults = salt.config.DEFAULT_MASTER_OPTS.copy()
assert "cluster_id" not in defaults
overrides = {"cluster_id": "test-cluster"}
opts = salt.config.apply_master_config(overrides, defaults)
assert "cluster_id" in opts
assert "test-cluster" == opts["cluster_id"]
# the cluster pki dir defaults to pki_dir
assert "cluster_pki_dir" in opts
assert opts["pki_dir"] == opts["cluster_pki_dir"]
# the cluster peers defaults to empty list
assert "cluster_peers" in opts
assert [] == opts["cluster_peers"]
def test_apply_for_cluster():
defaults = salt.config.DEFAULT_MASTER_OPTS.copy()
assert "cluster_id" not in defaults
cluster_dir = "/tmp/cluster"
overrides = {
"cluster_id": "test-cluster",
"cluster_peers": [
"127.0.0.1",
"127.0.0.3",
],
"cluster_pki_dir": cluster_dir,
}
opts = salt.config.apply_master_config(overrides, defaults)
assert "cluster_id" in opts
assert "test-cluster" == opts["cluster_id"]
# the cluster pki dir override is respected
assert "cluster_pki_dir" in opts
assert cluster_dir == opts["cluster_pki_dir"]
# the cluster peers override is respected
assert "cluster_peers" in opts
assert isinstance(opts["cluster_peers"], list)
opts["cluster_peers"].sort()
assert ["127.0.0.1", "127.0.0.3"] == opts["cluster_peers"]


@@ -13,7 +13,76 @@ def test_event_return(master_opts):
evt.start()
except TypeError as exc:
if "object" in str(exc):
pytest.fail("'{}' TypeError should have not been raised".format(exc))
pytest.fail(f"'{exc}' TypeError should have not been raised")
finally:
if evt is not None:
terminate_process(evt.pid, kill_children=True)
def test_filter_cluster_peer():
assert (
salt.utils.event.EventReturn._filter(
{"__peer_id": "foo", "tag": "salt/test", "data": {"foo": "bar"}},
)
is False
)
def test_filter_no_allow_or_deny():
assert (
salt.utils.event.EventReturn._filter(
{"tag": "salt/test", "data": {"foo": "bar"}},
)
is True
)
def test_filter_not_allowed():
assert (
salt.utils.event.EventReturn._filter(
{"tag": "salt/test", "data": {"foo": "bar"}},
allow=["foo/*"],
)
is False
)
def test_filter_not_denied():
assert (
salt.utils.event.EventReturn._filter(
{"tag": "salt/test", "data": {"foo": "bar"}},
deny=["foo/*"],
)
is True
)
def test_filter_allowed():
assert (
salt.utils.event.EventReturn._filter(
{"tag": "salt/test", "data": {"foo": "bar"}},
allow=["salt/*"],
)
is True
)
def test_filter_denied():
assert (
salt.utils.event.EventReturn._filter(
{"tag": "salt/test", "data": {"foo": "bar"}},
deny=["salt/*"],
)
is False
)
def test_filter_allowed_but_denied():
assert (
salt.utils.event.EventReturn._filter(
{"tag": "salt/test", "data": {"foo": "bar"}},
allow=["salt/*"],
deny=["salt/test"],
)
is False
)