mirror of
				https://github.com/ansible-collections/community.general.git
				synced 2025-10-24 21:14:00 -07:00 
			
		
		
		
	
		
			
				
	
	
		
			177 lines
		
	
	
	
		
			4.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			177 lines
		
	
	
	
		
			4.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python
 | |
| 
 | |
| import sys
 | |
| import time
 | |
| import Queue
 | |
| import traceback
 | |
| import multiprocessing
 | |
| 
 | |
| from ansible.inventory import Inventory
 | |
| from ansible.inventory.host import Host
 | |
| from ansible.playbook.play import Play
 | |
| from ansible.playbook.play_context import PlayContext
 | |
| from ansible.playbook.task import Task
 | |
| from ansible.executor.task_executor import TaskExecutor
 | |
| from ansible.executor.task_result import TaskResult
 | |
| from ansible.parsing.dataloader import DataLoader
 | |
| from ansible.vars import VariableManager
 | |
| 
 | |
| from ansible.utils.display import Display
 | |
| display = Display()
 | |
| debug = display.debug
 | |
| 
 | |
| NUM_WORKERS = 20
 | |
| NUM_HOSTS   = 1778
 | |
| NUM_TASKS   = 1
 | |
| 
 | |
| def results(final_q, workers):
 | |
|    cur_worker = 0
 | |
|    def _read_worker_result(cur_worker):
 | |
|       result = None
 | |
|       starting_point = cur_worker
 | |
|       while True:
 | |
|          (worker_prc, main_q, res_q) = workers[cur_worker]
 | |
|          cur_worker += 1
 | |
|          if cur_worker >= len(workers):
 | |
|             cur_worker = 0
 | |
| 
 | |
|          try:
 | |
|             if not res_q.empty():
 | |
|                debug("worker %d has data to read" % cur_worker)
 | |
|                result = res_q.get()
 | |
|                debug("got a result from worker %d: %s" % (cur_worker, result))
 | |
|                break
 | |
|          except:
 | |
|             pass
 | |
| 
 | |
|          if cur_worker == starting_point:
 | |
|             break
 | |
| 
 | |
|       return (result, cur_worker)
 | |
| 
 | |
|    while True:
 | |
|       result = None
 | |
|       try:
 | |
|          (result, cur_worker) = _read_worker_result(cur_worker)
 | |
|          if result is None:
 | |
|             time.sleep(0.01)
 | |
|             continue
 | |
|          final_q.put(result, block=False)
 | |
|       except (IOError, EOFError, KeyboardInterrupt) as e:
 | |
|          debug("got a breaking error: %s" % e)
 | |
|          break
 | |
|       except Exception as e:
 | |
|          debug("EXCEPTION DURING RESULTS PROCESSING: %s" % e)
 | |
|          traceback.print_exc()
 | |
|          break
 | |
| 
 | |
| def worker(main_q, res_q, loader):
 | |
|    while True:
 | |
|       task = None
 | |
|       try:
 | |
|          if not main_q.empty():
 | |
|             (host, task, task_vars, conn_info) = main_q.get(block=False)
 | |
|             executor_result = TaskExecutor(host, task, task_vars, conn_info, loader).run()
 | |
|             debug("executor result: %s" % executor_result)
 | |
|             task_result = TaskResult(host, task, executor_result)
 | |
|             res_q.put(task_result)
 | |
|          else:
 | |
|             time.sleep(0.01)
 | |
|       except Queue.Empty:
 | |
|          pass
 | |
|       except (IOError, EOFError, KeyboardInterrupt) as e:
 | |
|          debug("got a breaking error: %s" % e)
 | |
|          break
 | |
|       except Exception as e:
 | |
|          debug("EXCEPTION DURING WORKER PROCESSING: %s" % e)
 | |
|          traceback.print_exc()
 | |
|          break
 | |
| 
 | |
| loader = DataLoader()
 | |
| 
 | |
| workers = []
 | |
| for i in range(NUM_WORKERS):
 | |
|    main_q = multiprocessing.Queue()
 | |
|    res_q  = multiprocessing.Queue()
 | |
|    worker_p = multiprocessing.Process(target=worker, args=(main_q, res_q, loader))
 | |
|    worker_p.start()
 | |
|    workers.append((worker_p, main_q, res_q))
 | |
| 
 | |
| res_q = multiprocessing.Queue()
 | |
| res_p = multiprocessing.Process(target=results, args=(res_q, workers))
 | |
| res_p.start()
 | |
| 
 | |
| def send_data(obj):
 | |
|    global cur_worker
 | |
|    global workers
 | |
|    global pending_results
 | |
| 
 | |
|    (w_proc, main_q, wrkr_q) = workers[cur_worker]
 | |
|    cur_worker += 1
 | |
|    if cur_worker >= len(workers):
 | |
|       cur_worker = 0
 | |
| 
 | |
|    pending_results += 1
 | |
|    main_q.put(obj, block=False)
 | |
|  
 | |
| def _process_pending_results():
 | |
|    global res_q
 | |
|    global pending_results
 | |
|    
 | |
|    while not res_q.empty():
 | |
|       try:
 | |
|          result = res_q.get(block=False)
 | |
|          debug("got final result: %s" % (result,))
 | |
|          pending_results -= 1
 | |
|       except Queue.Empty:
 | |
|          pass
 | |
| 
 | |
| def _wait_on_pending_results():
 | |
|    global pending_results
 | |
|    while pending_results > 0:
 | |
|       debug("waiting for pending results (%d left)" % pending_results)
 | |
|       _process_pending_results()
 | |
|       time.sleep(0.01)
 | |
| 
 | |
| 
 | |
| debug("starting")
 | |
| cur_worker      = 0
 | |
| pending_results = 0
 | |
| 
 | |
| 
 | |
| var_manager = VariableManager()
 | |
| 
 | |
| debug("loading inventory")
 | |
| inventory = Inventory(host_list='/tmp/med_inventory', loader=loader, variable_manager=var_manager)
 | |
| hosts = inventory.get_hosts()[:]
 | |
| debug("done loading inventory")
 | |
| 
 | |
| play_context = PlayContext()
 | |
| play_context.connection = 'local'
 | |
| 
 | |
| for i in range(NUM_TASKS):
 | |
|    #for j in range(NUM_HOSTS):
 | |
|    for h in hosts:
 | |
|       debug("queuing %s %d" % (h, i))
 | |
|       #h = Host(name="host%06d" % j)
 | |
|       t = Task().load(dict(name="task %d" % (i,), debug="msg='hello from %s, %d'" % (h,i)))
 | |
|       #t = Task().load(dict(name="task %d" % (i,), ping=""))
 | |
|       #task_vars = var_manager.get_vars(loader=loader, host=h, task=t)
 | |
|       task_vars = dict()
 | |
|       new_t = t.copy()
 | |
|       new_t.post_validate(task_vars)
 | |
|       send_data((h, t, task_vars, play_context))
 | |
|       debug("done queuing %s %d" % (h, i))
 | |
|       _process_pending_results()
 | |
|    debug("waiting for the results to drain...")
 | |
|    _wait_on_pending_results()
 | |
| 
 | |
| res_q.close()
 | |
| res_p.terminate()
 | |
| 
 | |
| for (w_p, main_q, wrkr_q) in workers:
 | |
|    main_q.close()
 | |
|    wrkr_q.close()
 | |
|    w_p.terminate()
 | |
| 
 | |
| debug("done")
 |