add salt parameter to hash generation for sha256 plugin

This commit is contained in:
Matthieu Bourgain 2024-04-19 09:43:10 +02:00
commit 805c3ea248
No known key found for this signature in database
GPG key ID: 33BA95C808890C39
4 changed files with 194 additions and 6 deletions

View file

@ -0,0 +1,105 @@
"""Generate MySQL caching_sha2_password hash for a given password and salt."""
import hashlib
def to64(v: int, n: int) -> str:
"""Convert a 32-bit integer to a base-64 string"""
i64 = (
[".", "/"]
+ [chr(x) for x in range(48, 58)]
+ [chr(x) for x in range(65, 91)]
+ [chr(x) for x in range(97, 123)]
)
result: str = ""
while n > 0:
n -= 1
result += i64[v & 0x3F]
v >>= 6
return result
def hashlib_sha256(data: bytes) -> bytes:
"""Return SHA-256 digest from hashlib ."""
return hashlib.sha256(data).digest()
def sha256_digest(key: str, salt: str, loops: int) -> str:
"""Return a SHA-256 digest of the concatenation of the key, the salt, and the key, repeated as necessary."""
# https://www.akkadia.org/drepper/SHA-crypt.txt
num_bytes: bytes = 32
bytes_key: bytes = key.encode()
bytes_salt: bytes = salt.encode()
digest_b = hashlib_sha256(bytes_key + bytes_salt + bytes_key)
tmp = bytes_key + bytes_salt
for i in range(len(bytes_key), 0, -num_bytes):
tmp += digest_b if i > num_bytes else digest_b[:i]
i = len(bytes_key)
while i > 0:
tmp += digest_b if (i & 1) != 0 else bytes_key
i >>= 1
digest_a = hashlib_sha256(tmp)
tmp = b""
for i in range(len(bytes_key)):
tmp += bytes_key
digest_dp = hashlib_sha256(tmp)
byte_sequence_p = b""
for i in range(len(bytes_key), 0, -num_bytes):
byte_sequence_p += digest_dp if i > num_bytes else digest_dp[:i]
tmp = b""
til = 16 + digest_a[0]
for i in range(til):
tmp += bytes_salt
digest_ds = hashlib_sha256(tmp)
byte_sequence_s = b""
for i in range(len(bytes_salt), 0, -num_bytes):
byte_sequence_s += digest_ds if i > num_bytes else digest_ds[:i]
digest_c = digest_a
for i in range(loops):
tmp = byte_sequence_p if (i & 1) else digest_c
if i % 3:
tmp += byte_sequence_s
if i % 7:
tmp += byte_sequence_p
tmp += digest_c if (i & 1) else byte_sequence_p
digest_c = hashlib_sha256(tmp)
inc1, inc2, mod, end = (10, 21, 30, 0)
i = 0
tmp = ""
while True:
tmp += to64(
(digest_c[i] << 16) | (digest_c[(i + inc1) % mod] << 8) | digest_c[(i + inc1 * 2) % mod], 4
)
i = (i + inc2) % mod
if i == end:
break
tmp += to64((digest_c[31] << 8) | digest_c[30], 3)
return tmp
def mysql_sha256_password_hash_hex(password: str, salt: str) -> str:
"""Return a MySQL compatible caching_sha2_password hash in hex format."""
assert len(salt) == 20, "Salt must be 20 characters long."
count = 5
iteration = 1000 * count
digest = sha256_digest(password, salt, iteration)
return f"$A${count:>03}${salt}{digest}".encode().hex().upper()

View file

@ -1,4 +1,6 @@
from __future__ import (absolute_import, division, print_function)
from plugins.module_utils.implementations.mysql.hash import mysql_sha256_password_hash_hex
__metaclass__ = type
# This code is part of Ansible, but is an independent component.
@ -135,12 +137,15 @@ def get_existing_authentication(cursor, user, host):
def user_add(cursor, user, host, host_all, password, encrypted,
plugin, plugin_hash_string, plugin_auth_string, new_priv,
plugin, plugin_hash_string, plugin_auth_string, salt, new_priv,
attributes, tls_requires, reuse_existing_password, module,
password_expire, password_expire_interval):
# If attributes are set, perform a sanity check to ensure server supports user attributes before creating user
if attributes and not get_attribute_support(cursor):
module.fail_json(msg="user attributes were specified but the server does not support user attributes")
# Only caching_sha2_password and sha256_password are supported for hash generation
if salt and plugin not in ['caching_sha2_password', 'sha256_password']:
module.fail_json(msg="salt requires caching_sha2_password or sha256_password plugin")
# we cannot create users without a proper hostname
if host_all:
@ -181,6 +186,10 @@ def user_add(cursor, user, host, host_all, password, encrypted,
# Mysql and MariaDB differ in naming pam plugin and Syntax to set it
if plugin == 'pam': # Used by MariaDB which requires the USING keyword, not BY
query_with_args = "CREATE USER %s@%s IDENTIFIED WITH %s USING %s", (user, host, plugin, plugin_auth_string)
elif salt:
if plugin in ['caching_sha2_password', 'sha256_password']:
generated_hash_string = mysql_sha256_password_hash_hex(password=plugin_auth_string, salt=salt)
query_with_args = "CREATE USER %s@%s IDENTIFIED WITH %s AS %s", (user, host, plugin, generated_hash_string)
else:
query_with_args = "CREATE USER %s@%s IDENTIFIED WITH %s BY %s", (user, host, plugin, plugin_auth_string)
elif plugin:
@ -221,9 +230,13 @@ def is_hash(password):
def user_mod(cursor, user, host, host_all, password, encrypted,
plugin, plugin_hash_string, plugin_auth_string, new_priv,
plugin, plugin_hash_string, plugin_auth_string, salt, new_priv,
append_privs, subtract_privs, attributes, tls_requires, module,
password_expire, password_expire_interval, role=False, maria_role=False):
# Only caching_sha2_password and sha256_password are supported for hash generation
if salt and plugin not in ['caching_sha2_password', 'sha256_password']:
module.fail_json(msg="salt requires caching_sha2_password or sha256_password plugin")
changed = False
msg = "User unchanged"
grant_option = False
@ -356,6 +369,10 @@ def user_mod(cursor, user, host, host_all, password, encrypted,
# Mysql and MariaDB differ in naming pam plugin and syntax to set it
if plugin in ('pam', 'ed25519'):
query_with_args = "ALTER USER %s@%s IDENTIFIED WITH %s USING %s", (user, host, plugin, plugin_auth_string)
elif salt:
if plugin in ['caching_sha2_password', 'sha256_password']:
generated_hash_string = mysql_sha256_password_hash_hex(password=plugin_auth_string, salt=salt)
query_with_args = "ALTER USER %s@%s IDENTIFIED WITH %s AS %s", (user, host, plugin, generated_hash_string)
else:
query_with_args = "ALTER USER %s@%s IDENTIFIED WITH %s BY %s", (user, host, plugin, plugin_auth_string)
else:

View file

@ -139,8 +139,15 @@ options:
description:
- User's plugin auth_string (``CREATE USER user IDENTIFIED WITH plugin BY plugin_auth_string``).
- If I(plugin) is ``pam`` (MariaDB) or ``auth_pam`` (MySQL) an optional I(plugin_auth_string) can be used to choose a specific PAM service.
- You need to define a I(salt) to have idempotence on password change.
type: str
version_added: '0.1.0'
salt:
description:
- Salt used to generate password hash.
- I(plugin) must be equal to ``caching_sha2_password`` and I(plugin_auth_string) must be defined.
type: str
version_added: '3.10.0'
resource_limits:
description:
- Limit the user for certain server resources. Provided since MySQL 5.6 / MariaDB 10.2.
@ -440,6 +447,7 @@ def main():
plugin=dict(default=None, type='str'),
plugin_hash_string=dict(default=None, type='str'),
plugin_auth_string=dict(default=None, type='str'),
salt=dict(default=None, type='str'),
resource_limits=dict(type='dict'),
force_context=dict(type='bool', default=False),
session_vars=dict(type='dict'),
@ -480,6 +488,7 @@ def main():
plugin = module.params["plugin"]
plugin_hash_string = module.params["plugin_hash_string"]
plugin_auth_string = module.params["plugin_auth_string"]
salt = module.params["salt"]
resource_limits = module.params["resource_limits"]
session_vars = module.params["session_vars"]
column_case_sensitive = module.params["column_case_sensitive"]
@ -542,13 +551,13 @@ def main():
try:
if update_password == "always":
result = user_mod(cursor, user, host, host_all, password, encrypted,
plugin, plugin_hash_string, plugin_auth_string,
plugin, plugin_hash_string, plugin_auth_string, salt,
priv, append_privs, subtract_privs, attributes, tls_requires, module,
password_expire, password_expire_interval)
else:
result = user_mod(cursor, user, host, host_all, None, encrypted,
None, None, None,
None, None, None, None,
priv, append_privs, subtract_privs, attributes, tls_requires, module,
password_expire, password_expire_interval)
changed = result['changed']
@ -566,7 +575,7 @@ def main():
priv = None # avoid granting unwanted privileges
reuse_existing_password = update_password == 'on_new_username'
result = user_add(cursor, user, host, host_all, password, encrypted,
plugin, plugin_hash_string, plugin_auth_string,
plugin, plugin_hash_string, plugin_auth_string, salt,
priv, attributes, tls_requires, reuse_existing_password, module,
password_expire, password_expire_interval)
changed = result['changed']

View file

@ -13,6 +13,7 @@
test_plugin_auth_string: 'Fdt8fd^34ds'
test_plugin_new_hash: '*E74368AC90460FA669F6D41BFB7F2A877DB73745'
test_plugin_new_auth_string: 'c$K01LsmK7nJnIR4!h'
test_salt: 'TDwqdanU82d0yNtvaabb'
test_default_priv_type: 'SELECT'
test_default_priv: '*.*:{{ test_default_priv_type }}'
@ -115,7 +116,7 @@
<<: *mysql_params
name: '{{ test_user_name }}'
host: '%'
plugin: '{{ test_plugin_type }}'
plugin: '{{ test_plugin_type_sha256 }}'
plugin_hash_string: '{{ test_plugin_hash }}'
priv: '{{ test_default_priv }}'
register: result
@ -475,3 +476,59 @@
- include_tasks: utils/remove_user.yml
vars:
user_name: "{{ test_user_name }}"
# ============================================================
# Test plugin auth with a salt
#
- name: Plugin auth | Create user with plugin auth and salt
mysql_user:
<<: *mysql_params
name: '{{ test_user_name }}'
host: '%'
plugin: caching_sha2_password
plugin_auth_string: '{{ test_plugin_auth_string }}'
salt: '{{ test_salt }}'
priv: '{{ test_default_priv }}'
register: result
- name: Plugin auth | Assert that plugin_auth_string and salt was successful
assert:
that:
- result is succeeded
- include_tasks: utils/assert_user.yml
vars:
user_name: "{{ test_user_name }}"
user_host: "%"
priv: "{{ test_default_priv_type }}"
- name: Plugin auth | Connect with user and password
command: "{{ mysql_command }} -u {{ test_user_name }} -p{{ test_plugin_auth_string }} -e \"SELECT 1\""
register: result
- name: Plugin auth | Assert that connection was successful
assert:
that:
- result is succeeded
- name: Plugin auth | Alter user with same plugin auth and same salt
mysql_user:
<<: *mysql_params
name: '{{ test_user_name }}'
host: '%'
plugin: caching_sha2_password
plugin_auth_string: '{{ test_plugin_auth_string }}'
salt: '{{ test_salt }}'
priv: '{{ test_default_priv }}'
register: result
- name: Plugin auth | Assert that plugin_auth_string and salt doesn't trigger change
assert:
that:
- result is not changed
# Cleanup
- include_tasks: utils/remove_user.yml
vars:
user_name: "{{ test_user_name }}"