passwordstore: Make compatible with shims (#4780)

* passwordstore: Make compatible with shims, add backend config

This allows using the passwordstore plugin with scripts that wrap other
password managers. Also adds an explicit configuration (`backend` in
`ini` and `passwordstore_backend` in `vars`) to set the backend to `pass`
(the default) or `gopass`, which allows using gopass as the backend
without the need of a wrapper script. Please be aware that gopass
support is currently limited, but will work for basic operations.

Includes integrations tests.

Resolves #4766

* Apply suggestions from code review
This commit is contained in:
grembo 2022-06-15 08:08:04 +02:00 committed by GitHub
commit 006f3bfa89
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 251 additions and 8 deletions

View file

@ -106,6 +106,22 @@ DOCUMENTATION = '''
type: str
default: 15m
version_added: 4.5.0
backend:
description:
- Specify which backend to use.
- Defaults to C(pass), passwordstore.org's original pass utility.
- C(gopass) support is incomplete.
ini:
- section: passwordstore_lookup
key: backend
vars:
- name: passwordstore_backend
type: str
default: pass
choices:
- pass
- gopass
version_added: 5.2.0
'''
EXAMPLES = """
ansible.cfg: |
@ -231,6 +247,24 @@ def check_output2(*popenargs, **kwargs):
class LookupModule(LookupBase):
def __init__(self, loader=None, templar=None, **kwargs):
super(LookupModule, self).__init__(loader, templar, **kwargs)
self.realpass = None
def is_real_pass(self):
if self.realpass is None:
try:
self.passoutput = to_text(
check_output2([self.pass_cmd, "--version"], env=self.env),
errors='surrogate_or_strict'
)
self.realpass = 'pass: the standard unix password manager' in self.passoutput
except (subprocess.CalledProcessError) as e:
raise AnsibleError(e)
return self.realpass
def parse_params(self, term):
# I went with the "traditional" param followed with space separated KV pairs.
# Waiting for final implementation of lookup parameter parsing.
@ -270,10 +304,12 @@ class LookupModule(LookupBase):
self.env = os.environ.copy()
self.env['LANGUAGE'] = 'C' # make sure to get errors in English as required by check_output2
# Set PASSWORD_STORE_DIR
if os.path.isdir(self.paramvals['directory']):
if self.backend == 'gopass':
self.env['GOPASS_NO_REMINDER'] = "YES"
elif os.path.isdir(self.paramvals['directory']):
# Set PASSWORD_STORE_DIR
self.env['PASSWORD_STORE_DIR'] = self.paramvals['directory']
else:
elif self.is_real_pass():
raise AnsibleError('Passwordstore directory \'{0}\' does not exist'.format(self.paramvals['directory']))
# Set PASSWORD_STORE_UMASK if umask is set
@ -288,7 +324,9 @@ class LookupModule(LookupBase):
def check_pass(self):
try:
self.passoutput = to_text(
check_output2(["pass", "show", self.passname], env=self.env),
check_output2([self.pass_cmd, 'show'] +
(['--password'] if self.backend == 'gopass' else []) +
[self.passname], env=self.env),
errors='surrogate_or_strict'
).splitlines()
self.password = self.passoutput[0]
@ -302,8 +340,10 @@ class LookupModule(LookupBase):
if ':' in line:
name, value = line.split(':', 1)
self.passdict[name.strip()] = value.strip()
if os.path.isfile(os.path.join(self.paramvals['directory'], self.passname + ".gpg")):
# Only accept password as found, if there a .gpg file for it (might be a tree node otherwise)
if (self.backend == 'gopass' or
os.path.isfile(os.path.join(self.paramvals['directory'], self.passname + ".gpg"))
or not self.is_real_pass()):
# When using real pass, only accept password as found if there is a .gpg file for it (might be a tree node otherwise)
return True
except (subprocess.CalledProcessError) as e:
# 'not in password store' is the expected error if a password wasn't found
@ -339,7 +379,7 @@ class LookupModule(LookupBase):
if self.paramvals['backup']:
msg += "lookup_pass: old password was {0} (Updated on {1})\n".format(self.password, datetime)
try:
check_output2(['pass', 'insert', '-f', '-m', self.passname], input=msg, env=self.env)
check_output2([self.pass_cmd, 'insert', '-f', '-m', self.passname], input=msg, env=self.env)
except (subprocess.CalledProcessError) as e:
raise AnsibleError(e)
return newpass
@ -351,7 +391,7 @@ class LookupModule(LookupBase):
datetime = time.strftime("%d/%m/%Y %H:%M:%S")
msg = newpass + '\n' + "lookup_pass: First generated by ansible on {0}\n".format(datetime)
try:
check_output2(['pass', 'insert', '-f', '-m', self.passname], input=msg, env=self.env)
check_output2([self.pass_cmd, 'insert', '-f', '-m', self.passname], input=msg, env=self.env)
except (subprocess.CalledProcessError) as e:
raise AnsibleError(e)
return newpass
@ -380,6 +420,8 @@ class LookupModule(LookupBase):
yield
def setup(self, variables):
self.backend = self.get_option('backend')
self.pass_cmd = self.backend # pass and gopass are commands as well
self.locked = None
timeout = self.get_option('locktimeout')
if not re.match('^[0-9]+[smh]$', timeout):
@ -402,6 +444,7 @@ class LookupModule(LookupBase):
}
def run(self, terms, variables, **kwargs):
self.set_options(var_options=variables, direct=kwargs)
self.setup(variables)
result = []