Source code for kafka_overwatch.cli.schema_registry_restore
# SPDX-License-Identifier: MPL-2.0
# Copyright 2024 John Mille <john@ews-network.net>
import faulthandler
from argparse import ArgumentParser
try:
from codeguru_profiler_agent import Profiler
_ATTEMPT_PROFILING: bool = True
except ImportError:
_ATTEMPT_PROFILING: bool = False
from kafka_overwatch.overwatch_resources.schema_registry.schemas_restore import (
main as restore_schemas,
)
faulthandler.enable()
[docs]
def set_parser():
parser = ArgumentParser()
parser.add_argument(
"-b",
"--backup-file",
default=None,
dest="input_backup",
help="Path to the backup .tar.gz file containing the schemas definitions and index.json",
required=True,
)
parser.add_argument(
"--sr-url",
default=None,
required=True,
dest="url",
)
parser.add_argument(
"--sr-user",
default=None,
required=False,
dest="username",
)
parser.add_argument(
"--sr-password",
default=None,
required=False,
dest="password",
)
parser.add_argument("--verbose", "-v", action="store_true")
return parser
[docs]
def main():
parser = set_parser()
args = parser.parse_args()
_kwargs: dict = vars(args)
restore_schemas(**_kwargs)
if __name__ == "__main__":
main()