Allow signature verification logic to be reused for other backends

This commit is contained in:
jeanluc 2024-04-18 22:57:13 +02:00 committed by Daniel Wozniak
parent b171fae4e2
commit aaad0d2ecf
10 changed files with 421 additions and 65 deletions

1
changelog/66527.added.md Normal file
View file

@ -0,0 +1 @@
Added support for specifying different signature verification backends in `file.managed`/`archive.extracted`

View file

@ -771,6 +771,7 @@ def get_source_sum(
signed_by_all=None,
keyring=None,
gnupghome=None,
sig_backend="gpg",
):
"""
.. versionadded:: 2016.11.0
@ -813,7 +814,7 @@ def get_source_sum(
source_hash_sig
When ``source`` is a remote file source and ``source_hash`` is a file,
ensure a valid GPG signature exists on the source hash file.
ensure a valid signature exists on the source hash file.
Set this to ``true`` for an inline (clearsigned) signature, or to a
file URI retrievable by `:py:func:`cp.cache_file <salt.modules.cp.cache_file>`
for a detached one.
@ -822,15 +823,17 @@ def get_source_sum(
signed_by_any
When verifying ``source_hash_sig``, require at least one valid signature
from one of a list of key fingerprints. This is passed to :py:func:`gpg.verify
<salt.modules.gpg.verify>`.
from one of a list of keys.
By default, this is passed to :py:func:`gpg.verify <salt.modules.gpg.verify>`,
meaning a key is identified by its fingerprint.
.. versionadded:: 3007.0
signed_by_all
When verifying ``source_hash_sig``, require a valid signature from each
of the key fingerprints in this list. This is passed to :py:func:`gpg.verify
<salt.modules.gpg.verify>`.
of the keys in this list.
By default, this is passed to :py:func:`gpg.verify <salt.modules.gpg.verify>`,
meaning a key is identified by its fingerprint.
.. versionadded:: 3007.0
@ -844,6 +847,13 @@ def get_source_sum(
.. versionadded:: 3007.0
sig_backend
When verifying signatures, use this execution module as a backend.
It must be compatible with the :py:func:`gpg.verify <salt.modules.gpg.verify>` API.
Defaults to ``gpg``. All signature-related parameters are passed through.
.. versionadded:: 3008.0
CLI Example:
.. code-block:: bash
@ -888,14 +898,13 @@ def get_source_sum(
_check_sig(
hash_fn,
signature=(
source_hash_sig
if isinstance(source_hash_sig, str)
else None
source_hash_sig if source_hash_sig is not True else None
),
signed_by_any=signed_by_any,
signed_by_all=signed_by_all,
keyring=keyring,
gnupghome=gnupghome,
sig_backend=sig_backend,
saltenv=saltenv,
verify_ssl=verify_ssl,
)
@ -1028,29 +1037,51 @@ def _check_sig(
signed_by_all=None,
keyring=None,
gnupghome=None,
sig_backend="gpg",
saltenv="base",
verify_ssl=True,
):
try:
verify = __salt__["gpg.verify"]
verify = __salt__[f"{sig_backend}.verify"]
except KeyError:
raise CommandExecutionError(
"Signature verification requires the gpg module, "
f"Signature verification requires the {sig_backend} module, "
"which could not be found. Make sure you have the "
"necessary tools and libraries intalled (gpg, python-gnupg)"
"necessary tools and libraries intalled"
)
sig = None
# The GPG module does not understand URLs as signatures currently.
# Also, we want to ensure that, when verification fails, we get rid
# of the cached signatures.
final_sigs = None
if signature is not None:
# Fetch detached signature
sig = __salt__["cp.cache_file"](signature, saltenv, verify_ssl=verify_ssl)
if not sig:
raise CommandExecutionError(
f"Detached signature file {signature} not found"
)
sigs = [signature] if isinstance(signature, str) else signature
sigs_cached = []
final_sigs = []
for sig in sigs:
cached_sig = None
try:
urllib.parse.urlparse(sig)
except (TypeError, ValueError):
pass
else:
cached_sig = __salt__["cp.cache_file"](
sig, saltenv, verify_ssl=verify_ssl
)
if not cached_sig:
# The GPG module expects signatures as a single file path currently
if sig_backend == "gpg":
raise CommandExecutionError(
f"Detached signature file {sig} not found"
)
else:
sigs_cached.append(cached_sig)
final_sigs.append(cached_sig or sig)
if isinstance(signature, str):
final_sigs = final_sigs[0]
res = verify(
filename=on_file,
signature=sig,
signature=final_sigs,
keyring=keyring,
gnupghome=gnupghome,
signed_by_any=signed_by_any,
@ -1061,8 +1092,9 @@ def _check_sig(
return
# Ensure detached signature and file are deleted from cache
# on signature verification failure.
if sig:
salt.utils.files.safe_rm(sig)
if signature is not None:
for sig in sigs_cached:
salt.utils.files.safe_rm(sig)
salt.utils.files.safe_rm(on_file)
raise CommandExecutionError(
f"The file's signature could not be verified: {res['message']}"
@ -4705,6 +4737,7 @@ def get_managed(
ignore_ordering=False,
ignore_whitespace=False,
ignore_comment_characters=None,
sig_backend="gpg",
**kwargs,
):
"""
@ -4773,7 +4806,7 @@ def get_managed(
source_hash_sig
When ``source`` is a remote file source, ``source_hash`` is a file,
``skip_verify`` is not true and ``use_etag`` is not true, ensure a
valid GPG signature exists on the source hash file.
valid signature exists on the source hash file.
Set this to ``true`` for an inline (clearsigned) signature, or to a
file URI retrievable by `:py:func:`cp.cache_file <salt.modules.cp.cache_file>`
for a detached one.
@ -4782,15 +4815,17 @@ def get_managed(
signed_by_any
When verifying ``source_hash_sig``, require at least one valid signature
from one of a list of key fingerprints. This is passed to :py:func:`gpg.verify
<salt.modules.gpg.verify>`.
from one of a list of keys.
By default, this is passed to :py:func:`gpg.verify <salt.modules.gpg.verify>`,
meaning a key is identified by its fingerprint.
.. versionadded:: 3007.0
signed_by_all
When verifying ``source_hash_sig``, require a valid signature from each
of the key fingerprints in this list. This is passed to :py:func:`gpg.verify
<salt.modules.gpg.verify>`.
of the keys in this list.
By default, this is passed to :py:func:`gpg.verify <salt.modules.gpg.verify>`,
meaning a key is identified by its fingerprint.
.. versionadded:: 3007.0
@ -4837,6 +4872,13 @@ def get_managed(
.. versionadded:: 3007.0
sig_backend
When verifying signatures, use this execution module as a backend.
It must be compatible with the :py:func:`gpg.verify <salt.modules.gpg.verify>` API.
Defaults to ``gpg``. All signature-related parameters are passed through.
.. versionadded:: 3008.0
CLI Example:
.. code-block:: bash
@ -4910,6 +4952,7 @@ def get_managed(
signed_by_all=signed_by_all,
keyring=keyring,
gnupghome=gnupghome,
sig_backend=sig_backend,
)
except CommandExecutionError as exc:
return "", {}, exc.strerror
@ -6362,6 +6405,7 @@ def manage_file(
ignore_whitespace=False,
ignore_comment_characters=None,
new_file_diff=False,
sig_backend="gpg",
**kwargs,
):
"""
@ -6492,7 +6536,7 @@ def manage_file(
.. versionadded:: 3005
signature
Ensure a valid GPG signature exists on the selected ``source`` file.
Ensure a valid signature exists on the selected ``source`` file.
Set this to true for inline signatures, or to a file URI retrievable
by `:py:func:`cp.cache_file <salt.modules.cp.cache_file>`
for a detached one.
@ -6514,7 +6558,7 @@ def manage_file(
source_hash_sig
When ``source`` is a remote file source, ``source_hash`` is a file,
``skip_verify`` is not true and ``use_etag`` is not true, ensure a
valid GPG signature exists on the source hash file.
valid signature exists on the source hash file.
Set this to ``true`` for an inline (clearsigned) signature, or to a
file URI retrievable by `:py:func:`cp.cache_file <salt.modules.cp.cache_file>`
for a detached one.
@ -6531,15 +6575,17 @@ def manage_file(
signed_by_any
When verifying signatures either on the managed file or its source hash file,
require at least one valid signature from one of a list of key fingerprints.
This is passed to :py:func:`gpg.verify <salt.modules.gpg.verify>`.
require at least one valid signature from one of a list of keys.
By default, this is passed to :py:func:`gpg.verify <salt.modules.gpg.verify>`,
meaning a key is identified by its fingerprint.
.. versionadded:: 3007.0
signed_by_all
When verifying signatures either on the managed file or its source hash file,
require a valid signature from each of the key fingerprints in this list.
This is passed to :py:func:`gpg.verify <salt.modules.gpg.verify>`.
require a valid signature from each of the keys in this list.
By default, this is passed to :py:func:`gpg.verify <salt.modules.gpg.verify>`,
meaning a key is identified by its fingerprint.
.. versionadded:: 3007.0
@ -6592,6 +6638,13 @@ def manage_file(
.. versionadded:: 3008.0
sig_backend
When verifying signatures, use this execution module as a backend.
It must be compatible with the :py:func:`gpg.verify <salt.modules.gpg.verify>` API.
Defaults to ``gpg``. All signature-related parameters are passed through.
.. versionadded:: 3008.0
CLI Example:
.. code-block:: bash
@ -6682,11 +6735,12 @@ def manage_file(
try:
_check_sig(
sfn,
signature=signature if isinstance(signature, str) else None,
signature=signature if signature is not True else None,
signed_by_any=signed_by_any,
signed_by_all=signed_by_all,
keyring=keyring,
gnupghome=gnupghome,
sig_backend=sig_backend,
saltenv=saltenv,
verify_ssl=verify_ssl,
)
@ -6817,11 +6871,12 @@ def manage_file(
try:
_check_sig(
sfn,
signature=signature if isinstance(signature, str) else None,
signature=signature if signature is not True else None,
signed_by_any=signed_by_any,
signed_by_all=signed_by_all,
keyring=keyring,
gnupghome=gnupghome,
sig_backend=sig_backend,
saltenv=saltenv,
verify_ssl=verify_ssl,
)
@ -6948,11 +7003,12 @@ def manage_file(
try:
_check_sig(
sfn,
signature=signature if isinstance(signature, str) else None,
signature=signature if signature is not True else None,
signed_by_any=signed_by_any,
signed_by_all=signed_by_all,
keyring=keyring,
gnupghome=gnupghome,
sig_backend=sig_backend,
saltenv=saltenv,
verify_ssl=verify_ssl,
)

View file

@ -172,27 +172,47 @@ def _check_sig(
signed_by_all=None,
keyring=None,
gnupghome=None,
sig_backend="gpg",
):
try:
verify = __salt__["gpg.verify"]
verify = __salt__[f"{sig_backend}.verify"]
except KeyError:
raise CommandExecutionError(
"Signature verification requires the gpg module, "
f"Signature verification requires the {sig_backend} module, "
"which could not be found. Make sure you have the "
"necessary tools and libraries intalled (gpg, python-gnupg)"
"necessary tools and libraries intalled"
)
sig = None
# The GPG module does not understand URLs as signatures currently.
# Also, we want to ensure that, when verification fails, we get rid
# of the cached signatures.
final_sigs = None
if signature is not None:
# fetch detached signature
sig = __salt__["cp.cache_file"](signature, __env__)
if not sig:
raise CommandExecutionError(
f"Detached signature file {signature} not found"
)
sigs = [signature] if isinstance(signature, str) else signature
sigs_cached = []
final_sigs = []
for sig in sigs:
cached_sig = None
try:
urlparse(sig)
except (TypeError, ValueError):
pass
else:
cached_sig = __salt__["cp.cache_file"](sig, __env__)
if not cached_sig:
# The GPG module expects signatures as a single file path currently
if sig_backend == "gpg":
raise CommandExecutionError(
f"Detached signature file {sig} not found"
)
else:
sigs_cached.append(cached_sig)
final_sigs.append(cached_sig or sig)
if isinstance(signature, str):
final_sigs = final_sigs[0]
res = verify(
filename=on_file,
signature=sig,
signature=final_sigs,
keyring=keyring,
gnupghome=gnupghome,
signed_by_any=signed_by_any,
@ -203,8 +223,9 @@ def _check_sig(
return
# Ensure detached signature and file are deleted from cache
# on signature verification failure.
if sig:
salt.utils.files.safe_rm(sig)
if signature is not None:
for sig in sigs_cached:
salt.utils.files.safe_rm(sig)
salt.utils.files.safe_rm(on_file)
raise CommandExecutionError(
f"The file's signature could not be verified: {res['message']}"
@ -242,6 +263,7 @@ def extracted(
signed_by_all=None,
keyring=None,
gnupghome=None,
sig_backend="gpg",
**kwargs,
):
"""
@ -781,6 +803,13 @@ def extracted(
.. versionadded:: 3007.0
sig_backend
When verifying signatures, use this execution module as a backend.
It must be compatible with the :py:func:`gpg.verify <salt.modules.gpg.verify>` API.
Defaults to ``gpg``. All signature-related parameters are passed through.
.. versionadded:: 3008.0
**Examples**
1. tar with lmza (i.e. xz) compression:
@ -935,12 +964,12 @@ def extracted(
)
if signature or source_hash_sig:
# Fail early in case the gpg module is not present
# Fail early in case the signature verification backend is not present
try:
__salt__["gpg.verify"]
__salt__[f"{sig_backend}.verify"]
except KeyError:
ret["comment"] = (
"Cannot verify signatures because the gpg module was not loaded"
f"Cannot verify signatures because the {sig_backend} module was not loaded"
)
return ret
@ -1108,6 +1137,7 @@ def extracted(
signed_by_all=signed_by_all,
keyring=keyring,
gnupghome=gnupghome,
sig_backend=sig_backend,
)
except CommandExecutionError as exc:
ret["comment"] = exc.strerror
@ -1193,6 +1223,7 @@ def extracted(
signed_by_all=signed_by_all,
keyring=keyring,
gnupghome=gnupghome,
sig_backend=sig_backend,
)
except Exception as exc: # pylint: disable=broad-except
msg = "Failed to cache {}: {}".format(
@ -1223,6 +1254,7 @@ def extracted(
signed_by_all=signed_by_all,
keyring=keyring,
gnupghome=gnupghome,
sig_backend=sig_backend,
)
except CommandExecutionError as err:
ret["comment"] = f"Failed verifying the source file's signature: {err}"

View file

@ -2326,6 +2326,7 @@ def managed(
ignore_whitespace=False,
ignore_comment_characters=None,
new_file_diff=False,
sig_backend="gpg",
**kwargs,
):
r"""
@ -2928,7 +2929,7 @@ def managed(
.. versionadded:: 3005
signature
Ensure a valid GPG signature exists on the selected ``source`` file.
Ensure a valid signature exists on the selected ``source`` file.
Set this to true for inline signatures, or to a file URI retrievable
by `:py:func:`cp.cache_file <salt.modules.cp.cache_file>`
for a detached one.
@ -2950,7 +2951,7 @@ def managed(
source_hash_sig
When ``source`` is a remote file source, ``source_hash`` is a file,
``skip_verify`` is not true and ``use_etag`` is not true, ensure a
valid GPG signature exists on the source hash file.
valid signature exists on the source hash file.
Set this to ``true`` for an inline (clearsigned) signature, or to a
file URI retrievable by `:py:func:`cp.cache_file <salt.modules.cp.cache_file>`
for a detached one.
@ -2967,15 +2968,17 @@ def managed(
signed_by_any
When verifying signatures either on the managed file or its source hash file,
require at least one valid signature from one of a list of key fingerprints.
This is passed to :py:func:`gpg.verify <salt.modules.gpg.verify>`.
require at least one valid signature from one of a list of keys.
By default, this is passed to :py:func:`gpg.verify <salt.modules.gpg.verify>`,
meaning a key is identified by its fingerprint.
.. versionadded:: 3007.0
signed_by_all
When verifying signatures either on the managed file or its source hash file,
require a valid signature from each of the key fingerprints in this list.
This is passed to :py:func:`gpg.verify <salt.modules.gpg.verify>`.
require a valid signature from each of the keys in this list.
By default, this is passed to :py:func:`gpg.verify <salt.modules.gpg.verify>`,
meaning a key is identified by its fingerprint.
.. versionadded:: 3007.0
@ -3026,6 +3029,13 @@ def managed(
If ``True``, creation of new files will still show a diff in the
changes return.
.. versionadded:: 3008.0
sig_backend
When verifying signatures, use this execution module as a backend.
It must be compatible with the :py:func:`gpg.verify <salt.modules.gpg.verify>` API.
Defaults to ``gpg``. All signature-related parameters are passed through.
.. versionadded:: 3008.0
"""
if "env" in kwargs:
@ -3051,12 +3061,13 @@ def managed(
has_changes = False
if signature or source_hash_sig:
# Fail early in case the gpg module is not present
# Fail early in case the signature verification backend is not present
try:
__salt__["gpg.verify"]
__salt__[f"{sig_backend}.verify"]
except KeyError:
_error(
ret, "Cannot verify signatures because the gpg module was not loaded"
ret,
f"Cannot verify signatures because the {sig_backend} module was not loaded",
)
if selinux:
@ -3310,6 +3321,7 @@ def managed(
signed_by_all=signed_by_all,
keyring=keyring,
gnupghome=gnupghome,
sig_backend=sig_backend,
)
hsum = __salt__["file.get_hash"](name, source_sum["hash_type"])
except (CommandExecutionError, OSError) as err:
@ -3405,6 +3417,7 @@ def managed(
signed_by_all=signed_by_all,
keyring=keyring,
gnupghome=gnupghome,
sig_backend=sig_backend,
ignore_ordering=ignore_ordering,
ignore_whitespace=ignore_whitespace,
ignore_comment_characters=ignore_comment_characters,
@ -3490,6 +3503,7 @@ def managed(
signed_by_all=signed_by_all,
keyring=keyring,
gnupghome=gnupghome,
sig_backend=sig_backend,
**kwargs,
)
except Exception as exc: # pylint: disable=broad-except
@ -3550,6 +3564,7 @@ def managed(
signed_by_all=signed_by_all,
keyring=keyring,
gnupghome=gnupghome,
sig_backend=sig_backend,
ignore_ordering=ignore_ordering,
ignore_whitespace=ignore_whitespace,
ignore_comment_characters=ignore_comment_characters,
@ -3643,6 +3658,7 @@ def managed(
signed_by_all=signed_by_all,
keyring=keyring,
gnupghome=gnupghome,
sig_backend=sig_backend,
ignore_ordering=ignore_ordering,
ignore_whitespace=ignore_whitespace,
ignore_comment_characters=ignore_comment_characters,
@ -9190,6 +9206,7 @@ def cached(
signed_by_all=None,
keyring=None,
gnupghome=None,
sig_backend="gpg",
):
"""
.. versionadded:: 2017.7.3
@ -9250,7 +9267,7 @@ def cached(
source_hash_sig
When ``name`` is a remote file source, ``source_hash`` is a file,
``skip_verify`` is not true and ``use_etag`` is not true, ensure a
valid GPG signature exists on the source hash file.
valid signature exists on the source hash file.
Set this to ``true`` for an inline (clearsigned) signature, or to a
file URI retrievable by `:py:func:`cp.cache_file <salt.modules.cp.cache_file>`
for a detached one.
@ -9265,15 +9282,17 @@ def cached(
signed_by_any
When verifying ``source_hash_sig``, require at least one valid signature
from one of a list of key fingerprints. This is passed to
:py:func:`gpg.verify <salt.modules.gpg.verify>`.
from one of a list of keys.
By default, this is passed to :py:func:`gpg.verify <salt.modules.gpg.verify>`,
meaning a key is identified by its fingerprint.
.. versionadded:: 3007.0
signed_by_all
When verifying ``source_hash_sig``, require a valid signature from each
of the key fingerprints in this list. This is passed to
:py:func:`gpg.verify <salt.modules.gpg.verify>`.
of the keys in this list.
By default, this is passed to :py:func:`gpg.verify <salt.modules.gpg.verify>`,
meaning a key is identified by its fingerprint.
.. versionadded:: 3007.0
@ -9287,6 +9306,13 @@ def cached(
.. versionadded:: 3007.0
sig_backend
When verifying signatures, use this execution module as a backend.
It must be compatible with the :py:func:`gpg.verify <salt.modules.gpg.verify>` API.
Defaults to ``gpg``. All signature-related parameters are passed through.
.. versionadded:: 3008.0
This state will in most cases not be useful in SLS files, but it is useful
when writing a state or remote-execution module that needs to make sure
that a file at a given URL has been downloaded to the cachedir. One example
@ -9357,6 +9383,7 @@ def cached(
signed_by_all=signed_by_all,
keyring=keyring,
gnupghome=gnupghome,
sig_backend=sig_backend,
)
except CommandExecutionError as exc:
ret["comment"] = exc.strerror

Binary file not shown.

Binary file not shown.

View file

@ -135,6 +135,24 @@ def b_fp():
return "118B4FAB78038CB2DF7B69E20F6C422647465C93"
@pytest.fixture
def pub_ec():
return """\
-----BEGIN PUBLIC KEY-----
MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEACXBqu2ndMLUS/Z0X/fKUGAgRUfe
nYBie3erw/QNOYfQpgDIjNu+6xVxMLRRvSYGrQ2JREwUVXR0SR5pERAnoQ==
-----END PUBLIC KEY-----"""
@pytest.fixture
def pub_ec2():
return """\
-----BEGIN PUBLIC KEY-----
MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAErtBZ3qL5m97SzlSwOoxFzzG/1v5a
sLzOIrXykh4yO8tDn4h6JMOe+P0HuoUbENxk4+f/1D9hTEI88rj70bi7Ig==
-----END PUBLIC KEY-----"""
@pytest.fixture
def _gpg_keys_present(gnupg, a_pubkey, b_pubkey, a_fp, b_fp):
pubkeys = [a_pubkey, b_pubkey]
@ -949,6 +967,30 @@ def test_file_managed_signature(
assert name.read_text() == contents_file.read_text()
@pytest.mark.requires_salt_modules("asymmetric.verify")
@pytest.mark.parametrize("is_list", (False, True))
def test_file_managed_signature_sig_backend(
file, tmp_path, remote_grail_scene33, pub_ec, pub_ec2, is_list
):
name = tmp_path / "test_file_managed_signature.txt"
source = remote_grail_scene33.url
signature = source + ".sig"
contents_file = remote_grail_scene33.file
source_hash = remote_grail_scene33.hash
ret = file.managed(
str(name),
source=source,
source_hash=source_hash,
signature=[signature] if is_list else signature,
signed_by_any=[pub_ec2, pub_ec] if is_list else pub_ec,
sig_backend="asymmetric",
)
assert ret.result is True
assert ret.changes
assert name.exists()
assert name.read_text() == contents_file.read_text()
@pytest.mark.skipif(HAS_GNUPG is False, reason="Needs python-gnupg library")
@pytest.mark.usefixtures("_gpg_keys_present")
def test_file_managed_signature_fail(
@ -969,6 +1011,33 @@ def test_file_managed_signature_fail(
signed_by_all=signed_by_all,
)
assert ret.result is False
assert "signature could not be verified" in ret.comment
assert not ret.changes
assert not name.exists()
# Ensure that a new state run will attempt to redownload the source
# instead of verifying the invalid signature again
assert not modules.cp.is_cached(source)
assert not modules.cp.is_cached(signature)
@pytest.mark.requires_salt_modules("asymmetric.verify")
def test_file_managed_signature_sig_backend_fail(
file, tmp_path, remote_grail_scene33, pub_ec2, modules
):
name = tmp_path / "test_file_managed_signature.txt"
source = remote_grail_scene33.url
signature = source + ".sig"
source_hash = remote_grail_scene33.hash
ret = file.managed(
str(name),
source=source,
source_hash=source_hash,
signature=[signature],
signed_by_any=pub_ec2,
sig_backend="asymmetric",
)
assert ret.result is False
assert "signature could not be verified" in ret.comment
assert not ret.changes
assert not name.exists()
# Ensure that a new state run will attempt to redownload the source
@ -1004,6 +1073,30 @@ def test_file_managed_source_hash_sig(
assert name.read_text() == contents_file.read_text()
@pytest.mark.requires_salt_modules("asymmetric.verify")
@pytest.mark.parametrize("is_list", (False, True))
def test_file_managed_source_hash_sig_sig_backend(
file, tmp_path, remote_grail_scene33, pub_ec, pub_ec2, is_list
):
name = tmp_path / "test_file_managed_source_hash_sig.txt"
source = remote_grail_scene33.url
source_hash = remote_grail_scene33.url_hash
contents_file = remote_grail_scene33.file
signature = source_hash + ".sig"
ret = file.managed(
str(name),
source=source,
source_hash=source_hash,
source_hash_sig=[signature] if is_list else signature,
signed_by_any=[pub_ec2, pub_ec] if is_list else pub_ec,
sig_backend="asymmetric",
)
assert ret.result is True
assert ret.changes
assert name.exists()
assert name.read_text() == contents_file.read_text()
@pytest.mark.skipif(HAS_GNUPG is False, reason="Needs python-gnupg library")
@pytest.mark.usefixtures("_gpg_keys_present")
def test_file_managed_source_hash_sig_fail(
@ -1023,6 +1116,29 @@ def test_file_managed_source_hash_sig_fail(
signed_by_all=signed_by_all,
)
assert ret.result is False
assert "signature could not be verified" in ret.comment
assert not ret.changes
assert not name.exists()
@pytest.mark.requires_salt_modules("asymmetric.verify")
def test_file_managed_source_hash_sig_sig_backend_fail(
file, tmp_path, remote_grail_scene33, pub_ec2
):
name = tmp_path / "test_file_managed_source_hash_sig.txt"
source = remote_grail_scene33.url
source_hash = remote_grail_scene33.url_hash
signature = source_hash + ".sig"
ret = file.managed(
str(name),
source=source,
source_hash=source_hash,
source_hash_sig=[signature],
signed_by_any=pub_ec2,
sig_backend="asymmetric",
)
assert ret.result is False
assert "signature could not be verified" in ret.comment
assert not ret.changes
assert not name.exists()

View file

@ -326,6 +326,24 @@ def b_fp():
return "8DF09DE54AB67F6031D71D6F21E59FD705B38781"
@pytest.fixture
def pub_ec():
return """\
-----BEGIN PUBLIC KEY-----
MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEACXBqu2ndMLUS/Z0X/fKUGAgRUfe
nYBie3erw/QNOYfQpgDIjNu+6xVxMLRRvSYGrQ2JREwUVXR0SR5pERAnoQ==
-----END PUBLIC KEY-----"""
@pytest.fixture
def pub_ec2():
return """\
-----BEGIN PUBLIC KEY-----
MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAErtBZ3qL5m97SzlSwOoxFzzG/1v5a
sLzOIrXykh4yO8tDn4h6JMOe+P0HuoUbENxk4+f/1D9hTEI88rj70bi7Ig==
-----END PUBLIC KEY-----"""
@pytest.fixture
def gpg_keys_present(gnupg, a_pubkey, b_pubkey, a_fp, b_fp):
pubkeys = [a_pubkey, b_pubkey]
@ -344,9 +362,11 @@ def sig_files_present(web_root, modules):
for file in [
"custom.tar.gz",
"custom.tar.gz.asc",
"custom.tar.gz.sig",
"custom.tar.gz.SHA256",
"custom.tar.gz.SHA256.clearsign.asc",
"custom.tar.gz.SHA256.asc",
"custom.tar.gz.SHA256.sig",
]:
modules.file.copy(base / file, Path(web_root) / file)
@ -373,6 +393,31 @@ def test_archive_extracted_signature(tmp_path, gpghome, free_port, modules, stat
assert modules.file.find(str(name))
@pytest.mark.requires_salt_modules("asymmetric.verify")
@pytest.mark.parametrize("is_list", (False, True))
def test_archive_extracted_signature_sig_backend(
tmp_path, free_port, modules, states, pub_ec, pub_ec2, is_list
):
name = tmp_path / "test_archive_extracted_signature"
source = f"http://localhost:{free_port}/custom.tar.gz"
signature = source + ".sig"
source_hash = source + ".SHA256"
ret = states.archive.extracted(
str(name),
source=source,
source_hash=source_hash,
archive_format="tar",
options="z",
signature=[signature] if is_list else signature,
signed_by_any=[pub_ec2, pub_ec] if is_list else pub_ec,
sig_backend="asymmetric",
)
assert ret.result is True
assert ret.changes
assert name.exists()
assert modules.file.find(str(name))
@pytest.mark.skipif(HAS_GNUPG is False, reason="Needs python-gnupg library")
@pytest.mark.usefixtures("gpg_keys_present")
def test_archive_extracted_signature_fail(
@ -395,6 +440,33 @@ def test_archive_extracted_signature_fail(
gnupghome=str(gpghome),
)
assert ret.result is False
assert "signature could not be verified" in ret.comment
assert not ret.changes
assert not name.exists()
assert not modules.cp.is_cached(source)
assert not modules.cp.is_cached(signature)
@pytest.mark.requires_salt_modules("asymmetric.verify")
def test_archive_extracted_signature_sig_backend_fail(
tmp_path, free_port, modules, states, pub_ec2
):
name = tmp_path / "test_archive_extracted_signature"
source = f"http://localhost:{free_port}/custom.tar.gz"
signature = source + ".sig"
source_hash = source + ".SHA256"
ret = states.archive.extracted(
str(name),
source=source,
source_hash=source_hash,
archive_format="tar",
options="z",
signature=signature,
signed_by_any=[pub_ec2],
sig_backend="asymmetric",
)
assert ret.result is False
assert "signature could not be verified" in ret.comment
assert not ret.changes
assert not name.exists()
assert not modules.cp.is_cached(source)
@ -429,6 +501,31 @@ def test_archive_extracted_source_hash_sig(
assert modules.file.find(str(name))
@pytest.mark.requires_salt_modules("asymmetric.verify")
@pytest.mark.parametrize("is_list", (False, True))
def test_archive_extracted_source_hash_sig_sig_backend(
tmp_path, pub_ec, free_port, modules, states, is_list
):
name = tmp_path / "test_archive_extracted_source_hash_sig"
source = f"http://localhost:{free_port}/custom.tar.gz"
source_hash = source + ".SHA256"
sig = source_hash + ".sig"
ret = states.archive.extracted(
str(name),
source=source,
source_hash=source_hash,
archive_format="tar",
options="z",
source_hash_sig=[sig] if is_list else sig,
signed_by_any=[pub_ec2, pub_ec] if is_list else pub_ec,
sig_backend="asymmetric",
)
assert ret.result is True
assert ret.changes
assert name.exists()
assert modules.file.find(str(name))
@pytest.mark.skipif(HAS_GNUPG is False, reason="Needs python-gnupg library")
@pytest.mark.usefixtures("gpg_keys_present")
@pytest.mark.parametrize("sig", [True, ".asc"])
@ -450,6 +547,33 @@ def test_archive_extracted_source_hash_sig_fail(
gnupghome=str(gpghome),
)
assert ret.result is False
assert "signature could not be verified" in ret.comment
assert not ret.changes
assert not name.exists()
assert not modules.cp.is_cached(source)
assert not modules.cp.is_cached(source_hash)
@pytest.mark.requires_salt_modules("asymmetric.verify")
def test_archive_extracted_source_hash_sig_sig_backend_fail(
tmp_path, pub_ec2, free_port, modules, states
):
name = tmp_path / "test_archive_extracted_source_hash_sig"
source = f"http://localhost:{free_port}/custom.tar.gz"
source_hash = source + ".SHA256"
sig = source_hash + ".sig"
ret = states.archive.extracted(
str(name),
source=source,
source_hash=source_hash,
archive_format="tar",
options="z",
source_hash_sig=[sig],
signed_by_any=pub_ec2,
sig_backend="asymmetric",
)
assert ret.result is False
assert "signature could not be verified" in ret.comment
assert not ret.changes
assert not name.exists()
assert not modules.cp.is_cached(source)