New Module: Keycloak User Rolemapping (#4898)

* keycloak_user_rolemapping: implement user role mapping

* keycloak_user_rolemapping: additional logging

* keycloak_user_rolemapping: move to getters, use names parameters

* keycloak_user_rolemapping: add service account user example

* Add keyring and keyring_info modules (#4764)

* keycloak_user_rolemapping: write tests, address ansibullbot concerns no.1

* keycloak_user_rolemapping: address felixfontein concerns no.1

* keycloak_user_rolemapping: remove rebase mistakes

* keycloak_user_rolemapping: address felixfontein concerns no.2

* keycloak_user_rolemapping: refactor duplicate username usage example

* keycloak_user_rolemapping: fix sanity check errors no.1

* keycloak_user_rolemapping: fix sanity check errors no.2

* keycloak_user_rolemapping: fix sanity check errors no.3

* keycloak_user_rolemapping: fix sanity check errors no.4

* keycloak_user_rolemapping: write tests, address ansibullbot concerns no.1

* keycloak_user_rolemapping: resolve rebase conflicts with origin/main branch

# Conflicts:
#	plugins/module_utils/identity/keycloak/keycloak.py

* keycloak_user_rolemapping: remove keycloak_role_composites from BOTMETA.yml

* keycloak_user_rolemapping: fix sanity check errors no.5

* keycloak_user_rolemapping: address felixfontein reviews concerns no.1

* keycloak_user_rolemapping: address felixfontein reviews concerns no.2

Co-authored-by: Dušan Markovič <dusan.markovic@better.care>
Co-authored-by: ahussey-redhat <93101976+ahussey-redhat@users.noreply.github.com>
This commit is contained in:
bratwurzt 2022-10-01 18:16:47 +02:00 committed by GitHub
commit 2cac3ae879
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 916 additions and 90 deletions

View file

@ -29,8 +29,15 @@ URL_CLIENT_ROLE_COMPOSITES = "{url}/admin/realms/{realm}/clients/{id}/roles/{nam
URL_REALM_ROLES = "{url}/admin/realms/{realm}/roles"
URL_REALM_ROLE = "{url}/admin/realms/{realm}/roles/{name}"
URL_REALM_ROLEMAPPINGS = "{url}/admin/realms/{realm}/users/{id}/role-mappings/realm"
URL_REALM_ROLEMAPPINGS_AVAILABLE = "{url}/admin/realms/{realm}/users/{id}/role-mappings/realm/available"
URL_REALM_ROLEMAPPINGS_COMPOSITE = "{url}/admin/realms/{realm}/users/{id}/role-mappings/realm/composite"
URL_REALM_ROLE_COMPOSITES = "{url}/admin/realms/{realm}/roles/{name}/composites"
URL_ROLES_BY_ID = "{url}/admin/realms/{realm}/roles-by-id/{id}"
URL_ROLES_BY_ID_COMPOSITES_CLIENTS = "{url}/admin/realms/{realm}/roles-by-id/{id}/composites/clients/{cid}"
URL_ROLES_BY_ID_COMPOSITES = "{url}/admin/realms/{realm}/roles-by-id/{id}/composites"
URL_CLIENTTEMPLATE = "{url}/admin/realms/{realm}/client-templates/{id}"
URL_CLIENTTEMPLATES = "{url}/admin/realms/{realm}/client-templates"
URL_GROUPS = "{url}/admin/realms/{realm}/groups"
@ -41,9 +48,15 @@ URL_CLIENTSCOPE = "{url}/admin/realms/{realm}/client-scopes/{id}"
URL_CLIENTSCOPE_PROTOCOLMAPPERS = "{url}/admin/realms/{realm}/client-scopes/{id}/protocol-mappers/models"
URL_CLIENTSCOPE_PROTOCOLMAPPER = "{url}/admin/realms/{realm}/client-scopes/{id}/protocol-mappers/models/{mapper_id}"
URL_CLIENT_ROLEMAPPINGS = "{url}/admin/realms/{realm}/groups/{id}/role-mappings/clients/{client}"
URL_CLIENT_ROLEMAPPINGS_AVAILABLE = "{url}/admin/realms/{realm}/groups/{id}/role-mappings/clients/{client}/available"
URL_CLIENT_ROLEMAPPINGS_COMPOSITE = "{url}/admin/realms/{realm}/groups/{id}/role-mappings/clients/{client}/composite"
URL_CLIENT_GROUP_ROLEMAPPINGS = "{url}/admin/realms/{realm}/groups/{id}/role-mappings/clients/{client}"
URL_CLIENT_GROUP_ROLEMAPPINGS_AVAILABLE = "{url}/admin/realms/{realm}/groups/{id}/role-mappings/clients/{client}/available"
URL_CLIENT_GROUP_ROLEMAPPINGS_COMPOSITE = "{url}/admin/realms/{realm}/groups/{id}/role-mappings/clients/{client}/composite"
URL_USERS = "{url}/admin/realms/{realm}/users"
URL_CLIENT_SERVICE_ACCOUNT_USER = "{url}/admin/realms/{realm}/clients/{id}/service-account-user"
URL_CLIENT_USER_ROLEMAPPINGS = "{url}/admin/realms/{realm}/users/{id}/role-mappings/clients/{client}"
URL_CLIENT_USER_ROLEMAPPINGS_AVAILABLE = "{url}/admin/realms/{realm}/users/{id}/role-mappings/clients/{client}/available"
URL_CLIENT_USER_ROLEMAPPINGS_COMPOSITE = "{url}/admin/realms/{realm}/users/{id}/role-mappings/clients/{client}/composite"
URL_AUTHENTICATION_FLOWS = "{url}/admin/realms/{realm}/authentication/flows"
URL_AUTHENTICATION_FLOW = "{url}/admin/realms/{realm}/authentication/flows/{id}"
@ -446,10 +459,9 @@ class KeycloakAPI(object):
self.module.fail_json(msg="Could not fetch rolemappings for client %s in realm %s: %s"
% (cid, realm, str(e)))
def get_client_role_by_name(self, gid, cid, name, realm="master"):
def get_client_role_id_by_name(self, cid, name, realm="master"):
""" Get the role ID of a client.
:param gid: ID of the group from which to obtain the rolemappings.
:param cid: ID of the client from which to obtain the rolemappings.
:param name: Name of the role.
:param realm: Realm from which to obtain the rolemappings.
@ -461,7 +473,7 @@ class KeycloakAPI(object):
return role['id']
return None
def get_client_rolemapping_by_id(self, gid, cid, rid, realm='master'):
def get_client_group_rolemapping_by_id(self, gid, cid, rid, realm='master'):
""" Obtain client representation by id
:param gid: ID of the group from which to obtain the rolemappings.
@ -470,7 +482,7 @@ class KeycloakAPI(object):
:param realm: client from this realm
:return: dict of rolemapping representation or None if none matching exist
"""
rolemappings_url = URL_CLIENT_ROLEMAPPINGS.format(url=self.baseurl, realm=realm, id=gid, client=cid)
rolemappings_url = URL_CLIENT_GROUP_ROLEMAPPINGS.format(url=self.baseurl, realm=realm, id=gid, client=cid)
try:
rolemappings = json.loads(to_native(open_url(rolemappings_url, method="GET", http_agent=self.http_agent, headers=self.restheaders,
timeout=self.connection_timeout,
@ -483,7 +495,7 @@ class KeycloakAPI(object):
% (cid, gid, realm, str(e)))
return None
def get_client_available_rolemappings(self, gid, cid, realm="master"):
def get_client_group_available_rolemappings(self, gid, cid, realm="master"):
""" Fetch the available role of a client in a specified goup on the Keycloak server.
:param gid: ID of the group from which to obtain the rolemappings.
@ -491,7 +503,7 @@ class KeycloakAPI(object):
:param realm: Realm from which to obtain the rolemappings.
:return: The rollemappings of specified group and client of the realm (default "master").
"""
available_rolemappings_url = URL_CLIENT_ROLEMAPPINGS_AVAILABLE.format(url=self.baseurl, realm=realm, id=gid, client=cid)
available_rolemappings_url = URL_CLIENT_GROUP_ROLEMAPPINGS_AVAILABLE.format(url=self.baseurl, realm=realm, id=gid, client=cid)
try:
return json.loads(to_native(open_url(available_rolemappings_url, method="GET", http_agent=self.http_agent, headers=self.restheaders,
timeout=self.connection_timeout,
@ -500,7 +512,7 @@ class KeycloakAPI(object):
self.module.fail_json(msg="Could not fetch available rolemappings for client %s in group %s, realm %s: %s"
% (cid, gid, realm, str(e)))
def get_client_composite_rolemappings(self, gid, cid, realm="master"):
def get_client_group_composite_rolemappings(self, gid, cid, realm="master"):
""" Fetch the composite role of a client in a specified group on the Keycloak server.
:param gid: ID of the group from which to obtain the rolemappings.
@ -508,15 +520,64 @@ class KeycloakAPI(object):
:param realm: Realm from which to obtain the rolemappings.
:return: The rollemappings of specified group and client of the realm (default "master").
"""
available_rolemappings_url = URL_CLIENT_ROLEMAPPINGS_COMPOSITE.format(url=self.baseurl, realm=realm, id=gid, client=cid)
composite_rolemappings_url = URL_CLIENT_GROUP_ROLEMAPPINGS_COMPOSITE.format(url=self.baseurl, realm=realm, id=gid, client=cid)
try:
return json.loads(to_native(open_url(available_rolemappings_url, method="GET", http_agent=self.http_agent, headers=self.restheaders,
return json.loads(to_native(open_url(composite_rolemappings_url, method="GET", http_agent=self.http_agent, headers=self.restheaders,
timeout=self.connection_timeout,
validate_certs=self.validate_certs).read()))
except Exception as e:
self.module.fail_json(msg="Could not fetch available rolemappings for client %s in group %s, realm %s: %s"
% (cid, gid, realm, str(e)))
def get_role_by_id(self, rid, realm="master"):
""" Fetch a role by its id on the Keycloak server.
:param rid: ID of the role.
:param realm: Realm from which to obtain the rolemappings.
:return: The role.
"""
client_roles_url = URL_ROLES_BY_ID.format(url=self.baseurl, realm=realm, id=rid)
try:
return json.loads(to_native(open_url(client_roles_url, method="GET", http_agent=self.http_agent, headers=self.restheaders,
timeout=self.connection_timeout,
validate_certs=self.validate_certs).read()))
except Exception as e:
self.module.fail_json(msg="Could not fetch role for id %s in realm %s: %s"
% (rid, realm, str(e)))
def get_client_roles_by_id_composite_rolemappings(self, rid, cid, realm="master"):
""" Fetch a role by its id on the Keycloak server.
:param rid: ID of the composite role.
:param cid: ID of the client from which to obtain the rolemappings.
:param realm: Realm from which to obtain the rolemappings.
:return: The role.
"""
client_roles_url = URL_ROLES_BY_ID_COMPOSITES_CLIENTS.format(url=self.baseurl, realm=realm, id=rid, cid=cid)
try:
return json.loads(to_native(open_url(client_roles_url, method="GET", http_agent=self.http_agent, headers=self.restheaders,
timeout=self.connection_timeout,
validate_certs=self.validate_certs).read()))
except Exception as e:
self.module.fail_json(msg="Could not fetch role for id %s and cid %s in realm %s: %s"
% (rid, cid, realm, str(e)))
def add_client_roles_by_id_composite_rolemapping(self, rid, roles_rep, realm="master"):
""" Assign roles to composite role
:param rid: ID of the composite role.
:param roles_rep: Representation of the roles to assign.
:param realm: Realm from which to obtain the rolemappings.
:return: None.
"""
available_rolemappings_url = URL_ROLES_BY_ID_COMPOSITES.format(url=self.baseurl, realm=realm, id=rid)
try:
open_url(available_rolemappings_url, method="POST", http_agent=self.http_agent, headers=self.restheaders, data=json.dumps(roles_rep),
validate_certs=self.validate_certs, timeout=self.connection_timeout)
except Exception as e:
self.module.fail_json(msg="Could not assign roles to composite role %s and realm %s: %s"
% (rid, realm, str(e)))
def add_group_rolemapping(self, gid, cid, role_rep, realm="master"):
""" Fetch the composite role of a client in a specified goup on the Keycloak server.
@ -526,7 +587,7 @@ class KeycloakAPI(object):
:param realm: Realm from which to obtain the rolemappings.
:return: None.
"""
available_rolemappings_url = URL_CLIENT_ROLEMAPPINGS.format(url=self.baseurl, realm=realm, id=gid, client=cid)
available_rolemappings_url = URL_CLIENT_GROUP_ROLEMAPPINGS.format(url=self.baseurl, realm=realm, id=gid, client=cid)
try:
open_url(available_rolemappings_url, method="POST", http_agent=self.http_agent, headers=self.restheaders, data=json.dumps(role_rep),
validate_certs=self.validate_certs, timeout=self.connection_timeout)
@ -543,7 +604,7 @@ class KeycloakAPI(object):
:param realm: Realm from which to obtain the rolemappings.
:return: None.
"""
available_rolemappings_url = URL_CLIENT_ROLEMAPPINGS.format(url=self.baseurl, realm=realm, id=gid, client=cid)
available_rolemappings_url = URL_CLIENT_GROUP_ROLEMAPPINGS.format(url=self.baseurl, realm=realm, id=gid, client=cid)
try:
open_url(available_rolemappings_url, method="DELETE", http_agent=self.http_agent, headers=self.restheaders,
validate_certs=self.validate_certs, timeout=self.connection_timeout)
@ -551,6 +612,206 @@ class KeycloakAPI(object):
self.module.fail_json(msg="Could not delete available rolemappings for client %s in group %s, realm %s: %s"
% (cid, gid, realm, str(e)))
def get_client_user_rolemapping_by_id(self, uid, cid, rid, realm='master'):
""" Obtain client representation by id
:param uid: ID of the user from which to obtain the rolemappings.
:param cid: ID of the client from which to obtain the rolemappings.
:param rid: ID of the role.
:param realm: client from this realm
:return: dict of rolemapping representation or None if none matching exist
"""
rolemappings_url = URL_CLIENT_USER_ROLEMAPPINGS.format(url=self.baseurl, realm=realm, id=uid, client=cid)
try:
rolemappings = json.loads(to_native(open_url(rolemappings_url, method="GET", http_agent=self.http_agent, headers=self.restheaders,
timeout=self.connection_timeout,
validate_certs=self.validate_certs).read()))
for role in rolemappings:
if rid == role['id']:
return role
except Exception as e:
self.module.fail_json(msg="Could not fetch rolemappings for client %s and user %s, realm %s: %s"
% (cid, uid, realm, str(e)))
return None
def get_client_user_available_rolemappings(self, uid, cid, realm="master"):
""" Fetch the available role of a client for a specified user on the Keycloak server.
:param uid: ID of the user from which to obtain the rolemappings.
:param cid: ID of the client from which to obtain the rolemappings.
:param realm: Realm from which to obtain the rolemappings.
:return: The effective rollemappings of specified client and user of the realm (default "master").
"""
available_rolemappings_url = URL_CLIENT_USER_ROLEMAPPINGS_AVAILABLE.format(url=self.baseurl, realm=realm, id=uid, client=cid)
try:
return json.loads(to_native(open_url(available_rolemappings_url, method="GET", http_agent=self.http_agent, headers=self.restheaders,
timeout=self.connection_timeout,
validate_certs=self.validate_certs).read()))
except Exception as e:
self.module.fail_json(msg="Could not fetch effective rolemappings for client %s and user %s, realm %s: %s"
% (cid, uid, realm, str(e)))
def get_client_user_composite_rolemappings(self, uid, cid, realm="master"):
""" Fetch the composite role of a client for a specified user on the Keycloak server.
:param uid: ID of the user from which to obtain the rolemappings.
:param cid: ID of the client from which to obtain the rolemappings.
:param realm: Realm from which to obtain the rolemappings.
:return: The rollemappings of specified group and client of the realm (default "master").
"""
composite_rolemappings_url = URL_CLIENT_USER_ROLEMAPPINGS_COMPOSITE.format(url=self.baseurl, realm=realm, id=uid, client=cid)
try:
return json.loads(to_native(open_url(composite_rolemappings_url, method="GET", http_agent=self.http_agent, headers=self.restheaders,
timeout=self.connection_timeout,
validate_certs=self.validate_certs).read()))
except Exception as e:
self.module.fail_json(msg="Could not fetch available rolemappings for user %s of realm %s: %s"
% (uid, realm, str(e)))
def get_realm_user_rolemapping_by_id(self, uid, rid, realm='master'):
""" Obtain role representation by id
:param uid: ID of the user from which to obtain the rolemappings.
:param rid: ID of the role.
:param realm: client from this realm
:return: dict of rolemapping representation or None if none matching exist
"""
rolemappings_url = URL_REALM_ROLEMAPPINGS.format(url=self.baseurl, realm=realm, id=uid)
try:
rolemappings = json.loads(to_native(open_url(rolemappings_url, method="GET", http_agent=self.http_agent, headers=self.restheaders,
timeout=self.connection_timeout,
validate_certs=self.validate_certs).read()))
for role in rolemappings:
if rid == role['id']:
return role
except Exception as e:
self.module.fail_json(msg="Could not fetch rolemappings for user %s, realm %s: %s"
% (uid, realm, str(e)))
return None
def get_realm_user_available_rolemappings(self, uid, realm="master"):
""" Fetch the available role of a realm for a specified user on the Keycloak server.
:param uid: ID of the user from which to obtain the rolemappings.
:param realm: Realm from which to obtain the rolemappings.
:return: The rollemappings of specified group and client of the realm (default "master").
"""
available_rolemappings_url = URL_REALM_ROLEMAPPINGS_AVAILABLE.format(url=self.baseurl, realm=realm, id=uid)
try:
return json.loads(to_native(open_url(available_rolemappings_url, method="GET", http_agent=self.http_agent, headers=self.restheaders,
timeout=self.connection_timeout,
validate_certs=self.validate_certs).read()))
except Exception as e:
self.module.fail_json(msg="Could not fetch available rolemappings for user %s of realm %s: %s"
% (uid, realm, str(e)))
def get_realm_user_composite_rolemappings(self, uid, realm="master"):
""" Fetch the composite role of a realm for a specified user on the Keycloak server.
:param uid: ID of the user from which to obtain the rolemappings.
:param realm: Realm from which to obtain the rolemappings.
:return: The effective rollemappings of specified client and user of the realm (default "master").
"""
composite_rolemappings_url = URL_REALM_ROLEMAPPINGS_COMPOSITE.format(url=self.baseurl, realm=realm, id=uid)
try:
return json.loads(to_native(open_url(composite_rolemappings_url, method="GET", http_agent=self.http_agent, headers=self.restheaders,
timeout=self.connection_timeout,
validate_certs=self.validate_certs).read()))
except Exception as e:
self.module.fail_json(msg="Could not fetch effective rolemappings for user %s, realm %s: %s"
% (uid, realm, str(e)))
def get_user_by_username(self, username, realm="master"):
""" Fetch a keycloak user within a realm based on its username.
If the user does not exist, None is returned.
:param username: Username of the user to fetch.
:param realm: Realm in which the user resides; default 'master'
"""
users_url = URL_USERS.format(url=self.baseurl, realm=realm)
users_url += '?username=%s&exact=true' % username
try:
return json.loads(to_native(open_url(users_url, method='GET', headers=self.restheaders, timeout=self.connection_timeout,
validate_certs=self.validate_certs).read()))
except ValueError as e:
self.module.fail_json(msg='API returned incorrect JSON when trying to obtain the user for realm %s and username %s: %s'
% (realm, username, str(e)))
except Exception as e:
self.module.fail_json(msg='Could not obtain the user for realm %s and username %s: %s'
% (realm, username, str(e)))
def get_service_account_user_by_client_id(self, client_id, realm="master"):
""" Fetch a keycloak service account user within a realm based on its client_id.
If the user does not exist, None is returned.
:param client_id: clientId of the service account user to fetch.
:param realm: Realm in which the user resides; default 'master'
"""
cid = self.get_client_id(client_id, realm=realm)
service_account_user_url = URL_CLIENT_SERVICE_ACCOUNT_USER.format(url=self.baseurl, realm=realm, id=cid)
try:
return json.loads(to_native(open_url(service_account_user_url, method='GET', headers=self.restheaders, timeout=self.connection_timeout,
validate_certs=self.validate_certs).read()))
except ValueError as e:
self.module.fail_json(msg='API returned incorrect JSON when trying to obtain the service-account-user for realm %s and client_id %s: %s'
% (realm, client_id, str(e)))
except Exception as e:
self.module.fail_json(msg='Could not obtain the service-account-user for realm %s and client_id %s: %s'
% (realm, client_id, str(e)))
def add_user_rolemapping(self, uid, cid, role_rep, realm="master"):
""" Assign a realm or client role to a specified user on the Keycloak server.
:param uid: ID of the user roles are assigned to.
:param cid: ID of the client from which to obtain the rolemappings. If empty, roles are from the realm
:param role_rep: Representation of the role to assign.
:param realm: Realm from which to obtain the rolemappings.
:return: None.
"""
if cid is None:
user_realm_rolemappings_url = URL_REALM_ROLEMAPPINGS.format(url=self.baseurl, realm=realm, id=uid)
try:
open_url(user_realm_rolemappings_url, method="POST", http_agent=self.http_agent, headers=self.restheaders, data=json.dumps(role_rep),
validate_certs=self.validate_certs, timeout=self.connection_timeout)
except Exception as e:
self.module.fail_json(msg="Could not map roles to userId %s for realm %s and roles %s: %s"
% (uid, realm, json.dumps(role_rep), str(e)))
else:
user_client_rolemappings_url = URL_CLIENT_USER_ROLEMAPPINGS.format(url=self.baseurl, realm=realm, id=uid, client=cid)
try:
open_url(user_client_rolemappings_url, method="POST", http_agent=self.http_agent, headers=self.restheaders, data=json.dumps(role_rep),
validate_certs=self.validate_certs, timeout=self.connection_timeout)
except Exception as e:
self.module.fail_json(msg="Could not map roles to userId %s for client %s, realm %s and roles %s: %s"
% (cid, uid, realm, json.dumps(role_rep), str(e)))
def delete_user_rolemapping(self, uid, cid, role_rep, realm="master"):
""" Delete the rolemapping of a client in a specified user on the Keycloak server.
:param uid: ID of the user from which to remove the rolemappings.
:param cid: ID of the client from which to remove the rolemappings.
:param role_rep: Representation of the role to remove from rolemappings.
:param realm: Realm from which to remove the rolemappings.
:return: None.
"""
if cid is None:
user_realm_rolemappings_url = URL_REALM_ROLEMAPPINGS.format(url=self.baseurl, realm=realm, id=uid)
try:
open_url(user_realm_rolemappings_url, method="DELETE", http_agent=self.http_agent, headers=self.restheaders, data=json.dumps(role_rep),
validate_certs=self.validate_certs, timeout=self.connection_timeout)
except Exception as e:
self.module.fail_json(msg="Could not remove roles %s from userId %s, realm %s: %s"
% (json.dumps(role_rep), uid, realm, str(e)))
else:
user_client_rolemappings_url = URL_CLIENT_USER_ROLEMAPPINGS.format(url=self.baseurl, realm=realm, id=uid, client=cid)
try:
open_url(user_client_rolemappings_url, method="DELETE", http_agent=self.http_agent, headers=self.restheaders, data=json.dumps(role_rep),
validate_certs=self.validate_certs, timeout=self.connection_timeout)
except Exception as e:
self.module.fail_json(msg="Could not remove roles %s for client %s from userId %s, realm %s: %s"
% (json.dumps(role_rep), cid, uid, realm, str(e)))
def get_client_templates(self, realm='master'):
""" Obtains client template representations for client templates in a realm
@ -930,7 +1191,6 @@ class KeycloakAPI(object):
return json.loads(to_native(open_url(groups_url, method="GET", http_agent=self.http_agent, headers=self.restheaders,
timeout=self.connection_timeout,
validate_certs=self.validate_certs).read()))
except HTTPError as e:
if e.code == 404:
return None