GitHub app access token lookup: allow to use PyJWT + cryptography instead of jwt (#10664)

* Fix issue #10299

* Fix issue #10299

* Fix blank lines

* Fix blank lines

* Add compatibility changes for jwt

* Bump to a higher magic number

* Update change log fragment

* Update changelogs/fragments/10299-github_app_access_token-lookup.yml

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update changelogs/fragments/10299-github_app_access_token-lookup.yml

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update changelogs/fragments/10299-github_app_access_token-lookup.yml

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update plugins/lookup/github_app_access_token.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update plugins/lookup/github_app_access_token.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update requirement document

* Remove a whitespace

---------

Co-authored-by: Bruno Lavoie <bruno.lavoie@dti.ulaval.ca>
Co-authored-by: Felix Fontein <felix@fontein.de>
This commit is contained in:
weisheng-p 2025-08-24 00:36:53 +08:00 committed by GitHub
commit 65bc47068e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 138 additions and 31 deletions

2
.github/BOTMETA.yml vendored
View file

@ -287,7 +287,7 @@ files:
maintainers: dagwieers maintainers: dagwieers
$lookups/flattened.py: {} $lookups/flattened.py: {}
$lookups/github_app_access_token.py: $lookups/github_app_access_token.py:
maintainers: weisheng-p maintainers: weisheng-p blavoie
$lookups/hiera.py: $lookups/hiera.py:
maintainers: jparrill maintainers: jparrill
$lookups/keyring.py: {} $lookups/keyring.py: {}

View file

@ -0,0 +1,2 @@
minor_changes:
- github_app_access_token lookup plugin - support both ``jwt`` and ``pyjwt`` to avoid conflict with other modules requirements (https://github.com/ansible-collections/community.general/issues/10299).

View file

@ -9,10 +9,12 @@ DOCUMENTATION = r"""
name: github_app_access_token name: github_app_access_token
author: author:
- Poh Wei Sheng (@weisheng-p) - Poh Wei Sheng (@weisheng-p)
- Bruno Lavoie (@blavoie)
short_description: Obtain short-lived Github App Access tokens short_description: Obtain short-lived Github App Access tokens
version_added: '8.2.0' version_added: '8.2.0'
requirements: requirements:
- jwt (https://github.com/GehirnInc/python-jwt) - jwt (https://github.com/GehirnInc/python-jwt) OR
- PyJWT (https://pypi.org/project/PyJWT/) AND cryptography (https://pypi.org/project/cryptography/)
description: description:
- This generates a Github access token that can be used with a C(git) command, if you use a Github App. - This generates a Github access token that can be used with a C(git) command, if you use a Github App.
options: options:
@ -66,13 +68,24 @@ _raw:
elements: str elements: str
""" """
try: try:
from jwt import JWT, jwk_from_pem import jwt
HAS_JWT = True HAS_JWT = True
except ImportError: except ImportError:
HAS_JWT = False HAS_JWT = False
HAS_PYTHON_JWT = False # vs pyjwt
if HAS_JWT and hasattr(jwt, 'JWT'):
HAS_PYTHON_JWT = True
from jwt import jwk_from_pem, jwt_instance
try:
from cryptography.hazmat.primitives import serialization
HAS_CRYPTOGRAPHY = True
except ImportError:
HAS_CRYPTOGRAPHY = False
import time import time
import json import json
from ansible.module_utils.urls import open_url from ansible.module_utils.urls import open_url
@ -81,26 +94,52 @@ from ansible.errors import AnsibleError, AnsibleOptionsError
from ansible.plugins.lookup import LookupBase from ansible.plugins.lookup import LookupBase
from ansible.utils.display import Display from ansible.utils.display import Display
if HAS_JWT:
jwt_instance = JWT()
else:
jwk_from_pem = None
jwt_instance = None
display = Display() display = Display()
class PythonJWT:
@staticmethod
def read_key(path, private_key=None):
try:
if private_key:
return jwk_from_pem(private_key.encode('utf-8'))
with open(path, 'rb') as pem_file:
return jwk_from_pem(pem_file.read())
except Exception as e:
raise AnsibleError(f"Error while parsing key file: {e}")
@staticmethod
def encode_jwt(app_id, jwk, exp=600):
now = int(time.time())
payload = {
'iat': now,
'exp': now + exp,
'iss': app_id,
}
try:
return jwt_instance.encode(payload, jwk, alg='RS256')
except Exception as e:
raise AnsibleError(f"Error while encoding jwt: {e}")
def read_key(path, private_key=None): def read_key(path, private_key=None):
if HAS_PYTHON_JWT:
return PythonJWT.read_key(path, private_key)
try: try:
if private_key: if private_key:
return jwk_from_pem(private_key.encode('utf-8')) key_bytes = private_key.encode('utf-8')
with open(path, 'rb') as pem_file: else:
return jwk_from_pem(pem_file.read()) with open(path, 'rb') as pem_file:
key_bytes = pem_file.read()
return serialization.load_pem_private_key(key_bytes, password=None)
except Exception as e: except Exception as e:
raise AnsibleError(f"Error while parsing key file: {e}") raise AnsibleError(f"Error while parsing key file: {e}")
def encode_jwt(app_id, jwk, exp=600): def encode_jwt(app_id, private_key_obj, exp=600):
if HAS_PYTHON_JWT:
return PythonJWT.encode_jwt(app_id, private_key_obj)
now = int(time.time()) now = int(time.time())
payload = { payload = {
'iat': now, 'iat': now,
@ -108,7 +147,7 @@ def encode_jwt(app_id, jwk, exp=600):
'iss': app_id, 'iss': app_id,
} }
try: try:
return jwt_instance.encode(payload, jwk, alg='RS256') return jwt.encode(payload, private_key_obj, algorithm='RS256')
except Exception as e: except Exception as e:
raise AnsibleError(f"Error while encoding jwt: {e}") raise AnsibleError(f"Error while encoding jwt: {e}")
@ -150,7 +189,11 @@ class LookupModule(LookupBase):
def run(self, terms, variables=None, **kwargs): def run(self, terms, variables=None, **kwargs):
if not HAS_JWT: if not HAS_JWT:
raise AnsibleError('Python jwt library is required. ' raise AnsibleError('Python jwt library is required. '
'Please install using "pip install jwt"') 'Please install using "pip install pyjwt"')
if not HAS_PYTHON_JWT and not HAS_CRYPTOGRAPHY:
raise AnsibleError('Python cryptography library is required. '
'Please install using "pip install cryptography"')
self.set_options(var_options=variables, direct=kwargs) self.set_options(var_options=variables, direct=kwargs)

View file

@ -6,19 +6,29 @@
from __future__ import (absolute_import, division, print_function) from __future__ import (absolute_import, division, print_function)
__metaclass__ = type __metaclass__ = type
import json import json
import types
import sys
from ansible_collections.community.internal_test_tools.tests.unit.compat import unittest from ansible_collections.community.internal_test_tools.tests.unit.compat import unittest
from ansible_collections.community.internal_test_tools.tests.unit.compat.mock import ( from ansible_collections.community.internal_test_tools.tests.unit.compat.mock import (
patch, patch,
MagicMock, MagicMock,
mock_open mock_open,
) )
from ansible.plugins.loader import lookup_loader from ansible.plugins.loader import lookup_loader
ENCODE_RESULT = 'Foobar'
PRIVATE_KEY = 'private_key'
class MockJWT(MagicMock): class MockJWT(MagicMock):
def encode(self, payload, key, alg): def encode(self, payload, key, alg):
return 'Foobar' return ENCODE_RESULT
class serialization(MagicMock):
def load_pem_private_key(self, key_bytes, password):
return PRIVATE_KEY
class MockResponse(MagicMock): class MockResponse(MagicMock):
@ -31,14 +41,17 @@ class MockResponse(MagicMock):
class TestLookupModule(unittest.TestCase): class TestLookupModule(unittest.TestCase):
def test_get_token_with_file_with_pyjwt(self):
pyjwt = types.ModuleType("jwt")
pyjwt.encode = MagicMock(return_value=ENCODE_RESULT)
with patch.dict(sys.modules, {'jwt': pyjwt}), \
patch.multiple("ansible_collections.community.general.plugins.lookup.github_app_access_token",
open=mock_open(read_data="foo_bar"),
open_url=MagicMock(return_value=MockResponse()),
HAS_JWT=True,
HAS_CRYPTOGRAPHY=True,
serialization=serialization()):
def test_get_token_with_file(self):
with patch.multiple("ansible_collections.community.general.plugins.lookup.github_app_access_token",
open=mock_open(read_data="foo_bar"),
open_url=MagicMock(return_value=MockResponse()),
jwk_from_pem=MagicMock(return_value='private_key'),
jwt_instance=MockJWT(),
HAS_JWT=True):
lookup = lookup_loader.get('community.general.github_app_access_token') lookup = lookup_loader.get('community.general.github_app_access_token')
self.assertListEqual( self.assertListEqual(
[MockResponse.response_token], [MockResponse.response_token],
@ -51,12 +64,61 @@ class TestLookupModule(unittest.TestCase):
) )
) )
def test_get_token_with_fact(self): def test_get_token_with_fact_with_pyjwt(self):
with patch.multiple("ansible_collections.community.general.plugins.lookup.github_app_access_token", pyjwt = types.ModuleType("jwt")
open_url=MagicMock(return_value=MockResponse()), pyjwt.encode = MagicMock(return_value=ENCODE_RESULT)
jwk_from_pem=MagicMock(return_value='private_key'), with patch.dict(sys.modules, {'jwt': pyjwt}), \
jwt_instance=MockJWT(), patch.multiple("ansible_collections.community.general.plugins.lookup.github_app_access_token",
HAS_JWT=True): open=mock_open(read_data="foo_bar"),
open_url=MagicMock(return_value=MockResponse()),
HAS_JWT=True,
HAS_CRYPTOGRAPHY=True,
serialization=serialization()):
lookup = lookup_loader.get('community.general.github_app_access_token')
self.assertListEqual(
[MockResponse.response_token],
lookup.run(
[],
app_id="app_id",
installation_id="installation_id",
private_key="foo_bar",
token_expiry=600
)
)
def test_get_token_with_python_jwt(self):
python_jwt = types.ModuleType("jwt")
python_jwt.JWT = MagicMock()
python_jwt.jwk_from_pem = MagicMock(return_value='private_key')
python_jwt.jwt_instance = MockJWT()
with patch.dict(sys.modules, {'jwt': python_jwt}), \
patch.multiple("ansible_collections.community.general.plugins.lookup.github_app_access_token",
open=mock_open(read_data="foo_bar"),
open_url=MagicMock(return_value=MockResponse()),
HAS_JWT=True):
lookup = lookup_loader.get('community.general.github_app_access_token')
self.assertListEqual(
[MockResponse.response_token],
lookup.run(
[],
key_path="key",
app_id="app_id",
installation_id="installation_id",
token_expiry=600
)
)
def test_get_token_with_fact_with_python_jwt(self):
python_jwt = types.ModuleType("jwt")
python_jwt.JWT = MagicMock()
python_jwt.jwk_from_pem = MagicMock(return_value='private_key')
python_jwt.jwt_instance = MockJWT()
with patch.dict(sys.modules, {'jwt': python_jwt}), \
patch.multiple("ansible_collections.community.general.plugins.lookup.github_app_access_token",
open=mock_open(read_data="foo_bar"),
open_url=MagicMock(return_value=MockResponse()),
HAS_JWT=True):
lookup = lookup_loader.get('community.general.github_app_access_token') lookup = lookup_loader.get('community.general.github_app_access_token')
self.assertListEqual( self.assertListEqual(
[MockResponse.response_token], [MockResponse.response_token],