# Source code for kafka_overwatch.reporting.tools
# SPDX-License-Identifier: MPL-2.0
# Copyright 2024 John Mille <john@ews-network.net>
from __future__ import annotations
from typing import TYPE_CHECKING
from pandas import DataFrame
if TYPE_CHECKING:
from kafka_overwatch.overwatch_resources.clusters import KafkaCluster
from kafka_overwatch.config.logging import KAFKA_LOG
# [docs]
def output_dataframe(
kafka_cluster: KafkaCluster, content, file_name: str, mime_type: str
):
"""Writes dataframe to S3 or to local disk (or both)"""
if kafka_cluster.s3_report:
kafka_cluster.s3_report.upload(
content,
file_name,
mime_type=mime_type,
)
if kafka_cluster.local_reports_directory_path:
df_output_path: str = (
f"{kafka_cluster.local_reports_directory_path}/{file_name}"
)
with open(
df_output_path,
"w",
) as df_fd:
df_fd.write(content)
KAFKA_LOG.info(f"{kafka_cluster.name} - Outputted DF to {df_output_path}")
# [docs]
def export_df(kafka_cluster: KafkaCluster, df: DataFrame, resource_name: str) -> None:
"""
If DF exporters are set, write/export to these.
"""
export_to_mime: dict = {"csv": "text/csv", "json": "application/json"}
if (
kafka_cluster.config.reporting_config.output_formats
and not kafka_cluster.config.reporting_config.output_formats.pandas_dataframe
):
return
for (
export_type
) in kafka_cluster.config.reporting_config.output_formats.pandas_dataframe:
export_fn = None
if hasattr(df, f"to_{export_type}"):
export_fn = getattr(df, f"to_{export_type}")
if export_type in export_to_mime and export_fn:
output_dataframe(
kafka_cluster,
export_fn(),
f"{kafka_cluster.name}.{resource_name}_dataframe.{export_type}",
mime_type=export_to_mime[export_type],
)