Mirror of https://github.com/ansible-collections/community.general.git
Synced 2025-05-29 04:19:15 -07:00

Dynamic task includes still need some work; this is a rough first version:
* doesn't work with handler sections of playbooks yet
* when using include + with*, the insertion order is backwards
* fix the potential for task lists to become unsynchronized when using the linear strategy, as the include conditional could be predicated on an inventory variable
413 lines
17 KiB
Python
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible.  If not, see <http://www.gnu.org/licenses/>.

# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import Queue
import time

from ansible.errors import *

from ansible.inventory.host import Host
from ansible.inventory.group import Group

from ansible.playbook.handler import Handler
from ansible.playbook.helpers import load_list_of_blocks, compile_block_list
from ansible.playbook.role import ROLE_CACHE, hash_params
from ansible.plugins import module_loader
from ansible.utils.debug import debug

__all__ = ['StrategyBase']

class StrategyBase:

    '''
    This is the base class for strategy plugins, which contains some common
    code useful to all strategies like running handlers, cleanup actions, etc.
    '''

    def __init__(self, tqm):
        self._tqm               = tqm
        self._inventory         = tqm.get_inventory()
        self._workers           = tqm.get_workers()
        self._notified_handlers = tqm.get_notified_handlers()
        self._callback          = tqm.get_callback()
        self._variable_manager  = tqm.get_variable_manager()
        self._loader            = tqm.get_loader()
        self._final_q           = tqm._final_q

        # internal counters
        self._pending_results = 0
        self._cur_worker      = 0

        # this dictionary is used to keep track of hosts that have
        # outstanding tasks still in queue
        self._blocked_hosts = dict()

    def run(self, iterator, connection_info, result=True):
        # save the counts of failed/unreachable hosts, as the cleanup/handler
        # methods will clear that information during their runs
        num_failed      = len(self._tqm._failed_hosts)
        num_unreachable = len(self._tqm._unreachable_hosts)

        debug("running the cleanup portion of the play")
        result &= self.cleanup(iterator, connection_info)
        debug("running handlers")
        result &= self.run_handlers(iterator, connection_info)
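
        # map the play outcome to a return code, per the checks below:
        # 3 = at least one unreachable host, 2 = at least one failed host,
        # 1 = some other failure reported by cleanup/handlers, 0 = success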
        if not result:
            if num_unreachable > 0:
                return 3
            elif num_failed > 0:
                return 2
            else:
                return 1
        else:
            return 0

    def get_hosts_remaining(self, play):
        return [host for host in self._inventory.get_hosts(play.hosts) if host.name not in self._tqm._failed_hosts and host.get_name() not in self._tqm._unreachable_hosts]

    def get_failed_hosts(self, play):
        return [host for host in self._inventory.get_hosts(play.hosts) if host.name in self._tqm._failed_hosts]

    def _queue_task(self, host, task, task_vars, connection_info):
        ''' handles queueing the task up to be sent to a worker '''

        debug("entering _queue_task() for %s/%s" % (host, task))

        # and then queue the new task
        debug("%s - putting task (%s) in queue" % (host, task))
        try:
            debug("worker is %d (out of %d available)" % (self._cur_worker+1, len(self._workers)))
            (worker_prc, main_q, rslt_q) = self._workers[self._cur_worker]
            self._cur_worker += 1
            if self._cur_worker >= len(self._workers):
                self._cur_worker = 0

            self._pending_results += 1
            main_q.put((host, task, self._loader.get_basedir(), task_vars, connection_info, module_loader), block=False)
        except (EOFError, IOError, AssertionError), e:
            # most likely an abort
            debug("got an error while queuing: %s" % e)
            return
        debug("exiting _queue_task() for %s/%s" % (host, task))

    def _process_pending_results(self, iterator):
        '''
        Reads results off the final queue and takes appropriate action
        based on the result (executing callbacks, updating state, etc.).
        '''
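
        # each entry on the final queue is a tuple whose first element names the
        # message type; the formats handled below are:
        #   ('host_task_ok'|'host_task_failed'|'host_task_skipped'|'host_unreachable', task_result)
        #   ('include', host, task, include_file, include_vars)
        #   ('add_host', task_result)
        #   ('add_group', host, task_result)
        #   ('notify_handler', host, handler_name)
        #   ('set_host_var', host, var_name, var_value)
        #   ('set_host_facts', host, facts)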

        while not self._final_q.empty() and not self._tqm._terminated:
            try:
                result = self._final_q.get(block=False)
                debug("got result from result worker: %s" % (result,))

                # all host status messages contain 2 entries: (msg, task_result)
                if result[0] in ('host_task_ok', 'host_task_failed', 'host_task_skipped', 'host_unreachable'):
                    task_result = result[1]
                    host = task_result._host
                    task = task_result._task
                    if result[0] == 'host_task_failed':
                        if not task.ignore_errors:
                            debug("marking %s as failed" % host.get_name())
                            self._tqm._failed_hosts[host.get_name()] = True
                            self._callback.runner_on_failed(task, task_result)
                    elif result[0] == 'host_unreachable':
                        self._tqm._unreachable_hosts[host.get_name()] = True
                        self._callback.runner_on_unreachable(task, task_result)
                    elif result[0] == 'host_task_skipped':
                        self._callback.runner_on_skipped(task, task_result)
                    elif result[0] == 'host_task_ok':
                        self._callback.runner_on_ok(task, task_result)

                    self._pending_results -= 1
                    if host.name in self._blocked_hosts:
                        del self._blocked_hosts[host.name]

                    # If this is a role task, mark the parent role as being run (if
                    # the task was ok or failed, but not skipped or unreachable)
                    if task_result._task._role is not None and result[0] in ('host_task_ok', 'host_task_failed'):
                        # lookup the role in the ROLE_CACHE to make sure we're dealing
                        # with the correct object and mark it as executed
                        for (entry, role_obj) in ROLE_CACHE[task_result._task._role._role_name].iteritems():
                            hashed_entry = hash_params(task_result._task._role._role_params)
                            if entry == hashed_entry:
                                role_obj._had_task_run = True

                elif result[0] == 'include':
                    host = result[1]
                    task = result[2]
                    include_file = result[3]
                    include_vars = result[4]
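
                    # a dynamic include came back from the worker: resolve the file
                    # (relative to the role's tasks dir when applicable), load its
                    # tasks, and splice them into this host's remaining task list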
                    if isinstance(task, Handler):
                        # FIXME: figure out how to make includes work for handlers
                        pass
                    else:
                        original_task = iterator.get_original_task(task)
                        if original_task._role:
                            include_file = self._loader.path_dwim_relative(original_task._role._role_path, 'tasks', include_file)
                        new_tasks = self._load_included_file(original_task, include_file, include_vars)
                        iterator.add_tasks(host, new_tasks)

                elif result[0] == 'add_host':
                    task_result = result[1]
                    new_host_info = task_result.get('add_host', dict())

                    self._add_host(new_host_info)

                elif result[0] == 'add_group':
                    host = result[1]
                    task_result = result[2]
                    group_name = task_result.get('add_group')

                    self._add_group(host, group_name)

                elif result[0] == 'notify_handler':
                    host = result[1]
                    handler_name = result[2]

                    if handler_name not in self._notified_handlers:
                        self._notified_handlers[handler_name] = []

                    if host not in self._notified_handlers[handler_name]:
                        self._notified_handlers[handler_name].append(host)

                elif result[0] == 'set_host_var':
                    host = result[1]
                    var_name = result[2]
                    var_value = result[3]
                    self._variable_manager.set_host_variable(host, var_name, var_value)

                elif result[0] == 'set_host_facts':
                    host = result[1]
                    facts = result[2]
                    self._variable_manager.set_host_facts(host, facts)

                else:
                    raise AnsibleError("unknown result message received: %s" % result[0])

            except Queue.Empty:
                pass

    def _wait_on_pending_results(self, iterator):
        '''
        Wait for the shared counter to drop to zero, using a short sleep
        between checks to ensure we don't spin lock
        '''

        while self._pending_results > 0 and not self._tqm._terminated:
            debug("waiting for pending results (%d left)" % self._pending_results)
            self._process_pending_results(iterator)
            if self._tqm._terminated:
                break
            time.sleep(0.01)

    def _add_host(self, host_info):
        '''
        Helper function to add a new host to inventory based on a task result.
        '''
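
        # host_info comes from the 'add_host' entry of a task result and is
        # expected to carry 'host_name', plus optional 'host_vars' and 'groups' keys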

        host_name = host_info.get('host_name')

        # Check if host in cache, add if not
        if host_name in self._inventory._hosts_cache:
            new_host = self._inventory._hosts_cache[host_name]
        else:
            new_host = Host(host_name)
            self._inventory._hosts_cache[host_name] = new_host

        allgroup = self._inventory.get_group('all')
        allgroup.add_host(new_host)

        # Set/update the vars for this host
        # FIXME: probably should have a set vars method for the host?
        new_vars = host_info.get('host_vars', dict())
        new_host.vars.update(new_vars)

        new_groups = host_info.get('groups', [])
        for group_name in new_groups:
            if not self._inventory.get_group(group_name):
                new_group = Group(group_name)
                self._inventory.add_group(new_group)
                new_group.vars = self._inventory.get_group_variables(group_name)
            else:
                new_group = self._inventory.get_group(group_name)

            new_group.add_host(new_host)

            # add this host to the group cache
            if self._inventory._groups_list is not None:
                if group_name in self._inventory._groups_list:
                    if new_host.name not in self._inventory._groups_list[group_name]:
                        self._inventory._groups_list[group_name].append(new_host.name)

        # clear pattern caching completely since it's unpredictable what
        # patterns may have referenced the group
        # FIXME: is this still required?
        self._inventory.clear_pattern_cache()

    def _add_group(self, host, group_name):
        '''
        Helper function to add a group (if it does not exist), and to assign the
        specified host to that group.
        '''

        new_group = self._inventory.get_group(group_name)
        if not new_group:
            # create the new group and add it to inventory
            new_group = Group(group_name)
            self._inventory.add_group(new_group)

            # and add the group to the proper hierarchy
            allgroup = self._inventory.get_group('all')
            allgroup.add_child_group(new_group)

        # the host here is from the executor side, which means it was a
        # serialized/cloned copy and we'll need to look up the proper
        # host object from the master inventory
        actual_host = self._inventory.get_host(host.name)

        # and add the host to the group
        new_group.add_host(actual_host)

    def _load_included_file(self, task, include_file, include_vars):
        '''
        Loads an included YAML file of tasks, applying the optional set of variables.
        '''

        data = self._loader.load_from_file(include_file)
        if not isinstance(data, list):
            raise AnsibleParsingError("included task files must contain a list of tasks", obj=data)

        is_handler = isinstance(task, Handler)

        block_list = load_list_of_blocks(
            data,
            parent_block=task._block,
            task_include=task,
            role=task._role,
            use_handlers=is_handler,
            loader=self._loader
        )

        task_list = compile_block_list(block_list)
        for t in task_list:
            t.vars = include_vars.copy()

        return task_list

    def cleanup(self, iterator, connection_info):
        '''
        Iterates through failed hosts and runs any outstanding rescue/always blocks
        and handlers which may still need to be run after a failure.
        '''

        debug("in cleanup")
        result = True

        debug("getting failed hosts")
        failed_hosts = self.get_failed_hosts(iterator._play)
        if len(failed_hosts) == 0:
            debug("there are no failed hosts")
            return result

        debug("marking hosts failed in the iterator")
        # mark the host as failed in the iterator so it will take
        # any required rescue paths which may be outstanding
        for host in failed_hosts:
            iterator.mark_host_failed(host)

        debug("clearing the failed hosts list")
        # clear the failed hosts dictionary now so the cleanup run starts fresh
        for entry in self._tqm._failed_hosts.keys():
            del self._tqm._failed_hosts[entry]

        work_to_do = True
        while work_to_do:
            work_to_do = False
            for host in failed_hosts:
                host_name = host.get_name()

                if host_name in self._tqm._failed_hosts:
                    iterator.mark_host_failed(host)
                    del self._tqm._failed_hosts[host_name]

                if host_name not in self._tqm._unreachable_hosts and iterator.get_next_task_for_host(host, peek=True):
                    work_to_do = True
                    # check to see if this host is blocked (still executing a previous task)
                    if host_name not in self._blocked_hosts:
                        # pop the task, mark the host blocked, and queue it
                        self._blocked_hosts[host_name] = True
                        task = iterator.get_next_task_for_host(host)
                        task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task)
                        self._callback.playbook_on_cleanup_task_start(task.get_name())
                        self._queue_task(host, task, task_vars, connection_info)

            self._process_pending_results(iterator)

        # no more work, wait until the queue is drained
        self._wait_on_pending_results(iterator)

        return result

    def run_handlers(self, iterator, connection_info):
        '''
        Runs handlers on those hosts which have been notified.
        '''
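
        # each handler is queued once per notified host (guarded by
        # has_triggered/flag_for_host), and its notification list is wiped
        # once all queued results for that handler have been collected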

        result = True

        # FIXME: getting the handlers from the iterator's play should be
        #        a method on the iterator, which may also filter the list
        #        of handlers based on the notified list
        handlers = compile_block_list(iterator._play.handlers)

        debug("handlers are: %s" % handlers)
        for handler in handlers:
            handler_name = handler.get_name()

            if handler_name in self._notified_handlers and len(self._notified_handlers[handler_name]):
                if not len(self.get_hosts_remaining(iterator._play)):
                    self._callback.playbook_on_no_hosts_remaining()
                    result = False
                    break

                self._callback.playbook_on_handler_task_start(handler_name)
                for host in self._notified_handlers[handler_name]:
                    if not handler.has_triggered(host):
                        task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=handler)
                        self._queue_task(host, handler, task_vars, connection_info)
                        handler.flag_for_host(host)

                    self._process_pending_results(iterator)

                self._wait_on_pending_results(iterator)

                # wipe the notification list
                self._notified_handlers[handler_name] = []

        debug("done running handlers, result is: %s" % result)
        return result