Source code for kafka_overwatch.reporting.topics
# SPDX-License-Identifier: MPL-2.0
# Copyright 2024 John Mille <john@ews-network.net>
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
pass
import pandas as pd
from dacite import from_dict
from kafka_overwatch.specs.report import TopicWasteCategory
[docs]
def process_cluster_topic_df(topics_df: pd.DataFrame) -> dict:
topic_categories: dict[str, TopicWasteCategory] = {}
"""
Category for all topics without any messages
"""
no_messages_topics = topics_df[topics_df["total_messages"] == 0]
no_messages_topics_partitions_dict = (
no_messages_topics[["name", "partitions"]]
.set_index("name")
.to_dict()["partitions"]
)
no_messages_topics_category = from_dict(
TopicWasteCategory,
{
"topics_count": int(no_messages_topics["name"].count()),
"topic_partitions_sum": int(sum(no_messages_topics["partitions"].values)),
"topics": no_messages_topics_partitions_dict,
"cluster_percentage": int(
(no_messages_topics["name"].count() / len(topics_df)) * 100
),
"description": "All the topics with no messages.",
},
)
topic_categories["no_messages"] = no_messages_topics_category
no_active_cg_no_messages_topics_with_multiple_partitions = topics_df[
(topics_df["total_messages"] == 0)
& (topics_df["partitions"] > 1)
& (topics_df["active_groups"] == 0)
]
no_messages_topics_with_multiple_partitions_dict = (
no_active_cg_no_messages_topics_with_multiple_partitions[["name", "partitions"]]
.set_index("name")
.to_dict()["partitions"]
)
no_messages_topics_with_multiple_partitions_category = from_dict(
TopicWasteCategory,
{
"topics_count": int(
no_active_cg_no_messages_topics_with_multiple_partitions["name"].count()
),
"topic_partitions_sum": int(
sum(
no_active_cg_no_messages_topics_with_multiple_partitions[
"partitions"
].values
)
),
"topics": no_messages_topics_with_multiple_partitions_dict,
"cluster_percentage": int(
(
no_active_cg_no_messages_topics_with_multiple_partitions[
"name"
].count()
/ len(topics_df)
)
* 100
),
"description": "Topics with no messages, no active consumer group, and more than one partition",
},
)
topic_categories["no_messages_topics_with_multiple_partitions"] = (
no_messages_topics_with_multiple_partitions_category
)
no_cgs_and_no_new_messages = topics_df[
(topics_df["total_messages"] > 0)
& (topics_df["new_messages"] == 0)
& (topics_df["active_groups"] == 0)
]
no_cgs_and_no_new_messages_dict = (
no_cgs_and_no_new_messages[["name", "partitions"]]
.set_index("name")
.to_dict()["partitions"]
)
no_cgs_and_no_new_messages_category = from_dict(
TopicWasteCategory,
{
"topics_count": int(no_cgs_and_no_new_messages["name"].count()),
"topic_partitions_sum": int(
sum(no_cgs_and_no_new_messages["partitions"].values)
),
"topics": no_cgs_and_no_new_messages_dict,
"description": (
"Topics with messages, "
"but no active consumer group and no messages produced during the evaluation period"
),
"cluster_percentage": int(
(no_cgs_and_no_new_messages["name"].count() / len(topics_df)) * 100
),
},
)
topic_categories["no_cgs_and_no_new_messages"] = no_cgs_and_no_new_messages_category
return topic_categories