Port #49697 and #49926 to master (#56893)

* Port #49697 to master

Fix set_static_all, always display requestmode and refactor
_change_state.

Display requestmode even if service is offline. This will keep parity
between older and newer nilrt distro.

Split the logic of _change_state function for older and newer disto.

Fix set_static_all. If there are no nameservers specified, we shouldn't
set any property for nameservers configuration to the service.

* Port #49926 to master.

Configure network interfaces without cable & add tests.

On NG we use connman as a network manager which consider a service, an
interface connected with a cable. The only way to set-up an interface
without a cable is through config files. This works also for services
because before we start the connamn daemon, we create a config file,
/var/lib/connman/interfaces.config with DHCP settings for each
interface.

* Require pyifaces, skip test if no pyifaces installed.

* Update unit test correspondingly to the module changes.

* Ran pre-commit hook

Co-authored-by: Alexandru Vasiu <alexandru.vasiu@ni.com>
This commit is contained in:
Dmitry Kuzmenko 2020-09-30 02:54:08 +03:00 committed by GitHub
parent 4e3082bb86
commit 3434a94203
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 321 additions and 164 deletions

View file

@ -27,6 +27,7 @@ paramiko>=2.1.6
psutil
pygit2<=0.28.2; python_version < '3.8'
pygit2>=1.2.0; python_version >= '3.8'
pyiface
pyinotify
pyopenssl
python-etcd>0.4.2

View file

@ -174,6 +174,7 @@ pycparser==2.19 # via cffi
pycryptodome==3.8.1 # via python-jose
pycryptodomex==3.9.7
pygit2==0.28.2 ; python_version < "3.8"
pyiface==0.0.11
pyinotify==0.9.6
pyjwt==1.7.1 # via adal
pynacl==1.3.0 # via paramiko

View file

@ -178,6 +178,7 @@ pycryptodome==3.8.1 # via python-jose
pycryptodomex==3.9.7
pyeapi==0.8.3 # via napalm
pygit2==0.28.2 ; python_version < "3.8"
pyiface==0.0.11
pyinotify==0.9.6
pyjwt==1.7.1 # via adal
pynacl==1.3.0 # via paramiko

View file

@ -177,6 +177,7 @@ pycryptodome==3.8.1 # via python-jose
pycryptodomex==3.9.7
pyeapi==0.8.3 # via napalm
pygit2==0.28.2 ; python_version < "3.8"
pyiface==0.0.11
pyinotify==0.9.6
pyjwt==1.7.1 # via adal
pynacl==1.3.0 # via paramiko

View file

@ -177,6 +177,7 @@ pycryptodome==3.8.1 # via python-jose
pycryptodomex==3.9.7
pyeapi==0.8.3 # via napalm
pygit2==1.2.0 ; python_version >= "3.8"
pyiface==0.0.11
pyinotify==0.9.6
pyjwt==1.7.1 # via adal
pynacl==1.3.0 # via paramiko

View file

@ -177,6 +177,7 @@ pycryptodome==3.8.1 # via python-jose
pycryptodomex==3.9.7
pyeapi==0.8.3 # via napalm
pygit2==1.2.0 ; python_version >= "3.8"
pyiface==0.0.11
pyinotify==0.9.6
pyjwt==1.7.1 # via adal
pynacl==1.3.0 # via paramiko

View file

@ -3,23 +3,15 @@ The networking module for NI Linux Real-Time distro
"""
# Import python libs
import logging
import os
import re
import time
# Import salt libs
import salt.exceptions
import salt.utils.files
import salt.utils.validate.net
# Import 3rd-party libs
# pylint: disable=import-error,redefined-builtin,no-name-in-module
from salt.ext.six.moves import configparser, map, range
# pylint: enable=import-error,redefined-builtin,no-name-in-module
from salt.ext.six.moves import configparser
try:
import pyconnman
@ -149,8 +141,7 @@ def _space_delimited_list(value):
if valid:
return True, "space-delimited string"
else:
return False, "{} is not a valid list.\n".format(value)
return False, "{} is not a valid list.\n".format(value)
def _validate_ipv4(value):
@ -227,9 +218,9 @@ def _get_service_info(service):
data["ipv4"]["dns"] = nameservers
else:
data["up"] = False
data["ipv4"] = {"requestmode": "disabled"}
if "ipv4" in data:
data["ipv4"]["supportedrequestmodes"] = ["static", "dhcp_linklocal"]
data["ipv4"]["supportedrequestmodes"] = ["dhcp_linklocal", "disabled", "static"]
return data
@ -368,22 +359,32 @@ def _get_static_info(interface):
"hwaddr": interface.hwaddr[:-1],
"up": False,
"ipv4": {
"supportedrequestmodes": ["static", "dhcp_linklocal"],
"requestmode": "static",
"supportedrequestmodes": ["dhcp_linklocal", "disabled", "static"],
"requestmode": "dhcp_linklocal",
},
"wireless": False,
}
hwaddr_section_number = "".join(data["hwaddr"].split(":"))
if os.path.exists(INTERFACES_CONFIG):
information = _load_config(
hwaddr_section_number, ["IPv4", "Nameservers"], filename=INTERFACES_CONFIG
"service_" + hwaddr_section_number,
["IPv4", "Nameservers", "IPv4.method"],
filename=INTERFACES_CONFIG,
)
if information["IPv4"] != "":
if information["IPv4.method"] == "manual" and information["IPv4"] != "":
ipv4_information = information["IPv4"].split("/")
data["ipv4"]["address"] = ipv4_information[0]
data["ipv4"]["dns"] = information["Nameservers"].split(",")
data["ipv4"]["dns"] = (
""
if information["Nameservers"] == "''"
else information["Nameservers"].split(",")
)
data["ipv4"]["netmask"] = ipv4_information[1]
data["ipv4"]["gateway"] = ipv4_information[2]
data["ipv4"]["requestmode"] = "static"
elif information["IPv4"] == "off":
data["ipv4"]["requestmode"] = "disabled"
return data
@ -535,6 +536,71 @@ def get_interfaces_details():
return {"interfaces": list(map(_get_info, _interfaces))}
def _change_state_legacy(interface, new_state):
"""
Enable or disable an interface on a legacy distro
Change adapter mode to TCP/IP. If previous adapter mode was EtherCAT, the target will need reboot.
:param interface: interface label
:param new_state: up or down
:return: True if the service was enabled, otherwise an exception will be thrown.
:rtype: bool
"""
initial_mode = _get_adapter_mode_info(interface)
_save_config(interface, "Mode", "TCPIP" if new_state == "up" else "Disabled")
if initial_mode == "ethercat":
__salt__["system.set_reboot_required_witnessed"]()
else:
out = __salt__["cmd.run_all"]("ip link set {} {}".format(interface, new_state))
if out["retcode"] != 0:
msg = "Couldn't {} interface {}. Error: {}".format(
"enable" if new_state == "up" else "disable", interface, out["stderr"]
)
raise salt.exceptions.CommandExecutionError(msg)
return True
def _change_dhcp_config(interface, enable_dhcp=True, filename=INTERFACES_CONFIG):
"""
Enable or disable dhcp for an interface which isn't a service (in a config file)
:param interface: interface label
:param enable_dhcp: True to enable dhcp and False to disable dhcp. Default is True
:param filename: Config file name. Default is INTERFACES_CONFIG.
"""
parser = configparser.ConfigParser()
parser.optionxform = str
if os.path.exists(filename):
try:
with salt.utils.files.fopen(filename, "r") as config_file:
parser.readfp(config_file)
except configparser.MissingSectionHeaderError:
pass
interface = pyiface.Interface(name=interface)
hwaddr = interface.hwaddr[:-1]
hwaddr_section_number = "".join(hwaddr.split(":"))
if parser.has_section("service_{}".format(hwaddr_section_number)):
parser.remove_section("service_{}".format(hwaddr_section_number))
parser.add_section("service_{}".format(hwaddr_section_number))
parser.set("service_{}".format(hwaddr_section_number), "MAC", hwaddr)
parser.set(
"service_{}".format(hwaddr_section_number),
"Name",
"ethernet_cable_{}".format(hwaddr_section_number),
)
parser.set("service_{}".format(hwaddr_section_number), "Type", "ethernet")
if enable_dhcp:
parser.set("service_{}".format(hwaddr_section_number), "IPv4.Method", "dhcp")
parser.set("service_{}".format(hwaddr_section_number), "AutoConnect", "true")
parser.set("service_{}".format(hwaddr_section_number), "Nameservers", "''")
else:
parser.set("service_{}".format(hwaddr_section_number), "IPv4", "off")
with salt.utils.files.fopen(filename, "w") as config_file:
parser.write(config_file)
return True
def _change_state(interface, new_state):
"""
Enable or disable an interface
@ -547,40 +613,16 @@ def _change_state(interface, new_state):
:rtype: bool
"""
if __grains__["lsb_distrib_id"] == "nilrt":
initial_mode = _get_adapter_mode_info(interface)
_save_config(interface, "Mode", "TCPIP" if new_state == "up" else "Disabled")
if initial_mode == "ethercat":
__salt__["system.set_reboot_required_witnessed"]()
else:
out = __salt__["cmd.run_all"](
"ip link set {} {}".format(interface, new_state)
)
if out["retcode"] != 0:
msg = "Couldn't {} interface {}. Error: {}".format(
"enable" if new_state == "up" else "disable",
interface,
out["stderr"],
)
raise salt.exceptions.CommandExecutionError(msg)
return True
service = _interface_to_service(interface)
if not service:
raise salt.exceptions.CommandExecutionError(
"Invalid interface name: {}".format(interface)
return _change_state_legacy(interface, new_state)
if interface in [x.name for x in pyiface.getIfaces()]:
return (
_change_dhcp_config(interface)
if new_state == "up"
else _change_dhcp_config(interface, False)
)
connected = _connected(service)
if (not connected and new_state == "up") or (connected and new_state == "down"):
service = pyconnman.ConnService(os.path.join(SERVICE_PATH, service))
try:
state = service.connect() if new_state == "up" else service.disconnect()
return state is None
except Exception: # pylint: disable=broad-except
raise salt.exceptions.CommandExecutionError(
"Couldn't {} service: {}\n".format(
"enable" if new_state == "up" else "disable", service
)
)
return True
raise salt.exceptions.CommandExecutionError(
"Invalid interface name: {}".format(interface)
)
def up(interface, iface_type=None): # pylint: disable=invalid-name,unused-argument
@ -729,28 +771,11 @@ def set_dhcp_linklocal_all(interface):
else:
_restart(interface)
return True
service = _interface_to_service(interface)
if not service:
raise salt.exceptions.CommandExecutionError(
"Invalid interface name: {}".format(interface)
)
service = pyconnman.ConnService(os.path.join(SERVICE_PATH, service))
ipv4 = service.get_property("IPv4.Configuration")
ipv4["Method"] = dbus.String("dhcp", variant_level=1)
ipv4["Address"] = dbus.String("", variant_level=1)
ipv4["Netmask"] = dbus.String("", variant_level=1)
ipv4["Gateway"] = dbus.String("", variant_level=1)
try:
service.set_property("IPv4.Configuration", ipv4)
service.set_property(
"Nameservers.Configuration", [""]
) # reset nameservers list
except Exception as exc: # pylint: disable=broad-except
exc_msg = "Couldn't set dhcp linklocal for service: {}\nError: {}\n".format(
service, exc
)
raise salt.exceptions.CommandExecutionError(exc_msg)
return True
if interface in [x.name for x in pyiface.getIfaces()]:
return _change_dhcp_config(interface)
raise salt.exceptions.CommandExecutionError(
"Invalid interface name: {}".format(interface)
)
def set_dhcp_only_all(interface):
@ -827,6 +852,7 @@ def _configure_static_interface(interface, **settings):
"""
interface = pyiface.Interface(name=interface)
parser = configparser.ConfigParser()
parser.optionxform = str
if os.path.exists(INTERFACES_CONFIG):
try:
with salt.utils.files.fopen(INTERFACES_CONFIG, "r") as config_file:
@ -835,28 +861,30 @@ def _configure_static_interface(interface, **settings):
pass
hwaddr = interface.hwaddr[:-1]
hwaddr_section_number = "".join(hwaddr.split(":"))
if not parser.has_section("interface_{}".format(hwaddr_section_number)):
parser.add_section("interface_{}".format(hwaddr_section_number))
if parser.has_section("service_{}".format(hwaddr_section_number)):
parser.remove_section("service_{}".format(hwaddr_section_number))
parser.add_section("service_{}".format(hwaddr_section_number))
ip_address = settings.get("ip", "0.0.0.0")
netmask = settings.get("netmask", "0.0.0.0")
gateway = settings.get("gateway", "0.0.0.0")
dns_servers = settings.get("dns", "")
dns_servers = settings.get("dns", "''")
name = settings.get("name", "ethernet_cable_{}".format(hwaddr_section_number))
parser.set(
"interface_{}".format(hwaddr_section_number),
"service_{}".format(hwaddr_section_number),
"IPv4",
"{}/{}/{}".format(ip_address, netmask, gateway),
)
parser.set("interface_{}".format(hwaddr_section_number), "Nameservers", dns_servers)
parser.set("interface_{}".format(hwaddr_section_number), "Name", name)
parser.set("interface_{}".format(hwaddr_section_number), "MAC", hwaddr)
parser.set("interface_{}".format(hwaddr_section_number), "Type", "ethernet")
parser.set("service_{}".format(hwaddr_section_number), "Nameservers", dns_servers)
parser.set("service_{}".format(hwaddr_section_number), "Name", name)
parser.set("service_{}".format(hwaddr_section_number), "MAC", hwaddr)
parser.set("service_{}".format(hwaddr_section_number), "Type", "ethernet")
parser.set("service_{}".format(hwaddr_section_number), "IPv4.method", "manual")
with salt.utils.files.fopen(INTERFACES_CONFIG, "w") as config_file:
parser.write(config_file)
return True
def set_static_all(interface, address, netmask, gateway, nameservers):
def set_static_all(interface, address, netmask, gateway, nameservers=None):
"""
Configure specified adapter to use ipv4 manual settings
@ -866,7 +894,7 @@ def set_static_all(interface, address, netmask, gateway, nameservers):
:param str address: ipv4 address
:param str netmask: ipv4 netmask
:param str gateway: ipv4 gateway
:param str nameservers: list of nameservers servers separated by spaces
:param str nameservers: list of nameservers servers separated by spaces (Optional)
:return: True if the settings were applied, otherwise an exception will be thrown.
:rtype: bool
@ -879,11 +907,12 @@ def set_static_all(interface, address, netmask, gateway, nameservers):
validate, msg = _validate_ipv4([address, netmask, gateway])
if not validate:
raise salt.exceptions.CommandExecutionError(msg)
validate, msg = _space_delimited_list(nameservers)
if not validate:
raise salt.exceptions.CommandExecutionError(msg)
if not isinstance(nameservers, list):
nameservers = nameservers.split(" ")
if nameservers:
validate, msg = _space_delimited_list(nameservers)
if not validate:
raise salt.exceptions.CommandExecutionError(msg)
if not isinstance(nameservers, list):
nameservers = nameservers.split(" ")
if __grains__["lsb_distrib_id"] == "nilrt":
initial_mode = _get_adapter_mode_info(interface)
_save_config(interface, "Mode", "TCPIP")
@ -899,39 +928,19 @@ def set_static_all(interface, address, netmask, gateway, nameservers):
else:
_restart(interface)
return True
service = _interface_to_service(interface)
if not service:
if interface in pyiface.getIfaces():
return _configure_static_interface(
interface,
**{
"ip": address,
"dns": ",".join(nameservers),
"netmask": netmask,
"gateway": gateway,
}
)
raise salt.exceptions.CommandExecutionError(
"Invalid interface name: {}".format(interface)
if interface in [x.name for x in pyiface.getIfaces()]:
return _configure_static_interface(
interface,
**{
"ip": address,
"dns": ",".join(nameservers) if nameservers else "''",
"netmask": netmask,
"gateway": gateway,
}
)
service = pyconnman.ConnService(os.path.join(SERVICE_PATH, service))
ipv4 = service.get_property("IPv4.Configuration")
ipv4["Method"] = dbus.String("manual", variant_level=1)
ipv4["Address"] = dbus.String("{}".format(address), variant_level=1)
ipv4["Netmask"] = dbus.String("{}".format(netmask), variant_level=1)
ipv4["Gateway"] = dbus.String("{}".format(gateway), variant_level=1)
try:
service.set_property("IPv4.Configuration", ipv4)
service.set_property(
"Nameservers.Configuration",
[dbus.String("{}".format(d)) for d in nameservers],
)
except Exception as exc: # pylint: disable=broad-except
exc_msg = "Couldn't set manual settings for service: {}\nError: {}\n".format(
service, exc
)
raise salt.exceptions.CommandExecutionError(exc_msg)
return True
raise salt.exceptions.CommandExecutionError(
"Invalid interface name: {}".format(interface)
)
def get_interface(iface):

View file

@ -1,10 +1,7 @@
# -*- coding: utf-8 -*-
"""
integration tests for nilirt_ip
"""
from __future__ import absolute_import, print_function, unicode_literals
import re
import shutil
import time
@ -12,7 +9,6 @@ import time
import salt.modules.nilrt_ip as ip
import salt.utils.files
import salt.utils.platform
from salt.ext import six
from salt.ext.six.moves import configparser
from tests.support.case import ModuleCase
from tests.support.helpers import (
@ -34,6 +30,8 @@ try:
except ImportError:
CaseInsensitiveDict = None
INTERFACE_FOR_TEST = "eth1"
@skip_if_not_root
@destructiveTest
@ -67,7 +65,7 @@ class NilrtIpModuleTest(ModuleCase):
Get current settings
"""
# save files from var/lib/connman*
super(NilrtIpModuleTest, self).setUp()
super().setUp()
if self.grains["lsb_distrib_id"] == "nilrt":
shutil.move("/etc/natinst/share/ni-rt.ini", "/tmp/ni-rt.ini")
else:
@ -120,21 +118,12 @@ class NilrtIpModuleTest(ModuleCase):
with salt.utils.files.fopen("/etc/natinst/share/ni-rt.ini", "r") as config_file:
config_parser = configparser.RawConfigParser(dict_type=CaseInsensitiveDict)
config_parser.readfp(config_file)
if six.PY2:
if (
config_parser.has_option("lvrt", "AdditionalNetworkProtocols")
and "ethercat"
in config_parser.get("lvrt", "AdditionalNetworkProtocols").lower()
):
return True
return False
else:
return (
"ethercat"
in config_parser.get(
"lvrt", "AdditionalNetworkProtocols", fallback=""
).lower()
)
return (
"ethercat"
in config_parser.get(
"lvrt", "AdditionalNetworkProtocols", fallback=""
).lower()
)
def test_down(self):
"""
@ -146,7 +135,8 @@ class NilrtIpModuleTest(ModuleCase):
self.assertTrue(result)
info = self.run_function("ip.get_interfaces_details", timeout=300)
for interface in info["interfaces"]:
self.assertEqual(interface["adapter_mode"], "disabled")
if self.grains["lsb_distrib_id"] == "nilrt":
self.assertEqual(interface["adapter_mode"], "disabled")
self.assertFalse(
self.__connected(pyiface.Interface(name=interface["connectionid"]))
)
@ -164,9 +154,10 @@ class NilrtIpModuleTest(ModuleCase):
for interface in interfaces:
result = self.run_function("ip.up", [interface.name])
self.assertTrue(result)
info = self.run_function("ip.get_interfaces_details", timeout=300)
for interface in info["interfaces"]:
self.assertEqual(interface["adapter_mode"], "tcpip")
if self.grains["lsb_distrib_id"] == "nilrt":
info = self.run_function("ip.get_interfaces_details", timeout=300)
for interface in info["interfaces"]:
self.assertEqual(interface["adapter_mode"], "tcpip")
def test_set_dhcp_linklocal_all(self):
"""
@ -179,7 +170,8 @@ class NilrtIpModuleTest(ModuleCase):
info = self.run_function("ip.get_interfaces_details", timeout=300)
for interface in info["interfaces"]:
self.assertEqual(interface["ipv4"]["requestmode"], "dhcp_linklocal")
self.assertEqual(interface["adapter_mode"], "tcpip")
if self.grains["lsb_distrib_id"] == "nilrt":
self.assertEqual(interface["adapter_mode"], "tcpip")
def test_set_dhcp_only_all(self):
"""
@ -231,12 +223,12 @@ class NilrtIpModuleTest(ModuleCase):
info = self.run_function("ip.get_interfaces_details", timeout=300)
for interface in info["interfaces"]:
self.assertEqual(interface["adapter_mode"], "tcpip")
if self.grains["lsb_distrib_id"] != "nilrt":
self.assertIn("8.8.4.4", interface["ipv4"]["dns"])
self.assertIn("8.8.8.8", interface["ipv4"]["dns"])
else:
self.assertEqual(interface["ipv4"]["dns"], ["8.8.4.4"])
self.assertEqual(interface["adapter_mode"], "tcpip")
self.assertEqual(interface["ipv4"]["requestmode"], "static")
self.assertEqual(interface["ipv4"]["address"], "192.168.10.4")
self.assertEqual(interface["ipv4"]["netmask"], "255.255.255.0")
@ -246,6 +238,8 @@ class NilrtIpModuleTest(ModuleCase):
"""
Test supported adapter modes for each interface
"""
if self.grains["lsb_distrib_id"] != "nilrt":
self.skipTest("Test is just for older nilrt distros")
interface_pattern = re.compile("^eth[0-9]+$")
info = self.run_function("ip.get_interfaces_details", timeout=300)
for interface in info["interfaces"]:
@ -264,17 +258,157 @@ class NilrtIpModuleTest(ModuleCase):
"""
if not self.__check_ethercat():
self.skipTest("Test is just for systems with Ethercat")
self.assertTrue(self.run_function("ip.set_ethercat", ["eth1", 19]))
self.assertTrue(self.run_function("ip.set_ethercat", [INTERFACE_FOR_TEST, 19]))
info = self.run_function("ip.get_interfaces_details", timeout=300)
for interface in info["interfaces"]:
if interface["connectionid"] == "eth1":
if interface["connectionid"] == INTERFACE_FOR_TEST:
self.assertEqual(interface["adapter_mode"], "ethercat")
self.assertEqual(int(interface["ethercat"]["masterid"]), 19)
break
self.assertTrue(self.run_function("ip.set_dhcp_linklocal_all", ["eth1"]))
self.assertTrue(
self.run_function("ip.set_dhcp_linklocal_all", [INTERFACE_FOR_TEST])
)
info = self.run_function("ip.get_interfaces_details", timeout=300)
for interface in info["interfaces"]:
if interface["connectionid"] == "eth1":
if interface["connectionid"] == INTERFACE_FOR_TEST:
self.assertEqual(interface["adapter_mode"], "tcpip")
self.assertEqual(interface["ipv4"]["requestmode"], "dhcp_linklocal")
break
@destructiveTest
def test_dhcp_disable(self):
"""
Test cases:
- dhcp -> disable
- disable -> dhcp
"""
if self.grains["lsb_distrib_id"] == "nilrt":
self.skipTest("Test is just for newer nilrt distros")
self.assertTrue(
self.run_function("ip.set_dhcp_linklocal_all", [INTERFACE_FOR_TEST])
)
info = self.run_function("ip.get_interfaces_details", timeout=300)
for interface in info["interfaces"]:
if interface["connectionid"] == INTERFACE_FOR_TEST:
self.assertEqual(interface["ipv4"]["requestmode"], "dhcp_linklocal")
break
self.assertTrue(self.run_function("ip.disable", [INTERFACE_FOR_TEST]))
info = self.run_function("ip.get_interfaces_details", timeout=300)
for interface in info["interfaces"]:
if interface["connectionid"] == INTERFACE_FOR_TEST:
self.assertEqual(interface["ipv4"]["requestmode"], "disabled")
break
self.assertTrue(
self.run_function("ip.set_dhcp_linklocal_all", [INTERFACE_FOR_TEST])
)
info = self.run_function("ip.get_interfaces_details", timeout=300)
for interface in info["interfaces"]:
if interface["connectionid"] == INTERFACE_FOR_TEST:
self.assertEqual(interface["ipv4"]["requestmode"], "dhcp_linklocal")
break
@destructiveTest
def test_dhcp_static(self):
"""
Test cases:
- dhcp -> static
- static -> dhcp
"""
if self.grains["lsb_distrib_id"] == "nilrt":
self.skipTest("Test is just for newer nilrt distros")
self.assertTrue(
self.run_function("ip.set_dhcp_linklocal_all", [INTERFACE_FOR_TEST])
)
info = self.run_function("ip.get_interfaces_details", timeout=300)
for interface in info["interfaces"]:
if interface["connectionid"] == INTERFACE_FOR_TEST:
self.assertEqual(interface["ipv4"]["requestmode"], "dhcp_linklocal")
break
self.assertTrue(
self.run_function(
"ip.set_static_all",
[
INTERFACE_FOR_TEST,
"192.168.1.125",
"255.255.255.0",
"192.168.1.1",
"8.8.8.8 8.8.8.4",
],
)
)
info = self.run_function("ip.get_interfaces_details", timeout=300)
for interface in info["interfaces"]:
if interface["connectionid"] == INTERFACE_FOR_TEST:
self.assertEqual(interface["ipv4"]["requestmode"], "static")
self.assertEqual(interface["ipv4"]["address"], "192.168.1.125")
self.assertEqual(interface["ipv4"]["netmask"], "255.255.255.0")
self.assertIn("8.8.8.4", interface["ipv4"]["dns"])
self.assertIn("8.8.8.8", interface["ipv4"]["dns"])
break
self.assertTrue(
self.run_function("ip.set_dhcp_linklocal_all", [INTERFACE_FOR_TEST])
)
info = self.run_function("ip.get_interfaces_details", timeout=300)
for interface in info["interfaces"]:
if interface["connectionid"] == INTERFACE_FOR_TEST:
self.assertEqual(interface["ipv4"]["requestmode"], "dhcp_linklocal")
break
@destructiveTest
def test_static_disable(self):
"""
Test cases:
- static -> disable
- disable -> static
"""
if self.grains["lsb_distrib_id"] == "nilrt":
self.skipTest("Test is just for newer nilrt distros")
self.assertTrue(
self.run_function(
"ip.set_static_all",
[
INTERFACE_FOR_TEST,
"192.168.1.125",
"255.255.255.0",
"192.168.1.1",
"8.8.8.8",
],
)
)
info = self.run_function("ip.get_interfaces_details", timeout=300)
for interface in info["interfaces"]:
if interface["connectionid"] == INTERFACE_FOR_TEST:
self.assertEqual(interface["ipv4"]["requestmode"], "static")
self.assertEqual(interface["ipv4"]["address"], "192.168.1.125")
self.assertEqual(interface["ipv4"]["netmask"], "255.255.255.0")
self.assertEqual(interface["ipv4"]["dns"], ["8.8.8.8"])
break
self.assertTrue(self.run_function("ip.disable", [INTERFACE_FOR_TEST]))
info = self.run_function("ip.get_interfaces_details", timeout=300)
for interface in info["interfaces"]:
if interface["connectionid"] == INTERFACE_FOR_TEST:
self.assertEqual(interface["ipv4"]["requestmode"], "disabled")
break
self.assertTrue(
self.run_function(
"ip.set_static_all",
[INTERFACE_FOR_TEST, "192.168.1.125", "255.255.255.0", "192.168.1.1"],
)
)
info = self.run_function("ip.get_interfaces_details", timeout=300)
for interface in info["interfaces"]:
if interface["connectionid"] == INTERFACE_FOR_TEST:
self.assertEqual(interface["ipv4"]["requestmode"], "static")
self.assertEqual(interface["ipv4"]["address"], "192.168.1.125")
self.assertEqual(interface["ipv4"]["netmask"], "255.255.255.0")
self.assertEqual(interface["ipv4"]["dns"], [])
break

View file

@ -1,17 +1,15 @@
# -*- coding: utf-8 -*-
# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
# Import Salt Libs
import salt.modules.nilrt_ip as nilrt_ip
# Import Salt Testing Libs
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.mock import patch
from tests.support.unit import TestCase
from tests.support.mock import MagicMock, patch
from tests.support.unit import TestCase, skipIf
try:
import pyiface
except ImportError:
pyiface = None
@skipIf(not pyiface, "The python pyiface package is not installed")
class NilrtIPTestCase(TestCase, LoaderModuleMockMixin):
"""
TestCase for salt.modules.nilrt_ip module
@ -25,15 +23,25 @@ class NilrtIPTestCase(TestCase, LoaderModuleMockMixin):
Tests _change_state when not connected
and new state is down
"""
with patch("salt.modules.nilrt_ip._interface_to_service", return_value=True):
with patch("salt.modules.nilrt_ip._connected", return_value=False):
iface_mock = MagicMock()
iface_mock.name = "test_interface"
with patch("pyiface.getIfaces", return_value=[iface_mock]):
with patch(
"salt.modules.nilrt_ip._change_dhcp_config", return_value=True
) as change_dhcp_config_mock:
assert nilrt_ip._change_state("test_interface", "down")
assert change_dhcp_config_mock.called_with("test_interface", False)
def test_change_state_up_state(self):
"""
Tests _change_state when connected
and new state is up
"""
with patch("salt.modules.nilrt_ip._interface_to_service", return_value=True):
with patch("salt.modules.nilrt_ip._connected", return_value=True):
iface_mock = MagicMock()
iface_mock.name = "test_interface"
with patch("pyiface.getIfaces", return_value=[iface_mock]):
with patch(
"salt.modules.nilrt_ip._change_dhcp_config", return_value=True
) as change_dhcp_config_mock:
assert nilrt_ip._change_state("test_interface", "up")
assert change_dhcp_config_mock.called_with("test_interface")