Add new filter to parse xml output for network use cases (#31562)

* Add new filter to parse xml output for network use cases

Fixes #31026
*  Add parse_xml filter
*  Add documentation for parse_xml filter

* Edited for clarity.

* Fix review comment and add unit tests

* Fix unit test CI failure

* Fix CI issues

* Fix unit test failures

* Fix review comments

* More copy edits.
This commit is contained in:
Ganesh Nalawade 2017-11-21 12:16:18 +05:30 committed by GitHub
commit 0ddf092ae3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 393 additions and 6 deletions

View file

@ -22,9 +22,10 @@ __metaclass__ = type
import re
import os
import json
import traceback
from collections import Mapping
from xml.etree.ElementTree import fromstring
from ansible.module_utils.network_common import Template
from ansible.module_utils.six import iteritems, string_types
@ -242,12 +243,115 @@ def parse_cli_textfsm(value, template):
return results
def _extract_param(template, root, attrs, value):
key = None
when = attrs.get('when')
conditional = "{%% if %s %%}True{%% else %%}False{%% endif %%}" % when
param_to_xpath_map = attrs['items']
if isinstance(value, Mapping):
key = value.get('key', None)
if key:
value = value['values']
entries = dict() if key else list()
for element in root.findall(attrs['top']):
entry = dict()
item_dict = dict()
for param, param_xpath in iteritems(param_to_xpath_map):
fields = None
try:
fields = element.findall(param_xpath)
except:
display.warning("Failed to evaluate value of '%s' with XPath '%s'.\nUnexpected error: %s." % (param, param_xpath, traceback.format_exc()))
tags = param_xpath.split('/')
# check if xpath ends with attribute.
# If yes set attribute key/value dict to param value in case attribute matches
# else if it is a normal xpath assign matched element text value.
if len(tags) and tags[-1].endswith(']'):
if fields:
if len(fields) > 1:
item_dict[param] = [field.attrib for field in fields]
else:
item_dict[param] = fields[0].attrib
else:
item_dict[param] = {}
else:
if fields:
if len(fields) > 1:
item_dict[param] = [field.text for field in fields]
else:
item_dict[param] = fields[0].text
else:
item_dict[param] = None
if isinstance(value, Mapping):
for item_key, item_value in iteritems(value):
entry[item_key] = template(item_value, {'item': item_dict})
else:
entry = template(value, {'item': item_dict})
if key:
expanded_key = template(key, {'item': item_dict})
if when:
if template(conditional, {'item': {'key': expanded_key, 'value': entry}}):
entries[expanded_key] = entry
else:
entries[expanded_key] = entry
else:
if when:
if template(conditional, {'item': entry}):
entries.append(entry)
else:
entries.append(entry)
return entries
def parse_xml(output, tmpl):
if not os.path.exists(tmpl):
raise AnsibleError('unable to locate parse_cli template: %s' % tmpl)
if not isinstance(output, string_types):
raise AnsibleError('parse_xml works on string input, but given input of : %s' % type(output))
root = fromstring(output)
try:
template = Template()
except ImportError as exc:
raise AnsibleError(str(exc))
spec = yaml.safe_load(open(tmpl).read())
obj = {}
for name, attrs in iteritems(spec['keys']):
value = attrs['value']
try:
variables = spec.get('vars', {})
value = template(value, variables)
except:
pass
if 'items' in attrs:
obj[name] = _extract_param(template, root, attrs, value)
else:
obj[name] = value
return obj
class FilterModule(object):
"""Filters for working with output from network devices"""
filter_map = {
'parse_cli': parse_cli,
'parse_cli_textfsm': parse_cli_textfsm
'parse_cli_textfsm': parse_cli_textfsm,
'parse_xml': parse_xml
}
def filters(self):