Source code for kafka_overwatch.notifications.aws_sns

#   SPDX-License-Identifier: MPL-2.0
#   Copyright 2023 John "Preston" Mille <john@ews-network.net>


"""Manages SNS notifications to report error and status"""

from __future__ import annotations

import json
from typing import TYPE_CHECKING, Union

if TYPE_CHECKING:
    from kafka_overwatch.overwatch_resources.clusters import KafkaCluster
    from kafka_overwatch.specs.config import SnsTopicChannel

from os import environ, path

from boto3.session import Session
from botocore.exceptions import ClientError
from compose_x_common.aws import get_assume_role_session
from importlib_resources import files as pkg_files
from jinja2 import BaseLoader, Environment

from kafka_overwatch.config.logging import KAFKA_LOG


[docs] class SnsChannel: def __init__(self, name: str, definition: SnsTopicChannel): self._definition = definition self.name = name self._usage_report_templates_definitions: dict = { "default": pkg_files("kafka_overwatch").joinpath( "notifications/aws_sns/usage_report/default.j2" ), "email": pkg_files("kafka_overwatch").joinpath( "notifications/aws_sns/usage_report/email.j2" ), } self.ignore_errors = self.definition.ignore_errors self._messages_templates: dict = {} self.import_usage_report_jinja2_templates() def __repr__(self): return f"sns.{self.name}" @property def usage_report_messages_templates(self) -> dict: """Messages templates""" return self._messages_templates @property def definition(self) -> SnsTopicChannel: """Initial definition""" return self._definition @property def session(self) -> Session: if self.definition.role_arn: return get_assume_role_session( Session(), self.definition.role_arn, "KafkaOverwatch", ) else: return Session()
[docs] def import_usage_report_jinja2_templates(self) -> None: if self.definition.template and self.definition.template.email: self._usage_report_templates_definitions["email"] = ( self.definition.template.email ) for ( message_type, template_path, ) in self._usage_report_templates_definitions.items(): if not path.exists(template_path): raise FileNotFoundError(f"Template file not found: {template_path}") with open(path.abspath(template_path)) as template_file: self._messages_templates[message_type] = template_file.read()
[docs] def publish(self, subject: str, message: str | dict) -> None: """Publish message to SNS""" if not isinstance(message, (str, dict)): raise TypeError(f"message must be str or dict, not {type(message)}") client = self.session.client("sns") try: if isinstance(message, str): client.publish( TopicArn=self.definition.topic_arn, Subject=subject, Message=message ) else: client.publish( TopicArn=self.definition.topic_arn, Subject=subject, Message=json.dumps(message), MessageStructure="json", ) except (client.exceptions, ClientError) as error: KAFKA_LOG.exception(error) KAFKA_LOG.error( f"{self.name} - Failed to send notification to {self.definition.topic_arn}" )
[docs] @staticmethod def render_usage_report_message_template( template: str, cluster_id: str, s3_url: str, s3_uri: str, s3_signed_url: str ) -> str: jinja_env = Environment( loader=BaseLoader(), autoescape=True, auto_reload=False, ).from_string(template) content = jinja_env.render( env=environ, USAGE_REPORT_S3_URL=s3_url, USAGE_REPORT_S3_URI=s3_uri, USAGE_REPORT_S3_SIGNED_URL=s3_signed_url, KAFKA_CLUSTER_ID=cluster_id, ) return content
[docs] def send_usage_report_notification( self, cluster: KafkaCluster, subject: str, s3_uri: str = None, s3_url: str = None, s3_signed_url: str = None, ): """Send error notification""" messages: dict = {} for sns_message_type in self.usage_report_messages_templates: try: content = self.render_usage_report_message_template( self.usage_report_messages_templates[sns_message_type], cluster.name, s3_url, s3_uri, s3_signed_url, ) messages[sns_message_type] = content except Exception as error: KAFKA_LOG.exception(error) KAFKA_LOG.error( f"Failed to render the Jinja2 template for {sns_message_type}" ) if not self.ignore_errors: raise self.publish(subject, messages)