Address review remarks

This commit is contained in:
jeanluc 2022-12-08 19:25:21 +01:00 committed by Megan Wilhite
parent dca6305064
commit 073fc0e7a6
8 changed files with 280 additions and 283 deletions

View file

@ -490,9 +490,9 @@ def create_certificate(
"sha256, sha384, sha512, sha512_224, sha512_256, sha3_224, sha3_256, "
"sha3_384, sha3_512"
)
if "der" == encoding and append_certs:
if encoding == "der" and append_certs:
raise SaltInvocationError("Cannot encode a certificate chain in DER")
if "pkcs12" == encoding and "private_key" not in kwargs:
if encoding == "pkcs12" and "private_key" not in kwargs:
# The creation will work, but it will be listed in additional certs, not
# as the main certificate. This might confuse other parts of the code.
raise SaltInvocationError(
@ -512,7 +512,7 @@ def create_certificate(
x509util.merge_signing_policy(_get_signing_policy(signing_policy), kwargs)
cert, private_key_loaded = _create_certificate_local(**kwargs)
if "pkcs12" == encoding:
if encoding == "pkcs12":
return encode_certificate(
cert,
append_certs=append_certs,
@ -645,13 +645,13 @@ def encode_certificate(
f"Invalid value '{encoding}' for encoding. Valid: "
"der, pem, pkcs7_der, pkcs7_pem, pkcs12"
)
if "der" == encoding and append_certs:
if encoding == "der" and append_certs:
raise SaltInvocationError("Cannot encode a certificate chain in DER")
if "pkcs12" != encoding and private_key:
if encoding != "pkcs12" and private_key:
raise SaltInvocationError(
"Embedding private keys is only supported for pkcs12 encoding"
)
if "pkcs12" == encoding and not private_key:
if encoding == "pkcs12" and not private_key:
# The creation will work, but it will be listed in additional certs, not
# as the main certificate. This might confuse other parts of the code.
raise SaltInvocationError(
@ -672,7 +672,7 @@ def encode_certificate(
for append_cert in append_certs:
# this can only happen for PEM, checked in the beginning
crt_bytes += b"\n" + append_cert.public_bytes(crt_encoding)
elif "pkcs12" == encoding:
elif encoding == "pkcs12":
private_key = x509util.load_privkey(
private_key, passphrase=private_key_passphrase
)
@ -707,7 +707,7 @@ def encode_certificate(
crt_bytes = serialization.pkcs7.serialize_certificates(
[cert] + append_certs,
encoding=getattr(
serialization.Encoding, "PEM" if "pkcs7_pem" == encoding else "DER"
serialization.Encoding, "PEM" if encoding == "pkcs7_pem" else "DER"
),
)
except AttributeError as err:
@ -914,7 +914,7 @@ def encode_crl(crl, encoding="pem"):
crl_encoding = getattr(serialization.Encoding, encoding.upper())
crl_bytes = crl.public_bytes(crl_encoding)
if "pem" == encoding:
if encoding == "pem":
return crl_bytes.decode()
return base64.b64encode(crl_bytes).decode()
@ -1019,7 +1019,7 @@ def encode_csr(csr, encoding="pem"):
csr_encoding = getattr(serialization.Encoding, encoding.upper())
csr_bytes = csr.public_bytes(csr_encoding)
if "pem" == encoding:
if encoding == "pem":
return csr_bytes.decode()
return base64.b64encode(csr_bytes).decode()
@ -1107,7 +1107,7 @@ def encode_private_key(
else:
if isinstance(passphrase, str):
passphrase = passphrase.encode()
if "pkcs12" == encoding and pkcs12_encryption_compat:
if encoding == "pkcs12" and pkcs12_encryption_compat:
cipher = (
serialization.PrivateFormat.PKCS12.encryption_builder()
.kdf_rounds(50000)
@ -1132,7 +1132,7 @@ def encode_private_key(
name=None, key=private_key, cert=None, cas=None, encryption_algorithm=cipher
)
if "pem" == encoding:
if encoding == "pem":
return pk_bytes.decode()
return base64.b64encode(pk_bytes).decode()
@ -1833,13 +1833,13 @@ def _valid_pem(pem, pem_type=None):
def _generate_pk(algo="rsa", keysize=None):
if "rsa" == algo:
if algo == "rsa":
return x509util.generate_rsa_privkey(keysize=keysize or 2048)
if "ec" == algo:
if algo == "ec":
return x509util.generate_ec_privkey(keysize=keysize or 256)
if "ed25519" == algo:
if algo == "ed25519":
return x509util.generate_ed25519_privkey()
if "ed448" == algo:
if algo == "ed448":
return x509util.generate_ed448_privkey()
raise SaltInvocationError(
f"Invalid algorithm specified for generating private key: {algo}. Valid: "
@ -1888,7 +1888,7 @@ def _get_name_hash(name, digest="sha1"):
hsh = hashes.Hash(x509util.get_hashing_algorithm(digest))
hsh.update(name.public_bytes())
res = hsh.finalize()[:4]
if "little" == sys.byteorder:
if sys.byteorder == "little":
res = res[::-1]
return res

View file

@ -446,9 +446,9 @@ def certificate_managed(
else:
raise
else:
if current_encoding != encoding:
if encoding != current_encoding:
changes["encoding"] = encoding
elif "pkcs12" == encoding and current_extra.cert.friendly_name != (
elif encoding == "pkcs12" and current_extra.cert.friendly_name != (
salt.utils.stringutils.to_bytes(pkcs12_friendlyname)
if pkcs12_friendlyname
else None
@ -551,7 +551,7 @@ def certificate_managed(
"pkcs12_friendlyname",
}:
# do not reissue if only metaparameters changed
if "pkcs12" == encoding:
if encoding == "pkcs12":
cert = __salt__["x509.encode_certificate"](
current,
append_certs=append_certs,
@ -603,7 +603,7 @@ def certificate_managed(
)
if not changes or encoding in ["pem", "pkcs7_pem"]:
replace = encoding in ["pem", "pkcs7_pem"] and changes
replace = bool(encoding in ["pem", "pkcs7_pem"] and changes)
contents = cert if replace else None
file_managed_ret = _file_managed(
name, contents=contents, replace=replace, **file_args
@ -814,7 +814,7 @@ def crl_managed(
name,
)
if current_encoding != encoding:
if encoding != current_encoding:
changes["encoding"] = encoding
if days_remaining and (
current.next_update
@ -902,7 +902,7 @@ def crl_managed(
extensions=extensions,
)
ret["comment"] = f"The certificate revocation list has been {verb}d"
if "der" == encoding:
if encoding == "der":
# file.managed does not support binary contents, so create
# an empty file first (makedirs). This will not work with check_cmd!
file_managed_ret = _file_managed(name, replace=False, **file_args)
@ -913,7 +913,7 @@ def crl_managed(
real_name, base64.b64decode(crl), file_args.get("backup", "")
)
if not changes or "pem" == encoding:
if not changes or encoding == "pem":
replace = bool((encoding == "pem") and changes)
contents = crl if replace else None
file_managed_ret = _file_managed(
@ -1048,7 +1048,7 @@ def csr_managed(
name,
)
if current_encoding != encoding:
if encoding != current_encoding:
changes["encoding"] = encoding
builder, privkey = x509util.build_csr(
@ -1102,7 +1102,7 @@ def csr_managed(
**csr_args,
)
ret["comment"] = f"The certificate signing request has been {verb}d"
if "der" == encoding:
if encoding == "der":
# file.managed does not support binary contents, so create
# an empty file first (makedirs). This will not work with check_cmd!
file_managed_ret = _file_managed(name, replace=False, **file_args)
@ -1112,7 +1112,7 @@ def csr_managed(
_safe_atomic_write(
real_name, base64.b64decode(csr), file_args.get("backup", "")
)
if not changes or "pem" == encoding:
if not changes or encoding == "pem":
replace = bool((encoding == "pem") and changes)
contents = csr if replace else None
file_managed_ret = _file_managed(
@ -1320,16 +1320,16 @@ def private_key_managed(
key_type = x509util.get_key_type(current)
check_keysize = keysize
if check_keysize is None:
if "rsa" == algo:
if algo == "rsa":
check_keysize = 2048
elif "ec" == algo:
elif algo == "ec":
check_keysize = 256
if any(
(
("rsa" == algo and not x509util.KEY_TYPE.RSA == key_type),
("ec" == algo and not x509util.KEY_TYPE.EC == key_type),
("ed25519" == algo and not x509util.KEY_TYPE.ED25519 == key_type),
("ed448" == algo and not x509util.KEY_TYPE.ED448 == key_type),
(algo == "rsa" and not key_type == x509util.KEY_TYPE.RSA),
(algo == "ec" and not key_type == x509util.KEY_TYPE.EC),
(algo == "ed25519" and not key_type == x509util.KEY_TYPE.ED25519),
(algo == "ed448" and not key_type == x509util.KEY_TYPE.ED448),
)
):
changes["algo"] = algo
@ -1339,7 +1339,7 @@ def private_key_managed(
and current.key_size != check_keysize
):
changes["keysize"] = keysize
if current_encoding != encoding:
if encoding != current_encoding:
changes["encoding"] = encoding
elif file_exists and new:
changes["replaced"] = name
@ -1383,7 +1383,7 @@ def private_key_managed(
pkcs12_encryption_compat=pkcs12_encryption_compat,
)
ret["comment"] = f"The private key has been {verb}d"
if "pem" != encoding:
if encoding != "pem":
# file.managed does not support binary contents, so create
# an empty file first (makedirs). This will not work with check_cmd!
file_managed_ret = _file_managed(name, replace=False, **file_args)
@ -1394,7 +1394,7 @@ def private_key_managed(
real_name, base64.b64decode(pk), file_args.get("backup", "")
)
if not changes or "pem" == encoding:
if not changes or encoding == "pem":
replace = bool((encoding == "pem") and changes)
contents = pk if replace else None
file_managed_ret = _file_managed(
@ -1508,7 +1508,7 @@ def _compare_cert(current, builder, signing_cert, serial_number, not_before, not
changes["serial_number"] = serial_number
if not x509util.match_pubkey(
current.public_key(), _getattr_safe(builder, "_public_key")
_getattr_safe(builder, "_public_key"), current.public_key()
):
changes["private_key"] = True
@ -1644,7 +1644,7 @@ def _compare_ca_chain(current, new):
if not len(current) == len(new):
return False
for i, new_cert in enumerate(new):
if current[i].fingerprint(hashes.SHA256()) != new_cert.fingerprint(
if new_cert.fingerprint(hashes.SHA256()) != current[i].fingerprint(
hashes.SHA256()
):
return False

View file

@ -931,11 +931,11 @@ def verify_signature(cert, pubkey):
since it does not imply the certificate chain is valid.
"""
key_type = get_key_type(pubkey)
if KEY_TYPE.RSA == key_type:
if key_type == KEY_TYPE.RSA:
try:
# SignatureAlgorithmOID is not present in older versions,
# otherwise cx509.SignatureAlgorithmOID.RSASSA_PSS could be used
if "1.2.840.113549.1.1.10" == cert.signature_algorithm_oid.dotted_string:
if cert.signature_algorithm_oid.dotted_string == "1.2.840.113549.1.1.10":
pubkey.verify(
cert.signature,
cert.tbs_certificate_bytes,
@ -954,7 +954,7 @@ def verify_signature(cert, pubkey):
return True
except InvalidSignature:
return False
if KEY_TYPE.EC == key_type:
if key_type == KEY_TYPE.EC:
try:
pubkey.verify(
cert.signature,
@ -977,7 +977,7 @@ def verify_signature(cert, pubkey):
def isfile(path):
"""
A wrapper around os.path.isfile that ignores ValueError exception,s which
A wrapper around os.path.isfile that ignores ValueError exceptions which
can be raised if the input to isfile is too long.
"""
try:
@ -1036,7 +1036,7 @@ def load_file_or_bytes(fob):
def _create_extension(name, val, subject_pubkey=None, ca_crt=None, ca_pub=None):
if "basicConstraints" == name:
if name == "basicConstraints":
try:
critical = val.get("critical", False)
except AttributeError:
@ -1063,7 +1063,7 @@ def _create_extension(name, val, subject_pubkey=None, ca_crt=None, ca_pub=None):
except (TypeError, ValueError) as err:
raise SaltInvocationError(err) from err
if "keyUsage" == name:
if name == "keyUsage":
critical = "critical" in val
args = {
"digital_signature": "digitalSignature" in val,
@ -1081,7 +1081,7 @@ def _create_extension(name, val, subject_pubkey=None, ca_crt=None, ca_pub=None):
except ValueError as err:
raise SaltInvocationError(err) from err
if "extendedKeyUsage" == name:
if name == "extendedKeyUsage":
critical = "critical" in val
if isinstance(val, str):
val, critical = _deserialize_openssl_confstring(val)
@ -1090,17 +1090,17 @@ def _create_extension(name, val, subject_pubkey=None, ca_crt=None, ca_pub=None):
val = [val]
usages = []
for usage in val:
if "critical" == usage:
if usage == "critical":
continue
usages.append(EXTENDED_KEY_USAGE_OID.get(usage) or _get_oid(str(usage)))
return cx509.ExtendedKeyUsage(usages), critical
if "subjectKeyIdentifier" == name:
if name == "subjectKeyIdentifier":
if "critical" in val:
raise SaltInvocationError(
"subjectKeyIdentifier must be marked as non-critical"
)
if "hash" == val:
if val == "hash":
if not subject_pubkey:
raise RuntimeError(
"Cannot calculate digest for subjectKeyIdentifier: missing pubkey"
@ -1124,7 +1124,7 @@ def _create_extension(name, val, subject_pubkey=None, ca_crt=None, ca_pub=None):
# this must be marked as non-critical
return cx509.SubjectKeyIdentifier(val), False
if "authorityKeyIdentifier" == name:
if name == "authorityKeyIdentifier":
if "critical" in val:
raise SaltInvocationError(
"authorityKeyIdentifier must be marked as non-critical"
@ -1165,7 +1165,7 @@ def _create_extension(name, val, subject_pubkey=None, ca_crt=None, ca_pub=None):
except Exception: # pylint: disable=broad-except
pass
if "always" == val["keyid"] and args["key_identifier"] is None:
if val["keyid"] == "always" and args["key_identifier"] is None:
raise CommandExecutionError(
"Could not retrieve authorityKeyIdentifier keyid, but it was set to always"
)
@ -1181,7 +1181,7 @@ def _create_extension(name, val, subject_pubkey=None, ca_crt=None, ca_pub=None):
args["authority_cert_issuer"] = args[
"authority_cert_serial_number"
] = None
if "always" == val["issuer"]:
if val["issuer"] == "always":
raise CommandExecutionError(
"Could not add authority_cert_issuer and "
"authority_cert_serial_number, but was set to always."
@ -1208,7 +1208,7 @@ def _create_extension(name, val, subject_pubkey=None, ca_crt=None, ca_pub=None):
val, critical = _deserialize_openssl_confstring(val, multiple=True)
val = tuple(val)
parsed = []
if any(("issuer", "copy") == x for x in val):
if any(x == ("issuer", "copy") for x in val):
if not ca_crt:
raise RuntimeError("Need CA certificate to copy to issuerAltName")
try:
@ -1229,17 +1229,17 @@ def _create_extension(name, val, subject_pubkey=None, ca_crt=None, ca_pub=None):
"internal API that the issuer:copy functionality relies on"
) from err
parsed.extend(_parse_general_names(val))
if "certificateIssuer" == name:
if name == "certificateIssuer":
return cx509.CertificateIssuer(parsed), critical
return cx509.IssuerAlternativeName(parsed), critical
if "authorityInfoAccess" == name:
if name == "authorityInfoAccess":
if isinstance(val, str):
val = (
x.strip().split(";") for x in val.split(",") if "critical" != x.strip()
x.strip().split(";") for x in val.split(",") if x.strip() != "critical"
)
elif isinstance(val, dict):
val = ((k, v) for k, v in val.items() if "critical" != k)
val = ((k, v) for k, v in val.items() if k != "critical")
elif isinstance(val, list):
val = ((k, v) for x in val for k, v in x.items() if x != "critical")
@ -1253,7 +1253,7 @@ def _create_extension(name, val, subject_pubkey=None, ca_crt=None, ca_pub=None):
parsed.append(cx509.AccessDescription(oid, general_name))
return cx509.AuthorityInformationAccess(parsed), False # always noncritical
if "subjectAltName" == name:
if name == "subjectAltName":
# subjectAltName must be marked as critical if subject is empty
critical = "critical" in val
if isinstance(val, list):
@ -1323,11 +1323,11 @@ def _create_extension(name, val, subject_pubkey=None, ca_crt=None, ca_pub=None):
)
except (ValueError, TypeError) as err:
raise SaltInvocationError(err) from err
if "freshestCRL" == name:
if name == "freshestCRL":
return cx509.FreshestCRL(parsed), False # must be non-critical
return cx509.CRLDistributionPoints(parsed), critical
if "issuingDistributionPoint" == name:
if name == "issuingDistributionPoint":
if not isinstance(val, dict):
raise SaltInvocationError("issuingDistributionPoint must be a dictionary")
critical = val.get("critical", False)
@ -1368,14 +1368,14 @@ def _create_extension(name, val, subject_pubkey=None, ca_crt=None, ca_pub=None):
except (ValueError, TypeError) as err:
raise SaltInvocationError(err) from err
if "certificatePolicies" == name:
if name == "certificatePolicies":
if isinstance(val, str):
try:
critical = val.startswith("critical")
policy_identifiers = (
_get_oid(x.strip())
for x in val.split(",")
if "critical" != x.strip()
if x.strip() != "critical"
)
policy_information = [
cx509.PolicyInformation(policy_identifier=p, policy_qualifiers=None)
@ -1390,7 +1390,7 @@ def _create_extension(name, val, subject_pubkey=None, ca_crt=None, ca_pub=None):
critical = val.get("critical", False)
parsed = []
for polid, qualifiers in val.items():
if "critical" == polid:
if polid == "critical":
continue
parsed_qualifiers = []
for qual in qualifiers:
@ -1420,7 +1420,7 @@ def _create_extension(name, val, subject_pubkey=None, ca_crt=None, ca_pub=None):
)
return cx509.CertificatePolicies(parsed), critical
if "policyConstraints" == name:
if name == "policyConstraints":
critical = "critical" in val
if isinstance(val, str):
val, critical = _deserialize_openssl_confstring(val)
@ -1439,7 +1439,7 @@ def _create_extension(name, val, subject_pubkey=None, ca_crt=None, ca_pub=None):
except (TypeError, ValueError) as err:
raise SaltInvocationError(err) from err
if "inhibitAnyPolicy" == name:
if name == "inhibitAnyPolicy":
critical = "critical" in val if not isinstance(val, int) else False
if isinstance(val, str):
val, critical = _deserialize_openssl_confstring(val)
@ -1456,7 +1456,7 @@ def _create_extension(name, val, subject_pubkey=None, ca_crt=None, ca_pub=None):
except (TypeError, ValueError) as err:
raise SaltInvocationError(err) from err
if "nameConstraints" == name:
if name == "nameConstraints":
critical = "critical" in val
if isinstance(val, dict):
parsed = {}
@ -1476,10 +1476,10 @@ def _create_extension(name, val, subject_pubkey=None, ca_crt=None, ca_pub=None):
items = tuple(x.strip().split(";") for x in val.split(","))
val = {
"permitted": [
x[1].split(":", maxsplit=1) for x in items if "permitted" == x[0]
x[1].split(":", maxsplit=1) for x in items if x[0] == "permitted"
],
"excluded": [
x[1].split(":", maxsplit=1) for x in items if "excluded" == x[0]
x[1].split(":", maxsplit=1) for x in items if x[0] == "excluded"
],
}
args = {
@ -1494,26 +1494,26 @@ def _create_extension(name, val, subject_pubkey=None, ca_crt=None, ca_pub=None):
raise SaltInvocationError("nameConstraints needs at least one definition")
return cx509.NameConstraints(**args), critical
if "noCheck" == name:
if name == "noCheck":
return cx509.OCSPNoCheck(), "critical" in str(val)
if "tlsfeature" == name:
if name == "tlsfeature":
if isinstance(val, str):
val = [x.strip() for x in val.split(",")]
critical = "critical" in val
try:
types = [getattr(cx509.TLSFeatureType, x) for x in val if "critical" != x]
types = [getattr(cx509.TLSFeatureType, x) for x in val if x != "critical"]
except ValueError as err:
raise SaltInvocationError(err) from err
return cx509.TLSFeature(types), critical
if "nsComment" == name:
if name == "nsComment":
raise SaltInvocationError("nsComment is currently not implemented.")
if "nsCertType" == name:
if name == "nsCertType":
raise SaltInvocationError("nsCertType is currently not implemented.")
if "cRLNumber" == name:
if name == "cRLNumber":
try:
return cx509.CRLNumber(int(val)), False
except ValueError as err:
@ -1521,7 +1521,7 @@ def _create_extension(name, val, subject_pubkey=None, ca_crt=None, ca_pub=None):
"cRLNumber must be an integer and must be marked as non-critical"
) from err
if "deltaCRLIndicator" == name:
if name == "deltaCRLIndicator":
critical = "critical" in str(val)
val = re.findall(r"[\d]+", str(val))
if len(val) != 1:
@ -1530,21 +1530,21 @@ def _create_extension(name, val, subject_pubkey=None, ca_crt=None, ca_pub=None):
)
return cx509.DeltaCRLIndicator(int(val[0])), critical
if "CRLReason" == name:
if name == "CRLReason":
critical = False
if isinstance(val, str):
val, critical = _deserialize_openssl_confstring(val)
else:
if "critical" in val:
critical = True
val = [x for x in val if "critical" != x]
val = [x for x in val if x != "critical"]
try:
return cx509.CRLReason(cx509.ReasonFlags(next(iter(val)))), critical
except ValueError as err:
raise SaltInvocationError(str(err)) from err
if "invalidityDate" == name:
if name == "invalidityDate":
if not isinstance(val, str):
raise SaltInvocationError("invalidityDate must be a string")
critical = val.startswith("critical")
@ -1617,11 +1617,11 @@ def _parse_general_names(val):
parsed = []
for typ, v in val:
typ = typ.lower()
if "dirname" == typ:
if typ == "dirname":
v = _get_dn(v)
elif "rid" == typ:
elif typ == "rid":
v = _get_oid(v)
elif "ip" == typ:
elif typ == "ip":
try:
v = ipaddress.ip_address(v)
except ValueError:
@ -1631,7 +1631,7 @@ def _parse_general_names(val):
raise CommandExecutionError(
f"Provided value {v} does not seem to be an IP address or network range."
) from err
elif "email" == typ:
elif typ == "email":
splits = v.rsplit("@", maxsplit=1)
if len(splits) > 1:
user, domain = splits
@ -1639,16 +1639,16 @@ def _parse_general_names(val):
v = "@".join((user, domain))
else:
v = idna_encode(splits[0], allow_leading_dot=True)
elif "uri" == typ:
elif typ == "uri":
url = urlparse(v)
if url.netloc:
domain = idna_encode(url.netloc)
v = urlunparse(
(url.scheme, domain, url.path, url.params, url.query, url.fragment)
)
elif "dns" == typ:
elif typ == "dns":
v = idna_encode(v, allow_leading_dot=True)
elif "othername" == typ:
elif typ == "othername":
raise SaltInvocationError("otherName is currently not implemented")
if typ in valid_types:
try:
@ -1920,7 +1920,7 @@ def render_extension(ext):
policies = []
for policy in ext.value._policies:
polid = policy.policy_identifier._name
if "Unknown OID" == polid:
if polid == "Unknown OID":
polid = policy.policy_identifier.dotted_string
qualifiers = []
for notice in policy.policy_qualifiers or []:

View file

@ -54,15 +54,12 @@ def minion_config_overrides():
}
@pytest.fixture()
@pytest.fixture
def x509(loaders, modules):
try:
yield modules.x509
finally:
pass
yield modules.x509
@pytest.fixture()
@pytest.fixture
def ca_cert():
return """\
-----BEGIN CERTIFICATE-----
@ -88,7 +85,7 @@ LN1w5sybsYwIw6QN
"""
@pytest.fixture()
@pytest.fixture
def ca_key():
return """\
-----BEGIN RSA PRIVATE KEY-----
@ -120,7 +117,7 @@ HdI7Pfaf/l0HozAw/Al+LXbpmSBdfmz0U/EGAKRqXMW5+vQ7XHXD
-----END RSA PRIVATE KEY-----"""
@pytest.fixture()
@pytest.fixture
def ca_key_enc():
return """\
-----BEGIN ENCRYPTED PRIVATE KEY-----
@ -155,7 +152,7 @@ A62orBDc+8x+AehfwYSm11dz5/P6aL3QZf+tzr05vbVn
-----END ENCRYPTED PRIVATE KEY-----"""
@pytest.fixture()
@pytest.fixture
def rsa_privkey():
return """\
-----BEGIN RSA PRIVATE KEY-----
@ -187,7 +184,7 @@ DYRTDIS9eg2LF4B64hZvkCLTmP4rLJWdRnWrLosIC4rD1uWgGayC
-----END RSA PRIVATE KEY-----"""
@pytest.fixture()
@pytest.fixture
def rsa_privkey_enc():
return """\
-----BEGIN ENCRYPTED PRIVATE KEY-----
@ -222,7 +219,7 @@ ahZPgPpP2p2uAz1+9MHpVPo2EIrvibm5T89DznwuaEfe
-----END ENCRYPTED PRIVATE KEY-----"""
@pytest.fixture()
@pytest.fixture
def rsa_pubkey():
return """\
-----BEGIN PUBLIC KEY-----
@ -236,7 +233,7 @@ ye1mdbD5KVBgJ9MArc2tJ3rmB0lxjEbAhTEHrNnIkDOJCKE8TaQOW4RyVWlIvSEL
-----END PUBLIC KEY-----"""
@pytest.fixture()
@pytest.fixture
def csr():
return """\
-----BEGIN CERTIFICATE REQUEST-----
@ -256,7 +253,7 @@ q1HXd62bA8k27ukX7w8qWsk6fOTwPh5F3883L5jVqcRsL9pqb4RUugTh/aReVlKW
-----END CERTIFICATE REQUEST-----"""
@pytest.fixture()
@pytest.fixture
def ec_privkey():
return """\
-----BEGIN PRIVATE KEY-----
@ -266,7 +263,7 @@ ldlNqU8U1Lz3ckCGI3TdGZ6nPaL3IT/UNH6C+J86RWSLY18hFHXoeKBD
-----END PRIVATE KEY-----"""
@pytest.fixture()
@pytest.fixture
def ec_pubkey():
return """\
-----BEGIN PUBLIC KEY-----
@ -275,7 +272,7 @@ MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEvuZnhksFYiq0UNzYtXH2tPFhuXSv
-----END PUBLIC KEY-----"""
@pytest.fixture()
@pytest.fixture
def ed25519_privkey():
return """\
-----BEGIN PRIVATE KEY-----
@ -283,7 +280,7 @@ MC4CAQAwBQYDK2VwBCIEIFKFjPIOBze2eo9x/EiCL0ni5GacaKIRZdfREBfuEdE9
-----END PRIVATE KEY-----"""
@pytest.fixture()
@pytest.fixture
def ed25519_pubkey():
return """\
-----BEGIN PUBLIC KEY-----
@ -291,7 +288,7 @@ MCowBQYDK2VwAyEAK+1yspaj/3Mb2K7H6y2d0Y+udSF+7sBozMY4aKUBR0I=
-----END PUBLIC KEY-----"""
@pytest.fixture()
@pytest.fixture
def ed448_privkey():
return """\
-----BEGIN PRIVATE KEY-----
@ -300,7 +297,7 @@ Y2QKHSRG0M1ZUFr/EYH9F9mTgnAwmyp7oA==
-----END PRIVATE KEY-----"""
@pytest.fixture()
@pytest.fixture
def ed448_pubkey():
return """\
-----BEGIN PUBLIC KEY-----
@ -309,7 +306,7 @@ vHv0+Ke3LRlEzGbwroKtP66opn4A
-----END PUBLIC KEY-----"""
@pytest.fixture()
@pytest.fixture
def cert_exts():
return """
-----BEGIN CERTIFICATE-----
@ -339,7 +336,7 @@ IiC+2Um3mhImnIoeRxH/cXTABsOrSE+QzIv7Z3orIUxyMqtm
-----END CERTIFICATE-----"""
@pytest.fixture()
@pytest.fixture
def csr_exts():
return """\
-----BEGIN CERTIFICATE REQUEST-----
@ -367,7 +364,7 @@ O68=
-----END CERTIFICATE REQUEST-----"""
@pytest.fixture()
@pytest.fixture
def cert_exts_read():
return {
"extensions": {
@ -460,7 +457,7 @@ def cert_exts_read():
}
@pytest.fixture()
@pytest.fixture
def csr_exts_read():
return {
"extensions": {
@ -526,7 +523,7 @@ def csr_exts_read():
}
@pytest.fixture()
@pytest.fixture
def crl():
return """\
-----BEGIN X509 CRL-----
@ -542,7 +539,7 @@ zfEPMyxWSMAqcsjSQ+MuF3KCdtaWAk7xTYpBafvRK4pC
-----END X509 CRL-----"""
@pytest.fixture()
@pytest.fixture
def crl_all():
return """\
-----BEGIN X509 CRL-----
@ -566,7 +563,7 @@ WcBGtev/8VsUijyjsM072C6Ut5TwNyrrthb952+eKlmxLNgT0o5hVYxjXhtwLQsL
-----END X509 CRL-----"""
@pytest.fixture()
@pytest.fixture
def crl_args(tmp_path, ca_cert, ca_key):
return {
"signing_private_key": ca_key,
@ -575,7 +572,7 @@ def crl_args(tmp_path, ca_cert, ca_key):
}
@pytest.fixture()
@pytest.fixture
def crl_args_exts():
return {
"authorityKeyIdentifier": "keyid:always",
@ -590,7 +587,7 @@ def crl_args_exts():
}
@pytest.fixture()
@pytest.fixture
def crl_revoked():
return [
{
@ -671,7 +668,7 @@ def test_create_certificate_self_signed(x509, algo, request):
res = x509.create_certificate(signing_private_key=privkey, CN="success")
assert res.startswith("-----BEGIN CERTIFICATE-----")
cert = _get_cert(res)
assert "CN=success" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == "CN=success"
@pytest.mark.parametrize("algo", ["rsa", "ec", "ed25519", "ed448"])
@ -685,7 +682,7 @@ def test_create_certificate_from_privkey(x509, ca_key, ca_cert, algo, request):
)
assert res.startswith("-----BEGIN CERTIFICATE-----")
cert = _get_cert(res)
assert "CN=success" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == "CN=success"
def test_create_certificate_from_encrypted_privkey(
@ -700,7 +697,7 @@ def test_create_certificate_from_encrypted_privkey(
)
assert res.startswith("-----BEGIN CERTIFICATE-----")
cert = _get_cert(res)
assert "CN=success" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == "CN=success"
def test_create_certificate_from_encrypted_privkey_with_encrypted_privkey(
@ -716,7 +713,7 @@ def test_create_certificate_from_encrypted_privkey_with_encrypted_privkey(
)
assert res.startswith("-----BEGIN CERTIFICATE-----")
cert = _get_cert(res)
assert "CN=success" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == "CN=success"
@pytest.mark.parametrize("algo", ["rsa", "ec", "ed25519", "ed448"])
@ -730,7 +727,7 @@ def test_create_certificate_from_pubkey(x509, ca_key, ca_cert, algo, request):
)
assert res.startswith("-----BEGIN CERTIFICATE-----")
cert = _get_cert(res)
assert "CN=success" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == "CN=success"
def test_create_certificate_from_csr(x509, ca_key, ca_cert, csr):
@ -739,7 +736,7 @@ def test_create_certificate_from_csr(x509, ca_key, ca_cert, csr):
)
assert res.startswith("-----BEGIN CERTIFICATE-----")
cert = _get_cert(res)
assert "CN=success" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == "CN=success"
def test_create_certificate_from_mismatching_private_key(
@ -864,7 +861,7 @@ def test_create_certificate_with_distinguished_name(
)
assert res.startswith("-----BEGIN CERTIFICATE-----")
cert = _get_cert(res)
assert "CN=Homer,L=Springfield,C=US" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == "CN=Homer,L=Springfield,C=US"
def test_create_certificate_with_signing_policy(x509, ca_cert, ca_key, rsa_privkey):
@ -878,10 +875,10 @@ def test_create_certificate_with_signing_policy(x509, ca_cert, ca_key, rsa_privk
)
assert res.startswith("-----BEGIN CERTIFICATE-----")
cert = _get_cert(res)
assert "CN=from_signing_policy" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == "CN=from_signing_policy"
for x in [cx509.BasicConstraints, cx509.KeyUsage, cx509.SubjectKeyIdentifier]:
ext = cert.extensions.get_extension_for_class(x)
if cx509.BasicConstraints == x:
if x == cx509.BasicConstraints:
assert not ext.value.ca
@ -901,7 +898,7 @@ def test_create_certificate_with_signing_policy_no_subject_override(
)
assert res.startswith("-----BEGIN CERTIFICATE-----")
cert = _get_cert(res)
assert "CN=from_signing_policy" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == "CN=from_signing_policy"
@pytest.mark.parametrize(
@ -1073,7 +1070,7 @@ def test_create_certificate_as_der(x509, ca_cert, ca_key, rsa_privkey):
private_key=rsa_privkey,
)
cert = _get_cert(res, "der")
assert "CN=success" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == "CN=success"
@pytest.mark.skipif(
@ -1090,7 +1087,7 @@ def test_create_certificate_as_pkcs7(x509, ca_cert, ca_key, rsa_privkey, typ):
private_key=rsa_privkey,
)
cert = _get_cert(res, f"pkcs7_{typ}")
assert "CN=success" == cert[0].subject.rfc4514_string()
assert cert[0].subject.rfc4514_string() == "CN=success"
@pytest.mark.skipif(
@ -1107,8 +1104,8 @@ def test_create_certificate_as_pkcs12(x509, ca_cert, ca_key, rsa_privkey):
private_key=rsa_privkey,
)
cert = _get_cert(res, "pkcs12")
assert "CN=success" == cert.cert.certificate.subject.rfc4514_string()
assert b"foo" == cert.cert.friendly_name
assert cert.cert.certificate.subject.rfc4514_string() == "CN=success"
assert cert.cert.friendly_name == b"foo"
@pytest.mark.skipif(
@ -1128,8 +1125,8 @@ def test_create_certificate_as_encrypted_pkcs12(x509, ca_cert, ca_key, rsa_privk
private_key=rsa_privkey_enc,
)
cert = _get_cert(res, "pkcs12", "hunter3")
assert "CN=success" == cert.cert.certificate.subject.rfc4514_string()
assert b"foo" == cert.cert.friendly_name
assert cert.cert.certificate.subject.rfc4514_string() == "CN=success"
assert cert.cert.friendly_name == b"foo"
def test_create_certificate_append_certs_pem(x509, ca_cert, ca_key, rsa_privkey):
@ -1141,7 +1138,7 @@ def test_create_certificate_append_certs_pem(x509, ca_cert, ca_key, rsa_privkey)
private_key=rsa_privkey,
)
cert = _get_cert(res)
assert "CN=success" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == "CN=success"
assert res.endswith(ca_cert)
@ -1160,7 +1157,7 @@ def test_create_certificate_append_certs_pkcs7(x509, ca_cert, ca_key, rsa_privke
private_key=rsa_privkey,
)
cert = _get_cert(res, f"pkcs7_{typ}")
assert "CN=success" == cert[0].subject.rfc4514_string()
assert cert[0].subject.rfc4514_string() == "CN=success"
assert cert[1].serial_number == _get_cert(ca_cert).serial_number
@ -1178,7 +1175,7 @@ def test_create_certificate_append_certs_pkcs12(x509, ca_cert, ca_key, rsa_privk
private_key=rsa_privkey,
)
cert = _get_cert(res, "pkcs12")
assert "CN=success" == cert.cert.certificate.subject.rfc4514_string()
assert cert.cert.certificate.subject.rfc4514_string() == "CN=success"
assert (
cert.additional_certs[0].certificate.serial_number
== _get_cert(ca_cert).serial_number
@ -1199,7 +1196,7 @@ def test_create_certificate_copypath(
)
assert res.startswith("-----BEGIN CERTIFICATE-----")
cert = _get_cert(res)
assert "CN=success" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == "CN=success"
prefix = ""
if prepend_cn:
prefix = "success-"
@ -1459,7 +1456,7 @@ def test_get_pem_entry_newline_fix(x509, ca_cert):
assert res == ca_cert.encode()
@pytest.fixture()
@pytest.fixture
def fresh_cert(x509, ca_key):
return x509.create_certificate(signing_private_key=ca_key, days_valid=1, CN="fresh")
@ -1485,12 +1482,16 @@ def test_expired(x509, ca_key, fresh_cert, tmp_path):
def test_will_expire(x509, fresh_cert):
assert {"check_days": 0, "cn": "fresh", "will_expire": False} == x509.will_expire(
fresh_cert, 0
)
assert {"check_days": 2, "cn": "fresh", "will_expire": True} == x509.will_expire(
fresh_cert, 2
)
assert x509.will_expire(fresh_cert, 0) == {
"check_days": 0,
"cn": "fresh",
"will_expire": False,
}
assert x509.will_expire(fresh_cert, 2) == {
"check_days": 2,
"cn": "fresh",
"will_expire": True,
}
def test_write_pem(x509, fresh_cert, tmp_path):
@ -1519,23 +1520,23 @@ def test_read_certificates(x509, cert_exts, cert_exts_read, tmp_path):
def _get_cert(cert, encoding="pem", passphrase=None):
if "pem" == encoding:
if encoding == "pem":
if not isinstance(cert, bytes):
cert = cert.encode()
return cx509.load_pem_x509_certificate(cert)
if "der" == encoding:
if encoding == "der":
if not isinstance(cert, bytes):
cert = base64.b64decode(cert)
return cx509.load_der_x509_certificate(cert)
if "pkcs7_pem" == encoding:
if encoding == "pkcs7_pem":
if not isinstance(cert, bytes):
cert = cert.encode()
return pkcs7.load_pem_pkcs7_certificates(cert)
if "pkcs7_der" == encoding:
if encoding == "pkcs7_der":
if not isinstance(cert, bytes):
cert = base64.b64decode(cert)
return pkcs7.load_der_pkcs7_certificates(cert)
if "pkcs12" == encoding:
if encoding == "pkcs12":
if not isinstance(cert, bytes):
cert = base64.b64decode(cert)
if passphrase is not None and not isinstance(passphrase, bytes):

View file

@ -1,5 +1,4 @@
import base64
import shutil
from pathlib import Path
import pytest
@ -51,15 +50,12 @@ def minion_config_overrides():
}
@pytest.fixture()
@pytest.fixture
def x509(loaders, states, tmp_path):
try:
yield states.x509
finally:
shutil.rmtree(tmp_path, ignore_errors=True)
yield states.x509
@pytest.fixture()
@pytest.fixture
def ca_cert():
return """\
-----BEGIN CERTIFICATE-----
@ -85,7 +81,7 @@ LN1w5sybsYwIw6QN
"""
@pytest.fixture()
@pytest.fixture
def ca_key():
return """\
-----BEGIN RSA PRIVATE KEY-----
@ -117,7 +113,7 @@ HdI7Pfaf/l0HozAw/Al+LXbpmSBdfmz0U/EGAKRqXMW5+vQ7XHXD
-----END RSA PRIVATE KEY-----"""
@pytest.fixture()
@pytest.fixture
def ca_key_enc():
return """\
-----BEGIN ENCRYPTED PRIVATE KEY-----
@ -152,7 +148,7 @@ A62orBDc+8x+AehfwYSm11dz5/P6aL3QZf+tzr05vbVn
-----END ENCRYPTED PRIVATE KEY-----"""
@pytest.fixture()
@pytest.fixture
def rsa_privkey():
return """\
-----BEGIN RSA PRIVATE KEY-----
@ -184,7 +180,7 @@ DYRTDIS9eg2LF4B64hZvkCLTmP4rLJWdRnWrLosIC4rD1uWgGayC
-----END RSA PRIVATE KEY-----"""
@pytest.fixture()
@pytest.fixture
def rsa_privkey_enc():
return """\
-----BEGIN ENCRYPTED PRIVATE KEY-----
@ -219,7 +215,7 @@ ahZPgPpP2p2uAz1+9MHpVPo2EIrvibm5T89DznwuaEfe
-----END ENCRYPTED PRIVATE KEY-----"""
@pytest.fixture()
@pytest.fixture
def rsa_pubkey():
return """\
-----BEGIN PUBLIC KEY-----
@ -233,7 +229,7 @@ ye1mdbD5KVBgJ9MArc2tJ3rmB0lxjEbAhTEHrNnIkDOJCKE8TaQOW4RyVWlIvSEL
-----END PUBLIC KEY-----"""
@pytest.fixture()
@pytest.fixture
def csr():
return """\
-----BEGIN CERTIFICATE REQUEST-----
@ -253,7 +249,7 @@ q1HXd62bA8k27ukX7w8qWsk6fOTwPh5F3883L5jVqcRsL9pqb4RUugTh/aReVlKW
-----END CERTIFICATE REQUEST-----"""
@pytest.fixture()
@pytest.fixture
def csr_invalid_version():
return """\
-----BEGIN CERTIFICATE REQUEST-----
@ -273,7 +269,7 @@ BldjvVnQN7bCjM2TQTMSbd00lD+071hLm6ceDQdoewbipNKyhBnQd4hFYJgDPQR7
-----END CERTIFICATE REQUEST-----"""
@pytest.fixture()
@pytest.fixture
def ec_privkey():
return """\
-----BEGIN PRIVATE KEY-----
@ -283,7 +279,7 @@ ldlNqU8U1Lz3ckCGI3TdGZ6nPaL3IT/UNH6C+J86RWSLY18hFHXoeKBD
-----END PRIVATE KEY-----"""
@pytest.fixture()
@pytest.fixture
def ec_pubkey():
return """\
-----BEGIN PUBLIC KEY-----
@ -292,7 +288,7 @@ MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEvuZnhksFYiq0UNzYtXH2tPFhuXSv
-----END PUBLIC KEY-----"""
@pytest.fixture()
@pytest.fixture
def ed25519_privkey():
return """\
-----BEGIN PRIVATE KEY-----
@ -300,7 +296,7 @@ MC4CAQAwBQYDK2VwBCIEIFKFjPIOBze2eo9x/EiCL0ni5GacaKIRZdfREBfuEdE9
-----END PRIVATE KEY-----"""
@pytest.fixture()
@pytest.fixture
def ed25519_pubkey():
return """\
-----BEGIN PUBLIC KEY-----
@ -308,7 +304,7 @@ MCowBQYDK2VwAyEAK+1yspaj/3Mb2K7H6y2d0Y+udSF+7sBozMY4aKUBR0I=
-----END PUBLIC KEY-----"""
@pytest.fixture()
@pytest.fixture
def ed448_privkey():
return """\
-----BEGIN PRIVATE KEY-----
@ -317,7 +313,7 @@ Y2QKHSRG0M1ZUFr/EYH9F9mTgnAwmyp7oA==
-----END PRIVATE KEY-----"""
@pytest.fixture()
@pytest.fixture
def ed448_pubkey():
return """\
-----BEGIN PUBLIC KEY-----
@ -326,7 +322,7 @@ vHv0+Ke3LRlEzGbwroKtP66opn4A
-----END PUBLIC KEY-----"""
@pytest.fixture()
@pytest.fixture
def cert_exts():
return """
-----BEGIN CERTIFICATE-----
@ -356,7 +352,7 @@ IiC+2Um3mhImnIoeRxH/cXTABsOrSE+QzIv7Z3orIUxyMqtm
-----END CERTIFICATE-----"""
@pytest.fixture()
@pytest.fixture
def csr_exts():
return """\
-----BEGIN CERTIFICATE REQUEST-----
@ -384,7 +380,7 @@ O68=
-----END CERTIFICATE REQUEST-----"""
@pytest.fixture()
@pytest.fixture
def cert_args(tmp_path, ca_cert, ca_key):
return {
"name": f"{tmp_path}/cert",
@ -394,7 +390,7 @@ def cert_args(tmp_path, ca_cert, ca_key):
}
@pytest.fixture()
@pytest.fixture
def cert_args_exts():
return {
"basicConstraints": "critical, CA:TRUE, pathlen:1",
@ -415,7 +411,7 @@ def cert_args_exts():
}
@pytest.fixture()
@pytest.fixture
def crl_args(tmp_path, ca_cert, ca_key):
return {
"name": f"{tmp_path}/crl",
@ -425,7 +421,7 @@ def crl_args(tmp_path, ca_cert, ca_key):
}
@pytest.fixture()
@pytest.fixture
def crl_args_exts():
return {
"authorityKeyIdentifier": "keyid:always",
@ -440,7 +436,7 @@ def crl_args_exts():
}
@pytest.fixture()
@pytest.fixture
def crl_revoked():
return [
{
@ -515,7 +511,7 @@ def crl_revoked():
]
@pytest.fixture()
@pytest.fixture
def csr_args(tmp_path, rsa_privkey):
return {
"name": f"{tmp_path}/csr",
@ -524,7 +520,7 @@ def csr_args(tmp_path, rsa_privkey):
}
@pytest.fixture()
@pytest.fixture
def csr_args_exts():
return {
"basicConstraints": "critical, CA:TRUE, pathlen:1",
@ -541,7 +537,7 @@ def csr_args_exts():
}
@pytest.fixture()
@pytest.fixture
def pk_args(tmp_path):
return {
"name": f"{tmp_path}/private_key",
@ -820,7 +816,7 @@ def test_certificate_managed_with_extensions(
cert_args.update(cert_args_exts)
ret = x509.certificate_managed(**cert_args)
cert = _assert_cert_created_basic(ret, cert_args["name"], rsa_privkey, ca_key)
assert len(cert_args_exts) == len(cert.extensions)
assert len(cert.extensions) == len(cert_args_exts)
def test_certificate_managed_with_signing_policy(x509, cert_args, rsa_privkey, ca_key):
@ -833,7 +829,7 @@ def test_certificate_managed_with_signing_policy(x509, cert_args, rsa_privkey, c
assert ret.changes
assert ret.changes.get("created")
cert = _get_cert(cert_args["name"])
assert "CN=from_signing_policy" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == "CN=from_signing_policy"
assert _belongs_to(cert, rsa_privkey)
assert _signed_by(cert, ca_key)
@ -858,8 +854,8 @@ def test_certificate_managed_with_distinguished_name_kwargs(
assert ret.changes.get("created") == cert_args["name"]
cert = _get_cert(cert_args["name"])
assert (
"CN=salt.test,OU=SaltStack Test,O=SaltStack,L=Some Town,ST=Some State,C=US"
== cert.subject.rfc4514_string()
cert.subject.rfc4514_string()
== "CN=salt.test,OU=SaltStack Test,O=SaltStack,L=Some Town,ST=Some State,C=US"
)
assert _belongs_to(cert, rsa_privkey)
assert _signed_by(cert, ca_key)
@ -874,7 +870,7 @@ def test_certificate_managed_without_subject(x509, cert_args, rsa_privkey, ca_ke
assert ret.changes
assert ret.changes.get("created") == cert_args["name"]
cert = _get_cert(cert_args["name"])
assert "" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == ""
assert _belongs_to(cert, rsa_privkey)
assert _signed_by(cert, ca_key)
@ -1029,7 +1025,7 @@ def test_certificate_managed_days_remaining(x509, cert_args, days, expected):
"""
cert_args["days_remaining"] = days
ret = x509.certificate_managed(**cert_args)
assert bool(ret.changes) == expected
assert bool(ret.changes) is expected
@pytest.mark.usefixtures("existing_cert")
@ -1155,7 +1151,7 @@ def test_certificate_managed_encoding_change(
cert_new = _assert_cert_basic(
ret, cert_args["name"], rsa_privkey, ca_key, encoding=encoding
)
assert cert.serial_number == cert_new.serial_number
assert cert_new.serial_number == cert.serial_number
@pytest.mark.usefixtures("existing_cert_chain")
@ -1205,12 +1201,12 @@ def test_certificate_managed_chain_change(
)
if cert_args["encoding"].startswith("pkcs7"):
cert = cert[0]
elif "pkcs12" == cert_args["encoding"]:
elif cert_args["encoding"] == "pkcs12":
if CRYPTOGRAPHY_VERSION[0] == 36:
# it seems (serial number) parsing of pkcs12 certificates is broken (?) in that release
return
cert = cert.cert.certificate
assert cert.serial_number == cert_new.serial_number
assert cert_new.serial_number == cert.serial_number
@pytest.mark.usefixtures("existing_cert")
@ -1224,7 +1220,7 @@ def test_certificate_managed_additional_certs_change(
cert = _get_cert(cert_args["name"])
ret = x509.certificate_managed(**cert_args)
cert_new = _assert_cert_basic(ret, cert_args["name"], rsa_privkey, ca_key)
assert cert.serial_number == cert_new.serial_number
assert cert_new.serial_number == cert.serial_number
def test_certificate_managed_wrong_ca_key(
@ -1255,7 +1251,7 @@ def test_pkcs12_friendlyname_change(x509, cert_args, ca_cert, ca_key, rsa_privke
_assert_cert_basic(ret, cert_args["name"], rsa_privkey, ca_key, encoding="pkcs12")
cert_new = _get_cert(cert_args["name"], encoding="pkcs12")
assert (
cert.cert.certificate.serial_number == cert_new.cert.certificate.serial_number
cert_new.cert.certificate.serial_number == cert.cert.certificate.serial_number
)
assert cert_new.cert.friendly_name == b"bar"
@ -1311,7 +1307,7 @@ def test_certificate_managed_mode(x509, cert_args, rsa_privkey, ca_key, mode, mo
cert_args["mode"] = mode
ret = x509.certificate_managed(**cert_args)
_assert_cert_created_basic(ret, cert_args["name"], rsa_privkey, ca_key)
assert mode == modules.file.get_mode(cert_args["name"])
assert modules.file.get_mode(cert_args["name"]) == mode
def test_certificate_managed_file_managed_create_false(
@ -1336,7 +1332,7 @@ def test_certificate_managed_mode_change_only(
"""
This serves as a proxy for all file.managed args
"""
assert "0644" == modules.file.get_mode(cert_args["name"])
assert modules.file.get_mode(cert_args["name"]) == "0644"
cert_args["mode"] = "0640"
cert_args.pop("serial_number", None)
cert = _get_cert(cert_args["name"])
@ -1344,9 +1340,9 @@ def test_certificate_managed_mode_change_only(
assert ret.result is True
assert ret.filtered["sub_state_run"][0]["changes"]
assert "mode" in ret.filtered["sub_state_run"][0]["changes"]
assert "0640" == modules.file.get_mode(cert_args["name"])
assert modules.file.get_mode(cert_args["name"]) == "0640"
cert_new = _get_cert(cert_args["name"])
assert cert.serial_number == cert_new.serial_number
assert cert_new.serial_number == cert.serial_number
@pytest.mark.usefixtures("existing_cert")
@ -1458,7 +1454,7 @@ def test_certificate_managed_pkcs12_embedded_pk_kept(
_assert_cert_basic(ret, cert_args["name"], rsa_privkey, ca_key, encoding="pkcs12")
assert list(ret.changes) == ["expiration"]
new_pk = _get_cert(cert_args["name"], encoding="pkcs12").key
assert cur_pk.public_key().public_numbers() == new_pk.public_key().public_numbers()
assert new_pk.public_key().public_numbers() == cur_pk.public_key().public_numbers()
def test_crl_managed_empty(x509, crl_args, ca_key):
@ -1480,7 +1476,7 @@ def test_crl_managed_with_revocations(x509, crl_args, crl_revoked, ca_key):
ret = x509.crl_managed(**crl_args)
crl = _assert_crl_basic(ret, ca_key)
assert len(crl) == len(crl_args["revoked"])
assert 2 == len((next(iter(crl))).extensions)
assert len((next(iter(crl))).extensions) == 2
def test_crl_managed_der(x509, crl_args, ca_key):
@ -1543,7 +1539,7 @@ def test_crl_managed_existing_renew(x509, crl_args, ca_key):
crl_args["days_remaining"] = 300
ret = x509.crl_managed(**crl_args)
_assert_crl_basic(ret, ca_key)
assert {"expiration"} == set(ret.changes)
assert set(ret.changes) == {"expiration"}
@pytest.mark.usefixtures("existing_crl")
@ -1562,7 +1558,7 @@ def test_crl_managed_existing_revocations_changed(x509, crl_args, crl_revoked, c
ret = x509.crl_managed(**crl_args)
_assert_crl_basic(ret, ca_key)
assert "revocations" in ret.changes
assert 2 == len(ret.changes["revocations"]["changed"])
assert len(ret.changes["revocations"]["changed"]) == 2
@pytest.mark.usefixtures("existing_crl_rev")
@ -1571,7 +1567,7 @@ def test_crl_managed_existing_revocations_removed(x509, crl_args, crl_revoked, c
ret = x509.crl_managed(**crl_args)
_assert_crl_basic(ret, ca_key)
assert "revocations" in ret.changes
assert 1 == len(ret.changes["revocations"]["removed"])
assert len(ret.changes["revocations"]["removed"]) == 1
@pytest.mark.usefixtures("existing_crl")
@ -1582,7 +1578,7 @@ def test_crl_managed_existing_signing_key_change(
crl_args["signing_cert"] = cert_exts
ret = x509.crl_managed(**crl_args)
_assert_crl_basic(ret, rsa_privkey)
assert {"issuer_name", "public_key"} == set(ret.changes)
assert set(ret.changes) == {"issuer_name", "public_key"}
@pytest.mark.usefixtures("existing_crl")
@ -1590,7 +1586,7 @@ def test_crl_managed_existing_digest_change(x509, crl_args, ca_key):
crl_args["digest"] = "sha512"
ret = x509.crl_managed(**crl_args)
_assert_crl_basic(ret, ca_key)
assert {"digest"} == set(ret.changes)
assert set(ret.changes) == {"digest"}
@pytest.mark.usefixtures("existing_crl")
@ -1622,7 +1618,7 @@ def test_crl_managed_exts_added(x509, crl_args, crl_args_exts, ca_key):
ret = x509.crl_managed(**crl_args)
_assert_crl_basic(ret, ca_key)
assert "extensions" in ret.changes
assert len(crl_args_exts) == len(ret.changes["extensions"]["added"])
assert len(ret.changes["extensions"]["added"]) == len(crl_args_exts)
@pytest.mark.usefixtures("existing_crl_exts")
@ -1631,7 +1627,7 @@ def test_crl_managed_existing_exts_changed(x509, crl_args, ca_key):
ret = x509.crl_managed(**crl_args)
_assert_crl_basic(ret, ca_key)
assert "extensions" in ret.changes
assert 1 == len(ret.changes["extensions"]["changed"])
assert len(ret.changes["extensions"]["changed"]) == 1
@pytest.mark.usefixtures("existing_crl_exts")
@ -1640,7 +1636,7 @@ def test_crl_managed_existing_exts_removed(x509, crl_args, ca_key):
ret = x509.crl_managed(**crl_args)
_assert_crl_basic(ret, ca_key)
assert "extensions" in ret.changes
assert 1 == len(ret.changes["extensions"]["removed"])
assert len(ret.changes["extensions"]["removed"]) == 1
@pytest.mark.usefixtures("existing_crl")
@ -1651,11 +1647,11 @@ def test_crl_managed_existing_crl_crlnumber_auto(x509, crl_args, crl_revoked, ca
# the dict is manipulated by the state function, it contains 1 now
crl_args["extensions"]["cRLNumber"] = "auto"
cur = _get_crl(crl_args["name"])
assert 1 == cur.extensions[0].value.crl_number
assert cur.extensions[0].value.crl_number == 1
crl_args["revoked"] = crl_revoked
ret = x509.crl_managed(**crl_args)
new = _assert_crl_basic(ret, ca_key)
assert 2 == new.extensions[0].value.crl_number
assert new.extensions[0].value.crl_number == 2
@pytest.mark.usefixtures("existing_crl")
@ -1666,11 +1662,11 @@ def test_crl_managed_existing_crl_crlnumber_auto_no_change(x509, crl_args):
# the dict is manipulated by the state function, it contains 1 now
crl_args["extensions"]["cRLNumber"] = "auto"
cur = _get_crl(crl_args["name"])
assert 1 == cur.extensions[0].value.crl_number
assert cur.extensions[0].value.crl_number == 1
ret = x509.crl_managed(**crl_args)
_assert_not_changed(ret)
new = _get_crl(crl_args["name"])
assert cur.extensions[0].value.crl_number == new.extensions[0].value.crl_number
assert new.extensions[0].value.crl_number == cur.extensions[0].value.crl_number
@pytest.mark.usefixtures("existing_crl")
@ -1682,12 +1678,12 @@ def test_crl_managed_existing_encoding_change_only(x509, crl_args, ca_key):
crl_args["extensions"]["cRLNumber"] = "auto"
crl_args["encoding"] = "der"
cur = _get_crl(crl_args["name"])
assert 1 == cur.extensions[0].value.crl_number
assert cur.extensions[0].value.crl_number == 1
ret = x509.crl_managed(**crl_args)
assert ret.result
assert ret.changes
new = _get_crl(crl_args["name"], encoding="der")
assert 1 == new.extensions[0].value.crl_number
assert new.extensions[0].value.crl_number == 1
@pytest.mark.parametrize("mode", ["0400", "0640", "0644"])
@ -1698,7 +1694,7 @@ def test_crl_managed_mode(x509, crl_args, ca_key, mode, modules):
crl_args["mode"] = mode
ret = x509.crl_managed(**crl_args)
_assert_crl_basic(ret, ca_key)
assert mode == modules.file.get_mode(crl_args["name"])
assert modules.file.get_mode(crl_args["name"]) == mode
def test_crl_managed_file_managed_create_false(x509, crl_args):
@ -1722,18 +1718,18 @@ def test_crl_managed_mode_change_only(x509, crl_args, ca_key, modules):
"""
This serves as a proxy for all file.managed args
"""
assert "0644" == modules.file.get_mode(crl_args["name"])
assert modules.file.get_mode(crl_args["name"]) == "0644"
crl_args["mode"] = "0640"
crl = _get_crl(crl_args["name"])
ret = x509.crl_managed(**crl_args)
assert ret.result is True
assert ret.filtered["sub_state_run"][0]["changes"]
assert "mode" in ret.filtered["sub_state_run"][0]["changes"]
assert "0640" == modules.file.get_mode(crl_args["name"])
assert modules.file.get_mode(crl_args["name"]) == "0640"
crl_new = _get_crl(crl_args["name"])
assert (
crl.extensions.get_extension_for_class(cx509.CRLNumber).value
== crl_new.extensions.get_extension_for_class(cx509.CRLNumber).value
crl_new.extensions.get_extension_for_class(cx509.CRLNumber).value
== crl.extensions.get_extension_for_class(cx509.CRLNumber).value
)
@ -1851,7 +1847,7 @@ def test_csr_managed_with_extensions(x509, csr_args, csr_args_exts, rsa_privkey)
csr_args.update(csr_args_exts)
ret = x509.csr_managed(**csr_args)
csr = _assert_csr_basic(ret, rsa_privkey)
assert len(csr_args_exts) == len(csr.extensions)
assert len(csr.extensions) == len(csr_args_exts)
def test_csr_managed_with_subject(x509, csr_args, rsa_privkey):
@ -1992,7 +1988,7 @@ def test_csr_managed_mode(x509, csr_args, rsa_privkey, mode, modules):
csr_args["mode"] = mode
ret = x509.csr_managed(**csr_args)
_assert_csr_basic(ret, rsa_privkey)
assert mode == modules.file.get_mode(csr_args["name"])
assert modules.file.get_mode(csr_args["name"]) == mode
def test_csr_managed_file_managed_create_false(x509, csr_args):
@ -2012,14 +2008,14 @@ def test_csr_managed_mode_change_only(x509, csr_args, ca_key, modules):
"""
This serves as a proxy for all file.managed args
"""
assert "0644" == modules.file.get_mode(csr_args["name"])
assert modules.file.get_mode(csr_args["name"]) == "0644"
csr_args["mode"] = "0640"
ret = x509.csr_managed(**csr_args)
assert ret.result is True
assert not ret.changes
assert ret.filtered["sub_state_run"][0]["changes"]
assert "mode" in ret.filtered["sub_state_run"][0]["changes"]
assert "0640" == modules.file.get_mode(csr_args["name"])
assert modules.file.get_mode(csr_args["name"]) == "0640"
@pytest.mark.usefixtures("existing_csr")
@ -2131,7 +2127,7 @@ def test_csr_managed_file_managed_error(x509, csr_args, encoding):
def test_private_key_managed(x509, pk_args, algo, encoding, passphrase):
if (
algo in ["ed25519", "ed448"]
and "pkcs12" == encoding
and encoding == "pkcs12"
and CRYPTOGRAPHY_VERSION[0] < 37
):
pytest.skip(
@ -2224,7 +2220,7 @@ def test_private_key_managed_encoding_change(x509, pk_args, encoding):
pk_args["encoding"] = encoding
ret = x509.private_key_managed(**pk_args)
new = _assert_pk_basic(ret, "rsa", encoding=encoding)
assert cur.public_key().public_numbers() == new.public_key().public_numbers()
assert new.public_key().public_numbers() == cur.public_key().public_numbers()
@pytest.mark.usefixtures("existing_pk")
@ -2233,7 +2229,7 @@ def test_private_key_managed_passphrase_introduced(x509, pk_args):
cur = _get_privkey(pk_args["name"])
ret = x509.private_key_managed(**pk_args)
new = _assert_pk_basic(ret, "rsa", passphrase="hunter1")
assert cur.public_key().public_numbers() == new.public_key().public_numbers()
assert new.public_key().public_numbers() == cur.public_key().public_numbers()
@pytest.mark.usefixtures("existing_pk")
@ -2287,7 +2283,7 @@ def test_private_key_managed_mode(x509, pk_args, mode, encoding, modules):
pk_args["encoding"] = encoding
ret = x509.private_key_managed(**pk_args)
_assert_pk_basic(ret, "rsa", encoding=encoding)
assert (mode or "0400") == modules.file.get_mode(pk_args["name"])
assert modules.file.get_mode(pk_args["name"]) == (mode or "0400")
def test_private_key_managed_file_managed_create_false(x509, pk_args):
@ -2382,16 +2378,16 @@ def test_private_key_managed_mode_change_only(x509, pk_args, modules):
"""
This serves as a proxy for all file.managed args
"""
assert "0400" == modules.file.get_mode(pk_args["name"])
assert modules.file.get_mode(pk_args["name"]) == "0400"
pk_args["mode"] = "0600"
cur = _get_privkey(pk_args["name"])
ret = x509.private_key_managed(**pk_args)
assert ret.result is True
assert ret.filtered["sub_state_run"][0]["changes"]
assert "mode" in ret.filtered["sub_state_run"][0]["changes"]
assert "0600" == modules.file.get_mode(pk_args["name"])
assert modules.file.get_mode(pk_args["name"]) == "0600"
new = _get_privkey(pk_args["name"])
assert cur.public_key().public_numbers() == new.public_key().public_numbers()
assert new.public_key().public_numbers() == cur.public_key().public_numbers()
@pytest.mark.parametrize("encoding", ["pem", "der"])
@ -2463,7 +2459,7 @@ def _assert_cert_created_basic(
cert = _get_cert(name, encoding=encoding, passphrase=passphrase)
if encoding.startswith("pkcs7"):
cert = cert[0]
elif "pkcs12" == encoding:
elif encoding == "pkcs12":
# pkcs12 embeds the private key inside the container
assert _belongs_to(cert.key.public_key(), privkey)
if get_pkcs12:
@ -2471,7 +2467,7 @@ def _assert_cert_created_basic(
cert = cert.cert.certificate
if subject is None:
subject = "CN=success"
assert subject == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == subject
assert _belongs_to(cert, privkey)
assert _signed_by(cert, ca_key)
return cert
@ -2485,7 +2481,7 @@ def _assert_cert_basic(
cert = _get_cert(name, encoding=encoding, passphrase=passphrase)
if encoding.startswith("pkcs7"):
cert = cert[0]
elif "pkcs12" == encoding:
elif encoding == "pkcs12":
assert _belongs_to(cert.key.public_key(), privkey)
if get_pkcs12:
return cert
@ -2503,23 +2499,23 @@ def _get_cert(cert, encoding="pem", passphrase=None):
except Exception: # pylint: disable=broad-except
pass
if "pem" == encoding:
if encoding == "pem":
if not isinstance(cert, bytes):
cert = cert.encode()
return cx509.load_pem_x509_certificate(cert)
if "der" == encoding:
if encoding == "der":
if not isinstance(cert, bytes):
cert = base64.b64decode(cert)
return cx509.load_der_x509_certificate(cert)
if "pkcs7_pem" == encoding:
if encoding == "pkcs7_pem":
if not isinstance(cert, bytes):
cert = cert.encode()
return pkcs7.load_pem_pkcs7_certificates(cert)
if "pkcs7_der" == encoding:
if encoding == "pkcs7_der":
if not isinstance(cert, bytes):
cert = base64.b64decode(cert)
return pkcs7.load_der_pkcs7_certificates(cert)
if "pkcs12" == encoding:
if encoding == "pkcs12":
if not isinstance(cert, bytes):
cert = base64.b64decode(cert)
if passphrase is not None and not isinstance(passphrase, bytes):
@ -2560,13 +2556,13 @@ def _assert_pk_basic(ret, algo, encoding="pem", passphrase=None):
assert ret.result
assert ret.changes
pk = _get_privkey(ret.name, encoding=encoding, passphrase=passphrase)
if "rsa" == algo:
if algo == "rsa":
assert isinstance(pk, rsa.RSAPrivateKey)
if "ec" == algo:
if algo == "ec":
assert isinstance(pk, ec.EllipticCurvePrivateKey)
if "ed25519" == algo:
if algo == "ed25519":
assert isinstance(pk, ed25519.Ed25519PrivateKey)
if "ed448" == algo:
if algo == "ed448":
assert isinstance(pk, ed448.Ed448PrivateKey)
return pk
@ -2585,11 +2581,11 @@ def _get_crl(crl, encoding="pem"):
except Exception: # pylint: disable=broad-except
pass
if "pem" == encoding:
if encoding == "pem":
if not isinstance(crl, bytes):
crl = crl.encode()
return cx509.load_pem_x509_crl(crl)
if "der" == encoding:
if encoding == "der":
if not isinstance(crl, bytes):
crl = base64.b64decode(crl)
return cx509.load_der_x509_crl(crl)
@ -2603,11 +2599,11 @@ def _get_csr(csr, encoding="pem"):
except Exception: # pylint: disable=broad-except
pass
if "pem" == encoding:
if encoding == "pem":
if not isinstance(csr, bytes):
csr = csr.encode()
return cx509.load_pem_x509_csr(csr)
if "der" == encoding:
if encoding == "der":
if not isinstance(csr, bytes):
csr = base64.b64decode(csr)
return cx509.load_der_x509_csr(csr)
@ -2623,15 +2619,15 @@ def _get_privkey(pk, encoding="pem", passphrase=None):
if passphrase is not None:
passphrase = passphrase.encode()
if "pem" == encoding:
if encoding == "pem":
if not isinstance(pk, bytes):
pk = pk.encode()
return load_pem_private_key(pk, passphrase)
if "der" == encoding:
if encoding == "der":
if not isinstance(pk, bytes):
pk = base64.b64decode(pk)
return load_der_private_key(pk, passphrase)
if "pkcs12" == encoding:
if encoding == "pkcs12":
if not isinstance(pk, bytes):
pk = base64.b64decode(pk)
return pkcs12.load_pkcs12(pk, passphrase).key

View file

@ -1,5 +1,5 @@
"""
Tests for the Vault module
Tests for the x509_v2 module
"""
import base64
@ -404,7 +404,7 @@ A62orBDc+8x+AehfwYSm11dz5/P6aL3QZf+tzr05vbVn
-----END ENCRYPTED PRIVATE KEY-----"""
@pytest.fixture()
@pytest.fixture
def cert_args(ca_minion_id, x509_data):
return {
"ca_server": ca_minion_id,
@ -414,7 +414,7 @@ def cert_args(ca_minion_id, x509_data):
}
@pytest.fixture()
@pytest.fixture
def cert_args_exts():
return {
"basicConstraints": "critical, CA:TRUE, pathlen:1",
@ -449,7 +449,7 @@ def test_sign_remote_certificate(x509_salt_call_cli, cert_args, ca_key, rsa_priv
ret = x509_salt_call_cli.run("x509.create_certificate", **cert_args)
assert ret.data
cert = _get_cert(ret.data)
assert "CN=from_signing_policy" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == "CN=from_signing_policy"
assert _signed_by(cert, ca_key)
assert _belongs_to(cert, rsa_privkey)
@ -461,7 +461,7 @@ def test_sign_remote_certificate_match(
ret = x509_salt_call_cli.run("x509.create_certificate", **cert_args)
assert ret.data
cert = _get_cert(ret.data)
assert "CN=from_matching_policy" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == "CN=from_matching_policy"
assert _signed_by(cert, ca_key)
assert _belongs_to(cert, rsa_privkey)
@ -473,7 +473,7 @@ def test_sign_remote_certificate_compound_match(
ret = x509_salt_call_cli.run("x509.create_certificate", **cert_args)
assert ret.data
cert = _get_cert(ret.data)
assert "CN=from_compound_match_policy" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == "CN=from_compound_match_policy"
assert _signed_by(cert, ca_key)
assert _belongs_to(cert, rsa_privkey)
@ -486,7 +486,7 @@ def test_sign_remote_certificate_enc(
ret = x509_salt_call_cli.run("x509.create_certificate", **cert_args)
assert ret.data
cert = _get_cert(ret.data)
assert "CN=from_signing_policy" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == "CN=from_signing_policy"
assert _signed_by(cert, ca_key)
assert _belongs_to(cert, rsa_privkey)
@ -498,7 +498,7 @@ def test_sign_remote_certificate_ca_enc(
ret = x509_salt_call_cli.run("x509.create_certificate", **cert_args)
assert ret.data
cert = _get_cert(ret.data)
assert "CN=from_signing_policy" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == "CN=from_signing_policy"
assert _signed_by(cert, ca_key)
assert _belongs_to(cert, rsa_privkey)
@ -511,7 +511,7 @@ def test_sign_remote_certificate_pubkey(
ret = x509_salt_call_cli.run("x509.create_certificate", **cert_args)
assert ret.data
cert = _get_cert(ret.data)
assert "CN=from_signing_policy" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == "CN=from_signing_policy"
assert _signed_by(cert, ca_key)
assert _belongs_to(cert, rsa_privkey)
@ -524,7 +524,7 @@ def test_sign_remote_certificate_csr(
ret = x509_salt_call_cli.run("x509.create_certificate", **cert_args)
assert ret.data
cert = _get_cert(ret.data)
assert "CN=from_signing_policy" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == "CN=from_signing_policy"
assert _signed_by(cert, ca_key)
assert _belongs_to(cert, rsa_privkey)
@ -559,7 +559,7 @@ def test_sign_remote_certificate_no_subject_override(
ret = x509_salt_call_cli.run("x509.create_certificate", **cert_args)
assert ret.data
cert = _get_cert(ret.data)
assert "CN=from_signing_policy" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == "CN=from_signing_policy"
assert _signed_by(cert, ca_key)
assert _belongs_to(cert, rsa_privkey)
@ -580,7 +580,7 @@ def test_sign_remote_certificate_no_name_attribute_override(
ret = x509_salt_call_cli.run("x509.create_certificate", **cert_args)
assert ret.data
cert = _get_cert(ret.data)
assert "CN=from_signing_policy" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == "CN=from_signing_policy"
assert _signed_by(cert, ca_key)
assert _belongs_to(cert, rsa_privkey)
@ -656,23 +656,23 @@ def _get_cert(cert, encoding="pem", passphrase=None):
except Exception: # pylint: disable=broad-except
pass
if "pem" == encoding:
if encoding == "pem":
if not isinstance(cert, bytes):
cert = cert.encode()
return cx509.load_pem_x509_certificate(cert)
if "der" == encoding:
if encoding == "der":
if not isinstance(cert, bytes):
cert = base64.b64decode(cert)
return cx509.load_der_x509_certificate(cert)
if "pkcs7_pem" == encoding:
if encoding == "pkcs7_pem":
if not isinstance(cert, bytes):
cert = cert.encode()
return pkcs7.load_pem_pkcs7_certificates(cert)
if "pkcs7_der" == encoding:
if encoding == "pkcs7_der":
if not isinstance(cert, bytes):
cert = base64.b64decode(cert)
return pkcs7.load_der_pkcs7_certificates(cert)
if "pkcs12" == encoding:
if encoding == "pkcs12":
if not isinstance(cert, bytes):
cert = base64.b64decode(cert)
if passphrase is not None and not isinstance(passphrase, bytes):

View file

@ -1,5 +1,5 @@
"""
Tests for the Vault module
Tests for the x509_v2 module
"""
import base64
@ -184,7 +184,7 @@ def x509_master_config(ca_minion_id):
}
@pytest.fixture()
@pytest.fixture
def privkey_new(x509_salt_master, tmp_path, ca_minion_id, x509_salt_call_cli):
state = f"""\
Private key:
@ -216,7 +216,7 @@ Certificate:
yield
@pytest.fixture()
@pytest.fixture
def privkey_new_pkcs12(x509_salt_master, tmp_path, ca_minion_id, x509_salt_call_cli):
state = f"""\
Private key:
@ -470,7 +470,7 @@ A62orBDc+8x+AehfwYSm11dz5/P6aL3QZf+tzr05vbVn
-----END ENCRYPTED PRIVATE KEY-----"""
@pytest.fixture()
@pytest.fixture
def cert_args(ca_minion_id, tmp_path, x509_data):
return {
"name": str(tmp_path / "cert_managed"),
@ -481,7 +481,7 @@ def cert_args(ca_minion_id, tmp_path, x509_data):
}
@pytest.fixture()
@pytest.fixture
def cert_args_exts():
return {
"basicConstraints": "critical, CA:TRUE, pathlen:1",
@ -510,7 +510,7 @@ def existing_cert(x509_salt_call_cli, cert_args, ca_key, rsa_privkey, request):
)
assert ret.returncode == 0
cert = _get_cert(cert_args["name"])
assert "CN=from_signing_policy" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == "CN=from_signing_policy"
assert _signed_by(cert, ca_key)
assert _belongs_to(cert, rsa_privkey)
yield cert_args["name"]
@ -534,7 +534,7 @@ def test_certificate_managed_remote(
)
assert ret.returncode == 0
cert = _get_cert(cert_args["name"])
assert "CN=from_signing_policy" == cert.subject.rfc4514_string()
assert cert.subject.rfc4514_string() == "CN=from_signing_policy"
assert _signed_by(cert, ca_key)
assert _belongs_to(cert, rsa_privkey)
@ -603,7 +603,7 @@ def test_certificate_managed_remote_renew(x509_salt_call_cli, cert_args):
)
assert ret.returncode == 0
cert_new = _get_cert(cert_args["name"])
assert cert_cur.serial_number != cert_new.serial_number
assert cert_new.serial_number != cert_cur.serial_number
@pytest.mark.usefixtures("privkey_new")
@ -658,23 +658,23 @@ def _get_cert(cert, encoding="pem", passphrase=None):
except Exception: # pylint: disable=broad-except
pass
if "pem" == encoding:
if encoding == "pem":
if not isinstance(cert, bytes):
cert = cert.encode()
return cx509.load_pem_x509_certificate(cert)
if "der" == encoding:
if encoding == "der":
if not isinstance(cert, bytes):
cert = base64.b64decode(cert)
return cx509.load_der_x509_certificate(cert)
if "pkcs7_pem" == encoding:
if encoding == "pkcs7_pem":
if not isinstance(cert, bytes):
cert = cert.encode()
return pkcs7.load_pem_pkcs7_certificates(cert)
if "pkcs7_der" == encoding:
if encoding == "pkcs7_der":
if not isinstance(cert, bytes):
cert = base64.b64decode(cert)
return pkcs7.load_der_pkcs7_certificates(cert)
if "pkcs12" == encoding:
if encoding == "pkcs12":
if not isinstance(cert, bytes):
cert = base64.b64decode(cert)
if passphrase is not None and not isinstance(passphrase, bytes):
@ -692,15 +692,15 @@ def _get_privkey(pk, encoding="pem", passphrase=None):
if passphrase is not None:
passphrase = passphrase.encode()
if "pem" == encoding:
if encoding == "pem":
if not isinstance(pk, bytes):
pk = pk.encode()
return load_pem_private_key(pk, passphrase)
if "der" == encoding:
if encoding == "der":
if not isinstance(pk, bytes):
pk = base64.b64decode(pk)
return load_der_private_key(pk, passphrase)
if "pkcs12" == encoding:
if encoding == "pkcs12":
if not isinstance(pk, bytes):
pk = base64.b64decode(pk)
return pkcs12.load_pkcs12(pk, passphrase).key

View file

@ -22,7 +22,7 @@ pytestmark = [
CRYPTOGRAPHY_VERSION = tuple(int(x) for x in cryptography.__version__.split("."))
@pytest.fixture()
@pytest.fixture
def single_pem():
return """\
-----BEGIN RSA PRIVATE KEY-----
@ -55,7 +55,7 @@ bQdPnxzSwrf6edD2AmIT9L8IwiCYiplC+JvqSlqDP2pxIQbilmw=
"""
@pytest.fixture()
@pytest.fixture
def multi_pem():
return """\
-----BEGIN CERTIFICATE-----
@ -103,7 +103,7 @@ LN1w5sybsYwIw6QN
def test_split_pems_single(single_pem):
res = x509.split_pems(single_pem)
assert 1 == len(res)
assert len(res) == 1
assert res[0].startswith(b"-----BEGIN RSA PRIVATE KEY-----\n")
assert res[0].endswith(b"-----END RSA PRIVATE KEY-----\n")
assert len(res[0].splitlines()) == 27
@ -111,7 +111,7 @@ def test_split_pems_single(single_pem):
def test_split_pems_multi(multi_pem):
res = x509.split_pems(multi_pem)
assert 2 == len(res)
assert len(res) == 2
for x in res:
assert x.startswith(b"-----BEGIN CERTIFICATE-----\n")
assert x.endswith(b"-----END CERTIFICATE-----\n")
@ -126,7 +126,7 @@ def test_split_pems_garbage_between(single_pem):
+ single_pem
)
res = x509.split_pems(garbage_pem)
assert 2 == len(res)
assert len(res) == 2
for x in res:
assert x.startswith(b"-----BEGIN RSA PRIVATE KEY-----\n")
assert x.endswith(b"-----END RSA PRIVATE KEY-----\n")
@ -134,12 +134,12 @@ def test_split_pems_garbage_between(single_pem):
class TestCreateExtension:
@pytest.fixture()
@pytest.fixture
def aki(self):
with patch("cryptography.x509.AuthorityKeyIdentifier", autospec=True) as ext:
yield ext
@pytest.fixture()
@pytest.fixture
def ca_crt(self):
ca = Mock(spec=cx509.Certificate)
return ca
@ -286,7 +286,7 @@ class TestCreateExtension:
"subjectKeyIdentifier", val, subject_pubkey="testpub"
)
assert crit is False
if "hash" == val:
if val == "hash":
ext.from_public_key.assert_called_once_with("testpub")
else:
ext.from_public_key.assert_not_called()
@ -720,7 +720,7 @@ class TestCreateExtension:
):
with patch(f"cryptography.x509.{tgt}", autospec=True) as ext:
res, crit = x509._create_extension(extname, val)
if "FreshestCRL" == tgt:
if tgt == "FreshestCRL":
assert crit is False
else:
assert crit == critical
@ -1191,7 +1191,7 @@ class TestCreateExtension:
def test_parse_general_names(inpt, cls, parsed):
expected = cls(parsed)
res = x509._parse_general_names([inpt])
if "dirName" == inpt[0]:
if inpt[0] == "dirName":
assert res[0].value == expected
else:
assert res[0] == expected