Fix up issues found in code review

This commit is contained in:
Daniel A. Wozniak 2023-07-27 15:23:34 -07:00 committed by Gareth J. Greenaway
parent 2b90a91add
commit ca93da0d7d
2 changed files with 14 additions and 13 deletions

View file

@ -1,6 +1,6 @@
Jinja2
jmespath
msgpack>=0.5.2,!=0.5.5
msgpack>=1.0.0
PyYAML
MarkupSafe
requests>=1.0.0

View file

@ -253,10 +253,12 @@ class TCPPubClient(salt.transport.base.PublishClient):
self.on_recv_task = None
if self.host is None and self.port is None:
if self.path is None:
raise Exception("A host and port or a path must be provided")
raise RuntimeError("A host and port or a path must be provided")
elif self.host and self.port:
if self.path:
raise Exception("A host and port or a path must be provided, not both")
raise RuntimeError(
"A host and port or a path must be provided, not both"
)
self.connect_callback = kwargs.get("connect_callback", _null_callback)
self.disconnect_callback = kwargs.get("disconnect_callback", _null_callback)
@ -288,7 +290,7 @@ class TCPPubClient(salt.transport.base.PublishClient):
"source_port": self.source_port,
}
stream = None
start = time.time()
start = time.monotonic()
timeout = kwargs.get("timeout", None)
while stream is None and (not self._closed and not self._closing):
try:
@ -331,10 +333,10 @@ class TCPPubClient(salt.transport.base.PublishClient):
exc,
self.backoff,
)
if timeout and time.time() - start > timeout:
if timeout and time.monotonic() - start > timeout:
break
await asyncio.sleep(self.backoff)
if timeout and time.time() - start > timeout:
if timeout and time.monotonic() - start > timeout:
break
return stream
@ -360,9 +362,9 @@ class TCPPubClient(salt.transport.base.PublishClient):
if port is not None:
self.port = port
if connect_callback:
self.connect_callback = None
self.connect_callback = connect_callback
if disconnect_callback:
self.disconnect_callback = None
self.disconnect_callback = disconnect_callback
await self._connect(timeout=timeout)
def _decode_messages(self, messages):
@ -446,7 +448,6 @@ class TCPPubClient(salt.transport.base.PublishClient):
while not self._stream:
await asyncio.sleep(0.003)
while True:
# try:
msg = await self.recv()
if msg:
callback(msg)
@ -456,6 +457,8 @@ class TCPPubClient(salt.transport.base.PublishClient):
Register a callback for received messages (that we didn't initiate)
"""
if self.on_recv_task:
# XXX: We are not awaiting this canceled task. This still needs to
# be addressed.
self.on_recv_task.cancel()
if callback is None:
self.on_recv_task = None
@ -848,7 +851,6 @@ class MessageClient:
if self._stream:
if not self._stream_return_running:
self.task = asyncio.create_task(self._stream_return())
# self.io_loop.spawn_callback(self._stream_return)
if self.connect_callback:
self.connect_callback(True)
@ -1602,7 +1604,7 @@ class _TCPPubServerPublisher:
self.stream = None
if timeout is not None:
timeout_at = time.time() + timeout
timeout_at = time.monotonic() + timeout
while True:
if self._closing:
@ -1621,7 +1623,7 @@ class _TCPPubServerPublisher:
if self.stream.closed():
self.stream = None
if timeout is None or time.time() > timeout_at:
if timeout is None or time.monotonic() > timeout_at:
if self.stream is not None:
self.stream.close()
self.stream = None
@ -1737,7 +1739,6 @@ class TCPReqClient(salt.transport.base.RequestClient):
)
else:
sock_type = socket.AF_UNIX
path = self.url.replace("ipc://", "")
stream = tornado.iostream.IOStream(
socket.socket(sock_type, socket.SOCK_STREAM)
)