Skip to content

iam

Sample rules

A few rules that use objects from this package:

ec2_role_share_rule
from typing import List, Dict

from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType
from cloudrail.knowledge.context.aws.resources.ec2.ec2_instance import Ec2Instance
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext


class Ec2RoleShareRule(AwsBaseRule):

    def get_id(self) -> str:
        return 'ec2_role_share_rule'

    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        issues: List[Issue] = []

        ec2s: List[Ec2Instance] = env_context.ec2s
        profile_to_public_ec2 = {}
        for public_ec2 in (x for x in ec2s if x.network_resource.is_inbound_public and x.iam_profile_name):
            profile_to_public_ec2[public_ec2.iam_profile_name] = public_ec2
        for private_ec2 in (x for x in ec2s if not x.network_resource.is_inbound_public and x.iam_profile_name):
            public_ec2 = profile_to_public_ec2.get(private_ec2.iam_profile_name)
            profile = private_ec2.iam_role.get_friendly_name() \
                if private_ec2.iam_role  \
                else private_ec2.iam_profile_name
            if public_ec2:
                issues.append(
                    Issue(
                        f"~Instance `{public_ec2.get_friendly_name()}`~. Instance is publicly exposed. "
                        f"Instance uses IAM role `{profile}`. "
                        f"Private EC2 instance shares IAM role `{profile}` as well. "
                        f"~Instance `{private_ec2.get_friendly_name()}`~",
                        private_ec2,
                        private_ec2.iam_role))
        return issues

    def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
        return bool(environment_context.ec2s)
car_iam_policy_control_in_iac_only
from typing import List, Dict, Tuple, Union

from cloudrail.knowledge.context.aws.resources.account.account import Account
from cloudrail.knowledge.context.aws.resources.iam.iam_group import IamGroup
from cloudrail.knowledge.context.aws.resources.iam.iam_identity import IamIdentity
from cloudrail.knowledge.context.aws.resources.iam.iam_user import IamUser
from cloudrail.knowledge.context.aws.resources.iam.policy import InlinePolicy, ManagedPolicy
from cloudrail.knowledge.context.mergeable import EntityOrigin
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType


class EnsureIamEntitiesPolicyManagedSolely(AwsBaseRule):

    def get_id(self) -> str:
        return 'car_iam_policy_control_in_iac_only'

    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        issues: List[Issue] = []

        for account in env_context.accounts:
            filtered_iam_entities = self._filter_entities_by_account(env_context.roles, account) + \
                                    self._filter_entities_by_account(env_context.users, account) + \
                                    self._filter_entities_by_account(env_context.groups, account)

            for entity in filtered_iam_entities:
                affected_policies = self._get_live_env_attached_policies(entity)
                for policy in affected_policies:
                    issues.append(
                        Issue(f'~`{policy.get_friendly_name()}`~. '
                              f'is attached to {entity.get_type()} `{entity.get_friendly_name()}`. `{entity.get_friendly_name()}` '
                              f'is declared in infrastructure-as-code. The attachment of the policy was done outside of the code '
                              f'(for example, directly via the console)', account, policy))
                if isinstance(entity, IamUser):
                    affected_groups = self._get_group_attach_policies_aws(entity)
                    for group, policies in affected_groups:
                        policies_list_string = ', '.join([policy.get_friendly_name() for policy in policies])
                        issues.append(
                            Issue(f'{entity.get_type()} `{entity.get_friendly_name()}` is declared in infrastructure-as-code. '
                                  f'The attachment of policy(s) `{policies_list_string}` to it was done outside the code '
                                  f'(for example, directly via the console), by adding it to the group `{group.get_friendly_name()}`.',
                                  account, group))
        return issues

    @staticmethod
    def filter_non_iac_managed_issues() -> bool:
        return False

    def _filter_entities_by_account(self, entities: List[IamIdentity], account: Account) -> List[IamIdentity]:
        return [entity for entity in entities if entity.account == account.account and self._are_there_existing_tf_entities(entity)]

    @staticmethod
    def _are_there_existing_tf_entities(entity: IamIdentity) -> bool:
        return entity.is_managed_by_iac and not entity.is_new_resource() and \
               any((isinstance(policy, ManagedPolicy) and
                    any(pao.get(policy.get_name()) == EntityOrigin.TERRAFORM for pao in entity.get_policies_attach_origin_maps())
                    or (isinstance(policy, InlinePolicy) and policy.is_managed_by_iac)) for policy in entity.get_policies())

    @staticmethod
    def _get_live_env_attached_policies(entity: IamIdentity) -> List[Union[ManagedPolicy, InlinePolicy]]:
        affected_policies = []
        for policy in entity.permissions_policies:
            if (isinstance(policy, ManagedPolicy)
                    and any(pao.get(policy.get_name()) == EntityOrigin.LIVE_ENV for pao in entity.get_policies_attach_origin_maps())):
                affected_policies.append(policy)
            elif isinstance(policy, InlinePolicy) and not policy.is_managed_by_iac:
                affected_policies.append(policy)
        return affected_policies

    def _get_group_attach_policies_aws(self, user: IamUser) -> List[Tuple[IamGroup, List[Union[ManagedPolicy, InlinePolicy]]]]:
        issues_list = []
        for group in user.groups:
            affected_policies = self._get_live_env_attached_policies(group)
            if affected_policies:
                issues_list.append((group, affected_policies))
        return issues_list

    def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
        return bool(environment_context.accounts and environment_context.get_all_iam_entities())
iam_priv_escalation_policy
from typing import Dict, List, Set, Optional

from cloudrail.knowledge.context.aws.resources.iam.iam_identity import IamIdentity
from cloudrail.knowledge.context.aws.resources.iam.policy import Policy
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType
from cloudrail.knowledge.utils.action_utils import is_combo_escalation_permissions_match, attribute_match, LAMBDA_UPDATE_ACTION, EC2_RUN_INSTANCE_ACTION, \
    LAMBDA_INVOKE_FUNCTION_ACTION, LAMBDA_CREATE_EVENT_ACTION


class IamPrivilegeEscalationPolicyRule(AwsBaseRule):
    EVIDENCE_TEMPLATE: str = "~`{}`~. is applied to `{}`. {}{}"

    def get_id(self) -> str:
        return "iam_priv_escalation_policy"

    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        issues = []
        for iam_entity in env_context.get_all_iam_entities():
            issue = self._add_issues_by_iam_entity(iam_entity)
            if issue:
                issues.append(issue)
        return issues

    def _add_issues_by_iam_entity(self, iam_entity: IamIdentity) -> Optional[Issue]:
        if iam_entity.policy_to_escalation_actions_map:
            all_policies_esc_actions: Set[str] = {esc_action for esc_actions in iam_entity.policy_to_escalation_actions_map.values()
                                                  for esc_action in esc_actions}
            uuid_to_policy_map: Dict[str, Policy] = {policy.uuid: policy for policy in iam_entity.permissions_policies}
            if is_combo_escalation_permissions_match(all_policies_esc_actions):
                policies: List[Policy] = [uuid_to_policy_map[policy_uuid] for policy_uuid in iam_entity.policy_to_escalation_actions_map.keys()]
                return self._handle_issues(iam_entity, policies, all_policies_esc_actions)
        return None

    def _handle_issues(self, iam_entity: IamIdentity, policies: List[Policy], esc_action_list: Set[str]) -> Issue:
        specific_evidence: str = self._get_evidence_str(esc_action_list)
        multiple_policies_section: str = self._get_multiple_policies_evidence_section(policies)
        policy: Policy = policies[0]
        if multiple_policies_section:
            specific_evidence = ' ' + specific_evidence
        evidence: str = self.EVIDENCE_TEMPLATE.format(
            policy.get_friendly_name(), iam_entity.get_arn(), multiple_policies_section, specific_evidence)
        if policy.is_managed_by_iac:
            return Issue(evidence, policy, policy)
        else:
            return Issue(evidence, iam_entity, iam_entity)

    @staticmethod
    def _get_multiple_policies_evidence_section(policies: List[Policy]) -> str:
        multiple_policies_section: str = ""
        if len(policies) > 1:
            multiple_policies_section = "in conjunction with " + \
                                        ', '.join([f"`{policy.get_friendly_name()}`" for policy in policies])
        return multiple_policies_section

    @classmethod
    def _get_evidence_str(cls, esc_action_list: Set[str]):
        if cls._is_specific_evidence(esc_action_list, LAMBDA_UPDATE_ACTION):
            return "`lambda:UpdateFunctionCode` allows a hacker to run their code under the lambda permission"
        elif cls._is_specific_evidence(esc_action_list, EC2_RUN_INSTANCE_ACTION):
            return "`iam:PassRole` and `ec2:RunInstances` allows a hacker to run an EC2 instance with a role they choose"
        elif cls._is_specific_evidence(esc_action_list, LAMBDA_INVOKE_FUNCTION_ACTION) or \
                cls._is_specific_evidence(esc_action_list, LAMBDA_CREATE_EVENT_ACTION):
            return f"{str(esc_action_list)} allows a hacker to trigger a Lambda function with a role they choose"
        else:
            return f"granting the problematic permissions `{str(esc_action_list)}` which can allow for privilege escalation"

    @staticmethod
    def _is_specific_evidence(esc_action_list: Set[str], action: str):
        return "*" not in esc_action_list and any(attribute_match(esc_action, action)
                                                  for esc_action in esc_action_list)

    def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
        return bool(environment_context.get_all_iam_entities())
non_car_iam_no_human_users
from typing import Dict, List
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType


class IamNoHumanUsersRule(AwsBaseRule):

    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        issues_list: List[Issue] = []
        for user in env_context.users:
            if any(user.name == login_profile.name and user.account == login_profile.account for login_profile in env_context.users_login_profile):
                issues_list.append(Issue(f'The {user.get_type()} `{user.get_friendly_name()}` has console access, '
                                         f'and so is considered human', user, user))
        return issues_list

    def get_id(self) -> str:
        return "non_car_iam_no_human_users"

    def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
        return bool(environment_context.users and environment_context.users_login_profile)
not_car_access_analyzer_validation_error_and_security
from abc import ABC, abstractmethod
from typing import List, Dict, Set

from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext
from cloudrail.knowledge.rules.aws.aws_base_rule import AwsBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType


class AccessAnalyzerValidationBaseRule(AwsBaseRule, ABC):

    def should_run_rule(self, environment_context: AwsEnvironmentContext) -> bool:
        return bool(environment_context.get_iac_managed_policies())

    def execute(self, env_context: AwsEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        issues = []

        for policy in env_context.get_iac_managed_policies():
            evidence = self._create_evidence_from_findings(policy.access_analyzer_findings)
            if evidence:
                issues.append(Issue(evidence, policy, policy))

        return issues

    def _create_evidence_from_findings(self, findings):
        evidences = []
        for finding in findings:
            evidence = ''
            if finding.get('findingType') in self._get_violated_finding_types():
                if finding.get('locations'):
                    start = finding['locations'][0]['span']['start']
                    line = start['line']
                    column = start['column']
                    prefix = f'Line {line}, Col {column}:'
                else:
                    prefix = 'Finding Without Specific Location In Policy:'
                finding_details = finding['findingDetails']
                learn_more = finding['learnMoreLink']
                evidence += f'~{prefix}~. {finding_details} See {learn_more}'
                evidences.append(evidence)
        return '. '.join(evidences)

    @abstractmethod
    def _get_violated_finding_types(self) -> Set[str]:
        pass

IamGroup (IamIdentity)

Attributes:

Name Type Description
name str

The name of the IAM Group.

group_id str

The ID of the group.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

IamGroupMembership (AwsResource)

Attributes:

Name Type Description
name str

The name of the group membership.

group str

The group the users belong to.

users List[str]

The list of users who are members of the designated group.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

IamIdentity (AwsResource, ConnectionInstance, Cloneable)

Attributes:

Name Type Description
qualified_arn str

A Cloudrail-caculated ARN for the role that ensures it's the same whether the role came from infrastructure-as-code (such as Terraform) or the live AWS environment.

arn str

The ARN of the IAM identity.

permissions_policies List[Union[ManagedPolicy, InlinePolicy]]

One or more policies used to give the IAM entity permissions to take certain actions.

permission_boundary Optional[Policy]

The permission boundary limiting the IAM entity's permissions.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

IamIdentityType (str, Enum)

An enumeration.

IamInstanceProfile (AwsResource)

Attributes:

Name Type Description
role_name str

The name of the role.

iam_instance_profile_name str

The name of the instance profile.

ec2_instance_data Ec2Instance

The Ec2Instance using this profile.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

IamPasswordPolicy (AwsResource)

Attributes:

Name Type Description
min_pass_length int

The minimum length required for passwords.

require_low_case_characters bool

True if lower cases characters are required according to the policy.

require_upper_case_characters bool

True if upper cases characters are required according to the policy.

require_numbers bool

True if number characters are required according to the policy.

require_symbols bool

True if symbol characters are required according to the policy.

allow_users_to_change_pass bool

True if users can change their password.

max_pass_age int

The maximum age of a password before it needs to be replaced.

password_reuse_prevention int

Number of passwords kept in history that cannot be repeated.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

IamPolicyAttachment (AwsResource)

Attributes:

Name Type Description
policy_arn str

The ARN of the policy to attach.

users Optional[List]

The list of users to attach the policy to, may be empty or None.

roles Optional[List]

The list of roles to attach the policy to, may be empty or None.

groups Optional[List]

The list of groups to attach the policy to, may be empty or None.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

IamUser (IamIdentity)

Attributes:

Name Type Description
name str

The name of the user.

user_id str

The ID of the user.

groups List[IamGroup]

The groups the user belongs to.

groups_attach_origin_map List[Dict]

A cache map used to "remember" the origin of user-to-group attachments (whether from live account, IaC, etc.).

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

IamUserGroupMembership (AwsResource)

Attributes:

Name Type Description
user str

The user the membership is focused on.

groups List[str]

The groups the user should be a member of.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

IamUsersLoginProfile (AwsResource)

Attributes:

Name Type Description
name str

The name of the user the login profile is for.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

AssumeRolePolicy (Policy)

Attributes:

Name Type Description
role_name str

The name of the role that uses this policy.

role_arn str

The ARN of the role that uses this policy.

is_allowing_external_assume bool

An indication on if this policy can be assumed by a resource outside of this policy's account.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

InlinePolicy (Policy)

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

ManagedPolicy (Policy)

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

Policy (AwsResource, Cloneable)

Attributes:

Name Type Description
statements List[PolicyStatement]

The list of statements in the policy.

uuid str

A randomly generated uuid for the policy (ignore, for internal use).

raw_document

The raw JSON of the policy.

access_analyzer_findings

The results from running IAM Access Analyzer's policy validation API on this policy's JSON.

policy_type

The type of the policy (identity, resource, SCP).

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

PolicyType (str, Enum)

An enumeration.

PolicyGroupAttachment (AwsResource)

Attributes:

Name Type Description
policy_arn str

The policy to attach to the group.

group_id str

The ID of the group to attach the policy to.

group_name str

The name of the group to attach the policy to.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

PolicyRoleAttachment (AwsResource)

Attributes:

Name Type Description
policy_arn str

The policy to attach to the role.

role_name str

The name of the role to attach the policy to.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

PolicyStatement (Cloneable)

Attributes:

Name Type Description
effect

The effect of the statement (Allow / Deny).

actions

The actions covered by the statements.

resources

The resources covered by the statement.

principal

The principal(s) included.

statement_id

The id of the statement.

condition_block

List of conditions included in the statement, or None if there aren't any.

policy

The policy the statement belong to, if it does.

StatementCondition dataclass

StatementCondition(operator: str, key: str, values: List[str])

StatementEffect (Enum)

An enumeration.

PolicyUserAttachment (AwsResource)

Attributes:

Name Type Description
policy_arn str

The policy to attach to the user.

user_id str

The ID of the user to attach the policy to.

user_name str

The name of the user to attach the policy to.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

Principal dataclass

Principal(principal_type: cloudrail.knowledge.context.aws.resources.iam.principal.PrincipalType, principal_values: List[str])

PrincipalType (Enum)

An enumeration.

Role (IamIdentity)

Attributes:

Name Type Description
role_name str

THe name of the role.

role_id str

The role's ID.

permission_boundary_arn Optional[str]

The ARN of the permission boundary if one applies (may be None).

creation_date str

The date of creation of the role.

arn

The ARN of the role.

assume_role_policy AssumeRolePolicy

The assume role policy.

policy_evaluation_result_map Dict[str, PolicyEvaluation]

A caching of the policy evaluation for the role.

last_used_date RoleLastUsed

Last date the role was used (comes from an API call made to the AWS IAM API).

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

RoleLastUsed (AwsResource)

Attributes:

Name Type Description
role_name str

The name of the role.

arn str

The ARN of the role.

last_used_date str

The last date the role was used in.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process