Move utility functions out of basic.py (#51715)

Move the following methods to lib/anisble/module_utils/common/validation.py:

- _count_terms()
- _check_mutually_exclusive()
- _check_required_one_of()
- _check_required_together()
- _check_required_by()
- _check_required_arguments()
- _check_required_if
- fail_on_missing_params() --> create check_missing_parameters()
This commit is contained in:
Sam Doran 2019-03-14 21:29:55 -04:00 committed by GitHub
parent 34b928d283
commit 43a44e6f35
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 461 additions and 97 deletions

View file

@ -173,6 +173,16 @@ from ansible.module_utils.six import (
text_type,
)
from ansible.module_utils.six.moves import map, reduce, shlex_quote
from ansible.module_utils.common.validation import (
check_missing_parameters,
check_mutually_exclusive,
check_required_arguments,
check_required_by,
check_required_if,
check_required_one_of,
check_required_together,
count_terms,
)
from ansible.module_utils._text import to_native, to_bytes, to_text
from ansible.module_utils.common._utils import get_all_subclasses as _get_all_subclasses
from ansible.module_utils.parsing.convert_bool import BOOLEANS, BOOLEANS_FALSE, BOOLEANS_TRUE, boolean
@ -1593,80 +1603,72 @@ class AnsibleModule(object):
self.exit_json(skipped=True, msg="remote module (%s) does not support check mode" % self._name)
def _count_terms(self, check, param=None):
count = 0
if param is None:
param = self.params
for term in check:
if term in param:
count += 1
return count
return count_terms(check, param)
def _check_mutually_exclusive(self, spec, param=None):
if spec is None:
return
for check in spec:
count = self._count_terms(check, param)
if count > 1:
msg = "parameters are mutually exclusive: %s" % ', '.join(check)
if self._options_context:
msg += " found in %s" % " -> ".join(self._options_context)
self.fail_json(msg=msg)
if param is None:
param = self.params
try:
check_mutually_exclusive(spec, param)
except TypeError as e:
msg = to_native(e)
if self._options_context:
msg += " found in %s" % " -> ".join(self._options_context)
self.fail_json(msg=msg)
def _check_required_one_of(self, spec, param=None):
if spec is None:
return
for check in spec:
count = self._count_terms(check, param)
if count == 0:
msg = "one of the following is required: %s" % ', '.join(check)
if self._options_context:
msg += " found in %s" % " -> ".join(self._options_context)
self.fail_json(msg=msg)
if param is None:
param = self.params
try:
check_required_one_of(spec, param)
except TypeError as e:
msg = to_native(e)
if self._options_context:
msg += " found in %s" % " -> ".join(self._options_context)
self.fail_json(msg=msg)
def _check_required_together(self, spec, param=None):
if spec is None:
return
for check in spec:
counts = [self._count_terms([field], param) for field in check]
non_zero = [c for c in counts if c > 0]
if len(non_zero) > 0:
if 0 in counts:
msg = "parameters are required together: %s" % ', '.join(check)
if self._options_context:
msg += " found in %s" % " -> ".join(self._options_context)
self.fail_json(msg=msg)
if param is None:
param = self.params
try:
check_required_together(spec, param)
except TypeError as e:
msg = to_native(e)
if self._options_context:
msg += " found in %s" % " -> ".join(self._options_context)
self.fail_json(msg=msg)
def _check_required_by(self, spec, param=None):
if spec is None:
return
if param is None:
param = self.params
for (key, value) in spec.items():
if key not in param or param[key] is None:
continue
missing = []
# Support strings (single-item lists)
if isinstance(value, string_types):
value = [value, ]
for required in value:
if required not in param or param[required] is None:
missing.append(required)
if len(missing) > 0:
self.fail_json(msg="missing parameter(s) required by '%s': %s" % (key, ', '.join(missing)))
try:
check_required_by(spec, param)
except TypeError as e:
self.fail_json(msg=to_native(e))
def _check_required_arguments(self, spec=None, param=None):
''' ensure all required arguments are present '''
missing = []
if spec is None:
spec = self.argument_spec
if param is None:
param = self.params
for (k, v) in spec.items():
required = v.get('required', False)
if required and k not in param:
missing.append(k)
if len(missing) > 0:
msg = "missing required arguments: %s" % ", ".join(missing)
try:
check_required_arguments(spec, param)
except TypeError as e:
msg = to_native(e)
if self._options_context:
msg += " found in %s" % " -> ".join(self._options_context)
self.fail_json(msg=msg)
@ -1677,33 +1679,14 @@ class AnsibleModule(object):
return
if param is None:
param = self.params
for sp in spec:
missing = []
max_missing_count = 0
is_one_of = False
if len(sp) == 4:
key, val, requirements, is_one_of = sp
else:
key, val, requirements = sp
# is_one_of is True at least one requirement should be
# present, else all requirements should be present.
if is_one_of:
max_missing_count = len(requirements)
term = 'any'
else:
term = 'all'
if key in param and param[key] == val:
for check in requirements:
count = self._count_terms((check,), param)
if count == 0:
missing.append(check)
if len(missing) and len(missing) >= max_missing_count:
msg = "%s is %s but %s of the following are missing: %s" % (key, val, term, ', '.join(missing))
if self._options_context:
msg += " found in %s" % " -> ".join(self._options_context)
self.fail_json(msg=msg)
try:
check_required_if(spec, param)
except TypeError as e:
msg = to_native(e)
if self._options_context:
msg += " found in %s" % " -> ".join(self._options_context)
self.fail_json(msg=msg)
def _check_argument_values(self, spec=None, param=None):
''' ensure all arguments have the requested values, and there are no stray arguments '''
@ -2315,17 +2298,12 @@ class AnsibleModule(object):
sys.exit(1)
def fail_on_missing_params(self, required_params=None):
''' This is for checking for required params when we can not check via argspec because we
need more information than is simply given in the argspec.
'''
if not required_params:
return
missing_params = []
for required_param in required_params:
if not self.params.get(required_param):
missing_params.append(required_param)
if missing_params:
self.fail_json(msg="missing required arguments: %s" % ', '.join(missing_params))
try:
check_missing_parameters(self.params, required_params)
except TypeError as e:
self.fail_json(msg=to_native(e))
def digest_from_file(self, filename, algorithm):
''' Return hex digest of local file for a digest_method specified by name, or None if file is not present. '''