New module: Add Pritunl VPN user module (net_tools/pritunl/) (#803)

This commit is contained in:
Florian Dambrine 2021-03-21 03:30:16 -07:00 committed by GitHub
parent 81f3ad45c9
commit 68fc48cd1f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 1768 additions and 0 deletions

View file

@ -0,0 +1,541 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2021, Florian Dambrine <android.florian@gmail.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
import json
import pytest
from ansible.module_utils.common.dict_transformations import dict_merge
from ansible.module_utils.six import iteritems
from ansible_collections.community.general.plugins.module_utils.net_tools.pritunl import api
from mock import MagicMock
__metaclass__ = type
# Pritunl Mocks
class PritunlListOrganizationMock(MagicMock):
"""Pritunl API Mock for organization GET API calls."""
def getcode(self):
return 200
def read(self):
return json.dumps(
[
{
"auth_api": False,
"name": "Foo",
"auth_token": None,
"user_count": 0,
"auth_secret": None,
"id": "csftwlu6uhralzi2dpmhekz3",
},
{
"auth_api": False,
"name": "GumGum",
"auth_token": None,
"user_count": 3,
"auth_secret": None,
"id": "58070daee63f3b2e6e472c36",
},
{
"auth_api": False,
"name": "Bar",
"auth_token": None,
"user_count": 0,
"auth_secret": None,
"id": "v1sncsxxybnsylc8gpqg85pg",
},
]
)
class PritunlListUserMock(MagicMock):
"""Pritunl API Mock for user GET API calls."""
def getcode(self):
return 200
def read(self):
return json.dumps(
[
{
"auth_type": "google",
"dns_servers": None,
"pin": True,
"dns_suffix": None,
"servers": [
{
"status": False,
"platform": None,
"server_id": "580711322bb66c1d59b9568f",
"virt_address6": "fd00:c0a8: 9700: 0: 192: 168: 101: 27",
"virt_address": "192.168.101.27",
"name": "vpn-A",
"real_address": None,
"connected_since": None,
"id": "580711322bb66c1d59b9568f",
"device_name": None,
},
{
"status": False,
"platform": None,
"server_id": "5dad2cc6e63f3b3f4a6dfea5",
"virt_address6": "fd00:c0a8:f200: 0: 192: 168: 201: 37",
"virt_address": "192.168.201.37",
"name": "vpn-B",
"real_address": None,
"connected_since": None,
"id": "5dad2cc6e63f3b3f4a6dfea5",
"device_name": None,
},
],
"disabled": False,
"network_links": [],
"port_forwarding": [],
"id": "58070dafe63f3b2e6e472c3b",
"organization_name": "GumGum",
"type": "server",
"email": "bot@company.com",
"status": True,
"dns_mapping": None,
"otp_secret": "123456789ABCDEFG",
"client_to_client": False,
"sso": "google",
"bypass_secondary": False,
"groups": ["admin", "multiregion"],
"audit": False,
"name": "bot",
"gravatar": True,
"otp_auth": True,
"organization": "58070daee63f3b2e6e472c36",
},
{
"auth_type": "google",
"dns_servers": None,
"pin": True,
"dns_suffix": None,
"servers": [
{
"status": False,
"platform": None,
"server_id": "580711322bb66c1d59b9568f",
"virt_address6": "fd00:c0a8: 9700: 0: 192: 168: 101: 27",
"virt_address": "192.168.101.27",
"name": "vpn-A",
"real_address": None,
"connected_since": None,
"id": "580711322bb66c1d59b9568f",
"device_name": None,
},
{
"status": False,
"platform": None,
"server_id": "5dad2cc6e63f3b3f4a6dfea5",
"virt_address6": "fd00:c0a8:f200: 0: 192: 168: 201: 37",
"virt_address": "192.168.201.37",
"name": "vpn-B",
"real_address": None,
"connected_since": None,
"id": "5dad2cc6e63f3b3f4a6dfea5",
"device_name": None,
},
],
"disabled": False,
"network_links": [],
"port_forwarding": [],
"id": "58070dafe63f3b2e6e472c3b",
"organization_name": "GumGum",
"type": "client",
"email": "florian@company.com",
"status": True,
"dns_mapping": None,
"otp_secret": "123456789ABCDEFG",
"client_to_client": False,
"sso": "google",
"bypass_secondary": False,
"groups": ["web", "database"],
"audit": False,
"name": "florian",
"gravatar": True,
"otp_auth": True,
"organization": "58070daee63f3b2e6e472c36",
},
{
"auth_type": "google",
"dns_servers": None,
"pin": True,
"dns_suffix": None,
"servers": [
{
"status": False,
"platform": None,
"server_id": "580711322bb66c1d59b9568f",
"virt_address6": "fd00:c0a8: 9700: 0: 192: 168: 101: 27",
"virt_address": "192.168.101.27",
"name": "vpn-A",
"real_address": None,
"connected_since": None,
"id": "580711322bb66c1d59b9568f",
"device_name": None,
},
{
"status": False,
"platform": None,
"server_id": "5dad2cc6e63f3b3f4a6dfea5",
"virt_address6": "fd00:c0a8:f200: 0: 192: 168: 201: 37",
"virt_address": "192.168.201.37",
"name": "vpn-B",
"real_address": None,
"connected_since": None,
"id": "5dad2cc6e63f3b3f4a6dfea5",
"device_name": None,
},
],
"disabled": False,
"network_links": [],
"port_forwarding": [],
"id": "58070dafe63f3b2e6e472c3b",
"organization_name": "GumGum",
"type": "server",
"email": "ops@company.com",
"status": True,
"dns_mapping": None,
"otp_secret": "123456789ABCDEFG",
"client_to_client": False,
"sso": "google",
"bypass_secondary": False,
"groups": ["web", "database"],
"audit": False,
"name": "ops",
"gravatar": True,
"otp_auth": True,
"organization": "58070daee63f3b2e6e472c36",
},
]
)
class PritunlErrorMock(MagicMock):
"""Pritunl API Mock for API call failures."""
def getcode(self):
return 500
def read(self):
return "{}"
class PritunlPostUserMock(MagicMock):
"""Pritunl API Mock for POST API calls."""
def getcode(self):
return 200
def read(self):
return json.dumps(
[
{
"auth_type": "local",
"disabled": False,
"dns_servers": None,
"otp_secret": "6M4UWP2BCJBSYZAT",
"name": "alice",
"pin": False,
"dns_suffix": None,
"client_to_client": False,
"email": "alice@company.com",
"organization_name": "GumGum",
"bypass_secondary": False,
"groups": ["a", "b"],
"organization": "58070daee63f3b2e6e472c36",
"port_forwarding": [],
"type": "client",
"id": "590add71e63f3b72d8bb951a",
}
]
)
class PritunlPutUserMock(MagicMock):
"""Pritunl API Mock for PUT API calls."""
def getcode(self):
return 200
def read(self):
return json.dumps(
{
"auth_type": "local",
"disabled": True,
"dns_servers": None,
"otp_secret": "WEJANJYMF3Q2QSLG",
"name": "bob",
"pin": False,
"dns_suffix": False,
"client_to_client": False,
"email": "bob@company.com",
"organization_name": "GumGum",
"bypass_secondary": False,
"groups": ["c", "d"],
"organization": "58070daee63f3b2e6e472c36",
"port_forwarding": [],
"type": "client",
"id": "590add71e63f3b72d8bb951a",
}
)
class PritunlDeleteUserMock(MagicMock):
"""Pritunl API Mock for DELETE API calls."""
def getcode(self):
return 200
def read(self):
return "{}"
# Ansible Module Mock and Pytest mock fixtures
class ModuleFailException(Exception):
def __init__(self, msg, **kwargs):
super(ModuleFailException, self).__init__(msg)
self.fail_msg = msg
self.fail_kwargs = kwargs
@pytest.fixture
def pritunl_settings():
return {
"api_token": "token",
"api_secret": "secret",
"base_url": "https://pritunl.domain.com",
"validate_certs": True,
}
@pytest.fixture
def pritunl_user_data():
return {
"name": "alice",
"email": "alice@company.com",
"groups": ["a", "b"],
"disabled": False,
"type": "client",
}
@pytest.fixture
def get_pritunl_organization_mock():
return PritunlListOrganizationMock()
@pytest.fixture
def get_pritunl_user_mock():
return PritunlListUserMock()
@pytest.fixture
def get_pritunl_error_mock():
return PritunlErrorMock()
@pytest.fixture
def post_pritunl_user_mock():
return PritunlPostUserMock()
@pytest.fixture
def put_pritunl_user_mock():
return PritunlPutUserMock()
@pytest.fixture
def delete_pritunl_user_mock():
return PritunlDeleteUserMock()
class TestPritunlApi:
"""
Test class to validate CRUD operations on Pritunl.
"""
# Test for GET / list operation on Pritunl API
@pytest.mark.parametrize(
"org_id,org_user_count",
[
("58070daee63f3b2e6e472c36", 3),
("v1sncsxxybnsylc8gpqg85pg", 0),
],
)
def test_list_all_pritunl_organization(
self,
pritunl_settings,
get_pritunl_organization_mock,
org_id,
org_user_count,
):
api._get_pritunl_organizations = get_pritunl_organization_mock()
response = api.list_pritunl_organizations(**pritunl_settings)
assert len(response) == 3
for org in response:
if org["id"] == org_id:
org["user_count"] == org_user_count
@pytest.mark.parametrize(
"org_filters,org_expected",
[
({"id": "58070daee63f3b2e6e472c36"}, "GumGum"),
({"name": "GumGum"}, "GumGum"),
],
)
def test_list_filtered_pritunl_organization(
self,
pritunl_settings,
get_pritunl_organization_mock,
org_filters,
org_expected,
):
api._get_pritunl_organizations = get_pritunl_organization_mock()
response = api.list_pritunl_organizations(
**dict_merge(pritunl_settings, {"filters": org_filters})
)
assert len(response) == 1
assert response[0]["name"] == org_expected
@pytest.mark.parametrize(
"org_id,org_user_count",
[("58070daee63f3b2e6e472c36", 3)],
)
def test_list_all_pritunl_user(
self, pritunl_settings, get_pritunl_user_mock, org_id, org_user_count
):
api._get_pritunl_users = get_pritunl_user_mock()
response = api.list_pritunl_users(
**dict_merge(pritunl_settings, {"organization_id": org_id})
)
assert len(response) == org_user_count
@pytest.mark.parametrize(
"org_id,user_filters,user_expected",
[
("58070daee63f3b2e6e472c36", {"email": "bot@company.com"}, "bot"),
("58070daee63f3b2e6e472c36", {"name": "florian"}, "florian"),
],
)
def test_list_filtered_pritunl_user(
self,
pritunl_settings,
get_pritunl_user_mock,
org_id,
user_filters,
user_expected,
):
api._get_pritunl_users = get_pritunl_user_mock()
response = api.list_pritunl_users(
**dict_merge(
pritunl_settings, {"organization_id": org_id, "filters": user_filters}
)
)
assert len(response) > 0
for user in response:
assert user["organization"] == org_id
assert user["name"] == user_expected
# Test for POST operation on Pritunl API
@pytest.mark.parametrize("org_id", [("58070daee63f3b2e6e472c36")])
def test_add_and_update_pritunl_user(
self,
pritunl_settings,
pritunl_user_data,
post_pritunl_user_mock,
put_pritunl_user_mock,
org_id,
):
api._post_pritunl_user = post_pritunl_user_mock()
api._put_pritunl_user = put_pritunl_user_mock()
create_response = api.post_pritunl_user(
**dict_merge(
pritunl_settings,
{
"organization_id": org_id,
"user_data": pritunl_user_data,
},
)
)
# Ensure provided settings match with the ones returned by Pritunl
for k, v in iteritems(pritunl_user_data):
assert create_response[k] == v
# Update the newly created user to ensure only certain settings are changed
user_updates = {
"name": "bob",
"email": "bob@company.com",
"disabled": True,
}
update_response = api.post_pritunl_user(
**dict_merge(
pritunl_settings,
{
"organization_id": org_id,
"user_id": create_response["id"],
"user_data": dict_merge(pritunl_user_data, user_updates),
},
)
)
# Ensure only certain settings changed and the rest remained untouched.
for k, v in iteritems(update_response):
if k in update_response:
assert update_response[k] == v
else:
assert update_response[k] == create_response[k]
# Test for DELETE operation on Pritunl API
@pytest.mark.parametrize(
"org_id,user_id", [("58070daee63f3b2e6e472c36", "590add71e63f3b72d8bb951a")]
)
def test_delete_pritunl_user(
self, pritunl_settings, org_id, user_id, delete_pritunl_user_mock
):
api._delete_pritunl_user = delete_pritunl_user_mock()
response = api.delete_pritunl_user(
**dict_merge(
pritunl_settings,
{
"organization_id": org_id,
"user_id": user_id,
},
)
)
assert response == {}
# Test API call errors
def test_pritunl_error(self, pritunl_settings, get_pritunl_error_mock):
api.pritunl_auth_request = get_pritunl_error_mock()
with pytest.raises(api.PritunlException):
response = api.list_pritunl_organizations(**pritunl_settings)

View file

@ -0,0 +1,208 @@
# -*- coding: utf-8 -*-
# (c) 2021 Florian Dambrine <android.florian@gmail.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
import sys
from ansible.module_utils.common.dict_transformations import dict_merge
from ansible.module_utils.six import iteritems
from ansible_collections.community.general.plugins.modules.net_tools.pritunl import (
pritunl_user,
)
from ansible_collections.community.general.tests.unit.compat.mock import patch
from ansible_collections.community.general.tests.unit.plugins.module_utils.net_tools.pritunl.test_api import (
PritunlDeleteUserMock,
PritunlListOrganizationMock,
PritunlListUserMock,
PritunlPostUserMock,
PritunlPutUserMock,
)
from ansible_collections.community.general.tests.unit.plugins.modules.utils import (
AnsibleExitJson,
AnsibleFailJson,
ModuleTestCase,
set_module_args,
)
__metaclass__ = type
def mock_pritunl_api(func, **kwargs):
def wrapped(self=None):
with self.patch_get_pritunl_organizations(
side_effect=PritunlListOrganizationMock
):
with self.patch_get_pritunl_users(side_effect=PritunlListUserMock):
with self.patch_add_pritunl_users(side_effect=PritunlPostUserMock):
with self.patch_delete_pritunl_users(
side_effect=PritunlDeleteUserMock
):
func(self, **kwargs)
return wrapped
class TestPritunlUser(ModuleTestCase):
def setUp(self):
super(TestPritunlUser, self).setUp()
self.module = pritunl_user
# Add backward compatibility
if sys.version_info < (3, 2):
self.assertRegex = self.assertRegexpMatches
def tearDown(self):
super(TestPritunlUser, self).tearDown()
def patch_get_pritunl_users(self, **kwds):
return patch(
"ansible_collections.community.general.plugins.module_utils.net_tools.pritunl.api._get_pritunl_users",
autospec=True,
**kwds
)
def patch_add_pritunl_users(self, **kwds):
return patch(
"ansible_collections.community.general.plugins.module_utils.net_tools.pritunl.api._post_pritunl_user",
autospec=True,
**kwds
)
def patch_update_pritunl_users(self, **kwds):
return patch(
"ansible_collections.community.general.plugins.module_utils.net_tools.pritunl.api._put_pritunl_user",
autospec=True,
**kwds
)
def patch_delete_pritunl_users(self, **kwds):
return patch(
"ansible_collections.community.general.plugins.module_utils.net_tools.pritunl.api._delete_pritunl_user",
autospec=True,
**kwds
)
def patch_get_pritunl_organizations(self, **kwds):
return patch(
"ansible_collections.community.general.plugins.module_utils.net_tools.pritunl.api._get_pritunl_organizations",
autospec=True,
**kwds
)
def test_without_parameters(self):
"""Test without parameters"""
set_module_args({})
with self.assertRaises(AnsibleFailJson):
self.module.main()
@mock_pritunl_api
def test_present(self):
"""Test Pritunl user creation and update."""
user_params = {
"user_name": "alice",
"user_email": "alice@company.com",
}
set_module_args(
dict_merge(
{
"pritunl_api_token": "token",
"pritunl_api_secret": "secret",
"pritunl_url": "https://pritunl.domain.com",
"organization": "GumGum",
},
user_params,
)
)
with self.patch_update_pritunl_users(
side_effect=PritunlPostUserMock
) as post_mock:
with self.assertRaises(AnsibleExitJson) as create_result:
self.module.main()
create_exc = create_result.exception.args[0]
self.assertTrue(create_exc["changed"])
self.assertEqual(create_exc["response"]["name"], user_params["user_name"])
self.assertEqual(create_exc["response"]["email"], user_params["user_email"])
self.assertFalse(create_exc["response"]["disabled"])
# Changing user from alice to bob should update certain fields only
new_user_params = {
"user_name": "bob",
"user_email": "bob@company.com",
"user_disabled": True,
}
set_module_args(
dict_merge(
{
"pritunl_api_token": "token",
"pritunl_api_secret": "secret",
"pritunl_url": "https://pritunl.domain.com",
"organization": "GumGum",
},
new_user_params,
)
)
with self.patch_update_pritunl_users(
side_effect=PritunlPutUserMock
) as put_mock:
with self.assertRaises(AnsibleExitJson) as update_result:
self.module.main()
update_exc = update_result.exception.args[0]
# Ensure only certain settings changed and the rest remained untouched.
for k, v in iteritems(update_exc):
if k in new_user_params:
assert update_exc[k] == v
else:
assert update_exc[k] == create_exc[k]
@mock_pritunl_api
def test_absent(self):
"""Test user removal from Pritunl."""
set_module_args(
{
"state": "absent",
"pritunl_api_token": "token",
"pritunl_api_secret": "secret",
"pritunl_url": "https://pritunl.domain.com",
"organization": "GumGum",
"user_name": "florian",
}
)
with self.assertRaises(AnsibleExitJson) as result:
self.module.main()
exc = result.exception.args[0]
self.assertTrue(exc["changed"])
self.assertEqual(exc["response"], {})
@mock_pritunl_api
def test_absent_failure(self):
"""Test user removal from a non existing organization."""
set_module_args(
{
"state": "absent",
"pritunl_api_token": "token",
"pritunl_api_secret": "secret",
"pritunl_url": "https://pritunl.domain.com",
"organization": "Unknown",
"user_name": "floria@company.com",
}
)
with self.assertRaises(AnsibleFailJson) as result:
self.module.main()
exc = result.exception.args[0]
self.assertRegex(exc["msg"], "Can not remove user")

View file

@ -0,0 +1,160 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2021, Florian Dambrine <android.florian@gmail.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
import sys
from ansible_collections.community.general.plugins.modules.net_tools.pritunl import (
pritunl_user_info,
)
from ansible_collections.community.general.tests.unit.compat.mock import patch
from ansible_collections.community.general.tests.unit.plugins.module_utils.net_tools.pritunl.test_api import (
PritunlListOrganizationMock,
PritunlListUserMock,
)
from ansible_collections.community.general.tests.unit.plugins.modules.utils import (
AnsibleExitJson,
AnsibleFailJson,
ModuleTestCase,
set_module_args,
)
__metaclass__ = type
class TestPritunlUserInfo(ModuleTestCase):
def setUp(self):
super(TestPritunlUserInfo, self).setUp()
self.module = pritunl_user_info
# Add backward compatibility
if sys.version_info < (3, 2):
self.assertRegex = self.assertRegexpMatches
def tearDown(self):
super(TestPritunlUserInfo, self).tearDown()
def patch_get_pritunl_users(self, **kwds):
return patch(
"ansible_collections.community.general.plugins.module_utils.net_tools.pritunl.api._get_pritunl_users",
autospec=True,
**kwds
)
def patch_get_pritunl_organizations(self, **kwds):
return patch(
"ansible_collections.community.general.plugins.module_utils.net_tools.pritunl.api._get_pritunl_organizations",
autospec=True,
**kwds
)
def test_without_parameters(self):
"""Test without parameters"""
with self.patch_get_pritunl_organizations(
side_effect=PritunlListOrganizationMock
) as org_mock:
with self.patch_get_pritunl_users(
side_effect=PritunlListUserMock
) as user_mock:
set_module_args({})
with self.assertRaises(AnsibleFailJson):
self.module.main()
self.assertEqual(org_mock.call_count, 0)
self.assertEqual(user_mock.call_count, 0)
def test_missing_organization(self):
"""Failure must occur when the requested organization is not found."""
with self.patch_get_pritunl_organizations(
side_effect=PritunlListOrganizationMock
) as org_mock:
with self.patch_get_pritunl_users(
side_effect=PritunlListUserMock
) as user_mock:
with self.assertRaises(AnsibleFailJson) as result:
set_module_args(
{
"pritunl_api_token": "token",
"pritunl_api_secret": "secret",
"pritunl_url": "https://pritunl.domain.com",
"organization": "Unknown",
}
)
self.module.main()
self.assertEqual(org_mock.call_count, 1)
self.assertEqual(user_mock.call_count, 0)
exc = result.exception.args[0]
self.assertRegex(exc["msg"], "Can not list users from the organization")
def test_get_all_client_users_from_organization(self):
"""
The list of all Pritunl client users from the organization must be returned when no user specified.
"""
expected_user_type = "client"
with self.patch_get_pritunl_organizations(
side_effect=PritunlListOrganizationMock
) as org_mock:
with self.patch_get_pritunl_users(
side_effect=PritunlListUserMock
) as user_mock:
with self.assertRaises(AnsibleExitJson) as result:
set_module_args(
{
"pritunl_api_token": "token",
"pritunl_api_secret": "secret",
"pritunl_url": "https://pritunl.domain.com",
"organization": "GumGum",
}
)
self.module.main()
self.assertEqual(org_mock.call_count, 1)
self.assertEqual(user_mock.call_count, 1)
exc = result.exception.args[0]
# module should not report changes
self.assertFalse(exc["changed"])
# user_type when not provided is set client and should only return client user type
self.assertEqual(len(exc["users"]), 1)
for user in exc["users"]:
self.assertEqual(user["type"], expected_user_type)
def test_get_specific_server_user_from_organization(self):
"""
Retrieving a specific user from the organization must return a single record.
"""
expected_user_type = "server"
expected_user_name = "ops"
with self.patch_get_pritunl_organizations(
side_effect=PritunlListOrganizationMock
) as org_mock:
with self.patch_get_pritunl_users(
side_effect=PritunlListUserMock
) as user_mock:
with self.assertRaises(AnsibleExitJson) as result:
set_module_args(
{
"pritunl_api_token": "token",
"pritunl_api_secret": "secret",
"pritunl_url": "https://pritunl.domain.com",
"organization": "GumGum",
"user_name": expected_user_name,
"user_type": expected_user_type,
}
)
self.module.main()
self.assertEqual(org_mock.call_count, 1)
self.assertEqual(user_mock.call_count, 1)
exc = result.exception.args[0]
# module should not report changes
self.assertFalse(exc["changed"])
self.assertEqual(len(exc["users"]), 1)
for user in exc["users"]:
self.assertEqual(user["type"], expected_user_type)
self.assertEqual(user["name"], expected_user_name)