Fixes a timing bug in saltnado's local client.

This commit is contained in:
kstreee 2018-03-05 07:48:04 +00:00 committed by kstreee@gmail.com
parent 83ed40c06a
commit 0f358a9c9e
No known key found for this signature in database
GPG key ID: 08BD0BFFDE2A9996

View file

@ -371,7 +371,7 @@ class EventListener(object):
for (tag, matcher), futures in six.iteritems(self.tag_map):
try:
is_matched = matcher(mtag, tag)
except Exception as e:
except Exception:
logger.error('Failed to run a matcher.', exc_info=True)
is_matched = False
@ -899,7 +899,7 @@ class SaltAPIHandler(BaseSaltAPIHandler): # pylint: disable=W0223
try:
chunk_ret = yield getattr(self, '_disbatch_{0}'.format(low['client']))(low)
ret.append(chunk_ret)
except EauthAuthenticationError as exc:
except EauthAuthenticationError:
ret.append('Failed to authenticate')
break
except Exception as ex:
@ -914,7 +914,11 @@ class SaltAPIHandler(BaseSaltAPIHandler): # pylint: disable=W0223
'''
Dispatch local client commands
'''
chunk_ret = {}
# Generate jid before triggering a job to subscribe all returns from minions
chunk['jid'] = salt.utils.jid.gen_jid()
# Subscribe returns from minions before firing a job
future_minion_map = self.subscribe_minion_returns(chunk['jid'], chunk['tgt'])
f_call = self._format_call_run_job_async(chunk)
# fire a job off
@ -926,64 +930,71 @@ class SaltAPIHandler(BaseSaltAPIHandler): # pylint: disable=W0223
# if the job didn't publish, lets not wait around for nothing
# TODO: set header??
if 'jid' not in pub_data:
for future in future_minion_map:
try:
future.set_result(None)
except Exception:
pass
raise tornado.gen.Return('No minions matched the target. No command was sent, no jid was assigned.')
# seed minions_remaining with the pub_data
minions_remaining = pub_data['minions']
syndic_min_wait = None
if self.application.opts['order_masters']:
syndic_min_wait = tornado.gen.sleep(self.application.opts['syndic_wait'])
# To ensure job_not_running and all_return are terminated by each other, communicate using a future
is_finished = Future()
job_not_running_future = self.job_not_running(pub_data['jid'],
chunk['tgt'],
f_call['kwargs']['tgt_type'],
is_finished,
minions_remaining=list(minions_remaining),
)
is_finished)
# if we have a min_wait, do that
if syndic_min_wait is not None:
yield syndic_min_wait
all_return_future = self.all_returns(pub_data['jid'],
is_finished,
minions_remaining=list(minions_remaining),
)
minion_returns_future = self.sanitize_minion_returns(future_minion_map, pub_data['minions'], is_finished)
yield job_not_running_future
raise tornado.gen.Return((yield all_return_future))
raise tornado.gen.Return((yield minion_returns_future))
def subscribe_minion_returns(self, jid, minions):
    '''
    Subscribe to the job return event of each given minion.

    One event future is requested from the application's event listener per
    minion, using the exact-match tag built from ``[jid, 'ret', minion]``
    under the ``job`` prefix and the configured ``timeout`` option.

    :param jid: job id used to build every return-event tag
    :param minions: iterable of minion ids. A single string is treated as
        one minion id (it is not iterated character by character) and
        ``None`` is treated as "no minions".
    :return: dict mapping each event future to the minion id it waits for

    NOTE(review): this only subscribes to the ids it is handed; it cannot
    expand a glob or compound target expression. Confirm the caller passes
    resolved minion ids.
    '''
    if minions is None:
        minions = []
    elif isinstance(minions, six.string_types):
        # A bare string would otherwise be walked one character at a time,
        # creating subscriptions for minion ids that do not exist.
        minions = [minions]

    # Loop-invariant lookups, resolved once.
    event_listener = self.application.event_listener
    timeout = self.application.opts['timeout']

    future_minion_map = {}
    for minion in minions:
        tag = tagify([jid, 'ret', minion], 'job')
        minion_future = event_listener.get_event(self,
                                                 tag=tag,
                                                 matcher=EventListener.exact_matcher,
                                                 timeout=timeout)
        future_minion_map[minion_future] = minion
    return future_minion_map
@tornado.gen.coroutine
def all_returns(self,
jid,
is_finished,
minions_remaining=None,
):
def sanitize_minion_returns(self, future_minion_map, minions, is_finished):
'''
Return a future which will complete once all returns are completed
(according to minions_remaining), or one of the passed in "is_finished" completes
(according to minions), or one of the passed in "finish_chunk_ret_future" completes
'''
if minions_remaining is None:
minions_remaining = []
if minions is None:
minions = []
# Remove redundant minions
redundant_minion_futures = [future for future in future_minion_map.keys() if future_minion_map[future] not in minions]
for redundant_minion_future in redundant_minion_futures:
try:
redundant_minion_future.set_result(None)
except Exception:
pass
del future_minion_map[redundant_minion_future]
chunk_ret = {}
minion_events = {}
for minion in minions_remaining:
tag = tagify([jid, 'ret', minion], 'job')
minion_event = self.application.event_listener.get_event(self,
tag=tag,
matcher=EventListener.exact_matcher,
timeout=self.application.opts['timeout'])
minion_events[minion_event] = minion
while True:
f = yield Any(minion_events.keys() + [is_finished])
f = yield Any(future_minion_map.keys() + [is_finished])
try:
# When finished entire routine, cleanup other futures and return result
if f is is_finished:
for event in minion_events:
for event in future_minion_map.keys():
if not event.done():
event.set_result(None)
raise tornado.gen.Return(chunk_ret)
@ -994,31 +1005,22 @@ class SaltAPIHandler(BaseSaltAPIHandler): # pylint: disable=W0223
# clear finished event future
try:
minions_remaining.remove(minion_events[f])
del minion_events[f]
minions.remove(future_minion_map[f])
del future_minion_map[f]
except ValueError:
pass
if len(minions_remaining) == 0:
if not minions:
if not is_finished.done():
is_finished.set_result(True)
raise tornado.gen.Return(chunk_ret)
@tornado.gen.coroutine
def job_not_running(self,
jid,
tgt,
tgt_type,
is_finished,
minions_remaining=None,
):
def job_not_running(self, jid, tgt, tgt_type, is_finished):
'''
Return a future which will complete once jid (passed in) is no longer
running on tgt
'''
if minions_remaining is None:
minions_remaining = []
ping_pub_data = yield self.saltclients['local'](tgt,
'saltutil.find_job',
[jid],
@ -1052,13 +1054,11 @@ class SaltAPIHandler(BaseSaltAPIHandler): # pylint: disable=W0223
ping_tag = tagify([ping_pub_data['jid'], 'ret'], 'job')
minion_running = False
continue
# Minions can return, we want to see if the job is running...
if event['data'].get('return', {}) == {}:
continue
minion_running = True
id_ = event['data']['id']
if id_ not in minions_remaining:
minions_remaining.append(event['data']['id'])
@tornado.gen.coroutine
def _disbatch_local_async(self, chunk):