# Source code for kafka_overwatch.processing.schema_registries

#  SPDX-License-Identifier: MPL-2.0
#  Copyright 2024 John Mille <john@ews-network.net>

from __future__ import annotations

import concurrent.futures
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from kafka_overwatch.overwatch_resources.schema_registry import (
        SchemaRegistry,
    )
    from kafka_overwatch.config.config import OverwatchConfig

import pickle
from datetime import datetime, timedelta, timezone

from compose_x_common.compose_x_common import keyisset, set_else_none
from prometheus_client import Gauge

from kafka_overwatch.common import waiting_on_futures
from kafka_overwatch.config.logging import KAFKA_LOG
from kafka_overwatch.config.threads_settings import NUM_THREADS
from kafka_overwatch.overwatch_resources.schema_registry.schema import (
    Schema,
    refresh_subject_metadata,
)
from kafka_overwatch.overwatch_resources.schema_registry.subject import Subject
from kafka_overwatch.processing import ensure_prometheus_multiproc

from . import wait_between_intervals


def retrieve_from_subjects(
    schema_registry: SchemaRegistry, sr_client, stop_flag
) -> None:
    """
    Much longer way to retrieve all the schemas & subjects, using the subjects endpoints.
    Using threading to speed up the processing, but still slower by an order of
    magnitude than retrieving the schemas directly.

    :param schema_registry: registry whose ``subjects`` mapping is refreshed in-place
    :param sr_client: schema registry HTTP client (provides ``get_all_subjects``)
    :param stop_flag: shared mapping forwarded to the futures waiter so a shutdown
        request can interrupt the scan
    :raises LookupError: when the subjects endpoint does not return a 2xx status
    """
    KAFKA_LOG.info(f"{schema_registry.name} - Retrieving all subjects")
    subjects_r = sr_client.get_all_subjects()
    # Fail loudly on a non-2xx response instead of iterating over an error
    # payload — consistent with retrieve_from_schemas.
    if not (200 <= subjects_r.status_code <= 299):
        raise LookupError(f"Error retrieving subjects: {subjects_r.text}")
    all_subjects = subjects_r.json()
    KAFKA_LOG.info(f"{schema_registry.name} - Retrieved all subjects")
    # Register any subject we have not seen yet; existing ones are refreshed below.
    for subject_name in all_subjects:
        if subject_name not in schema_registry.subjects:
            schema_registry.subjects[subject_name] = Subject(
                subject_name, schema_registry
            )
    KAFKA_LOG.info(
        "%s - Started subjects/schemas update at %s"
        % (schema_registry.name, datetime.now(timezone.utc))
    )
    subject_jobs: list = [
        [_subject, sr_client] for _subject in schema_registry.subjects.values()
    ]
    with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
        # Map each future back to its [subject, client] job so the waiter can
        # report failures against the right subject.
        # Fixed annotation: futures are concurrent.futures.Future instances,
        # not the concurrent.futures module itself.
        futures_to_data: dict[concurrent.futures.Future, list] = {
            executor.submit(refresh_subject_metadata, *job): job
            for job in subject_jobs
        }
        KAFKA_LOG.debug(
            "Schema registry: %s | Subjects to scan: %s"
            % (schema_registry.name, len(futures_to_data))
        )
        waiting_on_futures(
            executor,
            futures_to_data,
            "Schema Registry",
            schema_registry.name,
            "Subjects",
            stop_flag,
        )
def retrieve_from_schemas(schema_registry: SchemaRegistry, sr_client) -> None:
    """
    Fastest way, used if /schemas worked.

    Pulls every schema in one call and back-fills the registry's ``schemas``
    and ``subjects`` mappings, linking each subject version to its Schema.

    :param schema_registry: registry updated in-place
    :param sr_client: schema registry HTTP client (provides ``get_all_schemas``)
    :raises LookupError: when the schemas endpoint does not return a 2xx status
    """
    schemas_r = sr_client.get_all_schemas()
    if not (200 <= schemas_r.status_code <= 299):
        raise LookupError(f"Error retrieving schemas: {schemas_r.text}")
    entries = schemas_r.json()
    KAFKA_LOG.info(f"Schema Registry: {schema_registry.name} | Retrieving all schemas")
    for entry in entries:
        try:
            subject_name: str = entry["subject"]
            entry_id: int = entry["id"]
            version: int = entry["version"]
            content: str = entry["schema"]
            # schemaType is omitted by the registry for AVRO schemas.
            schema_type: str | None = set_else_none("schemaType", entry, "AVRO")
            # Reuse an already-registered schema object when we have one.
            if entry_id in schema_registry.schemas:
                schema = schema_registry.schemas[entry_id]
            else:
                schema = Schema(entry_id, content, schema_registry, schema_type)
                schema_registry.schemas[entry_id] = schema
            # Same for the subject holding this schema version.
            if subject_name in schema_registry.subjects:
                subject = schema_registry.subjects[subject_name]
            else:
                subject = Subject(subject_name, schema_registry)
                schema_registry.subjects[subject_name] = subject
            if version not in subject.versions:
                subject.versions[version] = schema
        except Exception as error:
            # One malformed entry must not abort the whole scan.
            KAFKA_LOG.exception(error)
            KAFKA_LOG.error(
                "Schema registry: %s | Error retrieving schema %s"
                % (schema_registry.name, entry)
            )
def init_schema_registry_prometheus_reporting(
    schema_registry: SchemaRegistry, overwatch_config: OverwatchConfig
):
    """
    Prepare the per-registry Prometheus gauge children.

    :param schema_registry: registry whose name labels the gauges
    :param overwatch_config: holds the multiproc dir and the gauge collectors
    :return: tuple of (subjects_count, schemas_count) labeled gauge children
    """
    ensure_prometheus_multiproc(overwatch_config.prometheus_registry_dir.name)
    collectors = overwatch_config.prometheus_collectors
    subjects_gauge: Gauge = collectors["subjects_count"]
    schemas_gauge: Gauge = collectors["schemas_count"]
    # Bind both gauges to this registry's label so callers can .set() directly.
    return (
        subjects_gauge.labels(schema_registry=schema_registry.name),
        schemas_gauge.labels(schema_registry=schema_registry.name),
    )
def process_schemas(schema_registry: SchemaRegistry, sr_client, stop_flag) -> None:
    """
    Process all schemas in the schema registry.
    If getting schemas from /schemas fails, fall back to /subjects

    :param schema_registry: registry updated in-place
    :param sr_client: schema registry HTTP client
    :param stop_flag: shared mapping forwarded to the slower subjects-based scan
    """
    try:
        retrieve_from_schemas(schema_registry, sr_client)
    except Exception as exc:
        KAFKA_LOG.exception(exc)
        KAFKA_LOG.error(
            f"{schema_registry.name} Failed to retrieve schemas via /schemas"
        )
        # Slow path: enumerate subjects one by one with a thread pool.
        retrieve_from_subjects(schema_registry, sr_client, stop_flag)
[docs] def set_prometheus_metrics( subjects_count, schemas_count, schema_registry: SchemaRegistry ): try: subjects_count.set(len(schema_registry.subjects)) schemas_count.set(len(schema_registry.schemas)) except Exception as error: print(error) KAFKA_LOG.error( "Schema registry: %s | Unable to set prometheus metrics" % (schema_registry.name,) )
[docs] def backup_schema_registry_subjects(schema_registry: SchemaRegistry) -> None: """Attempts to back up all the subjects and their associated schemas to S3.""" try: schema_registry.backup(schema_registry.init_backup_handler()) except Exception as error: KAFKA_LOG.exception(error) KAFKA_LOG.error( f"Schema Registry: {schema_registry.name} | Failed to backup to S3" )
def write_schema_registry_mmap(schema_registry: SchemaRegistry) -> None:
    """Writes the schema registry to a mmap file.

    The whole registry object is pickled to ``schema_registry.mmap_file``.
    Best-effort: failures are logged and swallowed.
    """
    try:
        with open(schema_registry.mmap_file, "wb") as fd:
            fd.write(pickle.dumps(schema_registry))
        KAFKA_LOG.info(
            "%s Wrote schema registry to %s"
            % (schema_registry.name, schema_registry.mmap_file)
        )
    except Exception as exc:
        KAFKA_LOG.exception(exc)
        KAFKA_LOG.error(
            "Schema Registry: %s | Failed to write mmap file to %s"
            % (schema_registry.name, schema_registry.mmap_file)
        )
def process_schema_registry(
    schema_registry: SchemaRegistry,
    runtime_key,
    overwatch_config: OverwatchConfig,
    stop_flag,
) -> None:
    """Process function for the schema registry.

    Responsible for

    * retrieving all the schemas & subjects
    * backup the subjects & schemas if configured.

    Loops until ``stop_flag["stop"]`` flips, pacing each scan against the
    configured interval. Any unhandled error in a cycle tears the worker down.
    """
    subjects_count, schemas_count = init_schema_registry_prometheus_reporting(
        schema_registry, overwatch_config
    )
    sr_client = schema_registry.get_client(runtime_key)
    schema_registry.init_backup_handler()
    while stop_flag["stop"] is False:
        cycle_start = datetime.now(timezone.utc)
        interval = schema_registry.config.schema_registry_scan_interval
        next_scan = cycle_start + timedelta(seconds=interval)
        try:
            process_schemas(schema_registry, sr_client, stop_flag)
            set_prometheus_metrics(subjects_count, schemas_count, schema_registry)
            backup_schema_registry_subjects(schema_registry)
            write_schema_registry_mmap(schema_registry)
        except Exception as exc:
            KAFKA_LOG.exception(exc)
            KAFKA_LOG.error(
                "Schema registry: %s | Failed to process successfully."
                % (schema_registry.name,)
            )
            # Fatal for this worker: release the temp dir and exit the loop.
            schema_registry.temp_bin_dir.cleanup()
            return
        cycle_end = datetime.now(timezone.utc)
        remaining = int((next_scan - cycle_end).total_seconds())
        if remaining > 0:
            KAFKA_LOG.info(
                "%s - Waiting %d seconds" % (schema_registry.name, remaining)
            )
            wait_between_intervals(
                stop_flag,
                remaining,
            )
        else:
            # The scan overran its interval; warn and start the next one now.
            KAFKA_LOG.warning(
                "%s - Interval is %d - yet it took %d to complete processing."
                % (
                    schema_registry.name,
                    interval,
                    (cycle_end - cycle_start).total_seconds(),
                )
            )