Encrypt the master event bus for cluster peers

This commit is contained in:
Daniel A. Wozniak 2023-08-20 15:39:30 -07:00 committed by Gareth J. Greenaway
parent b473ed193a
commit f516003529
7 changed files with 732 additions and 549 deletions

View file

@ -4,10 +4,12 @@ Encapsulate the different transports available to Salt.
This includes server side transport, for the ReqServer and the Publisher
"""
import asyncio
import binascii
import hashlib
import logging
import os
import pathlib
import shutil
import tornado.gen
@ -61,6 +63,12 @@ class ReqServerChannel:
)
self.master_key = salt.crypt.MasterKeys(self.opts)
@property
def aes_key(self):
if self.opts.get("cluster_id", None):
return salt.master.SMaster.secrets["cluster_aes"]["secret"].value
return salt.master.SMaster.secrets["aes"]["secret"].value
def pre_fork(self, process_manager):
"""
Do anything necessary pre-fork. Since this is on the master side this will
@ -84,9 +92,7 @@ class ReqServerChannel:
)
os.nice(self.opts["pub_server_niceness"])
self.io_loop = io_loop
self.crypticle = salt.crypt.Crypticle(
self.opts, salt.master.SMaster.secrets["aes"]["secret"].value
)
self.crypticle = salt.crypt.Crypticle(self.opts, self.aes_key)
# other things needed for _auth
# Create the event manager
self.event = salt.utils.event.get_master_event(
@ -198,19 +204,14 @@ class ReqServerChannel:
key = salt.crypt.Crypticle.generate_key_string()
pcrypt = salt.crypt.Crypticle(self.opts, key)
try:
pub = salt.crypt.get_rsa_pub_key(pubfn)
pub = salt.crypt.PublicKey(pubfn)
except (ValueError, IndexError, TypeError):
return self.crypticle.dumps({})
except OSError:
log.error("AES key not found")
return {"error": "AES key not found"}
pret = {}
key = salt.utils.stringutils.to_bytes(key)
if HAS_M2:
pret["key"] = pub.public_encrypt(key, RSA.pkcs1_oaep_padding)
else:
cipher = PKCS1_OAEP.new(pub)
pret["key"] = cipher.encrypt(key)
pret["key"] = pub.encrypt(key)
if ret is False:
ret = {}
if sign_messages:
@ -221,7 +222,7 @@ class ReqServerChannel:
)
signed_msg = {
"data": tosign,
"sig": salt.crypt.sign_message(self.master_key.rsa_path, tosign),
"sig": salt.crypt.PrivateKey(self.master_key.rsa_path).sign(tosign),
}
pret[dictkey] = pcrypt.dumps(signed_msg)
else:
@ -647,15 +648,13 @@ class ReqServerChannel:
)
else:
mtoken = mcipher.decrypt(load["token"])
aes = "{}_|-{}".format(
salt.master.SMaster.secrets["aes"]["secret"].value, mtoken
)
aes = "{}_|-{}".format(self.aes_key, mtoken)
except Exception: # pylint: disable=broad-except
# Token failed to decrypt, send back the salty bacon to
# support older minions
pass
else:
aes = salt.master.SMaster.secrets["aes"]["secret"].value
aes = self.aes_key
if HAS_M2:
ret["aes"] = pub.public_encrypt(aes, RSA.pkcs1_oaep_padding)
@ -679,7 +678,7 @@ class ReqServerChannel:
# support older minions
pass
aes = salt.master.SMaster.secrets["aes"]["secret"].value
aes = self.aes_key
if HAS_M2:
ret["aes"] = pub.public_encrypt(aes, RSA.pkcs1_oaep_padding)
else:
@ -734,6 +733,12 @@ class PubServerChannel:
self.presence_events = presence_events
self.event = salt.utils.event.get_event("master", opts=self.opts, listen=False)
@property
def aes_key(self):
if self.opts.get("cluster_id", None):
return salt.master.SMaster.secrets["cluster_aes"]["secret"].value
return salt.master.SMaster.secrets["aes"]["secret"].value
def __getstate__(self):
return {
"opts": self.opts,
@ -789,9 +794,7 @@ class PubServerChannel:
if msg["enc"] != "aes":
# We only accept 'aes' encoded messages for 'id'
return
crypticle = salt.crypt.Crypticle(
self.opts, salt.master.SMaster.secrets["aes"]["secret"].value
)
crypticle = salt.crypt.Crypticle(self.opts, self.aes_key)
load = crypticle.loads(msg["load"])
load = salt.transport.frame.decode_embedded_strs(load)
if not self.aes_funcs.verify_minion(load["id"], load["tok"]):
@ -865,20 +868,15 @@ class PubServerChannel:
def wrap_payload(self, load):
payload = {"enc": "aes"}
load["serial"] = salt.master.SMaster.get_serial()
crypticle = salt.crypt.Crypticle(
self.opts, salt.master.SMaster.secrets["aes"]["secret"].value
)
if not self.opts.get("cluster_id", None):
load["serial"] = salt.master.SMaster.get_serial()
crypticle = salt.crypt.Crypticle(self.opts, self.aes_key)
payload["load"] = crypticle.dumps(load)
if self.opts["sign_pub_messages"]:
if self.opts["cluster_id"]:
master_pem_path = os.path.join(self.opts["cluster_pki_dir"], "cluster.pem")
else:
master_pem_path = os.path.join(self.opts["pki_dir"], "master.pem")
log.debug("Signing data packet")
payload["sig"] = salt.crypt.sign_message(
self.master_key.rsa_path, payload["load"]
)
payload["sig"] = salt.crypt.PrivateKey(
self.master_key.rsa_path,
).sign(payload["load"])
int_payload = {"payload": salt.payload.dumps(payload)}
# If topics are upported, target matching has to happen master side
@ -926,6 +924,41 @@ class MasterPubServerChannel:
self.opts = opts
self.transport = transport
self.io_loop = tornado.ioloop.IOLoop.current()
self.master_key = salt.crypt.MasterKeys(self.opts)
self.peer_keys = {}
def send_aes_key_event(self):
data = {"peer_id": self.opts["id"], "peers": {}}
for peer in self.opts.get("cluster_peers", []):
peer_pub = (
pathlib.Path(self.opts["cluster_pki_dir"]) / "peers" / f"{peer}.pub"
)
if peer_pub.exists():
pub = salt.crypt.PublicKey(peer_pub)
aes = salt.master.SMaster.secrets["aes"]["secret"].value
digest = salt.utils.stringutils.to_bytes(
hashlib.sha256(aes).hexdigest()
)
data["peers"][peer] = {
"aes": pub.encrypt(aes),
"sig": salt.crypt.private_encrypt(
self.master_key.master_key, digest
),
}
log.error(
"WTF SEND SIG %s",
hashlib.md5(data["peers"][peer]["sig"]).hexdigest(),
)
else:
log.error("Peer key missing %r", peer_pub)
data["peers"][peer] = {}
with salt.utils.event.get_master_event(
self.opts, self.opts["sock_dir"], listen=False
) as event:
event.fire_event(
data,
salt.utils.event.tagify(self.opts["id"], "peer", "cluster"),
)
def __getstate__(self):
return {
@ -949,7 +982,6 @@ class MasterPubServerChannel:
:param func process_manager: A ProcessManager, from salt.utils.process.ProcessManager
"""
if hasattr(self.transport, "publish_daemon"):
process_manager.add_process(
self._publish_daemon, kwargs=kwargs, name="EventPublisher"
)
@ -967,14 +999,13 @@ class MasterPubServerChannel:
self.io_loop = tornado.ioloop.IOLoop.current()
tcp_master_pool_port = 4520
self.pushers = []
for master in self.opts.get("master_pool", []):
for master in self.opts.get("cluster_peers", []):
pusher = salt.transport.tcp.TCPPublishServer(
self.opts,
pull_host=master,
pull_port=tcp_master_pool_port,
self.opts,
pull_host=master,
pull_port=tcp_master_pool_port,
)
self.pushers.append(pusher)
self.pool_puller = salt.transport.tcp.TCPPuller(
host=self.opts["interface"],
port=tcp_master_pool_port,
@ -995,23 +1026,88 @@ class MasterPubServerChannel:
finally:
self.close()
async def publish(self, load):
"""
Publish "load" to minions
"""
await self.transport.publish(load)
# async def publish(self, load):
# """
# Publish "load" to minions
# """
# await self.transport.publish(load)
async def handle_pool_publish(self, load, _):
log.error("Got event from other master")
async def handle_pool_publish(self, payload, _):
"""
Handle incomming events from cluster peer.
"""
try:
await self.transport.publish(load)
# Add an extra fallback in case a forked process leeks through
tag, data = salt.utils.event.SaltEvent.unpack(payload)
log.error("recieved event from peer %s %r", tag, data)
if tag.startswith("cluster/peer"):
log.error("Got peer join %r", data)
peer = data["peer_id"]
aes = data["peers"][self.opts["id"]]["aes"]
sig = data["peers"][self.opts["id"]]["sig"]
key_str = self.master_key.master_private_decrypt(aes)
digest = salt.utils.stringutils.to_bytes(
hashlib.sha256(key_str).hexdigest()
)
pub_path = (
pathlib.Path(self.opts["cluster_pki_dir"]) / "peers" / f"{peer}.pub"
)
key = salt.crypt.PublicKey(pub_path)
m_digest = key.decrypt(sig)
if m_digest != digest:
log.error("Invalid aes signature from peer: %s", peer)
return
log.error("Received new key from peer %s", peer)
if peer in self.peer_keys:
if self.peer_keys[peer] != key_str:
self.peer_keys[peer] = key_str
self.send_aes_key_event()
else:
self.peer_keys[peer] = key_str
self.send_aes_key_event()
elif tag.startswith("cluster/event"):
peer_id = tag.replace("cluster/event/", "").split("/")[0]
stripped_tag = tag.replace(f"cluster/event/{peer_id}/", "")
if peer_id in self.peer_keys:
crypticle = salt.crypt.Crypticle(self.opts, self.peer_keys[peer_id])
event_data = crypticle.loads(data)
# __peer_id can be used to know if this event came from a
# different master?
event_data["__peer_id"] = peer_id
await self.transport.publish_payload(
salt.utils.event.SaltEvent.pack(stripped_tag, event_data)
)
else:
log.error("This cluster tag not valid %s", tag)
## Add an extra fallback in case a forked process leeks through
except Exception: # pylint: disable=broad-except
log.critical("Unexpected error while polling master events", exc_info=True)
return None
async def publish_payload(self, load, *args):
await self.transport.publish_payload(load)
log.error("Publish event to local ipc clients")
tag, data = salt.utils.event.SaltEvent.unpack(load)
tasks = []
if not tag.startswith("cluster/peer"):
tasks = [asyncio.create_task(self.transport.publish_payload(load))]
for pusher in self.pushers:
log.error("Send event to master %s:%s", pusher.pull_host, pusher.pull_port)
await pusher.publish(load)
log.error(
"Publish event to peer master %s:%s", pusher.pull_host, pusher.pull_port
)
if tag.startswith("cluster/peer"):
tasks.append(asyncio.create_task(pusher.publish(load)))
continue
crypticle = salt.crypt.Crypticle(
self.opts, salt.master.SMaster.secrets["aes"]["secret"].value
)
load = {"event_payload": data}
event_data = salt.utils.event.SaltEvent.pack(
salt.utils.event.tagify(tag, self.opts["id"], "cluster/event"),
crypticle.dumps(load),
)
tasks.append(asyncio.create_task(pusher.publish(event_data)))
await asyncio.gather(*tasks, return_exceptions=True)
for task in tasks:
try:
task.result()
except Exception: # pylint: disable=broad-except
log.error("Error sending task %s", exc)

View file

@ -12,6 +12,7 @@ import hashlib
import hmac
import logging
import os
import pathlib
import random
import stat
import sys
@ -188,6 +189,83 @@ def gen_keys(keydir, keyname, keysize, user=None, passphrase=None):
return priv
class PrivateKey:
def __init__(self, path, passphrase=None):
if HAS_M2:
self.key = RSA.load_key(path, lambda x: bytes(passphrase))
else:
with salt.utils.files.fopen(path) as f:
self.key = RSA.importKey(f.read(), passphrase)
def encrypt(self, data):
if HAS_M2:
return self.key.private_encrypt(data, salt.utils.rsax931.RSA_X931_PADDING)
else:
return salt.utils.rsax931.RSAX931Signer(self.key.exportKey("PEM")).sign(
data
)
def sign(self, data):
if HAS_M2:
md = EVP.MessageDigest("sha1")
md.update(salt.utils.stringutils.to_bytes(data))
digest = md.final()
return self.key.sign(digest)
else:
signer = PKCS1_v1_5.new(self.key)
return signer.sign(SHA.new(salt.utils.stringutils.to_bytes(data)))
class PublicKey:
def __init__(self, path, _HAS_M2=HAS_M2):
self._HAS_M2 = _HAS_M2
if self._HAS_M2:
with salt.utils.files.fopen(path, "rb") as f:
data = f.read().replace(b"RSA ", b"")
bio = BIO.MemoryBuffer(data)
try:
self.key = RSA.load_pub_key_bio(bio)
except RSA.RSAError:
raise InvalidKeyError("Encountered bad RSA public key")
else:
with salt.utils.files.fopen(path) as f:
try:
self.key = RSA.importKey(f.read())
except (ValueError, IndexError, TypeError):
raise InvalidKeyError("Encountered bad RSA public key")
def encrypt(self, data):
bdata = salt.utils.stringutils.to_bytes(data)
if self._HAS_M2:
return self.key.public_encrypt(bdata, salt.crypt.RSA.pkcs1_oaep_padding)
else:
return salt.crypt.PKCS1_OAEP.new(self.key).encrypt(bdata)
def verify(self, data, signature):
if self._HAS_M2:
md = EVP.MessageDigest("sha1")
md.update(salt.utils.stringutils.to_bytes(data))
digest = md.final()
try:
return self.key.verify(digest, signature)
except RSA.RSAError as exc:
log.debug("Signature verification failed: %s", exc.args[0])
return False
else:
verifier = PKCS1_v1_5.new(self.key)
return verifier.verify(
SHA.new(salt.utils.stringutils.to_bytes(data)), signature
)
def decrypt(self, data):
data = salt.utils.stringutils.to_bytes(data)
if HAS_M2:
return self.key.public_decrypt(data, salt.utils.rsax931.RSA_X931_PADDING)
else:
verifier = salt.utils.rsax931.RSAX931Verifier(self.key.exportKey("PEM"))
return verifier.verify(data)
@salt.utils.decorators.memoize
def _get_key_with_evict(path, timestamp, passphrase):
"""
@ -386,13 +464,18 @@ class MasterKeys(dict):
self.cluster_rsa_path = os.path.join(
self.opts["cluster_pki_dir"], "cluster.pem"
)
self.cluster_shared_path = os.path.join(
self.opts["cluster_pki_dir"],
"peers",
f"{self.opts['id']}.pub",
)
self.check_master_shared_pub()
key_pass = salt.utils.sdb.sdb_get(self.opts["cluster_key_pass"], self.opts)
self.cluster_key = self.__get_keys(
name="cluster",
passphrase=key_pass,
pki_dir=self.opts["cluster_pki_dir"],
)
self.pub_signature = None
# set names for the signing key-pairs
@ -467,6 +550,12 @@ class MasterKeys(dict):
return self.cluster_rsa_path
return self.master_rsa_path
def __key_exists(self, name="master", passphrase=None, pki_dir=None):
if pki_dir is None:
pki_dir = self.opts["pki_dir"]
path = os.path.join(pki_dir, name + ".pem")
return os.path.exists(path)
def __get_keys(self, name="master", passphrase=None, pki_dir=None):
"""
Returns a key object for a key in the pki-dir
@ -474,7 +563,7 @@ class MasterKeys(dict):
if pki_dir is None:
pki_dir = self.opts["pki_dir"]
path = os.path.join(pki_dir, name + ".pem")
if not os.path.exists(path):
if not self.__key_exists(name, passphrase, pki_dir):
log.info("Generating %s keys: %s", name, pki_dir)
gen_keys(
pki_dir,
@ -535,6 +624,35 @@ class MasterKeys(dict):
"""
return self.pub_signature
def check_master_shared_pub(self):
"""
Check the status of the master's shared public key.
If the shared master key does not exist, write this master's public key
to the shared location. Otherwise validate the shared key matches our
key. Failed validation raises MasterExit
"""
shared_pub = pathlib.Path(self.cluster_shared_path)
master_pub = pathlib.Path(self.master_pub_path)
if shared_pub.exists():
if shared_pub.read_bytes() != master_pub.read_bytes():
message = (
f"Shared key does not match, remove it to continue: {shared_pub}"
)
log.error(message)
raise MasterExit(message)
else:
# permissions
log.debug("Writing shared key %s", shared_pub)
shared_pub.write_bytes(master_pub.read_bytes())
def master_private_decrypt(self, data):
if HAS_M2:
return self.master_key.private_decrypt(data, RSA.pkcs1_oaep_padding)
else:
cipher = PKCS1_OAEP.new(self.master_key)
return cipher.decrypt(data)
class AsyncAuth:
"""
@ -863,9 +981,9 @@ class AsyncAuth:
raise SaltClientError("Invalid master key")
master_pubkey_path = os.path.join(self.opts["pki_dir"], self.mpub)
if os.path.exists(master_pubkey_path) and not verify_signature(
master_pubkey_path, clear_signed_data, clear_signature
):
if os.path.exists(master_pubkey_path) and not PublicKey(
master_pubkey_path
).verify(clear_signed_data, clear_signature):
log.critical("The payload signature did not validate.")
raise SaltClientError("Invalid signature")

View file

@ -329,7 +329,10 @@ class Maintenance(salt.utils.process.SignalHandlingProcess):
# There is no need to check key against publish_session if we're
# already rotating.
if not to_rotate and self.opts.get("publish_session"):
keyfile = os.path.join(self.opts["cachedir"], ".aes")
if self.opts.get("cluster_id", None):
keyfile = os.path.join(self.opts["cluster_pki_dir"], ".aes")
else:
keyfile = os.path.join(self.opts["cachedir"], ".aes")
try:
stats = os.stat(keyfile)
except os.error as exc:
@ -716,22 +719,35 @@ class Master(SMaster):
# manager. We don't want the processes being started to inherit those
# signal handlers
with salt.utils.process.default_signals(signal.SIGINT, signal.SIGTERM):
keypath = os.path.join(self.opts["cachedir"], ".aes")
keygen = functools.partial(
salt.crypt.Crypticle.read_or_generate_key,
keypath,
)
# Setup the secrets here because the PubServerChannel may need
# them as well.
if self.opts["cluster_id"]:
keypath = os.path.join(self.opts["cluster_pki_dir"], ".aes")
keygen = functools.partial(
salt.crypt.Crypticle.read_or_generate_key,
keypath,
)
# Setup the secrets here because the PubServerChannel may need
# them as well.
SMaster.secrets["cluster_aes"] = {
"secret": multiprocessing.Array(
ctypes.c_char, salt.utils.stringutils.to_bytes(keygen())
),
"serial": multiprocessing.Value(
ctypes.c_longlong,
lock=False, # We'll use the lock from 'secret'
),
"reload": keygen,
}
SMaster.secrets["aes"] = {
"secret": multiprocessing.Array(
ctypes.c_char, salt.utils.stringutils.to_bytes(keygen())
ctypes.c_char,
salt.utils.stringutils.to_bytes(
salt.crypt.Crypticle.generate_key_string()
),
),
"serial": multiprocessing.Value(
ctypes.c_longlong, lock=False # We'll use the lock from 'secret'
),
"reload": keygen,
"reload": salt.crypt.Crypticle.generate_key_string,
}
log.info("Creating master process manager")
@ -745,20 +761,10 @@ class Master(SMaster):
pub_channels.append(chan)
log.info("Creating master event publisher process")
ipc_publisher = salt.transport.ipc_publish_server("master", self.opts)
ipc_publisher = salt.channel.server.MasterPubServerChannel.factory(
self.opts
)
ipc_publisher.pre_fork(self.process_manager)
# self.process_manager.add_process(
# ipc_publisher.publish_daemon,
# args=[
# ipc_publisher.publish_payload,
# ],
# name="EventPublisher",
# )
self.process_manager.add_process(
EventMonitor,
args=[self.opts],
@ -867,6 +873,9 @@ class Master(SMaster):
# No custom signal handling was added, install our own
signal.signal(signal.SIGTERM, self._handle_signals)
if self.opts.get("cluster_id", None):
# Notify the rest of the cluster we're starting.
ipc_publisher.send_aes_key_event()
self.process_manager.run()
def _handle_signals(self, signum, sigframe):
@ -897,6 +906,7 @@ class EventMonitor(salt.utils.process.SignalHandlingProcess):
Event handler for publish forwarder
"""
tag, data = salt.utils.event.SaltEvent.unpack(package)
log.error("got evetn %s %r", tag, data)
if tag.startswith("salt/job") and tag.endswith("/publish"):
# data.pop("_stamp", None)
log.trace("Forward job event to publisher server: %r", data)
@ -2395,6 +2405,7 @@ class ClearFuncs(TransportMethods):
# An alternative to copy may be to pop it
# payload.pop("_stamp")
self._send_ssh_pub(payload, ssh_minions=ssh_minions)
log.error("SEND JOB PAYLOAD %r", payload)
await self._send_pub(payload)
return {

View file

@ -1120,7 +1120,9 @@ class TCPPuller:
but using either UNIX domain sockets or TCP sockets
"""
def __init__(self, host=None, port=None, path=None, io_loop=None, payload_handler=None):
def __init__(
self, host=None, port=None, path=None, io_loop=None, payload_handler=None
):
"""
Create a new Tornado IPC server
@ -1216,7 +1218,9 @@ class TCPPuller:
if self.path:
log.trace("Client disconnected from IPC %s", self.path)
else:
log.trace("Client disconnected from IPC %s:%s", self.host, self.port)
log.trace(
"Client disconnected from IPC %s:%s", self.host, self.port
)
break
except OSError as exc:
# On occasion an exception will occur with
@ -1386,7 +1390,9 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
log.debug("Publish server binding pull to %s", self.pull_path)
pull_path = self.pull_path
else:
log.info("Publish server binding pull to 127.0.0.1:%s", self.pull_port)
log.info(
"Publish server binding pull to %s:%s", self.pull_host, self.pull_port
)
pull_host = self.pull_host
pull_port = self.pull_port

View file

@ -424,6 +424,25 @@ class SaltEvent:
data = salt.payload.loads(mdata, encoding="utf-8")
return mtag, data
@classmethod
def pack(cls, tag, data, max_size=None):
tagend = TAGEND
serialized_data = salt.payload.dumps(data, use_bin_type=True)
if max_size:
serialized_data = salt.utils.dicttrim.trim_dict(
serialized_data,
max_size,
is_msgpacked=True,
use_bin_type=True,
)
return b"".join(
[
salt.utils.stringutils.to_bytes(tag),
salt.utils.stringutils.to_bytes(tagend),
serialized_data,
]
)
def _get_match_func(self, match_type=None):
if match_type is None:
match_type = self.opts["event_match_type"]
@ -713,33 +732,7 @@ class SaltEvent:
return False
data["_stamp"] = datetime.datetime.utcnow().isoformat()
tagend = TAGEND
# Since the pack / unpack logic here is for local events only,
# it is safe to change the wire protocol. The mechanism
# that sends events from minion to master is outside this
# file.
dump_data = salt.payload.dumps(data, use_bin_type=True)
serialized_data = salt.utils.dicttrim.trim_dict(
dump_data,
self.opts["max_event_size"],
is_msgpacked=True,
use_bin_type=True,
)
log.debug(
"Sending event(fire_event_async): tag = %s; data = %s %r",
tag,
data,
self.pusher,
)
event = b"".join(
[
salt.utils.stringutils.to_bytes(tag),
salt.utils.stringutils.to_bytes(tagend),
serialized_data,
]
)
event = self.pack(tag, data, max_size=self.opts["max_event_size"])
msg = salt.utils.stringutils.to_bytes(event, "utf-8")
self.pusher.publish(msg)
if cb is not None:
@ -774,32 +767,7 @@ class SaltEvent:
return False
data["_stamp"] = datetime.datetime.utcnow().isoformat()
tagend = TAGEND
# Since the pack / unpack logic here is for local events only,
# it is safe to change the wire protocol. The mechanism
# that sends events from minion to master is outside this
# file.
dump_data = salt.payload.dumps(data, use_bin_type=True)
serialized_data = salt.utils.dicttrim.trim_dict(
dump_data,
self.opts["max_event_size"],
is_msgpacked=True,
use_bin_type=True,
)
log.debug(
"Sending event(fire_event): tag = %s; data = %s",
tag,
data,
)
event = b"".join(
[
salt.utils.stringutils.to_bytes(tag),
salt.utils.stringutils.to_bytes(tagend),
serialized_data,
]
)
event = self.pack(tag, data, max_size=self.opts["max_event_size"])
msg = salt.utils.stringutils.to_bytes(event, "utf-8")
if self._run_io_loop_sync:
try:

View file

@ -5,6 +5,7 @@ tests.pytests.unit.test_crypt
Unit tests for salt's crypt module
"""
import os
import uuid
import pytest
@ -12,6 +13,28 @@ import pytest
import salt.crypt
import salt.master
import salt.utils.files
from tests.support.helpers import dedent
from tests.support.mock import MagicMock, MockCall, mock_open, patch
try:
import M2Crypto
HAS_M2 = True
except ImportError:
HAS_M2 = False
try:
from Cryptodome.PublicKey import RSA
HAS_PYCRYPTO_RSA = True
except ImportError:
HAS_PYCRYPTO_RSA = False
if not HAS_PYCRYPTO_RSA:
try:
from Crypto.PublicKey import RSA # nosec
HAS_PYCRYPTO_RSA = True
except ImportError:
HAS_PYCRYPTO_RSA = False
PRIV_KEY = """
-----BEGIN RSA PRIVATE KEY-----
@ -99,6 +122,92 @@ bQIDAQAB
-----END PUBLIC KEY-----
"""
PRIVKEY_DATA = (
"-----BEGIN RSA PRIVATE KEY-----\n"
"MIIEpAIBAAKCAQEA75GR6ZTv5JOv90Vq8tKhKC7YQnhDIo2hM0HVziTEk5R4UQBW\n"
"a0CKytFMbTONY2msEDwX9iA0x7F5Lgj0X8eD4ZMsYqLzqjWMekLC8bjhxc+EuPo9\n"
"Dygu3mJ2VgRC7XhlFpmdo5NN8J2E7B/CNB3R4hOcMMZNZdi0xLtFoTfwU61UPfFX\n"
"14mV2laqLbvDEfQLJhUTDeFFV8EN5Z4H1ttLP3sMXJvc3EvM0JiDVj4l1TWFUHHz\n"
"eFgCA1Im0lv8i7PFrgW7nyMfK9uDSsUmIp7k6ai4tVzwkTmV5PsriP1ju88Lo3MB\n"
"4/sUmDv/JmlZ9YyzTO3Po8Uz3Aeq9HJWyBWHAQIDAQABAoIBAGOzBzBYZUWRGOgl\n"
"IY8QjTT12dY/ymC05GM6gMobjxuD7FZ5d32HDLu/QrknfS3kKlFPUQGDAbQhbbb0\n"
"zw6VL5NO9mfOPO2W/3FaG1sRgBQcerWonoSSSn8OJwVBHMFLG3a+U1Zh1UvPoiPK\n"
"S734swIM+zFpNYivGPvOm/muF/waFf8tF/47t1cwt/JGXYQnkG/P7z0vp47Irpsb\n"
"Yjw7vPe4BnbY6SppSxscW3KoV7GtJLFKIxAXbxsuJMF/rYe3O3w2VKJ1Sug1VDJl\n"
"/GytwAkSUer84WwP2b07Wn4c5pCnmLslMgXCLkENgi1NnJMhYVOnckxGDZk54hqP\n"
"9RbLnkkCgYEA/yKuWEvgdzYRYkqpzB0l9ka7Y00CV4Dha9Of6GjQi9i4VCJ/UFVr\n"
"UlhTo5y0ZzpcDAPcoZf5CFZsD90a/BpQ3YTtdln2MMCL/Kr3QFmetkmDrt+3wYnX\n"
"sKESfsa2nZdOATRpl1antpwyD4RzsAeOPwBiACj4fkq5iZJBSI0bxrMCgYEA8GFi\n"
"qAjgKh81/Uai6KWTOW2kX02LEMVRrnZLQ9VPPLGid4KZDDk1/dEfxjjkcyOxX1Ux\n"
"Klu4W8ZEdZyzPcJrfk7PdopfGOfrhWzkREK9C40H7ou/1jUecq/STPfSOmxh3Y+D\n"
"ifMNO6z4sQAHx8VaHaxVsJ7SGR/spr0pkZL+NXsCgYEA84rIgBKWB1W+TGRXJzdf\n"
"yHIGaCjXpm2pQMN3LmP3RrcuZWm0vBt94dHcrR5l+u/zc6iwEDTAjJvqdU4rdyEr\n"
"tfkwr7v6TNlQB3WvpWanIPyVzfVSNFX/ZWSsAgZvxYjr9ixw6vzWBXOeOb/Gqu7b\n"
"cvpLkjmJ0wxDhbXtyXKhZA8CgYBZyvcQb+hUs732M4mtQBSD0kohc5TsGdlOQ1AQ\n"
"McFcmbpnzDghkclyW8jzwdLMk9uxEeDAwuxWE/UEvhlSi6qdzxC+Zifp5NBc0fVe\n"
"7lMx2mfJGxj5CnSqQLVdHQHB4zSXkAGB6XHbBd0MOUeuvzDPfs2voVQ4IG3FR0oc\n"
"3/znuwKBgQChZGH3McQcxmLA28aUwOVbWssfXKdDCsiJO+PEXXlL0maO3SbnFn+Q\n"
"Tyf8oHI5cdP7AbwDSx9bUfRPjg9dKKmATBFr2bn216pjGxK0OjYOCntFTVr0psRB\n"
"CrKg52Qrq71/2l4V2NLQZU40Dr1bN9V+Ftd9L0pvpCAEAWpIbLXGDw==\n"
"-----END RSA PRIVATE KEY-----"
)
PUBKEY_DATA = (
"-----BEGIN PUBLIC KEY-----\n"
"MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA75GR6ZTv5JOv90Vq8tKh\n"
"KC7YQnhDIo2hM0HVziTEk5R4UQBWa0CKytFMbTONY2msEDwX9iA0x7F5Lgj0X8eD\n"
"4ZMsYqLzqjWMekLC8bjhxc+EuPo9Dygu3mJ2VgRC7XhlFpmdo5NN8J2E7B/CNB3R\n"
"4hOcMMZNZdi0xLtFoTfwU61UPfFX14mV2laqLbvDEfQLJhUTDeFFV8EN5Z4H1ttL\n"
"P3sMXJvc3EvM0JiDVj4l1TWFUHHzeFgCA1Im0lv8i7PFrgW7nyMfK9uDSsUmIp7k\n"
"6ai4tVzwkTmV5PsriP1ju88Lo3MB4/sUmDv/JmlZ9YyzTO3Po8Uz3Aeq9HJWyBWH\n"
"AQIDAQAB\n"
"-----END PUBLIC KEY-----"
)
MSG = b"It's me, Mario"
SIG = (
b"\x07\xf3\xb1\xe7\xdb\x06\xf4_\xe2\xdc\xcb!F\xfb\xbex{W\x1d\xe4E"
b"\xd3\r\xc5\x90\xca(\x05\x1d\x99\x8b\x1aug\x9f\x95>\x94\x7f\xe3+"
b"\x12\xfa\x9c\xd4\xb8\x02]\x0e\xa5\xa3LL\xc3\xa2\x8f+\x83Z\x1b\x17"
b'\xbfT\xd3\xc7\xfd\x0b\xf4\xd7J\xfe^\x86q"I\xa3x\xbc\xd3$\xe9M<\xe1'
b"\x07\xad\xf2_\x9f\xfa\xf7g(~\xd8\xf5\xe7\xda-\xa3Ko\xfc.\x99\xcf"
b"\x9b\xb9\xc1U\x97\x82'\xcb\xc6\x08\xaa\xa0\xe4\xd0\xc1+\xfc\x86"
b'\r\xe4y\xb1#\xd3\x1dS\x96D28\xc4\xd5\r\xd4\x98\x1a44"\xd7\xc2\xb4'
b"]\xa7\x0f\xa7Db\x85G\x8c\xd6\x94!\x8af1O\xf6g\xd7\x03\xfd\xb3\xbc"
b"\xce\x9f\xe7\x015\xb8\x1d]AHK\xa0\x14m\xda=O\xa7\xde\xf2\xff\x9b"
b"\x8e\x83\xc8j\x11\x1a\x98\x85\xde\xc5\x91\x07\x84!\x12^4\xcb\xa8"
b"\x98\x8a\x8a&#\xb9(#?\x80\x15\x9eW\xb5\x12\xd1\x95S\xf2<G\xeb\xf1"
b"\x14H\xb2\xc4>\xc3A\xed\x86x~\xcfU\xd5Q\xfe~\x10\xd2\x9b"
)
TEST_KEY = (
"-----BEGIN RSA PUBLIC KEY-----\n"
"MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzLtFhsvfbFDFaUgulSEX\n"
"Gl12XriL1DT78Ef2/u8HHaSMmPie37BLWas/zaHwI6066bIyYQJ/nUCahTaoHM7L\n"
"GlWc0wOU6zyfpihCRQHil05Y6F+olFBoZuYbFPvtp7/hJx/D7I/0n2o/c7M5i3Y2\n"
"3sBxAYNooIQHXHUmPQW6C9iu95ylZDW8JQzYy/EI4vCC8yQMdTK8jK1FQV0Sbwny\n"
"qcMxSyAWDoFbnhh2P2TnO8HOWuUOaXR8ZHOJzVcDl+a6ew+medW090x3K5O1f80D\n"
"+WjgnG6b2HG7VQpOCfM2GALD/FrxicPilvZ38X1aLhJuwjmVE4LAAv8DVNJXohaO\n"
"WQIDAQAB\n"
"-----END RSA PUBLIC KEY-----\n"
)
SIGNATURE = (
b"w\xac\xfe18o\xeb\xfb\x14+\x9e\xd1\xb7\x7fe}\xec\xd6\xe1P\x9e\xab"
b"\xb5\x07\xe0\xc1\xfd\xda#\x04Z\x8d\x7f\x0b\x1f}:~\xb2s\x860u\x02N"
b'\xd4q"\xb7\x86*\x8f\x1f\xd0\x9d\x11\x92\xc5~\xa68\xac>\x12H\xc2%y,'
b"\xe6\xceU\x1e\xa3?\x0c,\xf0u\xbb\xd0[g_\xdd\x8b\xb0\x95:Y\x18\xa5*"
b"\x99\xfd\xf3K\x92\x92 ({\xd1\xff\xd9F\xc8\xd6K\x86e\xf9\xa8\xad\xb0z"
b"\xe3\x9dD\xf5k\x8b_<\xe7\xe7\xec\xf3\"'\xd5\xd2M\xb4\xce\x1a\xe3$"
b"\x9c\x81\xad\xf9\x11\xf6\xf5>)\xc7\xdd\x03&\xf7\x86@ks\xa6\x05\xc2"
b"\xd0\xbd\x1a7\xfc\xde\xe6\xb0\xad!\x12#\xc86Y\xea\xc5\xe3\xe2\xb3"
b"\xc9\xaf\xfa\x0c\xf2?\xbf\x93w\x18\x9e\x0b\xa2a\x10:M\x05\x89\xe2W.Q"
b"\xe8;yGT\xb1\xf2\xc6A\xd2\xc4\xbeN\xb3\xcfS\xaf\x03f\xe2\xb4)\xe7\xf6"
b'\xdbs\xd0Z}8\xa4\xd2\x1fW*\xe6\x1c"\x8b\xd0\x18w\xb9\x7f\x9e\x96\xa3'
b"\xd9v\xf7\x833\x8e\x01"
)
def test_get_rsa_pub_key_bad_key(tmp_path):
"""
@ -211,6 +320,7 @@ def test_master_keys_with_cluster_id(tmp_path, master_opts):
# The paths need to exist
master_pki_path.mkdir()
cluster_pki_path.mkdir()
(cluster_pki_path / "peers").mkdir()
master_opts["pki_dir"] = str(master_pki_path)
master_opts["cluster_id"] = "cluster1"
@ -229,3 +339,296 @@ def test_master_keys_with_cluster_id(tmp_path, master_opts):
assert mkeys.pub_path == expected_cluster_pub
assert mkeys.rsa_path == expected_cluster_rsa
assert mkeys.key == mkeys.cluster_key
@pytest.mark.skipif(not HAS_PYCRYPTO_RSA, reason="pycrypto >= 2.6 is not available")
@pytest.mark.skipif(HAS_M2, reason="m2crypto is used by salt.crypt if installed")
def test_pycrypto_gen_keys():
open_priv_wb = MockCall("/keydir{}keyname.pem".format(os.sep), "wb+")
open_pub_wb = MockCall("/keydir{}keyname.pub".format(os.sep), "wb+")
with patch.multiple(
os,
umask=MagicMock(),
chmod=MagicMock(),
access=MagicMock(return_value=True),
):
with patch("salt.utils.files.fopen", mock_open()) as m_open, patch(
"os.path.isfile", return_value=True
):
result = salt.crypt.gen_keys("/keydir", "keyname", 2048)
assert result == "/keydir{}keyname.pem".format(os.sep), result
assert open_priv_wb not in m_open.calls
assert open_pub_wb not in m_open.calls
with patch("salt.utils.files.fopen", mock_open()) as m_open, patch(
"os.path.isfile", return_value=False
):
salt.crypt.gen_keys("/keydir", "keyname", 2048)
assert open_priv_wb in m_open.calls
assert open_pub_wb in m_open.calls
@patch("os.umask", MagicMock())
@patch("os.chmod", MagicMock())
@patch("os.chown", MagicMock(), create=True)
@patch("os.access", MagicMock(return_value=True))
@pytest.mark.slow_test
@pytest.mark.skipif(not HAS_PYCRYPTO_RSA, reason="pycrypto >= 2.6 is not available")
@pytest.mark.skipif(HAS_M2, reason="m2crypto is used by salt.crypt if installed")
def test_pycrypto_gen_keys_with_passphrase():
key_path = os.path.join(os.sep, "keydir")
open_priv_wb = MockCall(os.path.join(key_path, "keyname.pem"), "wb+")
open_pub_wb = MockCall(os.path.join(key_path, "keyname.pub"), "wb+")
with patch("salt.utils.files.fopen", mock_open()) as m_open, patch(
"os.path.isfile", return_value=True
):
assert salt.crypt.gen_keys(
key_path, "keyname", 2048, passphrase="password"
) == os.path.join(key_path, "keyname.pem")
result = salt.crypt.gen_keys(key_path, "keyname", 2048, passphrase="password")
assert result == os.path.join(key_path, "keyname.pem"), result
assert open_priv_wb not in m_open.calls
assert open_pub_wb not in m_open.calls
with patch("salt.utils.files.fopen", mock_open()) as m_open, patch(
"os.path.isfile", return_value=False
):
salt.crypt.gen_keys(key_path, "keyname", 2048)
assert open_priv_wb in m_open.calls
assert open_pub_wb in m_open.calls
@pytest.mark.skipif(not HAS_PYCRYPTO_RSA, reason="pycrypto >= 2.6 is not available")
@pytest.mark.skipif(HAS_M2, reason="m2crypto is used by salt.crypt if installed")
def test_pycrypto_sign_message():
key = RSA.importKey(PRIVKEY_DATA)
with patch("salt.crypt.get_rsa_key", return_value=key):
assert SIG == salt.crypt.sign_message("/keydir/keyname.pem", MSG)
@pytest.mark.skipif(not HAS_PYCRYPTO_RSA, reason="pycrypto >= 2.6 is not available")
@pytest.mark.skipif(HAS_M2, reason="m2crypto is used by salt.crypt if installed")
def test_pycrypto_sign_message_with_passphrase():
key = RSA.importKey(PRIVKEY_DATA)
with patch("salt.crypt.get_rsa_key", return_value=key):
assert SIG == salt.crypt.sign_message(
"/keydir/keyname.pem", MSG, passphrase="password"
)
@pytest.mark.skipif(not HAS_PYCRYPTO_RSA, reason="pycrypto >= 2.6 is not available")
@pytest.mark.skipif(HAS_M2, reason="m2crypto is used by salt.crypt if installed")
def test_pycrypto_verify_signature():
with patch("salt.utils.files.fopen", mock_open(read_data=PUBKEY_DATA)):
assert salt.crypt.verify_signature("/keydir/keyname.pub", MSG, SIG)
@patch("os.umask", MagicMock())
@patch("os.chmod", MagicMock())
@patch("os.access", MagicMock(return_value=True))
@pytest.mark.skipif(not HAS_M2, reason="m2crypto is not available")
@pytest.mark.slow_test
def test_m2_gen_keys():
with patch("M2Crypto.RSA.RSA.save_pem", MagicMock()) as save_pem:
with patch("M2Crypto.RSA.RSA.save_pub_key", MagicMock()) as save_pub:
with patch("os.path.isfile", return_value=True):
assert salt.crypt.gen_keys(
"/keydir", "keyname", 2048
) == "/keydir{}keyname.pem".format(os.sep)
save_pem.assert_not_called()
save_pub.assert_not_called()
with patch("os.path.isfile", return_value=False):
salt.crypt.gen_keys(
"/keydir", "keyname", 2048
) == "/keydir{}keyname.pem".format(os.sep)
save_pem.assert_called_once_with(
"/keydir{}keyname.pem".format(os.sep), cipher=None
)
save_pub.assert_called_once_with("/keydir{}keyname.pub".format(os.sep))
@patch("os.umask", MagicMock())
@patch("os.chmod", MagicMock())
@patch("os.chown", MagicMock())
@patch("os.access", MagicMock(return_value=True))
@pytest.mark.skipif(not HAS_M2, reason="m2crypto is not available")
@pytest.mark.slow_test
def test_gen_keys_with_passphrase():
with patch("M2Crypto.RSA.RSA.save_pem", MagicMock()) as save_pem:
with patch("M2Crypto.RSA.RSA.save_pub_key", MagicMock()) as save_pub:
with patch("os.path.isfile", return_value=True):
assert salt.crypt.gen_keys(
"/keydir", "keyname", 2048, passphrase="password"
) == "/keydir{}keyname.pem".format(os.sep)
save_pem.assert_not_called()
save_pub.assert_not_called()
with patch("os.path.isfile", return_value=False):
assert salt.crypt.gen_keys(
"/keydir", "keyname", 2048, passphrase="password"
) == "/keydir{}keyname.pem".format(os.sep)
callback = save_pem.call_args[1]["callback"]
save_pem.assert_called_once_with(
"/keydir{}keyname.pem".format(os.sep),
cipher="des_ede3_cbc",
callback=callback,
)
assert callback(None) == b"password"
save_pub.assert_called_once_with("/keydir{}keyname.pub".format(os.sep))
@pytest.mark.skipif(not HAS_M2, reason="m2crypto is not available")
def test_m2_sign_message_with_passphrase():
key = M2Crypto.RSA.load_key_string(salt.utils.stringutils.to_bytes(PRIVKEY_DATA))
with patch("salt.crypt.get_rsa_key", return_value=key):
assert SIG == salt.crypt.sign_message(
"/keydir/keyname.pem", MSG, passphrase="password"
)
@pytest.mark.skipif(not HAS_M2, reason="m2crypto is not available")
def test_m2_verify_signature():
with patch(
"salt.utils.files.fopen",
mock_open(read_data=salt.utils.stringutils.to_bytes(PUBKEY_DATA)),
):
assert salt.crypt.verify_signature("/keydir/keyname.pub", MSG, SIG)
@pytest.mark.skipif(not HAS_M2, reason="m2crypto is not available")
def test_m2_encrypt_decrypt_bin():
priv_key = M2Crypto.RSA.load_key_string(
salt.utils.stringutils.to_bytes(PRIVKEY_DATA)
)
pub_key = M2Crypto.RSA.load_pub_key_bio(
M2Crypto.BIO.MemoryBuffer(salt.utils.stringutils.to_bytes(PUBKEY_DATA))
)
encrypted = salt.crypt.private_encrypt(priv_key, b"salt")
decrypted = salt.crypt.public_decrypt(pub_key, encrypted)
assert b"salt" == decrypted
@pytest.fixture
def key_to_test(tmp_path):
key_path = tmp_path / "cryptodom-3.4.6.pub"
with salt.utils.files.fopen(key_path, "wb") as fd:
fd.write(TEST_KEY.encode())
return key_path
@pytest.mark.skipif(not HAS_M2, reason="Skip when m2crypto is not installed")
def test_m2_bad_key(key_to_test):
"""
Load public key with an invalid header using m2crypto and validate it
"""
key = salt.crypt.get_rsa_pub_key(key_to_test)
assert key.check_key() == 1
@pytest.mark.skipif(not HAS_M2, reason="Skip when m2crypto is not installed")
def test_m2_bad_key(key_to_test):
"""
Load public key with an invalid header using m2crypto and validate it
"""
key = salt.crypt.get_rsa_pub_key(key_to_test)
assert key.check_key() == 1
@pytest.mark.skipif(HAS_M2, reason="Skip when m2crypto is installed")
def test_pycrypto_bad_key(key_to_test):
"""
Load public key with an invalid header and validate it without m2crypto
"""
key = salt.crypt.get_rsa_pub_key(key_to_test)
assert key.can_encrypt()
@pytest.mark.skipif(not HAS_M2, reason="Skip when m2crypto is not installed")
def test_m2crypto_verify_bytes_47124():
message = salt.utils.stringutils.to_unicode("meh")
with patch(
"salt.utils.files.fopen",
mock_open(read_data=salt.utils.stringutils.to_bytes(PUBKEY_DATA)),
):
salt.crypt.verify_signature("/keydir/keyname.pub", message, SIGNATURE)
@pytest.mark.skipif(not HAS_M2, reason="Skip when m2crypto is not installed")
def test_m2crypto_verify_unicode_47124():
message = salt.utils.stringutils.to_bytes("meh")
with patch(
"salt.utils.files.fopen",
mock_open(read_data=salt.utils.stringutils.to_bytes(PUBKEY_DATA)),
):
salt.crypt.verify_signature("/keydir/keyname.pub", message, SIGNATURE)
@pytest.mark.skipif(not HAS_M2, reason="Skip when m2crypto is not installed")
def test_m2crypto_sign_bytes_47124():
message = salt.utils.stringutils.to_unicode("meh")
key = M2Crypto.RSA.load_key_string(salt.utils.stringutils.to_bytes(PRIVKEY_DATA))
with patch("salt.crypt.get_rsa_key", return_value=key):
signature = salt.crypt.sign_message(
"/keydir/keyname.pem", message, passphrase="password"
)
assert SIGNATURE == signature
@pytest.mark.skipif(not HAS_M2, reason="Skip when m2crypto is not installed")
def test_m2crypto_sign_unicode_47124():
message = salt.utils.stringutils.to_bytes("meh")
key = M2Crypto.RSA.load_key_string(salt.utils.stringutils.to_bytes(PRIVKEY_DATA))
with patch("salt.crypt.get_rsa_key", return_value=key):
signature = salt.crypt.sign_message(
"/keydir/keyname.pem", message, passphrase="password"
)
assert SIGNATURE == signature
def test_pwdata_decrypt():
key_string = dedent(
"""-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEAzhBRyyHa7b63RLE71uKMKgrpulcAJjaIaN68ltXcCvy4w9pi
Kj+4I3Qp6RvUaHOEmymqyjOMjQc6iwpe0scCFqh3nUk5YYaLZ3WAW0htQVlnesgB
ZiBg9PBeTQY/LzqtudL6RCng/AX+fbnCsddlIysRxnUoNVMvz0gAmCY2mnTDjcTt
pyxuk2T0AHSHNCKCalm75L1bWDFF+UzFemf536tBfBUGRWR6jWTij85vvCntxHS/
HdknaTJ50E7XGVzwBJpCyV4Y2VXuW/3KrCNTqXw+jTmEw0vlcshfDg/vb3IxsUSK
5KuHalKq/nUIc+F4QCJOl+A10goGdIfYC1/67QIDAQABAoIBAAOP+qoFWtCTZH22
hq9PWVb8u0+yY1lFxhPyDdaZueUiu1r/coUCdv996Z+TEJgBr0AzdzVpsLtbbaKr
ujnwoNOdc/vvISPTfKN8P4zUcrcXgZd4z7VhR+vUH/0652q8m/ZDdHorMy2IOP8Z
cAk9DQ2PmA4TRm+tkX0G5KO8vWLsK921aRMWdsKJyQ0lYxl7M8JWupFsCJFr/U+8
dAVtwnUiS7RnhBABZ1cfNTHYhXVAh4d+a9y/gZ00a66OGqPxiXfhjjDUZ6fGvWKN
FlhKWEg6YqIx/H4aNXkLI5Rzzhdx/c2ukNm7+X2veRcAW7bcTwk8wxJxciEP5pBi
1el9VE0CgYEA/lbzdE2M4yRBvTfYYC6BqZcn+BqtrAUc2h3fEy+p7lwlet0af1id
gWpYpOJyLc0AUfR616/m2y3PwEH/nMKDSTuU7o/qKNtlHW0nQcnhDCjTUydS3+J/
JM3dhfgVqi03rjqNcgHA2eOEwcu/OBZtiaC0wqKbuRZRtfGffyoO3ssCgYEAz2iw
wqu/NkA+MdQIxz/a3Is7gGwoFu6h7O+XU2uN8Y2++jSBw9AzzWj31YCvyjuJPAE+
gxHm6yOnNoLVn423NtibHejhabzHNIK6UImH99bSTKabsxfF2BX6v982BimU1jwc
bYykzws37oN/poPb5FTpEiAUrsd2bAMn/1S43icCgYEAulHkY0z0aumCpyUkA8HO
BvjOtPiGRcAxFLBRXPLL3+vtIQachLHcIJRRf+jLkDXfiCo7W4pm6iWzTbqLkMEG
AD3/qowPFAM1Hct6uL01efzmYsIp+g0o60NMhvnolRQu+Bm4yM30AyqjdHzYBjSX
5fyuru8EeSCal1j8aOHcpuUCgYEAhGhDH6Pg59NPYSQJjpm3MMA59hwV473n5Yh2
xKyO6zwgRT6r8MPDrkhqnwQONT6Yt5PbwnT1Q/t4zhXsJnWkFwFk1U1MSeJYEa+7
HZsPECs2CfT6xPRSO0ac00y+AmUdPT8WruDwfbSdukh8f2MCR9vlBsswKPvxH7dM
G3aMplUCgYEAmMFgB/6Ox4OsQPPC6g4G+Ezytkc4iVkMEcjiVWzEsYATITjq3weO
/XDGBYJoBhYwWPi9oBufFc/2pNtWy1FKKXPuVyXQATdA0mfEPbtsHjMFQNZbeKnm
0na/SysSDCK3P+9ijlbjqLjMmPEmhJxGWTJ7khnTTkfre7/w9ZxJxi8=
-----END RSA PRIVATE KEY-----"""
)
pwdata = (
b"V\x80+b\xca\x06M\xb6\x12\xc6\xe8\xf2\xb5\xbb\xd8m\xc0\x97\x9a\xeb\xb9q\x19\xc3"
b'\xcdi\xb84\x90\xaf\x12kT\xe2@u\xd6\xe8T\x89\xa3\xc7\xb2Y\xd1N\x00\xa9\xc0"\xbe'
b"\xed\xb1\xc3\xb7^\xbf\xbd\x8b\x13\xd3/L\x1b\xa1`\xe2\xea\x03\x98\x82\xf3uS&|"
b'\xe5\xd8J\xce\xfc\x97\x8d\x0b\x949\xc0\xbd^\xef\xc6\xfd\xce\xbb\x1e\xd0"(m\xe1'
b"\x95\xfb\xc8/\x07\x93\xb8\xda\x8f\x99\xfe\xdc\xd5\xcb\xdb\xb2\xf11M\xdbD\xcf"
b"\x95\x13p\r\xa4\x1c{\xd5\xdb\xc7\xe5\xaf\x95F\x97\xa9\x00p~\xb5\xec\xa4.\xd0"
b"\xa4\xb4\xf4f\xcds,Y/\xa1:WF\xb8\xc7\x07\xaa\x0b<'~\x1b$D9\xd4\x8d\xf0x\xc5"
b"\xee\xa8:\xe6\x00\x10\xc5i\x11\xc7]C8\x05l\x8b\x9b\xc3\x83e\xf7y\xadi:0\xb4R"
b"\x1a(\x04&yL8\x19s\n\x11\x81\xfd?\xfb2\x80Ll\xa1\xdc\xc9\xb6P\xca\x8d'\x11\xc1"
b"\x07\xa5\xa1\x058\xc7\xce\xbeb\x92\xbf\x0bL\xec\xdf\xc3M\x83\xfb$\xec\xd5\xf9"
)
assert "1234", salt.crypt.pwdata_decrypt(key_string, pwdata)

View file

@ -1,419 +0,0 @@
import os
import shutil
import tempfile
import pytest
import salt.utils.files
import salt.utils.stringutils
from salt import crypt
from tests.support.mock import MagicMock, MockCall, mock_open, patch
from tests.support.unit import TestCase
try:
import M2Crypto
HAS_M2 = True
except ImportError:
HAS_M2 = False
try:
from Cryptodome.PublicKey import RSA
HAS_PYCRYPTO_RSA = True
except ImportError:
HAS_PYCRYPTO_RSA = False
if not HAS_PYCRYPTO_RSA:
try:
from Crypto.PublicKey import RSA # nosec
HAS_PYCRYPTO_RSA = True
except ImportError:
HAS_PYCRYPTO_RSA = False
PRIVKEY_DATA = (
"-----BEGIN RSA PRIVATE KEY-----\n"
"MIIEpAIBAAKCAQEA75GR6ZTv5JOv90Vq8tKhKC7YQnhDIo2hM0HVziTEk5R4UQBW\n"
"a0CKytFMbTONY2msEDwX9iA0x7F5Lgj0X8eD4ZMsYqLzqjWMekLC8bjhxc+EuPo9\n"
"Dygu3mJ2VgRC7XhlFpmdo5NN8J2E7B/CNB3R4hOcMMZNZdi0xLtFoTfwU61UPfFX\n"
"14mV2laqLbvDEfQLJhUTDeFFV8EN5Z4H1ttLP3sMXJvc3EvM0JiDVj4l1TWFUHHz\n"
"eFgCA1Im0lv8i7PFrgW7nyMfK9uDSsUmIp7k6ai4tVzwkTmV5PsriP1ju88Lo3MB\n"
"4/sUmDv/JmlZ9YyzTO3Po8Uz3Aeq9HJWyBWHAQIDAQABAoIBAGOzBzBYZUWRGOgl\n"
"IY8QjTT12dY/ymC05GM6gMobjxuD7FZ5d32HDLu/QrknfS3kKlFPUQGDAbQhbbb0\n"
"zw6VL5NO9mfOPO2W/3FaG1sRgBQcerWonoSSSn8OJwVBHMFLG3a+U1Zh1UvPoiPK\n"
"S734swIM+zFpNYivGPvOm/muF/waFf8tF/47t1cwt/JGXYQnkG/P7z0vp47Irpsb\n"
"Yjw7vPe4BnbY6SppSxscW3KoV7GtJLFKIxAXbxsuJMF/rYe3O3w2VKJ1Sug1VDJl\n"
"/GytwAkSUer84WwP2b07Wn4c5pCnmLslMgXCLkENgi1NnJMhYVOnckxGDZk54hqP\n"
"9RbLnkkCgYEA/yKuWEvgdzYRYkqpzB0l9ka7Y00CV4Dha9Of6GjQi9i4VCJ/UFVr\n"
"UlhTo5y0ZzpcDAPcoZf5CFZsD90a/BpQ3YTtdln2MMCL/Kr3QFmetkmDrt+3wYnX\n"
"sKESfsa2nZdOATRpl1antpwyD4RzsAeOPwBiACj4fkq5iZJBSI0bxrMCgYEA8GFi\n"
"qAjgKh81/Uai6KWTOW2kX02LEMVRrnZLQ9VPPLGid4KZDDk1/dEfxjjkcyOxX1Ux\n"
"Klu4W8ZEdZyzPcJrfk7PdopfGOfrhWzkREK9C40H7ou/1jUecq/STPfSOmxh3Y+D\n"
"ifMNO6z4sQAHx8VaHaxVsJ7SGR/spr0pkZL+NXsCgYEA84rIgBKWB1W+TGRXJzdf\n"
"yHIGaCjXpm2pQMN3LmP3RrcuZWm0vBt94dHcrR5l+u/zc6iwEDTAjJvqdU4rdyEr\n"
"tfkwr7v6TNlQB3WvpWanIPyVzfVSNFX/ZWSsAgZvxYjr9ixw6vzWBXOeOb/Gqu7b\n"
"cvpLkjmJ0wxDhbXtyXKhZA8CgYBZyvcQb+hUs732M4mtQBSD0kohc5TsGdlOQ1AQ\n"
"McFcmbpnzDghkclyW8jzwdLMk9uxEeDAwuxWE/UEvhlSi6qdzxC+Zifp5NBc0fVe\n"
"7lMx2mfJGxj5CnSqQLVdHQHB4zSXkAGB6XHbBd0MOUeuvzDPfs2voVQ4IG3FR0oc\n"
"3/znuwKBgQChZGH3McQcxmLA28aUwOVbWssfXKdDCsiJO+PEXXlL0maO3SbnFn+Q\n"
"Tyf8oHI5cdP7AbwDSx9bUfRPjg9dKKmATBFr2bn216pjGxK0OjYOCntFTVr0psRB\n"
"CrKg52Qrq71/2l4V2NLQZU40Dr1bN9V+Ftd9L0pvpCAEAWpIbLXGDw==\n"
"-----END RSA PRIVATE KEY-----"
)
PUBKEY_DATA = (
"-----BEGIN PUBLIC KEY-----\n"
"MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA75GR6ZTv5JOv90Vq8tKh\n"
"KC7YQnhDIo2hM0HVziTEk5R4UQBWa0CKytFMbTONY2msEDwX9iA0x7F5Lgj0X8eD\n"
"4ZMsYqLzqjWMekLC8bjhxc+EuPo9Dygu3mJ2VgRC7XhlFpmdo5NN8J2E7B/CNB3R\n"
"4hOcMMZNZdi0xLtFoTfwU61UPfFX14mV2laqLbvDEfQLJhUTDeFFV8EN5Z4H1ttL\n"
"P3sMXJvc3EvM0JiDVj4l1TWFUHHzeFgCA1Im0lv8i7PFrgW7nyMfK9uDSsUmIp7k\n"
"6ai4tVzwkTmV5PsriP1ju88Lo3MB4/sUmDv/JmlZ9YyzTO3Po8Uz3Aeq9HJWyBWH\n"
"AQIDAQAB\n"
"-----END PUBLIC KEY-----"
)
MSG = b"It's me, Mario"
SIG = (
b"\x07\xf3\xb1\xe7\xdb\x06\xf4_\xe2\xdc\xcb!F\xfb\xbex{W\x1d\xe4E"
b"\xd3\r\xc5\x90\xca(\x05\x1d\x99\x8b\x1aug\x9f\x95>\x94\x7f\xe3+"
b"\x12\xfa\x9c\xd4\xb8\x02]\x0e\xa5\xa3LL\xc3\xa2\x8f+\x83Z\x1b\x17"
b'\xbfT\xd3\xc7\xfd\x0b\xf4\xd7J\xfe^\x86q"I\xa3x\xbc\xd3$\xe9M<\xe1'
b"\x07\xad\xf2_\x9f\xfa\xf7g(~\xd8\xf5\xe7\xda-\xa3Ko\xfc.\x99\xcf"
b"\x9b\xb9\xc1U\x97\x82'\xcb\xc6\x08\xaa\xa0\xe4\xd0\xc1+\xfc\x86"
b'\r\xe4y\xb1#\xd3\x1dS\x96D28\xc4\xd5\r\xd4\x98\x1a44"\xd7\xc2\xb4'
b"]\xa7\x0f\xa7Db\x85G\x8c\xd6\x94!\x8af1O\xf6g\xd7\x03\xfd\xb3\xbc"
b"\xce\x9f\xe7\x015\xb8\x1d]AHK\xa0\x14m\xda=O\xa7\xde\xf2\xff\x9b"
b"\x8e\x83\xc8j\x11\x1a\x98\x85\xde\xc5\x91\x07\x84!\x12^4\xcb\xa8"
b"\x98\x8a\x8a&#\xb9(#?\x80\x15\x9eW\xb5\x12\xd1\x95S\xf2<G\xeb\xf1"
b"\x14H\xb2\xc4>\xc3A\xed\x86x~\xcfU\xd5Q\xfe~\x10\xd2\x9b"
)
@pytest.mark.skipif(not HAS_PYCRYPTO_RSA, reason="pycrypto >= 2.6 is not available")
@pytest.mark.skipif(HAS_M2, reason="m2crypto is used by salt.crypt if installed")
class CryptTestCase(TestCase):
@pytest.mark.slow_test
def test_gen_keys(self):
open_priv_wb = MockCall("/keydir{}keyname.pem".format(os.sep), "wb+")
open_pub_wb = MockCall("/keydir{}keyname.pub".format(os.sep), "wb+")
with patch.multiple(
os,
umask=MagicMock(),
chmod=MagicMock(),
access=MagicMock(return_value=True),
):
with patch("salt.utils.files.fopen", mock_open()) as m_open, patch(
"os.path.isfile", return_value=True
):
result = crypt.gen_keys("/keydir", "keyname", 2048)
assert result == "/keydir{}keyname.pem".format(os.sep), result
assert open_priv_wb not in m_open.calls
assert open_pub_wb not in m_open.calls
with patch("salt.utils.files.fopen", mock_open()) as m_open, patch(
"os.path.isfile", return_value=False
):
crypt.gen_keys("/keydir", "keyname", 2048)
assert open_priv_wb in m_open.calls
assert open_pub_wb in m_open.calls
@patch("os.umask", MagicMock())
@patch("os.chmod", MagicMock())
@patch("os.chown", MagicMock(), create=True)
@patch("os.access", MagicMock(return_value=True))
@pytest.mark.slow_test
def test_gen_keys_with_passphrase(self):
key_path = os.path.join(os.sep, "keydir")
open_priv_wb = MockCall(os.path.join(key_path, "keyname.pem"), "wb+")
open_pub_wb = MockCall(os.path.join(key_path, "keyname.pub"), "wb+")
with patch("salt.utils.files.fopen", mock_open()) as m_open, patch(
"os.path.isfile", return_value=True
):
self.assertEqual(
crypt.gen_keys(key_path, "keyname", 2048, passphrase="password"),
os.path.join(key_path, "keyname.pem"),
)
result = crypt.gen_keys(key_path, "keyname", 2048, passphrase="password")
assert result == os.path.join(key_path, "keyname.pem"), result
assert open_priv_wb not in m_open.calls
assert open_pub_wb not in m_open.calls
with patch("salt.utils.files.fopen", mock_open()) as m_open, patch(
"os.path.isfile", return_value=False
):
crypt.gen_keys(key_path, "keyname", 2048)
assert open_priv_wb in m_open.calls
assert open_pub_wb in m_open.calls
def test_sign_message(self):
key = RSA.importKey(PRIVKEY_DATA)
with patch("salt.crypt.get_rsa_key", return_value=key):
self.assertEqual(SIG, salt.crypt.sign_message("/keydir/keyname.pem", MSG))
def test_sign_message_with_passphrase(self):
key = RSA.importKey(PRIVKEY_DATA)
with patch("salt.crypt.get_rsa_key", return_value=key):
self.assertEqual(
SIG,
crypt.sign_message("/keydir/keyname.pem", MSG, passphrase="password"),
)
def test_verify_signature(self):
with patch("salt.utils.files.fopen", mock_open(read_data=PUBKEY_DATA)):
self.assertTrue(crypt.verify_signature("/keydir/keyname.pub", MSG, SIG))
@pytest.mark.skipif(not HAS_M2, reason="m2crypto is not available")
class M2CryptTestCase(TestCase):
@patch("os.umask", MagicMock())
@patch("os.chmod", MagicMock())
@patch("os.access", MagicMock(return_value=True))
@pytest.mark.slow_test
def test_gen_keys(self):
with patch("M2Crypto.RSA.RSA.save_pem", MagicMock()) as save_pem:
with patch("M2Crypto.RSA.RSA.save_pub_key", MagicMock()) as save_pub:
with patch("os.path.isfile", return_value=True):
self.assertEqual(
crypt.gen_keys("/keydir", "keyname", 2048),
"/keydir{}keyname.pem".format(os.sep),
)
save_pem.assert_not_called()
save_pub.assert_not_called()
with patch("os.path.isfile", return_value=False):
self.assertEqual(
crypt.gen_keys("/keydir", "keyname", 2048),
"/keydir{}keyname.pem".format(os.sep),
)
save_pem.assert_called_once_with(
"/keydir{}keyname.pem".format(os.sep), cipher=None
)
save_pub.assert_called_once_with(
"/keydir{}keyname.pub".format(os.sep)
)
@patch("os.umask", MagicMock())
@patch("os.chmod", MagicMock())
@patch("os.chown", MagicMock())
@patch("os.access", MagicMock(return_value=True))
@pytest.mark.slow_test
def test_gen_keys_with_passphrase(self):
with patch("M2Crypto.RSA.RSA.save_pem", MagicMock()) as save_pem:
with patch("M2Crypto.RSA.RSA.save_pub_key", MagicMock()) as save_pub:
with patch("os.path.isfile", return_value=True):
self.assertEqual(
crypt.gen_keys(
"/keydir", "keyname", 2048, passphrase="password"
),
"/keydir{}keyname.pem".format(os.sep),
)
save_pem.assert_not_called()
save_pub.assert_not_called()
with patch("os.path.isfile", return_value=False):
self.assertEqual(
crypt.gen_keys(
"/keydir", "keyname", 2048, passphrase="password"
),
"/keydir{}keyname.pem".format(os.sep),
)
callback = save_pem.call_args[1]["callback"]
save_pem.assert_called_once_with(
"/keydir{}keyname.pem".format(os.sep),
cipher="des_ede3_cbc",
callback=callback,
)
self.assertEqual(callback(None), b"password")
save_pub.assert_called_once_with(
"/keydir{}keyname.pub".format(os.sep)
)
def test_sign_message(self):
key = M2Crypto.RSA.load_key_string(
salt.utils.stringutils.to_bytes(PRIVKEY_DATA)
)
with patch("salt.crypt.get_rsa_key", return_value=key):
self.assertEqual(SIG, salt.crypt.sign_message("/keydir/keyname.pem", MSG))
def test_sign_message_with_passphrase(self):
key = M2Crypto.RSA.load_key_string(
salt.utils.stringutils.to_bytes(PRIVKEY_DATA)
)
with patch("salt.crypt.get_rsa_key", return_value=key):
self.assertEqual(
SIG,
crypt.sign_message("/keydir/keyname.pem", MSG, passphrase="password"),
)
def test_verify_signature(self):
with patch(
"salt.utils.files.fopen",
mock_open(read_data=salt.utils.stringutils.to_bytes(PUBKEY_DATA)),
):
self.assertTrue(crypt.verify_signature("/keydir/keyname.pub", MSG, SIG))
def test_encrypt_decrypt_bin(self):
priv_key = M2Crypto.RSA.load_key_string(
salt.utils.stringutils.to_bytes(PRIVKEY_DATA)
)
pub_key = M2Crypto.RSA.load_pub_key_bio(
M2Crypto.BIO.MemoryBuffer(salt.utils.stringutils.to_bytes(PUBKEY_DATA))
)
encrypted = salt.crypt.private_encrypt(priv_key, b"salt")
decrypted = salt.crypt.public_decrypt(pub_key, encrypted)
self.assertEqual(b"salt", decrypted)
class TestBadCryptodomePubKey(TestCase):
"""
Test that we can load public keys exported by pycrpytodome<=3.4.6
"""
TEST_KEY = (
"-----BEGIN RSA PUBLIC KEY-----\n"
"MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzLtFhsvfbFDFaUgulSEX\n"
"Gl12XriL1DT78Ef2/u8HHaSMmPie37BLWas/zaHwI6066bIyYQJ/nUCahTaoHM7L\n"
"GlWc0wOU6zyfpihCRQHil05Y6F+olFBoZuYbFPvtp7/hJx/D7I/0n2o/c7M5i3Y2\n"
"3sBxAYNooIQHXHUmPQW6C9iu95ylZDW8JQzYy/EI4vCC8yQMdTK8jK1FQV0Sbwny\n"
"qcMxSyAWDoFbnhh2P2TnO8HOWuUOaXR8ZHOJzVcDl+a6ew+medW090x3K5O1f80D\n"
"+WjgnG6b2HG7VQpOCfM2GALD/FrxicPilvZ38X1aLhJuwjmVE4LAAv8DVNJXohaO\n"
"WQIDAQAB\n"
"-----END RSA PUBLIC KEY-----\n"
)
def setUp(self):
self.test_dir = tempfile.mkdtemp()
self.key_path = os.path.join(self.test_dir, "cryptodom-3.4.6.pub")
with salt.utils.files.fopen(self.key_path, "wb") as fd:
fd.write(self.TEST_KEY.encode())
def tearDown(self):
shutil.rmtree(self.test_dir)
@pytest.mark.skipif(not HAS_M2, reason="Skip when m2crypto is not installed")
def test_m2_bad_key(self):
"""
Load public key with an invalid header using m2crypto and validate it
"""
key = salt.crypt.get_rsa_pub_key(self.key_path)
assert key.check_key() == 1
@pytest.mark.skipif(HAS_M2, reason="Skip when m2crypto is installed")
def test_crypto_bad_key(self):
"""
Load public key with an invalid header and validate it without m2crypto
"""
key = salt.crypt.get_rsa_pub_key(self.key_path)
assert key.can_encrypt()
class TestM2CryptoRegression47124(TestCase):
SIGNATURE = (
b"w\xac\xfe18o\xeb\xfb\x14+\x9e\xd1\xb7\x7fe}\xec\xd6\xe1P\x9e\xab"
b"\xb5\x07\xe0\xc1\xfd\xda#\x04Z\x8d\x7f\x0b\x1f}:~\xb2s\x860u\x02N"
b'\xd4q"\xb7\x86*\x8f\x1f\xd0\x9d\x11\x92\xc5~\xa68\xac>\x12H\xc2%y,'
b"\xe6\xceU\x1e\xa3?\x0c,\xf0u\xbb\xd0[g_\xdd\x8b\xb0\x95:Y\x18\xa5*"
b"\x99\xfd\xf3K\x92\x92 ({\xd1\xff\xd9F\xc8\xd6K\x86e\xf9\xa8\xad\xb0z"
b"\xe3\x9dD\xf5k\x8b_<\xe7\xe7\xec\xf3\"'\xd5\xd2M\xb4\xce\x1a\xe3$"
b"\x9c\x81\xad\xf9\x11\xf6\xf5>)\xc7\xdd\x03&\xf7\x86@ks\xa6\x05\xc2"
b"\xd0\xbd\x1a7\xfc\xde\xe6\xb0\xad!\x12#\xc86Y\xea\xc5\xe3\xe2\xb3"
b"\xc9\xaf\xfa\x0c\xf2?\xbf\x93w\x18\x9e\x0b\xa2a\x10:M\x05\x89\xe2W.Q"
b"\xe8;yGT\xb1\xf2\xc6A\xd2\xc4\xbeN\xb3\xcfS\xaf\x03f\xe2\xb4)\xe7\xf6"
b'\xdbs\xd0Z}8\xa4\xd2\x1fW*\xe6\x1c"\x8b\xd0\x18w\xb9\x7f\x9e\x96\xa3'
b"\xd9v\xf7\x833\x8e\x01"
)
@pytest.mark.skipif(not HAS_M2, reason="Skip when m2crypto is not installed")
def test_m2crypto_verify_bytes(self):
message = salt.utils.stringutils.to_unicode("meh")
with patch(
"salt.utils.files.fopen",
mock_open(read_data=salt.utils.stringutils.to_bytes(PUBKEY_DATA)),
):
salt.crypt.verify_signature("/keydir/keyname.pub", message, self.SIGNATURE)
@pytest.mark.skipif(not HAS_M2, reason="Skip when m2crypto is not installed")
def test_m2crypto_verify_unicode(self):
message = salt.utils.stringutils.to_bytes("meh")
with patch(
"salt.utils.files.fopen",
mock_open(read_data=salt.utils.stringutils.to_bytes(PUBKEY_DATA)),
):
salt.crypt.verify_signature("/keydir/keyname.pub", message, self.SIGNATURE)
@pytest.mark.skipif(not HAS_M2, reason="Skip when m2crypto is not installed")
def test_m2crypto_sign_bytes(self):
message = salt.utils.stringutils.to_unicode("meh")
key = M2Crypto.RSA.load_key_string(
salt.utils.stringutils.to_bytes(PRIVKEY_DATA)
)
with patch("salt.crypt.get_rsa_key", return_value=key):
signature = salt.crypt.sign_message(
"/keydir/keyname.pem", message, passphrase="password"
)
self.assertEqual(signature, self.SIGNATURE)
@pytest.mark.skipif(not HAS_M2, reason="Skip when m2crypto is not installed")
def test_m2crypto_sign_unicode(self):
message = salt.utils.stringutils.to_bytes("meh")
key = M2Crypto.RSA.load_key_string(
salt.utils.stringutils.to_bytes(PRIVKEY_DATA)
)
with patch("salt.crypt.get_rsa_key", return_value=key):
signature = salt.crypt.sign_message(
"/keydir/keyname.pem", message, passphrase="password"
)
self.assertEqual(signature, self.SIGNATURE)
@pytest.mark.skipif(
not HAS_M2 and not HAS_PYCRYPTO_RSA,
reason="No crypto library found. Install either M2Crypto or Cryptodome to run this test",
)
class TestCrypt(TestCase):
def test_pwdata_decrypt(self):
key_string = """-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEAzhBRyyHa7b63RLE71uKMKgrpulcAJjaIaN68ltXcCvy4w9pi
Kj+4I3Qp6RvUaHOEmymqyjOMjQc6iwpe0scCFqh3nUk5YYaLZ3WAW0htQVlnesgB
ZiBg9PBeTQY/LzqtudL6RCng/AX+fbnCsddlIysRxnUoNVMvz0gAmCY2mnTDjcTt
pyxuk2T0AHSHNCKCalm75L1bWDFF+UzFemf536tBfBUGRWR6jWTij85vvCntxHS/
HdknaTJ50E7XGVzwBJpCyV4Y2VXuW/3KrCNTqXw+jTmEw0vlcshfDg/vb3IxsUSK
5KuHalKq/nUIc+F4QCJOl+A10goGdIfYC1/67QIDAQABAoIBAAOP+qoFWtCTZH22
hq9PWVb8u0+yY1lFxhPyDdaZueUiu1r/coUCdv996Z+TEJgBr0AzdzVpsLtbbaKr
ujnwoNOdc/vvISPTfKN8P4zUcrcXgZd4z7VhR+vUH/0652q8m/ZDdHorMy2IOP8Z
cAk9DQ2PmA4TRm+tkX0G5KO8vWLsK921aRMWdsKJyQ0lYxl7M8JWupFsCJFr/U+8
dAVtwnUiS7RnhBABZ1cfNTHYhXVAh4d+a9y/gZ00a66OGqPxiXfhjjDUZ6fGvWKN
FlhKWEg6YqIx/H4aNXkLI5Rzzhdx/c2ukNm7+X2veRcAW7bcTwk8wxJxciEP5pBi
1el9VE0CgYEA/lbzdE2M4yRBvTfYYC6BqZcn+BqtrAUc2h3fEy+p7lwlet0af1id
gWpYpOJyLc0AUfR616/m2y3PwEH/nMKDSTuU7o/qKNtlHW0nQcnhDCjTUydS3+J/
JM3dhfgVqi03rjqNcgHA2eOEwcu/OBZtiaC0wqKbuRZRtfGffyoO3ssCgYEAz2iw
wqu/NkA+MdQIxz/a3Is7gGwoFu6h7O+XU2uN8Y2++jSBw9AzzWj31YCvyjuJPAE+
gxHm6yOnNoLVn423NtibHejhabzHNIK6UImH99bSTKabsxfF2BX6v982BimU1jwc
bYykzws37oN/poPb5FTpEiAUrsd2bAMn/1S43icCgYEAulHkY0z0aumCpyUkA8HO
BvjOtPiGRcAxFLBRXPLL3+vtIQachLHcIJRRf+jLkDXfiCo7W4pm6iWzTbqLkMEG
AD3/qowPFAM1Hct6uL01efzmYsIp+g0o60NMhvnolRQu+Bm4yM30AyqjdHzYBjSX
5fyuru8EeSCal1j8aOHcpuUCgYEAhGhDH6Pg59NPYSQJjpm3MMA59hwV473n5Yh2
xKyO6zwgRT6r8MPDrkhqnwQONT6Yt5PbwnT1Q/t4zhXsJnWkFwFk1U1MSeJYEa+7
HZsPECs2CfT6xPRSO0ac00y+AmUdPT8WruDwfbSdukh8f2MCR9vlBsswKPvxH7dM
G3aMplUCgYEAmMFgB/6Ox4OsQPPC6g4G+Ezytkc4iVkMEcjiVWzEsYATITjq3weO
/XDGBYJoBhYwWPi9oBufFc/2pNtWy1FKKXPuVyXQATdA0mfEPbtsHjMFQNZbeKnm
0na/SysSDCK3P+9ijlbjqLjMmPEmhJxGWTJ7khnTTkfre7/w9ZxJxi8=
-----END RSA PRIVATE KEY-----"""
pwdata = b"""\
V\x80+b\xca\x06M\xb6\x12\xc6\xe8\xf2\xb5\xbb\xd8m\xc0\x97\x9a\xeb\xb9q\x19\xc3\
\xcdi\xb84\x90\xaf\x12kT\xe2@u\xd6\xe8T\x89\xa3\xc7\xb2Y\xd1N\x00\xa9\xc0"\xbe\
\xed\xb1\xc3\xb7^\xbf\xbd\x8b\x13\xd3/L\x1b\xa1`\xe2\xea\x03\x98\x82\xf3uS&|\
\xe5\xd8J\xce\xfc\x97\x8d\x0b\x949\xc0\xbd^\xef\xc6\xfd\xce\xbb\x1e\xd0"(m\xe1\
\x95\xfb\xc8/\x07\x93\xb8\xda\x8f\x99\xfe\xdc\xd5\xcb\xdb\xb2\xf11M\xdbD\xcf\
\x95\x13p\r\xa4\x1c{\xd5\xdb\xc7\xe5\xaf\x95F\x97\xa9\x00p~\xb5\xec\xa4.\xd0\
\xa4\xb4\xf4f\xcds,Y/\xa1:WF\xb8\xc7\x07\xaa\x0b<\'~\x1b$D9\xd4\x8d\xf0x\xc5\
\xee\xa8:\xe6\x00\x10\xc5i\x11\xc7]C8\x05l\x8b\x9b\xc3\x83e\xf7y\xadi:0\xb4R\
\x1a(\x04&yL8\x19s\n\x11\x81\xfd?\xfb2\x80Ll\xa1\xdc\xc9\xb6P\xca\x8d\'\x11\xc1\
\x07\xa5\xa1\x058\xc7\xce\xbeb\x92\xbf\x0bL\xec\xdf\xc3M\x83\xfb$\xec\xd5\xf9\
"""
self.assertEqual("1234", salt.crypt.pwdata_decrypt(key_string, pwdata))