Skip to content

s3

Sample rules

A few rules that use objects from this package:

s3_acl_disallow_public_and_cross_account
from typing import List, Dict

from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext


class S3AclAllowPublicAccessRule(AwsBaseRule):
    # Notice:
    # - this rule don't issue on violation on the bucket object level
    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        issues_list: List[Issue] = []
        for bucket in env_context.s3_buckets:
            for resource in bucket.publicly_allowing_resources:
                issues_list.append(Issue(f"~The {bucket.get_type()}~. `{bucket.get_friendly_name()}` is publicly exposed through"
                                         f" the `{resource.get_type()}` with identifier `{resource.get_friendly_name()}`.",
                                         bucket, resource))
        return issues_list

    def get_id(self) -> str:
        return "s3_acl_disallow_public_and_cross_account"

    def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
        return bool(environment_context.s3_buckets)
s3_lambda_indirect_exposure
from typing import List, Dict

from cloudrail.knowledge.context.aws.resources.apigateway.rest_api_gw import RestApiGw
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType


class S3BucketLambdaIndirectExposureRule(AwsBaseRule):

    def get_id(self) -> str:
        return 's3_lambda_indirect_exposure'

    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        issues: List[Issue] = []

        for s3_bucket in env_context.s3_buckets:
            for agw_method in s3_bucket.exposed_to_agw_methods:
                if not self._is_api_gateway_public(agw_method.rest_api_id, env_context.rest_api_gw):
                    continue
                issues.append(Issue(evidence=f"The S3 Bucket `{s3_bucket.get_friendly_name()}`. is exposed via the execution role in "
                                             f"Lambda Function `{agw_method.integration.lambda_func_integration.get_friendly_name()}`. "
                                             f"which can be invoked by public API Gateway `{agw_method.get_friendly_name()}`",
                                    exposed=s3_bucket,
                                    violating=agw_method.integration.lambda_func_integration))

        return issues

    def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
        return bool(environment_context.s3_buckets
                    and environment_context.lambda_function_list
                    and environment_context.api_gateway_methods)

    @staticmethod
    def _is_api_gateway_public(rest_api_gw_id: str, api_gateways: List[RestApiGw]) -> bool:
        for api_gateway in api_gateways:
            if api_gateway.rest_api_gw_id == rest_api_gw_id:
                return api_gateway.is_public
        raise Exception(f'Rest API Gateway {rest_api_gw_id} could not be found')
vpc_endpoint_s3_exposure
from abc import abstractmethod
from typing import List, Dict
from cloudrail.knowledge.context.mergeable import Mergeable
from cloudrail.knowledge.context.aws.resources.ec2.network_interface import NetworkInterface
from cloudrail.knowledge.context.aws.resources.prefix_lists import PrefixLists, PrefixList
from cloudrail.knowledge.context.aws.resources.ec2.route import Route
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.context.aws.resources.service_name import AwsServiceType
from cloudrail.knowledge.rules.aws.context_aware.vpc_endpoints.abstract_vpc_endpoint_gateway_rule import AbstractVpcEndpointGatewayRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType



class AbstractVpcEndpointGatewayIsNotUsedRule(AbstractVpcEndpointGatewayRule):

    @abstractmethod
    def get_id(self) -> str:
        pass

    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        vpc_list, region_to_service_map, vpc_to_eni_map = self._init_maps(env_context)
        issues_list: List[Issue] = []
        region_to_prefix_lists_map: Dict[str, PrefixLists] = self._create_prefix_list_by_region_map(env_context)
        for vpc in vpc_list:
            for eni in vpc_to_eni_map.get(vpc, []):
                if self._is_service_eni_match(eni):
                    prefix_list: PrefixLists = region_to_prefix_lists_map[vpc.region]
                    aws_service_pl: PrefixList = prefix_list.get_prefix_lists_by_service(self.aws_service_type.value)
                    if self._add_new_issue_from_outbound(eni, region_to_service_map, issues_list, aws_service_pl):
                        break

        return issues_list

    def _add_new_issue_from_outbound(self, eni: NetworkInterface, region_to_service_map: Dict[str, List[Mergeable]],
                                     issues_list: List[Issue], service_pl: PrefixList) -> bool:
        if self._is_public_connection_exist(eni):
            most_specific_service_pl_route: Route = self._get_most_specific_service_pl_route(eni.subnet.route_table, service_pl)
            if not self._is_valid_vpc_endpoint_route(most_specific_service_pl_route, service_pl, eni.vpc.endpoints):
                for service in region_to_service_map[eni.vpc.region]:
                    issues_list.append(Issue(f"~The {eni.vpc.get_type()}~. `{eni.vpc.get_friendly_name()}` in region `{eni.vpc.region}`"
                                             f" is in use but not leveraging {self.aws_service_type.name} Endpoint Gateway", service, eni.vpc))
                return True
        return False


class S3VpcEndpointGatewayNotUsedRule(AbstractVpcEndpointGatewayIsNotUsedRule):

    def __init__(self) -> None:
        super().__init__(AwsServiceType.S3, (443, 80), self.S3_SERVICES_EXCLUDE_LIST, False)

    def get_id(self) -> str:
        return "vpc_endpoint_s3_exposure"


class DynamoDbVpcEndpointGatewayNotUsedRule(AbstractVpcEndpointGatewayIsNotUsedRule):

    def __init__(self) -> None:
        super().__init__(AwsServiceType.DYNAMODB, (443,), self.DYNAMODB_SERVICES_INCLUDE_LIST, True)

    def get_id(self) -> str:
        return "vpc_endpoint_dynamodb_exposure"

PublicAccessBlockLevel (Enum)

An enumeration.

PublicAccessBlockSettings (AwsResource)

Attributes:

Name Type Description
bucket_name_or_account_id str

An access block may apply to either a specific bucket or a whole account, this is the identifier to use.

block_public_acls bool

True if the block shouldn't allow public ACLs.

ignore_public_acls bool

True if the block should ignore public ACLs.

block_public_policy bool

True if the block shouldn't allow public policies.

restrict_public_buckets bool

True if the block should enforce restriction on public buckets.

access_level PublicAccessBlockLevel

Whether the block is on the account or specific bucket.

account_access_block PublicAccessBlockSettings

The account-level access block, if this one targets a bucket only.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

GranteeTypes (Enum)

An enumeration.

S3ACL (AwsResource)

Attributes:

Name Type Description
actions Tuple[str]

A list of S3 actions included in this S3 ACL, based on the list from the S3Permission supplied.

type GranteeTypes

The type of the grantee - GROUP or CANONICAL_USER.

type_value str

The value of the grantee. If type is GROUP, this will be the group identifier. If CANONICAL_USER, this will be the canonical identifier for the user.

bucket_name str

The bucket to apply the ACL to.

owner_id str

The owner of this ACL.

owner_name str

The name of the owner.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

S3Permission (Enum)

An enumeration.

S3PredefinedGroups (Enum)

An enumeration.

S3Bucket (ConnectionInstance, PoliciedResource)

Attributes:

Name Type Description
bucket_name

The name of the bucket.

arn

The ARN of the bucket.

resource_based_policy

the policy of this S3 bucket.

acls List[S3ACL]

The list of ACLs applied to this bucket.

public_access_block_settings Optional[PublicAccessBlockSettings]

The public access block applied to this bucket specifically, if any (or None).

access_points List[S3BucketAccessPoint]

The access points defined for this bucket.

encryption_data Optional[S3BucketEncryption]

The encryption configuration for this bucket.

bucket_objects List[S3BucketObject]

A list of objects in this bucket. NOTE: This is not fetched from the live environment and will only include objects that are defined in the infrastructure-as-code reviewed by Cloudrail.

versioning_data S3BucketVersioning

Configuration of versioning on the bucket.

publicly_allowing_resources List[AwsResource]

ACL's/Policies that expose this bucket to the internet.

exposed_to_agw_methods List[ApiGatewayMethod]

The ApiGateway methods that can acccess this bucket.

is_logger

Indicates if this bucket is the target bucket for logging of another bucket.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

S3BucketAccessPoint (PoliciedResource)

Attributes:

Name Type Description
bucket_name

The name of the bucket this access point applies to.

name

The name of the access point.

network_origin

The network-level source of the traffic.

arn

The ARN of the access point.

policy

The policy applied to the access point.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

S3BucketAccessPointNetworkOrigin dataclass

S3BucketAccessPointNetworkOrigin(access_type: str, vpc_id: str)

S3BucketAccessPointNetworkOriginType (str, Enum)

An enumeration.

S3BucketEncryption (AwsResource)

Attributes:

Name Type Description
bucket_name str

The bucket the encryption settings apply to.

encrypted bool

True if encryption is enabled.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

S3BucketObject (AwsResource)

NOTE: Cloudrail does not map objects in the live environment. Instead. only onjects specifically defined in infrastructure-as-code will be included as part of the context.

Attributes:

Name Type Description
bucket_name str

The bucket the object is in.

key str

The ARN of the key used to encrypt the object, if any.

encrypted bool

True if the object is encrypted.

owning_bucket 'S3Bucket'

A pointer to the owning bucket.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

S3BucketVersioning (AwsResource)

Attributes:

Name Type Description
bucket_name str

The name of the bucket the versioning config applies to.

versioning bool

True if versioning is enabled.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

S3BucketLogging (AwsResource)

Attributes:

Name Type Description
bucket_name str

The bucket which the logs associated with.

target_bucket Optional[str]

The bucket name in which to send logs for this bucket.

target_prefix Optional[str]

A key prefix for log objects.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process