mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Fix minion comming up without master
This commit is contained in:
parent
2f6241aaff
commit
0dc6cfc78f
4 changed files with 106 additions and 25 deletions
|
@ -1062,7 +1062,7 @@ class MinionManager(MinionBase):
|
|||
pub_path=epub_sock_path,
|
||||
pull_path=epull_sock_path,
|
||||
)
|
||||
self.io_loop.add_callback(
|
||||
self.io_loop.spawn_callback(
|
||||
ipc_publisher.publisher, ipc_publisher.publish_payload, self.io_loop
|
||||
)
|
||||
self.event = salt.utils.event.get_event(
|
||||
|
|
|
@ -390,13 +390,17 @@ class TCPPubClient(salt.transport.base.PublishClient):
|
|||
self._read_in_progress.release()
|
||||
|
||||
async def on_recv_handler(self, callback):
|
||||
log.error("ON RECV HANDLER")
|
||||
while not self._stream:
|
||||
await asyncio.sleep(0.003)
|
||||
while True:
|
||||
try:
|
||||
log.error("ON RECV HANDLER - RECV")
|
||||
msg = await self.recv()
|
||||
log.error("ON RECV HANDLER - RECVED")
|
||||
logit = True
|
||||
except tornado.iostream.StreamClosedError:
|
||||
log.error("Stream Closed")
|
||||
self._stream.close()
|
||||
self._stream = None
|
||||
await self._connect()
|
||||
|
@ -404,6 +408,8 @@ class TCPPubClient(salt.transport.base.PublishClient):
|
|||
self.disconnect_callback()
|
||||
self.unpacker = salt.utils.msgpack.Unpacker()
|
||||
continue
|
||||
except:
|
||||
log.error("Stream Closed", exc_info=True)
|
||||
callback(msg)
|
||||
|
||||
def on_recv(self, callback):
|
||||
|
@ -1432,6 +1438,7 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
|
|||
process_manager.add_process(self.publish_daemon, name=self.__class__.__name__)
|
||||
|
||||
async def publish_payload(self, payload, *args):
|
||||
log.error("publisher - publish payload")
|
||||
return await self.pub_server.publish_payload(payload)
|
||||
|
||||
def connect(self):
|
||||
|
@ -1448,6 +1455,7 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
|
|||
Publish "load" to minions
|
||||
"""
|
||||
if not self.pub_sock:
|
||||
log.error("CONNECT")
|
||||
self.connect()
|
||||
# if self.opts.get("ipc_mode", "") == "tcp":
|
||||
# pull_uri = int(self.opts.get("tcp_master_publish_pull", 4514))
|
||||
|
@ -1461,6 +1469,7 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
|
|||
# )
|
||||
# self.pub_sock.connect()
|
||||
# await self.pub_sock.send(payload)
|
||||
log.error("publish payload")
|
||||
self.pub_sock.send(payload)
|
||||
|
||||
def close(self):
|
||||
|
@ -1694,6 +1703,7 @@ class TCPReqClient(salt.transport.base.RequestClient):
|
|||
self._connecting_future = tornado.concurrent.Future()
|
||||
self._stream_return_running = False
|
||||
self._stream = None
|
||||
self.disconnect_callback = None
|
||||
|
||||
async def getstream(self, **kwargs):
|
||||
if self.source_ip or self.source_port:
|
||||
|
|
|
@ -254,6 +254,8 @@ class PublishClient(salt.transport.base.PublishClient):
|
|||
# TODO: this is the time to see if we are connected, maybe use the req channel to guess?
|
||||
async def connect(self, port=None, connect_callback=None, disconnect_callback=None):
|
||||
self.connect_called = True
|
||||
if port is not None:
|
||||
self.port = port
|
||||
if self.path:
|
||||
pub_uri = f"ipc://{self.path}"
|
||||
log.debug("Connecting the publisher client to: %s", pub_uri)
|
||||
|
@ -635,8 +637,7 @@ class AsyncReqMessageClient:
|
|||
self.io_loop = tornado.ioloop.IOLoop.current()
|
||||
else:
|
||||
self.io_loop = io_loop
|
||||
|
||||
self.context = zmq.asyncio.Context()
|
||||
self.context = None
|
||||
|
||||
self.send_queue = []
|
||||
# mapping of message -> future
|
||||
|
@ -644,7 +645,7 @@ class AsyncReqMessageClient:
|
|||
|
||||
self._closing = False
|
||||
self.socket = None
|
||||
self.sending = False
|
||||
self.sending = asyncio.Lock()
|
||||
|
||||
async def connect(self):
|
||||
if self.socket is None:
|
||||
|
@ -658,12 +659,15 @@ class AsyncReqMessageClient:
|
|||
self._closing = True
|
||||
if self.socket:
|
||||
self.socket.close()
|
||||
self.socket = None
|
||||
if self.context.closed is False:
|
||||
# This hangs if closing the stream causes an import error
|
||||
self.context.term()
|
||||
self.context = None
|
||||
|
||||
def _init_socket(self):
|
||||
if self.socket is not None:
|
||||
self.context = zmq.asyncio.Context()
|
||||
self.socket.close() # pylint: disable=E0203
|
||||
del self.socket
|
||||
|
||||
|
@ -698,28 +702,28 @@ class AsyncReqMessageClient:
|
|||
future.set_exception(SaltReqTimeoutError("Message timed out"))
|
||||
|
||||
async def _send_recv(self, message):
|
||||
if not self.socket:
|
||||
await self.connect()
|
||||
message = salt.payload.dumps(message)
|
||||
await self.socket.send(message)
|
||||
ret = await self.socket.recv()
|
||||
data = salt.payload.loads(ret)
|
||||
return data
|
||||
return salt.payload.loads(ret)
|
||||
|
||||
async def send(self, message, timeout=None, callback=None):
|
||||
"""
|
||||
Return a future which will be completed when the message has a response
|
||||
"""
|
||||
while self.sending:
|
||||
await asyncio.sleep(0.03)
|
||||
self.sending = True
|
||||
if not self.socket:
|
||||
await self.connect()
|
||||
await self.sending.acquire()
|
||||
try:
|
||||
response = await asyncio.wait_for(self._send_recv(message), timeout=timeout)
|
||||
if callback:
|
||||
callback(response)
|
||||
return response
|
||||
except TimeoutError:
|
||||
self.close()
|
||||
raise
|
||||
finally:
|
||||
self.sending = False
|
||||
self.sending.release()
|
||||
|
||||
|
||||
class ZeroMQSocketMonitor:
|
||||
|
@ -1043,24 +1047,87 @@ class RequestClient(salt.transport.base.RequestClient):
|
|||
|
||||
ttype = "zeromq"
|
||||
|
||||
def __init__(self, opts, io_loop): # pylint: disable=W0231
|
||||
def __init__(self, opts, io_loop, linger=0): # pylint: disable=W0231
|
||||
self.opts = opts
|
||||
master_uri = self.get_master_uri(opts)
|
||||
self.message_client = AsyncReqMessageClient(
|
||||
self.opts,
|
||||
master_uri,
|
||||
io_loop=io_loop,
|
||||
)
|
||||
self.master_uri = self.get_master_uri(opts)
|
||||
self.linger = linger
|
||||
if io_loop is None:
|
||||
self.io_loop = tornado.ioloop.IOLoop.current()
|
||||
else:
|
||||
self.io_loop = io_loop
|
||||
self.context = None
|
||||
self.send_queue = []
|
||||
# mapping of message -> future
|
||||
self.send_future_map = {}
|
||||
self._closing = False
|
||||
self.socket = None
|
||||
self.sending = asyncio.Lock()
|
||||
|
||||
async def connect(self):
|
||||
await self.message_client.connect()
|
||||
if self.socket is None:
|
||||
# wire up sockets
|
||||
self._init_socket()
|
||||
|
||||
async def send(self, load, timeout=60):
|
||||
await self.connect()
|
||||
return await self.message_client.send(load, timeout=timeout)
|
||||
def _init_socket(self):
|
||||
if self.socket is not None:
|
||||
self.context = zmq.asyncio.Context()
|
||||
self.socket.close() # pylint: disable=E0203
|
||||
del self.socket
|
||||
self.context = zmq.asyncio.Context()
|
||||
self.socket = self.context.socket(zmq.REQ)
|
||||
self.socket.setsockopt(zmq.LINGER, -1)
|
||||
|
||||
# socket options
|
||||
if hasattr(zmq, "RECONNECT_IVL_MAX"):
|
||||
self.socket.setsockopt(zmq.RECONNECT_IVL_MAX, 5000)
|
||||
|
||||
_set_tcp_keepalive(self.socket, self.opts)
|
||||
if self.master_uri.startswith("tcp://["):
|
||||
# Hint PF type if bracket enclosed IPv6 address
|
||||
if hasattr(zmq, "IPV6"):
|
||||
self.socket.setsockopt(zmq.IPV6, 1)
|
||||
elif hasattr(zmq, "IPV4ONLY"):
|
||||
self.socket.setsockopt(zmq.IPV4ONLY, 0)
|
||||
self.socket.linger = self.linger
|
||||
self.socket.connect(self.master_uri)
|
||||
|
||||
# TODO: timeout all in-flight sessions, or error
|
||||
def close(self):
|
||||
self.message_client.close()
|
||||
if self._closing:
|
||||
return
|
||||
self._closing = True
|
||||
if self.socket:
|
||||
self.socket.close()
|
||||
self.socket = None
|
||||
if self.context.closed is False:
|
||||
# This hangs if closing the stream causes an import error
|
||||
self.context.term()
|
||||
self.context = None
|
||||
|
||||
async def _send_recv(self, message):
|
||||
message = salt.payload.dumps(message)
|
||||
await self.socket.send(message)
|
||||
ret = await self.socket.recv()
|
||||
return salt.payload.loads(ret)
|
||||
|
||||
async def send(self, message, timeout=None, callback=None):
|
||||
"""
|
||||
Return a future which will be completed when the message has a response
|
||||
"""
|
||||
if not self.socket:
|
||||
await self.connect()
|
||||
await self.sending.acquire()
|
||||
try:
|
||||
response = await asyncio.wait_for(self._send_recv(message), timeout=timeout)
|
||||
if callback:
|
||||
callback(response)
|
||||
return response
|
||||
except TimeoutError:
|
||||
self.close()
|
||||
except Exception:
|
||||
self.close()
|
||||
finally:
|
||||
self.sending.release()
|
||||
|
||||
@staticmethod
|
||||
def get_master_uri(opts):
|
||||
|
|
|
@ -137,7 +137,11 @@ def _connect_and_publish(
|
|||
io_loop.stop()
|
||||
|
||||
channel.on_recv(cb)
|
||||
server.publish({"tgt_type": "glob", "tgt": [channel_minion_id], "WTF": "SON"})
|
||||
log.error("TEST - RUN PUBLISH")
|
||||
io_loop.spawn_callback(
|
||||
server.publish, {"tgt_type": "glob", "tgt": [channel_minion_id], "WTF": "SON"}
|
||||
)
|
||||
# server.publish({"tgt_type": "glob", "tgt": [channel_minion_id], "WTF": "SON"})
|
||||
start = time.time()
|
||||
while time.time() - start < timeout:
|
||||
yield tornado.gen.sleep(1)
|
||||
|
|
Loading…
Add table
Reference in a new issue