Skip to content


Sample rules

A few rules that use objects from this package:

from abc import abstractmethod
from dataclasses import dataclass
from typing import List, Dict, Optional, Union, Iterable

from import AzureNetworkInterface
from import AzureNetworkSecurityGroup
from import AzureNetworkSecurityRule, \
from import AzurePublicIp
from import NetworkResource
from cloudrail.knowledge.context.connection import ConnectionType, PortConnectionProperty, ConnectionDirectionType
from import AzureEnvironmentContext
from import AzureBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType
from cloudrail.knowledge.utils.port_set import PortSet

class ViolationData:
    violating_resource: Optional[Union[AzureNetworkSecurityGroup, AzureNetworkSecurityRule, AzurePublicIp]]
    public_ip: Optional[str]

class NotPubliclyAccessibleBaseRule(AzureBaseRule):

    def __init__(self, port: int, port_protocol: str):
        self.port: int = port
        self.port_protocol: str = port_protocol

    def execute(self, env_context: AzureEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        issues: List[Issue] = []
        for resource in self.get_resources(env_context):
            violating = self._get_violating(resource.network_interfaces)

            if violating:
                resource_msg = f'The {resource.get_type()} `{resource.get_friendly_name()}`'
                if isinstance(violating.violating_resource, (AzureNetworkSecurityGroup, AzurePublicIp)):
                    if violating.public_ip:
                        public_ip_msg = f'with public IP address `{violating.public_ip}`'
                        public_ip_msg = 'with unknown public IP address'
                    issues.append(Issue(f'{resource_msg} '
                                        f'{public_ip_msg} is reachable from the internet via {self.port_protocol} port', resource, violating.violating_resource))
                        f'{resource_msg} with NAT rule '
                        f'`{violating.violating_resource.get_friendly_name()}` is reachable from the internet via {self.port_protocol} port',
                        resource, violating.violating_resource))
        return issues

    def _get_violating(self, network_interfaces: List[AzureNetworkInterface]) -> Optional[ViolationData]:
        for nic in network_interfaces:
            for ip_config in nic.ip_configurations:
                for inbound_connection in ip_config.inbound_connections:
                    if inbound_connection.connection_type == ConnectionType.PUBLIC \
                            and isinstance(inbound_connection.connection_property, PortConnectionProperty) \
                            and self.port in PortSet(inbound_connection.connection_property.ports):
                        if nic.network_security_group is None and ip_config.subnet.network_security_group is None:
                            return ViolationData(ip_config.public_ip, ip_config.public_ip.public_ip_address)
                        return ViolationData(self._get_violating_nsg_or_rule(nic.network_security_group, ip_config.subnet.network_security_group),
        return None

    def _get_violating_nsg_or_rule(self, nic_nsg: Optional[AzureNetworkSecurityGroup], subnet_nsg: Optional[AzureNetworkSecurityGroup]):
        for nsg in list(filter(None, [nic_nsg, subnet_nsg])):
            for nsg_rule in nsg.network_security_rules:
                if nsg_rule.direction == ConnectionDirectionType.INBOUND and nsg_rule.access == NetworkSecurityRuleActionType.ALLOW and self.port in nsg_rule.destination_port_ranges:
                    if nsg_rule.is_managed_by_iac:
                        return nsg_rule
                    return nsg
        return None

    def get_id(self) -> str:

    def get_resources(env_context: AzureEnvironmentContext) -> Iterable[NetworkResource]:

    def should_run_rule(self, environment_context: AzureEnvironmentContext) -> bool:
        return bool(self.get_resources(environment_context))

class VirtualMachineNotPubliclyAccessibleRdpRule(NotPubliclyAccessibleBaseRule):

    def __init__(self):
        super().__init__(3389, 'RDP')

    def get_id(self) -> str:
        return 'car_vm_not_publicly_accessible_rdp'

    def get_resources(env_context: AzureEnvironmentContext):
        return env_context.virtual_machines

class VirtualMachineNotPubliclyAccessibleSshRule(NotPubliclyAccessibleBaseRule):

    def __init__(self):
        super().__init__(22, 'SSH')

    def get_id(self) -> str:
        return 'car_vm_not_publicly_accessible_ssh'

    def get_resources(env_context: AzureEnvironmentContext):
        return env_context.virtual_machines

AzureVirtualMachine (NetworkResource)


Name Type Description
name str

The name of this Public IP.

network_interface_ids List[str]

A list of Network Interface ID's which are associated with the Virtual Machine.

os_type OperatingSystemType

The VM's operating system. Either Windows or Linux.

disk_settings DiskSettings

Information about the disks used by this Virtual Machine.

sku str

The SKU name of this Virtual Machine.

source_image_reference SourceImageReference

The image used for the Virtual Machine OS.

disable_password_authentication Optional[bool]

If Password Authentication should be disabled on this Virtual Machine (relevant only for Linux OS).

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process

DataDisk dataclass

DataDisk(name: Optional[str], is_managed_disk: bool)

DiskSettings dataclass

DiskSettings(os_disk:, data_disks: Optional[List[]])

OperatingSystemType (Enum)

An enumeration.

OsDisk dataclass


Name Type Description
name Optional[str]

Name of the disk.

is_managed_disk bool

An indication if the disk is managed or not.

caching OsDiskCaching

The Type of Caching which should be used for the Internal OS Disk.

storage_account_type Optional[]

The Type of Storage Account which should back this the Internal OS Disk.

OsDiskCaching (Enum)

An enumeration.

OsDiskStorageAccountType (Enum)

An enumeration.

SourceImageReference dataclass


Name Type Description
publisher str

Specifies the publisher of the image used to create the virtual machines.

offer str

Specifies the offer of the image used to create the virtual machines.

sku str

Specifies the SKU of the image used to create the virtual machines.

version str

Specifies the version of the image used to create the virtual machines.