Add support for addressing subgroups by paths to the keycloak_user module (issue #9647)

This commit is contained in:
Christian Schlichtherle 2025-03-18 16:13:44 +01:00
parent 4a2cc71141
commit 838280c2b6

View file

@ -2816,23 +2816,13 @@ class KeycloakAPI(object):
:return: Representation of the client groups. :return: Representation of the client groups.
""" """
try: try:
groups = [] user_groups_url = URL_USER_GROUPS.format(url=self.baseurl, realm=realm, id=user_id)
user_groups_url = URL_USER_GROUPS.format( return self._request_and_deserialize(user_groups_url, method='GET')
url=self.baseurl,
realm=realm,
id=user_id)
user_groups = json.load(
self._request(
user_groups_url,
method='GET'))
for user_group in user_groups:
groups.append(user_group["name"])
return groups
except Exception as e: except Exception as e:
self.fail_request(e, msg='Could not get groups for user %s in realm %s: %s' self.fail_request(e, msg='Could not get groups for user %s in realm %s: %s'
% (user_id, realm, str(e))) % (user_id, realm, str(e)))
def add_user_in_group(self, user_id, group_id, realm='master'): def add_user_to_group(self, user_id, group_id, realm='master'):
""" """
Add a user to a group. Add a user to a group.
:param user_id: User ID :param user_id: User ID
@ -2850,7 +2840,7 @@ class KeycloakAPI(object):
user_group_url, user_group_url,
method='PUT') method='PUT')
except Exception as e: except Exception as e:
self.fail_request(e, msg='Could not add user %s in group %s in realm %s: %s' self.fail_request(e, msg='Could not add user %s to group %s in realm %s: %s'
% (user_id, group_id, realm, str(e))) % (user_id, group_id, realm, str(e)))
def remove_user_from_group(self, user_id, group_id, realm='master'): def remove_user_from_group(self, user_id, group_id, realm='master'):
@ -2881,49 +2871,72 @@ class KeycloakAPI(object):
:param realm: Realm :param realm: Realm
:return: True if group membership has been changed. False Otherwise. :return: True if group membership has been changed. False Otherwise.
""" """
changed = False
try: try:
user_existing_groups = self.get_user_groups( groups_to_add, groups_to_remove = self.extract_groups_to_add_to_and_remove_from_user(groups)
user_id=userrep['id'], if not groups_to_add and not groups_to_remove:
realm=realm) return False
groups_to_add_and_remove = self.extract_groups_to_add_to_and_remove_from_user(groups)
# If group membership need to be changed user_groups = self.get_user_groups(user_id=userrep['id'], realm=realm)
if not is_struct_included(groups_to_add_and_remove['add'], user_existing_groups): user_group_names = [user_group['name'] for user_group in user_groups if 'name' in user_group]
# Get available groups in the realm user_group_paths = [user_group['path'] for user_group in user_groups if 'path' in user_group]
realm_groups = self.get_groups(realm=realm)
for realm_group in realm_groups: groups_to_add = [group_to_add for group_to_add in groups_to_add
if "name" in realm_group and realm_group["name"] in groups_to_add_and_remove['add']: if group_to_add not in user_group_names and group_to_add not in user_group_paths]
self.add_user_in_group( groups_to_remove = [group_to_remove for group_to_remove in groups_to_remove
user_id=userrep["id"], if group_to_remove in user_group_names or group_to_remove in user_group_paths]
group_id=realm_group["id"], if not groups_to_add and not groups_to_remove:
realm=realm) return False
changed = True
elif "name" in realm_group and realm_group['name'] in groups_to_add_and_remove['remove']: for group_to_add in groups_to_add:
self.remove_user_from_group( realm_group = self.find_group_by_path(group_to_add, realm=realm)
user_id=userrep['id'], if realm_group:
group_id=realm_group['id'], self.add_user_to_group(user_id=userrep['id'], group_id=realm_group['id'], realm=realm)
realm=realm)
changed = True for group_to_remove in groups_to_remove:
return changed realm_group = self.find_group_by_path(group_to_remove, realm=realm)
if realm_group:
self.remove_user_from_group(user_id=userrep['id'], group_id=realm_group['id'], realm=realm)
return True
except Exception as e: except Exception as e:
self.module.fail_json(msg='Could not update group membership for user %s in realm %s: %s' self.module.fail_json(msg='Could not update group membership for user %s in realm %s: %s'
% (userrep['id]'], realm, str(e))) % (userrep['username'], realm, e))
def extract_groups_to_add_to_and_remove_from_user(self, groups): def extract_groups_to_add_to_and_remove_from_user(self, groups):
groups_extract = {}
groups_to_add = [] groups_to_add = []
groups_to_remove = [] groups_to_remove = []
if isinstance(groups, list) and len(groups) > 0: if isinstance(groups, list):
for group in groups: for group in groups:
group_name = group['name'] if isinstance(group, dict) and 'name' in group else group group_name = group['name'] if isinstance(group, dict) and 'name' in group else group
if isinstance(group, dict) and ('state' not in group or group['state'] == 'present'): if isinstance(group, dict):
groups_to_add.append(group_name) if 'state' not in group or group['state'] == 'present':
else: groups_to_add.append(group_name)
groups_to_remove.append(group_name) else:
groups_extract['add'] = groups_to_add groups_to_remove.append(group_name)
groups_extract['remove'] = groups_to_remove return groups_to_add, groups_to_remove
return groups_extract def find_group_by_path(self, target, realm='master'):
"""
Finds a realm group by path, e.g. '/my/group'.
The path is formed by prepending a '/' character to `target` unless it's already present.
This adds support for finding top level groups by name and subgroups by path.
"""
groups = self.get_groups(realm=realm)
path = target if target.startswith('/') else '/' + target
for segment in path.split('/'):
if not segment:
continue
abort = True
for group in groups:
if group['path'] == path:
return self.get_group_by_groupid(group['id'], realm=realm)
if group['name'] == segment:
groups = self.get_subgroups(group, realm=realm)
abort = False
break
if abort:
break
return None
def convert_user_group_list_of_str_to_list_of_dict(self, groups): def convert_user_group_list_of_str_to_list_of_dict(self, groups):
list_of_groups = [] list_of_groups = []