refactored BOM detection and removal code to be more robust

This commit is contained in:
Joe Zollo 2025-06-13 17:32:25 -04:00 committed by GitHub
commit 8e95aee351
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 155 additions and 97 deletions

View file

@ -1,5 +1,3 @@
minor_changes: minor_changes:
- mssql_script - add ``login_user_alt`` and ``login_password_alt`` parameters to support fallback authentication credentials.
- mssql_script - add ``script_path`` parameter to allow executing SQL scripts from files as an alternative to inline scripts. - mssql_script - add ``script_path`` parameter to allow executing SQL scripts from files as an alternative to inline scripts.
- mssql_script - add ``authenticated_user`` return value to indicate which username was successfully used for authentication.
- mssql_script - add automatic Byte Order Mark (BOM) detection and removal for script files to prevent SQL parsing errors. - mssql_script - add automatic Byte Order Mark (BOM) detection and removal for script files to prevent SQL parsing errors.

View file

@ -39,14 +39,6 @@ options:
login_password: login_password:
description: The password used to authenticate with. description: The password used to authenticate with.
type: str type: str
login_user_alt:
description: The alternate username used to authenticate with if primary login fails.
type: str
version_added: 10.8.0
login_password_alt:
description: The alternate password used to authenticate with if primary login fails.
type: str
version_added: 10.8.0
login_host: login_host:
description: Host running the database. description: Host running the database.
type: str type: str
@ -64,9 +56,10 @@ options:
type: str type: str
script_path: script_path:
description: description:
- Path to file containing the SQL script to be executed. - Path to a file containing the SQL script to be executed.
- Script can contain multiple SQL statements. Multiple Batches can be separated by V(GO) command. - Script can contain multiple SQL statements. Multiple Batches can be separated by V(GO) command.
- Each batch must return at least one result set. - Each batch must return at least one result set.
- If the file contains a Byte Order Mark (BOM), it will be automatically detected and removed.
- Mutually exclusive with O(script). - Mutually exclusive with O(script).
type: path type: path
version_added: 10.8.0 version_added: 10.8.0
@ -98,6 +91,7 @@ requirements:
author: author:
- Kris Budde (@kbudde) - Kris Budde (@kbudde)
- Joe Zollo (@zollo)
""" """
EXAMPLES = r""" EXAMPLES = r"""
@ -110,26 +104,6 @@ EXAMPLES = r"""
db: master db: master
script: "SELECT 1" script: "SELECT 1"
- name: Check DB connection with alternate credentials
community.general.mssql_script:
login_user: "{{ mssql_login_user }}"
login_password: "{{ mssql_login_password }}"
login_user_alt: "{{ mssql_login_user_alt }}"
login_password_alt: "{{ mssql_login_password_alt }}"
login_host: "{{ mssql_host }}"
login_port: "{{ mssql_port }}"
db: master
script: "SELECT 1"
- name: Execute script from file
community.general.mssql_script:
login_user: "{{ mssql_login_user }}"
login_password: "{{ mssql_login_password }}"
login_host: "{{ mssql_host }}"
login_port: "{{ mssql_port }}"
db: master
script_path: /path/to/script.sql
- name: Query with parameter - name: Query with parameter
community.general.mssql_script: community.general.mssql_script:
login_user: "{{ mssql_login_user }}" login_user: "{{ mssql_login_user }}"
@ -141,11 +115,27 @@ EXAMPLES = r"""
params: params:
dbname: msdb dbname: msdb
register: result_params register: result_params
- assert: - assert:
that: that:
- result_params.query_results[0][0][0][0] == 'msdb' - result_params.query_results[0][0][0][0] == 'msdb'
- result_params.query_results[0][0][0][1] == 'ONLINE' - result_params.query_results[0][0][0][1] == 'ONLINE'
- name: Execute script from file with BOM present
community.general.mssql_script:
login_user: "{{ mssql_login_user }}"
login_password: "{{ mssql_login_password }}"
login_host: "{{ mssql_host }}"
login_port: "{{ mssql_port }}"
script_path: /path/to/sql/script_with_bom.sql
register: result_bom
- assert:
that:
- result_bom.script_source == 'file'
- result_bom.bom_removed
- result_bom.bom_type == 'UTF-8'
- name: Query within a transaction - name: Query within a transaction
community.general.mssql_script: community.general.mssql_script:
login_user: "{{ mssql_login_user }}" login_user: "{{ mssql_login_user }}"
@ -201,11 +191,27 @@ EXAMPLES = r"""
""" """
RETURN = r""" RETURN = r"""
authenticated_user: script_source:
description: The username that was successfully used to authenticate with the database. description: Source of the executed script.
type: str type: str
returned: always returned: always
sample: "sa" sample: "file"
choices: ["file", "parameter"]
script_path:
description: Path to the script file that was executed.
type: str
returned: when script_path parameter is used
sample: "/path/to/script.sql"
bom_removed:
description: Whether a Byte Order Mark was detected and removed from the script file.
type: bool
returned: when script_path parameter is used and BOM was found
sample: true
bom_type:
description: Type of Byte Order Mark that was detected and removed.
type: str
returned: when script_path parameter is used and BOM was found
sample: "UTF-8"
query_results: query_results:
description: List of batches (queries separated by V(GO) keyword). description: List of batches (queries separated by V(GO) keyword).
type: list type: list
@ -265,6 +271,9 @@ query_results_dict:
from ansible.module_utils.basic import AnsibleModule, missing_required_lib from ansible.module_utils.basic import AnsibleModule, missing_required_lib
import traceback import traceback
import json import json
import os
import codecs
PYMSSQL_IMP_ERR = None PYMSSQL_IMP_ERR = None
try: try:
import pymssql import pymssql
@ -279,13 +288,101 @@ def clean_output(o):
return str(o) return str(o)
def detect_and_remove_bom(content):
    """Strip a single leading Byte Order Mark (BOM) from ``content``.

    ``content`` may be ``bytes``-like (raw file data) or ``str`` (already
    decoded text). The return value keeps the type of the input.

    Returns a tuple ``(cleaned_content, bom_removed, bom_type)`` where
    ``bom_removed`` is a bool and ``bom_type`` is one of ``'UTF-8'``,
    ``'UTF-16 LE'``, ``'UTF-16 BE'``, ``'UTF-32 LE'``, ``'UTF-32 BE'`` or
    ``None`` when no BOM was found.
    """
    if isinstance(content, str):
        # In decoded text every BOM is the single code point U+FEFF, whatever
        # the on-disk encoding was. It is reported as 'UTF-8' because that is
        # the only label the previous encode-and-compare logic could produce
        # for text input. Working on the str directly also avoids an
        # encode/decode round trip that raised on lone surrogates.
        if content.startswith('\ufeff'):
            return content[1:], True, 'UTF-8'
        return content, False, None

    # Order matters: BOM_UTF16_LE (FF FE) is a prefix of BOM_UTF32_LE
    # (FF FE 00 00), so the UTF-32 marks must be probed before the UTF-16
    # ones or UTF-32 LE data is mislabelled and left with two stray NULs.
    boms = (
        (codecs.BOM_UTF8, 'UTF-8'),
        (codecs.BOM_UTF32_LE, 'UTF-32 LE'),
        (codecs.BOM_UTF32_BE, 'UTF-32 BE'),
        (codecs.BOM_UTF16_LE, 'UTF-16 LE'),
        (codecs.BOM_UTF16_BE, 'UTF-16 BE'),
    )
    for bom, bom_name in boms:
        if content.startswith(bom):
            return content[len(bom):], True, bom_name

    return content, False, None
def _sniff_bom(raw):
    """Return ``(bom_bytes, label, codec)`` for the BOM ``raw`` starts with, else ``None``."""
    # UTF-32 is probed before UTF-16 because BOM_UTF16_LE (FF FE) is a prefix
    # of BOM_UTF32_LE (FF FE 00 00).
    known_boms = (
        (codecs.BOM_UTF8, 'UTF-8', 'utf-8'),
        (codecs.BOM_UTF32_LE, 'UTF-32 LE', 'utf-32-le'),
        (codecs.BOM_UTF32_BE, 'UTF-32 BE', 'utf-32-be'),
        (codecs.BOM_UTF16_LE, 'UTF-16 LE', 'utf-16-le'),
        (codecs.BOM_UTF16_BE, 'UTF-16 BE', 'utf-16-be'),
    )
    for entry in known_boms:
        if raw.startswith(entry[0]):
            return entry
    return None


def read_script_file(script_path):
    """Read an SQL script file as text, removing a leading BOM if present.

    The BOM is looked for in the raw bytes *before* decoding, and the
    encoding it announces is used to decode the rest of the file. Files
    without a BOM are decoded as UTF-8, falling back to Latin-1 (which
    accepts any byte sequence) for legacy 8-bit files.

    Returns a tuple ``(script_content, bom_info)`` where ``bom_info`` is a
    dict with the keys ``bom_found`` (bool), ``bom_type`` (str or ``None``)
    and ``file_path``.

    Raises ``IOError`` if the path does not exist, is not a regular file, or
    cannot be read.
    """
    if not os.path.exists(script_path):
        raise IOError(f"Script file not found: {script_path}")

    if not os.path.isfile(script_path):
        raise IOError(f"Path is not a file: {script_path}")

    # Only the actual file access can raise IOError, so only that is guarded.
    try:
        with open(script_path, 'rb') as f:
            raw_content = f.read()
    except IOError as e:
        raise IOError(f"Error reading script file {script_path}: {e}") from e

    bom_type = None
    sniffed = _sniff_bom(raw_content)
    if sniffed is not None:
        bom, bom_type, codec = sniffed
        payload = raw_content[len(bom):]
        try:
            script_content = payload.decode(codec)
        except UnicodeDecodeError:
            # The BOM announced an encoding the payload does not fully honour;
            # keep going with replacement characters rather than failing.
            script_content = payload.decode(codec, errors='replace')
    else:
        try:
            script_content = raw_content.decode('utf-8')
        except UnicodeDecodeError:
            # Without a BOM there is no evidence for UTF-16/32; guessing them
            # turns 8-bit text into garbage. Latin-1 never fails.
            script_content = raw_content.decode('latin-1')

    bom_info = {
        'bom_found': bom_type is not None,
        'bom_type': bom_type,
        'file_path': script_path
    }

    return script_content, bom_info
def run_module(): def run_module():
module_args = dict( module_args = dict(
name=dict(required=False, aliases=['db'], default=''), name=dict(required=False, aliases=['db'], default=''),
login_user=dict(), login_user=dict(),
login_password=dict(no_log=True), login_password=dict(no_log=True),
login_user_alt=dict(),
login_password_alt=dict(no_log=True),
login_host=dict(required=True), login_host=dict(required=True),
login_port=dict(type='int', default=1433), login_port=dict(type='int', default=1433),
script=dict(), script=dict(),
@ -303,7 +400,7 @@ def run_module():
argument_spec=module_args, argument_spec=module_args,
supports_check_mode=True, supports_check_mode=True,
mutually_exclusive=[('script', 'script_path')], mutually_exclusive=[('script', 'script_path')],
required_one_of=[('script', 'script_path')] required_one_of=[('script', 'script_path')],
) )
if not MSSQL_FOUND: if not MSSQL_FOUND:
module.fail_json(msg=missing_required_lib( module.fail_json(msg=missing_required_lib(
@ -312,8 +409,6 @@ def run_module():
db = module.params['name'] db = module.params['name']
login_user = module.params['login_user'] login_user = module.params['login_user']
login_password = module.params['login_password'] login_password = module.params['login_password']
login_user_alt = module.params['login_user_alt']
login_password_alt = module.params['login_password_alt']
login_host = module.params['login_host'] login_host = module.params['login_host']
login_port = module.params['login_port'] login_port = module.params['login_port']
script = module.params['script'] script = module.params['script']
@ -323,16 +418,20 @@ def run_module():
# Added param to set the transactional mode (true/false) # Added param to set the transactional mode (true/false)
transaction = module.params['transaction'] transaction = module.params['transaction']
# Load script from file if script_path is provided # Handle script source - either from direct script or file
bom_info = None
if script_path: if script_path:
try: try:
with open(script_path, 'r', encoding='utf-8-sig') as f: script, bom_info = read_script_file(script_path)
script = f.read() result['script_source'] = 'file'
# Additional check to ensure no BOM remains after utf-8-sig handling result['script_path'] = script_path
if script.startswith('\uFEFF'): if bom_info['bom_found']:
script = script[1:] result['bom_removed'] = True
except IOError as e: result['bom_type'] = bom_info['bom_type']
module.fail_json(msg="Failed to read script file: %s" % str(e)) except (IOError, OSError) as e:
module.fail_json(msg=f"Error reading script file: {str(e)}")
else:
result['script_source'] = 'parameter'
login_querystring = login_host login_querystring = login_host
if login_port != 1433: if login_port != 1433:
@ -342,55 +441,17 @@ def run_module():
module.fail_json( module.fail_json(
msg="when supplying login_user argument, login_password must also be provided") msg="when supplying login_user argument, login_password must also be provided")
if login_user_alt is not None and login_password_alt is None: try:
module.fail_json( conn = pymssql.connect(
msg="when supplying login_user_alt argument, login_password_alt must also be provided") user=login_user, password=login_password, host=login_querystring, database=db)
cursor = conn.cursor()
# Try primary credentials first, then alternate if primary fails except Exception as e:
conn = None if "Unknown database" in str(e):
authenticated_user = None errno, errstr = e.args
module.fail_json(msg="ERROR: %s %s" % (errno, errstr))
# Try primary credentials else:
if login_user is not None: module.fail_json(msg="unable to connect, check login_user and login_password are correct, or alternatively check your "
try: "@sysconfdir@/freetds.conf / ${HOME}/.freetds.conf")
conn = pymssql.connect(
user=login_user, password=login_password, host=login_querystring, database=db)
authenticated_user = login_user
except Exception as e:
if login_user_alt is not None:
# Try alternate credentials
try:
conn = pymssql.connect(
user=login_user_alt, password=login_password_alt, host=login_querystring, database=db)
authenticated_user = login_user_alt
except Exception as e2:
if "Unknown database" in str(e2):
errno, errstr = e2.args
module.fail_json(msg="ERROR: %s %s" % (errno, errstr))
else:
module.fail_json(msg="unable to connect with primary or alternate credentials, check login credentials are correct, "
"or alternatively check your @sysconfdir@/freetds.conf / ${HOME}/.freetds.conf")
else:
if "Unknown database" in str(e):
errno, errstr = e.args
module.fail_json(msg="ERROR: %s %s" % (errno, errstr))
else:
module.fail_json(msg="unable to connect, check login_user and login_password are correct, or alternatively check your "
"@sysconfdir@/freetds.conf / ${HOME}/.freetds.conf")
else:
# No credentials provided, try to connect without authentication
try:
conn = pymssql.connect(host=login_querystring, database=db)
authenticated_user = "Windows Authentication"
except Exception as e:
if "Unknown database" in str(e):
errno, errstr = e.args
module.fail_json(msg="ERROR: %s %s" % (errno, errstr))
else:
module.fail_json(msg="unable to connect, check login_user and login_password are correct, or alternatively check your "
"@sysconfdir@/freetds.conf / ${HOME}/.freetds.conf")
cursor = conn.cursor()
# If transactional mode is requested, start a transaction # If transactional mode is requested, start a transaction
conn.autocommit(not transaction) conn.autocommit(not transaction)
@ -456,7 +517,6 @@ def run_module():
qry_results = json.loads(json.dumps(query_results, default=clean_output)) qry_results = json.loads(json.dumps(query_results, default=clean_output))
result[query_results_key] = qry_results result[query_results_key] = qry_results
result['authenticated_user'] = authenticated_user
module.exit_json(**result) module.exit_json(**result)