Mirror of https://github.com/saltstack/salt.git
Merge pull request #48193 from jacksontj/issue_42659

Properly wait on returns in saltnado

Commit: 08735c9cd7
2 changed files with 65 additions and 60 deletions
@@ -944,9 +944,11 @@ class SaltAPIHandler(BaseSaltAPIHandler): # pylint: disable=W0223
         # Generate jid before triggering a job to subscribe all returns from minions
         chunk['jid'] = salt.utils.jid.gen_jid(self.application.opts)

-        # Subscribe returns from minions before firing a job
-        minions = self.ckminions.check_minions(chunk['tgt'], chunk.get('tgt_type', 'glob')).get('minions', list())
-        future_minion_map = self.subscribe_minion_returns(chunk['jid'], minions)
+        # start listening for the event before we fire the job to avoid races
+        events = [
+            self.application.event_listener.get_event(self, tag='salt/job/'+chunk['jid']),
+            self.application.event_listener.get_event(self, tag='syndic/job/'+chunk['jid']),
+        ]

         f_call = self._format_call_run_job_async(chunk)
         # fire a job off
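The hunk above registers the salt/job and syndic/job event futures before the job is published, so a return that arrives immediately after publish cannot be missed. Below is a minimal, self-contained sketch of why that ordering matters; it is not Salt code, and EventBus, run_job, and the jid value are invented stand-ins for saltnado's EventListener and handler.

# Toy illustration of the subscribe-before-publish ordering (not Salt code).
# EventBus stands in for saltnado's EventListener; the tag and jid are made up.
import tornado.gen
import tornado.ioloop
from tornado.concurrent import Future


class EventBus(object):
    def __init__(self):
        self.waiters = {}  # tag -> list of pending Futures

    def get_event(self, tag):
        '''Return a Future that resolves when an event with this tag fires.'''
        future = Future()
        self.waiters.setdefault(tag, []).append(future)
        return future

    def fire(self, tag, data):
        '''Resolve every Future that subscribed to this tag before it fired.'''
        for future in self.waiters.pop(tag, []):
            if not future.done():
                future.set_result(data)


@tornado.gen.coroutine
def run_job(bus, jid):
    # Subscribe first, so a return that arrives immediately is not lost ...
    ret_future = bus.get_event('salt/job/' + jid)
    # ... then "publish" the job; here the return comes back instantly.
    bus.fire('salt/job/' + jid, {'id': 'minion1', 'return': True})
    result = yield ret_future
    raise tornado.gen.Return(result)


if __name__ == '__main__':
    print(tornado.ioloop.IOLoop.current().run_sync(
        lambda: run_job(EventBus(), '20180620000000000000')))

Reversing the two calls in run_job (fire first, subscribe second) leaves ret_future unresolved forever, which is the race the comment in the hunk refers to.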
@@ -955,88 +957,91 @@ class SaltAPIHandler(BaseSaltAPIHandler): # pylint: disable=W0223
         # if the job didn't publish, lets not wait around for nothing
         # TODO: set header??
         if 'jid' not in pub_data:
-            for future in future_minion_map:
+            for future in events:
                 try:
                     future.set_result(None)
                 except Exception:
                     pass
             raise tornado.gen.Return('No minions matched the target. No command was sent, no jid was assigned.')

         # Map of minion_id -> returned for all minions we think we need to wait on
         minions = {m: False for m in pub_data['minions']}

+        # minimum time required for return to complete. By default no waiting, if
+        # we are a syndic then we must wait syndic_wait at a minimum
+        min_wait_time = Future()
+        min_wait_time.set_result(True)
+
         # wait syndic a while to avoid missing published events
         if self.application.opts['order_masters']:
-            yield tornado.gen.sleep(self.application.opts['syndic_wait'])
+            min_wait_time = tornado.gen.sleep(self.application.opts['syndic_wait'])

         # To ensure job_not_running and all_return are terminated by each other, communicate using a future
-        is_finished = Future()
+        is_finished = tornado.gen.sleep(self.application.opts['gather_job_timeout'])

-        job_not_running_future = self.job_not_running(pub_data['jid'],
+        # ping until the job is not running, while doing so, if we see new minions returning
+        # that they are running the job, add them to the list
+        tornado.ioloop.IOLoop.current().spawn_callback(self.job_not_running, pub_data['jid'],
                                                        chunk['tgt'],
                                                        f_call['kwargs']['tgt_type'],
+                                                       minions,
                                                        is_finished)

-        minion_returns_future = self.sanitize_minion_returns(future_minion_map, pub_data['minions'], is_finished)
-
-        yield job_not_running_future
-        raise tornado.gen.Return((yield minion_returns_future))
-
-    def subscribe_minion_returns(self, jid, minions):
-        # Subscribe each minion event
-        future_minion_map = {}
-        for minion in minions:
-            tag = tagify([jid, 'ret', minion], 'job')
-            minion_future = self.application.event_listener.get_event(self,
-                                                                      tag=tag,
-                                                                      matcher=EventListener.exact_matcher)
-            future_minion_map[minion_future] = minion
-        return future_minion_map
-
-    @tornado.gen.coroutine
-    def sanitize_minion_returns(self, future_minion_map, minions, is_finished):
-        '''
-        Return a future which will complete once all returns are completed
-        (according to minions), or one of the passed in "finish_chunk_ret_future" completes
-        '''
-        if minions is None:
-            minions = []
-
-        # Remove redundant minions
-        redundant_minion_futures = [future for future in future_minion_map.keys() if future_minion_map[future] not in minions]
-        for redundant_minion_future in redundant_minion_futures:
-            try:
-                redundant_minion_future.set_result(None)
-            except Exception:
-                pass
-            del future_minion_map[redundant_minion_future]
+        def more_todo():
+            '''Check if there are any more minions we are waiting on returns from
+            '''
+            return any(x is False for x in six.itervalues(minions))

+        # here we want to follow the behavior of LocalClient.get_iter_returns
+        # namely we want to wait at least syndic_wait (assuming we are a syndic)
+        # and that there are no more jobs running on minions. We are allowed to exit
+        # early if gather_job_timeout has been exceeded
         chunk_ret = {}
         while True:
-            f = yield Any(list(future_minion_map.keys()) + [is_finished])
+            to_wait = events+[is_finished]
+            if not min_wait_time.done():
+                to_wait += [min_wait_time]
+
+            def cancel_inflight_futures():
+                for event in to_wait:
+                    if not event.done():
+                        event.set_result(None)
+            f = yield Any(to_wait)
             try:
                 # When finished entire routine, cleanup other futures and return result
                 if f is is_finished:
-                    for event in future_minion_map.keys():
-                        if not event.done():
-                            event.set_result(None)
+                    cancel_inflight_futures()
                     raise tornado.gen.Return(chunk_ret)
+                elif f is min_wait_time:
+                    if not more_todo():
+                        cancel_inflight_futures()
+                        raise tornado.gen.Return(chunk_ret)
+                    continue
                 f_result = f.result()
-                chunk_ret[f_result['data']['id']] = f_result['data']['return']
+                # if this is a start, then we need to add it to the pile
+                if f_result['tag'].endswith('/new'):
+                    for minion_id in f_result['data']['minions']:
+                        if minion_id not in minions:
+                            minions[minion_id] = False
+                else:
+                    chunk_ret[f_result['data']['id']] = f_result['data']['return']
+                    # clear finished event future
+                    minions[f_result['data']['id']] = True
+
+                    # if there are no more minions to wait for, then we are done
+                    if not more_todo() and min_wait_time.done():
+                        cancel_inflight_futures()
+                        raise tornado.gen.Return(chunk_ret)

             except TimeoutException:
                 pass

-            # clear finished event future
-            try:
-                minions.remove(future_minion_map[f])
-                del future_minion_map[f]
-            except ValueError:
-                pass
-
-            if not minions:
-                if not is_finished.done():
-                    is_finished.set_result(True)
-                raise tornado.gen.Return(chunk_ret)
+            if f == events[0]:
+                events[0] = self.application.event_listener.get_event(self, tag='salt/job/'+chunk['jid'])
+            else:
+                events[1] = self.application.event_listener.get_event(self, tag='syndic/job/'+chunk['jid'])

     @tornado.gen.coroutine
-    def job_not_running(self, jid, tgt, tgt_type, is_finished):
+    def job_not_running(self, jid, tgt, tgt_type, minions, is_finished):
         '''
         Return a future which will complete once jid (passed in) is no longer
         running on tgt
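The rewritten loop above races the two job-event futures against a gather_job_timeout sleep (is_finished) and, on syndics, a syndic_wait sleep (min_wait_time), then acts on whichever future resolves first. The sketch below shows that pattern in isolation, assuming an Any helper modeled on the one saltnado defines; fake_return, gather, and the delays are hypothetical and not part of the PR.

# Minimal sketch of racing return futures against a deadline (not Salt code).
# Any is modeled on saltnado's helper of the same name; fake_return and the
# delays are invented for the example.
import tornado.gen
import tornado.ioloop
from tornado.concurrent import Future


class Any(Future):
    '''Future that resolves to the first of the wrapped futures to finish.'''
    def __init__(self, futures):
        super(Any, self).__init__()
        for future in futures:
            future.add_done_callback(self.done_callback)

    def done_callback(self, future):
        if not self.done():
            self.set_result(future)


@tornado.gen.coroutine
def fake_return(minion_id, delay):
    '''Stand-in for an event-listener future carrying one minion's return.'''
    yield tornado.gen.sleep(delay)
    raise tornado.gen.Return({'id': minion_id, 'return': True})


@tornado.gen.coroutine
def gather(expected, gather_job_timeout=1.0):
    '''Collect returns until all expected minions answered or the deadline hits.'''
    chunk_ret = {}
    pending = {fake_return(m, d): m for m, d in expected.items()}
    is_finished = tornado.gen.sleep(gather_job_timeout)  # overall deadline
    while pending:
        ready = yield Any(list(pending) + [is_finished])
        if ready is is_finished:
            break  # deadline reached, return whatever arrived so far
        minion_id = pending.pop(ready)
        chunk_ret[minion_id] = ready.result()['return']
    raise tornado.gen.Return(chunk_ret)


if __name__ == '__main__':
    result = tornado.ioloop.IOLoop.current().run_sync(
        lambda: gather({'minion1': 0.1, 'minion2': 5.0}))
    print(result)  # only minion1 makes it before the one-second deadline

Run directly, the sketch prints {'minion1': True}: the second minion's return would take five seconds, so the one-second deadline wins and the caller gets the partial result instead of blocking indefinitely.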
@@ -1062,8 +1067,6 @@ class SaltAPIHandler(BaseSaltAPIHandler): # pylint: disable=W0223
                 event = f.result()
             except TimeoutException:
                 if not minion_running:
-                    if not is_finished.done():
-                        is_finished.set_result(True)
                     raise tornado.gen.Return(True)
                 else:
                     ping_pub_data = yield self.saltclients['local'](tgt,
@@ -1077,6 +1080,8 @@ class SaltAPIHandler(BaseSaltAPIHandler): # pylint: disable=W0223
             # Minions can return, we want to see if the job is running...
             if event['data'].get('return', {}) == {}:
                 continue
+            if event['data']['id'] not in minions:
+                minions[event['data']['id']] = False
             minion_running = True

     @tornado.gen.coroutine
@@ -264,7 +264,7 @@ class TestSaltAPIHandler(_SaltnadoIntegrationTestCase):
                'fun': 'test.ping',
                }

-        self.application.opts['order_masters'] = ['']
+        self.application.opts['order_masters'] = True
         self.application.opts['syndic_wait'] = 5

         response = self.fetch('/',
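For context, the integration test above posts a 'local' client lowstate to the handler and checks the aggregated returns. A hedged sketch of an equivalent request against a running rest_tornado instance follows; the URL, port, and token value are placeholders, not taken from the test.

# Hedged example of the kind of request TestSaltAPIHandler exercises
# (not part of the PR). URL, port, and token value are placeholders.
import json
import tornado.httpclient


def ping_all_minions(token, url='http://localhost:8000/'):
    '''POST a single 'local' lowstate and return the decoded job returns.'''
    body = json.dumps([{'client': 'local', 'tgt': '*', 'fun': 'test.ping'}])
    request = tornado.httpclient.HTTPRequest(
        url,
        method='POST',
        body=body,
        headers={'Content-Type': 'application/json', 'X-Auth-Token': token},
        request_timeout=60,
    )
    response = tornado.httpclient.HTTPClient().fetch(request)
    return json.loads(response.body.decode('utf-8'))['return']


if __name__ == '__main__':
    # Token acquisition (for example via a login endpoint) is out of scope here.
    print(ping_all_minions('<auth token>'))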