Migrate to TZ-aware datetime objects in x509_v2

This commit is contained in:
jeanluc 2024-01-10 20:26:49 +01:00 committed by Pedro Algarvio
parent 792424104c
commit 37e50b000f
5 changed files with 73 additions and 40 deletions

1
changelog/65837.fixed.md Normal file
View file

@ -0,0 +1 @@
Corrected x509_v2 CRL creation `last_update` and `next_update` values when system timezone is not UTC

View file

@ -135,12 +135,12 @@ Note that when a ``ca_server`` is involved, both peers must use the updated modu
import base64 import base64
import copy import copy
import datetime
import glob import glob
import logging import logging
import os.path import os.path
import re import re
import sys import sys
from datetime import datetime, timedelta, timezone
try: try:
import cryptography.x509 as cx509 import cryptography.x509 as cx509
@ -1376,10 +1376,12 @@ def expires(certificate, days=0):
Defaults to ``0``, which checks for the current time. Defaults to ``0``, which checks for the current time.
""" """
cert = x509util.load_cert(certificate) cert = x509util.load_cert(certificate)
# dates are encoded in UTC/GMT, they are returned as a naive datetime object try:
return cert.not_valid_after <= datetime.datetime.utcnow() + datetime.timedelta( not_after = cert.not_valid_after_utc
days=days except AttributeError:
) # naive datetime object, release <42 (it's always UTC)
not_after = cert.not_valid_after.replace(tzinfo=timezone.utc)
return not_after <= datetime.now(tz=timezone.utc) + timedelta(days=days)
def expired(certificate): def expired(certificate):
@ -1659,6 +1661,13 @@ def read_certificate(certificate):
cert = x509util.load_cert(certificate) cert = x509util.load_cert(certificate)
key_type = x509util.get_key_type(cert.public_key(), as_string=True) key_type = x509util.get_key_type(cert.public_key(), as_string=True)
try:
not_before = cert.not_valid_before_utc
not_after = cert.not_valid_after_utc
except AttributeError:
# naive datetime object, release <42 (it's always UTC)
not_before = cert.not_valid_before.replace(tzinfo=timezone.utc)
not_after = cert.not_valid_after.replace(tzinfo=timezone.utc)
ret = { ret = {
"version": cert.version.value + 1, # 0-indexed "version": cert.version.value + 1, # 0-indexed
"key_size": cert.public_key().key_size if key_type in ["ec", "rsa"] else None, "key_size": cert.public_key().key_size if key_type in ["ec", "rsa"] else None,
@ -1674,8 +1683,8 @@ def read_certificate(certificate):
"issuer": _parse_dn(cert.issuer), "issuer": _parse_dn(cert.issuer),
"issuer_hash": x509util.pretty_hex(_get_name_hash(cert.issuer)), "issuer_hash": x509util.pretty_hex(_get_name_hash(cert.issuer)),
"issuer_str": cert.issuer.rfc4514_string(), "issuer_str": cert.issuer.rfc4514_string(),
"not_before": cert.not_valid_before.strftime(x509util.TIME_FMT), "not_before": not_before.strftime(x509util.TIME_FMT),
"not_after": cert.not_valid_after.strftime(x509util.TIME_FMT), "not_after": not_after.strftime(x509util.TIME_FMT),
"public_key": get_public_key(cert), "public_key": get_public_key(cert),
"extensions": _parse_extensions(cert.extensions), "extensions": _parse_extensions(cert.extensions),
} }
@ -1741,10 +1750,16 @@ def read_crl(crl):
The certificate revocation list to read. The certificate revocation list to read.
""" """
crl = x509util.load_crl(crl) crl = x509util.load_crl(crl)
try:
last_update = crl.last_update_utc
next_update = crl.next_update_utc
except AttributeError:
last_update = crl.last_update.replace(tzinfo=timezone.utc)
next_update = crl.next_update.replace(tzinfo=timezone.utc)
ret = { ret = {
"issuer": _parse_dn(crl.issuer), "issuer": _parse_dn(crl.issuer),
"last_update": crl.last_update.strftime(x509util.TIME_FMT), "last_update": last_update.strftime(x509util.TIME_FMT),
"next_update": crl.next_update.strftime(x509util.TIME_FMT), "next_update": next_update.strftime(x509util.TIME_FMT),
"revoked_certificates": {}, "revoked_certificates": {},
"extensions": _parse_extensions(crl.extensions), "extensions": _parse_extensions(crl.extensions),
} }
@ -1764,12 +1779,15 @@ def read_crl(crl):
ret["signature_algorithm"] = crl.signature_algorithm_oid.dotted_string ret["signature_algorithm"] = crl.signature_algorithm_oid.dotted_string
for revoked in crl: for revoked in crl:
try:
revocation_date = revoked.revocation_date_utc
except AttributeError:
# naive datetime object, release <42 (it's always UTC)
revocation_date = revoked.revocation_date.replace(tzinfo=timezone.utc)
ret["revoked_certificates"].update( ret["revoked_certificates"].update(
{ {
x509util.dec2hex(revoked.serial_number).replace(":", ""): { x509util.dec2hex(revoked.serial_number).replace(":", ""): {
"revocation_date": revoked.revocation_date.strftime( "revocation_date": revocation_date.strftime(x509util.TIME_FMT),
x509util.TIME_FMT
),
"extensions": _parse_crl_entry_extensions(revoked.extensions), "extensions": _parse_crl_entry_extensions(revoked.extensions),
} }
} }

View file

@ -183,9 +183,9 @@ according to the www policy.
import base64 import base64
import copy import copy
import datetime
import logging import logging
import os.path import os.path
from datetime import datetime, timedelta, timezone
import salt.utils.files import salt.utils.files
from salt.exceptions import CommandExecutionError, SaltInvocationError from salt.exceptions import CommandExecutionError, SaltInvocationError
@ -487,11 +487,16 @@ def certificate_managed(
else None else None
): ):
changes["pkcs12_friendlyname"] = pkcs12_friendlyname changes["pkcs12_friendlyname"] = pkcs12_friendlyname
try:
curr_not_after = current.not_valid_after_utc
except AttributeError:
# naive datetime object, release <42 (it's always UTC)
curr_not_after = current.not_valid_after.replace(
tzinfo=timezone.utc
)
if ( if curr_not_after < datetime.now(tz=timezone.utc) + timedelta(
current.not_valid_after days=days_remaining
< datetime.datetime.utcnow()
+ datetime.timedelta(days=days_remaining)
): ):
changes["expiration"] = True changes["expiration"] = True
@ -896,10 +901,14 @@ def crl_managed(
if encoding != current_encoding: if encoding != current_encoding:
changes["encoding"] = encoding changes["encoding"] = encoding
try:
curr_next_update = current.next_update_utc
except AttributeError:
# naive datetime object, release <42 (it's always UTC)
curr_next_update = current.next_update.replace(tzinfo=timezone.utc)
if days_remaining and ( if days_remaining and (
current.next_update curr_next_update
< datetime.datetime.utcnow() < datetime.now(tz=timezone.utc) + timedelta(days=days_remaining)
+ datetime.timedelta(days=days_remaining)
): ):
changes["expiration"] = True changes["expiration"] = True

View file

@ -1,10 +1,10 @@
import base64 import base64
import copy import copy
import datetime
import ipaddress import ipaddress
import logging import logging
import os.path import os.path
import re import re
from datetime import datetime, timedelta, timezone
from enum import Enum from enum import Enum
from urllib.parse import urlparse, urlunparse from urllib.parse import urlparse, urlunparse
@ -313,14 +313,14 @@ def build_crt(
) )
not_before = ( not_before = (
datetime.datetime.strptime(not_before, TIME_FMT) datetime.strptime(not_before, TIME_FMT).replace(tzinfo=timezone.utc)
if not_before if not_before
else datetime.datetime.utcnow() else datetime.now(tz=timezone.utc)
) )
not_after = ( not_after = (
datetime.datetime.strptime(not_after, TIME_FMT) datetime.strptime(not_after, TIME_FMT).replace(tzinfo=timezone.utc)
if not_after if not_after
else datetime.datetime.utcnow() + datetime.timedelta(days=days_valid) else datetime.now(tz=timezone.utc) + timedelta(days=days_valid)
) )
builder = builder.not_valid_before(not_before).not_valid_after(not_after) builder = builder.not_valid_before(not_before).not_valid_after(not_after)
@ -422,32 +422,38 @@ def build_crl(
builder = cx509.CertificateRevocationListBuilder() builder = cx509.CertificateRevocationListBuilder()
if signing_cert: if signing_cert:
builder = builder.issuer_name(signing_cert.subject) builder = builder.issuer_name(signing_cert.subject)
builder = builder.last_update(datetime.datetime.today()) builder = builder.last_update(datetime.now(tz=timezone.utc))
builder = builder.next_update( builder = builder.next_update(
datetime.datetime.today() + datetime.timedelta(days=days_valid) datetime.now(tz=timezone.utc) + timedelta(days=days_valid)
) )
for rev in revoked: for rev in revoked:
serial_number = not_after = revocation_date = None serial_number = not_after = revocation_date = None
if "not_after" in rev: if "not_after" in rev:
not_after = datetime.datetime.strptime(rev["not_after"], TIME_FMT) not_after = datetime.strptime(rev["not_after"], TIME_FMT).replace(
tzinfo=timezone.utc
)
if "serial_number" in rev: if "serial_number" in rev:
serial_number = rev["serial_number"] serial_number = rev["serial_number"]
if "certificate" in rev: if "certificate" in rev:
rev_cert = load_cert(rev["certificate"]) rev_cert = load_cert(rev["certificate"])
serial_number = rev_cert.serial_number serial_number = rev_cert.serial_number
not_after = rev_cert.not_valid_after try:
not_after = rev_cert.not_valid_after_utc
except AttributeError:
# naive datetime object, release <42 (it's always UTC)
not_after = rev_cert.not_valid_after.replace(tzinfo=timezone.utc)
if not serial_number: if not serial_number:
raise SaltInvocationError("Need serial_number or certificate") raise SaltInvocationError("Need serial_number or certificate")
serial_number = _get_serial_number(serial_number) serial_number = _get_serial_number(serial_number)
if not_after and not include_expired: if not_after and not include_expired:
if datetime.datetime.utcnow() > not_after: if datetime.now(tz=timezone.utc) > not_after:
continue continue
if "revocation_date" in rev: if "revocation_date" in rev:
revocation_date = datetime.datetime.strptime( revocation_date = datetime.strptime(
rev["revocation_date"], TIME_FMT rev["revocation_date"], TIME_FMT
) ).replace(tzinfo=timezone.utc)
else: else:
revocation_date = datetime.datetime.utcnow() revocation_date = datetime.now(tz=timezone.utc)
revoked_cert = cx509.RevokedCertificateBuilder( revoked_cert = cx509.RevokedCertificateBuilder(
serial_number=serial_number, revocation_date=revocation_date serial_number=serial_number, revocation_date=revocation_date
@ -1624,8 +1630,9 @@ def _create_invalidity_date(val, **kwargs):
if critical: if critical:
val = val.split(" ", maxsplit=1)[1] val = val.split(" ", maxsplit=1)[1]
try: try:
# InvalidityDate deals in naive datetime objects only currently
return ( return (
cx509.InvalidityDate(datetime.datetime.strptime(val, TIME_FMT)), cx509.InvalidityDate(datetime.strptime(val, TIME_FMT)),
critical, critical,
) )
except ValueError as err: except ValueError as err:

View file

@ -1,5 +1,5 @@
import datetime
import ipaddress import ipaddress
from datetime import datetime
import pytest import pytest
@ -1019,12 +1019,12 @@ class TestCreateExtension:
[ [
( (
"critical, 2022-10-11 13:37:42", "critical, 2022-10-11 13:37:42",
datetime.datetime.strptime("2022-10-11 13:37:42", "%Y-%m-%d %H:%M:%S"), datetime.strptime("2022-10-11 13:37:42", "%Y-%m-%d %H:%M:%S"),
True, True,
), ),
( (
"2022-10-11 13:37:42", "2022-10-11 13:37:42",
datetime.datetime.strptime("2022-10-11 13:37:42", "%Y-%m-%d %H:%M:%S"), datetime.strptime("2022-10-11 13:37:42", "%Y-%m-%d %H:%M:%S"),
False, False,
), ),
], ],
@ -1875,9 +1875,7 @@ def test_get_dn(inpt, expected):
cx509.Extension( cx509.Extension(
cx509.InvalidityDate.oid, cx509.InvalidityDate.oid,
value=cx509.InvalidityDate( value=cx509.InvalidityDate(
datetime.datetime.strptime( datetime.strptime("2022-10-11 13:37:42", "%Y-%m-%d %H:%M:%S")
"2022-10-11 13:37:42", "%Y-%m-%d %H:%M:%S"
)
), ),
critical=False, critical=False,
), ),