Skip to content

apigateway

Sample rules

A few rules that use objects from this package:

s3_lambda_indirect_exposure
from typing import List, Dict

from cloudrail.knowledge.context.aws.resources.apigateway.rest_api_gw import RestApiGw
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType


class S3BucketLambdaIndirectExposureRule(AwsBaseRule):
    """Rule that reports S3 buckets exposed through a Lambda function which a public API Gateway can invoke."""

    def get_id(self) -> str:
        """Return the identifier of this rule."""
        rule_id = 's3_lambda_indirect_exposure'
        return rule_id

    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        """Collect one issue per (bucket, API Gateway method) pair whose REST API is public.

        Iterates every S3 bucket in the context and each API Gateway method listed in
        the bucket's ``exposed_to_agw_methods``. Methods whose REST API is not public
        are ignored. ``parameters`` is not used by this rule.

        Raises:
            Exception: if a method references a REST API id that is not present in
                ``env_context.rest_api_gw``.
        """
        findings: List[Issue] = []

        for bucket in env_context.s3_buckets:
            for method in bucket.exposed_to_agw_methods:
                if self._is_api_gateway_public(method.rest_api_id, env_context.rest_api_gw):
                    # Resolve the names in the same order they appear in the message.
                    bucket_name = bucket.get_friendly_name()
                    lambda_name = method.integration.lambda_func_integration.get_friendly_name()
                    method_name = method.get_friendly_name()
                    evidence = (f"The S3 Bucket `{bucket_name}`. is exposed via the execution role in "
                                f"Lambda Function `{lambda_name}`. "
                                f"which can be invoked by public API Gateway `{method_name}`")
                    findings.append(Issue(evidence=evidence,
                                          exposed=bucket,
                                          violating=method.integration.lambda_func_integration))

        return findings

    def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
        """Return True only when the context has S3 buckets, Lambda functions and API Gateway methods."""
        if not environment_context.s3_buckets:
            return False
        if not environment_context.lambda_function_list:
            return False
        return bool(environment_context.api_gateway_methods)

    @staticmethod
    def _is_api_gateway_public(rest_api_gw_id: str, api_gateways: List[RestApiGw]) -> bool:
        """Return the ``is_public`` flag of the first gateway whose id equals ``rest_api_gw_id``.

        Raises:
            Exception: if no gateway in ``api_gateways`` has the requested id.
        """
        matching_gw = next((gw for gw in api_gateways if gw.rest_api_gw_id == rest_api_gw_id), None)
        if matching_gw is None:
            raise Exception(f'Rest API Gateway {rest_api_gw_id} could not be found')
        return matching_gw.is_public
non_car_api_gateway_caching_encrypted
from typing import List, Dict

from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType


class EnsureApiGwCachingEncryptedRule(AwsBaseRule):
    """Rule that reports REST API Gateways whose method settings enable caching without encrypting it."""

    def get_id(self) -> str:
        """Return the identifier of this rule."""
        rule_id = 'non_car_api_gateway_caching_encrypted'
        return rule_id

    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        """Return an issue for every REST API Gateway with caching enabled but not encrypted.

        Gateways without method settings are skipped. ``parameters`` is not used by this rule.
        """
        findings: List[Issue] = []
        for gateway in env_context.rest_api_gw:
            settings = gateway.method_settings
            # Nothing to report unless caching is on and its encryption is off.
            if not settings or not settings.caching_enabled or settings.caching_encrypted:
                continue
            evidence = (f"The {gateway.get_type()} `{gateway.get_id()}` has caching enabled and not encrypted for "
                        f"method `{settings.http_method.value}` in the stage `{settings.stage_name}`")
            findings.append(Issue(evidence, gateway, gateway))
        return findings

    def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
        """Return True when the context contains at least one REST API Gateway."""
        has_gateways = bool(environment_context.rest_api_gw)
        return has_gateways
non_car_api_gateway_tls
from typing import List, Dict
from packaging import version

from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType


# Currently only checking API GW V1 (REST APIs), as V2 supports only TLS v1.2.


class EnsureApiGwUseModernTlsRule(AwsBaseRule):
    """Rule that reports REST API Gateways whose domain security policy is older than TLS v1.2."""

    def get_id(self) -> str:
        """Return the identifier of this rule."""
        rule_id = 'non_car_api_gateway_tls'
        return rule_id

    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        """Return an issue for every REST API Gateway with a domain whose policy is below TLS v1.2.

        Gateways without a domain are skipped. ``parameters`` is not used by this rule.
        """
        findings: List[Issue] = []
        for gateway in env_context.rest_api_gw:
            if not gateway.domain:
                continue
            if not self._version_check(gateway.domain.security_policy):
                continue
            evidence = (f"The {gateway.get_type()} `{gateway.get_friendly_name()}` "
                        f"has a domain configured but not enforcing TLS v1.2 ")
            findings.append(Issue(evidence, gateway, gateway.domain))
        return findings

    @staticmethod
    def _version_check(proto_version: str) -> bool:
        """Return True when ``proto_version`` (e.g. ``TLS_1_0``) denotes a version lower than 1.2."""
        # Turn a policy name such as 'TLS_1_0' into a dotted version string such as '1.0'.
        dotted = proto_version.replace('TLS_', '').replace('_', '.')
        configured = version.parse(dotted)
        minimum = version.parse('1.2')
        return configured < minimum

    def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
        """Return True when the context contains at least one REST API Gateway."""
        has_gateways = bool(environment_context.rest_api_gw)
        return has_gateways

ApiGatewayIntegration (AwsResource)

Attributes:

Name Type Description
rest_api_id str

The ID of the associated REST API.

resource_id str

The API resource ID.

request_http_method RestApiMethod

The HTTP method used when calling the associated resource.

integration_http_method RestApiMethod

The integration HTTP method, may be None.

integration_type IntegrationType

The integration's input type.

uri str

The input's URI.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

IntegrationType (str, Enum)

The type of integration.

ApiGatewayMethod (AwsResource)

Attributes:

Name Type Description
rest_api_id str

The ID of the associated REST API.

resource_id str

The API resource ID.

http_method RestApiMethod

The HTTP Method.

integration Optional[ApiGatewayIntegration]

A reference to the matching ApiGatewayIntegration based on rest_api_id.

authorization str

The type of authorization used for the method.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

ApiGatewayMethodSettings (AwsResource)

Attributes:

Name Type Description
api_gw_id str

The ID of the REST API Gateway.

stage_name str

The name of the stage.

method_path str

The method resource path.

http_method RestApiMethod

The actual HTTP method (GET, etc.).

caching_enabled bool

Set to True if caching is enabled, False or None otherwise.

caching_encrypted bool

Set to True if cache encryption is enabled (for example, with a KMS ARN), False or None otherwise.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

RestApiMethod (Enum)

An enumeration.

ApiGatewayType (str, Enum)

An enumeration.

RestApiGw (PoliciedResource)

Attributes:

Name Type Description
rest_api_gw_id str

The ID of the REST API Gateway.

api_gw_name str

The name of the API gateway.

api_gateway_type ApiGatewayType

One of EDGE, REGIONAL, PRIVATE.

is_public bool

Indicates whether this resource is accessible from the internet.

api_gateway_methods List[ApiGatewayMethod]

All the ApiGatewayMethods associated with this gateway.

api_gw_stages List[ApiGatewayStage]

The stages associated with this REST API Gateway.

agw_methods_with_valid_integrations_and_allowed_lambda_access List[ApiGatewayMethod]

The ApiGatewayMethods associated with this gateway that have valid integrations and are allowed to access a Lambda function.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

RestApiGwDomain (AwsResource)

Attributes:

Name Type Description
domain_name str

The name of the REST API domain.

security_policy str

The Transport Layer Security (TLS) version + cipher suite for this DomainName. The valid values are TLS_1_0 and TLS_1_2.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

RestApiGwMapping (AwsResource)

Attributes:

Name Type Description
api_id str

The ID of the REST API Gateway.

domain_name str

The name of the domain.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

RestApiGwPolicy (ResourceBasedPolicy)

Attributes:

Name Type Description
rest_api_gw_id str

The ID of the REST API Gateway.

policy_statements

The statements of the resource policy attached to this gateway.

raw_document

The raw JSON of the resource policy.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

AccessLogsSettings dataclass

Attributes:

Name Type Description
destination_arn str

The ARN of either the CloudWatch log group or the Kinesis Data Firehose delivery stream that receives the access logs.

format str

The formatting and values recorded in the logs.

ApiGatewayStage (AwsResource)

Attributes:

Name Type Description
api_gw_id str

The ID of the REST API Gateway.

stage_name str

The name of the stage.

xray_tracing_enabled bool

Indicates whether active tracing with X-Ray is enabled.

access_logs Optional[AccessLogsSettings]

Information about the access log settings of the REST API Gateway stage (if any are configured).

method_settings Optional[ApiGatewayMethodSettings]

The method settings configured for this stage, if configured.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process