"""ACMPCABackend class with methods for supported APIs."""
import base64
import datetime
from typing import Any, Dict, List, Optional, Tuple, cast
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.core.utils import unix_time, utcnow
from moto.moto_api._internal import mock_random
from moto.utilities.tagging_service import TaggingService
from moto.utilities.utils import get_partition
from .exceptions import (
InvalidS3ObjectAclInCrlConfiguration,
InvalidStateException,
MalformedCertificateAuthorityException,
ResourceNotFoundException,
)
class CertificateAuthority(BaseModel):
    """In-memory model of a single ACM PCA certificate authority.

    On construction the CA generates its own 2048-bit RSA key (stored as an
    encrypted PEM blob together with a random password) and starts in the
    ``PENDING_CERTIFICATE`` status. It becomes ``ACTIVE`` once a CA certificate
    is imported via :meth:`import_certificate_authority_certificate`.
    Certificates issued by this CA are kept in ``issued_certificates``, keyed
    by their ARN.
    """

    # Accepted values for ``CrlConfiguration.S3ObjectAcl``.
    _VALID_S3_OBJECT_ACLS = ("PUBLIC_READ", "BUCKET_OWNER_FULL_CONTROL")

    # Keys of the configured ``Subject`` and the X.509 name attribute each one
    # maps to, listed in the order the attributes are added to the name.
    _SUBJECT_ATTRIBUTE_OIDS = (
        ("Country", x509.NameOID.COUNTRY_NAME),
        ("State", x509.NameOID.STATE_OR_PROVINCE_NAME),
        ("Organization", x509.NameOID.ORGANIZATION_NAME),
        ("OrganizationalUnit", x509.NameOID.ORGANIZATIONAL_UNIT_NAME),
        ("CommonName", x509.NameOID.COMMON_NAME),
    )

    def __init__(
        self,
        region: str,
        account_id: str,
        certificate_authority_configuration: Dict[str, Any],
        certificate_authority_type: str,
        revocation_configuration: Dict[str, Any],
        security_standard: Optional[str],
    ):
        """Create a new CA in the ``PENDING_CERTIFICATE`` state.

        :param region: region the CA lives in; used for the ARN.
        :param account_id: owning account; used for the ARN.
        :param certificate_authority_configuration: configuration as supplied
            by the caller; its optional ``Subject`` entry drives :attr:`issuer`.
        :param certificate_authority_type: stored and echoed back as ``Type``.
        :param revocation_configuration: optional revocation configuration,
            validated by :meth:`set_revocation_configuration`.
        :param security_standard: key storage security standard; falls back to
            ``FIPS_140_2_LEVEL_3_OR_HIGHER`` when falsy.
        :raises InvalidS3ObjectAclInCrlConfiguration: if the revocation
            configuration carries an unsupported ``S3ObjectAcl``.
        """
        self.id = mock_random.uuid4()
        self.arn = f"arn:{get_partition(region)}:acm-pca:{region}:{account_id}:certificate-authority/{self.id}"
        self.account_id = account_id
        self.region_name = region
        self.certificate_authority_configuration = certificate_authority_configuration
        self.certificate_authority_type = certificate_authority_type
        # Default used when the caller supplies no revocation configuration.
        self.revocation_configuration: Dict[str, Any] = {
            "CrlConfiguration": {"Enabled": False}
        }
        self.set_revocation_configuration(revocation_configuration)
        self.created_at = unix_time()
        self.updated_at: Optional[float] = None
        self.status = "PENDING_CERTIFICATE"
        self.usage_mode = "SHORT_LIVED_CERTIFICATE"
        self.security_standard = security_standard or "FIPS_140_2_LEVEL_3_OR_HIGHER"

        # The private key is only ever held in encrypted PEM form; the random
        # password generated here is what :attr:`key` uses to load it again.
        self.password = str(mock_random.uuid4()).encode("utf-8")
        private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
        self.private_bytes = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.BestAvailableEncryption(self.password),
        )

        self.certificate_bytes: bytes = b""
        self.certificate_chain: Optional[bytes] = None
        self.issued_certificates: Dict[str, bytes] = {}
        self.subject = self.certificate_authority_configuration.get("Subject", {})

    def generate_cert(
        self,
        subject: x509.Name,
        public_key: rsa.RSAPublicKey,
        extensions: List[Tuple[x509.ExtensionType, bool]],
    ) -> bytes:
        """Build and sign a certificate with this CA's key.

        The certificate is valid for 365 days starting now, carries a random
        serial number, uses :attr:`issuer` as issuer name and is signed with
        SHA-512.

        :param subject: subject name of the new certificate.
        :param public_key: public key to certify.
        :param extensions: ``(extension, critical)`` pairs added in order.
        :returns: the PEM-encoded certificate.
        """
        # Take a single timestamp so the validity window is exactly 365 days
        # instead of 365 days plus the time between two clock reads.
        now = utcnow()
        builder = (
            x509.CertificateBuilder()
            .subject_name(subject)
            .issuer_name(self.issuer)
            .public_key(public_key)
            .serial_number(x509.random_serial_number())
            .not_valid_before(now)
            .not_valid_after(now + datetime.timedelta(days=365))
        )
        for extension, critical in extensions:
            builder = builder.add_extension(extension, critical)
        cert = builder.sign(self.key, hashes.SHA512(), default_backend())
        return cert.public_bytes(serialization.Encoding.PEM)

    @property
    def key(self) -> rsa.RSAPrivateKey:
        """Load the CA's private key from its encrypted PEM representation."""
        private_key = serialization.load_pem_private_key(
            self.private_bytes,
            password=self.password,
        )
        return cast(rsa.RSAPrivateKey, private_key)

    @property
    def certificate(self) -> Optional[x509.Certificate]:
        """Return the imported CA certificate, or ``None`` if none was imported.

        The PEM bytes are parsed on every access.
        """
        if self.certificate_bytes:
            return x509.load_pem_x509_certificate(self.certificate_bytes)
        return None

    @property
    def issuer(self) -> x509.Name:
        """Build the X.509 name from the configured ``Subject``.

        Only the keys Country, State, Organization, OrganizationalUnit and
        CommonName are used, in that order; absent keys are skipped.
        """
        return x509.Name(
            [
                x509.NameAttribute(oid, self.subject[field])
                for field, oid in self._SUBJECT_ATTRIBUTE_OIDS
                if field in self.subject
            ]
        )

    @property
    def csr(self) -> bytes:
        """Return a PEM-encoded CSR for this CA.

        The request uses :attr:`issuer` as its subject, carries a critical
        ``BasicConstraints(ca=True)`` extension and is signed with SHA-256.
        A new CSR is built on every access.
        """
        csr = (
            x509.CertificateSigningRequestBuilder()
            .subject_name(self.issuer)
            .add_extension(
                x509.BasicConstraints(ca=True, path_length=None),
                critical=True,
            )
            .sign(self.key, hashes.SHA256())
        )
        return csr.public_bytes(serialization.Encoding.PEM)

    def issue_certificate(self, csr_bytes: bytes, template_arn: Optional[str]) -> str:
        """Issue a certificate for the given CSR and remember it.

        :param csr_bytes: base64-encoded PEM certificate signing request.
        :param template_arn: certificate template ARN selecting the extensions
            (see :meth:`_x509_extensions`); may be ``None``.
        :returns: the ARN under which the new certificate is stored.
        """
        csr = x509.load_pem_x509_csr(base64.b64decode(csr_bytes))
        extensions = self._x509_extensions(csr, template_arn)
        new_cert = self.generate_cert(
            subject=csr.subject,
            public_key=csr.public_key(),  # type: ignore[arg-type]
            extensions=extensions,
        )
        cert_id = str(mock_random.uuid4()).replace("-", "")
        cert_arn = f"arn:{get_partition(self.region_name)}:acm-pca:{self.region_name}:{self.account_id}:certificate-authority/{self.id}/certificate/{cert_id}"
        self.issued_certificates[cert_arn] = new_cert
        return cert_arn

    def _x509_extensions(
        self, csr: x509.CertificateSigningRequest, template_arn: Optional[str]
    ) -> List[Tuple[x509.ExtensionType, bool]]:
        """
        Uses a PCA certificate template ARN to return a list of X.509 extensions.
        These extensions are part of the constructed certificate.
        See https://docs.aws.amazon.com/privateca/latest/userguide/UsingTemplates.html

        Only the RootCACertificate/V1 and EndEntityCertificate/V1 templates are
        recognised; a missing template is treated as EndEntityCertificate/V1.
        Any other template contributes no template-specific extensions.
        Independently of the template, a SubjectAlternativeName holding the
        CSR's common name as a DNS name is appended when the CSR has one.

        :returns: ``(extension, critical)`` pairs in the order they are added.
        """
        extensions = []
        if template_arn == "arn:aws:acm-pca:::template/RootCACertificate/V1":
            extensions.extend(
                [
                    (
                        x509.BasicConstraints(ca=True, path_length=None),
                        True,
                    ),
                    (
                        x509.KeyUsage(
                            crl_sign=True,
                            key_cert_sign=True,
                            digital_signature=True,
                            content_commitment=False,
                            key_encipherment=False,
                            data_encipherment=False,
                            key_agreement=False,
                            encipher_only=False,
                            decipher_only=False,
                        ),
                        True,
                    ),
                    (
                        x509.SubjectKeyIdentifier.from_public_key(csr.public_key()),
                        False,
                    ),
                ]
            )
        elif template_arn in (
            "arn:aws:acm-pca:::template/EndEntityCertificate/V1",
            None,
        ):
            extensions.extend(
                [
                    (
                        x509.BasicConstraints(ca=False, path_length=None),
                        True,
                    ),
                    (
                        x509.AuthorityKeyIdentifier.from_issuer_public_key(
                            self.key.public_key()
                        ),
                        False,
                    ),
                    (
                        x509.SubjectKeyIdentifier.from_public_key(csr.public_key()),
                        False,
                    ),
                    (
                        x509.KeyUsage(
                            crl_sign=False,
                            key_cert_sign=False,
                            digital_signature=True,
                            content_commitment=False,
                            key_encipherment=True,
                            data_encipherment=False,
                            key_agreement=False,
                            encipher_only=False,
                            decipher_only=False,
                        ),
                        True,
                    ),
                    (
                        x509.ExtendedKeyUsage(
                            [
                                x509.ExtendedKeyUsageOID.SERVER_AUTH,
                                x509.ExtendedKeyUsageOID.CLIENT_AUTH,
                            ]
                        ),
                        False,
                    ),
                ]
            )
        cn = csr.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
        if cn:
            extensions.append(
                (
                    x509.SubjectAlternativeName([x509.DNSName(cn[0].value)]),  # type: ignore[arg-type]
                    False,
                ),
            )
        return extensions

    def get_certificate(self, certificate_arn: str) -> bytes:
        """Return the PEM bytes of a certificate issued by this CA.

        :raises ResourceNotFoundException: if no certificate with that ARN was
            issued by this CA.
        """
        try:
            return self.issued_certificates[certificate_arn]
        except KeyError:
            # Report an unknown ARN the same way unknown CAs are reported,
            # rather than leaking a bare KeyError to the caller.
            raise ResourceNotFoundException(certificate_arn) from None

    def set_revocation_configuration(
        self, revocation_configuration: Optional[Dict[str, Any]]
    ) -> None:
        """Validate and store a new revocation configuration.

        ``None`` leaves the current configuration untouched. When the
        configuration has a ``CrlConfiguration`` without ``S3ObjectAcl``, the
        ACL defaults to ``PUBLIC_READ`` (written into the supplied dict).

        :raises InvalidS3ObjectAclInCrlConfiguration: if ``S3ObjectAcl`` is
            set to an unsupported value; the stored configuration is left
            unchanged in that case.
        """
        if revocation_configuration is None:
            return
        if "CrlConfiguration" in revocation_configuration:
            crl_configuration = revocation_configuration["CrlConfiguration"]
            acl = crl_configuration.get("S3ObjectAcl", None)
            if acl is None:
                crl_configuration["S3ObjectAcl"] = "PUBLIC_READ"
            elif acl not in self._VALID_S3_OBJECT_ACLS:
                raise InvalidS3ObjectAclInCrlConfiguration(acl)
        # Only replace the stored configuration once it is known to be valid.
        self.revocation_configuration = revocation_configuration

    @property
    def not_valid_after(self) -> Optional[float]:
        """Expiry of the CA certificate as a Unix timestamp, or ``None``."""
        certificate = self.certificate  # parse the PEM only once
        if certificate is None:
            return None
        try:
            expiry = certificate.not_valid_after_utc.replace(tzinfo=None)
        except AttributeError:
            # Fall back for certificate objects without the ``*_utc`` accessor.
            expiry = certificate.not_valid_after
        return unix_time(expiry)

    @property
    def not_valid_before(self) -> Optional[float]:
        """Start of validity of the CA certificate as a Unix timestamp, or ``None``."""
        certificate = self.certificate  # parse the PEM only once
        if certificate is None:
            return None
        try:
            start = certificate.not_valid_before_utc.replace(tzinfo=None)
        except AttributeError:
            # Fall back for certificate objects without the ``*_utc`` accessor.
            start = certificate.not_valid_before
        return unix_time(start)

    def import_certificate_authority_certificate(
        self, certificate: bytes, certificate_chain: Optional[bytes]
    ) -> None:
        """Install the CA certificate and mark the CA ``ACTIVE``.

        :param certificate: PEM-encoded CA certificate.
        :param certificate_chain: optional chain, stored as given.
        :raises MalformedCertificateAuthorityException: if ``certificate``
            cannot be parsed as a PEM X.509 certificate.
        """
        try:
            x509.load_pem_x509_certificate(certificate)
        except ValueError as err:
            raise MalformedCertificateAuthorityException() from err
        self.certificate_bytes = certificate
        self.certificate_chain = certificate_chain
        self.status = "ACTIVE"
        self.updated_at = unix_time()

    def to_json(self) -> Dict[str, Any]:
        """Serialise the CA into the dictionary returned to API callers.

        ``LastStateChangeAt`` is only present once the CA has been updated,
        ``NotBefore``/``NotAfter`` only once a CA certificate was imported.
        """
        dct: Dict[str, Any] = {
            "Arn": self.arn,
            "OwnerAccount": self.account_id,
            "CertificateAuthorityConfiguration": self.certificate_authority_configuration,
            "Type": self.certificate_authority_type,
            "RevocationConfiguration": self.revocation_configuration,
            "CreatedAt": self.created_at,
            "Status": self.status,
            "UsageMode": self.usage_mode,
            "KeyStorageSecurityStandard": self.security_standard,
        }
        # Compare against None so that a timestamp of 0.0 is still reported.
        if self.updated_at is not None:
            dct["LastStateChangeAt"] = self.updated_at
        # ``certificate`` is non-None exactly when certificate bytes are set;
        # checking the bytes avoids an extra PEM parse.
        if self.certificate_bytes:
            dct["NotBefore"] = self.not_valid_before
            dct["NotAfter"] = self.not_valid_after
        return dct
class ACMPCABackend(BaseBackend):
    """Implementation of ACMPCA APIs."""

    def __init__(self, region_name: str, account_id: str):
        super().__init__(region_name, account_id)
        # Certificate authorities known to this backend, keyed by CA ARN.
        self.certificate_authorities: Dict[str, CertificateAuthority] = {}
        self.tagger = TaggingService()

    def create_certificate_authority(
        self,
        certificate_authority_configuration: Dict[str, Any],
        revocation_configuration: Dict[str, Any],
        certificate_authority_type: str,
        security_standard: Optional[str],
        tags: List[Dict[str, str]],
    ) -> str:
        """
        The following parameters are not yet implemented: IdempotencyToken, UsageMode
        """
        authority = CertificateAuthority(
            region=self.region_name,
            account_id=self.account_id,
            certificate_authority_configuration=certificate_authority_configuration,
            certificate_authority_type=certificate_authority_type,
            revocation_configuration=revocation_configuration,
            security_standard=security_standard,
        )
        self.certificate_authorities[authority.arn] = authority
        if tags:
            self.tagger.tag_resource(authority.arn, tags)
        return authority.arn

    def describe_certificate_authority(
        self, certificate_authority_arn: str
    ) -> CertificateAuthority:
        """Return the CA with the given ARN.

        :raises ResourceNotFoundException: if no CA with that ARN exists.
        """
        if certificate_authority_arn not in self.certificate_authorities:
            raise ResourceNotFoundException(certificate_authority_arn)
        return self.certificate_authorities[certificate_authority_arn]

    def get_certificate_authority_certificate(
        self, certificate_authority_arn: str
    ) -> Tuple[bytes, Optional[bytes]]:
        """Return the CA's certificate and chain.

        :raises ResourceNotFoundException: if the CA does not exist.
        :raises InvalidStateException: if the CA is not ``ACTIVE``.
        """
        ca = self.describe_certificate_authority(certificate_authority_arn)
        if ca.status != "ACTIVE":
            raise InvalidStateException(certificate_authority_arn)
        return ca.certificate_bytes, ca.certificate_chain

    def get_certificate_authority_csr(self, certificate_authority_arn: str) -> bytes:
        """Return the PEM-encoded CSR of the given CA.

        :raises ResourceNotFoundException: if the CA does not exist.
        """
        ca = self.describe_certificate_authority(certificate_authority_arn)
        return ca.csr

    def list_tags(
        self, certificate_authority_arn: str
    ) -> Dict[str, List[Dict[str, str]]]:
        """
        Pagination is not yet implemented
        """
        return self.tagger.list_tags_for_resource(certificate_authority_arn)

    def update_certificate_authority(
        self,
        certificate_authority_arn: str,
        revocation_configuration: Dict[str, Any],
        status: str,
    ) -> None:
        """Update the revocation configuration and/or status of a CA.

        A ``status`` of ``None`` leaves the current status unchanged.

        :raises ResourceNotFoundException: if the CA does not exist.
        :raises InvalidS3ObjectAclInCrlConfiguration: if the revocation
            configuration carries an unsupported ``S3ObjectAcl``.
        """
        ca = self.describe_certificate_authority(certificate_authority_arn)
        # Apply the revocation configuration first: it is the only step that
        # can raise, and a rejected request must not also change the status.
        ca.set_revocation_configuration(revocation_configuration)
        if status is not None:
            ca.status = status
        ca.updated_at = unix_time()

    def delete_certificate_authority(self, certificate_authority_arn: str) -> None:
        """Mark the CA as ``DELETED``; the entry itself is kept.

        :raises ResourceNotFoundException: if the CA does not exist.
        """
        ca = self.describe_certificate_authority(certificate_authority_arn)
        ca.status = "DELETED"

    def issue_certificate(
        self, certificate_authority_arn: str, csr: bytes, template_arn: Optional[str]
    ) -> str:
        """
        The following parameters are not yet implemented: ApiPassthrough, SigningAlgorithm, Validity, ValidityNotBefore, IdempotencyToken
        Some fields of the resulting certificate will have default values, instead of using the CSR
        """
        ca = self.describe_certificate_authority(certificate_authority_arn)
        return ca.issue_certificate(csr, template_arn)

    def get_certificate(
        self, certificate_authority_arn: str, certificate_arn: str
    ) -> Tuple[bytes, Optional[str]]:
        """
        The CertificateChain will always return None for now
        """
        ca = self.describe_certificate_authority(certificate_authority_arn)
        certificate = ca.get_certificate(certificate_arn)
        certificate_chain = None
        return certificate, certificate_chain

    def import_certificate_authority_certificate(
        self,
        certificate_authority_arn: str,
        certificate: bytes,
        certificate_chain: Optional[bytes],
    ) -> None:
        """Import the CA certificate (and optional chain) into the given CA.

        :raises ResourceNotFoundException: if the CA does not exist.
        """
        ca = self.describe_certificate_authority(certificate_authority_arn)
        ca.import_certificate_authority_certificate(certificate, certificate_chain)

    def revoke_certificate(
        self,
        certificate_authority_arn: str,
        certificate_serial: str,
        revocation_reason: str,
    ) -> None:
        """
        This is currently a NO-OP
        """

    def tag_certificate_authority(
        self, certificate_authority_arn: str, tags: List[Dict[str, str]]
    ) -> None:
        """Attach the given tags to the CA ARN via the tagging service."""
        self.tagger.tag_resource(certificate_authority_arn, tags)

    def untag_certificate_authority(
        self, certificate_authority_arn: str, tags: List[Dict[str, str]]
    ) -> None:
        """Remove the given tags from the CA ARN via the tagging service."""
        self.tagger.untag_resource_using_tags(certificate_authority_arn, tags)
# Module-level BackendDict of ACMPCABackend instances for the "acm-pca" service.
# NOTE(review): presumably keyed by account id and region — confirm against BackendDict.
acmpca_backends = BackendDict(ACMPCABackend, "acm-pca")