mirror of
				https://github.com/ansible-collections/community.general.git
				synced 2025-10-25 05:23:58 -07:00 
			
		
		
		
	* Once cli args are parsed, they're constant.  So, save the parsed args
  into the global context for everyone else to use them from now on.
* Port cli scripts to use the CLIARGS in the context
* Refactor call to parse cli args into the run() method
* Fix unittests for changes to the internals of CLI arg parsing
* Port callback plugins to use context.CLIARGS
  * Got rid of the private self._options attribute
  * Use context.CLIARGS in the individual callback plugins instead.
  * Also output positional arguments in default and unixy plugins
  * Code has been simplified since we're now dealing with a dict rather
    than Optparse.Value
		
	
			
		
			
				
	
	
		
			299 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			299 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| ########################################################################
 | |
| #
 | |
| # (C) 2013, James Cammarata <jcammarata@ansible.com>
 | |
| #
 | |
| # This file is part of Ansible
 | |
| #
 | |
| # Ansible is free software: you can redistribute it and/or modify
 | |
| # it under the terms of the GNU General Public License as published by
 | |
| # the Free Software Foundation, either version 3 of the License, or
 | |
| # (at your option) any later version.
 | |
| #
 | |
| # Ansible is distributed in the hope that it will be useful,
 | |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
| # GNU General Public License for more details.
 | |
| #
 | |
| # You should have received a copy of the GNU General Public License
 | |
| # along with Ansible.  If not, see <http://www.gnu.org/licenses/>.
 | |
| #
 | |
| ########################################################################
 | |
| 
 | |
| from __future__ import (absolute_import, division, print_function)
 | |
| __metaclass__ = type
 | |
| 
 | |
| import json
 | |
| 
 | |
| from ansible import context
 | |
| import ansible.constants as C
 | |
| from ansible.errors import AnsibleError
 | |
| from ansible.galaxy.token import GalaxyToken
 | |
| from ansible.module_utils.six import string_types
 | |
| from ansible.module_utils.six.moves.urllib.error import HTTPError
 | |
| from ansible.module_utils.six.moves.urllib.parse import quote as urlquote, urlencode
 | |
| from ansible.module_utils._text import to_bytes, to_native, to_text
 | |
| from ansible.module_utils.urls import open_url
 | |
| from ansible.utils.display import Display
 | |
| 
 | |
| display = Display()
 | |
| 
 | |
| 
 | |
| def g_connect(method):
 | |
|     ''' wrapper to lazily initialize connection info to galaxy '''
 | |
|     def wrapped(self, *args, **kwargs):
 | |
|         if not self.initialized:
 | |
|             display.vvvv("Initial connection to galaxy_server: %s" % self._api_server)
 | |
|             server_version = self._get_server_api_version()
 | |
|             if server_version not in self.SUPPORTED_VERSIONS:
 | |
|                 raise AnsibleError("Unsupported Galaxy server API version: %s" % server_version)
 | |
| 
 | |
|             self.baseurl = '%s/api/%s' % (self._api_server, server_version)
 | |
|             self.version = server_version  # for future use
 | |
|             display.vvvv("Base API: %s" % self.baseurl)
 | |
|             self.initialized = True
 | |
|         return method(self, *args, **kwargs)
 | |
|     return wrapped
 | |
| 
 | |
| 
 | |
| class GalaxyAPI(object):
 | |
|     ''' This class is meant to be used as a API client for an Ansible Galaxy server '''
 | |
| 
 | |
|     SUPPORTED_VERSIONS = ['v1']
 | |
| 
 | |
|     def __init__(self, galaxy):
 | |
|         self.galaxy = galaxy
 | |
|         self.token = GalaxyToken()
 | |
|         self._api_server = C.GALAXY_SERVER
 | |
|         self._validate_certs = not context.CLIARGS['ignore_certs']
 | |
|         self.baseurl = None
 | |
|         self.version = None
 | |
|         self.initialized = False
 | |
| 
 | |
|         display.debug('Validate TLS certificates: %s' % self._validate_certs)
 | |
| 
 | |
|         # set the API server
 | |
|         if context.CLIARGS['api_server'] != C.GALAXY_SERVER:
 | |
|             self._api_server = context.CLIARGS['api_server']
 | |
| 
 | |
|     def __auth_header(self):
 | |
|         token = self.token.get()
 | |
|         if token is None:
 | |
|             raise AnsibleError("No access token. You must first use login to authenticate and obtain an access token.")
 | |
|         return {'Authorization': 'Token ' + token}
 | |
| 
 | |
|     @g_connect
 | |
|     def __call_galaxy(self, url, args=None, headers=None, method=None):
 | |
|         if args and not headers:
 | |
|             headers = self.__auth_header()
 | |
|         try:
 | |
|             display.vvv(url)
 | |
|             resp = open_url(url, data=args, validate_certs=self._validate_certs, headers=headers, method=method,
 | |
|                             timeout=20)
 | |
|             data = json.loads(to_text(resp.read(), errors='surrogate_or_strict'))
 | |
|         except HTTPError as e:
 | |
|             res = json.loads(to_text(e.fp.read(), errors='surrogate_or_strict'))
 | |
|             raise AnsibleError(res['detail'])
 | |
|         return data
 | |
| 
 | |
|     @property
 | |
|     def api_server(self):
 | |
|         return self._api_server
 | |
| 
 | |
|     @property
 | |
|     def validate_certs(self):
 | |
|         return self._validate_certs
 | |
| 
 | |
|     def _get_server_api_version(self):
 | |
|         """
 | |
|         Fetches the Galaxy API current version to ensure
 | |
|         the API server is up and reachable.
 | |
|         """
 | |
|         url = '%s/api/' % self._api_server
 | |
|         try:
 | |
|             return_data = open_url(url, validate_certs=self._validate_certs)
 | |
|         except Exception as e:
 | |
|             raise AnsibleError("Failed to get data from the API server (%s): %s " % (url, to_native(e)))
 | |
| 
 | |
|         try:
 | |
|             data = json.loads(to_text(return_data.read(), errors='surrogate_or_strict'))
 | |
|         except Exception as e:
 | |
|             raise AnsibleError("Could not process data from the API server (%s): %s " % (url, to_native(e)))
 | |
| 
 | |
|         if 'current_version' not in data:
 | |
|             raise AnsibleError("missing required 'current_version' from server response (%s)" % url)
 | |
| 
 | |
|         return data['current_version']
 | |
| 
 | |
|     @g_connect
 | |
|     def authenticate(self, github_token):
 | |
|         """
 | |
|         Retrieve an authentication token
 | |
|         """
 | |
|         url = '%s/tokens/' % self.baseurl
 | |
|         args = urlencode({"github_token": github_token})
 | |
|         resp = open_url(url, data=args, validate_certs=self._validate_certs, method="POST")
 | |
|         data = json.loads(to_text(resp.read(), errors='surrogate_or_strict'))
 | |
|         return data
 | |
| 
 | |
|     @g_connect
 | |
|     def create_import_task(self, github_user, github_repo, reference=None, role_name=None):
 | |
|         """
 | |
|         Post an import request
 | |
|         """
 | |
|         url = '%s/imports/' % self.baseurl
 | |
|         args = {
 | |
|             "github_user": github_user,
 | |
|             "github_repo": github_repo,
 | |
|             "github_reference": reference if reference else ""
 | |
|         }
 | |
|         if role_name:
 | |
|             args['alternate_role_name'] = role_name
 | |
|         elif github_repo.startswith('ansible-role'):
 | |
|             args['alternate_role_name'] = github_repo[len('ansible-role') + 1:]
 | |
|         data = self.__call_galaxy(url, args=urlencode(args), method="POST")
 | |
|         if data.get('results', None):
 | |
|             return data['results']
 | |
|         return data
 | |
| 
 | |
|     @g_connect
 | |
|     def get_import_task(self, task_id=None, github_user=None, github_repo=None):
 | |
|         """
 | |
|         Check the status of an import task.
 | |
|         """
 | |
|         url = '%s/imports/' % self.baseurl
 | |
|         if task_id is not None:
 | |
|             url = "%s?id=%d" % (url, task_id)
 | |
|         elif github_user is not None and github_repo is not None:
 | |
|             url = "%s?github_user=%s&github_repo=%s" % (url, github_user, github_repo)
 | |
|         else:
 | |
|             raise AnsibleError("Expected task_id or github_user and github_repo")
 | |
| 
 | |
|         data = self.__call_galaxy(url)
 | |
|         return data['results']
 | |
| 
 | |
|     @g_connect
 | |
|     def lookup_role_by_name(self, role_name, notify=True):
 | |
|         """
 | |
|         Find a role by name.
 | |
|         """
 | |
|         role_name = to_text(urlquote(to_bytes(role_name)))
 | |
| 
 | |
|         try:
 | |
|             parts = role_name.split(".")
 | |
|             user_name = ".".join(parts[0:-1])
 | |
|             role_name = parts[-1]
 | |
|             if notify:
 | |
|                 display.display("- downloading role '%s', owned by %s" % (role_name, user_name))
 | |
|         except Exception:
 | |
|             raise AnsibleError("Invalid role name (%s). Specify role as format: username.rolename" % role_name)
 | |
| 
 | |
|         url = '%s/roles/?owner__username=%s&name=%s' % (self.baseurl, user_name, role_name)
 | |
|         data = self.__call_galaxy(url)
 | |
|         if len(data["results"]) != 0:
 | |
|             return data["results"][0]
 | |
|         return None
 | |
| 
 | |
|     @g_connect
 | |
|     def fetch_role_related(self, related, role_id):
 | |
|         """
 | |
|         Fetch the list of related items for the given role.
 | |
|         The url comes from the 'related' field of the role.
 | |
|         """
 | |
| 
 | |
|         try:
 | |
|             url = '%s/roles/%s/%s/?page_size=50' % (self.baseurl, role_id, related)
 | |
|             data = self.__call_galaxy(url)
 | |
|             results = data['results']
 | |
|             done = (data.get('next_link', None) is None)
 | |
|             while not done:
 | |
|                 url = '%s%s' % (self._api_server, data['next_link'])
 | |
|                 data = self.__call_galaxy(url)
 | |
|                 results += data['results']
 | |
|                 done = (data.get('next_link', None) is None)
 | |
|             return results
 | |
|         except Exception:
 | |
|             return None
 | |
| 
 | |
|     @g_connect
 | |
|     def get_list(self, what):
 | |
|         """
 | |
|         Fetch the list of items specified.
 | |
|         """
 | |
|         try:
 | |
|             url = '%s/%s/?page_size' % (self.baseurl, what)
 | |
|             data = self.__call_galaxy(url)
 | |
|             if "results" in data:
 | |
|                 results = data['results']
 | |
|             else:
 | |
|                 results = data
 | |
|             done = True
 | |
|             if "next" in data:
 | |
|                 done = (data.get('next_link', None) is None)
 | |
|             while not done:
 | |
|                 url = '%s%s' % (self._api_server, data['next_link'])
 | |
|                 data = self.__call_galaxy(url)
 | |
|                 results += data['results']
 | |
|                 done = (data.get('next_link', None) is None)
 | |
|             return results
 | |
|         except Exception as error:
 | |
|             raise AnsibleError("Failed to download the %s list: %s" % (what, to_native(error)))
 | |
| 
 | |
|     @g_connect
 | |
|     def search_roles(self, search, **kwargs):
 | |
| 
 | |
|         search_url = self.baseurl + '/search/roles/?'
 | |
| 
 | |
|         if search:
 | |
|             search_url += '&autocomplete=' + to_text(urlquote(to_bytes(search)))
 | |
| 
 | |
|         tags = kwargs.get('tags', None)
 | |
|         platforms = kwargs.get('platforms', None)
 | |
|         page_size = kwargs.get('page_size', None)
 | |
|         author = kwargs.get('author', None)
 | |
| 
 | |
|         if tags and isinstance(tags, string_types):
 | |
|             tags = tags.split(',')
 | |
|             search_url += '&tags_autocomplete=' + '+'.join(tags)
 | |
| 
 | |
|         if platforms and isinstance(platforms, string_types):
 | |
|             platforms = platforms.split(',')
 | |
|             search_url += '&platforms_autocomplete=' + '+'.join(platforms)
 | |
| 
 | |
|         if page_size:
 | |
|             search_url += '&page_size=%s' % page_size
 | |
| 
 | |
|         if author:
 | |
|             search_url += '&username_autocomplete=%s' % author
 | |
| 
 | |
|         data = self.__call_galaxy(search_url)
 | |
|         return data
 | |
| 
 | |
|     @g_connect
 | |
|     def add_secret(self, source, github_user, github_repo, secret):
 | |
|         url = "%s/notification_secrets/" % self.baseurl
 | |
|         args = urlencode({
 | |
|             "source": source,
 | |
|             "github_user": github_user,
 | |
|             "github_repo": github_repo,
 | |
|             "secret": secret
 | |
|         })
 | |
|         data = self.__call_galaxy(url, args=args, method="POST")
 | |
|         return data
 | |
| 
 | |
|     @g_connect
 | |
|     def list_secrets(self):
 | |
|         url = "%s/notification_secrets" % self.baseurl
 | |
|         data = self.__call_galaxy(url, headers=self.__auth_header())
 | |
|         return data
 | |
| 
 | |
|     @g_connect
 | |
|     def remove_secret(self, secret_id):
 | |
|         url = "%s/notification_secrets/%s/" % (self.baseurl, secret_id)
 | |
|         data = self.__call_galaxy(url, headers=self.__auth_header(), method='DELETE')
 | |
|         return data
 | |
| 
 | |
|     @g_connect
 | |
|     def delete_role(self, github_user, github_repo):
 | |
|         url = "%s/removerole/?github_user=%s&github_repo=%s" % (self.baseurl, github_user, github_repo)
 | |
|         data = self.__call_galaxy(url, headers=self.__auth_header(), method='DELETE')
 | |
|         return data
 |