Merge branch 'performance_improvements' into devel

This commit is contained in:
James Cammarata 2016-08-08 16:08:51 -05:00
commit 43febc76c9
21 changed files with 504 additions and 636 deletions

View file

@ -244,14 +244,14 @@ class PlayIterator:
if ra != rb: if ra != rb:
return True return True
else: else:
return old_s.cur_dep_chain != task._block.get_dep_chain() return old_s.cur_dep_chain != task.get_dep_chain()
if task and task._role: if task and task._role:
# if we had a current role, mark that role as completed # if we had a current role, mark that role as completed
if s.cur_role and _roles_are_different(task._role, s.cur_role) and host.name in s.cur_role._had_task_run and not peek: if s.cur_role and _roles_are_different(task._role, s.cur_role) and host.name in s.cur_role._had_task_run and not peek:
s.cur_role._completed[host.name] = True s.cur_role._completed[host.name] = True
s.cur_role = task._role s.cur_role = task._role
s.cur_dep_chain = task._block.get_dep_chain() s.cur_dep_chain = task.get_dep_chain()
if not peek: if not peek:
self._host_states[host.name] = s self._host_states[host.name] = s
@ -508,6 +508,12 @@ class PlayIterator:
the different processes, and not all data structures are preserved. This method the different processes, and not all data structures are preserved. This method
allows us to find the original task passed into the executor engine. allows us to find the original task passed into the executor engine.
''' '''
if isinstance(task, Task):
the_uuid = task._uuid
else:
the_uuid = task
def _search_block(block): def _search_block(block):
''' '''
helper method to check a block's task lists (block/rescue/always) helper method to check a block's task lists (block/rescue/always)
@ -521,7 +527,7 @@ class PlayIterator:
res = _search_block(t) res = _search_block(t)
if res: if res:
return res return res
elif t._uuid == task._uuid: elif t._uuid == the_uuid:
return t return t
return None return None

View file

@ -1,196 +0,0 @@
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible.compat.six.moves import queue
from ansible.compat.six import iteritems, text_type
from ansible.vars import strip_internal_keys
import multiprocessing
import time
import traceback
# TODO: not needed if we use the cryptography library with its default RNG
# engine
HAS_ATFORK=True
try:
from Crypto.Random import atfork
except ImportError:
HAS_ATFORK=False
try:
from __main__ import display
except ImportError:
from ansible.utils.display import Display
display = Display()
__all__ = ['ResultProcess']
class ResultProcess(multiprocessing.Process):
'''
The result worker thread, which reads results from the results
queue and fires off callbacks/etc. as necessary.
'''
def __init__(self, final_q, workers):
# takes a task queue manager as the sole param:
self._final_q = final_q
self._workers = workers
self._cur_worker = 0
self._terminated = False
super(ResultProcess, self).__init__()
def _send_result(self, result):
display.debug(u"sending result: %s" % ([text_type(x) for x in result],))
self._final_q.put(result)
display.debug("done sending result")
def _read_worker_result(self):
result = None
starting_point = self._cur_worker
while True:
(worker_prc, rslt_q) = self._workers[self._cur_worker]
self._cur_worker += 1
if self._cur_worker >= len(self._workers):
self._cur_worker = 0
try:
if not rslt_q.empty():
display.debug("worker %d has data to read" % self._cur_worker)
result = rslt_q.get()
display.debug("got a result from worker %d: %s" % (self._cur_worker, result))
break
except queue.Empty:
pass
if self._cur_worker == starting_point:
break
return result
def terminate(self):
self._terminated = True
super(ResultProcess, self).terminate()
def run(self):
'''
The main thread execution, which reads from the results queue
indefinitely and sends callbacks/etc. when results are received.
'''
if HAS_ATFORK:
atfork()
while True:
try:
result = self._read_worker_result()
if result is None:
time.sleep(0.005)
continue
# send callbacks for 'non final' results
if '_ansible_retry' in result._result:
self._send_result(('v2_runner_retry', result))
continue
elif '_ansible_item_result' in result._result:
if result.is_failed() or result.is_unreachable():
self._send_result(('v2_runner_item_on_failed', result))
elif result.is_skipped():
self._send_result(('v2_runner_item_on_skipped', result))
else:
self._send_result(('v2_runner_item_on_ok', result))
if 'diff' in result._result:
self._send_result(('v2_on_file_diff', result))
continue
clean_copy = strip_internal_keys(result._result)
if 'invocation' in clean_copy:
del clean_copy['invocation']
# if this task is registering a result, do it now
if result._task.register:
self._send_result(('register_host_var', result._host, result._task, clean_copy))
# send callbacks, execute other options based on the result status
# TODO: this should all be cleaned up and probably moved to a sub-function.
# the fact that this sometimes sends a TaskResult and other times
# sends a raw dictionary back may be confusing, but the result vs.
# results implementation for tasks with loops should be cleaned up
# better than this
if result.is_unreachable():
self._send_result(('host_unreachable', result))
elif result.is_failed():
self._send_result(('host_task_failed', result))
elif result.is_skipped():
self._send_result(('host_task_skipped', result))
else:
if result._task.loop:
# this task had a loop, and has more than one result, so
# loop over all of them instead of a single result
result_items = result._result.get('results', [])
else:
result_items = [ result._result ]
for result_item in result_items:
# if this task is notifying a handler, do it now
if '_ansible_notify' in result_item:
if result.is_changed():
# The shared dictionary for notified handlers is a proxy, which
# does not detect when sub-objects within the proxy are modified.
# So, per the docs, we reassign the list so the proxy picks up and
# notifies all other threads
for notify in result_item['_ansible_notify']:
self._send_result(('notify_handler', result, notify))
if 'add_host' in result_item:
# this task added a new host (add_host module)
self._send_result(('add_host', result_item))
elif 'add_group' in result_item:
# this task added a new group (group_by module)
self._send_result(('add_group', result._host, result_item))
elif 'ansible_facts' in result_item:
# if this task is registering facts, do that now
loop_var = 'item'
if result._task.loop_control:
loop_var = result._task.loop_control.loop_var or 'item'
item = result_item.get(loop_var, None)
if result._task.action == 'include_vars':
for (key, value) in iteritems(result_item['ansible_facts']):
self._send_result(('set_host_var', result._host, result._task, item, key, value))
else:
self._send_result(('set_host_facts', result._host, result._task, item, result_item['ansible_facts']))
# finally, send the ok for this task
self._send_result(('host_task_ok', result))
except queue.Empty:
pass
except (KeyboardInterrupt, SystemExit, IOError, EOFError):
break
except:
# TODO: we should probably send a proper callback here instead of
# simply dumping a stack trace on the screen
traceback.print_exc()
break

View file

@ -100,6 +100,10 @@ class WorkerProcess(multiprocessing.Process):
signify that they are ready for their next task. signify that they are ready for their next task.
''' '''
#import cProfile, pstats, StringIO
#pr = cProfile.Profile()
#pr.enable()
if HAS_ATFORK: if HAS_ATFORK:
atfork() atfork()
@ -120,7 +124,7 @@ class WorkerProcess(multiprocessing.Process):
display.debug("done running TaskExecutor() for %s/%s" % (self._host, self._task)) display.debug("done running TaskExecutor() for %s/%s" % (self._host, self._task))
self._host.vars = dict() self._host.vars = dict()
self._host.groups = [] self._host.groups = []
task_result = TaskResult(self._host, self._task, executor_result) task_result = TaskResult(self._host.name, self._task._uuid, executor_result)
# put the result on the result queue # put the result on the result queue
display.debug("sending task result") display.debug("sending task result")
@ -130,7 +134,7 @@ class WorkerProcess(multiprocessing.Process):
except AnsibleConnectionFailure: except AnsibleConnectionFailure:
self._host.vars = dict() self._host.vars = dict()
self._host.groups = [] self._host.groups = []
task_result = TaskResult(self._host, self._task, dict(unreachable=True)) task_result = TaskResult(self._host.name, self._task._uuid, dict(unreachable=True))
self._rslt_q.put(task_result, block=False) self._rslt_q.put(task_result, block=False)
except Exception as e: except Exception as e:
@ -138,7 +142,7 @@ class WorkerProcess(multiprocessing.Process):
try: try:
self._host.vars = dict() self._host.vars = dict()
self._host.groups = [] self._host.groups = []
task_result = TaskResult(self._host, self._task, dict(failed=True, exception=to_unicode(traceback.format_exc()), stdout='')) task_result = TaskResult(self._host.name, self._task._uuid, dict(failed=True, exception=to_unicode(traceback.format_exc()), stdout=''))
self._rslt_q.put(task_result, block=False) self._rslt_q.put(task_result, block=False)
except: except:
display.debug(u"WORKER EXCEPTION: %s" % to_unicode(e)) display.debug(u"WORKER EXCEPTION: %s" % to_unicode(e))
@ -146,3 +150,11 @@ class WorkerProcess(multiprocessing.Process):
display.debug("WORKER PROCESS EXITING") display.debug("WORKER PROCESS EXITING")
#pr.disable()
#s = StringIO.StringIO()
#sortby = 'time'
#ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
#ps.print_stats()
#with open('worker_%06d.stats' % os.getpid(), 'w') as f:
# f.write(s.getvalue())

View file

@ -246,7 +246,8 @@ class TaskExecutor:
task_vars[loop_var] = item task_vars[loop_var] = item
try: try:
tmp_task = self._task.copy() tmp_task = self._task.copy(exclude_parent=True, exclude_tasks=True)
tmp_task._parent = self._task._parent
tmp_play_context = self._play_context.copy() tmp_play_context = self._play_context.copy()
except AnsibleParserError as e: except AnsibleParserError as e:
results.append(dict(failed=True, msg=to_unicode(e))) results.append(dict(failed=True, msg=to_unicode(e)))
@ -265,7 +266,7 @@ class TaskExecutor:
res[loop_var] = item res[loop_var] = item
res['_ansible_item_result'] = True res['_ansible_item_result'] = True
self._rslt_q.put(TaskResult(self._host, self._task, res), block=False) self._rslt_q.put(TaskResult(self._host.name, self._task._uuid, res), block=False)
results.append(res) results.append(res)
del task_vars[loop_var] del task_vars[loop_var]
@ -516,7 +517,7 @@ class TaskExecutor:
result['_ansible_retry'] = True result['_ansible_retry'] = True
result['retries'] = retries result['retries'] = retries
display.debug('Retrying task, attempt %d of %d' % (attempt, retries)) display.debug('Retrying task, attempt %d of %d' % (attempt, retries))
self._rslt_q.put(TaskResult(self._host, self._task, result), block=False) self._rslt_q.put(TaskResult(self._host.name, self._task._uuid, result), block=False)
time.sleep(delay) time.sleep(delay)
else: else:
if retries > 1: if retries > 1:

View file

@ -26,7 +26,6 @@ import tempfile
from ansible import constants as C from ansible import constants as C
from ansible.errors import AnsibleError from ansible.errors import AnsibleError
from ansible.executor.play_iterator import PlayIterator from ansible.executor.play_iterator import PlayIterator
from ansible.executor.process.result import ResultProcess
from ansible.executor.stats import AggregateStats from ansible.executor.stats import AggregateStats
from ansible.playbook.block import Block from ansible.playbook.block import Block
from ansible.playbook.play_context import PlayContext from ansible.playbook.play_context import PlayContext
@ -81,7 +80,6 @@ class TaskQueueManager:
self._callbacks_loaded = False self._callbacks_loaded = False
self._callback_plugins = [] self._callback_plugins = []
self._start_at_done = False self._start_at_done = False
self._result_prc = None
# make sure the module path (if specified) is parsed and # make sure the module path (if specified) is parsed and
# added to the module_loader object # added to the module_loader object
@ -113,9 +111,6 @@ class TaskQueueManager:
rslt_q = multiprocessing.Queue() rslt_q = multiprocessing.Queue()
self._workers.append([None, rslt_q]) self._workers.append([None, rslt_q])
self._result_prc = ResultProcess(self._final_q, self._workers)
self._result_prc.start()
def _initialize_notified_handlers(self, play): def _initialize_notified_handlers(self, play):
''' '''
Clears and initializes the shared notified handlers dict with entries Clears and initializes the shared notified handlers dict with entries
@ -299,9 +294,7 @@ class TaskQueueManager:
self._cleanup_processes() self._cleanup_processes()
def _cleanup_processes(self): def _cleanup_processes(self):
if self._result_prc: if hasattr(self, '_workers'):
self._result_prc.terminate()
for (worker_prc, rslt_q) in self._workers: for (worker_prc, rslt_q) in self._workers:
rslt_q.close() rslt_q.close()
if worker_prc and worker_prc.is_alive(): if worker_prc and worker_prc.is_alive():

View file

@ -41,7 +41,7 @@ class TaskResult:
def is_skipped(self): def is_skipped(self):
# loop results # loop results
if 'results' in self._result and self._task.loop: if 'results' in self._result:
results = self._result['results'] results = self._result['results']
# Loop tasks are only considered skipped if all items were skipped. # Loop tasks are only considered skipped if all items were skipped.
# some squashed results (eg, yum) are not dicts and can't be skipped individually # some squashed results (eg, yum) are not dicts and can't be skipped individually
@ -62,7 +62,7 @@ class TaskResult:
return self._check_key('unreachable') return self._check_key('unreachable')
def _check_key(self, key): def _check_key(self, key):
if self._result.get('results', []) and self._task.loop: if self._result.get('results', []):
flag = False flag = False
for res in self._result.get('results', []): for res in self._result.get('results', []):
if isinstance(res, dict): if isinstance(res, dict):

View file

@ -64,12 +64,12 @@ class Host:
) )
def deserialize(self, data): def deserialize(self, data):
self.__init__() self.__init__(gen_uuid=False)
self.name = data.get('name') self.name = data.get('name')
self.vars = data.get('vars', dict()) self.vars = data.get('vars', dict())
self.address = data.get('address', '') self.address = data.get('address', '')
self._uuid = data.get('uuid', uuid.uuid4()) self._uuid = data.get('uuid', None)
self.implicit= data.get('implicit', False) self.implicit= data.get('implicit', False)
groups = data.get('groups', []) groups = data.get('groups', [])
@ -78,7 +78,7 @@ class Host:
g.deserialize(group_data) g.deserialize(group_data)
self.groups.append(g) self.groups.append(g)
def __init__(self, name=None, port=None): def __init__(self, name=None, port=None, gen_uuid=True):
self.name = name self.name = name
self.vars = {} self.vars = {}
@ -90,7 +90,9 @@ class Host:
self.set_variable('ansible_port', int(port)) self.set_variable('ansible_port', int(port))
self._gathered_facts = False self._gathered_facts = False
self._uuid = uuid.uuid4() self._uuid = None
if gen_uuid:
self._uuid = uuid.uuid4()
self.implicit = False self.implicit = False
def __repr__(self): def __repr__(self):

View file

@ -80,10 +80,13 @@ class Base:
# every object gets a random uuid: # every object gets a random uuid:
self._uuid = uuid.uuid4() self._uuid = uuid.uuid4()
#self._uuid = 1
# and initialize the base attributes # and initialize the base attributes
self._initialize_base_attributes() self._initialize_base_attributes()
self._cached_parent_attrs = dict()
# and init vars, avoid using defaults in field declaration as it lives across plays # and init vars, avoid using defaults in field declaration as it lives across plays
self.vars = dict() self.vars = dict()
@ -110,13 +113,21 @@ class Base:
@staticmethod @staticmethod
def _generic_g(prop_name, self): def _generic_g(prop_name, self):
method = "_get_attr_%s" % prop_name method = "_get_attr_%s" % prop_name
if hasattr(self, method): try:
value = getattr(self, method)() value = getattr(self, method)()
else: except AttributeError:
try: try:
value = self._attributes[prop_name] value = self._attributes[prop_name]
if value is None and hasattr(self, '_get_parent_attribute'): if value is None:
value = self._get_parent_attribute(prop_name) try:
if prop_name in self._cached_parent_attrs:
value = self._cached_parent_attrs[prop_name]
else:
value = self._get_parent_attribute(prop_name)
# FIXME: temporarily disabling due to bugs
#self._cached_parent_attrs[prop_name] = value
except AttributeError:
pass
except KeyError: except KeyError:
raise AttributeError("'%s' object has no attribute '%s'" % (self.__class__.__name__, prop_name)) raise AttributeError("'%s' object has no attribute '%s'" % (self.__class__.__name__, prop_name))
@ -137,8 +148,8 @@ class Base:
''' '''
# check cache before retrieving attributes # check cache before retrieving attributes
if self.__class__ in BASE_ATTRIBUTES: if self.__class__.__name__ in BASE_ATTRIBUTES:
return BASE_ATTRIBUTES[self.__class__] return BASE_ATTRIBUTES[self.__class__.__name__]
# Cache init # Cache init
base_attributes = dict() base_attributes = dict()
@ -147,9 +158,23 @@ class Base:
if name.startswith('_'): if name.startswith('_'):
name = name[1:] name = name[1:]
base_attributes[name] = value base_attributes[name] = value
BASE_ATTRIBUTES[self.__class__] = base_attributes BASE_ATTRIBUTES[self.__class__.__name__] = base_attributes
return base_attributes return base_attributes
def dump_me(self, depth=0):
if depth == 0:
print("DUMPING OBJECT ------------------------------------------------------")
print("%s- %s (%s)" % (" " * depth, self.__class__.__name__, self))
if hasattr(self, '_parent') and self._parent:
self._parent.dump_me(depth+2)
dep_chain = self._parent.get_dep_chain()
print("%s^ dep chain: %s" % (" "*(depth+2), dep_chain))
if dep_chain:
for dep in dep_chain:
dep.dump_me(depth+2)
if hasattr(self, '_play') and self._play:
self._play.dump_me(depth+2)
def _initialize_base_attributes(self): def _initialize_base_attributes(self):
# each class knows attributes set upon it, see Task.py for example # each class knows attributes set upon it, see Task.py for example
self._attributes = dict() self._attributes = dict()

View file

@ -44,16 +44,15 @@ class Block(Base, Become, Conditional, Taggable):
def __init__(self, play=None, parent_block=None, role=None, task_include=None, use_handlers=False, implicit=False): def __init__(self, play=None, parent_block=None, role=None, task_include=None, use_handlers=False, implicit=False):
self._play = play self._play = play
self._role = role self._role = role
self._task_include = None self._parent = None
self._parent_block = None
self._dep_chain = None self._dep_chain = None
self._use_handlers = use_handlers self._use_handlers = use_handlers
self._implicit = implicit self._implicit = implicit
if task_include: if task_include:
self._task_include = task_include self._parent = task_include
elif parent_block: elif parent_block:
self._parent_block = parent_block self._parent = parent_block
super(Block, self).__init__() super(Block, self).__init__()
@ -65,10 +64,8 @@ class Block(Base, Become, Conditional, Taggable):
all_vars = self.vars.copy() all_vars = self.vars.copy()
if self._parent_block: if self._parent:
all_vars.update(self._parent_block.get_vars()) all_vars.update(self._parent.get_vars())
if self._task_include:
all_vars.update(self._task_include.get_vars())
return all_vars return all_vars
@ -109,7 +106,7 @@ class Block(Base, Become, Conditional, Taggable):
play=self._play, play=self._play,
block=self, block=self,
role=self._role, role=self._role,
task_include=self._task_include, task_include=None,
variable_manager=self._variable_manager, variable_manager=self._variable_manager,
loader=self._loader, loader=self._loader,
use_handlers=self._use_handlers, use_handlers=self._use_handlers,
@ -124,7 +121,7 @@ class Block(Base, Become, Conditional, Taggable):
play=self._play, play=self._play,
block=self, block=self,
role=self._role, role=self._role,
task_include=self._task_include, task_include=None,
variable_manager=self._variable_manager, variable_manager=self._variable_manager,
loader=self._loader, loader=self._loader,
use_handlers=self._use_handlers, use_handlers=self._use_handlers,
@ -139,7 +136,7 @@ class Block(Base, Become, Conditional, Taggable):
play=self._play, play=self._play,
block=self, block=self,
role=self._role, role=self._role,
task_include=self._task_include, task_include=None,
variable_manager=self._variable_manager, variable_manager=self._variable_manager,
loader=self._loader, loader=self._loader,
use_handlers=self._use_handlers, use_handlers=self._use_handlers,
@ -149,10 +146,8 @@ class Block(Base, Become, Conditional, Taggable):
def get_dep_chain(self): def get_dep_chain(self):
if self._dep_chain is None: if self._dep_chain is None:
if self._parent_block: if self._parent:
return self._parent_block.get_dep_chain() return self._parent.get_dep_chain()
elif self._task_include:
return self._task_include._block.get_dep_chain()
else: else:
return None return None
else: else:
@ -162,12 +157,18 @@ class Block(Base, Become, Conditional, Taggable):
def _dupe_task_list(task_list, new_block): def _dupe_task_list(task_list, new_block):
new_task_list = [] new_task_list = []
for task in task_list: for task in task_list:
if isinstance(task, Block): new_task = task.copy(exclude_parent=True)
new_task = task.copy(exclude_parent=True) if task._parent:
new_task._parent_block = new_block new_task._parent = task._parent.copy(exclude_tasks=True)
# go up the parentage tree until we find an
# object without a parent and make this new
# block their parent
cur_obj = new_task
while cur_obj._parent:
cur_obj = cur_obj._parent
cur_obj._parent = new_block
else: else:
new_task = task.copy(exclude_block=True) new_task._parent = new_block
new_task._block = new_block
new_task_list.append(new_task) new_task_list.append(new_task)
return new_task_list return new_task_list
@ -175,27 +176,22 @@ class Block(Base, Become, Conditional, Taggable):
new_me._play = self._play new_me._play = self._play
new_me._use_handlers = self._use_handlers new_me._use_handlers = self._use_handlers
if self._dep_chain: if self._dep_chain is not None:
new_me._dep_chain = self._dep_chain[:] new_me._dep_chain = self._dep_chain[:]
new_me._parent = None
if self._parent and not exclude_parent:
new_me._parent = self._parent.copy(exclude_tasks=exclude_tasks)
if not exclude_tasks: if not exclude_tasks:
new_me.block = _dupe_task_list(self.block or [], new_me) new_me.block = _dupe_task_list(self.block or [], new_me)
new_me.rescue = _dupe_task_list(self.rescue or [], new_me) new_me.rescue = _dupe_task_list(self.rescue or [], new_me)
new_me.always = _dupe_task_list(self.always or [], new_me) new_me.always = _dupe_task_list(self.always or [], new_me)
new_me._parent_block = None
if self._parent_block and not exclude_parent:
new_me._parent_block = self._parent_block.copy(exclude_tasks=exclude_tasks)
new_me._role = None new_me._role = None
if self._role: if self._role:
new_me._role = self._role new_me._role = self._role
new_me._task_include = None
if self._task_include:
new_me._task_include = self._task_include.copy(exclude_block=True)
new_me._task_include._block = self._task_include._block.copy(exclude_tasks=True)
return new_me return new_me
def serialize(self): def serialize(self):
@ -213,10 +209,9 @@ class Block(Base, Become, Conditional, Taggable):
if self._role is not None: if self._role is not None:
data['role'] = self._role.serialize() data['role'] = self._role.serialize()
if self._task_include is not None: if self._parent is not None:
data['task_include'] = self._task_include.serialize() data['parent'] = self._parent.copy(exclude_tasks=True).serialize()
if self._parent_block is not None: data['parent_type'] = self._parent.__class__.__name__
data['parent_block'] = self._parent_block.copy(exclude_tasks=True).serialize()
return data return data
@ -226,7 +221,10 @@ class Block(Base, Become, Conditional, Taggable):
serialize method serialize method
''' '''
# import is here to avoid import loops
from ansible.playbook.task import Task from ansible.playbook.task import Task
from ansible.playbook.task_include import TaskInclude
from ansible.playbook.handler_task_include import HandlerTaskInclude
# we don't want the full set of attributes (the task lists), as that # we don't want the full set of attributes (the task lists), as that
# would lead to a serialize/deserialize loop # would lead to a serialize/deserialize loop
@ -243,19 +241,18 @@ class Block(Base, Become, Conditional, Taggable):
r.deserialize(role_data) r.deserialize(role_data)
self._role = r self._role = r
# if there was a serialized task include, unpack it too parent_data = data.get('parent')
ti_data = data.get('task_include') if parent_data:
if ti_data: parent_type = data.get('parent_type')
ti = Task() if parent_type == 'Block':
ti.deserialize(ti_data) p = Block()
self._task_include = ti elif parent_type == 'TaskInclude':
p = TaskInclude()
pb_data = data.get('parent_block') elif parent_type == 'HandlerTaskInclude':
if pb_data: p = HandlerTaskInclude()
pb = Block() p.deserialize(pb_data)
pb.deserialize(pb_data) self._parent = p
self._parent_block = pb self._dep_chain = self._parent.get_dep_chain()
self._dep_chain = self._parent_block.get_dep_chain()
def evaluate_conditional(self, templar, all_vars): def evaluate_conditional(self, templar, all_vars):
dep_chain = self.get_dep_chain() dep_chain = self.get_dep_chain()
@ -263,24 +260,18 @@ class Block(Base, Become, Conditional, Taggable):
for dep in dep_chain: for dep in dep_chain:
if not dep.evaluate_conditional(templar, all_vars): if not dep.evaluate_conditional(templar, all_vars):
return False return False
if self._task_include is not None: if self._parent is not None:
if not self._task_include.evaluate_conditional(templar, all_vars): if not self._parent.evaluate_conditional(templar, all_vars):
return False
if self._parent_block is not None:
if not self._parent_block.evaluate_conditional(templar, all_vars):
return False return False
return super(Block, self).evaluate_conditional(templar, all_vars) return super(Block, self).evaluate_conditional(templar, all_vars)
def set_loader(self, loader): def set_loader(self, loader):
self._loader = loader self._loader = loader
if self._parent_block: if self._parent:
self._parent_block.set_loader(loader) self._parent.set_loader(loader)
elif self._role: elif self._role:
self._role.set_loader(loader) self._role.set_loader(loader)
if self._task_include:
self._task_include.set_loader(loader)
dep_chain = self.get_dep_chain() dep_chain = self.get_dep_chain()
if dep_chain: if dep_chain:
for dep in dep_chain: for dep in dep_chain:
@ -295,14 +286,8 @@ class Block(Base, Become, Conditional, Taggable):
try: try:
value = self._attributes[attr] value = self._attributes[attr]
if self._parent_block and (value is None or extend): if self._parent and (value is None or extend):
parent_value = getattr(self._parent_block, attr, None) parent_value = getattr(self._parent, attr, None)
if extend:
value = self._extend_value(value, parent_value)
else:
value = parent_value
if self._task_include and (value is None or extend):
parent_value = getattr(self._task_include, attr, None)
if extend: if extend:
value = self._extend_value(value, parent_value) value = self._extend_value(value, parent_value)
else: else:
@ -383,3 +368,8 @@ class Block(Base, Become, Conditional, Taggable):
def has_tasks(self): def has_tasks(self):
return len(self.block) > 0 or len(self.rescue) > 0 or len(self.always) > 0 return len(self.block) > 0 or len(self.rescue) > 0 or len(self.always) > 0
def get_include_params(self):
if self._parent:
return self._parent.get_include_params()
else:
return dict()

View file

@ -46,9 +46,9 @@ def load_list_of_blocks(ds, play, parent_block=None, role=None, task_include=Non
block_list = [] block_list = []
if ds: if ds:
for block in ds: for block_ds in ds:
b = Block.load( b = Block.load(
block, block_ds,
play=play, play=play,
parent_block=parent_block, parent_block=parent_block,
role=role, role=role,
@ -96,7 +96,7 @@ def load_list_of_tasks(ds, play, block=None, role=None, task_include=None, use_h
play=play, play=play,
parent_block=block, parent_block=block,
role=role, role=role,
task_include=None, task_include=task_include,
use_handlers=use_handlers, use_handlers=use_handlers,
variable_manager=variable_manager, variable_manager=variable_manager,
loader=loader, loader=loader,
@ -105,9 +105,19 @@ def load_list_of_tasks(ds, play, block=None, role=None, task_include=None, use_h
else: else:
if 'include' in task_ds: if 'include' in task_ds:
if use_handlers: if use_handlers:
t = HandlerTaskInclude.load(task_ds, block=block, role=role, task_include=task_include, variable_manager=variable_manager, loader=loader) include_class = HandlerTaskInclude
else: else:
t = TaskInclude.load(task_ds, block=block, role=role, task_include=task_include, variable_manager=variable_manager, loader=loader) include_class = TaskInclude
t = include_class.load(
task_ds,
block=block,
role=role,
task_include=None,
variable_manager=variable_manager,
loader=loader
)
all_vars = variable_manager.get_vars(loader=loader, play=play, task=t) all_vars = variable_manager.get_vars(loader=loader, play=play, task=t)
templar = Templar(loader=loader, variables=all_vars) templar = Templar(loader=loader, variables=all_vars)
@ -134,6 +144,9 @@ def load_list_of_tasks(ds, play, block=None, role=None, task_include=None, use_h
parent_include = task_include parent_include = task_include
cumulative_path = None cumulative_path = None
while parent_include is not None: while parent_include is not None:
if not isinstance(parent_include, TaskInclude):
parent_include = parent_include._parent
continue
parent_include_dir = templar.template(os.path.dirname(parent_include.args.get('_raw_params'))) parent_include_dir = templar.template(os.path.dirname(parent_include.args.get('_raw_params')))
if cumulative_path is None: if cumulative_path is None:
cumulative_path = parent_include_dir cumulative_path = parent_include_dir
@ -149,7 +162,7 @@ def load_list_of_tasks(ds, play, block=None, role=None, task_include=None, use_h
if os.path.exists(include_file): if os.path.exists(include_file):
break break
else: else:
parent_include = parent_include._task_include parent_include = parent_include._parent
else: else:
try: try:
include_target = templar.template(t.args['_raw_params']) include_target = templar.template(t.args['_raw_params'])
@ -195,8 +208,8 @@ def load_list_of_tasks(ds, play, block=None, role=None, task_include=None, use_h
included_blocks = load_list_of_blocks( included_blocks = load_list_of_blocks(
data, data,
play=play, play=play,
parent_block=block, parent_block=None,
task_include=t, task_include=t.copy(),
role=role, role=role,
use_handlers=use_handlers, use_handlers=use_handlers,
loader=loader, loader=loader,
@ -213,8 +226,8 @@ def load_list_of_tasks(ds, play, block=None, role=None, task_include=None, use_h
if len(tags) > 0: if len(tags) > 0:
if len(t.tags) > 0: if len(t.tags) > 0:
raise AnsibleParserError( raise AnsibleParserError(
"Include tasks should not specify tags in more than one way (both via args and directly on the task)." \ "Include tasks should not specify tags in more than one way (both via args and directly on the task). " \
" Mixing tag specify styles is prohibited for whole import hierarchy, not only for single import statement", "Mixing styles in which tags are specified is prohibited for whole import hierarchy, not only for single import statement",
obj=task_ds, obj=task_ds,
suppress_extended_error=True, suppress_extended_error=True,
) )

View file

@ -22,6 +22,7 @@ __metaclass__ = type
import os import os
from ansible.errors import AnsibleError from ansible.errors import AnsibleError
from ansible.playbook.task_include import TaskInclude
from ansible.template import Templar from ansible.template import Templar
try: try:
@ -60,8 +61,11 @@ class IncludedFile:
for res in results: for res in results:
if res._task.action == 'include': original_host = res._host
if res._task.loop: original_task = res._task
if original_task.action == 'include':
if original_task.loop:
if 'results' not in res._result: if 'results' not in res._result:
continue continue
include_results = res._result['results'] include_results = res._result['results']
@ -73,29 +77,30 @@ class IncludedFile:
if 'skipped' in include_result and include_result['skipped'] or 'failed' in include_result: if 'skipped' in include_result and include_result['skipped'] or 'failed' in include_result:
continue continue
original_host = get_original_host(res._host)
original_task = iterator.get_original_task(original_host, res._task)
task_vars = variable_manager.get_vars(loader=loader, play=iterator._play, host=original_host, task=original_task) task_vars = variable_manager.get_vars(loader=loader, play=iterator._play, host=original_host, task=original_task)
templar = Templar(loader=loader, variables=task_vars) templar = Templar(loader=loader, variables=task_vars)
include_variables = include_result.get('include_variables', dict()) include_variables = include_result.get('include_variables', dict())
loop_var = 'item' loop_var = 'item'
if res._task.loop_control: if original_task.loop_control:
loop_var = res._task.loop_control.loop_var or 'item' loop_var = original_task.loop_control.loop_var or 'item'
if loop_var in include_result: if loop_var in include_result:
task_vars[loop_var] = include_variables[loop_var] = include_result[loop_var] task_vars[loop_var] = include_variables[loop_var] = include_result[loop_var]
include_file = None
if original_task: if original_task:
if original_task.static: if original_task.static:
continue continue
if original_task._task_include: if original_task._parent:
# handle relative includes by walking up the list of parent include # handle relative includes by walking up the list of parent include
# tasks and checking the relative result to see if it exists # tasks and checking the relative result to see if it exists
parent_include = original_task._task_include parent_include = original_task._parent
cumulative_path = None cumulative_path = None
while parent_include is not None: while parent_include is not None:
if not isinstance(parent_include, TaskInclude):
parent_include = parent_include._parent
continue
parent_include_dir = templar.template(os.path.dirname(parent_include.args.get('_raw_params'))) parent_include_dir = templar.template(os.path.dirname(parent_include.args.get('_raw_params')))
if cumulative_path is None: if cumulative_path is None:
cumulative_path = parent_include_dir cumulative_path = parent_include_dir
@ -111,14 +116,14 @@ class IncludedFile:
if os.path.exists(include_file): if os.path.exists(include_file):
break break
else: else:
parent_include = parent_include._task_include parent_include = parent_include._parent
elif original_task._role:
if include_file is None:
if original_task._role:
include_target = templar.template(include_result['include']) include_target = templar.template(include_result['include'])
include_file = loader.path_dwim_relative(original_task._role._role_path, 'tasks', include_target) include_file = loader.path_dwim_relative(original_task._role._role_path, 'tasks', include_target)
else: else:
include_file = loader.path_dwim(include_result['include']) include_file = loader.path_dwim(include_result['include'])
else:
include_file = loader.path_dwim(include_result['include'])
include_file = templar.template(include_file) include_file = templar.template(include_file)
inc_file = IncludedFile(include_file, include_variables, original_task) inc_file = IncludedFile(include_file, include_variables, original_task)

View file

@ -353,7 +353,9 @@ class Role(Base, Become, Conditional, Taggable):
block_list.extend(dep_blocks) block_list.extend(dep_blocks)
for task_block in self._task_blocks: for task_block in self._task_blocks:
new_task_block = task_block.copy() new_task_block = task_block.copy(exclude_parent=True)
if task_block._parent:
new_task_block._parent = task_block._parent.copy()
new_task_block._dep_chain = new_dep_chain new_task_block._dep_chain = new_dep_chain
new_task_block._play = play new_task_block._play = play
block_list.append(new_task_block) block_list.append(new_task_block)

View file

@ -92,9 +92,13 @@ class Task(Base, Conditional, Taggable, Become):
def __init__(self, block=None, role=None, task_include=None): def __init__(self, block=None, role=None, task_include=None):
''' constructors a task, without the Task.load classmethod, it will be pretty blank ''' ''' constructors a task, without the Task.load classmethod, it will be pretty blank '''
self._block = block self._role = role
self._role = role self._parent = None
self._task_include = task_include
if task_include:
self._parent = task_include
else:
self._parent = block
super(Task, self).__init__() super(Task, self).__init__()
@ -242,10 +246,8 @@ class Task(Base, Conditional, Taggable, Become):
the block and task include (if any) to which this task belongs. the block and task include (if any) to which this task belongs.
''' '''
if self._block: if self._parent:
self._block.post_validate(templar) self._parent.post_validate(templar)
if self._task_include:
self._task_include.post_validate(templar)
super(Task, self).post_validate(templar) super(Task, self).post_validate(templar)
@ -304,10 +306,8 @@ class Task(Base, Conditional, Taggable, Become):
def get_vars(self): def get_vars(self):
all_vars = dict() all_vars = dict()
if self._block: if self._parent:
all_vars.update(self._block.get_vars()) all_vars.update(self._parent.get_vars())
if self._task_include:
all_vars.update(self._task_include.get_vars())
all_vars.update(self.vars) all_vars.update(self.vars)
@ -320,55 +320,55 @@ class Task(Base, Conditional, Taggable, Become):
def get_include_params(self): def get_include_params(self):
all_vars = dict() all_vars = dict()
if self._task_include: if self._parent:
all_vars.update(self._task_include.get_include_params()) all_vars.update(self._parent.get_include_params())
if self.action == 'include': if self.action == 'include':
all_vars.update(self.vars) all_vars.update(self.vars)
return all_vars return all_vars
def copy(self, exclude_block=False): def copy(self, exclude_parent=False, exclude_tasks=False):
new_me = super(Task, self).copy() new_me = super(Task, self).copy()
new_me._block = None new_me._parent = None
if self._block and not exclude_block: if self._parent and not exclude_parent:
new_me._block = self._block.copy() new_me._parent = self._parent.copy(exclude_tasks=exclude_tasks)
new_me._role = None new_me._role = None
if self._role: if self._role:
new_me._role = self._role new_me._role = self._role
new_me._task_include = None
if self._task_include:
new_me._task_include = self._task_include.copy(exclude_block=exclude_block)
return new_me return new_me
def serialize(self): def serialize(self):
data = super(Task, self).serialize() data = super(Task, self).serialize()
if self._block: if self._parent:
data['block'] = self._block.serialize() data['parent'] = self._parent.serialize()
data['parent_type'] = self._parent.__class__.__name__
if self._role: if self._role:
data['role'] = self._role.serialize() data['role'] = self._role.serialize()
if self._task_include:
data['task_include'] = self._task_include.serialize()
return data return data
def deserialize(self, data): def deserialize(self, data):
# import is here to avoid import loops # import is here to avoid import loops
#from ansible.playbook.task_include import TaskInclude from ansible.playbook.task_include import TaskInclude
from ansible.playbook.handler_task_include import HandlerTaskInclude
block_data = data.get('block') parent_data = data.get('parent', None)
if parent_data:
if block_data: parent_type = data.get('parent_type')
b = Block() if parent_type == 'Block':
b.deserialize(block_data) p = Block()
self._block = b elif parent_type == 'TaskInclude':
del data['block'] p = TaskInclude()
elif parent_type == 'HandlerTaskInclude':
p = HandlerTaskInclude()
p.deserialize(parent_data)
self._parent = p
del data['parent']
role_data = data.get('role') role_data = data.get('role')
if role_data: if role_data:
@ -377,22 +377,11 @@ class Task(Base, Conditional, Taggable, Become):
self._role = r self._role = r
del data['role'] del data['role']
ti_data = data.get('task_include')
if ti_data:
#ti = TaskInclude()
ti = Task()
ti.deserialize(ti_data)
self._task_include = ti
del data['task_include']
super(Task, self).deserialize(data) super(Task, self).deserialize(data)
def evaluate_conditional(self, templar, all_vars): def evaluate_conditional(self, templar, all_vars):
if self._block is not None: if self._parent is not None:
if not self._block.evaluate_conditional(templar, all_vars): if not self._parent.evaluate_conditional(templar, all_vars):
return False
if self._task_include is not None:
if not self._task_include.evaluate_conditional(templar, all_vars):
return False return False
return super(Task, self).evaluate_conditional(templar, all_vars) return super(Task, self).evaluate_conditional(templar, all_vars)
@ -405,10 +394,8 @@ class Task(Base, Conditional, Taggable, Become):
self._loader = loader self._loader = loader
if self._block: if self._parent:
self._block.set_loader(loader) self._parent.set_loader(loader)
if self._task_include:
self._task_include.set_loader(loader)
def _get_parent_attribute(self, attr, extend=False): def _get_parent_attribute(self, attr, extend=False):
''' '''
@ -418,14 +405,8 @@ class Task(Base, Conditional, Taggable, Become):
try: try:
value = self._attributes[attr] value = self._attributes[attr]
if self._block and (value is None or extend): if self._parent and (value is None or extend):
parent_value = getattr(self._block, attr, None) parent_value = getattr(self._parent, attr, None)
if extend:
value = self._extend_value(value, parent_value)
else:
value = parent_value
if self._task_include and (value is None or extend):
parent_value = getattr(self._task_include, attr, None)
if extend: if extend:
value = self._extend_value(value, parent_value) value = self._extend_value(value, parent_value)
else: else:
@ -457,6 +438,11 @@ class Task(Base, Conditional, Taggable, Become):
def _get_attr_loop_control(self): def _get_attr_loop_control(self):
return self._attributes['loop_control'] return self._attributes['loop_control']
def get_dep_chain(self):
if self._parent:
return self._parent.get_dep_chain()
else:
return None
def get_search_path(self): def get_search_path(self):
''' '''
@ -465,7 +451,7 @@ class Task(Base, Conditional, Taggable, Become):
''' '''
path_stack = [] path_stack = []
dep_chain = self._block.get_dep_chain() dep_chain = self.get_dep_chain()
# inside role: add the dependency chain from current to dependant # inside role: add the dependency chain from current to dependant
if dep_chain: if dep_chain:
path_stack.extend(reversed([x._role_path for x in dep_chain])) path_stack.extend(reversed([x._role_path for x in dep_chain]))

View file

@ -55,10 +55,8 @@ class TaskInclude(Task):
they are params to the included tasks. they are params to the included tasks.
''' '''
all_vars = dict() all_vars = dict()
if self._block: if self._parent:
all_vars.update(self._block.get_vars()) all_vars.update(self._parent.get_vars())
if self._task_include:
all_vars.update(self._task_include.get_vars())
all_vars.update(self.vars) all_vars.update(self.vars)
all_vars.update(self.args) all_vars.update(self.args)

View file

@ -43,7 +43,8 @@ from ansible.plugins import action_loader, connection_loader, filter_loader, loo
from ansible.template import Templar from ansible.template import Templar
from ansible.utils.unicode import to_unicode from ansible.utils.unicode import to_unicode
from ansible.vars.unsafe_proxy import wrap_var from ansible.vars.unsafe_proxy import wrap_var
from ansible.vars import combine_vars from ansible.vars import combine_vars, strip_internal_keys
try: try:
from __main__ import display from __main__ import display
@ -174,6 +175,7 @@ class StrategyBase:
# The next common higher level is __init__.py::run() and that has # The next common higher level is __init__.py::run() and that has
# tasks inside of play_iterator so we'd have to extract them to do it # tasks inside of play_iterator so we'd have to extract them to do it
# there. # there.
global action_write_locks global action_write_locks
if task.action not in action_write_locks: if task.action not in action_write_locks:
display.debug('Creating lock for %s' % task.action) display.debug('Creating lock for %s' % task.action)
@ -189,21 +191,22 @@ class StrategyBase:
shared_loader_obj = SharedPluginLoaderObj() shared_loader_obj = SharedPluginLoaderObj()
queued = False queued = False
starting_worker = self._cur_worker
while True: while True:
(worker_prc, rslt_q) = self._workers[self._cur_worker] (worker_prc, rslt_q) = self._workers[self._cur_worker]
if worker_prc is None or not worker_prc.is_alive(): if worker_prc is None or not worker_prc.is_alive():
worker_prc = WorkerProcess(rslt_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj) worker_prc = WorkerProcess(self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj)
self._workers[self._cur_worker][0] = worker_prc self._workers[self._cur_worker][0] = worker_prc
worker_prc.start() worker_prc.start()
queued = True queued = True
self._cur_worker += 1 self._cur_worker += 1
if self._cur_worker >= len(self._workers): if self._cur_worker >= len(self._workers):
self._cur_worker = 0 self._cur_worker = 0
time.sleep(0.005)
if queued: if queued:
break break
elif self._cur_worker == starting_worker:
time.sleep(0.0001)
del task_vars
self._pending_results += 1 self._pending_results += 1
except (EOFError, IOError, AssertionError) as e: except (EOFError, IOError, AssertionError) as e:
# most likely an abort # most likely an abort
@ -219,222 +222,233 @@ class StrategyBase:
ret_results = [] ret_results = []
def get_original_host(host_name):
host_name = to_unicode(host_name)
if host_name in self._inventory._hosts_cache:
return self._inventory._hosts_cache[host_name]
else:
return self._inventory.get_host(host_name)
def search_handler_blocks(handler_name, handler_blocks):
for handler_block in handler_blocks:
for handler_task in handler_block.block:
handler_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, task=handler_task)
templar = Templar(loader=self._loader, variables=handler_vars)
try:
# first we check with the full result of get_name(), which may
# include the role name (if the handler is from a role). If that
# is not found, we resort to the simple name field, which doesn't
# have anything extra added to it.
target_handler_name = templar.template(handler_task.name)
if target_handler_name == handler_name:
return handler_task
else:
target_handler_name = templar.template(handler_task.get_name())
if target_handler_name == handler_name:
return handler_task
except (UndefinedError, AnsibleUndefinedVariable) as e:
# We skip this handler due to the fact that it may be using
# a variable in the name that was conditionally included via
# set_fact or some other method, and we don't want to error
# out unnecessarily
continue
return None
while not self._final_q.empty() and not self._tqm._terminated: while not self._final_q.empty() and not self._tqm._terminated:
try: try:
result = self._final_q.get() task_result = self._final_q.get(block=False)
display.debug("got result from result worker: %s" % ([text_type(x) for x in result],)) original_host = get_original_host(task_result._host)
original_task = iterator.get_original_task(original_host, task_result._task)
task_result._host = original_host
task_result._task = original_task
# helper method, used to find the original host from the one # send callbacks for 'non final' results
# returned in the result/message, which has been serialized and if '_ansible_retry' in task_result._result:
# thus had some information stripped from it to speed up the self._tqm.send_callback('v2_runner_retry', task_result)
# serialization process continue
def get_original_host(host): elif '_ansible_item_result' in task_result._result:
if host.name in self._inventory._hosts_cache: if task_result.is_failed() or task_result.is_unreachable():
return self._inventory._hosts_cache[host.name] self._tqm.send_callback('v2_runner_item_on_failed', task_result)
elif task_result.is_skipped():
self._tqm.send_callback('v2_runner_item_on_skipped', task_result)
else: else:
return self._inventory.get_host(host.name) self._tqm.send_callback('v2_runner_item_on_ok', task_result)
continue
if original_task.register:
if original_task.run_once:
host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
else:
host_list = [original_host]
clean_copy = strip_internal_keys(task_result._result)
if 'invocation' in clean_copy:
del clean_copy['invocation']
for target_host in host_list:
self._variable_manager.set_nonpersistent_facts(target_host, {original_task.register: clean_copy})
# all host status messages contain 2 entries: (msg, task_result) # all host status messages contain 2 entries: (msg, task_result)
if result[0] in ('host_task_ok', 'host_task_failed', 'host_task_skipped', 'host_unreachable'): role_ran = False
task_result = result[1] if task_result.is_failed():
host = get_original_host(task_result._host) role_ran = True
task = task_result._task if not original_task.ignore_errors:
if result[0] == 'host_task_failed' or task_result.is_failed(): display.debug("marking %s as failed" % original_host.name)
if not task.ignore_errors: if original_task.run_once:
display.debug("marking %s as failed" % host.name) # if we're using run_once, we have to fail every host here
if task.run_once: [iterator.mark_host_failed(h) for h in self._inventory.get_hosts(iterator._play.hosts) if h.name not in self._tqm._unreachable_hosts]
# if we're using run_once, we have to fail every host here
[iterator.mark_host_failed(h) for h in self._inventory.get_hosts(iterator._play.hosts) if h.name not in self._tqm._unreachable_hosts]
else:
iterator.mark_host_failed(host)
# only add the host to the failed list officially if it has
# been failed by the iterator
if iterator.is_failed(host):
self._tqm._failed_hosts[host.name] = True
self._tqm._stats.increment('failures', host.name)
else:
# otherwise, we grab the current state and if we're iterating on
# the rescue portion of a block then we save the failed task in a
# special var for use within the rescue/always
state, _ = iterator.get_next_task_for_host(host, peek=True)
if state.run_state == iterator.ITERATING_RESCUE:
original_task = iterator.get_original_task(host, task)
self._variable_manager.set_nonpersistent_facts(
host,
dict(
ansible_failed_task=original_task.serialize(),
ansible_failed_result=task_result._result,
),
)
else: else:
self._tqm._stats.increment('ok', host.name) iterator.mark_host_failed(original_host)
self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=task.ignore_errors)
elif result[0] == 'host_unreachable':
self._tqm._unreachable_hosts[host.name] = True
self._tqm._stats.increment('dark', host.name)
self._tqm.send_callback('v2_runner_on_unreachable', task_result)
elif result[0] == 'host_task_skipped':
self._tqm._stats.increment('skipped', host.name)
self._tqm.send_callback('v2_runner_on_skipped', task_result)
elif result[0] == 'host_task_ok':
if task.action != 'include':
self._tqm._stats.increment('ok', host.name)
if 'changed' in task_result._result and task_result._result['changed']:
self._tqm._stats.increment('changed', host.name)
self._tqm.send_callback('v2_runner_on_ok', task_result)
# only add the host to the failed list officially if it has
# been failed by the iterator
if iterator.is_failed(original_host):
self._tqm._failed_hosts[original_host.name] = True
self._tqm._stats.increment('failures', original_host.name)
else:
# otherwise, we grab the current state and if we're iterating on
# the rescue portion of a block then we save the failed task in a
# special var for use within the rescue/always
state, _ = iterator.get_next_task_for_host(original_host, peek=True)
if state.run_state == iterator.ITERATING_RESCUE:
self._variable_manager.set_nonpersistent_facts(
original_host,
dict(
ansible_failed_task=original_task.serialize(),
ansible_failed_result=task_result._result,
),
)
else:
self._tqm._stats.increment('ok', original_host.name)
self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=original_task.ignore_errors)
elif task_result.is_unreachable():
self._tqm._unreachable_hosts[original_host.name] = True
self._tqm._stats.increment('dark', original_host.name)
self._tqm.send_callback('v2_runner_on_unreachable', task_result)
elif task_result.is_skipped():
self._tqm._stats.increment('skipped', original_host.name)
self._tqm.send_callback('v2_runner_on_skipped', task_result)
else:
role_ran = True
if original_task.loop:
# this task had a loop, and has more than one result, so
# loop over all of them instead of a single result
result_items = task_result._result.get('results', [])
else:
result_items = [ task_result._result ]
for result_item in result_items:
if '_ansible_notify' in result_item:
if task_result.is_changed():
# The shared dictionary for notified handlers is a proxy, which
# does not detect when sub-objects within the proxy are modified.
# So, per the docs, we reassign the list so the proxy picks up and
# notifies all other threads
for handler_name in result_item['_ansible_notify']:
# Find the handler using the above helper. First we look up the
# dependency chain of the current task (if it's from a role), otherwise
# we just look through the list of handlers in the current play/all
# roles and use the first one that matches the notify name
if handler_name in self._listening_handlers:
for listening_handler_name in self._listening_handlers[handler_name]:
listening_handler = search_handler_blocks(listening_handler_name, iterator._play.handlers)
if listening_handler is None:
raise AnsibleError("The requested handler listener '%s' was not found in any of the known handlers" % listening_handler_name)
if original_host not in self._notified_handlers[listening_handler]:
self._notified_handlers[listening_handler].append(original_host)
display.vv("NOTIFIED HANDLER %s" % (listening_handler_name,))
else:
target_handler = search_handler_blocks(handler_name, iterator._play.handlers)
if target_handler is None:
raise AnsibleError("The requested handler '%s' was not found in any of the known handlers" % handler_name)
if target_handler in self._notified_handlers:
if original_host not in self._notified_handlers[target_handler]:
self._notified_handlers[target_handler].append(original_host)
# FIXME: should this be a callback?
display.vv("NOTIFIED HANDLER %s" % (handler_name,))
else:
raise AnsibleError("The requested handler '%s' was found in neither the main handlers list nor the listening handlers list" % handler_name)
if 'add_host' in result_item:
# this task added a new host (add_host module)
new_host_info = result_item.get('add_host', dict())
self._add_host(new_host_info, iterator)
elif 'add_group' in result_item:
# this task added a new group (group_by module)
self._add_group(original_host, result_item)
elif 'ansible_facts' in result_item:
loop_var = 'item'
if original_task.loop_control:
loop_var = original_task.loop_control.loop_var or 'item'
item = result_item.get(loop_var, None)
if original_task.action == 'include_vars':
for (var_name, var_value) in iteritems(result_item['ansible_facts']):
# find the host we're actually refering too here, which may
# be a host that is not really in inventory at all
if original_task.delegate_to is not None and original_task.delegate_facts:
task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task)
self.add_tqm_variables(task_vars, play=iterator._play)
if item is not None:
task_vars[loop_var] = item
templar = Templar(loader=self._loader, variables=task_vars)
host_name = templar.template(original_task.delegate_to)
actual_host = self._inventory.get_host(host_name)
if actual_host is None:
actual_host = Host(name=host_name)
else:
actual_host = original_host
if original_task.run_once:
host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
else:
host_list = [actual_host]
for target_host in host_list:
self._variable_manager.set_host_variable(target_host, var_name, var_value)
else:
if original_task.run_once:
host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
else:
host_list = [original_host]
for target_host in host_list:
if original_task.action == 'set_fact':
self._variable_manager.set_nonpersistent_facts(target_host, result_item['ansible_facts'].copy())
else:
self._variable_manager.set_host_facts(target_host, result_item['ansible_facts'].copy())
if 'diff' in task_result._result:
if self._diff: if self._diff:
self._tqm.send_callback('v2_on_file_diff', task_result) self._tqm.send_callback('v2_on_file_diff', task_result)
self._pending_results -= 1 if original_task.action != 'include':
if host.name in self._blocked_hosts: self._tqm._stats.increment('ok', original_host.name)
del self._blocked_hosts[host.name] if 'changed' in task_result._result and task_result._result['changed']:
self._tqm._stats.increment('changed', original_host.name)
# If this is a role task, mark the parent role as being run (if # finally, send the ok for this task
# the task was ok or failed, but not skipped or unreachable) self._tqm.send_callback('v2_runner_on_ok', task_result)
if task_result._task._role is not None and result[0] in ('host_task_ok', 'host_task_failed'):
# lookup the role in the ROLE_CACHE to make sure we're dealing
# with the correct object and mark it as executed
for (entry, role_obj) in iteritems(iterator._play.ROLE_CACHE[task_result._task._role._role_name]):
if role_obj._uuid == task_result._task._role._uuid:
role_obj._had_task_run[host.name] = True
ret_results.append(task_result) self._pending_results -= 1
if original_host.name in self._blocked_hosts:
del self._blocked_hosts[original_host.name]
elif result[0] == 'add_host': # If this is a role task, mark the parent role as being run (if
result_item = result[1] # the task was ok or failed, but not skipped or unreachable)
new_host_info = result_item.get('add_host', dict()) if original_task._role is not None and role_ran:
# lookup the role in the ROLE_CACHE to make sure we're dealing
# with the correct object and mark it as executed
for (entry, role_obj) in iteritems(iterator._play.ROLE_CACHE[original_task._role._role_name]):
if role_obj._uuid == original_task._role._uuid:
role_obj._had_task_run[original_host.name] = True
self._add_host(new_host_info, iterator) ret_results.append(task_result)
elif result[0] == 'add_group':
host = get_original_host(result[1])
result_item = result[2]
self._add_group(host, result_item)
elif result[0] == 'notify_handler':
task_result = result[1]
handler_name = result[2]
original_host = get_original_host(task_result._host)
original_task = iterator.get_original_task(original_host, task_result._task)
def search_handler_blocks(handler_name, handler_blocks):
for handler_block in handler_blocks:
for handler_task in handler_block.block:
handler_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, task=handler_task)
templar = Templar(loader=self._loader, variables=handler_vars)
try:
# first we check with the full result of get_name(), which may
# include the role name (if the handler is from a role). If that
# is not found, we resort to the simple name field, which doesn't
# have anything extra added to it.
target_handler_name = templar.template(handler_task.name)
if target_handler_name == handler_name:
return handler_task
else:
target_handler_name = templar.template(handler_task.get_name())
if target_handler_name == handler_name:
return handler_task
except (UndefinedError, AnsibleUndefinedVariable):
# We skip this handler due to the fact that it may be using
# a variable in the name that was conditionally included via
# set_fact or some other method, and we don't want to error
# out unnecessarily
continue
return None
# Find the handler using the above helper. First we look up the
# dependency chain of the current task (if it's from a role), otherwise
# we just look through the list of handlers in the current play/all
# roles and use the first one that matches the notify name
if handler_name in self._listening_handlers:
for listening_handler_name in self._listening_handlers[handler_name]:
listening_handler = search_handler_blocks(listening_handler_name, iterator._play.handlers)
if listening_handler is None:
raise AnsibleError("The requested handler listener '%s' was not found in any of the known handlers" % listening_handler_name)
if original_host not in self._notified_handlers[listening_handler]:
self._notified_handlers[listening_handler].append(original_host)
display.vv("NOTIFIED HANDLER %s" % (listening_handler_name,))
else:
target_handler = search_handler_blocks(handler_name, iterator._play.handlers)
if target_handler is None:
raise AnsibleError("The requested handler '%s' was not found in any of the known handlers" % handler_name)
if target_handler in self._notified_handlers:
if original_host not in self._notified_handlers[target_handler]:
self._notified_handlers[target_handler].append(original_host)
# FIXME: should this be a callback?
display.vv("NOTIFIED HANDLER %s" % (handler_name,))
else:
raise AnsibleError("The requested handler '%s' was found in neither the main handlers list nor the listening handlers list" % handler_name)
elif result[0] == 'register_host_var':
# essentially the same as 'set_host_var' below, however we
# never follow the delegate_to value for registered vars and
# the variable goes in the fact_cache
host = get_original_host(result[1])
task = result[2]
var_value = wrap_var(result[3])
var_name = task.register
if task.run_once:
host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
else:
host_list = [host]
for target_host in host_list:
self._variable_manager.set_nonpersistent_facts(target_host, {var_name: var_value})
elif result[0] in ('set_host_var', 'set_host_facts'):
host = get_original_host(result[1])
task = result[2]
item = result[3]
# find the host we're actually refering too here, which may
# be a host that is not really in inventory at all
if task.delegate_to is not None and task.delegate_facts:
task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task)
self.add_tqm_variables(task_vars, play=iterator._play)
loop_var = 'item'
if task.loop_control:
loop_var = task.loop_control.loop_var or 'item'
if item is not None:
task_vars[loop_var] = item
templar = Templar(loader=self._loader, variables=task_vars)
host_name = templar.template(task.delegate_to)
actual_host = self._inventory.get_host(host_name)
if actual_host is None:
actual_host = Host(name=host_name)
else:
actual_host = host
if task.run_once:
host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
else:
host_list = [actual_host]
if result[0] == 'set_host_var':
var_name = result[4]
var_value = result[5]
for target_host in host_list:
self._variable_manager.set_host_variable(target_host, var_name, var_value)
elif result[0] == 'set_host_facts':
facts = result[4]
for target_host in host_list:
if task.action == 'set_fact':
self._variable_manager.set_nonpersistent_facts(target_host, facts.copy())
else:
self._variable_manager.set_host_facts(target_host, facts.copy())
elif result[0].startswith('v2_runner_item') or result[0] == 'v2_runner_retry':
self._tqm.send_callback(result[0], result[1])
elif result[0] == 'v2_on_file_diff':
if self._diff:
self._tqm.send_callback('v2_on_file_diff', result[1])
else:
raise AnsibleError("unknown result message received: %s" % result[0])
except Queue.Empty: except Queue.Empty:
time.sleep(0.005) time.sleep(0.005)
@ -560,9 +574,9 @@ class StrategyBase:
block_list = load_list_of_blocks( block_list = load_list_of_blocks(
data, data,
play=included_file._task._block._play, play=iterator._play,
parent_block=None, parent_block=None,
task_include=included_file._task, task_include=None,
role=included_file._task._role, role=included_file._task._role,
use_handlers=is_handler, use_handlers=is_handler,
loader=self._loader, loader=self._loader,
@ -588,9 +602,9 @@ class StrategyBase:
# set the vars for this task from those specified as params to the include # set the vars for this task from those specified as params to the include
for b in block_list: for b in block_list:
# first make a copy of the including task, so that each has a unique copy to modify # first make a copy of the including task, so that each has a unique copy to modify
b._task_include = b._task_include.copy() b._parent = included_file._task.copy()
# then we create a temporary set of vars to ensure the variable reference is unique # then we create a temporary set of vars to ensure the variable reference is unique
temp_vars = b._task_include.vars.copy() temp_vars = b._parent.vars.copy()
temp_vars.update(included_file._args.copy()) temp_vars.update(included_file._args.copy())
# pop tags out of the include args, if they were specified there, and assign # pop tags out of the include args, if they were specified there, and assign
# them to the include. If the include already had tags specified, we raise an # them to the include. If the include already had tags specified, we raise an
@ -599,12 +613,12 @@ class StrategyBase:
if isinstance(tags, string_types): if isinstance(tags, string_types):
tags = tags.split(',') tags = tags.split(',')
if len(tags) > 0: if len(tags) > 0:
if len(b._task_include.tags) > 0: if len(b._parent.tags) > 0:
raise AnsibleParserError("Include tasks should not specify tags in more than one way (both via args and directly on the task). Mixing tag specify styles is prohibited for whole import hierarchy, not only for single import statement", raise AnsibleParserError("Include tasks should not specify tags in more than one way (both via args and directly on the task). Mixing tag specify styles is prohibited for whole import hierarchy, not only for single import statement",
obj=included_file._task._ds) obj=included_file._task._ds)
display.deprecated("You should not specify tags in the include parameters. All tags should be specified using the task-level option") display.deprecated("You should not specify tags in the include parameters. All tags should be specified using the task-level option")
b._task_include.tags = tags b._parent.tags = tags
b._task_include.vars = temp_vars b._parent.vars = temp_vars
# finally, send the callback and return the list of blocks loaded # finally, send the callback and return the list of blocks loaded
self._tqm.send_callback('v2_playbook_on_include', included_file) self._tqm.send_callback('v2_playbook_on_include', included_file)

View file

@ -146,6 +146,7 @@ class StrategyModule(StrategyBase):
display.warning("Using any_errors_fatal with the free strategy is not supported, as tasks are executed independently on each host") display.warning("Using any_errors_fatal with the free strategy is not supported, as tasks are executed independently on each host")
self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False) self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False)
self._queue_task(host, task, task_vars, play_context) self._queue_task(host, task, task_vars, play_context)
del task_vars
else: else:
display.debug("%s is blocked, skipping for now" % host_name) display.debug("%s is blocked, skipping for now" % host_name)

View file

@ -253,6 +253,7 @@ class StrategyModule(StrategyBase):
self._blocked_hosts[host.get_name()] = True self._blocked_hosts[host.get_name()] = True
self._queue_task(host, task, task_vars, play_context) self._queue_task(host, task, task_vars, play_context)
del task_vars
# if we're bypassing the host loop, break out now # if we're bypassing the host loop, break out now
if run_once: if run_once:
@ -315,7 +316,7 @@ class StrategyModule(StrategyBase):
final_block = new_block.filter_tagged_tasks(play_context, task_vars) final_block = new_block.filter_tagged_tasks(play_context, task_vars)
display.debug("done filtering new block on tags") display.debug("done filtering new block on tags")
noop_block = Block(parent_block=task._block) noop_block = Block(parent_block=task._parent)
noop_block.block = [noop_task for t in new_block.block] noop_block.block = [noop_task for t in new_block.block]
noop_block.always = [noop_task for t in new_block.always] noop_block.always = [noop_task for t in new_block.always]
noop_block.rescue = [noop_task for t in new_block.rescue] noop_block.rescue = [noop_task for t in new_block.rescue]

View file

@ -237,7 +237,7 @@ class VariableManager:
# sure it sees its defaults above any other roles, as we previously # sure it sees its defaults above any other roles, as we previously
# (v1) made sure each task had a copy of its roles default vars # (v1) made sure each task had a copy of its roles default vars
if task and task._role is not None: if task and task._role is not None:
all_vars = combine_vars(all_vars, task._role.get_default_vars(dep_chain=task._block.get_dep_chain())) all_vars = combine_vars(all_vars, task._role.get_default_vars(dep_chain=task.get_dep_chain()))
if host: if host:
# next, if a host is specified, we load any vars from group_vars # next, if a host is specified, we load any vars from group_vars
@ -334,7 +334,7 @@ class VariableManager:
# vars (which will look at parent blocks/task includes) # vars (which will look at parent blocks/task includes)
if task: if task:
if task._role: if task._role:
all_vars = combine_vars(all_vars, task._role.get_vars(task._block._dep_chain, include_params=False)) all_vars = combine_vars(all_vars, task._role.get_vars(task.get_dep_chain(), include_params=False))
all_vars = combine_vars(all_vars, task.get_vars()) all_vars = combine_vars(all_vars, task.get_vars())
# next, we merge in the vars cache (include vars) and nonpersistent # next, we merge in the vars cache (include vars) and nonpersistent
@ -346,7 +346,7 @@ class VariableManager:
# next, we merge in role params and task include params # next, we merge in role params and task include params
if task: if task:
if task._role: if task._role:
all_vars = combine_vars(all_vars, task._role.get_role_params(task._block.get_dep_chain())) all_vars = combine_vars(all_vars, task._role.get_role_params(task.get_dep_chain()))
# special case for include tasks, where the include params # special case for include tasks, where the include params
# may be specified in the vars field for the task, which should # may be specified in the vars field for the task, which should

View file

@ -121,7 +121,7 @@ class TestPlayIterator(unittest.TestCase):
# lookup up an original task # lookup up an original task
target_task = p._entries[0].tasks[0].block[0] target_task = p._entries[0].tasks[0].block[0]
task_copy = target_task.copy(exclude_block=True) task_copy = target_task.copy(exclude_parent=True)
found_task = itr.get_original_task(hosts[0], task_copy) found_task = itr.get_original_task(hosts[0], task_copy)
self.assertEqual(target_task, found_task) self.assertEqual(target_task, found_task)

View file

@ -139,7 +139,7 @@ class TestTaskExecutor(unittest.TestCase):
mock_host = MagicMock() mock_host = MagicMock()
def _copy(): def _copy(exclude_parent=False, exclude_tasks=False):
new_item = MagicMock() new_item = MagicMock()
return new_item return new_item

View file

@ -19,6 +19,8 @@
from __future__ import (absolute_import, division, print_function) from __future__ import (absolute_import, division, print_function)
__metaclass__ = type __metaclass__ = type
import uuid
from ansible.compat.tests import unittest from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch, MagicMock from ansible.compat.tests.mock import patch, MagicMock
@ -27,6 +29,7 @@ from ansible.plugins.strategy import StrategyBase
from ansible.executor.process.worker import WorkerProcess from ansible.executor.process.worker import WorkerProcess
from ansible.executor.task_queue_manager import TaskQueueManager from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.executor.task_result import TaskResult from ansible.executor.task_result import TaskResult
from ansible.playbook.block import Block
from ansible.playbook.handler import Handler from ansible.playbook.handler import Handler
from ansible.inventory.host import Host from ansible.inventory.host import Host
@ -183,11 +186,6 @@ class TestStrategyBase(unittest.TestCase):
mock_play = MagicMock() mock_play = MagicMock()
mock_iterator = MagicMock()
mock_iterator._play = mock_play
mock_iterator.mark_host_failed.return_value = None
mock_iterator.get_next_task_for_host.return_value = (None, None)
mock_host = MagicMock() mock_host = MagicMock()
mock_host.name = 'test01' mock_host.name = 'test01'
mock_host.vars = dict() mock_host.vars = dict()
@ -196,6 +194,8 @@ class TestStrategyBase(unittest.TestCase):
mock_task = MagicMock() mock_task = MagicMock()
mock_task._role = None mock_task._role = None
mock_task.ignore_errors = False mock_task.ignore_errors = False
mock_task._uuid = uuid.uuid4()
mock_task.loop = None
mock_handler_task = MagicMock(Handler) mock_handler_task = MagicMock(Handler)
mock_handler_task.name = 'test handler' mock_handler_task.name = 'test handler'
@ -203,8 +203,16 @@ class TestStrategyBase(unittest.TestCase):
mock_handler_task.get_name.return_value = "test handler" mock_handler_task.get_name.return_value = "test handler"
mock_handler_task.has_triggered.return_value = False mock_handler_task.has_triggered.return_value = False
mock_iterator = MagicMock()
mock_iterator._play = mock_play
mock_iterator.mark_host_failed.return_value = None
mock_iterator.get_next_task_for_host.return_value = (None, None)
mock_iterator.get_original_task.return_value = mock_task
mock_handler_block = MagicMock() mock_handler_block = MagicMock()
mock_handler_block.block = [mock_handler_task] mock_handler_block.block = [mock_handler_task]
mock_handler_block.rescue = []
mock_handler_block.always = []
mock_play.handlers = [mock_handler_block] mock_play.handlers = [mock_handler_block]
mock_tqm._notified_handlers = {mock_handler_task: []} mock_tqm._notified_handlers = {mock_handler_task: []}
@ -245,8 +253,8 @@ class TestStrategyBase(unittest.TestCase):
results = strategy_base._wait_on_pending_results(iterator=mock_iterator) results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 0) self.assertEqual(len(results), 0)
task_result = TaskResult(host=mock_host, task=mock_task, return_data=dict(changed=True)) task_result = TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(changed=True))
queue_items.append(('host_task_ok', task_result)) queue_items.append(task_result)
strategy_base._blocked_hosts['test01'] = True strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1 strategy_base._pending_results = 1
results = strategy_base._wait_on_pending_results(iterator=mock_iterator) results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
@ -255,10 +263,11 @@ class TestStrategyBase(unittest.TestCase):
self.assertEqual(strategy_base._pending_results, 0) self.assertEqual(strategy_base._pending_results, 0)
self.assertNotIn('test01', strategy_base._blocked_hosts) self.assertNotIn('test01', strategy_base._blocked_hosts)
task_result = TaskResult(host=mock_host, task=mock_task, return_data='{"failed":true}') task_result = TaskResult(host=mock_host.name, task=mock_task._uuid, return_data='{"failed":true}')
queue_items.append(('host_task_failed', task_result)) queue_items.append(task_result)
strategy_base._blocked_hosts['test01'] = True strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1 strategy_base._pending_results = 1
mock_iterator.is_failed.return_value = True
results = strategy_base._process_pending_results(iterator=mock_iterator) results = strategy_base._process_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1) self.assertEqual(len(results), 1)
self.assertEqual(results[0], task_result) self.assertEqual(results[0], task_result)
@ -266,9 +275,10 @@ class TestStrategyBase(unittest.TestCase):
self.assertNotIn('test01', strategy_base._blocked_hosts) self.assertNotIn('test01', strategy_base._blocked_hosts)
self.assertIn('test01', mock_tqm._failed_hosts) self.assertIn('test01', mock_tqm._failed_hosts)
del mock_tqm._failed_hosts['test01'] del mock_tqm._failed_hosts['test01']
mock_iterator.is_failed.return_value = False
task_result = TaskResult(host=mock_host, task=mock_task, return_data='{}') task_result = TaskResult(host=mock_host.name, task=mock_task._uuid, return_data='{"unreachable": true}')
queue_items.append(('host_unreachable', task_result)) queue_items.append(task_result)
strategy_base._blocked_hosts['test01'] = True strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1 strategy_base._pending_results = 1
results = strategy_base._wait_on_pending_results(iterator=mock_iterator) results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
@ -279,8 +289,8 @@ class TestStrategyBase(unittest.TestCase):
self.assertIn('test01', mock_tqm._unreachable_hosts) self.assertIn('test01', mock_tqm._unreachable_hosts)
del mock_tqm._unreachable_hosts['test01'] del mock_tqm._unreachable_hosts['test01']
task_result = TaskResult(host=mock_host, task=mock_task, return_data='{}') task_result = TaskResult(host=mock_host.name, task=mock_task._uuid, return_data='{"skipped": true}')
queue_items.append(('host_task_skipped', task_result)) queue_items.append(task_result)
strategy_base._blocked_hosts['test01'] = True strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1 strategy_base._pending_results = 1
results = strategy_base._wait_on_pending_results(iterator=mock_iterator) results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
@ -289,42 +299,44 @@ class TestStrategyBase(unittest.TestCase):
self.assertEqual(strategy_base._pending_results, 0) self.assertEqual(strategy_base._pending_results, 0)
self.assertNotIn('test01', strategy_base._blocked_hosts) self.assertNotIn('test01', strategy_base._blocked_hosts)
queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_host=dict(host_name='newhost01', new_groups=['foo']))))
strategy_base._blocked_hosts['test01'] = True strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1 strategy_base._pending_results = 1
queue_items.append(('add_host', dict(add_host=dict(host_name='newhost01', new_groups=['foo']))))
results = strategy_base._process_pending_results(iterator=mock_iterator) results = strategy_base._process_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 0) self.assertEqual(len(results), 1)
self.assertEqual(strategy_base._pending_results, 1) self.assertEqual(strategy_base._pending_results, 0)
self.assertIn('test01', strategy_base._blocked_hosts) self.assertNotIn('test01', strategy_base._blocked_hosts)
queue_items.append(('add_group', mock_host, dict(add_group=dict(group_name='foo')))) queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_group=dict(group_name='foo'))))
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
results = strategy_base._process_pending_results(iterator=mock_iterator) results = strategy_base._process_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 0) self.assertEqual(len(results), 1)
self.assertEqual(strategy_base._pending_results, 1) self.assertEqual(strategy_base._pending_results, 0)
self.assertIn('test01', strategy_base._blocked_hosts) self.assertNotIn('test01', strategy_base._blocked_hosts)
task_result = TaskResult(host=mock_host, task=mock_task, return_data=dict(changed=True)) queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(changed=True, _ansible_notify=['test handler'])))
queue_items.append(('notify_handler', task_result, 'test handler')) strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
results = strategy_base._process_pending_results(iterator=mock_iterator) results = strategy_base._process_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 0) self.assertEqual(len(results), 1)
self.assertEqual(strategy_base._pending_results, 1) self.assertEqual(strategy_base._pending_results, 0)
self.assertIn('test01', strategy_base._blocked_hosts) self.assertNotIn('test01', strategy_base._blocked_hosts)
self.assertIn(mock_handler_task, strategy_base._notified_handlers) self.assertIn(mock_handler_task, strategy_base._notified_handlers)
self.assertIn(mock_host, strategy_base._notified_handlers[mock_handler_task]) self.assertIn(mock_host, strategy_base._notified_handlers[mock_handler_task])
queue_items.append(('set_host_var', mock_host, mock_task, None, 'foo', 'bar')) #queue_items.append(('set_host_var', mock_host, mock_task, None, 'foo', 'bar'))
results = strategy_base._process_pending_results(iterator=mock_iterator) #results = strategy_base._process_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 0) #self.assertEqual(len(results), 0)
self.assertEqual(strategy_base._pending_results, 1) #self.assertEqual(strategy_base._pending_results, 1)
queue_items.append(('set_host_facts', mock_host, mock_task, None, 'foo', dict())) #queue_items.append(('set_host_facts', mock_host, mock_task, None, 'foo', dict()))
results = strategy_base._process_pending_results(iterator=mock_iterator) #results = strategy_base._process_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 0) #self.assertEqual(len(results), 0)
self.assertEqual(strategy_base._pending_results, 1) #self.assertEqual(strategy_base._pending_results, 1)
queue_items.append(('bad')) #queue_items.append(('bad'))
self.assertRaises(AnsibleError, strategy_base._process_pending_results, iterator=mock_iterator) #self.assertRaises(AnsibleError, strategy_base._process_pending_results, iterator=mock_iterator)
def test_strategy_base_load_included_file(self): def test_strategy_base_load_included_file(self):
fake_loader = DictDataLoader({ fake_loader = DictDataLoader({
@ -377,6 +389,8 @@ class TestStrategyBase(unittest.TestCase):
mock_handler_task.action = 'foo' mock_handler_task.action = 'foo'
mock_handler_task.get_name.return_value = "test handler" mock_handler_task.get_name.return_value = "test handler"
mock_handler_task.has_triggered.return_value = False mock_handler_task.has_triggered.return_value = False
mock_handler_task.listen = None
mock_handler_task._role = None
mock_handler = MagicMock() mock_handler = MagicMock()
mock_handler.block = [mock_handler_task] mock_handler.block = [mock_handler_task]
@ -395,8 +409,9 @@ class TestStrategyBase(unittest.TestCase):
mock_var_mgr = MagicMock() mock_var_mgr = MagicMock()
mock_var_mgr.get_vars.return_value = dict() mock_var_mgr.get_vars.return_value = dict()
mock_iterator = MagicMock mock_iterator = MagicMock()
mock_iterator._play = mock_play mock_iterator._play = mock_play
mock_iterator.get_original_task.return_value = mock_handler_task
fake_loader = DictDataLoader() fake_loader = DictDataLoader()
mock_options = MagicMock() mock_options = MagicMock()
@ -420,7 +435,7 @@ class TestStrategyBase(unittest.TestCase):
strategy_base._notified_handlers = {mock_handler_task: [mock_host]} strategy_base._notified_handlers = {mock_handler_task: [mock_host]}
task_result = TaskResult(Host('host01'), Handler(), dict(changed=False)) task_result = TaskResult(Host('host01'), Handler(), dict(changed=False))
tqm._final_q.put(('host_task_ok', task_result)) tqm._final_q.put(task_result)
result = strategy_base.run_handlers(iterator=mock_iterator, play_context=mock_play_context) result = strategy_base.run_handlers(iterator=mock_iterator, play_context=mock_play_context)
finally: finally: