Source code for kafka_overwatch.overwatch_resources.schema_registry.schemas_restore
# SPDX-License-Identifier: MPL-2.0
# Copyright 2024 John Mille <john@ews-network.net>
"""Module used to re-import schemas from backup to existing or new schema registry"""
from __future__ import annotations
import json
import tarfile
from tempfile import TemporaryDirectory
from kafka_schema_registry_admin.client_wrapper.errors import NotFoundException
from kafka_schema_registry_admin.kafka_schema_registry_admin import SchemaRegistry
# Backup file naming convention, parsed when restoring each schema version:
# <subject-name>::<subject_version>::<schema_type>::<schema_id>.txt
[docs]
def restore_subject_versions(
    schema_registry: SchemaRegistry,
    subject_name: str,
    subject_details: dict,
    tmp_folder: TemporaryDirectory,
    existing_versions: list = None,
) -> None:
    """Restore the backed-up versions of a subject that the registry is missing.

    The subject is switched to ``IMPORT`` mode, every version listed in
    ``subject_details`` that is not part of ``existing_versions`` is posted
    with the version, schema type and schema ID encoded in its file name, and
    the subject is then switched back to ``READWRITE``, even when restoring
    one of the versions fails.

    :param schema_registry: Client used to talk to the target schema registry.
    :param subject_name: Name of the subject to restore.
    :param subject_details: Mapping of version ID to backup file name. File
        names follow ``subject-name::subject_version::schema_type::schema_id.txt``.
    :param tmp_folder: Temporary directory holding the extracted backup files.
    :param existing_versions: Versions already present in the registry, which
        are skipped. ``None`` means no version exists yet.
    :raises ValueError: If a file name does not follow the naming convention
        or its schema ID is not an integer.
    :raises OSError: If a backup file cannot be read.
    """
    try:
        schema_registry.put_subject_mode(subject_name, "IMPORT")
        needs_forced_import = False
    except Exception:
        # Best effort: retry with force=True, but only if there is actually
        # a version to restore (see the loop below).
        needs_forced_import = True

    # Version IDs coming from a JSON index are strings, whereas the registry
    # presumably reports versions as integers: compare everything as strings
    # so that already-registered versions are reliably skipped.
    known_versions = {str(version) for version in existing_versions or ()}

    try:
        for version_id, file_name in subject_details.items():
            if str(version_id) in known_versions:
                continue
            if needs_forced_import:
                schema_registry.put_subject_mode(subject_name, "IMPORT", force=True)
                # Forcing once is enough, the subject is now in IMPORT mode.
                needs_forced_import = False
            # Split from the right so a subject name containing "::" does
            # not break the parsing of the three trailing fields.
            _, _version_id, _schema_type, _schema_id = file_name.rsplit("::", 3)
            _schema_id: int = int(_schema_id.replace(".txt", ""))
            with open(f"{tmp_folder.name}/{file_name}") as schema_fd:
                schema_content = schema_fd.read()
            schema_registry.post_subject_schema_version(
                subject_name,
                schema_content,
                version_id=_version_id,
                schema_type=_schema_type,
                schema_id=_schema_id,
            )
    finally:
        # Never leave the subject stuck in IMPORT mode, even on failure.
        schema_registry.put_subject_mode(subject_name, "READWRITE")
[docs]
def restore_subject(
    schema_registry: SchemaRegistry,
    subject_name: str,
    subject_details: dict,
    tmp_folder: TemporaryDirectory,
) -> None:
    """Restore a subject, skipping the versions the registry already has.

    The registry is asked for the versions of ``subject_name``. When the
    subject is not found, all versions from the backup are restored.

    :param schema_registry: Client used to talk to the target schema registry.
    :param subject_name: Name of the subject to restore.
    :param subject_details: Mapping of version ID to backup file name.
    :param tmp_folder: Temporary directory holding the extracted backup files.
    """
    # Only the lookup is guarded: a NotFoundException raised while restoring
    # must not be mistaken for "the subject does not exist" and trigger a
    # second, full restore.
    try:
        subject_existing_versions: list = schema_registry.get_subject_versions(
            subject_name
        ).json()
    except NotFoundException:
        # None is equivalent to omitting the argument: nothing exists yet.
        subject_existing_versions = None
    restore_subject_versions(
        schema_registry,
        subject_name,
        subject_details,
        tmp_folder,
        subject_existing_versions,
    )
[docs]
def main(*args, **kwargs):
    """Restore every subject of a backup archive into a schema registry.

    The gzip-compressed tar archive is extracted to a temporary directory,
    its ``index.json`` is loaded, and each subject listed in it is restored.
    A failure on one subject is printed and does not stop the others.

    :param args: Unused positional arguments.
    :param kwargs: ``url`` is passed to the schema registry client and
        ``input_backup`` is the path of the archive to restore from.
    """
    schema_registry = SchemaRegistry(kwargs.get("url"))
    tmp_folder = TemporaryDirectory()
    # The context manager removes the extracted files once the restore is
    # over, instead of relying on the object being garbage collected.
    with tmp_folder:
        with tarfile.open(kwargs.get("input_backup"), "r:gz") as gz_fd:
            if hasattr(tarfile, "data_filter"):
                # The archive is user supplied: refuse members that would be
                # written outside of the temporary directory.
                gz_fd.extractall(tmp_folder.name, filter="data")
            else:
                # Older Python without extraction filters.
                gz_fd.extractall(tmp_folder.name)
        with open(f"{tmp_folder.name}/index.json") as index_fd:
            subjects_index = json.load(index_fd)
        for _subject_name in subjects_index:
            try:
                restore_subject(
                    schema_registry,
                    _subject_name,
                    subjects_index[_subject_name],
                    tmp_folder,
                )
            except Exception as error:
                # Deliberate best effort: report and move on to the next one.
                print(f"Error restoring subject {_subject_name}: {error}")