# -*- coding: utf-8 -*-
# Copyright (c) 2018 Tencent Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import time
try:
# py3
import configparser
from urllib.parse import urlencode
from urllib.request import urlopen
except ImportError:
# py2
import ConfigParser as configparser
from urllib import urlencode
from urllib import urlopen
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.common.common_client import CommonClient
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.profile.client_profile import ClientProfile
class Credential(object):
def __init__(self, secret_id, secret_key, token=None):
"""Tencent Cloud Credentials.
Access https://console.cloud.tencent.com/cam/capi to manage your
credentials.
:param secret_id: The secret id of your credential.
:type secret_id: str
:param secret_key: The secret key of your credential.
:type secret_key: str
:param token: The federation token of your credential, if this field
is specified, secret_id and secret_key should be set
accordingly, see: https://cloud.tencent.com/document/product/598/13896
"""
if secret_id is None or secret_id.strip() == "":
raise TencentCloudSDKException("InvalidCredential", "secret id should not be none or empty")
if secret_id.strip() != secret_id:
raise TencentCloudSDKException("InvalidCredential", "secret id should not contain spaces")
self.secret_id = secret_id
if secret_key is None or secret_key.strip() == "":
raise TencentCloudSDKException("InvalidCredential", "secret key should not be none or empty")
if secret_key.strip() != secret_key:
raise TencentCloudSDKException("InvalidCredential", "secret key should not contain spaces")
self.secret_key = secret_key
self.token = token
@property
def secretId(self):
return self.secret_id
@property
def secretKey(self):
return self.secret_key
class CVMRoleCredential(object):
_metadata_endpoint = "http://metadata.tencentyun.com/latest/meta-data/"
_role_endpoint = _metadata_endpoint + "cam/security-credentials/"
# In seconds.
# Signatrue will fail if request timestamp gap is larger than 300s,
# so a strategy to refresh token just before that time is acceptable.
_expired_timeout = 300
def __init__(self, role_name=None):
self.role = role_name
self._secret_id = None
self._secret_key = None
self._token = None
self._expired_ts = 0
@property
def secretId(self):
return self.secret_id
@property
def secret_id(self):
self.update_credential()
return self._secret_id
@property
def secretKey(self):
return self.secret_key
@property
def secret_key(self):
self.update_credential()
return self._secret_key
@property
def token(self):
self.update_credential()
return self._token
def get_role_name(self):
if self.role:
return self.role
try:
resp = urlopen(self._role_endpoint)
self.role = resp.read().decode("utf8")
except Exception as e:
raise TencentCloudSDKException("ClientError.MetadataError", str(e))
finally:
return self.role
def _need_refresh(self):
ts_remain = self._expired_ts - int(time.time())
if ts_remain <= self._expired_timeout:
return True
else:
return False
def update_credential(self):
if not self._need_refresh():
return
role = self.get_role_name()
try:
# TODO: what if role has special characters such as space and unicode?
resp = urlopen(self._role_endpoint + role)
# py3 requires it to be string rather than byte
data = resp.read().decode("utf8")
j = json.loads(data)
if j.get("Code") != "Success":
raise Exception("CVM role token data failed: %s" % data)
self._secret_id = j["TmpSecretId"]
self._secret_key = j["TmpSecretKey"]
self._token = j["Token"]
self._expired_ts = j["ExpiredTime"]
except Exception as e:
# we shoud log it
# maybe we should validate token to None as well
pass
def get_credential(self):
if not self.secret_id or not self.secret_key or not self.token:
return None
return self
class STSAssumeRoleCredential(object):
"""使用STSAssumeRoleCredential,制动role,
可以自动生成临时凭证,并使用临时凭证调用接口
"""
_region = "ap-guangzhou"
_version = '2018-08-13'
_service = "sts"
_endpoint = "sts.tencentcloudapi.com"
def __init__(self, secret_id, secret_key, role_arn, role_session_name, duration_seconds=7200, endpoint=None):
"""
:param secret_id: 接口调用凭证id
:type secret_id: str
:param secret_key: 接口调用凭证key
:type secret_key: str
https://cloud.tencent.com/document/api/1312/48197
:param role_arn: 角色的资源描述,参考官网文档 https://cloud.tencent.com/document/api/1312/48197 中 RoleArn 参数的描述。
:type role_arn: str
:param role_session_name: 临时会话名称,由用户自定义名称
:type role_session_name: str
:param duration_seconds: 获取临时凭证的有效期,默认7200s
:type duration_seconds: int
"""
self._long_secret_id = secret_id
self._long_secret_key = secret_key
self._role_arn = role_arn
self._role_session_name = role_session_name
self._duration_seconds = duration_seconds
self._token = None
self._tmp_secret_id = None
self._tmp_secret_key = None
self._expired_time = 0
self._last_role_arn = None
self._tmp_credential = None
if endpoint:
self._endpoint = endpoint
@property
def secretId(self):
self._need_refresh()
return self._tmp_secret_id
@property
def secretKey(self):
self._need_refresh()
return self._tmp_secret_key
@property
def secret_id(self):
self._need_refresh()
return self._tmp_secret_id
@property
def secret_key(self):
self._need_refresh()
return self._tmp_secret_key
@property
def token(self):
self._need_refresh()
return self._token
def _need_refresh(self):
"""
https://cloud.tencent.com/document/api/1312/48197
此函数自动使用初始secret_id和secret_key,自动调用上述链接中获取临时凭证的接口,并返回临时凭证
:param role_arn: 角色的资源描述,上述链接RoleArn参数中有详细获取方式
:type role_arn: str
:param role_session_name: 临时会话名称,由用户自定义名称
:type role_session_name: str
:param duration_seconds: 获取临时凭证的有效期,默认7200s
:type duration_seconds: int
"""
if None in [self._token, self._tmp_secret_key, self._tmp_secret_id] or self._expired_time < int(time.time()):
self.get_sts_tmp_role_arn()
def get_sts_tmp_role_arn(self):
cred = Credential(self._long_secret_id, self._long_secret_key)
http_profile = HttpProfile()
http_profile.endpoint = self._endpoint
client_profile = ClientProfile()
client_profile.httpProfile = http_profile
common_client = CommonClient(credential=cred, region=self._region, version=self._version,
service=self._service, profile=client_profile)
params = {
"RoleArn": self._role_arn,
"RoleSessionName": self._role_session_name,
"DurationSeconds": self._duration_seconds,
}
rsp = common_client.call_json("AssumeRole", params)
self._token = rsp["Response"]["Credentials"]["Token"]
self._tmp_secret_id = rsp["Response"]["Credentials"]["TmpSecretId"]
self._tmp_secret_key = rsp["Response"]["Credentials"]["TmpSecretKey"]
self._expired_time = rsp["Response"]["ExpiredTime"] - self._duration_seconds * 0.9
class EnvironmentVariableCredential(object):
def get_credential(self):
"""Tencent Cloud EnvironmentVariableCredential.
Access https://console.cloud.tencent.com/cam/capi to manage your
credentials.
:param secret_id: The secret id of your credential, get by environment variable TENCENTCLOUD_SECRET_ID
:type secret_id: str
:param secret_key: The secret key of your credential. get by environment variable TENCENTCLOUD_SECRET_KEY
:type secret_key: str
"""
self.secret_id = os.environ.get('TENCENTCLOUD_SECRET_ID')
self.secret_key = os.environ.get('TENCENTCLOUD_SECRET_KEY')
if self.secret_id is None or self.secret_key is None:
return None
if len(self.secret_id) == 0 or len(self.secret_key) == 0:
return None
return Credential(self.secret_id, self.secret_key)
class ProfileCredential(object):
def get_credential(self):
"""Tencent Cloud ProfileCredential.
Access https://console.cloud.tencent.com/cam/capi to manage your credentials.
default file position is "~/.tencentcloud/credentials" or "/etc/tencentcloud/credentials", it is ini format.
such as:
[default]
secret_id=""
secret_key=""
:param secret_id: The secret id of your credential.
:type secret_id: str
:param secret_key: The secret key of your credential.
:type secret_key: str
"""
home_path = os.environ.get('HOME') or os.environ.get('HOMEPATH')
if os.path.exists(home_path + "/.tencentcloud/credentials"):
file_path = home_path + "/.tencentcloud/credentials"
elif os.path.exists("/etc/tencentcloud/credentials"):
file_path = "/etc/tencentcloud/credentials"
else:
file_path = ""
if file_path:
# loads config
conf = configparser.ConfigParser()
conf.read(file_path)
ini_map = dict(conf._sections)
for k in dict(conf._sections):
option = dict(ini_map[k])
for key, value in dict(ini_map[k]).items():
option[key] = value.strip()
ini_map[k] = option
if "default" in ini_map:
client_config = ini_map.get("default")
self.secret_id = client_config.get('secret_id', None)
self.secret_key = client_config.get('secret_key', None)
self.role_arn = client_config.get('role_arn', None)
else:
self.secret_id = None
self.secret_key = None
self.role_arn = None
if self.secret_id is None or self.secret_key is None:
return None
if len(self.secret_id) == 0 or len(self.secret_key) == 0:
return None
return Credential(self.secret_id, self.secret_key)
class DefaultCredentialProvider(object):
"""Tencent Cloud DefaultCredentialProvider.
DefaultCredentialProvider will search credential by order EnvironmentVariableCredential ProfileCredential
and CVMRoleCredential.
"""
def __init__(self):
self.cred = None
def get_credential(self):
return self.get_credentials()
def get_credentials(self):
if self.cred is not None:
return self.cred
self.cred = EnvironmentVariableCredential().get_credential()
if self.cred is not None:
return self.cred
self.cred = ProfileCredential().get_credential()
if self.cred is not None:
return self.cred
self.cred = CVMRoleCredential().get_credential()
if self.cred is not None:
return self.cred
self.cred = DefaultTkeOIDCRoleArnProvider().get_credential()
if self.cred is not None:
return self.cred
raise TencentCloudSDKException("ClientSideError", "no valid credentail.")
class DefaultTkeOIDCRoleArnProvider(object):
def get_credential(self):
return self.get_credentials()
def get_credentials(self):
cred = OIDCRoleArnCredential('', '', '', '', '', 7200)
cred._is_tke = True
cred._init_from_tke()
return cred
class OIDCRoleArnCredential(object):
_version = '2018-08-13'
_service = "sts"
_action = 'AssumeRoleWithWebIdentity'
_default_session_name = 'tencentcloud-python-sdk-'
def __init__(self, region, provider_id, web_identity_token, role_arn, role_session_name, duration_seconds=7200):
self._region = region
self._provider_id = provider_id
self._web_identity_token = web_identity_token
self._role_arn = role_arn
self._role_session_name = role_session_name
self._duration_seconds = duration_seconds
self._token = None
self._tmp_secret_id = None
self._tmp_secret_key = None
self._expired_time = 0
self._is_tke = False
@property
def secretId(self):
self._keep_fresh()
return self._tmp_secret_id
@property
def secretKey(self):
self._keep_fresh()
return self._tmp_secret_key
@property
def secret_id(self):
self._keep_fresh()
return self._tmp_secret_id
@property
def secret_key(self):
self._keep_fresh()
return self._tmp_secret_key
@property
def token(self):
self._keep_fresh()
return self._token
def _keep_fresh(self):
if None in [self._token, self._tmp_secret_key, self._tmp_secret_id] or self._expired_time < int(time.time()):
self.refresh()
def refresh(self):
if self._is_tke:
self._init_from_tke()
common_client = CommonClient(credential=None, region=self._region, version=self._version, service=self._service)
params = {
"ProviderId": self._provider_id,
"WebIdentityToken": self._web_identity_token,
"RoleArn": self._role_arn,
"RoleSessionName": self._role_session_name,
"DurationSeconds": self._duration_seconds,
}
options = {'SkipSign': True}
rsp = common_client.call_json(self._action, params, options=options)
self._token = rsp["Response"]["Credentials"]["Token"]
self._tmp_secret_id = rsp["Response"]["Credentials"]["TmpSecretId"]
self._tmp_secret_key = rsp["Response"]["Credentials"]["TmpSecretKey"]
self._expired_time = rsp["Response"]["ExpiredTime"] - self._duration_seconds * 0.1
def _init_from_tke(self):
self._region = os.getenv('TKE_REGION')
if not self._region:
raise EnvironmentError("TKE_REGION not exist")
self._provider_id = os.getenv('TKE_PROVIDER_ID')
if not self._provider_id:
raise EnvironmentError("TKE_PROVIDER_ID not exist")
token_file = os.getenv('TKE_WEB_IDENTITY_TOKEN_FILE')
if not token_file:
raise EnvironmentError("TKE_WEB_IDENTITY_TOKEN_FILE not exist")
with open(token_file) as f:
self._web_identity_token = f.read()
self._role_arn = os.getenv('TKE_ROLE_ARN')
if not self._role_arn:
raise EnvironmentError("TKE_ROLE_ARN not exist")
self._role_session_name = self._default_session_name + str(time.time() * 1e6) # time in microsecond