mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Clean up cruft
This commit is contained in:
parent
9ef3a59698
commit
3294e14752
3 changed files with 7 additions and 80 deletions
|
@ -390,15 +390,6 @@ class TCPPubClient(salt.transport.base.PublishClient):
|
|||
events, _, _ = select.select([self._stream.socket], [], [], 0)
|
||||
except TimeoutError:
|
||||
events = []
|
||||
# except TypeError:
|
||||
# # Stream's socket is closed.
|
||||
# stream = self._stream
|
||||
# self._stream = None
|
||||
# stream.close()
|
||||
# if self.disconnect_callback:
|
||||
# self.disconnect_callback()
|
||||
# await self.connect()
|
||||
# events = []
|
||||
if events:
|
||||
while not self._closing:
|
||||
await self._read_in_progress.acquire()
|
||||
|
@ -413,9 +404,6 @@ class TCPPubClient(salt.transport.base.PublishClient):
|
|||
self.disconnect_callback()
|
||||
await self.connect()
|
||||
return
|
||||
# except Exception:
|
||||
# log.error("Unhandled Exception")
|
||||
# raise
|
||||
finally:
|
||||
self._read_in_progress.release()
|
||||
self.unpacker.feed(byts)
|
||||
|
@ -451,10 +439,6 @@ class TCPPubClient(salt.transport.base.PublishClient):
|
|||
await self.connect()
|
||||
log.debug("Re-connected - continue")
|
||||
continue
|
||||
# except AttributeError:
|
||||
# return
|
||||
# except Exception:
|
||||
# raise
|
||||
finally:
|
||||
self._read_in_progress.release()
|
||||
self.unpacker.feed(byts)
|
||||
|
@ -470,17 +454,6 @@ class TCPPubClient(salt.transport.base.PublishClient):
|
|||
msg = await self.recv()
|
||||
if msg:
|
||||
callback(msg)
|
||||
# except tornado.iostream.StreamClosedError:
|
||||
# log.trace("Stream closed, reconnecting.")
|
||||
# self._stream.close()
|
||||
# self._stream = None
|
||||
# await self._connect()
|
||||
# if self.disconnect_callback:
|
||||
# self.disconnect_callback()
|
||||
# self.unpacker = salt.utils.msgpack.Unpacker()
|
||||
# continue
|
||||
# except Exception: # py-lint: disable=broad-except
|
||||
# log.error("Unhandled exception in on_recv handler.", exc_info=True)
|
||||
|
||||
def on_recv(self, callback):
|
||||
"""
|
||||
|
@ -1788,7 +1761,6 @@ class TCPReqClient(salt.transport.base.RequestClient):
|
|||
if self._stream:
|
||||
if not self._stream_return_running:
|
||||
self.task = asyncio.create_task(self._stream_return())
|
||||
# self.io_loop.spawn_callback(self._stream_return)
|
||||
if self.connect_callback is not None:
|
||||
self.connect_callback()
|
||||
|
||||
|
@ -1882,7 +1854,7 @@ class TCPReqClient(salt.transport.base.RequestClient):
|
|||
if future is not None:
|
||||
future.set_exception(SaltReqTimeoutError("Message timed out"))
|
||||
|
||||
async def send(self, load, timeout=60): # , callback=None, raw=False, reply=True):
|
||||
async def send(self, load, timeout=60):
|
||||
await self.connect()
|
||||
if self._closing:
|
||||
raise ClosingError()
|
||||
|
@ -1892,12 +1864,6 @@ class TCPReqClient(salt.transport.base.RequestClient):
|
|||
header = {"mid": message_id}
|
||||
future = tornado.concurrent.Future()
|
||||
|
||||
# if callback is not None:
|
||||
# def handle_future(future):
|
||||
# response = future.result()
|
||||
# self.io_loop.add_callback(callback, response)
|
||||
# future.add_done_callback(handle_future)
|
||||
|
||||
# Add this future to the mapping
|
||||
self.send_future_map[message_id] = future
|
||||
|
||||
|
|
|
@ -284,18 +284,6 @@ class PublishClient(salt.transport.base.PublishClient):
|
|||
if connect_callback:
|
||||
await connect_callback(True)
|
||||
|
||||
# @property
|
||||
# def master_pub(self):
|
||||
# """
|
||||
# Return the master publish port
|
||||
# """
|
||||
# return _get_master_uri(
|
||||
# self.opts["master_ip"],
|
||||
# self.publish_port,
|
||||
# source_ip=self.opts.get("source_ip"),
|
||||
# source_port=self.opts.get("source_publish_port"),
|
||||
# )
|
||||
|
||||
def _decode_messages(self, messages):
|
||||
"""
|
||||
Take the zmq messages, decrypt/decode them into a payload
|
||||
|
@ -378,9 +366,6 @@ class PublishClient(salt.transport.base.PublishClient):
|
|||
except zmq.error.ZMQError as exc:
|
||||
# We've disconnected just die
|
||||
break
|
||||
# except Exception: # pylint: disable=broad-except
|
||||
# log.error("WTF", exc_info=True)
|
||||
# break
|
||||
if msg:
|
||||
try:
|
||||
await callback(msg)
|
||||
|
@ -615,9 +600,6 @@ def _set_tcp_keepalive(zmq_socket, opts):
|
|||
zmq_socket.setsockopt(zmq.TCP_KEEPALIVE_INTVL, opts["tcp_keepalive_intvl"])
|
||||
|
||||
|
||||
ctx = zmq.asyncio.Context()
|
||||
|
||||
|
||||
# TODO: unit tests!
|
||||
class AsyncReqMessageClient:
|
||||
"""
|
||||
|
@ -824,10 +806,8 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
|
|||
Encapsulate synchronous operations for a publisher channel
|
||||
"""
|
||||
|
||||
# _sock_data = threading.local()
|
||||
async_methods = [
|
||||
"publish",
|
||||
# "close",
|
||||
]
|
||||
close_methods = [
|
||||
"close",
|
||||
|
@ -835,18 +815,6 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
|
|||
|
||||
def __init__(self, opts, **kwargs):
|
||||
self.opts = opts
|
||||
# if self.opts.get("ipc_mode", "") == "tcp":
|
||||
# self.pull_uri = "tcp://127.0.0.1:{}".format(
|
||||
# self.opts.get("tcp_master_publish_pull", 4514)
|
||||
# )
|
||||
# else:
|
||||
# self.pull_uri = "ipc://{}".format(
|
||||
# os.path.join(self.opts["sock_dir"], "publish_pull.ipc")
|
||||
# )
|
||||
# interface = self.opts.get("interface", "127.0.0.1")
|
||||
# publish_port = self.opts.get("publish_port", 4560)
|
||||
# self.pub_uri = f"tcp://{interface}:{publish_port}"
|
||||
|
||||
pub_host = kwargs.get("pub_host", None)
|
||||
pub_port = kwargs.get("pub_port", None)
|
||||
pub_path = kwargs.get("pub_path", None)
|
||||
|
@ -960,7 +928,6 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
|
|||
|
||||
async def publish_payload(self, payload, topic_list=None):
|
||||
log.trace("Publish payload %r", payload)
|
||||
# payload = salt.payload.dumps(payload)
|
||||
if self.opts["zmq_filtering"]:
|
||||
if topic_list:
|
||||
for topic in topic_list:
|
||||
|
|
|
@ -431,10 +431,6 @@ class SaltEvent:
|
|||
"""
|
||||
if not self.cpub:
|
||||
return
|
||||
# if isinstance(self.subscriber, salt.utils.asynchronous.SyncWrapper):
|
||||
# self.subscriber.close()
|
||||
# else:
|
||||
# asyncio.create_task(self.subscriber.close())
|
||||
self.subscriber.close()
|
||||
self.subscriber = None
|
||||
self.pending_events = []
|
||||
|
@ -818,9 +814,12 @@ class SaltEvent:
|
|||
)
|
||||
msg = salt.utils.stringutils.to_bytes(event, "utf-8")
|
||||
self.pusher.publish(msg)
|
||||
# ret = yield self.pusher.send(msg)
|
||||
# if cb is not None:
|
||||
# cb(ret)
|
||||
if cb is not None:
|
||||
warn_until(
|
||||
3008,
|
||||
"The cb argument to fire_event_async will be removed in 3008",
|
||||
)
|
||||
cb(None)
|
||||
|
||||
def fire_event(self, data, tag, timeout=1000):
|
||||
"""
|
||||
|
@ -876,7 +875,6 @@ class SaltEvent:
|
|||
msg = salt.utils.stringutils.to_bytes(event, "utf-8")
|
||||
if self._run_io_loop_sync:
|
||||
try:
|
||||
# self.pusher.send(msg)
|
||||
self.pusher.publish(msg)
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
log.debug(
|
||||
|
@ -887,7 +885,6 @@ class SaltEvent:
|
|||
raise
|
||||
else:
|
||||
asyncio.create_task(self.pusher.publish(msg))
|
||||
# self.io_loop.spawn_callback(self.pusher.send, msg)
|
||||
return True
|
||||
|
||||
def fire_master(self, data, tag, timeout=1000):
|
||||
|
@ -905,8 +902,6 @@ class SaltEvent:
|
|||
self.close_pub()
|
||||
if self.pusher is not None:
|
||||
self.close_pull()
|
||||
# if self._run_io_loop_sync and not self.keep_loop:
|
||||
# self.io_loop.close()
|
||||
|
||||
def _fire_ret_load_specific_fun(self, load, fun_index=0):
|
||||
"""
|
||||
|
@ -1000,7 +995,6 @@ class SaltEvent:
|
|||
if not self.cpub:
|
||||
self.connect_pub()
|
||||
# This will handle reconnects
|
||||
# return self.subscriber.read_async(event_handler)
|
||||
self.io_loop.spawn_callback(self.subscriber.on_recv, event_handler)
|
||||
|
||||
# pylint: disable=W1701
|
||||
|
|
Loading…
Add table
Reference in a new issue