Merge pull request #53258 from s0undt3ch/2019.2.1

[2019.2.1] Fix multiprocessing logging queue dict changing during iteration errors
This commit is contained in:
Pedro Algarvio 2019-05-27 20:26:26 +01:00 committed by GitHub
commit d011beb90f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 184 additions and 39 deletions

View file

@ -11,6 +11,7 @@ from __future__ import absolute_import, print_function, unicode_literals
# Import python libs
import sys
import copy
import logging
import threading
import logging.handlers
@ -193,32 +194,33 @@ if sys.version_info < (3, 2):
self.queue.put_nowait(record)
except queue.Full:
sys.stderr.write('[WARNING ] Message queue is full, '
'unable to write "{0}" to log'.format(record)
)
'unable to write "{0}" to log'.format(record))
def prepare(self, record):
'''
Prepares a record for queuing. The object returned by this method is
enqueued.
The base implementation formats the record to merge the message
and arguments, and removes unpickleable items from the record
in-place.
You might want to override this method if you want to convert
the record to a dict or JSON string, or send a modified copy
of the record while leaving the original intact.
'''
# The format operation gets traceback text into record.exc_text
# (if there's exception data), and also puts the message into
# record.message. We can then use this to replace the original
# (if there's exception data), and also returns the formatted
# message. We can then use this to replace the original
# msg + args, as these might be unpickleable. We also zap the
# exc_info attribute, as it's no longer needed and, if not None,
# will typically not be pickleable.
self.format(record)
record.msg = record.getMessage()
# exc_info and exc_text attributes, as they are no longer
# needed and, if not None, will typically not be pickleable.
msg = self.format(record)
# bpo-35726: make copy of record to avoid affecting other handlers in the chain.
record = copy.copy(record)
record.message = msg
record.msg = msg
record.args = None
record.exc_info = None
record.exc_text = None
return record
def emit(self, record):
@ -231,6 +233,64 @@ if sys.version_info < (3, 2):
self.enqueue(self.prepare(record))
except Exception:
self.handleError(record)
elif sys.version_info < (3, 7):
# On python versions lower than 3.7, we sill subclass and overwrite prepare to include the fix for:
# https://bugs.python.org/issue35726
class QueueHandler(ExcInfoOnLogLevelFormatMixIn, logging.handlers.QueueHandler): # pylint: disable=no-member,E0240
def enqueue(self, record):
'''
Enqueue a record.
The base implementation uses put_nowait. You may want to override
this method if you want to use blocking, timeouts or custom queue
implementations.
'''
try:
self.queue.put_nowait(record)
except queue.Full:
sys.stderr.write('[WARNING ] Message queue is full, '
'unable to write "{0}" to log'.format(record))
def prepare(self, record):
'''
Prepares a record for queuing. The object returned by this method is
enqueued.
The base implementation formats the record to merge the message
and arguments, and removes unpickleable items from the record
in-place.
You might want to override this method if you want to convert
the record to a dict or JSON string, or send a modified copy
of the record while leaving the original intact.
'''
# The format operation gets traceback text into record.exc_text
# (if there's exception data), and also returns the formatted
# message. We can then use this to replace the original
# msg + args, as these might be unpickleable. We also zap the
# exc_info and exc_text attributes, as they are no longer
# needed and, if not None, will typically not be pickleable.
msg = self.format(record)
# bpo-35726: make copy of record to avoid affecting other handlers in the chain.
record = copy.copy(record)
record.message = msg
record.msg = msg
record.args = None
record.exc_info = None
record.exc_text = None
return record
else:
class QueueHandler(ExcInfoOnLogLevelFormatMixIn, logging.handlers.QueueHandler): # pylint: disable=no-member,E0240
pass
def enqueue(self, record):
'''
Enqueue a record.
The base implementation uses put_nowait. You may want to override
this method if you want to use blocking, timeouts or custom queue
implementations.
'''
try:
self.queue.put_nowait(record)
except queue.Full:
sys.stderr.write('[WARNING ] Message queue is full, '
'unable to write "{0}" to log'.format(record))

View file

@ -178,9 +178,9 @@ LOGGING_STORE_HANDLER = __StoreLoggingHandler()
class SaltLogQueueHandler(QueueHandler):
def prepare(self, record):
record = QueueHandler.prepare(self, record)
return record.__dict__.copy()
'''
Subclassed just to differentiate when debugging
'''
class SaltLogRecord(logging.LogRecord):

View file

@ -1 +0,0 @@
minion_blackout: False

View file

@ -7,16 +7,19 @@ Tests for minion blackout
from __future__ import absolute_import
import os
import time
import logging
import textwrap
# Import Salt Testing libs
from tests.support.case import ModuleCase
from tests.support.paths import PILLAR_DIR
from tests.support.helpers import flaky
from tests.support.runtests import RUNTIME_VARS
# Import Salt libs
import salt.utils.files
log = logging.getLogger(__name__)
class MinionBlackoutTestCase(ModuleCase):
'''
@ -25,32 +28,54 @@ class MinionBlackoutTestCase(ModuleCase):
@classmethod
def setUpClass(cls):
cls.blackout_pillar = os.path.join(PILLAR_DIR, 'base', 'blackout.sls')
cls.blackout_pillar = os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, 'blackout.sls')
@classmethod
def tearDownClass(cls):
if os.path.exists(cls.blackout_pillar):
os.unlink(cls.blackout_pillar)
del cls.blackout_pillar
def setUp(self):
with salt.utils.files.fopen(self.blackout_pillar, 'w') as wfh:
wfh.write('minion_blackout: False')
self.addCleanup(self.cleanup_blackout_pillar)
def tearDown(self):
self.end_blackout(sleep=False)
# Be sure to also refresh the sub_minion pillar
self.run_function('saltutil.refresh_pillar', minion_tgt='sub_minion')
time.sleep(10) # wait for minion to exit blackout mode
self.wait_for_all_jobs()
def cleanup_blackout_pillar(self):
if os.path.exists(self.blackout_pillar):
os.unlink(self.blackout_pillar)
def begin_blackout(self, blackout_data='minion_blackout: True'):
'''
setup minion blackout mode
'''
log.info('Entering minion blackout...')
self.wait_for_all_jobs()
with salt.utils.files.fopen(self.blackout_pillar, 'w') as wfh:
wfh.write(blackout_data)
self.run_function('saltutil.refresh_pillar')
time.sleep(10) # wait for minion to enter blackout mode
log.info('Entered minion blackout.')
def end_blackout(self, sleep=True):
'''
takedown minion blackout mode
'''
log.info('Exiting minion blackout...')
with salt.utils.files.fopen(self.blackout_pillar, 'w') as wfh:
wfh.write('minion_blackout: False\n')
self.run_function('saltutil.refresh_pillar')
if sleep:
time.sleep(10) # wait for minion to exit blackout mode
self.wait_for_all_jobs()
log.info('Exited minion blackout.')
@flaky
def test_blackout(self):

View file

@ -5,6 +5,7 @@ Test the salt mine system
# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
import time
import pprint
# Import Salt Testing libs
from tests.support.case import ModuleCase
@ -14,17 +15,24 @@ class MineTest(ModuleCase):
'''
Test the mine system
'''
def setUp(self):
self.wait_for_all_jobs()
def test_get(self):
'''
test mine.get and mine.update
'''
self.assertTrue(self.run_function('mine.update', minion_tgt='minion'))
self.assertTrue(
# The sub_minion does not have mine_functions defined in its configuration
# In this case, mine.update returns None
self.assertIsNone(
self.run_function(
'mine.update',
minion_tgt='sub_minion'
)
)
# Since the minion has mine_functions defined in its configuration,
# mine.update will return True
self.assertTrue(
self.run_function(
'mine.get',
@ -110,29 +118,57 @@ class MineTest(ModuleCase):
'''
Test mine.delete
'''
# TODO The calls to sleep were added in an attempt to make this tests
# less flaky. If we still see it fail we need to look for a more robust
# solution.
self.assertTrue(
self.run_function(
'mine.send',
['grains.items']
['grains.items'],
minion_tgt='minion'
)
)
time.sleep(1)
# Smoke testing that grains should now exist in the mine
ret_grains = self.run_function(
'mine.get',
['minion', 'grains.items']
self.wait_for_all_jobs(minions=('minion',))
attempts = 10
ret_grains = None
while True:
if ret_grains:
break
# Smoke testing that grains should now exist in the mine
ret_grains = self.run_function(
'mine.get',
['minion', 'grains.items'],
minion_tgt='minion'
)
if ret_grains and 'minion' in ret_grains:
break
if attempts:
attempts -= 1
if attempts:
time.sleep(1.5)
continue
self.fail(
'\'minion\' was not found as a key of the \'mine.get\' \'grains.items\' call. Full return: {}'.format(
pprint.pformat(ret_grains)
)
)
self.assertEqual(
ret_grains['minion']['id'], 'minion',
msg='{} != minion, full return payload: {}'.format(
ret_grains['minion']['id'],
pprint.pformat(ret_grains)
)
)
self.assertEqual(ret_grains['minion']['id'], 'minion')
self.assertTrue(
self.run_function(
'mine.send',
['test.arg', 'foo=bar', 'fnord=roscivs'],
minion_tgt='minion'
)
)
time.sleep(1)
self.wait_for_all_jobs(minions=('minion',))
ret_args = self.run_function(
'mine.get',
['minion', 'test.arg']
@ -151,26 +187,30 @@ class MineTest(ModuleCase):
self.assertTrue(
self.run_function(
'mine.send',
['test.echo', 'foo']
['test.echo', 'foo'],
minion_tgt='minion'
)
)
time.sleep(1)
self.wait_for_all_jobs(minions=('minion',))
ret_echo = self.run_function(
'mine.get',
['minion', 'test.echo']
['minion', 'test.echo'],
minion_tgt='minion'
)
# Smoke testing that we were also able to set test.echo in the mine
self.assertEqual(ret_echo['minion'], 'foo')
self.assertTrue(
self.run_function(
'mine.delete',
['test.arg']
['test.arg'],
minion_tgt='minion'
)
)
time.sleep(1)
self.wait_for_all_jobs(minions=('minion',))
ret_arg_deleted = self.run_function(
'mine.get',
['minion', 'test.arg']
['minion', 'test.arg'],
minion_tgt='minion'
)
# Now comes the real test - did we obliterate test.arg from the mine?
# We could assert this a different way, but there shouldn't be any
@ -186,7 +226,8 @@ class MineTest(ModuleCase):
)
ret_echo_stays = self.run_function(
'mine.get',
['minion', 'test.echo']
['minion', 'test.echo'],
minion_tgt='minion'
)
# Of course, one more health check - we want targeted removal.
# This isn't horseshoes or hand grenades - test.arg should go away

View file

@ -788,6 +788,19 @@ class ModuleCase(TestCase, SaltClientTestCaseMixin):
Execute a module function
'''
def wait_for_all_jobs(self, minions=('minion', 'sub_minion',), sleep=.3):
'''
Wait for all jobs currently running on the list of minions to finish
'''
for minion in minions:
while True:
ret = self.run_function('saltutil.running', minion_tgt=minion, timeout=300)
if ret:
log.debug('Waiting for minion\'s jobs: %s', minion)
time.sleep(sleep)
else:
break
def minion_run(self, _function, *args, **kw):
'''
Run a single salt function on the 'minion' target and condition
@ -800,9 +813,16 @@ class ModuleCase(TestCase, SaltClientTestCaseMixin):
Run a single salt function and condition the return down to match the
behavior of the raw function call
'''
know_to_return_none = (
'file.chown', 'file.chgrp', 'ssh.recv_known_host_entries'
known_to_return_none = (
'data.get',
'file.chown',
'file.chgrp',
'pkg.refresh_db',
'ssh.recv_known_host_entries',
'time.sleep'
)
if minion_tgt == 'sub_minion':
known_to_return_none += ('mine.update',)
if 'f_arg' in kwargs:
kwargs['arg'] = kwargs.pop('f_arg')
if 'f_timeout' in kwargs:
@ -820,7 +840,7 @@ class ModuleCase(TestCase, SaltClientTestCaseMixin):
minion_tgt, orig
)
)
elif orig[minion_tgt] is None and function not in know_to_return_none:
elif orig[minion_tgt] is None and function not in known_to_return_none:
self.skipTest(
'WARNING(SHOULD NOT HAPPEN #1935): Failed to get \'{0}\' from '
'the minion \'{1}\'. Command output: {2}'.format(