Merge pull request #35792 from DSRCorporation/bugs/34973_syndic_reconnect_master_2016.3

Reconnect syndic to event bus if master disappeared.
This commit is contained in:
Mike Place 2016-08-29 11:13:19 +09:00 committed by GitHub
commit 30c2db7b09
3 changed files with 21 additions and 6 deletions

View file

@ -2209,6 +2209,10 @@ class Syndic(Minion):
sync=False,
)
def reconnect_event_bus(self, something):
future = self.local.event.set_event_handler(self._process_event)
self.io_loop.add_future(future, self.reconnect_event_bus)
# Syndic Tune In
@tornado.gen.coroutine
def tune_in(self, start=True):
@ -2231,7 +2235,8 @@ class Syndic(Minion):
# register the event sub to the poller
self._reset_event_aggregation()
self.local.event.set_event_handler(self._process_event)
future = self.local.event.set_event_handler(self._process_event)
self.io_loop.add_future(future, self.reconnect_event_bus)
# forward events every syndic_event_forward_timeout
self.forward_events = tornado.ioloop.PeriodicCallback(self._forward_events,
@ -2555,6 +2560,10 @@ class MultiSyndic(MinionBase):
self.job_rets = {}
self.raw_events = []
def reconnect_event_bus(self, something):
future = self.local.event.set_event_handler(self._process_event)
self.io_loop.add_future(future, self.reconnect_event_bus)
# Syndic Tune In
def tune_in(self):
'''
@ -2570,7 +2579,8 @@ class MultiSyndic(MinionBase):
# register the event sub to the poller
self._reset_event_aggregation()
self.local.event.set_event_handler(self._process_event)
future = self.local.event.set_event_handler(self._process_event)
self.io_loop.add_future(future, self.reconnect_event_bus)
# forward events every syndic_event_forward_timeout
self.forward_events = tornado.ioloop.PeriodicCallback(self._forward_events,

View file

@ -270,6 +270,9 @@ class IPCClient(object):
if hasattr(self, '_connecting_future') and not self._connecting_future.done(): # pylint: disable=E0203
future = self._connecting_future # pylint: disable=E0203
else:
if hasattr(self, '_connecting_future'):
# read previous future result to prevent the "unhandled future exception" error
self._connecting_future.exc_info() # pylint: disable=E0203
future = tornado.concurrent.Future()
self._connecting_future = future
self._connect(timeout=timeout)
@ -684,6 +687,7 @@ class IPCMessageSubscriber(IPCClient):
except Exception as exc:
log.error('Exception occurred while Subscriber handling stream: {0}'.format(exc))
@tornado.gen.coroutine
def read_async(self, callback):
'''
Asynchronously read messages and invoke a callback when they are ready.
@ -692,13 +696,14 @@ class IPCMessageSubscriber(IPCClient):
'''
while not self.connected():
try:
self.connect()
yield self.connect(timeout=5)
except tornado.iostream.StreamClosedError:
log.trace('Subscriber closed stream on IPC {0} before connect'.format(self.socket_path))
yield tornado.gen.sleep(1)
except Exception as exc:
log.error('Exception occurred while Subscriber connecting: {0}'.format(exc))
self.io_loop.spawn_callback(self._read_async, callback)
yield tornado.gen.sleep(1)
yield self._read_async(callback)
def close(self):
'''

View file

@ -733,7 +733,7 @@ class SaltEvent(object):
if not self.cpub:
self.connect_pub()
# This will handle reconnects
self.subscriber.read_async(event_handler)
return self.subscriber.read_async(event_handler)
def __del__(self):
# skip exceptions in destroy-- since destroy() doesn't cover interpreter