Add transport factories for ipc comms

This commit is contained in:
Daniel A. Wozniak 2023-07-24 18:45:25 -07:00 committed by Gareth J. Greenaway
parent 6320f769ea
commit 9ef3a59698
3 changed files with 19 additions and 14 deletions

View file

@ -6,11 +6,12 @@ import warnings
from salt.transport.base import (
TRANSPORTS,
ipc_publish_client,
ipc_publish_server,
publish_client,
publish_server,
request_client,
request_server,
ipc_publish_server,
)
log = logging.getLogger(__name__)
@ -23,9 +24,10 @@ warnings.filterwarnings(
__all__ = (
"TRANSPORTS",
"ipc_publish_client",
"ipc_publish_server",
"publish_client",
"publish_server",
"request_client",
"request_server",
"ipc_publish_server",
)

View file

@ -1,13 +1,11 @@
import os
import hashlib
import os
import salt.utils.stringutils
TRANSPORTS = (
"zeromq",
"tcp",
"ws",
)
@ -132,9 +130,7 @@ def _minion_hash(hash_type, minion_id):
def ipc_publish_client(node, opts, io_loop):
# Default to ZeroMQ for now
ttype = "tcp"
kwargs = {}
kwargs = {"transport": "tcp"}
if opts["ipc_mode"] == "tcp":
if node == "master":
kwargs.update(
@ -193,10 +189,8 @@ def ipc_publish_server(node, opts):
hash_type=opts["hash_type"],
minion_id=opts.get("hash_id", opts["id"]),
)
pub_path = (
os.path.join(
opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash)
),
pub_path = os.path.join(
opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash)
)
kwargs.update(
pub_path=pub_path,
@ -333,6 +327,17 @@ class PublishClient:
"""
raise NotImplementedError
async def recv(self, timeout=None):
"""
Receive a single message from the publish server.
The default timeout=None will wait indefinitly for a message. When
timeout is 0 return immediately if no message is ready. A positive
value sepcifies a period of time to wait for a message before raising a
TimeoutError.
"""
raise NotImplementedError
def close(self):
"""
Close the underlying network connection

View file

@ -399,7 +399,6 @@ class SaltEvent:
loop_kwarg="io_loop",
)
try:
log.debug("Event connect subscriber %r", self.pub_path)
self.subscriber.connect(timeout=timeout)
self.cpub = True
except tornado.iostream.StreamClosedError:
@ -419,7 +418,6 @@ class SaltEvent:
self.subscriber = salt.transport.ipc_publish_client(
self.node, self.opts, io_loop=self.io_loop
)
log.debug("Event connect subscriber %r", self.pub_path)
self.io_loop.spawn_callback(self.subscriber.connect)
# For the asynchronous case, the connect will be defered to when