Skip to content

cloudwatch

Sample rules

A few rules that use objects from this package:

non_car_lambda_logging_not_infinite
from typing import List, Dict

from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType


class EnsureLambdaFunctionHasNonInfiniteLogRetentionRule(AwsBaseRule):
    """Report Lambda functions whose log group has no finite retention set.

    A function is reported when it has a log group and that log group's
    ``retention_in_days`` is falsy (``0`` or ``None``).
    """

    def get_id(self) -> str:
        """Return the identifier string of this rule."""
        return 'non_car_lambda_logging_not_infinite'

    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        """Collect one issue per Lambda function with a never-expiring log group.

        ``parameters`` is accepted for interface compatibility and is not read.
        """
        found: List[Issue] = []

        for lambda_func in env_context.lambda_function_list:
            log_group = lambda_func.log_group
            # Nothing to report without a log group, or when a non-zero
            # retention period is configured (0 and None are both falsy).
            if not log_group or log_group.retention_in_days:
                continue

            if log_group.is_pseudo:
                # Pseudo log group: the message and the last Issue argument
                # both point at the function itself.
                message = (
                    f'Upon creation, {lambda_func.get_type()} `{lambda_func.get_friendly_name()}` '
                    f'will have a log group generated automatically with its retention set to Never Expire'
                )
                subject = lambda_func
            else:
                # Non-pseudo log group: it is named in the message and passed
                # as the last Issue argument.
                message = (
                    f'The {log_group.get_type()} `{log_group.get_friendly_name()}` has '
                    f'retention set to Never Expire'
                )
                subject = log_group

            found.append(Issue(message, lambda_func, subject))

        return found

    def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
        """Return True when the context contains at least one Lambda function."""
        return bool(environment_context.lambda_function_list)
non_car_cw_log_group_no_retention
from typing import List, Dict

from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType


class EnsureCloudWatchLogGroupsRetentionUsageRule(AwsBaseRule):
    """Report CloudWatch log groups whose ``retention_in_days`` is falsy."""

    def get_id(self) -> str:
        """Return the identifier string of this rule."""
        return 'non_car_cw_log_group_no_retention'

    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        """Build one issue for every log group without a retention value.

        ``parameters`` is accepted for interface compatibility and is not read.
        """
        # The same log group is passed as both trailing Issue arguments.
        return [
            Issue(
                f'The {log_group.get_type()} `{log_group.get_friendly_name()}` does not have a retention policy set',
                log_group,
                log_group,
            )
            for log_group in env_context.cloud_watch_log_groups
            if not log_group.retention_in_days
        ]

    def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
        """Return True when the context contains at least one log group."""
        return bool(environment_context.cloud_watch_log_groups)

CloudWatchEventTarget (AwsResource)

Attributes:

Name Type Description
name str

The name of the CloudWatch Event Target.

rule_name str

The name of the rule used with the target.

target_id str

The ID of this target.

role_arn str

The ARN of the role used to send the events, may be None.

cluster_arn str

If an ECS cluster is targeted, this is the ARN of the ECS cluster.

ecs_target_list

If an ECS cluster is targeted, lists the ECS targets.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self)

A list of attributes that should be excluded from the invalidation process

CloudWatchLogGroup (AwsResource)

Attributes:

Name Type Description
name str

The name of the CloudWatch Log Group.

kms_encryption str

The KMS key ID if one is used, or None if not.

kms_data KmsKey

A pointer to the actual KMS key, if used.

arn str

The ARN of the Log Group.

retention_in_days int

If configured, this is the retention of the log data in days. May be None.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

CloudWatchLogsDestination (PoliciedResource)

Attributes:

Name Type Description
name str

The name of the destination.

arn str

The ARN of the destination.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

CloudWatchLogsDestinationPolicy (ResourceBasedPolicy)

Attributes:

Name Type Description
destination_name str

The name of the destination the policy applies to.

statements

The list of statements in the policy.

uuid

A randomly generated uuid for the policy (ignore, for internal use).

raw_document

The raw JSON of the policy.

access_analyzer_findings

The results from running IAM Access Analyzer's policy validation API on this policy's JSON.

policy_type

The type of the policy (identity, resource, SCP).

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process