# Source code for kafka_overwatch.aws_helpers.kafka_client_secrets
# SPDX-License-Identifier: MPL-2.0
# Copyright 2024 John Mille <john@ews-network.net>
"""
Parses the Kafka client configuration given in the configuration file and interpolates
any {{resolve:...}} values it contains.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from kafka_overwatch.overwatch_resources.clusters import KafkaCluster
from copy import deepcopy
from aws_cfn_custom_resource_resolve_parser import (
parse_secret_resolve_string,
retrieve_secret,
)
from boto3.session import Session
from kafka_overwatch.aws_helpers import get_session_from_iam_override
from kafka_overwatch.config.logging import KAFKA_LOG
from kafka_overwatch.specs.config import ClusterConfig, MskClusterConfig
[docs]
def handle_librdkafka_config(kafka_cluster: KafkaCluster) -> dict:
    """
    Build the Kafka client settings for a cluster, resolving ``{{resolve:`` values.

    A deep copy of ``kafka_cluster.config.cluster_config.kafka`` is made so the
    cluster's own configuration is left untouched. Every top-level string value
    that starts with ``{{resolve:`` is parsed with ``parse_secret_resolve_string``
    and replaced by the result of ``retrieve_secret``.

    :param kafka_cluster: cluster whose ``config.cluster_config`` is read.
    :return: the copied settings, with resolvable values substituted.

    Resolution is best-effort: any exception raised while resolving a value is
    logged and the original ``{{resolve:...}}`` string is kept for that key.
    """
    cluster_config = kafka_cluster.config.cluster_config
    auth = cluster_config.cluster_config_auth
    iam_override = auth.iam_override if auth else None

    # Use the IAM override session when one is configured, otherwise a default
    # boto3 session.
    session = (
        get_session_from_iam_override(iam_override) if iam_override else Session()
    )

    resolved: dict = deepcopy(cluster_config.kafka)
    for option, raw_value in resolved.items():
        # Only top-level string values carrying the resolve marker are handled.
        if not isinstance(raw_value, str):
            continue
        if not raw_value.startswith("{{resolve:"):
            continue
        try:
            secret_name, key, version = parse_secret_resolve_string(raw_value)
            # Re-assigning an existing key does not resize the dict, so it is
            # safe to do while iterating over items().
            resolved[option] = retrieve_secret(
                secret_name, key, version, session=session
            )
        except Exception as error:
            KAFKA_LOG.exception(error)
            KAFKA_LOG.error(
                f"Error while resolving {raw_value}: {error}. Using value as-is."
            )
    return resolved
[docs]
def eval_kafka_client_config(kafka_cluster: KafkaCluster) -> dict:
    """
    Return the Kafka client configuration to use for ``kafka_cluster``.

    The work is delegated to ``handle_librdkafka_config``, which returns a new
    dict in which string values starting with ``{{resolve:`` have been
    interpolated, leaving the cluster's original configuration unchanged.

    :param kafka_cluster: cluster whose ``config.cluster_config`` is evaluated.
    :return: the client configuration dict.
    :raises NotImplementedError: when the cluster configuration is a
        ``MskClusterConfig``.
    """
    is_msk = isinstance(kafka_cluster.config.cluster_config, MskClusterConfig)
    if is_msk:
        raise NotImplementedError("MskClusterConfig is not yet implemented.")
    return handle_librdkafka_config(kafka_cluster)