From fb4ce8a741386d1ad4218cfa20648ec38801bdfa Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Thu, 15 Jun 2023 17:57:05 -0700 Subject: [PATCH] Fix connection details for minion ipc --- salt/cli/salt.py | 8 +- salt/master.py | 19 +- salt/minion.py | 22 ++ salt/transport/ipc.py | 1 - salt/transport/zeromq.py | 119 ++++++---- salt/utils/event.py | 474 +++++++++++++++++++++------------------ 6 files changed, 374 insertions(+), 269 deletions(-) diff --git a/salt/cli/salt.py b/salt/cli/salt.py index f90057f668e..600a5d9850d 100644 --- a/salt/cli/salt.py +++ b/salt/cli/salt.py @@ -32,6 +32,7 @@ class SaltCMD(salt.utils.parsers.SaltCMDOptionParser): import salt.client self.parse_args() + print("A") try: # We don't need to bail on config file permission errors @@ -47,6 +48,7 @@ class SaltCMD(salt.utils.parsers.SaltCMDOptionParser): self.exit(2, "{}\n".format(exc)) return + print("B") if self.options.batch or self.options.static: # _run_batch() will handle all output and # exit with the appropriate error condition @@ -63,6 +65,7 @@ class SaltCMD(salt.utils.parsers.SaltCMDOptionParser): if self.options.timeout <= 0: self.options.timeout = self.local_client.opts["timeout"] + print("C") kwargs = { "tgt": self.config["tgt"], "fun": self.config["fun"], @@ -90,6 +93,7 @@ class SaltCMD(salt.utils.parsers.SaltCMDOptionParser): else: kwargs["tgt_type"] = "glob" + print("D") # If batch_safe_limit is set, check minions matching target and # potentially switch to batch execution if self.options.batch_safe_limit > 1: @@ -151,6 +155,7 @@ class SaltCMD(salt.utils.parsers.SaltCMDOptionParser): ) return + print("E") # local will be None when there was an error if not self.local_client: return @@ -222,8 +227,9 @@ class SaltCMD(salt.utils.parsers.SaltCMDOptionParser): AuthorizationError, SaltInvocationError, EauthAuthenticationError, - SaltClientError, + #SaltClientError, ) as exc: + print(repr(exc)) ret = str(exc) self._output_ret(ret, "", retcode=1) finally: diff --git a/salt/master.py b/salt/master.py index d2d8110b02a..4de61f18fcb 100644 --- a/salt/master.py +++ b/salt/master.py @@ -722,10 +722,22 @@ class Master(SMaster): pub_channels.append(chan) log.info("Creating master event publisher process") + #self.process_manager.add_process( + # salt.utils.event.EventPublisher, + # args=(self.opts,), + # name="EventPublisher", + #) + + ipc_publisher = salt.transport.publish_server(self.opts) + ipc_publisher.pub_uri = "ipc://{}".format( + os.path.join(self.opts["sock_dir"], "master_event_pub.ipc") + ) + ipc_publisher.pull_uri = "ipc://{}".format( + os.path.join(self.opts["sock_dir"], "master_event_pull.ipc") + ) self.process_manager.add_process( - salt.utils.event.EventPublisher, - args=(self.opts,), - name="EventPublisher", + ipc_publisher.publish_daemon, + args=[ipc_publisher.publish_payload,], ) if self.opts.get("reactor"): @@ -2359,6 +2371,7 @@ class ClearFuncs(TransportMethods): chan = salt.channel.server.PubServerChannel.factory(opts) self.channels.append(chan) for chan in self.channels: + log.error("SEND PUB %r", load) chan.publish(load) @property diff --git a/salt/minion.py b/salt/minion.py index 91441400ac0..b6ef65ef060 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -1048,6 +1048,28 @@ class MinionManager(MinionBase): self.opts, io_loop=self.io_loop, ) + def target(): + import hashlib + self.opts['publish_port'] = 12321 + hash_type = getattr(hashlib, self.opts["hash_type"]) + ipc_publisher = salt.transport.publish_server(self.opts) + id_hash = hash_type( + salt.utils.stringutils.to_bytes(self.opts["id"]) + ).hexdigest()[:10] + epub_sock_path = "ipc://{}".format(os.path.join( + self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash) + )) + if os.path.exists(epub_sock_path): + os.unlink(epub_sock_path) + epull_sock_path = "ipc://{}".format(os.path.join( + self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash) + )) + ipc_publisher.pub_uri = epub_sock_path + ipc_publisher.pull_uri = epull_sock_path + ipc_publisher.publish_daemon(ipc_publisher.publish_payload) + + thread = threading.Thread(target=target) + thread.start() self.event = salt.utils.event.get_event( "minion", opts=self.opts, io_loop=self.io_loop ) diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index f80dd0e7562..e56fb5ee9e7 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -676,7 +676,6 @@ class IPCMessageSubscriber(IPCClient): self._read_stream_future = self.stream.read_bytes( 4096, partial=True ) - if timeout is None: wire_bytes = yield self._read_stream_future else: diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py index a5e1a6876ca..679544de6db 100644 --- a/salt/transport/zeromq.py +++ b/salt/transport/zeromq.py @@ -108,6 +108,14 @@ class PublishClient(salt.transport.base.PublishClient): ttype = "zeromq" + async_methods = [ + "connect", + "recv", + ] + close_methods = [ + "close", + ] + def __init__(self, opts, io_loop, **kwargs): super().__init__(opts, io_loop, **kwargs) self.callbacks = {} @@ -149,18 +157,19 @@ class PublishClient(salt.transport.base.PublishClient): zmq.TCP_KEEPALIVE_INTVL, self.opts["tcp_keepalive_intvl"] ) - recon_delay = self.opts["recon_default"] + recon_delay = self.opts.get("recon_default", 1) + recon_max = self.opts.get("recon_max", 1) - if self.opts["recon_randomize"]: + if self.opts.get("recon_randomize"): recon_delay = randint( - self.opts["recon_default"], - self.opts["recon_default"] + self.opts["recon_max"], + recon_delay, + recon_delay + recon_max, ) log.debug( "Generated random reconnect delay between '%sms' and '%sms' (%s)", - self.opts["recon_default"], - self.opts["recon_default"] + self.opts["recon_max"], + recon_delay, + recon_delay + recon_max, recon_delay, ) @@ -170,12 +179,12 @@ class PublishClient(salt.transport.base.PublishClient): if hasattr(zmq, "RECONNECT_IVL_MAX"): log.debug( "Setting zmq_reconnect_ivl_max to '%sms'", - self.opts["recon_default"] + self.opts["recon_max"], + recon_delay + recon_max, ) - self._socket.setsockopt(zmq.RECONNECT_IVL_MAX, self.opts["recon_max"]) + self._socket.setsockopt(zmq.RECONNECT_IVL_MAX, recon_max) - if (self.opts["ipv6"] is True or ":" in self.opts["master_ip"]) and hasattr( + if (self.opts["ipv6"] is True or ":" in self.opts.get("master_ip", "")) and hasattr( zmq, "IPV4ONLY" ): # IPv6 sockets work for both IPv6 and IPv4 addresses @@ -213,13 +222,24 @@ class PublishClient(salt.transport.base.PublishClient): self, publish_port, connect_callback=None, disconnect_callback=None ): self.publish_port = publish_port - log.debug( + log.error( "Connecting the Minion to the Master publish port, using the URI: %s", self.master_pub, ) self._socket.connect(self.master_pub) # await connect_callback(True) + @tornado.gen.coroutine + def connect_uri(self, uri, connect_callback=None, disconnect_callback=None): + log.error( + "Connecting the Minion to the Master publish port, using the URI: %s", + uri + ) + #log.debug("%r connecting to %s", self, self.master_pub) + self._socket.connect(uri) + if connect_callback: + connect_callback(True) + @property def master_pub(self): """ @@ -277,6 +297,9 @@ class PublishClient(salt.transport.base.PublishClient): async def recv(self, timeout=None): return await self._socket.recv() + async def recv(self, timeout=None): + return await self._socket.recv() + @tornado.gen.coroutine def send(self, msg): self.stream.send(msg, noblock=True) @@ -690,6 +713,15 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): def __init__(self, opts): self.opts = opts + if self.opts.get("ipc_mode", "") == "tcp": + self.pull_uri = "tcp://127.0.0.1:{}".format( + self.opts.get("tcp_master_publish_pull", 4514) + ) + else: + self.pull_uri = "ipc://{}".format( + os.path.join(self.opts["sock_dir"], "publish_pull.ipc") + ) + self.pub_uri = "tcp://{interface}:{publish_port}".format(**self.opts) def connect(self): return tornado.gen.sleep(5) @@ -742,7 +774,8 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): async def on_recv(packages): for package in packages: - payload = salt.payload.loads(package) + log.error("PACAKGE %s %s %r", self.pull_uri, self.pub_uri, package) +# payload = salt.payload.loads(package) await publish_payload(payload) self.task = None @@ -757,25 +790,22 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): if self.task: self.task.cancel() - @property - def pull_uri(self): - if self.opts.get("ipc_mode", "") == "tcp": - pull_uri = "tcp://127.0.0.1:{}".format( - self.opts.get("tcp_master_publish_pull", 4514) - ) - else: - pull_uri = "ipc://{}".format( - os.path.join(self.opts["sock_dir"], "publish_pull.ipc") - ) - return pull_uri +# @property +# def pull_uri(self): +# if self.opts.get("ipc_mode", "") == "tcp": +# pull_uri = "tcp://127.0.0.1:{}".format( +# self.opts.get("tcp_master_publish_pull", 4514) +# ) +# else: +# pull_uri = "ipc://{}".format( +# os.path.join(self.opts["sock_dir"], "publish_pull.ipc") +# ) +# return pull_uri +# +# @property +# def pub_uri(self): +# return "tcp://{interface}:{publish_port}".format(**self.opts) - @property - def pub_uri(self): - return "tcp://{interface}:{publish_port}".format(**self.opts) - - @tornado.gen.coroutine - def publish_payload(self, payload, topic_list=None): - payload = salt.payload.dumps(payload) async def publisher(self, pull_sock, publish_payload): while True: @@ -803,7 +833,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): return "tcp://{interface}:{publish_port}".format(**self.opts) async def publish_payload(self, payload, topic_list=None): - payload = salt.payload.dumps(payload) + #payload = salt.payload.dumps(payload) if self.opts["zmq_filtering"]: if topic_list: for topic in topic_list: @@ -870,16 +900,16 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): ctx = zmq.Context() self._sock_data.sock = ctx.socket(zmq.PUSH) self.pub_sock.setsockopt(zmq.LINGER, -1) - if self.opts.get("ipc_mode", "") == "tcp": - pull_uri = "tcp://127.0.0.1:{}".format( - self.opts.get("tcp_master_publish_pull", 4514) - ) - else: - pull_uri = "ipc://{}".format( - os.path.join(self.opts["sock_dir"], "publish_pull.ipc") - ) - log.debug("Connecting to pub server: %s", pull_uri) - self.pub_sock.connect(pull_uri) + #if self.opts.get("ipc_mode", "") == "tcp": + # pull_uri = "tcp://127.0.0.1:{}".format( + # self.opts.get("tcp_master_publish_pull", 4514) + # ) + #else: + # pull_uri = "ipc://{}".format( + # os.path.join(self.opts["sock_dir"], "publish_pull.ipc") + # ) + log.debug("Connecting to pub server: %s", self.pull_uri) + self.pub_sock.connect(self.pull_uri) return self._sock_data.sock def pub_close(self): @@ -900,8 +930,13 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): """ if not self.pub_sock: self.pub_connect() - serialized = salt.payload.dumps(payload) - self.pub_sock.send(serialized) + log.error("Payload %r", payload) + if "noserial" not in kwargs: + serialized = salt.payload.dumps(payload) + log.error("Serialized %r", serialized) + self.pub_sock.send(serialized) + else: + self.pub_sock.send(payload) log.debug("Sent payload to publish daemon.") @property diff --git a/salt/utils/event.py b/salt/utils/event.py index bfbfeaa7f4b..27b07723d92 100644 --- a/salt/utils/event.py +++ b/salt/utils/event.py @@ -352,17 +352,27 @@ class SaltEvent: if self.cpub: return True + log.error("EVENT AT LEAS") if self._run_io_loop_sync: with salt.utils.asynchronous.current_ioloop(self.io_loop): if self.subscriber is None: + #self.subscriber = salt.utils.asynchronous.SyncWrapper( + # salt.transport.ipc.IPCMessageSubscriber, + # args=(self.puburi,), + # kwargs={"io_loop": self.io_loop}, + # loop_kwarg="io_loop", + #) + #self.subscriber = salt.transport.publish_client(self.opts) self.subscriber = salt.utils.asynchronous.SyncWrapper( - salt.transport.ipc.IPCMessageSubscriber, - args=(self.puburi,), + salt.transport.publish_client, + args=(self.opts,), kwargs={"io_loop": self.io_loop}, loop_kwarg="io_loop", ) try: - self.subscriber.connect(timeout=timeout) + #self.subscriber.connect(timeout=timeout) + puburi = "ipc://{}".format(self.puburi) + self.subscriber.connect_uri(puburi) self.cpub = True except tornado.iostream.StreamClosedError: log.error("Encountered StreamClosedException") @@ -378,9 +388,15 @@ class SaltEvent: ) else: if self.subscriber is None: - self.subscriber = salt.transport.ipc.IPCMessageSubscriber( - self.puburi, io_loop=self.io_loop - ) + if "master_ip" not in self.opts: + self.opts["master_ip"] = "" + self.subscriber = salt.transport.publish_client(self.opts, self.io_loop) + puburi = "ipc://{}".format(self.puburi) + self.subscriber.connect_uri(puburi) + #self.io_loop.spawn_callback(self.subscriber.connect_uri, self.puburi) + #self.subscriber = salt.transport.ipc.IPCMessageSubscriber( + # self.puburi, io_loop=self.io_loop + #) # For the asynchronous case, the connect will be defered to when # set_event_handler() is invoked. @@ -410,11 +426,12 @@ class SaltEvent: if self._run_io_loop_sync: with salt.utils.asynchronous.current_ioloop(self.io_loop): if self.pusher is None: - self.pusher = salt.utils.asynchronous.SyncWrapper( - salt.transport.ipc.IPCMessageClient, - args=(self.pulluri,), - kwargs={"io_loop": self.io_loop}, - loop_kwarg="io_loop", + self.pusher = salt.transport.publish_server(self.opts) + self.pusher.pub_uri = "ipc://{}".format( + os.path.join(self.opts["sock_dir"], "master_event_pub.ipc") + ) + self.pusher.pull_uri = "ipc://{}".format( + os.path.join(self.opts["sock_dir"], "master_event_pull.ipc") ) #self.pusher = salt.utils.asynchronous.SyncWrapper( # salt.transport.ipc.IPCMessageClient, @@ -423,7 +440,8 @@ class SaltEvent: # loop_kwarg="io_loop", #) try: - self.pusher.connect(timeout=timeout) + #self.pusher.connect(timeout=timeout) + self.pusher.connect() self.cpush = True except tornado.iostream.StreamClosedError as exc: log.debug("Unable to connect pusher: %s", exc) @@ -435,8 +453,15 @@ class SaltEvent: ) else: if self.pusher is None: - self.pusher = salt.transport.ipc.IPCMessageClient( - self.pulluri, io_loop=self.io_loop + #self.pusher = salt.transport.ipc.IPCMessageClient( + # self.pulluri, io_loop=self.io_loop + #) + self.pusher = salt.transport.publish_server(self.opts) + self.pusher.pub_uri = "ipc://{}".format( + os.path.join(self.opts["sock_dir"], "master_event_pub.ipc") + ) + self.pusher.pull_uri = "ipc://{}".format( + os.path.join(self.opts["sock_dir"], "master_event_pull.ipc") ) # For the asynchronous case, the connect will be deferred to when # fire_event() is invoked. @@ -449,7 +474,6 @@ class SaltEvent: """ if not self.cpush: return - self.pusher.close() self.pusher = None self.cpush = False @@ -570,9 +594,12 @@ class SaltEvent: try: if not self.cpub and not self.connect_pub(timeout=wait): break - raw = self.subscriber.read(timeout=wait) + #riraw = self.subscriber.read(timeout=wait) + print(repr(self.subscriber)) + raw = self.subscriber.recv(timeout=wait) if raw is None: break + print(raw) mtag, data = self.unpack(raw) ret = {"data": data, "tag": mtag} except KeyboardInterrupt: @@ -693,7 +720,7 @@ class SaltEvent: if not self.cpub: if not self.connect_pub(): return None - raw = self.subscriber._read(timeout=0) + raw = self.subscriber.recv(timeout=0) if raw is None: return None mtag, data = self.unpack(raw) @@ -709,7 +736,7 @@ class SaltEvent: if not self.cpub: if not self.connect_pub(): return None - raw = self.subscriber._read(timeout=None) + raw = self.subscriber.recv(timeout=None) if raw is None: return None mtag, data = self.unpack(raw) @@ -767,7 +794,7 @@ class SaltEvent: is_msgpacked=True, use_bin_type=True, ) - log.debug("Sending event: tag = %s; data = %s", tag, data) + log.error("Sending event(fire_event_async): tag = %s; data = %s %r", tag, data, self.pusher) event = b"".join( [ salt.utils.stringutils.to_bytes(tag), @@ -820,7 +847,7 @@ class SaltEvent: is_msgpacked=True, use_bin_type=True, ) - log.debug("Sending event: tag = %s; data = %s", tag, data) + log.error("Sending event(fire_event): tag = %s; data = %s %s", tag, data, self.pusher.pull_uri) event = b"".join( [ salt.utils.stringutils.to_bytes(tag), @@ -829,11 +856,12 @@ class SaltEvent: ] ) msg = salt.utils.stringutils.to_bytes(event, "utf-8") + log.error("FIRE EVENT %r", msg) if self._run_io_loop_sync: with salt.utils.asynchronous.current_ioloop(self.io_loop): try: - self.pusher.send(msg) #self.pusher.send(msg) + self.pusher.publish(msg, noserial=True) except Exception as exc: # pylint: disable=broad-except log.debug( "Publisher send failed with exception: %s", @@ -842,7 +870,8 @@ class SaltEvent: ) raise else: - self.io_loop.spawn_callback(self.pusher.send, msg) + self.pusher.publish(msg, noserial=True) + #self.io_loop.spawn_callback(self.pusher.send, msg) return True def fire_master(self, data, tag, timeout=1000): @@ -956,7 +985,8 @@ class SaltEvent: if not self.cpub: self.connect_pub() # This will handle reconnects - return self.subscriber.read_async(event_handler) + #return self.subscriber.read_async(event_handler) + self.subscriber.on_recv(event_handler) # pylint: disable=W1701 def __del__(self): @@ -1055,205 +1085,205 @@ class MinionEvent(SaltEvent): ) -class AsyncEventPublisher: - """ - An event publisher class intended to run in an ioloop (within a single process) - - TODO: remove references to "minion_event" whenever we need to use this for other things - """ - - def __init__(self, opts, io_loop=None): - self.opts = salt.config.DEFAULT_MINION_OPTS.copy() - default_minion_sock_dir = self.opts["sock_dir"] - self.opts.update(opts) - - self.io_loop = io_loop or tornado.ioloop.IOLoop.current() - self._closing = False - self.publisher = None - self.puller = None - - hash_type = getattr(hashlib, self.opts["hash_type"]) - # Only use the first 10 chars to keep longer hashes from exceeding the - # max socket path length. - id_hash = hash_type( - salt.utils.stringutils.to_bytes(self.opts["id"]) - ).hexdigest()[:10] - epub_sock_path = os.path.join( - self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash) - ) - if os.path.exists(epub_sock_path): - os.unlink(epub_sock_path) - epull_sock_path = os.path.join( - self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash) - ) - if os.path.exists(epull_sock_path): - os.unlink(epull_sock_path) - - if self.opts["ipc_mode"] == "tcp": - epub_uri = int(self.opts["tcp_pub_port"]) - epull_uri = int(self.opts["tcp_pull_port"]) - else: - epub_uri = epub_sock_path - epull_uri = epull_sock_path - - log.debug("%s PUB socket URI: %s", self.__class__.__name__, epub_uri) - log.debug("%s PULL socket URI: %s", self.__class__.__name__, epull_uri) - - minion_sock_dir = self.opts["sock_dir"] - - if not os.path.isdir(minion_sock_dir): - # Let's try to create the directory defined on the configuration - # file - try: - os.makedirs(minion_sock_dir, 0o755) - except OSError as exc: - log.error("Could not create SOCK_DIR: %s", exc) - # Let's not fail yet and try using the default path - if minion_sock_dir == default_minion_sock_dir: - # We're already trying the default system path, stop now! - raise - - if not os.path.isdir(default_minion_sock_dir): - try: - os.makedirs(default_minion_sock_dir, 0o755) - except OSError as exc: - log.error("Could not create SOCK_DIR: %s", exc) - # Let's stop at this stage - raise - - self.publisher = salt.transport.ipc.IPCMessagePublisher( - self.opts, epub_uri, io_loop=self.io_loop - ) - - self.puller = salt.transport.ipc.IPCMessageServer( - epull_uri, io_loop=self.io_loop, payload_handler=self.handle_publish - ) - - log.info("Starting pull socket on %s", epull_uri) - with salt.utils.files.set_umask(0o177): - self.publisher.start() - self.puller.start() - - def handle_publish(self, package, _): - """ - Get something from epull, publish it out epub, and return the package (or None) - """ - try: - self.publisher.publish(package) - return package - # Add an extra fallback in case a forked process leeks through - except Exception: # pylint: disable=broad-except - log.critical("Unexpected error while polling minion events", exc_info=True) - return None - - def close(self): - if self._closing: - return - self._closing = True - if self.publisher is not None: - self.publisher.close() - if self.puller is not None: - self.puller.close() - - -class EventPublisher(salt.utils.process.SignalHandlingProcess): - """ - The interface that takes master events and republishes them out to anyone - who wants to listen - """ - - def __init__(self, opts, **kwargs): - super().__init__(**kwargs) - self.opts = salt.config.DEFAULT_MASTER_OPTS.copy() - self.opts.update(opts) - self._closing = False - self.io_loop = None - self.puller = None - self.publisher = None - - def run(self): - """ - Bind the pub and pull sockets for events - """ - if ( - self.opts["event_publisher_niceness"] - and not salt.utils.platform.is_windows() - ): - log.info( - "setting EventPublisher niceness to %i", - self.opts["event_publisher_niceness"], - ) - os.nice(self.opts["event_publisher_niceness"]) - - self.io_loop = tornado.ioloop.IOLoop() - with salt.utils.asynchronous.current_ioloop(self.io_loop): - if self.opts["ipc_mode"] == "tcp": - epub_uri = int(self.opts["tcp_master_pub_port"]) - epull_uri = int(self.opts["tcp_master_pull_port"]) - else: - epub_uri = os.path.join(self.opts["sock_dir"], "master_event_pub.ipc") - epull_uri = os.path.join(self.opts["sock_dir"], "master_event_pull.ipc") - - self.publisher = salt.transport.ipc.IPCMessagePublisher( - self.opts, epub_uri, io_loop=self.io_loop - ) - - self.puller = salt.transport.ipc.IPCMessageServer( - epull_uri, - io_loop=self.io_loop, - payload_handler=self.handle_publish, - ) - - # Start the master event publisher - with salt.utils.files.set_umask(0o177): - self.publisher.start() - self.puller.start() - if self.opts["ipc_mode"] != "tcp" and ( - self.opts["publisher_acl"] or self.opts["external_auth"] - ): - os.chmod( # nosec - os.path.join(self.opts["sock_dir"], "master_event_pub.ipc"), - 0o660, - ) - - atexit.register(self.close) - with contextlib.suppress(KeyboardInterrupt): - try: - self.io_loop.start() - finally: - # Make sure the IO loop and respective sockets are closed and destroyed - self.close() - - def handle_publish(self, package, _): - """ - Get something from epull, publish it out epub, and return the package (or None) - """ - try: - self.publisher.publish(package) - return package - # Add an extra fallback in case a forked process leeks through - except Exception: # pylint: disable=broad-except - log.critical("Unexpected error while polling master events", exc_info=True) - return None - - def close(self): - if self._closing: - return - self._closing = True - atexit.unregister(self.close) - if self.publisher is not None: - self.publisher.close() - self.publisher = None - if self.puller is not None: - self.puller.close() - self.puller = None - if self.io_loop is not None: - self.io_loop.close() - self.io_loop = None - - def _handle_signals(self, signum, sigframe): - self.close() - super()._handle_signals(signum, sigframe) +#class AsyncEventPublisher: +# """ +# An event publisher class intended to run in an ioloop (within a single process) +# +# TODO: remove references to "minion_event" whenever we need to use this for other things +# """ +# +# def __init__(self, opts, io_loop=None): +# self.opts = salt.config.DEFAULT_MINION_OPTS.copy() +# default_minion_sock_dir = self.opts["sock_dir"] +# self.opts.update(opts) +# +# self.io_loop = io_loop or tornado.ioloop.IOLoop.current() +# self._closing = False +# self.publisher = None +# self.puller = None +# +# hash_type = getattr(hashlib, self.opts["hash_type"]) +# # Only use the first 10 chars to keep longer hashes from exceeding the +# # max socket path length. +# id_hash = hash_type( +# salt.utils.stringutils.to_bytes(self.opts["id"]) +# ).hexdigest()[:10] +# epub_sock_path = os.path.join( +# self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash) +# ) +# if os.path.exists(epub_sock_path): +# os.unlink(epub_sock_path) +# epull_sock_path = os.path.join( +# self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash) +# ) +# if os.path.exists(epull_sock_path): +# os.unlink(epull_sock_path) +# +# if self.opts["ipc_mode"] == "tcp": +# epub_uri = int(self.opts["tcp_pub_port"]) +# epull_uri = int(self.opts["tcp_pull_port"]) +# else: +# epub_uri = epub_sock_path +# epull_uri = epull_sock_path +# +# log.debug("%s PUB socket URI: %s", self.__class__.__name__, epub_uri) +# log.debug("%s PULL socket URI: %s", self.__class__.__name__, epull_uri) +# +# minion_sock_dir = self.opts["sock_dir"] +# +# if not os.path.isdir(minion_sock_dir): +# # Let's try to create the directory defined on the configuration +# # file +# try: +# os.makedirs(minion_sock_dir, 0o755) +# except OSError as exc: +# log.error("Could not create SOCK_DIR: %s", exc) +# # Let's not fail yet and try using the default path +# if minion_sock_dir == default_minion_sock_dir: +# # We're already trying the default system path, stop now! +# raise +# +# if not os.path.isdir(default_minion_sock_dir): +# try: +# os.makedirs(default_minion_sock_dir, 0o755) +# except OSError as exc: +# log.error("Could not create SOCK_DIR: %s", exc) +# # Let's stop at this stage +# raise +# +# self.publisher = salt.transport.ipc.IPCMessagePublisher( +# self.opts, epub_uri, io_loop=self.io_loop +# ) +# +# self.puller = salt.transport.ipc.IPCMessageServer( +# epull_uri, io_loop=self.io_loop, payload_handler=self.handle_publish +# ) +# +# log.info("Starting pull socket on %s", epull_uri) +# with salt.utils.files.set_umask(0o177): +# self.publisher.start() +# self.puller.start() +# +# def handle_publish(self, package, _): +# """ +# Get something from epull, publish it out epub, and return the package (or None) +# """ +# try: +# self.publisher.publish(package) +# return package +# # Add an extra fallback in case a forked process leeks through +# except Exception: # pylint: disable=broad-except +# log.critical("Unexpected error while polling minion events", exc_info=True) +# return None +# +# def close(self): +# if self._closing: +# return +# self._closing = True +# if self.publisher is not None: +# self.publisher.close() +# if self.puller is not None: +# self.puller.close() +# +# +#class EventPublisher(salt.utils.process.SignalHandlingProcess): +# """ +# The interface that takes master events and republishes them out to anyone +# who wants to listen +# """ +# +# def __init__(self, opts, **kwargs): +# super().__init__(**kwargs) +# self.opts = salt.config.DEFAULT_MASTER_OPTS.copy() +# self.opts.update(opts) +# self._closing = False +# self.io_loop = None +# self.puller = None +# self.publisher = None +# +# def run(self): +# """ +# Bind the pub and pull sockets for events +# """ +# if ( +# self.opts["event_publisher_niceness"] +# and not salt.utils.platform.is_windows() +# ): +# log.info( +# "setting EventPublisher niceness to %i", +# self.opts["event_publisher_niceness"], +# ) +# os.nice(self.opts["event_publisher_niceness"]) +# +# self.io_loop = tornado.ioloop.IOLoop() +# with salt.utils.asynchronous.current_ioloop(self.io_loop): +# if self.opts["ipc_mode"] == "tcp": +# epub_uri = int(self.opts["tcp_master_pub_port"]) +# epull_uri = int(self.opts["tcp_master_pull_port"]) +# else: +# epub_uri = os.path.join(self.opts["sock_dir"], "master_event_pub.ipc") +# epull_uri = os.path.join(self.opts["sock_dir"], "master_event_pull.ipc") +# +# self.publisher = salt.transport.ipc.IPCMessagePublisher( +# self.opts, epub_uri, io_loop=self.io_loop +# ) +# +# self.puller = salt.transport.ipc.IPCMessageServer( +# epull_uri, +# io_loop=self.io_loop, +# payload_handler=self.handle_publish, +# ) +# +# # Start the master event publisher +# with salt.utils.files.set_umask(0o177): +# self.publisher.start() +# self.puller.start() +# if self.opts["ipc_mode"] != "tcp" and ( +# self.opts["publisher_acl"] or self.opts["external_auth"] +# ): +# os.chmod( # nosec +# os.path.join(self.opts["sock_dir"], "master_event_pub.ipc"), +# 0o660, +# ) +# +# atexit.register(self.close) +# with contextlib.suppress(KeyboardInterrupt): +# try: +# self.io_loop.start() +# finally: +# # Make sure the IO loop and respective sockets are closed and destroyed +# self.close() +# +# def handle_publish(self, package, _): +# """ +# Get something from epull, publish it out epub, and return the package (or None) +# """ +# try: +# self.publisher.publish(package) +# return package +# # Add an extra fallback in case a forked process leeks through +# except Exception: # pylint: disable=broad-except +# log.critical("Unexpected error while polling master events", exc_info=True) +# return None +# +# def close(self): +# if self._closing: +# return +# self._closing = True +# atexit.unregister(self.close) +# if self.publisher is not None: +# self.publisher.close() +# self.publisher = None +# if self.puller is not None: +# self.puller.close() +# self.puller = None +# if self.io_loop is not None: +# self.io_loop.close() +# self.io_loop = None +# +# def _handle_signals(self, signum, sigframe): +# self.close() +# super()._handle_signals(signum, sigframe) class EventReturn(salt.utils.process.SignalHandlingProcess):