From c2a4baf11fe3c89106df519861c9b4bab9579495 Mon Sep 17 00:00:00 2001
From: "Daniel A. Wozniak"
Date: Sun, 20 Aug 2023 17:24:34 -0700
Subject: [PATCH] Only the cluster aes key is on disk

---
 salt/channel/client.py                        |  62 +-----
 salt/channel/server.py                        | 120 ++++++++---
 salt/master.py                                |  95 +++++----
 tests/pytests/scenarios/cluster/conftest.py   | 194 ++++++++++++++++++
 .../scenarios/cluster/test_basic_cluster.py   |  64 ++++++
 tests/pytests/unit/test_crypt.py              |  57 +++--
 tests/pytests/unit/test_master.py             |  71 ++++---
 7 files changed, 484 insertions(+), 179 deletions(-)
 create mode 100644 tests/pytests/scenarios/cluster/conftest.py
 create mode 100644 tests/pytests/scenarios/cluster/test_basic_cluster.py

diff --git a/salt/channel/client.py b/salt/channel/client.py
index d869d89a233..a41d03edd82 100644
--- a/salt/channel/client.py
+++ b/salt/channel/client.py
@@ -224,6 +224,7 @@ class AsyncReqChannel:
             tries,
             timeout,
         )
+        log.trace("Received key exchange reply: %r", ret)
         if HAS_M2:
             aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding)
         else:
@@ -670,64 +671,3 @@ class AsyncPullChannel:
         import salt.transport.ipc

         return salt.transport.ipc.IPCMessageServer(opts, **kwargs)
-
-
-class AsyncMasterPubChannel:
-    """ """
-
-    async_methods = [
-        "connect",
-    ]
-
-    close_methods = [
-        "close",
-    ]
-
-    @classmethod
-    def factory(cls, opts, **kwargs):
-        io_loop = kwargs.get("io_loop")
-        if io_loop is None:
-            io_loop = tornado.ioloop.IOLoop.current()
-        transport = salt.transport.ipc_publish_client(opts, "master")
-        return cls(opts, transport, None, io_loop)
-
-    def __init__(self, opts, transport, auth, io_loop=None):
-        self.opts = opts
-        self.io_loop = io_loop
-        self.auth = auth
-        self.transport = transport
-        self._closing = False
-        self._reconnected = False
-
-    async def connect(self):
-        """
-        Return a future which completes when connected to the remote publisher
-        """
-        await self.transport.connect()
-
-    async def recv(self, timeout=None):
-        return await self.transport.recv(timeout)
-
-    def close(self):
-        """
-        Close the channel
-        """
-        self.transport.close()
-
-    def on_recv(self, callback=None):
-        """
-        When jobs are received pass them (decoded) to callback
-        """
-        return self.transport.on_recv(callback)
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, *args):
-        self.io_loop.spawn_callback(self.close)
-
-    async def __aenter__(self):
-        return self
-
-    async def __aexit__(self, *_):
-        await self.close()
diff --git a/salt/channel/server.py b/salt/channel/server.py
index e2573cc21e1..4f84b826b38 100644
--- a/salt/channel/server.py
+++ b/salt/channel/server.py
@@ -6,6 +6,7 @@ This includes server side transport, for the ReqServer and the Publisher

 import asyncio
 import binascii
+import collections
 import hashlib
 import logging
 import os
@@ -244,12 +245,16 @@ class ReqServerChannel:
         """
         import salt.master

+        key = "aes"
+        if self.opts.get("cluster_id", None):
+            key = "cluster_aes"
+
         if (
-            salt.master.SMaster.secrets["aes"]["secret"].value
+            salt.master.SMaster.secrets[key]["secret"].value
             != self.crypticle.key_string
         ):
             self.crypticle = salt.crypt.Crypticle(
-                self.opts, salt.master.SMaster.secrets["aes"]["secret"].value
+                self.opts, salt.master.SMaster.secrets[key]["secret"].value
             )
             return True
         return False
@@ -648,7 +653,7 @@ class ReqServerChannel:
                     )
                 else:
                     mtoken = mcipher.decrypt(load["token"])
-                aes = "{}_|-{}".format(self.aes_key, mtoken)
+                aes = f"{self.aes_key}_|-{mtoken}"
             except Exception:  # pylint: disable=broad-except
                 # Token failed to decrypt, send back the salty bacon to
                 # support older minions
@@ -999,20 +1004,23 @@ class MasterPubServerChannel:
         self.io_loop = tornado.ioloop.IOLoop.current()
         tcp_master_pool_port = 4520
         self.pushers = []
-        for master in self.opts.get("cluster_peers", []):
+        self.auth_errors = {}
+        for peer in self.opts.get("cluster_peers", []):
             pusher = salt.transport.tcp.TCPPublishServer(
                 self.opts,
-                pull_host=master,
+                pull_host=peer,
                 pull_port=tcp_master_pool_port,
             )
+            self.auth_errors[peer] = collections.deque()
             self.pushers.append(pusher)
-        self.pool_puller = salt.transport.tcp.TCPPuller(
-            host=self.opts["interface"],
-            port=tcp_master_pool_port,
-            io_loop=self.io_loop,
-            payload_handler=self.handle_pool_publish,
-        )
-        self.pool_puller.start()
+        if self.opts.get("cluster_id", None):
+            self.pool_puller = salt.transport.tcp.TCPPuller(
+                host=self.opts["interface"],
+                port=tcp_master_pool_port,
+                io_loop=self.io_loop,
+                payload_handler=self.handle_pool_publish,
+            )
+            self.pool_puller.start()
         self.io_loop.add_callback(
             self.transport.publisher,
             self.publish_payload,
@@ -1061,20 +1069,46 @@ class MasterPubServerChannel:
                     if self.peer_keys[peer] != key_str:
                         self.peer_keys[peer] = key_str
                         self.send_aes_key_event()
+                        while self.auth_errors[peer]:
+                            key, data = self.auth_errors[peer].popleft()
+                            peer_id, parsed_tag = self.parse_cluster_tag(key)
+                            try:
+                                event_data = self.extract_cluster_event(peer_id, data)
+                            except salt.exceptions.AuthenticationError:
+                                log.error(
+                                    "Event from peer failed authentication: %s", peer_id
+                                )
+                            else:
+                                await self.transport.publish_payload(
+                                    salt.utils.event.SaltEvent.pack(
+                                        parsed_tag, event_data
+                                    )
+                                )
                 else:
                     self.peer_keys[peer] = key_str
                     self.send_aes_key_event()
+                    while self.auth_errors[peer]:
+                        key, data = self.auth_errors[peer].popleft()
+                        peer_id, parsed_tag = self.parse_cluster_tag(key)
+                        try:
+                            event_data = self.extract_cluster_event(peer_id, data)
+                        except salt.exceptions.AuthenticationError:
+                            log.error(
+                                "Event from peer failed authentication: %s", peer_id
+                            )
+                        else:
+                            await self.transport.publish_payload(
+                                salt.utils.event.SaltEvent.pack(parsed_tag, event_data)
+                            )
             elif tag.startswith("cluster/event"):
-                peer_id = tag.replace("cluster/event/", "").split("/")[0]
-                stripped_tag = tag.replace(f"cluster/event/{peer_id}/", "")
-                if peer_id in self.peer_keys:
-                    crypticle = salt.crypt.Crypticle(self.opts, self.peer_keys[peer_id])
-                    event_data = crypticle.loads(data)
-                    # __peer_id can be used to know if this event came from a
-                    # different master?
-                    event_data["__peer_id"] = peer_id
+                peer_id, parsed_tag = self.parse_cluster_tag(tag)
+                try:
+                    event_data = self.extract_cluster_event(peer_id, data)
+                except salt.exceptions.AuthenticationError:
+                    self.auth_errors[peer_id].append((tag, data))
+                else:
                     await self.transport.publish_payload(
-                        salt.utils.event.SaltEvent.pack(stripped_tag, event_data)
+                        salt.utils.event.SaltEvent.pack(parsed_tag, event_data)
                     )
             else:
                 log.error("This cluster tag not valid %s", tag)
@@ -1083,18 +1117,37 @@ class MasterPubServerChannel:
             log.critical("Unexpected error while polling master events", exc_info=True)
             return None

+    def parse_cluster_tag(self, tag):
+        peer_id = tag.replace("cluster/event/", "").split("/")[0]
+        stripped_tag = tag.replace(f"cluster/event/{peer_id}/", "")
+        return peer_id, stripped_tag
+
+    def extract_cluster_event(self, peer_id, data):
+        if peer_id in self.peer_keys:
+            crypticle = salt.crypt.Crypticle(self.opts, self.peer_keys[peer_id])
+            event_data = crypticle.loads(data)["event_payload"]
+            # __peer_id can be used to know if this event came from a
+            # different master?
+ event_data["__peer_id"] = peer_id + return event_data + raise salt.exceptions.AuthenticationError("Peer aes key not available") + async def publish_payload(self, load, *args): log.error("Publish event to local ipc clients") tag, data = salt.utils.event.SaltEvent.unpack(load) tasks = [] if not tag.startswith("cluster/peer"): - tasks = [asyncio.create_task(self.transport.publish_payload(load))] + tasks = [ + asyncio.create_task( + self.transport.publish_payload(load), name=self.opts["id"] + ) + ] for pusher in self.pushers: - log.error( - "Publish event to peer master %s:%s", pusher.pull_host, pusher.pull_port - ) + log.debug("Publish event to peer %s:%s", pusher.pull_host, pusher.pull_port) if tag.startswith("cluster/peer"): - tasks.append(asyncio.create_task(pusher.publish(load))) + tasks.append( + asyncio.create_task(pusher.publish(load), name=pusher.pull_host) + ) continue crypticle = salt.crypt.Crypticle( self.opts, salt.master.SMaster.secrets["aes"]["secret"].value @@ -1109,5 +1162,16 @@ class MasterPubServerChannel: for task in tasks: try: task.result() - except Exception: # pylint: disable=broad-except - log.error("Error sending task %s", exc) + # XXX This error is transport specific and should be something else + except tornado.iostream.StreamClosedError: + if task.get_name() == self.opts["id"]: + log.error("Unable to forward event to local ipc bus") + else: + log.warning( + "Unable to forward event to cluster peer %s", task.get_name() + ) + continue + except Exception as exc: # pylint: disable=broad-except + log.error( + "Unhandled error sending task %s", task.get_name(), exc_info=True + ) diff --git a/salt/master.py b/salt/master.py index a93c771358f..93015149224 100644 --- a/salt/master.py +++ b/salt/master.py @@ -139,7 +139,9 @@ class SMaster: return cls.secrets["aes"]["serial"].value @classmethod - def rotate_secrets(cls, opts=None, event=None, use_lock=True, owner=False): + def rotate_secrets( + cls, opts=None, event=None, use_lock=True, owner=False, publisher=None + ): log.info("Rotating master AES key") if opts is None: opts = {} @@ -159,6 +161,10 @@ class SMaster: ) if "serial" in secret_map: secret_map["serial"].value = 0 + + if publisher: + publisher.send_aes_key_event() + if event: event.fire_event({f"rotate_{secret_key}_key": True}, tag="key") @@ -180,6 +186,7 @@ class Maintenance(salt.utils.process.SignalHandlingProcess): :param dict opts: The salt options """ self.master_secrets = kwargs.pop("master_secrets", None) + self.ipc_publisher = kwargs.pop("ipc_publisher", None) super().__init__(**kwargs) self.opts = opts # How often do we perform the maintenance tasks @@ -331,27 +338,31 @@ class Maintenance(salt.utils.process.SignalHandlingProcess): if not to_rotate and self.opts.get("publish_session"): if self.opts.get("cluster_id", None): keyfile = os.path.join(self.opts["cluster_pki_dir"], ".aes") - else: - keyfile = os.path.join(self.opts["cachedir"], ".aes") - try: - stats = os.stat(keyfile) - except os.error as exc: - log.error("Unexpected condition while reading keyfile %s", exc) - return - if now - stats.st_mtime >= self.opts["publish_session"]: - salt.crypt.dropfile( - self.opts["cachedir"], self.opts["user"], self.opts["id"] - ) - # There is currently no concept of a leader in a master - # cluster. Lets fake it till we make it with a little - # waiting period. 
- time.sleep(drop_file_wait) - to_rotate = ( - salt.crypt.read_dropfile(self.opts["cachedir"]) == self.opts["id"] - ) + try: + stats = os.stat(keyfile) + except os.error as exc: + log.error("Unexpected condition while reading keyfile %s", exc) + return + if now - stats.st_mtime >= self.opts["publish_session"]: + salt.crypt.dropfile( + self.opts["cachedir"], self.opts["user"], self.opts["id"] + ) + # There is currently no concept of a leader in a master + # cluster. Lets fake it till we make it with a little + # waiting period. + time.sleep(drop_file_wait) + to_rotate = ( + salt.crypt.read_dropfile(self.opts["cachedir"]) + == self.opts["id"] + ) if to_rotate: - SMaster.rotate_secrets(self.opts, self.event, owner=True) + if self.opts.get("cluster_id", None): + SMaster.rotate_secrets( + self.opts, self.event, owner=True, publisher=self.ipc_publisher + ) + else: + SMaster.rotate_secrets(self.opts, self.event, owner=True) def handle_git_pillar(self): """ @@ -721,7 +732,7 @@ class Master(SMaster): with salt.utils.process.default_signals(signal.SIGINT, signal.SIGTERM): if self.opts["cluster_id"]: keypath = os.path.join(self.opts["cluster_pki_dir"], ".aes") - keygen = functools.partial( + cluster_keygen = functools.partial( salt.crypt.Crypticle.read_or_generate_key, keypath, ) @@ -729,14 +740,19 @@ class Master(SMaster): # them as well. SMaster.secrets["cluster_aes"] = { "secret": multiprocessing.Array( - ctypes.c_char, salt.utils.stringutils.to_bytes(keygen()) + ctypes.c_char, salt.utils.stringutils.to_bytes(cluster_keygen()) ), "serial": multiprocessing.Value( ctypes.c_longlong, lock=False, # We'll use the lock from 'secret' ), - "reload": keygen, + "reload": cluster_keygen, } + + # Wrap generate_key_string to ignore remove keyward arg. + def master_keygen(*args, **kwargs): + return salt.crypt.Crypticle.generate_key_string() + SMaster.secrets["aes"] = { "secret": multiprocessing.Array( ctypes.c_char, @@ -747,7 +763,7 @@ class Master(SMaster): "serial": multiprocessing.Value( ctypes.c_longlong, lock=False # We'll use the lock from 'secret' ), - "reload": salt.crypt.Crypticle.generate_key_string, + "reload": master_keygen, } log.info("Creating master process manager") @@ -792,7 +808,10 @@ class Master(SMaster): self.process_manager.add_process( Maintenance, args=(self.opts,), - kwargs={"master_secrets": SMaster.secrets}, + kwargs={ + "master_secrets": SMaster.secrets, + "ipc_publisher": ipc_publisher, + }, name="Maintenance", ) @@ -900,23 +919,27 @@ class EventMonitor(salt.utils.process.SignalHandlingProcess): channels = [] self.channels = channels - @tornado.gen.coroutine - def handle_event(self, package): + async def handle_event(self, package): """ Event handler for publish forwarder """ tag, data = salt.utils.event.SaltEvent.unpack(package) - log.error("got evetn %s %r", tag, data) + log.error("got event %s %r", tag, data) if tag.startswith("salt/job") and tag.endswith("/publish"): - # data.pop("_stamp", None) - log.trace("Forward job event to publisher server: %r", data) - # if not self.channels: - # for transport, opts in iter_transport_opts(self.opts): - # chan = salt.channel.server.PubServerChannel.factory(opts) - # self.channels.append(chan) - # for chan in self.channels: - # yield chan.publish(data) + peer_id = data.pop("__peer_id", None) + if peer_id: + data.pop("_stamp", None) + log.error("Forward job event to publisher server: %r", data) + if not self.channels: + for transport, opts in iter_transport_opts(self.opts): + chan = salt.channel.server.PubServerChannel.factory(opts) + 
self.channels.append(chan) + tasks = [] + for chan in self.channels: + tasks.append(asyncio.create_task(chan.publish(data))) + await asyncio.gather(*tasks) elif tag == "rotate_aes_key": + log.error("Recieved rotate_aes_key event") SMaster.rotate_secrets(self.opts, owner=False) def run(self): diff --git a/tests/pytests/scenarios/cluster/conftest.py b/tests/pytests/scenarios/cluster/conftest.py new file mode 100644 index 00000000000..6a7f5482e2f --- /dev/null +++ b/tests/pytests/scenarios/cluster/conftest.py @@ -0,0 +1,194 @@ +import logging +import subprocess + +import pytest + +import salt.utils.platform + +log = logging.getLogger(__name__) + + +@pytest.fixture +def cluster_shared_path(tmp_path): + path = tmp_path / "cluster" + path.mkdir() + return path + + +@pytest.fixture +def cluster_pki_path(cluster_shared_path): + path = cluster_shared_path / "pki" + path.mkdir() + (path / "peers").mkdir() + return path + + +@pytest.fixture +def cluster_cache_path(cluster_shared_path): + path = cluster_shared_path / "cache" + path.mkdir() + return path + + +@pytest.fixture +def cluster_master_1(request, salt_factories, cluster_pki_path, cluster_cache_path): + config_defaults = { + "open_mode": True, + "transport": request.config.getoption("--transport"), + } + config_overrides = { + "interface": "127.0.0.1", + "cluster_id": "master_cluster", + "cluster_peers": [ + "127.0.0.2", + ], + "cluster_pki_dir": str(cluster_pki_path), + "cache_dir": str(cluster_cache_path), + } + factory = salt_factories.salt_master_daemon( + "127.0.0.1", + defaults=config_defaults, + overrides=config_overrides, + extra_cli_arguments_after_first_start_failure=["--log-level=info"], + ) + with factory.started(start_timeout=120): + yield factory + + +# @pytest.fixture(scope="package") +# def cluster_master_1_salt_cli(cluster_master_1): +# return cluster__master_1.salt_cli(timeout=120) +# +# +@pytest.fixture +def cluster_master_2(salt_factories, cluster_master_1): + if salt.utils.platform.is_darwin() or salt.utils.platform.is_freebsd(): + subprocess.check_output(["ifconfig", "lo0", "alias", "127.0.0.2", "up"]) + + config_defaults = { + "open_mode": True, + "transport": cluster_master_1.config["transport"], + } + config_overrides = { + "interface": "127.0.0.2", + "cluster_id": "master_cluster", + "cluster_peers": [ + "127.0.0.1", + ], + "cluster_pki_dir": cluster_master_1.config["cluster_pki_dir"], + "cache_dir": cluster_master_1.config["cache_dir"], + } + + # Use the same ports for both masters, they are binding to different interfaces + for key in ( + "ret_port", + "publish_port", + ): + config_overrides[key] = cluster_master_1.config[key] + factory = salt_factories.salt_master_daemon( + "127.0.0.2", + defaults=config_defaults, + overrides=config_overrides, + extra_cli_arguments_after_first_start_failure=["--log-level=info"], + ) + with factory.started(start_timeout=120): + yield factory + + +# +# @pytest.fixture(scope="package") +# def cluster_master_2_salt_cli(cluster_master_2): +# return cluster_master_2.salt_cli(timeout=120) +# +# +@pytest.fixture +def cluster_minion_1(cluster_master_1): + config_defaults = { + "transport": cluster_master_1.config["transport"], + } + + port = cluster_master_1.config["ret_port"] + addr = cluster_master_1.config["interface"] + config_overrides = { + "master": f"{addr}:{port}", + "test.foo": "baz", + } + factory = cluster_master_1.salt_minion_daemon( + "cluster-minion-1", + defaults=config_defaults, + overrides=config_overrides, + 
extra_cli_arguments_after_first_start_failure=["--log-level=info"], + ) + with factory.started(start_timeout=120): + yield factory + + +# +# +# @pytest.fixture(scope="package") +# def cluster_minion_2(cluster_master_2): +# config_defaults = { +# "transport": cluster_master_2.config["transport"], +# } +# +# port = cluster_master_2.config["ret_port"] +# addr = cluster_master_2.config["interface"] +# config_overrides = { +# "master": f"{port}:{addr}", +# "test.foo": "baz", +# } +# factory = salt_mm_master_1.salt_minion_daemon( +# "cluster-minion-2", +# defaults=config_defaults, +# overrides=config_overrides, +# extra_cli_arguments_after_first_start_failure=["--log-level=info"], +# ) +# with factory.started(start_timeout=120): +# yield factory +# +# +# +# @pytest.fixture(scope="package") +# def run_salt_cmds(): +# def _run_salt_cmds_fn(clis, minions): +# """ +# Run test.ping from all clis to all minions +# """ +# returned_minions = [] +# minion_instances = {minion.id: minion for minion in minions} +# clis_to_check = {minion.id: list(clis) for minion in minions} +# +# attempts = 6 +# timeout = 5 +# if salt.utils.platform.spawning_platform(): +# timeout *= 2 +# while attempts: +# if not clis_to_check: +# break +# for minion in list(clis_to_check): +# if not clis_to_check[minion]: +# clis_to_check.pop(minion) +# continue +# for cli in list(clis_to_check[minion]): +# try: +# ret = cli.run( +# "--timeout={}".format(timeout), +# "test.ping", +# minion_tgt=minion, +# _timeout=2 * timeout, +# ) +# if ret.returncode == 0 and ret.data is True: +# returned_minions.append((cli, minion_instances[minion])) +# clis_to_check[minion].remove(cli) +# except FactoryTimeout: +# log.debug( +# "Failed to execute test.ping from %s to %s.", +# cli.get_display_name(), +# minion, +# ) +# time.sleep(1) +# attempts -= 1 +# +# return returned_minions +# +# return _run_salt_cmds_fn diff --git a/tests/pytests/scenarios/cluster/test_basic_cluster.py b/tests/pytests/scenarios/cluster/test_basic_cluster.py new file mode 100644 index 00000000000..0886bc98de2 --- /dev/null +++ b/tests/pytests/scenarios/cluster/test_basic_cluster.py @@ -0,0 +1,64 @@ +import salt.utils.event + + +def test_basic_cluster_setup( + cluster_master_1, cluster_master_2, cluster_pki_path, cluster_cache_path +): + cli1 = cluster_master_1.salt_run_cli(timeout=120) + ret = cli1.run("config.get", "cluster_pki_dir") + assert str(cluster_pki_path) == ret.stdout + ret = cli1.run("config.get", "cache_dir") + assert str(cluster_cache_path) == ret.stdout + ret = cli1.run("config.get", "cluster_peers") + assert ["127.0.0.2"] == ret.data + + cli2 = cluster_master_2.salt_run_cli(timeout=120) + ret = cli2.run("config.get", "cluster_pki_dir") + assert str(cluster_pki_path) == ret.stdout + ret = cli2.run("config.get", "cache_dir") + assert str(cluster_cache_path) == ret.stdout + ret = cli2.run("config.get", "cluster_peers") + assert ["127.0.0.1"] == ret.data + + peers_path = cluster_pki_path / "peers" + unexpected = False + found = [] + for key_path in peers_path.iterdir(): + if key_path.name == "127.0.0.1.pub": + found.append("127.0.0.1") + elif key_path.name == "127.0.0.2.pub": + found.append("127.0.0.2") + else: + unexpected = True + + found.sort() + + assert ["127.0.0.1", "127.0.0.2"] == found + assert unexpected is False + + assert (cluster_pki_path / ".aes").exists() + + +def test_basic_cluster_event(cluster_master_1, cluster_master_2): + with salt.utils.event.get_event( + "master", opts=cluster_master_2.config, listen=True + ) as event2: + event1 = 
salt.utils.event.get_event("master", opts=cluster_master_2.config) + data = {"meh": "bah"} + event1.fire_event(data, "meh/bah") + evt = event2.get_event(tag="meh/bah", wait=5) + assert data == evt + + +def test_basic_cluster_minion_1(cluster_master_1, cluster_master_2, cluster_minion_1): + cli = cluster_master_1.salt_cli(timeout=120) + ret = cli.run("test.ping", minion_tgt="cluster-minion-1") + assert ret.data is True + + +def test_basic_cluster_minion_1_from_master_2( + cluster_master_1, cluster_master_2, cluster_minion_1 +): + cli = cluster_master_2.salt_cli(timeout=120) + ret = cli.run("test.ping", minion_tgt="cluster-minion-1") + assert ret.data is True diff --git a/tests/pytests/unit/test_crypt.py b/tests/pytests/unit/test_crypt.py index 8b6a698645b..6b5b054fb5a 100644 --- a/tests/pytests/unit/test_crypt.py +++ b/tests/pytests/unit/test_crypt.py @@ -344,8 +344,8 @@ def test_master_keys_with_cluster_id(tmp_path, master_opts): @pytest.mark.skipif(not HAS_PYCRYPTO_RSA, reason="pycrypto >= 2.6 is not available") @pytest.mark.skipif(HAS_M2, reason="m2crypto is used by salt.crypt if installed") def test_pycrypto_gen_keys(): - open_priv_wb = MockCall("/keydir{}keyname.pem".format(os.sep), "wb+") - open_pub_wb = MockCall("/keydir{}keyname.pub".format(os.sep), "wb+") + open_priv_wb = MockCall(f"/keydir{os.sep}keyname.pem", "wb+") + open_pub_wb = MockCall(f"/keydir{os.sep}keyname.pub", "wb+") with patch.multiple( os, @@ -357,7 +357,7 @@ def test_pycrypto_gen_keys(): "os.path.isfile", return_value=True ): result = salt.crypt.gen_keys("/keydir", "keyname", 2048) - assert result == "/keydir{}keyname.pem".format(os.sep), result + assert result == f"/keydir{os.sep}keyname.pem", result assert open_priv_wb not in m_open.calls assert open_pub_wb not in m_open.calls @@ -434,20 +434,22 @@ def test_m2_gen_keys(): with patch("M2Crypto.RSA.RSA.save_pem", MagicMock()) as save_pem: with patch("M2Crypto.RSA.RSA.save_pub_key", MagicMock()) as save_pub: with patch("os.path.isfile", return_value=True): - assert salt.crypt.gen_keys( - "/keydir", "keyname", 2048 - ) == "/keydir{}keyname.pem".format(os.sep) + assert ( + salt.crypt.gen_keys("/keydir", "keyname", 2048) + == f"/keydir{os.sep}keyname.pem" + ) save_pem.assert_not_called() save_pub.assert_not_called() with patch("os.path.isfile", return_value=False): - salt.crypt.gen_keys( - "/keydir", "keyname", 2048 - ) == "/keydir{}keyname.pem".format(os.sep) - save_pem.assert_called_once_with( - "/keydir{}keyname.pem".format(os.sep), cipher=None + assert ( + salt.crypt.gen_keys("/keydir", "keyname", 2048) + == f"/keydir{os.sep}keyname.pem" ) - save_pub.assert_called_once_with("/keydir{}keyname.pub".format(os.sep)) + save_pem.assert_called_once_with( + f"/keydir{os.sep}keyname.pem", cipher=None + ) + save_pub.assert_called_once_with(f"/keydir{os.sep}keyname.pub") @patch("os.umask", MagicMock()) @@ -460,24 +462,30 @@ def test_gen_keys_with_passphrase(): with patch("M2Crypto.RSA.RSA.save_pem", MagicMock()) as save_pem: with patch("M2Crypto.RSA.RSA.save_pub_key", MagicMock()) as save_pub: with patch("os.path.isfile", return_value=True): - assert salt.crypt.gen_keys( - "/keydir", "keyname", 2048, passphrase="password" - ) == "/keydir{}keyname.pem".format(os.sep) + assert ( + salt.crypt.gen_keys( + "/keydir", "keyname", 2048, passphrase="password" + ) + == f"/keydir{os.sep}keyname.pem" + ) save_pem.assert_not_called() save_pub.assert_not_called() with patch("os.path.isfile", return_value=False): - assert salt.crypt.gen_keys( - "/keydir", "keyname", 2048, 
passphrase="password" - ) == "/keydir{}keyname.pem".format(os.sep) + assert ( + salt.crypt.gen_keys( + "/keydir", "keyname", 2048, passphrase="password" + ) + == f"/keydir{os.sep}keyname.pem" + ) callback = save_pem.call_args[1]["callback"] save_pem.assert_called_once_with( - "/keydir{}keyname.pem".format(os.sep), + f"/keydir{os.sep}keyname.pem", cipher="des_ede3_cbc", callback=callback, ) assert callback(None) == b"password" - save_pub.assert_called_once_with("/keydir{}keyname.pub".format(os.sep)) + save_pub.assert_called_once_with(f"/keydir{os.sep}keyname.pub") @pytest.mark.skipif(not HAS_M2, reason="m2crypto is not available") @@ -528,15 +536,6 @@ def test_m2_bad_key(key_to_test): assert key.check_key() == 1 -@pytest.mark.skipif(not HAS_M2, reason="Skip when m2crypto is not installed") -def test_m2_bad_key(key_to_test): - """ - Load public key with an invalid header using m2crypto and validate it - """ - key = salt.crypt.get_rsa_pub_key(key_to_test) - assert key.check_key() == 1 - - @pytest.mark.skipif(HAS_M2, reason="Skip when m2crypto is installed") def test_pycrypto_bad_key(key_to_test): """ diff --git a/tests/pytests/unit/test_master.py b/tests/pytests/unit/test_master.py index 7e003cd1c2d..198c63d8602 100644 --- a/tests/pytests/unit/test_master.py +++ b/tests/pytests/unit/test_master.py @@ -1,5 +1,6 @@ import os import pathlib +import stat import threading import time @@ -11,7 +12,7 @@ from tests.support.mock import MagicMock, patch @pytest.fixture -def maintenence_opts(master_opts): +def maintenance_opts(master_opts): """ Options needed for master's Maintenence class """ @@ -21,11 +22,11 @@ def maintenence_opts(master_opts): @pytest.fixture -def maintenence(maintenence_opts): +def maintenance(maintenance_opts): """ The master's Maintenence class """ - return salt.master.Maintenance(maintenence_opts) + return salt.master.Maintenance(maintenance_opts) @pytest.fixture @@ -40,6 +41,29 @@ def clear_funcs(master_opts): clear_funcs.destroy() +@pytest.fixture +def cluster_maintenance_opts(master_opts, tmp_path): + """ + Options needed for master's Maintenence class + """ + opts = master_opts.copy() + opts.update( + git_pillar_update_interval=180, + maintenance_interval=181, + cluster_pki_dir=tmp_path, + cluster_id="test-cluster", + ) + yield opts + + +@pytest.fixture +def cluster_maintenance(cluster_maintenance_opts): + """ + The master's Maintenence class + """ + return salt.master.Maintenance(cluster_maintenance_opts) + + @pytest.fixture def encrypted_requests(tmp_path): # To honor the comment on AESFuncs @@ -854,7 +878,7 @@ async def test_publish_user_authorization_error(clear_funcs): assert await clear_funcs.publish(load) == mock_ret -def test_run_func(maintenence): +def test_run_func(maintenance): """ Test the run function inside Maintenance class. 
""" @@ -917,7 +941,7 @@ def test_run_func(maintenence): "salt.utils.verify.check_max_open_files", mocked_check_max_open_files ): try: - maintenence.run() + maintenance.run() except RuntimeError as exc: assert str(exc) == "Time passes" assert mocked_time._calls == [60] * 4 @@ -933,51 +957,48 @@ def test_run_func(maintenence): assert mocked_check_max_open_files.call_times == [0, 60, 120, 180] -def test_key_rotate_master_match(maintenence): - maintenence.event = MagicMock() +def test_key_rotate_master_match(maintenance): + maintenance.event = MagicMock() now = time.monotonic() - dfn = pathlib.Path(maintenence.opts["cachedir"]) / ".dfn" + dfn = pathlib.Path(maintenance.opts["cachedir"]) / ".dfn" salt.crypt.dropfile( - maintenence.opts["cachedir"], - maintenence.opts["user"], - master_id=maintenence.opts["id"], + maintenance.opts["cachedir"], + maintenance.opts["user"], + master_id=maintenance.opts["id"], ) assert dfn.exists() with patch("salt.master.SMaster.rotate_secrets") as rotate_secrets: - maintenence.handle_key_rotate(now) + maintenance.handle_key_rotate(now) assert not dfn.exists() rotate_secrets.assert_called_with( - maintenence.opts, maintenence.event, owner=True + maintenance.opts, maintenance.event, owner=True ) -def test_key_rotate_no_master_match(maintenence): +def test_key_rotate_no_master_match(maintenance): now = time.monotonic() - dfn = pathlib.Path(maintenence.opts["cachedir"]) / ".dfn" + dfn = pathlib.Path(maintenance.opts["cachedir"]) / ".dfn" dfn.write_text("nomatch") assert dfn.exists() with patch("salt.master.SMaster.rotate_secrets") as rotate_secrets: - maintenence.handle_key_rotate(now) + maintenance.handle_key_rotate(now) assert dfn.exists() rotate_secrets.assert_not_called() -import stat - - @pytest.mark.slow_test -def test_key_dfn_wait(maintenence): +def test_key_dfn_wait(cluster_maintenance): now = time.monotonic() - key = pathlib.Path(maintenence.opts["cachedir"]) / ".aes" + key = pathlib.Path(cluster_maintenance.opts["cluster_pki_dir"]) / ".aes" salt.crypt.Crypticle.read_or_generate_key(str(key)) - rotate_time = time.monotonic() - (maintenence.opts["publish_session"] + 1) + rotate_time = time.monotonic() - (cluster_maintenance.opts["publish_session"] + 1) os.utime(str(key), (rotate_time, rotate_time)) - dfn = pathlib.Path(maintenence.opts["cachedir"]) / ".dfn" + dfn = pathlib.Path(cluster_maintenance.opts["cachedir"]) / ".dfn" def run_key_rotate(): with patch("salt.master.SMaster.rotate_secrets") as rotate_secrets: - maintenence.handle_key_rotate(now) + cluster_maintenance.handle_key_rotate(now) assert dfn.exists() rotate_secrets.assert_not_called() @@ -990,7 +1011,7 @@ def test_key_dfn_wait(maintenence): if time.monotonic() - start > 30: assert dfn.exists(), "dfn file never created" - assert maintenence.opts["id"] == dfn.read_text() + assert cluster_maintenance.opts["id"] == dfn.read_text() with salt.utils.files.set_umask(0o277): if os.path.isfile(dfn) and not os.access(dfn, os.W_OK):