mirror of
https://github.com/ansible-collections/community.general.git
synced 2025-04-24 19:31:26 -07:00
Galaxy 2.0
This commit is contained in:
parent
0719eb3e2d
commit
4f84769a17
11 changed files with 949 additions and 112 deletions
|
@ -25,11 +25,15 @@ from __future__ import (absolute_import, division, print_function)
|
|||
__metaclass__ = type
|
||||
|
||||
import json
|
||||
import urllib
|
||||
|
||||
from urllib2 import quote as urlquote, HTTPError
|
||||
from urlparse import urlparse
|
||||
|
||||
import ansible.constants as C
|
||||
from ansible.errors import AnsibleError
|
||||
from ansible.module_utils.urls import open_url
|
||||
from ansible.galaxy.token import GalaxyToken
|
||||
|
||||
try:
|
||||
from __main__ import display
|
||||
|
@ -43,45 +47,113 @@ class GalaxyAPI(object):
|
|||
|
||||
SUPPORTED_VERSIONS = ['v1']
|
||||
|
||||
def __init__(self, galaxy, api_server):
|
||||
def __init__(self, galaxy):
|
||||
|
||||
self.galaxy = galaxy
|
||||
self.token = GalaxyToken()
|
||||
self._api_server = C.GALAXY_SERVER
|
||||
self._validate_certs = C.GALAXY_IGNORE_CERTS
|
||||
|
||||
try:
|
||||
urlparse(api_server, scheme='https')
|
||||
except:
|
||||
raise AnsibleError("Invalid server API url passed: %s" % api_server)
|
||||
# set validate_certs
|
||||
if galaxy.options.validate_certs == False:
|
||||
self._validate_certs = False
|
||||
display.vvv('Check for valid certs: %s' % self._validate_certs)
|
||||
|
||||
server_version = self.get_server_api_version('%s/api/' % (api_server))
|
||||
if not server_version:
|
||||
raise AnsibleError("Could not retrieve server API version: %s" % api_server)
|
||||
# set the API server
|
||||
if galaxy.options.api_server != C.GALAXY_SERVER:
|
||||
self._api_server = galaxy.options.api_server
|
||||
display.vvv("Connecting to galaxy_server: %s" % self._api_server)
|
||||
|
||||
server_version = self.get_server_api_version()
|
||||
|
||||
if server_version in self.SUPPORTED_VERSIONS:
|
||||
self.baseurl = '%s/api/%s' % (api_server, server_version)
|
||||
self.baseurl = '%s/api/%s' % (self._api_server, server_version)
|
||||
self.version = server_version # for future use
|
||||
display.vvvvv("Base API: %s" % self.baseurl)
|
||||
display.vvv("Base API: %s" % self.baseurl)
|
||||
else:
|
||||
raise AnsibleError("Unsupported Galaxy server API version: %s" % server_version)
|
||||
|
||||
def get_server_api_version(self, api_server):
|
||||
def __auth_header(self):
|
||||
token = self.token.get()
|
||||
if token is None:
|
||||
raise AnsibleError("No access token. You must first use login to authenticate and obtain an access token.")
|
||||
return {'Authorization': 'Token ' + token}
|
||||
|
||||
def __call_galaxy(self, url, args=None, headers=None, method=None):
|
||||
if args and not headers:
|
||||
headers = self.__auth_header()
|
||||
try:
|
||||
display.vvv(url)
|
||||
resp = open_url(url, data=args, validate_certs=self._validate_certs, headers=headers, method=method)
|
||||
data = json.load(resp)
|
||||
except HTTPError as e:
|
||||
res = json.load(e)
|
||||
raise AnsibleError(res['detail'])
|
||||
return data
|
||||
|
||||
@property
|
||||
def api_server(self):
|
||||
return self._api_server
|
||||
|
||||
@property
|
||||
def validate_certs(self):
|
||||
return self._validate_certs
|
||||
|
||||
def get_server_api_version(self):
|
||||
"""
|
||||
Fetches the Galaxy API current version to ensure
|
||||
the API server is up and reachable.
|
||||
"""
|
||||
#TODO: fix galaxy server which returns current_version path (/api/v1) vs actual version (v1)
|
||||
# also should set baseurl using supported_versions which has path
|
||||
return 'v1'
|
||||
|
||||
try:
|
||||
data = json.load(open_url(api_server, validate_certs=self.galaxy.options.validate_certs))
|
||||
return data.get("current_version", 'v1')
|
||||
except Exception:
|
||||
# TODO: report error
|
||||
return None
|
||||
url = '%s/api/' % self._api_server
|
||||
data = json.load(open_url(url, validate_certs=self._validate_certs))
|
||||
return data['current_version']
|
||||
except Exception as e:
|
||||
raise AnsibleError("The API server (%s) is not responding, please try again later." % url)
|
||||
|
||||
def authenticate(self, github_token):
|
||||
"""
|
||||
Retrieve an authentication token
|
||||
"""
|
||||
url = '%s/tokens/' % self.baseurl
|
||||
args = urllib.urlencode({"github_token": github_token})
|
||||
resp = open_url(url, data=args, validate_certs=self._validate_certs, method="POST")
|
||||
data = json.load(resp)
|
||||
return data
|
||||
|
||||
def create_import_task(self, github_user, github_repo, reference=None):
|
||||
"""
|
||||
Post an import request
|
||||
"""
|
||||
url = '%s/imports/' % self.baseurl
|
||||
args = urllib.urlencode({
|
||||
"github_user": github_user,
|
||||
"github_repo": github_repo,
|
||||
"github_reference": reference if reference else ""
|
||||
})
|
||||
data = self.__call_galaxy(url, args=args)
|
||||
if data.get('results', None):
|
||||
return data['results']
|
||||
return data
|
||||
|
||||
def get_import_task(self, task_id=None, github_user=None, github_repo=None):
|
||||
"""
|
||||
Check the status of an import task.
|
||||
"""
|
||||
url = '%s/imports/' % self.baseurl
|
||||
if not task_id is None:
|
||||
url = "%s?id=%d" % (url,task_id)
|
||||
elif not github_user is None and not github_repo is None:
|
||||
url = "%s?github_user=%s&github_repo=%s" % (url,github_user,github_repo)
|
||||
else:
|
||||
raise AnsibleError("Expected task_id or github_user and github_repo")
|
||||
|
||||
data = self.__call_galaxy(url)
|
||||
return data['results']
|
||||
|
||||
def lookup_role_by_name(self, role_name, notify=True):
|
||||
"""
|
||||
Find a role by name
|
||||
Find a role by name.
|
||||
"""
|
||||
role_name = urlquote(role_name)
|
||||
|
||||
|
@ -92,18 +164,12 @@ class GalaxyAPI(object):
|
|||
if notify:
|
||||
display.display("- downloading role '%s', owned by %s" % (role_name, user_name))
|
||||
except:
|
||||
raise AnsibleError("- invalid role name (%s). Specify role as format: username.rolename" % role_name)
|
||||
raise AnsibleError("Invalid role name (%s). Specify role as format: username.rolename" % role_name)
|
||||
|
||||
url = '%s/roles/?owner__username=%s&name=%s' % (self.baseurl, user_name, role_name)
|
||||
display.vvvv("- %s" % (url))
|
||||
try:
|
||||
data = json.load(open_url(url, validate_certs=self.galaxy.options.validate_certs))
|
||||
if len(data["results"]) != 0:
|
||||
return data["results"][0]
|
||||
except:
|
||||
# TODO: report on connection/availability errors
|
||||
pass
|
||||
|
||||
data = self.__call_galaxy(url)
|
||||
if len(data["results"]) != 0:
|
||||
return data["results"][0]
|
||||
return None
|
||||
|
||||
def fetch_role_related(self, related, role_id):
|
||||
|
@ -114,13 +180,12 @@ class GalaxyAPI(object):
|
|||
|
||||
try:
|
||||
url = '%s/roles/%d/%s/?page_size=50' % (self.baseurl, int(role_id), related)
|
||||
data = json.load(open_url(url, validate_certs=self.galaxy.options.validate_certs))
|
||||
data = self.__call_galaxy(url)
|
||||
results = data['results']
|
||||
done = (data.get('next', None) is None)
|
||||
while not done:
|
||||
url = '%s%s' % (self.baseurl, data['next'])
|
||||
display.display(url)
|
||||
data = json.load(open_url(url, validate_certs=self.galaxy.options.validate_certs))
|
||||
data = self.__call_galaxy(url)
|
||||
results += data['results']
|
||||
done = (data.get('next', None) is None)
|
||||
return results
|
||||
|
@ -131,10 +196,9 @@ class GalaxyAPI(object):
|
|||
"""
|
||||
Fetch the list of items specified.
|
||||
"""
|
||||
|
||||
try:
|
||||
url = '%s/%s/?page_size' % (self.baseurl, what)
|
||||
data = json.load(open_url(url, validate_certs=self.galaxy.options.validate_certs))
|
||||
data = self.__call_galaxy(url)
|
||||
if "results" in data:
|
||||
results = data['results']
|
||||
else:
|
||||
|
@ -144,41 +208,64 @@ class GalaxyAPI(object):
|
|||
done = (data.get('next', None) is None)
|
||||
while not done:
|
||||
url = '%s%s' % (self.baseurl, data['next'])
|
||||
display.display(url)
|
||||
data = json.load(open_url(url, validate_certs=self.galaxy.options.validate_certs))
|
||||
data = self.__call_galaxy(url)
|
||||
results += data['results']
|
||||
done = (data.get('next', None) is None)
|
||||
return results
|
||||
except Exception as error:
|
||||
raise AnsibleError("Failed to download the %s list: %s" % (what, str(error)))
|
||||
|
||||
def search_roles(self, search, platforms=None, tags=None):
|
||||
def search_roles(self, search, **kwargs):
|
||||
|
||||
search_url = self.baseurl + '/roles/?page=1'
|
||||
search_url = self.baseurl + '/search/roles/?'
|
||||
|
||||
if search:
|
||||
search_url += '&search=' + urlquote(search)
|
||||
search_url += '&autocomplete=' + urlquote(search)
|
||||
|
||||
if tags is None:
|
||||
tags = []
|
||||
elif isinstance(tags, basestring):
|
||||
tags = kwargs.get('tags',None)
|
||||
platforms = kwargs.get('platforms', None)
|
||||
page_size = kwargs.get('page_size', None)
|
||||
author = kwargs.get('author', None)
|
||||
|
||||
if tags and isinstance(tags, basestring):
|
||||
tags = tags.split(',')
|
||||
|
||||
for tag in tags:
|
||||
search_url += '&chain__tags__name=' + urlquote(tag)
|
||||
|
||||
if platforms is None:
|
||||
platforms = []
|
||||
elif isinstance(platforms, basestring):
|
||||
search_url += '&tags_autocomplete=' + '+'.join(tags)
|
||||
|
||||
if platforms and isinstance(platforms, basestring):
|
||||
platforms = platforms.split(',')
|
||||
search_url += '&platforms_autocomplete=' + '+'.join(platforms)
|
||||
|
||||
for plat in platforms:
|
||||
search_url += '&chain__platforms__name=' + urlquote(plat)
|
||||
|
||||
display.debug("Executing query: %s" % search_url)
|
||||
try:
|
||||
data = json.load(open_url(search_url, validate_certs=self.galaxy.options.validate_certs))
|
||||
except HTTPError as e:
|
||||
raise AnsibleError("Unsuccessful request to server: %s" % str(e))
|
||||
if page_size:
|
||||
search_url += '&page_size=%s' % page_size
|
||||
|
||||
if author:
|
||||
search_url += '&username_autocomplete=%s' % author
|
||||
|
||||
data = self.__call_galaxy(search_url)
|
||||
return data
|
||||
|
||||
def add_secret(self, source, github_user, github_repo, secret):
|
||||
url = "%s/notification_secrets/" % self.baseurl
|
||||
args = urllib.urlencode({
|
||||
"source": source,
|
||||
"github_user": github_user,
|
||||
"github_repo": github_repo,
|
||||
"secret": secret
|
||||
})
|
||||
data = self.__call_galaxy(url, args=args)
|
||||
return data
|
||||
|
||||
def list_secrets(self):
|
||||
url = "%s/notification_secrets" % self.baseurl
|
||||
data = self.__call_galaxy(url, headers=self.__auth_header())
|
||||
return data
|
||||
|
||||
def remove_secret(self, secret_id):
|
||||
url = "%s/notification_secrets/%s/" % (self.baseurl, secret_id)
|
||||
data = self.__call_galaxy(url, headers=self.__auth_header(), method='DELETE')
|
||||
return data
|
||||
|
||||
def delete_role(self, github_user, github_repo):
|
||||
url = "%s/removerole/?github_user=%s&github_repo=%s" % (self.baseurl,github_user,github_repo)
|
||||
data = self.__call_galaxy(url, headers=self.__auth_header(), method='DELETE')
|
||||
return data
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue