# Source code for kafka_overwatch.kafka_resources.groups

# SPDX-License-Identifier: MPL-2.0
# Copyright 2024 John Mille <john@ews-network.net>

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from kafka_overwatch.overwatch_resources.clusters import KafkaCluster
    from confluent_kafka.admin import ConsumerGroupDescription

import concurrent.futures

from confluent_kafka import ConsumerGroupTopicPartitions
from confluent_kafka.error import KafkaError, KafkaException

from kafka_overwatch.common import waiting_on_futures
from kafka_overwatch.config.logging import KAFKA_LOG
from kafka_overwatch.kafka_resources import wait_for_result
from kafka_overwatch.overwatch_resources.groups import ConsumerGroup


[docs] def get_consumer_groups_desc( kafka_cluster: KafkaCluster, groups_list: list, groups_desc: dict[str, ConsumerGroupDescription] | None = None, ) -> dict[str, ConsumerGroupDescription]: if groups_desc is None: groups_desc: dict[str, ConsumerGroupDescription] = {} groups_to_describe: list[str] = [_group for _group in groups_list] groups_desc_r: dict[str, concurrent.futures.Future] = ( kafka_cluster.get_admin_client().describe_consumer_groups(groups_to_describe) ) to_retry: list[str] = [] for _group_name, _future in groups_desc_r.items(): try: groups_desc[_group_name] = _future.result() except (KafkaError, KafkaException) as error: KAFKA_LOG.error( "%s - %s - Error getting consumer group description" % (kafka_cluster.name, _group_name) ) print(error) to_retry.append(_group_name) if to_retry: print(f"GOT TO RETRY {len(to_retry)} CGs") get_consumer_groups_desc(kafka_cluster, to_retry, groups_desc) return groups_desc
[docs] def tidy_consumer_groups( kafka_cluster: KafkaCluster, query_groups_list: list[str] ) -> None: existing_groups_list: list[str] = list(kafka_cluster.groups.keys()) if existing_groups_list and query_groups_list: for group in existing_groups_list: if group not in query_groups_list: try: for topic in kafka_cluster.topics.values(): if topic.consumer_groups and group in topic.consumer_groups: del topic.consumer_groups[group] print( f"Consumer group {group} no longer on cluster {kafka_cluster.name} metadata" ) del kafka_cluster.groups[group] except KeyError: pass
[docs] def set_update_filter_cluster_consumer_groups(kafka_cluster: KafkaCluster) -> None: """ Lists all the consumer groups from the Kafka cluster Filters out of the Kafka cluster groups attribute all the consumer groups that are not in the list Retrieves the consumer group description/details from the cluster """ groups_list_future = kafka_cluster.get_admin_client().list_consumer_groups() while not groups_list_future.done(): if groups_list_future.exception(): print(f"{kafka_cluster.name} - Failed to get consumer groups list") return query_groups_list = [ _group.group_id for _group in groups_list_future.result().valid ] if not query_groups_list: KAFKA_LOG.warning(f"{kafka_cluster.name}: No consumer groups to describe.") return tidy_consumer_groups(kafka_cluster, query_groups_list) groups_desc = get_consumer_groups_desc(kafka_cluster, query_groups_list) for group_desc in groups_desc.values(): if group_desc.group_id not in kafka_cluster.groups: consumer_group = ConsumerGroup( group_desc.group_id, group_desc.members, group_desc.state, ) kafka_cluster.groups[group_desc.group_id] = consumer_group else: consumer_group: ConsumerGroup = kafka_cluster.groups[group_desc.group_id] consumer_group.members = group_desc.members consumer_group.state = group_desc.state
def set_update_cluster_consumer_groups(
    kafka_cluster: KafkaCluster, stop_flag: dict
) -> None:
    """
    Lists all Consumer Groups
    Checks if all the listed CGs were present in the existing cluster CGs. If not, remove.
    Describe all Consumer Groups
    List all Consumer Group Offsets (serial, no support for list of CGs)
    If CG not in the cluster groups, add to it

    :param kafka_cluster: Cluster whose consumer groups are refreshed.
    :param stop_flag: Shared flag consulted by waiting_on_futures to abort early.
    """
    set_update_filter_cluster_consumer_groups(kafka_cluster)
    groups_jobs: dict = {
        _consumer_group_name: [_consumer_group, kafka_cluster]
        for _consumer_group_name, _consumer_group in kafka_cluster.groups.items()
    }
    # list_consumer_group_offsets only accepts one group per call, so the
    # per-group requests are fanned out over a pool sized to the broker count.
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=kafka_cluster.cluster_brokers_count
    ) as executor:
        futures_to_data: dict[concurrent.futures.Future, list] = {
            executor.submit(
                describe_update_consumer_group_offsets, *job_params
            ): job_params
            for job_params in groups_jobs.values()
        }
        _pending = len(futures_to_data)
        KAFKA_LOG.info(f"Kafka cluster: {kafka_cluster.name} | CGs to scan: {_pending}")
        waiting_on_futures(
            executor,
            futures_to_data,
            "Kafka Cluster",
            kafka_cluster.name,
            "Consumer groups",
            stop_flag,
        )
[docs] def describe_update_consumer_group_offsets( consumer_group: ConsumerGroup, kafka_cluster: KafkaCluster ) -> None: """ Retrieve a given Consumer group details, given that the list_consumer_group_offsets cannot take multiple groups as argument. Updates the kafka_cluster.groups. """ if not consumer_group or not kafka_cluster: return try: consumer_group.partitions_offsets = wait_for_result( kafka_cluster.get_admin_client().list_consumer_group_offsets( [ConsumerGroupTopicPartitions(consumer_group.group_id)], require_stable=True, ) ) except Exception as error: if error.args[0] == KafkaError.REQUEST_TIMED_OUT: print("CG DESCRIBE TIMEOUT", error) else: print("CG DESCRIBE ERROR", error)
def retry_kafka_describe_update_consumer_group_offsets(
    future, futures_to_data, executor
):
    """
    Re-submits a failed consumer-group offsets describe task.

    :param future: The failed future whose job must be retried.
    :param futures_to_data: Maps futures to their [consumer_group, kafka_cluster] params.
    :param executor: Executor the retry is submitted to.
    :return: The job parameters associated with the failed future.
    """
    # get the associated data for the task
    data = futures_to_data[future]
    # submit the task again, unpacking the stored [consumer_group, kafka_cluster]
    # arguments — passing the list as a single positional argument would raise
    # TypeError inside the worker (the target takes two parameters), matching
    # how the initial submit in set_update_cluster_consumer_groups unpacks.
    _retry = executor.submit(describe_update_consumer_group_offsets, *data)
    # store so we can track the retries
    futures_to_data[_retry] = data
    return data
[docs] def update_set_consumer_group_topics_partitions_offsets( kafka_cluster: KafkaCluster, consumer_group: ConsumerGroup, ) -> None: """ Groups topic partitions offsets per topic Assigns partitions offsets to the consumer group Maps consumer group to topic """ for _group, _future in consumer_group.partitions_offsets.items(): offsets_result = _future.result() __topic_partitions_new_offsets: dict = {} for _topic_partition in offsets_result.topic_partitions: if _topic_partition.topic in kafka_cluster.topics: _topic = kafka_cluster.topics[_topic_partition.topic] else: print("Not monitored topic") continue if _topic not in __topic_partitions_new_offsets: __topic_partitions_new_offsets[_topic] = [_topic_partition] else: __topic_partitions_new_offsets[_topic].append(_topic_partition) for _topic, _topic_partitions in __topic_partitions_new_offsets.items(): consumer_group.topic_offsets[_topic] = _topic_partitions if consumer_group not in _topic.consumer_groups: _topic.consumer_groups[consumer_group.group_id] = consumer_group