Source code for kafka_overwatch.overwatch

#  SPDX-License-Identifier: MPL-2.0
#  Copyright 2024 John Mille <john@ews-network.net>

from __future__ import annotations

import time
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from kafka_overwatch.config.config import OverwatchConfig

import concurrent.futures
import signal
from multiprocessing import Event, Manager

from kafka_overwatch.config.logging import KAFKA_LOG
from kafka_overwatch.overwatch_resources.clusters import KafkaCluster
from kafka_overwatch.processing.clusters import process_cluster
from kafka_overwatch.processing.schema_registries import process_schema_registry

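# Module-level event set by the signal handler; the parent process polls it in
# multi_clusters_processing to know when to stop waiting on the child futures.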
STOP_FLAG = Event()


def handle_signals(signum, frame, stop_flag):
    print("Cluster processing received signal to stop", signum, frame)
    stop_flag["stop"] = True
    STOP_FLAG.set()
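
# Illustrative sketch (not part of the service): the stop signal travels over
# two channels. ``STOP_FLAG`` is checked by the parent process loop in
# ``multi_clusters_processing`` below, while the Manager-backed ``stop_flag``
# dict is passed to the child processes. A child worker (hypothetical name and
# body) would poll it roughly like this:
#
#     def example_worker_loop(stop_flag) -> None:
#         while not stop_flag.get("stop"):
#             time.sleep(1)  # stand-in for the real processing work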

class KafkaOverwatchService:
    """
    Main service which starts the prometheus server and the processes for each
    Kafka cluster to be processed independently.

    Upon receiving SIGTERM or SIGINT, it will send a SIGTERM to all child
    processes.

    The prometheus dependency is imported at the last minute to make sure it
    will take the multiprocess folder env var into account.
    """

    def __init__(self, config: OverwatchConfig) -> None:
        self._config = config
        self.kafka_clusters: dict[str, KafkaCluster] = {}

    @property
    def config(self) -> OverwatchConfig:
        return self._config

    def init_prometheus(self):
        """
        Import prometheus_client.start_http_server at the latest point to make
        sure the multiprocess folder env var is taken into account.
        """
        from prometheus_client import start_http_server

        return start_http_server(8000, registry=self.config.prometheus_registry)
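
    # Illustrative note (not in the original source): prometheus_client picks
    # its metric value storage backend when the library is first imported,
    # based on the PROMETHEUS_MULTIPROC_DIR environment variable, which is why
    # the import above is deferred. A minimal sketch of a multiprocess-aware
    # registry, assuming a hypothetical ``metrics_dir`` path set before any
    # prometheus import:
    #
    #     import os
    #     os.environ["PROMETHEUS_MULTIPROC_DIR"] = metrics_dir
    #     from prometheus_client import CollectorRegistry, multiprocess
    #
    #     registry = CollectorRegistry()
    #     multiprocess.MultiProcessCollector(registry)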

    def start(self):
        KAFKA_LOG.info("Starting Kafka Overwatch")
        httpd, _ = self.init_prometheus()
        clusters_jobs = []
        manager = Manager()
        stop_flag = manager.dict()
        stop_flag["stop"] = False
        signal.signal(
            signal.SIGTERM,
            lambda signum, frame: handle_signals(signum, frame, stop_flag),
        )
        signal.signal(
            signal.SIGINT,
            lambda signum, frame: handle_signals(signum, frame, stop_flag),
        )
        self.kafka_clusters.update(
            {
                name: KafkaCluster(name, config, self.config)
                for name, config in self.config.input_config.clusters.items()
            }
        )
        sr_jobs: list = [
            [_sr, self.config.runtime_key, self.config, stop_flag]
            for _sr in self.config.schema_registries.values()
        ]
        for cluster_name, cluster in self.kafka_clusters.items():
            clusters_jobs.append([cluster, self.config, stop_flag])
        self.multi_clusters_processing(clusters_jobs, sr_jobs, httpd)

    @staticmethod
    def multi_clusters_processing(clusters_jobs: list, sr_jobs: list, httpd):
        with concurrent.futures.ProcessPoolExecutor(
            max_workers=(len(clusters_jobs) + len(sr_jobs))
        ) as executor:
            futures_to_data: dict[concurrent.futures.Future, list] = {}
            if sr_jobs:
                futures_to_data.update(
                    {
                        executor.submit(process_schema_registry, *sr_job): sr_job
                        for sr_job in sr_jobs
                    }
                )
            futures_to_data.update(
                {
                    executor.submit(process_cluster, *cluster_job): cluster_job
                    for cluster_job in clusters_jobs
                }
            )
            try:
                while not STOP_FLAG.is_set():
                    concurrent.futures.wait(futures_to_data, timeout=10)
            except KeyboardInterrupt:
                executor.shutdown(wait=True, cancel_futures=True)
            finally:
                executor.shutdown(wait=True, cancel_futures=True)
                print("Executor has been shut down")
        httpd.shutdown()
        return
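
# Illustrative usage sketch (the OverwatchConfig construction shown here is an
# assumption; this module only consumes an already-built config object):
#
#     config = OverwatchConfig(...)
#     service = KafkaOverwatchService(config)
#     service.start()  # blocks until SIGTERM/SIGINT, then stops the children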