mirror of
				https://github.com/ansible-collections/community.general.git
				synced 2025-10-25 05:23:58 -07:00 
			
		
		
		
	
		
			Some checks are pending
		
		
	
	EOL CI / EOL Sanity (Ⓐ2.17) (push) Waiting to run
				
			EOL CI / EOL Units (Ⓐ2.17+py3.10) (push) Waiting to run
				
			EOL CI / EOL Units (Ⓐ2.17+py3.12) (push) Waiting to run
				
			EOL CI / EOL Units (Ⓐ2.17+py3.7) (push) Waiting to run
				
			EOL CI / EOL I (Ⓐ2.17+alpine319+py:azp/posix/1/) (push) Waiting to run
				
			EOL CI / EOL I (Ⓐ2.17+alpine319+py:azp/posix/2/) (push) Waiting to run
				
			EOL CI / EOL I (Ⓐ2.17+alpine319+py:azp/posix/3/) (push) Waiting to run
				
			EOL CI / EOL I (Ⓐ2.17+fedora39+py:azp/posix/1/) (push) Waiting to run
				
			EOL CI / EOL I (Ⓐ2.17+fedora39+py:azp/posix/2/) (push) Waiting to run
				
			EOL CI / EOL I (Ⓐ2.17+fedora39+py:azp/posix/3/) (push) Waiting to run
				
			EOL CI / EOL I (Ⓐ2.17+ubuntu2004+py:azp/posix/1/) (push) Waiting to run
				
			EOL CI / EOL I (Ⓐ2.17+ubuntu2004+py:azp/posix/2/) (push) Waiting to run
				
			EOL CI / EOL I (Ⓐ2.17+ubuntu2004+py:azp/posix/3/) (push) Waiting to run
				
			nox / Run extra sanity tests (push) Waiting to run
				
			* Adjust all __future__ imports: for i in $(grep -REl "__future__.*absolute_import" plugins/ tests/); do sed -e 's/from __future__ import .*/from __future__ import annotations/g' -i $i; done * Remove all UTF-8 encoding specifications for Python source files: for i in $(grep -REl '[-][*]- coding: utf-8 -[*]-' plugins/ tests/); do sed -e '/^# -\*- coding: utf-8 -\*-/d' -i $i; done * Remove __metaclass__ = type: for i in $(grep -REl '__metaclass__ = type' plugins/ tests/); do sed -e '/^__metaclass__ = type/d' -i $i; done
		
			
				
	
	
		
			295 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			295 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Copyright (c) 2022, Jonathan Lung <lungj@heresjono.com>
 | |
| # GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
 | |
| # SPDX-License-Identifier: GPL-3.0-or-later
 | |
| from __future__ import annotations
 | |
| 
 | |
| DOCUMENTATION = r"""
 | |
| name: bitwarden
 | |
| author:
 | |
|   - Jonathan Lung (@lungj) <lungj@heresjono.com>
 | |
| requirements:
 | |
|   - bw (command line utility)
 | |
|   - be logged into bitwarden
 | |
|   - bitwarden vault unlocked
 | |
|   - E(BW_SESSION) environment variable set
 | |
| short_description: Retrieve secrets from Bitwarden
 | |
| version_added: 5.4.0
 | |
| description:
 | |
|   - Retrieve secrets from Bitwarden.
 | |
| options:
 | |
|   _terms:
 | |
|     description: Key(s) to fetch values for from login info.
 | |
|     required: true
 | |
|     type: list
 | |
|     elements: str
 | |
|   search:
 | |
|     description:
 | |
|       - Field to retrieve, for example V(name) or V(id).
 | |
|       - If set to V(id), only zero or one element can be returned. Use the Jinja C(first) filter to get the only list element.
 | |
|       - If set to V(None) or V(''), or if O(_terms) is empty, records are not filtered by fields.
 | |
|     type: str
 | |
|     default: name
 | |
|     version_added: 5.7.0
 | |
|   field:
 | |
|     description: Field to fetch. Leave unset to fetch whole response.
 | |
|     type: str
 | |
|   collection_id:
 | |
|     description:
 | |
|       - Collection ID to filter results by collection. Leave unset to skip filtering.
 | |
|       - O(collection_id) and O(collection_name) are mutually exclusive.
 | |
|     type: str
 | |
|     version_added: 6.3.0
 | |
|   collection_name:
 | |
|     description:
 | |
|       - Collection name to filter results by collection. Leave unset to skip filtering.
 | |
|       - O(collection_id) and O(collection_name) are mutually exclusive.
 | |
|     type: str
 | |
|     version_added: 10.4.0
 | |
|   organization_id:
 | |
|     description: Organization ID to filter results by organization. Leave unset to skip filtering.
 | |
|     type: str
 | |
|     version_added: 8.5.0
 | |
|   bw_session:
 | |
|     description: Pass session key instead of reading from env.
 | |
|     type: str
 | |
|     version_added: 8.4.0
 | |
|   result_count:
 | |
|     description:
 | |
|       - Number of results expected for the lookup query. Task fails if O(result_count) is set but does not match the number
 | |
|         of query results. Leave empty to skip this check.
 | |
|     type: int
 | |
|     version_added: 10.4.0
 | |
| """
 | |
| 
 | |
| EXAMPLES = r"""
 | |
| - name: "Get 'password' from all Bitwarden records named 'a_test'"
 | |
|   ansible.builtin.debug:
 | |
|     msg: >-
 | |
|       {{ lookup('community.general.bitwarden', 'a_test', field='password') }}
 | |
| 
 | |
| - name: "Get 'password' from Bitwarden record with ID 'bafba515-af11-47e6-abe3-af1200cd18b2'"
 | |
|   ansible.builtin.debug:
 | |
|     msg: >-
 | |
|       {{ lookup('community.general.bitwarden', 'bafba515-af11-47e6-abe3-af1200cd18b2', search='id', field='password') | first }}
 | |
| 
 | |
| - name: "Get 'password' from all Bitwarden records named 'a_test' from collection"
 | |
|   ansible.builtin.debug:
 | |
|     msg: >-
 | |
|       {{ lookup('community.general.bitwarden', 'a_test', field='password', collection_id='bafba515-af11-47e6-abe3-af1200cd18b2') }}
 | |
| 
 | |
| - name: "Get list of all full Bitwarden records named 'a_test'"
 | |
|   ansible.builtin.debug:
 | |
|     msg: >-
 | |
|       {{ lookup('community.general.bitwarden', 'a_test') }}
 | |
| 
 | |
| - name: "Get custom field 'api_key' from all Bitwarden records named 'a_test'"
 | |
|   ansible.builtin.debug:
 | |
|     msg: >-
 | |
|       {{ lookup('community.general.bitwarden', 'a_test', field='api_key') }}
 | |
| 
 | |
| - name: "Get 'password' from all Bitwarden records named 'a_test', using given session key"
 | |
|   ansible.builtin.debug:
 | |
|     msg: >-
 | |
|       {{ lookup('community.general.bitwarden', 'a_test', field='password', bw_session='bXZ9B5TXi6...') }}
 | |
| 
 | |
| - name: "Get all Bitwarden records from collection"
 | |
|   ansible.builtin.debug:
 | |
|     msg: >-
 | |
|       {{ lookup('community.general.bitwarden', None, collection_id='bafba515-af11-47e6-abe3-af1200cd18b2') }}
 | |
| 
 | |
| - name: "Get all Bitwarden records from collection"
 | |
|   ansible.builtin.debug:
 | |
|     msg: >-
 | |
|       {{ lookup('community.general.bitwarden', None, collection_name='my_collections/test_collection') }}
 | |
| 
 | |
| - name: "Get Bitwarden record named 'a_test', ensure there is exactly one match"
 | |
|   ansible.builtin.debug:
 | |
|     msg: >-
 | |
|       {{ lookup('community.general.bitwarden', 'a_test', result_count=1) }}
 | |
| """
 | |
| 
 | |
| RETURN = r"""
 | |
| _raw:
 | |
|   description:
 | |
|     - A one-element list that contains a list of requested fields or JSON objects of matches.
 | |
|     - If you use C(query), you get a list of lists. If you use C(lookup) without C(wantlist=true), this always gets reduced
 | |
|       to a list of field values or JSON objects.
 | |
|   type: list
 | |
|   elements: list
 | |
| """
 | |
| 
 | |
| from subprocess import Popen, PIPE
 | |
| 
 | |
| from ansible.errors import AnsibleError, AnsibleOptionsError
 | |
| from ansible.module_utils.common.text.converters import to_bytes, to_text
 | |
| from ansible.parsing.ajson import AnsibleJSONDecoder
 | |
| from ansible.plugins.lookup import LookupBase
 | |
| 
 | |
| 
 | |
| class BitwardenException(AnsibleError):
 | |
|     pass
 | |
| 
 | |
| 
 | |
| class Bitwarden(object):
 | |
| 
 | |
|     def __init__(self, path='bw'):
 | |
|         self._cli_path = path
 | |
|         self._session = None
 | |
| 
 | |
|     @property
 | |
|     def cli_path(self):
 | |
|         return self._cli_path
 | |
| 
 | |
|     @property
 | |
|     def session(self):
 | |
|         return self._session
 | |
| 
 | |
|     @session.setter
 | |
|     def session(self, value):
 | |
|         self._session = value
 | |
| 
 | |
|     @property
 | |
|     def unlocked(self):
 | |
|         out, err = self._run(['status'], stdin="")
 | |
|         decoded = AnsibleJSONDecoder().raw_decode(out)[0]
 | |
|         return decoded['status'] == 'unlocked'
 | |
| 
 | |
|     def _run(self, args, stdin=None, expected_rc=0):
 | |
|         if self.session:
 | |
|             args += ['--session', self.session]
 | |
| 
 | |
|         p = Popen([self.cli_path] + args, stdout=PIPE, stderr=PIPE, stdin=PIPE)
 | |
|         out, err = p.communicate(to_bytes(stdin))
 | |
|         rc = p.wait()
 | |
|         if rc != expected_rc:
 | |
|             if len(args) > 2 and args[0] == 'get' and args[1] == 'item' and b'Not found.' in err:
 | |
|                 return 'null', ''
 | |
|             raise BitwardenException(err)
 | |
|         return to_text(out, errors='surrogate_or_strict'), to_text(err, errors='surrogate_or_strict')
 | |
| 
 | |
|     def _get_matches(self, search_value, search_field, collection_id=None, organization_id=None):
 | |
|         """Return matching records whose search_field is equal to key.
 | |
|         """
 | |
| 
 | |
|         # Prepare set of params for Bitwarden CLI
 | |
|         if search_field == 'id':
 | |
|             params = ['get', 'item', search_value]
 | |
|         else:
 | |
|             params = ['list', 'items']
 | |
|             if search_value:
 | |
|                 params.extend(['--search', search_value])
 | |
| 
 | |
|         if collection_id:
 | |
|             params.extend(['--collectionid', collection_id])
 | |
|         if organization_id:
 | |
|             params.extend(['--organizationid', organization_id])
 | |
| 
 | |
|         out, err = self._run(params)
 | |
| 
 | |
|         # This includes things that matched in different fields.
 | |
|         initial_matches = AnsibleJSONDecoder().raw_decode(out)[0]
 | |
| 
 | |
|         if search_field == 'id':
 | |
|             if initial_matches is None:
 | |
|                 initial_matches = []
 | |
|             else:
 | |
|                 initial_matches = [initial_matches]
 | |
| 
 | |
|         # Filter to only include results from the right field, if a search is requested by value or field
 | |
|         return [item for item in initial_matches
 | |
|                 if not search_value or not search_field or item.get(search_field) == search_value]
 | |
| 
 | |
|     def get_field(self, field, search_value, search_field="name", collection_id=None, organization_id=None):
 | |
|         """Return a list of the specified field for records whose search_field match search_value
 | |
|         and filtered by collection if collection has been provided.
 | |
| 
 | |
|         If field is None, return the whole record for each match.
 | |
|         """
 | |
|         matches = self._get_matches(search_value, search_field, collection_id, organization_id)
 | |
|         if not field:
 | |
|             return matches
 | |
|         field_matches = []
 | |
|         for match in matches:
 | |
|             # if there are no custom fields, then `match` has no key 'fields'
 | |
|             if 'fields' in match:
 | |
|                 custom_field_found = False
 | |
|                 for custom_field in match['fields']:
 | |
|                     if field == custom_field['name']:
 | |
|                         field_matches.append(custom_field['value'])
 | |
|                         custom_field_found = True
 | |
|                         break
 | |
|                 if custom_field_found:
 | |
|                     continue
 | |
|             if 'login' in match and field in match['login']:
 | |
|                 field_matches.append(match['login'][field])
 | |
|                 continue
 | |
|             if field in match:
 | |
|                 field_matches.append(match[field])
 | |
|                 continue
 | |
| 
 | |
|         if matches and not field_matches:
 | |
|             raise AnsibleError(f"field {field} does not exist in {search_value}")
 | |
| 
 | |
|         return field_matches
 | |
| 
 | |
|     def get_collection_ids(self, collection_name: str, organization_id=None) -> list[str]:
 | |
|         """Return matching IDs of collections whose name is equal to collection_name."""
 | |
| 
 | |
|         # Prepare set of params for Bitwarden CLI
 | |
|         params = ['list', 'collections', '--search', collection_name]
 | |
| 
 | |
|         if organization_id:
 | |
|             params.extend(['--organizationid', organization_id])
 | |
| 
 | |
|         out, err = self._run(params)
 | |
| 
 | |
|         # This includes things that matched in different fields.
 | |
|         initial_matches = AnsibleJSONDecoder().raw_decode(out)[0]
 | |
| 
 | |
|         # Filter to only return the ID of a collections with exactly matching name
 | |
|         return [item['id'] for item in initial_matches
 | |
|                 if str(item.get('name')).lower() == collection_name.lower()]
 | |
| 
 | |
| 
 | |
| class LookupModule(LookupBase):
 | |
| 
 | |
|     def run(self, terms=None, variables=None, **kwargs):
 | |
|         self.set_options(var_options=variables, direct=kwargs)
 | |
|         field = self.get_option('field')
 | |
|         search_field = self.get_option('search')
 | |
|         collection_id = self.get_option('collection_id')
 | |
|         collection_name = self.get_option('collection_name')
 | |
|         organization_id = self.get_option('organization_id')
 | |
|         result_count = self.get_option('result_count')
 | |
|         _bitwarden.session = self.get_option('bw_session')
 | |
| 
 | |
|         if not _bitwarden.unlocked:
 | |
|             raise AnsibleError("Bitwarden Vault locked. Run 'bw unlock'.")
 | |
| 
 | |
|         if not terms:
 | |
|             terms = [None]
 | |
| 
 | |
|         if collection_name and collection_id:
 | |
|             raise AnsibleOptionsError("'collection_name' and 'collection_id' are mutually exclusive!")
 | |
|         elif collection_name:
 | |
|             collection_ids = _bitwarden.get_collection_ids(collection_name, organization_id)
 | |
|             if not collection_ids:
 | |
|                 raise BitwardenException("No matching collections found!")
 | |
|         else:
 | |
|             collection_ids = [collection_id]
 | |
| 
 | |
|         results = [
 | |
|             _bitwarden.get_field(field, term, search_field, collection_id, organization_id)
 | |
|             for collection_id in collection_ids
 | |
|             for term in terms
 | |
|         ]
 | |
| 
 | |
|         for result in results:
 | |
|             if result_count is not None and len(result) != result_count:
 | |
|                 raise BitwardenException(
 | |
|                     f"Number of results doesn't match result_count! ({len(result)} != {result_count})")
 | |
| 
 | |
|         return results
 | |
| 
 | |
| 
 | |
| _bitwarden = Bitwarden()
 |