mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Fix connection details for minion ipc
This commit is contained in:
parent
e102f8f11e
commit
fb4ce8a741
6 changed files with 374 additions and 269 deletions
|
@ -32,6 +32,7 @@ class SaltCMD(salt.utils.parsers.SaltCMDOptionParser):
|
|||
import salt.client
|
||||
|
||||
self.parse_args()
|
||||
print("A")
|
||||
|
||||
try:
|
||||
# We don't need to bail on config file permission errors
|
||||
|
@ -47,6 +48,7 @@ class SaltCMD(salt.utils.parsers.SaltCMDOptionParser):
|
|||
self.exit(2, "{}\n".format(exc))
|
||||
return
|
||||
|
||||
print("B")
|
||||
if self.options.batch or self.options.static:
|
||||
# _run_batch() will handle all output and
|
||||
# exit with the appropriate error condition
|
||||
|
@ -63,6 +65,7 @@ class SaltCMD(salt.utils.parsers.SaltCMDOptionParser):
|
|||
if self.options.timeout <= 0:
|
||||
self.options.timeout = self.local_client.opts["timeout"]
|
||||
|
||||
print("C")
|
||||
kwargs = {
|
||||
"tgt": self.config["tgt"],
|
||||
"fun": self.config["fun"],
|
||||
|
@ -90,6 +93,7 @@ class SaltCMD(salt.utils.parsers.SaltCMDOptionParser):
|
|||
else:
|
||||
kwargs["tgt_type"] = "glob"
|
||||
|
||||
print("D")
|
||||
# If batch_safe_limit is set, check minions matching target and
|
||||
# potentially switch to batch execution
|
||||
if self.options.batch_safe_limit > 1:
|
||||
|
@ -151,6 +155,7 @@ class SaltCMD(salt.utils.parsers.SaltCMDOptionParser):
|
|||
)
|
||||
return
|
||||
|
||||
print("E")
|
||||
# local will be None when there was an error
|
||||
if not self.local_client:
|
||||
return
|
||||
|
@ -222,8 +227,9 @@ class SaltCMD(salt.utils.parsers.SaltCMDOptionParser):
|
|||
AuthorizationError,
|
||||
SaltInvocationError,
|
||||
EauthAuthenticationError,
|
||||
SaltClientError,
|
||||
#SaltClientError,
|
||||
) as exc:
|
||||
print(repr(exc))
|
||||
ret = str(exc)
|
||||
self._output_ret(ret, "", retcode=1)
|
||||
finally:
|
||||
|
|
|
@ -722,10 +722,22 @@ class Master(SMaster):
|
|||
pub_channels.append(chan)
|
||||
|
||||
log.info("Creating master event publisher process")
|
||||
#self.process_manager.add_process(
|
||||
# salt.utils.event.EventPublisher,
|
||||
# args=(self.opts,),
|
||||
# name="EventPublisher",
|
||||
#)
|
||||
|
||||
ipc_publisher = salt.transport.publish_server(self.opts)
|
||||
ipc_publisher.pub_uri = "ipc://{}".format(
|
||||
os.path.join(self.opts["sock_dir"], "master_event_pub.ipc")
|
||||
)
|
||||
ipc_publisher.pull_uri = "ipc://{}".format(
|
||||
os.path.join(self.opts["sock_dir"], "master_event_pull.ipc")
|
||||
)
|
||||
self.process_manager.add_process(
|
||||
salt.utils.event.EventPublisher,
|
||||
args=(self.opts,),
|
||||
name="EventPublisher",
|
||||
ipc_publisher.publish_daemon,
|
||||
args=[ipc_publisher.publish_payload,],
|
||||
)
|
||||
|
||||
if self.opts.get("reactor"):
|
||||
|
@ -2359,6 +2371,7 @@ class ClearFuncs(TransportMethods):
|
|||
chan = salt.channel.server.PubServerChannel.factory(opts)
|
||||
self.channels.append(chan)
|
||||
for chan in self.channels:
|
||||
log.error("SEND PUB %r", load)
|
||||
chan.publish(load)
|
||||
|
||||
@property
|
||||
|
|
|
@ -1048,6 +1048,28 @@ class MinionManager(MinionBase):
|
|||
self.opts,
|
||||
io_loop=self.io_loop,
|
||||
)
|
||||
def target():
|
||||
import hashlib
|
||||
self.opts['publish_port'] = 12321
|
||||
hash_type = getattr(hashlib, self.opts["hash_type"])
|
||||
ipc_publisher = salt.transport.publish_server(self.opts)
|
||||
id_hash = hash_type(
|
||||
salt.utils.stringutils.to_bytes(self.opts["id"])
|
||||
).hexdigest()[:10]
|
||||
epub_sock_path = "ipc://{}".format(os.path.join(
|
||||
self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash)
|
||||
))
|
||||
if os.path.exists(epub_sock_path):
|
||||
os.unlink(epub_sock_path)
|
||||
epull_sock_path = "ipc://{}".format(os.path.join(
|
||||
self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash)
|
||||
))
|
||||
ipc_publisher.pub_uri = epub_sock_path
|
||||
ipc_publisher.pull_uri = epull_sock_path
|
||||
ipc_publisher.publish_daemon(ipc_publisher.publish_payload)
|
||||
|
||||
thread = threading.Thread(target=target)
|
||||
thread.start()
|
||||
self.event = salt.utils.event.get_event(
|
||||
"minion", opts=self.opts, io_loop=self.io_loop
|
||||
)
|
||||
|
|
|
@ -676,7 +676,6 @@ class IPCMessageSubscriber(IPCClient):
|
|||
self._read_stream_future = self.stream.read_bytes(
|
||||
4096, partial=True
|
||||
)
|
||||
|
||||
if timeout is None:
|
||||
wire_bytes = yield self._read_stream_future
|
||||
else:
|
||||
|
|
|
@ -108,6 +108,14 @@ class PublishClient(salt.transport.base.PublishClient):
|
|||
|
||||
ttype = "zeromq"
|
||||
|
||||
async_methods = [
|
||||
"connect",
|
||||
"recv",
|
||||
]
|
||||
close_methods = [
|
||||
"close",
|
||||
]
|
||||
|
||||
def __init__(self, opts, io_loop, **kwargs):
|
||||
super().__init__(opts, io_loop, **kwargs)
|
||||
self.callbacks = {}
|
||||
|
@ -149,18 +157,19 @@ class PublishClient(salt.transport.base.PublishClient):
|
|||
zmq.TCP_KEEPALIVE_INTVL, self.opts["tcp_keepalive_intvl"]
|
||||
)
|
||||
|
||||
recon_delay = self.opts["recon_default"]
|
||||
recon_delay = self.opts.get("recon_default", 1)
|
||||
recon_max = self.opts.get("recon_max", 1)
|
||||
|
||||
if self.opts["recon_randomize"]:
|
||||
if self.opts.get("recon_randomize"):
|
||||
recon_delay = randint(
|
||||
self.opts["recon_default"],
|
||||
self.opts["recon_default"] + self.opts["recon_max"],
|
||||
recon_delay,
|
||||
recon_delay + recon_max,
|
||||
)
|
||||
|
||||
log.debug(
|
||||
"Generated random reconnect delay between '%sms' and '%sms' (%s)",
|
||||
self.opts["recon_default"],
|
||||
self.opts["recon_default"] + self.opts["recon_max"],
|
||||
recon_delay,
|
||||
recon_delay + recon_max,
|
||||
recon_delay,
|
||||
)
|
||||
|
||||
|
@ -170,12 +179,12 @@ class PublishClient(salt.transport.base.PublishClient):
|
|||
if hasattr(zmq, "RECONNECT_IVL_MAX"):
|
||||
log.debug(
|
||||
"Setting zmq_reconnect_ivl_max to '%sms'",
|
||||
self.opts["recon_default"] + self.opts["recon_max"],
|
||||
recon_delay + recon_max,
|
||||
)
|
||||
|
||||
self._socket.setsockopt(zmq.RECONNECT_IVL_MAX, self.opts["recon_max"])
|
||||
self._socket.setsockopt(zmq.RECONNECT_IVL_MAX, recon_max)
|
||||
|
||||
if (self.opts["ipv6"] is True or ":" in self.opts["master_ip"]) and hasattr(
|
||||
if (self.opts["ipv6"] is True or ":" in self.opts.get("master_ip", "")) and hasattr(
|
||||
zmq, "IPV4ONLY"
|
||||
):
|
||||
# IPv6 sockets work for both IPv6 and IPv4 addresses
|
||||
|
@ -213,13 +222,24 @@ class PublishClient(salt.transport.base.PublishClient):
|
|||
self, publish_port, connect_callback=None, disconnect_callback=None
|
||||
):
|
||||
self.publish_port = publish_port
|
||||
log.debug(
|
||||
log.error(
|
||||
"Connecting the Minion to the Master publish port, using the URI: %s",
|
||||
self.master_pub,
|
||||
)
|
||||
self._socket.connect(self.master_pub)
|
||||
# await connect_callback(True)
|
||||
|
||||
@tornado.gen.coroutine
|
||||
def connect_uri(self, uri, connect_callback=None, disconnect_callback=None):
|
||||
log.error(
|
||||
"Connecting the Minion to the Master publish port, using the URI: %s",
|
||||
uri
|
||||
)
|
||||
#log.debug("%r connecting to %s", self, self.master_pub)
|
||||
self._socket.connect(uri)
|
||||
if connect_callback:
|
||||
connect_callback(True)
|
||||
|
||||
@property
|
||||
def master_pub(self):
|
||||
"""
|
||||
|
@ -277,6 +297,9 @@ class PublishClient(salt.transport.base.PublishClient):
|
|||
async def recv(self, timeout=None):
|
||||
return await self._socket.recv()
|
||||
|
||||
async def recv(self, timeout=None):
|
||||
return await self._socket.recv()
|
||||
|
||||
@tornado.gen.coroutine
|
||||
def send(self, msg):
|
||||
self.stream.send(msg, noblock=True)
|
||||
|
@ -690,6 +713,15 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
|
|||
|
||||
def __init__(self, opts):
|
||||
self.opts = opts
|
||||
if self.opts.get("ipc_mode", "") == "tcp":
|
||||
self.pull_uri = "tcp://127.0.0.1:{}".format(
|
||||
self.opts.get("tcp_master_publish_pull", 4514)
|
||||
)
|
||||
else:
|
||||
self.pull_uri = "ipc://{}".format(
|
||||
os.path.join(self.opts["sock_dir"], "publish_pull.ipc")
|
||||
)
|
||||
self.pub_uri = "tcp://{interface}:{publish_port}".format(**self.opts)
|
||||
|
||||
def connect(self):
|
||||
return tornado.gen.sleep(5)
|
||||
|
@ -742,7 +774,8 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
|
|||
|
||||
async def on_recv(packages):
|
||||
for package in packages:
|
||||
payload = salt.payload.loads(package)
|
||||
log.error("PACAKGE %s %s %r", self.pull_uri, self.pub_uri, package)
|
||||
# payload = salt.payload.loads(package)
|
||||
await publish_payload(payload)
|
||||
|
||||
self.task = None
|
||||
|
@ -757,25 +790,22 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
|
|||
if self.task:
|
||||
self.task.cancel()
|
||||
|
||||
@property
|
||||
def pull_uri(self):
|
||||
if self.opts.get("ipc_mode", "") == "tcp":
|
||||
pull_uri = "tcp://127.0.0.1:{}".format(
|
||||
self.opts.get("tcp_master_publish_pull", 4514)
|
||||
)
|
||||
else:
|
||||
pull_uri = "ipc://{}".format(
|
||||
os.path.join(self.opts["sock_dir"], "publish_pull.ipc")
|
||||
)
|
||||
return pull_uri
|
||||
# @property
|
||||
# def pull_uri(self):
|
||||
# if self.opts.get("ipc_mode", "") == "tcp":
|
||||
# pull_uri = "tcp://127.0.0.1:{}".format(
|
||||
# self.opts.get("tcp_master_publish_pull", 4514)
|
||||
# )
|
||||
# else:
|
||||
# pull_uri = "ipc://{}".format(
|
||||
# os.path.join(self.opts["sock_dir"], "publish_pull.ipc")
|
||||
# )
|
||||
# return pull_uri
|
||||
#
|
||||
# @property
|
||||
# def pub_uri(self):
|
||||
# return "tcp://{interface}:{publish_port}".format(**self.opts)
|
||||
|
||||
@property
|
||||
def pub_uri(self):
|
||||
return "tcp://{interface}:{publish_port}".format(**self.opts)
|
||||
|
||||
@tornado.gen.coroutine
|
||||
def publish_payload(self, payload, topic_list=None):
|
||||
payload = salt.payload.dumps(payload)
|
||||
|
||||
async def publisher(self, pull_sock, publish_payload):
|
||||
while True:
|
||||
|
@ -803,7 +833,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
|
|||
return "tcp://{interface}:{publish_port}".format(**self.opts)
|
||||
|
||||
async def publish_payload(self, payload, topic_list=None):
|
||||
payload = salt.payload.dumps(payload)
|
||||
#payload = salt.payload.dumps(payload)
|
||||
if self.opts["zmq_filtering"]:
|
||||
if topic_list:
|
||||
for topic in topic_list:
|
||||
|
@ -870,16 +900,16 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
|
|||
ctx = zmq.Context()
|
||||
self._sock_data.sock = ctx.socket(zmq.PUSH)
|
||||
self.pub_sock.setsockopt(zmq.LINGER, -1)
|
||||
if self.opts.get("ipc_mode", "") == "tcp":
|
||||
pull_uri = "tcp://127.0.0.1:{}".format(
|
||||
self.opts.get("tcp_master_publish_pull", 4514)
|
||||
)
|
||||
else:
|
||||
pull_uri = "ipc://{}".format(
|
||||
os.path.join(self.opts["sock_dir"], "publish_pull.ipc")
|
||||
)
|
||||
log.debug("Connecting to pub server: %s", pull_uri)
|
||||
self.pub_sock.connect(pull_uri)
|
||||
#if self.opts.get("ipc_mode", "") == "tcp":
|
||||
# pull_uri = "tcp://127.0.0.1:{}".format(
|
||||
# self.opts.get("tcp_master_publish_pull", 4514)
|
||||
# )
|
||||
#else:
|
||||
# pull_uri = "ipc://{}".format(
|
||||
# os.path.join(self.opts["sock_dir"], "publish_pull.ipc")
|
||||
# )
|
||||
log.debug("Connecting to pub server: %s", self.pull_uri)
|
||||
self.pub_sock.connect(self.pull_uri)
|
||||
return self._sock_data.sock
|
||||
|
||||
def pub_close(self):
|
||||
|
@ -900,8 +930,13 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
|
|||
"""
|
||||
if not self.pub_sock:
|
||||
self.pub_connect()
|
||||
serialized = salt.payload.dumps(payload)
|
||||
self.pub_sock.send(serialized)
|
||||
log.error("Payload %r", payload)
|
||||
if "noserial" not in kwargs:
|
||||
serialized = salt.payload.dumps(payload)
|
||||
log.error("Serialized %r", serialized)
|
||||
self.pub_sock.send(serialized)
|
||||
else:
|
||||
self.pub_sock.send(payload)
|
||||
log.debug("Sent payload to publish daemon.")
|
||||
|
||||
@property
|
||||
|
|
|
@ -352,17 +352,27 @@ class SaltEvent:
|
|||
if self.cpub:
|
||||
return True
|
||||
|
||||
log.error("EVENT AT LEAS")
|
||||
if self._run_io_loop_sync:
|
||||
with salt.utils.asynchronous.current_ioloop(self.io_loop):
|
||||
if self.subscriber is None:
|
||||
#self.subscriber = salt.utils.asynchronous.SyncWrapper(
|
||||
# salt.transport.ipc.IPCMessageSubscriber,
|
||||
# args=(self.puburi,),
|
||||
# kwargs={"io_loop": self.io_loop},
|
||||
# loop_kwarg="io_loop",
|
||||
#)
|
||||
#self.subscriber = salt.transport.publish_client(self.opts)
|
||||
self.subscriber = salt.utils.asynchronous.SyncWrapper(
|
||||
salt.transport.ipc.IPCMessageSubscriber,
|
||||
args=(self.puburi,),
|
||||
salt.transport.publish_client,
|
||||
args=(self.opts,),
|
||||
kwargs={"io_loop": self.io_loop},
|
||||
loop_kwarg="io_loop",
|
||||
)
|
||||
try:
|
||||
self.subscriber.connect(timeout=timeout)
|
||||
#self.subscriber.connect(timeout=timeout)
|
||||
puburi = "ipc://{}".format(self.puburi)
|
||||
self.subscriber.connect_uri(puburi)
|
||||
self.cpub = True
|
||||
except tornado.iostream.StreamClosedError:
|
||||
log.error("Encountered StreamClosedException")
|
||||
|
@ -378,9 +388,15 @@ class SaltEvent:
|
|||
)
|
||||
else:
|
||||
if self.subscriber is None:
|
||||
self.subscriber = salt.transport.ipc.IPCMessageSubscriber(
|
||||
self.puburi, io_loop=self.io_loop
|
||||
)
|
||||
if "master_ip" not in self.opts:
|
||||
self.opts["master_ip"] = ""
|
||||
self.subscriber = salt.transport.publish_client(self.opts, self.io_loop)
|
||||
puburi = "ipc://{}".format(self.puburi)
|
||||
self.subscriber.connect_uri(puburi)
|
||||
#self.io_loop.spawn_callback(self.subscriber.connect_uri, self.puburi)
|
||||
#self.subscriber = salt.transport.ipc.IPCMessageSubscriber(
|
||||
# self.puburi, io_loop=self.io_loop
|
||||
#)
|
||||
|
||||
# For the asynchronous case, the connect will be defered to when
|
||||
# set_event_handler() is invoked.
|
||||
|
@ -410,11 +426,12 @@ class SaltEvent:
|
|||
if self._run_io_loop_sync:
|
||||
with salt.utils.asynchronous.current_ioloop(self.io_loop):
|
||||
if self.pusher is None:
|
||||
self.pusher = salt.utils.asynchronous.SyncWrapper(
|
||||
salt.transport.ipc.IPCMessageClient,
|
||||
args=(self.pulluri,),
|
||||
kwargs={"io_loop": self.io_loop},
|
||||
loop_kwarg="io_loop",
|
||||
self.pusher = salt.transport.publish_server(self.opts)
|
||||
self.pusher.pub_uri = "ipc://{}".format(
|
||||
os.path.join(self.opts["sock_dir"], "master_event_pub.ipc")
|
||||
)
|
||||
self.pusher.pull_uri = "ipc://{}".format(
|
||||
os.path.join(self.opts["sock_dir"], "master_event_pull.ipc")
|
||||
)
|
||||
#self.pusher = salt.utils.asynchronous.SyncWrapper(
|
||||
# salt.transport.ipc.IPCMessageClient,
|
||||
|
@ -423,7 +440,8 @@ class SaltEvent:
|
|||
# loop_kwarg="io_loop",
|
||||
#)
|
||||
try:
|
||||
self.pusher.connect(timeout=timeout)
|
||||
#self.pusher.connect(timeout=timeout)
|
||||
self.pusher.connect()
|
||||
self.cpush = True
|
||||
except tornado.iostream.StreamClosedError as exc:
|
||||
log.debug("Unable to connect pusher: %s", exc)
|
||||
|
@ -435,8 +453,15 @@ class SaltEvent:
|
|||
)
|
||||
else:
|
||||
if self.pusher is None:
|
||||
self.pusher = salt.transport.ipc.IPCMessageClient(
|
||||
self.pulluri, io_loop=self.io_loop
|
||||
#self.pusher = salt.transport.ipc.IPCMessageClient(
|
||||
# self.pulluri, io_loop=self.io_loop
|
||||
#)
|
||||
self.pusher = salt.transport.publish_server(self.opts)
|
||||
self.pusher.pub_uri = "ipc://{}".format(
|
||||
os.path.join(self.opts["sock_dir"], "master_event_pub.ipc")
|
||||
)
|
||||
self.pusher.pull_uri = "ipc://{}".format(
|
||||
os.path.join(self.opts["sock_dir"], "master_event_pull.ipc")
|
||||
)
|
||||
# For the asynchronous case, the connect will be deferred to when
|
||||
# fire_event() is invoked.
|
||||
|
@ -449,7 +474,6 @@ class SaltEvent:
|
|||
"""
|
||||
if not self.cpush:
|
||||
return
|
||||
|
||||
self.pusher.close()
|
||||
self.pusher = None
|
||||
self.cpush = False
|
||||
|
@ -570,9 +594,12 @@ class SaltEvent:
|
|||
try:
|
||||
if not self.cpub and not self.connect_pub(timeout=wait):
|
||||
break
|
||||
raw = self.subscriber.read(timeout=wait)
|
||||
#riraw = self.subscriber.read(timeout=wait)
|
||||
print(repr(self.subscriber))
|
||||
raw = self.subscriber.recv(timeout=wait)
|
||||
if raw is None:
|
||||
break
|
||||
print(raw)
|
||||
mtag, data = self.unpack(raw)
|
||||
ret = {"data": data, "tag": mtag}
|
||||
except KeyboardInterrupt:
|
||||
|
@ -693,7 +720,7 @@ class SaltEvent:
|
|||
if not self.cpub:
|
||||
if not self.connect_pub():
|
||||
return None
|
||||
raw = self.subscriber._read(timeout=0)
|
||||
raw = self.subscriber.recv(timeout=0)
|
||||
if raw is None:
|
||||
return None
|
||||
mtag, data = self.unpack(raw)
|
||||
|
@ -709,7 +736,7 @@ class SaltEvent:
|
|||
if not self.cpub:
|
||||
if not self.connect_pub():
|
||||
return None
|
||||
raw = self.subscriber._read(timeout=None)
|
||||
raw = self.subscriber.recv(timeout=None)
|
||||
if raw is None:
|
||||
return None
|
||||
mtag, data = self.unpack(raw)
|
||||
|
@ -767,7 +794,7 @@ class SaltEvent:
|
|||
is_msgpacked=True,
|
||||
use_bin_type=True,
|
||||
)
|
||||
log.debug("Sending event: tag = %s; data = %s", tag, data)
|
||||
log.error("Sending event(fire_event_async): tag = %s; data = %s %r", tag, data, self.pusher)
|
||||
event = b"".join(
|
||||
[
|
||||
salt.utils.stringutils.to_bytes(tag),
|
||||
|
@ -820,7 +847,7 @@ class SaltEvent:
|
|||
is_msgpacked=True,
|
||||
use_bin_type=True,
|
||||
)
|
||||
log.debug("Sending event: tag = %s; data = %s", tag, data)
|
||||
log.error("Sending event(fire_event): tag = %s; data = %s %s", tag, data, self.pusher.pull_uri)
|
||||
event = b"".join(
|
||||
[
|
||||
salt.utils.stringutils.to_bytes(tag),
|
||||
|
@ -829,11 +856,12 @@ class SaltEvent:
|
|||
]
|
||||
)
|
||||
msg = salt.utils.stringutils.to_bytes(event, "utf-8")
|
||||
log.error("FIRE EVENT %r", msg)
|
||||
if self._run_io_loop_sync:
|
||||
with salt.utils.asynchronous.current_ioloop(self.io_loop):
|
||||
try:
|
||||
self.pusher.send(msg)
|
||||
#self.pusher.send(msg)
|
||||
self.pusher.publish(msg, noserial=True)
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
log.debug(
|
||||
"Publisher send failed with exception: %s",
|
||||
|
@ -842,7 +870,8 @@ class SaltEvent:
|
|||
)
|
||||
raise
|
||||
else:
|
||||
self.io_loop.spawn_callback(self.pusher.send, msg)
|
||||
self.pusher.publish(msg, noserial=True)
|
||||
#self.io_loop.spawn_callback(self.pusher.send, msg)
|
||||
return True
|
||||
|
||||
def fire_master(self, data, tag, timeout=1000):
|
||||
|
@ -956,7 +985,8 @@ class SaltEvent:
|
|||
if not self.cpub:
|
||||
self.connect_pub()
|
||||
# This will handle reconnects
|
||||
return self.subscriber.read_async(event_handler)
|
||||
#return self.subscriber.read_async(event_handler)
|
||||
self.subscriber.on_recv(event_handler)
|
||||
|
||||
# pylint: disable=W1701
|
||||
def __del__(self):
|
||||
|
@ -1055,205 +1085,205 @@ class MinionEvent(SaltEvent):
|
|||
)
|
||||
|
||||
|
||||
class AsyncEventPublisher:
|
||||
"""
|
||||
An event publisher class intended to run in an ioloop (within a single process)
|
||||
|
||||
TODO: remove references to "minion_event" whenever we need to use this for other things
|
||||
"""
|
||||
|
||||
def __init__(self, opts, io_loop=None):
|
||||
self.opts = salt.config.DEFAULT_MINION_OPTS.copy()
|
||||
default_minion_sock_dir = self.opts["sock_dir"]
|
||||
self.opts.update(opts)
|
||||
|
||||
self.io_loop = io_loop or tornado.ioloop.IOLoop.current()
|
||||
self._closing = False
|
||||
self.publisher = None
|
||||
self.puller = None
|
||||
|
||||
hash_type = getattr(hashlib, self.opts["hash_type"])
|
||||
# Only use the first 10 chars to keep longer hashes from exceeding the
|
||||
# max socket path length.
|
||||
id_hash = hash_type(
|
||||
salt.utils.stringutils.to_bytes(self.opts["id"])
|
||||
).hexdigest()[:10]
|
||||
epub_sock_path = os.path.join(
|
||||
self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash)
|
||||
)
|
||||
if os.path.exists(epub_sock_path):
|
||||
os.unlink(epub_sock_path)
|
||||
epull_sock_path = os.path.join(
|
||||
self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash)
|
||||
)
|
||||
if os.path.exists(epull_sock_path):
|
||||
os.unlink(epull_sock_path)
|
||||
|
||||
if self.opts["ipc_mode"] == "tcp":
|
||||
epub_uri = int(self.opts["tcp_pub_port"])
|
||||
epull_uri = int(self.opts["tcp_pull_port"])
|
||||
else:
|
||||
epub_uri = epub_sock_path
|
||||
epull_uri = epull_sock_path
|
||||
|
||||
log.debug("%s PUB socket URI: %s", self.__class__.__name__, epub_uri)
|
||||
log.debug("%s PULL socket URI: %s", self.__class__.__name__, epull_uri)
|
||||
|
||||
minion_sock_dir = self.opts["sock_dir"]
|
||||
|
||||
if not os.path.isdir(minion_sock_dir):
|
||||
# Let's try to create the directory defined on the configuration
|
||||
# file
|
||||
try:
|
||||
os.makedirs(minion_sock_dir, 0o755)
|
||||
except OSError as exc:
|
||||
log.error("Could not create SOCK_DIR: %s", exc)
|
||||
# Let's not fail yet and try using the default path
|
||||
if minion_sock_dir == default_minion_sock_dir:
|
||||
# We're already trying the default system path, stop now!
|
||||
raise
|
||||
|
||||
if not os.path.isdir(default_minion_sock_dir):
|
||||
try:
|
||||
os.makedirs(default_minion_sock_dir, 0o755)
|
||||
except OSError as exc:
|
||||
log.error("Could not create SOCK_DIR: %s", exc)
|
||||
# Let's stop at this stage
|
||||
raise
|
||||
|
||||
self.publisher = salt.transport.ipc.IPCMessagePublisher(
|
||||
self.opts, epub_uri, io_loop=self.io_loop
|
||||
)
|
||||
|
||||
self.puller = salt.transport.ipc.IPCMessageServer(
|
||||
epull_uri, io_loop=self.io_loop, payload_handler=self.handle_publish
|
||||
)
|
||||
|
||||
log.info("Starting pull socket on %s", epull_uri)
|
||||
with salt.utils.files.set_umask(0o177):
|
||||
self.publisher.start()
|
||||
self.puller.start()
|
||||
|
||||
def handle_publish(self, package, _):
|
||||
"""
|
||||
Get something from epull, publish it out epub, and return the package (or None)
|
||||
"""
|
||||
try:
|
||||
self.publisher.publish(package)
|
||||
return package
|
||||
# Add an extra fallback in case a forked process leeks through
|
||||
except Exception: # pylint: disable=broad-except
|
||||
log.critical("Unexpected error while polling minion events", exc_info=True)
|
||||
return None
|
||||
|
||||
def close(self):
|
||||
if self._closing:
|
||||
return
|
||||
self._closing = True
|
||||
if self.publisher is not None:
|
||||
self.publisher.close()
|
||||
if self.puller is not None:
|
||||
self.puller.close()
|
||||
|
||||
|
||||
class EventPublisher(salt.utils.process.SignalHandlingProcess):
|
||||
"""
|
||||
The interface that takes master events and republishes them out to anyone
|
||||
who wants to listen
|
||||
"""
|
||||
|
||||
def __init__(self, opts, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.opts = salt.config.DEFAULT_MASTER_OPTS.copy()
|
||||
self.opts.update(opts)
|
||||
self._closing = False
|
||||
self.io_loop = None
|
||||
self.puller = None
|
||||
self.publisher = None
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
Bind the pub and pull sockets for events
|
||||
"""
|
||||
if (
|
||||
self.opts["event_publisher_niceness"]
|
||||
and not salt.utils.platform.is_windows()
|
||||
):
|
||||
log.info(
|
||||
"setting EventPublisher niceness to %i",
|
||||
self.opts["event_publisher_niceness"],
|
||||
)
|
||||
os.nice(self.opts["event_publisher_niceness"])
|
||||
|
||||
self.io_loop = tornado.ioloop.IOLoop()
|
||||
with salt.utils.asynchronous.current_ioloop(self.io_loop):
|
||||
if self.opts["ipc_mode"] == "tcp":
|
||||
epub_uri = int(self.opts["tcp_master_pub_port"])
|
||||
epull_uri = int(self.opts["tcp_master_pull_port"])
|
||||
else:
|
||||
epub_uri = os.path.join(self.opts["sock_dir"], "master_event_pub.ipc")
|
||||
epull_uri = os.path.join(self.opts["sock_dir"], "master_event_pull.ipc")
|
||||
|
||||
self.publisher = salt.transport.ipc.IPCMessagePublisher(
|
||||
self.opts, epub_uri, io_loop=self.io_loop
|
||||
)
|
||||
|
||||
self.puller = salt.transport.ipc.IPCMessageServer(
|
||||
epull_uri,
|
||||
io_loop=self.io_loop,
|
||||
payload_handler=self.handle_publish,
|
||||
)
|
||||
|
||||
# Start the master event publisher
|
||||
with salt.utils.files.set_umask(0o177):
|
||||
self.publisher.start()
|
||||
self.puller.start()
|
||||
if self.opts["ipc_mode"] != "tcp" and (
|
||||
self.opts["publisher_acl"] or self.opts["external_auth"]
|
||||
):
|
||||
os.chmod( # nosec
|
||||
os.path.join(self.opts["sock_dir"], "master_event_pub.ipc"),
|
||||
0o660,
|
||||
)
|
||||
|
||||
atexit.register(self.close)
|
||||
with contextlib.suppress(KeyboardInterrupt):
|
||||
try:
|
||||
self.io_loop.start()
|
||||
finally:
|
||||
# Make sure the IO loop and respective sockets are closed and destroyed
|
||||
self.close()
|
||||
|
||||
def handle_publish(self, package, _):
|
||||
"""
|
||||
Get something from epull, publish it out epub, and return the package (or None)
|
||||
"""
|
||||
try:
|
||||
self.publisher.publish(package)
|
||||
return package
|
||||
# Add an extra fallback in case a forked process leeks through
|
||||
except Exception: # pylint: disable=broad-except
|
||||
log.critical("Unexpected error while polling master events", exc_info=True)
|
||||
return None
|
||||
|
||||
def close(self):
|
||||
if self._closing:
|
||||
return
|
||||
self._closing = True
|
||||
atexit.unregister(self.close)
|
||||
if self.publisher is not None:
|
||||
self.publisher.close()
|
||||
self.publisher = None
|
||||
if self.puller is not None:
|
||||
self.puller.close()
|
||||
self.puller = None
|
||||
if self.io_loop is not None:
|
||||
self.io_loop.close()
|
||||
self.io_loop = None
|
||||
|
||||
def _handle_signals(self, signum, sigframe):
|
||||
self.close()
|
||||
super()._handle_signals(signum, sigframe)
|
||||
#class AsyncEventPublisher:
|
||||
# """
|
||||
# An event publisher class intended to run in an ioloop (within a single process)
|
||||
#
|
||||
# TODO: remove references to "minion_event" whenever we need to use this for other things
|
||||
# """
|
||||
#
|
||||
# def __init__(self, opts, io_loop=None):
|
||||
# self.opts = salt.config.DEFAULT_MINION_OPTS.copy()
|
||||
# default_minion_sock_dir = self.opts["sock_dir"]
|
||||
# self.opts.update(opts)
|
||||
#
|
||||
# self.io_loop = io_loop or tornado.ioloop.IOLoop.current()
|
||||
# self._closing = False
|
||||
# self.publisher = None
|
||||
# self.puller = None
|
||||
#
|
||||
# hash_type = getattr(hashlib, self.opts["hash_type"])
|
||||
# # Only use the first 10 chars to keep longer hashes from exceeding the
|
||||
# # max socket path length.
|
||||
# id_hash = hash_type(
|
||||
# salt.utils.stringutils.to_bytes(self.opts["id"])
|
||||
# ).hexdigest()[:10]
|
||||
# epub_sock_path = os.path.join(
|
||||
# self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash)
|
||||
# )
|
||||
# if os.path.exists(epub_sock_path):
|
||||
# os.unlink(epub_sock_path)
|
||||
# epull_sock_path = os.path.join(
|
||||
# self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash)
|
||||
# )
|
||||
# if os.path.exists(epull_sock_path):
|
||||
# os.unlink(epull_sock_path)
|
||||
#
|
||||
# if self.opts["ipc_mode"] == "tcp":
|
||||
# epub_uri = int(self.opts["tcp_pub_port"])
|
||||
# epull_uri = int(self.opts["tcp_pull_port"])
|
||||
# else:
|
||||
# epub_uri = epub_sock_path
|
||||
# epull_uri = epull_sock_path
|
||||
#
|
||||
# log.debug("%s PUB socket URI: %s", self.__class__.__name__, epub_uri)
|
||||
# log.debug("%s PULL socket URI: %s", self.__class__.__name__, epull_uri)
|
||||
#
|
||||
# minion_sock_dir = self.opts["sock_dir"]
|
||||
#
|
||||
# if not os.path.isdir(minion_sock_dir):
|
||||
# # Let's try to create the directory defined on the configuration
|
||||
# # file
|
||||
# try:
|
||||
# os.makedirs(minion_sock_dir, 0o755)
|
||||
# except OSError as exc:
|
||||
# log.error("Could not create SOCK_DIR: %s", exc)
|
||||
# # Let's not fail yet and try using the default path
|
||||
# if minion_sock_dir == default_minion_sock_dir:
|
||||
# # We're already trying the default system path, stop now!
|
||||
# raise
|
||||
#
|
||||
# if not os.path.isdir(default_minion_sock_dir):
|
||||
# try:
|
||||
# os.makedirs(default_minion_sock_dir, 0o755)
|
||||
# except OSError as exc:
|
||||
# log.error("Could not create SOCK_DIR: %s", exc)
|
||||
# # Let's stop at this stage
|
||||
# raise
|
||||
#
|
||||
# self.publisher = salt.transport.ipc.IPCMessagePublisher(
|
||||
# self.opts, epub_uri, io_loop=self.io_loop
|
||||
# )
|
||||
#
|
||||
# self.puller = salt.transport.ipc.IPCMessageServer(
|
||||
# epull_uri, io_loop=self.io_loop, payload_handler=self.handle_publish
|
||||
# )
|
||||
#
|
||||
# log.info("Starting pull socket on %s", epull_uri)
|
||||
# with salt.utils.files.set_umask(0o177):
|
||||
# self.publisher.start()
|
||||
# self.puller.start()
|
||||
#
|
||||
# def handle_publish(self, package, _):
|
||||
# """
|
||||
# Get something from epull, publish it out epub, and return the package (or None)
|
||||
# """
|
||||
# try:
|
||||
# self.publisher.publish(package)
|
||||
# return package
|
||||
# # Add an extra fallback in case a forked process leeks through
|
||||
# except Exception: # pylint: disable=broad-except
|
||||
# log.critical("Unexpected error while polling minion events", exc_info=True)
|
||||
# return None
|
||||
#
|
||||
# def close(self):
|
||||
# if self._closing:
|
||||
# return
|
||||
# self._closing = True
|
||||
# if self.publisher is not None:
|
||||
# self.publisher.close()
|
||||
# if self.puller is not None:
|
||||
# self.puller.close()
|
||||
#
|
||||
#
|
||||
#class EventPublisher(salt.utils.process.SignalHandlingProcess):
|
||||
# """
|
||||
# The interface that takes master events and republishes them out to anyone
|
||||
# who wants to listen
|
||||
# """
|
||||
#
|
||||
# def __init__(self, opts, **kwargs):
|
||||
# super().__init__(**kwargs)
|
||||
# self.opts = salt.config.DEFAULT_MASTER_OPTS.copy()
|
||||
# self.opts.update(opts)
|
||||
# self._closing = False
|
||||
# self.io_loop = None
|
||||
# self.puller = None
|
||||
# self.publisher = None
|
||||
#
|
||||
# def run(self):
|
||||
# """
|
||||
# Bind the pub and pull sockets for events
|
||||
# """
|
||||
# if (
|
||||
# self.opts["event_publisher_niceness"]
|
||||
# and not salt.utils.platform.is_windows()
|
||||
# ):
|
||||
# log.info(
|
||||
# "setting EventPublisher niceness to %i",
|
||||
# self.opts["event_publisher_niceness"],
|
||||
# )
|
||||
# os.nice(self.opts["event_publisher_niceness"])
|
||||
#
|
||||
# self.io_loop = tornado.ioloop.IOLoop()
|
||||
# with salt.utils.asynchronous.current_ioloop(self.io_loop):
|
||||
# if self.opts["ipc_mode"] == "tcp":
|
||||
# epub_uri = int(self.opts["tcp_master_pub_port"])
|
||||
# epull_uri = int(self.opts["tcp_master_pull_port"])
|
||||
# else:
|
||||
# epub_uri = os.path.join(self.opts["sock_dir"], "master_event_pub.ipc")
|
||||
# epull_uri = os.path.join(self.opts["sock_dir"], "master_event_pull.ipc")
|
||||
#
|
||||
# self.publisher = salt.transport.ipc.IPCMessagePublisher(
|
||||
# self.opts, epub_uri, io_loop=self.io_loop
|
||||
# )
|
||||
#
|
||||
# self.puller = salt.transport.ipc.IPCMessageServer(
|
||||
# epull_uri,
|
||||
# io_loop=self.io_loop,
|
||||
# payload_handler=self.handle_publish,
|
||||
# )
|
||||
#
|
||||
# # Start the master event publisher
|
||||
# with salt.utils.files.set_umask(0o177):
|
||||
# self.publisher.start()
|
||||
# self.puller.start()
|
||||
# if self.opts["ipc_mode"] != "tcp" and (
|
||||
# self.opts["publisher_acl"] or self.opts["external_auth"]
|
||||
# ):
|
||||
# os.chmod( # nosec
|
||||
# os.path.join(self.opts["sock_dir"], "master_event_pub.ipc"),
|
||||
# 0o660,
|
||||
# )
|
||||
#
|
||||
# atexit.register(self.close)
|
||||
# with contextlib.suppress(KeyboardInterrupt):
|
||||
# try:
|
||||
# self.io_loop.start()
|
||||
# finally:
|
||||
# # Make sure the IO loop and respective sockets are closed and destroyed
|
||||
# self.close()
|
||||
#
|
||||
# def handle_publish(self, package, _):
|
||||
# """
|
||||
# Get something from epull, publish it out epub, and return the package (or None)
|
||||
# """
|
||||
# try:
|
||||
# self.publisher.publish(package)
|
||||
# return package
|
||||
# # Add an extra fallback in case a forked process leeks through
|
||||
# except Exception: # pylint: disable=broad-except
|
||||
# log.critical("Unexpected error while polling master events", exc_info=True)
|
||||
# return None
|
||||
#
|
||||
# def close(self):
|
||||
# if self._closing:
|
||||
# return
|
||||
# self._closing = True
|
||||
# atexit.unregister(self.close)
|
||||
# if self.publisher is not None:
|
||||
# self.publisher.close()
|
||||
# self.publisher = None
|
||||
# if self.puller is not None:
|
||||
# self.puller.close()
|
||||
# self.puller = None
|
||||
# if self.io_loop is not None:
|
||||
# self.io_loop.close()
|
||||
# self.io_loop = None
|
||||
#
|
||||
# def _handle_signals(self, signum, sigframe):
|
||||
# self.close()
|
||||
# super()._handle_signals(signum, sigframe)
|
||||
|
||||
|
||||
class EventReturn(salt.utils.process.SignalHandlingProcess):
|
||||
|
|
Loading…
Add table
Reference in a new issue