mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Merge pull request #35792 from DSRCorporation/bugs/34973_syndic_reconnect_master_2016.3
Reconnect syndic to event bus if master disappeared.
This commit is contained in:
commit
30c2db7b09
3 changed files with 21 additions and 6 deletions
|
@ -2209,6 +2209,10 @@ class Syndic(Minion):
|
|||
sync=False,
|
||||
)
|
||||
|
||||
def reconnect_event_bus(self, something):
|
||||
future = self.local.event.set_event_handler(self._process_event)
|
||||
self.io_loop.add_future(future, self.reconnect_event_bus)
|
||||
|
||||
# Syndic Tune In
|
||||
@tornado.gen.coroutine
|
||||
def tune_in(self, start=True):
|
||||
|
@ -2231,7 +2235,8 @@ class Syndic(Minion):
|
|||
|
||||
# register the event sub to the poller
|
||||
self._reset_event_aggregation()
|
||||
self.local.event.set_event_handler(self._process_event)
|
||||
future = self.local.event.set_event_handler(self._process_event)
|
||||
self.io_loop.add_future(future, self.reconnect_event_bus)
|
||||
|
||||
# forward events every syndic_event_forward_timeout
|
||||
self.forward_events = tornado.ioloop.PeriodicCallback(self._forward_events,
|
||||
|
@ -2555,6 +2560,10 @@ class MultiSyndic(MinionBase):
|
|||
self.job_rets = {}
|
||||
self.raw_events = []
|
||||
|
||||
def reconnect_event_bus(self, something):
|
||||
future = self.local.event.set_event_handler(self._process_event)
|
||||
self.io_loop.add_future(future, self.reconnect_event_bus)
|
||||
|
||||
# Syndic Tune In
|
||||
def tune_in(self):
|
||||
'''
|
||||
|
@ -2570,7 +2579,8 @@ class MultiSyndic(MinionBase):
|
|||
|
||||
# register the event sub to the poller
|
||||
self._reset_event_aggregation()
|
||||
self.local.event.set_event_handler(self._process_event)
|
||||
future = self.local.event.set_event_handler(self._process_event)
|
||||
self.io_loop.add_future(future, self.reconnect_event_bus)
|
||||
|
||||
# forward events every syndic_event_forward_timeout
|
||||
self.forward_events = tornado.ioloop.PeriodicCallback(self._forward_events,
|
||||
|
|
|
@ -270,6 +270,9 @@ class IPCClient(object):
|
|||
if hasattr(self, '_connecting_future') and not self._connecting_future.done(): # pylint: disable=E0203
|
||||
future = self._connecting_future # pylint: disable=E0203
|
||||
else:
|
||||
if hasattr(self, '_connecting_future'):
|
||||
# read previous future result to prevent the "unhandled future exception" error
|
||||
self._connecting_future.exc_info() # pylint: disable=E0203
|
||||
future = tornado.concurrent.Future()
|
||||
self._connecting_future = future
|
||||
self._connect(timeout=timeout)
|
||||
|
@ -684,6 +687,7 @@ class IPCMessageSubscriber(IPCClient):
|
|||
except Exception as exc:
|
||||
log.error('Exception occurred while Subscriber handling stream: {0}'.format(exc))
|
||||
|
||||
@tornado.gen.coroutine
|
||||
def read_async(self, callback):
|
||||
'''
|
||||
Asynchronously read messages and invoke a callback when they are ready.
|
||||
|
@ -692,13 +696,14 @@ class IPCMessageSubscriber(IPCClient):
|
|||
'''
|
||||
while not self.connected():
|
||||
try:
|
||||
self.connect()
|
||||
yield self.connect(timeout=5)
|
||||
except tornado.iostream.StreamClosedError:
|
||||
log.trace('Subscriber closed stream on IPC {0} before connect'.format(self.socket_path))
|
||||
yield tornado.gen.sleep(1)
|
||||
except Exception as exc:
|
||||
log.error('Exception occurred while Subscriber connecting: {0}'.format(exc))
|
||||
|
||||
self.io_loop.spawn_callback(self._read_async, callback)
|
||||
yield tornado.gen.sleep(1)
|
||||
yield self._read_async(callback)
|
||||
|
||||
def close(self):
|
||||
'''
|
||||
|
|
|
@ -733,7 +733,7 @@ class SaltEvent(object):
|
|||
if not self.cpub:
|
||||
self.connect_pub()
|
||||
# This will handle reconnects
|
||||
self.subscriber.read_async(event_handler)
|
||||
return self.subscriber.read_async(event_handler)
|
||||
|
||||
def __del__(self):
|
||||
# skip exceptions in destroy-- since destroy() doesn't cover interpreter
|
||||
|
|
Loading…
Add table
Reference in a new issue