mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Deltaproxy test fix
This commit is contained in:
parent
ade9da2703
commit
6a5e032214
5 changed files with 36 additions and 39 deletions
|
@ -1308,7 +1308,7 @@ class SAuth(AsyncAuth):
|
|||
self.authenticate()
|
||||
return self._crypticle
|
||||
|
||||
def authenticate(self): # TODO: remove unused var
|
||||
def authenticate(self, _=None): # TODO: remove unused var
|
||||
"""
|
||||
Authenticate with the master, this method breaks the functional
|
||||
paradigm, it will update the master information from a fresh sign
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
# Proxy minion metaproxy modules
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import concurrent.futures
|
||||
import logging
|
||||
import os
|
||||
|
@ -58,6 +59,7 @@ from salt.utils.process import SignalHandlingProcess, default_signals
|
|||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@salt.ext.tornado.gen.coroutine
|
||||
def post_master_init(self, master):
|
||||
"""
|
||||
Function to finish init after a deltaproxy proxy
|
||||
|
@ -337,31 +339,19 @@ def post_master_init(self, master):
|
|||
_failed = list()
|
||||
if self.opts["proxy"].get("parallel_startup"):
|
||||
log.debug("Initiating parallel startup for proxies")
|
||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||
futures = {
|
||||
executor.submit(
|
||||
subproxy_post_master_init,
|
||||
waitfor = []
|
||||
for _id in self.opts["proxy"].get("ids", []):
|
||||
waitfor.append(
|
||||
subproxy_post_master_init(
|
||||
_id,
|
||||
uid,
|
||||
self.opts,
|
||||
self.proxy,
|
||||
self.utils,
|
||||
): _id
|
||||
for _id in self.opts["proxy"].get("ids", [])
|
||||
}
|
||||
|
||||
for future in concurrent.futures.as_completed(futures):
|
||||
try:
|
||||
sub_proxy_data = future.result()
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
_id = futures[future]
|
||||
log.info(
|
||||
"An exception occured during initialization for %s, skipping: %s",
|
||||
_id,
|
||||
exc,
|
||||
)
|
||||
_failed.append(_id)
|
||||
continue
|
||||
)
|
||||
results = yield salt.ext.tornado.gen.multi(waitfor)
|
||||
for sub_proxy_data in results:
|
||||
minion_id = sub_proxy_data["proxy_opts"].get("id")
|
||||
|
||||
if sub_proxy_data["proxy_minion"]:
|
||||
|
@ -378,7 +368,7 @@ def post_master_init(self, master):
|
|||
log.debug("Initiating non-parallel startup for proxies")
|
||||
for _id in self.opts["proxy"].get("ids", []):
|
||||
try:
|
||||
sub_proxy_data = subproxy_post_master_init(
|
||||
sub_proxy_data = yield subproxy_post_master_init(
|
||||
_id, uid, self.opts, self.proxy, self.utils
|
||||
)
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
|
@ -407,6 +397,7 @@ def post_master_init(self, master):
|
|||
self.ready = True
|
||||
|
||||
|
||||
@salt.ext.tornado.gen.coroutine
|
||||
def subproxy_post_master_init(minion_id, uid, opts, main_proxy, main_utils):
|
||||
"""
|
||||
Function to finish init after a deltaproxy proxy
|
||||
|
@ -415,6 +406,7 @@ def subproxy_post_master_init(minion_id, uid, opts, main_proxy, main_utils):
|
|||
This is primarily loading modules, pillars, etc. (since they need
|
||||
to know which master they connected to) for the sub proxy minions.
|
||||
"""
|
||||
|
||||
proxy_grains = {}
|
||||
proxy_pillar = {}
|
||||
|
||||
|
@ -433,7 +425,7 @@ def subproxy_post_master_init(minion_id, uid, opts, main_proxy, main_utils):
|
|||
proxy_grains = salt.loader.grains(
|
||||
proxyopts, proxy=main_proxy, context=proxy_context
|
||||
)
|
||||
proxy_pillar = salt.pillar.get_pillar(
|
||||
proxy_pillar = yield salt.pillar.get_async_pillar(
|
||||
proxyopts,
|
||||
proxy_grains,
|
||||
minion_id,
|
||||
|
@ -577,7 +569,9 @@ def subproxy_post_master_init(minion_id, uid, opts, main_proxy, main_utils):
|
|||
"__proxy_keepalive", persist=True, fire_event=False
|
||||
)
|
||||
|
||||
return {"proxy_minion": _proxy_minion, "proxy_opts": proxyopts}
|
||||
raise salt.ext.tornado.gen.Return(
|
||||
{"proxy_minion": _proxy_minion, "proxy_opts": proxyopts}
|
||||
)
|
||||
|
||||
|
||||
def target(cls, minion_instance, opts, data, connected):
|
||||
|
@ -598,11 +592,10 @@ def target(cls, minion_instance, opts, data, connected):
|
|||
uid = salt.utils.user.get_uid(user=opts.get("user", None))
|
||||
minion_instance.proc_dir = salt.minion.get_proc_dir(opts["cachedir"], uid=uid)
|
||||
|
||||
with salt.ext.tornado.stack_context.StackContext(minion_instance.ctx):
|
||||
if isinstance(data["fun"], tuple) or isinstance(data["fun"], list):
|
||||
ProxyMinion._thread_multi_return(minion_instance, opts, data)
|
||||
else:
|
||||
ProxyMinion._thread_return(minion_instance, opts, data)
|
||||
if isinstance(data["fun"], tuple) or isinstance(data["fun"], list):
|
||||
ProxyMinion._thread_multi_return(minion_instance, opts, data)
|
||||
else:
|
||||
ProxyMinion._thread_return(minion_instance, opts, data)
|
||||
|
||||
|
||||
def thread_return(cls, minion_instance, opts, data):
|
||||
|
@ -1130,7 +1123,9 @@ def tune_in(self, start=True):
|
|||
if self.opts["proxy"].get("parallel_startup"):
|
||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||
futures = [
|
||||
executor.submit(subproxy_tune_in, self.deltaproxy_objs[proxy_minion])
|
||||
executor.submit(
|
||||
threaded_subproxy_tune_in, self.deltaproxy_objs[proxy_minion]
|
||||
)
|
||||
for proxy_minion in self.deltaproxy_objs
|
||||
]
|
||||
|
||||
|
@ -1144,6 +1139,12 @@ def tune_in(self, start=True):
|
|||
super(ProxyMinion, self).tune_in(start=start)
|
||||
|
||||
|
||||
def threaded_subproxy_tune_in(proxy_minion):
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
subproxy_tune_in(proxy_minion)
|
||||
|
||||
|
||||
def subproxy_tune_in(proxy_minion, start=True):
|
||||
"""
|
||||
Tunein sub proxy minions
|
||||
|
@ -1152,5 +1153,4 @@ def subproxy_tune_in(proxy_minion, start=True):
|
|||
proxy_minion.setup_beacons()
|
||||
proxy_minion.add_periodic_callback("cleanup", proxy_minion.cleanup_subprocesses)
|
||||
proxy_minion._state_run()
|
||||
|
||||
return proxy_minion
|
||||
|
|
|
@ -380,11 +380,10 @@ def target(cls, minion_instance, opts, data, connected):
|
|||
opts["cachedir"], uid=uid
|
||||
)
|
||||
|
||||
with salt.ext.tornado.stack_context.StackContext(minion_instance.ctx):
|
||||
if isinstance(data["fun"], tuple) or isinstance(data["fun"], list):
|
||||
ProxyMinion._thread_multi_return(minion_instance, opts, data)
|
||||
else:
|
||||
ProxyMinion._thread_return(minion_instance, opts, data)
|
||||
if isinstance(data["fun"], tuple) or isinstance(data["fun"], list):
|
||||
ProxyMinion._thread_multi_return(minion_instance, opts, data)
|
||||
else:
|
||||
ProxyMinion._thread_return(minion_instance, opts, data)
|
||||
|
||||
|
||||
def thread_return(cls, minion_instance, opts, data):
|
||||
|
|
|
@ -3820,7 +3820,7 @@ class ProxyMinion(Minion):
|
|||
functions.
|
||||
"""
|
||||
mp_call = _metaproxy_call(self.opts, "post_master_init")
|
||||
return mp_call(self, master)
|
||||
yield mp_call(self, master)
|
||||
|
||||
@salt.ext.tornado.gen.coroutine
|
||||
def subproxy_post_master_init(self, minion_id, uid):
|
||||
|
@ -3830,7 +3830,7 @@ class ProxyMinion(Minion):
|
|||
:rtype : None
|
||||
"""
|
||||
mp_call = _metaproxy_call(self.opts, "subproxy_post_master_init")
|
||||
return mp_call(self, minion_id, uid)
|
||||
yield mp_call(self, minion_id, uid)
|
||||
|
||||
def tune_in(self, start=True):
|
||||
"""
|
||||
|
|
|
@ -20,7 +20,6 @@ pytestmark = [
|
|||
reason="Deltaproxy minions do not currently work on spawning platforms.",
|
||||
),
|
||||
pytest.mark.core_test,
|
||||
pytest.mark.skip(reason="Nest patch needs testing"),
|
||||
]
|
||||
|
||||
|
||||
|
@ -205,7 +204,6 @@ def test_exit_status_correct_usage_large_number_of_minions(
|
|||
|
||||
with factory.started():
|
||||
assert factory.is_running()
|
||||
|
||||
# Let's issue a ping the control proxy
|
||||
ret = salt_cli.run("test.ping", minion_tgt=proxy_minion_id)
|
||||
assert ret.returncode == 0
|
||||
|
|
Loading…
Add table
Reference in a new issue