Merge pull request #47399 from isbm/isbm-zeromq17-deprecationwarning-2017.7.2-v2

zeromq 17 deprecation warning backport from 2018.3 + tornado 5 fixes
This commit is contained in:
Mike Place 2018-05-02 10:11:15 -05:00 committed by GitHub
commit 2f5fc4ecc5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 169 additions and 186 deletions

View file

@ -47,11 +47,6 @@ from salt.exceptions import (
# Import third party libs
import salt.ext.six as six
# pylint: disable=import-error
try:
import zmq
HAS_ZMQ = True
except ImportError:
HAS_ZMQ = False
# Try to import range from https://github.com/ytoolshed/range
HAS_RANGE = False

View file

@ -56,11 +56,7 @@ try:
HAS_WINSHELL = True
except ImportError:
HAS_WINSHELL = False
try:
import zmq
HAS_ZMQ = True
except ImportError:
HAS_ZMQ = False
from salt.utils.zeromq import zmq
# The directory where salt thin is deployed
DEFAULT_THIN_DIR = '/var/tmp/.%%USER%%_%%FQDNUUID%%_salt'
@ -207,7 +203,7 @@ class SSH(object):
'''
def __init__(self, opts):
pull_sock = os.path.join(opts['sock_dir'], 'master_event_pull.ipc')
if os.path.isfile(pull_sock) and HAS_ZMQ:
if os.path.exists(pull_sock) and zmq:
self.event = salt.utils.event.get_event(
'master',
opts['sock_dir'],

View file

@ -15,15 +15,12 @@ import errno
# Import ioflo libs
import ioflo.base.deeding
# Import third party libs
try:
import zmq
import salt.master
import salt.crypt
import salt.daemons.masterapi
import salt.payload
HAS_ZMQ = True
except ImportError:
HAS_ZMQ = False
from salt.utils.zeromq import zmq
import salt.master
import salt.crypt
import salt.daemons.masterapi
import salt.payload
log = logging.getLogger(__name__)
@ -159,7 +156,7 @@ class SaltZmqPublisher(ioflo.base.deeding.Deed):
'''
Set up tracking value(s)
'''
if not HAS_ZMQ:
if not zmq:
return
self.created = False
self.serial = salt.payload.Serial(self.opts.value)

View file

@ -173,11 +173,7 @@ from __future__ import absolute_import
import logging
# Import third party libraries
try:
import zmq
HAS_ZMQ = True
except ImportError:
HAS_ZMQ = False
from salt.utils.zeromq import zmq
try:
# pylint: disable=W0611
@ -209,7 +205,7 @@ def __virtual__():
'''
Load only if napalm-logs is installed.
'''
if not HAS_NAPALM_LOGS or not HAS_ZMQ:
if not HAS_NAPALM_LOGS or not zmq:
return (False, 'napalm_syslog could not be loaded. \
Please install napalm-logs library amd ZeroMQ.')
return True

View file

@ -28,23 +28,9 @@ except ImportError:
# pylint: disable=import-error,no-name-in-module,redefined-builtin
import salt.ext.six as six
from salt.ext.six.moves import range
from salt.utils.zeromq import zmq, ZMQDefaultLoop, install_zmq, ZMQ_VERSION_INFO
# pylint: enable=import-error,no-name-in-module,redefined-builtin
try:
import zmq
import zmq.eventloop.ioloop
# support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
HAS_ZMQ = True
except ImportError:
HAS_ZMQ = False
import tornado
TORNADO_50 = tornado.version_info >= (5,)
from salt.utils.async import LOOP_CLASS
import tornado.gen # pylint: disable=F0401
# Import salt libs
@ -378,23 +364,13 @@ class Master(SMaster):
:param dict: The salt options
'''
if HAS_ZMQ:
# Warn if ZMQ < 3.2
try:
zmq_version_info = zmq.zmq_version_info()
except AttributeError:
# PyZMQ <= 2.1.9 does not have zmq_version_info, fall back to
# using zmq.zmq_version() and build a version info tuple.
zmq_version_info = tuple(
[int(x) for x in zmq.zmq_version().split('.')]
)
if zmq_version_info < (3, 2):
log.warning(
'You have a version of ZMQ less than ZMQ 3.2! There are '
'known connection keep-alive issues with ZMQ < 3.2 which '
'may result in loss of contact with minions. Please '
'upgrade your ZMQ!'
)
if zmq and ZMQ_VERSION_INFO < (3, 2):
log.warning(
'You have a version of ZMQ less than ZMQ 3.2! There are '
'known connection keep-alive issues with ZMQ < 3.2 which '
'may result in loss of contact with minions. Please '
'upgrade your ZMQ!'
)
SMaster.__init__(self, opts)
def __set_max_open_files(self):
@ -858,9 +834,8 @@ class MWorker(SignalHandlingMultiprocessingProcess):
Bind to the local port
'''
# using ZMQIOLoop since we *might* need zmq in there
if HAS_ZMQ and not TORNADO_50:
zmq.eventloop.ioloop.install()
self.io_loop = LOOP_CLASS()
install_zmq()
self.io_loop = ZMQDefaultLoop()
self.io_loop.make_current()
for req_channel in self.req_channels:
req_channel.post_fork(self._handle_payload, io_loop=self.io_loop) # TODO: cleaner? Maybe lazily?

View file

@ -30,23 +30,10 @@ if six.PY3:
else:
import salt.ext.ipaddress as ipaddress
from salt.ext.six.moves import range
from salt.utils.zeromq import zmq, ZMQDefaultLoop, install_zmq, ZMQ_VERSION_INFO
# pylint: enable=no-name-in-module,redefined-builtin
from salt.utils.async import LOOP_CLASS
# Import third party libs
try:
import zmq
# TODO: cleanup
import zmq.eventloop.ioloop
# support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
HAS_ZMQ = True
except ImportError:
HAS_ZMQ = False
import tornado
TORNADO_50 = tornado.version_info >= (5,)
HAS_RANGE = False
try:
@ -620,7 +607,7 @@ class MinionBase(object):
if self.opts['transport'] == 'detect':
self.opts['detect_mode'] = True
for trans in ('zeromq', 'tcp'):
if trans == 'zeromq' and not HAS_ZMQ:
if trans == 'zeromq' and not zmq:
continue
self.opts['transport'] = trans
pub_channel = salt.transport.client.AsyncPubChannel.factory(self.opts, **factory_kwargs)
@ -657,10 +644,8 @@ class SMinion(MinionBase):
# Clean out the proc directory (default /var/cache/salt/minion/proc)
if (self.opts.get('file_client', 'remote') == 'remote'
or self.opts.get('use_master_when_local', False)):
if self.opts['transport'] == 'zeromq' and HAS_ZMQ and not TORNADO_50:
io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
else:
io_loop = LOOP_CLASS.current()
install_zmq()
io_loop = ZMQDefaultLoop.current()
io_loop.run_sync(
lambda: self.eval_master(self.opts, failed=True)
)
@ -806,9 +791,8 @@ class MinionManager(MinionBase):
self.minions = []
self.jid_queue = []
if HAS_ZMQ and not TORNADO_50:
zmq.eventloop.ioloop.install()
self.io_loop = LOOP_CLASS.current()
install_zmq()
self.io_loop = ZMQDefaultLoop.current()
self.process_manager = ProcessManager(name='MultiMinionProcessManager')
self.io_loop.spawn_callback(self.process_manager.run, async=True)
@ -955,23 +939,14 @@ class Minion(MinionBase):
self.periodic_callbacks = {}
if io_loop is None:
if HAS_ZMQ and not TORNADO_50:
zmq.eventloop.ioloop.install()
self.io_loop = LOOP_CLASS.current()
install_zmq()
self.io_loop = ZMQDefaultLoop.current()
else:
self.io_loop = io_loop
# Warn if ZMQ < 3.2
if HAS_ZMQ:
try:
zmq_version_info = zmq.zmq_version_info()
except AttributeError:
# PyZMQ <= 2.1.9 does not have zmq_version_info, fall back to
# using zmq.zmq_version() and build a version info tuple.
zmq_version_info = tuple(
[int(x) for x in zmq.zmq_version().split('.')] # pylint: disable=no-member
)
if zmq_version_info < (3, 2):
if zmq:
if ZMQ_VERSION_INFO < (3, 2):
log.warning(
'You have a version of ZMQ less than ZMQ 3.2! There are '
'known connection keep-alive issues with ZMQ < 3.2 which '
@ -2636,9 +2611,8 @@ class SyndicManager(MinionBase):
self.jid_forward_cache = set()
if io_loop is None:
if HAS_ZMQ and not TORNADO_50:
zmq.eventloop.ioloop.install()
self.io_loop = LOOP_CLASS.current()
install_zmq()
self.io_loop = ZMQDefaultLoop.current()
else:
self.io_loop = io_loop

View file

@ -30,13 +30,11 @@ import salt.transport.server
import salt.transport.mixins.auth
from salt.exceptions import SaltReqTimeoutError
import zmq
from salt.utils.zeromq import zmq, ZMQDefaultLoop, install_zmq, ZMQ_VERSION_INFO, LIBZMQ_VERSION_INFO
import zmq.error
import zmq.eventloop.ioloop
# support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
import zmq.eventloop.zmqstream
try:
import zmq.utils.monitor
HAS_ZMQ_MONITOR = True
@ -61,6 +59,42 @@ except ImportError:
log = logging.getLogger(__name__)
def _get_master_uri(master_ip,
master_port,
source_ip=None,
source_port=None):
'''
Return the ZeroMQ URI to connect the Minion to the Master.
It supports different source IP / port, given the ZeroMQ syntax:
// Connecting using a IP address and bind to an IP address
rc = zmq_connect(socket, "tcp://192.168.1.17:5555;192.168.1.1:5555"); assert (rc == 0);
Source: http://api.zeromq.org/4-1:zmq-tcp
'''
if LIBZMQ_VERSION_INFO >= (4, 1, 6) and ZMQ_VERSION_INFO >= (16, 0, 1):
# The source:port syntax for ZeroMQ has been added in libzmq 4.1.6
# which is included in the pyzmq wheels starting with 16.0.1.
if source_ip or source_port:
if source_ip and source_port:
return 'tcp://{source_ip}:{source_port};{master_ip}:{master_port}'.format(
source_ip=source_ip, source_port=source_port,
master_ip=master_ip, master_port=master_port)
elif source_ip and not source_port:
return 'tcp://{source_ip}:0;{master_ip}:{master_port}'.format(
source_ip=source_ip,
master_ip=master_ip, master_port=master_port)
elif not source_ip and source_port:
return 'tcp://0.0.0.0:{source_port};{master_ip}:{master_port}'.format(
source_port=source_port,
master_ip=master_ip, master_port=master_port)
if source_ip or source_port:
log.warning('Unable to connect to the Master using a specific source IP / port')
log.warning('Consider upgrading to pyzmq >= 16.0.1 and libzmq >= 4.1.6')
return 'tcp://{master_ip}:{master_port}'.format(
master_ip=master_ip, master_port=master_port)
class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
'''
Encapsulate sending routines to ZeroMQ.
@ -79,9 +113,8 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
# do we have any mapping for this io_loop
io_loop = kwargs.get('io_loop')
if io_loop is None:
if not TORNADO_50:
zmq.eventloop.ioloop.install()
io_loop = tornado.ioloop.IOLoop.current()
install_zmq()
io_loop = ZMQDefaultLoop.current()
if io_loop not in cls.instance_map:
cls.instance_map[io_loop] = weakref.WeakValueDictionary()
loop_instance_map = cls.instance_map[io_loop]
@ -96,7 +129,8 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
obj = object.__new__(cls)
obj.__singleton_init__(opts, **kwargs)
loop_instance_map[key] = obj
log.trace('Inserted key into loop_instance_map id {0} for key {1} and process {2}'.format(id(loop_instance_map), key, os.getpid()))
log.trace('Inserted key into loop_instance_map id %s for key %s and process %s',
id(loop_instance_map), key, os.getpid())
else:
log.debug('Re-using AsyncZeroMQReqChannel for {0}'.format(key))
return obj
@ -148,9 +182,8 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
self._io_loop = kwargs.get('io_loop')
if self._io_loop is None:
if not TORNADO_50:
zmq.eventloop.ioloop.install()
self._io_loop = tornado.ioloop.IOLoop.current()
install_zmq()
self._io_loop = ZMQDefaultLoop.current()
if self.crypt != 'clear':
# we don't need to worry about auth as a kwarg, since its a singleton
@ -290,19 +323,14 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t
**kwargs):
self.opts = opts
self.ttype = 'zeromq'
self.io_loop = kwargs.get('io_loop')
if self.io_loop is None:
if not TORNADO_50:
zmq.eventloop.ioloop.install()
self.io_loop = tornado.ioloop.IOLoop.current()
self.hexid = hashlib.sha1(six.b(self.opts['id'])).hexdigest()
install_zmq()
self.io_loop = ZMQDefaultLoop.current()
self.hexid = hashlib.sha1(salt.utils.to_bytes(self.opts['id'])).hexdigest()
self.auth = salt.crypt.AsyncAuth(self.opts, io_loop=self.io_loop)
self.serial = salt.payload.Serial(self.opts)
self.context = zmq.Context()
self._socket = self.context.socket(zmq.SUB)
@ -334,8 +362,7 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t
if self.opts['recon_randomize']:
recon_delay = randint(self.opts['recon_default'],
self.opts['recon_default'] + self.opts['recon_max']
)
self.opts['recon_default'] + self.opts['recon_max'])
log.debug("Generated random reconnect delay between '{0}ms' and '{1}ms' ({2})".format(
self.opts['recon_default'],
@ -449,7 +476,8 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t
return self.stream.on_recv(wrap_callback)
class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.transport.server.ReqServerChannel):
class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin,
salt.transport.server.ReqServerChannel):
def __init__(self, opts):
salt.transport.server.ReqServerChannel.__init__(self, opts)
@ -469,13 +497,7 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
# IPv6 sockets work for both IPv6 and IPv4 addresses
self.clients.setsockopt(zmq.IPV4ONLY, 0)
self.clients.setsockopt(zmq.BACKLOG, self.opts.get('zmq_backlog', 1000))
if HAS_ZMQ_MONITOR and self.opts['zmq_monitor']:
# Socket monitor shall be used the only for debug purposes so using threading doesn't look too bad here
import threading
self._monitor = ZeroMQSocketMonitor(self.clients)
t = threading.Thread(target=self._monitor.start_poll)
t.start()
self._start_zmq_monitor()
self.workers = self.context.socket(zmq.DEALER)
if self.opts.get('ipc_mode', '') == 'tcp':
@ -489,7 +511,6 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
log.info('Setting up the master communication server')
self.clients.bind(self.uri)
self.workers.bind(self.w_uri)
while True:
@ -512,10 +533,11 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
return
log.info('MWorkerQueue under PID %s is closing', os.getpid())
self._closing = True
if hasattr(self, '_monitor') and self._monitor is not None:
# pylint: disable=E0203
if getattr(self, '_monitor', None) is not None:
self._monitor.stop()
self._monitor = None
if hasattr(self, '_w_monitor') and self._w_monitor is not None:
if getattr(self, '_w_monitor', None) is not None:
self._w_monitor.stop()
self._w_monitor = None
if hasattr(self, 'clients') and self.clients.closed is False:
@ -528,6 +550,7 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
self._socket.close()
if hasattr(self, 'context') and self.context.closed is False:
self.context.term()
# pylint: enable=E0203
def pre_fork(self, process_manager):
'''
@ -538,6 +561,21 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
salt.transport.mixins.auth.AESReqServerMixin.pre_fork(self, process_manager)
process_manager.add_process(self.zmq_device)
def _start_zmq_monitor(self):
'''
Starts ZMQ monitor for debugging purposes.
:return:
'''
# Socket monitor shall be used the only for debug
# purposes so using threading doesn't look too bad here
if HAS_ZMQ_MONITOR and self.opts['zmq_monitor']:
log.debug('Starting ZMQ monitor')
import threading
self._w_monitor = ZeroMQSocketMonitor(self._socket)
threading.Thread(target=self._w_monitor.start_poll).start()
log.debug('ZMQ monitor has been started started')
def post_fork(self, payload_handler, io_loop):
'''
After forking we need to create all of the local sockets to listen to the
@ -552,12 +590,7 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
self.context = zmq.Context(1)
self._socket = self.context.socket(zmq.REP)
if HAS_ZMQ_MONITOR and self.opts['zmq_monitor']:
# Socket monitor shall be used the only for debug purposes so using threading doesn't look too bad here
import threading
self._w_monitor = ZeroMQSocketMonitor(self._socket)
t = threading.Thread(target=self._w_monitor.start_poll)
t.start()
self._start_zmq_monitor()
if self.opts.get('ipc_mode', '') == 'tcp':
self.w_uri = 'tcp://127.0.0.1:{0}'.format(
@ -763,27 +796,35 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
# Catch and handle EINTR from when this process is sent
# SIGUSR1 gracefully so we don't choke and die horribly
try:
log.trace('Getting data from puller %s', pull_uri)
package = pull_sock.recv()
unpacked_package = salt.payload.unpackage(package)
if six.PY3:
unpacked_package = salt.transport.frame.decode_embedded_strs(unpacked_package)
payload = unpacked_package['payload']
log.trace('Accepted unpacked package from puller')
if self.opts['zmq_filtering']:
# if you have a specific topic list, use that
if 'topic_lst' in unpacked_package:
for topic in unpacked_package['topic_lst']:
log.trace('Sending filtered data over publisher %s', pub_uri)
# zmq filters are substring match, hash the topic
# to avoid collisions
htopic = hashlib.sha1(topic).hexdigest()
pub_sock.send(htopic, flags=zmq.SNDMORE)
pub_sock.send(payload)
log.trace('Filtered data has been sent')
# otherwise its a broadcast
else:
# TODO: constants file for "broadcast"
log.trace('Sending broadcasted data over publisher %s', pub_uri)
pub_sock.send('broadcast', flags=zmq.SNDMORE)
pub_sock.send(payload)
log.trace('Broadcasted data has been sent')
else:
log.trace('Sending ZMQ-unfiltered data over publisher %s', pub_uri)
pub_sock.send(payload)
log.trace('Unfiltered data has been sent')
except zmq.ZMQError as exc:
if exc.errno == errno.EINTR:
continue
@ -901,14 +942,12 @@ class AsyncReqMessageClient(object):
self.addr = addr
self.linger = linger
if io_loop is None:
if not TORNADO_50:
zmq.eventloop.ioloop.install()
tornado.ioloop.IOLoop.current()
install_zmq()
ZMQDefaultLoop.current()
else:
self.io_loop = io_loop
self.serial = salt.payload.Serial(self.opts)
self.context = zmq.Context()
# wire up sockets
@ -987,7 +1026,8 @@ class AsyncReqMessageClient(object):
try:
ret = yield future
except: # pylint: disable=W0702
except Exception as err: # pylint: disable=W0702
log.debug('Re-init ZMQ socket: %s', err)
self._init_socket() # re-init the zmq socket (no other way in zmq)
del self.send_queue[0]
continue

View file

@ -7,26 +7,9 @@ from __future__ import absolute_import
import tornado.ioloop
import tornado.concurrent
# attempt to use zmq-- if we have it otherwise fallback to tornado loop
try:
import zmq.eventloop.ioloop
# support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
HAS_ZMQ = True
except ImportError:
HAS_ZMQ = False
import tornado
TORNADO_50 = tornado.version_info >= (5,)
if HAS_ZMQ and not TORNADO_50:
LOOP_CLASS = zmq.eventloop.ioloop.ZMQIOLoop
else:
import tornado.ioloop
LOOP_CLASS = tornado.ioloop.IOLoop
import contextlib
from salt.utils import zeromq
@contextlib.contextmanager
@ -60,7 +43,7 @@ class SyncWrapper(object):
if kwargs is None:
kwargs = {}
self.io_loop = LOOP_CLASS()
self.io_loop = zeromq.ZMQDefaultLoop()
kwargs['io_loop'] = self.io_loop
with current_ioloop(self.io_loop):

View file

@ -18,11 +18,7 @@ import salt.utils.dictupdate
# Import third party libs
from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
try:
import zmq
HAS_ZMQ = True
except ImportError:
HAS_ZMQ = False
from salt.utils.zeromq import zmq
log = logging.getLogger(__name__)

View file

@ -29,12 +29,8 @@ from salt.utils.cache import CacheCli as cache_cli
from salt.utils.process import MultiprocessingProcess
# Import third party libs
import salt.ext.six as six
try:
import zmq
HAS_ZMQ = True
except ImportError:
HAS_ZMQ = False
from salt.ext import six
from salt.utils.zeromq import zmq
log = logging.getLogger(__name__)

View file

@ -3,21 +3,60 @@
# Import Python libs
from __future__ import absolute_import
# Import Salt libs
import logging
import tornado.ioloop
from salt.exceptions import SaltSystemExit
# Import 3rd-party libs
log = logging.getLogger(__name__)
try:
import zmq
HAS_ZMQ = True
except ImportError:
HAS_ZMQ = False
zmq = None
log.debug('ZMQ module is not found')
ZMQDefaultLoop = None
ZMQ_VERSION_INFO = (-1, -1, -1)
LIBZMQ_VERSION_INFO = (-1, -1, -1)
try:
if zmq:
ZMQ_VERSION_INFO = tuple([int(v_el) for v_el in zmq.__version__.split('.')])
LIBZMQ_VERSION_INFO = tuple([int(v_el) for v_el in zmq.zmq_version().split('.')])
if ZMQ_VERSION_INFO[0] > 16: # 17.0.x+ deprecates zmq's ioloops
ZMQDefaultLoop = tornado.ioloop.IOLoop
except Exception:
log.exception('Error while getting LibZMQ/PyZMQ library version')
if ZMQDefaultLoop is None:
try:
import zmq.eventloop.ioloop
# Support for ZeroMQ 13.x
if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
if tornado.version_info < (5,):
ZMQDefaultLoop = zmq.eventloop.ioloop.ZMQIOLoop
except ImportError:
ZMQDefaultLoop = None
if ZMQDefaultLoop is None:
ZMQDefaultLoop = tornado.ioloop.IOLoop
def install_zmq():
'''
While pyzmq 17 no longer needs any special integration for tornado,
older version still need one.
:return:
'''
if zmq and ZMQ_VERSION_INFO[0] < 17:
if tornado.version_info < (5,):
zmq.eventloop.ioloop.install()
def check_ipc_path_max_len(uri):
# The socket path is limited to 107 characters on Solaris and
# Linux, and 103 characters on BSD-based systems.
if not HAS_ZMQ:
if zmq is None:
return
ipc_path_max_len = getattr(zmq, 'IPC_PATH_MAX_LEN', 103)
if ipc_path_max_len and len(uri) > ipc_path_max_len:

View file

@ -18,13 +18,9 @@ from tests.support.helpers import flaky
from tests.support.unit import skipIf
# Import 3rd-party libs
import salt.ext.six as six
try:
import zmq
from zmq.eventloop.ioloop import ZMQIOLoop
HAS_ZMQ_IOLOOP = True
except ImportError:
HAS_ZMQ_IOLOOP = False
from salt.ext import six
from salt.utils.zeromq import zmq, ZMQDefaultLoop as ZMQIOLoop
HAS_ZMQ_IOLOOP = bool(zmq)
def json_loads(data):