mirror of
https://github.com/ansible-collections/community.general.git
synced 2025-08-12 00:54:22 -07:00
Fix issue #10299
This commit is contained in:
parent
682a89cdf5
commit
f67bdcb367
1 changed files with 21 additions and 17 deletions
|
@ -9,10 +9,12 @@ DOCUMENTATION = r"""
|
||||||
name: github_app_access_token
|
name: github_app_access_token
|
||||||
author:
|
author:
|
||||||
- Poh Wei Sheng (@weisheng-p)
|
- Poh Wei Sheng (@weisheng-p)
|
||||||
|
- Bruno Lavoie (@blavoie)
|
||||||
short_description: Obtain short-lived Github App Access tokens
|
short_description: Obtain short-lived Github App Access tokens
|
||||||
version_added: '8.2.0'
|
version_added: '8.2.0'
|
||||||
requirements:
|
requirements:
|
||||||
- jwt (https://github.com/GehirnInc/python-jwt)
|
- PyJWT (https://pypi.org/project/PyJWT/)
|
||||||
|
- cryptography (https://pypi.org/project/cryptography/)
|
||||||
description:
|
description:
|
||||||
- This generates a Github access token that can be used with a C(git) command, if you use a Github App.
|
- This generates a Github access token that can be used with a C(git) command, if you use a Github App.
|
||||||
options:
|
options:
|
||||||
|
@ -66,9 +68,14 @@ _raw:
|
||||||
elements: str
|
elements: str
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
from cryptography.hazmat.primitives import serialization
|
||||||
|
HAS_CRYPTOGRAPHY = True
|
||||||
|
except ImportError:
|
||||||
|
HAS_CRYPTOGRAPHY = False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from jwt import JWT, jwk_from_pem
|
import jwt # pyjwt
|
||||||
HAS_JWT = True
|
HAS_JWT = True
|
||||||
except ImportError:
|
except ImportError:
|
||||||
HAS_JWT = False
|
HAS_JWT = False
|
||||||
|
@ -81,26 +88,20 @@ from ansible.errors import AnsibleError, AnsibleOptionsError
|
||||||
from ansible.plugins.lookup import LookupBase
|
from ansible.plugins.lookup import LookupBase
|
||||||
from ansible.utils.display import Display
|
from ansible.utils.display import Display
|
||||||
|
|
||||||
if HAS_JWT:
|
|
||||||
jwt_instance = JWT()
|
|
||||||
else:
|
|
||||||
jwk_from_pem = None
|
|
||||||
jwt_instance = None
|
|
||||||
|
|
||||||
display = Display()
|
display = Display()
|
||||||
|
|
||||||
|
|
||||||
def read_key(path, private_key=None):
|
def read_key(path, private_key=None):
|
||||||
try:
|
try:
|
||||||
if private_key:
|
if private_key:
|
||||||
return jwk_from_pem(private_key.encode('utf-8'))
|
key_bytes = private_key.encode('utf-8')
|
||||||
with open(path, 'rb') as pem_file:
|
else:
|
||||||
return jwk_from_pem(pem_file.read())
|
with open(path, 'rb') as pem_file:
|
||||||
|
key_bytes = pem_file.read()
|
||||||
|
return serialization.load_pem_private_key(key_bytes, password=None)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise AnsibleError(f"Error while parsing key file: {e}")
|
raise AnsibleError(f"Error while parsing key file: {e}")
|
||||||
|
|
||||||
|
def encode_jwt(app_id, private_key_obj, exp=600):
|
||||||
def encode_jwt(app_id, jwk, exp=600):
|
|
||||||
now = int(time.time())
|
now = int(time.time())
|
||||||
payload = {
|
payload = {
|
||||||
'iat': now,
|
'iat': now,
|
||||||
|
@ -108,11 +109,10 @@ def encode_jwt(app_id, jwk, exp=600):
|
||||||
'iss': app_id,
|
'iss': app_id,
|
||||||
}
|
}
|
||||||
try:
|
try:
|
||||||
return jwt_instance.encode(payload, jwk, alg='RS256')
|
return jwt.encode(payload, private_key_obj, algorithm='RS256')
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise AnsibleError(f"Error while encoding jwt: {e}")
|
raise AnsibleError(f"Error while encoding jwt: {e}")
|
||||||
|
|
||||||
|
|
||||||
def post_request(generated_jwt, installation_id):
|
def post_request(generated_jwt, installation_id):
|
||||||
github_api_url = f'https://api.github.com/app/installations/{installation_id}/access_tokens'
|
github_api_url = f'https://api.github.com/app/installations/{installation_id}/access_tokens'
|
||||||
headers = {
|
headers = {
|
||||||
|
@ -150,7 +150,11 @@ class LookupModule(LookupBase):
|
||||||
def run(self, terms, variables=None, **kwargs):
|
def run(self, terms, variables=None, **kwargs):
|
||||||
if not HAS_JWT:
|
if not HAS_JWT:
|
||||||
raise AnsibleError('Python jwt library is required. '
|
raise AnsibleError('Python jwt library is required. '
|
||||||
'Please install using "pip install jwt"')
|
'Please install using "pip install pyjwt"')
|
||||||
|
|
||||||
|
if not HAS_CRYPTOGRAPHY:
|
||||||
|
raise AnsibleError('Python cryptography library is required. '
|
||||||
|
'Please install using "pip install cryptography"')
|
||||||
|
|
||||||
self.set_options(var_options=variables, direct=kwargs)
|
self.set_options(var_options=variables, direct=kwargs)
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue