mirror of
				https://github.com/ansible-collections/community.general.git
				synced 2025-10-25 05:23:58 -07:00 
			
		
		
		
	
		
			
				
	
	
		
			271 lines
		
	
	
	
		
			9.4 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
			
		
		
	
	
			271 lines
		
	
	
	
		
			9.4 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
| #!/usr/bin/env python
 | |
| 
 | |
| # (c) 2012, Michael DeHaan <michael.dehaan@gmail.com>
 | |
| #
 | |
| # This file is part of Ansible
 | |
| #
 | |
| # Ansible is free software: you can redistribute it and/or modify
 | |
| # it under the terms of the GNU General Public License as published by
 | |
| # the Free Software Foundation, either version 3 of the License, or
 | |
| # (at your option) any later version.
 | |
| #
 | |
| # Ansible is distributed in the hope that it will be useful,
 | |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
| # GNU General Public License for more details.
 | |
| #
 | |
| # You should have received a copy of the GNU General Public License
 | |
| # along with Ansible.  If not, see <http://www.gnu.org/licenses/>.
 | |
| #
 | |
| 
 | |
| # this script is for testing modules without running through the
 | |
| # entire guts of ansible, and is very helpful for when developing
 | |
| # modules
 | |
| #
 | |
| # example:
 | |
| #    test-module -m ../library/commands/command -a "/bin/sleep 3"
 | |
| #    test-module -m ../library/system/service -a "name=httpd ensure=restarted"
 | |
| #    test-module -m ../library/system/service -a "name=httpd ensure=restarted" --debugger /usr/bin/pdb
 | |
| #    test-module -m ../library/file/lineinfile -a "dest=/etc/exports line='/srv/home hostname1(rw,sync)'" --check
 | |
| #    test-module -m ../library/commands/command -a "echo hello" -n -o "test_hello"
 | |
| 
 | |
| import optparse
 | |
| import os
 | |
| import subprocess
 | |
| import sys
 | |
| import traceback
 | |
| import shutil
 | |
| 
 | |
| import ansible.utils.vars as utils_vars
 | |
| from ansible.parsing.dataloader import DataLoader
 | |
| from ansible.parsing.utils.jsonify import jsonify
 | |
| from ansible.parsing.splitter import parse_kv
 | |
| import ansible.executor.module_common as module_common
 | |
| import ansible.constants as C
 | |
| from ansible.module_utils._text import to_text
 | |
| 
 | |
| try:
 | |
|     import json
 | |
| except ImportError:
 | |
|     import simplejson as json
 | |
| 
 | |
| 
 | |
| def parse():
 | |
|     """parse command line
 | |
| 
 | |
|     :return : (options, args)"""
 | |
|     parser = optparse.OptionParser()
 | |
| 
 | |
|     parser.usage = "%prog -[options] (-h for help)"
 | |
| 
 | |
|     parser.add_option('-m', '--module-path', dest='module_path',
 | |
|         help="REQUIRED: full path of module source to execute")
 | |
|     parser.add_option('-a', '--args', dest='module_args', default="",
 | |
|         help="module argument string")
 | |
|     parser.add_option('-D', '--debugger', dest='debugger',
 | |
|         help="path to python debugger (e.g. /usr/bin/pdb)")
 | |
|     parser.add_option('-I', '--interpreter', dest='interpreter',
 | |
|         help="path to interpreter to use for this module (e.g. ansible_python_interpreter=/usr/bin/python)",
 | |
|         metavar='INTERPRETER_TYPE=INTERPRETER_PATH')
 | |
|     parser.add_option('-c', '--check', dest='check', action='store_true',
 | |
|         help="run the module in check mode")
 | |
|     parser.add_option('-n', '--noexecute', dest='execute', action='store_false',
 | |
|         default=True, help="do not run the resulting module")
 | |
|     parser.add_option('-o', '--output', dest='filename',
 | |
|         help="Filename for resulting module",
 | |
|         default="~/.ansible_module_generated")
 | |
|     options, args = parser.parse_args()
 | |
|     if not options.module_path:
 | |
|         parser.print_help()
 | |
|         sys.exit(1)
 | |
|     else:
 | |
|         return options, args
 | |
| 
 | |
| 
 | |
| def write_argsfile(argstring, json=False):
 | |
|     """ Write args to a file for old-style module's use. """
 | |
|     argspath = os.path.expanduser("~/.ansible_test_module_arguments")
 | |
|     argsfile = open(argspath, 'w')
 | |
|     if json:
 | |
|         args = parse_kv(argstring)
 | |
|         argstring = jsonify(args)
 | |
|     argsfile.write(argstring)
 | |
|     argsfile.close()
 | |
|     return argspath
 | |
| 
 | |
| 
 | |
| def get_interpreters(interpreter):
 | |
|     result = dict()
 | |
|     if interpreter:
 | |
|         if '=' not in interpreter:
 | |
|             print("interpreter must by in the form of ansible_python_interpreter=/usr/bin/python")
 | |
|             sys.exit(1)
 | |
|         interpreter_type, interpreter_path = interpreter.split('=')
 | |
|         if not interpreter_type.startswith('ansible_'):
 | |
|             interpreter_type = 'ansible_%s' % interpreter_type
 | |
|         if not interpreter_type.endswith('_interpreter'):
 | |
|             interpreter_type = '%s_interpreter' % interpreter_type
 | |
|         result[interpreter_type] = interpreter_path
 | |
|     return result
 | |
| 
 | |
| 
 | |
| def boilerplate_module(modfile, args, interpreters, check, destfile):
 | |
|     """ simulate what ansible does with new style modules """
 | |
| 
 | |
|     # module_fh = open(modfile)
 | |
|     # module_data = module_fh.read()
 | |
|     # module_fh.close()
 | |
| 
 | |
|     # replacer = module_common.ModuleReplacer()
 | |
|     loader = DataLoader()
 | |
| 
 | |
|     # included_boilerplate = module_data.find(module_common.REPLACER) != -1 or module_data.find("import ansible.module_utils") != -1
 | |
| 
 | |
|     complex_args = {}
 | |
| 
 | |
|     # default selinux fs list is pass in as _ansible_selinux_special_fs arg
 | |
|     complex_args['_ansible_selinux_special_fs'] = C.DEFAULT_SELINUX_SPECIAL_FS
 | |
| 
 | |
|     if args.startswith("@"):
 | |
|         # Argument is a YAML file (JSON is a subset of YAML)
 | |
|         complex_args = utils_vars.combine_vars(complex_args, loader.load_from_file(args[1:]))
 | |
|         args=''
 | |
|     elif args.startswith("{"):
 | |
|         # Argument is a YAML document (not a file)
 | |
|         complex_args = utils_vars.combine_vars(complex_args, loader.load(args))
 | |
|         args=''
 | |
| 
 | |
|     if args:
 | |
|         parsed_args = parse_kv(args)
 | |
|         complex_args = utils_vars.combine_vars(complex_args, parsed_args)
 | |
| 
 | |
|     task_vars = interpreters
 | |
| 
 | |
|     if check:
 | |
|         complex_args['_ansible_check_mode'] = True
 | |
| 
 | |
|     modname = os.path.basename(modfile)
 | |
|     modname = os.path.splitext(modname)[0]
 | |
|     (module_data, module_style, shebang) = module_common.modify_module(
 | |
|         modname,
 | |
|         modfile,
 | |
|         complex_args,
 | |
|         task_vars=task_vars
 | |
|     )
 | |
| 
 | |
|     if module_style == 'new' and 'ANSIBALLZ_WRAPPER = True' in module_data:
 | |
|         module_style = 'ansiballz'
 | |
| 
 | |
|     modfile2_path = os.path.expanduser(destfile)
 | |
|     print("* including generated source, if any, saving to: %s" % modfile2_path)
 | |
|     if module_style not in ('ansiballz', 'old'):
 | |
|         print("* this may offset any line numbers in tracebacks/debuggers!")
 | |
|     modfile2 = open(modfile2_path, 'wb')
 | |
|     modfile2.write(module_data)
 | |
|     modfile2.close()
 | |
|     modfile = modfile2_path
 | |
| 
 | |
|     return (modfile2_path, modname, module_style)
 | |
| 
 | |
| 
 | |
| def ansiballz_setup(modfile, modname, interpreters):
 | |
|     os.system("chmod +x %s" % modfile)
 | |
| 
 | |
|     if 'ansible_python_interpreter' in interpreters:
 | |
|         command = [interpreters['ansible_python_interpreter']]
 | |
|     else:
 | |
|         command = []
 | |
|     command.extend([modfile, 'explode'])
 | |
| 
 | |
|     cmd = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 | |
|     out, err = cmd.communicate()
 | |
|     out, err = to_text(out, errors='surrogate_or_strict'), to_text(err)
 | |
|     lines = out.splitlines()
 | |
|     if len(lines) != 2 or 'Module expanded into' not in lines[0]:
 | |
|         print("*" * 35)
 | |
|         print("INVALID OUTPUT FROM ANSIBALLZ MODULE WRAPPER")
 | |
|         print(out)
 | |
|         sys.exit(err)
 | |
|     debug_dir = lines[1].strip()
 | |
| 
 | |
|     argsfile = os.path.join(debug_dir, 'args')
 | |
|     modfile = os.path.join(debug_dir, 'ansible_module_%s.py' % modname)
 | |
| 
 | |
|     print("* ansiballz module detected; extracted module source to: %s" % debug_dir)
 | |
|     return modfile, argsfile
 | |
| 
 | |
| 
 | |
| def runtest(modfile, argspath, modname, module_style, interpreters):
 | |
|     """Test run a module, piping it's output for reporting."""
 | |
|     invoke = ""
 | |
|     if module_style == 'ansiballz':
 | |
|         modfile, argspath = ansiballz_setup(modfile, modname, interpreters)
 | |
|         if 'ansible_python_interpreter' in interpreters:
 | |
|             invoke = "%s " % interpreters['ansible_python_interpreter']
 | |
| 
 | |
|     os.system("chmod +x %s" % modfile)
 | |
| 
 | |
|     invoke = "%s%s" % (invoke, modfile)
 | |
|     if argspath is not None:
 | |
|         invoke = "%s %s" % (invoke, argspath)
 | |
| 
 | |
|     cmd = subprocess.Popen(invoke, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 | |
|     (out, err) = cmd.communicate()
 | |
|     out, err = to_text(out), to_text(err)
 | |
| 
 | |
|     try:
 | |
|         print("*" * 35)
 | |
|         print("RAW OUTPUT")
 | |
|         print(out)
 | |
|         print(err)
 | |
|         results = json.loads(out)
 | |
|     except:
 | |
|         print("*" * 35)
 | |
|         print("INVALID OUTPUT FORMAT")
 | |
|         print(out)
 | |
|         traceback.print_exc()
 | |
|         sys.exit(1)
 | |
| 
 | |
|     print("*" * 35)
 | |
|     print("PARSED OUTPUT")
 | |
|     print(jsonify(results,format=True))
 | |
| 
 | |
| 
 | |
| def rundebug(debugger, modfile, argspath, modname, module_style, interpreters):
 | |
|     """Run interactively with console debugger."""
 | |
| 
 | |
|     if module_style == 'ansiballz':
 | |
|         modfile, argspath = ansiballz_setup(modfile, modname, interpreters)
 | |
| 
 | |
|     if argspath is not None:
 | |
|         subprocess.call("%s %s %s" % (debugger, modfile, argspath), shell=True)
 | |
|     else:
 | |
|         subprocess.call("%s %s" % (debugger, modfile), shell=True)
 | |
| 
 | |
| 
 | |
| def main():
 | |
| 
 | |
|     options, args = parse()
 | |
|     interpreters = get_interpreters(options.interpreter)
 | |
|     (modfile, modname, module_style) = boilerplate_module(options.module_path, options.module_args, interpreters, options.check, options.filename)
 | |
| 
 | |
|     argspath = None
 | |
|     if module_style not in ('new', 'ansiballz'):
 | |
|         if module_style in ('non_native_want_json', 'binary'):
 | |
|             argspath = write_argsfile(options.module_args, json=True)
 | |
|         elif module_style == 'old':
 | |
|             argspath = write_argsfile(options.module_args, json=False)
 | |
|         else:
 | |
|             raise Exception("internal error, unexpected module style: %s" % module_style)
 | |
| 
 | |
|     if options.execute:
 | |
|         if options.debugger:
 | |
|             rundebug(options.debugger, modfile, argspath, modname, module_style, interpreters)
 | |
|         else:
 | |
|             runtest(modfile, argspath, modname, module_style, interpreters)
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     try:
 | |
|         main()
 | |
|     finally:
 | |
|         shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)
 |