More test update to support fips

This commit is contained in:
Daniel A. Wozniak 2024-05-25 21:29:40 -07:00 committed by Daniel Wozniak
parent 6d1cb70a37
commit 73d35c9d54
6 changed files with 25 additions and 84 deletions

View file

@ -192,7 +192,7 @@ def get_dependencies():
"""
deps = {
"requests": HAS_REQUESTS,
"pycrypto or m2crypto": salt.crypt.HAS_M2 or salt.crypt.HAS_CRYPTO,
"cryptography": salt.crypt.HAS_CRYPTOGRAPHY,
}
return config.check_driver_dependencies(__virtualname__, deps)
@ -4929,7 +4929,7 @@ def get_password_data(
for item in data:
ret[next(iter(item.keys()))] = next(iter(item.values()))
if not salt.crypt.HAS_M2 and not salt.crypt.HAS_CRYPTO:
if not salt.crypt.HAS_CRYPTOGRAPHY:
if "key" in kwargs or "key_file" in kwargs:
log.warning("No crypto library is installed, can not decrypt password")
return ret

View file

@ -8,6 +8,7 @@ import pytest
from pytestshellutils.exceptions import FactoryTimeout
import salt.utils.platform
from tests.conftest import FIPS_TESTRUN
log = logging.getLogger(__name__)
@ -20,6 +21,10 @@ def salt_mm_master_1(request, salt_factories):
}
config_overrides = {
"interface": "127.0.0.1",
"fips_mode": FIPS_TESTRUN,
"publish_signing_algorithm": (
"PKCS1v15-SHA224" if FIPS_TESTRUN else "PKCS1v15-SHA224"
),
}
factory = salt_factories.salt_master_daemon(
@ -48,6 +53,10 @@ def salt_mm_master_2(salt_factories, salt_mm_master_1):
}
config_overrides = {
"interface": "127.0.0.2",
"fips_mode": FIPS_TESTRUN,
"publish_signing_algorithm": (
"PKCS1v15-SHA224" if FIPS_TESTRUN else "PKCS1v15-SHA224"
),
}
# Use the same ports for both masters, they are binding to different interfaces
@ -95,6 +104,9 @@ def salt_mm_minion_1(salt_mm_master_1, salt_mm_master_2):
f"{mm_master_2_addr}:{mm_master_2_port}",
],
"test.foo": "baz",
"fips_mode": FIPS_TESTRUN,
"encryption_algorithm": "OAEP-SHA224" if FIPS_TESTRUN else "OAEP-SHA1",
"signing_algorithm": "PKCS1v15-SHA224" if FIPS_TESTRUN else "PKCS1v15-SHA1",
}
factory = salt_mm_master_1.salt_minion_daemon(
"mm-minion-1",
@ -122,6 +134,9 @@ def salt_mm_minion_2(salt_mm_master_1, salt_mm_master_2):
f"{mm_master_2_addr}:{mm_master_2_port}",
],
"test.foo": "baz",
"fips_mode": FIPS_TESTRUN,
"encryption_algorithm": "OAEP-SHA224" if FIPS_TESTRUN else "OAEP-SHA1",
"signing_algorithm": "PKCS1v15-SHA224" if FIPS_TESTRUN else "PKCS1v15-SHA1",
}
factory = salt_mm_master_2.salt_minion_daemon(
"mm-minion-2",

View file

@ -64,7 +64,6 @@ def legacy_gen_keys(keydir, keyname, keysize, user=None, passphrase=None):
if HAS_M2:
gen = RSA.gen_key(keysize, 65537, lambda: None)
else:
salt.utils.crypt.reinit_crypto()
gen = RSA.generate(bits=keysize, e=65537)
if os.path.isfile(priv):
@ -115,6 +114,7 @@ def legacy_gen_keys(keydir, keyname, keysize, user=None, passphrase=None):
class LegacyPrivateKey:
def __init__(self, path, passphrase=None):
if HAS_M2:
self.key = RSA.load_key(path, lambda x: bytes(passphrase))

View file

@ -5,33 +5,6 @@ import pytest
import salt.crypt
from tests.support.mock import MagicMock, MockCall, mock_open, patch
from . import MSG, PRIVKEY_DATA, PUBKEY_DATA, SIG
try:
import M2Crypto # pylint: disable=unused-import
HAS_M2 = True
except ImportError:
HAS_M2 = False
try:
from Cryptodome.PublicKey import RSA
HAS_PYCRYPTO_RSA = True
except ImportError:
HAS_PYCRYPTO_RSA = False
if not HAS_PYCRYPTO_RSA:
try:
from Crypto.PublicKey import RSA # nosec
HAS_PYCRYPTO_RSA = True
except ImportError:
HAS_PYCRYPTO_RSA = False
pytestmark = [
pytest.mark.skipif(not HAS_PYCRYPTO_RSA, reason="pycrypto >= 2.6 is not available"),
pytest.mark.skipif(HAS_M2, reason="m2crypto is used by salt.crypt if installed"),
]
@pytest.mark.slow_test
def test_gen_keys():
@ -87,22 +60,3 @@ def test_gen_keys_with_passphrase():
salt.crypt.gen_keys(key_path, "keyname", 2048)
assert open_priv_wb in m_open.calls
assert open_pub_wb in m_open.calls
def test_sign_message():
key = RSA.importKey(PRIVKEY_DATA)
with patch("salt.crypt.get_rsa_key", return_value=key):
assert SIG == salt.crypt.sign_message("/keydir/keyname.pem", MSG)
def test_sign_message_with_passphrase():
key = RSA.importKey(PRIVKEY_DATA)
with patch("salt.crypt.get_rsa_key", return_value=key):
assert SIG == salt.crypt.sign_message(
"/keydir/keyname.pem", MSG, passphrase="password"
)
def test_verify_signature():
with patch("salt.utils.files.fopen", mock_open(read_data=PUBKEY_DATA)):
assert salt.crypt.verify_signature("/keydir/keyname.pub", MSG, SIG)

View file

@ -738,39 +738,6 @@ def test_gen_modules_executors(minion_opts):
minion.destroy()
def test_reinit_crypto_on_fork(minion_opts):
"""
Ensure salt.utils.crypt.reinit_crypto() is executed when forking for new job
"""
minion_opts["multiprocessing"] = True
with patch("salt.utils.process.default_signals"):
io_loop = salt.ext.tornado.ioloop.IOLoop()
io_loop.make_current()
minion = salt.minion.Minion(minion_opts, io_loop=io_loop)
job_data = {"jid": "test-jid", "fun": "test.ping"}
def mock_start(self):
# pylint: disable=comparison-with-callable
assert (
len(
[
x
for x in self._after_fork_methods
if x[0] == salt.utils.crypt.reinit_crypto
]
)
== 1
)
# pylint: enable=comparison-with-callable
with patch.object(
salt.utils.process.SignalHandlingProcess, "start", mock_start
):
io_loop.run_sync(lambda: minion._handle_decoded_payload(job_data))
def test_minion_manage_schedule(minion_opts):
"""
Tests that the manage_schedule will call the add function, adding

View file

@ -404,10 +404,11 @@ async def test_when_async_req_channel_with_syndic_role_should_use_syndic_master_
"transport": "tcp",
"acceptance_wait_time": 30,
"acceptance_wait_time_max": 30,
"signing_algorithm": "MOCK",
}
client = salt.channel.client.ReqChannel.factory(opts, io_loop=mockloop)
assert client.master_pubkey_path == expected_pubkey_path
with patch("salt.crypt.verify_signature") as mock:
with patch("salt.crypt.PublicKey", return_value=MagicMock()) as mock:
client.verify_signature("mockdata", "mocksig")
assert mock.call_args_list[0][0][0] == expected_pubkey_path
@ -432,7 +433,11 @@ async def test_mixin_should_use_correct_path_when_syndic(
}
client = salt.channel.client.AsyncPubChannel.factory(opts, io_loop=mockloop)
client.master_pubkey_path = expected_pubkey_path
payload = {"sig": "abc", "load": {"foo": "bar"}}
payload = {
"sig": "abc",
"load": {"foo": "bar"},
"sig_algo": salt.crypt.PKCS1v15_SHA224,
}
with patch("salt.crypt.verify_signature") as mock:
client._verify_master_signature(payload)
assert mock.call_args_list[0][0][0] == expected_pubkey_path