Switch resolver to threaded variant since ExecutorResolver by default uses blocking implementation

This commit is contained in:
Vladimir Didenko 2015-04-29 11:08:29 +03:00
parent a8235b3188
commit ed8e068bf7
3 changed files with 36 additions and 14 deletions

View file

@ -4,3 +4,5 @@ PyYAML
MarkupSafe
requests >= 1.0.0
tornado >= 4.0
# Required by Tornado to handle threads stuff.
futures >= 2.0

View file

@ -37,12 +37,28 @@ class ReqChannel(object):
# TODO: better doc strings
class AsyncReqChannel(object):
class AsyncChannel(object):
'''
Parent class for Async communication channels
'''
# Resolver used by Tornado TCPClient
# This static field is shared between
# AsyncReqChannel and AsyncPubChannel
_resolver = None
@classmethod
def _init_resolver(cls, num_threads=10):
from tornado.netutil import ThreadedResolver
cls._resolver = ThreadedResolver()
cls._resolver.initialize(num_threads=num_threads)
# TODO: better doc strings
class AsyncReqChannel(AsyncChannel):
'''
Factory class to create a Async communication channels to the ReqServer
'''
@staticmethod
def factory(opts, **kwargs):
@classmethod
def factory(cls, opts, **kwargs):
# Default to ZeroMQ for now
ttype = 'zeromq'
@ -60,8 +76,11 @@ class AsyncReqChannel(object):
import salt.transport.raet
return salt.transport.raet.AsyncRAETReqChannel(opts, **kwargs)
elif ttype == 'tcp':
if not cls._resolver:
# TODO: add opt to specify number of resolver threads
AsyncChannel._init_resolver()
import salt.transport.tcp
return salt.transport.tcp.AsyncTCPReqChannel(opts, **kwargs)
return salt.transport.tcp.AsyncTCPReqChannel(opts, resolver=cls._resolver, **kwargs)
elif ttype == 'local':
import salt.transport.local
return salt.transport.local.AsyncLocalChannel(opts, **kwargs)
@ -83,12 +102,12 @@ class AsyncReqChannel(object):
raise NotImplementedError()
class AsyncPubChannel(object):
class AsyncPubChannel(AsyncChannel):
'''
Factory class to create subscription channels to the master's Publisher
'''
@staticmethod
def factory(opts, **kwargs):
@classmethod
def factory(cls, opts, **kwargs):
# Default to ZeroMQ for now
ttype = 'zeromq'
@ -106,6 +125,9 @@ class AsyncPubChannel(object):
import salt.transport.raet
return salt.transport.raet.AsyncRAETPubChannel(opts, **kwargs)
elif ttype == 'tcp':
if not cls._resolver:
# TODO: add opt to specify number of resolver threads
AsyncChannel._init_resolver()
import salt.transport.tcp
return salt.transport.tcp.AsyncTCPPubChannel(opts, **kwargs)
elif ttype == 'local': # TODO:

View file

@ -119,11 +119,13 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel):
if self.crypt != 'clear':
self.auth = salt.crypt.AsyncAuth(self.opts, io_loop=self.io_loop)
resolver = kwargs.get('resolver')
parse = urlparse.urlparse(self.opts['master_uri'])
host, port = parse.netloc.rsplit(':', 1)
self.master_addr = (host, int(port))
self.message_client = SaltMessageClient(host, int(port), io_loop=self.io_loop)
self.message_client = SaltMessageClient(host, int(port), io_loop=self.io_loop, resolver=resolver)
def __del__(self):
self.message_client.destroy()
@ -379,17 +381,13 @@ class SaltMessageClient(object):
'''
Low-level message sending client
'''
def __init__(self, host, port, io_loop=None):
def __init__(self, host, port, io_loop=None, resolver=None):
self.host = host
self.port = port
self.io_loop = io_loop or tornado.ioloop.IOLoop.current()
# Configure the resolver to use a non-blocking one
# Not Threaded since we need to work on python2
tornado.netutil.Resolver.configure('tornado.netutil.ExecutorResolver')
self._tcp_client = tornado.tcpclient.TCPClient(io_loop=self.io_loop)
self._tcp_client = tornado.tcpclient.TCPClient(io_loop=self.io_loop, resolver=resolver)
self._mid = 1
self._max_messages = sys.maxint - 1 # number of IDs before we wrap