K8s dynamic collected changes (#40745)

* Move k8s modules to dynamic backend

* update required openshift version

* update -> patch

* use new dynamic client exceptions

* style

* guard urllib3 import

* guard ansibleerror import

* give more information about error cause

* format in variable

* style

* rename tests

* Search for provided kind in a few more places to match old behavior, properly handle failure

* make common code use fail instead of fail_json, to work for lookup plugins as well

* update docs

* move openshift_raw tests into k8s tests

* fix typo

* Use diff of response and resource to determine change, don't do any checking client-side before making requests

* remove duplicate yaml blocks

* Update porting guide for k8s module

* remove invalid doc refs

* If fuzzy searching finds a resource, update resource_definition to match proper kind and version

* remote unsupported openshift_raw variables

* properly check environment variables when determining auth method:
This commit is contained in:
Fabian von Feilitzsch 2018-05-30 13:04:48 -04:00 committed by Adam Miller
commit 4d77878654
23 changed files with 661 additions and 2038 deletions

View file

@ -22,13 +22,12 @@ import copy
import math
import time
from ansible.module_utils.six import iteritems
from ansible.module_utils.k8s.common import OpenShiftAnsibleModuleMixin
from ansible.module_utils.k8s.raw import KubernetesRawModule
from ansible.module_utils.k8s.helper import AUTH_ARG_SPEC, COMMON_ARG_SPEC
from ansible.module_utils.k8s.common import AUTH_ARG_SPEC, COMMON_ARG_SPEC
try:
from openshift import watch
from openshift.dynamic.client import ResourceInstance
from openshift.helper.exceptions import KubernetesException
except ImportError as exc:
class KubernetesException(Exception):
@ -47,14 +46,14 @@ SCALE_ARG_SPEC = {
class KubernetesAnsibleScaleModule(KubernetesRawModule):
def execute_module(self):
if self.resource_definition:
resource_params = self.resource_to_parameters(self.resource_definition)
self.params.update(resource_params)
definition = self.resource_definitions[0]
self.authenticate()
self.client = self.get_api_client()
name = self.params.get('name')
namespace = self.params.get('namespace')
name = definition['metadata']['name']
namespace = definition['metadata'].get('namespace')
api_version = definition['apiVersion']
kind = definition['kind']
current_replicas = self.params.get('current_replicas')
replicas = self.params.get('replicas')
resource_version = self.params.get('resource_version')
@ -65,8 +64,10 @@ class KubernetesAnsibleScaleModule(KubernetesRawModule):
existing_count = None
return_attributes = dict(changed=False, result=dict())
resource = self.find_resource(kind, api_version, fail=True)
try:
existing = self.helper.get_object(name, namespace)
existing = resource.get(name=name, namespace=namespace)
return_attributes['result'] = existing.to_dict()
except KubernetesException as exc:
self.fail_json(msg='Failed to retrieve requested object: {0}'.format(exc.message),
@ -80,7 +81,7 @@ class KubernetesAnsibleScaleModule(KubernetesRawModule):
if existing_count is None:
self.fail_json(msg='Failed to retrieve the available count for the requested object.')
if resource_version and resource_version != existing.metadata.resource_version:
if resource_version and resource_version != existing.metadata.resourceVersion:
self.exit_json(**return_attributes)
if current_replicas is not None and existing_count != current_replicas:
@ -91,25 +92,13 @@ class KubernetesAnsibleScaleModule(KubernetesRawModule):
if not self.check_mode:
if self.kind == 'job':
existing.spec.parallelism = replicas
k8s_obj = self.helper.patch_object(name, namespace, existing)
k8s_obj = resource.patch(existing.to_dict())
else:
k8s_obj = self.scale(existing, replicas, wait, wait_time)
k8s_obj = self.scale(resource, existing, replicas, wait, wait_time)
return_attributes['result'] = k8s_obj.to_dict()
self.exit_json(**return_attributes)
def resource_to_parameters(self, resource):
""" Converts a resource definition to module parameters """
parameters = {}
for key, value in iteritems(resource):
if key in ('apiVersion', 'kind', 'status'):
continue
elif key == 'metadata' and isinstance(value, dict):
for meta_key, meta_value in iteritems(value):
if meta_key in ('name', 'namespace', 'resourceVersion'):
parameters[meta_key] = meta_value
return parameters
@property
def argspec(self):
args = copy.deepcopy(COMMON_ARG_SPEC)
@ -119,91 +108,67 @@ class KubernetesAnsibleScaleModule(KubernetesRawModule):
args.update(SCALE_ARG_SPEC)
return args
def scale(self, existing_object, replicas, wait, wait_time):
def scale(self, resource, existing_object, replicas, wait, wait_time):
name = existing_object.metadata.name
namespace = existing_object.metadata.namespace
method_name = 'patch_namespaced_{0}_scale'.format(self.kind)
method = None
model = None
try:
method = self.helper.lookup_method(method_name=method_name)
except KubernetesException:
if not hasattr(resource, 'scale'):
self.fail_json(
msg="Failed to get method {0}. Is 'scale' a valid operation for {1}?".format(method_name, self.kind)
msg="Cannot perform scale on resource of kind {0}".format(resource.kind)
)
try:
model = self.helper.get_model(self.api_version, 'scale')
except KubernetesException:
self.fail_json(
msg="Failed to fetch the 'Scale' model for API version {0}. Are you using the correct "
"API?".format(self.api_version)
)
scale_obj = model()
scale_obj.kind = 'scale'
scale_obj.api_version = self.api_version.lower()
scale_obj.metadata = self.helper.get_model(
self.api_version,
self.helper.get_base_model_name(scale_obj.swagger_types['metadata'])
)()
scale_obj.metadata.name = name
scale_obj.metadata.namespace = namespace
scale_obj.spec = self.helper.get_model(
self.api_version,
self.helper.get_base_model_name(scale_obj.swagger_types['spec'])
)()
scale_obj.spec.replicas = replicas
scale_obj = {'metadata': {'name': name, 'namespace': namespace}, 'spec': {'replicas': replicas}}
return_obj = None
stream = None
if wait:
w, stream = self._create_stream(namespace, wait_time)
w, stream = self._create_stream(resource, namespace, wait_time)
try:
method(name, namespace, scale_obj)
resource.scale.patch(body=scale_obj)
except Exception as exc:
self.fail_json(
msg="Scale request failed: {0}".format(exc.message)
)
if wait and stream is not None:
return_obj = self._read_stream(w, stream, name, replicas)
return_obj = self._read_stream(resource, w, stream, name, replicas)
if not return_obj:
return_obj = self._wait_for_response(name, namespace)
return return_obj
def _create_stream(self, namespace, wait_time):
def _create_stream(self, resource, namespace, wait_time):
""" Create a stream of events for the object """
w = None
stream = None
try:
list_method = self.helper.lookup_method('list', namespace)
w = watch.Watch()
w._api_client = self.helper.api_client
w._api_client = self.client.client
if namespace:
stream = w.stream(list_method, namespace, timeout_seconds=wait_time)
stream = w.stream(resource.get, serialize=False, namespace=namespace, timeout_seconds=wait_time)
else:
stream = w.stream(list_method, timeout_seconds=wait_time)
stream = w.stream(resource.get, serialize=False, namespace=namespace, timeout_seconds=wait_time)
except KubernetesException:
pass
except Exception:
raise
return w, stream
def _read_stream(self, watcher, stream, name, replicas):
def _read_stream(self, resource, watcher, stream, name, replicas):
""" Wait for ready_replicas to equal the requested number of replicas. """
return_obj = None
try:
for event in stream:
if event.get('object'):
obj = event['object']
obj = ResourceInstance(resource, event['object'])
if obj.metadata.name == name and hasattr(obj, 'status'):
if hasattr(obj.status, 'ready_replicas') and obj.status.ready_replicas == replicas:
if replicas == 0:
if not hasattr(obj.status, 'readyReplicas') or not obj.status.readyReplicas:
return_obj = obj
watcher.stop()
break
if hasattr(obj.status, 'readyReplicas') and obj.status.readyReplicas == replicas:
return_obj = obj
watcher.stop()
break
@ -212,27 +177,23 @@ class KubernetesAnsibleScaleModule(KubernetesRawModule):
if not return_obj:
self.fail_json(msg="Error fetching the patched object. Try a higher wait_timeout value.")
if return_obj.status.ready_replicas is None:
if replicas and return_obj.status.readyReplicas is None:
self.fail_json(msg="Failed to fetch the number of ready replicas. Try a higher wait_timeout value.")
if return_obj.status.ready_replicas != replicas:
if replicas and return_obj.status.readyReplicas != replicas:
self.fail_json(msg="Number of ready replicas is {0}. Failed to reach {1} ready replicas within "
"the wait_timeout period.".format(return_obj.status.ready_replicas, replicas))
return return_obj
def _wait_for_response(self, name, namespace):
def _wait_for_response(self, resource, name, namespace):
""" Wait for an API response """
tries = 0
half = math.ceil(20 / 2)
obj = None
while tries <= half:
obj = self.helper.get_object(name, namespace)
obj = resource.get(name=name, namespace=namespace)
if obj:
break
tries += 2
time.sleep(2)
return obj
class OpenShiftAnsibleScaleModule(OpenShiftAnsibleModuleMixin, KubernetesAnsibleScaleModule):
pass