es
Sample rules
A few rules that use objects from this package:
public_access_elasticsearch_rule
from typing import List, Dict
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType
class PublicAccessElasticSearchRule(AwsBaseRule):
def get_id(self) -> str:
return 'public_access_elasticsearch_rule'
def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
issues: List[Issue] = [Issue(
f"~Internet~. {es.get_type()}: `{es.get_friendly_name()}` is publicly accessible. "
f"{es.get_type()} is currently not deployed within a VPC. ~ElasticSearch~", es, es)
for es in env_context.elastic_search_domains if es.is_public]
return issues
def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
return bool(environment_context.elastic_search_domains)
indirect_public_access_elastic_search_rule
from typing import List, Dict, Optional
from cloudrail.knowledge.context.aws.resources.indirect_public_connection_data import IndirectPublicConnectionData
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType
class IndirectPublicAccessElasticSearchRule(AwsBaseRule):
def get_id(self) -> str:
return 'indirect_public_access_elastic_search_rule'
def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
issues: List[Issue] = []
for es_domain in [es for es in env_context.elastic_search_domains if es.is_in_vpc]:
violation_data: Optional[IndirectPublicConnectionData] = es_domain.indirect_public_connection_data
if violation_data:
issues.append(
Issue(
f"~Internet~. "
f"Instance `{violation_data.target_eni.owner.get_friendly_name()}"
f"` resides in subnet(s) that are routable to internet gateway. "
f"Instance has public IP address. "
f"Instance accepts incoming traffic on port 443. "
f"~Instance `{violation_data.target_eni.owner.get_friendly_name()}`~. "
f"{es_domain.get_type()} `{es_domain.get_friendly_name()}` is on Vpc "
f"`{es_domain.network_resource.vpc.get_friendly_name()}`. "
f"{es_domain.get_type()} is not publically accessible and "
f"uses subnets `{', '.join([x.get_friendly_name() for x in es_domain.network_resource.subnets])}`. "
f"{es_domain.get_type()} resides in same subnet as Instance"
f"`{violation_data.target_eni.owner.get_friendly_name()}`. "
f"{es_domain.get_type()} uses Network ACL's "
f"`{', '.join([x.network_acl.get_friendly_name() for x in es_domain.network_resource.subnets])}`. "
f"{es_domain.get_type()} is indirectly accessible from instance "
f"`{violation_data.target_eni.owner.get_friendly_name()}`. "
f"~{es_domain.get_type()} `{es_domain.get_friendly_name()}`~"
, es_domain, violation_data.security_group))
return issues
def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
return bool(environment_context.elastic_search_domains)
not_car_elasticsearch_domains_encrypted_note_to_node
from typing import List, Dict
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType
class EsEncryptNodeToNodeRule(AwsBaseRule):
def get_id(self) -> str:
return 'not_car_elasticsearch_domains_encrypted_note_to_node'
def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
issues: List[Issue] = []
for es_domain in env_context.elastic_search_domains:
if es_domain.is_new_resource():
if es_domain.es_domain_version not in ('5.6', '5.5', '5.3', '5.1', '2.3', '1.5') and not es_domain.encrypt_node_to_node_state:
issues.append(
Issue(
f"~{es_domain.get_type()}~. {es_domain.get_type()} `{es_domain.get_friendly_name()}`. "
f"is not set to use encrypt node-to-node", es_domain, es_domain))
return issues
def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
return bool(environment_context.elastic_search_domains)
ElasticSearchDomain (NetworkEntity, INetworkConfiguration, PoliciedResource)
Attributes:
Name | Type | Description |
---|---|---|
domain_id |
str |
The ID of the ElasticSearch Domain. |
domain_name |
The name of the domain. |
|
arn |
str |
The ARN of the domain. |
enforce_https |
True if only HTTPS is allowed. |
|
subnet_ids |
The IDs of the subnets the domain is attached to, if any. |
|
security_group_ids |
The IDs of the security groups used with the ElasticSearch Domain, if any. |
|
encrypt_at_rest_state |
bool |
True if encryption at rest is enabled. |
encrypt_node_to_node_state |
bool |
True if node-to-node traffic is encrypted. |
is_public |
bool |
True if the ElasticSearch Domain is public. |
is_in_vpc |
bool |
True if the ElasticSearch Domain is accessible at a specific VPC. |
ports |
List[int] |
The ports the ElasticSearch is listening on. |
indirect_public_connection_data |
Optional[IndirectPublicConnectionData] |
The data that describes that a publicly-accessible resource can access this resource by a security group of this resource. |
log_publishing_options |
Optional[List[LogPublishingOptions]] |
Set of data about the publishing logs to CloudWatch, if enabled. |
es_domain_version |
str |
The ElasticSearch Domain version. |
es_domain_cluster_instance_type |
str |
The ElasticSearch Domain cluster instance type. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
LogPublishingOptions
dataclass
Attributes:
Name | Type | Description |
---|---|---|
log_type |
str |
The type of Elasticsearch log to publish. |
cloudwatch_log_group_arn |
str |
The ARN of the Cloudwatch log group to publish logs into. |
enable |
Indication if log publishing is enabled. |
ElasticSearchDomainPolicy (ResourceBasedPolicy)
Attributes:
Name | Type | Description |
---|---|---|
domain_name |
str |
The name of the domain the policy is related to. |
policy_statements |
The statements contained in the policy. |
|
raw_document |
The raw JSON content of the policy. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process