Skip to content

neptune

Sample rules

A few rules that use objects from this package:

public_access_db_neptune
from typing import Dict, List

from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType


class PublicAccessDbNeptuneRule(AwsBaseRule):

    def get_id(self) -> str:
        return 'public_access_db_neptune'

    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        issues: List[Issue] = []

        for neptune_cluster in env_context.neptune_clusters:
            for neptune_instance in neptune_cluster.cluster_instances:
                security_group = neptune_instance.security_group_allowing_public_access
                if security_group:
                    issues.append(Issue(
                        f'~Internet~. '
                        f"Instance `{neptune_instance.get_friendly_name()}` is "
                        f"in {neptune_cluster.get_type()} `{neptune_cluster.get_friendly_name()}`. "
                        f"{neptune_instance.get_type()} is on {neptune_instance.network_resource.vpc.get_type()}"
                        f" `{neptune_instance.network_resource.vpc.get_friendly_name()}`. "
                        f"{neptune_instance.get_type()} uses subnet(s) "
                        f"`{', '.join([x.get_friendly_name() for x in neptune_instance.network_resource.subnets])}`. "
                        f"{neptune_instance.get_type()} is reachable from the internet due to subnet(s) and route table(s). "
                        f"Subnet uses Network ACL(s) "
                        f"`{', '.join({x.network_acl.get_friendly_name() for x in neptune_instance.network_resource.subnets})}`. "
                        f"Network ACL's and security group(s) allows the {neptune_instance.get_type()} configured ports. "
                        f'~{neptune_instance.get_type()}~',
                        neptune_instance, security_group))
        return issues

    def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
        return bool(environment_context.neptune_clusters)

NeptuneCluster (ConnectionInstance, AwsResource)

Attributes:

Name Type Description
cluster_identifier str

The identifier of the cluster.

arn str

The ARN of the cluster.

encrypted_at_rest bool

True if the cluster is configured to be encrypted at rest.

port int

The port the cluster is listening to.

db_subnet_group_name str

The subnet group's name.

security_group_ids List[str]

The IDs of the securiry groups used by the cluster, if any.

cluster_id str

The ID of the cluster.

kms_key str

The KMS key used for encryption, if any.

kms_data Optional[KmsKey]

A pointer to the KMS key, if one is used.

is_in_default_vpc bool

True if the cluster is in the default VPC.

cluster_instances List[NeptuneInstance]

The instances of the cluster.

cloudwatch_logs_exports Optional[list]

A list of the log types this Neptune cluster is configured to export to Cloudwatch Logs.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

NeptuneInstance (NetworkEntity, INetworkConfiguration)

Attributes:

Name Type Description
name

The name of the instance.

arn str

The ARN of the instance.

port int

The port the instance is accessible through.

cluster_identifier str

The ID of the cluster it belongs to.

publicly_accessible

True if the instance is set to publicly accessible.

instance_identifier str

The identifier of the instance.

security_group_allowing_public_access Optional[SecurityGroup]

A security group that allows access from the internet. This value will be None when this resource is not accessible from the internet.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process