mirror of
https://github.com/ansible-collections/community.general.git
synced 2025-07-27 23:21:22 -07:00
commit
8af8eec789
8 changed files with 389 additions and 374 deletions
|
@ -24,11 +24,9 @@ import shutil
|
|||
import tarfile
|
||||
import tempfile
|
||||
|
||||
from mock import patch
|
||||
|
||||
from ansible.compat.six import PY3
|
||||
from ansible.compat.tests import unittest
|
||||
from mock import patch, call
|
||||
from ansible.compat.tests.mock import call, patch
|
||||
|
||||
import ansible
|
||||
from ansible.errors import AnsibleError, AnsibleOptionsError
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
import collections
|
||||
import mock
|
||||
import os
|
||||
import re
|
||||
|
||||
|
@ -24,6 +23,8 @@ except ImportError:
|
|||
else: excName = str(expected)
|
||||
raise AssertionError("%s not raised" % excName)
|
||||
|
||||
from ansible.compat.tests import mock
|
||||
|
||||
from ansible.module_utils.database import (
|
||||
pg_quote_identifier,
|
||||
SQLParseError,
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
import collections
|
||||
import mock
|
||||
import os
|
||||
import sys
|
||||
from ansible.compat.tests import unittest
|
||||
|
||||
from ansible.compat.tests import mock
|
||||
from ansible.compat.tests import unittest
|
||||
|
||||
try:
|
||||
from ansible.modules.core.packaging.os.apt import (
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
|
||||
# (c) 2016, Toshio Kuratomi <tkuratomi@ansible.com>
|
||||
#
|
||||
# This file is part of Ansible
|
||||
#
|
||||
|
@ -34,7 +35,7 @@ from ansible.compat.tests import unittest
|
|||
from ansible import errors
|
||||
from ansible.parsing.vault import VaultLib
|
||||
from ansible.parsing import vault
|
||||
from ansible.module_utils._text import to_bytes
|
||||
from ansible.module_utils._text import to_bytes, to_text
|
||||
|
||||
|
||||
# Counter import fails for 2.0.1, requires >= 2.6.1 from pip
|
||||
|
@ -60,241 +61,254 @@ except ImportError:
|
|||
|
||||
|
||||
class TestVaultIsEncrypted(unittest.TestCase):
|
||||
def test_utf8_not_encrypted(self):
|
||||
b_data = "foobar".encode('utf8')
|
||||
self.assertFalse(vault.is_encrypted(b_data))
|
||||
|
||||
def test_utf8_encrypted(self):
|
||||
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
|
||||
b_data = data.encode('utf8')
|
||||
self.assertTrue(vault.is_encrypted(b_data))
|
||||
|
||||
def test_bytes_not_encrypted(self):
|
||||
b_data = b"foobar"
|
||||
self.assertFalse(vault.is_encrypted(b_data))
|
||||
|
||||
def test_bytes_encrypted(self):
|
||||
b_data = b"$ANSIBLE_VAULT;9.9;TEST\n%s" + hexlify(b"ansible")
|
||||
b_data = b"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
|
||||
self.assertTrue(vault.is_encrypted(b_data))
|
||||
|
||||
def test_unicode_not_encrypted_py3(self):
|
||||
if not six.PY3:
|
||||
raise SkipTest()
|
||||
data = u"ァ ア ィ イ ゥ ウ ェ エ ォ オ カ ガ キ ギ ク グ ケ "
|
||||
self.assertRaises(TypeError, vault.is_encrypted, data)
|
||||
def test_text_not_encrypted(self):
|
||||
b_data = to_text(b"foobar")
|
||||
self.assertFalse(vault.is_encrypted(b_data))
|
||||
|
||||
def test_unicode_not_encrypted_py2(self):
|
||||
if six.PY3:
|
||||
raise SkipTest()
|
||||
data = u"ァ ア ィ イ ゥ ウ ェ エ ォ オ カ ガ キ ギ ク グ ケ "
|
||||
# py2 will take a unicode string, but that should always fails
|
||||
def test_text_encrypted(self):
|
||||
b_data = to_text(b"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible"))
|
||||
self.assertTrue(vault.is_encrypted(b_data))
|
||||
|
||||
def test_invalid_text_not_ascii(self):
|
||||
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s"% u"ァ ア ィ イ ゥ ウ ェ エ ォ オ カ ガ キ ギ ク グ ケ "
|
||||
self.assertFalse(vault.is_encrypted(data))
|
||||
|
||||
def test_unicode_is_encrypted_py3(self):
|
||||
if not six.PY3:
|
||||
raise SkipTest()
|
||||
data = "$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
|
||||
# should still be a type error
|
||||
self.assertRaises(TypeError, vault.is_encrypted, data)
|
||||
|
||||
def test_unicode_is_encrypted_py2(self):
|
||||
if six.PY3:
|
||||
raise SkipTest()
|
||||
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
|
||||
# THis works, but arguably shouldn't...
|
||||
self.assertTrue(vault.is_encrypted(data))
|
||||
def test_invalid_bytes_not_ascii(self):
|
||||
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s"% u"ァ ア ィ イ ゥ ウ ェ エ ォ オ カ ガ キ ギ ク グ ケ "
|
||||
b_data = to_bytes(data, encoding='utf-8')
|
||||
self.assertFalse(vault.is_encrypted(b_data))
|
||||
|
||||
|
||||
class TestVaultIsEncryptedFile(unittest.TestCase):
|
||||
def test_utf8_not_encrypted(self):
|
||||
b_data = "foobar".encode('utf8')
|
||||
b_data_fo = io.BytesIO(b_data)
|
||||
self.assertFalse(vault.is_encrypted_file(b_data_fo))
|
||||
|
||||
def test_utf8_encrypted(self):
|
||||
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
|
||||
b_data = data.encode('utf8')
|
||||
b_data_fo = io.BytesIO(b_data)
|
||||
self.assertTrue(vault.is_encrypted_file(b_data_fo))
|
||||
|
||||
def test_bytes_not_encrypted(self):
|
||||
def test_binary_file_handle_not_encrypted(self):
|
||||
b_data = b"foobar"
|
||||
b_data_fo = io.BytesIO(b_data)
|
||||
self.assertFalse(vault.is_encrypted_file(b_data_fo))
|
||||
|
||||
def test_bytes_encrypted(self):
|
||||
b_data = b"$ANSIBLE_VAULT;9.9;TEST\n%s" + hexlify(b"ansible")
|
||||
def test_text_file_handle_not_encrypted(self):
|
||||
data = u"foobar"
|
||||
data_fo = io.StringIO(data)
|
||||
self.assertFalse(vault.is_encrypted_file(data_fo))
|
||||
|
||||
def test_binary_file_handle_encrypted(self):
|
||||
b_data = b"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
|
||||
b_data_fo = io.BytesIO(b_data)
|
||||
self.assertTrue(vault.is_encrypted_file(b_data_fo))
|
||||
|
||||
def test_text_file_handle_encrypted(self):
|
||||
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s" % to_text(hexlify(b"ansible"))
|
||||
data_fo = io.StringIO(data)
|
||||
self.assertTrue(vault.is_encrypted_file(data_fo))
|
||||
|
||||
def test_binary_file_handle_invalid(self):
|
||||
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s" % u"ァ ア ィ イ ゥ ウ ェ エ ォ オ カ ガ キ ギ ク グ ケ "
|
||||
b_data = to_bytes(data)
|
||||
b_data_fo = io.BytesIO(b_data)
|
||||
self.assertFalse(vault.is_encrypted_file(b_data_fo))
|
||||
|
||||
def test_text_file_handle_invalid(self):
|
||||
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s" % u"ァ ア ィ イ ゥ ウ ェ エ ォ オ カ ガ キ ギ ク グ ケ "
|
||||
data_fo = io.StringIO(data)
|
||||
self.assertFalse(vault.is_encrypted_file(data_fo))
|
||||
|
||||
def test_file_already_read_from_finds_header(self):
|
||||
b_data = b"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible\ntesting\nfile pos")
|
||||
b_data_fo = io.BytesIO(b_data)
|
||||
b_data_fo.read(42) # Arbitrary number
|
||||
self.assertTrue(vault.is_encrypted_file(b_data_fo))
|
||||
|
||||
def test_file_already_read_from_saves_file_pos(self):
|
||||
b_data = b"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible\ntesting\nfile pos")
|
||||
b_data_fo = io.BytesIO(b_data)
|
||||
b_data_fo.read(69) # Arbitrary number
|
||||
vault.is_encrypted_file(b_data_fo)
|
||||
self.assertEqual(b_data_fo.tell(), 69)
|
||||
|
||||
def test_file_with_offset(self):
|
||||
b_data = b"JUNK$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible\ntesting\nfile pos")
|
||||
b_data_fo = io.BytesIO(b_data)
|
||||
self.assertTrue(vault.is_encrypted_file(b_data_fo, start_pos=4))
|
||||
|
||||
def test_file_with_count(self):
|
||||
b_data = b"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible\ntesting\nfile pos")
|
||||
vault_length = len(b_data)
|
||||
b_data = b_data + u'ァ ア'.encode('utf-8')
|
||||
b_data_fo = io.BytesIO(b_data)
|
||||
self.assertTrue(vault.is_encrypted_file(b_data_fo, count=vault_length))
|
||||
|
||||
def test_file_with_offset_and_count(self):
|
||||
b_data = b"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible\ntesting\nfile pos")
|
||||
vault_length = len(b_data)
|
||||
b_data = b'JUNK' + b_data + u'ァ ア'.encode('utf-8')
|
||||
b_data_fo = io.BytesIO(b_data)
|
||||
self.assertTrue(vault.is_encrypted_file(b_data_fo, start_pos=4, count=vault_length))
|
||||
|
||||
|
||||
class TestVaultCipherAes256(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.vault_cipher = vault.VaultAES256()
|
||||
|
||||
def test(self):
|
||||
vault_cipher = vault.VaultAES256()
|
||||
self.assertIsInstance(vault_cipher, vault.VaultAES256)
|
||||
self.assertIsInstance(self.vault_cipher, vault.VaultAES256)
|
||||
|
||||
# TODO: tag these as slow tests
|
||||
def test_create_key(self):
|
||||
vault_cipher = vault.VaultAES256()
|
||||
password = 'hunter42'
|
||||
b_password = b'hunter42'
|
||||
b_salt = os.urandom(32)
|
||||
b_key = vault_cipher.create_key(password=password, salt=b_salt, keylength=32, ivlength=16)
|
||||
b_key = self.vault_cipher._create_key(b_password, b_salt, keylength=32, ivlength=16)
|
||||
self.assertIsInstance(b_key, six.binary_type)
|
||||
|
||||
def test_create_key_known(self):
|
||||
vault_cipher = vault.VaultAES256()
|
||||
password = 'hunter42'
|
||||
b_password = b'hunter42'
|
||||
|
||||
# A fixed salt
|
||||
b_salt = b'q' * 32 # q is the most random letter.
|
||||
b_key = vault_cipher.create_key(password=password, salt=b_salt, keylength=32, ivlength=16)
|
||||
b_key = self.vault_cipher._create_key(b_password, b_salt, keylength=32, ivlength=16)
|
||||
self.assertIsInstance(b_key, six.binary_type)
|
||||
|
||||
# verify we get the same answer
|
||||
# we could potentially run a few iterations of this and time it to see if it's roughly constant time
|
||||
# and or that it exceeds some minimal time, but that would likely cause unreliable fails, esp in CI
|
||||
b_key_2 = vault_cipher.create_key(password=password, salt=b_salt, keylength=32, ivlength=16)
|
||||
b_key_2 = self.vault_cipher._create_key(b_password, b_salt, keylength=32, ivlength=16)
|
||||
self.assertIsInstance(b_key, six.binary_type)
|
||||
self.assertEqual(b_key, b_key_2)
|
||||
|
||||
def test_is_equal_is_equal(self):
|
||||
vault_cipher = vault.VaultAES256()
|
||||
res = vault_cipher.is_equal(b'abcdefghijklmnopqrstuvwxyz', b'abcdefghijklmnopqrstuvwxyz')
|
||||
self.assertTrue(res)
|
||||
self.assertTrue(self.vault_cipher._is_equal(b'abcdefghijklmnopqrstuvwxyz', b'abcdefghijklmnopqrstuvwxyz'))
|
||||
|
||||
def test_is_equal_unequal_length(self):
|
||||
vault_cipher = vault.VaultAES256()
|
||||
res = vault_cipher.is_equal(b'abcdefghijklmnopqrstuvwxyz', b'abcdefghijklmnopqrstuvwx and sometimes y')
|
||||
self.assertFalse(res)
|
||||
self.assertFalse(self.vault_cipher._is_equal(b'abcdefghijklmnopqrstuvwxyz', b'abcdefghijklmnopqrstuvwx and sometimes y'))
|
||||
|
||||
def test_is_equal_not_equal(self):
|
||||
vault_cipher = vault.VaultAES256()
|
||||
res = vault_cipher.is_equal(b'abcdefghijklmnopqrstuvwxyz', b'AbcdefghijKlmnopQrstuvwxZ')
|
||||
self.assertFalse(res)
|
||||
self.assertFalse(self.vault_cipher._is_equal(b'abcdefghijklmnopqrstuvwxyz', b'AbcdefghijKlmnopQrstuvwxZ'))
|
||||
|
||||
def test_is_equal_empty(self):
|
||||
vault_cipher = vault.VaultAES256()
|
||||
res = vault_cipher.is_equal(b'', b'')
|
||||
self.assertTrue(res)
|
||||
self.assertTrue(self.vault_cipher._is_equal(b'', b''))
|
||||
|
||||
# NOTE: I'm not really sure what the method should do if it doesn't get bytes,
|
||||
# but this at least sees if it explodes (maybe it should?)
|
||||
def test_is_equal_unicode_py3(self):
|
||||
if not six.PY3:
|
||||
raise SkipTest
|
||||
vault_cipher = vault.VaultAES256()
|
||||
self.assertRaises(TypeError, vault_cipher.is_equal,
|
||||
u'私はガラスを食べられます。それは私を傷つけません。',
|
||||
u'私はガラスを食べられます。それは私を傷つけません。')
|
||||
def test_is_equal_non_ascii_equal(self):
|
||||
utf8_data = to_bytes(u'私はガラスを食べられます。それは私を傷つけません。')
|
||||
self.assertTrue(self.vault_cipher._is_equal(utf8_data, utf8_data))
|
||||
|
||||
def test_is_equal_unicode_py2(self):
|
||||
if not six.PY2:
|
||||
raise SkipTest
|
||||
vault_cipher = vault.VaultAES256()
|
||||
res = vault_cipher.is_equal(u'私はガラスを食べられます。それは私を傷つけません。',
|
||||
u'私はガラスを食べられます。それは私を傷つけません。')
|
||||
self.assertTrue(res)
|
||||
def test_is_equal_non_ascii_unequal(self):
|
||||
utf8_data = to_bytes(u'私はガラスを食べられます。それは私を傷つけません。')
|
||||
utf8_data2 = to_bytes(u'Pot să mănânc sticlă și ea nu mă rănește.')
|
||||
|
||||
def test_is_equal_unicode_different(self):
|
||||
vault_cipher = vault.VaultAES256()
|
||||
res = vault_cipher.is_equal(u'私はガラスを食べられます。それは私を傷つけません。',
|
||||
u'Pot să mănânc sticlă și ea nu mă rănește.')
|
||||
self.assertFalse(res)
|
||||
# Test for the len optimization path
|
||||
self.assertFalse(self.vault_cipher._is_equal(utf8_data, utf8_data2))
|
||||
# Test for the slower, char by char comparison path
|
||||
self.assertFalse(self.vault_cipher._is_equal(utf8_data, utf8_data[:-1] + b'P'))
|
||||
|
||||
def test_is_equal_non_bytes(self):
|
||||
""" Anything not a byte string should raise a TypeError """
|
||||
self.assertRaises(TypeError, self.vault_cipher._is_equal, u"One fish", b"two fish")
|
||||
self.assertRaises(TypeError, self.vault_cipher._is_equal, b"One fish", u"two fish")
|
||||
self.assertRaises(TypeError, self.vault_cipher._is_equal, 1, b"red fish")
|
||||
self.assertRaises(TypeError, self.vault_cipher._is_equal, b"blue fish", 2)
|
||||
|
||||
|
||||
class TestVaultLib(unittest.TestCase):
|
||||
|
||||
def test_methods_exist(self):
|
||||
v = VaultLib('ansible')
|
||||
slots = ['is_encrypted',
|
||||
'encrypt',
|
||||
'decrypt',
|
||||
'_format_output',
|
||||
'_split_header',]
|
||||
for slot in slots:
|
||||
assert hasattr(v, slot), "VaultLib is missing the %s method" % slot
|
||||
def setUp(self):
|
||||
self.v = VaultLib('test-vault-password')
|
||||
|
||||
def test_encrypt(self):
|
||||
v = VaultLib(password='the_unit_test_password')
|
||||
plaintext = u'Some text to encrypt.'
|
||||
ciphertext = v.encrypt(plaintext)
|
||||
plaintext = u'Some text to encrypt in a café'
|
||||
b_vaulttext = self.v.encrypt(plaintext)
|
||||
|
||||
self.assertIsInstance(ciphertext, (bytes, str))
|
||||
# TODO: assert something...
|
||||
self.assertIsInstance(b_vaulttext, six.binary_type)
|
||||
|
||||
b_header = b'$ANSIBLE_VAULT;1.1;AES256\n'
|
||||
self.assertEqual(b_vaulttext[:len(b_header)], b_header)
|
||||
|
||||
def test_encrypt_bytes(self):
|
||||
|
||||
plaintext = to_bytes(u'Some text to encrypt in a café')
|
||||
b_vaulttext = self.v.encrypt(plaintext)
|
||||
|
||||
self.assertIsInstance(b_vaulttext, six.binary_type)
|
||||
|
||||
b_header = b'$ANSIBLE_VAULT;1.1;AES256\n'
|
||||
self.assertEqual(b_vaulttext[:len(b_header)], b_header)
|
||||
|
||||
def test_is_encrypted(self):
|
||||
v = VaultLib(None)
|
||||
assert not v.is_encrypted("foobar".encode('utf-8')), "encryption check on plaintext failed"
|
||||
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
|
||||
assert v.is_encrypted(data.encode('utf-8')), "encryption check on headered text failed"
|
||||
|
||||
def test_is_encrypted_bytes(self):
|
||||
v = VaultLib(None)
|
||||
assert not v.is_encrypted(b"foobar"), "encryption check on plaintext failed"
|
||||
data = b"$ANSIBLE_VAULT;9.9;TEST\n%s" + hexlify(b"ansible")
|
||||
assert v.is_encrypted(data), "encryption check on headered text failed"
|
||||
self.assertFalse(self.v.is_encrypted(b"foobar"), msg="encryption check on plaintext yielded false positive")
|
||||
b_data = b"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
|
||||
self.assertTrue(self.v.is_encrypted(b_data), msg="encryption check on headered text failed")
|
||||
|
||||
def test_format_output(self):
|
||||
v = VaultLib('ansible')
|
||||
v.cipher_name = "TEST"
|
||||
sensitive_data = b"ansible"
|
||||
data = v._format_output(sensitive_data)
|
||||
lines = data.split(b'\n')
|
||||
assert len(lines) > 1, "failed to properly add header"
|
||||
header = to_bytes(lines[0])
|
||||
assert header.endswith(b';TEST'), "header does end with cipher name"
|
||||
header_parts = header.split(b';')
|
||||
assert len(header_parts) == 3, "header has the wrong number of parts"
|
||||
assert header_parts[0] == b'$ANSIBLE_VAULT', "header does not start with $ANSIBLE_VAULT"
|
||||
assert header_parts[1] == v.b_version, "header version is incorrect"
|
||||
assert header_parts[2] == b'TEST', "header does end with cipher name"
|
||||
self.v.cipher_name = "TEST"
|
||||
b_ciphertext = b"ansible"
|
||||
b_vaulttext = self.v._format_output(b_ciphertext)
|
||||
b_lines = b_vaulttext.split(b'\n')
|
||||
self.assertGreater(len(b_lines), 1, msg="failed to properly add header")
|
||||
|
||||
b_header = b_lines[0]
|
||||
self.assertTrue(b_header.endswith(b';TEST'), msg="header does not end with cipher name")
|
||||
|
||||
b_header_parts = b_header.split(b';')
|
||||
self.assertEqual(len(b_header_parts), 3, msg="header has the wrong number of parts")
|
||||
self.assertEqual(b_header_parts[0], b'$ANSIBLE_VAULT', msg="header does not start with $ANSIBLE_VAULT")
|
||||
self.assertEqual(b_header_parts[1], self.v.b_version, msg="header version is incorrect")
|
||||
self.assertEqual(b_header_parts[2], b'TEST', msg="header does not end with cipher name")
|
||||
|
||||
def test_split_header(self):
|
||||
v = VaultLib('ansible')
|
||||
data = b"$ANSIBLE_VAULT;9.9;TEST\nansible"
|
||||
rdata = v._split_header(data)
|
||||
lines = rdata.split(b'\n')
|
||||
assert lines[0] == b"ansible"
|
||||
assert v.cipher_name == 'TEST', "cipher name was not set"
|
||||
assert v.b_version == b"9.9"
|
||||
b_vaulttext = b"$ANSIBLE_VAULT;9.9;TEST\nansible"
|
||||
b_ciphertext = self.v._split_header(b_vaulttext)
|
||||
b_lines = b_ciphertext.split(b'\n')
|
||||
self.assertEqual(b_lines[0], b"ansible", msg="Payload was not properly split from the header")
|
||||
self.assertEqual(self.v.cipher_name, u'TEST', msg="cipher name was not properly set")
|
||||
self.assertEqual(self.v.b_version, b"9.9", msg="version was not properly set")
|
||||
|
||||
def test_encrypt_decrypt_aes(self):
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
v = VaultLib('ansible')
|
||||
v.cipher_name = u'AES'
|
||||
self.v.cipher_name = u'AES'
|
||||
self.v.b_password = b'ansible'
|
||||
# AES encryption code has been removed, so this is old output for
|
||||
# AES-encrypted 'foobar' with password 'ansible'.
|
||||
enc_data = b'$ANSIBLE_VAULT;1.1;AES\n53616c7465645f5fc107ce1ef4d7b455e038a13b053225776458052f8f8f332d554809d3f150bfa3\nfe3db930508b65e0ff5947e4386b79af8ab094017629590ef6ba486814cf70f8e4ab0ed0c7d2587e\n786a5a15efeb787e1958cbdd480d076c\n'
|
||||
dec_data = v.decrypt(enc_data)
|
||||
assert dec_data == b"foobar", "decryption failed"
|
||||
b_vaulttext = b'''$ANSIBLE_VAULT;1.1;AES
|
||||
53616c7465645f5fc107ce1ef4d7b455e038a13b053225776458052f8f8f332d554809d3f150bfa3
|
||||
fe3db930508b65e0ff5947e4386b79af8ab094017629590ef6ba486814cf70f8e4ab0ed0c7d2587e
|
||||
786a5a15efeb787e1958cbdd480d076c
|
||||
'''
|
||||
b_plaintext = self.v.decrypt(b_vaulttext)
|
||||
self.assertEqual(b_plaintext, b"foobar", msg="decryption failed")
|
||||
|
||||
def test_encrypt_decrypt_aes256(self):
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
v = VaultLib('ansible')
|
||||
v.cipher_name = 'AES256'
|
||||
plaintext = "foobar"
|
||||
enc_data = v.encrypt(plaintext)
|
||||
dec_data = v.decrypt(enc_data)
|
||||
assert enc_data != b"foobar", "encryption failed"
|
||||
assert dec_data == b"foobar", "decryption failed"
|
||||
self.v.cipher_name = u'AES256'
|
||||
plaintext = u"foobar"
|
||||
b_vaulttext = self.v.encrypt(plaintext)
|
||||
b_plaintext = self.v.decrypt(b_vaulttext)
|
||||
self.assertNotEqual(b_vaulttext, b"foobar", msg="encryption failed")
|
||||
self.assertEqual(b_plaintext, b"foobar", msg="decryption failed")
|
||||
|
||||
def test_encrypt_decrypt_aes256_existing_vault(self):
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
v = VaultLib('test-vault-password')
|
||||
v.cipher_name = 'AES256'
|
||||
plaintext = b"Setec Astronomy"
|
||||
enc_data = '''$ANSIBLE_VAULT;1.1;AES256
|
||||
self.v.cipher_name = u'AES256'
|
||||
b_orig_plaintext = b"Setec Astronomy"
|
||||
vaulttext = u'''$ANSIBLE_VAULT;1.1;AES256
|
||||
33363965326261303234626463623963633531343539616138316433353830356566396130353436
|
||||
3562643163366231316662386565383735653432386435610a306664636137376132643732393835
|
||||
63383038383730306639353234326630666539346233376330303938323639306661313032396437
|
||||
6233623062366136310a633866373936313238333730653739323461656662303864663666653563
|
||||
3138'''
|
||||
|
||||
dec_data = v.decrypt(enc_data)
|
||||
assert dec_data == plaintext, "decryption failed"
|
||||
b_plaintext = self.v.decrypt(vaulttext)
|
||||
self.assertEqual(b_plaintext, b_plaintext, msg="decryption failed")
|
||||
|
||||
b_vaulttext = to_bytes(vaulttext, encoding='ascii', errors='strict')
|
||||
b_plaintext = self.v.decrypt(b_vaulttext)
|
||||
self.assertEqual(b_plaintext, b_orig_plaintext, msg="decryption failed")
|
||||
|
||||
def test_encrypt_decrypt_aes256_bad_hmac(self):
|
||||
# FIXME This test isn't working quite yet.
|
||||
|
@ -302,8 +316,7 @@ class TestVaultLib(unittest.TestCase):
|
|||
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
v = VaultLib('test-vault-password')
|
||||
v.cipher_name = 'AES256'
|
||||
self.v.cipher_name = 'AES256'
|
||||
# plaintext = "Setec Astronomy"
|
||||
enc_data = '''$ANSIBLE_VAULT;1.1;AES256
|
||||
33363965326261303234626463623963633531343539616138316433353830356566396130353436
|
||||
|
@ -312,7 +325,7 @@ class TestVaultLib(unittest.TestCase):
|
|||
6233623062366136310a633866373936313238333730653739323461656662303864663666653563
|
||||
3138'''
|
||||
b_data = to_bytes(enc_data, errors='strict', encoding='utf-8')
|
||||
b_data = v._split_header(b_data)
|
||||
b_data = self.v._split_header(b_data)
|
||||
foo = binascii.unhexlify(b_data)
|
||||
lines = foo.splitlines()
|
||||
# line 0 is salt, line 1 is hmac, line 2+ is ciphertext
|
||||
|
@ -331,31 +344,33 @@ class TestVaultLib(unittest.TestCase):
|
|||
b_ciphertext_data = binascii.hexlify(b_ciphertext)
|
||||
b_payload = b'\n'.join([b_salt, b_hmac, b_ciphertext_data])
|
||||
# reformat
|
||||
b_invalid_ciphertext = v._format_output(b_payload)
|
||||
b_invalid_ciphertext = self.v._format_output(b_payload)
|
||||
|
||||
# assert we throw an error
|
||||
v.decrypt(b_invalid_ciphertext)
|
||||
self.v.decrypt(b_invalid_ciphertext)
|
||||
|
||||
def test_encrypt_encrypted(self):
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
v = VaultLib('ansible')
|
||||
v.cipher_name = 'AES'
|
||||
data = "$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(six.b("ansible"))
|
||||
self.assertRaises(errors.AnsibleError, v.encrypt, data,)
|
||||
self.v.cipher_name = u'AES'
|
||||
b_vaulttext = b"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
|
||||
vaulttext = to_text(b_vaulttext, errors='strict')
|
||||
self.assertRaises(errors.AnsibleError, self.v.encrypt, b_vaulttext)
|
||||
self.assertRaises(errors.AnsibleError, self.v.encrypt, vaulttext)
|
||||
|
||||
def test_decrypt_decrypted(self):
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
v = VaultLib('ansible')
|
||||
data = "ansible"
|
||||
self.assertRaises(errors.AnsibleError, v.decrypt, data)
|
||||
plaintext = u"ansible"
|
||||
self.assertRaises(errors.AnsibleError, self.v.decrypt, plaintext)
|
||||
|
||||
b_plaintext = b"ansible"
|
||||
self.assertRaises(errors.AnsibleError, self.v.decrypt, b_plaintext)
|
||||
|
||||
def test_cipher_not_set(self):
|
||||
# not setting the cipher should default to AES256
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
v = VaultLib('ansible')
|
||||
data = "ansible"
|
||||
v.encrypt(data)
|
||||
self.assertEquals(v.cipher_name, "AES256")
|
||||
plaintext = u"ansible"
|
||||
self.v.encrypt(plaintext)
|
||||
self.assertEquals(self.v.cipher_name, "AES256")
|
||||
|
|
|
@ -104,12 +104,8 @@ class TestVaultEditor(unittest.TestCase):
|
|||
|
||||
self.assertTrue(os.path.exists(tmp_file.name))
|
||||
|
||||
@unittest.skipIf(sys.version_info[0] >= 3, "VaultAES still needs to be ported to Python 3")
|
||||
def test_decrypt_1_0(self):
|
||||
"""
|
||||
Skip testing decrypting 1.0 files if we don't have access to AES, KDF or
|
||||
Counter, or we are running on python3 since VaultAES hasn't been backported.
|
||||
"""
|
||||
# Skip testing decrypting 1.0 files if we don't have access to AES, KDF or Counter.
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
|
||||
|
@ -163,12 +159,8 @@ class TestVaultEditor(unittest.TestCase):
|
|||
assert error_hit is False, "error decrypting 1.0 file"
|
||||
assert fdata.strip() == "foo", "incorrect decryption of 1.0 file: %s" % fdata.strip()
|
||||
|
||||
@unittest.skipIf(sys.version_info[0] >= 3, "VaultAES still needs to be ported to Python 3")
|
||||
def test_rekey_migration(self):
|
||||
"""
|
||||
Skip testing rekeying files if we don't have access to AES, KDF or
|
||||
Counter, or we are running on python3 since VaultAES hasn't been backported.
|
||||
"""
|
||||
# Skip testing rekeying files if we don't have access to AES, KDF or Counter.
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
|
||||
|
@ -205,4 +197,4 @@ class TestVaultEditor(unittest.TestCase):
|
|||
|
||||
assert vl.cipher_name == "AES256", "wrong cipher name set after rekey: %s" % vl.cipher_name
|
||||
assert error_hit is False, "error decrypting migrated 1.0 file"
|
||||
assert dec_data.strip() == "foo", "incorrect decryption of rekeyed/migrated file: %s" % dec_data
|
||||
assert dec_data.strip() == b"foo", "incorrect decryption of rekeyed/migrated file: %s" % dec_data
|
||||
|
|
|
@ -21,8 +21,7 @@ __metaclass__ = type
|
|||
|
||||
from io import StringIO
|
||||
|
||||
import mock
|
||||
|
||||
from ansible.compat.tests import mock
|
||||
from ansible.compat.tests import unittest
|
||||
from ansible.errors import AnsibleError
|
||||
from ansible.playbook.play_context import PlayContext
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue