# -*- coding: utf-8 -*- # # Copyright 2017-2021 Tencent Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import copy from datetime import datetime import hashlib import json import random import sys import time import uuid import warnings import logging import logging.handlers try: from urllib.parse import urlencode except ImportError: from urllib import urlencode import tencentcloud from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException from tencentcloud.common.exception import TencentCloudSDKException as SDKError from tencentcloud.common.http.request import ApiRequest, ResponsePrettyFormatter from tencentcloud.common.http.request import RequestInternal from tencentcloud.common.profile.client_profile import ClientProfile, RegionBreakerProfile from tencentcloud.common.sign import Sign from tencentcloud.common.circuit_breaker import CircuitBreaker from tencentcloud.common.retry import NoopRetryer warnings.filterwarnings("ignore", module="tencentcloud", category=UserWarning) _json_content = 'application/json' _multipart_content = 'multipart/form-data' _form_urlencoded_content = 'application/x-www-form-urlencoded' _octet_stream = "application/octet-stream" class EmptyHandler(logging.Handler): def emit(self, message): pass LOGGER_NAME = "tencentcloud_sdk_common" logger = logging.getLogger(LOGGER_NAME) logger.addHandler(EmptyHandler()) class AbstractClient(object): _requestPath = '/' _params = {} _apiVersion = '' _endpoint = '' _service = '' _sdkVersion = 'SDK_PYTHON_%s' % tencentcloud.__version__ _default_content_type = _form_urlencoded_content FMT = '%(asctime)s %(process)d %(filename)s L%(lineno)s %(levelname)s %(message)s' def __init__(self, credential, region, profile=None): self.credential = credential self.region = region self.profile = ClientProfile() if profile is None else profile is_http = True if self.profile.httpProfile.scheme == "http" else False self.request = ApiRequest(self._get_endpoint(), req_timeout=self.profile.httpProfile.reqTimeout, proxy=self.profile.httpProfile.proxy, is_http=is_http, certification=self.profile.httpProfile.certification, pre_conn_pool_size=self.profile.httpProfile.pre_conn_pool_size) if self.profile.httpProfile.keepAlive: self.request.set_keep_alive() self.circuit_breaker = None if not self.profile.disable_region_breaker: if self.profile.region_breaker_profile is None: self.profile.region_breaker_profile = RegionBreakerProfile() self.circuit_breaker = CircuitBreaker(self.profile.region_breaker_profile) if self.profile.request_client: self.request_client = self._sdkVersion + "; " + self.profile.request_client else: self.request_client = self._sdkVersion def _fix_params(self, params): if not isinstance(params, (dict,)): return params return self._format_params(None, params) def _format_params(self, prefix, params): d = {} if params is None: return d if not isinstance(params, (tuple, list, dict)): d[prefix] = params return d if isinstance(params, (list, tuple)): for idx, item in enumerate(params): if prefix: key = "{0}.{1}".format(prefix, idx) else: key = "{0}".format(idx) d.update(self._format_params(key, item)) return d if isinstance(params, dict): for k, v in params.items(): if prefix: key = '{0}.{1}'.format(prefix, k) else: key = '{0}'.format(k) d.update(self._format_params(key, v)) return d raise TencentCloudSDKException("ClientParamsError", "some params type error") def _build_req_inter(self, action, params, req_inter, options=None): options = options or {} if options.get('SkipSign'): self._build_req_without_signature(action, params, req_inter, options) elif self.profile.signMethod == "TC3-HMAC-SHA256" or options.get("IsMultipart") is True: self._build_req_with_tc3_signature(action, params, req_inter, options) elif self.profile.signMethod in ("HmacSHA1", "HmacSHA256"): self._build_req_with_old_signature(action, params, req_inter) else: raise TencentCloudSDKException("ClientError", "Invalid signature method.") def _build_req_with_old_signature(self, action, params, req): params = copy.deepcopy(self._fix_params(params)) params['Action'] = action[0].upper() + action[1:] params['RequestClient'] = self.request_client params['Nonce'] = random.randint(1, sys.maxsize) params['Timestamp'] = int(time.time()) params['Version'] = self._apiVersion if self.region: params['Region'] = self.region if self.credential.token: params['Token'] = self.credential.token if self.credential.secret_id: params['SecretId'] = self.credential.secret_id if self.profile.signMethod: params['SignatureMethod'] = self.profile.signMethod if self.profile.language: params['Language'] = self.profile.language signInParam = self._format_sign_string(params) params['Signature'] = Sign.sign(str(self.credential.secret_key), str(signInParam), str(self.profile.signMethod)) req.data = urlencode(params) req.header["Content-Type"] = "application/x-www-form-urlencoded" def _build_req_with_tc3_signature(self, action, params, req, options=None): content_type = self._default_content_type if req.method == 'GET': content_type = _form_urlencoded_content elif req.method == 'POST': content_type = _json_content options = options or {} if options.get("IsMultipart"): content_type = _multipart_content if options.get("IsOctetStream"): content_type = _octet_stream req.header["Content-Type"] = content_type if req.method == "GET" and content_type == _multipart_content: raise SDKError("ClientError", "Invalid request method GET for multipart.") endpoint = self._get_endpoint() timestamp = int(time.time()) req.header["Host"] = endpoint req.header["X-TC-Action"] = action[0].upper() + action[1:] req.header["X-TC-RequestClient"] = self.request_client req.header["X-TC-Timestamp"] = str(timestamp) req.header["X-TC-Version"] = self._apiVersion if self.profile.unsignedPayload is True: req.header["X-TC-Content-SHA256"] = "UNSIGNED-PAYLOAD" if self.region: req.header['X-TC-Region'] = self.region if self.credential.token: req.header['X-TC-Token'] = self.credential.token if self.profile.language: req.header['X-TC-Language'] = self.profile.language if req.method == 'GET': params = copy.deepcopy(self._fix_params(params)) req.data = urlencode(params) elif content_type == _json_content: req.data = json.dumps(params) elif content_type == _multipart_content: boundary = uuid.uuid4().hex req.header["Content-Type"] = content_type + "; boundary=" + boundary req.data = self._get_multipart_body(params, boundary, options) service = self._service date = datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d') signature = self._get_tc3_signature(params, req, date, service, options) auth = "TC3-HMAC-SHA256 Credential=%s/%s/%s/tc3_request, SignedHeaders=content-type;host, Signature=%s" % ( self.credential.secret_id, date, service, signature) req.header["Authorization"] = auth def _get_tc3_signature(self, params, req, date, service, options=None): options = options or {} canonical_uri = req.uri canonical_querystring = "" payload = req.data if req.method == 'GET': canonical_querystring = req.data payload = "" if req.header.get("X-TC-Content-SHA256") == "UNSIGNED-PAYLOAD": payload = "UNSIGNED-PAYLOAD" if sys.version_info[0] == 3 and isinstance(payload, type("")): payload = payload.encode("utf8") payload_hash = hashlib.sha256(payload).hexdigest() canonical_headers = 'content-type:%s\nhost:%s\n' % ( req.header["Content-Type"], req.header["Host"]) signed_headers = 'content-type;host' canonical_request = '%s\n%s\n%s\n%s\n%s\n%s' % (req.method, canonical_uri, canonical_querystring, canonical_headers, signed_headers, payload_hash) algorithm = 'TC3-HMAC-SHA256' credential_scope = date + '/' + service + '/tc3_request' if sys.version_info[0] == 3: canonical_request = canonical_request.encode("utf8") digest = hashlib.sha256(canonical_request).hexdigest() string2sign = '%s\n%s\n%s\n%s' % (algorithm, req.header["X-TC-Timestamp"], credential_scope, digest) return Sign.sign_tc3(self.credential.secret_key, date, service, string2sign) def _build_req_without_signature(self, action, params, req, options=None): content_type = self._default_content_type if req.method == 'GET': content_type = _form_urlencoded_content elif req.method == 'POST': content_type = _json_content options = options or {} if options.get("IsMultipart"): content_type = _multipart_content if options.get("IsOctetStream"): content_type = _octet_stream req.header["Content-Type"] = content_type if req.method == "GET" and content_type == _multipart_content: raise SDKError("ClientError", "Invalid request method GET for multipart.") endpoint = self._get_endpoint() timestamp = int(time.time()) req.header["Host"] = endpoint req.header["X-TC-Action"] = action[0].upper() + action[1:] req.header["X-TC-RequestClient"] = self.request_client req.header["X-TC-Timestamp"] = str(timestamp) req.header["X-TC-Version"] = self._apiVersion if self.profile.unsignedPayload is True: req.header["X-TC-Content-SHA256"] = "UNSIGNED-PAYLOAD" if self.region: req.header['X-TC-Region'] = self.region if self.profile.language: req.header['X-TC-Language'] = self.profile.language if req.method == 'GET': params = copy.deepcopy(self._fix_params(params)) req.data = urlencode(params) elif content_type == _json_content: req.data = json.dumps(params) elif content_type == _multipart_content: boundary = uuid.uuid4().hex req.header["Content-Type"] = content_type + "; boundary=" + boundary req.data = self._get_multipart_body(params, boundary, options) req.header["Authorization"] = "SKIP" # it must return bytes instead of string def _get_multipart_body(self, params, boundary, options=None): if options is None: options = {} # boundary and params key will never contain unicode characters boundary = boundary.encode() binparas = options.get("BinaryParams", []) body = b'' for k, v in params.items(): kbytes = k.encode() body += b'--%s\r\n' % boundary body += b'Content-Disposition: form-data; name="%s"' % kbytes if k in binparas: body += b'; filename="%s"\r\n' % kbytes else: body += b"\r\n" if isinstance(v, list) or isinstance(v, dict): v = json.dumps(v) body += b'Content-Type: application/json\r\n' if sys.version_info[0] == 3 and isinstance(v, type("")): v = v.encode() body += b'\r\n%s\r\n' % v if body != b'': body += b'--%s--\r\n' % boundary return body def _check_status(self, resp_inter): if resp_inter.status_code != 200: logger.debug("GetResponse: %s", ResponsePrettyFormatter(resp_inter)) raise TencentCloudSDKException("ServerNetworkError", resp_inter.content) def _format_sign_string(self, params): formatParam = {} for k in params: formatParam[k.replace('_', '.')] = params[k] strParam = '&'.join('%s=%s' % (k, formatParam[k]) for k in sorted(formatParam)) msg = '%s%s%s?%s' % (self.profile.httpProfile.reqMethod, self._get_endpoint(), self._requestPath, strParam) return msg def _get_service_domain(self): rootDomain = self.profile.httpProfile.rootDomain return self._service + "." + rootDomain def _get_endpoint(self): endpoint = self.profile.httpProfile.endpoint if endpoint is None: endpoint = self._get_service_domain() return endpoint def _check_error(self, resp): ct = resp.headers.get('Content-Type') if ct not in ('text/plain', _json_content): return data = json.loads(resp.content) if "Error" in data["Response"]: code = data["Response"]["Error"]["Code"] message = data["Response"]["Error"]["Message"] reqid = data["Response"]["RequestId"] logger.debug("GetResponse: %s", ResponsePrettyFormatter(resp)) raise TencentCloudSDKException(code, message, reqid) if "DeprecatedWarning" in data["Response"]: import warnings warnings.filterwarnings("default") warnings.warn("This action is deprecated, detail: %s" % data["Response"]["DeprecatedWarning"], DeprecationWarning) @staticmethod def _process_response_sse(resp): logger.debug("GetResponse: %s", ResponsePrettyFormatter(resp, format_body=False)) e = {} for line in resp.iter_lines(): if not line: yield e e = {} continue logger.debug("GetResponse: %s", line) line = line.decode('utf-8') # comment if line[0] == ':': continue colon_idx = line.find(':') key = line[:colon_idx] val = line[colon_idx + 1:] # If value starts with a U+0020 SPACE character, remove it from value. if val and val[0] == " ": val = val[1:] if key == 'data': # The spec allows for multiple data fields per event, concatenated them with "\n". if 'data' not in e: e['data'] = val else: e['data'] += '\n' + val elif key in ('event', 'id'): e[key] = val elif key == 'retry': e[key] = int(val) @staticmethod def _process_response_json(resp, resp_type): resp_obj = json.loads(resp.content)["Response"] model = resp_type() model._deserialize(resp_obj) return model def _call(self, action, params, options=None, headers=None): if headers is None: headers = {} if not isinstance(headers, dict): raise TencentCloudSDKException("ClientError", "headers must be a dict.") if "x-tc-traceid" not in {k.lower() for k in headers.keys()}: headers["X-TC-TraceId"] = str(uuid.uuid4()) if not self.profile.disable_region_breaker: return self._call_with_region_breaker(action, params, options, headers) req = RequestInternal(self._get_endpoint(), self.profile.httpProfile.reqMethod, self._requestPath, header=headers) self._build_req_inter(action, params, req, options) if self.profile.httpProfile.apigw_endpoint: req.host = self.profile.httpProfile.apigw_endpoint req.header["Host"] = req.host return self.request.send_request(req) def call(self, action, params, options=None, headers=None): def _call_once(): resp = self._call(action, params, options, headers) self._check_status(resp) self._check_error(resp) logger.debug("GetResponse: %s", ResponsePrettyFormatter(resp)) return resp retryer = self.profile.retryer or NoopRetryer() return retryer.send_request(_call_once).content def _call_with_region_breaker(self, action, params, options=None, headers=None): endpoint = self._get_endpoint() generation, need_break = self.circuit_breaker.before_requests() if need_break: endpoint = self._service + "." + self.profile.region_breaker_profile.backup_endpoint req = RequestInternal(endpoint, self.profile.httpProfile.reqMethod, self._requestPath, header=headers) self._build_req_inter(action, params, req, options) resp = None try: resp = self.request.send_request(req) self.circuit_breaker.after_requests(generation, True) return resp except TencentCloudSDKException as e: if resp and "RequestId" in resp.content and e.code != "InternalError": self.circuit_breaker.after_requests(generation, True) else: self.circuit_breaker.after_requests(generation, False) def call_with_region_breaker(self, action, params, options=None, headers=None): resp = self._call_with_region_breaker(action, params, options, headers) self._check_status(resp) self._check_error(resp) return resp.content def call_octet_stream(self, action, headers, body): """ Invoke API with application/ocet-stream content-type. Note: 1. only specific API can be invoked in such manner. 2. only TC3-HMAC-SHA256 signature method can be specified. 3. only POST request method can be specified :type action: str :param action: Specific API action name. :type headers: dict :param headers: Header parameters for this API. :type body: bytes :param body: Bytes of requested body """ if self.profile.signMethod != "TC3-HMAC-SHA256": raise SDKError("ClientError", "Invalid signature method.") if self.profile.httpProfile.reqMethod != "POST": raise SDKError("ClientError", "Invalid request method.") req = RequestInternal(self._get_endpoint(), self.profile.httpProfile.reqMethod, self._requestPath, header=headers) req.data = body options = {"IsOctetStream": True} self._build_req_inter(action, None, req, options) resp = self.request.send_request(req) self._check_status(resp) self._check_error(resp) return json.loads(resp.content) def call_json(self, action, params, headers=None, options=None): """ Call api with json object and return with json object. :type action: str :param action: api name e.g. ``DescribeInstances`` :type params: dict :param params: params with this action :type headers: dict :param headers: request header, like {"X-TC-TraceId": "ffe0c072-8a5d-4e17-8887-a8a60252abca"} :type options: dict :param options: request options, like {"SkipSign": False, "IsMultipart": False, "IsOctetStream": False, "BinaryParams": []} """ def _call_once(): resp = self._call(action, params, options, headers) self._check_status(resp) self._check_error(resp) logger.debug("GetResponse: %s", ResponsePrettyFormatter(resp)) return resp retryer = self.profile.retryer or NoopRetryer() return json.loads(retryer.send_request(_call_once).content) def call_sse(self, action, params, headers=None, options=None): def _call_once(): resp = self._call(action, params, options, headers) self._check_status(resp) self._check_error(resp) return resp retryer = self.profile.retryer or NoopRetryer() return self._process_response_sse(retryer.send_request(_call_once)) def _call_and_deserialize(self, action, params, resp_type, headers=None, options=None): def _call_once(): resp = self._call(action, params, options, headers) self._check_status(resp) self._check_error(resp) return resp retryer = self.profile.retryer or NoopRetryer() return self._process_response(retryer.send_request(_call_once), resp_type) def _process_response(self, resp, resp_type): if resp.headers.get('Content-Type') == "text/event-stream": logger.debug("GetResponse: %s", ResponsePrettyFormatter(resp, format_body=False)) return self._process_response_sse(resp) logger.debug("GetResponse: %s", ResponsePrettyFormatter(resp)) return self._process_response_json(resp, resp_type) def set_stream_logger(self, stream=None, level=logging.DEBUG, log_format=None): """ Add a stream handler :type stream: IO[str] :param stream: e.g. ``sys.stdout`` ``sys.stdin`` ``sys.stderr`` :type level: int :param level: Logging level, e.g. ``logging.INFO`` :type log_format: str :param log_format: Log message format """ log = logging.getLogger(LOGGER_NAME) log.setLevel(level) sh = logging.StreamHandler(stream) sh.setLevel(level) if log_format is None: log_format = self.FMT formatter = logging.Formatter(log_format) sh.setFormatter(formatter) log.addHandler(sh) def set_file_logger(self, file_path, level=logging.DEBUG, log_format=None): """ Add a file handler :type file_path: str :param file_path: path of log file :type level: int :param level: Logging level, e.g. ``logging.INFO`` :type log_format: str :param log_format: Log message format """ log = logging.getLogger(LOGGER_NAME) log.setLevel(level) mb = 1024 * 1024 fh = logging.handlers.RotatingFileHandler(file_path, maxBytes=512 * mb, backupCount=10) fh.setLevel(level) if log_format is None: log_format = self.FMT formatter = logging.Formatter(log_format) fh.setFormatter(formatter) log.addHandler(fh) def set_default_logger(self): """ Set default log handler """ log = logging.getLogger(LOGGER_NAME) log.handlers = [] logger.addHandler(EmptyHandler())
Memory