Fix exceptions being set on completed futures

Fixes an issue in the ZeroMQ transport where caught exceptions were being set on futures that had already completed (i.e. done) and were logging lots of error messages.
This commit is contained in:
Cian Yong Leow 2024-02-06 17:47:48 +00:00 committed by Daniel Wozniak
parent 1ad68fcd93
commit a974d5d7db
3 changed files with 40 additions and 3 deletions

1
changelog/66006.fixed.md Normal file
View file

@ -0,0 +1 @@
Fix exceptions being set on futures that are already done in ZeroMQ transport

View file

@ -607,14 +607,16 @@ class AsyncReqMessageClient:
try:
recv = yield self.socket.recv()
except zmq.eventloop.future.CancelledError as exc:
future.set_exception(exc)
if not future.done():
future.set_exception(exc)
return
if not future.done():
data = salt.payload.loads(recv)
future.set_result(data)
except Exception as exc: # pylint: disable=broad-except
future.set_exception(exc)
if not future.done():
future.set_exception(exc)
class ZeroMQSocketMonitor:

View file

@ -13,6 +13,7 @@ import uuid
import msgpack
import pytest
import zmq.eventloop.future
import salt.channel.client
import salt.channel.server
@ -27,7 +28,7 @@ import salt.utils.platform
import salt.utils.process
import salt.utils.stringutils
from salt.master import SMaster
from tests.support.mock import MagicMock, patch
from tests.support.mock import AsyncMock, MagicMock, patch
try:
from M2Crypto import RSA
@ -1489,6 +1490,39 @@ async def test_client_timeout_msg(minion_opts):
client.close()
async def test_client_send_recv_on_cancelled_error(minion_opts):
client = salt.transport.zeromq.AsyncReqMessageClient(
minion_opts, "tcp://127.0.0.1:4506"
)
mock_future = MagicMock(**{"done.return_value": True})
try:
client.socket = AsyncMock()
client.socket.recv.side_effect = zmq.eventloop.future.CancelledError
await client._send_recv({"meh": "bah"}, mock_future)
mock_future.set_exception.assert_not_called()
finally:
client.close()
async def test_client_send_recv_on_exception(minion_opts):
client = salt.transport.zeromq.AsyncReqMessageClient(
minion_opts, "tcp://127.0.0.1:4506"
)
mock_future = MagicMock(**{"done.return_value": True})
try:
client.socket = None
await client._send_recv({"meh": "bah"}, mock_future)
mock_future.set_exception.assert_not_called()
finally:
client.close()
def test_pub_client_init(minion_opts, io_loop):
minion_opts["id"] = "minion"
minion_opts["__role"] = "syndic"