Add ansible-test finally block after tests run.

This will record data from passing tests even when tests fail.
This commit is contained in:
Matt Clay 2019-03-07 16:44:26 -08:00
parent 8ef2e6da05
commit 8535c05b29

View file

@ -833,124 +833,126 @@ def command_integration_filtered(args, targets, all_targets, inventory_path, pre
current_environment = None # type: EnvironmentDescription | None current_environment = None # type: EnvironmentDescription | None
for target in targets_iter: try:
if args.start_at and not found: for target in targets_iter:
found = target.name == args.start_at if args.start_at and not found:
found = target.name == args.start_at
if not found: if not found:
continue
if args.list_targets:
print(target.name)
continue continue
if args.list_targets: tries = 2 if args.retry_on_error else 1
print(target.name) verbosity = args.verbosity
continue
tries = 2 if args.retry_on_error else 1 cloud_environment = get_cloud_environment(args, target)
verbosity = args.verbosity
cloud_environment = get_cloud_environment(args, target) original_environment = current_environment if current_environment else EnvironmentDescription(args)
current_environment = None
original_environment = current_environment if current_environment else EnvironmentDescription(args) display.info('>>> Environment Description\n%s' % original_environment, verbosity=3)
current_environment = None
display.info('>>> Environment Description\n%s' % original_environment, verbosity=3) try:
while tries:
try: tries -= 1
while tries:
tries -= 1
try:
if cloud_environment:
cloud_environment.setup_once()
run_setup_targets(args, test_dir, target.setup_once, all_targets_dict, setup_targets_executed, inventory_path, False)
start_time = time.time()
run_setup_targets(args, test_dir, target.setup_always, all_targets_dict, setup_targets_executed, inventory_path, True)
if not args.explain:
# create a fresh test directory for each test target
remove_tree(test_dir)
make_dirs(test_dir)
if pre_target:
pre_target(target)
try: try:
if target.script_path: if cloud_environment:
command_integration_script(args, target, test_dir, inventory_path) cloud_environment.setup_once()
else:
command_integration_role(args, target, start_at_task, test_dir, inventory_path)
start_at_task = None
finally:
if post_target:
post_target(target)
end_time = time.time() run_setup_targets(args, test_dir, target.setup_once, all_targets_dict, setup_targets_executed, inventory_path, False)
results[target.name] = dict( start_time = time.time()
name=target.name,
type=target.type,
aliases=target.aliases,
modules=target.modules,
run_time_seconds=int(end_time - start_time),
setup_once=target.setup_once,
setup_always=target.setup_always,
coverage=args.coverage,
coverage_label=args.coverage_label,
python_version=args.python_version,
)
break run_setup_targets(args, test_dir, target.setup_always, all_targets_dict, setup_targets_executed, inventory_path, True)
except SubprocessError:
if cloud_environment:
cloud_environment.on_failure(target, tries)
if not original_environment.validate(target.name, throw=False): if not args.explain:
raise # create a fresh test directory for each test target
remove_tree(test_dir)
make_dirs(test_dir)
if not tries: if pre_target:
raise pre_target(target)
display.warning('Retrying test target "%s" with maximum verbosity.' % target.name) try:
display.verbosity = args.verbosity = 6 if target.script_path:
command_integration_script(args, target, test_dir, inventory_path)
else:
command_integration_role(args, target, start_at_task, test_dir, inventory_path)
start_at_task = None
finally:
if post_target:
post_target(target)
start_time = time.time() end_time = time.time()
current_environment = EnvironmentDescription(args)
end_time = time.time()
EnvironmentDescription.check(original_environment, current_environment, target.name, throw=True) results[target.name] = dict(
name=target.name,
type=target.type,
aliases=target.aliases,
modules=target.modules,
run_time_seconds=int(end_time - start_time),
setup_once=target.setup_once,
setup_always=target.setup_always,
coverage=args.coverage,
coverage_label=args.coverage_label,
python_version=args.python_version,
)
results[target.name]['validation_seconds'] = int(end_time - start_time) break
except SubprocessError:
if cloud_environment:
cloud_environment.on_failure(target, tries)
passed.append(target) if not original_environment.validate(target.name, throw=False):
except Exception as ex: raise
failed.append(target)
if args.continue_on_error: if not tries:
display.error(ex) raise
continue
display.notice('To resume at this test target, use the option: --start-at %s' % target.name) display.warning('Retrying test target "%s" with maximum verbosity.' % target.name)
display.verbosity = args.verbosity = 6
next_target = next(targets_iter, None) start_time = time.time()
current_environment = EnvironmentDescription(args)
end_time = time.time()
if next_target: EnvironmentDescription.check(original_environment, current_environment, target.name, throw=True)
display.notice('To resume after this test target, use the option: --start-at %s' % next_target.name)
raise results[target.name]['validation_seconds'] = int(end_time - start_time)
finally:
display.verbosity = args.verbosity = verbosity
if not args.explain: passed.append(target)
results_path = 'test/results/data/%s-%s.json' % (args.command, re.sub(r'[^0-9]', '-', str(datetime.datetime.utcnow().replace(microsecond=0)))) except Exception as ex:
failed.append(target)
data = dict( if args.continue_on_error:
targets=results, display.error(ex)
) continue
with open(results_path, 'w') as results_fd: display.notice('To resume at this test target, use the option: --start-at %s' % target.name)
results_fd.write(json.dumps(data, sort_keys=True, indent=4))
next_target = next(targets_iter, None)
if next_target:
display.notice('To resume after this test target, use the option: --start-at %s' % next_target.name)
raise
finally:
display.verbosity = args.verbosity = verbosity
finally:
if not args.explain:
results_path = 'test/results/data/%s-%s.json' % (args.command, re.sub(r'[^0-9]', '-', str(datetime.datetime.utcnow().replace(microsecond=0))))
data = dict(
targets=results,
)
with open(results_path, 'w') as results_fd:
results_fd.write(json.dumps(data, sort_keys=True, indent=4))
if failed: if failed:
raise ApplicationError('The %d integration test(s) listed below (out of %d) failed. See error output above for details:\n%s' % ( raise ApplicationError('The %d integration test(s) listed below (out of %d) failed. See error output above for details:\n%s' % (