Source code for kafka_overwatch.kafka_resources
# SPDX-License-Identifier: MPL-2.0
# Copyright 2024 John Mille <john@ews-network.net>
from __future__ import annotations
from copy import deepcopy
from os import environ
from confluent_kafka import Consumer, KafkaException
from confluent_kafka.admin import AdminClient
from retry import retry
[docs]
@retry((KafkaException,), delay=5, max_delay=30, backoff=2)
def wait_for_result(result_container: dict) -> dict:
for _future in result_container.values():
while not _future.done():
_future.result()
return result_container
[docs]
def set_consumer_client(settings: dict) -> Consumer:
"""Creates a new librdkafka Consumer client"""
client_id: str = f"consumer_partitions_hunter"
cluster_config = deepcopy(settings)
cluster_config.update({"client.id": client_id})
if "group.id" not in cluster_config:
cluster_config["group.id"] = environ.get(
"CONSUMER_GROUP_ID", "kafka-partitions-hunter"
)
return Consumer(cluster_config)
[docs]
def set_admin_client(settings: dict) -> AdminClient:
"""Creates a new librdkafka Admin client"""
client_id: str = f"admin_partitions_hunter"
timeout_ms_env = int(environ.get("ADMIN_REQUEST_TIMEOUT_MS", 60000))
cluster_config = deepcopy(settings)
cluster_config.update({"client.id": client_id})
if "group.id" in cluster_config:
del cluster_config["group.id"]
cluster_config.update(
{"request.timeout.ms": timeout_ms_env if timeout_ms_env >= 60000 else 60000}
)
return AdminClient(cluster_config)