diff --git a/changelog/66528.added.md b/changelog/66528.added.md new file mode 100644 index 00000000000..fdb71aba9ed --- /dev/null +++ b/changelog/66528.added.md @@ -0,0 +1 @@ +Added an `asymmetric` execution module for signing/verifying data using raw asymmetric algorithms diff --git a/doc/ref/modules/all/index.rst b/doc/ref/modules/all/index.rst index b794d724639..61fea5ff143 100644 --- a/doc/ref/modules/all/index.rst +++ b/doc/ref/modules/all/index.rst @@ -34,6 +34,7 @@ execution modules archive arista_pyeapi artifactory + asymmetric at at_solaris baredoc diff --git a/doc/ref/modules/all/salt.modules.asymmetric.rst b/doc/ref/modules/all/salt.modules.asymmetric.rst new file mode 100644 index 00000000000..babca913d87 --- /dev/null +++ b/doc/ref/modules/all/salt.modules.asymmetric.rst @@ -0,0 +1,6 @@ +salt.modules.asymmetric +======================= + +.. automodule:: salt.modules.asymmetric + :members: + :undoc-members: diff --git a/salt/modules/asymmetric.py b/salt/modules/asymmetric.py new file mode 100644 index 00000000000..a0e5b614002 --- /dev/null +++ b/salt/modules/asymmetric.py @@ -0,0 +1,330 @@ +""" +.. versionadded:: 3008.0 + +Low-level asymmetric cryptographic operations. + +:depends: cryptography + +.. note:: + + All parameters that take a public key or private key can be specified either + as a PEM/hex/base64 string or a path to a local file encoded in all supported + formats for the type. + + A signature can be specified as a base64 string or a path to a file with the + raw signature or its base64 encoding. + + Public keys and signatures can additionally be specified as a URL that can be + retrieved using :py:func:`cp.cache_file `. +""" + +import base64 +import logging +from pathlib import Path +from urllib.parse import urlparse + +import salt.utils.files +from salt.exceptions import CommandExecutionError, SaltInvocationError + +try: + from salt.utils import asymmetric as asym + from salt.utils import x509 + + HAS_CRYPTOGRAPHY = True +except ImportError: + HAS_CRYPTOGRAPHY = False + +log = logging.getLogger(__name__) + +__virtualname__ = "asymmetric" + + +def __virtual__(): + if HAS_CRYPTOGRAPHY: + return __virtualname__ + return False, "Missing `cryptography` library" + + +def sign( + privkey, passphrase=None, text=None, filename=None, digest=None, raw=None, path=None +): + """ + Sign a file or text using an (RSA|ECDSA|Ed25519|Ed448) private key. + You can employ :py:func:`x509.create_private_key ` + to generate one. Returns the signature encoded in base64 by default. + + CLI Example: + + .. code-block:: bash + + salt '*' asymmetric.sign /root/my_privkey.pem text='I like you' + salt '*' asymmetric.sign /root/my_privkey.pem filename=/data/to/be/signed + + privkey + The private key to sign with. + + passphrase + If the private key is encrypted, the passphrase to decrypt it. Optional. + + text + Pass the text to sign. Either this or ``filename`` is required. + + filename + Pass the path of a file to sign. Either this or ``text`` is required. + + digest + The name of the hashing algorithm to use when creating signatures. + Defaults to ``sha256``. Only relevant for ECDSA or RSA. + + raw + Return the raw bytes instead of encoding them to base64. Defaults to false. + + path + Instead of returning the data, write it to a path on the local filesystem. + Optional. + """ + if text is not None: + try: + data = text.encode() + except AttributeError: + data = text + elif filename: + data = Path(filename) + else: + raise SaltInvocationError("Either `text` or `filename` is required") + raw = raw if raw is not None else bool(path) + sig = asym.sign(privkey, data, digest, passphrase=passphrase) + mode = "wb" + if not raw: + sig = base64.b64encode(sig).decode() + mode = "w" + if path: + with salt.utils.files.fopen(path, mode) as out: + out.write(sig) + return f"Signature written to '{path}'" + return sig + + +def verify( + text=None, + filename=None, + pubkey=None, + signature=None, + digest=None, + signed_by_any=None, + signed_by_all=None, + **kwargs, # pylint: disable=unused-argument +): + """ + Verify signatures on a specific input against (RSA|ECDSA|Ed25519|Ed448) public keys. + + .. note:: + + This function is supposed to be compatible with the same interface + as :py:func:`gpg.verify `` regarding keyword + arguments and return value format. + + CLI Example: + + .. code-block:: bash + + salt '*' asymmetric.verify pubkey=/root/my_pubkey.pem text='I like you' signature=/root/ilikeyou.sig + salt '*' asymmetric.verify pubkey=/root/my_pubkey.pem path=/root/confidential signature=/root/confidential.sig + + text + The text to verify. Either this or ``filename`` is required. + + filename + The path of a file to verify. Either this or ``text`` is required. + + pubkey + The single public key to verify ``signature`` against. Specify either + this or make use of ``signed_by_any``/``signed_by_all`` for compound checks. + + signature + If ``pubkey`` is specified, the single signature to verify. + If ``signed_by_any`` and/or ``signed_by_all`` is specified, this can be + a list of multiple signatures to check against the provided keys. + Required. + + digest + The name of the hashing algorithm to use when verifying signatures. + Defaults to ``sha256``. Only relevant for ECDSA or RSA. + + signed_by_any + A list of pubkeys from which any valid signature will mark verification + as passed. If none of the listed pubkeys provided a signature, + verification fails. Works with ``signed_by_all``, but mutually + exclusive with ``pubkey``. + + signed_by_all + A list of pubkeys, all of which must provide a signature for verification + to pass. If a single one of the listed pubkeys did not provide a signature, + verification fails. Works with ``signed_by_any``, but mutually + exclusive with ``pubkey``. + """ + # Basic compatibility with gpg.verify + ret = {"res": False, "message": "internal error"} + + signed_by_any = signed_by_any or [] + signed_by_all = signed_by_all or [] + if text and filename: + raise SaltInvocationError( + "`text` and `filename` arguments are mutually exclusive" + ) + if not signature: + raise SaltInvocationError("Missing `signature` parameter") + # We're constrained by compatibility with gpg.verify, so ensure the parameters + # are as expected. + multi_check = bool(signed_by_any or signed_by_all) + if multi_check: + if pubkey: + raise SaltInvocationError( + "Either specify pubkey + signature or signed_by_(any|all)" + ) + if isinstance(signature, (str, bytes)): + signature = [signature] + if not isinstance(signed_by_any, list): + signed_by_any = [signed_by_any] + if not isinstance(signed_by_all, list): + signed_by_all = [signed_by_all] + elif not pubkey: + raise SaltInvocationError("Missing pubkey(s) to check against") + elif not isinstance(signature, (str, bytes)): + raise SaltInvocationError( + "`signature` must be a string or bytes when verifying a single signing `pubkey`" + ) + else: + signed_by_all = [pubkey] + if not isinstance(signature, list): + signature = [signature] + + file_digest = None + if text: + try: + data = text.encode() + except AttributeError: + data = text + elif filename: + data = Path(filename) + if not data.exists(): + raise CommandExecutionError(f"Path '{filename}' does not exist") + else: + raise SaltInvocationError( + "Missing data to verify. Either specify `text` or `filename`" + ) + any_check = all_check = False + sigs = [] + for sig in signature: + try: + sigs.append(_fetch(sig)) + except CommandExecutionError as err: + if pubkey: + return {"res": False, "message": str(err)} + log.error(str(err), exc_info_on_loglevel=logging.DEBUG) + if not sigs: + raise CommandExecutionError("Unable to locate any of the provided signatures") + if signed_by_any: + for signer in signed_by_any: + try: + # Since we don't know if the signature algorithm supports + # `prehashed` (only rsa/ec), don't calculate it early, but + # cache it once it has been calculated. If a verification fails, + # it throws an exception. + _, data, file_digest = _verify_pubkey_against_list( + signer, sigs, data, digest, file_digest=file_digest + ) + any_check = True + break + except asym.InvalidSignature as err: + log.info(str(err), exc_info_on_loglevel=logging.DEBUG) + if err.file_digest is not None: + file_digest = err.file_digest + if err.data is not None: + data = err.data + except Exception as err: # pylint: disable=broad-except + log.error(str(err), exc_info_on_loglevel=logging.DEBUG) + else: + ret["res"] = False + ret["message"] = ( + "None of the public keys listed in signed_by_any provided a valid signature" + ) + return ret + + if signed_by_all: + all_sigs = sigs.copy() + for signer in signed_by_all: + try: + match, data, file_digest = _verify_pubkey_against_list( + signer, all_sigs, data, digest, file_digest=file_digest + ) + # Remove already associated signatures from list of possible ones + # Since pubkeys can be specified in different ways, this fails if + # the user passes in the same pubkey twice + all_sigs = list(set(all_sigs).difference(match)) + continue + except Exception as err: # pylint: disable=broad-except + log.error(str(err), exc_info_on_loglevel=logging.DEBUG) + ret["res"] = False + if pubkey: + ret["message"] = f"Failed checking signature: {err}" + else: + ret["message"] = f"Failed while checking `signed_by_all`: {err}" + return ret + all_check = True + + if bool(signed_by_any) is any_check and bool(signed_by_all) is all_check: + ret["res"] = True + if pubkey: + ret["message"] = "The signature is valid" + else: + ret["message"] = "All required keys have provided a signature" + return ret + # This should never be reached + ret["res"] = False + return ret + + +def _verify_pubkey_against_list(pub, sigs, data, digest, file_digest=None): + pubkey = _fetch(pub) + pubkey = x509.load_pubkey(pubkey) + match = [] + for sig in sigs: + try: + data, file_digest = asym.verify( + pubkey, sig, data, digest, file_digest=file_digest + ) + match.append(sig) + except asym.InvalidSignature as err: + if err.file_digest is not None: + file_digest = err.file_digest + if err.data is not None: + data = err.data + if not match: + raise asym.InvalidSignature( + f"Invalid signature for key {asym.fingerprint(pubkey)}", + file_digest=file_digest, + data=data, + pubkey=pubkey, + ) + return match, data, file_digest + + +def _fetch(url): + try: + parsed = urlparse(url) + except (TypeError, ValueError): + return url + sfn = None + if parsed.scheme == "": + sfn = url + elif parsed.scheme == "file": + sfn = parsed.path + else: + sfn = __salt__["cp.cache_file"](url) + if not sfn: + raise CommandExecutionError(f"Failed fetching '{url}'") + if parsed.scheme != "": + if not Path(sfn).exists(): + raise CommandExecutionError(f"Failed fetching '{url}'") + return sfn diff --git a/salt/utils/asymmetric.py b/salt/utils/asymmetric.py new file mode 100644 index 00000000000..a7e35212a47 --- /dev/null +++ b/salt/utils/asymmetric.py @@ -0,0 +1,263 @@ +import base64 +import hashlib +import logging +from pathlib import Path + +from cryptography.exceptions import InvalidSignature as CryptographyInvalidSig +from cryptography.hazmat.primitives.asymmetric import ec, padding, utils + +from salt.exceptions import CommandExecutionError, SaltInvocationError +from salt.utils import x509 +from salt.utils.hashutils import get_hash + +log = logging.getLogger(__name__) + +DEFAULT_HASHALG = "sha256" + + +class InvalidSignature(CommandExecutionError): + """ + Raised when a signature is invalid. + We save the file hash to avoid computing it multiple times for + signed_by_any, if a path has been passed in and the signature algorithm + supports prehashed data. + """ + + def __init__(self, *args, file_digest=None, data=None, pubkey=None): + super().__init__(*args) + self.file_digest = file_digest + self.data = data + self.pubkey = pubkey + + +def fingerprint(pubkey): + """ + Return the SHA256 hexdigest of a pubkey's DER representation. + + pub + The public key to calculate the fingerprint of. + Can be any reference that can be passed to ``salt.utils.x509.load_pubkey``. + """ + pubkey = x509.load_pubkey(pubkey) + hsh = getattr(hashlib, DEFAULT_HASHALG)() + hsh.update(x509.to_der(pubkey)) + return hsh.hexdigest() + + +def sign(privkey, data, digest=None, passphrase=None): + """ + Sign data with a private key. + + privkey + The private key to sign with. Can be any reference that can be passed + to ``salt.utils.x509.load_privkey``. + + data + The data to sign. Should be either ``str``, ``bytes`` or ``pathlib.Path`` object. + + digest + The name of the hashing algorithm to use when creating signatures. + Defaults to ``sha256``. Only relevant for ECDSA or RSA. + + passphrase + If the private key is encrypted, the passphrase to decrypt it. Optional. + """ + privkey = x509.load_privkey(privkey, passphrase=passphrase) + key_type = x509.get_key_type(privkey) + prehashed = False + if isinstance(data, Path): + if key_type in (x509.KEY_TYPE.RSA, x509.KEY_TYPE.EC): + data = _get_file_digest(data, digest) + prehashed = True + else: + data = data.read_bytes() + elif isinstance(data, str): + data = data.encode() + if key_type == x509.KEY_TYPE.RSA: + return _sign_rsa(privkey, data, digest, prehashed=prehashed) + if key_type == x509.KEY_TYPE.EC: + return _sign_ec(privkey, data, digest, prehashed=prehashed) + if key_type == x509.KEY_TYPE.ED25519: + return _sign_ed25519(privkey, data) + if key_type == x509.KEY_TYPE.ED448: + return _sign_ed448(privkey, data) + raise CommandExecutionError(f"Unknown private key type: {privkey.__class__}") + + +def try_base64(data): + """ + Check if the data is valid base64 and return the decoded + bytes if so, otherwise return the data untouched. + """ + try: + if isinstance(data, str): + data = data.encode("ascii", "strict") + elif isinstance(data, bytes): + pass + else: + raise CommandExecutionError("is_base64 only works with strings and bytes") + decoded = base64.b64decode(data) + if base64.b64encode(decoded) == data.replace(b"\n", b""): + return decoded + return data + except (TypeError, ValueError): + return data + + +def load_sig(sig): + """ + Try to load an input that represents a signature into the signature's bytes. + + sig + The reference to load. Can either be a base64-encoded string or a path + to a local file in base64 encoding or raw bytes. + """ + if x509.isfile(sig): + sig = Path(sig).read_bytes() + sig = try_base64(sig) + if isinstance(sig, bytes): + return sig + raise CommandExecutionError( + f"Failed loading signature '{sig}' as file and/or base64 string" + ) + + +def verify(pubkey, sig, data, digest=None, file_digest=None): + """ + Verify a signature against a public key. + + On success, returns a tuple of (data, file_digest), which can reused + by the callee when multiple signatures are checked against the same file. + This avoids reading the file/calculating the digest multiple times. + + On failure, raises an InvalidSignature exception which carries + ``data`` and ``file_digest`` attributes with the corresponding values. + + pub + The public key to verify the signature against. + Can be any reference that can be passed to ``salt.utils.x509.load_pubkey``. + + sig + The signature to verify. + Can be any reference that can be passed to ``salt.utils.asymmetric.load_sig``. + + data + The data to sign. Should be either ``str``, ``bytes`` or ``pathlib.Path`` object. + Ignored when ``file_digest`` is passed and the signing algorithm is either + ECDSA or RSA. + + digest + The name of the hashing algorithm to use when creating signatures. + Defaults to ``sha256``. Only relevant for ECDSA or RSA. + + file_digest + The ECDSA and RSA algorithms can be invoked with a precalculated digest + in order to avoid loading the whole file into memory. This happens automatically + during the execution of this function, but when checking multiple signatures, + you can cache the calculated value and pass it back in. + """ + pubkey = x509.load_pubkey(pubkey) + signature = load_sig(sig) + key_type = x509.get_key_type(pubkey) + sig_data = data + if key_type in (x509.KEY_TYPE.RSA, x509.KEY_TYPE.EC): + if file_digest: + sig_data = file_digest + elif isinstance(data, Path): + file_digest = sig_data = _get_file_digest(data, digest) + elif isinstance(data, str): + sig_data = data.encode() + elif isinstance(data, Path): + data = sig_data = data.read_bytes() + elif isinstance(data, str): + data = sig_data = data.encode() + + try: + if key_type == x509.KEY_TYPE.RSA: + _verify_rsa( + pubkey, + signature, + sig_data, + digest or DEFAULT_HASHALG, + prehashed=bool(file_digest), + ) + return data, file_digest + if key_type == x509.KEY_TYPE.EC: + _verify_ec( + pubkey, + signature, + sig_data, + digest or DEFAULT_HASHALG, + prehashed=bool(file_digest), + ) + return data, file_digest + if key_type == x509.KEY_TYPE.ED25519: + _verify_ed25519(pubkey, signature, sig_data) + return data, file_digest + if key_type == x509.KEY_TYPE.ED448: + _verify_ed448(pubkey, signature, sig_data) + return data, file_digest + except CryptographyInvalidSig as err: + raise InvalidSignature( + f"Invalid signature for key {fingerprint(pubkey)}", + file_digest=file_digest, + data=data, + pubkey=pubkey, + ) from err + raise SaltInvocationError(f"Unknown public key type: {pubkey.__class__}") + + +def _sign_rsa(priv, data, digest, prehashed=False): + pad_hashalg = sig_hashalg = x509.get_hashing_algorithm(digest or DEFAULT_HASHALG) + if prehashed: + sig_hashalg = utils.Prehashed(sig_hashalg) + return priv.sign( + data, + padding.PSS(mgf=padding.MGF1(pad_hashalg), salt_length=padding.PSS.MAX_LENGTH), + sig_hashalg, + ) + + +def _sign_ec(priv, data, digest, prehashed=False): + hashalg = x509.get_hashing_algorithm(digest or DEFAULT_HASHALG) + if prehashed: + hashalg = utils.Prehashed(hashalg) + return priv.sign(data, ec.ECDSA(hashalg)) + + +def _sign_ed25519(priv, data): + return priv.sign(data) + + +def _sign_ed448(priv, data): + return priv.sign(data) + + +def _verify_rsa(pub, sig, data, digest=DEFAULT_HASHALG, prehashed=False): + pad_hashalg = sig_hashalg = x509.get_hashing_algorithm(digest) + if prehashed: + sig_hashalg = utils.Prehashed(sig_hashalg) + # Technically, scheme hash function and the MGF hash function can be different, + # but that's not common practice. + pad = padding.PSS(mgf=padding.MGF1(pad_hashalg), salt_length=padding.PSS.AUTO) + pub.verify(sig, data, pad, sig_hashalg) + + +def _verify_ec(pub, sig, data, digest=DEFAULT_HASHALG, prehashed=False): + hashalg = x509.get_hashing_algorithm(digest) + if prehashed: + hashalg = utils.Prehashed(hashalg) + pub.verify(sig, data, ec.ECDSA(hashalg)) + + +def _verify_ed25519(pub, sig, data): + pub.verify(sig, data) + + +def _verify_ed448(pub, sig, data): + pub.verify(sig, data) + + +def _get_file_digest(file, digest): + hexdigest = get_hash(file, digest or DEFAULT_HASHALG) + return bytes.fromhex(hexdigest) diff --git a/tests/pytests/functional/modules/test_asymmetric.py b/tests/pytests/functional/modules/test_asymmetric.py new file mode 100644 index 00000000000..b4930641f0c --- /dev/null +++ b/tests/pytests/functional/modules/test_asymmetric.py @@ -0,0 +1,834 @@ +import base64 +import logging +from pathlib import Path +from textwrap import dedent + +import pytest + +from salt.exceptions import CommandExecutionError, SaltInvocationError + +util = pytest.importorskip("salt.utils.asymmetric") + + +@pytest.fixture(params=(False,)) +def signed_data(request, tmp_path): + data = "I am an important message" + if request.param: + with pytest.helpers.temp_file("signed_data", data, tmp_path) as f: + yield str(f) + else: + yield data + + +@pytest.fixture +def priv_rsa(): + return dedent( + """ + -----BEGIN PRIVATE KEY----- + MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC5f9OHbgZcXzqx + ZfH5DSicaYCTreYBGt+gQAClufIdrl+Tu147o2/tXJQNFWjvyl+FJXaDYTj2PdZp + fdPNWw4k7v5ndqwVdevZGD/nrjzd9eX0WXN/OPPKmnLe7pVYOO179jKEGWIgbewv + B9VhNodvYLbGmgHSDA9ijytzhTV7gJEswN89vhi0ovVR3uyEOYDXvWCkkG5UIdJz + cKa8DHRLd+JoaZ5E5i3VtUgqn8wVQzW8CK8FcM/GdYMmcbHf5YNaWCZ5DZlM+83h + ggUNrnsIao0x5LjFQ9uKgbLUvJnQBRmI+nKd5xhyMtwMidTxMj5Qlr0E3jlRBI1k + JlT+BDCbAgMBAAECggEAAR8sn74++RJxiFVH7vG4kwDeF+wugJLL8TseFtNrXhYN + uI9e8/S6gjoMXK9OFBA1nud6l/dyk6YrM5+zu7JzW3nhOa3VP3WBbEvgKDwTTb5S + cPGy1THrW4DrEzY7JfmjVVCMGQLwhUw2wpQMwUMga8frROz1UoHUk0gnt7snOuje + sIZCUlXi9yyRu00upFjKpPCONO5lp6ipTCxMZeD6cLw4SNE893kmmJXv0wVIKvhA + PSqxPGas74h/+7x4/jrVNnuai8YYIMHtQd60YVoWOLV4l3s8m12HARaOUzyMTG/m + vBAYx+4vCyNCGb+m7S+fDzaQdlsY+qsnE3RL9q6kAQKBgQDueHZLc1JgoahxWM26 + 9ZhHZKAFclOQOVMgHPAfFEpmetrIgIZk9qiHaCIyo8sGSN2PLBwwS/Gc1/FikldS + ACcsOC8w7Zfha4n1tyWvxY1btnbim1YPUDg1ciJuVTOh8vc949pLmyjKUZBvhZTZ + qah416upxCv7I2hdsKRE1IicmwKBgQDHIo0/CTBr3xJ535bc8I0XlB5Oed6jXUSD + Z83rpFx+IILotu8MFiOxH4wElr1vZz8VJYWH6paYm6F82hoEdQeFMKQf4DIq1tPO + 6WSC/icipM+y3TBuXTEtqtjRFeRr7siEj8kb11r7EaPYyw3Z2jEjcvfJHdV++tte + QOrfN6n8AQKBgACfSde6jk14PoNFMww41dPh3FUHTlaC/8eGq8249NS9n1KEm1Uq + G5h22hf9u2rhx8o22D/8Ar5hBd02+olZPMDtyJm9FPdem3aLqsqBnnPNzxOaSigy + EmN5T8Ov7zmN870ymgA2gG2+trzDwXar7aebEHSZ8W9vUTdlXZhcYZrfAoGBAIzM + b1Y8pxH+fc/SOZcqNniPcAZIwRR9I65NvRl58zPyxNzKS6ceGEpqZdPwySx1sfK/ + vvRk9+obUEk45OB15sVTqRgoqxADKWvJNhownXcvVPPA1TeTiOwjOn5LnmB6Syj/ + iVC4KkoPJOxqVfbNAaVw6qY3A/duY6D3AZqmfvgBAoGAQ6OyxYh6kAkGDZi8ITkc + 8i7Eqjb9C+DMURmaycXGm/9Ft11PQDjNIkJA1tiYKacs4v7ELk/dv5swvgOHthZi + GqcVceAozUEluAj/crlD+IuwD4ohlW0HMqtoHtgsEg02n5q52e3ubmEqJzoeFTJf + ofJwPHKSkcvnRSH9ZQFbhqo= + -----END PRIVATE KEY----- + """ + ).strip() + + +@pytest.fixture +def pub_rsa(): + return dedent( + """ + -----BEGIN PUBLIC KEY----- + MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuX/Th24GXF86sWXx+Q0o + nGmAk63mARrfoEAApbnyHa5fk7teO6Nv7VyUDRVo78pfhSV2g2E49j3WaX3TzVsO + JO7+Z3asFXXr2Rg/56483fXl9Flzfzjzyppy3u6VWDjte/YyhBliIG3sLwfVYTaH + b2C2xpoB0gwPYo8rc4U1e4CRLMDfPb4YtKL1Ud7shDmA171gpJBuVCHSc3CmvAx0 + S3fiaGmeROYt1bVIKp/MFUM1vAivBXDPxnWDJnGx3+WDWlgmeQ2ZTPvN4YIFDa57 + CGqNMeS4xUPbioGy1LyZ0AUZiPpynecYcjLcDInU8TI+UJa9BN45UQSNZCZU/gQw + mwIDAQAB + -----END PUBLIC KEY----- + """ + ).strip() + + +@pytest.fixture +def sig_rsa(): + return dedent( + """ + JQcgQPkK4ys7Nfipn/lZvVvXd4xSRKhgd+Z/zmwB25z9uAXLFB0nVUgIUUk+r2eP8H7LX9lWqNO3 + hNRwkEaEijnnCILW8YezgBXpmKccG7Or50Us3okp52aTvxrCb0BK7CA0h/Tg+aRxNYmM+3RAgttk + QGSKEkEHHZ2X0DEr8BDPb2bm27ghy5HPYB7DeFb+vJBWn2gCHbIaXy3nNCahApU+UnstdB6Cbe78 + A3TmOoPSVsdekV31FYztb3RBsjnlx76/t2zZ7B9BH1HsbFkz4fSblWJR+W36vg9gpKH4Ife82NlM + YOhJapxsaXtAfxDlwMGk7NUacj66d0EJ+NJpUg== + """ + ).strip() + + +@pytest.fixture +def priv_ec(): + return dedent( + """ + -----BEGIN PRIVATE KEY----- + MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgv25d59EOFqxFC07k + Vzlc+7wOwBjTRYxuvMrcVqEeG3WhRANCAAQAJcGq7ad0wtRL9nRf98pQYCBFR96d + gGJ7d6vD9A05h9CmAMiM277rFXEwtFG9JgatDYlETBRVdHRJHmkRECeh + -----END PRIVATE KEY----- + """ + ).strip() + + +@pytest.fixture +def priv_ec_enc(): + return dedent( + """ + -----BEGIN ENCRYPTED PRIVATE KEY----- + MIHsMFcGCSqGSIb3DQEFDTBKMCkGCSqGSIb3DQEFDDAcBAg2tWHDgK+VvQICCAAw + DAYIKoZIhvcNAgkFADAdBglghkgBZQMEASoEEP1nD6+tv5dFqdeEPh+FYPcEgZAq + K7gUEvPm68aprE0jLURTIzB8VlJBpwZMYtWdBH2XBOIVjUms/rOaQvw7JDy3DaOq + w7lpslo2twZj5rkyueXQkULyhMdg9nA2kIjZckApMPvKClRZtVmt9erMrhlstKYw + IY2Nc2sYZeohwoifb+n2vMTg/rCVCCce40KDX5jdxQR7AwAHeL8shosqM4xpvtQ= + -----END ENCRYPTED PRIVATE KEY----- + """ + ).strip() + + +@pytest.fixture +def pub_ec(): + return dedent( + """ + -----BEGIN PUBLIC KEY----- + MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEACXBqu2ndMLUS/Z0X/fKUGAgRUfe + nYBie3erw/QNOYfQpgDIjNu+6xVxMLRRvSYGrQ2JREwUVXR0SR5pERAnoQ== + -----END PUBLIC KEY----- + """ + ).strip() + + +@pytest.fixture +def sig_ec(): + return dedent( + """ + MEYCIQDwWw2fvPO/ZtE+ezTS4voFoEHmg73ehTXwOfPwIygSJAIhAKnWM9PrXxVLTKE0JTogjz8HVXKn2cTD9ozlnnHWQzbS + """ + ).strip() + + +@pytest.fixture +def sig_ec_sha512(): + return dedent( + """ + MEUCIQDu+3SngDQQhxnYGUyFhiiqYfVFKSWnNWfXXW8dmFe0FgIgXo5KByO0h6q8cyz6reQ4GTseWhn6Df+1UpACCCFiGIQ= + """ + ).strip() + + +@pytest.fixture +def priv_ed25519(): + return dedent( + """ + -----BEGIN PRIVATE KEY----- + MC4CAQAwBQYDK2VwBCIEIO6BM6CnCIbOI4WvxFYys+QYjrFZLyrQYRxTqnYZLuEm + -----END PRIVATE KEY----- + """ + ).strip() + + +@pytest.fixture +def pub_ed25519(): + return dedent( + """ + -----BEGIN PUBLIC KEY----- + MCowBQYDK2VwAyEAC5+5Ei58lozexyPknspZWwdONLxJFgKEHwmxpFsc+4k= + -----END PUBLIC KEY----- + """ + ).strip() + + +@pytest.fixture +def sig_ed25519(): + return dedent( + """ + Lo6RH3psCiop68Ryvb8b/Hf3Hkkr2CwBDBoIvuCU22n2rOz3XMJTfQL7FNqsQBhQparsy8zfq4XFf3K2YhQvBQ== + """ + ).strip() + + +@pytest.fixture +def priv_ed448(): + return dedent( + """ + -----BEGIN PRIVATE KEY----- + MEcCAQAwBQYDK2VxBDsEOeHr/G4AyIHMuzQRY4rqX2Z52aE7QebFN1TZZUlHpt30 + taTKZ5B3zV+HEXUtBwPXQLl5Zz1+PKfNCQ== + -----END PRIVATE KEY----- + """ + ).strip() + + +@pytest.fixture +def pub_ed448(): + return dedent( + """ + -----BEGIN PUBLIC KEY----- + MEMwBQYDK2VxAzoAaI9rB2S21l+PWK7KznH4O1G9sjsyzMeIRGolsN4/f9B+gKOh + E+WnN/+tA0YoN+n8GluwOlss1cOA + -----END PUBLIC KEY----- + """ + ).strip() + + +@pytest.fixture +def sig_ed448(): + return dedent( + """ + oQj3viRXHfZdH655AF3Q9j18lHecXdRQf85QKVJo0FHz6ycM/cZ3ptEuMpi3MMYJDHv7MHwcTaQA6/5lRoVMARDaqmaBCHeer+j12KOu/uCK0iuSTvLFN7XmTIC2i0pX6RRE4pVLPo85ELHfE2tovy8A + """ + ).strip() + + +@pytest.fixture +def asymm(modules): + return modules.asymmetric + + +@pytest.mark.parametrize("signed_data", (False, True), indirect=True) +@pytest.mark.parametrize( + "algo", + ( + "rsa", + "ec", + pytest.param("ed25519", marks=pytest.mark.skip_on_fips_enabled_platform), + pytest.param("ed448", marks=pytest.mark.skip_on_fips_enabled_platform), + ), +) +def test_sign(algo, signed_data, request, asymm): + privkey = request.getfixturevalue(f"priv_{algo}") + filename = text = None + if signed_data.startswith("/"): + filename = signed_data + data = Path(filename).read_bytes() + else: + text = signed_data + data = text.encode() + res = asymm.sign(privkey, filename=filename, text=text) + pubkey = request.getfixturevalue(f"pub_{algo}") + util.verify(pubkey, res, data) + + +def test_sign_encrypted_privkey(priv_ec_enc, pub_ec, signed_data, asymm): + res = asymm.sign(priv_ec_enc, text=signed_data, passphrase="hunter1") + util.verify(pub_ec, res, signed_data) + + +@pytest.mark.parametrize("algo", ("rsa", "ec")) +def test_sign_digest(algo, signed_data, request, asymm): + privkey = request.getfixturevalue(f"priv_{algo}") + res = asymm.sign(privkey, text=signed_data, digest="sha512") + pubkey = request.getfixturevalue(f"pub_{algo}") + with pytest.raises(util.InvalidSignature): + util.verify(pubkey, res, signed_data) + util.verify(pubkey, res, signed_data, digest="sha512") + + +@pytest.mark.parametrize("raw", (False, True)) +def test_sign_raw(raw, priv_ec, signed_data, asymm): + res = asymm.sign(priv_ec, text=signed_data, raw=raw) + assert isinstance(res, bytes) is raw + assert isinstance(res, str) is not raw + + +@pytest.mark.parametrize("raw", (False, True)) +def test_sign_path(raw, priv_ec, signed_data, tmp_path, asymm): + out = tmp_path / "out" + res = asymm.sign(priv_ec, text=signed_data, raw=raw, path=str(out)) + assert str(out) in res + data = out.read_bytes() + try: + data.decode() + unicode = True + except UnicodeDecodeError: + unicode = False + assert raw is not unicode + + +def test_sign_bytes(priv_ec, pub_ec, signed_data, asymm): + res = asymm.sign(priv_ec, text=signed_data.encode()) + util.verify(pub_ec, res, signed_data.encode()) + + +@pytest.mark.parametrize("signed_data", (False, True), indirect=True) +@pytest.mark.parametrize( + "algo", + ( + "rsa", + "ec", + pytest.param("ed25519", marks=pytest.mark.skip_on_fips_enabled_platform), + pytest.param("ed448", marks=pytest.mark.skip_on_fips_enabled_platform), + ), +) +def test_verify(algo, signed_data, request, asymm): + pubkey = request.getfixturevalue(f"pub_{algo}") + sig = request.getfixturevalue(f"sig_{algo}") + filename = text = None + if signed_data.startswith("/"): + filename = signed_data + else: + text = signed_data + res = asymm.verify(pubkey=pubkey, filename=filename, text=text, signature=sig) + assert isinstance(res, dict) + assert res["res"] is True + assert "is valid" in res["message"] + + +def test_verify_pub_sig_from_url(pub_ec, sig_ec, signed_data, state_tree, asymm): + with pytest.helpers.temp_file("pub_ec.pem", pub_ec, state_tree): + with pytest.helpers.temp_file("sig_ec", sig_ec, state_tree): + res = asymm.verify( + text=signed_data, pubkey="salt://pub_ec.pem", signature="salt://sig_ec" + ) + assert res["res"] is True + assert "is valid" in res["message"] + + +def test_verify_pub_sig_from_file_url( + pub_ec, sig_ec, signed_data, state_tree, tmp_path, asymm +): + sig = tmp_path / "sig" + pub = tmp_path / "pub" + sig.write_bytes(sig_ec.encode()) + pub.write_bytes(pub_ec.encode()) + res = asymm.verify( + text=signed_data, pubkey=f"file://{pub}", signature=f"file://{sig}" + ) + assert res["res"] is True + assert "is valid" in res["message"] + + +def test_verify_pub_from_url_notfound(pub_ec, sig_ec, signed_data, state_tree, asymm): + with pytest.helpers.temp_file("sig_ec", sig_ec, state_tree): + res = asymm.verify( + text=signed_data, pubkey="salt://pub_ec.pem", signature="salt://sig_ec" + ) + assert res["res"] is False + assert "Failed fetching" in res["message"] + + +def test_verify_sig_from_url_notfound(pub_ec, sig_ec, signed_data, state_tree, asymm): + with pytest.helpers.temp_file("pub_ec.pem", pub_ec, state_tree): + res = asymm.verify( + text=signed_data, pubkey="salt://pub_ec.pem", signature="salt://sig_ec" + ) + assert res["res"] is False + assert "Failed fetching" in res["message"] + + +def test_verify_bytes(pub_ec, sig_ec, signed_data, asymm): + sig = base64.b64decode(sig_ec) + res = asymm.verify(text=signed_data.encode(), pubkey=pub_ec, signature=sig) + assert res["res"] is True + + +def test_verify_fail_wrong_data(pub_ec, sig_ec, signed_data, asymm): + signed_data += "!" + res = asymm.verify(pubkey=pub_ec, signature=sig_ec, text=signed_data) + assert res["res"] is False + assert "Invalid signature" in res["message"] + + +@pytest.mark.parametrize( + "algo", + ( + "rsa", + "ec", + pytest.param("ed25519", marks=pytest.mark.skip_on_fips_enabled_platform), + pytest.param("ed448", marks=pytest.mark.skip_on_fips_enabled_platform), + ), +) +def test_verify_fail_wrong_pubkey(algo, signed_data, request, modules, asymm): + sig = request.getfixturevalue(f"sig_{algo}") + pub = modules.x509.get_public_key(modules.x509.create_private_key(algo)) + res = asymm.verify(pubkey=pub, signature=sig, text=signed_data) + assert res["res"] is False + assert "Invalid signature" in res["message"] + + +@pytest.mark.parametrize("raw", (False, True)) +@pytest.mark.parametrize( + "algo", + ( + "rsa", + "ec", + pytest.param("ed25519", marks=pytest.mark.skip_on_fips_enabled_platform), + pytest.param("ed448", marks=pytest.mark.skip_on_fips_enabled_platform), + ), +) +def test_verify_signature_from_file(raw, algo, signed_data, request, asymm, tmp_path): + pubkey = request.getfixturevalue(f"pub_{algo}") + sig = request.getfixturevalue(f"sig_{algo}") + sig_path = tmp_path / "sig" + if raw: + sig_data = base64.b64decode(sig) + else: + sig_data = sig.encode() + sig_path.write_bytes(sig_data) + + res = asymm.verify(pubkey=pubkey, text=signed_data, signature=str(sig_path)) + assert isinstance(res, dict) + assert res["res"] is True + assert "is valid" in res["message"] + + +def test_verify_digest(pub_ec, signed_data, sig_ec_sha512, asymm): + res = asymm.verify( + pubkey=pub_ec, text=signed_data, signature=sig_ec_sha512, digest="sha512" + ) + assert isinstance(res, dict) + assert res["res"] is True + assert "is valid" in res["message"] + + +@pytest.fixture +def pub_ec1(): + return dedent( + """ + -----BEGIN PUBLIC KEY----- + MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEsxSmSTxtsNiCKtHru74H7L+62nF6 + qGgyw6gGkTBkh56GCPXfhLk7yR67aypZncncWMcJSTPYSo3jSVNEfxAHhw== + -----END PUBLIC KEY----- + """ + ).strip() + + +@pytest.fixture +def sig_ec1(): + return dedent( + """ + MEYCIQCgd+u3FHFrVCFxOgiUtGWeBnB38Vf9U8DkW/A2yqZhoQIhAIFFANHzHqjoTQcCazyCx8imEmchVCPssF9m5FRSnLxD + """ + ).strip() + + +@pytest.fixture +def pub_ec2(): + return dedent( + """ + -----BEGIN PUBLIC KEY----- + MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAErtBZ3qL5m97SzlSwOoxFzzG/1v5a + sLzOIrXykh4yO8tDn4h6JMOe+P0HuoUbENxk4+f/1D9hTEI88rj70bi7Ig== + -----END PUBLIC KEY----- + """ + ).strip() + + +@pytest.fixture +def sig_ec2(): + return dedent( + """ + MEUCIQDRcivGIrzfFv0bZaLpP7cG3DOucRTIcAObez12H9dpuQIgHt56uSCHJqJK8J0EHLOjeunffAyM2Vllnv6zhZPKFjA= + """ + ).strip() + + +@pytest.fixture +def pub_ec3(): + return dedent( + """ + -----BEGIN PUBLIC KEY----- + MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE/cuKGCA0Kj8YBEmBxKtj+jg4Hpy5 + OCN5s7cYUVq3Cl/dVObv3ZbBv7ttct4tWd25f4g46cpIjDwoUXP6IRwKYg== + -----END PUBLIC KEY----- + """ + ).strip() + + +@pytest.fixture +def sig_ec3(): + return dedent( + """ + MEQCIG5IPYRuQSykJYkp3wm9K4dxI12u/raQ1VSGoMP+iEtFAiB0f6NZh7QLlB+OazGUdrgdiQo/YXeQf6zrHOAYNQ0iOg== + """ + ).strip() + + +@pytest.fixture +def sig_notasig(): + return "chaos is a ladder" + + +@pytest.mark.parametrize("signed_data", (False, True), indirect=True) +@pytest.mark.parametrize( + "sig,by,expected", + [ + (["ec1"], ["ec1", "ec2"], True), + (["ec1", "ec3"], ["ec1", "ec2"], True), + (["ec1", "ec3"], ["ec2", "ec3"], True), + (["ec1"], ["ec2", "ec3"], False), + (["notasig"], ["ec1", "ec2", "ec3"], False), + (["notasig", "ec2"], ["ec1", "ec2"], True), + pytest.param( + ["ec2", "rsa", "ed25519", "ed448"], + ["ec1", "ec2"], + True, + marks=pytest.mark.skip_on_fips_enabled_platform, + ), + pytest.param( + ["ec1", "rsa", "ed25519", "ed448"], + ["ec2", "ed448"], + True, + marks=pytest.mark.skip_on_fips_enabled_platform, + ), + pytest.param( + ["ec1", "rsa", "ed25519", "ed448"], + ["ec2", "rsa"], + True, + marks=pytest.mark.skip_on_fips_enabled_platform, + ), + pytest.param( + ["ec1", "rsa", "ed25519", "ed448"], + ["ec2", "ed25519"], + True, + marks=pytest.mark.skip_on_fips_enabled_platform, + ), + pytest.param( + ["ec1", "rsa", "ed448"], + ["ec2", "ed25519"], + False, + marks=pytest.mark.skip_on_fips_enabled_platform, + ), + pytest.param( + ["ec1", "rsa", "ed25519"], + ["ec2", "ed448"], + False, + marks=pytest.mark.skip_on_fips_enabled_platform, + ), + pytest.param( + ["ec1", "ed25519", "ed448"], + ["ec2", "rsa"], + False, + marks=pytest.mark.skip_on_fips_enabled_platform, + ), + pytest.param( + ["ec2", "rsa", "ed25519", "ed448"], + ["ec1"], + False, + marks=pytest.mark.skip_on_fips_enabled_platform, + ), + pytest.param( + ["ec", "rsa", "ed25519", "ed448"], + ["ec1", "ec2", "ec3"], + False, + marks=pytest.mark.skip_on_fips_enabled_platform, + ), + ], +) +def test_verify_signed_by_any(signed_data, sig, by, expected, request, asymm): + sigs = [request.getfixturevalue(f"sig_{key}") for key in sig] + keys = [request.getfixturevalue(f"pub_{key}") for key in by] + filename = text = None + if signed_data.startswith("/"): + filename = signed_data + else: + text = signed_data + res = asymm.verify( + filename=filename, + text=text, + signature=sigs, + signed_by_any=keys, + ) + assert res["res"] is expected + + +def test_verify_signed_by_any_url( + state_tree, signed_data, sig_ec, sig_rsa, pub_rsa, pub_ec, asymm +): + with pytest.helpers.temp_file("pub_rsa.pem", pub_rsa, state_tree): + with pytest.helpers.temp_file("sig_rsa", sig_rsa, state_tree): + res = asymm.verify( + text=signed_data, + signature=[sig_ec, "salt://sig_rsa"], + signed_by_any=["salt://pub_rsa.pem", pub_ec], + ) + assert res["res"] is True + assert "All required keys have provided a signature" in res["message"] + + +def test_verify_signed_by_any_sig_url_fail( + state_tree, signed_data, sig_ec, sig_rsa, pub_rsa, pub_ec, asymm +): + with pytest.helpers.temp_file("pub_rsa.pem", pub_rsa, state_tree): + res = asymm.verify( + text=signed_data, + signature=[sig_ec, "salt://sig_rsa"], + signed_by_any=["salt://pub_rsa.pem", pub_ec], + ) + assert res["res"] is True + assert "All required keys have provided a signature" in res["message"] + + +def test_verify_signed_by_any_pub_url_fail( + state_tree, signed_data, sig_ec, sig_rsa, pub_rsa, pub_ec, asymm +): + with pytest.helpers.temp_file("sig_rsa", sig_rsa, state_tree): + res = asymm.verify( + text=signed_data, + signature=[sig_ec, "salt://sig_rsa"], + signed_by_any=["salt://pub_rsa.pem", pub_ec], + ) + assert res["res"] is True + assert "All required keys have provided a signature" in res["message"] + + +def test_verify_sig_url_all_fail(state_tree, signed_data, pub_rsa, pub_ec, asymm): + with pytest.raises(CommandExecutionError, match="Unable to locate .* signatures"): + asymm.verify( + text=signed_data, + signature=["salt://sig_ec", "salt://sig_rsa"], + signed_by_any=[pub_rsa, pub_ec], + ) + + +def test_verify_pub_url_all_fail( + state_tree, signed_data, sig_rsa, sig_ec, asymm, caplog +): + with caplog.at_level(logging.ERROR): + res = asymm.verify( + text=signed_data, + signature=[sig_ec, sig_rsa], + signed_by_any=["salt://pub_ec", "salt://pub_rsa"], + ) + assert res["res"] is False + for src in ("pub_ec", "pub_rsa"): + assert f"Failed fetching 'salt://{src}'" in caplog.text + + +@pytest.mark.parametrize("signed_data", (False, True), indirect=True) +@pytest.mark.parametrize( + "sig,by,expected", + [ + (["ec1"], ["ec1"], True), + (["ec1", "ec2"], ["ec3"], False), + (["ec1", "ec2"], ["ec1", "ec2"], True), + (["ec1", "ec2"], ["ec2", "ec1"], True), + (["ec1", "ec2"], ["ec2", "ec1", "ec3"], False), + (["ec1", "ec2", "notasig"], ["ec2", "ec1"], True), + pytest.param( + ["ec1", "rsa", "ed25519", "ed448"], + ["ed448", "ec2"], + False, + marks=pytest.mark.skip_on_fips_enabled_platform, + ), + pytest.param( + ["ec1", "rsa", "ed25519", "ed448"], + ["ed25519", "ec2"], + False, + marks=pytest.mark.skip_on_fips_enabled_platform, + ), + pytest.param( + ["ec1", "rsa", "ed25519", "ed448"], + ["rsa", "ec2"], + False, + marks=pytest.mark.skip_on_fips_enabled_platform, + ), + pytest.param( + ["ec1", "rsa", "ed25519", "ed448"], + ["ec1", "ec2"], + False, + marks=pytest.mark.skip_on_fips_enabled_platform, + ), + pytest.param( + ["ec1", "rsa", "ed25519", "ed448", "ec"], + ["rsa", "ed448", "ed25519", "ec1"], + True, + marks=pytest.mark.skip_on_fips_enabled_platform, + ), + pytest.param( + ["ec1", "rsa", "ed25519", "ed448", "ec"], + ["rsa", "ed448", "ed25519", "ec2"], + False, + marks=pytest.mark.skip_on_fips_enabled_platform, + ), + pytest.param( + ["ec1", "rsa", "ed25519", "ed448"], + ["ed448", "ec1", "rsa", "ed25519"], + True, + marks=pytest.mark.skip_on_fips_enabled_platform, + ), + pytest.param( + ["ec1", "rsa", "ed25519", "ed448"], + ["ed25519", "rsa", "ec1"], + True, + marks=pytest.mark.skip_on_fips_enabled_platform, + ), + pytest.param( + ["ec1", "rsa", "ed25519", "ed448", "notasig"], + ["ed25519", "rsa", "ec1"], + True, + marks=pytest.mark.skip_on_fips_enabled_platform, + ), + ], +) +def test_verify_signed_by_all(signed_data, sig, by, expected, request, asymm): + sigs = [request.getfixturevalue(f"sig_{key}") for key in sig] + keys = [request.getfixturevalue(f"pub_{key}") for key in by] + filename = text = None + if signed_data.startswith("/"): + filename = signed_data + else: + text = signed_data + res = asymm.verify( + filename=filename, + text=text, + signature=sigs, + signed_by_all=keys, + ) + assert res["res"] is expected + + +@pytest.mark.parametrize("signed_data", (False, True), indirect=True) +@pytest.mark.parametrize( + "sig,any_by,all_by,expected", + [ + (["ec1", "ec2"], ["ec2", "ec3"], ["ec1"], True), + (["ec1", "ec2"], ["ec", "ec3"], ["ec1", "ec2"], False), + (["ec1", "ec2"], ["ec1", "ec3"], ["ec2", "ec3"], False), + (["ec1", "ec2", "ec3"], ["ec1", "ec3"], ["ec2", "ec3"], True), + pytest.param( + ["ec1", "ec2", "ec3", "rsa", "ed25519", "ed448"], + ["ec", "rsa"], + ["ed25519", "ed448", "ec2"], + True, + marks=pytest.mark.skip_on_fips_enabled_platform, + ), + pytest.param( + ["ec1", "ec2", "ec3", "rsa", "ed25519", "ed448"], + ["ec"], + ["ed25519", "ed448", "ec2", "rsa"], + False, + marks=pytest.mark.skip_on_fips_enabled_platform, + ), + pytest.param( + ["ec1", "ec2", "ec3", "rsa", "ed25519", "ed448"], + ["ec", "ec2"], + ["ed25519", "ed448", "ec2", "rsa"], + True, + marks=pytest.mark.skip_on_fips_enabled_platform, + ), + ], +) +def test_verify_signed_by_any_and_all( + signed_data, sig, any_by, all_by, expected, request, asymm +): + sigs = [request.getfixturevalue(f"sig_{key}") for key in sig] + any_keys = [request.getfixturevalue(f"pub_{key}") for key in any_by] + all_keys = [request.getfixturevalue(f"pub_{key}") for key in all_by] + filename = text = None + if signed_data.startswith("/"): + filename = signed_data + else: + text = signed_data + res = asymm.verify( + filename=filename, + text=text, + signature=sigs, + signed_by_any=any_keys, + signed_by_all=all_keys, + ) + assert res["res"] is expected + + +def test_signed_by_any_notalist(pub_ec, sig_ec, signed_data, asymm): + res = asymm.verify(text=signed_data, signature=sig_ec, signed_by_any=pub_ec) + assert res["res"] is True + + +def test_signed_by_all_notalist(pub_ec, sig_ec, signed_data, asymm): + res = asymm.verify(text=signed_data, signature=sig_ec, signed_by_all=pub_ec) + assert res["res"] is True + + +@pytest.mark.parametrize( + "params,err", + [ + ( + ("text", "filename", "pubkey", "signature"), + "`text` and `filename` arguments are mutually exclusive", + ), + ( + ("text", "pubkey"), + "Missing `signature` parameter", + ), + ( + ("text", "signature"), + r"Missing pubkey\(s\) to check against", + ), + ( + ("signature", "pubkey"), + "Missing data to verify.*", + ), + ( + ("text", "pubkey", "signed_by_all", "signature"), + r"Either specify pubkey \+ signature or signed_by_\(any\|all\)", + ), + ( + ("text", "pubkey", "signed_by_any", "signature"), + r"Either specify pubkey \+ signature or signed_by_\(any\|all\)", + ), + ], +) +def test_verify_parameter_validation( + params, err, signed_data, pub_ec, sig_ec, tmp_path, asymm +): + kwargs = {} + if "text" in params: + kwargs["text"] = sig_ec + if "filename" in params: + data = tmp_path / "data" + data.write_text(signed_data) + kwargs["filename"] = str(data) + if "pubkey" in params: + kwargs["pubkey"] = pub_ec + if "signature" in params: + kwargs["signature"] = sig_ec + if "signed_by_any" in params: + kwargs["signed_by_any"] = [pub_ec] + if "signed_by_all" in params: + kwargs["signed_by_all"] = [pub_ec] + with pytest.raises(SaltInvocationError, match=err): + asymm.verify(**kwargs) + + +def test_verify_single_pubkey_no_signature_list(pub_ec, sig_ec, signed_data, asymm): + with pytest.raises( + SaltInvocationError, + match=".*must be a string or bytes when verifying a single.*", + ): + asymm.verify(text=signed_data, pubkey=pub_ec, signature=[sig_ec]) + + +def test_verify_missing_filename(pub_ec, sig_ec, tmp_path, asymm): + with pytest.raises(CommandExecutionError, match="Path .*does not exist.*"): + asymm.verify( + filename=str(tmp_path / "missing"), pubkey=pub_ec, signature=sig_ec + )