from __future__ import annotations
import json
from collections import defaultdict
from enum import Enum
from typing import Any, Dict, List, Optional, Set, Tuple
from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.utilities.tagging_service import TaggingService
from .exceptions import AlreadyExists, EntityNotFound, InvalidInput
class RessourceType(Enum):
    """Resource categories that ``list_permissions`` accepts as a ``resource_type`` filter.

    The class name is spelled "Ressource"; it is referenced under that name
    elsewhere in this module, so the spelling is kept.
    """

    # Each value is the upper-case string form of its member name.
    catalog = "CATALOG"
    database = "DATABASE"
    table = "TABLE"
    data_location = "DATA_LOCATION"
class Resource(BaseModel):
    """A registered resource: a resource ARN together with a role ARN."""

    def __init__(self, arn: str, role_arn: str):
        self.arn, self.role_arn = arn, role_arn

    def to_dict(self) -> Dict[str, Any]:
        """Return the resource as a dict with the keys ``ResourceArn`` and ``RoleArn``."""
        return dict(ResourceArn=self.arn, RoleArn=self.role_arn)
class Permission:
    """The permissions one principal holds on one resource.

    Instances are hashable so they can be stored in a set. The hash is derived
    from all four attributes, so an instance must be taken out of any set
    before ``merge`` or ``diff`` mutates it, and re-added afterwards.
    """

    def __init__(
        self,
        principal: Dict[str, str],
        resource: Dict[str, Any],
        permissions: List[str],
        permissions_with_grant_options: List[str],
    ):
        self.principal = principal
        self.resource = resource
        self.permissions = permissions
        self.permissions_with_grant_options = permissions_with_grant_options

    def __eq__(self, other: Any) -> bool:
        """Two permissions are equal when all four attributes compare equal."""
        if not isinstance(other, Permission):
            return False
        return (
            self.principal == other.principal
            and self.resource == other.resource
            and self.permissions == other.permissions
            and self.permissions_with_grant_options
            == other.permissions_with_grant_options
        )

    def __hash__(self) -> int:
        # Dict equality ignores key order, so the serialised form must too.
        # Without sort_keys two equal permissions could hash differently.
        parts = (
            self.principal,
            self.resource,
            self.permissions,
            self.permissions_with_grant_options,
        )
        return hash(tuple(json.dumps(part, sort_keys=True) for part in parts))

    def equal_principal_and_resouce(self, other: Permission) -> bool:
        """Return True when ``other`` targets the same principal and resource."""
        return (self.principal == other.principal) and (self.resource == other.resource)

    def merge(self, other: Permission) -> None:
        """Add the permissions of ``other`` to this one, without duplicates.

        The resulting lists are sorted so the outcome does not depend on set
        iteration order.
        """
        self.permissions = sorted(set(self.permissions).union(other.permissions))
        self.permissions_with_grant_options = sorted(
            set(self.permissions_with_grant_options).union(
                other.permissions_with_grant_options
            )
        )

    def diff(self, other: Permission) -> None:
        """Remove the permissions of ``other`` from this one.

        The resulting lists are sorted so the outcome does not depend on set
        iteration order.
        """
        if self.permissions is not None:
            self.permissions = sorted(
                set(self.permissions).difference(other.permissions)
            )
        if self.permissions_with_grant_options is not None:
            self.permissions_with_grant_options = sorted(
                set(self.permissions_with_grant_options).difference(
                    other.permissions_with_grant_options
                )
            )

    def is_empty(self) -> bool:
        """Return True when neither permission list has any entries left."""
        return (
            len(self.permissions) == 0 and len(self.permissions_with_grant_options) == 0
        )

    def to_external_form(self) -> Dict[str, Any]:
        """Return the permission as a dict keyed the way it is reported to callers."""
        return {
            "Permissions": self.permissions,
            "PermissionsWithGrantOption": self.permissions_with_grant_options,
            "Resource": self.resource,
            "Principal": self.principal,
        }
class PermissionCatalog:
    """The set of permissions of one catalog, with at most one entry per principal/resource pair."""

    def __init__(self) -> None:
        self.permissions: Set[Permission] = set()

    def _find_same_target(self, permission: Permission) -> Optional[Permission]:
        """Return the stored permission with the same principal and resource, if any."""
        for existing_permission in self.permissions:
            if permission.equal_principal_and_resouce(existing_permission):
                return existing_permission
        return None

    def add_permission(self, permission: Permission) -> None:
        """Store ``permission``, merging it into an existing entry for the same principal and resource."""
        existing_permission = self._find_same_target(permission)
        if existing_permission is None:
            # found no match
            self.permissions.add(permission)
            return
        # Permission with same principal and resource, only one of these can exist.
        # The hash depends on the permission lists, so the entry has to leave the
        # set before it is mutated and be re-added afterwards; mutating it in
        # place would leave a stale entry that can no longer be found or removed.
        self.permissions.remove(existing_permission)
        existing_permission.merge(permission)
        self.permissions.add(existing_permission)

    def remove_permission(self, permission: Permission) -> None:
        """Revoke the permissions in ``permission`` from the matching entry.

        Does nothing when no entry has the same principal and resource. An
        entry that ends up without any permissions is dropped from the catalog.
        """
        existing_permission = self._find_same_target(permission)
        if existing_permission is None:
            return
        # remove and re-add to recalculate the hash value after the diff
        self.permissions.remove(existing_permission)
        existing_permission.diff(permission)
        if not existing_permission.is_empty():
            self.permissions.add(existing_permission)
class ListPermissionsResourceDatabase:
    """Database selector for a permission listing: a database name plus an optional catalog id."""

    def __init__(self, catalog_id: Optional[str], name: str):
        self.name, self.catalog_id = name, catalog_id
class ListPermissionsResourceTable:
    """Table selector for a permission listing.

    Exactly one of ``name`` and ``table_wildcard`` has to be supplied;
    ``InvalidInput`` is raised when both or neither are given.
    """

    def __init__(
        self,
        catalog_id: Optional[str],
        database_name: str,
        name: Optional[str],
        table_wildcard: Optional[
            Dict[str, str]
        ],  # Placeholder type, table_wildcard is an empty dict in docs
    ):
        has_name = name is not None
        has_wildcard = table_wildcard is not None
        if not (has_name or has_wildcard):
            raise InvalidInput("Table name and table wildcard cannot both be empty.")
        if has_name and has_wildcard:
            raise InvalidInput("Table name and table wildcard cannot both be present.")
        self.database_name, self.name = database_name, name
        self.catalog_id, self.table_wildcard = catalog_id, table_wildcard
class ExcludedColumnNames:
    """Holder for a list of column names that are excluded."""

    def __init__(self, excluded_column_names: List[str]):
        # The list is stored as given, not copied.
        self.excluded_column_names = excluded_column_names
class ListPermissionsResourceTableWithColumns:
    """Selector for a table together with a set of its columns."""

    def __init__(
        self,
        catalog_id: Optional[str],
        database_name: str,
        name: str,
        column_names: List[str],
        column_wildcard: ExcludedColumnNames,
    ):
        # Which table.
        self.database_name, self.name, self.catalog_id = database_name, name, catalog_id
        # Which columns: an explicit list and/or a wildcard object.
        self.column_names, self.column_wildcard = column_names, column_wildcard
class ListPermissionsResourceDataLocation:
    """Data-location selector for a permission listing: a resource ARN plus an optional catalog id."""

    def __init__(self, catalog_id: Optional[str], resource_arn: str):
        self.catalog_id, self.resource_arn = catalog_id, resource_arn
class ListPermissionsResourceDataCellsFilter:
    """Selector for a named data cells filter on a table."""

    def __init__(
        self, table_catalog_id: str, database_name: str, table_name: str, name: str
    ):
        # The table the filter belongs to.
        self.table_catalog_id, self.database_name = table_catalog_id, database_name
        self.table_name = table_name
        # The name of the filter itself.
        self.name = name
class ListPermissionsResourceLFTag:
    """Selector for an LF-tag: a tag key and its values within a catalog."""

    def __init__(self, catalog_id: str, tag_key: str, tag_values: List[str]):
        self.catalog_id, self.tag_key, self.tag_values = catalog_id, tag_key, tag_values
class LFTag:
    """A tag key paired with a list of tag values."""

    def __init__(self, tag_key: str, tag_values: List[str]):
        self.tag_key, self.tag_values = tag_key, tag_values
class ListPermissionsResourceLFTagPolicy:
    """Selector for an LF-tag policy: a resource type and a list of ``LFTag`` expressions."""

    def __init__(self, catalog_id: str, resource_type: str, expression: List[LFTag]):
        self.catalog_id, self.resource_type = catalog_id, resource_type
        self.expression = expression
class ListPermissionsResource:
    """Resource filter for a permission listing.

    At least one of ``catalog``, ``database``, ``table`` or ``data_location``
    has to be supplied, otherwise ``InvalidInput`` is raised. The remaining
    selectors are stored but do not satisfy that requirement.
    """

    def __init__(
        self,
        catalog: Optional[
            Dict[str, str]
        ],  # Placeholder type, catalog is an empty dict in docs
        database: Optional[ListPermissionsResourceDatabase],
        table: Optional[ListPermissionsResourceTable],
        table_with_columns: Optional[ListPermissionsResourceTableWithColumns],
        data_location: Optional[ListPermissionsResourceDataLocation],
        data_cells_filter: Optional[ListPermissionsResourceDataCellsFilter],
        lf_tag: Optional[ListPermissionsResourceLFTag],
        lf_tag_policy: Optional[ListPermissionsResourceLFTagPolicy],
    ):
        required_one_of = (catalog, database, table, data_location)
        if all(selector is None for selector in required_one_of):
            # Error message is the exact string returned by the AWS-CLI even though it is valid
            # to not populate the respective fields as long as data_location is given.
            raise InvalidInput(
                "Resource must have either the catalog, table or database field populated."
            )
        self.catalog, self.database, self.table = catalog, database, table
        self.table_with_columns = table_with_columns
        self.data_location = data_location
        self.data_cells_filter = data_cells_filter
        self.lf_tag, self.lf_tag_policy = lf_tag, lf_tag_policy
def default_settings() -> Dict[str, Any]:
    """Build a fresh data lake settings dict holding the default values.

    Every call returns newly created containers, so the result can be mutated
    without affecting other callers.
    """

    def all_for_iam_allowed_principals() -> List[Dict[str, Any]]:
        # A new list per call, so the two default-permission entries never share state.
        return [
            {
                "Principal": {"DataLakePrincipalIdentifier": "IAM_ALLOWED_PRINCIPALS"},
                "Permissions": ["ALL"],
            }
        ]

    settings: Dict[str, Any] = {}
    settings["DataLakeAdmins"] = []
    settings["CreateDatabaseDefaultPermissions"] = all_for_iam_allowed_principals()
    settings["CreateTableDefaultPermissions"] = all_for_iam_allowed_principals()
    settings["TrustedResourceOwners"] = []
    settings["AllowExternalDataFiltering"] = False
    settings["ExternalDataFilteringAllowList"] = []
    return settings
class LakeFormationBackend(BaseBackend):
def __init__(self, region_name: str, account_id: str):
    """Create an empty backend for the given region and account."""
    super().__init__(region_name, account_id)
    # Registered resources, keyed by resource ARN.
    self.resources: Dict[str, Resource] = {}
    # Data lake settings per catalog id; unseen catalogs get ``default_settings()``.
    self.settings: Dict[str, Dict[str, Any]] = defaultdict(default_settings)
    # Granted permissions per catalog id.
    self.grants: Dict[str, PermissionCatalog] = {}
    # Holds the LF-tag definitions (key -> values) of each catalog.
    self.tagger = TaggingService()
    # LF-tags attached to resources, keyed by (catalog id, database name),
    # (catalog id, database name, table name) and
    # (catalog id, database name, table name, column name) respectively.
    self.lf_database_tags: Dict[Tuple[str, str], List[Dict[str, str]]] = {}
    self.lf_table_tags: Dict[Tuple[str, str, str], List[Dict[str, str]]] = {}
    self.lf_columns_tags: Dict[Tuple[str, ...], List[Dict[str, str]]] = {}
def describe_resource(self, resource_arn: str) -> Resource:
    """Return the resource registered under ``resource_arn``.

    Raises ``EntityNotFound`` when no such resource is registered.
    """
    if resource_arn in self.resources:
        return self.resources[resource_arn]
    raise EntityNotFound
def deregister_resource(self, resource_arn: str) -> None:
    """Forget the resource registered under ``resource_arn``.

    Raises ``EntityNotFound`` when no such resource is registered.
    """
    if resource_arn in self.resources:
        del self.resources[resource_arn]
        return
    raise EntityNotFound
def register_resource(self, resource_arn: str, role_arn: str) -> None:
    """Register a new ``Resource`` under ``resource_arn``.

    Raises ``AlreadyExists`` when the ARN has been registered before.
    """
    already_registered = resource_arn in self.resources
    if already_registered:
        raise AlreadyExists(
            "An error occurred (AlreadyExistsException) when calling the RegisterResource operation: Resource is already registered"
        )
    new_resource = Resource(resource_arn, role_arn)
    self.resources[resource_arn] = new_resource
def list_resources(self) -> List[Resource]:
    """Return all registered resources as a new list."""
    return [*self.resources.values()]
def get_data_lake_settings(self, catalog_id: str) -> Dict[str, Any]:
    """Return the data lake settings stored for ``catalog_id``.

    The lookup is a plain item access on ``self.settings``.
    """
    catalog_settings = self.settings[catalog_id]
    return catalog_settings
def put_data_lake_settings(self, catalog_id: str, settings: Dict[str, Any]) -> None:
    """Replace the settings of ``catalog_id`` with ``settings``.

    The given dict is stored as is, without copying or merging.
    """
    self.settings[catalog_id] = settings
def grant_permissions(
    self,
    catalog_id: str,
    principal: Dict[str, str],
    resource: Dict[str, Any],
    permissions: List[str],
    permissions_with_grant_options: List[str],
) -> None:
    """Grant permissions on ``resource`` to ``principal`` within ``catalog_id``.

    The catalog's ``PermissionCatalog`` is created on first use. Falsy
    permission arguments (e.g. ``None``) are treated as empty lists.
    """
    granted = Permission(
        principal=principal,
        resource=resource,
        permissions=permissions if permissions else [],
        permissions_with_grant_options=(
            permissions_with_grant_options if permissions_with_grant_options else []
        ),
    )
    if catalog_id not in self.grants:
        self.grants[catalog_id] = PermissionCatalog()
    self.grants[catalog_id].add_permission(granted)
def revoke_permissions(
    self,
    catalog_id: str,
    principal: Dict[str, str],
    resource: Dict[str, Any],
    permissions_to_revoke: List[str],
    permissions_with_grant_options_to_revoke: List[str],
) -> None:
    """Revoke permissions on ``resource`` from ``principal`` within ``catalog_id``.

    Does nothing when nothing was ever granted in that catalog. Falsy
    permission arguments (e.g. ``None``) are treated as empty lists.
    """
    if catalog_id in self.grants:
        revoked = Permission(
            principal=principal,
            resource=resource,
            permissions=permissions_to_revoke if permissions_to_revoke else [],
            permissions_with_grant_options=(
                permissions_with_grant_options_to_revoke
                if permissions_with_grant_options_to_revoke
                else []
            ),
        )
        self.grants[catalog_id].remove_permission(revoked)
def list_permissions(
    self,
    catalog_id: str,
    principal: Optional[Dict[str, str]] = None,
    resource: Optional[ListPermissionsResource] = None,
    resource_type: Optional[RessourceType] = None,
) -> List[Dict[str, Any]]:
    """
    No pagination has been implemented yet.

    Return the external form of every permission granted in ``catalog_id``
    that passes all of the supplied filters. A filter that is ``None`` is not
    applied. An unknown catalog yields an empty list.
    """
    if catalog_id not in self.grants:
        return []
    permissions = list(self.grants[catalog_id].permissions)

    if principal is not None:
        permissions = [
            permission
            for permission in permissions
            if permission.principal == principal
        ]

    if resource_type is not None:
        # Keys of a granted resource dict that count as the requested type.
        wanted_keys = {
            RessourceType.catalog: ("Catalog",),
            RessourceType.database: ("Database",),
            RessourceType.data_location: ("DataLocation",),
            RessourceType.table: ("Table", "TableWithColumns"),
        }.get(resource_type, ())
        permissions = [
            permission
            for permission in permissions
            if any(key in permission.resource for key in wanted_keys)
        ]

    def matches_resource(permission: Permission) -> bool:
        """
        If catalog is provided:
            only matching permissions with resource-type "Catalog" are returned;
        if catalog is not provided and database is provided:
            only matching permissions with resource-type "Database" are returned;
        if catalog and database are not provided and table is provided:
            only matching permissions with resource-type "Table" are returned;
        if catalog and database and table are not provided and data location is provided:
            only matching permissions with resource-type "DataLocation" are returned;
        """
        if resource is None:  # Check for linter
            return False
        granted = permission.resource

        if resource.catalog is not None and "Catalog" in granted:
            return resource.catalog == granted["Catalog"]

        database = resource.database
        if database is not None and "Database" in granted:
            granted_database = granted["Database"]
            if database.name != granted_database.get("Name"):
                return False
            return (
                database.catalog_id is None
                or database.catalog_id == granted_database.get("CatalogId")
            )

        table = resource.table
        if table is not None and "Table" in granted:
            granted_table = granted["Table"]
            if table.database_name != granted_table.get("DatabaseName"):
                return False
            if (
                table.catalog_id is not None
                and table.catalog_id != granted_table.get("CatalogId")
            ):
                return False
            # A grant carries either "Name" or "TableWildcard", so the key the
            # query asks for may be absent; .get turns that into "no match"
            # instead of a KeyError.
            if table.name is not None and table.table_wildcard is None:
                return table.name == granted_table.get("Name")
            if table.name is None and table.table_wildcard is not None:
                return table.table_wildcard == granted_table.get("TableWildcard")
            return True

        data_location = resource.data_location
        if data_location is not None and "DataLocation" in granted:
            granted_location = granted["DataLocation"]
            if data_location.resource_arn != granted_location.get("ResourceArn"):
                return False
            return (
                data_location.catalog_id is None
                or data_location.catalog_id == granted_location.get("CatalogId")
            )

        return False

    if resource is not None:
        permissions = [
            permission for permission in permissions if matches_resource(permission)
        ]

    return [permission.to_external_form() for permission in permissions]
def create_lf_tag(self, catalog_id: str, key: str, values: List[str]) -> None:
    """Record the LF-tag ``key`` with ``values`` for ``catalog_id`` in the tagger."""
    # There is no ARN that we can use, so just create another unique identifier that's easy to recognize and reproduce
    self.tagger.tag_resource(
        arn=f"arn:lakeformation:{catalog_id}",
        tags=TaggingService.convert_dict_to_tags_input({key: values}),  # type: ignore
    )
def get_lf_tag(self, catalog_id: str, key: str) -> List[str]:
    """Return what the tagger stores for LF-tag ``key`` in ``catalog_id``, or ``[]`` when the key is unknown."""
    # There is no ARN that we can use, so just create another unique identifier that's easy to recognize and reproduce
    tags_by_key = self.tagger.get_tag_dict_for_resource(
        arn=f"arn:lakeformation:{catalog_id}"
    )
    return tags_by_key[key] if key in tags_by_key else []  # type: ignore
def delete_lf_tag(self, catalog_id: str, key: str) -> None:
    """Delete the LF-tag ``key`` of ``catalog_id`` and detach it from every tagged resource."""
    # There is no ARN that we can use, so just create another unique identifier that's easy to recognize and reproduce
    self.tagger.untag_resource_using_names(
        f"arn:lakeformation:{catalog_id}", tag_names=[key]
    )
    # Also remove any LF resource tags that used this tag-key
    tag_stores: List[Dict[Any, List[Dict[str, str]]]] = [
        self.lf_database_tags,
        self.lf_table_tags,
        self.lf_columns_tags,
    ]
    for tag_store in tag_stores:
        # Only values of existing keys are replaced, so iterating while assigning is safe.
        for resource_key in tag_store:
            tag_store[resource_key] = list(
                filter(lambda tag: tag["TagKey"] != key, tag_store[resource_key])
            )
def list_lf_tags(self, catalog_id: str) -> Dict[str, str]:
    """Return the tag dict that the tagger holds for ``catalog_id``."""
    # There is no ARN that we can use, so just create another unique identifier that's easy to recognize and reproduce
    catalog_arn = f"arn:lakeformation:{catalog_id}"
    catalog_tags = self.tagger.get_tag_dict_for_resource(arn=catalog_arn)
    return catalog_tags
def update_lf_tag(
    self, catalog_id: str, tag_key: str, to_delete: List[str], to_add: List[str]
) -> None:
    """Add and remove values of the existing LF-tag ``tag_key`` in ``catalog_id``.

    Values in ``to_add`` are appended first, then each value in ``to_delete``
    is removed; the resulting tags are written back through the tagger.
    Falsy arguments (e.g. ``None``) are treated as empty lists.
    """
    current_tags = self.list_lf_tags(catalog_id)
    values = current_tags[tag_key]
    values.extend(to_add if to_add else [])  # type: ignore
    for obsolete in to_delete if to_delete else []:
        values.remove(obsolete)  # type: ignore
    self.tagger.tag_resource(
        f"arn:lakeformation:{catalog_id}",
        TaggingService.convert_dict_to_tags_input(current_tags),
    )
def list_data_cells_filter(self) -> List[Dict[str, Any]]:
    """
    This currently just returns an empty list, as the corresponding Create is not yet implemented
    """
    no_filters: List[Dict[str, Any]] = []
    return no_filters
def batch_grant_permissions(
    self, catalog_id: str, entries: List[Dict[str, Any]]
) -> None:
    """Grant the permissions of every entry in ``entries`` within ``catalog_id``.

    Each entry is forwarded to ``grant_permissions``. Grant options are read
    from ``PermissionsWithGrantOption``, the spelling used by the Lake
    Formation API and by ``Permission.to_external_form``. The previously read
    key ``PermissionsWithGrantOptions`` is still honoured as a fallback.
    """
    for entry in entries:
        grant_options = entry.get(
            "PermissionsWithGrantOption", entry.get("PermissionsWithGrantOptions")
        )
        self.grant_permissions(
            catalog_id=catalog_id,
            principal=entry.get("Principal"),  # type: ignore[arg-type]
            resource=entry.get("Resource"),  # type: ignore[arg-type]
            permissions=entry.get("Permissions"),  # type: ignore[arg-type]
            permissions_with_grant_options=grant_options,  # type: ignore[arg-type]
        )
def batch_revoke_permissions(
    self, catalog_id: str, entries: List[Dict[str, Any]]
) -> None:
    """Revoke the permissions of every entry in ``entries`` within ``catalog_id``.

    Each entry is forwarded to ``revoke_permissions``. Grant options are read
    from ``PermissionsWithGrantOption``, the spelling used by the Lake
    Formation API and by ``Permission.to_external_form``. The previously read
    key ``PermissionsWithGrantOptions`` is still honoured as a fallback.
    """
    for entry in entries:
        grant_options = entry.get(
            "PermissionsWithGrantOption", entry.get("PermissionsWithGrantOptions")
        )
        self.revoke_permissions(
            catalog_id=catalog_id,
            principal=entry.get("Principal"),  # type: ignore[arg-type]
            resource=entry.get("Resource"),  # type: ignore[arg-type]
            permissions_to_revoke=entry.get("Permissions"),  # type: ignore[arg-type]
            permissions_with_grant_options_to_revoke=grant_options,  # type: ignore[arg-type]
        )
def add_lf_tags_to_resource(
    self, catalog_id: str, resource: Dict[str, Any], tags: List[Dict[str, str]]
) -> List[Dict[str, Any]]:
    """Attach ``tags`` to the database, table and/or columns named in ``resource``.

    Tags without a ``CatalogId`` get ``catalog_id`` filled in (the passed-in
    dicts are updated in place). If any tag key is not a known LF-tag of the
    catalog, nothing is attached and one failure record per unknown tag is
    returned. Otherwise the tags replace whatever was stored for each
    addressed resource and an empty list is returned.

    Every addressed resource receives its own copy of the tag list, so a
    later change to the tags of one resource or column cannot leak into
    another.
    """
    known_lf_tags = self.list_lf_tags(catalog_id)
    failures = []

    for tag in tags:
        if "CatalogId" not in tag:
            tag["CatalogId"] = catalog_id
        if tag["TagKey"] not in known_lf_tags:
            failures.append(
                {
                    "LFTag": tag,
                    "Error": {
                        "ErrorCode": "EntityNotFoundException",
                        "ErrorMessage": "Tag or tag value does not exist.",
                    },
                }
            )

    if failures:
        return failures

    def fresh_copy() -> List[Dict[str, str]]:
        # New list and new dicts per storage slot; sharing one list between
        # slots would make in-place removals affect all of them.
        return [dict(tag) for tag in tags]

    if "Database" in resource:
        db_catalog_id = resource["Database"].get("CatalogId", self.account_id)
        db_name = resource["Database"]["Name"]
        self.lf_database_tags[(db_catalog_id, db_name)] = fresh_copy()
    if "Table" in resource:
        db_catalog_id = resource["Table"].get("CatalogId", self.account_id)
        db_name = resource["Table"]["DatabaseName"]
        name = resource["Table"]["Name"]
        self.lf_table_tags[(db_catalog_id, db_name, name)] = fresh_copy()
    if "TableWithColumns" in resource:
        db_catalog_id = resource["TableWithColumns"].get(
            "CatalogId", self.account_id
        )
        db_name = resource["TableWithColumns"]["DatabaseName"]
        name = resource["TableWithColumns"]["Name"]
        for column in resource["TableWithColumns"]["ColumnNames"]:
            self.lf_columns_tags[(db_catalog_id, db_name, name, column)] = (
                fresh_copy()
            )
    return failures
def get_resource_lf_tags(
    self,
    catalog_id: str,  # pylint: disable=unused-argument
    resource: Dict[str, Any],
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Return the LF-tags attached to the resources named in ``resource``.

    The result is ``(database_tags, table_tags, column_tags)``. A database or
    table that has no tags yields an empty list rather than an error.
    ``column_tags`` holds one ``{"Name": ..., "LFTags": ...}`` entry per
    requested column that has at least one tag.

    A missing ``CatalogId`` in ``resource`` defaults to the backend's account id.
    """
    database_tags: List[Dict[str, Any]] = []
    table_tags: List[Dict[str, Any]] = []
    column_tags: List[Dict[str, Any]] = []

    if "Database" in resource:
        database = resource["Database"]
        database_key = (database.get("CatalogId", self.account_id), database["Name"])
        database_tags = self.lf_database_tags.get(database_key, [])
    if "Table" in resource:
        table = resource["Table"]
        table_key = (
            table.get("CatalogId", self.account_id),
            table["DatabaseName"],
            table["Name"],
        )
        table_tags = self.lf_table_tags.get(table_key, [])
    if "TableWithColumns" in resource:
        with_columns = resource["TableWithColumns"]
        table_part = (
            with_columns.get("CatalogId", self.account_id),
            with_columns["DatabaseName"],
            with_columns["Name"],
        )
        for column in with_columns["ColumnNames"]:
            tags_of_column = self.lf_columns_tags.get(table_part + (column,))
            # Columns without (or with only emptied) tag lists are left out.
            if tags_of_column:
                column_tags.append({"Name": column, "LFTags": tags_of_column})
    return database_tags, table_tags, column_tags
def remove_lf_tags_from_resource(
    self, catalog_id: str, resource: Dict[str, Any], tags: List[Dict[str, str]]
) -> None:
    """Detach ``tags`` from the database, table and/or columns named in ``resource``.

    Tags without a ``CatalogId`` get ``catalog_id`` filled in (the passed-in
    dicts are updated in place) before they are compared with the stored
    tags. Removing a tag that is not attached, or addressing a resource that
    has no tags at all, is a no-op.

    A missing ``CatalogId`` in ``resource`` defaults to the backend's account id.
    """
    for tag in tags:
        if "CatalogId" not in tag:
            tag["CatalogId"] = catalog_id

    def detach(store: Dict[Any, List[Dict[str, str]]], key: Tuple[str, ...]) -> None:
        attached = store.get(key)
        if attached is None:
            return
        # Store a new list instead of mutating the old one, so a list object
        # that happens to be shared with another resource is left untouched.
        store[key] = [tag for tag in attached if tag not in tags]

    if "Database" in resource:
        database = resource["Database"]
        detach(
            self.lf_database_tags,
            (database.get("CatalogId", self.account_id), database["Name"]),
        )
    if "Table" in resource:
        table = resource["Table"]
        detach(
            self.lf_table_tags,
            (
                table.get("CatalogId", self.account_id),
                table["DatabaseName"],
                table["Name"],
            ),
        )
    if "TableWithColumns" in resource:
        with_columns = resource["TableWithColumns"]
        for column in with_columns["ColumnNames"]:
            detach(
                self.lf_columns_tags,
                (
                    with_columns.get("CatalogId", self.account_id),
                    with_columns["DatabaseName"],
                    with_columns["Name"],
                    column,
                ),
            )
# Module-level BackendDict wrapping LakeFormationBackend under the name "lakeformation".
# NOTE(review): presumably hands out one backend per account id and region, matching the
# constructor arguments of LakeFormationBackend — confirm against BackendDict.
lakeformation_backends = BackendDict(LakeFormationBackend, "lakeformation")