Add gitlab_merge_request_rule.py

This commit is contained in:
mil1i 2025-04-28 10:46:43 -06:00
commit 09fc96fcf5
No known key found for this signature in database

View file

@ -0,0 +1,454 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Based on code:
# Copyright (c) 2023, Ondrej Zvara (ozvara1@gmail.com)
# Copyright (c) 2021, Lennert Mertens (lennert@nubera.be)
# Copyright (c) 2021, Werner Dijkerman (ikben@werner-dijkerman.nl)
# Copyright (c) 2015, Werner Dijkerman (ikben@werner-dijkerman.nl)
# Copyright (c) 2019, Guillaume Martinez (lunik@tiwabbit.fr)
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
DOCUMENTATION = r"""
module: gitlab_merge_request_rule
short_description: Manage Gitlab Project merge request rules
version_added: 3.4.0
description:
- Managing Gitlab Project merge request rules (requires Enterprise/Ultimate Edition).
author:
- mil1i
requirements:
- python-gitlab >= 2.3.0
extends_documentation_fragment:
- community.general.auth_basic
- community.general.gitlab
- community.general.attributes
attributes:
check_mode:
support: full
diff_mode:
support: none
options:
state:
description:
- Create or delete protected branch.
default: present
type: str
choices: ["present", "absent"]
project_group:
description:
- The name/path of the group the project resides in.
required: true
type: str
project_name:
description:
- The name of the the project/repository.
required: true
type: str
rule_name:
description:
- The name of the custom merge request rule to manage.
type: str
approvals_required:
description:
- The number of approvals required before the rule passes to allow a merge.
type: int
usernames:
description:
- List of usernames of who can approve merge requests for this rule.
type: list
user_ids:
description:
- List of user_ids of who can approve merge requests for this rule.
type: list
groups:
description:
- List of group names (full path) or group ids of members who can approve merge requests for this rule.
type: list
protected_branches:
description:
- List of protected branch names or branch ids this rule applies to.
- Defaults to ALL branches if none specified.
type: str
applies_to_all_protected_branches:
description:
- Flag to enable applying this rule to ALL protected branches.
- Ignores "protected_branches" if this flag is set to True.
default: False
type: bool
"""
EXAMPLES = r"""
- name: Create merge request rule 'first-rule' on main and develop
community.general.gitlab_merge_request_rule:
api_url: "https://{{ gitlab_url }}/"
api_token: "{{ api_token }}"
validate_certs: false
state: present
project_group: 'group/subgroup'
project_name: 'repo-name'
rule_name: 'first-rule'
approvals_required: 3
applies_to_all_protected_branches: false
protected_branch: main
usernames:
- someonesusername
user_ids:
- 290
groups:
- Teams/subgroup/subsubgroup
- 743
"""
RETURN = r"""
"""
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.api import basic_auth_argument_spec
from ansible_collections.community.general.plugins.module_utils.version import LooseVersion
from ansible_collections.community.general.plugins.module_utils.gitlab import (
auth_argument_spec, gitlab_authentication, gitlab
)
from itertools import chain
class GitlabMergeRequestRule(object):
def __init__(self, module, gitlab_instance, group, project, rule_name, rule_approvals_required=0,
rule_groups=None, rule_user_ids=None, rule_usernames=None, rule_branches=None,
rule_all_protected_branches=False):
self._module = module
self.gitlab = gitlab_instance
self.group = self.get_project_group(group)
self.project = self.get_project(group, project)
self.protected_branches = self.project.protectedbranches.list(all=True)
self.rules = self.list_approval_rules()
self.rule_name: str = rule_name
self.rule_groups: int | list = self.build_groups(self._string_to_set(rule_groups))
self.rule_usernames: str = self._string_to_set(rule_usernames) if rule_usernames is not None else []
self.rule_user_ids: str | int | list = self._string_to_set(rule_user_ids) if rule_user_ids is not None else []
self.rule_users: str | int | list = self.build_users()
self.rule_apporvals_required: str = rule_approvals_required
self.rule_branches: str | int | list = self._gather_ids_for_protected_branches(rule_branches) if rule_branches is not None else []
self.rule_all_protected_branches: bool = rule_all_protected_branches
self.rule_exists, self.existing_rule = self.check_rule_exists(rule_name)
if self.rule_exists:
self._updates = False
self.rule_updated: bool = self.check_for_updates_to_existing_rule()
def _gather_ids_for_protected_branches(self, wanted_branches):
final_branches = set()
string_branches = self._string_to_set(wanted_branches)
for b in string_branches:
_id = b
if not b.isdigit():
try:
_id = self._check_protected_branch_exists(b)
except gitlab.GitlabGetError as gerr:
print(f'[{b}] {gerr.error_message}')
final_branches.add(_id)
return final_branches
def _check_protected_branch_exists(self, protected_branch):
try:
return next(pb.id for pb in self.protected_branches if pb.name == protected_branch)
except StopIteration:
return None
def _string_to_set(self, string):
try:
return set(string.strip().split(',')) if string is not None else None
except AttributeError:
if isinstance(string, int):
return set([string]) if string is not None else None
elif isinstance(string, list):
return set(string) if string is not None else None
return None
def _set_to_string(self, selfset):
return ','.join(str(s) for s in selfset) if selfset is not None else None
def build_groups(self, wanted_groups):
final_groups = set()
for grp in wanted_groups:
try:
final_groups.add(GitlabGroup(self._module, self.gitlab, grp).id)
except gitlab.GitlabError as err:
self._module.fail_json(msg=f'Failed to determine specified group ({grp})', err=err)
return final_groups
def build_users(self):
final_users = set()
if self.rule_user_ids is not None:
if self.rule_usernames is not None:
combined_users = set(chain(self.rule_user_ids, self.rule_usernames))
else:
combined_users = self.rule_user_ids
elif self.rule_usernames is not None:
combined_users = self.rule_usernames
else:
return None
for usr in combined_users:
try:
found_user = GitlabUser(self._module, self.gitlab, usr)
final_users.add(found_user.id)
except gitlab.GitlabError as err:
self._module.fail_json(msg=f'Failed to determine specified user ({usr})', err=err)
return final_users
def get_project(self, group_name, project_name):
return self.gitlab.projects.get(f'{group_name}/{project_name}')
def get_project_group(self, group_name):
return self.gitlab.groups.get(group_name)
def list_approval_rules(self):
return self.project.approvalrules.list()
def check_rule_exists(self, rule_name):
try:
return True, next(r for r in self.rules if r.name == rule_name)
except StopIteration:
return False, None
def _generated_data(self):
_name = str(self.rule_name)
_approvals_required = int(self.rule_apporvals_required)
_protected_branch_ids = list(self.rule_branches) if self.rule_branches is not None else []
_user_ids = list(self.rule_users) if self.rule_users is not None else []
_group_ids = list(self.rule_groups) if self.rule_groups is not None else []
_applies_to_all_protected_branches = bool(self.rule_all_protected_branches)
return {
"name": _name,
"approvals_required": _approvals_required,
"protected_branch_ids": _protected_branch_ids,
"user_ids": _user_ids,
"group_ids": _group_ids,
"applies_to_all_protected_branches": _applies_to_all_protected_branches
}
def create_approval_rule(self):
if self._module.check_mode:
return True
return self.project.approvalrules.create(
data = self._generated_data()
)
def update_approval_rule(self):
if self._module.check_mode:
return True
self.existing_rule.approvals_required = int(self.rule_apporvals_required)
self.existing_rule.protected_branch_ids = list(self.rule_branches) if self.rule_branches is not None else []
self.existing_rule.user_ids = list(self.rule_users) if self.rule_users is not None else []
self.existing_rule.group_ids = list(self.rule_groups) if self.rule_groups is not None else []
self.existing_rule.applies_to_all_protected_branches = bool(self.rule_all_protected_branches)
return self.existing_rule.save()
def _check_changed_members(self, updated_bool, new_members, previous_members) -> bool:
for m in new_members:
try:
if not any(pm.get('id') == m for pm in previous_members):
updated_bool = True
except AttributeError:
if not any(pm == m['id'] for pm in previous_members):
updated_bool = True
return updated_bool
def check_for_updates_to_existing_rule(self) -> bool:
updated = False
if self.existing_rule.applies_to_all_protected_branches != self.rule_all_protected_branches:
updated = True
if int(self.existing_rule.approvals_required) != int(self.rule_apporvals_required):
updated = True
# Check for changed / removed protected branches
updated = self._check_changed_members(updated, self.rule_branches, self.existing_rule.protected_branches)
updated = self._check_changed_members(updated, self.existing_rule.protected_branches, self.rule_branches)
# Check for changed /removed users
updated = self._check_changed_members(updated, self.rule_users, self.existing_rule.users)
updated = self._check_changed_members(updated, self.existing_rule.users, self.rule_users)
# Check for changed / removed groups
updated = self._check_changed_members(updated, self.rule_groups, self.existing_rule.groups)
updated = self._check_changed_members(updated, self.existing_rule.groups, self.rule_groups)
return updated
def delete_approval_rule(self):
if self._module.check_mode:
return True
if self.rule_exists:
return self.existing_rule.delete()
return False
class GitlabUser(object):
def __init__(self, module, gitlab_instance, user):
self._module = module
self.gitlab = gitlab_instance
self.given_user = user
self.id, self.name, self.full = self.get_id()
def get_id(self):
if isinstance(self.given_user, str) and not self.given_user.isdigit():
try:
found_user = self.gitlab.users.list(username=self.given_user)
num_found = len(found_user)
if num_found == 1:
return found_user[0].id, found_user[0].username, found_user[0]
elif num_found > 1:
self._module.fail_json(msg=f'More than one matching user found, unable to determine correct user ({self.given_user})', query_result=found_user)
else:
self._module.fail_json(msg=f'User could not be found ({self.given_user})')
except gitlab.GitlabError as gerr:
self._module.fail_json(msg=f'User could not be found ({self.given_user})', err=gerr.error_message)
else:
if isinstance(self.given_user, int) or self.given_user.isdigit():
try:
found_user = self.gitlab.users.get(self.given_user)
return found_user.id, found_user.username, found_user
except gitlab.GitlabError as gerr:
self._module.fail_json(msg=f'User could not be found ({self.given_user})', err=gerr.error_message)
class GitlabGroup(object):
def __init__(self, module, gitlab_instance, group):
self._module = module
self.gitlab = gitlab_instance
self.given_group = group
self.full = self.get()
self.id = self.full.id
self.name = self.full.name
self.full_path = self.full.full_path
self.full_name = self.full.full_name
self.members = self.full.members.list(get_all=True)
def get(self):
if isinstance(self.given_group, int) or self.given_group.isdigit():
try:
return self.gitlab.groups.get(self.given_group)
except gitlab.GitlabError as gerr:
self._module.fail_json(msg=f'Group could not be found ({self.given_group})', err=gerr.error_message)
else:
try:
found_groups = self.gitlab.groups.list(search=self.given_group)
num_groups = len(found_groups)
if num_groups == 1:
return found_groups[0]
elif num_groups > 1:
self._module.fail_json(msg=f'More than one matching group found, unable to determine correct group ({self.given_group})', query_result=found_groups)
else:
self._module.fail_json(msg=f'Group could not be found ({self.given_group})')
except gitlab.GitlabError as gerr:
self._module.fail_json(msg=f'Group could not be found ({self.given_group})', err=gerr.error_message)
def run_module():
module_args = basic_auth_argument_spec()
module_args.update(auth_argument_spec())
module_args.update(
project_group=dict(type='str', required=True),
project_name=dict(type='str', required=True),
rule_name=dict(type='str', required=True),
approvals_required=dict(type='int', required=True),
usernames=dict(type='list', default=[], required=False),
user_ids=dict(type='list', default=[], required=False),
groups=dict(type='list', default=[], required=False),
protected_branch=dict(type='str', default=None, required=False),
applies_to_all_protected_branches=dict(type='bool', default=False, required=False),
state=dict(type='str', default="present", choices=["absent", "present"]),
)
module = AnsibleModule(
argument_spec=module_args,
mutually_exclusive=[
['api_username', 'api_token'],
['api_username', 'api_oauth_token'],
['api_username', 'api_job_token'],
['api_token', 'api_oauth_token'],
['api_token', 'api_job_token'],
],
required_together=[
['api_username', 'api_password'],
],
required_one_of=[
['api_username', 'api_token', 'api_oauth_token', 'api_job_token']
],
supports_check_mode=True
)
# check prerequisites and connect to gitlab server
gitlab_instance = gitlab_authentication(module)
gitlab_version = gitlab.__version__
if LooseVersion(gitlab_version) < LooseVersion('2.3.0'):
module.fail_json(msg="community.general.gitlab_proteched_branch requires python-gitlab Python module >= 2.3.0 (installed version: [%s])."
" Please upgrade python-gitlab to version 2.3.0 or above." % gitlab_version)
gmmr = GitlabMergeRequestRule(
module = module,
gitlab_instance = gitlab_instance,
group = module.params['project_group'],
project = module.params['project_name'],
rule_name = module.params['rule_name'],
rule_approvals_required = module.params['approvals_required'],
rule_usernames = module.params['usernames'],
rule_user_ids = module.params['user_ids'],
rule_groups = module.params['groups'],
rule_branches = module.params['protected_branch'],
rule_all_protected_branches = module.params['applies_to_all_protected_branches']
)
if not gmmr.rule_exists:
if module.params['state'] == 'absent':
module.exit_json(
changed=False, msg=f'Merge Request Rule ({module.params["rule_name"]}) does not exist in {module.params["project_name"]}, nothing to delete.'
)
elif module.params['state'] == 'present':
gmmr.create_approval_rule()
gmmr.rules = gmmr.list_approval_rules()
_, new_mmr = gmmr.check_rule_exists(gmmr.rule_name)
module.exit_json(
changed=True, msg=f'Merge Request Rule ({module.params["rule_name"]}) successfully created.',
mrr=new_mmr.asdict()
)
elif gmmr.rule_exists:
if module.params['state'] == 'absent':
gmmr.delete_approval_rule()
module.exit_json(
changed=True, msg=f'Merge Request Rule ({module.params["rule_name"]}) was deleted from {module.params["project_name"]}.'
)
elif module.params['state'] == 'present' and gmmr.rule_updated:
gmmr.update_approval_rule()
_, new_mmr = gmmr.check_rule_exists(gmmr.rule_name)
module.exit_json(
changed=True, msg=f'Merge Request Rule ({module.params["rule_name"]}) changes were found, updated rule.',
mrr=new_mmr.asdict()
)
else:
module.exit_json(
changed=False, msg='No changes were detected.',
mrr=gmmr.existing_rule.asdict()
)
def main():
run_module()
if __name__ == '__main__':
main()