code cleanup and reorg, renamed vars and functions to match their actual purpose, re-enabled logging of steps

This commit is contained in:
Brian Coca 2015-10-31 21:35:48 -04:00 committed by Matt Clay
parent 3906fd426b
commit c8a7c25468

View file

@ -27,15 +27,20 @@ import shlex
import os import os
import subprocess import subprocess
import sys import sys
import datetime
import traceback import traceback
import signal import signal
import time import time
import syslog import syslog
syslog.openlog('ansible-%s' % os.path.basename(__file__))
syslog.syslog(syslog.LOG_NOTICE, 'Invoked with %s' % " ".join(sys.argv[1:]))
def notice(msg):
    """Send *msg* to syslog at NOTICE priority."""
    priority = syslog.LOG_NOTICE
    syslog.syslog(priority, msg)
def daemonize_self(): def daemonize_self():
# daemonizing code: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012 # daemonizing code: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012
# logger.info("cobblerd started")
try: try:
pid = os.fork() pid = os.fork()
if pid > 0: if pid > 0:
@ -65,50 +70,21 @@ def daemonize_self():
os.dup2(dev_null.fileno(), sys.stdout.fileno()) os.dup2(dev_null.fileno(), sys.stdout.fileno())
os.dup2(dev_null.fileno(), sys.stderr.fileno()) os.dup2(dev_null.fileno(), sys.stderr.fileno())
if len(sys.argv) < 3:
print json.dumps({
"failed" : True,
"msg" : "usage: async_wrapper <jid> <time_limit> <modulescript> <argsfile>. Humans, do not call directly!"
})
sys.exit(1)
jid = "%s.%d" % (sys.argv[1], os.getpid()) def _run_module(wrapped_cmd, jid, job_path):
time_limit = sys.argv[2]
wrapped_module = sys.argv[3]
argsfile = sys.argv[4]
cmd = "%s %s" % (wrapped_module, argsfile)
syslog.openlog('ansible-%s' % os.path.basename(__file__)) jobfile = open(job_path, "w")
syslog.syslog(syslog.LOG_NOTICE, 'Invoked with %s' % " ".join(sys.argv[1:])) jobfile.write(json.dumps({ "started" : 1, "ansible_job_id" : jid }))
jobfile.close()
# setup logging directory jobfile = open(job_path, "w")
logdir = os.path.expanduser("~/.ansible_async")
log_path = os.path.join(logdir, jid)
if not os.path.exists(logdir):
try:
os.makedirs(logdir)
except:
print json.dumps({
"failed" : 1,
"msg" : "could not create: %s" % logdir
})
def _run_command(wrapped_cmd, jid, log_path):
logfile = open(log_path, "w")
logfile.write(json.dumps({ "started" : 1, "ansible_job_id" : jid }))
logfile.close()
logfile = open(log_path, "w")
result = {} result = {}
outdata = '' outdata = ''
try: try:
cmd = shlex.split(wrapped_cmd) cmd = shlex.split(wrapped_cmd)
script = subprocess.Popen(cmd, shell=False, script = subprocess.Popen(cmd, shell=False, stdin=None, stdout=jobfile, stderr=jobfile)
stdin=None, stdout=logfile, stderr=logfile)
script.communicate() script.communicate()
outdata = file(log_path).read() outdata = file(job_path).read()
result = json.loads(outdata) result = json.loads(outdata)
except (OSError, IOError), e: except (OSError, IOError), e:
@ -118,32 +94,54 @@ def _run_command(wrapped_cmd, jid, log_path):
"msg": str(e), "msg": str(e),
} }
result['ansible_job_id'] = jid result['ansible_job_id'] = jid
logfile.write(json.dumps(result)) jobfile.write(json.dumps(result))
except: except:
result = { result = {
"failed" : 1, "failed" : 1,
"cmd" : wrapped_cmd, "cmd" : wrapped_cmd,
"data" : outdata, # temporary debug only "data" : outdata, # temporary notice only
"msg" : traceback.format_exc() "msg" : traceback.format_exc()
} }
result['ansible_job_id'] = jid result['ansible_job_id'] = jid
logfile.write(json.dumps(result)) jobfile.write(json.dumps(result))
logfile.close() jobfile.close()
# immediately exit this process, leaving an orphaned process
# running which immediately forks a supervisory timing process
#import logging ####################
#import logging.handlers ## main ##
####################
if __name__ == '__main__':
#logger = logging.getLogger("ansible_async") if len(sys.argv) < 3:
#logger.setLevel(logging.WARNING) print json.dumps({
#logger.addHandler( logging.handlers.SysLogHandler("/dev/log") ) "failed" : True,
def debug(msg): "msg" : "usage: async_wrapper <jid> <time_limit> <modulescript> <argsfile>. Humans, do not call directly!"
#logger.warning(msg) })
pass sys.exit(1)
try: jid = "%s.%d" % (sys.argv[1], os.getpid())
time_limit = sys.argv[2]
wrapped_module = sys.argv[3]
argsfile = sys.argv[4]
cmd = "%s %s" % (wrapped_module, argsfile)
step = 5
# setup job output directory
jobdir = os.path.expanduser("~/.ansible_async")
job_path = os.path.join(jobdir, jid)
if not os.path.exists(jobdir):
try:
os.makedirs(jobdir)
except:
print json.dumps({
"failed" : 1,
"msg" : "could not create: %s" % jobdir
})
# immediately exit this process, leaving an orphaned process
# running which immediately forks a supervisory timing process
try:
pid = os.fork() pid = os.fork()
if pid: if pid:
# Notify the overlord that the async process started # Notify the overlord that the async process started
@ -152,10 +150,10 @@ try:
# to initialize PRIOR to ansible trying to clean up the launch directory (and argsfile) # to initialize PRIOR to ansible trying to clean up the launch directory (and argsfile)
# this probably could be done with some IPC later. Modules should always read # this probably could be done with some IPC later. Modules should always read
# the argsfile at the very first start of their execution anyway # the argsfile at the very first start of their execution anyway
time.sleep(1) notice("Return async_wrapper task started.")
debug("Return async_wrapper task started.") print json.dumps({ "started" : 1, "ansible_job_id" : jid, "results_file" : job_path })
print json.dumps({ "started" : 1, "ansible_job_id" : jid, "results_file" : log_path })
sys.stdout.flush() sys.stdout.flush()
time.sleep(1)
sys.exit(0) sys.exit(0)
else: else:
# The actual wrapper process # The actual wrapper process
@ -164,7 +162,7 @@ try:
daemonize_self() daemonize_self()
# we are now daemonized, create a supervisory process # we are now daemonized, create a supervisory process
debug("Starting module and watcher") notice("Starting module and watcher")
sub_pid = os.fork() sub_pid = os.fork()
if sub_pid: if sub_pid:
@ -174,27 +172,31 @@ try:
# set the child process group id to kill all children # set the child process group id to kill all children
os.setpgid(sub_pid, sub_pid) os.setpgid(sub_pid, sub_pid)
debug("Start watching %s (%s)"%(sub_pid, remaining)) notice("Start watching %s (%s)"%(sub_pid, remaining))
time.sleep(5) time.sleep(step)
while os.waitpid(sub_pid, os.WNOHANG) == (0, 0): while os.waitpid(sub_pid, os.WNOHANG) == (0, 0):
debug("%s still running (%s)"%(sub_pid, remaining)) notice("%s still running (%s)"%(sub_pid, remaining))
time.sleep(5) time.sleep(step)
remaining = remaining - 5 remaining = remaining - step
if remaining <= 0: if remaining <= 0:
debug("Now killing %s"%(sub_pid)) notice("Now killing %s"%(sub_pid))
os.killpg(sub_pid, signal.SIGKILL) os.killpg(sub_pid, signal.SIGKILL)
debug("Sent kill to group %s"%sub_pid) notice("Sent kill to group %s"%sub_pid)
time.sleep(1) time.sleep(1)
sys.exit(0) sys.exit(0)
debug("Done in kid B.") notice("Done in kid B.")
os._exit(0) sys.exit(0)
else: else:
# the child process runs the actual module # the child process runs the actual module
debug("Start module (%s)"%os.getpid()) notice("Start module (%s)"%os.getpid())
_run_command(cmd, jid, log_path) _run_module(cmd, jid, job_path)
debug("Module complete (%s)"%os.getpid()) notice("Module complete (%s)"%os.getpid())
sys.exit(0) sys.exit(0)
except Exception, err: except Exception, err:
debug("error: %s"%(err)) notice("error: %s"%(err))
raise err print json.dumps({
"failed" : True,
"msg" : "FATAL ERROR: %s" % str(err)
})
sys.exit(1)