diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index ae90a29e93..b027d487ea 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -63,6 +63,7 @@ class TaskQueueManager: self._callbacks_loaded = False self._callback_plugins = [] self._start_at_done = False + self._result_prc = None # make sure the module path (if specified) is parsed and # added to the module_loader object @@ -92,17 +93,14 @@ class TaskQueueManager: # plugins for inter-process locking. self._connection_lockfile = tempfile.TemporaryFile() - #FIXME: should this move to 'run' and get serial and play pattern applied as limiter? - # Treat "forks" config parameter as max value. Only create number of workers - # equal to number of hosts in inventory if less than max value. - num_workers = min(self._options.forks, len(self._inventory.list_hosts())) - self._workers = [] - for i in range(num_workers): + + def _initialize_workers(self, num): + for i in range(num): main_q = multiprocessing.Queue() rslt_q = multiprocessing.Queue() - prc = WorkerProcess(self, main_q, rslt_q, loader) + prc = WorkerProcess(self, main_q, rslt_q, self._loader) prc.start() self._workers.append((prc, main_q, rslt_q)) @@ -178,6 +176,12 @@ class TaskQueueManager: are done with the current task). ''' + # Treat "forks" config parameter as max value. Only create number of workers + # equal to number of hosts in inventory if less than max value. + contenders = [self._options.forks, play.serial, len(self._inventory.get_hosts(play.hosts))] + contenders = [ v for v in contenders if v is not None and v > 0 ] + self._initialize_workers(min( contenders )) + if not self._callbacks_loaded: self.load_callbacks() @@ -218,21 +222,25 @@ class TaskQueueManager: if getattr(self._options, 'start_at_task', None) is not None and play_context.start_at_task is None: self._start_at_done = True - # and run the play using the strategy - return strategy.run(iterator, play_context) + # and run the play using the strategy and cleanup on way out + play_return = strategy.run(iterator, play_context) + self._cleanup_workers() + return play_return def cleanup(self): self._display.debug("RUNNING CLEANUP") - self.terminate() - self._final_q.close() - self._result_prc.terminate() + self._cleanup_workers() - for (worker_prc, main_q, rslt_q) in self._workers: - rslt_q.close() - main_q.close() - worker_prc.terminate() + def _cleanup_workers(self): + if self._result_prc: + self._result_prc.terminate() + + for (worker_prc, main_q, rslt_q) in self._workers: + rslt_q.close() + main_q.close() + worker_prc.terminate() def clear_failed_hosts(self): self._failed_hosts = dict()