vm
Sample rules
A few rules that use objects from this package:
car_vm_not_publicly_accessible_rdp
from abc import abstractmethod
from dataclasses import dataclass
from typing import List, Dict, Optional, Union, Iterable
from cloudrail.knowledge.context.azure.resources.network.azure_network_interface import AzureNetworkInterface
from cloudrail.knowledge.context.azure.resources.network.azure_network_security_group import AzureNetworkSecurityGroup
from cloudrail.knowledge.context.azure.resources.network.azure_network_security_group_rule import AzureNetworkSecurityRule, \
NetworkSecurityRuleActionType
from cloudrail.knowledge.context.azure.resources.network.azure_public_ip import AzurePublicIp
from cloudrail.knowledge.context.azure.resources.network_resource import NetworkResource
from cloudrail.knowledge.context.connection import ConnectionType, PortConnectionProperty, ConnectionDirectionType
from cloudrail.knowledge.context.azure.azure_environment_context import AzureEnvironmentContext
from cloudrail.knowledge.rules.azure.azure_base_rule import AzureBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType
from cloudrail.knowledge.utils.port_set import PortSet
@dataclass
class ViolationData:
    """Result of the public-exposure check for one network resource.

    Instances are produced by ``NotPubliclyAccessibleBaseRule._get_violating``
    and consumed by ``NotPubliclyAccessibleBaseRule.execute`` to build the
    issue text.
    """

    # The resource the exposure is attributed to: a network security group,
    # one of its rules, or the public IP itself. May be None.
    violating_resource: Optional[
        Union[AzureNetworkSecurityGroup, AzureNetworkSecurityRule, AzurePublicIp]
    ]
    # The public IP address of the exposed IP configuration; a falsy value is
    # reported as "unknown public IP address" by the rule.
    public_ip: Optional[str]
class NotPubliclyAccessibleBaseRule(AzureBaseRule):
    """Base rule flagging network resources reachable from the internet on one port.

    Subclasses provide the rule id (``get_id``) and the resources to inspect
    (``get_resources``). The port number and the protocol label used in the
    issue text are passed to ``__init__``.
    """

    def __init__(self, port: int, port_protocol: str):
        """Store the port to check and the protocol label used in issue messages.

        Args:
            port: The port number that must not be publicly reachable.
            port_protocol: Label inserted into the issue text (e.g. ``'RDP'``).
        """
        self.port: int = port
        self.port_protocol: str = port_protocol

    def execute(self, env_context: AzureEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        """Return one issue for every resource that is publicly reachable on ``self.port``.

        Args:
            env_context: The environment whose resources are inspected.
            parameters: Rule parameters; not used by this rule.

        Returns:
            The issues found, in the order ``get_resources`` yields the resources.
        """
        issues: List[Issue] = []
        for resource in self.get_resources(env_context):
            violating = self._get_violating(resource.network_interfaces)
            if violating is None:
                continue

            culprit = violating.violating_resource
            resource_msg = f'The {resource.get_type()} `{resource.get_friendly_name()}`'
            if isinstance(culprit, (AzureNetworkSecurityGroup, AzurePublicIp)):
                if violating.public_ip:
                    public_ip_msg = f'with public IP address `{violating.public_ip}`'
                else:
                    public_ip_msg = 'with unknown public IP address'
                evidence = (f'{resource_msg} {public_ip_msg} is reachable from the internet '
                            f'via {self.port_protocol} port')
            else:
                # Any other culprit (per ViolationData: a security rule) is
                # reported by its friendly name.
                evidence = (f'{resource_msg} with NAT rule `{culprit.get_friendly_name()}` '
                            f'is reachable from the internet via {self.port_protocol} port')
            issues.append(Issue(evidence, resource, culprit))
        return issues

    def _is_public_on_port(self, connection) -> bool:
        """Tell whether ``connection`` is a public, port-based connection covering ``self.port``."""
        if connection.connection_type != ConnectionType.PUBLIC:
            return False
        connection_property = connection.connection_property
        return isinstance(connection_property, PortConnectionProperty) \
            and self.port in PortSet(connection_property.ports)

    def _get_violating(self, network_interfaces: List[AzureNetworkInterface]) -> Optional[ViolationData]:
        """Find the first IP configuration that is publicly reachable on ``self.port``.

        Args:
            network_interfaces: The network interfaces of the inspected resource.

        Returns:
            A ``ViolationData`` for the first exposed IP configuration, or
            ``None`` when no inbound connection exposes the port.
        """
        for nic in network_interfaces:
            for ip_config in nic.ip_configurations:
                if not any(self._is_public_on_port(conn) for conn in ip_config.inbound_connections):
                    continue

                # NOTE(review): assumes an IP configuration with a PUBLIC inbound
                # connection always has `public_ip` and `subnet` set - confirm.
                public_ip = ip_config.public_ip
                culprit = self._get_violating_nsg_or_rule(nic.network_security_group,
                                                          ip_config.subnet.network_security_group)
                if culprit is None:
                    # No security group is attached, or no rule could be singled
                    # out. Attribute the exposure to the public IP so that
                    # `execute` never has to dereference a missing culprit.
                    culprit = public_ip
                return ViolationData(culprit, public_ip.public_ip_address)
        return None

    def _get_violating_nsg_or_rule(self, nic_nsg: Optional[AzureNetworkSecurityGroup],
                                   subnet_nsg: Optional[AzureNetworkSecurityGroup]
                                   ) -> Optional[Union[AzureNetworkSecurityGroup, AzureNetworkSecurityRule]]:
        """Locate the first inbound allow rule covering ``self.port``.

        The NIC-level security group is searched before the subnet-level one.

        Args:
            nic_nsg: Security group attached to the network interface, if any.
            subnet_nsg: Security group attached to the subnet, if any.

        Returns:
            The matching rule when it is managed by IaC, otherwise the security
            group that owns it; ``None`` when no rule matches.
        """
        for nsg in (nic_nsg, subnet_nsg):
            if not nsg:
                continue
            for nsg_rule in nsg.network_security_rules:
                allows_port_inbound = nsg_rule.direction == ConnectionDirectionType.INBOUND \
                    and nsg_rule.access == NetworkSecurityRuleActionType.ALLOW \
                    and self.port in nsg_rule.destination_port_ranges
                if allows_port_inbound:
                    return nsg_rule if nsg_rule.is_managed_by_iac else nsg
        return None

    @abstractmethod
    def get_id(self) -> str:
        """Return the identifier of the concrete rule."""
        pass

    @staticmethod
    @abstractmethod
    def get_resources(env_context: AzureEnvironmentContext) -> Iterable[NetworkResource]:
        """Return the network resources of ``env_context`` this rule inspects."""
        pass

    def should_run_rule(self, environment_context: AzureEnvironmentContext) -> bool:
        """Run the rule only when the environment has at least one relevant resource."""
        return bool(self.get_resources(environment_context))
class VirtualMachineNotPubliclyAccessibleRdpRule(NotPubliclyAccessibleBaseRule):
    """Checks the environment's virtual machines for public exposure of port 3389 (RDP)."""

    def __init__(self):
        """Configure the base rule for port 3389, labelled 'RDP' in issue messages."""
        super().__init__(port=3389, port_protocol='RDP')

    def get_id(self) -> str:
        """Return the identifier of this rule."""
        return 'car_vm_not_publicly_accessible_rdp'

    @staticmethod
    def get_resources(env_context: AzureEnvironmentContext):
        """Return the virtual machines held by ``env_context``."""
        return env_context.virtual_machines
class VirtualMachineNotPubliclyAccessibleSshRule(NotPubliclyAccessibleBaseRule):
    """Checks the environment's virtual machines for public exposure of port 22 (SSH)."""

    def __init__(self):
        """Configure the base rule for port 22, labelled 'SSH' in issue messages."""
        super().__init__(port=22, port_protocol='SSH')

    def get_id(self) -> str:
        """Return the identifier of this rule."""
        return 'car_vm_not_publicly_accessible_ssh'

    @staticmethod
    def get_resources(env_context: AzureEnvironmentContext):
        """Return the virtual machines held by ``env_context``."""
        return env_context.virtual_machines
AzureVirtualMachine (NetworkResource)
Attributes:
Name | Type | Description |
---|---|---|
name |
str |
The name of this Virtual Machine. |
network_interface_ids |
List[str] |
A list of Network Interface IDs which are associated with the Virtual Machine. |
os_type |
OperatingSystemType |
The VM's operating system. Either Windows or Linux. |
disk_settings |
DiskSettings |
Information about the disks used by this Virtual Machine. |
sku |
str |
The SKU name of this Virtual Machine. |
source_image_reference |
SourceImageReference |
The image used for the Virtual Machine OS. |
disable_password_authentication |
Optional[bool] |
Whether Password Authentication should be disabled on this Virtual Machine (relevant only for Linux OS). |
custom_invalidation(self)
inherited
A list of manual reasons why this resource should be invalidated
exclude_from_invalidation(self)
inherited
A list of attributes that should be excluded from the invalidation process
DataDisk
dataclass
DataDisk(name: Optional[str], is_managed_disk: bool)
DiskSettings
dataclass
DiskSettings(os_disk: cloudrail.knowledge.context.azure.resources.vm.azure_virtual_machine.OsDisk, data_disks: Optional[List[cloudrail.knowledge.context.azure.resources.vm.azure_virtual_machine.DataDisk]])
OperatingSystemType (Enum)
An enumeration.
OsDisk
dataclass
Attributes:
Name | Type | Description |
---|---|---|
name |
Optional[str] |
Name of the disk. |
is_managed_disk |
bool |
An indication if the disk is managed or not. |
caching |
OsDiskCaching |
The Type of Caching which should be used for the Internal OS Disk. |
storage_account_type |
Optional[cloudrail.knowledge.context.azure.resources.vm.azure_virtual_machine.OsDiskStorageAccountType] |
The Type of Storage Account which should back the Internal OS Disk. |
OsDiskCaching (Enum)
An enumeration.
OsDiskStorageAccountType (Enum)
An enumeration.
SourceImageReference
dataclass
Attributes:
Name | Type | Description |
---|---|---|
publisher |
str |
Specifies the publisher of the image used to create the virtual machines. |
offer |
str |
Specifies the offer of the image used to create the virtual machines. |
sku |
str |
Specifies the SKU of the image used to create the virtual machines. |
version |
str |
Specifies the version of the image used to create the virtual machines. |