Ensure exceptions in service future are handled

This commit is contained in:
Daniel A. Wozniak 2019-04-10 03:32:20 +00:00
parent c7ad732854
commit 1bdaf2931b
No known key found for this signature in database
GPG key ID: 166B9D2C06C82D61

View file

@ -574,6 +574,7 @@ class IPCMessageSubscriberService(IPCClient):
self.saved_data = []
self._read_in_progress = Lock()
self.handlers = weakref.WeakSet()
self.read_stream_future = None
def _subscribe(self, handler):
self.handlers.add(handler)
@ -601,16 +602,16 @@ class IPCMessageSubscriberService(IPCClient):
if timeout is None:
timeout = 5
read_stream_future = None
self.read_stream_future = None
while self._has_subscribers():
if read_stream_future is None:
read_stream_future = self.stream.read_bytes(4096, partial=True)
if self.read_stream_future is None:
self.read_stream_future = self.stream.read_bytes(4096, partial=True)
try:
wire_bytes = yield FutureWithTimeout(self.io_loop,
read_stream_future,
self.read_stream_future,
timeout)
read_stream_future = None
self.read_stream_future = None
self.unpacker.feed(wire_bytes)
msgs = [msg['body'] for msg in self.unpacker]
@ -650,7 +651,7 @@ class IPCMessageSubscriberService(IPCClient):
except Exception as exc:
log.error('Exception occurred while Subscriber connecting: %s', exc)
yield tornado.gen.sleep(1)
self._read(timeout)
yield self._read(timeout)
def close(self):
'''
@ -659,6 +660,10 @@ class IPCMessageSubscriberService(IPCClient):
leaks.
'''
super(IPCMessageSubscriberService, self).close()
if self.read_stream_future is not None and self.read_stream_future.done():
exc = self.read_stream_future.exception()
if exc and not isinstance(exc, tornado.iostream.StreamClosedError):
log.error("Read future returned exception %r", exc)
def __del__(self):
if IPCMessageSubscriberService in globals():