Merge pull request #56850 from waynew/master-port/51461

Port 51461 to master
This commit is contained in:
Daniel Wozniak 2020-04-22 19:53:42 -07:00 committed by GitHub
commit c41d76c93e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 234 additions and 11 deletions

1
changelog/7424.added Normal file
View file

@ -0,0 +1 @@
Added `validate` to tls module.

View file

@ -550,6 +550,52 @@ def _read_cert(cert):
return cert
def validate(cert, ca_name, crl_file):
"""
.. versionadded:: Neon
Validate a certificate against a given CA/CRL.
cert
path to the certifiate PEM file or string
ca_name
name of the CA
crl_file
full path to the CRL file
"""
store = OpenSSL.crypto.X509Store()
cert_obj = _read_cert(cert)
if cert_obj is None:
raise CommandExecutionError(
"Failed to read cert from {0}, see log for details".format(cert)
)
ca_dir = "{0}/{1}".format(cert_base_path(), ca_name)
ca_cert = _read_cert("{0}/{1}_ca_cert.crt".format(ca_dir, ca_name))
store.add_cert(ca_cert)
# These flags tell OpenSSL to check the leaf as well as the
# entire cert chain.
X509StoreFlags = OpenSSL.crypto.X509StoreFlags
store.set_flags(X509StoreFlags.CRL_CHECK | X509StoreFlags.CRL_CHECK_ALL)
if crl_file is None:
crl = OpenSSL.crypto.CRL()
else:
with salt.utils.files.fopen(crl_file) as fhr:
crl = OpenSSL.crypto.load_crl(OpenSSL.crypto.FILETYPE_PEM, fhr.read())
store.add_crl(crl)
context = OpenSSL.crypto.X509StoreContext(store, cert_obj)
ret = {}
try:
context.verify_certificate()
ret["valid"] = True
except OpenSSL.crypto.X509StoreContextError as e:
ret["error"] = str(e)
ret["error_cert"] = e.certificate
ret["valid"] = False
return ret
def _get_expiration_date(cert):
"""
Returns a datetime.datetime object
@ -901,8 +947,10 @@ def get_extensions(cert_type):
)
except NameError as e:
log.debug(
"pillar, tls:extensions:{0} not available or "
"not operating in a salt context\n{1}".format(cert_type, e)
"pillar, tls:extensions:%s not available or "
"not operating in a salt context\n%s",
cert_type,
e,
)
retval = ext["common"]
@ -1468,7 +1516,8 @@ def create_ca_signed_cert(
log.info(
"req.get_extensions() not supported in pyOpenSSL versions "
"prior to 0.15. Processing extensions internally. "
" Your version: {0}".format(OpenSSL_version)
"Your version: %s",
OpenSSL_version,
)
native_exts_obj = OpenSSL._util.lib.X509_REQ_get_extensions(req._req)
@ -1482,7 +1531,8 @@ def create_ca_signed_cert(
log.error(
"X509 extensions are unsupported in pyOpenSSL "
"versions prior to 0.14. Upgrade required to "
"use extensions. Current version: {0}".format(OpenSSL_version)
"use extensions. Current version: %s",
OpenSSL_version,
)
cert = OpenSSL.crypto.X509()
@ -1677,8 +1727,10 @@ def cert_info(cert, digest="sha256"):
entry, name = name.split(":", 1)
if entry not in valid_entries:
log.error(
"Cert {0} has an entry ({1}) which does not start "
"with {2}".format(ret["subject"], name, "/".join(valid_entries))
"Cert %s has an entry (%s) which does not start " "with %s",
ret["subject"],
name,
"/".join(valid_entries),
)
else:
valid_names.add(name)
@ -1698,7 +1750,9 @@ def cert_info(cert, digest="sha256"):
return ret
def create_empty_crl(ca_name, cacert_path=None, ca_filename=None, crl_file=None):
def create_empty_crl(
ca_name, cacert_path=None, ca_filename=None, crl_file=None, digest="sha256"
):
"""
Create an empty Certificate Revocation List.
@ -1716,6 +1770,11 @@ def create_empty_crl(ca_name, cacert_path=None, ca_filename=None, crl_file=None)
crl_file
full path to the CRL file
digest
The message digest algorithm. Must be a string describing a digest
algorithm supported by OpenSSL (by EVP_get_digestbyname, specifically).
For example, "md5" or "sha1". Default: 'sha256'
CLI Example:
.. code-block:: bash
@ -1753,7 +1812,9 @@ def create_empty_crl(ca_name, cacert_path=None, ca_filename=None, crl_file=None)
return 'There is no CA named "{0}"'.format(ca_name)
crl = OpenSSL.crypto.CRL()
crl_text = crl.export(ca_cert, ca_key)
crl_text = crl.export(
ca_cert, ca_key, digest=salt.utils.stringutils.to_bytes(digest),
)
with salt.utils.files.fopen(crl_file, "w") as f:
f.write(salt.utils.stringutils.to_str(crl_text))
@ -1769,6 +1830,7 @@ def revoke_cert(
cert_path=None,
cert_filename=None,
crl_file=None,
digest="sha256",
):
"""
Revoke a certificate.
@ -1797,6 +1859,11 @@ def revoke_cert(
crl_file
Full path to the CRL file.
digest
The message digest algorithm. Must be a string describing a digest
algorithm supported by OpenSSL (by EVP_get_digestbyname, specifically).
For example, "md5" or "sha1". Default: 'sha256'
CLI Example:
.. code-block:: bash
@ -1886,12 +1953,18 @@ def revoke_cert(
if line.startswith("R"):
fields = line.split("\t")
revoked = OpenSSL.crypto.Revoked()
revoked.set_serial(fields[3])
revoked.set_serial(salt.utils.stringutils.to_bytes(fields[3]))
revoke_date_2_digit = datetime.strptime(fields[2], two_digit_year_fmt)
revoked.set_rev_date(revoke_date_2_digit.strftime(four_digit_year_fmt))
revoked.set_rev_date(
salt.utils.stringutils.to_bytes(
revoke_date_2_digit.strftime(four_digit_year_fmt)
)
)
crl.add_revoked(revoked)
crl_text = crl.export(ca_cert, ca_key)
crl_text = crl.export(
ca_cert, ca_key, digest=salt.utils.stringutils.to_bytes(digest)
)
if crl_file is None:
crl_file = "{0}/{1}/crl.pem".format(_cert_base_path(), ca_name)

View file

@ -0,0 +1,149 @@
# -*- coding: utf-8 -*-
"""
:codeauthor: Wayne Werner <wwerner@saltstack.com>
"""
# Import the future
from __future__ import absolute_import, print_function, unicode_literals
import os
import tempfile
# Salt Libs
import salt.modules.cmdmod as cmd
import salt.modules.file as file
import salt.modules.tls as tls
import salt.utils.files as files
import salt.utils.stringutils as stringutils
# Testing libs
from tests.support.case import ModuleCase
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.mock import MagicMock
from tests.support.runtests import RUNTIME_VARS
class TLSModuleTest(ModuleCase, LoaderModuleMockMixin):
"""
Tests for salt.modules.tls
"""
def setup_loader_modules(self):
opts = {
"cachedir": os.path.join(RUNTIME_VARS.TMP, "cache"),
"test": True,
}
return {
tls: {
"__salt__": {
"config.option": MagicMock(return_value=self.tempdir),
"cmd.retcode": cmd.retcode,
"pillar.get": MagicMock(return_value=False),
"file.replace": file.replace,
},
"__opts__": opts,
},
file: {
"__utils__": {
"files.is_text": files.is_text,
"stringutils.get_diff": stringutils.get_diff,
},
"__opts__": opts,
},
}
@classmethod
def setUpClass(cls):
cls.ca_name = "roscivs"
cls.tempdir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
def test_ca_exists_should_be_False_before_ca_is_created(self):
self.assertFalse(tls.ca_exists(self.ca_name))
def test_ca_exists_should_be_True_after_ca_is_created(self):
tls.create_ca(self.ca_name)
self.assertTrue(tls.ca_exists(self.ca_name))
def test_creating_csr_should_fail_with_no_ca(self):
expected_message = (
'Certificate for CA named "bad_ca" does not exist,'
" please create it first."
)
self.assertEqual(tls.create_csr(ca_name="bad_ca"), expected_message)
def test_with_existing_ca_signing_csr_should_produce_valid_cert(self):
print("Revoked should not be here")
empty_crl_filename = os.path.join(self.tempdir, "empty.crl")
tls.create_ca(self.ca_name)
tls.create_csr(
ca_name=self.ca_name, CN="testing.localhost",
)
tls.create_ca_signed_cert(
ca_name=self.ca_name, CN="testing.localhost",
)
tls.create_empty_crl(
ca_name=self.ca_name, crl_file=empty_crl_filename,
)
ret = tls.validate(
cert=os.path.join(
self.tempdir, self.ca_name, "certs", "testing.localhost.crt",
),
ca_name=self.ca_name,
crl_file=empty_crl_filename,
)
print("not there")
self.assertTrue(ret["valid"], ret.get("error"))
def test_revoked_cert_should_return_False_from_validate(self):
revoked_crl_filename = os.path.join(self.tempdir, "revoked.crl")
tls.create_ca(self.ca_name)
tls.create_csr(
ca_name=self.ca_name, CN="testing.bad.localhost",
)
tls.create_ca_signed_cert(
ca_name=self.ca_name, CN="testing.bad.localhost",
)
tls.create_empty_crl(
ca_name=self.ca_name, crl_file=revoked_crl_filename,
)
tls.revoke_cert(
ca_name=self.ca_name,
CN="testing.bad.localhost",
crl_file=revoked_crl_filename,
)
self.assertFalse(
tls.validate(
cert=os.path.join(
self.tempdir, self.ca_name, "certs", "testing.bad.localhost.crt",
),
ca_name=self.ca_name,
crl_file=revoked_crl_filename,
)["valid"]
)
def test_validating_revoked_cert_with_no_crl_file_should_return_False(self):
revoked_crl_filename = None
tls.create_ca(self.ca_name)
tls.create_csr(
ca_name=self.ca_name, CN="testing.bad.localhost",
)
tls.create_ca_signed_cert(
ca_name=self.ca_name, CN="testing.bad.localhost",
)
tls.create_empty_crl(
ca_name=self.ca_name, crl_file=revoked_crl_filename,
)
tls.revoke_cert(
ca_name=self.ca_name,
CN="testing.bad.localhost",
crl_file=revoked_crl_filename,
)
self.assertFalse(
tls.validate(
cert=os.path.join(
self.tempdir, self.ca_name, "certs", "testing.bad.localhost.crt",
),
ca_name=self.ca_name,
crl_file=revoked_crl_filename,
)["valid"]
)