Merge pull request #48295 from rallytime/bp-48193

Back-port #48193 to 2017.7
This commit is contained in:
Nicole Thomas 2018-06-26 19:42:17 -04:00 committed by GitHub
commit 21ed5b97ce
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 69 additions and 59 deletions

View file

@ -923,9 +923,11 @@ class SaltAPIHandler(BaseSaltAPIHandler): # pylint: disable=W0223
# Generate jid before triggering a job to subscribe all returns from minions
chunk['jid'] = salt.utils.jid.gen_jid()
# Subscribe returns from minions before firing a job
minions = set(self.ckminions.check_minions(chunk['tgt'], chunk.get('tgt_type', 'glob')))
future_minion_map = self.subscribe_minion_returns(chunk['jid'], minions)
# start listening for the event before we fire the job to avoid races
events = [
self.application.event_listener.get_event(self, tag='salt/job/'+chunk['jid']),
self.application.event_listener.get_event(self, tag='syndic/job/'+chunk['jid']),
]
f_call = self._format_call_run_job_async(chunk)
# fire a job off
@ -937,88 +939,92 @@ class SaltAPIHandler(BaseSaltAPIHandler): # pylint: disable=W0223
# if the job didn't publish, lets not wait around for nothing
# TODO: set header??
if 'jid' not in pub_data:
for future in future_minion_map:
for future in events:
try:
future.set_result(None)
except Exception:
pass
raise tornado.gen.Return('No minions matched the target. No command was sent, no jid was assigned.')
# Map of minion_id -> returned for all minions we think we need to wait on
minions = {m: False for m in pub_data['minions']}
# minimum time required for return to complete. By default no waiting, if
# we are a syndic then we must wait syndic_wait at a minimum
min_wait_time = Future()
min_wait_time.set_result(True)
# wait syndic a while to avoid missing published events
if self.application.opts['order_masters']:
yield tornado.gen.sleep(self.application.opts['syndic_wait'])
min_wait_time = tornado.gen.sleep(self.application.opts['syndic_wait'])
# To ensure job_not_running and all_return are terminated by each other, communicate using a future
is_finished = Future()
is_finished = tornado.gen.sleep(self.application.opts['gather_job_timeout'])
job_not_running_future = self.job_not_running(pub_data['jid'],
# ping until the job is not running, while doing so, if we see new minions returning
# that they are running the job, add them to the list
tornado.ioloop.IOLoop.current().spawn_callback(self.job_not_running, pub_data['jid'],
chunk['tgt'],
f_call['kwargs']['tgt_type'],
minions,
is_finished)
minion_returns_future = self.sanitize_minion_returns(future_minion_map, pub_data['minions'], is_finished)
yield job_not_running_future
raise tornado.gen.Return((yield minion_returns_future))
def subscribe_minion_returns(self, jid, minions):
    '''
    Register one return-event subscription per minion for job ``jid``.

    For every entry in ``minions`` a tag is built with
    ``tagify([jid, 'ret', minion], 'job')`` and handed to the application's
    event listener using ``EventListener.exact_matcher``.

    Returns a dict keyed by the object ``get_event`` returns for each
    subscription (callers treat these as futures) with the corresponding
    minion as the value.
    '''
    # The subscription object is the key and the minion is the value, so a
    # completed subscription can be looked up to find which minion it was for.
    return {
        self.application.event_listener.get_event(
            self,
            tag=tagify([jid, 'ret', minion_id], 'job'),
            matcher=EventListener.exact_matcher
        ): minion_id
        for minion_id in minions
    }
@tornado.gen.coroutine
def sanitize_minion_returns(self, future_minion_map, minions, is_finished):
'''
Return a future which will complete once all returns are completed
(according to minions), or one of the passed in "finish_chunk_ret_future" completes
'''
if minions is None:
minions = []
# Remove redundant minions
redundant_minion_futures = [future for future in future_minion_map.keys() if future_minion_map[future] not in minions]
for redundant_minion_future in redundant_minion_futures:
try:
redundant_minion_future.set_result(None)
except Exception:
pass
del future_minion_map[redundant_minion_future]
def more_todo():
'''Check if there are any more minions we are waiting on returns from
'''
return any(x is False for x in six.itervalues(minions))
# here we want to follow the behavior of LocalClient.get_iter_returns
# namely we want to wait at least syndic_wait (assuming we are a syndic)
# and that there are no more jobs running on minions. We are allowed to exit
# early if gather_job_timeout has been exceeded
chunk_ret = {}
while True:
f = yield Any(list(future_minion_map.keys()) + [is_finished])
to_wait = events+[is_finished]
if not min_wait_time.done():
to_wait += [min_wait_time]
def cancel_inflight_futures():
for event in to_wait:
if not event.done():
event.set_result(None)
f = yield Any(to_wait)
try:
# When finished entire routine, cleanup other futures and return result
if f is is_finished:
for event in future_minion_map.keys():
if not event.done():
event.set_result(None)
cancel_inflight_futures()
raise tornado.gen.Return(chunk_ret)
elif f is min_wait_time:
if not more_todo():
cancel_inflight_futures()
raise tornado.gen.Return(chunk_ret)
continue
f_result = f.result()
chunk_ret[f_result['data']['id']] = f_result['data']['return']
# if this is a start, then we need to add it to the pile
if f_result['tag'].endswith('/new'):
for minion_id in f_result['data']['minions']:
if minion_id not in minions:
minions[minion_id] = False
else:
chunk_ret[f_result['data']['id']] = f_result['data']['return']
# clear finished event future
minions[f_result['data']['id']] = True
# if there are no more minions to wait for, then we are done
if not more_todo() and min_wait_time.done():
cancel_inflight_futures()
raise tornado.gen.Return(chunk_ret)
except TimeoutException:
pass
# clear finished event future
try:
minions.remove(future_minion_map[f])
del future_minion_map[f]
except ValueError:
pass
if not minions:
if not is_finished.done():
is_finished.set_result(True)
raise tornado.gen.Return(chunk_ret)
if f == events[0]:
events[0] = self.application.event_listener.get_event(self, tag='salt/job/'+chunk['jid'])
else:
events[1] = self.application.event_listener.get_event(self, tag='syndic/job/'+chunk['jid'])
@tornado.gen.coroutine
def job_not_running(self, jid, tgt, tgt_type, is_finished):
def job_not_running(self, jid, tgt, tgt_type, minions, is_finished):
'''
Return a future which will complete once jid (passed in) is no longer
running on tgt
@ -1044,8 +1050,6 @@ class SaltAPIHandler(BaseSaltAPIHandler): # pylint: disable=W0223
event = f.result()
except TimeoutException:
if not minion_running:
if not is_finished.done():
is_finished.set_result(True)
raise tornado.gen.Return(True)
else:
ping_pub_data = yield self.saltclients['local'](tgt,
@ -1059,6 +1063,8 @@ class SaltAPIHandler(BaseSaltAPIHandler): # pylint: disable=W0223
# Minions can return, we want to see if the job is running...
if event['data'].get('return', {}) == {}:
continue
if event['data']['id'] not in minions:
minions[event['data']['id']] = False
minion_running = True
@tornado.gen.coroutine

View file

@ -266,6 +266,10 @@ class TestSaltAPIHandler(_SaltnadoIntegrationTestCase):
'tgt': '*',
'fun': 'test.ping',
}
self.application.opts['order_masters'] = True
self.application.opts['syndic_wait'] = 5
response = self.fetch('/',
method='POST',
body=salt.utils.json.dumps(low),