From 8b9a2c70dd442900c9de2fe51a13e8c347d3ca5d Mon Sep 17 00:00:00 2001 From: "Sieradzki, Lukasz" Date: Fri, 8 Nov 2024 18:46:49 +0100 Subject: [PATCH 1/5] https://github.com/ansible-collections/google.cloud/issues/657 --- .../gcp_pubsub_subscription_gcs_feature.yaml | 2 + plugins/modules/gcp_pubsub_subscription.py | 120 ++++++++++++++++++ .../gcp_pubsub_subscription/tasks/autogen.yml | 19 ++- 3 files changed, 139 insertions(+), 2 deletions(-) create mode 100644 changelogs/gcp_pubsub_subscription_gcs_feature.yaml diff --git a/changelogs/gcp_pubsub_subscription_gcs_feature.yaml b/changelogs/gcp_pubsub_subscription_gcs_feature.yaml new file mode 100644 index 0000000..c136289 --- /dev/null +++ b/changelogs/gcp_pubsub_subscription_gcs_feature.yaml @@ -0,0 +1,2 @@ +features: + - gcp_pubsub_subscription - allows to create GCS subscription \ No newline at end of file diff --git a/plugins/modules/gcp_pubsub_subscription.py b/plugins/modules/gcp_pubsub_subscription.py index 4f6b3b9..bc706f0 100644 --- a/plugins/modules/gcp_pubsub_subscription.py +++ b/plugins/modules/gcp_pubsub_subscription.py @@ -63,6 +63,65 @@ options: }}"' required: true type: dict + cloud_storage: + description: + - Cloud Storage Subscription configuration. + required: false + type: dict + suboptions: + bucket: + description: + - A reference to a Bucket resource. + - This field represents a name of a Bucket resource in GCP where messages read from a Topic will be stored. + required: true + type: str + file_prefix: + description: + - File object name prefix stored in a Bucket. + required: false + type: str + file_suffix: + description: + - File object name suffix stored in a Bucket. + required: false + type: str + file_datetime_format: + description: + - File object datetime format stored in a Bucket. + required: false + type: str + max_duration: + description: + - Subscription writes a new output file if the specified value of max duration is exceeded. Min: 1m, max: 10m. + required: true + type: str + max_bytes: + description: + - Cloud Storage Subscription writes a new output file if the specified value of max bytes is exceeded. Min: 1KB, max: 10GiB. + required: false + type: str + max_messages: + description: + - Cloud Storage Subscription writes a new output file if the specified number of messages is exceeded. Min: 1000. + required: false + type: int + output_format: + description: + - Specify the format of the output files that are to be stored in a Cloud Storage bucket as text or avro. + required: true + type: str + write_metadata: + description: + - This option allows to store the message metadata along with the message, e.g. message headers. + - This field is valid for avro message format only. + required: false + type: bool + use_topic_schema: + description: + - When true, the output Cloud Storage file will be serialized using the topic schema, if it exists. + - This field is valid for avro message format only. + required: false + type: bool labels: description: - A set of key/value label pairs to assign to this Subscription. @@ -576,6 +635,21 @@ def main(): state=dict(default='present', choices=['present', 'absent'], type='str'), name=dict(required=True, type='str'), topic=dict(required=True, type='dict'), + cloud_storage=dict( + type='dict', + options=dict( + bucket=dict(required=True, type='str'), + file_prefix=dict(type='str'), + file_suffix=dict(type='str'), + file_datetime_format=dict(type='str'), + max_duration=dict(type='str'), + max_bytes=dict(type='str'), + max_messages=dict(type='int'), + output_format=dict(type='str'), + write_metadata=dict(type='bool'), + use_topic_schema=dict(type='bool'), + ), + ), labels=dict(type='dict'), push_config=dict( type='dict', @@ -642,6 +716,8 @@ def update(module, link, fetch): def updateMask(request, response): update_mask = [] + if request.get('cloudStorageConfig') != response.get('cloudStorageConfig'): + update_mask.append('cloudStorageConfig') if request.get('labels') != response.get('labels'): update_mask.append('labels') if request.get('pushConfig') != response.get('pushConfig'): @@ -671,6 +747,7 @@ def resource_to_request(module): u'name': name_pattern(module.params.get('name'), module), u'topic': topic_pattern(replace_resource_dict(module.params.get(u'topic', {}), 'name'), module), u'labels': module.params.get('labels'), + u'cloudStorageConfig': SubscriptionCloudStorageConfig(module.params.get(u'cloud_storage', {}), module).to_request(), u'pushConfig': SubscriptionPushconfig(module.params.get('push_config', {}), module).to_request(), u'ackDeadlineSeconds': module.params.get('ack_deadline_seconds'), u'messageRetentionDuration': module.params.get('message_retention_duration'), @@ -748,6 +825,7 @@ def response_to_hash(module, response): u'name': name_pattern(module.params.get('name'), module), u'topic': topic_pattern(replace_resource_dict(module.params.get(u'topic', {}), 'name'), module), u'labels': response.get(u'labels'), + u'cloudStorageConfig': SubscriptionCloudStorageConfig(response.get(u'cloudStorageConfig', {}), module).from_response(), u'pushConfig': SubscriptionPushconfig(response.get(u'pushConfig', {}), module).from_response(), u'ackDeadlineSeconds': response.get(u'ackDeadlineSeconds'), u'messageRetentionDuration': response.get(u'messageRetentionDuration'), @@ -880,6 +958,48 @@ class SubscriptionRetrypolicy(object): def from_response(self): return remove_nones_from_dict({u'minimumBackoff': self.request.get(u'minimumBackoff'), u'maximumBackoff': self.request.get(u'maximumBackoff')}) +class SubscriptionCloudStorageConfig(object): + def __init__(self, request, module): + self.module = module + if request: + self.request = request + else: + self.request = {} + + def to_request(self): + return remove_nones_from_dict( + { + u'bucket': self.request.get('bucket'), + u'filenamePrefix': self.request.get('file_prefix', {}), + u'filenameSuffix': self.request.get('file_suffix', {}), + u'filenameDatetimeFormat': self.request.get('file_datetime_format', {}), + u'maxDuration': self.request.get('max_duration', {}), + u'maxBytes': self.request.get('max_bytes', {}), + u'maxMessages': self.request.get('max_messages', {}), + u'avroConfig': {'writeMetadata': self.request.get('write_metadata', False), + 'useTopicSchema': self.request.get('use_topic_schema', False)} + if self.request.get('output_format', {}) == 'avro' + else {}, + } + ) + + def from_response(self): + return remove_nones_from_dict( + { + u'bucket': self.request.get('bucket'), + u'filenamePrefix': self.request.get('filenamePrefix', {}), + u'filenameSuffix': self.request.get('filenameSuffix', {}), + u'filenameDatetimeFormat': self.request.get('filenameDatetimeFormat', {}), + u'maxDuration': self.request.get('maxDuration', {}), + u'maxBytes': self.request.get('maxBytes', {}), + u'maxMessages': self.request.get('maxMessages', {}), + u'avroConfig': {'writeMetadata': self.request.get('avroConfig', {}).get('writeMetadata', False), + 'useTopicSchema': self.request.get('avroConfig', {}).get('useTopicSchema', False)} + if self.request.get('avroConfig', {}) + else {}, + } + ) + if __name__ == '__main__': main() diff --git a/tests/integration/targets/gcp_pubsub_subscription/tasks/autogen.yml b/tests/integration/targets/gcp_pubsub_subscription/tasks/autogen.yml index 6aeaa58..b431e27 100644 --- a/tests/integration/targets/gcp_pubsub_subscription/tasks/autogen.yml +++ b/tests/integration/targets/gcp_pubsub_subscription/tasks/autogen.yml @@ -73,11 +73,26 @@ that: - result.changed == false #---------------------------------------------------------- +- name: Update ack_deadline_seconds of a subscription that already exists + google.cloud.gcp_pubsub_subscription: + name: "{{ resource_name }}" + topic: "{{ topic }}" + ack_deadline_seconds: 500 + project: "{{ gcp_project }}" + auth_kind: "{{ gcp_cred_kind }}" + service_account_file: "{{ gcp_cred_file | default(omit) }}" + state: present + register: result +- name: Assert changed is true + ansible.builtin.assert: + that: + - result.changed == true +#---------------------------------------------------------- - name: Delete a subscription google.cloud.gcp_pubsub_subscription: name: "{{ resource_name }}" topic: "{{ topic }}" - ack_deadline_seconds: 300 + ack_deadline_seconds: 500 project: "{{ gcp_project }}" auth_kind: "{{ gcp_cred_kind }}" service_account_file: "{{ gcp_cred_file | default(omit) }}" @@ -104,7 +119,7 @@ google.cloud.gcp_pubsub_subscription: name: "{{ resource_name }}" topic: "{{ topic }}" - ack_deadline_seconds: 300 + ack_deadline_seconds: 500 project: "{{ gcp_project }}" auth_kind: "{{ gcp_cred_kind }}" service_account_file: "{{ gcp_cred_file | default(omit) }}" From 2b35fbf4047487d7cc9067cebefc119ac853e537 Mon Sep 17 00:00:00 2001 From: "Sieradzki, Lukasz" Date: Fri, 8 Nov 2024 23:02:23 +0100 Subject: [PATCH 2/5] https://github.com/ansible-collections/google.cloud/issues/657 --- plugins/modules/gcp_pubsub_subscription.py | 22 ++++++++++--- scripts/bootstrap-project.sh | 2 ++ .../gcp_pubsub_subscription/tasks/autogen.yml | 32 ++++++++++++++++--- 3 files changed, 48 insertions(+), 8 deletions(-) diff --git a/plugins/modules/gcp_pubsub_subscription.py b/plugins/modules/gcp_pubsub_subscription.py index bc706f0..f55c0d5 100644 --- a/plugins/modules/gcp_pubsub_subscription.py +++ b/plugins/modules/gcp_pubsub_subscription.py @@ -92,14 +92,14 @@ options: type: str max_duration: description: - - Subscription writes a new output file if the specified value of max duration is exceeded. Min: 1m, max: 10m. + - Subscription writes a new output file if the specified value of max duration is exceeded. Min: 60s, max: 600s. required: true type: str max_bytes: description: - - Cloud Storage Subscription writes a new output file if the specified value of max bytes is exceeded. Min: 1KB, max: 10GiB. + - Cloud Storage Subscription writes a new output file if the specified value of max bytes is exceeded. Min: 1000, max: 10737418240. required: false - type: str + type: int max_messages: description: - Cloud Storage Subscription writes a new output file if the specified number of messages is exceeded. Min: 1000. @@ -643,7 +643,7 @@ def main(): file_suffix=dict(type='str'), file_datetime_format=dict(type='str'), max_duration=dict(type='str'), - max_bytes=dict(type='str'), + max_bytes=dict(type='int'), max_messages=dict(type='int'), output_format=dict(type='str'), write_metadata=dict(type='bool'), @@ -999,6 +999,20 @@ class SubscriptionCloudStorageConfig(object): else {}, } ) + storageConfig = { + u'bucket': self.request.get('bucket', {}), + u'filenamePrefix': self.request.get('filenamePrefix', {}), + u'filenameSuffix': self.request.get('filenameSuffix', {}), + u'filenameDatetimeFormat': self.request.get('filenameDatetimeFormat', {}), + u'maxDuration': self.request.get('maxDuration', {}), + u'maxBytes': self.request.get('maxBytes', {}), + u'maxMessages': self.request.get('maxMessages', {}), + u'avroConfig': {'writeMetadata': self.request.get('avroConfig', {}).get('writeMetadata', False), + 'useTopicSchema': self.request.get('avroConfig', {}).get('useTopicSchema', False)} + if self.request.get('avroConfig', {}) + else {}, + } + return remove_nones_from_dict(storageConfig) if self.request else storageConfig if __name__ == '__main__': diff --git a/scripts/bootstrap-project.sh b/scripts/bootstrap-project.sh index a28f42c..3e90c21 100755 --- a/scripts/bootstrap-project.sh +++ b/scripts/bootstrap-project.sh @@ -32,6 +32,8 @@ SERVICE_LIST=( REQUIRED_ROLE_LIST=( "roles/storage.objectAdmin" + "roles/storage.legacyBucketReader" + "roles/storage.objectCreator" "roles/source.admin" ) diff --git a/tests/integration/targets/gcp_pubsub_subscription/tasks/autogen.yml b/tests/integration/targets/gcp_pubsub_subscription/tasks/autogen.yml index b431e27..bdce0e6 100644 --- a/tests/integration/targets/gcp_pubsub_subscription/tasks/autogen.yml +++ b/tests/integration/targets/gcp_pubsub_subscription/tasks/autogen.yml @@ -21,6 +21,13 @@ service_account_file: "{{ gcp_cred_file | default(omit) }}" state: present register: topic +- name: Create a bucket + google.cloud.gcp_storage_bucket: + name: topic-subscription-bucket + project: "{{ gcp_project }}" + auth_kind: "{{ gcp_cred_kind }}" + service_account_file: "{{ gcp_cred_file | default(omit) }}" + state: present - name: Delete a subscription google.cloud.gcp_pubsub_subscription: name: "{{ resource_name }}" @@ -73,11 +80,20 @@ that: - result.changed == false #---------------------------------------------------------- -- name: Update ack_deadline_seconds of a subscription that already exists +- name: Update cloudStorageConfig of a subscription that already exists google.cloud.gcp_pubsub_subscription: name: "{{ resource_name }}" topic: "{{ topic }}" - ack_deadline_seconds: 500 + ack_deadline_seconds: 300 + cloud_storage: { + bucket: "topic-subscription-bucket", + file_prefix: "test_", + file_suffix: "_test", + max_bytes: 10737418240, + max_duration: "600s", + output_format: "avro", + write_metadata: true + } project: "{{ gcp_project }}" auth_kind: "{{ gcp_cred_kind }}" service_account_file: "{{ gcp_cred_file | default(omit) }}" @@ -92,7 +108,7 @@ google.cloud.gcp_pubsub_subscription: name: "{{ resource_name }}" topic: "{{ topic }}" - ack_deadline_seconds: 500 + ack_deadline_seconds: 300 project: "{{ gcp_project }}" auth_kind: "{{ gcp_cred_kind }}" service_account_file: "{{ gcp_cred_file | default(omit) }}" @@ -119,7 +135,7 @@ google.cloud.gcp_pubsub_subscription: name: "{{ resource_name }}" topic: "{{ topic }}" - ack_deadline_seconds: 500 + ack_deadline_seconds: 300 project: "{{ gcp_project }}" auth_kind: "{{ gcp_cred_kind }}" service_account_file: "{{ gcp_cred_file | default(omit) }}" @@ -141,3 +157,11 @@ state: absent register: topic ignore_errors: true + +- name: Delete a bucket + google.cloud.gcp_storage_bucket: + name: topic-subscription-bucket + project: "{{ gcp_project }}" + auth_kind: "{{ gcp_cred_kind }}" + service_account_file: "{{ gcp_cred_file | default(omit) }}" + state: absent From e89571eb914e2f277f5bd7994f15e535f869b7f2 Mon Sep 17 00:00:00 2001 From: "Sieradzki, Lukasz" Date: Fri, 8 Nov 2024 23:03:14 +0100 Subject: [PATCH 3/5] https://github.com/ansible-collections/google.cloud/issues/657 --- plugins/modules/gcp_pubsub_subscription.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/plugins/modules/gcp_pubsub_subscription.py b/plugins/modules/gcp_pubsub_subscription.py index f55c0d5..fa1420a 100644 --- a/plugins/modules/gcp_pubsub_subscription.py +++ b/plugins/modules/gcp_pubsub_subscription.py @@ -984,21 +984,6 @@ class SubscriptionCloudStorageConfig(object): ) def from_response(self): - return remove_nones_from_dict( - { - u'bucket': self.request.get('bucket'), - u'filenamePrefix': self.request.get('filenamePrefix', {}), - u'filenameSuffix': self.request.get('filenameSuffix', {}), - u'filenameDatetimeFormat': self.request.get('filenameDatetimeFormat', {}), - u'maxDuration': self.request.get('maxDuration', {}), - u'maxBytes': self.request.get('maxBytes', {}), - u'maxMessages': self.request.get('maxMessages', {}), - u'avroConfig': {'writeMetadata': self.request.get('avroConfig', {}).get('writeMetadata', False), - 'useTopicSchema': self.request.get('avroConfig', {}).get('useTopicSchema', False)} - if self.request.get('avroConfig', {}) - else {}, - } - ) storageConfig = { u'bucket': self.request.get('bucket', {}), u'filenamePrefix': self.request.get('filenamePrefix', {}), From 68c9af276c65e49c173463ccb74906fa60d4f3b0 Mon Sep 17 00:00:00 2001 From: Chris Hawk Date: Mon, 11 Nov 2024 16:40:47 -0800 Subject: [PATCH 4/5] Update gcp_pubsub_subscription.py Fix lint errors --- plugins/modules/gcp_pubsub_subscription.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/plugins/modules/gcp_pubsub_subscription.py b/plugins/modules/gcp_pubsub_subscription.py index fa1420a..01c0147 100644 --- a/plugins/modules/gcp_pubsub_subscription.py +++ b/plugins/modules/gcp_pubsub_subscription.py @@ -89,7 +89,7 @@ options: description: - File object datetime format stored in a Bucket. required: false - type: str + type: str max_duration: description: - Subscription writes a new output file if the specified value of max duration is exceeded. Min: 60s, max: 600s. @@ -104,7 +104,7 @@ options: description: - Cloud Storage Subscription writes a new output file if the specified number of messages is exceeded. Min: 1000. required: false - type: int + type: int output_format: description: - Specify the format of the output files that are to be stored in a Cloud Storage bucket as text or avro. @@ -958,6 +958,7 @@ class SubscriptionRetrypolicy(object): def from_response(self): return remove_nones_from_dict({u'minimumBackoff': self.request.get(u'minimumBackoff'), u'maximumBackoff': self.request.get(u'maximumBackoff')}) + class SubscriptionCloudStorageConfig(object): def __init__(self, request, module): self.module = module From d719b0efaaeffe33f565ac9b495e8f606f555768 Mon Sep 17 00:00:00 2001 From: Chris Hawk Date: Mon, 11 Nov 2024 16:49:34 -0800 Subject: [PATCH 5/5] Fix a YAML doc parsing error in gcp_pubsub_subscription.py --- plugins/modules/gcp_pubsub_subscription.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/modules/gcp_pubsub_subscription.py b/plugins/modules/gcp_pubsub_subscription.py index 01c0147..41a0c2d 100644 --- a/plugins/modules/gcp_pubsub_subscription.py +++ b/plugins/modules/gcp_pubsub_subscription.py @@ -92,17 +92,17 @@ options: type: str max_duration: description: - - Subscription writes a new output file if the specified value of max duration is exceeded. Min: 60s, max: 600s. + - Subscription writes a new output file if the specified value of max duration is exceeded. Min 60s, max 600s. required: true type: str max_bytes: description: - - Cloud Storage Subscription writes a new output file if the specified value of max bytes is exceeded. Min: 1000, max: 10737418240. + - Cloud Storage Subscription writes a new output file if the specified value of max bytes is exceeded. Min 1000, max 10737418240. required: false type: int max_messages: description: - - Cloud Storage Subscription writes a new output file if the specified number of messages is exceeded. Min: 1000. + - Cloud Storage Subscription writes a new output file if the specified number of messages is exceeded. Min 1000. required: false type: int output_format: