Merge branch 'master' into 18907_lazy_unmount_when_fails

This commit is contained in:
David Murphy 2023-12-04 13:41:44 -07:00 committed by GitHub
commit fd1cd1dcac
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
48 changed files with 2374 additions and 1771 deletions

1
changelog/63144.fixed.md Normal file
View file

@ -0,0 +1 @@
Fixed gpg.receive_keys returns success on failed import

1
changelog/63153.fixed.md Normal file
View file

@ -0,0 +1 @@
Fixed GPG state module always reports success without changes

1
changelog/63156.fixed.md Normal file
View file

@ -0,0 +1 @@
Fixed GPG state module does not respect test mode

1
changelog/63159.fixed.md Normal file
View file

@ -0,0 +1 @@
Fixed gpg.absent with gnupghome/user, fixed gpg.delete_key with gnupghome

1
changelog/63214.fixed.md Normal file
View file

@ -0,0 +1 @@
Fixed service module does not handle enable/disable if systemd service is an alias

1
changelog/65169.fixed.md Normal file
View file

@ -0,0 +1 @@
Fixed `gpg.present` succeeds when the keyserver is unreachable

1
changelog/65295.fixed.md Normal file
View file

@ -0,0 +1 @@
Fix typo in nftables module to ensure unique nft family values

1
changelog/65458.fixed.md Normal file
View file

@ -0,0 +1 @@
pip.installed state will now properly fail when a specified user does not exist

1
changelog/65479.added.md Normal file
View file

@ -0,0 +1 @@
Added support for master top modules on masterless minions

1
changelog/65480.fixed.md Normal file
View file

@ -0,0 +1 @@
Made salt-ssh merge master top returns for the same environment

View file

@ -0,0 +1 @@
Deprecation warning for Salt's backport of ``OrderedDict`` class which will be removed in 3009

1
changelog/65562.fixed.md Normal file
View file

@ -0,0 +1 @@
Improve the condition of overriding target for pip with VENV_PIP_TARGET environment variable.

View file

@ -0,0 +1 @@
Deprecate Kubernetes modules for move to saltext-kubernetes in version 3009

View file

@ -0,0 +1 @@
Deprecated all Pushover modules in favor of the Salt Extension at https://github.com/salt-extensions/saltext-pushover. The Pushover modules will be removed from Salt core in 3009.0

View file

@ -3,12 +3,18 @@
Sphinx documentation for Salt
"""
import os
import pathlib
import re
import shutil
import sys
import textwrap
import time
import types
from sphinx.directives.other import TocTree
from sphinx.util import logging
log = logging.getLogger(__name__)
# -- Add paths to PYTHONPATH ---------------------------------------------------
try:
@ -414,6 +420,67 @@ class ReleasesTree(TocTree):
return rst
def copy_release_templates_pre(app):
    """Materialize release-note templates as real release files for this build.

    Every ``*.template`` file under ``topics/releases/templates`` that does not
    already have a non-template counterpart is copied alongside the real
    release files.  The copies are recorded on ``app._copied_release_files``
    so the build-finished handler can delete them again afterwards.
    """
    copied = []
    app._copied_release_files = copied
    base = pathlib.Path(docs_basepath)
    releases_dir = base / "topics" / "releases"
    templates_dir = releases_dir / "templates"
    for template in templates_dir.iterdir():
        target = releases_dir / template.name.replace(".template", "")
        if target.exists():
            # A real, version-controlled release file already exists; keep it.
            continue
        log.info(
            "Copying '%s' -> '%s' just for this build ...",
            template.relative_to(base),
            target.relative_to(base),
        )
        copied.append(target)
        shutil.copyfile(template, target)
def copy_release_templates_post(app, exception):
    """Remove the release files that were copied in just for this build.

    Iterates ``app._copied_release_files`` (populated by the builder-inited
    handler) and unlinks each file, logging the deletion.  The *exception*
    argument is supplied by Sphinx's ``build-finished`` event and is unused.
    """
    base = pathlib.Path(docs_basepath)
    for copied_file in app._copied_release_files:
        log.info(
            "The release file '%s' was copied for the build, but its not in "
            "version control system. Deleting.",
            copied_file.relative_to(base),
        )
        copied_file.unlink()
def extract_module_deprecations(app, what, name, obj, options, lines):
    """
    Add a warning to the modules being deprecated into extensions.
    """
    # https://www.sphinx-doc.org/en/master/usage/extensions/autodoc.html#event-autodoc-process-docstring
    if what != "module":
        # We're only interested in module deprecations
        return
    if not hasattr(obj, "__deprecated__"):
        # The module is not deprecated
        return
    version, extension, url = obj.__deprecated__
    # Build the same reStructuredText block that textwrap.dedent produced:
    # a leading blank line, the warning directive, and two indented body lines.
    warning = (
        "\n"
        ".. warning::\n"
        f"    This module will be removed from Salt in version {version} in favor of\n"
        f"    the `{extension} Salt Extension <{url}>`_.\n"
    )
    # Modify the docstring lines in-place
    lines[:] = warning.splitlines() + lines
def setup(app):
    """Register Salt's custom Sphinx directive and event handlers."""
    app.add_directive("releasestree", ReleasesTree)
    # Wire each Sphinx event to its handler.
    handlers = (
        ("autodoc-skip-member", skip_mod_init_member),
        ("builder-inited", copy_release_templates_pre),
        ("build-finished", copy_release_templates_post),
        ("autodoc-process-docstring", extract_module_deprecations),
    )
    for event, handler in handlers:
        app.connect(event, handler)

View file

@ -12,7 +12,6 @@ This section contains a list of the Python modules that are used to extend the v
../ref/beacons/all/index
../ref/cache/all/index
../ref/clouds/all/index
../ref/configuration/index
../ref/engines/all/index
../ref/modules/all/index
../ref/executors/all/index

View file

@ -9,6 +9,7 @@ secure and troubleshoot, and how to perform many other administrative tasks.
:maxdepth: 1
:glob:
../../ref/configuration/index
../../ref/configuration/master
../../ref/configuration/minion
../../ref/configuration/proxy

View file

@ -12,6 +12,10 @@ The old `external_nodes` option has been removed. The master tops system
provides a pluggable and extendable replacement for it, allowing for multiple
different subsystems to provide top file data.
.. versionchanged:: 3007.0
Masterless minions now support master top modules as well.
Using the new `master_tops` option is simple:
.. code-block:: yaml

View file

@ -170,8 +170,7 @@ later minions. When using this new repository, the repo cache is compiled on
the Salt Minion, which enables pillar, grains and other things to be available
during compilation time.
See the :ref:`Windows Software Repository <2015-8-0-winrepo-changes>`
documentation for more information.
See the Windows Software Repository documentation for more information.
Changes to legacy Windows repository
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@ -183,8 +182,7 @@ If you were previously using this repository and have customized settings, be
aware that several config options have been renamed to make their naming more
consistent.
See the :ref:`Windows Software Repository <2015-8-0-winrepo-changes>`
documentation for more information.
See the Windows Software Repository documentation for more information.
Win System Module
-----------------

File diff suppressed because it is too large Load diff

View file

@ -74,7 +74,6 @@ class SSHHighState(salt.state.BaseHighState):
salt.state.BaseHighState.__init__(self, opts)
self.state = SSHState(opts, pillar, wrapper, context=context)
self.matchers = salt.loader.matchers(self.opts)
self.tops = salt.loader.tops(self.opts)
self._pydsl_all_decls = {}
self._pydsl_render_stack = []
@ -92,32 +91,7 @@ class SSHHighState(salt.state.BaseHighState):
"""
Evaluate master_tops locally
"""
if "id" not in self.opts:
log.error("Received call for external nodes without an id")
return {}
if not salt.utils.verify.valid_id(self.opts, self.opts["id"]):
return {}
# Evaluate all configured master_tops interfaces
grains = {}
ret = {}
if "grains" in self.opts:
grains = self.opts["grains"]
for fun in self.tops:
if fun not in self.opts.get("master_tops", {}):
continue
try:
ret.update(self.tops[fun](opts=self.opts, grains=grains))
except Exception as exc: # pylint: disable=broad-except
# If anything happens in the top generation, log it and move on
log.error(
"Top function %s failed with error %s for minion %s",
fun,
exc,
self.opts["id"],
)
return ret
return self._local_master_tops()
def destroy(self):
if self.client:

View file

@ -122,7 +122,7 @@ def _get_user_info(user=None):
# if it doesn't exist then fall back to user Salt running as
userinfo = _get_user_info()
else:
raise SaltInvocationError("User {} does not exist".format(user))
raise SaltInvocationError(f"User {user} does not exist")
return userinfo
@ -556,15 +556,13 @@ def delete_key(
return ret
gpg = _create_gpg(user, gnupghome)
key = get_key(keyid, fingerprint, user)
key = get_key(keyid=keyid, fingerprint=fingerprint, user=user, gnupghome=gnupghome)
def __delete_key(fingerprint, secret, use_passphrase):
if use_passphrase:
if secret and use_passphrase:
gpg_passphrase = __salt__["pillar.get"]("gpg_passphrase")
if not gpg_passphrase:
ret["res"] = False
ret["message"] = "gpg_passphrase not available in pillar."
return ret
return "gpg_passphrase not available in pillar."
else:
out = gpg.delete_keys(fingerprint, secret, passphrase=gpg_passphrase)
else:
@ -573,7 +571,7 @@ def delete_key(
if key:
fingerprint = key["fingerprint"]
skey = get_secret_key(keyid, fingerprint, user)
skey = get_secret_key(keyid, fingerprint, user, gnupghome=gnupghome)
if skey:
if not delete_secret:
ret["res"] = False
@ -582,19 +580,29 @@ def delete_key(
] = "Secret key exists, delete first or pass delete_secret=True."
return ret
else:
if str(__delete_key(fingerprint, True, use_passphrase)) == "ok":
out = __delete_key(fingerprint, True, use_passphrase)
if str(out) == "ok":
# Delete the secret key
ret["message"] = "Secret key for {} deleted\n".format(fingerprint)
ret["message"] = f"Secret key for {fingerprint} deleted\n"
else:
ret["res"] = False
ret[
"message"
] = f"Failed to delete secret key for {fingerprint}: {out}"
return ret
# Delete the public key
if str(__delete_key(fingerprint, False, use_passphrase)) == "ok":
ret["message"] += "Public key for {} deleted".format(fingerprint)
ret["res"] = True
return ret
out = __delete_key(fingerprint, False, use_passphrase)
if str(out) == "ok":
ret["res"] = True
ret["message"] += f"Public key for {fingerprint} deleted"
else:
ret["res"] = False
ret["message"] += f"Failed to delete public key for {fingerprint}: {out}"
else:
ret["res"] = False
ret["message"] = "Key not available in keychain."
return ret
return ret
def get_key(keyid=None, fingerprint=None, user=None, gnupghome=None):
@ -909,7 +917,7 @@ def receive_keys(keyserver=None, keys=None, user=None, gnupghome=None):
salt '*' gpg.receive_keys keys=3FAD9F1E user=username
"""
ret = {"res": True, "changes": {}, "message": []}
ret = {"res": True, "message": []}
gpg = _create_gpg(user, gnupghome)
@ -920,18 +928,30 @@ def receive_keys(keyserver=None, keys=None, user=None, gnupghome=None):
keys = keys.split(",")
recv_data = gpg.recv_keys(keyserver, *keys)
for result in recv_data.results:
if "ok" in result:
if result["ok"] == "1":
ret["message"].append(
"Key {} added to keychain".format(result["fingerprint"])
)
elif result["ok"] == "0":
ret["message"].append(
"Key {} already exists in keychain".format(result["fingerprint"])
)
elif "problem" in result:
ret["message"].append("Unable to add key to keychain")
try:
if recv_data.results:
for result in recv_data.results:
if "ok" in result:
if result["ok"] == "1":
ret["message"].append(
f"Key {result['fingerprint']} added to keychain"
)
elif result["ok"] == "0":
ret["message"].append(
f"Key {result['fingerprint']} already exists in keychain"
)
elif "problem" in result:
ret["message"].append(
f"Unable to add key to keychain: {result.get('text', 'No further description')}"
)
if not recv_data:
ret["res"] = False
ret["message"].append(f"GPG reported failure: {recv_data.stderr}")
except AttributeError:
ret["res"] = False
ret["message"] = ["Invalid return from python-gpg"]
return ret
@ -986,12 +1006,12 @@ def trust_key(keyid=None, fingerprint=None, trust_level=None, user=None):
if key:
if "fingerprint" not in key:
ret["res"] = False
ret["message"] = "Fingerprint not found for keyid {}".format(keyid)
ret["message"] = f"Fingerprint not found for keyid {keyid}"
return ret
fingerprint = key["fingerprint"]
else:
ret["res"] = False
ret["message"] = "KeyID {} not in GPG keychain".format(keyid)
ret["message"] = f"KeyID {keyid} not in GPG keychain"
return ret
else:
ret["res"] = False
@ -1001,7 +1021,7 @@ def trust_key(keyid=None, fingerprint=None, trust_level=None, user=None):
if trust_level not in _VALID_TRUST_LEVELS:
return "ERROR: Valid trust levels - {}".format(",".join(_VALID_TRUST_LEVELS))
stdin = "{}:{}\n".format(fingerprint, NUM_TRUST_DICT[trust_level])
stdin = f"{fingerprint}:{NUM_TRUST_DICT[trust_level]}\n"
cmd = [_gpg(), "--import-ownertrust"]
_user = user
@ -1397,7 +1417,7 @@ def encrypt(
if result.ok:
if not bare:
if output:
ret["comment"] = "Encrypted data has been written to {}".format(output)
ret["comment"] = f"Encrypted data has been written to {output}"
else:
ret["comment"] = result.data
else:
@ -1485,7 +1505,7 @@ def decrypt(
if result.ok:
if not bare:
if output:
ret["comment"] = "Decrypted data has been written to {}".format(output)
ret["comment"] = f"Decrypted data has been written to {output}"
else:
ret["comment"] = result.data
else:

View file

@ -23,6 +23,12 @@ import salt.utils.files
import salt.utils.http as http
import salt.utils.json
__deprecated__ = (
3009,
"kubernetes",
"https://github.com/salt-extensions/saltext-kubernetes",
)
__virtualname__ = "k8s"
# Setup the logger
@ -174,14 +180,14 @@ def _guess_node_id(node):
def _get_labels(node, apiserver_url):
"""Get all labels from a kube node."""
# Prepare URL
url = "{}/api/v1/nodes/{}".format(apiserver_url, node)
url = f"{apiserver_url}/api/v1/nodes/{node}"
# Make request
ret = http.query(url)
# Check requests status
if "body" in ret:
ret = salt.utils.json.loads(ret.get("body"))
elif ret.get("status", 0) == 404:
return "Node {} doesn't exist".format(node)
return f"Node {node} doesn't exist"
else:
return ret
# Get and return labels
@ -191,13 +197,13 @@ def _get_labels(node, apiserver_url):
def _set_labels(node, apiserver_url, labels):
"""Replace labels dict by a new one"""
# Prepare URL
url = "{}/api/v1/nodes/{}".format(apiserver_url, node)
url = f"{apiserver_url}/api/v1/nodes/{node}"
# Prepare data
data = [{"op": "replace", "path": "/metadata/labels", "value": labels}]
# Make request
ret = _kpatch(url, data)
if ret.get("status") == 404:
return "Node {} doesn't exist".format(node)
return f"Node {node} doesn't exist"
return ret
@ -264,9 +270,9 @@ def label_present(name, value, node=None, apiserver_url=None):
# there is an update during operation, need to retry
log.debug("Got 409, will try later")
ret["changes"] = {}
ret["comment"] = "Could not create label {}, please retry".format(name)
ret["comment"] = f"Could not create label {name}, please retry"
else:
ret["comment"] = "Label {} created".format(name)
ret["comment"] = f"Label {name} created"
elif labels.get(name) != str(value):
# This is a old label and we are going to edit it
ret["changes"] = {name: str(value)}
@ -276,12 +282,12 @@ def label_present(name, value, node=None, apiserver_url=None):
# there is an update during operation, need to retry
log.debug("Got 409, will try later")
ret["changes"] = {}
ret["comment"] = "Could not update label {}, please retry".format(name)
ret["comment"] = f"Could not update label {name}, please retry"
else:
ret["comment"] = "Label {} updated".format(name)
ret["comment"] = f"Label {name} updated"
else:
# This is a old label and it has already the wanted value
ret["comment"] = "Label {} already set".format(name)
ret["comment"] = f"Label {name} already set"
return ret
@ -316,7 +322,7 @@ def label_absent(name, node=None, apiserver_url=None):
# Compare old labels and what we want
if labels == old_labels:
# Label already absent
ret["comment"] = "Label {} already absent".format(name)
ret["comment"] = f"Label {name} already absent"
else:
# Label needs to be delete
res = _set_labels(node, apiserver_url, labels)
@ -324,10 +330,10 @@ def label_absent(name, node=None, apiserver_url=None):
# there is an update during operation, need to retry
log.debug("Got 409, will try later")
ret["changes"] = {}
ret["comment"] = "Could not delete label {}, please retry".format(name)
ret["comment"] = f"Could not delete label {name}, please retry"
else:
ret["changes"] = {"deleted": name}
ret["comment"] = "Label {} absent".format(name)
ret["comment"] = f"Label {name} absent"
return ret
@ -365,7 +371,7 @@ def label_folder_absent(name, node=None, apiserver_url=None):
# Prepare a temp labels dict
if labels == old_labels:
# Label already absent
ret["comment"] = "Label folder {} already absent".format(folder)
ret["comment"] = f"Label folder {folder} already absent"
else:
# Label needs to be delete
res = _set_labels(node, apiserver_url, labels)
@ -377,7 +383,7 @@ def label_folder_absent(name, node=None, apiserver_url=None):
)
else:
ret["changes"] = {"deleted": folder}
ret["comment"] = "Label folder {} absent".format(folder)
ret["comment"] = f"Label folder {folder} absent"
return ret
@ -386,7 +392,7 @@ def label_folder_absent(name, node=None, apiserver_url=None):
def _get_namespaces(apiserver_url, name=""):
"""Get namespace is namespace is defined otherwise return all namespaces"""
# Prepare URL
url = "{}/api/v1/namespaces/{}".format(apiserver_url, name)
url = f"{apiserver_url}/api/v1/namespaces/{name}"
# Make request
ret = http.query(url)
if ret.get("body"):
@ -398,7 +404,7 @@ def _get_namespaces(apiserver_url, name=""):
def _create_namespace(namespace, apiserver_url):
"""create namespace on the defined k8s cluster"""
# Prepare URL
url = "{}/api/v1/namespaces".format(apiserver_url)
url = f"{apiserver_url}/api/v1/namespaces"
# Prepare data
data = {"kind": "Namespace", "apiVersion": "v1", "metadata": {"name": namespace}}
log.trace("namespace creation requests: %s", data)
@ -438,9 +444,9 @@ def create_namespace(name, apiserver_url=None):
# This is a new namespace
_create_namespace(name, apiserver_url)
ret["changes"] = name
ret["comment"] = "Namespace {} created".format(name)
ret["comment"] = f"Namespace {name} created"
else:
ret["comment"] = "Namespace {} already present".format(name)
ret["comment"] = f"Namespace {name} already present"
return ret
@ -484,7 +490,7 @@ def get_namespaces(namespace="", apiserver_url=None):
def _get_secrets(namespace, name, apiserver_url):
"""Get secrets of the namespace."""
# Prepare URL
url = "{}/api/v1/namespaces/{}/secrets/{}".format(apiserver_url, namespace, name)
url = f"{apiserver_url}/api/v1/namespaces/{namespace}/secrets/{name}"
# Make request
ret = http.query(url)
if ret.get("body"):
@ -496,20 +502,20 @@ def _get_secrets(namespace, name, apiserver_url):
def _update_secret(namespace, name, data, apiserver_url):
"""Replace secrets data by a new one"""
# Prepare URL
url = "{}/api/v1/namespaces/{}/secrets/{}".format(apiserver_url, namespace, name)
url = f"{apiserver_url}/api/v1/namespaces/{namespace}/secrets/{name}"
# Prepare data
data = [{"op": "replace", "path": "/data", "value": data}]
# Make request
ret = _kpatch(url, data)
if ret.get("status") == 404:
return "Node {} doesn't exist".format(url)
return f"Node {url} doesn't exist"
return ret
def _create_secret(namespace, name, data, apiserver_url):
"""create namespace on the defined k8s cluster"""
# Prepare URL
url = "{}/api/v1/namespaces/{}/secrets".format(apiserver_url, namespace)
url = f"{apiserver_url}/api/v1/namespaces/{namespace}/secrets"
# Prepare data
request = {
"apiVersion": "v1",
@ -738,7 +744,7 @@ def create_secret(
return {
"name": name,
"result": False,
"comment": "Secret {} is already present".format(name),
"comment": f"Secret {name} is already present",
"changes": {},
}
@ -755,7 +761,7 @@ def create_secret(
if sname == encoded == "":
ret[
"comment"
] += "Source file {} is missing or name is incorrect\n".format(v)
] += f"Source file {v} is missing or name is incorrect\n"
if force:
continue
else:
@ -825,8 +831,8 @@ def delete_secret(namespace, name, apiserver_url=None, force=True):
"changes": {},
}
url = "{}/api/v1/namespaces/{}/secrets/{}".format(apiserver_url, namespace, name)
url = f"{apiserver_url}/api/v1/namespaces/{namespace}/secrets/{name}"
res = http.query(url, method="DELETE")
if res.get("body"):
ret["comment"] = "Removed secret {} in {} namespace".format(name, namespace)
ret["comment"] = f"Removed secret {name} in {namespace} namespace"
return ret

View file

@ -17,6 +17,12 @@ ADMIN_CFG = "/etc/kubernetes/admin.conf"
log = logging.getLogger(__name__)
__deprecated__ = (
3009,
"kubernetes",
"https://github.com/salt-extensions/saltext-kubernetes",
)
__virtualname__ = "kubeadm"
# Define not exported variables from Salt, so this can be imported as
@ -139,7 +145,7 @@ def version(kubeconfig=None, rootfs=None):
parameters = [("kubeconfig", kubeconfig), ("rootfs", rootfs)]
for parameter, value in parameters:
if value:
cmd.extend(["--{}".format(parameter), str(value)])
cmd.extend([f"--{parameter}", str(value)])
cmd.extend(["--output", "json"])
@ -226,9 +232,9 @@ def token_create(
for parameter, value in parameters:
if value:
if parameter in ("groups", "usages"):
cmd.extend(["--{}".format(parameter), json.dumps(value)])
cmd.extend([f"--{parameter}", json.dumps(value)])
else:
cmd.extend(["--{}".format(parameter), str(value)])
cmd.extend([f"--{parameter}", str(value)])
return _cmd(cmd)
@ -264,7 +270,7 @@ def token_delete(token, kubeconfig=None, rootfs=None):
parameters = [("kubeconfig", kubeconfig), ("rootfs", rootfs)]
for parameter, value in parameters:
if value:
cmd.extend(["--{}".format(parameter), str(value)])
cmd.extend([f"--{parameter}", str(value)])
return bool(_cmd(cmd))
@ -295,7 +301,7 @@ def token_generate(kubeconfig=None, rootfs=None):
parameters = [("kubeconfig", kubeconfig), ("rootfs", rootfs)]
for parameter, value in parameters:
if value:
cmd.extend(["--{}".format(parameter), str(value)])
cmd.extend([f"--{parameter}", str(value)])
return _cmd(cmd)
@ -325,7 +331,7 @@ def token_list(kubeconfig=None, rootfs=None):
parameters = [("kubeconfig", kubeconfig), ("rootfs", rootfs)]
for parameter, value in parameters:
if value:
cmd.extend(["--{}".format(parameter), str(value)])
cmd.extend([f"--{parameter}", str(value)])
lines = _cmd(cmd).splitlines()
@ -368,7 +374,7 @@ def alpha_certs_renew(rootfs=None):
parameters = [("rootfs", rootfs)]
for parameter, value in parameters:
if value:
cmd.extend(["--{}".format(parameter), str(value)])
cmd.extend([f"--{parameter}", str(value)])
return _cmd(cmd)
@ -430,7 +436,7 @@ def alpha_kubeconfig_user(
]
for parameter, value in parameters:
if value:
cmd.extend(["--{}".format(parameter), str(value)])
cmd.extend([f"--{parameter}", str(value)])
return _cmd(cmd)
@ -469,7 +475,7 @@ def alpha_kubelet_config_download(kubeconfig=None, kubelet_version=None, rootfs=
]
for parameter, value in parameters:
if value:
cmd.extend(["--{}".format(parameter), str(value)])
cmd.extend([f"--{parameter}", str(value)])
return _cmd(cmd)
@ -520,7 +526,7 @@ def alpha_kubelet_config_enable_dynamic(
]
for parameter, value in parameters:
if value:
cmd.extend(["--{}".format(parameter), str(value)])
cmd.extend([f"--{parameter}", str(value)])
return _cmd(cmd)
@ -574,7 +580,7 @@ def alpha_selfhosting_pivot(
]
for parameter, value in parameters:
if value:
cmd.extend(["--{}".format(parameter), str(value)])
cmd.extend([f"--{parameter}", str(value)])
return _cmd(cmd)
@ -627,7 +633,7 @@ def config_images_list(
]
for parameter, value in parameters:
if value:
cmd.extend(["--{}".format(parameter), str(value)])
cmd.extend([f"--{parameter}", str(value)])
return _cmd(cmd).splitlines()
@ -685,7 +691,7 @@ def config_images_pull(
]
for parameter, value in parameters:
if value:
cmd.extend(["--{}".format(parameter), str(value)])
cmd.extend([f"--{parameter}", str(value)])
prefix = "[config/images] Pulled "
return [(line.replace(prefix, "")) for line in _cmd(cmd).splitlines()]
@ -729,7 +735,7 @@ def config_migrate(old_config, new_config=None, kubeconfig=None, rootfs=None):
]
for parameter, value in parameters:
if value:
cmd.extend(["--{}".format(parameter), str(value)])
cmd.extend([f"--{parameter}", str(value)])
return _cmd(cmd)
@ -769,7 +775,7 @@ def config_print_init_defaults(component_configs=None, kubeconfig=None, rootfs=N
]
for parameter, value in parameters:
if value:
cmd.extend(["--{}".format(parameter), str(value)])
cmd.extend([f"--{parameter}", str(value)])
return _cmd(cmd)
@ -809,7 +815,7 @@ def config_print_join_defaults(component_configs=None, kubeconfig=None, rootfs=N
]
for parameter, value in parameters:
if value:
cmd.extend(["--{}".format(parameter), str(value)])
cmd.extend([f"--{parameter}", str(value)])
return _cmd(cmd)
@ -843,7 +849,7 @@ def config_upload_from_file(config, kubeconfig=None, rootfs=None):
parameters = [("kubeconfig", kubeconfig), ("rootfs", rootfs)]
for parameter, value in parameters:
if value:
cmd.extend(["--{}".format(parameter), str(value)])
cmd.extend([f"--{parameter}", str(value)])
return _cmd(cmd)
@ -940,7 +946,7 @@ def config_upload_from_flags(
]
for parameter, value in parameters:
if value:
cmd.extend(["--{}".format(parameter), str(value)])
cmd.extend([f"--{parameter}", str(value)])
return _cmd(cmd)
@ -970,7 +976,7 @@ def config_view(kubeconfig=None, rootfs=None):
parameters = [("kubeconfig", kubeconfig), ("rootfs", rootfs)]
for parameter, value in parameters:
if value:
cmd.extend(["--{}".format(parameter), str(value)])
cmd.extend([f"--{parameter}", str(value)])
return _cmd(cmd)
@ -1132,7 +1138,7 @@ def init(
]
for parameter, value in parameters:
if value:
cmd.extend(["--{}".format(parameter), str(value)])
cmd.extend([f"--{parameter}", str(value)])
return _cmd(cmd)
@ -1297,7 +1303,7 @@ def join(
]
for parameter, value in parameters:
if value:
cmd.extend(["--{}".format(parameter), str(value)])
cmd.extend([f"--{parameter}", str(value)])
return _cmd(cmd)
@ -1364,7 +1370,7 @@ def reset(
]
for parameter, value in parameters:
if value:
cmd.extend(["--{}".format(parameter), str(value)])
cmd.extend([f"--{parameter}", str(value)])
return _cmd(cmd)

View file

@ -85,6 +85,12 @@ except ImportError:
log = logging.getLogger(__name__)
__deprecated__ = (
3009,
"kubernetes",
"https://github.com/salt-extensions/saltext-kubernetes",
)
__virtualname__ = "kubernetes"
@ -1099,7 +1105,7 @@ def create_secret(
source=None,
template=None,
saltenv="base",
**kwargs
**kwargs,
):
"""
Creates the kubernetes secret as defined by the user.
@ -1274,7 +1280,7 @@ def replace_service(
old_service,
saltenv,
namespace="default",
**kwargs
**kwargs,
):
"""
Replaces an existing service with a new one defined by name and namespace,
@ -1324,7 +1330,7 @@ def replace_secret(
template=None,
saltenv="base",
namespace="default",
**kwargs
**kwargs,
):
"""
Replaces an existing secret with a new one defined by name and namespace,
@ -1379,7 +1385,7 @@ def replace_configmap(
template=None,
saltenv="base",
namespace="default",
**kwargs
**kwargs,
):
"""
Replaces an existing configmap with a new one defined by name and
@ -1446,7 +1452,7 @@ def __create_object_body(
or src_obj["kind"] != kind
):
raise CommandExecutionError(
"The source file should define only a {} object".format(kind)
f"The source file should define only a {kind} object"
)
if "metadata" in src_obj:
@ -1467,7 +1473,7 @@ def __read_and_render_yaml_file(source, template, saltenv):
"""
sfn = __salt__["cp.cache_file"](source, saltenv)
if not sfn:
raise CommandExecutionError("Source file '{}' not found".format(source))
raise CommandExecutionError(f"Source file '{source}' not found")
with salt.utils.files.fopen(sfn, "r") as src:
contents = src.read()
@ -1496,9 +1502,7 @@ def __read_and_render_yaml_file(source, template, saltenv):
contents = data["data"].encode("utf-8")
else:
raise CommandExecutionError(
"Unknown template specified: {}".format(template)
)
raise CommandExecutionError(f"Unknown template specified: {template}")
return salt.utils.yaml.safe_load(contents)

View file

@ -73,7 +73,7 @@ def version():
salt '*' nftables.version
"""
cmd = "{} --version".format(_nftables_cmd())
cmd = f"{_nftables_cmd()} --version"
out = __salt__["cmd.run"](cmd).split()
return out[1]
@ -85,7 +85,7 @@ def build_rule(
position="",
full=None,
family="ipv4",
**kwargs
**kwargs,
):
"""
Build a well-formatted nftables rule based on kwargs.
@ -260,8 +260,8 @@ def build_rule(
rule = rule.strip()
# Insert the protocol prior to dport or sport
rule = rule.replace("dport", "{} dport".format(proto))
rule = rule.replace("sport", "{} sport".format(proto))
rule = rule.replace("dport", f"{proto} dport")
rule = rule.replace("sport", f"{proto} sport")
ret["rule"] = rule
@ -442,9 +442,9 @@ def save(filename=None, family="ipv4"):
if _conf() and not filename:
filename = _conf()
# Not a typo. Invert the dictionary twice to get unique values only.
nft_families = {v: k for k, v in _NFTABLES_FAMILIES.items()}
# Invert the dictionary twice to get unique values only.
nft_families = {v: k for k, v in _NFTABLES_FAMILIES.items()}
nft_families = {v: k for k, v in nft_families.items()}
rules = "#! nft -f\n"
@ -456,16 +456,14 @@ def save(filename=None, family="ipv4"):
rules = rules + "\n"
if __salt__["file.directory_exists"](filename):
filename = "{}/salt-all-in-one.nft".format(filename)
filename = f"{filename}/salt-all-in-one.nft"
try:
with salt.utils.files.fopen(filename, "wb") as _fh:
# Write out any changes
_fh.write(salt.utils.data.encode(rules))
except OSError as exc:
raise CommandExecutionError(
"Problem writing to configuration file: {}".format(exc)
)
raise CommandExecutionError(f"Problem writing to configuration file: {exc}")
return rules
@ -519,12 +517,12 @@ def get_rule_handle(table="filter", chain=None, rule=None, family="ipv4"):
out = __salt__["cmd.run"](cmd, python_shell=False)
rules = re.split("\n+", out)
pat = re.compile(r"{} # handle (?P<handle>\d+)".format(rule))
pat = re.compile(rf"{rule} # handle (?P<handle>\d+)")
for r in rules:
match = pat.search(r)
if match:
return {"result": True, "handle": match.group("handle")}
return {"result": False, "comment": "Could not find rule {}".format(rule)}
return {"result": False, "comment": f"Could not find rule {rule}"}
def check(table="filter", chain=None, rule=None, family="ipv4"):
@ -570,7 +568,7 @@ def check(table="filter", chain=None, rule=None, family="ipv4"):
cmd = "{} --handle --numeric --numeric --numeric list chain {} {} {}".format(
_nftables_cmd(), nft_family, table, chain
)
search_rule = "{} #".format(rule)
search_rule = f"{rule} #"
out = __salt__["cmd.run"](cmd, python_shell=False).find(search_rule)
if out == -1:
@ -610,10 +608,8 @@ def check_chain(table="filter", chain=None, family="ipv4"):
return ret
nft_family = _NFTABLES_FAMILIES[family]
cmd = "{} list table {} {}".format(_nftables_cmd(), nft_family, table)
out = __salt__["cmd.run"](cmd, python_shell=False).find(
"chain {0} {{".format(chain)
)
cmd = f"{_nftables_cmd()} list table {nft_family} {table}"
out = __salt__["cmd.run"](cmd, python_shell=False).find(f"chain {chain} {{")
if out == -1:
ret["comment"] = "Chain {} in table {} in family {} does not exist".format(
@ -644,15 +640,15 @@ def check_table(table=None, family="ipv4"):
return ret
nft_family = _NFTABLES_FAMILIES[family]
cmd = "{} list tables {}".format(_nftables_cmd(), nft_family)
cmd = f"{_nftables_cmd()} list tables {nft_family}"
out = __salt__["cmd.run"](cmd, python_shell=False).find(
"table {} {}".format(nft_family, table)
f"table {nft_family} {table}"
)
if out == -1:
ret["comment"] = "Table {} in family {} does not exist".format(table, family)
ret["comment"] = f"Table {table} in family {family} does not exist"
else:
ret["comment"] = "Table {} in family {} exists".format(table, family)
ret["comment"] = f"Table {table} in family {family} exists"
ret["result"] = True
return ret
@ -683,11 +679,11 @@ def new_table(table, family="ipv4"):
return res
nft_family = _NFTABLES_FAMILIES[family]
cmd = "{} add table {} {}".format(_nftables_cmd(), nft_family, table)
cmd = f"{_nftables_cmd()} add table {nft_family} {table}"
out = __salt__["cmd.run"](cmd, python_shell=False)
if not out:
ret["comment"] = "Table {} in family {} created".format(table, family)
ret["comment"] = f"Table {table} in family {family} created"
ret["result"] = True
else:
ret["comment"] = "Table {} in family {} could not be created".format(
@ -722,11 +718,11 @@ def delete_table(table, family="ipv4"):
return res
nft_family = _NFTABLES_FAMILIES[family]
cmd = "{} delete table {} {}".format(_nftables_cmd(), nft_family, table)
cmd = f"{_nftables_cmd()} delete table {nft_family} {table}"
out = __salt__["cmd.run"](cmd, python_shell=False)
if not out:
ret["comment"] = "Table {} in family {} deleted".format(table, family)
ret["comment"] = f"Table {table} in family {family} deleted"
ret["result"] = True
else:
ret["comment"] = "Table {} in family {} could not be deleted".format(
@ -780,7 +776,7 @@ def new_chain(
return ret
nft_family = _NFTABLES_FAMILIES[family]
cmd = "{} -- add chain {} {} {}".format(_nftables_cmd(), nft_family, table, chain)
cmd = f"{_nftables_cmd()} -- add chain {nft_family} {table} {chain}"
if table_type or hook or priority:
if table_type and hook and str(priority):
cmd = r"{0} \{{ type {1} hook {2} priority {3}\; \}}".format(
@ -841,7 +837,7 @@ def delete_chain(table="filter", chain=None, family="ipv4"):
return res
nft_family = _NFTABLES_FAMILIES[family]
cmd = "{} delete chain {} {} {}".format(_nftables_cmd(), nft_family, table, chain)
cmd = f"{_nftables_cmd()} delete chain {nft_family} {table} {chain}"
out = __salt__["cmd.run"](cmd, python_shell=False)
if not out:
@ -962,7 +958,7 @@ def insert(table="filter", chain=None, position=None, rule=None, family="ipv4"):
family=ipv6
"""
ret = {
"comment": "Failed to insert rule {} to table {}.".format(rule, table),
"comment": f"Failed to insert rule {rule} to table {table}.",
"result": False,
}
@ -1043,7 +1039,7 @@ def delete(table, chain=None, position=None, rule=None, family="ipv4"):
family=ipv6
"""
ret = {
"comment": "Failed to delete rule {} in table {}.".format(rule, table),
"comment": f"Failed to delete rule {rule} in table {table}.",
"result": False,
}
@ -1131,17 +1127,17 @@ def flush(table="filter", chain="", family="ipv4"):
cmd = "{} flush chain {} {} {}".format(
_nftables_cmd(), nft_family, table, chain
)
comment = "from chain {} in table {} in family {}.".format(chain, table, family)
comment = f"from chain {chain} in table {table} in family {family}."
else:
cmd = "{} flush table {} {}".format(_nftables_cmd(), nft_family, table)
comment = "from table {} in family {}.".format(table, family)
cmd = f"{_nftables_cmd()} flush table {nft_family} {table}"
comment = f"from table {table} in family {family}."
out = __salt__["cmd.run"](cmd, python_shell=False)
if not out:
ret["result"] = True
ret["comment"] = "Flushed rules {}".format(comment)
ret["comment"] = f"Flushed rules {comment}"
else:
ret["comment"] = "Failed to flush rules {}".format(comment)
ret["comment"] = f"Failed to flush rules {comment}"
return ret
@ -1239,7 +1235,7 @@ def set_policy(table="filter", chain=None, policy=None, family="ipv4"):
if not chain_info:
return False
cmd = "{} add chain {} {} {}".format(_nftables_cmd(), nft_family, table, chain)
cmd = f"{_nftables_cmd()} add chain {nft_family} {table} {chain}"
# We can't infer the base chain parameters. Bail out if they're not present.
if "type" not in chain_info or "hook" not in chain_info or "prio" not in chain_info:
@ -1249,7 +1245,7 @@ def set_policy(table="filter", chain=None, policy=None, family="ipv4"):
chain_info["type"], chain_info["hook"], chain_info["prio"]
)
cmd = '{0} "{{ {1} policy {2}; }}"'.format(cmd, params, policy)
cmd = f'{cmd} "{{ {params} policy {policy}; }}"'
out = __salt__["cmd.run_all"](cmd, python_shell=False)

View file

@ -253,7 +253,6 @@ def _get_env_activate(bin_env):
def _find_req(link):
logger.info("_find_req -- link = %s", link)
with salt.utils.files.fopen(link) as fh_link:
@ -849,9 +848,11 @@ def install(
cmd.extend(["--build", build])
# Use VENV_PIP_TARGET environment variable value as target
# if set and no target specified on the function call
# if set and no target specified on the function call.
# Do not set target if bin_env specified, use default
# for specified binary environment or expect explicit target specification.
target_env = os.environ.get("VENV_PIP_TARGET", None)
if target is None and target_env is not None:
if target is None and target_env is not None and bin_env is None:
target = target_env
if target:

View file

@ -25,6 +25,12 @@ from salt.exceptions import SaltInvocationError
log = logging.getLogger(__name__)
__virtualname__ = "pushover"
__deprecated__ = (
3009,
"pushover",
"https://github.com/saltstack/saltext-pushover",
)
def __virtual__():
"""

View file

@ -70,13 +70,15 @@ __proxyenabled__ = ["*"]
log = logging.getLogger(__name__)
TOP_ENVS_CKEY = "saltutil._top_file_envs"
def _get_top_file_envs():
"""
Get all environments from the top file
"""
try:
return __context__["saltutil._top_file_envs"]
return __context__[TOP_ENVS_CKEY]
except KeyError:
with salt.state.HighState(__opts__, initial_pillar=__pillar__.value()) as st_:
try:
@ -87,7 +89,7 @@ def _get_top_file_envs():
envs = "base"
except SaltRenderError as exc:
raise CommandExecutionError(f"Unable to render top file(s): {exc}")
__context__["saltutil._top_file_envs"] = envs
__context__[TOP_ENVS_CKEY] = envs
return envs
@ -244,10 +246,6 @@ def sync_sdb(saltenv=None, extmod_whitelist=None, extmod_blacklist=None):
<states-top>` will be checked for sdb modules to sync. If no top files
are found, then the ``base`` environment will be synced.
refresh : False
This argument has no affect and is included for consistency with the
other sync functions.
extmod_whitelist : None
comma-separated list of modules to sync
@ -473,8 +471,7 @@ def sync_renderers(
refresh : True
If ``True``, refresh the available execution modules on the minion.
This refresh will be performed even if no new renderers are synced.
Set to ``False`` to prevent this refresh. Set to ``False`` to prevent
this refresh.
Set to ``False`` to prevent this refresh.
extmod_whitelist : None
comma-separated list of modules to sync
@ -973,6 +970,57 @@ def sync_pillar(
return ret
def sync_tops(
saltenv=None,
refresh=True,
extmod_whitelist=None,
extmod_blacklist=None,
):
"""
.. versionadded:: 3007.0
Sync master tops from ``salt://_tops`` to the minion.
saltenv
The fileserver environment from which to sync. To sync from more than
one environment, pass a comma-separated list.
If not passed, then all environments configured in the :ref:`top files
<states-top>` will be checked for master tops to sync. If no top files
are found, then the ``base`` environment will be synced.
refresh : True
Refresh this module's cache containing the environments from which
extension modules are synced when ``saltenv`` is not specified.
This refresh will be performed even if no new master tops are synced.
Set to ``False`` to prevent this refresh.
extmod_whitelist : None
comma-separated list of modules to sync
extmod_blacklist : None
comma-separated list of modules to blacklist based on type
.. note::
This function will raise an error if executed on a traditional (i.e.
not masterless) minion
CLI Examples:
.. code-block:: bash
salt '*' saltutil.sync_tops
salt '*' saltutil.sync_tops saltenv=dev
"""
if __opts__["file_client"] != "local":
raise CommandExecutionError(
"Master top modules can only be synced to masterless minions"
)
if refresh:
__context__.pop(TOP_ENVS_CKEY, None)
return _sync("tops", saltenv, extmod_whitelist, extmod_blacklist)
def sync_executors(
saltenv=None, refresh=True, extmod_whitelist=None, extmod_blacklist=None
):
@ -1071,6 +1119,13 @@ def sync_all(
clean_pillar_cache=False,
):
"""
.. versionchanged:: 3007.0
On masterless minions, master top modules are now synced as well.
When ``refresh`` is set to ``True``, this module's cache containing
the environments from which extension modules are synced when
``saltenv`` is not specified will be refreshed.
.. versionchanged:: 2015.8.11,2016.3.2
On masterless minions, pillar modules are now synced, and refreshed
when ``refresh`` is set to ``True``.
@ -1081,7 +1136,9 @@ def sync_all(
refresh : True
Also refresh the execution modules and recompile pillar data available
to the minion. This refresh will be performed even if no new dynamic
to the minion. If this is a masterless minion, also refresh the environments
from which extension modules are synced after syncing master tops.
This refresh will be performed even if no new dynamic
modules are synced. Set to ``False`` to prevent this refresh.
.. important::
@ -1121,6 +1178,9 @@ def sync_all(
"""
log.debug("Syncing all")
ret = {}
if __opts__["file_client"] == "local":
# Sync tops first since this might influence the other syncs
ret["tops"] = sync_tops(saltenv, refresh, extmod_whitelist, extmod_blacklist)
ret["clouds"] = sync_clouds(saltenv, False, extmod_whitelist, extmod_blacklist)
ret["beacons"] = sync_beacons(saltenv, False, extmod_whitelist, extmod_blacklist)
ret["modules"] = sync_modules(saltenv, False, extmod_whitelist, extmod_blacklist)

View file

@ -138,7 +138,7 @@ def _check_for_unit_changes(name):
Check for modified/updated unit files, and run a daemon-reload if any are
found.
"""
contextkey = "systemd._check_for_unit_changes.{}".format(name)
contextkey = f"systemd._check_for_unit_changes.{name}"
if contextkey not in __context__:
if _untracked_custom_unit_found(name) or _unit_file_changed(name):
systemctl_reload()
@ -329,7 +329,9 @@ def _strip_scope(msg):
return "\n".join(ret).strip()
def _systemctl_cmd(action, name=None, systemd_scope=False, no_block=False, root=None):
def _systemctl_cmd(
action, name=None, systemd_scope=False, no_block=False, root=None, extra_args=None
):
"""
Build a systemctl command line. Treat unit names without one
of the valid suffixes as a service.
@ -353,6 +355,8 @@ def _systemctl_cmd(action, name=None, systemd_scope=False, no_block=False, root=
ret.append(_canonical_unit_name(name))
if "status" in ret:
ret.extend(["-n", "0"])
if isinstance(extra_args, list):
ret.extend(extra_args)
return ret
@ -380,7 +384,7 @@ def _sysv_enabled(name, root):
runlevel.
"""
# Find exact match (disambiguate matches like "S01anacron" for cron)
rc = _root("/etc/rc{}.d/S*{}".format(_runlevel(), name), root)
rc = _root(f"/etc/rc{_runlevel()}.d/S*{name}", root)
for match in glob.glob(rc):
if re.match(r"S\d{,2}%s" % name, os.path.basename(match)):
return True
@ -1289,15 +1293,27 @@ def enabled(name, root=None, **kwargs): # pylint: disable=unused-argument
# Try 'systemctl is-enabled' first, then look for a symlink created by
# systemctl (older systemd releases did not support using is-enabled to
# check templated services), and lastly check for a sysvinit service.
if (
__salt__["cmd.retcode"](
_systemctl_cmd("is-enabled", name, root=root),
python_shell=False,
ignore_retcode=True,
)
== 0
):
cmd_result = __salt__["cmd.run_all"](
_systemctl_cmd("is-enabled", name, root=root),
python_shell=False,
ignore_retcode=True,
)
if cmd_result["retcode"] == 0 and cmd_result["stdout"] != "alias":
return True
elif cmd_result["stdout"] == "alias":
# check the service behind the alias
aliased_name = __salt__["cmd.run_stdout"](
_systemctl_cmd("show", name, root=root, extra_args=["-P", "Id"]),
python_shell=False,
)
if (
__salt__["cmd.retcode"](
_systemctl_cmd("is-enabled", aliased_name, root=root),
python_shell=False,
ignore_retcode=True,
)
) == 0:
return True
elif "@" in name:
# On older systemd releases, templated services could not be checked
# with ``systemctl is-enabled``. As a fallback, look for the symlinks
@ -1454,7 +1470,7 @@ def firstboot(
]
for parameter, value in parameters:
if value:
cmd.extend(["--{}".format(parameter), str(value)])
cmd.extend([f"--{parameter}", str(value)])
out = __salt__["cmd.run_all"](cmd)

View file

@ -91,6 +91,12 @@ log = logging.getLogger(__name__)
__virtualname__ = "pushover"
__deprecated__ = (
3009,
"pushover",
"https://github.com/saltstack/saltext-pushover",
)
def _get_options(ret=None):
"""

View file

@ -43,6 +43,7 @@ import salt.utils.msgpack
import salt.utils.platform
import salt.utils.process
import salt.utils.url
import salt.utils.verify
# Explicit late import to avoid circular import. DO NOT MOVE THIS.
import salt.utils.yamlloader as yamlloader
@ -4243,8 +4244,43 @@ class BaseHighState:
Get results from the master_tops system. Override this function if the
execution of the master_tops needs customization.
"""
if self.opts.get("file_client", "remote") == "local":
return self._local_master_tops()
return self.client.master_tops()
def _local_master_tops(self):
# return early if we got nothing to do
if "master_tops" not in self.opts:
return {}
if "id" not in self.opts:
log.error("Received call for external nodes without an id")
return {}
if not salt.utils.verify.valid_id(self.opts, self.opts["id"]):
return {}
if getattr(self, "tops", None) is None:
self.tops = salt.loader.tops(self.opts)
grains = {}
ret = {}
if "grains" in self.opts:
grains = self.opts["grains"]
for fun in self.tops:
if fun not in self.opts["master_tops"]:
continue
try:
ret = salt.utils.dictupdate.merge(
ret, self.tops[fun](opts=self.opts, grains=grains), merge_lists=True
)
except Exception as exc: # pylint: disable=broad-except
# If anything happens in the top generation, log it and move on
log.error(
"Top function %s failed with error %s for minion %s",
fun,
exc,
self.opts["id"],
)
return ret
def load_dynamic(self, matches):
"""
If autoload_dynamic_modules is True then automatically load the

View file

@ -8,6 +8,8 @@ Management of the GPG keychains
import logging
import salt.utils.dictupdate
log = logging.getLogger(__name__)
_VALID_TRUST_VALUES = [
@ -33,30 +35,28 @@ def present(
name, keys=None, user=None, keyserver=None, gnupghome=None, trust=None, **kwargs
):
"""
Ensure GPG public key is present in keychain
Ensure a GPG public key is present in the GPG keychain.
name
The unique name or keyid for the GPG public key.
The key ID of the GPG public key.
keys
The keyId or keyIds to add to the GPG keychain.
The key ID or key IDs to add to the GPG keychain.
user
Add GPG keys to the specified user's keychain
Add GPG keys to the specified user's keychain.
keyserver
The keyserver to retrieve the keys from.
gnupghome
Override GNUPG Home directory
Override GnuPG home directory.
trust
Trust level for the key in the keychain,
ignored by default. Valid trust levels:
ignored by default. Valid trust levels:
expired, unknown, not_trusted, marginally,
fully, ultimately
"""
ret = {"name": name, "result": True, "changes": {}, "comment": []}
@ -80,42 +80,62 @@ def present(
if trust:
if trust in _VALID_TRUST_VALUES:
if current_keys[key]["trust"] != TRUST_MAP[trust]:
if __opts__["test"]:
ret["result"] = None
ret["comment"].append(
f"Would have set trust level for {key} to {trust}"
)
salt.utils.dictupdate.set_dict_key_value(
ret, f"changes:{key}:trust", trust
)
continue
# update trust level
result = __salt__["gpg.trust_key"](
keyid=key,
trust_level=trust,
user=user,
)
if "result" in result and not result["result"]:
ret["result"] = result["result"]
ret["comment"].append(result["comment"])
if result["res"] is False:
ret["result"] = result["res"]
ret["comment"].append(result["message"])
else:
salt.utils.dictupdate.set_dict_key_value(
ret, f"changes:{key}:trust", trust
)
ret["comment"].append(
"Set trust level for {} to {}".format(key, trust)
f"Set trust level for {key} to {trust}"
)
else:
ret["comment"].append(
"GPG Public Key {} already in correct trust state".format(
key
)
f"GPG Public Key {key} already in correct trust state"
)
else:
ret["comment"].append("Invalid trust level {}".format(trust))
ret["comment"].append(f"Invalid trust level {trust}")
ret["comment"].append("GPG Public Key {} already in keychain ".format(key))
ret["comment"].append(f"GPG Public Key {key} already in keychain")
else:
if __opts__["test"]:
ret["result"] = None
ret["comment"].append(f"Would have added {key} to GPG keychain")
salt.utils.dictupdate.set_dict_key_value(
ret, f"changes:{key}:added", True
)
continue
result = __salt__["gpg.receive_keys"](
keyserver,
key,
user,
gnupghome,
)
if "result" in result and not result["result"]:
ret["result"] = result["result"]
ret["comment"].append(result["comment"])
if result["res"] is False:
ret["result"] = result["res"]
ret["comment"].extend(result["message"])
else:
ret["comment"].append("Adding {} to GPG keychain".format(name))
ret["comment"].append(f"Added {key} to GPG keychain")
salt.utils.dictupdate.set_dict_key_value(
ret, f"changes:{key}:added", True
)
if trust:
if trust in _VALID_TRUST_VALUES:
@ -124,15 +144,13 @@ def present(
trust_level=trust,
user=user,
)
if "result" in result and not result["result"]:
ret["result"] = result["result"]
ret["comment"].append(result["comment"])
if result["res"] is False:
ret["result"] = result["res"]
ret["comment"].append(result["message"])
else:
ret["comment"].append(
"Set trust level for {} to {}".format(key, trust)
)
ret["comment"].append(f"Set trust level for {key} to {trust}")
else:
ret["comment"].append("Invalid trust level {}".format(trust))
ret["comment"].append(f"Invalid trust level {trust}")
ret["comment"] = "\n".join(ret["comment"])
return ret
@ -140,25 +158,24 @@ def present(
def absent(name, keys=None, user=None, gnupghome=None, **kwargs):
"""
Ensure GPG public key is absent in keychain
Ensure a GPG public key is absent from the keychain.
name
The unique name or keyid for the GPG public key.
The key ID of the GPG public key.
keys
The keyId or keyIds to add to the GPG keychain.
The key ID or key IDs to remove from the GPG keychain.
user
Remove GPG keys from the specified user's keychain
Remove GPG keys from the specified user's keychain.
gnupghome
Override GNUPG Home directory
Override GnuPG home directory.
"""
ret = {"name": name, "result": True, "changes": {}, "comment": []}
_current_keys = __salt__["gpg.list_keys"]()
_current_keys = __salt__["gpg.list_keys"](user=user, gnupghome=gnupghome)
current_keys = []
for key in _current_keys:
@ -172,17 +189,23 @@ def absent(name, keys=None, user=None, gnupghome=None, **kwargs):
for key in keys:
if key in current_keys:
if __opts__["test"]:
ret["result"] = None
ret["comment"].append(f"Would have deleted {key} from GPG keychain")
salt.utils.dictupdate.append_dict_key_value(ret, "changes:deleted", key)
continue
result = __salt__["gpg.delete_key"](
key,
user,
gnupghome,
keyid=key,
user=user,
gnupghome=gnupghome,
)
if "result" in result and not result["result"]:
ret["result"] = result["result"]
ret["comment"].append(result["comment"])
if result["res"] is False:
ret["result"] = result["res"]
ret["comment"].append(result["message"])
else:
ret["comment"].append("Deleting {} from GPG keychain".format(name))
ret["comment"].append(f"Deleted {key} from GPG keychain")
salt.utils.dictupdate.append_dict_key_value(ret, "changes:deleted", key)
else:
ret["comment"].append("{} not found in GPG keychain".format(name))
ret["comment"].append(f"{key} not found in GPG keychain")
ret["comment"] = "\n".join(ret["comment"])
return ret

View file

@ -85,6 +85,12 @@ import logging
log = logging.getLogger(__name__)
__deprecated__ = (
3009,
"kubernetes",
"https://github.com/salt-extensions/saltext-kubernetes",
)
def __virtual__():
"""
@ -136,7 +142,7 @@ def deployment_absent(name, namespace="default", **kwargs):
ret["changes"] = {"kubernetes.deployment": {"new": "absent", "old": "present"}}
ret["comment"] = res["message"]
else:
ret["comment"] = "Something went wrong, response: {}".format(res)
ret["comment"] = f"Something went wrong, response: {res}"
return ret
@ -148,7 +154,7 @@ def deployment_present(
spec=None,
source="",
template="",
**kwargs
**kwargs,
):
"""
Ensures that the named deployment is present inside of the specified
@ -203,9 +209,9 @@ def deployment_present(
source=source,
template=template,
saltenv=__env__,
**kwargs
**kwargs,
)
ret["changes"]["{}.{}".format(namespace, name)] = {"old": {}, "new": res}
ret["changes"][f"{namespace}.{name}"] = {"old": {}, "new": res}
else:
if __opts__["test"]:
ret["result"] = None
@ -222,7 +228,7 @@ def deployment_present(
source=source,
template=template,
saltenv=__env__,
**kwargs
**kwargs,
)
ret["changes"] = {"metadata": metadata, "spec": spec}
@ -237,7 +243,7 @@ def service_present(
spec=None,
source="",
template="",
**kwargs
**kwargs,
):
"""
Ensures that the named service is present inside of the specified namespace
@ -292,9 +298,9 @@ def service_present(
source=source,
template=template,
saltenv=__env__,
**kwargs
**kwargs,
)
ret["changes"]["{}.{}".format(namespace, name)] = {"old": {}, "new": res}
ret["changes"][f"{namespace}.{name}"] = {"old": {}, "new": res}
else:
if __opts__["test"]:
ret["result"] = None
@ -312,7 +318,7 @@ def service_present(
template=template,
old_service=service,
saltenv=__env__,
**kwargs
**kwargs,
)
ret["changes"] = {"metadata": metadata, "spec": spec}
@ -351,7 +357,7 @@ def service_absent(name, namespace="default", **kwargs):
ret["changes"] = {"kubernetes.service": {"new": "absent", "old": "present"}}
ret["comment"] = res["message"]
else:
ret["comment"] = "Something went wrong, response: {}".format(res)
ret["comment"] = f"Something went wrong, response: {res}"
return ret
@ -391,7 +397,7 @@ def namespace_absent(name, **kwargs):
else:
ret["comment"] = "Terminating"
else:
ret["comment"] = "Something went wrong, response: {}".format(res)
ret["comment"] = f"Something went wrong, response: {res}"
return ret
@ -506,9 +512,9 @@ def secret_present(
source=source,
template=template,
saltenv=__env__,
**kwargs
**kwargs,
)
ret["changes"]["{}.{}".format(namespace, name)] = {"old": {}, "new": res}
ret["changes"][f"{namespace}.{name}"] = {"old": {}, "new": res}
else:
if __opts__["test"]:
ret["result"] = None
@ -525,7 +531,7 @@ def secret_present(
source=source,
template=template,
saltenv=__env__,
**kwargs
**kwargs,
)
ret["changes"] = {
@ -620,9 +626,9 @@ def configmap_present(
source=source,
template=template,
saltenv=__env__,
**kwargs
**kwargs,
)
ret["changes"]["{}.{}".format(namespace, name)] = {"old": {}, "new": res}
ret["changes"][f"{namespace}.{name}"] = {"old": {}, "new": res}
else:
if __opts__["test"]:
ret["result"] = None
@ -639,7 +645,7 @@ def configmap_present(
source=source,
template=template,
saltenv=__env__,
**kwargs
**kwargs,
)
ret["changes"] = {"data": res["data"]}
@ -681,7 +687,7 @@ def pod_absent(name, namespace="default", **kwargs):
else:
ret["comment"] = res["message"]
else:
ret["comment"] = "Something went wrong, response: {}".format(res)
ret["comment"] = f"Something went wrong, response: {res}"
return ret
@ -693,7 +699,7 @@ def pod_present(
spec=None,
source="",
template="",
**kwargs
**kwargs,
):
"""
Ensures that the named pod is present inside of the specified
@ -748,9 +754,9 @@ def pod_present(
source=source,
template=template,
saltenv=__env__,
**kwargs
**kwargs,
)
ret["changes"]["{}.{}".format(namespace, name)] = {"old": {}, "new": res}
ret["changes"][f"{namespace}.{name}"] = {"old": {}, "new": res}
else:
if __opts__["test"]:
ret["result"] = None
@ -896,7 +902,7 @@ def node_label_present(name, node, value, **kwargs):
old_labels = copy.copy(labels)
labels[name] = value
ret["changes"]["{}.{}".format(node, name)] = {"old": old_labels, "new": labels}
ret["changes"][f"{node}.{name}"] = {"old": old_labels, "new": labels}
ret["result"] = True
return ret

View file

@ -839,6 +839,13 @@ def installed(
ret["comment"] = "\n".join(comments)
return ret
# If the user does not exist, stop here with error:
if user and "user.info" in __salt__ and not __salt__["user.info"](user):
# The user does not exists, exit with result set to False
ret["result"] = False
ret["comment"] = f"User {user} does not exist"
return ret
# If a requirements file is specified, only install the contents of the
# requirements file. Similarly, using the --editable flag with pip should
# also ignore the "name" and "pkgs" parameters.

View file

@ -27,6 +27,12 @@ The api key can be specified in the master or minion configuration like below:
"""
__deprecated__ = (
3009,
"pushover",
"https://github.com/saltstack/saltext-pushover",
)
def __virtual__():
"""
@ -109,11 +115,11 @@ def post_message(
return ret
if not user:
ret["comment"] = "PushOver user is missing: {}".format(user)
ret["comment"] = f"PushOver user is missing: {user}"
return ret
if not message:
ret["comment"] = "PushOver message is missing: {}".format(message)
ret["comment"] = f"PushOver message is missing: {message}"
return ret
result = __salt__["pushover.post_message"](
@ -129,8 +135,8 @@ def post_message(
if result:
ret["result"] = True
ret["comment"] = "Sent message: {}".format(name)
ret["comment"] = f"Sent message: {name}"
else:
ret["comment"] = "Failed to send message: {}".format(name)
ret["comment"] = f"Failed to send message: {name}"
return ret

View file

@ -309,6 +309,20 @@ def sync_states(name, **kwargs):
return _sync_single(name, "states", **kwargs)
def sync_tops(name, **kwargs):
"""
Performs the same task as saltutil.sync_tops module
See :mod:`saltutil module for full list of options <salt.modules.saltutil>`
.. code-block:: yaml
sync_everything:
saltutil.sync_tops:
- refresh: True
"""
return _sync_single(name, "tops", **kwargs)
def sync_thorium(name, **kwargs):
"""
Performs the same task as saltutil.sync_thorium module

View file

@ -216,6 +216,8 @@ def present(name, **kwargs):
offline
Add the scheduled job to the Salt minion when the Salt minion is not running.
.. versionadded:: 3006.3
"""
ret = {"name": name, "result": True, "changes": {}, "comment": []}

View file

@ -24,6 +24,8 @@
from collections.abc import Callable
import salt.utils.versions
try:
# pylint: disable=E0611,minimum-python-version
import collections
@ -72,6 +74,14 @@ except (ImportError, AttributeError):
because their insertion order is arbitrary.
"""
salt.utils.versions.warn_until(
3009,
"The Salt backport `OrderedDict` class introduced for Python 2 "
"has been deprecated, and is set to be removed in {version}. "
"Please import `OrderedDict` from `collections`.",
category=DeprecationWarning,
)
super().__init__()
if len(args) > 1:
raise TypeError(f"expected at most 1 arguments, got {len(args)}")

View file

@ -0,0 +1,33 @@
from pathlib import Path
import pytest
from tests.support.runtests import RUNTIME_VARS
@pytest.fixture(scope="module")
def minion_config_overrides():
return {"master_tops": {"master_tops_test": True}}
@pytest.fixture(scope="module", autouse=True)
def _master_tops_test(state_tree, loaders):
mod_contents = (
Path(RUNTIME_VARS.FILES) / "extension_modules" / "tops" / "master_tops_test.py"
).read_text()
try:
with pytest.helpers.temp_file(
"master_tops_test.py", mod_contents, state_tree / "_tops"
):
res = loaders.modules.saltutil.sync_tops()
assert "tops.master_tops_test" in res
yield
finally:
loaders.modules.saltutil.sync_tops()
def test_masterless_master_tops(loaders):
res = loaders.modules.state.show_top()
assert res
assert "base" in res
assert "master_tops_test" in res["base"]

View file

@ -0,0 +1,142 @@
import shutil
import subprocess
import psutil
import pytest
gnupglib = pytest.importorskip("gnupg", reason="Needs python-gnupg library")
pytestmark = [
pytest.mark.skip_if_binaries_missing("gpg", reason="Needs gpg binary"),
pytest.mark.windows_whitelisted,
]
@pytest.fixture
def gpghome(tmp_path):
root = tmp_path / "gpghome"
root.mkdir(mode=0o0700)
try:
yield root
finally:
# Make sure we don't leave any gpg-agents running behind
gpg_connect_agent = shutil.which("gpg-connect-agent")
if gpg_connect_agent:
gnupghome = root / ".gnupg"
if not gnupghome.is_dir():
gnupghome = root
try:
subprocess.run(
[gpg_connect_agent, "killagent", "/bye"],
env={"GNUPGHOME": str(gnupghome)},
shell=False,
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
except subprocess.CalledProcessError:
# This is likely CentOS 7 or Amazon Linux 2
pass
# If the above errored or was not enough, as a last resort, let's check
# the running processes.
for proc in psutil.process_iter():
try:
if "gpg-agent" in proc.name():
for arg in proc.cmdline():
if str(root) in arg:
proc.terminate()
except Exception: # pylint: disable=broad-except
pass
@pytest.fixture
def gpg(loaders, states, gpghome):
try:
yield states.gpg
finally:
pass
@pytest.fixture
def key_a_fp():
return "EF03765F59EE904930C8A781553A82A058C0C795"
@pytest.fixture
def key_a_pub():
return """\
-----BEGIN PGP PUBLIC KEY BLOCK-----
mI0EY4fxHQEEAJvXEaaw+o/yZCwMOJbt5FQHbVMMDX/0YI8UdzsE5YCC4iKnoC3x
FwFdkevKj3qp+45iBGLLnalfXIcVGXJGACB+tPHgsfHaXSDQPSfmX6jbZ6pHosSm
v1tTixY+NTJzGL7hDLz2sAXTbYmTbXeE9ifWWk6NcIwZivUbhNRBM+KxABEBAAG0
LUtleSBBIChHZW5lcmF0ZWQgYnkgU2FsdFN0YWNrKSA8a2V5YUBleGFtcGxlPojR
BBMBCAA7FiEE7wN2X1nukEkwyKeBVTqCoFjAx5UFAmOH8R0CGy8FCwkIBwICIgIG
FQoJCAsCBBYCAwECHgcCF4AACgkQVTqCoFjAx5XURAQAguOwI+49lG0Kby+Bsyv3
of3GgxvhS1Qa7+ysj088az5GVt0pqVe3SbRVvn/jyC6yZvWuv94KdL3R7hCeEz2/
JakCRJ4wxEsdeASE8t9H/oTqD0I5asMa9EMvn5ICEGeLsTeQb7OYYihTQj7HJLG6
pDEmK8EhJDvV/9o0lnhm/9w=
=Wc0O
-----END PGP PUBLIC KEY BLOCK-----"""
@pytest.fixture
def gnupg(gpghome):
return gnupglib.GPG(gnupghome=str(gpghome))
@pytest.fixture
def gnupg_keyring(gpghome, keyring):
return gnupglib.GPG(gnupghome=str(gpghome), keyring=keyring)
@pytest.fixture(params=["a"])
def _pubkeys_present(gnupg, request):
pubkeys = [request.getfixturevalue(f"key_{x}_pub") for x in request.param]
fingerprints = [request.getfixturevalue(f"key_{x}_fp") for x in request.param]
gnupg.import_keys("\n".join(pubkeys))
present_keys = gnupg.list_keys()
for fp in fingerprints:
assert any(x["fingerprint"] == fp for x in present_keys)
yield
# cleanup is taken care of by gpghome and tmp_path
@pytest.mark.usefixtures("_pubkeys_present")
def test_gpg_present_no_changes(gpghome, gpg, gnupg, key_a_fp):
assert gnupg.list_keys(keys=key_a_fp)
ret = gpg.present(
key_a_fp[-16:], trust="unknown", gnupghome=str(gpghome), keyserver="nonexistent"
)
assert ret.result
assert not ret.changes
@pytest.mark.usefixtures("_pubkeys_present")
def test_gpg_absent(gpghome, gpg, gnupg, key_a_fp):
assert gnupg.list_keys(keys=key_a_fp)
assert not gnupg.list_keys(keys=key_a_fp, secret=True)
ret = gpg.absent(key_a_fp[-16:], gnupghome=str(gpghome))
assert ret.result
assert ret.changes
assert "deleted" in ret.changes
assert ret.changes["deleted"]
def test_gpg_absent_no_changes(gpghome, gpg, gnupg, key_a_fp):
assert not gnupg.list_keys(keys=key_a_fp)
ret = gpg.absent(key_a_fp[-16:], gnupghome=str(gpghome))
assert ret.result
assert not ret.changes
@pytest.mark.usefixtures("_pubkeys_present")
def test_gpg_absent_test_mode_no_changes(gpghome, gpg, gnupg, key_a_fp):
assert gnupg.list_keys(keys=key_a_fp)
ret = gpg.absent(key_a_fp[-16:], gnupghome=str(gpghome), test=True)
assert ret.result is None
assert ret.changes
assert "deleted" in ret.changes
assert ret.changes["deleted"]
assert gnupg.list_keys(keys=key_a_fp)

View file

@ -15,7 +15,7 @@ import psutil
import pytest
import salt.modules.gpg as gpg
from tests.support.mock import MagicMock, call, patch
from tests.support.mock import MagicMock, Mock, call, patch
pytest.importorskip("gnupg")
@ -466,8 +466,8 @@ def test_delete_key_with_passphrase_without_gpg_passphrase_in_pillar(gpghome):
]
_expected_result = {
"res": True,
"message": "gpg_passphrase not available in pillar.",
"res": False,
"message": "Failed to delete secret key for xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx: gpg_passphrase not available in pillar.",
}
mock_opt = MagicMock(return_value="root")
@ -546,10 +546,15 @@ def test_delete_key_with_passphrase_with_gpg_passphrase_in_pillar(gpghome):
) as gnupg_delete_keys:
ret = gpg.delete_key("xxxxxxxxxxxxxxxx", delete_secret=True)
assert ret == _expected_result
gnupg_delete_keys.assert_any_call(
"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
True,
passphrase=GPG_TEST_KEY_PASSPHRASE,
)
gnupg_delete_keys.assert_called_with(
"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
False,
passphrase=GPG_TEST_KEY_PASSPHRASE,
expect_passphrase=False,
)
@ -1039,3 +1044,76 @@ def test_gpg_decrypt_message_with_gpg_passphrase_in_pillar(gpghome):
gnupghome=str(gpghome.path),
)
assert ret["res"] is True
@pytest.fixture(params={})
def _import_result_mock(request):
defaults = {
"gpg": Mock(),
"imported": 0,
"results": [],
"fingerprints": [],
"count": 0,
"no_user_id": 0,
"imported_rsa": 0,
"unchanged": 0,
"n_uids": 0,
"n_subk": 0,
"n_sigs": 0,
"n_revoc": 0,
"sec_read": 0,
"sec_imported": 0,
"sec_dups": 0,
"not_imported": 0,
"stderr": "",
"data": b"",
}
defaults.update(request.param)
import_result = MagicMock()
import_result.__bool__.return_value = False
for var, val in defaults.items():
setattr(import_result, var, val)
return import_result
@pytest.mark.parametrize(
    "_import_result_mock",
    (
        {
            "count": 1,
            "stderr": "gpg: key ABCDEF0123456789: no user ID\ngpg: Total number processed: 1\n[GNUPG:] IMPORT_RES 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0\n",
        },
    ),
    indirect=True,
)
def test_gpg_receive_keys_no_user_id(_import_result_mock):
    """
    receive_keys must report failure when gpg rejects a key without a user ID.
    """
    salt_dunder = {"user.info": MagicMock(), "config.option": Mock()}
    with patch("salt.modules.gpg._create_gpg") as create, patch.dict(
        gpg.__salt__, salt_dunder
    ):
        create.return_value.recv_keys.return_value = _import_result_mock
        ret = gpg.receive_keys(keys="abc", user="abc")
    assert ret["res"] is False
    assert any("no user ID" in line for line in ret["message"])
@pytest.mark.parametrize(
    "_import_result_mock",
    (
        {
            "results": [{"fingerprint": None, "problem": "0", "text": "Other failure"}],
            "stderr": "[GNUPG:] FAILURE recv-keys 167772346\ngpg: keyserver receive failed: No keyserver available\n",
            "returncode": 2,
        },
    ),
    indirect=True,
)
def test_gpg_receive_keys_keyserver_unavailable(_import_result_mock):
    """
    receive_keys must report failure when no keyserver can be reached.
    """
    salt_dunder = {"user.info": MagicMock(), "config.option": Mock()}
    with patch("salt.modules.gpg._create_gpg") as create, patch.dict(
        gpg.__salt__, salt_dunder
    ):
        create.return_value.recv_keys.return_value = _import_result_mock
        ret = gpg.receive_keys(keys="abc", user="abc")
    assert ret["res"] is False
    assert any("No keyserver available" in line for line in ret["message"])

View file

@ -1101,7 +1101,6 @@ def test_install_extra_args_arguments_recursion_error():
pkg = "pep8"
mock = MagicMock(return_value={"retcode": 0, "stdout": ""})
with patch.dict(pip.__salt__, {"cmd.run_all": mock}):
pytest.raises(
TypeError,
lambda: pip.install(
@ -1522,7 +1521,7 @@ def test_list_upgrades_gt9(python_binary):
{"latest_filetype": "wheel", "version": "1.4.1", "name": "appdirs", "latest_version": "1.4.3"},
{"latest_filetype": "sdist", "version": "1.11.63", "name": "awscli", "latest_version": "1.12.1"}
]"""
mock = MagicMock(return_value={"retcode": 0, "stdout": "{}".format(eggs)})
mock = MagicMock(return_value={"retcode": 0, "stdout": f"{eggs}"})
with patch.dict(pip.__salt__, {"cmd.run_all": mock}):
with patch("salt.modules.pip.version", MagicMock(return_value="9.1.1")):
ret = pip.list_upgrades()
@ -1738,28 +1737,44 @@ def test_when_version_is_called_with_a_user_it_should_be_passed_to_undelying_run
)
def test_install_target_from_VENV_PIP_TARGET_in_resulting_command(python_binary):
@pytest.mark.parametrize(
"bin_env,target,target_env,expected_target",
[
(None, None, None, None),
(None, "/tmp/foo", None, "/tmp/foo"),
(None, None, "/tmp/bar", "/tmp/bar"),
(None, "/tmp/foo", "/tmp/bar", "/tmp/foo"),
("/tmp/venv", "/tmp/foo", None, "/tmp/foo"),
("/tmp/venv", None, "/tmp/bar", None),
("/tmp/venv", "/tmp/foo", "/tmp/bar", "/tmp/foo"),
],
)
def test_install_target_from_VENV_PIP_TARGET_in_resulting_command(
python_binary, bin_env, target, target_env, expected_target
):
pkg = "pep8"
target = "/tmp/foo"
target_env = "/tmp/bar"
mock = MagicMock(return_value={"retcode": 0, "stdout": ""})
environment = os.environ.copy()
environment["VENV_PIP_TARGET"] = target_env
real_get_pip_bin = pip._get_pip_bin
def mock_get_pip_bin(bin_env):
if not bin_env:
return real_get_pip_bin(bin_env)
return [f"{bin_env}/bin/pip"]
if target_env is not None:
environment["VENV_PIP_TARGET"] = target_env
with patch.dict(pip.__salt__, {"cmd.run_all": mock}), patch.object(
os, "environ", environment
):
pip.install(pkg)
expected = [*python_binary, "install", "--target", target_env, pkg]
mock.assert_called_with(
expected,
saltenv="base",
runas=None,
use_vt=False,
python_shell=False,
)
mock.reset_mock()
pip.install(pkg, target=target)
expected = [*python_binary, "install", "--target", target, pkg]
), patch.object(pip, "_get_pip_bin", mock_get_pip_bin):
pip.install(pkg, bin_env=bin_env, target=target)
expected_binary = python_binary
if bin_env is not None:
expected_binary = [f"{bin_env}/bin/pip"]
if expected_target is not None:
expected = [*expected_binary, "install", "--target", expected_target, pkg]
else:
expected = [*expected_binary, "install", pkg]
mock.assert_called_with(
expected,
saltenv="base",

View file

@ -0,0 +1,770 @@
import os
import pytest
import salt.modules.systemd_service as systemd
import salt.utils.systemd
from salt.exceptions import CommandExecutionError
from tests.support.mock import MagicMock, patch
@pytest.fixture()
def systemctl_status():
    """
    Canned ``cmd.run_all`` results of ``systemctl status`` keyed by unit
    name. Both an existing-but-inactive and a not-found unit use retcode 3
    here — the pre-231 systemd behavior exercised by test_available/missing.
    """
    return {
        "sshd.service": {
            "stdout": """\
* sshd.service - OpenSSH Daemon
   Loaded: loaded (/usr/lib/systemd/system/sshd.service; disabled; vendor preset: disabled)
   Active: inactive (dead)""",
            "stderr": "",
            "retcode": 3,
            "pid": 12345,
        },
        "foo.service": {
            "stdout": """\
* foo.service
   Loaded: not-found (Reason: No such file or directory)
   Active: inactive (dead)""",
            "stderr": "",
            "retcode": 3,
            "pid": 12345,
        },
    }
# This reflects systemd >= 231 behavior
@pytest.fixture()
def systemctl_status_gte_231():
    """``systemctl status`` result for an unknown unit: retcode 4 plus a
    'could not be found' message, as emitted by systemd >= 231."""
    return {
        "bar.service": {
            "stdout": "Unit bar.service could not be found.",
            "stderr": "",
            "retcode": 4,
            "pid": 12345,
        },
    }
@pytest.fixture()
def list_unit_files():
    """
    Canned ``list-unit-files``-style output: columns are unit file, state,
    and (on newer systemd) vendor preset; the last group has no third
    column. Covers enabled/disabled/static services and timers.
    """
    return """\
service1.service                           enabled              -
service2.service                           disabled             -
service3.service                           static               -
timer1.timer                               enabled              -
timer2.timer                               disabled             -
timer3.timer                               static               -
service4.service                           enabled              enabled
service5.service                           disabled             enabled
service6.service                           static               enabled
timer4.timer                               enabled              enabled
timer5.timer                               disabled             enabled
timer6.timer                               static               enabled
service7.service                           enabled              disabled
service8.service                           disabled             disabled
service9.service                           static               disabled
timer7.timer                               enabled              disabled
timer8.timer                               disabled             disabled
timer9.timer                               static               disabled
service10.service                          enabled
service11.service                          disabled
service12.service                          static
timer10.timer                              enabled
timer11.timer                              disabled
timer12.timer                              static"""
@pytest.fixture()
def configure_loader_modules():
    """Load the systemd_service module with no extra loader dunders."""
    return {systemd: {}}
def test_systemctl_reload():
    """
    systemctl_reload() raises CommandExecutionError when daemon-reload
    fails and returns True on success.
    """
    run_all = MagicMock(
        side_effect=[
            {"stdout": "Who knows why?", "stderr": "", "retcode": 1, "pid": 12345},
            {"stdout": "", "stderr": "", "retcode": 0, "pid": 54321},
        ]
    )
    with patch.dict(systemd.__salt__, {"cmd.run_all": run_all}):
        # First invocation hits the retcode-1 result and must raise.
        with pytest.raises(
            CommandExecutionError,
            match="Problem performing systemctl daemon-reload: Who knows why?",
        ):
            systemd.systemctl_reload()
        # Second invocation succeeds.
        assert systemd.systemctl_reload() is True
def test_get_enabled(list_unit_files, systemctl_status):
    """
    get_enabled() merges enabled systemd services, enabled timers and
    enabled sysv initscripts into one sorted list.
    """
    cmd_mock = MagicMock(return_value=list_unit_files)
    listdir_mock = MagicMock(return_value=["foo", "bar", "baz", "README"])
    sd_mock = MagicMock(
        return_value={x.replace(".service", "") for x in systemctl_status}
    )
    access_mock = MagicMock(
        side_effect=lambda x, y: x != os.path.join(systemd.INITSCRIPT_PATH, "README")
    )
    sysv_enabled_mock = MagicMock(side_effect=lambda x, _: x == "baz")
    expected = [
        "baz",
        "service1",
        "service10",
        "service4",
        "service7",
        "timer1.timer",
        "timer10.timer",
        "timer4.timer",
        "timer7.timer",
    ]
    with patch.dict(systemd.__salt__, {"cmd.run": cmd_mock}), patch.object(
        os, "listdir", listdir_mock
    ), patch.object(systemd, "_get_systemd_services", sd_mock), patch.object(
        os, "access", side_effect=access_mock
    ), patch.object(systemd, "_sysv_enabled", sysv_enabled_mock):
        assert systemd.get_enabled() == expected
def test_get_disabled(list_unit_files, systemctl_status):
    """
    get_disabled() merges disabled systemd services, disabled timers and
    non-enabled sysv initscripts into one sorted list.
    """
    cmd_mock = MagicMock(return_value=list_unit_files)
    # 'foo' collides with the systemd services (as returned by sd_mock) and
    # must therefore not be returned by _get_sysv_services(). It doesn't
    # matter that it's absent from the list_unit_files output; we just want
    # to ensure 'foo' isn't identified as a disabled initscript even though
    # it is mocked as not enabled below (only 'baz' counts as an enabled
    # sysv service).
    listdir_mock = MagicMock(return_value=["foo", "bar", "baz", "README"])
    sd_mock = MagicMock(
        return_value={x.replace(".service", "") for x in systemctl_status}
    )
    access_mock = MagicMock(
        side_effect=lambda x, y: x != os.path.join(systemd.INITSCRIPT_PATH, "README")
    )
    sysv_enabled_mock = MagicMock(side_effect=lambda x, _: x == "baz")
    expected = [
        "bar",
        "service11",
        "service2",
        "service5",
        "service8",
        "timer11.timer",
        "timer2.timer",
        "timer5.timer",
        "timer8.timer",
    ]
    with patch.dict(systemd.__salt__, {"cmd.run": cmd_mock}), patch.object(
        os, "listdir", listdir_mock
    ), patch.object(systemd, "_get_systemd_services", sd_mock), patch.object(
        os, "access", side_effect=access_mock
    ), patch.object(systemd, "_sysv_enabled", sysv_enabled_mock):
        assert systemd.get_disabled() == expected
def test_get_static(list_unit_files, systemctl_status):
    """
    get_static() returns static systemd services and timers only; sysv
    initscripts never appear in the result.
    """
    cmd_mock = MagicMock(return_value=list_unit_files)
    # 'foo' collides with the systemd services (as returned by sd_mock) and
    # must therefore not be returned by _get_sysv_services(); see the
    # matching note in test_get_disabled.
    listdir_mock = MagicMock(return_value=["foo", "bar", "baz", "README"])
    sd_mock = MagicMock(
        return_value={x.replace(".service", "") for x in systemctl_status}
    )
    access_mock = MagicMock(
        side_effect=lambda x, y: x != os.path.join(systemd.INITSCRIPT_PATH, "README")
    )
    sysv_enabled_mock = MagicMock(side_effect=lambda x, _: x == "baz")
    expected = [
        "service12",
        "service3",
        "service6",
        "service9",
        "timer12.timer",
        "timer3.timer",
        "timer6.timer",
        "timer9.timer",
    ]
    with patch.dict(systemd.__salt__, {"cmd.run": cmd_mock}), patch.object(
        os, "listdir", listdir_mock
    ), patch.object(systemd, "_get_systemd_services", sd_mock), patch.object(
        os, "access", side_effect=access_mock
    ), patch.object(systemd, "_sysv_enabled", sysv_enabled_mock):
        assert systemd.get_static() == expected
def test_get_all():
    """
    get_all() lists every unit and initscript found on disk, minus the
    README and non-unit directory entries, sorted and de-duplicated.
    """
    listdir_mock = MagicMock(
        side_effect=[
            ["foo.service", "multi-user.target.wants", "mytimer.timer"],
            [],
            ["foo.service", "multi-user.target.wants", "bar.service"],
            ["mysql", "nginx", "README"],
        ]
    )
    access_mock = MagicMock(
        side_effect=lambda x, y: x != os.path.join(systemd.INITSCRIPT_PATH, "README")
    )
    expected = [
        "bar",
        "foo",
        "mysql",
        "mytimer.timer",
        "nginx",
    ]
    with patch.object(os, "listdir", listdir_mock), patch.object(
        os, "access", side_effect=access_mock
    ):
        assert systemd.get_all() == expected
def test_available(systemctl_status, systemctl_status_gte_231):
    """
    available() recognizes existing units and rejects unknown ones across
    the systemd status-output generations.
    """
    status_mock = MagicMock(side_effect=lambda x: systemctl_status[x])
    offline_mock = MagicMock(return_value=False)

    # systemd < 231
    with patch.dict(systemd.__context__, {"salt.utils.systemd.version": 230}):
        with patch.object(systemd, "_systemctl_status", status_mock), patch.object(
            systemd, "offline", offline_mock
        ):
            assert systemd.available("sshd.service") is True
            assert systemd.available("foo.service") is False

    # systemd >= 231, and systemd < 231 with the retcode/output changes
    # backported (e.g. RHEL 7.3), report a missing unit the new way.
    for version in (231, 219):
        with patch.dict(systemd.__context__, {"salt.utils.systemd.version": version}):
            with patch.dict(systemctl_status, systemctl_status_gte_231):
                with patch.object(
                    systemd, "_systemctl_status", status_mock
                ), patch.object(systemd, "offline", offline_mock):
                    assert systemd.available("sshd.service") is True
                    assert systemd.available("bar.service") is False
def test_missing(systemctl_status, systemctl_status_gte_231):
    """
    missing() is the inverse of service.available, across the systemd
    status-output generations.
    """
    status_mock = MagicMock(side_effect=lambda x: systemctl_status[x])
    offline_mock = MagicMock(return_value=False)

    # systemd < 231
    with patch.dict(systemd.__context__, {"salt.utils.systemd.version": 230}):
        with patch.object(systemd, "_systemctl_status", status_mock), patch.object(
            systemd, "offline", offline_mock
        ):
            assert systemd.missing("sshd.service") is False
            assert systemd.missing("foo.service") is True

    # systemd >= 231, and systemd < 231 with the retcode/output changes
    # backported (e.g. RHEL 7.3), report a missing unit the new way.
    for version in (231, 219):
        with patch.dict(systemd.__context__, {"salt.utils.systemd.version": version}):
            with patch.dict(systemctl_status, systemctl_status_gte_231):
                with patch.object(
                    systemd, "_systemctl_status", status_mock
                ), patch.object(systemd, "offline", offline_mock):
                    assert systemd.missing("sshd.service") is False
                    assert systemd.missing("bar.service") is True
def test_show():
    """
    show() parses key=value output into a dict, turning `{ a=b ; c=d }`
    values into nested dicts and space-separated values into lists.
    """
    show_output = "a=b\nc=d\ne={ f=g ; h=i }\nWants=foo.service bar.service\n"
    run_mock = MagicMock(return_value=show_output)
    expected = {
        "a": "b",
        "c": "d",
        "e": {"f": "g", "h": "i"},
        "Wants": ["foo.service", "bar.service"],
    }
    with patch.dict(systemd.__salt__, {"cmd.run": run_mock}):
        assert systemd.show("sshd") == expected
def test_execs():
    """
    execs() maps each service name to the path of its ExecStart property.
    """
    get_all_mock = MagicMock(return_value=["a", "b"])
    show_mock = MagicMock(return_value={"ExecStart": {"path": "c"}})
    with patch.object(systemd, "get_all", get_all_mock):
        with patch.object(systemd, "show", show_mock):
            assert systemd.execs() == {"a": "c", "b": "c"}
@pytest.fixture()
def unit_name():
    """Unit name used by the state-change and mask/unmask tests."""
    return "foo"
@pytest.fixture()
def mock_none():
    """Mock returning None (used to no-op internal helpers)."""
    return MagicMock(return_value=None)
@pytest.fixture()
def mock_success():
    """Mock returning a zero (success) exit status."""
    return MagicMock(return_value=0)
@pytest.fixture()
def mock_failure():
    """Mock returning a non-zero (failure) exit status."""
    return MagicMock(return_value=1)
@pytest.fixture()
def mock_true():
    """Mock returning True."""
    return MagicMock(return_value=True)
@pytest.fixture()
def mock_false():
    """Mock returning False."""
    return MagicMock(return_value=False)
@pytest.fixture()
def mock_empty_list():
    """Mock returning an empty list (e.g. no sysv services found)."""
    return MagicMock(return_value=[])
@pytest.fixture()
def mock_run_all_success():
    """cmd.run_all mock representing a successful command invocation."""
    return MagicMock(
        return_value={"retcode": 0, "stdout": "", "stderr": "", "pid": 12345}
    )
@pytest.fixture()
def mock_run_all_failure():
    """cmd.run_all mock representing a failed command invocation."""
    return MagicMock(
        return_value={"retcode": 1, "stdout": "", "stderr": "", "pid": 12345}
    )
@pytest.mark.parametrize(
    "action,no_block",
    [
        ["start", False],
        ["start", True],
        ["stop", False],
        ["stop", True],
        ["restart", False],
        ["restart", True],
        ["reload_", False],
        ["reload_", True],
        ["force_reload", False],
        ["force_reload", True],
        ["enable", False],
        ["enable", True],
        ["disable", False],
        ["disable", True],
    ],
)
def test_change_state(
    unit_name,
    mock_none,
    mock_empty_list,
    mock_true,
    mock_false,
    mock_run_all_success,
    mock_run_all_failure,
    action,
    no_block,
):
    """
    Common code for start/stop/restart/reload/force_reload tests

    Each action is exercised in four scenarios: scope available+enabled,
    scope available+failed command, scope disabled, and scope unavailable
    (with the config flag both on and off). ``stop``/``disable`` return
    False on failure; every other action raises CommandExecutionError.
    """
    # We want the traceback if the function name can't be found in the
    # systemd execution module.
    func = getattr(systemd, action)
    # Remove trailing _ in "reload_"
    action = action.rstrip("_").replace("_", "-")
    systemctl_command = ["/bin/systemctl"]
    if no_block:
        systemctl_command.append("--no-block")
    systemctl_command.extend([action, unit_name + ".service"])
    scope_prefix = ["/bin/systemd-run", "--scope"]

    assert_kwargs = {"python_shell": False}
    if action in ("enable", "disable"):
        # enable/disable inspect the retcode themselves instead of raising
        assert_kwargs["ignore_retcode"] = True

    with patch("salt.utils.path.which", lambda x: "/bin/" + x):
        with patch.object(systemd, "_check_for_unit_changes", mock_none):
            with patch.object(systemd, "_unit_file_changed", mock_none):
                with patch.object(systemd, "_check_unmask", mock_none):
                    with patch.object(systemd, "_get_sysv_services", mock_empty_list):

                        # Has scopes available
                        with patch.object(salt.utils.systemd, "has_scope", mock_true):

                            # Scope enabled, successful
                            with patch.dict(
                                systemd.__salt__,
                                {
                                    "config.get": mock_true,
                                    "cmd.run_all": mock_run_all_success,
                                },
                            ):
                                ret = func(unit_name, no_block=no_block)
                                assert ret is True
                                mock_run_all_success.assert_called_with(
                                    scope_prefix + systemctl_command, **assert_kwargs
                                )

                            # Scope enabled, failed
                            with patch.dict(
                                systemd.__salt__,
                                {
                                    "config.get": mock_true,
                                    "cmd.run_all": mock_run_all_failure,
                                },
                            ):
                                if action in ("stop", "disable"):
                                    ret = func(unit_name, no_block=no_block)
                                    assert ret is False
                                else:
                                    with pytest.raises(CommandExecutionError):
                                        func(unit_name, no_block=no_block)
                                mock_run_all_failure.assert_called_with(
                                    scope_prefix + systemctl_command, **assert_kwargs
                                )

                            # Scope disabled, successful
                            with patch.dict(
                                systemd.__salt__,
                                {
                                    "config.get": mock_false,
                                    "cmd.run_all": mock_run_all_success,
                                },
                            ):
                                ret = func(unit_name, no_block=no_block)
                                assert ret is True
                                mock_run_all_success.assert_called_with(
                                    systemctl_command, **assert_kwargs
                                )

                            # Scope disabled, failed
                            with patch.dict(
                                systemd.__salt__,
                                {
                                    "config.get": mock_false,
                                    "cmd.run_all": mock_run_all_failure,
                                },
                            ):
                                if action in ("stop", "disable"):
                                    ret = func(unit_name, no_block=no_block)
                                    assert ret is False
                                else:
                                    with pytest.raises(CommandExecutionError):
                                        func(unit_name, no_block=no_block)
                                mock_run_all_failure.assert_called_with(
                                    systemctl_command, **assert_kwargs
                                )

                        # Does not have scopes available
                        with patch.object(salt.utils.systemd, "has_scope", mock_false):

                            # The results should be the same irrespective of
                            # whether or not scope is enabled, since scope is not
                            # available, so we repeat the below tests with it both
                            # enabled and disabled.
                            for scope_mock in (mock_true, mock_false):

                                # Successful
                                with patch.dict(
                                    systemd.__salt__,
                                    {
                                        "config.get": scope_mock,
                                        "cmd.run_all": mock_run_all_success,
                                    },
                                ):
                                    ret = func(unit_name, no_block=no_block)
                                    assert ret is True
                                    mock_run_all_success.assert_called_with(
                                        systemctl_command, **assert_kwargs
                                    )

                                # Failed
                                with patch.dict(
                                    systemd.__salt__,
                                    {
                                        "config.get": scope_mock,
                                        "cmd.run_all": mock_run_all_failure,
                                    },
                                ):
                                    if action in ("stop", "disable"):
                                        ret = func(unit_name, no_block=no_block)
                                        assert ret is False
                                    else:
                                        with pytest.raises(CommandExecutionError):
                                            func(unit_name, no_block=no_block)
                                    mock_run_all_failure.assert_called_with(
                                        systemctl_command, **assert_kwargs
                                    )
@pytest.mark.parametrize(
    "action,runtime",
    [
        ["mask", False],
        ["mask", True],
        ["unmask_", False],
        ["unmask_", True],
    ],
)
def test_mask_unmask(
    unit_name,
    mock_none,
    mock_true,
    mock_false,
    mock_run_all_success,
    mock_run_all_failure,
    action,
    runtime,
):
    """
    Common code for mask/unmask tests

    Exercised in the same four scope scenarios as test_change_state.
    unmask_ is additionally tested against an already-unmasked unit, where
    it must short-circuit without invoking cmd.run_all.
    """
    # We want the traceback if the function name can't be found in the
    # systemd execution module, so don't provide a fallback value for the
    # call to getattr() here.
    func = getattr(systemd, action)
    # Remove trailing _ in "unmask_"
    action = action.rstrip("_").replace("_", "-")
    systemctl_command = ["/bin/systemctl", action]
    if runtime:
        systemctl_command.append("--runtime")
    systemctl_command.append(unit_name + ".service")
    scope_prefix = ["/bin/systemd-run", "--scope"]

    args = [unit_name, runtime]

    # unmask acts only when the unit IS masked; mask only when it is NOT
    masked_mock = mock_true if action == "unmask" else mock_false

    with patch("salt.utils.path.which", lambda x: "/bin/" + x):
        with patch.object(systemd, "_check_for_unit_changes", mock_none):
            if action == "unmask":
                mock_not_run = MagicMock(
                    return_value={
                        "retcode": 0,
                        "stdout": "",
                        "stderr": "",
                        "pid": 12345,
                    }
                )
                with patch.dict(systemd.__salt__, {"cmd.run_all": mock_not_run}):
                    with patch.object(systemd, "masked", mock_false):
                        # Test not masked (should take no action and return True)
                        assert systemd.unmask_(unit_name) is True
                        # Also should not have called cmd.run_all
                        assert mock_not_run.call_count == 0

            with patch.object(systemd, "masked", masked_mock):

                # Has scopes available
                with patch.object(salt.utils.systemd, "has_scope", mock_true):

                    # Scope enabled, successful
                    with patch.dict(
                        systemd.__salt__,
                        {
                            "config.get": mock_true,
                            "cmd.run_all": mock_run_all_success,
                        },
                    ):
                        ret = func(*args)
                        assert ret is True
                        mock_run_all_success.assert_called_with(
                            scope_prefix + systemctl_command,
                            python_shell=False,
                            redirect_stderr=True,
                        )

                    # Scope enabled, failed
                    with patch.dict(
                        systemd.__salt__,
                        {
                            "config.get": mock_true,
                            "cmd.run_all": mock_run_all_failure,
                        },
                    ):
                        with pytest.raises(CommandExecutionError):
                            func(*args)
                        mock_run_all_failure.assert_called_with(
                            scope_prefix + systemctl_command,
                            python_shell=False,
                            redirect_stderr=True,
                        )

                    # Scope disabled, successful
                    with patch.dict(
                        systemd.__salt__,
                        {
                            "config.get": mock_false,
                            "cmd.run_all": mock_run_all_success,
                        },
                    ):
                        ret = func(*args)
                        assert ret is True
                        mock_run_all_success.assert_called_with(
                            systemctl_command,
                            python_shell=False,
                            redirect_stderr=True,
                        )

                    # Scope disabled, failed
                    with patch.dict(
                        systemd.__salt__,
                        {
                            "config.get": mock_false,
                            "cmd.run_all": mock_run_all_failure,
                        },
                    ):
                        with pytest.raises(CommandExecutionError):
                            func(*args)
                        mock_run_all_failure.assert_called_with(
                            systemctl_command,
                            python_shell=False,
                            redirect_stderr=True,
                        )

                # Does not have scopes available
                with patch.object(salt.utils.systemd, "has_scope", mock_false):

                    # The results should be the same irrespective of
                    # whether or not scope is enabled, since scope is not
                    # available, so we repeat the below tests with it both
                    # enabled and disabled.
                    for scope_mock in (mock_true, mock_false):

                        # Successful
                        with patch.dict(
                            systemd.__salt__,
                            {
                                "config.get": scope_mock,
                                "cmd.run_all": mock_run_all_success,
                            },
                        ):
                            ret = func(*args)
                            assert ret is True
                            mock_run_all_success.assert_called_with(
                                systemctl_command,
                                python_shell=False,
                                redirect_stderr=True,
                            )

                        # Failed
                        with patch.dict(
                            systemd.__salt__,
                            {
                                "config.get": scope_mock,
                                "cmd.run_all": mock_run_all_failure,
                            },
                        ):
                            with pytest.raises(CommandExecutionError):
                                func(*args)
                            mock_run_all_failure.assert_called_with(
                                systemctl_command,
                                python_shell=False,
                                redirect_stderr=True,
                            )
def test_firstboot():
    """
    service.firstboot without parameters runs bare systemd-firstboot.
    """
    run_all = MagicMock(return_value={"retcode": 0, "stdout": "stdout"})
    with patch("salt.utils.path.which", lambda x: "/bin/" + x):
        with patch.dict(systemd.__salt__, {"cmd.run_all": run_all}):
            assert systemd.firstboot()
            run_all.assert_called_with(["/bin/systemd-firstboot"])
def test_firstboot_params():
    """
    service.firstboot with parameters maps each keyword argument onto the
    corresponding systemd-firstboot CLI flag.
    """
    run_all = MagicMock(return_value={"retcode": 0, "stdout": "stdout"})
    expected_cmd = [
        "/bin/systemd-firstboot",
        "--locale",
        "en_US.UTF-8",
        "--locale-message",
        "en_US.UTF-8",
        "--keymap",
        "jp",
        "--timezone",
        "Europe/Berlin",
        "--hostname",
        "node-001",
        "--machine-ID",
        "1234567890abcdef",
        "--root",
        "/mnt",
    ]
    with patch("salt.utils.path.which", lambda x: "/bin/" + x):
        with patch.dict(systemd.__salt__, {"cmd.run_all": run_all}):
            assert systemd.firstboot(
                locale="en_US.UTF-8",
                locale_message="en_US.UTF-8",
                keymap="jp",
                timezone="Europe/Berlin",
                hostname="node-001",
                machine_id="1234567890abcdef",
                root="/mnt",
            )
            run_all.assert_called_with(expected_cmd)
def test_firstboot_error():
    """
    service.firstboot raises CommandExecutionError on a non-zero retcode.
    """
    run_all = MagicMock(return_value={"retcode": 1, "stderr": "error"})
    with patch.dict(systemd.__salt__, {"cmd.run_all": run_all}):
        with pytest.raises(CommandExecutionError):
            assert systemd.firstboot()

View file

@ -0,0 +1,164 @@
import pytest
import salt.states.gpg as gpg
from tests.support.mock import Mock, patch
@pytest.fixture
def configure_loader_modules():
    """Load the gpg state module with test mode disabled by default."""
    return {gpg: {"__opts__": {"test": False}}}
@pytest.fixture
def keys_list():
    """
    Canned ``gpg.list_keys`` output covering the interesting trust states:
    ultimately trusted (A), fully trusted (B), expired (C) and unknown
    trust with and without an expiry (D, E).
    """
    return [
        {
            "keyid": "A",
            "fingerprint": "A",
            "uids": ["Key A"],
            "created": "2010-04-01",
            "keyLength": "4096",
            "ownerTrust": "Ultimately Trusted",
            "trust": "Ultimately Trusted",
        },
        {
            "keyid": "B",
            "fingerprint": "B",
            "uids": ["Key B"],
            "created": "2017-03-06",
            "keyLength": "4096",
            "ownerTrust": "Unknown",
            "trust": "Fully Trusted",
        },
        {
            "keyid": "C",
            "fingerprint": "C",
            "uids": ["Key C"],
            "expires": "2022-06-24",
            "created": "2018-06-24",
            "keyLength": "4096",
            "ownerTrust": "Unknown",
            "trust": "Expired",
        },
        {
            "keyid": "D",
            "fingerprint": "D",
            "uids": ["Key D"],
            "created": "2018-01-18",
            "keyLength": "3072",
            "ownerTrust": "Unknown",
            "trust": "Unknown",
        },
        {
            "keyid": "E",
            "fingerprint": "E",
            "uids": ["Key E"],
            "expires": "2222-11-18",
            "created": "2019-11-20",
            "keyLength": "4096",
            "ownerTrust": "Unknown",
            "trust": "Unknown",
        },
    ]
@pytest.fixture
def gpg_list_keys(request, keys_list):
    """Patch gpg.list_keys; an indirect param overrides the default key list."""
    mock_list = Mock(spec="salt.modules.gpg.list_keys")
    mock_list.return_value = getattr(request, "param", keys_list)
    with patch.dict(gpg.__salt__, {"gpg.list_keys": mock_list}):
        yield mock_list
@pytest.fixture
def gpg_trust(request):
    """Patch gpg.trust_key; an indirect param overrides the default result."""
    default_ret = {"res": True, "message": "Setting ownership trust to Marginally"}
    mock_trust = Mock(spec="salt.modules.gpg.trust_key")
    mock_trust.return_value = getattr(request, "param", default_ret)
    with patch.dict(gpg.__salt__, {"gpg.trust_key": mock_trust}):
        yield mock_trust
@pytest.fixture()
def gpg_receive(request):
    """Patch gpg.receive_keys; an indirect param overrides the default result."""
    default_ret = {"res": True, "message": ["Key new added to keychain"]}
    mock_recv = Mock(spec="salt.modules.gpg.receive_keys")
    mock_recv.return_value = getattr(request, "param", default_ret)
    with patch.dict(gpg.__salt__, {"gpg.receive_keys": mock_recv}):
        yield mock_recv
@pytest.mark.usefixtures("gpg_list_keys")
@pytest.mark.parametrize(
    "gpg_trust,expected",
    [
        ({"res": True, "message": "Setting ownership trust to Marginally"}, True),
        ({"res": False, "message": "KeyID A not in GPG keychain"}, False),
    ],
    indirect=["gpg_trust"],
)
def test_gpg_present_trust_change(gpg_receive, gpg_trust, expected):
    """
    Changing trust of a present key mirrors the module outcome in
    result/changes and never fetches the key again.
    """
    res = gpg.present("A", trust="marginally")
    assert res["result"] is expected
    assert bool(res["changes"]) is expected
    gpg_trust.assert_called_once()
    gpg_receive.assert_not_called()
@pytest.mark.usefixtures("gpg_list_keys")
@pytest.mark.parametrize(
    "gpg_receive,expected",
    [
        ({"res": True, "message": ["Key new added to keychain"]}, True),
        (
            {
                "res": False,
                "message": [
                    "Something went wrong during gpg call: gpg: key new: no user ID"
                ],
            },
            False,
        ),
    ],
    indirect=["gpg_receive"],
)
def test_gpg_present_new_key(gpg_receive, gpg_trust, expected):
    """
    A missing key is fetched; result/changes mirror the module outcome and
    no trust change is attempted when none was requested.
    """
    res = gpg.present("new")
    assert res["result"] is expected
    assert bool(res["changes"]) is expected
    gpg_receive.assert_called_once()
    gpg_trust.assert_not_called()
@pytest.mark.usefixtures("gpg_list_keys")
@pytest.mark.parametrize(
    "gpg_trust,expected",
    [
        ({"res": True, "message": "Setting ownership trust to Marginally"}, True),
        ({"res": False, "message": "KeyID A not in GPG keychain"}, False),
    ],
    indirect=["gpg_trust"],
)
def test_gpg_present_new_key_and_trust(gpg_receive, gpg_trust, expected):
    """
    Adding a missing key with a requested trust level receives the key and
    then applies the trust change; the overall result follows the trust
    call's outcome.

    The ``usefixtures("gpg_list_keys")`` marker was previously applied
    twice (above and below the parametrize decorator); the redundant
    duplicate has been dropped — a single application is sufficient.
    """
    ret = gpg.present("new", trust="marginally")
    assert ret["result"] is expected
    # the key is always marked as added
    assert ret["changes"]
    gpg_receive.assert_called_once()
    gpg_trust.assert_called_once()
@pytest.mark.usefixtures("gpg_list_keys")
@pytest.mark.parametrize("key,trust", [("new", None), ("A", "marginally")])
def test_gpg_present_test_mode_no_changes(gpg_receive, gpg_trust, key, trust):
    """
    In test mode no module functions may be invoked; the state returns a
    None result with the pending changes listed.
    """
    with patch.dict(gpg.__opts__, {"test": True}):
        res = gpg.present(key, trust=trust)
    gpg_receive.assert_not_called()
    gpg_trust.assert_not_called()
    assert res["result"] is None
    assert res["changes"]

View file

@ -37,6 +37,7 @@ def test_saltutil_sync_all_nochange():
"matchers": [],
"serializers": [],
"wrapper": [],
"tops": [],
}
state_id = "somename"
state_result = {
@ -73,6 +74,7 @@ def test_saltutil_sync_all_test():
"matchers": [],
"serializers": [],
"wrapper": [],
"tops": [],
}
state_id = "somename"
state_result = {
@ -110,6 +112,7 @@ def test_saltutil_sync_all_change():
"matchers": [],
"serializers": [],
"wrapper": [],
"tops": [],
}
state_id = "somename"
state_result = {

View file

@ -1,757 +0,0 @@
"""
:codeauthor: Rahul Handay <rahulha@saltstack.com>
"""
import os
import pytest
import salt.modules.systemd_service as systemd
import salt.utils.systemd
from salt.exceptions import CommandExecutionError
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.mock import MagicMock, patch
from tests.support.unit import TestCase
_SYSTEMCTL_STATUS = {
"sshd.service": {
"stdout": """\
* sshd.service - OpenSSH Daemon
Loaded: loaded (/usr/lib/systemd/system/sshd.service; disabled; vendor preset: disabled)
Active: inactive (dead)""",
"stderr": "",
"retcode": 3,
"pid": 12345,
},
"foo.service": {
"stdout": """\
* foo.service
Loaded: not-found (Reason: No such file or directory)
Active: inactive (dead)""",
"stderr": "",
"retcode": 3,
"pid": 12345,
},
}
# This reflects systemd >= 231 behavior
# (unknown units report retcode 4 and a "could not be found" message)
_SYSTEMCTL_STATUS_GTE_231 = {
    "bar.service": {
        "stdout": "Unit bar.service could not be found.",
        "stderr": "",
        "retcode": 4,
        "pid": 12345,
    },
}
_LIST_UNIT_FILES = """\
service1.service enabled -
service2.service disabled -
service3.service static -
timer1.timer enabled -
timer2.timer disabled -
timer3.timer static -
service4.service enabled enabled
service5.service disabled enabled
service6.service static enabled
timer4.timer enabled enabled
timer5.timer disabled enabled
timer6.timer static enabled
service7.service enabled disabled
service8.service disabled disabled
service9.service static disabled
timer7.timer enabled disabled
timer8.timer disabled disabled
timer9.timer static disabled
service10.service enabled
service11.service disabled
service12.service static
timer10.timer enabled
timer11.timer disabled
timer12.timer static"""
class SystemdTestCase(TestCase, LoaderModuleMockMixin):
"""
Test case for salt.modules.systemd
"""
def setup_loader_modules(self):
return {systemd: {}}
def test_systemctl_reload(self):
"""
Test to Reloads systemctl
"""
mock = MagicMock(
side_effect=[
{"stdout": "Who knows why?", "stderr": "", "retcode": 1, "pid": 12345},
{"stdout": "", "stderr": "", "retcode": 0, "pid": 54321},
]
)
with patch.dict(systemd.__salt__, {"cmd.run_all": mock}):
self.assertRaisesRegex(
CommandExecutionError,
"Problem performing systemctl daemon-reload: Who knows why?",
systemd.systemctl_reload,
)
self.assertTrue(systemd.systemctl_reload())
def test_get_enabled(self):
"""
Test to return a list of all enabled services
"""
cmd_mock = MagicMock(return_value=_LIST_UNIT_FILES)
listdir_mock = MagicMock(return_value=["foo", "bar", "baz", "README"])
sd_mock = MagicMock(
return_value={x.replace(".service", "") for x in _SYSTEMCTL_STATUS}
)
access_mock = MagicMock(
side_effect=lambda x, y: x
!= os.path.join(systemd.INITSCRIPT_PATH, "README")
)
sysv_enabled_mock = MagicMock(side_effect=lambda x, _: x == "baz")
with patch.dict(systemd.__salt__, {"cmd.run": cmd_mock}):
with patch.object(os, "listdir", listdir_mock):
with patch.object(systemd, "_get_systemd_services", sd_mock):
with patch.object(os, "access", side_effect=access_mock):
with patch.object(systemd, "_sysv_enabled", sysv_enabled_mock):
self.assertListEqual(
systemd.get_enabled(),
[
"baz",
"service1",
"service10",
"service4",
"service7",
"timer1.timer",
"timer10.timer",
"timer4.timer",
"timer7.timer",
],
)
def test_get_disabled(self):
"""
Test to return a list of all disabled services
"""
cmd_mock = MagicMock(return_value=_LIST_UNIT_FILES)
# 'foo' should collide with the systemd services (as returned by
# sd_mock) and thus not be returned by _get_sysv_services(). It doesn't
# matter that it's not part of the _LIST_UNIT_FILES output, we just
# want to ensure that 'foo' isn't identified as a disabled initscript
# even though below we are mocking it to show as not enabled (since
# only 'baz' will be considered an enabled sysv service).
listdir_mock = MagicMock(return_value=["foo", "bar", "baz", "README"])
sd_mock = MagicMock(
return_value={x.replace(".service", "") for x in _SYSTEMCTL_STATUS}
)
access_mock = MagicMock(
side_effect=lambda x, y: x
!= os.path.join(systemd.INITSCRIPT_PATH, "README")
)
sysv_enabled_mock = MagicMock(side_effect=lambda x, _: x == "baz")
with patch.dict(systemd.__salt__, {"cmd.run": cmd_mock}):
with patch.object(os, "listdir", listdir_mock):
with patch.object(systemd, "_get_systemd_services", sd_mock):
with patch.object(os, "access", side_effect=access_mock):
with patch.object(systemd, "_sysv_enabled", sysv_enabled_mock):
self.assertListEqual(
systemd.get_disabled(),
[
"bar",
"service11",
"service2",
"service5",
"service8",
"timer11.timer",
"timer2.timer",
"timer5.timer",
"timer8.timer",
],
)
def test_get_static(self):
    """
    Test to return a list of all static services
    """
    cmd_mock = MagicMock(return_value=_LIST_UNIT_FILES)
    # 'foo' should collide with the systemd services (as returned by
    # sd_mock) and thus not be returned by _get_sysv_services(). It doesn't
    # matter that it's not part of the _LIST_UNIT_FILES output, we just
    # want to ensure that 'foo' isn't misidentified as an initscript
    # even though below we are mocking it to show as not enabled (since
    # only 'baz' will be considered an enabled sysv service).
    listdir_mock = MagicMock(return_value=["foo", "bar", "baz", "README"])
    sd_mock = MagicMock(
        return_value={x.replace(".service", "") for x in _SYSTEMCTL_STATUS}
    )
    # os.access returns True for everything except the README stub in the
    # initscript directory, so 'README' is never treated as a service.
    access_mock = MagicMock(
        side_effect=lambda x, y: x
        != os.path.join(systemd.INITSCRIPT_PATH, "README")
    )
    sysv_enabled_mock = MagicMock(side_effect=lambda x, _: x == "baz")
    with patch.dict(systemd.__salt__, {"cmd.run": cmd_mock}):
        with patch.object(os, "listdir", listdir_mock):
            with patch.object(systemd, "_get_systemd_services", sd_mock):
                with patch.object(os, "access", side_effect=access_mock):
                    with patch.object(systemd, "_sysv_enabled", sysv_enabled_mock):
                        # Only unit files marked "static" in the mocked
                        # list-unit-files output should be returned;
                        # sysv scripts never appear here.
                        self.assertListEqual(
                            systemd.get_static(),
                            [
                                "service12",
                                "service3",
                                "service6",
                                "service9",
                                "timer12.timer",
                                "timer3.timer",
                                "timer6.timer",
                                "timer9.timer",
                            ],
                        )
def test_get_all(self):
    """
    Test to return a list of all available services
    """
    # Successive os.listdir calls walk the systemd unit directories and
    # finally the initscript directory; README must be filtered out.
    listdir_mock = MagicMock(
        side_effect=[
            ["foo.service", "multi-user.target.wants", "mytimer.timer"],
            [],
            ["foo.service", "multi-user.target.wants", "bar.service"],
            ["mysql", "nginx", "README"],
        ]
    )
    readme_path = os.path.join(systemd.INITSCRIPT_PATH, "README")
    access_mock = MagicMock(side_effect=lambda path, mode: path != readme_path)
    expected = ["bar", "foo", "mysql", "mytimer.timer", "nginx"]
    with patch.object(os, "listdir", listdir_mock), patch.object(
        os, "access", side_effect=access_mock
    ):
        self.assertListEqual(systemd.get_all(), expected)
def test_available(self):
    """
    Test to check that the given service is available
    """
    status_mock = MagicMock(side_effect=lambda unit: _SYSTEMCTL_STATUS[unit])
    offline_mock = MagicMock(return_value=False)
    # systemd < 231: legacy `systemctl status` output/retcode semantics
    with patch.dict(systemd.__context__, {"salt.utils.systemd.version": 230}):
        with patch.object(
            systemd, "_systemctl_status", status_mock
        ), patch.object(systemd, "offline", offline_mock):
            self.assertTrue(systemd.available("sshd.service"))
            self.assertFalse(systemd.available("foo.service"))
    # systemd >= 231: newer output/retcode semantics
    with patch.dict(systemd.__context__, {"salt.utils.systemd.version": 231}):
        with patch.dict(_SYSTEMCTL_STATUS, _SYSTEMCTL_STATUS_GTE_231):
            with patch.object(
                systemd, "_systemctl_status", status_mock
            ), patch.object(systemd, "offline", offline_mock):
                self.assertTrue(systemd.available("sshd.service"))
                self.assertFalse(systemd.available("bar.service"))
    # systemd < 231 but with the >= 231 behavior backported (e.g. RHEL 7.3)
    with patch.dict(systemd.__context__, {"salt.utils.systemd.version": 219}):
        with patch.dict(_SYSTEMCTL_STATUS, _SYSTEMCTL_STATUS_GTE_231):
            with patch.object(
                systemd, "_systemctl_status", status_mock
            ), patch.object(systemd, "offline", offline_mock):
                self.assertTrue(systemd.available("sshd.service"))
                self.assertFalse(systemd.available("bar.service"))
def test_missing(self):
    """
    Test to the inverse of service.available.
    """
    status_mock = MagicMock(side_effect=lambda unit: _SYSTEMCTL_STATUS[unit])
    offline_mock = MagicMock(return_value=False)
    # (reported systemd version, extra status entries, unit expected missing)
    scenarios = [
        # systemd < 231
        (230, {}, "foo.service"),
        # systemd >= 231
        (231, _SYSTEMCTL_STATUS_GTE_231, "bar.service"),
        # systemd < 231 with retcode/output changes backported (e.g. RHEL 7.3)
        (219, _SYSTEMCTL_STATUS_GTE_231, "bar.service"),
    ]
    for version, extra_status, missing_unit in scenarios:
        with patch.dict(
            systemd.__context__, {"salt.utils.systemd.version": version}
        ):
            with patch.dict(_SYSTEMCTL_STATUS, extra_status):
                with patch.object(
                    systemd, "_systemctl_status", status_mock
                ), patch.object(systemd, "offline", offline_mock):
                    self.assertFalse(systemd.missing("sshd.service"))
                    self.assertTrue(systemd.missing(missing_unit))
def test_show(self):
    """
    Test to show properties of one or more units/jobs or the manager
    """
    # Raw `systemctl show` output: simple pairs, a nested "{ ... }" value,
    # and a space-separated list property.
    show_output = "a=b\nc=d\ne={ f=g ; h=i }\nWants=foo.service bar.service\n"
    run_mock = MagicMock(return_value=show_output)
    expected = {
        "a": "b",
        "c": "d",
        "e": {"f": "g", "h": "i"},
        "Wants": ["foo.service", "bar.service"],
    }
    with patch.dict(systemd.__salt__, {"cmd.run": run_mock}):
        self.assertDictEqual(systemd.show("sshd"), expected)
def test_execs(self):
    """
    Test to return a list of all files specified as ``ExecStart`` for all
    services
    """
    get_all_mock = MagicMock(return_value=["a", "b"])
    # Every service reports the same ExecStart path "c".
    show_mock = MagicMock(return_value={"ExecStart": {"path": "c"}})
    with patch.object(systemd, "get_all", get_all_mock), patch.object(
        systemd, "show", show_mock
    ):
        self.assertDictEqual(systemd.execs(), {"a": "c", "b": "c"})
class SystemdScopeTestCase(TestCase, LoaderModuleMockMixin):
    """
    Test case for salt.modules.systemd, for functions which use systemd
    scopes
    """

    def setup_loader_modules(self):
        """Provide an empty loader setup; each test patches what it needs."""
        return {systemd: {}}

    # Shared fixtures for the scope tests below. NOTE(review): these are
    # class-level MagicMocks, so call counts accumulate across tests; the
    # assertions below only use assert_called_with (last call), which is
    # unaffected.
    unit_name = "foo"
    mock_none = MagicMock(return_value=None)
    mock_success = MagicMock(return_value=0)
    mock_failure = MagicMock(return_value=1)
    mock_true = MagicMock(return_value=True)
    mock_false = MagicMock(return_value=False)
    mock_empty_list = MagicMock(return_value=[])
    # Canned cmd.run_all results for a successful / failed systemctl call.
    mock_run_all_success = MagicMock(
        return_value={"retcode": 0, "stdout": "", "stderr": "", "pid": 12345}
    )
    mock_run_all_failure = MagicMock(
        return_value={"retcode": 1, "stdout": "", "stderr": "", "pid": 12345}
    )

    def _change_state(self, action, no_block=False):
        """
        Common code for start/stop/restart/reload/force_reload tests

        Exercises the given state-changing function under four conditions:
        scope available+enabled, scope available+disabled, and scope
        unavailable with the config both enabled and disabled, asserting
        the exact systemctl command line in each case.
        """
        # We want the traceback if the function name can't be found in the
        # systemd execution module.
        func = getattr(systemd, action)
        # Remove trailing _ in "reload_"
        action = action.rstrip("_").replace("_", "-")
        systemctl_command = ["/bin/systemctl"]
        if no_block:
            systemctl_command.append("--no-block")
        systemctl_command.extend([action, self.unit_name + ".service"])
        scope_prefix = ["/bin/systemd-run", "--scope"]
        assert_kwargs = {"python_shell": False}
        if action in ("enable", "disable"):
            # enable/disable inspect the retcode themselves.
            assert_kwargs["ignore_retcode"] = True
        with patch("salt.utils.path.which", lambda x: "/bin/" + x):
            with patch.object(systemd, "_check_for_unit_changes", self.mock_none):
                with patch.object(systemd, "_unit_file_changed", self.mock_none):
                    with patch.object(systemd, "_check_unmask", self.mock_none):
                        with patch.object(
                            systemd, "_get_sysv_services", self.mock_empty_list
                        ):
                            # Has scopes available
                            with patch.object(
                                salt.utils.systemd, "has_scope", self.mock_true
                            ):
                                # Scope enabled, successful
                                with patch.dict(
                                    systemd.__salt__,
                                    {
                                        "config.get": self.mock_true,
                                        "cmd.run_all": self.mock_run_all_success,
                                    },
                                ):
                                    ret = func(self.unit_name, no_block=no_block)
                                    self.assertTrue(ret)
                                    self.mock_run_all_success.assert_called_with(
                                        scope_prefix + systemctl_command,
                                        **assert_kwargs
                                    )
                                # Scope enabled, failed
                                with patch.dict(
                                    systemd.__salt__,
                                    {
                                        "config.get": self.mock_true,
                                        "cmd.run_all": self.mock_run_all_failure,
                                    },
                                ):
                                    if action in ("stop", "disable"):
                                        # stop/disable report failure via the
                                        # return value rather than raising.
                                        ret = func(self.unit_name, no_block=no_block)
                                        self.assertFalse(ret)
                                    else:
                                        self.assertRaises(
                                            CommandExecutionError,
                                            func,
                                            self.unit_name,
                                            no_block=no_block,
                                        )
                                    self.mock_run_all_failure.assert_called_with(
                                        scope_prefix + systemctl_command,
                                        **assert_kwargs
                                    )
                                # Scope disabled, successful
                                with patch.dict(
                                    systemd.__salt__,
                                    {
                                        "config.get": self.mock_false,
                                        "cmd.run_all": self.mock_run_all_success,
                                    },
                                ):
                                    ret = func(self.unit_name, no_block=no_block)
                                    self.assertTrue(ret)
                                    self.mock_run_all_success.assert_called_with(
                                        systemctl_command, **assert_kwargs
                                    )
                                # Scope disabled, failed
                                with patch.dict(
                                    systemd.__salt__,
                                    {
                                        "config.get": self.mock_false,
                                        "cmd.run_all": self.mock_run_all_failure,
                                    },
                                ):
                                    if action in ("stop", "disable"):
                                        ret = func(self.unit_name, no_block=no_block)
                                        self.assertFalse(ret)
                                    else:
                                        self.assertRaises(
                                            CommandExecutionError,
                                            func,
                                            self.unit_name,
                                            no_block=no_block,
                                        )
                                    self.mock_run_all_failure.assert_called_with(
                                        systemctl_command, **assert_kwargs
                                    )
                            # Does not have scopes available
                            with patch.object(
                                salt.utils.systemd, "has_scope", self.mock_false
                            ):
                                # The results should be the same irrespective of
                                # whether or not scope is enabled, since scope is not
                                # available, so we repeat the below tests with it both
                                # enabled and disabled.
                                for scope_mock in (self.mock_true, self.mock_false):
                                    # Successful
                                    with patch.dict(
                                        systemd.__salt__,
                                        {
                                            "config.get": scope_mock,
                                            "cmd.run_all": self.mock_run_all_success,
                                        },
                                    ):
                                        ret = func(self.unit_name, no_block=no_block)
                                        self.assertTrue(ret)
                                        self.mock_run_all_success.assert_called_with(
                                            systemctl_command, **assert_kwargs
                                        )
                                    # Failed
                                    with patch.dict(
                                        systemd.__salt__,
                                        {
                                            "config.get": scope_mock,
                                            "cmd.run_all": self.mock_run_all_failure,
                                        },
                                    ):
                                        if action in ("stop", "disable"):
                                            ret = func(
                                                self.unit_name, no_block=no_block
                                            )
                                            self.assertFalse(ret)
                                        else:
                                            self.assertRaises(
                                                CommandExecutionError,
                                                func,
                                                self.unit_name,
                                                no_block=no_block,
                                            )
                                        self.mock_run_all_failure.assert_called_with(
                                            systemctl_command, **assert_kwargs
                                        )

    def _mask_unmask(self, action, runtime):
        """
        Common code for mask/unmask tests

        Mirrors _change_state's scope matrix for the mask/unmask_
        functions, additionally checking that unmask_ is a no-op when the
        unit is not masked.
        """
        # We want the traceback if the function name can't be found in the
        # systemd execution module, so don't provide a fallback value for the
        # call to getattr() here.
        func = getattr(systemd, action)
        # Remove trailing _ in "unmask_"
        action = action.rstrip("_").replace("_", "-")
        systemctl_command = ["/bin/systemctl", action]
        if runtime:
            systemctl_command.append("--runtime")
        systemctl_command.append(self.unit_name + ".service")
        scope_prefix = ["/bin/systemd-run", "--scope"]
        args = [self.unit_name, runtime]
        # unmask_ only acts on masked units; mask only on unmasked ones.
        masked_mock = self.mock_true if action == "unmask" else self.mock_false
        with patch("salt.utils.path.which", lambda x: "/bin/" + x):
            with patch.object(systemd, "_check_for_unit_changes", self.mock_none):
                if action == "unmask":
                    mock_not_run = MagicMock(
                        return_value={
                            "retcode": 0,
                            "stdout": "",
                            "stderr": "",
                            "pid": 12345,
                        }
                    )
                    with patch.dict(systemd.__salt__, {"cmd.run_all": mock_not_run}):
                        with patch.object(systemd, "masked", self.mock_false):
                            # Test not masked (should take no action and return True)
                            self.assertTrue(systemd.unmask_(self.unit_name))
                            # Also should not have called cmd.run_all
                            self.assertTrue(mock_not_run.call_count == 0)
                with patch.object(systemd, "masked", masked_mock):
                    # Has scopes available
                    with patch.object(salt.utils.systemd, "has_scope", self.mock_true):
                        # Scope enabled, successful
                        with patch.dict(
                            systemd.__salt__,
                            {
                                "config.get": self.mock_true,
                                "cmd.run_all": self.mock_run_all_success,
                            },
                        ):
                            ret = func(*args)
                            self.assertTrue(ret)
                            self.mock_run_all_success.assert_called_with(
                                scope_prefix + systemctl_command,
                                python_shell=False,
                                redirect_stderr=True,
                            )
                        # Scope enabled, failed
                        with patch.dict(
                            systemd.__salt__,
                            {
                                "config.get": self.mock_true,
                                "cmd.run_all": self.mock_run_all_failure,
                            },
                        ):
                            self.assertRaises(CommandExecutionError, func, *args)
                            self.mock_run_all_failure.assert_called_with(
                                scope_prefix + systemctl_command,
                                python_shell=False,
                                redirect_stderr=True,
                            )
                        # Scope disabled, successful
                        with patch.dict(
                            systemd.__salt__,
                            {
                                "config.get": self.mock_false,
                                "cmd.run_all": self.mock_run_all_success,
                            },
                        ):
                            ret = func(*args)
                            self.assertTrue(ret)
                            self.mock_run_all_success.assert_called_with(
                                systemctl_command,
                                python_shell=False,
                                redirect_stderr=True,
                            )
                        # Scope disabled, failed
                        with patch.dict(
                            systemd.__salt__,
                            {
                                "config.get": self.mock_false,
                                "cmd.run_all": self.mock_run_all_failure,
                            },
                        ):
                            self.assertRaises(CommandExecutionError, func, *args)
                            self.mock_run_all_failure.assert_called_with(
                                systemctl_command,
                                python_shell=False,
                                redirect_stderr=True,
                            )
                    # Does not have scopes available
                    with patch.object(salt.utils.systemd, "has_scope", self.mock_false):
                        # The results should be the same irrespective of
                        # whether or not scope is enabled, since scope is not
                        # available, so we repeat the below tests with it both
                        # enabled and disabled.
                        for scope_mock in (self.mock_true, self.mock_false):
                            # Successful
                            with patch.dict(
                                systemd.__salt__,
                                {
                                    "config.get": scope_mock,
                                    "cmd.run_all": self.mock_run_all_success,
                                },
                            ):
                                ret = func(*args)
                                self.assertTrue(ret)
                                self.mock_run_all_success.assert_called_with(
                                    systemctl_command,
                                    python_shell=False,
                                    redirect_stderr=True,
                                )
                            # Failed
                            with patch.dict(
                                systemd.__salt__,
                                {
                                    "config.get": scope_mock,
                                    "cmd.run_all": self.mock_run_all_failure,
                                },
                            ):
                                self.assertRaises(CommandExecutionError, func, *args)
                                self.mock_run_all_failure.assert_called_with(
                                    systemctl_command,
                                    python_shell=False,
                                    redirect_stderr=True,
                                )

    def test_start(self):
        """Test service.start with and without --no-block."""
        self._change_state("start", no_block=False)
        self._change_state("start", no_block=True)

    def test_stop(self):
        """Test service.stop with and without --no-block."""
        self._change_state("stop", no_block=False)
        self._change_state("stop", no_block=True)

    def test_restart(self):
        """Test service.restart with and without --no-block."""
        self._change_state("restart", no_block=False)
        self._change_state("restart", no_block=True)

    def test_reload(self):
        """Test service.reload_ with and without --no-block."""
        self._change_state("reload_", no_block=False)
        self._change_state("reload_", no_block=True)

    def test_force_reload(self):
        """Test service.force_reload with and without --no-block."""
        self._change_state("force_reload", no_block=False)
        self._change_state("force_reload", no_block=True)

    def test_enable(self):
        """Test service.enable with and without --no-block."""
        self._change_state("enable", no_block=False)
        self._change_state("enable", no_block=True)

    def test_disable(self):
        """Test service.disable with and without --no-block."""
        self._change_state("disable", no_block=False)
        self._change_state("disable", no_block=True)

    def test_mask(self):
        """Test service.mask without --runtime."""
        self._mask_unmask("mask", False)

    def test_mask_runtime(self):
        """Test service.mask with --runtime."""
        self._mask_unmask("mask", True)

    def test_unmask(self):
        """Test service.unmask_ without --runtime."""
        self._mask_unmask("unmask_", False)

    def test_unmask_runtime(self):
        """Test service.unmask_ with --runtime."""
        self._mask_unmask("unmask_", True)

    def test_firstboot(self):
        """
        Test service.firstboot without parameters
        """
        result = {"retcode": 0, "stdout": "stdout"}
        salt_mock = {
            "cmd.run_all": MagicMock(return_value=result),
        }
        with patch("salt.utils.path.which", lambda x: "/bin/" + x):
            with patch.dict(systemd.__salt__, salt_mock):
                assert systemd.firstboot()
                # No parameters: the bare binary is invoked.
                salt_mock["cmd.run_all"].assert_called_with(["/bin/systemd-firstboot"])

    def test_firstboot_params(self):
        """
        Test service.firstboot with parameters
        """
        result = {"retcode": 0, "stdout": "stdout"}
        salt_mock = {
            "cmd.run_all": MagicMock(return_value=result),
        }
        with patch("salt.utils.path.which", lambda x: "/bin/" + x):
            with patch.dict(systemd.__salt__, salt_mock):
                assert systemd.firstboot(
                    locale="en_US.UTF-8",
                    locale_message="en_US.UTF-8",
                    keymap="jp",
                    timezone="Europe/Berlin",
                    hostname="node-001",
                    machine_id="1234567890abcdef",
                    root="/mnt",
                )
                # Each keyword argument maps to its systemd-firstboot flag.
                salt_mock["cmd.run_all"].assert_called_with(
                    [
                        "/bin/systemd-firstboot",
                        "--locale",
                        "en_US.UTF-8",
                        "--locale-message",
                        "en_US.UTF-8",
                        "--keymap",
                        "jp",
                        "--timezone",
                        "Europe/Berlin",
                        "--hostname",
                        "node-001",
                        "--machine-ID",
                        "1234567890abcdef",
                        "--root",
                        "/mnt",
                    ]
                )

    def test_firstboot_error(self):
        """
        Test service.firstboot error
        """
        # Non-zero retcode from systemd-firstboot must surface as
        # CommandExecutionError.
        result = {"retcode": 1, "stderr": "error"}
        salt_mock = {
            "cmd.run_all": MagicMock(return_value=result),
        }
        with patch.dict(systemd.__salt__, salt_mock):
            with pytest.raises(CommandExecutionError):
                assert systemd.firstboot()

View file

@ -379,6 +379,24 @@ class PipStateTest(TestCase, SaltReturnAssertsMixin, LoaderModuleMockMixin):
self.assertSaltTrueReturn({"test": ret})
self.assertInSaltComment("successfully installed", {"test": ret})
def test_install_with_specified_user(self):
    """
    Check that if `user` parameter is set and the user does not exists
    it will fail with an error, see #65458
    """
    # user.info returning an empty dict means the user lookup failed.
    salt_mocks = {
        "user.info": MagicMock(return_value={}),
        "pip.version": MagicMock(return_value="10.0.1"),
    }
    with patch.dict(pip_state.__salt__, salt_mocks):
        ret = pip_state.installed("mypkg", user="fred")
        self.assertSaltFalseReturn({"test": ret})
        self.assertInSaltComment("User fred does not exist", {"test": ret})
class PipStateUtilsTest(TestCase):
def test_has_internal_exceptions_mod_function(self):
@ -414,7 +432,7 @@ class PipStateInstallationErrorTest(TestCase):
extra_requirements = []
for name, version in salt.version.dependency_information():
if name in ["PyYAML", "packaging", "looseversion"]:
extra_requirements.append("{}=={}".format(name, version))
extra_requirements.append(f"{name}=={version}")
failures = {}
pip_version_requirements = [
# Latest pip 18
@ -453,7 +471,7 @@ class PipStateInstallationErrorTest(TestCase):
with VirtualEnv() as venv:
venv.install(*extra_requirements)
if requirement:
venv.install("pip{}".format(requirement))
venv.install(f"pip{requirement}")
try:
subprocess.check_output([venv.venv_python, "-c", code])
except subprocess.CalledProcessError as exc: