Implement vault encrypted yaml variables. (#16274)

Make !vault-encrypted create a AnsibleVaultUnicode
yaml object that can be used as a regular string object.

This allows a playbook to include a encrypted vault
blob for the value of a yaml variable. A 'secret_password'
variable can have it's value encrypted instead of having
to vault encrypt an entire vars file.

Add __ENCRYPTED__ to the vault yaml types so
template.Template can treat it similar
to __UNSAFE__ flags.

vault.VaultLib api changes:
    - Split VaultLib.encrypt to encrypt and encrypt_bytestring

    - VaultLib.encrypt() previously accepted the plaintext data
      as either a byte string or a unicode string.
      Doing the right thing based on the input type would fail
      on py3 if given a arg of type 'bytes'. To simplify the
      API, vaultlib.encrypt() now assumes input plaintext is a
      py2 unicode or py3 str. It will encode to utf-8 then call
      the new encrypt_bytestring(). The new methods are less
      ambiguous.

    - moved VaultLib.is_encrypted logic to vault module scope
      and split to is_encrypted() and is_encrypted_file().

Add a test/unit/mock/yaml_helper.py
It has some helpers for testing parsing/yaml

Integration tests added as roles test_vault and test_vault_embedded
This commit is contained in:
Adrian Likins 2016-08-23 20:03:11 -04:00 committed by GitHub
parent dbf7df4439
commit e396d5d508
21 changed files with 934 additions and 111 deletions

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
@ -19,14 +20,12 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import getpass
import os
import shutil
import time
import tempfile
import six
from binascii import unhexlify
import binascii
import io
import os
from binascii import hexlify
from nose.plugins.skip import SkipTest
@ -35,6 +34,7 @@ from ansible.utils.unicode import to_bytes, to_unicode
from ansible import errors
from ansible.parsing.vault import VaultLib
from ansible.parsing import vault
# Counter import fails for 2.0.1, requires >= 2.6.1 from pip
try:
@ -57,6 +57,150 @@ try:
except ImportError:
HAS_AES = False
class TestVaultIsEncrypted(unittest.TestCase):
def test_utf8_not_encrypted(self):
b_data = "foobar".encode('utf8')
self.assertFalse(vault.is_encrypted(b_data))
def test_utf8_encrypted(self):
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
b_data = data.encode('utf8')
self.assertTrue(vault.is_encrypted(b_data))
def test_bytes_not_encrypted(self):
b_data = b"foobar"
self.assertFalse(vault.is_encrypted(b_data))
def test_bytes_encrypted(self):
b_data = b"$ANSIBLE_VAULT;9.9;TEST\n%s" + hexlify(b"ansible")
self.assertTrue(vault.is_encrypted(b_data))
def test_unicode_not_encrypted_py3(self):
if not six.PY3:
raise SkipTest()
data = u"ァ ア ィ イ ゥ ウ ェ エ ォ オ カ ガ キ ギ ク グ ケ "
self.assertRaises(TypeError, vault.is_encrypted, data)
def test_unicode_not_encrypted_py2(self):
if six.PY3:
raise SkipTest()
data = u"ァ ア ィ イ ゥ ウ ェ エ ォ オ カ ガ キ ギ ク グ ケ "
# py2 will take a unicode string, but that should always fails
self.assertFalse(vault.is_encrypted(data))
def test_unicode_is_encrypted_py3(self):
if not six.PY3:
raise SkipTest()
data = "$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
# should still be a type error
self.assertRaises(TypeError, vault.is_encrypted, data)
def test_unicode_is_encrypted_py2(self):
if six.PY3:
raise SkipTest()
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
# THis works, but arguably shouldn't...
self.assertTrue(vault.is_encrypted(data))
class TestVaultIsEncryptedFile(unittest.TestCase):
def test_utf8_not_encrypted(self):
b_data = "foobar".encode('utf8')
b_data_fo = io.BytesIO(b_data)
self.assertFalse(vault.is_encrypted_file(b_data_fo))
def test_utf8_encrypted(self):
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
b_data = data.encode('utf8')
b_data_fo = io.BytesIO(b_data)
self.assertTrue(vault.is_encrypted_file(b_data_fo))
def test_bytes_not_encrypted(self):
b_data = b"foobar"
b_data_fo = io.BytesIO(b_data)
self.assertFalse(vault.is_encrypted_file(b_data_fo))
def test_bytes_encrypted(self):
b_data = b"$ANSIBLE_VAULT;9.9;TEST\n%s" + hexlify(b"ansible")
b_data_fo = io.BytesIO(b_data)
self.assertTrue(vault.is_encrypted_file(b_data_fo))
class TestVaultCipherAes256(unittest.TestCase):
def test(self):
vault_cipher = vault.VaultAES256()
self.assertIsInstance(vault_cipher, vault.VaultAES256)
# TODO: tag these as slow tests
def test_create_key(self):
vault_cipher = vault.VaultAES256()
password = 'hunter42'
b_salt = os.urandom(32)
b_key = vault_cipher.create_key(password=password, salt=b_salt, keylength=32, ivlength=16)
self.assertIsInstance(b_key, six.binary_type)
def test_create_key_known(self):
vault_cipher = vault.VaultAES256()
password = 'hunter42'
# A fixed salt
b_salt = b'q' * 32 # q is the most random letter.
b_key = vault_cipher.create_key(password=password, salt=b_salt, keylength=32, ivlength=16)
self.assertIsInstance(b_key, six.binary_type)
# verify we get the same answer
# we could potentially run a few iterations of this and time it to see if it's roughly constant time
# and or that it exceeds some minimal time, but that would likely cause unreliable fails, esp in CI
b_key_2 = vault_cipher.create_key(password=password, salt=b_salt, keylength=32, ivlength=16)
self.assertIsInstance(b_key, six.binary_type)
self.assertEqual(b_key, b_key_2)
def test_is_equal_is_equal(self):
vault_cipher = vault.VaultAES256()
res = vault_cipher.is_equal(b'abcdefghijklmnopqrstuvwxyz', b'abcdefghijklmnopqrstuvwxyz')
self.assertTrue(res)
def test_is_equal_unequal_length(self):
vault_cipher = vault.VaultAES256()
res = vault_cipher.is_equal(b'abcdefghijklmnopqrstuvwxyz', b'abcdefghijklmnopqrstuvwx and sometimes y')
self.assertFalse(res)
def test_is_equal_not_equal(self):
vault_cipher = vault.VaultAES256()
res = vault_cipher.is_equal(b'abcdefghijklmnopqrstuvwxyz', b'AbcdefghijKlmnopQrstuvwxZ')
self.assertFalse(res)
def test_is_equal_empty(self):
vault_cipher = vault.VaultAES256()
res = vault_cipher.is_equal(b'', b'')
self.assertTrue(res)
# NOTE: I'm not really sure what the method should do if it doesn't get bytes,
# but this at least sees if it explodes (maybe it should?)
def test_is_equal_unicode_py3(self):
if not six.PY3:
raise SkipTest
vault_cipher = vault.VaultAES256()
self.assertRaises(TypeError, vault_cipher.is_equal,
u'私はガラスを食べられます。それは私を傷つけません。',
u'私はガラスを食べられます。それは私を傷つけません。')
def test_is_equal_unicode_py2(self):
if not six.PY2:
raise SkipTest
vault_cipher = vault.VaultAES256()
res = vault_cipher.is_equal(u'私はガラスを食べられます。それは私を傷つけません。',
u'私はガラスを食べられます。それは私を傷つけません。')
self.assertTrue(res)
def test_is_equal_unicode_different(self):
vault_cipher = vault.VaultAES256()
res = vault_cipher.is_equal(u'私はガラスを食べられます。それは私を傷つけません。',
u'Pot să mănânc sticlă și ea nu mă rănește.')
self.assertFalse(res)
class TestVaultLib(unittest.TestCase):
def test_methods_exist(self):
@ -69,10 +213,24 @@ class TestVaultLib(unittest.TestCase):
for slot in slots:
assert hasattr(v, slot), "VaultLib is missing the %s method" % slot
def test_encrypt(self):
v = VaultLib(password='the_unit_test_password')
plaintext = u'Some text to encrypt.'
ciphertext = v.encrypt(plaintext)
self.assertIsInstance(ciphertext, (bytes, str))
# TODO: assert something...
def test_is_encrypted(self):
v = VaultLib(None)
assert not v.is_encrypted(u"foobar"), "encryption check on plaintext failed"
assert not v.is_encrypted("foobar".encode('utf-8')), "encryption check on plaintext failed"
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
assert v.is_encrypted(data.encode('utf-8')), "encryption check on headered text failed"
def test_is_encrypted_bytes(self):
v = VaultLib(None)
assert not v.is_encrypted(b"foobar"), "encryption check on plaintext failed"
data = b"$ANSIBLE_VAULT;9.9;TEST\n%s" + hexlify(b"ansible")
assert v.is_encrypted(data), "encryption check on headered text failed"
def test_format_output(self):
@ -115,35 +273,82 @@ class TestVaultLib(unittest.TestCase):
raise SkipTest
v = VaultLib('ansible')
v.cipher_name = 'AES256'
enc_data = v.encrypt(b"foobar")
plaintext = "foobar"
enc_data = v.encrypt(plaintext)
dec_data = v.decrypt(enc_data)
assert enc_data != b"foobar", "encryption failed"
assert dec_data == b"foobar", "decryption failed"
def test_encrypt_decrypt_aes256_existing_vault(self):
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
raise SkipTest
v = VaultLib('test-vault-password')
v.cipher_name = 'AES256'
plaintext = b"Setec Astronomy"
enc_data = '''$ANSIBLE_VAULT;1.1;AES256
33363965326261303234626463623963633531343539616138316433353830356566396130353436
3562643163366231316662386565383735653432386435610a306664636137376132643732393835
63383038383730306639353234326630666539346233376330303938323639306661313032396437
6233623062366136310a633866373936313238333730653739323461656662303864663666653563
3138'''
dec_data = v.decrypt(enc_data)
assert dec_data == plaintext, "decryption failed"
def test_encrypt_decrypt_aes256_bad_hmac(self):
# FIXME This test isn't working quite yet.
raise SkipTest
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
raise SkipTest
v = VaultLib('test-vault-password')
v.cipher_name = 'AES256'
# plaintext = "Setec Astronomy"
enc_data = '''$ANSIBLE_VAULT;1.1;AES256
33363965326261303234626463623963633531343539616138316433353830356566396130353436
3562643163366231316662386565383735653432386435610a306664636137376132643732393835
63383038383730306639353234326630666539346233376330303938323639306661313032396437
6233623062366136310a633866373936313238333730653739323461656662303864663666653563
3138'''
b_data = to_bytes(enc_data, errors='strict', encoding='utf-8')
b_data = v._split_header(b_data)
foo = binascii.unhexlify(b_data)
lines = foo.splitlines()
# line 0 is salt, line 1 is hmac, line 2+ is ciphertext
b_salt = lines[0]
b_hmac = lines[1]
b_ciphertext_data = b'\n'.join(lines[2:])
b_ciphertext = binascii.unhexlify(b_ciphertext_data)
# b_orig_ciphertext = b_ciphertext[:]
# now muck with the text
# b_munged_ciphertext = b_ciphertext[:10] + b'\x00' + b_ciphertext[11:]
# b_munged_ciphertext = b_ciphertext
# assert b_orig_ciphertext != b_munged_ciphertext
b_ciphertext_data = binascii.hexlify(b_ciphertext)
b_payload = b'\n'.join([b_salt, b_hmac, b_ciphertext_data])
# reformat
b_invalid_ciphertext = v._format_output(b_payload)
# assert we throw an error
v.decrypt(b_invalid_ciphertext)
def test_encrypt_encrypted(self):
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
raise SkipTest
v = VaultLib('ansible')
v.cipher_name = 'AES'
data = "$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(six.b("ansible"))
error_hit = False
try:
enc_data = v.encrypt(data)
except errors.AnsibleError as e:
error_hit = True
assert error_hit, "No error was thrown when trying to encrypt data with a header"
self.assertRaises(errors.AnsibleError, v.encrypt, data,)
def test_decrypt_decrypted(self):
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
raise SkipTest
v = VaultLib('ansible')
data = "ansible"
error_hit = False
try:
dec_data = v.decrypt(data)
except errors.AnsibleError as e:
error_hit = True
assert error_hit, "No error was thrown when trying to decrypt data without a header"
self.assertRaises(errors.AnsibleError, v.decrypt, data)
def test_cipher_not_set(self):
# not setting the cipher should default to AES256
@ -151,10 +356,5 @@ class TestVaultLib(unittest.TestCase):
raise SkipTest
v = VaultLib('ansible')
data = "ansible"
error_hit = False
try:
enc_data = v.encrypt(data)
except errors.AnsibleError as e:
error_hit = True
assert not error_hit, "An error was thrown when trying to encrypt data without the cipher set"
assert v.cipher_name == "AES256", "cipher name is not set to AES256: %s" % v.cipher_name
v.encrypt(data)
self.assertEquals(v.cipher_name, "AES256")