Turn entropy generation into a helper

Signed-off-by: Pedro Algarvio <palgarvio@vmware.com>
This commit is contained in:
Pedro Algarvio 2022-03-26 08:29:02 +00:00 committed by Pedro Algarvio
parent 0ffed58e1a
commit 096ea64485
2 changed files with 101 additions and 56 deletions

View file

@ -5,7 +5,6 @@
import datetime
import logging
import pathlib
import shutil
import subprocess
import time
@ -16,6 +15,7 @@ import pytest
import salt.modules.gpg as gpg
from salt.exceptions import SaltInvocationError
from tests.support.mock import MagicMock, patch
from tests.support.pytest.helpers import EntropyGenerator
pytest.importorskip("gnupg")
@ -158,61 +158,9 @@ OZV2Hg+93dg3Wi6g/JW4OuTKWKuHRqpRB1J4i4lO
@pytest.fixture(autouse=True)
def entropy_generation(tmp_path):
max_time = 5 * 60 # Take at most 5 minutes to generate enough entropy
minimum_entropy = 1500
kernel_entropy_file = pathlib.Path("/proc/sys/kernel/random/entropy_avail")
if kernel_entropy_file.exists():
available_entropy = int(kernel_entropy_file.read_text().strip())
log.critical("Available Entropy: %s", available_entropy)
if available_entropy >= minimum_entropy:
return
rngd = shutil.which("rngd")
openssl = shutil.which("openssl")
timeout = time.time() + max_time
if rngd:
log.info("Using rngd to generate entropy")
while available_entropy < minimum_entropy:
if time.time() >= timeout:
pytest.skip(
"Skipping test as generating entropy took more than 5 minutes. "
"Current entropy value {}".format(available_entropy)
)
subprocess.run([rngd, "-r", "/dev/urandom"], shell=False, check=True)
available_entropy = int(kernel_entropy_file.read_text().strip())
log.critical("Available Entropy: %s", available_entropy)
elif openssl:
log.info("Using openssl to generate entropy")
target_file = tmp_path / "sample.txt"
while available_entropy < minimum_entropy:
if time.time() >= timeout:
pytest.skip(
"Skipping test as generating entropy took more than 5 minutes. "
"Current entropy value {}".format(available_entropy)
)
subprocess.run(
[
"openssl",
"rand",
"-out",
str(tmp_path / "sample.txt"),
"-base64",
str(int(2 ** 30 * 3 / 4)), # 1GB
],
shell=False,
check=True,
)
target_file.unlink()
available_entropy = int(kernel_entropy_file.read_text().strip())
log.critical("Available Entropy: %s", available_entropy)
else:
pytest.skip(
"Skipping test as there's not enough entropy({}) to continue".format(
available_entropy
)
)
else:
log.info("The '%s' file is not avilable", kernel_entropy_file)
def entropy_generation():
with EntropyGenerator():
yield
@pytest.fixture

View file

@ -6,12 +6,17 @@
"""
import logging
import os
import pathlib
import shutil
import subprocess
import tempfile
import textwrap
import time
import types
import warnings
from contextlib import contextmanager
import _pytest._version
import attr
import pytest
import salt.utils.platform
@ -22,6 +27,9 @@ from tests.support.pytest.loader import LoaderModuleMock
from tests.support.runtests import RUNTIME_VARS
from tests.support.sminion import create_sminion
PYTEST_GE_7 = getattr(_pytest._version, "version_tuple", (-1, -1)) >= (7, 0)
log = logging.getLogger(__name__)
@ -605,6 +613,95 @@ class FakeSaltExtension:
shutil.rmtree(str(self.srcdir), ignore_errors=True)
class EntropyGenerator:
def __init__(self, max_minutes=5, minimum_entropy=800):
self.max_minutes = max_minutes
self.minimum_entropy = minimum_entropy
def generate_entropy(self):
max_time = self.max_minutes * 60
kernel_entropy_file = pathlib.Path("/proc/sys/kernel/random/entropy_avail")
if not kernel_entropy_file.exists():
log.info("The '%s' file is not avilable", kernel_entropy_file)
return
available_entropy = int(kernel_entropy_file.read_text().strip())
log.info("Available Entropy: %s", available_entropy)
if available_entropy >= self.minimum_entropy:
return
exc_kwargs = {}
if PYTEST_GE_7:
exc_kwargs["_use_item_location"] = True
rngd = shutil.which("rngd")
openssl = shutil.which("openssl")
timeout = time.time() + max_time
if rngd:
log.info("Using rngd to generate entropy")
while True:
if time.time() >= timeout:
raise pytest.skip.Exception(
"Skipping test as generating entropy took more than {} minutes. "
"Current entropy value {}".format(
self.max_minutes, available_entropy
),
**exc_kwargs
)
subprocess.run([rngd, "-r", "/dev/urandom"], shell=False, check=True)
available_entropy = int(kernel_entropy_file.read_text().strip())
log.info("Available Entropy: %s", available_entropy)
if available_entropy >= self.minimum_entropy:
break
elif openssl:
log.info("Using openssl to generate entropy")
while True:
if time.time() >= timeout:
raise pytest.skip.Exception(
"Skipping test as generating entropy took more than {} minutes. "
"Current entropy value {}".format(
self.max_minutes, available_entropy
),
**exc_kwargs
)
target_file = tempfile.NamedTemporaryFile(
delete=False, suffix="sample.txt"
)
target_file.close()
subprocess.run(
[
"openssl",
"rand",
"-out",
target_file.name,
"-base64",
str(int(2 ** 30 * 3 / 4)), # 1GB
],
shell=False,
check=True,
)
os.unlink(target_file.name)
available_entropy = int(kernel_entropy_file.read_text().strip())
log.info("Available Entropy: %s", available_entropy)
if available_entropy >= self.minimum_entropy:
break
else:
raise pytest.skip.Exception(
"Skipping test as there's not enough entropy({}) to continue".format(
available_entropy
),
**exc_kwargs
)
def __enter__(self):
self.generate_entropy()
return self
def __exit__(self, *_):
pass
# Only allow star importing the functions defined in this module
__all__ = [
name