mirror of
https://github.com/ansible-collections/community.general.git
synced 2025-04-24 11:21:25 -07:00
* Using docstrings conflicts with the standard use of docstrings * PYTHON_OPTIMIZE=2 will omit docstrings. Using docstrings makes future changes to the plugin and module code subject to the requirement that we ensure it won't be run with optimization.
180 lines
5.8 KiB
Python
180 lines
5.8 KiB
Python
# This file is part of Ansible
|
|
#
|
|
# Ansible is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# Ansible is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
|
|
from __future__ import (absolute_import, division, print_function)
|
|
__metaclass__ = type
|
|
|
|
DOCUMENTATION = '''
|
|
strategy: debug
|
|
short_description: Executes tasks in interactive debug session.
|
|
description:
|
|
- Task execution is 'linear' but controlled by an interactive debug session.
|
|
version_added: "2.1"
|
|
author: Kishin Yagami
|
|
'''
|
|
|
|
import cmd
|
|
import pprint
|
|
import sys
|
|
|
|
from ansible.module_utils.six.moves import reduce
|
|
from ansible.plugins.strategy.linear import StrategyModule as LinearStrategyModule
|
|
|
|
try:
|
|
from __main__ import display
|
|
except ImportError:
|
|
from ansible.utils.display import Display
|
|
display = Display()
|
|
|
|
|
|
class NextAction(object):
|
|
""" The next action after an interpreter's exit. """
|
|
REDO = 1
|
|
CONTINUE = 2
|
|
EXIT = 3
|
|
|
|
def __init__(self, result=EXIT):
|
|
self.result = result
|
|
|
|
|
|
class StrategyModule(LinearStrategyModule):
|
|
def __init__(self, tqm):
|
|
self.curr_tqm = tqm
|
|
super(StrategyModule, self).__init__(tqm)
|
|
|
|
def _queue_task(self, host, task, task_vars, play_context):
|
|
self.curr_host = host
|
|
self.curr_task = task
|
|
self.curr_task_vars = task_vars
|
|
self.curr_play_context = play_context
|
|
|
|
super(StrategyModule, self)._queue_task(host, task, task_vars, play_context)
|
|
|
|
def _process_pending_results(self, iterator, one_pass=False, max_passes=None):
|
|
if not hasattr(self, "curr_host"):
|
|
return super(StrategyModule, self)._process_pending_results(iterator, one_pass, max_passes)
|
|
|
|
prev_host_state = iterator.get_host_state(self.curr_host)
|
|
results = super(StrategyModule, self)._process_pending_results(iterator, one_pass)
|
|
|
|
while self._need_debug(results):
|
|
next_action = NextAction()
|
|
dbg = Debugger(self, results, next_action)
|
|
dbg.cmdloop()
|
|
|
|
if next_action.result == NextAction.REDO:
|
|
# rollback host state
|
|
self.curr_tqm.clear_failed_hosts()
|
|
iterator._host_states[self.curr_host.name] = prev_host_state
|
|
if reduce(lambda total, res: res.is_failed() or total, results, False):
|
|
self._tqm._stats.failures[self.curr_host.name] -= 1
|
|
elif reduce(lambda total, res: res.is_unreachable() or total, results, False):
|
|
self._tqm._stats.dark[self.curr_host.name] -= 1
|
|
|
|
# redo
|
|
super(StrategyModule, self)._queue_task(self.curr_host, self.curr_task, self.curr_task_vars, self.curr_play_context)
|
|
results = super(StrategyModule, self)._process_pending_results(iterator, one_pass)
|
|
elif next_action.result == NextAction.CONTINUE:
|
|
break
|
|
elif next_action.result == NextAction.EXIT:
|
|
exit(1)
|
|
|
|
return results
|
|
|
|
def _need_debug(self, results):
|
|
return reduce(lambda total, res: res.is_failed() or res.is_unreachable() or total, results, False)
|
|
|
|
|
|
class Debugger(cmd.Cmd):
|
|
prompt = '(debug) ' # debugger
|
|
prompt_continuous = '> ' # multiple lines
|
|
|
|
def __init__(self, strategy_module, results, next_action):
|
|
# cmd.Cmd is old-style class
|
|
cmd.Cmd.__init__(self)
|
|
|
|
self.intro = "Debugger invoked"
|
|
self.scope = {}
|
|
self.scope['task'] = strategy_module.curr_task
|
|
self.scope['vars'] = strategy_module.curr_task_vars
|
|
self.scope['host'] = strategy_module.curr_host
|
|
self.scope['result'] = results[0]._result
|
|
self.scope['results'] = results # for debug of this debugger
|
|
self.next_action = next_action
|
|
|
|
def cmdloop(self):
|
|
try:
|
|
cmd.Cmd.cmdloop(self)
|
|
except KeyboardInterrupt:
|
|
pass
|
|
|
|
def do_EOF(self, args):
|
|
return self.do_quit(args)
|
|
|
|
def do_quit(self, args):
|
|
display.display('aborted')
|
|
self.next_action.result = NextAction.EXIT
|
|
return True
|
|
|
|
do_q = do_quit
|
|
|
|
def do_continue(self, args):
|
|
self.next_action.result = NextAction.CONTINUE
|
|
return True
|
|
|
|
do_c = do_continue
|
|
|
|
def do_redo(self, args):
|
|
self.next_action.result = NextAction.REDO
|
|
return True
|
|
|
|
do_r = do_redo
|
|
|
|
def evaluate(self, args):
|
|
try:
|
|
return eval(args, globals(), self.scope)
|
|
except:
|
|
t, v = sys.exc_info()[:2]
|
|
if isinstance(t, str):
|
|
exc_type_name = t
|
|
else:
|
|
exc_type_name = t.__name__
|
|
display.display('***%s:%s' % (exc_type_name, repr(v)))
|
|
raise
|
|
|
|
def do_p(self, args):
|
|
try:
|
|
result = self.evaluate(args)
|
|
display.display(pprint.pformat(result))
|
|
except:
|
|
pass
|
|
|
|
def execute(self, args):
|
|
try:
|
|
code = compile(args + '\n', '<stdin>', 'single')
|
|
exec(code, globals(), self.scope)
|
|
except:
|
|
t, v = sys.exc_info()[:2]
|
|
if isinstance(t, str):
|
|
exc_type_name = t
|
|
else:
|
|
exc_type_name = t.__name__
|
|
display.display('***%s:%s' % (exc_type_name, repr(v)))
|
|
raise
|
|
|
|
def default(self, line):
|
|
try:
|
|
self.execute(line)
|
|
except:
|
|
pass
|