Keep tcp as ipc transport, for now

This commit is contained in:
Daniel A. Wozniak 2023-06-30 16:56:57 -07:00 committed by Gareth J. Greenaway
parent 5f36621afa
commit e0bea13bf2
6 changed files with 25 additions and 31 deletions

View file

@ -727,6 +727,7 @@ class Master(SMaster):
self.opts,
pub_path=os.path.join(self.opts["sock_dir"], "master_event_pub.ipc"),
pull_path=os.path.join(self.opts["sock_dir"], "master_event_pull.ipc"),
transport="tcp",
)
self.process_manager.add_process(
ipc_publisher.publish_daemon,
@ -1043,7 +1044,6 @@ class MWorker(salt.utils.process.SignalHandlingProcess):
ret = await self._handle_clear(load)
else:
ret = self._handle_aes(load)
# ret = {"aes": self._handle_aes, "clear": self._handle_clear}[key](load)
return ret
def _post_stats(self, start, cmd):
@ -2167,7 +2167,6 @@ class ClearFuncs(TransportMethods):
This method sends out publications to the minions, it can only be used
by the LocalClient.
"""
log.error("CLEAR LOAD %r", clear_load)
extra = clear_load.get("kwargs", {})
publisher_acl = salt.acl.PublisherACL(self.opts["publisher_acl_blacklist"])

View file

@ -1061,6 +1061,7 @@ class MinionManager(MinionBase):
self.opts,
pub_path=epub_sock_path,
pull_path=epull_sock_path,
transport="tcp",
)
self.io_loop.spawn_callback(
ipc_publisher.publisher, ipc_publisher.publish_payload, self.io_loop

View file

@ -25,10 +25,6 @@ def request_server(opts, **kwargs):
import salt.transport.tcp
return salt.transport.tcp.TCPReqServer(opts)
elif ttype == "ws":
import salt.transport.ws
return salt.transport.ws.RequestServer(opts)
elif ttype == "local":
import salt.transport.local
@ -51,10 +47,6 @@ def request_client(opts, io_loop):
import salt.transport.tcp
return salt.transport.tcp.TCPReqClient(opts, io_loop=io_loop)
elif ttype == "ws":
import salt.transport.ws
return salt.transport.ws.RequestClient(opts, io_loop=io_loop)
else:
raise Exception("Channels are only defined for tcp, zeromq")
@ -63,7 +55,9 @@ def publish_server(opts, **kwargs):
# Default to ZeroMQ for now
ttype = "zeromq"
# determine the ttype
if "transport" in opts:
if "transport" in kwargs:
ttype = kwargs.pop("transport")
elif "transport" in opts:
ttype = opts["transport"]
elif "transport" in opts.get("pillar", {}).get("master", {}):
ttype = opts["pillar"]["master"]["transport"]
@ -89,10 +83,6 @@ def publish_server(opts, **kwargs):
import salt.transport.tcp
return salt.transport.tcp.TCPPublishServer(opts, **kwargs)
elif ttype == "ws":
import salt.transport.ws
return salt.transport.ws.PublishServer(opts, **kwargs)
elif ttype == "local": # TODO:
import salt.transport.local
@ -120,11 +110,13 @@ def ipc_publish_client(opts, io_loop):
raise Exception("Transport type not found: {}".format(ttype))
def publish_client(opts, io_loop, host=None, port=None, path=None):
def publish_client(opts, io_loop, host=None, port=None, path=None, transport=None):
# Default to ZeroMQ for now
ttype = "zeromq"
# determine the ttype
if "transport" in opts:
if transport is not None:
ttype = transport
elif "transport" in opts:
ttype = opts["transport"]
elif "transport" in opts.get("pillar", {}).get("master", {}):
ttype = opts["pillar"]["master"]["transport"]
@ -142,12 +134,6 @@ def publish_client(opts, io_loop, host=None, port=None, path=None):
return salt.transport.tcp.TCPPubClient(
opts, io_loop, host=host, port=port, path=path
)
elif ttype == "ws":
import salt.transport.ws
return salt.transport.ws.PublishClient(
opts, io_loop, host=host, port=port, path=path
)
raise Exception("Transport type not found: {}".format(ttype))

View file

@ -16,6 +16,7 @@ import socket
import threading
import time
import urllib
import warnings
import tornado
import tornado.concurrent
@ -259,14 +260,18 @@ class TCPPubClient(salt.transport.base.PublishClient):
if self._closing:
return
self._closing = True
self._stream.close()
if self._stream is not None:
self._stream.close()
self._stream = None
self._closed = True
# pylint: disable=W1701
def __del__(self):
if not self._closing:
warnings.warn("%r not closed", self)
warnings.warn(
"unclosed publish client {self!r}", ResourceWarning, source=self
)
# pylint: enable=W1701
async def getstream(self, **kwargs):
@ -754,6 +759,7 @@ class MessageClient:
def __del__(self):
if not self._closing:
warnings.warn("%r not closed", self)
# pylint: enable=W1701
async def getstream(self, **kwargs):
@ -913,10 +919,6 @@ class MessageClient:
await asyncio.sleep(0.03)
message_id = self._message_id()
header = {"mid": message_id}
# item = salt.transport.frame.frame_msg(msg, header=header)
# await self._stream.write(item)
# if reply:
# return await self.recv(timeout=None)
future = tornado.concurrent.Future()
if callback is not None:
@ -989,7 +991,6 @@ class MessageClient:
return framed_msg["body"]
finally:
self._read_in_progress.release()
# await asyncio.sleep(.003)
class Subscriber:
@ -1023,6 +1024,7 @@ class Subscriber:
def __del__(self):
if not self._closing:
warnings.warn("%r not closed", self)
# pylint: enable=W1701
@ -1277,6 +1279,7 @@ class TCPPuller:
def __del__(self):
if not self._closing:
warnings.warn("%r not closed", self)
# pylint: enable=W1701
def __enter__(self):
@ -1587,6 +1590,7 @@ class _TCPPubServerPublisher:
def __del__(self):
if not self._closing:
warnings.warn("%r not closed", self)
# pylint: enable=W1701
def __enter__(self):

View file

@ -630,6 +630,10 @@ class AsyncReqMessageClient:
http://api.zeromq.org/2-1:zmq-setsockopt [ZMQ_LINGER]
:param IOLoop io_loop: A Tornado IOLoop event scheduler [tornado.ioloop.IOLoop]
"""
salt.utils.versions.warn_until(
3008,
"AsyncReqMessageClient has been deprecated and will be removed.",
)
self.opts = opts
self.addr = addr
self.linger = linger

View file

@ -353,7 +353,7 @@ class SaltEvent:
if self.cpub:
return True
kwargs = {}
kwargs = {"transport": "tcp"}
if isinstance(self.puburi, int):
kwargs.update(host="127.0.0.1", port=self.puburi)
else: