Handle key rotation with multiple masters

This commit is contained in:
Daniel A. Wozniak 2023-08-07 23:28:40 -07:00 committed by Gareth J. Greenaway
parent e407cb99ae
commit d7daa1c424
5 changed files with 187 additions and 35 deletions

View file

@ -81,10 +81,20 @@ if not HAS_M2 and not HAS_CRYPTO:
log = logging.getLogger(__name__)
def dropfile(cachedir, user=None):
def read_dropfile(cachedir):
dfn = os.path.join(cachedir, ".dfn")
try:
with salt.utils.files.fopen(dfn, "r") as fp:
return fp.read()
except FileNotFoundError:
pass
def dropfile(cachedir, user=None, master_id=""):
"""
Set an AES dropfile to request the master update the publish session key
"""
dfn_next = os.path.join(cachedir, ".dfn-next")
dfn = os.path.join(cachedir, ".dfn")
# set a mask (to avoid a race condition on file creation) and store original.
with salt.utils.files.set_umask(0o277):
@ -95,17 +105,18 @@ def dropfile(cachedir, user=None):
if os.path.isfile(dfn) and not os.access(dfn, os.W_OK):
os.chmod(dfn, stat.S_IRUSR | stat.S_IWUSR)
with salt.utils.files.fopen(dfn, "wb+") as fp_:
fp_.write(b"")
os.chmod(dfn, stat.S_IRUSR)
with salt.utils.files.fopen(dfn_next, "w+") as fp_:
fp_.write(master_id)
os.chmod(dfn_next, stat.S_IRUSR)
if user:
try:
import pwd
uid = pwd.getpwnam(user).pw_uid
os.chown(dfn, uid, -1)
os.chown(dfn_next, uid, -1)
except (KeyError, ImportError, OSError):
pass
os.rename(dfn_next, dfn)
def gen_keys(keydir, keyname, keysize, user=None, passphrase=None):
@ -1444,6 +1455,21 @@ class Crypticle:
# Return data must be a base64-encoded string, not a unicode type
return b64key.replace("\n", "")
@classmethod
def read_or_generate_key(cls, path, key_size=192, remove=False):
if remove:
os.remove(path)
with salt.utils.files.set_umask(0o177):
try:
with salt.utils.files.fopen(path, "r") as fp:
return fp.read()
except FileNotFoundError:
pass
key = cls.generate_key_string(key_size)
with salt.utils.files.fopen(path, "w") as fp:
fp.write(key)
return key
@classmethod
def extract_keys(cls, key_string, key_size):
key = salt.utils.stringutils.to_bytes(base64.b64decode(key_string))

View file

@ -725,7 +725,9 @@ class Key:
else:
self.check_minion_cache()
if self.opts.get("rotate_aes_key"):
salt.crypt.dropfile(self.opts["cachedir"], self.opts["user"])
salt.crypt.dropfile(
self.opts["cachedir"], self.opts["user"], self.opts["id"]
)
return self.name_match(match) if match is not None else self.dict_match(matches)
def delete_den(self):
@ -758,7 +760,9 @@ class Key:
pass
self.check_minion_cache()
if self.opts.get("rotate_aes_key"):
salt.crypt.dropfile(self.opts["cachedir"], self.opts["user"])
salt.crypt.dropfile(
self.opts["cachedir"], self.opts["user"], self.opts["id"]
)
return self.list_keys()
def reject(
@ -792,7 +796,9 @@ class Key:
pass
self.check_minion_cache()
if self.opts.get("rotate_aes_key"):
salt.crypt.dropfile(self.opts["cachedir"], self.opts["user"])
salt.crypt.dropfile(
self.opts["cachedir"], self.opts["user"], self.opts["id"]
)
return self.name_match(match) if match is not None else self.dict_match(matches)
def reject_all(self):
@ -812,7 +818,9 @@ class Key:
pass
self.check_minion_cache()
if self.opts.get("rotate_aes_key"):
salt.crypt.dropfile(self.opts["cachedir"], self.opts["user"])
salt.crypt.dropfile(
self.opts["cachedir"], self.opts["user"], self.opts["id"]
)
return self.list_keys()
def finger(self, match, hash_type=None):

View file

@ -5,6 +5,7 @@ involves preparing the three listeners and the workers needed by the master.
import collections
import copy
import ctypes
import functools
import logging
import multiprocessing
import os
@ -137,7 +138,7 @@ class SMaster:
return cls.secrets["aes"]["serial"].value
@classmethod
def rotate_secrets(cls, opts=None, event=None, use_lock=True):
def rotate_secrets(cls, opts=None, event=None, use_lock=True, owner=False):
log.info("Rotating master AES key")
if opts is None:
opts = {}
@ -147,13 +148,13 @@ class SMaster:
if use_lock:
with secret_map["secret"].get_lock():
secret_map["secret"].value = salt.utils.stringutils.to_bytes(
secret_map["reload"]()
secret_map["reload"](remove=owner)
)
if "serial" in secret_map:
secret_map["serial"].value = 0
else:
secret_map["secret"].value = salt.utils.stringutils.to_bytes(
secret_map["reload"]()
secret_map["reload"](remove=owner)
)
if "serial" in secret_map:
secret_map["serial"].value = 0
@ -182,8 +183,6 @@ class Maintenance(salt.utils.process.SignalHandlingProcess):
self.opts = opts
# How often do we perform the maintenance tasks
self.loop_interval = int(self.opts["loop_interval"])
# Track key rotation intervals
self.rotate = int(time.time())
# A serializer for general maint operations
self.restart_interval = int(self.opts["maintenance_interval"])
@ -299,7 +298,7 @@ class Maintenance(salt.utils.process.SignalHandlingProcess):
) as cache_file:
salt.payload.dump(keys, cache_file)
def handle_key_rotate(self, now):
def handle_key_rotate(self, now, drop_file_wait=5):
"""
Rotate the AES key rotation
"""
@ -310,24 +309,45 @@ class Maintenance(salt.utils.process.SignalHandlingProcess):
# Basic Windows permissions don't distinguish between
# user/group/all. Check for read-only state instead.
if salt.utils.platform.is_windows() and not os.access(dfn, os.W_OK):
to_rotate = True
to_rotate = (
salt.crypt.read_dropfile(self.opts["cachedir"]) == self.opts["id"]
)
# Cannot delete read-only files on Windows.
os.chmod(dfn, stat.S_IRUSR | stat.S_IWUSR)
elif stats.st_mode == 0o100400:
to_rotate = True
to_rotate = (
salt.crypt.read_dropfile(self.opts["cachedir"]) == self.opts["id"]
)
else:
log.error("Found dropfile with incorrect permissions, ignoring...")
os.remove(dfn)
if to_rotate:
os.remove(dfn)
except os.error:
pass
if self.opts.get("publish_session"):
if now - self.rotate >= self.opts["publish_session"]:
to_rotate = True
# There is no need to check key against publish_session if we're
# already rotating.
if not to_rotate and self.opts.get("publish_session"):
keyfile = os.path.join(self.opts["cachedir"], ".aes")
try:
stats = os.stat(keyfile)
except os.error as exc:
log.error("Unexpected condition while reading keyfile %s", exc)
return
if now - stats.st_mtime >= self.opts["publish_session"]:
salt.crypt.dropfile(
self.opts["cachedir"], self.opts["user"], self.opts["id"]
)
# There is currently no concept of a leader in a master
# cluster. Lets fake it till we make it with a little
# waiting period.
time.sleep(drop_file_wait)
to_rotate = (
salt.crypt.read_dropfile(self.opts["cachedir"]) == self.opts["id"]
)
if to_rotate:
SMaster.rotate_secrets(self.opts, self.event)
self.rotate = now
SMaster.rotate_secrets(self.opts, self.event, owner=True)
def handle_git_pillar(self):
"""
@ -695,20 +715,22 @@ class Master(SMaster):
# manager. We don't want the processes being started to inherit those
# signal handlers
with salt.utils.process.default_signals(signal.SIGINT, signal.SIGTERM):
keypath = os.path.join(self.opts["cachedir"], ".aes")
keygen = functools.partial(
salt.crypt.Crypticle.read_or_generate_key,
keypath,
)
# Setup the secrets here because the PubServerChannel may need
# them as well.
SMaster.secrets["aes"] = {
"secret": multiprocessing.Array(
ctypes.c_char,
salt.utils.stringutils.to_bytes(
salt.crypt.Crypticle.generate_key_string()
),
ctypes.c_char, salt.utils.stringutils.to_bytes(keygen())
),
"serial": multiprocessing.Value(
ctypes.c_longlong, lock=False # We'll use the lock from 'secret'
),
"reload": salt.crypt.Crypticle.generate_key_string,
"reload": keygen,
}
log.info("Creating master process manager")
@ -732,9 +754,9 @@ class Master(SMaster):
)
self.process_manager.add_process(
PublishForwarder,
EventMonitor,
args=[self.opts],
name="PublishForwarder",
name="EventMonitor",
)
if self.opts.get("reactor"):
@ -848,12 +870,15 @@ class Master(SMaster):
sys.exit(0)
class PublishForwarder(salt.utils.process.SignalHandlingProcess):
class EventMonitor(salt.utils.process.SignalHandlingProcess):
"""
Forward events from the event bus to the publish server.
Monitor the master event bus.
- Forward publish events to minion event publisher.
- Handle key rotate events.
"""
def __init__(self, opts, channels=None, name="PublishForwarder"):
def __init__(self, opts, channels=None, name="EventMonitor"):
super().__init__(name=name)
self.opts = opts
if channels is None:
@ -866,15 +891,17 @@ class PublishForwarder(salt.utils.process.SignalHandlingProcess):
Event handler for publish forwarder
"""
tag, data = salt.utils.event.SaltEvent.unpack(package)
log.error("event tag is %r", tag)
if tag.startswith("salt/job") and tag.endswith("/publish"):
log.info("Forward job event to publisher server: %r", package)
data.pop("_stamp", None)
log.trace("Forward job event to publisher server: %r", data)
if not self.channels:
for transport, opts in iter_transport_opts(self.opts):
chan = salt.channel.server.PubServerChannel.factory(opts)
self.channels.append(chan)
for chan in self.channels:
yield chan.publish(data)
elif tag == "rotate_aes_key":
SMaster.rotate_secrets(self.opts, owner=False)
def run(self):
io_loop = tornado.ioloop.IOLoop()

View file

@ -170,3 +170,22 @@ def test_verify_signature_bad_sig(tmp_path):
msg = b"foo bar"
sig = salt.crypt.sign_message(str(tmp_path.joinpath("foo.pem")), msg)
assert not salt.crypt.verify_signature(str(tmp_path.joinpath("bar.pub")), msg, sig)
def test_read_or_generate_key_string(tempdir):
keyfile = tempdir.tempdir / ".aes"
assert not keyfile.exists()
first_key = salt.crypt.Crypticle.read_or_generate_key(keyfile)
assert keyfile.exists()
second_key = salt.crypt.Crypticle.read_or_generate_key(keyfile)
assert first_key == second_key
third_key = salt.crypt.Crypticle.read_or_generate_key(keyfile, remove=True)
assert second_key != third_key
def test_dropfile_contents(tempdir, master_opts):
salt.crypt.dropfile(
str(tempdir.tempdir), master_opts["user"], master_id=master_opts["id"]
)
with salt.utils.files.fopen(str(tempdir.tempdir / ".dfn"), "r") as fp:
assert master_opts["id"] == fp.read()

View file

@ -1,3 +1,6 @@
import os
import pathlib
import threading
import time
import pytest
@ -927,3 +930,72 @@ def test_run_func(maintenence):
assert mocked_handle_presence.call_times == [0, 60, 120, 180]
assert mocked_handle_key_rotate.call_times == [0, 60, 120, 180]
assert mocked_check_max_open_files.call_times == [0, 60, 120, 180]
def test_key_rotate_master_match(maintenence):
maintenence.event = MagicMock()
now = time.monotonic()
dfn = pathlib.Path(maintenence.opts["cachedir"]) / ".dfn"
salt.crypt.dropfile(
maintenence.opts["cachedir"],
maintenence.opts["user"],
master_id=maintenence.opts["id"],
)
assert dfn.exists()
with patch("salt.master.SMaster.rotate_secrets") as rotate_secrets:
maintenence.handle_key_rotate(now)
assert not dfn.exists()
rotate_secrets.assert_called_with(
maintenence.opts, maintenence.event, owner=True
)
def test_key_rotate_no_master_match(maintenence):
now = time.monotonic()
dfn = pathlib.Path(maintenence.opts["cachedir"]) / ".dfn"
dfn.write_text("nomatch")
assert dfn.exists()
with patch("salt.master.SMaster.rotate_secrets") as rotate_secrets:
maintenence.handle_key_rotate(now)
assert dfn.exists()
rotate_secrets.assert_not_called()
import stat
@pytest.mark.slow_test
def test_key_dfn_wait(maintenence):
now = time.monotonic()
key = pathlib.Path(maintenence.opts["cachedir"]) / ".aes"
salt.crypt.Crypticle.read_or_generate_key(str(key))
rotate_time = time.monotonic() - (maintenence.opts["publish_session"] + 1)
os.utime(str(key), (rotate_time, rotate_time))
dfn = pathlib.Path(maintenence.opts["cachedir"]) / ".dfn"
def run_key_rotate():
with patch("salt.master.SMaster.rotate_secrets") as rotate_secrets:
maintenence.handle_key_rotate(now)
assert dfn.exists()
rotate_secrets.assert_not_called()
thread = threading.Thread(target=run_key_rotate)
assert not dfn.exists()
start = time.monotonic()
thread.start()
while not dfn.exists():
if time.monotonic() - start > 30:
assert dfn.exists(), "dfn file never created"
assert maintenence.opts["id"] == dfn.read_text()
with salt.utils.files.set_umask(0o277):
if os.path.isfile(dfn) and not os.access(dfn, os.W_OK):
os.chmod(dfn, stat.S_IRUSR | stat.S_IWUSR)
dfn.write_text("othermaster")
thread.join()
assert time.time() - start >= 5
assert dfn.read_text() == "othermaster"