mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Ensure exceptions in service future are handled
This commit is contained in:
parent
c7ad732854
commit
1bdaf2931b
1 changed files with 11 additions and 6 deletions
|
@ -574,6 +574,7 @@ class IPCMessageSubscriberService(IPCClient):
|
|||
self.saved_data = []
|
||||
self._read_in_progress = Lock()
|
||||
self.handlers = weakref.WeakSet()
|
||||
self.read_stream_future = None
|
||||
|
||||
def _subscribe(self, handler):
|
||||
self.handlers.add(handler)
|
||||
|
@ -601,16 +602,16 @@ class IPCMessageSubscriberService(IPCClient):
|
|||
if timeout is None:
|
||||
timeout = 5
|
||||
|
||||
read_stream_future = None
|
||||
self.read_stream_future = None
|
||||
while self._has_subscribers():
|
||||
if read_stream_future is None:
|
||||
read_stream_future = self.stream.read_bytes(4096, partial=True)
|
||||
if self.read_stream_future is None:
|
||||
self.read_stream_future = self.stream.read_bytes(4096, partial=True)
|
||||
|
||||
try:
|
||||
wire_bytes = yield FutureWithTimeout(self.io_loop,
|
||||
read_stream_future,
|
||||
self.read_stream_future,
|
||||
timeout)
|
||||
read_stream_future = None
|
||||
self.read_stream_future = None
|
||||
|
||||
self.unpacker.feed(wire_bytes)
|
||||
msgs = [msg['body'] for msg in self.unpacker]
|
||||
|
@ -650,7 +651,7 @@ class IPCMessageSubscriberService(IPCClient):
|
|||
except Exception as exc:
|
||||
log.error('Exception occurred while Subscriber connecting: %s', exc)
|
||||
yield tornado.gen.sleep(1)
|
||||
self._read(timeout)
|
||||
yield self._read(timeout)
|
||||
|
||||
def close(self):
|
||||
'''
|
||||
|
@ -659,6 +660,10 @@ class IPCMessageSubscriberService(IPCClient):
|
|||
leaks.
|
||||
'''
|
||||
super(IPCMessageSubscriberService, self).close()
|
||||
if self.read_stream_future is not None and self.read_stream_future.done():
|
||||
exc = self.read_stream_future.exception()
|
||||
if exc and not isinstance(exc, tornado.iostream.StreamClosedError):
|
||||
log.error("Read future returned exception %r", exc)
|
||||
|
||||
def __del__(self):
|
||||
if IPCMessageSubscriberService in globals():
|
||||
|
|
Loading…
Add table
Reference in a new issue