Skip to content

athena

Sample rules

A few rules that use objects from this package:

non_car_athena_workgroup_query_results_encrypt_at_rest_using_customer_managed_cmk
from typing import List, Dict

from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.context.aws.resources.kms.kms_key_manager import KeyManager
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType


class EnsureAthenaWorkgroupsEncryptionCmkRule(AwsBaseRule):

    def get_id(self) -> str:
        return 'non_car_athena_workgroup_query_results_encrypt_at_rest_using_customer_managed_cmk'

    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        issues: List[Issue] = []

        for workgroup in env_context.athena_workgroups:
            if workgroup.is_new_resource():
                if workgroup.encryption_option == 'SSE_S3' \
                        or workgroup.kms_data is None \
                        or workgroup.kms_data.key_manager != KeyManager.CUSTOMER:
                    issues.append(
                        Issue(
                            f'The {workgroup.get_type()} `{workgroup.get_friendly_name()}` is set to use encrypt at rest '
                            f'but it is not using customer-managed CMKs', workgroup, workgroup))
        return issues

    def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
        return bool(environment_context.athena_workgroups)
non_car_athena_workgroup_query_results_encrypt_at_rest
from typing import List, Dict

from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType


class EnsureAthenaWorkGroupsResultsEncryptedRule(AwsBaseRule):

    def get_id(self) -> str:
        return 'non_car_athena_workgroup_query_results_encrypt_at_rest'

    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        issues: List[Issue] = []

        for workgroup in env_context.athena_workgroups:
            if workgroup.enforce_workgroup_config:
                if not workgroup.encryption_config:
                    issues.append(
                        Issue(
                            f"The {workgroup.get_type()} `{workgroup.get_friendly_name()}` is not "
                            f"set to encrypt at rest the query results", workgroup, workgroup))
            elif workgroup.encryption_config and not workgroup.enforce_workgroup_config:
                issues.append(
                    Issue(
                        f"The {workgroup.get_type()} `{workgroup.get_friendly_name()}` is "
                        f"set to encrypt at rest the query results, but the workgroup configurations are not set to enforce", workgroup, workgroup))
        return issues

    def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
        return bool(environment_context.athena_workgroups)

AthenaWorkgroup (AwsResource)

Attributes:

Name Type Description
name str

The name of the workgroup.

state str

DISABLED or ENABLED.

encryption_config bool

True if any encryption configuration is set, False otherwise.

enforce_workgroup_config bool

True to enforce Workgroup encryption configuration on clients.

encryption_option str

Set if encryption is configured, one of SSE_S3, SSE_KMS, CSE_KMS.

kms_key_arn str

Set if KMS is used for encryption, this is the ARN of the key.

kms_key_id str

KMS key unique id.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process