Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
([#42360] https://github.com/Azure/azure-sdk-for-python/pull/42360)
- Configuration manager/worker fetch via OneSettings part 2 - Concurrency and refactoring of _ConfigurationManager
([#42508] https://github.com/Azure/azure-sdk-for-python/pull/42508)
- Refactoring of statsbeat to use `StatsbeatManager`
([#42508] https://github.com/Azure/azure-sdk-for-python/pull/42508)

## 1.0.0b41 (2025-07-31)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,10 +380,13 @@ def _get_scope(aad_audience=None):

class Singleton(type):
    """Metaclass that gives every class using it exactly one instance.

    The first call ``Cls(*args, **kwargs)`` constructs the instance and stores
    it on ``Cls._instance``; later calls return that stored object and ignore
    their arguments. Assigning ``Cls._instance = None`` makes the next call
    construct a fresh instance.
    """

    _instance = None
    # Re-entrant on purpose: this one lock is shared by every singleton class,
    # so a singleton whose constructor builds another singleton on the same
    # thread must be able to take it again instead of deadlocking.
    _lock = threading.RLock()

    def __call__(cls, *args, **kwargs):
        """Return the instance of ``cls``, creating it on first use.

        :param args: Positional arguments forwarded to the constructor on the
            first call only.
        :param kwargs: Keyword arguments forwarded to the constructor on the
            first call only.
        :return: The single instance of ``cls``.
        """
        # Read the class's own namespace rather than ``cls._instance`` so that
        # a subclass never picks up the instance already stored on its parent.
        instance = cls.__dict__.get("_instance")
        # Compare against None explicitly: an instance that defines
        # ``__bool__``/``__len__`` may be falsy and must not be rebuilt.
        if instance is None:
            with Singleton._lock:
                # Re-check under the lock; another thread may have created the
                # instance between the first check and acquiring the lock.
                instance = cls.__dict__.get("_instance")
                if instance is None:
                    instance = super(Singleton, cls).__call__(*args, **kwargs)
                    cls._instance = instance
        return instance

def _get_telemetry_type(item: TelemetryItem):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,14 @@ def __init__(self, **kwargs: Any) -> None:
self._instrumentation_collection = kwargs.get("instrumentation_collection", False)

# statsbeat initialization
self._stats_exporter = kwargs.get("is_stats_exporter", False)
if self._should_collect_stats():
# Import here to avoid circular dependencies
from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat import collect_statsbeat_metrics

collect_statsbeat_metrics(self)
try:
# Import here to avoid circular dependencies
from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat import collect_statsbeat_metrics
collect_statsbeat_metrics(self)
except Exception as e: # pylint: disable=broad-except
logger.warning("Failed to initialize statsbeat metrics: %s", e)

# Initialize customer sdkstats if enabled
self._customer_sdkstats_metrics = None
Expand Down Expand Up @@ -477,7 +480,7 @@ def _is_statsbeat_initializing_state(self):
return self._is_stats_exporter() and not get_statsbeat_shutdown() and not get_statsbeat_initial_success()

def _is_stats_exporter(self):
    """Return whether this exporter was created to export statsbeat metrics.

    The answer comes from the ``_stats_exporter`` attribute, which
    ``__init__`` fills from the ``is_stats_exporter`` keyword argument.
    Objects that never had the attribute set are treated as regular
    exporters.

    :return: The value of ``self._stats_exporter``, or ``False`` if unset.
    """
    # Rely on the explicit flag only: comparing the class name cannot
    # recognise an exporter that is wrapped instead of subclassed.
    return getattr(self, "_stats_exporter", False)

def _is_customer_sdkstats_exporter(self):
    """Return the ``_is_customer_sdkstats`` flag, or ``False`` when it is not set."""
    try:
        return self._is_customer_sdkstats
    except AttributeError:
        # Attribute was never assigned on this object.
        return False
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

"""
Statsbeat metrics collection module.

This module provides a singleton-based, thread-safe manager for collecting
and reporting statsbeat metrics.
"""

from azure.monitor.opentelemetry.exporter.statsbeat._manager import (
StatsbeatConfig,
StatsbeatManager,
collect_statsbeat_metrics,
shutdown_statsbeat_metrics,
)

__all__ = [
'StatsbeatConfig',
'StatsbeatManager',
'collect_statsbeat_metrics',
'shutdown_statsbeat_metrics',
]
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
_CUSTOMER_SDKSTATS_LANGUAGE,
)

from azure.monitor.opentelemetry.exporter.export.metrics._exporter import AzureMonitorMetricExporter

from azure.monitor.opentelemetry.exporter._utils import (
Singleton,
get_compute_type,
Expand All @@ -35,7 +35,6 @@
categorize_status_code,
_get_customer_sdkstats_export_interval,
)
from azure.monitor.opentelemetry.exporter import VERSION

from azure.monitor.opentelemetry.exporter.statsbeat._state import (
_CUSTOMER_SDKSTATS_STATE,
Expand All @@ -48,6 +47,7 @@ def __init__(self):
self.total_item_drop_count: Dict[str, Dict[DropCodeType, Dict[str, int]]] = {}
self.total_item_retry_count: Dict[str, Dict[RetryCodeType, Dict[str, int]]] = {}


class CustomerSdkStatsMetrics(metaclass=Singleton): # pylint: disable=too-many-instance-attributes
def __init__(self, connection_string):
self._counters = _CustomerSdkStatsTelemetryCounters()
Expand All @@ -56,6 +56,10 @@ def __init__(self, connection_string):
if not self._is_enabled:
return

# Use delayed import to avoid circular import
from azure.monitor.opentelemetry.exporter.export.metrics._exporter import AzureMonitorMetricExporter
from azure.monitor.opentelemetry.exporter import VERSION

exporter_config = {
"connection_string": connection_string,
"instrumentation_collection": True, # Prevent circular dependency
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@
from opentelemetry.sdk.util.instrumentation import InstrumentationScope

from azure.monitor.opentelemetry.exporter._generated.models import TelemetryItem
from azure.monitor.opentelemetry.exporter import AzureMonitorMetricExporter
from azure.monitor.opentelemetry.exporter._constants import _STATSBEAT_METRIC_NAME_MAPPINGS


class _StatsBeatExporter(AzureMonitorMetricExporter):
class _StatsBeatExporter:
def __init__(self, **kwargs):
# Create the actual exporter using delayed import
from azure.monitor.opentelemetry.exporter import AzureMonitorMetricExporter

kwargs['is_stats_exporter'] = True
self._exporter = AzureMonitorMetricExporter(**kwargs)

def _point_to_envelope(
self,
Expand All @@ -21,9 +26,13 @@ def _point_to_envelope(
) -> Optional[TelemetryItem]:
# map statsbeat name from OpenTelemetry name
name = _STATSBEAT_METRIC_NAME_MAPPINGS[name]
return super()._point_to_envelope(
return self._exporter._point_to_envelope(
point,
name,
resource,
None,
)

def __getattr__(self, name):
"""Delegate all other attributes to the wrapped exporter."""
return getattr(self._exporter, name)
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import logging
import threading
from typing import Optional, Any

from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource

from azure.monitor.opentelemetry.exporter.statsbeat._exporter import _StatsBeatExporter
from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat_metrics import _StatsbeatMetrics
from azure.monitor.opentelemetry.exporter.statsbeat._state import (
_STATSBEAT_STATE,
_STATSBEAT_STATE_LOCK,
is_statsbeat_enabled,
set_statsbeat_shutdown, # Add this import
)
from azure.monitor.opentelemetry.exporter.statsbeat._utils import (
_get_stats_connection_string,
_get_stats_long_export_interval,
_get_stats_short_export_interval,
)
from azure.monitor.opentelemetry.exporter._utils import Singleton

logger = logging.getLogger(__name__)


class StatsbeatConfig:
    """Configuration class for Statsbeat metrics collection.

    Two configurations compare (and hash) equal when their statsbeat
    connection string and offline-storage setting match; the remaining
    fields do not take part in equality.
    """

    def __init__(
        self,
        endpoint: str,
        instrumentation_key: str,
        disable_offline_storage: bool = False,
        credential: Optional[Any] = None,
        distro_version: Optional[str] = None,
    ):
        """Store the settings and derive the statsbeat connection string.

        :param endpoint: Endpoint of the exporter being observed; passed to
            ``_get_stats_connection_string`` to derive ``connection_string``.
        :param instrumentation_key: Instrumentation key of the exporter.
        :param disable_offline_storage: Whether offline storage is disabled.
        :param credential: Optional credential object of the exporter.
        :param distro_version: Optional distro version string.
        """
        self.endpoint = endpoint
        self.instrumentation_key = instrumentation_key
        self.connection_string = _get_stats_connection_string(endpoint)
        self.distro_version = distro_version
        # features
        self.disable_offline_storage = disable_offline_storage
        self.credential = credential

    @classmethod
    def from_exporter(cls, exporter) -> 'StatsbeatConfig':
        """Create configuration from an exporter instance.

        :param exporter: Object exposing ``_endpoint``,
            ``_instrumentation_key``, ``_disable_offline_storage``,
            ``_credential`` and ``_distro_version`` attributes.
        :return: A new configuration populated from those attributes.
        """
        return cls(
            endpoint=exporter._endpoint,
            instrumentation_key=exporter._instrumentation_key,
            disable_offline_storage=exporter._disable_offline_storage,
            credential=exporter._credential,
            distro_version=exporter._distro_version,
        )

    def __eq__(self, other) -> bool:
        """Compare two configurations for equality based on what can be changed via control plane."""
        if not isinstance(other, StatsbeatConfig):
            # Let Python try the reflected comparison; ``==`` still evaluates
            # to False when the other operand does not know this type either.
            return NotImplemented
        return (
            self.connection_string == other.connection_string
            and self.disable_offline_storage == other.disable_offline_storage
        )

    def __hash__(self) -> int:
        """Hash based on connection string and offline storage setting."""
        return hash((self.connection_string, self.disable_offline_storage))


class StatsbeatManager(metaclass=Singleton):
    """Thread-safe singleton manager for Statsbeat metrics collection with dynamic reconfiguration support.

    The instance is shared through the ``Singleton`` metaclass. The public
    methods (``initialize``, ``reconfigure`` and ``shutdown``) acquire
    ``self._lock`` before touching state; the underscore-prefixed helpers
    expect the caller to already hold it.
    """

    def __init__(self):
        """Initialize instance attributes. Called only once due to Singleton metaclass."""
        # Instance-level attributes
        self._lock = threading.Lock()
        self._initialized: bool = False
        self._metrics: Optional[_StatsbeatMetrics] = None
        self._meter_provider: Optional[MeterProvider] = None
        self._config: Optional[StatsbeatConfig] = None

    def initialize(self, config: StatsbeatConfig) -> bool:
        """Initialize statsbeat collection with thread safety.

        :param config: Configuration to collect statsbeat with.
        :return: ``False`` when statsbeat is disabled or setup fails; ``True``
            when collection is running with ``config`` (already running with
            an equal config, reconfigured, or freshly initialized).
        """
        if not is_statsbeat_enabled():
            return False

        with self._lock:
            if self._initialized:
                # If already initialized with the same config, return True
                if self._config and self._config == config:
                    return True
                # If config is different, reconfigure
                return self._reconfigure(config)

            return self._do_initialize(config)

    def _do_initialize(self, config: StatsbeatConfig) -> bool:
        """Build the exporter, reader, meter provider and metrics for ``config``.

        Caller must hold ``self._lock``.

        :param config: Configuration to initialize with.
        :return: ``True`` on success. On any exception a warning is logged,
            partially created state is cleaned up and ``False`` is returned.
        """
        try:
            # Create statsbeat exporter
            statsbeat_exporter = _StatsBeatExporter(
                connection_string=config.connection_string,
                disable_offline_storage=config.disable_offline_storage,
            )

            # Create metric reader
            reader = PeriodicExportingMetricReader(
                statsbeat_exporter,
                export_interval_millis=_get_stats_short_export_interval() * 1000,  # 15m by default
            )

            # Create meter provider
            self._meter_provider = MeterProvider(
                metric_readers=[reader],
                resource=Resource.get_empty(),
            )

            # long_interval_threshold represents how many collects for short interval
            # should have passed before a long interval collect
            long_interval_threshold = (
                _get_stats_long_export_interval() // _get_stats_short_export_interval()
            )

            # Create statsbeat metrics
            self._metrics = _StatsbeatMetrics(
                self._meter_provider,
                config.instrumentation_key,
                config.endpoint,
                config.disable_offline_storage,
                long_interval_threshold,
                config.credential is not None,
                config.distro_version,
            )

            # Force initial flush and initialize non-initial metrics
            self._meter_provider.force_flush()
            self._metrics.init_non_initial_metrics()

            self._config = config
            self._initialized = True
            return True

        except Exception as e:  # pylint: disable=broad-except
            # Log the error for debugging
            logger.warning("Failed to initialize statsbeat: %s", e)
            # Clean up on failure
            self._cleanup()
            return False

    def _cleanup(self):
        """Shut down any live meter provider and reset all state.

        Caller must hold ``self._lock``. Shutdown errors are logged at debug
        level and otherwise ignored so that the state reset always happens.
        """
        if self._meter_provider:
            try:
                self._meter_provider.shutdown()
            except Exception as e:  # pylint: disable=broad-except
                logger.debug("Failed to shut down statsbeat meter provider during cleanup: %s", e)
        self._meter_provider = None
        self._metrics = None
        self._config = None
        self._initialized = False

    def shutdown(self) -> bool:
        """Shutdown statsbeat collection with thread safety.

        :return: ``True`` when the meter provider was shut down without
            raising; ``False`` when statsbeat was not initialized or the
            shutdown raised.
        """
        with self._lock:
            if not self._initialized:
                return False

            # Detach the provider first so that _cleanup() below only resets
            # state and does not call shutdown() on the same provider again.
            provider, self._meter_provider = self._meter_provider, None

            shutdown_success = False
            try:
                if provider is not None:
                    provider.shutdown()
                    shutdown_success = True
            except Exception as e:  # pylint: disable=broad-except
                logger.debug("Failed to shut down statsbeat meter provider: %s", e)
            finally:
                self._cleanup()

            if shutdown_success:
                # Record the shutdown in the shared statsbeat state.
                set_statsbeat_shutdown(True)

            return shutdown_success

    def reconfigure(self, new_config: StatsbeatConfig) -> bool:
        """Reconfigure statsbeat with new configuration.

        :param new_config: Configuration to switch to.
        :return: ``False`` when statsbeat is disabled or setup fails; ``True``
            when collection is running with ``new_config`` afterwards.
        """
        if not is_statsbeat_enabled():
            return False

        with self._lock:
            if not self._initialized:
                # If not initialized, just initialize with new config
                return self._do_initialize(new_config)

            # If same config, no need to reconfigure
            if self._config and self._config == new_config:
                return True

            return self._reconfigure(new_config)

    def _reconfigure(self, new_config: StatsbeatConfig) -> bool:
        """Tear down the current provider and initialize with ``new_config``.

        Caller must hold ``self._lock``.

        :param new_config: Configuration to switch to.
        :return: Result of the re-initialization.
        """
        # Shutdown current instance with timeout
        if self._meter_provider:
            try:
                # Force flush before shutdown to ensure data is sent
                self._meter_provider.force_flush(timeout_millis=5000)
                self._meter_provider.shutdown(timeout_millis=5000)
            except Exception as e:  # pylint: disable=broad-except
                logger.debug("Failed to flush/shut down statsbeat meter provider before reconfigure: %s", e)

        # Reset state but keep initialized=True
        self._meter_provider = None
        self._metrics = None
        self._config = None

        # Initialize with new config
        success = self._do_initialize(new_config)

        if not success:
            # If reinitialization failed, mark as not initialized
            self._initialized = False

        return success


# Global convenience functions
def collect_statsbeat_metrics(exporter) -> None:
    """Start statsbeat collection using settings read from ``exporter``.

    Builds a ``StatsbeatConfig`` from the exporter and hands it to the shared
    ``StatsbeatManager``; the manager's result is not returned.
    """
    settings = StatsbeatConfig.from_exporter(exporter)
    manager = StatsbeatManager()
    manager.initialize(settings)


def shutdown_statsbeat_metrics() -> bool:
    """Shut down statsbeat collection via the shared ``StatsbeatManager``.

    :return: Whatever ``StatsbeatManager.shutdown()`` reports.
    """
    manager = StatsbeatManager()
    return manager.shutdown()
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,8 @@ def get_local_storage_setup_state_exception():
def set_local_storage_setup_state_exception(value):
    """Store ``value`` under ``"EXCEPTION_OCCURRED"`` in the local storage setup state.

    The write happens while holding ``_LOCAL_STORAGE_SETUP_STATE_LOCK``.
    """
    with _LOCAL_STORAGE_SETUP_STATE_LOCK:
        _LOCAL_STORAGE_SETUP_STATE.update({"EXCEPTION_OCCURRED": value})

def set_statsbeat_shutdown(shutdown: bool):
    """Store ``shutdown`` under ``"SHUTDOWN"`` in the statsbeat state.

    The write happens while holding ``_STATSBEAT_STATE_LOCK``.
    """
    with _STATSBEAT_STATE_LOCK:
        _STATSBEAT_STATE.update({"SHUTDOWN": shutdown})
Loading
Loading