Skip to content

es

Sample rules

A few rules that use objects from this package:

public_access_elasticsearch_rule
from typing import List, Dict

from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType


class PublicAccessElasticSearchRule(AwsBaseRule):
    """Rule that reports every ElasticSearch domain whose ``is_public`` flag is set."""

    def get_id(self) -> str:
        """Return the identifier of this rule."""
        return 'public_access_elasticsearch_rule'

    # NOTE(review): ``any`` in the annotation below is the builtin function, not ``typing.Any``.
    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        """Build one issue per public ElasticSearch domain found in the context.

        :param env_context: Environment context whose ``elastic_search_domains`` are inspected.
        :param parameters: Rule parameters; not read by this rule.
        :return: The issues found, one per domain with a truthy ``is_public``. The domain
            is passed to ``Issue`` as both the second and the third argument.
        """
        findings: List[Issue] = []
        for domain in env_context.elastic_search_domains:
            if not domain.is_public:
                continue
            kind = domain.get_type()
            name = domain.get_friendly_name()
            evidence = (
                f"~Internet~. {kind}: `{name}` is publicly accessible. "
                f"{kind} is currently not deployed within a VPC. ~ElasticSearch~"
            )
            findings.append(Issue(evidence, domain, domain))
        return findings

    def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
        """Return True only when the context holds at least one ElasticSearch domain."""
        domains = environment_context.elastic_search_domains
        return bool(domains)
indirect_public_access_elastic_search_rule
from typing import List, Dict, Optional

from cloudrail.knowledge.context.aws.resources.indirect_public_connection_data import IndirectPublicConnectionData
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType


class IndirectPublicAccessElasticSearchRule(AwsBaseRule):
    """Rule that reports in-VPC ElasticSearch domains carrying indirect public connection data.

    A domain is reported when ``is_in_vpc`` is truthy and its
    ``indirect_public_connection_data`` is set.
    """

    def get_id(self) -> str:
        """Return the identifier of this rule."""
        return 'indirect_public_access_elastic_search_rule'

    # NOTE(review): ``any`` in the annotation below is the builtin function, not ``typing.Any``.
    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        """Build one issue per in-VPC domain that has indirect public connection data.

        :param env_context: Environment context whose ``elastic_search_domains`` are inspected.
        :param parameters: Rule parameters; not read by this rule.
        :return: The issues found. Each issue is created with the domain as its second
            argument and the connection data's ``security_group`` as its third.
        """
        issues: List[Issue] = []
        for es_domain in env_context.elastic_search_domains:
            if not es_domain.is_in_vpc:
                continue
            violation_data: Optional[IndirectPublicConnectionData] = es_domain.indirect_public_connection_data
            if not violation_data:
                continue

            # Resolve the values that appear several times in the evidence text once.
            instance_name = violation_data.target_eni.owner.get_friendly_name()
            es_type = es_domain.get_type()
            es_name = es_domain.get_friendly_name()
            vpc_name = es_domain.network_resource.vpc.get_friendly_name()
            subnets = es_domain.network_resource.subnets
            subnet_names = ', '.join(subnet.get_friendly_name() for subnet in subnets)
            nacl_names = ', '.join(subnet.network_acl.get_friendly_name() for subnet in subnets)

            evidence = (
                f"~Internet~. "
                f"Instance `{instance_name}` resides in subnet(s) that are routable to internet gateway. "
                f"Instance has public IP address. "
                f"Instance accepts incoming traffic on port 443. "
                f"~Instance `{instance_name}`~. "
                f"{es_type} `{es_name}` is on Vpc `{vpc_name}`. "
                f"{es_type} is not publically accessible and "
                f"uses subnets `{subnet_names}`. "
                # The space after "Instance" was missing in the original, which fused
                # the word with the back-quoted instance name.
                f"{es_type} resides in same subnet as Instance `{instance_name}`. "
                f"{es_type} uses Network ACL's `{nacl_names}`. "
                f"{es_type} is indirectly accessible from instance `{instance_name}`. "
                f"~{es_type} `{es_name}`~"
            )
            issues.append(Issue(evidence, es_domain, violation_data.security_group))
        return issues

    def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
        """Return True only when the context holds at least one ElasticSearch domain."""
        return bool(environment_context.elastic_search_domains)
not_car_elasticsearch_domains_encrypted_note_to_node
from typing import List, Dict

from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType


class EsEncryptNodeToNodeRule(AwsBaseRule):
    """Rule that reports new ElasticSearch domains without node-to-node encryption enabled."""

    def get_id(self) -> str:
        """Return the identifier of this rule."""
        return 'not_car_elasticsearch_domains_encrypted_note_to_node'

    # NOTE(review): ``any`` in the annotation below is the builtin function, not ``typing.Any``.
    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        """Build one issue per new domain whose ``encrypt_node_to_node_state`` is falsy.

        Domains for which ``is_new_resource()`` is falsy are skipped, as are domains
        whose ``es_domain_version`` is one of the listed versions.

        :param env_context: Environment context whose ``elastic_search_domains`` are inspected.
        :param parameters: Rule parameters; not read by this rule.
        :return: The issues found; the domain is passed to ``Issue`` as both the second
            and the third argument.
        """
        # Versions that are never reported - presumably because node-to-node encryption
        # is not available on them; TODO confirm against the AWS documentation.
        skipped_versions = ('5.6', '5.5', '5.3', '5.1', '2.3', '1.5')

        findings: List[Issue] = []
        for domain in env_context.elastic_search_domains:
            if not domain.is_new_resource():
                continue
            if domain.es_domain_version in skipped_versions or domain.encrypt_node_to_node_state:
                continue
            kind = domain.get_type()
            name = domain.get_friendly_name()
            evidence = (
                f"~{kind}~. {kind} `{name}`. "
                f"is not set to use encrypt node-to-node"
            )
            findings.append(Issue(evidence, domain, domain))
        return findings

    def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
        """Return True only when the context holds at least one ElasticSearch domain."""
        domains = environment_context.elastic_search_domains
        return bool(domains)

ElasticSearchDomain (NetworkEntity, INetworkConfiguration, PoliciedResource)

Attributes:

Name Type Description
domain_id str

The ID of the ElasticSearch Domain.

domain_name

The name of the domain.

arn str

The ARN of the domain.

enforce_https

True if only HTTPS is allowed.

subnet_ids

The IDs of the subnets the domain is attached to, if any.

security_group_ids

The IDs of the security groups used with the ElasticSearch Domain, if any.

encrypt_at_rest_state bool

True if encryption at rest is enabled.

encrypt_node_to_node_state bool

True if node-to-node traffic is encrypted.

is_public bool

True if the ElasticSearch Domain is public.

is_in_vpc bool

True if the ElasticSearch Domain is accessible from within a specific VPC.

ports List[int]

The ports the ElasticSearch Domain is listening on.

indirect_public_connection_data Optional[IndirectPublicConnectionData]

Data describing how a publicly accessible resource can reach this resource through one of this resource's security groups, if such a connection exists.

log_publishing_options Optional[List[LogPublishingOptions]]

A list of log publishing settings for sending logs to CloudWatch, if enabled.

es_domain_version str

The ElasticSearch Domain version.

es_domain_cluster_instance_type str

The ElasticSearch Domain cluster instance type.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

LogPublishingOptions dataclass

Attributes:

Name Type Description
log_type str

The type of Elasticsearch log to publish.

cloudwatch_log_group_arn str

The ARN of the CloudWatch log group to publish logs into.

enable

Indicates whether log publishing is enabled.

ElasticSearchDomainPolicy (ResourceBasedPolicy)

Attributes:

Name Type Description
domain_name str

The name of the domain the policy is related to.

policy_statements

The statements contained in the policy.

raw_document

The raw JSON content of the policy.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process