mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Move 4 functions from salt.utils
- salt.utils.reinit_crypto -> salt.utils.crypt.reinit_crypto - salt.utils.appendproctitle -> salt.utils.process.appendproctitle - salt.utils.daemonize -> salt.utils.process.daemonize - salt.utils.daemonize_if -> salt.utils.process.daemonize_if
This commit is contained in:
parent
3a078304d8
commit
554c685ce5
21 changed files with 222 additions and 177 deletions
|
@ -16,7 +16,7 @@ import copy as pycopy
|
|||
# Import Salt libs
|
||||
import salt.exceptions
|
||||
import salt.minion
|
||||
import salt.utils # Can be removed once daemonize, format_call are moved
|
||||
import salt.utils # Can be removed once format_call is moved
|
||||
import salt.utils.args
|
||||
import salt.utils.doc
|
||||
import salt.utils.error
|
||||
|
@ -469,7 +469,7 @@ class AsyncClientMixin(object):
|
|||
# Shutdown the multiprocessing before daemonizing
|
||||
salt.log.setup.shutdown_multiprocessing_logging()
|
||||
|
||||
salt.utils.daemonize()
|
||||
salt.utils.process.daemonize()
|
||||
|
||||
# Reconfigure multiprocessing logging after daemonizing
|
||||
salt.log.setup.setup_multiprocessing_logging()
|
||||
|
|
|
@ -33,6 +33,7 @@ import salt.utils
|
|||
import salt.utils.args
|
||||
import salt.utils.cloud
|
||||
import salt.utils.context
|
||||
import salt.utils.crypt
|
||||
import salt.utils.dictupdate
|
||||
import salt.utils.files
|
||||
import salt.syspaths
|
||||
|
@ -2300,7 +2301,7 @@ def create_multiprocessing(parallel_data, queue=None):
|
|||
This function will be called from another process when running a map in
|
||||
parallel mode. The result from the create is always a json object.
|
||||
'''
|
||||
salt.utils.reinit_crypto()
|
||||
salt.utils.crypt.reinit_crypto()
|
||||
|
||||
parallel_data['opts']['output'] = 'json'
|
||||
cloud = Cloud(parallel_data['opts'])
|
||||
|
@ -2332,7 +2333,7 @@ def destroy_multiprocessing(parallel_data, queue=None):
|
|||
This function will be called from another process when running a map in
|
||||
parallel mode. The result from the destroy is always a json object.
|
||||
'''
|
||||
salt.utils.reinit_crypto()
|
||||
salt.utils.crypt.reinit_crypto()
|
||||
|
||||
parallel_data['opts']['output'] = 'json'
|
||||
clouds = salt.loader.clouds(parallel_data['opts'])
|
||||
|
@ -2368,7 +2369,7 @@ def run_parallel_map_providers_query(data, queue=None):
|
|||
This function will be called from another process when building the
|
||||
providers map.
|
||||
'''
|
||||
salt.utils.reinit_crypto()
|
||||
salt.utils.crypt.reinit_crypto()
|
||||
|
||||
cloud = Cloud(data['opts'])
|
||||
try:
|
||||
|
|
|
@ -50,7 +50,8 @@ import salt.defaults.exitcodes
|
|||
import salt.payload
|
||||
import salt.transport.client
|
||||
import salt.transport.frame
|
||||
import salt.utils # Can be removed when pem_finger, reinit_crypto are moved
|
||||
import salt.utils # Can be removed when pem_finger is moved
|
||||
import salt.utils.crypt
|
||||
import salt.utils.decorators
|
||||
import salt.utils.event
|
||||
import salt.utils.files
|
||||
|
@ -113,7 +114,7 @@ def gen_keys(keydir, keyname, keysize, user=None, passphrase=None):
|
|||
priv = u'{0}.pem'.format(base)
|
||||
pub = u'{0}.pub'.format(base)
|
||||
|
||||
salt.utils.reinit_crypto()
|
||||
salt.utils.crypt.reinit_crypto()
|
||||
gen = RSA.generate(bits=keysize, e=65537)
|
||||
if os.path.isfile(priv):
|
||||
# Between first checking and the generation another process has made
|
||||
|
@ -446,7 +447,7 @@ class AsyncAuth(object):
|
|||
|
||||
self.io_loop = io_loop or tornado.ioloop.IOLoop.current()
|
||||
|
||||
salt.utils.reinit_crypto()
|
||||
salt.utils.crypt.reinit_crypto()
|
||||
key = self.__key(self.opts)
|
||||
# TODO: if we already have creds for this key, lets just re-use
|
||||
if key in AsyncAuth.creds_map:
|
||||
|
|
|
@ -24,6 +24,7 @@ import salt.utils.args
|
|||
import salt.utils.data
|
||||
import salt.utils.files
|
||||
import salt.utils.kinds as kinds
|
||||
import salt.utils.process
|
||||
import salt.utils.stringutils
|
||||
import salt.transport
|
||||
from raet import raeting, nacling
|
||||
|
@ -273,7 +274,7 @@ class SaltRaetNixJobber(ioflo.base.deeding.Deed):
|
|||
data = msg['pub']
|
||||
fn_ = os.path.join(self.proc_dir, data['jid'])
|
||||
self.opts['__ex_id'] = data['jid']
|
||||
salt.utils.daemonize_if(self.opts)
|
||||
salt.utils.process.daemonize_if(self.opts)
|
||||
|
||||
salt.transport.jobber_stack = stack = self._setup_jobber_stack()
|
||||
# set up return destination from source
|
||||
|
|
|
@ -38,7 +38,6 @@ GARBAGE = logging.GARBAGE = 1
|
|||
QUIET = logging.QUIET = 1000
|
||||
|
||||
# Import salt libs
|
||||
import salt.utils # Can be removed once appendproctitle is moved
|
||||
from salt.textformat import TextFormat
|
||||
from salt.log.handlers import (TemporaryLoggingHandler,
|
||||
StreamHandler,
|
||||
|
@ -998,7 +997,9 @@ def patch_python_logging_handlers():
|
|||
|
||||
|
||||
def __process_multiprocessing_logging_queue(opts, queue):
|
||||
salt.utils.appendproctitle('MultiprocessingLoggingQueue')
|
||||
# Avoid circular import
|
||||
import salt.utils.process
|
||||
salt.utils.process.appendproctitle('MultiprocessingLoggingQueue')
|
||||
|
||||
# Assign UID/GID of user to proc if set
|
||||
from salt.utils.verify import check_user
|
||||
|
|
|
@ -47,7 +47,7 @@ import tornado.gen # pylint: disable=F0401
|
|||
|
||||
# Import salt libs
|
||||
import salt.crypt
|
||||
import salt.utils
|
||||
import salt.utils # Can be removed once get_values_of_matching_keys is moved
|
||||
import salt.client
|
||||
import salt.payload
|
||||
import salt.pillar
|
||||
|
@ -65,6 +65,7 @@ import salt.transport.server
|
|||
import salt.log.setup
|
||||
import salt.utils.args
|
||||
import salt.utils.atomicfile
|
||||
import salt.utils.crypt
|
||||
import salt.utils.event
|
||||
import salt.utils.files
|
||||
import salt.utils.gitfs
|
||||
|
@ -222,7 +223,7 @@ class Maintenance(salt.utils.process.SignalHandlingMultiprocessingProcess):
|
|||
This is where any data that needs to be cleanly maintained from the
|
||||
master is maintained.
|
||||
'''
|
||||
salt.utils.appendproctitle(u'Maintenance')
|
||||
salt.utils.process.appendproctitle(u'Maintenance')
|
||||
|
||||
# init things that need to be done after the process is forked
|
||||
self._post_fork_init()
|
||||
|
@ -652,7 +653,7 @@ class Halite(salt.utils.process.SignalHandlingMultiprocessingProcess):
|
|||
'''
|
||||
Fire up halite!
|
||||
'''
|
||||
salt.utils.appendproctitle(self.__class__.__name__)
|
||||
salt.utils.process.appendproctitle(self.__class__.__name__)
|
||||
halite.start(self.hopts)
|
||||
|
||||
|
||||
|
@ -912,13 +913,13 @@ class MWorker(salt.utils.process.SignalHandlingMultiprocessingProcess):
|
|||
'''
|
||||
Start a Master Worker
|
||||
'''
|
||||
salt.utils.appendproctitle(self.name)
|
||||
salt.utils.process.appendproctitle(self.name)
|
||||
self.clear_funcs = ClearFuncs(
|
||||
self.opts,
|
||||
self.key,
|
||||
)
|
||||
self.aes_funcs = AESFuncs(self.opts)
|
||||
salt.utils.reinit_crypto()
|
||||
salt.utils.crypt.reinit_crypto()
|
||||
self.__bind()
|
||||
|
||||
|
||||
|
@ -2163,7 +2164,7 @@ class FloMWorker(MWorker):
|
|||
'''
|
||||
Prepare the needed objects and socket for iteration within ioflo
|
||||
'''
|
||||
salt.utils.appendproctitle(self.__class__.__name__)
|
||||
salt.utils.crypt.appendproctitle(self.__class__.__name__)
|
||||
self.clear_funcs = salt.master.ClearFuncs(
|
||||
self.opts,
|
||||
self.key,
|
||||
|
|
|
@ -98,6 +98,7 @@ import salt.utils.minion
|
|||
import salt.utils.minions
|
||||
import salt.utils.network
|
||||
import salt.utils.platform
|
||||
import salt.utils.process
|
||||
import salt.utils.schedule
|
||||
import salt.utils.user
|
||||
import salt.utils.zeromq
|
||||
|
@ -1470,12 +1471,12 @@ class Minion(MinionBase):
|
|||
# Shutdown the multiprocessing before daemonizing
|
||||
salt.log.setup.shutdown_multiprocessing_logging()
|
||||
|
||||
salt.utils.daemonize_if(opts)
|
||||
salt.utils.process.daemonize_if(opts)
|
||||
|
||||
# Reconfigure multiprocessing logging after daemonizing
|
||||
salt.log.setup.setup_multiprocessing_logging()
|
||||
|
||||
salt.utils.appendproctitle(u'{0}._thread_return {1}'.format(cls.__name__, data[u'jid']))
|
||||
salt.utils.process.appendproctitle(u'{0}._thread_return {1}'.format(cls.__name__, data[u'jid']))
|
||||
|
||||
sdata = {u'pid': os.getpid()}
|
||||
sdata.update(data)
|
||||
|
@ -1654,7 +1655,7 @@ class Minion(MinionBase):
|
|||
This method should be used as a threading target, start the actual
|
||||
minion side execution.
|
||||
'''
|
||||
salt.utils.appendproctitle(u'{0}._thread_multi_return {1}'.format(cls.__name__, data[u'jid']))
|
||||
salt.utils.process.appendproctitle(u'{0}._thread_multi_return {1}'.format(cls.__name__, data[u'jid']))
|
||||
multifunc_ordered = opts.get(u'multifunc_ordered', False)
|
||||
num_funcs = len(data[u'fun'])
|
||||
if multifunc_ordered:
|
||||
|
|
|
@ -28,7 +28,7 @@ from salt.modules.inspectlib import kiwiproc
|
|||
from salt.modules.inspectlib.entities import (AllowedDir, IgnoredDir, Package,
|
||||
PayloadFile, PackageCfgFile)
|
||||
|
||||
import salt.utils # Can be removed when reinit_crypto is moved
|
||||
import salt.utils.crypt
|
||||
import salt.utils.files
|
||||
import salt.utils.fsutils
|
||||
import salt.utils.path
|
||||
|
@ -505,10 +505,10 @@ if __name__ == '__main__':
|
|||
# Double-fork stuff
|
||||
try:
|
||||
if os.fork() > 0:
|
||||
salt.utils.reinit_crypto()
|
||||
salt.utils.crypt.reinit_crypto()
|
||||
sys.exit(0)
|
||||
else:
|
||||
salt.utils.reinit_crypto()
|
||||
salt.utils.crypt.reinit_crypto()
|
||||
except OSError as ex:
|
||||
sys.exit(1)
|
||||
|
||||
|
@ -518,12 +518,12 @@ if __name__ == '__main__':
|
|||
try:
|
||||
pid = os.fork()
|
||||
if pid > 0:
|
||||
salt.utils.reinit_crypto()
|
||||
salt.utils.crypt.reinit_crypto()
|
||||
with salt.utils.files.fopen(os.path.join(pidfile, EnvLoader.PID_FILE), 'w') as fp_:
|
||||
fp_.write('{0}\n'.format(pid))
|
||||
sys.exit(0)
|
||||
except OSError as ex:
|
||||
sys.exit(1)
|
||||
|
||||
salt.utils.reinit_crypto()
|
||||
salt.utils.crypt.reinit_crypto()
|
||||
main(dbfile, pidfile, mode)
|
||||
|
|
|
@ -97,11 +97,12 @@ def minion_process():
|
|||
Start a minion process
|
||||
'''
|
||||
import salt.utils.platform
|
||||
import salt.utils.process
|
||||
import salt.cli.daemons
|
||||
|
||||
# salt_minion spawns this function in a new process
|
||||
|
||||
salt.utils.appendproctitle(u'KeepAlive')
|
||||
salt.utils.process.appendproctitle(u'KeepAlive')
|
||||
|
||||
def handle_hup(manager, sig, frame):
|
||||
manager.minion.reload()
|
||||
|
|
|
@ -23,6 +23,7 @@ import salt.utils
|
|||
import salt.utils.async
|
||||
import salt.utils.event
|
||||
import salt.utils.platform
|
||||
import salt.utils.process
|
||||
import salt.utils.verify
|
||||
import salt.payload
|
||||
import salt.exceptions
|
||||
|
@ -1314,7 +1315,7 @@ class TCPPubServerChannel(salt.transport.server.PubServerChannel):
|
|||
'''
|
||||
Bind to the interface specified in the configuration file
|
||||
'''
|
||||
salt.utils.appendproctitle(self.__class__.__name__)
|
||||
salt.utils.process.appendproctitle(self.__class__.__name__)
|
||||
|
||||
if log_queue is not None:
|
||||
salt.log.setup.set_multiprocessing_logging_queue(log_queue)
|
||||
|
|
|
@ -21,6 +21,7 @@ import salt.crypt
|
|||
import salt.utils
|
||||
import salt.utils.verify
|
||||
import salt.utils.event
|
||||
import salt.utils.process
|
||||
import salt.utils.stringutils
|
||||
import salt.payload
|
||||
import salt.transport.client
|
||||
|
@ -449,7 +450,7 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
|
|||
Multiprocessing target for the zmq queue device
|
||||
'''
|
||||
self.__setup_signals()
|
||||
salt.utils.appendproctitle('MWorkerQueue')
|
||||
salt.utils.process.appendproctitle('MWorkerQueue')
|
||||
self.context = zmq.Context(self.opts['worker_threads'])
|
||||
# Prepare the zeromq sockets
|
||||
self.uri = 'tcp://{interface}:{ret_port}'.format(**self.opts)
|
||||
|
@ -694,7 +695,7 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
|
|||
'''
|
||||
Bind to the interface specified in the configuration file
|
||||
'''
|
||||
salt.utils.appendproctitle(self.__class__.__name__)
|
||||
salt.utils.process.appendproctitle(self.__class__.__name__)
|
||||
# Set up the context
|
||||
context = zmq.Context(1)
|
||||
# Prepare minion publish socket
|
||||
|
|
|
@ -50,12 +50,6 @@ try:
|
|||
except ImportError:
|
||||
HAS_CPROFILE = False
|
||||
|
||||
try:
|
||||
import Crypto.Random
|
||||
HAS_CRYPTO = True
|
||||
except ImportError:
|
||||
HAS_CRYPTO = False
|
||||
|
||||
try:
|
||||
import timelib
|
||||
HAS_TIMELIB = True
|
||||
|
@ -74,12 +68,6 @@ try:
|
|||
except ImportError:
|
||||
HAS_WIN32API = False
|
||||
|
||||
try:
|
||||
import setproctitle
|
||||
HAS_SETPROCTITLE = True
|
||||
except ImportError:
|
||||
HAS_SETPROCTITLE = False
|
||||
|
||||
try:
|
||||
import ctypes
|
||||
import ctypes.util
|
||||
|
@ -173,91 +161,6 @@ def get_master_key(key_user, opts, skip_perm_errors=False):
|
|||
return ''
|
||||
|
||||
|
||||
def reinit_crypto():
|
||||
'''
|
||||
When a fork arises, pycrypto needs to reinit
|
||||
From its doc::
|
||||
|
||||
Caveat: For the random number generator to work correctly,
|
||||
you must call Random.atfork() in both the parent and
|
||||
child processes after using os.fork()
|
||||
|
||||
'''
|
||||
if HAS_CRYPTO:
|
||||
Crypto.Random.atfork()
|
||||
|
||||
|
||||
def daemonize(redirect_out=True):
|
||||
'''
|
||||
Daemonize a process
|
||||
'''
|
||||
# Late import to avoid circular import.
|
||||
import salt.utils.files
|
||||
|
||||
try:
|
||||
pid = os.fork()
|
||||
if pid > 0:
|
||||
# exit first parent
|
||||
reinit_crypto()
|
||||
sys.exit(salt.defaults.exitcodes.EX_OK)
|
||||
except OSError as exc:
|
||||
log.error(
|
||||
'fork #1 failed: {0} ({1})'.format(exc.errno, exc.strerror)
|
||||
)
|
||||
sys.exit(salt.defaults.exitcodes.EX_GENERIC)
|
||||
|
||||
# decouple from parent environment
|
||||
os.chdir('/')
|
||||
# noinspection PyArgumentList
|
||||
os.setsid()
|
||||
os.umask(18)
|
||||
|
||||
# do second fork
|
||||
try:
|
||||
pid = os.fork()
|
||||
if pid > 0:
|
||||
reinit_crypto()
|
||||
sys.exit(salt.defaults.exitcodes.EX_OK)
|
||||
except OSError as exc:
|
||||
log.error(
|
||||
'fork #2 failed: {0} ({1})'.format(
|
||||
exc.errno, exc.strerror
|
||||
)
|
||||
)
|
||||
sys.exit(salt.defaults.exitcodes.EX_GENERIC)
|
||||
|
||||
reinit_crypto()
|
||||
|
||||
# A normal daemonization redirects the process output to /dev/null.
|
||||
# Unfortunately when a python multiprocess is called the output is
|
||||
# not cleanly redirected and the parent process dies when the
|
||||
# multiprocessing process attempts to access stdout or err.
|
||||
if redirect_out:
|
||||
with salt.utils.files.fopen('/dev/null', 'r+') as dev_null:
|
||||
# Redirect python stdin/out/err
|
||||
# and the os stdin/out/err which can be different
|
||||
os.dup2(dev_null.fileno(), sys.stdin.fileno())
|
||||
os.dup2(dev_null.fileno(), sys.stdout.fileno())
|
||||
os.dup2(dev_null.fileno(), sys.stderr.fileno())
|
||||
os.dup2(dev_null.fileno(), 0)
|
||||
os.dup2(dev_null.fileno(), 1)
|
||||
os.dup2(dev_null.fileno(), 2)
|
||||
|
||||
|
||||
def daemonize_if(opts):
|
||||
'''
|
||||
Daemonize a module function process if multiprocessing is True and the
|
||||
process is not being called by salt-call
|
||||
'''
|
||||
if 'salt-call' in sys.argv[0]:
|
||||
return
|
||||
if not opts.get('multiprocessing', True):
|
||||
return
|
||||
if sys.platform.startswith('win'):
|
||||
return
|
||||
daemonize(False)
|
||||
|
||||
|
||||
def profile_func(filename=None):
|
||||
'''
|
||||
Decorator for adding profiling to a nested function in Salt
|
||||
|
@ -1385,14 +1288,6 @@ def import_json():
|
|||
continue
|
||||
|
||||
|
||||
def appendproctitle(name):
|
||||
'''
|
||||
Append "name" to the current process title
|
||||
'''
|
||||
if HAS_SETPROCTITLE:
|
||||
setproctitle.setproctitle(setproctitle.getproctitle() + ' ' + name)
|
||||
|
||||
|
||||
def human_size_to_bytes(human_size):
|
||||
'''
|
||||
Convert human-readable units to bytes
|
||||
|
@ -1517,6 +1412,58 @@ def fnmatch_multiple(candidates, pattern):
|
|||
# MOVED FUNCTIONS
|
||||
#
|
||||
# These are deprecated and will be removed in Neon.
|
||||
def appendproctitle(name):
|
||||
# Late import to avoid circular import.
|
||||
import salt.utils.versions
|
||||
import salt.utils.process
|
||||
salt.utils.versions.warn_until(
|
||||
'Neon',
|
||||
'Use of \'salt.utils.appendproctitle\' detected. This function has been '
|
||||
'moved to \'salt.utils.process.appendproctitle\' as of Salt Oxygen. '
|
||||
'This warning will be removed in Salt Neon.'
|
||||
)
|
||||
return salt.utils.process.appendproctitle(name)
|
||||
|
||||
|
||||
def daemonize(redirect_out=True):
|
||||
# Late import to avoid circular import.
|
||||
import salt.utils.versions
|
||||
import salt.utils.process
|
||||
salt.utils.versions.warn_until(
|
||||
'Neon',
|
||||
'Use of \'salt.utils.daemonize\' detected. This function has been '
|
||||
'moved to \'salt.utils.process.daemonize\' as of Salt Oxygen. '
|
||||
'This warning will be removed in Salt Neon.'
|
||||
)
|
||||
return salt.utils.process.daemonize(redirect_out)
|
||||
|
||||
|
||||
def daemonize_if(opts):
|
||||
# Late import to avoid circular import.
|
||||
import salt.utils.versions
|
||||
import salt.utils.process
|
||||
salt.utils.versions.warn_until(
|
||||
'Neon',
|
||||
'Use of \'salt.utils.daemonize_if\' detected. This function has been '
|
||||
'moved to \'salt.utils.process.daemonize_if\' as of Salt Oxygen. '
|
||||
'This warning will be removed in Salt Neon.'
|
||||
)
|
||||
return salt.utils.process.daemonize_if(opts)
|
||||
|
||||
|
||||
def reinit_crypto():
|
||||
# Late import to avoid circular import.
|
||||
import salt.utils.versions
|
||||
import salt.utils.crypt
|
||||
salt.utils.versions.warn_until(
|
||||
'Neon',
|
||||
'Use of \'salt.utils.reinit_crypto\' detected. This function has been '
|
||||
'moved to \'salt.utils.crypt.reinit_crypto\' as of Salt Oxygen. '
|
||||
'This warning will be removed in Salt Neon.'
|
||||
)
|
||||
return salt.utils.crypt.reinit_crypto()
|
||||
|
||||
|
||||
def to_bytes(s, encoding=None):
|
||||
# Late import to avoid circular import.
|
||||
import salt.utils.versions
|
||||
|
|
|
@ -72,7 +72,6 @@ import tornado.iostream
|
|||
# Import salt libs
|
||||
import salt.config
|
||||
import salt.payload
|
||||
import salt.utils
|
||||
import salt.utils.async
|
||||
import salt.utils.cache
|
||||
import salt.utils.dicttrim
|
||||
|
@ -1078,7 +1077,7 @@ class EventPublisher(salt.utils.process.SignalHandlingMultiprocessingProcess):
|
|||
'''
|
||||
Bind the pub and pull sockets for events
|
||||
'''
|
||||
salt.utils.appendproctitle(self.__class__.__name__)
|
||||
salt.utils.process.appendproctitle(self.__class__.__name__)
|
||||
self.io_loop = tornado.ioloop.IOLoop()
|
||||
with salt.utils.async.current_ioloop(self.io_loop):
|
||||
if self.opts['ipc_mode'] == 'tcp':
|
||||
|
@ -1243,7 +1242,7 @@ class EventReturn(salt.utils.process.SignalHandlingMultiprocessingProcess):
|
|||
'''
|
||||
Spin up the multiprocess event returner
|
||||
'''
|
||||
salt.utils.appendproctitle(self.__class__.__name__)
|
||||
salt.utils.process.appendproctitle(self.__class__.__name__)
|
||||
self.event = get_event('master', opts=self.opts, listen=True)
|
||||
events = self.event.iter_events(full=True)
|
||||
self.event.fire_event({}, 'salt/event_listen/start')
|
||||
|
|
|
@ -36,6 +36,7 @@ import salt.utils.files
|
|||
import salt.utils.jid
|
||||
import salt.utils.kinds as kinds
|
||||
import salt.utils.platform
|
||||
import salt.utils.process
|
||||
import salt.utils.user
|
||||
import salt.utils.xdg
|
||||
from salt.defaults import DEFAULT_TARGET_DELIM
|
||||
|
@ -1004,7 +1005,7 @@ class DaemonMixIn(six.with_metaclass(MixInMeta, object)):
|
|||
log.shutdown_multiprocessing_logging_listener(daemonizing=True)
|
||||
|
||||
# Late import so logging works correctly
|
||||
salt.utils.daemonize()
|
||||
salt.utils.process.daemonize()
|
||||
|
||||
# Setup the multiprocessing log queue listener if enabled
|
||||
self._setup_mp_logging_listener()
|
||||
|
|
|
@ -20,7 +20,6 @@ import socket
|
|||
|
||||
# Import salt libs
|
||||
import salt.defaults.exitcodes
|
||||
import salt.utils # Can be removed once appendproctitle is moved
|
||||
import salt.utils.files
|
||||
import salt.utils.path
|
||||
import salt.utils.platform
|
||||
|
@ -43,6 +42,90 @@ try:
|
|||
except ImportError:
|
||||
pass
|
||||
|
||||
try:
|
||||
import setproctitle
|
||||
HAS_SETPROCTITLE = True
|
||||
except ImportError:
|
||||
HAS_SETPROCTITLE = False
|
||||
|
||||
|
||||
def appendproctitle(name):
|
||||
'''
|
||||
Append "name" to the current process title
|
||||
'''
|
||||
if HAS_SETPROCTITLE:
|
||||
setproctitle.setproctitle(setproctitle.getproctitle() + ' ' + name)
|
||||
|
||||
|
||||
def daemonize(redirect_out=True):
|
||||
'''
|
||||
Daemonize a process
|
||||
'''
|
||||
# Avoid circular import
|
||||
import salt.utils.crypt
|
||||
try:
|
||||
pid = os.fork()
|
||||
if pid > 0:
|
||||
# exit first parent
|
||||
salt.utils.crypt.reinit_crypto()
|
||||
sys.exit(salt.defaults.exitcodes.EX_OK)
|
||||
except OSError as exc:
|
||||
log.error(
|
||||
'fork #1 failed: {0} ({1})'.format(exc.errno, exc.strerror)
|
||||
)
|
||||
sys.exit(salt.defaults.exitcodes.EX_GENERIC)
|
||||
|
||||
# decouple from parent environment
|
||||
os.chdir('/')
|
||||
# noinspection PyArgumentList
|
||||
os.setsid()
|
||||
os.umask(18)
|
||||
|
||||
# do second fork
|
||||
try:
|
||||
pid = os.fork()
|
||||
if pid > 0:
|
||||
salt.utils.crypt.reinit_crypto()
|
||||
sys.exit(salt.defaults.exitcodes.EX_OK)
|
||||
except OSError as exc:
|
||||
log.error(
|
||||
'fork #2 failed: {0} ({1})'.format(
|
||||
exc.errno, exc.strerror
|
||||
)
|
||||
)
|
||||
sys.exit(salt.defaults.exitcodes.EX_GENERIC)
|
||||
|
||||
salt.utils.crypt.reinit_crypto()
|
||||
|
||||
# A normal daemonization redirects the process output to /dev/null.
|
||||
# Unfortunately when a python multiprocess is called the output is
|
||||
# not cleanly redirected and the parent process dies when the
|
||||
# multiprocessing process attempts to access stdout or err.
|
||||
if redirect_out:
|
||||
with salt.utils.files.fopen('/dev/null', 'r+') as dev_null:
|
||||
# Redirect python stdin/out/err
|
||||
# and the os stdin/out/err which can be different
|
||||
os.dup2(dev_null.fileno(), sys.stdin.fileno())
|
||||
os.dup2(dev_null.fileno(), sys.stdout.fileno())
|
||||
os.dup2(dev_null.fileno(), sys.stderr.fileno())
|
||||
os.dup2(dev_null.fileno(), 0)
|
||||
os.dup2(dev_null.fileno(), 1)
|
||||
os.dup2(dev_null.fileno(), 2)
|
||||
|
||||
|
||||
def daemonize_if(opts):
|
||||
'''
|
||||
Daemonize a module function process if multiprocessing is True and the
|
||||
process is not being called by salt-call
|
||||
'''
|
||||
if 'salt-call' in sys.argv[0]:
|
||||
return
|
||||
if not opts.get('multiprocessing', True):
|
||||
return
|
||||
if sys.platform.startswith('win'):
|
||||
return
|
||||
daemonize(False)
|
||||
|
||||
|
||||
def systemd_notify_call(action):
|
||||
process = subprocess.Popen(['systemd-notify', action], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
|
@ -397,7 +480,7 @@ class ProcessManager(object):
|
|||
Load and start all available api modules
|
||||
'''
|
||||
log.debug('Process Manager starting!')
|
||||
salt.utils.appendproctitle(self.name)
|
||||
appendproctitle(self.name)
|
||||
|
||||
# make sure to kill the subprocesses if the parent is killed
|
||||
if signal.getsignal(signal.SIGTERM) is signal.SIG_DFL:
|
||||
|
|
|
@ -232,7 +232,7 @@ class Reactor(salt.utils.process.SignalHandlingMultiprocessingProcess, salt.stat
|
|||
'''
|
||||
Enter into the server loop
|
||||
'''
|
||||
salt.utils.appendproctitle(self.__class__.__name__)
|
||||
salt.utils.process.appendproctitle(self.__class__.__name__)
|
||||
|
||||
# instantiate some classes inside our new process
|
||||
self.event = salt.utils.event.get_event(
|
||||
|
|
|
@ -337,7 +337,6 @@ import copy
|
|||
|
||||
# Import Salt libs
|
||||
import salt.config
|
||||
import salt.utils # Can be removed once appendproctitle and daemonize_if are moved
|
||||
import salt.utils.args
|
||||
import salt.utils.error
|
||||
import salt.utils.event
|
||||
|
@ -779,7 +778,7 @@ class Schedule(object):
|
|||
log.warning('schedule: The metadata parameter must be '
|
||||
'specified as a dictionary. Ignoring.')
|
||||
|
||||
salt.utils.appendproctitle('{0} {1}'.format(self.__class__.__name__, ret['jid']))
|
||||
salt.utils.process.appendproctitle('{0} {1}'.format(self.__class__.__name__, ret['jid']))
|
||||
|
||||
if not self.standalone:
|
||||
proc_fn = os.path.join(
|
||||
|
@ -818,7 +817,7 @@ class Schedule(object):
|
|||
log_setup.setup_multiprocessing_logging()
|
||||
|
||||
# Don't *BEFORE* to go into try to don't let it triple execute the finally section.
|
||||
salt.utils.daemonize_if(self.opts)
|
||||
salt.utils.process.daemonize_if(self.opts)
|
||||
|
||||
# TODO: Make it readable! Splt to funcs, remove nested try-except-finally sections.
|
||||
try:
|
||||
|
|
|
@ -52,7 +52,7 @@ except ImportError:
|
|||
import resource
|
||||
|
||||
# Import salt libs
|
||||
import salt.utils
|
||||
import salt.utils.crypt
|
||||
import salt.utils.stringutils
|
||||
from salt.ext.six import string_types
|
||||
from salt.log.setup import LOG_LEVELS
|
||||
|
@ -493,7 +493,7 @@ class Terminal(object):
|
|||
# Close parent FDs
|
||||
os.close(stdout_parent_fd)
|
||||
os.close(stderr_parent_fd)
|
||||
salt.utils.reinit_crypto()
|
||||
salt.utils.crypt.reinit_crypto()
|
||||
|
||||
# ----- Make STDOUT the controlling PTY --------------------->
|
||||
child_name = os.ttyname(stdout_child_fd)
|
||||
|
@ -554,7 +554,7 @@ class Terminal(object):
|
|||
# <---- Duplicate Descriptors --------------------------------
|
||||
else:
|
||||
# Parent. Close Child PTY's
|
||||
salt.utils.reinit_crypto()
|
||||
salt.utils.crypt.reinit_crypto()
|
||||
os.close(stdout_child_fd)
|
||||
os.close(stderr_child_fd)
|
||||
|
||||
|
|
|
@ -49,7 +49,6 @@ import salt.minion
|
|||
import salt.runner
|
||||
import salt.output
|
||||
import salt.version
|
||||
import salt.utils # Can be removed once appendproctitle is moved
|
||||
import salt.utils.color
|
||||
import salt.utils.files
|
||||
import salt.utils.path
|
||||
|
@ -261,7 +260,7 @@ class TestDaemon(object):
|
|||
|
||||
def start_daemon(self, cls, opts, start_fun):
|
||||
def start(cls, opts, start_fun):
|
||||
salt.utils.appendproctitle('{0}-{1}'.format(self.__class__.__name__, cls.__name__))
|
||||
salt.utils.process.appendproctitle('{0}-{1}'.format(self.__class__.__name__, cls.__name__))
|
||||
daemon = cls(opts)
|
||||
getattr(daemon, start_fun)()
|
||||
process = multiprocessing.Process(target=start,
|
||||
|
@ -1143,7 +1142,7 @@ class TestDaemon(object):
|
|||
]
|
||||
|
||||
def wait_for_minion_connections(self, targets, timeout):
|
||||
salt.utils.appendproctitle('WaitForMinionConnections')
|
||||
salt.utils.process.appendproctitle('WaitForMinionConnections')
|
||||
sys.stdout.write(
|
||||
' {LIGHT_BLUE}*{ENDC} Waiting at most {0} for minions({1}) to '
|
||||
'connect back\n'.format(
|
||||
|
@ -1286,13 +1285,13 @@ class TestDaemon(object):
|
|||
return True
|
||||
|
||||
def sync_minion_states(self, targets, timeout=None):
|
||||
salt.utils.appendproctitle('SyncMinionStates')
|
||||
salt.utils.process.appendproctitle('SyncMinionStates')
|
||||
self.sync_minion_modules_('states', targets, timeout=timeout)
|
||||
|
||||
def sync_minion_modules(self, targets, timeout=None):
|
||||
salt.utils.appendproctitle('SyncMinionModules')
|
||||
salt.utils.process.appendproctitle('SyncMinionModules')
|
||||
self.sync_minion_modules_('modules', targets, timeout=timeout)
|
||||
|
||||
def sync_minion_grains(self, targets, timeout=None):
|
||||
salt.utils.appendproctitle('SyncMinionGrains')
|
||||
salt.utils.process.appendproctitle('SyncMinionGrains')
|
||||
self.sync_minion_modules_('grains', targets, timeout=timeout)
|
||||
|
|
|
@ -10,9 +10,13 @@ import multiprocessing
|
|||
|
||||
# Import Salt Testing libs
|
||||
from tests.support.unit import TestCase, skipIf
|
||||
from tests.support.mock import (
|
||||
patch,
|
||||
NO_MOCK,
|
||||
NO_MOCK_REASON
|
||||
)
|
||||
|
||||
# Import salt libs
|
||||
import salt.utils
|
||||
import salt.utils.process
|
||||
|
||||
# Import 3rd-party libs
|
||||
|
@ -27,7 +31,7 @@ class TestProcessManager(TestCase):
|
|||
Make sure that the process is alive 2s later
|
||||
'''
|
||||
def spin():
|
||||
salt.utils.appendproctitle('test_basic')
|
||||
salt.utils.process.appendproctitle('test_basic')
|
||||
while True:
|
||||
time.sleep(1)
|
||||
|
||||
|
@ -50,7 +54,7 @@ class TestProcessManager(TestCase):
|
|||
|
||||
def test_kill(self):
|
||||
def spin():
|
||||
salt.utils.appendproctitle('test_kill')
|
||||
salt.utils.process.appendproctitle('test_kill')
|
||||
while True:
|
||||
time.sleep(1)
|
||||
|
||||
|
@ -79,7 +83,7 @@ class TestProcessManager(TestCase):
|
|||
Make sure that the process is alive 2s later
|
||||
'''
|
||||
def die():
|
||||
salt.utils.appendproctitle('test_restarting')
|
||||
salt.utils.process.appendproctitle('test_restarting')
|
||||
|
||||
process_manager = salt.utils.process.ProcessManager()
|
||||
process_manager.add_process(die)
|
||||
|
@ -101,7 +105,7 @@ class TestProcessManager(TestCase):
|
|||
@skipIf(sys.version_info < (2, 7), 'Needs > Py 2.7 due to bug in stdlib')
|
||||
def test_counter(self):
|
||||
def incr(counter, num):
|
||||
salt.utils.appendproctitle('test_counter')
|
||||
salt.utils.process.appendproctitle('test_counter')
|
||||
for _ in range(0, num):
|
||||
counter.value += 1
|
||||
counter = multiprocessing.Value('i', 0)
|
||||
|
@ -162,3 +166,25 @@ class TestThreadPool(TestCase):
|
|||
self.assertEqual(counter.value, 0)
|
||||
# make sure the queue is still full
|
||||
self.assertEqual(pool._job_queue.qsize(), 1)
|
||||
|
||||
|
||||
class TestProcess(TestCase):
|
||||
|
||||
@skipIf(NO_MOCK, NO_MOCK_REASON)
|
||||
def test_daemonize_if(self):
|
||||
# pylint: disable=assignment-from-none
|
||||
with patch('sys.argv', ['salt-call']):
|
||||
ret = salt.utils.process.daemonize_if({})
|
||||
self.assertEqual(None, ret)
|
||||
|
||||
ret = salt.utils.process.daemonize_if({'multiprocessing': False})
|
||||
self.assertEqual(None, ret)
|
||||
|
||||
with patch('sys.platform', 'win'):
|
||||
ret = salt.utils.process.daemonize_if({})
|
||||
self.assertEqual(None, ret)
|
||||
|
||||
with patch('salt.utils.process.daemonize'):
|
||||
salt.utils.process.daemonize_if({})
|
||||
self.assertTrue(salt.utils.process.daemonize.called)
|
||||
# pylint: enable=assignment-from-none
|
||||
|
|
|
@ -18,6 +18,7 @@ from tests.support.mock import (
|
|||
import salt.utils
|
||||
import salt.utils.data
|
||||
import salt.utils.jid
|
||||
import salt.utils.process
|
||||
import salt.utils.yamlencoding
|
||||
import salt.utils.zeromq
|
||||
from salt.exceptions import SaltSystemExit, CommandNotFoundError
|
||||
|
@ -432,25 +433,6 @@ class UtilsTestCase(TestCase):
|
|||
ret = salt.utils.data.repack_dictlist(LOREM_IPSUM)
|
||||
self.assertDictEqual(ret, {})
|
||||
|
||||
@skipIf(NO_MOCK, NO_MOCK_REASON)
|
||||
def test_daemonize_if(self):
|
||||
# pylint: disable=assignment-from-none
|
||||
with patch('sys.argv', ['salt-call']):
|
||||
ret = salt.utils.daemonize_if({})
|
||||
self.assertEqual(None, ret)
|
||||
|
||||
ret = salt.utils.daemonize_if({'multiprocessing': False})
|
||||
self.assertEqual(None, ret)
|
||||
|
||||
with patch('sys.platform', 'win'):
|
||||
ret = salt.utils.daemonize_if({})
|
||||
self.assertEqual(None, ret)
|
||||
|
||||
with patch('salt.utils.daemonize'):
|
||||
salt.utils.daemonize_if({})
|
||||
self.assertTrue(salt.utils.daemonize.called)
|
||||
# pylint: enable=assignment-from-none
|
||||
|
||||
@skipIf(NO_MOCK, NO_MOCK_REASON)
|
||||
def test_gen_jid(self):
|
||||
now = datetime.datetime(2002, 12, 25, 12, 00, 00, 00)
|
||||
|
|
Loading…
Add table
Reference in a new issue