Preserve ipc_mod logic

This commit is contained in:
Daniel A. Wozniak 2023-07-03 00:01:58 -07:00 committed by Gareth J. Greenaway
parent 5540fd8111
commit d52df08f22
5 changed files with 159 additions and 66 deletions

View file

@ -723,12 +723,26 @@ class Master(SMaster):
pub_channels.append(chan)
log.info("Creating master event publisher process")
ipc_publisher = salt.transport.publish_server(
self.opts,
pub_path=os.path.join(self.opts["sock_dir"], "master_event_pub.ipc"),
pull_path=os.path.join(self.opts["sock_dir"], "master_event_pull.ipc"),
transport="tcp",
)
if self.opts["ipc_mode"] == "tcp":
ipc_publisher = salt.transport.publish_server(
self.opts,
transport="tcp",
pub_host="127.0.0.1",
pub_port=int(self.opts["tcp_master_pub_port"]),
pull_host="127.0.0.1",
pull_port=int(self.opts["tcp_master_pull_port"]),
)
else:
ipc_publisher = salt.transport.publish_server(
self.opts,
transport="tcp",
pub_path=os.path.join(
self.opts["sock_dir"], "master_event_pub.ipc"
),
pull_path=os.path.join(
self.opts["sock_dir"], "master_event_pull.ipc"
),
)
self.process_manager.add_process(
ipc_publisher.publish_daemon,
args=[

View file

@ -1049,20 +1049,30 @@ class MinionManager(MinionBase):
id_hash = hash_type(
salt.utils.stringutils.to_bytes(self.opts["id"])
).hexdigest()[:10]
epub_sock_path = os.path.join(
self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash)
)
epull_sock_path = os.path.join(
self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash)
)
if os.path.exists(epub_sock_path):
os.unlink(epub_sock_path)
ipc_publisher = salt.transport.publish_server(
self.opts,
pub_path=epub_sock_path,
pull_path=epull_sock_path,
transport="tcp",
)
if self.opts["ipc_mode"] == "tcp":
ipc_publisher = salt.transport.publish_server(
self.opts,
pub_host="127.0.0.1",
pub_port=int(self.opts["tcp_pub_port"]),
pull_host="127.0.0.1",
pull_port=int(self.opts["tcp_pull_port"]),
transport="tcp",
)
else:
epub_sock_path = os.path.join(
self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash)
)
epull_sock_path = os.path.join(
self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash)
)
if os.path.exists(epub_sock_path):
os.unlink(epub_sock_path)
ipc_publisher = salt.transport.publish_server(
self.opts,
pub_path=epub_sock_path,
pull_path=epull_sock_path,
transport="tcp",
)
self.io_loop.spawn_callback(
ipc_publisher.publisher, ipc_publisher.publish_payload, self.io_loop
)

View file

@ -298,7 +298,9 @@ class TCPPubClient(salt.transport.base.PublishClient):
ssl_options=self.opts.get("ssl"),
**kwargs,
)
log.error("PubClient conencted to %r %r:%r", self, self.host, self.port)
log.error(
"PubClient conencted to %r %r:%r", self, self.host, self.port
)
else:
sock_type = socket.AF_UNIX
stream = tornado.iostream.IOStream(
@ -398,7 +400,11 @@ class TCPPubClient(salt.transport.base.PublishClient):
elif timeout:
try:
return await asyncio.wait_for(self.recv(), timeout=timeout)
except (TimeoutError, asyncio.exceptions.TimeoutError, asyncio.exceptions.CancelledError):
except (
TimeoutError,
asyncio.exceptions.TimeoutError,
asyncio.exceptions.CancelledError,
):
self.close()
await self.connect()
return
@ -416,9 +422,9 @@ class TCPPubClient(salt.transport.base.PublishClient):
self.close()
await self.connect()
continue
#except AttributeError:
# except AttributeError:
# return
#except Exception:
# except Exception:
# raise
finally:
self._read_in_progress.release()
@ -1325,9 +1331,7 @@ class TCPPuller:
# pylint: disable=W1701
def __del__(self):
if not self._closing:
warnings.warn(
"unclosed tcp puller {self!r}", ResourceWarning, source=self
)
warnings.warn("unclosed tcp puller {self!r}", ResourceWarning, source=self)
# pylint: enable=W1701
@ -1368,10 +1372,18 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
return not self.opts.get("order_masters", False)
def __setstate__(self, state):
self.__init__(state["opts"])
self.__init__(**state)
def __getstate__(self):
return {"opts": self.opts}
return {
"opts": self.opts,
"pub_host": self.pub_host,
"pub_port": self.pub_port,
"pub_path": self.pub_path,
"pull_host": self.pull_host,
"pull_port": self.pull_port,
"pull_path": self.pull_path,
}
def publish_daemon(
self,
@ -1459,7 +1471,11 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
log.debug("Connect pusher %s", self.pull_path)
self.pub_sock = salt.utils.asynchronous.SyncWrapper(
_TCPPubServerPublisher,
(self.pull_path,),
(
self.pull_host,
self.pull_port,
self.pull_path,
),
loop_kwarg="io_loop",
)
self.pub_sock.connect()
@ -1519,7 +1535,7 @@ class _TCPPubServerPublisher:
"close",
]
def __init__(self, socket_path, io_loop=None):
def __init__(self, host, port, path, io_loop=None):
"""
Create a new IPC client
@ -1529,7 +1545,9 @@ class _TCPPubServerPublisher:
"""
self.io_loop = io_loop or tornado.ioloop.IOLoop.current()
self.socket_path = socket_path
self.host = host
self.port = port
self.path = path
self._closing = False
self.stream = None
# msgpack deprecated `encoding` starting with version 0.5.2
@ -1573,12 +1591,12 @@ class _TCPPubServerPublisher:
"""
Connect to a running IPCServer
"""
if isinstance(self.socket_path, int):
if isinstance(self.path, int):
sock_type = socket.AF_INET
sock_addr = ("127.0.0.1", self.socket_path)
sock_addr = (self.host, self.port)
else:
sock_type = socket.AF_UNIX
sock_addr = self.socket_path
sock_addr = self.path
self.stream = None
if timeout is not None:
@ -1595,7 +1613,10 @@ class _TCPPubServerPublisher:
)
try:
log.trace(
"TCPMessageClient: Connecting to socket: %s", self.socket_path
"TCPMessageClient: Connecting to socket: %s:%s %s",
self.host,
self.port,
self.path,
)
await self.stream.connect(sock_addr)
self._connecting_future.set_result(True)

View file

@ -253,7 +253,14 @@ class SaltEvent:
if salt.utils.platform.is_windows() and "ipc_mode" not in opts:
self.opts["ipc_mode"] = "tcp"
self.puburi, self.pulluri = self.__load_uri(sock_dir, node)
(
self.pub_host,
self.pub_port,
self.pub_path,
self.pull_host,
self.pull_port,
self.pull_path,
) = self.__load_uri(sock_dir, node)
self.pending_tags = []
self.pending_events = []
self.__load_cache_regex()
@ -281,17 +288,53 @@ class SaltEvent:
Return the string URI for the location of the pull and pub sockets to
use for firing and listening to events
"""
pub_host = None
pub_port = None
pub_path = None
pull_host = None
pull_port = None
pull_path = None
if node == "master":
if self.opts["ipc_mode"] == "tcp":
puburi = int(self.opts["tcp_master_pub_port"])
pulluri = int(self.opts["tcp_master_pull_port"])
pub_host = "127.0.0.1"
pub_port = int(self.opts["tcp_master_pub_port"])
pull_host = "127.0.0.1"
pull_port = int(self.opts["tcp_master_pull_port"])
log.debug(
"%s PUB socket URI: %s:%s",
self.__class__.__name__,
pub_host,
pub_port,
)
log.debug(
"%s PULL socket URI: %s:%s",
self.__class__.__name__,
pull_host,
pull_port,
)
else:
puburi = os.path.join(sock_dir, "master_event_pub.ipc")
pulluri = os.path.join(sock_dir, "master_event_pull.ipc")
pub_path = os.path.join(sock_dir, "master_event_pub.ipc")
pull_path = os.path.join(sock_dir, "master_event_pull.ipc")
log.debug("%s PUB socket URI: %s", self.__class__.__name__, pub_path)
log.debug("%s PULL socket URI: %s", self.__class__.__name__, pull_path)
else:
if self.opts["ipc_mode"] == "tcp":
puburi = int(self.opts["tcp_pub_port"])
pulluri = int(self.opts["tcp_pull_port"])
pub_host = "127.0.0.1"
pub_port = int(self.opts["tcp_pub_port"])
pull_host = "127.0.0.1"
pull_port = int(self.opts["tcp_pull_port"])
log.debug(
"%s PUB socket URI: %s:%s",
self.__class__.__name__,
pub_host,
pub_port,
)
log.debug(
"%s PULL socket URI: %s:%s",
self.__class__.__name__,
pull_host,
pull_port,
)
else:
hash_type = getattr(hashlib, self.opts["hash_type"])
# Only use the first 10 chars to keep longer hashes from exceeding the
@ -300,15 +343,15 @@ class SaltEvent:
id_hash = hash_type(
salt.utils.stringutils.to_bytes(minion_id)
).hexdigest()[:10]
puburi = os.path.join(
pub_path = os.path.join(
sock_dir, "minion_event_{}_pub.ipc".format(id_hash)
)
pulluri = os.path.join(
pull_path = os.path.join(
sock_dir, "minion_event_{}_pull.ipc".format(id_hash)
)
log.debug("%s PUB socket URI: %s", self.__class__.__name__, puburi)
log.debug("%s PULL socket URI: %s", self.__class__.__name__, pulluri)
return puburi, pulluri
log.debug("%s PUB socket URI: %s", self.__class__.__name__, pub_path)
log.debug("%s PULL socket URI: %s", self.__class__.__name__, pull_path)
return pub_host, pub_port, pub_path, pull_host, pull_port, pull_path
def subscribe(self, tag=None, match_type=None):
"""
@ -354,10 +397,7 @@ class SaltEvent:
return True
kwargs = {"transport": "tcp"}
if isinstance(self.puburi, int):
kwargs.update(host="127.0.0.1", port=self.puburi)
else:
kwargs.update(path=self.puburi)
kwargs.update(host=self.pub_host, port=self.pub_port, path=self.pub_path)
if self._run_io_loop_sync:
if self.subscriber is None:
self.subscriber = salt.utils.asynchronous.SyncWrapper(
@ -368,7 +408,7 @@ class SaltEvent:
)
try:
# self.subscriber.connect(timeout=timeout)
log.debug("Event connect subscriber %r", self.puburi)
log.debug("Event connect subscriber %r", self.pub_path)
self.subscriber.connect()
self.cpub = True
except tornado.iostream.StreamClosedError:
@ -389,7 +429,7 @@ class SaltEvent:
self.opts["master_ip"] = ""
kwargs["io_loop"] = self.io_loop
self.subscriber = salt.transport.publish_client(self.opts, **kwargs)
log.debug("Event connect subscriber %r", self.puburi)
log.debug("Event connect subscriber %r", self.pub_path)
self.io_loop.spawn_callback(self.subscriber.connect)
# self.subscriber = salt.transport.ipc.IPCMessageSubscriber(
# self.puburi, io_loop=self.io_loop
@ -429,8 +469,12 @@ class SaltEvent:
salt.transport.publish_server,
args=(self.opts,),
kwargs={
"pub_path": self.puburi,
"pull_path": self.pulluri,
"pub_host": self.pub_host,
"pub_port": self.pub_port,
"pub_path": self.pub_path,
"pull_host": self.pull_host,
"pull_port": self.pull_port,
"pull_path": self.pull_path,
"transport": "tcp",
},
)
@ -450,8 +494,12 @@ class SaltEvent:
if self.pusher is None:
self.pusher = salt.transport.publish_server(
self.opts,
pub_path=self.puburi,
pull_path=self.pulluri,
pub_host=self.pub_host,
pub_port=self.pub_port,
pub_path=self.pub_path,
pull_host=self.pull_host,
pull_port=self.pull_port,
pull_path=self.pull_path,
transport="tcp",
)
# For the asynchronous case, the connect will be deferred to when

View file

@ -48,8 +48,8 @@ def _assert_got_event(evt, data, msg=None, expected_failure=False):
def test_master_event(sock_dir):
with salt.utils.event.MasterEvent(str(sock_dir), listen=False) as me:
assert me.puburi == str(sock_dir / "master_event_pub.ipc")
assert me.pulluri == str(sock_dir / "master_event_pull.ipc")
assert me.pub_path == str(sock_dir / "master_event_pub.ipc")
assert me.pull_path == str(sock_dir / "master_event_pull.ipc")
def test_minion_event(sock_dir):
@ -58,22 +58,22 @@ def test_minion_event(sock_dir):
:10
]
with salt.utils.event.MinionEvent(opts, listen=False) as me:
assert me.puburi == str(sock_dir / f"minion_event_{id_hash}_pub.ipc")
assert me.pulluri == str(sock_dir / f"minion_event_{id_hash}_pull.ipc")
assert me.pub_path == str(sock_dir / f"minion_event_{id_hash}_pub.ipc")
assert me.pull_path == str(sock_dir / f"minion_event_{id_hash}_pull.ipc")
def test_minion_event_tcp_ipc_mode():
opts = dict(id="foo", ipc_mode="tcp")
with salt.utils.event.MinionEvent(opts, listen=False) as me:
assert me.puburi == 4510
assert me.pulluri == 4511
assert me.pub_port == 4510
assert me.pull_port == 4511
def test_minion_event_no_id(sock_dir):
with salt.utils.event.MinionEvent(dict(sock_dir=str(sock_dir)), listen=False) as me:
id_hash = hashlib.sha256(salt.utils.stringutils.to_bytes("")).hexdigest()[:10]
assert me.puburi == str(sock_dir / f"minion_event_{id_hash}_pub.ipc")
assert me.pulluri == str(sock_dir / f"minion_event_{id_hash}_pull.ipc")
assert me.pub_path == str(sock_dir / f"minion_event_{id_hash}_pub.ipc")
assert me.pull_path == str(sock_dir / f"minion_event_{id_hash}_pull.ipc")
@pytest.mark.slow_test