"""Helpers that validate Glue Schema Registry request parameters and build response dicts."""

import json
import re
from typing import Any, Dict, Optional, Pattern, Tuple

from .exceptions import (
    DisabledCompatibilityVersioningException,
    GeneralGSRAlreadyExistsException,
    GeneralResourceNumberLimitExceededException,
    InvalidCompatibilityException,
    InvalidDataFormatException,
    InvalidNumberOfTagsException,
    InvalidRegistryIdBothParamsProvidedException,
    InvalidSchemaDefinitionException,
    InvalidSchemaIdBothParamsProvidedException,
    InvalidSchemaIdNotProvidedException,
    InvalidSchemaVersionIdProvidedWithOtherParamsException,
    InvalidSchemaVersionNumberBothParamsProvidedException,
    InvalidSchemaVersionNumberNotProvidedException,
    ParamValueContainsInvalidCharactersException,
    RegistryNotFoundException,
    ResourceNameTooLongException,
    SchemaNotFoundException,
    SchemaVersionMetadataLimitExceededException,
)
from .glue_schema_registry_constants import (
    ARN_PATTERN,
    DEFAULT_REGISTRY_NAME,
    DESCRIPTION_PATTERN,
    LATEST_VERSION,
    MAX_ARN_LENGTH,
    MAX_DESCRIPTION_LENGTH,
    MAX_REGISTRIES_ALLOWED,
    MAX_REGISTRY_NAME_LENGTH,
    MAX_SCHEMA_DEFINITION_LENGTH,
    MAX_SCHEMA_NAME_LENGTH,
    MAX_SCHEMA_VERSION_METADATA_ALLOWED,
    MAX_SCHEMA_VERSION_METADATA_LENGTH,
    MAX_SCHEMA_VERSIONS_ALLOWED,
    MAX_SCHEMAS_ALLOWED,
    MAX_TAGS_ALLOWED,
    METADATA_KEY,
    METADATA_VALUE,
    REGISTRY_ARN,
    REGISTRY_NAME,
    RESOURCE_NAME_PATTERN,
    SCHEMA_ARN,
    SCHEMA_DEFINITION,
    SCHEMA_NAME,
    SCHEMA_VERSION_ID,
    SCHEMA_VERSION_ID_PATTERN,
    SCHEMA_VERSION_METADATA_PATTERN,
    VERSION_NUMBER,
)


def validate_registry_name_pattern_and_length(param_value: str) -> None:
    """Validate a registry name against the registry-name length limit and resource-name pattern."""
    validate_param_pattern_and_length(
        param_value,
        param_name="registryName",
        max_name_length=MAX_REGISTRY_NAME_LENGTH,
        pattern=RESOURCE_NAME_PATTERN,
    )


def validate_arn_pattern_and_length(param_value: str) -> None:
    """Validate an ARN against the ARN length limit and ARN pattern.

    NOTE(review): the reported param name is always "registryArn", even when
    this is used for a schema ARN (see ``validate_schema_id``).
    """
    validate_param_pattern_and_length(
        param_value,
        param_name="registryArn",
        max_name_length=MAX_ARN_LENGTH,
        pattern=ARN_PATTERN,
    )


def validate_description_pattern_and_length(param_value: str) -> None:
    """Validate a description against the description length limit and pattern."""
    validate_param_pattern_and_length(
        param_value,
        param_name="description",
        max_name_length=MAX_DESCRIPTION_LENGTH,
        pattern=DESCRIPTION_PATTERN,
    )


def validate_schema_name_pattern_and_length(param_value: str) -> None:
    """Validate a schema name against the schema-name length limit and resource-name pattern."""
    validate_param_pattern_and_length(
        param_value,
        param_name="schemaName",
        max_name_length=MAX_SCHEMA_NAME_LENGTH,
        pattern=RESOURCE_NAME_PATTERN,
    )


def validate_schema_version_metadata_key_pattern_and_length(param_value: str) -> None:
    """Validate a schema-version metadata key's length and allowed characters."""
    validate_param_pattern_and_length(
        param_value,
        param_name="key",
        max_name_length=MAX_SCHEMA_VERSION_METADATA_LENGTH,
        pattern=SCHEMA_VERSION_METADATA_PATTERN,
    )


def validate_schema_version_metadata_value_pattern_and_length(param_value: str) -> None:
    """Validate a schema-version metadata value's length and allowed characters."""
    validate_param_pattern_and_length(
        param_value,
        param_name="value",
        max_name_length=MAX_SCHEMA_VERSION_METADATA_LENGTH,
        pattern=SCHEMA_VERSION_METADATA_PATTERN,
    )


def validate_param_pattern_and_length(
    param_value: str, param_name: str, max_name_length: int, pattern: Pattern[str]
) -> None:
    """Check a parameter value's encoded length and character pattern.

    Args:
        param_value: The value to validate.
        param_name: Name reported in the raised exception.
        max_name_length: Maximum allowed length, measured in UTF-8 bytes.
        pattern: Regex the value must match.

    Raises:
        ResourceNameTooLongException: If the UTF-8 encoding exceeds
            ``max_name_length`` bytes.
        ParamValueContainsInvalidCharactersException: If ``re.match`` finds no
            match. ``re.match`` anchors only at the start, so any end anchor
            has to come from ``pattern`` itself.
    """
    if len(param_value.encode("utf-8")) > max_name_length:
        raise ResourceNameTooLongException(param_name)
    if re.match(pattern, param_value) is None:
        raise ParamValueContainsInvalidCharactersException(param_name)


def validate_schema_definition(schema_definition: str, data_format: str) -> None:
    """Validate a schema definition's length and, for AVRO/JSON, that it parses as JSON.

    Raises:
        ResourceNameTooLongException: If the definition is too long.
        InvalidSchemaDefinitionException: If an AVRO or JSON definition is not
            valid JSON. PROTOBUF definitions are only length-checked here.
    """
    validate_schema_definition_length(schema_definition)
    if data_format in ["AVRO", "JSON"]:
        try:
            json.loads(schema_definition)
        except ValueError as err:
            raise InvalidSchemaDefinitionException(data_format, err)


def validate_schema_definition_length(schema_definition: str) -> None:
    """Raise ResourceNameTooLongException if the definition exceeds the length limit.

    NOTE(review): this counts characters, whereas
    ``validate_param_pattern_and_length`` counts UTF-8 bytes — confirm which
    unit the limit is meant to use.
    """
    if len(schema_definition) > MAX_SCHEMA_DEFINITION_LENGTH:
        param_name = SCHEMA_DEFINITION
        raise ResourceNameTooLongException(param_name)


def validate_schema_version_id_pattern(schema_version_id: str) -> None:
    """Raise ParamValueContainsInvalidCharactersException if the ID does not match its pattern."""
    if re.match(SCHEMA_VERSION_ID_PATTERN, schema_version_id) is None:
        raise ParamValueContainsInvalidCharactersException(SCHEMA_VERSION_ID)


def validate_number_of_tags(tags: Dict[str, str]) -> None:
    """Raise InvalidNumberOfTagsException if more than MAX_TAGS_ALLOWED tags are given."""
    if len(tags) > MAX_TAGS_ALLOWED:
        raise InvalidNumberOfTagsException()


def validate_registry_id(
    registry_id: Dict[str, Any], registries: Dict[str, Any]
) -> str:
    """Resolve a RegistryId structure to a registry name.

    Args:
        registry_id: Dict that may carry either a registry name or a registry
            ARN (not both).
        registries: Existing registries, keyed by registry name.

    Returns:
        The resolved registry name. This is DEFAULT_REGISTRY_NAME when
        ``registry_id`` is empty or carries neither identifier. For an ARN,
        it is the last "/"-separated segment.

    Raises:
        InvalidRegistryIdBothParamsProvidedException: If both a name and an
            ARN are provided.
        RegistryNotFoundException: If the resolved (non-default) registry
            does not exist.
        ResourceNameTooLongException,
        ParamValueContainsInvalidCharactersException: From name/ARN
            validation.
    """
    if not registry_id:
        return DEFAULT_REGISTRY_NAME

    name_param = registry_id.get(REGISTRY_NAME)
    registry_arn = registry_id.get(REGISTRY_ARN)

    if name_param and registry_arn:
        raise InvalidRegistryIdBothParamsProvidedException()

    if name_param:
        registry_name = name_param
        validate_registry_name_pattern_and_length(registry_name)
    elif registry_arn:
        validate_arn_pattern_and_length(registry_arn)
        registry_name = registry_arn.split("/")[-1]
    else:
        # Neither identifier carries a value (e.g. {"RegistryName": ""}).
        # Previously this fell through with ``registry_name`` unbound and
        # raised UnboundLocalError. Treat it like an omitted RegistryId.
        return DEFAULT_REGISTRY_NAME

    # The default registry is accepted even if it is not in ``registries``.
    if registry_name != DEFAULT_REGISTRY_NAME and registry_name not in registries:
        if name_param:
            raise RegistryNotFoundException(
                resource="Registry",
                param_name=REGISTRY_NAME,
                param_value=registry_name,
            )
        # Only reachable via the ARN branch: report the ARN the caller sent.
        raise RegistryNotFoundException(
            resource="Registry",
            param_name=REGISTRY_ARN,
            param_value=registry_arn,
        )

    return registry_name


def validate_registry_params(
    registries: Any,
    registry_name: str,
    description: Optional[str] = None,
    tags: Optional[Dict[str, str]] = None,
) -> None:
    """Validate the parameters for creating a new registry.

    Raises:
        GeneralResourceNumberLimitExceededException: If ``registries`` already
            holds MAX_REGISTRIES_ALLOWED entries.
        GeneralGSRAlreadyExistsException: If ``registry_name`` already exists.
        Plus any name/description/tag validation exception.
    """
    validate_registry_name_pattern_and_length(registry_name)

    if description:
        validate_description_pattern_and_length(description)

    if tags:
        validate_number_of_tags(tags)

    if len(registries) >= MAX_REGISTRIES_ALLOWED:
        raise GeneralResourceNumberLimitExceededException(resource="registries")

    if registry_name in registries:
        raise GeneralGSRAlreadyExistsException(
            resource="Registry",
            param_name=REGISTRY_NAME,
            param_value=registry_name,
        )


def validate_schema_id(
    schema_id: Dict[str, str], registries: Dict[str, Any]
) -> Tuple[str, str, Optional[str]]:
    """Resolve a SchemaId structure to (registry_name, schema_name, schema_arn).

    A schema ARN may be given on its own. Its last two "/"-separated segments
    are taken as the schema name and registry name. Otherwise both a registry
    name and a schema name are required.

    Raises:
        InvalidSchemaIdBothParamsProvidedException: If an ARN is combined with
            a registry or schema name.
        InvalidSchemaIdNotProvidedException: If neither form is complete.
        SchemaNotFoundException: If the registry or schema does not exist.
    """
    schema_arn = schema_id.get(SCHEMA_ARN)
    registry_name = schema_id.get(REGISTRY_NAME)
    schema_name = schema_id.get(SCHEMA_NAME)
    if schema_arn:
        if registry_name or schema_name:
            raise InvalidSchemaIdBothParamsProvidedException()
        validate_arn_pattern_and_length(schema_arn)
        arn_components = schema_arn.split("/")
        schema_name = arn_components[-1]
        registry_name = arn_components[-2]

    else:
        if registry_name is None or schema_name is None:
            raise InvalidSchemaIdNotProvidedException()
        validate_registry_name_pattern_and_length(registry_name)
        validate_schema_name_pattern_and_length(schema_name)

    if (
        registry_name not in registries
        or schema_name not in registries[registry_name].schemas
    ):
        raise SchemaNotFoundException(schema_name, registry_name, schema_arn)

    return registry_name, schema_name, schema_arn


def validate_schema_params(
    registry: Any,
    schema_name: str,
    data_format: str,
    compatibility: str,
    schema_definition: str,
    num_schemas: int,
    description: Optional[str] = None,
    tags: Optional[Dict[str, str]] = None,
) -> None:
    """Validate the parameters for creating a new schema in ``registry``.

    Raises:
        InvalidDataFormatException: If ``data_format`` is not AVRO, JSON or
            PROTOBUF.
        InvalidCompatibilityException: If ``compatibility`` is not a
            recognised mode.
        GeneralResourceNumberLimitExceededException: If ``num_schemas`` has
            reached MAX_SCHEMAS_ALLOWED.
        GeneralGSRAlreadyExistsException: If the schema name already exists in
            ``registry.schemas``.
        Plus any name/description/tag/definition validation exception.
    """
    validate_schema_name_pattern_and_length(schema_name)

    if data_format not in ["AVRO", "JSON", "PROTOBUF"]:
        raise InvalidDataFormatException()

    if compatibility not in [
        "NONE",
        "DISABLED",
        "BACKWARD",
        "BACKWARD_ALL",
        "FORWARD",
        "FORWARD_ALL",
        "FULL",
        "FULL_ALL",
    ]:
        raise InvalidCompatibilityException()

    if description:
        validate_description_pattern_and_length(description)

    if tags:
        validate_number_of_tags(tags)

    validate_schema_definition(schema_definition, data_format)

    if num_schemas >= MAX_SCHEMAS_ALLOWED:
        raise GeneralResourceNumberLimitExceededException(resource="schemas")

    if schema_name in registry.schemas:
        raise GeneralGSRAlreadyExistsException(
            resource="Schema",
            param_name=SCHEMA_NAME,
            param_value=schema_name,
        )


def validate_register_schema_version_params(
    registry_name: str,
    schema_name: str,
    schema_arn: Optional[str],
    num_schema_versions: int,
    schema_definition: str,
    compatibility: str,
    data_format: str,
) -> None:
    """Validate the parameters for registering a new schema version.

    Raises:
        DisabledCompatibilityVersioningException: If the schema's
            compatibility is DISABLED.
        GeneralResourceNumberLimitExceededException: If
            ``num_schema_versions`` has reached MAX_SCHEMA_VERSIONS_ALLOWED.
        Plus any schema-definition validation exception.
    """
    if compatibility == "DISABLED":
        raise DisabledCompatibilityVersioningException(
            schema_name, registry_name, schema_arn
        )

    validate_schema_definition(schema_definition, data_format)

    if num_schema_versions >= MAX_SCHEMA_VERSIONS_ALLOWED:
        raise GeneralResourceNumberLimitExceededException(resource="schema versions")


def validate_schema_version_params(  # type: ignore[return]
    registries: Dict[str, Any],
    schema_id: Optional[Dict[str, Any]],
    schema_version_id: Optional[str],
    schema_version_number: Optional[Dict[str, Any]],
) -> Tuple[
    Optional[str],
    Optional[str],
    Optional[str],
    Optional[str],
    Optional[str],
    Optional[str],
]:
    """Validate how a schema version is identified.

    The version is identified either by ``schema_version_id`` alone, or by
    ``schema_id`` together with ``schema_version_number``.

    Returns:
        A 6-tuple (schema_version_id, registry_name, schema_name, schema_arn,
        version_number, latest_version). Fields belonging to the form that
        was not used are None.

    Raises:
        InvalidSchemaIdNotProvidedException: If nothing usable is provided, or
            a version number is given without a schema id.
        InvalidSchemaVersionIdProvidedWithOtherParamsException: If a version
            id is combined with the other identifiers.
        InvalidSchemaVersionNumberNotProvidedException: If a schema id is
            given without a version number.
    """
    if not schema_version_id and not schema_id and not schema_version_number:
        raise InvalidSchemaIdNotProvidedException()

    if schema_version_id and (schema_id or schema_version_number):
        raise InvalidSchemaVersionIdProvidedWithOtherParamsException()

    if schema_version_id:
        validate_schema_version_id_pattern(schema_version_id)

        # returns schema_version_id, registry_name, schema_name, schema_arn, version_number, latest_version
        return schema_version_id, None, None, None, None, None

    if schema_id and schema_version_number:
        registry_name, schema_name, schema_arn = validate_schema_id(
            schema_id, registries
        )
        version_number, latest_version = validate_schema_version_number(
            registries, registry_name, schema_name, schema_version_number
        )
        return (
            None,
            registry_name,
            schema_name,
            schema_arn,
            version_number,
            latest_version,
        )

    # Exactly one of schema_id / schema_version_number is present here, so
    # one of the two raises below always fires (hence the ignore[return]).
    if not schema_id:
        raise InvalidSchemaIdNotProvidedException()

    if not schema_version_number:
        raise InvalidSchemaVersionNumberNotProvidedException()


def validate_schema_version_number(
    registries: Dict[str, Any],
    registry_name: str,
    schema_name: str,
    schema_version_number: Dict[str, str],
) -> Tuple[str, str]:
    """Resolve a SchemaVersionNumber structure to (version_number, latest_version).

    If the latest-version flag is truthy, the schema's
    ``latest_schema_version`` is returned as the version number. Otherwise the
    supplied version number is returned unchanged.

    Raises:
        InvalidSchemaVersionNumberBothParamsProvidedException: If both the
            latest-version flag and a version number are set.
    """
    latest_version = schema_version_number.get(LATEST_VERSION)
    version_number = schema_version_number.get(VERSION_NUMBER)
    schema = registries[registry_name].schemas[schema_name]
    if latest_version:
        if version_number:
            raise InvalidSchemaVersionNumberBothParamsProvidedException()
        return schema.latest_schema_version, latest_version

    return version_number, latest_version  # type: ignore


def validate_schema_version_metadata_pattern_and_length(
    metadata_key_value: Dict[str, str],
) -> Tuple[str, str]:
    """Validate a metadata key/value pair and return it as (key, value)."""
    metadata_key = metadata_key_value.get(METADATA_KEY)
    metadata_value = metadata_key_value.get(METADATA_VALUE)

    validate_schema_version_metadata_key_pattern_and_length(metadata_key)  # type: ignore
    validate_schema_version_metadata_value_pattern_and_length(metadata_value)  # type: ignore

    return metadata_key, metadata_value  # type: ignore[return-value]


def validate_number_of_schema_version_metadata_allowed(
    metadata: Dict[str, Any],
) -> None:
    """Raise SchemaVersionMetadataLimitExceededException when the metadata limit is reached.

    The total counted is the sum of ``len()`` over the values of ``metadata``
    (presumably one collection of values per key — confirm against caller).
    The check is ``>=``, so it raises once the total reaches the limit. This
    looks intended as a pre-insert check.
    """
    num_metadata_key_value_pairs = 0
    for m in metadata.values():
        num_metadata_key_value_pairs += len(m)

    if num_metadata_key_value_pairs >= MAX_SCHEMA_VERSION_METADATA_ALLOWED:
        raise SchemaVersionMetadataLimitExceededException()


def get_schema_version_if_definition_exists(
    schema_versions: Any, data_format: str, schema_definition: str
) -> Optional[Dict[str, Any]]:
    """Return ``as_dict()`` of the first version whose definition matches, else None.

    AVRO and JSON definitions are compared as parsed JSON, so formatting
    differences are ignored. Other formats are compared as exact strings.
    Each item of ``schema_versions`` must expose ``schema_definition`` and
    ``as_dict()``.
    """
    if data_format in ["AVRO", "JSON"]:
        # Parse the candidate once, on the first iteration, instead of
        # re-parsing it for every stored version. Deferring the parse keeps
        # the original behavior of never parsing it when there are no stored
        # versions. A candidate that parses to JSON null is simply re-parsed.
        parsed_definition: Any = None
        for schema_version in schema_versions:
            if parsed_definition is None:
                parsed_definition = json.loads(schema_definition)
            if parsed_definition == json.loads(schema_version.schema_definition):
                return schema_version.as_dict()
    else:
        for schema_version in schema_versions:
            if schema_definition == schema_version.schema_definition:
                return schema_version.as_dict()
    return None


def get_put_schema_version_metadata_response(
    schema_id: Dict[str, Any],
    schema_version_number: Optional[Dict[str, str]],
    schema_version_id: str,
    metadata_key_value: Dict[str, str],
) -> Dict[str, Any]:
    """Build the response dict for a put-schema-version-metadata call.

    Echoes back whichever identifiers were supplied. If no version-number
    structure is given, the latest-version flag defaults to False and the
    version number to 0. The metadata key and value are always included.
    """
    put_schema_version_metadata_response_dict: Dict[str, Any] = {}
    if schema_version_id:
        put_schema_version_metadata_response_dict[SCHEMA_VERSION_ID] = schema_version_id
    if schema_id:
        schema_arn = schema_id.get(SCHEMA_ARN)
        registry_name = schema_id.get(REGISTRY_NAME)
        schema_name = schema_id.get(SCHEMA_NAME)
        if schema_arn:
            put_schema_version_metadata_response_dict[SCHEMA_ARN] = schema_arn
        if registry_name:
            put_schema_version_metadata_response_dict[REGISTRY_NAME] = registry_name
        if schema_name:
            put_schema_version_metadata_response_dict[SCHEMA_NAME] = schema_name

    if schema_version_number:
        latest_version = schema_version_number.get(LATEST_VERSION)
        version_number = schema_version_number.get(VERSION_NUMBER)
        if latest_version:
            put_schema_version_metadata_response_dict[LATEST_VERSION] = latest_version
        else:
            put_schema_version_metadata_response_dict[LATEST_VERSION] = False

        if version_number:
            put_schema_version_metadata_response_dict[VERSION_NUMBER] = version_number
    else:
        put_schema_version_metadata_response_dict[LATEST_VERSION] = False
        put_schema_version_metadata_response_dict[VERSION_NUMBER] = 0

    metadata_key = metadata_key_value.get(METADATA_KEY)
    metadata_value = metadata_key_value.get(METADATA_VALUE)
    put_schema_version_metadata_response_dict[METADATA_KEY] = metadata_key
    put_schema_version_metadata_response_dict[METADATA_VALUE] = metadata_value

    return put_schema_version_metadata_response_dict


def delete_schema_response(
    schema_name: str, schema_arn: str, status: str
) -> Dict[str, Any]:
    """Build the response dict for a delete-schema call."""
    return {
        "SchemaName": schema_name,
        "SchemaArn": schema_arn,
        "Status": status,
    }
Memory