Add Keycloak roles module (#2930)

* implement simple realm and client role

* fix documentation

* code cleanup

* separate realm and client roles functions

* remove blank lines

* add tests

* fix linefeeds

* fix indentation

* fix error message

* fix documentation

* fix documentation

* keycloak_role integration tests

* keycloak_role integration tests

* remove extra blank line

* add version_added tag

Co-authored-by: Felix Fontein <felix@fontein.de>

Co-authored-by: Felix Fontein <felix@fontein.de>
This commit is contained in:
Laurent Paumier 2021-07-19 23:17:39 +02:00 committed by GitHub
parent a3607a745e
commit d7c6ba89f8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 1141 additions and 1 deletions

View file

@ -43,8 +43,14 @@ URL_REALM = "{url}/admin/realms/{realm}"
URL_TOKEN = "{url}/realms/{realm}/protocol/openid-connect/token"
URL_CLIENT = "{url}/admin/realms/{realm}/clients/{id}"
URL_CLIENTS = "{url}/admin/realms/{realm}/clients"
URL_CLIENT_ROLES = "{url}/admin/realms/{realm}/clients/{id}/roles"
URL_CLIENT_ROLE = "{url}/admin/realms/{realm}/clients/{id}/roles/{name}"
URL_CLIENT_ROLE_COMPOSITES = "{url}/admin/realms/{realm}/clients/{id}/roles/{name}/composites"
URL_REALM_ROLES = "{url}/admin/realms/{realm}/roles"
URL_REALM_ROLE = "{url}/admin/realms/{realm}/roles/{name}"
URL_REALM_ROLE_COMPOSITES = "{url}/admin/realms/{realm}/roles/{name}/composites"
URL_CLIENTTEMPLATE = "{url}/admin/realms/{realm}/client-templates/{id}"
URL_CLIENTTEMPLATES = "{url}/admin/realms/{realm}/client-templates"
@ -632,10 +638,197 @@ class KeycloakAPI(object):
try:
return open_url(group_url, method='DELETE', headers=self.restheaders,
validate_certs=self.validate_certs)
except Exception as e:
self.module.fail_json(msg="Unable to delete group %s: %s" % (groupid, str(e)))
def get_realm_roles(self, realm='master'):
""" Obtains role representations for roles in a realm
:param realm: realm to be queried
:return: list of dicts of role representations
"""
rolelist_url = URL_REALM_ROLES.format(url=self.baseurl, realm=realm)
try:
return json.loads(to_native(open_url(rolelist_url, method='GET', headers=self.restheaders,
validate_certs=self.validate_certs).read()))
except ValueError as e:
self.module.fail_json(msg='API returned incorrect JSON when trying to obtain list of roles for realm %s: %s'
% (realm, str(e)))
except Exception as e:
self.module.fail_json(msg='Could not obtain list of roles for realm %s: %s'
% (realm, str(e)))
def get_realm_role(self, name, realm='master'):
""" Fetch a keycloak role from the provided realm using the role's name.
If the role does not exist, None is returned.
:param name: Name of the role to fetch.
:param realm: Realm in which the role resides; default 'master'.
"""
role_url = URL_REALM_ROLE.format(url=self.baseurl, realm=realm, name=name)
try:
return json.loads(to_native(open_url(role_url, method="GET", headers=self.restheaders,
validate_certs=self.validate_certs).read()))
except HTTPError as e:
if e.code == 404:
return None
else:
self.module.fail_json(msg='Could not fetch role %s in realm %s: %s'
% (name, realm, str(e)))
except Exception as e:
self.module.fail_json(msg='Could not fetch role %s in realm %s: %s'
% (name, realm, str(e)))
def create_realm_role(self, rolerep, realm='master'):
""" Create a Keycloak realm role.
:param rolerep: a RoleRepresentation of the role to be created. Must contain at minimum the field name.
:return: HTTPResponse object on success
"""
roles_url = URL_REALM_ROLES.format(url=self.baseurl, realm=realm)
try:
return open_url(roles_url, method='POST', headers=self.restheaders,
data=json.dumps(rolerep), validate_certs=self.validate_certs)
except Exception as e:
self.module.fail_json(msg='Could not create role %s in realm %s: %s'
% (rolerep['name'], realm, str(e)))
def update_realm_role(self, rolerep, realm='master'):
""" Update an existing realm role.
:param rolerep: A RoleRepresentation of the updated role.
:return HTTPResponse object on success
"""
role_url = URL_REALM_ROLE.format(url=self.baseurl, realm=realm, name=rolerep['name'])
try:
return open_url(role_url, method='PUT', headers=self.restheaders,
data=json.dumps(rolerep), validate_certs=self.validate_certs)
except Exception as e:
self.module.fail_json(msg='Could not update role %s in realm %s: %s'
% (rolerep['name'], realm, str(e)))
def delete_realm_role(self, name, realm='master'):
""" Delete a realm role.
:param name: The name of the role.
:param realm: The realm in which this role resides, default "master".
"""
role_url = URL_REALM_ROLE.format(url=self.baseurl, realm=realm, name=name)
try:
return open_url(role_url, method='DELETE', headers=self.restheaders,
validate_certs=self.validate_certs)
except Exception as e:
self.module.fail_json(msg='Unable to delete role %s in realm %s: %s'
% (name, realm, str(e)))
def get_client_roles(self, clientid, realm='master'):
""" Obtains role representations for client roles in a specific client
:param clientid: Client id to be queried
:param realm: Realm to be queried
:return: List of dicts of role representations
"""
cid = self.get_client_id(clientid, realm=realm)
if cid is None:
self.module.fail_json(msg='Could not find client %s in realm %s'
% (clientid, realm))
rolelist_url = URL_CLIENT_ROLES.format(url=self.baseurl, realm=realm, id=cid)
try:
return json.loads(to_native(open_url(rolelist_url, method='GET', headers=self.restheaders,
validate_certs=self.validate_certs).read()))
except ValueError as e:
self.module.fail_json(msg='API returned incorrect JSON when trying to obtain list of roles for client %s in realm %s: %s'
% (clientid, realm, str(e)))
except Exception as e:
self.module.fail_json(msg='Could not obtain list of roles for client %s in realm %s: %s'
% (clientid, realm, str(e)))
def get_client_role(self, name, clientid, realm='master'):
""" Fetch a keycloak client role from the provided realm using the role's name.
:param name: Name of the role to fetch.
:param clientid: Client id for the client role
:param realm: Realm in which the role resides
:return: Dict of role representation
If the role does not exist, None is returned.
"""
cid = self.get_client_id(clientid, realm=realm)
if cid is None:
self.module.fail_json(msg='Could not find client %s in realm %s'
% (clientid, realm))
role_url = URL_CLIENT_ROLE.format(url=self.baseurl, realm=realm, id=cid, name=name)
try:
return json.loads(to_native(open_url(role_url, method="GET", headers=self.restheaders,
validate_certs=self.validate_certs).read()))
except HTTPError as e:
if e.code == 404:
return None
else:
self.module.fail_json(msg='Could not fetch role %s in client %s of realm %s: %s'
% (name, clientid, realm, str(e)))
except Exception as e:
self.module.fail_json(msg='Could not fetch role %s for client %s in realm %s: %s'
% (name, clientid, realm, str(e)))
def create_client_role(self, rolerep, clientid, realm='master'):
""" Create a Keycloak client role.
:param rolerep: a RoleRepresentation of the role to be created. Must contain at minimum the field name.
:param clientid: Client id for the client role
:param realm: Realm in which the role resides
:return: HTTPResponse object on success
"""
cid = self.get_client_id(clientid, realm=realm)
if cid is None:
self.module.fail_json(msg='Could not find client %s in realm %s'
% (clientid, realm))
roles_url = URL_CLIENT_ROLES.format(url=self.baseurl, realm=realm, id=cid)
try:
return open_url(roles_url, method='POST', headers=self.restheaders,
data=json.dumps(rolerep), validate_certs=self.validate_certs)
except Exception as e:
self.module.fail_json(msg='Could not create role %s for client %s in realm %s: %s'
% (rolerep['name'], clientid, realm, str(e)))
def update_client_role(self, rolerep, clientid, realm="master"):
""" Update an existing client role.
:param rolerep: A RoleRepresentation of the updated role.
:param clientid: Client id for the client role
:param realm: Realm in which the role resides
:return HTTPResponse object on success
"""
cid = self.get_client_id(clientid, realm=realm)
if cid is None:
self.module.fail_json(msg='Could not find client %s in realm %s'
% (clientid, realm))
role_url = URL_CLIENT_ROLE.format(url=self.baseurl, realm=realm, id=cid, name=rolerep['name'])
try:
return open_url(role_url, method='PUT', headers=self.restheaders,
data=json.dumps(rolerep), validate_certs=self.validate_certs)
except Exception as e:
self.module.fail_json(msg='Could not update role %s for client %s in realm %s: %s'
% (rolerep['name'], clientid, realm, str(e)))
def delete_client_role(self, name, clientid, realm="master"):
""" Delete a role. One of name or roleid must be provided.
:param name: The name of the role.
:param clientid: Client id for the client role
:param realm: Realm in which the role resides
"""
cid = self.get_client_id(clientid, realm=realm)
if cid is None:
self.module.fail_json(msg='Could not find client %s in realm %s'
% (clientid, realm))
role_url = URL_CLIENT_ROLE.format(url=self.baseurl, realm=realm, id=cid, name=name)
try:
return open_url(role_url, method='DELETE', headers=self.restheaders,
validate_certs=self.validate_certs)
except Exception as e:
self.module.fail_json(msg='Unable to delete role %s for client %s in realm %s: %s'
% (name, clientid, realm, str(e)))
def get_authentication_flow_by_alias(self, alias, realm='master'):
"""
Get an authentication flow by it's alias