Vendor `importlib_metadata==3.3.0`, to be used only when no newer version is available

Pedro Algarvio 2021-08-25 15:11:03 +01:00 committed by Megan Wilhite
parent bd7195c3c6
commit ff1b8d964a
7 changed files with 4344 additions and 84 deletions


@@ -10,3 +10,22 @@ if sys.version_info >= (3, 9, 5):
    import ipaddress
else:
    import salt.ext.ipaddress as ipaddress

if sys.version_info >= (3, 6):
    # The importlib_metadata releases available for Python versions lower
    # than 3.6 do not include the functionality we need.
    try:
        import importlib_metadata

        importlib_metadata_version = [
            int(part)
            for part in importlib_metadata.version("importlib_metadata").split(".")
            if part.isdigit()
        ]
        if tuple(importlib_metadata_version) < (3, 3, 0):
            # Use the vendored importlib_metadata
            import salt.ext.importlib_metadata as importlib_metadata
    except ImportError:
        # Use the vendored importlib_metadata
        import salt.ext.importlib_metadata as importlib_metadata
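With this shim in place, other Salt modules can import importlib_metadata from salt._compat and transparently get either a new-enough system package or the vendored copy. A minimal consumer sketch; the _get_version helper is hypothetical and not part of this commit:

    # Hypothetical consumer sketch, not part of this commit (Python 3.6+ only,
    # since salt._compat only defines importlib_metadata on those versions).
    from salt._compat import importlib_metadata


    def _get_version(distribution_name):
        """Return the installed version of *distribution_name*, or None."""
        try:
            return importlib_metadata.version(distribution_name)
        except importlib_metadata.PackageNotFoundError:
            return None


    print(_get_version("setuptools"))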


@@ -0,0 +1,687 @@
# This is importlib_metadata version 3.3.0 vendored into the Salt Project
#
# Copyright 2017-2019 Jason R. Coombs, Barry Warsaw
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# THIS CODE SHOULD BE REMOVED WHEN importlib-metadata >= 3.3.0 IS AVAILABLE
# AS A SYSTEM PACKAGE ON ALL THE PLATFORMS FOR WHICH SALT BUILDS PACKAGES OR
# WHEN THE MINIMUM PYTHON VERSION IS 3.10
# pylint: skip-file
import os
import re
import abc
import csv
import sys
import email
import pathlib
import operator
import functools
import itertools
import posixpath
import collections
try:
import zipp
except ImportError:
import salt.ext.zipp as zipp
from ._compat import (
NullFinder,
PyPy_repr,
install,
Protocol,
)
from configparser import ConfigParser
from contextlib import suppress
from importlib import import_module
from importlib.abc import MetaPathFinder
from itertools import starmap
from typing import Any, List, Optional, TypeVar, Union
__all__ = [
'Distribution',
'DistributionFinder',
'PackageNotFoundError',
'distribution',
'distributions',
'entry_points',
'files',
'metadata',
'requires',
'version',
]
class PackageNotFoundError(ModuleNotFoundError):
"""The package was not found."""
def __str__(self):
tmpl = "No package metadata was found for {self.name}"
return tmpl.format(**locals())
@property
def name(self):
(name,) = self.args
return name
class EntryPoint(
PyPy_repr, collections.namedtuple('EntryPointBase', 'name value group')
):
"""An entry point as defined by Python packaging conventions.
See `the packaging docs on entry points
<https://packaging.python.org/specifications/entry-points/>`_
for more information.
"""
pattern = re.compile(
r'(?P<module>[\w.]+)\s*'
r'(:\s*(?P<attr>[\w.]+))?\s*'
r'(?P<extras>\[.*\])?\s*$'
)
"""
A regular expression describing the syntax for an entry point,
which might look like:
- module
- package.module
- package.module:attribute
- package.module:object.attribute
- package.module:attr [extra1, extra2]
Other combinations are possible as well.
The expression is lenient about whitespace around the ':',
following the attr, and following any extras.
"""
dist: Optional['Distribution'] = None
def load(self):
"""Load the entry point from its definition. If only a module
is indicated by the value, return that module. Otherwise,
return the named object.
"""
match = self.pattern.match(self.value)
module = import_module(match.group('module'))
attrs = filter(None, (match.group('attr') or '').split('.'))
return functools.reduce(getattr, attrs, module)
@property
def module(self):
match = self.pattern.match(self.value)
return match.group('module')
@property
def attr(self):
match = self.pattern.match(self.value)
return match.group('attr')
@property
def extras(self):
match = self.pattern.match(self.value)
return list(re.finditer(r'\w+', match.group('extras') or ''))
@classmethod
def _from_config(cls, config):
return (
cls(name, value, group)
for group in config.sections()
for name, value in config.items(group)
)
@classmethod
def _from_text(cls, text):
config = ConfigParser(delimiters='=')
# case sensitive: https://stackoverflow.com/q/1611799/812183
config.optionxform = str
config.read_string(text)
return cls._from_config(config)
@classmethod
def _from_text_for(cls, text, dist):
return (ep._for(dist) for ep in cls._from_text(text))
def _for(self, dist):
self.dist = dist
return self
def __iter__(self):
"""
Supply iter so one may construct dicts of EntryPoints easily.
"""
return iter((self.name, self))
def __reduce__(self):
return (
self.__class__,
(self.name, self.value, self.group),
)
class PackagePath(pathlib.PurePosixPath):
"""A reference to a path in a package"""
def read_text(self, encoding='utf-8'):
with self.locate().open(encoding=encoding) as stream:
return stream.read()
def read_binary(self):
with self.locate().open('rb') as stream:
return stream.read()
def locate(self):
"""Return a path-like object for this path"""
return self.dist.locate_file(self)
class FileHash:
def __init__(self, spec):
self.mode, _, self.value = spec.partition('=')
def __repr__(self):
return '<FileHash mode: {} value: {}>'.format(self.mode, self.value)
_T = TypeVar("_T")
class PackageMetadata(Protocol):
def __len__(self) -> int:
... # pragma: no cover
def __contains__(self, item: str) -> bool:
... # pragma: no cover
def __getitem__(self, key: str) -> str:
... # pragma: no cover
def get_all(self, name: str, failobj: _T = ...) -> Union[List[Any], _T]:
"""
Return all values associated with a possibly multi-valued key.
"""
class Distribution:
"""A Python distribution package."""
@abc.abstractmethod
def read_text(self, filename):
"""Attempt to load metadata file given by the name.
:param filename: The name of the file in the distribution info.
:return: The text if found, otherwise None.
"""
@abc.abstractmethod
def locate_file(self, path):
"""
Given a path to a file in this distribution, return a path
to it.
"""
@classmethod
def from_name(cls, name):
"""Return the Distribution for the given package name.
:param name: The name of the distribution package to search for.
:return: The Distribution instance (or subclass thereof) for the named
package, if found.
:raises PackageNotFoundError: When the named package's distribution
metadata cannot be found.
"""
for resolver in cls._discover_resolvers():
dists = resolver(DistributionFinder.Context(name=name))
dist = next(iter(dists), None)
if dist is not None:
return dist
else:
raise PackageNotFoundError(name)
@classmethod
def discover(cls, **kwargs):
"""Return an iterable of Distribution objects for all packages.
Pass a ``context`` or pass keyword arguments for constructing
a context.
:context: A ``DistributionFinder.Context`` object.
:return: Iterable of Distribution objects for all packages.
"""
context = kwargs.pop('context', None)
if context and kwargs:
raise ValueError("cannot accept context and kwargs")
context = context or DistributionFinder.Context(**kwargs)
return itertools.chain.from_iterable(
resolver(context) for resolver in cls._discover_resolvers()
)
@staticmethod
def at(path):
"""Return a Distribution for the indicated metadata path
:param path: a string or path-like object
:return: a concrete Distribution instance for the path
"""
return PathDistribution(pathlib.Path(path))
@staticmethod
def _discover_resolvers():
"""Search the meta_path for resolvers."""
declared = (
getattr(finder, 'find_distributions', None) for finder in sys.meta_path
)
return filter(None, declared)
@classmethod
def _local(cls, root='.'):
from pep517 import build, meta
system = build.compat_system(root)
builder = functools.partial(
meta.build,
source_dir=root,
system=system,
)
return PathDistribution(zipp.Path(meta.build_as_zip(builder)))
@property
def metadata(self) -> PackageMetadata:
"""Return the parsed metadata for this Distribution.
The returned object will have keys that name the various bits of
metadata. See PEP 566 for details.
"""
text = (
self.read_text('METADATA')
or self.read_text('PKG-INFO')
# This last clause is here to support old egg-info files. Its
# effect is to just end up using the PathDistribution's self._path
# (which points to the egg-info file) attribute unchanged.
or self.read_text('')
)
return email.message_from_string(text)
@property
def name(self):
"""Return the 'Name' metadata for the distribution package."""
return self.metadata['Name']
@property
def version(self):
"""Return the 'Version' metadata for the distribution package."""
return self.metadata['Version']
@property
def entry_points(self):
return list(EntryPoint._from_text_for(self.read_text('entry_points.txt'), self))
@property
def files(self):
"""Files in this distribution.
:return: List of PackagePath for this distribution or None
Result is `None` if the metadata file that enumerates files
(i.e. RECORD for dist-info or SOURCES.txt for egg-info) is
missing.
Result may be empty if the metadata exists but is empty.
"""
file_lines = self._read_files_distinfo() or self._read_files_egginfo()
def make_file(name, hash=None, size_str=None):
result = PackagePath(name)
result.hash = FileHash(hash) if hash else None
result.size = int(size_str) if size_str else None
result.dist = self
return result
return file_lines and list(starmap(make_file, csv.reader(file_lines)))
def _read_files_distinfo(self):
"""
Read the lines of RECORD
"""
text = self.read_text('RECORD')
return text and text.splitlines()
def _read_files_egginfo(self):
"""
SOURCES.txt might contain literal commas, so wrap each line
in quotes.
"""
text = self.read_text('SOURCES.txt')
return text and map('"{}"'.format, text.splitlines())
@property
def requires(self):
"""Generated requirements specified for this Distribution"""
reqs = self._read_dist_info_reqs() or self._read_egg_info_reqs()
return reqs and list(reqs)
def _read_dist_info_reqs(self):
return self.metadata.get_all('Requires-Dist')
def _read_egg_info_reqs(self):
source = self.read_text('requires.txt')
return source and self._deps_from_requires_text(source)
@classmethod
def _deps_from_requires_text(cls, source):
section_pairs = cls._read_sections(source.splitlines())
sections = {
section: list(map(operator.itemgetter('line'), results))
for section, results in itertools.groupby(
section_pairs, operator.itemgetter('section')
)
}
return cls._convert_egg_info_reqs_to_simple_reqs(sections)
@staticmethod
def _read_sections(lines):
section = None
for line in filter(None, lines):
section_match = re.match(r'\[(.*)\]$', line)
if section_match:
section = section_match.group(1)
continue
yield locals()
@staticmethod
def _convert_egg_info_reqs_to_simple_reqs(sections):
"""
Historically, setuptools would solicit and store 'extra'
requirements, including those with environment markers,
in separate sections. More modern tools expect each
dependency to be defined separately, with any relevant
extras and environment markers attached directly to that
requirement. This method converts the former to the
latter. See _test_deps_from_requires_text for an example.
"""
def make_condition(name):
return name and 'extra == "{name}"'.format(name=name)
def parse_condition(section):
section = section or ''
extra, sep, markers = section.partition(':')
if extra and markers:
markers = '({markers})'.format(markers=markers)
conditions = list(filter(None, [markers, make_condition(extra)]))
return '; ' + ' and '.join(conditions) if conditions else ''
for section, deps in sections.items():
for dep in deps:
yield dep + parse_condition(section)
class DistributionFinder(MetaPathFinder):
"""
A MetaPathFinder capable of discovering installed distributions.
"""
class Context:
"""
Keyword arguments presented by the caller to
``distributions()`` or ``Distribution.discover()``
to narrow the scope of a search for distributions
in all DistributionFinders.
Each DistributionFinder may expect any parameters
and should attempt to honor the canonical
parameters defined below when appropriate.
"""
name = None
"""
Specific name for which a distribution finder should match.
A name of ``None`` matches all distributions.
"""
def __init__(self, **kwargs):
vars(self).update(kwargs)
@property
def path(self):
"""
The path that a distribution finder should search.
Typically refers to Python package paths and defaults
to ``sys.path``.
"""
return vars(self).get('path', sys.path)
@abc.abstractmethod
def find_distributions(self, context=Context()):
"""
Find distributions.
Return an iterable of all Distribution instances capable of
loading the metadata for packages matching the ``context``,
a DistributionFinder.Context instance.
"""
class FastPath:
"""
Micro-optimized class for searching a path for
children.
"""
def __init__(self, root):
self.root = str(root)
self.base = os.path.basename(self.root).lower()
def joinpath(self, child):
return pathlib.Path(self.root, child)
def children(self):
with suppress(Exception):
return os.listdir(self.root or '')
with suppress(Exception):
return self.zip_children()
return []
def zip_children(self):
zip_path = zipp.Path(self.root)
names = zip_path.root.namelist()
self.joinpath = zip_path.joinpath
return dict.fromkeys(child.split(posixpath.sep, 1)[0] for child in names)
def search(self, name):
return (
self.joinpath(child)
for child in self.children()
if name.matches(child, self.base)
)
class Prepared:
"""
A prepared search for metadata on a possibly-named package.
"""
normalized = None
suffixes = '.dist-info', '.egg-info'
exact_matches = [''][:0]
def __init__(self, name):
self.name = name
if name is None:
return
self.normalized = self.normalize(name)
self.exact_matches = [self.normalized + suffix for suffix in self.suffixes]
@staticmethod
def normalize(name):
"""
PEP 503 normalization plus dashes as underscores.
"""
return re.sub(r"[-_.]+", "-", name).lower().replace('-', '_')
@staticmethod
def legacy_normalize(name):
"""
Normalize the package name as found in the convention in
older packaging tools versions and specs.
"""
return name.lower().replace('-', '_')
def matches(self, cand, base):
low = cand.lower()
pre, ext = os.path.splitext(low)
name, sep, rest = pre.partition('-')
return (
low in self.exact_matches
or ext in self.suffixes
and (not self.normalized or name.replace('.', '_') == self.normalized)
# legacy case:
or self.is_egg(base)
and low == 'egg-info'
)
def is_egg(self, base):
normalized = self.legacy_normalize(self.name or '')
prefix = normalized + '-' if normalized else ''
versionless_egg_name = normalized + '.egg' if self.name else ''
return (
base == versionless_egg_name
or base.startswith(prefix)
and base.endswith('.egg')
)
@install
class MetadataPathFinder(NullFinder, DistributionFinder):
"""A degenerate finder for distribution packages on the file system.
This finder supplies only a find_distributions() method for versions
of Python that do not have a PathFinder find_distributions().
"""
def find_distributions(self, context=DistributionFinder.Context()):
"""
Find distributions.
Return an iterable of all Distribution instances capable of
loading the metadata for packages matching ``context.name``
(or all names if ``None`` indicated) along the paths in the list
of directories ``context.path``.
"""
found = self._search_paths(context.name, context.path)
return map(PathDistribution, found)
@classmethod
def _search_paths(cls, name, paths):
"""Find metadata directories in paths heuristically."""
return itertools.chain.from_iterable(
path.search(Prepared(name)) for path in map(FastPath, paths)
)
class PathDistribution(Distribution):
def __init__(self, path):
"""Construct a distribution from a path to the metadata directory.
:param path: A pathlib.Path or similar object supporting
.joinpath(), __div__, .parent, and .read_text().
"""
self._path = path
def read_text(self, filename):
with suppress(
FileNotFoundError,
IsADirectoryError,
KeyError,
NotADirectoryError,
PermissionError,
):
return self._path.joinpath(filename).read_text(encoding='utf-8')
read_text.__doc__ = Distribution.read_text.__doc__
def locate_file(self, path):
return self._path.parent / path
def distribution(distribution_name):
"""Get the ``Distribution`` instance for the named package.
:param distribution_name: The name of the distribution package as a string.
:return: A ``Distribution`` instance (or subclass thereof).
"""
return Distribution.from_name(distribution_name)
def distributions(**kwargs):
"""Get all ``Distribution`` instances in the current environment.
:return: An iterable of ``Distribution`` instances.
"""
return Distribution.discover(**kwargs)
def metadata(distribution_name) -> PackageMetadata:
"""Get the metadata for the named package.
:param distribution_name: The name of the distribution package to query.
:return: A PackageMetadata containing the parsed metadata.
"""
return Distribution.from_name(distribution_name).metadata
def version(distribution_name):
"""Get the version string for the named package.
:param distribution_name: The name of the distribution package to query.
:return: The version string for the package as defined in the package's
"Version" metadata key.
"""
return distribution(distribution_name).version
def entry_points():
"""Return EntryPoint objects for all installed packages.
:return: EntryPoint objects for all installed packages.
"""
eps = itertools.chain.from_iterable(dist.entry_points for dist in distributions())
by_group = operator.attrgetter('group')
ordered = sorted(eps, key=by_group)
grouped = itertools.groupby(ordered, by_group)
return {group: tuple(eps) for group, eps in grouped}
def files(distribution_name):
"""Return a list of files for the named package.
:param distribution_name: The name of the distribution package to query.
:return: List of files composing the distribution.
"""
return distribution(distribution_name).files
def requires(distribution_name):
"""
Return a list of requirements for the named package.
:return: An iterator of requirements, suitable for
packaging.requirement.Requirement.
"""
return distribution(distribution_name).requires
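As a quick orientation to what the vendored module exposes, the following is an illustrative sketch exercising the public names listed in __all__ above; the distribution name "setuptools" is only an example, any installed distribution would do:

    # Illustrative sketch only; exercises the vendored API defined above.
    import salt.ext.importlib_metadata as importlib_metadata

    # Simple metadata lookups for an installed distribution.
    print(importlib_metadata.version("setuptools"))
    print(importlib_metadata.metadata("setuptools")["Summary"])

    # entry_points() returns a dict mapping group names to tuples of EntryPoint
    # objects; unknown groups are simply absent, hence the .get() fallback.
    for entry_point in importlib_metadata.entry_points().get("console_scripts", ()):
        print(entry_point.name, "->", entry_point.value)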


@@ -0,0 +1,107 @@
# This is importlib_metadata version 3.3.0 vendored into the Salt Project
#
# Copyright 2017-2019 Jason R. Coombs, Barry Warsaw
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# THIS CODE SHOULD BE REMOVED WHEN importlib-metadata >= 3.3.0 IS AVAILABLE
# AS A SYSTEM PACKAGE ON ALL THE PLATFORMS FOR WHICH SALT BUILDS PACKAGES OR
# WHEN THE MINIMUM PYTHON VERSION IS 3.10
# pylint: skip-file
import sys
__all__ = ['install', 'NullFinder', 'PyPy_repr', 'Protocol']
try:
from typing import Protocol
except ImportError: # pragma: no cover
"""
pytest-mypy complains here because:
error: Incompatible import of "Protocol" (imported name has type
"typing_extensions._SpecialForm", local name has type "typing._SpecialForm")
"""
from salt.ext.typing_extensions import Protocol # type: ignore
def install(cls):
"""
Class decorator for installation on sys.meta_path.
Adds the backport DistributionFinder to sys.meta_path and
attempts to disable the finder functionality of the stdlib
DistributionFinder.
"""
sys.meta_path.append(cls())
disable_stdlib_finder()
return cls
def disable_stdlib_finder():
"""
Give the backport primacy for discovering path-based distributions
by monkey-patching the stdlib O_O.
See #91 for more background for rationale on this sketchy
behavior.
"""
def matches(finder):
return getattr(
finder, '__module__', None
) == '_frozen_importlib_external' and hasattr(finder, 'find_distributions')
for finder in filter(matches, sys.meta_path): # pragma: nocover
del finder.find_distributions
class NullFinder:
"""
A "Finder" (aka "MetaClassFinder") that never finds any modules,
but may find distributions.
"""
@staticmethod
def find_spec(*args, **kwargs):
return None
# In Python 2, the import system requires finders
# to have a find_module() method, but this usage
# is deprecated in Python 3 in favor of find_spec().
# For the purposes of this finder (i.e. being present
# on sys.meta_path but having no other import
# system functionality), the two methods are identical.
find_module = find_spec
class PyPy_repr:
"""
Override repr for EntryPoint objects on PyPy to avoid __iter__ access.
Ref #97, #102.
"""
affected = hasattr(sys, 'pypy_version_info')
def __compat_repr__(self): # pragma: nocover
def make_param(name):
value = getattr(self, name)
return '{name}={value!r}'.format(**locals())
params = ', '.join(map(make_param, self._fields))
return 'EntryPoint({params})'.format(**locals())
if affected: # pragma: nocover
__repr__ = __compat_repr__
del affected
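The practical effect of the install decorator defined above is that merely importing the vendored package registers its MetadataPathFinder on sys.meta_path (and demotes the stdlib finder, per disable_stdlib_finder). A small, purely illustrative sketch of how to observe that:

    # Illustrative sketch: importing the vendored package runs @install, which
    # appends a MetadataPathFinder instance to sys.meta_path.
    import sys

    import salt.ext.importlib_metadata  # noqa: F401  the import triggers @install

    distribution_finders = [
        finder for finder in sys.meta_path if hasattr(finder, "find_distributions")
    ]
    print(distribution_finders)  # the vendored MetadataPathFinder is among these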

File diff suppressed because it is too large

salt/ext/zipp.py (new file, 353 lines)

@@ -0,0 +1,353 @@
# This is https://raw.githubusercontent.com/jaraco/zipp/v3.5.0/zipp.py
#
# Copyright Jason R. Coombs
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
# THIS CODE SHOULD BE REMOVED WHEN importlib-metadata >= 3.3.0 IS AVAILABLE
# AS A SYSTEM PACKAGE ON ALL THE PLATFORMS FOR WHICH SALT BUILDS PACKAGES OR
# WHEN THE MINIMUM PYTHON VERSION IS 3.10
# pylint: skip-file
import io
import posixpath
import zipfile
import itertools
import contextlib
import sys
import pathlib
if sys.version_info < (3, 7):
from collections import OrderedDict
else:
OrderedDict = dict
def _parents(path):
"""
Given a path with elements separated by
posixpath.sep, generate all parents of that path.
>>> list(_parents('b/d'))
['b']
>>> list(_parents('/b/d/'))
['/b']
>>> list(_parents('b/d/f/'))
['b/d', 'b']
>>> list(_parents('b'))
[]
>>> list(_parents(''))
[]
"""
return itertools.islice(_ancestry(path), 1, None)
def _ancestry(path):
"""
Given a path with elements separated by
posixpath.sep, generate all elements of that path
>>> list(_ancestry('b/d'))
['b/d', 'b']
>>> list(_ancestry('/b/d/'))
['/b/d', '/b']
>>> list(_ancestry('b/d/f/'))
['b/d/f', 'b/d', 'b']
>>> list(_ancestry('b'))
['b']
>>> list(_ancestry(''))
[]
"""
path = path.rstrip(posixpath.sep)
while path and path != posixpath.sep:
yield path
path, tail = posixpath.split(path)
_dedupe = OrderedDict.fromkeys
"""Deduplicate an iterable in original order"""
def _difference(minuend, subtrahend):
"""
Return items in minuend not in subtrahend, retaining order
with O(1) lookup.
"""
return itertools.filterfalse(set(subtrahend).__contains__, minuend)
class CompleteDirs(zipfile.ZipFile):
"""
A ZipFile subclass that ensures that implied directories
are always included in the namelist.
"""
@staticmethod
def _implied_dirs(names):
parents = itertools.chain.from_iterable(map(_parents, names))
as_dirs = (p + posixpath.sep for p in parents)
return _dedupe(_difference(as_dirs, names))
def namelist(self):
names = super().namelist()
return names + list(self._implied_dirs(names))
def _name_set(self):
return set(self.namelist())
def resolve_dir(self, name):
"""
If the name represents a directory, return that name
as a directory (with the trailing slash).
"""
names = self._name_set()
dirname = name + '/'
dir_match = name not in names and dirname in names
return dirname if dir_match else name
@classmethod
def make(cls, source):
"""
Given a source (filename or zipfile), return an
appropriate CompleteDirs subclass.
"""
if isinstance(source, CompleteDirs):
return source
if not isinstance(source, zipfile.ZipFile):
return cls(_pathlib_compat(source))
# Only allow for FastLookup when supplied zipfile is read-only
if 'r' not in source.mode:
cls = CompleteDirs
source.__class__ = cls
return source
class FastLookup(CompleteDirs):
"""
ZipFile subclass to ensure implicit
dirs exist and are resolved rapidly.
"""
def namelist(self):
with contextlib.suppress(AttributeError):
return self.__names
self.__names = super().namelist()
return self.__names
def _name_set(self):
with contextlib.suppress(AttributeError):
return self.__lookup
self.__lookup = super()._name_set()
return self.__lookup
def _pathlib_compat(path):
"""
For path-like objects, convert to a filename for compatibility
on Python 3.6.1 and earlier.
"""
try:
return path.__fspath__()
except AttributeError:
return str(path)
class Path:
"""
A pathlib-compatible interface for zip files.
Consider a zip file with this structure::
.
a.txt
b
c.txt
d
e.txt
>>> data = io.BytesIO()
>>> zf = zipfile.ZipFile(data, 'w')
>>> zf.writestr('a.txt', 'content of a')
>>> zf.writestr('b/c.txt', 'content of c')
>>> zf.writestr('b/d/e.txt', 'content of e')
>>> zf.filename = 'mem/abcde.zip'
Path accepts the zipfile object itself or a filename
>>> root = Path(zf)
From there, several path operations are available.
Directory iteration (including the zip file itself):
>>> a, b = root.iterdir()
>>> a
Path('mem/abcde.zip', 'a.txt')
>>> b
Path('mem/abcde.zip', 'b/')
name property:
>>> b.name
'b'
join with divide operator:
>>> c = b / 'c.txt'
>>> c
Path('mem/abcde.zip', 'b/c.txt')
>>> c.name
'c.txt'
Read text:
>>> c.read_text()
'content of c'
existence:
>>> c.exists()
True
>>> (b / 'missing.txt').exists()
False
Coercion to string:
>>> import os
>>> str(c).replace(os.sep, posixpath.sep)
'mem/abcde.zip/b/c.txt'
At the root, ``name``, ``filename``, and ``parent``
resolve to the zipfile. Note these attributes are not
valid and will raise a ``ValueError`` if the zipfile
has no filename.
>>> root.name
'abcde.zip'
>>> str(root.filename).replace(os.sep, posixpath.sep)
'mem/abcde.zip'
>>> str(root.parent)
'mem'
"""
__repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})"
def __init__(self, root, at=""):
"""
Construct a Path from a ZipFile or filename.
Note: When the source is an existing ZipFile object,
its type (__class__) will be mutated to a
specialized type. If the caller wishes to retain the
original type, the caller should either create a
separate ZipFile object or pass a filename.
"""
self.root = FastLookup.make(root)
self.at = at
def open(self, mode='r', *args, pwd=None, **kwargs):
"""
Open this entry as text or binary following the semantics
of ``pathlib.Path.open()`` by passing arguments through
to io.TextIOWrapper().
"""
if self.is_dir():
raise IsADirectoryError(self)
zip_mode = mode[0]
if not self.exists() and zip_mode == 'r':
raise FileNotFoundError(self)
stream = self.root.open(self.at, zip_mode, pwd=pwd)
if 'b' in mode:
if args or kwargs:
raise ValueError("encoding args invalid for binary operation")
return stream
return io.TextIOWrapper(stream, *args, **kwargs)
@property
def name(self):
return pathlib.Path(self.at).name or self.filename.name
@property
def suffix(self):
return pathlib.Path(self.at).suffix or self.filename.suffix
@property
def suffixes(self):
return pathlib.Path(self.at).suffixes or self.filename.suffixes
@property
def stem(self):
return pathlib.Path(self.at).stem or self.filename.stem
@property
def filename(self):
return pathlib.Path(self.root.filename).joinpath(self.at)
def read_text(self, *args, **kwargs):
with self.open('r', *args, **kwargs) as strm:
return strm.read()
def read_bytes(self):
with self.open('rb') as strm:
return strm.read()
def _is_child(self, path):
return posixpath.dirname(path.at.rstrip("/")) == self.at.rstrip("/")
def _next(self, at):
return self.__class__(self.root, at)
def is_dir(self):
return not self.at or self.at.endswith("/")
def is_file(self):
return self.exists() and not self.is_dir()
def exists(self):
return self.at in self.root._name_set()
def iterdir(self):
if not self.is_dir():
raise ValueError("Can't listdir a file")
subs = map(self._next, self.root.namelist())
return filter(self._is_child, subs)
def __str__(self):
return posixpath.join(self.root.filename, self.at)
def __repr__(self):
return self.__repr.format(self=self)
def joinpath(self, *other):
next = posixpath.join(self.at, *map(_pathlib_compat, other))
return self._next(self.root.resolve_dir(next))
__truediv__ = joinpath
@property
def parent(self):
if not self.at:
return self.filename.parent
parent_at = posixpath.dirname(self.at.rstrip('/'))
if parent_at:
parent_at += '/'
return self._next(parent_at)
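Within this commit the vendored zipp is consumed by FastPath.zip_children() in the importlib_metadata module above, which lists the top-level names of a zipped sys.path entry. A minimal sketch of that pattern; the wheel path below is hypothetical:

    # Illustrative sketch of the FastPath.zip_children() pattern; the archive
    # path is hypothetical.
    import posixpath

    import salt.ext.zipp as zipp

    archive = zipp.Path("/tmp/example-1.0-py3-none-any.whl")
    names = archive.root.namelist()
    top_level = dict.fromkeys(name.split(posixpath.sep, 1)[0] for name in names)
    print(list(top_level))  # top-level entries, e.g. the *.dist-info directory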


@@ -12,24 +12,13 @@ if sys.version_info >= (3, 10):
     USE_IMPORTLIB_METADATA_STDLIB = True
 else:
     if sys.version_info >= (3, 6):
-        # importlib_metadata available for python version lower than 3.6 do not
-        # include the functionality we need.
-        try:
-            import importlib_metadata
-
-            importlib_metadata_version = [
-                int(part)
-                for part in importlib_metadata.version("importlib_metadata").split(".")
-                if part.isdigit()
-            ]
-            if tuple(importlib_metadata_version) >= (3, 3, 0):
-                # Version 3.3.0 of importlib_metadata includes a fix which allows us to
-                # get the distribution of a loaded entry-point
-                USE_IMPORTLIB_METADATA = True
-        except ImportError:
-            # We don't have importlib_metadata but USE_IMPORTLIB_METADATA is set to false by default
-            pass
+        try:
+            from salt._compat import importlib_metadata
+
+            USE_IMPORTLIB_METADATA = True
+        except ImportError:
+            # We don't have importlib_metadata but USE_IMPORTLIB_METADATA is set to false by default
+            pass
 
 if not USE_IMPORTLIB_METADATA_STDLIB and not USE_IMPORTLIB_METADATA:
     # Try to use pkg_resources
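Put together, the patched module now selects a metadata backend in three tiers: the stdlib importlib.metadata on Python 3.10+, the salt._compat shim (system or vendored importlib_metadata) on Python 3.6+, and pkg_resources as the last resort. A condensed sketch of the resulting logic, assumed from the hunk context; the pkg_resources branch is abbreviated because its body lies outside this hunk:

    # Condensed sketch of the resulting selection logic (assumed from the hunk
    # context; the pkg_resources fallback body is outside this hunk).
    import sys

    USE_IMPORTLIB_METADATA_STDLIB = False
    USE_IMPORTLIB_METADATA = False

    if sys.version_info >= (3, 10):
        USE_IMPORTLIB_METADATA_STDLIB = True
    elif sys.version_info >= (3, 6):
        try:
            # Either a system importlib_metadata >= 3.3.0 or Salt's vendored copy.
            from salt._compat import importlib_metadata

            USE_IMPORTLIB_METADATA = True
        except ImportError:
            pass

    if not USE_IMPORTLIB_METADATA_STDLIB and not USE_IMPORTLIB_METADATA:
        # Fall back to pkg_resources (fallback body not shown in this hunk).
        ...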


@@ -151,73 +151,6 @@ def test_utils_loader_does_not_load_extensions(
assert "foobar.echo" not in loader_functions
def test_extension_discovery_without_reload_with_pkg_resources(
venv, salt_extension, salt_minion_factory
):
# Install our extension into the virtualenv
installed_packages = venv.get_installed_packages()
assert salt_extension.name not in installed_packages
if "importlib-metadata" in installed_packages:
importlib_metadata_version = installed_packages["importlib-metadata"]
if salt.utils.versions.StrictVersion(importlib_metadata_version) >= "3.3.0":
venv.install("-U", "importlib-metadata<3.3.0")
code = """
import sys
import json
import subprocess
# If the test fails, for debugging purposes, comment out the following 2 lines
import salt.log.setup
salt.log.setup.setup_console_logger(log_level="debug")
extension_path = "{}"
import salt.loader
minion_config = json.loads(sys.stdin.read())
loader = salt.loader.minion_mods(minion_config)
if "foobar.echo1" in loader:
sys.exit(1)
# Install the extension
proc = subprocess.run(
[sys.executable, "-m", "pip", "install", extension_path],
check=False,
shell=False,
stdout=subprocess.PIPE,
)
if proc.returncode != 0:
sys.exit(2)
loader = salt.loader.minion_mods(minion_config)
if "foobar.echo1" not in loader:
sys.exit(3)
print(json.dumps(list(loader)))
""".format(
salt_extension.srcdir
)
ret = venv.run_code(
code, input=json.dumps(salt_minion_factory.config.copy()), check=False
)
# Exitcode 1 - Extension was already installed
# Exitcode 2 - Failed to install the extension
# Exitcode 3 - Extension was not found within the same python process after being installed
assert ret.exitcode == 0
installed_packages = venv.get_installed_packages()
assert salt_extension.name in installed_packages
assert "Using pkg_resources to load entry points" in ret.stderr
loader_functions = json.loads(ret.stdout)
# A non existing module should not appear in the loader
assert "monty.python" not in loader_functions
# But our extension's modules should appear on the loader
assert "foobar.echo1" in loader_functions
assert "foobar.echo2" in loader_functions
@pytest.mark.skipif(
sys.version_info < (3, 6),
reason="importlib-metadata>=3.3.0 does not exist for Py3.5",
@@ -284,3 +217,74 @@ def test_extension_discovery_without_reload_with_importlib_metadata(
# But our extension's modules should appear on the loader
assert "foobar.echo1" in loader_functions
assert "foobar.echo2" in loader_functions
@pytest.mark.skipif(
sys.version_info >= (3, 6),
reason="Reloading with pkg_resources is only available on Py3.5",
)
def test_extension_discovery_without_reload_with_pkg_resources(
venv, salt_extension, salt_minion_factory
):
# Install our extension into the virtualenv
installed_packages = venv.get_installed_packages()
assert salt_extension.name not in installed_packages
if "importlib-metadata" in installed_packages:
importlib_metadata_version = installed_packages["importlib-metadata"]
if salt.utils.versions.StrictVersion(importlib_metadata_version) >= "3.3.0":
venv.install("-U", "importlib-metadata<3.3.0")
code = """
import sys
import json
import subprocess
# If the test fails, for debugging purposes, comment out the following 2 lines
import salt.log.setup
salt.log.setup.setup_console_logger(log_level="debug")
extension_path = "{}"
import salt.loader
minion_config = json.loads(sys.stdin.read())
loader = salt.loader.minion_mods(minion_config)
if "foobar.echo1" in loader:
sys.exit(1)
# Install the extension
proc = subprocess.run(
[sys.executable, "-m", "pip", "install", extension_path],
check=False,
shell=False,
stdout=subprocess.PIPE,
)
if proc.returncode != 0:
sys.exit(2)
loader = salt.loader.minion_mods(minion_config)
if "foobar.echo1" not in loader:
sys.exit(3)
print(json.dumps(list(loader)))
""".format(
salt_extension.srcdir
)
ret = venv.run_code(
code, input=json.dumps(salt_minion_factory.config.copy()), check=False
)
# Exitcode 1 - Extension was already installed
# Exitcode 2 - Failed to install the extension
# Exitcode 3 - Extension was not found within the same python process after being installed
assert ret.exitcode == 0
installed_packages = venv.get_installed_packages()
assert salt_extension.name in installed_packages
assert "Using pkg_resources to load entry points" in ret.stderr
loader_functions = json.loads(ret.stdout)
# A non existing module should not appear in the loader
assert "monty.python" not in loader_functions
# But our extension's modules should appear on the loader
assert "foobar.echo1" in loader_functions
assert "foobar.echo2" in loader_functions