Making the switch to v2

James Cammarata 2015-05-03 21:47:26 -05:00
parent 8cf4452d48
commit ce3ef7f4c1
486 changed files with 7948 additions and 9070 deletions


@@ -0,0 +1,432 @@
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from six.moves import queue as Queue
import time
from ansible.errors import *
from ansible.inventory.host import Host
from ansible.inventory.group import Group
from ansible.playbook.handler import Handler
from ansible.playbook.helpers import load_list_of_blocks
from ansible.playbook.role import ROLE_CACHE, hash_params
from ansible.plugins import filter_loader, lookup_loader, module_loader
from ansible.utils.debug import debug
__all__ = ['StrategyBase']
# FIXME: this should probably be in the plugins/__init__.py, with
# a smarter mechanism to set all of the attributes based on
# the loaders created there
class SharedPluginLoaderObj:
'''
A simple object to make passing the various plugin loaders to
the forked processes over the queue easier
'''
def __init__(self):
self.filter_loader = filter_loader
self.lookup_loader = lookup_loader
self.module_loader = module_loader
class StrategyBase:
'''
This is the base class for strategy plugins, which contains common
code useful to all strategies, such as running handlers, cleanup actions, etc.
'''
def __init__(self, tqm):
self._tqm = tqm
self._inventory = tqm.get_inventory()
self._workers = tqm.get_workers()
self._notified_handlers = tqm.get_notified_handlers()
#self._callback = tqm.get_callback()
self._variable_manager = tqm.get_variable_manager()
self._loader = tqm.get_loader()
self._final_q = tqm._final_q
# internal counters
self._pending_results = 0
self._cur_worker = 0
# this dictionary is used to keep track of hosts that have
# outstanding tasks still in queue
self._blocked_hosts = dict()
def run(self, iterator, connection_info, result=True):
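# base implementation shared by all strategies: run any notified handlers,
# then translate the play results into a return code (0 = success, 1 = error,
# 2 = at least one host failed, 3 = at least one host unreachable)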
# save the counts on failed/unreachable hosts, as the cleanup/handler
# methods will clear that information during their runs
num_failed = len(self._tqm._failed_hosts)
num_unreachable = len(self._tqm._unreachable_hosts)
#debug("running the cleanup portion of the play")
#result &= self.cleanup(iterator, connection_info)
debug("running handlers")
result &= self.run_handlers(iterator, connection_info)
# send the stats callback
self._tqm.send_callback('v2_playbook_on_stats', self._tqm._stats)
if not result:
if num_unreachable > 0:
return 3
elif num_failed > 0:
return 2
else:
return 1
else:
return 0
def get_hosts_remaining(self, play):
return [host for host in self._inventory.get_hosts(play.hosts) if host.name not in self._tqm._failed_hosts and host.name not in self._tqm._unreachable_hosts]
def get_failed_hosts(self, play):
return [host for host in self._inventory.get_hosts(play.hosts) if host.name in self._tqm._failed_hosts]
def _queue_task(self, host, task, task_vars, connection_info):
''' handles queueing the task up to be sent to a worker '''
debug("entering _queue_task() for %s/%s" % (host, task))
# and then queue the new task
debug("%s - putting task (%s) in queue" % (host, task))
try:
debug("worker is %d (out of %d available)" % (self._cur_worker+1, len(self._workers)))
(worker_prc, main_q, rslt_q) = self._workers[self._cur_worker]
self._cur_worker += 1
if self._cur_worker >= len(self._workers):
self._cur_worker = 0
self._pending_results += 1
# create a dummy object with plugin loaders set as an easier
# way to share them with the forked processes
shared_loader_obj = SharedPluginLoaderObj()
main_q.put((host, task, self._loader.get_basedir(), task_vars, connection_info, shared_loader_obj), block=False)
except (EOFError, IOError, AssertionError) as e:
# most likely an abort
debug("got an error while queuing: %s" % e)
return
debug("exiting _queue_task() for %s/%s" % (host, task))
def _process_pending_results(self, iterator):
'''
Reads results off the final queue and takes appropriate action
based on the result (executing callbacks, updating state, etc.).
'''
ret_results = []
while not self._final_q.empty() and not self._tqm._terminated:
try:
result = self._final_q.get(block=False)
debug("got result from result worker: %s" % (result,))
# all host status messages contain 2 entries: (msg, task_result)
if result[0] in ('host_task_ok', 'host_task_failed', 'host_task_skipped', 'host_unreachable'):
task_result = result[1]
host = task_result._host
task = task_result._task
if result[0] == 'host_task_failed':
if not task.ignore_errors:
debug("marking %s as failed" % host.name)
iterator.mark_host_failed(host)
self._tqm._failed_hosts[host.name] = True
self._tqm._stats.increment('failures', host.name)
self._tqm.send_callback('v2_runner_on_failed', task_result)
elif result[0] == 'host_unreachable':
self._tqm._unreachable_hosts[host.name] = True
self._tqm._stats.increment('dark', host.name)
self._tqm.send_callback('v2_runner_on_unreachable', task_result)
elif result[0] == 'host_task_skipped':
self._tqm._stats.increment('skipped', host.name)
self._tqm.send_callback('v2_runner_on_skipped', task_result)
elif result[0] == 'host_task_ok':
self._tqm._stats.increment('ok', host.name)
if 'changed' in task_result._result and task_result._result['changed']:
self._tqm._stats.increment('changed', host.name)
self._tqm.send_callback('v2_runner_on_ok', task_result)
self._pending_results -= 1
if host.name in self._blocked_hosts:
del self._blocked_hosts[host.name]
# If this is a role task, mark the parent role as being run (if
# the task was ok or failed, but not skipped or unreachable)
if task_result._task._role is not None and result[0] in ('host_task_ok', 'host_task_failed'):
# lookup the role in the ROLE_CACHE to make sure we're dealing
# with the correct object and mark it as executed
for (entry, role_obj) in ROLE_CACHE[task_result._task._role._role_name].items():
hashed_entry = hash_params(task_result._task._role._role_params)
if entry == hashed_entry:
role_obj._had_task_run = True
ret_results.append(task_result)
elif result[0] == 'add_host':
task_result = result[1]
new_host_info = task_result.get('add_host', dict())
self._add_host(new_host_info)
elif result[0] == 'add_group':
host = result[1]
task_result = result[2]
group_name = task_result.get('add_group')
self._add_group(host, group_name)
elif result[0] == 'notify_handler':
host = result[1]
handler_name = result[2]
if handler_name not in self._notified_handlers:
self._notified_handlers[handler_name] = []
if host not in self._notified_handlers[handler_name]:
self._notified_handlers[handler_name].append(host)
elif result[0] == 'set_host_var':
host = result[1]
var_name = result[2]
var_value = result[3]
self._variable_manager.set_host_variable(host, var_name, var_value)
elif result[0] == 'set_host_facts':
host = result[1]
facts = result[2]
self._variable_manager.set_host_facts(host, facts)
else:
raise AnsibleError("unknown result message received: %s" % result[0])
except Queue.Empty:
pass
return ret_results
def _wait_on_pending_results(self, iterator):
'''
Wait for the shared counter to drop to zero, using a short sleep
between checks to ensure we don't spin lock
'''
ret_results = []
while self._pending_results > 0 and not self._tqm._terminated:
debug("waiting for pending results (%d left)" % self._pending_results)
results = self._process_pending_results(iterator)
ret_results.extend(results)
if self._tqm._terminated:
break
time.sleep(0.01)
return ret_results
def _add_host(self, host_info):
'''
Helper function to add a new host to inventory based on a task result.
'''
host_name = host_info.get('host_name')
# Check if host in cache, add if not
if host_name in self._inventory._hosts_cache:
new_host = self._inventory._hosts_cache[host_name]
else:
new_host = Host(host_name)
self._inventory._hosts_cache[host_name] = new_host
allgroup = self._inventory.get_group('all')
allgroup.add_host(new_host)
# Set/update the vars for this host
# FIXME: probably should have a set vars method for the host?
new_vars = host_info.get('host_vars', dict())
new_host.vars.update(new_vars)
new_groups = host_info.get('groups', [])
for group_name in new_groups:
if not self._inventory.get_group(group_name):
new_group = Group(group_name)
self._inventory.add_group(new_group)
new_group.vars = self._inventory.get_group_variables(group_name)
else:
new_group = self._inventory.get_group(group_name)
new_group.add_host(new_host)
# add this host to the group cache
if self._inventory._groups_list is not None:
if group_name in self._inventory._groups_list:
if new_host.name not in self._inventory._groups_list[group_name]:
self._inventory._groups_list[group_name].append(new_host.name)
# clear pattern caching completely since it's unpredictable what
# patterns may have referenced the group
# FIXME: is this still required?
self._inventory.clear_pattern_cache()
def _add_group(self, host, group_name):
'''
Helper function to add a group (if it does not exist), and to assign the
specified host to that group.
'''
new_group = self._inventory.get_group(group_name)
if not new_group:
# create the new group and add it to inventory
new_group = Group(group_name)
self._inventory.add_group(new_group)
# and add the group to the proper hierarchy
allgroup = self._inventory.get_group('all')
allgroup.add_child_group(new_group)
# the host here is from the executor side, which means it was a
# serialized/cloned copy and we'll need to look up the proper
# host object from the master inventory
actual_host = self._inventory.get_host(host.name)
# and add the host to the group
new_group.add_host(actual_host)
def _load_included_file(self, included_file):
'''
Loads an included YAML file of tasks, applying the optional set of variables.
'''
data = self._loader.load_from_file(included_file._filename)
if not isinstance(data, list):
raise AnsibleParserError("included task files must contain a list of tasks", obj=included_file._task._ds)
is_handler = isinstance(included_file._task, Handler)
block_list = load_list_of_blocks(
data,
play=included_file._task._block._play,
parent_block=included_file._task._block,
task_include=included_file._task,
role=included_file._task._role,
use_handlers=is_handler,
loader=self._loader
)
# set the vars for this task from those specified as params to the include
for b in block_list:
b._vars = included_file._args.copy()
return block_list
def cleanup(self, iterator, connection_info):
'''
Iterates through failed hosts and runs any outstanding rescue/always blocks
and handlers which may still need to be run after a failure.
'''
debug("in cleanup")
result = True
debug("getting failed hosts")
failed_hosts = self.get_failed_hosts(iterator._play)
if len(failed_hosts) == 0:
debug("there are no failed hosts")
return result
debug("marking hosts failed in the iterator")
# mark the host as failed in the iterator so it will take
# any required rescue paths which may be outstanding
for host in failed_hosts:
iterator.mark_host_failed(host)
debug("clearing the failed hosts list")
# clear the failed hosts dictionary now, so any new failures which occur
# while the rescue/always blocks run can be detected by the loop below
for entry in list(self._tqm._failed_hosts.keys()):
del self._tqm._failed_hosts[entry]
work_to_do = True
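# keep cycling as long as any failed host is still blocked or has a
# rescue/always task left to run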
while work_to_do:
work_to_do = False
for host in failed_hosts:
host_name = host.name
if host_name in self._tqm._failed_hosts:
iterator.mark_host_failed(host)
del self._tqm._failed_hosts[host_name]
if host_name in self._blocked_hosts:
work_to_do = True
continue
elif iterator.get_next_task_for_host(host, peek=True) and host_name not in self._tqm._unreachable_hosts:
work_to_do = True
# pop the task, mark the host blocked, and queue it
self._blocked_hosts[host_name] = True
task = iterator.get_next_task_for_host(host)
task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task)
self._tqm.send_callback('v2_playbook_on_cleanup_task_start', task)
self._queue_task(host, task, task_vars, connection_info)
self._process_pending_results(iterator)
time.sleep(0.01)
# no more work, wait until the queue is drained
self._wait_on_pending_results(iterator)
return result
def run_handlers(self, iterator, connection_info):
'''
Runs handlers on those hosts which have been notified.
'''
result = True
# FIXME: getting the handlers from the iterators play should be
# a method on the iterator, which may also filter the list
# of handlers based on the notified list
for handler_block in iterator._play.handlers:
# FIXME: handlers need to support the rescue/always portions of blocks too,
# but this may take some work in the iterator and gets tricky when
# we consider the ability of meta tasks to flush handlers
for handler in handler_block.block:
handler_name = handler.get_name()
if handler_name in self._notified_handlers and len(self._notified_handlers[handler_name]):
if not len(self.get_hosts_remaining(iterator._play)):
self._tqm.send_callback('v2_playbook_on_no_hosts_remaining')
result = False
break
self._tqm.send_callback('v2_playbook_on_handler_task_start', handler)
for host in self._notified_handlers[handler_name]:
if not handler.has_triggered(host):
task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=handler)
self._queue_task(host, handler, task_vars, connection_info)
handler.flag_for_host(host)
self._process_pending_results(iterator)
self._wait_on_pending_results(iterator)
# wipe the notification list
self._notified_handlers[handler_name] = []
debug("done running handlers, result is: %s" % result)
return result


@@ -0,0 +1,151 @@
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import time
from ansible.plugins.strategies import StrategyBase
from ansible.utils.debug import debug
class StrategyModule(StrategyBase):
def run(self, iterator, connection_info):
'''
The "free" strategy is a bit more complex, in that it allows tasks to
be sent to hosts as quickly as they can be processed. This means that
some hosts may finish very quickly if run tasks result in little or no
work being done versus other systems.
The algorithm used here also tries to be more "fair" when iterating
through hosts by remembering the last host in the list to be given a task
and starting the search from there as opposed to the top of the hosts
list again, which would end up favoring hosts near the beginning of the
list.
'''
# the last host to be given a task
last_host = 0
result = True
work_to_do = True
while work_to_do and not self._tqm._terminated:
hosts_left = self.get_hosts_remaining(iterator._play)
if len(hosts_left) == 0:
self._tqm.send_callback('v2_playbook_on_no_hosts_remaining')
result = False
break
work_to_do = False # assume we have no more work to do
starting_host = last_host # save current position so we know when we've
# looped back around and need to break
# try and find an unblocked host with a task to run
host_results = []
while True:
host = hosts_left[last_host]
debug("next free host: %s" % host)
host_name = host.get_name()
# peek at the next task for the host, to see if there's
# anything to do for this host
(state, task) = iterator.get_next_task_for_host(host, peek=True)
debug("free host state: %s" % state)
debug("free host task: %s" % task)
if host_name not in self._tqm._failed_hosts and host_name not in self._tqm._unreachable_hosts and task:
# set the flag so the outer loop knows we've still found
# some work which needs to be done
work_to_do = True
debug("this host has work to do")
# check to see if this host is blocked (still executing a previous task)
if host_name not in self._blocked_hosts:
# pop the task, mark the host blocked, and queue it
self._blocked_hosts[host_name] = True
(state, task) = iterator.get_next_task_for_host(host)
debug("getting variables")
task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task)
debug("done getting variables")
# check to see if this task should be skipped, due to it being a member of a
# role which has already run (and whether that role allows duplicate execution)
if task._role and task._role.has_run():
# If there is no metadata, the default behavior is to not allow duplicates,
# if there is metadata, check to see if the allow_duplicates flag was set to true
if task._role._metadata is None or (task._role._metadata and not task._role._metadata.allow_duplicates):
debug("'%s' skipped because role has already run" % task)
continue
if not task.evaluate_tags(connection_info.only_tags, connection_info.skip_tags, task_vars) and task.action != 'setup':
debug("'%s' failed tag evaluation" % task)
continue
if task.action == 'meta':
# meta tasks store their args in the _raw_params field of args,
# since they do not use k=v pairs, so get that
meta_action = task.args.get('_raw_params')
if meta_action == 'noop':
# FIXME: issue a callback for the noop here?
continue
elif meta_action == 'flush_handlers':
# FIXME: in the 'free' mode, flushing handlers should result in
# only those handlers notified for the host doing the flush
self.run_handlers(iterator, connection_info)
else:
raise AnsibleError("invalid meta action requested: %s" % meta_action, obj=task._ds)
self._blocked_hosts[host_name] = False
else:
self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False)
self._queue_task(host, task, task_vars, connection_info)
# move on to the next host and make sure we
# haven't gone past the end of our hosts list
last_host += 1
if last_host > len(hosts_left) - 1:
last_host = 0
# if we've looped around back to the start, break out
if last_host == starting_host:
break
results = self._process_pending_results(iterator)
host_results.extend(results)
# pause briefly so we don't spin lock
time.sleep(0.05)
try:
results = self._wait_on_pending_results(iterator)
host_results.extend(results)
except Exception as e:
# FIXME: ctrl+c can cause some failures here, so catch them
# with the appropriate error type
print("wtf: %s" % e)
pass
# run the base class run() method, which executes the cleanup function
# and runs any outstanding handlers which have been triggered
return super(StrategyModule, self).run(iterator, connection_info, result)


@@ -0,0 +1,307 @@
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible.errors import AnsibleError
from ansible.executor.play_iterator import PlayIterator
from ansible.playbook.block import Block
from ansible.playbook.task import Task
from ansible.plugins import action_loader
from ansible.plugins.strategies import StrategyBase
from ansible.utils.debug import debug
class StrategyModule(StrategyBase):
def _get_next_task_lockstep(self, hosts, iterator):
'''
Returns a list of (host, task) tuples, where the task may
be a noop task to keep the iterator in lock step across
all hosts.
'''
noop_task = Task()
noop_task.action = 'meta'
noop_task.args['_raw_params'] = 'noop'
noop_task.set_loader(iterator._play._loader)
host_tasks = {}
for host in hosts:
host_tasks[host.name] = iterator.get_next_task_for_host(host, peek=True)
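# tally how many hosts are in each run state, and find the lowest
# block index which is still in progress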
num_setups = 0
num_tasks = 0
num_rescue = 0
num_always = 0
lowest_cur_block = len(iterator._blocks)
for (k, v) in host_tasks.items():
if v is None:
continue
(s, t) = v
if s.cur_block < lowest_cur_block and s.run_state != PlayIterator.ITERATING_COMPLETE:
lowest_cur_block = s.cur_block
if s.run_state == PlayIterator.ITERATING_SETUP:
num_setups += 1
elif s.run_state == PlayIterator.ITERATING_TASKS:
num_tasks += 1
elif s.run_state == PlayIterator.ITERATING_RESCUE:
num_rescue += 1
elif s.run_state == PlayIterator.ITERATING_ALWAYS:
num_always += 1
def _advance_selected_hosts(hosts, cur_block, cur_state):
'''
This helper returns the next task for all hosts in the requested
state; all other hosts get a noop dummy task. It also advances
the state of the selected hosts, since the given states were
determined while using peek=True.
'''
# we return the values in the order they were originally
# specified in the given hosts array
rvals = []
for host in hosts:
(s, t) = host_tasks[host.name]
if s.run_state == cur_state and s.cur_block == cur_block:
new_t = iterator.get_next_task_for_host(host)
#if new_t != t:
# raise AnsibleError("iterator error, wtf?")
rvals.append((host, t))
else:
rvals.append((host, noop_task))
return rvals
# if any hosts are in ITERATING_SETUP, return the setup task
# while all other hosts get a noop
if num_setups:
return _advance_selected_hosts(hosts, lowest_cur_block, PlayIterator.ITERATING_SETUP)
# if any hosts are in ITERATING_TASKS, return the next normal
# task for these hosts, while all other hosts get a noop
if num_tasks:
return _advance_selected_hosts(hosts, lowest_cur_block, PlayIterator.ITERATING_TASKS)
# if any hosts are in ITERATING_RESCUE, return the next rescue
# task for these hosts, while all other hosts get a noop
if num_rescue:
return _advance_selected_hosts(hosts, lowest_cur_block, PlayIterator.ITERATING_RESCUE)
# if any hosts are in ITERATING_ALWAYS, return the next always
# task for these hosts, while all other hosts get a noop
if num_always:
return _advance_selected_hosts(hosts, lowest_cur_block, PlayIterator.ITERATING_ALWAYS)
# at this point, everything must be ITERATING_COMPLETE, so we
# return None for all hosts in the list
return [(host, None) for host in hosts]
def run(self, iterator, connection_info):
'''
The linear strategy is simple - get the next task and queue
it for all hosts, then wait for the queue to drain before
moving on to the next task
'''
result = True
# iterate over each task, while there is one left to run
work_to_do = True
while work_to_do and not self._tqm._terminated:
try:
debug("getting the remaining hosts for this loop")
self._tqm._failed_hosts = iterator.get_failed_hosts()
hosts_left = self.get_hosts_remaining(iterator._play)
debug("done getting the remaining hosts for this loop")
if len(hosts_left) == 0:
debug("out of hosts to run on")
self._tqm.send_callback('v2_playbook_on_no_hosts_remaining')
result = False
break
# queue up this task for each host in the inventory
callback_sent = False
work_to_do = False
host_results = []
host_tasks = self._get_next_task_lockstep(hosts_left, iterator)
for (host, task) in host_tasks:
if not task:
continue
run_once = False
work_to_do = True
# test to see if the task across all hosts points to an action plugin which
# sets BYPASS_HOST_LOOP to true, or if it has run_once enabled. If so, we
# will only send this task to the first host in the list.
try:
action = action_loader.get(task.action, class_only=True)
if task.run_once or getattr(action, 'BYPASS_HOST_LOOP', False):
run_once = True
except KeyError:
# we don't care here, because the action may simply not have a
# corresponding action plugin
pass
debug("getting variables")
task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task)
debug("done getting variables")
# check to see if this task should be skipped, due to it being a member of a
# role which has already run (and whether that role allows duplicate execution)
if task._role and task._role.has_run():
# If there is no metadata, the default behavior is to not allow duplicates,
# if there is metadata, check to see if the allow_duplicates flag was set to true
if task._role._metadata is None or (task._role._metadata and not task._role._metadata.allow_duplicates):
debug("'%s' skipped because role has already run" % task)
continue
if task.action == 'meta':
# meta tasks store their args in the _raw_params field of args,
# since they do not use k=v pairs, so get that
meta_action = task.args.get('_raw_params')
if meta_action == 'noop':
# FIXME: issue a callback for the noop here?
continue
elif meta_action == 'flush_handlers':
self.run_handlers(iterator, connection_info)
else:
raise AnsibleError("invalid meta action requested: %s" % meta_action, obj=task._ds)
else:
if not callback_sent:
self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False)
callback_sent = True
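# mark the host as blocked until its result comes back, then hand
# the task off to a worker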
self._blocked_hosts[host.get_name()] = True
self._queue_task(host, task, task_vars, connection_info)
results = self._process_pending_results(iterator)
host_results.extend(results)
# if we're bypassing the host loop, break out now
if run_once:
break
debug("done queuing things up, now waiting for results queue to drain")
results = self._wait_on_pending_results(iterator)
host_results.extend(results)
# FIXME: this needs to be somewhere else
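# Tracks a file pulled in via an 'include' task, the args it was included
# with, and the hosts whose results triggered the include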
class IncludedFile:
def __init__(self, filename, args, task):
self._filename = filename
self._args = args
self._task = task
self._hosts = []
def add_host(self, host):
if host not in self._hosts:
self._hosts.append(host)
def __eq__(self, other):
return other._filename == self._filename and other._args == self._args
def __repr__(self):
return "%s (%s): %s" % (self._filename, self._args, self._hosts)
# FIXME: this should also be moved to the base class in a method
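# collate the include results: group hosts by the unique (file, args)
# combination they included, so each file is only loaded once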
included_files = []
for res in host_results:
if res._task.action == 'include':
if res._task.loop:
include_results = res._result['results']
else:
include_results = [ res._result ]
for include_result in include_results:
# if the task result was skipped or failed, continue
if ('skipped' in include_result and include_result['skipped']) or 'failed' in include_result:
continue
original_task = iterator.get_original_task(res._host, res._task)
if original_task and original_task._role:
include_file = self._loader.path_dwim_relative(original_task._role._role_path, 'tasks', include_result['include'])
else:
include_file = self._loader.path_dwim(res._task.args.get('_raw_params'))
include_variables = include_result.get('include_variables', dict())
if 'item' in include_result:
include_variables['item'] = include_result['item']
inc_file = IncludedFile(include_file, include_variables, original_task)
try:
pos = included_files.index(inc_file)
inc_file = included_files[pos]
except ValueError:
included_files.append(inc_file)
inc_file.add_host(res._host)
# FIXME: should this be moved into the iterator class? Main downside would be
# that accessing the TQM's callback member would be more difficult, if
# we do want to send callbacks from here
if len(included_files) > 0:
noop_task = Task()
noop_task.action = 'meta'
noop_task.args['_raw_params'] = 'noop'
noop_task.set_loader(iterator._play._loader)
all_blocks = dict((host, []) for host in hosts_left)
for included_file in included_files:
# included hosts get the task list while those excluded get an equal-length
# list of noop tasks, to make sure that they continue running in lock-step
try:
new_blocks = self._load_included_file(included_file)
except AnsibleError as e:
for host in included_file._hosts:
iterator.mark_host_failed(host)
# FIXME: callback here?
print(e)
continue
for new_block in new_blocks:
noop_block = Block(parent_block=task._block)
noop_block.block = [noop_task for t in new_block.block]
noop_block.always = [noop_task for t in new_block.always]
noop_block.rescue = [noop_task for t in new_block.rescue]
for host in hosts_left:
if host in included_file._hosts:
task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=included_file._task)
final_block = new_block.filter_tagged_tasks(connection_info, task_vars)
all_blocks[host].append(final_block)
else:
all_blocks[host].append(noop_block)
for host in hosts_left:
iterator.add_tasks(host, all_blocks[host])
debug("results queue empty")
except (IOError, EOFError) as e:
debug("got IOError/EOFError in task loop: %s" % e)
# most likely an abort, return failed
return 1
# run the base class run() method, which executes the cleanup function
# and runs any outstanding handlers which have been triggered
return super(StrategyModule, self).run(iterator, connection_info, result)