add tcp transport support for ipv6-only hosts

This commit is contained in:
Sergey Kacheev 2022-04-30 02:08:34 +07:00 committed by Megan Wilhite
parent 3dbbdc8393
commit cf6e8c7a0f
12 changed files with 69 additions and 57 deletions

View file

@ -29,7 +29,7 @@ import salt.utils.files
import salt.utils.minions
import salt.utils.user
import salt.utils.versions
import salt.utils.zeromq
import salt.utils.network
log = logging.getLogger(__name__)
@ -508,7 +508,7 @@ class Resolver:
def _send_token_request(self, load):
master_uri = "tcp://{}:{}".format(
salt.utils.zeromq.ip_bracket(self.opts["interface"]),
salt.utils.network.ip_bracket(self.opts["interface"]),
str(self.opts["ret_port"]),
)
with salt.channel.client.ReqChannel.factory(

View file

@ -45,7 +45,7 @@ warnings.filterwarnings(
try:
import salt.utils.parsers
from salt.utils.verify import check_user, verify_env, verify_socket
from salt.utils.zeromq import ip_bracket
from salt.utils.network import ip_bracket
except ImportError as exc:
if exc.args[0] != "No module named _msgpack":
raise

View file

@ -42,7 +42,7 @@ import salt.utils.platform
import salt.utils.stringutils
import salt.utils.user
import salt.utils.verify
import salt.utils.zeromq
import salt.utils.network
from salt.exceptions import (
AuthenticationError,
AuthorizationError,
@ -1886,7 +1886,7 @@ class LocalClient:
)
master_uri = "tcp://{}:{}".format(
salt.utils.zeromq.ip_bracket(self.opts["interface"]),
salt.utils.network.ip_bracket(self.opts["interface"]),
str(self.opts["ret_port"]),
)
@ -1989,7 +1989,7 @@ class LocalClient:
master_uri = (
"tcp://"
+ salt.utils.zeromq.ip_bracket(self.opts["interface"])
+ salt.utils.network.ip_bracket(self.opts["interface"])
+ ":"
+ str(self.opts["ret_port"])
)

View file

@ -27,7 +27,6 @@ import salt.utils.user
import salt.utils.validate.path
import salt.utils.xdg
import salt.utils.yaml
import salt.utils.zeromq
from salt._logging import (
DFLT_LOG_DATEFMT,
DFLT_LOG_DATEFMT_LOGFILE,
@ -4094,11 +4093,13 @@ def client_config(path, env_var="SALT_CLIENT_CONFIG", defaults=None):
# On some platforms, like OpenBSD, 0.0.0.0 won't catch a master running on localhost
if opts["interface"] == "0.0.0.0":
opts["interface"] = "127.0.0.1"
elif opts["interface"] == "::":
opts["interface"] = "::1"
# Make sure the master_uri is set
if "master_uri" not in opts:
opts["master_uri"] = "tcp://{ip}:{port}".format(
ip=salt.utils.zeromq.ip_bracket(opts["interface"]), port=opts["ret_port"]
ip=salt.utils.network.ip_bracket(opts["interface"]), port=opts["ret_port"]
)
# Return the client options

View file

@ -14,7 +14,7 @@ import salt.channel.client
import salt.crypt
import salt.payload
import salt.utils.event
import salt.utils.zeromq
import salt.utils.network
__proxyenabled__ = ["*"]
log = logging.getLogger(__name__)
@ -49,7 +49,7 @@ def fire_master(data, tag, preload=None):
# slower because it has to independently authenticate)
if "master_uri" not in __opts__:
__opts__["master_uri"] = "tcp://{ip}:{port}".format(
ip=salt.utils.zeromq.ip_bracket(__opts__["interface"]),
ip=salt.utils.network.ip_bracket(__opts__["interface"]),
port=__opts__.get("ret_port", "4506"), # TODO, no fallback
)
masters = list()

View file

@ -33,6 +33,7 @@ import salt.utils.files
import salt.utils.msgpack
import salt.utils.platform
import salt.utils.versions
from salt.utils.network import ip_bracket
from salt.exceptions import SaltClientError, SaltReqTimeoutError
if salt.utils.platform.is_windows():
@ -41,7 +42,6 @@ else:
USE_LOAD_BALANCER = False
if USE_LOAD_BALANCER:
import threading
import multiprocessing
import salt.ext.tornado.util
from salt.utils.process import SignalHandlingProcess
@ -53,6 +53,20 @@ class ClosingError(Exception):
""" """
def _get_socket(opts):
family = socket.AF_INET
if opts.get("ipv6", False):
family = socket.AF_INET6
return socket.socket(family, socket.SOCK_STREAM)
def _get_bind_addr(opts, port_type):
return (
ip_bracket(opts["interface"], strip=True),
int(opts[port_type]),
)
def _set_tcp_keepalive(sock, opts):
"""
Ensure that TCP keepalives are set for the socket.
@ -152,11 +166,11 @@ if USE_LOAD_BALANCER:
"""
Start the load balancer
"""
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket = _get_socket(self.opts)
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
_set_tcp_keepalive(self._socket, self.opts)
self._socket.setblocking(1)
self._socket.bind((self.opts["interface"], int(self.opts["ret_port"])))
self._socket.bind(_get_bind_addr(self.opts, "ret_port"))
self._socket.listen(self.backlog)
while True:
@ -337,11 +351,11 @@ class TCPReqServer(salt.transport.base.DaemonizedRequestServer):
name="LoadBalancerServer",
)
elif not salt.utils.platform.is_windows():
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket = _get_socket(self.opts)
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
_set_tcp_keepalive(self._socket, self.opts)
self._socket.setblocking(0)
self._socket.bind((self.opts["interface"], int(self.opts["ret_port"])))
self._socket.bind(_get_bind_addr(self.opts, "ret_port"))
def post_fork(self, message_handler, io_loop):
"""
@ -361,13 +375,11 @@ class TCPReqServer(salt.transport.base.DaemonizedRequestServer):
)
else:
if salt.utils.platform.is_windows():
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket = _get_socket(self.opts)
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
_set_tcp_keepalive(self._socket, self.opts)
self._socket.setblocking(0)
self._socket.bind(
(self.opts["interface"], int(self.opts["ret_port"]))
)
self._socket.bind(_get_bind_addr(self.opts, "ret_port"))
self.req_server = SaltMessageServer(
self.handle_message,
ssl_options=self.opts.get("ssl"),
@ -519,7 +531,7 @@ class TCPClientKeepAlive(salt.ext.tornado.tcpclient.TCPClient):
"""
# Always connect in plaintext; we'll convert to ssl if necessary
# after one connection has completed.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock = _get_socket(self.opts)
_set_tcp_keepalive(sock, self.opts)
stream = salt.ext.tornado.iostream.IOStream(
sock, max_buffer_size=max_buffer_size
@ -613,7 +625,10 @@ class MessageClient:
while stream is None and (not self._closed and not self._closing):
try:
stream = yield self._tcp_client.connect(
self.host, self.port, ssl_options=self.opts.get("ssl"), **kwargs
ip_bracket(self.host, strip=True),
self.port,
ssl_options=self.opts.get("ssl"),
**kwargs
)
except Exception as exc: # pylint: disable=broad-except
log.warning(
@ -970,11 +985,11 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
presence_callback=presence_callback,
remove_presence_callback=remove_presence_callback,
)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock = _get_socket(self.opts)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
_set_tcp_keepalive(sock, self.opts)
sock.setblocking(0)
sock.bind((self.opts["interface"], int(self.opts["publish_port"])))
sock.bind(_get_bind_addr(self.opts, "publish_port"))
sock.listen(self.backlog)
# pub_server will take ownership of the socket
pub_server.add_socket(sock)

View file

@ -45,7 +45,7 @@ def _get_master_uri(master_ip, master_port, source_ip=None, source_port=None):
rc = zmq_connect(socket, "tcp://192.168.1.17:5555;192.168.1.1:5555"); assert (rc == 0);
Source: http://api.zeromq.org/4-1:zmq-tcp
"""
from salt.utils.zeromq import ip_bracket
from salt.utils.network import ip_bracket
master_uri = "tcp://{master_ip}:{master_port}".format(
master_ip=ip_bracket(master_ip), master_port=master_port

View file

@ -23,7 +23,6 @@ import salt.utils.files
import salt.utils.path
import salt.utils.platform
import salt.utils.stringutils
import salt.utils.zeromq
from salt._compat import ipaddress
from salt.exceptions import SaltClientError, SaltSystemExit
from salt.utils.decorators.jinja import jinja_filter
@ -2216,7 +2215,7 @@ def dns_check(addr, port, safe=False, ipv6=None):
raise SaltClientError()
raise SaltSystemExit(code=42, msg=err)
return salt.utils.zeromq.ip_bracket(ip_addrs[0])
return ip_bracket(ip_addrs[0])
def _test_addrs(addrinfo, port):
@ -2332,3 +2331,15 @@ def filter_by_networks(values, networks):
raise ValueError("Do not know how to filter a {}".format(type(values)))
else:
return values
def ip_bracket(addr, strip=False):
"""
Ensure IP addresses are URI-compatible - specifically, add brackets
around IPv6 literals if they are not already present.
"""
addr = str(addr)
addr = addr.lstrip("[")
addr = addr.rstrip("]")
addr = ipaddress.ip_address(addr)
return ("[{}]" if addr.version == 6 and not strip else "{}").format(addr)

View file

@ -4,7 +4,6 @@ ZMQ-specific functions
import logging
from salt._compat import ipaddress
from salt.exceptions import SaltSystemExit
log = logging.getLogger(__name__)
@ -41,15 +40,3 @@ def check_ipc_path_max_len(uri):
"path or switch to TCP; in the configuration file, "
'set "ipc_mode: tcp".'.format(uri, ipc_path_max_len)
)
def ip_bracket(addr):
"""
Ensure IP addresses are URI-compatible - specifically, add brackets
around IPv6 literals if they are not already present.
"""
addr = str(addr)
addr = addr.lstrip("[")
addr = addr.rstrip("]")
addr = ipaddress.ip_address(addr)
return ("[{}]" if addr.version == 6 else "{}").format(addr)

View file

@ -9,7 +9,7 @@ import salt.client.mixins
import salt.config
import salt.loader
import salt.utils.error
import salt.utils.zeromq
import salt.utils.network
class WheelClient(
@ -66,8 +66,10 @@ class WheelClient(
interface = self.opts["interface"]
if interface == "0.0.0.0":
interface = "127.0.0.1"
if interface == "::":
interface = "::1"
master_uri = "tcp://{}:{}".format(
salt.utils.zeromq.ip_bracket(interface),
salt.utils.network.ip_bracket(interface),
str(self.opts["ret_port"]),
)
with salt.channel.client.ReqChannel.factory(

View file

@ -1269,3 +1269,15 @@ class NetworkTestCase(TestCase):
),
):
self.assertEqual(network.get_fqhostname(), host)
def test_ip_bracket(self):
test_ipv4 = "127.0.0.1"
test_ipv6 = "::1"
test_ipv6_uri = "[::1]"
self.assertEqual(test_ipv4, network.ip_bracket(test_ipv4))
self.assertEqual(test_ipv6, network.ip_bracket(test_ipv6_uri, strip=True))
self.assertEqual("[{}]".format(test_ipv6), network.ip_bracket(test_ipv6))
self.assertEqual("[{}]".format(test_ipv6), network.ip_bracket(test_ipv6_uri))
ip_addr_obj = ipaddress.ip_address(test_ipv4)
self.assertEqual(test_ipv4, network.ip_bracket(ip_addr_obj))

View file

@ -5,28 +5,12 @@ Test salt.utils.zeromq
import salt.utils.zeromq
import zmq
from salt._compat import ipaddress
from salt.exceptions import SaltSystemExit
from tests.support.mock import patch
from tests.support.unit import TestCase, skipIf
class UtilsTestCase(TestCase):
def test_ip_bracket(self):
test_ipv4 = "127.0.0.1"
test_ipv6 = "::1"
test_ipv6_uri = "[::1]"
self.assertEqual(test_ipv4, salt.utils.zeromq.ip_bracket(test_ipv4))
self.assertEqual(
"[{}]".format(test_ipv6), salt.utils.zeromq.ip_bracket(test_ipv6)
)
self.assertEqual(
"[{}]".format(test_ipv6), salt.utils.zeromq.ip_bracket(test_ipv6_uri)
)
ip_addr_obj = ipaddress.ip_address(test_ipv4)
self.assertEqual(test_ipv4, salt.utils.zeromq.ip_bracket(ip_addr_obj))
@skipIf(
not hasattr(zmq, "IPC_PATH_MAX_LEN"), "ZMQ does not have max length support."
)