mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
138 lines
3.9 KiB
Python
138 lines
3.9 KiB
Python
import logging
|
|
import socket
|
|
|
|
import attr
|
|
import tornado.escape
|
|
import tornado.web
|
|
from tornado import netutil
|
|
from tornado.httpclient import AsyncHTTPClient, HTTPError
|
|
from tornado.httpserver import HTTPServer
|
|
from tornado.ioloop import TimeoutError as IOLoopTimeoutError
|
|
|
|
import salt.auth
|
|
from salt.netapi.rest_tornado import saltnado
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
@attr.s
|
|
class TestsHttpClient:
|
|
address = attr.ib()
|
|
io_loop = attr.ib(repr=False)
|
|
headers = attr.ib(default=None)
|
|
client = attr.ib(init=False, repr=False)
|
|
|
|
@client.default
|
|
def _client_default(self):
|
|
return AsyncHTTPClient(self.io_loop)
|
|
|
|
async def fetch(self, path, **kwargs):
|
|
if "headers" not in kwargs and self.headers:
|
|
kwargs["headers"] = self.headers.copy()
|
|
try:
|
|
response = await self.client.fetch(f"{self.address}{path}", **kwargs)
|
|
return self._decode_body(response)
|
|
except HTTPError as exc:
|
|
exc.response = self._decode_body(exc.response)
|
|
raise
|
|
|
|
def _decode_body(self, response):
|
|
if response is None:
|
|
return response
|
|
if response.body:
|
|
# Decode it
|
|
if response.headers.get("Content-Type") == "application/json":
|
|
response._body = response.body.decode("utf-8")
|
|
else:
|
|
response._body = tornado.escape.native_str(response.body)
|
|
return response
|
|
|
|
|
|
@attr.s
|
|
class TestsTornadoHttpServer:
|
|
io_loop = attr.ib(repr=False)
|
|
app = attr.ib()
|
|
port = attr.ib(repr=False)
|
|
protocol = attr.ib(default="http", repr=False)
|
|
http_server_options = attr.ib(default=attr.Factory(dict))
|
|
sock = attr.ib(init=False, repr=False)
|
|
address = attr.ib(init=False)
|
|
server = attr.ib(init=False)
|
|
client_headers = attr.ib(default=None)
|
|
client = attr.ib(init=False, repr=False)
|
|
|
|
@sock.default
|
|
def _sock_default(self):
|
|
return netutil.bind_sockets(
|
|
self.port, "127.0.0.1", family=socket.AF_INET, reuse_port=False
|
|
)[0]
|
|
|
|
@port.default
|
|
def _port_default(self):
|
|
return self.sock.getsockname()[1]
|
|
|
|
@address.default
|
|
def _address_default(self):
|
|
return f"{self.protocol}://127.0.0.1:{self.port}"
|
|
|
|
@server.default
|
|
def _server_default(self):
|
|
server = HTTPServer(self.app, **self.http_server_options)
|
|
server.add_sockets([self.sock])
|
|
return server
|
|
|
|
@client.default
|
|
def _client_default(self):
|
|
return TestsHttpClient(
|
|
address=self.address, io_loop=self.io_loop, headers=self.client_headers
|
|
)
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, *_):
|
|
self.server.stop()
|
|
try:
|
|
self.io_loop.run_sync(self.server.close_all_connections, timeout=10)
|
|
except IOLoopTimeoutError:
|
|
pass
|
|
self.client.client.close()
|
|
|
|
|
|
def load_auth(client_config):
|
|
return salt.auth.LoadAuth(client_config)
|
|
|
|
|
|
def auth_token(load_auth, auth_creds):
|
|
"""
|
|
Mint and return a valid token for auth_creds
|
|
"""
|
|
return load_auth.mk_token(auth_creds)
|
|
|
|
|
|
def build_tornado_app(
|
|
urls, load_auth, client_config, minion_config, setup_event_listener=False
|
|
):
|
|
application = tornado.web.Application(urls, debug=True)
|
|
|
|
application.auth = load_auth
|
|
application.opts = client_config
|
|
application.mod_opts = minion_config
|
|
if setup_event_listener:
|
|
application.event_listener = saltnado.EventListener(
|
|
minion_config, client_config
|
|
)
|
|
return application
|
|
|
|
|
|
def content_type_map():
|
|
return {
|
|
"json": "application/json",
|
|
"json-utf8": "application/json; charset=utf-8",
|
|
"yaml": "application/x-yaml",
|
|
"text": "text/plain",
|
|
"form": "application/x-www-form-urlencoded",
|
|
"xml": "application/xml",
|
|
"real-accept-header-json": "application/json, text/javascript, */*; q=0.01",
|
|
"real-accept-header-yaml": "application/x-yaml, text/yaml, */*; q=0.01",
|
|
}
|