Skip to content

rds

Sample rules

A few rules that use objects from this package:

public_access_db_rds_rule
from typing import Dict, List

from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType


class PublicAccessDbRdsRule(AwsBaseRule):

    def get_id(self) -> str:
        return 'public_access_db_rds_rule'

    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        issues: List[Issue] = []

        for rds_cluster in env_context.rds_clusters:
            for rds_instance in rds_cluster.cluster_instances:
                security_group = rds_instance.security_group_allowing_public_access
                if security_group:
                    issues.append(Issue(
                        f'~Internet~. '
                        f"Instance `{rds_instance.get_friendly_name()}` is "
                        f"in {rds_cluster.get_type()} `{rds_cluster.get_friendly_name()}`. "
                        f"{rds_instance.get_type()} is on {rds_instance.network_resource.vpc.get_type()}"
                        f" `{rds_instance.network_resource.vpc.get_friendly_name()}`. "
                        f"{rds_instance.get_type()} uses subnet(s) "
                        f"`{', '.join([x.get_friendly_name() for x in rds_instance.network_resource.security_groups])}`. "
                        f"{rds_instance.get_type()} is reachable from the internet due to subnet(s) and route table(s). "
                        f"Subnet uses Network ACL's "
                        f"`{', '.join([x.network_acl.get_friendly_name() for x in rds_instance.network_resource.subnets])}`. "
                        f"Network ACL's and security group(s) allows the RDS configured ports. "
                        f'~{rds_instance.get_type()}~',
                        rds_cluster, security_group))

        for rds_instance in (x for x in env_context.rds_instances if x.db_cluster_id is None):
            security_group = rds_instance.security_group_allowing_public_access
            if security_group:
                issues.append(Issue(
                    f"~Internet~. {rds_instance.get_type()} `{rds_instance.get_friendly_name()}` "
                    f"is on {rds_instance.network_resource.vpc.get_type()} "
                    f"`{rds_instance.network_resource.vpc.get_friendly_name()}`. "
                    f'instance uses security groups '
                    f"`{', '.join([x.get_friendly_name() for x in rds_instance.network_resource.security_groups])}` . "
                    f"{rds_instance.get_type()} uses the subnets "
                    f"`{', '.join([x.get_friendly_name() for x in rds_instance.network_resource.subnets])}`. "
                    f"{rds_instance.get_type()} is reachable from the internet due to subnet(s) and route table(s). "
                    f"Subnet uses Network ACL's "
                    f"`{', '.join([x.network_acl.get_friendly_name() for x in rds_instance.network_resource.subnets])}`. "
                    f"Network ACL's and security group(s) allows the RDS configured ports. "
                    f"~{rds_instance.get_type()}~", rds_instance, security_group))

        return issues

    def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
        return bool(environment_context.rds_clusters or environment_context.rds_instances)
indirect_public_access_db_rds
from typing import List, Dict, Optional

from cloudrail.knowledge.context.aws.resources.indirect_public_connection_data import IndirectPublicConnectionData
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType


class IndirectPublicAccessDbRds(AwsBaseRule):

    def get_id(self) -> str:
        return 'indirect_public_access_db_rds'

    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        issues: List[Issue] = []

        for rds_cluster in env_context.rds_clusters:
            for rds_instance in rds_cluster.cluster_instances:
                violation_info: Optional[IndirectPublicConnectionData] = rds_instance.indirect_public_connection_data
                if violation_info:
                    issues.append(Issue(
                        f"~Internet~. "
                        f"Instance resides in subnet(s) that are routable to internet gateway. Instance has public IP address."
                        f"Instance accepts incoming traffic on port 443. "
                        f"~Instance `{violation_info.target_eni.owner.get_friendly_name()}`~. "
                        f"{rds_cluster.get_type()} `{rds_cluster.get_friendly_name()}` "
                        f"is exposed due to {rds_instance.get_type()} `{rds_instance.get_friendly_name()}`. "
                        f"{rds_instance.get_type()} uses subnets "
                        f"`{', '.join([x.get_friendly_name() for x in rds_instance.network_resource.subnets])}`. "
                        f"{rds_instance.get_type()} "
                        f"resides in same subnet as instance `{violation_info.target_eni.owner.get_friendly_name()}`. "
                        f"{rds_instance.get_type()} relies on Network ACL's "
                        f"`{', '.join([x.network_acl.get_friendly_name() for x in rds_instance.network_resource.subnets])}`. "
                        f"{rds_instance.get_type()} also relies on security groups "
                        f"`{', '.join([x.get_friendly_name() for x in rds_instance.network_resource.security_groups])}`. "
                        f"{rds_instance.get_type()} is accessible from instance within public subnet. "
                        f"~{rds_instance.get_type()} `{rds_instance.get_friendly_name()}`~. ",
                        rds_cluster,
                        violation_info.security_group))

        for rds_instance in (x for x in env_context.rds_instances if x.db_cluster_id is None):
            violation_info: Optional[IndirectPublicConnectionData] = rds_instance.indirect_public_connection_data
            if violation_info:
                issues.append(Issue(
                    f"~Internet~. "
                    f"Instance resides in subnet(s) that are routable to internet gateway. Instance has public IP address. "
                    f"Instance accepts incoming traffic on port 443. "
                    f"~Instance `{violation_info.target_eni.owner.get_friendly_name()}`~. "
                    f"{rds_instance.get_type()} `{rds_instance.get_friendly_name()}` does not have public IP associated. "
                    f"{rds_instance.get_type()} is on subnets: "
                    f"`{', '.join([x.get_friendly_name() for x in rds_instance.network_resource.subnets])}`. "
                    f"{rds_instance.get_type()} resides in same subnet as instance `{violation_info.target_eni.owner.name}`. "
                    f"{rds_instance.get_type()} relies on Network ACL's "
                    f"`{', '.join([x.network_acl.get_friendly_name() for x in rds_instance.network_resource.subnets])}`. "
                    f"{rds_instance.get_type()} also relies on security groups "
                    f"`{', '.join([x.get_friendly_name() for x in rds_instance.network_resource.security_groups])}`. "
                    f"{rds_instance.get_type()} is accessible from instance within public subnet. "
                    f"~{rds_instance.get_type()}~",
                    rds_instance,
                    violation_info.security_group))
        return issues

    def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
        return bool(environment_context.rds_clusters or environment_context.rds_instances)

DbSubnetGroup (AwsResource)

Attributes:

Name Type Description
name str

The name of the subnet group.

subnet_ids List[str]

The IDs of the subnets in the group.

db_subnet_group_arn str

The ARN of the subnet group.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

RdsCluster (ConnectionInstance, AwsResource)

Attributes:

Name Type Description
cluster_id str

The ID of the cluster.

arn str

The ARN of the RDS cluster.

port int

The port the cluster is configured to listen to.

db_subnet_group_name str

The name of DB subnet group used.

security_group_ids List[str]

The IDs of the security groups used by this database.

is_in_default_vpc bool

True if the RDS is in the default VPC.

encrypted_at_rest bool

True if the database is configured to be encrypted at rest.

backup_retention_period int

Number of days to retain backups.

engine_type str

The Database engine name to be used for this RDS cluster.

engine_version str

The Database engine version to be used for this RDS cluster.

iam_database_authentication_enabled bool

An indication whether authentication to the RDS cluster using IAM entities is enabled.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

RdsGlobalCluster (AwsResource)

Attributes:

Name Type Description
cluster_id str

The ID of the cluster.

encrypted_at_rest bool

True if the cluster is set to be encrypted at rest.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

RdsGlobalClusterRawData dataclass

RdsGlobalClusterRawData(source_id: 'Optional[str]' = None)

RdsInstance (NetworkEntity, INetworkConfiguration)

Attributes:

Name Type Description
name

The name of the instance.

arn str

The ARN of the instance.

port int

The port the instance is listening on.

publicly_accessible bool

True if the database is configured to have a public IP address.

db_subnet_group_name str

The name of the SB subnet group.

security_group_ids List[str]

The IDs of the security groups in use with the instance.

db_cluster_id Optional[str]

The cluster ID, if this instance is part of a cluster, or None otherwise.

instance_id Optional[str]

The RDS instance ID, if this instance is a standalone DB, or None otherwise.

encrypted_at_rest bool

True is encryption at rest is enabled.

performance_insights_enabled bool

True if performance insights is enabled.

performance_insights_kms_key Optional[str]

The ARN of the KMS Key used to encrypt the performance insights, if any is used.

performance_insights_kms_data Optional[KmsKey]

The actual KMS Key object, if a KMS key is used to encrypt performance insights.

security_group_allowing_public_access Optional[SecurityGroup]

A security group that allows access from the internet. This value will be None when this resource is not accessible from the internet.

indirect_public_connection_data Optional[IndirectPublicConnectionData]

The data that describes that a publicly-accessible resource can access this resource by a security group of this resource.

backup_retention_period Optional[int]

Number of days to retain backups.

engine_type str

The Database engine name to be used for this RDS instance.

engine_version str

The Database engine version to be used for this RDS instance.

iam_database_authentication_enabled Optional[bool]

An indication whether authentication to the RDS instance using IAM entities is enabled.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process