ACI: Add signature-based authentication (#34451)

ACI: Add signature-based authentication
This commit is contained in:
Dag Wieers 2018-01-08 00:44:30 +01:00 committed by GitHub
commit 49739dda47
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 130 additions and 30 deletions

View file

@ -30,12 +30,21 @@
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import base64
import json
import os
from copy import deepcopy
from ansible.module_utils.urls import fetch_url
from ansible.module_utils._text import to_bytes
# Optional, only used for APIC signature-based authentication
try:
from OpenSSL.crypto import FILETYPE_PEM, load_privatekey, sign
HAS_OPENSSL = True
except ImportError:
HAS_OPENSSL = False
# Optional, only used for XML payload
try:
import lxml.etree
@ -54,7 +63,9 @@ except ImportError:
aci_argument_spec = dict(
hostname=dict(type='str', required=True, aliases=['host']),
username=dict(type='str', default='admin', aliases=['user']),
password=dict(type='str', required=True, no_log=True),
password=dict(type='str', no_log=True),
private_key=dict(type='path', aliases=['cert_key']), # Beware, this is not the same as client_key !
certificate_name=dict(type='str', aliases=['cert_name']), # Beware, this is not the same as client_cert !
protocol=dict(type='str', removed_in_version='2.6'), # Deprecated in v2.6
timeout=dict(type='int', default=30),
use_proxy=dict(type='bool', default=True),
@ -106,6 +117,7 @@ def aci_response_error(result):
''' Set error information when found '''
result['error_code'] = 0
result['error_text'] = 'Success'
# Handle possible APIC error information
if result['totalCount'] != '0':
try:
@ -157,9 +169,20 @@ class ACIModule(object):
self.module = module
self.params = module.params
self.result = dict(changed=False)
self.headers = None
self.headers = dict()
self.login()
# Ensure protocol is set
self.define_protocol()
if self.params['private_key'] is None:
if self.params['password'] is None:
self.module.fail(msg="Parameter 'password' is required for HTTP authentication")
# Only log in when password-based authentication is used
self.login()
elif not HAS_OPENSSL:
self.module.fail_json(msg='Cannot use signature-based authentication because pyopenssl is not available')
elif self.params['password'] is not None:
self.module.warn('When doing ACI signatured-based authentication, a password is not required')
def define_protocol(self):
''' Set protocol based on use_ssl parameter '''
@ -189,9 +212,6 @@ class ACIModule(object):
def login(self):
''' Log in to APIC '''
# Ensure protocol is set (only do this once)
self.define_protocol()
# Perform login request
url = '%(protocol)s://%(hostname)s/api/aaaLogin.json' % self.params
payload = {'aaaUser': {'attributes': {'name': self.params['username'], 'pwd': self.params['password']}}}
@ -214,16 +234,53 @@ class ACIModule(object):
self.module.fail_json(msg='Authentication failed for %(url)s. %(msg)s' % auth)
# Retain cookie for later use
self.headers = dict(Cookie=resp.headers['Set-Cookie'])
self.headers['Cookie'] = resp.headers['Set-Cookie']
def cert_auth(self, path=None, payload='', method=None):
''' Perform APIC signature-based authentication, not the expected SSL client certificate authentication. '''
if method is None:
method = self.params['method'].upper()
# NOTE: ACI documentation incorrectly uses complete URL
if path is None:
path = self.result['path']
path = '/' + path.lstrip('/')
if payload is None:
payload = ''
# Use the private key basename (without extension) as certificate_name
if self.params['certificate_name'] is None:
self.params['certificate_name'] = os.path.basename(os.path.splitext(self.params['private_key'])[0])
try:
sig_key = load_privatekey(FILETYPE_PEM, open(self.params['private_key'], 'r').read())
except:
self.module.fail_json(msg='Cannot load private key %s' % self.params['private_key'])
# NOTE: ACI documentation incorrectly adds a space between method and path
sig_request = method + path + payload
sig_signature = base64.b64encode(sign(sig_key, sig_request, 'sha256'))
sig_dn = 'uni/userext/user-%s/usercert-%s' % (self.params['username'], self.params['certificate_name'])
self.headers['Cookie'] = 'APIC-Certificate-Algorithm=v1.0; ' +\
'APIC-Certificate-DN=%s; ' % sig_dn +\
'APIC-Certificate-Fingerprint=fingerprint; ' +\
'APIC-Request-Signature=%s' % sig_signature
def request(self, path, payload=None):
''' Perform a REST request '''
# Ensure method is set (only do this once)
self.define_method()
self.result['path'] = path
self.result['url'] = '%(protocol)s://%(hostname)s/' % self.params + path.lstrip('/')
# Sign and encode request as to APIC's wishes
if self.params['private_key'] is not None:
self.cert_auth(path=path, payload=payload)
# Perform request
self.result['url'] = '%(protocol)s://%(hostname)s/' % self.params + path.lstrip('/')
resp, info = fetch_url(self.module, self.result['url'],
data=payload,
headers=self.headers,
@ -248,8 +305,17 @@ class ACIModule(object):
def query(self, path):
''' Perform a query with no payload '''
url = '%(protocol)s://%(hostname)s/' % self.params + path.lstrip('/')
resp, query = fetch_url(self.module, url,
# Ensure method is set
self.result['path'] = path
self.result['url'] = '%(protocol)s://%(hostname)s/' % self.params + path.lstrip('/')
# Sign and encode request as to APIC's wishes
if self.params['private_key'] is not None:
self.cert_auth(path=path, method='GET')
# Perform request
resp, query = fetch_url(self.module, self.result['url'],
data=None,
headers=self.headers,
method='GET',
@ -314,6 +380,7 @@ class ACIModule(object):
else:
path, filter_string = self._construct_url_1(root_class, child_includes)
self.result['path'] = path
self.result['url'] = '{}://{}/{}'.format(self.module.params['protocol'], self.module.params['hostname'], path)
self.result['filter_string'] = filter_string
@ -508,6 +575,10 @@ class ACIModule(object):
return
elif not self.module.check_mode:
# Sign and encode request as to APIC's wishes
if self.params['private_key'] is not None:
self.cert_auth(method='DELETE')
resp, info = fetch_url(self.module, self.result['url'],
headers=self.headers,
method='DELETE',
@ -639,6 +710,10 @@ class ACIModule(object):
"""
uri = self.result['url'] + self.result['filter_string']
# Sign and encode request as to APIC's wishes
if self.params['private_key'] is not None:
self.cert_auth(path=self.result['path'] + self.result['filter_string'], method='GET')
resp, info = fetch_url(self.module, uri,
headers=self.headers,
method='GET',
@ -732,6 +807,10 @@ class ACIModule(object):
if not self.result['config']:
return
elif not self.module.check_mode:
# Sign and encode request as to APIC's wishes
if self.params['private_key'] is not None:
self.cert_auth(method='POST', payload=json.dumps(self.result['config']))
resp, info = fetch_url(self.module, self.result['url'],
data=json.dumps(self.result['config']),
headers=self.headers,