Source code for kafka_overwatch.config.config
# SPDX-License-Identifier: MPL-2.0
# Copyright 2024 John Mille <john@ews-network.net>
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from kafka_overwatch.specs.config import KafkaOverwatchInputConfiguration
from tempfile import TemporaryDirectory
from cryptography.fernet import Fernet
from dacite import from_dict
from prometheus_client import CollectorRegistry, multiprocess
from kafka_overwatch.monitoring.prometheus import (
set_kafka_cluster_prometheus_registry_collectors,
set_schema_registry_prometheus_registry_collectors,
)
from kafka_overwatch.notifications.aws_sns import SnsChannel
from kafka_overwatch.overwatch_resources.schema_registry import SchemaRegistry
from kafka_overwatch.specs.config import Global
[docs]
class OverwatchConfig:
"""
Class to store in-memory the clusters and their configurations, derived from the input configuration
classes.
"""
def __init__(
self,
config: KafkaOverwatchInputConfiguration,
prometheus_dir: TemporaryDirectory,
):
if not config.global_:
config.global_ = from_dict(Global, {"cluster_scan_interval_in_seconds": 30})
self._config = config
self._prometheus_registry_dir = prometheus_dir
self.prometheus_registry: CollectorRegistry = CollectorRegistry(
auto_describe=True,
)
try:
self.prometheus_collectors = (
set_kafka_cluster_prometheus_registry_collectors(
self.prometheus_registry
)
)
self.prometheus_collectors.update(
set_schema_registry_prometheus_registry_collectors(
self.prometheus_registry
)
)
multiprocess.MultiProcessCollector(
self.prometheus_registry, path=self._prometheus_registry_dir.name
)
except Exception as error:
print("Error with prometheus_registry", error)
self.runtime_key = Fernet.generate_key()
self.sns_channels: dict[str, SnsChannel] = {}
self.schema_registries: dict[str, SchemaRegistry] = {}
self.init_schema_registries()
self.init_notification_channels()
def __reduce__(self):
# Return a tuple with the callable and its arguments
return (self.__class__, (self._config, self._prometheus_registry_dir))
@property
def input_config(self):
return self._config
@property
def prometheus_registry_dir(self) -> TemporaryDirectory:
return self._prometheus_registry_dir
[docs]
def init_schema_registries(self):
"""Initializes the Schema Registries client if setup in the configuration"""
if not self.input_config.schema_registries:
return
for registry_name, registry in self.input_config.schema_registries.items():
_registry = SchemaRegistry(registry_name, registry, self.runtime_key)
self.schema_registries[registry_name] = _registry
[docs]
def init_notification_channels(self):
if not self._config.notification_channels:
return
if not self._config.notification_channels.sns:
return
for (
sns_channel_name,
sns_channel_definition,
) in self._config.notification_channels.sns.items():
sns_channel = SnsChannel(sns_channel_name, sns_channel_definition)
self.sns_channels[sns_channel_name] = sns_channel