mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Fix up basic ping between master and minion
This commit is contained in:
parent
836b5421f7
commit
74195b8306
3 changed files with 34 additions and 50 deletions
|
@ -132,7 +132,7 @@ def publish_client(opts, io_loop, host=None, port=None, path=None, transport=Non
|
|||
return salt.transport.tcp.TCPPubClient(
|
||||
opts, io_loop, host=host, port=port, path=path
|
||||
)
|
||||
elif ttype == "tcp":
|
||||
elif ttype == "ws":
|
||||
import salt.transport.ws
|
||||
|
||||
return salt.transport.ws.PublishClient(
|
||||
|
|
|
@ -26,6 +26,7 @@ import tornado.iostream
|
|||
import tornado.netutil
|
||||
import tornado.tcpclient
|
||||
import tornado.tcpserver
|
||||
import tornado.util
|
||||
|
||||
import salt.master
|
||||
import salt.payload
|
||||
|
@ -34,6 +35,7 @@ import salt.utils.asynchronous
|
|||
import salt.utils.files
|
||||
import salt.utils.msgpack
|
||||
import salt.utils.platform
|
||||
import salt.utils.process
|
||||
import salt.utils.versions
|
||||
from salt.exceptions import SaltClientError, SaltReqTimeoutError
|
||||
from salt.utils.network import ip_bracket
|
||||
|
@ -44,10 +46,6 @@ if salt.utils.platform.is_windows():
|
|||
else:
|
||||
USE_LOAD_BALANCER = False
|
||||
|
||||
if USE_LOAD_BALANCER:
|
||||
import tornado.util
|
||||
|
||||
from salt.utils.process import SignalHandlingProcess
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
@ -136,7 +134,7 @@ def _set_tcp_keepalive(sock, opts):
|
|||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 0)
|
||||
|
||||
|
||||
class LoadBalancerServer(SignalHandlingProcess):
|
||||
class LoadBalancerServer(salt.utils.process.SignalHandlingProcess):
|
||||
"""
|
||||
Raw TCP server which runs in its own process and will listen
|
||||
for incoming connections. Each incoming connection will be
|
||||
|
|
|
@ -172,7 +172,7 @@ class PublishClient(salt.transport.base.PublishClient):
|
|||
try:
|
||||
await self._read_in_progress.acquire(timeout=0.001)
|
||||
except tornado.gen.TimeoutError:
|
||||
log.error("Timeout Error")
|
||||
log.error("Unable to acquire read lock")
|
||||
return
|
||||
try:
|
||||
if timeout == 0:
|
||||
|
@ -213,10 +213,8 @@ class PublishClient(salt.transport.base.PublishClient):
|
|||
if raw_msg.data == "close":
|
||||
await self._ws.close()
|
||||
if raw_msg.type == aiohttp.WSMsgType.BINARY:
|
||||
log.error("ORIG MSG IS %r", raw_msg.data)
|
||||
self.unpacker.feed(raw_msg.data)
|
||||
for msg in self.unpacker:
|
||||
log.error("MSG IS %r", msg)
|
||||
framed_msg = salt.transport.frame.decode_embedded_strs(msg)
|
||||
return framed_msg["body"]
|
||||
elif raw_msg.type == aiohttp.WSMsgType.ERROR:
|
||||
|
@ -231,14 +229,8 @@ class PublishClient(salt.transport.base.PublishClient):
|
|||
while not self._ws:
|
||||
await asyncio.sleep(0.003)
|
||||
while True:
|
||||
try:
|
||||
msg = await self.recv()
|
||||
except Exception: # pylint: disable=broad-except
|
||||
# XXX
|
||||
log.error("Other exception", exc_info=True)
|
||||
else:
|
||||
log.error("on recv got msg %r", msg)
|
||||
callback(msg)
|
||||
msg = await self.recv()
|
||||
callback(msg)
|
||||
|
||||
def on_recv(self, callback):
|
||||
"""
|
||||
|
@ -376,9 +368,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
|
|||
|
||||
async def handle_request(self, request):
|
||||
ws = aiohttp.web.WebSocketResponse()
|
||||
log.error("perpare request")
|
||||
await ws.prepare(request)
|
||||
log.error("request prepared")
|
||||
self.clients.add(ws)
|
||||
while True:
|
||||
await asyncio.sleep(1)
|
||||
|
@ -387,7 +377,11 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
|
|||
log.debug("Connect pusher %s", self.pull_path)
|
||||
self.pub_sock = salt.utils.asynchronous.SyncWrapper(
|
||||
_TCPPubServerPublisher,
|
||||
(self.pull_path,),
|
||||
(
|
||||
self.pull_host,
|
||||
self.pull_port,
|
||||
self.pull_path,
|
||||
),
|
||||
loop_kwarg="io_loop",
|
||||
)
|
||||
self.pub_sock.connect()
|
||||
|
@ -404,7 +398,6 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
|
|||
payload = salt.transport.frame.frame_msg(package)
|
||||
for ws in list(self.clients):
|
||||
try:
|
||||
log.error("Publish package %r %r", ws, payload)
|
||||
await ws.send_bytes(payload)
|
||||
except ConnectionResetError:
|
||||
self.clients.discard(ws)
|
||||
|
@ -467,25 +460,18 @@ class RequestServer(salt.transport.base.DaemonizedRequestServer):
|
|||
io_loop.spawn_callback(server)
|
||||
|
||||
async def handle_message(self, request):
|
||||
try:
|
||||
ws = aiohttp.web.WebSocketResponse()
|
||||
await ws.prepare(request)
|
||||
async for msg in ws:
|
||||
log.error("got msg %r", msg)
|
||||
if msg.type == aiohttp.WSMsgType.TEXT:
|
||||
if msg.data == "close":
|
||||
await ws.close()
|
||||
if msg.type == aiohttp.WSMsgType.BINARY:
|
||||
payload = salt.payload.loads(msg.data)
|
||||
log.error("Handle message got %r", payload)
|
||||
reply = await self.message_handler(payload)
|
||||
log.error("Handle message reply %r", reply)
|
||||
await ws.send_bytes(salt.payload.dumps(reply))
|
||||
elif msg.type == aiohttp.WSMsgType.ERROR:
|
||||
log.error("ws connection closed with exception %s", ws.exception())
|
||||
except Exception: # pylint: disable=broad-except
|
||||
# XXX
|
||||
log.error("Message handler", exc_info=True)
|
||||
ws = aiohttp.web.WebSocketResponse()
|
||||
await ws.prepare(request)
|
||||
async for msg in ws:
|
||||
if msg.type == aiohttp.WSMsgType.TEXT:
|
||||
if msg.data == "close":
|
||||
await ws.close()
|
||||
if msg.type == aiohttp.WSMsgType.BINARY:
|
||||
payload = salt.payload.loads(msg.data)
|
||||
reply = await self.message_handler(payload)
|
||||
await ws.send_bytes(salt.payload.dumps(reply))
|
||||
elif msg.type == aiohttp.WSMsgType.ERROR:
|
||||
log.error("ws connection closed with exception %s", ws.exception())
|
||||
|
||||
def close(self):
|
||||
self._run.clear()
|
||||
|
@ -505,13 +491,13 @@ class RequestClient(salt.transport.base.RequestClient):
|
|||
self._closed = False
|
||||
|
||||
async def connect(self):
|
||||
if self.session is None:
|
||||
self.session = aiohttp.ClientSession()
|
||||
URL = self.get_master_uri(self.opts)
|
||||
self.ws = await self.session.ws_connect(URL)
|
||||
# if self.session is None:
|
||||
self.session = aiohttp.ClientSession()
|
||||
URL = self.get_master_uri(self.opts)
|
||||
self.ws = await self.session.ws_connect(URL)
|
||||
|
||||
async def send(self, load, timeout=60):
|
||||
if self.sending:
|
||||
if self.sending or self._closing:
|
||||
await asyncio.sleep(0.03)
|
||||
self.sending = True
|
||||
try:
|
||||
|
@ -519,7 +505,6 @@ class RequestClient(salt.transport.base.RequestClient):
|
|||
message = salt.payload.dumps(load)
|
||||
await self.ws.send_bytes(message)
|
||||
async for msg in self.ws:
|
||||
log.error("Got MSG %r", msg)
|
||||
if msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
|
||||
break
|
||||
data = salt.payload.loads(msg.data)
|
||||
|
@ -531,27 +516,28 @@ class RequestClient(salt.transport.base.RequestClient):
|
|||
async def _close(self):
|
||||
if self.ws is not None:
|
||||
await self.ws.close()
|
||||
self.ws = None
|
||||
if self.session is not None:
|
||||
await self.session.close()
|
||||
self.session = None
|
||||
self._closed = True
|
||||
|
||||
def close(self):
|
||||
if self._closing:
|
||||
return
|
||||
self._closing = True
|
||||
self.io_loop.spawn_callback(self._close)
|
||||
self.close_task = asyncio.create_task(self._close())
|
||||
|
||||
@staticmethod
|
||||
def get_master_uri(opts):
|
||||
if "master_uri" in opts:
|
||||
return opts["master_uri"]
|
||||
return opts["master_uri"].replace("tcp:", "http:", 1)
|
||||
return f"http://{opts['master_ip']}:{opts['master_port']}/ws"
|
||||
|
||||
# pylint: disable=W1701
|
||||
def __del__(self):
|
||||
if not self._closing:
|
||||
warnings.warn(
|
||||
"unclosed publish client {self!r}", ResourceWarning, source=self
|
||||
"Unclosed publish client {self!r}", ResourceWarning, source=self
|
||||
)
|
||||
|
||||
# pylint: enable=W1701
|
||||
|
|
Loading…
Add table
Reference in a new issue