Source code for kafka_overwatch.common
# SPDX-License-Identifier: MPL-2.0
# Copyright 2024 John Mille <john@ews-network.net>
from __future__ import annotations
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from concurrent.futures import Future
import concurrent
from compose_x_common.compose_x_common import keyisset
from kafka_overwatch.config.logging import KAFKA_LOG
from kafka_overwatch.processing.signals import STOP_FLAG
[docs]
def waiting_on_futures(
executor,
futures_to_data: list[Future] | dict[Future, Any],
resource_type: str,
resource_name: str,
scan_type: str,
stop_flag: dict,
):
_pending = len(futures_to_data)
KAFKA_LOG.debug(
"{}: {} | {} to scan: {}".format(
resource_type, resource_name, scan_type, _pending
)
)
while _pending > 0:
if STOP_FLAG.is_set() or stop_flag["stop"] is True:
for _future in futures_to_data:
_future.cancel()
executor.shutdown(wait=False, cancel_futures=True)
return
_, other = concurrent.futures.wait(futures_to_data, timeout=5)
_pending = len([_f for _f in other if not _f.done()])
KAFKA_LOG.debug(
"%s: %s | %s pending: %s"
% (resource_type, resource_name, scan_type, _pending)
)