Try to refresh expired GPG keys

... otherwise fail since expired keys are meaningless.

Also:
* Untangle `gpg.present` logic
* Correctly order key changes in returned comment and prefix messages
  with key ID
* Reuse key rendering logic in execution module
* Add some tests for test mode because of logic reordering
This commit is contained in:
jeanluc 2024-04-04 17:37:09 +02:00 committed by Daniel Wozniak
parent 4294a82322
commit 21d5cca071
6 changed files with 522 additions and 223 deletions

View file

@ -0,0 +1 @@
Made `gpg.present` attempt to refresh keys if they are expired

View file

@ -139,16 +139,24 @@ def _get_user_info(user=None):
"""
Wrapper for user.info Salt function
"""
user_from_config = False
if not user:
# Get user Salt running as
# Get user Salt is running as
user = __salt__["config.option"]("user")
# Ensure we don't get an infinite loop when `salt` is returned as the user,
# but it does not exist for some reason.
user_from_config = True
if salt.utils.platform.is_windows() and "\\" in user:
# At least in the test suite, this config option is set
# including the hostname, so split it off
user = user.split("\\", maxsplit=1)[1]
userinfo = __salt__["user.info"](user)
if not userinfo:
if user == "salt":
if user == "salt" and not user_from_config:
# Special case with `salt` user:
# if it doesn't exist then fall back to user Salt running as
# if it doesn't exist then fall back to user Salt is running as
userinfo = _get_user_info()
else:
raise SaltInvocationError(f"User {user} does not exist")
@ -316,23 +324,7 @@ def search_keys(text, keyserver=None, user=None, gnupghome=None):
_keys = []
for _key in _search_keys(text, keyserver, user=user, gnupghome=gnupghome):
tmp = {"keyid": _key["keyid"], "uids": _key["uids"]}
expires = _key.get("expires", None)
date = _key.get("date", None)
length = _key.get("length", None)
if expires:
tmp["expires"] = time.strftime(
"%Y-%m-%d", time.localtime(float(_key["expires"]))
)
if date:
tmp["created"] = time.strftime(
"%Y-%m-%d", time.localtime(float(_key["date"]))
)
if length:
tmp["keyLength"] = _key["length"]
_keys.append(tmp)
_keys.append(_render_key(_key))
return _keys
@ -403,9 +395,10 @@ def list_secret_keys(user=None, gnupghome=None, keyring=None):
def _render_key(_key):
tmp = {
"keyid": _key["keyid"],
"fingerprint": _key["fingerprint"],
"uids": _key["uids"],
}
if "fingerprint" in _key:
tmp["fingerprint"] = _key["fingerprint"]
expires = _key.get("expires", None)
date = _key.get("date", None)
@ -417,6 +410,7 @@ def _render_key(_key):
tmp["expires"] = time.strftime(
"%Y-%m-%d", time.localtime(float(_key["expires"]))
)
tmp["expired"] = time.time() >= float(expires)
if date:
tmp["created"] = time.strftime("%Y-%m-%d", time.localtime(float(_key["date"])))
if length:
@ -731,41 +725,14 @@ def get_key(keyid=None, fingerprint=None, user=None, gnupghome=None, keyring=Non
salt '*' gpg.get_key keyid=3FAD9F1E user=username
"""
tmp = {}
for _key in _list_keys(user=user, gnupghome=gnupghome, keyring=keyring):
if (
_key["fingerprint"] == fingerprint
or _key["keyid"] == keyid
or _key["keyid"][8:] == keyid
):
tmp["keyid"] = _key["keyid"]
tmp["fingerprint"] = _key["fingerprint"]
tmp["uids"] = _key["uids"]
expires = _key.get("expires", None)
date = _key.get("date", None)
length = _key.get("length", None)
owner_trust = _key.get("ownertrust", None)
trust = _key.get("trust", None)
if expires:
tmp["expires"] = time.strftime(
"%Y-%m-%d", time.localtime(float(_key["expires"]))
)
if date:
tmp["created"] = time.strftime(
"%Y-%m-%d", time.localtime(float(_key["date"]))
)
if length:
tmp["keyLength"] = _key["length"]
if owner_trust:
tmp["ownerTrust"] = LETTER_TRUST_DICT[_key["ownertrust"]]
if trust:
tmp["trust"] = LETTER_TRUST_DICT[_key["trust"]]
if not tmp:
return False
else:
return tmp
return _render_key(_key)
return False
def get_secret_key(
@ -805,7 +772,6 @@ def get_secret_key(
salt '*' gpg.get_secret_key keyid=3FAD9F1E user=username
"""
tmp = {}
for _key in _list_keys(
user=user, gnupghome=gnupghome, keyring=keyring, secret=True
):
@ -814,34 +780,8 @@ def get_secret_key(
or _key["keyid"] == keyid
or _key["keyid"][8:] == keyid
):
tmp["keyid"] = _key["keyid"]
tmp["fingerprint"] = _key["fingerprint"]
tmp["uids"] = _key["uids"]
expires = _key.get("expires", None)
date = _key.get("date", None)
length = _key.get("length", None)
owner_trust = _key.get("ownertrust", None)
trust = _key.get("trust", None)
if expires:
tmp["expires"] = time.strftime(
"%Y-%m-%d", time.localtime(float(_key["expires"]))
)
if date:
tmp["created"] = time.strftime(
"%Y-%m-%d", time.localtime(float(_key["date"]))
)
if length:
tmp["keyLength"] = _key["length"]
if owner_trust:
tmp["ownerTrust"] = LETTER_TRUST_DICT[_key["ownertrust"]]
if trust:
tmp["trust"] = LETTER_TRUST_DICT[_key["trust"]]
if not tmp:
return False
else:
return tmp
return _render_key(_key)
return False
@_restore_ownership
@ -1164,6 +1104,14 @@ def receive_keys(keyserver=None, keys=None, user=None, gnupghome=None, keyring=N
ret["message"].append(
f"Key {result['fingerprint']} already exists in keychain"
)
elif result["ok"] == "4":
ret["message"].append(
f"Key {result['fingerprint']} updated: new signatures"
)
elif result["ok"] == "8":
ret["message"].append(
f"Key {result['fingerprint']} updated: new subkeys"
)
elif "problem" in result:
ret["message"].append(
f"Unable to add key to keychain: {result.get('text', 'No further description')}"

View file

@ -10,7 +10,7 @@ import logging
import salt.utils.dictupdate
import salt.utils.immutabletypes as immutabletypes
from salt.exceptions import SaltInvocationError
from salt.exceptions import CommandExecutionError, SaltInvocationError
log = logging.getLogger(__name__)
@ -26,6 +26,12 @@ TRUST_MAP = immutabletypes.freeze(
)
class KeyNotContained(CommandExecutionError):
"""
Raised when a data source does not contain a requested key
"""
def present(
name,
keys=None,
@ -40,7 +46,8 @@ def present(
**kwargs,
):
"""
Ensure a GPG public key is present in the GPG keychain.
Ensure a GPG public key is present in the GPG keychain and
that it is not expired.
name
The key ID of the GPG public key.
@ -104,102 +111,65 @@ def present(
.. versionadded:: 3008.0
"""
ret = {"name": name, "result": True, "changes": {}, "comment": []}
ret = {"name": name, "result": True, "changes": {}, "comment": ""}
if not text and skip_keyserver and not source:
ret["result"] = False
ret["comment"] = (
"When skipping keyservers, you must provide at least one source"
try:
if not text and skip_keyserver and not source:
raise SaltInvocationError(
"When skipping keyservers, you must provide at least one source"
)
if trust and trust not in TRUST_MAP:
raise SaltInvocationError(f"Invalid trust level {trust}")
_current_keys = __salt__["gpg.list_keys"](
user=user, gnupghome=gnupghome, keyring=keyring
)
except (CommandExecutionError, SaltInvocationError) as err:
ret["result"] = False
ret["comment"] = str(err)
return ret
_current_keys = __salt__["gpg.list_keys"](
user=user, gnupghome=gnupghome, keyring=keyring
)
current_keys = {}
expired_keys = []
for key in _current_keys:
keyid = key["keyid"]
current_keys[keyid] = {}
current_keys[keyid]["trust"] = key["trust"]
if key.get("expired"):
expired_keys.append(keyid)
if not keys:
keys = name
if isinstance(keys, str):
keys = [keys]
key_res = {}
# First, ensure all keys are present
for key in keys:
if key in current_keys:
if trust:
if trust in TRUST_MAP:
if current_keys[key]["trust"] != TRUST_MAP[trust]:
if __opts__["test"]:
ret["result"] = None
ret["comment"].append(
f"Would have set trust level for {key} to {trust}"
)
salt.utils.dictupdate.set_dict_key_value(
ret, f"changes:{key}:trust", trust
)
continue
try:
# update trust level
result = __salt__["gpg.trust_key"](
keyid=key,
trust_level=trust,
user=user,
gnupghome=gnupghome,
keyring=keyring,
)
except SaltInvocationError as err:
result = {"res": False, "message": str(err)}
if result["res"] is False:
ret["result"] = result["res"]
ret["comment"].append(result["message"])
else:
salt.utils.dictupdate.set_dict_key_value(
ret, f"changes:{key}:trust", trust
)
ret["comment"].append(
f"Set trust level for {key} to {trust}"
)
else:
ret["comment"].append(
f"GPG Public Key {key} already in correct trust state"
)
else:
ret["comment"].append(f"Invalid trust level {trust}")
ret["comment"].append(f"GPG Public Key {key} already in keychain")
else:
key_res[key] = []
try:
refresh = key in expired_keys
if key in current_keys and not refresh:
key_res[key].append(f"GPG Public Key {key} already in keychain")
continue
if __opts__["test"]:
ret["result"] = None
ret["comment"].append(f"Would have added {key} to GPG keychain")
key_res[key] = [f"Would have added {key} to GPG keychain"]
salt.utils.dictupdate.set_dict_key_value(
ret, f"changes:{key}:added", True
)
if refresh:
key_res[key][-1] += " (the existing one was expired)"
salt.utils.dictupdate.set_dict_key_value(
ret, f"changes:{key}:refresh", True
)
else:
current_keys[key] = {"trust": "unknown"}
continue
result = {}
if text:
has_key = __salt__["gpg.read_key"](
text=text, keyid=key, gnupghome=gnupghome, user=user
)
if has_key:
log.debug("Passed text contains key %s", key)
result = __salt__["gpg.import_key"](
text=text,
user=user,
gnupghome=gnupghome,
keyring=keyring,
select=key,
)
else:
result = {
"res": False,
"message": ["Passed text did not contain the requested key"],
}
result = _import_data(key, refresh, user, gnupghome, keyring, text=text)
else:
if not skip_keyserver:
result = __salt__["gpg.receive_keys"](
@ -209,68 +179,112 @@ def present(
gnupghome=gnupghome,
keyring=keyring,
)
if refresh and result["res"]:
# If we're refreshing and no updated key could be found,
# ensure we're failing here.
result["res"] = any(
"updated: new" in x for x in result["message"]
)
result["message"] = "\n".join(result["message"])
if (not result or result["res"] is False) and source:
if not isinstance(source, list):
source = [source]
prev_msg = ""
if result:
prev_msg = result["message"] + "\n"
for src in source:
sfn = __salt__["cp.cache_file"](src)
if sfn:
log.debug("Found source: %s", src)
has_key = __salt__["gpg.read_key"](
path=sfn, keyid=key, gnupghome=gnupghome, user=user
)
if has_key:
log.debug("Found source %s contains key %s", src, key)
result = __salt__["gpg.import_key"](
filename=sfn,
user=user,
gnupghome=gnupghome,
keyring=keyring,
select=key,
try:
result = _import_data(
key, refresh, user, gnupghome, keyring, path=sfn
)
break
except KeyNotContained as err:
if "expired" in str(err):
log.warning(
"Found source %s contains key %s, but it's expired",
src,
key,
)
else:
prev_msg = ""
if result:
prev_msg = " ".join(result["message"]) + ". In addition, "
result = {
"res": False,
"message": [
prev_msg
+ f"none of the specified sources were found or contained the key {key}."
],
}
if result["res"] is False:
ret["result"] = result["res"]
ret["comment"].extend(result["message"])
else:
ret["comment"].append(f"Added {key} to GPG keychain")
salt.utils.dictupdate.set_dict_key_value(
ret, f"changes:{key}:added", True
)
if trust:
if trust in TRUST_MAP:
try:
# update trust level
result = __salt__["gpg.trust_key"](
keyid=key,
trust_level=trust,
user=user,
gnupghome=gnupghome,
keyring=keyring,
raise CommandExecutionError(
prev_msg
+ f"none of the specified sources were found or contained the (unexpired) key {key}."
)
except SaltInvocationError as err:
result = {"res": False, "message": str(err)}
if result["res"] is False:
ret["result"] = result["res"]
ret["comment"].append(result["message"])
else:
ret["comment"].append(f"Set trust level for {key} to {trust}")
else:
ret["comment"].append(f"Invalid trust level {trust}")
ret["comment"] = "\n".join(ret["comment"])
if result["res"] is False:
raise CommandExecutionError(result["message"])
new_key = __salt__["gpg.get_key"](
keyid=key, user=user, gnupghome=gnupghome, keyring=keyring
)
if not new_key:
raise CommandExecutionError(
result["message"]
+ f"\nThe new key {key} could not be retrieved though."
)
salt.utils.dictupdate.set_dict_key_value(ret, f"changes:{key}:added", True)
if new_key.get("expired"):
raise CommandExecutionError(
result["message"] + f"\nThe new key {key} is expired though."
)
key_res[key].append(f"Added {key} to GPG keychain")
current_keys[key] = {"trust": new_key["trust"]}
if refresh:
key_res[key][-1] += " (the existing one was expired)"
salt.utils.dictupdate.set_dict_key_value(
ret, f"changes:{key}:refresh", True
)
except (CommandExecutionError, SaltInvocationError) as err:
ret["result"] = False
if refresh:
key_res[key].append(
"Existing key is expired, tried to fetch updated one"
)
key_res[key].extend(str(err).splitlines())
# Now all possible keys are present, manage their trust if requested
if trust:
for key in keys:
if key not in current_keys:
# This means the key was not present and could not be retrieved
continue
try:
if current_keys[key]["trust"] == TRUST_MAP[trust]:
key_res[key].append(
f"GPG Public Key {key} already in correct trust state"
)
continue
if __opts__["test"]:
ret["result"] = None
key_res[key].append(
f"Would have set trust level for {key} to {trust}"
)
salt.utils.dictupdate.set_dict_key_value(
ret, f"changes:{key}:trust", trust
)
continue
result = __salt__["gpg.trust_key"](
keyid=key,
trust_level=trust,
user=user,
gnupghome=gnupghome,
keyring=keyring,
)
if result["res"] is False:
raise CommandExecutionError(result["message"])
key_res[key].append(f"Set trust level for {key} to {trust}")
salt.utils.dictupdate.set_dict_key_value(
ret, f"changes:{key}:trust", trust
)
except (CommandExecutionError, SaltInvocationError) as err:
ret["result"] = False
key_res[key].append(str(err))
final_res = {
key: "\n * " + "\n * ".join(msgs) for key, msgs in key_res.items() if msgs
}
ret["comment"] = "\n".join(f"Key {key}:{msg}" for key, msg in final_res.items())
return ret
@ -382,3 +396,24 @@ def absent(
ret["comment"] = "\n".join(ret["comment"])
return ret
def _import_data(key, refresh, user, gnupghome, keyring, text=None, path=None):
has_key = __salt__["gpg.read_key"](
text=text, path=path, keyid=key, gnupghome=gnupghome, user=user
)
if has_key:
is_expired = has_key[0].get("expired")
# Ensure we still import the expired key if it's not present
if not is_expired or not refresh:
log.debug("Passed text contains key %s", key)
return __salt__["gpg.import_key"](
text=text,
filename=path,
user=user,
gnupghome=gnupghome,
keyring=keyring,
select=key,
)
raise KeyNotContained(f"Passed text contained the key {key}, but it's expired")
raise KeyNotContained(f"Passed text did not contain the requested key {key}")

View file

@ -1,3 +1,4 @@
import contextlib
import shutil
import subprocess
from pathlib import Path
@ -128,6 +129,49 @@ y9KvnTFP2+oeDX2Z/m4SoWw=
-----END PGP PUBLIC KEY BLOCK-----"""
@pytest.fixture
def key_e_fp():
return "2401C402776328D78D6B4C5D67D35BC98502D9B9"
# expires 2022-12-01
@pytest.fixture
def key_e_pub():
return """\
-----BEGIN PGP PUBLIC KEY BLOCK-----
mI0EY4gjEQEEAKYpWezZQWiAUDvAMcMhBkjHGY2fM4MMiXc6+fRbNV4VCL9TtJYE
gjccYVu44DtIYQzMVimrPQ6xepUmFRalezCG0OO4v25Ciwyeg8LX+Tb3kyAYFAxi
qLXAJyr3aZ/539xBak/Vf5xdURIi7WF5qBGQxd87tRDDqyPFnr87JJtFABEBAAG0
LUtleSBFIChHZW5lcmF0ZWQgYnkgU2FsdFN0YWNrKSA8a2V5ZUBleGFtcGxlPojX
BBMBCABBFiEEJAHEAndjKNeNa0xdZ9NbyYUC2bkFAmOIIxECGy8FCQAAZh8FCwkI
BwICIgIGFQoJCAsCBBYCAwECHgcCF4AACgkQZ9NbyYUC2bmn1QP/WPVhj1bC9/9R
hifv29MG9maRNIkuEkZKtRJj7HMSaamD5IOtGoyMuBwicb38n2Z2KQZUiJbvyZTt
PS328F8YSUSyWQKqmhwL0iLlnDzx8l/nFr5tiss2b/ZzjlMP4iXtAgEdVMJnfjrM
J7xvL0cNSsHha4hUIrekvzM+SNwYkzs=
=nue4
-----END PGP PUBLIC KEY BLOCK-----"""
@pytest.fixture
def key_e_pub_notexpired():
return """\
-----BEGIN PGP PUBLIC KEY BLOCK-----
mI0EY4gjEQEEAKYpWezZQWiAUDvAMcMhBkjHGY2fM4MMiXc6+fRbNV4VCL9TtJYE
gjccYVu44DtIYQzMVimrPQ6xepUmFRalezCG0OO4v25Ciwyeg8LX+Tb3kyAYFAxi
qLXAJyr3aZ/539xBak/Vf5xdURIi7WF5qBGQxd87tRDDqyPFnr87JJtFABEBAAG0
LUtleSBFIChHZW5lcmF0ZWQgYnkgU2FsdFN0YWNrKSA8a2V5ZUBleGFtcGxlPojR
BBMBCAA7AhsvBQsJCAcCAiICBhUKCQgLAgQWAgMBAh4HAheAFiEEJAHEAndjKNeN
a0xdZ9NbyYUC2bkFAmYOi8gACgkQZ9NbyYUC2bmTyAP+Jo5WUP9LYtXgcbKdhcbz
Kt6Cgbk39rzpmAYpejRSmiu0VrSuSou5W+60YhPPLOVdNOOsKFK1n1wO6sNwCTRU
xrQwNI2yBnuCIV/ZmuOdXLRKc4L8nGXW4lmDKK1PqrXDNH14Bpw0e+FVOR+iR3nW
G5lpc2BZ/RGsECq/HcbpFIM=
=qG1x
-----END PGP PUBLIC KEY BLOCK-----
"""
@pytest.fixture
def gnupg(gpghome):
return gnupglib.GPG(gnupghome=str(gpghome))
@ -164,19 +208,28 @@ def keyring(gpghome, tmp_path, request):
# cleanup is taken care of by gpghome and tmp_path
@pytest.fixture(params=(False, True))
def testmode(request):
return request.param
@pytest.mark.windows_whitelisted
@pytest.mark.usefixtures("_pubkeys_present")
def test_gpg_present_no_changes(gpghome, gpg, gnupg, key_a_fp):
def test_gpg_present_no_changes(gpghome, gpg, gnupg, key_a_fp, testmode):
assert gnupg.list_keys(keys=key_a_fp)
ret = gpg.present(
key_a_fp[-16:], trust="unknown", gnupghome=str(gpghome), keyserver="nonexistent"
key_a_fp[-16:],
trust="unknown",
gnupghome=str(gpghome),
keyserver="nonexistent",
test=testmode,
)
assert ret.result
assert not ret.changes
def test_gpg_present_keyring_no_changes(
gpghome, gpg, gnupg, gnupg_keyring, keyring, key_a_fp
gpghome, gpg, gnupg, gnupg_keyring, keyring, key_a_fp, testmode
):
"""
The keyring tests are not whitelisted on Windows since they are just
@ -190,6 +243,7 @@ def test_gpg_present_keyring_no_changes(
gnupghome=str(gpghome),
keyserver="nonexistent",
keyring=keyring,
test=testmode,
)
assert ret.result
assert not ret.changes
@ -197,24 +251,26 @@ def test_gpg_present_keyring_no_changes(
@pytest.mark.windows_whitelisted
@pytest.mark.usefixtures("_pubkeys_present")
def test_gpg_present_trust_change(gpghome, gpg, gnupg, key_a_fp):
def test_gpg_present_trust_change(gpghome, gpg, gnupg, key_a_fp, testmode):
assert gnupg.list_keys(keys=key_a_fp)
ret = gpg.present(
key_a_fp[-16:],
gnupghome=str(gpghome),
trust="ultimately",
keyserver="nonexistent",
test=testmode,
)
assert ret.result
assert ret.result is not False
assert (ret.result is None) is testmode
assert ret.changes
assert ret.changes == {key_a_fp[-16:]: {"trust": "ultimately"}}
key_info = gnupg.list_keys(keys=key_a_fp)
assert key_info
assert key_info[0]["trust"] == "u"
assert (key_info[0]["trust"] == "u") is not testmode
def test_gpg_present_keyring_trust_change(
gpghome, gpg, gnupg, gnupg_keyring, keyring, key_a_fp
gpghome, gpg, gnupg, gnupg_keyring, keyring, key_a_fp, testmode
):
assert not gnupg.list_keys(keys=key_a_fp)
assert gnupg_keyring.list_keys(keys=key_a_fp)
@ -224,17 +280,21 @@ def test_gpg_present_keyring_trust_change(
trust="ultimately",
keyserver="nonexistent",
keyring=keyring,
test=testmode,
)
assert ret.result
assert ret.result is not False
assert (ret.result is None) is testmode
assert ret.changes
assert ret.changes == {key_a_fp[-16:]: {"trust": "ultimately"}}
key_info = gnupg_keyring.list_keys(keys=key_a_fp)
assert key_info
assert key_info[0]["trust"] == "u"
assert (key_info[0]["trust"] == "u") is not testmode
# Cannot whitelist source/text tests for Windows since it uses a
# keyring internally, which causes test timeouts for some reason.
def test_gpg_present_source(
gpghome, gpg, gnupg, key_a_fp, key_a_pub, key_b_pub, key_b_fp
gpghome, gpg, gnupg, key_a_fp, key_a_pub, key_b_pub, key_b_fp, testmode
):
with pytest.helpers.temp_file(
"keys", contents=key_a_pub + "\n" + key_b_pub
@ -244,31 +304,110 @@ def test_gpg_present_source(
gnupghome=str(gpghome),
skip_keyserver=True,
source=str(keyfile),
test=testmode,
)
assert ret.result
assert ret.result is not False
assert (ret.result is None) is testmode
assert ret.changes
assert key_a_fp[-16:] in ret.changes
assert ret.changes[key_a_fp[-16:]]["added"]
assert gnupg.list_keys(keys=key_a_fp)
assert bool(gnupg.list_keys(keys=key_a_fp)) is not testmode
assert not gnupg.list_keys(keys=key_b_fp)
@pytest.mark.parametrize("keyring", ((),), indirect=True)
def test_gpg_present_source_keyring(
gpghome,
gpg,
gnupg,
gnupg_keyring,
keyring,
key_a_fp,
key_a_pub,
key_b_pub,
key_b_fp,
testmode,
):
"""
Ensure imports from a list of file sources to a keyring work
"""
with pytest.helpers.temp_file(
"keys", contents=key_a_pub + "\n" + key_b_pub
) as keyfile:
ret = gpg.present(
key_a_fp[-16:],
gnupghome=str(gpghome),
skip_keyserver=True,
source=str(keyfile),
keyring=keyring,
test=testmode,
)
assert ret.result is not False
assert (ret.result is None) is testmode
assert ret.changes
assert key_a_fp[-16:] in ret.changes
assert ret.changes[key_a_fp[-16:]]["added"]
assert not gnupg.list_keys(keys=key_a_fp)
assert bool(gnupg_keyring.list_keys(keys=key_a_fp)) is not testmode
assert not gnupg_keyring.list_keys(keys=key_b_fp)
@pytest.mark.skipif(
PYGNUPG_VERSION < (0, 5, 1), reason="Text requires python-gnupg >=0.5.1"
)
def test_gpg_present_text(
gpghome, gpg, gnupg, key_a_fp, key_a_pub, key_b_pub, key_b_fp
gpghome, gpg, gnupg, key_a_fp, key_a_pub, key_b_pub, key_b_fp, testmode
):
concat = key_a_pub + "\n" + key_b_pub
ret = gpg.present(key_a_fp[-16:], gnupghome=str(gpghome), text=concat)
assert ret.result
ret = gpg.present(
key_a_fp[-16:], gnupghome=str(gpghome), text=concat, test=testmode
)
assert ret.result is not False
assert (ret.result is None) is testmode
assert ret.changes
assert key_a_fp[-16:] in ret.changes
assert ret.changes[key_a_fp[-16:]]["added"]
assert gnupg.list_keys(keys=key_a_fp)
assert bool(gnupg.list_keys(keys=key_a_fp)) is not testmode
assert not gnupg.list_keys(keys=key_b_fp)
@pytest.mark.skipif(
PYGNUPG_VERSION < (0, 5, 1), reason="Text requires python-gnupg >=0.5.1"
)
@pytest.mark.parametrize("keyring", ((),), indirect=True)
def test_gpg_present_text_keyring(
gpghome,
gpg,
gnupg,
gnupg_keyring,
keyring,
key_a_fp,
key_a_pub,
key_b_pub,
key_b_fp,
testmode,
):
"""
Ensure imports from a textual source to a keyring work
"""
concat = key_a_pub + "\n" + key_b_pub
ret = gpg.present(
key_a_fp[-16:],
gnupghome=str(gpghome),
keyring=keyring,
text=concat,
test=testmode,
)
assert ret.result is not False
assert (ret.result is None) is testmode
assert ret.changes
assert key_a_fp[-16:] in ret.changes
assert ret.changes[key_a_fp[-16:]]["added"]
assert not gnupg.list_keys(keys=key_a_fp)
assert bool(gnupg_keyring.list_keys(keys=key_a_fp)) is not testmode
assert not gnupg_keyring.list_keys(keys=key_b_fp)
@pytest.mark.skipif(
PYGNUPG_VERSION < (0, 5, 1), reason="Text requires python-gnupg >=0.5.1"
)
@ -320,7 +459,8 @@ def test_gpg_present_source_not_contained(
assert not gnupg.list_keys(keys=key_a_fp)
assert not gnupg.list_keys(keys=key_b_fp)
assert (
"none of the specified sources were found or contained the key" in ret.comment
"none of the specified sources were found or contained the (unexpired) key"
in ret.comment
)
@ -345,6 +485,162 @@ def test_gpg_present_source_bad_keyfile(
assert not gnupg.list_keys(keys=key_b_fp)
@pytest.mark.parametrize(
"method",
(
"source",
pytest.param(
"text",
marks=pytest.mark.skipif(
PYGNUPG_VERSION < (0, 5, 1), reason="Text requires python-gnupg >=0.5.1"
),
),
),
)
def test_gpg_present_import_expired_key(
method, gpghome, gpg, gnupg, key_e_fp, key_e_pub
):
"""
Ensure that when a newly imported key is expired, the state fails.
The key should be imported though if it was not present before.
"""
if method == "source":
ctx = pytest.helpers.temp_file("keys", contents=key_e_pub)
else:
ctx = contextlib.nullcontext()
params = {"text": key_e_pub}
with ctx as inst:
if method == "source":
params = {"source": [str(inst)]}
ret = gpg.present(
key_e_fp[-16:],
gnupghome=str(gpghome),
skip_keyserver=True,
**params,
)
assert ret.result is False
assert "is expired" in ret.comment
assert ret.changes
assert ret.changes[key_e_fp[-16:]]["added"]
assert gnupg.list_keys(keys=key_e_fp)
@pytest.mark.usefixtures("_pubkeys_present")
@pytest.mark.parametrize("_pubkeys_present", (("e",),), indirect=True)
@pytest.mark.parametrize(
"method",
(
"source",
pytest.param(
"text",
marks=pytest.mark.skipif(
PYGNUPG_VERSION < (0, 5, 1), reason="Text requires python-gnupg >=0.5.1"
),
),
),
)
def test_gpg_present_expired_key_already_present_fails(
method, gpghome, gpg, gnupg, key_e_fp, key_e_pub
):
"""
Ensure that when a present key is expired and no new one can be found,
the state fails without changes.
"""
if method == "source":
ctx = pytest.helpers.temp_file("keys", contents=key_e_pub)
else:
ctx = contextlib.nullcontext()
params = {"text": key_e_pub}
with ctx as inst:
if method == "source":
params = {"source": [str(inst)]}
ret = gpg.present(
key_e_fp[-16:],
gnupghome=str(gpghome),
skip_keyserver=True,
**params,
)
assert ret.result is False
if method == "source":
assert "contained the (unexpired) key" in ret.comment
else:
assert "but it's expired" in ret.comment
assert not ret.changes
@pytest.mark.usefixtures("_pubkeys_present")
@pytest.mark.parametrize("_pubkeys_present", (("e",),), indirect=True)
@pytest.mark.parametrize(
"method",
(
"source",
pytest.param(
"text",
marks=pytest.mark.skipif(
PYGNUPG_VERSION < (0, 5, 1), reason="Text requires python-gnupg >=0.5.1"
),
),
),
)
def test_gpg_present_expired_key_already_present_refresh(
method, gpghome, gpg, gnupg, key_e_fp, key_e_pub_notexpired, testmode
):
"""
Ensure that when a present key is expired and a new one is available,
the key is reimported.
"""
if method == "source":
ctx = pytest.helpers.temp_file("keys", contents=key_e_pub_notexpired)
else:
ctx = contextlib.nullcontext()
params = {"text": key_e_pub_notexpired}
with ctx as inst:
if method == "source":
params = {"source": [str(inst)]}
ret = gpg.present(
key_e_fp[-16:],
gnupghome=str(gpghome),
skip_keyserver=True,
**params,
test=testmode,
)
assert ret.result is not False
assert (ret.result is None) is testmode
assert "the existing one was expired" in ret.comment
assert ret.changes
assert ret.changes[key_e_fp[-16:]]["added"]
assert ret.changes[key_e_fp[-16:]]["refresh"]
@pytest.mark.usefixtures("_pubkeys_present")
@pytest.mark.parametrize("_pubkeys_present", (("e",),), indirect=True)
def test_gpg_present_expired_key_trust_change(
gpghome, gpg, gnupg, key_e_fp, key_e_pub_notexpired, testmode
):
"""
Test that key expiry updates and trust changes work together
"""
assert gnupg.list_keys(keys=key_e_fp)
with pytest.helpers.temp_file("keys", contents=key_e_pub_notexpired) as keyfile:
ret = gpg.present(
key_e_fp[-16:],
gnupghome=str(gpghome),
trust="ultimately",
skip_keyserver=True,
source=[str(keyfile)],
test=testmode,
)
assert ret.result is not False
assert (ret.result is None) is testmode
assert ret.changes
assert ret.changes == {
key_e_fp[-16:]: {"added": True, "refresh": True, "trust": "ultimately"}
}
key_info = gnupg.list_keys(keys=key_e_fp)
assert key_info
assert (key_info[0]["trust"] == "u") is not testmode
@pytest.mark.windows_whitelisted
def test_gpg_absent_no_changes(gpghome, gpg, gnupg, key_a_fp):
assert not gnupg.list_keys(keys=key_a_fp)

View file

@ -283,6 +283,7 @@ def test_list_keys():
"uids": ["GPG Person <person@example.com>"],
"created": "2017-09-28",
"expires": "2033-09-24",
"expired": False,
"keyLength": "4096",
"ownerTrust": "Unknown",
"fingerprint": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
@ -293,6 +294,7 @@ def test_list_keys():
"uids": ["GPG Person <person@example.com>"],
"created": "2017-09-28",
"expires": "2033-09-24",
"expired": False,
"keyLength": "4096",
"ownerTrust": "Unknown",
"fingerprint": "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy",
@ -355,6 +357,7 @@ def test_get_key():
"trust": "Unknown",
"ownerTrust": "Unknown",
"expires": "2033-09-24",
"expired": False,
"keyLength": "4096",
}

View file

@ -82,7 +82,7 @@ def gpg_trust(request):
yield trust
@pytest.fixture()
@pytest.fixture
def gpg_receive(request):
recv = Mock(spec="salt.modules.gpg.receive_keys")
recv.return_value = getattr(
@ -92,6 +92,22 @@ def gpg_receive(request):
yield recv
@pytest.fixture
def gpg_get_key(keys_list):
def _get_key(keyid=None, **kwargs):
if keyid == "new":
ret = keys_list[3].copy()
ret["keyid"] = "new"
return ret
return next(iter(x for x in keys_list if x["keyid"] == keyid))
getkey = Mock(spec="salt.modules.gpg.get_key")
getkey.side_effect = _get_key
with patch.dict(gpg.__salt__, {"gpg.get_key": getkey}):
yield getkey
@pytest.mark.usefixtures("gpg_list_keys")
@pytest.mark.parametrize(
"gpg_trust,expected",
@ -109,7 +125,7 @@ def test_gpg_present_trust_change(gpg_receive, gpg_trust, expected):
gpg_receive.assert_not_called()
@pytest.mark.usefixtures("gpg_list_keys")
@pytest.mark.usefixtures("gpg_list_keys", "gpg_get_key")
@pytest.mark.parametrize(
"gpg_receive,expected",
[
@ -134,7 +150,7 @@ def test_gpg_present_new_key(gpg_receive, gpg_trust, expected):
gpg_trust.assert_not_called()
@pytest.mark.usefixtures("gpg_list_keys")
@pytest.mark.usefixtures("gpg_list_keys", "gpg_get_key")
@pytest.mark.parametrize(
"gpg_trust,expected",
[