Source code for kafka_overwatch.overwatch_resources.schema_registry
# SPDX-License-Identifier: MPL-2.0
# Copyright 2024 John Mille <john@ews-network.net>
from __future__ import annotations
import json
import tarfile
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from kafka_overwatch.overwatch_resources.clusters import KafkaCluster
from .subject import Subject
from .schema import Schema
from datetime import datetime as dt
from tempfile import TemporaryDirectory
from cryptography.fernet import Fernet
from kafka_schema_registry_admin import SchemaRegistry as SchemaRegistryClient
from kafka_overwatch.aws_helpers.s3 import S3Handler
from kafka_overwatch.specs.config import ConfluentSchemaRegistry
from kafka_overwatch.specs.config import SchemaRegistry as SchemaRegistryConfig
class BasicAuthCreds:
    """Stores the schema registry basic auth credentials, encrypted in memory with a runtime Fernet key."""
    def __init__(self, username: str, password: str, runtime_key):
        cipher_suite = Fernet(runtime_key)
        self.__username = cipher_suite.encrypt(username.encode())
        self.__password = cipher_suite.encrypt(password.encode())
    def get_sr_creds(self, runtime_key) -> dict:
        """Decrypts and returns the credentials as schema registry client configuration keys."""
cipher_suite = Fernet(runtime_key)
return {
"basic_auth.username": cipher_suite.decrypt(self.__username).decode(),
"basic_auth.password": cipher_suite.decrypt(self.__password).decode(),
}
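
As an aside, a minimal sketch of how these credentials round-trip through the runtime key. The key generation below is an assumption for illustration; the module itself only ever receives runtime_key from its caller.

# Illustrative only, not part of this module.
runtime_key = Fernet.generate_key()
creds = BasicAuthCreds("sr-user", "sr-password", runtime_key)
creds.get_sr_creds(runtime_key)
# -> {"basic_auth.username": "sr-user", "basic_auth.password": "sr-password"}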
class SchemaRegistry:
"""Manages a schema registry and its assignment to Kafka clusters"""
def __init__(
self, registry_name: str, registry_config: SchemaRegistryConfig, runtime_key
):
self.name: str = registry_name
self.temp_bin_dir = TemporaryDirectory()
self.mmap_file: str = self.temp_bin_dir.name + "/mmap.bin"
self.basic_auth: BasicAuthCreds | None = None
if isinstance(registry_config.config, ConfluentSchemaRegistry):
if registry_config.config.basic_auth:
self.basic_auth = BasicAuthCreds(
registry_config.config.basic_auth.username,
registry_config.config.basic_auth.password,
runtime_key,
)
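                # drop the plaintext credentials from the retained config object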
delattr(registry_config.config, "basic_auth")
else:
raise NotImplementedError("Only confluent style schema registry supported")
"""Index of subjects in the registry"""
self.subjects: dict[str, Subject] = {}
"""Index of the schemas in the registry"""
self.schemas: dict[int, Schema] = {}
"""Kafka clusters this schema registry is linked to"""
self.kafka_clusters: dict[str, KafkaCluster] = {}
self._config = registry_config
self.supported_types = ["JSON", "PROTOBUF", "AVRO"]
@property
def config(self) -> SchemaRegistryConfig:
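        """The schema registry configuration this instance was created with."""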
return self._config
def __repr__(self):
return self.name
    def init_backup_handler(self) -> S3Handler | None:
        """Returns an S3 backup handler when S3 backups are configured and enabled, otherwise None."""
        if (
            self.config.backup_config
            and self.config.backup_config.S3
            and self.config.backup_config.enabled
        ):
            return S3Handler(self.config.backup_config.S3)
        return None
    def get_client(self, runtime_key) -> SchemaRegistryClient | None:
        """Creates the schema registry client, decrypting the basic auth credentials when they are set."""
        if self.basic_auth:
            kwargs: dict = self.basic_auth.get_sr_creds(runtime_key)
        else:
            kwargs = {}
        if isinstance(self._config.config, ConfluentSchemaRegistry):
            return SchemaRegistryClient(
                self.config.config.schema_registry_url,
                **kwargs,
            )
        return None
def backup(self, s3_backup_handler: S3Handler | None):
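        """Dumps every subject version to a schema file plus an index.json, packs them into
        schemas.tar.gz and uploads the archive through the S3 backup handler."""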
if not s3_backup_handler:
return
process_folder = TemporaryDirectory()
schemas_folder: TemporaryDirectory = TemporaryDirectory(dir=process_folder.name)
subjects_index: dict = {}
for _subject in self.subjects.values():
if _subject.name not in subjects_index:
_subject_index: dict = {}
subjects_index[_subject.name]: dict = _subject_index
else:
_subject_index: dict = subjects_index[_subject.name]
for version in _subject.versions:
_schema: Schema = _subject.versions[version]
schema_file_name = f"{_subject.name}::{version}::{_schema.schema_type}::{_schema.schema_id}.txt"
if version not in _subject_index:
_subject_index[version]: str = schema_file_name
with open(
f"{schemas_folder.name}/{schema_file_name}", "w"
) as subject_version_fd:
subject_version_fd.write(_schema.schema_string)
for _subject in subjects_index:
subjects_index[_subject] = dict(sorted(subjects_index[_subject].items()))
with open(f"{schemas_folder.name}/index.json", "w") as index_fd:
index_fd.write(json.dumps(subjects_index, sort_keys=True))
with tarfile.open(f"{process_folder.name}/schemas.tar.gz", "w:gz") as tar:
tar.add(schemas_folder.name, arcname=".")
with open(f"{process_folder.name}/schemas.tar.gz", "rb") as gz_fd:
s3_backup_handler.upload(
body=gz_fd.read(),
file_name=f'{self.name}/{dt.now().strftime("%Y/%m/%d/%H_%M")}/schemas.tar.gz',
mime_type="application/gzip",
)
schemas_folder.cleanup()
process_folder.cleanup()
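
For context, a rough sketch of how the pieces above fit together. The registry name and registry_config below are hypothetical placeholders, and the runtime key handling is inferred from the constructor signatures rather than prescribed by this module.

# Illustrative only, not part of this module.
runtime_key = Fernet.generate_key()
# registry_config: a SchemaRegistryConfig whose .config is a ConfluentSchemaRegistry,
# otherwise the constructor raises NotImplementedError.
registry = SchemaRegistry("example-registry", registry_config, runtime_key)
client = registry.get_client(runtime_key)
backup_handler = registry.init_backup_handler()
# ... subjects and schemas are populated elsewhere, via the client ...
registry.backup(backup_handler)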