Merge branch 'threaded_receiver' into devel

James Cammarata 2016-09-20 09:18:26 -05:00
commit 1b54d3b6dc
7 changed files with 461 additions and 389 deletions

lib/ansible/plugins/strategy/__init__.py

@@ -19,19 +19,28 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import threading
import time
from collections import deque
from multiprocessing import Lock
from jinja2.exceptions import UndefinedError
from ansible.compat.six.moves import queue as Queue
from ansible.compat.six import iteritems, string_types
from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable
from ansible.executor import action_write_locks
from ansible.executor.process.worker import WorkerProcess
from ansible.executor.task_result import TaskResult
from ansible.inventory.host import Host
from ansible.inventory.group import Group
from ansible.module_utils.facts import Facts
from ansible.playbook.helpers import load_list_of_blocks
from ansible.playbook.included_file import IncludedFile
from ansible.playbook.task_include import TaskInclude
from ansible.playbook.role_include import IncludeRole
from ansible.plugins import action_loader
from ansible.plugins import action_loader, connection_loader, filter_loader, lookup_loader, module_loader, test_loader
from ansible.template import Templar
from ansible.vars import combine_vars, strip_internal_keys
from ansible.module_utils._text import to_text
@@ -45,6 +54,41 @@ except ImportError:
__all__ = ['StrategyBase']
# TODO: this should probably be in the plugins/__init__.py, with
# a smarter mechanism to set all of the attributes based on
# the loaders created there
class SharedPluginLoaderObj:
'''
A simple object to make passing the various plugin loaders to
the forked processes over the queue easier
'''
def __init__(self):
self.action_loader = action_loader
self.connection_loader = connection_loader
self.filter_loader = filter_loader
self.test_loader = test_loader
self.lookup_loader = lookup_loader
self.module_loader = module_loader
_sentinel = object()
def results_thread_main(strategy):
#print("RESULT THREAD STARTING: %s" % threading.current_thread())
while True:
try:
result = strategy._final_q.get()
if result is _sentinel:
break
else:
#print("result in thread is: %s" % result._result)
strategy._results_lock.acquire()
strategy._results.append(result)
strategy._results_lock.release()
except (IOError, EOFError):
break
except Queue.Empty:
pass
#print("RESULT THREAD EXITED: %s" % threading.current_thread())
class StrategyBase:
@@ -56,6 +100,7 @@ class StrategyBase:
def __init__(self, tqm):
self._tqm = tqm
self._inventory = tqm.get_inventory()
self._workers = tqm.get_workers()
self._notified_handlers = tqm._notified_handlers
self._listening_handlers = tqm._listening_handlers
self._variable_manager = tqm.get_variable_manager()
@@ -63,16 +108,30 @@
self._final_q = tqm._final_q
self._step = getattr(tqm._options, 'step', False)
self._diff = getattr(tqm._options, 'diff', False)
# Backwards compat: self._display isn't really needed, just import the global display and use that.
self._display = display
# internal counters
self._pending_results = 0
self._cur_worker = 0
# this dictionary is used to keep track of hosts that have
# outstanding tasks still in queue
self._blocked_hosts = dict()
self._results = deque()
self._results_lock = threading.Condition(threading.Lock())
#print("creating thread for strategy %s" % id(self))
self._results_thread = threading.Thread(target=results_thread_main, args=(self,))
self._results_thread.daemon = True
self._results_thread.start()
def cleanup(self):
self._final_q.put(_sentinel)
self._results_thread.join()
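
The core of this change is visible in the two pieces above: a daemon receiver thread drains the workers' result queue into a local deque, and cleanup() stops it by posting a sentinel and joining. A minimal runnable sketch of that pattern, using queue.Queue in place of Ansible's multiprocessing-backed _final_q and hypothetical names throughout:

import threading
from collections import deque
from queue import Queue  # stand-in for the real multiprocessing queue

_SENTINEL = object()

def results_receiver(final_q, results, results_lock):
    # Daemon thread: move worker results into a thread-local deque
    # until the shutdown sentinel arrives.
    while True:
        item = final_q.get()
        if item is _SENTINEL:
            break
        with results_lock:
            results.append(item)

final_q = Queue()
results = deque()
results_lock = threading.Lock()
receiver = threading.Thread(target=results_receiver, args=(final_q, results, results_lock))
receiver.daemon = True
receiver.start()

for i in range(3):
    final_q.put({'task': i, 'rc': 0})  # pretend these came from workers

final_q.put(_SENTINEL)  # what cleanup() does
receiver.join()
with results_lock:
    print(list(results))  # three result dicts, in arrival order
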
def run(self, iterator, play_context, result=0):
# save the failed/unreachable hosts, as the run_handlers()
# method will clear that information during its execution
@@ -118,10 +177,57 @@
def _queue_task(self, host, task, task_vars, play_context):
''' handles queueing the task up to be sent to a worker '''
self._tqm.queue_task(host, task, task_vars, play_context)
self._pending_results += 1
def _process_pending_results(self, iterator, one_pass=False, timeout=0.001):
display.debug("entering _queue_task() for %s/%s" % (host.name, task.action))
# Add a write lock for tasks.
# Maybe this should be added somewhere further up the call stack but
# this is the earliest in the code where we have task (1) extracted
# into its own variable and (2) there's only a single code path
# leading to the module being run. This is called by three
# functions: __init__.py::_do_handler_run(), linear.py::run(), and
# free.py::run() so we'd have to add to all three to do it there.
# The next common higher level is __init__.py::run() and that has
# tasks inside of play_iterator so we'd have to extract them to do it
# there.
if task.action not in action_write_locks.action_write_locks:
display.debug('Creating lock for %s' % task.action)
action_write_locks.action_write_locks[task.action] = Lock()
# and then queue the new task
try:
# create a dummy object with plugin loaders set as an easier
# way to share them with the forked processes
shared_loader_obj = SharedPluginLoaderObj()
queued = False
starting_worker = self._cur_worker
while True:
(worker_prc, rslt_q) = self._workers[self._cur_worker]
if worker_prc is None or not worker_prc.is_alive():
worker_prc = WorkerProcess(self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj)
self._workers[self._cur_worker][0] = worker_prc
worker_prc.start()
display.debug("worker is %d (out of %d available)" % (self._cur_worker+1, len(self._workers)))
queued = True
self._cur_worker += 1
if self._cur_worker >= len(self._workers):
self._cur_worker = 0
if queued:
break
elif self._cur_worker == starting_worker:
time.sleep(0.0001)
self._pending_results += 1
except (EOFError, IOError, AssertionError) as e:
# most likely an abort
display.debug("got an error while queuing: %s" % e)
return
display.debug("exiting _queue_task() for %s/%s" % (host.name, task.action))
def _process_pending_results(self, iterator, one_pass=False, max_passes=None):
'''
Reads results off the final queue and takes appropriate action
based on the result (executing callbacks, updating state, etc.).
@@ -180,226 +286,232 @@
else:
return False
passes = 1
while not self._tqm._terminated and passes < 3:
cur_pass = 0
while True:
try:
task_result = self._final_q.get(timeout=timeout)
original_host = get_original_host(task_result._host)
original_task = iterator.get_original_task(original_host, task_result._task)
task_result._host = original_host
task_result._task = original_task
self._results_lock.acquire()
task_result = self._results.pop()
except IndexError:
break
finally:
self._results_lock.release()
# send callbacks for 'non final' results
if '_ansible_retry' in task_result._result:
self._tqm.send_callback('v2_runner_retry', task_result)
continue
elif '_ansible_item_result' in task_result._result:
if task_result.is_failed() or task_result.is_unreachable():
self._tqm.send_callback('v2_runner_item_on_failed', task_result)
elif task_result.is_skipped():
self._tqm.send_callback('v2_runner_item_on_skipped', task_result)
else:
if 'diff' in task_result._result:
if self._diff:
self._tqm.send_callback('v2_on_file_diff', task_result)
self._tqm.send_callback('v2_runner_item_on_ok', task_result)
continue
original_host = get_original_host(task_result._host)
original_task = iterator.get_original_task(original_host, task_result._task)
task_result._host = original_host
task_result._task = original_task
if original_task.register:
if original_task.run_once:
host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
else:
host_list = [original_host]
clean_copy = strip_internal_keys(task_result._result)
if 'invocation' in clean_copy:
del clean_copy['invocation']
for target_host in host_list:
self._variable_manager.set_nonpersistent_facts(target_host, {original_task.register: clean_copy})
# all host status messages contain 2 entries: (msg, task_result)
role_ran = False
if task_result.is_failed():
role_ran = True
if not original_task.ignore_errors:
display.debug("marking %s as failed" % original_host.name)
if original_task.run_once:
# if we're using run_once, we have to fail every host here
for h in self._inventory.get_hosts(iterator._play.hosts):
if h.name not in self._tqm._unreachable_hosts:
state, _ = iterator.get_next_task_for_host(h, peek=True)
iterator.mark_host_failed(h)
state, new_task = iterator.get_next_task_for_host(h, peek=True)
else:
iterator.mark_host_failed(original_host)
# only add the host to the failed list officially if it has
# been failed by the iterator
if iterator.is_failed(original_host):
self._tqm._failed_hosts[original_host.name] = True
self._tqm._stats.increment('failures', original_host.name)
else:
# otherwise, we grab the current state and if we're iterating on
# the rescue portion of a block then we save the failed task in a
# special var for use within the rescue/always
state, _ = iterator.get_next_task_for_host(original_host, peek=True)
if state.run_state == iterator.ITERATING_RESCUE:
self._variable_manager.set_nonpersistent_facts(
original_host,
dict(
ansible_failed_task=original_task.serialize(),
ansible_failed_result=task_result._result,
),
)
else:
self._tqm._stats.increment('ok', original_host.name)
self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=original_task.ignore_errors)
elif task_result.is_unreachable():
self._tqm._unreachable_hosts[original_host.name] = True
self._tqm._stats.increment('dark', original_host.name)
self._tqm.send_callback('v2_runner_on_unreachable', task_result)
# send callbacks for 'non final' results
if '_ansible_retry' in task_result._result:
self._tqm.send_callback('v2_runner_retry', task_result)
continue
elif '_ansible_item_result' in task_result._result:
if task_result.is_failed() or task_result.is_unreachable():
self._tqm.send_callback('v2_runner_item_on_failed', task_result)
elif task_result.is_skipped():
self._tqm._stats.increment('skipped', original_host.name)
self._tqm.send_callback('v2_runner_on_skipped', task_result)
self._tqm.send_callback('v2_runner_item_on_skipped', task_result)
else:
role_ran = True
if original_task.loop:
# this task had a loop, and has more than one result, so
# loop over all of them instead of a single result
result_items = task_result._result.get('results', [])
else:
result_items = [ task_result._result ]
for result_item in result_items:
if '_ansible_notify' in result_item:
if task_result.is_changed():
# The shared dictionary for notified handlers is a proxy, which
# does not detect when sub-objects within the proxy are modified.
# So, per the docs, we reassign the list so the proxy picks up and
# notifies all other threads
for handler_name in result_item['_ansible_notify']:
# Find the handler using the above helper. First we look up the
# dependency chain of the current task (if it's from a role), otherwise
# we just look through the list of handlers in the current play/all
# roles and use the first one that matches the notify name
if handler_name in self._listening_handlers:
for listening_handler_name in self._listening_handlers[handler_name]:
listening_handler = search_handler_blocks(listening_handler_name, iterator._play.handlers)
if listening_handler is None:
raise AnsibleError("The requested handler listener '%s' was not found in any of the known handlers" % listening_handler_name)
if original_host not in self._notified_handlers[listening_handler]:
self._notified_handlers[listening_handler].append(original_host)
display.vv("NOTIFIED HANDLER %s" % (listening_handler_name,))
else:
target_handler = search_handler_blocks(handler_name, iterator._play.handlers)
if target_handler is not None:
if original_host not in self._notified_handlers[target_handler]:
self._notified_handlers[target_handler].append(original_host)
# FIXME: should this be a callback?
display.vv("NOTIFIED HANDLER %s" % (handler_name,))
else:
# There may be more than one handler with the notified name as the
# parent, so we just keep track of whether or not we found one at all
found = False
for target_handler in self._notified_handlers:
if parent_handler_match(target_handler, handler_name):
self._notified_handlers[target_handler].append(original_host)
display.vv("NOTIFIED HANDLER %s" % (target_handler.get_name(),))
found = True
# and if none were found, then we raise an error
if not found:
raise AnsibleError("The requested handler '%s' was found in neither the main handlers list nor the listening handlers list" % handler_name)
if 'add_host' in result_item:
# this task added a new host (add_host module)
new_host_info = result_item.get('add_host', dict())
self._add_host(new_host_info, iterator)
elif 'add_group' in result_item:
# this task added a new group (group_by module)
self._add_group(original_host, result_item)
elif 'ansible_facts' in result_item:
loop_var = 'item'
if original_task.loop_control:
loop_var = original_task.loop_control.loop_var or 'item'
item = result_item.get(loop_var, None)
if original_task.action == 'include_vars':
for (var_name, var_value) in iteritems(result_item['ansible_facts']):
# find the host we're actually referring to here, which may
# be a host that is not really in inventory at all
if original_task.delegate_to is not None and original_task.delegate_facts:
task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=original_task)
self.add_tqm_variables(task_vars, play=iterator._play)
if item is not None:
task_vars[loop_var] = item
templar = Templar(loader=self._loader, variables=task_vars)
host_name = templar.template(original_task.delegate_to)
actual_host = self._inventory.get_host(host_name)
if actual_host is None:
actual_host = Host(name=host_name)
else:
actual_host = original_host
if original_task.run_once:
host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
else:
host_list = [actual_host]
for target_host in host_list:
self._variable_manager.set_host_variable(target_host, var_name, var_value)
else:
if original_task.run_once:
host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
else:
host_list = [original_host]
for target_host in host_list:
if original_task.action == 'set_fact':
self._variable_manager.set_nonpersistent_facts(target_host, result_item['ansible_facts'].copy())
else:
self._variable_manager.set_host_facts(target_host, result_item['ansible_facts'].copy())
if 'diff' in task_result._result:
if self._diff:
self._tqm.send_callback('v2_on_file_diff', task_result)
self._tqm.send_callback('v2_runner_item_on_ok', task_result)
continue
if original_task.action not in ['include', 'include_role']:
self._tqm._stats.increment('ok', original_host.name)
if 'changed' in task_result._result and task_result._result['changed']:
self._tqm._stats.increment('changed', original_host.name)
if original_task.register:
#print("^ REGISTERING RESULT %s" % original_task.register)
if original_task.run_once:
host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
else:
host_list = [original_host]
# finally, send the ok for this task
self._tqm.send_callback('v2_runner_on_ok', task_result)
clean_copy = strip_internal_keys(task_result._result)
if 'invocation' in clean_copy:
del clean_copy['invocation']
self._pending_results -= 1
if original_host.name in self._blocked_hosts:
del self._blocked_hosts[original_host.name]
for target_host in host_list:
self._variable_manager.set_nonpersistent_facts(target_host, {original_task.register: clean_copy})
# If this is a role task, mark the parent role as being run (if
# the task was ok or failed, but not skipped or unreachable)
if original_task._role is not None and role_ran: #TODO: and original_task.action != 'include_role':?
# lookup the role in the ROLE_CACHE to make sure we're dealing
# with the correct object and mark it as executed
for (entry, role_obj) in iteritems(iterator._play.ROLE_CACHE[original_task._role._role_name]):
if role_obj._uuid == original_task._role._uuid:
role_obj._had_task_run[original_host.name] = True
# all host status messages contain 2 entries: (msg, task_result)
role_ran = False
if task_result.is_failed():
role_ran = True
if not original_task.ignore_errors:
display.debug("marking %s as failed" % original_host.name)
if original_task.run_once:
# if we're using run_once, we have to fail every host here
for h in self._inventory.get_hosts(iterator._play.hosts):
if h.name not in self._tqm._unreachable_hosts:
state, _ = iterator.get_next_task_for_host(h, peek=True)
iterator.mark_host_failed(h)
state, new_task = iterator.get_next_task_for_host(h, peek=True)
else:
iterator.mark_host_failed(original_host)
ret_results.append(task_result)
# only add the host to the failed list officially if it has
# been failed by the iterator
if iterator.is_failed(original_host):
self._tqm._failed_hosts[original_host.name] = True
self._tqm._stats.increment('failures', original_host.name)
else:
# otherwise, we grab the current state and if we're iterating on
# the rescue portion of a block then we save the failed task in a
# special var for use within the rescue/always
state, _ = iterator.get_next_task_for_host(original_host, peek=True)
if state.run_state == iterator.ITERATING_RESCUE:
self._variable_manager.set_nonpersistent_facts(
original_host,
dict(
ansible_failed_task=original_task.serialize(),
ansible_failed_result=task_result._result,
),
)
else:
self._tqm._stats.increment('ok', original_host.name)
self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=original_task.ignore_errors)
elif task_result.is_unreachable():
self._tqm._unreachable_hosts[original_host.name] = True
self._tqm._stats.increment('dark', original_host.name)
self._tqm.send_callback('v2_runner_on_unreachable', task_result)
elif task_result.is_skipped():
self._tqm._stats.increment('skipped', original_host.name)
self._tqm.send_callback('v2_runner_on_skipped', task_result)
else:
role_ran = True
except Queue.Empty:
passes += 1
if original_task.loop:
# this task had a loop, and has more than one result, so
# loop over all of them instead of a single result
result_items = task_result._result.get('results', [])
else:
result_items = [ task_result._result ]
if one_pass:
for result_item in result_items:
if '_ansible_notify' in result_item:
if task_result.is_changed():
# The shared dictionary for notified handlers is a proxy, which
# does not detect when sub-objects within the proxy are modified.
# So, per the docs, we reassign the list so the proxy picks up and
# notifies all other threads
for handler_name in result_item['_ansible_notify']:
# Find the handler using the above helper. First we look up the
# dependency chain of the current task (if it's from a role), otherwise
# we just look through the list of handlers in the current play/all
# roles and use the first one that matches the notify name
if handler_name in self._listening_handlers:
for listening_handler_name in self._listening_handlers[handler_name]:
listening_handler = search_handler_blocks(listening_handler_name, iterator._play.handlers)
if listening_handler is None:
raise AnsibleError("The requested handler listener '%s' was not found in any of the known handlers" % listening_handler_name)
if original_host not in self._notified_handlers[listening_handler]:
self._notified_handlers[listening_handler].append(original_host)
display.vv("NOTIFIED HANDLER %s" % (listening_handler_name,))
else:
target_handler = search_handler_blocks(handler_name, iterator._play.handlers)
if target_handler is not None:
if original_host not in self._notified_handlers[target_handler]:
self._notified_handlers[target_handler].append(original_host)
# FIXME: should this be a callback?
display.vv("NOTIFIED HANDLER %s" % (handler_name,))
else:
# There may be more than one handler with the notified name as the
# parent, so we just keep track of whether or not we found one at all
found = False
for target_handler in self._notified_handlers:
if parent_handler_match(target_handler, handler_name):
self._notified_handlers[target_handler].append(original_host)
display.vv("NOTIFIED HANDLER %s" % (target_handler.get_name(),))
found = True
# and if none were found, then we raise an error
if not found:
raise AnsibleError("The requested handler '%s' was found in neither the main handlers list nor the listening handlers list" % handler_name)
if 'add_host' in result_item:
# this task added a new host (add_host module)
new_host_info = result_item.get('add_host', dict())
self._add_host(new_host_info, iterator)
elif 'add_group' in result_item:
# this task added a new group (group_by module)
self._add_group(original_host, result_item)
elif 'ansible_facts' in result_item:
loop_var = 'item'
if original_task.loop_control:
loop_var = original_task.loop_control.loop_var or 'item'
item = result_item.get(loop_var, None)
if original_task.action == 'include_vars':
for (var_name, var_value) in iteritems(result_item['ansible_facts']):
# find the host we're actually referring to here, which may
# be a host that is not really in inventory at all
if original_task.delegate_to is not None and original_task.delegate_facts:
task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=original_task)
self.add_tqm_variables(task_vars, play=iterator._play)
if item is not None:
task_vars[loop_var] = item
templar = Templar(loader=self._loader, variables=task_vars)
host_name = templar.template(original_task.delegate_to)
actual_host = self._inventory.get_host(host_name)
if actual_host is None:
actual_host = Host(name=host_name)
else:
actual_host = original_host
if original_task.run_once:
host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
else:
host_list = [actual_host]
for target_host in host_list:
self._variable_manager.set_host_variable(target_host, var_name, var_value)
else:
if original_task.run_once:
host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
else:
host_list = [original_host]
for target_host in host_list:
if original_task.action == 'set_fact':
self._variable_manager.set_nonpersistent_facts(target_host, result_item['ansible_facts'].copy())
else:
self._variable_manager.set_host_facts(target_host, result_item['ansible_facts'].copy())
if 'diff' in task_result._result:
if self._diff:
self._tqm.send_callback('v2_on_file_diff', task_result)
if original_task.action not in ['include', 'include_role']:
self._tqm._stats.increment('ok', original_host.name)
if 'changed' in task_result._result and task_result._result['changed']:
self._tqm._stats.increment('changed', original_host.name)
# finally, send the ok for this task
self._tqm.send_callback('v2_runner_on_ok', task_result)
self._pending_results -= 1
if original_host.name in self._blocked_hosts:
del self._blocked_hosts[original_host.name]
# If this is a role task, mark the parent role as being run (if
# the task was ok or failed, but not skipped or unreachable)
if original_task._role is not None and role_ran: #TODO: and original_task.action != 'include_role':?
# lookup the role in the ROLE_CACHE to make sure we're dealing
# with the correct object and mark it as executed
for (entry, role_obj) in iteritems(iterator._play.ROLE_CACHE[original_task._role._role_name]):
if role_obj._uuid == original_task._role._uuid:
role_obj._had_task_run[original_host.name] = True
ret_results.append(task_result)
if one_pass or (max_passes is not None and (cur_pass+1) >= max_passes):
break
cur_pass += 1
return ret_results
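
Stripped of the callback and handler bookkeeping, the rewritten loop above just pops whatever the receiver thread has buffered and stops on an empty deque, after one pass, or after max_passes. A compact sketch of that drain under the same lock discipline (process_one is a hypothetical stand-in for the per-result handling):

import threading
from collections import deque

def drain_buffered_results(results, lock, process_one, one_pass=False, max_passes=None):
    handled = []
    cur_pass = 0
    while True:
        try:
            lock.acquire()
            task_result = results.pop()
        except IndexError:
            break  # receiver thread has nothing buffered right now
        finally:
            lock.release()
        handled.append(process_one(task_result))
        if one_pass or (max_passes is not None and (cur_pass + 1) >= max_passes):
            break
        cur_pass += 1
    return handled

results = deque([1, 2, 3])
print(drain_buffered_results(results, threading.Lock(), lambda r: r * 10))
# [30, 20, 10]  (LIFO order, matching the deque.pop() above)
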
def _wait_on_pending_results(self, iterator):
@@ -411,18 +523,14 @@ class StrategyBase:
ret_results = []
display.debug("waiting for pending results...")
dead_check = 10
while self._pending_results > 0 and not self._tqm._terminated:
if self._tqm.has_dead_workers():
raise AnsibleError("A worker was found in a dead state")
results = self._process_pending_results(iterator)
ret_results.extend(results)
dead_check -= 1
if dead_check == 0:
if self._pending_results > 0 and self._tqm.has_dead_workers():
raise AnsibleError("A worker was found in a dead state")
dead_check = 10
display.debug("no more pending results, returning what we have")
return ret_results
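
_wait_on_pending_results now amortizes the liveness probe: has_dead_workers() is comparatively expensive, so it runs every tenth pass instead of on each iteration. The counter pattern in isolation (pending, has_dead_workers and process_batch are stand-ins for the TQM calls):

def wait_on_pending(pending, has_dead_workers, process_batch, interval=10):
    results = []
    dead_check = interval
    while pending() > 0:
        results.extend(process_batch())
        dead_check -= 1
        if dead_check == 0:
            # Poll for dead workers only every `interval` passes.
            if pending() > 0 and has_dead_workers():
                raise RuntimeError("A worker was found in a dead state")
            dead_check = interval
    return results

state = {'n': 3}
def fake_batch():
    state['n'] -= 1
    return ['r%d' % state['n']]
print(wait_on_pending(lambda: state['n'], lambda: False, fake_batch))
# ['r2', 'r1', 'r0']
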

lib/ansible/plugins/strategy/linear.py

@@ -182,9 +182,7 @@ class StrategyModule(StrategyBase):
any_errors_fatal = False
results = []
items_to_queue = []
for (host, task) in host_tasks:
if not task:
continue
@@ -259,27 +257,22 @@ class StrategyModule(StrategyBase):
display.debug("sending task start callback")
self._blocked_hosts[host.get_name()] = True
items_to_queue.append((host, task, task_vars))
self._pending_results += 1
self._queue_task(host, task, task_vars, play_context)
del task_vars
# if we're bypassing the host loop, break out now
if run_once:
break
# FIXME: probably not required here any more with the result proc
# having been removed, so there's now only a single result
# queue for the main thread
results += self._process_pending_results(iterator, one_pass=True)
self._tqm.queue_multiple_tasks(items_to_queue, play_context)
results += self._process_pending_results(iterator, max_passes=max(1, int(len(self._tqm._workers) * 0.1)))
# go to next host/task group
if skip_rest:
continue
display.debug("done queuing things up, now waiting for results queue to drain")
results += self._wait_on_pending_results(iterator)
if self._pending_results > 0:
results += self._wait_on_pending_results(iterator)
host_results.extend(results)
all_role_blocks = []
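
The linear strategy follows the same shape as the base class changes: gather (host, task, task_vars) triples, hand them to the TQM in one queue_multiple_tasks() batch, then drain only a bounded number of result passes (roughly 10% of the worker count) so queueing stays ahead of result processing. A rough model of that step, with everything except the 10% heuristic being illustrative names:

def queue_and_drain(host_tasks, queue_multiple_tasks, process_pending, num_workers):
    items_to_queue = []
    for host, task, task_vars in host_tasks:
        items_to_queue.append((host, task, task_vars))
    queue_multiple_tasks(items_to_queue)
    # Bounded drain: enough passes to keep the results deque small,
    # few enough that queueing is not starved by result handling.
    return process_pending(max_passes=max(1, int(num_workers * 0.1)))

print(queue_and_drain(
    [('h1', 'ping', {}), ('h2', 'ping', {})],
    lambda items: None,                      # pretend the TQM takes the batch
    lambda max_passes: ['ok'] * max_passes,  # pretend drain
    num_workers=5,
))  # ['ok']  (max(1, int(5 * 0.1)) == 1 pass)
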