# salt/pkg/tests/support/helpers.py
#
# 1404 lines
# 49 KiB
# Python

import atexit
import contextlib
import logging
import os
import pathlib
import pprint
import re
import shutil
import tarfile
import textwrap
import time
from typing import TYPE_CHECKING, Any, Dict, List
from zipfile import ZipFile
import attr
import distro
import psutil
import pytest
import requests
from pytestshellutils.shell import DaemonImpl, Subprocess
from pytestshellutils.utils.processes import (
ProcessResult,
_get_cmdline,
terminate_process,
)
from pytestskipmarkers.utils import platform
from saltfactories.bases import SystemdSaltDaemonImpl
from saltfactories.cli import call, key, salt
from saltfactories.daemons import api, master, minion
# Optional modules: record whether each one could be imported so the rest of
# the module can branch on availability.
try:
    import crypt
except ImportError:
    HAS_CRYPT = False
else:
    HAS_CRYPT = True

try:
    import pwd
except ImportError:
    HAS_PWD = False
else:
    HAS_PWD = True

# Directory layout, derived from the location of this file.
TESTS_DIR = pathlib.Path(__file__).resolve().parent.parent
CODE_DIR = TESTS_DIR.parent
ARTIFACTS_DIR = CODE_DIR / "artifacts"

log = logging.getLogger(__name__)
@attr.s(kw_only=True, slots=True)
class SaltPkgInstall:
    """
    Locate the Salt build artifacts and install, upgrade or remove them.

    On construction ``ARTIFACTS_DIR`` is scanned.  Archives and installers
    (zip, tar.gz, exe, pkg) mark the build as "compressed"; rpm/deb files are
    handled through the system package manager.  The location of the Salt
    binaries is derived from what was found.

    Used as a context manager, Salt is installed on enter (the previous
    release when ``upgrade`` is set) and removed on exit, unless
    ``no_install`` / ``no_uninstall`` are set.
    """

    conf_dir: pathlib.Path = attr.ib()
    system_service: bool = attr.ib(default=False)
    proc: Subprocess = attr.ib(init=False)
    pkgs: List[str] = attr.ib(factory=list)
    onedir: bool = attr.ib(default=False)
    singlebin: bool = attr.ib(default=False)
    compressed: bool = attr.ib(default=False)
    hashes: Dict[str, Dict[str, Any]] = attr.ib()
    root: pathlib.Path = attr.ib(default=None)
    run_root: pathlib.Path = attr.ib(default=None)
    ssm_bin: pathlib.Path = attr.ib(default=None)
    bin_dir: pathlib.Path = attr.ib(default=None)
    # The artifact is an installer (exe, pkg, rpm, deb)
    installer_pkg: bool = attr.ib(default=False)
    upgrade: bool = attr.ib(default=False)
    # install salt or not. This allows someone
    # to test a currently installed version of salt
    no_install: bool = attr.ib(default=False)
    no_uninstall: bool = attr.ib(default=False)
    distro_id: str = attr.ib(init=False)
    pkg_mngr: str = attr.ib(init=False)
    rm_pkg: str = attr.ib(init=False)
    salt_pkgs: List[str] = attr.ib(init=False)
    # Maps a short command name ("master", "call", ...) to the argv prefix
    # used to launch it; filled in by ``__attrs_post_init__``.
    binary_paths: Dict[str, List[str]] = attr.ib(init=False)

    @proc.default
    def _default_proc(self):
        return Subprocess()

    @hashes.default
    def _default_hashes(self):
        return {
            "BLAKE2B": {"file": None, "tool": "-blake2b512"},
            "SHA3_512": {"file": None, "tool": "-sha3-512"},
            "SHA512": {"file": None, "tool": "-sha512"},
        }

    @distro_id.default
    def _default_distro_id(self):
        return distro.id().lower()

    @pkg_mngr.default
    def _default_pkg_mngr(self):
        """
        Pick the package manager for the distribution.

        On Debian-family systems this also runs ``apt-get update``.  Returns
        ``None`` for any other distribution.
        """
        if self.distro_id in ("centos", "redhat", "amzn", "fedora"):
            return "yum"
        elif self.distro_id in ("ubuntu", "debian"):
            ret = self.proc.run("apt-get", "update")
            self._check_retcode(ret)
            return "apt-get"

    @rm_pkg.default
    def _default_rm_pkg(self):
        """
        Return the package manager sub-command used to remove packages.
        """
        if self.distro_id in ("centos", "redhat", "amzn", "fedora"):
            return "remove"
        elif self.distro_id in ("ubuntu", "debian"):
            return "purge"

    @salt_pkgs.default
    def _default_salt_pkgs(self):
        """
        Return the names of the Salt packages for this distribution.
        """
        salt_pkgs = [
            "salt-api",
            "salt-syndic",
            "salt-ssh",
            "salt-master",
            "salt-cloud",
            "salt-minion",
        ]
        if self.distro_id in ("centos", "redhat", "amzn", "fedora"):
            salt_pkgs.append("salt")
        elif self.distro_id in ("ubuntu", "debian"):
            salt_pkgs.append("salt-common")
        return salt_pkgs

    def __attrs_post_init__(self):
        """
        Scan ``ARTIFACTS_DIR`` and derive the install layout from its contents.

        Fails the running test when no Salt artifact is found.
        """
        file_ext_re = r"tar\.gz"
        if platform.is_darwin():
            file_ext_re = r"tar\.gz|pkg"
        if platform.is_windows():
            file_ext_re = "zip|exe"
        # The dot in front of the extension is escaped so that only a real
        # file extension matches.
        compressed_re = re.compile(rf"salt-(.*)\.({file_ext_re})$")
        native_re = re.compile(r"salt(.*)(x86_64|all|amd64|aarch64)\.(rpm|deb)$")

        for f_path in ARTIFACTS_DIR.glob("**/*.*"):
            f_path = str(f_path)
            match = compressed_re.search(f_path)
            if match:
                # Compressed can be zip, tar.gz, exe, or pkg. All others are
                # deb and rpm
                self.compressed = True
                file_ext = match.group(2)
                self.pkgs.append(f_path)
                if platform.is_windows():
                    self._setup_windows_layout(f_path, file_ext)
                else:
                    self._setup_posix_layout(f_path, file_ext)
            if native_re.search(f_path):
                self.installer_pkg = True
                self.pkgs.append(f_path)

        if not self.pkgs:
            pytest.fail("Could not find Salt Artifacts")

        self.binary_paths = self._build_binary_paths()

    def _setup_windows_layout(self, f_path, file_ext):
        """
        Set the binary locations for a Windows zip archive or exe installer.
        """
        self.root = pathlib.Path(os.getenv("LocalAppData")).resolve()
        if file_ext == "zip":
            with ZipFile(f_path, "r") as archive:
                first = archive.infolist()[0]
            if first.filename == "salt/ssm.exe":
                self.onedir = True
                self.bin_dir = self.root / "salt" / "salt"
                self.run_root = self.bin_dir / "salt.exe"
                self.ssm_bin = self.root / "salt" / "ssm.exe"
            elif first.filename == "salt.exe":
                self.singlebin = True
                self.run_root = self.root / "salt.exe"
                self.ssm_bin = self.root / "ssm.exe"
            else:
                log.error(
                    "Unexpected archive layout. First: %s",
                    first.filename,
                )
        elif file_ext == "exe":
            self.onedir = True
            self.installer_pkg = True
            install_dir = pathlib.Path(
                os.getenv("ProgramFiles"), "Salt Project", "Salt"
            ).resolve()
            self.bin_dir = install_dir / "bin"
            self.run_root = self.bin_dir / "salt.exe"
            self.ssm_bin = self.bin_dir / "ssm.exe"
        else:
            log.error("Unexpected file extension: %s", file_ext)

    def _setup_posix_layout(self, f_path, file_ext):
        """
        Set the binary locations for a macOS pkg installer or a tarball.
        """
        if platform.is_darwin():
            self.root = pathlib.Path(os.sep, "opt")
        else:
            self.root = pathlib.Path(os.sep, "usr", "local", "bin")

        if file_ext == "pkg":
            self.onedir = True
            self.installer_pkg = True
            self.bin_dir = self.root / "salt" / "bin"
            self.run_root = self.bin_dir / "run"
        elif file_ext == "tar.gz":
            with tarfile.open(f_path) as tar:
                # The first item will be called salt
                first = next(iter(tar))
            if first.name == "salt" and first.isdir():
                self.onedir = True
                self.bin_dir = self.root / "salt" / "run"
                self.run_root = self.bin_dir / "run"
            elif first.name == "salt" and first.isfile():
                self.singlebin = True
                self.run_root = self.root / "salt"
            else:
                log.error(
                    "Unexpected archive layout. First: %s (isdir: %s, isfile: %s)",
                    first.name,
                    first.isdir(),
                    first.isfile(),
                )
        else:
            log.error("Unexpected file extension: %s", file_ext)

    def _build_binary_paths(self):
        """
        Build the mapping of command name to the argv prefix that launches it.

        Native packages expose one script per command (``salt-master``, ...);
        compressed builds expose a single binary that takes the command name
        as its first argument.
        """
        commands = (
            "api",
            "call",
            "cloud",
            "cp",
            "key",
            "master",
            "minion",
            "proxy",
            "run",
            "ssh",
            "syndic",
            "spm",
            "pip",
        )
        if not self.compressed:
            paths = {"salt": ["salt"]}
            for name in commands:
                paths[name] = ["spm"] if name == "spm" else [f"salt-{name}"]
            return paths
        run_root = str(self.run_root)
        paths = {"salt": [run_root]}
        for name in commands:
            paths[name] = [run_root, name]
        return paths

    @staticmethod
    def salt_factories_root_dir(system_service: bool = False) -> pathlib.Path:
        """
        Return the root directory to hand to salt-factories.

        ``None`` is returned when ``system_service`` is ``False``; otherwise a
        fixed, platform specific path.
        """
        if system_service is False:
            return None
        if platform.is_windows():
            return pathlib.Path("C:/salt")
        if platform.is_darwin():
            return pathlib.Path("/opt/salt")
        return pathlib.Path("/")

    def _check_retcode(self, ret):
        """
        Helper function to check that a subprocess result has a return code
        of 0.  The result is logged and an ``AssertionError`` is raised when
        it does not.
        """
        if ret.returncode != 0:
            log.error(ret)
        assert ret.returncode == 0
        return True

    @property
    def salt_hashes(self):
        """
        The ``hashes`` mapping, with ``file`` set for every hash file found
        under ``ARTIFACTS_DIR``.
        """
        for _hash, info in self.hashes.items():
            for fpath in ARTIFACTS_DIR.glob(f"**/*{_hash}*"):
                fpath = str(fpath)
                if re.search(f"{_hash}", fpath):
                    info["file"] = fpath
        return self.hashes

    def _install_ssm_service(self):
        """
        Register the master, minion and api services with ``ssm_bin``.
        """
        # run_root and ssm_bin point to the binary locations worked out when
        # the artifacts were scanned (or when the previous release was
        # installed).
        for service in ("master", "minion", "api"):
            log.debug("Installing %s service", service)
            ret = self.proc.run(
                str(self.ssm_bin),
                "install",
                f"salt-{service}",
                str(self.run_root),
                service,
                "-c",
                str(self.conf_dir),
            )
            self._check_retcode(ret)

    def _install_compressed(self, upgrade=False):
        """
        Install the first discovered archive or installer.

        :param bool upgrade:
            On Windows, run the exe installer without ``/start-minion=0``.
        """
        pkg = self.pkgs[0]
        log.info("Installing %s", pkg)
        if platform.is_windows():
            if pkg.endswith("zip"):
                # Extract the files
                log.debug("Extracting zip file")
                with ZipFile(pkg, "r") as archive:
                    archive.extractall(path=self.root)
            elif pkg.endswith("exe"):
                # Install the package
                log.debug("Installing: %s", str(pkg))
                if upgrade:
                    ret = self.proc.run(str(pkg), "/S")
                else:
                    ret = self.proc.run(str(pkg), "/start-minion=0", "/S")
                self._check_retcode(ret)
                # Remove the service installed by the installer
                log.debug("Removing installed salt-minion service")
                self.proc.run(
                    str(self.ssm_bin),
                    "remove",
                    "salt-minion",
                    "confirm",
                )
            else:
                log.error("Unknown package type: %s", pkg)
            if self.system_service:
                self._install_ssm_service()
        elif platform.is_darwin() and pkg.endswith("pkg"):
            daemons_dir = pathlib.Path(os.sep, "Library", "LaunchDaemons")
            service_name = "com.saltstack.salt.minion"
            plist_file = daemons_dir / f"{service_name}.plist"
            log.debug("Installing: %s", str(pkg))
            ret = self.proc.run("installer", "-pkg", str(pkg), "-target", "/")
            self._check_retcode(ret)
            # Stop the service installed by the installer
            self.proc.run(
                "launchctl",
                "disable",
                f"system/{service_name}",
            )
            self.proc.run("launchctl", "bootout", "system", str(plist_file))
        else:
            log.debug("Extracting tarball into %s", self.root)
            with tarfile.open(pkg) as tar:
                tar.extractall(path=str(self.root))

    def _install_pkgs(self, upgrade=False):
        """
        Install (or upgrade to) the discovered rpm/deb packages.

        Returns ``False`` when an upgrade could not be performed; otherwise
        the result is asserted to have succeeded.
        """
        log.info("Installing packages:\n%s", pprint.pformat(self.pkgs))
        if upgrade:
            if self.distro_id in ("ubuntu", "debian"):
                # --allow-downgrades and yum's downgrade is a workaround since
                # dpkg/yum is seeing 3005 version as a greater version than our
                # nightly builds.
                # Also this helps work around the situation when the Salt
                # branch has not been updated with code so the versions might
                # be the same and you can still install and test the new
                # package.
                ret = self.proc.run(
                    self.pkg_mngr, "upgrade", "-y", "--allow-downgrades", *self.pkgs
                )
            else:
                ret = self.proc.run(self.pkg_mngr, "upgrade", "-y", *self.pkgs)
            if (
                ret.returncode != 0
                or "does not update installed package" in ret.stdout
                or "cannot update it" in ret.stderr
            ):
                log.info(
                    "The new packages version is not returning as new. Attempting to downgrade"
                )
                ret = self.proc.run(self.pkg_mngr, "downgrade", "-y", *self.pkgs)
            if ret.returncode != 0:
                log.error("Could not install the packages")
                return False
        else:
            ret = self.proc.run(self.pkg_mngr, "install", "-y", *self.pkgs)
        log.info(ret)
        self._check_retcode(ret)

    def install(self, upgrade=False):
        """
        Install the discovered artifacts.
        """
        if self.compressed:
            self._install_compressed(upgrade=upgrade)
        else:
            self._install_pkgs(upgrade=upgrade)

    def install_previous(self):
        """
        Install previous version. This is used for
        upgrade tests.
        """
        major_ver = "3005"
        min_ver = f"{major_ver}"
        os_name, version, code_name = distro.linux_distribution()
        if os_name:
            os_name = os_name.split()[0].lower()
            if os_name == "centos" or os_name == "fedora":
                os_name = "redhat"
        # TODO: When tiamat is considered production we need to update these
        # TODO: paths to the tiamat paths instead of the old package paths.
        if os_name.lower() in ["redhat", "centos", "amazon", "fedora"]:
            for fp in pathlib.Path("/etc", "yum.repos.d").glob("epel*"):
                fp.unlink()
            ret = self.proc.run(
                "rpm",
                "--import",
                f"https://repo.saltproject.io/salt/py3/{os_name}/{version}/x86_64/{major_ver}/SALTSTACK-GPG-KEY.pub",
            )
            self._check_retcode(ret)
            ret = self.proc.run(
                "curl",
                "-fsSL",
                f"https://repo.saltproject.io/salt/py3/{os_name}/{version}/x86_64/{major_ver}.repo",
                "-o",
                f"/etc/yum.repos.d/salt-{os_name}.repo",
            )
            self._check_retcode(ret)
            ret = self.proc.run(self.pkg_mngr, "clean", "expire-cache")
            self._check_retcode(ret)
            ret = self.proc.run(
                self.pkg_mngr,
                "install",
                *self.salt_pkgs,
                "-y",
            )
            self._check_retcode(ret)
        elif os_name.lower() in ["debian", "ubuntu"]:
            ret = self.proc.run(self.pkg_mngr, "install", "curl", "-y")
            self._check_retcode(ret)
            ret = self.proc.run(self.pkg_mngr, "install", "apt-transport-https", "-y")
            self._check_retcode(ret)
            ret = self.proc.run(
                "curl",
                "-fsSL",
                "-o",
                "/usr/share/keyrings/salt-archive-keyring.gpg",
                f"https://repo.saltproject.io/salt/py3/{os_name}/{version}/amd64/{major_ver}/salt-archive-keyring.gpg",
            )
            self._check_retcode(ret)
            with open(
                pathlib.Path("/etc", "apt", "sources.list.d", "salt.list"), "w"
            ) as fp:
                fp.write(
                    "deb [signed-by=/usr/share/keyrings/salt-archive-keyring.gpg arch=amd64] "
                    f"https://repo.saltproject.io/salt/py3/{os_name}/{version}/amd64/{major_ver} {code_name} main"
                )
            ret = self.proc.run(self.pkg_mngr, "update")
            self._check_retcode(ret)
            ret = self.proc.run(
                self.pkg_mngr,
                "install",
                *self.salt_pkgs,
                "-y",
            )
            self._check_retcode(ret)
        elif platform.is_windows():
            win_pkg = f"salt-{min_ver}-1-windows-amd64.exe"
            win_pkg_url = (
                f"https://repo.saltproject.io/salt/py3/windows/{major_ver}/{win_pkg}"
            )
            pkg_path = pathlib.Path(r"C:\TEMP", win_pkg)
            pkg_path.parent.mkdir(exist_ok=True)
            response = requests.get(win_pkg_url)
            # Do not write an HTTP error page to disk and try to run it.
            response.raise_for_status()
            with open(pkg_path, "wb") as fp:
                fp.write(response.content)
            ret = self.proc.run(str(pkg_path), "/start-minion=0", "/S")
            self._check_retcode(ret)

            # Point at the binaries the installer just laid down *before*
            # talking to the service manager, so the ssm calls below use the
            # installed ssm.exe rather than a path derived from the artifact.
            self.onedir = True
            self.installer_pkg = True
            install_dir = pathlib.Path(
                os.getenv("ProgramFiles"), "Salt Project", "Salt"
            ).resolve()
            self.bin_dir = install_dir / "bin"
            self.run_root = self.bin_dir / "salt.exe"
            self.ssm_bin = self.bin_dir / "ssm.exe"

            log.debug("Removing installed salt-minion service")
            self.proc.run(
                "cmd", "/c", str(self.ssm_bin), "remove", "salt-minion", "confirm"
            )
            if self.system_service:
                self._install_ssm_service()

    def _uninstall_compressed(self):
        """
        Remove an installation made from an archive or installer.
        """
        if platform.is_windows():
            if self.system_service:
                # Uninstall the services
                for service in ("master", "minion", "api"):
                    log.debug("Uninstalling %s service", service)
                    self.proc.run(
                        str(self.ssm_bin),
                        "stop",
                        f"salt-{service}",
                    )
                    self.proc.run(
                        str(self.ssm_bin),
                        "remove",
                        f"salt-{service}",
                        "confirm",
                    )
            log.debug("Removing the Salt Service Manager")
            if self.ssm_bin:
                try:
                    self.ssm_bin.unlink()
                except PermissionError:
                    # Still in use; retry when the interpreter exits
                    atexit.register(self.ssm_bin.unlink)
        if platform.is_darwin():
            # From here: https://stackoverflow.com/a/46118276/4581998
            daemons_dir = pathlib.Path(os.sep, "Library", "LaunchDaemons")
            for service in ("minion", "master", "api", "syndic"):
                service_name = f"com.saltstack.salt.{service}"
                plist_file = daemons_dir / f"{service_name}.plist"
                # Stop the services
                self.proc.run("launchctl", "disable", f"system/{service_name}")
                self.proc.run("launchctl", "bootout", "system", str(plist_file))
            # Remove Symlink to salt-config
            if os.path.exists("/usr/local/sbin/salt-config"):
                os.unlink("/usr/local/sbin/salt-config")
            # Remove supporting files.  The command is not run through a
            # shell, so a "| grep | xargs rm" pipeline cannot be passed as
            # arguments; the listing is filtered and removed here instead.
            # NOTE(review): assumes the listed paths are relative to "/",
            # which is the target the package is installed to -- confirm.
            ret = self.proc.run(
                "pkgutil",
                "--only-files",
                "--files",
                "com.saltstack.salt",
            )
            for rel_path in (ret.stdout or "").splitlines():
                rel_path = rel_path.strip()
                if not rel_path or "opt" in rel_path:
                    continue
                try:
                    pathlib.Path(os.sep, rel_path).unlink()
                except OSError as exc:
                    # Best effort, like ``rm -f``
                    log.debug("Could not remove %s: %s", rel_path, exc)
            # Remove directories
            if os.path.exists("/etc/salt"):
                shutil.rmtree("/etc/salt")
            # Remove path
            if os.path.exists("/etc/paths.d/salt"):
                os.remove("/etc/paths.d/salt")
            # Remove receipt
            self.proc.run("pkgutil", "--forget", "com.saltstack.salt")
        if self.singlebin:
            log.debug("Deleting the salt binary: %s", self.run_root)
            if self.run_root:
                try:
                    self.run_root.unlink()
                except PermissionError:
                    atexit.register(self.run_root.unlink)
        else:
            log.debug("Deleting the onedir directory: %s", self.root / "salt")
            shutil.rmtree(str(self.root / "salt"))

    def _uninstall_pkgs(self):
        """
        Remove the Salt packages with the system package manager.
        """
        log.debug("Un-Installing packages:\n%s", pprint.pformat(self.salt_pkgs))
        ret = self.proc.run(self.pkg_mngr, self.rm_pkg, "-y", *self.salt_pkgs)
        self._check_retcode(ret)

    def uninstall(self):
        """
        Remove whatever ``install`` put in place.
        """
        if self.compressed:
            self._uninstall_compressed()
        else:
            self._uninstall_pkgs()

    def assert_uninstalled(self):
        """
        Assert that the paths in /opt/saltstack/ were correctly
        removed or not removed
        """
        # NOTE: the check is currently short-circuited by this return;
        # everything below it is unreachable.
        return
        if platform.is_windows():
            # I'm not sure where the /opt/saltstack path is coming from
            # This is the path we're using to test windows
            opt_path = pathlib.Path(os.getenv("LocalAppData"), "salt", "pypath")
        else:
            opt_path = pathlib.Path(os.sep, "opt", "saltstack", "salt", "pypath")
        if not opt_path.exists():
            if platform.is_windows():
                assert not opt_path.parent.exists()
            else:
                assert not opt_path.parent.parent.exists()
        else:
            opt_path_contents = list(opt_path.rglob("*"))
            if not opt_path_contents:
                pytest.fail(
                    f"The path '{opt_path}' exists but there are no files in it."
                )
            else:
                for path in list(opt_path_contents):
                    if path.name in (".installs.json", "__pycache__"):
                        opt_path_contents.remove(path)
                if opt_path_contents:
                    pytest.fail(
                        "The test left some files behind: {}".format(
                            ", ".join([str(p) for p in opt_path_contents])
                        )
                    )

    def write_launchd_conf(self, service):
        """
        Write a launchd plist for ``service`` when launchd does not know it.

        :param str service:
            Short service name, e.g. ``master``.
        """
        service_name = f"com.saltstack.salt.{service}"
        ret = self.proc.run("launchctl", "list", service_name)
        # 113 means it couldn't find a service with that name
        if ret.returncode == 113:
            daemons_dir = pathlib.Path(os.sep, "Library", "LaunchDaemons")
            plist_file = daemons_dir / f"{service_name}.plist"
            # Make sure we're using this plist file
            if plist_file.exists():
                log.warning("Removing existing plist file for service: %s", service)
                plist_file.unlink()
            log.debug("Creating plist file for service: %s", service)
            contents = textwrap.dedent(
                f"""\
                <?xml version="1.0" encoding="UTF-8"?>
                <!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
                <plist version="1.0">
                <dict>
                <key>Label</key>
                <string>{service_name}</string>
                <key>RunAtLoad</key>
                <true/>
                <key>KeepAlive</key>
                <true/>
                <key>ProgramArguments</key>
                <array>
                <string>{self.run_root}</string>
                <string>{service}</string>
                <string>-c</string>
                <string>{self.conf_dir}</string>
                </array>
                <key>SoftResourceLimits</key>
                <dict>
                <key>NumberOfFiles</key>
                <integer>100000</integer>
                </dict>
                <key>HardResourceLimits</key>
                <dict>
                <key>NumberOfFiles</key>
                <integer>100000</integer>
                </dict>
                </dict>
                </plist>
                """
            )
            plist_file.write_text(contents, encoding="utf-8")
            contents = plist_file.read_text()
            log.debug("Created '%s'. Contents:\n%s", plist_file, contents)
            # Delete the plist file upon completion
            atexit.register(plist_file.unlink)

    def write_systemd_conf(self, service, binary):
        """
        Write a systemd unit for ``service`` when systemd has none loaded.

        :param str service:
            Unit name without the ``.service`` suffix.
        :param binary:
            Command to start, either a string or a list of arguments.
        """
        ret = self.proc.run("systemctl", "daemon-reload")
        self._check_retcode(ret)
        ret = self.proc.run("systemctl", "status", service)
        if ret.returncode in (3, 4):
            log.warning(
                "No systemd unit file was found for service %s. Creating one.", service
            )
            contents = textwrap.dedent(
                """\
                [Unit]
                Description={service}
                [Service]
                KillMode=process
                Type=notify
                NotifyAccess=all
                LimitNOFILE=8192
                ExecStart={tgt} -c {conf_dir}
                [Install]
                WantedBy=multi-user.target
                """
            )
            if isinstance(binary, list) and len(binary) == 1:
                # Prefer the absolute path when the command is on PATH
                binary = shutil.which(binary[0]) or binary[0]
            elif isinstance(binary, list):
                binary = " ".join(binary)
            unit_path = pathlib.Path(
                os.sep, "etc", "systemd", "system", f"{service}.service"
            )
            contents = contents.format(
                service=service, tgt=binary, conf_dir=self.conf_dir
            )
            log.info("Created '%s'. Contents:\n%s", unit_path, contents)
            unit_path.write_text(contents, encoding="utf-8")
            ret = self.proc.run("systemctl", "daemon-reload")
            # Delete the unit file upon completion
            atexit.register(unit_path.unlink)
            self._check_retcode(ret)

    def __enter__(self):
        if not self.no_install:
            if self.upgrade:
                self.install_previous()
            else:
                self.install()
        return self

    def __exit__(self, *_):
        if not self.no_uninstall:
            self.uninstall()
            self.assert_uninstalled()
class PkgSystemdSaltDaemonImpl(SystemdSaltDaemonImpl):
    """
    Systemd daemon implementation whose service name is the factory's
    script name.
    """

    def get_service_name(self):
        """
        Return the service name, caching it on first use.
        """
        if self._service_name is not None:
            return self._service_name
        self._service_name = self.factory.script_name
        return self._service_name
@attr.s(kw_only=True)
class PkgLaunchdSaltDaemonImpl(PkgSystemdSaltDaemonImpl):
    """
    Daemon implementation that drives the service through ``launchctl``.
    """

    plist_file = attr.ib()

    @plist_file.default
    def _default_plist_file(self):
        daemons_dir = pathlib.Path(os.sep, "Library", "LaunchDaemons")
        return daemons_dir / f"{self.get_service_name()}.plist"

    def get_service_name(self):
        """
        Return the launchd label, ``com.saltstack.salt.<name>``, where
        ``<name>`` is whatever follows the last ``-`` of the parent's service
        name.
        """
        if self._service_name is None:
            service_name = super().get_service_name()
            if "-" in service_name:
                service_name = service_name.split("-")[-1]
            self._service_name = f"com.saltstack.salt.{service_name}"
        return self._service_name

    def cmdline(self, *args):  # pylint: disable=arguments-differ
        """
        Construct a list of arguments to use when starting the subprocess.

        The service is enabled as a side effect; the returned command
        bootstraps it from the plist file.

        :param str args:
            Additional arguments to use when starting the subprocess
        """
        if args:  # pragma: no cover
            log.debug(
                "%s.run() is ignoring the passed in arguments: %r",
                self.__class__.__name__,
                args,
            )
        self._internal_run(
            "launchctl",
            "enable",
            f"system/{self.get_service_name()}",
        )
        return (
            "launchctl",
            "bootstrap",
            "system",
            str(self.plist_file),
        )

    def is_running(self):
        """
        Returns true if the sub-process is alive.
        """
        if self._process is None:
            ret = self._internal_run("launchctl", "list", self.get_service_name())
            # PID in a line that looks like this
            #   "PID" = 445;
            # Matching the digits tolerates spacing differences around "="
            # and ";" instead of failing on them.
            match = re.search(r'"PID"\s*=\s*(\d+)', ret.stdout or "")
            if match is None:
                return False
            try:
                self._process = psutil.Process(int(match.group(1)))
            except psutil.NoSuchProcess:
                # The process exited after launchctl reported it
                return False
        return self._process.is_running()

    def _terminate(self):
        """
        This method actually terminates the started daemon.
        """
        # We completely override the parent class method because we're not using
        # the self._terminal property, it's a launchd service
        if self._process is None:  # pragma: no cover
            if TYPE_CHECKING:
                # Make mypy happy
                assert self._terminal_result
            return (
                self._terminal_result
            )  # pylint: disable=access-member-before-definition

        atexit.unregister(self.terminate)
        log.info("Stopping %s", self.factory)
        pid = self.pid
        # Collect any child processes information before terminating the process
        with contextlib.suppress(psutil.NoSuchProcess):
            for child in psutil.Process(pid).children(recursive=True):
                if (
                    child not in self._children
                ):  # pylint: disable=access-member-before-definition
                    self._children.append(
                        child
                    )  # pylint: disable=access-member-before-definition

        if self._process.is_running():  # pragma: no cover
            cmdline = _get_cmdline(self._process)
        else:
            cmdline = []

        # Disable the service
        self._internal_run(
            "launchctl",
            "disable",
            f"system/{self.get_service_name()}",
        )
        # Unload the service
        self._internal_run("launchctl", "bootout", "system", str(self.plist_file))

        if self._process.is_running():  # pragma: no cover
            try:
                self._process.wait()
            except psutil.TimeoutExpired:
                self._process.terminate()
                try:
                    self._process.wait()
                except psutil.TimeoutExpired:
                    pass

        exitcode = self._process.wait() or 0

        # Dereference the internal _process attribute
        self._process = None
        # Lets log and kill any child processes left behind, including the main subprocess
        # if it failed to properly stop
        terminate_process(
            pid=pid,
            kill_children=True,
            children=self._children,  # pylint: disable=access-member-before-definition
            slow_stop=self.factory.slow_stop,
        )

        if self._terminal_stdout is not None:
            self._terminal_stdout.close()  # pylint: disable=access-member-before-definition
        if self._terminal_stderr is not None:
            self._terminal_stderr.close()  # pylint: disable=access-member-before-definition
        stdout = stderr = ""
        try:
            self._terminal_result = ProcessResult(
                returncode=exitcode, stdout=stdout, stderr=stderr, cmdline=cmdline
            )
            log.info("%s %s", self.factory.__class__.__name__, self._terminal_result)
            return self._terminal_result
        finally:
            self._terminal = None
            self._terminal_stdout = None
            self._terminal_stderr = None
            self._terminal_timeout = None
            self._children = []
@attr.s(kw_only=True)
class PkgSsmSaltDaemonImpl(PkgSystemdSaltDaemonImpl):
    """
    Daemon implementation that drives the service through the ``ssm`` binary
    of the factory's ``salt_pkg_install``.
    """

    def cmdline(self, *args):  # pylint: disable=arguments-differ
        """
        Construct a list of arguments to use when starting the subprocess.

        :param str args:
            Additional arguments to use when starting the subprocess
        """
        if args:  # pragma: no cover
            log.debug(
                "%s.run() is ignoring the passed in arguments: %r",
                self.__class__.__name__,
                args,
            )
        ssm = str(self.factory.salt_pkg_install.ssm_bin)
        return (ssm, "start", self.get_service_name())

    def is_running(self):
        """
        Returns true if the sub-process is alive.

        When the process is not known yet, ``ssm processes`` is polled once a
        second, at most 120 times, until it prints something; ``False`` is
        returned when it never does.
        """
        if self._process is None:
            for _attempt in range(120):
                time.sleep(1)
                ret = self._internal_run(
                    str(self.factory.salt_pkg_install.ssm_bin),
                    "processes",
                    self.get_service_name(),
                )
                log.warning(ret)
                if not ret.stdout or not ret.stdout.strip():
                    # Nothing reported yet, try again
                    continue
                for line in ret.stdout.splitlines():
                    log.warning("Line: %s", line)
                    if not line.strip():
                        continue
                    # The first field of the first non-blank line is the PID
                    mainpid = line.strip().split()[0]
                    self._process = psutil.Process(int(mainpid))
                    break
                break
            else:
                return False
        return self._process.is_running()

    def _terminate(self):
        """
        This method actually terminates the started daemon.
        """
        # We completely override the parent class method because we're not using the
        # self._terminal property, the service is managed through ssm
        if self._process is None:  # pragma: no cover
            if TYPE_CHECKING:
                # Make mypy happy
                assert self._terminal_result
            return (
                self._terminal_result
            )  # pylint: disable=access-member-before-definition

        atexit.unregister(self.terminate)
        log.info("Stopping %s", self.factory)
        pid = self.pid
        # Collect any child processes information before terminating the process
        with contextlib.suppress(psutil.NoSuchProcess):
            for child in psutil.Process(pid).children(recursive=True):
                if (
                    child not in self._children
                ):  # pylint: disable=access-member-before-definition
                    self._children.append(
                        child
                    )  # pylint: disable=access-member-before-definition

        cmdline = []
        if self._process.is_running():  # pragma: no cover
            cmdline = _get_cmdline(self._process)

        # Tell ssm to stop the service
        with contextlib.suppress(FileNotFoundError):
            self._internal_run(
                str(self.factory.salt_pkg_install.ssm_bin),
                "stop",
                self.get_service_name(),
            )

        if self._process.is_running():  # pragma: no cover
            try:
                self._process.wait()
            except psutil.TimeoutExpired:
                self._process.terminate()
                with contextlib.suppress(psutil.TimeoutExpired):
                    self._process.wait()

        exitcode = self._process.wait() or 0

        # Dereference the internal _process attribute
        self._process = None
        # Lets log and kill any child processes left behind, including the main subprocess
        # if it failed to properly stop
        terminate_process(
            pid=pid,
            kill_children=True,
            children=self._children,  # pylint: disable=access-member-before-definition
            slow_stop=self.factory.slow_stop,
        )

        if self._terminal_stdout is not None:
            self._terminal_stdout.close()  # pylint: disable=access-member-before-definition
        if self._terminal_stderr is not None:
            self._terminal_stderr.close()  # pylint: disable=access-member-before-definition
        stdout = stderr = ""
        try:
            self._terminal_result = ProcessResult(
                returncode=exitcode, stdout=stdout, stderr=stderr, cmdline=cmdline
            )
            log.info("%s %s", self.factory.__class__.__name__, self._terminal_result)
            return self._terminal_result
        finally:
            self._terminal = None
            self._terminal_stdout = None
            self._terminal_stderr = None
            self._terminal_timeout = None
            self._children = []
@attr.s(kw_only=True)
class PkgMixin:
    """
    Mixin that points a factory at the binaries of a ``SaltPkgInstall``.
    """

    salt_pkg_install: SaltPkgInstall = attr.ib()

    def get_script_path(self):
        """
        Return ``run_root`` for compressed installs, otherwise defer to the
        parent class.
        """
        if not self.salt_pkg_install.compressed:
            return super().get_script_path()
        return str(self.salt_pkg_install.run_root)

    def get_base_script_args(self):
        """
        Return the parent's base arguments, prefixed with the sub-command
        name when running from a compressed install.
        """
        leading = []
        if self.salt_pkg_install.compressed and self.script_name != "salt":
            if self.script_name == "spm":
                leading.append(self.script_name)
            else:
                # "salt-call" -> "call"
                leading.append(self.script_name.split("salt-")[-1])
        leading.extend(super().get_base_script_args())
        return leading

    def cmdline(self, *args, **kwargs):
        """
        Build the command line, dropping a leading python executable when
        running from a compressed install.
        """
        built = super().cmdline(*args, **kwargs)
        drop_python = (
            self.salt_pkg_install.compressed is not False
            and built[0] == self.python_executable
        )
        if drop_python:
            built.pop(0)
        return built
@attr.s(kw_only=True)
class DaemonPkgMixin(PkgMixin):
    """
    ``PkgMixin`` for daemons: writes the service definition when the daemon
    is to run as a system service.
    """

    def __attrs_post_init__(self):
        # Nothing is written on Windows, or when not running as a system
        # service.
        if platform.is_windows() or not self.salt_pkg_install.system_service:
            return
        if platform.is_darwin():
            self.write_launchd_conf()
        else:
            self.write_systemd_conf()

    def get_service_name(self):
        """
        Return the service name, which is the script name.
        """
        return self.script_name

    def write_launchd_conf(self):
        """
        Write the launchd configuration; subclasses must implement this.
        """
        raise NotImplementedError

    def write_systemd_conf(self):
        """
        Write the systemd configuration; subclasses must implement this.
        """
        raise NotImplementedError
@attr.s(kw_only=True)
class SaltMaster(DaemonPkgMixin, master.SaltMaster):
    """
    Subclassed just to tweak the binary paths if needed and factory classes.
    """

    def __attrs_post_init__(self):
        self.script_name = "salt-master"
        master.SaltMaster.__attrs_post_init__(self)
        DaemonPkgMixin.__attrs_post_init__(self)

    def _get_impl_class(self):
        """
        Return the daemon implementation class for the current platform.
        """
        as_service = self.system_install and self.salt_pkg_install.system_service
        if not as_service:
            return DaemonImpl
        if platform.is_windows():
            return PkgSsmSaltDaemonImpl
        if platform.is_darwin():
            return PkgLaunchdSaltDaemonImpl
        return PkgSystemdSaltDaemonImpl

    def write_launchd_conf(self):
        self.salt_pkg_install.write_launchd_conf("master")

    def write_systemd_conf(self):
        binary = self.salt_pkg_install.binary_paths["master"]
        self.salt_pkg_install.write_systemd_conf("salt-master", binary)

    def salt_minion_daemon(self, minion_id, **kwargs):
        """
        Same as the parent method, using the package-aware minion factory.
        """
        return super().salt_minion_daemon(
            minion_id,
            factory_class=SaltMinion,
            salt_pkg_install=self.salt_pkg_install,
            **kwargs,
        )

    def salt_api_daemon(self, **kwargs):
        """
        Same as the parent method, using the package-aware api factory.
        """
        return super().salt_api_daemon(
            factory_class=SaltApi,
            salt_pkg_install=self.salt_pkg_install,
            **kwargs,
        )

    def salt_key_cli(self, **factory_class_kwargs):
        """
        Same as the parent method, using the package-aware ``salt-key`` factory.
        """
        return super().salt_key_cli(
            factory_class=SaltKey,
            salt_pkg_install=self.salt_pkg_install,
            **factory_class_kwargs,
        )

    def salt_cli(self, **factory_class_kwargs):
        """
        Same as the parent method, using the package-aware ``salt`` factory.
        """
        return super().salt_cli(
            factory_class=SaltCli,
            salt_pkg_install=self.salt_pkg_install,
            **factory_class_kwargs,
        )
@attr.s(kw_only=True, slots=True)
class SaltMinion(DaemonPkgMixin, minion.SaltMinion):
    """
    Subclassed just to tweak the binary paths if needed and factory classes.
    """

    def __attrs_post_init__(self):
        self.script_name = "salt-minion"
        minion.SaltMinion.__attrs_post_init__(self)
        DaemonPkgMixin.__attrs_post_init__(self)

    def _get_impl_class(self):
        """
        Return the daemon implementation class for the current platform.
        """
        as_service = self.system_install and self.salt_pkg_install.system_service
        if not as_service:
            return DaemonImpl
        if platform.is_windows():
            return PkgSsmSaltDaemonImpl
        if platform.is_darwin():
            return PkgLaunchdSaltDaemonImpl
        return PkgSystemdSaltDaemonImpl

    def write_launchd_conf(self):
        self.salt_pkg_install.write_launchd_conf("minion")

    def write_systemd_conf(self):
        binary = self.salt_pkg_install.binary_paths["minion"]
        self.salt_pkg_install.write_systemd_conf("salt-minion", binary)

    def salt_call_cli(self, **factory_class_kwargs):
        """
        Same as the parent method, using the package-aware ``salt-call`` factory.
        """
        return super().salt_call_cli(
            factory_class=SaltCall,
            salt_pkg_install=self.salt_pkg_install,
            **factory_class_kwargs,
        )
@attr.s(kw_only=True, slots=True)
class SaltApi(DaemonPkgMixin, api.SaltApi):
    """
    Subclassed just to tweak the binary paths if needed.
    """

    def __attrs_post_init__(self):
        self.script_name = "salt-api"
        api.SaltApi.__attrs_post_init__(self)
        DaemonPkgMixin.__attrs_post_init__(self)

    def _get_impl_class(self):
        """
        Return the daemon implementation class for the current platform.
        """
        as_service = self.system_install and self.salt_pkg_install.system_service
        if not as_service:
            return DaemonImpl
        if platform.is_windows():
            return PkgSsmSaltDaemonImpl
        if platform.is_darwin():
            return PkgLaunchdSaltDaemonImpl
        return PkgSystemdSaltDaemonImpl

    def write_launchd_conf(self):
        self.salt_pkg_install.write_launchd_conf("api")

    def write_systemd_conf(self):
        binary = self.salt_pkg_install.binary_paths["api"]
        self.salt_pkg_install.write_systemd_conf("salt-api", binary)
@attr.s(kw_only=True, slots=True)
class SaltCall(PkgMixin, call.SaltCall):
    """
    ``salt-call`` factory that runs the binaries of the package install.
    """

    def __attrs_post_init__(self):
        # The script name is set after the parent's post-init has run.
        call.SaltCall.__attrs_post_init__(self)
        self.script_name = "salt-call"
@attr.s(kw_only=True, slots=True)
class SaltCli(PkgMixin, salt.SaltCli):
    """
    ``salt`` factory that runs the binaries of the package install.
    """

    def __attrs_post_init__(self):
        # The script name is set before the parent's post-init runs.
        self.script_name = "salt"
        salt.SaltCli.__attrs_post_init__(self)
@attr.s(kw_only=True, slots=True)
class SaltKey(PkgMixin, key.SaltKey):
    """
    ``salt-key`` factory that runs the binaries of the package install.
    """

    def __attrs_post_init__(self):
        # The script name is set before the parent's post-init runs.
        self.script_name = "salt-key"
        key.SaltKey.__attrs_post_init__(self)
@attr.s(kw_only=True, slots=True)
class TestUser:
    """
    Add a test user

    Used as a context manager, the account is created on enter and deleted
    on exit.
    """

    salt_call_cli = attr.ib()
    username = attr.ib(default="saltdev")
    # Must follow Windows Password Complexity requirements
    password = attr.ib(default="P@ssW0rd")
    _pw_record = attr.ib(init=False, repr=False, default=None)

    def salt_call_local(self, *args):
        """
        Run ``salt-call --local`` with ``args`` and return the result data.

        The result is logged and an ``AssertionError`` is raised when the
        return code is not 0.
        """
        ret = self.salt_call_cli.run("--local", *args)
        if ret.returncode != 0:
            log.error(ret)
        assert ret.returncode == 0
        return ret.data

    def add_user(self):
        """
        Create the account and set its password.

        :raises RuntimeError:
            On non-Windows platforms when the ``crypt`` module is not
            available to hash the password.
        """
        log.debug("Adding system account %r", self.username)
        if platform.is_windows():
            self.salt_call_local("user.add", self.username, self.password)
        else:
            if not HAS_CRYPT:
                # Fail before the account is created instead of leaving a
                # password-less account behind.
                raise RuntimeError(
                    "The 'crypt' module is required to hash the password "
                    f"of the test account {self.username!r}"
                )
            self.salt_call_local("user.add", self.username)
            hash_passwd = crypt.crypt(self.password, crypt.mksalt(crypt.METHOD_SHA512))
            self.salt_call_local("shadow.set_password", self.username, hash_passwd)
        assert self.username in self.salt_call_local("user.list_users")

    def remove_user(self):
        """
        Delete the account.
        """
        log.debug("Removing system account %r", self.username)
        if platform.is_windows():
            self.salt_call_local(
                "user.delete", self.username, "purge=True", "force=True"
            )
        else:
            self.salt_call_local("user.delete", self.username, "remove=True")

    @property
    def pw_record(self):
        """
        The password database entry of the account, looked up once and
        cached; ``None`` when the ``pwd`` module is not available.
        """
        if self._pw_record is None and HAS_PWD:
            self._pw_record = pwd.getpwnam(self.username)
        return self._pw_record

    @property
    def uid(self):
        """
        The user id of the account, ``None`` without the ``pwd`` module.
        """
        if HAS_PWD:
            return self.pw_record.pw_uid
        return None

    @property
    def gid(self):
        """
        The group id of the account, ``None`` without the ``pwd`` module.
        """
        if HAS_PWD:
            return self.pw_record.pw_gid
        return None

    @property
    def env(self):
        """
        A copy of the current environment adjusted for the account.

        ``HOME`` is only overridden when a password database entry is
        available.
        """
        environ = os.environ.copy()
        environ["LOGNAME"] = environ["USER"] = self.username
        record = self.pw_record
        if record is not None:
            environ["HOME"] = record.pw_dir
        return environ

    def __enter__(self):
        self.add_user()
        return self

    def __exit__(self, *_):
        self.remove_user()
@attr.s(kw_only=True, slots=True)
class ApiRequest:
    """
    Small client for the ``salt-api`` REST interface, authenticating as the
    given test account.
    """

    salt_api: SaltApi = attr.ib(repr=False)
    test_account: TestUser = attr.ib(repr=False)
    session: requests.Session = attr.ib(init=False, repr=False)
    api_uri: str = attr.ib(init=False)
    auth_data: Dict[str, str] = attr.ib(init=False)

    @session.default
    def _default_session(self):
        return requests.Session()

    @api_uri.default
    def _default_api_uri(self):
        port = self.salt_api.config["rest_cherrypy"]["port"]
        return f"http://localhost:{port}"

    @auth_data.default
    def _default_auth_data(self):
        account = self.test_account
        return {
            "username": account.username,
            "password": account.password,
            "eauth": "auto",
            "out": "json",
        }

    def post(self, url, data):
        """
        Post ``data`` together with the authentication data and return the
        value reported for the first entry of the first return.

        Note that the request always goes to ``/run``; ``url`` is not used.
        """
        payload = dict(**self.auth_data, **data)
        response = self.session.post(f"{self.api_uri}/run", data=payload)
        first_return = response.json()["return"][0]
        first_key = next(iter(first_return))
        return first_return[first_key]

    def __enter__(self):
        self.session.__enter__()
        return self

    def __exit__(self, *args):
        self.session.__exit__(*args)
@pytest.helpers.register
def remove_stale_minion_key(master, minion_id):
    """
    Delete the accepted key of ``minion_id`` from the master's pki directory,
    logging a debug message when there is none.
    """
    key_path = os.path.join(master.config["pki_dir"], "minions", minion_id)
    if not os.path.exists(key_path):
        log.debug("The minion(id=%r) key was not found at %s", minion_id, key_path)
        return
    os.unlink(key_path)
@pytest.helpers.register
def remove_stale_master_key(master):
    """
    Delete the master's key pair and the ``minion_master.pub`` copy from its
    pki directory, logging a debug message for each file that is missing.
    """
    pki_dir = master.config["pki_dir"]
    master_keys_dir = os.path.join(pki_dir, "master")
    for key_name in ("master.pem", "master.pub"):
        key_path = os.path.join(master_keys_dir, key_name)
        if not os.path.exists(key_path):
            log.debug(
                "The master(id=%r) %s key was not found at %s",
                master.id,
                key_name,
                key_path,
            )
            continue
        os.unlink(key_path)

    key_path = os.path.join(pki_dir, "minion", "minion_master.pub")
    if not os.path.exists(key_path):
        log.debug(
            "The master(id=%r) minion_master.pub key was not found at %s",
            master.id,
            key_path,
        )
        return
    os.unlink(key_path)