Source code for kafka_overwatch.processing.clusters

# SPDX-License-Identifier: MPL-2.0
# Copyright 2024 John Mille <john@ews-network.net>

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from pandas import DataFrame
    from prometheus_client import Gauge
    from kafka_overwatch.config.config import OverwatchConfig

from datetime import datetime as dt
from datetime import timedelta as td

from compose_x_common.compose_x_common import keyisset

from kafka_overwatch.config.logging import KAFKA_LOG
from kafka_overwatch.kafka_resources.groups import (
    set_update_cluster_consumer_groups,
    update_set_consumer_group_topics_partitions_offsets,
)
from kafka_overwatch.kafka_resources.topics import describe_update_all_topics
from kafka_overwatch.overwatch_resources.clusters import (
    KafkaCluster,
    generate_cluster_consumer_groups_pd_dataframe,
    generate_cluster_topics_pd_dataframe,
)

from . import wait_between_intervals


def measure_consumer_group_lags(
    kafka_cluster: KafkaCluster, consumer_group_lag_gauge: Gauge
):
    """
    Export the lag of every consumer group of the cluster to Prometheus.

    Each group found in ``kafka_cluster.groups`` has its lag refreshed through
    ``fetch_set_lag()``. For every topic of the returned mapping, the ``total``
    value is set on the gauge child labelled with the cluster name, the group id
    and the topic. Nothing is exported when the cluster holds no group.

    :param kafka_cluster: Cluster whose consumer groups are evaluated.
    :param consumer_group_lag_gauge: Gauge receiving one value per (cluster, group, topic).
    """
    for group in kafka_cluster.groups.values():
        lag_by_topic = group.fetch_set_lag()
        for topic_name, lag_details in lag_by_topic.items():
            labelled_gauge = consumer_group_lag_gauge.labels(
                kafka_cluster.name, group.group_id, topic_name
            )
            labelled_gauge.set(lag_details["total"])
def generate_cluster_report(
    kafka_cluster: KafkaCluster, topics_df: DataFrame, groups_df: DataFrame
) -> None:
    """
    Render the cluster report when its scheduled time has passed, then schedule the next one.

    Nothing happens when the cluster has no reporting configuration, when no next
    reporting time is set, or when the current (naive UTC) time is not later than
    that next reporting time.

    :param kafka_cluster: Cluster to report on; its ``next_reporting`` is updated after rendering.
    :param topics_df: Topics data handed over to ``render_report``.
    :param groups_df: Consumer groups data handed over to ``render_report``.
    """
    if not kafka_cluster.config.reporting_config:
        return
    if not kafka_cluster.next_reporting:
        return
    if not dt.utcnow() > kafka_cluster.next_reporting:
        return

    kafka_cluster.render_report(topics_df, groups_df)
    # The next deadline is counted from the end of the rendering, not from the previous deadline.
    rendered_at = dt.utcnow()
    period_in_seconds = (
        kafka_cluster.config.reporting_config.evaluation_period_in_seconds
    )
    kafka_cluster.next_reporting = rendered_at + td(seconds=period_in_seconds)
def process_cluster(
    kafka_cluster: KafkaCluster, overwatch_config: OverwatchConfig, stop_flag
):
    """
    Run the monitoring/evaluation loop of a Kafka cluster until a stop is requested.

    Before looping, the cluster Prometheus reporting, the reporting exporters and
    the cluster connections are initialized. Each iteration then:

    * checks/replaces the Kafka clients and refreshes the cluster properties,
    * refreshes consumer groups and topics (skipped, and the loop left, if a stop
      was requested in the meantime),
    * builds the topics and consumer groups DataFrames and updates the cluster
      topics, partitions and consumer groups count metrics,
    * renders the restore files when the topics backup is configured and enabled,
    * exports the consumer group lags and generates the report if it is due,
    * waits for what is left of the scan interval.

    Any exception raised by an iteration is logged and the cluster connections are
    set again before the next iteration. If that reconnection fails too, the
    failure is logged and the function returns.

    :param kafka_cluster: Cluster to monitor.
    :param overwatch_config: Configuration holding the ``consumer_group_lag``
        Prometheus collector.
    :param stop_flag: Mapping whose ``stop`` key ends the loop once it is no
        longer ``False``.
    """
    kafka_cluster.init_cluster_prometheus_reporting(overwatch_config)
    kafka_cluster.set_reporting_exporters()
    kafka_cluster.set_cluster_connections()
    consumer_group_lag_gauge = overwatch_config.prometheus_collectors[
        "consumer_group_lag"
    ]
    while stop_flag["stop"] is False:
        # Loop tracing is debug-level logging, not stdout output.
        KAFKA_LOG.debug(f"{kafka_cluster.name} - Cluster loop {stop_flag}")
        try:
            kafka_cluster.check_replace_kafka_clients()
            kafka_cluster.set_cluster_properties()
            processing_start = dt.utcnow()

            # A stop may have been requested while the clients were being refreshed.
            if keyisset("stop", stop_flag):
                break
            process_cluster_resources(kafka_cluster, stop_flag)

            topics_df = generate_cluster_topics_pd_dataframe(kafka_cluster)
            groups_df = generate_cluster_consumer_groups_pd_dataframe(kafka_cluster)
            kafka_cluster.cluster_topics_count.set(
                len(topics_df["name"].values.tolist())
            )
            kafka_cluster.cluster_partitions_count.set(
                sum(topics_df["partitions"].values.tolist())
            )
            kafka_cluster.cluster_consumer_groups_count.set(len(kafka_cluster.groups))

            backup_config = kafka_cluster.config.topics_backup_config
            if backup_config and backup_config.enabled:
                kafka_cluster.render_restore_files()

            # Measured before the lag export and report generation below.
            elapsed_time = int((dt.utcnow() - processing_start).total_seconds())
            KAFKA_LOG.info(f"{kafka_cluster.name} - {elapsed_time}s processing time.")
            # Stats go through the logger, like every other message of the loop,
            # so they end up in the same stream as their header.
            KAFKA_LOG.info(
                f"{kafka_cluster.name} - Cluster topics stats\n{topics_df.describe()}"
            )
            KAFKA_LOG.info(
                f"{kafka_cluster.name} - Cluster consumer groups stats\n{groups_df.describe()}"
            )

            measure_consumer_group_lags(kafka_cluster, consumer_group_lag_gauge)
            generate_cluster_report(kafka_cluster, topics_df, groups_df)

            # May be zero or negative when the scan takes longer than the interval;
            # the message is handed over to wait_between_intervals for that situation.
            time_to_wait = int(
                kafka_cluster.config.cluster_scan_interval_in_seconds - elapsed_time
            )
            wait_between_intervals(
                stop_flag,
                time_to_wait,
                (
                    f"{kafka_cluster.name} - interval set to {kafka_cluster.config.cluster_scan_interval_in_seconds}"
                    f", however it takes {elapsed_time}s to complete the scan. \n"
                    "Consider changing scan interval"
                ),
            )
        except Exception as error:
            KAFKA_LOG.exception(error)
            KAFKA_LOG.error(f"{kafka_cluster.name} - {error}")
            try:
                kafka_cluster.set_cluster_connections()
            except Exception as reconnect_error:
                # Without connections the cluster cannot be processed any further.
                KAFKA_LOG.exception(reconnect_error)
                KAFKA_LOG.error(f"{kafka_cluster.name} - {reconnect_error}")
                return
    return
def process_cluster_resources(kafka_cluster: KafkaCluster, stop_flag):
    """
    Refresh the consumer groups, the topics, then the consumer groups offsets.

    The stop flag is read again before each of the three steps, and a step only
    runs while ``stop_flag["stop"] is False``, so a stop request received during
    one step prevents the following ones from starting.

    :param kafka_cluster: Cluster whose resources are refreshed.
    :param stop_flag: Mapping whose ``stop`` key must be ``False`` for a step to run.
    """

    def refresh_consumer_groups() -> None:
        # Runs inside the cluster's groups describe latency timer context.
        with kafka_cluster.groups_describe_latency.time():
            set_update_cluster_consumer_groups(kafka_cluster, stop_flag)

    def refresh_topics() -> None:
        # Runs inside the cluster's topics describe latency timer context.
        with kafka_cluster.topics_describe_latency.time():
            describe_update_all_topics(kafka_cluster, stop_flag)

    def refresh_groups_offsets() -> None:
        for group in kafka_cluster.groups.values():
            update_set_consumer_group_topics_partitions_offsets(kafka_cluster, group)

    for step in (refresh_consumer_groups, refresh_topics, refresh_groups_offsets):
        if stop_flag["stop"] is False:
            step()