mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Merge pull request #37602 from DSRCorporation/bugs/37238_salt_hang_on_master_restart
Handle master restart in the appropriate places using the `salt.event` listener.
This commit is contained in:
commit
39b75878cf
7 changed files with 35 additions and 54 deletions
|
@ -159,8 +159,7 @@ class LocalClient(object):
|
|||
opts=self.opts,
|
||||
listen=False,
|
||||
io_loop=io_loop,
|
||||
keep_loop=keep_loop,
|
||||
raise_errors=auto_reconnect)
|
||||
keep_loop=keep_loop)
|
||||
self.utils = salt.loader.utils(self.opts)
|
||||
self.functions = salt.loader.minion_mods(self.opts, utils=self.utils)
|
||||
self.returners = salt.loader.returners(self.opts, self.functions)
|
||||
|
@ -895,16 +894,9 @@ class LocalClient(object):
|
|||
'''
|
||||
|
||||
while True:
|
||||
try:
|
||||
raw = self.event.get_event(wait=0.01, tag=tag, match_type=match_type, full=True, no_block=True)
|
||||
yield raw
|
||||
except tornado.iostream.StreamClosedError:
|
||||
if self.auto_reconnect:
|
||||
log.warning('Connection to master lost. Reconnecting.')
|
||||
self.event.close_pub()
|
||||
self.event.connect_pub(timeout=self._get_timeout(None))
|
||||
else:
|
||||
raise
|
||||
raw = self.event.get_event(wait=0.01, tag=tag, match_type=match_type, full=True,
|
||||
no_block=True, auto_reconnect=self.auto_reconnect)
|
||||
yield raw
|
||||
|
||||
def get_iter_returns(
|
||||
self,
|
||||
|
@ -1134,16 +1126,7 @@ class LocalClient(object):
|
|||
while True:
|
||||
time_left = timeout_at - int(time.time())
|
||||
wait = max(1, time_left)
|
||||
try:
|
||||
raw = self.event.get_event(wait, jid)
|
||||
except tornado.iostream.StreamClosedError:
|
||||
if self.auto_reconnect:
|
||||
log.warning('Connection to master lost. Reconnecting.')
|
||||
self.event.close_pub()
|
||||
self.event.connect_pub(timeout=self._get_timeout(wait))
|
||||
continue
|
||||
else:
|
||||
raise
|
||||
raw = self.event.get_event(wait, jid, auto_reconnect=self.auto_reconnect)
|
||||
if raw is not None and 'return' in raw:
|
||||
found.add(raw['id'])
|
||||
ret[raw['id']] = raw['return']
|
||||
|
@ -1296,16 +1279,7 @@ class LocalClient(object):
|
|||
# Wait 0 == forever, use a minimum of 1s
|
||||
wait = max(1, time_left)
|
||||
jid_tag = 'salt/job/{0}'.format(jid)
|
||||
try:
|
||||
raw = self.event.get_event(wait, jid_tag)
|
||||
except tornado.iostream.StreamClosedError:
|
||||
if self.auto_reconnect:
|
||||
log.warning('Connection to master lost. Reconnecting.')
|
||||
self.event.close_pub()
|
||||
self.event.connect_pub(timeout=self._get_timeout(wait))
|
||||
continue
|
||||
else:
|
||||
raise
|
||||
raw = self.event.get_event(wait, jid_tag, auto_reconnect=self.auto_reconnect)
|
||||
if raw is not None and 'return' in raw:
|
||||
if 'minions' in raw.get('data', {}):
|
||||
minions.update(raw['data']['minions'])
|
||||
|
@ -1424,16 +1398,7 @@ class LocalClient(object):
|
|||
raise StopIteration()
|
||||
# Wait for the hosts to check in
|
||||
while True:
|
||||
try:
|
||||
raw = self.event.get_event(timeout)
|
||||
except tornado.iostream.StreamClosedError:
|
||||
if self.auto_reconnect:
|
||||
log.warning('Connection to master lost. Reconnecting.')
|
||||
self.event.close_pub()
|
||||
self.event.connect_pub(timeout=self._get_timeout(timeout))
|
||||
continue
|
||||
else:
|
||||
raise
|
||||
raw = self.event.get_event(timeout, auto_reconnect=self.auto_reconnect)
|
||||
if raw is None or time.time() > timeout_at:
|
||||
# Timeout reached
|
||||
break
|
||||
|
|
|
@ -313,7 +313,7 @@ class APIClient(object):
|
|||
|
||||
If wait is 0 then block forever or until next event becomes available.
|
||||
'''
|
||||
return self.event.get_event(wait=wait, tag=tag, full=full)
|
||||
return self.event.get_event(wait=wait, tag=tag, full=full, auto_reconnect=True)
|
||||
|
||||
def fire_event(self, data, tag):
|
||||
'''
|
||||
|
|
|
@ -160,7 +160,7 @@ class SyncClientMixin(object):
|
|||
|
||||
if timeout is None:
|
||||
timeout = self.opts.get('rest_timeout', 300)
|
||||
ret = event.get_event(tag=ret_tag, full=True, wait=timeout)
|
||||
ret = event.get_event(tag=ret_tag, full=True, wait=timeout, auto_reconnect=True)
|
||||
if ret is None:
|
||||
raise salt.exceptions.SaltClientTimeout(
|
||||
"RunnerClient job '{0}' timed out".format(job['jid']),
|
||||
|
|
|
@ -1644,7 +1644,7 @@ def event(tagmatch='*',
|
|||
listen=True)
|
||||
|
||||
while True:
|
||||
ret = sevent.get_event(full=True)
|
||||
ret = sevent.get_event(full=True, auto_reconnect=True)
|
||||
if ret is None:
|
||||
continue
|
||||
|
||||
|
|
|
@ -2006,7 +2006,7 @@ class Events(object):
|
|||
transport=self.opts['transport'],
|
||||
opts=self.opts,
|
||||
listen=True)
|
||||
stream = event.iter_events(full=True)
|
||||
stream = event.iter_events(full=True, auto_reconnect=True)
|
||||
|
||||
yield u'retry: {0}\n'.format(400)
|
||||
|
||||
|
@ -2180,7 +2180,7 @@ class WebsocketEndpoint(object):
|
|||
transport=self.opts['transport'],
|
||||
opts=self.opts,
|
||||
listen=True)
|
||||
stream = event.iter_events(full=True)
|
||||
stream = event.iter_events(full=True, auto_reconnect=True)
|
||||
SaltInfo = event_processor.SaltInfo(handler)
|
||||
while True:
|
||||
data = next(stream)
|
||||
|
|
|
@ -547,7 +547,8 @@ class SaltEvent(object):
|
|||
use_pending=None,
|
||||
pending_tags=None,
|
||||
match_type=None,
|
||||
no_block=False):
|
||||
no_block=False,
|
||||
auto_reconnect=False):
|
||||
'''
|
||||
Get a single publication.
|
||||
IF no publication available THEN block for up to wait seconds
|
||||
|
@ -611,7 +612,20 @@ class SaltEvent(object):
|
|||
ret = self._check_pending(tag, match_func)
|
||||
if ret is None:
|
||||
with salt.utils.async.current_ioloop(self.io_loop):
|
||||
ret = self._get_event(wait, tag, match_func, no_block)
|
||||
if auto_reconnect:
|
||||
raise_errors = self.raise_errors
|
||||
self.raise_errors = True
|
||||
while True:
|
||||
try:
|
||||
ret = self._get_event(wait, tag, match_func, no_block)
|
||||
break
|
||||
except tornado.iostream.StreamClosedError:
|
||||
self.close_pub()
|
||||
self.connect_pub(timeout=wait)
|
||||
continue
|
||||
self.raise_errors = raise_errors
|
||||
else:
|
||||
ret = self._get_event(wait, tag, match_func, no_block)
|
||||
|
||||
if ret is None or full:
|
||||
return ret
|
||||
|
@ -647,12 +661,13 @@ class SaltEvent(object):
|
|||
mtag, data = self.unpack(raw, self.serial)
|
||||
return {'data': data, 'tag': mtag}
|
||||
|
||||
def iter_events(self, tag='', full=False, match_type=None):
|
||||
def iter_events(self, tag='', full=False, match_type=None, auto_reconnect=False):
|
||||
'''
|
||||
Creates a generator that continuously listens for events
|
||||
'''
|
||||
while True:
|
||||
data = self.get_event(tag=tag, full=full, match_type=match_type)
|
||||
data = self.get_event(tag=tag, full=full, match_type=match_type,
|
||||
auto_reconnect=auto_reconnect)
|
||||
if data is None:
|
||||
continue
|
||||
yield data
|
||||
|
|
|
@ -150,7 +150,8 @@ class RAETEvent(object):
|
|||
'''
|
||||
return raw
|
||||
|
||||
def get_event(self, wait=5, tag='', match_type=None, full=False, no_block=None):
|
||||
def get_event(self, wait=5, tag='', match_type=None, full=False, no_block=None,
|
||||
auto_reconnect=False):
|
||||
'''
|
||||
Get a single publication.
|
||||
IF no publication available THEN block for up to wait seconds
|
||||
|
@ -193,12 +194,12 @@ class RAETEvent(object):
|
|||
return None
|
||||
return msg
|
||||
|
||||
def iter_events(self, tag='', full=False):
|
||||
def iter_events(self, tag='', full=False, auto_reconnect=False):
|
||||
'''
|
||||
Creates a generator that continuously listens for events
|
||||
'''
|
||||
while True:
|
||||
data = self.get_event(tag=tag, full=full)
|
||||
data = self.get_event(tag=tag, full=full, auto_reconnect=auto_reconnect)
|
||||
if data is None:
|
||||
continue
|
||||
yield data
|
||||
|
|
Loading…
Add table
Reference in a new issue