mirror of
https://github.com/ansible-collections/community.general.git
synced 2025-07-22 04:40:22 -07:00
Many Cleanups to vault
* Make is_encrypted_file handle both files opened in text and binary mode On python3, by default files are opened in text mode. Since we know the encoding of vault files (and especially the header which is the first set of bytes) we can decide whether the file is an encrypted vault file in either case. * Fix is_encrypted_file not resetting the file position * Update is_encrypted_file to check that all the data in the file is ascii * For is_encrypted_file(), add start_pos and count parameters This allows callers to specify reading vaulttext from the middle of a file if necessary. * Combine VaultLib.encrypt() and VaultLib.encrypt_bytestring() * Change vault's is_encrypted() to take either text or byte strings and to return False if any part of the data is non-ascii. * Remove unnecessary use of six.b * Vault Cipher: mark a few methods as private. * VaultAES256._is_equal throws a TypeError if given non byte strings * Make VaultAES256 methods that don't need self staticmethods and classmethods * Mark VaultAES and is_encrypted as deprecated * Get rid of VaultFile (unused and feature implemented in a different way) * Normalize variable and parameter names on plaintext, ciphertext, vaulttext * Normalize variable and parameter names on "b_" prefix when dealing with bytes * Test changes: * Remove redundant tests( both checking the same byte string) * Fix use of format string without format operator * Enable vault editor tests on python3 * Initialize the vault_cipher for VaultAES256 testing in setUp() * Make assertTrue and assertFalse take the actual method calls for better error messages. * Test that non-ascii byte strings compare correctly. * Test that unicode strings and ints raise TypeError * Test-specific: * Removed test_methods_exist(). We only have one VaultLib so the implementation is the assurance that the methods exist. (Can use an abc for this if it changes). * Add tests for both byte string and text string input where the API takes either. * Convert "assert" to unittest assert functions or add a custom message where that will make failures easier to debug. * Move instantiating the VaultLib into setUp().
This commit is contained in:
parent
0139298786
commit
e70066a6f7
4 changed files with 383 additions and 366 deletions
|
@ -1,5 +1,6 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
|
||||
# (c) 2016, Toshio Kuratomi <tkuratomi@ansible.com>
|
||||
#
|
||||
# This file is part of Ansible
|
||||
#
|
||||
|
@ -34,7 +35,7 @@ from ansible.compat.tests import unittest
|
|||
from ansible import errors
|
||||
from ansible.parsing.vault import VaultLib
|
||||
from ansible.parsing import vault
|
||||
from ansible.module_utils._text import to_bytes
|
||||
from ansible.module_utils._text import to_bytes, to_text
|
||||
|
||||
|
||||
# Counter import fails for 2.0.1, requires >= 2.6.1 from pip
|
||||
|
@ -60,241 +61,254 @@ except ImportError:
|
|||
|
||||
|
||||
class TestVaultIsEncrypted(unittest.TestCase):
|
||||
def test_utf8_not_encrypted(self):
|
||||
b_data = "foobar".encode('utf8')
|
||||
self.assertFalse(vault.is_encrypted(b_data))
|
||||
|
||||
def test_utf8_encrypted(self):
|
||||
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
|
||||
b_data = data.encode('utf8')
|
||||
self.assertTrue(vault.is_encrypted(b_data))
|
||||
|
||||
def test_bytes_not_encrypted(self):
|
||||
b_data = b"foobar"
|
||||
self.assertFalse(vault.is_encrypted(b_data))
|
||||
|
||||
def test_bytes_encrypted(self):
|
||||
b_data = b"$ANSIBLE_VAULT;9.9;TEST\n%s" + hexlify(b"ansible")
|
||||
b_data = b"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
|
||||
self.assertTrue(vault.is_encrypted(b_data))
|
||||
|
||||
def test_unicode_not_encrypted_py3(self):
|
||||
if not six.PY3:
|
||||
raise SkipTest()
|
||||
data = u"ァ ア ィ イ ゥ ウ ェ エ ォ オ カ ガ キ ギ ク グ ケ "
|
||||
self.assertRaises(TypeError, vault.is_encrypted, data)
|
||||
def test_text_not_encrypted(self):
|
||||
b_data = to_text(b"foobar")
|
||||
self.assertFalse(vault.is_encrypted(b_data))
|
||||
|
||||
def test_unicode_not_encrypted_py2(self):
|
||||
if six.PY3:
|
||||
raise SkipTest()
|
||||
data = u"ァ ア ィ イ ゥ ウ ェ エ ォ オ カ ガ キ ギ ク グ ケ "
|
||||
# py2 will take a unicode string, but that should always fails
|
||||
def test_text_encrypted(self):
|
||||
b_data = to_text(b"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible"))
|
||||
self.assertTrue(vault.is_encrypted(b_data))
|
||||
|
||||
def test_invalid_text_not_ascii(self):
|
||||
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s"% u"ァ ア ィ イ ゥ ウ ェ エ ォ オ カ ガ キ ギ ク グ ケ "
|
||||
self.assertFalse(vault.is_encrypted(data))
|
||||
|
||||
def test_unicode_is_encrypted_py3(self):
|
||||
if not six.PY3:
|
||||
raise SkipTest()
|
||||
data = "$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
|
||||
# should still be a type error
|
||||
self.assertRaises(TypeError, vault.is_encrypted, data)
|
||||
|
||||
def test_unicode_is_encrypted_py2(self):
|
||||
if six.PY3:
|
||||
raise SkipTest()
|
||||
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
|
||||
# THis works, but arguably shouldn't...
|
||||
self.assertTrue(vault.is_encrypted(data))
|
||||
def test_invalid_bytes_not_ascii(self):
|
||||
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s"% u"ァ ア ィ イ ゥ ウ ェ エ ォ オ カ ガ キ ギ ク グ ケ "
|
||||
b_data = to_bytes(data, encoding='utf-8')
|
||||
self.assertFalse(vault.is_encrypted(b_data))
|
||||
|
||||
|
||||
class TestVaultIsEncryptedFile(unittest.TestCase):
|
||||
def test_utf8_not_encrypted(self):
|
||||
b_data = "foobar".encode('utf8')
|
||||
b_data_fo = io.BytesIO(b_data)
|
||||
self.assertFalse(vault.is_encrypted_file(b_data_fo))
|
||||
|
||||
def test_utf8_encrypted(self):
|
||||
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
|
||||
b_data = data.encode('utf8')
|
||||
b_data_fo = io.BytesIO(b_data)
|
||||
self.assertTrue(vault.is_encrypted_file(b_data_fo))
|
||||
|
||||
def test_bytes_not_encrypted(self):
|
||||
def test_binary_file_handle_not_encrypted(self):
|
||||
b_data = b"foobar"
|
||||
b_data_fo = io.BytesIO(b_data)
|
||||
self.assertFalse(vault.is_encrypted_file(b_data_fo))
|
||||
|
||||
def test_bytes_encrypted(self):
|
||||
b_data = b"$ANSIBLE_VAULT;9.9;TEST\n%s" + hexlify(b"ansible")
|
||||
def test_text_file_handle_not_encrypted(self):
|
||||
data = u"foobar"
|
||||
data_fo = io.StringIO(data)
|
||||
self.assertFalse(vault.is_encrypted_file(data_fo))
|
||||
|
||||
def test_binary_file_handle_encrypted(self):
|
||||
b_data = b"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
|
||||
b_data_fo = io.BytesIO(b_data)
|
||||
self.assertTrue(vault.is_encrypted_file(b_data_fo))
|
||||
|
||||
def test_text_file_handle_encrypted(self):
|
||||
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s" % to_text(hexlify(b"ansible"))
|
||||
data_fo = io.StringIO(data)
|
||||
self.assertTrue(vault.is_encrypted_file(data_fo))
|
||||
|
||||
def test_binary_file_handle_invalid(self):
|
||||
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s" % u"ァ ア ィ イ ゥ ウ ェ エ ォ オ カ ガ キ ギ ク グ ケ "
|
||||
b_data = to_bytes(data)
|
||||
b_data_fo = io.BytesIO(b_data)
|
||||
self.assertFalse(vault.is_encrypted_file(b_data_fo))
|
||||
|
||||
def test_text_file_handle_invalid(self):
|
||||
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s" % u"ァ ア ィ イ ゥ ウ ェ エ ォ オ カ ガ キ ギ ク グ ケ "
|
||||
data_fo = io.StringIO(data)
|
||||
self.assertFalse(vault.is_encrypted_file(data_fo))
|
||||
|
||||
def test_file_already_read_from_finds_header(self):
|
||||
b_data = b"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible\ntesting\nfile pos")
|
||||
b_data_fo = io.BytesIO(b_data)
|
||||
b_data_fo.read(42) # Arbitrary number
|
||||
self.assertTrue(vault.is_encrypted_file(b_data_fo))
|
||||
|
||||
def test_file_already_read_from_saves_file_pos(self):
|
||||
b_data = b"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible\ntesting\nfile pos")
|
||||
b_data_fo = io.BytesIO(b_data)
|
||||
b_data_fo.read(69) # Arbitrary number
|
||||
vault.is_encrypted_file(b_data_fo)
|
||||
self.assertEqual(b_data_fo.tell(), 69)
|
||||
|
||||
def test_file_with_offset(self):
|
||||
b_data = b"JUNK$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible\ntesting\nfile pos")
|
||||
b_data_fo = io.BytesIO(b_data)
|
||||
self.assertTrue(vault.is_encrypted_file(b_data_fo, start_pos=4))
|
||||
|
||||
def test_file_with_count(self):
|
||||
b_data = b"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible\ntesting\nfile pos")
|
||||
vault_length = len(b_data)
|
||||
b_data = b_data + u'ァ ア'.encode('utf-8')
|
||||
b_data_fo = io.BytesIO(b_data)
|
||||
self.assertTrue(vault.is_encrypted_file(b_data_fo, count=vault_length))
|
||||
|
||||
def test_file_with_offset_and_count(self):
|
||||
b_data = b"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible\ntesting\nfile pos")
|
||||
vault_length = len(b_data)
|
||||
b_data = b'JUNK' + b_data + u'ァ ア'.encode('utf-8')
|
||||
b_data_fo = io.BytesIO(b_data)
|
||||
self.assertTrue(vault.is_encrypted_file(b_data_fo, start_pos=4, count=vault_length))
|
||||
|
||||
|
||||
class TestVaultCipherAes256(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.vault_cipher = vault.VaultAES256()
|
||||
|
||||
def test(self):
|
||||
vault_cipher = vault.VaultAES256()
|
||||
self.assertIsInstance(vault_cipher, vault.VaultAES256)
|
||||
self.assertIsInstance(self.vault_cipher, vault.VaultAES256)
|
||||
|
||||
# TODO: tag these as slow tests
|
||||
def test_create_key(self):
|
||||
vault_cipher = vault.VaultAES256()
|
||||
password = 'hunter42'
|
||||
b_password = b'hunter42'
|
||||
b_salt = os.urandom(32)
|
||||
b_key = vault_cipher.create_key(password=password, salt=b_salt, keylength=32, ivlength=16)
|
||||
b_key = self.vault_cipher._create_key(b_password, b_salt, keylength=32, ivlength=16)
|
||||
self.assertIsInstance(b_key, six.binary_type)
|
||||
|
||||
def test_create_key_known(self):
|
||||
vault_cipher = vault.VaultAES256()
|
||||
password = 'hunter42'
|
||||
b_password = b'hunter42'
|
||||
|
||||
# A fixed salt
|
||||
b_salt = b'q' * 32 # q is the most random letter.
|
||||
b_key = vault_cipher.create_key(password=password, salt=b_salt, keylength=32, ivlength=16)
|
||||
b_key = self.vault_cipher._create_key(b_password, b_salt, keylength=32, ivlength=16)
|
||||
self.assertIsInstance(b_key, six.binary_type)
|
||||
|
||||
# verify we get the same answer
|
||||
# we could potentially run a few iterations of this and time it to see if it's roughly constant time
|
||||
# and or that it exceeds some minimal time, but that would likely cause unreliable fails, esp in CI
|
||||
b_key_2 = vault_cipher.create_key(password=password, salt=b_salt, keylength=32, ivlength=16)
|
||||
b_key_2 = self.vault_cipher._create_key(b_password, b_salt, keylength=32, ivlength=16)
|
||||
self.assertIsInstance(b_key, six.binary_type)
|
||||
self.assertEqual(b_key, b_key_2)
|
||||
|
||||
def test_is_equal_is_equal(self):
|
||||
vault_cipher = vault.VaultAES256()
|
||||
res = vault_cipher.is_equal(b'abcdefghijklmnopqrstuvwxyz', b'abcdefghijklmnopqrstuvwxyz')
|
||||
self.assertTrue(res)
|
||||
self.assertTrue(self.vault_cipher._is_equal(b'abcdefghijklmnopqrstuvwxyz', b'abcdefghijklmnopqrstuvwxyz'))
|
||||
|
||||
def test_is_equal_unequal_length(self):
|
||||
vault_cipher = vault.VaultAES256()
|
||||
res = vault_cipher.is_equal(b'abcdefghijklmnopqrstuvwxyz', b'abcdefghijklmnopqrstuvwx and sometimes y')
|
||||
self.assertFalse(res)
|
||||
self.assertFalse(self.vault_cipher._is_equal(b'abcdefghijklmnopqrstuvwxyz', b'abcdefghijklmnopqrstuvwx and sometimes y'))
|
||||
|
||||
def test_is_equal_not_equal(self):
|
||||
vault_cipher = vault.VaultAES256()
|
||||
res = vault_cipher.is_equal(b'abcdefghijklmnopqrstuvwxyz', b'AbcdefghijKlmnopQrstuvwxZ')
|
||||
self.assertFalse(res)
|
||||
self.assertFalse(self.vault_cipher._is_equal(b'abcdefghijklmnopqrstuvwxyz', b'AbcdefghijKlmnopQrstuvwxZ'))
|
||||
|
||||
def test_is_equal_empty(self):
|
||||
vault_cipher = vault.VaultAES256()
|
||||
res = vault_cipher.is_equal(b'', b'')
|
||||
self.assertTrue(res)
|
||||
self.assertTrue(self.vault_cipher._is_equal(b'', b''))
|
||||
|
||||
# NOTE: I'm not really sure what the method should do if it doesn't get bytes,
|
||||
# but this at least sees if it explodes (maybe it should?)
|
||||
def test_is_equal_unicode_py3(self):
|
||||
if not six.PY3:
|
||||
raise SkipTest
|
||||
vault_cipher = vault.VaultAES256()
|
||||
self.assertRaises(TypeError, vault_cipher.is_equal,
|
||||
u'私はガラスを食べられます。それは私を傷つけません。',
|
||||
u'私はガラスを食べられます。それは私を傷つけません。')
|
||||
def test_is_equal_non_ascii_equal(self):
|
||||
utf8_data = to_bytes(u'私はガラスを食べられます。それは私を傷つけません。')
|
||||
self.assertTrue(self.vault_cipher._is_equal(utf8_data, utf8_data))
|
||||
|
||||
def test_is_equal_unicode_py2(self):
|
||||
if not six.PY2:
|
||||
raise SkipTest
|
||||
vault_cipher = vault.VaultAES256()
|
||||
res = vault_cipher.is_equal(u'私はガラスを食べられます。それは私を傷つけません。',
|
||||
u'私はガラスを食べられます。それは私を傷つけません。')
|
||||
self.assertTrue(res)
|
||||
def test_is_equal_non_ascii_unequal(self):
|
||||
utf8_data = to_bytes(u'私はガラスを食べられます。それは私を傷つけません。')
|
||||
utf8_data2 = to_bytes(u'Pot să mănânc sticlă și ea nu mă rănește.')
|
||||
|
||||
def test_is_equal_unicode_different(self):
|
||||
vault_cipher = vault.VaultAES256()
|
||||
res = vault_cipher.is_equal(u'私はガラスを食べられます。それは私を傷つけません。',
|
||||
u'Pot să mănânc sticlă și ea nu mă rănește.')
|
||||
self.assertFalse(res)
|
||||
# Test for the len optimization path
|
||||
self.assertFalse(self.vault_cipher._is_equal(utf8_data, utf8_data2))
|
||||
# Test for the slower, char by char comparison path
|
||||
self.assertFalse(self.vault_cipher._is_equal(utf8_data, utf8_data[:-1] + b'P'))
|
||||
|
||||
def test_is_equal_non_bytes(self):
|
||||
""" Anything not a byte string should raise a TypeError """
|
||||
self.assertRaises(TypeError, self.vault_cipher._is_equal, u"One fish", b"two fish")
|
||||
self.assertRaises(TypeError, self.vault_cipher._is_equal, b"One fish", u"two fish")
|
||||
self.assertRaises(TypeError, self.vault_cipher._is_equal, 1, b"red fish")
|
||||
self.assertRaises(TypeError, self.vault_cipher._is_equal, b"blue fish", 2)
|
||||
|
||||
|
||||
class TestVaultLib(unittest.TestCase):
|
||||
|
||||
def test_methods_exist(self):
|
||||
v = VaultLib('ansible')
|
||||
slots = ['is_encrypted',
|
||||
'encrypt',
|
||||
'decrypt',
|
||||
'_format_output',
|
||||
'_split_header',]
|
||||
for slot in slots:
|
||||
assert hasattr(v, slot), "VaultLib is missing the %s method" % slot
|
||||
def setUp(self):
|
||||
self.v = VaultLib('test-vault-password')
|
||||
|
||||
def test_encrypt(self):
|
||||
v = VaultLib(password='the_unit_test_password')
|
||||
plaintext = u'Some text to encrypt.'
|
||||
ciphertext = v.encrypt(plaintext)
|
||||
plaintext = u'Some text to encrypt in a café'
|
||||
b_vaulttext = self.v.encrypt(plaintext)
|
||||
|
||||
self.assertIsInstance(ciphertext, (bytes, str))
|
||||
# TODO: assert something...
|
||||
self.assertIsInstance(b_vaulttext, six.binary_type)
|
||||
|
||||
b_header = b'$ANSIBLE_VAULT;1.1;AES256\n'
|
||||
self.assertEqual(b_vaulttext[:len(b_header)], b_header)
|
||||
|
||||
def test_encrypt_bytes(self):
|
||||
|
||||
plaintext = to_bytes(u'Some text to encrypt in a café')
|
||||
b_vaulttext = self.v.encrypt(plaintext)
|
||||
|
||||
self.assertIsInstance(b_vaulttext, six.binary_type)
|
||||
|
||||
b_header = b'$ANSIBLE_VAULT;1.1;AES256\n'
|
||||
self.assertEqual(b_vaulttext[:len(b_header)], b_header)
|
||||
|
||||
def test_is_encrypted(self):
|
||||
v = VaultLib(None)
|
||||
assert not v.is_encrypted("foobar".encode('utf-8')), "encryption check on plaintext failed"
|
||||
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
|
||||
assert v.is_encrypted(data.encode('utf-8')), "encryption check on headered text failed"
|
||||
|
||||
def test_is_encrypted_bytes(self):
|
||||
v = VaultLib(None)
|
||||
assert not v.is_encrypted(b"foobar"), "encryption check on plaintext failed"
|
||||
data = b"$ANSIBLE_VAULT;9.9;TEST\n%s" + hexlify(b"ansible")
|
||||
assert v.is_encrypted(data), "encryption check on headered text failed"
|
||||
self.assertFalse(self.v.is_encrypted(b"foobar"), msg="encryption check on plaintext yielded false positive")
|
||||
b_data = b"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
|
||||
self.assertTrue(self.v.is_encrypted(b_data), msg="encryption check on headered text failed")
|
||||
|
||||
def test_format_output(self):
|
||||
v = VaultLib('ansible')
|
||||
v.cipher_name = "TEST"
|
||||
sensitive_data = b"ansible"
|
||||
data = v._format_output(sensitive_data)
|
||||
lines = data.split(b'\n')
|
||||
assert len(lines) > 1, "failed to properly add header"
|
||||
header = to_bytes(lines[0])
|
||||
assert header.endswith(b';TEST'), "header does end with cipher name"
|
||||
header_parts = header.split(b';')
|
||||
assert len(header_parts) == 3, "header has the wrong number of parts"
|
||||
assert header_parts[0] == b'$ANSIBLE_VAULT', "header does not start with $ANSIBLE_VAULT"
|
||||
assert header_parts[1] == v.b_version, "header version is incorrect"
|
||||
assert header_parts[2] == b'TEST', "header does end with cipher name"
|
||||
self.v.cipher_name = "TEST"
|
||||
b_ciphertext = b"ansible"
|
||||
b_vaulttext = self.v._format_output(b_ciphertext)
|
||||
b_lines = b_vaulttext.split(b'\n')
|
||||
self.assertGreater(len(b_lines), 1, msg="failed to properly add header")
|
||||
|
||||
b_header = b_lines[0]
|
||||
self.assertTrue(b_header.endswith(b';TEST'), msg="header does not end with cipher name")
|
||||
|
||||
b_header_parts = b_header.split(b';')
|
||||
self.assertEqual(len(b_header_parts), 3, msg="header has the wrong number of parts")
|
||||
self.assertEqual(b_header_parts[0], b'$ANSIBLE_VAULT', msg="header does not start with $ANSIBLE_VAULT")
|
||||
self.assertEqual(b_header_parts[1], self.v.b_version, msg="header version is incorrect")
|
||||
self.assertEqual(b_header_parts[2], b'TEST', msg="header does not end with cipher name")
|
||||
|
||||
def test_split_header(self):
|
||||
v = VaultLib('ansible')
|
||||
data = b"$ANSIBLE_VAULT;9.9;TEST\nansible"
|
||||
rdata = v._split_header(data)
|
||||
lines = rdata.split(b'\n')
|
||||
assert lines[0] == b"ansible"
|
||||
assert v.cipher_name == 'TEST', "cipher name was not set"
|
||||
assert v.b_version == b"9.9"
|
||||
b_vaulttext = b"$ANSIBLE_VAULT;9.9;TEST\nansible"
|
||||
b_ciphertext = self.v._split_header(b_vaulttext)
|
||||
b_lines = b_ciphertext.split(b'\n')
|
||||
self.assertEqual(b_lines[0], b"ansible", msg="Payload was not properly split from the header")
|
||||
self.assertEqual(self.v.cipher_name, u'TEST', msg="cipher name was not properly set")
|
||||
self.assertEqual(self.v.b_version, b"9.9", msg="version was not properly set")
|
||||
|
||||
def test_encrypt_decrypt_aes(self):
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
v = VaultLib('ansible')
|
||||
v.cipher_name = u'AES'
|
||||
self.v.cipher_name = u'AES'
|
||||
self.v.b_password = b'ansible'
|
||||
# AES encryption code has been removed, so this is old output for
|
||||
# AES-encrypted 'foobar' with password 'ansible'.
|
||||
enc_data = b'$ANSIBLE_VAULT;1.1;AES\n53616c7465645f5fc107ce1ef4d7b455e038a13b053225776458052f8f8f332d554809d3f150bfa3\nfe3db930508b65e0ff5947e4386b79af8ab094017629590ef6ba486814cf70f8e4ab0ed0c7d2587e\n786a5a15efeb787e1958cbdd480d076c\n'
|
||||
dec_data = v.decrypt(enc_data)
|
||||
assert dec_data == b"foobar", "decryption failed"
|
||||
b_vaulttext = b'''$ANSIBLE_VAULT;1.1;AES
|
||||
53616c7465645f5fc107ce1ef4d7b455e038a13b053225776458052f8f8f332d554809d3f150bfa3
|
||||
fe3db930508b65e0ff5947e4386b79af8ab094017629590ef6ba486814cf70f8e4ab0ed0c7d2587e
|
||||
786a5a15efeb787e1958cbdd480d076c
|
||||
'''
|
||||
b_plaintext = self.v.decrypt(b_vaulttext)
|
||||
self.assertEqual(b_plaintext, b"foobar", msg="decryption failed")
|
||||
|
||||
def test_encrypt_decrypt_aes256(self):
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
v = VaultLib('ansible')
|
||||
v.cipher_name = 'AES256'
|
||||
plaintext = "foobar"
|
||||
enc_data = v.encrypt(plaintext)
|
||||
dec_data = v.decrypt(enc_data)
|
||||
assert enc_data != b"foobar", "encryption failed"
|
||||
assert dec_data == b"foobar", "decryption failed"
|
||||
self.v.cipher_name = u'AES256'
|
||||
plaintext = u"foobar"
|
||||
b_vaulttext = self.v.encrypt(plaintext)
|
||||
b_plaintext = self.v.decrypt(b_vaulttext)
|
||||
self.assertNotEqual(b_vaulttext, b"foobar", msg="encryption failed")
|
||||
self.assertEqual(b_plaintext, b"foobar", msg="decryption failed")
|
||||
|
||||
def test_encrypt_decrypt_aes256_existing_vault(self):
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
v = VaultLib('test-vault-password')
|
||||
v.cipher_name = 'AES256'
|
||||
plaintext = b"Setec Astronomy"
|
||||
enc_data = '''$ANSIBLE_VAULT;1.1;AES256
|
||||
self.v.cipher_name = u'AES256'
|
||||
b_orig_plaintext = b"Setec Astronomy"
|
||||
vaulttext = u'''$ANSIBLE_VAULT;1.1;AES256
|
||||
33363965326261303234626463623963633531343539616138316433353830356566396130353436
|
||||
3562643163366231316662386565383735653432386435610a306664636137376132643732393835
|
||||
63383038383730306639353234326630666539346233376330303938323639306661313032396437
|
||||
6233623062366136310a633866373936313238333730653739323461656662303864663666653563
|
||||
3138'''
|
||||
|
||||
dec_data = v.decrypt(enc_data)
|
||||
assert dec_data == plaintext, "decryption failed"
|
||||
b_plaintext = self.v.decrypt(vaulttext)
|
||||
self.assertEqual(b_plaintext, b_plaintext, msg="decryption failed")
|
||||
|
||||
b_vaulttext = to_bytes(vaulttext, encoding='ascii', errors='strict')
|
||||
b_plaintext = self.v.decrypt(b_vaulttext)
|
||||
self.assertEqual(b_plaintext, b_orig_plaintext, msg="decryption failed")
|
||||
|
||||
def test_encrypt_decrypt_aes256_bad_hmac(self):
|
||||
# FIXME This test isn't working quite yet.
|
||||
|
@ -302,8 +316,7 @@ class TestVaultLib(unittest.TestCase):
|
|||
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
v = VaultLib('test-vault-password')
|
||||
v.cipher_name = 'AES256'
|
||||
self.v.cipher_name = 'AES256'
|
||||
# plaintext = "Setec Astronomy"
|
||||
enc_data = '''$ANSIBLE_VAULT;1.1;AES256
|
||||
33363965326261303234626463623963633531343539616138316433353830356566396130353436
|
||||
|
@ -312,7 +325,7 @@ class TestVaultLib(unittest.TestCase):
|
|||
6233623062366136310a633866373936313238333730653739323461656662303864663666653563
|
||||
3138'''
|
||||
b_data = to_bytes(enc_data, errors='strict', encoding='utf-8')
|
||||
b_data = v._split_header(b_data)
|
||||
b_data = self.v._split_header(b_data)
|
||||
foo = binascii.unhexlify(b_data)
|
||||
lines = foo.splitlines()
|
||||
# line 0 is salt, line 1 is hmac, line 2+ is ciphertext
|
||||
|
@ -331,31 +344,33 @@ class TestVaultLib(unittest.TestCase):
|
|||
b_ciphertext_data = binascii.hexlify(b_ciphertext)
|
||||
b_payload = b'\n'.join([b_salt, b_hmac, b_ciphertext_data])
|
||||
# reformat
|
||||
b_invalid_ciphertext = v._format_output(b_payload)
|
||||
b_invalid_ciphertext = self.v._format_output(b_payload)
|
||||
|
||||
# assert we throw an error
|
||||
v.decrypt(b_invalid_ciphertext)
|
||||
self.v.decrypt(b_invalid_ciphertext)
|
||||
|
||||
def test_encrypt_encrypted(self):
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
v = VaultLib('ansible')
|
||||
v.cipher_name = 'AES'
|
||||
data = "$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(six.b("ansible"))
|
||||
self.assertRaises(errors.AnsibleError, v.encrypt, data,)
|
||||
self.v.cipher_name = u'AES'
|
||||
b_vaulttext = b"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
|
||||
vaulttext = to_text(b_vaulttext, errors='strict')
|
||||
self.assertRaises(errors.AnsibleError, self.v.encrypt, b_vaulttext)
|
||||
self.assertRaises(errors.AnsibleError, self.v.encrypt, vaulttext)
|
||||
|
||||
def test_decrypt_decrypted(self):
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
v = VaultLib('ansible')
|
||||
data = "ansible"
|
||||
self.assertRaises(errors.AnsibleError, v.decrypt, data)
|
||||
plaintext = u"ansible"
|
||||
self.assertRaises(errors.AnsibleError, self.v.decrypt, plaintext)
|
||||
|
||||
b_plaintext = b"ansible"
|
||||
self.assertRaises(errors.AnsibleError, self.v.decrypt, b_plaintext)
|
||||
|
||||
def test_cipher_not_set(self):
|
||||
# not setting the cipher should default to AES256
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
v = VaultLib('ansible')
|
||||
data = "ansible"
|
||||
v.encrypt(data)
|
||||
self.assertEquals(v.cipher_name, "AES256")
|
||||
plaintext = u"ansible"
|
||||
self.v.encrypt(plaintext)
|
||||
self.assertEquals(self.v.cipher_name, "AES256")
|
||||
|
|
|
@ -104,12 +104,8 @@ class TestVaultEditor(unittest.TestCase):
|
|||
|
||||
self.assertTrue(os.path.exists(tmp_file.name))
|
||||
|
||||
@unittest.skipIf(sys.version_info[0] >= 3, "VaultAES still needs to be ported to Python 3")
|
||||
def test_decrypt_1_0(self):
|
||||
"""
|
||||
Skip testing decrypting 1.0 files if we don't have access to AES, KDF or
|
||||
Counter, or we are running on python3 since VaultAES hasn't been backported.
|
||||
"""
|
||||
# Skip testing decrypting 1.0 files if we don't have access to AES, KDF or Counter.
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
|
||||
|
@ -163,12 +159,8 @@ class TestVaultEditor(unittest.TestCase):
|
|||
assert error_hit is False, "error decrypting 1.0 file"
|
||||
assert fdata.strip() == "foo", "incorrect decryption of 1.0 file: %s" % fdata.strip()
|
||||
|
||||
@unittest.skipIf(sys.version_info[0] >= 3, "VaultAES still needs to be ported to Python 3")
|
||||
def test_rekey_migration(self):
|
||||
"""
|
||||
Skip testing rekeying files if we don't have access to AES, KDF or
|
||||
Counter, or we are running on python3 since VaultAES hasn't been backported.
|
||||
"""
|
||||
# Skip testing rekeying files if we don't have access to AES, KDF or Counter.
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
|
||||
|
@ -205,4 +197,4 @@ class TestVaultEditor(unittest.TestCase):
|
|||
|
||||
assert vl.cipher_name == "AES256", "wrong cipher name set after rekey: %s" % vl.cipher_name
|
||||
assert error_hit is False, "error decrypting migrated 1.0 file"
|
||||
assert dec_data.strip() == "foo", "incorrect decryption of rekeyed/migrated file: %s" % dec_data
|
||||
assert dec_data.strip() == b"foo", "incorrect decryption of rekeyed/migrated file: %s" % dec_data
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue