New module: Add Pritunl VPN user module (net_tools/pritunl/) (#803) (#2071)

(cherry picked from commit 68fc48cd1f)

Co-authored-by: Florian Dambrine <Lowess@users.noreply.github.com>
This commit is contained in:
patchback[bot] 2021-03-21 11:46:33 +01:00 committed by GitHub
parent ca81a5cf2f
commit 9bfd61e117
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 1768 additions and 0 deletions

View file

@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2021, Florian Dambrine <android.florian@gmail.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
class ModuleDocFragment(object):
DOCUMENTATION = r"""
options:
pritunl_url:
type: str
required: true
description:
- URL and port of the Pritunl server on which the API is enabled.
pritunl_api_token:
type: str
required: true
description:
- API Token of a Pritunl admin user.
- It needs to be enabled in Administrators > USERNAME > Enable Token Authentication.
pritunl_api_secret:
type: str
required: true
description:
- API Secret found in Administrators > USERNAME > API Secret.
validate_certs:
type: bool
required: false
default: true
description:
- If certificates should be validated or not.
- This should never be set to C(false), except if you are very sure that
your connection to the server can not be subject to a Man In The Middle
attack.
"""

View file

@ -0,0 +1,300 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2021, Florian Dambrine <android.florian@gmail.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
"""
Pritunl API that offers CRUD operations on Pritunl Organizations and Users
"""
from __future__ import absolute_import, division, print_function
import base64
import hashlib
import hmac
import json
import time
import uuid
from ansible.module_utils.six import iteritems
from ansible.module_utils.urls import open_url
__metaclass__ = type
class PritunlException(Exception):
pass
def pritunl_argument_spec():
return dict(
pritunl_url=dict(required=True, type="str"),
pritunl_api_token=dict(required=True, type="str", no_log=False),
pritunl_api_secret=dict(required=True, type="str", no_log=True),
validate_certs=dict(required=False, type="bool", default=True),
)
def get_pritunl_settings(module):
"""
Helper function to set required Pritunl request params from module arguments.
"""
return {
"api_token": module.params.get("pritunl_api_token"),
"api_secret": module.params.get("pritunl_api_secret"),
"base_url": module.params.get("pritunl_url"),
"validate_certs": module.params.get("validate_certs"),
}
def _get_pritunl_organizations(api_token, api_secret, base_url, validate_certs=True):
return pritunl_auth_request(
base_url=base_url,
api_token=api_token,
api_secret=api_secret,
method="GET",
path="/organization",
validate_certs=validate_certs,
)
def _get_pritunl_users(
api_token, api_secret, base_url, organization_id, validate_certs=True
):
return pritunl_auth_request(
api_token=api_token,
api_secret=api_secret,
base_url=base_url,
method="GET",
path="/user/%s" % organization_id,
validate_certs=validate_certs,
)
def _delete_pritunl_user(
api_token, api_secret, base_url, organization_id, user_id, validate_certs=True
):
return pritunl_auth_request(
api_token=api_token,
api_secret=api_secret,
base_url=base_url,
method="DELETE",
path="/user/%s/%s" % (organization_id, user_id),
validate_certs=validate_certs,
)
def _post_pritunl_user(
api_token, api_secret, base_url, organization_id, user_data, validate_certs=True
):
return pritunl_auth_request(
api_token=api_token,
api_secret=api_secret,
base_url=base_url,
method="POST",
path="/user/%s" % organization_id,
headers={"Content-Type": "application/json"},
data=json.dumps(user_data),
validate_certs=validate_certs,
)
def _put_pritunl_user(
api_token,
api_secret,
base_url,
organization_id,
user_id,
user_data,
validate_certs=True,
):
return pritunl_auth_request(
api_token=api_token,
api_secret=api_secret,
base_url=base_url,
method="PUT",
path="/user/%s/%s" % (organization_id, user_id),
headers={"Content-Type": "application/json"},
data=json.dumps(user_data),
validate_certs=validate_certs,
)
def list_pritunl_organizations(
api_token, api_secret, base_url, validate_certs=True, filters=None
):
orgs = []
response = _get_pritunl_organizations(
api_token=api_token,
api_secret=api_secret,
base_url=base_url,
validate_certs=validate_certs,
)
if response.getcode() != 200:
raise PritunlException("Could not retrieve organizations from Pritunl")
else:
for org in json.loads(response.read()):
# No filtering
if filters is None:
orgs.append(org)
else:
if not any(
filter_val != org[filter_key]
for filter_key, filter_val in iteritems(filters)
):
orgs.append(org)
return orgs
def list_pritunl_users(
api_token, api_secret, base_url, organization_id, validate_certs=True, filters=None
):
users = []
response = _get_pritunl_users(
api_token=api_token,
api_secret=api_secret,
base_url=base_url,
validate_certs=validate_certs,
organization_id=organization_id,
)
if response.getcode() != 200:
raise PritunlException("Could not retrieve users from Pritunl")
else:
for user in json.loads(response.read()):
# No filtering
if filters is None:
users.append(user)
else:
if not any(
filter_val != user[filter_key]
for filter_key, filter_val in iteritems(filters)
):
users.append(user)
return users
def post_pritunl_user(
api_token,
api_secret,
base_url,
organization_id,
user_data,
user_id=None,
validate_certs=True,
):
# If user_id is provided will do PUT otherwise will do POST
if user_id is None:
response = _post_pritunl_user(
api_token=api_token,
api_secret=api_secret,
base_url=base_url,
organization_id=organization_id,
user_data=user_data,
validate_certs=True,
)
if response.getcode() != 200:
raise PritunlException(
"Could not remove user %s from organization %s from Pritunl"
% (user_id, organization_id)
)
# user POST request returns an array of a single item,
# so return this item instead of the list
return json.loads(response.read())[0]
else:
response = _put_pritunl_user(
api_token=api_token,
api_secret=api_secret,
base_url=base_url,
organization_id=organization_id,
user_data=user_data,
user_id=user_id,
validate_certs=True,
)
if response.getcode() != 200:
raise PritunlException(
"Could not update user %s from organization %s from Pritunl"
% (user_id, organization_id)
)
# The user PUT request returns the updated user object
return json.loads(response.read())
def delete_pritunl_user(
api_token, api_secret, base_url, organization_id, user_id, validate_certs=True
):
response = _delete_pritunl_user(
api_token=api_token,
api_secret=api_secret,
base_url=base_url,
organization_id=organization_id,
user_id=user_id,
validate_certs=True,
)
if response.getcode() != 200:
raise PritunlException(
"Could not remove user %s from organization %s from Pritunl"
% (user_id, organization_id)
)
return json.loads(response.read())
def pritunl_auth_request(
api_token,
api_secret,
base_url,
method,
path,
validate_certs=True,
headers=None,
data=None,
):
"""
Send an API call to a Pritunl server.
Taken from https://pritunl.com/api and adaped work with Ansible open_url
"""
auth_timestamp = str(int(time.time()))
auth_nonce = uuid.uuid4().hex
auth_string = "&".join(
[api_token, auth_timestamp, auth_nonce, method.upper(), path]
+ ([data] if data else [])
)
auth_signature = base64.b64encode(
hmac.new(
api_secret.encode("utf-8"), auth_string.encode("utf-8"), hashlib.sha256
).digest()
)
auth_headers = {
"Auth-Token": api_token,
"Auth-Timestamp": auth_timestamp,
"Auth-Nonce": auth_nonce,
"Auth-Signature": auth_signature,
}
if headers:
auth_headers.update(headers)
try:
uri = "%s%s" % (base_url, path)
return open_url(
uri,
method=method.upper(),
headers=auth_headers,
data=data,
validate_certs=validate_certs,
)
except Exception as e:
raise PritunlException(e)

View file

@ -0,0 +1,343 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright: (c) 2021, Florian Dambrine <android.florian@gmail.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = """
---
module: pritunl_user
author: "Florian Dambrine (@Lowess)"
version_added: 2.3.0
short_description: Manage Pritunl Users using the Pritunl API
description:
- A module to manage Pritunl users using the Pritunl API.
extends_documentation_fragment:
- community.general.pritunl
options:
organization:
type: str
required: true
aliases:
- org
description:
- The name of the organization the user is part of.
state:
type: str
default: 'present'
choices:
- present
- absent
description:
- If C(present), the module adds user I(user_name) to
the Pritunl I(organization). If C(absent), removes the user
I(user_name) from the Pritunl I(organization).
user_name:
type: str
required: true
default: null
description:
- Name of the user to create or delete from Pritunl.
user_email:
type: str
required: false
default: null
description:
- Email address associated with the user I(user_name).
user_type:
type: str
required: false
default: client
choices:
- client
- server
description:
- Type of the user I(user_name).
user_groups:
type: list
elements: str
required: false
default: null
description:
- List of groups associated with the user I(user_name).
user_disabled:
type: bool
required: false
default: null
description:
- Enable/Disable the user I(user_name).
user_gravatar:
type: bool
required: false
default: null
description:
- Enable/Disable Gravatar usage for the user I(user_name).
"""
EXAMPLES = """
- name: Create the user Foo with email address foo@bar.com in MyOrg
community.general.pritunl_user:
state: present
name: MyOrg
user_name: Foo
user_email: foo@bar.com
- name: Disable the user Foo but keep it in Pritunl
community.general.pritunl_user:
state: present
name: MyOrg
user_name: Foo
user_email: foo@bar.com
user_disabled: yes
- name: Make sure the user Foo is not part of MyOrg anymore
community.general.pritunl_user:
state: absent
name: MyOrg
user_name: Foo
"""
RETURN = """
response:
description: JSON representation of Pritunl Users.
returned: success
type: dict
sample:
{
"audit": false,
"auth_type": "google",
"bypass_secondary": false,
"client_to_client": false,
"disabled": false,
"dns_mapping": null,
"dns_servers": null,
"dns_suffix": null,
"email": "foo@bar.com",
"gravatar": true,
"groups": [
"foo", "bar"
],
"id": "5d070dafe63q3b2e6s472c3b",
"name": "foo@acme.com",
"network_links": [],
"organization": "58070daee6sf342e6e4s2c36",
"organization_name": "Acme",
"otp_auth": true,
"otp_secret": "35H5EJA3XB2$4CWG",
"pin": false,
"port_forwarding": [],
"servers": [],
}
"""
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils._text import to_native
from ansible.module_utils.common.dict_transformations import dict_merge
from ansible_collections.community.general.plugins.module_utils.net_tools.pritunl.api import (
PritunlException,
delete_pritunl_user,
get_pritunl_settings,
list_pritunl_organizations,
list_pritunl_users,
post_pritunl_user,
pritunl_argument_spec,
)
def add_or_update_pritunl_user(module):
result = {}
org_name = module.params.get("organization")
user_name = module.params.get("user_name")
user_params = {
"name": user_name,
"email": module.params.get("user_email"),
"groups": module.params.get("user_groups"),
"disabled": module.params.get("user_disabled"),
"gravatar": module.params.get("user_gravatar"),
"type": module.params.get("user_type"),
}
org_obj_list = list_pritunl_organizations(
**dict_merge(
get_pritunl_settings(module),
{"filters": {"name": org_name}},
)
)
if len(org_obj_list) == 0:
module.fail_json(
msg="Can not add user to organization '%s' which does not exist" % org_name
)
org_id = org_obj_list[0]["id"]
# Grab existing users from this org
users = list_pritunl_users(
**dict_merge(
get_pritunl_settings(module),
{
"organization_id": org_id,
"filters": {"name": user_name},
},
)
)
# Check if the pritunl user already exists
if len(users) > 0:
# Compare remote user params with local user_params and trigger update if needed
user_params_changed = False
for key in user_params.keys():
# When a param is not specified grab existing ones to prevent from changing it with the PUT request
if user_params[key] is None:
user_params[key] = users[0][key]
# 'groups' is a list comparison
if key == "groups":
if set(users[0][key]) != set(user_params[key]):
user_params_changed = True
# otherwise it is either a boolean or a string
else:
if users[0][key] != user_params[key]:
user_params_changed = True
# Trigger a PUT on the API to update the current user if settings have changed
if user_params_changed:
response = post_pritunl_user(
**dict_merge(
get_pritunl_settings(module),
{
"organization_id": org_id,
"user_id": users[0]["id"],
"user_data": user_params,
},
)
)
result["changed"] = True
result["response"] = response
else:
result["changed"] = False
result["response"] = users
else:
response = post_pritunl_user(
**dict_merge(
get_pritunl_settings(module),
{
"organization_id": org_id,
"user_data": user_params,
},
)
)
result["changed"] = True
result["response"] = response
module.exit_json(**result)
def remove_pritunl_user(module):
result = {}
org_name = module.params.get("organization")
user_name = module.params.get("user_name")
org_obj_list = []
org_obj_list = list_pritunl_organizations(
**dict_merge(
get_pritunl_settings(module),
{
"filters": {"name": org_name},
},
)
)
if len(org_obj_list) == 0:
module.fail_json(
msg="Can not remove user '%s' from a non existing organization '%s'"
% (user_name, org_name)
)
org_id = org_obj_list[0]["id"]
# Grab existing users from this org
users = list_pritunl_users(
**dict_merge(
get_pritunl_settings(module),
{
"organization_id": org_id,
"filters": {"name": user_name},
},
)
)
# Check if the pritunl user exists, if not, do nothing
if len(users) == 0:
result["changed"] = False
result["response"] = {}
# Otherwise remove the org from Pritunl
else:
response = delete_pritunl_user(
**dict_merge(
get_pritunl_settings(module),
{
"organization_id": org_id,
"user_id": users[0]["id"],
},
)
)
result["changed"] = True
result["response"] = response
module.exit_json(**result)
def main():
argument_spec = pritunl_argument_spec()
argument_spec.update(
dict(
organization=dict(required=True, type="str", aliases=["org"]),
state=dict(
required=False, choices=["present", "absent"], default="present"
),
user_name=dict(required=True, type="str"),
user_type=dict(
required=False, choices=["client", "server"], default="client"
),
user_email=dict(required=False, type="str", default=None),
user_groups=dict(required=False, type="list", elements="str", default=None),
user_disabled=dict(required=False, type="bool", default=None),
user_gravatar=dict(required=False, type="bool", default=None),
)
),
module = AnsibleModule(argument_spec=argument_spec)
state = module.params.get("state")
try:
if state == "present":
add_or_update_pritunl_user(module)
elif state == "absent":
remove_pritunl_user(module)
except PritunlException as e:
module.fail_json(msg=to_native(e))
if __name__ == "__main__":
main()

View file

@ -0,0 +1,171 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright: (c) 2021, Florian Dambrine <android.florian@gmail.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = """
---
module: pritunl_user_info
author: "Florian Dambrine (@Lowess)"
version_added: 2.3.0
short_description: List Pritunl Users using the Pritunl API
description:
- A module to list Pritunl users using the Pritunl API.
extends_documentation_fragment:
- community.general.pritunl
options:
organization:
type: str
required: true
aliases:
- org
description:
- The name of the organization the user is part of.
user_name:
type: str
required: false
description:
- Name of the user to filter on Pritunl.
user_type:
type: str
required: false
default: client
choices:
- client
- server
description:
- Type of the user I(user_name).
"""
EXAMPLES = """
- name: List all existing users part of the organization MyOrg
community.general.pritunl_user_info:
state: list
organization: MyOrg
- name: Search for the user named Florian part of the organization MyOrg
community.general.pritunl_user_info:
state: list
organization: MyOrg
user_name: Florian
"""
RETURN = """
users:
description: List of Pritunl users.
returned: success
type: list
elements: dict
sample:
[
{
"audit": false,
"auth_type": "google",
"bypass_secondary": false,
"client_to_client": false,
"disabled": false,
"dns_mapping": null,
"dns_servers": null,
"dns_suffix": null,
"email": "foo@bar.com",
"gravatar": true,
"groups": [
"foo", "bar"
],
"id": "5d070dafe63q3b2e6s472c3b",
"name": "foo@acme.com",
"network_links": [],
"organization": "58070daee6sf342e6e4s2c36",
"organization_name": "Acme",
"otp_auth": true,
"otp_secret": "35H5EJA3XB2$4CWG",
"pin": false,
"port_forwarding": [],
"servers": [],
}
]
"""
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils._text import to_native
from ansible.module_utils.common.dict_transformations import dict_merge
from ansible_collections.community.general.plugins.module_utils.net_tools.pritunl.api import (
PritunlException,
get_pritunl_settings,
list_pritunl_organizations,
list_pritunl_users,
pritunl_argument_spec,
)
def get_pritunl_user(module):
user_name = module.params.get("user_name")
user_type = module.params.get("user_type")
org_name = module.params.get("organization")
org_obj_list = []
org_obj_list = list_pritunl_organizations(
**dict_merge(get_pritunl_settings(module), {"filters": {"name": org_name}})
)
if len(org_obj_list) == 0:
module.fail_json(
msg="Can not list users from the organization '%s' which does not exist"
% org_name
)
org_id = org_obj_list[0]["id"]
users = list_pritunl_users(
**dict_merge(
get_pritunl_settings(module),
{
"organization_id": org_id,
"filters": (
{"type": user_type}
if user_name is None
else {"name": user_name, "type": user_type}
),
},
)
)
result = {}
result["changed"] = False
result["users"] = users
module.exit_json(**result)
def main():
argument_spec = pritunl_argument_spec()
argument_spec.update(
dict(
organization=dict(required=True, type="str", aliases=["org"]),
user_name=dict(required=False, type="str", default=None),
user_type=dict(
required=False,
choices=["client", "server"],
default="client",
),
)
),
module = AnsibleModule(argument_spec=argument_spec, supports_check_mode=True)
try:
get_pritunl_user(module)
except PritunlException as e:
module.fail_json(msg=to_native(e))
if __name__ == "__main__":
main()

View file

@ -0,0 +1 @@
./net_tools/pritunl/pritunl_user.py

View file

@ -0,0 +1 @@
net_tools/pritunl/pritunl_user_info.py