community.general/lib/ansible/cli/console.py
Toshio Kuratomi 7e92ff823e Split up the base_parser function
The goal of breaking apart the base_parser() function is to get rid of
a bunch of conditionals and parameters in the code and, instead, make
code look like simple composition.

When splitting, a choice had to be made as to whether this would operate
by side effect (modifying a passed in parser) or side effect-free
(returning a new parser every time).

Making a version that's side-effect-free appears to be fighting with the
optparse API (it wants to work by creating a parser object, configuring
the object, and then parsing the arguments with it) so instead, make it
clear that our helper functions are modifying the passed in parser by
(1) not returning the parser and (2) changing the function names to be
more clear that it is operating by side-effect.

Also move all of the generic optparse code, along with the argument
context classes, into a new subdirectory.
2019-01-03 18:12:23 -08:00

458 lines
16 KiB
Python

# Copyright: (c) 2014, Nandor Sivok <dominis@haxor.hu>
# Copyright: (c) 2016, Redhat Inc
# Copyright: (c) 2018, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
########################################################
# ansible-console is an interactive REPL shell for ansible
# with built-in tab completion for all the documented modules
#
# Available commands:
# cd - change host/group (you can use host patterns eg.: app*.dc*:!app01*)
# list - list available hosts in the current path
# forks - set the number of forks
# become - toggle whether tasks run with become
# ! - forces shell module instead of the ansible module (!yum update -y)
import atexit
import cmd
import getpass
import readline
import os
import sys
from ansible import constants as C
from ansible import context
from ansible.arguments import optparse_helpers as opt_help
from ansible.cli import CLI
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.module_utils._text import to_native, to_text
from ansible.module_utils.parsing.convert_bool import boolean
from ansible.parsing.splitter import parse_kv
from ansible.playbook.play import Play
from ansible.plugins.loader import module_loader, fragment_loader
from ansible.utils import plugin_docs
from ansible.utils.color import stringc
from ansible.utils.display import Display
# Module-level Display instance used for console output throughout this file.
display = Display()
class ConsoleCLI(CLI, cmd.Cmd):
    ''' a REPL that allows for running ad-hoc tasks against a chosen inventory (based on dominis' ansible-shell).'''

    # NOTE: cmd.Cmd prints the docstring of a ``do_*`` method as the help text
    # for that command, so implementation notes for those methods are kept in
    # ``#`` comments and their docstrings stay user-facing.

    # Class-level fallback; run() replaces it per instance with the set of
    # module names returned by list_modules().
    modules = []

    ARGUMENTS = {'host-pattern': 'A name of a group in the inventory, a shell-like glob '
                                 'selecting hosts in inventory or any combination of the two separated by commas.'}

    # use specific to console, but fallback to highlight for backwards compatibility
    NORMAL_PROMPT = C.COLOR_CONSOLE_PROMPT or C.COLOR_HIGHLIGHT

    def __init__(self, args):
        """Initialise the CLI state and the ``cmd.Cmd`` machinery.

        :param args: command line arguments, passed unchanged to ``CLI.__init__``.
        """
        super(ConsoleCLI, self).__init__(args)

        self.intro = 'Welcome to the ansible console.\nType help or ? to list commands.\n'

        self.groups = []
        self.hosts = []
        # Hosts matched by the current pattern; refreshed by set_prompt() and
        # read by do_list().
        self.selected = []
        self.pattern = None
        self.variable_manager = None
        self.loader = None
        self.passwords = dict()

        self.modules = None
        self.cwd = '*'

        # Defaults for these are set from the CLI in run()
        self.remote_user = None
        self.become = None
        self.become_user = None
        self.become_method = None
        self.check_mode = None
        self.diff = None
        self.forks = None

        cmd.Cmd.__init__(self)

    def init_parser(self):
        """Build the option parser by adding the console's option groups to ``self.parser``."""
        super(ConsoleCLI, self).init_parser(
            usage='%prog [<host-pattern>] [options]',
            desc="REPL console for executing Ansible tasks.",
            epilog="This is not a live session/connection, each task executes in the background and returns it's results."
        )
        opt_help.add_runas_options(self.parser)
        opt_help.add_inventory_options(self.parser)
        opt_help.add_connect_options(self.parser)
        opt_help.add_check_options(self.parser)
        opt_help.add_vault_options(self.parser)
        opt_help.add_fork_options(self.parser)
        opt_help.add_module_options(self.parser)
        opt_help.add_basedir_options(self.parser)

        # options unique to shell
        self.parser.add_option('--step', dest='step', action='store_true',
                               help="one-step-at-a-time: confirm each task before running")

    def post_process_args(self, options, args):
        """Apply console specific processing to the parsed command line.

        Sets the display verbosity, normalizes the become options and checks
        the options for conflicts.

        :returns: the processed ``(options, args)`` tuple.
        """
        options, args = super(ConsoleCLI, self).post_process_args(options, args)
        display.verbosity = options.verbosity
        options = self.normalize_become_options(options)
        self.validate_conflicts(options, runas_opts=True, vault_opts=True, fork_opts=True)
        return options, args

    def get_names(self):
        """Return every attribute name of this instance.

        Listing the instance (not only the class) makes the ``do_<module>`` and
        ``help_<module>`` attributes that run() attaches visible to ``cmd.Cmd``.
        """
        return dir(self)

    def cmdloop(self):
        """Run the ``cmd.Cmd`` command loop, treating Ctrl-C as a request to exit."""
        try:
            cmd.Cmd.cmdloop(self)
        except KeyboardInterrupt:
            # do_exit() ignores its argument; hand it an empty command line
            # instead of the console object itself.
            self.do_exit('')

    def set_prompt(self):
        """Recompute ``self.selected`` and rebuild the coloured prompt string."""
        login_user = self.remote_user or getpass.getuser()
        self.selected = self.inventory.list_hosts(self.cwd)
        prompt = "%s@%s (%d)[f:%s]" % (login_user, self.cwd, len(self.selected), self.forks)
        # A '#' prompt in the error colour signals that tasks will run as root.
        if self.become and self.become_user in [None, 'root']:
            prompt += "# "
            color = C.COLOR_ERROR
        else:
            prompt += "$ "
            color = self.NORMAL_PROMPT
        self.prompt = stringc(prompt, color)

    def list_modules(self):
        """Return the set of module names found on the module loader's paths.

        Directories given through the ``module_path`` CLI argument are
        registered with the module loader before the paths are scanned.
        """
        modules = set()
        for path in context.CLIARGS['module_path'] or ():
            if path:
                module_loader.add_directory(path)

        for path in module_loader._get_paths():
            if path is not None:
                modules.update(self._find_modules_in_path(path))
        return modules

    def _find_modules_in_path(self, path):
        """Yield the module names found in ``path`` and its sub-directories.

        Hidden and dunder entries, files with a blacklisted extension, ignored
        files and symlinked ``_name`` aliases are skipped. A leading ``_`` and
        the file extension are removed from the yielded names. Nothing is
        yielded when ``path`` is not a directory.
        """
        if not os.path.isdir(path):
            return

        for module in os.listdir(path):
            if module.startswith('.') or module.startswith('__'):
                continue

            # os.listdir() returns bare names, so every filesystem test has to
            # be made against the path joined with its parent directory.
            fullpath = os.path.join(path, module)
            if os.path.isdir(fullpath):
                # Re-yield what the sub-directory contains; the directory name
                # itself is not a module.
                for found in self._find_modules_in_path(fullpath):
                    yield found
                continue

            if any(module.endswith(x) for x in C.BLACKLIST_EXTS):
                continue
            if module in C.IGNORE_FILES:
                continue
            if module.startswith('_'):
                if os.path.islink(fullpath):  # avoids aliases
                    continue
                module = module.replace('_', '', 1)

            yield os.path.splitext(module)[0]  # removes the extension

    def default(self, arg, forceshell=False):
        """Run ``arg`` as an ad-hoc task against the current host pattern.

        The first word selects the module when it is a known module name;
        otherwise, or when ``forceshell`` is True, the whole line is handed to
        the ``shell`` module.

        :param arg: the command line typed by the user.
        :param forceshell: when True, always use the ``shell`` module.
        :returns: ``False`` when nothing was run or the run failed, ``None``
            otherwise. Both values keep the command loop running.
        """
        if arg.startswith("#"):
            return False

        if not self.cwd:
            display.error("No host found")
            return False

        words = arg.split()
        if not words:
            # A bare "!" or "shell" reaches this point with nothing to run.
            return False

        if forceshell is not True and words[0] in self.modules:
            module = words[0]
            module_args = ' '.join(words[1:])
        else:
            module = 'shell'
            module_args = arg

        result = None
        try:
            check_raw = module in ('command', 'shell', 'script', 'raw')
            play_ds = dict(
                name="Ansible Shell",
                hosts=self.cwd,
                gather_facts='no',
                tasks=[dict(action=dict(module=module, args=parse_kv(module_args, check_raw=check_raw)))],
                remote_user=self.remote_user,
                become=self.become,
                become_user=self.become_user,
                become_method=self.become_method,
                check_mode=self.check_mode,
                diff=self.diff,
            )
            play = Play().load(play_ds, variable_manager=self.variable_manager, loader=self.loader)
        except Exception as e:
            display.error(u"Unable to build command: %s" % to_text(e))
            return False

        try:
            cb = 'minimal'  # FIXME: make callbacks configurable
            # now create a task queue manager to execute the play
            self._tqm = None
            try:
                self._tqm = TaskQueueManager(
                    inventory=self.inventory,
                    variable_manager=self.variable_manager,
                    loader=self.loader,
                    passwords=self.passwords,
                    stdout_callback=cb,
                    run_additional_callbacks=C.DEFAULT_LOAD_CALLBACK_PLUGINS,
                    run_tree=False,
                    forks=self.forks,
                )

                result = self._tqm.run(play)
            finally:
                # Clean up even when the run raised or was interrupted.
                if self._tqm:
                    self._tqm.cleanup()
                if self.loader:
                    self.loader.cleanup_all_tmp_files()

            if result is None:
                display.error("No hosts found")
                return False
        except KeyboardInterrupt:
            display.error('User interrupted execution')
            return False
        except Exception as e:
            display.error(to_text(e))
            # FIXME: add traceback in very very verbose mode
            return False

    def emptyline(self):
        """Do nothing on an empty line (``cmd.Cmd`` would repeat the last command)."""
        return

    def do_shell(self, arg):
        """
        You can run shell commands through the shell module.
        eg.:
        shell ps uax | grep java | wc -l
        shell killall python
        shell halt -n
        You can use the ! to force the shell module. eg.:
        !ps aux | grep java | wc -l
        """
        self.default(arg, True)

    def do_forks(self, arg):
        """Set the number of forks"""
        if not arg:
            display.display('Usage: forks <number>')
            return

        try:
            forks = int(arg)
        except ValueError:
            # A typo must not take the whole console down with a traceback.
            display.error('forks must be a number, got: %s' % arg)
            return

        if forks <= 0:
            display.display('forks must be greater than or equal to 1')
            return

        self.forks = forks
        self.set_prompt()

    do_serial = do_forks

    def do_verbosity(self, arg):
        """Set verbosity level"""
        if not arg:
            display.display('Usage: verbosity <number>')
            return

        try:
            level = int(arg)
        except ValueError:
            # Keep the console alive on non-numeric input.
            display.error('verbosity must be a number, got: %s' % arg)
            return

        display.verbosity = level
        display.v('verbosity level set to %s' % arg)

    def do_cd(self, arg):
        """
        Change active host/group. You can use hosts patterns as well eg.:
        cd webservers
        cd webservers:dbservers
        cd webservers:!phoenix
        cd webservers:&staging
        cd webservers:dbservers:&staging:!phoenix
        """
        if not arg:
            self.cwd = '*'
        elif arg in ('/', '*', '/*'):
            # Explicit spellings of "everything"; a tuple states the accepted
            # values directly instead of relying on a substring test.
            self.cwd = 'all'
        elif self.inventory.get_hosts(arg):
            self.cwd = arg
        else:
            display.display("no host matched")

        self.set_prompt()

    def do_list(self, arg):
        """List the hosts in the current group"""
        if arg == 'groups':
            for group in self.groups:
                display.display(group)
        else:
            for host in self.selected:
                display.display(host.name)

    def do_become(self, arg):
        """Toggle whether plays run with become"""
        if arg:
            self.become = boolean(arg, strict=False)
            display.v("become changed to %s" % self.become)
            self.set_prompt()
        else:
            display.display("Please specify become value, e.g. `become yes`")

    def do_remote_user(self, arg):
        """Given a username, set the remote user plays are run by"""
        if arg:
            self.remote_user = arg
            self.set_prompt()
        else:
            display.display("Please specify a remote user, e.g. `remote_user root`")

    def do_become_user(self, arg):
        """Given a username, set the user that plays are run by when using become"""
        if arg:
            self.become_user = arg
        else:
            display.display("Please specify a user, e.g. `become_user jenkins`")
            display.v("Current user is %s" % self.become_user)
        self.set_prompt()

    def do_become_method(self, arg):
        """Given a become_method, set the privilege escalation method when using become"""
        if arg:
            self.become_method = arg
            display.v("become_method changed to %s" % self.become_method)
        else:
            display.display("Please specify a become_method, e.g. `become_method su`")

    def do_check(self, arg):
        """Toggle whether plays run with check mode"""
        if arg:
            self.check_mode = boolean(arg, strict=False)
            display.v("check mode changed to %s" % self.check_mode)
        else:
            display.display("Please specify check mode value, e.g. `check yes`")

    def do_diff(self, arg):
        """Toggle whether plays run with diff"""
        if arg:
            self.diff = boolean(arg, strict=False)
            display.v("diff mode changed to %s" % self.diff)
        else:
            display.display("Please specify a diff value , e.g. `diff yes`")

    def do_exit(self, args):
        """Exits from the console"""
        sys.stdout.write('\n')
        # Any true value returned from a do_* method stops the cmd.Cmd loop.
        return -1

    do_EOF = do_exit

    @staticmethod
    def _documented_options(doc):
        """Return the ``options`` mapping of a parsed module doc, or ``{}`` when it has none."""
        try:
            return doc['options'] or {}
        except KeyError:
            return {}

    def helpdefault(self, module_name):
        """Display the short description and the parameters of ``module_name``."""
        if module_name not in self.modules:
            display.error('%s is not a valid command, use ? to list all valid commands.' % module_name)
            return

        in_path = module_loader.find_plugin(module_name)
        if not in_path:
            return

        oc, _, _, _ = plugin_docs.get_docstring(in_path, fragment_loader)
        if not oc:
            display.error('No documentation found for %s.' % module_name)
            return

        display.display(oc['short_description'])
        display.display('Parameters:')
        # Modules documented without any options simply list no parameters.
        options = self._documented_options(oc)
        for opt in options.keys():
            display.display(' ' + stringc(opt, self.NORMAL_PROMPT) + ' ' + options[opt]['description'][0])

    def complete_cd(self, text, line, begidx, endidx):
        """Return host/group completions for the argument of ``cd``."""
        mline = line.partition(' ')[2]
        # readline only replaces ``text``; drop the part already typed before it.
        offs = len(mline) - len(text)

        if self.cwd in ('all', '*', '\\'):
            completions = self.hosts + self.groups
        else:
            completions = [x.name for x in self.inventory.list_hosts(self.cwd)]

        return [to_native(s)[offs:] for s in completions if to_native(s).startswith(to_native(mline))]

    def completedefault(self, text, line, begidx, endidx):
        """Return ``option=`` completions when the line starts with a module name.

        Always returns a list: ``cmd.Cmd.complete`` indexes the result, so
        "no completions" has to be ``[]`` rather than ``None``.
        """
        words = line.split()
        if not words or words[0] not in self.modules:
            return []

        mline = line.split(' ')[-1]
        offs = len(mline) - len(text)
        completions = self.module_args(words[0])

        return [s[offs:] + '=' for s in completions if s.startswith(mline)]

    def module_args(self, module_name):
        """Return the list of documented option names of ``module_name``.

        An empty list is returned when the plugin cannot be found or has no
        documented options.
        """
        in_path = module_loader.find_plugin(module_name)
        if not in_path:
            return []

        oc, _, _, _ = plugin_docs.get_docstring(in_path, fragment_loader)
        if not oc:
            return []
        return list(self._documented_options(oc).keys())

    def run(self):
        """Configure the console from the parsed CLI arguments and start the REPL."""
        super(ConsoleCLI, self).run()

        # hosts
        if len(context.CLIARGS['args']) != 1:
            self.pattern = 'all'
        else:
            self.pattern = context.CLIARGS['args'][0]
        self.cwd = self.pattern

        # Defaults from the command line
        self.remote_user = context.CLIARGS['remote_user']
        self.become = context.CLIARGS['become']
        self.become_user = context.CLIARGS['become_user']
        self.become_method = context.CLIARGS['become_method']
        self.check_mode = context.CLIARGS['check']
        self.diff = context.CLIARGS['diff']
        self.forks = context.CLIARGS['forks']

        # dynamically add modules as commands
        self.modules = self.list_modules()
        for module in self.modules:
            # ``module=module`` binds the current name at definition time;
            # without it every lambda would see the last value of the loop.
            setattr(self, 'do_' + module, lambda arg, module=module: self.default(module + ' ' + arg))
            setattr(self, 'help_' + module, lambda module=module: self.helpdefault(module))

        (sshpass, becomepass) = self.ask_passwords()
        self.passwords = {'conn_pass': sshpass, 'become_pass': becomepass}

        self.loader, self.inventory, self.variable_manager = self._play_prereqs()

        hosts = self.get_host_list(self.inventory, context.CLIARGS['subset'], self.pattern)

        self.groups = self.inventory.list_groups()
        self.hosts = [x.name for x in hosts]

        # This hack is to work around readline issues on a mac:
        # http://stackoverflow.com/a/7116997/541202
        # (``__doc__`` may be None, hence the fallback to an empty string)
        if 'libedit' in (readline.__doc__ or ''):
            readline.parse_and_bind("bind ^I rl_complete")
        else:
            readline.parse_and_bind("tab: complete")

        histfile = os.path.join(os.path.expanduser("~"), ".ansible-console_history")
        try:
            readline.read_history_file(histfile)
        except IOError:
            # No usable history file yet; one is written at exit.
            pass

        atexit.register(readline.write_history_file, histfile)
        self.set_prompt()
        self.cmdloop()