Do not raise timeout error in PubClient.recv

This commit is contained in:
Daniel A. Wozniak 2023-07-01 15:39:28 -07:00 committed by Gareth J. Greenaway
parent 4e22161bee
commit d92df14ecb
2 changed files with 20 additions and 3 deletions

View file

@ -7,6 +7,7 @@ Wire protocol: "len(payload) msgpack({'head': SOMEHEADER, 'body': SOMEBODY})"
"""
import asyncio
import asyncio.exceptions
import errno
import logging
import multiprocessing
@ -353,6 +354,7 @@ class TCPPubClient(salt.transport.base.PublishClient):
async def recv(self, timeout=None):
if not self._stream:
await self.connect()
await asyncio.sleep(0.001)
return
if timeout == 0:
@ -370,6 +372,11 @@ class TCPPubClient(salt.transport.base.PublishClient):
await self._read_in_progress.acquire()
try:
byts = await self._stream.read_bytes(4096, partial=True)
except tornado.iostream.StreamClosedError:
self.close()
return
except Exception:
raise
finally:
self._read_in_progress.release()
self.unpacker.feed(byts)
@ -377,7 +384,12 @@ class TCPPubClient(salt.transport.base.PublishClient):
framed_msg = salt.transport.frame.decode_embedded_strs(msg)
return framed_msg["body"]
elif timeout:
return await asyncio.wait_for(self.recv(), timeout=timeout)
try:
return await asyncio.wait_for(self.recv(), timeout=timeout)
except asyncio.exceptions.TimeoutError:
self.close()
await self.connect()
return
else:
for msg in self.unpacker:
framed_msg = salt.transport.frame.decode_embedded_strs(msg)
@ -386,6 +398,12 @@ class TCPPubClient(salt.transport.base.PublishClient):
await self._read_in_progress.acquire()
try:
byts = await self._stream.read_bytes(4096, partial=True)
except tornado.iostream.StreamClosedError:
self.close()
await self.connect()
continue
except Exception:
raise
finally:
self._read_in_progress.release()
self.unpacker.feed(byts)
@ -1403,7 +1421,6 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
process_manager.add_process(self.publish_daemon, name=self.__class__.__name__)
async def publish_payload(self, payload, *args):
log.error("publisher - publish payload")
return await self.pub_server.publish_payload(payload)
def connect(self):

View file

@ -550,7 +550,7 @@ class RequestServer(salt.transport.base.DaemonizedRequestServer):
request = await asyncio.wait_for(self._socket.recv(), 1)
reply = await self.handle_message(None, request)
await self._socket.send(self.encode_payload(reply))
except TimeoutError:
except asyncio.exceptions.TimeoutError:
continue
except Exception as exc: # pylint: disable=broad-except
log.error("Exception in request handler", exc_info=True)