mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Merge pull request #45585 from rallytime/bp-45579
Back-port #45579 to 2017.7.3
This commit is contained in:
commit
2a992f9017
3 changed files with 33 additions and 15 deletions
|
@ -127,4 +127,7 @@ class Engine(SignalHandlingMultiprocessingProcess):
|
|||
try:
|
||||
self.engine[self.fun](**kwargs)
|
||||
except Exception as exc:
|
||||
log.critical('Engine {0} could not be started! Error: {1}'.format(self.engine, exc))
|
||||
log.critical(
|
||||
'Engine \'%s\' could not be started!',
|
||||
self.fun.split('.')[0], exc_info=True
|
||||
)
|
||||
|
|
|
@ -16,6 +16,8 @@ import weakref
|
|||
from random import randint
|
||||
|
||||
# Import Salt Libs
|
||||
from salt.ext import six
|
||||
from salt.ext.six.moves import map
|
||||
import salt.auth
|
||||
import salt.crypt
|
||||
import salt.utils
|
||||
|
@ -40,13 +42,15 @@ try:
|
|||
except ImportError:
|
||||
HAS_ZMQ_MONITOR = False
|
||||
|
||||
LIBZMQ_VERSION = tuple(map(int, zmq.zmq_version().split('.')))
|
||||
PYZMQ_VERSION = tuple(map(int, zmq.pyzmq_version().split('.')))
|
||||
|
||||
# Import Tornado Libs
|
||||
import tornado
|
||||
import tornado.gen
|
||||
import tornado.concurrent
|
||||
|
||||
# Import third party libs
|
||||
import salt.ext.six as six
|
||||
try:
|
||||
from Cryptodome.Cipher import PKCS1_OAEP
|
||||
except ImportError:
|
||||
|
@ -359,7 +363,12 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t
|
|||
self._monitor.stop()
|
||||
self._monitor = None
|
||||
if hasattr(self, '_stream'):
|
||||
self._stream.close(0)
|
||||
if PYZMQ_VERSION < (14, 3, 0):
|
||||
# stream.close() doesn't work properly on pyzmq < 14.3.0
|
||||
self._stream.io_loop.remove_handler(self._stream.socket)
|
||||
self._stream.socket.close(0)
|
||||
else:
|
||||
self._stream.close(0)
|
||||
elif hasattr(self, '_socket'):
|
||||
self._socket.close(0)
|
||||
if hasattr(self, 'context') and self.context.closed is False:
|
||||
|
@ -911,8 +920,17 @@ class AsyncReqMessageClient(object):
|
|||
# TODO: timeout all in-flight sessions, or error
|
||||
def destroy(self):
|
||||
if hasattr(self, 'stream') and self.stream is not None:
|
||||
self.stream.close()
|
||||
self.socket = None
|
||||
if PYZMQ_VERSION < (14, 3, 0):
|
||||
# stream.close() doesn't work properly on pyzmq < 14.3.0
|
||||
if self.stream.socket:
|
||||
self.stream.socket.close()
|
||||
self.stream.io_loop.remove_handler(self.stream.socket)
|
||||
# set this to None, more hacks for messed up pyzmq
|
||||
self.stream.socket = None
|
||||
self.socket.close()
|
||||
else:
|
||||
self.stream.close()
|
||||
self.socket = None
|
||||
self.stream = None
|
||||
if self.context.closed is False:
|
||||
self.context.term()
|
||||
|
|
|
@ -91,8 +91,8 @@ SUB_EVENT = set([
|
|||
'state.sls',
|
||||
])
|
||||
|
||||
TAGEND = '\n\n' # long tag delimiter
|
||||
TAGPARTER = '/' # name spaced tag delimiter
|
||||
TAGEND = str('\n\n') # long tag delimiter
|
||||
TAGPARTER = str('/') # name spaced tag delimiter
|
||||
SALT = 'salt' # base prefix for all salt/ events
|
||||
# dict map of namespaced base tag prefixes for salt events
|
||||
TAGS = {
|
||||
|
@ -725,14 +725,11 @@ class SaltEvent(object):
|
|||
is_msgpacked=True,
|
||||
use_bin_type=six.PY3
|
||||
)
|
||||
log.debug('Sending event: tag = {0}; data = {1}'.format(tag, data))
|
||||
if six.PY2:
|
||||
event = '{0}{1}{2}'.format(tag, tagend, serialized_data)
|
||||
else:
|
||||
event = b''.join([
|
||||
salt.utils.to_bytes(tag),
|
||||
salt.utils.to_bytes(tagend),
|
||||
serialized_data])
|
||||
log.debug('Sending event: tag = %s; data = %s', tag, data)
|
||||
event = b''.join([
|
||||
salt.utils.to_bytes(tag),
|
||||
salt.utils.to_bytes(tagend),
|
||||
serialized_data])
|
||||
msg = salt.utils.to_bytes(event, 'utf-8')
|
||||
if self._run_io_loop_sync:
|
||||
with salt.utils.async.current_ioloop(self.io_loop):
|
||||
|
|
Loading…
Add table
Reference in a new issue