mirror of
				https://github.com/ansible-collections/community.general.git
				synced 2025-10-26 05:50:36 -07:00 
			
		
		
		
	use dict comprehension in plugins, part 3 (#8833)
* use dict comprehension in plugins, part 3
* add changelog frag
(cherry picked from commit 26df6c7657)
Co-authored-by: Alexei Znamensky <103110+russoz@users.noreply.github.com>
		
	
			
		
			
				
	
	
		
			502 lines
		
	
	
	
		
			21 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			502 lines
		
	
	
	
		
			21 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # -*- coding: utf-8 -*-
 | |
| # Copyright (c) 2022 Western Digital Corporation
 | |
| # GNU General Public License v3.0+ (see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt)
 | |
| # GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
 | |
| # SPDX-License-Identifier: GPL-3.0-or-later
 | |
| 
 | |
| from __future__ import absolute_import, division, print_function
 | |
| __metaclass__ = type
 | |
| 
 | |
| import json
 | |
| import os
 | |
| import uuid
 | |
| 
 | |
| from ansible.module_utils.urls import open_url
 | |
| from ansible.module_utils.common.text.converters import to_native
 | |
| from ansible.module_utils.common.text.converters import to_text
 | |
| from ansible.module_utils.six.moves.urllib.error import URLError, HTTPError
 | |
| from ansible.module_utils.six.moves.urllib.parse import urlparse
 | |
| 
 | |
| 
 | |
| GET_HEADERS = {'accept': 'application/json'}
 | |
| PUT_HEADERS = {'content-type': 'application/json', 'accept': 'application/json'}
 | |
| POST_HEADERS = {'content-type': 'application/json', 'accept': 'application/json'}
 | |
| DELETE_HEADERS = {'accept': 'application/json'}
 | |
| 
 | |
| HEALTH_OK = 5
 | |
| 
 | |
| 
 | |
| class OcapiUtils(object):
 | |
| 
 | |
|     def __init__(self, creds, base_uri, proxy_slot_number, timeout, module):
 | |
|         self.root_uri = base_uri
 | |
|         self.proxy_slot_number = proxy_slot_number
 | |
|         self.creds = creds
 | |
|         self.timeout = timeout
 | |
|         self.module = module
 | |
| 
 | |
|     def _auth_params(self):
 | |
|         """
 | |
|         Return tuple of required authentication params based on the username and password.
 | |
| 
 | |
|         :return: tuple of username, password
 | |
|         """
 | |
|         username = self.creds['user']
 | |
|         password = self.creds['pswd']
 | |
|         force_basic_auth = True
 | |
|         return username, password, force_basic_auth
 | |
| 
 | |
|     def get_request(self, uri):
 | |
|         req_headers = dict(GET_HEADERS)
 | |
|         username, password, basic_auth = self._auth_params()
 | |
|         try:
 | |
|             resp = open_url(uri, method="GET", headers=req_headers,
 | |
|                             url_username=username, url_password=password,
 | |
|                             force_basic_auth=basic_auth, validate_certs=False,
 | |
|                             follow_redirects='all',
 | |
|                             use_proxy=True, timeout=self.timeout)
 | |
|             data = json.loads(to_native(resp.read()))
 | |
|             headers = {k.lower(): v for (k, v) in resp.info().items()}
 | |
|         except HTTPError as e:
 | |
|             return {'ret': False,
 | |
|                     'msg': "HTTP Error %s on GET request to '%s'"
 | |
|                            % (e.code, uri),
 | |
|                     'status': e.code}
 | |
|         except URLError as e:
 | |
|             return {'ret': False, 'msg': "URL Error on GET request to '%s': '%s'"
 | |
|                                          % (uri, e.reason)}
 | |
|         # Almost all errors should be caught above, but just in case
 | |
|         except Exception as e:
 | |
|             return {'ret': False,
 | |
|                     'msg': "Failed GET request to '%s': '%s'" % (uri, to_text(e))}
 | |
|         return {'ret': True, 'data': data, 'headers': headers}
 | |
| 
 | |
|     def delete_request(self, uri, etag=None):
 | |
|         req_headers = dict(DELETE_HEADERS)
 | |
|         if etag is not None:
 | |
|             req_headers['If-Match'] = etag
 | |
|         username, password, basic_auth = self._auth_params()
 | |
|         try:
 | |
|             resp = open_url(uri, method="DELETE", headers=req_headers,
 | |
|                             url_username=username, url_password=password,
 | |
|                             force_basic_auth=basic_auth, validate_certs=False,
 | |
|                             follow_redirects='all',
 | |
|                             use_proxy=True, timeout=self.timeout)
 | |
|             if resp.status != 204:
 | |
|                 data = json.loads(to_native(resp.read()))
 | |
|             else:
 | |
|                 data = ""
 | |
|             headers = {k.lower(): v for (k, v) in resp.info().items()}
 | |
|         except HTTPError as e:
 | |
|             return {'ret': False,
 | |
|                     'msg': "HTTP Error %s on DELETE request to '%s'"
 | |
|                            % (e.code, uri),
 | |
|                     'status': e.code}
 | |
|         except URLError as e:
 | |
|             return {'ret': False, 'msg': "URL Error on DELETE request to '%s': '%s'"
 | |
|                                          % (uri, e.reason)}
 | |
|         # Almost all errors should be caught above, but just in case
 | |
|         except Exception as e:
 | |
|             return {'ret': False,
 | |
|                     'msg': "Failed DELETE request to '%s': '%s'" % (uri, to_text(e))}
 | |
|         return {'ret': True, 'data': data, 'headers': headers}
 | |
| 
 | |
|     def put_request(self, uri, payload, etag=None):
 | |
|         req_headers = dict(PUT_HEADERS)
 | |
|         if etag is not None:
 | |
|             req_headers['If-Match'] = etag
 | |
|         username, password, basic_auth = self._auth_params()
 | |
|         try:
 | |
|             resp = open_url(uri, data=json.dumps(payload),
 | |
|                             headers=req_headers, method="PUT",
 | |
|                             url_username=username, url_password=password,
 | |
|                             force_basic_auth=basic_auth, validate_certs=False,
 | |
|                             follow_redirects='all',
 | |
|                             use_proxy=True, timeout=self.timeout)
 | |
|             headers = {k.lower(): v for (k, v) in resp.info().items()}
 | |
|         except HTTPError as e:
 | |
|             return {'ret': False,
 | |
|                     'msg': "HTTP Error %s on PUT request to '%s'"
 | |
|                            % (e.code, uri),
 | |
|                     'status': e.code}
 | |
|         except URLError as e:
 | |
|             return {'ret': False, 'msg': "URL Error on PUT request to '%s': '%s'"
 | |
|                                          % (uri, e.reason)}
 | |
|         # Almost all errors should be caught above, but just in case
 | |
|         except Exception as e:
 | |
|             return {'ret': False,
 | |
|                     'msg': "Failed PUT request to '%s': '%s'" % (uri, to_text(e))}
 | |
|         return {'ret': True, 'headers': headers, 'resp': resp}
 | |
| 
 | |
|     def post_request(self, uri, payload, content_type="application/json", timeout=None):
 | |
|         req_headers = dict(POST_HEADERS)
 | |
|         if content_type != "application/json":
 | |
|             req_headers["content-type"] = content_type
 | |
|         username, password, basic_auth = self._auth_params()
 | |
|         if content_type == "application/json":
 | |
|             request_data = json.dumps(payload)
 | |
|         else:
 | |
|             request_data = payload
 | |
|         try:
 | |
|             resp = open_url(uri, data=request_data,
 | |
|                             headers=req_headers, method="POST",
 | |
|                             url_username=username, url_password=password,
 | |
|                             force_basic_auth=basic_auth, validate_certs=False,
 | |
|                             follow_redirects='all',
 | |
|                             use_proxy=True, timeout=self.timeout if timeout is None else timeout)
 | |
|             headers = {k.lower(): v for (k, v) in resp.info().items()}
 | |
|         except HTTPError as e:
 | |
|             return {'ret': False,
 | |
|                     'msg': "HTTP Error %s on POST request to '%s'"
 | |
|                            % (e.code, uri),
 | |
|                     'status': e.code}
 | |
|         except URLError as e:
 | |
|             return {'ret': False, 'msg': "URL Error on POST request to '%s': '%s'"
 | |
|                                          % (uri, e.reason)}
 | |
|         # Almost all errors should be caught above, but just in case
 | |
|         except Exception as e:
 | |
|             return {'ret': False,
 | |
|                     'msg': "Failed POST request to '%s': '%s'" % (uri, to_text(e))}
 | |
|         return {'ret': True, 'headers': headers, 'resp': resp}
 | |
| 
 | |
|     def get_uri_with_slot_number_query_param(self, uri):
 | |
|         """Return the URI with proxy slot number added as a query param, if there is one.
 | |
| 
 | |
|         If a proxy slot number is provided, to access it, we must append it as a query parameter.
 | |
|         This method returns the given URI with the slotnumber query param added, if there is one.
 | |
|         If there is not a proxy slot number, it just returns the URI as it was passed in.
 | |
|         """
 | |
|         if self.proxy_slot_number is not None:
 | |
|             parsed_url = urlparse(uri)
 | |
|             return parsed_url._replace(query="slotnumber=" + str(self.proxy_slot_number)).geturl()
 | |
|         else:
 | |
|             return uri
 | |
| 
 | |
|     def manage_system_power(self, command):
 | |
|         """Process a command to manage the system power.
 | |
| 
 | |
|         :param str command: The Ansible command being processed.
 | |
|         """
 | |
|         if command == "PowerGracefulRestart":
 | |
|             resource_uri = self.root_uri
 | |
|             resource_uri = self.get_uri_with_slot_number_query_param(resource_uri)
 | |
| 
 | |
|             # Get the resource so that we have the Etag
 | |
|             response = self.get_request(resource_uri)
 | |
|             if 'etag' not in response['headers']:
 | |
|                 return {'ret': False, 'msg': 'Etag not found in response.'}
 | |
|             etag = response['headers']['etag']
 | |
|             if response['ret'] is False:
 | |
|                 return response
 | |
| 
 | |
|             # Issue the PUT to do the reboot (unless we are in check mode)
 | |
|             if self.module.check_mode:
 | |
|                 return {
 | |
|                     'ret': True,
 | |
|                     'changed': True,
 | |
|                     'msg': 'Update not performed in check mode.'
 | |
|                 }
 | |
|             payload = {'Reboot': True}
 | |
|             response = self.put_request(resource_uri, payload, etag)
 | |
|             if response['ret'] is False:
 | |
|                 return response
 | |
|         elif command.startswith("PowerMode"):
 | |
|             return self.manage_power_mode(command)
 | |
|         else:
 | |
|             return {'ret': False, 'msg': 'Invalid command: ' + command}
 | |
| 
 | |
|         return {'ret': True}
 | |
| 
 | |
|     def manage_chassis_indicator_led(self, command):
 | |
|         """Process a command to manage the chassis indicator LED.
 | |
| 
 | |
|         :param string command: The Ansible command being processed.
 | |
|         """
 | |
|         return self.manage_indicator_led(command, self.root_uri)
 | |
| 
 | |
|     def manage_indicator_led(self, command, resource_uri=None):
 | |
|         """Process a command to manage an indicator LED.
 | |
| 
 | |
|         :param string command: The Ansible command being processed.
 | |
|         :param string resource_uri: URI of the resource whose indicator LED is being managed.
 | |
|         """
 | |
|         key = "IndicatorLED"
 | |
|         if resource_uri is None:
 | |
|             resource_uri = self.root_uri
 | |
|         resource_uri = self.get_uri_with_slot_number_query_param(resource_uri)
 | |
| 
 | |
|         payloads = {
 | |
|             'IndicatorLedOn': {
 | |
|                 'ID': 2
 | |
|             },
 | |
|             'IndicatorLedOff': {
 | |
|                 'ID': 4
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         response = self.get_request(resource_uri)
 | |
|         if 'etag' not in response['headers']:
 | |
|             return {'ret': False, 'msg': 'Etag not found in response.'}
 | |
|         etag = response['headers']['etag']
 | |
|         if response['ret'] is False:
 | |
|             return response
 | |
|         data = response['data']
 | |
|         if key not in data:
 | |
|             return {'ret': False, 'msg': "Key %s not found" % key}
 | |
|         if 'ID' not in data[key]:
 | |
|             return {'ret': False, 'msg': 'IndicatorLED for resource has no ID.'}
 | |
| 
 | |
|         if command in payloads.keys():
 | |
|             # See if the LED is already set as requested.
 | |
|             current_led_status = data[key]['ID']
 | |
|             if current_led_status == payloads[command]['ID']:
 | |
|                 return {'ret': True, 'changed': False}
 | |
| 
 | |
|             # Set the LED (unless we are in check mode)
 | |
|             if self.module.check_mode:
 | |
|                 return {
 | |
|                     'ret': True,
 | |
|                     'changed': True,
 | |
|                     'msg': 'Update not performed in check mode.'
 | |
|                 }
 | |
|             payload = {'IndicatorLED': payloads[command]}
 | |
|             response = self.put_request(resource_uri, payload, etag)
 | |
|             if response['ret'] is False:
 | |
|                 return response
 | |
|         else:
 | |
|             return {'ret': False, 'msg': 'Invalid command'}
 | |
| 
 | |
|         return {'ret': True}
 | |
| 
 | |
|     def manage_power_mode(self, command):
 | |
|         key = "PowerState"
 | |
|         resource_uri = self.get_uri_with_slot_number_query_param(self.root_uri)
 | |
| 
 | |
|         payloads = {
 | |
|             "PowerModeNormal": 2,
 | |
|             "PowerModeLow": 4
 | |
|         }
 | |
| 
 | |
|         response = self.get_request(resource_uri)
 | |
|         if 'etag' not in response['headers']:
 | |
|             return {'ret': False, 'msg': 'Etag not found in response.'}
 | |
|         etag = response['headers']['etag']
 | |
|         if response['ret'] is False:
 | |
|             return response
 | |
|         data = response['data']
 | |
|         if key not in data:
 | |
|             return {'ret': False, 'msg': "Key %s not found" % key}
 | |
|         if 'ID' not in data[key]:
 | |
|             return {'ret': False, 'msg': 'PowerState for resource has no ID.'}
 | |
| 
 | |
|         if command in payloads.keys():
 | |
|             # See if the PowerState is already set as requested.
 | |
|             current_power_state = data[key]['ID']
 | |
|             if current_power_state == payloads[command]:
 | |
|                 return {'ret': True, 'changed': False}
 | |
| 
 | |
|             # Set the Power State (unless we are in check mode)
 | |
|             if self.module.check_mode:
 | |
|                 return {
 | |
|                     'ret': True,
 | |
|                     'changed': True,
 | |
|                     'msg': 'Update not performed in check mode.'
 | |
|                 }
 | |
|             payload = {'PowerState': {"ID": payloads[command]}}
 | |
|             response = self.put_request(resource_uri, payload, etag)
 | |
|             if response['ret'] is False:
 | |
|                 return response
 | |
|         else:
 | |
|             return {'ret': False, 'msg': 'Invalid command: ' + command}
 | |
| 
 | |
|         return {'ret': True}
 | |
| 
 | |
|     def prepare_multipart_firmware_upload(self, filename):
 | |
|         """Prepare a multipart/form-data body for OCAPI firmware upload.
 | |
| 
 | |
|         :arg filename: The name of the file to upload.
 | |
|         :returns: tuple of (content_type, body) where ``content_type`` is
 | |
|             the ``multipart/form-data`` ``Content-Type`` header including
 | |
|             ``boundary`` and ``body`` is the prepared bytestring body
 | |
| 
 | |
|         Prepares the body to include "FirmwareFile" field with the contents of the file.
 | |
|         Because some OCAPI targets do not support Base-64 encoding for multipart/form-data,
 | |
|         this method sends the file as binary.
 | |
|         """
 | |
|         boundary = str(uuid.uuid4())  # Generate a random boundary
 | |
|         body = "--" + boundary + '\r\n'
 | |
|         body += 'Content-Disposition: form-data; name="FirmwareFile"; filename="%s"\r\n' % to_native(os.path.basename(filename))
 | |
|         body += 'Content-Type: application/octet-stream\r\n\r\n'
 | |
|         body_bytes = bytearray(body, 'utf-8')
 | |
|         with open(filename, 'rb') as f:
 | |
|             body_bytes += f.read()
 | |
|         body_bytes += bytearray("\r\n--%s--" % boundary, 'utf-8')
 | |
|         return ("multipart/form-data; boundary=%s" % boundary,
 | |
|                 body_bytes)
 | |
| 
 | |
|     def upload_firmware_image(self, update_image_path):
 | |
|         """Perform Firmware Upload to the OCAPI storage device.
 | |
| 
 | |
|         :param str update_image_path: The path/filename of the firmware image, on the local filesystem.
 | |
|         """
 | |
|         if not (os.path.exists(update_image_path) and os.path.isfile(update_image_path)):
 | |
|             return {'ret': False, 'msg': 'File does not exist.'}
 | |
|         url = self.root_uri + "OperatingSystem"
 | |
|         url = self.get_uri_with_slot_number_query_param(url)
 | |
|         content_type, b_form_data = self.prepare_multipart_firmware_upload(update_image_path)
 | |
| 
 | |
|         # Post the firmware (unless we are in check mode)
 | |
|         if self.module.check_mode:
 | |
|             return {
 | |
|                 'ret': True,
 | |
|                 'changed': True,
 | |
|                 'msg': 'Update not performed in check mode.'
 | |
|             }
 | |
|         result = self.post_request(url, b_form_data, content_type=content_type, timeout=300)
 | |
|         if result['ret'] is False:
 | |
|             return result
 | |
|         return {'ret': True}
 | |
| 
 | |
|     def update_firmware_image(self):
 | |
|         """Perform a Firmware Update on the OCAPI storage device."""
 | |
|         resource_uri = self.root_uri
 | |
|         resource_uri = self.get_uri_with_slot_number_query_param(resource_uri)
 | |
|         # We have to do a GET to obtain the Etag.  It's required on the PUT.
 | |
|         response = self.get_request(resource_uri)
 | |
|         if response['ret'] is False:
 | |
|             return response
 | |
|         if 'etag' not in response['headers']:
 | |
|             return {'ret': False, 'msg': 'Etag not found in response.'}
 | |
|         etag = response['headers']['etag']
 | |
| 
 | |
|         # Issue the PUT (unless we are in check mode)
 | |
|         if self.module.check_mode:
 | |
|             return {
 | |
|                 'ret': True,
 | |
|                 'changed': True,
 | |
|                 'msg': 'Update not performed in check mode.'
 | |
|             }
 | |
|         payload = {'FirmwareUpdate': True}
 | |
|         response = self.put_request(resource_uri, payload, etag)
 | |
|         if response['ret'] is False:
 | |
|             return response
 | |
| 
 | |
|         return {'ret': True, 'jobUri': response["headers"]["location"]}
 | |
| 
 | |
|     def activate_firmware_image(self):
 | |
|         """Perform a Firmware Activate on the OCAPI storage device."""
 | |
|         resource_uri = self.root_uri
 | |
|         resource_uri = self.get_uri_with_slot_number_query_param(resource_uri)
 | |
|         # We have to do a GET to obtain the Etag.  It's required on the PUT.
 | |
|         response = self.get_request(resource_uri)
 | |
|         if 'etag' not in response['headers']:
 | |
|             return {'ret': False, 'msg': 'Etag not found in response.'}
 | |
|         etag = response['headers']['etag']
 | |
|         if response['ret'] is False:
 | |
|             return response
 | |
| 
 | |
|         # Issue the PUT (unless we are in check mode)
 | |
|         if self.module.check_mode:
 | |
|             return {
 | |
|                 'ret': True,
 | |
|                 'changed': True,
 | |
|                 'msg': 'Update not performed in check mode.'
 | |
|             }
 | |
|         payload = {'FirmwareActivate': True}
 | |
|         response = self.put_request(resource_uri, payload, etag)
 | |
|         if response['ret'] is False:
 | |
|             return response
 | |
| 
 | |
|         return {'ret': True, 'jobUri': response["headers"]["location"]}
 | |
| 
 | |
|     def get_job_status(self, job_uri):
 | |
|         """Get the status of a job.
 | |
| 
 | |
|         :param str job_uri: The URI of the job's status monitor.
 | |
|         """
 | |
|         job_uri = self.get_uri_with_slot_number_query_param(job_uri)
 | |
|         response = self.get_request(job_uri)
 | |
|         if response['ret'] is False:
 | |
|             if response.get('status') == 404:
 | |
|                 # Job not found -- assume 0%
 | |
|                 return {
 | |
|                     "ret": True,
 | |
|                     "percentComplete": 0,
 | |
|                     "operationStatus": "Not Available",
 | |
|                     "operationStatusId": 1,
 | |
|                     "operationHealth": None,
 | |
|                     "operationHealthId": None,
 | |
|                     "details": "Job does not exist.",
 | |
|                     "jobExists": False
 | |
|                 }
 | |
|             else:
 | |
|                 return response
 | |
|         details = response["data"]["Status"].get("Details")
 | |
|         if isinstance(details, str):
 | |
|             details = [details]
 | |
|         health_list = response["data"]["Status"]["Health"]
 | |
|         return_value = {
 | |
|             "ret": True,
 | |
|             "percentComplete": response["data"]["PercentComplete"],
 | |
|             "operationStatus": response["data"]["Status"]["State"]["Name"],
 | |
|             "operationStatusId": response["data"]["Status"]["State"]["ID"],
 | |
|             "operationHealth": health_list[0]["Name"] if len(health_list) > 0 else None,
 | |
|             "operationHealthId": health_list[0]["ID"] if len(health_list) > 0 else None,
 | |
|             "details": details,
 | |
|             "jobExists": True
 | |
|         }
 | |
|         return return_value
 | |
| 
 | |
|     def delete_job(self, job_uri):
 | |
|         """Delete the OCAPI job referenced by the specified job_uri."""
 | |
|         job_uri = self.get_uri_with_slot_number_query_param(job_uri)
 | |
|         # We have to do a GET to obtain the Etag.  It's required on the DELETE.
 | |
|         response = self.get_request(job_uri)
 | |
| 
 | |
|         if response['ret'] is True:
 | |
|             if 'etag' not in response['headers']:
 | |
|                 return {'ret': False, 'msg': 'Etag not found in response.'}
 | |
|             else:
 | |
|                 etag = response['headers']['etag']
 | |
| 
 | |
|             if response['data']['PercentComplete'] != 100:
 | |
|                 return {
 | |
|                     'ret': False,
 | |
|                     'changed': False,
 | |
|                     'msg': 'Cannot delete job because it is in progress.'
 | |
|                 }
 | |
| 
 | |
|         if response['ret'] is False:
 | |
|             if response['status'] == 404:
 | |
|                 return {
 | |
|                     'ret': True,
 | |
|                     'changed': False,
 | |
|                     'msg': 'Job already deleted.'
 | |
|                 }
 | |
|             return response
 | |
|         if self.module.check_mode:
 | |
|             return {
 | |
|                 'ret': True,
 | |
|                 'changed': True,
 | |
|                 'msg': 'Update not performed in check mode.'
 | |
|             }
 | |
| 
 | |
|         # Do the DELETE (unless we are in check mode)
 | |
|         response = self.delete_request(job_uri, etag)
 | |
|         if response['ret'] is False:
 | |
|             if response['status'] == 404:
 | |
|                 return {
 | |
|                     'ret': True,
 | |
|                     'changed': False
 | |
|                 }
 | |
|             elif response['status'] == 409:
 | |
|                 return {
 | |
|                     'ret': False,
 | |
|                     'changed': False,
 | |
|                     'msg': 'Cannot delete job because it is in progress.'
 | |
|                 }
 | |
|             return response
 | |
|         return {
 | |
|             'ret': True,
 | |
|             'changed': True
 | |
|         }
 |