Fix Log Analytics Ingestion pep8 tests

This commit is contained in:
Cline, Wade 2025-06-26 14:00:01 -07:00
commit 315716aeb0
2 changed files with 16 additions and 12 deletions

View file

@ -155,9 +155,10 @@ from ansible.utils.display import Display
display = Display()
class AzureLogAnalyticsIngestionSource(object):
def __init__(self, dce_url, dcr_id, disable_attempts, disable_on_failure, client_id, client_secret, tenant_id, stream_name, include_task_args, include_content):
class AzureLogAnalyticsIngestionSource(object):
def __init__(self, dce_url, dcr_id, disable_attempts, disable_on_failure, client_id, client_secret, tenant_id, stream_name, include_task_args,
include_content):
self.dce_url = dce_url
self.dcr_id = dcr_id
self.disabled = False
@ -185,21 +186,22 @@ class AzureLogAnalyticsIngestionSource(object):
'grant_type': 'client_credentials',
'client_id': self.client_id,
'client_secret': self.client_secret,
# The scope value comes from https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview#headers and https://learn.microsoft.com/en-us/entra/identity-platform/scopes-oidc#the-default-scope
# The scope value comes from https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview#headers
# and https://learn.microsoft.com/en-us/entra/identity-platform/scopes-oidc#the-default-scope
'scope': 'https://monitor.azure.com/.default'
}
response = self.requests_session.post(url, data=payload)
response.raise_for_status()
self.token_expiration_time=datetime.now()+timedelta(seconds=response.json().get("expires_in"))
self.token_expiration_time = datetime.now() + timedelta(seconds=response.json().get("expires_in"))
return response.json().get('access_token')
def is_token_valid(self):
return True if (datetime.now() + timedelta(seconds=10))< self.token_expiration_time else False
return True if (datetime.now() + timedelta(seconds=10)) < self.token_expiration_time else False
# Method to send event data to the Azure Logs Ingestion API
# This replaces the legacy API call and now uses the Logs Ingestion API endpoint
def send_event(self, event_data):
if not(self.is_token_valid()):
if not self.is_token_valid():
self.bearer_token = self.get_bearer_token()
ingestion_url = f"{self.dce_url}/dataCollectionRules/{self.dcr_id}/streams/{self.stream_name}?api-version=2023-01-01"
headers = {
@ -241,12 +243,12 @@ class AzureLogAnalyticsIngestionSource(object):
if result._task._role:
ansible_role = str(result._task._role)
#Include/Exclude task args
if not(self.include_task_args):
# Include/Exclude task args
if not self.include_task_args:
result._task_fields.pop('args', None)
#Include/Exclude content
if not(self.include_content):
# Include/Exclude content
if not self.include_content:
result._result.pop('content', None)
# Build the event data
@ -263,7 +265,7 @@ class AzureLogAnalyticsIngestionSource(object):
"Result": result._result,
"Session": self.session
}]
#Display event data
# Display event data
display.vvv(f"Event Data :{str(event_data)}")
# Send the event data using the new Logs Ingestion API method
@ -300,7 +302,8 @@ class CallbackModule(CallbackBase):
# Initialize the AzureLogAnalyticsIngestionSource with the new settings
self.azure_loganalytics = AzureLogAnalyticsIngestionSource(
self.dce_url, self.dcr_id, self.disable_attempts, self.disable_on_failure, self.client_id, self.client_secret, self.tenant_id, self.stream_name, self.include_task_args, self.include_content
self.dce_url, self.dcr_id, self.disable_attempts, self.disable_on_failure, self.client_id, self.client_secret, self.tenant_id, self.stream_name,
self.include_task_args, self.include_content
)
# Input checks

View file

@ -12,6 +12,7 @@ import urllib
from ansible.executor.task_result import TaskResult
from ansible_collections.community.general.plugins.callback.loganalytics_ingestion import AzureLogAnalyticsIngestionSource
class TestAzureLogAnalyticsIngestion(unittest.TestCase):
dce_url = "https://fake.dce_url.ansible.com"
dcr_id = "fake-dcr-id"