Make sure all local clients are "destroyed" after being used

This also ensures that all `salt.utils.event.SaltEvent` instances are
properly terminated.
Pedro Algarvio 2020-12-23 08:11:24 +00:00
parent d3d7bc5606
commit 2fe1c5858f
35 changed files with 1228 additions and 1139 deletions
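The pattern applied in every file below is the same: a bare `salt.client.get_local_client()` / `salt.client.LocalClient()` assignment becomes a `with` block so that `destroy()` runs on every code path. A minimal before/after sketch of the idiom (simplified for illustration; not the full client API):

import salt.client

# Before: the client's SaltEvent (an IPC socket plus subscriber threads)
# was only released whenever the garbage collector got around to __del__.
local = salt.client.get_local_client()
try:
    local.cmd("*", "test.ping")
finally:
    local.destroy()  # the cleanup this commit makes explicit

# After: __enter__/__exit__ guarantee destroy() on every code path.
with salt.client.get_local_client() as local:
    local.cmd("*", "test.ping")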


@@ -310,3 +310,4 @@ class Batch:
active.remove(minion)
if bwait:
wait.append(datetime.now() + timedelta(seconds=bwait))
self.local.destroy()


@@ -143,7 +143,6 @@ class SaltCP:
Make the salt client call in old-style all-in-one call method
"""
arg = [self._load_files(), self.opts["dest"]]
local = salt.client.get_local_client(self.opts["conf_file"])
args = [
self.opts["tgt"],
"cp.recv",
@@ -155,7 +154,8 @@ class SaltCP:
if selected_target_option is not None:
args.append(selected_target_option)
return local.cmd(*args)
with salt.client.get_local_client(self.opts["conf_file"]) as local:
return local.cmd(*args)
def run_chunked(self):
"""
@@ -183,8 +183,6 @@ class SaltCP:
)
minions = _res["minions"]
local = salt.client.get_local_client(self.opts["conf_file"])
def _get_remote_path(fn_):
if fn_ in self.opts["src"]:
# This was a filename explicitly passed on the CLI
@@ -205,79 +203,81 @@ class SaltCP:
        ret = {}
        parent = ".." + os.sep
-        for fn_, mode in files.items():
-            remote_path = _get_remote_path(fn_)
-            index = 1
-            failed = {}
-            for chunk in reader(fn_, chunk_size=self.opts["salt_cp_chunk_size"]):
-                chunk = base64.b64encode(salt.utils.stringutils.to_bytes(chunk))
-                append = index > 1
-                log.debug(
-                    "Copying %s to %starget '%s' as %s%s",
-                    fn_,
-                    "{} ".format(selected_target_option)
-                    if selected_target_option
-                    else "",
-                    tgt,
-                    remote_path,
-                    " (chunk #{})".format(index) if append else "",
-                )
-                args = [
-                    tgt,
-                    "cp.recv_chunked",
-                    [remote_path, chunk, append, gzip, mode],
-                    timeout,
-                ]
-                if selected_target_option is not None:
-                    args.append(selected_target_option)
-                result = local.cmd(*args)
-                if not result:
-                    # Publish failed
-                    msg = (
-                        "Publish failed.{} It may be necessary to "
-                        "decrease salt_cp_chunk_size (current value: "
-                        "{})".format(
-                            " File partially transferred." if index > 1 else "",
-                            self.opts["salt_cp_chunk_size"],
-                        )
-                    )
-                    for minion in minions:
-                        ret.setdefault(minion, {})[remote_path] = msg
-                    break
-                for minion_id, minion_ret in result.items():
-                    ret.setdefault(minion_id, {})[remote_path] = minion_ret
-                    # Catch first error message for a given minion, we will
-                    # rewrite the results after we're done iterating through
-                    # the chunks.
-                    if minion_ret is not True and minion_id not in failed:
-                        failed[minion_id] = minion_ret
-                index += 1
-            for minion_id, msg in failed.items():
-                ret[minion_id][remote_path] = msg
-        for dirname in empty_dirs:
-            remote_path = _get_remote_path(dirname)
-            log.debug(
-                "Creating empty dir %s on %starget '%s'",
-                dirname,
-                "{} ".format(
-                    selected_target_option
-                )  # pylint: disable=str-format-in-logging
-                if selected_target_option
-                else "",
-                tgt,
-            )
-            args = [tgt, "cp.recv_chunked", [remote_path, None], timeout]
-            if selected_target_option is not None:
-                args.append(selected_target_option)
-            for minion_id, minion_ret in local.cmd(*args).items():
-                ret.setdefault(minion_id, {})[remote_path] = minion_ret
-        return ret
+        with salt.client.get_local_client(self.opts["conf_file"]) as local:
+            for fn_, mode in files.items():
+                remote_path = _get_remote_path(fn_)
+                index = 1
+                failed = {}
+                for chunk in reader(fn_, chunk_size=self.opts["salt_cp_chunk_size"]):
+                    chunk = base64.b64encode(salt.utils.stringutils.to_bytes(chunk))
+                    append = index > 1
+                    log.debug(
+                        "Copying %s to %starget '%s' as %s%s",
+                        fn_,
+                        "{} ".format(selected_target_option)
+                        if selected_target_option
+                        else "",
+                        tgt,
+                        remote_path,
+                        " (chunk #{})".format(index) if append else "",
+                    )
+                    args = [
+                        tgt,
+                        "cp.recv_chunked",
+                        [remote_path, chunk, append, gzip, mode],
+                        timeout,
+                    ]
+                    if selected_target_option is not None:
+                        args.append(selected_target_option)
+                    result = local.cmd(*args)
+                    if not result:
+                        # Publish failed
+                        msg = (
+                            "Publish failed.{} It may be necessary to "
+                            "decrease salt_cp_chunk_size (current value: "
+                            "{})".format(
+                                " File partially transferred." if index > 1 else "",
+                                self.opts["salt_cp_chunk_size"],
+                            )
+                        )
+                        for minion in minions:
+                            ret.setdefault(minion, {})[remote_path] = msg
+                        break
+                    for minion_id, minion_ret in result.items():
+                        ret.setdefault(minion_id, {})[remote_path] = minion_ret
+                        # Catch first error message for a given minion, we will
+                        # rewrite the results after we're done iterating through
+                        # the chunks.
+                        if minion_ret is not True and minion_id not in failed:
+                            failed[minion_id] = minion_ret
+                    index += 1
+                for minion_id, msg in failed.items():
+                    ret[minion_id][remote_path] = msg
+            for dirname in empty_dirs:
+                remote_path = _get_remote_path(dirname)
+                log.debug(
+                    "Creating empty dir %s on %starget '%s'",
+                    dirname,
+                    "{} ".format(
+                        selected_target_option
+                    )  # pylint: disable=str-format-in-logging
+                    if selected_target_option
+                    else "",
+                    tgt,
+                )
+                args = [tgt, "cp.recv_chunked", [remote_path, None], timeout]
+                if selected_target_option is not None:
+                    args.append(selected_target_option)
+                for minion_id, minion_ret in local.cmd(*args).items():
+                    ret.setdefault(minion_id, {})[remote_path] = minion_ret
+            return ret


@@ -104,7 +104,10 @@ class SaltCMD(salt.utils.parsers.SaltCMDOptionParser):
"\nNOTICE: Too many minions targeted, switching to batch execution."
)
self.options.batch = self.options.batch_safe_size
self._run_batch()
try:
self._run_batch()
finally:
self.local_client.destroy()
return
if getattr(self.options, "return"):
@@ -229,6 +232,8 @@ class SaltCMD(salt.utils.parsers.SaltCMDOptionParser):
) as exc:
ret = str(exc)
self._output_ret(ret, "", retcode=1)
finally:
self.local_client.destroy()
def _preview_target(self):
"""


@@ -9,11 +9,6 @@ The data structure needs to be:
"""
# pylint: disable=import-error
# Try to import range from https://github.com/ytoolshed/range
#
import logging
# The components here are simple, and they need to be and stay simple, we
@@ -33,9 +28,7 @@ from datetime import datetime
import salt.cache
import salt.config
import salt.defaults.exitcodes
# Import tornado
import salt.ext.tornado.gen # pylint: disable=F0401
import salt.ext.tornado.gen
import salt.loader
import salt.payload
import salt.syspaths as syspaths
@@ -59,7 +52,6 @@ from salt.exceptions import (
SaltInvocationError,
SaltReqTimeoutError,
)
from salt.ext import six
HAS_RANGE = False
try:
@@ -68,7 +60,6 @@ try:
HAS_RANGE = True
except ImportError:
pass
# pylint: enable=import-error
log = logging.getLogger(__name__)
@@ -1991,9 +1982,7 @@ class LocalClient:
# This IS really necessary!
# When running tests, if self.events is not destroyed, we leak 2
# threads per test case which uses self.client
if hasattr(self, "event"):
# The call below will take care of calling 'self.event.destroy()'
del self.event
self.destroy()
# pylint: enable=W1701
@@ -2002,6 +1991,17 @@ class LocalClient:
self.event.unsubscribe("syndic/.*/{}".format(job_id), "regex")
self.event.unsubscribe("salt/job/{}".format(job_id))
def destroy(self):
if self.event is not None:
self.event.destroy()
self.event = None
def __enter__(self):
return self
def __exit__(self, *args):
self.destroy()
class FunctionWrapper(dict):
"""


@@ -188,3 +188,20 @@ class SSHClient:
return self.cmd_iter(
f_tgt, fun, arg, timeout, tgt_type="list", ret=ret, kwarg=kwarg, **kwargs
)
def destroy(self):
"""
API compatibility method with salt.client.LocalClient
"""
def __enter__(self):
"""
API compatibility method with salt.client.LocalClient
"""
return self
def __exit__(self, *args):
"""
API compatibility method with salt.client.LocalClient
"""
self.destroy()
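The SSHClient methods above are intentionally empty: salt-ssh holds no event bus or IPC sockets to release, but implementing the same `destroy()`/`__enter__`/`__exit__` protocol lets callers treat both client types uniformly. A hedged sketch of the calling pattern this enables (`run_on_minions` is an illustrative helper, not part of Salt):

import salt.client
import salt.client.ssh.client

def run_on_minions(tgt, fun, use_ssh=False):
    # Either client type now supports the context-manager protocol,
    # so cleanup no longer depends on which transport was picked.
    if use_ssh:
        factory = salt.client.ssh.client.SSHClient
    else:
        factory = salt.client.get_local_client
    with factory() as client:
        return client.cmd(tgt, fun)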


@@ -1250,19 +1250,18 @@ class Cloud:
                salt.config.master_config(os.path.join(conf_path, "master"))
            )
-            client = salt.client.get_local_client(mopts=mopts_)
-            ret = client.cmd(
-                vm_["name"],
-                "saltutil.sync_{}".format(self.opts["sync_after_install"]),
-                timeout=self.opts["timeout"],
-            )
-            if ret:
-                log.info(
-                    "Synchronized the following dynamic modules: "
-                    " {}".format(ret)
-                )
-                break
+            with salt.client.get_local_client(mopts=mopts_) as client:
+                ret = client.cmd(
+                    vm_["name"],
+                    "saltutil.sync_{}".format(self.opts["sync_after_install"]),
+                    timeout=self.opts["timeout"],
+                )
+                if ret:
+                    log.info(
+                        "Synchronized the following dynamic modules: "
+                        " {}".format(ret)
+                    )
+                    break
except KeyError as exc:
log.exception(
"Failed to create VM %s. Configuration value %s needs " "to be set",
@@ -1277,12 +1276,12 @@ class Cloud:
opt_map = False
if self.opts["parallel"] and self.opts["start_action"] and not opt_map:
log.info("Running %s on %s", self.opts["start_action"], vm_["name"])
client = salt.client.get_local_client(mopts=self.opts)
action_out = client.cmd(
vm_["name"],
self.opts["start_action"],
timeout=self.opts["timeout"] * 60,
)
with salt.client.get_local_client(mopts=self.opts) as client:
action_out = client.cmd(
vm_["name"],
self.opts["start_action"],
timeout=self.opts["timeout"] * 60,
)
output["ret"] = action_out
return output
@@ -2242,15 +2241,15 @@ class Map(Cloud):
log.info(
"Running %s on %s", self.opts["start_action"], ", ".join(group)
)
client = salt.client.get_local_client()
out.update(
client.cmd(
",".join(group),
self.opts["start_action"],
timeout=self.opts["timeout"] * 60,
tgt_type="list",
with salt.client.get_local_client() as client:
out.update(
client.cmd(
",".join(group),
self.opts["start_action"],
timeout=self.opts["timeout"] * 60,
tgt_type="list",
)
)
)
for obj in output_multip:
next(iter(obj.values()))["ret"] = out[next(iter(obj.keys()))]
output.update(obj)


@@ -137,95 +137,95 @@ def _salt(fun, *args, **kw):
        skwargs = ""
    cache_key = (laps, target, fun, sargs, skw, skwargs)
    if not cache or (cache and (cache_key not in __CACHED_CALLS)):
-        conn = _client()
-        runner = _runner()
-        rkwargs = kwargs.copy()
-        rkwargs["timeout"] = timeout
-        rkwargs.setdefault("tgt_type", "list")
-        kwargs.setdefault("tgt_type", "list")
-        ping_retries = 0
-        # the target(s) have about one minute to respond
-        # we issue up to 60 ping requests; this prevents us
-        # from blindly sending commands to unmatched minions
-        ping_max_retries = 60
-        ping = True
-        # do not check ping... if we are pinging
-        if fun == "test.ping":
-            ping_retries = ping_max_retries + 1
-        # be sure that the executors are alive
-        while ping_retries <= ping_max_retries:
-            try:
-                if ping_retries > 0:
-                    time.sleep(1)
-                pings = conn.cmd(tgt=target, timeout=10, fun="test.ping")
-                values = list(pings.values())
-                if not values:
-                    ping = False
-                for v in values:
-                    if v is not True:
-                        ping = False
-                if not ping:
-                    raise ValueError("Unreachable")
-                break
-            except Exception:  # pylint: disable=broad-except
-                ping = False
-                ping_retries += 1
-                log.error("%s unreachable, retrying", target)
-        if not ping:
-            raise SaltCloudSystemExit("Target {} unreachable".format(target))
-        jid = conn.cmd_async(tgt=target, fun=fun, arg=args, kwarg=kw, **rkwargs)
-        cret = conn.cmd(
-            tgt=target, fun="saltutil.find_job", arg=[jid], timeout=10, **kwargs
-        )
-        running = bool(cret.get(target, False))
-        endto = time.time() + timeout
-        while running:
-            rkwargs = {
-                "tgt": target,
-                "fun": "saltutil.find_job",
-                "arg": [jid],
-                "timeout": 10,
-            }
-            cret = conn.cmd(**rkwargs)
-            running = bool(cret.get(target, False))
-            if not running:
-                break
-            if running and (time.time() > endto):
-                raise Exception(
-                    "Timeout {}s for {} is elapsed".format(
-                        timeout, pprint.pformat(rkwargs)
-                    )
-                )
-            time.sleep(poll)
-        # timeout for the master to return data about a specific job
-        wait_for_res = float({"test.ping": "5"}.get(fun, "120"))
-        while wait_for_res:
-            wait_for_res -= 0.5
-            cret = runner.cmd("jobs.lookup_jid", [jid, {"__kwarg__": True}])
-            if target in cret:
-                ret = cret[target]
-                break
-            # recent changes
-            elif "data" in cret and "outputter" in cret:
-                ret = cret["data"]
-                break
-            # special case, some answers may be crafted
-            # to handle the unresponsiveness of a specific command
-            # which is also meaningful, e.g. a minion not yet provisioned
-            if fun in ["test.ping"] and not wait_for_res:
-                ret = {"test.ping": False}.get(fun, False)
-            time.sleep(0.5)
-        try:
-            if "is not available." in ret:
-                raise SaltCloudSystemExit(
-                    "module/function {} is not available".format(fun)
-                )
-        except SaltCloudSystemExit:  # pylint: disable=try-except-raise
-            raise
-        except TypeError:
-            pass
-        if cache:
-            __CACHED_CALLS[cache_key] = ret
+        with _client() as conn:
+            runner = _runner()
+            rkwargs = kwargs.copy()
+            rkwargs["timeout"] = timeout
+            rkwargs.setdefault("tgt_type", "list")
+            kwargs.setdefault("tgt_type", "list")
+            ping_retries = 0
+            # the target(s) have about one minute to respond
+            # we issue up to 60 ping requests; this prevents us
+            # from blindly sending commands to unmatched minions
+            ping_max_retries = 60
+            ping = True
+            # do not check ping... if we are pinging
+            if fun == "test.ping":
+                ping_retries = ping_max_retries + 1
+            # be sure that the executors are alive
+            while ping_retries <= ping_max_retries:
+                try:
+                    if ping_retries > 0:
+                        time.sleep(1)
+                    pings = conn.cmd(tgt=target, timeout=10, fun="test.ping")
+                    values = list(pings.values())
+                    if not values:
+                        ping = False
+                    for v in values:
+                        if v is not True:
+                            ping = False
+                    if not ping:
+                        raise ValueError("Unreachable")
+                    break
+                except Exception:  # pylint: disable=broad-except
+                    ping = False
+                    ping_retries += 1
+                    log.error("%s unreachable, retrying", target)
+            if not ping:
+                raise SaltCloudSystemExit("Target {} unreachable".format(target))
+            jid = conn.cmd_async(tgt=target, fun=fun, arg=args, kwarg=kw, **rkwargs)
+            cret = conn.cmd(
+                tgt=target, fun="saltutil.find_job", arg=[jid], timeout=10, **kwargs
+            )
+            running = bool(cret.get(target, False))
+            endto = time.time() + timeout
+            while running:
+                rkwargs = {
+                    "tgt": target,
+                    "fun": "saltutil.find_job",
+                    "arg": [jid],
+                    "timeout": 10,
+                }
+                cret = conn.cmd(**rkwargs)
+                running = bool(cret.get(target, False))
+                if not running:
+                    break
+                if running and (time.time() > endto):
+                    raise Exception(
+                        "Timeout {}s for {} is elapsed".format(
+                            timeout, pprint.pformat(rkwargs)
+                        )
+                    )
+                time.sleep(poll)
+            # timeout for the master to return data about a specific job
+            wait_for_res = float({"test.ping": "5"}.get(fun, "120"))
+            while wait_for_res:
+                wait_for_res -= 0.5
+                cret = runner.cmd("jobs.lookup_jid", [jid, {"__kwarg__": True}])
+                if target in cret:
+                    ret = cret[target]
+                    break
+                # recent changes
+                elif "data" in cret and "outputter" in cret:
+                    ret = cret["data"]
+                    break
+                # special case, some answers may be crafted
+                # to handle the unresponsiveness of a specific command
+                # which is also meaningful, e.g. a minion not yet provisioned
+                if fun in ["test.ping"] and not wait_for_res:
+                    ret = {"test.ping": False}.get(fun, False)
+                time.sleep(0.5)
+            try:
+                if "is not available." in ret:
+                    raise SaltCloudSystemExit(
+                        "module/function {} is not available".format(fun)
+                    )
+            except SaltCloudSystemExit:  # pylint: disable=try-except-raise
+                raise
+            except TypeError:
+                pass
+            if cache:
+                __CACHED_CALLS[cache_key] = ret
    elif cache and cache_key in __CACHED_CALLS:
        ret = __CACHED_CALLS[cache_key]
    return ret


@@ -197,8 +197,10 @@ def _list_nodes_full(call=None):
"""
List the nodes, ask all 'saltify' minions, return dict of grains.
"""
local = salt.client.LocalClient()
return local.cmd("salt-cloud:driver:saltify", "grains.items", "", tgt_type="grain")
with salt.client.LocalClient() as local:
return local.cmd(
"salt-cloud:driver:saltify", "grains.items", "", tgt_type="grain"
)
def list_nodes_select(call=None):
@@ -215,10 +217,10 @@ def show_instance(name, call=None):
"""
List a single node, return dict of grains.
"""
local = salt.client.LocalClient()
ret = local.cmd(name, "grains.items")
ret.update(_build_required_items(ret))
return ret
with salt.client.LocalClient() as local:
ret = local.cmd(name, "grains.items")
ret.update(_build_required_items(ret))
return ret
def create(vm_):
@@ -273,31 +275,33 @@ def create(vm_):
        )
        if wol_mac and wol_host:
            good_ping = False
-            local = salt.client.LocalClient()
            ssh_host = config.get_cloud_config_value(
                "ssh_host", vm_, __opts__, default=""
            )
-            if ssh_host:
-                log.info("trying to ping %s", ssh_host)
-                count = "n" if salt.utils.platform.is_windows() else "c"
-                cmd = "ping -{} 1 {}".format(count, ssh_host)
-                good_ping = local.cmd(wol_host, "cmd.retcode", [cmd]) == 0
-            if good_ping:
-                log.info("successful ping.")
-            else:
-                log.info("sending wake-on-lan to %s using node %s", wol_mac, wol_host)
-                if isinstance(wol_mac, str):
-                    wol_mac = [wol_mac]  # a smart user may have passed more params
-                ret = local.cmd(wol_host, "network.wol", wol_mac)
-                log.info("network.wol returned value %s", ret)
-                if ret and ret[wol_host]:
-                    sleep_time = config.get_cloud_config_value(
-                        "wol_boot_wait", vm_, __opts__, default=30
-                    )
-                    if sleep_time > 0.0:
-                        log.info("delaying %d seconds for boot", sleep_time)
-                        time.sleep(sleep_time)
+            with salt.client.LocalClient() as local:
+                if ssh_host:
+                    log.info("trying to ping %s", ssh_host)
+                    count = "n" if salt.utils.platform.is_windows() else "c"
+                    cmd = "ping -{} 1 {}".format(count, ssh_host)
+                    good_ping = local.cmd(wol_host, "cmd.retcode", [cmd]) == 0
+                if good_ping:
+                    log.info("successful ping.")
+                else:
+                    log.info(
+                        "sending wake-on-lan to %s using node %s", wol_mac, wol_host
+                    )
+                    if isinstance(wol_mac, str):
+                        wol_mac = [wol_mac]  # a smart user may have passed more params
+                    ret = local.cmd(wol_host, "network.wol", wol_mac)
+                    log.info("network.wol returned value %s", ret)
+                    if ret and ret[wol_host]:
+                        sleep_time = config.get_cloud_config_value(
+                            "wol_boot_wait", vm_, __opts__, default=30
+                        )
+                        if sleep_time > 0.0:
+                            log.info("delaying %d seconds for boot", sleep_time)
+                            time.sleep(sleep_time)
log.info("Provisioning existing machine %s", vm_["name"])
ret = __utils__["cloud.bootstrap"](vm_, __opts__)
else:
@@ -460,39 +464,41 @@ def destroy(name, call=None):
    )
    vm_ = get_configured_provider()
-    local = salt.client.LocalClient()
-    my_info = local.cmd(name, "grains.get", ["salt-cloud"])
-    try:
-        vm_.update(my_info[name])  # get profile name to get config value
-    except (IndexError, TypeError):
-        pass
-    if config.get_cloud_config_value(
-        "remove_config_on_destroy", vm_, opts, default=True
-    ):
-        ret = local.cmd(
-            name,  # prevent generating new keys on restart
-            "service.disable",
-            ["salt-minion"],
-        )
-        if ret and ret[name]:
-            log.info("disabled salt-minion service on %s", name)
-        ret = local.cmd(name, "config.get", ["conf_file"])
-        if ret and ret[name]:
-            confile = ret[name]
-            ret = local.cmd(name, "file.remove", [confile])
-            if ret and ret[name]:
-                log.info("removed minion %s configuration file %s", name, confile)
-        ret = local.cmd(name, "config.get", ["pki_dir"])
-        if ret and ret[name]:
-            pki_dir = ret[name]
-            ret = local.cmd(name, "file.remove", [pki_dir])
-            if ret and ret[name]:
-                log.info("removed minion %s key files in %s", name, pki_dir)
-    if config.get_cloud_config_value("shutdown_on_destroy", vm_, opts, default=False):
-        ret = local.cmd(name, "system.shutdown")
-        if ret and ret[name]:
-            log.info("system.shutdown for minion %s successful", name)
+    with salt.client.LocalClient() as local:
+        my_info = local.cmd(name, "grains.get", ["salt-cloud"])
+        try:
+            vm_.update(my_info[name])  # get profile name to get config value
+        except (IndexError, TypeError):
+            pass
+        if config.get_cloud_config_value(
+            "remove_config_on_destroy", vm_, opts, default=True
+        ):
+            ret = local.cmd(
+                name,  # prevent generating new keys on restart
+                "service.disable",
+                ["salt-minion"],
+            )
+            if ret and ret[name]:
+                log.info("disabled salt-minion service on %s", name)
+            ret = local.cmd(name, "config.get", ["conf_file"])
+            if ret and ret[name]:
+                confile = ret[name]
+                ret = local.cmd(name, "file.remove", [confile])
+                if ret and ret[name]:
+                    log.info("removed minion %s configuration file %s", name, confile)
+            ret = local.cmd(name, "config.get", ["pki_dir"])
+            if ret and ret[name]:
+                pki_dir = ret[name]
+                ret = local.cmd(name, "file.remove", [pki_dir])
+                if ret and ret[name]:
+                    log.info("removed minion %s key files in %s", name, pki_dir)
+        if config.get_cloud_config_value(
+            "shutdown_on_destroy", vm_, opts, default=False
+        ):
+            ret = local.cmd(name, "system.shutdown")
+            if ret and ret[name]:
+                log.info("system.shutdown for minion %s successful", name)
__utils__["cloud.fire_event"](
"event",
@@ -527,5 +533,5 @@ def reboot(name, call=None):
"The reboot action must be called with -a or --action."
)
local = salt.client.LocalClient()
return local.cmd(name, "system.reboot")
with salt.client.LocalClient() as local:
return local.cmd(name, "system.reboot")


@@ -156,9 +156,10 @@ def _list_nodes(call=None):
"""
List the nodes, ask all 'vagrant' minions, return dict of grains.
"""
local = salt.client.LocalClient()
ret = local.cmd("salt-cloud:driver:vagrant", "grains.items", "", tgt_type="grain")
return ret
with salt.client.LocalClient() as local:
return local.cmd(
"salt-cloud:driver:vagrant", "grains.items", "", tgt_type="grain"
)
def list_nodes_select(call=None):
@@ -175,16 +176,16 @@ def show_instance(name, call=None):
"""
List a single node, return dict of grains.
"""
local = salt.client.LocalClient()
ret = local.cmd(name, "grains.items", "")
reqs = _build_required_items(ret)
ret[name].update(reqs[name])
return ret
with salt.client.LocalClient() as local:
ret = local.cmd(name, "grains.items", "")
reqs = _build_required_items(ret)
ret[name].update(reqs[name])
return ret
def _get_my_info(name):
local = salt.client.LocalClient()
return local.cmd(name, "grains.get", ["salt-cloud"])
with salt.client.LocalClient() as local:
return local.cmd(name, "grains.get", ["salt-cloud"])
def create(vm_):
@@ -216,20 +217,20 @@ def create(vm_):
    log.info("sending 'vagrant.init %s machine=%s' command to %s", name, machine, host)
-    local = salt.client.LocalClient()
-    ret = local.cmd(host, "vagrant.init", [name], kwarg={"vm": vm_, "start": True})
-    log.info("response ==> %s", ret[host])
-    network_mask = config.get_cloud_config_value(
-        "network_mask", vm_, __opts__, default=""
-    )
-    if "ssh_host" not in vm_:
-        ret = local.cmd(
-            host,
-            "vagrant.get_ssh_config",
-            [name],
-            kwarg={"network_mask": network_mask, "get_private_key": True},
-        )[host]
+    with salt.client.LocalClient() as local:
+        ret = local.cmd(host, "vagrant.init", [name], kwarg={"vm": vm_, "start": True})
+        log.info("response ==> %s", ret[host])
+        network_mask = config.get_cloud_config_value(
+            "network_mask", vm_, __opts__, default=""
+        )
+        if "ssh_host" not in vm_:
+            ret = local.cmd(
+                host,
+                "vagrant.get_ssh_config",
+                [name],
+                kwarg={"network_mask": network_mask, "get_private_key": True},
+            )[host]
with tempfile.NamedTemporaryFile() as pks:
if "private_key" not in vm_ and ret and ret.get("private_key", False):
pks.write(ret["private_key"])
@@ -300,8 +301,8 @@ def destroy(name, call=None):
profile_name = my_info[name]["profile"]
profile = opts["profiles"][profile_name]
host = profile["host"]
local = salt.client.LocalClient()
ret = local.cmd(host, "vagrant.destroy", [name])
with salt.client.LocalClient() as local:
ret = local.cmd(host, "vagrant.destroy", [name])
if ret[host]:
__utils__["cloud.fire_event"](
@@ -347,5 +348,5 @@ def reboot(name, call=None):
profile_name = my_info[name]["profile"]
profile = __opts__["profiles"][profile_name]
host = profile["host"]
local = salt.client.LocalClient()
return local.cmd(host, "vagrant.reboot", [name])
with salt.client.LocalClient() as local:
return local.cmd(host, "vagrant.reboot", [name])


@@ -324,8 +324,8 @@ def destroy(name, conn=None, call=None):
mopts_ = salt.config.DEFAULT_MINION_OPTS
conf_path = "/".join(__opts__["conf_file"].split("/")[:-1])
mopts_.update(salt.config.minion_config(os.path.join(conf_path, "minion")))
client = salt.client.get_local_client(mopts_)
minions = client.cmd(name, "mine.flush")
with salt.client.get_local_client(mopts=mopts_) as client:
minions = client.cmd(name, "mine.flush")
log.info("Clearing Salt Mine: %s, %s", name, flush_mine_on_destroy)
log.info("Destroying VM: %s", name)


@@ -1047,6 +1047,9 @@ class RemoteFuncs:
if self.event is not None:
self.event.destroy()
self.event = None
if self.local is not None:
self.local.destroy()
self.local = None
class LocalFuncs:
@@ -1423,3 +1426,6 @@ class LocalFuncs:
if self.event is not None:
self.event.destroy()
self.event = None
if self.local is not None:
self.local.destroy()
self.local = None


@@ -102,9 +102,9 @@ class Listener:
def start(hosts, channels, tag=None):
if tag is None:
tag = "salt/engine/redis_sentinel"
local = salt.client.LocalClient()
ips = local.cmd(
hosts["matching"], "network.ip_addrs", [hosts["interface"]]
).values()
with salt.client.LocalClient() as local:
ips = local.cmd(
hosts["matching"], "network.ip_addrs", [hosts["interface"]]
).values()
client = Listener(host=ips.pop()[0], port=hosts["port"], channels=channels, tag=tag)
client.run()


@@ -895,15 +895,15 @@ class SlackClient:
# Default to trying to run as a client module.
else:
local = salt.client.LocalClient()
log.debug(
"Command %s will run via local.cmd_async, targeting %s", cmd, target
)
log.debug("Running %s, %s, %s, %s, %s", target, cmd, args, kwargs, tgt_type)
# according to https://github.com/saltstack/salt-api/issues/164, tgt_type has changed to expr_form
job_id = local.cmd_async(
str(target), cmd, arg=args, kwarg=kwargs, tgt_type=str(tgt_type),
)
with salt.client.LocalClient() as local:
job_id = local.cmd_async(
str(target), cmd, arg=args, kwarg=kwargs, tgt_type=str(tgt_type),
)
log.info("ret from local.cmd_async is %s", job_id)
return job_id


@@ -688,31 +688,33 @@ class Key:
            matches = match_dict
        else:
            matches = {}
-        for status, keys in matches.items():
-            for key in keys:
-                try:
-                    if revoke_auth:
-                        if self.opts.get("rotate_aes_key") is False:
-                            print(
-                                "Immediate auth revocation specified but AES key rotation not allowed. "
-                                "Minion will not be disconnected until the master AES key is rotated."
-                            )
-                        else:
-                            try:
-                                client = salt.client.get_local_client(mopts=self.opts)
-                                client.cmd_async(key, "saltutil.revoke_auth")
-                            except salt.exceptions.SaltClientError:
-                                print(
-                                    "Cannot contact Salt master. "
-                                    "Connection for {} will remain up until "
-                                    "master AES key is rotated or auth is revoked "
-                                    "with 'saltutil.revoke_auth'.".format(key)
-                                )
-                    os.remove(os.path.join(self.opts["pki_dir"], status, key))
-                    eload = {"result": True, "act": "delete", "id": key}
-                    self.event.fire_event(eload, salt.utils.event.tagify(prefix="key"))
-                except OSError:
-                    pass
+        with salt.client.get_local_client(mopts=self.opts) as client:
+            for status, keys in matches.items():
+                for key in keys:
+                    try:
+                        if revoke_auth:
+                            if self.opts.get("rotate_aes_key") is False:
+                                print(
+                                    "Immediate auth revocation specified but AES key rotation not allowed. "
+                                    "Minion will not be disconnected until the master AES key is rotated."
+                                )
+                            else:
+                                try:
+                                    client.cmd_async(key, "saltutil.revoke_auth")
+                                except salt.exceptions.SaltClientError:
+                                    print(
+                                        "Cannot contact Salt master. "
+                                        "Connection for {} will remain up until "
+                                        "master AES key is rotated or auth is revoked "
+                                        "with 'saltutil.revoke_auth'.".format(key)
+                                    )
+                        os.remove(os.path.join(self.opts["pki_dir"], status, key))
+                        eload = {"result": True, "act": "delete", "id": key}
+                        self.event.fire_event(
+                            eload, salt.utils.event.tagify(prefix="key")
+                        )
+                    except OSError:
+                        pass
        if self.opts.get("preserve_minions") is True:
            self.check_minion_cache(preserve_minions=matches.get("minions", []))
        else:


@@ -1964,6 +1964,9 @@ class AESFuncs(TransportMethods):
def destroy(self):
self.masterapi.destroy()
if self.local is not None:
self.local.destroy()
self.local = None
class ClearFuncs(TransportMethods):
@@ -2510,3 +2513,6 @@ class ClearFuncs(TransportMethods):
if self.masterapi is not None:
self.masterapi.destroy()
self.masterapi = None
if self.local is not None:
self.local.destroy()
self.local = None


@@ -27,11 +27,9 @@ import salt.crypt
import salt.defaults.events
import salt.defaults.exitcodes
import salt.engines
# pylint: enable=no-name-in-module,redefined-builtin
import salt.ext.tornado
import salt.ext.tornado.gen # pylint: disable=F0401
import salt.ext.tornado.ioloop # pylint: disable=F0401
import salt.ext.tornado.gen
import salt.ext.tornado.ioloop
import salt.loader
import salt.log.setup
import salt.payload
@@ -71,8 +69,6 @@ from salt.exceptions import (
SaltReqTimeoutError,
SaltSystemExit,
)
# pylint: disable=import-error,no-name-in-module,redefined-builtin
from salt.template import SLS_ENCODING
from salt.utils.ctx import RequestContext
from salt.utils.debug import enable_sigusr1_handler
@@ -1036,6 +1032,8 @@ class MinionManager(MinionBase):
self.io_loop.spawn_callback(
self.process_manager.run, **{"asynchronous": True}
) # Tornado backward compat
self.event_publisher = None
self.event = None
# pylint: disable=W1701
def __del__(self):
@@ -1193,12 +1191,22 @@ class MinionManager(MinionBase):
# kill any remaining processes
minion.process_manager.kill_children()
minion.destroy()
self.event.destroy()
if self.event_publisher is not None:
self.event_publisher.close()
self.event_publisher = None
if self.event is not None:
self.event.destroy()
self.event = None
def destroy(self):
for minion in self.minions:
minion.destroy()
self.event.destroy()
if self.event_publisher is not None:
self.event_publisher.close()
self.event_publisher = None
if self.event is not None:
self.event.destroy()
self.event = None
class Minion(MinionBase):
@@ -3134,6 +3142,8 @@ class Syndic(Minion):
"""
def __init__(self, opts, **kwargs):
self.local = None
self.forward_events = None
self._syndic_interface = opts.get("interface")
self._syndic = True
# force auth_safemode True because Syndic don't support autorestart
@@ -3256,11 +3266,13 @@ class Syndic(Minion):
# We borrowed the local clients poller so give it back before
# it's destroyed. Reset the local poller reference.
super().destroy()
if hasattr(self, "local"):
del self.local
if self.local is not None:
self.local.destroy()
self.local = None
if hasattr(self, "forward_events"):
if self.forward_events is not None:
self.forward_events.stop()
self.forward_events = None
# TODO: need a way of knowing if the syndic connection is busted
@@ -3291,6 +3303,7 @@ class SyndicManager(MinionBase):
def __init__(self, opts, io_loop=None):
opts["loop_interval"] = 1
super().__init__(opts)
self._closing = False
self.mminion = salt.minion.MasterMinion(opts)
# sync (old behavior), cluster (only returns and publishes)
self.syndic_mode = self.opts.get("syndic_mode", "sync")
@@ -3603,6 +3616,13 @@ class SyndicManager(MinionBase):
if res:
del self.job_rets[master]
def destroy(self):
if self._closing is True:
return
self._closing = True
if self.local is not None:
self.local.destroy()
class ProxyMinionManager(MinionManager):
"""


@@ -62,10 +62,12 @@ def set_option(file_name, sections=None, separator="="):
.. code-block:: python
import salt
sc = salt.client.get_local_client()
sc.cmd('target', 'ini.set_option',
['path_to_ini_file', '{"section_to_change": {"key": "value"}}'])
import salt.client
with salt.client.get_local_client() as sc:
sc.cmd(
'target', 'ini.set_option',
['path_to_ini_file', '{"section_to_change": {"key": "value"}}']
)
CLI Example:
@@ -90,10 +92,10 @@ def get_option(file_name, section, option, separator="="):
.. code-block:: python
import salt
sc = salt.client.get_local_client()
sc.cmd('target', 'ini.get_option',
[path_to_ini_file, section_name, option])
import salt.client
with salt.client.get_local_client() as sc:
sc.cmd('target', 'ini.get_option',
[path_to_ini_file, section_name, option])
CLI Example:
@@ -149,10 +151,10 @@ def get_section(file_name, section, separator="="):
.. code-block:: python
import salt
sc = salt.client.get_local_client()
sc.cmd('target', 'ini.get_section',
[path_to_ini_file, section_name])
import salt.client
with salt.client.get_local_client() as sc:
sc.cmd('target', 'ini.get_section',
[path_to_ini_file, section_name])
CLI Example:
@@ -177,10 +179,10 @@ def remove_section(file_name, section, separator="="):
.. code-block:: python
import salt
sc = salt.client.get_local_client()
sc.cmd('target', 'ini.remove_section',
[path_to_ini_file, section_name])
import salt.client
with salt.client.get_local_client() as sc:
sc.cmd('target', 'ini.remove_section',
[path_to_ini_file, section_name])
CLI Example:
@@ -207,10 +209,10 @@ def get_ini(file_name, separator="="):
.. code-block:: python
import salt
sc = salt.client.get_local_client()
sc.cmd('target', 'ini.get_ini',
[path_to_ini_file])
import salt.client
with salt.client.get_local_client() as sc:
sc.cmd('target', 'ini.get_ini',
[path_to_ini_file])
CLI Example:


@@ -1644,8 +1644,8 @@ def cmd(
salt '*' saltutil.cmd
"""
cfgfile = __opts__["conf_file"]
client = _get_ssh_or_api_client(cfgfile, ssh)
fcn_ret = _exec(client, tgt, fun, arg, timeout, tgt_type, ret, kwarg, **kwargs)
with _get_ssh_or_api_client(cfgfile, ssh) as client:
fcn_ret = _exec(client, tgt, fun, arg, timeout, tgt_type, ret, kwarg, **kwargs)
# if return is empty, we may have not used the right conf,
# try with the 'minion relative master configuration counter part
# if available
@@ -1655,8 +1655,10 @@ def cmd(
and cfgfile.endswith("{}{}".format(os.path.sep, "minion"))
and os.path.exists(master_cfgfile)
):
client = _get_ssh_or_api_client(master_cfgfile, ssh)
fcn_ret = _exec(client, tgt, fun, arg, timeout, tgt_type, ret, kwarg, **kwargs)
with _get_ssh_or_api_client(master_cfgfile, ssh) as client:
fcn_ret = _exec(
client, tgt, fun, arg, timeout, tgt_type, ret, kwarg, **kwargs
)
return fcn_ret


@@ -150,8 +150,8 @@ class NetapiClient:
:return: job ID
"""
local = salt.client.get_local_client(mopts=self.opts)
return local.run_job(*args, **kwargs)
with salt.client.get_local_client(mopts=self.opts) as client:
return client.run_job(*args, **kwargs)
def local(self, *args, **kwargs):
"""
@@ -167,8 +167,8 @@ class NetapiClient:
:return: Returns the result from the execution module
"""
local = salt.client.get_local_client(mopts=self.opts)
return local.cmd(*args, **kwargs)
with salt.client.get_local_client(mopts=self.opts) as client:
return client.cmd(*args, **kwargs)
def local_subset(self, *args, **kwargs):
"""
@@ -178,8 +178,8 @@ class NetapiClient:
Wraps :py:meth:`salt.client.LocalClient.cmd_subset`
"""
local = salt.client.get_local_client(mopts=self.opts)
return local.cmd_subset(*args, **kwargs)
with salt.client.get_local_client(mopts=self.opts) as client:
return client.cmd_subset(*args, **kwargs)
def local_batch(self, *args, **kwargs):
"""
@@ -192,8 +192,8 @@ class NetapiClient:
:return: Returns the result from the execution module for each batch of
returns
"""
local = salt.client.get_local_client(mopts=self.opts)
return local.cmd_batch(*args, **kwargs)
with salt.client.get_local_client(mopts=self.opts) as client:
return client.cmd_batch(*args, **kwargs)
def ssh(self, *args, **kwargs):
"""
@@ -203,10 +203,10 @@ class NetapiClient:
:return: Returns the result from the salt-ssh command
"""
ssh_client = salt.client.ssh.client.SSHClient(
with salt.client.ssh.client.SSHClient(
mopts=self.opts, disable_custom_roster=True
)
return ssh_client.cmd_sync(kwargs)
) as client:
return client.cmd_sync(kwargs)
def runner(self, fun, timeout=None, full_return=False, **kwargs):
"""


@@ -58,18 +58,17 @@ def execution():
        salt-run doc.execution
    """
-    client = salt.client.get_local_client(__opts__["conf_file"])
    docs = {}
-    try:
-        for ret in client.cmd_iter("*", "sys.doc", timeout=__opts__["timeout"]):
-            for v in ret.values():
-                docs.update(v)
-    except SaltClientError as exc:
-        print(exc)
-        return []
-    i = itertools.chain.from_iterable([docs["ret"].items()])
-    ret = dict(list(i))
-    return ret
+    with salt.client.get_local_client(__opts__["conf_file"]) as client:
+        try:
+            for ret in client.cmd_iter("*", "sys.doc", timeout=__opts__["timeout"]):
+                for v in ret.values():
+                    docs.update(v)
+        except SaltClientError as exc:
+            print(exc)
+            return []
+        i = itertools.chain.from_iterable([docs["ret"].items()])
+        ret = dict(list(i))
+        return ret


@@ -51,12 +51,12 @@ def active(display_progress=False):
salt-run jobs.active
"""
ret = {}
client = salt.client.get_local_client(__opts__["conf_file"])
try:
active_ = client.cmd("*", "saltutil.running", timeout=__opts__["timeout"])
except SaltClientError as client_error:
print(client_error)
return ret
with salt.client.get_local_client(__opts__["conf_file"]) as client:
try:
active_ = client.cmd("*", "saltutil.running", timeout=__opts__["timeout"])
except SaltClientError as client_error:
print(client_error)
return ret
if display_progress:
__jid_event__.fire_event(


@@ -39,15 +39,15 @@ def _do(name, fun, path=None):
if not host:
return False
client = salt.client.get_local_client(__opts__["conf_file"])
cmd_ret = client.cmd_iter(
host, "lxc.{}".format(fun), [name], kwarg={"path": path}, timeout=60
)
data = next(cmd_ret)
data = data.get(host, {}).get("ret", None)
if data:
data = {host: data}
return data
with salt.client.get_local_client(__opts__["conf_file"]) as client:
cmd_ret = client.cmd_iter(
host, "lxc.{}".format(fun), [name], kwarg={"path": path}, timeout=60
)
data = next(cmd_ret)
data = data.get(host, {}).get("ret", None)
if data:
data = {host: data}
return data
def _do_names(names, fun, path=None):
@@ -65,25 +65,25 @@ def _do_names(names, fun, path=None):
if not hosts:
return False
client = salt.client.get_local_client(__opts__["conf_file"])
for host, sub_names in hosts.items():
cmds = []
for name in sub_names:
cmds.append(
client.cmd_iter(
host,
"lxc.{}".format(fun),
[name],
kwarg={"path": path},
timeout=60,
with salt.client.get_local_client(__opts__["conf_file"]) as client:
for host, sub_names in hosts.items():
cmds = []
for name in sub_names:
cmds.append(
client.cmd_iter(
host,
"lxc.{}".format(fun),
[name],
kwarg={"path": path},
timeout=60,
)
)
)
for cmd in cmds:
data = next(cmd)
data = data.get(host, {}).get("ret", None)
if data:
ret.update({host: data})
return ret
for cmd in cmds:
data = next(cmd)
data = data.get(host, {}).get("ret", None)
if data:
ret.update({host: data})
return ret
def find_guest(name, quiet=False, path=None):
@@ -244,165 +244,171 @@ def init(names, host=None, saltcloud_mode=False, quiet=False, **kwargs):
        ret["result"] = False
        return ret
    # check that the host is alive
-    client = salt.client.get_local_client(__opts__["conf_file"])
    alive = False
-    try:
-        if client.cmd(host, "test.ping", timeout=20).get(host, None):
-            alive = True
-    except (TypeError, KeyError):
-        pass
-    if not alive:
-        ret["comment"] = "Host {} is not reachable".format(host)
-        ret["result"] = False
-        return ret
-    log.info("Searching for LXC Hosts")
-    data = __salt__["lxc.list"](host, quiet=True, path=path)
-    for host, containers in data.items():
-        for name in names:
-            if name in sum(containers.values(), []):
-                log.info(
-                    "Container '%s' already exists on host '%s', init "
-                    "can be a NO-OP",
-                    name,
-                    host,
-                )
-    if host not in data:
-        ret["comment"] = "Host '{}' was not found".format(host)
-        ret["result"] = False
-        return ret
-    kw = salt.utils.args.clean_kwargs(**kwargs)
-    pub_key = kw.get("pub_key", None)
-    priv_key = kw.get("priv_key", None)
-    explicit_auth = pub_key and priv_key
-    approve_key = kw.get("approve_key", True)
-    seeds = {}
-    seed_arg = kwargs.get("seed", True)
-    if approve_key and not explicit_auth:
-        with salt.key.Key(__opts__) as skey:
-            all_minions = skey.all_keys().get("minions", [])
-            for name in names:
-                seed = seed_arg
-                if name in all_minions:
-                    try:
-                        if client.cmd(name, "test.ping", timeout=20).get(name, None):
-                            seed = False
-                    except (TypeError, KeyError):
-                        pass
-                seeds[name] = seed
-                kv = salt.utils.virt.VirtKey(host, name, __opts__)
-                if kv.authorize():
-                    log.info("Container key will be preauthorized")
-                else:
-                    ret["comment"] = "Container key preauthorization failed"
-                    ret["result"] = False
-                    return ret
-    log.info("Creating container(s) '%s' on host '%s'", names, host)
-    cmds = []
-    for name in names:
-        args = [name]
-        kw = salt.utils.args.clean_kwargs(**kwargs)
-        if saltcloud_mode:
-            kw = copy.deepcopy(kw)
-            kw["name"] = name
-            saved_kwargs = kw
-            kw = client.cmd(
-                host,
-                "lxc.cloud_init_interface",
-                args + [kw],
-                tgt_type="list",
-                timeout=600,
-            ).get(host, {})
-            kw.update(saved_kwargs)
-        name = kw.pop("name", name)
-        # be sure not to seed an already seeded host
-        kw["seed"] = seeds.get(name, seed_arg)
-        if not kw["seed"]:
-            kw.pop("seed_cmd", "")
-        cmds.append(
-            (host, name, client.cmd_iter(host, "lxc.init", args, kwarg=kw, timeout=600))
-        )
-    done = ret.setdefault("done", [])
-    errors = ret.setdefault("errors", _OrderedDict())
-    for ix, acmd in enumerate(cmds):
-        hst, container_name, cmd = acmd
-        containers = ret.setdefault(hst, [])
-        herrs = errors.setdefault(hst, _OrderedDict())
-        serrs = herrs.setdefault(container_name, [])
-        sub_ret = next(cmd)
-        error = None
-        if isinstance(sub_ret, dict) and host in sub_ret:
-            j_ret = sub_ret[hst]
-            container = j_ret.get("ret", {})
-            if container and isinstance(container, dict):
-                if not container.get("result", False):
-                    error = container
-            else:
-                error = "Invalid return for {}: {} {}".format(
-                    container_name, container, sub_ret
-                )
-        else:
-            error = sub_ret
-            if not error:
-                error = "unknown error (no return)"
-        if error:
-            ret["result"] = False
-            serrs.append(error)
-        else:
-            container["container_name"] = name
-            containers.append(container)
-            done.append(container)
-    # mark ping status as True if and only if we have at
-    # least provisioned one container
-    ret["ping_status"] = bool(len(done))
-    # for all provisioned containers, last job is to verify
-    # - the key status
-    # - we can reach them
-    for container in done:
-        # explicitly check and update
-        # the minion key/pair stored on the master
-        container_name = container["container_name"]
-        key = os.path.join(__opts__["pki_dir"], "minions", container_name)
-        if explicit_auth:
-            fcontent = ""
-            if os.path.exists(key):
-                with salt.utils.files.fopen(key) as fic:
-                    fcontent = salt.utils.stringutils.to_unicode(fic.read()).strip()
-            pub_key = salt.utils.stringutils.to_unicode(pub_key)
-            if pub_key.strip() != fcontent:
-                with salt.utils.files.fopen(key, "w") as fic:
-                    fic.write(salt.utils.stringutils.to_str(pub_key))
-                    fic.flush()
-        mid = j_ret.get("mid", None)
-        if not mid:
-            continue
-        def testping(**kw):
-            mid_ = kw["mid"]
-            ping = client.cmd(mid_, "test.ping", timeout=20)
-            time.sleep(1)
-            if ping:
-                return "OK"
-            raise Exception("Unresponsive {}".format(mid_))
-        ping = salt.utils.cloud.wait_for_fun(testping, timeout=21, mid=mid)
-        if ping != "OK":
-            ret["ping_status"] = False
-            ret["result"] = False
-    # if no lxc detected as touched (either inited or verified)
-    # we set the result to False
-    if not done:
-        ret["result"] = False
-    if not quiet:
-        __jid_event__.fire_event({"message": ret}, "progress")
-    return ret
+    with salt.client.get_local_client(__opts__["conf_file"]) as client:
+        try:
+            if client.cmd(host, "test.ping", timeout=20).get(host, None):
+                alive = True
+        except (TypeError, KeyError):
+            pass
+        if not alive:
+            ret["comment"] = "Host {} is not reachable".format(host)
+            ret["result"] = False
+            return ret
+        log.info("Searching for LXC Hosts")
+        data = __salt__["lxc.list"](host, quiet=True, path=path)
+        for host, containers in data.items():
+            for name in names:
+                if name in sum(containers.values(), []):
+                    log.info(
+                        "Container '%s' already exists on host '%s', init "
+                        "can be a NO-OP",
+                        name,
+                        host,
+                    )
+        if host not in data:
+            ret["comment"] = "Host '{}' was not found".format(host)
+            ret["result"] = False
+            return ret
+        kw = salt.utils.args.clean_kwargs(**kwargs)
+        pub_key = kw.get("pub_key", None)
+        priv_key = kw.get("priv_key", None)
+        explicit_auth = pub_key and priv_key
+        approve_key = kw.get("approve_key", True)
+        seeds = {}
+        seed_arg = kwargs.get("seed", True)
+        if approve_key and not explicit_auth:
+            with salt.key.Key(__opts__) as skey:
+                all_minions = skey.all_keys().get("minions", [])
+                for name in names:
+                    seed = seed_arg
+                    if name in all_minions:
+                        try:
+                            if client.cmd(name, "test.ping", timeout=20).get(
+                                name, None
+                            ):
+                                seed = False
+                        except (TypeError, KeyError):
+                            pass
+                    seeds[name] = seed
+                    kv = salt.utils.virt.VirtKey(host, name, __opts__)
+                    if kv.authorize():
+                        log.info("Container key will be preauthorized")
+                    else:
+                        ret["comment"] = "Container key preauthorization failed"
+                        ret["result"] = False
+                        return ret
+        log.info("Creating container(s) '%s' on host '%s'", names, host)
+        cmds = []
+        for name in names:
+            args = [name]
+            kw = salt.utils.args.clean_kwargs(**kwargs)
+            if saltcloud_mode:
+                kw = copy.deepcopy(kw)
+                kw["name"] = name
+                saved_kwargs = kw
+                kw = client.cmd(
+                    host,
+                    "lxc.cloud_init_interface",
+                    args + [kw],
+                    tgt_type="list",
+                    timeout=600,
+                ).get(host, {})
+                kw.update(saved_kwargs)
+            name = kw.pop("name", name)
+            # be sure not to seed an already seeded host
+            kw["seed"] = seeds.get(name, seed_arg)
+            if not kw["seed"]:
+                kw.pop("seed_cmd", "")
+            cmds.append(
+                (
+                    host,
+                    name,
+                    client.cmd_iter(host, "lxc.init", args, kwarg=kw, timeout=600),
+                )
+            )
+        done = ret.setdefault("done", [])
+        errors = ret.setdefault("errors", _OrderedDict())
+        for ix, acmd in enumerate(cmds):
+            hst, container_name, cmd = acmd
+            containers = ret.setdefault(hst, [])
+            herrs = errors.setdefault(hst, _OrderedDict())
+            serrs = herrs.setdefault(container_name, [])
+            sub_ret = next(cmd)
+            error = None
+            if isinstance(sub_ret, dict) and host in sub_ret:
+                j_ret = sub_ret[hst]
+                container = j_ret.get("ret", {})
+                if container and isinstance(container, dict):
+                    if not container.get("result", False):
+                        error = container
+                else:
+                    error = "Invalid return for {}: {} {}".format(
+                        container_name, container, sub_ret
+                    )
+            else:
+                error = sub_ret
+                if not error:
+                    error = "unknown error (no return)"
+            if error:
+                ret["result"] = False
+                serrs.append(error)
+            else:
+                container["container_name"] = name
+                containers.append(container)
+                done.append(container)
+        # mark ping status as True if and only if we have at
+        # least provisioned one container
+        ret["ping_status"] = bool(len(done))
+        # for all provisioned containers, last job is to verify
+        # - the key status
+        # - we can reach them
+        for container in done:
+            # explicitly check and update
+            # the minion key/pair stored on the master
+            container_name = container["container_name"]
+            key = os.path.join(__opts__["pki_dir"], "minions", container_name)
+            if explicit_auth:
+                fcontent = ""
+                if os.path.exists(key):
+                    with salt.utils.files.fopen(key) as fic:
+                        fcontent = salt.utils.stringutils.to_unicode(fic.read()).strip()
+                pub_key = salt.utils.stringutils.to_unicode(pub_key)
+                if pub_key.strip() != fcontent:
+                    with salt.utils.files.fopen(key, "w") as fic:
+                        fic.write(salt.utils.stringutils.to_str(pub_key))
+                        fic.flush()
+            mid = j_ret.get("mid", None)
+            if not mid:
+                continue
+            def testping(**kw):
+                mid_ = kw["mid"]
+                ping = client.cmd(mid_, "test.ping", timeout=20)
+                time.sleep(1)
+                if ping:
+                    return "OK"
+                raise Exception("Unresponsive {}".format(mid_))
+            ping = salt.utils.cloud.wait_for_fun(testping, timeout=21, mid=mid)
+            if ping != "OK":
+                ret["ping_status"] = False
+                ret["result"] = False
+        # if no lxc detected as touched (either inited or verified)
+        # we set the result to False
+        if not done:
+            ret["result"] = False
+        if not quiet:
+            __jid_event__.fire_event({"message": ret}, "progress")
+        return ret
def cloud_init(names, host=None, quiet=False, **kwargs):
@@ -443,24 +449,24 @@ def _list_iter(host=None, path=None):
.. versionadded:: 2015.8.0
"""
tgt = host or "*"
client = salt.client.get_local_client(__opts__["conf_file"])
for container_info in client.cmd_iter(tgt, "lxc.list", kwarg={"path": path}):
if not container_info:
continue
if not isinstance(container_info, dict):
continue
chunk = {}
id_ = next(iter(container_info.keys()))
if host and host != id_:
continue
if not isinstance(container_info[id_], dict):
continue
if "ret" not in container_info[id_]:
continue
if not isinstance(container_info[id_]["ret"], dict):
continue
chunk[id_] = container_info[id_]["ret"]
yield chunk
with salt.client.get_local_client(__opts__["conf_file"]) as client:
for container_info in client.cmd_iter(tgt, "lxc.list", kwarg={"path": path}):
if not container_info:
continue
if not isinstance(container_info, dict):
continue
chunk = {}
id_ = next(iter(container_info.keys()))
if host and host != id_:
continue
if not isinstance(container_info[id_], dict):
continue
if "ret" not in container_info[id_]:
continue
if not isinstance(container_info[id_]["ret"], dict):
continue
chunk[id_] = container_info[id_]["ret"]
yield chunk
def list_(host=None, quiet=False, path=None):


@@ -32,38 +32,38 @@ log = logging.getLogger(__name__)
def _ping(tgt, tgt_type, timeout, gather_job_timeout):
-    client = salt.client.get_local_client(__opts__["conf_file"])
-    pub_data = client.run_job(
-        tgt, "test.ping", (), tgt_type, "", timeout, "", listen=True
-    )
-    if not pub_data:
-        return pub_data
-    log.debug(
-        "manage runner will ping the following minion(s): %s",
-        ", ".join(sorted(pub_data["minions"])),
-    )
-    returned = set()
-    for fn_ret in client.get_cli_event_returns(
-        pub_data["jid"],
-        pub_data["minions"],
-        client._get_timeout(timeout),
-        tgt,
-        tgt_type,
-        gather_job_timeout=gather_job_timeout,
-    ):
-        if fn_ret:
-            for mid, _ in fn_ret.items():
-                log.debug("minion '%s' returned from ping", mid)
-                returned.add(mid)
-    not_returned = sorted(set(pub_data["minions"]) - returned)
-    returned = sorted(returned)
-    return returned, not_returned
+    with salt.client.get_local_client(__opts__["conf_file"]) as client:
+        pub_data = client.run_job(
+            tgt, "test.ping", (), tgt_type, "", timeout, "", listen=True
+        )
+        if not pub_data:
+            return pub_data
+        log.debug(
+            "manage runner will ping the following minion(s): %s",
+            ", ".join(sorted(pub_data["minions"])),
+        )
+        returned = set()
+        for fn_ret in client.get_cli_event_returns(
+            pub_data["jid"],
+            pub_data["minions"],
+            client._get_timeout(timeout),
+            tgt,
+            tgt_type,
+            gather_job_timeout=gather_job_timeout,
+        ):
+            if fn_ret:
+                for mid, _ in fn_ret.items():
+                    log.debug("minion '%s' returned from ping", mid)
+                    returned.add(mid)
+        not_returned = sorted(set(pub_data["minions"]) - returned)
+        returned = sorted(returned)
+        return returned, not_returned
def status(


@@ -148,21 +148,20 @@ def execute(
days: 1
returner: redis
"""
client = salt.client.get_local_client(__opts__["conf_file"])
try:
ret = client.cmd(
tgt,
fun,
arg=arg,
timeout=timeout or __opts__["timeout"],
tgt_type=tgt_type, # no warn_until, as this is introduced only in 2017.7.0
ret=ret,
jid=jid,
kwarg=kwarg,
**kwargs
)
except SaltClientError as client_error:
log.error("Error while executing %s on %s (%s)", fun, tgt, tgt_type)
log.error(client_error)
return {}
return ret
with salt.client.get_local_client(__opts__["conf_file"]) as client:
try:
return client.cmd(
tgt,
fun,
arg=arg,
timeout=timeout or __opts__["timeout"],
tgt_type=tgt_type, # no warn_until, as this is introduced only in 2017.7.0
ret=ret,
jid=jid,
kwarg=kwarg,
**kwargs
)
except SaltClientError as client_error:
log.error("Error while executing %s on %s (%s)", fun, tgt, tgt_type)
log.error(client_error)
return {}


@@ -30,102 +30,102 @@ def _action(action="get", search=None, one=True, force=False):
    """
    vms = {}
    matched_vms = []
-    client = salt.client.get_local_client(__opts__["conf_file"])
    ## lookup vms
-    try:
-        vmadm_args = {}
-        vmadm_args["order"] = "uuid,alias,hostname,state"
-        if "=" in search:
-            vmadm_args["search"] = search
-        for cn in client.cmd_iter(
-            "G@virtual:physical and G@os:smartos",
-            "vmadm.list",
-            kwarg=vmadm_args,
-            tgt_type="compound",
-        ):
-            if not cn:
-                continue
-            node = next(iter(cn.keys()))
-            if (
-                not isinstance(cn[node], dict)
-                or "ret" not in cn[node]
-                or not isinstance(cn[node]["ret"], dict)
-            ):
-                continue
-            for vm in cn[node]["ret"]:
-                vmcfg = cn[node]["ret"][vm]
-                vmcfg["node"] = node
-                vms[vm] = vmcfg
-    except SaltClientError as client_error:
-        pass
-    ## check if we have vms
-    if len(vms) == 0:
-        return {"Error": "No vms found."}
-    ## simple search
-    if "=" not in search:
-        loop_pass = 0
-        while loop_pass < 3:
-            ## each pass will try a different field
-            if loop_pass == 0:
-                field = "uuid"
-            elif loop_pass == 1:
-                field = "hostname"
-            else:
-                field = "alias"
-            ## loop vms and try to match
-            for vm in vms:
-                if field == "uuid" and vm == search:
-                    matched_vms.append(vm)
-                    break  # exit for on uuid match (max = 1)
-                elif field in vms[vm] and vms[vm][field] == search:
-                    matched_vms.append(vm)
-            ## exit on match(es) or try again
-            if len(matched_vms) > 0:
-                break
-            else:
-                loop_pass += 1
-    else:
-        for vm in vms:
-            matched_vms.append(vm)
-    ## check if we have vms
-    if len(matched_vms) == 0:
-        return {"Error": "No vms matched."}
-    ## multiple allowed?
-    if one and len(matched_vms) > 1:
-        return {
-            "Error": "Matched {} vms, only one allowed!".format(len(matched_vms)),
-            "Matches": matched_vms,
-        }
-    ## perform action
-    ret = {}
-    if action in ["start", "stop", "reboot", "get"]:
-        for vm in matched_vms:
-            vmadm_args = {"key": "uuid", "vm": vm}
-            try:
-                for vmadm_res in client.cmd_iter(
-                    vms[vm]["node"], "vmadm.{}".format(action), kwarg=vmadm_args
-                ):
-                    if not vmadm_res:
-                        continue
-                    if vms[vm]["node"] in vmadm_res:
-                        ret[vm] = vmadm_res[vms[vm]["node"]]["ret"]
-            except SaltClientError as client_error:
-                ret[vm] = False
-    elif action in ["is_running"]:
-        ret = True
-        for vm in matched_vms:
-            if vms[vm]["state"] != "running":
-                ret = False
-                break
-    return ret
+    with salt.client.get_local_client(__opts__["conf_file"]) as client:
+        try:
+            vmadm_args = {}
+            vmadm_args["order"] = "uuid,alias,hostname,state"
+            if "=" in search:
+                vmadm_args["search"] = search
+            for cn in client.cmd_iter(
+                "G@virtual:physical and G@os:smartos",
+                "vmadm.list",
+                kwarg=vmadm_args,
+                tgt_type="compound",
+            ):
+                if not cn:
+                    continue
+                node = next(iter(cn.keys()))
+                if (
+                    not isinstance(cn[node], dict)
+                    or "ret" not in cn[node]
+                    or not isinstance(cn[node]["ret"], dict)
+                ):
+                    continue
+                for vm in cn[node]["ret"]:
+                    vmcfg = cn[node]["ret"][vm]
+                    vmcfg["node"] = node
+                    vms[vm] = vmcfg
+        except SaltClientError as client_error:
+            pass
+        ## check if we have vms
+        if len(vms) == 0:
+            return {"Error": "No vms found."}
+        ## simple search
+        if "=" not in search:
+            loop_pass = 0
+            while loop_pass < 3:
+                ## each pass will try a different field
+                if loop_pass == 0:
+                    field = "uuid"
+                elif loop_pass == 1:
+                    field = "hostname"
+                else:
+                    field = "alias"
+                ## loop vms and try to match
+                for vm in vms:
+                    if field == "uuid" and vm == search:
+                        matched_vms.append(vm)
+                        break  # exit for on uuid match (max = 1)
+                    elif field in vms[vm] and vms[vm][field] == search:
+                        matched_vms.append(vm)
+                ## exit on match(es) or try again
+                if len(matched_vms) > 0:
+                    break
+                else:
+                    loop_pass += 1
+        else:
+            for vm in vms:
+                matched_vms.append(vm)
+        ## check if we have vms
+        if len(matched_vms) == 0:
+            return {"Error": "No vms matched."}
+        ## multiple allowed?
+        if one and len(matched_vms) > 1:
+            return {
+                "Error": "Matched {} vms, only one allowed!".format(len(matched_vms)),
+                "Matches": matched_vms,
+            }
+        ## perform action
+        ret = {}
+        if action in ["start", "stop", "reboot", "get"]:
+            for vm in matched_vms:
+                vmadm_args = {"key": "uuid", "vm": vm}
+                try:
+                    for vmadm_res in client.cmd_iter(
+                        vms[vm]["node"], "vmadm.{}".format(action), kwarg=vmadm_args
+                    ):
+                        if not vmadm_res:
+                            continue
+                        if vms[vm]["node"] in vmadm_res:
+                            ret[vm] = vmadm_res[vms[vm]["node"]]["ret"]
+                except SaltClientError as client_error:
+                    ret[vm] = False
+        elif action in ["is_running"]:
+            ret = True
+            for vm in matched_vms:
+                if vms[vm]["state"] != "running":
+                    ret = False
+                    break
+        return ret
def nodes(verbose=False):
@@ -144,53 +144,55 @@ def nodes(verbose=False):
        salt-run vmadm.nodes verbose=True
    """
    ret = {} if verbose else []
-    client = salt.client.get_local_client(__opts__["conf_file"])
-    ## get list of nodes
-    try:
-        for cn in client.cmd_iter(
-            "G@virtual:physical and G@os:smartos", "grains.items", tgt_type="compound"
-        ):
-            if not cn:
-                continue
-            node = next(iter(cn.keys()))
-            if (
-                not isinstance(cn[node], dict)
-                or "ret" not in cn[node]
-                or not isinstance(cn[node]["ret"], dict)
-            ):
-                continue
-            if verbose:
-                ret[node] = {}
-                ret[node]["version"] = {}
-                ret[node]["version"]["platform"] = cn[node]["ret"]["osrelease"]
-                if "computenode_sdc_version" in cn[node]["ret"]:
-                    ret[node]["version"]["sdc"] = cn[node]["ret"][
-                        "computenode_sdc_version"
-                    ]
-                ret[node]["vms"] = {}
-                if (
-                    "computenode_vm_capable" in cn[node]["ret"]
-                    and cn[node]["ret"]["computenode_vm_capable"]
-                    and "computenode_vm_hw_virt" in cn[node]["ret"]
-                ):
-                    ret[node]["vms"]["hw_cap"] = cn[node]["ret"][
-                        "computenode_vm_hw_virt"
-                    ]
-                else:
-                    ret[node]["vms"]["hw_cap"] = False
-                if "computenode_vms_running" in cn[node]["ret"]:
-                    ret[node]["vms"]["running"] = cn[node]["ret"][
-                        "computenode_vms_running"
-                    ]
-            else:
-                ret.append(node)
-    except SaltClientError as client_error:
-        return "{}".format(client_error)
-    if not verbose:
-        ret.sort()
-    return ret
+    with salt.client.get_local_client(__opts__["conf_file"]) as client:
+        ## get list of nodes
+        try:
+            for cn in client.cmd_iter(
+                "G@virtual:physical and G@os:smartos",
+                "grains.items",
+                tgt_type="compound",
+            ):
+                if not cn:
+                    continue
+                node = next(iter(cn.keys()))
+                if (
+                    not isinstance(cn[node], dict)
+                    or "ret" not in cn[node]
+                    or not isinstance(cn[node]["ret"], dict)
+                ):
+                    continue
+                if verbose:
+                    ret[node] = {}
+                    ret[node]["version"] = {}
+                    ret[node]["version"]["platform"] = cn[node]["ret"]["osrelease"]
+                    if "computenode_sdc_version" in cn[node]["ret"]:
+                        ret[node]["version"]["sdc"] = cn[node]["ret"][
+                            "computenode_sdc_version"
+                        ]
+                    ret[node]["vms"] = {}
+                    if (
+                        "computenode_vm_capable" in cn[node]["ret"]
+                        and cn[node]["ret"]["computenode_vm_capable"]
+                        and "computenode_vm_hw_virt" in cn[node]["ret"]
+                    ):
+                        ret[node]["vms"]["hw_cap"] = cn[node]["ret"][
+                            "computenode_vm_hw_virt"
+                        ]
+                    else:
+                        ret[node]["vms"]["hw_cap"] = False
+                    if "computenode_vms_running" in cn[node]["ret"]:
+                        ret[node]["vms"]["running"] = cn[node]["ret"][
+                            "computenode_vms_running"
+                        ]
+                else:
+                    ret.append(node)
+        except SaltClientError as client_error:
+            return "{}".format(client_error)
+        if not verbose:
+            ret.sort()
+        return ret
def list_vms(search=None, verbose=False):
@@ -211,55 +213,55 @@ def list_vms(search=None, verbose=False):
        salt-run vmadm.list verbose=True
    """
    ret = OrderedDict() if verbose else []
-    client = salt.client.get_local_client(__opts__["conf_file"])
-    try:
-        vmadm_args = {}
-        vmadm_args["order"] = "uuid,alias,hostname,state,type,cpu_cap,vcpus,ram"
-        if search:
-            vmadm_args["search"] = search
-        for cn in client.cmd_iter(
-            "G@virtual:physical and G@os:smartos",
-            "vmadm.list",
-            kwarg=vmadm_args,
-            tgt_type="compound",
-        ):
-            if not cn:
-                continue
-            node = next(iter(cn.keys()))
-            if (
-                not isinstance(cn[node], dict)
-                or "ret" not in cn[node]
-                or not isinstance(cn[node]["ret"], dict)
-            ):
-                continue
-            for vm in cn[node]["ret"]:
-                vmcfg = cn[node]["ret"][vm]
-                if verbose:
-                    ret[vm] = OrderedDict()
-                    ret[vm]["hostname"] = vmcfg["hostname"]
-                    ret[vm]["alias"] = vmcfg["alias"]
-                    ret[vm]["computenode"] = node
-                    ret[vm]["state"] = vmcfg["state"]
-                    ret[vm]["resources"] = OrderedDict()
-                    ret[vm]["resources"]["memory"] = vmcfg["ram"]
-                    if vmcfg["type"] == "KVM":
-                        ret[vm]["resources"]["cpu"] = "{:.2f}".format(
-                            int(vmcfg["vcpus"])
-                        )
-                    else:
-                        if vmcfg["cpu_cap"] != "":
-                            ret[vm]["resources"]["cpu"] = "{:.2f}".format(
-                                int(vmcfg["cpu_cap"]) / 100
-                            )
-                else:
-                    ret.append(vm)
-    except SaltClientError as client_error:
-        return "{}".format(client_error)
-    if not verbose:
-        ret = sorted(ret)
-    return ret
+    with salt.client.get_local_client(__opts__["conf_file"]) as client:
+        try:
+            vmadm_args = {}
+            vmadm_args["order"] = "uuid,alias,hostname,state,type,cpu_cap,vcpus,ram"
+            if search:
+                vmadm_args["search"] = search
+            for cn in client.cmd_iter(
+                "G@virtual:physical and G@os:smartos",
+                "vmadm.list",
+                kwarg=vmadm_args,
+                tgt_type="compound",
+            ):
+                if not cn:
+                    continue
+                node = next(iter(cn.keys()))
+                if (
+                    not isinstance(cn[node], dict)
+                    or "ret" not in cn[node]
+                    or not isinstance(cn[node]["ret"], dict)
+                ):
+                    continue
+                for vm in cn[node]["ret"]:
+                    vmcfg = cn[node]["ret"][vm]
+                    if verbose:
+                        ret[vm] = OrderedDict()
+                        ret[vm]["hostname"] = vmcfg["hostname"]
+                        ret[vm]["alias"] = vmcfg["alias"]
+                        ret[vm]["computenode"] = node
+                        ret[vm]["state"] = vmcfg["state"]
+                        ret[vm]["resources"] = OrderedDict()
+                        ret[vm]["resources"]["memory"] = vmcfg["ram"]
+                        if vmcfg["type"] == "KVM":
+                            ret[vm]["resources"]["cpu"] = "{:.2f}".format(
+                                int(vmcfg["vcpus"])
+                            )
+                        else:
+                            if vmcfg["cpu_cap"] != "":
+                                ret[vm]["resources"]["cpu"] = "{:.2f}".format(
+                                    int(vmcfg["cpu_cap"]) / 100
+                                )
+                    else:
+                        ret.append(vm)
+        except SaltClientError as client_error:
+            return "{}".format(client_error)
+        if not verbose:
+            ret = sorted(ret)
+        return ret
def start(search, one=True):

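All of the call sites above assume that the object returned by salt.client.get_local_client() works as a context manager whose exit hook calls destroy(). A minimal, self-contained sketch of that protocol; the stub classes below are illustrative stand-ins, not the actual LocalClient implementation:

class _EventStub:
    """Stand-in for salt.utils.event.SaltEvent in this sketch."""

    def destroy(self):
        print("event destroyed")


class LocalClientSketch:
    """Illustrates the context-manager protocol the ``with`` blocks rely on."""

    def __init__(self):
        self.event = _EventStub()

    def __enter__(self):
        # Hand the client itself to the ``with`` block.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Runs on normal exit, on ``return``, and on exceptions alike.
        self.destroy()

    def destroy(self):
        # Tear down the event interface exactly once.
        if self.event is not None:
            self.event.destroy()
            self.event = None


with LocalClientSketch() as client:
    pass  # issue commands here; cleanup is guaranteed on exit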
View file

@ -155,19 +155,19 @@ def _get_pool_results(*args, **kwargs):
key: value for (key, value) in kwargs.items() if not key.startswith("_")
}
client = salt.client.get_local_client(__opts__["conf_file"])
try:
minions = client.cmd(
tgt,
cmd,
args[2:],
timeout=__opts__["timeout"],
tgt_type=tgt_type,
kwarg=kwargs_passthru,
)
except SaltClientError as client_error:
print(client_error)
return ret
with salt.client.get_local_client(__opts__["conf_file"]) as client:
try:
minions = client.cmd(
tgt,
cmd,
args[2:],
timeout=__opts__["timeout"],
tgt_type=tgt_type,
kwarg=kwargs_passthru,
)
except SaltClientError as client_error:
print(client_error)
return ret
# hash minion return values as a string
for minion in sorted(minions):

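Note that the early return inside the except branch stays safe under the new layout: leaving a with block by any route (fall-through, return, or an unhandled exception) runs __exit__ first, so the client is destroyed before the function actually returns. A small demonstration of that ordering, independent of Salt:

class Probe:
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        print("cleaned up")


def demo():
    with Probe():
        return "early return"  # __exit__ fires before the caller sees this


print(demo())  # prints "cleaned up", then "early return"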
View file

@ -62,10 +62,50 @@ def query(host=None, quiet=False):
if quiet:
log.warning("'quiet' is deprecated. Please migrate to --quiet")
ret = {}
client = salt.client.get_local_client(__opts__["conf_file"])
try:
with salt.client.get_local_client(__opts__["conf_file"]) as client:
try:
for info in client.cmd_iter(
"virtual:physical", "virt.full_info", tgt_type="grain"
):
if not info:
continue
if not isinstance(info, dict):
continue
chunk = {}
id_ = next(iter(info.keys()))
if host:
if host != id_:
continue
if not isinstance(info[id_], dict):
continue
if "ret" not in info[id_]:
continue
if not isinstance(info[id_]["ret"], dict):
continue
chunk[id_] = info[id_]["ret"]
ret.update(chunk)
if not quiet:
__jid_event__.fire_event(
{"data": chunk, "outputter": "virt_query"}, "progress"
)
except SaltClientError as client_error:
print(client_error)
return ret
def list(host=None, quiet=False, hyper=None): # pylint: disable=redefined-builtin
"""
List the virtual machines on each host. This is a simplified query,
showing only the virtual machine names belonging to each host.
A single host can be passed in to list an individual host.
"""
if quiet:
log.warning("'quiet' is deprecated. Please migrate to --quiet")
ret = {}
with salt.client.get_local_client(__opts__["conf_file"]) as client:
for info in client.cmd_iter(
"virtual:physical", "virt.full_info", tgt_type="grain"
"virtual:physical", "virt.vm_info", tgt_type="grain"
):
if not info:
continue
@ -82,56 +122,20 @@ def query(host=None, quiet=False):
continue
if not isinstance(info[id_]["ret"], dict):
continue
chunk[id_] = info[id_]["ret"]
data = {}
for key, val in info[id_]["ret"].items():
if val["state"] in data:
data[val["state"]].append(key)
else:
data[val["state"]] = [key]
chunk[id_] = data
ret.update(chunk)
if not quiet:
__jid_event__.fire_event(
{"data": chunk, "outputter": "virt_query"}, "progress"
{"data": chunk, "outputter": "nested"}, "progress"
)
except SaltClientError as client_error:
print(client_error)
return ret
def list(host=None, quiet=False, hyper=None): # pylint: disable=redefined-builtin
"""
List the virtual machines on each host. This is a simplified query,
showing only the virtual machine names belonging to each host.
A single host can be passed in to list an individual host.
"""
if quiet:
log.warning("'quiet' is deprecated. Please migrate to --quiet")
ret = {}
client = salt.client.get_local_client(__opts__["conf_file"])
for info in client.cmd_iter("virtual:physical", "virt.vm_info", tgt_type="grain"):
if not info:
continue
if not isinstance(info, dict):
continue
chunk = {}
id_ = next(iter(info.keys()))
if host:
if host != id_:
continue
if not isinstance(info[id_], dict):
continue
if "ret" not in info[id_]:
continue
if not isinstance(info[id_]["ret"], dict):
continue
data = {}
for key, val in info[id_]["ret"].items():
if val["state"] in data:
data[val["state"]].append(key)
else:
data[val["state"]] = [key]
chunk[id_] = data
ret.update(chunk)
if not quiet:
__jid_event__.fire_event({"data": chunk, "outputter": "nested"}, "progress")
return ret
return ret
def next_host():
@ -266,57 +270,57 @@ def init(
with salt.utils.files.fopen(accepted_key, "w") as fp_:
fp_.write(salt.utils.stringutils.to_str(pub_key))
client = salt.client.get_local_client(__opts__["conf_file"])
with salt.client.get_local_client(__opts__["conf_file"]) as client:
__jid_event__.fire_event(
{"message": "Creating VM {} on host {}".format(name, host)}, "progress"
)
try:
cmd_ret = client.cmd_iter(
host,
"virt.init",
[name, cpu, mem],
timeout=600,
kwarg={
"image": image,
"nic": nic,
"hypervisor": hypervisor,
"start": start,
"disk": disk,
"saltenv": saltenv,
"seed": seed,
"install": install,
"pub_key": pub_key,
"priv_key": priv_key,
"seed_cmd": seed_cmd,
"enable_vnc": enable_vnc,
"enable_qcow": enable_qcow,
"serial_type": serial_type,
},
)
except SaltClientError as client_error:
# Fall through to ret error handling below
print(client_error)
ret = next(cmd_ret)
if not ret:
__jid_event__.fire_event(
{"message": "VM {} was not initialized.".format(name)}, "progress"
{"message": "Creating VM {} on host {}".format(name, host)}, "progress"
)
return "fail"
for minion_id in ret:
if ret[minion_id]["ret"] is False:
print(
"VM {} initialization failed. Returned error: {}".format(
name, ret[minion_id]["ret"]
)
try:
cmd_ret = client.cmd_iter(
host,
"virt.init",
[name, cpu, mem],
timeout=600,
kwarg={
"image": image,
"nic": nic,
"hypervisor": hypervisor,
"start": start,
"disk": disk,
"saltenv": saltenv,
"seed": seed,
"install": install,
"pub_key": pub_key,
"priv_key": priv_key,
"seed_cmd": seed_cmd,
"enable_vnc": enable_vnc,
"enable_qcow": enable_qcow,
"serial_type": serial_type,
},
)
except SaltClientError as client_error:
# Fall through to ret error handling below
print(client_error)
ret = next(cmd_ret)
if not ret:
__jid_event__.fire_event(
{"message": "VM {} was not initialized.".format(name)}, "progress"
)
return "fail"
for minion_id in ret:
if ret[minion_id]["ret"] is False:
print(
"VM {} initialization failed. Returned error: {}".format(
name, ret[minion_id]["ret"]
)
)
return "fail"
__jid_event__.fire_event(
{"message": "VM {} initialized on host {}".format(name, host)}, "progress"
)
return "good"
__jid_event__.fire_event(
{"message": "VM {} initialized on host {}".format(name, host)}, "progress"
)
return "good"
def vm_info(name, quiet=False):
@ -332,22 +336,24 @@ def reset(name):
Force power down and restart an existing VM
"""
ret = {}
client = salt.client.get_local_client(__opts__["conf_file"])
data = vm_info(name, quiet=True)
if not data:
__jid_event__.fire_event(
{"message": "Failed to find VM {} to reset".format(name)}, "progress"
)
return "fail"
host = next(iter(data.keys()))
try:
cmd_ret = client.cmd_iter(host, "virt.reset", [name], timeout=600)
for comp in cmd_ret:
ret.update(comp)
__jid_event__.fire_event({"message": "Reset VM {}".format(name)}, "progress")
except SaltClientError as client_error:
print(client_error)
return ret
with salt.client.get_local_client(__opts__["conf_file"]) as client:
data = vm_info(name, quiet=True)
if not data:
__jid_event__.fire_event(
{"message": "Failed to find VM {} to reset".format(name)}, "progress"
)
return "fail"
host = next(iter(data.keys()))
try:
cmd_ret = client.cmd_iter(host, "virt.reset", [name], timeout=600)
for comp in cmd_ret:
ret.update(comp)
__jid_event__.fire_event(
{"message": "Reset VM {}".format(name)}, "progress"
)
except SaltClientError as client_error:
print(client_error)
return ret
def start(name):
@ -355,25 +361,25 @@ def start(name):
Start a named virtual machine
"""
ret = {}
client = salt.client.get_local_client(__opts__["conf_file"])
data = vm_info(name, quiet=True)
if not data:
__jid_event__.fire_event(
{"message": "Failed to find VM {} to start".format(name)}, "progress"
)
return "fail"
host = next(iter(data.keys()))
if data[host][name]["state"] == "running":
print("VM {} is already running".format(name))
return "bad state"
try:
cmd_ret = client.cmd_iter(host, "virt.start", [name], timeout=600)
except SaltClientError as client_error:
return "Virtual machine {} not started: {}".format(name, client_error)
for comp in cmd_ret:
ret.update(comp)
__jid_event__.fire_event({"message": "Started VM {}".format(name)}, "progress")
return "good"
with salt.client.get_local_client(__opts__["conf_file"]) as client:
data = vm_info(name, quiet=True)
if not data:
__jid_event__.fire_event(
{"message": "Failed to find VM {} to start".format(name)}, "progress"
)
return "fail"
host = next(iter(data.keys()))
if data[host][name]["state"] == "running":
print("VM {} is already running".format(name))
return "bad state"
try:
cmd_ret = client.cmd_iter(host, "virt.start", [name], timeout=600)
except SaltClientError as client_error:
return "Virtual machine {} not started: {}".format(name, client_error)
for comp in cmd_ret:
ret.update(comp)
__jid_event__.fire_event({"message": "Started VM {}".format(name)}, "progress")
return "good"
def force_off(name):
@ -381,25 +387,27 @@ def force_off(name):
Force power down the named virtual machine
"""
ret = {}
client = salt.client.get_local_client(__opts__["conf_file"])
data = vm_info(name, quiet=True)
if not data:
print("Failed to find VM {} to destroy".format(name))
return "fail"
host = next(iter(data.keys()))
if data[host][name]["state"] == "shutdown":
print("VM {} is already shutdown".format(name))
return "bad state"
try:
cmd_ret = client.cmd_iter(host, "virt.stop", [name], timeout=600)
except SaltClientError as client_error:
return "Virtual machine {} could not be forced off: {}".format(
name, client_error
with salt.client.get_local_client(__opts__["conf_file"]) as client:
data = vm_info(name, quiet=True)
if not data:
print("Failed to find VM {} to destroy".format(name))
return "fail"
host = next(iter(data.keys()))
if data[host][name]["state"] == "shutdown":
print("VM {} is already shutdown".format(name))
return "bad state"
try:
cmd_ret = client.cmd_iter(host, "virt.stop", [name], timeout=600)
except SaltClientError as client_error:
return "Virtual machine {} could not be forced off: {}".format(
name, client_error
)
for comp in cmd_ret:
ret.update(comp)
__jid_event__.fire_event(
{"message": "Powered off VM {}".format(name)}, "progress"
)
for comp in cmd_ret:
ret.update(comp)
__jid_event__.fire_event({"message": "Powered off VM {}".format(name)}, "progress")
return "good"
return "good"
def purge(name, delete_key=True):
@ -407,28 +415,30 @@ def purge(name, delete_key=True):
Destroy the named VM
"""
ret = {}
client = salt.client.get_local_client(__opts__["conf_file"])
data = vm_info(name, quiet=True)
if not data:
__jid_event__.fire_event(
{"error": "Failed to find VM {} to purge".format(name)}, "progress"
)
return "fail"
host = next(iter(data.keys()))
try:
cmd_ret = client.cmd_iter(host, "virt.purge", [name, True], timeout=600)
except SaltClientError as client_error:
return "Virtual machine {} could not be purged: {}".format(name, client_error)
with salt.client.get_local_client(__opts__["conf_file"]) as client:
data = vm_info(name, quiet=True)
if not data:
__jid_event__.fire_event(
{"error": "Failed to find VM {} to purge".format(name)}, "progress"
)
return "fail"
host = next(iter(data.keys()))
try:
cmd_ret = client.cmd_iter(host, "virt.purge", [name, True], timeout=600)
except SaltClientError as client_error:
return "Virtual machine {} could not be purged: {}".format(
name, client_error
)
for comp in cmd_ret:
ret.update(comp)
for comp in cmd_ret:
ret.update(comp)
if delete_key:
log.debug("Deleting key %s", name)
with salt.key.Key(__opts__) as skey:
skey.delete_key(name)
__jid_event__.fire_event({"message": "Purged VM {}".format(name)}, "progress")
return "good"
if delete_key:
log.debug("Deleting key %s", name)
with salt.key.Key(__opts__) as skey:
skey.delete_key(name)
__jid_event__.fire_event({"message": "Purged VM {}".format(name)}, "progress")
return "good"
def pause(name):
@ -436,28 +446,30 @@ def pause(name):
Pause the named VM
"""
ret = {}
client = salt.client.get_local_client(__opts__["conf_file"])
with salt.client.get_local_client(__opts__["conf_file"]) as client:
data = vm_info(name, quiet=True)
if not data:
__jid_event__.fire_event(
{"error": "Failed to find VM {} to pause".format(name)}, "progress"
)
return "fail"
host = next(iter(data.keys()))
if data[host][name]["state"] == "paused":
__jid_event__.fire_event(
{"error": "VM {} is already paused".format(name)}, "progress"
)
return "bad state"
try:
cmd_ret = client.cmd_iter(host, "virt.pause", [name], timeout=600)
except SaltClientError as client_error:
return "Virtual machine {} could not be pasued: {}".format(name, client_error)
for comp in cmd_ret:
ret.update(comp)
__jid_event__.fire_event({"message": "Paused VM {}".format(name)}, "progress")
return "good"
data = vm_info(name, quiet=True)
if not data:
__jid_event__.fire_event(
{"error": "Failed to find VM {} to pause".format(name)}, "progress"
)
return "fail"
host = next(iter(data.keys()))
if data[host][name]["state"] == "paused":
__jid_event__.fire_event(
{"error": "VM {} is already paused".format(name)}, "progress"
)
return "bad state"
try:
cmd_ret = client.cmd_iter(host, "virt.pause", [name], timeout=600)
except SaltClientError as client_error:
return "Virtual machine {} could not be pasued: {}".format(
name, client_error
)
for comp in cmd_ret:
ret.update(comp)
__jid_event__.fire_event({"message": "Paused VM {}".format(name)}, "progress")
return "good"
def resume(name):
@ -465,27 +477,29 @@ def resume(name):
Resume a paused VM
"""
ret = {}
client = salt.client.get_local_client(__opts__["conf_file"])
data = vm_info(name, quiet=True)
if not data:
__jid_event__.fire_event(
{"error": "Failed to find VM {} to pause".format(name)}, "progress"
)
return "not found"
host = next(iter(data.keys()))
if data[host][name]["state"] != "paused":
__jid_event__.fire_event(
{"error": "VM {} is not paused".format(name)}, "progress"
)
return "bad state"
try:
cmd_ret = client.cmd_iter(host, "virt.resume", [name], timeout=600)
except SaltClientError as client_error:
return "Virtual machine {} could not be resumed: {}".format(name, client_error)
for comp in cmd_ret:
ret.update(comp)
__jid_event__.fire_event({"message": "Resumed VM {}".format(name)}, "progress")
return "good"
with salt.client.get_local_client(__opts__["conf_file"]) as client:
data = vm_info(name, quiet=True)
if not data:
__jid_event__.fire_event(
{"error": "Failed to find VM {} to pause".format(name)}, "progress"
)
return "not found"
host = next(iter(data.keys()))
if data[host][name]["state"] != "paused":
__jid_event__.fire_event(
{"error": "VM {} is not paused".format(name)}, "progress"
)
return "bad state"
try:
cmd_ret = client.cmd_iter(host, "virt.resume", [name], timeout=600)
except SaltClientError as client_error:
return "Virtual machine {} could not be resumed: {}".format(
name, client_error
)
for comp in cmd_ret:
ret.update(comp)
__jid_event__.fire_event({"message": "Resumed VM {}".format(name)}, "progress")
return "good"
def migrate(name, target=""):
@ -493,39 +507,45 @@ def migrate(name, target=""):
Migrate a VM from one host to another. This routine only starts
the migration and displays information on how to look up its progress.
"""
client = salt.client.get_local_client(__opts__["conf_file"])
data = query(quiet=True)
origin_data = _find_vm(name, data, quiet=True)
try:
origin_host = next(iter(origin_data))
except StopIteration:
__jid_event__.fire_event(
{"error": "Named VM {} was not found to migrate".format(name)}, "progress"
)
return ""
disks = origin_data[origin_host][name]["disks"]
if not origin_data:
__jid_event__.fire_event(
{"error": "Named VM {} was not found to migrate".format(name)}, "progress"
)
return ""
if not target:
target = _determine_host(data, origin_host)
if target not in data:
__jid_event__.fire_event(
{"error": "Target host {} not found".format(origin_data)}, "progress"
)
return ""
try:
client.cmd(target, "virt.seed_non_shared_migrate", [disks, True])
jid = client.cmd_async(origin_host, "virt.migrate_non_shared", [name, target])
except SaltClientError as client_error:
return "Virtual machine {} could not be migrated: {}".format(name, client_error)
with salt.client.get_local_client(__opts__["conf_file"]) as client:
data = query(quiet=True)
origin_data = _find_vm(name, data, quiet=True)
try:
origin_host = next(iter(origin_data))
except StopIteration:
__jid_event__.fire_event(
{"error": "Named VM {} was not found to migrate".format(name)},
"progress",
)
return ""
disks = origin_data[origin_host][name]["disks"]
if not origin_data:
__jid_event__.fire_event(
{"error": "Named VM {} was not found to migrate".format(name)},
"progress",
)
return ""
if not target:
target = _determine_host(data, origin_host)
if target not in data:
__jid_event__.fire_event(
{"error": "Target host {} not found".format(origin_data)}, "progress"
)
return ""
try:
client.cmd(target, "virt.seed_non_shared_migrate", [disks, True])
jid = client.cmd_async(
origin_host, "virt.migrate_non_shared", [name, target]
)
except SaltClientError as client_error:
return "Virtual machine {} could not be migrated: {}".format(
name, client_error
)
msg = (
"The migration of virtual machine {} to host {} has begun, "
"and can be tracked via jid {}. The ``salt-run virt.query`` "
"runner can also be used, the target VM will be shown as paused "
"until the migration is complete."
).format(name, target, jid)
__jid_event__.fire_event({"message": msg}, "progress")
msg = (
"The migration of virtual machine {} to host {} has begun, "
"and can be tracked via jid {}. The ``salt-run virt.query`` "
"runner can also be used, the target VM will be shown as paused "
"until the migration is complete."
).format(name, target, jid)
__jid_event__.fire_event({"message": msg}, "progress")

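A recurring idiom in this runner is pulling the single key out of a one-entry dictionary with next(iter(...)), as in host = next(iter(data.keys())). For illustration:

# First key of a dict without building an intermediate list. Raises
# StopIteration when the dict is empty, which migrate() above catches
# explicitly around next(iter(origin_data)).
data = {"hyper1": {"vm1": {"state": "running"}}}
host = next(iter(data))
print(host)  # "hyper1"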
View file

@ -33,9 +33,9 @@ def cmd(name, tgt, func, arg=(), tgt_type="glob", ret="", kwarg=None, **kwargs):
length: 30
"""
ret = {"name": name, "changes": {}, "comment": "", "result": True}
local = salt.client.get_local_client(mopts=__opts__)
jid = local.cmd_async(
tgt, func, arg, tgt_type=tgt_type, ret=ret, kwarg=kwarg, **kwargs
)
ret["changes"]["jid"] = jid
with salt.client.get_local_client(mopts=__opts__) as client:
jid = client.cmd_async(
tgt, func, arg, tgt_type=tgt_type, ret=ret, kwarg=kwarg, **kwargs
)
ret["changes"]["jid"] = jid
return ret

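The converted state remains fire-and-forget: cmd_async only publishes the job, and the job id is recorded under changes. For illustration, the returned dictionary is shaped roughly like this (the name and jid values here are made up):

{
    "name": "wait_for_mine",
    "changes": {"jid": "20201223081124000000"},
    "comment": "",
    "result": True,
}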
View file

@ -468,17 +468,19 @@ class AsyncTCPPubChannel(
self.connected = False
self._closing = False
self._reconnected = False
self.message_client = None
self.event = salt.utils.event.get_event("minion", opts=self.opts, listen=False)
def close(self):
if self._closing:
return
self._closing = True
if self.message_client is not None:
self.message_client.close()
self.message_client = None
if self.event is not None:
self.event.destroy()
self.event = None
if hasattr(self, "message_client"):
self.message_client.close()
# pylint: disable=W1701
def __del__(self):
@ -1450,6 +1452,7 @@ class PubServer(salt.ext.tornado.tcpserver.TCPServer):
self.clients = set()
self.aes_funcs = salt.master.AESFuncs(self.opts)
self.present = {}
self.event = None
self.presence_events = False
if self.opts.get("presence_events", False):
tcp_only = True
@ -1476,6 +1479,9 @@ class PubServer(salt.ext.tornado.tcpserver.TCPServer):
if self.event is not None:
self.event.destroy()
self.event = None
if self.aes_funcs is not None:
self.aes_funcs.destroy()
self.aes_funcs = None
# pylint: disable=W1701
def __del__(self):

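The close() change follows a common teardown pattern: initialize the attribute to None in __init__, then guard close() with both a _closing flag and an is-not-None check. That makes teardown idempotent and safe even when construction only partially completed. A generic sketch of the same pattern, not the transport code itself:

import io


class Channel:
    def __init__(self):
        self._closing = False
        self.resource = io.BytesIO()  # stand-in for message_client/event

    def close(self):
        if self._closing:
            return  # a second close() is a harmless no-op
        self._closing = True
        if self.resource is not None:
            self.resource.close()
            self.resource = None


c = Channel()
c.close()
c.close()  # safe: the flag and the None check both short-circuit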
View file

@ -1073,12 +1073,6 @@ class AsyncEventPublisher:
if self.puller is not None:
self.puller.close()
# pylint: disable=W1701
def __del__(self):
self.close()
# pylint: enable=W1701
class EventPublisher(salt.utils.process.SignalHandlingProcess):
"""
@ -1196,12 +1190,6 @@ class EventPublisher(salt.utils.process.SignalHandlingProcess):
self.close()
super()._handle_signals(signum, sigframe)
# pylint: disable=W1701
def __del__(self):
self.close()
# pylint: enable=W1701
class EventReturn(salt.utils.process.SignalHandlingProcess):
"""

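Dropping the __del__ hooks is deliberate: a finalizer runs whenever the interpreter reclaims the object, which is prompt under CPython's reference counting but can be arbitrarily late under other collectors, or happen during interpreter shutdown when module globals are already gone. Explicit close()/destroy() calls, usually via a context manager, remove that uncertainty. A small illustration:

class Finalized:
    def __del__(self):
        print("finalized")


obj = Finalized()
alias = obj
del obj    # nothing happens yet; ``alias`` still holds a reference
del alias  # CPython prints "finalized" here via refcounting, but other
           # interpreters (PyPy, for example) may defer it indefinitely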
View file

@ -243,14 +243,13 @@ class MasterPillarUtil:
def _get_live_minion_grains(self, minion_ids):
# Returns a dict of grains fetched directly from the minions
log.debug('Getting live grains for minions: "%s"', minion_ids)
client = salt.client.get_local_client(self.opts["conf_file"])
ret = client.cmd(
",".join(minion_ids),
"grains.items",
timeout=self.opts["timeout"],
tgt_type="list",
)
return ret
with salt.client.get_local_client(self.opts["conf_file"]) as client:
return client.cmd(
",".join(minion_ids),
"grains.items",
timeout=self.opts["timeout"],
tgt_type="list",
)
def _get_live_minion_pillar(self, minion_id=None, minion_grains=None):
# Returns a dict of pillar data for one minion
@ -841,14 +840,14 @@ class ConnectedCache(Process):
def ping_all_connected_minions(opts):
client = salt.client.LocalClient()
if opts["minion_data_cache"]:
tgt = list(salt.utils.minions.CkMinions(opts).connected_ids())
form = "list"
else:
tgt = "*"
form = "glob"
client.cmd_async(tgt, "test.ping", tgt_type=form)
with salt.client.LocalClient() as client:
client.cmd_async(tgt, "test.ping", tgt_type=form)
def get_master_key(key_user, opts, skip_perm_errors=False):

View file

@ -107,28 +107,29 @@ class SaltifyTestCase(TestCase, LoaderModuleMockMixin):
mock_sleep = MagicMock()
mock_cmd = MagicMock(return_value=True)
mm_cmd = MagicMock(return_value={"friend1": True})
lcl = salt.client.LocalClient()
lcl.cmd = mm_cmd
with patch("time.sleep", mock_sleep):
with patch("salt.client.LocalClient", return_value=lcl):
with patch.dict(
"salt.cloud.clouds.saltify.__utils__", {"cloud.bootstrap": mock_cmd}
):
vm_ = {
"deploy": True,
"driver": "saltify",
"name": "new1",
"profile": "testprofile3",
}
result = saltify.create(vm_)
mock_cmd.assert_called_once_with(vm_, ANY)
mm_cmd.assert_called_with(
"friend1", "network.wol", ["aa-bb-cc-dd-ee-ff"]
)
# The test suite might call time.sleep; look for any call
# that has the expected wait time.
mock_sleep.assert_any_call(0.01)
self.assertTrue(result)
with salt.client.LocalClient() as lcl:
lcl.cmd = mm_cmd
with patch("time.sleep", mock_sleep):
with patch("salt.client.LocalClient", return_value=lcl):
with patch.dict(
"salt.cloud.clouds.saltify.__utils__",
{"cloud.bootstrap": mock_cmd},
):
vm_ = {
"deploy": True,
"driver": "saltify",
"name": "new1",
"profile": "testprofile3",
}
result = saltify.create(vm_)
mock_cmd.assert_called_once_with(vm_, ANY)
mm_cmd.assert_called_with(
"friend1", "network.wol", ["aa-bb-cc-dd-ee-ff"]
)
# The test suite might call time.sleep; look for any call
# that has the expected wait time.
mock_sleep.assert_any_call(0.01)
self.assertTrue(result)
def test_avail_locations(self):
"""
@ -177,19 +178,19 @@ class SaltifyTestCase(TestCase, LoaderModuleMockMixin):
}
}
mm_cmd = MagicMock(return_value=testgrains)
lcl = salt.client.LocalClient()
lcl.cmd = mm_cmd
with patch("salt.client.LocalClient", return_value=lcl):
self.assertEqual(saltify.list_nodes(), expected_result)
with salt.client.LocalClient() as lcl:
lcl.cmd = mm_cmd
with patch("salt.client.LocalClient", return_value=lcl):
self.assertEqual(saltify.list_nodes(), expected_result)
def test_saltify_reboot(self):
mm_cmd = MagicMock(return_value=True)
lcl = salt.client.LocalClient()
lcl.cmd = mm_cmd
with patch("salt.client.LocalClient", return_value=lcl):
result = saltify.reboot("nodeS1", "action")
mm_cmd.assert_called_with("nodeS1", "system.reboot")
self.assertTrue(result)
with salt.client.LocalClient() as lcl:
lcl.cmd = mm_cmd
with patch("salt.client.LocalClient", return_value=lcl):
result = saltify.reboot("nodeS1", "action")
mm_cmd.assert_called_with("nodeS1", "system.reboot")
self.assertTrue(result)
def test_saltify_destroy(self):
# destroy calls local.cmd several times and expects
@ -213,9 +214,9 @@ class SaltifyTestCase(TestCase, LoaderModuleMockMixin):
},
]
mm_cmd = MagicMock(side_effect=result_list)
lcl = salt.client.LocalClient()
lcl.cmd = mm_cmd
with patch("salt.client.LocalClient", return_value=lcl):
result = saltify.destroy("nodeS1", "action")
mm_cmd.assert_called_with("nodeS1", "system.shutdown")
self.assertTrue(result)
with salt.client.LocalClient() as lcl:
lcl.cmd = mm_cmd
with patch("salt.client.LocalClient", return_value=lcl):
result = saltify.destroy("nodeS1", "action")
mm_cmd.assert_called_with("nodeS1", "system.shutdown")
self.assertTrue(result)

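These tests also sidestep a mocking pitfall: when production code runs with salt.client.LocalClient() as client:, patching the class with a bare MagicMock would bind client to mock.__enter__.return_value, a different object from the mock itself. Handing patch() a real LocalClient, whose __enter__ presumably returns the instance, avoids that. If a pure mock were wanted instead, a hypothetical sketch:

from unittest.mock import MagicMock

mock_client = MagicMock()
# Make ``with ... as client`` bind the mock itself, not a nested mock.
mock_client.__enter__.return_value = mock_client
mock_client.cmd.return_value = {"minion1": True}

with mock_client as client:
    assert client.cmd("minion1", "test.ping") == {"minion1": True}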
View file

@ -168,7 +168,7 @@ class MasterACLTestCase(ModuleCase):
("salt.auth.LoadAuth.time_auth", MagicMock(return_value=True)),
("salt.minion.MasterMinion", MagicMock()),
("salt.utils.verify.check_path_traversal", MagicMock()),
("salt.client.get_local_client", MagicMock(return_value=opts["conf_file"])),
("salt.client.get_local_client", MagicMock()),
)
for mod, mock in patches:
patcher = patch(mod, mock)
@ -734,7 +734,7 @@ class AuthACLTestCase(ModuleCase):
("salt.utils.verify.check_path_traversal", MagicMock()),
("salt.utils.minions.CkMinions.auth_check", self.auth_check_mock),
("salt.auth.LoadAuth.time_auth", MagicMock(return_value=True)),
("salt.client.get_local_client", MagicMock(return_value=opts["conf_file"])),
("salt.client.get_local_client", MagicMock()),
)
for mod, mock in patches:
patcher = patch(mod, mock)

View file

@ -25,12 +25,12 @@ class LocalClientTestCase(TestCase, SaltClientTestCaseMixin):
jid = "0815"
raw_return = {"id": "fake-id", "jid": jid, "data": "", "return": "fake-return"}
expected_return = {"fake-id": {"ret": "fake-return"}}
local_client = client.LocalClient(mopts=self.get_temp_config("master"))
local_client.event.get_event = MagicMock(return_value=raw_return)
local_client.returners = MagicMock()
ret = local_client.get_event_iter_returns(jid, minions)
val = next(ret)
self.assertEqual(val, expected_return)
with client.LocalClient(mopts=self.get_temp_config("master")) as local_client:
local_client.event.get_event = MagicMock(return_value=raw_return)
local_client.returners = MagicMock()
ret = local_client.get_event_iter_returns(jid, minions)
val = next(ret)
self.assertEqual(val, expected_return)
def test_job_result_return_failure(self):
"""
@ -45,21 +45,21 @@ class LocalClientTestCase(TestCase, SaltClientTestCaseMixin):
"data": "",
"return": "fake-return",
}
local_client = client.LocalClient(mopts=self.get_temp_config("master"))
local_client.event.get_event = MagicMock()
local_client.event.get_event.side_effect = [raw_return, None]
local_client.returners = MagicMock()
ret = local_client.get_event_iter_returns(jid, minions)
with self.assertRaises(StopIteration):
next(ret)
with client.LocalClient(mopts=self.get_temp_config("master")) as local_client:
local_client.event.get_event = MagicMock()
local_client.event.get_event.side_effect = [raw_return, None]
local_client.returners = MagicMock()
ret = local_client.get_event_iter_returns(jid, minions)
with self.assertRaises(StopIteration):
next(ret)
def test_create_local_client(self):
local_client = client.LocalClient(mopts=self.get_temp_config("master"))
self.assertIsInstance(
local_client,
client.LocalClient,
"LocalClient did not create a LocalClient instance",
)
with client.LocalClient(mopts=self.get_temp_config("master")) as local_client:
self.assertIsInstance(
local_client,
client.LocalClient,
"LocalClient did not create a LocalClient instance",
)
def test_check_pub_data(self):
just_minions = {"minions": ["m1", "m2"]}

View file
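The failure-path test leans on plain generator semantics: once get_event_iter_returns stops yielding, the next call to next() raises StopIteration, which assertRaises captures. In miniature:

def empty_returns():
    if False:
        yield  # a generator function that yields nothing


it = empty_returns()
try:
    next(it)
except StopIteration:
    print("exhausted")  # the condition the failure test asserts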

@ -72,6 +72,7 @@ class TransportMethodsTest(TestCase):
"__subclasshook__",
"get_method",
"run_func",
"destroy",
]
for name in dir(aes_funcs):
if name in aes_funcs.expose_methods:
@ -127,6 +128,7 @@ class TransportMethodsTest(TestCase):
"_send_pub",
"_send_ssh_pub",
"get_method",
"destroy",
]
for name in dir(clear_funcs):
if name in clear_funcs.expose_methods: