salt/tests/integration/cloud/clouds/test_digitalocean.py
2020-04-20 19:25:17 -06:00

125 lines
4 KiB
Python

# -*- coding: utf-8 -*-
"""
Integration tests for DigitalOcean APIv2
"""
# Import Python Libs
from __future__ import absolute_import, print_function, unicode_literals
import base64
import hashlib
import salt.crypt
# Import Salt Libs
import salt.ext.six as six
import salt.utils.stringutils
from salt.ext.six.moves import range
# Import Salt Testing Libs
from tests.integration.cloud.helpers.cloud_test_base import TIMEOUT, CloudTest
class DigitalOceanTest(CloudTest):
"""
Integration tests for the DigitalOcean cloud provider in Salt-Cloud
"""
PROVIDER = "digitalocean"
REQUIRED_PROVIDER_CONFIG_ITEMS = (
"personal_access_token",
"ssh_key_file",
"ssh_key_name",
)
def test_list_images(self):
"""
Tests the return of running the --list-images command for digitalocean
"""
image_list = self.run_cloud("--list-images {0}".format(self.PROVIDER))
self.assertIn("ubuntu-18-04-x64", [i.strip() for i in image_list])
def test_list_locations(self):
"""
Tests the return of running the --list-locations command for digitalocean
"""
_list_locations = self.run_cloud("--list-locations {0}".format(self.PROVIDER))
self.assertIn("San Francisco 2", [i.strip() for i in _list_locations])
def test_list_sizes(self):
"""
Tests the return of running the --list-sizes command for digitalocean
"""
_list_sizes = self.run_cloud("--list-sizes {0}".format(self.PROVIDER))
self.assertIn("16gb", [i.strip() for i in _list_sizes])
def test_key_management(self):
"""
Test key management
"""
do_key_name = self.instance_name + "-key"
# generate key and fingerprint
if salt.crypt.HAS_M2:
rsa_key = salt.crypt.RSA.gen_key(4096, 65537, lambda: None)
pub = six.b(
"ssh-rsa {}".format(
base64.b64encode(
six.b("\x00\x00\x00\x07ssh-rsa{}{}".format(*rsa_key.pub()))
)
)
)
else:
ssh_key = salt.crypt.RSA.generate(4096)
pub = ssh_key.publickey().exportKey("OpenSSH")
pub = salt.utils.stringutils.to_str(pub)
key_hex = hashlib.md5(
base64.b64decode(pub.strip().split()[1].encode())
).hexdigest()
finger_print = ":".join([key_hex[x : x + 2] for x in range(0, len(key_hex), 2)])
try:
_key = self.run_cloud(
'-f create_key {0} name="{1}" public_key="{2}"'.format(
self.PROVIDER, do_key_name, pub
)
)
# Upload public key
self.assertIn(finger_print, [i.strip() for i in _key])
# List all keys
list_keypairs = self.run_cloud("-f list_keypairs {0}".format(self.PROVIDER))
self.assertIn(finger_print, [i.strip() for i in list_keypairs])
# List key
show_keypair = self.run_cloud(
"-f show_keypair {0} keyname={1}".format(self.PROVIDER, do_key_name)
)
self.assertIn(finger_print, [i.strip() for i in show_keypair])
except AssertionError:
# Delete the public key if the above assertions fail
self.run_cloud(
"-f remove_key {0} id={1}".format(self.PROVIDER, finger_print)
)
raise
finally:
# Delete public key
self.assertTrue(
self.run_cloud(
"-f remove_key {0} id={1}".format(self.PROVIDER, finger_print)
)
)
def test_instance(self):
"""
Test creating an instance on DigitalOcean
"""
# check if instance with salt installed returned
ret_str = self.run_cloud(
"-p digitalocean-test {0}".format(self.instance_name), timeout=TIMEOUT
)
self.assertInstanceExists(ret_str)
self.assertDestroyInstance()