# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from typing import Any, cast, Dict, Optional, Tuple, Union, TYPE_CHECKING
try:
from urllib.parse import quote
except ImportError:
from urllib2 import quote # type: ignore
from azure.core import MatchConditions
from ._generated.models import (
ArrowConfiguration,
BlobTag,
BlobTags,
ContainerCpkScopeInfo,
CpkScopeInfo,
DelimitedTextConfiguration,
JsonTextConfiguration,
LeaseAccessConditions,
ModifiedAccessConditions,
QueryFormat,
QueryFormatType,
QuerySerialization,
SourceModifiedAccessConditions
)
from ._models import ContainerEncryptionScope, DelimitedJsonDialect
if TYPE_CHECKING:
from ._lease import BlobLeaseClient
_SUPPORTED_API_VERSIONS = [
'2019-02-02',
'2019-07-07',
'2019-10-10',
'2019-12-12',
'2020-02-10',
'2020-04-08',
'2020-06-12',
'2020-08-04',
'2020-10-02',
'2020-12-06',
'2021-02-12',
'2021-04-10',
'2021-06-08',
'2021-08-06',
'2021-12-02',
'2022-11-02',
'2023-01-03',
'2023-05-03',
'2023-08-03',
'2023-11-03',
'2024-05-04',
'2024-08-04',
'2024-11-04',
'2025-01-05',
]
def _get_match_headers(
kwargs: Dict[str, Any],
match_param: str,
etag_param: str
) -> Tuple[Optional[str], Optional[Any]]:
if_match = None
if_none_match = None
match_condition = kwargs.pop(match_param, None)
if match_condition == MatchConditions.IfNotModified:
if_match = kwargs.pop(etag_param, None)
if not if_match:
raise ValueError(f"'{match_param}' specified without '{etag_param}'.")
elif match_condition == MatchConditions.IfPresent:
if_match = '*'
elif match_condition == MatchConditions.IfModified:
if_none_match = kwargs.pop(etag_param, None)
if not if_none_match:
raise ValueError(f"'{match_param}' specified without '{etag_param}'.")
elif match_condition == MatchConditions.IfMissing:
if_none_match = '*'
elif match_condition is None:
if kwargs.get(etag_param):
raise ValueError(f"'{etag_param}' specified without '{match_param}'.")
else:
raise TypeError(f"Invalid match condition: {match_condition}")
return if_match, if_none_match
def get_access_conditions(lease: Optional[Union["BlobLeaseClient", str]]) -> Optional[LeaseAccessConditions]:
try:
lease_id = lease.id # type: ignore
except AttributeError:
lease_id = lease # type: ignore
return LeaseAccessConditions(lease_id=lease_id) if lease_id else None
def get_modify_conditions(kwargs: Dict[str, Any]) -> ModifiedAccessConditions:
if_match, if_none_match = _get_match_headers(kwargs, 'match_condition', 'etag')
return ModifiedAccessConditions(
if_modified_since=kwargs.pop('if_modified_since', None),
if_unmodified_since=kwargs.pop('if_unmodified_since', None),
if_match=if_match or kwargs.pop('if_match', None),
if_none_match=if_none_match or kwargs.pop('if_none_match', None),
if_tags=kwargs.pop('if_tags_match_condition', None)
)
def get_source_conditions(kwargs: Dict[str, Any]) -> SourceModifiedAccessConditions:
if_match, if_none_match = _get_match_headers(kwargs, 'source_match_condition', 'source_etag')
return SourceModifiedAccessConditions(
source_if_modified_since=kwargs.pop('source_if_modified_since', None),
source_if_unmodified_since=kwargs.pop('source_if_unmodified_since', None),
source_if_match=if_match or kwargs.pop('source_if_match', None),
source_if_none_match=if_none_match or kwargs.pop('source_if_none_match', None),
source_if_tags=kwargs.pop('source_if_tags_match_condition', None)
)
def get_cpk_scope_info(kwargs: Dict[str, Any]) -> Optional[CpkScopeInfo]:
if 'encryption_scope' in kwargs:
return CpkScopeInfo(encryption_scope=kwargs.pop('encryption_scope'))
return None
def get_container_cpk_scope_info(kwargs: Dict[str, Any]) -> Optional[ContainerCpkScopeInfo]:
encryption_scope = kwargs.pop('container_encryption_scope', None)
if encryption_scope:
if isinstance(encryption_scope, ContainerEncryptionScope):
return ContainerCpkScopeInfo(
default_encryption_scope=encryption_scope.default_encryption_scope,
prevent_encryption_scope_override=encryption_scope.prevent_encryption_scope_override
)
if isinstance(encryption_scope, dict):
return ContainerCpkScopeInfo(
default_encryption_scope=encryption_scope['default_encryption_scope'],
prevent_encryption_scope_override=encryption_scope.get('prevent_encryption_scope_override')
)
raise TypeError("Container encryption scope must be dict or type ContainerEncryptionScope.")
return None
def get_api_version(kwargs: Dict[str, Any]) -> str:
api_version = kwargs.get('api_version', None)
if api_version and api_version not in _SUPPORTED_API_VERSIONS:
versions = '\n'.join(_SUPPORTED_API_VERSIONS)
raise ValueError(f"Unsupported API version '{api_version}'. Please select from:\n{versions}")
return api_version or _SUPPORTED_API_VERSIONS[-1]
def get_version_id(self_vid: Optional[str], kwargs: Dict[str, Any]) -> Optional[str]:
if 'version_id' in kwargs:
return cast(str, kwargs.pop('version_id'))
return self_vid
def serialize_blob_tags_header(tags: Optional[Dict[str, str]] = None) -> Optional[str]:
if tags is None:
return None
components = []
if tags:
for key, value in tags.items():
components.append(quote(key, safe='.-'))
components.append('=')
components.append(quote(value, safe='.-'))
components.append('&')
if components:
del components[-1]
return ''.join(components)
def serialize_blob_tags(tags: Optional[Dict[str, str]] = None) -> BlobTags:
tag_list = []
if tags:
tag_list = [BlobTag(key=k, value=v) for k, v in tags.items()]
return BlobTags(blob_tag_set=tag_list)
def serialize_query_format(formater: Union[str, DelimitedJsonDialect]) -> Optional[QuerySerialization]:
if formater == "ParquetDialect":
qq_format = QueryFormat(type=QueryFormatType.PARQUET, parquet_text_configuration=' ') #type: ignore [arg-type]
elif isinstance(formater, DelimitedJsonDialect):
json_serialization_settings = JsonTextConfiguration(record_separator=formater.delimiter)
qq_format = QueryFormat(type=QueryFormatType.JSON, json_text_configuration=json_serialization_settings)
elif hasattr(formater, 'quotechar'): # This supports a csv.Dialect as well
try:
headers = formater.has_header # type: ignore
except AttributeError:
headers = False
if isinstance(formater, str):
raise ValueError("Unknown string value provided. Accepted values: ParquetDialect")
csv_serialization_settings = DelimitedTextConfiguration(
column_separator=formater.delimiter,
field_quote=formater.quotechar,
record_separator=formater.lineterminator,
escape_char=formater.escapechar,
headers_present=headers
)
qq_format = QueryFormat(
type=QueryFormatType.DELIMITED,
delimited_text_configuration=csv_serialization_settings
)
elif isinstance(formater, list):
arrow_serialization_settings = ArrowConfiguration(schema=formater)
qq_format = QueryFormat(type=QueryFormatType.arrow, arrow_configuration=arrow_serialization_settings)
elif not formater:
return None
else:
raise TypeError("Format must be DelimitedTextDialect or DelimitedJsonDialect or ParquetDialect.")
return QuerySerialization(format=qq_format)