mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Merge pull request #53258 from s0undt3ch/2019.2.1
[2019.2.1] Fix multiprocessing logging queue dict changing during iteration errors
This commit is contained in:
commit
d011beb90f
6 changed files with 184 additions and 39 deletions
|
@ -11,6 +11,7 @@ from __future__ import absolute_import, print_function, unicode_literals
|
|||
|
||||
# Import python libs
|
||||
import sys
|
||||
import copy
|
||||
import logging
|
||||
import threading
|
||||
import logging.handlers
|
||||
|
@ -193,32 +194,33 @@ if sys.version_info < (3, 2):
|
|||
self.queue.put_nowait(record)
|
||||
except queue.Full:
|
||||
sys.stderr.write('[WARNING ] Message queue is full, '
|
||||
'unable to write "{0}" to log'.format(record)
|
||||
)
|
||||
'unable to write "{0}" to log'.format(record))
|
||||
|
||||
def prepare(self, record):
|
||||
'''
|
||||
Prepares a record for queuing. The object returned by this method is
|
||||
enqueued.
|
||||
|
||||
The base implementation formats the record to merge the message
|
||||
and arguments, and removes unpickleable items from the record
|
||||
in-place.
|
||||
|
||||
You might want to override this method if you want to convert
|
||||
the record to a dict or JSON string, or send a modified copy
|
||||
of the record while leaving the original intact.
|
||||
'''
|
||||
# The format operation gets traceback text into record.exc_text
|
||||
# (if there's exception data), and also puts the message into
|
||||
# record.message. We can then use this to replace the original
|
||||
# (if there's exception data), and also returns the formatted
|
||||
# message. We can then use this to replace the original
|
||||
# msg + args, as these might be unpickleable. We also zap the
|
||||
# exc_info attribute, as it's no longer needed and, if not None,
|
||||
# will typically not be pickleable.
|
||||
self.format(record)
|
||||
record.msg = record.getMessage()
|
||||
# exc_info and exc_text attributes, as they are no longer
|
||||
# needed and, if not None, will typically not be pickleable.
|
||||
msg = self.format(record)
|
||||
# bpo-35726: make copy of record to avoid affecting other handlers in the chain.
|
||||
record = copy.copy(record)
|
||||
record.message = msg
|
||||
record.msg = msg
|
||||
record.args = None
|
||||
record.exc_info = None
|
||||
record.exc_text = None
|
||||
return record
|
||||
|
||||
def emit(self, record):
|
||||
|
@ -231,6 +233,64 @@ if sys.version_info < (3, 2):
|
|||
self.enqueue(self.prepare(record))
|
||||
except Exception:
|
||||
self.handleError(record)
|
||||
elif sys.version_info < (3, 7):
|
||||
# On python versions lower than 3.7, we sill subclass and overwrite prepare to include the fix for:
|
||||
# https://bugs.python.org/issue35726
|
||||
class QueueHandler(ExcInfoOnLogLevelFormatMixIn, logging.handlers.QueueHandler): # pylint: disable=no-member,E0240
|
||||
|
||||
def enqueue(self, record):
|
||||
'''
|
||||
Enqueue a record.
|
||||
|
||||
The base implementation uses put_nowait. You may want to override
|
||||
this method if you want to use blocking, timeouts or custom queue
|
||||
implementations.
|
||||
'''
|
||||
try:
|
||||
self.queue.put_nowait(record)
|
||||
except queue.Full:
|
||||
sys.stderr.write('[WARNING ] Message queue is full, '
|
||||
'unable to write "{0}" to log'.format(record))
|
||||
|
||||
def prepare(self, record):
|
||||
'''
|
||||
Prepares a record for queuing. The object returned by this method is
|
||||
enqueued.
|
||||
The base implementation formats the record to merge the message
|
||||
and arguments, and removes unpickleable items from the record
|
||||
in-place.
|
||||
You might want to override this method if you want to convert
|
||||
the record to a dict or JSON string, or send a modified copy
|
||||
of the record while leaving the original intact.
|
||||
'''
|
||||
# The format operation gets traceback text into record.exc_text
|
||||
# (if there's exception data), and also returns the formatted
|
||||
# message. We can then use this to replace the original
|
||||
# msg + args, as these might be unpickleable. We also zap the
|
||||
# exc_info and exc_text attributes, as they are no longer
|
||||
# needed and, if not None, will typically not be pickleable.
|
||||
msg = self.format(record)
|
||||
# bpo-35726: make copy of record to avoid affecting other handlers in the chain.
|
||||
record = copy.copy(record)
|
||||
record.message = msg
|
||||
record.msg = msg
|
||||
record.args = None
|
||||
record.exc_info = None
|
||||
record.exc_text = None
|
||||
return record
|
||||
else:
|
||||
class QueueHandler(ExcInfoOnLogLevelFormatMixIn, logging.handlers.QueueHandler): # pylint: disable=no-member,E0240
|
||||
pass
|
||||
|
||||
def enqueue(self, record):
|
||||
'''
|
||||
Enqueue a record.
|
||||
|
||||
The base implementation uses put_nowait. You may want to override
|
||||
this method if you want to use blocking, timeouts or custom queue
|
||||
implementations.
|
||||
'''
|
||||
try:
|
||||
self.queue.put_nowait(record)
|
||||
except queue.Full:
|
||||
sys.stderr.write('[WARNING ] Message queue is full, '
|
||||
'unable to write "{0}" to log'.format(record))
|
||||
|
|
|
@ -178,9 +178,9 @@ LOGGING_STORE_HANDLER = __StoreLoggingHandler()
|
|||
|
||||
|
||||
class SaltLogQueueHandler(QueueHandler):
|
||||
def prepare(self, record):
|
||||
record = QueueHandler.prepare(self, record)
|
||||
return record.__dict__.copy()
|
||||
'''
|
||||
Subclassed just to differentiate when debugging
|
||||
'''
|
||||
|
||||
|
||||
class SaltLogRecord(logging.LogRecord):
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
minion_blackout: False
|
|
@ -7,16 +7,19 @@ Tests for minion blackout
|
|||
from __future__ import absolute_import
|
||||
import os
|
||||
import time
|
||||
import logging
|
||||
import textwrap
|
||||
|
||||
# Import Salt Testing libs
|
||||
from tests.support.case import ModuleCase
|
||||
from tests.support.paths import PILLAR_DIR
|
||||
from tests.support.helpers import flaky
|
||||
from tests.support.runtests import RUNTIME_VARS
|
||||
|
||||
# Import Salt libs
|
||||
import salt.utils.files
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MinionBlackoutTestCase(ModuleCase):
|
||||
'''
|
||||
|
@ -25,32 +28,54 @@ class MinionBlackoutTestCase(ModuleCase):
|
|||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.blackout_pillar = os.path.join(PILLAR_DIR, 'base', 'blackout.sls')
|
||||
cls.blackout_pillar = os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, 'blackout.sls')
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
if os.path.exists(cls.blackout_pillar):
|
||||
os.unlink(cls.blackout_pillar)
|
||||
del cls.blackout_pillar
|
||||
|
||||
def setUp(self):
|
||||
with salt.utils.files.fopen(self.blackout_pillar, 'w') as wfh:
|
||||
wfh.write('minion_blackout: False')
|
||||
self.addCleanup(self.cleanup_blackout_pillar)
|
||||
|
||||
def tearDown(self):
|
||||
self.end_blackout(sleep=False)
|
||||
# Be sure to also refresh the sub_minion pillar
|
||||
self.run_function('saltutil.refresh_pillar', minion_tgt='sub_minion')
|
||||
time.sleep(10) # wait for minion to exit blackout mode
|
||||
self.wait_for_all_jobs()
|
||||
|
||||
def cleanup_blackout_pillar(self):
|
||||
if os.path.exists(self.blackout_pillar):
|
||||
os.unlink(self.blackout_pillar)
|
||||
|
||||
def begin_blackout(self, blackout_data='minion_blackout: True'):
|
||||
'''
|
||||
setup minion blackout mode
|
||||
'''
|
||||
log.info('Entering minion blackout...')
|
||||
self.wait_for_all_jobs()
|
||||
with salt.utils.files.fopen(self.blackout_pillar, 'w') as wfh:
|
||||
wfh.write(blackout_data)
|
||||
self.run_function('saltutil.refresh_pillar')
|
||||
time.sleep(10) # wait for minion to enter blackout mode
|
||||
log.info('Entered minion blackout.')
|
||||
|
||||
def end_blackout(self, sleep=True):
|
||||
'''
|
||||
takedown minion blackout mode
|
||||
'''
|
||||
log.info('Exiting minion blackout...')
|
||||
with salt.utils.files.fopen(self.blackout_pillar, 'w') as wfh:
|
||||
wfh.write('minion_blackout: False\n')
|
||||
self.run_function('saltutil.refresh_pillar')
|
||||
if sleep:
|
||||
time.sleep(10) # wait for minion to exit blackout mode
|
||||
self.wait_for_all_jobs()
|
||||
log.info('Exited minion blackout.')
|
||||
|
||||
@flaky
|
||||
def test_blackout(self):
|
||||
|
|
|
@ -5,6 +5,7 @@ Test the salt mine system
|
|||
# Import Python libs
|
||||
from __future__ import absolute_import, print_function, unicode_literals
|
||||
import time
|
||||
import pprint
|
||||
|
||||
# Import Salt Testing libs
|
||||
from tests.support.case import ModuleCase
|
||||
|
@ -14,17 +15,24 @@ class MineTest(ModuleCase):
|
|||
'''
|
||||
Test the mine system
|
||||
'''
|
||||
def setUp(self):
|
||||
self.wait_for_all_jobs()
|
||||
|
||||
def test_get(self):
|
||||
'''
|
||||
test mine.get and mine.update
|
||||
'''
|
||||
self.assertTrue(self.run_function('mine.update', minion_tgt='minion'))
|
||||
self.assertTrue(
|
||||
# The sub_minion does not have mine_functions defined in its configuration
|
||||
# In this case, mine.update returns None
|
||||
self.assertIsNone(
|
||||
self.run_function(
|
||||
'mine.update',
|
||||
minion_tgt='sub_minion'
|
||||
)
|
||||
)
|
||||
# Since the minion has mine_functions defined in its configuration,
|
||||
# mine.update will return True
|
||||
self.assertTrue(
|
||||
self.run_function(
|
||||
'mine.get',
|
||||
|
@ -110,29 +118,57 @@ class MineTest(ModuleCase):
|
|||
'''
|
||||
Test mine.delete
|
||||
'''
|
||||
# TODO The calls to sleep were added in an attempt to make this tests
|
||||
# less flaky. If we still see it fail we need to look for a more robust
|
||||
# solution.
|
||||
self.assertTrue(
|
||||
self.run_function(
|
||||
'mine.send',
|
||||
['grains.items']
|
||||
['grains.items'],
|
||||
minion_tgt='minion'
|
||||
)
|
||||
)
|
||||
time.sleep(1)
|
||||
# Smoke testing that grains should now exist in the mine
|
||||
ret_grains = self.run_function(
|
||||
'mine.get',
|
||||
['minion', 'grains.items']
|
||||
self.wait_for_all_jobs(minions=('minion',))
|
||||
|
||||
attempts = 10
|
||||
ret_grains = None
|
||||
while True:
|
||||
if ret_grains:
|
||||
break
|
||||
# Smoke testing that grains should now exist in the mine
|
||||
ret_grains = self.run_function(
|
||||
'mine.get',
|
||||
['minion', 'grains.items'],
|
||||
minion_tgt='minion'
|
||||
)
|
||||
if ret_grains and 'minion' in ret_grains:
|
||||
break
|
||||
|
||||
if attempts:
|
||||
attempts -= 1
|
||||
|
||||
if attempts:
|
||||
time.sleep(1.5)
|
||||
continue
|
||||
|
||||
self.fail(
|
||||
'\'minion\' was not found as a key of the \'mine.get\' \'grains.items\' call. Full return: {}'.format(
|
||||
pprint.pformat(ret_grains)
|
||||
)
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
ret_grains['minion']['id'], 'minion',
|
||||
msg='{} != minion, full return payload: {}'.format(
|
||||
ret_grains['minion']['id'],
|
||||
pprint.pformat(ret_grains)
|
||||
)
|
||||
)
|
||||
self.assertEqual(ret_grains['minion']['id'], 'minion')
|
||||
self.assertTrue(
|
||||
self.run_function(
|
||||
'mine.send',
|
||||
['test.arg', 'foo=bar', 'fnord=roscivs'],
|
||||
minion_tgt='minion'
|
||||
)
|
||||
)
|
||||
time.sleep(1)
|
||||
self.wait_for_all_jobs(minions=('minion',))
|
||||
ret_args = self.run_function(
|
||||
'mine.get',
|
||||
['minion', 'test.arg']
|
||||
|
@ -151,26 +187,30 @@ class MineTest(ModuleCase):
|
|||
self.assertTrue(
|
||||
self.run_function(
|
||||
'mine.send',
|
||||
['test.echo', 'foo']
|
||||
['test.echo', 'foo'],
|
||||
minion_tgt='minion'
|
||||
)
|
||||
)
|
||||
time.sleep(1)
|
||||
self.wait_for_all_jobs(minions=('minion',))
|
||||
ret_echo = self.run_function(
|
||||
'mine.get',
|
||||
['minion', 'test.echo']
|
||||
['minion', 'test.echo'],
|
||||
minion_tgt='minion'
|
||||
)
|
||||
# Smoke testing that we were also able to set test.echo in the mine
|
||||
self.assertEqual(ret_echo['minion'], 'foo')
|
||||
self.assertTrue(
|
||||
self.run_function(
|
||||
'mine.delete',
|
||||
['test.arg']
|
||||
['test.arg'],
|
||||
minion_tgt='minion'
|
||||
)
|
||||
)
|
||||
time.sleep(1)
|
||||
self.wait_for_all_jobs(minions=('minion',))
|
||||
ret_arg_deleted = self.run_function(
|
||||
'mine.get',
|
||||
['minion', 'test.arg']
|
||||
['minion', 'test.arg'],
|
||||
minion_tgt='minion'
|
||||
)
|
||||
# Now comes the real test - did we obliterate test.arg from the mine?
|
||||
# We could assert this a different way, but there shouldn't be any
|
||||
|
@ -186,7 +226,8 @@ class MineTest(ModuleCase):
|
|||
)
|
||||
ret_echo_stays = self.run_function(
|
||||
'mine.get',
|
||||
['minion', 'test.echo']
|
||||
['minion', 'test.echo'],
|
||||
minion_tgt='minion'
|
||||
)
|
||||
# Of course, one more health check - we want targeted removal.
|
||||
# This isn't horseshoes or hand grenades - test.arg should go away
|
||||
|
|
|
@ -788,6 +788,19 @@ class ModuleCase(TestCase, SaltClientTestCaseMixin):
|
|||
Execute a module function
|
||||
'''
|
||||
|
||||
def wait_for_all_jobs(self, minions=('minion', 'sub_minion',), sleep=.3):
|
||||
'''
|
||||
Wait for all jobs currently running on the list of minions to finish
|
||||
'''
|
||||
for minion in minions:
|
||||
while True:
|
||||
ret = self.run_function('saltutil.running', minion_tgt=minion, timeout=300)
|
||||
if ret:
|
||||
log.debug('Waiting for minion\'s jobs: %s', minion)
|
||||
time.sleep(sleep)
|
||||
else:
|
||||
break
|
||||
|
||||
def minion_run(self, _function, *args, **kw):
|
||||
'''
|
||||
Run a single salt function on the 'minion' target and condition
|
||||
|
@ -800,9 +813,16 @@ class ModuleCase(TestCase, SaltClientTestCaseMixin):
|
|||
Run a single salt function and condition the return down to match the
|
||||
behavior of the raw function call
|
||||
'''
|
||||
know_to_return_none = (
|
||||
'file.chown', 'file.chgrp', 'ssh.recv_known_host_entries'
|
||||
known_to_return_none = (
|
||||
'data.get',
|
||||
'file.chown',
|
||||
'file.chgrp',
|
||||
'pkg.refresh_db',
|
||||
'ssh.recv_known_host_entries',
|
||||
'time.sleep'
|
||||
)
|
||||
if minion_tgt == 'sub_minion':
|
||||
known_to_return_none += ('mine.update',)
|
||||
if 'f_arg' in kwargs:
|
||||
kwargs['arg'] = kwargs.pop('f_arg')
|
||||
if 'f_timeout' in kwargs:
|
||||
|
@ -820,7 +840,7 @@ class ModuleCase(TestCase, SaltClientTestCaseMixin):
|
|||
minion_tgt, orig
|
||||
)
|
||||
)
|
||||
elif orig[minion_tgt] is None and function not in know_to_return_none:
|
||||
elif orig[minion_tgt] is None and function not in known_to_return_none:
|
||||
self.skipTest(
|
||||
'WARNING(SHOULD NOT HAPPEN #1935): Failed to get \'{0}\' from '
|
||||
'the minion \'{1}\'. Command output: {2}'.format(
|
||||
|
|
Loading…
Add table
Reference in a new issue