Source code for kafka_overwatch.cli
# SPDX-License-Identifier: MPL-2.0
# Copyright 2024 John Mille <john@ews-network.net>
import faulthandler
import sys
from argparse import ArgumentParser
from os import environ
from tempfile import TemporaryDirectory
try:
from codeguru_profiler_agent import Profiler
_ATTEMPT_PROFILING: bool = True
except ImportError:
_ATTEMPT_PROFILING: bool = False
from boto3.session import Session
from kafka_overwatch.config import load_config_file
faulthandler.enable()
[docs]
def set_parser():
parser = ArgumentParser()
parser.add_argument(
"-c",
"--config-file",
default=None,
dest="config_file",
help="Path to the clusters config file",
)
parser.add_argument("--verbose", "-v", action="store_true")
return parser
[docs]
def profiling_setup() -> Profiler | None:
if _ATTEMPT_PROFILING:
_watcher_session = Session()
try:
profiler = Profiler(
environ.get("AWS_CODEGURU_PROFILER_GROUP_NAME", "overwatch"),
aws_session=_watcher_session,
region_name=environ.get(
"AWS_CODEGURU_PROFILER_TARGET_REGION", "eu-west-1"
),
)
return profiler
except Exception as error:
print("Failed to start CodeGuru profiler")
print(error)
return None
[docs]
def set_prometheus_env_vars() -> TemporaryDirectory:
"""Prometheus temp dir needs to be setup before at this point. Won't work later in the code..."""
prometheus_registry_dir: TemporaryDirectory = TemporaryDirectory()
if not environ.get("PROMETHEUS_MULTIPROC_DIR") or not environ.get(
"prometheus_multiproc_dir"
):
print(
"## PROMETHEUS_MULTIPROC_DIR not set. Setting to:",
prometheus_registry_dir.name,
)
environ["PROMETHEUS_MULTIPROC_DIR"] = prometheus_registry_dir.name
environ["prometheus_multiproc_dir"] = prometheus_registry_dir.name
return prometheus_registry_dir
[docs]
def main():
"""
Main entrypoint
"""
_parser = set_parser()
_args = _parser.parse_args()
_config = load_config_file(_args.config_file)
profiler = profiling_setup()
prometheus_registry_dir = set_prometheus_env_vars()
from kafka_overwatch.config.config import OverwatchConfig
from kafka_overwatch.overwatch import KafkaOverwatchService
try:
_overwatch_config: OverwatchConfig = OverwatchConfig(
_config, prometheus_registry_dir
)
except Exception as error:
print("Failed to load up the configuration", error)
return 2
try:
watcher = KafkaOverwatchService(_overwatch_config)
if profiler:
profiler.start()
watcher.start()
return 0
except Exception as error:
print("Failed to start kafka-overwatch", error)
return 1
if __name__ == "__main__":
sys.exit(main())