Update ansible-test sanity command. (#31958)

* Use correct pip version in ansible-test.
* Add git fallback for validate-modules.
* Run sanity tests in a docker container.
* Use correct python version for sanity tests.
* Pin docker completion images and add default.
* Split pylint execution into multiple contexts.
* Only test .py files in use-argspec-type-path test.
* Accept identical python interpeter name or binary.
* Switch cloud tests to default container.
* Remove unused extras from pip install.
* Filter out empty pip commands.
* Don't force running of pip list.
* Support delegation for windows and network tests.
* Fix ansible-test python version usage.
* Fix ansible-test python version skipping.
* Use absolute path for log in ansible-test.
* Run vyos_command test on python 3.
* Fix windows/network instance persistence.
* Add `test/cache` dir to classification.
* Enable more python versions for network tests.
* Fix cs_router test.
This commit is contained in:
Matt Clay 2017-10-26 00:21:46 -07:00 committed by GitHub
parent 602a618e60
commit cf1337ca9a
37 changed files with 788 additions and 456 deletions

View file

@ -2,15 +2,22 @@
from __future__ import absolute_import, print_function
import atexit
import errno
import filecmp
import inspect
import json
import os
import pipes
import pkgutil
import shutil
import subprocess
import random
import re
import shutil
import stat
import string
import subprocess
import sys
import tempfile
import time
try:
@ -19,6 +26,23 @@ except ImportError:
from abc import ABCMeta
ABC = ABCMeta('ABC', (), {})
DOCKER_COMPLETION = {}
coverage_path = '' # pylint: disable=locally-disabled, invalid-name
def get_docker_completion():
"""
:rtype: dict[str, str]
"""
if not DOCKER_COMPLETION:
with open('test/runner/completion/docker.txt', 'r') as completion_fd:
images = completion_fd.read().splitlines()
DOCKER_COMPLETION.update(dict((i.split('@')[0], i) for i in images))
return DOCKER_COMPLETION
def is_shippable():
"""
@ -35,6 +59,51 @@ def remove_file(path):
os.remove(path)
def find_pip(path=None, version=None):
"""
:type path: str | None
:type version: str | None
:rtype: str
"""
if version:
version_info = version.split('.')
python_bin = find_executable('python%s' % version, path=path)
else:
version_info = sys.version_info
python_bin = sys.executable
choices = (
'pip%s' % '.'.join(str(i) for i in version_info[:2]),
'pip%s' % version_info[0],
'pip',
)
pip = None
for choice in choices:
pip = find_executable(choice, required=False, path=path)
if pip:
break
if not pip:
raise ApplicationError('Required program not found: %s' % ', '.join(choices))
with open(pip) as pip_fd:
shebang = pip_fd.readline().strip()
if not shebang.startswith('#!') or ' ' in shebang:
raise ApplicationError('Unexpected shebang in "%s": %s' % (pip, shebang))
our_python = os.path.realpath(python_bin)
pip_python = os.path.realpath(shebang[2:])
if our_python != pip_python and not filecmp.cmp(our_python, pip_python, False):
raise ApplicationError('Current interpreter "%s" does not match "%s" interpreter "%s".' % (our_python, pip, pip_python))
return pip
def find_executable(executable, cwd=None, path=None, required=True):
"""
:type executable: str
@ -87,6 +156,104 @@ def find_executable(executable, cwd=None, path=None, required=True):
return match
def intercept_command(args, cmd, target_name, capture=False, env=None, data=None, cwd=None, python_version=None, path=None):
"""
:type args: TestConfig
:type cmd: collections.Iterable[str]
:type target_name: str
:type capture: bool
:type env: dict[str, str] | None
:type data: str | None
:type cwd: str | None
:type python_version: str | None
:type path: str | None
:rtype: str | None, str | None
"""
if not env:
env = common_environment()
cmd = list(cmd)
inject_path = get_coverage_path(args)
config_path = os.path.join(inject_path, 'injector.json')
version = python_version or args.python_version
interpreter = find_executable('python%s' % version, path=path)
coverage_file = os.path.abspath(os.path.join(inject_path, '..', 'output', '%s=%s=%s=%s=coverage' % (
args.command, target_name, args.coverage_label or 'local-%s' % version, 'python-%s' % version)))
env['PATH'] = inject_path + os.pathsep + env['PATH']
env['ANSIBLE_TEST_PYTHON_VERSION'] = version
env['ANSIBLE_TEST_PYTHON_INTERPRETER'] = interpreter
config = dict(
python_interpreter=interpreter,
coverage_file=coverage_file if args.coverage else None,
)
if not args.explain:
with open(config_path, 'w') as config_fd:
json.dump(config, config_fd, indent=4, sort_keys=True)
return run_command(args, cmd, capture=capture, env=env, data=data, cwd=cwd)
def get_coverage_path(args):
"""
:type args: TestConfig
:rtype: str
"""
global coverage_path # pylint: disable=locally-disabled, global-statement, invalid-name
if coverage_path:
return os.path.join(coverage_path, 'coverage')
prefix = 'ansible-test-coverage-'
tmp_dir = '/tmp'
if args.explain:
return os.path.join(tmp_dir, '%stmp' % prefix, 'coverage')
src = os.path.abspath(os.path.join(os.getcwd(), 'test/runner/injector/'))
coverage_path = tempfile.mkdtemp('', prefix, dir=tmp_dir)
os.chmod(coverage_path, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
shutil.copytree(src, os.path.join(coverage_path, 'coverage'))
shutil.copy('.coveragerc', os.path.join(coverage_path, 'coverage', '.coveragerc'))
for root, dir_names, file_names in os.walk(coverage_path):
for name in dir_names + file_names:
os.chmod(os.path.join(root, name), stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
for directory in 'output', 'logs':
os.mkdir(os.path.join(coverage_path, directory))
os.chmod(os.path.join(coverage_path, directory), stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
atexit.register(cleanup_coverage_dir)
return os.path.join(coverage_path, 'coverage')
def cleanup_coverage_dir():
"""Copy over coverage data from temporary directory and purge temporary directory."""
output_dir = os.path.join(coverage_path, 'output')
for filename in os.listdir(output_dir):
src = os.path.join(output_dir, filename)
dst = os.path.join(os.getcwd(), 'test', 'results', 'coverage')
shutil.copy(src, dst)
logs_dir = os.path.join(coverage_path, 'logs')
for filename in os.listdir(logs_dir):
random_suffix = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(8))
new_name = '%s.%s.log' % (os.path.splitext(os.path.basename(filename))[0], random_suffix)
src = os.path.join(logs_dir, filename)
dst = os.path.join(os.getcwd(), 'test', 'results', 'logs', new_name)
shutil.copy(src, dst)
shutil.rmtree(coverage_path)
def run_command(args, cmd, capture=False, env=None, data=None, cwd=None, always=False, stdin=None, stdout=None,
cmd_verbosity=1, str_errors='strict'):
"""
@ -459,6 +626,8 @@ def docker_qualify_image(name):
if not name or any((c in name) for c in ('/', ':')):
return name
name = get_docker_completion().get(name, name)
return 'ansible/ansible:%s' % name