Mirror of https://github.com/saltstack/salt.git
Merge pull request #46326 from kstreee/fix-client-local
Fixes a timing bug of saltnado's client local.
Commit 1700a10ebe
1 changed file (salt/netapi/rest_tornado/saltnado.py) with 50 additions and 50 deletions
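
The timing bug: _disbatch_local used to publish the job first and only then register a future for each minion's salt/job/<jid>/ret/<minion> event, so a return that arrived before the subscription existed could be missed, leaving the request to wait out the timeout instead of reporting that minion's data. The hunks below pre-generate the jid and subscribe to the return events before publishing. A minimal, self-contained sketch of that subscribe-before-publish ordering (plain Tornado with hypothetical names such as Bus and run_job; this is not Salt's API):

    import uuid

    import tornado.gen
    import tornado.ioloop
    from tornado.concurrent import Future


    class Bus(object):
        '''Toy in-memory event bus: futures are registered per tag and resolved on fire().'''
        def __init__(self):
            self.subscriptions = {}      # tag -> list of waiting Futures

        def get_event(self, tag):
            future = Future()
            self.subscriptions.setdefault(tag, []).append(future)
            return future

        def fire(self, tag, data):
            for future in self.subscriptions.pop(tag, []):
                if not future.done():
                    future.set_result(data)


    @tornado.gen.coroutine
    def run_job(bus, minions, publish):
        # 1. Generate the job id up front so the return tags are known in advance...
        jid = uuid.uuid4().hex
        # 2. ...and subscribe to every minion's return event *before* publishing.
        waiters = dict((bus.get_event('job/{0}/ret/{1}'.format(jid, m)), m) for m in minions)
        # 3. Only now fire the job; returns that arrive immediately already have a waiting future.
        publish(bus, jid, minions)
        results = {}
        for future, minion in waiters.items():
            results[minion] = yield future
        raise tornado.gen.Return(results)


    def instant_publish(bus, jid, minions):
        # Simulates minions answering at publish time, the case the old
        # fire-then-subscribe ordering could lose.
        for minion in minions:
            bus.fire('job/{0}/ret/{1}'.format(jid, minion), {'id': minion, 'return': True})


    if __name__ == '__main__':
        result = tornado.ioloop.IOLoop.current().run_sync(
            lambda: run_job(Bus(), ['minion1', 'minion2'], instant_publish))
        print(result)

With the old ordering (publish, then get_event) the two instant returns would find no registered future and run_job would block; subscribing first makes the relative timing of publish and return irrelevant.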

@@ -371,7 +371,7 @@ class EventListener(object):
         for (tag, matcher), futures in six.iteritems(self.tag_map):
             try:
                 is_matched = matcher(mtag, tag)
-            except Exception as e:
+            except Exception:
                 logger.error('Failed to run a matcher.', exc_info=True)
                 is_matched = False
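
Context for the hunk above: EventListener dispatches every incoming event tag against the matcher stored with each subscription, so a matcher that raises has to be caught and logged rather than killing the receive loop. The matchers themselves are small predicates; a plausible shape is shown below purely as an illustration (EventListener.exact_matcher is referenced later in this diff; prefix_matcher is shown for contrast, and both bodies here are a sketch, not copied from saltnado):

    # Illustrative tag predicates in the spirit of EventListener's matchers.
    def exact_matcher(mtag, tag):
        # Subscription fires only for exactly this tag.
        return mtag == tag

    def prefix_matcher(mtag, tag):
        # Subscription fires for the tag and everything underneath it.
        return mtag.startswith(tag)

    assert exact_matcher('salt/job/123/ret/web1', 'salt/job/123/ret/web1')
    assert prefix_matcher('salt/job/123/ret/web1', 'salt/job/123')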

@@ -899,7 +899,7 @@ class SaltAPIHandler(BaseSaltAPIHandler):  # pylint: disable=W0223
             try:
                 chunk_ret = yield getattr(self, '_disbatch_{0}'.format(low['client']))(low)
                 ret.append(chunk_ret)
-            except EauthAuthenticationError as exc:
+            except EauthAuthenticationError:
                 ret.append('Failed to authenticate')
                 break
             except Exception as ex:

@@ -914,7 +914,11 @@ class SaltAPIHandler(BaseSaltAPIHandler):  # pylint: disable=W0223
         '''
         Dispatch local client commands
         '''
         chunk_ret = {}
+        # Generate jid before triggering a job to subscribe all returns from minions
+        chunk['jid'] = salt.utils.jid.gen_jid()
+
+        # Subscribe returns from minions before firing a job
+        future_minion_map = self.subscribe_minion_returns(chunk['jid'], chunk['tgt'])
 
         f_call = self._format_call_run_job_async(chunk)
         # fire a job off
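
The jid has to exist before the publish because the return-event tags that subscribe_minion_returns() listens on are derived from it. A short sketch of the tag shape, assuming a Salt checkout of this era is importable (the no-argument gen_jid() call matches the diff; later releases added an opts parameter):

    import salt.utils.jid
    from salt.utils.event import tagify

    jid = salt.utils.jid.gen_jid()            # e.g. '20180226121145123456'
    for minion in ('web1', 'web2'):           # hypothetical minion ids
        # Same tag shape the handler subscribes to: salt/job/<jid>/ret/<minion>
        print(tagify([jid, 'ret', minion], 'job'))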

@@ -926,64 +930,71 @@ class SaltAPIHandler(BaseSaltAPIHandler):  # pylint: disable=W0223
         # if the job didn't publish, lets not wait around for nothing
         # TODO: set header??
         if 'jid' not in pub_data:
+            for future in future_minion_map:
+                try:
+                    future.set_result(None)
+                except Exception:
+                    pass
             raise tornado.gen.Return('No minions matched the target. No command was sent, no jid was assigned.')
 
-        # seed minions_remaining with the pub_data
-        minions_remaining = pub_data['minions']
-
         syndic_min_wait = None
         if self.application.opts['order_masters']:
             syndic_min_wait = tornado.gen.sleep(self.application.opts['syndic_wait'])
 
         # To ensure job_not_running and all_return are terminated by each other, communicate using a future
         is_finished = Future()
 
         job_not_running_future = self.job_not_running(pub_data['jid'],
                                                       chunk['tgt'],
                                                       f_call['kwargs']['tgt_type'],
-                                                      is_finished,
-                                                      minions_remaining=list(minions_remaining),
-                                                      )
+                                                      is_finished)
 
         # if we have a min_wait, do that
         if syndic_min_wait is not None:
             yield syndic_min_wait
 
-        all_return_future = self.all_returns(pub_data['jid'],
-                                             is_finished,
-                                             minions_remaining=list(minions_remaining),
-                                             )
+        minion_returns_future = self.sanitize_minion_returns(future_minion_map, pub_data['minions'], is_finished)
 
         yield job_not_running_future
-        raise tornado.gen.Return((yield all_return_future))
+        raise tornado.gen.Return((yield minion_returns_future))
+
+    def subscribe_minion_returns(self, jid, minions):
+        # Subscribe each minion event
+        future_minion_map = {}
+        for minion in minions:
+            tag = tagify([jid, 'ret', minion], 'job')
+            minion_future = self.application.event_listener.get_event(self,
+                                                                      tag=tag,
+                                                                      matcher=EventListener.exact_matcher,
+                                                                      timeout=self.application.opts['timeout'])
+            future_minion_map[minion_future] = minion
+        return future_minion_map
 
     @tornado.gen.coroutine
-    def all_returns(self,
-                    jid,
-                    is_finished,
-                    minions_remaining=None,
-                    ):
+    def sanitize_minion_returns(self, future_minion_map, minions, is_finished):
         '''
         Return a future which will complete once all returns are completed
-        (according to minions_remaining), or one of the passed in "is_finished" completes
+        (according to minions), or one of the passed in "finish_chunk_ret_future" completes
         '''
-        if minions_remaining is None:
-            minions_remaining = []
+        if minions is None:
+            minions = []
+
+        # Remove redundant minions
+        redundant_minion_futures = [future for future in future_minion_map.keys() if future_minion_map[future] not in minions]
+        for redundant_minion_future in redundant_minion_futures:
+            try:
+                redundant_minion_future.set_result(None)
+            except Exception:
+                pass
+            del future_minion_map[redundant_minion_future]
 
         chunk_ret = {}
-
-        minion_events = {}
-        for minion in minions_remaining:
-            tag = tagify([jid, 'ret', minion], 'job')
-            minion_event = self.application.event_listener.get_event(self,
-                                                                     tag=tag,
-                                                                     matcher=EventListener.exact_matcher,
-                                                                     timeout=self.application.opts['timeout'])
-            minion_events[minion_event] = minion
-
         while True:
-            f = yield Any(minion_events.keys() + [is_finished])
+            f = yield Any(future_minion_map.keys() + [is_finished])
             try:
                 # When finished entire routine, cleanup other futures and return result
                 if f is is_finished:
-                    for event in minion_events:
+                    for event in future_minion_map.keys():
                         if not event.done():
                             event.set_result(None)
                     raise tornado.gen.Return(chunk_ret)
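
The is_finished future above is how job_not_running and the return collector stop each other: whichever coroutine finishes first can resolve the shared future, and the other sees it come back from Any() and cleans up its own pending futures. A stripped-down sketch of that handshake (plain Tornado; any_future below is a local stand-in written for this illustration, not saltnado's Any helper):

    import tornado.gen
    import tornado.ioloop
    from tornado.concurrent import Future


    def any_future(futures):
        # Resolves with whichever of `futures` completes first.
        result = Future()

        def on_done(finished):
            if not result.done():
                result.set_result(finished)

        for future in futures:
            future.add_done_callback(on_done)
        return result


    @tornado.gen.coroutine
    def watcher(name, work, is_finished):
        # Wait for our own work or for the peer to flag completion, whichever is
        # first, and make sure the shared flag is set for the peer before returning.
        done = yield any_future([work, is_finished])
        if done is is_finished:
            raise tornado.gen.Return('{0}: stopped by peer'.format(name))
        if not is_finished.done():
            is_finished.set_result(True)
        raise tornado.gen.Return('{0}: finished first'.format(name))


    @tornado.gen.coroutine
    def main():
        is_finished = Future()
        fast, slow = Future(), Future()
        tornado.ioloop.IOLoop.current().call_later(0.01, lambda: fast.set_result('ok'))
        # `slow` never resolves on its own; only the peer's is_finished unblocks its watcher.
        results = yield [watcher('returns', fast, is_finished),
                         watcher('job_not_running', slow, is_finished)]
        raise tornado.gen.Return(results)


    if __name__ == '__main__':
        print(tornado.ioloop.IOLoop.current().run_sync(main))

Resolving a shared future instead of cancelling the peer outright lets each coroutine run its own cleanup, such as setting results on leftover minion futures, before it returns.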

@@ -994,31 +1005,22 @@ class SaltAPIHandler(BaseSaltAPIHandler):  # pylint: disable=W0223
 
             # clear finished event future
             try:
-                minions_remaining.remove(minion_events[f])
-                del minion_events[f]
+                minions.remove(future_minion_map[f])
+                del future_minion_map[f]
             except ValueError:
                 pass
 
-            if len(minions_remaining) == 0:
+            if not minions:
                 if not is_finished.done():
                     is_finished.set_result(True)
                 raise tornado.gen.Return(chunk_ret)
 
     @tornado.gen.coroutine
-    def job_not_running(self,
-                        jid,
-                        tgt,
-                        tgt_type,
-                        is_finished,
-                        minions_remaining=None,
-                        ):
+    def job_not_running(self, jid, tgt, tgt_type, is_finished):
         '''
         Return a future which will complete once jid (passed in) is no longer
        running on tgt
         '''
-        if minions_remaining is None:
-            minions_remaining = []
-
         ping_pub_data = yield self.saltclients['local'](tgt,
                                                         'saltutil.find_job',
                                                         [jid],

@@ -1052,13 +1054,11 @@ class SaltAPIHandler(BaseSaltAPIHandler):  # pylint: disable=W0223
                     ping_tag = tagify([ping_pub_data['jid'], 'ret'], 'job')
                     minion_running = False
                     continue
 
             # Minions can return, we want to see if the job is running...
             if event['data'].get('return', {}) == {}:
                 continue
             minion_running = True
-            id_ = event['data']['id']
-            if id_ not in minions_remaining:
-                minions_remaining.append(event['data']['id'])
 
     @tornado.gen.coroutine
     def _disbatch_local_async(self, chunk):
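
For completeness, one way to exercise the patched path end to end is to POST a client: local lowstate to a running rest_tornado instance; fast-returning functions such as test.ping are exactly the case the old fire-then-subscribe ordering could miss. Host, credentials, and the eauth backend below are placeholders, not values from this commit:

    import requests

    API = 'https://salt-api.example.com:8000'   # hypothetical endpoint

    # Authenticate against salt-api to obtain a token.
    login = requests.post(API + '/login',
                          json={'username': 'saltdev', 'password': 'saltdev', 'eauth': 'pam'},
                          verify=False)
    token = login.json()['return'][0]['token']

    # A lowstate with client 'local' is routed to _disbatch_local, which now
    # subscribes to each minion's return event before publishing the job.
    resp = requests.post(API + '/',
                         headers={'X-Auth-Token': token},
                         json=[{'client': 'local', 'tgt': '*', 'fun': 'test.ping'}],
                         verify=False)
    print(resp.json())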