mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Address Bandit's B602(subprocess_popen_with_shell_equals_true) flagged issue
This commit is contained in:
parent
fbdd6478d1
commit
16751fa002
11 changed files with 132 additions and 90 deletions
|
@ -239,12 +239,14 @@ def get_executable():
|
||||||
"python",
|
"python",
|
||||||
)
|
)
|
||||||
for py_cmd in pycmds:
|
for py_cmd in pycmds:
|
||||||
cmd = (
|
|
||||||
py_cmd
|
|
||||||
+ " -c \"import sys; sys.stdout.write('%s:%s' % (sys.version_info[0], sys.version_info[1]))\""
|
|
||||||
)
|
|
||||||
stdout, _ = subprocess.Popen(
|
stdout, _ = subprocess.Popen(
|
||||||
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True
|
[
|
||||||
|
py_cmd,
|
||||||
|
"-c"
|
||||||
|
"import sys; sys.stdout.write('%s:%s' % (sys.version_info[0], sys.version_info[1]))",
|
||||||
|
],
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE,
|
||||||
).communicate()
|
).communicate()
|
||||||
if sys.version_info[0] == 2 and sys.version_info[1] < 7:
|
if sys.version_info[0] == 2 and sys.version_info[1] < 7:
|
||||||
stdout = stdout.decode(get_system_encoding(), "replace").strip()
|
stdout = stdout.decode(get_system_encoding(), "replace").strip()
|
||||||
|
|
|
@ -203,12 +203,12 @@ def list_(
|
||||||
else:
|
else:
|
||||||
if not salt.utils.path.which("tar"):
|
if not salt.utils.path.which("tar"):
|
||||||
raise CommandExecutionError("'tar' command not available")
|
raise CommandExecutionError("'tar' command not available")
|
||||||
if decompress_cmd is not None:
|
if decompress_cmd is not None and isinstance(decompress_cmd, str):
|
||||||
# Guard against shell injection
|
# Guard against shell injection
|
||||||
try:
|
try:
|
||||||
decompress_cmd = " ".join(
|
decompress_cmd = [
|
||||||
[shlex.quote(x) for x in shlex.split(decompress_cmd)]
|
shlex.quote(x) for x in shlex.split(decompress_cmd)
|
||||||
)
|
]
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
raise CommandExecutionError("Invalid CLI options")
|
raise CommandExecutionError("Invalid CLI options")
|
||||||
else:
|
else:
|
||||||
|
@ -221,12 +221,11 @@ def list_(
|
||||||
)
|
)
|
||||||
== 0
|
== 0
|
||||||
):
|
):
|
||||||
decompress_cmd = "xz --decompress --stdout"
|
decompress_cmd = ["xz", "--decompress", "--stdout"]
|
||||||
|
|
||||||
if decompress_cmd:
|
if decompress_cmd:
|
||||||
decompressed = subprocess.Popen(
|
decompressed = subprocess.Popen(
|
||||||
"{} {}".format(decompress_cmd, shlex.quote(cached)),
|
decompress_cmd + [shlex.quote(cached)],
|
||||||
shell=True,
|
|
||||||
stdout=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
stderr=subprocess.PIPE,
|
stderr=subprocess.PIPE,
|
||||||
)
|
)
|
||||||
|
|
|
@ -656,19 +656,15 @@ def _libvirt_creds():
|
||||||
"""
|
"""
|
||||||
Returns the user and group that the disk images should be owned by
|
Returns the user and group that the disk images should be owned by
|
||||||
"""
|
"""
|
||||||
g_cmd = "grep ^\\s*group /etc/libvirt/qemu.conf"
|
g_cmd = ["grep", "^\\s*group", "/etc/libvirt/qemu.conf"]
|
||||||
u_cmd = "grep ^\\s*user /etc/libvirt/qemu.conf"
|
u_cmd = ["grep", "^\\s*user", "/etc/libvirt/qemu.conf"]
|
||||||
try:
|
try:
|
||||||
stdout = subprocess.Popen(
|
stdout = subprocess.Popen(g_cmd, stdout=subprocess.PIPE).communicate()[0]
|
||||||
g_cmd, shell=True, stdout=subprocess.PIPE
|
|
||||||
).communicate()[0]
|
|
||||||
group = salt.utils.stringutils.to_str(stdout).split('"')[1]
|
group = salt.utils.stringutils.to_str(stdout).split('"')[1]
|
||||||
except IndexError:
|
except IndexError:
|
||||||
group = "root"
|
group = "root"
|
||||||
try:
|
try:
|
||||||
stdout = subprocess.Popen(
|
stdout = subprocess.Popen(u_cmd, stdout=subprocess.PIPE).communicate()[0]
|
||||||
u_cmd, shell=True, stdout=subprocess.PIPE
|
|
||||||
).communicate()[0]
|
|
||||||
user = salt.utils.stringutils.to_str(stdout).split('"')[1]
|
user = salt.utils.stringutils.to_str(stdout).split('"')[1]
|
||||||
except IndexError:
|
except IndexError:
|
||||||
user = "root"
|
user = "root"
|
||||||
|
@ -5728,7 +5724,7 @@ def seed_non_shared_migrate(disks, force=False):
|
||||||
# the target exists, check to see if it is compatible
|
# the target exists, check to see if it is compatible
|
||||||
pre = salt.utils.yaml.safe_load(
|
pre = salt.utils.yaml.safe_load(
|
||||||
subprocess.Popen(
|
subprocess.Popen(
|
||||||
"qemu-img info arch", shell=True, stdout=subprocess.PIPE
|
["qemu-img", "info", "arch"], stdout=subprocess.PIPE
|
||||||
).communicate()[0]
|
).communicate()[0]
|
||||||
)
|
)
|
||||||
if (
|
if (
|
||||||
|
@ -5740,11 +5736,9 @@ def seed_non_shared_migrate(disks, force=False):
|
||||||
os.makedirs(os.path.dirname(fn_))
|
os.makedirs(os.path.dirname(fn_))
|
||||||
if os.path.isfile(fn_):
|
if os.path.isfile(fn_):
|
||||||
os.remove(fn_)
|
os.remove(fn_)
|
||||||
cmd = "qemu-img create -f " + form + " " + fn_ + " " + size
|
subprocess.call(["qemu-img", "create", "-f", form, fn_, size])
|
||||||
subprocess.call(cmd, shell=True)
|
|
||||||
creds = _libvirt_creds()
|
creds = _libvirt_creds()
|
||||||
cmd = "chown " + creds["user"] + ":" + creds["group"] + " " + fn_
|
subprocess.call(["chown", "{user}:{group}".format(**creds), fn_])
|
||||||
subprocess.call(cmd, shell=True)
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
@ -5979,7 +5973,7 @@ def _is_bhyve_hyper():
|
||||||
vmm_enabled = False
|
vmm_enabled = False
|
||||||
try:
|
try:
|
||||||
stdout = subprocess.Popen(
|
stdout = subprocess.Popen(
|
||||||
sysctl_cmd, shell=True, stdout=subprocess.PIPE
|
["sysctl", "hw.vmm.create"], stdout=subprocess.PIPE
|
||||||
).communicate()[0]
|
).communicate()[0]
|
||||||
vmm_enabled = len(salt.utils.stringutils.to_str(stdout).split('"')[1]) != 0
|
vmm_enabled = len(salt.utils.stringutils.to_str(stdout).split('"')[1]) != 0
|
||||||
except IndexError:
|
except IndexError:
|
||||||
|
|
|
@ -71,13 +71,27 @@ def gen_hyper_keys(
|
||||||
with salt.utils.files.fopen(cainfo, "w+") as fp_:
|
with salt.utils.files.fopen(cainfo, "w+") as fp_:
|
||||||
fp_.write("cn = salted\nca\ncert_signing_key")
|
fp_.write("cn = salted\nca\ncert_signing_key")
|
||||||
if not os.path.isfile(cakey):
|
if not os.path.isfile(cakey):
|
||||||
subprocess.call("certtool --generate-privkey > {}".format(cakey), shell=True)
|
proc = subprocess.run(
|
||||||
|
["certtool", "--generate-privkey"],
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
universal_newlines=True,
|
||||||
|
check=True,
|
||||||
|
)
|
||||||
|
with salt.utils.files.fopen(cakey, "w") as wfh:
|
||||||
|
wfh.write(proc.stdout)
|
||||||
if not os.path.isfile(cacert):
|
if not os.path.isfile(cacert):
|
||||||
cmd = (
|
subprocess.call(
|
||||||
"certtool --generate-self-signed --load-privkey {} "
|
[
|
||||||
"--template {} --outfile {}"
|
"certtool",
|
||||||
).format(cakey, cainfo, cacert)
|
"--generate-self-signed",
|
||||||
subprocess.call(cmd, shell=True)
|
"--load-privkey",
|
||||||
|
cakey,
|
||||||
|
"--template",
|
||||||
|
cainfo,
|
||||||
|
"--outfile",
|
||||||
|
cacert,
|
||||||
|
]
|
||||||
|
)
|
||||||
sub_dir = os.path.join(key_dir, minion_id)
|
sub_dir = os.path.join(key_dir, minion_id)
|
||||||
if not os.path.isdir(sub_dir):
|
if not os.path.isdir(sub_dir):
|
||||||
os.makedirs(sub_dir)
|
os.makedirs(sub_dir)
|
||||||
|
@ -98,14 +112,31 @@ def gen_hyper_keys(
|
||||||
)
|
)
|
||||||
fp_.write(infodat)
|
fp_.write(infodat)
|
||||||
if not os.path.isfile(priv):
|
if not os.path.isfile(priv):
|
||||||
subprocess.call("certtool --generate-privkey > {}".format(priv), shell=True)
|
proc = subprocess.run(
|
||||||
|
["certtool", "--generate-privkey"],
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
universal_newlines=True,
|
||||||
|
check=True,
|
||||||
|
)
|
||||||
|
with salt.utils.files.fopen(priv, "w") as wfh:
|
||||||
|
wfh.write(proc.stdout)
|
||||||
if not os.path.isfile(cert):
|
if not os.path.isfile(cert):
|
||||||
cmd = (
|
subprocess.call(
|
||||||
"certtool --generate-certificate --load-privkey {} "
|
[
|
||||||
"--load-ca-certificate {} --load-ca-privkey {} "
|
"certtool",
|
||||||
"--template {} --outfile {}"
|
"--generate-certificate",
|
||||||
).format(priv, cacert, cakey, srvinfo, cert)
|
"--load-privkey",
|
||||||
subprocess.call(cmd, shell=True)
|
priv,
|
||||||
|
"--load-ca-certificate",
|
||||||
|
cacert,
|
||||||
|
"--load-ca-privkey",
|
||||||
|
cakey,
|
||||||
|
"--template",
|
||||||
|
srvinfo,
|
||||||
|
"--outfile",
|
||||||
|
cert,
|
||||||
|
]
|
||||||
|
)
|
||||||
if not os.path.isfile(clientinfo):
|
if not os.path.isfile(clientinfo):
|
||||||
with salt.utils.files.fopen(clientinfo, "w+") as fp_:
|
with salt.utils.files.fopen(clientinfo, "w+") as fp_:
|
||||||
infodat = salt.utils.stringutils.to_str(
|
infodat = salt.utils.stringutils.to_str(
|
||||||
|
@ -118,11 +149,28 @@ def gen_hyper_keys(
|
||||||
)
|
)
|
||||||
fp_.write(infodat)
|
fp_.write(infodat)
|
||||||
if not os.path.isfile(cpriv):
|
if not os.path.isfile(cpriv):
|
||||||
subprocess.call("certtool --generate-privkey > {}".format(cpriv), shell=True)
|
proc = subprocess.run(
|
||||||
|
["certtool", "--generate-privkey"],
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
universal_newlines=True,
|
||||||
|
check=True,
|
||||||
|
)
|
||||||
|
with salt.utils.files.fopen(cpriv, "w") as wfh:
|
||||||
|
wfh.write(proc.stdout)
|
||||||
if not os.path.isfile(ccert):
|
if not os.path.isfile(ccert):
|
||||||
cmd = (
|
subprocess.call(
|
||||||
"certtool --generate-certificate --load-privkey {} "
|
[
|
||||||
"--load-ca-certificate {} --load-ca-privkey {} "
|
"certtool",
|
||||||
"--template {} --outfile {}"
|
"--generate-certificate",
|
||||||
).format(cpriv, cacert, cakey, clientinfo, ccert)
|
"--load-privkey",
|
||||||
subprocess.call(cmd, shell=True)
|
cpriv,
|
||||||
|
"--load-ca-certificate",
|
||||||
|
cacert,
|
||||||
|
"--load-ca-privkey",
|
||||||
|
cakey,
|
||||||
|
"--template",
|
||||||
|
clientinfo,
|
||||||
|
"--outfile",
|
||||||
|
ccert,
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
|
@ -45,10 +45,17 @@ The above essentially is the same as a top.sls containing the following:
|
||||||
- database
|
- database
|
||||||
"""
|
"""
|
||||||
import logging
|
import logging
|
||||||
|
import shlex
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
|
import salt.utils.platform
|
||||||
import salt.utils.yaml
|
import salt.utils.yaml
|
||||||
|
|
||||||
|
if salt.utils.platform.is_windows():
|
||||||
|
from salt.utils.win_functions import escape_argument as _cmd_quote
|
||||||
|
else:
|
||||||
|
_cmd_quote = shlex.quote
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@ -67,10 +74,20 @@ def top(**kwargs):
|
||||||
"""
|
"""
|
||||||
if "id" not in kwargs["opts"]:
|
if "id" not in kwargs["opts"]:
|
||||||
return {}
|
return {}
|
||||||
cmd = "{} {}".format(__opts__["master_tops"]["ext_nodes"], kwargs["opts"]["id"])
|
proc = subprocess.run(
|
||||||
ndata = salt.utils.yaml.safe_load(
|
[
|
||||||
subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE).communicate()[0]
|
_cmd_quote(part)
|
||||||
|
for part in shlex.split(
|
||||||
|
__opts__["master_tops"]["ext_nodes"],
|
||||||
|
posix=salt.utils.platform.is_windows() is False,
|
||||||
|
)
|
||||||
|
+ [_cmd_quote(kwargs["opts"]["id"])]
|
||||||
|
],
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
check=True,
|
||||||
|
shell=True, # nosec
|
||||||
)
|
)
|
||||||
|
ndata = salt.utils.yaml.safe_load(proc.stdout)
|
||||||
if not ndata:
|
if not ndata:
|
||||||
log.info("master_tops ext_nodes call did not return any data")
|
log.info("master_tops ext_nodes call did not return any data")
|
||||||
ret = {}
|
ret = {}
|
||||||
|
|
|
@ -2535,8 +2535,7 @@ def remove_sshkey(host, known_hosts=None):
|
||||||
else:
|
else:
|
||||||
log.debug("Removing ssh key for %s from known hosts file", host)
|
log.debug("Removing ssh key for %s from known hosts file", host)
|
||||||
|
|
||||||
cmd = "ssh-keygen -R {}".format(host)
|
subprocess.call(["ssh-keygen", "-R", host])
|
||||||
subprocess.call(cmd, shell=True)
|
|
||||||
|
|
||||||
|
|
||||||
def wait_for_ip(
|
def wait_for_ip(
|
||||||
|
|
|
@ -896,15 +896,13 @@ def linux_interfaces():
|
||||||
ifconfig_path = None if ip_path else salt.utils.path.which("ifconfig")
|
ifconfig_path = None if ip_path else salt.utils.path.which("ifconfig")
|
||||||
if ip_path:
|
if ip_path:
|
||||||
cmd1 = subprocess.Popen(
|
cmd1 = subprocess.Popen(
|
||||||
"{} link show".format(ip_path),
|
[ip_path, "link", "show"],
|
||||||
shell=True,
|
|
||||||
close_fds=True,
|
close_fds=True,
|
||||||
stdout=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
stderr=subprocess.STDOUT,
|
stderr=subprocess.STDOUT,
|
||||||
).communicate()[0]
|
).communicate()[0]
|
||||||
cmd2 = subprocess.Popen(
|
cmd2 = subprocess.Popen(
|
||||||
"{} addr show".format(ip_path),
|
[ip_path, "addr", "show"],
|
||||||
shell=True,
|
|
||||||
close_fds=True,
|
close_fds=True,
|
||||||
stdout=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
stderr=subprocess.STDOUT,
|
stderr=subprocess.STDOUT,
|
||||||
|
@ -916,10 +914,7 @@ def linux_interfaces():
|
||||||
)
|
)
|
||||||
elif ifconfig_path:
|
elif ifconfig_path:
|
||||||
cmd = subprocess.Popen(
|
cmd = subprocess.Popen(
|
||||||
"{} -a".format(ifconfig_path),
|
[ifconfig_path, "-a"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
|
||||||
shell=True,
|
|
||||||
stdout=subprocess.PIPE,
|
|
||||||
stderr=subprocess.STDOUT,
|
|
||||||
).communicate()[0]
|
).communicate()[0]
|
||||||
ifaces = _interfaces_ifconfig(salt.utils.stringutils.to_str(cmd))
|
ifaces = _interfaces_ifconfig(salt.utils.stringutils.to_str(cmd))
|
||||||
return ifaces
|
return ifaces
|
||||||
|
@ -1062,10 +1057,7 @@ def junos_interfaces():
|
||||||
"""
|
"""
|
||||||
ifconfig_path = salt.utils.path.which("ifconfig")
|
ifconfig_path = salt.utils.path.which("ifconfig")
|
||||||
cmd = subprocess.Popen(
|
cmd = subprocess.Popen(
|
||||||
"{} -a".format(ifconfig_path),
|
[ifconfig_path, "-a"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
|
||||||
shell=True,
|
|
||||||
stdout=subprocess.PIPE,
|
|
||||||
stderr=subprocess.STDOUT,
|
|
||||||
).communicate()[0]
|
).communicate()[0]
|
||||||
return _junos_interfaces_ifconfig(salt.utils.stringutils.to_str(cmd))
|
return _junos_interfaces_ifconfig(salt.utils.stringutils.to_str(cmd))
|
||||||
|
|
||||||
|
@ -1082,10 +1074,7 @@ def netbsd_interfaces():
|
||||||
|
|
||||||
ifconfig_path = salt.utils.path.which("ifconfig")
|
ifconfig_path = salt.utils.path.which("ifconfig")
|
||||||
cmd = subprocess.Popen(
|
cmd = subprocess.Popen(
|
||||||
"{} -a".format(ifconfig_path),
|
[ifconfig_path, "-a"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
|
||||||
shell=True,
|
|
||||||
stdout=subprocess.PIPE,
|
|
||||||
stderr=subprocess.STDOUT,
|
|
||||||
).communicate()[0]
|
).communicate()[0]
|
||||||
return _netbsd_interfaces_ifconfig(salt.utils.stringutils.to_str(cmd))
|
return _netbsd_interfaces_ifconfig(salt.utils.stringutils.to_str(cmd))
|
||||||
|
|
||||||
|
@ -1227,8 +1216,10 @@ def _hw_addr_aix(iface):
|
||||||
MAC address not available in through interfaces
|
MAC address not available in through interfaces
|
||||||
"""
|
"""
|
||||||
cmd = subprocess.Popen(
|
cmd = subprocess.Popen(
|
||||||
"entstat -d {} | grep 'Hardware Address'".format(iface),
|
["grep", "Hardware Address"],
|
||||||
shell=True,
|
stdin=subprocess.Popen(
|
||||||
|
["entstat", "-d", iface], stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
|
||||||
|
).stdout,
|
||||||
stdout=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
stderr=subprocess.STDOUT,
|
stderr=subprocess.STDOUT,
|
||||||
).communicate()[0]
|
).communicate()[0]
|
||||||
|
|
|
@ -53,8 +53,7 @@ def get_osarch():
|
||||||
"""
|
"""
|
||||||
if salt.utils.path.which("rpm"):
|
if salt.utils.path.which("rpm"):
|
||||||
ret = subprocess.Popen(
|
ret = subprocess.Popen(
|
||||||
'rpm --eval "%{_host_cpu}"',
|
["rpm", "--eval", "%{_host_cpu}"],
|
||||||
shell=True,
|
|
||||||
close_fds=True,
|
close_fds=True,
|
||||||
stdout=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
stderr=subprocess.PIPE,
|
stderr=subprocess.PIPE,
|
||||||
|
|
|
@ -1116,9 +1116,7 @@ class WindowsUpdateAgent:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
log.debug(cmd)
|
log.debug(cmd)
|
||||||
p = subprocess.Popen(
|
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||||
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
|
|
||||||
)
|
|
||||||
return p.communicate()
|
return p.communicate()
|
||||||
|
|
||||||
except OSError as exc:
|
except OSError as exc:
|
||||||
|
|
14
setup.py
14
setup.py
|
@ -685,25 +685,25 @@ class TestCommand(Command):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
from subprocess import Popen
|
# This should either be removed or migrated to use nox
|
||||||
|
import subprocess
|
||||||
|
|
||||||
self.run_command("build")
|
self.run_command("build")
|
||||||
build_cmd = self.get_finalized_command("build_ext")
|
build_cmd = self.get_finalized_command("build_ext")
|
||||||
runner = os.path.abspath("tests/runtests.py")
|
runner = os.path.abspath("tests/runtests.py")
|
||||||
test_cmd = sys.executable + " {}".format(runner)
|
test_cmd = [sys.executable, runner]
|
||||||
if self.runtests_opts:
|
if self.runtests_opts:
|
||||||
test_cmd += " {}".format(self.runtests_opts)
|
test_cmd.extend(self.runtests_opts.split())
|
||||||
|
|
||||||
print("running test")
|
print("running test")
|
||||||
test_process = Popen(
|
ret = subprocess.run(
|
||||||
test_cmd,
|
test_cmd,
|
||||||
shell=True,
|
|
||||||
stdout=sys.stdout,
|
stdout=sys.stdout,
|
||||||
stderr=sys.stderr,
|
stderr=sys.stderr,
|
||||||
cwd=build_cmd.build_lib,
|
cwd=build_cmd.build_lib,
|
||||||
|
check=False,
|
||||||
)
|
)
|
||||||
test_process.communicate()
|
sys.exit(ret.returncode)
|
||||||
sys.exit(test_process.returncode)
|
|
||||||
|
|
||||||
|
|
||||||
class Clean(clean):
|
class Clean(clean):
|
||||||
|
|
|
@ -1,10 +1,6 @@
|
||||||
# -*- coding: utf-8 -*-
|
|
||||||
|
|
||||||
from __future__ import absolute_import, print_function, unicode_literals
|
|
||||||
|
|
||||||
import salt.utils.pkg
|
import salt.utils.pkg
|
||||||
from salt.utils.pkg import rpm
|
from salt.utils.pkg import rpm
|
||||||
from tests.support.mock import MagicMock, patch
|
from tests.support.mock import ANY, MagicMock, patch
|
||||||
from tests.support.unit import TestCase
|
from tests.support.unit import TestCase
|
||||||
|
|
||||||
|
|
||||||
|
@ -65,10 +61,9 @@ class PkgRPMTestCase(TestCase):
|
||||||
with patch("salt.utils.pkg.rpm.subprocess", subprocess_mock):
|
with patch("salt.utils.pkg.rpm.subprocess", subprocess_mock):
|
||||||
assert rpm.get_osarch() == "Z80"
|
assert rpm.get_osarch() == "Z80"
|
||||||
assert subprocess_mock.Popen.call_count == 2 # One within the mock
|
assert subprocess_mock.Popen.call_count == 2 # One within the mock
|
||||||
assert subprocess_mock.Popen.call_args[1]["close_fds"]
|
subprocess_mock.Popen.assert_called_with(
|
||||||
assert subprocess_mock.Popen.call_args[1]["shell"]
|
["rpm", "--eval", "%{_host_cpu}"], close_fds=True, stderr=ANY, stdout=ANY
|
||||||
assert len(subprocess_mock.Popen.call_args_list) == 2
|
)
|
||||||
assert subprocess_mock.Popen.call_args[0][0] == 'rpm --eval "%{_host_cpu}"'
|
|
||||||
|
|
||||||
@patch("salt.utils.path.which", MagicMock(return_value=False))
|
@patch("salt.utils.path.which", MagicMock(return_value=False))
|
||||||
@patch("salt.utils.pkg.rpm.subprocess", MagicMock(return_value=False))
|
@patch("salt.utils.pkg.rpm.subprocess", MagicMock(return_value=False))
|
||||||
|
|
Loading…
Add table
Reference in a new issue