mysql_role: new module (#189)

* mysql_role: new module

* fixes

* fixes

* Add the role class

* Check if role exists

* role.add()

* role.__get_members

* tmp

* tmp

* Change tests

* Fix

* Fix

* add_members()

* get_privs()

* tmp

* __extract_grants() filler version

* Before big work

* tmp

* drop()

* tmp

* tmp

* Big changes

* Fix

* append_members, detach_members, append_privs

* tmp

* admin option

* Add tests

* Add tests

* Fix tests

* Remove debug warning

* Fix tests

* Add documentation

* Fix MariaDB case

* Fix MariaDB

* Fix MariaDB

* Fix MariaDB

* Fix MariaDB

* Fix MariaDB

* Fix

* Fix

* Remove debug warning

* Add try-except block

* tmp

* tmp

* tmp

* Fix

* Add err handling

* Add user check

* Check admin in db

* Fix CI

* Fix CI

* Fix CI

* Fix CI

* Fix

* Add mutually exclusive options

* Small refactoring, documenting

* Documenting, refactoring

* Change docs

* Refactoring

* Refactoring

* Refactoring

* Add unit tests

* Update README.md
This commit is contained in:
Andrew Klychkov 2021-08-10 14:30:34 +03:00 committed by GitHub
parent 9055bb4c8c
commit ce2b269f84
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 3273 additions and 825 deletions

View file

@ -0,0 +1,119 @@
# -*- coding: utf-8 -*-
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import pytest
from ansible_collections.community.mysql.plugins.modules.mysql_role import (
MariaDBQueryBuilder,
MySQLQueryBuilder,
normalize_users,
)
# TODO: Also cover DbServer, Role, MySQLRoleImpl, MariaDBRoleImpl classes
class Module():
def __init__(self):
self.msg = None
def fail_json(self, msg=None):
self.msg = msg
module = Module()
@pytest.mark.parametrize(
'builder,output',
[
(MariaDBQueryBuilder('role0'), ("SELECT count(*) FROM mysql.user WHERE user = %s AND is_role = 'Y'", ('role0'))),
(MySQLQueryBuilder('role0', '%'), ('SELECT count(*) FROM mysql.user WHERE user = %s AND host = %s', ('role0', '%'))),
(MariaDBQueryBuilder('role1'), ("SELECT count(*) FROM mysql.user WHERE user = %s AND is_role = 'Y'", ('role1'))),
(MySQLQueryBuilder('role1', 'fake'), ('SELECT count(*) FROM mysql.user WHERE user = %s AND host = %s', ('role1', 'fake'))),
]
)
def test_query_builder_role_exists(builder, output):
"""Test role_exists method of the builder classes."""
assert builder.role_exists() == output
@pytest.mark.parametrize(
'builder,admin,output',
[
(MariaDBQueryBuilder('role0'), None, ('CREATE ROLE %s', ('role0',))),
(MySQLQueryBuilder('role0', '%'), None, ('CREATE ROLE %s', ('role0',))),
(MariaDBQueryBuilder('role1'), None, ('CREATE ROLE %s', ('role1',))),
(MySQLQueryBuilder('role1', 'fake'), None, ('CREATE ROLE %s', ('role1',))),
(MariaDBQueryBuilder('role0'), ('user0', ''), ('CREATE ROLE %s WITH ADMIN %s', ('role0', 'user0'))),
(MySQLQueryBuilder('role0', '%'), ('user0', ''), ('CREATE ROLE %s', ('role0',))),
(MariaDBQueryBuilder('role1'), ('user0', 'localhost'), ('CREATE ROLE %s WITH ADMIN %s@%s', ('role1', 'user0', 'localhost'))),
(MySQLQueryBuilder('role1', 'fake'), ('user0', 'localhost'), ('CREATE ROLE %s', ('role1',))),
]
)
def test_query_builder_role_create(builder, admin, output):
"""Test role_create method of the builder classes."""
assert builder.role_create(admin) == output
@pytest.mark.parametrize(
'builder,user,output',
[
(MariaDBQueryBuilder('role0'), ('user0', ''), ('GRANT %s TO %s', ('role0', 'user0'))),
(MySQLQueryBuilder('role0', '%'), ('user0', ''), ('GRANT %s@%s TO %s', ('role0', '%', 'user0'))),
(MariaDBQueryBuilder('role1'), ('user0', 'localhost'), ('GRANT %s TO %s@%s', ('role1', 'user0', 'localhost'))),
(MySQLQueryBuilder('role1', 'fake'), ('user0', 'localhost'), ('GRANT %s@%s TO %s@%s', ('role1', 'fake', 'user0', 'localhost'))),
]
)
def test_query_builder_role_grant(builder, user, output):
"""Test role_grant method of the builder classes."""
assert builder.role_grant(user) == output
@pytest.mark.parametrize(
'builder,user,output',
[
(MariaDBQueryBuilder('role0'), ('user0', ''), ('REVOKE %s FROM %s', ('role0', 'user0'))),
(MySQLQueryBuilder('role0', '%'), ('user0', ''), ('REVOKE %s@%s FROM %s', ('role0', '%', 'user0'))),
(MariaDBQueryBuilder('role1'), ('user0', 'localhost'), ('REVOKE %s FROM %s@%s', ('role1', 'user0', 'localhost'))),
(MySQLQueryBuilder('role1', 'fake'), ('user0', 'localhost'), ('REVOKE %s@%s FROM %s@%s', ('role1', 'fake', 'user0', 'localhost'))),
]
)
def test_query_builder_role_revoke(builder, user, output):
"""Test role_revoke method of the builder classes."""
assert builder.role_revoke(user) == output
@pytest.mark.parametrize(
'input_,output,is_mariadb',
[
(['user'], [('user', '')], True),
(['user'], [('user', '%')], False),
(['user@%'], [('user', '%')], True),
(['user@%'], [('user', '%')], False),
(['user@localhost'], [('user', 'localhost')], True),
(['user@localhost'], [('user', 'localhost')], False),
(['user', 'user@%'], [('user', ''), ('user', '%')], True),
(['user', 'user@%'], [('user', '%'), ('user', '%')], False),
]
)
def test_normalize_users(input_, output, is_mariadb):
"""Test normalize_users function with expected input."""
assert normalize_users(None, input_, is_mariadb) == output
@pytest.mark.parametrize(
'input_,is_mariadb,err_msg',
[
([''], True, "Member's name cannot be empty."),
([''], False, "Member's name cannot be empty."),
([None], True, "Error occured while parsing"),
([None], False, "Error occured while parsing"),
]
)
def test_normalize_users_failing(input_, is_mariadb, err_msg):
"""Test normalize_users function with wrong input."""
normalize_users(module, input_, is_mariadb)
assert err_msg in module.msg

View file

@ -1,165 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import pytest
try:
from unittest.mock import MagicMock
except ImportError:
from mock import MagicMock
from ansible_collections.community.mysql.plugins.modules.mysql_user import (
handle_grant_on_col,
has_grant_on_col,
normalize_col_grants,
sort_column_order,
handle_requiressl_in_priv_string
)
from ..utils import dummy_cursor_class
@pytest.mark.parametrize(
'input_list,grant,output_tuple',
[
(['INSERT', 'DELETE'], 'INSERT', (None, None)),
(['SELECT', 'UPDATE'], 'SELECT', (None, None)),
(['INSERT', 'UPDATE', 'INSERT', 'DELETE'], 'DELETE', (None, None)),
(['just', 'a', 'random', 'text'], 'blabla', (None, None)),
(['SELECT (A, B, C)'], 'SELECT', (0, 0)),
(['UPDATE', 'SELECT (A, B, C)'], 'SELECT', (1, 1)),
(['UPDATE', 'REFERENCES (A, B, C)'], 'REFERENCES', (1, 1)),
(['SELECT', 'UPDATE (A, B, C)'], 'UPDATE', (1, 1)),
(['INSERT', 'SELECT (A', 'B)'], 'SELECT', (1, 2)),
(['SELECT (A', 'B)', 'UPDATE'], 'SELECT', (0, 1)),
(['INSERT', 'SELECT (A', 'B)', 'UPDATE'], 'SELECT', (1, 2)),
(['INSERT (A, B)', 'SELECT (A', 'B)', 'UPDATE'], 'INSERT', (0, 0)),
(['INSERT (A', 'B)', 'SELECT (A', 'B)', 'UPDATE'], 'INSERT', (0, 1)),
(['INSERT (A', 'B)', 'SELECT (A', 'B)', 'UPDATE'], 'SELECT', (2, 3)),
(['INSERT (A', 'B)', 'SELECT (A', 'C', 'B)', 'UPDATE'], 'SELECT', (2, 4)),
]
)
def test_has_grant_on_col(input_list, grant, output_tuple):
"""Tests has_grant_on_col function."""
assert has_grant_on_col(input_list, grant) == output_tuple
@pytest.mark.parametrize(
'input_,output',
[
('SELECT (A)', 'SELECT (A)'),
('SELECT (`A`)', 'SELECT (A)'),
('UPDATE (B, A)', 'UPDATE (A, B)'),
('INSERT (`A`, `B`)', 'INSERT (A, B)'),
('REFERENCES (B, A)', 'REFERENCES (A, B)'),
('SELECT (`B`, `A`)', 'SELECT (A, B)'),
('SELECT (`B`, `A`, C)', 'SELECT (A, B, C)'),
]
)
def test_sort_column_order(input_, output):
"""Tests sort_column_order function."""
assert sort_column_order(input_) == output
@pytest.mark.parametrize(
'privileges,start,end,output',
[
(['UPDATE', 'SELECT (C, B, A)'], 1, 1, ['UPDATE', 'SELECT (A, B, C)']),
(['INSERT', 'SELECT (A', 'B)'], 1, 2, ['INSERT', 'SELECT (A, B)']),
(
['SELECT (`A`', 'B)', 'UPDATE', 'REFERENCES (B, A)'], 0, 1,
['SELECT (A, B)', 'UPDATE', 'REFERENCES (B, A)']),
(
['INSERT', 'REFERENCES (`B`', 'A', 'C)', 'UPDATE (A', 'B)'], 1, 3,
['INSERT', 'REFERENCES (A, B, C)', 'UPDATE (A', 'B)']),
]
)
def test_handle_grant_on_col(privileges, start, end, output):
"""Tests handle_grant_on_col function."""
assert handle_grant_on_col(privileges, start, end) == output
@pytest.mark.parametrize(
'input_tuple,output_tuple',
[
(('*.*:REQUIRESSL', None), (None, 'SSL')),
(('*.*:ALL,REQUIRESSL', None), ('*.*:ALL', 'SSL')),
(('*.*:REQUIRESSL,ALL', None), ('*.*:ALL', 'SSL')),
(('*.*:ALL,REQUIRESSL,GRANT', None), ('*.*:ALL,GRANT', 'SSL')),
(('*.*:ALL,REQUIRESSL,GRANT/a.b:USAGE', None), ('*.*:ALL,GRANT/a.b:USAGE', 'SSL')),
(('*.*:REQUIRESSL', 'X509'), (None, 'X509')),
(('*.*:ALL,REQUIRESSL', 'X509'), ('*.*:ALL', 'X509')),
(('*.*:REQUIRESSL,ALL', 'X509'), ('*.*:ALL', 'X509')),
(('*.*:ALL,REQUIRESSL,GRANT', 'X509'), ('*.*:ALL,GRANT', 'X509')),
(('*.*:ALL,REQUIRESSL,GRANT/a.b:USAGE', 'X509'), ('*.*:ALL,GRANT/a.b:USAGE', 'X509')),
(('*.*:REQUIRESSL', {
'subject': '/CN=alice/O=MyDom, Inc./C=US/ST=Oregon/L=Portland',
'cipher': 'ECDHE-ECDSA-AES256-SHA384',
'issuer': '/CN=org/O=MyDom, Inc./C=US/ST=Oregon/L=Portland'
}), (None, {
'subject': '/CN=alice/O=MyDom, Inc./C=US/ST=Oregon/L=Portland',
'cipher': 'ECDHE-ECDSA-AES256-SHA384',
'issuer': '/CN=org/O=MyDom, Inc./C=US/ST=Oregon/L=Portland'
})),
(('*.*:ALL,REQUIRESSL', {
'subject': '/CN=alice/O=MyDom, Inc./C=US/ST=Oregon/L=Portland',
'cipher': 'ECDHE-ECDSA-AES256-SHA384',
'issuer': '/CN=org/O=MyDom, Inc./C=US/ST=Oregon/L=Portland'
}), ('*.*:ALL', {
'subject': '/CN=alice/O=MyDom, Inc./C=US/ST=Oregon/L=Portland',
'cipher': 'ECDHE-ECDSA-AES256-SHA384',
'issuer': '/CN=org/O=MyDom, Inc./C=US/ST=Oregon/L=Portland'
})),
(('*.*:REQUIRESSL,ALL', {
'subject': '/CN=alice/O=MyDom, Inc./C=US/ST=Oregon/L=Portland',
'cipher': 'ECDHE-ECDSA-AES256-SHA384',
'issuer': '/CN=org/O=MyDom, Inc./C=US/ST=Oregon/L=Portland'
}), ('*.*:ALL', {
'subject': '/CN=alice/O=MyDom, Inc./C=US/ST=Oregon/L=Portland',
'cipher': 'ECDHE-ECDSA-AES256-SHA384',
'issuer': '/CN=org/O=MyDom, Inc./C=US/ST=Oregon/L=Portland'
})),
(('*.*:ALL,REQUIRESSL,GRANT', {
'subject': '/CN=alice/O=MyDom, Inc./C=US/ST=Oregon/L=Portland',
'cipher': 'ECDHE-ECDSA-AES256-SHA384',
'issuer': '/CN=org/O=MyDom, Inc./C=US/ST=Oregon/L=Portland'
}), ('*.*:ALL,GRANT', {
'subject': '/CN=alice/O=MyDom, Inc./C=US/ST=Oregon/L=Portland',
'cipher': 'ECDHE-ECDSA-AES256-SHA384',
'issuer': '/CN=org/O=MyDom, Inc./C=US/ST=Oregon/L=Portland'
})),
(('*.*:ALL,REQUIRESSL,GRANT/a.b:USAGE', {
'subject': '/CN=alice/O=MyDom, Inc./C=US/ST=Oregon/L=Portland',
'cipher': 'ECDHE-ECDSA-AES256-SHA384',
'issuer': '/CN=org/O=MyDom, Inc./C=US/ST=Oregon/L=Portland'
}), ('*.*:ALL,GRANT/a.b:USAGE', {
'subject': '/CN=alice/O=MyDom, Inc./C=US/ST=Oregon/L=Portland',
'cipher': 'ECDHE-ECDSA-AES256-SHA384',
'issuer': '/CN=org/O=MyDom, Inc./C=US/ST=Oregon/L=Portland'
}))
]
)
def test_handle_requiressl_in_priv_string(input_tuple, output_tuple):
"""Tests the handle_requiressl_in_priv_string funciton."""
assert handle_requiressl_in_priv_string(MagicMock(), *input_tuple) == output_tuple
@pytest.mark.parametrize(
'input_,expected',
[
(['SELECT'], ['SELECT']),
(['SELECT (A, B)'], ['SELECT (A, B)']),
(['SELECT (B, A)'], ['SELECT (A, B)']),
(['UPDATE', 'SELECT (C, B, A)'], ['UPDATE', 'SELECT (A, B, C)']),
(['INSERT', 'SELECT (A', 'B)'], ['INSERT', 'SELECT (A, B)']),
(
['SELECT (`A`', 'B)', 'UPDATE', 'REFERENCES (B, A)'],
['SELECT (A, B)', 'UPDATE', 'REFERENCES (A, B)']),
(
['INSERT', 'REFERENCES (`B`', 'A', 'C)', 'UPDATE (B', 'A)', 'DELETE'],
['INSERT', 'REFERENCES (A, B, C)', 'UPDATE (A, B)', 'DELETE']),
]
)
def test_normalize_col_grants(input_, expected):
"""Tests normalize_col_grants function."""
assert normalize_col_grants(input_) == expected