Add tests for garbage input

This commit is contained in:
Daniel A. Wozniak 2024-06-05 15:42:49 -07:00 committed by Daniel Wozniak
parent c408cddfaf
commit d4b4067ee4
2 changed files with 308 additions and 2 deletions

View file

@ -203,20 +203,26 @@ class BaseKey:
@staticmethod
def parse_padding_for_signing(algorithm):
if algorithm not in VALID_SIGNING_ALGORITHMS:
raise UnsupportedAlgorithm(f"Invalid signing algorithm: {algorithm}")
_pad, _hash = algorithm.split("-", 1)
if _pad not in VALID_PADDING_FOR_SIGNING:
raise Exception("Invalid padding algorithm")
raise UnsupportedAlgorithm(f"Invalid padding algorithm: {_pad}")
return getattr(padding, _pad)
@staticmethod
def parse_padding_for_encryption(algorithm):
if algorithm not in VALID_ENCRYPTION_ALGORITHMS:
raise UnsupportedAlgorithm(f"Invalid encryption algorithm: {algorithm}")
_pad, _hash = algorithm.split("-", 1)
if _pad not in VALID_PADDING_FOR_ENCRYPTION:
raise Exception("Invalid padding algorithm")
raise UnsupportedAlgorithm(f"Invalid padding algorithm: {_pad}")
return getattr(padding, _pad)
@staticmethod
def parse_hash(algorithm):
if "-" not in algorithm:
raise UnsupportedAlgorithm(f"Invalid encryption algorithm: {algorithm}")
_pad, _hash = algorithm.split("-", 1)
if _hash not in VALID_HASHES:
raise Exception("Invalid hashing algorithm")

View file

@ -1688,3 +1688,303 @@ async def test_unclosed_publish_client(minion_opts, io_loop):
client.__del__() # pylint: disable=unnecessary-dunder-call
finally:
client.close()
@pytest.mark.skipif(not FIPS_TESTRUN, reason="Only run on fips enabled platforms")
def test_req_server_auth_unsupported_sig_algo(
pki_dir, minion_opts, master_opts, caplog
):
minion_opts.update(
{
"master_uri": "tcp://127.0.0.1:4506",
"interface": "127.0.0.1",
"ret_port": 4506,
"ipv6": False,
"sock_dir": ".",
"pki_dir": str(pki_dir.joinpath("minion")),
"id": "minion",
"__role": "minion",
"keysize": 4096,
"max_minions": 0,
"auto_accept": False,
"open_mode": False,
"key_pass": None,
"master_sign_pubkey": False,
"publish_port": 4505,
"auth_mode": 1,
}
)
SMaster.secrets["aes"] = {
"secret": multiprocessing.Array(
ctypes.c_char,
salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()),
),
"reload": salt.crypt.Crypticle.generate_key_string,
}
master_opts.update(pki_dir=str(pki_dir.joinpath("master")))
server = salt.channel.server.ReqServerChannel.factory(master_opts)
server.auto_key = salt.daemons.masterapi.AutoKey(server.opts)
server.cache_cli = False
server.event = salt.utils.event.get_master_event(
master_opts, master_opts["sock_dir"], listen=False
)
server.master_key = salt.crypt.MasterKeys(server.opts)
pub = salt.crypt.PublicKey(str(pki_dir.joinpath("master", "master.pub")))
token = pub.encrypt(
salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()),
algorithm=minion_opts["encryption_algorithm"],
)
nonce = uuid.uuid4().hex
# We need to read the public key with fopen otherwise the newlines might
# not match on windows.
with salt.utils.files.fopen(
str(pki_dir.joinpath("minion", "minion.pub")), "r"
) as fp:
pub_key = salt.crypt.clean_key(fp.read())
load = {
"version": 2,
"cmd": "_auth",
"id": "minion",
"token": token,
"pub": pub_key,
"nonce": "asdfse",
"enc_algo": minion_opts["encryption_algorithm"],
"sig_algo": salt.crypt.PKCS1v15_SHA1,
}
with caplog.at_level(logging.INFO):
ret = server._auth(load, sign_messages=True)
assert (
"Minion tried to authenticate with unsupported signing algorithm: PKCS1v15-SHA1"
in caplog.text
)
assert "load" in ret
assert "ret" in ret["load"]
assert ret["load"]["ret"] == "bad sig algo"
def test_req_server_auth_garbage_sig_algo(pki_dir, minion_opts, master_opts, caplog):
minion_opts.update(
{
"master_uri": "tcp://127.0.0.1:4506",
"interface": "127.0.0.1",
"ret_port": 4506,
"ipv6": False,
"sock_dir": ".",
"pki_dir": str(pki_dir.joinpath("minion")),
"id": "minion",
"__role": "minion",
"keysize": 4096,
"max_minions": 0,
"auto_accept": False,
"open_mode": False,
"key_pass": None,
"master_sign_pubkey": False,
"publish_port": 4505,
"auth_mode": 1,
}
)
SMaster.secrets["aes"] = {
"secret": multiprocessing.Array(
ctypes.c_char,
salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()),
),
"reload": salt.crypt.Crypticle.generate_key_string,
}
master_opts.update(pki_dir=str(pki_dir.joinpath("master")))
server = salt.channel.server.ReqServerChannel.factory(master_opts)
server.auto_key = salt.daemons.masterapi.AutoKey(server.opts)
server.cache_cli = False
server.event = salt.utils.event.get_master_event(
master_opts, master_opts["sock_dir"], listen=False
)
server.master_key = salt.crypt.MasterKeys(server.opts)
pub = salt.crypt.PublicKey(str(pki_dir.joinpath("master", "master.pub")))
token = pub.encrypt(
salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()),
algorithm=minion_opts["encryption_algorithm"],
)
nonce = uuid.uuid4().hex
# We need to read the public key with fopen otherwise the newlines might
# not match on windows.
with salt.utils.files.fopen(
str(pki_dir.joinpath("minion", "minion.pub")), "r"
) as fp:
pub_key = salt.crypt.clean_key(fp.read())
load = {
"version": 2,
"cmd": "_auth",
"id": "minion",
"token": token,
"pub": pub_key,
"nonce": "asdfse",
"enc_algo": minion_opts["encryption_algorithm"],
"sig_algo": "IAMNOTANALGO",
}
with caplog.at_level(logging.INFO):
ret = server._auth(load, sign_messages=True)
assert (
"Minion tried to authenticate with unsupported signing algorithm: IAMNOTANALGO"
in caplog.text
)
assert "load" in ret
assert "ret" in ret["load"]
assert ret["load"]["ret"] == "bad sig algo"
@pytest.mark.skipif(not FIPS_TESTRUN, reason="Only run on fips enabled platforms")
def test_req_server_auth_unsupported_enc_algo(
pki_dir, minion_opts, master_opts, caplog
):
minion_opts.update(
{
"master_uri": "tcp://127.0.0.1:4506",
"interface": "127.0.0.1",
"ret_port": 4506,
"ipv6": False,
"sock_dir": ".",
"pki_dir": str(pki_dir.joinpath("minion")),
"id": "minion",
"__role": "minion",
"keysize": 4096,
"max_minions": 0,
"auto_accept": False,
"open_mode": False,
"key_pass": None,
"master_sign_pubkey": False,
"publish_port": 4505,
"auth_mode": 1,
}
)
SMaster.secrets["aes"] = {
"secret": multiprocessing.Array(
ctypes.c_char,
salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()),
),
"reload": salt.crypt.Crypticle.generate_key_string,
}
master_opts.update(pki_dir=str(pki_dir.joinpath("master")))
server = salt.channel.server.ReqServerChannel.factory(master_opts)
server.auto_key = salt.daemons.masterapi.AutoKey(server.opts)
server.cache_cli = False
server.event = salt.utils.event.get_master_event(
master_opts, master_opts["sock_dir"], listen=False
)
server.master_key = salt.crypt.MasterKeys(server.opts)
import tests.pytests.unit.crypt
pub = tests.pytests.unit.crypt.LegacyPublicKey(
str(pki_dir.joinpath("master", "master.pub"))
)
token = pub.encrypt(
salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()),
)
nonce = uuid.uuid4().hex
# We need to read the public key with fopen otherwise the newlines might
# not match on windows.
with salt.utils.files.fopen(
str(pki_dir.joinpath("minion", "minion.pub")), "r"
) as fp:
pub_key = salt.crypt.clean_key(fp.read())
load = {
"version": 2,
"cmd": "_auth",
"id": "minion",
"token": token,
"pub": pub_key,
"nonce": "asdfse",
"enc_algo": "OAEP-SHA1",
"sig_algo": minion_opts["signing_algorithm"],
}
with caplog.at_level(logging.INFO):
ret = server._auth(load, sign_messages=True)
assert (
"Minion minion tried to authenticate with unsupported encryption algorithm: OAEP-SHA1"
in caplog.text
)
assert "load" in ret
assert "ret" in ret["load"]
assert ret["load"]["ret"] == "bad enc algo"
def test_req_server_auth_garbage_enc_algo(pki_dir, minion_opts, master_opts, caplog):
minion_opts.update(
{
"master_uri": "tcp://127.0.0.1:4506",
"interface": "127.0.0.1",
"ret_port": 4506,
"ipv6": False,
"sock_dir": ".",
"pki_dir": str(pki_dir.joinpath("minion")),
"id": "minion",
"__role": "minion",
"keysize": 4096,
"max_minions": 0,
"auto_accept": False,
"open_mode": False,
"key_pass": None,
"master_sign_pubkey": False,
"publish_port": 4505,
"auth_mode": 1,
}
)
SMaster.secrets["aes"] = {
"secret": multiprocessing.Array(
ctypes.c_char,
salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()),
),
"reload": salt.crypt.Crypticle.generate_key_string,
}
master_opts.update(pki_dir=str(pki_dir.joinpath("master")))
server = salt.channel.server.ReqServerChannel.factory(master_opts)
server.auto_key = salt.daemons.masterapi.AutoKey(server.opts)
server.cache_cli = False
server.event = salt.utils.event.get_master_event(
master_opts, master_opts["sock_dir"], listen=False
)
server.master_key = salt.crypt.MasterKeys(server.opts)
import tests.pytests.unit.crypt
pub = tests.pytests.unit.crypt.LegacyPublicKey(
str(pki_dir.joinpath("master", "master.pub"))
)
token = pub.encrypt(
salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()),
)
nonce = uuid.uuid4().hex
# We need to read the public key with fopen otherwise the newlines might
# not match on windows.
with salt.utils.files.fopen(
str(pki_dir.joinpath("minion", "minion.pub")), "r"
) as fp:
pub_key = salt.crypt.clean_key(fp.read())
load = {
"version": 2,
"cmd": "_auth",
"id": "minion",
"token": token,
"pub": pub_key,
"nonce": "asdfse",
"enc_algo": "IAMNOTAENCALGO",
"sig_algo": minion_opts["signing_algorithm"],
}
with caplog.at_level(logging.INFO):
ret = server._auth(load, sign_messages=True)
assert (
"Minion minion tried to authenticate with unsupported encryption algorithm: IAMNOTAENCALGO"
in caplog.text
)
assert "load" in ret
assert "ret" in ret["load"]
assert ret["load"]["ret"] == "bad enc algo"