Add event publisher for the master

This commit is contained in:
Thomas S Hatch 2012-06-03 16:29:06 -06:00
parent 4953115e9b
commit 02a1c677ec

View file

@ -173,6 +173,7 @@ class Master(SMaster):
aes_funcs,
clear_funcs)
reqserv.start_publisher()
reqserv.start_event_publisher()
def sigterm_clean(signum, frame):
'''
@ -199,6 +200,52 @@ class Master(SMaster):
raise SystemExit('\nExiting on Ctrl-c')
class EventPublisher(multiprocessing.Process):
'''
The interface that takes master events and republishes them out to anyone
who wants to listen
'''
def __init__(self, opts):
super(EventPublisher, self).__init__()
self.opts = opts
def run(self):
'''
Bind the pub and pull sockets for events
'''
# Set up the context
context = zmq.Context(1)
# Prepare the master event publisher
epub_sock = context.socket(zmq.PUB)
epub_uri = 'ipc://{0}'.format(
os.path.join(self.opts['sock_dir'], 'master_event_pub.ipc')
)
# Prepare master event pull socket
epull_sock = context.socket(zmq.PULL)
epull_uri = 'ipc://{0}'.format(
os.path.join(self.opts['sock_dir'], 'master_event_pull.ipc')
)
# Start the master event publisher
log.info('Starting the Salt Event Publisher on {0}'.format(epub_uri))
epub_sock.bind(epub_uri)
epull_sock.bind(epull_uri)
try:
while True:
# Catch and handle EINTR from when this process is sent
# SIGUSR1 gracefully so we don't choke and die horribly
try:
package = epull_sock.recv()
epub_sock.send(package)
except zmq.ZMQError as exc:
if exc.errno == errno.EINTR:
continue
raise exc
except KeyboardInterrupt:
epub_sock.close()
epull_sock.close()
class Publisher(multiprocessing.Process):
'''
The publishing interface, a simple zeromq publisher that sends out the
@ -212,14 +259,18 @@ class Publisher(multiprocessing.Process):
'''
Bind to the interface specified in the configuration file
'''
# Set up the context
context = zmq.Context(1)
# Prepare minion publish socket
pub_sock = context.socket(zmq.PUB)
pub_sock.setsockopt(zmq.HWM, 1)
pull_sock = context.socket(zmq.PULL)
pub_uri = 'tcp://{0[interface]}:{0[publish_port]}'.format(self.opts)
# Prepare minion pull socket
pull_sock = context.socket(zmq.PULL)
pull_uri = 'ipc://{0}'.format(
os.path.join(self.opts['sock_dir'], 'publish_pull.ipc')
)
os.path.join(self.opts['sock_dir'], 'publish_pull.ipc')
)
# Start the minion command publisher
log.info('Starting the Salt Publisher on {0}'.format(pub_uri))
pub_sock.bind(pub_uri)
pull_sock.bind(pull_uri)
@ -300,6 +351,15 @@ class ReqServer(object):
self.publisher = Publisher(self.opts)
self.publisher.start()
def start_event_publisher(self):
'''
Start the salt publisher interface
'''
# Start the publisher
self.eventpublisher = EventPublisher(self.opts)
self.eventpublisher.start()
def run(self):
'''
Start up the ReqServer