Merge pull request #50345 from dwoz/pub_d_logging

Configure logging for ZMQ PubServer daemon
This commit is contained in:
Daniel Wozniak 2018-11-02 12:40:31 -07:00 committed by GitHub
commit 99c8f356eb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 13 additions and 13 deletions

View file

@ -529,9 +529,10 @@ class Master(SMaster):
self.process_manager = salt.utils.process.ProcessManager(wait_for_kill=5)
pub_channels = []
log.info('Creating master publisher process')
log_queue = salt.log.setup.get_multiprocessing_logging_queue()
for transport, opts in iter_transport_opts(self.opts):
chan = salt.transport.server.PubServerChannel.factory(opts)
chan.pre_fork(self.process_manager)
chan.pre_fork(self.process_manager, kwargs={'log_queue': log_queue})
pub_channels.append(chan)
log.info('Creating master event publisher process')
@ -589,7 +590,7 @@ class Master(SMaster):
log.info('Creating master request server process')
kwargs = {}
if salt.utils.is_windows():
kwargs['log_queue'] = salt.log.setup.get_multiprocessing_logging_queue()
kwargs['log_queue'] = log_queue
kwargs['secrets'] = SMaster.secrets
self.process_manager.add_process(

View file

@ -92,7 +92,7 @@ class PubServerChannel(object):
raise Exception('Channels are only defined for ZeroMQ and raet')
# return NewKindOfChannel(opts, **kwargs)
def pre_fork(self, process_manager):
def pre_fork(self, process_manager, kwargs=None):
'''
Do anything necessary pre-fork. Since this is on the master side this will
primarily be used to create IPC channels and create our daemon process to

View file

@ -1384,18 +1384,12 @@ class TCPPubServerChannel(salt.transport.server.PubServerChannel):
except (KeyboardInterrupt, SystemExit):
salt.log.setup.shutdown_multiprocessing_logging()
def pre_fork(self, process_manager):
def pre_fork(self, process_manager, kwargs=None):
'''
Do anything necessary pre-fork. Since this is on the master side this will
primarily be used to create IPC channels and create our daemon process to
do the actual publishing
'''
kwargs = {}
if salt.utils.is_windows():
kwargs['log_queue'] = (
salt.log.setup.get_multiprocessing_logging_queue()
)
process_manager.add_process(self._publish_daemon, kwargs=kwargs)
def publish(self, load):

View file

@ -21,6 +21,7 @@ from salt.ext.six.moves import map
import salt.auth
import salt.crypt
import salt.utils
import salt.log.setup
import salt.utils.verify
import salt.utils.event
import salt.utils.files
@ -745,11 +746,15 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
def connect(self):
return tornado.gen.sleep(5)
def _publish_daemon(self):
def _publish_daemon(self, log_queue=None):
'''
Bind to the interface specified in the configuration file
'''
salt.utils.appendproctitle(self.__class__.__name__)
if log_queue:
salt.log.setup.set_multiprocessing_logging_queue(log_queue)
salt.log.setup.setup_multiprocessing_logging(log_queue)
# Set up the context
context = zmq.Context(1)
# Prepare minion publish socket
@ -841,7 +846,7 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
if context.closed is False:
context.term()
def pre_fork(self, process_manager):
def pre_fork(self, process_manager, kwargs=None):
'''
Do anything necessary pre-fork. Since this is on the master side this will
primarily be used to create IPC channels and create our daemon process to
@ -849,7 +854,7 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
:param func process_manager: A ProcessManager, from salt.utils.process.ProcessManager
'''
process_manager.add_process(self._publish_daemon)
process_manager.add_process(self._publish_daemon, kwargs=kwargs)
def publish(self, load):
'''