mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
AsyncTCPReqChannel will fail after 10 uses
The issue is that the resolver that is used (tornado.netutil.ThreadedResolver) is shared. Threads don't seem to be returned for reuse until 'close' is invoked on the resolver. Due to this, sharing the same Resolver object may not be a good idea. Instead, we will configure which resolver type will be used (tornado.netutil.ThreadedResolver) as well as the number of threads. We will let 'tornado.tcpclient.TCPClient' create its own resolver object (by effectively passing resolver=None). This object will pull from the shared thread pool, but since it will be closed when 'tornado.tcpclient.TCPClient' is closed, it will return threads back to the thread pool for later use. Signed-off-by: Sergey Kizunov <sergey.kizunov@ni.com>
This commit is contained in:
parent
61ba00b1c3
commit
d94d0db805
2 changed files with 25 additions and 12 deletions
|
@ -67,16 +67,20 @@ class AsyncChannel(object):
|
|||
'''
|
||||
Parent class for Async communication channels
|
||||
'''
|
||||
# Resolver used by Tornado TCPClient
|
||||
# Resolver is used by Tornado TCPClient.
|
||||
# This static field is shared between
|
||||
# AsyncReqChannel and AsyncPubChannel
|
||||
_resolver = None
|
||||
# AsyncReqChannel and AsyncPubChannel.
|
||||
# This will check to make sure the Resolver
|
||||
# is configured before first use.
|
||||
_resolver_configured = False
|
||||
|
||||
@classmethod
|
||||
def _init_resolver(cls, num_threads=10):
|
||||
from tornado.netutil import ThreadedResolver
|
||||
cls._resolver = ThreadedResolver()
|
||||
cls._resolver.initialize(num_threads=num_threads)
|
||||
def _config_resolver(cls, num_threads=10):
|
||||
from tornado.netutil import Resolver
|
||||
Resolver.configure(
|
||||
'tornado.netutil.ThreadedResolver',
|
||||
num_threads=num_threads)
|
||||
cls._resolver_configured = True
|
||||
|
||||
|
||||
# TODO: better doc strings
|
||||
|
@ -103,11 +107,11 @@ class AsyncReqChannel(AsyncChannel):
|
|||
import salt.transport.raet
|
||||
return salt.transport.raet.AsyncRAETReqChannel(opts, **kwargs)
|
||||
elif ttype == 'tcp':
|
||||
if not cls._resolver:
|
||||
if not cls._resolver_configured:
|
||||
# TODO: add opt to specify number of resolver threads
|
||||
AsyncChannel._init_resolver()
|
||||
AsyncChannel._config_resolver()
|
||||
import salt.transport.tcp
|
||||
return salt.transport.tcp.AsyncTCPReqChannel(opts, resolver=cls._resolver, **kwargs)
|
||||
return salt.transport.tcp.AsyncTCPReqChannel(opts, **kwargs)
|
||||
elif ttype == 'local':
|
||||
import salt.transport.local
|
||||
return salt.transport.local.AsyncLocalChannel(opts, **kwargs)
|
||||
|
@ -152,9 +156,9 @@ class AsyncPubChannel(AsyncChannel):
|
|||
import salt.transport.raet
|
||||
return salt.transport.raet.AsyncRAETPubChannel(opts, **kwargs)
|
||||
elif ttype == 'tcp':
|
||||
if not cls._resolver:
|
||||
if not cls._resolver_configured:
|
||||
# TODO: add opt to specify number of resolver threads
|
||||
AsyncChannel._init_resolver()
|
||||
AsyncChannel._config_resolver()
|
||||
import salt.transport.tcp
|
||||
return salt.transport.tcp.AsyncTCPPubChannel(opts, **kwargs)
|
||||
elif ttype == 'local': # TODO:
|
||||
|
|
|
@ -417,6 +417,15 @@ class SaltMessageClient(object):
|
|||
self._closing = True
|
||||
if hasattr(self, '_stream') and not self._stream.closed():
|
||||
self._stream.close()
|
||||
if self._read_until_future is not None:
|
||||
# This will prevent this message from showing up:
|
||||
# '[ERROR ] Future exception was never retrieved:
|
||||
# StreamClosedError'
|
||||
# This happens because the logic is always waiting to read
|
||||
# the next message and the associated read future is marked
|
||||
# 'StreamClosedError' when the stream is closed.
|
||||
self._read_until_future.exc_info()
|
||||
self._tcp_client.close()
|
||||
|
||||
def __del__(self):
|
||||
self.close()
|
||||
|
|
Loading…
Add table
Reference in a new issue