Merge pull request #37602 from DSRCorporation/bugs/37238_salt_hang_on_master_restart

Handle master restarts in the appropriate places using the `salt.event` listener.
Mike Place 2016-11-11 10:53:20 +13:00 committed by GitHub
commit 39b75878cf
7 changed files with 35 additions and 54 deletions
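The change replaces per-caller `try`/`except tornado.iostream.StreamClosedError` blocks with a new `auto_reconnect` keyword on `SaltEvent.get_event()` and `iter_events()`: when set, the event object catches the closed-stream error itself, re-opens its publish connection, and keeps waiting. A minimal sketch of how an event consumer might opt in, assuming a standard master config at `/etc/salt/master` (the config path and the `salt/job/` tag filter are illustrative only, not part of this PR):

```python
# Sketch only: demonstrates the auto_reconnect flag introduced by this PR.
# The config path and the 'salt/job/' tag filter below are assumptions.
import salt.config
import salt.utils.event

opts = salt.config.client_config('/etc/salt/master')

event_bus = salt.utils.event.get_event(
        'master',
        sock_dir=opts['sock_dir'],
        transport=opts['transport'],
        opts=opts,
        listen=True)

while True:
    # With auto_reconnect=True, a StreamClosedError raised while the master
    # restarts is handled inside get_event(): the subscriber closes and
    # re-opens its publish connection instead of propagating the error.
    data = event_bus.get_event(wait=5, tag='salt/job/', full=True,
                               auto_reconnect=True)
    if data is None:
        continue  # wait timed out with no matching event; keep listening
    print(data['tag'])
```

Callers such as `LocalClient` now simply pass `auto_reconnect=self.auto_reconnect` to `get_event()`, which is why the repeated reconnect boilerplate is deleted in the hunks below.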

@@ -159,8 +159,7 @@ class LocalClient(object):
                 opts=self.opts,
                 listen=False,
                 io_loop=io_loop,
-                keep_loop=keep_loop,
-                raise_errors=auto_reconnect)
+                keep_loop=keep_loop)
         self.utils = salt.loader.utils(self.opts)
         self.functions = salt.loader.minion_mods(self.opts, utils=self.utils)
         self.returners = salt.loader.returners(self.opts, self.functions)
@@ -895,16 +894,9 @@ class LocalClient(object):
         '''
         while True:
-            try:
-                raw = self.event.get_event(wait=0.01, tag=tag, match_type=match_type, full=True, no_block=True)
-                yield raw
-            except tornado.iostream.StreamClosedError:
-                if self.auto_reconnect:
-                    log.warning('Connection to master lost. Reconnecting.')
-                    self.event.close_pub()
-                    self.event.connect_pub(timeout=self._get_timeout(None))
-                else:
-                    raise
+            raw = self.event.get_event(wait=0.01, tag=tag, match_type=match_type, full=True,
+                                       no_block=True, auto_reconnect=self.auto_reconnect)
+            yield raw
 
     def get_iter_returns(
             self,
@@ -1134,16 +1126,7 @@ class LocalClient(object):
         while True:
             time_left = timeout_at - int(time.time())
             wait = max(1, time_left)
-            try:
-                raw = self.event.get_event(wait, jid)
-            except tornado.iostream.StreamClosedError:
-                if self.auto_reconnect:
-                    log.warning('Connection to master lost. Reconnecting.')
-                    self.event.close_pub()
-                    self.event.connect_pub(timeout=self._get_timeout(wait))
-                    continue
-                else:
-                    raise
+            raw = self.event.get_event(wait, jid, auto_reconnect=self.auto_reconnect)
             if raw is not None and 'return' in raw:
                 found.add(raw['id'])
                 ret[raw['id']] = raw['return']
@@ -1296,16 +1279,7 @@ class LocalClient(object):
             # Wait 0 == forever, use a minimum of 1s
             wait = max(1, time_left)
             jid_tag = 'salt/job/{0}'.format(jid)
-            try:
-                raw = self.event.get_event(wait, jid_tag)
-            except tornado.iostream.StreamClosedError:
-                if self.auto_reconnect:
-                    log.warning('Connection to master lost. Reconnecting.')
-                    self.event.close_pub()
-                    self.event.connect_pub(timeout=self._get_timeout(wait))
-                    continue
-                else:
-                    raise
+            raw = self.event.get_event(wait, jid_tag, auto_reconnect=self.auto_reconnect)
             if raw is not None and 'return' in raw:
                 if 'minions' in raw.get('data', {}):
                     minions.update(raw['data']['minions'])
@@ -1424,16 +1398,7 @@ class LocalClient(object):
             raise StopIteration()
         # Wait for the hosts to check in
         while True:
-            try:
-                raw = self.event.get_event(timeout)
-            except tornado.iostream.StreamClosedError:
-                if self.auto_reconnect:
-                    log.warning('Connection to master lost. Reconnecting.')
-                    self.event.close_pub()
-                    self.event.connect_pub(timeout=self._get_timeout(timeout))
-                    continue
-                else:
-                    raise
+            raw = self.event.get_event(timeout, auto_reconnect=self.auto_reconnect)
             if raw is None or time.time() > timeout_at:
                 # Timeout reached
                 break

@@ -313,7 +313,7 @@ class APIClient(object):
         If wait is 0 then block forever or until next event becomes available.
         '''
-        return self.event.get_event(wait=wait, tag=tag, full=full)
+        return self.event.get_event(wait=wait, tag=tag, full=full, auto_reconnect=True)
 
     def fire_event(self, data, tag):
         '''

@@ -160,7 +160,7 @@ class SyncClientMixin(object):
         if timeout is None:
             timeout = self.opts.get('rest_timeout', 300)
-        ret = event.get_event(tag=ret_tag, full=True, wait=timeout)
+        ret = event.get_event(tag=ret_tag, full=True, wait=timeout, auto_reconnect=True)
         if ret is None:
             raise salt.exceptions.SaltClientTimeout(
                 "RunnerClient job '{0}' timed out".format(job['jid']),

@@ -1644,7 +1644,7 @@ def event(tagmatch='*',
             listen=True)
 
     while True:
-        ret = sevent.get_event(full=True)
+        ret = sevent.get_event(full=True, auto_reconnect=True)
         if ret is None:
             continue

@@ -2006,7 +2006,7 @@ class Events(object):
                     transport=self.opts['transport'],
                     opts=self.opts,
                     listen=True)
-            stream = event.iter_events(full=True)
+            stream = event.iter_events(full=True, auto_reconnect=True)
 
             yield u'retry: {0}\n'.format(400)
@@ -2180,7 +2180,7 @@ class WebsocketEndpoint(object):
                     transport=self.opts['transport'],
                     opts=self.opts,
                     listen=True)
-            stream = event.iter_events(full=True)
+            stream = event.iter_events(full=True, auto_reconnect=True)
             SaltInfo = event_processor.SaltInfo(handler)
             while True:
                 data = next(stream)

@@ -547,7 +547,8 @@ class SaltEvent(object):
                   use_pending=None,
                   pending_tags=None,
                   match_type=None,
-                  no_block=False):
+                  no_block=False,
+                  auto_reconnect=False):
         '''
         Get a single publication.
         IF no publication available THEN block for up to wait seconds
@@ -611,7 +612,20 @@
         ret = self._check_pending(tag, match_func)
         if ret is None:
             with salt.utils.async.current_ioloop(self.io_loop):
-                ret = self._get_event(wait, tag, match_func, no_block)
+                if auto_reconnect:
+                    raise_errors = self.raise_errors
+                    self.raise_errors = True
+                    while True:
+                        try:
+                            ret = self._get_event(wait, tag, match_func, no_block)
+                            break
+                        except tornado.iostream.StreamClosedError:
+                            self.close_pub()
+                            self.connect_pub(timeout=wait)
+                            continue
+                    self.raise_errors = raise_errors
+                else:
+                    ret = self._get_event(wait, tag, match_func, no_block)
         if ret is None or full:
             return ret
@@ -647,12 +661,13 @@
         mtag, data = self.unpack(raw, self.serial)
         return {'data': data, 'tag': mtag}
 
-    def iter_events(self, tag='', full=False, match_type=None):
+    def iter_events(self, tag='', full=False, match_type=None, auto_reconnect=False):
         '''
         Creates a generator that continuously listens for events
         '''
         while True:
-            data = self.get_event(tag=tag, full=full, match_type=match_type)
+            data = self.get_event(tag=tag, full=full, match_type=match_type,
+                                  auto_reconnect=auto_reconnect)
             if data is None:
                 continue
             yield data

@@ -150,7 +150,8 @@ class RAETEvent(object):
         '''
         return raw
 
-    def get_event(self, wait=5, tag='', match_type=None, full=False, no_block=None):
+    def get_event(self, wait=5, tag='', match_type=None, full=False, no_block=None,
+                  auto_reconnect=False):
         '''
         Get a single publication.
         IF no publication available THEN block for up to wait seconds
@@ -193,12 +194,12 @@
                 return None
             return msg
 
-    def iter_events(self, tag='', full=False):
+    def iter_events(self, tag='', full=False, auto_reconnect=False):
         '''
         Creates a generator that continuously listens for events
         '''
         while True:
-            data = self.get_event(tag=tag, full=full)
+            data = self.get_event(tag=tag, full=full, auto_reconnect=auto_reconnect)
             if data is None:
                 continue
             yield data