More cleanup

This commit is contained in:
Daniel A. Wozniak 2023-06-27 22:24:13 -07:00 committed by Gareth J. Greenaway
parent fe5990536b
commit b4e407a8a9
8 changed files with 455 additions and 149 deletions

View file

@ -4,6 +4,7 @@ Salt package
import asyncio
import importlib
import locale
import os
import sys
import warnings
@ -92,11 +93,6 @@ warnings.filterwarnings(
def __define_global_system_encoding_variable__():
import sys
print("define global system encoding")
sys.stdout.flush()
# This is the most trustworthy source of the system encoding, though, if
# salt is being imported after being daemonized, this information is lost
# and reset to None
@ -112,17 +108,12 @@ def __define_global_system_encoding_variable__():
# If the system is properly configured this should return a valid
# encoding. MS Windows has problems with this and reports the wrong
# encoding
import locale
try:
encoding = locale.getencoding()
except AttributeError:
# Python < 3.11
encoding = locale.getpreferredencoding(do_setlocale=True)
# This is now garbage collectable
del locale
if not encoding:
# This is most likely ascii which is not the best but we were
# unable to find a better encoding. If this fails, we fall all
@ -146,7 +137,6 @@ def __define_global_system_encoding_variable__():
setattr(builtins, "__salt_system_encoding__", encoding)
# This is now garbage collectable
del sys
del builtins
del encoding

View file

@ -371,7 +371,6 @@ class AsyncReqChannel:
return self
async def __aexit__(self, exc_type, exc, tb):
# print("AEXIT")
self.close()
@ -383,7 +382,6 @@ class AsyncPubChannel:
async_methods = [
"connect",
"_decode_messages",
# "close",
]
close_methods = [
"close",
@ -441,7 +439,6 @@ class AsyncPubChannel:
try:
if not self.auth.authenticated:
yield self.auth.authenticate()
# log.error("*** Creds %r", self.auth.creds)
# if this is changed from the default, we assume it was intentional
if int(self.opts.get("publish_port", 4506)) != 4506:
publish_port = self.opts.get("publish_port")
@ -646,11 +643,16 @@ class AsyncPushChannel:
"""
# FIXME for now, just UXD
# Obviously, this makes the factory approach pointless, but we'll extend later
warn_until(
3008,
"AsyncPushChannel is deprecated. Use zeromq or tcp transport instead.",
)
import salt.transport.ipc
return salt.transport.ipc.IPCMessageClient(opts, **kwargs)
# XXX This is deprecated remove in 3008
class AsyncPullChannel:
"""
Factory class to create IPC pull channels
@ -661,6 +663,10 @@ class AsyncPullChannel:
"""
If we have additional IPC transports other than UXD and TCP, add them here
"""
warn_until(
3008,
"AsyncPullChannel is deprecated. Use zeromq or tcp transport instead.",
)
import salt.transport.ipc
return salt.transport.ipc.IPCMessageServer(opts, **kwargs)

View file

@ -1039,10 +1039,11 @@ class MWorker(salt.utils.process.SignalHandlingProcess):
"""
key = payload["enc"]
load = payload["load"]
ret = {"aes": self._handle_aes, "clear": self._handle_clear}[key](load)
while self.clear_funcs.tasks:
# dequeue
await self.clear_funcs.tasks.pop(0)
if key == "clear":
ret = await self._handle_clear(load)
else:
ret = self._handle_aes(load)
# ret = {"aes": self._handle_aes, "clear": self._handle_clear}[key](load)
return ret
def _post_stats(self, start, cmd):
@ -1067,7 +1068,7 @@ class MWorker(salt.utils.process.SignalHandlingProcess):
self.stats = collections.defaultdict(lambda: {"mean": 0, "runs": 0})
self.stat_clock = end
def _handle_clear(self, load):
async def _handle_clear(self, load):
"""
Process a cleartext command
@ -1083,7 +1084,11 @@ class MWorker(salt.utils.process.SignalHandlingProcess):
if self.opts["master_stats"]:
start = time.time()
self.stats[cmd]["runs"] += 1
ret = method(load), {"fun": "send_clear"}
if cmd in self.clear_funcs.async_methods:
reply = await method(load)
ret = reply, {"fun": "send_clear"}
else:
ret = method(load), {"fun": "send_clear"}
if self.opts["master_stats"]:
self._post_stats(start, cmd)
return ret
@ -1148,7 +1153,6 @@ class MWorker(salt.utils.process.SignalHandlingProcess):
self.opts["mworker_niceness"],
)
os.nice(self.opts["mworker_niceness"])
self.clear_funcs = ClearFuncs(
self.opts,
self.key,
@ -1968,6 +1972,7 @@ class ClearFuncs(TransportMethods):
"wheel",
"runner",
)
async_methods = ("publish",)
# The ClearFuncs object encapsulates the functions that can be executed in
# the clear:
@ -1995,8 +2000,6 @@ class ClearFuncs(TransportMethods):
# Make a masterapi object
self.masterapi = salt.daemons.masterapi.LocalFuncs(opts, key)
self.channels = []
self.tasks = []
# self.task_group = asyncio.TaskGroup()
def runner(self, clear_load):
"""
@ -2159,7 +2162,7 @@ class ClearFuncs(TransportMethods):
return False
return self.loadauth.get_tok(clear_load["token"])
def publish(self, clear_load):
async def publish(self, clear_load):
"""
This method sends out publications to the minions, it can only be used
by the LocalClient.
@ -2313,7 +2316,7 @@ class ClearFuncs(TransportMethods):
# Send it!
self._send_ssh_pub(payload, ssh_minions=ssh_minions)
self._send_pub(payload)
await self._send_pub(payload)
return {
"enc": "clear",
@ -2362,7 +2365,7 @@ class ClearFuncs(TransportMethods):
return {"error": msg}
return jid
def _send_pub(self, load):
async def _send_pub(self, load):
"""
Take a load and send it across the network to connected minions
"""
@ -2370,9 +2373,10 @@ class ClearFuncs(TransportMethods):
for transport, opts in iter_transport_opts(self.opts):
chan = salt.channel.server.PubServerChannel.factory(opts)
self.channels.append(chan)
tasks = set()
for chan in self.channels:
task = asyncio.create_task(chan.publish(load))
self.tasks.append(task)
tasks.add(asyncio.create_task(chan.publish(load)))
await asyncio.gather(*tasks)
@property
def ssh_client(self):

View file

@ -5,6 +5,7 @@ import asyncio
import binascii
import contextlib
import copy
import hashlib
import logging
import multiprocessing
import os
@ -1044,64 +1045,26 @@ class MinionManager(MinionBase):
def _bind(self):
# start up the event publisher, so we can see events during startup
# self.event_publisher = salt.utils.event.AsyncEventPublisher(
# self.opts,
# io_loop=self.io_loop,
# )
# import hashlib
# ipc_publisher = salt.transport.publish_server(self.opts)
# hash_type = getattr(hashlib, self.opts["hash_type"])
# id_hash = hash_type(
# salt.utils.stringutils.to_bytes(self.opts["id"])
# ).hexdigest()[:10]
# epub_sock_path = "ipc://{}".format(
# os.path.join(
# self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash)
# )
# )
# if os.path.exists(epub_sock_path):
# os.unlink(epub_sock_path)
# epull_sock_path = "ipc://{}".format(
# os.path.join(
# self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash)
# )
# )
# ipc_publisher.pub_uri = epub_sock_path
# ipc_publisher.pull_uri = epull_sock_path
# self.io_loop.add_callback(ipc_publisher.publisher, ipc_publisher.publish_payload, self.io_loop)
import hashlib
ipc_publisher = salt.transport.publish_server(self.opts)
hash_type = getattr(hashlib, self.opts["hash_type"])
id_hash = hash_type(
salt.utils.stringutils.to_bytes(self.opts["id"])
).hexdigest()[:10]
epub_sock_path = "ipc://{}".format(
os.path.join(
self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash)
)
epub_sock_path = os.path.join(
self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash)
)
epull_sock_path = os.path.join(
self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash)
)
if os.path.exists(epub_sock_path):
os.unlink(epub_sock_path)
epull_sock_path = "ipc://{}".format(
os.path.join(
self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash)
)
ipc_publisher = salt.transport.publish_server(
self.opts,
pub_path=epub_sock_path,
pull_path=epull_sock_path,
)
self.io_loop.add_callback(
ipc_publisher.publisher, ipc_publisher.publish_payload, self.io_loop
)
ipc_publisher.pub_uri = epub_sock_path
ipc_publisher.pull_uri = epull_sock_path
if self.opts["transport"] == "tcp":
def target():
ipc_publisher.publish_daemon(ipc_publisher.publish_payload)
proc = salt.utils.process.Process(target=target, daemon=True)
proc.start()
else:
self.io_loop.add_callback(
ipc_publisher.publisher, ipc_publisher.publish_payload, self.io_loop
)
self.event = salt.utils.event.get_event(
"minion", opts=self.opts, io_loop=self.io_loop
)
@ -3296,20 +3259,11 @@ class Minion(MinionBase):
del self.schedule
if hasattr(self, "pub_channel") and self.pub_channel is not None:
self.pub_channel.on_recv(None)
log.error("create pub_channel.close task %r", self)
self.pub_channel.close()
# self.io_loop.asyncio_loop.run_until_complete(self.pub_channel.close())
# if hasattr(self.pub_channel, "close"):
# asyncio.create_task(
# self.pub_channel.close()
# )
# #self.pub_channel.close()
# del self.pub_channel
if hasattr(self, "event"):
log.error("HAS EVENT")
# if hasattr(self, "periodic_callbacks"):
# for cb in self.periodic_callbacks.values():
# cb.stop()
del self.pub_channel
if hasattr(self, "periodic_callbacks"):
for cb in self.periodic_callbacks.values():
cb.stop()
log.error("%r destroy method finished", self)
# pylint: disable=W1701

View file

@ -1,7 +1,5 @@
import os
import tornado.gen
TRANSPORTS = (
"zeromq",
"tcp",
@ -142,8 +140,7 @@ class RequestClient:
def __init__(self, opts, io_loop, **kwargs):
pass
@tornado.gen.coroutine
def send(self, load, timeout=60):
async def send(self, load, timeout=60):
"""
Send a request message and return the reply from the server.
"""
@ -250,8 +247,9 @@ class PublishClient:
"""
raise NotImplementedError
@tornado.gen.coroutine
def connect(self, publish_port, connect_callback=None, disconnect_callback=None):
async def connect(
self, publish_port, connect_callback=None, disconnect_callback=None
):
"""
Create a network connection to the the PublishServer or broker.
"""

View file

@ -20,9 +20,14 @@ from tornado.locks import Lock
import salt.transport.frame
import salt.utils.msgpack
from salt.utils.versions import warn_until
log = logging.getLogger(__name__)
warn_until(
3008,
"This module is deprecated. Use zeromq or tcp transport instead.",
)
# 'tornado.concurrent.Future' doesn't support
# remove_done_callback() which we would have called

View file

@ -28,7 +28,6 @@ from tornado.locks import Lock
import salt.master
import salt.payload
import salt.transport.frame
import salt.transport.ipc
import salt.utils.asynchronous
import salt.utils.files
import salt.utils.msgpack
@ -307,7 +306,7 @@ class TCPPubClient(salt.transport.base.PublishClient):
exc,
self.backoff,
)
await tornado.gen.sleep(self.backoff)
await asyncio.sleep(self.backoff)
return stream
async def _connect(self):
@ -533,10 +532,9 @@ class TCPReqServer(salt.transport.base.DaemonizedRequestServer):
self.req_server.add_socket(self._socket)
self._socket.listen(self.backlog)
@tornado.gen.coroutine
def handle_message(self, stream, payload, header=None):
async def handle_message(self, stream, payload, header=None):
payload = self.decode_payload(payload)
reply = yield self.message_handler(payload)
reply = await self.message_handler(payload)
# XXX Handle StreamClosedError
stream.write(salt.transport.frame.frame_msg(reply, header=header))
@ -558,8 +556,7 @@ class SaltMessageServer(tornado.tcpserver.TCPServer):
self.clients = []
self.message_handler = message_handler
@tornado.gen.coroutine
def handle_stream( # pylint: disable=arguments-differ
async def handle_stream( # pylint: disable=arguments-differ
self,
stream,
address,
@ -573,7 +570,7 @@ class SaltMessageServer(tornado.tcpserver.TCPServer):
unpacker = salt.utils.msgpack.Unpacker()
try:
while True:
wire_bytes = yield stream.read_bytes(4096, partial=True)
wire_bytes = await stream.read_bytes(4096, partial=True)
unpacker.feed(wire_bytes)
for framed_msg in unpacker:
framed_msg = salt.transport.frame.decode_embedded_strs(framed_msg)
@ -749,8 +746,7 @@ class MessageClient:
if self.task is not None:
self.task.cancel()
@tornado.gen.coroutine
def check_close(self):
async def check_close(self):
if not self.send_future_map:
self._tcp_client.close()
self._stream = None
@ -797,7 +793,7 @@ class MessageClient:
exc,
self.backoff,
)
await tornado.gen.sleep(self.backoff)
await asyncio.sleep(self.backoff)
return stream
async def connect(self):
@ -1071,13 +1067,12 @@ class PubServer(tornado.tcpserver.TCPServer):
# pylint: enable=W1701
@tornado.gen.coroutine
def _stream_read(self, client):
async def _stream_read(self, client):
unpacker = salt.utils.msgpack.Unpacker()
while not self._closing:
try:
client._read_until_future = client.stream.read_bytes(4096, partial=True)
wire_bytes = yield client._read_until_future
wire_bytes = await client._read_until_future
unpacker.feed(wire_bytes)
for framed_msg in unpacker:
framed_msg = salt.transport.frame.decode_embedded_strs(framed_msg)
@ -1140,6 +1135,174 @@ class PubServer(tornado.tcpserver.TCPServer):
log.trace("TCP PubServer finished publishing payload")
class TCPServer:
"""
A Tornado IPC server very similar to Tornado's TCPServer class
but using either UNIX domain sockets or TCP sockets
"""
async_methods = [
"handle_stream",
]
close_methods = [
"close",
]
def __init__(self, socket_path, io_loop=None, payload_handler=None):
"""
Create a new Tornado IPC server
:param str/int socket_path: Path on the filesystem for the
socket to bind to. This socket does
not need to exist prior to calling
this method, but parent directories
should.
It may also be of type 'int', in
which case it is used as the port
for a tcp localhost connection.
:param IOLoop io_loop: A Tornado ioloop to handle scheduling
:param func payload_handler: A function to customize handling of
incoming data.
"""
self.socket_path = socket_path
self._started = False
self.payload_handler = payload_handler
# Placeholders for attributes to be populated by method calls
self.sock = None
self.io_loop = io_loop or tornado.ioloop.IOLoop.current()
self._closing = False
def start(self):
"""
Perform the work necessary to start up a Tornado IPC server
Blocks until socket is established
"""
# Start up the ioloop
log.trace("IPCServer: binding to socket: %s", self.socket_path)
if isinstance(self.socket_path, int):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.setblocking(0)
self.sock.bind(("127.0.0.1", self.socket_path))
# Based on default used in tornado.netutil.bind_sockets()
self.sock.listen(128)
else:
self.sock = tornado.netutil.bind_unix_socket(self.socket_path)
tornado.netutil.add_accept_handler(
self.sock,
self.handle_connection,
)
self._started = True
async def handle_stream(self, stream):
"""
Override this to handle the streams as they arrive
:param IOStream stream: An IOStream for processing
See https://tornado.readthedocs.io/en/latest/iostream.html#tornado.iostream.IOStream
for additional details.
"""
async def _null(msg):
return
def write_callback(stream, header):
if header.get("mid"):
async def return_message(msg):
pack = salt.transport.frame.frame_msg_ipc(
msg,
header={"mid": header["mid"]},
raw_body=True,
)
await stream.write(pack)
return return_message
else:
return _null
# msgpack deprecated `encoding` starting with version 0.5.2
if salt.utils.msgpack.version >= (0, 5, 2):
# Under Py2 we still want raw to be set to True
msgpack_kwargs = {"raw": False}
else:
msgpack_kwargs = {"encoding": "utf-8"}
unpacker = salt.utils.msgpack.Unpacker(**msgpack_kwargs)
while not stream.closed():
try:
wire_bytes = await stream.read_bytes(4096, partial=True)
unpacker.feed(wire_bytes)
for framed_msg in unpacker:
body = framed_msg["body"]
self.io_loop.spawn_callback(
self.payload_handler,
body,
write_callback(stream, framed_msg["head"]),
)
except tornado.iostream.StreamClosedError:
log.trace("Client disconnected from IPC %s", self.socket_path)
break
except OSError as exc:
# On occasion an exception will occur with
# an error code of 0, it's a spurious exception.
if exc.errno == 0:
log.trace(
"Exception occurred with error number 0, "
"spurious exception: %s",
exc,
)
else:
log.error("Exception occurred while handling stream: %s", exc)
except Exception as exc: # pylint: disable=broad-except
log.error("Exception occurred while handling stream: %s", exc)
def handle_connection(self, connection, address):
log.error(
"IPCServer: Handling connection to address: %s",
address if address else connection,
)
try:
stream = tornado.iostream.IOStream(
connection,
)
self.io_loop.spawn_callback(self.handle_stream, stream)
except Exception as exc: # pylint: disable=broad-except
log.error("IPC streaming error: %s", exc)
def close(self):
"""
Routines to handle any cleanup before the instance shuts down.
Sockets and filehandles should be closed explicitly, to prevent
leaks.
"""
if self._closing:
return
self._closing = True
if hasattr(self.sock, "close"):
self.sock.close()
# pylint: disable=W1701
def __del__(self):
try:
self.close()
except TypeError:
# This is raised when Python's GC has collected objects which
# would be needed when calling self.close()
pass
# pylint: enable=W1701
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
"""
Tornado based TCP PublishServer
@ -1213,7 +1376,13 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
Bind to the interface specified in the configuration file
"""
io_loop = tornado.ioloop.IOLoop()
ioloop.add_callback(self.publisher, publish_payload)
io_loop.add_callback(
self.publisher,
publish_payload,
presence_callback,
remove_presence_callback,
io_loop,
)
# run forever
try:
io_loop.start()
@ -1222,18 +1391,15 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
finally:
self.close()
async def publisher(self, publish_payload, ioloop=None):
if ioloop is None:
ioloop = tornado.ioloop.IOLoop.current()
ioloop.asyncio_loop.set_debug(True)
# log.error(
# "TCPPubServer daemon %r %s %s %s",
# self,
# self.pull_uri,
# self.publish_port,
# self.pub_uri,
# )
async def publisher(
self,
publish_payload,
presence_callback=None,
remove_presence_callback=None,
io_loop=None,
):
if io_loop is None:
io_loop = tornado.ioloop.IOLoop.current()
# Spin up the publisher
self.pub_server = pub_server = PubServer(
self.opts,
@ -1254,25 +1420,13 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
pub_server.add_socket(sock)
# Set up Salt IPC server
# if self.opts.get("ipc_mode", "") == "tcp":
# pull_uri = int(self.opts.get("tcp_master_publish_pull", 4514))
# else:
# pull_uri = os.path.join(self.opts["sock_dir"], "publish_pull.ipc")
self.pub_server = pub_server
# if "ipc://" in self.pull_uri:
# pull_uri = pull_uri = self.pull_uri.replace("ipc://", "")
# log.error("WTF PULL URI %r", pull_uri)
# elif "tcp://" in self.pull_uri:
# log.error("Fallback to publish port %r", self.pull_uri)
# pull_uri = self.publish_port
# else:
# pull_uri = self.pull_uri
if self.pull_path:
pull_uri = self.pull_path
else:
pull_uri = self.pull_port
self.pull_sock = salt.transport.ipc.IPCMessageServer(
self.pull_sock = TCPServer(
pull_uri,
io_loop=io_loop,
payload_handler=publish_payload,
@ -1295,19 +1449,12 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
return await self.pub_server.publish_payload(payload)
def connect(self):
# path = self.pull_uri.replace("ipc://", "")
log.error("Connect pusher %s", self.pull_path)
# self.pub_sock = salt.utils.asynchronous.SyncWrapper(
# salt.transport.ipc.IPCMessageClient,
# (path,),
# loop_kwarg="io_loop",
# )
log.debug("Connect pusher %s", self.pull_path)
self.pub_sock = salt.utils.asynchronous.SyncWrapper(
salt.transport.ipc.IPCMessageClient,
TCPMessageClient,
(self.pull_path,),
loop_kwarg="io_loop",
)
# self.pub_sock = salt.transport.ipc.IPCMessageClient(path)
self.pub_sock.connect()
async def publish(self, payload, **kwargs):
@ -1336,6 +1483,197 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
self.pub_sock = None
class TCPMessageClient:
"""
Salt IPC message client
Create an IPC client to send messages to an IPC server
An example of a very simple IPCMessageClient connecting to an IPCServer. This
example assumes an already running IPCMessage server.
IMPORTANT: The below example also assumes a running IOLoop process.
# Import Tornado libs
import tornado.ioloop
# Import Salt libs
import salt.config
import salt.transport.ipc
io_loop = tornado.ioloop.IOLoop.current()
ipc_server_socket_path = '/var/run/ipc_server.ipc'
ipc_client = salt.transport.ipc.IPCMessageClient(ipc_server_socket_path, io_loop=io_loop)
# Connect to the server
ipc_client.connect()
# Send some data
ipc_client.send('Hello world')
"""
async_methods = [
"send",
"connect",
"_connect",
]
close_methods = [
"close",
]
def __init__(self, socket_path, io_loop=None):
"""
Create a new IPC client
IPC clients cannot bind to ports, but must connect to
existing IPC servers. Clients can then send messages
to the server.
"""
self.io_loop = io_loop or tornado.ioloop.IOLoop.current()
self.socket_path = socket_path
self._closing = False
self.stream = None
# msgpack deprecated `encoding` starting with version 0.5.2
if salt.utils.msgpack.version >= (0, 5, 2):
# Under Py2 we still want raw to be set to True
msgpack_kwargs = {"raw": False}
else:
msgpack_kwargs = {"encoding": "utf-8"}
self.unpacker = salt.utils.msgpack.Unpacker(**msgpack_kwargs)
self._connecting_future = None
def connected(self):
return self.stream is not None and not self.stream.closed()
def connect(self, callback=None, timeout=None):
"""
Connect to the IPC socket
"""
if self._connecting_future is not None and not self._connecting_future.done():
future = self._connecting_future
else:
if self._connecting_future is not None:
# read previous future result to prevent the "unhandled future exception" error
self._connecting_future.exception() # pylint: disable=E0203
future = tornado.concurrent.Future()
self._connecting_future = future
# self._connect(timeout)
self.io_loop.spawn_callback(self._connect, timeout)
if callback is not None:
def handle_future(future):
response = future.result()
self.io_loop.add_callback(callback, response)
future.add_done_callback(handle_future)
return future
async def _connect(self, timeout=None):
"""
Connect to a running IPCServer
"""
if isinstance(self.socket_path, int):
sock_type = socket.AF_INET
sock_addr = ("127.0.0.1", self.socket_path)
else:
sock_type = socket.AF_UNIX
sock_addr = self.socket_path
self.stream = None
if timeout is not None:
timeout_at = time.time() + timeout
while True:
if self._closing:
break
if self.stream is None:
# with salt.utils.asynchronous.current_ioloop(self.io_loop):
self.stream = tornado.iostream.IOStream(
socket.socket(sock_type, socket.SOCK_STREAM)
)
try:
log.trace(
"TCPMessageClient: Connecting to socket: %s", self.socket_path
)
await self.stream.connect(sock_addr)
self._connecting_future.set_result(True)
break
except Exception as e: # pylint: disable=broad-except
if self.stream.closed():
self.stream = None
if timeout is None or time.time() > timeout_at:
if self.stream is not None:
self.stream.close()
self.stream = None
self._connecting_future.set_exception(e)
break
await asyncio.sleep(1)
def close(self):
"""
Routines to handle any cleanup before the instance shuts down.
Sockets and filehandles should be closed explicitly, to prevent
leaks.
"""
if self._closing:
return
self._closing = True
self._connecting_future = None
log.debug("Closing %s instance", self.__class__.__name__)
if self.stream is not None and not self.stream.closed():
try:
self.stream.close()
except OSError as exc:
if exc.errno != errno.EBADF:
# If its not a bad file descriptor error, raise
raise
# pylint: disable=W1701
def __del__(self):
try:
self.close()
except TypeError:
# This is raised when Python's GC has collected objects which
# would be needed when calling self.close()
pass
# pylint: enable=W1701
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
# FIXME timeout unimplemented
# FIXME tries unimplemented
# @tornado.gen.coroutine
async def send(self, msg, timeout=None, tries=None):
"""
Send a message to an IPC socket
If the socket is not currently connected, a connection will be established.
:param dict msg: The message to be sent
:param int timeout: Timeout when sending message (Currently unimplemented)
"""
if not self.connected():
await self.connect()
pack = salt.transport.frame.frame_msg_ipc(msg, raw_body=True)
await self.stream.write(pack)
class TCPReqClient(salt.transport.base.RequestClient):
"""
Tornado based TCP RequestClient

View file

@ -68,7 +68,6 @@ import salt.channel.client
import salt.config
import salt.defaults.exitcodes
import salt.payload
import salt.transport.ipc
import salt.utils.asynchronous
import salt.utils.cache
import salt.utils.dicttrim
@ -77,6 +76,7 @@ import salt.utils.platform
import salt.utils.process
import salt.utils.stringutils
import salt.utils.zeromq
from salt.utils.versions import warn_until
log = logging.getLogger(__name__)
@ -837,10 +837,9 @@ class SaltEvent:
use_bin_type=True,
)
log.debug(
"Sending event(fire_event): tag = %s; data = %s %s",
"Sending event(fire_event): tag = %s; data = %s",
tag,
data,
self.pusher.pull_uri,
)
event = b"".join(
[
@ -1084,6 +1083,11 @@ class AsyncEventPublisher:
"""
def __init__(self, opts, io_loop=None):
warn_until(
3008,
"salt.utils.event.AsyncEventPublisher is deprecated. "
"Please use salt.transport.publish_server instead.",
)
self.opts = salt.config.DEFAULT_MINION_OPTS.copy()
default_minion_sock_dir = self.opts["sock_dir"]
self.opts.update(opts)
@ -1184,6 +1188,11 @@ class EventPublisher(salt.utils.process.SignalHandlingProcess):
"""
def __init__(self, opts, **kwargs):
warn_until(
3008,
"salt.utils.event.EventPublisher is deprecated. "
"Please use salt.transport.publish_server instead.",
)
super().__init__(**kwargs)
self.opts = salt.config.DEFAULT_MASTER_OPTS.copy()
self.opts.update(opts)
@ -1196,6 +1205,8 @@ class EventPublisher(salt.utils.process.SignalHandlingProcess):
"""
Bind the pub and pull sockets for events
"""
import salt.transport.ipc
if (
self.opts["event_publisher_niceness"]
and not salt.utils.platform.is_windows()