salt/tools/utils/__init__.py
2025-01-29 15:22:23 -07:00

432 lines
12 KiB
Python

# pylint: disable=resource-leakage,broad-except,3rd-party-module-not-gated,bad-whitespace
from __future__ import annotations
import hashlib
import json
import os
import pathlib
import shutil
import sys
from enum import IntEnum
from functools import cache
import attr
import packaging.version
import yaml
from ptscripts import Context
from rich.progress import (
BarColumn,
Column,
DownloadColumn,
Progress,
TextColumn,
TimeRemainingColumn,
TransferSpeedColumn,
)
if sys.version_info < (3, 11):
from typing_extensions import TypedDict
else:
from typing import TypedDict # pylint: disable=no-name-in-module
REPO_ROOT = pathlib.Path(__file__).resolve().parent.parent.parent
GPG_KEY_FILENAME = "SALT-PROJECT-GPG-PUBKEY-2023"
SPB_ENVIRONMENT = os.environ.get("SPB_ENVIRONMENT") or "test"
STAGING_BUCKET_NAME = f"salt-project-{SPB_ENVIRONMENT}-salt-artifacts-staging"
RELEASE_BUCKET_NAME = f"salt-project-{SPB_ENVIRONMENT}-salt-artifacts-release"
BACKUP_BUCKET_NAME = f"salt-project-{SPB_ENVIRONMENT}-salt-artifacts-backup"
SHARED_WORKFLOW_CONTEXT_FILEPATH = (
REPO_ROOT / "cicd" / "shared-gh-workflows-context.yml"
)
class ExitCode(IntEnum):
OK = 0
FAIL = 1
SOFT_FAIL = 2
@attr.s(frozen=True, slots=True)
class OS:
platform: str = attr.ib()
slug: str = attr.ib()
arch: str = attr.ib()
display_name: str = attr.ib(default=None)
pkg_type: str = attr.ib(default=None)
@arch.default
def _default_arch(self):
return self._get_default_arch()
def _get_default_arch(self):
if "aarch64" in self.slug:
return "arm64"
if "arm64" in self.slug:
return "arm64"
return "x86_64"
@attr.s(frozen=True, slots=True)
class Linux(OS):
platform: str = attr.ib(default="linux")
fips: bool = attr.ib(default=False)
container: str = attr.ib(default=None)
@property
def job_name(self):
return f"test-{ self.slug.replace('.', '') }{'-fips' if self.fips else ''}"
def as_dict(self):
return {
"platform": self.platform,
"slug": self.slug,
"arch": self.arch,
"display_name": self.display_name,
"pkg_type": self.pkg_type,
"fips": self.fips,
"container": self.container,
"job_name": self.job_name,
}
@attr.s(frozen=True, slots=True)
class LinuxPkg(Linux):
@property
def job_name(self):
return f"test-pkg-{ self.slug.replace('.', '') }{ '-fips' if self.fips else ''}"
@attr.s(frozen=True, slots=True)
class MacOS(OS):
runner: str = attr.ib()
platform: str = attr.ib(default="macos")
@runner.default
def _default_runner(self):
return self.slug
@property
def job_name(self):
return f"test-{ self.slug.replace('.', '') }"
def as_dict(self):
return {
"platform": self.platform,
"slug": self.slug,
"arch": self.arch,
"display_name": self.display_name,
"pkg_type": self.pkg_type,
"runner": self.runner,
"job_name": self.job_name,
}
@attr.s(frozen=True, slots=True)
class MacOSPkg(MacOS):
@property
def job_name(self):
return f"test-pkg-{ self.slug.replace('.', '') }"
@attr.s(frozen=True, slots=True)
class Windows(OS):
platform: str = attr.ib(default="windows")
def _get_default_arch(self):
return "amd64"
@property
def job_name(self):
return f"test-{ self.slug.replace('.', '') }"
def as_dict(self):
return {
"platform": self.platform,
"slug": self.slug,
"arch": self.arch,
"display_name": self.display_name,
"pkg_type": self.pkg_type,
"job_name": self.job_name,
}
@attr.s(frozen=True, slots=True)
class WindowsPkg(Windows):
@property
def job_name(self):
return f"test-pkg-{ self.slug.replace('.', '') }-{ self.pkg_type.lower() }"
class PlatformDefinitions(TypedDict):
linux: list[Linux]
macos: list[MacOS]
windows: list[Windows]
def create_progress_bar(file_progress: bool = False, **kwargs):
if file_progress:
return Progress(
TextColumn("[progress.description]{task.description}"),
BarColumn(),
DownloadColumn(),
TransferSpeedColumn(),
TextColumn("eta"),
TimeRemainingColumn(),
**kwargs,
)
return Progress(
TextColumn(
"[progress.description]{task.description}", table_column=Column(ratio=3)
),
BarColumn(),
expand=True,
**kwargs,
)
def export_gpg_key(ctx: Context, key_id: str, export_path: pathlib.Path):
keyfile_gpg = export_path.joinpath(GPG_KEY_FILENAME).with_suffix(".gpg")
if keyfile_gpg.exists():
keyfile_gpg.unlink()
ctx.info(f"Exporting GnuPG Key '{key_id}' to {keyfile_gpg} ...")
ctx.run("gpg", "--output", str(keyfile_gpg), "--export", key_id)
keyfile_pub = export_path.joinpath(GPG_KEY_FILENAME).with_suffix(".pub")
if keyfile_pub.exists():
keyfile_pub.unlink()
ctx.info(f"Exporting GnuPG Key '{key_id}' to {keyfile_pub} ...")
ctx.run("gpg", "--armor", "--output", str(keyfile_pub), "--export", key_id)
def gpg_sign(ctx: Context, key_id: str, path: pathlib.Path):
ctx.info(f"GPG Signing '{path}' ...")
signature_fpath = path.parent / f"{path.name}.asc"
if signature_fpath.exists():
signature_fpath.unlink()
ctx.run(
"gpg",
"--local-user",
key_id,
"--output",
str(signature_fpath),
"--armor",
"--detach-sign",
"--sign",
str(path),
)
class Version(packaging.version.Version):
def __lt__(self, other):
if not isinstance(other, self.__class__):
other = self.__class__(other)
return super().__lt__(other)
def __le__(self, other):
if not isinstance(other, self.__class__):
other = self.__class__(other)
return super().__le__(other)
def __eq__(self, other):
if not isinstance(other, self.__class__):
other = self.__class__(other)
return super().__eq__(other)
def __ge__(self, other):
if not isinstance(other, self.__class__):
other = self.__class__(other)
return super().__ge__(other)
def __gt__(self, other):
if not isinstance(other, self.__class__):
other = self.__class__(other)
return super().__gt__(other)
def __ne__(self, other):
if not isinstance(other, self.__class__):
other = self.__class__(other)
return super().__ne__(other)
def __str__(self):
return super().__str__().replace(".post", "-")
def __hash__(self):
return hash(str(self))
def get_salt_releases(
ctx: Context, repository: str = "saltstack/salt"
) -> list[Version]:
"""
Return a list of salt versions
"""
# Deferred import
import tools.utils.gh
ctx.info(f"Collecting salt releases from repository '{repository}'")
versions = set()
with ctx.web as web:
headers = {
"Accept": "application/vnd.github+json",
}
github_token = tools.utils.gh.get_github_token(ctx)
if github_token is not None:
headers["Authorization"] = f"Bearer {github_token}"
web.headers.update(headers)
ret = web.get(f"https://api.github.com/repos/{repository}/tags")
if ret.status_code != 200:
ctx.error(
f"Failed to get the tags for repository {repository!r}: {ret.reason}"
)
ctx.exit(1)
for tag in ret.json():
name = tag["name"]
if name.startswith("v"):
name = name[1:]
if "docs" in name:
# We're not going to consider doc tags
continue
versions.add(Version(name))
# Now let's go through the github releases
ret = web.get(f"https://api.github.com/repos/{repository}/releases")
if ret.status_code != 200:
ctx.error(
f"Failed to get the releases for repository {repository!r}: {ret.reason}"
)
ctx.exit(1)
for release in ret.json():
name = release["name"]
if name.startswith("v"):
name = name[1:]
if name and "docs" not in name:
# We're not going to parse docs releases
versions.add(Version(name))
name = release["tag_name"]
if "docs" not in name:
# We're not going to parse docs releases
versions.add(Version(name))
return sorted(versions)
def parse_versions(*versions: str) -> list[Version]:
_versions = []
for version in set(versions):
if version == "latest":
continue
_versions.append(Version(version))
if _versions:
_versions.sort(reverse=True)
return _versions
def get_file_checksum(fpath: pathlib.Path, hash_name: str) -> str:
with fpath.open("rb") as rfh:
try:
digest = hashlib.file_digest(rfh, hash_name) # type: ignore[attr-defined]
except AttributeError:
# Python < 3.11
buf = bytearray(2**18) # Reusable buffer to reduce allocations.
view = memoryview(buf)
digest = getattr(hashlib, hash_name)()
while True:
size = rfh.readinto(buf)
if size == 0:
break # EOF
digest.update(view[:size])
hexdigest: str = digest.hexdigest()
return hexdigest
def download_file(
ctx: Context,
url: str,
dest: pathlib.Path,
auth: tuple[str, str] | None = None,
headers: dict[str, str] | None = None,
) -> pathlib.Path:
ctx.info(f"Downloading {dest.name!r} @ {url} ...")
if sys.platform == "win32":
# We don't want to use curl on Windows, it doesn't work
curl = None
else:
curl = shutil.which("curl")
if curl is not None:
command = [curl, "-sS", "-L"]
if headers:
for key, value in headers.items():
command.extend(["-H", f"{key}: {value}"])
command.extend(["-o", str(dest), url])
ret = ctx.run(*command)
if ret.returncode:
ctx.error(f"Failed to download {url}")
ctx.exit(1)
return dest
wget = shutil.which("wget")
if wget is not None:
with ctx.chdir(dest.parent):
command = [wget, "--no-verbose"]
if headers:
for key, value in headers.items():
command.append(f"--header={key}: {value}")
command.append(url)
ret = ctx.run(*command)
if ret.returncode:
ctx.error(f"Failed to download {url}")
ctx.exit(1)
return dest
# NOTE the stream=True parameter below
with ctx.web as web:
if headers:
web.headers.update(headers)
with web.get(url, stream=True, auth=auth) as r:
r.raise_for_status()
with dest.open("wb") as f:
for chunk in r.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
return dest
def get_platform_and_arch_from_slug(slug: str) -> tuple[str, str]:
if "windows" in slug:
platform = "windows"
arch = "amd64"
elif "macos" in slug:
platform = "macos"
if "macos-13" in slug and "xlarge" in slug:
arch = "arm64"
else:
arch = "x86_64"
else:
platform = "linux"
if "arm64" in slug:
arch = "arm64"
elif "aarch64" in slug:
arch = "arm64"
else:
arch = "x86_64"
return platform, arch
@cache
def get_cicd_shared_context():
"""
Return the CI/CD shared context file contents.
"""
shared_context_file = REPO_ROOT / "cicd" / "shared-gh-workflows-context.yml"
return yaml.safe_load(shared_context_file.read_text())
@cache
def get_golden_images():
"""
Return the golden images information stored on file.
"""
with REPO_ROOT.joinpath("cicd", "golden-images.json").open(
"r", encoding="utf-8"
) as rfh:
return json.load(rfh)