From ccf635312607ed4794e7d8c083b68e4d483f048b Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Tue, 25 Jul 2023 02:57:12 -0700 Subject: [PATCH] Use async with on asyncio.Lock --- salt/transport/tcp.py | 70 ++++++++++++++++++++----------------------- 1 file changed, 33 insertions(+), 37 deletions(-) diff --git a/salt/transport/tcp.py b/salt/transport/tcp.py index 1425531ba01..faa341b961f 100644 --- a/salt/transport/tcp.py +++ b/salt/transport/tcp.py @@ -392,24 +392,22 @@ class TCPPubClient(salt.transport.base.PublishClient): events = [] if events: while not self._closing: - await self._read_in_progress.acquire() - try: - byts = await self._stream.read_bytes(4096, partial=True) - except tornado.iostream.StreamClosedError: - log.trace("Stream closed, reconnecting.") - stream = self._stream - self._stream = None - stream.close() - if self.disconnect_callback: - self.disconnect_callback() - await self.connect() - return - finally: - self._read_in_progress.release() - self.unpacker.feed(byts) - for msg in self.unpacker: - framed_msg = salt.transport.frame.decode_embedded_strs(msg) - return framed_msg["body"] + async with self._read_in_progress: + try: + byts = await self._stream.read_bytes(4096, partial=True) + except tornado.iostream.StreamClosedError: + log.trace("Stream closed, reconnecting.") + stream = self._stream + self._stream = None + stream.close() + if self.disconnect_callback: + self.disconnect_callback() + await self.connect() + return + self.unpacker.feed(byts) + for msg in self.unpacker: + framed_msg = salt.transport.frame.decode_embedded_strs(msg) + return framed_msg["body"] elif timeout: try: return await asyncio.wait_for(self.recv(), timeout=timeout) @@ -426,25 +424,23 @@ class TCPPubClient(salt.transport.base.PublishClient): framed_msg = salt.transport.frame.decode_embedded_strs(msg) return framed_msg["body"] while not self._closing: - await self._read_in_progress.acquire() - try: - byts = await self._stream.read_bytes(4096, partial=True) - except tornado.iostream.StreamClosedError: - log.trace("Stream closed, reconnecting.") - stream = self._stream - self._stream = None - stream.close() - if self.disconnect_callback: - self.disconnect_callback() - await self.connect() - log.debug("Re-connected - continue") - continue - finally: - self._read_in_progress.release() - self.unpacker.feed(byts) - for msg in self.unpacker: - framed_msg = salt.transport.frame.decode_embedded_strs(msg) - return framed_msg["body"] + async with self._read_in_progress: + try: + byts = await self._stream.read_bytes(4096, partial=True) + except tornado.iostream.StreamClosedError: + log.trace("Stream closed, reconnecting.") + stream = self._stream + self._stream = None + stream.close() + if self.disconnect_callback: + self.disconnect_callback() + await self.connect() + log.debug("Re-connected - continue") + continue + self.unpacker.feed(byts) + for msg in self.unpacker: + framed_msg = salt.transport.frame.decode_embedded_strs(msg) + return framed_msg["body"] async def on_recv_handler(self, callback): while not self._stream: