Source code for kafka_overwatch.kafka_resources.topics

# SPDX-License-Identifier: MPL-2.0
# Copyright 2024 John Mille <john@ews-network.net>

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from confluent_kafka.admin import TopicDescription
    from kafka_overwatch.overwatch_resources.clusters import KafkaCluster

import concurrent.futures
import re
from datetime import datetime as dt

from compose_x_common.compose_x_common import keyisset
from confluent_kafka import TopicCollection, TopicPartition
from confluent_kafka.admin import AclOperation, ConfigResource, ResourceType
from confluent_kafka.error import KafkaException
from retry.api import retry, retry_call

from kafka_overwatch.common import waiting_on_futures
from kafka_overwatch.config.logging import KAFKA_LOG
from kafka_overwatch.kafka_resources import wait_for_result
from kafka_overwatch.overwatch_resources.topics import Partition, Topic


[docs] @retry((KafkaException,), tries=5) def get_topic_descriptions( topic_names: list[str], admin_client ) -> dict[str, TopicDescription]: desc_topics = { topic_name: topic_desc.result() for topic_name, topic_desc in wait_for_result( admin_client.describe_topics( TopicCollection([topic_name for topic_name in topic_names]), include_authorized_operations=True, ) ).items() } return desc_topics
[docs] @retry((KafkaException,), tries=5) def get_topics_list(kafka_cluster: KafkaCluster) -> list[str]: try: topic_names: list[str] = list( kafka_cluster.get_admin_client().list_topics().topics.keys() ) return topic_names except Exception as error: KAFKA_LOG.error(f"{kafka_cluster.name} - Failed to list topics: {error}") raise
[docs] def describe_update_all_topics(kafka_cluster: KafkaCluster, stop_flag: dict) -> None: """ Lists all topics """ topic_names = get_topics_list(kafka_cluster) desc_topics = retry_call( get_topic_descriptions, fargs=[topic_names, kafka_cluster.get_admin_client()] ) for _cluster_topic_name in list(kafka_cluster.topics.keys()): if _cluster_topic_name not in topic_names: KAFKA_LOG.warning( f"Topic {_cluster_topic_name} no longer in cluster {kafka_cluster.name} metadata. Clearing" ) _to_clear = kafka_cluster.topics[_cluster_topic_name] for _partition in _to_clear.partitions.values(): _partition.cleanup() del kafka_cluster.topics[_cluster_topic_name] if desc_topics and not keyisset("stop", stop_flag): describe_update_topics(kafka_cluster, desc_topics, stop_flag) else: KAFKA_LOG.info(f"No topics matched for {kafka_cluster.name}")
[docs] def update_set_topic_config(kafka_cluster, topics_configs_resources) -> None: if not topics_configs_resources: return try: topics_config = wait_for_result( kafka_cluster.get_admin_client().describe_configs(topics_configs_resources) ) for __name, __topic in topics_config.items(): if __name.name in kafka_cluster.topics: kafka_cluster.topics[__name.name].config = __topic.result() except KafkaException as error: print(f"{kafka_cluster.name} - DescribeConfigs on topics failed: {error}")
[docs] def define_topic_jobs( kafka_cluster: KafkaCluster, desc_topics: dict ) -> tuple[dict, list]: """ Goes over all the topic configurations, and if authorization allows for describe configs, plans for the list of configurations to return. Returns the dict of topic description jobs to perform, and the topic descriptions list. """ topics_configs_resources = [] topic_jobs: dict[str, list] = {} now = dt.utcnow() for _topic_name, _topic in desc_topics.items(): if ( hasattr(_topic, "authorized_operations") and AclOperation.DESCRIBE_CONFIGS in _topic.authorized_operations ): topics_configs_resources.append( ConfigResource(ResourceType.TOPIC, _topic.name) ) else: KAFKA_LOG.debug( f"Cluster {kafka_cluster.name} - Topic {_topic.name}: Not authorized to perform DescribeConfig." ) if _topic.name not in kafka_cluster.topics: _topic_obj = Topic(_topic.name, kafka_cluster) kafka_cluster.topics[_topic.name] = _topic_obj else: _topic_obj = kafka_cluster.topics[_topic.name] topic_jobs[_topic_name] = [ _topic_obj, kafka_cluster.consumer_client(), _topic, now, ] return topic_jobs, topics_configs_resources
[docs] def describe_update_topics( kafka_cluster: KafkaCluster, desc_topics: dict, stop_flag ) -> None: """ Leverages threads to retrieve the topic offset watermarks which cannot be sent in a single call to Kafka. """ topic_jobs, topics_configs_resources = define_topic_jobs(kafka_cluster, desc_topics) _tasks = len(topic_jobs) with concurrent.futures.ThreadPoolExecutor( max_workers=kafka_cluster.cluster_brokers_count ) as executor: futures_to_data: dict[concurrent.futures.Future, list] = { executor.submit(init_set_partitions, *job_params): job_params for job_params in topic_jobs.values() } _pending: int = len(futures_to_data) KAFKA_LOG.info( "Kafka cluster: {} | Topics to scan: {}".format( kafka_cluster.name, _pending ) ) waiting_on_futures( executor, futures_to_data, "Kafka Cluster", kafka_cluster.name, "Topics", stop_flag, ) update_set_topic_config(kafka_cluster, topics_configs_resources)
[docs] @retry((KafkaException,), tries=10, delay=5, backoff=2, jitter=(2, 5), logger=KAFKA_LOG) def get_topic_partition_watermarks(consumer_client, topic_name, partition_id): try: start_offset, end_offset = consumer_client.get_watermark_offsets( TopicPartition(topic_name, partition_id) ) return start_offset, end_offset except KafkaException as error: KAFKA_LOG.exception(error) KAFKA_LOG.error(f"Failed to get topic {topic_name}-{partition_id} watermarks") return None, None
[docs] def init_set_partitions(topic_obj: Topic, consumer_client, topic, now: dt): partitions = topic.partitions for _partition in partitions: try: start_offset, end_offset = get_topic_partition_watermarks( consumer_client, topic.name, _partition.id ) except Exception as error: KAFKA_LOG.exception(error) KAFKA_LOG.error( f"Unable to update topic {topic.name} partition {_partition.id} watermarks" ) start_offset = None end_offset = None if start_offset is None or end_offset is None: KAFKA_LOG.debug("No start offset or end offset data retrieved") continue if _partition.id not in topic_obj.partitions: partition = Partition( topic_obj, _partition.id, start_offset, end_offset, now ) topic_obj.partitions[_partition.id] = partition else: partition = topic_obj.partitions[_partition.id] partition.end_offset = end_offset, now if start_offset != partition.init_start_offset[0]: partition.first_offset = start_offset, now return f"{topic_obj.name} - {(dt.utcnow() - now).total_seconds()}"