From 298e0f60be2eda314525cd8b5248fecaa7a12056 Mon Sep 17 00:00:00 2001 From: Yevhen Khmelenko Date: Sun, 24 Jan 2021 22:00:55 +0200 Subject: [PATCH] Update ansible-logstash-callback (#641) * Update version of callback/logstash.py * ansible-logstash-callback rollback version_added location * Update plugins/callback/logstash.py Co-authored-by: Felix Fontein * Update plugins/callback/logstash.py Co-authored-by: Felix Fontein * Update changelogs/fragments/641-update-ansible-logstash-callback.yml Co-authored-by: Felix Fontein Co-authored-by: Felix Fontein --- .../641-update-ansible-logstash-callback.yml | 3 + plugins/callback/logstash.py | 427 ++++++++++++------ 2 files changed, 289 insertions(+), 141 deletions(-) create mode 100644 changelogs/fragments/641-update-ansible-logstash-callback.yml diff --git a/changelogs/fragments/641-update-ansible-logstash-callback.yml b/changelogs/fragments/641-update-ansible-logstash-callback.yml new file mode 100644 index 0000000000..d72a8e9bd1 --- /dev/null +++ b/changelogs/fragments/641-update-ansible-logstash-callback.yml @@ -0,0 +1,3 @@ +minor_changes: +- logstash callback - migrate to python3-logstash (https://github.com/ansible-collections/community.general/pull/641). +- logstash callback - improve logstash message structure, needs to be enabled with the ``format_version`` option (https://github.com/ansible-collections/community.general/pull/641). diff --git a/plugins/callback/logstash.py b/plugins/callback/logstash.py index 6b19e051f9..ef862fdb42 100644 --- a/plugins/callback/logstash.py +++ b/plugins/callback/logstash.py @@ -1,12 +1,12 @@ -# (C) 2016, Ievgen Khmelenko +# (C) 2020, Yevhen Khmelenko # (C) 2017 Ansible Project # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) from __future__ import (absolute_import, division, print_function) __metaclass__ = type -DOCUMENTATION = ''' - author: Unknown (!UNKNOWN) +DOCUMENTATION = r''' + author: Yevhen Khmelenko (@ujenmr) name: logstash type: notification short_description: Sends events to Logstash @@ -43,15 +43,60 @@ DOCUMENTATION = ''' key: type version_added: 1.0.0 default: ansible + pre_command: + description: Executes command before run and result put to ansible_pre_command_output field. + version_added: 2.0.0 + ini: + - section: callback_logstash + key: pre_command + env: + - name: LOGSTASH_PRE_COMMAND + format_version: + description: Logging format + type: str + version_added: 2.0.0 + ini: + - section: callback_logstash + key: format_version + env: + - name: LOGSTASH_FORMAT_VERSION + default: v1 + choices: + - v1 + - v2 + +''' + +EXAMPLES = r''' +ansible.cfg: | + # Enable Callback plugin + [defaults] + callback_whitelist = community.general.logstash + + [callback_logstash] + server = logstash.example.com + port = 5000 + pre_command = git rev-parse HEAD + type = ansible + +11-input-tcp.conf: | + # Enable Logstash TCP Input + input { + tcp { + port => 5000 + codec => json + add_field => { "[@metadata][beat]" => "notify" } + add_field => { "[@metadata][type]" => "ansible" } + } + } ''' import os import json import socket import uuid -from datetime import datetime - import logging +from datetime import datetime try: import logstash @@ -63,76 +108,78 @@ from ansible.plugins.callback import CallbackBase class CallbackModule(CallbackBase): - """ - ansible logstash callback plugin - ansible.cfg: - callback_plugins = - callback_whitelist = logstash - and put the plugin in - - logstash config: - input { - tcp { - port => 5000 - codec => json - } - } - - Requires: - python-logstash - - This plugin makes use of the following environment variables or ini config: - LOGSTASH_SERVER (optional): defaults to localhost - LOGSTASH_PORT (optional): defaults to 5000 - LOGSTASH_TYPE (optional): defaults to ansible - """ CALLBACK_VERSION = 2.0 CALLBACK_TYPE = 'aggregate' CALLBACK_NAME = 'community.general.logstash' CALLBACK_NEEDS_WHITELIST = True - def __init__(self, display=None): - super(CallbackModule, self).__init__(display=display) + def __init__(self): + super(CallbackModule, self).__init__() if not HAS_LOGSTASH: self.disabled = True - self._display.warning("The required python-logstash is not installed. " - "pip install python-logstash") + self._display.warning("The required python-logstash/python3-logstash is not installed. " + "pip install python-logstash for Python 2" + "pip install python3-logstash for Python 3") self.start_time = datetime.utcnow() - def set_options(self, task_keys=None, var_options=None, direct=None): + def _init_plugin(self): + if not self.disabled: + self.logger = logging.getLogger('python-logstash-logger') + self.logger.setLevel(logging.DEBUG) + self.handler = logstash.TCPLogstashHandler( + self.ls_server, + self.ls_port, + version=1, + message_type=self.ls_type + ) + + self.logger.addHandler(self.handler) + self.hostname = socket.gethostname() + self.session = str(uuid.uuid4()) + self.errors = 0 + + self.base_data = { + 'session': self.session, + 'host': self.hostname + } + + if self.ls_pre_command is not None: + self.base_data['ansible_pre_command_output'] = os.popen( + self.ls_pre_command).read() + + if self._options is not None: + self.base_data['ansible_checkmode'] = self._options.check + self.base_data['ansible_tags'] = self._options.tags + self.base_data['ansible_skip_tags'] = self._options.skip_tags + self.base_data['inventory'] = self._options.inventory + + def set_options(self, task_keys=None, var_options=None, direct=None): super(CallbackModule, self).set_options(task_keys=task_keys, var_options=var_options, direct=direct) - self.logger = logging.getLogger('python-logstash-logger') - self.logger.setLevel(logging.DEBUG) + self.ls_server = self.get_option('server') + self.ls_port = int(self.get_option('port')) + self.ls_type = self.get_option('type') + self.ls_pre_command = self.get_option('pre_command') + self.ls_format_version = self.get_option('format_version') - self.logstash_server = self.get_option('server') - self.logstash_port = self.get_option('port') - self.logstash_type = self.get_option('type') - self.handler = logstash.TCPLogstashHandler( - self.logstash_server, - int(self.logstash_port), - version=1, - message_type=self.logstash_type - ) - self.logger.addHandler(self.handler) - self.hostname = socket.gethostname() - self.session = str(uuid.uuid1()) - self.errors = 0 + self._init_plugin() def v2_playbook_on_start(self, playbook): - self.playbook = playbook._file_name - data = { - 'status': "OK", - 'host': self.hostname, - 'session': self.session, - 'ansible_type': "start", - 'ansible_playbook': self.playbook, - } - self.logger.info("ansible start", extra=data) + data = self.base_data.copy() + data['ansible_type'] = "start" + data['status'] = "OK" + data['ansible_playbook'] = playbook._file_name + + if (self.ls_format_version == "v2"): + self.logger.info( + "START PLAYBOOK | %s", data['ansible_playbook'], extra=data + ) + else: + self.logger.info("ansible start", extra=data) def v2_playbook_on_stats(self, stats): end_time = datetime.utcnow() @@ -146,103 +193,201 @@ class CallbackModule(CallbackBase): else: status = "FAILED" - data = { - 'status': status, - 'host': self.hostname, - 'session': self.session, - 'ansible_type': "finish", - 'ansible_playbook': self.playbook, - 'ansible_playbook_duration': runtime.total_seconds(), - 'ansible_result': json.dumps(summarize_stat), - } - self.logger.info("ansible stats", extra=data) + data = self.base_data.copy() + data['ansible_type'] = "finish" + data['status'] = status + data['ansible_playbook_duration'] = runtime.total_seconds() + data['ansible_result'] = json.dumps(summarize_stat) # deprecated field + + if (self.ls_format_version == "v2"): + self.logger.info( + "FINISH PLAYBOOK | %s", json.dumps(summarize_stat), extra=data + ) + else: + self.logger.info("ansible stats", extra=data) + + def v2_playbook_on_play_start(self, play): + self.play_id = str(play._uuid) + + if play.name: + self.play_name = play.name + + data = self.base_data.copy() + data['ansible_type'] = "start" + data['status'] = "OK" + data['ansible_play_id'] = self.play_id + data['ansible_play_name'] = self.play_name + + if (self.ls_format_version == "v2"): + self.logger.info("START PLAY | %s", self.play_name, extra=data) + else: + self.logger.info("ansible play", extra=data) + + def v2_playbook_on_task_start(self, task, is_conditional): + self.task_id = str(task._uuid) + + ''' + Tasks and handler tasks are dealt with here + ''' def v2_runner_on_ok(self, result, **kwargs): - data = { - 'status': "OK", - 'host': self.hostname, - 'session': self.session, - 'ansible_type': "task", - 'ansible_playbook': self.playbook, - 'ansible_host': result._host.name, - 'ansible_task': result._task, - 'ansible_result': self._dump_results(result._result) - } - self.logger.info("ansible ok", extra=data) + task_name = str(result._task).replace('TASK: ', '').replace('HANDLER: ', '') + + data = self.base_data.copy() + if task_name == 'setup': + data['ansible_type'] = "setup" + data['status'] = "OK" + data['ansible_host'] = result._host.name + data['ansible_play_id'] = self.play_id + data['ansible_play_name'] = self.play_name + data['ansible_task'] = task_name + data['ansible_facts'] = self._dump_results(result._result) + + if (self.ls_format_version == "v2"): + self.logger.info( + "SETUP FACTS | %s", self._dump_results(result._result), extra=data + ) + else: + self.logger.info("ansible facts", extra=data) + else: + if 'changed' in result._result.keys(): + data['ansible_changed'] = result._result['changed'] + else: + data['ansible_changed'] = False + + data['ansible_type'] = "task" + data['status'] = "OK" + data['ansible_host'] = result._host.name + data['ansible_play_id'] = self.play_id + data['ansible_play_name'] = self.play_name + data['ansible_task'] = task_name + data['ansible_task_id'] = self.task_id + data['ansible_result'] = self._dump_results(result._result) + + if (self.ls_format_version == "v2"): + self.logger.info( + "TASK OK | %s | RESULT | %s", + task_name, self._dump_results(result._result), extra=data + ) + else: + self.logger.info("ansible ok", extra=data) def v2_runner_on_skipped(self, result, **kwargs): - data = { - 'status': "SKIPPED", - 'host': self.hostname, - 'session': self.session, - 'ansible_type': "task", - 'ansible_playbook': self.playbook, - 'ansible_task': result._task, - 'ansible_host': result._host.name - } - self.logger.info("ansible skipped", extra=data) + task_name = str(result._task).replace('TASK: ', '').replace('HANDLER: ', '') + + data = self.base_data.copy() + data['ansible_type'] = "task" + data['status'] = "SKIPPED" + data['ansible_host'] = result._host.name + data['ansible_play_id'] = self.play_id + data['ansible_play_name'] = self.play_name + data['ansible_task'] = task_name + data['ansible_task_id'] = self.task_id + data['ansible_result'] = self._dump_results(result._result) + + if (self.ls_format_version == "v2"): + self.logger.info("TASK SKIPPED | %s", task_name, extra=data) + else: + self.logger.info("ansible skipped", extra=data) def v2_playbook_on_import_for_host(self, result, imported_file): - data = { - 'status': "IMPORTED", - 'host': self.hostname, - 'session': self.session, - 'ansible_type': "import", - 'ansible_playbook': self.playbook, - 'ansible_host': result._host.name, - 'imported_file': imported_file - } - self.logger.info("ansible import", extra=data) + data = self.base_data.copy() + data['ansible_type'] = "import" + data['status'] = "IMPORTED" + data['ansible_host'] = result._host.name + data['ansible_play_id'] = self.play_id + data['ansible_play_name'] = self.play_name + data['imported_file'] = imported_file + + if (self.ls_format_version == "v2"): + self.logger.info("IMPORT | %s", imported_file, extra=data) + else: + self.logger.info("ansible import", extra=data) def v2_playbook_on_not_import_for_host(self, result, missing_file): - data = { - 'status': "NOT IMPORTED", - 'host': self.hostname, - 'session': self.session, - 'ansible_type': "import", - 'ansible_playbook': self.playbook, - 'ansible_host': result._host.name, - 'missing_file': missing_file - } - self.logger.info("ansible import", extra=data) + data = self.base_data.copy() + data['ansible_type'] = "import" + data['status'] = "NOT IMPORTED" + data['ansible_host'] = result._host.name + data['ansible_play_id'] = self.play_id + data['ansible_play_name'] = self.play_name + data['imported_file'] = missing_file + + if (self.ls_format_version == "v2"): + self.logger.info("NOT IMPORTED | %s", missing_file, extra=data) + else: + self.logger.info("ansible import", extra=data) def v2_runner_on_failed(self, result, **kwargs): - data = { - 'status': "FAILED", - 'host': self.hostname, - 'session': self.session, - 'ansible_type': "task", - 'ansible_playbook': self.playbook, - 'ansible_host': result._host.name, - 'ansible_task': result._task, - 'ansible_result': self._dump_results(result._result) - } + task_name = str(result._task).replace('TASK: ', '').replace('HANDLER: ', '') + + data = self.base_data.copy() + if 'changed' in result._result.keys(): + data['ansible_changed'] = result._result['changed'] + else: + data['ansible_changed'] = False + + data['ansible_type'] = "task" + data['status'] = "FAILED" + data['ansible_host'] = result._host.name + data['ansible_play_id'] = self.play_id + data['ansible_play_name'] = self.play_name + data['ansible_task'] = task_name + data['ansible_task_id'] = self.task_id + data['ansible_result'] = self._dump_results(result._result) + self.errors += 1 - self.logger.error("ansible failed", extra=data) + if (self.ls_format_version == "v2"): + self.logger.error( + "TASK FAILED | %s | HOST | %s | RESULT | %s", + task_name, self.hostname, + self._dump_results(result._result), extra=data + ) + else: + self.logger.error("ansible failed", extra=data) def v2_runner_on_unreachable(self, result, **kwargs): - data = { - 'status': "UNREACHABLE", - 'host': self.hostname, - 'session': self.session, - 'ansible_type': "task", - 'ansible_playbook': self.playbook, - 'ansible_host': result._host.name, - 'ansible_task': result._task, - 'ansible_result': self._dump_results(result._result) - } - self.logger.error("ansible unreachable", extra=data) + task_name = str(result._task).replace('TASK: ', '').replace('HANDLER: ', '') + + data = self.base_data.copy() + data['ansible_type'] = "task" + data['status'] = "UNREACHABLE" + data['ansible_host'] = result._host.name + data['ansible_play_id'] = self.play_id + data['ansible_play_name'] = self.play_name + data['ansible_task'] = task_name + data['ansible_task_id'] = self.task_id + data['ansible_result'] = self._dump_results(result._result) + + self.errors += 1 + if (self.ls_format_version == "v2"): + self.logger.error( + "UNREACHABLE | %s | HOST | %s | RESULT | %s", + task_name, self.hostname, + self._dump_results(result._result), extra=data + ) + else: + self.logger.error("ansible unreachable", extra=data) def v2_runner_on_async_failed(self, result, **kwargs): - data = { - 'status': "FAILED", - 'host': self.hostname, - 'session': self.session, - 'ansible_type': "task", - 'ansible_playbook': self.playbook, - 'ansible_host': result._host.name, - 'ansible_task': result._task, - 'ansible_result': self._dump_results(result._result) - } + task_name = str(result._task).replace('TASK: ', '').replace('HANDLER: ', '') + + data = self.base_data.copy() + data['ansible_type'] = "task" + data['status'] = "FAILED" + data['ansible_host'] = result._host.name + data['ansible_play_id'] = self.play_id + data['ansible_play_name'] = self.play_name + data['ansible_task'] = task_name + data['ansible_task_id'] = self.task_id + data['ansible_result'] = self._dump_results(result._result) + self.errors += 1 - self.logger.error("ansible async", extra=data) + if (self.ls_format_version == "v2"): + self.logger.error( + "ASYNC FAILED | %s | HOST | %s | RESULT | %s", + task_name, self.hostname, + self._dump_results(result._result), extra=data + ) + else: + self.logger.error("ansible async", extra=data)