Fix sorting issues on x509.certificate_managed triggering unnecessary changes

Fixes #56556
This commit is contained in:
Pedro Algarvio 2020-06-16 13:46:53 +01:00 committed by Daniel Wozniak
parent 7493fdb4d3
commit ade7a09205
4 changed files with 248 additions and 84 deletions

1
changelog/56556.fixed Normal file
View file

@ -0,0 +1 @@
The `x509.certificate_managed` state no longer triggers a change because of sorting issues if the certificate being evaluated was previously generated under Python 2.

View file

@ -7,8 +7,6 @@ Manage X509 certificates
:depends: M2Crypto
"""
# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
import ast
@ -25,22 +23,13 @@ import tempfile
import salt.exceptions
import salt.utils.data
# Import salt libs
import salt.utils.files
import salt.utils.path
import salt.utils.platform
import salt.utils.stringutils
from salt.ext import six
# pylint: disable=import-error,redefined-builtin
from salt.ext.six.moves import range
# pylint: enable=import-error,redefined-builtin
from salt.state import STATE_INTERNAL_KEYWORDS as _STATE_INTERNAL_KEYWORDS
from salt.utils.odict import OrderedDict
# Import 3rd Party Libs
try:
import M2Crypto
@ -56,7 +45,7 @@ except ImportError:
__virtualname__ = "x509"
log = logging.getLogger(__name__) # pylint: disable=invalid-name
log = logging.getLogger(__name__)
EXT_NAME_MAPPINGS = OrderedDict(
[
@ -104,7 +93,6 @@ class _Ctx(ctypes.Structure):
https://bugzilla.osafoundation.org/show_bug.cgi?id=7530#c13
"""
# pylint: disable=too-few-public-methods
_fields_ = [
("flags", ctypes.c_int),
("issuer_cert", ctypes.c_void_p),
@ -121,7 +109,7 @@ def _fix_ctx(m2_ctx, issuer=None):
This is part of an ugly hack to fix an ancient bug in M2Crypto
https://bugzilla.osafoundation.org/show_bug.cgi?id=7530#c13
"""
ctx = _Ctx.from_address(int(m2_ctx)) # pylint: disable=no-member
ctx = _Ctx.from_address(int(m2_ctx))
ctx.flags = 0
ctx.subject_cert = None
@ -154,13 +142,11 @@ def _new_extension(name, value, critical=0, issuer=None, _pyfree=1):
x509_ext_ptr = M2Crypto.m2.x509v3_ext_conf(None, ctx, name, value)
lhash = None
except AttributeError:
lhash = M2Crypto.m2.x509v3_lhash() # pylint: disable=no-member
ctx = M2Crypto.m2.x509v3_set_conf_lhash(lhash) # pylint: disable=no-member
lhash = M2Crypto.m2.x509v3_lhash()
ctx = M2Crypto.m2.x509v3_set_conf_lhash(lhash)
# ctx not zeroed
_fix_ctx(ctx, issuer)
x509_ext_ptr = M2Crypto.m2.x509v3_ext_conf(
lhash, ctx, name, value
) # pylint: disable=no-member
x509_ext_ptr = M2Crypto.m2.x509v3_ext_conf(lhash, ctx, name, value)
# ctx,lhash freed
if x509_ext_ptr is None:
@ -201,7 +187,7 @@ def _get_csr_extensions(csr):
"""
ret = OrderedDict()
csrtempfile = tempfile.NamedTemporaryFile()
csrtempfile = tempfile.NamedTemporaryFile(delete=True)
csrtempfile.write(csr.as_pem())
csrtempfile.flush()
csryaml = _parse_openssl_req(csrtempfile.name)
@ -212,16 +198,14 @@ def _get_csr_extensions(csr):
if not csrexts:
return ret
for short_name, long_name in six.iteritems(EXT_NAME_MAPPINGS):
for short_name, long_name in EXT_NAME_MAPPINGS.items():
if csrexts and long_name in csrexts:
ret[short_name] = csrexts[long_name]
return ret
# None of python libraries read CRLs. Again have to hack it with the
# openssl CLI
# pylint: disable=too-many-branches,too-many-locals
# None of python libraries read CRLs. Again have to hack it with the openssl CLI
def _parse_openssl_crl(crl_filename):
"""
Parses openssl command line output, this is a workaround for M2Crypto's
@ -278,9 +262,7 @@ def _parse_openssl_crl(crl_filename):
rev_sn = revoked.split("\n")[0].strip()
revoked = rev_sn + ":\n" + "\n".join(revoked.split("\n")[1:])
rev_yaml = salt.utils.data.decode(salt.utils.yaml.safe_load(revoked))
# pylint: disable=unused-variable
for rev_item, rev_values in six.iteritems(rev_yaml):
# pylint: enable=unused-variable
for rev_values in rev_yaml.values():
if "Revocation Date" in rev_values:
rev_date = datetime.datetime.strptime(
rev_values["Revocation Date"], "%b %d %H:%M:%S %Y %Z"
@ -294,9 +276,6 @@ def _parse_openssl_crl(crl_filename):
return crl
# pylint: enable=too-many-branches
def _get_signing_policy(name):
policies = __salt__["pillar.get"]("x509_signing_policies", None)
if policies:
@ -350,19 +329,21 @@ def _parse_subject(subject):
"""
Returns a dict containing all values in an X509 Subject
"""
ret = {}
ret = OrderedDict()
nids = []
for nid_name, nid_num in six.iteritems(subject.nid):
ret_list = []
for nid_name, nid_num in subject.nid.items():
if nid_num in nids:
continue
try:
val = getattr(subject, nid_name)
if val:
ret[nid_name] = val
ret_list.append((nid_num, nid_name, val))
nids.append(nid_num)
except TypeError as err:
log.trace("Missing attribute '%s'. Error: %s", nid_name, err)
for nid_num, nid_name, val in sorted(ret_list):
ret[nid_name] = val
return ret
@ -693,7 +674,7 @@ def read_crl(crl):
text = _text_or_file(crl)
text = get_pem_entry(text, pem_type="X509 CRL")
crltempfile = tempfile.NamedTemporaryFile()
crltempfile = tempfile.NamedTemporaryFile(delete=True)
crltempfile.write(salt.utils.stringutils.to_str(text))
crltempfile.flush()
crlparsed = _parse_openssl_crl(crltempfile.name)
@ -880,9 +861,7 @@ def create_private_key(
else:
_callback_func = _keygen_callback
# pylint: disable=no-member
rsa = M2Crypto.RSA.gen_key(bits, M2Crypto.m2.RSA_F4, _callback_func)
# pylint: enable=no-member
bio = M2Crypto.BIO.MemoryBuffer()
if passphrase is None:
cipher = None
@ -896,7 +875,7 @@ def create_private_key(
return salt.utils.stringutils.to_str(bio.read_all())
def create_crl( # pylint: disable=too-many-arguments,too-many-locals
def create_crl(
path=None,
text=False,
signing_private_key=None,
@ -907,7 +886,7 @@ def create_crl( # pylint: disable=too-many-arguments,too-many-locals
days_valid=100,
digest="",
):
"""
r"""
Create a CRL
:depends: - PyOpenSSL Python module
@ -972,7 +951,10 @@ def create_crl( # pylint: disable=too-many-arguments,too-many-locals
.. code-block:: bash
salt '*' x509.create_crl path=/etc/pki/mykey.key signing_private_key=/etc/pki/ca.key signing_cert=/etc/pki/ca.crt revoked="{'compromized-web-key': {'certificate': '/etc/pki/certs/www1.crt', 'revocation_date': '2015-03-01 00:00:00'}}"
salt '*' x509.create_crl path=/etc/pki/mykey.key \
signing_private_key=/etc/pki/ca.key \
signing_cert=/etc/pki/ca.crt \
revoked="{'compromized-web-key': {'certificate': '/etc/pki/certs/www1.crt', 'revocation_date': '2015-03-01 00:00:00'}}"
"""
# pyOpenSSL is required for dealing with CSLs. Importing inside these
# functions because Client operations like creating CRLs shouldn't require
@ -1111,7 +1093,7 @@ def sign_remote_certificate(argdic, **kwargs):
try:
return create_certificate(path=None, text=True, **argdic)
except Exception as except_: # pylint: disable=broad-except
return six.text_type(except_)
return str(except_)
def get_signing_policy(signing_policy_name):
@ -1150,7 +1132,6 @@ def get_signing_policy(signing_policy_name):
return signing_policy
# pylint: disable=too-many-locals,too-many-branches,too-many-statements
def create_certificate(path=None, text=False, overwrite=True, ca_server=None, **kwargs):
"""
Create an X509 certificate.
@ -1501,7 +1482,7 @@ def create_certificate(path=None, text=False, overwrite=True, ca_server=None, **
# Overwrite any arguments in kwargs with signing_policy
kwargs.update(signing_policy)
for prop, default in six.iteritems(CERT_DEFAULTS):
for prop, default in CERT_DEFAULTS.items():
if prop not in kwargs:
kwargs[prop] = default
@ -1520,14 +1501,13 @@ def create_certificate(path=None, text=False, overwrite=True, ca_server=None, **
# long max_value. This causes an overflow error due to a bug in M2Crypto.
# See issue: https://gitlab.com/m2crypto/m2crypto/issues/232
# Remove this after M2Crypto fixes the bug.
if six.PY3:
if salt.utils.platform.is_windows():
INT_MAX = 2147483647
if serial_number >= INT_MAX:
serial_number -= int(serial_number / INT_MAX) * INT_MAX
else:
if serial_number >= sys.maxsize:
serial_number -= int(serial_number / sys.maxsize) * sys.maxsize
if salt.utils.platform.is_windows():
INT_MAX = 2147483647
if serial_number >= INT_MAX:
serial_number -= int(serial_number / INT_MAX) * INT_MAX
else:
if serial_number >= sys.maxsize:
serial_number -= int(serial_number / sys.maxsize) * sys.maxsize
cert.set_serial_number(serial_number)
# Handle not_before and not_after dates for custom certificate validity
@ -1567,7 +1547,6 @@ def create_certificate(path=None, text=False, overwrite=True, ca_server=None, **
cert.set_not_after(asn1_not_after)
# Set validity dates
# pylint: disable=no-member
# if no 'not_before' or 'not_after' dates are setup, both of the following
# dates will be the date of today. then the days_valid offset makes sense.
@ -1582,8 +1561,6 @@ def create_certificate(path=None, text=False, overwrite=True, ca_server=None, **
valid_seconds = 60 * 60 * 24 * kwargs["days_valid"] # 60s * 60m * 24 * days
M2Crypto.m2.x509_gmtime_adj(not_after, valid_seconds)
# pylint: enable=no-member
# If neither public_key or csr are included, this cert is self-signed
if "public_key" not in kwargs and "csr" not in kwargs:
kwargs["public_key"] = kwargs["signing_private_key"]
@ -1605,11 +1582,9 @@ def create_certificate(path=None, text=False, overwrite=True, ca_server=None, **
subject = cert.get_subject()
# pylint: disable=unused-variable
for entry, num in six.iteritems(subject.nid):
for entry in sorted(subject.nid):
if entry in kwargs:
setattr(subject, entry, kwargs[entry])
# pylint: enable=unused-variable
if "signing_cert" in kwargs:
signing_cert = _get_certificate_obj(kwargs["signing_cert"])
@ -1617,7 +1592,7 @@ def create_certificate(path=None, text=False, overwrite=True, ca_server=None, **
signing_cert = cert
cert.set_issuer(signing_cert.get_subject())
for extname, extlongname in six.iteritems(EXT_NAME_MAPPINGS):
for extname, extlongname in EXT_NAME_MAPPINGS.items():
if (
extname in kwargs
or extlongname in kwargs
@ -1695,7 +1670,7 @@ def create_certificate(path=None, text=False, overwrite=True, ca_server=None, **
if "copypath" in kwargs:
if "prepend_cn" in kwargs and kwargs["prepend_cn"] is True:
prepend = six.text_type(kwargs["CN"]) + "-"
prepend = str(kwargs["CN"]) + "-"
else:
prepend = ""
write_pem(
@ -1714,9 +1689,6 @@ def create_certificate(path=None, text=False, overwrite=True, ca_server=None, **
return salt.utils.stringutils.to_str(cert.as_pem())
# pylint: enable=too-many-locals
def create_csr(path=None, text=False, **kwargs):
"""
Create a certificate signing request.
@ -1755,7 +1727,7 @@ def create_csr(path=None, text=False, **kwargs):
csr = M2Crypto.X509.Request()
subject = csr.get_subject()
for prop, default in six.iteritems(CERT_DEFAULTS):
for prop, default in CERT_DEFAULTS.items():
if prop not in kwargs:
kwargs[prop] = default
@ -1788,14 +1760,12 @@ def create_csr(path=None, text=False, **kwargs):
)
)
# pylint: disable=unused-variable
for entry, num in six.iteritems(subject.nid):
for entry in sorted(subject.nid):
if entry in kwargs:
setattr(subject, entry, kwargs[entry])
# pylint: enable=unused-variable
extstack = M2Crypto.X509.X509_Extension_Stack()
for extname, extlongname in six.iteritems(EXT_NAME_MAPPINGS):
for extname, extlongname in EXT_NAME_MAPPINGS.items():
if extname not in kwargs and extlongname not in kwargs:
continue
@ -1921,13 +1891,13 @@ def verify_crl(crl, cert):
raise salt.exceptions.SaltInvocationError("openssl binary not found in path")
crltext = _text_or_file(crl)
crltext = get_pem_entry(crltext, pem_type="X509 CRL")
crltempfile = tempfile.NamedTemporaryFile()
crltempfile = tempfile.NamedTemporaryFile(delete=True)
crltempfile.write(salt.utils.stringutils.to_str(crltext))
crltempfile.flush()
certtext = _text_or_file(cert)
certtext = get_pem_entry(certtext, pem_type="CERTIFICATE")
certtempfile = tempfile.NamedTemporaryFile()
certtempfile = tempfile.NamedTemporaryFile(delete=True)
certtempfile.write(salt.utils.stringutils.to_str(certtext))
certtempfile.flush()

View file

@ -170,27 +170,24 @@ be considered valid.
- backup: True
"""
# Import Python Libs
from __future__ import absolute_import, print_function, unicode_literals
import copy
import datetime
import logging
import os
import re
# Import Salt Libs
import salt.exceptions
import salt.utils.versions
# Import 3rd-party libs
from salt.ext import six
try:
from M2Crypto.RSA import RSAError
except ImportError:
pass
log = logging.getLogger(__name__)
def __virtual__():
"""
@ -210,10 +207,10 @@ def _revoked_to_list(revs):
list_ = []
for rev in revs:
for rev_name, props in six.iteritems(rev): # pylint: disable=unused-variable
for props in rev.values():
dict_ = {}
for prop in props:
for propname, val in six.iteritems(prop):
for propname, val in prop.items():
if isinstance(val, datetime.datetime):
val = val.strftime("%Y-%m-%d %H:%M:%S")
dict_[propname] = val
@ -433,10 +430,43 @@ def _certificate_info_matches(cert_info, required_cert_info, check_serial=False)
pass
diff = []
for k, v in six.iteritems(required_cert_info):
for k, v in required_cert_info.items():
try:
if v != cert_info[k]:
diff.append(k)
if k == "Subject Hash":
# If we failed the subject hash check but the subject matches, then this is
# likely a certificate generated under Python 2 where sorting differs and thus
# the hash also differs
if required_cert_info["Subject"] != cert_info["Subject"]:
diff.append(k)
elif k == "Issuer Hash":
# If we failed the issuer hash check but the issuer matches, then this is
# likely a certificate generated under Python 2 where sorting differs and thus
# the hash also differs
if required_cert_info["Issuer"] != cert_info["Issuer"]:
diff.append(k)
elif k == "X509v3 Extensions":
v_ext = v.copy()
cert_info_ext = cert_info[k].copy()
# DirName depends on ordering which was different on certificates created
# under Python 2. Remove that from the comparisson
try:
v_ext["authorityKeyIdentifier"] = re.sub(
r"DirName:([^\n]+)",
"Dirname:--",
v_ext["authorityKeyIdentifier"],
)
cert_info_ext["authorityKeyIdentifier"] = re.sub(
r"DirName:([^\n]+)",
"Dirname:--",
cert_info_ext["authorityKeyIdentifier"],
)
except KeyError:
pass
if v_ext != cert_info_ext:
diff.append(k)
else:
diff.append(k)
except KeyError:
diff.append(k)
@ -459,7 +489,8 @@ def _certificate_days_remaining(cert_info):
def _certificate_is_valid(name, days_remaining, append_certs, **cert_spec):
"""
Return True if the given certificate file exists, is a certificate, matches the given specification, and has the required days remaining.
Return True if the given certificate file exists, is a certificate, matches the given specification,
and has the required days remaining.
If False, also provide a message explaining why.
"""
@ -524,7 +555,7 @@ def _certificate_file_managed(ret, file_args):
"""
Run file.managed and merge the result with an existing return dict.
The overall True/False result will be the result of the file.managed call.
"""
"""
file_ret = __states__["file.managed"](**file_args)
ret["result"] = file_ret["result"]
@ -628,7 +659,8 @@ def certificate_managed(
if managed_private_key:
salt.utils.versions.warn_until(
"Aluminium",
"Passing 'managed_private_key' to x509.certificate_managed has no effect and will be removed Salt Aluminium. Use a separate x509.private_key_managed call instead.",
"Passing 'managed_private_key' to x509.certificate_managed has no effect and "
"will be removed Salt Aluminium. Use a separate x509.private_key_managed call instead.",
)
ret = {"name": name, "result": False, "changes": {}, "comment": ""}

View file

@ -3,11 +3,13 @@ from __future__ import absolute_import, unicode_literals
import datetime
import hashlib
import logging
import os
import pprint
import textwrap
import pytest
import salt.utils.files
from salt.ext import six
from tests.support.case import ModuleCase
from tests.support.helpers import slowTest, with_tempfile
from tests.support.mixins import SaltReturnAssertsMixin
@ -21,7 +23,10 @@ try:
except ImportError:
HAS_M2CRYPTO = False
log = logging.getLogger(__name__)
@pytest.mark.usefixtures("salt_sub_minion")
@skipIf(not HAS_M2CRYPTO, "Skip when no M2Crypto found")
class x509Test(ModuleCase, SaltReturnAssertsMixin):
@classmethod
@ -130,7 +135,7 @@ class x509Test(ModuleCase, SaltReturnAssertsMixin):
pillar={"keyfile": keyfile, "crtfile": crtfile},
)
assert isinstance(ret, dict), ret
for state_result in six.itervalues(ret):
for state_result in ret.values():
assert state_result["result"] is True, state_result
assert os.path.exists(keyfile)
assert os.path.exists(crtfile)
@ -546,3 +551,159 @@ class x509Test(ModuleCase, SaltReturnAssertsMixin):
self.assertFalse(
os.path.exists(crtfile), "Certificate should not have been created."
)
@with_tempfile(suffix=".crt", create=False)
@with_tempfile(suffix=".key", create=False)
def test_py2_generated_cert_is_not_recreated(self, keyfile, crtfile):
keyfile_contents = textwrap.dedent(
"""\
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAp5PQyx5NlYrfzd7vU/Xb2YR5qbWWtpWWoKmJC1gML5v5DBI7
+p/kAHNNmK8uqHXTaI4N/zgarfjrg4zceq2Du7pP0xiCAYolhFqF78ibxNrN4OkT
UPm2kM88iJ8Z14Yph8ueSxLIlujCGaEFhr6wRzTj4T9b+0Bb/PZHI2t5YwtIooVM
EFCBFkt4bb004tO0D9q0CPPVT2AsGmxnY43Aj3Epy++kqmaWj1hIucSprkDrAXFS
WacBQPFQ8XctnL2Z1Q6CJ5WUNrW8ohAJ9RJkwjiqbZTwYIPSSrl+FO3XqDY70SxU
3xDeqhU4zvyjxJ8w9SPqTUu/C3BZtRBT9dCBEQIDAQABAoIBAQCZvS23u1RYVrEe
sWGF+LA67aOkg9kCJ1iqiv8UrjF32DNy1KO8OcY2d5H/+u/mUzqh2HmU5QbtBsoi
xS9dSSTrLHGhbAGRogjrVRU9uCDYSBjLN2mmR4IrdkTF3pkZtpcRY0gU/eWTNXUl
iCmGxhj5KtfJxZQAfLon6FW5dBdIOgxSCJhvRq0zFpWJZFGWWkBExDfeNg//0fCU
UbjRjGacP/+R6FSJa6tevzgR7tIIapm1dY/ofPXIXsZGo1R87fRgLI1D+e84Jdds
/U0bKzPOgAjcC1b262lJ8058pjG/nqWC0YUfpIJUVv2ciJpH3Ha+90526InLAUXA
RWe1Z2YxAoGBANqACEKvUbxENu+XxQj0SI1co4SRTOvgbrSQGL61rDY6PvY/bOqC
JeR0KC3MN6e7fx52tsl/eqP9iyExUpO9b0BCnGg967MivJXWUxhUdOL/r2ceQBqD
DiPVZCFsjeNdSNihnNctAig9Po3GEUWE0ikHr3NcD+wXTnhnIEjJ/fltAoGBAMRW
dIcOiuDLm/oDLNCpwEO4m63ymbUgeOj2cZhKMTqFmspnKnuCU1U/A8cuQcs1gydL
7MzxVP7MZDIEqT5gGj3eyuVMAmKbvLFR2NctDIDjaUs6oz0J9NGByPNjXaYr4uMd
EZrxD8gLZ/G+/7eKsCgBA9ksSydDo00Vf/qAsmO1AoGBANWqc+l59eyrrCj5egU6
lKQf3gsp51WV/8v0SS5dC41vwdgdx80+/fz8FbpLRHVypWlN34sFbRFmQ6Juz/iH
O35UZQyO2KkxI8dGcbWOCUtditHExBzo4W/rIWKJ++pFc5Hb4DqO2dgto7kR4hvg
OX9D869UbIGLfQHCntBvLju1AoGAHpcl0sEmTD4NEFgcTGqWZTbHMsQAxOLJU+rJ
6iNtJiQY6P5H9TRqDXci/I6te57bz2yZ+ZiEWKq51b06LVjF3evviuhb2sdPEAWj
lmsTbqWAC1OYiXMarOXezGUn+zMNR7uIua5jehSk3lqW9x7psWHvGpA3KWf1cpYt
+XbB1J0CgYBCSjALTv4dcn+CtS3kqb806z8H9MSZznUwSmcgvwCR5sqwLAUk1xRn
hEqXbC1RGee3Xqv9mXPDK2LirpdRYi9Jr9ApZkrSkeaXSd2d4cy2ujUT0c7P8JrD
i6QXb+HaFeBuS5ulYDmo4mIbCysuTsgrLzplViUy3xUQv23M/Eh1gw==
-----END RSA PRIVATE KEY-----
"""
)
crtfile_contents = textwrap.dedent(
"""\
-----BEGIN CERTIFICATE-----
MIIEhTCCA22gAwIBAgIIUijHgif6VJUwDQYJKoZIhvcNAQELBQAwgYIxCzAJBgNV
BAYTAkJFMRgwFgYDVQQDDA9FeGFtcGxlIFJvb3QgQ0ExETAPBgNVBAcMCEthcGVs
bGVuMRAwDgYDVQQIDAdBbnR3ZXJwMRAwDgYDVQQKDAdFeGFtcGxlMSIwIAYJKoZI
hvcNAQkBFhNjZXJ0YWRtQGV4YW1wbGUub3JnMB4XDTIwMDYxNjA3Mzk1OVoXDTMw
MDYxNDA3Mzk1OVowgYIxCzAJBgNVBAYTAkJFMRgwFgYDVQQDDA9FeGFtcGxlIFJv
b3QgQ0ExETAPBgNVBAcMCEthcGVsbGVuMRAwDgYDVQQIDAdBbnR3ZXJwMRAwDgYD
VQQKDAdFeGFtcGxlMSIwIAYJKoZIhvcNAQkBFhNjZXJ0YWRtQGV4YW1wbGUub3Jn
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAp5PQyx5NlYrfzd7vU/Xb
2YR5qbWWtpWWoKmJC1gML5v5DBI7+p/kAHNNmK8uqHXTaI4N/zgarfjrg4zceq2D
u7pP0xiCAYolhFqF78ibxNrN4OkTUPm2kM88iJ8Z14Yph8ueSxLIlujCGaEFhr6w
RzTj4T9b+0Bb/PZHI2t5YwtIooVMEFCBFkt4bb004tO0D9q0CPPVT2AsGmxnY43A
j3Epy++kqmaWj1hIucSprkDrAXFSWacBQPFQ8XctnL2Z1Q6CJ5WUNrW8ohAJ9RJk
wjiqbZTwYIPSSrl+FO3XqDY70SxU3xDeqhU4zvyjxJ8w9SPqTUu/C3BZtRBT9dCB
EQIDAQABo4H8MIH5MA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0PAQH/BAQDAgEGMB0G
A1UdDgQWBBTmNsYLuQTxpANgTuw7LRn1qHJsjzCBtgYDVR0jBIGuMIGrgBTmNsYL
uQTxpANgTuw7LRn1qHJsj6GBiKSBhTCBgjELMAkGA1UEBhMCQkUxGDAWBgNVBAMM
D0V4YW1wbGUgUm9vdCBDQTERMA8GA1UEBwwIS2FwZWxsZW4xEDAOBgNVBAgMB0Fu
dHdlcnAxEDAOBgNVBAoMB0V4YW1wbGUxIjAgBgkqhkiG9w0BCQEWE2NlcnRhZG1A
ZXhhbXBsZS5vcmeCCFIox4In+lSVMA0GCSqGSIb3DQEBCwUAA4IBAQBnC1/kK+xr
Vjr5Y2YRjyjm4e8I/nTU+RX2p5K+Yth3CqWO3JuDiV/31UMtPl832n2GWSgXG2pP
B52oeuCP4Re76jqhOmJWY3CKPji+Rs16wj199i9AAcwhSF0rpi5+Fi84HtP3q6pH
cuzZfIPW44aJ5l4k+QvTLoWzr0XujMFcYzI45i3SJqTMs8xdIP5YLN8JXtQSPw9Z
8/nBKbPj7WTUC9cj9Cw2bz+wTpdRF4XCsUF3Vpl9fP7SK8yvv0I85LZnWQx1eQlv
COAM5HWxUT9bWgv18zXdYkc6VLw6ufQSxxuhLMjJxuK27Ny/F18/xYLRTVnse36d
tPJrseUPmvIK
-----END CERTIFICATE-----
"""
)
slsfile = textwrap.dedent(
"""\
{%- set ca_key_path = '"""
+ keyfile
+ """' %}
{%- set ca_crt_path = '"""
+ crtfile
+ """' %}
certificate.authority::private-key:
x509.private_key_managed:
- name: {{ ca_key_path }}
- backup: True
certificate.authority::certificate:
x509.certificate_managed:
- name: {{ ca_crt_path }}
- signing_private_key: {{ ca_key_path }}
- CN: Example Root CA
- O: Example
- C: BE
- ST: Antwerp
- L: Kapellen
- Email: certadm@example.org
- basicConstraints: "critical CA:true"
- keyUsage: "critical cRLSign, keyCertSign"
- subjectKeyIdentifier: hash
- authorityKeyIdentifier: keyid,issuer:always
- days_valid: 3650
- days_remaining: 0
- backup: True
- require:
- x509: certificate.authority::private-key
"""
)
with salt.utils.files.fopen(
os.path.join(RUNTIME_VARS.TMP_STATE_TREE, "cert.sls"), "w"
) as wfh:
wfh.write(slsfile)
# Generate the certificate twice.
# On the first run, no key nor cert exist.
ret = self.run_function("state.sls", ["cert"])
log.debug(
"First state run ret dictionary:\n%s", pprint.pformat(list(ret.values()))
)
for state_run_id, state_run_details in ret.items():
if state_run_id.endswith("private_key_managed"):
assert state_run_details["result"]
assert "new" in state_run_details["changes"]
if state_run_id.endswith("certificate_managed"):
assert state_run_details["result"]
assert "Certificate" in state_run_details["changes"]
assert "New" in state_run_details["changes"]["Certificate"]
assert "Status" in state_run_details["changes"]
assert "New" in state_run_details["changes"]["Status"]
# On the second run, they exist and should not trigger any modification
ret = self.run_function("state.sls", ["cert"])
log.debug(
"Second state run ret dictionary:\n%s", pprint.pformat(list(ret.values()))
)
for state_run_id, state_run_details in ret.items():
if state_run_id.endswith("private_key_managed"):
assert state_run_details["result"]
assert state_run_details["changes"] == {}
if state_run_id.endswith("certificate_managed"):
assert state_run_details["result"]
assert state_run_details["changes"] == {}
# Now we repleace they key and cert contents with the contents of the above
# call, but under Py2
with salt.utils.files.fopen(keyfile, "w") as wfh:
wfh.write(keyfile_contents)
with salt.utils.files.fopen(keyfile) as rfh:
log.debug("Written keyfile, %r, contents:\n%s", keyfile, rfh.read())
with salt.utils.files.fopen(crtfile, "w") as wfh:
wfh.write(crtfile_contents)
with salt.utils.files.fopen(crtfile) as rfh:
log.debug("Written crtfile, %r, contents:\n%s", crtfile, rfh.read())
# We should not trigger any modification
ret = self.run_function("state.sls", ["cert"])
log.debug(
"Third state run ret dictionary:\n%s", pprint.pformat(list(ret.values()))
)
for state_run_id, state_run_details in ret.items():
if state_run_id.endswith("private_key_managed"):
assert state_run_details["result"]
assert state_run_details["changes"] == {}
if state_run_id.endswith("certificate_managed"):
assert state_run_details["result"]
assert state_run_details["changes"] == {}