diff --git a/.github/BOTMETA.yml b/.github/BOTMETA.yml index 4cd9838099..a34e5770db 100644 --- a/.github/BOTMETA.yml +++ b/.github/BOTMETA.yml @@ -926,6 +926,9 @@ files: support: network maintainers: $team_networking labels: networking + lib/ansible/plugins/lookup/rabbitmq.py: + maintainers: Im0 + support: community lib/ansible/plugins/netconf/sros.py: maintainers: wisotzky $team_networking labels: networking diff --git a/lib/ansible/plugins/lookup/rabbitmq.py b/lib/ansible/plugins/lookup/rabbitmq.py new file mode 100644 index 0000000000..4ca258af70 --- /dev/null +++ b/lib/ansible/plugins/lookup/rabbitmq.py @@ -0,0 +1,188 @@ +# (c) 2018, John Imison +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +DOCUMENTATION = """ + lookup: rabbitmq + author: John Imison <@Im0> + version_added: "2.8" + short_description: Retrieve messages from an AMQP/AMQPS RabbitMQ queue. + description: + - This lookup uses a basic get to retrieve all, or a limited number C(count), messages from a RabbitMQ queue. + options: + url: + description: + - An URI connection string to connect to the AMQP/AMQPS RabbitMQ server. + - For more information refer to the URI spec U(https://www.rabbitmq.com/uri-spec.html). + required: True + queue: + description: + - The queue to get messages from. + required: True + count: + description: + - How many messages to collect from the queue. + - If not set, defaults to retrieving all the messages from the queue. + requirements: + - The python pika package U(https://pypi.org/project/pika/). + notes: + - This lookup implements BlockingChannel.basic_get to get messages from a RabbitMQ server. + - After retrieving a message from the server, receipt of the message is acknowledged and the message on the server is deleted. + - Pika is a pure-Python implementation of the AMQP 0-9-1 protocol that tries to stay fairly independent of the underlying network support library. + - More information about pika can be found at U(https://pika.readthedocs.io/en/stable/). + - This plugin is tested against RabbitMQ. Other AMQP 0.9.1 protocol based servers may work but not tested/guaranteed. + - Assigning the return messages to a variable under C(vars) may result in unexpected results as the lookup is evaluated every time the + variable is referenced. + - Currently this plugin only handles text based messages from a queue. Unexpected results may occur when retrieving binary data. +""" + + +EXAMPLES = """ +- name: Get all messages off a queue + debug: + msg: "{{ lookup('rabbitmq', url='amqp://guest:guest@192.168.0.10:5672/%2F', queue='hello') }}" + + +# If you are intending on using the returned messages as a variable in more than +# one task (eg. debug, template), it is recommended to set_fact. + +- name: Get 2 messages off a queue and set a fact for re-use + set_fact: + messages: "{{ lookup('rabbitmq', url='amqp://guest:guest@192.168.0.10:5672/%2F', queue='hello', count=2) }}" + +- name: Dump out contents of the messages + debug: + var: messages + +""" + +RETURN = """ + _list: + description: + - A list of dictionaries with keys and value from the queue. + type: list + contains: + content_type: + description: The content_type on the message in the queue. + type: str + delivery_mode: + description: The delivery_mode on the message in the queue. + type: str + delivery_tag: + description: The delivery_tag on the message in the queue. + type: str + exchange: + description: The exchange the message came from. + type: str + message_count: + description: The message_count for the message on the queue. + type: str + msg: + description: The content of the message. + type: str + redelivered: + description: The redelivered flag. True if the message has been delivered before. + type: bool + routing_key: + description: The routing_key on the message in the queue. + type: str + json: + description: If application/json is specified in content_type, json will be loaded into variables. + type: dict + +""" + +from ansible.errors import AnsibleError, AnsibleParserError +from ansible.plugins.lookup import LookupBase +from ansible.module_utils._text import to_native, to_text +import json + +try: + from __main__ import display +except ImportError: + from ansible.utils.display import Display + display = Display() + +try: + import pika + from pika import spec + HAS_PIKA = True +except ImportError: + HAS_PIKA = False + + +class LookupModule(LookupBase): + + def run(self, terms, variables=None, url=None, queue=None, count=None): + if not HAS_PIKA: + raise AnsibleError('pika python package is required for rabbitmq lookup.') + if not url: + raise AnsibleError('URL is required for rabbitmq lookup.') + if not queue: + raise AnsibleError('Queue is required for rabbitmq lookup.') + + display.vvv(u"terms:%s : variables:%s url:%s queue:%s count:%s" % (terms, variables, url, queue, count)) + + try: + parameters = pika.URLParameters(url) + except Exception as e: + raise AnsibleError("URL malformed: %s" % to_native(e)) + + try: + connection = pika.BlockingConnection(parameters) + except Exception as e: + raise AnsibleError("Connection issue: %s" % to_native(e)) + + try: + conn_channel = connection.channel() + except pika.exceptions.AMQPChannelError as e: + try: + connection.close() + except pika.exceptions.AMQPConnectionError as ie: + raise AnsibleError("Channel and connection closing issues: %s / %s" % to_native(e), to_native(ie)) + raise AnsibleError("Channel issue: %s" % to_native(e)) + + ret = [] + idx = 0 + + while True: + method_frame, properties, body = conn_channel.basic_get(queue=queue) + if method_frame: + display.vvv(u"%s, %s, %s " % (method_frame, properties, to_text(body))) + + # TODO: In the future consider checking content_type and handle text/binary data differently. + msg_details = dict({ + 'msg': to_text(body), + 'message_count': method_frame.message_count, + 'routing_key': method_frame.routing_key, + 'delivery_tag': method_frame.delivery_tag, + 'redelivered': method_frame.redelivered, + 'exchange': method_frame.exchange, + 'delivery_mode': properties.delivery_mode, + 'content_type': properties.content_type + }) + if properties.content_type == 'application/json': + try: + msg_details['json'] = json.loads(msg_details['msg']) + except ValueError as e: + raise AnsibleError("Unable to decode JSON for message %s: %s" % (method_frame.delivery_tag, to_native(e))) + + ret.append(msg_details) + conn_channel.basic_ack(method_frame.delivery_tag) + idx += 1 + if method_frame.message_count == 0 or idx == count: + break + # If we didn't get a method_frame, exit. + else: + break + + if connection.is_closing or connection.is_closed: + return [ret] + else: + try: + connection.close() + except pika.exceptions.AMQPConnectionError: + pass + return [ret] diff --git a/test/integration/targets/rabbitmq_lookup/aliases b/test/integration/targets/rabbitmq_lookup/aliases new file mode 100644 index 0000000000..3d0091e7a9 --- /dev/null +++ b/test/integration/targets/rabbitmq_lookup/aliases @@ -0,0 +1,5 @@ +destructive +shippable/posix/group1 +skip/osx +skip/freebsd +skip/rhel diff --git a/test/integration/targets/rabbitmq_lookup/meta/main.yml b/test/integration/targets/rabbitmq_lookup/meta/main.yml new file mode 100644 index 0000000000..05ab59000b --- /dev/null +++ b/test/integration/targets/rabbitmq_lookup/meta/main.yml @@ -0,0 +1,2 @@ +dependencies: + - setup_rabbitmq diff --git a/test/integration/targets/rabbitmq_lookup/tasks/main.yml b/test/integration/targets/rabbitmq_lookup/tasks/main.yml new file mode 100644 index 0000000000..740f899805 --- /dev/null +++ b/test/integration/targets/rabbitmq_lookup/tasks/main.yml @@ -0,0 +1,5 @@ +# Rabbitmq lookup +- include: ubuntu.yml + when: + - ansible_distribution == 'Ubuntu' + - ansible_distribution_release != 'trusty' diff --git a/test/integration/targets/rabbitmq_lookup/tasks/ubuntu.yml b/test/integration/targets/rabbitmq_lookup/tasks/ubuntu.yml new file mode 100644 index 0000000000..6fba6ebdf9 --- /dev/null +++ b/test/integration/targets/rabbitmq_lookup/tasks/ubuntu.yml @@ -0,0 +1,138 @@ +- name: Test failure without pika installed + set_fact: + rabbit_missing_pika: "{{ lookup('rabbitmq', url='amqp://guest:guest@192.168.250.1:5672/%2F', queue='hello', count=3) }}" + ignore_errors: yes + register: rabbitmq_missing_pika_error + +- assert: + that: + - "'pika python package is required' in rabbitmq_missing_pika_error.msg" + +- name: Install pika and requests + pip: + name: pika,requests + state: latest + +- name: Test that giving an incorrect amqp protocol in URL will error + set_fact: + rabbitmq_test_protocol: "{{ lookup('rabbitmq', url='zzzamqp://guest:guest@192.168.250.1:5672/%2F', queue='hello', count=3) }}" + ignore_errors: yes + register: rabbitmq_protocol_error + +- assert: + that: + - "rabbitmq_protocol_error is failed" + - "'URL malformed' in rabbitmq_protocol_error.msg" + +- name: Test that giving an incorrect IP address in URL will error + set_fact: + rabbitmq_test_protocol: "{{ lookup('rabbitmq', url='amqp://guest:guest@xxxxx192.112312368.250.1:5672/%2F', queue='hello', count=3) }}" + ignore_errors: yes + register: rabbitmq_ip_error + +- assert: + that: + - "rabbitmq_ip_error is failed" + - "'Connection issue' in rabbitmq_ip_error.msg" + +- name: Test missing parameters will error + set_fact: + rabbitmq_test_protocol: "{{ lookup('rabbitmq') }}" + ignore_errors: yes + register: rabbitmq_params_error + +- assert: + that: + - "rabbitmq_params_error is failed" + - "'URL is required for rabbitmq lookup.' in rabbitmq_params_error.msg" + +- name: Test missing queue will error + set_fact: + rabbitmq_queue_protocol: "{{ lookup('rabbitmq', url='amqp://guest:guest@192.168.250.1:5672/%2F') }}" + ignore_errors: yes + register: rabbitmq_queue_error + +- assert: + that: + - "rabbitmq_queue_error is failed" + - "'Queue is required for rabbitmq lookup' in rabbitmq_queue_error.msg" + +- name: Enables the rabbitmq_management plugin + rabbitmq_plugin: + names: rabbitmq_management + state: enabled + +- name: Setup test queue + rabbitmq_queue: + name: hello + +- name: Post test message to the exchange (string) + uri: + url: http://localhost:15672/api/exchanges/%2f/amq.default/publish + method: POST + body: '{"properties":{},"routing_key":"hello","payload":"ansible-test","payload_encoding":"string"}' + user: guest + password: guest + force_basic_auth: yes + return_content: yes + register: post_data + headers: + Content-Type: "application/json" + + +- name: Post test message to the exchange (json) + uri: + url: http://localhost:15672/api/exchanges/%2f/amq.default/publish + method: POST + body: '{"properties":{"content_type": "application/json"},"routing_key":"hello","payload":"{\"key\": \"value\" }","payload_encoding":"string"}' + user: guest + password: guest + force_basic_auth: yes + return_content: yes + register: post_data_json + headers: + Content-Type: "application/json" + +- name: Test retrieve messages + set_fact: + rabbitmq_msg: "{{ lookup('rabbitmq', url='amqp://guest:guest@localhost:5672/%2f/hello', queue='hello') }}" + ignore_errors: yes + register: rabbitmq_msg_error + +- name: Ensure two messages received + assert: + that: + - "rabbitmq_msg_error is not failed" + - rabbitmq_msg | length == 2 + +- name: Ensure first message is a string + assert: + that: + - rabbitmq_msg[0].msg == "ansible-test" + +- name: Ensure second message is json + assert: + that: + - rabbitmq_msg[1].json.key == "value" + +- name: Test missing vhost + set_fact: + rabbitmq_msg: "{{ lookup('rabbitmq', url='amqp://guest:guest@localhost:5672/missing/', queue='hello') }}" + ignore_errors: yes + register: rabbitmq_vhost_error + +- assert: + that: + - "rabbitmq_vhost_error is failed" + - "'NOT_ALLOWED' in rabbitmq_vhost_error.msg" + +# Tidy up +- name: Uninstall pika and requests + pip: + name: pika,requests + state: absent + +- name: Disable the rabbitmq_management plugin + rabbitmq_plugin: + names: rabbitmq_management + state: disabled