mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Fix resolver
This commit is contained in:
parent
85504a91a6
commit
cf4e901ba8
1 changed files with 11 additions and 19 deletions
|
@ -196,21 +196,14 @@ if USE_LOAD_BALANCER:
|
|||
raise
|
||||
|
||||
|
||||
class Resolver:
|
||||
class Resolver(tornado.netutil.DefaultLoopResolver):
|
||||
"""
|
||||
Default resolver for tornado
|
||||
"""
|
||||
|
||||
_resolver_configured = False
|
||||
#def __init__(self, *args, **kwargs):
|
||||
# super().configure('tornado.netutil.DefaultLoopResolver')
|
||||
|
||||
@classmethod
|
||||
def _config_resolver(cls, num_threads=10):
|
||||
tornado.netutil.Resolver.configure(
|
||||
"tornado.netutil.ThreadedResolver", num_threads=num_threads
|
||||
)
|
||||
cls._resolver_configured = True
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
if not self._resolver_configured:
|
||||
# TODO: add opt to specify number of resolver threads
|
||||
self._config_resolver()
|
||||
|
||||
|
||||
class TCPPubClient(salt.transport.base.PublishClient):
|
||||
|
@ -235,7 +228,6 @@ class TCPPubClient(salt.transport.base.PublishClient):
|
|||
self.unpacker = salt.utils.msgpack.Unpacker()
|
||||
self.connected = False
|
||||
self._closing = False
|
||||
self.resolver = Resolver()
|
||||
self._stream = None
|
||||
self._closing = False
|
||||
self._closed = False
|
||||
|
@ -374,7 +366,7 @@ class TCPPubClient(salt.transport.base.PublishClient):
|
|||
return body
|
||||
|
||||
async def send(self, msg):
|
||||
await self._stream.send(msg)
|
||||
await self._stream.write(msg)
|
||||
|
||||
async def recv(self, timeout=None):
|
||||
while self._stream is None:
|
||||
|
@ -442,6 +434,7 @@ class TCPPubClient(salt.transport.base.PublishClient):
|
|||
|
||||
async def on_recv_handler(self, callback):
|
||||
while not self._stream:
|
||||
# Retry quickly, we may want to increase this if it's hogging cpu.
|
||||
await asyncio.sleep(0.003)
|
||||
while True:
|
||||
msg = await self.recv()
|
||||
|
@ -1636,12 +1629,10 @@ class TCPReqClient(salt.transport.base.RequestClient):
|
|||
parse = urllib.parse.urlparse(self.opts["master_uri"])
|
||||
master_host, master_port = parse.netloc.rsplit(":", 1)
|
||||
master_addr = (master_host, int(master_port))
|
||||
# self.resolver = Resolver()
|
||||
resolver = kwargs.get("resolver")
|
||||
|
||||
resolver = kwargs.get("resolver", None)
|
||||
self.host = master_host
|
||||
self.port = int(master_port)
|
||||
self._tcp_client = TCPClientKeepAlive(opts, resolver=resolver)
|
||||
self._tcp_client = TCPClientKeepAlive(opts)
|
||||
self.source_ip = opts.get("source_ip")
|
||||
self.source_port = opts.get("source_ret_port")
|
||||
self._mid = 1
|
||||
|
@ -1658,6 +1649,7 @@ class TCPReqClient(salt.transport.base.RequestClient):
|
|||
self._stream = None
|
||||
self.disconnect_callback = _null_callback
|
||||
self.connect_callback = _null_callback
|
||||
self.backoff = opts.get("tcp_reconnect_backoff", 1)
|
||||
|
||||
async def getstream(self, **kwargs):
|
||||
if self.source_ip or self.source_port:
|
||||
|
|
Loading…
Add table
Reference in a new issue