Source code for kafka_overwatch.overwatch_resources.groups
# SPDX-License-Identifier: MPL-2.0
# Copyright 2024 John Mille <john@ews-network.net>
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from kafka_overwatch.overwatch_resources.topics import Topic
from datetime import datetime as dt
from confluent_kafka import ConsumerGroupState, TopicPartition
from confluent_kafka.admin import MemberDescription
from kafka_overwatch.config.logging import KAFKA_LOG
[docs]
class ConsumerGroup:
def __init__(self, group_id, init_members: list, init_state: ConsumerGroupState):
self._group_id = group_id
self.topic_offsets: dict[Topic, [TopicPartition]] = {}
self._init_members = init_members
self._init_state: ConsumerGroupState = init_state
self._state = init_state
self._members: list = []
self._init_time = dt.utcnow()
self._partitions_offsets = None
self._lag: dict[str, dict] = {}
def __repr__(self):
return self._group_id
def __hash__(self):
return id(self)
@property
def group_id(self):
return self._group_id
@property
def partitions_offsets(self):
return self._partitions_offsets
@partitions_offsets.setter
def partitions_offsets(self, value):
self._partitions_offsets = value
@property
def state(self) -> ConsumerGroupState:
return self._state
@state.setter
def state(self, value: ConsumerGroupState):
if not isinstance(value, ConsumerGroupState):
raise TypeError("CG state must be ", ConsumerGroupState, "got", type(value))
self._state = value
@property
def init_members(self) -> tuple[list[MemberDescription], dt]:
return self._init_members, self._init_time
@property
def members(self) -> list[MemberDescription]:
return self._members
@members.setter
def members(self, value: list[MemberDescription]):
if not all([isinstance(_member, MemberDescription) for _member in value]):
raise TypeError(
"One of the members is not valid. Expected",
MemberDescription,
"Got",
[type(_member) for _member in value],
)
self._members = value
@property
def is_active(self) -> bool:
if (
self.state not in [ConsumerGroupState.DEAD, ConsumerGroupState.EMPTY]
and self.members
):
return True
return False
@property
def pd_frame_data(self) -> dict:
elapsed_time = dt.utcnow() - self._init_time
return {
"name": self.group_id,
"members": len(self.members),
"state": self.state,
"eval_elapsed_time": elapsed_time.total_seconds(),
"overall_lag": sum([_topic["total"] for _topic in self._lag.values()]),
}
[docs]
def fetch_set_lag(self, topic_name: str = None) -> dict[str, dict]:
"""
Returns the lag for a topic and its partitions
If topic_name is set, returns the lag for that topic alone.
"""
lag: dict[str, dict] = {}
for overwatch_topic, cg_topic_partitions in self.topic_offsets.items():
partitions_lag: list = []
total_lag: int = 0
for partition in cg_topic_partitions:
_overwatch_topic_partition = overwatch_topic.partitions[
partition.partition
]
if _overwatch_topic_partition.total_messages_count == 0:
KAFKA_LOG.debug(
"{} - {}: {}.{} No messages on partition. Skipping for consumer lag.".format(
overwatch_topic.cluster.name,
self.group_id,
overwatch_topic.name,
_overwatch_topic_partition.partition_id,
)
)
continue
if partition.offset < 0:
KAFKA_LOG.debug(
"{} - {} - No committed offset found for topic:partition {}:{}".format(
overwatch_topic.cluster.name,
self.group_id,
overwatch_topic.name,
_overwatch_topic_partition.partition_id,
)
)
break
_partition_lag: int = (
_overwatch_topic_partition.end_offset[0] - partition.offset
)
total_lag += _partition_lag
partitions_lag.append((partition.partition, _partition_lag))
if total_lag and partitions_lag:
lag[overwatch_topic.name] = {
"total": total_lag,
"partitions": partitions_lag,
}
self._lag = lag
if topic_name and topic_name in lag:
return lag[topic_name]
return lag