Add signature verification to file.managed/archive.extracted

This commit is contained in:
jeanluc 2023-02-02 17:07:31 +01:00 committed by Daniel Wozniak
parent 3aed9b5ced
commit ba2b208b3b
3 changed files with 468 additions and 1 deletions

View file

@ -42,11 +42,21 @@ def grail_scene33_file(grail):
return grail / "scene33"
@pytest.fixture
def grail_scene33_clearsign_file(grail_scene33_file):
return grail_scene33_file.with_suffix(".clearsign.asc")
@pytest.fixture
def grail_scene33_file_hash(grail_scene33_file):
return hashlib.sha256(grail_scene33_file.read_bytes()).hexdigest()
@pytest.fixture
def grail_scene33_clearsign_file_hash(grail_scene33_clearsign_file):
return hashlib.sha256(grail_scene33_clearsign_file.read_bytes()).hexdigest()
@pytest.fixture(scope="module")
def state_file_account():
with pytest.helpers.create_account(create_group=True) as system_account:

View file

@ -3,13 +3,22 @@ import hashlib
import os
import shutil
import stat
import subprocess
import types
import psutil
import pytest
import salt.utils.files
import salt.utils.platform
try:
import gnupg as gnupglib
HAS_GNUPG = True
except ImportError:
HAS_GNUPG = False
pytestmark = [
pytest.mark.windows_whitelisted,
]
@ -19,14 +28,125 @@ BINARY_FILE = b"GIF89a\x01\x00\x01\x00\x80\x00\x00\x05\x04\x04\x00\x00\x00,\x00\
@pytest.fixture
def remote_grail_scene33(webserver, grail_scene33_file, grail_scene33_file_hash):
def remote_grail_scene33(
webserver,
grail_scene33_file,
grail_scene33_file_hash,
grail_scene33_clearsign_file,
grail_scene33_clearsign_file_hash,
):
return types.SimpleNamespace(
file=grail_scene33_file,
file_clearsign=grail_scene33_clearsign_file,
hash=grail_scene33_file_hash,
hash_clearsign=grail_scene33_clearsign_file_hash,
hash_file=grail_scene33_file.with_suffix(".SHA256"),
url=webserver.url("grail/scene33"),
url_hash=webserver.url("grail/scene33.SHA256"),
)
@pytest.fixture
def gpghome(tmp_path):
root = tmp_path / "gpghome"
root.mkdir(mode=0o0700)
try:
yield root
finally:
# Make sure we don't leave any gpg-agents running behind
gpg_connect_agent = shutil.which("gpg-connect-agent")
if gpg_connect_agent:
gnupghome = root / ".gnupg"
if not gnupghome.is_dir():
gnupghome = root
try:
subprocess.run(
[gpg_connect_agent, "killagent", "/bye"],
env={"GNUPGHOME": str(gnupghome)},
shell=False,
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
except subprocess.CalledProcessError:
# This is likely CentOS 7 or Amazon Linux 2
pass
# If the above errored or was not enough, as a last resort, let's check
# the running processes.
for proc in psutil.process_iter():
try:
if "gpg-agent" in proc.name():
for arg in proc.cmdline():
if str(root) in arg:
proc.terminate()
except Exception: # pylint: disable=broad-except
pass
@pytest.fixture
def gnupg(gpghome):
return gnupglib.GPG(gnupghome=str(gpghome))
@pytest.fixture
def a_pubkey():
return """\
-----BEGIN PGP PUBLIC KEY BLOCK-----
mI0EY9pxawEEAPpBbXxRYFUm6np5h746Nch7+OrbLdtBxP8x7VDOockr/x7drssb
llVFuK4HmiJg+Nkyakn3XmVYBHY2yBIkN/MP+R1zRxiFmniKOTD15UuHSQaWZTqh
qac6XrLZ20BiWl1fKweCz1wGUcMZaOBs0WVB0sIupqfS90Ub93VC/+oxABEBAAG0
JlNhbHRTdGFjayBBIFRlc3QgPGF0ZXN0QHNhbHRzdGFjay5jb20+iNEEEwEIADsW
IQT4zMjLXh2IaNqlhZqx+apXxJfnHAUCY9pxawIbAwULCQgHAgIiAgYVCgkICwIE
FgIDAQIeBwIXgAAKCRCx+apXxJfnHFDhA/47t5yYdCcjxXu/1Kn9sQwI+aq/S3x9
/ZKE+RodlryqA43BUT7N6JLQ5zJO6p+kRhMwCcVfBeDNJANqVi63HEDp8q3633BF
q1Cbi3BG0ugBdCADIETYBwl/ytMSgYwRO8b4TkYCyhWuWAgliVF3ceX0AVsng8pF
o6Vh4A3SqosQgA==
=eHpb
-----END PGP PUBLIC KEY BLOCK-----"""
@pytest.fixture
def a_fp():
return "F8CCC8CB5E1D8868DAA5859AB1F9AA57C497E71C"
@pytest.fixture
def b_pubkey():
return """\
-----BEGIN PGP PUBLIC KEY BLOCK-----
mI0EY9pxigEEANbHCh566IEbp9Ez1WE3oEi+XXyf7H3GDgrVc8v9COMexpAFkJa1
gG+yCm4bOZ5vHAXbP2rGvlOEcao3y3evj2TWahg0+05CDugRjL0pO4JcMUBV1mBZ
ynUGoQ5T+WtKilJ5k/JrSRpJW3y//46q0g5c470qVNn9ZX0YZW/b7DFXABEBAAG0
JlNhbHRTdGFjayBCIFRlc3QgPGJ0ZXN0QHNhbHRzdGFjay5jb20+iNEEEwEIADsW
IQSN8J3lSrZ/YDHXHW8h5Z/XBbOHgQUCY9pxigIbAwULCQgHAgIiAgYVCgkICwIE
FgIDAQIeBwIXgAAKCRAh5Z/XBbOHgTCuA/9mYXAehM9avvq0Jm2dVbPidqxLstki
tgo3gCWmO1b5dXEBrhOZ8pZAktQ3WWoRrbwpNA7NAEIDF5l6uwMLLbGPQ5jreOdP
uzHpHONR1WWAzw2dj3v+5IcLDQ4sLi9VRgJqtMasTd8TpqMCVNcMArDBiy5hRF/e
XWEkf19Nb8qrdg==
=OEiT
-----END PGP PUBLIC KEY BLOCK-----"""
@pytest.fixture
def b_fp():
return "8DF09DE54AB67F6031D71D6F21E59FD705B38781"
@pytest.fixture
def gpg_keys_present(gnupg, a_pubkey, b_pubkey, a_fp, b_fp):
pubkeys = [a_pubkey, b_pubkey]
fingerprints = [a_fp, b_fp]
gnupg.import_keys("\n".join(pubkeys))
present_keys = gnupg.list_keys()
for fp in fingerprints:
assert any(x["fingerprint"] == fp for x in present_keys)
yield
# cleanup is taken care of by gpghome and tmp_path
def _format_ids(key, value):
return "{}={}".format(key, value)
@ -801,6 +921,113 @@ def test_verify_ssl_https_source(file, tmp_path, ssl_webserver, verify_ssl):
assert name.exists()
@pytest.mark.skipif(HAS_GNUPG is False, reason="Needs python-gnupg library")
@pytest.mark.usefixtures("gpg_keys_present")
@pytest.mark.parametrize("signature", [True, ".asc"])
def test_file_managed_signature(
file, tmp_path, signature, remote_grail_scene33, gpghome
):
name = tmp_path / "test_file_managed_signature.txt"
source = remote_grail_scene33.url
if signature is True:
source += ".clearsign.asc"
contents_file = remote_grail_scene33.file_clearsign
source_hash = remote_grail_scene33.hash_clearsign
else:
signature = source + signature
contents_file = remote_grail_scene33.file
source_hash = remote_grail_scene33.hash
ret = file.managed(
str(name),
source=source,
source_hash=source_hash,
signature=signature,
gnupghome=str(gpghome),
)
assert ret.result is True
assert ret.changes
assert name.exists()
assert name.read_text() == contents_file.read_text()
@pytest.mark.skipif(HAS_GNUPG is False, reason="Needs python-gnupg library")
@pytest.mark.usefixtures("gpg_keys_present")
def test_file_managed_signature_fail(
file, tmp_path, remote_grail_scene33, gpghome, modules
):
name = tmp_path / "test_file_managed_signature_fail.txt"
source = remote_grail_scene33.url
signature = source + ".asc"
source_hash = remote_grail_scene33.hash
# although there are valid signatures, this will be denied since the one below is required
signed_by_all = ["DEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF"]
ret = file.managed(
str(name),
source=source,
source_hash=source_hash,
signature=signature,
gnupghome=str(gpghome),
signed_by_all=signed_by_all,
)
assert ret.result is False
assert not ret.changes
assert not name.exists()
# Ensure that a new state run will attempt to redownload the source
# instead of verifying the invalid signature again
assert not modules.cp.is_cached(source)
assert not modules.cp.is_cached(signature)
@pytest.mark.skipif(HAS_GNUPG is False, reason="Needs python-gnupg library")
@pytest.mark.usefixtures("gpg_keys_present")
@pytest.mark.parametrize("sig", [True, ".asc"])
def test_file_managed_source_hash_sig(
file, tmp_path, sig, remote_grail_scene33, gpghome
):
name = tmp_path / "test_file_managed_source_hash_sig.txt"
source = remote_grail_scene33.url
source_hash = remote_grail_scene33.url_hash
contents_file = remote_grail_scene33.file
if sig is True:
source_hash += ".clearsign.asc"
else:
sig = source_hash + sig
ret = file.managed(
str(name),
source=source,
source_hash=source_hash,
source_hash_sig=sig,
gnupghome=str(gpghome),
)
assert ret.result is True
assert ret.changes
assert name.exists()
assert name.read_text() == contents_file.read_text()
@pytest.mark.skipif(HAS_GNUPG is False, reason="Needs python-gnupg library")
@pytest.mark.usefixtures("gpg_keys_present")
def test_file_managed_source_hash_sig_fail(
file, tmp_path, remote_grail_scene33, gpghome
):
name = tmp_path / "test_file_managed_source_hash_sig.txt"
source = remote_grail_scene33.url
source_hash = remote_grail_scene33.url_hash
sig = source_hash + ".asc"
signed_by_all = ["DEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF"]
ret = file.managed(
str(name),
source=source,
source_hash=source_hash,
source_hash_sig=sig,
gnupghome=str(gpghome),
signed_by_all=signed_by_all,
)
assert ret.result is False
assert not ret.changes
assert not name.exists()
def test_issue_60203(
file,
tmp_path,

View file

@ -6,12 +6,23 @@ import os
import random
import shutil
import socket
import subprocess
import sys
from contextlib import closing
from pathlib import Path
import psutil
import pytest
import salt.utils.files
from tests.support.runtests import RUNTIME_VARS
try:
import gnupg as gnupglib
HAS_GNUPG = True
except ImportError:
HAS_GNUPG = False
class TestRequestHandler(http.server.SimpleHTTPRequestHandler):
@ -224,3 +235,222 @@ def test_archive_extracted_web_source_etag_operation(
# The modified time of the cached file now changes
assert cached_file_mtime != os.path.getmtime(cached_file)
@pytest.fixture
def gpghome(tmp_path):
root = tmp_path / "gpghome"
root.mkdir(mode=0o0700)
try:
yield root
finally:
# Make sure we don't leave any gpg-agents running behind
gpg_connect_agent = shutil.which("gpg-connect-agent")
if gpg_connect_agent:
gnupghome = root / ".gnupg"
if not gnupghome.is_dir():
gnupghome = root
try:
subprocess.run(
[gpg_connect_agent, "killagent", "/bye"],
env={"GNUPGHOME": str(gnupghome)},
shell=False,
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
except subprocess.CalledProcessError:
# This is likely CentOS 7 or Amazon Linux 2
pass
# If the above errored or was not enough, as a last resort, let's check
# the running processes.
for proc in psutil.process_iter():
try:
if "gpg-agent" in proc.name():
for arg in proc.cmdline():
if str(root) in arg:
proc.terminate()
except Exception: # pylint: disable=broad-except
pass
@pytest.fixture
def gnupg(gpghome):
return gnupglib.GPG(gnupghome=str(gpghome))
@pytest.fixture
def a_pubkey():
return """\
-----BEGIN PGP PUBLIC KEY BLOCK-----
mI0EY9pxawEEAPpBbXxRYFUm6np5h746Nch7+OrbLdtBxP8x7VDOockr/x7drssb
llVFuK4HmiJg+Nkyakn3XmVYBHY2yBIkN/MP+R1zRxiFmniKOTD15UuHSQaWZTqh
qac6XrLZ20BiWl1fKweCz1wGUcMZaOBs0WVB0sIupqfS90Ub93VC/+oxABEBAAG0
JlNhbHRTdGFjayBBIFRlc3QgPGF0ZXN0QHNhbHRzdGFjay5jb20+iNEEEwEIADsW
IQT4zMjLXh2IaNqlhZqx+apXxJfnHAUCY9pxawIbAwULCQgHAgIiAgYVCgkICwIE
FgIDAQIeBwIXgAAKCRCx+apXxJfnHFDhA/47t5yYdCcjxXu/1Kn9sQwI+aq/S3x9
/ZKE+RodlryqA43BUT7N6JLQ5zJO6p+kRhMwCcVfBeDNJANqVi63HEDp8q3633BF
q1Cbi3BG0ugBdCADIETYBwl/ytMSgYwRO8b4TkYCyhWuWAgliVF3ceX0AVsng8pF
o6Vh4A3SqosQgA==
=eHpb
-----END PGP PUBLIC KEY BLOCK-----"""
@pytest.fixture
def a_fp():
return "F8CCC8CB5E1D8868DAA5859AB1F9AA57C497E71C"
@pytest.fixture
def b_pubkey():
return """\
-----BEGIN PGP PUBLIC KEY BLOCK-----
mI0EY9pxigEEANbHCh566IEbp9Ez1WE3oEi+XXyf7H3GDgrVc8v9COMexpAFkJa1
gG+yCm4bOZ5vHAXbP2rGvlOEcao3y3evj2TWahg0+05CDugRjL0pO4JcMUBV1mBZ
ynUGoQ5T+WtKilJ5k/JrSRpJW3y//46q0g5c470qVNn9ZX0YZW/b7DFXABEBAAG0
JlNhbHRTdGFjayBCIFRlc3QgPGJ0ZXN0QHNhbHRzdGFjay5jb20+iNEEEwEIADsW
IQSN8J3lSrZ/YDHXHW8h5Z/XBbOHgQUCY9pxigIbAwULCQgHAgIiAgYVCgkICwIE
FgIDAQIeBwIXgAAKCRAh5Z/XBbOHgTCuA/9mYXAehM9avvq0Jm2dVbPidqxLstki
tgo3gCWmO1b5dXEBrhOZ8pZAktQ3WWoRrbwpNA7NAEIDF5l6uwMLLbGPQ5jreOdP
uzHpHONR1WWAzw2dj3v+5IcLDQ4sLi9VRgJqtMasTd8TpqMCVNcMArDBiy5hRF/e
XWEkf19Nb8qrdg==
=OEiT
-----END PGP PUBLIC KEY BLOCK-----"""
@pytest.fixture
def b_fp():
return "8DF09DE54AB67F6031D71D6F21E59FD705B38781"
@pytest.fixture
def gpg_keys_present(gnupg, a_pubkey, b_pubkey, a_fp, b_fp):
pubkeys = [a_pubkey, b_pubkey]
fingerprints = [a_fp, b_fp]
gnupg.import_keys("\n".join(pubkeys))
present_keys = gnupg.list_keys()
for fp in fingerprints:
assert any(x["fingerprint"] == fp for x in present_keys)
yield
# cleanup is taken care of by gpghome and tmp_path
@pytest.fixture(scope="module", autouse=True)
def sig_files_present(web_root, modules):
base = Path(RUNTIME_VARS.BASE_FILES)
for file in [
"custom.tar.gz",
"custom.tar.gz.asc",
"custom.tar.gz.SHA256",
"custom.tar.gz.SHA256.clearsign.asc",
"custom.tar.gz.SHA256.asc",
]:
modules.file.copy(base / file, Path(web_root) / file)
@pytest.mark.skipif(HAS_GNUPG is False, reason="Needs python-gnupg library")
@pytest.mark.usefixtures("gpg_keys_present")
def test_archive_extracted_signature(tmp_path, gpghome, free_port, modules, states):
name = tmp_path / "test_archive_extracted_signature"
source = f"http://localhost:{free_port}/custom.tar.gz"
signature = source + ".asc"
source_hash = source + ".SHA256"
ret = states.archive.extracted(
str(name),
source=source,
source_hash=source_hash,
archive_format="tar",
options="z",
signature=signature,
gnupghome=str(gpghome),
)
assert ret.result is True
assert ret.changes
assert name.exists()
assert modules.file.find(str(name))
@pytest.mark.skipif(HAS_GNUPG is False, reason="Needs python-gnupg library")
@pytest.mark.usefixtures("gpg_keys_present")
def test_archive_extracted_signature_fail(
tmp_path, gpghome, free_port, modules, states
):
name = tmp_path / "test_archive_extracted_signature_fail"
source = f"http://localhost:{free_port}/custom.tar.gz"
signature = source + ".asc"
source_hash = source + ".SHA256"
# although there are valid signatures, this will be denied since the one below is required
signed_by_all = ["DEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF"]
ret = states.archive.extracted(
str(name),
source=source,
source_hash=source_hash,
archive_format="tar",
options="z",
signature=signature,
signed_by_all=signed_by_all,
gnupghome=str(gpghome),
)
assert ret.result is False
assert not ret.changes
assert not name.exists()
assert not modules.cp.is_cached(source)
assert not modules.cp.is_cached(signature)
@pytest.mark.skipif(HAS_GNUPG is False, reason="Needs python-gnupg library")
@pytest.mark.usefixtures("gpg_keys_present")
@pytest.mark.parametrize("sig", [True, ".asc"])
def test_archive_extracted_source_hash_sig(
tmp_path, sig, gpghome, free_port, modules, states
):
name = tmp_path / "test_archive_extracted_source_hash_sig"
source = f"http://localhost:{free_port}/custom.tar.gz"
source_hash = source + ".SHA256"
if sig is True:
source_hash += ".clearsign.asc"
else:
sig = source_hash + sig
ret = states.archive.extracted(
str(name),
source=source,
source_hash=source_hash,
archive_format="tar",
options="z",
source_hash_sig=sig,
gnupghome=str(gpghome),
)
assert ret.result is True
assert ret.changes
assert name.exists()
assert modules.file.find(str(name))
@pytest.mark.skipif(HAS_GNUPG is False, reason="Needs python-gnupg library")
@pytest.mark.usefixtures("gpg_keys_present")
@pytest.mark.parametrize("sig", [True, ".asc"])
def test_archive_extracted_source_hash_sig_fail(
tmp_path, sig, gpghome, free_port, modules, states
):
name = tmp_path / "test_archive_extracted_source_hash_sig_fail"
source = f"http://localhost:{free_port}/custom.tar.gz"
source_hash = source + ".SHA256.clearsign.asc"
signed_by_any = ["DEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF"]
ret = states.archive.extracted(
str(name),
source=source,
source_hash=source_hash,
archive_format="tar",
options="z",
source_hash_sig=True,
signed_by_any=signed_by_any,
gnupghome=str(gpghome),
)
assert ret.result is False
assert not ret.changes
assert not name.exists()
assert not modules.cp.is_cached(source)
assert not modules.cp.is_cached(source_hash)