fix 'fail' tests

This commit is contained in:
Sebastian Pfahl 2025-07-15 08:39:05 +02:00
commit 54b4effcc6
3 changed files with 107 additions and 58 deletions

View file

@ -7,23 +7,7 @@ __metaclass__ = type
import pytest
from ansible_collections.community.mysql.plugins.module_utils.implementations.mariadb.replication import uses_replica_terminology
from ..utils import dummy_cursor_class
class MockCursor:
def __init__(self, status="ONLINE"):
self.status = status
self.executed_queries = []
def execute(self, query):
if self.status == "ERROR":
raise Exception("Mocked execution error")
self.executed_queries.append(query)
def fetchone(self):
if "group_replication_status" in self.executed_queries[-1]:
return ["group_replication_status", self.status]
return None
from ..utils import dummy_cursor_class, MockCursor
@pytest.mark.parametrize(
@ -89,25 +73,55 @@ def test_start_group_replication_fail():
"""Test startgroupreplication function with failure."""
from ansible_collections.community.mysql.plugins.modules.mysql_replication import startgroupreplication
cursor = MockCursor(status="ERROR")
module = type('obj', (object,), {
'fail_json': lambda msg: None,
})
# Create a mock mysql_driver with a Warning attribute
class MockDriver:
Warning = MockCursor.Warning
result = startgroupreplication(module, cursor, [], True)
# Save the original mysql_driver
from ansible_collections.community.mysql.plugins.modules import mysql_replication
original_driver = mysql_replication.mysql_driver
assert result is False
try:
# Replace with our mock driver
mysql_replication.mysql_driver = MockDriver
cursor = MockCursor(status="ERROR")
module = type('obj', (object,), {
'fail_json': lambda msg: None,
})
result = startgroupreplication(module, cursor, [], True)
assert result is False
finally:
# Restore the original driver
mysql_replication.mysql_driver = original_driver
def test_stop_group_replication_fail():
"""Test stopgroupreplication function with failure."""
from ansible_collections.community.mysql.plugins.modules.mysql_replication import stopgroupreplication
cursor = MockCursor(status="ERROR")
module = type('obj', (object,), {
'fail_json': lambda msg: None,
})
# Create a mock mysql_driver with a Warning attribute
class MockDriver:
Warning = MockCursor.Warning
result = stopgroupreplication(module, cursor, True)
# Save the original mysql_driver
from ansible_collections.community.mysql.plugins.modules import mysql_replication
original_driver = mysql_replication.mysql_driver
assert result is False
try:
# Replace with our mock driver
mysql_replication.mysql_driver = MockDriver
cursor = MockCursor(status="ERROR")
module = type('obj', (object,), {
'fail_json': lambda msg: None,
})
result = stopgroupreplication(module, cursor, True)
assert result is False
finally:
# Restore the original driver
mysql_replication.mysql_driver = original_driver

View file

@ -7,23 +7,7 @@ __metaclass__ = type
import pytest
from ansible_collections.community.mysql.plugins.module_utils.implementations.mysql.replication import uses_replica_terminology
from ..utils import dummy_cursor_class
class MockCursor:
def __init__(self, status="ONLINE"):
self.status = status
self.executed_queries = []
def execute(self, query):
if self.status == "ERROR":
raise Exception("Mocked execution error")
self.executed_queries.append(query)
def fetchone(self):
if "group_replication_status" in self.executed_queries[-1]:
return ["group_replication_status", self.status]
return None
from ..utils import dummy_cursor_class, MockCursor
@pytest.mark.parametrize(
@ -93,25 +77,55 @@ def test_start_group_replication_fail():
"""Test startgroupreplication function with failure."""
from ansible_collections.community.mysql.plugins.modules.mysql_replication import startgroupreplication
cursor = MockCursor(status="ERROR")
module = type('obj', (object,), {
'fail_json': lambda msg: None,
})
# Create a mock mysql_driver with a Warning attribute
class MockDriver:
Warning = MockCursor.Warning
result = startgroupreplication(module, cursor, [], True)
# Save the original mysql_driver
from ansible_collections.community.mysql.plugins.modules import mysql_replication
original_driver = mysql_replication.mysql_driver
assert result is False
try:
# Replace with our mock driver
mysql_replication.mysql_driver = MockDriver
cursor = MockCursor(status="ERROR")
module = type('obj', (object,), {
'fail_json': lambda msg: None,
})
result = startgroupreplication(module, cursor, [], True)
assert result is False
finally:
# Restore the original driver
mysql_replication.mysql_driver = original_driver
def test_stop_group_replication_fail():
"""Test stopgroupreplication function with failure."""
from ansible_collections.community.mysql.plugins.modules.mysql_replication import stopgroupreplication
cursor = MockCursor(status="ERROR")
module = type('obj', (object,), {
'fail_json': lambda msg: None,
})
# Create a mock mysql_driver with a Warning attribute
class MockDriver:
Warning = MockCursor.Warning
result = stopgroupreplication(module, cursor, True)
# Save the original mysql_driver
from ansible_collections.community.mysql.plugins.modules import mysql_replication
original_driver = mysql_replication.mysql_driver
assert result is False
try:
# Replace with our mock driver
mysql_replication.mysql_driver = MockDriver
cursor = MockCursor(status="ERROR")
module = type('obj', (object,), {
'fail_json': lambda msg: None,
})
result = stopgroupreplication(module, cursor, True)
assert result is False
finally:
# Restore the original driver
mysql_replication.mysql_driver = original_driver

View file

@ -17,3 +17,24 @@ class dummy_cursor_class():
elif self.ret_val_type == 'list':
return [self.output]
class MockCursor:
def __init__(self, status="ONLINE"):
self.status = status
self.executed_queries = []
def execute(self, query):
self.executed_queries.append(query)
if self.status == "ERROR":
# Create a custom exception that mimics mysql_driver.Warning
class MockWarning(Exception):
pass
# Make it available as a class attribute for tests to use
MockCursor.Warning = MockWarning
raise MockWarning("Mocked execution error")
def fetchone(self):
if len(self.executed_queries) > 0 and "group_replication_status" in self.executed_queries[-1]:
return ["group_replication_status", self.status]
return None