From 32cfcfd777b0132495d8ef86ca3483a94261ef23 Mon Sep 17 00:00:00 2001 From: jeanluc Date: Fri, 2 Dec 2022 01:15:10 +0100 Subject: [PATCH] Add keyring, gnupghome to rest, add functional tests --- salt/modules/gpg.py | 193 ++++++++--- salt/states/gpg.py | 88 ++++- tests/pytests/functional/modules/test_gpg.py | 331 +++++++++++++++++++ tests/pytests/functional/states/test_gpg.py | 54 +++ tests/pytests/unit/states/test_gpg.py | 62 ++++ 5 files changed, 682 insertions(+), 46 deletions(-) diff --git a/salt/modules/gpg.py b/salt/modules/gpg.py index 7f9a4712e37..5cfc0bcda32 100644 --- a/salt/modules/gpg.py +++ b/salt/modules/gpg.py @@ -154,11 +154,17 @@ def _restore_ownership(func): userinfo = _get_user_info(user) run_user = _get_user_info() - if userinfo["uid"] != run_user["uid"] and os.path.exists(gnupghome): - # Given user is different from one who runs Salt process, - # need to fix ownership permissions for GnuPG home dir - group = __salt__["file.gid_to_group"](run_user["gid"]) - for path in [gnupghome] + __salt__["file.find"](gnupghome): + if userinfo["uid"] != run_user["uid"]: + group = None + if os.path.exists(gnupghome): + # Given user is different from one who runs Salt process, + # need to fix ownership permissions for GnuPG home dir + group = __salt__["file.gid_to_group"](run_user["gid"]) + for path in [gnupghome] + __salt__["file.find"](gnupghome): + __salt__["file.chown"](path, run_user["name"], group) + if "keyring" in kwargs and os.path.exists(kwargs["keyring"]): + if group is None: + group = __salt__["file.gid_to_group"](run_user["gid"]) __salt__["file.chown"](path, run_user["name"], group) # Filter special kwargs @@ -172,38 +178,39 @@ def _restore_ownership(func): group = __salt__["file.gid_to_group"](userinfo["gid"]) for path in [gnupghome] + __salt__["file.find"](gnupghome): __salt__["file.chown"](path, user, group) - + if "keyring" in kwargs and os.path.exists(kwargs["keyring"]): + __salt__["file.chown"](path, user, group) return ret return func_wrapper -def _create_gpg(user=None, gnupghome=None): +def _create_gpg(user=None, gnupghome=None, keyring=None): """ Create the GPG object """ if not gnupghome: gnupghome = _get_user_gnupghome(user) - gpg = gnupg.GPG(gnupghome=gnupghome) + gpg = gnupg.GPG(gnupghome=gnupghome, keyring=keyring) return gpg -def _list_keys(user=None, gnupghome=None, secret=False): +def _list_keys(secret=False, user=None, gnupghome=None, keyring=None): """ Helper function for Listing keys """ - gpg = _create_gpg(user, gnupghome) + gpg = _create_gpg(user=user, gnupghome=gnupghome, keyring=keyring) _keys = gpg.list_keys(secret) return _keys -def _search_keys(text, keyserver, user=None): +def _search_keys(text, keyserver, user=None, gnupghome=None): """ Helper function for searching keys from keyserver """ - gpg = _create_gpg(user) + gpg = _create_gpg(user=user, gnupghome=gnupghome) if keyserver: _keys = gpg.search_keys(text, keyserver) else: @@ -211,7 +218,7 @@ def _search_keys(text, keyserver, user=None): return _keys -def search_keys(text, keyserver=None, user=None): +def search_keys(text, keyserver=None, user=None, gnupghome=None): """ Search keys from keyserver @@ -226,6 +233,11 @@ def search_keys(text, keyserver=None, user=None): Passing the user as ``salt`` will set the GnuPG home directory to the ``/etc/salt/gpgkeys``. + gnupghome + Specify the location where GPG keyring and related files are stored. + + .. versionadded:: 3006 + CLI Example: .. code-block:: bash @@ -241,7 +253,7 @@ def search_keys(text, keyserver=None, user=None): keyserver = _DEFAULT_KEY_SERVER _keys = [] - for _key in _search_keys(text, keyserver, user): + for _key in _search_keys(text, keyserver, user=user, gnupghome=gnupghome): tmp = {"keyid": _key["keyid"], "uids": _key["uids"]} expires = _key.get("expires", None) @@ -262,7 +274,7 @@ def search_keys(text, keyserver=None, user=None): return _keys -def list_keys(user=None, gnupghome=None): +def list_keys(user=None, gnupghome=None, keyring=None): """ List keys in GPG keychain @@ -274,6 +286,12 @@ def list_keys(user=None, gnupghome=None): gnupghome Specify the location where GPG keyring and related files are stored. + keyring + Limit the operation to this specific keyring, specified as + a local filesystem path. + + .. versionadded:: 3006 + CLI Example: .. code-block:: bash @@ -282,7 +300,7 @@ def list_keys(user=None, gnupghome=None): """ _keys = [] - for _key in _list_keys(user, gnupghome): + for _key in _list_keys(user=user, gnupghome=gnupghome, keyring=keyring): tmp = { "keyid": _key["keyid"], "fingerprint": _key["fingerprint"], @@ -313,7 +331,7 @@ def list_keys(user=None, gnupghome=None): return _keys -def list_secret_keys(user=None, gnupghome=None): +def list_secret_keys(user=None, gnupghome=None, keyring=None): """ List secret keys in GPG keychain @@ -325,6 +343,12 @@ def list_secret_keys(user=None, gnupghome=None): gnupghome Specify the location where GPG keyring and related files are stored. + keyring + Limit the operation to this specific keyring, specified as + a local filesystem path. + + .. versionadded:: 3006 + CLI Example: .. code-block:: bash @@ -333,7 +357,9 @@ def list_secret_keys(user=None, gnupghome=None): """ _keys = [] - for _key in _list_keys(user, gnupghome, secret=True): + for _key in _list_keys( + user=user, gnupghome=gnupghome, keyring=keyring, secret=True + ): tmp = { "keyid": _key["keyid"], "fingerprint": _key["fingerprint"], @@ -377,6 +403,7 @@ def create_key( use_passphrase=False, user=None, gnupghome=None, + keyring=None, ): """ Create a key in the GPG keychain @@ -433,6 +460,12 @@ def create_key( gnupghome Specify the location where GPG keyring and related files are stored. + keyring + Limit the operation to this specific keyring, specified as + a local filesystem path. + + .. versionadded:: 3006 + CLI Example: .. code-block:: bash @@ -449,7 +482,7 @@ def create_key( "name_comment": name_comment, } - gpg = _create_gpg(user, gnupghome) + gpg = _create_gpg(user=user, gnupghome=gnupghome, keyring=keyring) if name_email: create_params["name_email"] = name_email @@ -502,6 +535,7 @@ def delete_key( user=None, gnupghome=None, use_passphrase=True, + keyring=None, ): """ Delete a key from the GPG keychain. @@ -530,6 +564,12 @@ def delete_key( .. versionadded:: 3003 + keyring + Limit the operation to this specific keyring, specified as + a local filesystem path. + + .. versionadded:: 3006 + CLI Example: .. code-block:: bash @@ -555,8 +595,14 @@ def delete_key( ret["message"] = "Required argument, fingerprint or keyid" return ret - gpg = _create_gpg(user, gnupghome) - key = get_key(keyid=keyid, fingerprint=fingerprint, user=user, gnupghome=gnupghome) + gpg = _create_gpg(user=user, gnupghome=gnupghome, keyring=keyring) + key = get_key( + keyid=keyid, + fingerprint=fingerprint, + user=user, + gnupghome=gnupghome, + keyring=keyring, + ) def __delete_key(fingerprint, secret, use_passphrase): if secret and use_passphrase: @@ -571,7 +617,13 @@ def delete_key( if key: fingerprint = key["fingerprint"] - skey = get_secret_key(keyid, fingerprint, user, gnupghome=gnupghome) + skey = get_secret_key( + keyid=keyid, + fingerprint=fingerprint, + user=user, + gnupghome=gnupghome, + keyring=keyring, + ) if skey: if not delete_secret: ret["res"] = False @@ -605,7 +657,7 @@ def delete_key( return ret -def get_key(keyid=None, fingerprint=None, user=None, gnupghome=None): +def get_key(keyid=None, fingerprint=None, user=None, gnupghome=None, keyring=None): """ Get a key from the GPG keychain @@ -623,6 +675,12 @@ def get_key(keyid=None, fingerprint=None, user=None, gnupghome=None): gnupghome Specify the location where GPG keyring and related files are stored. + keyring + Limit the operation to this specific keyring, specified as + a local filesystem path. + + .. versionadded:: 3006 + CLI Example: .. code-block:: bash @@ -635,7 +693,7 @@ def get_key(keyid=None, fingerprint=None, user=None, gnupghome=None): """ tmp = {} - for _key in _list_keys(user, gnupghome): + for _key in _list_keys(user=user, gnupghome=gnupghome, keyring=keyring): if ( _key["fingerprint"] == fingerprint or _key["keyid"] == keyid @@ -671,7 +729,9 @@ def get_key(keyid=None, fingerprint=None, user=None, gnupghome=None): return tmp -def get_secret_key(keyid=None, fingerprint=None, user=None, gnupghome=None): +def get_secret_key( + keyid=None, fingerprint=None, user=None, gnupghome=None, keyring=None +): """ Get a key from the GPG keychain @@ -689,6 +749,12 @@ def get_secret_key(keyid=None, fingerprint=None, user=None, gnupghome=None): gnupghome Specify the location where GPG keyring and related files are stored. + keyring + Limit the operation to this specific keyring, specified as + a local filesystem path. + + .. versionadded:: 3006 + CLI Example: .. code-block:: bash @@ -701,7 +767,9 @@ def get_secret_key(keyid=None, fingerprint=None, user=None, gnupghome=None): """ tmp = {} - for _key in _list_keys(user, gnupghome, secret=True): + for _key in _list_keys( + user=user, gnupghome=gnupghome, keyring=keyring, secret=True + ): if ( _key["fingerprint"] == fingerprint or _key["keyid"] == keyid @@ -738,7 +806,7 @@ def get_secret_key(keyid=None, fingerprint=None, user=None, gnupghome=None): @_restore_ownership -def import_key(text=None, filename=None, user=None, gnupghome=None): +def import_key(text=None, filename=None, user=None, gnupghome=None, keyring=None): r""" Import a key from text or file @@ -756,6 +824,12 @@ def import_key(text=None, filename=None, user=None, gnupghome=None): gnupghome Specify the location where GPG keyring and related files are stored. + keyring + Limit the operation to this specific keyring, specified as + a local filesystem path. + + .. versionadded:: 3006 + CLI Example: .. code-block:: bash @@ -766,11 +840,11 @@ def import_key(text=None, filename=None, user=None, gnupghome=None): """ ret = {"res": True, "message": ""} - gpg = _create_gpg(user, gnupghome) - if not text and not filename: raise SaltInvocationError("filename or text must be passed.") + gpg = _create_gpg(user=user, gnupghome=gnupghome, keyring=keyring) + if filename: try: with salt.utils.files.flopen(filename, "rb") as _fp: @@ -801,6 +875,7 @@ def export_key( use_passphrase=False, output=None, bare=False, + keyring=None, ): """ Export a key from the GPG keychain @@ -838,6 +913,12 @@ def export_key( .. versionadded:: 3006.0 + keyring + Limit the operation to this specific keyring, specified as + a local filesystem path. + + .. versionadded:: 3006 + CLI Example: .. code-block:: bash @@ -850,7 +931,7 @@ def export_key( """ ret = {"res": True} - gpg = _create_gpg(user, gnupghome) + gpg = _create_gpg(user=user, gnupghome=gnupghome, keyring=keyring) if isinstance(keyids, str): keyids = keyids.split(",") @@ -887,7 +968,7 @@ def export_key( @_restore_ownership -def receive_keys(keyserver=None, keys=None, user=None, gnupghome=None): +def receive_keys(keyserver=None, keys=None, user=None, gnupghome=None, keyring=None): """ Receive key(s) from keyserver and add them to keychain @@ -906,6 +987,12 @@ def receive_keys(keyserver=None, keys=None, user=None, gnupghome=None): gnupghome Specify the location where GPG keyring and related files are stored. + keyring + Limit the operation to this specific keyring, specified as + a local filesystem path. + + .. versionadded:: 3006 + CLI Example: .. code-block:: bash @@ -919,7 +1006,7 @@ def receive_keys(keyserver=None, keys=None, user=None, gnupghome=None): """ ret = {"res": True, "message": []} - gpg = _create_gpg(user, gnupghome) + gpg = _create_gpg(user=user, gnupghome=gnupghome, keyring=keyring) if not keyserver: keyserver = _DEFAULT_KEY_SERVER @@ -1060,6 +1147,7 @@ def sign( output=None, use_passphrase=False, gnupghome=None, + keyring=None, ): """ Sign message or file @@ -1070,7 +1158,7 @@ def sign( ``/etc/salt/gpgkeys``. keyid - The keyid of the key to set the trust level for, defaults to + The keyid of the key to use for signing, defaults to first key in the secret keyring. text @@ -1089,6 +1177,12 @@ def sign( gnupghome Specify the location where GPG keyring and related files are stored. + keyring + Limit the operation to this specific keyring, specified as + a local filesystem path. + + .. versionadded:: 3006 + CLI Example: .. code-block:: bash @@ -1100,7 +1194,6 @@ def sign( salt '*' gpg.sign filename='/path/to/important.file' use_passphrase=True """ - gpg = _create_gpg(user, gnupghome) if use_passphrase: gpg_passphrase = __salt__["pillar.get"]("gpg_passphrase") if not gpg_passphrase: @@ -1108,6 +1201,8 @@ def sign( else: gpg_passphrase = None + gpg = _create_gpg(user=user, gnupghome=gnupghome, keyring=keyring) + if text: signed_data = gpg.sign(text, keyid=keyid, passphrase=gpg_passphrase) elif filename: @@ -1131,6 +1226,7 @@ def verify( trustmodel=None, signed_by_any=None, signed_by_all=None, + keyring=None, ): """ Verify a message or file @@ -1182,6 +1278,12 @@ def verify( .. versionadded:: 3007.0 + keyring + Limit the operation to this specific keyring, specified as + a local filesystem path. + + .. versionadded:: 3006 + CLI Example: .. code-block:: bash @@ -1192,7 +1294,6 @@ def verify( salt '*' gpg.verify filename='/path/to/important.file' trustmodel=direct """ - gpg = _create_gpg(user, gnupghome) trustmodels = ("pgp", "classic", "tofu", "tofu+pgp", "direct", "always", "auto") if trustmodel and trustmodel not in trustmodels: @@ -1202,6 +1303,7 @@ def verify( log.warning(msg) return {"res": False, "message": msg} + gpg = _create_gpg(user=user, gnupghome=gnupghome, keyring=keyring) extra_args = [] if trustmodel: @@ -1328,6 +1430,7 @@ def encrypt( always_trust=False, gnupghome=None, bare=False, + keyring=None, ): """ Encrypt a message or file @@ -1370,6 +1473,12 @@ def encrypt( If ``True``, return the (armored) encrypted block as a string without the standard comment/res dict. + keyring + Limit the operation to this specific keyring, specified as + a local filesystem path. + + .. versionadded:: 3006 + CLI Example: .. code-block:: bash @@ -1383,8 +1492,6 @@ def encrypt( """ ret = {"res": True, "comment": ""} - gpg = _create_gpg(user, gnupghome) - if sign and use_passphrase: gpg_passphrase = __salt__["pillar.get"]("gpg_passphrase") if not gpg_passphrase: @@ -1392,6 +1499,8 @@ def encrypt( else: gpg_passphrase = None + gpg = _create_gpg(user=user, gnupghome=gnupghome, keyring=keyring) + if text: result = gpg.encrypt( text, @@ -1444,6 +1553,7 @@ def decrypt( use_passphrase=False, gnupghome=None, bare=False, + keyring=None, ): """ Decrypt a message or file @@ -1473,6 +1583,12 @@ def decrypt( If ``True``, return the (armored) decrypted block as a string without the standard comment/res dict. + keyring + Limit the operation to this specific keyring, specified as + a local filesystem path. + + .. versionadded:: 3006 + CLI Example: .. code-block:: bash @@ -1483,7 +1599,6 @@ def decrypt( """ ret = {"res": True, "comment": ""} - gpg = _create_gpg(user, gnupghome) if use_passphrase: gpg_passphrase = __salt__["pillar.get"]("gpg_passphrase") if not gpg_passphrase: @@ -1491,6 +1606,8 @@ def decrypt( else: gpg_passphrase = None + gpg = _create_gpg(user=user, gnupghome=gnupghome, keyring=keyring) + if text: result = gpg.decrypt(text, passphrase=gpg_passphrase) elif filename: diff --git a/salt/states/gpg.py b/salt/states/gpg.py index 14ad37d98f0..4178269b668 100644 --- a/salt/states/gpg.py +++ b/salt/states/gpg.py @@ -32,7 +32,14 @@ TRUST_MAP = { def present( - name, keys=None, user=None, keyserver=None, gnupghome=None, trust=None, **kwargs + name, + keys=None, + user=None, + keyserver=None, + gnupghome=None, + trust=None, + keyring=None, + **kwargs, ): """ Ensure a GPG public key is present in the GPG keychain. @@ -57,11 +64,19 @@ def present( ignored by default. Valid trust levels: expired, unknown, not_trusted, marginally, fully, ultimately + + keyring + Limit the operation to this specific keyring, specified as + a local filesystem path. + + .. versionadded:: 3006 """ ret = {"name": name, "result": True, "changes": {}, "comment": []} - _current_keys = __salt__["gpg.list_keys"](user=user, gnupghome=gnupghome) + _current_keys = __salt__["gpg.list_keys"]( + user=user, gnupghome=gnupghome, keyring=keyring + ) current_keys = {} for key in _current_keys: @@ -123,10 +138,11 @@ def present( ) continue result = __salt__["gpg.receive_keys"]( - keyserver, - key, - user, - gnupghome, + keyserver=keyserver, + keys=key, + user=user, + gnupghome=gnupghome, + keyring=keyring, ) if result["res"] is False: ret["result"] = result["res"] @@ -156,7 +172,15 @@ def present( return ret -def absent(name, keys=None, user=None, gnupghome=None, **kwargs): +def absent( + name, + keys=None, + user=None, + gnupghome=None, + keyring=None, + keyring_absent_if_empty=False, + **kwargs, +): """ Ensure a GPG public key is absent from the keychain. @@ -171,11 +195,25 @@ def absent(name, keys=None, user=None, gnupghome=None, **kwargs): gnupghome Override GnuPG home directory. + + keyring + Limit the operation to this specific keyring, specified as + a local filesystem path. + + .. versionadded:: 3006 + + keyring_absent_if_empty + Make sure to not leave behind an empty keyring file + if ``keyring`` was specified. Defaults to false. + + .. versionadded:: 3006 """ ret = {"name": name, "result": True, "changes": {}, "comment": []} - _current_keys = __salt__["gpg.list_keys"](user=user, gnupghome=gnupghome) + _current_keys = __salt__["gpg.list_keys"]( + user=user, gnupghome=gnupghome, keyring=keyring + ) current_keys = [] for key in _current_keys: @@ -198,6 +236,8 @@ def absent(name, keys=None, user=None, gnupghome=None, **kwargs): keyid=key, user=user, gnupghome=gnupghome, + keyring=keyring, + use_passphrase=False, ) if result["res"] is False: ret["result"] = result["res"] @@ -207,5 +247,37 @@ def absent(name, keys=None, user=None, gnupghome=None, **kwargs): salt.utils.dictupdate.append_dict_key_value(ret, "changes:deleted", key) else: ret["comment"].append(f"{key} not found in GPG keychain") + + if __opts__["test"] or not ret["result"]: + return ret + + _new_keys = [ + x["keyid"] + for x in __salt__["gpg.list_keys"]( + user=user, gnupghome=gnupghome, keyring=keyring + ) + ] + + if set(keys) & set(_new_keys): + remaining = set(keys) & set(_new_keys) + ret["result"] = False + ret["comment"].append( + "State check revealed the following keys could not be deleted: " + + ", ".join(remaining) + ) + ret["changes"]["deleted"] = list( + set(ret["changes"]["deleted"]) - set(_new_keys) + ) + + elif ( + not _new_keys + and keyring + and keyring_absent_if_empty + and __salt__["file.file_exists"](keyring) + ): + __salt__["file.remove"](keyring) + ret["comment"].append(f"Removed keyring file {keyring}") + ret["changes"]["removed"] = keyring + ret["comment"] = "\n".join(ret["comment"]) return ret diff --git a/tests/pytests/functional/modules/test_gpg.py b/tests/pytests/functional/modules/test_gpg.py index 48edc5a950c..4dd7cbd79d7 100644 --- a/tests/pytests/functional/modules/test_gpg.py +++ b/tests/pytests/functional/modules/test_gpg.py @@ -350,6 +350,20 @@ kBGl+/D1MBJLt6q8GZWHMWIHOX4GN28A/PEemaKg3dZHEtPM3w== -----END PGP SIGNATURE-----""" +@pytest.fixture +def secret_message(): + return """\ +-----BEGIN PGP MESSAGE----- + +hIwDVTqCoFjAx5UBA/wIt5OUfsKV2VPB2P+c6r7xVvPIPiA1FjNpU2x1G8A/dxVq +kAOhXJ9KkM6yon0PJReF3w8QPgZCo5tCmwqMtin4OY/WTw1ExyIWIaS7XJh1ktPM +TJL7RpyeywGHiAveLs9rznZtVwi0xg+rTSWpoMS/8GbKpOyf3twWMsiFfndr09JJ +ASWYXtfsUT3IVA5dP0Mr3/Yg0v90d+X2RqUHM+sUiUtwh4mb+vUcm7UOQRyGAR4V +h7jTNclSQwWCGzx6OaWKnrCVafRXbH4aeA== +=tw4x +-----END PGP MESSAGE-----""" + + @pytest.fixture(params=["a"]) def sig(request, tmp_path): sigs = "\n".join(request.getfixturevalue(f"key_{x}_sig") for x in request.param) @@ -364,6 +378,16 @@ def gnupg(gpghome): return gnupglib.GPG(gnupghome=str(gpghome)) +@pytest.fixture +def gnupg_keyring(gpghome, keyring): + return gnupglib.GPG(gnupghome=str(gpghome), keyring=keyring) + + +@pytest.fixture +def gnupg_privkeyring(gpghome, keyring_privkeys): + return gnupglib.GPG(gnupghome=str(gpghome), keyring=keyring_privkeys) + + @pytest.fixture(params=["abcde"]) def pubkeys_present(gnupg, request): pubkeys = [request.getfixturevalue(f"key_{x}_pub") for x in request.param] @@ -376,6 +400,240 @@ def pubkeys_present(gnupg, request): # cleanup is taken care of by gpghome and tmp_path +@pytest.fixture(params=["ab"]) +def privkeys_present(gnupg, request): + privkeys = [request.getfixturevalue(f"key_{x}_priv") for x in request.param] + fingerprints = [request.getfixturevalue(f"key_{x}_fp") for x in request.param] + res = gnupg.import_keys("\n".join(privkeys)) + assert set(res.fingerprints) == set(fingerprints) + present_keys = gnupg.list_keys(secret=True) + assert present_keys + for fp in fingerprints: + assert any(x["fingerprint"] == fp for x in present_keys) + yield + # cleanup is taken care of by gpghome and tmp_path + + +@pytest.fixture(params=["a"]) +def keyring(gpghome, tmp_path, request): + keyring = tmp_path / "keys.gpg" + _gnupg_keyring = gnupglib.GPG(gnupghome=str(gpghome), keyring=str(keyring)) + pubkeys = [request.getfixturevalue(f"key_{x}_pub") for x in request.param] + fingerprints = [request.getfixturevalue(f"key_{x}_fp") for x in request.param] + _gnupg_keyring.import_keys("\n".join(pubkeys)) + present_keys = _gnupg_keyring.list_keys() + for fp in fingerprints: + assert any(x["fingerprint"] == fp for x in present_keys) + yield str(keyring) + # cleanup is taken care of by gpghome and tmp_path + + +@pytest.fixture(params=["a"]) +def keyring_privkeys(gpghome, gnupg, tmp_path, request): + keyring = tmp_path / "keys.gpg" + _gnupg_keyring = gnupglib.GPG(gnupghome=str(gpghome), keyring=str(keyring)) + privkeys = [request.getfixturevalue(f"key_{x}_priv") for x in request.param] + fingerprints = [request.getfixturevalue(f"key_{x}_fp") for x in request.param] + _gnupg_keyring.import_keys("\n".join(privkeys)) + present_privkeys = _gnupg_keyring.list_keys(secret=True) + assert present_privkeys + for fp in fingerprints: + assert any(x["fingerprint"] == fp for x in present_privkeys) + yield str(keyring) + # cleanup is taken care of by gpghome and tmp_path + + +@pytest.mark.usefixtures("pubkeys_present") +def test_list_keys(gpg, gpghome, gnupg): + res = gpg.list_keys(gnupghome=str(gpghome)) + assert res + assert len(res) == len(gnupg.list_keys()) + + +def test_list_keys_in_keyring(gpg, gpghome, keyring, gnupg_keyring): + res = gpg.list_keys(gnupghome=str(gpghome), keyring=keyring) + assert len(res) == len(gnupg_keyring.list_keys()) + + +@pytest.mark.usefixtures("privkeys_present") +@pytest.mark.skip_unless_on_linux(reason="Test setup with private keys fails") +def test_list_secret_keys(gpghome, gpg, gnupg): + res = gpg.list_secret_keys(gnupghome=str(gpghome)) + assert len(res) == len(gnupg.list_keys(secret=True)) + + +@pytest.mark.skip_unless_on_linux(reason="Test setup with private keys fails") +def test_list_secret_keys_in_keyring(gpghome, gpg, keyring_privkeys, gnupg_privkeyring): + res = gpg.list_secret_keys(gnupghome=str(gpghome), keyring=keyring_privkeys) + assert len(res) == len(gnupg_privkeyring.list_keys(secret=True)) + + +@pytest.mark.skip_unless_on_linux(reason="Test setup with private keys fails") +@pytest.mark.requires_random_entropy() +def test_create_key(gpghome, gpg, gnupg): + res = gpg.create_key(gnupghome=str(gpghome)) + assert res + assert "message" in res + assert "successfully generated" in res["message"] + assert "fingerprint" in res + assert res["fingerprint"] + assert gnupg.list_keys(secret=True, keys=res["fingerprint"]) + + +@pytest.mark.skip_unless_on_linux(reason="Test setup with private keys fails") +@pytest.mark.requires_random_entropy() +def test_create_key_in_keyring(gpghome, gpg, gnupg, keyring, gnupg_keyring): + res = gpg.create_key(gnupghome=str(gpghome), keyring=keyring) + assert res + assert "message" in res + assert "successfully generated" in res["message"] + assert "fingerprint" in res + assert res["fingerprint"] + assert not gnupg.list_keys(secret=True, keys=res["fingerprint"]) + assert gnupg_keyring.list_keys(secret=True, keys=res["fingerprint"]) + + +@pytest.mark.usefixtures("pubkeys_present") +@pytest.mark.skip_unless_on_linux( + reason="Complains about deleting private keys first when they are absent" +) +def test_delete_key(gpghome, gpg, gnupg, key_a_fp): + assert gnupg.list_keys(keys=key_a_fp) + res = gpg.delete_key( + fingerprint=key_a_fp, gnupghome=str(gpghome), use_passphrase=False + ) + assert res["res"] + assert not gnupg.list_keys(keys=key_a_fp) + + +@pytest.mark.usefixtures("pubkeys_present") +@pytest.mark.skip_unless_on_linux( + reason="Complains about deleting private keys first when they are absent" +) +def test_delete_key_from_keyring(gpghome, gpg, key_a_fp, keyring, gnupg, gnupg_keyring): + assert gnupg.list_keys(keys=key_a_fp) + assert gnupg_keyring.list_keys(keys=key_a_fp) + res = gpg.delete_key( + fingerprint=key_a_fp, + gnupghome=str(gpghome), + keyring=keyring, + use_passphrase=False, + ) + assert res["res"] + assert gnupg.list_keys(keys=key_a_fp) + assert not gnupg_keyring.list_keys(keys=key_a_fp) + + +@pytest.mark.usefixtures("pubkeys_present") +def test_get_key(gpghome, gpg, key_a_fp): + res = gpg.get_key(fingerprint=key_a_fp, gnupghome=str(gpghome)) + assert res + assert "keyid" in res + assert res["keyid"] == key_a_fp[-16:] + assert "keyLength" in res + assert res["keyLength"] == "1024" + + +def test_get_key_from_keyring(gpghome, gpg, key_a_fp, keyring, gnupg): + assert not gnupg.list_keys() + res = gpg.get_key(fingerprint=key_a_fp, gnupghome=str(gpghome), keyring=keyring) + assert res + assert "keyid" in res + assert res["keyid"] == key_a_fp[-16:] + assert "keyLength" in res + assert res["keyLength"] == "1024" + + +@pytest.mark.usefixtures("privkeys_present") +@pytest.mark.skip_unless_on_linux(reason="Test setup with private keys fails") +def test_get_secret_key(gpghome, gpg, key_a_fp): + res = gpg.get_secret_key(fingerprint=key_a_fp, gnupghome=str(gpghome)) + assert res + assert "keyid" in res + assert res["keyid"] == key_a_fp[-16:] + assert "keyLength" in res + assert res["keyLength"] == "1024" + + +@pytest.mark.skip_unless_on_linux(reason="Test setup with private keys fails") +def test_get_secret_key_from_keyring(gpghome, gpg, key_a_fp, keyring_privkeys, gnupg): + assert not gnupg.list_keys(keys=key_a_fp, secret=True) + res = gpg.get_secret_key( + fingerprint=key_a_fp, gnupghome=str(gpghome), keyring=keyring_privkeys + ) + assert res + assert "keyid" in res + assert res["keyid"] == key_a_fp[-16:] + assert "keyLength" in res + assert res["keyLength"] == "1024" + + +def test_import_key(gpghome, gnupg, gpg, key_a_pub, key_a_fp): + assert not gnupg.list_keys(keys=key_a_fp) + res = gpg.import_key(text=key_a_pub, gnupghome=str(gpghome)) + assert res + assert res["res"] + assert "Successfully imported" in res["message"] + assert gnupg.list_keys(keys=key_a_fp) + + +@pytest.mark.parametrize("keyring", [""], indirect=True) +def test_import_key_to_keyring( + gpghome, gnupg, gpg, key_d_pub, key_d_fp, keyring, gnupg_keyring +): + assert not gnupg.list_keys(keys=key_d_fp) + assert not gnupg_keyring.list_keys(keys=key_d_fp) + res = gpg.import_key(text=key_d_pub, gnupghome=str(gpghome), keyring=keyring) + assert res + assert res["res"] + assert "Successfully imported" in res["message"] + assert not gnupg.list_keys(keys=key_d_fp) + assert gnupg_keyring.list_keys(keys=key_d_fp) + + +@pytest.mark.usefixtures("pubkeys_present") +def test_export_key(gpghome, gpg, key_a_fp): + res = gpg.export_key(keyids=key_a_fp, gnupghome=str(gpghome)) + assert res["res"] + assert res["comment"].startswith("-----BEGIN PGP PUBLIC KEY BLOCK-----") + assert res["comment"].endswith("-----END PGP PUBLIC KEY BLOCK-----\n") + + +def test_export_key_from_keyring(gpghome, gnupg, gpg, key_a_fp, keyring, gnupg_keyring): + assert not gnupg.list_keys(keys=key_a_fp) + assert gnupg_keyring.list_keys(keys=key_a_fp) + res = gpg.export_key(keyids=key_a_fp, gnupghome=str(gpghome), keyring=keyring) + assert res["res"] + assert res["comment"].startswith("-----BEGIN PGP PUBLIC KEY BLOCK-----") + assert res["comment"].endswith("-----END PGP PUBLIC KEY BLOCK-----\n") + + +@pytest.mark.usefixtures("privkeys_present") +@pytest.mark.skip_unless_on_linux(reason="Test setup with private keys fails") +@pytest.mark.requires_random_entropy() +def test_sign(gpghome, gpg, gnupg, key_a_fp): + assert gnupg.list_keys(secret=True, keys=key_a_fp) + res = gpg.sign(text="foo", keyid=key_a_fp, gnupghome=str(gpghome)) + assert res + assert res.startswith(b"-----BEGIN PGP SIGNED MESSAGE-----") + assert res.endswith(b"-----END PGP SIGNATURE-----\n") + + +@pytest.mark.skip_unless_on_linux(reason="Test setup with private keys fails") +@pytest.mark.requires_random_entropy() +def test_sign_with_keyring( + gpghome, gpg, gnupg, key_a_fp, gnupg_privkeyring, keyring_privkeys +): + assert not gnupg.list_keys(keys=key_a_fp, secret=True) + assert gnupg_privkeyring.list_keys(keys=key_a_fp, secret=True) + res = gpg.sign( + text="foo", keyid=key_a_fp, gnupghome=str(gpghome), keyring=keyring_privkeys + ) + assert res + assert res.startswith(b"-----BEGIN PGP SIGNED MESSAGE-----") + assert res.endswith(b"-----END PGP SIGNATURE-----\n") + + @pytest.mark.parametrize( "sig,expected", [ @@ -496,3 +754,76 @@ def test_verify(gpghome, gpg, sig, signed_data, key_a_fp): assert "is verified" in res["message"] assert "key_id" in res assert res["key_id"] == key_a_fp[-16:] + + +def test_verify_with_keyring(gpghome, gnupg, gpg, keyring, sig, signed_data, key_a_fp): + assert not gnupg.list_keys(keys=key_a_fp) + res = gpg.verify( + filename=str(signed_data), + signature=str(sig), + gnupghome=str(gpghome), + keyring=keyring, + ) + assert res["res"] + assert "is verified" in res["message"] + assert "key_id" in res + assert res["key_id"] == key_a_fp[-16:] + + +@pytest.mark.usefixtures("pubkeys_present") +@pytest.mark.requires_random_entropy() +def test_encrypt(gpghome, gpg, gnupg, key_b_fp): + assert gnupg.list_keys(keys=key_b_fp) + res = gpg.encrypt( + text="I like turtles", + recipients=key_b_fp, + gnupghome=str(gpghome), + always_trust=True, + ) + assert res + assert res["res"] + assert res["comment"] + assert res["comment"].startswith(b"-----BEGIN PGP MESSAGE-----") + assert res["comment"].endswith(b"-----END PGP MESSAGE-----\n") + + +@pytest.mark.requires_random_entropy() +def test_encrypt_with_keyring(gpghome, gpg, gnupg, key_a_fp, keyring, gnupg_keyring): + assert not gnupg.list_keys(keys=key_a_fp) + assert gnupg_keyring.list_keys(keys=key_a_fp) + res = gpg.encrypt( + text="I like turtles", + recipients=key_a_fp, + gnupghome=str(gpghome), + keyring=keyring, + always_trust=True, + ) + assert res + assert res["res"] + assert res["comment"] + assert res["comment"].startswith(b"-----BEGIN PGP MESSAGE-----") + assert res["comment"].endswith(b"-----END PGP MESSAGE-----\n") + + +@pytest.mark.usefixtures("privkeys_present") +@pytest.mark.skip_unless_on_linux(reason="Test setup with private keys fails") +def test_decrypt(gpghome, gpg, gnupg, secret_message, key_a_fp): + assert gnupg.list_keys(secret=True, keys=key_a_fp) + res = gpg.decrypt(text=secret_message, gnupghome=str(gpghome)) + assert res["res"] + assert res["comment"] + assert res["comment"] == b"I like turtles" + + +@pytest.mark.skip_unless_on_linux(reason="Test setup with private keys fails") +def test_decrypt_with_keyring( + gpghome, gpg, gnupg, gnupg_privkeyring, keyring_privkeys, secret_message, key_a_fp +): + assert not gnupg.list_keys(secret=True, keys=key_a_fp) + assert gnupg_privkeyring.list_keys(secret=True, keys=key_a_fp) + res = gpg.decrypt( + text=secret_message, gnupghome=str(gpghome), keyring=keyring_privkeys + ) + assert res["res"] + assert res["comment"] + assert res["comment"] == b"I like turtles" diff --git a/tests/pytests/functional/states/test_gpg.py b/tests/pytests/functional/states/test_gpg.py index 80e86aa3a7b..299b91ffa14 100644 --- a/tests/pytests/functional/states/test_gpg.py +++ b/tests/pytests/functional/states/test_gpg.py @@ -1,5 +1,6 @@ import shutil import subprocess +from pathlib import Path import psutil import pytest @@ -103,6 +104,20 @@ def _pubkeys_present(gnupg, request): # cleanup is taken care of by gpghome and tmp_path +@pytest.fixture(params=["a"]) +def keyring(gpghome, tmp_path, request): + keyring = tmp_path / "keys.gpg" + _gnupg_keyring = gnupglib.GPG(gnupghome=str(gpghome), keyring=str(keyring)) + pubkeys = [request.getfixturevalue(f"key_{x}_pub") for x in request.param] + fingerprints = [request.getfixturevalue(f"key_{x}_fp") for x in request.param] + _gnupg_keyring.import_keys("\n".join(pubkeys)) + present_keys = _gnupg_keyring.list_keys() + for fp in fingerprints: + assert any(x["fingerprint"] == fp for x in present_keys) + yield str(keyring) + # cleanup is taken care of by gpghome and tmp_path + + @pytest.mark.usefixtures("_pubkeys_present") def test_gpg_present_no_changes(gpghome, gpg, gnupg, key_a_fp): assert gnupg.list_keys(keys=key_a_fp) @@ -113,6 +128,18 @@ def test_gpg_present_no_changes(gpghome, gpg, gnupg, key_a_fp): assert not ret.changes +def test_gpg_present_keyring_no_changes( + gpghome, gpg, gnupg, gnupg_keyring, keyring, key_a_fp +): + assert not gnupg.list_keys(keys=key_a_fp) + assert gnupg_keyring.list_keys(keys=key_a_fp) + ret = gpg.present( + key_a_fp[-16:], gnupghome=str(gpghome), keyserver="nonexistent", keyring=keyring + ) + assert ret.result + assert not ret.changes + + @pytest.mark.usefixtures("_pubkeys_present") def test_gpg_absent(gpghome, gpg, gnupg, key_a_fp): assert gnupg.list_keys(keys=key_a_fp) @@ -140,3 +167,30 @@ def test_gpg_absent_test_mode_no_changes(gpghome, gpg, gnupg, key_a_fp): assert "deleted" in ret.changes assert ret.changes["deleted"] assert gnupg.list_keys(keys=key_a_fp) + + +@pytest.mark.usefixtures("_pubkeys_present") +def test_gpg_absent_from_keyring(gpghome, gpg, gnupg, gnupg_keyring, keyring, key_a_fp): + assert gnupg.list_keys(keys=key_a_fp) + assert gnupg_keyring.list_keys(keys=key_a_fp) + ret = gpg.absent(key_a_fp[-16:], gnupghome=str(gpghome), keyring=keyring) + assert ret.result + assert ret.changes + assert gnupg.list_keys(keys=key_a_fp) + assert not gnupg_keyring.list_keys(keys=key_a_fp) + + +@pytest.mark.parametrize("keyring", [""], indirect=True) +def test_gpg_absent_from_keyring_delete_keyring( + gpghome, gpg, gnupg, gnupg_keyring, keyring, key_a_fp +): + assert not gnupg_keyring.list_keys() + assert Path(keyring).exists() + ret = gpg.absent( + "abc", gnupghome=str(gpghome), keyring=keyring, keyring_absent_if_empty=True + ) + assert ret.result + assert ret.changes + assert "removed" in ret.changes + assert ret.changes["removed"] == keyring + assert not Path(keyring).exists() diff --git a/tests/pytests/unit/states/test_gpg.py b/tests/pytests/unit/states/test_gpg.py index 13ba97cd493..7250bdabf69 100644 --- a/tests/pytests/unit/states/test_gpg.py +++ b/tests/pytests/unit/states/test_gpg.py @@ -162,3 +162,65 @@ def test_gpg_present_test_mode_no_changes(gpg_receive, gpg_trust, key, trust): gpg_trust.assert_not_called() assert ret["result"] is None assert ret["changes"] + + +@pytest.mark.usefixtures("gpg_list_keys") +def test_gpg_absent_no_changes(gpg_delete): + ret = gpg.absent("nonexistent") + assert ret["result"] + assert not ret["changes"] + gpg_delete.assert_not_called() + + +@pytest.mark.usefixtures("gpg_list_keys") +@pytest.mark.parametrize( + "gpg_delete,expected", + [ + ({"res": True, "message": ["Public key for A deleted"]}, True), + ( + { + "res": False, + "message": [ + "Secret key exists, delete first or pass delete_secret=True." + ], + }, + False, + ), + ], + indirect=["gpg_delete"], +) +def test_gpg_absent_delete_key(gpg_delete, expected, keys_list): + list_ = Mock(spec="salt.modules.gpg.list_keys") + list_.side_effect = (keys_list, [x for x in keys_list if x["keyid"] != "A"]) + with patch.dict(gpg.__salt__, {"gpg.list_keys": list_}): + ret = gpg.absent("A") + assert ret["result"] == expected + assert bool(ret["changes"]) == expected + gpg_delete.assert_called_once() + + +@pytest.mark.usefixtures("gpg_list_keys") +def test_gpg_absent_test_mode_no_changes(gpg_delete): + with patch.dict(gpg.__opts__, {"test": True}): + ret = gpg.absent("A") + gpg_delete.assert_not_called() + assert ret["result"] is None + assert bool(ret["changes"]) + + +def test_gpg_absent_list_keys_with_gnupghome_and_user(gpg_list_keys): + gnupghome = "/pls_respect_me" + user = "imthereaswell" + gpg.absent("nonexistent", gnupghome=gnupghome, user=user) + gpg_list_keys.assert_called_with(gnupghome=gnupghome, user=user, keyring=None) + + +@pytest.mark.usefixtures("gpg_list_keys") +def test_gpg_absent_delete_key_called_with_correct_kwargs(gpg_delete): + key = "A" + user = "hellothere" + gnupghome = "/pls_sir" + gpg.absent(key, user=user, gnupghome=gnupghome) + gpg_delete.assert_called_with( + keyid=key, gnupghome=gnupghome, user=user, keyring=None, use_passphrase=False + )