From 65bc47068e347b2686545260dc1b7a02bdf5f561 Mon Sep 17 00:00:00 2001 From: weisheng-p Date: Sun, 24 Aug 2025 00:36:53 +0800 Subject: [PATCH] GitHub app access token lookup: allow to use PyJWT + cryptography instead of jwt (#10664) * Fix issue #10299 * Fix issue #10299 * Fix blank lines * Fix blank lines * Add compatibility changes for jwt * Bump to a higher magic number * Update change log fragment * Update changelogs/fragments/10299-github_app_access_token-lookup.yml Co-authored-by: Felix Fontein * Update changelogs/fragments/10299-github_app_access_token-lookup.yml Co-authored-by: Felix Fontein * Update changelogs/fragments/10299-github_app_access_token-lookup.yml Co-authored-by: Felix Fontein * Update plugins/lookup/github_app_access_token.py Co-authored-by: Felix Fontein * Update plugins/lookup/github_app_access_token.py Co-authored-by: Felix Fontein * Update requirement document * Remove a whitespace --------- Co-authored-by: Bruno Lavoie Co-authored-by: Felix Fontein --- .github/BOTMETA.yml | 2 +- .../10299-github_app_access_token-lookup.yml | 2 + plugins/lookup/github_app_access_token.py | 73 ++++++++++++--- .../lookup/test_github_app_access_token.py | 92 ++++++++++++++++--- 4 files changed, 138 insertions(+), 31 deletions(-) create mode 100644 changelogs/fragments/10299-github_app_access_token-lookup.yml diff --git a/.github/BOTMETA.yml b/.github/BOTMETA.yml index 3e24178ae9..fe76d996a5 100644 --- a/.github/BOTMETA.yml +++ b/.github/BOTMETA.yml @@ -287,7 +287,7 @@ files: maintainers: dagwieers $lookups/flattened.py: {} $lookups/github_app_access_token.py: - maintainers: weisheng-p + maintainers: weisheng-p blavoie $lookups/hiera.py: maintainers: jparrill $lookups/keyring.py: {} diff --git a/changelogs/fragments/10299-github_app_access_token-lookup.yml b/changelogs/fragments/10299-github_app_access_token-lookup.yml new file mode 100644 index 0000000000..59233e2a05 --- /dev/null +++ b/changelogs/fragments/10299-github_app_access_token-lookup.yml @@ -0,0 +1,2 @@ +minor_changes: + - github_app_access_token lookup plugin - support both ``jwt`` and ``pyjwt`` to avoid conflict with other modules requirements (https://github.com/ansible-collections/community.general/issues/10299). diff --git a/plugins/lookup/github_app_access_token.py b/plugins/lookup/github_app_access_token.py index dbc8cde3b5..e4ed433231 100644 --- a/plugins/lookup/github_app_access_token.py +++ b/plugins/lookup/github_app_access_token.py @@ -9,10 +9,12 @@ DOCUMENTATION = r""" name: github_app_access_token author: - Poh Wei Sheng (@weisheng-p) + - Bruno Lavoie (@blavoie) short_description: Obtain short-lived Github App Access tokens version_added: '8.2.0' requirements: - - jwt (https://github.com/GehirnInc/python-jwt) + - jwt (https://github.com/GehirnInc/python-jwt) OR + - PyJWT (https://pypi.org/project/PyJWT/) AND cryptography (https://pypi.org/project/cryptography/) description: - This generates a Github access token that can be used with a C(git) command, if you use a Github App. options: @@ -66,13 +68,24 @@ _raw: elements: str """ - try: - from jwt import JWT, jwk_from_pem + import jwt HAS_JWT = True except ImportError: HAS_JWT = False +HAS_PYTHON_JWT = False # vs pyjwt +if HAS_JWT and hasattr(jwt, 'JWT'): + HAS_PYTHON_JWT = True + from jwt import jwk_from_pem, jwt_instance + +try: + from cryptography.hazmat.primitives import serialization + HAS_CRYPTOGRAPHY = True +except ImportError: + HAS_CRYPTOGRAPHY = False + + import time import json from ansible.module_utils.urls import open_url @@ -81,26 +94,52 @@ from ansible.errors import AnsibleError, AnsibleOptionsError from ansible.plugins.lookup import LookupBase from ansible.utils.display import Display -if HAS_JWT: - jwt_instance = JWT() -else: - jwk_from_pem = None - jwt_instance = None - display = Display() +class PythonJWT: + + @staticmethod + def read_key(path, private_key=None): + try: + if private_key: + return jwk_from_pem(private_key.encode('utf-8')) + with open(path, 'rb') as pem_file: + return jwk_from_pem(pem_file.read()) + except Exception as e: + raise AnsibleError(f"Error while parsing key file: {e}") + + @staticmethod + def encode_jwt(app_id, jwk, exp=600): + now = int(time.time()) + payload = { + 'iat': now, + 'exp': now + exp, + 'iss': app_id, + } + try: + return jwt_instance.encode(payload, jwk, alg='RS256') + except Exception as e: + raise AnsibleError(f"Error while encoding jwt: {e}") + + def read_key(path, private_key=None): + if HAS_PYTHON_JWT: + return PythonJWT.read_key(path, private_key) try: if private_key: - return jwk_from_pem(private_key.encode('utf-8')) - with open(path, 'rb') as pem_file: - return jwk_from_pem(pem_file.read()) + key_bytes = private_key.encode('utf-8') + else: + with open(path, 'rb') as pem_file: + key_bytes = pem_file.read() + return serialization.load_pem_private_key(key_bytes, password=None) except Exception as e: raise AnsibleError(f"Error while parsing key file: {e}") -def encode_jwt(app_id, jwk, exp=600): +def encode_jwt(app_id, private_key_obj, exp=600): + if HAS_PYTHON_JWT: + return PythonJWT.encode_jwt(app_id, private_key_obj) now = int(time.time()) payload = { 'iat': now, @@ -108,7 +147,7 @@ def encode_jwt(app_id, jwk, exp=600): 'iss': app_id, } try: - return jwt_instance.encode(payload, jwk, alg='RS256') + return jwt.encode(payload, private_key_obj, algorithm='RS256') except Exception as e: raise AnsibleError(f"Error while encoding jwt: {e}") @@ -150,7 +189,11 @@ class LookupModule(LookupBase): def run(self, terms, variables=None, **kwargs): if not HAS_JWT: raise AnsibleError('Python jwt library is required. ' - 'Please install using "pip install jwt"') + 'Please install using "pip install pyjwt"') + + if not HAS_PYTHON_JWT and not HAS_CRYPTOGRAPHY: + raise AnsibleError('Python cryptography library is required. ' + 'Please install using "pip install cryptography"') self.set_options(var_options=variables, direct=kwargs) diff --git a/tests/unit/plugins/lookup/test_github_app_access_token.py b/tests/unit/plugins/lookup/test_github_app_access_token.py index 7dd907c9ee..698fc0ff55 100644 --- a/tests/unit/plugins/lookup/test_github_app_access_token.py +++ b/tests/unit/plugins/lookup/test_github_app_access_token.py @@ -6,19 +6,29 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type import json +import types +import sys from ansible_collections.community.internal_test_tools.tests.unit.compat import unittest from ansible_collections.community.internal_test_tools.tests.unit.compat.mock import ( patch, MagicMock, - mock_open + mock_open, ) from ansible.plugins.loader import lookup_loader +ENCODE_RESULT = 'Foobar' +PRIVATE_KEY = 'private_key' + class MockJWT(MagicMock): def encode(self, payload, key, alg): - return 'Foobar' + return ENCODE_RESULT + + +class serialization(MagicMock): + def load_pem_private_key(self, key_bytes, password): + return PRIVATE_KEY class MockResponse(MagicMock): @@ -31,14 +41,17 @@ class MockResponse(MagicMock): class TestLookupModule(unittest.TestCase): + def test_get_token_with_file_with_pyjwt(self): + pyjwt = types.ModuleType("jwt") + pyjwt.encode = MagicMock(return_value=ENCODE_RESULT) + with patch.dict(sys.modules, {'jwt': pyjwt}), \ + patch.multiple("ansible_collections.community.general.plugins.lookup.github_app_access_token", + open=mock_open(read_data="foo_bar"), + open_url=MagicMock(return_value=MockResponse()), + HAS_JWT=True, + HAS_CRYPTOGRAPHY=True, + serialization=serialization()): - def test_get_token_with_file(self): - with patch.multiple("ansible_collections.community.general.plugins.lookup.github_app_access_token", - open=mock_open(read_data="foo_bar"), - open_url=MagicMock(return_value=MockResponse()), - jwk_from_pem=MagicMock(return_value='private_key'), - jwt_instance=MockJWT(), - HAS_JWT=True): lookup = lookup_loader.get('community.general.github_app_access_token') self.assertListEqual( [MockResponse.response_token], @@ -51,12 +64,61 @@ class TestLookupModule(unittest.TestCase): ) ) - def test_get_token_with_fact(self): - with patch.multiple("ansible_collections.community.general.plugins.lookup.github_app_access_token", - open_url=MagicMock(return_value=MockResponse()), - jwk_from_pem=MagicMock(return_value='private_key'), - jwt_instance=MockJWT(), - HAS_JWT=True): + def test_get_token_with_fact_with_pyjwt(self): + pyjwt = types.ModuleType("jwt") + pyjwt.encode = MagicMock(return_value=ENCODE_RESULT) + with patch.dict(sys.modules, {'jwt': pyjwt}), \ + patch.multiple("ansible_collections.community.general.plugins.lookup.github_app_access_token", + open=mock_open(read_data="foo_bar"), + open_url=MagicMock(return_value=MockResponse()), + HAS_JWT=True, + HAS_CRYPTOGRAPHY=True, + serialization=serialization()): + + lookup = lookup_loader.get('community.general.github_app_access_token') + self.assertListEqual( + [MockResponse.response_token], + lookup.run( + [], + app_id="app_id", + installation_id="installation_id", + private_key="foo_bar", + token_expiry=600 + ) + ) + + def test_get_token_with_python_jwt(self): + python_jwt = types.ModuleType("jwt") + python_jwt.JWT = MagicMock() + python_jwt.jwk_from_pem = MagicMock(return_value='private_key') + python_jwt.jwt_instance = MockJWT() + with patch.dict(sys.modules, {'jwt': python_jwt}), \ + patch.multiple("ansible_collections.community.general.plugins.lookup.github_app_access_token", + open=mock_open(read_data="foo_bar"), + open_url=MagicMock(return_value=MockResponse()), + HAS_JWT=True): + lookup = lookup_loader.get('community.general.github_app_access_token') + self.assertListEqual( + [MockResponse.response_token], + lookup.run( + [], + key_path="key", + app_id="app_id", + installation_id="installation_id", + token_expiry=600 + ) + ) + + def test_get_token_with_fact_with_python_jwt(self): + python_jwt = types.ModuleType("jwt") + python_jwt.JWT = MagicMock() + python_jwt.jwk_from_pem = MagicMock(return_value='private_key') + python_jwt.jwt_instance = MockJWT() + with patch.dict(sys.modules, {'jwt': python_jwt}), \ + patch.multiple("ansible_collections.community.general.plugins.lookup.github_app_access_token", + open=mock_open(read_data="foo_bar"), + open_url=MagicMock(return_value=MockResponse()), + HAS_JWT=True): lookup = lookup_loader.get('community.general.github_app_access_token') self.assertListEqual( [MockResponse.response_token],