mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
un-commnet things we will keep
This commit is contained in:
parent
eb8328717d
commit
f3522141df
1 changed files with 199 additions and 199 deletions
|
@ -1091,205 +1091,205 @@ class MinionEvent(SaltEvent):
|
|||
)
|
||||
|
||||
|
||||
# class AsyncEventPublisher:
|
||||
# """
|
||||
# An event publisher class intended to run in an ioloop (within a single process)
|
||||
#
|
||||
# TODO: remove references to "minion_event" whenever we need to use this for other things
|
||||
# """
|
||||
#
|
||||
# def __init__(self, opts, io_loop=None):
|
||||
# self.opts = salt.config.DEFAULT_MINION_OPTS.copy()
|
||||
# default_minion_sock_dir = self.opts["sock_dir"]
|
||||
# self.opts.update(opts)
|
||||
#
|
||||
# self.io_loop = io_loop or tornado.ioloop.IOLoop.current()
|
||||
# self._closing = False
|
||||
# self.publisher = None
|
||||
# self.puller = None
|
||||
#
|
||||
# hash_type = getattr(hashlib, self.opts["hash_type"])
|
||||
# # Only use the first 10 chars to keep longer hashes from exceeding the
|
||||
# # max socket path length.
|
||||
# id_hash = hash_type(
|
||||
# salt.utils.stringutils.to_bytes(self.opts["id"])
|
||||
# ).hexdigest()[:10]
|
||||
# epub_sock_path = os.path.join(
|
||||
# self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash)
|
||||
# )
|
||||
# if os.path.exists(epub_sock_path):
|
||||
# os.unlink(epub_sock_path)
|
||||
# epull_sock_path = os.path.join(
|
||||
# self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash)
|
||||
# )
|
||||
# if os.path.exists(epull_sock_path):
|
||||
# os.unlink(epull_sock_path)
|
||||
#
|
||||
# if self.opts["ipc_mode"] == "tcp":
|
||||
# epub_uri = int(self.opts["tcp_pub_port"])
|
||||
# epull_uri = int(self.opts["tcp_pull_port"])
|
||||
# else:
|
||||
# epub_uri = epub_sock_path
|
||||
# epull_uri = epull_sock_path
|
||||
#
|
||||
# log.debug("%s PUB socket URI: %s", self.__class__.__name__, epub_uri)
|
||||
# log.debug("%s PULL socket URI: %s", self.__class__.__name__, epull_uri)
|
||||
#
|
||||
# minion_sock_dir = self.opts["sock_dir"]
|
||||
#
|
||||
# if not os.path.isdir(minion_sock_dir):
|
||||
# # Let's try to create the directory defined on the configuration
|
||||
# # file
|
||||
# try:
|
||||
# os.makedirs(minion_sock_dir, 0o755)
|
||||
# except OSError as exc:
|
||||
# log.error("Could not create SOCK_DIR: %s", exc)
|
||||
# # Let's not fail yet and try using the default path
|
||||
# if minion_sock_dir == default_minion_sock_dir:
|
||||
# # We're already trying the default system path, stop now!
|
||||
# raise
|
||||
#
|
||||
# if not os.path.isdir(default_minion_sock_dir):
|
||||
# try:
|
||||
# os.makedirs(default_minion_sock_dir, 0o755)
|
||||
# except OSError as exc:
|
||||
# log.error("Could not create SOCK_DIR: %s", exc)
|
||||
# # Let's stop at this stage
|
||||
# raise
|
||||
#
|
||||
# self.publisher = salt.transport.ipc.IPCMessagePublisher(
|
||||
# self.opts, epub_uri, io_loop=self.io_loop
|
||||
# )
|
||||
#
|
||||
# self.puller = salt.transport.ipc.IPCMessageServer(
|
||||
# epull_uri, io_loop=self.io_loop, payload_handler=self.handle_publish
|
||||
# )
|
||||
#
|
||||
# log.info("Starting pull socket on %s", epull_uri)
|
||||
# with salt.utils.files.set_umask(0o177):
|
||||
# self.publisher.start()
|
||||
# self.puller.start()
|
||||
#
|
||||
# def handle_publish(self, package, _):
|
||||
# """
|
||||
# Get something from epull, publish it out epub, and return the package (or None)
|
||||
# """
|
||||
# try:
|
||||
# self.publisher.publish(package)
|
||||
# return package
|
||||
# # Add an extra fallback in case a forked process leeks through
|
||||
# except Exception: # pylint: disable=broad-except
|
||||
# log.critical("Unexpected error while polling minion events", exc_info=True)
|
||||
# return None
|
||||
#
|
||||
# def close(self):
|
||||
# if self._closing:
|
||||
# return
|
||||
# self._closing = True
|
||||
# if self.publisher is not None:
|
||||
# self.publisher.close()
|
||||
# if self.puller is not None:
|
||||
# self.puller.close()
|
||||
#
|
||||
#
|
||||
# class EventPublisher(salt.utils.process.SignalHandlingProcess):
|
||||
# """
|
||||
# The interface that takes master events and republishes them out to anyone
|
||||
# who wants to listen
|
||||
# """
|
||||
#
|
||||
# def __init__(self, opts, **kwargs):
|
||||
# super().__init__(**kwargs)
|
||||
# self.opts = salt.config.DEFAULT_MASTER_OPTS.copy()
|
||||
# self.opts.update(opts)
|
||||
# self._closing = False
|
||||
# self.io_loop = None
|
||||
# self.puller = None
|
||||
# self.publisher = None
|
||||
#
|
||||
# def run(self):
|
||||
# """
|
||||
# Bind the pub and pull sockets for events
|
||||
# """
|
||||
# if (
|
||||
# self.opts["event_publisher_niceness"]
|
||||
# and not salt.utils.platform.is_windows()
|
||||
# ):
|
||||
# log.info(
|
||||
# "setting EventPublisher niceness to %i",
|
||||
# self.opts["event_publisher_niceness"],
|
||||
# )
|
||||
# os.nice(self.opts["event_publisher_niceness"])
|
||||
#
|
||||
# self.io_loop = tornado.ioloop.IOLoop()
|
||||
# with salt.utils.asynchronous.current_ioloop(self.io_loop):
|
||||
# if self.opts["ipc_mode"] == "tcp":
|
||||
# epub_uri = int(self.opts["tcp_master_pub_port"])
|
||||
# epull_uri = int(self.opts["tcp_master_pull_port"])
|
||||
# else:
|
||||
# epub_uri = os.path.join(self.opts["sock_dir"], "master_event_pub.ipc")
|
||||
# epull_uri = os.path.join(self.opts["sock_dir"], "master_event_pull.ipc")
|
||||
#
|
||||
# self.publisher = salt.transport.ipc.IPCMessagePublisher(
|
||||
# self.opts, epub_uri, io_loop=self.io_loop
|
||||
# )
|
||||
#
|
||||
# self.puller = salt.transport.ipc.IPCMessageServer(
|
||||
# epull_uri,
|
||||
# io_loop=self.io_loop,
|
||||
# payload_handler=self.handle_publish,
|
||||
# )
|
||||
#
|
||||
# # Start the master event publisher
|
||||
# with salt.utils.files.set_umask(0o177):
|
||||
# self.publisher.start()
|
||||
# self.puller.start()
|
||||
# if self.opts["ipc_mode"] != "tcp" and (
|
||||
# self.opts["publisher_acl"] or self.opts["external_auth"]
|
||||
# ):
|
||||
# os.chmod( # nosec
|
||||
# os.path.join(self.opts["sock_dir"], "master_event_pub.ipc"),
|
||||
# 0o660,
|
||||
# )
|
||||
#
|
||||
# atexit.register(self.close)
|
||||
# with contextlib.suppress(KeyboardInterrupt):
|
||||
# try:
|
||||
# self.io_loop.start()
|
||||
# finally:
|
||||
# # Make sure the IO loop and respective sockets are closed and destroyed
|
||||
# self.close()
|
||||
#
|
||||
# def handle_publish(self, package, _):
|
||||
# """
|
||||
# Get something from epull, publish it out epub, and return the package (or None)
|
||||
# """
|
||||
# try:
|
||||
# self.publisher.publish(package)
|
||||
# return package
|
||||
# # Add an extra fallback in case a forked process leeks through
|
||||
# except Exception: # pylint: disable=broad-except
|
||||
# log.critical("Unexpected error while polling master events", exc_info=True)
|
||||
# return None
|
||||
#
|
||||
# def close(self):
|
||||
# if self._closing:
|
||||
# return
|
||||
# self._closing = True
|
||||
# atexit.unregister(self.close)
|
||||
# if self.publisher is not None:
|
||||
# self.publisher.close()
|
||||
# self.publisher = None
|
||||
# if self.puller is not None:
|
||||
# self.puller.close()
|
||||
# self.puller = None
|
||||
# if self.io_loop is not None:
|
||||
# self.io_loop.close()
|
||||
# self.io_loop = None
|
||||
#
|
||||
# def _handle_signals(self, signum, sigframe):
|
||||
# self.close()
|
||||
# super()._handle_signals(signum, sigframe)
|
||||
class AsyncEventPublisher:
|
||||
"""
|
||||
An event publisher class intended to run in an ioloop (within a single process)
|
||||
|
||||
TODO: remove references to "minion_event" whenever we need to use this for other things
|
||||
"""
|
||||
|
||||
def __init__(self, opts, io_loop=None):
|
||||
self.opts = salt.config.DEFAULT_MINION_OPTS.copy()
|
||||
default_minion_sock_dir = self.opts["sock_dir"]
|
||||
self.opts.update(opts)
|
||||
|
||||
self.io_loop = io_loop or tornado.ioloop.IOLoop.current()
|
||||
self._closing = False
|
||||
self.publisher = None
|
||||
self.puller = None
|
||||
|
||||
hash_type = getattr(hashlib, self.opts["hash_type"])
|
||||
# Only use the first 10 chars to keep longer hashes from exceeding the
|
||||
# max socket path length.
|
||||
id_hash = hash_type(
|
||||
salt.utils.stringutils.to_bytes(self.opts["id"])
|
||||
).hexdigest()[:10]
|
||||
epub_sock_path = os.path.join(
|
||||
self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash)
|
||||
)
|
||||
if os.path.exists(epub_sock_path):
|
||||
os.unlink(epub_sock_path)
|
||||
epull_sock_path = os.path.join(
|
||||
self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash)
|
||||
)
|
||||
if os.path.exists(epull_sock_path):
|
||||
os.unlink(epull_sock_path)
|
||||
|
||||
if self.opts["ipc_mode"] == "tcp":
|
||||
epub_uri = int(self.opts["tcp_pub_port"])
|
||||
epull_uri = int(self.opts["tcp_pull_port"])
|
||||
else:
|
||||
epub_uri = epub_sock_path
|
||||
epull_uri = epull_sock_path
|
||||
|
||||
log.debug("%s PUB socket URI: %s", self.__class__.__name__, epub_uri)
|
||||
log.debug("%s PULL socket URI: %s", self.__class__.__name__, epull_uri)
|
||||
|
||||
minion_sock_dir = self.opts["sock_dir"]
|
||||
|
||||
if not os.path.isdir(minion_sock_dir):
|
||||
# Let's try to create the directory defined on the configuration
|
||||
# file
|
||||
try:
|
||||
os.makedirs(minion_sock_dir, 0o755)
|
||||
except OSError as exc:
|
||||
log.error("Could not create SOCK_DIR: %s", exc)
|
||||
# Let's not fail yet and try using the default path
|
||||
if minion_sock_dir == default_minion_sock_dir:
|
||||
# We're already trying the default system path, stop now!
|
||||
raise
|
||||
|
||||
if not os.path.isdir(default_minion_sock_dir):
|
||||
try:
|
||||
os.makedirs(default_minion_sock_dir, 0o755)
|
||||
except OSError as exc:
|
||||
log.error("Could not create SOCK_DIR: %s", exc)
|
||||
# Let's stop at this stage
|
||||
raise
|
||||
|
||||
self.publisher = salt.transport.ipc.IPCMessagePublisher(
|
||||
self.opts, epub_uri, io_loop=self.io_loop
|
||||
)
|
||||
|
||||
self.puller = salt.transport.ipc.IPCMessageServer(
|
||||
epull_uri, io_loop=self.io_loop, payload_handler=self.handle_publish
|
||||
)
|
||||
|
||||
log.info("Starting pull socket on %s", epull_uri)
|
||||
with salt.utils.files.set_umask(0o177):
|
||||
self.publisher.start()
|
||||
self.puller.start()
|
||||
|
||||
def handle_publish(self, package, _):
|
||||
"""
|
||||
Get something from epull, publish it out epub, and return the package (or None)
|
||||
"""
|
||||
try:
|
||||
self.publisher.publish(package)
|
||||
return package
|
||||
# Add an extra fallback in case a forked process leeks through
|
||||
except Exception: # pylint: disable=broad-except
|
||||
log.critical("Unexpected error while polling minion events", exc_info=True)
|
||||
return None
|
||||
|
||||
def close(self):
|
||||
if self._closing:
|
||||
return
|
||||
self._closing = True
|
||||
if self.publisher is not None:
|
||||
self.publisher.close()
|
||||
if self.puller is not None:
|
||||
self.puller.close()
|
||||
|
||||
|
||||
class EventPublisher(salt.utils.process.SignalHandlingProcess):
|
||||
"""
|
||||
The interface that takes master events and republishes them out to anyone
|
||||
who wants to listen
|
||||
"""
|
||||
|
||||
def __init__(self, opts, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.opts = salt.config.DEFAULT_MASTER_OPTS.copy()
|
||||
self.opts.update(opts)
|
||||
self._closing = False
|
||||
self.io_loop = None
|
||||
self.puller = None
|
||||
self.publisher = None
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
Bind the pub and pull sockets for events
|
||||
"""
|
||||
if (
|
||||
self.opts["event_publisher_niceness"]
|
||||
and not salt.utils.platform.is_windows()
|
||||
):
|
||||
log.info(
|
||||
"setting EventPublisher niceness to %i",
|
||||
self.opts["event_publisher_niceness"],
|
||||
)
|
||||
os.nice(self.opts["event_publisher_niceness"])
|
||||
|
||||
self.io_loop = tornado.ioloop.IOLoop()
|
||||
with salt.utils.asynchronous.current_ioloop(self.io_loop):
|
||||
if self.opts["ipc_mode"] == "tcp":
|
||||
epub_uri = int(self.opts["tcp_master_pub_port"])
|
||||
epull_uri = int(self.opts["tcp_master_pull_port"])
|
||||
else:
|
||||
epub_uri = os.path.join(self.opts["sock_dir"], "master_event_pub.ipc")
|
||||
epull_uri = os.path.join(self.opts["sock_dir"], "master_event_pull.ipc")
|
||||
|
||||
self.publisher = salt.transport.ipc.IPCMessagePublisher(
|
||||
self.opts, epub_uri, io_loop=self.io_loop
|
||||
)
|
||||
|
||||
self.puller = salt.transport.ipc.IPCMessageServer(
|
||||
epull_uri,
|
||||
io_loop=self.io_loop,
|
||||
payload_handler=self.handle_publish,
|
||||
)
|
||||
|
||||
# Start the master event publisher
|
||||
with salt.utils.files.set_umask(0o177):
|
||||
self.publisher.start()
|
||||
self.puller.start()
|
||||
if self.opts["ipc_mode"] != "tcp" and (
|
||||
self.opts["publisher_acl"] or self.opts["external_auth"]
|
||||
):
|
||||
os.chmod( # nosec
|
||||
os.path.join(self.opts["sock_dir"], "master_event_pub.ipc"),
|
||||
0o660,
|
||||
)
|
||||
|
||||
atexit.register(self.close)
|
||||
with contextlib.suppress(KeyboardInterrupt):
|
||||
try:
|
||||
self.io_loop.start()
|
||||
finally:
|
||||
# Make sure the IO loop and respective sockets are closed and destroyed
|
||||
self.close()
|
||||
|
||||
def handle_publish(self, package, _):
|
||||
"""
|
||||
Get something from epull, publish it out epub, and return the package (or None)
|
||||
"""
|
||||
try:
|
||||
self.publisher.publish(package)
|
||||
return package
|
||||
# Add an extra fallback in case a forked process leeks through
|
||||
except Exception: # pylint: disable=broad-except
|
||||
log.critical("Unexpected error while polling master events", exc_info=True)
|
||||
return None
|
||||
|
||||
def close(self):
|
||||
if self._closing:
|
||||
return
|
||||
self._closing = True
|
||||
atexit.unregister(self.close)
|
||||
if self.publisher is not None:
|
||||
self.publisher.close()
|
||||
self.publisher = None
|
||||
if self.puller is not None:
|
||||
self.puller.close()
|
||||
self.puller = None
|
||||
if self.io_loop is not None:
|
||||
self.io_loop.close()
|
||||
self.io_loop = None
|
||||
|
||||
def _handle_signals(self, signum, sigframe):
|
||||
self.close()
|
||||
super()._handle_signals(signum, sigframe)
|
||||
|
||||
|
||||
class EventReturn(salt.utils.process.SignalHandlingProcess):
|
||||
|
|
Loading…
Add table
Reference in a new issue