mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Merge pull request #37438 from DSRCorporation/bugs/37238_salt_hang_on_master_restart
Fix for #37238 salt hang on master restart
This commit is contained in:
commit
9eab5c8f71
3 changed files with 82 additions and 21 deletions
|
@@ -46,7 +46,8 @@ class SaltCMD(parsers.SaltCMDOptionParser):
|
|||
|
||||
self.local_client = salt.client.get_local_client(
|
||||
self.get_config_file_path(),
|
||||
skip_perm_errors=skip_perm_errors)
|
||||
skip_perm_errors=skip_perm_errors,
|
||||
auto_reconnect=True)
|
||||
except SaltClientError as exc:
|
||||
self.exit(2, '{0}\n'.format(exc))
|
||||
return
|
||||
|
|
|
@@ -73,7 +73,8 @@ def get_local_client(
|
|||
c_path=os.path.join(syspaths.CONFIG_DIR, 'master'),
|
||||
mopts=None,
|
||||
skip_perm_errors=False,
|
||||
io_loop=None):
|
||||
io_loop=None,
|
||||
auto_reconnect=False):
|
||||
'''
|
||||
.. versionadded:: 2014.7.0
|
||||
|
||||
|
@@ -100,7 +101,8 @@ def get_local_client(
|
|||
return LocalClient(
|
||||
mopts=opts,
|
||||
skip_perm_errors=skip_perm_errors,
|
||||
io_loop=io_loop)
|
||||
io_loop=io_loop,
|
||||
auto_reconnect=auto_reconnect)
|
||||
|
||||
|
||||
class LocalClient(object):
|
||||
|
@@ -126,7 +128,7 @@ class LocalClient(object):
|
|||
def __init__(self,
|
||||
c_path=os.path.join(syspaths.CONFIG_DIR, 'master'),
|
||||
mopts=None, skip_perm_errors=False,
|
||||
io_loop=None, keep_loop=False):
|
||||
io_loop=None, keep_loop=False, auto_reconnect=False):
|
||||
'''
|
||||
:param IOLoop io_loop: io_loop used for events.
|
||||
Pass in an io_loop if you want asynchronous
|
||||
|
@@ -149,6 +151,7 @@ class LocalClient(object):
|
|||
self.salt_user = salt.utils.get_specific_user()
|
||||
self.skip_perm_errors = skip_perm_errors
|
||||
self.key = self.__read_master_key()
|
||||
self.auto_reconnect = auto_reconnect
|
||||
self.event = salt.utils.event.get_event(
|
||||
'master',
|
||||
self.opts['sock_dir'],
|
||||
|
@@ -156,7 +159,8 @@ class LocalClient(object):
|
|||
opts=self.opts,
|
||||
listen=False,
|
||||
io_loop=io_loop,
|
||||
keep_loop=keep_loop)
|
||||
keep_loop=keep_loop,
|
||||
raise_errors=auto_reconnect)
|
||||
self.utils = salt.loader.utils(self.opts)
|
||||
self.functions = salt.loader.minion_mods(self.opts, utils=self.utils)
|
||||
self.returners = salt.loader.returners(self.opts, self.functions)
|
||||
|
@@ -891,8 +895,16 @@ class LocalClient(object):
|
|||
'''
|
||||
|
||||
while True:
|
||||
raw = self.event.get_event(wait=0.01, tag=tag, match_type=match_type, full=True, no_block=True)
|
||||
yield raw
|
||||
try:
|
||||
raw = self.event.get_event(wait=0.01, tag=tag, match_type=match_type, full=True, no_block=True)
|
||||
yield raw
|
||||
except tornado.iostream.StreamClosedError:
|
||||
if self.auto_reconnect:
|
||||
log.warning('Connection to master lost. Reconnecting.')
|
||||
self.event.close_pub()
|
||||
self.event.connect_pub(timeout=self._get_timeout(None))
|
||||
else:
|
||||
raise
|
||||
|
||||
def get_iter_returns(
|
||||
self,
|
||||
|
@@ -1122,7 +1134,16 @@ class LocalClient(object):
|
|||
while True:
|
||||
time_left = timeout_at - int(time.time())
|
||||
wait = max(1, time_left)
|
||||
raw = self.event.get_event(wait, jid)
|
||||
try:
|
||||
raw = self.event.get_event(wait, jid)
|
||||
except tornado.iostream.StreamClosedError:
|
||||
if self.auto_reconnect:
|
||||
log.warning('Connection to master lost. Reconnecting.')
|
||||
self.event.close_pub()
|
||||
self.event.connect_pub(timeout=self._get_timeout(wait))
|
||||
continue
|
||||
else:
|
||||
raise
|
||||
if raw is not None and 'return' in raw:
|
||||
found.add(raw['id'])
|
||||
ret[raw['id']] = raw['return']
|
||||
|
@@ -1275,7 +1296,16 @@ class LocalClient(object):
|
|||
# Wait 0 == forever, use a minimum of 1s
|
||||
wait = max(1, time_left)
|
||||
jid_tag = 'salt/job/{0}'.format(jid)
|
||||
raw = self.event.get_event(wait, jid_tag)
|
||||
try:
|
||||
raw = self.event.get_event(wait, jid_tag)
|
||||
except tornado.iostream.StreamClosedError:
|
||||
if self.auto_reconnect:
|
||||
log.warning('Connection to master lost. Reconnecting.')
|
||||
self.event.close_pub()
|
||||
self.event.connect_pub(timeout=self._get_timeout(wait))
|
||||
continue
|
||||
else:
|
||||
raise
|
||||
if raw is not None and 'return' in raw:
|
||||
if 'minions' in raw.get('data', {}):
|
||||
minions.update(raw['data']['minions'])
|
||||
|
@@ -1394,7 +1424,16 @@ class LocalClient(object):
|
|||
raise StopIteration()
|
||||
# Wait for the hosts to check in
|
||||
while True:
|
||||
raw = self.event.get_event(timeout)
|
||||
try:
|
||||
raw = self.event.get_event(timeout)
|
||||
except tornado.iostream.StreamClosedError:
|
||||
if self.auto_reconnect:
|
||||
log.warning('Connection to master lost. Reconnecting.')
|
||||
self.event.close_pub()
|
||||
self.event.connect_pub(timeout=self._get_timeout(timeout))
|
||||
continue
|
||||
else:
|
||||
raise
|
||||
if raw is None or time.time() > timeout_at:
|
||||
# Timeout reached
|
||||
break
|
||||
|
|
|
@@ -112,7 +112,7 @@ TAGS = {
|
|||
|
||||
def get_event(
|
||||
node, sock_dir=None, transport='zeromq',
|
||||
opts=None, listen=True, io_loop=None, keep_loop=False):
|
||||
opts=None, listen=True, io_loop=None, keep_loop=False, raise_errors=False):
|
||||
'''
|
||||
Return an event object suitable for the named transport
|
||||
|
||||
|
@@ -125,8 +125,19 @@ def get_event(
|
|||
# TODO: AIO core is separate from transport
|
||||
if transport in ('zeromq', 'tcp'):
|
||||
if node == 'master':
|
||||
return MasterEvent(sock_dir, opts, listen=listen, io_loop=io_loop, keep_loop=keep_loop)
|
||||
return SaltEvent(node, sock_dir, opts, listen=listen, io_loop=io_loop, keep_loop=keep_loop)
|
||||
return MasterEvent(sock_dir,
|
||||
opts,
|
||||
listen=listen,
|
||||
io_loop=io_loop,
|
||||
keep_loop=keep_loop,
|
||||
raise_errors=raise_errors)
|
||||
return SaltEvent(node,
|
||||
sock_dir,
|
||||
opts,
|
||||
listen=listen,
|
||||
io_loop=io_loop,
|
||||
keep_loop=keep_loop,
|
||||
raise_errors=raise_errors)
|
||||
elif transport == 'raet':
|
||||
import salt.utils.raetevent
|
||||
return salt.utils.raetevent.RAETEvent(node,
|
||||
|
@@ -135,13 +146,13 @@ def get_event(
|
|||
opts=opts)
|
||||
|
||||
|
||||
def get_master_event(opts, sock_dir, listen=True, io_loop=None):
|
||||
def get_master_event(opts, sock_dir, listen=True, io_loop=None, raise_errors=False):
|
||||
'''
|
||||
Return an event object suitable for the named transport
|
||||
'''
|
||||
# TODO: AIO core is separate from transport
|
||||
if opts['transport'] in ('zeromq', 'tcp'):
|
||||
return MasterEvent(sock_dir, opts, listen=listen, io_loop=io_loop)
|
||||
return MasterEvent(sock_dir, opts, listen=listen, io_loop=io_loop, raise_errors=raise_errors)
|
||||
elif opts['transport'] == 'raet':
|
||||
import salt.utils.raetevent
|
||||
return salt.utils.raetevent.MasterEvent(
|
||||
|
@@ -177,7 +188,8 @@ class SaltEvent(object):
|
|||
'''
|
||||
def __init__(
|
||||
self, node, sock_dir=None,
|
||||
opts=None, listen=True, io_loop=None, keep_loop=False):
|
||||
opts=None, listen=True, io_loop=None,
|
||||
keep_loop=False, raise_errors=False):
|
||||
'''
|
||||
:param IOLoop io_loop: Pass in an io_loop if you want asynchronous
|
||||
operation for obtaining events. Eg use of
|
||||
|
@@ -200,6 +212,7 @@ class SaltEvent(object):
|
|||
self.cpush = False
|
||||
self.subscriber = None
|
||||
self.pusher = None
|
||||
self.raise_errors = raise_errors
|
||||
|
||||
if opts is None:
|
||||
opts = {}
|
||||
|
@@ -505,7 +518,12 @@ class SaltEvent(object):
|
|||
ret = {'data': data, 'tag': mtag}
|
||||
except KeyboardInterrupt:
|
||||
return {'tag': 'salt/event/exit', 'data': {}}
|
||||
except (tornado.iostream.StreamClosedError, RuntimeError):
|
||||
except tornado.iostream.StreamClosedError:
|
||||
if self.raise_errors:
|
||||
raise
|
||||
else:
|
||||
return None
|
||||
except RuntimeError:
|
||||
return None
|
||||
|
||||
if not match_func(ret['tag'], tag):
|
||||
|
@@ -773,14 +791,16 @@ class MasterEvent(SaltEvent):
|
|||
opts=None,
|
||||
listen=True,
|
||||
io_loop=None,
|
||||
keep_loop=False):
|
||||
keep_loop=False,
|
||||
raise_errors=False):
|
||||
super(MasterEvent, self).__init__(
|
||||
'master',
|
||||
sock_dir,
|
||||
opts,
|
||||
listen=listen,
|
||||
io_loop=io_loop,
|
||||
keep_loop=keep_loop)
|
||||
keep_loop=keep_loop,
|
||||
raise_errors=raise_errors)
|
||||
|
||||
|
||||
class LocalClientEvent(MasterEvent):
|
||||
|
@@ -813,10 +833,11 @@ class MinionEvent(SaltEvent):
|
|||
RAET compatible
|
||||
Create a master event management object
|
||||
'''
|
||||
def __init__(self, opts, listen=True, io_loop=None):
|
||||
def __init__(self, opts, listen=True, io_loop=None, raise_errors=False):
|
||||
super(MinionEvent, self).__init__(
|
||||
'minion', sock_dir=opts.get('sock_dir'),
|
||||
opts=opts, listen=listen, io_loop=io_loop)
|
||||
opts=opts, listen=listen, io_loop=io_loop,
|
||||
raise_errors=raise_errors)
|
||||
|
||||
|
||||
class AsyncEventPublisher(object):
|
||||
|
|
Loading…
Add table
Reference in a new issue