Source code for kafka_overwatch.reporting.schema_registry
# SPDX-License-Identifier: MPL-2.0
# Copyright 2024 John Mille <john@ews-network.net>
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from pandas import DataFrame
from kafka_overwatch.overwatch_resources.clusters import KafkaCluster
from kafka_overwatch.specs.report import SchemaRegistryReport, SchemasWasteEstimates
[docs]
def get_schema_registry_report(
topics_df: DataFrame, kafka_cluster: KafkaCluster
) -> SchemaRegistryReport | None:
"""Generate schema registry."""
cluster_sr = kafka_cluster.get_schema_registry()
if not cluster_sr:
return None
topic_names = set(topics_df["name"])
detected_unused = [
item
for item in cluster_sr.subjects
if item.replace("-value", "").replace("-key", "") not in topic_names
]
report = SchemaRegistryReport(
subjects_count=len(cluster_sr.subjects),
schemas_count=len(cluster_sr.schemas),
schemas_estimates=SchemasWasteEstimates(
detected_unused=detected_unused, detected_unused_count=len(detected_unused)
),
)
return report