mysql_user: add proper handling of INSERT, UPDATE, REFERENCES on columns (#107)

* mysql_user: add proper handling of INSERT, UPDATE, REFERENCES on columns

* Add changelog fragment

* fix sanity

* fix CI

* fix sanity

* fix CI

* make the assertion fairer

* Improve
This commit is contained in:
Andrew Klychkov 2021-03-08 17:18:04 +01:00 committed by GitHub
parent 2694464ffb
commit 979588e2cd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 127 additions and 40 deletions

View file

@ -0,0 +1,2 @@
bugfixes:
- mysql_user - fix handling of INSERT, UPDATE, REFERENCES on columns (https://github.com/ansible-collections/community.mysql/issues/106).

View file

@ -769,15 +769,12 @@ def privileges_get(cursor, user, host):
privileges = res.group(1).split(",") privileges = res.group(1).split(",")
privileges = [pick(x.strip()) for x in privileges] privileges = [pick(x.strip()) for x in privileges]
# Handle cases when there's GRANT SELECT (colA, ...) in privileges. # Handle cases when there's privs like GRANT SELECT (colA, ...) in privs.
# To this point, the privileges list can look like # To this point, the privileges list can look like
# ['SELECT (`A`', '`B`)', 'INSERT'] that is incorrect (SELECT statement is splitted). # ['SELECT (`A`', '`B`)', 'INSERT'] that is incorrect (SELECT statement is splitted).
# Columns should also be sorted to compare it with desired privileges later. # Columns should also be sorted to compare it with desired privileges later.
# Determine if there's a case similar to the above: # Determine if there's a case similar to the above:
start, end = has_select_on_col(privileges) privileges = normalize_col_grants(privileges)
# If not, either start and end will be None
if start is not None:
privileges = handle_select_on_col(privileges, start, end)
if "WITH GRANT OPTION" in res.group(7): if "WITH GRANT OPTION" in res.group(7):
privileges.append('GRANT') privileges.append('GRANT')
@ -788,7 +785,23 @@ def privileges_get(cursor, user, host):
return output return output
def has_select_on_col(privileges): def normalize_col_grants(privileges):
"""Fix and sort grants on columns in privileges list
Make ['SELECT (A, B)', 'INSERT (A, B)', 'DETELE']
from ['SELECT (A', 'B)', 'INSERT (B', 'A)', 'DELETE'].
See unit tests in tests/unit/plugins/modules/test_mysql_user.py
"""
for grant in ('SELECT', 'UPDATE', 'INSERT', 'REFERENCES'):
start, end = has_grant_on_col(privileges, grant)
# If not, either start and end will be None
if start is not None:
privileges = handle_grant_on_col(privileges, start, end)
return privileges
def has_grant_on_col(privileges, grant):
"""Check if there is a statement like SELECT (colA, colB) """Check if there is a statement like SELECT (colA, colB)
in the privilege list. in the privilege list.
@ -799,7 +812,7 @@ def has_select_on_col(privileges):
start = None start = None
end = None end = None
for n, priv in enumerate(privileges): for n, priv in enumerate(privileges):
if 'SELECT (' in priv: if '%s (' % grant in priv:
# We found the start element # We found the start element
start = n start = n
@ -819,8 +832,8 @@ def has_select_on_col(privileges):
return None, None return None, None
def handle_select_on_col(privileges, start, end): def handle_grant_on_col(privileges, start, end):
"""Handle cases when the SELECT (colA, ...) is in the privileges list.""" """Handle cases when the privs like SELECT (colA, ...) is in the privileges list."""
# When the privileges list look like ['SELECT (colA,', 'colB)'] # When the privileges list look like ['SELECT (colA,', 'colB)']
# (Notice that the statement is splitted) # (Notice that the statement is splitted)
if start != end: if start != end:
@ -844,7 +857,7 @@ def handle_select_on_col(privileges, start, end):
def sort_column_order(statement): def sort_column_order(statement):
"""Sort column order in SELECT (colA, colB, ...) grants. """Sort column order in grants like SELECT (colA, colB, ...).
MySQL changes columns order like below: MySQL changes columns order like below:
--------------------------------------- ---------------------------------------
@ -870,8 +883,10 @@ def sort_column_order(statement):
# 3. Sort # 3. Sort
# 4. Put between () and return # 4. Put between () and return
# "SELECT (colA, colB) => "colA, colB" # "SELECT/UPDATE/.. (colA, colB) => "colA, colB"
columns = statement.split('(')[1].rstrip(')') tmp = statement.split('(')
priv_name = tmp[0]
columns = tmp[1].rstrip(')')
# "colA, colB" => ["colA", "colB"] # "colA, colB" => ["colA", "colB"]
columns = columns.split(',') columns = columns.split(',')
@ -881,7 +896,7 @@ def sort_column_order(statement):
columns[i] = col.strip('`') columns[i] = col.strip('`')
columns.sort() columns.sort()
return 'SELECT (%s)' % ', '.join(columns) return '%s(%s)' % (priv_name, ', '.join(columns))
def privileges_unpack(priv, mode): def privileges_unpack(priv, mode):
@ -927,10 +942,8 @@ def privileges_unpack(priv, mode):
output[pieces[0]] = pieces[1].upper().split(',') output[pieces[0]] = pieces[1].upper().split(',')
privs = output[pieces[0]] privs = output[pieces[0]]
# Handle cases when there's GRANT SELECT (colA, ...) in privs. # Handle cases when there's privs like GRANT SELECT (colA, ...) in privs.
start, end = has_select_on_col(output[pieces[0]]) output[pieces[0]] = normalize_col_grants(output[pieces[0]])
if start is not None:
output[pieces[0]] = handle_select_on_col(output[pieces[0]], start, end)
new_privs = frozenset(privs) new_privs = frozenset(privs)
if not new_privs.issubset(VALID_PRIVS): if not new_privs.issubset(VALID_PRIVS):

View file

@ -93,6 +93,46 @@
that: that:
- result is not changed - result is not changed
- name: Grant privs on columns
mysql_user:
<<: *mysql_params
name: '{{ user_name_3 }}'
priv:
'data3.test_table_issue99': 'SELECT (a, b), INSERT (a, b), UPDATE'
register: result
- assert:
that:
- result is changed
- name: Grant same privs on columns again, note that the column order is different
mysql_user:
<<: *mysql_params
name: '{{ user_name_3 }}'
priv:
'data3.test_table_issue99': 'SELECT (a, b), UPDATE, INSERT (b, a)'
register: result
- assert:
that:
- result is not changed
- name: Run command to show privileges for user (expect privileges in stdout)
command: "{{ mysql_command }} -e \"SHOW GRANTS FOR '{{ user_name_3 }}'@'localhost'\""
register: result
- name: Assert user has giving privileges
assert:
that:
- "'GRANT SELECT (`A`, `B`), INSERT (`A`, `B`), UPDATE' in result.stdout"
when: "'(`A`, `B`)' in result.stdout"
- name: Assert user has giving privileges
assert:
that:
- "'GRANT SELECT (A, B), INSERT (A, B), UPDATE' in result.stdout"
when: "'(`A`, `B`)' not in result.stdout"
########## ##########
# Clean up # Clean up
- name: Drop test databases - name: Drop test databases

View file

@ -6,8 +6,9 @@ __metaclass__ = type
import pytest import pytest
from ansible_collections.community.mysql.plugins.modules.mysql_user import ( from ansible_collections.community.mysql.plugins.modules.mysql_user import (
handle_select_on_col, handle_grant_on_col,
has_select_on_col, has_grant_on_col,
normalize_col_grants,
sort_column_order, sort_column_order,
supports_identified_by_password, supports_identified_by_password,
) )
@ -41,22 +42,28 @@ def test_supports_identified_by_password(function_return, cursor_output, cursor_
@pytest.mark.parametrize( @pytest.mark.parametrize(
'input_list,output_tuple', 'input_list,grant,output_tuple',
[ [
(['INSERT', 'DELETE'], (None, None)), (['INSERT', 'DELETE'], 'INSERT', (None, None)),
(['SELECT', 'UPDATE'], (None, None)), (['SELECT', 'UPDATE'], 'SELECT', (None, None)),
(['INSERT', 'UPDATE', 'INSERT', 'DELETE'], (None, None)), (['INSERT', 'UPDATE', 'INSERT', 'DELETE'], 'DELETE', (None, None)),
(['just', 'a', 'random', 'text'], (None, None)), (['just', 'a', 'random', 'text'], 'blabla', (None, None)),
(['SELECT (A, B, C)'], (0, 0)), (['SELECT (A, B, C)'], 'SELECT', (0, 0)),
(['UPDATE', 'SELECT (A, B, C)'], (1, 1)), (['UPDATE', 'SELECT (A, B, C)'], 'SELECT', (1, 1)),
(['INSERT', 'SELECT (A', 'B)'], (1, 2)), (['UPDATE', 'REFERENCES (A, B, C)'], 'REFERENCES', (1, 1)),
(['SELECT (A', 'B)', 'UPDATE'], (0, 1)), (['SELECT', 'UPDATE (A, B, C)'], 'UPDATE', (1, 1)),
(['INSERT', 'SELECT (A', 'B)', 'UPDATE'], (1, 2)), (['INSERT', 'SELECT (A', 'B)'], 'SELECT', (1, 2)),
(['SELECT (A', 'B)', 'UPDATE'], 'SELECT', (0, 1)),
(['INSERT', 'SELECT (A', 'B)', 'UPDATE'], 'SELECT', (1, 2)),
(['INSERT (A, B)', 'SELECT (A', 'B)', 'UPDATE'], 'INSERT', (0, 0)),
(['INSERT (A', 'B)', 'SELECT (A', 'B)', 'UPDATE'], 'INSERT', (0, 1)),
(['INSERT (A', 'B)', 'SELECT (A', 'B)', 'UPDATE'], 'SELECT', (2, 3)),
(['INSERT (A', 'B)', 'SELECT (A', 'C', 'B)', 'UPDATE'], 'SELECT', (2, 4)),
] ]
) )
def test_has_select_on_col(input_list, output_tuple): def test_has_grant_on_col(input_list, grant, output_tuple):
"""Tests has_select_on_col function.""" """Tests has_grant_on_col function."""
assert has_select_on_col(input_list) == output_tuple assert has_grant_on_col(input_list, grant) == output_tuple
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -64,9 +71,9 @@ def test_has_select_on_col(input_list, output_tuple):
[ [
('SELECT (A)', 'SELECT (A)'), ('SELECT (A)', 'SELECT (A)'),
('SELECT (`A`)', 'SELECT (A)'), ('SELECT (`A`)', 'SELECT (A)'),
('SELECT (A, B)', 'SELECT (A, B)'), ('UPDATE (B, A)', 'UPDATE (A, B)'),
('SELECT (`A`, `B`)', 'SELECT (A, B)'), ('INSERT (`A`, `B`)', 'INSERT (A, B)'),
('SELECT (B, A)', 'SELECT (A, B)'), ('REFERENCES (B, A)', 'REFERENCES (A, B)'),
('SELECT (`B`, `A`)', 'SELECT (A, B)'), ('SELECT (`B`, `A`)', 'SELECT (A, B)'),
('SELECT (`B`, `A`, C)', 'SELECT (A, B, C)'), ('SELECT (`B`, `A`, C)', 'SELECT (A, B, C)'),
] ]
@ -81,10 +88,35 @@ def test_sort_column_order(input_, output):
[ [
(['UPDATE', 'SELECT (C, B, A)'], 1, 1, ['UPDATE', 'SELECT (A, B, C)']), (['UPDATE', 'SELECT (C, B, A)'], 1, 1, ['UPDATE', 'SELECT (A, B, C)']),
(['INSERT', 'SELECT (A', 'B)'], 1, 2, ['INSERT', 'SELECT (A, B)']), (['INSERT', 'SELECT (A', 'B)'], 1, 2, ['INSERT', 'SELECT (A, B)']),
(['SELECT (`A`', 'B)', 'UPDATE'], 0, 1, ['SELECT (A, B)', 'UPDATE']), (
(['INSERT', 'SELECT (`B`', 'A', 'C)', 'UPDATE'], 1, 3, ['INSERT', 'SELECT (A, B, C)', 'UPDATE']), ['SELECT (`A`', 'B)', 'UPDATE', 'REFERENCES (B, A)'], 0, 1,
['SELECT (A, B)', 'UPDATE', 'REFERENCES (B, A)']),
(
['INSERT', 'REFERENCES (`B`', 'A', 'C)', 'UPDATE (A', 'B)'], 1, 3,
['INSERT', 'REFERENCES (A, B, C)', 'UPDATE (A', 'B)']),
] ]
) )
def test_handle_select_on_col(privileges, start, end, output): def test_handle_grant_on_col(privileges, start, end, output):
"""Tests handle_select_on_col function.""" """Tests handle_grant_on_col function."""
assert handle_select_on_col(privileges, start, end) == output assert handle_grant_on_col(privileges, start, end) == output
@pytest.mark.parametrize(
'input_,expected',
[
(['SELECT'], ['SELECT']),
(['SELECT (A, B)'], ['SELECT (A, B)']),
(['SELECT (B, A)'], ['SELECT (A, B)']),
(['UPDATE', 'SELECT (C, B, A)'], ['UPDATE', 'SELECT (A, B, C)']),
(['INSERT', 'SELECT (A', 'B)'], ['INSERT', 'SELECT (A, B)']),
(
['SELECT (`A`', 'B)', 'UPDATE', 'REFERENCES (B, A)'],
['SELECT (A, B)', 'UPDATE', 'REFERENCES (A, B)']),
(
['INSERT', 'REFERENCES (`B`', 'A', 'C)', 'UPDATE (B', 'A)', 'DELETE'],
['INSERT', 'REFERENCES (A, B, C)', 'UPDATE (A, B)', 'DELETE']),
]
)
def test_normalize_col_grants(input_, expected):
"""Tests normalize_col_grants function."""
assert normalize_col_grants(input_) == expected