Merge pull request #23170 from DSRCompany/readd_zmq_monitor

#22595 Re-add ZMQ Monitor socket
This commit is contained in:
Justin Findlay 2015-04-29 10:19:26 -06:00
commit ee91670a7f

View file

@ -29,6 +29,8 @@ import zmq.eventloop.ioloop
if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
import zmq.eventloop.zmqstream
import zmq.utils.monitor
EVENT_MAP = None
# Import Tornado Libs
import tornado
@ -213,7 +215,41 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t
# IPv6 sockets work for both IPv6 and IPv4 addresses
self._socket.setsockopt(zmq.IPV4ONLY, 0)
self._init_monitor()
def _init_monitor(self):
if not self.opts['zmq_monitor']:
return
global EVENT_MAP
EVENT_MAP = {}
for name in dir(zmq):
if name.startswith('EVENT_'):
value = getattr(zmq, name)
EVENT_MAP[value] = name
def monitor_callback(msg):
evt = zmq.utils.monitor.parse_monitor_message(msg)
evt['description'] = EVENT_MAP[evt['event']]
log.debug("ZeroMQ event: {0}".format(evt))
if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
self._stop_monitor()
self._monitor_socket = self._socket.get_monitor_socket()
self._monitor_stream = zmq.eventloop.zmqstream.ZMQStream(self._monitor_socket, io_loop=self.io_loop)
self._monitor_stream.on_recv(monitor_callback)
def _stop_monitor(self):
if not hasattr(self, '_monitor_socket') or self._monitor_socket is None:
return
self._socket.disable_monitor()
self._monitor_stream.close()
self._monitor_socket = None
self._monitor_stream = None
log.trace("Event monitor done!")
def destroy(self):
self._stop_monitor()
if hasattr(self, '_stream'):
# TODO: Optionally call stream.close() on newer pyzmq? Its broken on some
self._stream.io_loop.remove_handler(self._stream.socket)