mirror of
https://github.com/ansible-collections/community.general.git
synced 2025-07-22 21:00:22 -07:00
parent
887456ab8e
commit
02f66b9369
38 changed files with 865 additions and 888 deletions
|
@ -178,10 +178,10 @@ CLOUD_CLIENT_MINIMUM_VERSION = '0.22.0'
|
|||
CLOUD_CLIENT_USER_AGENT = 'ansible-pubsub-0.1'
|
||||
|
||||
try:
|
||||
from ast import literal_eval
|
||||
HAS_PYTHON26 = True
|
||||
from ast import literal_eval
|
||||
HAS_PYTHON26 = True
|
||||
except ImportError:
|
||||
HAS_PYTHON26 = False
|
||||
HAS_PYTHON26 = False
|
||||
|
||||
try:
|
||||
from google.cloud import pubsub
|
||||
|
@ -191,137 +191,137 @@ except ImportError as e:
|
|||
|
||||
|
||||
def publish_messages(message_list, topic):
|
||||
with topic.batch() as batch:
|
||||
for message in message_list:
|
||||
msg = message['message']
|
||||
attrs = {}
|
||||
if 'attributes' in message:
|
||||
attrs = message['attributes']
|
||||
batch.publish(bytes(msg), **attrs)
|
||||
return True
|
||||
with topic.batch() as batch:
|
||||
for message in message_list:
|
||||
msg = message['message']
|
||||
attrs = {}
|
||||
if 'attributes' in message:
|
||||
attrs = message['attributes']
|
||||
batch.publish(bytes(msg), **attrs)
|
||||
return True
|
||||
|
||||
def pull_messages(pull_params, sub):
|
||||
"""
|
||||
:rtype: tuple (output, changed)
|
||||
"""
|
||||
changed = False
|
||||
max_messages=pull_params.get('max_messages', None)
|
||||
message_ack = pull_params.get('message_ack', 'no')
|
||||
return_immediately = pull_params.get('return_immediately', False)
|
||||
"""
|
||||
:rtype: tuple (output, changed)
|
||||
"""
|
||||
changed = False
|
||||
max_messages=pull_params.get('max_messages', None)
|
||||
message_ack = pull_params.get('message_ack', 'no')
|
||||
return_immediately = pull_params.get('return_immediately', False)
|
||||
|
||||
output= []
|
||||
pulled = sub.pull(return_immediately=return_immediately,
|
||||
max_messages=max_messages)
|
||||
output= []
|
||||
pulled = sub.pull(return_immediately=return_immediately,
|
||||
max_messages=max_messages)
|
||||
|
||||
for ack_id, msg in pulled:
|
||||
msg_dict = {'message_id': msg.message_id,
|
||||
'attributes': msg.attributes,
|
||||
'data': msg.data,
|
||||
'ack_id': ack_id }
|
||||
output.append(msg_dict)
|
||||
for ack_id, msg in pulled:
|
||||
msg_dict = {'message_id': msg.message_id,
|
||||
'attributes': msg.attributes,
|
||||
'data': msg.data,
|
||||
'ack_id': ack_id }
|
||||
output.append(msg_dict)
|
||||
|
||||
if message_ack:
|
||||
ack_ids = [m['ack_id'] for m in output]
|
||||
if ack_ids:
|
||||
sub.acknowledge(ack_ids)
|
||||
changed = True
|
||||
return (output, changed)
|
||||
if message_ack:
|
||||
ack_ids = [m['ack_id'] for m in output]
|
||||
if ack_ids:
|
||||
sub.acknowledge(ack_ids)
|
||||
changed = True
|
||||
return (output, changed)
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
module = AnsibleModule(argument_spec=dict(
|
||||
topic=dict(required=True),
|
||||
state=dict(choices=['absent', 'present'], default='present'),
|
||||
publish=dict(type='list', default=None),
|
||||
subscription=dict(type='dict', default=None),
|
||||
service_account_email=dict(),
|
||||
credentials_file=dict(),
|
||||
project_id=dict(), ),)
|
||||
module = AnsibleModule(argument_spec=dict(
|
||||
topic=dict(required=True),
|
||||
state=dict(choices=['absent', 'present'], default='present'),
|
||||
publish=dict(type='list', default=None),
|
||||
subscription=dict(type='dict', default=None),
|
||||
service_account_email=dict(),
|
||||
credentials_file=dict(),
|
||||
project_id=dict(), ),)
|
||||
|
||||
if not HAS_PYTHON26:
|
||||
module.fail_json(
|
||||
msg="GCE module requires python's 'ast' module, python v2.6+")
|
||||
if not HAS_PYTHON26:
|
||||
module.fail_json(
|
||||
msg="GCE module requires python's 'ast' module, python v2.6+")
|
||||
|
||||
if not HAS_GOOGLE_CLOUD_PUBSUB:
|
||||
module.fail_json(msg="Please install google-cloud-pubsub library.")
|
||||
if not HAS_GOOGLE_CLOUD_PUBSUB:
|
||||
module.fail_json(msg="Please install google-cloud-pubsub library.")
|
||||
|
||||
if not check_min_pkg_version(CLOUD_CLIENT, CLOUD_CLIENT_MINIMUM_VERSION):
|
||||
if not check_min_pkg_version(CLOUD_CLIENT, CLOUD_CLIENT_MINIMUM_VERSION):
|
||||
module.fail_json(msg="Please install %s client version %s" % (CLOUD_CLIENT, CLOUD_CLIENT_MINIMUM_VERSION))
|
||||
|
||||
mod_params = {}
|
||||
mod_params['publish'] = module.params.get('publish')
|
||||
mod_params['state'] = module.params.get('state')
|
||||
mod_params['topic'] = module.params.get('topic')
|
||||
mod_params['subscription'] = module.params.get('subscription')
|
||||
mod_params = {}
|
||||
mod_params['publish'] = module.params.get('publish')
|
||||
mod_params['state'] = module.params.get('state')
|
||||
mod_params['topic'] = module.params.get('topic')
|
||||
mod_params['subscription'] = module.params.get('subscription')
|
||||
|
||||
creds, params = get_google_cloud_credentials(module)
|
||||
pubsub_client = pubsub.Client(project=params['project_id'], credentials=creds, use_gax=False)
|
||||
pubsub_client.user_agent = CLOUD_CLIENT_USER_AGENT
|
||||
creds, params = get_google_cloud_credentials(module)
|
||||
pubsub_client = pubsub.Client(project=params['project_id'], credentials=creds, use_gax=False)
|
||||
pubsub_client.user_agent = CLOUD_CLIENT_USER_AGENT
|
||||
|
||||
changed = False
|
||||
json_output = {}
|
||||
changed = False
|
||||
json_output = {}
|
||||
|
||||
t = None
|
||||
if mod_params['topic']:
|
||||
t = pubsub_client.topic(mod_params['topic'])
|
||||
s = None
|
||||
if mod_params['subscription']:
|
||||
# Note: default ack deadline cannot be changed without deleting/recreating subscription
|
||||
s = t.subscription(mod_params['subscription']['name'],
|
||||
ack_deadline=mod_params['subscription'].get('ack_deadline', None),
|
||||
push_endpoint=mod_params['subscription'].get('push_endpoint', None))
|
||||
t = None
|
||||
if mod_params['topic']:
|
||||
t = pubsub_client.topic(mod_params['topic'])
|
||||
s = None
|
||||
if mod_params['subscription']:
|
||||
# Note: default ack deadline cannot be changed without deleting/recreating subscription
|
||||
s = t.subscription(mod_params['subscription']['name'],
|
||||
ack_deadline=mod_params['subscription'].get('ack_deadline', None),
|
||||
push_endpoint=mod_params['subscription'].get('push_endpoint', None))
|
||||
|
||||
if mod_params['state'] == 'absent':
|
||||
# Remove the most granular resource. If subcription is specified
|
||||
# we remove it. If only topic is specified, that is what is removed.
|
||||
# Note that a topic can be removed without first removing the subscription.
|
||||
# TODO(supertom): Enhancement: Provide an option to only delete a topic
|
||||
# if there are no subscriptions associated with it (which the API does not support).
|
||||
if s is not None:
|
||||
if s.exists():
|
||||
s.delete()
|
||||
if mod_params['state'] == 'absent':
|
||||
# Remove the most granular resource. If subcription is specified
|
||||
# we remove it. If only topic is specified, that is what is removed.
|
||||
# Note that a topic can be removed without first removing the subscription.
|
||||
# TODO(supertom): Enhancement: Provide an option to only delete a topic
|
||||
# if there are no subscriptions associated with it (which the API does not support).
|
||||
if s is not None:
|
||||
if s.exists():
|
||||
s.delete()
|
||||
changed = True
|
||||
else:
|
||||
if t.exists():
|
||||
t.delete()
|
||||
changed = True
|
||||
elif mod_params['state'] == 'present':
|
||||
if not t.exists():
|
||||
t.create()
|
||||
changed = True
|
||||
else:
|
||||
if t.exists():
|
||||
t.delete()
|
||||
changed = True
|
||||
elif mod_params['state'] == 'present':
|
||||
if not t.exists():
|
||||
t.create()
|
||||
changed = True
|
||||
if s:
|
||||
if not s.exists():
|
||||
s.create()
|
||||
s.reload()
|
||||
changed = True
|
||||
else:
|
||||
# Subscription operations
|
||||
# TODO(supertom): if more 'update' operations arise, turn this into a function.
|
||||
s.reload()
|
||||
push_endpoint=mod_params['subscription'].get('push_endpoint', None)
|
||||
if push_endpoint is not None:
|
||||
if push_endpoint != s.push_endpoint:
|
||||
if push_endpoint == 'None':
|
||||
push_endpoint = None
|
||||
s.modify_push_configuration(push_endpoint=push_endpoint)
|
||||
s.reload()
|
||||
changed = push_endpoint == s.push_endpoint
|
||||
if s:
|
||||
if not s.exists():
|
||||
s.create()
|
||||
s.reload()
|
||||
changed = True
|
||||
else:
|
||||
# Subscription operations
|
||||
# TODO(supertom): if more 'update' operations arise, turn this into a function.
|
||||
s.reload()
|
||||
push_endpoint=mod_params['subscription'].get('push_endpoint', None)
|
||||
if push_endpoint is not None:
|
||||
if push_endpoint != s.push_endpoint:
|
||||
if push_endpoint == 'None':
|
||||
push_endpoint = None
|
||||
s.modify_push_configuration(push_endpoint=push_endpoint)
|
||||
s.reload()
|
||||
changed = push_endpoint == s.push_endpoint
|
||||
|
||||
if 'pull' in mod_params['subscription']:
|
||||
if s.push_endpoint is not None:
|
||||
module.fail_json(msg="Cannot pull messages, push_endpoint is configured.")
|
||||
(json_output['pulled_messages'], changed) = pull_messages(
|
||||
mod_params['subscription']['pull'], s)
|
||||
if 'pull' in mod_params['subscription']:
|
||||
if s.push_endpoint is not None:
|
||||
module.fail_json(msg="Cannot pull messages, push_endpoint is configured.")
|
||||
(json_output['pulled_messages'], changed) = pull_messages(
|
||||
mod_params['subscription']['pull'], s)
|
||||
|
||||
# publish messages to the topic
|
||||
if mod_params['publish'] and len(mod_params['publish']) > 0:
|
||||
changed = publish_messages(mod_params['publish'], t)
|
||||
# publish messages to the topic
|
||||
if mod_params['publish'] and len(mod_params['publish']) > 0:
|
||||
changed = publish_messages(mod_params['publish'], t)
|
||||
|
||||
|
||||
json_output['changed'] = changed
|
||||
json_output.update(mod_params)
|
||||
module.exit_json(**json_output)
|
||||
json_output['changed'] = changed
|
||||
json_output.update(mod_params)
|
||||
module.exit_json(**json_output)
|
||||
|
||||
# import module snippets
|
||||
from ansible.module_utils.basic import *
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue