From e04dab904a7308f0fd65d8689254aa2867b9c9c3 Mon Sep 17 00:00:00 2001 From: Michael DeHaan Date: Fri, 26 Oct 2012 18:11:38 -0400 Subject: [PATCH] Use previous proven multiprocessing logic as the simplification didn't have the same Ctrl-C handling and may be subject to race issues, though still don't pass Runner to each. Still seems performant. --- lib/ansible/runner/__init__.py | 47 ++++++++++++++++++++++++++-------- test/TestRunner.py | 1 - 2 files changed, 37 insertions(+), 11 deletions(-) diff --git a/lib/ansible/runner/__init__.py b/lib/ansible/runner/__init__.py index 03856c4dbe..c65b07937b 100644 --- a/lib/ansible/runner/__init__.py +++ b/lib/ansible/runner/__init__.py @@ -58,13 +58,22 @@ multiprocessing_runner = None ################################################ -def _executor_hook(host): +def _executor_hook(job_queue, result_queue): + # attempt workaround of https://github.com/newsapps/beeswithmachineguns/issues/17 # this function also not present in CentOS 6 if HAS_ATFORK: atfork() + signal.signal(signal.SIGINT, signal.SIG_IGN) - return multiprocessing_runner._executor(host) + while not job_queue.empty(): + try: + host = job_queue.get(block=False) + result_queue.put(multiprocessing_runner._executor(host)) + except Queue.Empty: + pass + except: + traceback.print_exc() class HostVars(dict): ''' A special view of setup_cache that adds values from the inventory when needed. ''' @@ -560,20 +569,38 @@ class Runner(object): # ***************************************************** + def _parallel_exec(self, hosts): ''' handles mulitprocessing when more than 1 fork is required ''' - # experiment for 0.9, we may revert this if it causes - # problems -- used to cause problems when Runner was a passed - # argument but may not be anymore. + manager = multiprocessing.Manager() + job_queue = manager.Queue() + for host in hosts: + job_queue.put(host) + result_queue = manager.Queue() + + workers = [] + for i in range(self.forks): + prc = multiprocessing.Process(target=_executor_hook, + args=(job_queue, result_queue)) + prc.start() + workers.append(prc) - p = multiprocessing.Pool(self.forks) try: - result = p.map(_executor_hook, hosts) + for worker in workers: + worker.join() except KeyboardInterrupt: - p.terminate() - raise errors.AnsibleError("Interrupted") - return result + for worker in workers: + worker.terminate() + worker.join() + + results = [] + try: + while not result_queue.empty(): + results.append(result_queue.get(block=False)) + except socket.error: + raise errors.AnsibleError("") + return results # ***************************************************** diff --git a/test/TestRunner.py b/test/TestRunner.py index a91894c0b2..895b2164c4 100644 --- a/test/TestRunner.py +++ b/test/TestRunner.py @@ -132,7 +132,6 @@ class TestRunner(unittest.TestCase): result = self._run('command', [ "/usr/bin/this_does_not_exist", "splat" ]) assert 'msg' in result assert 'failed' in result - assert 'rc' not in result result = self._run('shell', [ "/bin/echo", "$HOME" ]) assert 'failed' not in result