apigateway
Sample rules
A few rules that use objects from this package:
s3_lambda_indirect_exposure
from typing import List, Dict
from cloudrail.knowledge.context.aws.resources.apigateway.rest_api_gw import RestApiGw
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType
class S3BucketLambdaIndirectExposureRule(AwsBaseRule):
    """Flag S3 buckets that are exposed through a Lambda function behind a public API Gateway.

    Each bucket's ``exposed_to_agw_methods`` is inspected; a method whose
    REST API Gateway is public yields an issue with the bucket as the exposed
    resource and the method's integrated Lambda function as the violating one.
    """

    def get_id(self) -> str:
        """Return the identifier of this rule."""
        return 's3_lambda_indirect_exposure'

    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        """Collect one issue per (bucket, API Gateway method) pair whose gateway is public.

        Args:
            env_context: Environment context providing ``s3_buckets`` and ``rest_api_gw``.
            parameters: Rule parameters; not used by this rule.

        Returns:
            The list of issues found (empty when nothing is exposed).

        Raises:
            Exception: Propagated from ``_is_api_gateway_public`` when a method
                references a REST API Gateway that is not in the context.
        """
        issues: List[Issue] = []
        for s3_bucket in env_context.s3_buckets:
            for agw_method in s3_bucket.exposed_to_agw_methods:
                # Only methods that belong to a public gateway are reported.
                if self._is_api_gateway_public(agw_method.rest_api_id, env_context.rest_api_gw):
                    evidence = (
                        f"The S3 Bucket `{s3_bucket.get_friendly_name()}`. is exposed via the execution role in "
                        f"Lambda Function `{agw_method.integration.lambda_func_integration.get_friendly_name()}`. "
                        f"which can be invoked by public API Gateway `{agw_method.get_friendly_name()}`"
                    )
                    issues.append(
                        Issue(
                            evidence=evidence,
                            exposed=s3_bucket,
                            violating=agw_method.integration.lambda_func_integration,
                        )
                    )
        return issues

    def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
        """Return True only when the context has S3 buckets, Lambda functions and API Gateway methods."""
        if not environment_context.s3_buckets:
            return False
        if not environment_context.lambda_function_list:
            return False
        return bool(environment_context.api_gateway_methods)

    @staticmethod
    def _is_api_gateway_public(rest_api_gw_id: str, api_gateways: List[RestApiGw]) -> bool:
        """Return the ``is_public`` flag of the first gateway whose ID equals ``rest_api_gw_id``.

        Raises:
            Exception: If no gateway in ``api_gateways`` has the requested ID.
        """
        matching_gateway = next(
            (candidate for candidate in api_gateways if candidate.rest_api_gw_id == rest_api_gw_id),
            None,
        )
        if matching_gateway is None:
            raise Exception(f'Rest API Gateway {rest_api_gw_id} could not be found')
        return matching_gateway.is_public
non_car_api_gateway_caching_encrypted
from typing import List, Dict
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType
class EnsureApiGwCachingEncryptedRule(AwsBaseRule):
    """Flag REST API Gateways whose method settings enable caching without encrypting the cache."""

    def get_id(self) -> str:
        """Return the identifier of this rule."""
        return 'non_car_api_gateway_caching_encrypted'

    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        """Collect an issue for every gateway with caching enabled but not encrypted.

        Args:
            env_context: Environment context providing ``rest_api_gw``.
            parameters: Rule parameters; not used by this rule.

        Returns:
            The list of issues found; each issue uses the gateway as both the
            exposed and the violating resource.
        """
        issues: List[Issue] = []
        for api_gw in env_context.rest_api_gw:
            # Gateways without method settings have nothing to check.
            if not api_gw.method_settings:
                continue
            # Caching that is off cannot be unencrypted.
            if not api_gw.method_settings.caching_enabled:
                continue
            # Caching is on and already encrypted: compliant.
            if api_gw.method_settings.caching_encrypted:
                continue
            issues.append(
                Issue(
                    f"The {api_gw.get_type()} `{api_gw.get_id()}` has caching enabled and not encrypted for "
                    f"method `{api_gw.method_settings.http_method.value}` in the stage `{api_gw.method_settings.stage_name}`",
                    api_gw,
                    api_gw,
                )
            )
        return issues

    def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
        """Return True when the context contains at least one REST API Gateway."""
        return bool(environment_context.rest_api_gw)
non_car_api_gateway_tls
from typing import List, Dict
from packaging import version
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType
# Currently only checking API GW V1 (REST APIs), as V2 supports only TLS v1.2.
class EnsureApiGwUseModernTlsRule(AwsBaseRule):
    """Flag REST API Gateways whose configured domain uses a security policy older than TLS v1.2."""

    def get_id(self) -> str:
        """Return the identifier of this rule."""
        return 'non_car_api_gateway_tls'

    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        """Collect an issue for every gateway whose domain does not enforce TLS v1.2.

        Args:
            env_context: Environment context providing ``rest_api_gw``.
            parameters: Rule parameters; not used by this rule.

        Returns:
            The list of issues found; the gateway is the exposed resource and
            its domain is the violating resource.
        """
        issues: List[Issue] = []
        for api_gw in env_context.rest_api_gw:
            # Gateways without a domain are out of scope for this rule.
            if not api_gw.domain:
                continue
            # Skip domains whose security policy is already TLS 1.2 or newer.
            if not self._version_check(api_gw.domain.security_policy):
                continue
            issues.append(
                Issue(
                    f"The {api_gw.get_type()} `{api_gw.get_friendly_name()}` "
                    f"has a domain configured but not enforcing TLS v1.2 ",
                    api_gw,
                    api_gw.domain,
                )
            )
        return issues

    def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
        """Return True when the context contains at least one REST API Gateway."""
        return bool(environment_context.rest_api_gw)

    @staticmethod
    def _version_check(proto_version: str) -> bool:
        """Return True when ``proto_version`` denotes a TLS version lower than 1.2.

        The policy name is turned into a dotted version by removing ``TLS_``
        and replacing underscores with dots (e.g. ``TLS_1_0`` becomes ``1.0``).
        """
        dotted_version = proto_version.replace('TLS_', '').replace('_', '.')
        configured = version.parse(dotted_version)
        minimum_required = version.parse('1.2')
        return configured < minimum_required
ApiGatewayIntegration (AwsResource)
Attributes:
Name | Type | Description |
---|---|---|
rest_api_id |
str |
The ID of the associated REST API. |
resource_id |
str |
The API resource ID. |
request_http_method |
RestApiMethod |
The HTTP method used when calling the associated resource. |
integration_http_method |
RestApiMethod |
The integration HTTP method, may be None. |
integration_type |
IntegrationType |
The integration's input type. |
uri |
str |
The input's URI. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
IntegrationType (str, Enum)
The type of integration.
ApiGatewayMethod (AwsResource)
Attributes:
Name | Type | Description |
---|---|---|
rest_api_id |
str |
The ID of the associated REST API. |
resource_id |
str |
The API resource ID. |
http_method |
RestApiMethod |
The HTTP Method. |
integration |
Optional[ApiGatewayIntegration] |
A reference to the matching ApiGatewayIntegration based on rest_api_id. |
authorization |
str |
The type of authorization used for the method. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
ApiGatewayMethodSettings (AwsResource)
Attributes:
Name | Type | Description |
---|---|---|
api_gw_id |
str |
The ID of the REST API Gateway. |
stage_name |
str |
The name of the stage. |
method_path |
str |
The method resource path. |
http_method |
RestApiMethod |
The actual HTTP method (GET, etc.). |
caching_enabled |
bool |
Set to True if caching is enabled, False or None otherwise. |
caching_encrypted |
bool |
Set to True if cache encryption is enabled (or a KMS ARN is configured), False or None otherwise. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
RestApiMethod (Enum)
An enumeration.
ApiGatewayType (str, Enum)
An enumeration.
RestApiGw (PoliciedResource)
Attributes:
Name | Type | Description |
---|---|---|
rest_api_gw_id |
str |
The ID of the REST API Gateway. |
api_gw_name |
str |
The name of the API gateway. |
api_gateway_type |
ApiGatewayType |
One of EDGE, REGIONAL, PRIVATE. |
is_public |
bool |
An indication of whether this resource is accessible from the internet. |
api_gateway_methods |
List[ApiGatewayMethod] |
All the ApiGatewayMethods associated with this gateway. |
api_gw_stages |
List[ApiGatewayStage] |
The stages associated with this REST API Gateway. |
agw_methods_with_valid_integrations_and_allowed_lambda_access |
List[ApiGatewayMethod] |
The ApiGatewayMethods associated with this gateway that have valid integrations and are allowed to access a Lambda function. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
RestApiGwDomain (AwsResource)
Attributes:
Name | Type | Description |
---|---|---|
domain_name |
str |
The name of the REST API domain. |
security_policy |
str |
The Transport Layer Security (TLS) version + cipher suite for this DomainName. The valid values are TLS_1_0 and TLS_1_2. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
RestApiGwMapping (AwsResource)
Attributes:
Name | Type | Description |
---|---|---|
api_id |
str |
The ID of the REST API Gateway. |
domain_name |
str |
The name of the domain. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
RestApiGwPolicy (ResourceBasedPolicy)
Attributes:
Name | Type | Description |
---|---|---|
rest_api_gw_id |
str |
The ID of the REST API Gateway. |
policy_statements |
The statements of the resource policy attached to this gateway. |
|
raw_document |
The raw JSON of the resource policy. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
AccessLogsSettings
dataclass
Attributes:
Name | Type | Description |
---|---|---|
destination_arn |
str |
The ARN of either the CloudWatch log group or the Kinesis Data Firehose delivery stream that receives the access logs. |
format |
str |
The formatting and values recorded in the logs. |
ApiGatewayStage (AwsResource)
Attributes:
Name | Type | Description |
---|---|---|
api_gw_id |
str |
The ID of the REST API Gateway. |
stage_name |
str |
The name of the stage. |
xray_tracing_enabled |
bool |
An indication of whether active tracing with X-Ray is enabled. |
access_logs |
Optional[AccessLogsSettings] |
Block information about the access logs settings of the REST API Gateway stage (if any configured). |
method_settings |
Optional[ApiGatewayMethodSettings] |
The method settings configured for this stage, if configured. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process