mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Merge pull request #45211 from terminalmage/py3-transport
[PY3] Add unicode_literals to transports
This commit is contained in:
commit
4637a3b43c
14 changed files with 104 additions and 110 deletions
|
@ -2,7 +2,7 @@
|
|||
'''
|
||||
Encapsulate the different transports available to Salt.
|
||||
'''
|
||||
from __future__ import absolute_import
|
||||
from __future__ import absolute_import, print_function, unicode_literals
|
||||
import logging
|
||||
|
||||
# Import third party libs
|
||||
|
@ -57,8 +57,10 @@ class MessageClientPool(object):
|
|||
def __init__(self, tgt, opts, args=None, kwargs=None):
|
||||
sock_pool_size = opts['sock_pool_size'] if 'sock_pool_size' in opts else 1
|
||||
if sock_pool_size < 1:
|
||||
log.warn('sock_pool_size is not correctly set, \
|
||||
the option should be greater than 0 but, {0}'.format(sock_pool_size))
|
||||
log.warning(
|
||||
'sock_pool_size is not correctly set, the option should be '
|
||||
'greater than 0 but is instead %s', sock_pool_size
|
||||
)
|
||||
sock_pool_size = 1
|
||||
|
||||
if args is None:
|
||||
|
|
|
@ -6,7 +6,7 @@ This includes client side transport, for the ReqServer and the Publisher
|
|||
'''
|
||||
|
||||
# Import Python Libs
|
||||
from __future__ import absolute_import
|
||||
from __future__ import absolute_import, print_function, unicode_literals
|
||||
import logging
|
||||
|
||||
# Import Salt Libs
|
||||
|
@ -156,7 +156,7 @@ class AsyncPubChannel(AsyncChannel):
|
|||
# switch on available ttypes
|
||||
if ttype == 'detect':
|
||||
opts['detect_mode'] = True
|
||||
log.info('Transport is set to detect; using {0}'.format(ttype))
|
||||
log.info('Transport is set to detect; using %s', ttype)
|
||||
if ttype == 'zeromq':
|
||||
import salt.transport.zeromq
|
||||
return salt.transport.zeromq.AsyncZeroMQPubChannel(opts, **kwargs)
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
Helper functions for transport components to handle message framing
|
||||
'''
|
||||
# Import python libs
|
||||
from __future__ import absolute_import
|
||||
from __future__ import absolute_import, print_function, unicode_literals
|
||||
import msgpack
|
||||
from salt.ext import six
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@ IPC transport classes
|
|||
'''
|
||||
|
||||
# Import Python libs
|
||||
from __future__ import absolute_import
|
||||
from __future__ import absolute_import, print_function, unicode_literals
|
||||
import logging
|
||||
import socket
|
||||
import weakref
|
||||
|
@ -119,7 +119,7 @@ class IPCServer(object):
|
|||
Blocks until socket is established
|
||||
'''
|
||||
# Start up the ioloop
|
||||
log.trace('IPCServer: binding to socket: {0}'.format(self.socket_path))
|
||||
log.trace('IPCServer: binding to socket: %s', self.socket_path)
|
||||
if isinstance(self.socket_path, int):
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
|
@ -177,25 +177,24 @@ class IPCServer(object):
|
|||
body = framed_msg['body']
|
||||
self.io_loop.spawn_callback(self.payload_handler, body, write_callback(stream, framed_msg['head']))
|
||||
except tornado.iostream.StreamClosedError:
|
||||
log.trace('Client disconnected '
|
||||
'from IPC {0}'.format(self.socket_path))
|
||||
log.trace('Client disconnected from IPC %s', self.socket_path)
|
||||
break
|
||||
except socket.error as exc:
|
||||
# On occasion an exception will occur with
|
||||
# an error code of 0, it's a spurious exception.
|
||||
if exc.errno == 0:
|
||||
log.trace('Exception occured with error number 0, '
|
||||
'spurious exception: {0}'.format(exc))
|
||||
'spurious exception: %s', exc)
|
||||
else:
|
||||
log.error('Exception occurred while '
|
||||
'handling stream: {0}'.format(exc))
|
||||
'handling stream: %s', exc)
|
||||
except Exception as exc:
|
||||
log.error('Exception occurred while '
|
||||
'handling stream: {0}'.format(exc))
|
||||
'handling stream: %s', exc)
|
||||
|
||||
def handle_connection(self, connection, address):
|
||||
log.trace('IPCServer: Handling connection '
|
||||
'to address: {0}'.format(address))
|
||||
'to address: %s', address)
|
||||
try:
|
||||
stream = IOStream(
|
||||
connection,
|
||||
|
@ -203,7 +202,7 @@ class IPCServer(object):
|
|||
)
|
||||
self.io_loop.spawn_callback(self.handle_stream, stream)
|
||||
except Exception as exc:
|
||||
log.error('IPC streaming error: {0}'.format(exc))
|
||||
log.error('IPC streaming error: %s', exc)
|
||||
|
||||
def close(self):
|
||||
'''
|
||||
|
@ -248,17 +247,17 @@ class IPCClient(object):
|
|||
loop_instance_map = IPCClient.instance_map[io_loop]
|
||||
|
||||
# FIXME
|
||||
key = str(socket_path)
|
||||
key = six.text_type(socket_path)
|
||||
|
||||
client = loop_instance_map.get(key)
|
||||
if client is None:
|
||||
log.debug('Initializing new IPCClient for path: {0}'.format(key))
|
||||
log.debug('Initializing new IPCClient for path: %s', key)
|
||||
client = object.__new__(cls)
|
||||
# FIXME
|
||||
client.__singleton_init__(io_loop=io_loop, socket_path=socket_path)
|
||||
loop_instance_map[key] = client
|
||||
else:
|
||||
log.debug('Re-using IPCClient for {0}'.format(key))
|
||||
log.debug('Re-using IPCClient for %s', key)
|
||||
return client
|
||||
|
||||
def __singleton_init__(self, socket_path, io_loop=None):
|
||||
|
@ -336,7 +335,7 @@ class IPCClient(object):
|
|||
)
|
||||
|
||||
try:
|
||||
log.trace('IPCClient: Connecting to socket: {0}'.format(self.socket_path))
|
||||
log.trace('IPCClient: Connecting to socket: %s', self.socket_path)
|
||||
yield self.stream.connect(sock_addr)
|
||||
self._connecting_future.set_result(True)
|
||||
break
|
||||
|
@ -374,7 +373,7 @@ class IPCClient(object):
|
|||
# count of the entry has not yet gone to zero.
|
||||
if self.io_loop in IPCClient.instance_map:
|
||||
loop_instance_map = IPCClient.instance_map[self.io_loop]
|
||||
key = str(self.socket_path)
|
||||
key = six.text_type(self.socket_path)
|
||||
if key in loop_instance_map:
|
||||
del loop_instance_map[key]
|
||||
|
||||
|
@ -500,7 +499,7 @@ class IPCMessagePublisher(object):
|
|||
Blocks until socket is established
|
||||
'''
|
||||
# Start up the ioloop
|
||||
log.trace('IPCMessagePublisher: binding to socket: {0}'.format(self.socket_path))
|
||||
log.trace('IPCMessagePublisher: binding to socket: %s', self.socket_path)
|
||||
if isinstance(self.socket_path, int):
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
|
@ -523,10 +522,10 @@ class IPCMessagePublisher(object):
|
|||
try:
|
||||
yield stream.write(pack)
|
||||
except tornado.iostream.StreamClosedError:
|
||||
log.trace('Client disconnected from IPC {0}'.format(self.socket_path))
|
||||
log.trace('Client disconnected from IPC %s', self.socket_path)
|
||||
self.streams.discard(stream)
|
||||
except Exception as exc:
|
||||
log.error('Exception occurred while handling stream: {0}'.format(exc))
|
||||
log.error('Exception occurred while handling stream: %s', exc)
|
||||
if not stream.closed():
|
||||
stream.close()
|
||||
self.streams.discard(stream)
|
||||
|
@ -544,10 +543,10 @@ class IPCMessagePublisher(object):
|
|||
self.io_loop.spawn_callback(self._write, stream, pack)
|
||||
|
||||
def handle_connection(self, connection, address):
|
||||
log.trace('IPCServer: Handling connection to address: {0}'.format(address))
|
||||
log.trace('IPCServer: Handling connection to address: %s', address)
|
||||
try:
|
||||
if self.opts['ipc_write_buffer'] > 0:
|
||||
log.trace('Setting IPC connection write buffer: {0}'.format((self.opts['ipc_write_buffer'])))
|
||||
log.trace('Setting IPC connection write buffer: %s', (self.opts['ipc_write_buffer']))
|
||||
stream = IOStream(
|
||||
connection,
|
||||
io_loop=self.io_loop,
|
||||
|
@ -560,7 +559,7 @@ class IPCMessagePublisher(object):
|
|||
)
|
||||
self.streams.add(stream)
|
||||
except Exception as exc:
|
||||
log.error('IPC streaming error: {0}'.format(exc))
|
||||
log.error('IPC streaming error: %s', exc)
|
||||
|
||||
def close(self):
|
||||
'''
|
||||
|
@ -664,11 +663,11 @@ class IPCMessageSubscriber(IPCClient):
|
|||
# Keep 'self._read_stream_future' alive.
|
||||
ret = None
|
||||
except tornado.iostream.StreamClosedError as exc:
|
||||
log.trace('Subscriber disconnected from IPC {0}'.format(self.socket_path))
|
||||
log.trace('Subscriber disconnected from IPC %s', self.socket_path)
|
||||
self._read_stream_future = None
|
||||
exc_to_raise = exc
|
||||
except Exception as exc:
|
||||
log.error('Exception occurred in Subscriber while handling stream: {0}'.format(exc))
|
||||
log.error('Exception occurred in Subscriber while handling stream: %s', exc)
|
||||
self._read_stream_future = None
|
||||
exc_to_raise = exc
|
||||
|
||||
|
@ -716,10 +715,10 @@ class IPCMessageSubscriber(IPCClient):
|
|||
body = framed_msg['body']
|
||||
self.io_loop.spawn_callback(callback, body)
|
||||
except tornado.iostream.StreamClosedError:
|
||||
log.trace('Subscriber disconnected from IPC {0}'.format(self.socket_path))
|
||||
log.trace('Subscriber disconnected from IPC %s', self.socket_path)
|
||||
break
|
||||
except Exception as exc:
|
||||
log.error('Exception occurred while Subscriber handling stream: {0}'.format(exc))
|
||||
log.error('Exception occurred while Subscriber handling stream: %s', exc)
|
||||
|
||||
@tornado.gen.coroutine
|
||||
def read_async(self, callback):
|
||||
|
@ -732,10 +731,10 @@ class IPCMessageSubscriber(IPCClient):
|
|||
try:
|
||||
yield self.connect(timeout=5)
|
||||
except tornado.iostream.StreamClosedError:
|
||||
log.trace('Subscriber closed stream on IPC {0} before connect'.format(self.socket_path))
|
||||
log.trace('Subscriber closed stream on IPC %s before connect', self.socket_path)
|
||||
yield tornado.gen.sleep(1)
|
||||
except Exception as exc:
|
||||
log.error('Exception occurred while Subscriber connecting: {0}'.format(exc))
|
||||
log.error('Exception occurred while Subscriber connecting: %s', exc)
|
||||
yield tornado.gen.sleep(1)
|
||||
yield self._read_async(callback)
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Import Python Libs
|
||||
from __future__ import absolute_import, print_function
|
||||
from __future__ import absolute_import, print_function, unicode_literals
|
||||
import logging
|
||||
|
||||
# Import Salt Libs
|
||||
|
@ -23,7 +23,7 @@ class LocalChannel(ReqChannel):
|
|||
def send(self, load, tries=3, timeout=60, raw=False):
|
||||
|
||||
if self.tries == 0:
|
||||
log.debug('LocalChannel load: {0}').format(load)
|
||||
log.debug('LocalChannel load: %s', load)
|
||||
#data = json.loads(load)
|
||||
#{'path': 'apt-cacher-ng/map.jinja', 'saltenv': 'base', 'cmd': '_serve_file', 'loc': 0}
|
||||
#f = open(data['path'])
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Import Python Libs
|
||||
from __future__ import absolute_import
|
||||
from __future__ import absolute_import, print_function, unicode_literals
|
||||
import multiprocessing
|
||||
import ctypes
|
||||
import logging
|
||||
|
@ -51,7 +51,7 @@ class AESPubClientMixin(object):
|
|||
@tornado.gen.coroutine
|
||||
def _decode_payload(self, payload):
|
||||
# we need to decrypt it
|
||||
log.trace('Decoding payload: {0}'.format(payload))
|
||||
log.trace('Decoding payload: %s', payload)
|
||||
if payload['enc'] == 'aes':
|
||||
self._verify_master_signature(payload)
|
||||
try:
|
||||
|
@ -172,12 +172,10 @@ class AESReqServerMixin(object):
|
|||
'''
|
||||
|
||||
if not salt.utils.verify.valid_id(self.opts, load['id']):
|
||||
log.info(
|
||||
'Authentication request from invalid id {id}'.format(**load)
|
||||
)
|
||||
log.info('Authentication request from invalid id %s', load['id'])
|
||||
return {'enc': 'clear',
|
||||
'load': {'ret': False}}
|
||||
log.info('Authentication request from {id}'.format(**load))
|
||||
log.info('Authentication request from %s', load['id'])
|
||||
|
||||
# 0 is default which should be 'unlimited'
|
||||
if self.opts['max_minions'] > 0:
|
||||
|
@ -231,8 +229,8 @@ class AESReqServerMixin(object):
|
|||
pass
|
||||
elif os.path.isfile(pubfn_rejected):
|
||||
# The key has been rejected, don't place it in pending
|
||||
log.info('Public key rejected for {0}. Key is present in '
|
||||
'rejection key dir.'.format(load['id']))
|
||||
log.info('Public key rejected for %s. Key is present in '
|
||||
'rejection key dir.', load['id'])
|
||||
eload = {'result': False,
|
||||
'id': load['id'],
|
||||
'pub': load['pub']}
|
||||
|
@ -245,9 +243,9 @@ class AESReqServerMixin(object):
|
|||
with salt.utils.files.fopen(pubfn, 'r') as pubfn_handle:
|
||||
if pubfn_handle.read().strip() != load['pub'].strip():
|
||||
log.error(
|
||||
'Authentication attempt from {id} failed, the public '
|
||||
'Authentication attempt from %s failed, the public '
|
||||
'keys did not match. This may be an attempt to compromise '
|
||||
'the Salt cluster.'.format(**load)
|
||||
'the Salt cluster.', load['id']
|
||||
)
|
||||
# put denied minion key into minions_denied
|
||||
with salt.utils.files.fopen(pubfn_denied, 'w+') as fp_:
|
||||
|
@ -264,9 +262,7 @@ class AESReqServerMixin(object):
|
|||
# The key has not been accepted, this is a new minion
|
||||
if os.path.isdir(pubfn_pend):
|
||||
# The key path is a directory, error out
|
||||
log.info(
|
||||
'New public key {id} is a directory'.format(**load)
|
||||
)
|
||||
log.info('New public key %s is a directory', load['id'])
|
||||
eload = {'result': False,
|
||||
'id': load['id'],
|
||||
'pub': load['pub']}
|
||||
|
@ -276,14 +272,12 @@ class AESReqServerMixin(object):
|
|||
|
||||
if auto_reject:
|
||||
key_path = pubfn_rejected
|
||||
log.info('New public key for {id} rejected via autoreject_file'
|
||||
.format(**load))
|
||||
log.info('New public key for %s rejected via autoreject_file', load['id'])
|
||||
key_act = 'reject'
|
||||
key_result = False
|
||||
elif not auto_sign:
|
||||
key_path = pubfn_pend
|
||||
log.info('New public key for {id} placed in pending'
|
||||
.format(**load))
|
||||
log.info('New public key for %s placed in pending', load['id'])
|
||||
key_act = 'pend'
|
||||
key_result = True
|
||||
else:
|
||||
|
@ -314,8 +308,8 @@ class AESReqServerMixin(object):
|
|||
shutil.move(pubfn_pend, pubfn_rejected)
|
||||
except (IOError, OSError):
|
||||
pass
|
||||
log.info('Pending public key for {id} rejected via '
|
||||
'autoreject_file'.format(**load))
|
||||
log.info('Pending public key for %s rejected via '
|
||||
'autoreject_file', load['id'])
|
||||
ret = {'enc': 'clear',
|
||||
'load': {'ret': False}}
|
||||
eload = {'result': False,
|
||||
|
@ -333,10 +327,9 @@ class AESReqServerMixin(object):
|
|||
with salt.utils.files.fopen(pubfn_pend, 'r') as pubfn_handle:
|
||||
if pubfn_handle.read() != load['pub']:
|
||||
log.error(
|
||||
'Authentication attempt from {id} failed, the public '
|
||||
'Authentication attempt from %s failed, the public '
|
||||
'key in pending did not match. This may be an '
|
||||
'attempt to compromise the Salt cluster.'
|
||||
.format(**load)
|
||||
'attempt to compromise the Salt cluster.', load['id']
|
||||
)
|
||||
# put denied minion key into minions_denied
|
||||
with salt.utils.files.fopen(pubfn_denied, 'w+') as fp_:
|
||||
|
@ -350,9 +343,9 @@ class AESReqServerMixin(object):
|
|||
'load': {'ret': False}}
|
||||
else:
|
||||
log.info(
|
||||
'Authentication failed from host {id}, the key is in '
|
||||
'Authentication failed from host %s, the key is in '
|
||||
'pending and needs to be accepted with salt-key '
|
||||
'-a {id}'.format(**load)
|
||||
'-a %s', load['id'], load['id']
|
||||
)
|
||||
eload = {'result': True,
|
||||
'act': 'pend',
|
||||
|
@ -369,10 +362,9 @@ class AESReqServerMixin(object):
|
|||
with salt.utils.files.fopen(pubfn_pend, 'r') as pubfn_handle:
|
||||
if pubfn_handle.read() != load['pub']:
|
||||
log.error(
|
||||
'Authentication attempt from {id} failed, the public '
|
||||
'Authentication attempt from %s failed, the public '
|
||||
'keys in pending did not match. This may be an '
|
||||
'attempt to compromise the Salt cluster.'
|
||||
.format(**load)
|
||||
'attempt to compromise the Salt cluster.', load['id']
|
||||
)
|
||||
# put denied minion key into minions_denied
|
||||
with salt.utils.files.fopen(pubfn_denied, 'w+') as fp_:
|
||||
|
@ -396,7 +388,7 @@ class AESReqServerMixin(object):
|
|||
return {'enc': 'clear',
|
||||
'load': {'ret': False}}
|
||||
|
||||
log.info('Authentication accepted from {id}'.format(**load))
|
||||
log.info('Authentication accepted from %s', load['id'])
|
||||
# only write to disk if you are adding the file, and in open mode,
|
||||
# which implies we accept any key from a minion.
|
||||
if not os.path.isfile(pubfn) and not self.opts['open_mode']:
|
||||
|
@ -424,7 +416,7 @@ class AESReqServerMixin(object):
|
|||
with salt.utils.files.fopen(pubfn) as f:
|
||||
pub = RSA.importKey(f.read())
|
||||
except (ValueError, IndexError, TypeError) as err:
|
||||
log.error('Corrupt public key "{0}": {1}'.format(pubfn, err))
|
||||
log.error('Corrupt public key "%s": %s', pubfn, err)
|
||||
return {'enc': 'clear',
|
||||
'load': {'ret': False}}
|
||||
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
RAET transport classes
|
||||
'''
|
||||
|
||||
from __future__ import absolute_import
|
||||
from __future__ import absolute_import, print_function, unicode_literals
|
||||
import time
|
||||
|
||||
# Import Salt Libs
|
||||
|
@ -77,7 +77,7 @@ class RAETReqChannel(ReqChannel):
|
|||
self.stack = jobber_stack
|
||||
else:
|
||||
self.stack = jobber_stack = self._setup_stack(ryn=self.ryn)
|
||||
log.debug("RAETReqChannel Using Jobber Stack at = {0}\n".format(self.stack.ha))
|
||||
log.debug("RAETReqChannel Using Jobber Stack at = %s\n", self.stack.ha)
|
||||
|
||||
def _setup_stack(self, ryn='manor'):
|
||||
'''
|
||||
|
@ -117,7 +117,7 @@ class RAETReqChannel(ReqChannel):
|
|||
name=ryn,
|
||||
lanename=lanename,
|
||||
dirpath=self.opts['sock_dir']))
|
||||
log.debug("Created Channel Jobber Stack {0}\n".format(stack.name))
|
||||
log.debug("Created Channel Jobber Stack %s\n", stack.name)
|
||||
return stack
|
||||
|
||||
def crypted_transfer_decode_dictentry(self, load, dictkey=None, tries=3, timeout=60):
|
||||
|
|
|
@ -6,7 +6,7 @@ This includes server side transport, for the ReqServer and the Publisher
|
|||
'''
|
||||
|
||||
# Import Python Libs
|
||||
from __future__ import absolute_import
|
||||
from __future__ import absolute_import, print_function, unicode_literals
|
||||
|
||||
|
||||
class ReqServerChannel(object):
|
||||
|
|
|
@ -7,7 +7,7 @@ Wire protocol: "len(payload) msgpack({'head': SOMEHEADER, 'body': SOMEBODY})"
|
|||
'''
|
||||
|
||||
# Import Python Libs
|
||||
from __future__ import absolute_import
|
||||
from __future__ import absolute_import, print_function, unicode_literals
|
||||
import logging
|
||||
import msgpack
|
||||
import socket
|
||||
|
@ -224,7 +224,7 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel):
|
|||
key = cls.__key(opts, **kwargs)
|
||||
obj = loop_instance_map.get(key)
|
||||
if obj is None:
|
||||
log.debug('Initializing new AsyncTCPReqChannel for {0}'.format(key))
|
||||
log.debug('Initializing new AsyncTCPReqChannel for %s', key)
|
||||
# we need to make a local variable for this, as we are going to store
|
||||
# it in a WeakValueDictionary-- which will remove the item if no one
|
||||
# references it-- this forces a reference while we return to the caller
|
||||
|
@ -232,7 +232,7 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel):
|
|||
obj.__singleton_init__(opts, **kwargs)
|
||||
loop_instance_map[key] = obj
|
||||
else:
|
||||
log.debug('Re-using AsyncTCPReqChannel for {0}'.format(key))
|
||||
log.debug('Re-using AsyncTCPReqChannel for %s', key)
|
||||
return obj
|
||||
|
||||
@classmethod
|
||||
|
@ -475,9 +475,7 @@ class AsyncTCPPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.tran
|
|||
except salt.exceptions.SaltReqTimeoutError:
|
||||
log.info('fire_master failed: master could not be contacted. Request timed out.')
|
||||
except Exception:
|
||||
log.info('fire_master failed: {0}'.format(
|
||||
traceback.format_exc())
|
||||
)
|
||||
log.info('fire_master failed: %s', traceback.format_exc())
|
||||
else:
|
||||
self._reconnected = True
|
||||
|
||||
|
@ -512,7 +510,7 @@ class AsyncTCPPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.tran
|
|||
except KeyboardInterrupt:
|
||||
raise
|
||||
except Exception as exc:
|
||||
if '-|RETRY|-' not in str(exc):
|
||||
if '-|RETRY|-' not in six.text_type(exc):
|
||||
raise SaltClientError('Unable to sign_in to master: {0}'.format(exc)) # TODO: better error message
|
||||
|
||||
def on_recv(self, callback):
|
||||
|
@ -667,7 +665,7 @@ class TCPReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.tra
|
|||
req_opts['tgt'],
|
||||
), header=header))
|
||||
else:
|
||||
log.error('Unknown req_fun {0}'.format(req_fun))
|
||||
log.error('Unknown req_fun %s', req_fun)
|
||||
# always attempt to return an error to the minion
|
||||
stream.write('Server-side exception handling payload')
|
||||
stream.close()
|
||||
|
@ -680,7 +678,7 @@ class TCPReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.tra
|
|||
log.error('Connection was unexpectedly closed', exc_info=True)
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
# Absorb any other exceptions
|
||||
log.error('Unexpected exception occurred: {0}'.format(exc), exc_info=True)
|
||||
log.error('Unexpected exception occurred: %s', exc, exc_info=True)
|
||||
|
||||
raise tornado.gen.Return()
|
||||
|
||||
|
@ -701,7 +699,7 @@ class SaltMessageServer(tornado.tcpserver.TCPServer, object):
|
|||
'''
|
||||
Handle incoming streams and add messages to the incoming queue
|
||||
'''
|
||||
log.trace('Req client {0} connected'.format(address))
|
||||
log.trace('Req client %s connected', address)
|
||||
self.clients.append((stream, address))
|
||||
unpacker = msgpack.Unpacker()
|
||||
try:
|
||||
|
@ -717,10 +715,10 @@ class SaltMessageServer(tornado.tcpserver.TCPServer, object):
|
|||
self.io_loop.spawn_callback(self.message_handler, stream, header, framed_msg['body'])
|
||||
|
||||
except tornado.iostream.StreamClosedError:
|
||||
log.trace('req client disconnected {0}'.format(address))
|
||||
log.trace('req client disconnected %s', address)
|
||||
self.clients.remove((stream, address))
|
||||
except Exception as e:
|
||||
log.trace('other master-side exception: {0}'.format(e))
|
||||
log.trace('other master-side exception: %s', e)
|
||||
self.clients.remove((stream, address))
|
||||
stream.close()
|
||||
|
||||
|
@ -989,9 +987,9 @@ class SaltMessageClient(object):
|
|||
if self._on_recv is not None:
|
||||
self.io_loop.spawn_callback(self._on_recv, header, body)
|
||||
else:
|
||||
log.error('Got response for message_id {0} that we are not tracking'.format(message_id))
|
||||
log.error('Got response for message_id %s that we are not tracking', message_id)
|
||||
except tornado.iostream.StreamClosedError as e:
|
||||
log.debug('tcp stream to {0}:{1} closed, unable to recv'.format(self.host, self.port))
|
||||
log.debug('tcp stream to %s:%s closed, unable to recv', self.host, self.port)
|
||||
for future in six.itervalues(self.send_future_map):
|
||||
future.set_exception(e)
|
||||
self.send_future_map = {}
|
||||
|
@ -1266,7 +1264,7 @@ class PubServer(tornado.tcpserver.TCPServer, object):
|
|||
client.id_ = load['id']
|
||||
self._add_client_present(client)
|
||||
except tornado.iostream.StreamClosedError as e:
|
||||
log.debug('tcp stream to {0} closed, unable to recv'.format(client.address))
|
||||
log.debug('tcp stream to %s closed, unable to recv', client.address)
|
||||
client.close()
|
||||
self._remove_client_present(client)
|
||||
self.clients.discard(client)
|
||||
|
@ -1276,7 +1274,7 @@ class PubServer(tornado.tcpserver.TCPServer, object):
|
|||
continue
|
||||
|
||||
def handle_stream(self, stream, address):
|
||||
log.trace('Subscriber at {0} connected'.format(address))
|
||||
log.trace('Subscriber at %s connected', address)
|
||||
client = Subscriber(stream, address)
|
||||
self.clients.add(client)
|
||||
self.io_loop.spawn_callback(self._stream_read, client)
|
||||
|
@ -1284,7 +1282,7 @@ class PubServer(tornado.tcpserver.TCPServer, object):
|
|||
# TODO: ACK the publish through IPC
|
||||
@tornado.gen.coroutine
|
||||
def publish_payload(self, package, _):
|
||||
log.debug('TCP PubServer sending payload: {0}'.format(package))
|
||||
log.debug('TCP PubServer sending payload: %s', package)
|
||||
payload = salt.transport.frame.frame_msg(package['payload'])
|
||||
|
||||
to_remove = []
|
||||
|
@ -1305,7 +1303,7 @@ class PubServer(tornado.tcpserver.TCPServer, object):
|
|||
except tornado.iostream.StreamClosedError:
|
||||
to_remove.append(client)
|
||||
else:
|
||||
log.debug('Publish target {0} not connected'.format(topic))
|
||||
log.debug('Publish target %s not connected', topic)
|
||||
else:
|
||||
for client in self.clients:
|
||||
try:
|
||||
|
@ -1315,7 +1313,7 @@ class PubServer(tornado.tcpserver.TCPServer, object):
|
|||
except tornado.iostream.StreamClosedError:
|
||||
to_remove.append(client)
|
||||
for client in to_remove:
|
||||
log.debug('Subscriber at {0} has disconnected from publisher'.format(client.address))
|
||||
log.debug('Subscriber at %s has disconnected from publisher', client.address)
|
||||
client.close()
|
||||
self._remove_client_present(client)
|
||||
self.clients.discard(client)
|
||||
|
@ -1378,7 +1376,7 @@ class TCPPubServerChannel(salt.transport.server.PubServerChannel):
|
|||
)
|
||||
|
||||
# Securely create socket
|
||||
log.info('Starting the Salt Puller on {0}'.format(pull_uri))
|
||||
log.info('Starting the Salt Puller on %s', pull_uri)
|
||||
old_umask = os.umask(0o177)
|
||||
try:
|
||||
pull_sock.start()
|
||||
|
|
|
@ -4,7 +4,7 @@ Zeromq transport classes
|
|||
'''
|
||||
|
||||
# Import Python Libs
|
||||
from __future__ import absolute_import
|
||||
from __future__ import absolute_import, print_function, unicode_literals
|
||||
import os
|
||||
import sys
|
||||
import copy
|
||||
|
@ -125,16 +125,16 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
|
|||
key = cls.__key(opts, **kwargs)
|
||||
obj = loop_instance_map.get(key)
|
||||
if obj is None:
|
||||
log.debug('Initializing new AsyncZeroMQReqChannel for {0}'.format(key))
|
||||
log.debug('Initializing new AsyncZeroMQReqChannel for %s', key)
|
||||
# we need to make a local variable for this, as we are going to store
|
||||
# it in a WeakValueDictionary-- which will remove the item if no one
|
||||
# references it-- this forces a reference while we return to the caller
|
||||
obj = object.__new__(cls)
|
||||
obj.__singleton_init__(opts, **kwargs)
|
||||
loop_instance_map[key] = obj
|
||||
log.trace('Inserted key into loop_instance_map id {0} for key {1} and process {2}'.format(id(loop_instance_map), key, os.getpid()))
|
||||
log.trace('Inserted key into loop_instance_map id %s for key %s and process %s', id(loop_instance_map), key, os.getpid())
|
||||
else:
|
||||
log.debug('Re-using AsyncZeroMQReqChannel for {0}'.format(key))
|
||||
log.debug('Re-using AsyncZeroMQReqChannel for %s', key)
|
||||
return obj
|
||||
|
||||
def __deepcopy__(self, memo):
|
||||
|
@ -377,18 +377,20 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t
|
|||
self.opts['recon_default'] + self.opts['recon_max']
|
||||
)
|
||||
|
||||
log.debug("Generated random reconnect delay between '{0}ms' and '{1}ms' ({2})".format(
|
||||
log.debug(
|
||||
"Generated random reconnect delay between '%sms' and '%sms' (%s)",
|
||||
self.opts['recon_default'],
|
||||
self.opts['recon_default'] + self.opts['recon_max'],
|
||||
recon_delay)
|
||||
recon_delay
|
||||
)
|
||||
|
||||
log.debug("Setting zmq_reconnect_ivl to '{0}ms'".format(recon_delay))
|
||||
log.debug("Setting zmq_reconnect_ivl to '%sms'", recon_delay)
|
||||
self._socket.setsockopt(zmq.RECONNECT_IVL, recon_delay)
|
||||
|
||||
if hasattr(zmq, 'RECONNECT_IVL_MAX'):
|
||||
log.debug("Setting zmq_reconnect_ivl_max to '{0}ms'".format(
|
||||
self.opts['recon_default'] + self.opts['recon_max'])
|
||||
log.debug(
|
||||
"Setting zmq_reconnect_ivl_max to '%sms'",
|
||||
self.opts['recon_default'] + self.opts['recon_max']
|
||||
)
|
||||
|
||||
self._socket.setsockopt(
|
||||
|
@ -452,7 +454,7 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t
|
|||
# 2 includes a header which says who should do it
|
||||
elif messages_len == 2:
|
||||
if messages[0] not in ('broadcast', self.hexid):
|
||||
log.debug('Publish received for not this minion: {0}'.format(messages[0]))
|
||||
log.debug('Publish received for not this minion: %s', messages[0])
|
||||
raise tornado.gen.Return(None)
|
||||
payload = self.serial.loads(messages[1])
|
||||
else:
|
||||
|
@ -607,7 +609,7 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
|
|||
self.w_uri = 'ipc://{0}'.format(
|
||||
os.path.join(self.opts['sock_dir'], 'workers.ipc')
|
||||
)
|
||||
log.info('Worker binding to socket {0}'.format(self.w_uri))
|
||||
log.info('Worker binding to socket %s', self.w_uri)
|
||||
self._socket.connect(self.w_uri)
|
||||
|
||||
salt.transport.mixins.auth.AESReqServerMixin.post_fork(self, payload_handler, io_loop)
|
||||
|
@ -644,7 +646,7 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
|
|||
|
||||
# TODO helper functions to normalize payload?
|
||||
if not isinstance(payload, dict) or not isinstance(payload.get('load'), dict):
|
||||
log.error('payload and load must be a dict. Payload was: {0} and load was {1}'.format(payload, payload.get('load')))
|
||||
log.error('payload and load must be a dict. Payload was: %s and load was %s', payload, payload.get('load'))
|
||||
stream.send(self.serial.dumps('payload and load must be a dict'))
|
||||
raise tornado.gen.Return()
|
||||
|
||||
|
@ -687,7 +689,7 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
|
|||
req_opts['tgt'],
|
||||
)))
|
||||
else:
|
||||
log.error('Unknown req_fun {0}'.format(req_fun))
|
||||
log.error('Unknown req_fun %s', req_fun)
|
||||
# always attempt to return an error to the minion
|
||||
stream.send('Server-side exception handling payload')
|
||||
raise tornado.gen.Return()
|
||||
|
@ -790,11 +792,11 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
|
|||
salt.utils.zeromq.check_ipc_path_max_len(pull_uri)
|
||||
|
||||
# Start the minion command publisher
|
||||
log.info('Starting the Salt Publisher on {0}'.format(pub_uri))
|
||||
log.info('Starting the Salt Publisher on %s', pub_uri)
|
||||
pub_sock.bind(pub_uri)
|
||||
|
||||
# Securely create socket
|
||||
log.info('Starting the Salt Puller on {0}'.format(pull_uri))
|
||||
log.info('Starting the Salt Puller on %s', pull_uri)
|
||||
old_umask = os.umask(0o177)
|
||||
try:
|
||||
pull_sock.bind(pull_uri)
|
||||
|
@ -893,7 +895,7 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
|
|||
tgt_type=load['tgt_type'])
|
||||
match_ids = _res['minions']
|
||||
|
||||
log.debug("Publish Side Match: {0}".format(match_ids))
|
||||
log.debug("Publish Side Match: %s", match_ids)
|
||||
# Send list of miions thru so zmq can target them
|
||||
int_payload['topic_lst'] = match_ids
|
||||
|
||||
|
@ -1057,7 +1059,7 @@ class AsyncReqMessageClient(object):
|
|||
del self.send_timeout_map[message]
|
||||
if future.attempts < future.tries:
|
||||
future.attempts += 1
|
||||
log.debug('SaltReqTimeoutError, retrying. ({0}/{1})'.format(future.attempts, future.tries))
|
||||
log.debug('SaltReqTimeoutError, retrying. (%s/%s)', future.attempts, future.tries)
|
||||
self.send(
|
||||
message,
|
||||
timeout=future.timeout,
|
||||
|
@ -1146,7 +1148,7 @@ class ZeroMQSocketMonitor(object):
|
|||
def monitor_callback(self, msg):
|
||||
evt = zmq.utils.monitor.parse_monitor_message(msg)
|
||||
evt['description'] = self.event_map[evt['event']]
|
||||
log.debug("ZeroMQ event: {0}".format(evt))
|
||||
log.debug("ZeroMQ event: %s", evt)
|
||||
if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
|
||||
self.stop()
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Import Python libs
|
||||
from __future__ import absolute_import
|
||||
from __future__ import absolute_import, print_function, unicode_literals
|
||||
|
||||
# Import Salt Libs
|
||||
import salt.transport.client
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
'''
|
||||
|
||||
# Import python libs
|
||||
from __future__ import absolute_import
|
||||
from __future__ import absolute_import, print_function, unicode_literals
|
||||
import os
|
||||
import errno
|
||||
import socket
|
||||
|
@ -21,6 +21,7 @@ import salt.transport.server
|
|||
import salt.transport.client
|
||||
import salt.utils.platform
|
||||
|
||||
from salt.ext import six
|
||||
from salt.ext.six.moves import range
|
||||
|
||||
# Import Salt Testing libs
|
||||
|
@ -125,7 +126,7 @@ class IPCMessageClient(BaseIPCReqCase):
|
|||
self.assertEqual(self.payloads[:-1], msgs)
|
||||
|
||||
def test_very_big_message(self):
|
||||
long_str = ''.join([str(num) for num in range(10**5)])
|
||||
long_str = ''.join([six.text_type(num) for num in range(10**5)])
|
||||
msg = {'long_str': long_str, 'stop': True}
|
||||
self.channel.send(msg)
|
||||
self.wait()
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
'''
|
||||
|
||||
# Import python libs
|
||||
from __future__ import absolute_import
|
||||
from __future__ import absolute_import, print_function, unicode_literals
|
||||
import threading
|
||||
|
||||
import tornado.gen
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
'''
|
||||
|
||||
# Import python libs
|
||||
from __future__ import absolute_import
|
||||
from __future__ import absolute_import, print_function, unicode_literals
|
||||
import os
|
||||
import time
|
||||
import threading
|
||||
|
|
Loading…
Add table
Reference in a new issue