* Fix CVE-2020-16846

Stop calling Popen with shell=True to prevent shell injection attacks on
the netapi salt-ssh client.

* Add tests to verify strict permissions on private keys

* Set mode of key files to 0600 instead of leaving them world readable

* Apply pre-commit fixes

* Open files with proper permissions

* Add cve id to changelog

* Security docs updates with newer resource links

* Changelog/Releasenotes update 3001.2

* Add man_pages 3001.2

* cve-2020-17490 consistancy hotfix

* Tests and fix for CVE-2020-25592

* Update man pages 3001.3

* Clear up requirements for salt-api+ssh

* Add ssh_options to roster docs

* Update changelog / releasenotes 3001.3

* Do not overwrite master keys

salt-api should not overwrite the master's keys when it starts up. Give
salt-api it's own cache directory and set of keys.

* Update for 3002.1 Release

* Update releasenotes

* Fix typos and pre-commit

* Fix spelling issue

* Fix pre-commit

* Update 2019.2.6.rst

Fix doc

Co-authored-by: Jasper Lievisse Adriaanse <j@jasper.la>
Co-authored-by: ScriptAutomate <derek@icanteven.io>
Co-authored-by: Frode Gundersen <fgundersen@saltstack.com>
Co-authored-by: Ken Crowell <kcrowell@saltstack.com>
Co-authored-by: Sage the Rage <36676171+sagetherage@users.noreply.github.com>
This commit is contained in:
Daniel Wozniak 2020-11-12 10:48:38 -07:00 committed by GitHub
parent 3dc25dc4ca
commit 023528b3b1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
34 changed files with 1867 additions and 596 deletions

View file

@ -7,6 +7,19 @@ Versions are `MAJOR.PATCH`.
# Changelog
Salt 3002.1 (2020-10-26)
========================
Fixed
-----
- Prevent shell injections in netapi ssh client (cve-2020-16846)
- Prevent creating world readable private keys with the tls execution module. (cve-2020-17490)
- Properly validate eauth credentials and tokens along with their ACLs.
Prior to this change eauth was not properly validated when calling
Salt ssh via the salt-api. Any value for 'eauth' or 'token' would allow a user
to bypass authentication and make calls to Salt ssh. (CVE-2020-25592)
Salt 3002 (2020-10-19)
======================
@ -45,7 +58,6 @@ Changed
arguments. ``formatter`` is still supported, but using both ``serializer`` and
``formatter`` will cause the state to fail. (#57858)
Fixed
-----
@ -236,6 +248,25 @@ Added
This flag will be deprecated in the Phosphorus release when this functionality
becomes the default. (#58652)
Salt 3001.3
===========
Fixed
-----
- Properly validate eauth credentials and tokens along with their ACLs.
Prior to this change eauth was not properly validated when calling
Salt ssh via the salt-api. Any value for 'eauth' or 'token' would allow a user
to bypass authentication and make calls to Salt ssh. (CVE-2020-25592)
Salt 3001.2
===========
Fixed
-----
- Prevent shell injections in netapi ssh client (cve-2020-16846)
- Prevent creating world readable private keys with the tls execution module. (cve-2020-17490)
Salt 3001.1 (2020-07-27)
========================
@ -663,8 +694,26 @@ Added
- [#56637](https://github.com/saltstack/salt/pull/56637) - Add ``win_wua.installed`` to the ``win_wua`` execution module
- Clarify how to get the master fingerprint (#54699)
Salt 3000.5
===========
Fixed
-----
- Properly validate eauth credentials and tokens along with their ACLs.
Prior to this change eauth was not properly validated when calling
Salt ssh via the salt-api. Any value for 'eauth' or 'token' would allow a user
to bypass authentication and make calls to Salt ssh. (CVE-2020-25592)
Salt 3000.4
===========
Fixed
-----
- Prevent shell injections in netapi ssh client (cve-2020-16846)
- Prevent creating world readable private keys with the tls execution module. (cve-2020-17490)
## 3000.1
### 3000.3
### Fixed

View file

@ -1,6 +1,6 @@
.\" Man page generated from reStructuredText.
.
.TH "SALT-API" "1" "Oct 05, 2020" "3002" "Salt"
.TH "SALT-API" "1" "Oct 26, 2020" "3002.1" "Salt"
.SH NAME
salt-api \- salt-api Command
.

View file

@ -1,6 +1,6 @@
.\" Man page generated from reStructuredText.
.
.TH "SALT-CALL" "1" "Oct 05, 2020" "3002" "Salt"
.TH "SALT-CALL" "1" "Oct 26, 2020" "3002.1" "Salt"
.SH NAME
salt-call \- salt-call Documentation
.

View file

@ -1,6 +1,6 @@
.\" Man page generated from reStructuredText.
.
.TH "SALT-CLOUD" "1" "Oct 05, 2020" "3002" "Salt"
.TH "SALT-CLOUD" "1" "Oct 26, 2020" "3002.1" "Salt"
.SH NAME
salt-cloud \- Salt Cloud Command
.

View file

@ -1,6 +1,6 @@
.\" Man page generated from reStructuredText.
.
.TH "SALT-CP" "1" "Oct 05, 2020" "3002" "Salt"
.TH "SALT-CP" "1" "Oct 26, 2020" "3002.1" "Salt"
.SH NAME
salt-cp \- salt-cp Documentation
.

View file

@ -1,6 +1,6 @@
.\" Man page generated from reStructuredText.
.
.TH "SALT-KEY" "1" "Oct 05, 2020" "3002" "Salt"
.TH "SALT-KEY" "1" "Oct 26, 2020" "3002.1" "Salt"
.SH NAME
salt-key \- salt-key Documentation
.

View file

@ -1,6 +1,6 @@
.\" Man page generated from reStructuredText.
.
.TH "SALT-MASTER" "1" "Oct 05, 2020" "3002" "Salt"
.TH "SALT-MASTER" "1" "Oct 26, 2020" "3002.1" "Salt"
.SH NAME
salt-master \- salt-master Documentation
.

View file

@ -1,6 +1,6 @@
.\" Man page generated from reStructuredText.
.
.TH "SALT-MINION" "1" "Oct 05, 2020" "3002" "Salt"
.TH "SALT-MINION" "1" "Oct 26, 2020" "3002.1" "Salt"
.SH NAME
salt-minion \- salt-minion Documentation
.

View file

@ -1,6 +1,6 @@
.\" Man page generated from reStructuredText.
.
.TH "SALT-PROXY" "1" "Oct 05, 2020" "3002" "Salt"
.TH "SALT-PROXY" "1" "Oct 26, 2020" "3002.1" "Salt"
.SH NAME
salt-proxy \- salt-proxy Documentation
.

View file

@ -1,6 +1,6 @@
.\" Man page generated from reStructuredText.
.
.TH "SALT-RUN" "1" "Oct 05, 2020" "3002" "Salt"
.TH "SALT-RUN" "1" "Oct 26, 2020" "3002.1" "Salt"
.SH NAME
salt-run \- salt-run Documentation
.

View file

@ -1,6 +1,6 @@
.\" Man page generated from reStructuredText.
.
.TH "SALT-SSH" "1" "Oct 05, 2020" "3002" "Salt"
.TH "SALT-SSH" "1" "Oct 26, 2020" "3002.1" "Salt"
.SH NAME
salt-ssh \- salt-ssh Documentation
.

View file

@ -1,6 +1,6 @@
.\" Man page generated from reStructuredText.
.
.TH "SALT-SYNDIC" "1" "Oct 05, 2020" "3002" "Salt"
.TH "SALT-SYNDIC" "1" "Oct 26, 2020" "3002.1" "Salt"
.SH NAME
salt-syndic \- salt-syndic Documentation
.

View file

@ -1,6 +1,6 @@
.\" Man page generated from reStructuredText.
.
.TH "SALT-UNITY" "1" "Oct 05, 2020" "3002" "Salt"
.TH "SALT-UNITY" "1" "Oct 26, 2020" "3002.1" "Salt"
.SH NAME
salt-unity \- salt-unity Command
.

View file

@ -1,6 +1,6 @@
.\" Man page generated from reStructuredText.
.
.TH "SALT" "1" "Oct 05, 2020" "3002" "Salt"
.TH "SALT" "1" "Oct 26, 2020" "3002.1" "Salt"
.SH NAME
salt \- salt
.

File diff suppressed because it is too large Load diff

View file

@ -1,6 +1,6 @@
.\" Man page generated from reStructuredText.
.
.TH "SPM" "1" "Oct 05, 2020" "3002" "Salt"
.TH "SPM" "1" "Oct 26, 2020" "3002.1" "Salt"
.SH NAME
spm \- Salt Package Manager Command
.

View file

@ -114,8 +114,20 @@ quickly and safely as is possible.
lists. The announcement contains a description of the issue and a link to
the full release documentation and download locations.
.. _saltstack_security_announcements:
Receiving security announcements
================================
The fastest place to receive security announcements is via the `salt-announce`_
mailing list. This list is low-traffic.
The following mailing lists, per the previous tasks identified in our response
procedure, will receive security-relevant notifications:
* `salt-packagers`_
* `salt-users`_
* `salt-announce`_
In addition to the mailing lists, SaltStack also provides the following resources:
* `SaltStack Security Announcements <https://www.saltstack.com/security-announcements/>`__ landing page
* `SaltStack Security RSS Feed <http://www.saltstack.com/feed/?post_type=security>`__
* `SaltStack Community Slack Workspace <http://saltstackcommunity.slack.com/>`__

View file

@ -10,6 +10,11 @@ heavily on how you use Salt, where you use Salt, how your team is structured,
where you get data from, and what kinds of access (internal and external) you
require.
.. important::
Refer to the :ref:`saltstack_security_announcements` documentation in order to stay updated
and secure.
.. warning::
For historical reasons, Salt requires PyCrypto as a "lowest common
@ -43,7 +48,8 @@ Salt hardening tips
===================
- Subscribe to `salt-users`_ or `salt-announce`_ so you know when new Salt
releases are available. Keep your systems up-to-date with the latest patches.
releases are available.
- Keep your systems up-to-date with the latest patches.
- Use Salt's Client :ref:`ACL system <acl>` to avoid having to give out root
access in order to run Salt commands.
- Use Salt's Client :ref:`ACL system <acl>` to restrict which users can run what commands.

View file

@ -0,0 +1,13 @@
.. _release-2019-2-6:
===========================
Salt 2019.2.6 Release Notes
===========================
Version 2019.2.6 is a CVE fix release for :ref:`2019.2.0 <release-2019-2-0>`.
Fixed
-----
- Prevent shell injections in netapi ssh client (cve-2020-16846)
- Prevent creating world readable private keys with the tls execution module. (cve-2020-17490)

View file

@ -0,0 +1,15 @@
.. _release-2019-2-7:
===========================
Salt 2019.2.7 Release Notes
===========================
Version 2019.2.7 is a CVE fix release for :ref:`2019.2.0 <release-2019-2-0>`.
Fixed
-----
- Properly validate eauth credentials and tokens along with their ACLs.
Prior to this change eauth was not properly validated when calling
Salt ssh via the salt-api. Any value for 'eauth' or 'token' would allow a user
to bypass authentication and make calls to Salt ssh. (CVE-2020-25592)

View file

@ -0,0 +1,13 @@
.. _release-3000-4:
=========================
Salt 3000.4 Release Notes
=========================
Version 3000.4 is a CVE fix release for :ref:`3000 <release-3000>`.
Fixed
-----
- Prevent shell injections in netapi ssh client (cve-2020-16846)
- Prevent creating world readable private keys with the tls execution module. (cve-2020-17490)

View file

@ -0,0 +1,15 @@
.. _release-3000-5:
=========================
Salt 3000.5 Release Notes
=========================
Version 3000.5 is a CVE fix release for :ref:`3000 <release-3000>`.
Fixed
-----
- Properly validate eauth credentials and tokens along with their ACLs.
Prior to this change eauth was not properly validated when calling
Salt ssh via the salt-api. Any value for 'eauth' or 'token' would allow a user
to bypass authentication and make calls to Salt ssh. (CVE-2020-25592)

View file

@ -0,0 +1,13 @@
.. _release-3001-2:
=========================
Salt 3001.2 Release Notes
=========================
Version 3001.2 is a CVE fix release for :ref:`3001 <release-3001>`.
Fixed
-----
- Prevent shell injections in netapi ssh client (cve-2020-16846)
- Prevent creating world readable private keys with the tls execution module. (cve-2020-17490)

View file

@ -0,0 +1,15 @@
.. _release-3001-3:
=========================
Salt 3001.3 Release Notes
=========================
Version 3001.3 is a CVE fix release for :ref:`3001 <release-3001>`.
Fixed
-----
- Properly validate eauth credentials and tokens along with their ACLs.
Prior to this change eauth was not properly validated when calling
Salt ssh via the salt-api. Any value for 'eauth' or 'token' would allow a user
to bypass authentication and make calls to Salt ssh. (CVE-2020-25592)

View file

@ -0,0 +1,17 @@
.. _release-3002-1:
=========================
Salt 3002.1 Release Notes
=========================
Version 3002.1 is a CVE fix release for :ref:`3002 <release-3002>`.
Fixed
-----
- Prevent shell injections in netapi ssh client (cve-2020-16846)
- Prevent creating world readable private keys with the tls execution module. (cve-2020-17490)
- Properly validate eauth credentials and tokens along with their ACLs.
Prior to this change eauth was not properly validated when calling
Salt ssh via the salt-api. Any value for 'eauth' or 'token' would allow a user
to bypass authentication and make calls to Salt ssh. (CVE-2020-25592)

View file

@ -69,6 +69,7 @@ The information which can be stored in a roster ``target`` is the following:
set_path: # Set the path environment variable, to ensure the expected python
# binary is in the salt-ssh path, when running the command.
# Example: '$PATH:/usr/local/bin/'. Added in 3001 Release.
ssh_options: # List of options (as 'option=argument') to pass to ssh.
.. _ssh_pre_flight:

View file

@ -1,24 +1,19 @@
# -*- coding: utf-8 -*-
"""
Manage transport commands via ssh
"""
from __future__ import absolute_import, print_function, unicode_literals
import logging
import os
# Import python libs
import re
import shlex
import subprocess
import sys
import time
# Import salt libs
import salt.defaults.exitcodes
import salt.utils.json
import salt.utils.nb_popen
import salt.utils.vt
from salt.ext import six
log = logging.getLogger(__name__)
@ -31,22 +26,15 @@ RSTR = "_edbc7885e4f9aac9b83b35999b68d015148caf467b78fa39c05f669c0ff89878"
RSTR_RE = re.compile(r"(?:^|\r?\n)" + RSTR + r"(?:\r?\n|$)")
class NoPasswdError(Exception):
pass
class KeyAcceptError(Exception):
pass
def gen_key(path):
"""
Generate a key for use with salt-ssh
"""
cmd = 'ssh-keygen -P "" -f {0} -t rsa -q'.format(path)
if not os.path.isdir(os.path.dirname(path)):
cmd = ["ssh-keygen", "-P", "", "-f", path, "-t", "rsa", "-q"]
dirname = os.path.dirname(path)
if dirname and not os.path.isdir(dirname):
os.makedirs(os.path.dirname(path))
subprocess.call(cmd, shell=True)
subprocess.call(cmd)
def gen_shell(opts, **kwargs):
@ -66,7 +54,7 @@ def gen_shell(opts, **kwargs):
return shell
class Shell(object):
class Shell:
"""
Create a shell connection object to encapsulate ssh executions
"""
@ -95,7 +83,7 @@ class Shell(object):
self.host = host.strip("[]")
self.user = user
self.port = port
self.passwd = six.text_type(passwd) if passwd else passwd
self.passwd = str(passwd) if passwd else passwd
self.priv = priv
self.priv_passwd = priv_passwd
self.timeout = timeout
@ -133,26 +121,26 @@ class Shell(object):
options.append("PasswordAuthentication=no")
if self.opts.get("_ssh_version", (0,)) > (4, 9):
options.append("GSSAPIAuthentication=no")
options.append("ConnectTimeout={0}".format(self.timeout))
options.append("ConnectTimeout={}".format(self.timeout))
if self.opts.get("ignore_host_keys"):
options.append("StrictHostKeyChecking=no")
if self.opts.get("no_host_keys"):
options.extend(["StrictHostKeyChecking=no", "UserKnownHostsFile=/dev/null"])
known_hosts = self.opts.get("known_hosts_file")
if known_hosts and os.path.isfile(known_hosts):
options.append("UserKnownHostsFile={0}".format(known_hosts))
options.append("UserKnownHostsFile={}".format(known_hosts))
if self.port:
options.append("Port={0}".format(self.port))
options.append("Port={}".format(self.port))
if self.priv and self.priv != "agent-forwarding":
options.append("IdentityFile={0}".format(self.priv))
options.append("IdentityFile={}".format(self.priv))
if self.user:
options.append("User={0}".format(self.user))
options.append("User={}".format(self.user))
if self.identities_only:
options.append("IdentitiesOnly=yes")
ret = []
for option in options:
ret.append("-o {0} ".format(option))
ret.append("-o {} ".format(option))
return "".join(ret)
def _passwd_opts(self):
@ -168,7 +156,7 @@ class Shell(object):
]
if self.opts["_ssh_version"] > (4, 9):
options.append("GSSAPIAuthentication=no")
options.append("ConnectTimeout={0}".format(self.timeout))
options.append("ConnectTimeout={}".format(self.timeout))
if self.opts.get("ignore_host_keys"):
options.append("StrictHostKeyChecking=no")
if self.opts.get("no_host_keys"):
@ -187,19 +175,19 @@ class Shell(object):
]
)
if self.port:
options.append("Port={0}".format(self.port))
options.append("Port={}".format(self.port))
if self.user:
options.append("User={0}".format(self.user))
options.append("User={}".format(self.user))
if self.identities_only:
options.append("IdentitiesOnly=yes")
ret = []
for option in options:
ret.append("-o {0} ".format(option))
ret.append("-o {} ".format(option))
return "".join(ret)
def _ssh_opts(self):
return " ".join(["-o {0}".format(opt) for opt in self.ssh_options])
return " ".join(["-o {}".format(opt) for opt in self.ssh_options])
def _copy_id_str_old(self):
"""
@ -208,9 +196,9 @@ class Shell(object):
if self.passwd:
# Using single quotes prevents shell expansion and
# passwords containing '$'
return "{0} {1} '{2} -p {3} {4} {5}@{6}'".format(
return "{} {} '{} -p {} {} {}@{}'".format(
"ssh-copy-id",
"-i {0}.pub".format(self.priv),
"-i {}.pub".format(self.priv),
self._passwd_opts(),
self.port,
self._ssh_opts(),
@ -227,9 +215,9 @@ class Shell(object):
if self.passwd:
# Using single quotes prevents shell expansion and
# passwords containing '$'
return "{0} {1} {2} -p {3} {4} {5}@{6}".format(
return "{} {} {} -p {} {} {}@{}".format(
"ssh-copy-id",
"-i {0}.pub".format(self.priv),
"-i {}.pub".format(self.priv),
self._passwd_opts(),
self.port,
self._ssh_opts(),
@ -266,7 +254,7 @@ class Shell(object):
command.append(
" ".join(
[
"-R {0}".format(item)
"-R {}".format(item)
for item in self.remote_port_forwards.split(",")
]
)
@ -278,27 +266,13 @@ class Shell(object):
return " ".join(command)
def _old_run_cmd(self, cmd):
"""
Cleanly execute the command string
"""
try:
proc = subprocess.Popen(
cmd, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE,
)
data = proc.communicate()
return data[0], data[1], proc.returncode
except Exception: # pylint: disable=broad-except
return ("local", "Unknown Error", None)
def _run_nb_cmd(self, cmd):
"""
cmd iterator
"""
try:
proc = salt.utils.nb_popen.NonBlockingPopen(
cmd, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE,
self._split_cmd(cmd), stderr=subprocess.PIPE, stdout=subprocess.PIPE,
)
while True:
time.sleep(0.1)
@ -322,7 +296,7 @@ class Shell(object):
rcode = None
cmd = self._cmd_str(cmd)
logmsg = "Executing non-blocking command: {0}".format(cmd)
logmsg = "Executing non-blocking command: {}".format(cmd)
if self.passwd:
logmsg = logmsg.replace(self.passwd, ("*" * 6))
log.debug(logmsg)
@ -341,7 +315,7 @@ class Shell(object):
"""
cmd = self._cmd_str(cmd)
logmsg = "Executing command: {0}".format(cmd)
logmsg = "Executing command: {}".format(cmd)
if self.passwd:
logmsg = logmsg.replace(self.passwd, ("*" * 6))
if 'decode("base64")' in logmsg or "base64.b64decode(" in logmsg:
@ -358,23 +332,38 @@ class Shell(object):
scp a file or files to a remote system
"""
if makedirs:
self.exec_cmd("mkdir -p {0}".format(os.path.dirname(remote)))
self.exec_cmd("mkdir -p {}".format(os.path.dirname(remote)))
# scp needs [<ipv6}
host = self.host
if ":" in host:
host = "[{0}]".format(host)
host = "[{}]".format(host)
cmd = "{0} {1}:{2}".format(local, host, remote)
cmd = "{} {}:{}".format(local, host, remote)
cmd = self._cmd_str(cmd, ssh="scp")
logmsg = "Executing command: {0}".format(cmd)
logmsg = "Executing command: {}".format(cmd)
if self.passwd:
logmsg = logmsg.replace(self.passwd, ("*" * 6))
log.debug(logmsg)
return self._run_cmd(cmd)
def _split_cmd(self, cmd):
"""
Split a command string so that it is suitable to pass to Popen without
shell=True. This prevents shell injection attacks in the options passed
to ssh or some other command.
"""
try:
ssh_part, cmd_part = cmd.split("/bin/sh")
except ValueError:
cmd_lst = shlex.split(cmd)
else:
cmd_lst = shlex.split(ssh_part)
cmd_lst.append("/bin/sh {}".format(cmd_part))
return cmd_lst
def _run_cmd(self, cmd, key_accept=False, passwd_retries=3):
"""
Execute a shell command via VT. This is blocking and assumes that ssh
@ -384,8 +373,7 @@ class Shell(object):
return "", "No command or passphrase", 245
term = salt.utils.vt.Terminal(
cmd,
shell=True,
self._split_cmd(cmd),
log_stdout=True,
log_stdout_level="trace",
log_stderr=True,
@ -440,7 +428,7 @@ class Shell(object):
ret_stdout = (
"The host key needs to be accepted, to "
"auto accept run salt-ssh with the -i "
"flag:\n{0}"
"flag:\n{}"
).format(stdout)
return ret_stdout, "", 254
elif buff and buff.endswith("_||ext_mods||_"):

View file

@ -1,7 +1,5 @@
# -*- coding: utf-8 -*-
r"""
A salt module for SSL/TLS.
Can create a Certificate Authority (CA)
A salt module for SSL/TLS. Can create a Certificate Authority (CA)
or use Self-Signed certificates.
:depends: PyOpenSSL Python module (0.10 or later, 0.14 or later for X509
@ -99,28 +97,20 @@ Create a server req + cert with non-CN filename for the cert
cert_type=server cert_filename="something_completely_different"
Created Certificate for "www.anothersometh.ing": /etc/pki/my_little/certs/something_completely_different.crt
"""
from __future__ import absolute_import, print_function, unicode_literals
import binascii
import calendar
import logging
import math
# Import Python libs
import os
import re
import time
from datetime import datetime
# Import Salt libs
import salt.utils.data
import salt.utils.files
import salt.utils.stringutils
from salt.exceptions import CommandExecutionError
# Import 3rd-party libs
from salt.ext import six
from salt.ext.six.moves import range as _range
from salt.utils.versions import LooseVersion as _LooseVersion
# pylint: disable=C0103
@ -179,7 +169,7 @@ def _microtime():
"""
val1, val2 = math.modf(time.time())
val2 = int(val2)
return "{0:f}{1}".format(val1, val2)
return "{:f}{}".format(val1, val2)
def cert_base_path(cacert_path=None):
@ -242,12 +232,7 @@ def _new_serial(ca_name):
"""
hashnum = int(
binascii.hexlify(
b"_".join(
(
salt.utils.stringutils.to_bytes(_microtime()),
os.urandom(5) if six.PY3 else os.urandom(5).encode("hex"),
)
)
b"_".join((salt.utils.stringutils.to_bytes(_microtime()), os.urandom(5),))
),
16,
)
@ -256,7 +241,7 @@ def _new_serial(ca_name):
# record the hash somewhere
cachedir = __opts__["cachedir"]
log.debug("cachedir: %s", cachedir)
serial_file = "{0}/{1}.serial".format(cachedir, ca_name)
serial_file = "{}/{}.serial".format(cachedir, ca_name)
if not os.path.exists(cachedir):
os.makedirs(cachedir)
if not os.path.exists(serial_file):
@ -278,9 +263,9 @@ def _get_basic_info(ca_name, cert, ca_dir=None):
Get basic info to write out to the index.txt
"""
if ca_dir is None:
ca_dir = "{0}/{1}".format(_cert_base_path(), ca_name)
ca_dir = "{}/{}".format(_cert_base_path(), ca_name)
index_file = "{0}/index.txt".format(ca_dir)
index_file = "{}/index.txt".format(ca_dir)
cert = _read_cert(cert)
expire_date = _four_digit_year_to_two_digit(_get_expiration_date(cert))
@ -291,7 +276,7 @@ def _get_basic_info(ca_name, cert, ca_dir=None):
# then we can add the rest of the subject
subject += "/".join(
["{0}={1}".format(x, y) for x, y in cert.get_subject().get_components()]
["{}={}".format(x, y) for x, y in cert.get_subject().get_components()]
)
subject += "\n"
@ -309,12 +294,12 @@ def _write_cert_to_database(ca_name, cert, cacert_path=None, status="V"):
certificate to be recorded
"""
set_ca_path(cacert_path)
ca_dir = "{0}/{1}".format(cert_base_path(), ca_name)
ca_dir = "{}/{}".format(cert_base_path(), ca_name)
index_file, expire_date, serial_number, subject = _get_basic_info(
ca_name, cert, ca_dir
)
index_data = "{0}\t{1}\t\t{2}\tunknown\t{3}".format(
index_data = "{}\t{}\t\t{}\tunknown\t{}".format(
status, expire_date, serial_number, subject
)
@ -346,9 +331,9 @@ def maybe_fix_ssl_version(ca_name, cacert_path=None, ca_filename=None):
"""
set_ca_path(cacert_path)
if not ca_filename:
ca_filename = "{0}_ca_cert".format(ca_name)
certp = "{0}/{1}/{2}.crt".format(cert_base_path(), ca_name, ca_filename)
ca_keyp = "{0}/{1}/{2}.key".format(cert_base_path(), ca_name, ca_filename)
ca_filename = "{}_ca_cert".format(ca_name)
certp = "{}/{}/{}.crt".format(cert_base_path(), ca_name, ca_filename)
ca_keyp = "{}/{}/{}.key".format(cert_base_path(), ca_name, ca_filename)
with salt.utils.files.fopen(certp) as fic:
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, fic.read())
if cert.get_version() == 3:
@ -407,8 +392,8 @@ def ca_exists(ca_name, cacert_path=None, ca_filename=None):
"""
set_ca_path(cacert_path)
if not ca_filename:
ca_filename = "{0}_ca_cert".format(ca_name)
certp = "{0}/{1}/{2}.crt".format(cert_base_path(), ca_name, ca_filename)
ca_filename = "{}_ca_cert".format(ca_name)
certp = "{}/{}/{}.crt".format(cert_base_path(), ca_name, ca_filename)
if os.path.exists(certp):
maybe_fix_ssl_version(ca_name, cacert_path=cacert_path, ca_filename=ca_filename)
return True
@ -440,7 +425,7 @@ def get_ca(ca_name, as_text=False, cacert_path=None):
set_ca_path(cacert_path)
certp = "{0}/{1}/{1}_ca_cert.crt".format(cert_base_path(), ca_name)
if not os.path.exists(certp):
raise ValueError("Certificate does not exist for {0}".format(ca_name))
raise ValueError("Certificate does not exist for {}".format(ca_name))
else:
if as_text:
with salt.utils.files.fopen(certp) as fic:
@ -478,9 +463,9 @@ def get_ca_signed_cert(
if not cert_filename:
cert_filename = CN
certp = "{0}/{1}/certs/{2}.crt".format(cert_base_path(), ca_name, cert_filename)
certp = "{}/{}/certs/{}.crt".format(cert_base_path(), ca_name, cert_filename)
if not os.path.exists(certp):
raise ValueError("Certificate does not exists for {0}".format(CN))
raise ValueError("Certificate does not exists for {}".format(CN))
else:
if as_text:
with salt.utils.files.fopen(certp) as fic:
@ -522,9 +507,9 @@ def get_ca_signed_key(
if not key_filename:
key_filename = CN
keyp = "{0}/{1}/certs/{2}.key".format(cert_base_path(), ca_name, key_filename)
keyp = "{}/{}/certs/{}.key".format(cert_base_path(), ca_name, key_filename)
if not os.path.exists(keyp):
raise ValueError("Certificate does not exists for {0}".format(CN))
raise ValueError("Certificate does not exists for {}".format(CN))
else:
if as_text:
with salt.utils.files.fopen(keyp) as fic:
@ -533,7 +518,7 @@ def get_ca_signed_key(
def _read_cert(cert):
if isinstance(cert, six.string_types):
if isinstance(cert, str):
try:
with salt.utils.files.fopen(cert) as rfh:
return OpenSSL.crypto.load_certificate(
@ -569,10 +554,10 @@ def validate(cert, ca_name, crl_file):
cert_obj = _read_cert(cert)
if cert_obj is None:
raise CommandExecutionError(
"Failed to read cert from {0}, see log for details".format(cert)
"Failed to read cert from {}, see log for details".format(cert)
)
ca_dir = "{0}/{1}".format(cert_base_path(), ca_name)
ca_cert = _read_cert("{0}/{1}_ca_cert.crt".format(ca_dir, ca_name))
ca_dir = "{}/{}".format(cert_base_path(), ca_name)
ca_cert = _read_cert("{}/{}_ca_cert.crt".format(ca_dir, ca_name))
store.add_cert(ca_cert)
# These flags tell OpenSSL to check the leaf as well as the
# entire cert chain.
@ -604,7 +589,7 @@ def _get_expiration_date(cert):
if cert_obj is None:
raise CommandExecutionError(
"Failed to read cert from {0}, see log for details".format(cert)
"Failed to read cert from {}, see log for details".format(cert)
)
return datetime.strptime(
@ -640,18 +625,18 @@ def _check_onlyif_unless(onlyif, unless):
ret = None
retcode = __salt__["cmd.retcode"]
if onlyif is not None:
if not isinstance(onlyif, six.string_types):
if not isinstance(onlyif, str):
if not onlyif:
ret = {"comment": "onlyif condition is false", "result": True}
elif isinstance(onlyif, six.string_types):
elif isinstance(onlyif, str):
if retcode(onlyif) != 0:
ret = {"comment": "onlyif condition is false", "result": True}
log.debug("onlyif condition is false")
if unless is not None:
if not isinstance(unless, six.string_types):
if not isinstance(unless, str):
if unless:
ret = {"comment": "unless condition is true", "result": True}
elif isinstance(unless, six.string_types):
elif isinstance(unless, str):
if retcode(unless) == 0:
ret = {"comment": "unless condition is true", "result": True}
log.debug("unless condition is true")
@ -726,7 +711,7 @@ def create_ca(
ca_name='koji'
the resulting CA, and corresponding key, would be written in the following
location::
location with appropriate permissions::
/etc/pki/koji/koji_ca_cert.crt
/etc/pki/koji/koji_ca_cert.key
@ -744,18 +729,18 @@ def create_ca(
set_ca_path(cacert_path)
if not ca_filename:
ca_filename = "{0}_ca_cert".format(ca_name)
ca_filename = "{}_ca_cert".format(ca_name)
certp = "{0}/{1}/{2}.crt".format(cert_base_path(), ca_name, ca_filename)
ca_keyp = "{0}/{1}/{2}.key".format(cert_base_path(), ca_name, ca_filename)
certp = "{}/{}/{}.crt".format(cert_base_path(), ca_name, ca_filename)
ca_keyp = "{}/{}/{}.key".format(cert_base_path(), ca_name, ca_filename)
if not replace and not fixmode and ca_exists(ca_name, ca_filename=ca_filename):
return 'Certificate for CA named "{0}" already exists'.format(ca_name)
return 'Certificate for CA named "{}" already exists'.format(ca_name)
if fixmode and not os.path.exists(certp):
raise ValueError("{0} does not exists, can't fix".format(certp))
raise ValueError("{} does not exists, can't fix".format(certp))
if not os.path.exists("{0}/{1}".format(cert_base_path(), ca_name)):
os.makedirs("{0}/{1}".format(cert_base_path(), ca_name))
if not os.path.exists("{}/{}".format(cert_base_path(), ca_name)):
os.makedirs("{}/{}".format(cert_base_path(), ca_name))
# try to reuse existing ssl key
key = None
@ -773,7 +758,7 @@ def create_ca(
ca_keyp,
err,
)
bck = "{0}.unloadable.{1}".format(
bck = "{}.unloadable.{}".format(
ca_keyp, datetime.utcnow().strftime("%Y%m%d%H%M%S")
)
log.info("Saving unloadable CA ssl key in %s", bck)
@ -832,18 +817,19 @@ def create_ca(
keycontent = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
write_key = True
if os.path.exists(ca_keyp):
bck = "{0}.{1}".format(ca_keyp, datetime.utcnow().strftime("%Y%m%d%H%M%S"))
bck = "{}.{}".format(ca_keyp, datetime.utcnow().strftime("%Y%m%d%H%M%S"))
with salt.utils.files.fopen(ca_keyp) as fic:
old_key = salt.utils.stringutils.to_unicode(fic.read()).strip()
if old_key.strip() == keycontent.strip():
write_key = False
else:
log.info("Saving old CA ssl key in %s", bck)
with salt.utils.files.fopen(bck, "w") as bckf:
fp = os.open(bck, os.O_CREAT | os.O_RDWR, 0o600)
with salt.utils.files.fopen(fp, "w") as bckf:
bckf.write(old_key)
os.chmod(bck, 0o600)
if write_key:
with salt.utils.files.fopen(ca_keyp, "wb") as ca_key:
fp = os.open(ca_keyp, os.O_CREAT | os.O_RDWR, 0o600)
with salt.utils.files.fopen(fp, "wb") as ca_key:
ca_key.write(salt.utils.stringutils.to_bytes(keycontent))
with salt.utils.files.fopen(certp, "wb") as ca_crt:
@ -855,7 +841,7 @@ def create_ca(
_write_cert_to_database(ca_name, ca)
ret = ('Created Private Key: "{0}/{1}/{2}.key." ').format(
ret = ('Created Private Key: "{}/{}/{}.key." ').format(
cert_base_path(), ca_name, ca_filename
)
ret += ('Created CA "{0}": "{1}/{0}/{2}.crt."').format(
@ -886,7 +872,7 @@ def get_extensions(cert_type):
assert X509_EXT_ENABLED, (
"X509 extensions are not supported in "
"pyOpenSSL prior to version 0.15.1. Your "
"version: {0}".format(OpenSSL_version)
"version: {}".format(OpenSSL_version)
)
ext = {}
@ -943,7 +929,7 @@ def get_extensions(cert_type):
if cert_type not in ext:
try:
ext[cert_type] = __salt__["pillar.get"](
"tls.extensions:{0}".format(cert_type)
"tls.extensions:{}".format(cert_type)
)
except NameError as e:
log.debug(
@ -1067,7 +1053,7 @@ def create_csr(
CN='test.egavas.org'
the resulting CSR, and corresponding key, would be written in the
following location::
following location with appropriate permissions::
/etc/pki/koji/certs/test.egavas.org.csr
/etc/pki/koji/certs/test.egavas.org.key
@ -1081,28 +1067,28 @@ def create_csr(
set_ca_path(cacert_path)
if not ca_filename:
ca_filename = "{0}_ca_cert".format(ca_name)
ca_filename = "{}_ca_cert".format(ca_name)
if not ca_exists(ca_name, ca_filename=ca_filename):
return (
'Certificate for CA named "{0}" does not exist, please create ' "it first."
'Certificate for CA named "{}" does not exist, please create ' "it first."
).format(ca_name)
if not csr_path:
csr_path = "{0}/{1}/certs/".format(cert_base_path(), ca_name)
csr_path = "{}/{}/certs/".format(cert_base_path(), ca_name)
if not os.path.exists(csr_path):
os.makedirs(csr_path)
CN_ext = "_{0}".format(cert_type) if type_ext else ""
CN_ext = "_{}".format(cert_type) if type_ext else ""
if not csr_filename:
csr_filename = "{0}{1}".format(CN, CN_ext)
csr_filename = "{}{}".format(CN, CN_ext)
csr_f = "{0}/{1}.csr".format(csr_path, csr_filename)
csr_f = "{}/{}.csr".format(csr_path, csr_filename)
if not replace and os.path.exists(csr_f):
return 'Certificate Request "{0}" already exists'.format(csr_f)
return 'Certificate Request "{}" already exists'.format(csr_f)
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, bits)
@ -1125,7 +1111,7 @@ def create_csr(
extension_adds = []
for ext, value in extensions.items():
if isinstance(value, six.string_types):
if isinstance(value, str):
value = salt.utils.stringutils.to_bytes(value)
extension_adds.append(
OpenSSL.crypto.X509Extension(
@ -1138,7 +1124,7 @@ def create_csr(
if subjectAltName:
if X509_EXT_ENABLED:
if isinstance(subjectAltName, six.string_types):
if isinstance(subjectAltName, str):
subjectAltName = [subjectAltName]
extension_adds.append(
@ -1153,7 +1139,7 @@ def create_csr(
"subjectAltName cannot be set as X509 "
"extensions are not supported in pyOpenSSL "
"prior to version 0.15.1. Your "
"version: {0}.".format(OpenSSL_version)
"version: {}.".format(OpenSSL_version)
)
if X509_EXT_ENABLED:
@ -1163,9 +1149,9 @@ def create_csr(
req.sign(key, salt.utils.stringutils.to_str(digest))
# Write private key and request
with salt.utils.files.fopen(
"{0}/{1}.key".format(csr_path, csr_filename), "wb+"
) as priv_key:
priv_keyp = "{}/{}.key".format(csr_path, csr_filename)
fp = os.open(priv_keyp, os.O_CREAT | os.O_RDWR, 0o600)
with salt.utils.files.fopen(fp, "wb+") as priv_key:
priv_key.write(
salt.utils.stringutils.to_bytes(
OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
@ -1181,8 +1167,8 @@ def create_csr(
)
)
ret = 'Created Private Key: "{0}{1}.key." '.format(csr_path, csr_filename)
ret += 'Created CSR for "{0}": "{1}{2}.csr."'.format(CN, csr_path, csr_filename)
ret = 'Created Private Key: "{}{}.key." '.format(csr_path, csr_filename)
ret += 'Created CSR for "{}": "{}{}.csr."'.format(CN, csr_path, csr_filename)
return ret
@ -1246,7 +1232,7 @@ def create_self_signed_cert(
CN='test.egavas.org'
the resulting CERT, and corresponding key, would be written in the
following location::
following location with appropriate permissions::
/etc/pki/koji/certs/test.egavas.org.crt
/etc/pki/koji/certs/test.egavas.org.key
@ -1265,16 +1251,16 @@ def create_self_signed_cert(
"""
set_ca_path(cacert_path)
if not os.path.exists("{0}/{1}/certs/".format(cert_base_path(), tls_dir)):
os.makedirs("{0}/{1}/certs/".format(cert_base_path(), tls_dir))
if not os.path.exists("{}/{}/certs/".format(cert_base_path(), tls_dir)):
os.makedirs("{}/{}/certs/".format(cert_base_path(), tls_dir))
if not cert_filename:
cert_filename = CN
if not replace and os.path.exists(
"{0}/{1}/certs/{2}.crt".format(cert_base_path(), tls_dir, cert_filename)
"{}/{}/certs/{}.crt".format(cert_base_path(), tls_dir, cert_filename)
):
return 'Certificate "{0}" already exists'.format(cert_filename)
return 'Certificate "{}" already exists'.format(cert_filename)
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, bits)
@ -1302,17 +1288,18 @@ def create_self_signed_cert(
cert.sign(key, salt.utils.stringutils.to_str(digest))
# Write private key and cert
priv_key_path = "{0}/{1}/certs/{2}.key".format(
priv_key_path = "{}/{}/certs/{}.key".format(
cert_base_path(), tls_dir, cert_filename
)
with salt.utils.files.fopen(priv_key_path, "wb+") as priv_key:
fp = os.open(priv_key_path, os.O_CREAT | os.O_RDWR, 0o600)
with salt.utils.files.fopen(fp, "wb+") as priv_key:
priv_key.write(
salt.utils.stringutils.to_bytes(
OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
)
)
crt_path = "{0}/{1}/certs/{2}.crt".format(cert_base_path(), tls_dir, cert_filename)
crt_path = "{}/{}/certs/{}.crt".format(cert_base_path(), tls_dir, cert_filename)
with salt.utils.files.fopen(crt_path, "wb+") as crt:
crt.write(
salt.utils.stringutils.to_bytes(
@ -1322,10 +1309,10 @@ def create_self_signed_cert(
_write_cert_to_database(tls_dir, cert)
ret = 'Created Private Key: "{0}/{1}/certs/{2}.key." '.format(
ret = 'Created Private Key: "{}/{}/certs/{}.key." '.format(
cert_base_path(), tls_dir, cert_filename
)
ret += 'Created Certificate: "{0}/{1}/certs/{2}.crt."'.format(
ret += 'Created Certificate: "{}/{}/certs/{}.crt."'.format(
cert_base_path(), tls_dir, cert_filename
)
@ -1440,10 +1427,10 @@ def create_ca_signed_cert(
set_ca_path(cacert_path)
if not ca_filename:
ca_filename = "{0}_ca_cert".format(ca_name)
ca_filename = "{}_ca_cert".format(ca_name)
if not cert_path:
cert_path = "{0}/{1}/certs".format(cert_base_path(), ca_name)
cert_path = "{}/{}/certs".format(cert_base_path(), ca_name)
if type_ext:
if not cert_type:
@ -1452,54 +1439,54 @@ def create_ca_signed_cert(
)
return ret
elif cert_type:
CN_ext = "_{0}".format(cert_type)
CN_ext = "_{}".format(cert_type)
else:
CN_ext = ""
csr_filename = "{0}{1}".format(CN, CN_ext)
csr_filename = "{}{}".format(CN, CN_ext)
if not cert_filename:
cert_filename = "{0}{1}".format(CN, CN_ext)
cert_filename = "{}{}".format(CN, CN_ext)
if not replace and os.path.exists(
os.path.join(
os.path.sep.join(
"{0}/{1}/certs/{2}.crt".format(
"{}/{}/certs/{}.crt".format(
cert_base_path(), ca_name, cert_filename
).split("/")
)
)
):
return 'Certificate "{0}" already exists'.format(cert_filename)
return 'Certificate "{}" already exists'.format(cert_filename)
try:
maybe_fix_ssl_version(ca_name, cacert_path=cacert_path, ca_filename=ca_filename)
with salt.utils.files.fopen(
"{0}/{1}/{2}.crt".format(cert_base_path(), ca_name, ca_filename)
"{}/{}/{}.crt".format(cert_base_path(), ca_name, ca_filename)
) as fhr:
ca_cert = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, fhr.read()
)
with salt.utils.files.fopen(
"{0}/{1}/{2}.key".format(cert_base_path(), ca_name, ca_filename)
"{}/{}/{}.key".format(cert_base_path(), ca_name, ca_filename)
) as fhr:
ca_key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, fhr.read()
)
except IOError:
except OSError:
ret["retcode"] = 1
ret["comment"] = 'There is no CA named "{0}"'.format(ca_name)
ret["comment"] = 'There is no CA named "{}"'.format(ca_name)
return ret
try:
csr_path = "{0}/{1}.csr".format(cert_path, csr_filename)
csr_path = "{}/{}.csr".format(cert_path, csr_filename)
with salt.utils.files.fopen(csr_path) as fhr:
req = OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_PEM, fhr.read()
)
except IOError:
except OSError:
ret["retcode"] = 1
ret["comment"] = 'There is no CSR that matches the CN "{0}"'.format(
ret["comment"] = 'There is no CSR that matches the CN "{}"'.format(
cert_filename
)
return ret
@ -1521,7 +1508,7 @@ def create_ca_signed_cert(
)
native_exts_obj = OpenSSL._util.lib.X509_REQ_get_extensions(req._req)
for i in _range(OpenSSL._util.lib.sk_X509_EXTENSION_num(native_exts_obj)):
for i in range(OpenSSL._util.lib.sk_X509_EXTENSION_num(native_exts_obj)):
ext = OpenSSL.crypto.X509Extension.__new__(OpenSSL.crypto.X509Extension)
ext._extension = OpenSSL._util.lib.sk_X509_EXTENSION_value(
native_exts_obj, i
@ -1548,7 +1535,7 @@ def create_ca_signed_cert(
cert.sign(ca_key, salt.utils.stringutils.to_str(digest))
cert_full_path = "{0}/{1}.crt".format(cert_path, cert_filename)
cert_full_path = "{}/{}.crt".format(cert_path, cert_filename)
with salt.utils.files.fopen(cert_full_path, "wb+") as crt:
crt.write(
@ -1559,7 +1546,7 @@ def create_ca_signed_cert(
_write_cert_to_database(ca_name, cert)
return 'Created Certificate for "{0}": "{1}/{2}.crt"'.format(
return 'Created Certificate for "{}": "{}/{}.crt"'.format(
CN, cert_path, cert_filename
)
@ -1600,9 +1587,9 @@ def create_pkcs12(ca_name, CN, passphrase="", cacert_path=None, replace=False):
"""
set_ca_path(cacert_path)
if not replace and os.path.exists(
"{0}/{1}/certs/{2}.p12".format(cert_base_path(), ca_name, CN)
"{}/{}/certs/{}.p12".format(cert_base_path(), ca_name, CN)
):
return 'Certificate "{0}" already exists'.format(CN)
return 'Certificate "{}" already exists'.format(CN)
try:
with salt.utils.files.fopen(
@ -1611,24 +1598,24 @@ def create_pkcs12(ca_name, CN, passphrase="", cacert_path=None, replace=False):
ca_cert = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, fhr.read()
)
except IOError:
return 'There is no CA named "{0}"'.format(ca_name)
except OSError:
return 'There is no CA named "{}"'.format(ca_name)
try:
with salt.utils.files.fopen(
"{0}/{1}/certs/{2}.crt".format(cert_base_path(), ca_name, CN)
"{}/{}/certs/{}.crt".format(cert_base_path(), ca_name, CN)
) as fhr:
cert = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, fhr.read()
)
with salt.utils.files.fopen(
"{0}/{1}/certs/{2}.key".format(cert_base_path(), ca_name, CN)
"{}/{}/certs/{}.key".format(cert_base_path(), ca_name, CN)
) as fhr:
key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, fhr.read()
)
except IOError:
return 'There is no certificate that matches the CN "{0}"'.format(CN)
except OSError:
return 'There is no certificate that matches the CN "{}"'.format(CN)
pkcs12 = OpenSSL.crypto.PKCS12()
@ -1637,7 +1624,7 @@ def create_pkcs12(ca_name, CN, passphrase="", cacert_path=None, replace=False):
pkcs12.set_privatekey(key)
with salt.utils.files.fopen(
"{0}/{1}/certs/{2}.p12".format(cert_base_path(), ca_name, CN), "wb"
"{}/{}/certs/{}.p12".format(cert_base_path(), ca_name, CN), "wb"
) as ofile:
ofile.write(
pkcs12.export(passphrase=salt.utils.stringutils.to_bytes(passphrase))
@ -1712,7 +1699,7 @@ def cert_info(cert, digest="sha256"):
# add additional info if your version of pyOpenSSL supports it
if hasattr(cert, "get_extension_count"):
ret["extensions"] = {}
for i in _range(cert.get_extension_count()):
for i in range(cert.get_extension_count()):
try:
ext = cert.get_extension(i)
key = salt.utils.stringutils.to_unicode(ext.get_short_name())
@ -1787,29 +1774,29 @@ def create_empty_crl(
set_ca_path(cacert_path)
if not ca_filename:
ca_filename = "{0}_ca_cert".format(ca_name)
ca_filename = "{}_ca_cert".format(ca_name)
if not crl_file:
crl_file = "{0}/{1}/crl.pem".format(_cert_base_path(), ca_name)
crl_file = "{}/{}/crl.pem".format(_cert_base_path(), ca_name)
if os.path.exists("{0}".format(crl_file)):
return 'CRL "{0}" already exists'.format(crl_file)
if os.path.exists("{}".format(crl_file)):
return 'CRL "{}" already exists'.format(crl_file)
try:
with salt.utils.files.fopen(
"{0}/{1}/{2}.crt".format(cert_base_path(), ca_name, ca_filename)
"{}/{}/{}.crt".format(cert_base_path(), ca_name, ca_filename)
) as fp_:
ca_cert = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, fp_.read()
)
with salt.utils.files.fopen(
"{0}/{1}/{2}.key".format(cert_base_path(), ca_name, ca_filename)
"{}/{}/{}.key".format(cert_base_path(), ca_name, ca_filename)
) as fp_:
ca_key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, fp_.read()
)
except IOError:
return 'There is no CA named "{0}"'.format(ca_name)
except OSError:
return 'There is no CA named "{}"'.format(ca_name)
crl = OpenSSL.crypto.CRL()
crl_text = crl.export(
@ -1819,7 +1806,7 @@ def create_empty_crl(
with salt.utils.files.fopen(crl_file, "w") as f:
f.write(salt.utils.stringutils.to_str(crl_text))
return 'Created an empty CRL: "{0}"'.format(crl_file)
return 'Created an empty CRL: "{}"'.format(crl_file)
def revoke_cert(
@ -1875,47 +1862,47 @@ def revoke_cert(
"""
set_ca_path(cacert_path)
ca_dir = "{0}/{1}".format(cert_base_path(), ca_name)
ca_dir = "{}/{}".format(cert_base_path(), ca_name)
if ca_filename is None:
ca_filename = "{0}_ca_cert".format(ca_name)
ca_filename = "{}_ca_cert".format(ca_name)
if cert_path is None:
cert_path = "{0}/{1}/certs".format(_cert_base_path(), ca_name)
cert_path = "{}/{}/certs".format(_cert_base_path(), ca_name)
if cert_filename is None:
cert_filename = "{0}".format(CN)
cert_filename = "{}".format(CN)
try:
with salt.utils.files.fopen(
"{0}/{1}/{2}.crt".format(cert_base_path(), ca_name, ca_filename)
"{}/{}/{}.crt".format(cert_base_path(), ca_name, ca_filename)
) as fp_:
ca_cert = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, fp_.read()
)
with salt.utils.files.fopen(
"{0}/{1}/{2}.key".format(cert_base_path(), ca_name, ca_filename)
"{}/{}/{}.key".format(cert_base_path(), ca_name, ca_filename)
) as fp_:
ca_key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, fp_.read()
)
except IOError:
return 'There is no CA named "{0}"'.format(ca_name)
except OSError:
return 'There is no CA named "{}"'.format(ca_name)
client_cert = _read_cert("{0}/{1}.crt".format(cert_path, cert_filename))
client_cert = _read_cert("{}/{}.crt".format(cert_path, cert_filename))
if client_cert is None:
return 'There is no client certificate named "{0}"'.format(CN)
return 'There is no client certificate named "{}"'.format(CN)
index_file, expire_date, serial_number, subject = _get_basic_info(
ca_name, client_cert, ca_dir
)
index_serial_subject = "{0}\tunknown\t{1}".format(serial_number, subject)
index_v_data = "V\t{0}\t\t{1}".format(expire_date, index_serial_subject)
index_serial_subject = "{}\tunknown\t{}".format(serial_number, subject)
index_v_data = "V\t{}\t\t{}".format(expire_date, index_serial_subject)
index_r_data_pattern = re.compile(
r"R\t" + expire_date + r"\t\d{12}Z\t" + re.escape(index_serial_subject)
)
index_r_data = "R\t{0}\t{1}\t{2}".format(
index_r_data = "R\t{}\t{}\t{}".format(
expire_date,
_four_digit_year_to_two_digit(datetime.utcnow()),
index_serial_subject,
@ -1930,13 +1917,13 @@ def revoke_cert(
try:
datetime.strptime(revoke_date, two_digit_year_fmt)
return (
'"{0}/{1}.crt" was already revoked, ' "serial number: {2}"
'"{}/{}.crt" was already revoked, ' "serial number: {}"
).format(cert_path, cert_filename, serial_number)
except ValueError:
ret["retcode"] = 1
ret["comment"] = (
"Revocation date '{0}' does not match"
"format '{1}'".format(revoke_date, two_digit_year_fmt)
"Revocation date '{}' does not match"
"format '{}'".format(revoke_date, two_digit_year_fmt)
)
return ret
elif index_serial_subject in line:
@ -1967,17 +1954,17 @@ def revoke_cert(
)
if crl_file is None:
crl_file = "{0}/{1}/crl.pem".format(_cert_base_path(), ca_name)
crl_file = "{}/{}/crl.pem".format(_cert_base_path(), ca_name)
if os.path.isdir(crl_file):
ret["retcode"] = 1
ret["comment"] = 'crl_file "{0}" is an existing directory'.format(crl_file)
ret["comment"] = 'crl_file "{}" is an existing directory'.format(crl_file)
return ret
with salt.utils.files.fopen(crl_file, "w") as fp_:
fp_.write(salt.utils.stringutils.to_str(crl_text))
return ('Revoked Certificate: "{0}/{1}.crt", ' "serial number: {2}").format(
return ('Revoked Certificate: "{}/{}.crt", ' "serial number: {}").format(
cert_path, cert_filename, serial_number
)

View file

@ -1,30 +1,32 @@
# encoding: utf-8
"""
Make api awesomeness
"""
from __future__ import absolute_import, print_function, unicode_literals
# Import Python libs
import copy
import inspect
import logging
import os
import salt.auth
import salt.client
import salt.client.ssh.client
import salt.config
import salt.daemons.masterapi
import salt.exceptions
# Import Salt libs
import salt.log # pylint: disable=W0611
import salt.runner
import salt.syspaths
import salt.utils.args
import salt.utils.minions
import salt.wheel
# Import third party libs
from salt.defaults import DEFAULT_TARGET_DELIM
from salt.ext import six
log = logging.getLogger(__name__)
class NetapiClient(object):
class NetapiClient:
"""
Provide a uniform method of accessing the various client interfaces in Salt
in the form of low-data data structures. For example:
@ -36,6 +38,15 @@ class NetapiClient(object):
def __init__(self, opts):
self.opts = opts
apiopts = copy.deepcopy(self.opts)
apiopts["enable_ssh_minions"] = True
apiopts["cachedir"] = os.path.join(opts["cachedir"], "saltapi")
if not os.path.exists(apiopts["cachedir"]):
os.makedirs(apiopts["cachedir"])
self.resolver = salt.auth.Resolver(apiopts)
self.loadauth = salt.auth.LoadAuth(apiopts)
self.key = salt.daemons.masterapi.access_keys(apiopts)
self.ckminions = salt.utils.minions.CkMinions(apiopts)
def _is_master_running(self):
"""
@ -55,6 +66,49 @@ class NetapiClient(object):
ipc_file = "workers.ipc"
return os.path.exists(os.path.join(self.opts["sock_dir"], ipc_file))
def _prep_auth_info(self, clear_load):
sensitive_load_keys = []
key = None
if "token" in clear_load:
auth_type = "token"
err_name = "TokenAuthenticationError"
sensitive_load_keys = ["token"]
return auth_type, err_name, key, sensitive_load_keys
elif "eauth" in clear_load:
auth_type = "eauth"
err_name = "EauthAuthenticationError"
sensitive_load_keys = ["username", "password"]
return auth_type, err_name, key, sensitive_load_keys
raise salt.exceptions.EauthAuthenticationError(
"No authentication credentials given"
)
def _authorize_ssh(self, low):
auth_type, err_name, key, sensitive_load_keys = self._prep_auth_info(low)
auth_check = self.loadauth.check_authentication(low, auth_type, key=key)
auth_list = auth_check.get("auth_list", [])
error = auth_check.get("error")
if error:
raise salt.exceptions.EauthAuthenticationError(error)
delimiter = low.get("kwargs", {}).get("delimiter", DEFAULT_TARGET_DELIM)
_res = self.ckminions.check_minions(
low["tgt"], low.get("tgt_type", "glob"), delimiter
)
minions = _res.get("minions", list())
missing = _res.get("missing", list())
authorized = self.ckminions.auth_check(
auth_list,
low["fun"],
low.get("arg", []),
low["tgt"],
low.get("tgt_type", "glob"),
minions=minions,
)
if not authorized:
raise salt.exceptions.EauthAuthenticationError(
"Authorization error occurred."
)
def run(self, low):
"""
Execute the specified function in the specified client by passing the
@ -68,7 +122,7 @@ class NetapiClient(object):
if low.get("client") not in CLIENTS:
raise salt.exceptions.SaltInvocationError(
"Invalid client specified: '{0}'".format(low.get("client"))
"Invalid client specified: '{}'".format(low.get("client"))
)
if not ("token" in low or "eauth" in low):
@ -81,6 +135,9 @@ class NetapiClient(object):
"Raw shell option not allowed."
)
if low["client"] == "ssh":
self._authorize_ssh(low)
l_fun = getattr(self, low["client"])
f_call = salt.utils.args.format_call(l_fun, low)
return l_fun(*f_call.get("args", ()), **f_call.get("kwargs", {}))

View file

@ -2011,7 +2011,7 @@ class Token(LowDataAdapter):
class Run(LowDataAdapter):
"""
Run commands bypassing the :ref:`normal session handling
<rest_cherrypy-auth>`
<rest_cherrypy-auth>`.
salt-api does not enforce authorization, Salt's eauth system does that.
Local/Runner/WheelClient all accept ``username``/``password``/``eauth``
@ -2032,7 +2032,7 @@ class Run(LowDataAdapter):
def POST(self, **kwargs):
"""
Run commands bypassing the :ref:`normal session handling
<rest_cherrypy-auth>` Other than that this URL is identical to the
<rest_cherrypy-auth>`. Otherwise, this URL is identical to the
:py:meth:`root URL (/) <LowDataAdapter.POST>`.
.. http:post:: /run
@ -2101,13 +2101,8 @@ class Run(LowDataAdapter):
ms-4: true
The /run endpoint can also be used to issue commands using the salt-ssh
subsystem.
When using salt-ssh, eauth credentials should not be supplied. Instead,
authentication should be handled by the SSH layer itself. The use of
the salt-ssh client does not require a salt master to be running.
Instead, only a roster file must be present in the salt configuration
directory.
subsystem. When using salt-ssh, eauth credentials must also be
supplied, and are subject to :ref:`eauth access-control lists <acl>`.
All SSH client requests are synchronous.
@ -2119,6 +2114,9 @@ class Run(LowDataAdapter):
-H 'Accept: application/x-yaml' \\
-d client='ssh' \\
-d tgt='*' \\
-d username='saltdev' \\
-d password='saltdev' \\
-d eauth='auto' \\
-d fun='test.ping'
.. code-block:: text
@ -2129,21 +2127,19 @@ class Run(LowDataAdapter):
Content-Length: 75
Content-Type: application/x-www-form-urlencoded
client=ssh&tgt=*&fun=test.ping
**Example SSH response:**
.. code-block:: text
return:
- silver:
fun: test.ping
fun_args: []
id: silver
jid: '20141203103525666185'
retcode: 0
return: true
success: true
_stamp: '2020-09-08T23:04:28.912609'
fun: test.ping
fun_args: []
id: silver
jid: '20200908230427905565'
retcode: 0
return: true
"""
return {
"return": list(self.exec_lowstate()),

View file

@ -1316,12 +1316,13 @@ class TestDaemon:
except OSError as exc:
if exc.errno != 3:
raise
with salt.utils.files.fopen(self.sshd_pidfile) as fhr:
try:
os.kill(int(fhr.read()), signal.SIGKILL)
except OSError as exc:
if exc.errno != 3:
raise
if os.path.exists(self.sshd_pidfile):
with salt.utils.files.fopen(self.sshd_pidfile) as fhr:
try:
os.kill(int(fhr.read()), signal.SIGKILL)
except OSError as exc:
if exc.errno != 3:
raise
def _exit_mockbin(self):
path = os.environ.get("PATH", "")

View file

View file

@ -1,3 +1,4 @@
import copy
import logging
import os
import time
@ -5,8 +6,11 @@ import time
import pytest
import salt.config
import salt.netapi
import salt.utils.files
import salt.utils.platform
import salt.utils.pycrypto
from salt.exceptions import EauthAuthenticationError
from tests.support.case import SSHCase
from tests.support.case import ModuleCase, SSHCase
from tests.support.helpers import (
SKIP_IF_NOT_RUNNING_PYTEST,
SaveRequestsPostHandler,
@ -174,13 +178,9 @@ class NetapiSSHClientTest(SSHCase):
"""
opts = AdaptedConfigurationTestCaseMixin.get_config("client_config").copy()
self.netapi = salt.netapi.NetapiClient(opts)
self.priv_file = os.path.join(RUNTIME_VARS.TMP_SSH_CONF_DIR, "client_key")
self.priv_file = os.path.join(RUNTIME_VARS.TMP_CONF_DIR, "key_test")
self.rosters = os.path.join(RUNTIME_VARS.TMP_CONF_DIR)
# Initialize salt-ssh
self.run_function("test.ping")
def tearDown(self):
del self.netapi
@ -268,3 +268,293 @@ class NetapiSSHClientTest(SSHCase):
self.assertEqual(ret, None)
self.assertFalse(os.path.exists("badfile.txt"))
@staticmethod
def cleanup_file(path):
try:
os.remove(path)
except OSError:
pass
@staticmethod
def cleanup_dir(path):
try:
salt.utils.files.rm_rf(path)
except OSError:
pass
@slowTest
def test_shell_inject_ssh_priv(self):
"""
Verify CVE-2020-16846 for ssh_priv variable
"""
# ZDI-CAN-11143
path = "/tmp/test-11143"
self.addCleanup(self.cleanup_file, path)
self.addCleanup(self.cleanup_file, "aaa")
self.addCleanup(self.cleanup_file, "aaa.pub")
self.addCleanup(self.cleanup_dir, "aaa|id>")
low = {
"roster": "cache",
"client": "ssh",
"tgt": "www.zerodayinitiative.com",
"ssh_priv": "aaa|id>{} #".format(path),
"fun": "test.ping",
"eauth": "auto",
"username": "saltdev_auto",
"password": "saltdev",
}
ret = self.netapi.run(low)
self.assertFalse(os.path.exists(path))
@slowTest
def test_shell_inject_tgt(self):
"""
Verify CVE-2020-16846 for tgt variable
"""
# ZDI-CAN-11167
path = "/tmp/test-11167"
self.addCleanup(self.cleanup_file, path)
low = {
"roster": "cache",
"client": "ssh",
"tgt": "root|id>{} #@127.0.0.1".format(path),
"roster_file": "/tmp/salt-tests-tmpdir/config/roster",
"rosters": "/",
"fun": "test.ping",
"eauth": "auto",
"username": "saltdev_auto",
"password": "saltdev",
}
ret = self.netapi.run(low)
self.assertFalse(os.path.exists(path))
@slowTest
def test_shell_inject_ssh_options(self):
"""
Verify CVE-2020-16846 for ssh_options
"""
# ZDI-CAN-11169
path = "/tmp/test-11169"
self.addCleanup(self.cleanup_file, path)
low = {
"roster": "cache",
"client": "ssh",
"tgt": "127.0.0.1",
"renderer": "cheetah",
"fun": "test.ping",
"eauth": "auto",
"username": "saltdev_auto",
"password": "saltdev",
"roster_file": "/tmp/salt-tests-tmpdir/config/roster",
"rosters": "/",
"ssh_options": ["|id>{} #".format(path), "lol"],
}
ret = self.netapi.run(low)
self.assertFalse(os.path.exists(path))
@slowTest
def test_shell_inject_ssh_port(self):
"""
Verify CVE-2020-16846 for ssh_port variable
"""
# ZDI-CAN-11172
path = "/tmp/test-11172"
self.addCleanup(self.cleanup_file, path)
low = {
"roster": "cache",
"client": "ssh",
"tgt": "127.0.0.1",
"renderer": "cheetah",
"fun": "test.ping",
"eauth": "auto",
"username": "saltdev_auto",
"password": "saltdev",
"roster_file": "/tmp/salt-tests-tmpdir/config/roster",
"rosters": "/",
"ssh_port": "hhhhh|id>{} #".format(path),
}
ret = self.netapi.run(low)
self.assertFalse(os.path.exists(path))
@slowTest
def test_shell_inject_remote_port_forwards(self):
"""
Verify CVE-2020-16846 for remote_port_forwards variable
"""
# ZDI-CAN-11173
path = "/tmp/test-1173"
self.addCleanup(self.cleanup_file, path)
low = {
"roster": "cache",
"client": "ssh",
"tgt": "127.0.0.1",
"renderer": "cheetah",
"fun": "test.ping",
"roster_file": "/tmp/salt-tests-tmpdir/config/roster",
"rosters": "/",
"ssh_remote_port_forwards": "hhhhh|id>{} #, lol".format(path),
"eauth": "auto",
"username": "saltdev_auto",
"password": "saltdev",
}
ret = self.netapi.run(low)
self.assertFalse(os.path.exists(path))
@pytest.mark.requires_sshd_server
class NetapiSSHClientAuthTest(SSHCase):
USERA = "saltdev"
USERA_PWD = "saltdev"
def setUp(self):
"""
Set up a NetapiClient instance
"""
opts = salt.config.client_config(
os.path.join(RUNTIME_VARS.TMP_CONF_DIR, "master")
)
naopts = copy.deepcopy(opts)
naopts["ignore_host_keys"] = True
self.netapi = salt.netapi.NetapiClient(naopts)
self.priv_file = os.path.join(RUNTIME_VARS.TMP_SSH_CONF_DIR, "client_key")
self.rosters = os.path.join(RUNTIME_VARS.TMP_CONF_DIR)
# Initialize salt-ssh
self.run_function("test.ping")
self.mod_case = ModuleCase()
try:
add_user = self.mod_case.run_function(
"user.add", [self.USERA], createhome=False
)
self.assertTrue(add_user)
if salt.utils.platform.is_darwin():
hashed_password = self.USERA_PWD
else:
hashed_password = salt.utils.pycrypto.gen_hash(password=self.USERA_PWD)
add_pwd = self.mod_case.run_function(
"shadow.set_password", [self.USERA, hashed_password],
)
self.assertTrue(add_pwd)
except AssertionError:
self.mod_case.run_function("user.delete", [self.USERA], remove=True)
self.skipTest("Could not add user or password, skipping test")
def tearDown(self):
del self.netapi
self.mod_case.run_function("user.delete", [self.USERA], remove=True)
@classmethod
def setUpClass(cls):
cls.post_webserver = Webserver(handler=SaveRequestsPostHandler)
cls.post_webserver.start()
cls.post_web_root = cls.post_webserver.web_root
cls.post_web_handler = cls.post_webserver.handler
@classmethod
def tearDownClass(cls):
cls.post_webserver.stop()
del cls.post_webserver
@slowTest
def test_ssh_auth_bypass(self):
"""
CVE-2020-25592 - Bogus eauth raises exception.
"""
low = {
"roster": "cache",
"client": "ssh",
"tgt": "127.0.0.1",
"renderer": "cheetah",
"fun": "test.ping",
"roster_file": "/tmp/salt-tests-tmpdir/config/roster",
"rosters": "/",
"eauth": "xx",
}
with self.assertRaises(salt.exceptions.EauthAuthenticationError):
ret = self.netapi.run(low)
@slowTest
def test_ssh_auth_valid(self):
"""
CVE-2020-25592 - Valid eauth works as expected.
"""
low = {
"client": "ssh",
"tgt": "localhost",
"fun": "test.ping",
"roster_file": "roster",
"rosters": [self.rosters],
"ssh_priv": self.priv_file,
"eauth": "pam",
"username": "saltdev",
"password": "saltdev",
}
ret = self.netapi.run(low)
assert "localhost" in ret
assert ret["localhost"]["return"] is True
@slowTest
def test_ssh_auth_invalid(self):
"""
CVE-2020-25592 - Wrong password raises exception.
"""
low = {
"client": "ssh",
"tgt": "localhost",
"fun": "test.ping",
"roster_file": "roster",
"rosters": [self.rosters],
"ssh_priv": self.priv_file,
"eauth": "pam",
"username": "saltdev",
"password": "notvalidpassword",
}
with self.assertRaises(salt.exceptions.EauthAuthenticationError):
ret = self.netapi.run(low)
@slowTest
def test_ssh_auth_invalid_acl(self):
"""
CVE-2020-25592 - Eauth ACL enforced.
"""
low = {
"client": "ssh",
"tgt": "localhost",
"fun": "at.at",
"args": ["12:05am", "echo foo"],
"roster_file": "roster",
"rosters": [self.rosters],
"ssh_priv": self.priv_file,
"eauth": "pam",
"username": "saltdev",
"password": "notvalidpassword",
}
with self.assertRaises(salt.exceptions.EauthAuthenticationError):
ret = self.netapi.run(low)
@slowTest
def test_ssh_auth_token(self):
"""
CVE-2020-25592 - Eauth tokens work as expected.
"""
low = {
"eauth": "pam",
"username": "saltdev",
"password": "saltdev",
}
ret = self.netapi.loadauth.mk_token(low)
assert "token" in ret and ret["token"]
low = {
"client": "ssh",
"tgt": "localhost",
"fun": "test.ping",
"roster_file": "roster",
"rosters": [self.rosters],
"ssh_priv": self.priv_file,
"token": ret["token"],
}
ret = self.netapi.run(low)
assert "localhost" in ret
assert ret["localhost"]["return"] is True

View file

@ -0,0 +1,115 @@
import os
import pytest
import salt.modules.tls as tls
from tests.support.mock import MagicMock, patch
@pytest.fixture(scope="module")
def tls_test_data():
return {
"create_ca": {
"bits": 2048,
"CN": "localhost",
"C": "US",
"ST": "Utah",
"L": "Salt Lake City",
"O": "SaltStack",
"OU": "Test Unit",
"emailAddress": "xyz@pdq.net",
"digest": "sha256",
"replace": False,
}
}
@pytest.fixture(autouse=True)
def setup_loader(request):
setup_loader_modules = {tls: {}}
with pytest.helpers.loader_mock(request, setup_loader_modules) as loader_mock:
yield loader_mock
def test_create_ca_permissions_on_cert_and_key(tmpdir, tls_test_data):
ca_name = "test_ca"
certp = tmpdir.join(ca_name).join("{}_ca_cert.crt".format(ca_name)).strpath
certk = tmpdir.join(ca_name).join("{}_ca_cert.key".format(ca_name)).strpath
mock_opt = MagicMock(return_value=tmpdir)
mock_ret = MagicMock(return_value=0)
print(tls_test_data)
with patch.dict(
tls.__salt__, {"config.option": mock_opt, "cmd.retcode": mock_ret}
), patch.dict(tls.__opts__, {"hash_type": "sha256", "cachedir": tmpdir}):
tls.create_ca(ca_name, days=365, fixmode=False, **tls_test_data["create_ca"])
certp_mode = os.stat(certp).st_mode & 0o7777
certk_mode = os.stat(certk).st_mode & 0o7777
assert 0o644 == certp_mode
assert 0o600 == certk_mode
def test_create_csr_permissions_on_csr_and_key(tmpdir, tls_test_data):
ca_name = "test_ca"
csrp = (
tmpdir.join(ca_name)
.join("certs")
.join("{}.csr".format(tls_test_data["create_ca"]["CN"]))
.strpath
)
keyp = (
tmpdir.join(ca_name)
.join("certs")
.join("{}.key".format(tls_test_data["create_ca"]["CN"]))
.strpath
)
mock_opt = MagicMock(return_value=tmpdir)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(
tls.__salt__,
{"config.option": mock_opt, "cmd.retcode": mock_ret, "pillar.get": mock_pgt},
), patch.dict(tls.__opts__, {"hash_type": "sha256", "cachedir": tmpdir}):
tls.create_ca(ca_name, days=365, **tls_test_data["create_ca"])
tls.create_csr(ca_name, **tls_test_data["create_ca"])
csrp_mode = os.stat(csrp).st_mode & 0o7777
keyp_mode = os.stat(keyp).st_mode & 0o7777
assert 0o644 == csrp_mode
assert 0o600 == keyp_mode
def test_create_self_signed_cert_permissions_on_csr_cert_and_key(tmpdir, tls_test_data):
ca_name = "test_ca"
certp = (
tmpdir.join(ca_name)
.join("certs")
.join("{}.crt".format(tls_test_data["create_ca"]["CN"]))
.strpath
)
keyp = (
tmpdir.join(ca_name)
.join("certs")
.join("{}.key".format(tls_test_data["create_ca"]["CN"]))
.strpath
)
mock_opt = MagicMock(return_value=tmpdir)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(
tls.__salt__,
{"config.option": mock_opt, "cmd.retcode": mock_ret, "pillar.get": mock_pgt},
), patch.dict(tls.__opts__, {"hash_type": "sha256", "cachedir": tmpdir}):
tls.create_self_signed_cert(ca_name, days=365, **tls_test_data["create_ca"])
certp_mode = os.stat(certp).st_mode & 0o7777
keyp_mode = os.stat(keyp).st_mode & 0o7777
assert 0o644 == certp_mode
assert 0o600 == keyp_mode