Fix merge forward duplicate tests

This commit is contained in:
Daniel A. Wozniak 2024-06-22 11:42:42 -07:00
parent 5f1b51901c
commit 84a9175e2c
3 changed files with 355 additions and 1507 deletions

View file

@ -55,42 +55,3 @@ def test_gen_keys(tmp_path):
salt.crypt.gen_keys(key_path, "keyname", 2048)
assert open_priv_wb in m_open.calls
assert open_pub_wb in m_open.calls
@pytest.mark.slow_test
def test_gen_keys_with_passphrase(tmp_path):
key_path = str(tmp_path / "keydir")
open_priv_wb = MockCall(os.path.join(key_path, "keyname.pem"), "wb+")
open_pub_wb = MockCall(os.path.join(key_path, "keyname.pub"), "wb+")
real_is_file = os.path.isfile
def is_file(path):
if path.startswith(str(tmp_path)):
return False
return real_is_file(path)
with patch.multiple(
os,
umask=MagicMock(),
chmod=MagicMock(),
access=MagicMock(return_value=True),
):
salt.crypt.gen_keys(key_path, "keyname", 2048)
with patch("salt.utils.files.fopen", mock_open()) as m_open, patch(
"os.path.isfile", return_value=True
):
result = salt.crypt.gen_keys(
key_path, "keyname", 2048, passphrase="password"
)
assert result == os.path.join(key_path, "keyname.pem")
assert open_priv_wb not in m_open.calls
assert open_pub_wb not in m_open.calls
with patch("salt.utils.files.fopen", mock_open()) as m_open, patch(
"salt.crypt.os.path.isfile", is_file
):
salt.crypt.gen_keys(key_path, "keyname", 2048)
assert open_priv_wb in m_open.calls
assert open_pub_wb in m_open.calls

View file

@ -22,19 +22,9 @@ import salt.transport.zeromq
import salt.utils.process
import salt.utils.stringutils
from salt.master import SMaster
from tests.conftest import FIPS_TESTRUN
from tests.support.mock import MagicMock, patch
try:
from M2Crypto import RSA
HAS_M2 = True
except ImportError:
HAS_M2 = False
try:
from Cryptodome.Cipher import PKCS1_OAEP
except ImportError:
from Crypto.Cipher import PKCS1_OAEP # nosec
log = logging.getLogger(__name__)
@ -218,6 +208,20 @@ oQIDAQAB
AES_KEY = "8wxWlOaMMQ4d3yT74LL4+hGrGTf65w8VgrcNjLJeLRQ2Q6zMa8ItY2EQUgMKKDb7JY+RnPUxbB0="
@pytest.fixture
def signing_algorithm():
if FIPS_TESTRUN:
return salt.crypt.PKCS1v15_SHA224
return salt.crypt.PKCS1v15_SHA1
@pytest.fixture
def encryption_algorithm():
if FIPS_TESTRUN:
return salt.crypt.OAEP_SHA224
return salt.crypt.OAEP_SHA1
@pytest.fixture
def pki_dir(tmp_path):
_pki_dir = tmp_path / "pki"
@ -478,58 +482,9 @@ def test_serverside_exception(temp_salt_minion, temp_salt_master):
assert ret == "Server-side exception handling payload"
def test_req_server_chan_encrypt_v2(master_opts, pki_dir):
loop = tornado.ioloop.IOLoop.current()
master_opts.update(
{
"worker_threads": 1,
"master_uri": "tcp://127.0.0.1:4506",
"interface": "127.0.0.1",
"ret_port": 4506,
"ipv6": False,
"zmq_monitor": False,
"mworker_queue_niceness": False,
"sock_dir": ".",
"pki_dir": str(pki_dir.joinpath("master")),
"id": "minion",
"__role": "minion",
"keysize": 4096,
}
)
server = salt.channel.server.ReqServerChannel.factory(master_opts)
dictkey = "pillar"
nonce = "abcdefg"
pillar_data = {"pillar1": "meh"}
try:
ret = server._encrypt_private(pillar_data, dictkey, "minion", nonce)
assert "key" in ret
assert dictkey in ret
key = salt.crypt.get_rsa_key(
str(pki_dir.joinpath("minion", "minion.pem")), None
)
if HAS_M2:
aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding)
else:
cipher = PKCS1_OAEP.new(key) # pylint: disable=used-before-assignment
aes = cipher.decrypt(ret["key"])
pcrypt = salt.crypt.Crypticle(master_opts, aes)
signed_msg = pcrypt.loads(ret[dictkey])
assert "sig" in signed_msg
assert "data" in signed_msg
data = salt.payload.loads(signed_msg["data"])
assert "key" in data
assert data["key"] == ret["key"]
assert "key" in data
assert data["nonce"] == nonce
assert "pillar" in data
assert data["pillar"] == pillar_data
finally:
server.close()
def test_req_server_chan_encrypt_v1(master_opts, pki_dir):
def test_req_server_chan_encrypt_v2(
pki_dir, encryption_algorithm, signing_algorithm, master_opts
):
loop = tornado.ioloop.IOLoop.current()
master_opts.update(
{
@ -553,20 +508,70 @@ def test_req_server_chan_encrypt_v1(master_opts, pki_dir):
pillar_data = {"pillar1": "meh"}
try:
ret = server._encrypt_private(
pillar_data, dictkey, "minion", sign_messages=False
pillar_data,
dictkey,
"minion",
nonce,
encryption_algorithm=encryption_algorithm,
signing_algorithm=signing_algorithm,
)
assert "key" in ret
assert dictkey in ret
key = salt.crypt.PrivateKey(str(pki_dir.joinpath("minion", "minion.pem")))
aes = key.decrypt(ret["key"], encryption_algorithm)
pcrypt = salt.crypt.Crypticle(master_opts, aes)
signed_msg = pcrypt.loads(ret[dictkey])
assert "sig" in signed_msg
assert "data" in signed_msg
data = salt.payload.loads(signed_msg["data"])
assert "key" in data
assert data["key"] == ret["key"]
assert "key" in data
assert data["nonce"] == nonce
assert "pillar" in data
assert data["pillar"] == pillar_data
finally:
server.close()
def test_req_server_chan_encrypt_v1(pki_dir, encryption_algorithm, master_opts):
loop = tornado.ioloop.IOLoop.current()
master_opts.update(
{
"worker_threads": 1,
"master_uri": "tcp://127.0.0.1:4506",
"interface": "127.0.0.1",
"ret_port": 4506,
"ipv6": False,
"zmq_monitor": False,
"mworker_queue_niceness": False,
"sock_dir": ".",
"pki_dir": str(pki_dir.joinpath("master")),
"id": "minion",
"__role": "minion",
"keysize": 4096,
}
)
server = salt.channel.server.ReqServerChannel.factory(master_opts)
dictkey = "pillar"
nonce = "abcdefg"
pillar_data = {"pillar1": "meh"}
try:
ret = server._encrypt_private(
pillar_data,
dictkey,
"minion",
sign_messages=False,
encryption_algorithm=encryption_algorithm,
)
assert "key" in ret
assert dictkey in ret
key = salt.crypt.get_rsa_key(
str(pki_dir.joinpath("minion", "minion.pem")), None
)
if HAS_M2:
aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding)
else:
cipher = PKCS1_OAEP.new(key)
aes = cipher.decrypt(ret["key"])
key = salt.crypt.PrivateKey(str(pki_dir.joinpath("minion", "minion.pem")))
aes = key.decrypt(ret["key"], encryption_algorithm)
pcrypt = salt.crypt.Crypticle(master_opts, aes)
data = pcrypt.loads(ret[dictkey])
assert data == pillar_data
@ -574,7 +579,9 @@ def test_req_server_chan_encrypt_v1(master_opts, pki_dir):
server.close()
def test_req_chan_decode_data_dict_entry_v1(minion_opts, master_opts, pki_dir):
def test_req_chan_decode_data_dict_entry_v1(
pki_dir, encryption_algorithm, minion_opts, master_opts
):
mockloop = MagicMock()
minion_opts.update(
{
@ -591,20 +598,22 @@ def test_req_chan_decode_data_dict_entry_v1(minion_opts, master_opts, pki_dir):
"acceptance_wait_time_max": 3,
}
)
master_opts.update(pki_dir=str(pki_dir.joinpath("master")))
master_opts = dict(master_opts, pki_dir=str(pki_dir.joinpath("master")))
server = salt.channel.server.ReqServerChannel.factory(master_opts)
client = salt.channel.client.ReqChannel.factory(minion_opts, io_loop=mockloop)
try:
dictkey = "pillar"
target = "minion"
pillar_data = {"pillar1": "meh"}
ret = server._encrypt_private(pillar_data, dictkey, target, sign_messages=False)
ret = server._encrypt_private(
pillar_data,
dictkey,
target,
sign_messages=False,
encryption_algorithm=encryption_algorithm,
)
key = client.auth.get_keys()
if HAS_M2:
aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding)
else:
cipher = PKCS1_OAEP.new(key)
aes = cipher.decrypt(ret["key"])
aes = key.decrypt(ret["key"], encryption_algorithm)
pcrypt = salt.crypt.Crypticle(client.opts, aes)
ret_pillar_data = pcrypt.loads(ret[dictkey])
assert ret_pillar_data == pillar_data
@ -647,16 +656,22 @@ async def test_req_chan_decode_data_dict_entry_v2(minion_opts, master_opts, pki_
client.auth.get_keys = auth.get_keys
client.auth.crypticle.dumps = auth.crypticle.dumps
client.auth.crypticle.loads = auth.crypticle.loads
real_transport = client.transport
client.transport = MagicMock()
real_transport.close()
print(minion_opts["encryption_algorithm"])
@tornado.gen.coroutine
def mocksend(msg, timeout=60, tries=3):
client.transport.msg = msg
load = client.auth.crypticle.loads(msg["load"])
ret = server._encrypt_private(
pillar_data, dictkey, target, nonce=load["nonce"], sign_messages=True
pillar_data,
dictkey,
target,
nonce=load["nonce"],
sign_messages=True,
encryption_algorithm=minion_opts["encryption_algorithm"],
signing_algorithm=minion_opts["signing_algorithm"],
)
raise tornado.gen.Return(ret)
@ -764,7 +779,7 @@ async def test_req_chan_decode_data_dict_entry_v2_bad_nonce(
async def test_req_chan_decode_data_dict_entry_v2_bad_signature(
minion_opts, master_opts, pki_dir
pki_dir, minion_opts, master_opts
):
mockloop = MagicMock()
minion_opts.update(
@ -800,24 +815,24 @@ async def test_req_chan_decode_data_dict_entry_v2_bad_signature(
client.auth.get_keys = auth.get_keys
client.auth.crypticle.dumps = auth.crypticle.dumps
client.auth.crypticle.loads = auth.crypticle.loads
real_transport = client.transport
client.transport = MagicMock()
real_transport.close()
@tornado.gen.coroutine
def mocksend(msg, timeout=60, tries=3):
client.transport.msg = msg
load = client.auth.crypticle.loads(msg["load"])
ret = server._encrypt_private(
pillar_data, dictkey, target, nonce=load["nonce"], sign_messages=True
pillar_data,
dictkey,
target,
nonce=load["nonce"],
sign_messages=True,
encryption_algorithm=minion_opts["encryption_algorithm"],
signing_algorithm=minion_opts["signing_algorithm"],
)
key = client.auth.get_keys()
if HAS_M2:
aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding)
else:
cipher = PKCS1_OAEP.new(key)
aes = cipher.decrypt(ret["key"])
aes = key.decrypt(ret["key"], minion_opts["encryption_algorithm"])
pcrypt = salt.crypt.Crypticle(client.opts, aes)
signed_msg = pcrypt.loads(ret[dictkey])
# Changing the pillar data will cause the signature verification to
@ -856,7 +871,7 @@ async def test_req_chan_decode_data_dict_entry_v2_bad_signature(
async def test_req_chan_decode_data_dict_entry_v2_bad_key(
minion_opts, master_opts, pki_dir
pki_dir, minion_opts, master_opts
):
mockloop = MagicMock()
minion_opts.update(
@ -885,46 +900,42 @@ async def test_req_chan_decode_data_dict_entry_v2_bad_key(
# Mock auth and message client.
auth = client.auth
auth._crypticle = salt.crypt.Crypticle(minion_opts, AES_KEY)
auth._crypticle = salt.crypt.Crypticle(master_opts, AES_KEY)
client.auth = MagicMock()
client.auth.mpub = auth.mpub
client.auth.authenticated = True
client.auth.get_keys = auth.get_keys
client.auth.crypticle.dumps = auth.crypticle.dumps
client.auth.crypticle.loads = auth.crypticle.loads
real_transport = client.transport
client.transport = MagicMock()
real_transport.close()
@tornado.gen.coroutine
def mocksend(msg, timeout=60, tries=3):
client.transport.msg = msg
load = client.auth.crypticle.loads(msg["load"])
ret = server._encrypt_private(
pillar_data, dictkey, target, nonce=load["nonce"], sign_messages=True
pillar_data,
dictkey,
target,
nonce=load["nonce"],
sign_messages=True,
encryption_algorithm=minion_opts["encryption_algorithm"],
signing_algorithm=minion_opts["signing_algorithm"],
)
key = client.auth.get_keys()
if HAS_M2:
aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding)
else:
cipher = PKCS1_OAEP.new(key)
aes = cipher.decrypt(ret["key"])
mkey = client.auth.get_keys()
aes = mkey.decrypt(ret["key"], minion_opts["encryption_algorithm"])
pcrypt = salt.crypt.Crypticle(client.opts, aes)
signed_msg = pcrypt.loads(ret[dictkey])
# Now encrypt with a different key
key = salt.crypt.Crypticle.generate_key_string()
pcrypt = salt.crypt.Crypticle(minion_opts, key)
pcrypt = salt.crypt.Crypticle(master_opts, key)
pubfn = os.path.join(master_opts["pki_dir"], "minions", "minion")
pub = salt.crypt.get_rsa_pub_key(pubfn)
pub = salt.crypt.PublicKey(pubfn)
ret[dictkey] = pcrypt.dumps(signed_msg)
key = salt.utils.stringutils.to_bytes(key)
if HAS_M2:
ret["key"] = pub.public_encrypt(key, RSA.pkcs1_oaep_padding)
else:
cipher = PKCS1_OAEP.new(pub)
ret["key"] = cipher.encrypt(key)
ret["key"] = pub.encrypt(key, minion_opts["encryption_algorithm"])
raise tornado.gen.Return(ret)
client.transport.send = mocksend
@ -941,7 +952,6 @@ async def test_req_chan_decode_data_dict_entry_v2_bad_key(
"ver": "2",
"cmd": "_pillar",
}
try:
with pytest.raises(salt.crypt.AuthenticationError) as excinfo:
await client.crypted_transfer_decode_dictentry(
@ -1128,7 +1138,7 @@ async def test_req_chan_auth_v2(minion_opts, master_opts, pki_dir, io_loop):
async def test_req_chan_auth_v2_with_master_signing(
minion_opts, master_opts, pki_dir, io_loop
pki_dir, io_loop, minion_opts, master_opts
):
minion_opts.update(
{
@ -1158,14 +1168,17 @@ async def test_req_chan_auth_v2_with_master_signing(
),
"reload": salt.crypt.Crypticle.generate_key_string,
}
master_opts.update(pki_dir=str(pki_dir.joinpath("master")))
master_opts = dict(master_opts, pki_dir=str(pki_dir.joinpath("master")))
master_opts["master_sign_pubkey"] = True
master_opts["master_use_pubkey_signature"] = False
master_opts["signing_key_pass"] = True
master_opts["signing_key_pass"] = ""
master_opts["master_sign_key_name"] = "master_sign"
server = salt.channel.server.ReqServerChannel.factory(master_opts)
server.auto_key = salt.daemons.masterapi.AutoKey(server.opts)
server.cache_cli = False
server.event = salt.utils.event.get_master_event(
master_opts, master_opts["sock_dir"], listen=False
)
server.master_key = salt.crypt.MasterKeys(server.opts)
minion_opts["verify_master_pubkey_sign"] = True
minion_opts["always_verify_signature"] = True
@ -1205,6 +1218,9 @@ async def test_req_chan_auth_v2_with_master_signing(
server = salt.channel.server.ReqServerChannel.factory(master_opts)
server.auto_key = salt.daemons.masterapi.AutoKey(server.opts)
server.cache_cli = False
server.event = salt.utils.event.get_master_event(
master_opts, master_opts["sock_dir"], listen=False
)
server.master_key = salt.crypt.MasterKeys(server.opts)
signin_payload = client.auth.minion_sign_in_payload()
@ -1448,3 +1464,228 @@ async def test_req_chan_bad_payload_to_decode(
server._decode_payload({})
with pytest.raises(salt.exceptions.SaltDeserializationError):
server._decode_payload(12345)
def test_req_server_auth_garbage_sig_algo(pki_dir, minion_opts, master_opts, caplog):
minion_opts.update(
{
"master_uri": "tcp://127.0.0.1:4506",
"interface": "127.0.0.1",
"ret_port": 4506,
"ipv6": False,
"sock_dir": ".",
"pki_dir": str(pki_dir.joinpath("minion")),
"id": "minion",
"__role": "minion",
"keysize": 4096,
"max_minions": 0,
"auto_accept": False,
"open_mode": False,
"key_pass": None,
"master_sign_pubkey": False,
"publish_port": 4505,
"auth_mode": 1,
}
)
SMaster.secrets["aes"] = {
"secret": multiprocessing.Array(
ctypes.c_char,
salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()),
),
"reload": salt.crypt.Crypticle.generate_key_string,
}
master_opts.update(pki_dir=str(pki_dir.joinpath("master")))
server = salt.channel.server.ReqServerChannel.factory(master_opts)
server.auto_key = salt.daemons.masterapi.AutoKey(server.opts)
server.cache_cli = False
server.event = salt.utils.event.get_master_event(
master_opts, master_opts["sock_dir"], listen=False
)
server.master_key = salt.crypt.MasterKeys(server.opts)
pub = salt.crypt.PublicKey(str(pki_dir.joinpath("master", "master.pub")))
token = pub.encrypt(
salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()),
algorithm=minion_opts["encryption_algorithm"],
)
nonce = uuid.uuid4().hex
# We need to read the public key with fopen otherwise the newlines might
# not match on windows.
with salt.utils.files.fopen(
str(pki_dir.joinpath("minion", "minion.pub")), "r"
) as fp:
pub_key = salt.crypt.clean_key(fp.read())
load = {
"version": 2,
"cmd": "_auth",
"id": "minion",
"token": token,
"pub": pub_key,
"nonce": "asdfse",
"enc_algo": minion_opts["encryption_algorithm"],
"sig_algo": "IAMNOTANALGO",
}
with caplog.at_level(logging.INFO):
ret = server._auth(load, sign_messages=True)
assert (
"Minion tried to authenticate with unsupported signing algorithm: IAMNOTANALGO"
in caplog.text
)
assert "load" in ret
assert "ret" in ret["load"]
assert ret["load"]["ret"] == "bad sig algo"
@pytest.mark.skipif(not FIPS_TESTRUN, reason="Only run on fips enabled platforms")
def test_req_server_auth_unsupported_enc_algo(
pki_dir, minion_opts, master_opts, caplog
):
minion_opts.update(
{
"master_uri": "tcp://127.0.0.1:4506",
"interface": "127.0.0.1",
"ret_port": 4506,
"ipv6": False,
"sock_dir": ".",
"pki_dir": str(pki_dir.joinpath("minion")),
"id": "minion",
"__role": "minion",
"keysize": 4096,
"max_minions": 0,
"auto_accept": False,
"open_mode": False,
"key_pass": None,
"master_sign_pubkey": False,
"publish_port": 4505,
"auth_mode": 1,
}
)
SMaster.secrets["aes"] = {
"secret": multiprocessing.Array(
ctypes.c_char,
salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()),
),
"reload": salt.crypt.Crypticle.generate_key_string,
}
master_opts.update(pki_dir=str(pki_dir.joinpath("master")))
server = salt.channel.server.ReqServerChannel.factory(master_opts)
server.auto_key = salt.daemons.masterapi.AutoKey(server.opts)
server.cache_cli = False
server.event = salt.utils.event.get_master_event(
master_opts, master_opts["sock_dir"], listen=False
)
server.master_key = salt.crypt.MasterKeys(server.opts)
import tests.pytests.unit.crypt
pub = tests.pytests.unit.crypt.LegacyPublicKey(
str(pki_dir.joinpath("master", "master.pub"))
)
token = pub.encrypt(
salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()),
)
nonce = uuid.uuid4().hex
# We need to read the public key with fopen otherwise the newlines might
# not match on windows.
with salt.utils.files.fopen(
str(pki_dir.joinpath("minion", "minion.pub")), "r"
) as fp:
pub_key = salt.crypt.clean_key(fp.read())
load = {
"version": 2,
"cmd": "_auth",
"id": "minion",
"token": token,
"pub": pub_key,
"nonce": "asdfse",
"enc_algo": "OAEP-SHA1",
"sig_algo": minion_opts["signing_algorithm"],
}
with caplog.at_level(logging.INFO):
ret = server._auth(load, sign_messages=True)
assert (
"Minion minion tried to authenticate with unsupported encryption algorithm: OAEP-SHA1"
in caplog.text
)
assert "load" in ret
assert "ret" in ret["load"]
assert ret["load"]["ret"] == "bad enc algo"
def test_req_server_auth_garbage_enc_algo(pki_dir, minion_opts, master_opts, caplog):
minion_opts.update(
{
"master_uri": "tcp://127.0.0.1:4506",
"interface": "127.0.0.1",
"ret_port": 4506,
"ipv6": False,
"sock_dir": ".",
"pki_dir": str(pki_dir.joinpath("minion")),
"id": "minion",
"__role": "minion",
"keysize": 4096,
"max_minions": 0,
"auto_accept": False,
"open_mode": False,
"key_pass": None,
"master_sign_pubkey": False,
"publish_port": 4505,
"auth_mode": 1,
}
)
SMaster.secrets["aes"] = {
"secret": multiprocessing.Array(
ctypes.c_char,
salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()),
),
"reload": salt.crypt.Crypticle.generate_key_string,
}
master_opts.update(pki_dir=str(pki_dir.joinpath("master")))
server = salt.channel.server.ReqServerChannel.factory(master_opts)
server.auto_key = salt.daemons.masterapi.AutoKey(server.opts)
server.cache_cli = False
server.event = salt.utils.event.get_master_event(
master_opts, master_opts["sock_dir"], listen=False
)
server.master_key = salt.crypt.MasterKeys(server.opts)
import tests.pytests.unit.crypt
pub = tests.pytests.unit.crypt.LegacyPublicKey(
str(pki_dir.joinpath("master", "master.pub"))
)
token = pub.encrypt(
salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()),
)
nonce = uuid.uuid4().hex
# We need to read the public key with fopen otherwise the newlines might
# not match on windows.
with salt.utils.files.fopen(
str(pki_dir.joinpath("minion", "minion.pub")), "r"
) as fp:
pub_key = salt.crypt.clean_key(fp.read())
load = {
"version": 2,
"cmd": "_auth",
"id": "minion",
"token": token,
"pub": pub_key,
"nonce": "asdfse",
"enc_algo": "IAMNOTAENCALGO",
"sig_algo": minion_opts["signing_algorithm"],
}
with caplog.at_level(logging.INFO):
ret = server._auth(load, sign_messages=True)
assert (
"Minion minion tried to authenticate with unsupported encryption algorithm: IAMNOTAENCALGO"
in caplog.text
)
assert "load" in ret
assert "ret" in ret["load"]
assert ret["load"]["ret"] == "bad enc algo"

File diff suppressed because it is too large Load diff