Support multiple vault passwords (#22756)

Fixes #13243

** Add --vault-id to name/identify multiple vault passwords

Use --vault-id to indicate id and path/type

 --vault-id=prompt  # prompt for default vault id password
 --vault-id=myorg@prompt  # prompt for a vault_id named 'myorg'
 --vault-id=a_password_file  # load ./a_password_file for default id
 --vault-id=myorg@a_password_file # load file for 'myorg' vault id

vault_id's are created implicitly for existing --vault-password-file
and --ask-vault-pass options.

Vault ids are just for UX purposes and bookkeeping. Only the vault
payload and the password bytestring is needed to decrypt a
vault blob.

Replace passing password around everywhere with
a VaultSecrets object.

If we specify a vault_id, mention that in password prompts

Specifying multiple -vault-password-files will
now try each until one works

** Rev vault format in a backwards compatible way

The 1.2 vault format adds the vault_id to the header line
of the vault text. This is backwards compatible with older
versions of ansible. Old versions will just ignore it and
treat it as the default (and only) vault id.

Note: only 2.4+ supports multiple vault passwords, so while
earlier ansible versions can read the vault-1.2 format, it
does not make them magically support multiple vault passwords.

use 1.1 format for 'default' vault_id

Vaulted items that need to include a vault_id will be
written in 1.2 format.

If we set a new DEFAULT_VAULT_IDENTITY, then the default will
use version 1.2

vault will only use a vault_id if one is specified. So if none
is specified and C.DEFAULT_VAULT_IDENTITY is 'default'
we use the old format.

** Changes/refactors needed to implement multiple vault passwords

raise exceptions on decrypt fail, check vault id early

split out parsing the vault plaintext envelope (with the
sha/original plaintext) to _split_plaintext_envelope()

some cli fixups for specifying multiple paths in
the unfrack_paths optparse callback

fix py3 dict.keys() 'dict_keys object is not indexable' error

pluralize cli.options.vault_password_file -> vault_password_files
pluralize cli.options.new_vault_password_file -> new_vault_password_files
pluralize cli.options.vault_id -> cli.options.vault_ids

** Add a config option (vault_id_match) to force vault id matching.

With 'vault_id_match=True' and an ansible
vault that provides a vault_id, then decryption will require
that a matching vault_id is required. (via
--vault-id=my_vault_id@password_file, for ex).

In other words, if the config option is true, then only
the vault secrets with matching vault ids are candidates for
decrypting a vault. If option is false (the default), then
all of the provided vault secrets will be selected.

If a user doesn't want all vault secrets to be tried to
decrypt any vault content, they can enable this option.

Note: The vault id used for the match is not encrypted or
cryptographically signed. It is just a label/id/nickname used
for referencing a specific vault secret.
This commit is contained in:
Adrian Likins 2017-07-28 15:20:58 -04:00 committed by GitHub
commit 934b645191
34 changed files with 1922 additions and 345 deletions

View file

@ -34,7 +34,7 @@ from abc import ABCMeta, abstractmethod
import ansible
from ansible import constants as C
from ansible.errors import AnsibleError, AnsibleOptionsError
from ansible.errors import AnsibleOptionsError
from ansible.inventory.manager import InventoryManager
from ansible.module_utils.six import with_metaclass, string_types
from ansible.module_utils._text import to_bytes, to_text
@ -43,6 +43,7 @@ from ansible.release import __version__
from ansible.utils.path import unfrackpath
from ansible.utils.vars import load_extra_vars, load_options_vars
from ansible.vars.manager import VariableManager
from ansible.parsing.vault import PromptVaultSecret, get_file_vault_secret
try:
from __main__ import display
@ -168,37 +169,89 @@ class CLI(with_metaclass(ABCMeta, object)):
display.v(u"No config file found; using defaults")
@staticmethod
def ask_vault_passwords():
''' prompt for vault password and/or password change '''
def split_vault_id(vault_id):
# return (before_@, after_@)
# if no @, return whole string as after_
if '@' not in vault_id:
return (None, vault_id)
vault_pass = None
try:
vault_pass = getpass.getpass(prompt="Vault password: ")
except EOFError:
pass
# enforce no newline chars at the end of passwords
if vault_pass:
vault_pass = to_bytes(vault_pass, errors='surrogate_or_strict', nonstring='simplerepr').strip()
return vault_pass
parts = vault_id.split('@', 1)
ret = tuple(parts)
return ret
@staticmethod
def ask_new_vault_passwords():
new_vault_pass = None
try:
new_vault_pass = getpass.getpass(prompt="New Vault password: ")
new_vault_pass2 = getpass.getpass(prompt="Confirm New Vault password: ")
if new_vault_pass != new_vault_pass2:
raise AnsibleError("Passwords do not match")
except EOFError:
pass
def build_vault_ids(vault_ids, vault_password_files=None, ask_vault_pass=None):
vault_password_files = vault_password_files or []
vault_ids = vault_ids or []
if new_vault_pass:
new_vault_pass = to_bytes(new_vault_pass, errors='surrogate_or_strict', nonstring='simplerepr').strip()
# convert vault_password_files into vault_ids slugs
for password_file in vault_password_files:
id_slug = u'%s@%s' % (C.DEFAULT_VAULT_IDENTITY, password_file)
return new_vault_pass
# note this makes --vault-id higher precendence than --vault-password-file
# if we want to intertwingle them in order probably need a cli callback to populate vault_ids
# used by --vault-id and --vault-password-file
vault_ids.append(id_slug)
if ask_vault_pass:
id_slug = u'%s@%s' % (C.DEFAULT_VAULT_IDENTITY, u'prompt')
vault_ids.append(id_slug)
return vault_ids
# TODO: remove the now unused args
@staticmethod
def setup_vault_secrets(loader, vault_ids, vault_password_files=None,
ask_vault_pass=None, create_new_password=False):
# list of tuples
vault_secrets = []
if create_new_password:
prompt_formats = ['New vault password (%s): ',
'Confirm vew vault password (%s): ']
else:
prompt_formats = ['Vault password (%s): ']
vault_ids = CLI.build_vault_ids(vault_ids,
vault_password_files,
ask_vault_pass)
for index, vault_id_slug in enumerate(vault_ids):
vault_id_name, vault_id_value = CLI.split_vault_id(vault_id_slug)
if vault_id_value == 'prompt':
# TODO: we could assume --vault-id=prompt implies --ask-vault-pass
# if not, we need to 'if ask_vault_pass' here
if vault_id_name:
prompted_vault_secret = PromptVaultSecret(prompt_formats=prompt_formats, vault_id=vault_id_name)
prompted_vault_secret.load()
vault_secrets.append((vault_id_name, prompted_vault_secret))
else:
prompted_vault_secret = PromptVaultSecret(prompt_formats=prompt_formats,
vault_id=C.DEFAULT_VAULT_IDENTITY)
prompted_vault_secret.load()
vault_secrets.append((C.DEFAULT_VAULT_IDENTITY, prompted_vault_secret))
# update loader with new secrets incrementally, so we can load a vault password
# that is encrypted with a vault secret provided earlier
loader.set_vault_secrets(vault_secrets)
continue
# assuming anything else is a password file
display.vvvvv('Reading vault password file: %s' % vault_id_value)
# read vault_pass from a file
file_vault_secret = get_file_vault_secret(filename=vault_id_value,
vault_id_name=vault_id_name,
loader=loader)
file_vault_secret.load()
if vault_id_name:
vault_secrets.append((vault_id_name, file_vault_secret))
else:
vault_secrets.append((C.DEFAULT_VAULT_IDENTITY, file_vault_secret))
# update loader with as-yet-known vault secrets
loader.set_vault_secrets(vault_secrets)
return vault_secrets
def ask_passwords(self):
''' prompt for connection and become passwords if needed '''
@ -260,7 +313,7 @@ class CLI(with_metaclass(ABCMeta, object)):
if vault_opts:
# Check for vault related conflicts
if (op.ask_vault_pass and op.vault_password_file):
if (op.ask_vault_pass and op.vault_password_files):
self.parser.error("--ask-vault-pass and --vault-password-file are mutually exclusive")
if runas_opts:
@ -278,12 +331,14 @@ class CLI(with_metaclass(ABCMeta, object)):
@staticmethod
def unfrack_paths(option, opt, value, parser):
paths = getattr(parser.values, option.dest)
if isinstance(value, string_types):
setattr(parser.values, option.dest, [unfrackpath(x) for x in value.split(os.pathsep)])
paths.extend([unfrackpath(x) for x in value.split(os.pathsep)])
elif isinstance(value, list):
setattr(parser.values, option.dest, [unfrackpath(x) for x in value])
paths.extend([unfrackpath(x) for x in value])
else:
pass # FIXME: should we raise options error?
setattr(parser.values, option.dest, paths)
@staticmethod
def unfrack_path(option, opt, value, parser):
@ -324,13 +379,17 @@ class CLI(with_metaclass(ABCMeta, object)):
if vault_opts:
parser.add_option('--ask-vault-pass', default=C.DEFAULT_ASK_VAULT_PASS, dest='ask_vault_pass', action='store_true',
help='ask for vault password')
parser.add_option('--vault-password-file', default=C.DEFAULT_VAULT_PASSWORD_FILE, dest='vault_password_file',
help="vault password file", action="callback", callback=CLI.unfrack_path, type='string')
parser.add_option('--new-vault-password-file', dest='new_vault_password_file',
help="new vault password file for rekey", action="callback", callback=CLI.unfrack_path, type='string')
parser.add_option('--vault-password-file', default=[], dest='vault_password_files',
help="vault password file", action="callback", callback=CLI.unfrack_paths, type='string')
parser.add_option('--new-vault-password-file', default=[], dest='new_vault_password_files',
help="new vault password file for rekey", action="callback", callback=CLI.unfrack_paths, type='string')
parser.add_option('--output', default=None, dest='output_file',
help='output file name for encrypt or decrypt; use - for stdout',
action="callback", callback=CLI.unfrack_path, type='string')
action="callback", callback=CLI.unfrack_path, type='string'),
parser.add_option('--vault-id', default=[], dest='vault_ids', action='append', type='string',
help='the vault identity to use')
parser.add_option('--new-vault-id', default=None, dest='new_vault_id', type='string',
help='the new vault identity to use for rekey')
if subset_opts:
parser.add_option('-t', '--tags', dest='tags', default=[], action='append',
@ -649,54 +708,17 @@ class CLI(with_metaclass(ABCMeta, object)):
return t
@staticmethod
def read_vault_password_file(vault_password_file, loader):
"""
Read a vault password from a file or if executable, execute the script and
retrieve password from STDOUT
"""
this_path = os.path.realpath(os.path.expanduser(vault_password_file))
if not os.path.exists(this_path):
raise AnsibleError("The vault password file %s was not found" % this_path)
if loader.is_executable(this_path):
try:
# STDERR not captured to make it easier for users to prompt for input in their scripts
p = subprocess.Popen(this_path, stdout=subprocess.PIPE)
except OSError as e:
raise AnsibleError("Problem running vault password script %s (%s). If this is not a script, "
"remove the executable bit from the file." % (' '.join(this_path), e))
stdout, stderr = p.communicate()
if p.returncode != 0:
raise AnsibleError("Vault password script %s returned non-zero (%s): %s" % (this_path, p.returncode, p.stderr))
vault_pass = stdout.strip(b'\r\n')
else:
try:
f = open(this_path, "rb")
vault_pass = f.read().strip()
f.close()
except (OSError, IOError) as e:
raise AnsibleError("Could not read vault password file %s: %s" % (this_path, e))
return vault_pass
@staticmethod
def _play_prereqs(options):
# all needs loader
loader = DataLoader()
# vault
b_vault_pass = None
if options.vault_password_file:
# read vault_pass from a file
b_vault_pass = CLI.read_vault_password_file(options.vault_password_file, loader=loader)
elif options.ask_vault_pass:
b_vault_pass = CLI.ask_vault_passwords()
if b_vault_pass is not None:
loader.set_vault_password(b_vault_pass)
vault_secrets = CLI.setup_vault_secrets(loader,
vault_ids=options.vault_ids,
vault_password_files=options.vault_password_files,
ask_vault_pass=options.ask_vault_pass)
loader.set_vault_secrets(vault_secrets)
# create the inventory, and filter it based on the subset specified (if any)
inventory = InventoryManager(loader=loader, sources=options.inventory)

View file

@ -415,6 +415,12 @@ class ConsoleCLI(CLI, cmd.Cmd):
self.loader, self.inventory, self.variable_manager = self._play_prereqs(self.options)
vault_secrets = self.setup_vault_secrets(self.loader,
vault_id=self.options.vault_ids,
vault_password_files=self.options.vault_password_files,
ask_vault_pass=self.options.ask_vault_pass)
self.loader.set_vault_secrets(vault_secrets)
no_hosts = False
if len(self.inventory.list_hosts()) == 0:
# Empty inventory

View file

@ -226,8 +226,9 @@ class PullCLI(CLI):
# Build playbook command
cmd = '%s/ansible-playbook %s %s' % (bin_path, base_opts, playbook)
if self.options.vault_password_file:
cmd += " --vault-password-file=%s" % self.options.vault_password_file
if self.options.vault_password_files:
for vault_password_file in self.options.vault_password_files:
cmd += " --vault-password-file=%s" % vault_password_file
if inv_opts:
cmd += ' %s' % inv_opts
for ev in self.options.extra_vars:

View file

@ -23,10 +23,10 @@ import os
import sys
from ansible.cli import CLI
from ansible.errors import AnsibleError, AnsibleOptionsError
from ansible.errors import AnsibleOptionsError
from ansible.module_utils._text import to_text, to_bytes
from ansible.parsing.dataloader import DataLoader
from ansible.parsing.vault import VaultEditor
from ansible.parsing.vault import VaultEditor, VaultLib, match_encrypt_secret
try:
from __main__ import display
@ -59,6 +59,12 @@ class VaultCLI(CLI):
self.b_vault_pass = None
self.b_new_vault_pass = None
self.encrypt_string_read_stdin = False
self.encrypt_secret = None
self.encrypt_vault_id = None
self.new_encrypt_secret = None
self.new_encrypt_vault_id = None
super(VaultCLI, self).__init__(args)
def set_action(self):
@ -108,6 +114,11 @@ class VaultCLI(CLI):
can_output = ['encrypt', 'decrypt', 'encrypt_string']
if self.options.vault_ids:
for vault_id in self.options.vault_ids:
if u';' in vault_id:
raise AnsibleOptionsError("'%s' is not a valid vault id. The character ';' is not allowed in vault ids" % vault_id)
if self.action not in can_output:
if self.options.output_file:
raise AnsibleOptionsError("The --output option can be used only with ansible-vault %s" % '/'.join(can_output))
@ -132,43 +143,79 @@ class VaultCLI(CLI):
raise AnsibleOptionsError('The --prompt option is not supported if also reading input from stdin')
def run(self):
super(VaultCLI, self).run()
loader = DataLoader()
# set default restrictive umask
old_umask = os.umask(0o077)
if self.options.vault_password_file:
# read vault_pass from a file
self.b_vault_pass = CLI.read_vault_password_file(self.options.vault_password_file, loader)
vault_ids = self.options.vault_ids
if self.options.new_vault_password_file:
# for rekey only
self.b_new_vault_pass = CLI.read_vault_password_file(self.options.new_vault_password_file, loader)
# there are 3 types of actions, those that just 'read' (decrypt, view) and only
# need to ask for a password once, and those that 'write' (create, encrypt) that
# ask for a new password and confirm it, and 'read/write (rekey) that asks for the
# old password, then asks for a new one and confirms it.
if not self.b_vault_pass or self.options.ask_vault_pass:
# the 'read' options don't need to ask for password confirmation.
# 'edit' is read/write, but the decrypt will confirm.
if self.action in ['decrypt', 'edit', 'view', 'rekey']:
self.b_vault_pass = self.ask_vault_passwords()
else:
self.b_vault_pass = self.ask_new_vault_passwords()
# TODO: instead of prompting for these before, we could let VaultEditor
# call a callback when it needs it.
if self.action in ['decrypt', 'view', 'rekey']:
vault_secrets = self.setup_vault_secrets(loader,
vault_ids=vault_ids,
vault_password_files=self.options.vault_password_files,
ask_vault_pass=self.options.ask_vault_pass)
if not self.b_vault_pass:
raise AnsibleOptionsError("A password is required to use Ansible's Vault")
if not vault_secrets:
raise AnsibleOptionsError("A vault password is required to use Ansible's Vault")
if self.action == 'rekey':
if not self.b_new_vault_pass:
self.b_new_vault_pass = self.ask_new_vault_passwords()
if not self.b_new_vault_pass:
raise AnsibleOptionsError("A password is required to rekey Ansible's Vault")
if self.action in ['encrypt', 'encrypt_string', 'create', 'edit']:
if len(vault_ids) > 1:
raise AnsibleOptionsError("Only one --vault-id can be used for encryption")
if self.action == 'encrypt_string':
if self.options.encrypt_string_prompt:
self.encrypt_string_prompt = True
vault_secrets = None
vault_secrets = \
self.setup_vault_secrets(loader,
vault_ids=vault_ids,
vault_password_files=self.options.vault_password_files,
ask_vault_pass=self.options.ask_vault_pass,
create_new_password=True)
self.editor = VaultEditor(self.b_vault_pass)
if not vault_secrets:
raise AnsibleOptionsError("A vault password is required to use Ansible's Vault")
encrypt_secret = match_encrypt_secret(vault_secrets)
# only one secret for encrypt for now, use the first vault_id and use its first secret
# self.encrypt_vault_id = list(vault_secrets.keys())[0]
# self.encrypt_secret = vault_secrets[self.encrypt_vault_id][0]
self.encrypt_vault_id = encrypt_secret[0]
self.encrypt_secret = encrypt_secret[1]
if self.action in ['rekey']:
new_vault_ids = []
if self.options.new_vault_id:
new_vault_ids.append(self.options.new_vault_id)
new_vault_secrets = \
self.setup_vault_secrets(loader,
vault_ids=new_vault_ids,
vault_password_files=self.options.new_vault_password_files,
ask_vault_pass=self.options.ask_vault_pass,
create_new_password=True)
if not new_vault_secrets:
raise AnsibleOptionsError("A new vault password is required to use Ansible's Vault rekey")
# There is only one new_vault_id currently and one new_vault_secret
new_encrypt_secret = match_encrypt_secret(new_vault_secrets)
self.new_encrypt_vault_id = new_encrypt_secret[0]
self.new_encrypt_secret = new_encrypt_secret[1]
loader.set_vault_secrets(vault_secrets)
self.secrets = vault_secrets
# FIXME: do we need to create VaultEditor here? its not reused
vault = VaultLib(self.secrets)
self.editor = VaultEditor(vault)
self.execute()
@ -182,7 +229,10 @@ class VaultCLI(CLI):
display.display("Reading plaintext input from stdin", stderr=True)
for f in self.args or ['-']:
self.editor.encrypt_file(f, output_file=self.options.output_file)
# Fixme: use the correct vau
self.editor.encrypt_file(f, self.encrypt_secret,
vault_id=self.encrypt_vault_id,
output_file=self.options.output_file)
if sys.stdout.isatty():
display.display("Encryption successful", stderr=True)
@ -227,6 +277,8 @@ class VaultCLI(CLI):
if name_prompt_response != "":
name = name_prompt_response
# TODO: could prompt for which vault_id to use for each plaintext string
# currently, it will just be the default
# could use private=True for shadowed input if useful
prompt_response = display.prompt(msg)
@ -282,8 +334,9 @@ class VaultCLI(CLI):
b_plaintext = to_bytes(plaintext)
b_plaintext_list.append((b_plaintext, self.FROM_ARGS, name))
# TODO: specify vault_id per string?
# Format the encrypted strings and any corresponding stderr output
outputs = self._format_output_vault_strings(b_plaintext_list)
outputs = self._format_output_vault_strings(b_plaintext_list, vault_id=self.encrypt_vault_id)
for output in outputs:
err = output.get('err', None)
@ -297,7 +350,7 @@ class VaultCLI(CLI):
# TODO: offer block or string ala eyaml
def _format_output_vault_strings(self, b_plaintext_list):
def _format_output_vault_strings(self, b_plaintext_list, vault_id=None):
# If we are only showing one item in the output, we don't need to included commented
# delimiters in the text
show_delimiter = False
@ -313,7 +366,9 @@ class VaultCLI(CLI):
for index, b_plaintext_info in enumerate(b_plaintext_list):
# (the text itself, which input it came from, its name)
b_plaintext, src, name = b_plaintext_info
b_ciphertext = self.editor.encrypt_bytes(b_plaintext)
b_ciphertext = self.editor.encrypt_bytes(b_plaintext, self.encrypt_secret,
vault_id=vault_id)
# block formatting
yaml_text = self.format_ciphertext_yaml(b_ciphertext, name=name)
@ -347,7 +402,8 @@ class VaultCLI(CLI):
if len(self.args) > 1:
raise AnsibleOptionsError("ansible-vault create can take only one filename argument")
self.editor.create_file(self.args[0])
self.editor.create_file(self.args[0], self.encrypt_secret,
vault_id=self.encrypt_vault_id)
def execute_edit(self):
''' open and decrypt an existing vaulted file in an editor, that will be encryped again when closed'''
@ -363,15 +419,14 @@ class VaultCLI(CLI):
# unicode here because we are displaying it and therefore can make
# the decision that the display doesn't have to be precisely what
# the input was (leave that to decrypt instead)
self.pager(to_text(self.editor.plaintext(f)))
plaintext = self.editor.plaintext(f)
self.pager(to_text(plaintext))
def execute_rekey(self):
''' re-encrypt a vaulted file with a new secret, the previous secret is required '''
for f in self.args:
if not (os.path.isfile(f)):
raise AnsibleError(f + " does not exist")
for f in self.args:
self.editor.rekey_file(f, self.b_new_vault_pass)
# FIXME: plumb in vault_id, use the default new_vault_secret for now
self.editor.rekey_file(f, self.new_encrypt_secret,
self.new_encrypt_vault_id)
display.display("Rekey successful", stderr=True)

View file

@ -1072,6 +1072,22 @@ DEFAULT_VAR_COMPRESSION_LEVEL:
value_type: integer
vars: []
yaml: {key: defaults.var_compression_level}
DEFAULT_VAULT_ID_MATCH:
default: False
desc: 'If true, decrypting vaults with a vault id will only try the password from the matching vault-id'
env: [{name: ANSIBLE_VAULT_ID_MATCH}]
ini:
- {key: vault_id_match, section: defaults}
vars: []
yaml: {key: defaults.vault_id_match}
DEFAULT_VAULT_IDENTITY:
default: default
desc: 'TODO: write it'
env: [{name: ANSIBLE_VAULT_IDENTITY}]
ini:
- {key: vault_identity, section: defaults}
vars: []
yaml: {key: defaults.vault_identity}
DEFAULT_VAULT_PASSWORD_FILE:
default: ~
desc: 'TODO: write it'

View file

@ -26,12 +26,13 @@ import re
import tempfile
from yaml import YAMLError
from ansible.module_utils.six import text_type, string_types
from ansible.errors import AnsibleFileNotFound, AnsibleParserError
from ansible.errors.yaml_strings import YAML_SYNTAX_ERROR
from ansible.module_utils.basic import is_executable
from ansible.module_utils.six import binary_type, text_type
from ansible.module_utils._text import to_bytes, to_native, to_text
from ansible.parsing.vault import VaultLib, b_HEADER, is_encrypted, is_encrypted_file
from ansible.parsing.vault import VaultLib, b_HEADER, is_encrypted, is_encrypted_file, parse_vaulttext_envelope
from ansible.parsing.quoting import unquote
from ansible.parsing.yaml.loader import AnsibleLoader
from ansible.parsing.yaml.objects import AnsibleBaseYAMLObject, AnsibleUnicode
@ -73,11 +74,16 @@ class DataLoader:
self._tempfiles = set()
# initialize the vault stuff with an empty password
self.set_vault_password(None)
# TODO: replace with a ref to something that can get the password
# a creds/auth provider
# self.set_vault_password(None)
self._vaults = {}
self._vault = VaultLib()
self.set_vault_secrets(None)
def set_vault_password(self, b_vault_password):
self._b_vault_password = b_vault_password
self._vault = VaultLib(b_password=b_vault_password)
# TODO: since we can query vault_secrets late, we could provide this to DataLoader init
def set_vault_secrets(self, vault_secrets):
self._vault.secrets = vault_secrets
def load(self, data, file_name='<string>', show_content=True):
'''
@ -170,7 +176,7 @@ class DataLoader:
def _safe_load(self, stream, file_name=None):
''' Implements yaml.safe_load(), except using our custom loader class. '''
loader = AnsibleLoader(stream, file_name, self._b_vault_password)
loader = AnsibleLoader(stream, file_name, self._vault.secrets)
try:
return loader.get_single_data()
finally:
@ -206,6 +212,8 @@ class DataLoader:
with open(b_file_name, 'rb') as f:
data = f.read()
if is_encrypted(data):
# FIXME: plugin vault selector
b_ciphertext, b_version, cipher_name, vault_id = parse_vaulttext_envelope(data)
data = self._vault.decrypt(data, filename=b_file_name)
show_content = False
@ -362,7 +370,6 @@ class DataLoader:
b_upath = to_bytes(upath, errors='surrogate_or_strict')
b_mydir = os.path.dirname(b_upath)
# FIXME: this detection fails with non main.yml roles
# if path is in role and 'tasks' not there already, add it into the search
if is_role or self._is_role(path):
if b_mydir.endswith(b'tasks'):
@ -439,8 +446,8 @@ class DataLoader:
# the decrypt call would throw an error, but we check first
# since the decrypt function doesn't know the file name
data = f.read()
if not self._b_vault_password:
raise AnsibleParserError("A vault password must be specified to decrypt %s" % to_native(file_path))
if not self._vault.secrets:
raise AnsibleParserError("A vault password or secret must be specified to decrypt %s" % to_native(file_path))
data = self._vault.decrypt(data, filename=real_path)
# Make a temp file

View file

@ -23,6 +23,7 @@ import os
import random
import shlex
import shutil
import subprocess
import sys
import tempfile
import warnings
@ -72,6 +73,7 @@ except ImportError:
pass
from ansible.errors import AnsibleError
from ansible import constants as C
from ansible.module_utils.six import PY3, binary_type
from ansible.module_utils.six.moves import zip
from ansible.module_utils._text import to_bytes, to_text
@ -142,12 +144,267 @@ def is_encrypted_file(file_obj, start_pos=0, count=-1):
file_obj.seek(current_position)
class VaultLib:
def parse_vaulttext_envelope(b_vaulttext_envelope, default_vault_id=None):
"""Retrieve information about the Vault and clean the data
def __init__(self, b_password):
self.b_password = to_bytes(b_password, errors='strict', encoding='utf-8')
When data is saved, it has a header prepended and is formatted into 80
character lines. This method extracts the information from the header
and then removes the header and the inserted newlines. The string returned
is suitable for processing by the Cipher classes.
:arg b_vaulttext: byte str containing the data from a save file
:returns: a byte str suitable for passing to a Cipher class's
decrypt() function.
"""
# used by decrypt
default_vault_id = default_vault_id or C.DEFAULT_VAULT_IDENTITY
b_tmpdata = b_vaulttext_envelope.split(b'\n')
b_tmpheader = b_tmpdata[0].strip().split(b';')
b_version = b_tmpheader[1].strip()
cipher_name = to_text(b_tmpheader[2].strip())
vault_id = default_vault_id
# vault_id = None
# Only attempt to find vault_id if the vault file is version 1.2 or newer
# if self.b_version == b'1.2':
if len(b_tmpheader) >= 4:
vault_id = to_text(b_tmpheader[3].strip())
b_ciphertext = b''.join(b_tmpdata[1:])
return b_ciphertext, b_version, cipher_name, vault_id
def format_vaulttext_envelope(b_ciphertext, cipher_name, version=None, vault_id=None):
""" Add header and format to 80 columns
:arg b_ciphertext: the encrypted and hexlified data as a byte string
:arg cipher_name: unicode cipher name (for ex, u'AES256')
:arg version: unicode vault version (for ex, '1.2'). Optional ('1.1' is default)
:arg vault_id: unicode vault identifier. If provided, the version will be bumped to 1.2.
:returns: a byte str that should be dumped into a file. It's
formatted to 80 char columns and has the header prepended
"""
if not cipher_name:
raise AnsibleError("the cipher must be set before adding a header")
version = version or '1.1'
# If we specify a vault_id, use format version 1.2. For no vault_id, stick to 1.1
if vault_id and vault_id != u'default':
version = '1.2'
b_version = to_bytes(version, 'utf-8', errors='strict')
b_vault_id = to_bytes(vault_id, 'utf-8', errors='strict')
b_cipher_name = to_bytes(cipher_name, 'utf-8', errors='strict')
header_parts = [b_HEADER,
b_version,
b_cipher_name]
if b_version == b'1.2' and b_vault_id:
header_parts.append(b_vault_id)
header = b';'.join(header_parts)
b_vaulttext = [header]
b_vaulttext += [b_ciphertext[i:i + 80] for i in range(0, len(b_ciphertext), 80)]
b_vaulttext += [b'']
b_vaulttext = b'\n'.join(b_vaulttext)
return b_vaulttext
class VaultSecret:
'''Opaque/abstract objects for a single vault secret. ie, a password or a key.'''
def __init__(self, _bytes=None):
# FIXME: ? that seems wrong... Unset etc?
self._bytes = _bytes
@property
def bytes(self):
'''The secret as a bytestring.
Sub classes that store text types will need to override to encode the text to bytes.
'''
return self._bytes
def load(self):
return self._bytes
class PromptVaultSecret(VaultSecret):
default_prompt_formats = ["Vault password (%s): "]
def __init__(self, _bytes=None, vault_id=None, prompt_formats=None):
self._bytes = _bytes
self.vault_id = vault_id
if prompt_formats is None:
self.prompt_formats = self.default_prompt_formats
else:
self.prompt_formats = prompt_formats
@property
def bytes(self):
return self._bytes
def load(self):
self._bytes = self.ask_vault_passwords()
def ask_vault_passwords(self):
b_vault_passwords = []
for prompt_format in self.prompt_formats:
prompt = prompt_format % self.vault_id
try:
vault_pass = display.prompt(prompt, private=True)
except EOFError:
pass
b_vault_pass = to_bytes(vault_pass, errors='strict', nonstring='simplerepr').strip()
b_vault_passwords.append(b_vault_pass)
# Make sure the passwords match by comparing them all to the first password
for b_vault_password in b_vault_passwords:
self.confirm(b_vault_passwords[0], b_vault_password)
if b_vault_passwords:
return b_vault_passwords[0]
return None
def confirm(self, b_vault_pass_1, b_vault_pass_2):
# enforce no newline chars at the end of passwords
if b_vault_pass_1 != b_vault_pass_2:
# FIXME: more specific exception
raise AnsibleError("Passwords do not match")
def get_file_vault_secret(filename=None, vault_id_name=None, encoding=None, loader=None):
this_path = os.path.realpath(os.path.expanduser(filename))
if not os.path.exists(this_path):
raise AnsibleError("The vault password file %s was not found" % this_path)
if loader.is_executable(this_path):
# TODO: pass vault_id_name to script via cli
return ScriptVaultSecret(filename=this_path, encoding=encoding, loader=loader)
else:
return FileVaultSecret(filename=this_path, encoding=encoding, loader=loader)
# TODO: mv these classes to a seperate file so we don't pollute vault with 'subprocess' etc
class FileVaultSecret(VaultSecret):
def __init__(self, filename=None, encoding=None, loader=None):
super(FileVaultSecret, self).__init__()
self.filename = filename
self.loader = loader
self.encoding = encoding or 'utf8'
# We could load from file here, but that is eventually a pain to test
self._bytes = None
self._text = None
@property
def bytes(self):
if self._bytes:
return self._bytes
if self._text:
return self._text.encode(self.encoding)
return None
def load(self):
self._bytes = self.read_file(self.filename, self.loader)
@staticmethod
def read_file(filename, loader):
"""
Read a vault password from a file or if executable, execute the script and
retrieve password from STDOUT
"""
try:
f = open(filename, "rb")
vault_pass = f.read().strip()
f.close()
except (OSError, IOError) as e:
raise AnsibleError("Could not read vault password file %s: %s" % (filename, e))
return vault_pass
def __repr__(self):
if self.filename:
return "%s(filename='%s')" % (self.__class__.__name__, self.filename)
return "%s()" % (self.__class__.__name__)
class ScriptVaultSecret(FileVaultSecret):
@staticmethod
def read_file(filename, loader):
if not loader.is_executable(filename):
raise AnsibleVaultError("The vault password script %s was not executable" % filename)
try:
# STDERR not captured to make it easier for users to prompt for input in their scripts
p = subprocess.Popen(filename, stdout=subprocess.PIPE)
except OSError as e:
msg_format = "Problem running vault password script %s (%s)."
"If this is not a script, remove the executable bit from the file."
msg = msg_format % (' '.join(filename), e)
raise AnsibleError(msg)
stdout, stderr = p.communicate()
if p.returncode != 0:
raise AnsibleError("Vault password script %s returned non-zero (%s): %s" % (filename, p.returncode, p.stderr))
vault_pass = stdout.strip(b'\r\n')
return vault_pass
def match_secrets(secrets, target_vault_ids):
'''Find all VaultSecret objects that are mapped to any of the target_vault_ids in secrets'''
if not secrets:
return []
matches = [(vault_id, secret) for vault_id, secret in secrets if vault_id in target_vault_ids]
return matches
def match_best_secret(secrets, target_vault_ids):
'''Find the best secret from secrets that matches target_vault_ids
Since secrets should be ordered so the early secrets are 'better' than later ones, this
just finds all the matches, then returns the first secret'''
matches = match_secrets(secrets, target_vault_ids)
if matches:
return matches[0]
# raise exception?
return None
def match_encrypt_secret(secrets):
'''Find the best/first/only secret in secrets to use for encrypting'''
# ie, consider all of the available secrets as matches
_vault_id_matchers = [_vault_id for _vault_id, _vault_secret in secrets]
best_secret = match_best_secret(secrets, _vault_id_matchers)
# can be empty list sans any tuple
return best_secret
class VaultLib:
def __init__(self, secrets=None):
self.secrets = secrets or []
self.cipher_name = None
self.b_version = b'1.1'
self.b_version = b'1.2'
@staticmethod
def is_encrypted(data):
@ -169,7 +426,7 @@ class VaultLib:
display.deprecated(u'vault.VaultLib.is_encrypted_file is deprecated. Use vault.is_encrypted_file instead', version='2.4')
return is_encrypted_file(file_obj)
def encrypt(self, plaintext):
def encrypt(self, plaintext, secret=None, vault_id=None):
"""Vault encrypt a piece of data.
:arg plaintext: a text or byte string to encrypt.
@ -181,6 +438,13 @@ class VaultLib:
If the string passed in is a text string, it will be encoded to UTF-8
before encryption.
"""
if secret is None:
if self.secrets:
secret_vault_id, secret = match_encrypt_secret(self.secrets)
else:
raise AnsibleVaultError("A vault password must be specified to encrypt data")
b_plaintext = to_bytes(plaintext, errors='surrogate_or_strict')
if is_encrypted(b_plaintext):
@ -195,10 +459,13 @@ class VaultLib:
raise AnsibleError(u"{0} cipher could not be found".format(self.cipher_name))
# encrypt data
b_ciphertext = this_cipher.encrypt(b_plaintext, self.b_password)
display.vvvvv('Encrypting with vault secret %s' % secret)
b_ciphertext = this_cipher.encrypt(b_plaintext, secret)
# format the data for output to the file
b_vaulttext = self._format_output(b_ciphertext)
b_vaulttext = format_vaulttext_envelope(b_ciphertext,
self.cipher_name,
vault_id=vault_id)
return b_vaulttext
def decrypt(self, vaulttext, filename=None):
@ -213,8 +480,8 @@ class VaultLib:
"""
b_vaulttext = to_bytes(vaulttext, errors='strict', encoding='utf-8')
if self.b_password is None:
raise AnsibleError("A vault password must be specified to decrypt data")
if self.secrets is None:
raise AnsibleVaultError("A vault password must be specified to decrypt data")
if not is_encrypted(b_vaulttext):
msg = "input is not vault encrypted data"
@ -222,17 +489,70 @@ class VaultLib:
msg += "%s is not a vault encrypted file" % filename
raise AnsibleError(msg)
# clean out header
b_vaulttext = self._split_header(b_vaulttext)
b_vaulttext, b_version, cipher_name, vault_id = parse_vaulttext_envelope(b_vaulttext)
# create the cipher object
if self.cipher_name in CIPHER_WHITELIST:
this_cipher = CIPHER_MAPPING[self.cipher_name]()
# create the cipher object, note that the cipher used for decrypt can
# be different than the cipher used for encrypt
if cipher_name in CIPHER_WHITELIST:
this_cipher = CIPHER_MAPPING[cipher_name]()
else:
raise AnsibleError("{0} cipher could not be found".format(self.cipher_name))
raise AnsibleError("{0} cipher could not be found".format(cipher_name))
b_plaintext = None
if not self.secrets:
raise AnsibleVaultError('Attempting to decrypt but no vault secrets found')
# WARNING: Currently, the vault id is not required to match the vault id in the vault blob to
# decrypt a vault properly. The vault id in the vault blob is not part of the encrypted
# or signed vault payload. There is no cryptographic checking/verification/validation of the
# vault blobs vault id. It can be tampered with and changed. The vault id is just a nick
# name to use to pick the best secret and provide some ux/ui info.
# iterate over all the applicable secrets (all of them by default) until one works...
# if we specify a vault_id, only the corresponding vault secret is checked and
# we check it first.
vault_id_matchers = []
if vault_id:
display.vvvvv('Found a vault_id (%s) in the vaulttext' % (vault_id))
vault_id_matchers.append(vault_id)
_matches = match_secrets(self.secrets, vault_id_matchers)
if _matches:
display.vvvvv('We have a secret associated with vault id (%s), will try to use to decrypt %s' % (vault_id, filename))
else:
display.vvvvv('Found a vault_id (%s) in the vault text, but we do not have a associated secret (--vault-id)' % (vault_id))
# Not adding the other secrets to vault_secret_ids enforces a match between the vault_id from the vault_text and
# the known vault secrets.
if not C.DEFAULT_VAULT_ID_MATCH:
# Add all of the known vault_ids as candidates for decrypting a vault.
vault_id_matchers.extend([_vault_id for _vault_id, _secret in self.secrets if _vault_id != vault_id])
matched_secrets = match_secrets(self.secrets, vault_id_matchers)
# for vault_secret_id in vault_secret_ids:
for vault_secret_id, vault_secret in matched_secrets:
display.vvvvv('Trying to use vault secret=(%s) id=%s to decrypt %s' % (vault_secret, vault_secret_id, filename))
try:
# secret = self.secrets[vault_secret_id]
display.vvvv('Trying secret %s for vault_id=%s' % (vault_secret, vault_secret_id))
b_plaintext = this_cipher.decrypt(b_vaulttext, vault_secret)
if b_plaintext is not None:
display.vvvvv('decrypt succesful with secret=%s and vault_id=%s' % (vault_secret, vault_secret_id))
break
except AnsibleError as e:
display.vvvv('Tried to use the vault secret (%s) to decrypt (%s) but it failed. Error: %s' %
(vault_secret_id, filename, e))
continue
else:
msg = "Decryption failed (no vault secrets would found that could decrypt)"
if filename:
msg += " on %s" % filename
raise AnsibleVaultError(msg)
# try to unencrypt vaulttext
b_plaintext = this_cipher.decrypt(b_vaulttext, self.b_password)
if b_plaintext is None:
msg = "Decryption failed"
if filename:
@ -241,54 +561,12 @@ class VaultLib:
return b_plaintext
def _format_output(self, b_ciphertext):
""" Add header and format to 80 columns
:arg b_vaulttext: the encrypted and hexlified data as a byte string
:returns: a byte str that should be dumped into a file. It's
formatted to 80 char columns and has the header prepended
"""
if not self.cipher_name:
raise AnsibleError("the cipher must be set before adding a header")
header = b';'.join([b_HEADER, self.b_version,
to_bytes(self.cipher_name, 'utf-8', errors='strict')])
b_vaulttext = [header]
b_vaulttext += [b_ciphertext[i:i + 80] for i in range(0, len(b_ciphertext), 80)]
b_vaulttext += [b'']
b_vaulttext = b'\n'.join(b_vaulttext)
return b_vaulttext
def _split_header(self, b_vaulttext):
"""Retrieve information about the Vault and clean the data
When data is saved, it has a header prepended and is formatted into 80
character lines. This method extracts the information from the header
and then removes the header and the inserted newlines. The string returned
is suitable for processing by the Cipher classes.
:arg b_vaulttext: byte str containing the data from a save file
:returns: a byte str suitable for passing to a Cipher class's
decrypt() function.
"""
# used by decrypt
b_tmpdata = b_vaulttext.split(b'\n')
b_tmpheader = b_tmpdata[0].strip().split(b';')
self.b_version = b_tmpheader[1].strip()
self.cipher_name = to_text(b_tmpheader[2].strip())
b_ciphertext = b''.join(b_tmpdata[1:])
return b_ciphertext
class VaultEditor:
def __init__(self, b_password):
self.vault = VaultLib(b_password)
def __init__(self, vault=None):
# TODO: it may be more useful to just make VaultSecrets and index of VaultLib objects...
self.vault = vault or VaultLib()
# TODO: mv shred file stuff to it's own class
def _shred_file_custom(self, tmp_path):
@ -358,7 +636,8 @@ class VaultEditor:
os.remove(tmp_path)
def _edit_file_helper(self, filename, existing_data=None, force_save=False):
def _edit_file_helper(self, filename, secret,
existing_data=None, force_save=False, vault_id=None):
# Create a tempfile
fd, tmp_path = tempfile.mkstemp()
@ -385,7 +664,7 @@ class VaultEditor:
# encrypt new data and write out to tmp
# An existing vaultfile will always be UTF-8,
# so decode to unicode here
b_ciphertext = self.vault.encrypt(b_tmpdata)
b_ciphertext = self.vault.encrypt(b_tmpdata, secret, vault_id=vault_id)
self.write_data(b_ciphertext, tmp_path)
# shuffle tmp file into place
@ -399,13 +678,13 @@ class VaultEditor:
real_path = os.path.realpath(filename)
return real_path
def encrypt_bytes(self, b_plaintext):
def encrypt_bytes(self, b_plaintext, secret, vault_id=None):
b_ciphertext = self.vault.encrypt(b_plaintext)
b_ciphertext = self.vault.encrypt(b_plaintext, secret, vault_id=vault_id)
return b_ciphertext
def encrypt_file(self, filename, output_file=None):
def encrypt_file(self, filename, secret, vault_id=None, output_file=None):
# A file to be encrypted into a vaultfile could be any encoding
# so treat the contents as a byte string.
@ -414,7 +693,7 @@ class VaultEditor:
filename = self._real_path(filename)
b_plaintext = self.read_data(filename)
b_ciphertext = self.vault.encrypt(b_plaintext)
b_ciphertext = self.vault.encrypt(b_plaintext, secret, vault_id=vault_id)
self.write_data(b_ciphertext, output_file or filename)
def decrypt_file(self, filename, output_file=None):
@ -425,12 +704,12 @@ class VaultEditor:
ciphertext = self.read_data(filename)
try:
plaintext = self.vault.decrypt(ciphertext)
plaintext = self.vault.decrypt(ciphertext, filename=filename)
except AnsibleError as e:
raise AnsibleError("%s for %s" % (to_bytes(e), to_bytes(filename)))
self.write_data(plaintext, output_file or filename, shred=False)
def create_file(self, filename):
def create_file(self, filename, secret, vault_id=None):
""" create a new encrypted file """
# FIXME: If we can raise an error here, we can probably just make it
@ -438,58 +717,80 @@ class VaultEditor:
if os.path.isfile(filename):
raise AnsibleError("%s exists, please use 'edit' instead" % filename)
self._edit_file_helper(filename)
self._edit_file_helper(filename, secret, vault_id=vault_id)
def edit_file(self, filename):
# follow the symlink
filename = self._real_path(filename)
ciphertext = self.read_data(filename)
b_vaulttext = self.read_data(filename)
# vault or yaml files are always utf8
vaulttext = to_text(b_vaulttext)
try:
plaintext = self.vault.decrypt(ciphertext)
# vaulttext gets converted back to bytes, but alas
plaintext = self.vault.decrypt(vaulttext)
except AnsibleError as e:
raise AnsibleError("%s for %s" % (to_bytes(e), to_bytes(filename)))
# Figure out the vault id from the file, to select the right secret to re-encrypt it
# (duplicates parts of decrypt, but alas...)
b_ciphertext, b_version, cipher_name, vault_id = parse_vaulttext_envelope(b_vaulttext)
# if we could decrypt, the vault_id should be in secrets
# though we could have multiple secrets for a given vault_id, pick the first one
secrets = match_secrets(self.vault.secrets, [vault_id])
secret = secrets[0][1]
if self.vault.cipher_name not in CIPHER_WRITE_WHITELIST:
# we want to get rid of files encrypted with the AES cipher
self._edit_file_helper(filename, existing_data=plaintext, force_save=True)
self._edit_file_helper(filename, secret, existing_data=plaintext, force_save=True)
else:
self._edit_file_helper(filename, existing_data=plaintext, force_save=False)
self._edit_file_helper(filename, secret, existing_data=plaintext, force_save=False)
def plaintext(self, filename):
ciphertext = self.read_data(filename)
b_vaulttext = self.read_data(filename)
vaulttext = to_text(b_vaulttext)
try:
plaintext = self.vault.decrypt(ciphertext)
plaintext = self.vault.decrypt(vaulttext)
return plaintext
except AnsibleError as e:
raise AnsibleError("%s for %s" % (to_bytes(e), to_bytes(filename)))
raise AnsibleVaultError("%s for %s" % (to_bytes(e), to_bytes(filename)))
return plaintext
def rekey_file(self, filename, b_new_password):
# FIXME/TODO: make this use VaultSecret
def rekey_file(self, filename, new_vault_secret, new_vault_id=None):
# follow the symlink
filename = self._real_path(filename)
prev = os.stat(filename)
ciphertext = self.read_data(filename)
b_vaulttext = self.read_data(filename)
vaulttext = to_text(b_vaulttext)
try:
plaintext = self.vault.decrypt(ciphertext)
plaintext = self.vault.decrypt(vaulttext)
except AnsibleError as e:
raise AnsibleError("%s for %s" % (to_bytes(e), to_bytes(filename)))
# This is more or less an assert, see #18247
if b_new_password is None:
if new_vault_secret is None:
raise AnsibleError('The value for the new_password to rekey %s with is not valid' % filename)
new_vault = VaultLib(b_new_password)
new_ciphertext = new_vault.encrypt(plaintext)
# FIXME: VaultContext...? could rekey to a different vault_id in the same VaultSecrets
self.write_data(new_ciphertext, filename)
# Need a new VaultLib because the new vault data can be a different
# vault lib format or cipher (for ex, when we migrate 1.0 style vault data to
# 1.1 style data we change the version and the cipher). This is where a VaultContext might help
# the new vault will only be used for encrypting, so it doesn't need the vault secrets
# (we will pass one in directly to encrypt)
new_vault = VaultLib(secrets={})
b_new_vaulttext = new_vault.encrypt(plaintext, new_vault_secret, vault_id=new_vault_id)
self.write_data(b_new_vaulttext, filename)
# preserve permissions
os.chmod(filename, prev.st_mode)
@ -565,8 +866,8 @@ class VaultEditor:
os.chown(dest, prev.st_uid, prev.st_gid)
def _editor_shell_command(self, filename):
EDITOR = os.environ.get('EDITOR', 'vi')
editor = shlex.split(EDITOR)
env_editor = os.environ.get('EDITOR', 'vi')
editor = shlex.split(env_editor)
editor.append(filename)
return editor
@ -612,15 +913,26 @@ class VaultAES:
raise AnsibleError("Encryption disabled for deprecated VaultAES class")
@staticmethod
def _parse_plaintext_envelope(b_envelope):
# split out sha and verify decryption
b_split_data = b_envelope.split(b"\n", 1)
b_this_sha = b_split_data[0]
b_plaintext = b_split_data[1]
b_test_sha = to_bytes(sha256(b_plaintext).hexdigest())
return b_plaintext, b_this_sha, b_test_sha
@classmethod
def _decrypt_cryptography(cls, b_salt, b_ciphertext, b_password, key_length):
bs = algorithms.AES.block_size // 8
b_key, b_iv = cls._aes_derive_key_and_iv(b_password, b_salt, key_length, bs)
cipher = C_Cipher(algorithms.AES(b_key), modes.CBC(b_iv), CRYPTOGRAPHY_BACKEND).decryptor()
unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
try:
b_plaintext = unpadder.update(
b_plaintext_envelope = unpadder.update(
cipher.update(b_ciphertext) + cipher.finalize()
) + unpadder.finalize()
except ValueError:
@ -628,11 +940,7 @@ class VaultAES:
# password was given
raise AnsibleError("Decryption failed")
# split out sha and verify decryption
b_split_data = b_plaintext.split(b"\n", 1)
b_this_sha = b_split_data[0]
b_plaintext = b_split_data[1]
b_test_sha = to_bytes(sha256(b_plaintext).hexdigest())
b_plaintext, b_this_sha, b_test_sha = cls._parse_plaintext_envelope(b_plaintext_envelope)
if b_this_sha != b_test_sha:
raise AnsibleError("Decryption failed")
@ -646,6 +954,7 @@ class VaultAES:
out_file = BytesIO()
bs = AES_pycrypto.block_size
b_key, b_iv = cls._aes_derive_key_and_iv(b_password, b_salt, key_length, bs)
cipher = AES_pycrypto.new(b_key, AES_pycrypto.MODE_CBC, b_iv)
b_next_chunk = b''
@ -667,14 +976,10 @@ class VaultAES:
# reset the stream pointer to the beginning
out_file.seek(0)
b_out_data = out_file.read()
b_plaintext_envelope = out_file.read()
out_file.close()
# split out sha and verify decryption
b_split_data = b_out_data.split(b"\n", 1)
b_this_sha = b_split_data[0]
b_plaintext = b_split_data[1]
b_test_sha = to_bytes(sha256(b_plaintext).hexdigest())
b_plaintext, b_this_sha, b_test_sha = cls._parse_plaintext_envelope(b_plaintext_envelope)
if b_this_sha != b_test_sha:
raise AnsibleError("Decryption failed")
@ -682,7 +987,7 @@ class VaultAES:
return b_plaintext
@classmethod
def decrypt(cls, b_vaulttext, b_password, key_length=32):
def decrypt(cls, b_vaulttext, secret, key_length=32, vault_id=None):
""" Decrypt the given data and return it
:arg b_data: A byte string containing the encrypted data
@ -700,6 +1005,8 @@ class VaultAES:
b_salt = b_vaultdata[len(b'Salted__'):16]
b_ciphertext = b_vaultdata[16:]
b_password = secret.bytes
if HAS_CRYPTOGRAPHY:
b_plaintext = cls._decrypt_cryptography(b_salt, b_ciphertext, b_password, key_length)
elif HAS_PYCRYPTO:
@ -789,7 +1096,7 @@ class VaultAES256:
hmac.update(b_ciphertext)
b_hmac = hmac.finalize()
return hexlify(b_hmac), hexlify(b_ciphertext)
return to_bytes(hexlify(b_hmac), errors='surrogate_or_strict'), hexlify(b_ciphertext)
@staticmethod
def _encrypt_pycrypto(b_plaintext, b_salt, b_key1, b_key2, b_iv):
@ -820,8 +1127,11 @@ class VaultAES256:
return to_bytes(hmac.hexdigest(), errors='surrogate_or_strict'), hexlify(b_ciphertext)
@classmethod
def encrypt(cls, b_plaintext, b_password):
def encrypt(cls, b_plaintext, secret):
if secret is None:
raise AnsibleVaultError('The secret passed to encrypt() was None')
b_salt = os.urandom(32)
b_password = secret.bytes
b_key1, b_key2, b_iv = cls._gen_key_initctr(b_password, b_salt)
if HAS_CRYPTOGRAPHY:
@ -837,15 +1147,16 @@ class VaultAES256:
b_vaulttext = hexlify(b_vaulttext)
return b_vaulttext
@staticmethod
def _decrypt_cryptography(b_ciphertext, b_crypted_hmac, b_key1, b_key2, b_iv):
@classmethod
def _decrypt_cryptography(cls, b_ciphertext, b_crypted_hmac, b_key1, b_key2, b_iv):
# b_key1, b_key2, b_iv = self._gen_key_initctr(b_password, b_salt)
# EXIT EARLY IF DIGEST DOESN'T MATCH
hmac = HMAC(b_key2, hashes.SHA256(), CRYPTOGRAPHY_BACKEND)
hmac.update(b_ciphertext)
try:
hmac.verify(unhexlify(b_crypted_hmac))
except InvalidSignature:
return None
except InvalidSignature as e:
raise AnsibleVaultError('HMAC verification failed: %s' % e)
cipher = C_Cipher(algorithms.AES(b_key1), modes.CTR(b_iv), CRYPTOGRAPHY_BACKEND)
decryptor = cipher.decryptor()
@ -904,12 +1215,19 @@ class VaultAES256:
return b_plaintext
@classmethod
def decrypt(cls, b_vaulttext, b_password):
def decrypt(cls, b_vaulttext, secret):
# SPLIT SALT, DIGEST, AND DATA
b_vaulttext = unhexlify(b_vaulttext)
b_salt, b_crypted_hmac, b_ciphertext = b_vaulttext.split(b"\n", 2)
b_salt = unhexlify(b_salt)
b_ciphertext = unhexlify(b_ciphertext)
# TODO: would be nice if a VaultSecret could be passed directly to _decrypt_*
# (move _gen_key_initctr() to a AES256 VaultSecret or VaultContext impl?)
# though, likely needs to be python cryptography specific impl that basically
# creates a Cipher() with b_key1, a Mode.CTR() with b_iv, and a HMAC() with sign key b_key2
b_password = secret.bytes
b_key1, b_key2, b_iv = cls._gen_key_initctr(b_password, b_salt)
if HAS_CRYPTOGRAPHY:
@ -921,6 +1239,7 @@ class VaultAES256:
return b_plaintext
# Keys could be made bytes later if the code that gets the data is more
# naturally byte-oriented
CIPHER_MAPPING = {

View file

@ -23,9 +23,10 @@ from yaml.constructor import SafeConstructor, ConstructorError
from yaml.nodes import MappingNode
from ansible.module_utils._text import to_bytes
from ansible.parsing.vault import VaultLib
from ansible.parsing.yaml.objects import AnsibleMapping, AnsibleSequence, AnsibleUnicode, AnsibleVaultEncryptedUnicode
from ansible.parsing.yaml.objects import AnsibleMapping, AnsibleSequence, AnsibleUnicode
from ansible.parsing.yaml.objects import AnsibleVaultEncryptedUnicode
from ansible.utils.unsafe_proxy import wrap_var
from ansible.parsing.vault import VaultLib, parse_vaulttext_envelope
try:
@ -36,12 +37,12 @@ except ImportError:
class AnsibleConstructor(SafeConstructor):
def __init__(self, file_name=None, b_vault_password=None):
self._b_vault_password = b_vault_password
def __init__(self, file_name=None, vault_secrets=None):
self._ansible_file_name = file_name
super(AnsibleConstructor, self).__init__()
self._vaults = {}
self._vaults['default'] = VaultLib(b_password=self._b_vault_password)
self.vault_secrets = vault_secrets or []
self._vaults['default'] = VaultLib(secrets=self.vault_secrets)
def construct_yaml_map(self, node):
data = AnsibleMapping()
@ -96,17 +97,16 @@ class AnsibleConstructor(SafeConstructor):
def construct_vault_encrypted_unicode(self, node):
value = self.construct_scalar(node)
ciphertext_data = to_bytes(value)
if self._b_vault_password is None:
b_ciphertext_data = to_bytes(value)
# could pass in a key id here to choose the vault to associate with
# TODO/FIXME: plugin vault selector
vault = self._vaults['default']
if vault.secrets is None:
raise ConstructorError(context=None, context_mark=None,
problem="found !vault but no vault password provided",
problem_mark=node.start_mark,
note=None)
# could pass in a key id here to choose the vault to associate with
vault = self._vaults['default']
ret = AnsibleVaultEncryptedUnicode(ciphertext_data)
ret = AnsibleVaultEncryptedUnicode(b_ciphertext_data)
ret.vault = vault
return ret

View file

@ -32,9 +32,9 @@ from ansible.parsing.yaml.constructor import AnsibleConstructor
if HAVE_PYYAML_C:
class AnsibleLoader(CParser, AnsibleConstructor, Resolver):
def __init__(self, stream, file_name=None, vault_password=None):
def __init__(self, stream, file_name=None, vault_secrets=None):
CParser.__init__(self, stream)
AnsibleConstructor.__init__(self, file_name=file_name, b_vault_password=vault_password)
AnsibleConstructor.__init__(self, file_name=file_name, vault_secrets=vault_secrets)
Resolver.__init__(self)
else:
from yaml.composer import Composer
@ -43,10 +43,10 @@ else:
from yaml.parser import Parser
class AnsibleLoader(Reader, Scanner, Parser, Composer, AnsibleConstructor, Resolver):
def __init__(self, stream, file_name=None, vault_password=None):
def __init__(self, stream, file_name=None, vault_secrets=None):
Reader.__init__(self, stream)
Scanner.__init__(self)
Parser.__init__(self)
Composer.__init__(self)
AnsibleConstructor.__init__(self, file_name=file_name, b_vault_password=vault_password)
AnsibleConstructor.__init__(self, file_name=file_name, vault_secrets=vault_secrets)
Resolver.__init__(self)

View file

@ -76,11 +76,11 @@ class AnsibleVaultEncryptedUnicode(yaml.YAMLObject, AnsibleBaseYAMLObject):
yaml_tag = u'!vault'
@classmethod
def from_plaintext(cls, seq, vault):
def from_plaintext(cls, seq, vault, secret):
if not vault:
raise vault.AnsibleVaultError('Error creating AnsibleVaultEncryptedUnicode, invalid vault (%s) provided' % vault)
ciphertext = vault.encrypt(seq)
ciphertext = vault.encrypt(seq, secret)
avu = cls(ciphertext)
avu.vault = vault
return avu