Move modules and module_utils unit tests to correct place (#81)

* Move modules and module_utils unit tests to correct place.

* Update ignore.txt

* Fix imports.

* Fix typos.

* Fix more typos.
This commit is contained in:
Felix Fontein 2020-03-31 10:42:38 +02:00 committed by GitHub
commit be191cce6c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
1170 changed files with 732 additions and 751 deletions

View file

@ -0,0 +1,137 @@
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import sys
import pytest
import ansible_collections.community.general.tests.unit.compat.unittest as unittest
from ansible_collections.community.general.tests.unit.compat.mock import MagicMock
from ansible_collections.community.general.tests.unit.compat.unittest import TestCase
from ansible_collections.community.general.tests.unit.plugins.modules.utils import set_module_args
# Exoscale's cs doesn't support Python 2.6
pytestmark = []
if sys.version_info[:2] != (2, 6):
from ansible_collections.community.general.plugins.modules.cloud.cloudstack.cs_traffic_type import AnsibleCloudStackTrafficType, setup_module_object
from ansible_collections.community.general.plugins.module_utils.cloudstack import HAS_LIB_CS
if not HAS_LIB_CS:
pytestmark.append(pytest.mark.skip('The cloudstack library, "cs", is needed to test cs_traffic_type'))
else:
pytestmark.append(pytest.mark.skip('Exoscale\'s cs doesn\'t support Python 2.6'))
EXISTING_TRAFFIC_TYPES_RESPONSE = {
"count": 3,
"traffictype": [
{
"id": "9801cf73-5a73-4883-97e4-fa20c129226f",
"kvmnetworklabel": "cloudbr0",
"physicalnetworkid": "659c1840-9374-440d-a412-55ca360c9d3c",
"traffictype": "Management"
},
{
"id": "28ed70b7-9a1f-41bf-94c3-53a9f22da8b6",
"kvmnetworklabel": "cloudbr0",
"physicalnetworkid": "659c1840-9374-440d-a412-55ca360c9d3c",
"traffictype": "Guest"
},
{
"id": "9c05c802-84c0-4eda-8f0a-f681364ffb46",
"kvmnetworklabel": "cloudbr0",
"physicalnetworkid": "659c1840-9374-440d-a412-55ca360c9d3c",
"traffictype": "Storage"
}
]
}
VALID_LIST_NETWORKS_RESPONSE = {
"count": 1,
"physicalnetwork": [
{
"broadcastdomainrange": "ZONE",
"id": "659c1840-9374-440d-a412-55ca360c9d3c",
"name": "eth1",
"state": "Enabled",
"vlan": "3900-4000",
"zoneid": "49acf813-a8dd-4da0-aa53-1d826d6003e7"
}
]
}
VALID_LIST_ZONES_RESPONSE = {
"count": 1,
"zone": [
{
"allocationstate": "Enabled",
"dhcpprovider": "VirtualRouter",
"dns1": "8.8.8.8",
"dns2": "8.8.4.4",
"guestcidraddress": "10.10.0.0/16",
"id": "49acf813-a8dd-4da0-aa53-1d826d6003e7",
"internaldns1": "192.168.56.1",
"localstorageenabled": True,
"name": "DevCloud-01",
"networktype": "Advanced",
"securitygroupsenabled": False,
"tags": [],
"zonetoken": "df20d65a-c6c8-3880-9064-4f77de2291ef"
}
]
}
base_module_args = {
"api_key": "api_key",
"api_secret": "very_secret_content",
"api_url": "http://localhost:8888/api/client",
"kvm_networklabel": "cloudbr0",
"physical_network": "eth1",
"poll_async": True,
"state": "present",
"traffic_type": "Guest",
"zone": "DevCloud-01"
}
class TestAnsibleCloudstackTraffiType(TestCase):
def test_module_is_created_sensibly(self):
set_module_args(base_module_args)
module = setup_module_object()
assert module.params['traffic_type'] == 'Guest'
def test_update_called_when_traffic_type_exists(self):
set_module_args(base_module_args)
module = setup_module_object()
actt = AnsibleCloudStackTrafficType(module)
actt.get_traffic_type = MagicMock(return_value=EXISTING_TRAFFIC_TYPES_RESPONSE['traffictype'][0])
actt.update_traffic_type = MagicMock()
actt.present_traffic_type()
self.assertTrue(actt.update_traffic_type.called)
def test_update_not_called_when_traffic_type_doesnt_exist(self):
set_module_args(base_module_args)
module = setup_module_object()
actt = AnsibleCloudStackTrafficType(module)
actt.get_traffic_type = MagicMock(return_value=None)
actt.update_traffic_type = MagicMock()
actt.add_traffic_type = MagicMock()
actt.present_traffic_type()
self.assertFalse(actt.update_traffic_type.called)
self.assertTrue(actt.add_traffic_type.called)
def test_traffic_type_returned_if_exists(self):
set_module_args(base_module_args)
module = setup_module_object()
actt = AnsibleCloudStackTrafficType(module)
actt.get_physical_network = MagicMock(return_value=VALID_LIST_NETWORKS_RESPONSE['physicalnetwork'][0])
actt.get_traffic_types = MagicMock(return_value=EXISTING_TRAFFIC_TYPES_RESPONSE)
tt = actt.present_traffic_type()
self.assertTrue(tt.get('kvmnetworklabel') == base_module_args['kvm_networklabel'])
self.assertTrue(tt.get('traffictype') == base_module_args['traffic_type'])
if __name__ == '__main__':
unittest.main()

View file

@ -0,0 +1,22 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import unittest
from ansible_collections.community.general.plugins.modules.cloud.docker.docker_container import TaskParameters
class TestTaskParameters(unittest.TestCase):
"""Unit tests for TaskParameters."""
def test_parse_exposed_ports_tcp_udp(self):
"""
Ensure _parse_exposed_ports does not cancel ports with the same
number but different protocol.
"""
task_params = TaskParameters.__new__(TaskParameters)
task_params.exposed_ports = None
result = task_params._parse_exposed_ports([80, '443', '443/udp'])
self.assertTrue((80, 'tcp') in result)
self.assertTrue((443, 'tcp') in result)
self.assertTrue((443, 'udp') in result)

View file

@ -0,0 +1,31 @@
"""Unit tests for docker_network."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import pytest
from ansible_collections.community.general.plugins.modules.cloud.docker.docker_network import validate_cidr
@pytest.mark.parametrize("cidr,expected", [
('192.168.0.1/16', 'ipv4'),
('192.168.0.1/24', 'ipv4'),
('192.168.0.1/32', 'ipv4'),
('fdd1:ac8c:0557:7ce2::/64', 'ipv6'),
('fdd1:ac8c:0557:7ce2::/128', 'ipv6'),
])
def test_validate_cidr_positives(cidr, expected):
assert validate_cidr(cidr) == expected
@pytest.mark.parametrize("cidr", [
'192.168.0.1',
'192.168.0.1/34',
'192.168.0.1/asd',
'fdd1:ac8c:0557:7ce2::',
])
def test_validate_cidr_negatives(cidr):
with pytest.raises(ValueError) as e:
validate_cidr(cidr)
assert '"{0}" is not a valid CIDR'.format(cidr) == str(e.value)

View file

@ -0,0 +1,510 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import pytest
class APIErrorMock(Exception):
def __init__(self, message, response=None, explanation=None):
self.message = message
self.response = response
self.explanation = explanation
@pytest.fixture(autouse=True)
def docker_module_mock(mocker):
docker_module_mock = mocker.MagicMock()
docker_utils_module_mock = mocker.MagicMock()
docker_errors_module_mock = mocker.MagicMock()
docker_errors_module_mock.APIError = APIErrorMock
mock_modules = {
'docker': docker_module_mock,
'docker.utils': docker_utils_module_mock,
'docker.errors': docker_errors_module_mock,
}
return mocker.patch.dict('sys.modules', **mock_modules)
@pytest.fixture(autouse=True)
def docker_swarm_service():
from ansible_collections.community.general.plugins.modules.cloud.docker import docker_swarm_service
return docker_swarm_service
def test_retry_on_out_of_sequence_error(mocker, docker_swarm_service):
run_mock = mocker.MagicMock(
side_effect=APIErrorMock(
message='',
response=None,
explanation='rpc error: code = Unknown desc = update out of sequence',
)
)
manager = docker_swarm_service.DockerServiceManager(client=None)
manager.run = run_mock
with pytest.raises(APIErrorMock):
manager.run_safe()
assert run_mock.call_count == 3
def test_no_retry_on_general_api_error(mocker, docker_swarm_service):
run_mock = mocker.MagicMock(
side_effect=APIErrorMock(message='', response=None, explanation='some error')
)
manager = docker_swarm_service.DockerServiceManager(client=None)
manager.run = run_mock
with pytest.raises(APIErrorMock):
manager.run_safe()
assert run_mock.call_count == 1
def test_get_docker_environment(mocker, docker_swarm_service):
env_file_result = {'TEST1': 'A', 'TEST2': 'B', 'TEST3': 'C'}
env_dict = {'TEST3': 'CC', 'TEST4': 'D'}
env_string = "TEST3=CC,TEST4=D"
env_list = ['TEST3=CC', 'TEST4=D']
expected_result = sorted(['TEST1=A', 'TEST2=B', 'TEST3=CC', 'TEST4=D'])
mocker.patch.object(
docker_swarm_service, 'parse_env_file', return_value=env_file_result
)
mocker.patch.object(
docker_swarm_service,
'format_environment',
side_effect=lambda d: ['{0}={1}'.format(key, value) for key, value in d.items()],
)
# Test with env dict and file
result = docker_swarm_service.get_docker_environment(
env_dict, env_files=['dummypath']
)
assert result == expected_result
# Test with env list and file
result = docker_swarm_service.get_docker_environment(
env_list,
env_files=['dummypath']
)
assert result == expected_result
# Test with env string and file
result = docker_swarm_service.get_docker_environment(
env_string, env_files=['dummypath']
)
assert result == expected_result
assert result == expected_result
# Test with empty env
result = docker_swarm_service.get_docker_environment(
[], env_files=None
)
assert result == []
# Test with empty env_files
result = docker_swarm_service.get_docker_environment(
None, env_files=[]
)
assert result == []
def test_get_nanoseconds_from_raw_option(docker_swarm_service):
value = docker_swarm_service.get_nanoseconds_from_raw_option('test', None)
assert value is None
value = docker_swarm_service.get_nanoseconds_from_raw_option('test', '1m30s535ms')
assert value == 90535000000
value = docker_swarm_service.get_nanoseconds_from_raw_option('test', 10000000000)
assert value == 10000000000
with pytest.raises(ValueError):
docker_swarm_service.get_nanoseconds_from_raw_option('test', [])
def test_has_dict_changed(docker_swarm_service):
assert not docker_swarm_service.has_dict_changed(
{"a": 1},
{"a": 1},
)
assert not docker_swarm_service.has_dict_changed(
{"a": 1},
{"a": 1, "b": 2}
)
assert docker_swarm_service.has_dict_changed(
{"a": 1},
{"a": 2, "b": 2}
)
assert docker_swarm_service.has_dict_changed(
{"a": 1, "b": 1},
{"a": 1}
)
assert not docker_swarm_service.has_dict_changed(
None,
{"a": 2, "b": 2}
)
assert docker_swarm_service.has_dict_changed(
{},
{"a": 2, "b": 2}
)
assert docker_swarm_service.has_dict_changed(
{"a": 1},
{}
)
assert docker_swarm_service.has_dict_changed(
{"a": 1},
None
)
assert not docker_swarm_service.has_dict_changed(
{},
{}
)
assert not docker_swarm_service.has_dict_changed(
None,
None
)
assert not docker_swarm_service.has_dict_changed(
{},
None
)
assert not docker_swarm_service.has_dict_changed(
None,
{}
)
def test_has_list_changed(docker_swarm_service):
# List comparisons without dictionaries
# I could improve the indenting, but pycodestyle wants this instead
assert not docker_swarm_service.has_list_changed(None, None)
assert not docker_swarm_service.has_list_changed(None, [])
assert not docker_swarm_service.has_list_changed(None, [1, 2])
assert not docker_swarm_service.has_list_changed([], None)
assert not docker_swarm_service.has_list_changed([], [])
assert docker_swarm_service.has_list_changed([], [1, 2])
assert docker_swarm_service.has_list_changed([1, 2], None)
assert docker_swarm_service.has_list_changed([1, 2], [])
assert docker_swarm_service.has_list_changed([1, 2, 3], [1, 2])
assert docker_swarm_service.has_list_changed([1, 2], [1, 2, 3])
# Check list sorting
assert not docker_swarm_service.has_list_changed([1, 2], [2, 1])
assert docker_swarm_service.has_list_changed(
[1, 2],
[2, 1],
sort_lists=False
)
# Check type matching
assert docker_swarm_service.has_list_changed([None, 1], [2, 1])
assert docker_swarm_service.has_list_changed([2, 1], [None, 1])
assert docker_swarm_service.has_list_changed(
"command --with args",
['command', '--with', 'args']
)
assert docker_swarm_service.has_list_changed(
['sleep', '3400'],
[u'sleep', u'3600'],
sort_lists=False
)
# List comparisons with dictionaries
assert not docker_swarm_service.has_list_changed(
[{'a': 1}],
[{'a': 1}],
sort_key='a'
)
assert not docker_swarm_service.has_list_changed(
[{'a': 1}, {'a': 2}],
[{'a': 1}, {'a': 2}],
sort_key='a'
)
with pytest.raises(Exception):
docker_swarm_service.has_list_changed(
[{'a': 1}, {'a': 2}],
[{'a': 1}, {'a': 2}]
)
# List sort checking with sort key
assert not docker_swarm_service.has_list_changed(
[{'a': 1}, {'a': 2}],
[{'a': 2}, {'a': 1}],
sort_key='a'
)
assert docker_swarm_service.has_list_changed(
[{'a': 1}, {'a': 2}],
[{'a': 2}, {'a': 1}],
sort_lists=False
)
assert docker_swarm_service.has_list_changed(
[{'a': 1}, {'a': 2}, {'a': 3}],
[{'a': 2}, {'a': 1}],
sort_key='a'
)
assert docker_swarm_service.has_list_changed(
[{'a': 1}, {'a': 2}],
[{'a': 1}, {'a': 2}, {'a': 3}],
sort_lists=False
)
# Additional dictionary elements
assert not docker_swarm_service.has_list_changed(
[
{"src": 1, "dst": 2},
{"src": 1, "dst": 2, "protocol": "udp"},
],
[
{"src": 1, "dst": 2, "protocol": "tcp"},
{"src": 1, "dst": 2, "protocol": "udp"},
],
sort_key='dst'
)
assert not docker_swarm_service.has_list_changed(
[
{"src": 1, "dst": 2, "protocol": "udp"},
{"src": 1, "dst": 3, "protocol": "tcp"},
],
[
{"src": 1, "dst": 2, "protocol": "udp"},
{"src": 1, "dst": 3, "protocol": "tcp"},
],
sort_key='dst'
)
assert docker_swarm_service.has_list_changed(
[
{"src": 1, "dst": 2, "protocol": "udp"},
{"src": 1, "dst": 2},
{"src": 3, "dst": 4},
],
[
{"src": 1, "dst": 3, "protocol": "udp"},
{"src": 1, "dst": 2, "protocol": "tcp"},
{"src": 3, "dst": 4, "protocol": "tcp"},
],
sort_key='dst'
)
assert docker_swarm_service.has_list_changed(
[
{"src": 1, "dst": 3, "protocol": "tcp"},
{"src": 1, "dst": 2, "protocol": "udp"},
],
[
{"src": 1, "dst": 2, "protocol": "tcp"},
{"src": 1, "dst": 2, "protocol": "udp"},
],
sort_key='dst'
)
assert docker_swarm_service.has_list_changed(
[
{"src": 1, "dst": 2, "protocol": "udp"},
{"src": 1, "dst": 2, "protocol": "tcp", "extra": {"test": "foo"}},
],
[
{"src": 1, "dst": 2, "protocol": "udp"},
{"src": 1, "dst": 2, "protocol": "tcp"},
],
sort_key='dst'
)
assert not docker_swarm_service.has_list_changed(
[{'id': '123', 'aliases': []}],
[{'id': '123'}],
sort_key='id'
)
def test_have_networks_changed(docker_swarm_service):
assert not docker_swarm_service.have_networks_changed(
None,
None
)
assert not docker_swarm_service.have_networks_changed(
[],
None
)
assert not docker_swarm_service.have_networks_changed(
[{'id': 1}],
[{'id': 1}]
)
assert docker_swarm_service.have_networks_changed(
[{'id': 1}],
[{'id': 1}, {'id': 2}]
)
assert not docker_swarm_service.have_networks_changed(
[{'id': 1}, {'id': 2}],
[{'id': 1}, {'id': 2}]
)
assert not docker_swarm_service.have_networks_changed(
[{'id': 1}, {'id': 2}],
[{'id': 2}, {'id': 1}]
)
assert not docker_swarm_service.have_networks_changed(
[
{'id': 1},
{'id': 2, 'aliases': []}
],
[
{'id': 1},
{'id': 2}
]
)
assert docker_swarm_service.have_networks_changed(
[
{'id': 1},
{'id': 2, 'aliases': ['alias1']}
],
[
{'id': 1},
{'id': 2}
]
)
assert docker_swarm_service.have_networks_changed(
[
{'id': 1},
{'id': 2, 'aliases': ['alias1', 'alias2']}
],
[
{'id': 1},
{'id': 2, 'aliases': ['alias1']}
]
)
assert not docker_swarm_service.have_networks_changed(
[
{'id': 1},
{'id': 2, 'aliases': ['alias1', 'alias2']}
],
[
{'id': 1},
{'id': 2, 'aliases': ['alias1', 'alias2']}
]
)
assert not docker_swarm_service.have_networks_changed(
[
{'id': 1},
{'id': 2, 'aliases': ['alias1', 'alias2']}
],
[
{'id': 1},
{'id': 2, 'aliases': ['alias2', 'alias1']}
]
)
assert not docker_swarm_service.have_networks_changed(
[
{'id': 1, 'options': {}},
{'id': 2, 'aliases': ['alias1', 'alias2']}],
[
{'id': 1},
{'id': 2, 'aliases': ['alias2', 'alias1']}
]
)
assert not docker_swarm_service.have_networks_changed(
[
{'id': 1, 'options': {'option1': 'value1'}},
{'id': 2, 'aliases': ['alias1', 'alias2']}],
[
{'id': 1, 'options': {'option1': 'value1'}},
{'id': 2, 'aliases': ['alias2', 'alias1']}
]
)
assert docker_swarm_service.have_networks_changed(
[
{'id': 1, 'options': {'option1': 'value1'}},
{'id': 2, 'aliases': ['alias1', 'alias2']}],
[
{'id': 1, 'options': {'option1': 'value2'}},
{'id': 2, 'aliases': ['alias2', 'alias1']}
]
)
def test_get_docker_networks(docker_swarm_service):
network_names = [
'network_1',
'network_2',
'network_3',
'network_4',
]
networks = [
network_names[0],
{'name': network_names[1]},
{'name': network_names[2], 'aliases': ['networkalias1']},
{'name': network_names[3], 'aliases': ['networkalias2'], 'options': {'foo': 'bar'}},
]
network_ids = {
network_names[0]: '1',
network_names[1]: '2',
network_names[2]: '3',
network_names[3]: '4',
}
parsed_networks = docker_swarm_service.get_docker_networks(
networks,
network_ids
)
assert len(parsed_networks) == 4
for i, network in enumerate(parsed_networks):
assert 'name' not in network
assert 'id' in network
expected_name = network_names[i]
assert network['id'] == network_ids[expected_name]
if i == 2:
assert network['aliases'] == ['networkalias1']
if i == 3:
assert network['aliases'] == ['networkalias2']
if i == 3:
assert 'foo' in network['options']
# Test missing name
with pytest.raises(TypeError):
docker_swarm_service.get_docker_networks([{'invalid': 'err'}], {'err': 1})
# test for invalid aliases type
with pytest.raises(TypeError):
docker_swarm_service.get_docker_networks(
[{'name': 'test', 'aliases': 1}],
{'test': 1}
)
# Test invalid aliases elements
with pytest.raises(TypeError):
docker_swarm_service.get_docker_networks(
[{'name': 'test', 'aliases': [1]}],
{'test': 1}
)
# Test for invalid options type
with pytest.raises(TypeError):
docker_swarm_service.get_docker_networks(
[{'name': 'test', 'options': 1}],
{'test': 1}
)
# Test for invalid networks type
with pytest.raises(TypeError):
docker_swarm_service.get_docker_networks(
1,
{'test': 1}
)
# Test for non existing networks
with pytest.raises(ValueError):
docker_swarm_service.get_docker_networks(
[{'name': 'idontexist'}],
{'test': 1}
)
# Test empty values
assert docker_swarm_service.get_docker_networks([], {}) == []
assert docker_swarm_service.get_docker_networks(None, {}) is None
# Test invalid options
with pytest.raises(TypeError):
docker_swarm_service.get_docker_networks(
[{'name': 'test', 'nonexisting_option': 'foo'}],
{'test': '1'}
)

View file

@ -0,0 +1,36 @@
# Copyright (c) 2018 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import json
import pytest
from ansible_collections.community.general.plugins.modules.cloud.docker import docker_volume
from ansible_collections.community.general.plugins.module_utils.docker import common
pytestmark = pytest.mark.usefixtures('patch_ansible_module')
TESTCASE_DOCKER_VOLUME = [
{
'name': 'daemon_config',
'state': 'present'
}
]
@pytest.mark.parametrize('patch_ansible_module', TESTCASE_DOCKER_VOLUME, indirect=['patch_ansible_module'])
def test_create_volume_on_invalid_docker_version(mocker, capfd):
mocker.patch.object(common, 'HAS_DOCKER_PY', True)
mocker.patch.object(common, 'docker_version', '1.8.0')
with pytest.raises(SystemExit):
docker_volume.main()
out, dummy = capfd.readouterr()
results = json.loads(out)
assert results['failed']
assert 'Error: Docker SDK for Python version is 1.8.0 ' in results['msg']
assert 'Minimum version required is 1.10.0.' in results['msg']

View file

@ -0,0 +1,61 @@
import unittest
from ansible_collections.community.general.plugins.modules.cloud.google.gce_tag import _get_changed_items, _intersect_items, _union_items
class TestGCETag(unittest.TestCase):
"""Unit tests for gce_tag module."""
def test_union_items(self):
"""
Combine items in both lists
removing duplicates.
"""
listA = [1, 2, 3, 4, 5, 8, 9]
listB = [1, 2, 3, 4, 5, 6, 7]
want = [1, 2, 3, 4, 5, 6, 7, 8, 9]
got = _union_items(listA, listB)
self.assertEqual(want, got)
def test_intersect_items(self):
"""
All unique items from either list.
"""
listA = [1, 2, 3, 4, 5, 8, 9]
listB = [1, 2, 3, 4, 5, 6, 7]
want = [1, 2, 3, 4, 5]
got = _intersect_items(listA, listB)
self.assertEqual(want, got)
# tags removed
new_tags = ['one', 'two']
existing_tags = ['two']
want = ['two'] # only remove the tag that was present
got = _intersect_items(existing_tags, new_tags)
self.assertEqual(want, got)
def test_get_changed_items(self):
"""
All the items from left list that don't match
any item from the right list.
"""
listA = [1, 2, 3, 4, 5, 8, 9]
listB = [1, 2, 3, 4, 5, 6, 7]
want = [8, 9]
got = _get_changed_items(listA, listB)
self.assertEqual(want, got)
# simulate new tags added
tags_to_add = ['one', 'two']
existing_tags = ['two']
want = ['one']
got = _get_changed_items(tags_to_add, existing_tags)
self.assertEqual(want, got)
# simulate removing tags
# specifying one tag on right that doesn't exist
tags_to_remove = ['one', 'two']
existing_tags = ['two', 'three']
want = ['three']
got = _get_changed_items(existing_tags, tags_to_remove)
self.assertEqual(want, got)

View file

@ -0,0 +1,30 @@
import unittest
from ansible_collections.community.general.plugins.modules.cloud.google.gcp_forwarding_rule import _build_global_forwarding_rule_dict
class TestGCPFowardingRule(unittest.TestCase):
"""Unit tests for gcp_fowarding_rule module."""
params_dict = {
'forwarding_rule_name': 'foo_fowarding_rule_name',
'address': 'foo_external_address',
'target': 'foo_targetproxy',
'region': 'global',
'port_range': 80,
'protocol': 'TCP',
'state': 'present',
}
def test__build_global_forwarding_rule_dict(self):
expected = {
'name': 'foo_fowarding_rule_name',
'IPAddress': 'https://www.googleapis.com/compute/v1/projects/my-project/global/addresses/foo_external_address',
'target': 'https://www.googleapis.com/compute/v1/projects/my-project/global/targetHttpProxies/foo_targetproxy',
'region': 'global',
'portRange': 80,
'IPProtocol': 'TCP',
}
actual = _build_global_forwarding_rule_dict(
self.params_dict, 'my-project')
self.assertEqual(expected, actual)

View file

@ -0,0 +1,164 @@
import unittest
from ansible_collections.community.general.plugins.modules.cloud.google.gcp_url_map import _build_path_matchers, _build_url_map_dict
class TestGCPUrlMap(unittest.TestCase):
"""Unit tests for gcp_url_map module."""
params_dict = {
'url_map_name': 'foo_url_map_name',
'description': 'foo_url_map description',
'host_rules': [
{
'description': 'host rules description',
'hosts': [
'www.example.com',
'www2.example.com'
],
'path_matcher': 'host_rules_path_matcher'
}
],
'path_matchers': [
{
'name': 'path_matcher_one',
'description': 'path matcher one',
'defaultService': 'bes-pathmatcher-one-default',
'pathRules': [
{
'service': 'my-one-bes',
'paths': [
'/',
'/aboutus'
]
}
]
},
{
'name': 'path_matcher_two',
'description': 'path matcher two',
'defaultService': 'bes-pathmatcher-two-default',
'pathRules': [
{
'service': 'my-two-bes',
'paths': [
'/webapp',
'/graphs'
]
}
]
}
]
}
def test__build_path_matchers(self):
input_list = [
{
'defaultService': 'bes-pathmatcher-one-default',
'description': 'path matcher one',
'name': 'path_matcher_one',
'pathRules': [
{
'paths': [
'/',
'/aboutus'
],
'service': 'my-one-bes'
}
]
},
{
'defaultService': 'bes-pathmatcher-two-default',
'description': 'path matcher two',
'name': 'path_matcher_two',
'pathRules': [
{
'paths': [
'/webapp',
'/graphs'
],
'service': 'my-two-bes'
}
]
}
]
expected = [
{
'defaultService': 'https://www.googleapis.com/compute/v1/projects/my-project/global/backendServices/bes-pathmatcher-one-default',
'description': 'path matcher one',
'name': 'path_matcher_one',
'pathRules': [
{
'paths': [
'/',
'/aboutus'
],
'service': 'https://www.googleapis.com/compute/v1/projects/my-project/global/backendServices/my-one-bes'
}
]
},
{
'defaultService': 'https://www.googleapis.com/compute/v1/projects/my-project/global/backendServices/bes-pathmatcher-two-default',
'description': 'path matcher two',
'name': 'path_matcher_two',
'pathRules': [
{
'paths': [
'/webapp',
'/graphs'
],
'service': 'https://www.googleapis.com/compute/v1/projects/my-project/global/backendServices/my-two-bes'
}
]
}
]
actual = _build_path_matchers(input_list, 'my-project')
self.assertEqual(expected, actual)
def test__build_url_map_dict(self):
expected = {
'description': 'foo_url_map description',
'hostRules': [
{
'description': 'host rules description',
'hosts': [
'www.example.com',
'www2.example.com'
],
'pathMatcher': 'host_rules_path_matcher'
}
],
'name': 'foo_url_map_name',
'pathMatchers': [
{
'defaultService': 'https://www.googleapis.com/compute/v1/projects/my-project/global/backendServices/bes-pathmatcher-one-default',
'description': 'path matcher one',
'name': 'path_matcher_one',
'pathRules': [
{
'paths': [
'/',
'/aboutus'
],
'service': 'https://www.googleapis.com/compute/v1/projects/my-project/global/backendServices/my-one-bes'
}
]
},
{
'defaultService': 'https://www.googleapis.com/compute/v1/projects/my-project/global/backendServices/bes-pathmatcher-two-default',
'description': 'path matcher two',
'name': 'path_matcher_two',
'pathRules': [
{
'paths': [
'/webapp',
'/graphs'
],
'service': 'https://www.googleapis.com/compute/v1/projects/my-project/global/backendServices/my-two-bes'
}
]
}
]
}
actual = _build_url_map_dict(self.params_dict, 'my-project')
self.assertEqual(expected, actual)

View file

@ -0,0 +1,73 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import pytest
from ansible_collections.community.general.tests.unit.compat.mock import MagicMock
from ansible_collections.community.kubernetes.plugins.module_utils.common import K8sAnsibleMixin
from ansible_collections.community.kubernetes.plugins.module_utils.raw import KubernetesRawModule
from ansible_collections.community.general.plugins.module_utils.kubevirt import KubeVirtRawModule
import openshift.dynamic
RESOURCE_DEFAULT_ARGS = {'api_version': 'v1alpha3', 'group': 'kubevirt.io',
'prefix': 'apis', 'namespaced': True}
class AnsibleExitJson(Exception):
"""Exception class to be raised by module.exit_json and caught
by the test case"""
def __init__(self, **kwargs):
for k in kwargs:
setattr(self, k, kwargs[k])
def __getitem__(self, attr):
return getattr(self, attr)
class AnsibleFailJson(Exception):
"""Exception class to be raised by module.fail_json and caught
by the test case"""
def __init__(self, **kwargs):
for k in kwargs:
setattr(self, k, kwargs[k])
def __getitem__(self, attr):
return getattr(self, attr)
def exit_json(*args, **kwargs):
kwargs['success'] = True
if 'changed' not in kwargs:
kwargs['changed'] = False
raise AnsibleExitJson(**kwargs)
def fail_json(*args, **kwargs):
kwargs['success'] = False
raise AnsibleFailJson(**kwargs)
@pytest.fixture()
def base_fixture(monkeypatch):
monkeypatch.setattr(
KubernetesRawModule, "exit_json", exit_json)
monkeypatch.setattr(
KubernetesRawModule, "fail_json", fail_json)
# Create mock methods in Resource directly, otherwise dyn client
# tries binding those to corresponding methods in DynamicClient
# (with partial()), which is more problematic to intercept
openshift.dynamic.Resource.get = MagicMock()
openshift.dynamic.Resource.create = MagicMock()
openshift.dynamic.Resource.delete = MagicMock()
openshift.dynamic.Resource.patch = MagicMock()
openshift.dynamic.Resource.search = MagicMock()
openshift.dynamic.Resource.watch = MagicMock()
# Globally mock some methods, since all tests will use this
KubernetesRawModule.patch_resource = MagicMock()
KubernetesRawModule.patch_resource.return_value = ({}, None)
K8sAnsibleMixin.get_api_client = MagicMock()
K8sAnsibleMixin.get_api_client.return_value = None
K8sAnsibleMixin.find_resource = MagicMock()
KubeVirtRawModule.find_supported_resource = MagicMock()

View file

@ -0,0 +1,75 @@
import pytest
openshiftdynamic = pytest.importorskip("openshift.dynamic")
from ansible_collections.community.general.tests.unit.plugins.modules.utils import set_module_args
from .kubevirt_fixtures import base_fixture, RESOURCE_DEFAULT_ARGS, AnsibleExitJson
from ansible_collections.community.kubernetes.plugins.module_utils.raw import KubernetesRawModule
from ansible_collections.community.general.plugins.modules.cloud.kubevirt import kubevirt_rs as mymodule
KIND = 'VirtualMachineInstanceReplicaSet'
@pytest.mark.usefixtures("base_fixture")
@pytest.mark.parametrize("_replicas, _changed", ((1, True),
(3, True),
(2, False),
(5, True),))
def test_scale_rs_nowait(_replicas, _changed):
_name = 'test-rs'
# Desired state:
args = dict(name=_name, namespace='vms', replicas=_replicas, wait=False)
set_module_args(args)
# Mock pre-change state:
resource_args = dict(kind=KIND, **RESOURCE_DEFAULT_ARGS)
mymodule.KubeVirtVMIRS.find_supported_resource.return_value = openshiftdynamic.Resource(**resource_args)
res_inst = openshiftdynamic.ResourceInstance('', dict(kind=KIND, metadata={'name': _name}, spec={'replicas': 2}))
openshiftdynamic.Resource.get.return_value = res_inst
openshiftdynamic.Resource.search.return_value = [res_inst]
# Final state, after patching the object
KubernetesRawModule.patch_resource.return_value = dict(kind=KIND, metadata={'name': _name},
spec={'replicas': _replicas}), None
# Run code:
with pytest.raises(AnsibleExitJson) as result:
mymodule.KubeVirtVMIRS().execute_module()
# Verify result:
assert result.value['changed'] == _changed
@pytest.mark.usefixtures("base_fixture")
@pytest.mark.parametrize("_replicas, _success", ((1, False),
(2, False),
(5, True),))
def test_scale_rs_wait(_replicas, _success):
_name = 'test-rs'
# Desired state:
args = dict(name=_name, namespace='vms', replicas=5, wait=True)
set_module_args(args)
# Mock pre-change state:
resource_args = dict(kind=KIND, **RESOURCE_DEFAULT_ARGS)
mymodule.KubeVirtVMIRS.find_supported_resource.return_value = openshiftdynamic.Resource(**resource_args)
res_inst = openshiftdynamic.ResourceInstance('', dict(kind=KIND, metadata={'name': _name}, spec={'replicas': 2}))
openshiftdynamic.Resource.get.return_value = res_inst
openshiftdynamic.Resource.search.return_value = [res_inst]
# ~Final state, after patching the object (`replicas` match desired state)
KubernetesRawModule.patch_resource.return_value = dict(kind=KIND, name=_name, metadata={'name': _name},
spec={'replicas': 5}), None
# Final final state, as returned by resource.watch()
final_obj = dict(metadata=dict(name=_name), status=dict(readyReplicas=_replicas), **resource_args)
event = openshiftdynamic.ResourceInstance(None, final_obj)
openshiftdynamic.Resource.watch.return_value = [dict(object=event)]
# Run code:
with pytest.raises(Exception) as result:
mymodule.KubeVirtVMIRS().execute_module()
# Verify result:
assert result.value['success'] == _success

View file

@ -0,0 +1,110 @@
import pytest
openshiftdynamic = pytest.importorskip("openshift.dynamic")
from ansible_collections.community.general.tests.unit.plugins.modules.utils import set_module_args
from .kubevirt_fixtures import base_fixture, RESOURCE_DEFAULT_ARGS, AnsibleExitJson
from ansible_collections.community.general.plugins.module_utils.kubevirt import KubeVirtRawModule
from ansible_collections.community.general.plugins.modules.cloud.kubevirt import kubevirt_vm as mymodule
KIND = 'VirtulMachine'
@pytest.mark.usefixtures("base_fixture")
def test_create_vm_with_multus_nowait():
# Desired state:
args = dict(
state='present', name='testvm',
namespace='vms',
interfaces=[
{'bridge': {}, 'name': 'default', 'network': {'pod': {}}},
{'bridge': {}, 'name': 'mynet', 'network': {'multus': {'networkName': 'mynet'}}},
],
wait=False,
)
set_module_args(args)
# State as "returned" by the "k8s cluster":
resource_args = dict(kind=KIND, **RESOURCE_DEFAULT_ARGS)
KubeVirtRawModule.find_supported_resource.return_value = openshiftdynamic.Resource(**resource_args)
openshiftdynamic.Resource.get.return_value = None # Object doesn't exist in the cluster
# Run code:
with pytest.raises(AnsibleExitJson) as result:
mymodule.KubeVirtVM().execute_module()
# Verify result:
assert result.value['changed']
assert result.value['method'] == 'create'
@pytest.mark.usefixtures("base_fixture")
@pytest.mark.parametrize("_wait", (False, True))
def test_vm_is_absent(_wait):
# Desired state:
args = dict(
state='absent', name='testvmi',
namespace='vms',
wait=_wait,
)
set_module_args(args)
# State as "returned" by the "k8s cluster":
resource_args = dict(kind=KIND, **RESOURCE_DEFAULT_ARGS)
KubeVirtRawModule.find_supported_resource.return_value = openshiftdynamic.Resource(**resource_args)
openshiftdynamic.Resource.get.return_value = None # Object doesn't exist in the cluster
# Run code:
with pytest.raises(AnsibleExitJson) as result:
mymodule.KubeVirtVM().execute_module()
# Verify result:
assert not result.value['kubevirt_vm']
assert result.value['method'] == 'delete'
# Note: nothing actually gets deleted, as we mock that there's not object in the cluster present,
# so if the method changes to something other than 'delete' at some point, that's fine
@pytest.mark.usefixtures("base_fixture")
def test_vmpreset_create():
KIND = 'VirtulMachineInstancePreset'
# Desired state:
args = dict(state='present', name='testvmipreset', namespace='vms', memory='1024Mi', wait=False)
set_module_args(args)
# State as "returned" by the "k8s cluster":
resource_args = dict(kind=KIND, **RESOURCE_DEFAULT_ARGS)
KubeVirtRawModule.find_supported_resource.return_value = openshiftdynamic.Resource(**resource_args)
openshiftdynamic.Resource.get.return_value = None # Object doesn't exist in the cluster
# Run code:
with pytest.raises(AnsibleExitJson) as result:
mymodule.KubeVirtVM().execute_module()
# Verify result:
assert result.value['changed']
assert result.value['method'] == 'create'
@pytest.mark.usefixtures("base_fixture")
def test_vmpreset_is_absent():
KIND = 'VirtulMachineInstancePreset'
# Desired state:
args = dict(state='absent', name='testvmipreset', namespace='vms')
set_module_args(args)
# State as "returned" by the "k8s cluster":
resource_args = dict(kind=KIND, **RESOURCE_DEFAULT_ARGS)
KubeVirtRawModule.find_supported_resource.return_value = openshiftdynamic.Resource(**resource_args)
openshiftdynamic.Resource.get.return_value = None # Object doesn't exist in the cluster
# Run code:
with pytest.raises(AnsibleExitJson) as result:
mymodule.KubeVirtVM().execute_module()
# Verify result:
assert not result.value['kubevirt_vm']
assert result.value['method'] == 'delete'
# Note: nothing actually gets deleted, as we mock that there's not object in the cluster present,
# so if the method changes to something other than 'delete' at some point, that's fine

View file

@ -0,0 +1,80 @@
import pytest
@pytest.fixture
def api_key(monkeypatch):
monkeypatch.setenv('LINODE_API_KEY', 'foobar')
@pytest.fixture
def auth(monkeypatch):
def patched_test_echo(dummy):
return []
monkeypatch.setattr('linode.api.Api.test_echo', patched_test_echo)
@pytest.fixture
def access_token(monkeypatch):
monkeypatch.setenv('LINODE_ACCESS_TOKEN', 'barfoo')
@pytest.fixture
def no_access_token_in_env(monkeypatch):
try:
monkeypatch.delenv('LINODE_ACCESS_TOKEN')
except KeyError:
pass
@pytest.fixture
def default_args():
return {'state': 'present', 'label': 'foo'}
@pytest.fixture
def mock_linode():
class Linode():
def delete(self, *args, **kwargs):
pass
@property
def _raw_json(self):
return {
"alerts": {
"cpu": 90,
"io": 10000,
"network_in": 10,
"network_out": 10,
"transfer_quota": 80
},
"backups": {
"enabled": False,
"schedule": {
"day": None,
"window": None,
}
},
"created": "2018-09-26T08:12:33",
"group": "Foobar Group",
"hypervisor": "kvm",
"id": 10480444,
"image": "linode/centos7",
"ipv4": [
"130.132.285.233"
],
"ipv6": "2a82:7e00::h03c:46ff:fe04:5cd2/64",
"label": "lin-foo",
"region": "eu-west",
"specs": {
"disk": 25600,
"memory": 1024,
"transfer": 1000,
"vcpus": 1
},
"status": "running",
"tags": [],
"type": "g6-nanode-1",
"updated": "2018-09-26T10:10:14",
"watchdog_enabled": True
}
return Linode()

View file

@ -0,0 +1,15 @@
from __future__ import (absolute_import, division, print_function)
import pytest
from ansible_collections.community.general.plugins.modules.cloud.linode import linode
from ansible_collections.community.general.tests.unit.plugins.modules.utils import set_module_args
if not linode.HAS_LINODE:
pytestmark = pytest.mark.skip('test_linode.py requires the `linode-python` module')
def test_name_is_a_required_parameter(api_key, auth):
with pytest.raises(SystemExit):
set_module_args({})
linode.main()

View file

@ -0,0 +1,323 @@
from __future__ import (absolute_import, division, print_function)
import json
import os
import sys
import pytest
linode_apiv4 = pytest.importorskip('linode_api4')
mandatory_py_version = pytest.mark.skipif(
sys.version_info < (2, 7),
reason='The linode_api4 dependency requires python2.7 or higher'
)
from linode_api4.errors import ApiError as LinodeApiError
from linode_api4 import LinodeClient
from ansible_collections.community.general.plugins.modules.cloud.linode import linode_v4
from ansible_collections.community.general.plugins.module_utils.linode import get_user_agent
from ansible_collections.community.general.tests.unit.plugins.modules.utils import set_module_args
from ansible_collections.community.general.tests.unit.compat import mock
def test_mandatory_state_is_validated(capfd):
with pytest.raises(SystemExit):
set_module_args({'label': 'foo'})
linode_v4.initialise_module()
out, err = capfd.readouterr()
results = json.loads(out)
assert all(txt in results['msg'] for txt in ('state', 'required'))
assert results['failed'] is True
def test_mandatory_label_is_validated(capfd):
with pytest.raises(SystemExit):
set_module_args({'state': 'present'})
linode_v4.initialise_module()
out, err = capfd.readouterr()
results = json.loads(out)
assert all(txt in results['msg'] for txt in ('label', 'required'))
assert results['failed'] is True
def test_mandatory_access_token_is_validated(default_args,
no_access_token_in_env,
capfd):
with pytest.raises(SystemExit):
set_module_args(default_args)
linode_v4.initialise_module()
out, err = capfd.readouterr()
results = json.loads(out)
assert results['failed'] is True
assert all(txt in results['msg'] for txt in (
'missing',
'required',
'access_token',
))
def test_mandatory_access_token_passed_in_env(default_args,
access_token):
set_module_args(default_args)
try:
module = linode_v4.initialise_module()
except SystemExit:
pytest.fail("'access_token' is passed in environment")
now_set_token = module.params['access_token']
assert now_set_token == os.environ['LINODE_ACCESS_TOKEN']
def test_mandatory_access_token_passed_in_as_parameter(default_args,
no_access_token_in_env):
default_args.update({'access_token': 'foo'})
set_module_args(default_args)
try:
module = linode_v4.initialise_module()
except SystemExit:
pytest.fail("'access_token' is passed in as parameter")
assert module.params['access_token'] == 'foo'
def test_instance_by_label_cannot_authenticate(capfd, access_token,
default_args):
set_module_args(default_args)
module = linode_v4.initialise_module()
client = LinodeClient(module.params['access_token'])
target = 'linode_api4.linode_client.LinodeGroup.instances'
with mock.patch(target, side_effect=LinodeApiError('foo')):
with pytest.raises(SystemExit):
linode_v4.maybe_instance_from_label(module, client)
out, err = capfd.readouterr()
results = json.loads(out)
assert results['failed'] is True
assert 'Unable to query the Linode API' in results['msg']
def test_no_instances_found_with_label_gives_none(default_args,
access_token):
set_module_args(default_args)
module = linode_v4.initialise_module()
client = LinodeClient(module.params['access_token'])
target = 'linode_api4.linode_client.LinodeGroup.instances'
with mock.patch(target, return_value=[]):
result = linode_v4.maybe_instance_from_label(module, client)
assert result is None
def test_optional_region_is_validated(default_args, capfd, access_token):
default_args.update({'type': 'foo', 'image': 'bar'})
set_module_args(default_args)
with pytest.raises(SystemExit):
linode_v4.initialise_module()
out, err = capfd.readouterr()
results = json.loads(out)
assert results['failed'] is True
assert all(txt in results['msg'] for txt in (
'required',
'together',
'region'
))
def test_optional_type_is_validated(default_args, capfd, access_token):
default_args.update({'region': 'foo', 'image': 'bar'})
set_module_args(default_args)
with pytest.raises(SystemExit):
linode_v4.initialise_module()
out, err = capfd.readouterr()
results = json.loads(out)
assert results['failed'] is True
assert all(txt in results['msg'] for txt in (
'required',
'together',
'type'
))
def test_optional_image_is_validated(default_args, capfd, access_token):
default_args.update({'type': 'foo', 'region': 'bar'})
set_module_args(default_args)
with pytest.raises(SystemExit):
linode_v4.initialise_module()
out, err = capfd.readouterr()
results = json.loads(out)
assert results['failed'] is True
assert all(txt in results['msg'] for txt in (
'required',
'together',
'image'
))
def test_instance_already_created(default_args,
mock_linode,
capfd,
access_token):
default_args.update({
'type': 'foo',
'region': 'bar',
'image': 'baz'
})
set_module_args(default_args)
target = 'linode_api4.linode_client.LinodeGroup.instances'
with mock.patch(target, return_value=[mock_linode]):
with pytest.raises(SystemExit) as sys_exit_exc:
linode_v4.main()
assert sys_exit_exc.value.code == 0
out, err = capfd.readouterr()
results = json.loads(out)
assert results['changed'] is False
assert 'root_password' not in results['instance']
assert (
results['instance']['label'] ==
mock_linode._raw_json['label']
)
def test_instance_to_be_created_without_root_pass(default_args,
mock_linode,
capfd,
access_token):
default_args.update({
'type': 'foo',
'region': 'bar',
'image': 'baz'
})
set_module_args(default_args)
target = 'linode_api4.linode_client.LinodeGroup.instances'
with mock.patch(target, return_value=[]):
with pytest.raises(SystemExit) as sys_exit_exc:
target = 'linode_api4.linode_client.LinodeGroup.instance_create'
with mock.patch(target, return_value=(mock_linode, 'passw0rd')):
linode_v4.main()
assert sys_exit_exc.value.code == 0
out, err = capfd.readouterr()
results = json.loads(out)
assert results['changed'] is True
assert (
results['instance']['label'] ==
mock_linode._raw_json['label']
)
assert results['instance']['root_pass'] == 'passw0rd'
def test_instance_to_be_created_with_root_pass(default_args,
mock_linode,
capfd,
access_token):
default_args.update({
'type': 'foo',
'region': 'bar',
'image': 'baz',
'root_pass': 'passw0rd',
})
set_module_args(default_args)
target = 'linode_api4.linode_client.LinodeGroup.instances'
with mock.patch(target, return_value=[]):
with pytest.raises(SystemExit) as sys_exit_exc:
target = 'linode_api4.linode_client.LinodeGroup.instance_create'
with mock.patch(target, return_value=mock_linode):
linode_v4.main()
assert sys_exit_exc.value.code == 0
out, err = capfd.readouterr()
results = json.loads(out)
assert results['changed'] is True
assert (
results['instance']['label'] ==
mock_linode._raw_json['label']
)
assert 'root_pass' not in results['instance']
def test_instance_to_be_deleted(default_args,
mock_linode,
capfd,
access_token):
default_args.update({'state': 'absent'})
set_module_args(default_args)
target = 'linode_api4.linode_client.LinodeGroup.instances'
with mock.patch(target, return_value=[mock_linode]):
with pytest.raises(SystemExit) as sys_exit_exc:
linode_v4.main()
assert sys_exit_exc.value.code == 0
out, err = capfd.readouterr()
results = json.loads(out)
assert results['changed'] is True
assert (
results['instance']['label'] ==
mock_linode._raw_json['label']
)
def test_instance_already_deleted_no_change(default_args,
mock_linode,
capfd,
access_token):
default_args.update({'state': 'absent'})
set_module_args(default_args)
target = 'linode_api4.linode_client.LinodeGroup.instances'
with mock.patch(target, return_value=[]):
with pytest.raises(SystemExit) as sys_exit_exc:
linode_v4.main()
assert sys_exit_exc.value.code == 0
out, err = capfd.readouterr()
results = json.loads(out)
assert results['changed'] is False
assert results['instance'] == {}
def test_user_agent_created_properly():
try:
from ansible.module_utils.ansible_release import (
__version__ as ansible_version
)
except ImportError:
ansible_version = 'unknown'
expected_user_agent = 'Ansible-linode_v4_module/%s' % ansible_version
assert expected_user_agent == get_user_agent('linode_v4_module')

View file

@ -0,0 +1,20 @@
# Copyright: (c) 2019, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
import json
import pytest
from ansible_collections.community.general.plugins.modules.cloud.misc import terraform
from ansible_collections.community.general.tests.unit.plugins.modules.utils import set_module_args
def test_terraform_without_argument(capfd):
set_module_args({})
with pytest.raises(SystemExit) as results:
terraform.main()
out, err = capfd.readouterr()
assert not err
assert json.loads(out)['failed']
assert 'project_path' in json.loads(out)['msg']

View file

@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
#
# Copyright: (c) 2019, Bojan Vitnik <bvitnik@mainstream.rs>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
class AnsibleModuleException(Exception):
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
class ExitJsonException(AnsibleModuleException):
pass
class FailJsonException(AnsibleModuleException):
pass
class FakeAnsibleModule:
def __init__(self, params=None, check_mode=False):
self.params = params
self.check_mode = check_mode
def exit_json(self, *args, **kwargs):
raise ExitJsonException(*args, **kwargs)
def fail_json(self, *args, **kwargs):
raise FailJsonException(*args, **kwargs)

View file

@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
#
# Copyright: (c) 2019, Bojan Vitnik <bvitnik@mainstream.rs>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
FAKE_API_VERSION = "1.1"
class Failure(Exception):
def __init__(self, details):
self.details = details
def __str__(self):
return str(self.details)
class Session(object):
def __init__(self, uri, transport=None, encoding=None, verbose=0,
allow_none=1, ignore_ssl=False):
self.transport = transport
self._session = None
self.last_login_method = None
self.last_login_params = None
self.API_version = FAKE_API_VERSION
def _get_api_version(self):
return FAKE_API_VERSION
def _login(self, method, params):
self._session = "OpaqueRef:fake-xenapi-session-ref"
self.last_login_method = method
self.last_login_params = params
self.API_version = self._get_api_version()
def _logout(self):
self._session = None
self.last_login_method = None
self.last_login_params = None
self.API_version = FAKE_API_VERSION
def xenapi_request(self, methodname, params):
if methodname.startswith('login'):
self._login(methodname, params)
return None
elif methodname == 'logout' or methodname == 'session.logout':
self._logout()
return None
else:
# Should be patched with mocker.patch().
return None
def __getattr__(self, name):
if name == 'handle':
return self._session
elif name == 'xenapi':
# Should be patched with mocker.patch().
return None
elif name.startswith('login') or name.startswith('slave_local'):
return lambda *params: self._login(name, params)
elif name == 'logout':
return self._logout
def xapi_local():
return Session("http://_var_lib_xcp_xapi/")

View file

@ -0,0 +1,11 @@
# -*- coding: utf-8 -*-
#
# Copyright: (c) 2019, Bojan Vitnik <bvitnik@mainstream.rs>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
def fake_xenapi_ref(xenapi_class):
return "OpaqueRef:fake-xenapi-%s-ref" % xenapi_class

View file

@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
#
# Copyright: (c) 2019, Bojan Vitnik <bvitnik@mainstream.rs>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import sys
import importlib
import pytest
from .FakeAnsibleModule import FakeAnsibleModule
@pytest.fixture
def fake_ansible_module(request):
"""Returns fake AnsibleModule with fake module params."""
if hasattr(request, 'param'):
return FakeAnsibleModule(request.param)
else:
params = {
"hostname": "somehost",
"username": "someuser",
"password": "somepwd",
"validate_certs": True,
}
return FakeAnsibleModule(params)
@pytest.fixture(autouse=True)
def XenAPI():
"""Imports and returns fake XenAPI module."""
# Import of fake XenAPI module is wrapped by fixture so that it does not
# affect other unit tests which could potentialy also use XenAPI module.
# First we use importlib.import_module() to import the module and assign
# it to a local symbol.
fake_xenapi = importlib.import_module('ansible_collections.community.general.tests.unit.plugins.modules.cloud.xenserver.FakeXenAPI')
# Now we populate Python module cache with imported fake module using the
# original module name (XenAPI). That way, any 'import XenAPI' statement
# will just load already imported fake module from the cache.
sys.modules['XenAPI'] = fake_xenapi
return fake_xenapi
@pytest.fixture
def xenserver_guest_info(XenAPI):
"""Imports and returns xenserver_guest_info module."""
# Since we are wrapping fake XenAPI module inside a fixture, all modules
# that depend on it have to be imported inside a test function. To make
# this easier to handle and remove some code repetition, we wrap the import
# of xenserver_guest_info module with a fixture.
from ansible_collections.community.general.plugins.modules.cloud.xenserver import xenserver_guest_info
return xenserver_guest_info
@pytest.fixture
def xenserver_guest_powerstate(XenAPI):
"""Imports and returns xenserver_guest_powerstate module."""
# Since we are wrapping fake XenAPI module inside a fixture, all modules
# that depend on it have to be imported inside a test function. To make
# this easier to handle and remove some code repetition, we wrap the import
# of xenserver_guest_powerstate module with a fixture.
from ansible_collections.community.general.plugins.modules.cloud.xenserver import xenserver_guest_powerstate
return xenserver_guest_powerstate

View file

@ -0,0 +1,77 @@
# -*- coding: utf-8 -*-
#
# Copyright: (c) 2019, Bojan Vitnik <bvitnik@mainstream.rs>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import json
import pytest
from .common import fake_xenapi_ref
pytestmark = pytest.mark.usefixtures('patch_ansible_module')
testcase_module_params = {
"params": [
{
"hostname": "somehost",
"username": "someuser",
"password": "somepwd",
"name": "somevmname",
},
{
"hostname": "somehost",
"username": "someuser",
"password": "somepwd",
"uuid": "somevmuuid",
},
{
"hostname": "somehost",
"username": "someuser",
"password": "somepwd",
"name": "somevmname",
"uuid": "somevmuuid",
},
],
"ids": [
"name",
"uuid",
"name+uuid",
],
}
@pytest.mark.parametrize('patch_ansible_module', testcase_module_params['params'], ids=testcase_module_params['ids'], indirect=True)
def test_xenserver_guest_info(mocker, capfd, XenAPI, xenserver_guest_info):
"""
Tests regular module invocation including parsing and propagation of
module params and module output.
"""
fake_vm_facts = {"fake-vm-fact": True}
mocker.patch('ansible_collections.community.general.plugins.modules.cloud.xenserver.xenserver_guest_info.get_object_ref', return_value=None)
mocker.patch('ansible_collections.community.general.plugins.modules.cloud.xenserver.xenserver_guest_info.gather_vm_params', return_value=None)
mocker.patch('ansible_collections.community.general.plugins.modules.cloud.xenserver.xenserver_guest_info.gather_vm_facts', return_value=fake_vm_facts)
mocked_xenapi = mocker.patch.object(XenAPI.Session, 'xenapi', create=True)
mocked_returns = {
"pool.get_all.return_value": [fake_xenapi_ref('pool')],
"pool.get_default_SR.return_value": fake_xenapi_ref('SR'),
}
mocked_xenapi.configure_mock(**mocked_returns)
mocker.patch('ansible_collections.community.general.plugins.module_utils.xenserver.get_xenserver_version', return_value=[7, 2, 0])
with pytest.raises(SystemExit):
xenserver_guest_info.main()
out, err = capfd.readouterr()
result = json.loads(out)
assert result['instance'] == fake_vm_facts

View file

@ -0,0 +1,298 @@
# -*- coding: utf-8 -*-
#
# Copyright: (c) 2019, Bojan Vitnik <bvitnik@mainstream.rs>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import json
import pytest
from .common import fake_xenapi_ref
testcase_set_powerstate = {
"params": [
(False, "someoldstate"),
(True, "somenewstate"),
],
"ids": [
"state-same",
"state-changed",
],
}
testcase_module_params_state_present = {
"params": [
{
"hostname": "somehost",
"username": "someuser",
"password": "somepwd",
"name": "somevmname",
},
{
"hostname": "somehost",
"username": "someuser",
"password": "somepwd",
"name": "somevmname",
"state": "present",
},
],
"ids": [
"present-implicit",
"present-explicit",
],
}
testcase_module_params_state_other = {
"params": [
{
"hostname": "somehost",
"username": "someuser",
"password": "somepwd",
"name": "somevmname",
"state": "powered-on",
},
{
"hostname": "somehost",
"username": "someuser",
"password": "somepwd",
"name": "somevmname",
"state": "powered-off",
},
{
"hostname": "somehost",
"username": "someuser",
"password": "somepwd",
"name": "somevmname",
"state": "restarted",
},
{
"hostname": "somehost",
"username": "someuser",
"password": "somepwd",
"name": "somevmname",
"state": "shutdown-guest",
},
{
"hostname": "somehost",
"username": "someuser",
"password": "somepwd",
"name": "somevmname",
"state": "reboot-guest",
},
{
"hostname": "somehost",
"username": "someuser",
"password": "somepwd",
"name": "somevmname",
"state": "suspended",
},
],
"ids": [
"powered-on",
"powered-off",
"restarted",
"shutdown-guest",
"reboot-guest",
"suspended",
],
}
testcase_module_params_wait = {
"params": [
{
"hostname": "somehost",
"username": "someuser",
"password": "somepwd",
"name": "somevmname",
"state": "present",
"wait_for_ip_address": "yes",
},
{
"hostname": "somehost",
"username": "someuser",
"password": "somepwd",
"name": "somevmname",
"state": "powered-on",
"wait_for_ip_address": "yes",
},
],
"ids": [
"wait-present",
"wait-other",
],
}
@pytest.mark.parametrize('power_state', testcase_set_powerstate['params'], ids=testcase_set_powerstate['ids'])
def test_xenserver_guest_powerstate_set_power_state(mocker, fake_ansible_module, XenAPI, xenserver_guest_powerstate, power_state):
"""Tests power state change handling."""
mocker.patch('ansible_collections.community.general.plugins.modules.cloud.xenserver.xenserver_guest_powerstate.get_object_ref',
return_value=fake_xenapi_ref('VM'))
mocker.patch('ansible_collections.community.general.plugins.modules.cloud.xenserver.xenserver_guest_powerstate.gather_vm_params',
return_value={"power_state": "Someoldstate"})
mocked_set_vm_power_state = mocker.patch(
'ansible_collections.community.general.plugins.modules.cloud.xenserver.xenserver_guest_powerstate.set_vm_power_state',
return_value=power_state)
mocked_xenapi = mocker.patch.object(XenAPI.Session, 'xenapi', create=True)
mocked_returns = {
"pool.get_all.return_value": [fake_xenapi_ref('pool')],
"pool.get_default_SR.return_value": fake_xenapi_ref('SR'),
}
mocked_xenapi.configure_mock(**mocked_returns)
mocker.patch('ansible_collections.community.general.plugins.module_utils.xenserver.get_xenserver_version', return_value=[7, 2, 0])
fake_ansible_module.params.update({
"name": "somename",
"uuid": "someuuid",
"state_change_timeout": 1,
})
vm = xenserver_guest_powerstate.XenServerVM(fake_ansible_module)
state_changed = vm.set_power_state(None)
mocked_set_vm_power_state.assert_called_once_with(fake_ansible_module, fake_xenapi_ref('VM'), None, 1)
assert state_changed == power_state[0]
assert vm.vm_params['power_state'] == power_state[1].capitalize()
@pytest.mark.parametrize('patch_ansible_module',
testcase_module_params_state_present['params'],
ids=testcase_module_params_state_present['ids'],
indirect=True)
def test_xenserver_guest_powerstate_present(mocker, patch_ansible_module, capfd, XenAPI, xenserver_guest_powerstate):
"""
Tests regular module invocation including parsing and propagation of
module params and module output when state is set to present.
"""
fake_vm_facts = {"fake-vm-fact": True}
mocker.patch('ansible_collections.community.general.plugins.modules.cloud.xenserver.xenserver_guest_powerstate.get_object_ref',
return_value=fake_xenapi_ref('VM'))
mocker.patch('ansible_collections.community.general.plugins.modules.cloud.xenserver.xenserver_guest_powerstate.gather_vm_params', return_value={})
mocker.patch('ansible_collections.community.general.plugins.modules.cloud.xenserver.xenserver_guest_powerstate.gather_vm_facts',
return_value=fake_vm_facts)
mocked_set_vm_power_state = mocker.patch(
'ansible_collections.community.general.plugins.modules.cloud.xenserver.xenserver_guest_powerstate.set_vm_power_state',
return_value=(True, "somenewstate"))
mocked_wait_for_vm_ip_address = mocker.patch(
'ansible_collections.community.general.plugins.modules.cloud.xenserver.xenserver_guest_powerstate.wait_for_vm_ip_address',
return_value={})
mocked_xenapi = mocker.patch.object(XenAPI.Session, 'xenapi', create=True)
mocked_returns = {
"pool.get_all.return_value": [fake_xenapi_ref('pool')],
"pool.get_default_SR.return_value": fake_xenapi_ref('SR'),
}
mocked_xenapi.configure_mock(**mocked_returns)
mocker.patch('ansible_collections.community.general.plugins.module_utils.xenserver.get_xenserver_version', return_value=[7, 2, 0])
with pytest.raises(SystemExit):
xenserver_guest_powerstate.main()
out, err = capfd.readouterr()
result = json.loads(out)
mocked_set_vm_power_state.assert_not_called()
mocked_wait_for_vm_ip_address.assert_not_called()
assert result['changed'] is False
assert result['instance'] == fake_vm_facts
@pytest.mark.parametrize('patch_ansible_module',
testcase_module_params_state_other['params'],
ids=testcase_module_params_state_other['ids'],
indirect=True)
def test_xenserver_guest_powerstate_other(mocker, patch_ansible_module, capfd, XenAPI, xenserver_guest_powerstate):
"""
Tests regular module invocation including parsing and propagation of
module params and module output when state is set to other value than
present.
"""
fake_vm_facts = {"fake-vm-fact": True}
mocker.patch('ansible_collections.community.general.plugins.modules.cloud.xenserver.xenserver_guest_powerstate.get_object_ref',
return_value=fake_xenapi_ref('VM'))
mocker.patch('ansible_collections.community.general.plugins.modules.cloud.xenserver.xenserver_guest_powerstate.gather_vm_params', return_value={})
mocker.patch('ansible_collections.community.general.plugins.modules.cloud.xenserver.xenserver_guest_powerstate.gather_vm_facts', return_value=fake_vm_facts)
mocked_set_vm_power_state = mocker.patch(
'ansible_collections.community.general.plugins.modules.cloud.xenserver.xenserver_guest_powerstate.set_vm_power_state',
return_value=(True, "somenewstate"))
mocked_wait_for_vm_ip_address = mocker.patch(
'ansible_collections.community.general.plugins.modules.cloud.xenserver.xenserver_guest_powerstate.wait_for_vm_ip_address',
return_value={})
mocked_xenapi = mocker.patch.object(XenAPI.Session, 'xenapi', create=True)
mocked_returns = {
"pool.get_all.return_value": [fake_xenapi_ref('pool')],
"pool.get_default_SR.return_value": fake_xenapi_ref('SR'),
}
mocked_xenapi.configure_mock(**mocked_returns)
mocker.patch('ansible_collections.community.general.plugins.module_utils.xenserver.get_xenserver_version', return_value=[7, 2, 0])
with pytest.raises(SystemExit):
xenserver_guest_powerstate.main()
out, err = capfd.readouterr()
result = json.loads(out)
mocked_set_vm_power_state.assert_called_once()
mocked_wait_for_vm_ip_address.assert_not_called()
assert result['changed'] is True
assert result['instance'] == fake_vm_facts
@pytest.mark.parametrize('patch_ansible_module',
testcase_module_params_wait['params'],
ids=testcase_module_params_wait['ids'],
indirect=True)
def test_xenserver_guest_powerstate_wait(mocker, patch_ansible_module, capfd, XenAPI, xenserver_guest_powerstate):
"""
Tests regular module invocation including parsing and propagation of
module params and module output when wait_for_ip_address option is used.
"""
fake_vm_facts = {"fake-vm-fact": True}
mocker.patch('ansible_collections.community.general.plugins.modules.cloud.xenserver.xenserver_guest_powerstate.get_object_ref',
return_value=fake_xenapi_ref('VM'))
mocker.patch('ansible_collections.community.general.plugins.modules.cloud.xenserver.xenserver_guest_powerstate.gather_vm_params', return_value={})
mocker.patch('ansible_collections.community.general.plugins.modules.cloud.xenserver.xenserver_guest_powerstate.gather_vm_facts', return_value=fake_vm_facts)
mocked_set_vm_power_state = mocker.patch(
'ansible_collections.community.general.plugins.modules.cloud.xenserver.xenserver_guest_powerstate.set_vm_power_state',
return_value=(True, "somenewstate"))
mocked_wait_for_vm_ip_address = mocker.patch(
'ansible_collections.community.general.plugins.modules.cloud.xenserver.xenserver_guest_powerstate.wait_for_vm_ip_address',
return_value={})
mocked_xenapi = mocker.patch.object(XenAPI.Session, 'xenapi', create=True)
mocked_returns = {
"pool.get_all.return_value": [fake_xenapi_ref('pool')],
"pool.get_default_SR.return_value": fake_xenapi_ref('SR'),
}
mocked_xenapi.configure_mock(**mocked_returns)
mocker.patch('ansible_collections.community.general.plugins.module_utils.xenserver.get_xenserver_version', return_value=[7, 2, 0])
with pytest.raises(SystemExit):
xenserver_guest_powerstate.main()
out, err = capfd.readouterr()
result = json.loads(out)
mocked_wait_for_vm_ip_address.assert_called_once()
assert result['instance'] == fake_vm_facts