s3
Sample rules
A few rules that use objects from this package:
s3_acl_disallow_public_and_cross_account
from typing import List, Dict
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
class S3AclAllowPublicAccessRule(AwsBaseRule):
# Notice:
# - this rule don't issue on violation on the bucket object level
def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
issues_list: List[Issue] = []
for bucket in env_context.s3_buckets:
for resource in bucket.publicly_allowing_resources:
issues_list.append(Issue(f"~The {bucket.get_type()}~. `{bucket.get_friendly_name()}` is publicly exposed through"
f" the `{resource.get_type()}` with identifier `{resource.get_friendly_name()}`.",
bucket, resource))
return issues_list
def get_id(self) -> str:
return "s3_acl_disallow_public_and_cross_account"
def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
return bool(environment_context.s3_buckets)
s3_lambda_indirect_exposure
from typing import List, Dict
from cloudrail.knowledge.context.aws.resources.apigateway.rest_api_gw import RestApiGw
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType
class S3BucketLambdaIndirectExposureRule(AwsBaseRule):
def get_id(self) -> str:
return 's3_lambda_indirect_exposure'
def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
issues: List[Issue] = []
for s3_bucket in env_context.s3_buckets:
for agw_method in s3_bucket.exposed_to_agw_methods:
if not self._is_api_gateway_public(agw_method.rest_api_id, env_context.rest_api_gw):
continue
issues.append(Issue(evidence=f"The S3 Bucket `{s3_bucket.get_friendly_name()}`. is exposed via the execution role in "
f"Lambda Function `{agw_method.integration.lambda_func_integration.get_friendly_name()}`. "
f"which can be invoked by public API Gateway `{agw_method.get_friendly_name()}`",
exposed=s3_bucket,
violating=agw_method.integration.lambda_func_integration))
return issues
def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
return bool(environment_context.s3_buckets
and environment_context.lambda_function_list
and environment_context.api_gateway_methods)
@staticmethod
def _is_api_gateway_public(rest_api_gw_id: str, api_gateways: List[RestApiGw]) -> bool:
for api_gateway in api_gateways:
if api_gateway.rest_api_gw_id == rest_api_gw_id:
return api_gateway.is_public
raise Exception(f'Rest API Gateway {rest_api_gw_id} could not be found')
vpc_endpoint_s3_exposure
from abc import abstractmethod
from typing import List, Dict
from cloudrail.knowledge.context.mergeable import Mergeable
from cloudrail.knowledge.context.aws.resources.ec2.network_interface import NetworkInterface
from cloudrail.knowledge.context.aws.resources.prefix_lists import PrefixLists, PrefixList
from cloudrail.knowledge.context.aws.resources.ec2.route import Route
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.context.aws.resources.service_name import AwsServiceType
from cloudrail.knowledge.rules.aws.context_aware.vpc_endpoints.abstract_vpc_endpoint_gateway_rule import AbstractVpcEndpointGatewayRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType
class AbstractVpcEndpointGatewayIsNotUsedRule(AbstractVpcEndpointGatewayRule):
@abstractmethod
def get_id(self) -> str:
pass
def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
vpc_list, region_to_service_map, vpc_to_eni_map = self._init_maps(env_context)
issues_list: List[Issue] = []
region_to_prefix_lists_map: Dict[str, PrefixLists] = self._create_prefix_list_by_region_map(env_context)
for vpc in vpc_list:
for eni in vpc_to_eni_map.get(vpc, []):
if self._is_service_eni_match(eni):
prefix_list: PrefixLists = region_to_prefix_lists_map[vpc.region]
aws_service_pl: PrefixList = prefix_list.get_prefix_lists_by_service(self.aws_service_type.value)
if self._add_new_issue_from_outbound(eni, region_to_service_map, issues_list, aws_service_pl):
break
return issues_list
def _add_new_issue_from_outbound(self, eni: NetworkInterface, region_to_service_map: Dict[str, List[Mergeable]],
issues_list: List[Issue], service_pl: PrefixList) -> bool:
if self._is_public_connection_exist(eni):
most_specific_service_pl_route: Route = self._get_most_specific_service_pl_route(eni.subnet.route_table, service_pl)
if not self._is_valid_vpc_endpoint_route(most_specific_service_pl_route, service_pl, eni.vpc.endpoints):
for service in region_to_service_map[eni.vpc.region]:
issues_list.append(Issue(f"~The {eni.vpc.get_type()}~. `{eni.vpc.get_friendly_name()}` in region `{eni.vpc.region}`"
f" is in use but not leveraging {self.aws_service_type.name} Endpoint Gateway", service, eni.vpc))
return True
return False
class S3VpcEndpointGatewayNotUsedRule(AbstractVpcEndpointGatewayIsNotUsedRule):
def __init__(self) -> None:
super().__init__(AwsServiceType.S3, (443, 80), self.S3_SERVICES_EXCLUDE_LIST, False)
def get_id(self) -> str:
return "vpc_endpoint_s3_exposure"
class DynamoDbVpcEndpointGatewayNotUsedRule(AbstractVpcEndpointGatewayIsNotUsedRule):
def __init__(self) -> None:
super().__init__(AwsServiceType.DYNAMODB, (443,), self.DYNAMODB_SERVICES_INCLUDE_LIST, True)
def get_id(self) -> str:
return "vpc_endpoint_dynamodb_exposure"
PublicAccessBlockLevel (Enum)
An enumeration.
PublicAccessBlockSettings (AwsResource)
Attributes:
Name | Type | Description |
---|---|---|
bucket_name_or_account_id |
str |
An access block may apply to either a specific bucket or a whole account, this is the identifier to use. |
block_public_acls |
bool |
True if the block shouldn't allow public ACLs. |
ignore_public_acls |
bool |
True if the block should ignore public ACLs. |
block_public_policy |
bool |
True if the block shouldn't allow public policies. |
restrict_public_buckets |
bool |
True if the block should enforce restriction on public buckets. |
access_level |
PublicAccessBlockLevel |
Whether the block is on the account or specific bucket. |
account_access_block |
PublicAccessBlockSettings |
The account-level access block, if this one targets a bucket only. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
GranteeTypes (Enum)
An enumeration.
S3ACL (AwsResource)
Attributes:
Name | Type | Description |
---|---|---|
actions |
Tuple[str] |
A list of S3 actions included in this S3 ACL, based on the list from the S3Permission supplied. |
type |
GranteeTypes |
The type of the grantee - GROUP or CANONICAL_USER. |
type_value |
str |
The value of the grantee. If type is GROUP, this will be the group identifier. If CANONICAL_USER, this will be the canonical identifier for the user. |
bucket_name |
str |
The bucket to apply the ACL to. |
owner_id |
str |
The owner of this ACL. |
owner_name |
str |
The name of the owner. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
S3Permission (Enum)
An enumeration.
S3PredefinedGroups (Enum)
An enumeration.
S3Bucket (ConnectionInstance, PoliciedResource)
Attributes:
Name | Type | Description |
---|---|---|
bucket_name |
The name of the bucket. |
|
arn |
The ARN of the bucket. |
|
resource_based_policy |
the policy of this S3 bucket. |
|
acls |
List[S3ACL] |
The list of ACLs applied to this bucket. |
public_access_block_settings |
Optional[PublicAccessBlockSettings] |
The public access block applied to this bucket specifically, if any (or None). |
access_points |
List[S3BucketAccessPoint] |
The access points defined for this bucket. |
encryption_data |
Optional[S3BucketEncryption] |
The encryption configuration for this bucket. |
bucket_objects |
List[S3BucketObject] |
A list of objects in this bucket. NOTE: This is not fetched from the live environment and will only include objects that are defined in the infrastructure-as-code reviewed by Cloudrail. |
versioning_data |
S3BucketVersioning |
Configuration of versioning on the bucket. |
publicly_allowing_resources |
List[AwsResource] |
ACL's/Policies that expose this bucket to the internet. |
exposed_to_agw_methods |
List[ApiGatewayMethod] |
The ApiGateway methods that can acccess this bucket. |
is_logger |
Indicates if this bucket is the target bucket for logging of another bucket. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
S3BucketAccessPoint (PoliciedResource)
Attributes:
Name | Type | Description |
---|---|---|
bucket_name |
The name of the bucket this access point applies to. |
|
name |
The name of the access point. |
|
network_origin |
The network-level source of the traffic. |
|
arn |
The ARN of the access point. |
|
policy |
The policy applied to the access point. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
S3BucketAccessPointNetworkOrigin
dataclass
S3BucketAccessPointNetworkOrigin(access_type: str, vpc_id: str)
S3BucketAccessPointNetworkOriginType (str, Enum)
An enumeration.
S3BucketEncryption (AwsResource)
Attributes:
Name | Type | Description |
---|---|---|
bucket_name |
str |
The bucket the encryption settings apply to. |
encrypted |
bool |
True if encryption is enabled. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
S3BucketObject (AwsResource)
NOTE: Cloudrail does not map objects in the live environment. Instead. only onjects specifically defined in infrastructure-as-code will be included as part of the context.
Attributes:
Name | Type | Description |
---|---|---|
bucket_name |
str |
The bucket the object is in. |
key |
str |
The ARN of the key used to encrypt the object, if any. |
encrypted |
bool |
True if the object is encrypted. |
owning_bucket |
'S3Bucket' |
A pointer to the owning bucket. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
S3BucketVersioning (AwsResource)
Attributes:
Name | Type | Description |
---|---|---|
bucket_name |
str |
The name of the bucket the versioning config applies to. |
versioning |
bool |
True if versioning is enabled. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
S3BucketLogging (AwsResource)
Attributes:
Name | Type | Description |
---|---|---|
bucket_name |
str |
The bucket which the logs associated with. |
target_bucket |
Optional[str] |
The bucket name in which to send logs for this bucket. |
target_prefix |
Optional[str] |
A key prefix for log objects. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process