PY3 support for Windows master/minion using TCP transport

This allows masters and minions to run on Windows using Python 3. Tested
with Python 3.5.1 64-bit on Windows 7. Only the TCP transport has been
ported. It will likely not work with other transports such as ZeroMQ or
RAET without further work.

Since there were many changes necessary to make this work, here is a list
of common changes:
- Python 3 distinguishes `bytes` from `str`. In some cases `bytes` must be
used (eg read/write to binary file). In other cases `str` must be used
(eg general Salt dictionary look-ups, read/write to text file). Due to
this, there are a lot of extra uses of `salt.utils.to_bytes` in necessary
locations.
- Use `six.itervalues(varname)` instead of `varname.itervalues()`.
- In Python 2.6 and 2.7, the 'b' prefix is ignored. So instead of

```
if six.PY2:
    some_var = 'some_value'
else:
    some_var = b'some_value'
```

we choose to simply use:

```
some_var = b'some_value'

```

`salt/engine/__init__.py`:
- In Python 3, `engine.items()` will return an iterator instead of
a list. Due to this, needed to change `engine.items()[0]` to
`list(engine.items())[0]`.

`salt/minion.py`:
- `contextlib.nested` no longer supported in Python 3. Use
`contextlib.ExitStack` in this case.
- In `Minion.handle_event`, in PY3, `package` is of type `bytes` and `tag`
is of type `str`. Due to this, do all string searches via `tag` instead
of `package` since you can't search for a `str` in a `bytes` object.

`salt/payload.py`:
- Added `encoding` parameter to `Serial.loads`. See description in comments
for details.
- Added `use_bin_type` parameter to `Serial.dumps`. See description in
comments for details.
- When reading / writing local files, encode using `use_bin_type=True` and
decode using `encoding='utf-8'` to distinguish `bytes` and `str` types.

`salt/transport/frame.py`:
- Added `frame_msg_ipc`. This is used for IPC communications where it is
safe to break the wire protocol and so we encode using `use_bin_type=True`.
- Added `decode_embedded_strs`. This is used when it is not safe to break
the wire protocol (eg external communications). This will convert `bytes`
objects to `str` objects if possible. This will search for such objects
within dicts and lists.

`salt/transport/ipc.py`:
- Use `salt.transport.frame.frame_msg_ipc` to encode using
`use_bin_type=True`.
- Use `encoding='utf-8'` to decode such messages and ensure proper
distinction of `bytes` and `str` types.

`salt/transport/tpc.py`:
- Since we need to preserve the wire protocol, we don't use
`use_bin_type=True`. When decoding, we use `decode_embedded_strs` to
convert the embedded `bytes` objects that can be converted to `str`.
- `urlparse` doesn't exist in PY3. Use `urllib.parse` instead.
- `sys.maxint` not supported in PY3. Use `int((1 << 31) - 1)` instead.
Hence `sys.maxint - 1` becomes `int((1 << 31) - 2)`.

`salt/utils/__init__.py`:
- `libc = ctypes.cdll.LoadLibrary(ctypes.util.find_library("c"))` returns
`TypeError` in PY3. This is because `ctypes.util.find_library("c")`
returns `None` and in PY3 `ctypes.cdll.LoadLibrary(None)` is a
`TypeError`.
- On Windows, `salt.utils.fopen` would always read and write files in
binary mode. This has been changed for Python 3 since `bytes` must be
used for binary mode and `str` must be used for text mode and forcing
binary mode would screw things up if you want to read/write to `str`
objects. Preserved original behavior on PY2.
- For `salt.utils.fopen`, when reading and writing text files in PY3, if
an encoding is not specified, it will choose 'utf-8'.

`salt/utils/args.py`:
- In Python 3.5, every time `inspect.getargsspec` is used, a warning would
appear announcing its deprecation (it will be removed entirely in Python
3.6). Due to this, implemented our own version using
`inspect.getfullargspec`.

`salt/utils/event.py`:
- Since all functionality here is used only in IPC (the mechanism for
firing an event from minion to master is outside this file), we can break
the wire protocol and hence encode using `use_bin_type=True` and decode
using `encoding='utf-8'`.

`salt/utils/vt.py`:
- `_subprocess` is not available in PY3 under Windows, so we use an
alternate method to invoke `TerminateProcess` and `GetExitCodeProcess`.

`setup.py`:
- In PY3, `req.read(4096)` returns a `bytes` object. If you try to for
loop through it, each element is an `int` which represents an individual
byte. Trying to write an `int` to `wfh.write` raises an exception. Due to
this, use an alternate approach when writing from a
`http.client.HTTPResponse` object to file in PY3.

Signed-off-by: Sergey Kizunov <sergey.kizunov@ni.com>
This commit is contained in:
Sergey Kizunov 2016-03-18 13:51:12 -05:00
parent 05c62e914d
commit 49f89250d3
18 changed files with 412 additions and 102 deletions

View file

@ -38,6 +38,7 @@ import salt.defaults.exitcodes
import salt.utils
import salt.payload
import salt.transport.client
import salt.transport.frame
import salt.utils.rsax931
import salt.utils.verify
import salt.version
@ -66,7 +67,7 @@ def dropfile(cachedir, user=None):
if os.path.isfile(dfn) and not os.access(dfn, os.W_OK):
os.chmod(dfn, stat.S_IRUSR | stat.S_IWUSR)
with salt.utils.fopen(dfn, 'wb+') as fp_:
fp_.write('')
fp_.write(b'')
os.chmod(dfn, stat.S_IRUSR)
if user:
try:
@ -168,7 +169,7 @@ def gen_signature(priv_path, pub_path, sign_path):
'remove it first and try again'.format(sign_path))
else:
with salt.utils.fopen(sign_path, 'wb+') as sig_f:
sig_f.write(mpub_sig_64)
sig_f.write(salt.utils.to_bytes(mpub_sig_64))
log.trace('Wrote signature to {0}'.format(sign_path))
return True
@ -358,7 +359,10 @@ class AsyncAuth(object):
:rtype: Auth
'''
self.opts = opts
self.token = Crypticle.generate_key_string()
if six.PY2:
self.token = Crypticle.generate_key_string()
else:
self.token = salt.utils.to_bytes(Crypticle.generate_key_string())
self.serial = salt.payload.Serial(self.opts)
self.pub_path = os.path.join(self.opts['pki_dir'], 'minion.pub')
self.rsa_path = os.path.join(self.opts['pki_dir'], 'minion.pem')
@ -698,11 +702,17 @@ class AsyncAuth(object):
except Exception:
return '', ''
digest = hashlib.sha256(key_str).hexdigest()
if six.PY3:
digest = salt.utils.to_bytes(digest)
m_digest = public_decrypt(mkey.publickey(), payload['sig'])
if m_digest != digest:
return '', ''
else:
return '', ''
if six.PY3:
key_str = salt.utils.to_str(key_str)
if '_|-' in key_str:
return key_str.split('_|-')
else:
@ -757,7 +767,7 @@ class AsyncAuth(object):
m_pub_fn = os.path.join(self.opts['pki_dir'], self.mpub)
uid = salt.utils.get_uid(self.opts.get('user', None))
with salt.utils.fpopen(m_pub_fn, 'wb+', uid=uid) as wfh:
wfh.write(payload['pub_key'])
wfh.write(salt.utils.to_bytes(payload['pub_key']))
return True
else:
log.error('Received signed public-key from master {0} '
@ -903,7 +913,9 @@ class AsyncAuth(object):
if not m_pub_exists:
# the minion has not received any masters pubkey yet, write
# the newly received pubkey to minion_master.pub
salt.utils.fopen(m_pub_fn, 'wb+').write(payload['pub_key'])
salt.utils.fopen(m_pub_fn, 'wb+').write(
salt.utils.to_bytes(payload['pub_key'])
)
return self.extract_aes(payload, master_pub=False)
@ -950,7 +962,10 @@ class SAuth(AsyncAuth):
:rtype: Auth
'''
self.opts = opts
self.token = Crypticle.generate_key_string()
if six.PY2:
self.token = Crypticle.generate_key_string()
else:
self.token = salt.utils.to_bytes(Crypticle.generate_key_string())
self.serial = salt.payload.Serial(self.opts)
self.pub_path = os.path.join(self.opts['pki_dir'], 'minion.pub')
self.rsa_path = os.path.join(self.opts['pki_dir'], 'minion.pem')
@ -1136,7 +1151,7 @@ class Crypticle(object):
Signing algorithm: HMAC-SHA256
'''
PICKLE_PAD = 'pickle::'
PICKLE_PAD = b'pickle::'
AES_BLOCK_SIZE = 16
SIG_SIZE = hashlib.sha256().digest_size
@ -1156,7 +1171,10 @@ class Crypticle(object):
@classmethod
def extract_keys(cls, key_string, key_size):
key = key_string.decode('base64')
if six.PY2:
key = key_string.decode('base64')
else:
key = salt.utils.to_bytes(base64.b64decode(key_string))
assert len(key) == key_size / 8 + cls.SIG_SIZE, 'invalid key'
return key[:-cls.SIG_SIZE], key[-cls.SIG_SIZE:]
@ -1166,7 +1184,10 @@ class Crypticle(object):
'''
aes_key, hmac_key = self.keys
pad = self.AES_BLOCK_SIZE - len(data) % self.AES_BLOCK_SIZE
data = data + pad * chr(pad)
if six.PY2:
data = data + pad * chr(pad)
else:
data = data + salt.utils.to_bytes(pad * chr(pad))
iv_bytes = os.urandom(self.AES_BLOCK_SIZE)
cypher = AES.new(aes_key, AES.MODE_CBC, iv_bytes)
data = iv_bytes + cypher.encrypt(data)
@ -1185,8 +1206,13 @@ class Crypticle(object):
log.debug('Failed to authenticate message')
raise AuthenticationError('message authentication failed')
result = 0
for zipped_x, zipped_y in zip(mac_bytes, sig):
result |= ord(zipped_x) ^ ord(zipped_y)
if six.PY2:
for zipped_x, zipped_y in zip(mac_bytes, sig):
result |= ord(zipped_x) ^ ord(zipped_y)
else:
for zipped_x, zipped_y in zip(mac_bytes, sig):
result |= zipped_x ^ zipped_y
if result != 0:
log.debug('Failed to authenticate message')
raise AuthenticationError('message authentication failed')
@ -1194,7 +1220,10 @@ class Crypticle(object):
data = data[self.AES_BLOCK_SIZE:]
cypher = AES.new(aes_key, AES.MODE_CBC, iv_bytes)
data = cypher.decrypt(data)
return data[:-ord(data[-1])]
if six.PY2:
return data[:-ord(data[-1])]
else:
return data[:-data[-1]]
def dumps(self, obj):
'''
@ -1210,4 +1239,7 @@ class Crypticle(object):
# simple integrity check to verify that we got meaningful data
if not data.startswith(self.PICKLE_PAD):
return {}
return self.serial.loads(data[len(self.PICKLE_PAD):])
load = self.serial.loads(data[len(self.PICKLE_PAD):])
if six.PY3:
load = salt.transport.frame.decode_embedded_strs(load)
return load

View file

@ -229,6 +229,9 @@ def access_keys(opts):
if os.path.exists(keyfile):
log.debug('Removing stale keyfile: {0}'.format(keyfile))
if salt.utils.is_windows() and not os.access(keyfile, os.W_OK):
# Cannot delete read-only files on Windows.
os.chmod(keyfile, stat.S_IRUSR | stat.S_IWUSR)
os.unlink(keyfile)
key = salt.crypt.Crypticle.generate_key_string()

View file

@ -43,7 +43,7 @@ def start_engines(opts, proc_mgr):
for engine in engines_opt:
if isinstance(engine, dict):
engine, engine_opts = engine.items()[0]
engine, engine_opts = list(engine.items())[0]
else:
engine_opts = None
fun = '{0}.start'.format(engine)

View file

@ -957,12 +957,12 @@ class AESFuncs(object):
.format(tmp_pub, err))
try:
os.remove(tmp_pub)
if salt.crypt.public_decrypt(pub, token) == 'salt':
if salt.crypt.public_decrypt(pub, token) == b'salt':
return True
except ValueError as err:
log.error('Unable to decrypt token: {0}'.format(err))
log.error('Salt minion claiming to be {0} has attempted to'
log.error('Salt minion claiming to be {0} has attempted to '
'communicate with the master and could not be verified'
.format(id_))
return False

View file

@ -1217,11 +1217,18 @@ class Minion(MinionBase):
def ctx(self):
'''Return a single context manager for the minion's data
'''
return contextlib.nested(
self.functions.context_dict.clone(),
self.returners.context_dict.clone(),
self.executors.context_dict.clone(),
)
if six.PY2:
return contextlib.nested(
self.functions.context_dict.clone(),
self.returners.context_dict.clone(),
self.executors.context_dict.clone(),
)
else:
exitstack = contextlib.ExitStack()
exitstack.push(self.functions.context_dict.clone())
exitstack.push(self.returners.context_dict.clone())
exitstack.push(self.executors.context_dict.clone())
return exitstack
@classmethod
def _target(cls, minion_instance, opts, data):
@ -1804,31 +1811,31 @@ class Minion(MinionBase):
'''
tag, data = salt.utils.event.SaltEvent.unpack(package)
log.debug('Handling event tag \'{0}\''.format(tag))
if package.startswith('module_refresh'):
if tag.startswith('module_refresh'):
self.module_refresh(notify=data.get('notify', False))
elif package.startswith('pillar_refresh'):
elif tag.startswith('pillar_refresh'):
yield self.pillar_refresh()
elif package.startswith('manage_schedule'):
elif tag.startswith('manage_schedule'):
self.manage_schedule(tag, data)
elif package.startswith('manage_beacons'):
elif tag.startswith('manage_beacons'):
self.manage_beacons(tag, data)
elif package.startswith('grains_refresh'):
elif tag.startswith('grains_refresh'):
if self.grains_cache != self.opts['grains']:
self.pillar_refresh(force_refresh=True)
self.grains_cache = self.opts['grains']
elif package.startswith('environ_setenv'):
elif tag.startswith('environ_setenv'):
self.environ_setenv(tag, data)
elif package.startswith('_minion_mine'):
elif tag.startswith('_minion_mine'):
self._mine_send(tag, data)
elif package.startswith('fire_master'):
elif tag.startswith('fire_master'):
log.debug('Forwarding master event tag={tag}'.format(tag=data['tag']))
self._fire_master(data['data'], data['tag'], data['events'], data['pretag'])
elif package.startswith('__master_disconnected') or package.startswith('__master_failback'):
elif tag.startswith('__master_disconnected') or tag.startswith('__master_failback'):
# if the master disconnect event is for a different master, raise an exception
if package.startswith('__master_disconnected') and data['master'] != self.opts['master']:
if tag.startswith('__master_disconnected') and data['master'] != self.opts['master']:
raise SaltException('Bad master disconnected \'{0}\' when mine one is \'{1}\''.format(
data['master'], self.opts['master']))
if package.startswith('__master_failback'):
if tag.startswith('__master_failback'):
# if the master failback event is not for the top master, raise an exception
if data['master'] != self.opts['master_list'][0]:
raise SaltException('Bad master \'{0}\' when mine failback is \'{1}\''.format(
@ -1909,7 +1916,7 @@ class Minion(MinionBase):
else:
self.schedule.delete_job(name='__master_failback', persist=True)
elif package.startswith('__master_connected'):
elif tag.startswith('__master_connected'):
# handle this event only once. otherwise it will pollute the log
if not self.connected:
log.info('Connection to master {0} re-established'.format(self.opts['master']))
@ -1928,9 +1935,9 @@ class Minion(MinionBase):
self.schedule.modify_job(name='__master_alive',
schedule=schedule)
elif package.startswith('__schedule_return'):
elif tag.startswith('__schedule_return'):
self._return_pub(data, ret_cmd='_return', sync=False)
elif package.startswith('_salt_error'):
elif tag.startswith('_salt_error'):
log.debug('Forwarding salt error event tag={tag}'.format(tag=tag))
self._fire_master(data, tag)

View file

@ -108,13 +108,33 @@ class Serial(object):
else:
self.serial = 'msgpack'
def loads(self, msg):
def loads(self, msg, encoding=None):
'''
Run the correct loads serialization format
:param encoding: Useful for Python 3 support. If the msgpack data
was encoded using "use_bin_type=True", this will
differentiate between the 'bytes' type and the
'str' type by decoding contents with 'str' type
to what the encoding was set as. Recommended
encoding is 'utf-8' when using Python 3.
If the msgpack data was not encoded using
"use_bin_type=True", it will try to decode
all 'bytes' and 'str' data (the distinction has
been lost in this case) to what the encoding is
set as. In this case, it will fail if any of
the contents cannot be converted.
'''
try:
gc.disable() # performance optimization for msgpack
return msgpack.loads(msg, use_list=True)
if msgpack.version >= (0, 4, 0):
# msgpack only supports 'encoding' starting in 0.4.0.
# Due to this, if we don't need it, don't pass it at all so
# that under Python 2 we can still work with older versions
# of msgpack.
return msgpack.loads(msg, use_list=True, encoding=encoding)
else:
return msgpack.loads(msg, use_list=True)
except Exception as exc:
log.critical('Could not deserialize msgpack message: {0}'
'This often happens when trying to read a file not in binary mode.'
@ -130,14 +150,30 @@ class Serial(object):
data = fn_.read()
fn_.close()
if data:
return self.loads(data)
if six.PY3:
return self.loads(data, encoding='utf-8')
else:
return self.loads(data)
def dumps(self, msg):
def dumps(self, msg, use_bin_type=False):
'''
Run the correct dumps serialization format
:param use_bin_type: Useful for Python 3 support. Tells msgpack to
differentiate between 'str' and 'bytes' types
by encoding them differently.
Since this changes the wire protocol, this
option should not be used outside of IPC.
'''
try:
return msgpack.dumps(msg)
if msgpack.version >= (0, 4, 0):
# msgpack only supports 'use_bin_type' starting in 0.4.0.
# Due to this, if we don't need it, don't pass it at all so
# that under Python 2 we can still work with older versions
# of msgpack.
return msgpack.dumps(msg, use_bin_type=use_bin_type)
else:
return msgpack.dumps(msg)
except (OverflowError, msgpack.exceptions.PackValueError):
# msgpack can't handle the very long Python longs for jids
# Convert any very long longs to strings
@ -158,7 +194,10 @@ class Serial(object):
return str(obj)
else:
return obj
return msgpack.dumps(verylong_encoder(msg))
if msgpack.version >= (0, 4, 0):
return msgpack.dumps(verylong_encoder(msg), use_bin_type=use_bin_type)
else:
return msgpack.dumps(verylong_encoder(msg))
except TypeError as e:
# msgpack doesn't support datetime.datetime datatype
# So here we have converted datetime.datetime to custom datatype
@ -168,7 +207,10 @@ class Serial(object):
def dt_encode(obj):
datetime_str = obj.strftime("%Y%m%dT%H:%M:%S.%f")
return msgpack.packb(datetime_str, default=default)
if msgpack.version >= (0, 4, 0):
return msgpack.packb(datetime_str, default=default, use_bin_type=use_bin_type)
else:
return msgpack.packb(datetime_str, default=default)
def datetime_encoder(obj):
if isinstance(obj, dict):
@ -186,7 +228,10 @@ class Serial(object):
return obj
if "datetime.datetime" in str(e):
return msgpack.dumps(datetime_encoder(msg))
if msgpack.version >= (0, 4, 0):
return msgpack.dumps(datetime_encoder(msg), use_bin_type=use_bin_type)
else:
return msgpack.dumps(datetime_encoder(msg))
if msgpack.version >= (0, 2, 0):
# Should support OrderedDict serialization, so, let's
@ -210,7 +255,10 @@ class Serial(object):
obj[idx] = odict_encoder(entry)
return obj
return obj
return msgpack.dumps(odict_encoder(msg))
if msgpack.version >= (0, 4, 0):
return msgpack.dumps(odict_encoder(msg), use_bin_type=use_bin_type)
else:
return msgpack.dumps(odict_encoder(msg))
except (SystemError, TypeError) as exc: # pylint: disable=W0705
log.critical('Unable to serialize message! Consider upgrading msgpack. '
'Message which failed was {failed_message} '
@ -220,7 +268,13 @@ class Serial(object):
'''
Serialize the correct data into the named file object
'''
fn_.write(self.dumps(msg))
if six.PY2:
fn_.write(self.dumps(msg))
else:
# When using Python 3, write files in such a way
# that the 'bytes' and 'str' types are distinguishable
# by using "use_bin_type=True".
fn_.write(self.dumps(msg, use_bin_type=True))
fn_.close()

View file

@ -123,7 +123,7 @@ def prep_jid(nocache=False, passed_jid=None, recurse_count=0):
fn_.write(bytes(jid, 'utf-8'))
if nocache:
with salt.utils.fopen(os.path.join(jid_dir_, 'nocache'), 'wb+') as fn_:
fn_.write('')
fn_.write(b'')
except IOError:
log.warning('Could not write out jid file for job {0}. Retrying.'.format(jid))
time.sleep(0.1)

View file

@ -5,9 +5,10 @@ Helper functions for transport components to handle message framing
# Import python libs
from __future__ import absolute_import
import msgpack
import salt.ext.six as six
def frame_msg(body, header=None, raw_body=False):
def frame_msg(body, header=None, raw_body=False): # pylint: disable=unused-argument
'''
Frame the given message with our wire protocol
'''
@ -18,3 +19,94 @@ def frame_msg(body, header=None, raw_body=False):
framed_msg['head'] = header
framed_msg['body'] = body
return msgpack.dumps(framed_msg)
def frame_msg_ipc(body, header=None, raw_body=False): # pylint: disable=unused-argument
'''
Frame the given message with our wire protocol for IPC
For IPC, we don't need to be backwards compatible, so
use the more efficient "use_bin_type=True" on Python 3.
'''
framed_msg = {}
if header is None:
header = {}
framed_msg['head'] = header
framed_msg['body'] = body
if six.PY2:
return msgpack.dumps(framed_msg)
else:
return msgpack.dumps(framed_msg, use_bin_type=True)
def _decode_embedded_list(src):
'''
Convert enbedded bytes to strings if possible.
List helper.
'''
output = []
for elem in src:
if isinstance(elem, dict):
elem = _decode_embedded_dict(elem)
elif isinstance(elem, list):
elem = _decode_embedded_list(elem) # pylint: disable=redefined-variable-type
elif isinstance(elem, bytes):
try:
elem = elem.decode()
except UnicodeError:
pass
output.append(elem)
return output
def _decode_embedded_dict(src):
'''
Convert enbedded bytes to strings if possible.
Dict helper.
'''
output = {}
for key, val in six.iteritems(src):
if isinstance(val, dict):
val = _decode_embedded_dict(val)
elif isinstance(val, list):
val = _decode_embedded_list(val) # pylint: disable=redefined-variable-type
elif isinstance(val, bytes):
try:
val = val.decode()
except UnicodeError:
pass
if isinstance(key, bytes):
try:
key = key.decode()
except UnicodeError:
pass
output[key] = val
return output
def decode_embedded_strs(src):
'''
Convert enbedded bytes to strings if possible.
This is necessary because Python 3 makes a distinction
between these types.
This wouldn't be needed if we used "use_bin_type=True" when encoding
and "encoding='utf-8'" when decoding. Unfortunately, this would break
backwards compatibility due to a change in wire protocol, so this less
than ideal solution is used instead.
'''
if not six.PY3:
return src
if isinstance(src, dict):
return _decode_embedded_dict(src)
elif isinstance(src, list):
return _decode_embedded_list(src)
elif isinstance(src, bytes):
try:
return src.decode() # pylint: disable=redefined-variable-type
except UnicodeError:
return src
else:
return src

View file

@ -22,6 +22,7 @@ from tornado.iostream import IOStream
# Import Salt libs
import salt.transport.client
import salt.transport.frame
import salt.ext.six as six
log = logging.getLogger(__name__)
@ -152,7 +153,7 @@ class IPCServer(object):
if header.get('mid'):
@tornado.gen.coroutine
def return_message(msg):
pack = salt.transport.frame.frame_msg(
pack = salt.transport.frame.frame_msg_ipc(
msg,
header={'mid': header['mid']},
raw_body=True,
@ -161,7 +162,11 @@ class IPCServer(object):
return return_message
else:
return _null
unpacker = msgpack.Unpacker()
if six.PY2:
encoding = None
else:
encoding = 'utf-8'
unpacker = msgpack.Unpacker(encoding=encoding)
while not stream.closed():
try:
wire_bytes = yield stream.read_bytes(4096, partial=True)
@ -254,7 +259,11 @@ class IPCClient(object):
self.socket_path = socket_path
self._closing = False
self.stream = None
self.unpacker = msgpack.Unpacker()
if six.PY2:
encoding = None
else:
encoding = 'utf-8'
self.unpacker = msgpack.Unpacker(encoding=encoding)
def __init__(self, socket_path, io_loop=None):
# Handled by singleton __new__
@ -385,7 +394,7 @@ class IPCMessageClient(IPCClient):
'''
if not self.connected():
yield self.connect()
pack = salt.transport.frame.frame_msg(msg, raw_body=True)
pack = salt.transport.frame.frame_msg_ipc(msg, raw_body=True)
yield self.stream.write(pack)
@ -498,7 +507,7 @@ class IPCMessagePublisher(object):
if not len(self.streams):
return
pack = salt.transport.frame.frame_msg(msg, raw_body=True)
pack = salt.transport.frame.frame_msg_ipc(msg, raw_body=True)
for stream in self.streams:
self.io_loop.spawn_callback(self._write, stream, pack)

View file

@ -14,7 +14,9 @@ import binascii
import salt.crypt
import salt.payload
import salt.master
import salt.transport.frame
import salt.utils.event
import salt.ext.six as six
from salt.utils.cache import CacheCli
# Import Third Party Libs
@ -109,7 +111,10 @@ class AESReqServerMixin(object):
pret = {}
cipher = PKCS1_OAEP.new(pub)
pret['key'] = cipher.encrypt(key)
if six.PY2:
pret['key'] = cipher.encrypt(key)
else:
pret['key'] = cipher.encrypt(salt.utils.to_bytes(key))
pret[dictkey] = pcrypt.dumps(
ret if ret is not False else {}
)

View file

@ -11,10 +11,8 @@ from __future__ import absolute_import
import logging
import msgpack
import socket
import sys
import os
import weakref
import urlparse # TODO: remove
import time
import traceback
@ -31,6 +29,7 @@ import salt.transport.ipc
import salt.transport.client
import salt.transport.server
import salt.transport.mixins.auth
import salt.ext.six as six
from salt.exceptions import SaltReqTimeoutError, SaltClientError
from salt.transport import iter_transport_opts
@ -42,6 +41,13 @@ import tornado.concurrent
import tornado.tcpclient
import tornado.netutil
# pylint: disable=import-error,no-name-in-module
if six.PY2:
import urlparse
else:
import urllib.parse as urlparse
# pylint: enable=import-error,no-name-in-module
# Import third party libs
from Crypto.Cipher import PKCS1_OAEP
@ -199,7 +205,10 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel):
cipher = PKCS1_OAEP.new(key)
aes = cipher.decrypt(ret['key'])
pcrypt = salt.crypt.Crypticle(self.opts, aes)
raise tornado.gen.Return(pcrypt.loads(ret[dictkey]))
data = pcrypt.loads(ret[dictkey])
if six.PY3:
data = salt.transport.frame.decode_embedded_strs(data)
raise tornado.gen.Return(data)
@tornado.gen.coroutine
def _crypted_transfer(self, load, tries=3, timeout=60):
@ -220,6 +229,8 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel):
# upload the results to the master
if data:
data = self.auth.crypticle.loads(data)
if six.PY3:
data = salt.transport.frame.decode_embedded_strs(data)
raise tornado.gen.Return(data)
if not self.auth.authenticated:
@ -413,6 +424,8 @@ class AsyncTCPPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.tran
# TODO: For some reason we need to decode here for things
# to work. Fix this.
body = msgpack.loads(body)
if six.PY3:
body = salt.transport.frame.decode_embedded_strs(body)
ret = yield self._decode_payload(body)
callback(ret)
return self.message_client.on_recv(wrap_callback)
@ -549,6 +562,10 @@ class SaltMessageServer(tornado.tcpserver.TCPServer, object):
wire_bytes = yield stream.read_bytes(4096, partial=True)
unpacker.feed(wire_bytes)
for framed_msg in unpacker:
if six.PY3:
framed_msg = salt.transport.frame.decode_embedded_strs(
framed_msg
)
header = framed_msg['head']
self.io_loop.spawn_callback(self.message_handler, stream, header, framed_msg['body'])
@ -611,7 +628,7 @@ class SaltMessageClient(object):
opts, io_loop=self.io_loop, resolver=resolver)
self._mid = 1
self._max_messages = sys.maxint - 1 # number of IDs before we wrap
self._max_messages = int((1 << 31) - 2) # number of IDs before we wrap
# TODO: max queue size
self.send_queue = [] # queue of messages to be sent
@ -698,20 +715,25 @@ class SaltMessageClient(object):
wire_bytes = yield self._read_until_future
unpacker.feed(wire_bytes)
for framed_msg in unpacker:
if six.PY3:
framed_msg = salt.transport.frame.decode_embedded_strs(
framed_msg
)
header = framed_msg['head']
body = framed_msg['body']
message_id = header.get('mid')
if message_id in self.send_future_map:
self.send_future_map.pop(message_id).set_result(framed_msg['body'])
self.send_future_map.pop(message_id).set_result(body)
self.remove_message_timeout(message_id)
else:
if self._on_recv is not None:
self.io_loop.spawn_callback(self._on_recv, header, framed_msg['body'])
self.io_loop.spawn_callback(self._on_recv, header, body)
else:
log.error('Got response for message_id {0} that we are not tracking'.format(message_id))
except tornado.iostream.StreamClosedError as e:
log.debug('tcp stream to {0}:{1} closed, unable to recv'.format(self.host, self.port))
for future in self.send_future_map.itervalues():
for future in six.itervalues(self.send_future_map):
future.set_exception(e)
self.send_future_map = {}
if self._closing:
@ -724,7 +746,7 @@ class SaltMessageClient(object):
yield self._connecting_future
except Exception as e:
log.error('Exception parsing response', exc_info=True)
for future in self.send_future_map.itervalues():
for future in six.itervalues(self.send_future_map):
future.set_exception(e)
self.send_future_map = {}
if self._closing:
@ -956,12 +978,18 @@ class PubServer(tornado.tcpserver.TCPServer, object):
wire_bytes = yield client._read_until_future
unpacker.feed(wire_bytes)
for framed_msg in unpacker:
if six.PY3:
framed_msg = salt.transport.frame.decode_embedded_strs(
framed_msg
)
body = framed_msg['body']
if body['enc'] != 'aes':
# We only accept 'aes' encoded messages for 'id'
continue
crypticle = salt.crypt.Crypticle(self.opts, salt.master.SMaster.secrets['aes']['secret'].value)
load = crypticle.loads(body['load'])
if six.PY3:
load = salt.transport.frame.decode_embedded_strs(load)
if not self.aes_funcs.verify_minion(load['id'], load['tok']):
continue
client.id_ = load['id']

View file

@ -111,7 +111,7 @@ try:
libc = ctypes.cdll.LoadLibrary(ctypes.util.find_library("c"))
res_init = libc.__res_init
HAS_RESINIT = True
except (ImportError, OSError, AttributeError):
except (ImportError, OSError, AttributeError, TypeError):
HAS_RESINIT = False
# Import salt libs
@ -1192,19 +1192,32 @@ def fopen(*args, **kwargs):
NB! We still have small race condition between open and fcntl.
'''
# ensure 'binary' mode is always used on windows
if kwargs.pop('binary', True):
if is_windows():
if len(args) > 1:
args = list(args)
if 'b' not in args[1]:
args[1] += 'b'
elif kwargs.get('mode', None):
if 'b' not in kwargs['mode']:
kwargs['mode'] += 'b'
else:
# the default is to read
kwargs['mode'] = 'rb'
# ensure 'binary' mode is always used on Windows in Python 2
if ((six.PY2 and is_windows() and 'binary' not in kwargs) or
kwargs.pop('binary', False)):
if len(args) > 1:
args = list(args)
if 'b' not in args[1]:
args[1] += 'b'
elif kwargs.get('mode', None):
if 'b' not in kwargs['mode']:
kwargs['mode'] += 'b'
else:
# the default is to read
kwargs['mode'] = 'rb'
elif six.PY3 and 'encoding' not in kwargs:
# In Python 3, if text mode is used and the encoding
# is not specified, set the encoding to 'utf-8'.
binary = False
if len(args) > 1:
args = list(args)
if 'b' in args[1]:
binary = True
if kwargs.get('mode', None):
if 'b' in kwargs['mode']:
binary = True
if not binary:
kwargs['encoding'] = 'utf-8'
fhandle = open(*args, **kwargs)
if is_fcntl_available():

View file

@ -150,6 +150,25 @@ def yamlify_arg(arg):
return original_arg
if six.PY3:
from collections import namedtuple # pylint: disable=wrong-import-position,wrong-import-order
_ArgSpec = namedtuple('ArgSpec', 'args varargs keywords defaults')
def _getargspec(func):
'''
Python 3 wrapper for inspect.getargsspec
inspect.getargsspec is deprecated and will be removed in Python 3.6.
'''
args, varargs, varkw, defaults, kwonlyargs, _, ann = \
inspect.getfullargspec(func) # pylint: disable=no-member
if kwonlyargs or ann:
raise ValueError('Function has keyword-only arguments or annotations'
', use getfullargspec() API which can support them')
return _ArgSpec(args, varargs, varkw, defaults)
def get_function_argspec(func):
'''
A small wrapper around getargspec that also supports callable classes
@ -157,15 +176,30 @@ def get_function_argspec(func):
if not callable(func):
raise TypeError('{0} is not a callable'.format(func))
if inspect.isfunction(func):
aspec = inspect.getargspec(func)
elif inspect.ismethod(func):
aspec = inspect.getargspec(func)
del aspec.args[0] # self
elif isinstance(func, object):
aspec = inspect.getargspec(func.__call__)
del aspec.args[0] # self
if six.PY2:
if inspect.isfunction(func):
aspec = inspect.getargspec(func)
elif inspect.ismethod(func):
aspec = inspect.getargspec(func)
del aspec.args[0] # self
elif isinstance(func, object):
aspec = inspect.getargspec(func.__call__)
del aspec.args[0] # self
else:
raise TypeError(
'Cannot inspect argument list for \'{0}\''.format(func)
)
else:
raise TypeError('Cannot inspect argument list for \'{0}\''.format(func))
if inspect.isfunction(func):
aspec = _getargspec(func) # pylint: disable=redefined-variable-type
elif inspect.ismethod(func):
aspec = _getargspec(func)
del aspec.args[0] # self
elif isinstance(func, object):
aspec = _getargspec(func.__call__)
del aspec.args[0] # self
else:
raise TypeError(
'Cannot inspect argument list for \'{0}\''.format(func)
)
return aspec

View file

@ -17,7 +17,7 @@ import threading
import collections
from contextlib import contextmanager
import salt.ext.six
import salt.ext.six as six
@contextmanager
@ -132,7 +132,7 @@ class ChildContextDict(collections.MutableMapping):
self._data = {} if overrides is None else overrides
# merge self.global_data into self._data
for k, v in self.parent.global_data.iteritems():
for k, v in six.iteritems(self.parent.global_data):
if k not in self._data:
self._data[k] = copy.deepcopy(v)
@ -166,7 +166,7 @@ class NamespacedDictWrapper(collections.MutableMapping, dict):
'''
def __init__(self, d, pre_keys): # pylint: disable=W0231
self.__dict = d
if isinstance(pre_keys, salt.ext.six.string_types):
if isinstance(pre_keys, six.string_types):
self.pre_keys = (pre_keys,)
else:
self.pre_keys = pre_keys

View file

@ -377,9 +377,13 @@ class SaltEvent(object):
if serial is None:
serial = salt.payload.Serial({'serial': 'msgpack'})
mtag, sep, mdata = raw.partition(TAGEND) # split tag from data
data = serial.loads(mdata)
if six.PY2:
mtag, sep, mdata = raw.partition(TAGEND) # split tag from data
data = serial.loads(mdata)
else:
mtag, sep, mdata = raw.partition(salt.utils.to_bytes(TAGEND)) # split tag from data
mtag = salt.utils.to_str(mtag)
data = serial.loads(mdata, encoding='utf-8')
return mtag, data
def _get_match_func(self, match_type=None):
@ -648,13 +652,28 @@ class SaltEvent(object):
data['_stamp'] = datetime.datetime.utcnow().isoformat()
tagend = TAGEND
if six.PY2:
dump_data = self.serial.dumps(data)
else:
# Since the pack / unpack logic here is for local events only,
# it is safe to change the wire protocol. The mechanism
# that sends events from minion to master is outside this
# file.
dump_data = self.serial.dumps(data, use_bin_type=True)
serialized_data = salt.utils.dicttrim.trim_dict(
self.serial.dumps(data),
dump_data,
self.opts['max_event_size'],
is_msgpacked=True,
)
log.debug('Sending event - data = {0}'.format(data))
event = '{0}{1}{2}'.format(tag, tagend, serialized_data)
if six.PY2:
event = '{0}{1}{2}'.format(tag, tagend, serialized_data)
else:
event = b''.join([
salt.utils.to_bytes(tag),
salt.utils.to_bytes(tagend),
serialized_data])
msg = salt.utils.to_bytes(event, 'utf-8')
if self._run_io_loop_sync:
with salt.utils.async.current_ioloop(self.io_loop):

View file

@ -355,7 +355,11 @@ class Schedule(object):
log.debug('Persisting schedule')
try:
with salt.utils.fopen(schedule_conf, 'wb+') as fp_:
fp_.write(yaml.dump({'schedule': self.opts['schedule']}))
fp_.write(
salt.utils.to_bytes(
yaml.dump({'schedule': self.opts['schedule']})
)
)
except (IOError, OSError):
log.error('Failed to persist the updated schedule',
exc_info_on_loglevel=logging.DEBUG)

View file

@ -37,7 +37,9 @@ if mswindows:
from win32file import ReadFile, WriteFile
from win32pipe import PeekNamedPipe
import msvcrt
import _subprocess
import win32api
import win32con
import win32process
# pylint: enable=F0401,W0611
else:
import pty
@ -384,12 +386,12 @@ class Terminal(object):
Terminates the process
'''
try:
_subprocess.TerminateProcess(self._handle, 1)
win32api.TerminateProcess(self._handle, 1)
except OSError:
# ERROR_ACCESS_DENIED (winerror 5) is received when the
# process already died.
ecode = _subprocess.GetExitCodeProcess(self._handle)
if ecode == _subprocess.STILL_ACTIVE:
ecode = win32process.GetExitCodeProcess(self._handle)
if ecode == win32con.STILL_ACTIVE:
raise
self.exitstatus = ecode

View file

@ -435,12 +435,20 @@ class DownloadWindowsDlls(Command):
if req.getcode() == 200:
with open(fdest, 'wb') as wfh:
while True:
for chunk in req.read(4096):
if not chunk:
break
if IS_PY3:
while True:
chunk = req.read(4096)
if len(chunk) == 0:
break;
wfh.write(chunk)
wfh.flush()
else:
while True:
for chunk in req.read(4096):
if not chunk:
break
wfh.write(chunk)
wfh.flush()
else:
log.error(
'Failed to download {0}32.dll to {1} from {2}'.format(