mirror of
https://github.com/ansible-collections/google.cloud.git
synced 2025-04-05 02:10:27 -07:00
Merge branch 'master' into issue-613
This commit is contained in:
commit
25d53ff320
7 changed files with 206 additions and 6 deletions
|
@ -0,0 +1,2 @@
|
|||
bugfixes:
|
||||
- gcp_bigquery_table - properly handle BigQuery table clustering fields
|
2
changelogs/gcp_pubsub_subscription_bugfix.yaml
Normal file
2
changelogs/gcp_pubsub_subscription_bugfix.yaml
Normal file
|
@ -0,0 +1,2 @@
|
|||
bugfixes:
|
||||
- gcp_pubsub_subscription - improper subscription uprade PATCH request
|
2
changelogs/gcp_pubsub_subscription_gcs_feature.yaml
Normal file
2
changelogs/gcp_pubsub_subscription_gcs_feature.yaml
Normal file
|
@ -0,0 +1,2 @@
|
|||
features:
|
||||
- gcp_pubsub_subscription - allows to create GCS subscription
|
|
@ -1172,7 +1172,7 @@ def resource_to_request(module):
|
|||
request = {
|
||||
u'kind': 'bigquery#table',
|
||||
u'tableReference': TableTablereference(module.params.get('table_reference', {}), module).to_request(),
|
||||
u'clustering': module.params.get('clustering'),
|
||||
u'clustering': TableClustering(module.params.get('clustering', {}), module).to_request(),
|
||||
u'description': module.params.get('description'),
|
||||
u'friendlyName': module.params.get('friendly_name'),
|
||||
u'labels': module.params.get('labels'),
|
||||
|
@ -1250,7 +1250,7 @@ def is_different(module, response):
|
|||
def response_to_hash(module, response):
|
||||
return {
|
||||
u'tableReference': TableTablereference(response.get(u'tableReference', {}), module).from_response(),
|
||||
u'clustering': response.get(u'clustering'),
|
||||
u'clustering': TableClustering(response.get(u'clustering', {}), module).from_response(),
|
||||
u'creationTime': response.get(u'creationTime'),
|
||||
u'description': response.get(u'description'),
|
||||
u'friendlyName': response.get(u'friendlyName'),
|
||||
|
@ -1716,5 +1716,20 @@ class TableColumnsArray(object):
|
|||
)
|
||||
|
||||
|
||||
class TableClustering(object):
|
||||
def __init__(self, request, module):
|
||||
self.module = module
|
||||
if request:
|
||||
self.request = request
|
||||
else:
|
||||
self.request = {}
|
||||
|
||||
def to_request(self):
|
||||
return remove_nones_from_dict({'fields': self.request})
|
||||
|
||||
def from_response(self):
|
||||
return remove_nones_from_dict({'fields': self.request})
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
|
|
@ -63,6 +63,65 @@ options:
|
|||
}}"'
|
||||
required: true
|
||||
type: dict
|
||||
cloud_storage:
|
||||
description:
|
||||
- Cloud Storage Subscription configuration.
|
||||
required: false
|
||||
type: dict
|
||||
suboptions:
|
||||
bucket:
|
||||
description:
|
||||
- A reference to a Bucket resource.
|
||||
- This field represents a name of a Bucket resource in GCP where messages read from a Topic will be stored.
|
||||
required: true
|
||||
type: str
|
||||
file_prefix:
|
||||
description:
|
||||
- File object name prefix stored in a Bucket.
|
||||
required: false
|
||||
type: str
|
||||
file_suffix:
|
||||
description:
|
||||
- File object name suffix stored in a Bucket.
|
||||
required: false
|
||||
type: str
|
||||
file_datetime_format:
|
||||
description:
|
||||
- File object datetime format stored in a Bucket.
|
||||
required: false
|
||||
type: str
|
||||
max_duration:
|
||||
description:
|
||||
- Subscription writes a new output file if the specified value of max duration is exceeded. Min 60s, max 600s.
|
||||
required: true
|
||||
type: str
|
||||
max_bytes:
|
||||
description:
|
||||
- Cloud Storage Subscription writes a new output file if the specified value of max bytes is exceeded. Min 1000, max 10737418240.
|
||||
required: false
|
||||
type: int
|
||||
max_messages:
|
||||
description:
|
||||
- Cloud Storage Subscription writes a new output file if the specified number of messages is exceeded. Min 1000.
|
||||
required: false
|
||||
type: int
|
||||
output_format:
|
||||
description:
|
||||
- Specify the format of the output files that are to be stored in a Cloud Storage bucket as text or avro.
|
||||
required: true
|
||||
type: str
|
||||
write_metadata:
|
||||
description:
|
||||
- This option allows to store the message metadata along with the message, e.g. message headers.
|
||||
- This field is valid for avro message format only.
|
||||
required: false
|
||||
type: bool
|
||||
use_topic_schema:
|
||||
description:
|
||||
- When true, the output Cloud Storage file will be serialized using the topic schema, if it exists.
|
||||
- This field is valid for avro message format only.
|
||||
required: false
|
||||
type: bool
|
||||
labels:
|
||||
description:
|
||||
- A set of key/value label pairs to assign to this Subscription.
|
||||
|
@ -576,6 +635,21 @@ def main():
|
|||
state=dict(default='present', choices=['present', 'absent'], type='str'),
|
||||
name=dict(required=True, type='str'),
|
||||
topic=dict(required=True, type='dict'),
|
||||
cloud_storage=dict(
|
||||
type='dict',
|
||||
options=dict(
|
||||
bucket=dict(required=True, type='str'),
|
||||
file_prefix=dict(type='str'),
|
||||
file_suffix=dict(type='str'),
|
||||
file_datetime_format=dict(type='str'),
|
||||
max_duration=dict(type='str'),
|
||||
max_bytes=dict(type='int'),
|
||||
max_messages=dict(type='int'),
|
||||
output_format=dict(type='str'),
|
||||
write_metadata=dict(type='bool'),
|
||||
use_topic_schema=dict(type='bool'),
|
||||
),
|
||||
),
|
||||
labels=dict(type='dict'),
|
||||
push_config=dict(
|
||||
type='dict',
|
||||
|
@ -636,13 +710,16 @@ def create(module, link):
|
|||
def update(module, link, fetch):
|
||||
auth = GcpSession(module, 'pubsub')
|
||||
params = {'updateMask': updateMask(resource_to_request(module), response_to_hash(module, fetch))}
|
||||
request = resource_to_request(module)
|
||||
del request['name']
|
||||
subscription = resource_to_request(module)
|
||||
del subscription['name']
|
||||
request = {'subscription': subscription}
|
||||
return return_if_object(module, auth.patch(link, request, params=params))
|
||||
|
||||
|
||||
def updateMask(request, response):
|
||||
update_mask = []
|
||||
if request.get('cloudStorageConfig') != response.get('cloudStorageConfig'):
|
||||
update_mask.append('cloudStorageConfig')
|
||||
if request.get('labels') != response.get('labels'):
|
||||
update_mask.append('labels')
|
||||
if request.get('pushConfig') != response.get('pushConfig'):
|
||||
|
@ -653,7 +730,7 @@ def updateMask(request, response):
|
|||
update_mask.append('messageRetentionDuration')
|
||||
if request.get('retainAckedMessages') != response.get('retainAckedMessages'):
|
||||
update_mask.append('retainAckedMessages')
|
||||
if request.get('expirationPolicy') != response.get('expirationPolicy'):
|
||||
if request.get('expirationPolicy') and request.get('expirationPolicy') != response.get('expirationPolicy'):
|
||||
update_mask.append('expirationPolicy')
|
||||
if request.get('deadLetterPolicy') != response.get('deadLetterPolicy'):
|
||||
update_mask.append('deadLetterPolicy')
|
||||
|
@ -672,6 +749,7 @@ def resource_to_request(module):
|
|||
u'name': name_pattern(module.params.get('name'), module),
|
||||
u'topic': topic_pattern(replace_resource_dict(module.params.get(u'topic', {}), 'name'), module),
|
||||
u'labels': module.params.get('labels'),
|
||||
u'cloudStorageConfig': SubscriptionCloudStorageConfig(module.params.get(u'cloud_storage', {}), module).to_request(),
|
||||
u'pushConfig': SubscriptionPushconfig(module.params.get('push_config', {}), module).to_request(),
|
||||
u'ackDeadlineSeconds': module.params.get('ack_deadline_seconds'),
|
||||
u'messageRetentionDuration': module.params.get('message_retention_duration'),
|
||||
|
@ -749,6 +827,7 @@ def response_to_hash(module, response):
|
|||
u'name': name_pattern(module.params.get('name'), module),
|
||||
u'topic': topic_pattern(replace_resource_dict(module.params.get(u'topic', {}), 'name'), module),
|
||||
u'labels': response.get(u'labels'),
|
||||
u'cloudStorageConfig': SubscriptionCloudStorageConfig(response.get(u'cloudStorageConfig', {}), module).from_response(),
|
||||
u'pushConfig': SubscriptionPushconfig(response.get(u'pushConfig', {}), module).from_response(),
|
||||
u'ackDeadlineSeconds': response.get(u'ackDeadlineSeconds'),
|
||||
u'messageRetentionDuration': response.get(u'messageRetentionDuration'),
|
||||
|
@ -840,7 +919,9 @@ class SubscriptionExpirationpolicy(object):
|
|||
self.request = {}
|
||||
|
||||
def to_request(self):
|
||||
return remove_nones_from_dict({u'ttl': self.request.get('ttl')})
|
||||
ttl = self.request.get('ttl')
|
||||
ttl = None if ttl == "" else ttl
|
||||
return remove_nones_from_dict({u'ttl': ttl})
|
||||
|
||||
def from_response(self):
|
||||
return remove_nones_from_dict({u'ttl': self.request.get(u'ttl')})
|
||||
|
@ -880,5 +961,47 @@ class SubscriptionRetrypolicy(object):
|
|||
return remove_nones_from_dict({u'minimumBackoff': self.request.get(u'minimumBackoff'), u'maximumBackoff': self.request.get(u'maximumBackoff')})
|
||||
|
||||
|
||||
class SubscriptionCloudStorageConfig(object):
|
||||
def __init__(self, request, module):
|
||||
self.module = module
|
||||
if request:
|
||||
self.request = request
|
||||
else:
|
||||
self.request = {}
|
||||
|
||||
def to_request(self):
|
||||
return remove_nones_from_dict(
|
||||
{
|
||||
u'bucket': self.request.get('bucket'),
|
||||
u'filenamePrefix': self.request.get('file_prefix', {}),
|
||||
u'filenameSuffix': self.request.get('file_suffix', {}),
|
||||
u'filenameDatetimeFormat': self.request.get('file_datetime_format', {}),
|
||||
u'maxDuration': self.request.get('max_duration', {}),
|
||||
u'maxBytes': self.request.get('max_bytes', {}),
|
||||
u'maxMessages': self.request.get('max_messages', {}),
|
||||
u'avroConfig': {'writeMetadata': self.request.get('write_metadata', False),
|
||||
'useTopicSchema': self.request.get('use_topic_schema', False)}
|
||||
if self.request.get('output_format', {}) == 'avro'
|
||||
else {},
|
||||
}
|
||||
)
|
||||
|
||||
def from_response(self):
|
||||
storageConfig = {
|
||||
u'bucket': self.request.get('bucket', {}),
|
||||
u'filenamePrefix': self.request.get('filenamePrefix', {}),
|
||||
u'filenameSuffix': self.request.get('filenameSuffix', {}),
|
||||
u'filenameDatetimeFormat': self.request.get('filenameDatetimeFormat', {}),
|
||||
u'maxDuration': self.request.get('maxDuration', {}),
|
||||
u'maxBytes': self.request.get('maxBytes', {}),
|
||||
u'maxMessages': self.request.get('maxMessages', {}),
|
||||
u'avroConfig': {'writeMetadata': self.request.get('avroConfig', {}).get('writeMetadata', False),
|
||||
'useTopicSchema': self.request.get('avroConfig', {}).get('useTopicSchema', False)}
|
||||
if self.request.get('avroConfig', {})
|
||||
else {},
|
||||
}
|
||||
return remove_nones_from_dict(storageConfig) if self.request else storageConfig
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
|
|
@ -32,6 +32,8 @@ SERVICE_LIST=(
|
|||
|
||||
REQUIRED_ROLE_LIST=(
|
||||
"roles/storage.objectAdmin"
|
||||
"roles/storage.legacyBucketReader"
|
||||
"roles/storage.objectCreator"
|
||||
"roles/source.admin"
|
||||
)
|
||||
|
||||
|
|
|
@ -21,6 +21,13 @@
|
|||
service_account_file: "{{ gcp_cred_file | default(omit) }}"
|
||||
state: present
|
||||
register: topic
|
||||
- name: Create a bucket
|
||||
google.cloud.gcp_storage_bucket:
|
||||
name: topic-subscription-bucket
|
||||
project: "{{ gcp_project }}"
|
||||
auth_kind: "{{ gcp_cred_kind }}"
|
||||
service_account_file: "{{ gcp_cred_file | default(omit) }}"
|
||||
state: present
|
||||
- name: Delete a subscription
|
||||
google.cloud.gcp_pubsub_subscription:
|
||||
name: "{{ resource_name }}"
|
||||
|
@ -73,6 +80,45 @@
|
|||
that:
|
||||
- result.changed == false
|
||||
#----------------------------------------------------------
|
||||
- name: Update a subscription
|
||||
google.cloud.gcp_pubsub_subscription:
|
||||
name: "{{ resource_name }}"
|
||||
topic: "{{ topic }}"
|
||||
ack_deadline_seconds: 60
|
||||
project: "{{ gcp_project }}"
|
||||
auth_kind: "{{ gcp_cred_kind }}"
|
||||
service_account_file: "{{ gcp_cred_file | default(omit) }}"
|
||||
state: present
|
||||
register: result
|
||||
- name: Assert changed is true
|
||||
ansible.builtin.assert:
|
||||
that:
|
||||
- result.changed == true
|
||||
#----------------------------------------------------------
|
||||
- name: Update cloudStorageConfig of a subscription that already exists
|
||||
google.cloud.gcp_pubsub_subscription:
|
||||
name: "{{ resource_name }}"
|
||||
topic: "{{ topic }}"
|
||||
ack_deadline_seconds: 300
|
||||
cloud_storage: {
|
||||
bucket: "topic-subscription-bucket",
|
||||
file_prefix: "test_",
|
||||
file_suffix: "_test",
|
||||
max_bytes: 10737418240,
|
||||
max_duration: "600s",
|
||||
output_format: "avro",
|
||||
write_metadata: true
|
||||
}
|
||||
project: "{{ gcp_project }}"
|
||||
auth_kind: "{{ gcp_cred_kind }}"
|
||||
service_account_file: "{{ gcp_cred_file | default(omit) }}"
|
||||
state: present
|
||||
register: result
|
||||
- name: Assert changed is true
|
||||
ansible.builtin.assert:
|
||||
that:
|
||||
- result.changed == true
|
||||
#----------------------------------------------------------
|
||||
- name: Delete a subscription
|
||||
google.cloud.gcp_pubsub_subscription:
|
||||
name: "{{ resource_name }}"
|
||||
|
@ -126,3 +172,11 @@
|
|||
state: absent
|
||||
register: topic
|
||||
ignore_errors: true
|
||||
|
||||
- name: Delete a bucket
|
||||
google.cloud.gcp_storage_bucket:
|
||||
name: topic-subscription-bucket
|
||||
project: "{{ gcp_project }}"
|
||||
auth_kind: "{{ gcp_cred_kind }}"
|
||||
service_account_file: "{{ gcp_cred_file | default(omit) }}"
|
||||
state: absent
|
||||
|
|
Loading…
Add table
Reference in a new issue