kafka_overwatch.specs package

Submodules

kafka_overwatch.specs.config module

class kafka_overwatch.specs.config. AssumeRole ( RoleArn , RoleSessionName = 'kafka-overwatch@aws' , ExternalId = None ) [source]

Bases: object

ExternalId : typing.Optional [ str ] = None

Optional - External ID to use when assuming a role

RoleArn : str

Required - IAM Role ARN to assume

RoleSessionName : typing.Optional [ str ] = 'kafka-overwatch@aws'

Optional - Name of the session to use

class kafka_overwatch.specs.config. AwsEmf ( namespace , high_resolution_metrics = False , dimensions = None , enabled = None ) [source]

Bases: object

dimensions : typing.Optional [ dict [ str , str ]] = None

Dimension name and value to set in a key/value format.

enabled : typing.Optional [ bool ] = None
high_resolution_metrics : typing.Optional [ bool ] = False

https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/publishingMetrics.html#high-resolution-metrics

namespace : str
class kafka_overwatch.specs.config. AwsEmfModel ( log_group_name = 'kafka/cluster/overwatch/metrics' , service_name = None , watcher_config = None ) [source]

Bases: object

log_group_name : typing.Optional [ str ] = 'kafka/cluster/overwatch/metrics'

Override the log group name to publish metrics to. Importance: High

service_name : typing.Optional [ str ] = None

Override value for the EMF Service name. Importance: Low

watcher_config : typing.Optional [ kafka_overwatch.specs.config.AwsEmf ] = None
class kafka_overwatch.specs.config. AwsGlueSchemaRegistry ( registry_arn , iam_override = None ) [source]

Bases: object

AWS Glue Schema Registry configuration.

iam_override : typing.Union [ str , kafka_overwatch.specs.config.AssumeRole , None ] = None
registry_arn : str
class kafka_overwatch.specs.config. BackupConfig ( enabled = None , backup_interval_seconds = None , S3 = None ) [source]

Bases: object

Configuration for schema registry schemas & subjects backup

S3 : typing.Optional [ kafka_overwatch.specs.config.S3Output ] = None
backup_interval_seconds : typing.Optional [ int ] = None
enabled : typing.Optional [ bool ] = None

Turn backup on

class kafka_overwatch.specs.config. BackupStyle ( value , names = None , * , module = None , qualname = None , type = None , start = 1 , boundary = None ) [source]

Bases: Enum

cfn_kafka_admin = 'cfn-kafka-admin'
kafka_topics_sh = 'kafka-topics.sh'
class kafka_overwatch.specs.config. BasicAuth ( username = None , password = None ) [source]

Bases: object

password : typing.Optional [ str ] = None
username : typing.Optional [ str ] = None
class kafka_overwatch.specs.config. ClusterConfig ( kafka = None , schema_registry = None , cluster_config_auth = None ) [source]

Bases: object

cluster_config_auth : typing.Optional [ kafka_overwatch.specs.config.ClusterConfigAuth ] = None

Allows setting override configuration for secret values interpolation

kafka : typing.Optional [ dict [ str , typing.Any ]] = None

Configuration as documented in https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md

schema_registry : typing.Optional [ str ] = None

Name of the schema registry defined at the top level.

class kafka_overwatch.specs.config. ClusterConfigAuth ( iam_override = None ) [source]

Bases: object

Allows setting override configuration for secret values interpolation

iam_override : typing.Union [ str , kafka_overwatch.specs.config.AssumeRole , None ] = None
class kafka_overwatch.specs.config. ClusterConfiguration ( cluster_config , reporting_config , cluster_scan_interval_in_seconds = 60 , governance = None , topics_backup_config = None , topic_include_regexes = None , topic_exclude_regexes = None , metrics = None ) [source]

Bases: object

cluster_config : kafka_overwatch.specs.config.ClusterConfig | kafka_overwatch.specs.config.MskClusterConfig
cluster_scan_interval_in_seconds : typing.Optional [ int ] = 60

Overrides the global setting

governance : typing.Optional [ kafka_overwatch.specs.config.GovernanceReportingConfig ] = None
metrics : typing.Optional [ kafka_overwatch.specs.config.ClusterMetrics ] = None

Configure metrics export for the cluster

reporting_config : kafka_overwatch.specs.config.ReportingConfig
topic_exclude_regexes : typing.Optional [ list [ str ]] = None
topic_include_regexes : typing.Optional [ list [ str ]] = None
topics_backup_config : typing.Optional [ kafka_overwatch.specs.config.ClusterTopicBackupConfig ] = None
class kafka_overwatch.specs.config. ClusterMetrics ( prometheus = None , aws_emf = None ) [source]

Bases: object

aws_emf : typing.Optional [ kafka_overwatch.specs.config.AwsEmf ] = None
prometheus : typing.Optional [ kafka_overwatch.specs.config.Prometheus ] = None
class kafka_overwatch.specs.config. ClusterReportSnsChannel ( name , sign_s3_url = None ) [source]

Bases: object

SNS topic to send the messages to

name : str
sign_s3_url : types.UnionType [ bool , float , None ] = None
class kafka_overwatch.specs.config. ClusterReportingNotificationChannels ( sns = None ) [source]

Bases: object

Channels to send notifications to when reports have been generated.

sns : typing.Optional [ list [ kafka_overwatch.specs.config.ClusterReportSnsChannel ]] = None

List of SNS channels defined in notifications_channels

class kafka_overwatch.specs.config. ClusterTopicBackupConfig ( enabled=False , S3=None , BackupStyles=<factory> ) [source]

Bases: object

BackupStyles : typing.Optional [ list [ kafka_overwatch.specs.config.BackupStyle ]]

List the types of backups you want to generate.

S3 : typing.Optional [ kafka_overwatch.specs.config.S3Output ] = None

Enables exports to be sent to S3.

enabled : typing.Optional [ bool ] = False

Enable/disable backup of the topics configuration

class kafka_overwatch.specs.config. ConfluentCloudAuth ( api_key = None , api_secret = None ) [source]

Bases: object

api_key : typing.Optional [ str ] = None

The API key to use to perform API calls to Confluent Cloud

api_secret : typing.Optional [ str ] = None

The API Secret to perform API calls to Confluent Cloud

class kafka_overwatch.specs.config. ConfluentProvider ( confluent_cloud_auth = None , kafka_service_account = None ) [source]

Bases: object

Confluent Cloud settings to use to perform the discovery

confluent_cloud_auth : typing.Optional [ kafka_overwatch.specs.config.ConfluentCloudAuth ] = None
kafka_service_account : typing.Optional [ kafka_overwatch.specs.config.KafkaServiceAccount ] = None
class kafka_overwatch.specs.config. ConfluentSchemaRegistry ( schema_registry_url , basic_auth = None ) [source]

Bases: object

Schema registry client configuration. Uses APIs of Confluent Schema Registry

basic_auth : types.UnionType [ kafka_overwatch.specs.config.BasicAuth , str , None ] = None
schema_registry_url : str
class kafka_overwatch.specs.config. Exports ( S3 = None , local = '/tmp/kafka-overwatch-reports/' , kafka = None ) [source]

Bases: object

Reporting export locations

S3 : typing.Optional [ kafka_overwatch.specs.config.S3Output ] = None
kafka : typing.Optional [ dict [ str , typing.Any ]] = None

Configuration to persist reports into Kafka. Not yet implemented.

local : typing.Optional [ str ] = '/tmp/kafka-overwatch-reports/'

Local directory to store the reports to.

class kafka_overwatch.specs.config. GatewayConfiguration ( gateway_config = None , reporting_config = None , topic_include_regexes = None , topic_exclude_regexes = None , metrics = None ) [source]

Bases: object

gateway_config : typing.Optional [ dict [ str , typing.Any ]] = None
metrics : typing.Optional [ kafka_overwatch.specs.config.ClusterMetrics ] = None

Configure metrics export for the cluster

reporting_config : typing.Optional [ kafka_overwatch.specs.config.ReportingConfig ] = None
topic_exclude_regexes : typing.Optional [ list [ str ]] = None
topic_include_regexes : typing.Optional [ list [ str ]] = None
class kafka_overwatch.specs.config. Global ( cluster_scan_interval_in_seconds ) [source]

Bases: object

Global settings. Defines global default values that can be overridden for each cluster.

cluster_scan_interval_in_seconds : int

Default topics & consumer groups scan interval

class kafka_overwatch.specs.config. GovernanceReportingConfig ( topic_naming_convention = None , consumer_groups_naming_convention = None ) [source]

Bases: object

Configuration for governance cluster analysis

consumer_groups_naming_convention : typing.Optional [ kafka_overwatch.specs.config.NamingConvention ] = None
topic_naming_convention : typing.Optional [ kafka_overwatch.specs.config.NamingConvention ] = None
class kafka_overwatch.specs.config. Iam ( ProfileName = None , AssumeRole = None ) [source]

Bases: object

AssumeRole : typing.Optional [ kafka_overwatch.specs.config.AssumeRole ] = None
ProfileName : typing.Optional [ str ] = None
class kafka_overwatch.specs.config. KafkaOverwatchInputConfiguration ( global_ = None , clusters = None , providers = None , prometheus = None , notification_channels = None , schema_registries = None , aws_emf = None ) [source]

Bases: object

Specification for Kafka topics/partitions hunter service

aws_emf : typing.Optional [ kafka_overwatch.specs.config.AwsEmfModel ] = None
clusters : typing.Optional [ dict [ str , kafka_overwatch.specs.config.ClusterConfiguration ]] = None

Kafka clusters to monitor and report on the partitions usage

global_ : typing.Optional [ kafka_overwatch.specs.config.Global ] = None

Global settings. Defines global default values that can be overridden for each cluster.

notification_channels : typing.Optional [ kafka_overwatch.specs.config.NotificationChannels ] = None

Allows defining notification channels for reporting (not yet implemented).

prometheus : typing.Optional [ typing.Any ] = None
providers : typing.Optional [ kafka_overwatch.specs.config.Providers ] = None

Allows defining a Kafka SaaS provider and performing discovery of existing clusters to scan. (Not yet implemented)

schema_registries : types.UnionType [ dict [ str , kafka_overwatch.specs.config.SchemaRegistry ], dict [ str , typing.Any ], None ] = None
class kafka_overwatch.specs.config. KafkaServiceAccount ( name = 'kafka-overwatch' , description = 'kafka-overwatch' , allow_create = True , save_credentials = None ) [source]

Bases: object

allow_create : typing.Optional [ bool ] = True

If the service account with the ServiceAccountName is not found, creates one. If false and the service account cannot be found, the provider will fail.

description : typing.Optional [ str ] = 'kafka-overwatch'

Service account description

name : typing.Optional [ str ] = 'kafka-overwatch'

Name of the Confluent service account to create

save_credentials : typing.Optional [ kafka_overwatch.specs.config.SaveCredentials ] = None

Optional - If set, will save the generated credentials for the cluster

class kafka_overwatch.specs.config. MskClusterConfig ( cluster_arn = None , schema_registry = None , iam = None ) [source]

Bases: object

cluster_arn : typing.Optional [ str ] = None

The ARN of the MSK Cluster. This will be used to get the cluster details, including bootstrap details.

iam : typing.Optional [ kafka_overwatch.specs.config.Iam ] = None
schema_registry : typing.Optional [ kafka_overwatch.specs.config.SchemaRegistry ] = None
class kafka_overwatch.specs.config. MskProvider ( iam_override = None , exclude_regions = None ) [source]

Bases: object

exclude_regions : typing.Optional [ list [ str ]] = None

List of regions not to look for MSK clusters

iam_override : typing.Union [ str , kafka_overwatch.specs.config.AssumeRole , None ] = None

Override default session to perform the clusters discovery

class kafka_overwatch.specs.config. NamingConvention ( regexes , ignore_regexes = None ) [source]

Bases: object

Evaluates topic name against one or more regex and reports non-compliant topics

ignore_regexes : typing.Optional [ list [ str ]] = None

List/Array of regular expressions of topic names to ignore for review. Use to ignore internal or stream topics.

regexes : list [ str ]
class kafka_overwatch.specs.config. NotificationChannels ( sns = None ) [source]

Bases: object

Channels to send notifications to when reports have been generated.

sns : typing.Optional [ dict [ str , kafka_overwatch.specs.config.SnsTopicChannel ]] = None
class kafka_overwatch.specs.config. OutputFormats ( pandas_dataframe = None ) [source]

Bases: object

The different types of outputs to produce.

pandas_dataframe : typing.Optional [ list [ str ]] = None
class kafka_overwatch.specs.config. Prometheus ( enabled = None ) [source]

Bases: object

enabled : typing.Optional [ bool ] = None
class kafka_overwatch.specs.config. Providers ( aiven = None , aws_msk = None , confluent_cloud = None , conduktor_gateway = None ) [source]

Bases: object

Allows defining a Kafka SaaS provider and performing discovery of existing clusters to scan. (Not yet implemented)

aiven : typing.Optional [ typing.Any ] = None
aws_msk : typing.Optional [ kafka_overwatch.specs.config.MskProvider ] = None

AWS MSK Clusters discovery

conduktor_gateway : typing.Optional [ dict [ str , kafka_overwatch.specs.config.GatewayConfiguration ]] = None

Gateways to monitor and import the vClusters from the partitions usage

confluent_cloud : typing.Optional [ kafka_overwatch.specs.config.ConfluentProvider ] = None

Confluent Cloud environments & clusters discovery.

class kafka_overwatch.specs.config. ReportingConfig ( evaluation_period_in_seconds = 60 , notification_channels = None , output_formats = None , exports = None ) [source]

Bases: object

Configure reporting output. Applies to all clusters.

evaluation_period_in_seconds : typing.Optional [ int ] = 60

Interval between reports.

exports : typing.Optional [ kafka_overwatch.specs.config.Exports ] = None

Reporting export locations

notification_channels : typing.Optional [ kafka_overwatch.specs.config.ClusterReportingNotificationChannels ] = None
output_formats : typing.Optional [ kafka_overwatch.specs.config.OutputFormats ] = None

The different types of outputs to produce.

class kafka_overwatch.specs.config. S3Output ( bucket_name = None , prefix_key = '' , iam_override = None ) [source]

Bases: object

bucket_name : typing.Optional [ str ] = None

Name of the S3 bucket

iam_override : typing.Union [ str , kafka_overwatch.specs.config.AssumeRole , None ] = None
prefix_key : typing.Optional [ str ] = ''

Path in the bucket.

class kafka_overwatch.specs.config. SaaSProviderAwsSecretsManager ( secret_id = None , iam_override = None ) [source]

Bases: object

iam_override : typing.Union [ str , kafka_overwatch.specs.config.AssumeRole , None ] = None
secret_id : typing.Optional [ str ] = None

Name or ARN of secret to use to store the key. If ARN is detected, existing secret content will be updated. If name is provided but not found, creates new secret.

class kafka_overwatch.specs.config. SaveCredentials ( aws_secrets_manager = None ) [source]

Bases: object

Optional - If set, will save the generated credentials for the cluster

aws_secrets_manager : typing.Optional [ kafka_overwatch.specs.config.SaaSProviderAwsSecretsManager ] = None
class kafka_overwatch.specs.config. SchemaRegistry ( config , backup_config = None , schema_registry_scan_interval = 300 ) [source]

Bases: object

backup_config : typing.Optional [ kafka_overwatch.specs.config.BackupConfig ] = None

Configuration for schema registry schemas & subjects backup

config : kafka_overwatch.specs.config.ConfluentSchemaRegistry | kafka_overwatch.specs.config.AwsGlueSchemaRegistry
schema_registry_scan_interval : typing.Optional [ int ] = 300

Interval, in seconds, between two scans of the schema registry.

class kafka_overwatch.specs.config. SnsTopicChannel ( topic_arn , role_arn = None , ignore_errors = None , template = None ) [source]

Bases: object

ignore_errors : typing.Optional [ bool ] = None

If true, prevents an exception from being raised when an error occurs.

role_arn : typing.Optional [ str ] = None

Optional - IAM role to use to publish messages with another IAM role

template : typing.Optional [ kafka_overwatch.specs.config.Template ] = None

Allows setting specific templates for email and SMS

topic_arn : str

ARN of the SNS topic.

class kafka_overwatch.specs.config. Template ( email = None ) [source]

Bases: object

Allows setting specific templates for email and SMS

email : typing.Optional [ str ] = None

Optional - Path to a template for SNS Email messages

kafka_overwatch.specs.report module

class kafka_overwatch.specs.report. ClusterReport ( cluster_name , metadata , governance = None , statistics = None , estimated_waste = None , schema_registry = None ) [source]

Bases: object

cluster_name : str
estimated_waste : typing.Optional [ kafka_overwatch.specs.report.EstimatedWaste ] = None
governance : typing.Optional [ kafka_overwatch.specs.report.Governance ] = None

Governance report structure

metadata : kafka_overwatch.specs.report.Metadata
schema_registry : typing.Optional [ kafka_overwatch.specs.report.SchemaRegistryReport ] = None
statistics : typing.Optional [ kafka_overwatch.specs.report.Statistics ] = None
class kafka_overwatch.specs.report. ClusterUsageReportStructure ( cluster = None ) [source]

Bases: object

Defines the format of the cluster topics report

cluster : typing.Optional [ kafka_overwatch.specs.report.ClusterReport ] = None
class kafka_overwatch.specs.report. ConsumerGroups ( total , active = None , inactive = None ) [source]

Bases: object

active : typing.Optional [ int ] = None

Number of active consumer groups (lag = 0 and members > 0)

inactive : typing.Optional [ int ] = None

Number of inactive consumer groups (lag > 0) or groups without members

total : int

Total number of consumer groups

class kafka_overwatch.specs.report. EstimatedWaste ( topics = None , partitions = None , topic_categories = None ) [source]

Bases: object

partitions : typing.Optional [ int ] = None

Sum of partitions for the topics

topic_categories : typing.Optional [ dict [ str , kafka_overwatch.specs.report.TopicWasteCategory ]] = None
topics : typing.Optional [ int ] = None
class kafka_overwatch.specs.report. Governance ( topic_naming_convention = None , consumer_group_naming_convention = None ) [source]

Bases: object

Governance report structure

consumer_group_naming_convention : typing.Optional [ kafka_overwatch.specs.report.GovernanceNamingConventionReport ] = None
topic_naming_convention : typing.Optional [ kafka_overwatch.specs.report.GovernanceNamingConventionReport ] = None
class kafka_overwatch.specs.report. GovernanceNamingConventionReport ( total = None , total_ignored = None , total_measured = None , compliant_percentage = None , non_compliant_resources = None ) [source]

Bases: object

compliant_percentage : typing.Optional [ float ] = None

Percentage of compliant consumer_groups, out of total_measured

non_compliant_resources : typing.Optional [ list [ str ]] = None

List of non-compliant topic names

total : typing.Optional [ float ] = None

Total number of consumer_groups in the cluster

total_ignored : typing.Optional [ float ] = None

Total number of consumer_groups ignored via regex

total_measured : typing.Optional [ float ] = None

Total number of consumer_groups measured

class kafka_overwatch.specs.report. Metadata ( timestamp ) [source]

Bases: object

timestamp : str

Time the report was generated at

class kafka_overwatch.specs.report. SchemaRegistryReport ( subjects_count = None , schemas_count = None , schemas_estimates = None ) [source]

Bases: object

schemas_count : typing.Optional [ int ] = None
schemas_estimates : typing.Optional [ kafka_overwatch.specs.report.SchemasWasteEstimates ] = None
subjects_count : typing.Optional [ int ] = None
class kafka_overwatch.specs.report. SchemasWasteEstimates ( detected_unused = None , detected_unused_count = None ) [source]

Bases: object

detected_unused : typing.Optional [ list [ str ]] = None
detected_unused_count : typing.Optional [ int ] = None
class kafka_overwatch.specs.report. Statistics ( topics , partitions = None , most_active_topics = None , consumer_groups = None ) [source]

Bases: object

consumer_groups : typing.Optional [ kafka_overwatch.specs.report.ConsumerGroups ] = None
most_active_topics : types.UnionType [ list [ str ], dict [ str , typing.Any ], None ] = None

Topics in the 0.75 percentile of number of messages and new_messages which have active consumer groups

partitions : typing.Optional [ int ] = None

Sum of partitions for the topics

topics : int

Total count of topics counted at the time of generating the report

class kafka_overwatch.specs.report. TopicWasteCategory ( topics , topic_partitions_sum , description , topics_count = None , cluster_percentage = None ) [source]

Bases: object

cluster_percentage : typing.Optional [ float ] = None

The percentage of topics within the cluster that fit into that category

description : str

The description of the category

topic_partitions_sum : int
topics : dict [ str , int ]
topics_count : typing.Optional [ int ] = None

Module contents