mirror of
https://github.com/saltstack/salt.git
synced 2025-04-16 17:50:20 +00:00
Fix up pre-commit (linter)
This commit is contained in:
parent
e36d67e83b
commit
836b5421f7
2 changed files with 30 additions and 21 deletions
|
@ -699,7 +699,8 @@ allowed-3rd-party-modules=msgpack,
|
|||
packaging,
|
||||
looseversion,
|
||||
pytestskipmarkers,
|
||||
cryptography
|
||||
cryptography,
|
||||
aiohttp
|
||||
|
||||
[EXCEPTIONS]
|
||||
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
import asyncio
|
||||
import logging
|
||||
import multiprocessing
|
||||
import socket
|
||||
import time
|
||||
import warnings
|
||||
|
||||
import aiohttp
|
||||
import aiohttp.web
|
||||
|
@ -11,14 +14,13 @@ import salt.payload
|
|||
import salt.transport.base
|
||||
from salt.transport.tcp import (
|
||||
USE_LOAD_BALANCER,
|
||||
LoadBalancer,
|
||||
LoadBalancerServer,
|
||||
TCPPuller,
|
||||
_get_bind_addr,
|
||||
_get_socket,
|
||||
_set_tcp_keepalive,
|
||||
_TCPPubServerPublisher,
|
||||
)
|
||||
from salt.utils.network import ip_bracket
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
@ -90,17 +92,19 @@ class PublishClient(salt.transport.base.PublishClient):
|
|||
|
||||
# pylint: disable=W1701
|
||||
def __del__(self):
|
||||
self.close()
|
||||
if not self._closing:
|
||||
warnings.warn(
|
||||
"unclosed publish client {self!r}", ResourceWarning, source=self
|
||||
)
|
||||
|
||||
# pylint: enable=W1701
|
||||
|
||||
async def getstream(self, **kwargs):
|
||||
if self.source_ip or self.source_port:
|
||||
kwargs = {
|
||||
"source_ip": self.source_ip,
|
||||
"source_port": self.source_port,
|
||||
}
|
||||
kwargs.update(source_ip=self.source_ip, source_port=self.source_port)
|
||||
ws = None
|
||||
start = time.monotonic()
|
||||
timeout = kwargs.get("timeout", None)
|
||||
while ws is None and (not self._closed and not self._closing):
|
||||
try:
|
||||
if self.host and self.port:
|
||||
|
@ -111,7 +115,7 @@ class PublishClient(salt.transport.base.PublishClient):
|
|||
conn = aiohttp.UnixConnector(path=self.path)
|
||||
session = aiohttp.ClientSession(connector=conn)
|
||||
url = f"http://ipc.saltproject.io/ws"
|
||||
ws = await session.ws_connect(url)
|
||||
ws = await asyncio.wait_for(session.ws_connect(url), 1)
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
log.warning(
|
||||
"WS Message Client encountered an exception while connecting to"
|
||||
|
@ -122,12 +126,14 @@ class PublishClient(salt.transport.base.PublishClient):
|
|||
exc,
|
||||
self.backoff,
|
||||
)
|
||||
if timeout and time.monotonic() - start > timeout:
|
||||
break
|
||||
await asyncio.sleep(self.backoff)
|
||||
return ws, session
|
||||
|
||||
async def _connect(self):
|
||||
async def _connect(self, timeout=None):
|
||||
if self._ws is None:
|
||||
self._ws, self._session = await self.getstream()
|
||||
self._ws, self._session = await self.getstream(timeout=timeout)
|
||||
# if not self._stream_return_running:
|
||||
# self.io_loop.spawn_callback(self._stream_return)
|
||||
if self.connect_callback:
|
||||
|
@ -139,7 +145,7 @@ class PublishClient(salt.transport.base.PublishClient):
|
|||
port=None,
|
||||
connect_callback=None,
|
||||
disconnect_callback=None,
|
||||
background=False,
|
||||
timeout=None,
|
||||
):
|
||||
if port is not None:
|
||||
self.port = port
|
||||
|
@ -147,10 +153,7 @@ class PublishClient(salt.transport.base.PublishClient):
|
|||
self.connect_callback = None
|
||||
if disconnect_callback:
|
||||
self.disconnect_callback = None
|
||||
if background:
|
||||
self.io_loop.spawn_callback(self._connect)
|
||||
else:
|
||||
await self._connect()
|
||||
await self._connect(timeout=timeout)
|
||||
|
||||
def _decode_messages(self, messages):
|
||||
if not isinstance(messages, dict):
|
||||
|
@ -192,7 +195,9 @@ class PublishClient(salt.transport.base.PublishClient):
|
|||
framed_msg = salt.transport.frame.decode_embedded_strs(msg)
|
||||
return framed_msg["body"]
|
||||
elif raw_msg.type == aiohttp.WSMsgType.ERROR:
|
||||
log.error("ws connection closed with exception %s", ws.exception())
|
||||
log.error(
|
||||
"ws connection closed with exception %s", self._ws.exception()
|
||||
)
|
||||
elif timeout:
|
||||
return await asyncio.wait_for(self.recv(), timeout=timeout)
|
||||
else:
|
||||
|
@ -216,7 +221,8 @@ class PublishClient(salt.transport.base.PublishClient):
|
|||
return framed_msg["body"]
|
||||
elif raw_msg.type == aiohttp.WSMsgType.ERROR:
|
||||
log.error(
|
||||
"ws connection closed with exception %s", ws.exception()
|
||||
"ws connection closed with exception %s",
|
||||
self._ws.exception(),
|
||||
)
|
||||
finally:
|
||||
self._read_in_progress.release()
|
||||
|
@ -227,7 +233,8 @@ class PublishClient(salt.transport.base.PublishClient):
|
|||
while True:
|
||||
try:
|
||||
msg = await self.recv()
|
||||
except Exception:
|
||||
except Exception: # pylint: disable=broad-except
|
||||
# XXX
|
||||
log.error("Other exception", exc_info=True)
|
||||
else:
|
||||
log.error("on recv got msg %r", msg)
|
||||
|
@ -411,7 +418,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
|
|||
|
||||
|
||||
class RequestServer(salt.transport.base.DaemonizedRequestServer):
|
||||
def __init__(self, opts):
|
||||
def __init__(self, opts): # pylint: disable=W0231
|
||||
self.opts = opts
|
||||
self.site = None
|
||||
|
||||
|
@ -476,7 +483,8 @@ class RequestServer(salt.transport.base.DaemonizedRequestServer):
|
|||
await ws.send_bytes(salt.payload.dumps(reply))
|
||||
elif msg.type == aiohttp.WSMsgType.ERROR:
|
||||
log.error("ws connection closed with exception %s", ws.exception())
|
||||
except Exception:
|
||||
except Exception: # pylint: disable=broad-except
|
||||
# XXX
|
||||
log.error("Message handler", exc_info=True)
|
||||
|
||||
def close(self):
|
||||
|
|
Loading…
Add table
Reference in a new issue