Connection plugins network_cli and netconf (#32521)

* implements jsonrpc message passing for ansible-connection

* implements more generic mechanism for persistent connections
* starts persistent connection in task_executor if enabled and supported
* supports using network_cli as top level connection plugin
* enhances logging for persistent connection to stdout

* Update action plugins

* Fix Python3 RPC

* Fix Junos bytes<-->str issues

* supports using netconf as top level connection plugin

* Error message when running netconf on an unsupported platform
* Update tests

* Fix `authorize: yes` for `connection: local`

* Handle potentially JSON data in terminal

* Add clarifying detail if possible on ConnectionError
This commit is contained in:
Nathaniel Case 2017-11-09 15:04:40 -05:00 committed by GitHub
parent 897b31f249
commit 9c0275a879
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 722 additions and 798 deletions

View file

@ -47,6 +47,7 @@ DOCUMENTATION = """
import json
import logging
import re
import os
import signal
import socket
import traceback
@ -57,9 +58,11 @@ from ansible import constants as C
from ansible.errors import AnsibleConnectionFailure
from ansible.module_utils.six import BytesIO, binary_type
from ansible.module_utils._text import to_bytes, to_text
from ansible.plugins.loader import cliconf_loader, terminal_loader
from ansible.plugins.connection.paramiko_ssh import Connection as _Connection
from ansible.utils.jsonrpc import Rpc
from ansible.plugins.loader import cliconf_loader, terminal_loader, connection_loader
from ansible.plugins.connection import ConnectionBase
from ansible.plugins.connection.local import Connection as LocalConnection
from ansible.plugins.connection.paramiko_ssh import Connection as ParamikoSshConnection
from ansible.utils.path import unfrackpath, makedirs_safe
try:
from __main__ import display
@ -68,31 +71,73 @@ except ImportError:
display = Display()
class Connection(Rpc, _Connection):
class Connection(ConnectionBase):
''' CLI (shell) SSH connections on Paramiko '''
transport = 'network_cli'
has_pipelining = True
force_persistence = True
def __init__(self, play_context, new_stdin, *args, **kwargs):
super(Connection, self).__init__(play_context, new_stdin, *args, **kwargs)
self._terminal = None
self._cliconf = None
self._shell = None
self.ssh = None
self._ssh_shell = None
self._matched_prompt = None
self._matched_pattern = None
self._last_response = None
self._history = list()
self._play_context = play_context
if play_context.verbosity > 3:
self._local = LocalConnection(play_context, new_stdin, *args, **kwargs)
self._terminal = None
self._cliconf = None
if self._play_context.verbosity > 3:
logging.getLogger('paramiko').setLevel(logging.DEBUG)
# reconstruct the socket_path and set instance values accordingly
self._update_connection_state()
def __getattr__(self, name):
try:
return self.__dict__[name]
except KeyError:
if name.startswith('_'):
raise AttributeError("'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
return getattr(self._cliconf, name)
def exec_command(self, cmd, in_data=None, sudoable=True):
# this try..except block is just to handle the transition to supporting
# network_cli as a toplevel connection. Once connection=local is gone,
# this block can be removed as well and all calls passed directly to
# the local connection
if self._ssh_shell:
try:
cmd = json.loads(to_text(cmd, errors='surrogate_or_strict'))
kwargs = {'command': to_bytes(cmd['command'], errors='surrogate_or_strict')}
for key in ('prompts', 'answer', 'send_only'):
if key in cmd:
kwargs[key] = to_bytes(cmd[key], errors='surrogate_or_strict')
return self.send(**kwargs)
except ValueError:
cmd = to_bytes(cmd, errors='surrogate_or_strict')
return self.send(command=cmd)
else:
return self._local.exec_command(cmd, in_data, sudoable)
def put_file(self, in_path, out_path):
return self._local.put_file(in_path, out_path)
def fetch_file(self, in_path, out_path):
return self._local.fetch_file(in_path, out_path)
def update_play_context(self, play_context):
"""Updates the play context information for the connection"""
display.display('updating play_context for connection', log_only=True)
display.vvvv('updating play_context for connection', host=self._play_context.remote_addr)
if self._play_context.become is False and play_context.become is True:
auth_pass = play_context.become_pass
@ -104,17 +149,22 @@ class Connection(Rpc, _Connection):
self._play_context = play_context
def _connect(self):
"""Connections to the device and sets the terminal type"""
'''
Connects to the remote device and starts the terminal
'''
if self.connected:
return
if self._play_context.password and not self._play_context.private_key_file:
C.PARAMIKO_LOOK_FOR_KEYS = False
super(Connection, self)._connect()
ssh = ParamikoSshConnection(self._play_context, '/dev/null')._connect()
self.ssh = ssh.ssh
display.display('ssh connection done, setting terminal', log_only=True)
display.vvvv('ssh connection done, setting terminal', host=self._play_context.remote_addr)
self._shell = self.ssh.invoke_shell()
self._shell.settimeout(self._play_context.timeout)
self._ssh_shell = self.ssh.invoke_shell()
self._ssh_shell.settimeout(self._play_context.timeout)
network_os = self._play_context.network_os
if not network_os:
@ -127,53 +177,83 @@ class Connection(Rpc, _Connection):
if not self._terminal:
raise AnsibleConnectionFailure('network os %s is not supported' % network_os)
display.display('loaded terminal plugin for network_os %s' % network_os, log_only=True)
display.vvvv('loaded terminal plugin for network_os %s' % network_os, host=self._play_context.remote_addr)
self._cliconf = cliconf_loader.get(network_os, self)
if self._cliconf:
self._rpc.add(self._cliconf)
display.display('loaded cliconf plugin for network_os %s' % network_os, log_only=True)
display.vvvv('loaded cliconf plugin for network_os %s' % network_os, host=self._play_context.remote_addr)
else:
display.display('unable to load cliconf for network_os %s' % network_os)
display.vvvv('unable to load cliconf for network_os %s' % network_os)
self.receive()
display.display('firing event: on_open_shell()', log_only=True)
display.vvvv('firing event: on_open_shell()', host=self._play_context.remote_addr)
self._terminal.on_open_shell()
if getattr(self._play_context, 'become', None):
display.display('firing event: on_authorize', log_only=True)
if self._play_context.become and self._play_context.become_method == 'enable':
display.vvvv('firing event: on_authorize', host=self._play_context.remote_addr)
auth_pass = self._play_context.become_pass
self._terminal.on_authorize(passwd=auth_pass)
display.vvvv('ssh connection has completed successfully', host=self._play_context.remote_addr)
self._connected = True
display.display('ssh connection has completed successfully', log_only=True)
return self
def _update_connection_state(self):
'''
Reconstruct the connection socket_path and check if it exists
If the socket path exists then the connection is active and set
both the _socket_path value to the path and the _connected value
to True. If the socket path doesn't exist, leave the socket path
value to None and the _connected value to False
'''
ssh = connection_loader.get('ssh', class_only=True)
cp = ssh._create_control_path(self._play_context.remote_addr, self._play_context.port, self._play_context.remote_user)
tmp_path = unfrackpath(C.PERSISTENT_CONTROL_PATH_DIR)
socket_path = unfrackpath(cp % dict(directory=tmp_path))
if os.path.exists(socket_path):
self._connected = True
self._socket_path = socket_path
def reset(self):
'''
Reset the connection
'''
if self._socket_path:
display.vvvv('resetting persistent connection for socket_path %s' % self._socket_path, host=self._play_context.remote_addr)
self.shutdown()
def close(self):
"""Close the active connection to the device
"""
display.display("closing ssh connection to device", log_only=True)
if self._shell:
display.display("firing event: on_close_shell()", log_only=True)
self._terminal.on_close_shell()
self._shell.close()
self._shell = None
display.display("cli session is now closed", log_only=True)
super(Connection, self).close()
self._connected = False
display.display("ssh connection has been closed successfully", log_only=True)
'''
Close the active connection to the device
'''
# only close the connection if its connected.
if self._connected:
display.debug("closing ssh connection to device")
if self._ssh_shell:
display.debug("firing event: on_close_shell()")
self._terminal.on_close_shell()
self._ssh_shell.close()
self._ssh_shell = None
display.debug("cli session is now closed")
self._connected = False
display.debug("ssh connection has been closed successfully")
def receive(self, command=None, prompts=None, answer=None):
"""Handles receiving of output from command"""
'''
Handles receiving of output from command
'''
recv = BytesIO()
handled = False
self._matched_prompt = None
while True:
data = self._shell.recv(256)
data = self._ssh_shell.recv(256)
recv.write(data)
offset = recv.tell() - 256 if recv.tell() > 256 else 0
@ -190,25 +270,30 @@ class Connection(Rpc, _Connection):
return self._sanitize(resp, command)
def send(self, command, prompts=None, answer=None, send_only=False):
"""Sends the command to the device in the opened shell"""
'''
Sends the command to the device in the opened shell
'''
try:
self._history.append(command)
self._shell.sendall(b'%s\r' % command)
self._ssh_shell.sendall(b'%s\r' % command)
if send_only:
return
return self.receive(command, prompts, answer)
response = self.receive(command, prompts, answer)
return to_text(response, errors='surrogate_or_strict')
except (socket.timeout, AttributeError):
display.display(traceback.format_exc(), log_only=True)
display.vvvv(traceback.format_exc(), host=self._play_context.remote_addr)
raise AnsibleConnectionFailure("timeout trying to send command: %s" % command.strip())
def _strip(self, data):
"""Removes ANSI codes from device response"""
'''
Removes ANSI codes from device response
'''
for regex in self._terminal.ansi_re:
data = regex.sub(b'', data)
return data
def _handle_prompt(self, resp, prompts, answer):
"""
'''
Matches the command prompt and responds
:arg resp: Byte string containing the raw response from the remote
@ -216,17 +301,19 @@ class Connection(Rpc, _Connection):
:arg answer: Byte string to send back to the remote if we find a prompt.
A carriage return is automatically appended to this string.
:returns: True if a prompt was found in ``resp``. False otherwise
"""
'''
prompts = [re.compile(r, re.I) for r in prompts]
for regex in prompts:
match = regex.search(resp)
if match:
self._shell.sendall(b'%s\r' % answer)
self._ssh_shell.sendall(b'%s\r' % answer)
return True
return False
def _sanitize(self, resp, command=None):
"""Removes elements from the response before returning to the caller"""
'''
Removes elements from the response before returning to the caller
'''
cleaned = []
for line in resp.splitlines():
if (command and line.strip() == command.strip()) or self._matched_prompt.strip() in line:
@ -235,7 +322,8 @@ class Connection(Rpc, _Connection):
return b'\n'.join(cleaned).strip()
def _find_prompt(self, response):
"""Searches the buffered response for a matching command prompt"""
'''Searches the buffered response for a matching command prompt
'''
errored_response = None
is_error_message = False
for regex in self._terminal.terminal_stderr_re:
@ -264,64 +352,3 @@ class Connection(Rpc, _Connection):
raise AnsibleConnectionFailure(errored_response)
return False
def alarm_handler(self, signum, frame):
"""Alarm handler raised in case of command timeout """
display.display('closing shell due to sigalarm', log_only=True)
self.close()
def exec_command(self, cmd):
"""Executes the cmd on in the shell and returns the output
The method accepts three forms of cmd. The first form is as a byte
string that represents the command to be executed in the shell. The
second form is as a utf8 JSON byte string with additional keywords.
The third form is a json-rpc (2.0)
Keywords supported for cmd:
:command: the command string to execute
:prompt: the expected prompt generated by executing command.
This can be a string or a list of strings
:answer: the string to respond to the prompt with
:sendonly: bool to disable waiting for response
:arg cmd: the byte string that represents the command to be executed
which can be a single command or a json encoded string.
:returns: a tuple of (return code, stdout, stderr). The return
code is an integer and stdout and stderr are byte strings
"""
try:
obj = json.loads(to_text(cmd, errors='surrogate_or_strict'))
except (ValueError, TypeError):
obj = {'command': to_bytes(cmd.strip(), errors='surrogate_or_strict')}
obj = dict((k, to_bytes(v, errors='surrogate_or_strict', nonstring='passthru')) for k, v in obj.items())
if 'prompt' in obj:
if isinstance(obj['prompt'], binary_type):
# Prompt was a string
obj['prompt'] = [obj['prompt']]
elif not isinstance(obj['prompt'], Sequence):
# Convert nonstrings into byte strings (to_bytes(5) => b'5')
if obj['prompt'] is not None:
obj['prompt'] = [to_bytes(obj['prompt'], errors='surrogate_or_strict')]
else:
# Prompt was a Sequence of strings. Make sure they're byte strings
obj['prompt'] = [to_bytes(p, errors='surrogate_or_strict') for p in obj['prompt'] if p is not None]
if 'jsonrpc' in obj:
if self._cliconf:
out = self._exec_rpc(obj)
else:
out = self.internal_error("cliconf is not supported for network_os %s" % self._play_context.network_os)
return 0, to_bytes(out, errors='surrogate_or_strict'), b''
if obj['command'] == b'prompt()':
return 0, self._matched_prompt, b''
try:
if not signal.getsignal(signal.SIGALRM):
signal.signal(signal.SIGALRM, self.alarm_handler)
signal.alarm(self._play_context.timeout)
out = self.send(obj['command'], obj.get('prompt'), obj.get('answer'), obj.get('sendonly'))
signal.alarm(0)
return 0, out, b''
except (AnsibleConnectionFailure, ValueError) as exc:
return 1, b'', to_bytes(exc)