"""Handles incoming securityhub requests, invokes methods, returns responses."""
import json
from moto.core.responses import BaseResponse
from .models import SecurityHubBackend, securityhub_backends
class SecurityHubResponse(BaseResponse):
    """Response handler for the securityhub service.

    Each handler method reads its inputs from the incoming request, delegates
    to the backend selected for the current account and region, and returns
    the result serialised as a JSON string.
    """

    def __init__(self) -> None:
        super().__init__(service_name="securityhub")

    @property
    def securityhub_backend(self) -> SecurityHubBackend:
        """Return the backend for the current account and region."""
        per_account = securityhub_backends[self.current_account]
        return per_account[self.region]

    def get_findings(self) -> str:
        """Handle a GetFindings request.

        Reads the ``Filters``, ``SortCriteria``, ``MaxResults`` and
        ``NextToken`` request parameters and forwards them to the backend.

        Returns:
            A JSON document with the keys ``Findings`` and ``NextToken``,
            populated from the pair returned by the backend.
        """
        # Parameters are read in the same order as they are passed on.
        backend_kwargs = {
            "filters": self._get_param("Filters"),
            "sort_criteria": self._get_param("SortCriteria"),
            "max_results": self._get_param("MaxResults"),
            "next_token": self._get_param("NextToken"),
        }
        page, following_token = self.securityhub_backend.get_findings(
            **backend_kwargs
        )
        return json.dumps({"Findings": page, "NextToken": following_token})

    def batch_import_findings(self) -> str:
        """Handle a BatchImportFindings request.

        Parses the request body as JSON (decoding it as UTF-8 first when it
        is ``bytes``) and hands its ``Findings`` entry, or an empty list when
        that key is absent, to the backend.

        Returns:
            A JSON document with ``FailedCount``, ``FailedFindings`` and
            ``SuccessCount``. Every failed finding is reduced to its
            ``ErrorCode``, ``ErrorMessage`` and ``Id`` entries.
        """
        payload = self.body
        if isinstance(payload, bytes):
            payload = payload.decode("utf-8")
        submitted = json.loads(payload).get("Findings", [])

        outcome = self.securityhub_backend.batch_import_findings(
            findings=submitted,
        )
        failed_count, success_count, failed_findings = outcome

        # Only these fields of a failed finding are exposed in the response;
        # a missing field is reported as null.
        reported_fields = ("ErrorCode", "ErrorMessage", "Id")
        failures = [
            {field: entry.get(field) for field in reported_fields}
            for entry in failed_findings
        ]

        result = {
            "FailedCount": failed_count,
            "FailedFindings": failures,
            "SuccessCount": success_count,
        }
        return json.dumps(result)