# -*- coding: utf-8 -*- # # Copyright 1999-2017 Tencent Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import copy import time import random import sys import os import warnings try: from urllib.parse import urlencode except ImportError: from urllib import urlencode import QcloudApi from QcloudApi.common.api_exception import ApiClientParamException from QcloudApi.common.api_exception import ApiServerNetworkException from QcloudApi.common.request import ApiRequest from QcloudApi.common.request import RequestInternal from QcloudApi.common.sign import Sign warnings.filterwarnings("ignore") class Base(object): requestHost = '' requestUri = '/v2/index.php' _params = {} version = 'SDK_PYTHON_%s' % QcloudApi.__version__ def __init__(self, config): self.secretId = config['secretId'] self.secretKey = config['secretKey'] self.defaultRegion = config.get('Region', '') self.Version = config.get('Version', '') self.method = config.get('method', 'GET').upper() self.sign_method = config.get('SignatureMethod', 'HmacSHA1') self.requestHost = self.requestHost or config.get("endpoint") self.apiRequest = ApiRequest(self.requestHost) self.Token = config.get('Token', '') def set_req_timeout(self, req_timeout): self.apiRequest.set_req_timeout(req_timeout) def open_debug(self): self.apiRequest.set_debug(True) def close_debug(self): self.apiRequest.set_debug(False) def _build_header(self, req): if self.apiRequest.is_keep_alive(): req.header["Connection"] = "Keep-Alive" if req.method == 'POST': req.header["Content-Type"] = "application/x-www-form-urlencoded" def _fix_params(self, params): if not isinstance(params, (dict,)): return params return self._format_params(None, params) def _format_params(self, prefix, params): d = {} if params is None: return d if not isinstance(params, (tuple, list, dict)): d[prefix] = params return d if isinstance(params, (list, tuple)): for idx, item in enumerate(params): if prefix: key = "{0}.{1}".format(prefix, idx) else: key = "{0}".format(idx) d.update(self._format_params(key, item)) return d if isinstance(params, dict): for k, v in params.items(): if prefix: key = '{0}.{1}'.format(prefix, k) else: key = '{0}'.format(k) d.update(self._format_params(key, v)) return d raise ApiClientParamException('some params type error') def _build_req_inter(self, action, params, req_inter): _params = copy.deepcopy(self._fix_params(params)) _params['Action'] = action[0].upper() + action[1:] _params['RequestClient'] = self.version if ('Region' not in _params and self.defaultRegion != ''): _params['Region'] = self.defaultRegion if ('Version' not in _params and self.Version != ''): _params['Version'] = self.Version if ('Token' not in _params and self.Token != ''): _params['Token'] = self.Token if ('SecretId' not in _params): _params['SecretId'] = self.secretId if ('Nonce' not in _params): _params['Nonce'] = random.randint(1, sys.maxsize) if ('Timestamp' not in _params): _params['Timestamp'] = int(time.time()) if ('SignatureMethod' in _params): self.sign_method = _params['SignatureMethod'] else: _params['SignatureMethod'] = self.sign_method sign = Sign(self.secretId, self.secretKey) _params['Signature'] = sign.make( req_inter.host, req_inter.uri, _params, req_inter.method, self.sign_method) req_inter.data = urlencode(_params) self._build_header(req_inter) def _check_status(self, resp_inter): if resp_inter.status != 200: raise ApiServerNetworkException( resp_inter.status, resp_inter.header, resp_inter.data) def generateUrl(self, action, params): req_inter = RequestInternal( self.requestHost, self.method, self.requestUri) self._build_req_inter(action, params, req_inter) url = 'https://%s%s' % (req_inter.host, req_inter.uri) if (req_inter.method == 'GET'): url += '?' + req_inter.data return url def call(self, action, params, files={}): req_inter = RequestInternal( self.requestHost, self.method, self.requestUri) self._build_req_inter(action, params, req_inter) resp_inter = self.apiRequest.send_request(req_inter) self._check_status(resp_inter) return resp_inter.data
Memory