iam
Sample rules
A few rules that use objects from this package:
ec2_role_share_rule
from typing import List, Dict
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType
from cloudrail.knowledge.context.aws.resources.ec2.ec2_instance import Ec2Instance
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
class Ec2RoleShareRule(AwsBaseRule):
def get_id(self) -> str:
return 'ec2_role_share_rule'
def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
issues: List[Issue] = []
ec2s: List[Ec2Instance] = env_context.ec2s
profile_to_public_ec2 = {}
for public_ec2 in (x for x in ec2s if x.network_resource.is_inbound_public and x.iam_profile_name):
profile_to_public_ec2[public_ec2.iam_profile_name] = public_ec2
for private_ec2 in (x for x in ec2s if not x.network_resource.is_inbound_public and x.iam_profile_name):
public_ec2 = profile_to_public_ec2.get(private_ec2.iam_profile_name)
profile = private_ec2.iam_role.get_friendly_name() \
if private_ec2.iam_role \
else private_ec2.iam_profile_name
if public_ec2:
issues.append(
Issue(
f"~Instance `{public_ec2.get_friendly_name()}`~. Instance is publicly exposed. "
f"Instance uses IAM role `{profile}`. "
f"Private EC2 instance shares IAM role `{profile}` as well. "
f"~Instance `{private_ec2.get_friendly_name()}`~",
private_ec2,
private_ec2.iam_role))
return issues
def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
return bool(environment_context.ec2s)
car_iam_policy_control_in_iac_only
from typing import List, Dict, Tuple, Union
from cloudrail.knowledge.context.aws.resources.account.account import Account
from cloudrail.knowledge.context.aws.resources.iam.iam_group import IamGroup
from cloudrail.knowledge.context.aws.resources.iam.iam_identity import IamIdentity
from cloudrail.knowledge.context.aws.resources.iam.iam_user import IamUser
from cloudrail.knowledge.context.aws.resources.iam.policy import InlinePolicy, ManagedPolicy
from cloudrail.knowledge.context.mergeable import EntityOrigin
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType
class EnsureIamEntitiesPolicyManagedSolely(AwsBaseRule):
def get_id(self) -> str:
return 'car_iam_policy_control_in_iac_only'
def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
issues: List[Issue] = []
for account in env_context.accounts:
filtered_iam_entities = self._filter_entities_by_account(env_context.roles, account) + \
self._filter_entities_by_account(env_context.users, account) + \
self._filter_entities_by_account(env_context.groups, account)
for entity in filtered_iam_entities:
affected_policies = self._get_live_env_attached_policies(entity)
for policy in affected_policies:
issues.append(
Issue(f'~`{policy.get_friendly_name()}`~. '
f'is attached to {entity.get_type()} `{entity.get_friendly_name()}`. `{entity.get_friendly_name()}` '
f'is declared in infrastructure-as-code. The attachment of the policy was done outside of the code '
f'(for example, directly via the console)', account, policy))
if isinstance(entity, IamUser):
affected_groups = self._get_group_attach_policies_aws(entity)
for group, policies in affected_groups:
policies_list_string = ', '.join([policy.get_friendly_name() for policy in policies])
issues.append(
Issue(f'{entity.get_type()} `{entity.get_friendly_name()}` is declared in infrastructure-as-code. '
f'The attachment of policy(s) `{policies_list_string}` to it was done outside the code '
f'(for example, directly via the console), by adding it to the group `{group.get_friendly_name()}`.',
account, group))
return issues
@staticmethod
def filter_non_iac_managed_issues() -> bool:
return False
def _filter_entities_by_account(self, entities: List[IamIdentity], account: Account) -> List[IamIdentity]:
return [entity for entity in entities if entity.account == account.account and self._are_there_existing_tf_entities(entity)]
@staticmethod
def _are_there_existing_tf_entities(entity: IamIdentity) -> bool:
return entity.is_managed_by_iac and not entity.is_new_resource() and \
any((isinstance(policy, ManagedPolicy) and
any(pao.get(policy.get_name()) == EntityOrigin.TERRAFORM for pao in entity.get_policies_attach_origin_maps())
or (isinstance(policy, InlinePolicy) and policy.is_managed_by_iac)) for policy in entity.get_policies())
@staticmethod
def _get_live_env_attached_policies(entity: IamIdentity) -> List[Union[ManagedPolicy, InlinePolicy]]:
affected_policies = []
for policy in entity.permissions_policies:
if (isinstance(policy, ManagedPolicy)
and any(pao.get(policy.get_name()) == EntityOrigin.LIVE_ENV for pao in entity.get_policies_attach_origin_maps())):
affected_policies.append(policy)
elif isinstance(policy, InlinePolicy) and not policy.is_managed_by_iac:
affected_policies.append(policy)
return affected_policies
def _get_group_attach_policies_aws(self, user: IamUser) -> List[Tuple[IamGroup, List[Union[ManagedPolicy, InlinePolicy]]]]:
issues_list = []
for group in user.groups:
affected_policies = self._get_live_env_attached_policies(group)
if affected_policies:
issues_list.append((group, affected_policies))
return issues_list
def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
return bool(environment_context.accounts and environment_context.get_all_iam_entities())
iam_priv_escalation_policy
from typing import Dict, List, Set, Optional
from cloudrail.knowledge.context.aws.resources.iam.iam_identity import IamIdentity
from cloudrail.knowledge.context.aws.resources.iam.policy import Policy
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType
from cloudrail.knowledge.utils.action_utils import is_combo_escalation_permissions_match, attribute_match, LAMBDA_UPDATE_ACTION, EC2_RUN_INSTANCE_ACTION, \
LAMBDA_INVOKE_FUNCTION_ACTION, LAMBDA_CREATE_EVENT_ACTION
class IamPrivilegeEscalationPolicyRule(AwsBaseRule):
EVIDENCE_TEMPLATE: str = "~`{}`~. is applied to `{}`. {}{}"
def get_id(self) -> str:
return "iam_priv_escalation_policy"
def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
issues = []
for iam_entity in env_context.get_all_iam_entities():
issue = self._add_issues_by_iam_entity(iam_entity)
if issue:
issues.append(issue)
return issues
def _add_issues_by_iam_entity(self, iam_entity: IamIdentity) -> Optional[Issue]:
if iam_entity.policy_to_escalation_actions_map:
all_policies_esc_actions: Set[str] = {esc_action for esc_actions in iam_entity.policy_to_escalation_actions_map.values()
for esc_action in esc_actions}
uuid_to_policy_map: Dict[str, Policy] = {policy.uuid: policy for policy in iam_entity.permissions_policies}
if is_combo_escalation_permissions_match(all_policies_esc_actions):
policies: List[Policy] = [uuid_to_policy_map[policy_uuid] for policy_uuid in iam_entity.policy_to_escalation_actions_map.keys()]
return self._handle_issues(iam_entity, policies, all_policies_esc_actions)
return None
def _handle_issues(self, iam_entity: IamIdentity, policies: List[Policy], esc_action_list: Set[str]) -> Issue:
specific_evidence: str = self._get_evidence_str(esc_action_list)
multiple_policies_section: str = self._get_multiple_policies_evidence_section(policies)
policy: Policy = policies[0]
if multiple_policies_section:
specific_evidence = ' ' + specific_evidence
evidence: str = self.EVIDENCE_TEMPLATE.format(
policy.get_friendly_name(), iam_entity.get_arn(), multiple_policies_section, specific_evidence)
if policy.is_managed_by_iac:
return Issue(evidence, policy, policy)
else:
return Issue(evidence, iam_entity, iam_entity)
@staticmethod
def _get_multiple_policies_evidence_section(policies: List[Policy]) -> str:
multiple_policies_section: str = ""
if len(policies) > 1:
multiple_policies_section = "in conjunction with " + \
', '.join([f"`{policy.get_friendly_name()}`" for policy in policies])
return multiple_policies_section
@classmethod
def _get_evidence_str(cls, esc_action_list: Set[str]):
if cls._is_specific_evidence(esc_action_list, LAMBDA_UPDATE_ACTION):
return "`lambda:UpdateFunctionCode` allows a hacker to run their code under the lambda permission"
elif cls._is_specific_evidence(esc_action_list, EC2_RUN_INSTANCE_ACTION):
return "`iam:PassRole` and `ec2:RunInstances` allows a hacker to run an EC2 instance with a role they choose"
elif cls._is_specific_evidence(esc_action_list, LAMBDA_INVOKE_FUNCTION_ACTION) or \
cls._is_specific_evidence(esc_action_list, LAMBDA_CREATE_EVENT_ACTION):
return f"{str(esc_action_list)} allows a hacker to trigger a Lambda function with a role they choose"
else:
return f"granting the problematic permissions `{str(esc_action_list)}` which can allow for privilege escalation"
@staticmethod
def _is_specific_evidence(esc_action_list: Set[str], action: str):
return "*" not in esc_action_list and any(attribute_match(esc_action, action)
for esc_action in esc_action_list)
def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
return bool(environment_context.get_all_iam_entities())
non_car_iam_no_human_users
from typing import Dict, List
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType
class IamNoHumanUsersRule(AwsBaseRule):
def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
issues_list: List[Issue] = []
for user in env_context.users:
if any(user.name == login_profile.name and user.account == login_profile.account for login_profile in env_context.users_login_profile):
issues_list.append(Issue(f'The {user.get_type()} `{user.get_friendly_name()}` has console access, '
f'and so is considered human', user, user))
return issues_list
def get_id(self) -> str:
return "non_car_iam_no_human_users"
def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
return bool(environment_context.users and environment_context.users_login_profile)
not_car_access_analyzer_validation_error_and_security
from abc import ABC, abstractmethod
from typing import List, Dict, Set
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType
class AccessAnalyzerValidationBaseRule(AwsBaseRule, ABC):
def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
return bool(environment_context.get_iac_managed_policies())
def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
issues = []
for policy in env_context.get_iac_managed_policies():
evidence = self._create_evidence_from_findings(policy.access_analyzer_findings)
if evidence:
issues.append(Issue(evidence, policy, policy))
return issues
def _create_evidence_from_findings(self, findings):
evidences = []
for finding in findings:
evidence = ''
if finding.get('findingType') in self._get_violated_finding_types():
if finding.get('locations'):
start = finding['locations'][0]['span']['start']
line = start['line']
column = start['column']
prefix = f'Line {line}, Col {column}:'
else:
prefix = 'Finding Without Specific Location In Policy:'
finding_details = finding['findingDetails']
learn_more = finding['learnMoreLink']
evidence += f'~{prefix}~. {finding_details} See {learn_more}'
evidences.append(evidence)
return '. '.join(evidences)
@abstractmethod
def _get_violated_finding_types(self) -> Set[str]:
pass
IamGroup (IamIdentity)
Attributes:
Name | Type | Description |
---|---|---|
name |
str |
The name of the IAM Group. |
group_id |
str |
The ID of the group. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
IamGroupMembership (AwsResource)
Attributes:
Name | Type | Description |
---|---|---|
name |
str |
The name of the group membership. |
group |
str |
The group the users belong to. |
users |
List[str] |
The list of users who are members of the designated group. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
IamIdentity (AwsResource, ConnectionInstance, Cloneable)
Attributes:
Name | Type | Description |
---|---|---|
qualified_arn |
str |
A Cloudrail-caculated ARN for the role that ensures it's the same whether the role came from infrastructure-as-code (such as Terraform) or the live AWS environment. |
arn |
str |
The ARN of the IAM identity. |
permissions_policies |
List[Union[ManagedPolicy, InlinePolicy]] |
One or more policies used to give the IAM entity permissions to take certain actions. |
permission_boundary |
Optional[Policy] |
The permission boundary limiting the IAM entity's permissions. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
IamIdentityType (str, Enum)
An enumeration.
IamInstanceProfile (AwsResource)
Attributes:
Name | Type | Description |
---|---|---|
role_name |
str |
The name of the role. |
iam_instance_profile_name |
str |
The name of the instance profile. |
ec2_instance_data |
Ec2Instance |
The Ec2Instance using this profile. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
IamPasswordPolicy (AwsResource)
Attributes:
Name | Type | Description |
---|---|---|
min_pass_length |
int |
The minimum length required for passwords. |
require_low_case_characters |
bool |
True if lower cases characters are required according to the policy. |
require_upper_case_characters |
bool |
True if upper cases characters are required according to the policy. |
require_numbers |
bool |
True if number characters are required according to the policy. |
require_symbols |
bool |
True if symbol characters are required according to the policy. |
allow_users_to_change_pass |
bool |
True if users can change their password. |
max_pass_age |
int |
The maximum age of a password before it needs to be replaced. |
password_reuse_prevention |
int |
Number of passwords kept in history that cannot be repeated. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
IamPolicyAttachment (AwsResource)
Attributes:
Name | Type | Description |
---|---|---|
policy_arn |
str |
The ARN of the policy to attach. |
users |
Optional[List] |
The list of users to attach the policy to, may be empty or None. |
roles |
Optional[List] |
The list of roles to attach the policy to, may be empty or None. |
groups |
Optional[List] |
The list of groups to attach the policy to, may be empty or None. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
IamUser (IamIdentity)
Attributes:
Name | Type | Description |
---|---|---|
name |
str |
The name of the user. |
user_id |
str |
The ID of the user. |
groups |
List[IamGroup] |
The groups the user belongs to. |
groups_attach_origin_map |
List[Dict] |
A cache map used to "remember" the origin of user-to-group attachments (whether from live account, IaC, etc.). |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
IamUserGroupMembership (AwsResource)
Attributes:
Name | Type | Description |
---|---|---|
user |
str |
The user the membership is focused on. |
groups |
List[str] |
The groups the user should be a member of. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
IamUsersLoginProfile (AwsResource)
Attributes:
Name | Type | Description |
---|---|---|
name |
str |
The name of the user the login profile is for. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
AssumeRolePolicy (Policy)
Attributes:
Name | Type | Description |
---|---|---|
role_name |
str |
The name of the role that uses this policy. |
role_arn |
str |
The ARN of the role that uses this policy. |
is_allowing_external_assume |
bool |
An indication on if this policy can be assumed by a resource outside of this policy's account. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
InlinePolicy (Policy)
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
ManagedPolicy (Policy)
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
Policy (AwsResource, Cloneable)
Attributes:
Name | Type | Description |
---|---|---|
statements |
List[PolicyStatement] |
The list of statements in the policy. |
uuid |
str |
A randomly generated uuid for the policy (ignore, for internal use). |
raw_document |
The raw JSON of the policy. |
|
access_analyzer_findings |
The results from running IAM Access Analyzer's policy validation API on this policy's JSON. |
|
policy_type |
The type of the policy (identity, resource, SCP). |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
PolicyType (str, Enum)
An enumeration.
PolicyGroupAttachment (AwsResource)
Attributes:
Name | Type | Description |
---|---|---|
policy_arn |
str |
The policy to attach to the group. |
group_id |
str |
The ID of the group to attach the policy to. |
group_name |
str |
The name of the group to attach the policy to. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
PolicyRoleAttachment (AwsResource)
Attributes:
Name | Type | Description |
---|---|---|
policy_arn |
str |
The policy to attach to the role. |
role_name |
str |
The name of the role to attach the policy to. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
PolicyStatement (Cloneable)
Attributes:
Name | Type | Description |
---|---|---|
effect |
The effect of the statement (Allow / Deny). |
|
actions |
The actions covered by the statements. |
|
resources |
The resources covered by the statement. |
|
principal |
The principal(s) included. |
|
statement_id |
The id of the statement. |
|
condition_block |
List of conditions included in the statement, or None if there aren't any. |
|
policy |
The policy the statement belong to, if it does. |
StatementCondition
dataclass
StatementCondition(operator: str, key: str, values: List[str])
StatementEffect (Enum)
An enumeration.
PolicyUserAttachment (AwsResource)
Attributes:
Name | Type | Description |
---|---|---|
policy_arn |
str |
The policy to attach to the user. |
user_id |
str |
The ID of the user to attach the policy to. |
user_name |
str |
The name of the user to attach the policy to. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
Principal
dataclass
Principal(principal_type: cloudrail.knowledge.context.aws.resources.iam.principal.PrincipalType, principal_values: List[str])
PrincipalType (Enum)
An enumeration.
Role (IamIdentity)
Attributes:
Name | Type | Description |
---|---|---|
role_name |
str |
THe name of the role. |
role_id |
str |
The role's ID. |
permission_boundary_arn |
Optional[str] |
The ARN of the permission boundary if one applies (may be None). |
creation_date |
str |
The date of creation of the role. |
arn |
The ARN of the role. |
|
assume_role_policy |
AssumeRolePolicy |
The assume role policy. |
policy_evaluation_result_map |
Dict[str, PolicyEvaluation] |
A caching of the policy evaluation for the role. |
last_used_date |
RoleLastUsed |
Last date the role was used (comes from an API call made to the AWS IAM API). |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
RoleLastUsed (AwsResource)
Attributes:
Name | Type | Description |
---|---|---|
role_name |
str |
The name of the role. |
arn |
str |
The ARN of the role. |
last_used_date |
str |
The last date the role was used in. |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process