This commit is contained in:
Daniel A. Wozniak 2023-06-27 17:17:14 -07:00 committed by Gareth J. Greenaway
parent ea3322b412
commit fe5990536b
7 changed files with 25 additions and 50 deletions

View file

@ -14,7 +14,6 @@ from collections import deque
from salt._logging.mixins import ExcInfoOnLogLevelFormatMixin
from salt.utils.versions import warn_until_date
log = logging.getLogger(__name__)

View file

@ -103,7 +103,6 @@ class AsyncReqChannel:
"_uncrypted_transfer",
"send",
"connect",
# "close",
]
close_methods = [
"close",

View file

@ -717,9 +717,6 @@ class PubServerChannel:
transport = salt.transport.publish_server(opts, **kwargs)
return cls(opts, transport, presence_events=presence_events)
def __repr__(self):
return f"<PubServerChannel pub_uri={self.transport.pub_uri} pull_uri={self.transport.pull_uri} at {id(self)}>"
def __init__(self, opts, transport, presence_events=False):
self.opts = opts
self.ckminions = salt.utils.minions.CkMinions(self.opts)

View file

@ -723,24 +723,17 @@ class Master(SMaster):
pub_channels.append(chan)
log.info("Creating master event publisher process")
# self.process_manager.add_process(
# salt.utils.event.EventPublisher,
# args=(self.opts,),
# name="EventPublisher",
# )
ipc_publisher = salt.transport.publish_server(self.opts)
ipc_publisher.pub_uri = "ipc://{}".format(
os.path.join(self.opts["sock_dir"], "master_event_pub.ipc")
)
ipc_publisher.pull_uri = "ipc://{}".format(
os.path.join(self.opts["sock_dir"], "master_event_pull.ipc")
ipc_publisher = salt.transport.publish_server(
self.opts,
pub_path=os.path.join(self.opts["sock_dir"], "master_event_pub.ipc"),
pull_path=os.path.join(self.opts["sock_dir"], "master_event_pull.ipc"),
)
self.process_manager.add_process(
ipc_publisher.publish_daemon,
args=[
ipc_publisher.publish_payload,
],
name="EventPublisher",
)
if self.opts.get("reactor"):

View file

@ -1213,6 +1213,19 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
Bind to the interface specified in the configuration file
"""
io_loop = tornado.ioloop.IOLoop()
ioloop.add_callback(self.publisher, publish_payload)
# run forever
try:
io_loop.start()
except (KeyboardInterrupt, SystemExit):
pass
finally:
self.close()
async def publisher(self, publish_payload, ioloop=None):
if ioloop is None:
ioloop = tornado.ioloop.IOLoop.current()
ioloop.asyncio_loop.set_debug(True)
# log.error(
# "TCPPubServer daemon %r %s %s %s",
# self,
@ -1259,7 +1272,7 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
else:
pull_uri = self.pull_port
pull_sock = salt.transport.ipc.IPCMessageServer(
self.pull_sock = salt.transport.ipc.IPCMessageServer(
pull_uri,
io_loop=io_loop,
payload_handler=publish_payload,
@ -1268,15 +1281,7 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
# Securely create socket
log.warning("Starting the Salt Puller on %s", pull_uri)
with salt.utils.files.set_umask(0o177):
pull_sock.start()
# run forever
try:
io_loop.start()
except (KeyboardInterrupt, SystemExit):
pass
finally:
pull_sock.close()
self.pull_sock.start()
def pre_fork(self, process_manager):
"""
@ -1286,11 +1291,8 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
"""
process_manager.add_process(self.publish_daemon, name=self.__class__.__name__)
@tornado.gen.coroutine
def publish_payload(self, payload, *args):
# log.error("Publish paylaod %r %r", payload, args)
ret = yield self.pub_server.publish_payload(payload) # , *args)
raise tornado.gen.Return(ret)
async def publish_payload(self, payload, *args):
return await self.pub_server.publish_payload(payload)
def connect(self):
# path = self.pull_uri.replace("ipc://", "")

View file

@ -866,7 +866,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
try:
ioloop.start()
finally:
self.daemon_context.term()
self.close()
def _get_sockets(self, context, ioloop):
pub_sock = context.socket(zmq.PUB)

View file

@ -431,16 +431,8 @@ class SaltEvent:
kwargs={
"pub_path": self.puburi,
"pull_path": self.pulluri,
}
},
)
self.pusher.obj.pub_uri = "ipc://{}".format(self.puburi)
self.pusher.obj.pull_uri = "ipc://{}".format(self.pulluri)
# self.pusher = salt.utils.asynchronous.SyncWrapper(
# salt.transport.ipc.IPCMessageClient,
# args=(self.pulluri,),
# kwargs={"io_loop": self.io_loop},
# loop_kwarg="io_loop",
# )
try:
# self.pusher.connect(timeout=timeout)
self.pusher.connect()
@ -455,16 +447,9 @@ class SaltEvent:
)
else:
if self.pusher is None:
# self.pusher = salt.transport.ipc.IPCMessageClient(
# self.pulluri, io_loop=self.io_loop
# )
self.pusher = salt.transport.publish_server(
self.opts,
pub_path=self.puburi,
pull_path=self.pulluri
self.opts, pub_path=self.puburi, pull_path=self.pulluri
)
self.pusher.pub_uri = "ipc://{}".format(self.puburi)
self.pusher.pull_uri = "ipc://{}".format(self.pulluri)
# For the asynchronous case, the connect will be deferred to when
# fire_event() is invoked.
self.cpush = True