updated lookuup plugin based on comment 76eb024c25#

This commit is contained in:
Dave Costakos 2023-06-22 14:36:55 -07:00
parent 76eb024c25
commit 953b06ff05
No known key found for this signature in database
GPG key ID: C4DC31A1B32AC45C
2 changed files with 146 additions and 91 deletions

View file

@ -19,9 +19,13 @@ DOCUMENTATION = '''
options:
key:
description:
- the key of the secret to look up in Secret Manager
- the name of the secret to look up in Secret Manager
type: str
required: True
aliases:
- name
- secret
- secret_id
project:
description:
- The name of the google cloud project
@ -57,11 +61,30 @@ DOCUMENTATION = '''
- defaults to OS env variable GCP_SERVICE_ACCOUNT_INFO if not present
type: jsonarg
required: False
errors:
access_token:
description:
- support for GCP Access Token
- defaults to OS env variable GCP_ACCESS_TOKEN if not present
type: str
required: False
on_error:
description:
- how to handle errors
choices: ['strict','warn','ignore']
default: strict
- strict means raise an exception
- warn means warn, and return none
- ignore means just return none
type: str
required: False
choices:
- 'strict'
- 'warn'
- 'ignore'
default: 'strict'
scopes:
description:
- Authenticaiton scopes for Google Secret Manager
type: list
default: ["https://www.googleapis.com/auth/cloud-platform"]
'''
EXAMPLES = '''
@ -99,6 +122,8 @@ import base64
from ansible.plugins.lookup import LookupBase
from ansible.errors import AnsibleError
from ansible.utils.display import Display
try:
import requests
@ -107,103 +132,117 @@ except ImportError:
HAS_REQUESTS = False
try:
import google.auth
from google.oauth2 import service_account
from google.auth.transport.requests import AuthorizedSession
HAS_GOOGLE_LIBRARIES = True
from ansible_collections.google.cloud.plugins.module_utils.gcp_utils import (
GcpSession,
)
HAS_GOOGLE_CLOUD_COLLECTION = True
except ImportError:
HAS_GOOGLE_LIBRARIES = False
HAS_GOOGLE_CLOUD_COLLECTION = False
from ansible_collections.google.cloud.plugins.module_utils.gcp_utils import GcpSession, GcpRequest
from ansible.errors import AnsibleError
from ansible.utils.display import Display
class GcpLookupException(Exception):
pass
class GcpMockModule(object):
def __init__(self, params):
self.params = params
def fail_json(self, *args, **kwargs):
raise AnsibleError(kwargs["msg"])
def raise_for_status(self, response):
try:
response.raise_for_status()
except getattr(requests.exceptions, "RequestException"):
self.fail_json(msg="GCP returned error: %s" % response.json())
class LookupModule(LookupBase):
def run(self, terms, variables, **kwargs):
def run(self, terms=None, variables=None, **kwargs):
self._display = Display()
if not HAS_GOOGLE_CLOUD_COLLECTION:
raise AnsibleError(
"gcp_secret lookup needs a supported version of the google.cloud collection installed. Use `ansible-galaxy collection install google.cloud` to install it"
)
self.set_options(var_options=variables, direct=kwargs)
self.scopes = ["https://www.googleapis.com/auth/cloud-platform"]
self._validate()
self.service_acct_creds = self._credentials()
session = AuthorizedSession(self.service_acct_creds)
response = session.get("https://secretmanager.googleapis.com/v1/projects/{project}/secrets/{key}/versions/{version}:access".format(**self.get_options()))
if response.status_code == 200:
result_data = response.json()
secret_value = base64.b64decode(result_data['payload']['data'])
return [ secret_value ]
params = {
"key": self.get_option("key"),
"version": self.get_option("version"),
"access_token": self.get_option("access_token"),
"scopes": self.get_option("scopes"),
"on_error": self.get_option("on_error")
}
params['name'] = params['key']
# support GCP_* env variables for some parameters
for param in ["project", "auth_kind", "service_account_file", "service_account_info", "service_account_email", "access_token"]:
params[param] = self.fallback_from_env(param)
self._display.vvv(msg=f"Module Parameters: {params}")
fake_module = GcpMockModule(params)
result = self.get_secret(fake_module)
return [base64.b64decode(result)]
def fallback_from_env(self, arg):
if self.get_option(arg):
return self.get_option(arg)
else:
if self.get_option('errors') == 'warn':
self.warn(f"secret request returned bad status: {response.status_code} {response.json()}")
return [ '' ]
elif self.get_option('error') == 'ignore':
return [ '' ]
else:
raise AnsibleError(f"secret request returned bad status: {response.status_code} {response.json()}")
env_name = f"GCP_{arg.upper()}"
if env_name in os.environ:
self.set_option(arg, os.environ[env_name])
return self.get_option(arg)
def _validate(self):
if HAS_GOOGLE_LIBRARIES == False:
raise AnsibleError("Please install the google-auth library")
if HAS_REQUESTS == False:
raise AnsibleError("Please install the requests library")
if self.get_option('key') == None:
raise AnsibleError("'key' is a required parameter")
if self.get_option('version') == None:
self.set_option('version', 'latest')
# set version to the latest version because
# we can't be sure that "latest" is always going
# to be set if secret versions get disabled
# see https://issuetracker.google.com/issues/286489671
def get_latest_version(self, module, auth):
url = "https://secretmanager.googleapis.com/v1/projects/{project}/secrets/{name}/versions?filter=state:ENABLED".format(
**module.params
)
response = auth.get(url)
self._display.vvv(msg=f"List Version Response: {response.status_code} for {response.request.url}: {response.json()}")
if response.status_code != 200:
self.raise_error(module, f"unable to list versions of secret {response.status_code}")
version_list = response.json()
if "versions" in version_list:
return sorted(version_list['versions'], key=lambda d: d['name'])[-1]['name'].split('/')[-1]
else:
self.raise_error(module, f"Unable to list secret versions via {response.request.url}: {response.json()}")
self._set_from_env('project', 'GCP_PROJECT', True)
self._set_from_env('auth_kind', 'GCP_AUTH_KIND', True)
self._set_from_env('service_account_email', 'GCP_SERVICE_ACCOUNT_EMAIL')
self._set_from_env('service_account_file', 'GCP_SERVICE_ACCOUNT_FILE')
self._set_from_env('service_account_info', 'GCP_SERVICE_ACCOUNT_INFO')
def raise_error(self, module, msg):
if module.params['on_error'] == 'strict':
raise GcpLookupException(msg)
elif module.params['on_error'] == 'warn':
self._display.warning(msg)
return None
def get_secret(self, module):
auth = GcpSession(module, "secretmanager")
if module.params['version'] == "latest":
module.params['calc_version'] = self.get_latest_version(module, auth)
else:
module.params['calc_version'] = module.params['version']
# there was an error listing secret versions
if module.params['calc_version'] is None:
return ''
url = "https://secretmanager.googleapis.com/v1/projects/{project}/secrets/{name}/versions/{calc_version}:access".format(
**module.params
)
response = auth.get(url)
self._display.vvv(msg=f"Response: {response.status_code} for {response.request.url}: {response.json()}")
if response.status_code != 200:
self.raise_error(module, f"Failed to lookup secret value via {response.request.url} {response.status_code}")
return ''
def _set_from_env(self, var=None, env_name=None, raise_on_empty=False):
if self.get_option(var) == None:
if env_name is not None and env_name in os.environ:
fallback = os.environ[env_name]
self.set_option(var, fallback)
if self.get_option(var) == None and raise_on_empty:
msg = f"No key '{var}' provided"
if env_name is not None:
msg += f" and no fallback to env['{env_name}'] available"
raise AnsibleError(msg)
def _credentials(self):
cred_type = self.get_option('auth_kind')
if cred_type == 'application':
credentials, project_id = google.auth.default(scopes=self.scopes)
return credentials
if cred_type == 'serviceaccount':
if self.get_option('service_account_file') is not None:
path = os.path.realpath(os.path.expanduser(self.get_option('service_account_file')))
try:
svc_acct_creds = service_account.Credentials.from_service_account_file(path)
except OSError as e:
raise GcpLookupException("Unable to read service_account_file at %s: %s" % (path, e.strerror))
elif self.get_option('service_account_contents') is not None:
try:
info = json.loads(self.get_option('service_account_contents'))
except json.decoder.JSONDecodeError as e:
raise GcpLookupException("Unable to decode service_account_contents as JSON: %s" % e)
svc_acct_creds = service_account.Credentials.from_service_account_info(info)
else:
raise GcpLookupException('Service Account authentication requires setting either service_account_file or service_account_contents')
return svc_acct_creds.with_scopes(self.scopes)
if cred_type == 'machineaccount':
self.svc_acct_creds = google.auth.compute_engine.Credentials(self.service_account_email)
return self.svc_acct_creds
raise GcpLookupException("Credential type '%s' not implemented" % cred_type)
return response.json()['payload']['data']

View file

@ -57,6 +57,10 @@ options:
description:
- Name of the secret to be used
type: str
aliases:
- key
- secret
- secret_id
value:
description:
- The secret value that the secret should have
@ -80,6 +84,13 @@ options:
- "all" is also acceptable on delete (which will delete all versions of a secret)
type: str
default: 'latest'
labels:
description:
- A set of key-value pairs to assign as labels to asecret
- only used in creation
- Note that the "value" piece of a label must contain only readable chars
type: dict
required: False
'''
EXAMPLES='''
@ -120,7 +131,6 @@ EXAMPLES='''
version: all
state: absent
- name: Get
'''
RETURN = '''
@ -258,10 +268,14 @@ def snake_to_camel(snake):
def create_secret(module):
# build the payload
payload = { "replication": { "automatic": {} } }
if module.params['labels']:
payload['labels'] = module.params['labels']
url = "https://secretmanager.googleapis.com/v1/projects/{project}/secrets".format(**module.params)
auth = get_auth(module)
post_response = auth.post(url, body=payload, params={'secretId': module.params['name']})
# validate create
module.raise_for_status(post_response)
return update_secret(module)
def update_secret(module):
@ -343,13 +357,15 @@ def main():
module = GcpModule(
argument_spec=dict(
state=dict(default='present', choices=['present', 'absent'], type='str'),
name=dict(required=True, type='str', aliases=['key', 'secret']),
name=dict(required=True, type='str', aliases=['key', 'secret', 'secret_id']),
value=dict(required=False, type='str'),
version=dict(required=False, type='str', default='latest'),
return_value=dict(required=False, type='bool', default=True)
return_value=dict(required=False, type='bool', default=True),
labels=dict(required=False, type='dict', default=dict())
)
)
if not module.params['scopes']:
module.params['scopes'] = ["https://www.googleapis.com/auth/cloud-platform"]