Skip to content

keyvault

Sample rules

A few rules that use objects from this package:

car_key_vault_diagnostic_logs_enabled
from abc import abstractmethod
from typing import Iterable, List, Dict, Union

from cloudrail.knowledge.context.azure.azure_environment_context import AzureEnvironmentContext
from cloudrail.knowledge.context.azure.resources.i_monitor_settings import IMonitorSettings
from cloudrail.knowledge.context.mergeable import Mergeable
from cloudrail.knowledge.rules.azure.azure_base_rule import AzureBaseRule
from cloudrail.knowledge.rules.base_rule import Issue
from cloudrail.knowledge.rules.rule_parameters.base_paramerter import ParameterType


class AbstractDiagnosticLogsRule(AzureBaseRule):

    def execute(self, env_context: AzureEnvironmentContext, parameters: Dict[ParameterType, any]) -> List[Issue]:
        issues: List[Issue] = []

        for resource in self.get_resources(env_context):
            evidence_msg = f'{resource.get_type()} `{resource.get_friendly_name()}`'
            if not resource.get_monitor_settings():
                issues.append(Issue(f'The {evidence_msg} does not have diagnostic settings', resource, resource))
            else:
                for monitor_settings in resource.get_monitor_settings():

                    monitor_msg = f'The Monitor Diagnostic Setting {monitor_settings.name}, associated to {evidence_msg},'
                    if not monitor_settings.logs_settings:
                        issues.append(Issue(f'{monitor_msg} does not have log block configuration', resource, monitor_settings))
                    elif not monitor_settings.logs_settings.enabled:
                        issues.append(Issue(f'{monitor_msg} does not have log enabled', resource, monitor_settings))
                    elif not monitor_settings.logs_settings.retention_policy:
                        issues.append(Issue(f'{monitor_msg} does not have a log retention policy', resource, monitor_settings))
                    elif not monitor_settings.logs_settings.retention_policy.enabled:
                        issues.append(Issue(f'{monitor_msg} have a disabled log retention policy', resource, monitor_settings))
                    elif 0 < monitor_settings.logs_settings.retention_policy.days < 365:
                        issues.append(Issue(f'{monitor_msg} does not have a log retention policy days equal to 0 or greater than or equal to 365',
                                            resource, monitor_settings))

        return issues

    @abstractmethod
    def get_id(self) -> str:
        pass

    @staticmethod
    @abstractmethod
    def get_resources(env_context: AzureEnvironmentContext) -> Iterable[Union[Mergeable, IMonitorSettings]]:
        pass

    def should_run_rule(self, environment_context: AzureEnvironmentContext) -> bool:
        return bool(self.get_resources(environment_context))


class KeyVaultDiagnosticLogsEnabledRule(AbstractDiagnosticLogsRule):

    def get_id(self) -> str:
        return 'car_key_vault_diagnostic_logs_enabled'

    @staticmethod
    def get_resources(env_context: AzureEnvironmentContext):
        return env_context.key_vaults


class DataLakeAnalyticsDiagnosticLogsEnabledRule(AbstractDiagnosticLogsRule):

    def get_id(self) -> str:
        return 'car_data_lake_analytics_account_diagnostic_logs_enabled'

    @staticmethod
    def get_resources(env_context: AzureEnvironmentContext) -> Iterable[Union[Mergeable, IMonitorSettings]]:
        return env_context.data_lake_analytics_accounts


class BatchAccountDiagnosticLogsEnabledRule(AbstractDiagnosticLogsRule):

    def get_id(self) -> str:
        return 'car_batch_account_diagnostic_logs_enabled'

    @staticmethod
    def get_resources(env_context: AzureEnvironmentContext) -> Iterable[Union[Mergeable, IMonitorSettings]]:
        return env_context.batch_accounts


class IotHubDiagnosticLogsEnabledRule(AbstractDiagnosticLogsRule):

    def get_id(self) -> str:
        return 'car_iot_hub_diagnostic_logs_enabled'

    @staticmethod
    def get_resources(env_context: AzureEnvironmentContext):
        return env_context.iot_hubs


class DataLakeStoreDiagnosticLogsEnabledRule(AbstractDiagnosticLogsRule):

    def get_id(self) -> str:
        return 'car_data_lake_store_diagnostic_logs_enabled'

    def get_resources(self, env_context: AzureEnvironmentContext) -> Iterable[Union[Mergeable, IMonitorSettings]]:
        return env_context.data_lake_store.values()


class LogicAppWorkflowDiagnosticLogsEnabledRule(AbstractDiagnosticLogsRule):

    def get_id(self) -> str:
        return 'car_logic_app_workflow_diagnostic_logs_enabled'

    def get_resources(self, env_context: AzureEnvironmentContext) -> Iterable[Union[Mergeable, IMonitorSettings]]:
        return env_context.logic_app_workflows


class SearchServiceDiagnosticLogsEnabledRule(AbstractDiagnosticLogsRule):

    def get_id(self) -> str:
        return 'car_search_service_diagnostic_logs_enabled'

    def get_resources(self, env_context: AzureEnvironmentContext) -> Iterable[Union[Mergeable, IMonitorSettings]]:
        return env_context.search_services


class ServiceBusNamespaceDiagnosticLogsEnabledRule(AbstractDiagnosticLogsRule):

    def get_id(self) -> str:
        return 'car_service_bus_namespace_diagnostic_logs_enabled'

    def get_resources(self, env_context: AzureEnvironmentContext) -> Iterable[Union[Mergeable, IMonitorSettings]]:
        return env_context.service_bus_namespaces


class StreamAnalyitcsJobDiagnosticLogsEnabledRule(AbstractDiagnosticLogsRule):

    def get_id(self) -> str:
        return 'car_stream_analytics_job_diagnostic_logs_enabled'

    def get_resources(self, env_context: AzureEnvironmentContext) -> Iterable[Union[Mergeable, IMonitorSettings]]:
        return env_context.stream_analytics_jobs


class EventHubNamespaceDiagnosticLogsEnabledRule(AbstractDiagnosticLogsRule):

    def get_id(self) -> str:
        return 'car_event_hub_namespace_diagnostic_logs_enabled'

    def get_resources(self, env_context: AzureEnvironmentContext) -> Iterable[Union[Mergeable, IMonitorSettings]]:
        return env_context.event_hub_namespaces

AzureKeyVault (AzureResource, IMonitorSettings)

Attributes:

Name Type Description
name str

The KeyVault name

monitor_diagnostic_settings List[AzureMonitorDiagnosticSetting]

The monitoring settings of this KeyVault

purge_protection_enabled bool

Indication if Purge Protection is enabled for this KeyVault

vault_uri

The URI of the Key Vault, used for performing operations on keys and secrets.

custom_invalidation(self) inherited

A list of manual reasons why this resource should be invalidated

exclude_from_invalidation(self) inherited

A list of attributes that should be excluded from the invalidation process