mirror of
				https://github.com/ansible-collections/community.general.git
				synced 2025-10-24 21:14:00 -07:00 
			
		
		
		
	
		
			
				
	
	
		
			537 lines
		
	
	
	
		
			19 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
			
		
		
	
	
			537 lines
		
	
	
	
		
			19 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
| #!/usr/bin/env python
 | |
| # (c) 2016-2017, Toshio Kuratomi <tkuratomi@ansible.com>
 | |
| #
 | |
| # This file is part of Ansible
 | |
| #
 | |
| # Ansible is free software: you can redistribute it and/or modify
 | |
| # it under the terms of the GNU General Public License as published by
 | |
| # the Free Software Foundation, either version 3 of the License, or
 | |
| # (at your option) any later version.
 | |
| #
 | |
| # Ansible is distributed in the hope that it will be useful,
 | |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
| # GNU General Public License for more details.
 | |
| #
 | |
| # You should have received a copy of the GNU General Public License
 | |
| # along with Ansible.  If not, see <http://www.gnu.org/licenses/>.
 | |
| 
 | |
| # Make coding more python3-ish
 | |
| from __future__ import (absolute_import, division, print_function)
 | |
| __metaclass__ = type
 | |
| 
 | |
| import ast
 | |
| import csv
 | |
| import os
 | |
| import sys
 | |
| from collections import defaultdict
 | |
| from distutils.version import StrictVersion
 | |
| from pprint import pformat, pprint
 | |
| 
 | |
| from ansible.parsing.metadata import DEFAULT_METADATA, ParseError, extract_metadata
 | |
| from ansible.plugins.loader import module_loader
 | |
| 
 | |
| 
 | |
| # There's a few files that are not new-style modules.  Have to blacklist them
 | |
| NONMODULE_PY_FILES = frozenset(('async_wrapper.py',))
 | |
| NONMODULE_MODULE_NAMES = frozenset(os.path.splitext(p)[0] for p in NONMODULE_PY_FILES)
 | |
| 
 | |
| 
 | |
| class MissingModuleError(Exception):
 | |
|     """Thrown when unable to find a plugin"""
 | |
|     pass
 | |
| 
 | |
| 
 | |
| def usage():
 | |
|     print("""Usage:
 | |
|       metadata-tool.py report [--version X]
 | |
|       metadata-tool.py add [--version X] [--overwrite] CSVFILE
 | |
|       metadata-tool.py add-default [--version X] [--overwrite]
 | |
|       medatada-tool.py upgrade [--version X]""")
 | |
|     sys.exit(1)
 | |
| 
 | |
| 
 | |
| def parse_args(arg_string):
 | |
|     if len(arg_string) < 1:
 | |
|         usage()
 | |
| 
 | |
|     action = arg_string[0]
 | |
| 
 | |
|     version = None
 | |
|     if '--version' in arg_string:
 | |
|         version_location = arg_string.index('--version')
 | |
|         arg_string.pop(version_location)
 | |
|         version = arg_string.pop(version_location)
 | |
| 
 | |
|     overwrite = False
 | |
|     if '--overwrite' in arg_string:
 | |
|         overwrite = True
 | |
|         arg_string.remove('--overwrite')
 | |
| 
 | |
|     csvfile = None
 | |
|     if len(arg_string) == 2:
 | |
|         csvfile = arg_string[1]
 | |
|     elif len(arg_string) > 2:
 | |
|         usage()
 | |
| 
 | |
|     return action, {'version': version, 'overwrite': overwrite, 'csvfile': csvfile}
 | |
| 
 | |
| 
 | |
| def find_documentation(module_data):
 | |
|     """Find the DOCUMENTATION metadata for a module file"""
 | |
|     start_line = -1
 | |
|     mod_ast_tree = ast.parse(module_data)
 | |
|     for child in mod_ast_tree.body:
 | |
|         if isinstance(child, ast.Assign):
 | |
|             for target in child.targets:
 | |
|                 if target.id == 'DOCUMENTATION':
 | |
|                     start_line = child.lineno - 1
 | |
|                     break
 | |
| 
 | |
|     return start_line
 | |
| 
 | |
| 
 | |
| def remove_metadata(module_data, start_line, start_col, end_line, end_col):
 | |
|     """Remove a section of a module file"""
 | |
|     lines = module_data.split('\n')
 | |
|     new_lines = lines[:start_line]
 | |
|     if start_col != 0:
 | |
|         new_lines.append(lines[start_line][:start_col])
 | |
| 
 | |
|     next_line = lines[end_line]
 | |
|     if len(next_line) - 1 != end_col:
 | |
|         new_lines.append(next_line[end_col:])
 | |
| 
 | |
|     if len(lines) > end_line:
 | |
|         new_lines.extend(lines[end_line + 1:])
 | |
|     return '\n'.join(new_lines)
 | |
| 
 | |
| 
 | |
| def insert_metadata(module_data, new_metadata, insertion_line, targets=('ANSIBLE_METADATA',)):
 | |
|     """Insert a new set of metadata at a specified line"""
 | |
|     assignments = ' = '.join(targets)
 | |
|     pretty_metadata = pformat(new_metadata, width=1).split('\n')
 | |
| 
 | |
|     new_lines = []
 | |
|     new_lines.append('{0} = {1}'.format(assignments, pretty_metadata[0]))
 | |
| 
 | |
|     if len(pretty_metadata) > 1:
 | |
|         for line in pretty_metadata[1:]:
 | |
|             new_lines.append('{0}{1}'.format(' ' * (len(assignments) - 1 + len(' = {')), line))
 | |
| 
 | |
|     old_lines = module_data.split('\n')
 | |
|     lines = old_lines[:insertion_line] + new_lines + old_lines[insertion_line:]
 | |
|     return '\n'.join(lines)
 | |
| 
 | |
| 
 | |
| def parse_assigned_metadata_initial(csvfile):
 | |
|     """
 | |
|     Fields:
 | |
|         :0: Module name
 | |
|         :1: Core (x if so)
 | |
|         :2: Extras (x if so)
 | |
|         :3: Category
 | |
|         :4: Supported/SLA
 | |
|         :5: Curated
 | |
|         :6: Stable
 | |
|         :7: Deprecated
 | |
|         :8: Notes
 | |
|         :9: Team Notes
 | |
|         :10: Notes 2
 | |
|         :11: final supported_by field
 | |
|     """
 | |
|     with open(csvfile, 'rb') as f:
 | |
|         for record in csv.reader(f):
 | |
|             module = record[0]
 | |
| 
 | |
|             if record[12] == 'core':
 | |
|                 supported_by = 'core'
 | |
|             elif record[12] == 'curated':
 | |
|                 supported_by = 'curated'
 | |
|             elif record[12] == 'community':
 | |
|                 supported_by = 'community'
 | |
|             else:
 | |
|                 print('Module %s has no supported_by field.  Using community' % record[0])
 | |
|                 supported_by = 'community'
 | |
|                 supported_by = DEFAULT_METADATA['supported_by']
 | |
| 
 | |
|             status = []
 | |
|             if record[6]:
 | |
|                 status.append('stableinterface')
 | |
|             if record[7]:
 | |
|                 status.append('deprecated')
 | |
|             if not status:
 | |
|                 status.extend(DEFAULT_METADATA['status'])
 | |
| 
 | |
|             yield (module, {'version': DEFAULT_METADATA['metadata_version'], 'supported_by': supported_by, 'status': status})
 | |
| 
 | |
| 
 | |
| def parse_assigned_metadata(csvfile):
 | |
|     """
 | |
|     Fields:
 | |
|         :0: Module name
 | |
|         :1: supported_by  string.  One of the valid support fields
 | |
|             core, community, certified, network
 | |
|         :2: stableinterface
 | |
|         :3: preview
 | |
|         :4: deprecated
 | |
|         :5: removed
 | |
| 
 | |
|         https://docs.ansible.com/ansible/latest/dev_guide/developing_modules_documenting.html#ansible-metadata-block
 | |
|     """
 | |
|     with open(csvfile, 'rb') as f:
 | |
|         for record in csv.reader(f):
 | |
|             module = record[0]
 | |
|             supported_by = record[1]
 | |
| 
 | |
|             status = []
 | |
|             if record[2]:
 | |
|                 status.append('stableinterface')
 | |
|             if record[4]:
 | |
|                 status.append('deprecated')
 | |
|             if record[5]:
 | |
|                 status.append('removed')
 | |
|             if not status or record[3]:
 | |
|                 status.append('preview')
 | |
| 
 | |
|             yield (module, {'metadata_version': '1.1', 'supported_by': supported_by, 'status': status})
 | |
| 
 | |
| 
 | |
| def write_metadata(filename, new_metadata, version=None, overwrite=False):
 | |
|     with open(filename, 'rb') as f:
 | |
|         module_data = f.read()
 | |
| 
 | |
|     try:
 | |
|         current_metadata, start_line, start_col, end_line, end_col, targets = \
 | |
|             extract_metadata(module_data=module_data, offsets=True)
 | |
|     except SyntaxError:
 | |
|         if filename.endswith('.py'):
 | |
|             raise
 | |
|         # Probably non-python modules.  These should all have python
 | |
|         # documentation files where we can place the data
 | |
|         raise ParseError('Could not add metadata to {0}'.format(filename))
 | |
| 
 | |
|     if current_metadata is None:
 | |
|         # No current metadata so we can just add it
 | |
|         start_line = find_documentation(module_data)
 | |
|         if start_line < 0:
 | |
|             if os.path.basename(filename) in NONMODULE_PY_FILES:
 | |
|                 # These aren't new-style modules
 | |
|                 return
 | |
| 
 | |
|             raise Exception('Module file {0} had no ANSIBLE_METADATA or DOCUMENTATION'.format(filename))
 | |
| 
 | |
|         module_data = insert_metadata(module_data, new_metadata, start_line, targets=('ANSIBLE_METADATA',))
 | |
| 
 | |
|     elif overwrite or (version is not None and ('metadata_version' not in current_metadata or
 | |
|                                                 StrictVersion(current_metadata['metadata_version']) < StrictVersion(version))):
 | |
|         # Current metadata that we do not want.  Remove the current
 | |
|         # metadata and put the new version in its place
 | |
|         module_data = remove_metadata(module_data, start_line, start_col, end_line, end_col)
 | |
|         module_data = insert_metadata(module_data, new_metadata, start_line, targets=targets)
 | |
| 
 | |
|     else:
 | |
|         # Current metadata and we don't want to overwrite it
 | |
|         return
 | |
| 
 | |
|     # Save the new version of the module
 | |
|     with open(filename, 'wb') as f:
 | |
|         f.write(module_data)
 | |
| 
 | |
| 
 | |
| def return_metadata(plugins):
 | |
|     """Get the metadata for all modules
 | |
| 
 | |
|     Handle duplicate module names
 | |
| 
 | |
|     :arg plugins: List of plugins to look for
 | |
|     :returns: Mapping of plugin name to metadata dictionary
 | |
|     """
 | |
|     metadata = {}
 | |
|     for name, filename in plugins:
 | |
|         # There may be several files for a module (if it is written in another
 | |
|         # language, for instance) but only one of them (the .py file) should
 | |
|         # contain the metadata.
 | |
|         if name not in metadata or metadata[name] is not None:
 | |
|             with open(filename, 'rb') as f:
 | |
|                 module_data = f.read()
 | |
|             metadata[name] = extract_metadata(module_data=module_data, offsets=True)[0]
 | |
|     return metadata
 | |
| 
 | |
| 
 | |
| def metadata_summary(plugins, version=None):
 | |
|     """Compile information about the metadata status for a list of modules
 | |
| 
 | |
|     :arg plugins: List of plugins to look for.  Each entry in the list is
 | |
|         a tuple of (module name, full path to module)
 | |
|     :kwarg version: If given, make sure the modules have this version of
 | |
|         metadata or higher.
 | |
|     :returns: A tuple consisting of a list of modules with no metadata at the
 | |
|         required version and a list of files that have metadata at the
 | |
|         required version.
 | |
|     """
 | |
|     no_metadata = {}
 | |
|     has_metadata = {}
 | |
|     supported_by = defaultdict(set)
 | |
|     status = defaultdict(set)
 | |
|     requested_version = StrictVersion(version)
 | |
| 
 | |
|     all_mods_metadata = return_metadata(plugins)
 | |
|     for name, filename in plugins:
 | |
|         # Does the module have metadata?
 | |
|         if name not in no_metadata and name not in has_metadata:
 | |
|             metadata = all_mods_metadata[name]
 | |
|             if metadata is None:
 | |
|                 no_metadata[name] = filename
 | |
|             elif version is not None and ('metadata_version' not in metadata or StrictVersion(metadata['metadata_version']) < requested_version):
 | |
|                 no_metadata[name] = filename
 | |
|             else:
 | |
|                 has_metadata[name] = filename
 | |
| 
 | |
|         # What categories does the plugin belong in?
 | |
|         if all_mods_metadata[name] is None:
 | |
|             # No metadata for this module.  Use the default metadata
 | |
|             supported_by[DEFAULT_METADATA['supported_by']].add(filename)
 | |
|             status[DEFAULT_METADATA['status'][0]].add(filename)
 | |
|         else:
 | |
|             supported_by[all_mods_metadata[name]['supported_by']].add(filename)
 | |
|             for one_status in all_mods_metadata[name]['status']:
 | |
|                 status[one_status].add(filename)
 | |
| 
 | |
|     return list(no_metadata.values()), list(has_metadata.values()), supported_by, status
 | |
| 
 | |
| # Filters to convert between metadata versions
 | |
| 
 | |
| 
 | |
| def convert_metadata_pre_1_0_to_1_0(metadata):
 | |
|     """
 | |
|     Convert pre-1.0 to 1.0 metadata format
 | |
| 
 | |
|     :arg metadata: The old metadata
 | |
|     :returns: The new metadata
 | |
| 
 | |
|     Changes from pre-1.0 to 1.0:
 | |
|     * ``version`` field renamed to ``metadata_version``
 | |
|     * ``supported_by`` field value ``unmaintained`` has been removed (change to
 | |
|       ``community`` and let an external list track whether a module is unmaintained)
 | |
|     * ``supported_by`` field value ``committer`` has been renamed to ``curated``
 | |
|     """
 | |
|     new_metadata = {'metadata_version': '1.0',
 | |
|                     'supported_by': metadata['supported_by'],
 | |
|                     'status': metadata['status']
 | |
|                     }
 | |
|     if new_metadata['supported_by'] == 'unmaintained':
 | |
|         new_metadata['supported_by'] = 'community'
 | |
|     elif new_metadata['supported_by'] == 'committer':
 | |
|         new_metadata['supported_by'] = 'curated'
 | |
| 
 | |
|     return new_metadata
 | |
| 
 | |
| 
 | |
| def convert_metadata_1_0_to_1_1(metadata):
 | |
|     """
 | |
|     Convert 1.0 to 1.1 metadata format
 | |
| 
 | |
|     :arg metadata: The old metadata
 | |
|     :returns: The new metadata
 | |
| 
 | |
|     Changes from 1.0 to 1.1:
 | |
| 
 | |
|     * ``supported_by`` field value ``curated`` has been removed
 | |
|     * ``supported_by`` field value ``certified`` has been added
 | |
|     * ``supported_by`` field value ``network`` has been added
 | |
|     """
 | |
|     new_metadata = {'metadata_version': '1.1',
 | |
|                     'supported_by': metadata['supported_by'],
 | |
|                     'status': metadata['status']
 | |
|                     }
 | |
|     if new_metadata['supported_by'] == 'unmaintained':
 | |
|         new_metadata['supported_by'] = 'community'
 | |
|     elif new_metadata['supported_by'] == 'curated':
 | |
|         new_metadata['supported_by'] = 'certified'
 | |
| 
 | |
|     return new_metadata
 | |
| 
 | |
| # Subcommands
 | |
| 
 | |
| 
 | |
| def add_from_csv(csv_file, version=None, overwrite=False):
 | |
|     """Implement the subcommand to add metadata from a csv file
 | |
|     """
 | |
|     # Add metadata for everything from the CSV file
 | |
|     diagnostic_messages = []
 | |
|     for module_name, new_metadata in parse_assigned_metadata(csv_file):
 | |
|         filename = module_loader.find_plugin(module_name, mod_type='.py')
 | |
|         if filename is None:
 | |
|             diagnostic_messages.append('Unable to find the module file for {0}'.format(module_name))
 | |
|             continue
 | |
| 
 | |
|         try:
 | |
|             write_metadata(filename, new_metadata, version, overwrite)
 | |
|         except ParseError as e:
 | |
|             diagnostic_messages.append(e.args[0])
 | |
|             continue
 | |
| 
 | |
|     if diagnostic_messages:
 | |
|         pprint(diagnostic_messages)
 | |
| 
 | |
|     return 0
 | |
| 
 | |
| 
 | |
| def add_default(version=None, overwrite=False):
 | |
|     """Implement the subcommand to add default metadata to modules
 | |
| 
 | |
|     Add the default metadata to any plugin which lacks it.
 | |
|     :kwarg version: If given, the metadata must be at least this version.
 | |
|         Otherwise, treat the module as not having existing metadata.
 | |
|     :kwarg overwrite: If True, overwrite any existing metadata.  Otherwise,
 | |
|         do not modify files which have metadata at an appropriate version
 | |
|     """
 | |
|     # List of all plugins
 | |
|     plugins = module_loader.all(path_only=True)
 | |
|     plugins = ((os.path.splitext((os.path.basename(p)))[0], p) for p in plugins)
 | |
|     plugins = (p for p in plugins if p[0] not in NONMODULE_MODULE_NAMES)
 | |
| 
 | |
|     # Iterate through each plugin
 | |
|     processed = set()
 | |
|     diagnostic_messages = []
 | |
|     for name, filename in (info for info in plugins if info[0] not in processed):
 | |
|         try:
 | |
|             write_metadata(filename, DEFAULT_METADATA, version, overwrite)
 | |
|         except ParseError as e:
 | |
|             diagnostic_messages.append(e.args[0])
 | |
|             continue
 | |
|         processed.add(name)
 | |
| 
 | |
|     if diagnostic_messages:
 | |
|         pprint(diagnostic_messages)
 | |
| 
 | |
|     return 0
 | |
| 
 | |
| 
 | |
| def upgrade_metadata(version=None):
 | |
|     """Implement the subcommand to upgrade the default metadata in modules.
 | |
| 
 | |
|     :kwarg version: If given, the version of the metadata to upgrade to.  If
 | |
|         not given, upgrade to the latest format version.
 | |
|     """
 | |
|     if version is None:
 | |
|         # Number larger than any of the defined metadata formats.
 | |
|         version = 9999999
 | |
|     requested_version = StrictVersion(version)
 | |
| 
 | |
|     # List all plugins
 | |
|     plugins = module_loader.all(path_only=True)
 | |
|     plugins = ((os.path.splitext((os.path.basename(p)))[0], p) for p in plugins)
 | |
|     plugins = (p for p in plugins if p[0] not in NONMODULE_MODULE_NAMES)
 | |
| 
 | |
|     processed = set()
 | |
|     diagnostic_messages = []
 | |
|     for name, filename in (info for info in plugins if info[0] not in processed):
 | |
|         # For each plugin, read the existing metadata
 | |
|         with open(filename, 'rb') as f:
 | |
|             module_data = f.read()
 | |
|         metadata = extract_metadata(module_data=module_data, offsets=True)[0]
 | |
| 
 | |
|         # If the metadata isn't the requested version, convert it to the new
 | |
|         # version
 | |
|         if 'metadata_version' not in metadata or metadata['metadata_version'] != version:
 | |
|             #
 | |
|             # With each iteration of metadata, add a new conditional to
 | |
|             # upgrade from the previous version
 | |
|             #
 | |
| 
 | |
|             if 'metadata_version' not in metadata:
 | |
|                 # First version, pre-1.0 final metadata
 | |
|                 metadata = convert_metadata_pre_1_0_to_1_0(metadata)
 | |
| 
 | |
|             if metadata['metadata_version'] == '1.0' and StrictVersion('1.0') < requested_version:
 | |
|                 metadata = convert_metadata_1_0_to_1_1(metadata)
 | |
| 
 | |
|             if metadata['metadata_version'] == '1.1' and StrictVersion('1.1') < requested_version:
 | |
|                 # 1.1 version => XXX.  We don't yet have anything beyond 1.1
 | |
|                 # so there's nothing here
 | |
|                 pass
 | |
| 
 | |
|             # Replace the existing metadata with the new format
 | |
|             try:
 | |
|                 write_metadata(filename, metadata, version, overwrite=True)
 | |
|             except ParseError as e:
 | |
|                 diagnostic_messages.append(e.args[0])
 | |
|                 continue
 | |
| 
 | |
|         processed.add(name)
 | |
| 
 | |
|     if diagnostic_messages:
 | |
|         pprint(diagnostic_messages)
 | |
| 
 | |
|     return 0
 | |
| 
 | |
| 
 | |
| def report(version=None):
 | |
|     """Implement the report subcommand
 | |
| 
 | |
|     Print out all the modules that have metadata and all the ones that do not.
 | |
| 
 | |
|     :kwarg version: If given, the metadata must be at least this version.
 | |
|         Otherwise return it as not having metadata
 | |
|     """
 | |
|     # List of all plugins
 | |
|     plugins = module_loader.all(path_only=True)
 | |
|     plugins = ((os.path.splitext((os.path.basename(p)))[0], p) for p in plugins)
 | |
|     plugins = (p for p in plugins if p[0] not in NONMODULE_MODULE_NAMES)
 | |
|     plugins = list(plugins)
 | |
| 
 | |
|     no_metadata, has_metadata, support, status = metadata_summary(plugins, version=version)
 | |
| 
 | |
|     print('== Has metadata ==')
 | |
|     pprint(sorted(has_metadata))
 | |
|     print('')
 | |
| 
 | |
|     print('== Has no metadata ==')
 | |
|     pprint(sorted(no_metadata))
 | |
|     print('')
 | |
| 
 | |
|     print('== Supported by core ==')
 | |
|     pprint(sorted(support['core']))
 | |
|     print('== Supported by value certified ==')
 | |
|     pprint(sorted(support['certified']))
 | |
|     print('== Supported by value network ==')
 | |
|     pprint(sorted(support['network']))
 | |
|     print('== Supported by community ==')
 | |
|     pprint(sorted(support['community']))
 | |
|     print('')
 | |
| 
 | |
|     print('== Status: stableinterface ==')
 | |
|     pprint(sorted(status['stableinterface']))
 | |
|     print('== Status: preview ==')
 | |
|     pprint(sorted(status['preview']))
 | |
|     print('== Status: deprecated ==')
 | |
|     pprint(sorted(status['deprecated']))
 | |
|     print('== Status: removed ==')
 | |
|     pprint(sorted(status['removed']))
 | |
|     print('')
 | |
| 
 | |
|     print('== Summary ==')
 | |
|     print('No Metadata: {0}             Has Metadata: {1}'.format(len(no_metadata), len(has_metadata)))
 | |
|     print('Support level: core: {0}   community: {1}   certified: {2}   network: {3}'.format(len(support['core']),
 | |
|           len(support['community']), len(support['certified']), len(support['network'])))
 | |
|     print('Status StableInterface: {0} Status Preview: {1}            Status Deprecated: {2}      Status Removed: {3}'.format(len(status['stableinterface']),
 | |
|           len(status['preview']), len(status['deprecated']), len(status['removed'])))
 | |
| 
 | |
|     return 0
 | |
| 
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     action, args = parse_args(sys.argv[1:])
 | |
| 
 | |
|     if action == 'report':
 | |
|         rc = report(version=args['version'])
 | |
|     elif action == 'add':
 | |
|         rc = add_from_csv(args['csvfile'], version=args['version'], overwrite=args['overwrite'])
 | |
|     elif action == 'add-default':
 | |
|         rc = add_default(version=args['version'], overwrite=args['overwrite'])
 | |
|     elif action == 'upgrade':
 | |
|         rc = upgrade_metadata(version=args['version'])
 | |
| 
 | |
|     sys.exit(rc)
 |