Adding option to having deltaproxy sub proxies startup sequentially by default and in parallel if parallel_startup is configured on the deltaproxy. Updating tests to test with parallel and non-parallel startup.

This commit is contained in:
Gareth J. Greenaway 2022-11-01 12:40:25 -07:00
parent ab558db0b8
commit bb2dd3da3b
No known key found for this signature in database
GPG key ID: 10B62F8A7CAD7A41
3 changed files with 96 additions and 32 deletions

View file

@ -320,28 +320,55 @@ def post_master_init(self, master):
self.proxy_pillar = {}
self.proxy_context = {}
self.add_periodic_callback("cleanup", self.cleanup_subprocesses)
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [
executor.submit(
subproxy_post_master_init, _id, uid, self.opts, self.proxy, self.utils
)
for _id in self.opts["proxy"].get("ids", [])
]
for f in concurrent.futures.as_completed(futures):
sub_proxy_data = f.result()
minion_id = sub_proxy_data["proxy_opts"].get("id")
if sub_proxy_data["proxy_minion"]:
self.deltaproxy_opts[minion_id] = sub_proxy_data["proxy_opts"]
self.deltaproxy_objs[minion_id] = sub_proxy_data["proxy_minion"]
if self.deltaproxy_opts[minion_id] and self.deltaproxy_objs[minion_id]:
self.deltaproxy_objs[
minion_id
].req_channel = salt.transport.client.AsyncReqChannel.factory(
sub_proxy_data["proxy_opts"], io_loop=self.io_loop
if self.opts["proxy"].get("parallel_startup"):
log.debug("Doing parallel startup")
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [
executor.submit(
subproxy_post_master_init,
_id,
uid,
self.opts,
self.proxy,
self.utils,
)
for _id in self.opts["proxy"].get("ids", [])
]
for f in concurrent.futures.as_completed(futures):
sub_proxy_data = f.result()
minion_id = sub_proxy_data["proxy_opts"].get("id")
if sub_proxy_data["proxy_minion"]:
self.deltaproxy_opts[minion_id] = sub_proxy_data["proxy_opts"]
self.deltaproxy_objs[minion_id] = sub_proxy_data["proxy_minion"]
if self.deltaproxy_opts[minion_id] and self.deltaproxy_objs[minion_id]:
self.deltaproxy_objs[
minion_id
].req_channel = salt.transport.client.AsyncReqChannel.factory(
sub_proxy_data["proxy_opts"], io_loop=self.io_loop
)
else:
log.debug("Doing non-parallel startup")
for _id in self.opts["proxy"].get("ids", []):
sub_proxy_data = subproxy_post_master_init(
_id, uid, self.opts, self.proxy, self.utils
)
minion_id = sub_proxy_data["proxy_opts"].get("id")
if sub_proxy_data["proxy_minion"]:
self.deltaproxy_opts[minion_id] = sub_proxy_data["proxy_opts"]
self.deltaproxy_objs[minion_id] = sub_proxy_data["proxy_minion"]
if self.deltaproxy_opts[minion_id] and self.deltaproxy_objs[minion_id]:
self.deltaproxy_objs[
minion_id
].req_channel = salt.transport.client.AsyncReqChannel.factory(
sub_proxy_data["proxy_opts"], io_loop=self.io_loop
)
self.ready = True
@ -1060,15 +1087,20 @@ def tune_in(self, start=True):
Lock onto the publisher. This is the main event loop for the minion
:rtype : None
"""
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [
executor.submit(subproxy_tune_in, self.deltaproxy_objs[proxy_minion])
for proxy_minion in self.deltaproxy_objs
]
if self.opts["proxy"].get("parallel_startup"):
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [
executor.submit(subproxy_tune_in, self.deltaproxy_objs[proxy_minion])
for proxy_minion in self.deltaproxy_objs
]
for f in concurrent.futures.as_completed(futures):
_proxy_minion = f.result()
log.debug("Tune in for sub proxy %r finished", _proxy_minion.opts.get("id"))
for f in concurrent.futures.as_completed(futures):
_proxy_minion = f.result()
log.debug("Tune in for sub proxy %r finished", _proxy_minion.opts.get("id"))
else:
for proxy_minion in self.deltaproxy_objs:
_proxy_minion = subproxy_tune_in(self.deltaproxy_objs[proxy_minion])
log.debug("Tune in for sub proxy %r finished", _proxy_minion.opts.get("id"))
super(ProxyMinion, self).tune_in(start=start)

View file

@ -116,10 +116,16 @@ def test_exit_status_unknown_argument(salt_master, proxy_minion_id):
# Hangs on Windows. You can add a timeout to the proxy.run command, but then
# it just times out.
@pytest.mark.skip_on_windows(reason=PRE_PYTEST_SKIP_REASON)
@pytest.mark.parametrize(
"parallel_startup",
[True, False],
ids=["parallel_startup=True", "parallel_startup=False"],
)
def test_exit_status_correct_usage(
salt_master,
salt_cli,
proxy_minion_id,
parallel_startup,
):
"""
Ensure the salt-proxy control proxy starts and
@ -153,11 +159,12 @@ def test_exit_status_correct_usage(
controlproxy_pillar_file = """
proxy:
proxytype: deltaproxy
parallel_startup: {}
ids:
- {}
- {}
""".format(
proxy_one, proxy_two
parallel_startup, proxy_one, proxy_two
)
dummy_proxy_one_pillar_file = """
@ -227,10 +234,16 @@ def test_exit_status_correct_usage(
# Hangs on Windows. You can add a timeout to the proxy.run command, but then
# it just times out.
@pytest.mark.skip_on_windows(reason=PRE_PYTEST_SKIP_REASON)
@pytest.mark.parametrize(
"parallel_startup",
[True, False],
ids=["parallel_startup=True", "parallel_startup=False"],
)
def test_missing_pillar_file(
salt_master,
salt_cli,
proxy_minion_id,
parallel_startup,
):
"""
Ensure that the control proxy minion starts up when
@ -258,11 +271,12 @@ def test_missing_pillar_file(
controlproxy_pillar_file = """
proxy:
proxytype: deltaproxy
parallel_startup: {}
ids:
- {}
- {}
""".format(
proxy_one, proxy_two
parallel_startup, proxy_one, proxy_two
)
dummy_proxy_one_pillar_file = """
@ -318,10 +332,16 @@ def test_missing_pillar_file(
# Hangs on Windows. You can add a timeout to the proxy.run command, but then
# it just times out.
@pytest.mark.skip_on_windows(reason=PRE_PYTEST_SKIP_REASON)
@pytest.mark.parametrize(
"parallel_startup",
[True, False],
ids=["parallel_startup=True", "parallel_startup=False"],
)
def test_invalid_connection(
salt_master,
salt_cli,
proxy_minion_id,
parallel_startup,
):
"""
Ensure that the control proxy minion starts up when
@ -356,12 +376,13 @@ def test_invalid_connection(
controlproxy_pillar_file = """
proxy:
proxytype: deltaproxy
parallel_startup: {}
ids:
- {}
- {}
- {}
""".format(
broken_proxy_one, broken_proxy_two, proxy_one
parallel_startup, broken_proxy_one, broken_proxy_two, proxy_one
)
dummy_proxy_one_pillar_file = """

View file

@ -12,7 +12,16 @@ def salt_proxy(salt_master, salt_proxy_factory):
@pytest.fixture(scope="module")
def deltaproxy_pillar_tree(salt_master, salt_delta_proxy_factory):
def deltaproxy_parallel_startup():
yield from [True, False]
@pytest.fixture(
scope="module",
params=[True, False],
ids=["parallel_startup=True", "parallel_startup=False"],
)
def deltaproxy_pillar_tree(request, salt_master, salt_delta_proxy_factory):
"""
Create the pillar files for controlproxy and two dummy proxy minions
"""
@ -45,12 +54,14 @@ def deltaproxy_pillar_tree(salt_master, salt_delta_proxy_factory):
controlproxy_pillar_file = """
proxy:
proxytype: deltaproxy
parallel_startup: {}
ids:
- {}
- {}
- {}
- {}
""".format(
request.param,
proxy_one,
proxy_two,
proxy_three,