"""SecurityHubBackend class with methods for supported APIs.""" from typing import Any, Dict, List, Optional, Tuple from moto.core.base_backend import BackendDict, BaseBackend from moto.core.common_models import BaseModel from moto.securityhub.exceptions import InvalidInputException from moto.utilities.paginator import paginate class Finding(BaseModel): def __init__(self, finding_id: str, finding_data: Dict[str, Any]): self.id = finding_id self.data = finding_data def as_dict(self) -> Dict[str, Any]: return self.data class SecurityHubBackend(BaseBackend): """Implementation of SecurityHub APIs.""" PAGINATION_MODEL = { "get_findings": { "input_token": "next_token", "limit_key": "max_results", "limit_default": 100, "unique_attribute": "Id", "fail_on_invalid_token": True, } } def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) self.findings: List[Finding] = [] @paginate(pagination_model=PAGINATION_MODEL) def get_findings( self, filters: Optional[Dict[str, Any]] = None, sort_criteria: Optional[List[Dict[str, str]]] = None, max_results: Optional[int] = None, next_token: Optional[str] = None, ) -> List[Dict[str, str]]: """ Returns findings based on optional filters and sort criteria. """ if max_results is not None: try: max_results = int(max_results) if max_results < 1 or max_results > 100: raise InvalidInputException( op="GetFindings", msg="MaxResults must be a number between 1 and 100", ) except ValueError: raise InvalidInputException( op="GetFindings", msg="MaxResults must be a number greater than 0" ) findings = self.findings # TODO: Apply filters if provided # TODO: Apply sort criteria if provided return [f.as_dict() for f in findings] def batch_import_findings( self, findings: List[Dict[str, Any]] ) -> Tuple[int, int, List[Dict[str, Any]]]: """ Import findings in batch to SecurityHub. Args: findings: List of finding dictionaries to import Returns: Tuple of (failed_count, success_count, failed_findings) """ failed_count = 0 success_count = 0 failed_findings = [] for finding_data in findings: try: if ( not isinstance(finding_data["Resources"], list) or len(finding_data["Resources"]) == 0 ): raise InvalidInputException( op="BatchImportFindings", msg="Finding must contain at least one resource in the Resources array", ) finding_id = finding_data["Id"] existing_finding = next( (f for f in self.findings if f.id == finding_id), None ) if existing_finding: existing_finding.data.update(finding_data) else: new_finding = Finding(finding_id, finding_data) self.findings.append(new_finding) success_count += 1 except Exception as e: failed_count += 1 failed_findings.append( { "Id": finding_data.get("Id", ""), "ErrorCode": "InvalidInput", "ErrorMessage": str(e), } ) return failed_count, success_count, failed_findings securityhub_backends = BackendDict(SecurityHubBackend, "securityhub")
Memory