AsyncTCPReqChannel will fail after 10 uses

The issue is that the resolver that is used
(tornado.netutil.ThreadedResolver) is shared. Threads don't seem to be
returned for reuse until 'close' is invoked on the resolver. Due to
this, sharing the same Resolver object may not be a good idea. Instead,
we will configure which resolver type will be used
(tornado.netutil.ThreadedResolver) as well as the number of threads.
We will let 'tornado.tcpclient.TCPClient' create its own resolver
object (by effectively passing resolver=None). This object will pull
from the shared thread pool, but since it will be closed when
'tornado.tcpclient.TCPClient' is closed, it will return threads back to
the thread pool for later use.

Signed-off-by: Sergey Kizunov <sergey.kizunov@ni.com>
This commit is contained in:
Sergey Kizunov 2015-11-03 09:47:12 -06:00 committed by rallytime
parent 61ba00b1c3
commit d94d0db805
2 changed files with 25 additions and 12 deletions

View file

@ -67,16 +67,20 @@ class AsyncChannel(object):
'''
Parent class for Async communication channels
'''
# Resolver used by Tornado TCPClient
# Resolver is used by Tornado TCPClient.
# This static field is shared between
# AsyncReqChannel and AsyncPubChannel
_resolver = None
# AsyncReqChannel and AsyncPubChannel.
# This will check to make sure the Resolver
# is configured before first use.
_resolver_configured = False
@classmethod
def _init_resolver(cls, num_threads=10):
from tornado.netutil import ThreadedResolver
cls._resolver = ThreadedResolver()
cls._resolver.initialize(num_threads=num_threads)
def _config_resolver(cls, num_threads=10):
from tornado.netutil import Resolver
Resolver.configure(
'tornado.netutil.ThreadedResolver',
num_threads=num_threads)
cls._resolver_configured = True
# TODO: better doc strings
@ -103,11 +107,11 @@ class AsyncReqChannel(AsyncChannel):
import salt.transport.raet
return salt.transport.raet.AsyncRAETReqChannel(opts, **kwargs)
elif ttype == 'tcp':
if not cls._resolver:
if not cls._resolver_configured:
# TODO: add opt to specify number of resolver threads
AsyncChannel._init_resolver()
AsyncChannel._config_resolver()
import salt.transport.tcp
return salt.transport.tcp.AsyncTCPReqChannel(opts, resolver=cls._resolver, **kwargs)
return salt.transport.tcp.AsyncTCPReqChannel(opts, **kwargs)
elif ttype == 'local':
import salt.transport.local
return salt.transport.local.AsyncLocalChannel(opts, **kwargs)
@ -152,9 +156,9 @@ class AsyncPubChannel(AsyncChannel):
import salt.transport.raet
return salt.transport.raet.AsyncRAETPubChannel(opts, **kwargs)
elif ttype == 'tcp':
if not cls._resolver:
if not cls._resolver_configured:
# TODO: add opt to specify number of resolver threads
AsyncChannel._init_resolver()
AsyncChannel._config_resolver()
import salt.transport.tcp
return salt.transport.tcp.AsyncTCPPubChannel(opts, **kwargs)
elif ttype == 'local': # TODO:

View file

@ -417,6 +417,15 @@ class SaltMessageClient(object):
self._closing = True
if hasattr(self, '_stream') and not self._stream.closed():
self._stream.close()
if self._read_until_future is not None:
# This will prevent this message from showing up:
# '[ERROR ] Future exception was never retrieved:
# StreamClosedError'
# This happens because the logic is always waiting to read
# the next message and the associated read future is marked
# 'StreamClosedError' when the stream is closed.
self._read_until_future.exc_info()
self._tcp_client.close()
def __del__(self):
self.close()