fix pre-commit

This commit is contained in:
Insoo Ha 2023-12-14 10:04:21 +09:00 committed by Pedro Algarvio
parent 13cfa9341f
commit 10b9aeda05
3 changed files with 45 additions and 26 deletions

View file

@ -1 +1 @@
Use `send_multipart` instead of `send` when sending multipart message.
Use `send_multipart` instead of `send` when sending multipart message.

View file

@ -1,16 +1,15 @@
from contextlib import contextmanager
import copy
import logging
import random
import time
import threading
import time
from contextlib import contextmanager
import pytest
from saltfactories.utils import random_string
import salt.transport.zeromq
import salt.utils.process
from tests.support.mock import MagicMock, patch
from tests.support.pytest.transport import PubServerChannelProcess
@ -42,9 +41,13 @@ def generate_msg_list(msg_cnt, minions_list, broadcast):
for i in range(msg_cnt):
for idx, minion_id in enumerate(minions_list):
if broadcast:
msg_list.append({"tgt_type": "grain", "tgt": 'id:*', "jid": msg_cnt * idx + i})
msg_list.append(
{"tgt_type": "grain", "tgt": "id:*", "jid": msg_cnt * idx + i}
)
else:
msg_list.append({"tgt_type": "list", "tgt": [minion_id], "jid": msg_cnt * idx + i})
msg_list.append(
{"tgt_type": "list", "tgt": [minion_id], "jid": msg_cnt * idx + i}
)
return msg_list
@ -54,7 +57,9 @@ def channel_publisher_manager(msg_list, p_cnt, pub_server_channel):
msg_list = copy.deepcopy(msg_list)
random.shuffle(msg_list)
batch_size = len(msg_list) // p_cnt
list_batch = [[x * batch_size, x * batch_size + batch_size] for x in range(0, p_cnt)]
list_batch = [
[x * batch_size, x * batch_size + batch_size] for x in range(0, p_cnt)
]
list_batch[-1][1] = list_batch[-1][1] + 1
try:
for i, j in list_batch:
@ -103,7 +108,9 @@ def test_zeromq_filtering_minion(salt_master, salt_minion):
),
):
with PubServerChannelProcess(opts, minion_opts) as server_channel:
with channel_publisher_manager(msg_list, workers, server_channel.pub_server_channel):
with channel_publisher_manager(
msg_list, workers, server_channel.pub_server_channel
):
cnt = 0
last_results_len = 0
while cnt < 20:
@ -114,8 +121,9 @@ def test_zeromq_filtering_minion(salt_master, salt_minion):
last_results_len = results_len
cnt += 1
results = set(server_channel.collector.results)
assert results == expect, \
f"{len(results)}, != {len(expect)}, difference: {expect.difference(results)} {results}"
assert (
results == expect
), f"{len(results)}, != {len(expect)}, difference: {expect.difference(results)} {results}"
@pytest.mark.skip_on_windows
@ -132,7 +140,7 @@ def test_zeromq_filtering_syndic(salt_master, salt_minion):
minion_opts = dict(
salt_minion.config.copy(),
zmq_filtering=True,
__role='syndic',
__role="syndic",
)
messages = 200
workers = 5
@ -153,7 +161,9 @@ def test_zeromq_filtering_syndic(salt_master, salt_minion):
),
):
with PubServerChannelProcess(opts, minion_opts) as server_channel:
with channel_publisher_manager(msg_list, workers, server_channel.pub_server_channel):
with channel_publisher_manager(
msg_list, workers, server_channel.pub_server_channel
):
cnt = 0
last_results_len = 0
while cnt < 20:
@ -164,8 +174,9 @@ def test_zeromq_filtering_syndic(salt_master, salt_minion):
last_results_len = results_len
cnt += 1
results = set(server_channel.collector.results)
assert results == expect, \
f"{len(results)}, != {len(expect)}, difference: {expect.difference(results)} {results}"
assert (
results == expect
), f"{len(results)}, != {len(expect)}, difference: {expect.difference(results)} {results}"
@pytest.mark.skip_on_windows
@ -205,7 +216,9 @@ def test_zeromq_filtering_broadcast(salt_master, salt_minion):
),
):
with PubServerChannelProcess(opts, minion_opts) as server_channel:
with channel_publisher_manager(msg_list, workers, server_channel.pub_server_channel):
with channel_publisher_manager(
msg_list, workers, server_channel.pub_server_channel
):
cnt = 0
last_results_len = 0
while cnt < 20:
@ -216,8 +229,9 @@ def test_zeromq_filtering_broadcast(salt_master, salt_minion):
last_results_len = results_len
cnt += 1
results = set(server_channel.collector.results)
assert results == expect, \
f"{len(results)}, != {len(expect)}, difference: {expect.difference(results)} {results}"
assert (
results == expect
), f"{len(results)}, != {len(expect)}, difference: {expect.difference(results)} {results}"
def test_pub_channel(master_opts):

View file

@ -42,7 +42,9 @@ class Collector(salt.utils.process.SignalHandlingProcess):
):
super().__init__()
self.minion_config = minion_config
self.hexid = hashlib.sha1(salt.utils.stringutils.to_bytes(self.minion_config["id"])).hexdigest()
self.hexid = hashlib.sha1(
salt.utils.stringutils.to_bytes(self.minion_config["id"])
).hexdigest()
self.interface = interface
self.port = port
self.aes_key = aes_key
@ -51,7 +53,7 @@ class Collector(salt.utils.process.SignalHandlingProcess):
self.hard_timeout = time.time() + timeout + 120
self.manager = multiprocessing.Manager()
self.results = self.manager.list()
self.zmq_filtering = minion_config['zmq_filtering']
self.zmq_filtering = minion_config["zmq_filtering"]
self.stopped = multiprocessing.Event()
self.started = multiprocessing.Event()
self.running = multiprocessing.Event()
@ -87,7 +89,9 @@ class Collector(salt.utils.process.SignalHandlingProcess):
if self.minion_config.get("__role") == "syndic":
self.sock.setsockopt(zmq.SUBSCRIBE, b"syndic")
else:
self.sock.setsockopt(zmq.SUBSCRIBE, salt.utils.stringutils.to_bytes(self.hexid))
self.sock.setsockopt(
zmq.SUBSCRIBE, salt.utils.stringutils.to_bytes(self.hexid)
)
else:
self.sock.setsockopt(zmq.SUBSCRIBE, b"")
pub_uri = "tcp://{}:{}".format(self.interface, self.port)
@ -120,11 +124,12 @@ class Collector(salt.utils.process.SignalHandlingProcess):
message_target = salt.utils.stringutils.to_str(messages[0])
is_syndic = self.minion_config.get("__role") == "syndic"
if (
not is_syndic and message_target not in ("broadcast", self.hexid)
) or (
is_syndic and message_target not in ("broadcast", "syndic")
):
log.debug("Publish received for not this minion: %s", message_target)
not is_syndic
and message_target not in ("broadcast", self.hexid)
) or (is_syndic and message_target not in ("broadcast", "syndic")):
log.debug(
"Publish received for not this minion: %s", message_target
)
raise salt.ext.tornado.gen.Return(None)
serial_payload = salt.payload.loads(messages[1])
else: