mirror of
https://github.com/ansible-collections/community.general.git
synced 2025-04-26 12:21:26 -07:00
Merge branch 'samples-to-test' of https://github.com/dagwieers/ansible into dagwieers-samples-to-test
This commit is contained in:
commit
67c32ac30a
69 changed files with 0 additions and 0 deletions
177
test/samples/multi_queues.py
Normal file
177
test/samples/multi_queues.py
Normal file
|
@ -0,0 +1,177 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
import sys
|
||||
import time
|
||||
import Queue
|
||||
import traceback
|
||||
import multiprocessing
|
||||
|
||||
from ansible.inventory import Inventory
|
||||
from ansible.inventory.host import Host
|
||||
from ansible.playbook.play import Play
|
||||
from ansible.playbook.play_context import PlayContext
|
||||
from ansible.playbook.task import Task
|
||||
from ansible.executor.task_executor import TaskExecutor
|
||||
from ansible.executor.task_result import TaskResult
|
||||
from ansible.parsing.dataloader import DataLoader
|
||||
from ansible.vars import VariableManager
|
||||
|
||||
from ansible.utils.display import Display
|
||||
display = Display()
|
||||
debug = display.debug
|
||||
|
||||
NUM_WORKERS = 20
|
||||
NUM_HOSTS = 1778
|
||||
NUM_TASKS = 1
|
||||
|
||||
def results(final_q, workers):
|
||||
cur_worker = 0
|
||||
def _read_worker_result(cur_worker):
|
||||
result = None
|
||||
starting_point = cur_worker
|
||||
while True:
|
||||
(worker_prc, main_q, res_q) = workers[cur_worker]
|
||||
cur_worker += 1
|
||||
if cur_worker >= len(workers):
|
||||
cur_worker = 0
|
||||
|
||||
try:
|
||||
if not res_q.empty():
|
||||
debug("worker %d has data to read" % cur_worker)
|
||||
result = res_q.get()
|
||||
debug("got a result from worker %d: %s" % (cur_worker, result))
|
||||
break
|
||||
except:
|
||||
pass
|
||||
|
||||
if cur_worker == starting_point:
|
||||
break
|
||||
|
||||
return (result, cur_worker)
|
||||
|
||||
while True:
|
||||
result = None
|
||||
try:
|
||||
(result, cur_worker) = _read_worker_result(cur_worker)
|
||||
if result is None:
|
||||
time.sleep(0.01)
|
||||
continue
|
||||
final_q.put(result, block=False)
|
||||
except (IOError, EOFError, KeyboardInterrupt) as e:
|
||||
debug("got a breaking error: %s" % e)
|
||||
break
|
||||
except Exception as e:
|
||||
debug("EXCEPTION DURING RESULTS PROCESSING: %s" % e)
|
||||
traceback.print_exc()
|
||||
break
|
||||
|
||||
def worker(main_q, res_q, loader):
|
||||
while True:
|
||||
task = None
|
||||
try:
|
||||
if not main_q.empty():
|
||||
(host, task, task_vars, conn_info) = main_q.get(block=False)
|
||||
executor_result = TaskExecutor(host, task, task_vars, conn_info, loader).run()
|
||||
debug("executor result: %s" % executor_result)
|
||||
task_result = TaskResult(host, task, executor_result)
|
||||
res_q.put(task_result)
|
||||
else:
|
||||
time.sleep(0.01)
|
||||
except Queue.Empty:
|
||||
pass
|
||||
except (IOError, EOFError, KeyboardInterrupt) as e:
|
||||
debug("got a breaking error: %s" % e)
|
||||
break
|
||||
except Exception as e:
|
||||
debug("EXCEPTION DURING WORKER PROCESSING: %s" % e)
|
||||
traceback.print_exc()
|
||||
break
|
||||
|
||||
loader = DataLoader()
|
||||
|
||||
workers = []
|
||||
for i in range(NUM_WORKERS):
|
||||
main_q = multiprocessing.Queue()
|
||||
res_q = multiprocessing.Queue()
|
||||
worker_p = multiprocessing.Process(target=worker, args=(main_q, res_q, loader))
|
||||
worker_p.start()
|
||||
workers.append((worker_p, main_q, res_q))
|
||||
|
||||
res_q = multiprocessing.Queue()
|
||||
res_p = multiprocessing.Process(target=results, args=(res_q, workers))
|
||||
res_p.start()
|
||||
|
||||
def send_data(obj):
|
||||
global cur_worker
|
||||
global workers
|
||||
global pending_results
|
||||
|
||||
(w_proc, main_q, wrkr_q) = workers[cur_worker]
|
||||
cur_worker += 1
|
||||
if cur_worker >= len(workers):
|
||||
cur_worker = 0
|
||||
|
||||
pending_results += 1
|
||||
main_q.put(obj, block=False)
|
||||
|
||||
def _process_pending_results():
|
||||
global res_q
|
||||
global pending_results
|
||||
|
||||
while not res_q.empty():
|
||||
try:
|
||||
result = res_q.get(block=False)
|
||||
debug("got final result: %s" % (result,))
|
||||
pending_results -= 1
|
||||
except Queue.Empty:
|
||||
pass
|
||||
|
||||
def _wait_on_pending_results():
|
||||
global pending_results
|
||||
while pending_results > 0:
|
||||
debug("waiting for pending results (%d left)" % pending_results)
|
||||
_process_pending_results()
|
||||
time.sleep(0.01)
|
||||
|
||||
|
||||
debug("starting")
|
||||
cur_worker = 0
|
||||
pending_results = 0
|
||||
|
||||
|
||||
var_manager = VariableManager()
|
||||
|
||||
debug("loading inventory")
|
||||
inventory = Inventory(host_list='/tmp/med_inventory', loader=loader, variable_manager=var_manager)
|
||||
hosts = inventory.get_hosts()[:]
|
||||
debug("done loading inventory")
|
||||
|
||||
play_context = PlayContext()
|
||||
play_context.connection = 'local'
|
||||
|
||||
for i in range(NUM_TASKS):
|
||||
#for j in range(NUM_HOSTS):
|
||||
for h in hosts:
|
||||
debug("queuing %s %d" % (h, i))
|
||||
#h = Host(name="host%06d" % j)
|
||||
t = Task().load(dict(name="task %d" % (i,), debug="msg='hello from %s, %d'" % (h,i)))
|
||||
#t = Task().load(dict(name="task %d" % (i,), ping=""))
|
||||
#task_vars = var_manager.get_vars(loader=loader, host=h, task=t)
|
||||
task_vars = dict()
|
||||
new_t = t.copy()
|
||||
new_t.post_validate(task_vars)
|
||||
send_data((h, t, task_vars, play_context))
|
||||
debug("done queuing %s %d" % (h, i))
|
||||
_process_pending_results()
|
||||
debug("waiting for the results to drain...")
|
||||
_wait_on_pending_results()
|
||||
|
||||
res_q.close()
|
||||
res_p.terminate()
|
||||
|
||||
for (w_p, main_q, wrkr_q) in workers:
|
||||
main_q.close()
|
||||
wrkr_q.close()
|
||||
w_p.terminate()
|
||||
|
||||
debug("done")
|
Loading…
Add table
Add a link
Reference in a new issue