mirror of
				https://github.com/ansible-collections/community.general.git
				synced 2025-10-25 13:34:01 -07:00 
			
		
		
		
	
		
			
				
	
	
		
			444 lines
		
	
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			444 lines
		
	
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # -*- coding: utf-8 -*-
 | |
| # Copyright (c) 2020, Adam Migus <adam@migus.org>
 | |
| # GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
 | |
| # SPDX-License-Identifier: GPL-3.0-or-later
 | |
| from __future__ import absolute_import, division, print_function
 | |
| 
 | |
| __metaclass__ = type
 | |
| 
 | |
| DOCUMENTATION = r"""
 | |
| name: tss
 | |
| author: Adam Migus (@amigus) <adam@migus.org>
 | |
| short_description: Get secrets from Thycotic Secret Server
 | |
| version_added: 1.0.0
 | |
| description:
 | |
|   - Uses the Thycotic Secret Server Python SDK to get Secrets from Secret Server using token authentication with O(username)
 | |
|     and O(password) on the REST API at O(base_url).
 | |
|   - When using self-signed certificates the environment variable E(REQUESTS_CA_BUNDLE) can be set to a file containing the
 | |
|     trusted certificates (in C(.pem) format).
 | |
|   - For example, C(export REQUESTS_CA_BUNDLE='/etc/ssl/certs/ca-bundle.trust.crt').
 | |
| requirements:
 | |
|   - python-tss-sdk - https://pypi.org/project/python-tss-sdk/
 | |
| options:
 | |
|   _terms:
 | |
|     description: The integer ID of the secret.
 | |
|     required: true
 | |
|     type: list
 | |
|     elements: int
 | |
|   secret_path:
 | |
|     description: Indicate a full path of secret including folder and secret name when the secret ID is set to 0.
 | |
|     required: false
 | |
|     type: str
 | |
|     version_added: 7.2.0
 | |
|   fetch_secret_ids_from_folder:
 | |
|     description:
 | |
|       - Boolean flag which indicates whether the IDs of the secrets in a folder are fetched by folder ID or not.
 | |
|       - If V(true), then the terms are considered as folder IDs. Otherwise (default), they are considered as secret IDs.
 | |
|     required: false
 | |
|     type: bool
 | |
|     version_added: 7.1.0
 | |
|   fetch_attachments:
 | |
|     description:
 | |
|       - Boolean flag which indicates whether attached files are downloaded or not.
 | |
|       - The download only happens if O(file_download_path) has been provided.
 | |
|     required: false
 | |
|     type: bool
 | |
|     version_added: 7.0.0
 | |
|   file_download_path:
 | |
|     description: Indicate the file attachment download location.
 | |
|     required: false
 | |
|     type: path
 | |
|     version_added: 7.0.0
 | |
|   base_url:
 | |
|     description: The base URL of the server, for example V(https://localhost/SecretServer).
 | |
|     type: string
 | |
|     env:
 | |
|       - name: TSS_BASE_URL
 | |
|     ini:
 | |
|       - section: tss_lookup
 | |
|         key: base_url
 | |
|     required: true
 | |
|   username:
 | |
|     description: The username with which to request the OAuth2 Access Grant.
 | |
|     type: string
 | |
|     env:
 | |
|       - name: TSS_USERNAME
 | |
|     ini:
 | |
|       - section: tss_lookup
 | |
|         key: username
 | |
|   password:
 | |
|     description:
 | |
|       - The password associated with the supplied username.
 | |
|       - Required when O(token) is not provided.
 | |
|     type: string
 | |
|     env:
 | |
|       - name: TSS_PASSWORD
 | |
|     ini:
 | |
|       - section: tss_lookup
 | |
|         key: password
 | |
|   domain:
 | |
|     default: ""
 | |
|     description:
 | |
|       - The domain with which to request the OAuth2 Access Grant.
 | |
|       - Optional when O(token) is not provided.
 | |
|       - Requires C(python-tss-sdk) version 1.0.0 or greater.
 | |
|     type: string
 | |
|     env:
 | |
|       - name: TSS_DOMAIN
 | |
|     ini:
 | |
|       - section: tss_lookup
 | |
|         key: domain
 | |
|     required: false
 | |
|     version_added: 3.6.0
 | |
|   token:
 | |
|     description:
 | |
|       - Existing token for Thycotic authorizer.
 | |
|       - If provided, O(username) and O(password) are not needed.
 | |
|       - Requires C(python-tss-sdk) version 1.0.0 or greater.
 | |
|     type: string
 | |
|     env:
 | |
|       - name: TSS_TOKEN
 | |
|     ini:
 | |
|       - section: tss_lookup
 | |
|         key: token
 | |
|     version_added: 3.7.0
 | |
|   api_path_uri:
 | |
|     default: /api/v1
 | |
|     description: The path to append to the base URL to form a valid REST API request.
 | |
|     type: string
 | |
|     env:
 | |
|       - name: TSS_API_PATH_URI
 | |
|     required: false
 | |
|   token_path_uri:
 | |
|     default: /oauth2/token
 | |
|     description: The path to append to the base URL to form a valid OAuth2 Access Grant request.
 | |
|     type: string
 | |
|     env:
 | |
|       - name: TSS_TOKEN_PATH_URI
 | |
|     required: false
 | |
| """
 | |
| 
 | |
| RETURN = r"""
 | |
| _list:
 | |
|   description:
 | |
|     - The JSON responses to C(GET /secrets/{id}).
 | |
|     - See U(https://updates.thycotic.net/secretserver/restapiguide/TokenAuth/#operation--secrets--id--get).
 | |
|   type: list
 | |
|   elements: dict
 | |
| """
 | |
| 
 | |
| EXAMPLES = r"""
 | |
| - hosts: localhost
 | |
|   vars:
 | |
|     secret: >-
 | |
|       {{
 | |
|         lookup(
 | |
|           'community.general.tss',
 | |
|           102,
 | |
|           base_url='https://secretserver.domain.com/SecretServer/',
 | |
|           username='user.name',
 | |
|           password='password'
 | |
|         )
 | |
|       }}
 | |
|   tasks:
 | |
|     - ansible.builtin.debug:
 | |
|         msg: >
 | |
|           the password is {{
 | |
|             (secret['items']
 | |
|               | items2dict(key_name='slug',
 | |
|                            value_name='itemValue'))['password']
 | |
|           }}
 | |
| 
 | |
| - hosts: localhost
 | |
|   vars:
 | |
|     secret: >-
 | |
|       {{
 | |
|         lookup(
 | |
|           'community.general.tss',
 | |
|           102,
 | |
|           base_url='https://secretserver.domain.com/SecretServer/',
 | |
|           username='user.name',
 | |
|           password='password',
 | |
|           domain='domain'
 | |
|         )
 | |
|       }}
 | |
|   tasks:
 | |
|     - ansible.builtin.debug:
 | |
|         msg: >
 | |
|           the password is {{
 | |
|             (secret['items']
 | |
|               | items2dict(key_name='slug',
 | |
|                            value_name='itemValue'))['password']
 | |
|           }}
 | |
| 
 | |
| - hosts: localhost
 | |
|   vars:
 | |
|     secret_password: >-
 | |
|       {{
 | |
|         ((lookup(
 | |
|           'community.general.tss',
 | |
|           102,
 | |
|           base_url='https://secretserver.domain.com/SecretServer/',
 | |
|           token='thycotic_access_token',
 | |
|         ) | from_json).get('items') | items2dict(key_name='slug', value_name='itemValue'))['password']
 | |
|       }}
 | |
|   tasks:
 | |
|     - ansible.builtin.debug:
 | |
|         msg: the password is {{ secret_password }}
 | |
| 
 | |
| # The private key is stored in a certificate file which is attached to the secret.
 | |
| # If fetch_attachments=True then the private key file is downloaded to the specified path
 | |
| # and the file content is displayed in the debug message.
 | |
| - hosts: localhost
 | |
|   vars:
 | |
|     secret: >-
 | |
|       {{
 | |
|         lookup(
 | |
|           'community.general.tss',
 | |
|           102,
 | |
|           fetch_attachments=True,
 | |
|           file_download_path='/home/certs',
 | |
|           base_url='https://secretserver.domain.com/SecretServer/',
 | |
|           token='thycotic_access_token'
 | |
|         )
 | |
|       }}
 | |
|   tasks:
 | |
|     - ansible.builtin.debug:
 | |
|         msg: >
 | |
|           the private key is {{
 | |
|             (secret['items']
 | |
|               | items2dict(key_name='slug',
 | |
|                            value_name='itemValue'))['private-key']
 | |
|           }}
 | |
| 
 | |
| # If fetch_secret_ids_from_folder=true then the IDs of the secrets in a folder are fetched based on the folder ID
 | |
| - hosts: localhost
 | |
|   vars:
 | |
|     secret: >-
 | |
|       {{
 | |
|         lookup(
 | |
|           'community.general.tss',
 | |
|           102,
 | |
|           fetch_secret_ids_from_folder=true,
 | |
|           base_url='https://secretserver.domain.com/SecretServer/',
 | |
|           token='thycotic_access_token'
 | |
|         )
 | |
|       }}
 | |
|   tasks:
 | |
|     - ansible.builtin.debug:
 | |
|         msg: >
 | |
|           the secret id's are {{
 | |
|               secret
 | |
|           }}
 | |
| 
 | |
| # If secret ID is 0 and secret_path has value then secret is fetched by secret path
 | |
| - hosts: localhost
 | |
|   vars:
 | |
|     secret: >-
 | |
|       {{
 | |
|         lookup(
 | |
|           'community.general.tss',
 | |
|           0,
 | |
|           secret_path='\folderName\secretName',
 | |
|           base_url='https://secretserver.domain.com/SecretServer/',
 | |
|           username='user.name',
 | |
|           password='password'
 | |
|         )
 | |
|       }}
 | |
|   tasks:
 | |
|     - ansible.builtin.debug:
 | |
|         msg: >-
 | |
|           the password is {{
 | |
|             (secret['items']
 | |
|               | items2dict(key_name='slug',
 | |
|                            value_name='itemValue'))['password']
 | |
|           }}
 | |
| """
 | |
| 
 | |
| import abc
 | |
| import os
 | |
| from ansible.errors import AnsibleError, AnsibleOptionsError
 | |
| from ansible.plugins.lookup import LookupBase
 | |
| from ansible.utils.display import Display
 | |
| 
 | |
| try:
 | |
|     from delinea.secrets.server import SecretServer, SecretServerError, PasswordGrantAuthorizer, DomainPasswordGrantAuthorizer, AccessTokenAuthorizer
 | |
| 
 | |
|     HAS_TSS_SDK = True
 | |
|     HAS_DELINEA_SS_SDK = True
 | |
|     HAS_TSS_AUTHORIZER = True
 | |
| except ImportError:
 | |
|     try:
 | |
|         from thycotic.secrets.server import SecretServer, SecretServerError, PasswordGrantAuthorizer, DomainPasswordGrantAuthorizer, AccessTokenAuthorizer
 | |
| 
 | |
|         HAS_TSS_SDK = True
 | |
|         HAS_DELINEA_SS_SDK = False
 | |
|         HAS_TSS_AUTHORIZER = True
 | |
|     except ImportError:
 | |
|         SecretServer = None
 | |
|         SecretServerError = None
 | |
|         HAS_TSS_SDK = False
 | |
|         HAS_DELINEA_SS_SDK = False
 | |
|         PasswordGrantAuthorizer = None
 | |
|         DomainPasswordGrantAuthorizer = None
 | |
|         AccessTokenAuthorizer = None
 | |
|         HAS_TSS_AUTHORIZER = False
 | |
| 
 | |
| 
 | |
| display = Display()
 | |
| 
 | |
| 
 | |
class TSSClient(metaclass=abc.ABCMeta):
    """Common behaviour for the Secret Server SDK client wrappers.

    Subclasses are expected to create the SDK client object and store it in
    ``self._client``; every lookup method here delegates to that object.
    """

    def __init__(self):
        # Populated by the concrete subclass constructors.
        self._client = None

    @staticmethod
    def from_params(**server_parameters):
        """Build the client wrapper that matches the installed SDK.

        :param server_parameters: connection options forwarded unchanged to
            the selected wrapper class.
        :returns: a ``TSSClientV1`` when the SDK authorizer classes were
            imported, otherwise a ``TSSClientV0``.
        """
        if HAS_TSS_AUTHORIZER:
            return TSSClientV1(**server_parameters)
        return TSSClientV0(**server_parameters)

    def get_secret(self, term, secret_path, fetch_file_attachments, file_download_path):
        """Fetch a single secret by ID, or by path when the ID is 0.

        :param term: the secret ID; anything ``int()`` accepts.
        :param secret_path: path of the secret, used only when the ID is 0
            and the path is non-empty.
        :param fetch_file_attachments: when truthy, file items are written
            to ``file_download_path`` and masked in the returned secret.
        :param file_download_path: existing directory for attachments.
        :returns: whatever the underlying SDK call returns.
        :raises AnsibleOptionsError: if the term is not an integer, the
            download directory is unusable, or a download fails.
        """
        display.debug(f"tss_lookup term: {term}")
        secret_id = self._term_to_secret_id(term)
        fetch_secret_by_path = bool(secret_id == 0 and secret_path)
        if fetch_secret_by_path:
            display.vvv(f"Secret Server lookup of Secret with path {secret_path}")
        else:
            display.vvv(f"Secret Server lookup of Secret with ID {secret_id}")

        if not fetch_file_attachments:
            if fetch_secret_by_path:
                return self._client.get_secret_by_path(secret_path, False)
            return self._client.get_secret_json(secret_id)

        if fetch_secret_by_path:
            obj = self._client.get_secret_by_path(secret_path, fetch_file_attachments)
        else:
            obj = self._client.get_secret(secret_id, fetch_file_attachments)
        self._save_attachments(obj, file_download_path)
        return obj

    @staticmethod
    def _save_attachments(secret, file_download_path):
        """Write every file item of ``secret`` to disk and mask its value.

        Each file is stored as ``<secret id>_<item slug>`` inside
        ``file_download_path``. The item value is replaced by a placeholder
        whether or not the write succeeded.

        :raises AnsibleOptionsError: if the secret has items and the download
            directory is missing, or if writing raises ``ValueError``.
        """
        # The directory check does not depend on the item, so do it once.
        download_dir_ok = bool(file_download_path) and os.path.isdir(file_download_path)
        for item in secret['items']:
            # Raised per item (not up front) so that a secret without any
            # items keeps succeeding even when no directory was supplied.
            if not download_dir_ok:
                raise AnsibleOptionsError("File download path does not exist")
            if not item['isFile']:
                continue
            try:
                file_content = item['itemValue'].content
                target = os.path.join(file_download_path, f"{secret['id']}_{item['slug']}")
                with open(target, "wb") as f:
                    f.write(file_content)
            except ValueError as error:
                raise AnsibleOptionsError(f"Failed to download {item['slug']}") from error
            except AttributeError:
                # The item value has no ``content`` attribute to write out.
                display.warning(f"Could not read file content for {item['slug']}")
            finally:
                # Never hand the raw file object/content back to the caller.
                item['itemValue'] = "*** Not Valid For Display ***"

    def get_secret_ids_by_folderid(self, term):
        """Return the SDK result of looking up secret IDs for a folder ID.

        :param term: the folder ID; anything ``int()`` accepts.
        :raises AnsibleOptionsError: if the term is not an integer.
        """
        display.debug(f"tss_lookup term: {term}")
        folder_id = self._term_to_folder_id(term)
        display.vvv(f"Secret Server lookup of Secret id's with Folder ID {folder_id}")

        return self._client.get_secret_ids_by_folderid(folder_id)

    @staticmethod
    def _term_to_secret_id(term):
        """Convert a lookup term to an integer secret ID.

        :raises AnsibleOptionsError: if ``int(term)`` fails.
        """
        try:
            return int(term)
        except (TypeError, ValueError) as error:
            # int() raises TypeError for None, lists and dicts; report those
            # as an option error too instead of leaking a raw traceback.
            raise AnsibleOptionsError("Secret ID must be an integer") from error

    @staticmethod
    def _term_to_folder_id(term):
        """Convert a lookup term to an integer folder ID.

        :raises AnsibleOptionsError: if ``int(term)`` fails.
        """
        try:
            return int(term)
        except (TypeError, ValueError) as error:
            # Same reasoning as for secret IDs: cover non-string junk as well.
            raise AnsibleOptionsError("Folder ID must be an integer") from error
 | |
| 
 | |
| 
 | |
class TSSClientV0(TSSClient):
    """Client wrapper that passes username and password straight to ``SecretServer``.

    This variant cannot handle the ``domain`` option and rejects it.
    """

    # Order in which the connection options are passed to ``SecretServer``.
    _POSITIONAL_OPTIONS = (
        "base_url",
        "username",
        "password",
        "api_path_uri",
        "token_path_uri",
    )

    def __init__(self, **server_parameters):
        """Create the SDK client from the given connection options.

        :raises AnsibleError: if a non-empty ``domain`` option is supplied.
        """
        super().__init__()

        domain = server_parameters.get("domain")
        if domain:
            raise AnsibleError("The 'domain' option requires 'python-tss-sdk' version 1.0.0 or greater")

        positional = [server_parameters[name] for name in self._POSITIONAL_OPTIONS]
        self._client = SecretServer(*positional)
 | |
| 
 | |
| 
 | |
class TSSClientV1(TSSClient):
    """Client wrapper that authenticates through an SDK authorizer object."""

    def __init__(self, **server_parameters):
        """Pick an authorizer for the options and create the SDK client."""
        super().__init__()

        authorizer = self._get_authorizer(**server_parameters)
        base_url = server_parameters["base_url"]
        api_path_uri = server_parameters["api_path_uri"]
        self._client = SecretServer(base_url, authorizer, api_path_uri)

    @staticmethod
    def _get_authorizer(**server_parameters):
        """Return the authorizer matching the supplied credentials.

        Precedence: a non-empty ``token`` wins, then a non-empty ``domain``
        selects the domain password grant, and otherwise the plain password
        grant is used.
        """
        token = server_parameters.get("token")
        if token:
            return AccessTokenAuthorizer(token)

        def pick(*names):
            # Collect option values in the order the authorizer expects them.
            return [server_parameters[name] for name in names]

        if server_parameters.get("domain"):
            return DomainPasswordGrantAuthorizer(
                *pick("base_url", "username", "domain", "password", "token_path_uri")
            )

        return PasswordGrantAuthorizer(
            *pick("base_url", "username", "password", "token_path_uri")
        )
 | |
| 
 | |
| 
 | |
class LookupModule(LookupBase):
    """Lookup plugin entry point that resolves terms against Secret Server."""

    def run(self, terms, variables, **kwargs):
        """Resolve each term to a secret, or to the secret IDs of a folder.

        :param terms: secret IDs, or folder IDs when the
            ``fetch_secret_ids_from_folder`` option is set.
        :param variables: variables passed on to ``set_options``.
        :param kwargs: options given directly in the lookup call.
        :returns: a list with one result per term.
        :raises AnsibleError: if no SDK is installed, if folder lookups are
            requested without the Delinea SDK, or if the SDK reports a
            ``SecretServerError``.
        """
        if not HAS_TSS_SDK:
            raise AnsibleError("python-tss-sdk must be installed to use this plugin")

        self.set_options(var_options=variables, direct=kwargs)

        connection_options = (
            "base_url",
            "username",
            "password",
            "domain",
            "token",
            "api_path_uri",
            "token_path_uri",
        )
        tss = TSSClient.from_params(
            **{name: self.get_option(name) for name in connection_options}
        )

        try:
            if self.get_option("fetch_secret_ids_from_folder"):
                # Folder lookups are only offered when the Delinea SDK was imported.
                if not HAS_DELINEA_SS_SDK:
                    raise AnsibleError("latest python-tss-sdk must be installed to use this plugin")
                return [tss.get_secret_ids_by_folderid(term) for term in terms]

            secrets = []
            for term in terms:
                secrets.append(
                    tss.get_secret(
                        term,
                        self.get_option("secret_path"),
                        self.get_option("fetch_attachments"),
                        self.get_option("file_download_path"),
                    )
                )
            return secrets
        except SecretServerError as error:
            raise AnsibleError(f"Secret Server lookup failure: {error.message}")
 |