Better handling of malformed vault data envelope (#32515)

* Better handling of malformed vault data envelope

If an embedded vaulted variable ('!vault' in yaml)
had an invalid format, it would eventually cause
an error for seemingly unrelated reasons.
"Invalid" meaning not valid hexlify (extra chars,
non-hex chars, etc).

For ex, if a host_vars file had invalid vault format
variables, on py2, it would cause an error like:

  'ansible.vars.hostvars.HostVars object' has no
  attribute u'broken.example.com'

Depending on where the invalid vault is, it could
also cause "VARIABLE IS NOT DEFINED!". The behavior
can also change if ansible-playbook is py2 or py3.

Root cause is errors from binascii.unhexlify() not
being handled consistently.

Fix is to add a AnsibleVaultFormatError exception and
raise it on any unhexlify() errors and to handle it
properly elsewhere.

Add a _unhexlify() that try/excepts around a binascii.unhexlify()
and raises an AnsibleVaultFormatError on invalid vault data.
This is so the same exception type is always raised for this
case. Previous it was different between py2 and py3.

binascii.unhexlify() raises a binascii.Error if the hexlified
blobs in a vault data blob are invalid.

On py2, binascii.Error is a subclass of Exception.
On py3, binascii.Error is a subclass of TypeError

When decrypting content of vault encrypted variables,
if a binascii.Error is raised it propagates up to
playbook.base.Base.post_validate(). post_validate()
handles exceptions for TypeErrors but not for
base Exception subclasses (like py2 binascii.Error).

* Add a display.warning on vault format errors
* Unit tests for _unhexlify, parse_vaulttext*
* Add intg test cases for invalid vault formats

Fixes #28038
This commit is contained in:
Adrian Likins 2017-11-10 14:24:56 -05:00 committed by GitHub
commit 9c58827410
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 220 additions and 24 deletions

View file

@ -29,6 +29,7 @@ import tempfile
import warnings
from binascii import hexlify
from binascii import unhexlify
from binascii import Error as BinasciiError
from hashlib import md5
from hashlib import sha256
from io import BytesIO
@ -105,6 +106,10 @@ class AnsibleVaultPasswordError(AnsibleVaultError):
pass
class AnsibleVaultFormatError(AnsibleError):
pass
def is_encrypted(data):
""" Test if this is vault encrypted data blob
@ -148,20 +153,7 @@ def is_encrypted_file(file_obj, start_pos=0, count=-1):
file_obj.seek(current_position)
def parse_vaulttext_envelope(b_vaulttext_envelope, default_vault_id=None):
"""Retrieve information about the Vault and clean the data
When data is saved, it has a header prepended and is formatted into 80
character lines. This method extracts the information from the header
and then removes the header and the inserted newlines. The string returned
is suitable for processing by the Cipher classes.
:arg b_vaulttext: byte str containing the data from a save file
:returns: a byte str suitable for passing to a Cipher class's
decrypt() function.
"""
# used by decrypt
default_vault_id = default_vault_id or C.DEFAULT_VAULT_IDENTITY
def _parse_vaulttext_envelope(b_vaulttext_envelope, default_vault_id=None):
b_tmpdata = b_vaulttext_envelope.splitlines()
b_tmpheader = b_tmpdata[0].strip().split(b';')
@ -169,7 +161,6 @@ def parse_vaulttext_envelope(b_vaulttext_envelope, default_vault_id=None):
b_version = b_tmpheader[1].strip()
cipher_name = to_text(b_tmpheader[2].strip())
vault_id = default_vault_id
# vault_id = None
# Only attempt to find vault_id if the vault file is version 1.2 or newer
# if self.b_version == b'1.2':
@ -181,6 +172,37 @@ def parse_vaulttext_envelope(b_vaulttext_envelope, default_vault_id=None):
return b_ciphertext, b_version, cipher_name, vault_id
def parse_vaulttext_envelope(b_vaulttext_envelope, default_vault_id=None, filename=None):
"""Parse the vaulttext envelope
When data is saved, it has a header prepended and is formatted into 80
character lines. This method extracts the information from the header
and then removes the header and the inserted newlines. The string returned
is suitable for processing by the Cipher classes.
:arg b_vaulttext: byte str containing the data from a save file
:kwarg default_vault_id: The vault_id name to use if the vaulttext does not provide one.
:kwarg filename: The filename that the data came from. This is only
used to make better error messages in case the data cannot be
decrypted. This is optional.
:returns: A tuple of byte str of the vaulttext suitable to pass to parse_vaultext,
a byte str of the vault format version,
the name of the cipher used, and the vault_id.
:raises: AnsibleVaultFormatError: if the vaulttext_envelope format is invalid
"""
# used by decrypt
default_vault_id = default_vault_id or C.DEFAULT_VAULT_IDENTITY
try:
return _parse_vaulttext_envelope(b_vaulttext_envelope, default_vault_id)
except Exception as exc:
msg = "Vault envelope format error"
if filename:
msg += ' in %s' % (filename)
msg += ': %s' % exc
raise AnsibleVaultFormatError(msg)
def format_vaulttext_envelope(b_ciphertext, cipher_name, version=None, vault_id=None):
""" Add header and format to 80 columns
@ -222,6 +244,41 @@ def format_vaulttext_envelope(b_ciphertext, cipher_name, version=None, vault_id=
return b_vaulttext
def _unhexlify(b_data):
try:
return unhexlify(b_data)
except (BinasciiError, TypeError) as exc:
raise AnsibleVaultFormatError('Vault format unhexlify error: %s' % exc)
def _parse_vaulttext(b_vaulttext):
b_vaulttext = _unhexlify(b_vaulttext)
b_salt, b_crypted_hmac, b_ciphertext = b_vaulttext.split(b"\n", 2)
b_salt = _unhexlify(b_salt)
b_ciphertext = _unhexlify(b_ciphertext)
return b_ciphertext, b_salt, b_crypted_hmac
def parse_vaulttext(b_vaulttext):
"""Parse the vaulttext
:arg b_vaulttext: byte str containing the vaulttext (ciphertext, salt, crypted_hmac)
:returns: A tuple of byte str of the ciphertext suitable for passing to a
Cipher class's decrypt() function, a byte str of the salt,
and a byte str of the crypted_hmac
:raises: AnsibleVaultFormatError: if the vaulttext format is invalid
"""
# SPLIT SALT, DIGEST, AND DATA
try:
return _parse_vaulttext(b_vaulttext)
except AnsibleVaultFormatError:
raise
except Exception as exc:
msg = "Vault vaulttext format error: %s" % exc
raise AnsibleVaultFormatError(msg)
def verify_secret_is_not_empty(secret, msg=None):
'''Check the secret against minimal requirements.
@ -609,7 +666,8 @@ class VaultLib:
msg += "%s is not a vault encrypted file" % filename
raise AnsibleError(msg)
b_vaulttext, dummy, cipher_name, vault_id = parse_vaulttext_envelope(b_vaulttext)
b_vaulttext, dummy, cipher_name, vault_id = parse_vaulttext_envelope(b_vaulttext,
filename=filename)
# create the cipher object, note that the cipher used for decrypt can
# be different than the cipher used for encrypt
@ -665,6 +723,13 @@ class VaultLib:
vault_id_used = vault_secret_id
display.vvvvv('decrypt succesful with secret=%s and vault_id=%s' % (vault_secret, vault_secret_id))
break
except AnsibleVaultFormatError as exc:
msg = "There was a vault format error"
if filename:
msg += ' in %s' % (filename)
msg += ': %s' % exc
display.warning(msg)
raise
except AnsibleError as e:
display.vvvv('Tried to use the vault secret (%s) to decrypt (%s) but it failed. Error: %s' %
(vault_secret_id, filename, e))
@ -862,7 +927,8 @@ class VaultEditor:
# Figure out the vault id from the file, to select the right secret to re-encrypt it
# (duplicates parts of decrypt, but alas...)
dummy, dummy, cipher_name, vault_id = parse_vaulttext_envelope(b_vaulttext)
dummy, dummy, cipher_name, vault_id = parse_vaulttext_envelope(b_vaulttext,
filename=filename)
# vault id here may not be the vault id actually used for decrypting
# as when the edited file has no vault-id but is decrypted by non-default id in secrets
@ -1136,7 +1202,7 @@ class VaultAES:
'switch to the newer VaultAES256 format', version='2.3')
# http://stackoverflow.com/a/14989032
b_vaultdata = unhexlify(b_vaulttext)
b_vaultdata = _unhexlify(b_vaulttext)
b_salt = b_vaultdata[len(b'Salted__'):16]
b_ciphertext = b_vaultdata[16:]
@ -1289,7 +1355,7 @@ class VaultAES256:
hmac = HMAC(b_key2, hashes.SHA256(), CRYPTOGRAPHY_BACKEND)
hmac.update(b_ciphertext)
try:
hmac.verify(unhexlify(b_crypted_hmac))
hmac.verify(_unhexlify(b_crypted_hmac))
except InvalidSignature as e:
raise AnsibleVaultError('HMAC verification failed: %s' % e)
@ -1351,11 +1417,8 @@ class VaultAES256:
@classmethod
def decrypt(cls, b_vaulttext, secret):
# SPLIT SALT, DIGEST, AND DATA
b_vaulttext = unhexlify(b_vaulttext)
b_salt, b_crypted_hmac, b_ciphertext = b_vaulttext.split(b"\n", 2)
b_salt = unhexlify(b_salt)
b_ciphertext = unhexlify(b_ciphertext)
b_ciphertext, b_salt, b_crypted_hmac = parse_vaulttext(b_vaulttext)
# TODO: would be nice if a VaultSecret could be passed directly to _decrypt_*
# (move _gen_key_initctr() to a AES256 VaultSecret or VaultContext impl?)