# Source code for kafka_overwatch.overwatch_resources.topics

# SPDX-License-Identifier: MPL-2.0
# Copyright 2024 John Mille <john@ews-network.net>

from __future__ import annotations

from datetime import datetime as dt
from datetime import timedelta as td
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from kafka_overwatch.overwatch_resources.clusters import KafkaCluster
    from kafka_overwatch.overwatch_resources.groups import ConsumerGroup

from confluent_kafka.admin import ConfigEntry


class Partition:
    """
    Tracks offset movement for a single partition of a topic.

    Stores the start/end offsets observed when kafka-overwatch started and the
    most recent end offset, so long-term message-throughput evaluations can be
    computed per partition.
    """

    def __init__(
        self,
        topic: Topic,
        partition_id: int,
        init_start_offset: int,
        init_end_offset: int,
        init_time: dt,
    ):
        self._topic = topic
        self._id = partition_id
        # (offset, timestamp) pairs captured at startup; used as the baseline
        # for all subsequent diff computations.
        self._init_start_offset: tuple[int, dt] = (init_start_offset, init_time)
        self._init_end_offset: tuple[int, dt] = (init_end_offset, init_time)
        self._end_offset = (init_end_offset, init_time)
        new_messages_collector = self._topic.cluster.prometheus_collectors[
            "topic_partition_new_messages"
        ]
        # Pre-bind the labelled child collector once so the end_offset setter
        # can observe without re-resolving labels each time.
        self._topic_partition_new_messages_metrics_collector = (
            new_messages_collector.labels(
                cluster=self._topic.cluster.name,
                topic_name=self._topic.name,
                partition_id=self.partition_id,
            )
        )
        # Set once the partition's first offset is seen to move (retention).
        self._first_offset = None

    def __repr__(self):
        return f"{self._topic.name}:{self.partition_id}"

    def cleanup(self):
        """Remove this partition's labelled series from the prometheus collector."""
        collector = self._topic.cluster.prometheus_collectors[
            "topic_partition_new_messages"
        ]
        series_labels = (
            self._topic.cluster.name,
            self._topic.name,
            str(self.partition_id),
        )
        # NOTE(review): reaches into prometheus_client's private ``_metrics``
        # dict; ``collector.remove(*labels)`` is the public equivalent —
        # confirm against the pinned prometheus_client version before changing.
        if series_labels in collector._metrics:
            del collector._metrics[series_labels]

    @property
    def partition_id(self) -> int:
        return self._id

    @property
    def init_start_offset(self) -> tuple[int, dt]:
        """Offsets as kafka-overwatch started; baseline for long-term evaluations."""
        return self._init_start_offset

    @property
    def first_offset(self) -> tuple[int, dt]:
        """Current first offset; falls back to the startup value until it moves."""
        return self._first_offset if self._first_offset else self.init_start_offset

    @first_offset.setter
    def first_offset(self, value: tuple[int, dt]):
        """Record the new first offset when the partition's head moves."""
        self._first_offset = value

    @property
    def end_offset(self) -> tuple[int, dt]:
        return self._end_offset

    @end_offset.setter
    def end_offset(self, value):
        # Observe the delta against the previous end offset before storing.
        latest_offset, _ = value
        self._topic_partition_new_messages_metrics_collector.observe(
            latest_offset - self.end_offset[0]
        )
        self._end_offset = value

    @property
    def total_messages_count(self) -> int:
        """Messages currently retained: end offset minus first offset."""
        return int(self.end_offset[0] - self.first_offset[0])

    def get_end_offset_diff(self) -> tuple:
        """Return (offset delta, elapsed time) between startup and the latest measure."""
        baseline_offset, baseline_time = self._init_end_offset
        latest_offset, latest_time = self._end_offset
        return latest_offset - baseline_offset, latest_time - baseline_time

    def has_new_messages(self) -> bool:
        """True when the end offset has advanced since startup."""
        return self.get_end_offset_diff()[0] > 0
class Topic:
    """
    Represents a Kafka topic of a cluster: its partitions, the consumer groups
    subscribed to it, and its configuration (``ConfigEntry`` mapping).
    """

    def __init__(self, name, cluster, properties: ConfigEntry = None):
        self._name = name
        self._cluster: KafkaCluster = cluster
        self.partitions: dict[int, Partition] = {}
        self.consumer_groups: dict[str, ConsumerGroup] = {}
        self._properties: ConfigEntry = properties

    def __repr__(self):
        return self.name

    def __hash__(self):
        # Identity hash: two Topic objects are distinct even with equal names.
        return id(self)

    @property
    def name(self) -> str:
        return self._name

    @property
    def cluster(self):
        return self._cluster

    @property
    def config(self) -> dict[str, ConfigEntry]:
        """Config name -> ConfigEntry mapping, or None until a dict is set."""
        return self._properties if isinstance(self._properties, dict) else None

    @config.setter
    def config(self, value: dict[str, ConfigEntry]):
        if not isinstance(value, dict):
            raise TypeError(f"Expected dict, got {type(value)}")
        self._properties = value

    @property
    def pd_frame_data(self) -> dict:
        """One row of per-topic stats for a pandas DataFrame."""
        new_messages, elapsed_time = self.new_messages_count()
        return {
            "name": self.name,
            "partitions": len(self.partitions),
            "total_messages": sum(
                _p.total_messages_count for _p in self.partitions.values()
            ),
            "new_messages": new_messages,
            "eval_elapsed_time": elapsed_time,
            "consumer_groups": len(self.consumer_groups),
            "active_groups": len(
                [_cg for _cg in self.consumer_groups.values() if _cg.is_active]
            ),
        }

    def generate_kafka_create_topic_command(self):
        """Build a kafka-topics.sh command recreating this topic (non-default configs only)."""
        if not self.config:
            return f"kafka-topics.sh --create --topic {self.name} --partitions {len(self.partitions)}"
        configs = " \\\n".join(
            [
                f"--config {value}"
                for value in self.config.values()
                if not value.is_default
            ]
        )
        # NOTE(review): the `${CLIENT_CONFIG_PATH+:-...}` expansion looks like
        # it was meant to be `${CLIENT_CONFIG_PATH:+...}` — verify the intended
        # shell behavior before changing the emitted string.
        command_config = (
            "--bootstrap-server ${BOOTSTRAP_SERVER} "
            "${CLIENT_CONFIG_PATH+:--command-config CLIENT_CONFIG_PATH}"
        )
        return (
            f"kafka-topics.sh --create --topic {self.name} --partitions {len(self.partitions)} \\\n"
            f"{configs} \\\n{command_config}"
        )

    def has_active_groups(self) -> bool:
        """Returns whether any of the consumer groups are active."""
        return any(_group.is_active for _group in self.consumer_groups.values())

    def has_new_messages(self) -> bool:
        """True when any partition received new messages since startup."""
        return any(
            _partition.has_new_messages() for _partition in self.partitions.values()
        )

    def new_messages_count(self) -> tuple[int, int]:
        """
        Total new messages across all partitions since startup.

        Returns:
            (total new messages, elapsed evaluation window in whole seconds);
            (0, 0) when the topic has no partitions.
        """
        if not self.partitions:
            return 0, 0
        # Fix: don't assume a partition with id 0 exists in the dict — any
        # partition's timestamps define the evaluation window.
        sample = next(iter(self.partitions.values()))
        elapsed_time = sample.end_offset[1] - sample.init_start_offset[1]
        total_messages = sum(
            partition.get_end_offset_diff()[0]
            for partition in self.partitions.values()
        )
        return total_messages, int(elapsed_time.total_seconds())