Revert windows test fix

This commit is contained in:
Daniel A. Wozniak 2023-07-23 22:35:20 -07:00 committed by Gareth J. Greenaway
parent 0b7285b766
commit 6320f769ea
6 changed files with 121 additions and 113 deletions

View file

@ -723,26 +723,7 @@ class Master(SMaster):
pub_channels.append(chan)
log.info("Creating master event publisher process")
if self.opts["ipc_mode"] == "tcp":
ipc_publisher = salt.transport.publish_server(
self.opts,
transport="tcp",
pub_host="127.0.0.1",
pub_port=int(self.opts["tcp_master_pub_port"]),
pull_host="127.0.0.1",
pull_port=int(self.opts["tcp_master_pull_port"]),
)
else:
ipc_publisher = salt.transport.publish_server(
self.opts,
transport="tcp",
pub_path=os.path.join(
self.opts["sock_dir"], "master_event_pub.ipc"
),
pull_path=os.path.join(
self.opts["sock_dir"], "master_event_pull.ipc"
),
)
ipc_publisher = salt.transport.ipc_publish_server("master", self.opts)
self.process_manager.add_process(
ipc_publisher.publish_daemon,
args=[

View file

@ -5,7 +5,6 @@ import asyncio
import binascii
import contextlib
import copy
import hashlib
import logging
import multiprocessing
import os
@ -1045,34 +1044,7 @@ class MinionManager(MinionBase):
def _bind(self):
# start up the event publisher, so we can see events during startup
hash_type = getattr(hashlib, self.opts["hash_type"])
id_hash = hash_type(
salt.utils.stringutils.to_bytes(self.opts["id"])
).hexdigest()[:10]
if self.opts["ipc_mode"] == "tcp":
ipc_publisher = salt.transport.publish_server(
self.opts,
pub_host="127.0.0.1",
pub_port=int(self.opts["tcp_pub_port"]),
pull_host="127.0.0.1",
pull_port=int(self.opts["tcp_pull_port"]),
transport="tcp",
)
else:
epub_sock_path = os.path.join(
self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash)
)
epull_sock_path = os.path.join(
self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash)
)
if os.path.exists(epub_sock_path):
os.unlink(epub_sock_path)
ipc_publisher = salt.transport.publish_server(
self.opts,
pub_path=epub_sock_path,
pull_path=epull_sock_path,
transport="tcp",
)
ipc_publisher = salt.transport.ipc_publish_server("minion", self.opts)
self.io_loop.spawn_callback(
ipc_publisher.publisher, ipc_publisher.publish_payload, self.io_loop
)

View file

@ -1,8 +1,6 @@
"""
Encapsulate the different transports available to Salt.
"""
import logging
import warnings
@ -12,6 +10,7 @@ from salt.transport.base import (
publish_server,
request_client,
request_server,
ipc_publish_server,
)
log = logging.getLogger(__name__)
@ -21,3 +20,12 @@ log = logging.getLogger(__name__)
warnings.filterwarnings(
"ignore", message="IOLoop.current expected instance.*", category=RuntimeWarning
)
__all__ = (
"TRANSPORTS",
"publish_client",
"publish_server",
"request_client",
"request_server",
"ipc_publish_server",
)

View file

@ -1,4 +1,8 @@
import os
import hashlib
import salt.utils.stringutils
TRANSPORTS = (
"zeromq",
@ -90,26 +94,6 @@ def publish_server(opts, **kwargs):
raise Exception("Transport type not found: {}".format(ttype))
def ipc_publish_client(opts, io_loop):
# Default to ZeroMQ for now
ttype = "zeromq"
# determine the ttype
if "transport" in opts:
ttype = opts["transport"]
elif "transport" in opts.get("pillar", {}).get("master", {}):
ttype = opts["pillar"]["master"]["transport"]
# switch on available ttypes
if ttype == "zeromq":
import salt.transport.zeromq
return salt.transport.zeromq.PublishClient(opts, io_loop)
elif ttype == "tcp":
import salt.transport.tcp
return salt.transport.tcp.TCPPubClient(opts, io_loop)
raise Exception("Transport type not found: {}".format(ttype))
def publish_client(opts, io_loop, host=None, port=None, path=None, transport=None):
# Default to ZeroMQ for now
ttype = "zeromq"
@ -138,6 +122,93 @@ def publish_client(opts, io_loop, host=None, port=None, path=None, transport=Non
raise Exception("Transport type not found: {}".format(ttype))
def _minion_hash(hash_type, minion_id):
"""
Generate a hash string for the minion id
"""
hasher = getattr(hashlib, hash_type)
return hasher(salt.utils.stringutils.to_bytes(minion_id)).hexdigest()[:10]
def ipc_publish_client(node, opts, io_loop):
# Default to ZeroMQ for now
ttype = "tcp"
kwargs = {}
if opts["ipc_mode"] == "tcp":
if node == "master":
kwargs.update(
host="127.0.0.1",
port=int(opts["tcp_master_pub_port"]),
)
else:
kwargs.update(
host="127.0.0.1",
port=int(opts["tcp_pub_port"]),
)
else:
if node == "master":
kwargs.update(
path=os.path.join(opts["sock_dir"], "master_event_pub.ipc"),
)
else:
id_hash = _minion_hash(
hash_type=opts["hash_type"],
minion_id=opts.get("hash_id", opts["id"]),
)
kwargs.update(
path=os.path.join(
opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash)
)
)
return publish_client(opts, io_loop, **kwargs)
def ipc_publish_server(node, opts):
# Default to TCP for now
kwargs = {"transport": "tcp"}
if opts["ipc_mode"] == "tcp":
if node == "master":
kwargs.update(
pub_host="127.0.0.1",
pub_port=int(opts["tcp_master_pub_port"]),
pull_host="127.0.0.1",
pull_port=int(opts["tcp_master_pull_port"]),
)
else:
kwargs.update(
pub_host="127.0.0.1",
pub_port=int(opts["tcp_pub_port"]),
pull_host="127.0.0.1",
pull_port=int(opts["tcp_pull_port"]),
)
else:
if node == "master":
kwargs.update(
pub_path=os.path.join(opts["sock_dir"], "master_event_pub.ipc"),
pull_path=os.path.join(opts["sock_dir"], "master_event_pull.ipc"),
)
else:
id_hash = _minion_hash(
hash_type=opts["hash_type"],
minion_id=opts.get("hash_id", opts["id"]),
)
pub_path = (
os.path.join(
opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash)
),
)
kwargs.update(
pub_path=pub_path,
pull_path=os.path.join(
opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash)
),
)
if os.path.exists(pub_path):
os.unlink(pub_path)
return publish_server(opts, **kwargs)
class RequestClient:
"""
The RequestClient transport is used to make requests and get corresponding

View file

@ -225,6 +225,7 @@ class SaltEvent:
is destroyed. This is useful when using event
loops from within third party asynchronous code
"""
self.node = node
self.keep_loop = keep_loop
if io_loop is not None:
self.io_loop = io_loop
@ -253,14 +254,6 @@ class SaltEvent:
if salt.utils.platform.is_windows() and "ipc_mode" not in opts:
self.opts["ipc_mode"] = "tcp"
(
self.pub_host,
self.pub_port,
self.pub_path,
self.pull_host,
self.pull_port,
self.pull_path,
) = self.__load_uri(sock_dir, node)
self.pending_tags = []
self.pending_events = []
self.__load_cache_regex()
@ -395,19 +388,17 @@ class SaltEvent:
"""
if self.cpub:
return True
kwargs = {"transport": "tcp"}
kwargs.update(host=self.pub_host, port=self.pub_port, path=self.pub_path)
if self._run_io_loop_sync:
if self.subscriber is None:
self.subscriber = salt.utils.asynchronous.SyncWrapper(
salt.transport.publish_client,
args=(self.opts,),
kwargs=kwargs,
salt.transport.ipc_publish_client,
args=(
self.node,
self.opts,
),
loop_kwarg="io_loop",
)
try:
# self.subscriber.connect(timeout=timeout)
log.debug("Event connect subscriber %r", self.pub_path)
self.subscriber.connect(timeout=timeout)
self.cpub = True
@ -425,15 +416,11 @@ class SaltEvent:
)
else:
if self.subscriber is None:
if "master_ip" not in self.opts:
self.opts["master_ip"] = ""
kwargs["io_loop"] = self.io_loop
self.subscriber = salt.transport.publish_client(self.opts, **kwargs)
self.subscriber = salt.transport.ipc_publish_client(
self.node, self.opts, io_loop=self.io_loop
)
log.debug("Event connect subscriber %r", self.pub_path)
self.io_loop.spawn_callback(self.subscriber.connect)
# self.subscriber = salt.transport.ipc.IPCMessageSubscriber(
# self.puburi, io_loop=self.io_loop
# )
# For the asynchronous case, the connect will be defered to when
# set_event_handler() is invoked.
@ -466,17 +453,11 @@ class SaltEvent:
if self._run_io_loop_sync:
if self.pusher is None:
self.pusher = salt.utils.asynchronous.SyncWrapper(
salt.transport.publish_server,
args=(self.opts,),
kwargs={
"pub_host": self.pub_host,
"pub_port": self.pub_port,
"pub_path": self.pub_path,
"pull_host": self.pull_host,
"pull_port": self.pull_port,
"pull_path": self.pull_path,
"transport": "tcp",
},
salt.transport.ipc_publish_server,
args=(
self.node,
self.opts,
),
)
try:
# self.pusher.connect(timeout=timeout)
@ -492,15 +473,9 @@ class SaltEvent:
)
else:
if self.pusher is None:
self.pusher = salt.transport.publish_server(
self.pusher = salt.transport.ipc_publish_server(
self.node,
self.opts,
pub_host=self.pub_host,
pub_port=self.pub_port,
pub_path=self.pub_path,
pull_host=self.pull_host,
pull_port=self.pull_port,
pull_path=self.pull_path,
transport="tcp",
)
# For the asynchronous case, the connect will be deferred to when
# fire_event() is invoked.

View file

@ -143,6 +143,7 @@ async def test_send_req_async_regression_62453(minion_opts):
minion_opts["random_startup_delay"] = 0
minion_opts["return_retry_tries"] = 30
minion_opts["grains"] = {}
minion_opts["ipc_mode"] = "tcp"
with patch("salt.loader.grains"):
minion = salt.minion.Minion(minion_opts)
@ -151,7 +152,7 @@ async def test_send_req_async_regression_62453(minion_opts):
# We are just validating no exception is raised
rtn = await minion._send_req_async(load, timeout)
assert rtn is None
assert rtn is False
def test_mine_send_tries():