Merge pull request #25468 from joejulian/use_pyopenssl_0_10

Add support for pyOpenSSL > 0.10
This commit is contained in:
Thomas S Hatch 2015-07-16 09:10:30 -06:00
commit 7a20ecbf46
2 changed files with 218 additions and 38 deletions

View file

@ -4,7 +4,8 @@ A salt module for SSL/TLS.
Can create a Certificate Authority (CA)
or use Self-Signed certificates.
:depends: - PyOpenSSL Python module (0.14 or later)
:depends: - PyOpenSSL Python module (0.10 or later, 0.14 or later for
X509 extension support)
:configuration: Add the following values in /etc/salt/minion for the CA module
to function properly::
@ -113,6 +114,7 @@ from distutils.version import LooseVersion
import re
HAS_SSL = False
X509_EXT_ENABLED = True
try:
import OpenSSL
HAS_SSL = True
@ -133,9 +135,15 @@ def __virtual__():
'''
Only load this module if the ca config options are set
'''
if HAS_SSL and OpenSSL_version >= LooseVersion('0.14'):
if OpenSSL_version <= LooseVersion('0.15'):
log.warn('You should upgrade pyOpenSSL to at least 0.15.1')
global X509_EXT_ENABLED
if HAS_SSL and OpenSSL_version >= LooseVersion('0.10'):
if OpenSSL_version < LooseVersion('0.14'):
X509_EXT_ENABLED = False
log.error('You should upgrade pyOpenSSL to at least 0.14.1 '
'to enable the use of X509 extensions')
elif OpenSSL_version <= LooseVersion('0.15'):
log.warn('You should upgrade pyOpenSSL to at least 0.15.1 '
'to enable the full use of X509 extensions')
# never EVER reactivate this code, this has been done too many times.
# not having configured a cert path in the configuration does not
# mean that users cant use this module as we provide methods
@ -147,9 +155,9 @@ def __virtual__():
# return False
return True
else:
return False, ['PyOpenSSL version 0.14 or later'
' must be installed before '
' this module can be used.']
X509_EXT_ENABLED = False
return False, ['PyOpenSSL version 0.10 or later must be installed '
'before this module can be used.']
def cert_base_path(cacert_path=None):
@ -686,20 +694,21 @@ def create_ca(ca_name,
ca.set_issuer(ca.get_subject())
ca.set_pubkey(key)
ca.add_extensions([
OpenSSL.crypto.X509Extension('basicConstraints', True,
'CA:TRUE, pathlen:0'),
OpenSSL.crypto.X509Extension('keyUsage', True,
'keyCertSign, cRLSign'),
OpenSSL.crypto.X509Extension('subjectKeyIdentifier', False, 'hash',
subject=ca)])
if X509_EXT_ENABLED:
ca.add_extensions([
OpenSSL.crypto.X509Extension('basicConstraints', True,
'CA:TRUE, pathlen:0'),
OpenSSL.crypto.X509Extension('keyUsage', True,
'keyCertSign, cRLSign'),
OpenSSL.crypto.X509Extension('subjectKeyIdentifier', False,
'hash', subject=ca)])
ca.add_extensions([
OpenSSL.crypto.X509Extension(
'authorityKeyIdentifier',
False,
'issuer:always,keyid:always',
issuer=ca)])
ca.add_extensions([
OpenSSL.crypto.X509Extension(
'authorityKeyIdentifier',
False,
'issuer:always,keyid:always',
issuer=ca)])
ca.sign(key, digest)
# alway backup existing keys in case
@ -754,6 +763,10 @@ def get_extensions(cert_type):
'''
assert X509_EXT_ENABLED, ('X509 extensions are not supported in '
'pyOpenSSL prior to version 0.15.1. Your '
'version: {0}'.format(OpenSSL_version))
ext = {}
if cert_type == '':
log.error('cert_type set to empty in tls_ca.get_extensions(); '
@ -974,21 +987,36 @@ def create_csr(ca_name,
req.get_subject().CN = CN
req.get_subject().emailAddress = emailAddress
extensions = get_extensions(cert_type)['csr']
extension_adds = []
try:
extensions = get_extensions(cert_type)['csr']
for ext, value in extensions.items():
extension_adds.append(OpenSSL.crypto.X509Extension(ext, False, value))
extension_adds = []
for ext, value in extensions.items():
extension_adds.append(OpenSSL.crypto.X509Extension(ext, False,
value))
except AssertionError as err:
log.error(err)
extensions = []
if subjectAltName:
if isinstance(subjectAltName, str):
subjectAltName = [subjectAltName]
if X509_EXT_ENABLED:
if isinstance(subjectAltName, str):
subjectAltName = [subjectAltName]
extension_adds.append(
OpenSSL.crypto.X509Extension(
'subjectAltName', False, ", ".join(subjectAltName)))
extension_adds.append(
OpenSSL.crypto.X509Extension(
'subjectAltName', False, ", ".join(subjectAltName)))
else:
raise ValueError('subjectAltName cannot be set as X509 '
'extensions are not supported in pyOpenSSL '
'prior to version 0.15.1. Your '
'version: {0}.'.format(OpenSSL_version))
if X509_EXT_ENABLED:
req.add_extensions(extension_adds)
req.add_extensions(extension_adds)
req.set_pubkey(key)
req.sign(key, digest)
@ -1344,8 +1372,6 @@ def create_ca_signed_cert(ca_name,
exts = []
try:
exts.extend(req.get_extensions())
log.debug('req.get_extensions() supported in pyOpenSSL {0}'.format(
OpenSSL.__dict__.get('__version__', '')))
except AttributeError:
try:
# see: http://bazaar.launchpad.net/~exarkun/pyopenssl/master/revision/189
@ -1353,9 +1379,9 @@ def create_ca_signed_cert(ca_name,
# so we mimic the newly get_extensions method present in ultra
# recent pyopenssl distros
log.info('req.get_extensions() not supported in pyOpenSSL versions '
'prior to 0.15. Switching to Dark Magic(tm) '
'prior to 0.15. Processing extensions internally. '
' Your version: {0}'.format(
OpenSSL.__dict__.get('__version__', 'pre-2014')))
OpenSSL_version))
native_exts_obj = OpenSSL._util.lib.X509_REQ_get_extensions(
req._req)
@ -1369,10 +1395,9 @@ def create_ca_signed_cert(ca_name,
exts.append(ext)
except Exception:
log.error('X509 extensions are unsupported in pyOpenSSL '
'versions prior to 0.14. Upgrade required. Current '
'version: {0}'.format(
OpenSSL.__dict__.get('__version__', 'pre-2014'))
)
'versions prior to 0.14. Upgrade required to '
'use extensions. Current version: {0}'.format(
OpenSSL_version))
cert = OpenSSL.crypto.X509()
cert.set_version(2)

View file

@ -11,6 +11,7 @@ NO_PYOPENSSL = False
import shutil
import tempfile
import os
from distutils.version import LooseVersion
try:
# We're not going to actually use OpenSSL, we just want to check that
# it's installed.
@ -642,6 +643,160 @@ class TLSAddTestCase(TestCase):
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
def test_pyOpenSSL_version(self):
'''
Test extension logic with different pyOpenSSL versions
'''
pillarval = {'csr': {'extendedKeyUsage': 'serverAuth'}}
mock_pgt = MagicMock(return_value=pillarval)
with patch.dict(tls.__dict__, {
'OpenSSL_version': LooseVersion('0.1.1'),
'X509_EXT_ENABLED': False}):
self.assertEqual(tls.__virtual__(),
(False, ['PyOpenSSL version 0.10 or later must be installed '
'before this module can be used.']))
with patch.dict(tls.__salt__, {'pillar.get': mock_pgt}):
self.assertRaises(AssertionError, tls.get_extensions, 'server')
self.assertRaises(AssertionError, tls.get_extensions, 'client')
with patch.dict(tls.__dict__, {
'OpenSSL_version': LooseVersion('0.14.1'),
'X509_EXT_ENABLED': True}):
self.assertTrue(tls.__virtual__())
with patch.dict(tls.__salt__, {'pillar.get': mock_pgt}):
self.assertEqual(tls.get_extensions('server'), pillarval)
self.assertEqual(tls.get_extensions('client'), pillarval)
with patch.dict(tls.__dict__, {
'OpenSSL_version': LooseVersion('0.15.1'),
'X509_EXT_ENABLED': True}):
self.assertTrue(tls.__virtual__())
with patch.dict(tls.__salt__, {'pillar.get': mock_pgt}):
self.assertEqual(tls.get_extensions('server'), pillarval)
self.assertEqual(tls.get_extensions('client'), pillarval)
@destructiveTest
def test_pyOpenSSL_version_destructive(self):
'''
Test extension logic with different pyOpenSSL versions
'''
pillarval = {'csr': {'extendedKeyUsage': 'serverAuth'}}
mock_pgt = MagicMock(return_value=pillarval)
ca_path = tempfile.mkdtemp(dir=integration.SYS_TMP_DIR)
ca_name = 'test_ca'
certp = '{0}/{1}/{2}_ca_cert.crt'.format(
ca_path,
ca_name,
ca_name)
certk = '{0}/{1}/{2}_ca_cert.key'.format(
ca_path,
ca_name,
ca_name)
ret = 'Created Private Key: "{0}." Created CA "{1}": "{2}."'.format(
certk, ca_name, certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
try:
with patch.dict(tls.__salt__, {
'config.option': mock_opt,
'cmd.retcode': mock_ret}):
with patch.dict(tls.__opts__, {
'hash_type': 'sha256',
'cachedir': ca_path}):
with patch.dict(_TLS_TEST_DATA['create_ca'],
{'replace': True}):
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.1.1'),
'X509_EXT_ENABLED': False}):
self.assertEqual(
tls.create_ca(
ca_name,
days=365,
fixmode=False,
**_TLS_TEST_DATA['create_ca']),
ret)
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.14.1'),
'X509_EXT_ENABLED': True}):
self.assertEqual(
tls.create_ca(
ca_name,
days=365,
fixmode=False,
**_TLS_TEST_DATA['create_ca']),
ret)
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.15.1'),
'X509_EXT_ENABLED': True}):
self.assertEqual(
tls.create_ca(
ca_name,
days=365,
fixmode=False,
**_TLS_TEST_DATA['create_ca']),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
try:
certp = '{0}/{1}/certs/{2}.csr'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
certk = '{0}/{1}/certs/{2}.key'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = ('Created Private Key: "{0}." '
'Created CSR for "{1}": "{2}."').format(
certk, _TLS_TEST_DATA['create_ca']['CN'], certp)
with patch.dict(tls.__salt__, {
'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}):
with patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}):
with patch.dict(_TLS_TEST_DATA['create_ca'], {
'subjectAltName': 'DNS:foo.bar',
'replace': True}):
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.1.1'),
'X509_EXT_ENABLED': False}):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
self.assertRaises(ValueError,
tls.create_csr,
ca_name,
**_TLS_TEST_DATA['create_ca'])
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.14.1'),
'X509_EXT_ENABLED': True}):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
self.assertEqual(
tls.create_csr(
ca_name,
**_TLS_TEST_DATA['create_ca']),
ret)
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.15.1'),
'X509_EXT_ENABLED': True}):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
self.assertEqual(
tls.create_csr(
ca_name,
**_TLS_TEST_DATA['create_ca']),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
if __name__ == '__main__':
from integration import run_tests
run_tests(TLSAddTestCase, needs_daemon=False)