Source code for kafka_overwatch.aws_helpers.s3
# SPDX-License-Identifier: MPL-2.0
# Copyright 2024 John Mille <john@ews-network.net>
from __future__ import annotations
from boto3.exceptions import S3UploadFailedError
from boto3.session import Session
from botocore.exceptions import ClientError
from compose_x_common.aws import get_assume_role_session
from retry import retry
from kafka_overwatch.aws_helpers import get_session_from_iam_override
from kafka_overwatch.config.logging import KAFKA_LOG
from kafka_overwatch.specs.config import S3Output
[docs]
class S3Handler:
def __init__(self, s3_output_config: S3Output):
self._config = s3_output_config
if not self._config.iam_override:
self._session: Session = Session()
else:
self._session = None
if self._session:
self._client = self._session.client("s3")
else:
self._client = None
@property
def config(self) -> S3Output:
return self._config
@property
def session(self) -> Session:
if not self._config.iam_override:
return self._session
else:
return get_session_from_iam_override(self._config.iam_override)
@property
def client(self):
"""Ensures to get client with a fresh session if needed"""
if not self._client and self._session:
self._client = self.session.client("s3")
return self._client
elif self._client:
return self._client
return self.session.client("s3")
[docs]
@retry((S3UploadFailedError,), tries=3, delay=1, logger=KAFKA_LOG)
def upload(
self, body: str | bytes, file_name: str, mime_type: str = None
) -> str | None:
prefix_key: str = (
f"{self._config.prefix_key}/{file_name}"
if self._config.prefix_key != ""
else file_name
)
upload_path: str = (
f"{self.config.bucket_name}/{self.config.prefix_key}/{file_name}"
)
try:
self.client.put_object(
Bucket=self._config.bucket_name,
Key=prefix_key,
Body=body.encode("utf-8") if not isinstance(body, bytes) else body,
ContentType="application/json" if not mime_type else mime_type,
)
KAFKA_LOG.info(f"File uploaded to {upload_path}")
return upload_path
except ClientError as error:
KAFKA_LOG.exception(error)
KAFKA_LOG.error(
"Failed to upload report to "
f"{self.config.bucket_name}/{self.config.prefix_key}/{file_name}"
)
except Exception as error:
KAFKA_LOG.exception(error)
KAFKA_LOG.error(
"Failed to upload report to "
f"{self.config.bucket_name}/{self.config.prefix_key}/{file_name}"
)