Clean up some docs and test fix

This commit is contained in:
Daniel A. Wozniak 2021-10-06 22:07:40 -07:00 committed by Gareth J. Greenaway
parent 198cd9207f
commit 18d5cff107
3 changed files with 67 additions and 38 deletions

View file

@ -120,6 +120,9 @@ class RequestClient:
replies from the RequestServer.
"""
def __init__(self, opts, io_loop, **kwargs):
pass
@salt.ext.tornado.gen.coroutine
def send(self, load, tries=3, timeout=60):
"""
@ -144,9 +147,14 @@ class RequestServer:
RequestClients and sending replies to those requests.
"""
def close(self):
def __init__(self, opts):
pass
def close(self):
"""
Close the underlying network connection.
"""
def pre_fork(self, process_manager):
""" """
@ -160,7 +168,8 @@ class RequestServer:
class PublishServer:
"""
The PublishServer publishes messages to PubilshClients
The PublishServer publishes messages to PublishClients or to a borker
service.
"""
def pre_fork(self, process_manager, kwargs=None):
@ -190,6 +199,13 @@ class PublishServer:
class PublishClient:
"""
The PublishClient receives messages from the PublishServer and runs a callback.
"""
def __init__(self, opts, io_loop, **kwargs):
pass
def on_recv(self, callback):
"""
Add a message handler when we recieve a message from the PublishServer
@ -197,4 +213,17 @@ class PublishClient:
@salt.ext.tornado.gen.coroutine
def connect(self, publish_port, connect_callback=None, disconnect_callback=None):
""" """
"""
Create a network connection to the the PublishServer or broker.
"""
def close(self):
"""
Close the underlying network connection
"""
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass

View file

@ -179,7 +179,7 @@ if USE_LOAD_BALANCER:
raise
class ResolverMixin:
class Resolver:
_resolver_configured = False
@ -198,25 +198,20 @@ class ResolverMixin:
self._config_resolver()
class TCPPubClient(ResolverMixin):
async_methods = [
"send_id",
"connect_callback",
"connect",
]
close_methods = [
"close",
]
class TCPPubClient(salt.transport.base.PublishClient):
"""
Tornado based TCP Pub Client
"""
ttype = "tcp"
def __init__(self, opts, io_loop, **kwargs):
super().__init__()
def __init__(self, opts, io_loop, **kwargs): # pylint: disable=W0231
self.opts = opts
self.io_loop = io_loop
self.message_client = None
self.connected = False
self._closing = False
self.resolver = Resolver()
def close(self):
if self._closing:
@ -272,16 +267,21 @@ class TCPPubClient(ResolverMixin):
def __enter__(self):
return self
def __exit__(self, *args):
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
class TCPReqServer(salt.transport.base.RequestServer):
"""
Tornado based TCP Request/Reply Server
:param dict opts: Salt master config options.
"""
# TODO: opts!
backlog = 5
def __init__(self, opts):
def __init__(self, opts): # pylint: disable=W0231
self.opts = opts
self._socket = None
self.req_server = None
@ -320,14 +320,8 @@ class TCPReqServer(salt.transport.base.RequestServer):
)
self.req_server = None
## pylint: disable=W1701
# def __del__(self):
# self.close()
## pylint: enable=W1701
# def __enter__(self):
# return self
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
@ -948,6 +942,10 @@ class PubServer(salt.ext.tornado.tcpserver.TCPServer):
class TCPPublishServer(salt.transport.base.PublishServer):
"""
Tornado based TCP PublishServer
"""
# TODO: opts!
# Based on default used in salt.ext.tornado.netutil.bind_sockets()
backlog = 128
@ -1064,18 +1062,20 @@ class TCPPublishServer(salt.transport.base.PublishServer):
self.pub_sock = None
class TCPReqClient(ResolverMixin):
class TCPReqClient(salt.transport.base.RequestClient):
"""
Tornado based TCP RequestClient
"""
ttype = "tcp"
def __init__(self, opts, io_loop, **kwargs):
super().__init__()
def __init__(self, opts, io_loop, **kwargs): # pylint: disable=W0231
self.opts = opts
# self.master_uri = master_uri
self.io_loop = io_loop
parse = urllib.parse.urlparse(self.opts["master_uri"])
master_host, master_port = parse.netloc.rsplit(":", 1)
master_addr = (master_host, int(master_port))
# self.resolver = Resolver()
resolver = kwargs.get("resolver")
self.message_client = salt.transport.tcp.MessageClient(
opts,
@ -1088,8 +1088,8 @@ class TCPReqClient(ResolverMixin):
)
@salt.ext.tornado.gen.coroutine
def send(self, message, **kwargs):
ret = yield self.message_client.send(message, **kwargs)
def send(self, load, tries=3, timeout=60):
ret = yield self.message_client.send(load, tries=3, timeout=60)
raise salt.ext.tornado.gen.Return(ret)
def close(self):

View file

@ -106,7 +106,7 @@ class PublishClient(salt.transport.base.PublishClient):
ttype = "zeromq"
def __init__(self, opts, io_loop, **kwargs):
def __init__(self, opts, io_loop, **kwargs): # pylint: disable=W0231
self.opts = opts
self.io_loop = io_loop
self.hexid = hashlib.sha1(
@ -198,7 +198,7 @@ class PublishClient(salt.transport.base.PublishClient):
def __enter__(self):
return self
def __exit__(self, *args):
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
# TODO: this is the time to see if we are connected, maybe use the req channel to guess?
@ -284,7 +284,7 @@ class PublishClient(salt.transport.base.PublishClient):
class RequestServer(salt.transport.base.RequestServer):
def __init__(self, opts):
def __init__(self, opts): # pylint: disable=W0231
self.opts = opts
self._closing = False
self._monitor = None
@ -728,7 +728,7 @@ class PublishServer(salt.transport.base.PublishServer):
ioloop = salt.ext.tornado.ioloop.IOLoop()
ioloop.make_current()
self.io_loop = ioloop
context = self.context = zmq.Context(1)
context = zmq.Context(1)
pub_sock = context.socket(zmq.PUB)
monitor = ZeroMQSocketMonitor(pub_sock)
monitor.start_io_loop(ioloop)
@ -855,7 +855,7 @@ class PublishServer(salt.transport.base.PublishServer):
"""
if self.pub_sock:
self.pub_close()
ctx = zmq.Context.instance()
ctx = zmq.Context()
self._sock_data.sock = ctx.socket(zmq.PUSH)
self.pub_sock.setsockopt(zmq.LINGER, -1)
if self.opts.get("ipc_mode", "") == "tcp":
@ -902,7 +902,7 @@ class PublishServer(salt.transport.base.PublishServer):
def __enter__(self):
return self
def __exit__(self, *args, **kwargs):
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
@ -910,7 +910,7 @@ class RequestClient(salt.transport.base.RequestClient):
ttype = "zeromq"
def __init__(self, opts, io_loop):
def __init__(self, opts, io_loop): # pylint: disable=W0231
self.opts = opts
master_uri = self.get_master_uri(opts)
self.message_client = AsyncReqMessageClient(