initial removal of python librarys that only exist in debian system python, not done yet

remove unused functions

add pkg.which for deb packages. an item from pkng that should have spread to systems that support it

move SourcesList and SourceEntry to salt.utils.pkg.deb where it belongs.

fix pkg.which test hopfully coreutils is installed

first attempt at fixng #65703

add first changelogs

fix the indexing issue with deb opts by using OrderedDict instead

move salt.utils.pkg.deb tests to functional since not actually unit test.

use example.com instead of real repo in tests.

fix changelog 65703

added changelog for 66201

fix two to many toos in changelog
This commit is contained in:
Thomas Phipps 2024-02-28 19:44:33 +00:00 committed by Thomas Phipps
parent d036b1177e
commit aabba2793a
9 changed files with 443 additions and 502 deletions

1
changelog/65703.fixed.md Normal file
View file

@ -0,0 +1 @@
fix 65703 by using OrderedDict instead of a index that breaks. .

View file

@ -0,0 +1 @@
re-work the aptpkg module to remove system libraries that onedir and virtualenvs do not have access. Streamline testing, and code use to needed libraries only.

1
changelog/66201.added.md Normal file
View file

@ -0,0 +1 @@
added pkg.which to aptpkg, for finding which package installed a file.

View file

@ -6,8 +6,6 @@ Support for APT (Advanced Packaging Tool)
minion, and it is using a different module (or gives an error similar to
*'pkg.install' is not available*), see :ref:`here
<module-provider-override>`.
For repository management, the ``python-apt`` package must be installed.
"""
import copy
@ -18,7 +16,6 @@ import os
import pathlib
import re
import shutil
import tempfile
import time
from urllib.error import HTTPError
from urllib.request import Request as _Request
@ -47,32 +44,10 @@ from salt.exceptions import (
SaltInvocationError,
)
from salt.modules.cmdmod import _parse_env
from salt.utils.pkg.deb import SourceEntry, SourcesList
log = logging.getLogger(__name__)
# pylint: disable=import-error
try:
from aptsources.sourceslist import SourceEntry, SourcesList
HAS_APT = True
except ImportError:
HAS_APT = False
try:
import apt_pkg
HAS_APTPKG = True
except ImportError:
HAS_APTPKG = False
try:
import softwareproperties.ppa
HAS_SOFTWAREPROPERTIES = True
except ImportError:
HAS_SOFTWAREPROPERTIES = False
# pylint: enable=import-error
APT_LISTS_PATH = "/var/lib/apt/lists"
PKG_ARCH_SEPARATOR = ":"
@ -120,212 +95,6 @@ def __init__(opts):
os.environ.update(DPKG_ENV_VARS)
def _invalid(line):
"""
This is a workaround since python3-apt does not support
the signed-by argument. This function was removed from
the class to ensure users using the python3-apt module or
not can use the signed-by option.
"""
disabled = False
invalid = False
comment = ""
line = line.strip()
if not line:
invalid = True
return disabled, invalid, comment, ""
if line.startswith("#"):
disabled = True
line = line[1:]
idx = line.find("#")
if idx > 0:
comment = line[idx + 1 :]
line = line[:idx]
cdrom_match = re.match(r"(.*)(cdrom:.*/)(.*)", line.strip())
if cdrom_match:
repo_line = (
[p.strip() for p in cdrom_match.group(1).split()]
+ [cdrom_match.group(2).strip()]
+ [p.strip() for p in cdrom_match.group(3).split()]
)
else:
repo_line = line.strip().split()
if (
not repo_line
or repo_line[0] not in ["deb", "deb-src", "rpm", "rpm-src"]
or len(repo_line) < 3
):
invalid = True
return disabled, invalid, comment, repo_line
if repo_line[1].startswith("["):
if not any(x.endswith("]") for x in repo_line[1:]):
invalid = True
return disabled, invalid, comment, repo_line
return disabled, invalid, comment, repo_line
if not HAS_APT:
class SourceEntry: # pylint: disable=function-redefined
def __init__(self, line, file=None):
self.invalid = False
self.comps = []
self.disabled = False
self.comment = ""
self.dist = ""
self.type = ""
self.uri = ""
self.line = line
self.architectures = []
self.signedby = ""
self.file = file
if not self.file:
self.file = str(pathlib.Path(os.sep, "etc", "apt", "sources.list"))
self._parse_sources(line)
def str(self):
return self.repo_line()
def repo_line(self):
"""
Return the repo line for the sources file
"""
repo_line = []
if self.invalid:
return self.line
if self.disabled:
repo_line.append("#")
repo_line.append(self.type)
opts = _get_opts(self.line)
if self.architectures:
archs = ",".join(self.architectures)
opts["arch"]["full"] = f"arch={archs}"
opts["arch"]["value"] = self.architectures
if self.signedby:
opts["signedby"]["full"] = f"signed-by={self.signedby}"
opts["signedby"]["value"] = self.signedby
ordered_opts = [
opt_type for opt_type, opt in opts.items() if opt["full"] != ""
]
for opt in opts.values():
if opt["full"] != "":
ordered_opts[opt["index"]] = opt["full"]
if ordered_opts:
repo_line.append("[{}]".format(" ".join(ordered_opts)))
repo_line += [self.uri, self.dist, " ".join(self.comps)]
if self.comment:
repo_line.append(f"#{self.comment}")
return " ".join(repo_line) + "\n"
def _parse_sources(self, line):
"""
Parse lines from sources files
"""
self.disabled, self.invalid, self.comment, repo_line = _invalid(line)
if self.invalid:
return False
if repo_line[1].startswith("["):
repo_line = [x for x in (line.strip("[]") for line in repo_line) if x]
opts = _get_opts(self.line)
self.architectures.extend(opts["arch"]["value"])
self.signedby = opts["signedby"]["value"]
for opt in opts:
opt = opts[opt]["full"]
if opt:
try:
repo_line.pop(repo_line.index(opt))
except ValueError:
repo_line.pop(repo_line.index("[" + opt + "]"))
self.type = repo_line[0]
self.uri = repo_line[1]
self.dist = repo_line[2]
self.comps = repo_line[3:]
return True
class SourcesList: # pylint: disable=function-redefined
def __init__(self):
self.list = []
self.files = [
pathlib.Path(os.sep, "etc", "apt", "sources.list"),
pathlib.Path(os.sep, "etc", "apt", "sources.list.d"),
]
for file in self.files:
if file.is_dir():
for fp in file.glob("**/*.list"):
self.add_file(file=fp)
else:
self.add_file(file)
def __iter__(self):
yield from self.list
def add_file(self, file):
"""
Add the lines of a file to self.list
"""
if file.is_file():
with salt.utils.files.fopen(str(file)) as source:
for line in source:
self.list.append(SourceEntry(line, file=str(file)))
else:
log.debug("The apt sources file %s does not exist", file)
def add(self, type, uri, dist, orig_comps, architectures, signedby):
opts_count = []
opts_line = ""
if architectures:
architectures = "arch={}".format(",".join(architectures))
opts_count.append(architectures)
if signedby:
signedby = f"signed-by={signedby}"
opts_count.append(signedby)
if len(opts_count) > 1:
opts_line = "[" + " ".join(opts_count) + "]"
elif len(opts_count) == 1:
opts_line = "[" + "".join(opts_count) + "]"
repo_line = [
type,
opts_line,
uri,
dist,
" ".join(orig_comps),
]
return SourceEntry(" ".join([line for line in repo_line if line.strip()]))
def remove(self, source):
"""
remove a source from the list of sources
"""
self.list.remove(source)
def save(self):
"""
write all of the sources from the list of sources
to the file.
"""
filemap = {}
with tempfile.TemporaryDirectory() as tmpdir:
for source in self.list:
fname = pathlib.Path(tmpdir, pathlib.Path(source.file).name)
with salt.utils.files.fopen(str(fname), "a") as fp:
fp.write(source.repo_line())
if source.file not in filemap:
filemap[source.file] = {"tmp": fname}
for fp in filemap:
shutil.move(str(filemap[fp]["tmp"]), fp)
def _get_ppa_info_from_launchpad(owner_name, ppa_name):
"""
Idea from softwareproperties.ppa.
@ -338,21 +107,12 @@ def _get_ppa_info_from_launchpad(owner_name, ppa_name):
:return:
"""
lp_url = "https://launchpad.net/api/1.0/~{}/+archive/{}".format(
owner_name, ppa_name
)
lp_url = f"https://launchpad.net/api/1.0/~{owner_name}/+archive/{ppa_name}"
request = _Request(lp_url, headers={"Accept": "application/json"})
lp_page = _urlopen(request)
return salt.utils.json.load(lp_page)
def _reconstruct_ppa_name(owner_name, ppa_name):
"""
Stringify PPA name from args.
"""
return f"ppa:{owner_name}/{ppa_name}"
def _call_apt(args, scope=True, **kwargs):
"""
Call apt* utilities.
@ -383,18 +143,6 @@ def _call_apt(args, scope=True, **kwargs):
return cmd_ret
def _warn_software_properties(repo):
"""
Warn of missing python-software-properties package.
"""
log.warning(
"The 'python-software-properties' package is not installed. "
"For more accurate support of PPA repositories, you should "
"install this package."
)
log.warning("Best guess at ppa format: %s", repo)
def normalize_name(name):
"""
Strips the architecture from the specified package name, if necessary.
@ -635,9 +383,7 @@ def refresh_db(cache_valid_time=0, failhard=False, **kwargs):
error_repos.append(ident)
if failhard and error_repos:
raise CommandExecutionError(
"Error getting repos: {}".format(", ".join(error_repos))
)
raise CommandExecutionError(f"Error getting repos: {', '.join(error_repos)}")
return ret
@ -863,22 +609,23 @@ def install(
)
else:
pkg_params_items = []
for pkg_source in pkg_params:
if "lowpkg.bin_pkg_info" in __salt__:
# we don't need to do the test below for every package in the list.
# it either exists or doesn't. test once then loop.
if "lowpkg.bin_pkg_info" in __salt__:
for pkg_source in pkg_params:
deb_info = __salt__["lowpkg.bin_pkg_info"](pkg_source)
else:
deb_info = None
if deb_info is None:
pkg_params_items.append(
[deb_info["name"], pkg_source, deb_info["version"]]
)
else:
for pkg_source in pkg_params:
log.error(
"pkg.install: Unable to get deb information for %s. "
"Version comparisons will be unavailable.",
pkg_source,
)
pkg_params_items.append([pkg_source])
else:
pkg_params_items.append(
[deb_info["name"], pkg_source, deb_info["version"]]
)
# Build command prefix
cmd_prefix.extend(["apt-get", "-q", "-y"])
if kwargs.get("force_yes", False):
@ -886,8 +633,14 @@ def install(
if "force_conf_new" in kwargs and kwargs["force_conf_new"]:
cmd_prefix.extend(["-o", "DPkg::Options::=--force-confnew"])
else:
cmd_prefix.extend(["-o", "DPkg::Options::=--force-confold"])
cmd_prefix += ["-o", "DPkg::Options::=--force-confdef"]
cmd_prefix.extend(
[
"-o",
"DPkg::Options::=--force-confold",
"-o",
"DPkg::Options::=--force-confdef",
]
)
if "install_recommends" in kwargs:
if not kwargs["install_recommends"]:
cmd_prefix.append("--no-install-recommends")
@ -942,12 +695,8 @@ def install(
)
if target is None:
errors.append(
"No version matching '{}{}' could be found "
"(available: {})".format(
pkgname,
version_num,
", ".join(candidates) if candidates else None,
)
f"No version matching '{pkgname}{version_num}' could be found "
f"(available: {', '.join(candidates) if candidates else None})"
)
continue
else:
@ -1412,9 +1161,7 @@ def hold(name=None, pkgs=None, sources=None, **kwargs): # pylint: disable=W0613
ret[target]["comment"] = f"Package {target} is now being held."
else:
ret[target].update(result=True)
ret[target]["comment"] = "Package {} is already set to be held.".format(
target
)
ret[target]["comment"] = f"Package {target} is already set to be held."
return ret
@ -1470,20 +1217,14 @@ def unhold(name=None, pkgs=None, sources=None, **kwargs): # pylint: disable=W06
elif salt.utils.data.is_true(state.get("hold", False)):
if "test" in __opts__ and __opts__["test"]:
ret[target].update(result=None)
ret[target]["comment"] = "Package {} is set not to be held.".format(
target
)
ret[target]["comment"] = f"Package {target} is set not to be held."
else:
result = set_selections(selection={"install": [target]})
ret[target].update(changes=result[target], result=True)
ret[target]["comment"] = "Package {} is no longer being held.".format(
target
)
ret[target]["comment"] = f"Package {target} is no longer being held."
else:
ret[target].update(result=True)
ret[target]["comment"] = "Package {} is already set not to be held.".format(
target
)
ret[target]["comment"] = f"Package {target} is already set not to be held."
return ret
@ -1603,7 +1344,7 @@ def _get_upgradable(dist_upgrade=True, **kwargs):
else:
cmd.append("upgrade")
try:
cmd.extend(["-o", "APT::Default-Release={}".format(kwargs["fromrepo"])])
cmd.extend(["-o", f"APT::Default-Release={kwargs['fromrepo']}"])
except KeyError:
pass
@ -1708,23 +1449,6 @@ def version_cmp(pkg1, pkg2, ignore_epoch=False, **kwargs):
# if we have apt_pkg, this will be quickier this way
# and also do not rely on shell.
if HAS_APTPKG:
try:
# the apt_pkg module needs to be manually initialized
apt_pkg.init_system()
# if there is a difference in versions, apt_pkg.version_compare will
# return an int representing the difference in minor versions, or
# 1/-1 if the difference is smaller than minor versions. normalize
# to -1, 0 or 1.
try:
ret = apt_pkg.version_compare(pkg1, pkg2)
except TypeError:
ret = apt_pkg.version_compare(str(pkg1), str(pkg2))
return 1 if ret > 0 else -1 if ret < 0 else 0
except Exception: # pylint: disable=broad-except
# Try to use shell version in case of errors w/python bindings
pass
try:
for oper, ret in (("lt", -1), ("eq", 0), ("gt", 1)):
cmd = ["dpkg", "--compare-versions", pkg1, oper, pkg2]
@ -1738,54 +1462,13 @@ def version_cmp(pkg1, pkg2, ignore_epoch=False, **kwargs):
return None
def _get_opts(line):
"""
Return all opts in [] for a repo line
"""
get_opts = re.search(r"\[(.*=.*)\]", line)
ret = {
"arch": {"full": "", "value": "", "index": 0},
"signedby": {"full": "", "value": "", "index": 0},
}
if not get_opts:
return ret
opts = get_opts.group(0).strip("[]")
architectures = []
for idx, opt in enumerate(opts.split()):
if opt.startswith("arch"):
architectures.extend(opt.split("=", 1)[1].split(","))
ret["arch"]["full"] = opt
ret["arch"]["value"] = architectures
ret["arch"]["index"] = idx
elif opt.startswith("signed-by"):
ret["signedby"]["full"] = opt
ret["signedby"]["value"] = opt.split("=", 1)[1]
ret["signedby"]["index"] = idx
else:
other_opt = opt.split("=", 1)[0]
ret[other_opt] = {}
ret[other_opt]["full"] = opt
ret[other_opt]["value"] = opt.split("=", 1)[1]
ret[other_opt]["index"] = idx
return ret
def _split_repo_str(repo):
"""
Return APT source entry as a dictionary
"""
entry = SourceEntry(repo)
invalid = entry.invalid
if not HAS_APT:
signedby = entry.signedby
else:
signedby = _get_opts(line=repo)["signedby"].get("value", "")
if signedby:
# python3-apt does not support signedby. So if signedby
# is in the repo we have to check our code to see if the
# repo is invalid ourselves.
_, invalid, _, _ = _invalid(repo)
signedby = entry.signedby
return {
"invalid": invalid,
@ -1945,10 +1628,7 @@ def list_repos(**kwargs):
for source in sources.list:
if _skip_source(source):
continue
if not HAS_APT:
signedby = source.signedby
else:
signedby = _get_opts(line=source.line)["signedby"].get("value", "")
signedby = source.signedby
repo = {}
repo["file"] = source.file
repo["comps"] = getattr(source, "comps", [])
@ -1987,20 +1667,7 @@ def get_repo(repo, **kwargs):
auth_info = f"{ppa_auth}@"
repo = LP_PVT_SRC_FORMAT.format(auth_info, owner_name, ppa_name, dist)
else:
if HAS_SOFTWAREPROPERTIES:
try:
if hasattr(softwareproperties.ppa, "PPAShortcutHandler"):
repo = softwareproperties.ppa.PPAShortcutHandler(repo).expand(
dist
)[0]
else:
repo = softwareproperties.ppa.expand_ppa_line(repo, dist)[0]
except NameError as name_error:
raise CommandExecutionError(
f"Could not find ppa {repo}: {name_error}"
)
else:
repo = LP_SRC_FORMAT.format(owner_name, ppa_name, dist)
repo = LP_SRC_FORMAT.format(owner_name, ppa_name, dist)
repos = list_repos()
@ -2011,9 +1678,9 @@ def get_repo(repo, **kwargs):
uri_match = re.search("(http[s]?://)(.+)", repo_entry["uri"])
if uri_match:
if not uri_match.group(2).startswith(ppa_auth):
repo_entry["uri"] = "{}{}@{}".format(
uri_match.group(1), ppa_auth, uri_match.group(2)
)
repo_entry[
"uri"
] = f"{uri_match.group(1)}{ppa_auth}@{uri_match.group(2)}"
except SyntaxError:
raise CommandExecutionError(
f"Error: repo '{repo}' is not a well formatted definition"
@ -2057,19 +1724,12 @@ def del_repo(repo, **kwargs):
# to derive the name.
is_ppa = True
dist = __grains__["oscodename"]
if not HAS_SOFTWAREPROPERTIES:
_warn_software_properties(repo)
owner_name, ppa_name = repo[4:].split("/")
if "ppa_auth" in kwargs:
auth_info = "{}@".format(kwargs["ppa_auth"])
repo = LP_PVT_SRC_FORMAT.format(auth_info, dist, owner_name, ppa_name)
else:
repo = LP_SRC_FORMAT.format(owner_name, ppa_name, dist)
owner_name, ppa_name = repo[4:].split("/")
if "ppa_auth" in kwargs:
auth_info = f"{kwargs['ppa_auth']}@"
repo = LP_PVT_SRC_FORMAT.format(auth_info, dist, owner_name, ppa_name)
else:
if hasattr(softwareproperties.ppa, "PPAShortcutHandler"):
repo = softwareproperties.ppa.PPAShortcutHandler(repo).expand(dist)[0]
else:
repo = softwareproperties.ppa.expand_ppa_line(repo, dist)[0]
repo = LP_SRC_FORMAT.format(owner_name, ppa_name, dist)
sources = SourcesList()
repos = [s for s in sources.list if not s.invalid]
@ -2410,9 +2070,7 @@ def add_repo_key(
kwargs.update({"stdin": text})
elif keyserver:
if not keyid:
error_msg = "No keyid or keyid too short for keyserver: {}".format(
keyserver
)
error_msg = f"No keyid or keyid too short for keyserver: {keyserver}"
raise SaltInvocationError(error_msg)
if not aptkey:
@ -2657,30 +2315,20 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs):
out = _call_apt(cmd, env=env, scope=False, **kwargs)
if out["retcode"]:
raise CommandExecutionError(
"Unable to add PPA '{}'. '{}' exited with "
"status {!s}: '{}' ".format(
repo[4:], cmd, out["retcode"], out["stderr"]
)
f"Unable to add PPA '{repo[4:]}'. '{cmd}' exited with status {out['retcode']!s}: '{out['stderr']}'"
)
# explicit refresh when a repo is modified.
if refresh:
refresh_db()
return {repo: out}
else:
if not HAS_SOFTWAREPROPERTIES:
_warn_software_properties(repo)
else:
log.info("Falling back to urllib method for private PPA")
# fall back to urllib style
try:
owner_name, ppa_name = repo[4:].split("/", 1)
except ValueError:
raise CommandExecutionError(
"Unable to get PPA info from argument. "
'Expected format "<PPA_OWNER>/<PPA_NAME>" '
"(e.g. saltstack/salt) not found. Received "
"'{}' instead.".format(repo[4:])
f"Unable to get PPA info from argument. Expected format \"<PPA_OWNER>/<PPA_NAME>\" (e.g. saltstack/salt) not found. Received '{repo[4:]}' instead."
)
dist = __grains__["oscodename"]
# ppa has a lot of implicit arguments. Make them explicit.
@ -2688,8 +2336,10 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs):
kwargs["dist"] = dist
ppa_auth = ""
if "file" not in kwargs:
filename = "/etc/apt/sources.list.d/{0}-{1}-{2}.list"
kwargs["file"] = filename.format(owner_name, ppa_name, dist)
filename = (
f"/etc/apt/sources.list.d/{owner_name}-{ppa_name}-{dist}.list"
)
kwargs["file"] = filename
try:
launchpad_ppa_info = _get_ppa_info_from_launchpad(
owner_name, ppa_name
@ -2698,23 +2348,16 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs):
kwargs["keyid"] = launchpad_ppa_info["signing_key_fingerprint"]
else:
if "keyid" not in kwargs:
error_str = (
"Private PPAs require a keyid to be specified: {0}/{1}"
)
raise CommandExecutionError(
error_str.format(owner_name, ppa_name)
f"Private PPAs require a keyid to be specified: {owner_name}/{ppa_name}"
)
except HTTPError as exc:
raise CommandExecutionError(
"Launchpad does not know about {}/{}: {}".format(
owner_name, ppa_name, exc
)
f"Launchpad does not know about {owner_name}/{ppa_name}: {exc}"
)
except IndexError as exc:
raise CommandExecutionError(
"Launchpad knows about {}/{} but did not "
"return a fingerprint. Please set keyid "
"manually: {}".format(owner_name, ppa_name, exc)
f"Launchpad knows about {owner_name}/{ppa_name} but did not return a fingerprint. Please set keyid manually: {exc}"
)
if "keyserver" not in kwargs:
@ -2722,15 +2365,13 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs):
if "ppa_auth" in kwargs:
if not launchpad_ppa_info["private"]:
raise CommandExecutionError(
"PPA is not private but auth credentials passed: {}".format(
repo
)
f"PPA is not private but auth credentials passed: {repo}"
)
# assign the new repo format to the "repo" variable
# so we can fall through to the "normal" mechanism
# here.
if "ppa_auth" in kwargs:
ppa_auth = "{}@".format(kwargs["ppa_auth"])
ppa_auth = f"{kwargs['ppa_auth']}@"
repo = LP_PVT_SRC_FORMAT.format(
ppa_auth, owner_name, ppa_name, dist
)
@ -2758,12 +2399,7 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs):
repos = []
for source in sources:
if HAS_APT:
_, invalid, _, _ = _invalid(source.line)
if not invalid:
repos.append(source)
else:
repos.append(source)
repos.append(source)
mod_source = None
try:
@ -2854,9 +2490,7 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs):
ret = _call_apt(cmd, scope=False, **kwargs)
if ret["retcode"] != 0:
raise CommandExecutionError(
"Error: key retrieval failed: {}".format(
ret["stdout"]
)
f"Error: key retrieval failed: {ret['stdout']}"
)
elif "key_url" in kwargs:
@ -2965,10 +2599,7 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs):
if refresh:
refresh_db()
if not HAS_APT:
signedby = mod_source.signedby
else:
signedby = _get_opts(repo)["signedby"].get("value", "")
signedby = mod_source.signedby
return {
repo: {
@ -3039,19 +2670,12 @@ def _expand_repo_def(os_name, os_codename=None, **kwargs):
auth_info = "{}@".format(kwargs["ppa_auth"])
repo = LP_PVT_SRC_FORMAT.format(auth_info, owner_name, ppa_name, dist)
else:
if HAS_SOFTWAREPROPERTIES:
if hasattr(softwareproperties.ppa, "PPAShortcutHandler"):
repo = softwareproperties.ppa.PPAShortcutHandler(repo).expand(dist)[
0
]
else:
repo = softwareproperties.ppa.expand_ppa_line(repo, dist)[0]
else:
repo = LP_SRC_FORMAT.format(owner_name, ppa_name, dist)
repo = LP_SRC_FORMAT.format(owner_name, ppa_name, dist)
if "file" not in kwargs:
filename = "/etc/apt/sources.list.d/{0}-{1}-{2}.list"
kwargs["file"] = filename.format(owner_name, ppa_name, dist)
kwargs[
"file"
] = f"/etc/apt/sources.list.d/{owner_name}-{ppa_name}-{dist}.list"
source_entry = SourceEntry(repo)
for list_args in ("architectures", "comps"):
@ -3065,11 +2689,8 @@ def _expand_repo_def(os_name, os_codename=None, **kwargs):
source_list = SourcesList()
kwargs = {}
if not HAS_APT:
signedby = source_entry.signedby
kwargs["signedby"] = signedby
else:
signedby = _get_opts(repo)["signedby"].get("value", "")
signedby = source_entry.signedby
kwargs["signedby"] = signedby
_source_entry = source_list.add(
type=source_entry.type,
@ -3094,28 +2715,6 @@ def _expand_repo_def(os_name, os_codename=None, **kwargs):
sanitized["line"] = _source_entry.line.strip()
sanitized["architectures"] = getattr(_source_entry, "architectures", [])
sanitized["signedby"] = signedby
if HAS_APT and signedby:
# python3-apt does not supported the signed-by opt currently.
# creating the line with all opts including signed-by
if signedby not in sanitized["line"]:
line = sanitized["line"].split()
repo_opts = _get_opts(repo)
opts_order = [
opt_type
for opt_type, opt_def in repo_opts.items()
if opt_def["full"] != ""
]
for opt in repo_opts:
if "index" in repo_opts[opt]:
idx = repo_opts[opt]["index"]
opts_order[idx] = repo_opts[opt]["full"]
opts = "[" + " ".join(opts_order) + "]"
if line[1].startswith("["):
line[1] = opts
else:
line.insert(1, opts)
sanitized["line"] = " ".join(line)
return sanitized
@ -3251,9 +2850,7 @@ def set_selections(path=None, selection=None, clear=False, saltenv="base"):
valid_states = ("install", "hold", "deinstall", "purge")
bad_states = [x for x in selection if x not in valid_states]
if bad_states:
raise SaltInvocationError(
"Invalid state(s): {}".format(", ".join(bad_states))
)
raise SaltInvocationError(f"Invalid state(s): {', '.join(bad_states)}")
if clear:
cmd = ["dpkg", "--clear-selections"]
@ -3547,3 +3144,31 @@ def services_need_restart(**kwargs):
services.add(service)
return list(services)
def which(path):
"""
Displays which package installed a specific file
CLI Examples:
.. code-block:: bash
salt * pkg.which <file name>
"""
filepath = pathlib.Path(path)
cmd = ["dpkg"]
if filepath.is_absolute():
if filepath.exists():
cmd.extend(["-S", str(filepath)])
else:
log.debug("%s does not exist", filepath)
return False
else:
log.debug("%s is not absolute path", filepath)
return False
cmd_ret = _call_apt(cmd)
if "no path found matching pattern" in cmd_ret["stdout"]:
return None
pkg = cmd_ret["stdout"].split(":")[0]
return pkg

View file

@ -2,6 +2,254 @@
Common functions for working with deb packages
"""
import logging
import os
import pathlib
import re
import shutil
import tempfile
from collections import OrderedDict
import salt.utils.files
log = logging.getLogger(__name__)
class SourceEntry: # pylint: disable=function-redefined
def __init__(self, line, file=None):
self.invalid = False
self.comps = []
self.disabled = False
self.comment = ""
self.dist = ""
self.type = ""
self.uri = ""
self.line = line
self.architectures = []
self.signedby = ""
self.file = file
if not self.file:
self.file = str(pathlib.Path(os.sep, "etc", "apt", "sources.list"))
self._parse_sources(line)
def str(self):
return self.repo_line()
def repo_line(self):
"""
Return the repo line for the sources file
"""
repo_line = []
if self.invalid:
return self.line
if self.disabled:
repo_line.append("#")
repo_line.append(self.type)
opts = _get_opts(self.line)
if self.architectures:
if "arch" not in opts:
opts["arch"] = {}
opts["arch"]["full"] = f"arch={','.join(self.architectures)}"
opts["arch"]["value"] = self.architectures
if self.signedby:
if "signedby" not in opts:
opts["signedby"] = {}
opts["signedby"]["full"] = f"signed-by={self.signedby}"
opts["signedby"]["value"] = self.signedby
ordered_opts = []
for opt in opts.values():
if opt["full"] != "":
ordered_opts.append(opt["full"])
if ordered_opts:
repo_line.append(f"[{' '.join(ordered_opts)}]")
repo_line += [self.uri, self.dist, " ".join(self.comps)]
if self.comment:
repo_line.append(f"#{self.comment}")
return " ".join(repo_line) + "\n"
def _parse_sources(self, line):
"""
Parse lines from sources files
"""
self.disabled, self.invalid, self.comment, repo_line = _invalid(line)
if self.invalid:
return False
if repo_line[1].startswith("["):
repo_line = [x for x in (line.strip("[]") for line in repo_line) if x]
opts = _get_opts(self.line)
if "arch" in opts:
self.architectures.extend(opts["arch"]["value"])
if "signedby" in opts:
self.signedby = opts["signedby"]["value"]
for opt in opts.values():
opt = opt["full"]
if opt:
try:
repo_line.pop(repo_line.index(opt))
except ValueError:
repo_line.pop(repo_line.index(f"[{opt}]"))
self.type = repo_line[0]
self.uri = repo_line[1]
self.dist = repo_line[2]
self.comps = repo_line[3:]
return True
class SourcesList: # pylint: disable=function-redefined
def __init__(self):
self.list = []
self.files = [
pathlib.Path(os.sep, "etc", "apt", "sources.list"),
pathlib.Path(os.sep, "etc", "apt", "sources.list.d"),
]
for file in self.files:
if file.is_dir():
for fp in file.glob("**/*.list"):
self.add_file(file=fp)
else:
self.add_file(file)
def __iter__(self):
yield from self.list
def add_file(self, file):
"""
Add the lines of a file to self.list
"""
if file.is_file():
with salt.utils.files.fopen(str(file)) as source:
for line in source:
self.list.append(SourceEntry(line, file=str(file)))
else:
log.debug("The apt sources file %s does not exist", file)
def add(self, type, uri, dist, orig_comps, architectures, signedby):
opts_count = []
opts_line = ""
if architectures:
architectures = f"arch={','.join(architectures)}"
opts_count.append(architectures)
if signedby:
signedby = f"signed-by={signedby}"
opts_count.append(signedby)
if len(opts_count) > 1:
opts_line = f"[{' '.join(opts_count)}]"
elif len(opts_count) == 1:
opts_line = f"[{''.join(opts_count)}]"
repo_line = [
type,
opts_line,
uri,
dist,
" ".join(orig_comps),
]
return SourceEntry(" ".join([line for line in repo_line if line.strip()]))
def remove(self, source):
"""
remove a source from the list of sources
"""
self.list.remove(source)
def save(self):
"""
write all of the sources from the list of sources
to the file.
"""
filemap = {}
with tempfile.TemporaryDirectory() as tmpdir:
for source in self.list:
fname = pathlib.Path(tmpdir, pathlib.Path(source.file).name)
with salt.utils.files.fopen(str(fname), "a") as fp:
fp.write(source.repo_line())
if source.file not in filemap:
filemap[source.file] = {"tmp": fname}
for fp in filemap:
shutil.move(str(filemap[fp]["tmp"]), fp)
def _invalid(line):
"""
This is a workaround since python3-apt does not support
the signed-by argument. This function was removed from
the class to ensure users using the python3-apt module or
not can use the signed-by option.
"""
disabled = False
invalid = False
comment = ""
line = line.strip()
if not line:
invalid = True
return disabled, invalid, comment, ""
if line.startswith("#"):
disabled = True
line = line[1:]
idx = line.find("#")
if idx > 0:
comment = line[idx + 1 :]
line = line[:idx]
cdrom_match = re.match(r"(.*)(cdrom:.*/)(.*)", line.strip())
if cdrom_match:
repo_line = (
[p.strip() for p in cdrom_match.group(1).split()]
+ [cdrom_match.group(2).strip()]
+ [p.strip() for p in cdrom_match.group(3).split()]
)
else:
repo_line = line.strip().split()
if (
not repo_line
or repo_line[0] not in ["deb", "deb-src", "rpm", "rpm-src"]
or len(repo_line) < 3
):
invalid = True
return disabled, invalid, comment, repo_line
if repo_line[1].startswith("["):
if not any(x.endswith("]") for x in repo_line[1:]):
invalid = True
return disabled, invalid, comment, repo_line
return disabled, invalid, comment, repo_line
def _get_opts(line):
"""
Return all opts in [] for a repo line
"""
get_opts = re.search(r"\[(.*=.*)\]", line)
ret = OrderedDict()
if not get_opts:
return ret
opts = get_opts.group(0).strip("[]")
architectures = []
for opt in opts.split():
if opt.startswith("arch"):
architectures.extend(opt.split("=", 1)[1].split(","))
ret["arch"] = {}
ret["arch"]["full"] = opt
ret["arch"]["value"] = architectures
elif opt.startswith("signed-by"):
ret["signedby"] = {}
ret["signedby"]["full"] = opt
ret["signedby"]["value"] = opt.split("=", 1)[1]
else:
other_opt = opt.split("=", 1)[0]
ret[other_opt] = {}
ret[other_opt]["full"] = opt
ret[other_opt]["value"] = opt.split("=", 1)[1]
return ret
def combine_comments(comments):
"""

View file

@ -224,11 +224,15 @@ def test_owner(modules):
# Similar to pkg.owner, but for FreeBSD's pkgng
@pytest.mark.skip_on_freebsd(reason="test for new package manager for FreeBSD")
@pytest.mark.requires_salt_modules("pkg.which")
def test_which(modules):
def test_which(grains, modules):
"""
test finding the package owning a file
"""
ret = modules.pkg.which("/usr/local/bin/salt-call")
if grains["os_family"] in ["Debian", "RedHat"]:
file = "/bin/mknod"
else:
file = "/usr/local/bin/salt-call"
ret = modules.pkg.which(file)
assert len(ret) != 0

View file

@ -0,0 +1,88 @@
import salt.utils.pkg.deb
from salt.utils.pkg.deb import SourceEntry
def test__get_opts():
tests = [
{
"oneline": "deb [signed-by=/etc/apt/keyrings/example.key arch=amd64] https://example.com/pub/repos/apt xenial main",
"result": {
"signedby": {
"full": "signed-by=/etc/apt/keyrings/example.key",
"value": "/etc/apt/keyrings/example.key",
},
"arch": {"full": "arch=amd64", "value": ["amd64"]},
},
},
{
"oneline": "deb [arch=amd64 signed-by=/etc/apt/keyrings/example.key] https://example.com/pub/repos/apt xenial main",
"result": {
"arch": {"full": "arch=amd64", "value": ["amd64"]},
"signedby": {
"full": "signed-by=/etc/apt/keyrings/example.key",
"value": "/etc/apt/keyrings/example.key",
},
},
},
{
"oneline": "deb [arch=amd64] https://example.com/pub/repos/apt xenial main",
"result": {
"arch": {"full": "arch=amd64", "value": ["amd64"]},
},
},
]
for test in tests:
ret = salt.utils.pkg.deb._get_opts(test["oneline"])
assert ret == test["result"]
def test_SourceEntry_init():
source = SourceEntry(
"deb [arch=amd64 signed-by=/etc/apt/keyrings/example.key] https://example.com/pub/repos/apt xenial main",
file="/tmp/test.list",
)
assert source.invalid is False
assert source.comps == ["main"]
assert source.comment == ""
assert source.dist == "xenial"
assert source.type == "deb"
assert source.uri == "https://example.com/pub/repos/apt"
assert source.architectures == ["amd64"]
assert source.signedby == "/etc/apt/keyrings/example.key"
assert source.file == "/tmp/test.list"
def test_SourceEntry_repo_line():
lines = [
"deb [arch=amd64 signed-by=/etc/apt/keyrings/example.key] https://example.com/pub/repos/apt xenial main\n",
"deb [signed-by=/etc/apt/keyrings/example.key] https://example.com/pub/repos/apt xenial main\n",
"deb [signed-by=/etc/apt/keyrings/example.key arch=amd64,x86_64] https://example.com/pub/repos/apt xenial main\n",
]
for line in lines:
source = SourceEntry(line, file="/tmp/test.list")
assert source.invalid is False
assert source.repo_line() == line
lines = [
(
"deb [arch=amd64 signed-by=/etc/apt/keyrings/example.key] https://example.com/pub/repos/apt xenial main\n",
"deb [arch=x86_64 signed-by=/etc/apt/keyrings/example.key] https://example.com/pub/repos/apt xenial main\n",
),
(
"deb [signed-by=/etc/apt/keyrings/example.key] https://example.com/pub/repos/apt xenial main\n",
"deb [signed-by=/etc/apt/keyrings/example.key arch=x86_64] https://example.com/pub/repos/apt xenial main\n",
),
(
"deb [signed-by=/etc/apt/keyrings/example.key arch=amd64,x86_64] https://example.com/pub/repos/apt xenial main\n",
"deb [signed-by=/etc/apt/keyrings/example.key arch=x86_64] https://example.com/pub/repos/apt xenial main\n",
),
]
for line in lines:
line_key, line_value = line
source = SourceEntry(line_key, file="/tmp/test.list")
source.architectures = ["x86_64"]
assert source.invalid is False
assert source.repo_line() == line_value
assert source.invalid is False

View file

@ -18,23 +18,6 @@ from salt.exceptions import (
from salt.utils.odict import OrderedDict
from tests.support.mock import MagicMock, Mock, call, patch
try:
from aptsources.sourceslist import ( # pylint: disable=unused-import
SourceEntry,
SourcesList,
)
HAS_APT = True
except ImportError:
HAS_APT = False
try:
from aptsources import sourceslist # pylint: disable=unused-import
HAS_APTSOURCES = True
except ImportError:
HAS_APTSOURCES = False
log = logging.getLogger(__name__)
@ -380,12 +363,9 @@ def test_get_repo_keys(repo_keys_var):
mock = MagicMock(return_value={"retcode": 0, "stdout": APT_KEY_LIST})
with patch.dict(aptpkg.__salt__, {"cmd.run_all": mock}):
if not HAS_APT:
with patch("os.listdir", return_value="/tmp/keys"):
with patch("pathlib.Path.is_dir", return_value=True):
assert aptpkg.get_repo_keys() == repo_keys_var
else:
assert aptpkg.get_repo_keys() == repo_keys_var
with patch("os.listdir", return_value="/tmp/keys"):
with patch("pathlib.Path.is_dir", return_value=True):
assert aptpkg.get_repo_keys() == repo_keys_var
def test_file_dict(lowpkg_files_var):
@ -1558,21 +1538,17 @@ def _test_sourceslist_multiple_comps_fs(fs):
yield
@pytest.mark.skipif(
HAS_APTSOURCES is True, reason="Only run test with python3-apt library is missing."
)
@pytest.mark.usefixtures("_test_sourceslist_multiple_comps_fs")
def test_sourceslist_multiple_comps():
"""
Test SourcesList when repo has multiple comps
"""
with patch.object(aptpkg, "HAS_APT", return_value=True):
sources = aptpkg.SourcesList()
for source in sources:
assert source.type == "deb"
assert source.uri == "http://archive.ubuntu.com/ubuntu/"
assert source.comps == ["main", "restricted"]
assert source.dist == "focal-updates"
sources = aptpkg.SourcesList()
for source in sources:
assert source.type == "deb"
assert source.uri == "http://archive.ubuntu.com/ubuntu/"
assert source.comps == ["main", "restricted"]
assert source.dist == "focal-updates"
@pytest.fixture(
@ -1592,9 +1568,6 @@ def repo_line(request, fs):
yield request.param
@pytest.mark.skipif(
HAS_APTSOURCES is True, reason="Only run test with python3-apt library is missing."
)
def test_sourceslist_architectures(repo_line):
"""
Test SourcesList when architectures is in repo