diff --git a/salt/crypt.py b/salt/crypt.py index a509e496495..0003b024d9f 100644 --- a/salt/crypt.py +++ b/salt/crypt.py @@ -81,10 +81,20 @@ if not HAS_M2 and not HAS_CRYPTO: log = logging.getLogger(__name__) -def dropfile(cachedir, user=None): +def read_dropfile(cachedir): + dfn = os.path.join(cachedir, ".dfn") + try: + with salt.utils.files.fopen(dfn, "r") as fp: + return fp.read() + except FileNotFoundError: + pass + + +def dropfile(cachedir, user=None, master_id=""): """ Set an AES dropfile to request the master update the publish session key """ + dfn_next = os.path.join(cachedir, ".dfn-next") dfn = os.path.join(cachedir, ".dfn") # set a mask (to avoid a race condition on file creation) and store original. with salt.utils.files.set_umask(0o277): @@ -95,17 +105,18 @@ def dropfile(cachedir, user=None): if os.path.isfile(dfn) and not os.access(dfn, os.W_OK): os.chmod(dfn, stat.S_IRUSR | stat.S_IWUSR) - with salt.utils.files.fopen(dfn, "wb+") as fp_: - fp_.write(b"") - os.chmod(dfn, stat.S_IRUSR) + with salt.utils.files.fopen(dfn_next, "w+") as fp_: + fp_.write(master_id) + os.chmod(dfn_next, stat.S_IRUSR) if user: try: import pwd uid = pwd.getpwnam(user).pw_uid - os.chown(dfn, uid, -1) + os.chown(dfn_next, uid, -1) except (KeyError, ImportError, OSError): pass + os.rename(dfn_next, dfn) def gen_keys(keydir, keyname, keysize, user=None, passphrase=None): @@ -1444,6 +1455,21 @@ class Crypticle: # Return data must be a base64-encoded string, not a unicode type return b64key.replace("\n", "") + @classmethod + def read_or_generate_key(cls, path, key_size=192, remove=False): + if remove: + os.remove(path) + with salt.utils.files.set_umask(0o177): + try: + with salt.utils.files.fopen(path, "r") as fp: + return fp.read() + except FileNotFoundError: + pass + key = cls.generate_key_string(key_size) + with salt.utils.files.fopen(path, "w") as fp: + fp.write(key) + return key + @classmethod def extract_keys(cls, key_string, key_size): key = salt.utils.stringutils.to_bytes(base64.b64decode(key_string)) diff --git a/salt/key.py b/salt/key.py index 3d464b46a2b..14ccf450f3c 100644 --- a/salt/key.py +++ b/salt/key.py @@ -725,7 +725,9 @@ class Key: else: self.check_minion_cache() if self.opts.get("rotate_aes_key"): - salt.crypt.dropfile(self.opts["cachedir"], self.opts["user"]) + salt.crypt.dropfile( + self.opts["cachedir"], self.opts["user"], self.opts["id"] + ) return self.name_match(match) if match is not None else self.dict_match(matches) def delete_den(self): @@ -758,7 +760,9 @@ class Key: pass self.check_minion_cache() if self.opts.get("rotate_aes_key"): - salt.crypt.dropfile(self.opts["cachedir"], self.opts["user"]) + salt.crypt.dropfile( + self.opts["cachedir"], self.opts["user"], self.opts["id"] + ) return self.list_keys() def reject( @@ -792,7 +796,9 @@ class Key: pass self.check_minion_cache() if self.opts.get("rotate_aes_key"): - salt.crypt.dropfile(self.opts["cachedir"], self.opts["user"]) + salt.crypt.dropfile( + self.opts["cachedir"], self.opts["user"], self.opts["id"] + ) return self.name_match(match) if match is not None else self.dict_match(matches) def reject_all(self): @@ -812,7 +818,9 @@ class Key: pass self.check_minion_cache() if self.opts.get("rotate_aes_key"): - salt.crypt.dropfile(self.opts["cachedir"], self.opts["user"]) + salt.crypt.dropfile( + self.opts["cachedir"], self.opts["user"], self.opts["id"] + ) return self.list_keys() def finger(self, match, hash_type=None): diff --git a/salt/master.py b/salt/master.py index e08ab9fc9eb..d231b1d956e 100644 --- a/salt/master.py +++ b/salt/master.py @@ -5,6 +5,7 @@ involves preparing the three listeners and the workers needed by the master. import collections import copy import ctypes +import functools import logging import multiprocessing import os @@ -137,7 +138,7 @@ class SMaster: return cls.secrets["aes"]["serial"].value @classmethod - def rotate_secrets(cls, opts=None, event=None, use_lock=True): + def rotate_secrets(cls, opts=None, event=None, use_lock=True, owner=False): log.info("Rotating master AES key") if opts is None: opts = {} @@ -147,13 +148,13 @@ class SMaster: if use_lock: with secret_map["secret"].get_lock(): secret_map["secret"].value = salt.utils.stringutils.to_bytes( - secret_map["reload"]() + secret_map["reload"](remove=owner) ) if "serial" in secret_map: secret_map["serial"].value = 0 else: secret_map["secret"].value = salt.utils.stringutils.to_bytes( - secret_map["reload"]() + secret_map["reload"](remove=owner) ) if "serial" in secret_map: secret_map["serial"].value = 0 @@ -182,8 +183,6 @@ class Maintenance(salt.utils.process.SignalHandlingProcess): self.opts = opts # How often do we perform the maintenance tasks self.loop_interval = int(self.opts["loop_interval"]) - # Track key rotation intervals - self.rotate = int(time.time()) # A serializer for general maint operations self.restart_interval = int(self.opts["maintenance_interval"]) @@ -299,7 +298,7 @@ class Maintenance(salt.utils.process.SignalHandlingProcess): ) as cache_file: salt.payload.dump(keys, cache_file) - def handle_key_rotate(self, now): + def handle_key_rotate(self, now, drop_file_wait=5): """ Rotate the AES key rotation """ @@ -310,24 +309,45 @@ class Maintenance(salt.utils.process.SignalHandlingProcess): # Basic Windows permissions don't distinguish between # user/group/all. Check for read-only state instead. if salt.utils.platform.is_windows() and not os.access(dfn, os.W_OK): - to_rotate = True + to_rotate = ( + salt.crypt.read_dropfile(self.opts["cachedir"]) == self.opts["id"] + ) # Cannot delete read-only files on Windows. os.chmod(dfn, stat.S_IRUSR | stat.S_IWUSR) elif stats.st_mode == 0o100400: - to_rotate = True + to_rotate = ( + salt.crypt.read_dropfile(self.opts["cachedir"]) == self.opts["id"] + ) else: log.error("Found dropfile with incorrect permissions, ignoring...") - os.remove(dfn) + if to_rotate: + os.remove(dfn) except os.error: pass - if self.opts.get("publish_session"): - if now - self.rotate >= self.opts["publish_session"]: - to_rotate = True + # There is no need to check key against publish_session if we're + # already rotating. + if not to_rotate and self.opts.get("publish_session"): + keyfile = os.path.join(self.opts["cachedir"], ".aes") + try: + stats = os.stat(keyfile) + except os.error as exc: + log.error("Unexpected condition while reading keyfile %s", exc) + return + if now - stats.st_mtime >= self.opts["publish_session"]: + salt.crypt.dropfile( + self.opts["cachedir"], self.opts["user"], self.opts["id"] + ) + # There is currently no concept of a leader in a master + # cluster. Lets fake it till we make it with a little + # waiting period. + time.sleep(drop_file_wait) + to_rotate = ( + salt.crypt.read_dropfile(self.opts["cachedir"]) == self.opts["id"] + ) if to_rotate: - SMaster.rotate_secrets(self.opts, self.event) - self.rotate = now + SMaster.rotate_secrets(self.opts, self.event, owner=True) def handle_git_pillar(self): """ @@ -695,20 +715,22 @@ class Master(SMaster): # manager. We don't want the processes being started to inherit those # signal handlers with salt.utils.process.default_signals(signal.SIGINT, signal.SIGTERM): + keypath = os.path.join(self.opts["cachedir"], ".aes") + keygen = functools.partial( + salt.crypt.Crypticle.read_or_generate_key, + keypath, + ) # Setup the secrets here because the PubServerChannel may need # them as well. SMaster.secrets["aes"] = { "secret": multiprocessing.Array( - ctypes.c_char, - salt.utils.stringutils.to_bytes( - salt.crypt.Crypticle.generate_key_string() - ), + ctypes.c_char, salt.utils.stringutils.to_bytes(keygen()) ), "serial": multiprocessing.Value( ctypes.c_longlong, lock=False # We'll use the lock from 'secret' ), - "reload": salt.crypt.Crypticle.generate_key_string, + "reload": keygen, } log.info("Creating master process manager") @@ -732,9 +754,9 @@ class Master(SMaster): ) self.process_manager.add_process( - PublishForwarder, + EventMonitor, args=[self.opts], - name="PublishForwarder", + name="EventMonitor", ) if self.opts.get("reactor"): @@ -848,12 +870,15 @@ class Master(SMaster): sys.exit(0) -class PublishForwarder(salt.utils.process.SignalHandlingProcess): +class EventMonitor(salt.utils.process.SignalHandlingProcess): """ - Forward events from the event bus to the publish server. + Monitor the master event bus. + + - Forward publish events to minion event publisher. + - Handle key rotate events. """ - def __init__(self, opts, channels=None, name="PublishForwarder"): + def __init__(self, opts, channels=None, name="EventMonitor"): super().__init__(name=name) self.opts = opts if channels is None: @@ -866,15 +891,17 @@ class PublishForwarder(salt.utils.process.SignalHandlingProcess): Event handler for publish forwarder """ tag, data = salt.utils.event.SaltEvent.unpack(package) - log.error("event tag is %r", tag) if tag.startswith("salt/job") and tag.endswith("/publish"): - log.info("Forward job event to publisher server: %r", package) + data.pop("_stamp", None) + log.trace("Forward job event to publisher server: %r", data) if not self.channels: for transport, opts in iter_transport_opts(self.opts): chan = salt.channel.server.PubServerChannel.factory(opts) self.channels.append(chan) for chan in self.channels: yield chan.publish(data) + elif tag == "rotate_aes_key": + SMaster.rotate_secrets(self.opts, owner=False) def run(self): io_loop = tornado.ioloop.IOLoop() diff --git a/tests/pytests/unit/test_crypt.py b/tests/pytests/unit/test_crypt.py index e3c98ab6366..a4fd73659ab 100644 --- a/tests/pytests/unit/test_crypt.py +++ b/tests/pytests/unit/test_crypt.py @@ -170,3 +170,22 @@ def test_verify_signature_bad_sig(tmp_path): msg = b"foo bar" sig = salt.crypt.sign_message(str(tmp_path.joinpath("foo.pem")), msg) assert not salt.crypt.verify_signature(str(tmp_path.joinpath("bar.pub")), msg, sig) + + +def test_read_or_generate_key_string(tempdir): + keyfile = tempdir.tempdir / ".aes" + assert not keyfile.exists() + first_key = salt.crypt.Crypticle.read_or_generate_key(keyfile) + assert keyfile.exists() + second_key = salt.crypt.Crypticle.read_or_generate_key(keyfile) + assert first_key == second_key + third_key = salt.crypt.Crypticle.read_or_generate_key(keyfile, remove=True) + assert second_key != third_key + + +def test_dropfile_contents(tempdir, master_opts): + salt.crypt.dropfile( + str(tempdir.tempdir), master_opts["user"], master_id=master_opts["id"] + ) + with salt.utils.files.fopen(str(tempdir.tempdir / ".dfn"), "r") as fp: + assert master_opts["id"] == fp.read() diff --git a/tests/pytests/unit/test_master.py b/tests/pytests/unit/test_master.py index a0ad619711a..29764700263 100644 --- a/tests/pytests/unit/test_master.py +++ b/tests/pytests/unit/test_master.py @@ -1,3 +1,6 @@ +import os +import pathlib +import threading import time import pytest @@ -927,3 +930,72 @@ def test_run_func(maintenence): assert mocked_handle_presence.call_times == [0, 60, 120, 180] assert mocked_handle_key_rotate.call_times == [0, 60, 120, 180] assert mocked_check_max_open_files.call_times == [0, 60, 120, 180] + + +def test_key_rotate_master_match(maintenence): + maintenence.event = MagicMock() + now = time.monotonic() + dfn = pathlib.Path(maintenence.opts["cachedir"]) / ".dfn" + salt.crypt.dropfile( + maintenence.opts["cachedir"], + maintenence.opts["user"], + master_id=maintenence.opts["id"], + ) + assert dfn.exists() + with patch("salt.master.SMaster.rotate_secrets") as rotate_secrets: + maintenence.handle_key_rotate(now) + assert not dfn.exists() + rotate_secrets.assert_called_with( + maintenence.opts, maintenence.event, owner=True + ) + + +def test_key_rotate_no_master_match(maintenence): + now = time.monotonic() + dfn = pathlib.Path(maintenence.opts["cachedir"]) / ".dfn" + dfn.write_text("nomatch") + assert dfn.exists() + with patch("salt.master.SMaster.rotate_secrets") as rotate_secrets: + maintenence.handle_key_rotate(now) + assert dfn.exists() + rotate_secrets.assert_not_called() + + +import stat + + +@pytest.mark.slow_test +def test_key_dfn_wait(maintenence): + now = time.monotonic() + key = pathlib.Path(maintenence.opts["cachedir"]) / ".aes" + salt.crypt.Crypticle.read_or_generate_key(str(key)) + rotate_time = time.monotonic() - (maintenence.opts["publish_session"] + 1) + os.utime(str(key), (rotate_time, rotate_time)) + + dfn = pathlib.Path(maintenence.opts["cachedir"]) / ".dfn" + + def run_key_rotate(): + with patch("salt.master.SMaster.rotate_secrets") as rotate_secrets: + maintenence.handle_key_rotate(now) + assert dfn.exists() + rotate_secrets.assert_not_called() + + thread = threading.Thread(target=run_key_rotate) + assert not dfn.exists() + start = time.monotonic() + thread.start() + + while not dfn.exists(): + if time.monotonic() - start > 30: + assert dfn.exists(), "dfn file never created" + + assert maintenence.opts["id"] == dfn.read_text() + + with salt.utils.files.set_umask(0o277): + if os.path.isfile(dfn) and not os.access(dfn, os.W_OK): + os.chmod(dfn, stat.S_IRUSR | stat.S_IWUSR) + dfn.write_text("othermaster") + + thread.join() + assert time.time() - start >= 5 + assert dfn.read_text() == "othermaster"