Allow stateful import of GPG keys from files/strings

This commit is contained in:
jeanluc 2024-03-04 14:01:08 +01:00 committed by Daniel Wozniak
parent 656cd3381b
commit 47341772ac
5 changed files with 500 additions and 93 deletions

1
changelog/66173.added.md Normal file
View file

@ -0,0 +1 @@
Added file and plaintext sources to `gpg.present`, allowed to skip keyserver queries

View file

@ -325,33 +325,7 @@ def list_keys(user=None, gnupghome=None, keyring=None):
"""
_keys = []
for _key in _list_keys(user=user, gnupghome=gnupghome, keyring=keyring):
tmp = {
"keyid": _key["keyid"],
"fingerprint": _key["fingerprint"],
"uids": _key["uids"],
}
expires = _key.get("expires", None)
date = _key.get("date", None)
length = _key.get("length", None)
owner_trust = _key.get("ownertrust", None)
trust = _key.get("trust", None)
if expires:
tmp["expires"] = time.strftime(
"%Y-%m-%d", time.localtime(float(_key["expires"]))
)
if date:
tmp["created"] = time.strftime(
"%Y-%m-%d", time.localtime(float(_key["date"]))
)
if length:
tmp["keyLength"] = _key["length"]
if owner_trust:
tmp["ownerTrust"] = LETTER_TRUST_DICT[_key["ownertrust"]]
if trust:
tmp["trust"] = LETTER_TRUST_DICT[_key["trust"]]
_keys.append(tmp)
_keys.append(_render_key(_key))
return _keys
@ -384,36 +358,38 @@ def list_secret_keys(user=None, gnupghome=None, keyring=None):
for _key in _list_keys(
user=user, gnupghome=gnupghome, keyring=keyring, secret=True
):
tmp = {
"keyid": _key["keyid"],
"fingerprint": _key["fingerprint"],
"uids": _key["uids"],
}
expires = _key.get("expires", None)
date = _key.get("date", None)
length = _key.get("length", None)
owner_trust = _key.get("ownertrust", None)
trust = _key.get("trust", None)
if expires:
tmp["expires"] = time.strftime(
"%Y-%m-%d", time.localtime(float(_key["expires"]))
)
if date:
tmp["created"] = time.strftime(
"%Y-%m-%d", time.localtime(float(_key["date"]))
)
if length:
tmp["keyLength"] = _key["length"]
if owner_trust:
tmp["ownerTrust"] = LETTER_TRUST_DICT[_key["ownertrust"]]
if trust:
tmp["trust"] = LETTER_TRUST_DICT[_key["trust"]]
_keys.append(tmp)
_keys.append(_render_key(_key))
return _keys
def _render_key(_key):
tmp = {
"keyid": _key["keyid"],
"fingerprint": _key["fingerprint"],
"uids": _key["uids"],
}
expires = _key.get("expires", None)
date = _key.get("date", None)
length = _key.get("length", None)
owner_trust = _key.get("ownertrust", None)
trust = _key.get("trust", None)
if expires:
tmp["expires"] = time.strftime(
"%Y-%m-%d", time.localtime(float(_key["expires"]))
)
if date:
tmp["created"] = time.strftime("%Y-%m-%d", time.localtime(float(_key["date"])))
if length:
tmp["keyLength"] = _key["length"]
if owner_trust:
tmp["ownerTrust"] = LETTER_TRUST_DICT[_key["ownertrust"]]
if trust:
tmp["trust"] = LETTER_TRUST_DICT[_key["trust"]]
return tmp
@_restore_ownership
def create_key(
key_type="RSA",
@ -831,7 +807,9 @@ def get_secret_key(
@_restore_ownership
def import_key(text=None, filename=None, user=None, gnupghome=None, keyring=None):
def import_key(
text=None, filename=None, user=None, gnupghome=None, keyring=None, select=None
):
r"""
Import a key from text or a file
@ -855,6 +833,13 @@ def import_key(text=None, filename=None, user=None, gnupghome=None, keyring=None
.. versionadded:: 3007.0
select
Limit imported keys to a (list of) known identifier(s). This can be
anything which GnuPG uses to identify keys like fingerprints, key IDs
or email addresses.
.. versionadded:: 3008.0
CLI Example:
.. code-block:: bash
@ -863,33 +848,61 @@ def import_key(text=None, filename=None, user=None, gnupghome=None, keyring=None
salt '*' gpg.import_key filename='/path/to/public-key-file'
"""
ret = {"res": True, "message": ""}
if not text and not filename:
def _import(gpg, path=None, data=None):
if path:
try:
try:
imported_data = gpg.import_keys_file(path)
except AttributeError:
# python-gnupg < 0.5.0
with salt.utils.files.flopen(filename, "rb") as _fp:
data = salt.utils.stringutils.to_unicode(_fp.read())
except OSError:
raise SaltInvocationError("filename does not exist.")
if data:
imported_data = gpg.import_keys(data)
ret = {"res": True, "message": "", "fingerprints": imported_data.fingerprints}
if imported_data.imported or imported_data.imported_rsa:
ret["message"] = "Successfully imported key(s)."
elif imported_data.unchanged:
ret["message"] = "Key(s) already exist in keychain."
elif imported_data.not_imported:
ret["res"] = False
ret["message"] = "Unable to import key."
elif not imported_data.count:
ret["res"] = False
ret["message"] = "Unable to import key."
return ret
if not (text or filename):
raise SaltInvocationError("filename or text must be passed.")
if text and filename:
raise SaltInvocationError("filename and text are mutually exclusive.")
select = select or []
if not isinstance(select, list):
select = [select]
if select:
# GnuPG does not expose selective import behavior, so import everything
# to a temporary keyring and then export only the wanted keys.
tmpkeyring = __salt__["temp.file"]()
tmpgpg = _create_gpg(user=user, gnupghome=gnupghome, keyring=tmpkeyring)
res = _import(tmpgpg, path=filename, data=text)
if not res["res"]:
return res
text = tmpgpg.export_keys(select)
if not text:
return {
"res": True,
"message": "After filtering, no keys to import were left.",
"fingerprints": [],
}
filename = None
gpg = _create_gpg(user=user, gnupghome=gnupghome, keyring=keyring)
if filename:
try:
with salt.utils.files.flopen(filename, "rb") as _fp:
text = salt.utils.stringutils.to_unicode(_fp.read())
except OSError:
raise SaltInvocationError("filename does not exist.")
imported_data = gpg.import_keys(text)
if imported_data.imported or imported_data.imported_rsa:
ret["message"] = "Successfully imported key(s)."
elif imported_data.unchanged:
ret["message"] = "Key(s) already exist in keychain."
elif imported_data.not_imported:
ret["res"] = False
ret["message"] = "Unable to import key."
elif not imported_data.count:
ret["res"] = False
ret["message"] = "Unable to import key."
return ret
return _import(gpg, path=filename, data=text)
def export_key(
@ -990,6 +1003,69 @@ def export_key(
return ret
def read_key(
path=None, text=None, fingerprint=None, keyid=None, user=None, gnupghome=None
):
"""
.. versionadded:: 3008.0
Read key(s) from the filesystem or a string.
CLI Example:
.. code-block:: bash
salt '*' gpg.read_key /tmp/my-shiny-key.asc
path
The path to the key file to read. Either this or ``text`` is required.
text
The string to read the key from. Either this or ``path`` is required.
.. note::
Requires python-gnupg v0.5.1.
fingerprint
Only return key information if it matches this fingerprint.
keyid
Only return key information if it matches this keyid.
user
Which user's keychain to access, defaults to user Salt is running as.
Passing the user as ``salt`` will set the GnuPG home directory to
``/etc/salt/gpgkeys``.
gnupghome
Specify the location where the GPG keyring and related files are stored.
.. important::
This can accidentally decrypt data on GnuPG versions below 2.1
if the file is not a keyring.
"""
if not (path or text):
raise SaltInvocationError("Either `path` or `text` is required.")
if path and text:
raise SaltInvocationError("`path` and `text` are mutually exclusive.")
gpg = _create_gpg(user=user, gnupghome=gnupghome)
if path:
keys = gpg.scan_keys(path)
else:
keys = gpg.scan_keys_mem(text)
rets = []
for _key in keys:
if (
not (fingerprint or keyid)
or _key["fingerprint"] == fingerprint
or _key["keyid"] == keyid
or _key["keyid"][8:] == keyid
):
rets.append(_render_key(_key))
return rets
@_restore_ownership
def receive_keys(keyserver=None, keys=None, user=None, gnupghome=None, keyring=None):
"""

View file

@ -34,6 +34,9 @@ def present(
gnupghome=None,
trust=None,
keyring=None,
source=None,
skip_keyserver=False,
text=None,
**kwargs,
):
"""
@ -65,10 +68,44 @@ def present(
a local filesystem path.
.. versionadded:: 3007.0
source
A path/URI or list of paths/URI to retrieve the key from.
By default, this works as a backup to retrieving the key from
the keyserver.
.. note::
This works like the ``source`` parameter to ``file.managed``.
Only the first succesfully retrievable source is taken into account.
.. versionadded:: 3008.0
skip_keyserver
Do not attempt to retrieve the key from the keyserver, only use ``source``.
Defaults to false.
.. versionadded:: 3008.0
text
Instead of retrieving the key(s) to import from a keyserver or
a local file source, import the key(s) from this (armored) string.
.. note::
``name`` or ``keys`` must still specify the expected key ID(s).
Requires python-gnupg v0.5.1.
.. versionadded:: 3008.0
"""
ret = {"name": name, "result": True, "changes": {}, "comment": []}
if not text and skip_keyserver and not source:
ret["result"] = False
ret[
"comment"
] = "When skipping keyservers, you must provide at least one source"
return ret
_current_keys = __salt__["gpg.list_keys"](
user=user, gnupghome=gnupghome, keyring=keyring
)
@ -137,13 +174,65 @@ def present(
ret, f"changes:{key}:added", True
)
continue
result = __salt__["gpg.receive_keys"](
keyserver=keyserver,
keys=key,
user=user,
gnupghome=gnupghome,
keyring=keyring,
)
result = {}
if text:
has_key = __salt__["gpg.read_key"](
text=text, keyid=key, gnupghome=gnupghome, user=user
)
if has_key:
log.debug(f"Passed text contains key {key}")
result = __salt__["gpg.import_key"](
text=text,
user=user,
gnupghome=gnupghome,
keyring=keyring,
select=key,
)
else:
result = {
"res": False,
"message": ["Passed text did not contain the requested key"],
}
else:
if not skip_keyserver:
result = __salt__["gpg.receive_keys"](
keyserver=keyserver,
keys=key,
user=user,
gnupghome=gnupghome,
keyring=keyring,
)
if (not result or result["res"] is False) and source:
if not isinstance(source, list):
source = [source]
for src in source:
sfn = __salt__["cp.cache_file"](src)
if sfn:
log.debug(f"Found source: {src}")
has_key = __salt__["gpg.read_key"](
path=sfn, keyid=key, gnupghome=gnupghome, user=user
)
if has_key:
log.debug(f"Found source {src} contains key {key}")
result = __salt__["gpg.import_key"](
filename=sfn,
user=user,
gnupghome=gnupghome,
keyring=keyring,
select=key,
)
break
else:
prev_msg = ""
if result:
prev_msg = " ".join(result["message"]) + ". In addition, "
result = {
"res": False,
"message": [
prev_msg
+ f"none of the specified sources were found or contained the key {key}."
],
}
if result["res"] is False:
ret["result"] = result["res"]
ret["comment"].extend(result["message"])

View file

@ -4,16 +4,10 @@ import subprocess
import psutil
import pytest
try:
import gnupg as gnupglib
HAS_GNUPG = True
except ImportError:
HAS_GNUPG = False
gnupglib = pytest.importorskip("gnupg", reason="Needs python-gnupg library")
PYGNUPG_VERSION = tuple(int(x) for x in gnupglib.__version__.split("."))
pytestmark = [
pytest.mark.skipif(HAS_GNUPG is False, reason="Needs python-gnupg library"),
pytest.mark.skip_if_binaries_missing("gpg", reason="Needs gpg binary"),
]
@ -576,6 +570,23 @@ def test_import_key_to_keyring(
assert gnupg_keyring.list_keys(keys=key_d_fp)
@pytest.mark.parametrize("select", (False, True))
def test_import_key_select(
gpghome, gnupg, gpg, key_a_pub, key_a_fp, key_b_pub, key_b_fp, select
):
select = key_a_fp if select else None
assert not gnupg.list_keys(keys=key_a_fp)
assert not gnupg.list_keys(keys=key_b_fp)
res = gpg.import_key(
text=key_a_pub + "\n" + key_b_pub, select=select, gnupghome=str(gpghome)
)
assert res
assert res["res"]
assert "Successfully imported" in res["message"]
assert gnupg.list_keys(keys=key_a_fp)
assert bool(gnupg.list_keys(keys=key_b_fp)) is not bool(select)
@pytest.mark.usefixtures("_pubkeys_present")
def test_export_key(gpghome, gpg, key_a_fp):
res = gpg.export_key(keyids=key_a_fp, gnupghome=str(gpghome))
@ -849,3 +860,74 @@ def test_decrypt_with_keyring(
assert res["res"]
assert res["comment"]
assert res["comment"] == b"I like turtles"
@pytest.mark.parametrize(
"text",
(
False,
pytest.param(
True,
marks=pytest.mark.skipif(
PYGNUPG_VERSION < (0, 5, 1), reason="Text requires python-gnupg >=0.5.1"
),
),
),
)
def test_read_key(gpg, gpghome, key_a_pub, key_a_fp, text):
if text:
res = gpg.read_key(text=key_a_pub, gnupghome=str(gpghome))
else:
with pytest.helpers.temp_file("key", contents=key_a_pub) as keyfile:
res = gpg.read_key(path=str(keyfile), gnupghome=str(gpghome))
assert res
assert len(res) == 1
assert res[0]["fingerprint"] == key_a_fp
@pytest.mark.parametrize(
"text",
(
False,
pytest.param(
True,
marks=pytest.mark.skipif(
PYGNUPG_VERSION < (0, 5, 1), reason="Text requires python-gnupg >=0.5.1"
),
),
),
)
@pytest.mark.parametrize("fingerprint", (False, True))
@pytest.mark.parametrize("keyid", (False, True))
def test_read_key_multiple(
gpg,
gnupg,
gpghome,
key_a_pub,
key_a_fp,
key_b_pub,
key_b_fp,
text,
fingerprint,
keyid,
):
params = {
"gnupghome": str(gpghome),
}
if fingerprint:
params["fingerprint"] = key_a_fp
if keyid:
params["keyid"] = key_a_fp[-8:]
concat = key_a_pub + "\n" + key_b_pub
if text:
res = gpg.read_key(text=concat, **params)
else:
with pytest.helpers.temp_file("key", contents=concat) as keyfile:
res = gpg.read_key(path=str(keyfile), **params)
assert res
if not (fingerprint or keyid):
assert len(res) == 2
assert any(key["fingerprint"] == key_b_fp for key in res)
else:
assert len(res) == 1
assert any(key["fingerprint"] == key_a_fp for key in res)

View file

@ -6,6 +6,7 @@ import psutil
import pytest
gnupglib = pytest.importorskip("gnupg", reason="Needs python-gnupg library")
PYGNUPG_VERSION = tuple(int(x) for x in gnupglib.__version__.split("."))
pytestmark = [
pytest.mark.skip_if_binaries_missing("gpg", reason="Needs gpg binary"),
@ -81,6 +82,52 @@ pDEmK8EhJDvV/9o0lnhm/9w=
-----END PGP PUBLIC KEY BLOCK-----"""
@pytest.fixture
def key_b_fp():
return "118B4FAB78038CB2DF7B69E20F6C422647465C93"
@pytest.fixture
def key_b_pub():
return """\
-----BEGIN PGP PUBLIC KEY BLOCK-----
mI0EY4fxNQEEAOgAzbpheJrOq4il5BrMVtP1G1kU94QX2+xLXEgW/wPdE4HD6Zbg
vliIg18v7Na4x8ubWy/7CkXC83EJ8SoSqcCccvuKjIWsm6tfeCidNstNCjewFMUR
7ZOQmAe/I2JAlz2SgNxS3ZDiCZpGkxqE0GZ+1N7Mz2WHImnExG149RVHABEBAAG0
LUtleSBCIChHZW5lcmF0ZWQgYnkgU2FsdFN0YWNrKSA8a2V5YkBleGFtcGxlPojR
BBMBCAA7FiEEEYtPq3gDjLLfe2niD2xCJkdGXJMFAmOH8TUCGy8FCwkIBwICIgIG
FQoJCAsCBBYCAwECHgcCF4AACgkQD2xCJkdGXJNR3AQAk5ZoN+/ViIX3vA/LbXPn
2VE1E7ETTeIGqsb5f98UfjIbYfkNE8+OtnPxnDbSOPWBEOT+XPPjmxnE0a2UNTfn
ECO71/ZUiyC3ZN50IZ0vgzwBH+DeIV6PDAAun5FGx4RI7v6n0CPlrUcWKYe8wY1F
COflOxnEyLVHXnX8wUIzZwo=
=Hq0X
-----END PGP PUBLIC KEY BLOCK-----"""
@pytest.fixture
def key_c_fp():
return "96F136AC4C92D78DAF33105E35C03186001C6E31"
@pytest.fixture
def key_c_pub():
return """\
-----BEGIN PGP PUBLIC KEY BLOCK-----
mI0EY4f2GgEEALToT23wZfLGM/JGCV4pWRlIXXqLwwEBSXral92HvsUjC8Vqsh1z
1n0K8/vIpS9OH2Q21emtht4y36rbahy+w6wRc1XXjPQ28Pyd+8v/jSKy/NKW3g+y
ZoB22vj4L35pAu/G6xs9+pKsLHGjMo+LsWZNEZ2Ar06aYA0dbTb0AqYfABEBAAG0
LUtleSBDIChHZW5lcmF0ZWQgYnkgU2FsdFN0YWNrKSA8a2V5Y0BleGFtcGxlPojR
BBMBCAA7FiEElvE2rEyS142vMxBeNcAxhgAcbjEFAmOH9hoCGy8FCwkIBwICIgIG
FQoJCAsCBBYCAwECHgcCF4AACgkQNcAxhgAcbjH2WAP/RtlUfN/novwmxxma6Zom
P6skFnCcRCs0vMU3OnNwuxZt9B+j0sUTu6noGi04Gcbd0eQs7v57DQHcRhNidZU/
8BJv5jD6E2yuzLK9lON+Yhgc6Pg6raA3hBeCY2HuzTEQLAThyV7ihboNILo7FJwo
y9KvnTFP2+oeDX2Z/m4SoWw=
=81Kb
-----END PGP PUBLIC KEY BLOCK-----"""
@pytest.fixture
def gnupg(gpghome):
return gnupglib.GPG(gnupghome=str(gpghome))
@ -186,6 +233,118 @@ def test_gpg_present_keyring_trust_change(
assert key_info[0]["trust"] == "u"
def test_gpg_present_source(
gpghome, gpg, gnupg, key_a_fp, key_a_pub, key_b_pub, key_b_fp
):
with pytest.helpers.temp_file(
"keys", contents=key_a_pub + "\n" + key_b_pub
) as keyfile:
ret = gpg.present(
key_a_fp[-16:],
gnupghome=str(gpghome),
skip_keyserver=True,
source=str(keyfile),
)
assert ret.result
assert ret.changes
assert key_a_fp[-16:] in ret.changes
assert ret.changes[key_a_fp[-16:]]["added"]
assert gnupg.list_keys(keys=key_a_fp)
assert not gnupg.list_keys(keys=key_b_fp)
@pytest.mark.skipif(
PYGNUPG_VERSION < (0, 5, 1), reason="Text requires python-gnupg >=0.5.1"
)
def test_gpg_present_text(
gpghome, gpg, gnupg, key_a_fp, key_a_pub, key_b_pub, key_b_fp
):
concat = key_a_pub + "\n" + key_b_pub
ret = gpg.present(key_a_fp[-16:], gnupghome=str(gpghome), text=concat)
assert ret.result
assert ret.changes
assert key_a_fp[-16:] in ret.changes
assert ret.changes[key_a_fp[-16:]]["added"]
assert gnupg.list_keys(keys=key_a_fp)
assert not gnupg.list_keys(keys=key_b_fp)
@pytest.mark.skipif(
PYGNUPG_VERSION < (0, 5, 1), reason="Text requires python-gnupg >=0.5.1"
)
def test_gpg_present_text_not_contained(
gpghome, gpg, gnupg, key_a_fp, key_a_pub, key_b_pub, key_b_fp, key_c_fp
):
concat = key_a_pub + "\n" + key_b_pub
ret = gpg.present(key_c_fp[-16:], gnupghome=str(gpghome), text=concat)
assert not ret.result
assert not ret.changes
assert not gnupg.list_keys(keys=key_a_fp)
assert not gnupg.list_keys(keys=key_b_fp)
assert "Passed text did not contain the requested key" in ret.comment
def test_gpg_present_multi_source(
gpghome, gpg, gnupg, key_a_fp, key_a_pub, key_b_pub, key_b_fp
):
with pytest.helpers.temp_file("keyb", contents=key_b_pub) as keybfile:
with pytest.helpers.temp_file("keya", contents=key_a_pub) as keyafile:
ret = gpg.present(
key_a_fp[-16:],
gnupghome=str(gpghome),
skip_keyserver=True,
source=[str(keybfile), str(keyafile)],
)
assert ret.result
assert ret.changes
assert key_a_fp[-16:] in ret.changes
assert ret.changes[key_a_fp[-16:]]["added"]
assert gnupg.list_keys(keys=key_a_fp)
assert not gnupg.list_keys(keys=key_b_fp)
def test_gpg_present_source_not_contained(
gpghome, gpg, gnupg, key_a_fp, key_a_pub, key_b_pub, key_b_fp, key_c_fp
):
with pytest.helpers.temp_file(
"keys", contents=key_a_pub + "\n" + key_b_pub
) as keyfile:
ret = gpg.present(
key_c_fp[-16:],
gnupghome=str(gpghome),
skip_keyserver=True,
source=str(keyfile),
)
assert not ret.result
assert not ret.changes
assert not gnupg.list_keys(keys=key_a_fp)
assert not gnupg.list_keys(keys=key_b_fp)
assert (
"none of the specified sources were found or contained the key" in ret.comment
)
def test_gpg_present_source_bad_keyfile(
gpghome, gpg, gnupg, key_a_fp, key_a_pub, key_b_pub, key_b_fp
):
with pytest.helpers.temp_file(
"keys", contents=key_a_pub + "\n" + key_b_pub
) as keyfile:
with pytest.helpers.temp_file("badkeys", contents="foobar") as badkeyfile:
ret = gpg.present(
key_a_fp[-16:],
gnupghome=str(gpghome),
skip_keyserver=True,
source=["/foo/bar/non/ex/is/tent", str(badkeyfile), str(keyfile)],
)
assert ret.result
assert ret.changes
assert key_a_fp[-16:] in ret.changes
assert ret.changes[key_a_fp[-16:]]["added"]
assert gnupg.list_keys(keys=key_a_fp)
assert not gnupg.list_keys(keys=key_b_fp)
@pytest.mark.windows_whitelisted
def test_gpg_absent_no_changes(gpghome, gpg, gnupg, key_a_fp):
assert not gnupg.list_keys(keys=key_a_fp)