mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Remove code duplicate by reusing utilities functions
This commit is contained in:
parent
57da54b676
commit
4e650c0b44
1 changed files with 45 additions and 18 deletions
|
@ -59,6 +59,42 @@ except ImportError:
|
|||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _get_master_uri(master_ip,
|
||||
master_port,
|
||||
source_ip=None,
|
||||
source_port=None):
|
||||
'''
|
||||
Return the ZeroMQ URI to connect the Minion to the Master.
|
||||
It supports different source IP / port, given the ZeroMQ syntax:
|
||||
|
||||
// Connecting using a IP address and bind to an IP address
|
||||
rc = zmq_connect(socket, "tcp://192.168.1.17:5555;192.168.1.1:5555"); assert (rc == 0);
|
||||
|
||||
Source: http://api.zeromq.org/4-1:zmq-tcp
|
||||
'''
|
||||
if LIBZMQ_VERSION_INFO >= (4, 1, 6) and ZMQ_VERSION_INFO >= (16, 0, 1):
|
||||
# The source:port syntax for ZeroMQ has been added in libzmq 4.1.6
|
||||
# which is included in the pyzmq wheels starting with 16.0.1.
|
||||
if source_ip or source_port:
|
||||
if source_ip and source_port:
|
||||
return 'tcp://{source_ip}:{source_port};{master_ip}:{master_port}'.format(
|
||||
source_ip=source_ip, source_port=source_port,
|
||||
master_ip=master_ip, master_port=master_port)
|
||||
elif source_ip and not source_port:
|
||||
return 'tcp://{source_ip}:0;{master_ip}:{master_port}'.format(
|
||||
source_ip=source_ip,
|
||||
master_ip=master_ip, master_port=master_port)
|
||||
elif not source_ip and source_port:
|
||||
return 'tcp://0.0.0.0:{source_port};{master_ip}:{master_port}'.format(
|
||||
source_port=source_port,
|
||||
master_ip=master_ip, master_port=master_port)
|
||||
if source_ip or source_port:
|
||||
log.warning('Unable to connect to the Master using a specific source IP / port')
|
||||
log.warning('Consider upgrading to pyzmq >= 16.0.1 and libzmq >= 4.1.6')
|
||||
return 'tcp://{master_ip}:{master_port}'.format(
|
||||
master_ip=master_ip, master_port=master_port)
|
||||
|
||||
|
||||
class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
|
||||
'''
|
||||
Encapsulate sending routines to ZeroMQ.
|
||||
|
@ -77,9 +113,8 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
|
|||
# do we have any mapping for this io_loop
|
||||
io_loop = kwargs.get('io_loop')
|
||||
if io_loop is None:
|
||||
if not TORNADO_50:
|
||||
zmq.eventloop.ioloop.install()
|
||||
io_loop = tornado.ioloop.IOLoop.current()
|
||||
install_zmq()
|
||||
io_loop = ZMQDefaultLoop.current()
|
||||
if io_loop not in cls.instance_map:
|
||||
cls.instance_map[io_loop] = weakref.WeakValueDictionary()
|
||||
loop_instance_map = cls.instance_map[io_loop]
|
||||
|
@ -146,9 +181,8 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
|
|||
|
||||
self._io_loop = kwargs.get('io_loop')
|
||||
if self._io_loop is None:
|
||||
if not TORNADO_50:
|
||||
zmq.eventloop.ioloop.install()
|
||||
self._io_loop = tornado.ioloop.IOLoop.current()
|
||||
install_zmq()
|
||||
self._io_loop = ZMQDefaultLoop.current()
|
||||
|
||||
if self.crypt != 'clear':
|
||||
# we don't need to worry about auth as a kwarg, since its a singleton
|
||||
|
@ -288,19 +322,14 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t
|
|||
**kwargs):
|
||||
self.opts = opts
|
||||
self.ttype = 'zeromq'
|
||||
|
||||
self.io_loop = kwargs.get('io_loop')
|
||||
if self.io_loop is None:
|
||||
if not TORNADO_50:
|
||||
zmq.eventloop.ioloop.install()
|
||||
self.io_loop = tornado.ioloop.IOLoop.current()
|
||||
|
||||
self.hexid = hashlib.sha1(six.b(self.opts['id'])).hexdigest()
|
||||
install_zmq()
|
||||
self.io_loop = ZMQDefaultLoop.current()
|
||||
|
||||
self.hexid = hashlib.sha1(salt.utils.stringutils.to_bytes(self.opts['id'])).hexdigest()
|
||||
self.auth = salt.crypt.AsyncAuth(self.opts, io_loop=self.io_loop)
|
||||
|
||||
self.serial = salt.payload.Serial(self.opts)
|
||||
|
||||
self.context = zmq.Context()
|
||||
self._socket = self.context.socket(zmq.SUB)
|
||||
|
||||
|
@ -899,14 +928,12 @@ class AsyncReqMessageClient(object):
|
|||
self.addr = addr
|
||||
self.linger = linger
|
||||
if io_loop is None:
|
||||
if not TORNADO_50:
|
||||
zmq.eventloop.ioloop.install()
|
||||
tornado.ioloop.IOLoop.current()
|
||||
install_zmq()
|
||||
ZMQDefaultLoop.current()
|
||||
else:
|
||||
self.io_loop = io_loop
|
||||
|
||||
self.serial = salt.payload.Serial(self.opts)
|
||||
|
||||
self.context = zmq.Context()
|
||||
|
||||
# wire up sockets
|
||||
|
|
Loading…
Add table
Reference in a new issue