import json
from urllib.parse import unquote
from moto.core.responses import BaseResponse
from .models import GuardDutyBackend, guardduty_backends
class GuardDutyResponse(BaseResponse):
    """Request handlers for the ``guardduty`` service.

    Each handler pulls identifiers out of the request path, reads the
    remaining inputs through ``_get_param``, delegates to the backend chosen
    for the current account and region, and returns a JSON string.
    """

    def __init__(self) -> None:
        super().__init__(service_name="guardduty")

    @property
    def guardduty_backend(self) -> GuardDutyBackend:
        """Backend looked up by the current account, then by region."""
        account_backends = guardduty_backends[self.current_account]
        return account_backends[self.region]

    def create_filter(self) -> str:
        """Create a filter and return its name as JSON.

        The detector id is the second-to-last segment of the request path.
        """
        detector_id = self.path.split("/")[-2]
        filter_name = self._get_param("name")
        # Order matters: these are forwarded positionally to the backend.
        settings = [
            self._get_param(key)
            for key in ("action", "description", "findingCriteria", "rank")
        ]
        self.guardduty_backend.create_filter(detector_id, filter_name, *settings)
        return json.dumps({"name": filter_name})

    def create_detector(self) -> str:
        """Create a detector and return its id under ``detectorId``."""
        # Order matters: these are forwarded positionally to the backend.
        request_keys = (
            "enable",
            "findingPublishingFrequency",
            "dataSources",
            "tags",
            "features",
        )
        values = [self._get_param(key) for key in request_keys]
        new_detector_id = self.guardduty_backend.create_detector(*values)
        return json.dumps({"detectorId": new_detector_id})

    def delete_detector(self) -> str:
        """Delete the detector named by the last path segment."""
        segments = self.path.split("/")
        self.guardduty_backend.delete_detector(segments[-1])
        return "{}"

    def delete_filter(self) -> str:
        """Delete a filter identified by detector id and filter name.

        The detector id is the third-from-last path segment; the filter name
        is the URL-decoded last segment.
        """
        segments = self.path.split("/")
        detector_id = segments[-3]
        filter_name = unquote(segments[-1])
        self.guardduty_backend.delete_filter(detector_id, filter_name)
        return "{}"

    def enable_organization_admin_account(self) -> str:
        """Pass the ``adminAccountId`` parameter to the backend."""
        self.guardduty_backend.enable_organization_admin_account(
            self._get_param("adminAccountId")
        )
        return "{}"

    def list_organization_admin_accounts(self) -> str:
        """Return the backend's admin account ids, each reported as ``ENABLED``."""
        admin_accounts = []
        for account_id in self.guardduty_backend.list_organization_admin_accounts():
            admin_accounts.append(
                {"adminAccountId": account_id, "adminStatus": "ENABLED"}
            )
        return json.dumps({"adminAccounts": admin_accounts})

    def list_detectors(self) -> str:
        """Return the backend's detector ids under ``detectorIds``."""
        return json.dumps({"detectorIds": self.guardduty_backend.list_detectors()})

    def get_detector(self) -> str:
        """Return the JSON form of the detector named by the last path segment."""
        segments = self.path.split("/")
        found = self.guardduty_backend.get_detector(segments[-1])
        return json.dumps(found.to_json())

    def get_filter(self) -> str:
        """Return the JSON form of a filter.

        The detector id is the third-from-last path segment; the filter name
        is the URL-decoded last segment.
        """
        segments = self.path.split("/")
        detector_id = segments[-3]
        filter_name = unquote(segments[-1])
        found = self.guardduty_backend.get_filter(detector_id, filter_name)
        return json.dumps(found.to_json())

    def update_detector(self) -> str:
        """Update the detector named by the last path segment."""
        detector_id = self.path.split("/")[-1]
        # Order matters: these are forwarded positionally to the backend.
        changes = [
            self._get_param(key)
            for key in (
                "enable",
                "findingPublishingFrequency",
                "dataSources",
                "features",
            )
        ]
        self.guardduty_backend.update_detector(detector_id, *changes)
        return "{}"

    def update_filter(self) -> str:
        """Update a filter and return its name as JSON.

        The detector id is the third-from-last path segment; the filter name
        is the URL-decoded last segment.
        """
        segments = self.path.split("/")
        detector_id = segments[-3]
        filter_name = unquote(segments[-1])
        # Keys are the backend's keyword names; values come from the request.
        updates = {
            "action": self._get_param("action"),
            "description": self._get_param("description"),
            "finding_criteria": self._get_param("findingCriteria"),
            "rank": self._get_param("rank"),
        }
        self.guardduty_backend.update_filter(detector_id, filter_name, **updates)
        return json.dumps({"name": filter_name})

    def get_administrator_account(self) -> str:
        """Return the backend's administrator account result as JSON.

        The detector id is the second-to-last segment of the request path.
        """
        segments = self.path.split("/")
        return json.dumps(
            self.guardduty_backend.get_administrator_account(segments[-2])
        )