Use async with on asyncio.Lock

This commit is contained in:
Daniel A. Wozniak 2023-07-25 02:57:12 -07:00 committed by Gareth J. Greenaway
parent 8190791910
commit ccf6353126

View file

@ -392,24 +392,22 @@ class TCPPubClient(salt.transport.base.PublishClient):
events = []
if events:
while not self._closing:
await self._read_in_progress.acquire()
try:
byts = await self._stream.read_bytes(4096, partial=True)
except tornado.iostream.StreamClosedError:
log.trace("Stream closed, reconnecting.")
stream = self._stream
self._stream = None
stream.close()
if self.disconnect_callback:
self.disconnect_callback()
await self.connect()
return
finally:
self._read_in_progress.release()
self.unpacker.feed(byts)
for msg in self.unpacker:
framed_msg = salt.transport.frame.decode_embedded_strs(msg)
return framed_msg["body"]
async with self._read_in_progress:
try:
byts = await self._stream.read_bytes(4096, partial=True)
except tornado.iostream.StreamClosedError:
log.trace("Stream closed, reconnecting.")
stream = self._stream
self._stream = None
stream.close()
if self.disconnect_callback:
self.disconnect_callback()
await self.connect()
return
self.unpacker.feed(byts)
for msg in self.unpacker:
framed_msg = salt.transport.frame.decode_embedded_strs(msg)
return framed_msg["body"]
elif timeout:
try:
return await asyncio.wait_for(self.recv(), timeout=timeout)
@ -426,25 +424,23 @@ class TCPPubClient(salt.transport.base.PublishClient):
framed_msg = salt.transport.frame.decode_embedded_strs(msg)
return framed_msg["body"]
while not self._closing:
await self._read_in_progress.acquire()
try:
byts = await self._stream.read_bytes(4096, partial=True)
except tornado.iostream.StreamClosedError:
log.trace("Stream closed, reconnecting.")
stream = self._stream
self._stream = None
stream.close()
if self.disconnect_callback:
self.disconnect_callback()
await self.connect()
log.debug("Re-connected - continue")
continue
finally:
self._read_in_progress.release()
self.unpacker.feed(byts)
for msg in self.unpacker:
framed_msg = salt.transport.frame.decode_embedded_strs(msg)
return framed_msg["body"]
async with self._read_in_progress:
try:
byts = await self._stream.read_bytes(4096, partial=True)
except tornado.iostream.StreamClosedError:
log.trace("Stream closed, reconnecting.")
stream = self._stream
self._stream = None
stream.close()
if self.disconnect_callback:
self.disconnect_callback()
await self.connect()
log.debug("Re-connected - continue")
continue
self.unpacker.feed(byts)
for msg in self.unpacker:
framed_msg = salt.transport.frame.decode_embedded_strs(msg)
return framed_msg["body"]
async def on_recv_handler(self, callback):
while not self._stream: