Merge pull request #658 from lsieradzki/feature/#657

https://github.com/ansible-collections/google.cloud/issues/657
This commit is contained in:
Chris Hawk 2024-11-11 16:55:06 -08:00 committed by GitHub
commit 850e4c6ea5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 163 additions and 0 deletions

View file

@ -0,0 +1,2 @@
features:
- gcp_pubsub_subscription - allows to create GCS subscription

View file

@ -63,6 +63,65 @@ options:
}}"'
required: true
type: dict
cloud_storage:
description:
- Cloud Storage Subscription configuration.
required: false
type: dict
suboptions:
bucket:
description:
- A reference to a Bucket resource.
- This field represents a name of a Bucket resource in GCP where messages read from a Topic will be stored.
required: true
type: str
file_prefix:
description:
- File object name prefix stored in a Bucket.
required: false
type: str
file_suffix:
description:
- File object name suffix stored in a Bucket.
required: false
type: str
file_datetime_format:
description:
- File object datetime format stored in a Bucket.
required: false
type: str
max_duration:
description:
- Subscription writes a new output file if the specified value of max duration is exceeded. Min 60s, max 600s.
required: true
type: str
max_bytes:
description:
- Cloud Storage Subscription writes a new output file if the specified value of max bytes is exceeded. Min 1000, max 10737418240.
required: false
type: int
max_messages:
description:
- Cloud Storage Subscription writes a new output file if the specified number of messages is exceeded. Min 1000.
required: false
type: int
output_format:
description:
- Specify the format of the output files that are to be stored in a Cloud Storage bucket as text or avro.
required: true
type: str
write_metadata:
description:
- This option allows to store the message metadata along with the message, e.g. message headers.
- This field is valid for avro message format only.
required: false
type: bool
use_topic_schema:
description:
- When true, the output Cloud Storage file will be serialized using the topic schema, if it exists.
- This field is valid for avro message format only.
required: false
type: bool
labels:
description:
- A set of key/value label pairs to assign to this Subscription.
@ -576,6 +635,21 @@ def main():
state=dict(default='present', choices=['present', 'absent'], type='str'),
name=dict(required=True, type='str'),
topic=dict(required=True, type='dict'),
cloud_storage=dict(
type='dict',
options=dict(
bucket=dict(required=True, type='str'),
file_prefix=dict(type='str'),
file_suffix=dict(type='str'),
file_datetime_format=dict(type='str'),
max_duration=dict(type='str'),
max_bytes=dict(type='int'),
max_messages=dict(type='int'),
output_format=dict(type='str'),
write_metadata=dict(type='bool'),
use_topic_schema=dict(type='bool'),
),
),
labels=dict(type='dict'),
push_config=dict(
type='dict',
@ -642,6 +716,8 @@ def update(module, link, fetch):
def updateMask(request, response):
update_mask = []
if request.get('cloudStorageConfig') != response.get('cloudStorageConfig'):
update_mask.append('cloudStorageConfig')
if request.get('labels') != response.get('labels'):
update_mask.append('labels')
if request.get('pushConfig') != response.get('pushConfig'):
@ -671,6 +747,7 @@ def resource_to_request(module):
u'name': name_pattern(module.params.get('name'), module),
u'topic': topic_pattern(replace_resource_dict(module.params.get(u'topic', {}), 'name'), module),
u'labels': module.params.get('labels'),
u'cloudStorageConfig': SubscriptionCloudStorageConfig(module.params.get(u'cloud_storage', {}), module).to_request(),
u'pushConfig': SubscriptionPushconfig(module.params.get('push_config', {}), module).to_request(),
u'ackDeadlineSeconds': module.params.get('ack_deadline_seconds'),
u'messageRetentionDuration': module.params.get('message_retention_duration'),
@ -748,6 +825,7 @@ def response_to_hash(module, response):
u'name': name_pattern(module.params.get('name'), module),
u'topic': topic_pattern(replace_resource_dict(module.params.get(u'topic', {}), 'name'), module),
u'labels': response.get(u'labels'),
u'cloudStorageConfig': SubscriptionCloudStorageConfig(response.get(u'cloudStorageConfig', {}), module).from_response(),
u'pushConfig': SubscriptionPushconfig(response.get(u'pushConfig', {}), module).from_response(),
u'ackDeadlineSeconds': response.get(u'ackDeadlineSeconds'),
u'messageRetentionDuration': response.get(u'messageRetentionDuration'),
@ -881,5 +959,47 @@ class SubscriptionRetrypolicy(object):
return remove_nones_from_dict({u'minimumBackoff': self.request.get(u'minimumBackoff'), u'maximumBackoff': self.request.get(u'maximumBackoff')})
class SubscriptionCloudStorageConfig(object):
def __init__(self, request, module):
self.module = module
if request:
self.request = request
else:
self.request = {}
def to_request(self):
return remove_nones_from_dict(
{
u'bucket': self.request.get('bucket'),
u'filenamePrefix': self.request.get('file_prefix', {}),
u'filenameSuffix': self.request.get('file_suffix', {}),
u'filenameDatetimeFormat': self.request.get('file_datetime_format', {}),
u'maxDuration': self.request.get('max_duration', {}),
u'maxBytes': self.request.get('max_bytes', {}),
u'maxMessages': self.request.get('max_messages', {}),
u'avroConfig': {'writeMetadata': self.request.get('write_metadata', False),
'useTopicSchema': self.request.get('use_topic_schema', False)}
if self.request.get('output_format', {}) == 'avro'
else {},
}
)
def from_response(self):
storageConfig = {
u'bucket': self.request.get('bucket', {}),
u'filenamePrefix': self.request.get('filenamePrefix', {}),
u'filenameSuffix': self.request.get('filenameSuffix', {}),
u'filenameDatetimeFormat': self.request.get('filenameDatetimeFormat', {}),
u'maxDuration': self.request.get('maxDuration', {}),
u'maxBytes': self.request.get('maxBytes', {}),
u'maxMessages': self.request.get('maxMessages', {}),
u'avroConfig': {'writeMetadata': self.request.get('avroConfig', {}).get('writeMetadata', False),
'useTopicSchema': self.request.get('avroConfig', {}).get('useTopicSchema', False)}
if self.request.get('avroConfig', {})
else {},
}
return remove_nones_from_dict(storageConfig) if self.request else storageConfig
if __name__ == '__main__':
main()

View file

@ -32,6 +32,8 @@ SERVICE_LIST=(
REQUIRED_ROLE_LIST=(
"roles/storage.objectAdmin"
"roles/storage.legacyBucketReader"
"roles/storage.objectCreator"
"roles/source.admin"
)

View file

@ -21,6 +21,13 @@
service_account_file: "{{ gcp_cred_file | default(omit) }}"
state: present
register: topic
- name: Create a bucket
google.cloud.gcp_storage_bucket:
name: topic-subscription-bucket
project: "{{ gcp_project }}"
auth_kind: "{{ gcp_cred_kind }}"
service_account_file: "{{ gcp_cred_file | default(omit) }}"
state: present
- name: Delete a subscription
google.cloud.gcp_pubsub_subscription:
name: "{{ resource_name }}"
@ -88,6 +95,30 @@
that:
- result.changed == true
#----------------------------------------------------------
- name: Update cloudStorageConfig of a subscription that already exists
google.cloud.gcp_pubsub_subscription:
name: "{{ resource_name }}"
topic: "{{ topic }}"
ack_deadline_seconds: 300
cloud_storage: {
bucket: "topic-subscription-bucket",
file_prefix: "test_",
file_suffix: "_test",
max_bytes: 10737418240,
max_duration: "600s",
output_format: "avro",
write_metadata: true
}
project: "{{ gcp_project }}"
auth_kind: "{{ gcp_cred_kind }}"
service_account_file: "{{ gcp_cred_file | default(omit) }}"
state: present
register: result
- name: Assert changed is true
ansible.builtin.assert:
that:
- result.changed == true
#----------------------------------------------------------
- name: Delete a subscription
google.cloud.gcp_pubsub_subscription:
name: "{{ resource_name }}"
@ -141,3 +172,11 @@
state: absent
register: topic
ignore_errors: true
- name: Delete a bucket
google.cloud.gcp_storage_bucket:
name: topic-subscription-bucket
project: "{{ gcp_project }}"
auth_kind: "{{ gcp_cred_kind }}"
service_account_file: "{{ gcp_cred_file | default(omit) }}"
state: absent