Source code for kafka_overwatch.overwatch_resources.schema_registry

#  SPDX-License-Identifier: MPL-2.0
#  Copyright 2024 John Mille <john@ews-network.net>

from __future__ import annotations

import json
import tarfile
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from kafka_overwatch.overwatch_resources.clusters import KafkaCluster
    from .subject import Subject
    from .schema import Schema

from datetime import datetime as dt
from tempfile import TemporaryDirectory

from cryptography.fernet import Fernet
from kafka_schema_registry_admin import SchemaRegistry as SchemaRegistryClient

from kafka_overwatch.aws_helpers.s3 import S3Handler
from kafka_overwatch.specs.config import ConfluentSchemaRegistry
from kafka_overwatch.specs.config import SchemaRegistry as SchemaRegistryConfig


class BasicAuthCreds:
    """Holds the schema registry basic auth credentials, encrypted in memory with the runtime key."""

    def __init__(self, username: str, password, runtime_key):
        cipher_suite = Fernet(runtime_key)
        self.__username = cipher_suite.encrypt(username.encode())
        self.__password = cipher_suite.encrypt(password.encode())

    def get_sr_creds(self, runtime_key):
        cipher_suite = Fernet(runtime_key)
        return {
            "basic_auth.username": cipher_suite.decrypt(self.__username).decode(),
            "basic_auth.password": cipher_suite.decrypt(self.__password).decode(),
        }
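
# Illustrative sketch (not part of the module): round-tripping credentials through
# BasicAuthCreds. The username/password values below are placeholders; the only
# external call assumed is Fernet.generate_key() from the cryptography package.
#
#   runtime_key = Fernet.generate_key()
#   creds = BasicAuthCreds("sr-user", "sr-password", runtime_key)
#   creds.get_sr_creds(runtime_key)
#   # -> {"basic_auth.username": "sr-user", "basic_auth.password": "sr-password"}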

class SchemaRegistry:
    """Manages a schema registry and its assignment to Kafka clusters"""

    def __init__(
        self, registry_name: str, registry_config: SchemaRegistryConfig, runtime_key
    ):
        self.name: str = registry_name
        self.temp_bin_dir = TemporaryDirectory()
        self.mmap_file: str = self.temp_bin_dir.name + "/mmap.bin"
        self.basic_auth: BasicAuthCreds | None = None
        if isinstance(registry_config.config, ConfluentSchemaRegistry):
            if registry_config.config.basic_auth:
                # Encrypt the basic auth credentials with the runtime key and
                # remove the clear-text values from the configuration object.
                self.basic_auth = BasicAuthCreds(
                    registry_config.config.basic_auth.username,
                    registry_config.config.basic_auth.password,
                    runtime_key,
                )
                delattr(registry_config.config, "basic_auth")
        else:
            raise NotImplementedError("Only confluent style schema registry supported")
        """Index of subjects in the registry"""
        self.subjects: dict[str, Subject] = {}
        """Index of the schemas in the registry"""
        self.schemas: dict[int, Schema] = {}
        """Kafka clusters this schema registry is linked to"""
        self.kafka_clusters: dict[str, KafkaCluster] = {}
        self._config = registry_config
        self.supported_types = ["JSON", "PROTOBUF", "AVRO"]

    @property
    def config(self) -> SchemaRegistryConfig:
        return self._config

    def __repr__(self):
        return self.name

    def init_backup_handler(self) -> S3Handler | None:
        if (
            self.config.backup_config
            and self.config.backup_config.S3
            and self.config.backup_config.enabled
        ):
            return S3Handler(self.config.backup_config.S3)

    def get_client(self, runtime_key) -> SchemaRegistryClient | None:
        if self.basic_auth:
            kwargs: dict = self.basic_auth.get_sr_creds(runtime_key)
        else:
            kwargs = {}
        if isinstance(self._config.config, ConfluentSchemaRegistry):
            return SchemaRegistryClient(
                self.config.config.schema_registry_url,
                **kwargs,
            )

    def backup(self, s3_backup_handler: S3Handler | None):
        """Exports all subjects and schema versions to a tarball and uploads it via the S3 handler."""
        if not s3_backup_handler:
            return
        process_folder = TemporaryDirectory()
        schemas_folder: TemporaryDirectory = TemporaryDirectory(
            dir=process_folder.name
        )
        subjects_index: dict = {}
        for _subject in self.subjects.values():
            if _subject.name not in subjects_index:
                _subject_index: dict = {}
                subjects_index[_subject.name] = _subject_index
            else:
                _subject_index = subjects_index[_subject.name]
            for version in _subject.versions:
                _schema: Schema = _subject.versions[version]
                # One file per subject/version, named so the subject, version,
                # schema type and schema ID can be recovered from the file name.
                schema_file_name = f"{_subject.name}::{version}::{_schema.schema_type}::{_schema.schema_id}.txt"
                if version not in _subject_index:
                    _subject_index[version] = schema_file_name
                with open(
                    f"{schemas_folder.name}/{schema_file_name}", "w"
                ) as subject_version_fd:
                    subject_version_fd.write(_schema.schema_string)
        # Sort each subject's versions and write the index alongside the schema files.
        for _subject in subjects_index:
            subjects_index[_subject] = dict(sorted(subjects_index[_subject].items()))
        with open(f"{schemas_folder.name}/index.json", "w") as index_fd:
            index_fd.write(json.dumps(subjects_index, sort_keys=True))
        # Tar the schemas folder and upload it under a timestamped key.
        with tarfile.open(f"{process_folder.name}/schemas.tar.gz", "w:gz") as tar:
            tar.add(schemas_folder.name, arcname=".")
        with open(f"{process_folder.name}/schemas.tar.gz", "rb") as gz_fd:
            s3_backup_handler.upload(
                body=gz_fd.read(),
                file_name=f'{self.name}/{dt.now().strftime("%Y/%m/%d/%H_%M")}/schemas.tar.gz',
                mime_type="application/gzip",
            )
        schemas_folder.cleanup()
        process_folder.cleanup()
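
# Illustrative sketch (not part of the module): typical lifecycle of a SchemaRegistry
# instance. It assumes `registry_config` is a SchemaRegistryConfig loaded from the
# kafka-overwatch configuration file; "my-registry" and the runtime key are
# placeholders, and backup() is a no-op when no backup handler is configured.
#
#   runtime_key = Fernet.generate_key()
#   registry = SchemaRegistry("my-registry", registry_config, runtime_key)
#   client = registry.get_client(runtime_key)        # SchemaRegistryClient or None
#   backup_handler = registry.init_backup_handler()  # S3Handler if backups are enabled
#   registry.backup(backup_handler)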