Source code for kafka_overwatch.overwatch_resources.schema_registry.schema

#  SPDX-License-Identifier: MPL-2.0
#  Copyright 2024 John Mille <john@ews-network.net>
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from kafka_overwatch.overwatch_resources.schema_registry import (
        SchemaRegistry,
        Subject,
    )


[docs] class Schema: """Class to maintain metadata about schema""" def __init__( self, schema_id: int, schema_string: str, overwatch_registry: SchemaRegistry, schema_type: str = None, ): self.schema_id: int = schema_id self.schema_string: str = schema_string self._schema_type: str | None = schema_type self._registry: SchemaRegistry = overwatch_registry def __str__(self): return f"{self.schema_id}" def __repr__(self): return f"{self.schema_id}" @property def schema_type(self) -> str | None: return self._schema_type @schema_type.setter def schema_type(self, value: str): if value not in self._registry.supported_types: raise ValueError( f"Invalid schema type: {value}. Must be one of", self._registry.supported_types, ) self._schema_type = value @property def overwatch_registry(self) -> SchemaRegistry: return self._registry
[docs] def refresh_subject_metadata(subject: Subject, sr_client): """ Iterates over all the versions of a given subject. If the version and schema is already in the registry inventory, skip. If not, retrieve the schema details, and store to the in-memory registry. """ subject_versions = sr_client.get_subject_versions(subject.name).json() for version in subject_versions: if version in subject.versions: continue subject_version_schema = ( sr_client.get_subject_version_id(subject.name, version) ).json() if subject_version_schema["id"] not in subject.overwatch_registry.schemas: _schema = Schema(subject_version_schema["id"], subject.overwatch_registry) subject.overwatch_registry.schemas[_schema.schema_id] = _schema else: _schema = subject.overwatch_registry.schemas[subject_version_schema["id"]] if subject_version_schema["version"] not in subject.versions: subject.versions[subject_version_schema["version"]] = _schema