mirror of
https://github.com/ansible-collections/community.general.git
synced 2025-04-27 12:51:25 -07:00
* Fix iosxr netconf plugin response namespace * iosxr netconf plugin removes namespace by default for all the responses as parsing of xml is easier without namepsace in iosxr module. However to validate the response received from device against yang model requires namespace to be present in resposne. * Add a parameter in iosxr netconf plugin to control if namespace should be removed from response or not. * Fix CI issues * Fix review comment
204 lines
8 KiB
Python
204 lines
8 KiB
Python
#
|
|
# (c) 2017 Red Hat Inc.
|
|
# (c) 2017 Kedar Kekan (kkekan@redhat.com)
|
|
#
|
|
# This file is part of Ansible
|
|
#
|
|
# Ansible is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# Ansible is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
from __future__ import (absolute_import, division, print_function)
|
|
__metaclass__ = type
|
|
|
|
import json
|
|
import re
|
|
import collections
|
|
|
|
from ansible import constants as C
|
|
from ansible.module_utils._text import to_native
|
|
from ansible.module_utils.network.common.netconf import remove_namespaces
|
|
from ansible.module_utils.network.iosxr.iosxr import build_xml, etree_find
|
|
from ansible.errors import AnsibleConnectionFailure, AnsibleError
|
|
from ansible.plugins.netconf import NetconfBase
|
|
from ansible.plugins.netconf import ensure_connected
|
|
|
|
try:
|
|
from ncclient import manager
|
|
from ncclient.operations import RPCError
|
|
from ncclient.transport.errors import SSHUnknownHostError
|
|
from ncclient.xml_ import to_ele, to_xml, new_ele
|
|
except ImportError:
|
|
raise AnsibleError("ncclient is not installed")
|
|
|
|
try:
|
|
from lxml import etree
|
|
except ImportError:
|
|
raise AnsibleError("lxml is not installed")
|
|
|
|
|
|
class Netconf(NetconfBase):
|
|
|
|
@ensure_connected
|
|
def get_device_info(self):
|
|
device_info = {}
|
|
device_info['network_os'] = 'iosxr'
|
|
install_meta = collections.OrderedDict()
|
|
install_meta.update([
|
|
('boot-variables', {'xpath': 'install/boot-variables', 'tag': True}),
|
|
('boot-variable', {'xpath': 'install/boot-variables/boot-variable', 'tag': True, 'lead': True}),
|
|
('software', {'xpath': 'install/software', 'tag': True}),
|
|
('alias-devices', {'xpath': 'install/software/alias-devices', 'tag': True}),
|
|
('alias-device', {'xpath': 'install/software/alias-devices/alias-device', 'tag': True}),
|
|
('m:device-name', {'xpath': 'install/software/alias-devices/alias-device/device-name', 'value': 'disk0:'}),
|
|
])
|
|
|
|
install_filter = build_xml('install', install_meta, opcode='filter')
|
|
|
|
reply = self.get(install_filter)
|
|
ele_boot_variable = etree_find(reply, 'boot-variable/boot-variable')
|
|
if ele_boot_variable is not None:
|
|
device_info['network_os_image'] = re.split('[:|,]', ele_boot_variable.text)[1]
|
|
ele_package_name = etree_find(reply, 'package-name')
|
|
if ele_package_name is not None:
|
|
device_info['network_os_package'] = ele_package_name.text
|
|
device_info['network_os_version'] = re.split('-', ele_package_name.text)[-1]
|
|
|
|
hostname_filter = build_xml('host-names', opcode='filter')
|
|
|
|
reply = self.get(hostname_filter)
|
|
hostname_ele = etree_find(reply, 'host-name')
|
|
device_info['network_os_hostname'] = hostname_ele.text if hostname_ele is not None else None
|
|
|
|
return device_info
|
|
|
|
def get_capabilities(self):
|
|
result = dict()
|
|
result['rpc'] = self.get_base_rpc() + ['commit', 'discard_changes', 'validate', 'lock', 'unlock', 'get-schema']
|
|
result['network_api'] = 'netconf'
|
|
result['device_info'] = self.get_device_info()
|
|
result['server_capabilities'] = [c for c in self.m.server_capabilities]
|
|
result['client_capabilities'] = [c for c in self.m.client_capabilities]
|
|
result['session_id'] = self.m.session_id
|
|
result['device_operations'] = self.get_device_operations(result['server_capabilities'])
|
|
return json.dumps(result)
|
|
|
|
@staticmethod
|
|
def guess_network_os(obj):
|
|
"""
|
|
Guess the remote network os name
|
|
:param obj: Netconf connection class object
|
|
:return: Network OS name
|
|
"""
|
|
try:
|
|
m = manager.connect(
|
|
host=obj._play_context.remote_addr,
|
|
port=obj._play_context.port or 830,
|
|
username=obj._play_context.remote_user,
|
|
password=obj._play_context.password,
|
|
key_filename=obj.key_filename,
|
|
hostkey_verify=obj.get_option('host_key_checking'),
|
|
look_for_keys=obj.get_option('look_for_keys'),
|
|
allow_agent=obj._play_context.allow_agent,
|
|
timeout=obj._play_context.timeout
|
|
)
|
|
except SSHUnknownHostError as exc:
|
|
raise AnsibleConnectionFailure(to_native(exc))
|
|
|
|
guessed_os = None
|
|
for c in m.server_capabilities:
|
|
if re.search('IOS-XR', c):
|
|
guessed_os = 'iosxr'
|
|
break
|
|
|
|
m.close_session()
|
|
return guessed_os
|
|
|
|
# TODO: change .xml to .data_xml, when ncclient supports data_xml on all platforms
|
|
@ensure_connected
|
|
def get(self, filter=None, remove_ns=False):
|
|
if isinstance(filter, list):
|
|
filter = tuple(filter)
|
|
try:
|
|
resp = self.m.get(filter=filter)
|
|
if remove_ns:
|
|
response = remove_namespaces(resp)
|
|
else:
|
|
response = resp.data_xml if hasattr(resp, 'data_xml') else resp.xml
|
|
return response
|
|
except RPCError as exc:
|
|
raise Exception(to_xml(exc.xml))
|
|
|
|
@ensure_connected
|
|
def get_config(self, source=None, filter=None, remove_ns=False):
|
|
if isinstance(filter, list):
|
|
filter = tuple(filter)
|
|
try:
|
|
resp = self.m.get_config(source=source, filter=filter)
|
|
if remove_ns:
|
|
response = remove_namespaces(resp)
|
|
else:
|
|
response = resp.data_xml if hasattr(resp, 'data_xml') else resp.xml
|
|
return response
|
|
except RPCError as exc:
|
|
raise Exception(to_xml(exc.xml))
|
|
|
|
@ensure_connected
|
|
def edit_config(self, config=None, format='xml', target='candidate', default_operation=None, test_option=None, error_option=None, remove_ns=False):
|
|
if config is None:
|
|
raise ValueError('config value must be provided')
|
|
try:
|
|
resp = self.m.edit_config(config, format=format, target=target, default_operation=default_operation, test_option=test_option,
|
|
error_option=error_option)
|
|
if remove_ns:
|
|
response = remove_namespaces(resp)
|
|
else:
|
|
response = resp.data_xml if hasattr(resp, 'data_xml') else resp.xml
|
|
return response
|
|
except RPCError as exc:
|
|
raise Exception(to_xml(exc.xml))
|
|
|
|
@ensure_connected
|
|
def commit(self, confirmed=False, timeout=None, persist=None, remove_ns=False):
|
|
try:
|
|
resp = self.m.commit(confirmed=confirmed, timeout=timeout, persist=persist)
|
|
if remove_ns:
|
|
response = remove_namespaces(resp)
|
|
else:
|
|
response = resp.data_xml if hasattr(resp, 'data_xml') else resp.xml
|
|
return response
|
|
except RPCError as exc:
|
|
raise Exception(to_xml(exc.xml))
|
|
|
|
@ensure_connected
|
|
def validate(self, source="candidate", remove_ns=False):
|
|
try:
|
|
resp = self.m.validate(source=source)
|
|
if remove_ns:
|
|
response = remove_namespaces(resp)
|
|
else:
|
|
response = resp.data_xml if hasattr(resp, 'data_xml') else resp.xml
|
|
return response
|
|
except RPCError as exc:
|
|
raise Exception(to_xml(exc.xml))
|
|
|
|
@ensure_connected
|
|
def discard_changes(self, remove_ns=False):
|
|
try:
|
|
resp = self.m.discard_changes()
|
|
if remove_ns:
|
|
response = remove_namespaces(resp)
|
|
else:
|
|
response = resp.data_xml if hasattr(resp, 'data_xml') else resp.xml
|
|
return response
|
|
except RPCError as exc:
|
|
raise Exception(to_xml(exc.xml))
|