import time
import base64
import hashlib
from authlib.common.security import generate_token
from authlib.common.urls import extract_params
from authlib.common.encoding import to_native
from .wrapper import OAuth1Request
from .signature import (
    SIGNATURE_HMAC_SHA1,
    SIGNATURE_PLAINTEXT,
    SIGNATURE_RSA_SHA1,
    SIGNATURE_TYPE_HEADER,
    SIGNATURE_TYPE_BODY,
    SIGNATURE_TYPE_QUERY,
)
from .signature import (
    sign_hmac_sha1,
    sign_rsa_sha1,
    sign_plaintext
)
from .parameters import (
    prepare_form_encoded_body,
    prepare_headers,
    prepare_request_uri_query,
)

CONTENT_TYPE_FORM_URLENCODED = 'application/x-www-form-urlencoded'
CONTENT_TYPE_MULTI_PART = 'multipart/form-data'


class ClientAuth:
    """Sign outgoing HTTP requests with OAuth 1.0 protocol parameters.

    An instance holds the client and token credentials plus the signing
    configuration (signature method and where the parameters are placed:
    ``Authorization`` header, form body or URI query).
    """

    # Maps a signature method name to a ``sign(client, request)`` callable.
    # The mapping lives on the class, so registrations are shared by every
    # instance that does not override this attribute.
    SIGNATURE_METHODS = {
        SIGNATURE_HMAC_SHA1: sign_hmac_sha1,
        SIGNATURE_RSA_SHA1: sign_rsa_sha1,
        SIGNATURE_PLAINTEXT: sign_plaintext,
    }

    @classmethod
    def register_signature_method(cls, name, sign):
        """Extend client signature methods.

        :param name: A string to represent signature method.
        :param sign: A function to generate signature.

        The ``sign`` method accept 2 parameters::

            def custom_sign_method(client, request):
                # client is the instance of ClientAuth.
                return 'your-signed-string'

            ClientAuth.register_signature_method(
                'custom-name', custom_sign_method)
        """
        cls.SIGNATURE_METHODS[name] = sign

    def __init__(self, client_id, client_secret=None,
                 token=None, token_secret=None,
                 redirect_uri=None, rsa_key=None, verifier=None,
                 signature_method=SIGNATURE_HMAC_SHA1,
                 signature_type=SIGNATURE_TYPE_HEADER,
                 realm=None, force_include_body=False):
        """Store the credentials and signing configuration.

        :param client_id: Sent as ``oauth_consumer_key``.
        :param client_secret: Client secret; not read in this class itself,
            it is available to the signing functions through ``self``.
        :param token: Sent as ``oauth_token`` when truthy.
        :param token_secret: Token secret; not read in this class itself.
        :param redirect_uri: Sent as ``oauth_callback`` when truthy.
        :param rsa_key: RSA key; not read in this class itself.
        :param verifier: Sent as ``oauth_verifier`` when truthy.
        :param signature_method: Key into ``SIGNATURE_METHODS``.
        :param signature_type: One of the ``SIGNATURE_TYPE_*`` constants;
            decides where the protocol parameters are rendered.
        :param realm: Passed to ``prepare_headers`` for header signatures.
        :param force_include_body: When true, ``prepare`` signs bodies that
            are not form-encoded instead of dropping them.
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.token = token
        self.token_secret = token_secret
        self.redirect_uri = redirect_uri
        self.signature_method = signature_method
        self.signature_type = signature_type
        self.rsa_key = rsa_key
        self.verifier = verifier
        self.realm = realm
        self.force_include_body = force_include_body

    def get_oauth_signature(self, method, uri, headers, body):
        """Get an OAuth signature to be used in signing a request.

        The request is wrapped in an ``OAuth1Request`` and handed, together
        with this client, to the function registered for
        ``self.signature_method``.

        To satisfy `section 3.4.1.2`_ item 2, if the request argument's
        headers dict attribute contains a Host item, its value will replace
        any netloc part of the request argument's uri attribute value.
        NOTE(review): that Host handling is not performed in this method;
        presumably it happens in ``OAuth1Request`` or the signing
        functions -- confirm.

        .. _`section 3.4.1.2`: https://tools.ietf.org/html/rfc5849#section-3.4.1.2

        :raises ValueError: If no function is registered for the configured
            signature method.
        """
        sign = self.SIGNATURE_METHODS.get(self.signature_method)
        if not sign:
            raise ValueError('Invalid signature method.')

        request = OAuth1Request(method, uri, body=body, headers=headers)
        return sign(self, request)

    def get_oauth_params(self, nonce, timestamp):
        """Build the list of ``(name, value)`` OAuth protocol parameters.

        The token, callback and verifier entries are appended only when the
        corresponding attribute is truthy. The signature is not included.
        """
        oauth_params = [
            ('oauth_nonce', nonce),
            ('oauth_timestamp', timestamp),
            ('oauth_version', '1.0'),
            ('oauth_signature_method', self.signature_method),
            ('oauth_consumer_key', self.client_id),
        ]
        if self.token:
            oauth_params.append(('oauth_token', self.token))
        if self.redirect_uri:
            oauth_params.append(('oauth_callback', self.redirect_uri))
        if self.verifier:
            oauth_params.append(('oauth_verifier', self.verifier))
        return oauth_params

    def _render(self, uri, headers, body, oauth_params):
        """Place ``oauth_params`` into the request per ``signature_type``.

        :return: uri, headers, body
        :raises ValueError: If ``self.signature_type`` is not one of the
            header, body or query signature types.
        """
        if self.signature_type == SIGNATURE_TYPE_HEADER:
            headers = prepare_headers(oauth_params, headers, realm=self.realm)
        elif self.signature_type == SIGNATURE_TYPE_BODY:
            # The body is only rewritten when it is already declared as
            # form-encoded; the Content-Type header is set in either case.
            if CONTENT_TYPE_FORM_URLENCODED in headers.get('Content-Type', ''):
                decoded_body = extract_params(body) or []
                body = prepare_form_encoded_body(oauth_params, decoded_body)
            headers['Content-Type'] = CONTENT_TYPE_FORM_URLENCODED
        elif self.signature_type == SIGNATURE_TYPE_QUERY:
            uri = prepare_request_uri_query(oauth_params, uri)
        else:
            raise ValueError('Unknown signature type specified.')
        return uri, headers, body

    def sign(self, method, uri, headers, body):
        """Sign the HTTP request, add OAuth parameters and signature.

        :param method: HTTP method of the request.
        :param uri: URI of the HTTP request.
        :param body: Body payload of the HTTP request. ``None`` is treated
            as an empty body.
        :param headers: Headers of the HTTP request. ``None`` is treated as
            an empty dict.
        :return: uri, headers, body
        """
        nonce = generate_nonce()
        timestamp = generate_timestamp()
        if body is None:
            body = b''

        # transform int to str
        timestamp = str(timestamp)

        if headers is None:
            headers = {}

        oauth_params = self.get_oauth_params(nonce, timestamp)

        # https://datatracker.ietf.org/doc/html/draft-eaton-oauth-bodyhash-00.html
        # include oauth_body_hash
        if body and headers.get('Content-Type') != CONTENT_TYPE_FORM_URLENCODED:
            # hashlib only accepts bytes-like input; encode text bodies so a
            # ``str`` payload does not raise TypeError. Only the hash input is
            # converted, the body itself is passed on unchanged.
            raw_body = body.encode('utf-8') if isinstance(body, str) else body
            oauth_body_hash = base64.b64encode(hashlib.sha1(raw_body).digest())
            oauth_params.append(('oauth_body_hash', oauth_body_hash.decode('utf-8')))

        # First pass: render the request without a signature so the signing
        # function receives the protocol parameters in their final location.
        signing_uri, signing_headers, signing_body = self._render(
            uri, headers, body, oauth_params)

        sig = self.get_oauth_signature(
            method, signing_uri, signing_headers, signing_body)
        oauth_params.append(('oauth_signature', sig))

        # Second pass: start again from the caller's uri/headers/body rather
        # than from the first pass's output. Re-rendering already-rendered
        # values through the query/body helpers would presumably add every
        # protocol parameter a second time (helpers live in ``.parameters``
        # -- confirm), which RFC 5849 forbids and which would no longer match
        # the signature computed above.
        return self._render(uri, headers, body, oauth_params)

    def prepare(self, method, uri, headers, body):
        """Add OAuth parameters to the request.

        Parameters may be included from the body if the content-type is
        urlencoded, if no content type is set, a guess is made.

        :param headers: Headers of the HTTP request. ``None`` is treated as
            an empty dict, matching :meth:`sign`.
        :return: uri, headers, body. The returned body is ``b''`` when the
            request is not form-encoded and ``force_include_body`` is false.
        """
        if headers is None:
            headers = {}

        content_type = to_native(headers.get('Content-Type', ''))
        if self.signature_type == SIGNATURE_TYPE_BODY:
            content_type = CONTENT_TYPE_FORM_URLENCODED
        elif not content_type and extract_params(body):
            # No declared type, but the body parses as form parameters.
            content_type = CONTENT_TYPE_FORM_URLENCODED

        if CONTENT_TYPE_FORM_URLENCODED in content_type:
            headers['Content-Type'] = CONTENT_TYPE_FORM_URLENCODED
            uri, headers, body = self.sign(method, uri, headers, body)
        elif self.force_include_body:
            # To allow custom clients to work on non form encoded bodies.
            uri, headers, body = self.sign(method, uri, headers, body)
        else:
            # Omit body data in the signing of non form-encoded requests
            uri, headers, _ = self.sign(method, uri, headers, b'')
            body = b''
        return uri, headers, body


def generate_nonce():
    """Return a fresh nonce produced by ``generate_token``."""
    return generate_token()


def generate_timestamp():
    """Return the current Unix time in whole seconds, as a string."""
    return str(int(time.time()))
Memory