Remove code duplicate by reusing utilities functions

This commit is contained in:
Bo Maryniuk 2018-02-14 10:13:29 +01:00
parent 57da54b676
commit 4e650c0b44

View file

@ -59,6 +59,42 @@ except ImportError:
log = logging.getLogger(__name__)
def _get_master_uri(master_ip,
master_port,
source_ip=None,
source_port=None):
'''
Return the ZeroMQ URI to connect the Minion to the Master.
It supports different source IP / port, given the ZeroMQ syntax:
// Connecting using a IP address and bind to an IP address
rc = zmq_connect(socket, "tcp://192.168.1.17:5555;192.168.1.1:5555"); assert (rc == 0);
Source: http://api.zeromq.org/4-1:zmq-tcp
'''
if LIBZMQ_VERSION_INFO >= (4, 1, 6) and ZMQ_VERSION_INFO >= (16, 0, 1):
# The source:port syntax for ZeroMQ has been added in libzmq 4.1.6
# which is included in the pyzmq wheels starting with 16.0.1.
if source_ip or source_port:
if source_ip and source_port:
return 'tcp://{source_ip}:{source_port};{master_ip}:{master_port}'.format(
source_ip=source_ip, source_port=source_port,
master_ip=master_ip, master_port=master_port)
elif source_ip and not source_port:
return 'tcp://{source_ip}:0;{master_ip}:{master_port}'.format(
source_ip=source_ip,
master_ip=master_ip, master_port=master_port)
elif not source_ip and source_port:
return 'tcp://0.0.0.0:{source_port};{master_ip}:{master_port}'.format(
source_port=source_port,
master_ip=master_ip, master_port=master_port)
if source_ip or source_port:
log.warning('Unable to connect to the Master using a specific source IP / port')
log.warning('Consider upgrading to pyzmq >= 16.0.1 and libzmq >= 4.1.6')
return 'tcp://{master_ip}:{master_port}'.format(
master_ip=master_ip, master_port=master_port)
class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
'''
Encapsulate sending routines to ZeroMQ.
@ -77,9 +113,8 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
# do we have any mapping for this io_loop
io_loop = kwargs.get('io_loop')
if io_loop is None:
if not TORNADO_50:
zmq.eventloop.ioloop.install()
io_loop = tornado.ioloop.IOLoop.current()
install_zmq()
io_loop = ZMQDefaultLoop.current()
if io_loop not in cls.instance_map:
cls.instance_map[io_loop] = weakref.WeakValueDictionary()
loop_instance_map = cls.instance_map[io_loop]
@ -146,9 +181,8 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
self._io_loop = kwargs.get('io_loop')
if self._io_loop is None:
if not TORNADO_50:
zmq.eventloop.ioloop.install()
self._io_loop = tornado.ioloop.IOLoop.current()
install_zmq()
self._io_loop = ZMQDefaultLoop.current()
if self.crypt != 'clear':
# we don't need to worry about auth as a kwarg, since its a singleton
@ -288,19 +322,14 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t
**kwargs):
self.opts = opts
self.ttype = 'zeromq'
self.io_loop = kwargs.get('io_loop')
if self.io_loop is None:
if not TORNADO_50:
zmq.eventloop.ioloop.install()
self.io_loop = tornado.ioloop.IOLoop.current()
self.hexid = hashlib.sha1(six.b(self.opts['id'])).hexdigest()
install_zmq()
self.io_loop = ZMQDefaultLoop.current()
self.hexid = hashlib.sha1(salt.utils.stringutils.to_bytes(self.opts['id'])).hexdigest()
self.auth = salt.crypt.AsyncAuth(self.opts, io_loop=self.io_loop)
self.serial = salt.payload.Serial(self.opts)
self.context = zmq.Context()
self._socket = self.context.socket(zmq.SUB)
@ -899,14 +928,12 @@ class AsyncReqMessageClient(object):
self.addr = addr
self.linger = linger
if io_loop is None:
if not TORNADO_50:
zmq.eventloop.ioloop.install()
tornado.ioloop.IOLoop.current()
install_zmq()
ZMQDefaultLoop.current()
else:
self.io_loop = io_loop
self.serial = salt.payload.Serial(self.opts)
self.context = zmq.Context()
# wire up sockets