mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Only the cluster aes key is on disk
This commit is contained in:
parent
f516003529
commit
c2a4baf11f
7 changed files with 484 additions and 179 deletions
|
@ -224,6 +224,7 @@ class AsyncReqChannel:
|
|||
tries,
|
||||
timeout,
|
||||
)
|
||||
log.error("WTF %r", ret)
|
||||
if HAS_M2:
|
||||
aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding)
|
||||
else:
|
||||
|
@ -670,64 +671,3 @@ class AsyncPullChannel:
|
|||
import salt.transport.ipc
|
||||
|
||||
return salt.transport.ipc.IPCMessageServer(opts, **kwargs)
|
||||
|
||||
|
||||
class AsyncMasterPubChannel:
|
||||
""" """
|
||||
|
||||
async_methods = [
|
||||
"connect",
|
||||
]
|
||||
|
||||
close_methods = [
|
||||
"close",
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def factory(cls, opts, **kwargs):
|
||||
io_loop = kwargs.get("io_loop")
|
||||
if io_loop is None:
|
||||
io_loop = tornado.ioloop.IOLoop.current()
|
||||
transport = salt.transport.ipc_publish_client(opts, "master")
|
||||
return cls(opts, transport, None, io_loop)
|
||||
|
||||
def __init__(self, opts, transport, auth, io_loop=None):
|
||||
self.opts = opts
|
||||
self.io_loop = io_loop
|
||||
self.auth = auth
|
||||
self.transport = transport
|
||||
self._closing = False
|
||||
self._reconnected = False
|
||||
|
||||
async def connect(self):
|
||||
"""
|
||||
Return a future which completes when connected to the remote publisher
|
||||
"""
|
||||
await self.transport.connect()
|
||||
|
||||
async def recv(self, timeout=None):
|
||||
return await self.transport.recv(timeout)
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
Close the channel
|
||||
"""
|
||||
self.transport.close()
|
||||
|
||||
def on_recv(self, callback=None):
|
||||
"""
|
||||
When jobs are received pass them (decoded) to callback
|
||||
"""
|
||||
return self.transport.on_recv(callback)
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, *args):
|
||||
self.io_loop.spawn_callback(self.close)
|
||||
|
||||
async def __aenter__(self):
|
||||
return self
|
||||
|
||||
async def __aexit__(self, *_):
|
||||
await self.close()
|
||||
|
|
|
@ -6,6 +6,7 @@ This includes server side transport, for the ReqServer and the Publisher
|
|||
|
||||
import asyncio
|
||||
import binascii
|
||||
import collections
|
||||
import hashlib
|
||||
import logging
|
||||
import os
|
||||
|
@ -244,12 +245,16 @@ class ReqServerChannel:
|
|||
"""
|
||||
import salt.master
|
||||
|
||||
key = "aes"
|
||||
if self.opts.get("cluster_id", None):
|
||||
key = "cluster_aes"
|
||||
|
||||
if (
|
||||
salt.master.SMaster.secrets["aes"]["secret"].value
|
||||
salt.master.SMaster.secrets[key]["secret"].value
|
||||
!= self.crypticle.key_string
|
||||
):
|
||||
self.crypticle = salt.crypt.Crypticle(
|
||||
self.opts, salt.master.SMaster.secrets["aes"]["secret"].value
|
||||
self.opts, salt.master.SMaster.secrets[key]["secret"].value
|
||||
)
|
||||
return True
|
||||
return False
|
||||
|
@ -648,7 +653,7 @@ class ReqServerChannel:
|
|||
)
|
||||
else:
|
||||
mtoken = mcipher.decrypt(load["token"])
|
||||
aes = "{}_|-{}".format(self.aes_key, mtoken)
|
||||
aes = f"{self.aes_key}_|-{mtoken}"
|
||||
except Exception: # pylint: disable=broad-except
|
||||
# Token failed to decrypt, send back the salty bacon to
|
||||
# support older minions
|
||||
|
@ -999,20 +1004,23 @@ class MasterPubServerChannel:
|
|||
self.io_loop = tornado.ioloop.IOLoop.current()
|
||||
tcp_master_pool_port = 4520
|
||||
self.pushers = []
|
||||
for master in self.opts.get("cluster_peers", []):
|
||||
self.auth_errors = {}
|
||||
for peer in self.opts.get("cluster_peers", []):
|
||||
pusher = salt.transport.tcp.TCPPublishServer(
|
||||
self.opts,
|
||||
pull_host=master,
|
||||
pull_host=peer,
|
||||
pull_port=tcp_master_pool_port,
|
||||
)
|
||||
self.auth_errors[peer] = collections.deque()
|
||||
self.pushers.append(pusher)
|
||||
self.pool_puller = salt.transport.tcp.TCPPuller(
|
||||
host=self.opts["interface"],
|
||||
port=tcp_master_pool_port,
|
||||
io_loop=self.io_loop,
|
||||
payload_handler=self.handle_pool_publish,
|
||||
)
|
||||
self.pool_puller.start()
|
||||
if self.opts.get("cluster_id", None):
|
||||
self.pool_puller = salt.transport.tcp.TCPPuller(
|
||||
host=self.opts["interface"],
|
||||
port=tcp_master_pool_port,
|
||||
io_loop=self.io_loop,
|
||||
payload_handler=self.handle_pool_publish,
|
||||
)
|
||||
self.pool_puller.start()
|
||||
self.io_loop.add_callback(
|
||||
self.transport.publisher,
|
||||
self.publish_payload,
|
||||
|
@ -1061,20 +1069,46 @@ class MasterPubServerChannel:
|
|||
if self.peer_keys[peer] != key_str:
|
||||
self.peer_keys[peer] = key_str
|
||||
self.send_aes_key_event()
|
||||
while self.auth_errors[peer]:
|
||||
key, data = self.auth_errors[peer].popleft()
|
||||
peer_id, parsed_tag = self.parse_cluster_tag(tag)
|
||||
try:
|
||||
event_data = self.extract_cluster_event(peer_id, data)
|
||||
except salt.exceptions.AuthenticationError:
|
||||
log.error(
|
||||
"Event from peer failed authentication: %s", peer_id
|
||||
)
|
||||
else:
|
||||
await self.transport.publish_payload(
|
||||
salt.utils.event.SaltEvent.pack(
|
||||
parsed_tag, event_data
|
||||
)
|
||||
)
|
||||
else:
|
||||
self.peer_keys[peer] = key_str
|
||||
self.send_aes_key_event()
|
||||
while self.auth_errors[peer]:
|
||||
key, data = self.auth_errors[peer].popleft()
|
||||
peer_id, parsed_tag = self.parse_cluster_tag(tag)
|
||||
try:
|
||||
event_data = self.extract_cluster_event(peer_id, data)
|
||||
except salt.exceptions.AuthenticationError:
|
||||
log.error(
|
||||
"Event from peer failed authentication: %s", peer_id
|
||||
)
|
||||
else:
|
||||
await self.transport.publish_payload(
|
||||
salt.utils.event.SaltEvent.pack(parsed_tag, event_data)
|
||||
)
|
||||
elif tag.startswith("cluster/event"):
|
||||
peer_id = tag.replace("cluster/event/", "").split("/")[0]
|
||||
stripped_tag = tag.replace(f"cluster/event/{peer_id}/", "")
|
||||
if peer_id in self.peer_keys:
|
||||
crypticle = salt.crypt.Crypticle(self.opts, self.peer_keys[peer_id])
|
||||
event_data = crypticle.loads(data)
|
||||
# __peer_id can be used to know if this event came from a
|
||||
# different master?
|
||||
event_data["__peer_id"] = peer_id
|
||||
peer_id, parsed_tag = self.parse_cluster_tag(tag)
|
||||
try:
|
||||
event_data = self.extract_cluster_event(peer_id, data)
|
||||
except salt.exceptions.AuthenticationError:
|
||||
self.auth_errors[peer_id].append((tag, data))
|
||||
else:
|
||||
await self.transport.publish_payload(
|
||||
salt.utils.event.SaltEvent.pack(stripped_tag, event_data)
|
||||
salt.utils.event.SaltEvent.pack(parsed_tag, event_data)
|
||||
)
|
||||
else:
|
||||
log.error("This cluster tag not valid %s", tag)
|
||||
|
@ -1083,18 +1117,37 @@ class MasterPubServerChannel:
|
|||
log.critical("Unexpected error while polling master events", exc_info=True)
|
||||
return None
|
||||
|
||||
def parse_cluster_tag(self, tag):
|
||||
peer_id = tag.replace("cluster/event/", "").split("/")[0]
|
||||
stripped_tag = tag.replace(f"cluster/event/{peer_id}/", "")
|
||||
return peer_id, stripped_tag
|
||||
|
||||
def extract_cluster_event(self, peer_id, data):
|
||||
if peer_id in self.peer_keys:
|
||||
crypticle = salt.crypt.Crypticle(self.opts, self.peer_keys[peer_id])
|
||||
event_data = crypticle.loads(data)["event_payload"]
|
||||
# __peer_id can be used to know if this event came from a
|
||||
# different master?
|
||||
event_data["__peer_id"] = peer_id
|
||||
return event_data
|
||||
raise salt.exceptions.AuthenticationError("Peer aes key not available")
|
||||
|
||||
async def publish_payload(self, load, *args):
|
||||
log.error("Publish event to local ipc clients")
|
||||
tag, data = salt.utils.event.SaltEvent.unpack(load)
|
||||
tasks = []
|
||||
if not tag.startswith("cluster/peer"):
|
||||
tasks = [asyncio.create_task(self.transport.publish_payload(load))]
|
||||
tasks = [
|
||||
asyncio.create_task(
|
||||
self.transport.publish_payload(load), name=self.opts["id"]
|
||||
)
|
||||
]
|
||||
for pusher in self.pushers:
|
||||
log.error(
|
||||
"Publish event to peer master %s:%s", pusher.pull_host, pusher.pull_port
|
||||
)
|
||||
log.debug("Publish event to peer %s:%s", pusher.pull_host, pusher.pull_port)
|
||||
if tag.startswith("cluster/peer"):
|
||||
tasks.append(asyncio.create_task(pusher.publish(load)))
|
||||
tasks.append(
|
||||
asyncio.create_task(pusher.publish(load), name=pusher.pull_host)
|
||||
)
|
||||
continue
|
||||
crypticle = salt.crypt.Crypticle(
|
||||
self.opts, salt.master.SMaster.secrets["aes"]["secret"].value
|
||||
|
@ -1109,5 +1162,16 @@ class MasterPubServerChannel:
|
|||
for task in tasks:
|
||||
try:
|
||||
task.result()
|
||||
except Exception: # pylint: disable=broad-except
|
||||
log.error("Error sending task %s", exc)
|
||||
# XXX This error is transport specific and should be something else
|
||||
except tornado.iostream.StreamClosedError:
|
||||
if task.get_name() == self.opts["id"]:
|
||||
log.error("Unable to forward event to local ipc bus")
|
||||
else:
|
||||
log.warning(
|
||||
"Unable to forward event to cluster peer %s", task.get_name()
|
||||
)
|
||||
continue
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
log.error(
|
||||
"Unhandled error sending task %s", task.get_name(), exc_info=True
|
||||
)
|
||||
|
|
|
@ -139,7 +139,9 @@ class SMaster:
|
|||
return cls.secrets["aes"]["serial"].value
|
||||
|
||||
@classmethod
|
||||
def rotate_secrets(cls, opts=None, event=None, use_lock=True, owner=False):
|
||||
def rotate_secrets(
|
||||
cls, opts=None, event=None, use_lock=True, owner=False, publisher=None
|
||||
):
|
||||
log.info("Rotating master AES key")
|
||||
if opts is None:
|
||||
opts = {}
|
||||
|
@ -159,6 +161,10 @@ class SMaster:
|
|||
)
|
||||
if "serial" in secret_map:
|
||||
secret_map["serial"].value = 0
|
||||
|
||||
if publisher:
|
||||
publisher.send_aes_key_event()
|
||||
|
||||
if event:
|
||||
event.fire_event({f"rotate_{secret_key}_key": True}, tag="key")
|
||||
|
||||
|
@ -180,6 +186,7 @@ class Maintenance(salt.utils.process.SignalHandlingProcess):
|
|||
:param dict opts: The salt options
|
||||
"""
|
||||
self.master_secrets = kwargs.pop("master_secrets", None)
|
||||
self.ipc_publisher = kwargs.pop("ipc_publisher", None)
|
||||
super().__init__(**kwargs)
|
||||
self.opts = opts
|
||||
# How often do we perform the maintenance tasks
|
||||
|
@ -331,27 +338,31 @@ class Maintenance(salt.utils.process.SignalHandlingProcess):
|
|||
if not to_rotate and self.opts.get("publish_session"):
|
||||
if self.opts.get("cluster_id", None):
|
||||
keyfile = os.path.join(self.opts["cluster_pki_dir"], ".aes")
|
||||
else:
|
||||
keyfile = os.path.join(self.opts["cachedir"], ".aes")
|
||||
try:
|
||||
stats = os.stat(keyfile)
|
||||
except os.error as exc:
|
||||
log.error("Unexpected condition while reading keyfile %s", exc)
|
||||
return
|
||||
if now - stats.st_mtime >= self.opts["publish_session"]:
|
||||
salt.crypt.dropfile(
|
||||
self.opts["cachedir"], self.opts["user"], self.opts["id"]
|
||||
)
|
||||
# There is currently no concept of a leader in a master
|
||||
# cluster. Lets fake it till we make it with a little
|
||||
# waiting period.
|
||||
time.sleep(drop_file_wait)
|
||||
to_rotate = (
|
||||
salt.crypt.read_dropfile(self.opts["cachedir"]) == self.opts["id"]
|
||||
)
|
||||
try:
|
||||
stats = os.stat(keyfile)
|
||||
except os.error as exc:
|
||||
log.error("Unexpected condition while reading keyfile %s", exc)
|
||||
return
|
||||
if now - stats.st_mtime >= self.opts["publish_session"]:
|
||||
salt.crypt.dropfile(
|
||||
self.opts["cachedir"], self.opts["user"], self.opts["id"]
|
||||
)
|
||||
# There is currently no concept of a leader in a master
|
||||
# cluster. Lets fake it till we make it with a little
|
||||
# waiting period.
|
||||
time.sleep(drop_file_wait)
|
||||
to_rotate = (
|
||||
salt.crypt.read_dropfile(self.opts["cachedir"])
|
||||
== self.opts["id"]
|
||||
)
|
||||
|
||||
if to_rotate:
|
||||
SMaster.rotate_secrets(self.opts, self.event, owner=True)
|
||||
if self.opts.get("cluster_id", None):
|
||||
SMaster.rotate_secrets(
|
||||
self.opts, self.event, owner=True, publisher=self.ipc_publisher
|
||||
)
|
||||
else:
|
||||
SMaster.rotate_secrets(self.opts, self.event, owner=True)
|
||||
|
||||
def handle_git_pillar(self):
|
||||
"""
|
||||
|
@ -721,7 +732,7 @@ class Master(SMaster):
|
|||
with salt.utils.process.default_signals(signal.SIGINT, signal.SIGTERM):
|
||||
if self.opts["cluster_id"]:
|
||||
keypath = os.path.join(self.opts["cluster_pki_dir"], ".aes")
|
||||
keygen = functools.partial(
|
||||
cluster_keygen = functools.partial(
|
||||
salt.crypt.Crypticle.read_or_generate_key,
|
||||
keypath,
|
||||
)
|
||||
|
@ -729,14 +740,19 @@ class Master(SMaster):
|
|||
# them as well.
|
||||
SMaster.secrets["cluster_aes"] = {
|
||||
"secret": multiprocessing.Array(
|
||||
ctypes.c_char, salt.utils.stringutils.to_bytes(keygen())
|
||||
ctypes.c_char, salt.utils.stringutils.to_bytes(cluster_keygen())
|
||||
),
|
||||
"serial": multiprocessing.Value(
|
||||
ctypes.c_longlong,
|
||||
lock=False, # We'll use the lock from 'secret'
|
||||
),
|
||||
"reload": keygen,
|
||||
"reload": cluster_keygen,
|
||||
}
|
||||
|
||||
# Wrap generate_key_string to ignore remove keyward arg.
|
||||
def master_keygen(*args, **kwargs):
|
||||
return salt.crypt.Crypticle.generate_key_string()
|
||||
|
||||
SMaster.secrets["aes"] = {
|
||||
"secret": multiprocessing.Array(
|
||||
ctypes.c_char,
|
||||
|
@ -747,7 +763,7 @@ class Master(SMaster):
|
|||
"serial": multiprocessing.Value(
|
||||
ctypes.c_longlong, lock=False # We'll use the lock from 'secret'
|
||||
),
|
||||
"reload": salt.crypt.Crypticle.generate_key_string,
|
||||
"reload": master_keygen,
|
||||
}
|
||||
|
||||
log.info("Creating master process manager")
|
||||
|
@ -792,7 +808,10 @@ class Master(SMaster):
|
|||
self.process_manager.add_process(
|
||||
Maintenance,
|
||||
args=(self.opts,),
|
||||
kwargs={"master_secrets": SMaster.secrets},
|
||||
kwargs={
|
||||
"master_secrets": SMaster.secrets,
|
||||
"ipc_publisher": ipc_publisher,
|
||||
},
|
||||
name="Maintenance",
|
||||
)
|
||||
|
||||
|
@ -900,23 +919,27 @@ class EventMonitor(salt.utils.process.SignalHandlingProcess):
|
|||
channels = []
|
||||
self.channels = channels
|
||||
|
||||
@tornado.gen.coroutine
|
||||
def handle_event(self, package):
|
||||
async def handle_event(self, package):
|
||||
"""
|
||||
Event handler for publish forwarder
|
||||
"""
|
||||
tag, data = salt.utils.event.SaltEvent.unpack(package)
|
||||
log.error("got evetn %s %r", tag, data)
|
||||
log.error("got event %s %r", tag, data)
|
||||
if tag.startswith("salt/job") and tag.endswith("/publish"):
|
||||
# data.pop("_stamp", None)
|
||||
log.trace("Forward job event to publisher server: %r", data)
|
||||
# if not self.channels:
|
||||
# for transport, opts in iter_transport_opts(self.opts):
|
||||
# chan = salt.channel.server.PubServerChannel.factory(opts)
|
||||
# self.channels.append(chan)
|
||||
# for chan in self.channels:
|
||||
# yield chan.publish(data)
|
||||
peer_id = data.pop("__peer_id", None)
|
||||
if peer_id:
|
||||
data.pop("_stamp", None)
|
||||
log.error("Forward job event to publisher server: %r", data)
|
||||
if not self.channels:
|
||||
for transport, opts in iter_transport_opts(self.opts):
|
||||
chan = salt.channel.server.PubServerChannel.factory(opts)
|
||||
self.channels.append(chan)
|
||||
tasks = []
|
||||
for chan in self.channels:
|
||||
tasks.append(asyncio.create_task(chan.publish(data)))
|
||||
await asyncio.gather(*tasks)
|
||||
elif tag == "rotate_aes_key":
|
||||
log.error("Recieved rotate_aes_key event")
|
||||
SMaster.rotate_secrets(self.opts, owner=False)
|
||||
|
||||
def run(self):
|
||||
|
|
194
tests/pytests/scenarios/cluster/conftest.py
Normal file
194
tests/pytests/scenarios/cluster/conftest.py
Normal file
|
@ -0,0 +1,194 @@
|
|||
import logging
|
||||
import subprocess
|
||||
|
||||
import pytest
|
||||
|
||||
import salt.utils.platform
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def cluster_shared_path(tmp_path):
|
||||
path = tmp_path / "cluster"
|
||||
path.mkdir()
|
||||
return path
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def cluster_pki_path(cluster_shared_path):
|
||||
path = cluster_shared_path / "pki"
|
||||
path.mkdir()
|
||||
(path / "peers").mkdir()
|
||||
return path
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def cluster_cache_path(cluster_shared_path):
|
||||
path = cluster_shared_path / "cache"
|
||||
path.mkdir()
|
||||
return path
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def cluster_master_1(request, salt_factories, cluster_pki_path, cluster_cache_path):
|
||||
config_defaults = {
|
||||
"open_mode": True,
|
||||
"transport": request.config.getoption("--transport"),
|
||||
}
|
||||
config_overrides = {
|
||||
"interface": "127.0.0.1",
|
||||
"cluster_id": "master_cluster",
|
||||
"cluster_peers": [
|
||||
"127.0.0.2",
|
||||
],
|
||||
"cluster_pki_dir": str(cluster_pki_path),
|
||||
"cache_dir": str(cluster_cache_path),
|
||||
}
|
||||
factory = salt_factories.salt_master_daemon(
|
||||
"127.0.0.1",
|
||||
defaults=config_defaults,
|
||||
overrides=config_overrides,
|
||||
extra_cli_arguments_after_first_start_failure=["--log-level=info"],
|
||||
)
|
||||
with factory.started(start_timeout=120):
|
||||
yield factory
|
||||
|
||||
|
||||
# @pytest.fixture(scope="package")
|
||||
# def cluster_master_1_salt_cli(cluster_master_1):
|
||||
# return cluster__master_1.salt_cli(timeout=120)
|
||||
#
|
||||
#
|
||||
@pytest.fixture
|
||||
def cluster_master_2(salt_factories, cluster_master_1):
|
||||
if salt.utils.platform.is_darwin() or salt.utils.platform.is_freebsd():
|
||||
subprocess.check_output(["ifconfig", "lo0", "alias", "127.0.0.2", "up"])
|
||||
|
||||
config_defaults = {
|
||||
"open_mode": True,
|
||||
"transport": cluster_master_1.config["transport"],
|
||||
}
|
||||
config_overrides = {
|
||||
"interface": "127.0.0.2",
|
||||
"cluster_id": "master_cluster",
|
||||
"cluster_peers": [
|
||||
"127.0.0.1",
|
||||
],
|
||||
"cluster_pki_dir": cluster_master_1.config["cluster_pki_dir"],
|
||||
"cache_dir": cluster_master_1.config["cache_dir"],
|
||||
}
|
||||
|
||||
# Use the same ports for both masters, they are binding to different interfaces
|
||||
for key in (
|
||||
"ret_port",
|
||||
"publish_port",
|
||||
):
|
||||
config_overrides[key] = cluster_master_1.config[key]
|
||||
factory = salt_factories.salt_master_daemon(
|
||||
"127.0.0.2",
|
||||
defaults=config_defaults,
|
||||
overrides=config_overrides,
|
||||
extra_cli_arguments_after_first_start_failure=["--log-level=info"],
|
||||
)
|
||||
with factory.started(start_timeout=120):
|
||||
yield factory
|
||||
|
||||
|
||||
#
|
||||
# @pytest.fixture(scope="package")
|
||||
# def cluster_master_2_salt_cli(cluster_master_2):
|
||||
# return cluster_master_2.salt_cli(timeout=120)
|
||||
#
|
||||
#
|
||||
@pytest.fixture
|
||||
def cluster_minion_1(cluster_master_1):
|
||||
config_defaults = {
|
||||
"transport": cluster_master_1.config["transport"],
|
||||
}
|
||||
|
||||
port = cluster_master_1.config["ret_port"]
|
||||
addr = cluster_master_1.config["interface"]
|
||||
config_overrides = {
|
||||
"master": f"{addr}:{port}",
|
||||
"test.foo": "baz",
|
||||
}
|
||||
factory = cluster_master_1.salt_minion_daemon(
|
||||
"cluster-minion-1",
|
||||
defaults=config_defaults,
|
||||
overrides=config_overrides,
|
||||
extra_cli_arguments_after_first_start_failure=["--log-level=info"],
|
||||
)
|
||||
with factory.started(start_timeout=120):
|
||||
yield factory
|
||||
|
||||
|
||||
#
|
||||
#
|
||||
# @pytest.fixture(scope="package")
|
||||
# def cluster_minion_2(cluster_master_2):
|
||||
# config_defaults = {
|
||||
# "transport": cluster_master_2.config["transport"],
|
||||
# }
|
||||
#
|
||||
# port = cluster_master_2.config["ret_port"]
|
||||
# addr = cluster_master_2.config["interface"]
|
||||
# config_overrides = {
|
||||
# "master": f"{port}:{addr}",
|
||||
# "test.foo": "baz",
|
||||
# }
|
||||
# factory = salt_mm_master_1.salt_minion_daemon(
|
||||
# "cluster-minion-2",
|
||||
# defaults=config_defaults,
|
||||
# overrides=config_overrides,
|
||||
# extra_cli_arguments_after_first_start_failure=["--log-level=info"],
|
||||
# )
|
||||
# with factory.started(start_timeout=120):
|
||||
# yield factory
|
||||
#
|
||||
#
|
||||
#
|
||||
# @pytest.fixture(scope="package")
|
||||
# def run_salt_cmds():
|
||||
# def _run_salt_cmds_fn(clis, minions):
|
||||
# """
|
||||
# Run test.ping from all clis to all minions
|
||||
# """
|
||||
# returned_minions = []
|
||||
# minion_instances = {minion.id: minion for minion in minions}
|
||||
# clis_to_check = {minion.id: list(clis) for minion in minions}
|
||||
#
|
||||
# attempts = 6
|
||||
# timeout = 5
|
||||
# if salt.utils.platform.spawning_platform():
|
||||
# timeout *= 2
|
||||
# while attempts:
|
||||
# if not clis_to_check:
|
||||
# break
|
||||
# for minion in list(clis_to_check):
|
||||
# if not clis_to_check[minion]:
|
||||
# clis_to_check.pop(minion)
|
||||
# continue
|
||||
# for cli in list(clis_to_check[minion]):
|
||||
# try:
|
||||
# ret = cli.run(
|
||||
# "--timeout={}".format(timeout),
|
||||
# "test.ping",
|
||||
# minion_tgt=minion,
|
||||
# _timeout=2 * timeout,
|
||||
# )
|
||||
# if ret.returncode == 0 and ret.data is True:
|
||||
# returned_minions.append((cli, minion_instances[minion]))
|
||||
# clis_to_check[minion].remove(cli)
|
||||
# except FactoryTimeout:
|
||||
# log.debug(
|
||||
# "Failed to execute test.ping from %s to %s.",
|
||||
# cli.get_display_name(),
|
||||
# minion,
|
||||
# )
|
||||
# time.sleep(1)
|
||||
# attempts -= 1
|
||||
#
|
||||
# return returned_minions
|
||||
#
|
||||
# return _run_salt_cmds_fn
|
64
tests/pytests/scenarios/cluster/test_basic_cluster.py
Normal file
64
tests/pytests/scenarios/cluster/test_basic_cluster.py
Normal file
|
@ -0,0 +1,64 @@
|
|||
import salt.utils.event
|
||||
|
||||
|
||||
def test_basic_cluster_setup(
|
||||
cluster_master_1, cluster_master_2, cluster_pki_path, cluster_cache_path
|
||||
):
|
||||
cli1 = cluster_master_1.salt_run_cli(timeout=120)
|
||||
ret = cli1.run("config.get", "cluster_pki_dir")
|
||||
assert str(cluster_pki_path) == ret.stdout
|
||||
ret = cli1.run("config.get", "cache_dir")
|
||||
assert str(cluster_cache_path) == ret.stdout
|
||||
ret = cli1.run("config.get", "cluster_peers")
|
||||
assert ["127.0.0.2"] == ret.data
|
||||
|
||||
cli2 = cluster_master_2.salt_run_cli(timeout=120)
|
||||
ret = cli2.run("config.get", "cluster_pki_dir")
|
||||
assert str(cluster_pki_path) == ret.stdout
|
||||
ret = cli2.run("config.get", "cache_dir")
|
||||
assert str(cluster_cache_path) == ret.stdout
|
||||
ret = cli2.run("config.get", "cluster_peers")
|
||||
assert ["127.0.0.1"] == ret.data
|
||||
|
||||
peers_path = cluster_pki_path / "peers"
|
||||
unexpected = False
|
||||
found = []
|
||||
for key_path in peers_path.iterdir():
|
||||
if key_path.name == "127.0.0.1.pub":
|
||||
found.append("127.0.0.1")
|
||||
elif key_path.name == "127.0.0.2.pub":
|
||||
found.append("127.0.0.2")
|
||||
else:
|
||||
unexpected = True
|
||||
|
||||
found.sort()
|
||||
|
||||
assert ["127.0.0.1", "127.0.0.2"] == found
|
||||
assert unexpected is False
|
||||
|
||||
assert (cluster_pki_path / ".aes").exists()
|
||||
|
||||
|
||||
def test_basic_cluster_event(cluster_master_1, cluster_master_2):
|
||||
with salt.utils.event.get_event(
|
||||
"master", opts=cluster_master_2.config, listen=True
|
||||
) as event2:
|
||||
event1 = salt.utils.event.get_event("master", opts=cluster_master_2.config)
|
||||
data = {"meh": "bah"}
|
||||
event1.fire_event(data, "meh/bah")
|
||||
evt = event2.get_event(tag="meh/bah", wait=5)
|
||||
assert data == evt
|
||||
|
||||
|
||||
def test_basic_cluster_minion_1(cluster_master_1, cluster_master_2, cluster_minion_1):
|
||||
cli = cluster_master_1.salt_cli(timeout=120)
|
||||
ret = cli.run("test.ping", minion_tgt="cluster-minion-1")
|
||||
assert ret.data is True
|
||||
|
||||
|
||||
def test_basic_cluster_minion_1_from_master_2(
|
||||
cluster_master_1, cluster_master_2, cluster_minion_1
|
||||
):
|
||||
cli = cluster_master_2.salt_cli(timeout=120)
|
||||
ret = cli.run("test.ping", minion_tgt="cluster-minion-1")
|
||||
assert ret.data is True
|
|
@ -344,8 +344,8 @@ def test_master_keys_with_cluster_id(tmp_path, master_opts):
|
|||
@pytest.mark.skipif(not HAS_PYCRYPTO_RSA, reason="pycrypto >= 2.6 is not available")
|
||||
@pytest.mark.skipif(HAS_M2, reason="m2crypto is used by salt.crypt if installed")
|
||||
def test_pycrypto_gen_keys():
|
||||
open_priv_wb = MockCall("/keydir{}keyname.pem".format(os.sep), "wb+")
|
||||
open_pub_wb = MockCall("/keydir{}keyname.pub".format(os.sep), "wb+")
|
||||
open_priv_wb = MockCall(f"/keydir{os.sep}keyname.pem", "wb+")
|
||||
open_pub_wb = MockCall(f"/keydir{os.sep}keyname.pub", "wb+")
|
||||
|
||||
with patch.multiple(
|
||||
os,
|
||||
|
@ -357,7 +357,7 @@ def test_pycrypto_gen_keys():
|
|||
"os.path.isfile", return_value=True
|
||||
):
|
||||
result = salt.crypt.gen_keys("/keydir", "keyname", 2048)
|
||||
assert result == "/keydir{}keyname.pem".format(os.sep), result
|
||||
assert result == f"/keydir{os.sep}keyname.pem", result
|
||||
assert open_priv_wb not in m_open.calls
|
||||
assert open_pub_wb not in m_open.calls
|
||||
|
||||
|
@ -434,20 +434,22 @@ def test_m2_gen_keys():
|
|||
with patch("M2Crypto.RSA.RSA.save_pem", MagicMock()) as save_pem:
|
||||
with patch("M2Crypto.RSA.RSA.save_pub_key", MagicMock()) as save_pub:
|
||||
with patch("os.path.isfile", return_value=True):
|
||||
assert salt.crypt.gen_keys(
|
||||
"/keydir", "keyname", 2048
|
||||
) == "/keydir{}keyname.pem".format(os.sep)
|
||||
assert (
|
||||
salt.crypt.gen_keys("/keydir", "keyname", 2048)
|
||||
== f"/keydir{os.sep}keyname.pem"
|
||||
)
|
||||
save_pem.assert_not_called()
|
||||
save_pub.assert_not_called()
|
||||
|
||||
with patch("os.path.isfile", return_value=False):
|
||||
salt.crypt.gen_keys(
|
||||
"/keydir", "keyname", 2048
|
||||
) == "/keydir{}keyname.pem".format(os.sep)
|
||||
save_pem.assert_called_once_with(
|
||||
"/keydir{}keyname.pem".format(os.sep), cipher=None
|
||||
assert (
|
||||
salt.crypt.gen_keys("/keydir", "keyname", 2048)
|
||||
== f"/keydir{os.sep}keyname.pem"
|
||||
)
|
||||
save_pub.assert_called_once_with("/keydir{}keyname.pub".format(os.sep))
|
||||
save_pem.assert_called_once_with(
|
||||
f"/keydir{os.sep}keyname.pem", cipher=None
|
||||
)
|
||||
save_pub.assert_called_once_with(f"/keydir{os.sep}keyname.pub")
|
||||
|
||||
|
||||
@patch("os.umask", MagicMock())
|
||||
|
@ -460,24 +462,30 @@ def test_gen_keys_with_passphrase():
|
|||
with patch("M2Crypto.RSA.RSA.save_pem", MagicMock()) as save_pem:
|
||||
with patch("M2Crypto.RSA.RSA.save_pub_key", MagicMock()) as save_pub:
|
||||
with patch("os.path.isfile", return_value=True):
|
||||
assert salt.crypt.gen_keys(
|
||||
"/keydir", "keyname", 2048, passphrase="password"
|
||||
) == "/keydir{}keyname.pem".format(os.sep)
|
||||
assert (
|
||||
salt.crypt.gen_keys(
|
||||
"/keydir", "keyname", 2048, passphrase="password"
|
||||
)
|
||||
== f"/keydir{os.sep}keyname.pem"
|
||||
)
|
||||
save_pem.assert_not_called()
|
||||
save_pub.assert_not_called()
|
||||
|
||||
with patch("os.path.isfile", return_value=False):
|
||||
assert salt.crypt.gen_keys(
|
||||
"/keydir", "keyname", 2048, passphrase="password"
|
||||
) == "/keydir{}keyname.pem".format(os.sep)
|
||||
assert (
|
||||
salt.crypt.gen_keys(
|
||||
"/keydir", "keyname", 2048, passphrase="password"
|
||||
)
|
||||
== f"/keydir{os.sep}keyname.pem"
|
||||
)
|
||||
callback = save_pem.call_args[1]["callback"]
|
||||
save_pem.assert_called_once_with(
|
||||
"/keydir{}keyname.pem".format(os.sep),
|
||||
f"/keydir{os.sep}keyname.pem",
|
||||
cipher="des_ede3_cbc",
|
||||
callback=callback,
|
||||
)
|
||||
assert callback(None) == b"password"
|
||||
save_pub.assert_called_once_with("/keydir{}keyname.pub".format(os.sep))
|
||||
save_pub.assert_called_once_with(f"/keydir{os.sep}keyname.pub")
|
||||
|
||||
|
||||
@pytest.mark.skipif(not HAS_M2, reason="m2crypto is not available")
|
||||
|
@ -528,15 +536,6 @@ def test_m2_bad_key(key_to_test):
|
|||
assert key.check_key() == 1
|
||||
|
||||
|
||||
@pytest.mark.skipif(not HAS_M2, reason="Skip when m2crypto is not installed")
|
||||
def test_m2_bad_key(key_to_test):
|
||||
"""
|
||||
Load public key with an invalid header using m2crypto and validate it
|
||||
"""
|
||||
key = salt.crypt.get_rsa_pub_key(key_to_test)
|
||||
assert key.check_key() == 1
|
||||
|
||||
|
||||
@pytest.mark.skipif(HAS_M2, reason="Skip when m2crypto is installed")
|
||||
def test_pycrypto_bad_key(key_to_test):
|
||||
"""
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import os
|
||||
import pathlib
|
||||
import stat
|
||||
import threading
|
||||
import time
|
||||
|
||||
|
@ -11,7 +12,7 @@ from tests.support.mock import MagicMock, patch
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def maintenence_opts(master_opts):
|
||||
def maintenance_opts(master_opts):
|
||||
"""
|
||||
Options needed for master's Maintenence class
|
||||
"""
|
||||
|
@ -21,11 +22,11 @@ def maintenence_opts(master_opts):
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def maintenence(maintenence_opts):
|
||||
def maintenance(maintenance_opts):
|
||||
"""
|
||||
The master's Maintenence class
|
||||
"""
|
||||
return salt.master.Maintenance(maintenence_opts)
|
||||
return salt.master.Maintenance(maintenance_opts)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
@ -40,6 +41,29 @@ def clear_funcs(master_opts):
|
|||
clear_funcs.destroy()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def cluster_maintenance_opts(master_opts, tmp_path):
|
||||
"""
|
||||
Options needed for master's Maintenence class
|
||||
"""
|
||||
opts = master_opts.copy()
|
||||
opts.update(
|
||||
git_pillar_update_interval=180,
|
||||
maintenance_interval=181,
|
||||
cluster_pki_dir=tmp_path,
|
||||
cluster_id="test-cluster",
|
||||
)
|
||||
yield opts
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def cluster_maintenance(cluster_maintenance_opts):
|
||||
"""
|
||||
The master's Maintenence class
|
||||
"""
|
||||
return salt.master.Maintenance(cluster_maintenance_opts)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def encrypted_requests(tmp_path):
|
||||
# To honor the comment on AESFuncs
|
||||
|
@ -854,7 +878,7 @@ async def test_publish_user_authorization_error(clear_funcs):
|
|||
assert await clear_funcs.publish(load) == mock_ret
|
||||
|
||||
|
||||
def test_run_func(maintenence):
|
||||
def test_run_func(maintenance):
|
||||
"""
|
||||
Test the run function inside Maintenance class.
|
||||
"""
|
||||
|
@ -917,7 +941,7 @@ def test_run_func(maintenence):
|
|||
"salt.utils.verify.check_max_open_files", mocked_check_max_open_files
|
||||
):
|
||||
try:
|
||||
maintenence.run()
|
||||
maintenance.run()
|
||||
except RuntimeError as exc:
|
||||
assert str(exc) == "Time passes"
|
||||
assert mocked_time._calls == [60] * 4
|
||||
|
@ -933,51 +957,48 @@ def test_run_func(maintenence):
|
|||
assert mocked_check_max_open_files.call_times == [0, 60, 120, 180]
|
||||
|
||||
|
||||
def test_key_rotate_master_match(maintenence):
|
||||
maintenence.event = MagicMock()
|
||||
def test_key_rotate_master_match(maintenance):
|
||||
maintenance.event = MagicMock()
|
||||
now = time.monotonic()
|
||||
dfn = pathlib.Path(maintenence.opts["cachedir"]) / ".dfn"
|
||||
dfn = pathlib.Path(maintenance.opts["cachedir"]) / ".dfn"
|
||||
salt.crypt.dropfile(
|
||||
maintenence.opts["cachedir"],
|
||||
maintenence.opts["user"],
|
||||
master_id=maintenence.opts["id"],
|
||||
maintenance.opts["cachedir"],
|
||||
maintenance.opts["user"],
|
||||
master_id=maintenance.opts["id"],
|
||||
)
|
||||
assert dfn.exists()
|
||||
with patch("salt.master.SMaster.rotate_secrets") as rotate_secrets:
|
||||
maintenence.handle_key_rotate(now)
|
||||
maintenance.handle_key_rotate(now)
|
||||
assert not dfn.exists()
|
||||
rotate_secrets.assert_called_with(
|
||||
maintenence.opts, maintenence.event, owner=True
|
||||
maintenance.opts, maintenance.event, owner=True
|
||||
)
|
||||
|
||||
|
||||
def test_key_rotate_no_master_match(maintenence):
|
||||
def test_key_rotate_no_master_match(maintenance):
|
||||
now = time.monotonic()
|
||||
dfn = pathlib.Path(maintenence.opts["cachedir"]) / ".dfn"
|
||||
dfn = pathlib.Path(maintenance.opts["cachedir"]) / ".dfn"
|
||||
dfn.write_text("nomatch")
|
||||
assert dfn.exists()
|
||||
with patch("salt.master.SMaster.rotate_secrets") as rotate_secrets:
|
||||
maintenence.handle_key_rotate(now)
|
||||
maintenance.handle_key_rotate(now)
|
||||
assert dfn.exists()
|
||||
rotate_secrets.assert_not_called()
|
||||
|
||||
|
||||
import stat
|
||||
|
||||
|
||||
@pytest.mark.slow_test
|
||||
def test_key_dfn_wait(maintenence):
|
||||
def test_key_dfn_wait(cluster_maintenance):
|
||||
now = time.monotonic()
|
||||
key = pathlib.Path(maintenence.opts["cachedir"]) / ".aes"
|
||||
key = pathlib.Path(cluster_maintenance.opts["cluster_pki_dir"]) / ".aes"
|
||||
salt.crypt.Crypticle.read_or_generate_key(str(key))
|
||||
rotate_time = time.monotonic() - (maintenence.opts["publish_session"] + 1)
|
||||
rotate_time = time.monotonic() - (cluster_maintenance.opts["publish_session"] + 1)
|
||||
os.utime(str(key), (rotate_time, rotate_time))
|
||||
|
||||
dfn = pathlib.Path(maintenence.opts["cachedir"]) / ".dfn"
|
||||
dfn = pathlib.Path(cluster_maintenance.opts["cachedir"]) / ".dfn"
|
||||
|
||||
def run_key_rotate():
|
||||
with patch("salt.master.SMaster.rotate_secrets") as rotate_secrets:
|
||||
maintenence.handle_key_rotate(now)
|
||||
cluster_maintenance.handle_key_rotate(now)
|
||||
assert dfn.exists()
|
||||
rotate_secrets.assert_not_called()
|
||||
|
||||
|
@ -990,7 +1011,7 @@ def test_key_dfn_wait(maintenence):
|
|||
if time.monotonic() - start > 30:
|
||||
assert dfn.exists(), "dfn file never created"
|
||||
|
||||
assert maintenence.opts["id"] == dfn.read_text()
|
||||
assert cluster_maintenance.opts["id"] == dfn.read_text()
|
||||
|
||||
with salt.utils.files.set_umask(0o277):
|
||||
if os.path.isfile(dfn) and not os.access(dfn, os.W_OK):
|
||||
|
|
Loading…
Add table
Reference in a new issue