mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Merge pull request #47399 from isbm/isbm-zeromq17-deprecationwarning-2017.7.2-v2
zeromq 17 deprecation warning backport from 2018.3 + tornado 5 fixes
This commit is contained in:
commit
2f5fc4ecc5
12 changed files with 169 additions and 186 deletions
|
@ -47,11 +47,6 @@ from salt.exceptions import (
|
|||
# Import third party libs
|
||||
import salt.ext.six as six
|
||||
# pylint: disable=import-error
|
||||
try:
|
||||
import zmq
|
||||
HAS_ZMQ = True
|
||||
except ImportError:
|
||||
HAS_ZMQ = False
|
||||
|
||||
# Try to import range from https://github.com/ytoolshed/range
|
||||
HAS_RANGE = False
|
||||
|
|
|
@ -56,11 +56,7 @@ try:
|
|||
HAS_WINSHELL = True
|
||||
except ImportError:
|
||||
HAS_WINSHELL = False
|
||||
try:
|
||||
import zmq
|
||||
HAS_ZMQ = True
|
||||
except ImportError:
|
||||
HAS_ZMQ = False
|
||||
from salt.utils.zeromq import zmq
|
||||
|
||||
# The directory where salt thin is deployed
|
||||
DEFAULT_THIN_DIR = '/var/tmp/.%%USER%%_%%FQDNUUID%%_salt'
|
||||
|
@ -207,7 +203,7 @@ class SSH(object):
|
|||
'''
|
||||
def __init__(self, opts):
|
||||
pull_sock = os.path.join(opts['sock_dir'], 'master_event_pull.ipc')
|
||||
if os.path.isfile(pull_sock) and HAS_ZMQ:
|
||||
if os.path.exists(pull_sock) and zmq:
|
||||
self.event = salt.utils.event.get_event(
|
||||
'master',
|
||||
opts['sock_dir'],
|
||||
|
|
|
@ -15,15 +15,12 @@ import errno
|
|||
# Import ioflo libs
|
||||
import ioflo.base.deeding
|
||||
# Import third party libs
|
||||
try:
|
||||
import zmq
|
||||
import salt.master
|
||||
import salt.crypt
|
||||
import salt.daemons.masterapi
|
||||
import salt.payload
|
||||
HAS_ZMQ = True
|
||||
except ImportError:
|
||||
HAS_ZMQ = False
|
||||
from salt.utils.zeromq import zmq
|
||||
import salt.master
|
||||
import salt.crypt
|
||||
import salt.daemons.masterapi
|
||||
import salt.payload
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
@ -159,7 +156,7 @@ class SaltZmqPublisher(ioflo.base.deeding.Deed):
|
|||
'''
|
||||
Set up tracking value(s)
|
||||
'''
|
||||
if not HAS_ZMQ:
|
||||
if not zmq:
|
||||
return
|
||||
self.created = False
|
||||
self.serial = salt.payload.Serial(self.opts.value)
|
||||
|
|
|
@ -173,11 +173,7 @@ from __future__ import absolute_import
|
|||
import logging
|
||||
|
||||
# Import third party libraries
|
||||
try:
|
||||
import zmq
|
||||
HAS_ZMQ = True
|
||||
except ImportError:
|
||||
HAS_ZMQ = False
|
||||
from salt.utils.zeromq import zmq
|
||||
|
||||
try:
|
||||
# pylint: disable=W0611
|
||||
|
@ -209,7 +205,7 @@ def __virtual__():
|
|||
'''
|
||||
Load only if napalm-logs is installed.
|
||||
'''
|
||||
if not HAS_NAPALM_LOGS or not HAS_ZMQ:
|
||||
if not HAS_NAPALM_LOGS or not zmq:
|
||||
return (False, 'napalm_syslog could not be loaded. \
|
||||
Please install napalm-logs library amd ZeroMQ.')
|
||||
return True
|
||||
|
|
|
@ -28,23 +28,9 @@ except ImportError:
|
|||
# pylint: disable=import-error,no-name-in-module,redefined-builtin
|
||||
import salt.ext.six as six
|
||||
from salt.ext.six.moves import range
|
||||
from salt.utils.zeromq import zmq, ZMQDefaultLoop, install_zmq, ZMQ_VERSION_INFO
|
||||
# pylint: enable=import-error,no-name-in-module,redefined-builtin
|
||||
|
||||
try:
|
||||
import zmq
|
||||
import zmq.eventloop.ioloop
|
||||
# support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
|
||||
if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
|
||||
zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
|
||||
HAS_ZMQ = True
|
||||
except ImportError:
|
||||
HAS_ZMQ = False
|
||||
|
||||
import tornado
|
||||
TORNADO_50 = tornado.version_info >= (5,)
|
||||
|
||||
from salt.utils.async import LOOP_CLASS
|
||||
|
||||
import tornado.gen # pylint: disable=F0401
|
||||
|
||||
# Import salt libs
|
||||
|
@ -378,23 +364,13 @@ class Master(SMaster):
|
|||
|
||||
:param dict: The salt options
|
||||
'''
|
||||
if HAS_ZMQ:
|
||||
# Warn if ZMQ < 3.2
|
||||
try:
|
||||
zmq_version_info = zmq.zmq_version_info()
|
||||
except AttributeError:
|
||||
# PyZMQ <= 2.1.9 does not have zmq_version_info, fall back to
|
||||
# using zmq.zmq_version() and build a version info tuple.
|
||||
zmq_version_info = tuple(
|
||||
[int(x) for x in zmq.zmq_version().split('.')]
|
||||
)
|
||||
if zmq_version_info < (3, 2):
|
||||
log.warning(
|
||||
'You have a version of ZMQ less than ZMQ 3.2! There are '
|
||||
'known connection keep-alive issues with ZMQ < 3.2 which '
|
||||
'may result in loss of contact with minions. Please '
|
||||
'upgrade your ZMQ!'
|
||||
)
|
||||
if zmq and ZMQ_VERSION_INFO < (3, 2):
|
||||
log.warning(
|
||||
'You have a version of ZMQ less than ZMQ 3.2! There are '
|
||||
'known connection keep-alive issues with ZMQ < 3.2 which '
|
||||
'may result in loss of contact with minions. Please '
|
||||
'upgrade your ZMQ!'
|
||||
)
|
||||
SMaster.__init__(self, opts)
|
||||
|
||||
def __set_max_open_files(self):
|
||||
|
@ -858,9 +834,8 @@ class MWorker(SignalHandlingMultiprocessingProcess):
|
|||
Bind to the local port
|
||||
'''
|
||||
# using ZMQIOLoop since we *might* need zmq in there
|
||||
if HAS_ZMQ and not TORNADO_50:
|
||||
zmq.eventloop.ioloop.install()
|
||||
self.io_loop = LOOP_CLASS()
|
||||
install_zmq()
|
||||
self.io_loop = ZMQDefaultLoop()
|
||||
self.io_loop.make_current()
|
||||
for req_channel in self.req_channels:
|
||||
req_channel.post_fork(self._handle_payload, io_loop=self.io_loop) # TODO: cleaner? Maybe lazily?
|
||||
|
|
|
@ -30,23 +30,10 @@ if six.PY3:
|
|||
else:
|
||||
import salt.ext.ipaddress as ipaddress
|
||||
from salt.ext.six.moves import range
|
||||
from salt.utils.zeromq import zmq, ZMQDefaultLoop, install_zmq, ZMQ_VERSION_INFO
|
||||
|
||||
# pylint: enable=no-name-in-module,redefined-builtin
|
||||
from salt.utils.async import LOOP_CLASS
|
||||
|
||||
# Import third party libs
|
||||
try:
|
||||
import zmq
|
||||
# TODO: cleanup
|
||||
import zmq.eventloop.ioloop
|
||||
# support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
|
||||
if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
|
||||
zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
|
||||
HAS_ZMQ = True
|
||||
except ImportError:
|
||||
HAS_ZMQ = False
|
||||
|
||||
import tornado
|
||||
TORNADO_50 = tornado.version_info >= (5,)
|
||||
|
||||
HAS_RANGE = False
|
||||
try:
|
||||
|
@ -620,7 +607,7 @@ class MinionBase(object):
|
|||
if self.opts['transport'] == 'detect':
|
||||
self.opts['detect_mode'] = True
|
||||
for trans in ('zeromq', 'tcp'):
|
||||
if trans == 'zeromq' and not HAS_ZMQ:
|
||||
if trans == 'zeromq' and not zmq:
|
||||
continue
|
||||
self.opts['transport'] = trans
|
||||
pub_channel = salt.transport.client.AsyncPubChannel.factory(self.opts, **factory_kwargs)
|
||||
|
@ -657,10 +644,8 @@ class SMinion(MinionBase):
|
|||
# Clean out the proc directory (default /var/cache/salt/minion/proc)
|
||||
if (self.opts.get('file_client', 'remote') == 'remote'
|
||||
or self.opts.get('use_master_when_local', False)):
|
||||
if self.opts['transport'] == 'zeromq' and HAS_ZMQ and not TORNADO_50:
|
||||
io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
|
||||
else:
|
||||
io_loop = LOOP_CLASS.current()
|
||||
install_zmq()
|
||||
io_loop = ZMQDefaultLoop.current()
|
||||
io_loop.run_sync(
|
||||
lambda: self.eval_master(self.opts, failed=True)
|
||||
)
|
||||
|
@ -806,9 +791,8 @@ class MinionManager(MinionBase):
|
|||
self.minions = []
|
||||
self.jid_queue = []
|
||||
|
||||
if HAS_ZMQ and not TORNADO_50:
|
||||
zmq.eventloop.ioloop.install()
|
||||
self.io_loop = LOOP_CLASS.current()
|
||||
install_zmq()
|
||||
self.io_loop = ZMQDefaultLoop.current()
|
||||
self.process_manager = ProcessManager(name='MultiMinionProcessManager')
|
||||
self.io_loop.spawn_callback(self.process_manager.run, async=True)
|
||||
|
||||
|
@ -955,23 +939,14 @@ class Minion(MinionBase):
|
|||
self.periodic_callbacks = {}
|
||||
|
||||
if io_loop is None:
|
||||
if HAS_ZMQ and not TORNADO_50:
|
||||
zmq.eventloop.ioloop.install()
|
||||
self.io_loop = LOOP_CLASS.current()
|
||||
install_zmq()
|
||||
self.io_loop = ZMQDefaultLoop.current()
|
||||
else:
|
||||
self.io_loop = io_loop
|
||||
|
||||
# Warn if ZMQ < 3.2
|
||||
if HAS_ZMQ:
|
||||
try:
|
||||
zmq_version_info = zmq.zmq_version_info()
|
||||
except AttributeError:
|
||||
# PyZMQ <= 2.1.9 does not have zmq_version_info, fall back to
|
||||
# using zmq.zmq_version() and build a version info tuple.
|
||||
zmq_version_info = tuple(
|
||||
[int(x) for x in zmq.zmq_version().split('.')] # pylint: disable=no-member
|
||||
)
|
||||
if zmq_version_info < (3, 2):
|
||||
if zmq:
|
||||
if ZMQ_VERSION_INFO < (3, 2):
|
||||
log.warning(
|
||||
'You have a version of ZMQ less than ZMQ 3.2! There are '
|
||||
'known connection keep-alive issues with ZMQ < 3.2 which '
|
||||
|
@ -2636,9 +2611,8 @@ class SyndicManager(MinionBase):
|
|||
self.jid_forward_cache = set()
|
||||
|
||||
if io_loop is None:
|
||||
if HAS_ZMQ and not TORNADO_50:
|
||||
zmq.eventloop.ioloop.install()
|
||||
self.io_loop = LOOP_CLASS.current()
|
||||
install_zmq()
|
||||
self.io_loop = ZMQDefaultLoop.current()
|
||||
else:
|
||||
self.io_loop = io_loop
|
||||
|
||||
|
|
|
@ -30,13 +30,11 @@ import salt.transport.server
|
|||
import salt.transport.mixins.auth
|
||||
from salt.exceptions import SaltReqTimeoutError
|
||||
|
||||
import zmq
|
||||
from salt.utils.zeromq import zmq, ZMQDefaultLoop, install_zmq, ZMQ_VERSION_INFO, LIBZMQ_VERSION_INFO
|
||||
import zmq.error
|
||||
import zmq.eventloop.ioloop
|
||||
# support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
|
||||
if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
|
||||
zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
|
||||
import zmq.eventloop.zmqstream
|
||||
|
||||
try:
|
||||
import zmq.utils.monitor
|
||||
HAS_ZMQ_MONITOR = True
|
||||
|
@ -61,6 +59,42 @@ except ImportError:
|
|||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _get_master_uri(master_ip,
|
||||
master_port,
|
||||
source_ip=None,
|
||||
source_port=None):
|
||||
'''
|
||||
Return the ZeroMQ URI to connect the Minion to the Master.
|
||||
It supports different source IP / port, given the ZeroMQ syntax:
|
||||
|
||||
// Connecting using a IP address and bind to an IP address
|
||||
rc = zmq_connect(socket, "tcp://192.168.1.17:5555;192.168.1.1:5555"); assert (rc == 0);
|
||||
|
||||
Source: http://api.zeromq.org/4-1:zmq-tcp
|
||||
'''
|
||||
if LIBZMQ_VERSION_INFO >= (4, 1, 6) and ZMQ_VERSION_INFO >= (16, 0, 1):
|
||||
# The source:port syntax for ZeroMQ has been added in libzmq 4.1.6
|
||||
# which is included in the pyzmq wheels starting with 16.0.1.
|
||||
if source_ip or source_port:
|
||||
if source_ip and source_port:
|
||||
return 'tcp://{source_ip}:{source_port};{master_ip}:{master_port}'.format(
|
||||
source_ip=source_ip, source_port=source_port,
|
||||
master_ip=master_ip, master_port=master_port)
|
||||
elif source_ip and not source_port:
|
||||
return 'tcp://{source_ip}:0;{master_ip}:{master_port}'.format(
|
||||
source_ip=source_ip,
|
||||
master_ip=master_ip, master_port=master_port)
|
||||
elif not source_ip and source_port:
|
||||
return 'tcp://0.0.0.0:{source_port};{master_ip}:{master_port}'.format(
|
||||
source_port=source_port,
|
||||
master_ip=master_ip, master_port=master_port)
|
||||
if source_ip or source_port:
|
||||
log.warning('Unable to connect to the Master using a specific source IP / port')
|
||||
log.warning('Consider upgrading to pyzmq >= 16.0.1 and libzmq >= 4.1.6')
|
||||
return 'tcp://{master_ip}:{master_port}'.format(
|
||||
master_ip=master_ip, master_port=master_port)
|
||||
|
||||
|
||||
class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
|
||||
'''
|
||||
Encapsulate sending routines to ZeroMQ.
|
||||
|
@ -79,9 +113,8 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
|
|||
# do we have any mapping for this io_loop
|
||||
io_loop = kwargs.get('io_loop')
|
||||
if io_loop is None:
|
||||
if not TORNADO_50:
|
||||
zmq.eventloop.ioloop.install()
|
||||
io_loop = tornado.ioloop.IOLoop.current()
|
||||
install_zmq()
|
||||
io_loop = ZMQDefaultLoop.current()
|
||||
if io_loop not in cls.instance_map:
|
||||
cls.instance_map[io_loop] = weakref.WeakValueDictionary()
|
||||
loop_instance_map = cls.instance_map[io_loop]
|
||||
|
@ -96,7 +129,8 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
|
|||
obj = object.__new__(cls)
|
||||
obj.__singleton_init__(opts, **kwargs)
|
||||
loop_instance_map[key] = obj
|
||||
log.trace('Inserted key into loop_instance_map id {0} for key {1} and process {2}'.format(id(loop_instance_map), key, os.getpid()))
|
||||
log.trace('Inserted key into loop_instance_map id %s for key %s and process %s',
|
||||
id(loop_instance_map), key, os.getpid())
|
||||
else:
|
||||
log.debug('Re-using AsyncZeroMQReqChannel for {0}'.format(key))
|
||||
return obj
|
||||
|
@ -148,9 +182,8 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
|
|||
|
||||
self._io_loop = kwargs.get('io_loop')
|
||||
if self._io_loop is None:
|
||||
if not TORNADO_50:
|
||||
zmq.eventloop.ioloop.install()
|
||||
self._io_loop = tornado.ioloop.IOLoop.current()
|
||||
install_zmq()
|
||||
self._io_loop = ZMQDefaultLoop.current()
|
||||
|
||||
if self.crypt != 'clear':
|
||||
# we don't need to worry about auth as a kwarg, since its a singleton
|
||||
|
@ -290,19 +323,14 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t
|
|||
**kwargs):
|
||||
self.opts = opts
|
||||
self.ttype = 'zeromq'
|
||||
|
||||
self.io_loop = kwargs.get('io_loop')
|
||||
if self.io_loop is None:
|
||||
if not TORNADO_50:
|
||||
zmq.eventloop.ioloop.install()
|
||||
self.io_loop = tornado.ioloop.IOLoop.current()
|
||||
|
||||
self.hexid = hashlib.sha1(six.b(self.opts['id'])).hexdigest()
|
||||
install_zmq()
|
||||
self.io_loop = ZMQDefaultLoop.current()
|
||||
|
||||
self.hexid = hashlib.sha1(salt.utils.to_bytes(self.opts['id'])).hexdigest()
|
||||
self.auth = salt.crypt.AsyncAuth(self.opts, io_loop=self.io_loop)
|
||||
|
||||
self.serial = salt.payload.Serial(self.opts)
|
||||
|
||||
self.context = zmq.Context()
|
||||
self._socket = self.context.socket(zmq.SUB)
|
||||
|
||||
|
@ -334,8 +362,7 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t
|
|||
|
||||
if self.opts['recon_randomize']:
|
||||
recon_delay = randint(self.opts['recon_default'],
|
||||
self.opts['recon_default'] + self.opts['recon_max']
|
||||
)
|
||||
self.opts['recon_default'] + self.opts['recon_max'])
|
||||
|
||||
log.debug("Generated random reconnect delay between '{0}ms' and '{1}ms' ({2})".format(
|
||||
self.opts['recon_default'],
|
||||
|
@ -449,7 +476,8 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t
|
|||
return self.stream.on_recv(wrap_callback)
|
||||
|
||||
|
||||
class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.transport.server.ReqServerChannel):
|
||||
class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin,
|
||||
salt.transport.server.ReqServerChannel):
|
||||
|
||||
def __init__(self, opts):
|
||||
salt.transport.server.ReqServerChannel.__init__(self, opts)
|
||||
|
@ -469,13 +497,7 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
|
|||
# IPv6 sockets work for both IPv6 and IPv4 addresses
|
||||
self.clients.setsockopt(zmq.IPV4ONLY, 0)
|
||||
self.clients.setsockopt(zmq.BACKLOG, self.opts.get('zmq_backlog', 1000))
|
||||
if HAS_ZMQ_MONITOR and self.opts['zmq_monitor']:
|
||||
# Socket monitor shall be used the only for debug purposes so using threading doesn't look too bad here
|
||||
import threading
|
||||
self._monitor = ZeroMQSocketMonitor(self.clients)
|
||||
t = threading.Thread(target=self._monitor.start_poll)
|
||||
t.start()
|
||||
|
||||
self._start_zmq_monitor()
|
||||
self.workers = self.context.socket(zmq.DEALER)
|
||||
|
||||
if self.opts.get('ipc_mode', '') == 'tcp':
|
||||
|
@ -489,7 +511,6 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
|
|||
|
||||
log.info('Setting up the master communication server')
|
||||
self.clients.bind(self.uri)
|
||||
|
||||
self.workers.bind(self.w_uri)
|
||||
|
||||
while True:
|
||||
|
@ -512,10 +533,11 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
|
|||
return
|
||||
log.info('MWorkerQueue under PID %s is closing', os.getpid())
|
||||
self._closing = True
|
||||
if hasattr(self, '_monitor') and self._monitor is not None:
|
||||
# pylint: disable=E0203
|
||||
if getattr(self, '_monitor', None) is not None:
|
||||
self._monitor.stop()
|
||||
self._monitor = None
|
||||
if hasattr(self, '_w_monitor') and self._w_monitor is not None:
|
||||
if getattr(self, '_w_monitor', None) is not None:
|
||||
self._w_monitor.stop()
|
||||
self._w_monitor = None
|
||||
if hasattr(self, 'clients') and self.clients.closed is False:
|
||||
|
@ -528,6 +550,7 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
|
|||
self._socket.close()
|
||||
if hasattr(self, 'context') and self.context.closed is False:
|
||||
self.context.term()
|
||||
# pylint: enable=E0203
|
||||
|
||||
def pre_fork(self, process_manager):
|
||||
'''
|
||||
|
@ -538,6 +561,21 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
|
|||
salt.transport.mixins.auth.AESReqServerMixin.pre_fork(self, process_manager)
|
||||
process_manager.add_process(self.zmq_device)
|
||||
|
||||
def _start_zmq_monitor(self):
|
||||
'''
|
||||
Starts ZMQ monitor for debugging purposes.
|
||||
:return:
|
||||
'''
|
||||
# Socket monitor shall be used the only for debug
|
||||
# purposes so using threading doesn't look too bad here
|
||||
|
||||
if HAS_ZMQ_MONITOR and self.opts['zmq_monitor']:
|
||||
log.debug('Starting ZMQ monitor')
|
||||
import threading
|
||||
self._w_monitor = ZeroMQSocketMonitor(self._socket)
|
||||
threading.Thread(target=self._w_monitor.start_poll).start()
|
||||
log.debug('ZMQ monitor has been started started')
|
||||
|
||||
def post_fork(self, payload_handler, io_loop):
|
||||
'''
|
||||
After forking we need to create all of the local sockets to listen to the
|
||||
|
@ -552,12 +590,7 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
|
|||
|
||||
self.context = zmq.Context(1)
|
||||
self._socket = self.context.socket(zmq.REP)
|
||||
if HAS_ZMQ_MONITOR and self.opts['zmq_monitor']:
|
||||
# Socket monitor shall be used the only for debug purposes so using threading doesn't look too bad here
|
||||
import threading
|
||||
self._w_monitor = ZeroMQSocketMonitor(self._socket)
|
||||
t = threading.Thread(target=self._w_monitor.start_poll)
|
||||
t.start()
|
||||
self._start_zmq_monitor()
|
||||
|
||||
if self.opts.get('ipc_mode', '') == 'tcp':
|
||||
self.w_uri = 'tcp://127.0.0.1:{0}'.format(
|
||||
|
@ -763,27 +796,35 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
|
|||
# Catch and handle EINTR from when this process is sent
|
||||
# SIGUSR1 gracefully so we don't choke and die horribly
|
||||
try:
|
||||
log.trace('Getting data from puller %s', pull_uri)
|
||||
package = pull_sock.recv()
|
||||
unpacked_package = salt.payload.unpackage(package)
|
||||
if six.PY3:
|
||||
unpacked_package = salt.transport.frame.decode_embedded_strs(unpacked_package)
|
||||
payload = unpacked_package['payload']
|
||||
log.trace('Accepted unpacked package from puller')
|
||||
if self.opts['zmq_filtering']:
|
||||
# if you have a specific topic list, use that
|
||||
if 'topic_lst' in unpacked_package:
|
||||
for topic in unpacked_package['topic_lst']:
|
||||
log.trace('Sending filtered data over publisher %s', pub_uri)
|
||||
# zmq filters are substring match, hash the topic
|
||||
# to avoid collisions
|
||||
htopic = hashlib.sha1(topic).hexdigest()
|
||||
pub_sock.send(htopic, flags=zmq.SNDMORE)
|
||||
pub_sock.send(payload)
|
||||
log.trace('Filtered data has been sent')
|
||||
# otherwise its a broadcast
|
||||
else:
|
||||
# TODO: constants file for "broadcast"
|
||||
log.trace('Sending broadcasted data over publisher %s', pub_uri)
|
||||
pub_sock.send('broadcast', flags=zmq.SNDMORE)
|
||||
pub_sock.send(payload)
|
||||
log.trace('Broadcasted data has been sent')
|
||||
else:
|
||||
log.trace('Sending ZMQ-unfiltered data over publisher %s', pub_uri)
|
||||
pub_sock.send(payload)
|
||||
log.trace('Unfiltered data has been sent')
|
||||
except zmq.ZMQError as exc:
|
||||
if exc.errno == errno.EINTR:
|
||||
continue
|
||||
|
@ -901,14 +942,12 @@ class AsyncReqMessageClient(object):
|
|||
self.addr = addr
|
||||
self.linger = linger
|
||||
if io_loop is None:
|
||||
if not TORNADO_50:
|
||||
zmq.eventloop.ioloop.install()
|
||||
tornado.ioloop.IOLoop.current()
|
||||
install_zmq()
|
||||
ZMQDefaultLoop.current()
|
||||
else:
|
||||
self.io_loop = io_loop
|
||||
|
||||
self.serial = salt.payload.Serial(self.opts)
|
||||
|
||||
self.context = zmq.Context()
|
||||
|
||||
# wire up sockets
|
||||
|
@ -987,7 +1026,8 @@ class AsyncReqMessageClient(object):
|
|||
|
||||
try:
|
||||
ret = yield future
|
||||
except: # pylint: disable=W0702
|
||||
except Exception as err: # pylint: disable=W0702
|
||||
log.debug('Re-init ZMQ socket: %s', err)
|
||||
self._init_socket() # re-init the zmq socket (no other way in zmq)
|
||||
del self.send_queue[0]
|
||||
continue
|
||||
|
|
|
@ -7,26 +7,9 @@ from __future__ import absolute_import
|
|||
|
||||
import tornado.ioloop
|
||||
import tornado.concurrent
|
||||
# attempt to use zmq-- if we have it otherwise fallback to tornado loop
|
||||
try:
|
||||
import zmq.eventloop.ioloop
|
||||
# support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
|
||||
if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
|
||||
zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
|
||||
HAS_ZMQ = True
|
||||
except ImportError:
|
||||
HAS_ZMQ = False
|
||||
|
||||
import tornado
|
||||
TORNADO_50 = tornado.version_info >= (5,)
|
||||
|
||||
if HAS_ZMQ and not TORNADO_50:
|
||||
LOOP_CLASS = zmq.eventloop.ioloop.ZMQIOLoop
|
||||
else:
|
||||
import tornado.ioloop
|
||||
LOOP_CLASS = tornado.ioloop.IOLoop
|
||||
|
||||
import contextlib
|
||||
from salt.utils import zeromq
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
|
@ -60,7 +43,7 @@ class SyncWrapper(object):
|
|||
if kwargs is None:
|
||||
kwargs = {}
|
||||
|
||||
self.io_loop = LOOP_CLASS()
|
||||
self.io_loop = zeromq.ZMQDefaultLoop()
|
||||
kwargs['io_loop'] = self.io_loop
|
||||
|
||||
with current_ioloop(self.io_loop):
|
||||
|
|
|
@ -18,11 +18,7 @@ import salt.utils.dictupdate
|
|||
|
||||
# Import third party libs
|
||||
from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
|
||||
try:
|
||||
import zmq
|
||||
HAS_ZMQ = True
|
||||
except ImportError:
|
||||
HAS_ZMQ = False
|
||||
from salt.utils.zeromq import zmq
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
|
|
@ -29,12 +29,8 @@ from salt.utils.cache import CacheCli as cache_cli
|
|||
from salt.utils.process import MultiprocessingProcess
|
||||
|
||||
# Import third party libs
|
||||
import salt.ext.six as six
|
||||
try:
|
||||
import zmq
|
||||
HAS_ZMQ = True
|
||||
except ImportError:
|
||||
HAS_ZMQ = False
|
||||
from salt.ext import six
|
||||
from salt.utils.zeromq import zmq
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
|
|
@ -3,21 +3,60 @@
|
|||
# Import Python libs
|
||||
from __future__ import absolute_import
|
||||
|
||||
# Import Salt libs
|
||||
import logging
|
||||
import tornado.ioloop
|
||||
from salt.exceptions import SaltSystemExit
|
||||
|
||||
# Import 3rd-party libs
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
try:
|
||||
import zmq
|
||||
HAS_ZMQ = True
|
||||
except ImportError:
|
||||
HAS_ZMQ = False
|
||||
zmq = None
|
||||
log.debug('ZMQ module is not found')
|
||||
|
||||
ZMQDefaultLoop = None
|
||||
ZMQ_VERSION_INFO = (-1, -1, -1)
|
||||
LIBZMQ_VERSION_INFO = (-1, -1, -1)
|
||||
|
||||
try:
|
||||
if zmq:
|
||||
ZMQ_VERSION_INFO = tuple([int(v_el) for v_el in zmq.__version__.split('.')])
|
||||
LIBZMQ_VERSION_INFO = tuple([int(v_el) for v_el in zmq.zmq_version().split('.')])
|
||||
if ZMQ_VERSION_INFO[0] > 16: # 17.0.x+ deprecates zmq's ioloops
|
||||
ZMQDefaultLoop = tornado.ioloop.IOLoop
|
||||
except Exception:
|
||||
log.exception('Error while getting LibZMQ/PyZMQ library version')
|
||||
|
||||
if ZMQDefaultLoop is None:
|
||||
try:
|
||||
import zmq.eventloop.ioloop
|
||||
# Support for ZeroMQ 13.x
|
||||
if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
|
||||
zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
|
||||
if tornado.version_info < (5,):
|
||||
ZMQDefaultLoop = zmq.eventloop.ioloop.ZMQIOLoop
|
||||
except ImportError:
|
||||
ZMQDefaultLoop = None
|
||||
if ZMQDefaultLoop is None:
|
||||
ZMQDefaultLoop = tornado.ioloop.IOLoop
|
||||
|
||||
|
||||
def install_zmq():
|
||||
'''
|
||||
While pyzmq 17 no longer needs any special integration for tornado,
|
||||
older version still need one.
|
||||
:return:
|
||||
'''
|
||||
if zmq and ZMQ_VERSION_INFO[0] < 17:
|
||||
if tornado.version_info < (5,):
|
||||
zmq.eventloop.ioloop.install()
|
||||
|
||||
|
||||
def check_ipc_path_max_len(uri):
|
||||
# The socket path is limited to 107 characters on Solaris and
|
||||
# Linux, and 103 characters on BSD-based systems.
|
||||
if not HAS_ZMQ:
|
||||
if zmq is None:
|
||||
return
|
||||
ipc_path_max_len = getattr(zmq, 'IPC_PATH_MAX_LEN', 103)
|
||||
if ipc_path_max_len and len(uri) > ipc_path_max_len:
|
||||
|
|
|
@ -18,13 +18,9 @@ from tests.support.helpers import flaky
|
|||
from tests.support.unit import skipIf
|
||||
|
||||
# Import 3rd-party libs
|
||||
import salt.ext.six as six
|
||||
try:
|
||||
import zmq
|
||||
from zmq.eventloop.ioloop import ZMQIOLoop
|
||||
HAS_ZMQ_IOLOOP = True
|
||||
except ImportError:
|
||||
HAS_ZMQ_IOLOOP = False
|
||||
from salt.ext import six
|
||||
from salt.utils.zeromq import zmq, ZMQDefaultLoop as ZMQIOLoop
|
||||
HAS_ZMQ_IOLOOP = bool(zmq)
|
||||
|
||||
|
||||
def json_loads(data):
|
||||
|
|
Loading…
Add table
Reference in a new issue