import math
from abc import ABCMeta, abstractmethod
from collections import OrderedDict
from copy import deepcopy
from datetime import datetime
from typing import Any, Dict, List, Optional
from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.core.utils import camelcase_to_pascal, underscores_to_camelcase
from moto.utilities.paginator import paginate
from moto.utilities.utils import get_partition
from .exceptions import (
AlreadyExistsException,
ConflictException,
ResourceNotFoundException,
RulesetAlreadyExistsException,
RulesetNotFoundException,
ValidationException,
)
class DataBrewBackend(BaseBackend):
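    """In-memory backend for the AWS Glue DataBrew API.

    Recipes, rulesets, datasets and jobs are stored per account and region.
    PAGINATION_MODEL drives the @paginate decorator used by the list_* calls.
    """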
PAGINATION_MODEL = {
"list_recipes": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 100,
"unique_attribute": "name",
},
"list_recipe_versions": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 100,
"unique_attribute": "name",
},
"list_rulesets": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 100,
"unique_attribute": "name",
},
"list_datasets": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 100,
"unique_attribute": "name",
},
"list_jobs": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 100,
"unique_attribute": "name",
},
}
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.recipes: Dict[str, FakeRecipe] = OrderedDict()
self.rulesets: Dict[str, FakeRuleset] = OrderedDict()
self.datasets: Dict[str, FakeDataset] = OrderedDict()
self.jobs: Dict[str, FakeJob] = OrderedDict()
@staticmethod
def validate_length(param: str, param_name: str, max_length: int) -> None:
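        """Raise a ValidationException in the AWS error format when ``param``
        is longer than ``max_length`` characters.
        """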
if len(param) > max_length:
raise ValidationException(
f"1 validation error detected: Value '{param}' at '{param_name}' failed to "
f"satisfy constraint: Member must have length less than or equal to {max_length}"
)
def create_recipe(
self,
recipe_name: str,
recipe_description: str,
recipe_steps: List[Dict[str, Any]],
tags: Dict[str, str],
) -> "FakeRecipeVersion":
# https://docs.aws.amazon.com/databrew/latest/dg/API_CreateRecipe.html
if recipe_name in self.recipes:
raise ConflictException(f"The recipe {recipe_name} already exists")
recipe = FakeRecipe(
self.region_name, recipe_name, recipe_description, recipe_steps, tags
)
self.recipes[recipe_name] = recipe
return recipe.latest_working
def delete_recipe_version(self, recipe_name: str, recipe_version: str) -> None:
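        """Delete a single recipe version.

        The working version can only be deleted while nothing has been
        published yet, in which case the whole recipe is removed; otherwise
        only the requested published version is dropped.
        """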
if not FakeRecipe.version_is_valid(recipe_version, latest_published=False):
raise ValidationException(
f"Recipe {recipe_name} version {recipe_version} is invalid."
)
try:
recipe = self.recipes[recipe_name]
except KeyError:
raise ResourceNotFoundException(f"The recipe {recipe_name} wasn't found")
if (
recipe_version != FakeRecipe.LATEST_WORKING
and float(recipe_version) not in recipe.versions
):
raise ResourceNotFoundException(
f"The recipe {recipe_name} version {recipe_version} wasn't found."
)
if recipe_version in (
FakeRecipe.LATEST_WORKING,
str(recipe.latest_working.version),
):
if recipe.latest_published is not None:
                # The latest working version can only be deleted when no
                # published versions exist
raise ValidationException(
f"Recipe version {recipe_version} is not allowed to be deleted"
)
else:
del self.recipes[recipe_name]
else:
recipe.delete_published_version(recipe_version)
def update_recipe(
self,
recipe_name: str,
recipe_description: str,
recipe_steps: List[Dict[str, Any]],
) -> None:
if recipe_name not in self.recipes:
raise ResourceNotFoundException(f"The recipe {recipe_name} wasn't found")
recipe = self.recipes[recipe_name]
recipe.update(recipe_description, recipe_steps)
@paginate(pagination_model=PAGINATION_MODEL)
def list_recipes(
self, recipe_version: Optional[str] = None
) -> List["FakeRecipeVersion"]:
# https://docs.aws.amazon.com/databrew/latest/dg/API_ListRecipes.html
if recipe_version == FakeRecipe.LATEST_WORKING:
version = "latest_working"
elif recipe_version in (None, FakeRecipe.LATEST_PUBLISHED):
version = "latest_published"
else:
raise ValidationException(
f"Invalid version {recipe_version}. "
"Valid versions are LATEST_PUBLISHED and LATEST_WORKING."
)
        recipes = [getattr(recipe, version) for recipe in self.recipes.values()]
        return [r for r in recipes if r is not None]
@paginate(pagination_model=PAGINATION_MODEL)
def list_recipe_versions(self, recipe_name: str) -> List["FakeRecipeVersion"]:
# https://docs.aws.amazon.com/databrew/latest/dg/API_ListRecipeVersions.html
self.validate_length(recipe_name, "name", 255)
recipe = self.recipes.get(recipe_name)
if recipe is None:
return []
latest_working = recipe.latest_working
recipe_versions = [
recipe_version
for recipe_version in recipe.versions.values()
if recipe_version is not latest_working
]
return [r for r in recipe_versions if r is not None]
def describe_recipe(
self, recipe_name: str, recipe_version: Optional[str] = None
) -> "FakeRecipeVersion":
# https://docs.aws.amazon.com/databrew/latest/dg/API_DescribeRecipe.html
self.validate_length(recipe_name, "name", 255)
if recipe_version is None:
recipe_version = FakeRecipe.LATEST_PUBLISHED
else:
self.validate_length(recipe_version, "recipeVersion", 16)
if not FakeRecipe.version_is_valid(recipe_version):
raise ValidationException(
f"Recipe {recipe_name} version {recipe_version} isn't valid."
)
recipe = None
if recipe_name in self.recipes:
if recipe_version == FakeRecipe.LATEST_PUBLISHED:
recipe = self.recipes[recipe_name].latest_published
elif recipe_version == FakeRecipe.LATEST_WORKING:
recipe = self.recipes[recipe_name].latest_working
else:
recipe = self.recipes[recipe_name].versions.get(float(recipe_version))
if recipe is None:
raise ResourceNotFoundException(
f"The recipe {recipe_name} for version {recipe_version} wasn't found."
)
return recipe
def publish_recipe(
self, recipe_name: str, description: Optional[str] = None
) -> None:
# https://docs.aws.amazon.com/databrew/latest/dg/API_PublishRecipe.html
self.validate_length(recipe_name, "name", 255)
try:
self.recipes[recipe_name].publish(description)
except KeyError:
raise ResourceNotFoundException(f"Recipe {recipe_name} wasn't found")
def create_ruleset(
self,
ruleset_name: str,
ruleset_description: str,
ruleset_rules: List[Dict[str, Any]],
ruleset_target_arn: str,
tags: Dict[str, str],
) -> "FakeRuleset":
if ruleset_name in self.rulesets:
raise RulesetAlreadyExistsException()
ruleset = FakeRuleset(
self.region_name,
ruleset_name,
ruleset_description,
ruleset_rules,
ruleset_target_arn,
tags,
)
self.rulesets[ruleset_name] = ruleset
return ruleset
def update_ruleset(
self,
ruleset_name: str,
ruleset_description: str,
ruleset_rules: List[Dict[str, Any]],
tags: Dict[str, str],
) -> "FakeRuleset":
if ruleset_name not in self.rulesets:
raise RulesetNotFoundException(ruleset_name)
ruleset = self.rulesets[ruleset_name]
if ruleset_description is not None:
ruleset.description = ruleset_description
if ruleset_rules is not None:
ruleset.rules = ruleset_rules
if tags is not None:
ruleset.tags = tags
return ruleset
def describe_ruleset(self, ruleset_name: str) -> "FakeRuleset":
if ruleset_name not in self.rulesets:
raise RulesetNotFoundException(ruleset_name)
return self.rulesets[ruleset_name]
@paginate(pagination_model=PAGINATION_MODEL)
def list_rulesets(self) -> List["FakeRuleset"]:
return list(self.rulesets.values())
def delete_ruleset(self, ruleset_name: str) -> None:
if ruleset_name not in self.rulesets:
raise RulesetNotFoundException(ruleset_name)
del self.rulesets[ruleset_name]
def create_dataset(
self,
dataset_name: str,
dataset_format: str,
dataset_format_options: Dict[str, Any],
dataset_input: Dict[str, Any],
dataset_path_options: Dict[str, Any],
tags: Dict[str, str],
) -> "FakeDataset":
if dataset_name in self.datasets:
raise AlreadyExistsException(dataset_name)
dataset = FakeDataset(
self.region_name,
self.account_id,
dataset_name,
dataset_format,
dataset_format_options,
dataset_input,
dataset_path_options,
tags,
)
self.datasets[dataset_name] = dataset
return dataset
@paginate(pagination_model=PAGINATION_MODEL)
def list_datasets(self) -> List["FakeDataset"]:
return list(self.datasets.values())
def update_dataset(
self,
dataset_name: str,
dataset_format: str,
dataset_format_options: Dict[str, Any],
dataset_input: Dict[str, Any],
dataset_path_options: Dict[str, Any],
tags: Dict[str, str],
) -> "FakeDataset":
if dataset_name not in self.datasets:
raise ResourceNotFoundException("One or more resources can't be found.")
dataset = self.datasets[dataset_name]
if dataset_format is not None:
dataset.format = dataset_format
if dataset_format_options is not None:
dataset.format_options = dataset_format_options
if dataset_input is not None:
dataset.input = dataset_input
if dataset_path_options is not None:
dataset.path_options = dataset_path_options
if tags is not None:
dataset.tags = tags
return dataset
def delete_dataset(self, dataset_name: str) -> None:
if dataset_name not in self.datasets:
raise ResourceNotFoundException("One or more resources can't be found.")
del self.datasets[dataset_name]
def describe_dataset(self, dataset_name: str) -> "FakeDataset":
if dataset_name not in self.datasets:
raise ResourceNotFoundException("One or more resources can't be found.")
return self.datasets[dataset_name]
def describe_job(self, job_name: str) -> "FakeJob":
# https://docs.aws.amazon.com/databrew/latest/dg/API_DescribeJob.html
self.validate_length(job_name, "name", 240)
if job_name not in self.jobs:
raise ResourceNotFoundException(f"Job {job_name} wasn't found.")
return self.jobs[job_name]
def delete_job(self, job_name: str) -> None:
# https://docs.aws.amazon.com/databrew/latest/dg/API_DeleteJob.html
self.validate_length(job_name, "name", 240)
if job_name not in self.jobs:
raise ResourceNotFoundException(f"The job {job_name} wasn't found.")
del self.jobs[job_name]
def create_profile_job(self, **kwargs: Any) -> "FakeProfileJob":
# https://docs.aws.amazon.com/databrew/latest/dg/API_CreateProfileJob.html
job_name = kwargs["name"]
self.validate_length(job_name, "name", 240)
if job_name in self.jobs:
raise ConflictException(
f"The job {job_name} {self.jobs[job_name].job_type.lower()} job already exists."
)
job = FakeProfileJob(
account_id=self.account_id, region_name=self.region_name, **kwargs
)
self.jobs[job_name] = job
return job
def create_recipe_job(self, **kwargs: Any) -> "FakeRecipeJob":
# https://docs.aws.amazon.com/databrew/latest/dg/API_CreateRecipeJob.html
job_name = kwargs["name"]
self.validate_length(job_name, "name", 240)
if job_name in self.jobs:
raise ConflictException(
f"The job {job_name} {self.jobs[job_name].job_type.lower()} job already exists."
)
job = FakeRecipeJob(
account_id=self.account_id, region_name=self.region_name, **kwargs
)
self.jobs[job_name] = job
return job
def update_job(self, **kwargs: Any) -> "FakeJob":
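        """Shared implementation of UpdateProfileJob and UpdateRecipeJob:
        every supplied kwarg overwrites the matching attribute on the job.
        """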
job_name = kwargs["name"]
self.validate_length(job_name, "name", 240)
if job_name not in self.jobs:
raise ResourceNotFoundException(f"The job {job_name} wasn't found")
job = self.jobs[job_name]
for param, value in kwargs.items():
setattr(job, param, value)
return job
def update_recipe_job(self, **kwargs: Any) -> "FakeJob":
# https://docs.aws.amazon.com/databrew/latest/dg/API_UpdateRecipeJob.html
return self.update_job(**kwargs)
def update_profile_job(self, **kwargs: Any) -> "FakeJob":
# https://docs.aws.amazon.com/databrew/latest/dg/API_UpdateProfileJob.html
return self.update_job(**kwargs)
@paginate(pagination_model=PAGINATION_MODEL)
def list_jobs(
self, dataset_name: Optional[str] = None, project_name: Optional[str] = None
) -> List["FakeJob"]:
# https://docs.aws.amazon.com/databrew/latest/dg/API_ListJobs.html
if dataset_name is not None:
self.validate_length(dataset_name, "datasetName", 255)
if project_name is not None:
self.validate_length(project_name, "projectName", 255)
def filter_jobs(job: FakeJob) -> bool:
if dataset_name is not None and job.dataset_name != dataset_name:
return False
if (
project_name is not None
and getattr(job, "project_name", None) != project_name
):
return False
return True
return list(filter(filter_jobs, self.jobs.values()))
class FakeRecipe(BaseModel):
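    """A recipe together with all of its versions.

    ``versions`` maps a numeric version to a FakeRecipeVersion.  The working
    version starts at 0.1 and moves to <latest published version> + 0.1 each
    time the recipe is published.
    """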
INITIAL_VERSION = 0.1
LATEST_WORKING = "LATEST_WORKING"
LATEST_PUBLISHED = "LATEST_PUBLISHED"
@classmethod
def version_is_valid(
cls, version: str, latest_working: bool = True, latest_published: bool = True
) -> bool:
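        """Return True if ``version`` is numeric or one of the LATEST_*
        aliases permitted by the flags, and is 1-16 characters long.
        """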
        if len(version) < 1 or len(version) > 16:
            return False
        try:
            float(version)
        except ValueError:
            return (version == cls.LATEST_WORKING and latest_working) or (
                version == cls.LATEST_PUBLISHED and latest_published
            )
        return True
def __init__(
self,
region_name: str,
recipe_name: str,
recipe_description: str,
recipe_steps: List[Dict[str, Any]],
tags: Dict[str, str],
):
self.versions: Dict[float, FakeRecipeVersion] = OrderedDict()
self.versions[self.INITIAL_VERSION] = FakeRecipeVersion(
region_name,
recipe_name,
recipe_description,
recipe_steps,
tags,
version=self.INITIAL_VERSION,
)
self.latest_working = self.versions[self.INITIAL_VERSION]
self.latest_published: Optional[FakeRecipeVersion] = None
def publish(self, description: Optional[str] = None) -> None:
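        """Publish the working version (its number is rounded up to the next
        whole number) and register a fresh working copy at published + 0.1.
        """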
self.latest_published = self.latest_working
self.latest_working = deepcopy(self.latest_working)
self.latest_published.publish(description)
del self.versions[self.latest_working.version]
self.versions[self.latest_published.version] = self.latest_published
self.latest_working.version = self.latest_published.version + 0.1
if self.latest_published.published_date:
self.latest_working.created_time = self.latest_published.published_date
self.versions[self.latest_working.version] = self.latest_working
def update(
self, description: Optional[str], steps: Optional[List[Dict[str, Any]]]
) -> None:
if description is not None:
self.latest_working.description = description
if steps is not None:
self.latest_working.steps = steps
def delete_published_version(self, version: str) -> None:
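        """Delete a published (whole-number) version.

        If it was the latest published version, fall back to the closest
        older published version that still exists, or to None.
        """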
float_version = float(version)
assert float_version.is_integer()
if float_version == self.latest_published.version: # type: ignore[union-attr]
prev_version = float_version - 1.0
# Iterate back through the published versions until we find one that exists
while prev_version >= 1.0:
if prev_version in self.versions:
self.latest_published = self.versions[prev_version]
break
prev_version -= 1.0
else:
# Didn't find an earlier published version
self.latest_published = None
del self.versions[float_version]
class FakeRecipeVersion(BaseModel):
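    """A single version of a recipe; as_dict() renders it in the shape
    returned by the DataBrew API.
    """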
def __init__(
self,
region_name: str,
recipe_name: str,
recipe_description: str,
recipe_steps: List[Dict[str, Any]],
tags: Dict[str, str],
version: float,
):
self.region_name = region_name
self.name = recipe_name
self.description = recipe_description
self.steps = recipe_steps
self.created_time = datetime.now()
self.tags = tags
self.published_date: Optional[datetime] = None
self.version = version
def as_dict(self) -> Dict[str, Any]:
dict_recipe = {
"Name": self.name,
"Steps": self.steps,
"Description": self.description,
"CreateDate": f"{self.created_time.timestamp():.3f}",
"Tags": self.tags or dict(),
"RecipeVersion": str(self.version),
}
if self.published_date is not None:
dict_recipe["PublishedDate"] = f"{self.published_date.timestamp():.3f}"
return dict_recipe
def publish(self, description: Optional[str]) -> None:
self.version = float(math.ceil(self.version))
self.published_date = datetime.now()
if description is not None:
self.description = description
class FakeRuleset(BaseModel):
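    """A DataBrew data-quality ruleset attached to a target resource ARN."""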
def __init__(
self,
region_name: str,
ruleset_name: str,
ruleset_description: str,
ruleset_rules: List[Dict[str, Any]],
ruleset_target_arn: str,
tags: Dict[str, str],
):
self.region_name = region_name
self.name = ruleset_name
self.description = ruleset_description
self.rules = ruleset_rules
self.target_arn = ruleset_target_arn
self.created_time = datetime.now()
self.tags = tags
def as_dict(self) -> Dict[str, Any]:
return {
"Name": self.name,
"Rules": self.rules,
"Description": self.description,
"TargetArn": self.target_arn,
"CreateDate": f"{self.created_time.timestamp():.3f}",
"Tags": self.tags or dict(),
}
class FakeDataset(BaseModel):
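    """A DataBrew dataset definition (format, input and path options) with an
    ARN derived from the partition, region, account and name.
    """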
def __init__(
self,
region_name: str,
account_id: str,
dataset_name: str,
dataset_format: str,
dataset_format_options: Dict[str, Any],
dataset_input: Dict[str, Any],
dataset_path_options: Dict[str, Any],
tags: Dict[str, str],
):
self.region_name = region_name
self.account_id = account_id
self.name = dataset_name
self.format = dataset_format
self.format_options = dataset_format_options
self.input = dataset_input
self.path_options = dataset_path_options
self.created_time = datetime.now()
self.tags = tags
@property
def resource_arn(self) -> str:
return f"arn:{get_partition(self.region_name)}:databrew:{self.region_name}:{self.account_id}:dataset/{self.name}"
def as_dict(self) -> Dict[str, Any]:
return {
"Name": self.name,
"Format": self.format,
"FormatOptions": self.format_options,
"Input": self.input,
"PathOptions": self.path_options,
"CreateDate": f"{self.created_time.timestamp():.3f}",
"Tags": self.tags or dict(),
"ResourceArn": self.resource_arn,
}
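# BaseModel carries its own metaclass, so ABCMeta has to be combined with it
# to avoid a metaclass conflict when FakeJob declares abstract members.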
class BaseModelABCMeta(ABCMeta, type(BaseModel)): # type: ignore[misc]
pass
class FakeJob(BaseModel, metaclass=BaseModelABCMeta): # type: ignore[misc]
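    """Common state and validation shared by profile and recipe jobs.

    Subclasses list their extra constructor kwargs in ``local_attrs``; those
    are stored on the instance and folded into as_dict().
    """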
ENCRYPTION_MODES = ("SSE-S3", "SSE-KMS")
LOG_SUBSCRIPTION_VALUES = ("ENABLE", "DISABLE")
@property
@abstractmethod
def local_attrs(self) -> List[str]:
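        """Names of the subclass-specific kwargs stored on the instance and
        included in as_dict()."""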
raise NotImplementedError
def __init__(self, account_id: str, region_name: str, **kwargs: Any):
self.account_id = account_id
self.region_name = region_name
self.name = kwargs.get("name")
self.created_time = datetime.now()
self.dataset_name = kwargs.get("dataset_name")
self.encryption_mode = kwargs.get("encryption_mode")
self.log_subscription = kwargs.get("log_subscription")
self.max_capacity = kwargs.get("max_capacity")
self.max_retries = kwargs.get("max_retries")
self.role_arn = kwargs.get("role_arn")
self.tags = kwargs.get("tags")
self.validate()
# Set attributes specific to subclass
for k in self.local_attrs:
setattr(self, k, kwargs.get(k))
def validate(self) -> None:
if self.encryption_mode is not None:
if self.encryption_mode not in FakeJob.ENCRYPTION_MODES:
raise ValidationException(
f"1 validation error detected: Value '{self.encryption_mode}' at 'encryptionMode' failed to satisfy constraint: Member must satisfy enum value set: [{', '.join(self.ENCRYPTION_MODES)}]"
)
if self.log_subscription is not None:
if self.log_subscription not in FakeJob.LOG_SUBSCRIPTION_VALUES:
raise ValidationException(
f"1 validation error detected: Value '{self.log_subscription}' at 'logSubscription' failed to satisfy constraint: Member must satisfy enum value set: [{', '.join(self.LOG_SUBSCRIPTION_VALUES)}]"
)
@property
@abstractmethod
def job_type(self) -> str:
pass
@property
def resource_arn(self) -> str:
return f"arn:{get_partition(self.region_name)}:databrew:{self.region_name}:{self.account_id}:job/{self.name}"
def as_dict(self) -> Dict[str, Any]:
rtn_dict = {
"Name": self.name,
"AccountId": self.account_id,
"CreateDate": f"{self.created_time.timestamp():.3f}",
"DatasetName": self.dataset_name,
"EncryptionMode": self.encryption_mode,
"Tags": self.tags or dict(),
"LogSubscription": self.log_subscription,
"MaxCapacity": self.max_capacity,
"MaxRetries": self.max_retries,
"ResourceArn": self.resource_arn,
"RoleArn": self.role_arn,
"Type": self.job_type,
}
# Add in subclass attributes
for k in self.local_attrs:
rtn_dict[camelcase_to_pascal(underscores_to_camelcase(k))] = getattr(
self, k
)
# Remove items that have a value of None
rtn_dict = {k: v for k, v in rtn_dict.items() if v is not None}
return rtn_dict
class FakeProfileJob(FakeJob): # type: ignore[misc]
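    """A PROFILE job; adds output_location, configuration and
    validation_configurations."""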
job_type = "PROFILE"
local_attrs = ["output_location", "configuration", "validation_configurations"]
class FakeRecipeJob(FakeJob): # type: ignore[misc]
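    """A RECIPE job; adds outputs, database/data-catalog outputs, the owning
    project name and the recipe reference."""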
local_attrs = [
"database_outputs",
"data_catalog_outputs",
"outputs",
"project_name",
"recipe_reference",
]
job_type = "RECIPE"
databrew_backends = BackendDict(DataBrewBackend, "databrew")