diff --git a/changelog/65703.fixed.md b/changelog/65703.fixed.md new file mode 100644 index 00000000000..091c754114d --- /dev/null +++ b/changelog/65703.fixed.md @@ -0,0 +1 @@ +fix 65703 by using OrderedDict instead of a index that breaks. . diff --git a/changelog/66056.changed.md b/changelog/66056.changed.md new file mode 100644 index 00000000000..f088b055cd3 --- /dev/null +++ b/changelog/66056.changed.md @@ -0,0 +1 @@ +re-work the aptpkg module to remove system libraries that onedir and virtualenvs do not have access. Streamline testing, and code use to needed libraries only. diff --git a/changelog/66201.added.md b/changelog/66201.added.md new file mode 100644 index 00000000000..1a5738512a6 --- /dev/null +++ b/changelog/66201.added.md @@ -0,0 +1 @@ +added pkg.which to aptpkg, for finding which package installed a file. diff --git a/salt/modules/aptpkg.py b/salt/modules/aptpkg.py index ff5991c11c1..e04642ac6f4 100644 --- a/salt/modules/aptpkg.py +++ b/salt/modules/aptpkg.py @@ -6,8 +6,6 @@ Support for APT (Advanced Packaging Tool) minion, and it is using a different module (or gives an error similar to *'pkg.install' is not available*), see :ref:`here `. - - For repository management, the ``python-apt`` package must be installed. """ import copy @@ -18,7 +16,6 @@ import os import pathlib import re import shutil -import tempfile import time from urllib.error import HTTPError from urllib.request import Request as _Request @@ -47,32 +44,10 @@ from salt.exceptions import ( SaltInvocationError, ) from salt.modules.cmdmod import _parse_env +from salt.utils.pkg.deb import SourceEntry, SourcesList log = logging.getLogger(__name__) -# pylint: disable=import-error -try: - from aptsources.sourceslist import SourceEntry, SourcesList - - HAS_APT = True -except ImportError: - HAS_APT = False - -try: - import apt_pkg - - HAS_APTPKG = True -except ImportError: - HAS_APTPKG = False - -try: - import softwareproperties.ppa - - HAS_SOFTWAREPROPERTIES = True -except ImportError: - HAS_SOFTWAREPROPERTIES = False -# pylint: enable=import-error - APT_LISTS_PATH = "/var/lib/apt/lists" PKG_ARCH_SEPARATOR = ":" @@ -120,212 +95,6 @@ def __init__(opts): os.environ.update(DPKG_ENV_VARS) -def _invalid(line): - """ - This is a workaround since python3-apt does not support - the signed-by argument. This function was removed from - the class to ensure users using the python3-apt module or - not can use the signed-by option. - """ - disabled = False - invalid = False - comment = "" - line = line.strip() - if not line: - invalid = True - return disabled, invalid, comment, "" - - if line.startswith("#"): - disabled = True - line = line[1:] - - idx = line.find("#") - if idx > 0: - comment = line[idx + 1 :] - line = line[:idx] - - cdrom_match = re.match(r"(.*)(cdrom:.*/)(.*)", line.strip()) - if cdrom_match: - repo_line = ( - [p.strip() for p in cdrom_match.group(1).split()] - + [cdrom_match.group(2).strip()] - + [p.strip() for p in cdrom_match.group(3).split()] - ) - else: - repo_line = line.strip().split() - if ( - not repo_line - or repo_line[0] not in ["deb", "deb-src", "rpm", "rpm-src"] - or len(repo_line) < 3 - ): - invalid = True - return disabled, invalid, comment, repo_line - - if repo_line[1].startswith("["): - if not any(x.endswith("]") for x in repo_line[1:]): - invalid = True - return disabled, invalid, comment, repo_line - - return disabled, invalid, comment, repo_line - - -if not HAS_APT: - - class SourceEntry: # pylint: disable=function-redefined - def __init__(self, line, file=None): - self.invalid = False - self.comps = [] - self.disabled = False - self.comment = "" - self.dist = "" - self.type = "" - self.uri = "" - self.line = line - self.architectures = [] - self.signedby = "" - self.file = file - if not self.file: - self.file = str(pathlib.Path(os.sep, "etc", "apt", "sources.list")) - self._parse_sources(line) - - def str(self): - return self.repo_line() - - def repo_line(self): - """ - Return the repo line for the sources file - """ - repo_line = [] - if self.invalid: - return self.line - if self.disabled: - repo_line.append("#") - - repo_line.append(self.type) - opts = _get_opts(self.line) - if self.architectures: - archs = ",".join(self.architectures) - opts["arch"]["full"] = f"arch={archs}" - opts["arch"]["value"] = self.architectures - if self.signedby: - opts["signedby"]["full"] = f"signed-by={self.signedby}" - opts["signedby"]["value"] = self.signedby - - ordered_opts = [ - opt_type for opt_type, opt in opts.items() if opt["full"] != "" - ] - - for opt in opts.values(): - if opt["full"] != "": - ordered_opts[opt["index"]] = opt["full"] - - if ordered_opts: - repo_line.append("[{}]".format(" ".join(ordered_opts))) - - repo_line += [self.uri, self.dist, " ".join(self.comps)] - if self.comment: - repo_line.append(f"#{self.comment}") - return " ".join(repo_line) + "\n" - - def _parse_sources(self, line): - """ - Parse lines from sources files - """ - self.disabled, self.invalid, self.comment, repo_line = _invalid(line) - if self.invalid: - return False - if repo_line[1].startswith("["): - repo_line = [x for x in (line.strip("[]") for line in repo_line) if x] - opts = _get_opts(self.line) - self.architectures.extend(opts["arch"]["value"]) - self.signedby = opts["signedby"]["value"] - for opt in opts: - opt = opts[opt]["full"] - if opt: - try: - repo_line.pop(repo_line.index(opt)) - except ValueError: - repo_line.pop(repo_line.index("[" + opt + "]")) - self.type = repo_line[0] - self.uri = repo_line[1] - self.dist = repo_line[2] - self.comps = repo_line[3:] - return True - - class SourcesList: # pylint: disable=function-redefined - def __init__(self): - self.list = [] - self.files = [ - pathlib.Path(os.sep, "etc", "apt", "sources.list"), - pathlib.Path(os.sep, "etc", "apt", "sources.list.d"), - ] - for file in self.files: - if file.is_dir(): - for fp in file.glob("**/*.list"): - self.add_file(file=fp) - else: - self.add_file(file) - - def __iter__(self): - yield from self.list - - def add_file(self, file): - """ - Add the lines of a file to self.list - """ - if file.is_file(): - with salt.utils.files.fopen(str(file)) as source: - for line in source: - self.list.append(SourceEntry(line, file=str(file))) - else: - log.debug("The apt sources file %s does not exist", file) - - def add(self, type, uri, dist, orig_comps, architectures, signedby): - opts_count = [] - opts_line = "" - if architectures: - architectures = "arch={}".format(",".join(architectures)) - opts_count.append(architectures) - if signedby: - signedby = f"signed-by={signedby}" - opts_count.append(signedby) - if len(opts_count) > 1: - opts_line = "[" + " ".join(opts_count) + "]" - elif len(opts_count) == 1: - opts_line = "[" + "".join(opts_count) + "]" - repo_line = [ - type, - opts_line, - uri, - dist, - " ".join(orig_comps), - ] - return SourceEntry(" ".join([line for line in repo_line if line.strip()])) - - def remove(self, source): - """ - remove a source from the list of sources - """ - self.list.remove(source) - - def save(self): - """ - write all of the sources from the list of sources - to the file. - """ - filemap = {} - with tempfile.TemporaryDirectory() as tmpdir: - for source in self.list: - fname = pathlib.Path(tmpdir, pathlib.Path(source.file).name) - with salt.utils.files.fopen(str(fname), "a") as fp: - fp.write(source.repo_line()) - if source.file not in filemap: - filemap[source.file] = {"tmp": fname} - - for fp in filemap: - shutil.move(str(filemap[fp]["tmp"]), fp) - - def _get_ppa_info_from_launchpad(owner_name, ppa_name): """ Idea from softwareproperties.ppa. @@ -338,21 +107,12 @@ def _get_ppa_info_from_launchpad(owner_name, ppa_name): :return: """ - lp_url = "https://launchpad.net/api/1.0/~{}/+archive/{}".format( - owner_name, ppa_name - ) + lp_url = f"https://launchpad.net/api/1.0/~{owner_name}/+archive/{ppa_name}" request = _Request(lp_url, headers={"Accept": "application/json"}) lp_page = _urlopen(request) return salt.utils.json.load(lp_page) -def _reconstruct_ppa_name(owner_name, ppa_name): - """ - Stringify PPA name from args. - """ - return f"ppa:{owner_name}/{ppa_name}" - - def _call_apt(args, scope=True, **kwargs): """ Call apt* utilities. @@ -383,18 +143,6 @@ def _call_apt(args, scope=True, **kwargs): return cmd_ret -def _warn_software_properties(repo): - """ - Warn of missing python-software-properties package. - """ - log.warning( - "The 'python-software-properties' package is not installed. " - "For more accurate support of PPA repositories, you should " - "install this package." - ) - log.warning("Best guess at ppa format: %s", repo) - - def normalize_name(name): """ Strips the architecture from the specified package name, if necessary. @@ -635,9 +383,7 @@ def refresh_db(cache_valid_time=0, failhard=False, **kwargs): error_repos.append(ident) if failhard and error_repos: - raise CommandExecutionError( - "Error getting repos: {}".format(", ".join(error_repos)) - ) + raise CommandExecutionError(f"Error getting repos: {', '.join(error_repos)}") return ret @@ -863,22 +609,23 @@ def install( ) else: pkg_params_items = [] - for pkg_source in pkg_params: - if "lowpkg.bin_pkg_info" in __salt__: + # we don't need to do the test below for every package in the list. + # it either exists or doesn't. test once then loop. + if "lowpkg.bin_pkg_info" in __salt__: + for pkg_source in pkg_params: deb_info = __salt__["lowpkg.bin_pkg_info"](pkg_source) - else: - deb_info = None - if deb_info is None: + pkg_params_items.append( + [deb_info["name"], pkg_source, deb_info["version"]] + ) + else: + for pkg_source in pkg_params: log.error( "pkg.install: Unable to get deb information for %s. " "Version comparisons will be unavailable.", pkg_source, ) pkg_params_items.append([pkg_source]) - else: - pkg_params_items.append( - [deb_info["name"], pkg_source, deb_info["version"]] - ) + # Build command prefix cmd_prefix.extend(["apt-get", "-q", "-y"]) if kwargs.get("force_yes", False): @@ -886,8 +633,14 @@ def install( if "force_conf_new" in kwargs and kwargs["force_conf_new"]: cmd_prefix.extend(["-o", "DPkg::Options::=--force-confnew"]) else: - cmd_prefix.extend(["-o", "DPkg::Options::=--force-confold"]) - cmd_prefix += ["-o", "DPkg::Options::=--force-confdef"] + cmd_prefix.extend( + [ + "-o", + "DPkg::Options::=--force-confold", + "-o", + "DPkg::Options::=--force-confdef", + ] + ) if "install_recommends" in kwargs: if not kwargs["install_recommends"]: cmd_prefix.append("--no-install-recommends") @@ -942,12 +695,8 @@ def install( ) if target is None: errors.append( - "No version matching '{}{}' could be found " - "(available: {})".format( - pkgname, - version_num, - ", ".join(candidates) if candidates else None, - ) + f"No version matching '{pkgname}{version_num}' could be found " + f"(available: {', '.join(candidates) if candidates else None})" ) continue else: @@ -1412,9 +1161,7 @@ def hold(name=None, pkgs=None, sources=None, **kwargs): # pylint: disable=W0613 ret[target]["comment"] = f"Package {target} is now being held." else: ret[target].update(result=True) - ret[target]["comment"] = "Package {} is already set to be held.".format( - target - ) + ret[target]["comment"] = f"Package {target} is already set to be held." return ret @@ -1470,20 +1217,14 @@ def unhold(name=None, pkgs=None, sources=None, **kwargs): # pylint: disable=W06 elif salt.utils.data.is_true(state.get("hold", False)): if "test" in __opts__ and __opts__["test"]: ret[target].update(result=None) - ret[target]["comment"] = "Package {} is set not to be held.".format( - target - ) + ret[target]["comment"] = f"Package {target} is set not to be held." else: result = set_selections(selection={"install": [target]}) ret[target].update(changes=result[target], result=True) - ret[target]["comment"] = "Package {} is no longer being held.".format( - target - ) + ret[target]["comment"] = f"Package {target} is no longer being held." else: ret[target].update(result=True) - ret[target]["comment"] = "Package {} is already set not to be held.".format( - target - ) + ret[target]["comment"] = f"Package {target} is already set not to be held." return ret @@ -1603,7 +1344,7 @@ def _get_upgradable(dist_upgrade=True, **kwargs): else: cmd.append("upgrade") try: - cmd.extend(["-o", "APT::Default-Release={}".format(kwargs["fromrepo"])]) + cmd.extend(["-o", f"APT::Default-Release={kwargs['fromrepo']}"]) except KeyError: pass @@ -1708,23 +1449,6 @@ def version_cmp(pkg1, pkg2, ignore_epoch=False, **kwargs): # if we have apt_pkg, this will be quickier this way # and also do not rely on shell. - if HAS_APTPKG: - try: - # the apt_pkg module needs to be manually initialized - apt_pkg.init_system() - - # if there is a difference in versions, apt_pkg.version_compare will - # return an int representing the difference in minor versions, or - # 1/-1 if the difference is smaller than minor versions. normalize - # to -1, 0 or 1. - try: - ret = apt_pkg.version_compare(pkg1, pkg2) - except TypeError: - ret = apt_pkg.version_compare(str(pkg1), str(pkg2)) - return 1 if ret > 0 else -1 if ret < 0 else 0 - except Exception: # pylint: disable=broad-except - # Try to use shell version in case of errors w/python bindings - pass try: for oper, ret in (("lt", -1), ("eq", 0), ("gt", 1)): cmd = ["dpkg", "--compare-versions", pkg1, oper, pkg2] @@ -1738,54 +1462,13 @@ def version_cmp(pkg1, pkg2, ignore_epoch=False, **kwargs): return None -def _get_opts(line): - """ - Return all opts in [] for a repo line - """ - get_opts = re.search(r"\[(.*=.*)\]", line) - ret = { - "arch": {"full": "", "value": "", "index": 0}, - "signedby": {"full": "", "value": "", "index": 0}, - } - - if not get_opts: - return ret - opts = get_opts.group(0).strip("[]") - architectures = [] - for idx, opt in enumerate(opts.split()): - if opt.startswith("arch"): - architectures.extend(opt.split("=", 1)[1].split(",")) - ret["arch"]["full"] = opt - ret["arch"]["value"] = architectures - ret["arch"]["index"] = idx - elif opt.startswith("signed-by"): - ret["signedby"]["full"] = opt - ret["signedby"]["value"] = opt.split("=", 1)[1] - ret["signedby"]["index"] = idx - else: - other_opt = opt.split("=", 1)[0] - ret[other_opt] = {} - ret[other_opt]["full"] = opt - ret[other_opt]["value"] = opt.split("=", 1)[1] - ret[other_opt]["index"] = idx - return ret - - def _split_repo_str(repo): """ Return APT source entry as a dictionary """ entry = SourceEntry(repo) invalid = entry.invalid - if not HAS_APT: - signedby = entry.signedby - else: - signedby = _get_opts(line=repo)["signedby"].get("value", "") - if signedby: - # python3-apt does not support signedby. So if signedby - # is in the repo we have to check our code to see if the - # repo is invalid ourselves. - _, invalid, _, _ = _invalid(repo) + signedby = entry.signedby return { "invalid": invalid, @@ -1945,10 +1628,7 @@ def list_repos(**kwargs): for source in sources.list: if _skip_source(source): continue - if not HAS_APT: - signedby = source.signedby - else: - signedby = _get_opts(line=source.line)["signedby"].get("value", "") + signedby = source.signedby repo = {} repo["file"] = source.file repo["comps"] = getattr(source, "comps", []) @@ -1987,20 +1667,7 @@ def get_repo(repo, **kwargs): auth_info = f"{ppa_auth}@" repo = LP_PVT_SRC_FORMAT.format(auth_info, owner_name, ppa_name, dist) else: - if HAS_SOFTWAREPROPERTIES: - try: - if hasattr(softwareproperties.ppa, "PPAShortcutHandler"): - repo = softwareproperties.ppa.PPAShortcutHandler(repo).expand( - dist - )[0] - else: - repo = softwareproperties.ppa.expand_ppa_line(repo, dist)[0] - except NameError as name_error: - raise CommandExecutionError( - f"Could not find ppa {repo}: {name_error}" - ) - else: - repo = LP_SRC_FORMAT.format(owner_name, ppa_name, dist) + repo = LP_SRC_FORMAT.format(owner_name, ppa_name, dist) repos = list_repos() @@ -2011,9 +1678,9 @@ def get_repo(repo, **kwargs): uri_match = re.search("(http[s]?://)(.+)", repo_entry["uri"]) if uri_match: if not uri_match.group(2).startswith(ppa_auth): - repo_entry["uri"] = "{}{}@{}".format( - uri_match.group(1), ppa_auth, uri_match.group(2) - ) + repo_entry[ + "uri" + ] = f"{uri_match.group(1)}{ppa_auth}@{uri_match.group(2)}" except SyntaxError: raise CommandExecutionError( f"Error: repo '{repo}' is not a well formatted definition" @@ -2057,19 +1724,12 @@ def del_repo(repo, **kwargs): # to derive the name. is_ppa = True dist = __grains__["oscodename"] - if not HAS_SOFTWAREPROPERTIES: - _warn_software_properties(repo) - owner_name, ppa_name = repo[4:].split("/") - if "ppa_auth" in kwargs: - auth_info = "{}@".format(kwargs["ppa_auth"]) - repo = LP_PVT_SRC_FORMAT.format(auth_info, dist, owner_name, ppa_name) - else: - repo = LP_SRC_FORMAT.format(owner_name, ppa_name, dist) + owner_name, ppa_name = repo[4:].split("/") + if "ppa_auth" in kwargs: + auth_info = f"{kwargs['ppa_auth']}@" + repo = LP_PVT_SRC_FORMAT.format(auth_info, dist, owner_name, ppa_name) else: - if hasattr(softwareproperties.ppa, "PPAShortcutHandler"): - repo = softwareproperties.ppa.PPAShortcutHandler(repo).expand(dist)[0] - else: - repo = softwareproperties.ppa.expand_ppa_line(repo, dist)[0] + repo = LP_SRC_FORMAT.format(owner_name, ppa_name, dist) sources = SourcesList() repos = [s for s in sources.list if not s.invalid] @@ -2410,9 +2070,7 @@ def add_repo_key( kwargs.update({"stdin": text}) elif keyserver: if not keyid: - error_msg = "No keyid or keyid too short for keyserver: {}".format( - keyserver - ) + error_msg = f"No keyid or keyid too short for keyserver: {keyserver}" raise SaltInvocationError(error_msg) if not aptkey: @@ -2657,30 +2315,20 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs): out = _call_apt(cmd, env=env, scope=False, **kwargs) if out["retcode"]: raise CommandExecutionError( - "Unable to add PPA '{}'. '{}' exited with " - "status {!s}: '{}' ".format( - repo[4:], cmd, out["retcode"], out["stderr"] - ) + f"Unable to add PPA '{repo[4:]}'. '{cmd}' exited with status {out['retcode']!s}: '{out['stderr']}'" ) # explicit refresh when a repo is modified. if refresh: refresh_db() return {repo: out} else: - if not HAS_SOFTWAREPROPERTIES: - _warn_software_properties(repo) - else: - log.info("Falling back to urllib method for private PPA") # fall back to urllib style try: owner_name, ppa_name = repo[4:].split("/", 1) except ValueError: raise CommandExecutionError( - "Unable to get PPA info from argument. " - 'Expected format "/" ' - "(e.g. saltstack/salt) not found. Received " - "'{}' instead.".format(repo[4:]) + f"Unable to get PPA info from argument. Expected format \"/\" (e.g. saltstack/salt) not found. Received '{repo[4:]}' instead." ) dist = __grains__["oscodename"] # ppa has a lot of implicit arguments. Make them explicit. @@ -2688,8 +2336,10 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs): kwargs["dist"] = dist ppa_auth = "" if "file" not in kwargs: - filename = "/etc/apt/sources.list.d/{0}-{1}-{2}.list" - kwargs["file"] = filename.format(owner_name, ppa_name, dist) + filename = ( + f"/etc/apt/sources.list.d/{owner_name}-{ppa_name}-{dist}.list" + ) + kwargs["file"] = filename try: launchpad_ppa_info = _get_ppa_info_from_launchpad( owner_name, ppa_name @@ -2698,23 +2348,16 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs): kwargs["keyid"] = launchpad_ppa_info["signing_key_fingerprint"] else: if "keyid" not in kwargs: - error_str = ( - "Private PPAs require a keyid to be specified: {0}/{1}" - ) raise CommandExecutionError( - error_str.format(owner_name, ppa_name) + f"Private PPAs require a keyid to be specified: {owner_name}/{ppa_name}" ) except HTTPError as exc: raise CommandExecutionError( - "Launchpad does not know about {}/{}: {}".format( - owner_name, ppa_name, exc - ) + f"Launchpad does not know about {owner_name}/{ppa_name}: {exc}" ) except IndexError as exc: raise CommandExecutionError( - "Launchpad knows about {}/{} but did not " - "return a fingerprint. Please set keyid " - "manually: {}".format(owner_name, ppa_name, exc) + f"Launchpad knows about {owner_name}/{ppa_name} but did not return a fingerprint. Please set keyid manually: {exc}" ) if "keyserver" not in kwargs: @@ -2722,15 +2365,13 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs): if "ppa_auth" in kwargs: if not launchpad_ppa_info["private"]: raise CommandExecutionError( - "PPA is not private but auth credentials passed: {}".format( - repo - ) + f"PPA is not private but auth credentials passed: {repo}" ) # assign the new repo format to the "repo" variable # so we can fall through to the "normal" mechanism # here. if "ppa_auth" in kwargs: - ppa_auth = "{}@".format(kwargs["ppa_auth"]) + ppa_auth = f"{kwargs['ppa_auth']}@" repo = LP_PVT_SRC_FORMAT.format( ppa_auth, owner_name, ppa_name, dist ) @@ -2758,12 +2399,7 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs): repos = [] for source in sources: - if HAS_APT: - _, invalid, _, _ = _invalid(source.line) - if not invalid: - repos.append(source) - else: - repos.append(source) + repos.append(source) mod_source = None try: @@ -2854,9 +2490,7 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs): ret = _call_apt(cmd, scope=False, **kwargs) if ret["retcode"] != 0: raise CommandExecutionError( - "Error: key retrieval failed: {}".format( - ret["stdout"] - ) + f"Error: key retrieval failed: {ret['stdout']}" ) elif "key_url" in kwargs: @@ -2965,10 +2599,7 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs): if refresh: refresh_db() - if not HAS_APT: - signedby = mod_source.signedby - else: - signedby = _get_opts(repo)["signedby"].get("value", "") + signedby = mod_source.signedby return { repo: { @@ -3039,19 +2670,12 @@ def _expand_repo_def(os_name, os_codename=None, **kwargs): auth_info = "{}@".format(kwargs["ppa_auth"]) repo = LP_PVT_SRC_FORMAT.format(auth_info, owner_name, ppa_name, dist) else: - if HAS_SOFTWAREPROPERTIES: - if hasattr(softwareproperties.ppa, "PPAShortcutHandler"): - repo = softwareproperties.ppa.PPAShortcutHandler(repo).expand(dist)[ - 0 - ] - else: - repo = softwareproperties.ppa.expand_ppa_line(repo, dist)[0] - else: - repo = LP_SRC_FORMAT.format(owner_name, ppa_name, dist) + repo = LP_SRC_FORMAT.format(owner_name, ppa_name, dist) if "file" not in kwargs: - filename = "/etc/apt/sources.list.d/{0}-{1}-{2}.list" - kwargs["file"] = filename.format(owner_name, ppa_name, dist) + kwargs[ + "file" + ] = f"/etc/apt/sources.list.d/{owner_name}-{ppa_name}-{dist}.list" source_entry = SourceEntry(repo) for list_args in ("architectures", "comps"): @@ -3065,11 +2689,8 @@ def _expand_repo_def(os_name, os_codename=None, **kwargs): source_list = SourcesList() kwargs = {} - if not HAS_APT: - signedby = source_entry.signedby - kwargs["signedby"] = signedby - else: - signedby = _get_opts(repo)["signedby"].get("value", "") + signedby = source_entry.signedby + kwargs["signedby"] = signedby _source_entry = source_list.add( type=source_entry.type, @@ -3094,28 +2715,6 @@ def _expand_repo_def(os_name, os_codename=None, **kwargs): sanitized["line"] = _source_entry.line.strip() sanitized["architectures"] = getattr(_source_entry, "architectures", []) sanitized["signedby"] = signedby - if HAS_APT and signedby: - # python3-apt does not supported the signed-by opt currently. - # creating the line with all opts including signed-by - if signedby not in sanitized["line"]: - line = sanitized["line"].split() - repo_opts = _get_opts(repo) - opts_order = [ - opt_type - for opt_type, opt_def in repo_opts.items() - if opt_def["full"] != "" - ] - for opt in repo_opts: - if "index" in repo_opts[opt]: - idx = repo_opts[opt]["index"] - opts_order[idx] = repo_opts[opt]["full"] - - opts = "[" + " ".join(opts_order) + "]" - if line[1].startswith("["): - line[1] = opts - else: - line.insert(1, opts) - sanitized["line"] = " ".join(line) return sanitized @@ -3251,9 +2850,7 @@ def set_selections(path=None, selection=None, clear=False, saltenv="base"): valid_states = ("install", "hold", "deinstall", "purge") bad_states = [x for x in selection if x not in valid_states] if bad_states: - raise SaltInvocationError( - "Invalid state(s): {}".format(", ".join(bad_states)) - ) + raise SaltInvocationError(f"Invalid state(s): {', '.join(bad_states)}") if clear: cmd = ["dpkg", "--clear-selections"] @@ -3547,3 +3144,31 @@ def services_need_restart(**kwargs): services.add(service) return list(services) + + +def which(path): + """ + Displays which package installed a specific file + + CLI Examples: + + .. code-block:: bash + + salt * pkg.which + """ + filepath = pathlib.Path(path) + cmd = ["dpkg"] + if filepath.is_absolute(): + if filepath.exists(): + cmd.extend(["-S", str(filepath)]) + else: + log.debug("%s does not exist", filepath) + return False + else: + log.debug("%s is not absolute path", filepath) + return False + cmd_ret = _call_apt(cmd) + if "no path found matching pattern" in cmd_ret["stdout"]: + return None + pkg = cmd_ret["stdout"].split(":")[0] + return pkg diff --git a/salt/utils/pkg/deb.py b/salt/utils/pkg/deb.py index c8bfa8ae20e..6a3b3a833b2 100644 --- a/salt/utils/pkg/deb.py +++ b/salt/utils/pkg/deb.py @@ -2,6 +2,254 @@ Common functions for working with deb packages """ +import logging +import os +import pathlib +import re +import shutil +import tempfile +from collections import OrderedDict + +import salt.utils.files + +log = logging.getLogger(__name__) + + +class SourceEntry: # pylint: disable=function-redefined + def __init__(self, line, file=None): + self.invalid = False + self.comps = [] + self.disabled = False + self.comment = "" + self.dist = "" + self.type = "" + self.uri = "" + self.line = line + self.architectures = [] + self.signedby = "" + self.file = file + if not self.file: + self.file = str(pathlib.Path(os.sep, "etc", "apt", "sources.list")) + self._parse_sources(line) + + def str(self): + return self.repo_line() + + def repo_line(self): + """ + Return the repo line for the sources file + """ + repo_line = [] + if self.invalid: + return self.line + if self.disabled: + repo_line.append("#") + + repo_line.append(self.type) + opts = _get_opts(self.line) + if self.architectures: + if "arch" not in opts: + opts["arch"] = {} + opts["arch"]["full"] = f"arch={','.join(self.architectures)}" + opts["arch"]["value"] = self.architectures + if self.signedby: + if "signedby" not in opts: + opts["signedby"] = {} + opts["signedby"]["full"] = f"signed-by={self.signedby}" + opts["signedby"]["value"] = self.signedby + + ordered_opts = [] + for opt in opts.values(): + if opt["full"] != "": + ordered_opts.append(opt["full"]) + + if ordered_opts: + repo_line.append(f"[{' '.join(ordered_opts)}]") + + repo_line += [self.uri, self.dist, " ".join(self.comps)] + if self.comment: + repo_line.append(f"#{self.comment}") + return " ".join(repo_line) + "\n" + + def _parse_sources(self, line): + """ + Parse lines from sources files + """ + self.disabled, self.invalid, self.comment, repo_line = _invalid(line) + if self.invalid: + return False + if repo_line[1].startswith("["): + repo_line = [x for x in (line.strip("[]") for line in repo_line) if x] + opts = _get_opts(self.line) + if "arch" in opts: + self.architectures.extend(opts["arch"]["value"]) + if "signedby" in opts: + self.signedby = opts["signedby"]["value"] + for opt in opts.values(): + opt = opt["full"] + if opt: + try: + repo_line.pop(repo_line.index(opt)) + except ValueError: + repo_line.pop(repo_line.index(f"[{opt}]")) + self.type = repo_line[0] + self.uri = repo_line[1] + self.dist = repo_line[2] + self.comps = repo_line[3:] + return True + + +class SourcesList: # pylint: disable=function-redefined + def __init__(self): + self.list = [] + self.files = [ + pathlib.Path(os.sep, "etc", "apt", "sources.list"), + pathlib.Path(os.sep, "etc", "apt", "sources.list.d"), + ] + for file in self.files: + if file.is_dir(): + for fp in file.glob("**/*.list"): + self.add_file(file=fp) + else: + self.add_file(file) + + def __iter__(self): + yield from self.list + + def add_file(self, file): + """ + Add the lines of a file to self.list + """ + if file.is_file(): + with salt.utils.files.fopen(str(file)) as source: + for line in source: + self.list.append(SourceEntry(line, file=str(file))) + else: + log.debug("The apt sources file %s does not exist", file) + + def add(self, type, uri, dist, orig_comps, architectures, signedby): + opts_count = [] + opts_line = "" + if architectures: + architectures = f"arch={','.join(architectures)}" + opts_count.append(architectures) + if signedby: + signedby = f"signed-by={signedby}" + opts_count.append(signedby) + if len(opts_count) > 1: + opts_line = f"[{' '.join(opts_count)}]" + elif len(opts_count) == 1: + opts_line = f"[{''.join(opts_count)}]" + repo_line = [ + type, + opts_line, + uri, + dist, + " ".join(orig_comps), + ] + return SourceEntry(" ".join([line for line in repo_line if line.strip()])) + + def remove(self, source): + """ + remove a source from the list of sources + """ + self.list.remove(source) + + def save(self): + """ + write all of the sources from the list of sources + to the file. + """ + filemap = {} + with tempfile.TemporaryDirectory() as tmpdir: + for source in self.list: + fname = pathlib.Path(tmpdir, pathlib.Path(source.file).name) + with salt.utils.files.fopen(str(fname), "a") as fp: + fp.write(source.repo_line()) + if source.file not in filemap: + filemap[source.file] = {"tmp": fname} + + for fp in filemap: + shutil.move(str(filemap[fp]["tmp"]), fp) + + +def _invalid(line): + """ + This is a workaround since python3-apt does not support + the signed-by argument. This function was removed from + the class to ensure users using the python3-apt module or + not can use the signed-by option. + """ + disabled = False + invalid = False + comment = "" + line = line.strip() + if not line: + invalid = True + return disabled, invalid, comment, "" + + if line.startswith("#"): + disabled = True + line = line[1:] + + idx = line.find("#") + if idx > 0: + comment = line[idx + 1 :] + line = line[:idx] + + cdrom_match = re.match(r"(.*)(cdrom:.*/)(.*)", line.strip()) + if cdrom_match: + repo_line = ( + [p.strip() for p in cdrom_match.group(1).split()] + + [cdrom_match.group(2).strip()] + + [p.strip() for p in cdrom_match.group(3).split()] + ) + else: + repo_line = line.strip().split() + if ( + not repo_line + or repo_line[0] not in ["deb", "deb-src", "rpm", "rpm-src"] + or len(repo_line) < 3 + ): + invalid = True + return disabled, invalid, comment, repo_line + + if repo_line[1].startswith("["): + if not any(x.endswith("]") for x in repo_line[1:]): + invalid = True + return disabled, invalid, comment, repo_line + + return disabled, invalid, comment, repo_line + + +def _get_opts(line): + """ + Return all opts in [] for a repo line + """ + get_opts = re.search(r"\[(.*=.*)\]", line) + ret = OrderedDict() + + if not get_opts: + return ret + opts = get_opts.group(0).strip("[]") + architectures = [] + for opt in opts.split(): + if opt.startswith("arch"): + architectures.extend(opt.split("=", 1)[1].split(",")) + ret["arch"] = {} + ret["arch"]["full"] = opt + ret["arch"]["value"] = architectures + elif opt.startswith("signed-by"): + ret["signedby"] = {} + ret["signedby"]["full"] = opt + ret["signedby"]["value"] = opt.split("=", 1)[1] + else: + other_opt = opt.split("=", 1)[0] + ret[other_opt] = {} + ret[other_opt]["full"] = opt + ret[other_opt]["value"] = opt.split("=", 1)[1] + return ret + def combine_comments(comments): """ diff --git a/tests/pytests/functional/modules/test_pkg.py b/tests/pytests/functional/modules/test_pkg.py index 6af7533f2e4..a7f1ee60b9d 100644 --- a/tests/pytests/functional/modules/test_pkg.py +++ b/tests/pytests/functional/modules/test_pkg.py @@ -224,11 +224,15 @@ def test_owner(modules): # Similar to pkg.owner, but for FreeBSD's pkgng @pytest.mark.skip_on_freebsd(reason="test for new package manager for FreeBSD") @pytest.mark.requires_salt_modules("pkg.which") -def test_which(modules): +def test_which(grains, modules): """ test finding the package owning a file """ - ret = modules.pkg.which("/usr/local/bin/salt-call") + if grains["os_family"] in ["Debian", "RedHat"]: + file = "/bin/mknod" + else: + file = "/usr/local/bin/salt-call" + ret = modules.pkg.which(file) assert len(ret) != 0 diff --git a/tests/pytests/functional/utils/pkg/__init__.py b/tests/pytests/functional/utils/pkg/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/pytests/functional/utils/pkg/test_deb.py b/tests/pytests/functional/utils/pkg/test_deb.py new file mode 100644 index 00000000000..37b1fb0ed33 --- /dev/null +++ b/tests/pytests/functional/utils/pkg/test_deb.py @@ -0,0 +1,88 @@ +import salt.utils.pkg.deb +from salt.utils.pkg.deb import SourceEntry + + +def test__get_opts(): + tests = [ + { + "oneline": "deb [signed-by=/etc/apt/keyrings/example.key arch=amd64] https://example.com/pub/repos/apt xenial main", + "result": { + "signedby": { + "full": "signed-by=/etc/apt/keyrings/example.key", + "value": "/etc/apt/keyrings/example.key", + }, + "arch": {"full": "arch=amd64", "value": ["amd64"]}, + }, + }, + { + "oneline": "deb [arch=amd64 signed-by=/etc/apt/keyrings/example.key] https://example.com/pub/repos/apt xenial main", + "result": { + "arch": {"full": "arch=amd64", "value": ["amd64"]}, + "signedby": { + "full": "signed-by=/etc/apt/keyrings/example.key", + "value": "/etc/apt/keyrings/example.key", + }, + }, + }, + { + "oneline": "deb [arch=amd64] https://example.com/pub/repos/apt xenial main", + "result": { + "arch": {"full": "arch=amd64", "value": ["amd64"]}, + }, + }, + ] + + for test in tests: + ret = salt.utils.pkg.deb._get_opts(test["oneline"]) + assert ret == test["result"] + + +def test_SourceEntry_init(): + source = SourceEntry( + "deb [arch=amd64 signed-by=/etc/apt/keyrings/example.key] https://example.com/pub/repos/apt xenial main", + file="/tmp/test.list", + ) + assert source.invalid is False + assert source.comps == ["main"] + assert source.comment == "" + assert source.dist == "xenial" + assert source.type == "deb" + assert source.uri == "https://example.com/pub/repos/apt" + assert source.architectures == ["amd64"] + assert source.signedby == "/etc/apt/keyrings/example.key" + assert source.file == "/tmp/test.list" + + +def test_SourceEntry_repo_line(): + + lines = [ + "deb [arch=amd64 signed-by=/etc/apt/keyrings/example.key] https://example.com/pub/repos/apt xenial main\n", + "deb [signed-by=/etc/apt/keyrings/example.key] https://example.com/pub/repos/apt xenial main\n", + "deb [signed-by=/etc/apt/keyrings/example.key arch=amd64,x86_64] https://example.com/pub/repos/apt xenial main\n", + ] + for line in lines: + source = SourceEntry(line, file="/tmp/test.list") + assert source.invalid is False + assert source.repo_line() == line + + lines = [ + ( + "deb [arch=amd64 signed-by=/etc/apt/keyrings/example.key] https://example.com/pub/repos/apt xenial main\n", + "deb [arch=x86_64 signed-by=/etc/apt/keyrings/example.key] https://example.com/pub/repos/apt xenial main\n", + ), + ( + "deb [signed-by=/etc/apt/keyrings/example.key] https://example.com/pub/repos/apt xenial main\n", + "deb [signed-by=/etc/apt/keyrings/example.key arch=x86_64] https://example.com/pub/repos/apt xenial main\n", + ), + ( + "deb [signed-by=/etc/apt/keyrings/example.key arch=amd64,x86_64] https://example.com/pub/repos/apt xenial main\n", + "deb [signed-by=/etc/apt/keyrings/example.key arch=x86_64] https://example.com/pub/repos/apt xenial main\n", + ), + ] + for line in lines: + line_key, line_value = line + source = SourceEntry(line_key, file="/tmp/test.list") + source.architectures = ["x86_64"] + assert source.invalid is False + assert source.repo_line() == line_value + assert source.invalid is False diff --git a/tests/pytests/unit/modules/test_aptpkg.py b/tests/pytests/unit/modules/test_aptpkg.py index 159b5db3537..f1be3ca8b29 100644 --- a/tests/pytests/unit/modules/test_aptpkg.py +++ b/tests/pytests/unit/modules/test_aptpkg.py @@ -18,23 +18,6 @@ from salt.exceptions import ( from salt.utils.odict import OrderedDict from tests.support.mock import MagicMock, Mock, call, patch -try: - from aptsources.sourceslist import ( # pylint: disable=unused-import - SourceEntry, - SourcesList, - ) - - HAS_APT = True -except ImportError: - HAS_APT = False - -try: - from aptsources import sourceslist # pylint: disable=unused-import - - HAS_APTSOURCES = True -except ImportError: - HAS_APTSOURCES = False - log = logging.getLogger(__name__) @@ -380,12 +363,9 @@ def test_get_repo_keys(repo_keys_var): mock = MagicMock(return_value={"retcode": 0, "stdout": APT_KEY_LIST}) with patch.dict(aptpkg.__salt__, {"cmd.run_all": mock}): - if not HAS_APT: - with patch("os.listdir", return_value="/tmp/keys"): - with patch("pathlib.Path.is_dir", return_value=True): - assert aptpkg.get_repo_keys() == repo_keys_var - else: - assert aptpkg.get_repo_keys() == repo_keys_var + with patch("os.listdir", return_value="/tmp/keys"): + with patch("pathlib.Path.is_dir", return_value=True): + assert aptpkg.get_repo_keys() == repo_keys_var def test_file_dict(lowpkg_files_var): @@ -1558,21 +1538,17 @@ def _test_sourceslist_multiple_comps_fs(fs): yield -@pytest.mark.skipif( - HAS_APTSOURCES is True, reason="Only run test with python3-apt library is missing." -) @pytest.mark.usefixtures("_test_sourceslist_multiple_comps_fs") def test_sourceslist_multiple_comps(): """ Test SourcesList when repo has multiple comps """ - with patch.object(aptpkg, "HAS_APT", return_value=True): - sources = aptpkg.SourcesList() - for source in sources: - assert source.type == "deb" - assert source.uri == "http://archive.ubuntu.com/ubuntu/" - assert source.comps == ["main", "restricted"] - assert source.dist == "focal-updates" + sources = aptpkg.SourcesList() + for source in sources: + assert source.type == "deb" + assert source.uri == "http://archive.ubuntu.com/ubuntu/" + assert source.comps == ["main", "restricted"] + assert source.dist == "focal-updates" @pytest.fixture( @@ -1592,9 +1568,6 @@ def repo_line(request, fs): yield request.param -@pytest.mark.skipif( - HAS_APTSOURCES is True, reason="Only run test with python3-apt library is missing." -) def test_sourceslist_architectures(repo_line): """ Test SourcesList when architectures is in repo