mirror of
https://github.com/ansible-collections/community.general.git
synced 2025-04-28 21:31:26 -07:00
Initial ansible-test implementation. (#18556)
This commit is contained in:
parent
d95eac16eb
commit
6bbd92e422
191 changed files with 5483 additions and 48 deletions
530
test/runner/lib/target.py
Normal file
530
test/runner/lib/target.py
Normal file
|
@ -0,0 +1,530 @@
|
|||
"""Test target identification, iteration and inclusion/exclusion."""
|
||||
|
||||
from __future__ import absolute_import, print_function
|
||||
|
||||
import os
|
||||
import re
|
||||
import errno
|
||||
import itertools
|
||||
import abc
|
||||
|
||||
from lib.util import ApplicationError
|
||||
|
||||
MODULE_EXTENSIONS = '.py', '.ps1'
|
||||
|
||||
|
||||
def find_target_completion(target_func, prefix):
|
||||
"""
|
||||
:type target_func: () -> collections.Iterable[CompletionTarget]
|
||||
:type prefix: unicode
|
||||
:rtype: list[str]
|
||||
"""
|
||||
try:
|
||||
targets = target_func()
|
||||
prefix = prefix.encode()
|
||||
short = os.environ.get('COMP_TYPE') == '63' # double tab completion from bash
|
||||
matches = walk_completion_targets(targets, prefix, short)
|
||||
return matches
|
||||
except Exception as ex: # pylint: disable=locally-disabled, broad-except
|
||||
return [str(ex)]
|
||||
|
||||
|
||||
def walk_completion_targets(targets, prefix, short=False):
|
||||
"""
|
||||
:type targets: collections.Iterable[CompletionTarget]
|
||||
:type prefix: str
|
||||
:type short: bool
|
||||
:rtype: tuple[str]
|
||||
"""
|
||||
aliases = set(alias for target in targets for alias in target.aliases)
|
||||
|
||||
if prefix.endswith('/') and prefix in aliases:
|
||||
aliases.remove(prefix)
|
||||
|
||||
matches = [alias for alias in aliases if alias.startswith(prefix) and '/' not in alias[len(prefix):-1]]
|
||||
|
||||
if short:
|
||||
offset = len(os.path.dirname(prefix))
|
||||
if offset:
|
||||
offset += 1
|
||||
relative_matches = [match[offset:] for match in matches if len(match) > offset]
|
||||
if len(relative_matches) > 1:
|
||||
matches = relative_matches
|
||||
|
||||
return tuple(sorted(matches))
|
||||
|
||||
|
||||
def walk_internal_targets(targets, includes=None, excludes=None, requires=None):
|
||||
"""
|
||||
:type targets: collections.Iterable[T <= CompletionTarget]
|
||||
:type includes: list[str]
|
||||
:type excludes: list[str]
|
||||
:type requires: list[str]
|
||||
:rtype: tuple[T <= CompletionTarget]
|
||||
"""
|
||||
targets = tuple(targets)
|
||||
|
||||
include_targets = sorted(filter_targets(targets, includes, errors=True, directories=False), key=lambda t: t.name)
|
||||
|
||||
if requires:
|
||||
require_targets = set(filter_targets(targets, requires, errors=True, directories=False))
|
||||
include_targets = [target for target in include_targets if target in require_targets]
|
||||
|
||||
if excludes:
|
||||
list(filter_targets(targets, excludes, errors=True, include=False, directories=False))
|
||||
|
||||
internal_targets = set(filter_targets(include_targets, excludes, errors=False, include=False, directories=False))
|
||||
return tuple(sorted(internal_targets, key=lambda t: t.name))
|
||||
|
||||
|
||||
def walk_external_targets(targets, includes=None, excludes=None, requires=None):
|
||||
"""
|
||||
:type targets: collections.Iterable[CompletionTarget]
|
||||
:type includes: list[str]
|
||||
:type excludes: list[str]
|
||||
:type requires: list[str]
|
||||
:rtype: tuple[CompletionTarget], tuple[CompletionTarget]
|
||||
"""
|
||||
targets = tuple(targets)
|
||||
|
||||
if requires:
|
||||
include_targets = list(filter_targets(targets, includes, errors=True, directories=False))
|
||||
require_targets = set(filter_targets(targets, requires, errors=True, directories=False))
|
||||
includes = [target.name for target in include_targets if target in require_targets]
|
||||
|
||||
if includes:
|
||||
include_targets = sorted(filter_targets(targets, includes, errors=True), key=lambda t: t.name)
|
||||
else:
|
||||
include_targets = []
|
||||
else:
|
||||
include_targets = sorted(filter_targets(targets, includes, errors=True), key=lambda t: t.name)
|
||||
|
||||
if excludes:
|
||||
exclude_targets = sorted(filter_targets(targets, excludes, errors=True), key=lambda t: t.name)
|
||||
else:
|
||||
exclude_targets = []
|
||||
|
||||
previous = None
|
||||
include = []
|
||||
for target in include_targets:
|
||||
if isinstance(previous, DirectoryTarget) and isinstance(target, DirectoryTarget) \
|
||||
and previous.name == target.name:
|
||||
previous.modules = tuple(set(previous.modules) | set(target.modules))
|
||||
else:
|
||||
include.append(target)
|
||||
previous = target
|
||||
|
||||
previous = None
|
||||
exclude = []
|
||||
for target in exclude_targets:
|
||||
if isinstance(previous, DirectoryTarget) and isinstance(target, DirectoryTarget) \
|
||||
and previous.name == target.name:
|
||||
previous.modules = tuple(set(previous.modules) | set(target.modules))
|
||||
else:
|
||||
exclude.append(target)
|
||||
previous = target
|
||||
|
||||
return tuple(include), tuple(exclude)
|
||||
|
||||
|
||||
def filter_targets(targets, patterns, include=True, directories=True, errors=True):
|
||||
"""
|
||||
:type targets: collections.Iterable[CompletionTarget]
|
||||
:type patterns: list[str]
|
||||
:type include: bool
|
||||
:type directories: bool
|
||||
:type errors: bool
|
||||
:rtype: collections.Iterable[CompletionTarget]
|
||||
"""
|
||||
unmatched = set(patterns or ())
|
||||
|
||||
for target in targets:
|
||||
matched_directories = set()
|
||||
match = False
|
||||
|
||||
if patterns:
|
||||
for alias in target.aliases:
|
||||
for pattern in patterns:
|
||||
if re.match('^%s$' % pattern, alias):
|
||||
match = True
|
||||
|
||||
try:
|
||||
unmatched.remove(pattern)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
if alias.endswith('/'):
|
||||
if target.base_path and len(target.base_path) > len(alias):
|
||||
matched_directories.add(target.base_path)
|
||||
else:
|
||||
matched_directories.add(alias)
|
||||
elif include:
|
||||
match = True
|
||||
if not target.base_path:
|
||||
matched_directories.add('.')
|
||||
for alias in target.aliases:
|
||||
if alias.endswith('/'):
|
||||
if target.base_path and len(target.base_path) > len(alias):
|
||||
matched_directories.add(target.base_path)
|
||||
else:
|
||||
matched_directories.add(alias)
|
||||
|
||||
if match != include:
|
||||
continue
|
||||
|
||||
if directories and matched_directories:
|
||||
yield DirectoryTarget(sorted(matched_directories, key=len)[0], target.modules)
|
||||
else:
|
||||
yield target
|
||||
|
||||
if errors:
|
||||
if unmatched:
|
||||
raise TargetPatternsNotMatched(unmatched)
|
||||
|
||||
|
||||
def walk_module_targets():
|
||||
"""
|
||||
:rtype: collections.Iterable[TestTarget]
|
||||
"""
|
||||
path = 'lib/ansible/modules'
|
||||
|
||||
for target in walk_test_targets(path, path + '/', extensions=MODULE_EXTENSIONS):
|
||||
if not target.module:
|
||||
continue
|
||||
|
||||
yield target
|
||||
|
||||
|
||||
def walk_units_targets():
|
||||
"""
|
||||
:rtype: collections.Iterable[TestTarget]
|
||||
"""
|
||||
return walk_test_targets(path='test/units', module_path='test/units/modules/', extensions=('.py',), prefix='test_')
|
||||
|
||||
|
||||
def walk_compile_targets():
|
||||
"""
|
||||
:rtype: collections.Iterable[TestTarget]
|
||||
"""
|
||||
return walk_test_targets(module_path='lib/ansible/modules/', extensions=('.py',))
|
||||
|
||||
|
||||
def walk_sanity_targets():
|
||||
"""
|
||||
:rtype: collections.Iterable[TestTarget]
|
||||
"""
|
||||
return walk_test_targets(module_path='lib/ansible/modules/')
|
||||
|
||||
|
||||
def walk_posix_integration_targets():
|
||||
"""
|
||||
:rtype: collections.Iterable[IntegrationTarget]
|
||||
"""
|
||||
for target in walk_integration_targets():
|
||||
if 'posix/' in target.aliases:
|
||||
yield target
|
||||
|
||||
|
||||
def walk_network_integration_targets():
|
||||
"""
|
||||
:rtype: collections.Iterable[IntegrationTarget]
|
||||
"""
|
||||
for target in walk_integration_targets():
|
||||
if 'network/' in target.aliases:
|
||||
yield target
|
||||
|
||||
|
||||
def walk_windows_integration_targets():
|
||||
"""
|
||||
:rtype: collections.Iterable[IntegrationTarget]
|
||||
"""
|
||||
for target in walk_integration_targets():
|
||||
if 'windows/' in target.aliases:
|
||||
yield target
|
||||
|
||||
|
||||
def walk_integration_targets():
|
||||
"""
|
||||
:rtype: collections.Iterable[IntegrationTarget]
|
||||
"""
|
||||
path = 'test/integration/targets'
|
||||
modules = frozenset(t.module for t in walk_module_targets())
|
||||
paths = sorted(os.path.join(path, p) for p in os.listdir(path))
|
||||
prefixes = load_integration_prefixes()
|
||||
|
||||
for path in paths:
|
||||
yield IntegrationTarget(path, modules, prefixes)
|
||||
|
||||
|
||||
def load_integration_prefixes():
|
||||
"""
|
||||
:rtype: dict[str, str]
|
||||
"""
|
||||
path = 'test/integration'
|
||||
names = sorted(f for f in os.listdir(path) if os.path.splitext(f)[0] == 'target-prefixes')
|
||||
prefixes = {}
|
||||
|
||||
for name in names:
|
||||
prefix = os.path.splitext(name)[1][1:]
|
||||
with open(os.path.join(path, name), 'r') as prefix_fd:
|
||||
prefixes.update(dict((k, prefix) for k in prefix_fd.read().splitlines()))
|
||||
|
||||
return prefixes
|
||||
|
||||
|
||||
def walk_test_targets(path=None, module_path=None, extensions=None, prefix=None):
|
||||
"""
|
||||
:type path: str | None
|
||||
:type module_path: str | None
|
||||
:type extensions: tuple[str] | None
|
||||
:type prefix: str | None
|
||||
:rtype: collections.Iterable[TestTarget]
|
||||
"""
|
||||
for root, _, file_names in os.walk(path or '.', topdown=False):
|
||||
if root.endswith('/__pycache__'):
|
||||
continue
|
||||
|
||||
if path is None:
|
||||
root = root[2:]
|
||||
|
||||
if root.startswith('.'):
|
||||
continue
|
||||
|
||||
for file_name in file_names:
|
||||
name, ext = os.path.splitext(os.path.basename(file_name))
|
||||
|
||||
if name.startswith('.'):
|
||||
continue
|
||||
|
||||
if extensions and ext not in extensions:
|
||||
continue
|
||||
|
||||
if prefix and not name.startswith(prefix):
|
||||
continue
|
||||
|
||||
yield TestTarget(os.path.join(root, file_name), module_path, prefix, path)
|
||||
|
||||
|
||||
class CompletionTarget(object):
|
||||
"""Command-line argument completion target base class."""
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
||||
def __init__(self):
|
||||
self.name = None
|
||||
self.path = None
|
||||
self.base_path = None
|
||||
self.modules = tuple()
|
||||
self.aliases = tuple()
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, CompletionTarget):
|
||||
return self.__repr__() == other.__repr__()
|
||||
else:
|
||||
return False
|
||||
|
||||
def __ne__(self, other):
|
||||
return not self.__eq__(other)
|
||||
|
||||
def __lt__(self, other):
|
||||
return self.name.__lt__(other.name)
|
||||
|
||||
def __gt__(self, other):
|
||||
return self.name.__gt__(other.name)
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self.__repr__())
|
||||
|
||||
def __repr__(self):
|
||||
if self.modules:
|
||||
return '%s (%s)' % (self.name, ', '.join(self.modules))
|
||||
|
||||
return self.name
|
||||
|
||||
|
||||
class DirectoryTarget(CompletionTarget):
|
||||
"""Directory target."""
|
||||
def __init__(self, path, modules):
|
||||
"""
|
||||
:type path: str
|
||||
:type modules: tuple[str]
|
||||
"""
|
||||
super(DirectoryTarget, self).__init__()
|
||||
|
||||
self.name = path
|
||||
self.path = path
|
||||
self.modules = modules
|
||||
|
||||
|
||||
class TestTarget(CompletionTarget):
|
||||
"""Generic test target."""
|
||||
def __init__(self, path, module_path, module_prefix, base_path):
|
||||
"""
|
||||
:type path: str
|
||||
:type module_path: str | None
|
||||
:type module_prefix: str | None
|
||||
:type base_path: str
|
||||
"""
|
||||
super(TestTarget, self).__init__()
|
||||
|
||||
self.name = path
|
||||
self.path = path
|
||||
self.base_path = base_path + '/' if base_path else None
|
||||
|
||||
name, ext = os.path.splitext(os.path.basename(self.path))
|
||||
|
||||
if module_path and path.startswith(module_path) and name != '__init__' and ext in MODULE_EXTENSIONS:
|
||||
self.module = name[len(module_prefix or ''):].lstrip('_')
|
||||
self.modules = self.module,
|
||||
else:
|
||||
self.module = None
|
||||
self.modules = tuple()
|
||||
|
||||
aliases = [self.path, self.module]
|
||||
parts = self.path.split('/')
|
||||
|
||||
for i in range(1, len(parts)):
|
||||
alias = '%s/' % '/'.join(parts[:i])
|
||||
aliases.append(alias)
|
||||
|
||||
aliases = [a for a in aliases if a]
|
||||
|
||||
self.aliases = tuple(sorted(aliases))
|
||||
|
||||
|
||||
class IntegrationTarget(CompletionTarget):
|
||||
"""Integration test target."""
|
||||
non_posix = frozenset((
|
||||
'network',
|
||||
'windows',
|
||||
))
|
||||
|
||||
categories = frozenset(non_posix | frozenset((
|
||||
'posix',
|
||||
'module',
|
||||
'needs',
|
||||
'skip',
|
||||
)))
|
||||
|
||||
def __init__(self, path, modules, prefixes):
|
||||
"""
|
||||
:type path: str
|
||||
:type modules: frozenset[str]
|
||||
:type prefixes: dict[str, str]
|
||||
"""
|
||||
super(IntegrationTarget, self).__init__()
|
||||
|
||||
self.name = os.path.basename(path)
|
||||
self.path = path
|
||||
|
||||
# script_path and type
|
||||
|
||||
contents = sorted(os.listdir(path))
|
||||
|
||||
runme_files = tuple(c for c in contents if os.path.splitext(c)[0] == 'runme')
|
||||
test_files = tuple(c for c in contents if os.path.splitext(c)[0] == 'test')
|
||||
|
||||
self.script_path = None
|
||||
|
||||
if runme_files:
|
||||
self.type = 'script'
|
||||
self.script_path = os.path.join(path, runme_files[0])
|
||||
elif test_files:
|
||||
self.type = 'special'
|
||||
elif os.path.isdir(os.path.join(path, 'tasks')):
|
||||
self.type = 'role'
|
||||
else:
|
||||
self.type = 'unknown'
|
||||
|
||||
# static_aliases
|
||||
|
||||
try:
|
||||
with open(os.path.join(path, 'aliases'), 'r') as aliases_file:
|
||||
static_aliases = tuple(aliases_file.read().splitlines())
|
||||
except IOError as ex:
|
||||
if ex.errno != errno.ENOENT:
|
||||
raise
|
||||
static_aliases = tuple()
|
||||
|
||||
# modules
|
||||
|
||||
if self.name in modules:
|
||||
module = self.name
|
||||
elif self.name.startswith('win_') and self.name[4:] in modules:
|
||||
module = self.name[4:]
|
||||
else:
|
||||
module = None
|
||||
|
||||
self.modules = tuple(sorted(a for a in static_aliases + tuple([module]) if a in modules))
|
||||
|
||||
# groups
|
||||
|
||||
groups = [self.type]
|
||||
groups += [a for a in static_aliases if a not in modules]
|
||||
groups += ['module/%s' % m for m in self.modules]
|
||||
|
||||
if not self.modules:
|
||||
groups.append('non_module')
|
||||
|
||||
if 'destructive' not in groups:
|
||||
groups.append('non_destructive')
|
||||
|
||||
if '_' in self.name:
|
||||
prefix = self.name[:self.name.find('_')]
|
||||
else:
|
||||
prefix = None
|
||||
|
||||
if prefix in prefixes:
|
||||
group = prefixes[prefix]
|
||||
|
||||
if group != prefix:
|
||||
group = '%s/%s' % (group, prefix)
|
||||
|
||||
groups.append(group)
|
||||
|
||||
if self.name.startswith('win_'):
|
||||
groups.append('windows')
|
||||
|
||||
if self.name.startswith('connection_'):
|
||||
groups.append('connection')
|
||||
|
||||
if self.name.startswith('setup_') or self.name.startswith('prepare_'):
|
||||
groups.append('hidden')
|
||||
|
||||
if self.type not in ('script', 'role'):
|
||||
groups.append('hidden')
|
||||
|
||||
for group in itertools.islice(groups, 0, len(groups)):
|
||||
if '/' in group:
|
||||
parts = group.split('/')
|
||||
for i in range(1, len(parts)):
|
||||
groups.append('/'.join(parts[:i]))
|
||||
|
||||
if not any(g in self.non_posix for g in groups):
|
||||
groups.append('posix')
|
||||
|
||||
# aliases
|
||||
|
||||
aliases = [self.name] + \
|
||||
['%s/' % g for g in groups] + \
|
||||
['%s/%s' % (g, self.name) for g in groups if g not in self.categories]
|
||||
|
||||
if 'hidden/' in aliases:
|
||||
aliases = ['hidden/'] + ['hidden/%s' % a for a in aliases if not a.startswith('hidden/')]
|
||||
|
||||
self.aliases = tuple(sorted(set(aliases)))
|
||||
|
||||
|
||||
class TargetPatternsNotMatched(ApplicationError):
|
||||
"""One or more targets were not matched when a match was required."""
|
||||
def __init__(self, patterns):
|
||||
"""
|
||||
:type patterns: set[str]
|
||||
"""
|
||||
self.patterns = sorted(patterns)
|
||||
|
||||
if len(patterns) > 1:
|
||||
message = 'Target patterns not matched:\n%s' % '\n'.join(self.patterns)
|
||||
else:
|
||||
message = 'Target pattern not matched: %s' % self.patterns[0]
|
||||
|
||||
super(TargetPatternsNotMatched, self).__init__(message)
|
Loading…
Add table
Add a link
Reference in a new issue