import json import re from typing import Any, Callable, Optional from moto.core.responses import BaseResponse from .exceptions import InvalidParameterException from .models import LogsBackend, logs_backends # See http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/Welcome.html REGEX_LOG_GROUP_NAME = r"[-._\/#A-Za-z0-9]+" def validate_param( param_name: str, param_value: str, constraint: str, constraint_expression: Callable[[str], bool], pattern: Optional[str] = None, ) -> None: try: assert constraint_expression(param_value) except (AssertionError, TypeError): raise InvalidParameterException( constraint=constraint, parameter=param_name, value=param_value ) if pattern and param_value: try: assert re.fullmatch(pattern, param_value) except (AssertionError, TypeError): raise InvalidParameterException( constraint=f"Must match pattern: {pattern}", parameter=param_name, value=param_value, ) class LogsResponse(BaseResponse): def __init__(self) -> None: super().__init__(service_name="logs") @property def logs_backend(self) -> LogsBackend: return logs_backends[self.current_account][self.region] def _get_validated_param( self, param: str, constraint: str, constraint_expression: Callable[[str], bool], pattern: Optional[str] = None, ) -> Any: param_value = self._get_param(param) validate_param(param, param_value, constraint, constraint_expression, pattern) return param_value def put_metric_filter(self) -> str: filter_name = self._get_validated_param( "filterName", "Minimum length of 1. Maximum length of 512.", lambda x: 1 <= len(x) <= 512, pattern="[^:*]*", ) filter_pattern = self._get_validated_param( "filterPattern", "Minimum length of 0. Maximum length of 1024.", lambda x: 0 <= len(x) <= 1024, ) log_group_name = self._get_validated_param( "logGroupName", "Minimum length of 1. Maximum length of 512.", lambda x: 1 <= len(x) <= 512, pattern=REGEX_LOG_GROUP_NAME, ) metric_transformations = self._get_validated_param( "metricTransformations", "Fixed number of 1 item.", lambda x: len(x) == 1 ) self.logs_backend.put_metric_filter( filter_name, filter_pattern, log_group_name, metric_transformations ) return "" def describe_metric_filters(self) -> str: filter_name_prefix = self._get_validated_param( "filterNamePrefix", "Minimum length of 1. Maximum length of 512.", lambda x: x is None or 1 <= len(x) <= 512, pattern="[^:*]*", ) log_group_name = self._get_validated_param( "logGroupName", "Minimum length of 1. Maximum length of 512", lambda x: x is None or 1 <= len(x) <= 512, pattern=REGEX_LOG_GROUP_NAME, ) metric_name = self._get_validated_param( "metricName", "Maximum length of 255.", lambda x: x is None or len(x) <= 255, pattern="[^:*$]*", ) metric_namespace = self._get_validated_param( "metricNamespace", "Maximum length of 255.", lambda x: x is None or len(x) <= 255, pattern="[0-9A-Za-z\\.\\-_/#:]*", ) next_token = self._get_validated_param( "nextToken", "Minimum length of 1.", lambda x: x is None or 1 <= len(x) ) if metric_name and not metric_namespace: raise InvalidParameterException( constraint=f'{"If you include the metricName parameter in your request, "}' f'{"you must also include the metricNamespace parameter."}', parameter="metricNamespace", value=metric_namespace, ) if metric_namespace and not metric_name: raise InvalidParameterException( constraint=f'{"If you include the metricNamespace parameter in your request, "}' f'{"you must also include the metricName parameter."}', parameter="metricName", value=metric_name, ) filters = self.logs_backend.describe_metric_filters( filter_name_prefix, log_group_name, metric_name, metric_namespace ) return json.dumps({"metricFilters": filters, "nextToken": next_token}) def delete_metric_filter(self) -> str: filter_name = self._get_validated_param( "filterName", "Minimum length of 1. Maximum length of 512.", lambda x: 1 <= len(x) <= 512, pattern="[^:*]*$", ) log_group_name = self._get_validated_param( "logGroupName", "Minimum length of 1. Maximum length of 512.", lambda x: 1 <= len(x) <= 512, pattern=REGEX_LOG_GROUP_NAME, ) self.logs_backend.delete_metric_filter(filter_name, log_group_name) return "" def create_log_group(self) -> str: log_group_name = self._get_param("logGroupName") tags = self._get_param("tags") kms_key_id = self._get_param("kmsKeyId") self.logs_backend.create_log_group(log_group_name, tags, kmsKeyId=kms_key_id) return "" def delete_log_group(self) -> str: log_group_name = self._get_param("logGroupName") self.logs_backend.delete_log_group(log_group_name) return "" def describe_log_groups(self) -> str: log_group_name_prefix = self._get_param("logGroupNamePrefix") next_token = self._get_param("nextToken") limit = self._get_param("limit", 50) if limit > 50: raise InvalidParameterException( constraint="Member must have value less than or equal to 50", parameter="limit", value=limit, ) groups, next_token = self.logs_backend.describe_log_groups( limit=limit, log_group_name_prefix=log_group_name_prefix, next_token=next_token, ) result = {"logGroups": [g.to_describe_dict() for g in groups]} if next_token: result["nextToken"] = next_token return json.dumps(result) def put_destination(self) -> str: destination_name = self._get_param("destinationName") role_arn = self._get_param("roleArn") target_arn = self._get_param("targetArn") tags = self._get_param("tags") destination = self.logs_backend.put_destination( destination_name, role_arn, target_arn, tags, ) result = {"destination": destination.to_dict()} return json.dumps(result) def delete_destination(self) -> str: destination_name = self._get_param("destinationName") self.logs_backend.delete_destination(destination_name) return "" def describe_destinations(self) -> str: destination_name_prefix = self._get_param("DestinationNamePrefix") limit = self._get_param("limit", 50) next_token = self._get_param("nextToken") destinations, next_token = self.logs_backend.describe_destinations( destination_name_prefix, int(limit), next_token ) result = {"destinations": destinations, "nextToken": next_token} return json.dumps(result) def put_destination_policy(self) -> str: access_policy = self._get_param("accessPolicy") destination_name = self._get_param("destinationName") self.logs_backend.put_destination_policy(destination_name, access_policy) return "" def create_log_stream(self) -> str: log_group_name = self._get_param("logGroupName") log_stream_name = self._get_param("logStreamName") self.logs_backend.create_log_stream(log_group_name, log_stream_name) return "" def delete_log_stream(self) -> str: log_group_name = self._get_param("logGroupName") log_stream_name = self._get_param("logStreamName") self.logs_backend.delete_log_stream(log_group_name, log_stream_name) return "" def describe_log_streams(self) -> str: log_group_name = self._get_param("logGroupName") log_group_id = self._get_param("logGroupIdentifier") log_stream_name_prefix = self._get_param("logStreamNamePrefix", "") descending = self._get_param("descending", False) limit = self._get_param("limit", 50) next_token = self._get_param("nextToken") order_by = self._get_param("orderBy", "LogStreamName") streams, next_token = self.logs_backend.describe_log_streams( descending=descending, limit=limit, log_group_name=log_group_name, log_group_id=log_group_id, log_stream_name_prefix=log_stream_name_prefix, next_token=next_token, order_by=order_by, ) return json.dumps({"logStreams": streams, "nextToken": next_token}) def put_log_events(self) -> str: log_group_name = self._get_param("logGroupName") log_stream_name = self._get_param("logStreamName") log_events = self._get_param("logEvents") next_sequence_token, rejected_info = self.logs_backend.put_log_events( log_group_name, log_stream_name, log_events ) if rejected_info: return json.dumps( { "nextSequenceToken": next_sequence_token, "rejectedLogEventsInfo": rejected_info, } ) else: return json.dumps({"nextSequenceToken": next_sequence_token}) def get_log_events(self) -> str: log_group_name = self._get_param("logGroupName") log_group_id = self._get_param("logGroupIdentifier") log_stream_name = self._get_param("logStreamName") start_time = self._get_param("startTime") end_time = self._get_param("endTime") limit = self._get_param("limit") next_token = self._get_param("nextToken") start_from_head = self._get_param("startFromHead", False) ( events, next_backward_token, next_forward_token, ) = self.logs_backend.get_log_events( log_group_name=log_group_name, log_group_id=log_group_id, log_stream_name=log_stream_name, start_time=start_time, end_time=end_time, limit=limit, next_token=next_token, start_from_head=start_from_head, ) return json.dumps( { "events": events, "nextBackwardToken": next_backward_token, "nextForwardToken": next_forward_token, } ) def filter_log_events(self) -> str: log_group_name = self._get_param("logGroupName") log_stream_names = self._get_param("logStreamNames", []) start_time = self._get_param("startTime") # impl, see: http://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html filter_pattern = self._get_param("filterPattern") interleaved = self._get_param("interleaved", False) end_time = self._get_param("endTime") limit = self._get_param("limit") next_token = self._get_param("nextToken") events, next_token, searched_streams = self.logs_backend.filter_log_events( log_group_name, log_stream_names, start_time, end_time, limit, next_token, filter_pattern, interleaved, ) return json.dumps( { "events": events, "nextToken": next_token, "searchedLogStreams": searched_streams, } ) def put_retention_policy(self) -> str: log_group_name = self._get_param("logGroupName") retention_in_days = self._get_param("retentionInDays") self.logs_backend.put_retention_policy(log_group_name, retention_in_days) return "" def delete_retention_policy(self) -> str: log_group_name = self._get_param("logGroupName") self.logs_backend.delete_retention_policy(log_group_name) return "" def describe_resource_policies(self) -> str: policies = self.logs_backend.describe_resource_policies() return json.dumps({"resourcePolicies": [p.describe() for p in policies]}) def put_resource_policy(self) -> str: policy_name = self._get_param("policyName") policy_doc = self._get_param("policyDocument") policy = self.logs_backend.put_resource_policy(policy_name, policy_doc) return json.dumps({"resourcePolicy": policy.describe()}) def delete_resource_policy(self) -> str: policy_name = self._get_param("policyName") self.logs_backend.delete_resource_policy(policy_name) return "" def list_tags_log_group(self) -> str: log_group_name = self._get_param("logGroupName") tags = self.logs_backend.list_tags_log_group(log_group_name) return json.dumps({"tags": tags}) def tag_log_group(self) -> str: log_group_name = self._get_param("logGroupName") tags = self._get_param("tags") self.logs_backend.tag_log_group(log_group_name, tags) return "" def untag_log_group(self) -> str: log_group_name = self._get_param("logGroupName") tags = self._get_param("tags") self.logs_backend.untag_log_group(log_group_name, tags) return "" def describe_subscription_filters(self) -> str: log_group_name = self._get_param("logGroupName") _filters = self.logs_backend.describe_subscription_filters(log_group_name) return json.dumps({"subscriptionFilters": [f.to_json() for f in _filters]}) def put_subscription_filter(self) -> str: log_group_name = self._get_param("logGroupName") filter_name = self._get_param("filterName") filter_pattern = self._get_param("filterPattern") destination_arn = self._get_param("destinationArn") role_arn = self._get_param("roleArn") self.logs_backend.put_subscription_filter( log_group_name, filter_name, filter_pattern, destination_arn, role_arn ) return "" def delete_subscription_filter(self) -> str: log_group_name = self._get_param("logGroupName") filter_name = self._get_param("filterName") self.logs_backend.delete_subscription_filter(log_group_name, filter_name) return "" def start_query(self) -> str: log_group_name = self._get_param("logGroupName") log_group_names = self._get_param("logGroupNames") start_time = self._get_int_param("startTime") end_time = self._get_int_param("endTime") query_string = self._get_param("queryString") if log_group_name and log_group_names: raise InvalidParameterException() if log_group_name: log_group_names = [log_group_name] query_id = self.logs_backend.start_query( log_group_names, start_time, end_time, query_string ) return json.dumps({"queryId": f"{query_id}"}) def describe_queries(self) -> str: log_group_name = self._get_param("logGroupName") status = self._get_param("status") queries = self.logs_backend.describe_queries(log_group_name, status) return json.dumps( {"queries": [query.to_json(log_group_name) for query in queries]} ) def get_query_results(self) -> str: query_id = self._get_param("queryId") query = self.logs_backend.get_query_results(query_id) return json.dumps(query.to_result_json()) def cancel_export_task(self) -> str: task_id = self._get_param("taskId") self.logs_backend.cancel_export_task(task_id) return "{}" def create_export_task(self) -> str: task_id = self.logs_backend.create_export_task( logGroupName=self._get_param("logGroupName"), fromTime=self._get_int_param("from"), to=self._get_int_param("to"), destination=self._get_param("destination"), destinationPrefix=self._get_param("destinationPrefix", "exportedlogs"), taskName=self._get_param("taskName"), ) return json.dumps(dict(taskId=str(task_id))) def describe_export_tasks(self) -> str: task_id = self._get_param("taskId") tasks = self.logs_backend.describe_export_tasks(task_id=task_id) return json.dumps({"exportTasks": [t.to_json() for t in tasks]}) def list_tags_for_resource(self) -> str: resource_arn = self._get_param("resourceArn") tags = self.logs_backend.list_tags_for_resource(resource_arn) return json.dumps({"tags": tags}) def tag_resource(self) -> str: resource_arn = self._get_param("resourceArn") tags = self._get_param("tags") self.logs_backend.tag_resource(resource_arn, tags) return "{}" def untag_resource(self) -> str: resource_arn = self._get_param("resourceArn") tag_keys = self._get_param("tagKeys") self.logs_backend.untag_resource(resource_arn, tag_keys) return "{}" def put_delivery_destination(self) -> str: name = self._get_param("name") output_format = self._get_param("outputFormat") delivery_destination_configuration = self._get_param( "deliveryDestinationConfiguration" ) tags = self._get_param("tags") delivery_destination = self.logs_backend.put_delivery_destination( name=name, output_format=output_format, delivery_destination_configuration=delivery_destination_configuration, tags=tags, ) return json.dumps(dict(deliveryDestination=delivery_destination.to_dict())) def get_delivery_destination(self) -> str: name = self._get_param("name") delivery_destination = self.logs_backend.get_delivery_destination( name=name, ) return json.dumps(dict(deliveryDestination=delivery_destination.to_dict())) def describe_delivery_destinations(self) -> str: delivery_destinations = self.logs_backend.describe_delivery_destinations() return json.dumps( dict(deliveryDestinations=[dd.to_dict() for dd in delivery_destinations]) ) def put_delivery_destination_policy(self) -> str: delivery_destination_name = self._get_param("deliveryDestinationName") delivery_destination_policy = self._get_param("deliveryDestinationPolicy") policy = self.logs_backend.put_delivery_destination_policy( delivery_destination_name=delivery_destination_name, delivery_destination_policy=delivery_destination_policy, ) return json.dumps(dict(policy=policy)) def get_delivery_destination_policy(self) -> str: delivery_destination_name = self._get_param("deliveryDestinationName") policy = self.logs_backend.get_delivery_destination_policy( delivery_destination_name=delivery_destination_name, ) return json.dumps(dict(policy=policy)) def put_delivery_source(self) -> str: name = self._get_param("name") resource_arn = self._get_param("resourceArn") log_type = self._get_param("logType") tags = self._get_param("tags") delivery_source = self.logs_backend.put_delivery_source( name=name, resource_arn=resource_arn, log_type=log_type, tags=tags, ) return json.dumps(dict(deliverySource=delivery_source.to_dict())) def describe_delivery_sources(self) -> str: delivery_sources = self.logs_backend.describe_delivery_sources() return json.dumps( dict(deliverySources=[ds.to_dict() for ds in delivery_sources]) ) def get_delivery_source(self) -> str: name = self._get_param("name") delivery_source = self.logs_backend.get_delivery_source( name=name, ) return json.dumps(dict(deliverySource=delivery_source.to_dict())) def create_delivery(self) -> str: delivery_source_name = self._get_param("deliverySourceName") delivery_destination_arn = self._get_param("deliveryDestinationArn") record_fields = self._get_param("recordFields") field_delimiter = self._get_param("fieldDelimiter") s3_delivery_configuration = self._get_param("s3DeliveryConfiguration") tags = self._get_param("tags") delivery = self.logs_backend.create_delivery( delivery_source_name=delivery_source_name, delivery_destination_arn=delivery_destination_arn, record_fields=record_fields, field_delimiter=field_delimiter, s3_delivery_configuration=s3_delivery_configuration, tags=tags, ) return json.dumps(dict(delivery=delivery.to_dict())) def describe_deliveries(self) -> str: deliveries = self.logs_backend.describe_deliveries() return json.dumps(dict(deliveries=[d.to_dict() for d in deliveries])) def get_delivery(self) -> str: id = self._get_param("id") delivery = self.logs_backend.get_delivery( id=id, ) return json.dumps(dict(delivery=delivery.to_dict())) def delete_delivery(self) -> str: id = self._get_param("id") self.logs_backend.delete_delivery( id=id, ) return "" def delete_delivery_destination(self) -> str: name = self._get_param("name") self.logs_backend.delete_delivery_destination( name=name, ) return "" def delete_delivery_destination_policy(self) -> str: delivery_destination_name = self._get_param("deliveryDestinationName") self.logs_backend.delete_delivery_destination_policy( delivery_destination_name=delivery_destination_name, ) return "" def delete_delivery_source(self) -> str: name = self._get_param("name") self.logs_backend.delete_delivery_source( name=name, ) return ""
Memory