mirror of
				https://github.com/ansible-collections/community.general.git
				synced 2025-10-26 05:50:36 -07:00 
			
		
		
		
	* The previous fix for this failed to account for open_url returning a filehandle.
This change fixes the bugs introduced by c6fb355.
* read() from HTTPError for python-3.6+
HTTPError is unusual: it contains a filehandle to read the response from
and also makes the response available via its own read() method.  On earlier
versions of Python (2 and 3), the read() method alone was enough to make it
work with json.load().  The newer version of json.load() needs a more complete
file interface than this, so passing the HTTPError in directly has stopped
working.  Read the bytes, convert them to str, and pass the result in manually
to fix it.
		
	
			
		
			
				
	
	
		
			303 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			303 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python
 | |
| 
 | |
| ########################################################################
 | |
| #
 | |
| # (C) 2013, James Cammarata <jcammarata@ansible.com>
 | |
| #
 | |
| # This file is part of Ansible
 | |
| #
 | |
| # Ansible is free software: you can redistribute it and/or modify
 | |
| # it under the terms of the GNU General Public License as published by
 | |
| # the Free Software Foundation, either version 3 of the License, or
 | |
| # (at your option) any later version.
 | |
| #
 | |
| # Ansible is distributed in the hope that it will be useful,
 | |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
| # GNU General Public License for more details.
 | |
| #
 | |
| # You should have received a copy of the GNU General Public License
 | |
| # along with Ansible.  If not, see <http://www.gnu.org/licenses/>.
 | |
| #
 | |
| ########################################################################
 | |
| 
 | |
| from __future__ import (absolute_import, division, print_function)
 | |
| __metaclass__ = type
 | |
| 
 | |
| import json
 | |
| 
 | |
| import ansible.constants as C
 | |
| from ansible.compat.six import string_types
 | |
| from ansible.compat.six.moves.urllib.error import HTTPError
 | |
| from ansible.compat.six.moves.urllib.parse import quote as urlquote, urlencode
 | |
| from ansible.errors import AnsibleError
 | |
| from ansible.galaxy.token import GalaxyToken
 | |
| from ansible.module_utils._text import to_native, to_text
 | |
| from ansible.module_utils.urls import open_url
 | |
| 
 | |
| try:
 | |
|     from __main__ import display
 | |
| except ImportError:
 | |
|     from ansible.utils.display import Display
 | |
|     display = Display()
 | |
| 
 | |
| 
 | |
def g_connect(method):
    ''' wrapper to lazily initialize connection info to galaxy

    On the first call through any decorated method (while
    ``self.initialized`` is false) the wrapper asks the server for its API
    version via ``self._get_server_api_version()``, verifies it against
    ``self.SUPPORTED_VERSIONS`` and then sets ``self.baseurl``,
    ``self.version`` and ``self.initialized``.  Later calls go straight to
    the wrapped method.

    Raises AnsibleError when the server reports an unsupported API version.
    '''
    # Imported here so the decorator keeps working without a new file-level
    # import; this only runs once per decorated method, at definition time.
    from functools import wraps

    # wraps() keeps the decorated method's __name__ and __doc__ so that
    # introspection and tracebacks do not show every API call as "wrapped".
    @wraps(method)
    def wrapped(self, *args, **kwargs):
        if not self.initialized:
            display.vvvv("Initial connection to galaxy_server: %s" % self._api_server)
            server_version = self._get_server_api_version()
            if server_version not in self.SUPPORTED_VERSIONS:
                raise AnsibleError("Unsupported Galaxy server API version: %s" % server_version)

            self.baseurl = '%s/api/%s' % (self._api_server, server_version)
            self.version = server_version  # for future use
            display.vvvv("Base API: %s" % self.baseurl)
            # Only flagged as initialized once everything above succeeded, so
            # a failed probe is retried on the next call.
            self.initialized = True
        return method(self, *args, **kwargs)
    return wrapped
| 
 | |
| 
 | |
class GalaxyAPI(object):
    ''' This class is meant to be used as a API client for an Ansible Galaxy server '''

    # API versions this client accepts; g_connect raises AnsibleError when
    # the server reports anything else.
    SUPPORTED_VERSIONS = ['v1']

    def __init__(self, galaxy):
        """
        Record the connection settings taken from ``galaxy.options``.

        ``baseurl`` and ``version`` stay ``None`` and ``initialized`` stays
        ``False`` until the g_connect wrapper fills them in.

        :param galaxy: object exposing ``options.ignore_certs`` and
            ``options.api_server``.
        """
        self.galaxy = galaxy
        self.token = GalaxyToken()
        self._api_server = C.GALAXY_SERVER
        self._validate_certs = not galaxy.options.ignore_certs
        self.baseurl = None
        self.version = None
        self.initialized = False

        display.debug('Validate TLS certificates: %s' % self._validate_certs)

        # set the API server
        if galaxy.options.api_server != C.GALAXY_SERVER:
            self._api_server = galaxy.options.api_server

    def __auth_header(self):
        """
        Build the ``Authorization`` header from the stored token.

        :raises AnsibleError: when ``self.token.get()`` returns ``None``.
        """
        token = self.token.get()
        if token is None:
            raise AnsibleError("No access token. You must first use login to authenticate and obtain an access token.")
        return {'Authorization': 'Token ' + token}

    @g_connect
    def __call_galaxy(self, url, args=None, headers=None, method=None):
        """
        Issue a request to ``url`` and return the decoded JSON response.

        :param url: full URL to request.
        :param args: request data handed to ``open_url``; when it is truthy
            and no headers were supplied, the auth header is added.
        :param headers: optional request headers.
        :param method: optional HTTP method passed through to ``open_url``.
        :raises AnsibleError: when the server answers with an HTTP error.
        """
        if args and not headers:
            headers = self.__auth_header()
        try:
            display.vvv(url)
            resp = open_url(url, data=args, validate_certs=self._validate_certs, headers=headers, method=method,
                            timeout=20)
            data = json.loads(to_text(resp.read(), errors='surrogate_or_strict'))
        except HTTPError as e:
            try:
                # Read the bytes and decode them ourselves rather than handing
                # the HTTPError object to the json module.
                res = json.loads(to_text(e.fp.read(), errors='surrogate_or_strict'))
                message = res['detail']
            except (AttributeError, KeyError, TypeError, ValueError):
                # The error body was missing, was not JSON (e.g. an HTML error
                # page) or carried no 'detail' field: still report the failure
                # as an AnsibleError instead of leaking a parsing exception.
                message = "Error when calling the Galaxy API (%s): %s" % (url, to_native(e))
            raise AnsibleError(message)
        return data

    @property
    def api_server(self):
        """The API server URL in use."""
        return self._api_server

    @property
    def validate_certs(self):
        """Whether TLS certificates are validated for requests."""
        return self._validate_certs

    def _get_server_api_version(self):
        """
        Fetches the Galaxy API current version to ensure
        the API server is up and reachable.

        :returns: the ``current_version`` value of the ``/api/`` response.
        :raises AnsibleError: when the request fails, the response cannot be
            decoded, or it lacks ``current_version``.
        """
        url = '%s/api/' % self._api_server
        try:
            return_data = open_url(url, validate_certs=self._validate_certs)
        except Exception as e:
            raise AnsibleError("Failed to get data from the API server (%s): %s " % (url, to_native(e)))

        try:
            data = json.loads(to_text(return_data.read(), errors='surrogate_or_strict'))
        except Exception as e:
            raise AnsibleError("Could not process data from the API server (%s): %s " % (url, to_native(e)))

        if 'current_version' not in data:
            raise AnsibleError("missing required 'current_version' from server response (%s)" % url)

        return data['current_version']

    @g_connect
    def authenticate(self, github_token):
        """
        Retrieve an authentication token

        :param github_token: GitHub token sent to the ``tokens/`` endpoint.
        :returns: the decoded JSON response.
        """
        url = '%s/tokens/' % self.baseurl
        args = urlencode({"github_token": github_token})
        resp = open_url(url, data=args, validate_certs=self._validate_certs, method="POST")
        data = json.loads(to_text(resp.read(), errors='surrogate_or_strict'))
        return data

    @g_connect
    def create_import_task(self, github_user, github_repo, reference=None, role_name=None):
        """
        Post an import request

        :param github_user: GitHub user owning the repository.
        :param github_repo: GitHub repository name.
        :param reference: optional reference; sent as an empty string when
            not given.
        :param role_name: optional alternate role name.  When omitted and the
            repository name starts with ``ansible-role``, the remainder of
            the name (after the following separator character) is used.
        :returns: the ``results`` entry of the response when it is present
            and non-empty, otherwise the whole response.
        """
        url = '%s/imports/' % self.baseurl
        args = {
            "github_user": github_user,
            "github_repo": github_repo,
            "github_reference": reference if reference else ""
        }
        if role_name:
            args['alternate_role_name'] = role_name
        elif github_repo.startswith('ansible-role'):
            # +1 also drops the character that follows the prefix
            args['alternate_role_name'] = github_repo[len('ansible-role') + 1:]
        data = self.__call_galaxy(url, args=urlencode(args))
        if data.get('results', None):
            return data['results']
        return data

    @g_connect
    def get_import_task(self, task_id=None, github_user=None, github_repo=None):
        """
        Check the status of an import task.

        Either ``task_id`` or both ``github_user`` and ``github_repo`` must
        be given; ``task_id`` wins when all are supplied.

        :returns: the ``results`` entry of the response.
        :raises AnsibleError: when neither selector is provided.
        """
        url = '%s/imports/' % self.baseurl
        if task_id is not None:
            # %d: task_id must be numeric
            url = "%s?id=%d" % (url, task_id)
        elif github_user is not None and github_repo is not None:
            url = "%s?github_user=%s&github_repo=%s" % (url, github_user, github_repo)
        else:
            raise AnsibleError("Expected task_id or github_user and github_repo")

        data = self.__call_galaxy(url)
        return data['results']

    @g_connect
    def lookup_role_by_name(self, role_name, notify=True):
        """
        Find a role by name.

        :param role_name: name in ``username.rolename`` form; everything
            before the last dot is treated as the owner.
        :param notify: when true, display a progress message.
        :returns: the first matching result, or ``None`` when there is none.
        """
        role_name = urlquote(role_name)

        try:
            parts = role_name.split(".")
            user_name = ".".join(parts[0:-1])
            role_name = parts[-1]
            if notify:
                display.display("- downloading role '%s', owned by %s" % (role_name, user_name))
        except Exception:
            # Exception (not a bare except) so that KeyboardInterrupt and
            # SystemExit are not turned into an "invalid role name" error.
            raise AnsibleError("Invalid role name (%s). Specify role as format: username.rolename" % role_name)

        url = '%s/roles/?owner__username=%s&name=%s' % (self.baseurl, user_name, role_name)
        data = self.__call_galaxy(url)
        if len(data["results"]) != 0:
            return data["results"][0]
        return None

    @g_connect
    def fetch_role_related(self, related, role_id):
        """
        Fetch the list of related items for the given role.
        The url comes from the 'related' field of the role.

        Follows ``next_link`` until the server stops returning one.

        :returns: the accumulated results, or ``None`` when any step fails
            (this lookup is best-effort).
        """

        try:
            url = '%s/roles/%d/%s/?page_size=50' % (self.baseurl, int(role_id), related)
            data = self.__call_galaxy(url)
            results = data['results']
            done = (data.get('next_link', None) is None)
            while not done:
                url = '%s%s' % (self._api_server, data['next_link'])
                data = self.__call_galaxy(url)
                results += data['results']
                done = (data.get('next_link', None) is None)
            return results
        except Exception as e:
            # Still best-effort, but no longer swallows KeyboardInterrupt /
            # SystemExit, and leaves a trace of what went wrong.
            display.vvvv("Unable to retrieve role (id=%s) data (%s): %s" % (role_id, related, to_native(e)))
            return None

    @g_connect
    def get_list(self, what):
        """
        Fetch the list of items specified.

        :param what: name of the API collection to list.
        :returns: the accumulated ``results`` (or the raw response when it
            has no ``results`` key).
        :raises AnsibleError: when anything fails while downloading.
        """
        try:
            # NOTE(review): 'page_size' is sent without a value - confirm
            # whether the server expects one.
            url = '%s/%s/?page_size' % (self.baseurl, what)
            data = self.__call_galaxy(url)
            if "results" in data:
                results = data['results']
            else:
                results = data
            done = True
            # NOTE(review): the presence of 'next' is tested but 'next_link'
            # is what gets followed - confirm against the API response.
            if "next" in data:
                done = (data.get('next_link', None) is None)
            while not done:
                url = '%s%s' % (self._api_server, data['next_link'])
                data = self.__call_galaxy(url)
                results += data['results']
                done = (data.get('next_link', None) is None)
            return results
        except Exception as error:
            # to_native() instead of str(): safe for non-ASCII messages and
            # consistent with _get_server_api_version.
            raise AnsibleError("Failed to download the %s list: %s" % (what, to_native(error)))

    @g_connect
    def search_roles(self, search, **kwargs):
        """
        Search for roles.

        :param search: free-text search term (skipped when empty).
        :param kwargs: optional filters ``tags`` and ``platforms``
            (comma-separated strings), ``page_size`` and ``author``.
        :returns: the decoded JSON response.
        """

        search_url = self.baseurl + '/search/roles/?'

        if search:
            search_url += '&autocomplete=' + urlquote(search)

        tags = kwargs.get('tags', None)
        platforms = kwargs.get('platforms', None)
        page_size = kwargs.get('page_size', None)
        author = kwargs.get('author', None)

        if tags and isinstance(tags, string_types):
            tags = tags.split(',')
            search_url += '&tags_autocomplete=' + '+'.join(tags)

        if platforms and isinstance(platforms, string_types):
            platforms = platforms.split(',')
            search_url += '&platforms_autocomplete=' + '+'.join(platforms)

        if page_size:
            search_url += '&page_size=%s' % page_size

        if author:
            # Escape the author like the search term so that characters such
            # as '&', '#' or spaces cannot break the query string.
            search_url += '&username_autocomplete=%s' % urlquote(to_native(author))

        data = self.__call_galaxy(search_url)
        return data

    @g_connect
    def add_secret(self, source, github_user, github_repo, secret):
        """
        Register a notification secret.

        The url-encoded fields are passed as request data, so the auth
        header is added by the request helper.

        :returns: the decoded JSON response.
        """
        url = "%s/notification_secrets/" % self.baseurl
        args = urlencode({
            "source": source,
            "github_user": github_user,
            "github_repo": github_repo,
            "secret": secret
        })
        data = self.__call_galaxy(url, args=args)
        return data

    @g_connect
    def list_secrets(self):
        """
        List notification secrets (authenticated request).

        :returns: the decoded JSON response.
        """
        url = "%s/notification_secrets" % self.baseurl
        data = self.__call_galaxy(url, headers=self.__auth_header())
        return data

    @g_connect
    def remove_secret(self, secret_id):
        """
        Delete the notification secret identified by ``secret_id``
        (authenticated DELETE request).

        :returns: the decoded JSON response.
        """
        url = "%s/notification_secrets/%s/" % (self.baseurl, secret_id)
        data = self.__call_galaxy(url, headers=self.__auth_header(), method='DELETE')
        return data

    @g_connect
    def delete_role(self, github_user, github_repo):
        """
        Remove the role for the given GitHub user and repository
        (authenticated DELETE request).

        :returns: the decoded JSON response.
        """
        url = "%s/removerole/?github_user=%s&github_repo=%s" % (self.baseurl, github_user, github_repo)
        data = self.__call_galaxy(url, headers=self.__auth_header(), method='DELETE')
        return data