diff --git a/changelog/66314.changed.md b/changelog/66314.changed.md new file mode 100644 index 00000000000..1125bec27e2 --- /dev/null +++ b/changelog/66314.changed.md @@ -0,0 +1 @@ +Made `gpg.present` attempt to refresh keys if they are expired diff --git a/salt/modules/gpg.py b/salt/modules/gpg.py index 8df4b7860b6..9f24c40e65d 100644 --- a/salt/modules/gpg.py +++ b/salt/modules/gpg.py @@ -139,16 +139,24 @@ def _get_user_info(user=None): """ Wrapper for user.info Salt function """ + user_from_config = False if not user: - # Get user Salt running as + # Get user Salt is running as user = __salt__["config.option"]("user") + # Ensure we don't get an infinite loop when `salt` is returned as the user, + # but it does not exist for some reason. + user_from_config = True + if salt.utils.platform.is_windows() and "\\" in user: + # At least in the test suite, this config option is set + # including the hostname, so split it off + user = user.split("\\", maxsplit=1)[1] userinfo = __salt__["user.info"](user) if not userinfo: - if user == "salt": + if user == "salt" and not user_from_config: # Special case with `salt` user: - # if it doesn't exist then fall back to user Salt running as + # if it doesn't exist then fall back to user Salt is running as userinfo = _get_user_info() else: raise SaltInvocationError(f"User {user} does not exist") @@ -316,23 +324,7 @@ def search_keys(text, keyserver=None, user=None, gnupghome=None): _keys = [] for _key in _search_keys(text, keyserver, user=user, gnupghome=gnupghome): - tmp = {"keyid": _key["keyid"], "uids": _key["uids"]} - - expires = _key.get("expires", None) - date = _key.get("date", None) - length = _key.get("length", None) - - if expires: - tmp["expires"] = time.strftime( - "%Y-%m-%d", time.localtime(float(_key["expires"])) - ) - if date: - tmp["created"] = time.strftime( - "%Y-%m-%d", time.localtime(float(_key["date"])) - ) - if length: - tmp["keyLength"] = _key["length"] - _keys.append(tmp) + _keys.append(_render_key(_key)) return _keys @@ -403,9 +395,10 @@ def list_secret_keys(user=None, gnupghome=None, keyring=None): def _render_key(_key): tmp = { "keyid": _key["keyid"], - "fingerprint": _key["fingerprint"], "uids": _key["uids"], } + if "fingerprint" in _key: + tmp["fingerprint"] = _key["fingerprint"] expires = _key.get("expires", None) date = _key.get("date", None) @@ -417,6 +410,7 @@ def _render_key(_key): tmp["expires"] = time.strftime( "%Y-%m-%d", time.localtime(float(_key["expires"])) ) + tmp["expired"] = time.time() >= float(expires) if date: tmp["created"] = time.strftime("%Y-%m-%d", time.localtime(float(_key["date"]))) if length: @@ -731,41 +725,14 @@ def get_key(keyid=None, fingerprint=None, user=None, gnupghome=None, keyring=Non salt '*' gpg.get_key keyid=3FAD9F1E user=username """ - tmp = {} for _key in _list_keys(user=user, gnupghome=gnupghome, keyring=keyring): if ( _key["fingerprint"] == fingerprint or _key["keyid"] == keyid or _key["keyid"][8:] == keyid ): - tmp["keyid"] = _key["keyid"] - tmp["fingerprint"] = _key["fingerprint"] - tmp["uids"] = _key["uids"] - - expires = _key.get("expires", None) - date = _key.get("date", None) - length = _key.get("length", None) - owner_trust = _key.get("ownertrust", None) - trust = _key.get("trust", None) - - if expires: - tmp["expires"] = time.strftime( - "%Y-%m-%d", time.localtime(float(_key["expires"])) - ) - if date: - tmp["created"] = time.strftime( - "%Y-%m-%d", time.localtime(float(_key["date"])) - ) - if length: - tmp["keyLength"] = _key["length"] - if owner_trust: - tmp["ownerTrust"] = LETTER_TRUST_DICT[_key["ownertrust"]] - if trust: - tmp["trust"] = LETTER_TRUST_DICT[_key["trust"]] - if not tmp: - return False - else: - return tmp + return _render_key(_key) + return False def get_secret_key( @@ -805,7 +772,6 @@ def get_secret_key( salt '*' gpg.get_secret_key keyid=3FAD9F1E user=username """ - tmp = {} for _key in _list_keys( user=user, gnupghome=gnupghome, keyring=keyring, secret=True ): @@ -814,34 +780,8 @@ def get_secret_key( or _key["keyid"] == keyid or _key["keyid"][8:] == keyid ): - tmp["keyid"] = _key["keyid"] - tmp["fingerprint"] = _key["fingerprint"] - tmp["uids"] = _key["uids"] - - expires = _key.get("expires", None) - date = _key.get("date", None) - length = _key.get("length", None) - owner_trust = _key.get("ownertrust", None) - trust = _key.get("trust", None) - - if expires: - tmp["expires"] = time.strftime( - "%Y-%m-%d", time.localtime(float(_key["expires"])) - ) - if date: - tmp["created"] = time.strftime( - "%Y-%m-%d", time.localtime(float(_key["date"])) - ) - if length: - tmp["keyLength"] = _key["length"] - if owner_trust: - tmp["ownerTrust"] = LETTER_TRUST_DICT[_key["ownertrust"]] - if trust: - tmp["trust"] = LETTER_TRUST_DICT[_key["trust"]] - if not tmp: - return False - else: - return tmp + return _render_key(_key) + return False @_restore_ownership @@ -1164,6 +1104,14 @@ def receive_keys(keyserver=None, keys=None, user=None, gnupghome=None, keyring=N ret["message"].append( f"Key {result['fingerprint']} already exists in keychain" ) + elif result["ok"] == "4": + ret["message"].append( + f"Key {result['fingerprint']} updated: new signatures" + ) + elif result["ok"] == "8": + ret["message"].append( + f"Key {result['fingerprint']} updated: new subkeys" + ) elif "problem" in result: ret["message"].append( f"Unable to add key to keychain: {result.get('text', 'No further description')}" diff --git a/salt/states/gpg.py b/salt/states/gpg.py index aa7bb137cc3..bc8be86a1bc 100644 --- a/salt/states/gpg.py +++ b/salt/states/gpg.py @@ -10,7 +10,7 @@ import logging import salt.utils.dictupdate import salt.utils.immutabletypes as immutabletypes -from salt.exceptions import SaltInvocationError +from salt.exceptions import CommandExecutionError, SaltInvocationError log = logging.getLogger(__name__) @@ -26,6 +26,12 @@ TRUST_MAP = immutabletypes.freeze( ) +class KeyNotContained(CommandExecutionError): + """ + Raised when a data source does not contain a requested key + """ + + def present( name, keys=None, @@ -40,7 +46,8 @@ def present( **kwargs, ): """ - Ensure a GPG public key is present in the GPG keychain. + Ensure a GPG public key is present in the GPG keychain and + that it is not expired. name The key ID of the GPG public key. @@ -104,102 +111,65 @@ def present( .. versionadded:: 3008.0 """ - ret = {"name": name, "result": True, "changes": {}, "comment": []} + ret = {"name": name, "result": True, "changes": {}, "comment": ""} - if not text and skip_keyserver and not source: - ret["result"] = False - ret["comment"] = ( - "When skipping keyservers, you must provide at least one source" + try: + if not text and skip_keyserver and not source: + raise SaltInvocationError( + "When skipping keyservers, you must provide at least one source" + ) + + if trust and trust not in TRUST_MAP: + raise SaltInvocationError(f"Invalid trust level {trust}") + + _current_keys = __salt__["gpg.list_keys"]( + user=user, gnupghome=gnupghome, keyring=keyring ) + except (CommandExecutionError, SaltInvocationError) as err: + ret["result"] = False + ret["comment"] = str(err) return ret - _current_keys = __salt__["gpg.list_keys"]( - user=user, gnupghome=gnupghome, keyring=keyring - ) - current_keys = {} + expired_keys = [] for key in _current_keys: keyid = key["keyid"] current_keys[keyid] = {} current_keys[keyid]["trust"] = key["trust"] + if key.get("expired"): + expired_keys.append(keyid) if not keys: keys = name - if isinstance(keys, str): keys = [keys] + key_res = {} + # First, ensure all keys are present for key in keys: - if key in current_keys: - if trust: - if trust in TRUST_MAP: - if current_keys[key]["trust"] != TRUST_MAP[trust]: - if __opts__["test"]: - ret["result"] = None - ret["comment"].append( - f"Would have set trust level for {key} to {trust}" - ) - salt.utils.dictupdate.set_dict_key_value( - ret, f"changes:{key}:trust", trust - ) - continue - try: - # update trust level - result = __salt__["gpg.trust_key"]( - keyid=key, - trust_level=trust, - user=user, - gnupghome=gnupghome, - keyring=keyring, - ) - except SaltInvocationError as err: - result = {"res": False, "message": str(err)} - if result["res"] is False: - ret["result"] = result["res"] - ret["comment"].append(result["message"]) - else: - salt.utils.dictupdate.set_dict_key_value( - ret, f"changes:{key}:trust", trust - ) - ret["comment"].append( - f"Set trust level for {key} to {trust}" - ) - else: - ret["comment"].append( - f"GPG Public Key {key} already in correct trust state" - ) - else: - ret["comment"].append(f"Invalid trust level {trust}") - - ret["comment"].append(f"GPG Public Key {key} already in keychain") - - else: + key_res[key] = [] + try: + refresh = key in expired_keys + if key in current_keys and not refresh: + key_res[key].append(f"GPG Public Key {key} already in keychain") + continue if __opts__["test"]: ret["result"] = None - ret["comment"].append(f"Would have added {key} to GPG keychain") + key_res[key] = [f"Would have added {key} to GPG keychain"] salt.utils.dictupdate.set_dict_key_value( ret, f"changes:{key}:added", True ) + if refresh: + key_res[key][-1] += " (the existing one was expired)" + salt.utils.dictupdate.set_dict_key_value( + ret, f"changes:{key}:refresh", True + ) + else: + current_keys[key] = {"trust": "unknown"} continue result = {} if text: - has_key = __salt__["gpg.read_key"]( - text=text, keyid=key, gnupghome=gnupghome, user=user - ) - if has_key: - log.debug("Passed text contains key %s", key) - result = __salt__["gpg.import_key"]( - text=text, - user=user, - gnupghome=gnupghome, - keyring=keyring, - select=key, - ) - else: - result = { - "res": False, - "message": ["Passed text did not contain the requested key"], - } + result = _import_data(key, refresh, user, gnupghome, keyring, text=text) else: if not skip_keyserver: result = __salt__["gpg.receive_keys"]( @@ -209,68 +179,112 @@ def present( gnupghome=gnupghome, keyring=keyring, ) + if refresh and result["res"]: + # If we're refreshing and no updated key could be found, + # ensure we're failing here. + result["res"] = any( + "updated: new" in x for x in result["message"] + ) + result["message"] = "\n".join(result["message"]) if (not result or result["res"] is False) and source: if not isinstance(source, list): source = [source] + prev_msg = "" + if result: + prev_msg = result["message"] + "\n" for src in source: sfn = __salt__["cp.cache_file"](src) if sfn: log.debug("Found source: %s", src) - has_key = __salt__["gpg.read_key"]( - path=sfn, keyid=key, gnupghome=gnupghome, user=user - ) - if has_key: - log.debug("Found source %s contains key %s", src, key) - result = __salt__["gpg.import_key"]( - filename=sfn, - user=user, - gnupghome=gnupghome, - keyring=keyring, - select=key, + try: + result = _import_data( + key, refresh, user, gnupghome, keyring, path=sfn ) break + except KeyNotContained as err: + if "expired" in str(err): + log.warning( + "Found source %s contains key %s, but it's expired", + src, + key, + ) else: - prev_msg = "" - if result: - prev_msg = " ".join(result["message"]) + ". In addition, " - result = { - "res": False, - "message": [ - prev_msg - + f"none of the specified sources were found or contained the key {key}." - ], - } - if result["res"] is False: - ret["result"] = result["res"] - ret["comment"].extend(result["message"]) - else: - ret["comment"].append(f"Added {key} to GPG keychain") - salt.utils.dictupdate.set_dict_key_value( - ret, f"changes:{key}:added", True - ) - - if trust: - if trust in TRUST_MAP: - try: - # update trust level - result = __salt__["gpg.trust_key"]( - keyid=key, - trust_level=trust, - user=user, - gnupghome=gnupghome, - keyring=keyring, + raise CommandExecutionError( + prev_msg + + f"none of the specified sources were found or contained the (unexpired) key {key}." ) - except SaltInvocationError as err: - result = {"res": False, "message": str(err)} - if result["res"] is False: - ret["result"] = result["res"] - ret["comment"].append(result["message"]) - else: - ret["comment"].append(f"Set trust level for {key} to {trust}") - else: - ret["comment"].append(f"Invalid trust level {trust}") - ret["comment"] = "\n".join(ret["comment"]) + if result["res"] is False: + raise CommandExecutionError(result["message"]) + new_key = __salt__["gpg.get_key"]( + keyid=key, user=user, gnupghome=gnupghome, keyring=keyring + ) + if not new_key: + raise CommandExecutionError( + result["message"] + + f"\nThe new key {key} could not be retrieved though." + ) + salt.utils.dictupdate.set_dict_key_value(ret, f"changes:{key}:added", True) + if new_key.get("expired"): + raise CommandExecutionError( + result["message"] + f"\nThe new key {key} is expired though." + ) + key_res[key].append(f"Added {key} to GPG keychain") + current_keys[key] = {"trust": new_key["trust"]} + if refresh: + key_res[key][-1] += " (the existing one was expired)" + salt.utils.dictupdate.set_dict_key_value( + ret, f"changes:{key}:refresh", True + ) + except (CommandExecutionError, SaltInvocationError) as err: + ret["result"] = False + if refresh: + key_res[key].append( + "Existing key is expired, tried to fetch updated one" + ) + key_res[key].extend(str(err).splitlines()) + + # Now all possible keys are present, manage their trust if requested + if trust: + for key in keys: + if key not in current_keys: + # This means the key was not present and could not be retrieved + continue + try: + if current_keys[key]["trust"] == TRUST_MAP[trust]: + key_res[key].append( + f"GPG Public Key {key} already in correct trust state" + ) + continue + if __opts__["test"]: + ret["result"] = None + key_res[key].append( + f"Would have set trust level for {key} to {trust}" + ) + salt.utils.dictupdate.set_dict_key_value( + ret, f"changes:{key}:trust", trust + ) + continue + result = __salt__["gpg.trust_key"]( + keyid=key, + trust_level=trust, + user=user, + gnupghome=gnupghome, + keyring=keyring, + ) + if result["res"] is False: + raise CommandExecutionError(result["message"]) + key_res[key].append(f"Set trust level for {key} to {trust}") + salt.utils.dictupdate.set_dict_key_value( + ret, f"changes:{key}:trust", trust + ) + except (CommandExecutionError, SaltInvocationError) as err: + ret["result"] = False + key_res[key].append(str(err)) + final_res = { + key: "\n * " + "\n * ".join(msgs) for key, msgs in key_res.items() if msgs + } + ret["comment"] = "\n".join(f"Key {key}:{msg}" for key, msg in final_res.items()) return ret @@ -382,3 +396,24 @@ def absent( ret["comment"] = "\n".join(ret["comment"]) return ret + + +def _import_data(key, refresh, user, gnupghome, keyring, text=None, path=None): + has_key = __salt__["gpg.read_key"]( + text=text, path=path, keyid=key, gnupghome=gnupghome, user=user + ) + if has_key: + is_expired = has_key[0].get("expired") + # Ensure we still import the expired key if it's not present + if not is_expired or not refresh: + log.debug("Passed text contains key %s", key) + return __salt__["gpg.import_key"]( + text=text, + filename=path, + user=user, + gnupghome=gnupghome, + keyring=keyring, + select=key, + ) + raise KeyNotContained(f"Passed text contained the key {key}, but it's expired") + raise KeyNotContained(f"Passed text did not contain the requested key {key}") diff --git a/tests/pytests/functional/states/test_gpg.py b/tests/pytests/functional/states/test_gpg.py index 2d31725baa9..848e0b8d68e 100644 --- a/tests/pytests/functional/states/test_gpg.py +++ b/tests/pytests/functional/states/test_gpg.py @@ -1,3 +1,4 @@ +import contextlib import shutil import subprocess from pathlib import Path @@ -128,6 +129,49 @@ y9KvnTFP2+oeDX2Z/m4SoWw= -----END PGP PUBLIC KEY BLOCK-----""" +@pytest.fixture +def key_e_fp(): + return "2401C402776328D78D6B4C5D67D35BC98502D9B9" + + +# expires 2022-12-01 +@pytest.fixture +def key_e_pub(): + return """\ +-----BEGIN PGP PUBLIC KEY BLOCK----- + +mI0EY4gjEQEEAKYpWezZQWiAUDvAMcMhBkjHGY2fM4MMiXc6+fRbNV4VCL9TtJYE +gjccYVu44DtIYQzMVimrPQ6xepUmFRalezCG0OO4v25Ciwyeg8LX+Tb3kyAYFAxi +qLXAJyr3aZ/539xBak/Vf5xdURIi7WF5qBGQxd87tRDDqyPFnr87JJtFABEBAAG0 +LUtleSBFIChHZW5lcmF0ZWQgYnkgU2FsdFN0YWNrKSA8a2V5ZUBleGFtcGxlPojX +BBMBCABBFiEEJAHEAndjKNeNa0xdZ9NbyYUC2bkFAmOIIxECGy8FCQAAZh8FCwkI +BwICIgIGFQoJCAsCBBYCAwECHgcCF4AACgkQZ9NbyYUC2bmn1QP/WPVhj1bC9/9R +hifv29MG9maRNIkuEkZKtRJj7HMSaamD5IOtGoyMuBwicb38n2Z2KQZUiJbvyZTt +PS328F8YSUSyWQKqmhwL0iLlnDzx8l/nFr5tiss2b/ZzjlMP4iXtAgEdVMJnfjrM +J7xvL0cNSsHha4hUIrekvzM+SNwYkzs= +=nue4 +-----END PGP PUBLIC KEY BLOCK-----""" + + +@pytest.fixture +def key_e_pub_notexpired(): + return """\ +-----BEGIN PGP PUBLIC KEY BLOCK----- + +mI0EY4gjEQEEAKYpWezZQWiAUDvAMcMhBkjHGY2fM4MMiXc6+fRbNV4VCL9TtJYE +gjccYVu44DtIYQzMVimrPQ6xepUmFRalezCG0OO4v25Ciwyeg8LX+Tb3kyAYFAxi +qLXAJyr3aZ/539xBak/Vf5xdURIi7WF5qBGQxd87tRDDqyPFnr87JJtFABEBAAG0 +LUtleSBFIChHZW5lcmF0ZWQgYnkgU2FsdFN0YWNrKSA8a2V5ZUBleGFtcGxlPojR +BBMBCAA7AhsvBQsJCAcCAiICBhUKCQgLAgQWAgMBAh4HAheAFiEEJAHEAndjKNeN +a0xdZ9NbyYUC2bkFAmYOi8gACgkQZ9NbyYUC2bmTyAP+Jo5WUP9LYtXgcbKdhcbz +Kt6Cgbk39rzpmAYpejRSmiu0VrSuSou5W+60YhPPLOVdNOOsKFK1n1wO6sNwCTRU +xrQwNI2yBnuCIV/ZmuOdXLRKc4L8nGXW4lmDKK1PqrXDNH14Bpw0e+FVOR+iR3nW +G5lpc2BZ/RGsECq/HcbpFIM= +=qG1x +-----END PGP PUBLIC KEY BLOCK----- +""" + + @pytest.fixture def gnupg(gpghome): return gnupglib.GPG(gnupghome=str(gpghome)) @@ -164,19 +208,28 @@ def keyring(gpghome, tmp_path, request): # cleanup is taken care of by gpghome and tmp_path +@pytest.fixture(params=(False, True)) +def testmode(request): + return request.param + + @pytest.mark.windows_whitelisted @pytest.mark.usefixtures("_pubkeys_present") -def test_gpg_present_no_changes(gpghome, gpg, gnupg, key_a_fp): +def test_gpg_present_no_changes(gpghome, gpg, gnupg, key_a_fp, testmode): assert gnupg.list_keys(keys=key_a_fp) ret = gpg.present( - key_a_fp[-16:], trust="unknown", gnupghome=str(gpghome), keyserver="nonexistent" + key_a_fp[-16:], + trust="unknown", + gnupghome=str(gpghome), + keyserver="nonexistent", + test=testmode, ) assert ret.result assert not ret.changes def test_gpg_present_keyring_no_changes( - gpghome, gpg, gnupg, gnupg_keyring, keyring, key_a_fp + gpghome, gpg, gnupg, gnupg_keyring, keyring, key_a_fp, testmode ): """ The keyring tests are not whitelisted on Windows since they are just @@ -190,6 +243,7 @@ def test_gpg_present_keyring_no_changes( gnupghome=str(gpghome), keyserver="nonexistent", keyring=keyring, + test=testmode, ) assert ret.result assert not ret.changes @@ -197,24 +251,26 @@ def test_gpg_present_keyring_no_changes( @pytest.mark.windows_whitelisted @pytest.mark.usefixtures("_pubkeys_present") -def test_gpg_present_trust_change(gpghome, gpg, gnupg, key_a_fp): +def test_gpg_present_trust_change(gpghome, gpg, gnupg, key_a_fp, testmode): assert gnupg.list_keys(keys=key_a_fp) ret = gpg.present( key_a_fp[-16:], gnupghome=str(gpghome), trust="ultimately", keyserver="nonexistent", + test=testmode, ) - assert ret.result + assert ret.result is not False + assert (ret.result is None) is testmode assert ret.changes assert ret.changes == {key_a_fp[-16:]: {"trust": "ultimately"}} key_info = gnupg.list_keys(keys=key_a_fp) assert key_info - assert key_info[0]["trust"] == "u" + assert (key_info[0]["trust"] == "u") is not testmode def test_gpg_present_keyring_trust_change( - gpghome, gpg, gnupg, gnupg_keyring, keyring, key_a_fp + gpghome, gpg, gnupg, gnupg_keyring, keyring, key_a_fp, testmode ): assert not gnupg.list_keys(keys=key_a_fp) assert gnupg_keyring.list_keys(keys=key_a_fp) @@ -224,17 +280,21 @@ def test_gpg_present_keyring_trust_change( trust="ultimately", keyserver="nonexistent", keyring=keyring, + test=testmode, ) - assert ret.result + assert ret.result is not False + assert (ret.result is None) is testmode assert ret.changes assert ret.changes == {key_a_fp[-16:]: {"trust": "ultimately"}} key_info = gnupg_keyring.list_keys(keys=key_a_fp) assert key_info - assert key_info[0]["trust"] == "u" + assert (key_info[0]["trust"] == "u") is not testmode +# Cannot whitelist source/text tests for Windows since it uses a +# keyring internally, which causes test timeouts for some reason. def test_gpg_present_source( - gpghome, gpg, gnupg, key_a_fp, key_a_pub, key_b_pub, key_b_fp + gpghome, gpg, gnupg, key_a_fp, key_a_pub, key_b_pub, key_b_fp, testmode ): with pytest.helpers.temp_file( "keys", contents=key_a_pub + "\n" + key_b_pub @@ -244,31 +304,110 @@ def test_gpg_present_source( gnupghome=str(gpghome), skip_keyserver=True, source=str(keyfile), + test=testmode, ) - assert ret.result + assert ret.result is not False + assert (ret.result is None) is testmode assert ret.changes assert key_a_fp[-16:] in ret.changes assert ret.changes[key_a_fp[-16:]]["added"] - assert gnupg.list_keys(keys=key_a_fp) + assert bool(gnupg.list_keys(keys=key_a_fp)) is not testmode assert not gnupg.list_keys(keys=key_b_fp) +@pytest.mark.parametrize("keyring", ((),), indirect=True) +def test_gpg_present_source_keyring( + gpghome, + gpg, + gnupg, + gnupg_keyring, + keyring, + key_a_fp, + key_a_pub, + key_b_pub, + key_b_fp, + testmode, +): + """ + Ensure imports from a list of file sources to a keyring work + """ + with pytest.helpers.temp_file( + "keys", contents=key_a_pub + "\n" + key_b_pub + ) as keyfile: + ret = gpg.present( + key_a_fp[-16:], + gnupghome=str(gpghome), + skip_keyserver=True, + source=str(keyfile), + keyring=keyring, + test=testmode, + ) + assert ret.result is not False + assert (ret.result is None) is testmode + assert ret.changes + assert key_a_fp[-16:] in ret.changes + assert ret.changes[key_a_fp[-16:]]["added"] + assert not gnupg.list_keys(keys=key_a_fp) + assert bool(gnupg_keyring.list_keys(keys=key_a_fp)) is not testmode + assert not gnupg_keyring.list_keys(keys=key_b_fp) + + @pytest.mark.skipif( PYGNUPG_VERSION < (0, 5, 1), reason="Text requires python-gnupg >=0.5.1" ) def test_gpg_present_text( - gpghome, gpg, gnupg, key_a_fp, key_a_pub, key_b_pub, key_b_fp + gpghome, gpg, gnupg, key_a_fp, key_a_pub, key_b_pub, key_b_fp, testmode ): concat = key_a_pub + "\n" + key_b_pub - ret = gpg.present(key_a_fp[-16:], gnupghome=str(gpghome), text=concat) - assert ret.result + ret = gpg.present( + key_a_fp[-16:], gnupghome=str(gpghome), text=concat, test=testmode + ) + assert ret.result is not False + assert (ret.result is None) is testmode assert ret.changes assert key_a_fp[-16:] in ret.changes assert ret.changes[key_a_fp[-16:]]["added"] - assert gnupg.list_keys(keys=key_a_fp) + assert bool(gnupg.list_keys(keys=key_a_fp)) is not testmode assert not gnupg.list_keys(keys=key_b_fp) +@pytest.mark.skipif( + PYGNUPG_VERSION < (0, 5, 1), reason="Text requires python-gnupg >=0.5.1" +) +@pytest.mark.parametrize("keyring", ((),), indirect=True) +def test_gpg_present_text_keyring( + gpghome, + gpg, + gnupg, + gnupg_keyring, + keyring, + key_a_fp, + key_a_pub, + key_b_pub, + key_b_fp, + testmode, +): + """ + Ensure imports from a textual source to a keyring work + """ + concat = key_a_pub + "\n" + key_b_pub + ret = gpg.present( + key_a_fp[-16:], + gnupghome=str(gpghome), + keyring=keyring, + text=concat, + test=testmode, + ) + assert ret.result is not False + assert (ret.result is None) is testmode + assert ret.changes + assert key_a_fp[-16:] in ret.changes + assert ret.changes[key_a_fp[-16:]]["added"] + assert not gnupg.list_keys(keys=key_a_fp) + assert bool(gnupg_keyring.list_keys(keys=key_a_fp)) is not testmode + assert not gnupg_keyring.list_keys(keys=key_b_fp) + + @pytest.mark.skipif( PYGNUPG_VERSION < (0, 5, 1), reason="Text requires python-gnupg >=0.5.1" ) @@ -320,7 +459,8 @@ def test_gpg_present_source_not_contained( assert not gnupg.list_keys(keys=key_a_fp) assert not gnupg.list_keys(keys=key_b_fp) assert ( - "none of the specified sources were found or contained the key" in ret.comment + "none of the specified sources were found or contained the (unexpired) key" + in ret.comment ) @@ -345,6 +485,162 @@ def test_gpg_present_source_bad_keyfile( assert not gnupg.list_keys(keys=key_b_fp) +@pytest.mark.parametrize( + "method", + ( + "source", + pytest.param( + "text", + marks=pytest.mark.skipif( + PYGNUPG_VERSION < (0, 5, 1), reason="Text requires python-gnupg >=0.5.1" + ), + ), + ), +) +def test_gpg_present_import_expired_key( + method, gpghome, gpg, gnupg, key_e_fp, key_e_pub +): + """ + Ensure that when a newly imported key is expired, the state fails. + The key should be imported though if it was not present before. + """ + if method == "source": + ctx = pytest.helpers.temp_file("keys", contents=key_e_pub) + else: + ctx = contextlib.nullcontext() + params = {"text": key_e_pub} + with ctx as inst: + if method == "source": + params = {"source": [str(inst)]} + ret = gpg.present( + key_e_fp[-16:], + gnupghome=str(gpghome), + skip_keyserver=True, + **params, + ) + assert ret.result is False + assert "is expired" in ret.comment + assert ret.changes + assert ret.changes[key_e_fp[-16:]]["added"] + assert gnupg.list_keys(keys=key_e_fp) + + +@pytest.mark.usefixtures("_pubkeys_present") +@pytest.mark.parametrize("_pubkeys_present", (("e",),), indirect=True) +@pytest.mark.parametrize( + "method", + ( + "source", + pytest.param( + "text", + marks=pytest.mark.skipif( + PYGNUPG_VERSION < (0, 5, 1), reason="Text requires python-gnupg >=0.5.1" + ), + ), + ), +) +def test_gpg_present_expired_key_already_present_fails( + method, gpghome, gpg, gnupg, key_e_fp, key_e_pub +): + """ + Ensure that when a present key is expired and no new one can be found, + the state fails without changes. + """ + if method == "source": + ctx = pytest.helpers.temp_file("keys", contents=key_e_pub) + else: + ctx = contextlib.nullcontext() + params = {"text": key_e_pub} + with ctx as inst: + if method == "source": + params = {"source": [str(inst)]} + ret = gpg.present( + key_e_fp[-16:], + gnupghome=str(gpghome), + skip_keyserver=True, + **params, + ) + assert ret.result is False + if method == "source": + assert "contained the (unexpired) key" in ret.comment + else: + assert "but it's expired" in ret.comment + assert not ret.changes + + +@pytest.mark.usefixtures("_pubkeys_present") +@pytest.mark.parametrize("_pubkeys_present", (("e",),), indirect=True) +@pytest.mark.parametrize( + "method", + ( + "source", + pytest.param( + "text", + marks=pytest.mark.skipif( + PYGNUPG_VERSION < (0, 5, 1), reason="Text requires python-gnupg >=0.5.1" + ), + ), + ), +) +def test_gpg_present_expired_key_already_present_refresh( + method, gpghome, gpg, gnupg, key_e_fp, key_e_pub_notexpired, testmode +): + """ + Ensure that when a present key is expired and a new one is available, + the key is reimported. + """ + if method == "source": + ctx = pytest.helpers.temp_file("keys", contents=key_e_pub_notexpired) + else: + ctx = contextlib.nullcontext() + params = {"text": key_e_pub_notexpired} + with ctx as inst: + if method == "source": + params = {"source": [str(inst)]} + ret = gpg.present( + key_e_fp[-16:], + gnupghome=str(gpghome), + skip_keyserver=True, + **params, + test=testmode, + ) + assert ret.result is not False + assert (ret.result is None) is testmode + assert "the existing one was expired" in ret.comment + assert ret.changes + assert ret.changes[key_e_fp[-16:]]["added"] + assert ret.changes[key_e_fp[-16:]]["refresh"] + + +@pytest.mark.usefixtures("_pubkeys_present") +@pytest.mark.parametrize("_pubkeys_present", (("e",),), indirect=True) +def test_gpg_present_expired_key_trust_change( + gpghome, gpg, gnupg, key_e_fp, key_e_pub_notexpired, testmode +): + """ + Test that key expiry updates and trust changes work together + """ + assert gnupg.list_keys(keys=key_e_fp) + with pytest.helpers.temp_file("keys", contents=key_e_pub_notexpired) as keyfile: + ret = gpg.present( + key_e_fp[-16:], + gnupghome=str(gpghome), + trust="ultimately", + skip_keyserver=True, + source=[str(keyfile)], + test=testmode, + ) + assert ret.result is not False + assert (ret.result is None) is testmode + assert ret.changes + assert ret.changes == { + key_e_fp[-16:]: {"added": True, "refresh": True, "trust": "ultimately"} + } + key_info = gnupg.list_keys(keys=key_e_fp) + assert key_info + assert (key_info[0]["trust"] == "u") is not testmode + + @pytest.mark.windows_whitelisted def test_gpg_absent_no_changes(gpghome, gpg, gnupg, key_a_fp): assert not gnupg.list_keys(keys=key_a_fp) diff --git a/tests/pytests/unit/modules/test_gpg.py b/tests/pytests/unit/modules/test_gpg.py index aca8b4d3b04..332f13f7895 100644 --- a/tests/pytests/unit/modules/test_gpg.py +++ b/tests/pytests/unit/modules/test_gpg.py @@ -283,6 +283,7 @@ def test_list_keys(): "uids": ["GPG Person "], "created": "2017-09-28", "expires": "2033-09-24", + "expired": False, "keyLength": "4096", "ownerTrust": "Unknown", "fingerprint": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", @@ -293,6 +294,7 @@ def test_list_keys(): "uids": ["GPG Person "], "created": "2017-09-28", "expires": "2033-09-24", + "expired": False, "keyLength": "4096", "ownerTrust": "Unknown", "fingerprint": "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy", @@ -355,6 +357,7 @@ def test_get_key(): "trust": "Unknown", "ownerTrust": "Unknown", "expires": "2033-09-24", + "expired": False, "keyLength": "4096", } diff --git a/tests/pytests/unit/states/test_gpg.py b/tests/pytests/unit/states/test_gpg.py index 13ba97cd493..cba09db0cad 100644 --- a/tests/pytests/unit/states/test_gpg.py +++ b/tests/pytests/unit/states/test_gpg.py @@ -82,7 +82,7 @@ def gpg_trust(request): yield trust -@pytest.fixture() +@pytest.fixture def gpg_receive(request): recv = Mock(spec="salt.modules.gpg.receive_keys") recv.return_value = getattr( @@ -92,6 +92,22 @@ def gpg_receive(request): yield recv +@pytest.fixture +def gpg_get_key(keys_list): + def _get_key(keyid=None, **kwargs): + if keyid == "new": + ret = keys_list[3].copy() + ret["keyid"] = "new" + return ret + return next(iter(x for x in keys_list if x["keyid"] == keyid)) + + getkey = Mock(spec="salt.modules.gpg.get_key") + getkey.side_effect = _get_key + + with patch.dict(gpg.__salt__, {"gpg.get_key": getkey}): + yield getkey + + @pytest.mark.usefixtures("gpg_list_keys") @pytest.mark.parametrize( "gpg_trust,expected", @@ -109,7 +125,7 @@ def test_gpg_present_trust_change(gpg_receive, gpg_trust, expected): gpg_receive.assert_not_called() -@pytest.mark.usefixtures("gpg_list_keys") +@pytest.mark.usefixtures("gpg_list_keys", "gpg_get_key") @pytest.mark.parametrize( "gpg_receive,expected", [ @@ -134,7 +150,7 @@ def test_gpg_present_new_key(gpg_receive, gpg_trust, expected): gpg_trust.assert_not_called() -@pytest.mark.usefixtures("gpg_list_keys") +@pytest.mark.usefixtures("gpg_list_keys", "gpg_get_key") @pytest.mark.parametrize( "gpg_trust,expected", [