Add keyring, gnupghome to rest, add functional tests

This commit is contained in:
jeanluc 2022-12-02 01:15:10 +01:00 committed by Daniel Wozniak
parent d7549bd155
commit 32cfcfd777
5 changed files with 682 additions and 46 deletions

View file

@ -154,11 +154,17 @@ def _restore_ownership(func):
userinfo = _get_user_info(user)
run_user = _get_user_info()
if userinfo["uid"] != run_user["uid"] and os.path.exists(gnupghome):
# Given user is different from one who runs Salt process,
# need to fix ownership permissions for GnuPG home dir
group = __salt__["file.gid_to_group"](run_user["gid"])
for path in [gnupghome] + __salt__["file.find"](gnupghome):
if userinfo["uid"] != run_user["uid"]:
group = None
if os.path.exists(gnupghome):
# Given user is different from one who runs Salt process,
# need to fix ownership permissions for GnuPG home dir
group = __salt__["file.gid_to_group"](run_user["gid"])
for path in [gnupghome] + __salt__["file.find"](gnupghome):
__salt__["file.chown"](path, run_user["name"], group)
if "keyring" in kwargs and os.path.exists(kwargs["keyring"]):
if group is None:
group = __salt__["file.gid_to_group"](run_user["gid"])
__salt__["file.chown"](path, run_user["name"], group)
# Filter special kwargs
@ -172,38 +178,39 @@ def _restore_ownership(func):
group = __salt__["file.gid_to_group"](userinfo["gid"])
for path in [gnupghome] + __salt__["file.find"](gnupghome):
__salt__["file.chown"](path, user, group)
if "keyring" in kwargs and os.path.exists(kwargs["keyring"]):
__salt__["file.chown"](path, user, group)
return ret
return func_wrapper
def _create_gpg(user=None, gnupghome=None):
def _create_gpg(user=None, gnupghome=None, keyring=None):
"""
Create the GPG object
"""
if not gnupghome:
gnupghome = _get_user_gnupghome(user)
gpg = gnupg.GPG(gnupghome=gnupghome)
gpg = gnupg.GPG(gnupghome=gnupghome, keyring=keyring)
return gpg
def _list_keys(user=None, gnupghome=None, secret=False):
def _list_keys(secret=False, user=None, gnupghome=None, keyring=None):
"""
Helper function for Listing keys
"""
gpg = _create_gpg(user, gnupghome)
gpg = _create_gpg(user=user, gnupghome=gnupghome, keyring=keyring)
_keys = gpg.list_keys(secret)
return _keys
def _search_keys(text, keyserver, user=None):
def _search_keys(text, keyserver, user=None, gnupghome=None):
"""
Helper function for searching keys from keyserver
"""
gpg = _create_gpg(user)
gpg = _create_gpg(user=user, gnupghome=gnupghome)
if keyserver:
_keys = gpg.search_keys(text, keyserver)
else:
@ -211,7 +218,7 @@ def _search_keys(text, keyserver, user=None):
return _keys
def search_keys(text, keyserver=None, user=None):
def search_keys(text, keyserver=None, user=None, gnupghome=None):
"""
Search keys from keyserver
@ -226,6 +233,11 @@ def search_keys(text, keyserver=None, user=None):
Passing the user as ``salt`` will set the GnuPG home directory to the
``/etc/salt/gpgkeys``.
gnupghome
Specify the location where GPG keyring and related files are stored.
.. versionadded:: 3006
CLI Example:
.. code-block:: bash
@ -241,7 +253,7 @@ def search_keys(text, keyserver=None, user=None):
keyserver = _DEFAULT_KEY_SERVER
_keys = []
for _key in _search_keys(text, keyserver, user):
for _key in _search_keys(text, keyserver, user=user, gnupghome=gnupghome):
tmp = {"keyid": _key["keyid"], "uids": _key["uids"]}
expires = _key.get("expires", None)
@ -262,7 +274,7 @@ def search_keys(text, keyserver=None, user=None):
return _keys
def list_keys(user=None, gnupghome=None):
def list_keys(user=None, gnupghome=None, keyring=None):
"""
List keys in GPG keychain
@ -274,6 +286,12 @@ def list_keys(user=None, gnupghome=None):
gnupghome
Specify the location where GPG keyring and related files are stored.
keyring
Limit the operation to this specific keyring, specified as
a local filesystem path.
.. versionadded:: 3006
CLI Example:
.. code-block:: bash
@ -282,7 +300,7 @@ def list_keys(user=None, gnupghome=None):
"""
_keys = []
for _key in _list_keys(user, gnupghome):
for _key in _list_keys(user=user, gnupghome=gnupghome, keyring=keyring):
tmp = {
"keyid": _key["keyid"],
"fingerprint": _key["fingerprint"],
@ -313,7 +331,7 @@ def list_keys(user=None, gnupghome=None):
return _keys
def list_secret_keys(user=None, gnupghome=None):
def list_secret_keys(user=None, gnupghome=None, keyring=None):
"""
List secret keys in GPG keychain
@ -325,6 +343,12 @@ def list_secret_keys(user=None, gnupghome=None):
gnupghome
Specify the location where GPG keyring and related files are stored.
keyring
Limit the operation to this specific keyring, specified as
a local filesystem path.
.. versionadded:: 3006
CLI Example:
.. code-block:: bash
@ -333,7 +357,9 @@ def list_secret_keys(user=None, gnupghome=None):
"""
_keys = []
for _key in _list_keys(user, gnupghome, secret=True):
for _key in _list_keys(
user=user, gnupghome=gnupghome, keyring=keyring, secret=True
):
tmp = {
"keyid": _key["keyid"],
"fingerprint": _key["fingerprint"],
@ -377,6 +403,7 @@ def create_key(
use_passphrase=False,
user=None,
gnupghome=None,
keyring=None,
):
"""
Create a key in the GPG keychain
@ -433,6 +460,12 @@ def create_key(
gnupghome
Specify the location where GPG keyring and related files are stored.
keyring
Limit the operation to this specific keyring, specified as
a local filesystem path.
.. versionadded:: 3006
CLI Example:
.. code-block:: bash
@ -449,7 +482,7 @@ def create_key(
"name_comment": name_comment,
}
gpg = _create_gpg(user, gnupghome)
gpg = _create_gpg(user=user, gnupghome=gnupghome, keyring=keyring)
if name_email:
create_params["name_email"] = name_email
@ -502,6 +535,7 @@ def delete_key(
user=None,
gnupghome=None,
use_passphrase=True,
keyring=None,
):
"""
Delete a key from the GPG keychain.
@ -530,6 +564,12 @@ def delete_key(
.. versionadded:: 3003
keyring
Limit the operation to this specific keyring, specified as
a local filesystem path.
.. versionadded:: 3006
CLI Example:
.. code-block:: bash
@ -555,8 +595,14 @@ def delete_key(
ret["message"] = "Required argument, fingerprint or keyid"
return ret
gpg = _create_gpg(user, gnupghome)
key = get_key(keyid=keyid, fingerprint=fingerprint, user=user, gnupghome=gnupghome)
gpg = _create_gpg(user=user, gnupghome=gnupghome, keyring=keyring)
key = get_key(
keyid=keyid,
fingerprint=fingerprint,
user=user,
gnupghome=gnupghome,
keyring=keyring,
)
def __delete_key(fingerprint, secret, use_passphrase):
if secret and use_passphrase:
@ -571,7 +617,13 @@ def delete_key(
if key:
fingerprint = key["fingerprint"]
skey = get_secret_key(keyid, fingerprint, user, gnupghome=gnupghome)
skey = get_secret_key(
keyid=keyid,
fingerprint=fingerprint,
user=user,
gnupghome=gnupghome,
keyring=keyring,
)
if skey:
if not delete_secret:
ret["res"] = False
@ -605,7 +657,7 @@ def delete_key(
return ret
def get_key(keyid=None, fingerprint=None, user=None, gnupghome=None):
def get_key(keyid=None, fingerprint=None, user=None, gnupghome=None, keyring=None):
"""
Get a key from the GPG keychain
@ -623,6 +675,12 @@ def get_key(keyid=None, fingerprint=None, user=None, gnupghome=None):
gnupghome
Specify the location where GPG keyring and related files are stored.
keyring
Limit the operation to this specific keyring, specified as
a local filesystem path.
.. versionadded:: 3006
CLI Example:
.. code-block:: bash
@ -635,7 +693,7 @@ def get_key(keyid=None, fingerprint=None, user=None, gnupghome=None):
"""
tmp = {}
for _key in _list_keys(user, gnupghome):
for _key in _list_keys(user=user, gnupghome=gnupghome, keyring=keyring):
if (
_key["fingerprint"] == fingerprint
or _key["keyid"] == keyid
@ -671,7 +729,9 @@ def get_key(keyid=None, fingerprint=None, user=None, gnupghome=None):
return tmp
def get_secret_key(keyid=None, fingerprint=None, user=None, gnupghome=None):
def get_secret_key(
keyid=None, fingerprint=None, user=None, gnupghome=None, keyring=None
):
"""
Get a key from the GPG keychain
@ -689,6 +749,12 @@ def get_secret_key(keyid=None, fingerprint=None, user=None, gnupghome=None):
gnupghome
Specify the location where GPG keyring and related files are stored.
keyring
Limit the operation to this specific keyring, specified as
a local filesystem path.
.. versionadded:: 3006
CLI Example:
.. code-block:: bash
@ -701,7 +767,9 @@ def get_secret_key(keyid=None, fingerprint=None, user=None, gnupghome=None):
"""
tmp = {}
for _key in _list_keys(user, gnupghome, secret=True):
for _key in _list_keys(
user=user, gnupghome=gnupghome, keyring=keyring, secret=True
):
if (
_key["fingerprint"] == fingerprint
or _key["keyid"] == keyid
@ -738,7 +806,7 @@ def get_secret_key(keyid=None, fingerprint=None, user=None, gnupghome=None):
@_restore_ownership
def import_key(text=None, filename=None, user=None, gnupghome=None):
def import_key(text=None, filename=None, user=None, gnupghome=None, keyring=None):
r"""
Import a key from text or file
@ -756,6 +824,12 @@ def import_key(text=None, filename=None, user=None, gnupghome=None):
gnupghome
Specify the location where GPG keyring and related files are stored.
keyring
Limit the operation to this specific keyring, specified as
a local filesystem path.
.. versionadded:: 3006
CLI Example:
.. code-block:: bash
@ -766,11 +840,11 @@ def import_key(text=None, filename=None, user=None, gnupghome=None):
"""
ret = {"res": True, "message": ""}
gpg = _create_gpg(user, gnupghome)
if not text and not filename:
raise SaltInvocationError("filename or text must be passed.")
gpg = _create_gpg(user=user, gnupghome=gnupghome, keyring=keyring)
if filename:
try:
with salt.utils.files.flopen(filename, "rb") as _fp:
@ -801,6 +875,7 @@ def export_key(
use_passphrase=False,
output=None,
bare=False,
keyring=None,
):
"""
Export a key from the GPG keychain
@ -838,6 +913,12 @@ def export_key(
.. versionadded:: 3006.0
keyring
Limit the operation to this specific keyring, specified as
a local filesystem path.
.. versionadded:: 3006
CLI Example:
.. code-block:: bash
@ -850,7 +931,7 @@ def export_key(
"""
ret = {"res": True}
gpg = _create_gpg(user, gnupghome)
gpg = _create_gpg(user=user, gnupghome=gnupghome, keyring=keyring)
if isinstance(keyids, str):
keyids = keyids.split(",")
@ -887,7 +968,7 @@ def export_key(
@_restore_ownership
def receive_keys(keyserver=None, keys=None, user=None, gnupghome=None):
def receive_keys(keyserver=None, keys=None, user=None, gnupghome=None, keyring=None):
"""
Receive key(s) from keyserver and add them to keychain
@ -906,6 +987,12 @@ def receive_keys(keyserver=None, keys=None, user=None, gnupghome=None):
gnupghome
Specify the location where GPG keyring and related files are stored.
keyring
Limit the operation to this specific keyring, specified as
a local filesystem path.
.. versionadded:: 3006
CLI Example:
.. code-block:: bash
@ -919,7 +1006,7 @@ def receive_keys(keyserver=None, keys=None, user=None, gnupghome=None):
"""
ret = {"res": True, "message": []}
gpg = _create_gpg(user, gnupghome)
gpg = _create_gpg(user=user, gnupghome=gnupghome, keyring=keyring)
if not keyserver:
keyserver = _DEFAULT_KEY_SERVER
@ -1060,6 +1147,7 @@ def sign(
output=None,
use_passphrase=False,
gnupghome=None,
keyring=None,
):
"""
Sign message or file
@ -1070,7 +1158,7 @@ def sign(
``/etc/salt/gpgkeys``.
keyid
The keyid of the key to set the trust level for, defaults to
The keyid of the key to use for signing, defaults to
first key in the secret keyring.
text
@ -1089,6 +1177,12 @@ def sign(
gnupghome
Specify the location where GPG keyring and related files are stored.
keyring
Limit the operation to this specific keyring, specified as
a local filesystem path.
.. versionadded:: 3006
CLI Example:
.. code-block:: bash
@ -1100,7 +1194,6 @@ def sign(
salt '*' gpg.sign filename='/path/to/important.file' use_passphrase=True
"""
gpg = _create_gpg(user, gnupghome)
if use_passphrase:
gpg_passphrase = __salt__["pillar.get"]("gpg_passphrase")
if not gpg_passphrase:
@ -1108,6 +1201,8 @@ def sign(
else:
gpg_passphrase = None
gpg = _create_gpg(user=user, gnupghome=gnupghome, keyring=keyring)
if text:
signed_data = gpg.sign(text, keyid=keyid, passphrase=gpg_passphrase)
elif filename:
@ -1131,6 +1226,7 @@ def verify(
trustmodel=None,
signed_by_any=None,
signed_by_all=None,
keyring=None,
):
"""
Verify a message or file
@ -1182,6 +1278,12 @@ def verify(
.. versionadded:: 3007.0
keyring
Limit the operation to this specific keyring, specified as
a local filesystem path.
.. versionadded:: 3006
CLI Example:
.. code-block:: bash
@ -1192,7 +1294,6 @@ def verify(
salt '*' gpg.verify filename='/path/to/important.file' trustmodel=direct
"""
gpg = _create_gpg(user, gnupghome)
trustmodels = ("pgp", "classic", "tofu", "tofu+pgp", "direct", "always", "auto")
if trustmodel and trustmodel not in trustmodels:
@ -1202,6 +1303,7 @@ def verify(
log.warning(msg)
return {"res": False, "message": msg}
gpg = _create_gpg(user=user, gnupghome=gnupghome, keyring=keyring)
extra_args = []
if trustmodel:
@ -1328,6 +1430,7 @@ def encrypt(
always_trust=False,
gnupghome=None,
bare=False,
keyring=None,
):
"""
Encrypt a message or file
@ -1370,6 +1473,12 @@ def encrypt(
If ``True``, return the (armored) encrypted block as a string without
the standard comment/res dict.
keyring
Limit the operation to this specific keyring, specified as
a local filesystem path.
.. versionadded:: 3006
CLI Example:
.. code-block:: bash
@ -1383,8 +1492,6 @@ def encrypt(
"""
ret = {"res": True, "comment": ""}
gpg = _create_gpg(user, gnupghome)
if sign and use_passphrase:
gpg_passphrase = __salt__["pillar.get"]("gpg_passphrase")
if not gpg_passphrase:
@ -1392,6 +1499,8 @@ def encrypt(
else:
gpg_passphrase = None
gpg = _create_gpg(user=user, gnupghome=gnupghome, keyring=keyring)
if text:
result = gpg.encrypt(
text,
@ -1444,6 +1553,7 @@ def decrypt(
use_passphrase=False,
gnupghome=None,
bare=False,
keyring=None,
):
"""
Decrypt a message or file
@ -1473,6 +1583,12 @@ def decrypt(
If ``True``, return the (armored) decrypted block as a string without the
standard comment/res dict.
keyring
Limit the operation to this specific keyring, specified as
a local filesystem path.
.. versionadded:: 3006
CLI Example:
.. code-block:: bash
@ -1483,7 +1599,6 @@ def decrypt(
"""
ret = {"res": True, "comment": ""}
gpg = _create_gpg(user, gnupghome)
if use_passphrase:
gpg_passphrase = __salt__["pillar.get"]("gpg_passphrase")
if not gpg_passphrase:
@ -1491,6 +1606,8 @@ def decrypt(
else:
gpg_passphrase = None
gpg = _create_gpg(user=user, gnupghome=gnupghome, keyring=keyring)
if text:
result = gpg.decrypt(text, passphrase=gpg_passphrase)
elif filename:

View file

@ -32,7 +32,14 @@ TRUST_MAP = {
def present(
name, keys=None, user=None, keyserver=None, gnupghome=None, trust=None, **kwargs
name,
keys=None,
user=None,
keyserver=None,
gnupghome=None,
trust=None,
keyring=None,
**kwargs,
):
"""
Ensure a GPG public key is present in the GPG keychain.
@ -57,11 +64,19 @@ def present(
ignored by default. Valid trust levels:
expired, unknown, not_trusted, marginally,
fully, ultimately
keyring
Limit the operation to this specific keyring, specified as
a local filesystem path.
.. versionadded:: 3006
"""
ret = {"name": name, "result": True, "changes": {}, "comment": []}
_current_keys = __salt__["gpg.list_keys"](user=user, gnupghome=gnupghome)
_current_keys = __salt__["gpg.list_keys"](
user=user, gnupghome=gnupghome, keyring=keyring
)
current_keys = {}
for key in _current_keys:
@ -123,10 +138,11 @@ def present(
)
continue
result = __salt__["gpg.receive_keys"](
keyserver,
key,
user,
gnupghome,
keyserver=keyserver,
keys=key,
user=user,
gnupghome=gnupghome,
keyring=keyring,
)
if result["res"] is False:
ret["result"] = result["res"]
@ -156,7 +172,15 @@ def present(
return ret
def absent(name, keys=None, user=None, gnupghome=None, **kwargs):
def absent(
name,
keys=None,
user=None,
gnupghome=None,
keyring=None,
keyring_absent_if_empty=False,
**kwargs,
):
"""
Ensure a GPG public key is absent from the keychain.
@ -171,11 +195,25 @@ def absent(name, keys=None, user=None, gnupghome=None, **kwargs):
gnupghome
Override GnuPG home directory.
keyring
Limit the operation to this specific keyring, specified as
a local filesystem path.
.. versionadded:: 3006
keyring_absent_if_empty
Make sure to not leave behind an empty keyring file
if ``keyring`` was specified. Defaults to false.
.. versionadded:: 3006
"""
ret = {"name": name, "result": True, "changes": {}, "comment": []}
_current_keys = __salt__["gpg.list_keys"](user=user, gnupghome=gnupghome)
_current_keys = __salt__["gpg.list_keys"](
user=user, gnupghome=gnupghome, keyring=keyring
)
current_keys = []
for key in _current_keys:
@ -198,6 +236,8 @@ def absent(name, keys=None, user=None, gnupghome=None, **kwargs):
keyid=key,
user=user,
gnupghome=gnupghome,
keyring=keyring,
use_passphrase=False,
)
if result["res"] is False:
ret["result"] = result["res"]
@ -207,5 +247,37 @@ def absent(name, keys=None, user=None, gnupghome=None, **kwargs):
salt.utils.dictupdate.append_dict_key_value(ret, "changes:deleted", key)
else:
ret["comment"].append(f"{key} not found in GPG keychain")
if __opts__["test"] or not ret["result"]:
return ret
_new_keys = [
x["keyid"]
for x in __salt__["gpg.list_keys"](
user=user, gnupghome=gnupghome, keyring=keyring
)
]
if set(keys) & set(_new_keys):
remaining = set(keys) & set(_new_keys)
ret["result"] = False
ret["comment"].append(
"State check revealed the following keys could not be deleted: "
+ ", ".join(remaining)
)
ret["changes"]["deleted"] = list(
set(ret["changes"]["deleted"]) - set(_new_keys)
)
elif (
not _new_keys
and keyring
and keyring_absent_if_empty
and __salt__["file.file_exists"](keyring)
):
__salt__["file.remove"](keyring)
ret["comment"].append(f"Removed keyring file {keyring}")
ret["changes"]["removed"] = keyring
ret["comment"] = "\n".join(ret["comment"])
return ret

View file

@ -350,6 +350,20 @@ kBGl+/D1MBJLt6q8GZWHMWIHOX4GN28A/PEemaKg3dZHEtPM3w==
-----END PGP SIGNATURE-----"""
@pytest.fixture
def secret_message():
return """\
-----BEGIN PGP MESSAGE-----
hIwDVTqCoFjAx5UBA/wIt5OUfsKV2VPB2P+c6r7xVvPIPiA1FjNpU2x1G8A/dxVq
kAOhXJ9KkM6yon0PJReF3w8QPgZCo5tCmwqMtin4OY/WTw1ExyIWIaS7XJh1ktPM
TJL7RpyeywGHiAveLs9rznZtVwi0xg+rTSWpoMS/8GbKpOyf3twWMsiFfndr09JJ
ASWYXtfsUT3IVA5dP0Mr3/Yg0v90d+X2RqUHM+sUiUtwh4mb+vUcm7UOQRyGAR4V
h7jTNclSQwWCGzx6OaWKnrCVafRXbH4aeA==
=tw4x
-----END PGP MESSAGE-----"""
@pytest.fixture(params=["a"])
def sig(request, tmp_path):
sigs = "\n".join(request.getfixturevalue(f"key_{x}_sig") for x in request.param)
@ -364,6 +378,16 @@ def gnupg(gpghome):
return gnupglib.GPG(gnupghome=str(gpghome))
@pytest.fixture
def gnupg_keyring(gpghome, keyring):
return gnupglib.GPG(gnupghome=str(gpghome), keyring=keyring)
@pytest.fixture
def gnupg_privkeyring(gpghome, keyring_privkeys):
return gnupglib.GPG(gnupghome=str(gpghome), keyring=keyring_privkeys)
@pytest.fixture(params=["abcde"])
def pubkeys_present(gnupg, request):
pubkeys = [request.getfixturevalue(f"key_{x}_pub") for x in request.param]
@ -376,6 +400,240 @@ def pubkeys_present(gnupg, request):
# cleanup is taken care of by gpghome and tmp_path
@pytest.fixture(params=["ab"])
def privkeys_present(gnupg, request):
privkeys = [request.getfixturevalue(f"key_{x}_priv") for x in request.param]
fingerprints = [request.getfixturevalue(f"key_{x}_fp") for x in request.param]
res = gnupg.import_keys("\n".join(privkeys))
assert set(res.fingerprints) == set(fingerprints)
present_keys = gnupg.list_keys(secret=True)
assert present_keys
for fp in fingerprints:
assert any(x["fingerprint"] == fp for x in present_keys)
yield
# cleanup is taken care of by gpghome and tmp_path
@pytest.fixture(params=["a"])
def keyring(gpghome, tmp_path, request):
keyring = tmp_path / "keys.gpg"
_gnupg_keyring = gnupglib.GPG(gnupghome=str(gpghome), keyring=str(keyring))
pubkeys = [request.getfixturevalue(f"key_{x}_pub") for x in request.param]
fingerprints = [request.getfixturevalue(f"key_{x}_fp") for x in request.param]
_gnupg_keyring.import_keys("\n".join(pubkeys))
present_keys = _gnupg_keyring.list_keys()
for fp in fingerprints:
assert any(x["fingerprint"] == fp for x in present_keys)
yield str(keyring)
# cleanup is taken care of by gpghome and tmp_path
@pytest.fixture(params=["a"])
def keyring_privkeys(gpghome, gnupg, tmp_path, request):
keyring = tmp_path / "keys.gpg"
_gnupg_keyring = gnupglib.GPG(gnupghome=str(gpghome), keyring=str(keyring))
privkeys = [request.getfixturevalue(f"key_{x}_priv") for x in request.param]
fingerprints = [request.getfixturevalue(f"key_{x}_fp") for x in request.param]
_gnupg_keyring.import_keys("\n".join(privkeys))
present_privkeys = _gnupg_keyring.list_keys(secret=True)
assert present_privkeys
for fp in fingerprints:
assert any(x["fingerprint"] == fp for x in present_privkeys)
yield str(keyring)
# cleanup is taken care of by gpghome and tmp_path
@pytest.mark.usefixtures("pubkeys_present")
def test_list_keys(gpg, gpghome, gnupg):
res = gpg.list_keys(gnupghome=str(gpghome))
assert res
assert len(res) == len(gnupg.list_keys())
def test_list_keys_in_keyring(gpg, gpghome, keyring, gnupg_keyring):
res = gpg.list_keys(gnupghome=str(gpghome), keyring=keyring)
assert len(res) == len(gnupg_keyring.list_keys())
@pytest.mark.usefixtures("privkeys_present")
@pytest.mark.skip_unless_on_linux(reason="Test setup with private keys fails")
def test_list_secret_keys(gpghome, gpg, gnupg):
res = gpg.list_secret_keys(gnupghome=str(gpghome))
assert len(res) == len(gnupg.list_keys(secret=True))
@pytest.mark.skip_unless_on_linux(reason="Test setup with private keys fails")
def test_list_secret_keys_in_keyring(gpghome, gpg, keyring_privkeys, gnupg_privkeyring):
res = gpg.list_secret_keys(gnupghome=str(gpghome), keyring=keyring_privkeys)
assert len(res) == len(gnupg_privkeyring.list_keys(secret=True))
@pytest.mark.skip_unless_on_linux(reason="Test setup with private keys fails")
@pytest.mark.requires_random_entropy()
def test_create_key(gpghome, gpg, gnupg):
res = gpg.create_key(gnupghome=str(gpghome))
assert res
assert "message" in res
assert "successfully generated" in res["message"]
assert "fingerprint" in res
assert res["fingerprint"]
assert gnupg.list_keys(secret=True, keys=res["fingerprint"])
@pytest.mark.skip_unless_on_linux(reason="Test setup with private keys fails")
@pytest.mark.requires_random_entropy()
def test_create_key_in_keyring(gpghome, gpg, gnupg, keyring, gnupg_keyring):
res = gpg.create_key(gnupghome=str(gpghome), keyring=keyring)
assert res
assert "message" in res
assert "successfully generated" in res["message"]
assert "fingerprint" in res
assert res["fingerprint"]
assert not gnupg.list_keys(secret=True, keys=res["fingerprint"])
assert gnupg_keyring.list_keys(secret=True, keys=res["fingerprint"])
@pytest.mark.usefixtures("pubkeys_present")
@pytest.mark.skip_unless_on_linux(
reason="Complains about deleting private keys first when they are absent"
)
def test_delete_key(gpghome, gpg, gnupg, key_a_fp):
assert gnupg.list_keys(keys=key_a_fp)
res = gpg.delete_key(
fingerprint=key_a_fp, gnupghome=str(gpghome), use_passphrase=False
)
assert res["res"]
assert not gnupg.list_keys(keys=key_a_fp)
@pytest.mark.usefixtures("pubkeys_present")
@pytest.mark.skip_unless_on_linux(
reason="Complains about deleting private keys first when they are absent"
)
def test_delete_key_from_keyring(gpghome, gpg, key_a_fp, keyring, gnupg, gnupg_keyring):
assert gnupg.list_keys(keys=key_a_fp)
assert gnupg_keyring.list_keys(keys=key_a_fp)
res = gpg.delete_key(
fingerprint=key_a_fp,
gnupghome=str(gpghome),
keyring=keyring,
use_passphrase=False,
)
assert res["res"]
assert gnupg.list_keys(keys=key_a_fp)
assert not gnupg_keyring.list_keys(keys=key_a_fp)
@pytest.mark.usefixtures("pubkeys_present")
def test_get_key(gpghome, gpg, key_a_fp):
res = gpg.get_key(fingerprint=key_a_fp, gnupghome=str(gpghome))
assert res
assert "keyid" in res
assert res["keyid"] == key_a_fp[-16:]
assert "keyLength" in res
assert res["keyLength"] == "1024"
def test_get_key_from_keyring(gpghome, gpg, key_a_fp, keyring, gnupg):
assert not gnupg.list_keys()
res = gpg.get_key(fingerprint=key_a_fp, gnupghome=str(gpghome), keyring=keyring)
assert res
assert "keyid" in res
assert res["keyid"] == key_a_fp[-16:]
assert "keyLength" in res
assert res["keyLength"] == "1024"
@pytest.mark.usefixtures("privkeys_present")
@pytest.mark.skip_unless_on_linux(reason="Test setup with private keys fails")
def test_get_secret_key(gpghome, gpg, key_a_fp):
res = gpg.get_secret_key(fingerprint=key_a_fp, gnupghome=str(gpghome))
assert res
assert "keyid" in res
assert res["keyid"] == key_a_fp[-16:]
assert "keyLength" in res
assert res["keyLength"] == "1024"
@pytest.mark.skip_unless_on_linux(reason="Test setup with private keys fails")
def test_get_secret_key_from_keyring(gpghome, gpg, key_a_fp, keyring_privkeys, gnupg):
assert not gnupg.list_keys(keys=key_a_fp, secret=True)
res = gpg.get_secret_key(
fingerprint=key_a_fp, gnupghome=str(gpghome), keyring=keyring_privkeys
)
assert res
assert "keyid" in res
assert res["keyid"] == key_a_fp[-16:]
assert "keyLength" in res
assert res["keyLength"] == "1024"
def test_import_key(gpghome, gnupg, gpg, key_a_pub, key_a_fp):
assert not gnupg.list_keys(keys=key_a_fp)
res = gpg.import_key(text=key_a_pub, gnupghome=str(gpghome))
assert res
assert res["res"]
assert "Successfully imported" in res["message"]
assert gnupg.list_keys(keys=key_a_fp)
@pytest.mark.parametrize("keyring", [""], indirect=True)
def test_import_key_to_keyring(
gpghome, gnupg, gpg, key_d_pub, key_d_fp, keyring, gnupg_keyring
):
assert not gnupg.list_keys(keys=key_d_fp)
assert not gnupg_keyring.list_keys(keys=key_d_fp)
res = gpg.import_key(text=key_d_pub, gnupghome=str(gpghome), keyring=keyring)
assert res
assert res["res"]
assert "Successfully imported" in res["message"]
assert not gnupg.list_keys(keys=key_d_fp)
assert gnupg_keyring.list_keys(keys=key_d_fp)
@pytest.mark.usefixtures("pubkeys_present")
def test_export_key(gpghome, gpg, key_a_fp):
res = gpg.export_key(keyids=key_a_fp, gnupghome=str(gpghome))
assert res["res"]
assert res["comment"].startswith("-----BEGIN PGP PUBLIC KEY BLOCK-----")
assert res["comment"].endswith("-----END PGP PUBLIC KEY BLOCK-----\n")
def test_export_key_from_keyring(gpghome, gnupg, gpg, key_a_fp, keyring, gnupg_keyring):
assert not gnupg.list_keys(keys=key_a_fp)
assert gnupg_keyring.list_keys(keys=key_a_fp)
res = gpg.export_key(keyids=key_a_fp, gnupghome=str(gpghome), keyring=keyring)
assert res["res"]
assert res["comment"].startswith("-----BEGIN PGP PUBLIC KEY BLOCK-----")
assert res["comment"].endswith("-----END PGP PUBLIC KEY BLOCK-----\n")
@pytest.mark.usefixtures("privkeys_present")
@pytest.mark.skip_unless_on_linux(reason="Test setup with private keys fails")
@pytest.mark.requires_random_entropy()
def test_sign(gpghome, gpg, gnupg, key_a_fp):
assert gnupg.list_keys(secret=True, keys=key_a_fp)
res = gpg.sign(text="foo", keyid=key_a_fp, gnupghome=str(gpghome))
assert res
assert res.startswith(b"-----BEGIN PGP SIGNED MESSAGE-----")
assert res.endswith(b"-----END PGP SIGNATURE-----\n")
@pytest.mark.skip_unless_on_linux(reason="Test setup with private keys fails")
@pytest.mark.requires_random_entropy()
def test_sign_with_keyring(
gpghome, gpg, gnupg, key_a_fp, gnupg_privkeyring, keyring_privkeys
):
assert not gnupg.list_keys(keys=key_a_fp, secret=True)
assert gnupg_privkeyring.list_keys(keys=key_a_fp, secret=True)
res = gpg.sign(
text="foo", keyid=key_a_fp, gnupghome=str(gpghome), keyring=keyring_privkeys
)
assert res
assert res.startswith(b"-----BEGIN PGP SIGNED MESSAGE-----")
assert res.endswith(b"-----END PGP SIGNATURE-----\n")
@pytest.mark.parametrize(
"sig,expected",
[
@ -496,3 +754,76 @@ def test_verify(gpghome, gpg, sig, signed_data, key_a_fp):
assert "is verified" in res["message"]
assert "key_id" in res
assert res["key_id"] == key_a_fp[-16:]
def test_verify_with_keyring(gpghome, gnupg, gpg, keyring, sig, signed_data, key_a_fp):
assert not gnupg.list_keys(keys=key_a_fp)
res = gpg.verify(
filename=str(signed_data),
signature=str(sig),
gnupghome=str(gpghome),
keyring=keyring,
)
assert res["res"]
assert "is verified" in res["message"]
assert "key_id" in res
assert res["key_id"] == key_a_fp[-16:]
@pytest.mark.usefixtures("pubkeys_present")
@pytest.mark.requires_random_entropy()
def test_encrypt(gpghome, gpg, gnupg, key_b_fp):
assert gnupg.list_keys(keys=key_b_fp)
res = gpg.encrypt(
text="I like turtles",
recipients=key_b_fp,
gnupghome=str(gpghome),
always_trust=True,
)
assert res
assert res["res"]
assert res["comment"]
assert res["comment"].startswith(b"-----BEGIN PGP MESSAGE-----")
assert res["comment"].endswith(b"-----END PGP MESSAGE-----\n")
@pytest.mark.requires_random_entropy()
def test_encrypt_with_keyring(gpghome, gpg, gnupg, key_a_fp, keyring, gnupg_keyring):
assert not gnupg.list_keys(keys=key_a_fp)
assert gnupg_keyring.list_keys(keys=key_a_fp)
res = gpg.encrypt(
text="I like turtles",
recipients=key_a_fp,
gnupghome=str(gpghome),
keyring=keyring,
always_trust=True,
)
assert res
assert res["res"]
assert res["comment"]
assert res["comment"].startswith(b"-----BEGIN PGP MESSAGE-----")
assert res["comment"].endswith(b"-----END PGP MESSAGE-----\n")
@pytest.mark.usefixtures("privkeys_present")
@pytest.mark.skip_unless_on_linux(reason="Test setup with private keys fails")
def test_decrypt(gpghome, gpg, gnupg, secret_message, key_a_fp):
assert gnupg.list_keys(secret=True, keys=key_a_fp)
res = gpg.decrypt(text=secret_message, gnupghome=str(gpghome))
assert res["res"]
assert res["comment"]
assert res["comment"] == b"I like turtles"
@pytest.mark.skip_unless_on_linux(reason="Test setup with private keys fails")
def test_decrypt_with_keyring(
gpghome, gpg, gnupg, gnupg_privkeyring, keyring_privkeys, secret_message, key_a_fp
):
assert not gnupg.list_keys(secret=True, keys=key_a_fp)
assert gnupg_privkeyring.list_keys(secret=True, keys=key_a_fp)
res = gpg.decrypt(
text=secret_message, gnupghome=str(gpghome), keyring=keyring_privkeys
)
assert res["res"]
assert res["comment"]
assert res["comment"] == b"I like turtles"

View file

@ -1,5 +1,6 @@
import shutil
import subprocess
from pathlib import Path
import psutil
import pytest
@ -103,6 +104,20 @@ def _pubkeys_present(gnupg, request):
# cleanup is taken care of by gpghome and tmp_path
@pytest.fixture(params=["a"])
def keyring(gpghome, tmp_path, request):
keyring = tmp_path / "keys.gpg"
_gnupg_keyring = gnupglib.GPG(gnupghome=str(gpghome), keyring=str(keyring))
pubkeys = [request.getfixturevalue(f"key_{x}_pub") for x in request.param]
fingerprints = [request.getfixturevalue(f"key_{x}_fp") for x in request.param]
_gnupg_keyring.import_keys("\n".join(pubkeys))
present_keys = _gnupg_keyring.list_keys()
for fp in fingerprints:
assert any(x["fingerprint"] == fp for x in present_keys)
yield str(keyring)
# cleanup is taken care of by gpghome and tmp_path
@pytest.mark.usefixtures("_pubkeys_present")
def test_gpg_present_no_changes(gpghome, gpg, gnupg, key_a_fp):
assert gnupg.list_keys(keys=key_a_fp)
@ -113,6 +128,18 @@ def test_gpg_present_no_changes(gpghome, gpg, gnupg, key_a_fp):
assert not ret.changes
def test_gpg_present_keyring_no_changes(
gpghome, gpg, gnupg, gnupg_keyring, keyring, key_a_fp
):
assert not gnupg.list_keys(keys=key_a_fp)
assert gnupg_keyring.list_keys(keys=key_a_fp)
ret = gpg.present(
key_a_fp[-16:], gnupghome=str(gpghome), keyserver="nonexistent", keyring=keyring
)
assert ret.result
assert not ret.changes
@pytest.mark.usefixtures("_pubkeys_present")
def test_gpg_absent(gpghome, gpg, gnupg, key_a_fp):
assert gnupg.list_keys(keys=key_a_fp)
@ -140,3 +167,30 @@ def test_gpg_absent_test_mode_no_changes(gpghome, gpg, gnupg, key_a_fp):
assert "deleted" in ret.changes
assert ret.changes["deleted"]
assert gnupg.list_keys(keys=key_a_fp)
@pytest.mark.usefixtures("_pubkeys_present")
def test_gpg_absent_from_keyring(gpghome, gpg, gnupg, gnupg_keyring, keyring, key_a_fp):
assert gnupg.list_keys(keys=key_a_fp)
assert gnupg_keyring.list_keys(keys=key_a_fp)
ret = gpg.absent(key_a_fp[-16:], gnupghome=str(gpghome), keyring=keyring)
assert ret.result
assert ret.changes
assert gnupg.list_keys(keys=key_a_fp)
assert not gnupg_keyring.list_keys(keys=key_a_fp)
@pytest.mark.parametrize("keyring", [""], indirect=True)
def test_gpg_absent_from_keyring_delete_keyring(
gpghome, gpg, gnupg, gnupg_keyring, keyring, key_a_fp
):
assert not gnupg_keyring.list_keys()
assert Path(keyring).exists()
ret = gpg.absent(
"abc", gnupghome=str(gpghome), keyring=keyring, keyring_absent_if_empty=True
)
assert ret.result
assert ret.changes
assert "removed" in ret.changes
assert ret.changes["removed"] == keyring
assert not Path(keyring).exists()

View file

@ -162,3 +162,65 @@ def test_gpg_present_test_mode_no_changes(gpg_receive, gpg_trust, key, trust):
gpg_trust.assert_not_called()
assert ret["result"] is None
assert ret["changes"]
@pytest.mark.usefixtures("gpg_list_keys")
def test_gpg_absent_no_changes(gpg_delete):
ret = gpg.absent("nonexistent")
assert ret["result"]
assert not ret["changes"]
gpg_delete.assert_not_called()
@pytest.mark.usefixtures("gpg_list_keys")
@pytest.mark.parametrize(
"gpg_delete,expected",
[
({"res": True, "message": ["Public key for A deleted"]}, True),
(
{
"res": False,
"message": [
"Secret key exists, delete first or pass delete_secret=True."
],
},
False,
),
],
indirect=["gpg_delete"],
)
def test_gpg_absent_delete_key(gpg_delete, expected, keys_list):
list_ = Mock(spec="salt.modules.gpg.list_keys")
list_.side_effect = (keys_list, [x for x in keys_list if x["keyid"] != "A"])
with patch.dict(gpg.__salt__, {"gpg.list_keys": list_}):
ret = gpg.absent("A")
assert ret["result"] == expected
assert bool(ret["changes"]) == expected
gpg_delete.assert_called_once()
@pytest.mark.usefixtures("gpg_list_keys")
def test_gpg_absent_test_mode_no_changes(gpg_delete):
with patch.dict(gpg.__opts__, {"test": True}):
ret = gpg.absent("A")
gpg_delete.assert_not_called()
assert ret["result"] is None
assert bool(ret["changes"])
def test_gpg_absent_list_keys_with_gnupghome_and_user(gpg_list_keys):
gnupghome = "/pls_respect_me"
user = "imthereaswell"
gpg.absent("nonexistent", gnupghome=gnupghome, user=user)
gpg_list_keys.assert_called_with(gnupghome=gnupghome, user=user, keyring=None)
@pytest.mark.usefixtures("gpg_list_keys")
def test_gpg_absent_delete_key_called_with_correct_kwargs(gpg_delete):
key = "A"
user = "hellothere"
gnupghome = "/pls_sir"
gpg.absent(key, user=user, gnupghome=gnupghome)
gpg_delete.assert_called_with(
keyid=key, gnupghome=gnupghome, user=user, keyring=None, use_passphrase=False
)