Skip to content

dynamodb

Sample rules

A few rules that use objects from this package:

endpoint_dynamodb_route_table_exposure
from abc import abstractmethod
from typing import List, Dict
from cloudrail.knowledge.context.aws.resources.prefix_lists import PrefixLists, PrefixList
from cloudrail.knowledge.context.aws.resources.ec2.route import Route
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.context.aws.resources.service_name import AwsServiceType
from cloudrail.knowledge.rules.aws.context_aware.vpc_endpoints.abstract_vpc_endpoint_gateway_rule import AbstractVpcEndpointGatewayRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType



class AbstractVpcEndpointRouteTableExposureRule(AbstractVpcEndpointGatewayRule):
    """Base rule reporting subnets whose route table lacks a valid route to the VPC's gateway endpoint.

    Concrete subclasses choose the AWS service through the base-class
    initializer and provide the rule identifier through ``get_id``.
    """

    @abstractmethod
    def get_id(self) -> str:
        """Return the identifier of the concrete rule."""

    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        """Collect one issue per subnet that does not route the service through the endpoint gateway.

        A VPC is evaluated when at least one of its ENIs satisfies
        ``_is_service_eni_match``. For every subnet of such a VPC an issue is
        raised when the VPC has endpoints and either no most-specific route for
        the service prefix list exists in the subnet's route table, or that
        route is rejected by ``_is_valid_vpc_endpoint_route``.

        Args:
            env_context: The environment context the maps and prefix lists are built from.
            parameters: Rule parameters; not read by this implementation.

        Returns:
            The list of issues found (empty when nothing is exposed).
        """
        vpc_list, _, vpc_to_eni_map = self._init_maps(env_context)
        region_to_prefix_lists_map: Dict[str, PrefixLists] = self._create_prefix_list_by_region_map(env_context)
        issues_list: List[Issue] = []

        for vpc in vpc_list:
            # One matching ENI is enough to evaluate the VPC exactly once.
            # The earlier loop broke out unconditionally after the first ENI, so a
            # VPC whose first ENI did not match was skipped even if a later one did.
            if not any(self._is_service_eni_match(eni) for eni in vpc_to_eni_map.get(vpc, [])):
                continue

            for subnet in vpc.subnets:
                prefix_list: PrefixLists = region_to_prefix_lists_map[vpc.region]
                aws_service_pl: PrefixList = prefix_list.get_prefix_lists_by_service(self.aws_service_type.value)
                most_specific_service_pl_route: Route = self._get_most_specific_service_pl_route(subnet.route_table, aws_service_pl)
                has_valid_endpoint_route = bool(
                    most_specific_service_pl_route and
                    self._is_valid_vpc_endpoint_route(most_specific_service_pl_route, aws_service_pl, subnet.vpc.endpoints))
                # Only VPCs that actually own endpoints can be "missing" a route to one.
                if vpc.endpoints and not has_valid_endpoint_route:
                    issues_list.append(Issue(f"~The {vpc.get_type()}~. `{vpc.get_friendly_name()}` "
                                             f"has a {aws_service_pl.service_name.upper()} Endpoint gateway "
                                             f"but `{subnet.get_friendly_name()}` uses `{subnet.route_table.get_friendly_name()}`"
                                             f", which does not have a route to the Endpoint gateway.", subnet.vpc, subnet.route_table))

        return issues_list

    def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
        """Return True only when the environment context contains VPC endpoints."""
        return bool(environment_context.vpc_endpoints)


class S3VpcEndpointRouteTableExposureRule(AbstractVpcEndpointRouteTableExposureRule):
    """Route-table exposure rule specialised for the S3 service."""

    def get_id(self) -> str:
        """Return the identifier of the S3 route-table exposure rule."""
        return "endpoint_s3_route_table_exposure"

    def __init__(self) -> None:
        """Configure the base rule for S3 with ports 443 and 80 and the S3 exclude list."""
        s3_ports = (443, 80)
        # NOTE(review): the trailing False presumably marks the list as an
        # exclude list rather than an include list - confirm against the base class.
        super().__init__(AwsServiceType.S3, s3_ports, self.S3_SERVICES_EXCLUDE_LIST, False)


class DynamoDbVpcEndpointRouteTableExposureRule(AbstractVpcEndpointRouteTableExposureRule):
    """Route-table exposure rule specialised for the DynamoDB service."""

    def get_id(self) -> str:
        """Return the identifier of the DynamoDB route-table exposure rule."""
        return "endpoint_dynamodb_route_table_exposure"

    def __init__(self) -> None:
        """Configure the base rule for DynamoDB with port 443 and the DynamoDB include list."""
        dynamodb_ports = (443,)
        # NOTE(review): the trailing True presumably marks the list as an
        # include list rather than an exclude list - confirm against the base class.
        super().__init__(AwsServiceType.DYNAMODB, dynamodb_ports, self.DYNAMODB_SERVICES_INCLUDE_LIST, True)

BillingMode (str, Enum)

An enumeration of the table billing modes: PROVISIONED or PAY_PER_REQUEST.

DynamoDbTable (AwsResource)

Attributes:

Name Type Description
table_name str

The name of the table.

table_arn str

The ARN of the table.

billing_mode BillingMode

One of PROVISIONED or PAY_PER_REQUEST.

partition_key str

The partition key used.

sort_key str

The sort key used.

write_capacity int

The write capacity configured.

read_capacity int

The read capacity configured.

fields_attributes List[TableField]

The list of table field attributes (may be empty).

kms_key_id Optional[str]

The KMS key ID to use to encrypt this table, if one is used.

kms_data Optional[KmsKey]

The actual KmsKey object referenced by the KMS ID.

server_side_encryption bool

True if SSE is enabled.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

TableField dataclass

Attributes:

Name Type Description
name str

The field's name.

type TableFieldType

The field type (one of B for Binary, N for Number, S for String).

TableFieldType (str, Enum)

An enumeration of the table field type codes: B, N or S.