Merge pull request #37438 from DSRCorporation/bugs/37238_salt_hang_on_master_restart

Fix for #37238: salt hangs on master restart
This commit is contained in:
Mike Place 2016-11-04 17:10:51 +13:00 committed by GitHub
commit 9eab5c8f71
3 changed files with 82 additions and 21 deletions

View file

@ -46,7 +46,8 @@ class SaltCMD(parsers.SaltCMDOptionParser):
self.local_client = salt.client.get_local_client(
self.get_config_file_path(),
skip_perm_errors=skip_perm_errors)
skip_perm_errors=skip_perm_errors,
auto_reconnect=True)
except SaltClientError as exc:
self.exit(2, '{0}\n'.format(exc))
return

View file

@ -73,7 +73,8 @@ def get_local_client(
c_path=os.path.join(syspaths.CONFIG_DIR, 'master'),
mopts=None,
skip_perm_errors=False,
io_loop=None):
io_loop=None,
auto_reconnect=False):
'''
.. versionadded:: 2014.7.0
@ -100,7 +101,8 @@ def get_local_client(
return LocalClient(
mopts=opts,
skip_perm_errors=skip_perm_errors,
io_loop=io_loop)
io_loop=io_loop,
auto_reconnect=auto_reconnect)
class LocalClient(object):
@ -126,7 +128,7 @@ class LocalClient(object):
def __init__(self,
c_path=os.path.join(syspaths.CONFIG_DIR, 'master'),
mopts=None, skip_perm_errors=False,
io_loop=None, keep_loop=False):
io_loop=None, keep_loop=False, auto_reconnect=False):
'''
:param IOLoop io_loop: io_loop used for events.
Pass in an io_loop if you want asynchronous
@ -149,6 +151,7 @@ class LocalClient(object):
self.salt_user = salt.utils.get_specific_user()
self.skip_perm_errors = skip_perm_errors
self.key = self.__read_master_key()
self.auto_reconnect = auto_reconnect
self.event = salt.utils.event.get_event(
'master',
self.opts['sock_dir'],
@ -156,7 +159,8 @@ class LocalClient(object):
opts=self.opts,
listen=False,
io_loop=io_loop,
keep_loop=keep_loop)
keep_loop=keep_loop,
raise_errors=auto_reconnect)
self.utils = salt.loader.utils(self.opts)
self.functions = salt.loader.minion_mods(self.opts, utils=self.utils)
self.returners = salt.loader.returners(self.opts, self.functions)
@ -891,8 +895,16 @@ class LocalClient(object):
'''
while True:
raw = self.event.get_event(wait=0.01, tag=tag, match_type=match_type, full=True, no_block=True)
yield raw
try:
raw = self.event.get_event(wait=0.01, tag=tag, match_type=match_type, full=True, no_block=True)
yield raw
except tornado.iostream.StreamClosedError:
if self.auto_reconnect:
log.warning('Connection to master lost. Reconnecting.')
self.event.close_pub()
self.event.connect_pub(timeout=self._get_timeout(None))
else:
raise
def get_iter_returns(
self,
@ -1122,7 +1134,16 @@ class LocalClient(object):
while True:
time_left = timeout_at - int(time.time())
wait = max(1, time_left)
raw = self.event.get_event(wait, jid)
try:
raw = self.event.get_event(wait, jid)
except tornado.iostream.StreamClosedError:
if self.auto_reconnect:
log.warning('Connection to master lost. Reconnecting.')
self.event.close_pub()
self.event.connect_pub(timeout=self._get_timeout(wait))
continue
else:
raise
if raw is not None and 'return' in raw:
found.add(raw['id'])
ret[raw['id']] = raw['return']
@ -1275,7 +1296,16 @@ class LocalClient(object):
# Wait 0 == forever, use a minimum of 1s
wait = max(1, time_left)
jid_tag = 'salt/job/{0}'.format(jid)
raw = self.event.get_event(wait, jid_tag)
try:
raw = self.event.get_event(wait, jid_tag)
except tornado.iostream.StreamClosedError:
if self.auto_reconnect:
log.warning('Connection to master lost. Reconnecting.')
self.event.close_pub()
self.event.connect_pub(timeout=self._get_timeout(wait))
continue
else:
raise
if raw is not None and 'return' in raw:
if 'minions' in raw.get('data', {}):
minions.update(raw['data']['minions'])
@ -1394,7 +1424,16 @@ class LocalClient(object):
raise StopIteration()
# Wait for the hosts to check in
while True:
raw = self.event.get_event(timeout)
try:
raw = self.event.get_event(timeout)
except tornado.iostream.StreamClosedError:
if self.auto_reconnect:
log.warning('Connection to master lost. Reconnecting.')
self.event.close_pub()
self.event.connect_pub(timeout=self._get_timeout(timeout))
continue
else:
raise
if raw is None or time.time() > timeout_at:
# Timeout reached
break

View file

@ -112,7 +112,7 @@ TAGS = {
def get_event(
node, sock_dir=None, transport='zeromq',
opts=None, listen=True, io_loop=None, keep_loop=False):
opts=None, listen=True, io_loop=None, keep_loop=False, raise_errors=False):
'''
Return an event object suitable for the named transport
@ -125,8 +125,19 @@ def get_event(
# TODO: AIO core is separate from transport
if transport in ('zeromq', 'tcp'):
if node == 'master':
return MasterEvent(sock_dir, opts, listen=listen, io_loop=io_loop, keep_loop=keep_loop)
return SaltEvent(node, sock_dir, opts, listen=listen, io_loop=io_loop, keep_loop=keep_loop)
return MasterEvent(sock_dir,
opts,
listen=listen,
io_loop=io_loop,
keep_loop=keep_loop,
raise_errors=raise_errors)
return SaltEvent(node,
sock_dir,
opts,
listen=listen,
io_loop=io_loop,
keep_loop=keep_loop,
raise_errors=raise_errors)
elif transport == 'raet':
import salt.utils.raetevent
return salt.utils.raetevent.RAETEvent(node,
@ -135,13 +146,13 @@ def get_event(
opts=opts)
def get_master_event(opts, sock_dir, listen=True, io_loop=None):
def get_master_event(opts, sock_dir, listen=True, io_loop=None, raise_errors=False):
'''
Return an event object suitable for the named transport
'''
# TODO: AIO core is separate from transport
if opts['transport'] in ('zeromq', 'tcp'):
return MasterEvent(sock_dir, opts, listen=listen, io_loop=io_loop)
return MasterEvent(sock_dir, opts, listen=listen, io_loop=io_loop, raise_errors=raise_errors)
elif opts['transport'] == 'raet':
import salt.utils.raetevent
return salt.utils.raetevent.MasterEvent(
@ -177,7 +188,8 @@ class SaltEvent(object):
'''
def __init__(
self, node, sock_dir=None,
opts=None, listen=True, io_loop=None, keep_loop=False):
opts=None, listen=True, io_loop=None,
keep_loop=False, raise_errors=False):
'''
:param IOLoop io_loop: Pass in an io_loop if you want asynchronous
operation for obtaining events. Eg use of
@ -200,6 +212,7 @@ class SaltEvent(object):
self.cpush = False
self.subscriber = None
self.pusher = None
self.raise_errors = raise_errors
if opts is None:
opts = {}
@ -505,7 +518,12 @@ class SaltEvent(object):
ret = {'data': data, 'tag': mtag}
except KeyboardInterrupt:
return {'tag': 'salt/event/exit', 'data': {}}
except (tornado.iostream.StreamClosedError, RuntimeError):
except tornado.iostream.StreamClosedError:
if self.raise_errors:
raise
else:
return None
except RuntimeError:
return None
if not match_func(ret['tag'], tag):
@ -773,14 +791,16 @@ class MasterEvent(SaltEvent):
opts=None,
listen=True,
io_loop=None,
keep_loop=False):
keep_loop=False,
raise_errors=False):
super(MasterEvent, self).__init__(
'master',
sock_dir,
opts,
listen=listen,
io_loop=io_loop,
keep_loop=keep_loop)
keep_loop=keep_loop,
raise_errors=raise_errors)
class LocalClientEvent(MasterEvent):
@ -813,10 +833,11 @@ class MinionEvent(SaltEvent):
RAET compatible
Create a master event management object
'''
def __init__(self, opts, listen=True, io_loop=None):
def __init__(self, opts, listen=True, io_loop=None, raise_errors=False):
super(MinionEvent, self).__init__(
'minion', sock_dir=opts.get('sock_dir'),
opts=opts, listen=listen, io_loop=io_loop)
opts=opts, listen=listen, io_loop=io_loop,
raise_errors=raise_errors)
class AsyncEventPublisher(object):