mirror of
https://github.com/ansible-collections/community.general.git
synced 2025-05-02 23:31:25 -07:00
Cyptography pr 20566 rebase (#25560)
Make pyca/cryptography the preferred backend for cryptographic needs (mainly vault) falling back to pycrypto pyca/cryptography is already implicitly a dependency in many cases through paramiko (2.0+) as well as the new openssl_publickey module, which requires pyOpenSSL 16.0+. Additionally, pyca/cryptography is an optional dep for better performance with vault already. This commit leverages cryptography's padding, constant time comparisons, and CBC/CTR modes to reduce the amount of code ansible needs to maintain. * Handle wrong password given for VaultAES format * Do not display deprecation warning for cryptography on python-2.6 * Namespace all of the pycrypto imports and always import them Makes unittests better and the code less likely to get stupid mistakes (like using HMAC from cryptogrpahy when the one from pycrypto is needed) * Add back in atfork since we need pycrypto to reinitialize its RNG just in case we're being used with old paramiko * contrib/inventory/gce: Remove spurious require on pycrypto (cherry picked from commit 9e16b9db275263b3ea8d1b124966fdebfc9ab271) * Add cryptography to ec2_win_password module requirements * Fix python3 bug which would pass text strings to a function which requires byte strings. * Attempt to add pycrypto version to setup deps * Change hacking README for dual pycrypto/cryptography * update dependencies for various CI scripts * additional CI dockerfile/script updates * add paramiko to the windows and sanity requirement set This is needed because ansible lists it as a requirement. Previously the missing dep wasn't enforced, but cryptography imports pkg_resources so you can't ignore a requirement any more * Add integration test cases for old vault and for wrong passwords * helper script for manual testing of pycrypto/cryptography * Skip the pycrypto tests so that users without it installed can still run the unittests * Run unittests for vault with both cryptography and pycrypto backend
This commit is contained in:
parent
e7e091d154
commit
e238ae999b
25 changed files with 456 additions and 242 deletions
|
@ -26,7 +26,7 @@ import io
|
|||
import os
|
||||
|
||||
from binascii import hexlify
|
||||
from nose.plugins.skip import SkipTest
|
||||
import pytest
|
||||
|
||||
from ansible.compat.tests import unittest
|
||||
|
||||
|
@ -37,28 +37,6 @@ from ansible.parsing.vault import VaultLib
|
|||
from ansible.parsing import vault
|
||||
|
||||
|
||||
# Counter import fails for 2.0.1, requires >= 2.6.1 from pip
|
||||
try:
|
||||
from Crypto.Util import Counter
|
||||
HAS_COUNTER = True
|
||||
except ImportError:
|
||||
HAS_COUNTER = False
|
||||
|
||||
# KDF import fails for 2.0.1, requires >= 2.6.1 from pip
|
||||
try:
|
||||
from Crypto.Protocol.KDF import PBKDF2
|
||||
HAS_PBKDF2 = True
|
||||
except ImportError:
|
||||
HAS_PBKDF2 = False
|
||||
|
||||
# AES IMPORTS
|
||||
try:
|
||||
from Crypto.Cipher import AES as AES
|
||||
HAS_AES = True
|
||||
except ImportError:
|
||||
HAS_AES = False
|
||||
|
||||
|
||||
class TestVaultIsEncrypted(unittest.TestCase):
|
||||
def test_bytes_not_encrypted(self):
|
||||
b_data = b"foobar"
|
||||
|
@ -151,6 +129,8 @@ class TestVaultIsEncryptedFile(unittest.TestCase):
|
|||
self.assertTrue(vault.is_encrypted_file(b_data_fo, start_pos=4, count=vault_length))
|
||||
|
||||
|
||||
@pytest.mark.skipif(not vault.HAS_CRYPTOGRAPHY,
|
||||
reason="Skipping cryptography tests because cryptography is not installed")
|
||||
class TestVaultCipherAes256(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.vault_cipher = vault.VaultAES256()
|
||||
|
@ -159,26 +139,71 @@ class TestVaultCipherAes256(unittest.TestCase):
|
|||
self.assertIsInstance(self.vault_cipher, vault.VaultAES256)
|
||||
|
||||
# TODO: tag these as slow tests
|
||||
def test_create_key(self):
|
||||
def test_create_key_cryptography(self):
|
||||
b_password = b'hunter42'
|
||||
b_salt = os.urandom(32)
|
||||
b_key = self.vault_cipher._create_key(b_password, b_salt, keylength=32, ivlength=16)
|
||||
self.assertIsInstance(b_key, six.binary_type)
|
||||
b_key_cryptography = self.vault_cipher._create_key_cryptography(b_password, b_salt, key_length=32, iv_length=16)
|
||||
self.assertIsInstance(b_key_cryptography, six.binary_type)
|
||||
|
||||
def test_create_key_known(self):
|
||||
@pytest.mark.skipif(not vault.HAS_PYCRYPTO, reason='Not testing pycrypto key as pycrypto is not installed')
|
||||
def test_create_key_pycrypto(self):
|
||||
b_password = b'hunter42'
|
||||
b_salt = os.urandom(32)
|
||||
|
||||
b_key_pycrypto = self.vault_cipher._create_key_pycrypto(b_password, b_salt, key_length=32, iv_length=16)
|
||||
self.assertIsInstance(b_key_pycrypto, six.binary_type)
|
||||
|
||||
@pytest.mark.skipif(not vault.HAS_PYCRYPTO,
|
||||
reason='Not comparing cryptography key to pycrypto key as pycrypto is not installed')
|
||||
def test_compare_new_keys(self):
|
||||
b_password = b'hunter42'
|
||||
b_salt = os.urandom(32)
|
||||
b_key_cryptography = self.vault_cipher._create_key_cryptography(b_password, b_salt, key_length=32, iv_length=16)
|
||||
|
||||
b_key_pycrypto = self.vault_cipher._create_key_pycrypto(b_password, b_salt, key_length=32, iv_length=16)
|
||||
self.assertEqual(b_key_cryptography, b_key_pycrypto)
|
||||
|
||||
def test_create_key_known_cryptography(self):
|
||||
b_password = b'hunter42'
|
||||
|
||||
# A fixed salt
|
||||
b_salt = b'q' * 32 # q is the most random letter.
|
||||
b_key = self.vault_cipher._create_key(b_password, b_salt, keylength=32, ivlength=16)
|
||||
self.assertIsInstance(b_key, six.binary_type)
|
||||
b_key_1 = self.vault_cipher._create_key_cryptography(b_password, b_salt, key_length=32, iv_length=16)
|
||||
self.assertIsInstance(b_key_1, six.binary_type)
|
||||
|
||||
# verify we get the same answer
|
||||
# we could potentially run a few iterations of this and time it to see if it's roughly constant time
|
||||
# and or that it exceeds some minimal time, but that would likely cause unreliable fails, esp in CI
|
||||
b_key_2 = self.vault_cipher._create_key(b_password, b_salt, keylength=32, ivlength=16)
|
||||
self.assertIsInstance(b_key, six.binary_type)
|
||||
self.assertEqual(b_key, b_key_2)
|
||||
b_key_2 = self.vault_cipher._create_key_cryptography(b_password, b_salt, key_length=32, iv_length=16)
|
||||
self.assertIsInstance(b_key_2, six.binary_type)
|
||||
self.assertEqual(b_key_1, b_key_2)
|
||||
|
||||
# And again with pycrypto
|
||||
b_key_3 = self.vault_cipher._create_key_pycrypto(b_password, b_salt, key_length=32, iv_length=16)
|
||||
self.assertIsInstance(b_key_3, six.binary_type)
|
||||
|
||||
# verify we get the same answer
|
||||
# we could potentially run a few iterations of this and time it to see if it's roughly constant time
|
||||
# and or that it exceeds some minimal time, but that would likely cause unreliable fails, esp in CI
|
||||
b_key_4 = self.vault_cipher._create_key_pycrypto(b_password, b_salt, key_length=32, iv_length=16)
|
||||
self.assertIsInstance(b_key_4, six.binary_type)
|
||||
self.assertEqual(b_key_3, b_key_4)
|
||||
self.assertEqual(b_key_1, b_key_4)
|
||||
|
||||
def test_create_key_known_pycrypto(self):
|
||||
b_password = b'hunter42'
|
||||
|
||||
# A fixed salt
|
||||
b_salt = b'q' * 32 # q is the most random letter.
|
||||
b_key_3 = self.vault_cipher._create_key_pycrypto(b_password, b_salt, key_length=32, iv_length=16)
|
||||
self.assertIsInstance(b_key_3, six.binary_type)
|
||||
|
||||
# verify we get the same answer
|
||||
# we could potentially run a few iterations of this and time it to see if it's roughly constant time
|
||||
# and or that it exceeds some minimal time, but that would likely cause unreliable fails, esp in CI
|
||||
b_key_4 = self.vault_cipher._create_key_pycrypto(b_password, b_salt, key_length=32, iv_length=16)
|
||||
self.assertIsInstance(b_key_4, six.binary_type)
|
||||
self.assertEqual(b_key_3, b_key_4)
|
||||
|
||||
def test_is_equal_is_equal(self):
|
||||
self.assertTrue(self.vault_cipher._is_equal(b'abcdefghijklmnopqrstuvwxyz', b'abcdefghijklmnopqrstuvwxyz'))
|
||||
|
@ -213,6 +238,21 @@ class TestVaultCipherAes256(unittest.TestCase):
|
|||
self.assertRaises(TypeError, self.vault_cipher._is_equal, b"blue fish", 2)
|
||||
|
||||
|
||||
@pytest.mark.skipif(not vault.HAS_PYCRYPTO,
|
||||
reason="Skipping Pycrypto tests because pycrypto is not installed")
|
||||
class TestVaultCipherAes256PyCrypto(TestVaultCipherAes256):
|
||||
def setUp(self):
|
||||
self.has_cryptography = vault.HAS_CRYPTOGRAPHY
|
||||
vault.HAS_CRYPTOGRAPHY = False
|
||||
super(TestVaultCipherAes256PyCrypto, self).setUp()
|
||||
|
||||
def tearDown(self):
|
||||
vault.HAS_CRYPTOGRAPHY = self.has_cryptography
|
||||
super(TestVaultCipherAes256PyCrypto, self).tearDown()
|
||||
|
||||
|
||||
@pytest.mark.skipif(not vault.HAS_CRYPTOGRAPHY,
|
||||
reason="Skipping cryptography tests because cryptography is not installed")
|
||||
class TestVaultLib(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.v = VaultLib('test-vault-password')
|
||||
|
@ -266,8 +306,6 @@ class TestVaultLib(unittest.TestCase):
|
|||
self.assertEqual(self.v.b_version, b"9.9", msg="version was not properly set")
|
||||
|
||||
def test_encrypt_decrypt_aes(self):
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
self.v.cipher_name = u'AES'
|
||||
self.v.b_password = b'ansible'
|
||||
# AES encryption code has been removed, so this is old output for
|
||||
|
@ -281,8 +319,6 @@ fe3db930508b65e0ff5947e4386b79af8ab094017629590ef6ba486814cf70f8e4ab0ed0c7d2587e
|
|||
self.assertEqual(b_plaintext, b"foobar", msg="decryption failed")
|
||||
|
||||
def test_encrypt_decrypt_aes256(self):
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
self.v.cipher_name = u'AES256'
|
||||
plaintext = u"foobar"
|
||||
b_vaulttext = self.v.encrypt(plaintext)
|
||||
|
@ -291,8 +327,6 @@ fe3db930508b65e0ff5947e4386b79af8ab094017629590ef6ba486814cf70f8e4ab0ed0c7d2587e
|
|||
self.assertEqual(b_plaintext, b"foobar", msg="decryption failed")
|
||||
|
||||
def test_encrypt_decrypt_aes256_existing_vault(self):
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
self.v.cipher_name = u'AES256'
|
||||
b_orig_plaintext = b"Setec Astronomy"
|
||||
vaulttext = u'''$ANSIBLE_VAULT;1.1;AES256
|
||||
|
@ -309,12 +343,10 @@ fe3db930508b65e0ff5947e4386b79af8ab094017629590ef6ba486814cf70f8e4ab0ed0c7d2587e
|
|||
b_plaintext = self.v.decrypt(b_vaulttext)
|
||||
self.assertEqual(b_plaintext, b_orig_plaintext, msg="decryption failed")
|
||||
|
||||
# FIXME This test isn't working quite yet.
|
||||
@pytest.mark.skip(reason='This test is not ready yet')
|
||||
def test_encrypt_decrypt_aes256_bad_hmac(self):
|
||||
# FIXME This test isn't working quite yet.
|
||||
raise SkipTest
|
||||
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
self.v.cipher_name = 'AES256'
|
||||
# plaintext = "Setec Astronomy"
|
||||
enc_data = '''$ANSIBLE_VAULT;1.1;AES256
|
||||
|
@ -349,8 +381,6 @@ fe3db930508b65e0ff5947e4386b79af8ab094017629590ef6ba486814cf70f8e4ab0ed0c7d2587e
|
|||
self.v.decrypt(b_invalid_ciphertext)
|
||||
|
||||
def test_encrypt_encrypted(self):
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
self.v.cipher_name = u'AES'
|
||||
b_vaulttext = b"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
|
||||
vaulttext = to_text(b_vaulttext, errors='strict')
|
||||
|
@ -358,8 +388,6 @@ fe3db930508b65e0ff5947e4386b79af8ab094017629590ef6ba486814cf70f8e4ab0ed0c7d2587e
|
|||
self.assertRaises(errors.AnsibleError, self.v.encrypt, vaulttext)
|
||||
|
||||
def test_decrypt_decrypted(self):
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
plaintext = u"ansible"
|
||||
self.assertRaises(errors.AnsibleError, self.v.decrypt, plaintext)
|
||||
|
||||
|
@ -367,9 +395,19 @@ fe3db930508b65e0ff5947e4386b79af8ab094017629590ef6ba486814cf70f8e4ab0ed0c7d2587e
|
|||
self.assertRaises(errors.AnsibleError, self.v.decrypt, b_plaintext)
|
||||
|
||||
def test_cipher_not_set(self):
|
||||
# not setting the cipher should default to AES256
|
||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||
raise SkipTest
|
||||
plaintext = u"ansible"
|
||||
self.v.encrypt(plaintext)
|
||||
self.assertEquals(self.v.cipher_name, "AES256")
|
||||
|
||||
|
||||
@pytest.mark.skipif(not vault.HAS_PYCRYPTO,
|
||||
reason="Skipping Pycrypto tests because pycrypto is not installed")
|
||||
class TestVaultLibPyCrypto(TestVaultLib):
|
||||
def setUp(self):
|
||||
self.has_cryptography = vault.HAS_CRYPTOGRAPHY
|
||||
vault.HAS_CRYPTOGRAPHY = False
|
||||
super(TestVaultLibPyCrypto, self).setUp()
|
||||
|
||||
def tearDown(self):
|
||||
vault.HAS_CRYPTOGRAPHY = self.has_cryptography
|
||||
super(TestVaultLibPyCrypto, self).tearDown()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue