mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Windows fix
This commit is contained in:
parent
8035d2418d
commit
f556db26a9
2 changed files with 12 additions and 9 deletions
|
@ -252,8 +252,6 @@ class TCPPubClient(salt.transport.base.PublishClient):
|
|||
self.path = kwargs.get("path", None)
|
||||
self.source_ip = self.opts.get("source_ip")
|
||||
self.source_port = self.opts.get("source_publish_port")
|
||||
self.connect_callback = _null_callback
|
||||
self.disconnect_callback = _null_callback
|
||||
self.on_recv_task = None
|
||||
if self.host is None and self.port is None:
|
||||
if self.path is None:
|
||||
|
@ -261,6 +259,8 @@ class TCPPubClient(salt.transport.base.PublishClient):
|
|||
elif self.host and self.port:
|
||||
if self.path:
|
||||
raise Exception("A host and port or a path must be provided, not both")
|
||||
self.connect_callback = kwargs.get("connect_callback", _null_callback)
|
||||
self.disconnect_callback = kwargs.get("disconnect_callback", _null_callback)
|
||||
|
||||
def close(self):
|
||||
if self._closing:
|
||||
|
@ -321,8 +321,6 @@ class TCPPubClient(salt.transport.base.PublishClient):
|
|||
await asyncio.wait_for(stream.connect(self.path), 1)
|
||||
self.unpacker = salt.utils.msgpack.Unpacker()
|
||||
log.debug("PubClient conencted to %r %r", self, self.path)
|
||||
self.poller = select.poll()
|
||||
self.poller.register(stream.socket, select.POLLIN)
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
log.warning(
|
||||
"TCP Publish Client encountered an exception while connecting to"
|
||||
|
@ -384,7 +382,7 @@ class TCPPubClient(salt.transport.base.PublishClient):
|
|||
framed_msg = salt.transport.frame.decode_embedded_strs(msg)
|
||||
return framed_msg["body"]
|
||||
try:
|
||||
events = self.poller.poll(0)
|
||||
events, _, _ = select.select([self._stream.socket], [], [], 0)
|
||||
except TimeoutError:
|
||||
events = []
|
||||
if events:
|
||||
|
@ -397,6 +395,8 @@ class TCPPubClient(salt.transport.base.PublishClient):
|
|||
stream = self._stream
|
||||
self._stream = None
|
||||
stream.close()
|
||||
if self.disconnect_callback:
|
||||
self.disconnect_callback()
|
||||
await self.connect()
|
||||
return
|
||||
# except Exception:
|
||||
|
@ -432,6 +432,8 @@ class TCPPubClient(salt.transport.base.PublishClient):
|
|||
stream = self._stream
|
||||
self._stream = None
|
||||
stream.close()
|
||||
if self.disconnect_callback:
|
||||
self.disconnect_callback()
|
||||
await self.connect()
|
||||
log.error("Re-connected - continue")
|
||||
continue
|
||||
|
|
|
@ -1053,6 +1053,7 @@ class RequestClient(salt.transport.base.RequestClient):
|
|||
|
||||
def __init__(self, opts, io_loop, linger=0): # pylint: disable=W0231
|
||||
self.opts = opts
|
||||
# XXX Support host, port, path, instead of using get_master_uri
|
||||
self.master_uri = self.get_master_uri(opts)
|
||||
self.linger = linger
|
||||
if io_loop is None:
|
||||
|
@ -1114,7 +1115,7 @@ class RequestClient(salt.transport.base.RequestClient):
|
|||
ret = await self.socket.recv()
|
||||
return salt.payload.loads(ret)
|
||||
|
||||
async def send(self, message, timeout=None):
|
||||
async def send(self, load, timeout=60):
|
||||
"""
|
||||
Return a future which will be completed when the message has a response
|
||||
"""
|
||||
|
@ -1122,11 +1123,11 @@ class RequestClient(salt.transport.base.RequestClient):
|
|||
await self.connect()
|
||||
await self.sending.acquire()
|
||||
try:
|
||||
return await asyncio.wait_for(self._send_recv(message), timeout=timeout)
|
||||
return await asyncio.wait_for(self._send_recv(load), timeout=timeout)
|
||||
except TimeoutError:
|
||||
self.close()
|
||||
except Exception:
|
||||
self.close()
|
||||
# except Exception:
|
||||
# self.close()
|
||||
finally:
|
||||
self.sending.release()
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue