Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
([#42360] https://github.com/Azure/azure-sdk-for-python/pull/42360)
- Configuration manager/worker fetch via OneSettings part 2 - Concurrency and refactoring of _ConfigurationManager
([#42508] https://github.com/Azure/azure-sdk-for-python/pull/42508)
- Refactoring of statsbeat to use `StatsbeatManager`
([#42716] https://github.com/Azure/azure-sdk-for-python/pull/42716)

## 1.0.0b41 (2025-07-31)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,15 @@

# Statsbeat
# (OpenTelemetry metric name, Statsbeat metric name)
# Note: OpenTelemetry SDK normalizes metric names to lowercase, so first element should be lowercase
_ATTACH_METRIC_NAME = ("attach", "Attach")
_FEATURE_METRIC_NAME = ("feature", "Feature")
_REQ_EXCEPTION_NAME = ("statsbeat_exception_count", "Exception_Count")
_REQ_DURATION_NAME = ("statsbeat_duration", "Request_Duration")
_REQ_FAILURE_NAME = ("statsbeat_failure_count", "Request_Failure_Count")
_REQ_RETRY_NAME = ("statsbeat_retry_count", "Retry_Count")
_REQ_SUCCESS_NAME = ("statsbeat_success_count", "Request_Success_Count")
_REQ_THROTTLE_NAME = ("statsbeat_throttle_count", "Throttle_Count")
_REQ_EXCEPTION_NAME = ("exception_count", "Exception_Count")
_REQ_DURATION_NAME = ("request_duration", "Request_Duration")
_REQ_FAILURE_NAME = ("request_failure_count", "Request_Failure_Count")
_REQ_RETRY_NAME = ("retry_count", "Retry_Count")
_REQ_SUCCESS_NAME = ("request_success_count", "Request_Success_Count")
_REQ_THROTTLE_NAME = ("throttle_count", "Throttle_Count")

_STATSBEAT_METRIC_NAME_MAPPINGS = dict(
[
Expand All @@ -117,8 +118,8 @@
# pylint: disable=line-too-long
_DEFAULT_NON_EU_STATS_CONNECTION_STRING = "InstrumentationKey=c4a29126-a7cb-47e5-b348-11414998b11e;IngestionEndpoint=https://westus-0.in.applicationinsights.azure.com/"
_DEFAULT_EU_STATS_CONNECTION_STRING = "InstrumentationKey=7dc56bab-3c0c-4e9f-9ebb-d1acadee8d0f;IngestionEndpoint=https://westeurope-5.in.applicationinsights.azure.com/"
_DEFAULT_STATS_SHORT_EXPORT_INTERVAL = 900 # 15 minutes
_DEFAULT_STATS_LONG_EXPORT_INTERVAL = 86400 # 24 hours
_DEFAULT_STATS_SHORT_EXPORT_INTERVAL = 15 * 60 # 15 minutes in s
_DEFAULT_STATS_LONG_EXPORT_INTERVAL = 24 * 60 * 60 # 24 hours in s
_EU_ENDPOINTS = [
"westeurope",
"northeurope",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1320,7 +1320,7 @@ class TrackResponse(msrest.serialization.Model):
"errors": {"key": "errors", "type": "[TelemetryErrorDetails]"},
}

def __init__(
def __init__( # type: ignore
self,
*,
items_received: Optional[int] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import random
import subprocess
import errno
from typing import Union
from typing import Union, Optional, Any, Generator, Tuple, List, Type
from enum import Enum

from azure.monitor.opentelemetry.exporter._utils import PeriodicTask
Expand All @@ -26,15 +26,15 @@
os.environ.get("SYSTEMDRIVE", "C:"), r"\Windows\System32\icacls.exe"
)

def _fmt(timestamp):
def _fmt(timestamp: datetime.datetime) -> str:
return timestamp.strftime("%Y-%m-%dT%H%M%S.%f")


def _now():
def _now() -> datetime.datetime:
return datetime.datetime.now(tz=datetime.timezone.utc)


def _seconds(seconds):
def _seconds(seconds: int) -> datetime.timedelta:
return datetime.timedelta(seconds=seconds)

class StorageExportResult(Enum):
Expand All @@ -45,24 +45,24 @@ class StorageExportResult(Enum):

# pylint: disable=broad-except
class LocalFileBlob:
def __init__(self, fullpath):
def __init__(self, fullpath: str) -> None:
self.fullpath = fullpath

def delete(self):
def delete(self) -> None:
try:
os.remove(self.fullpath)
except Exception:
pass # keep silent

def get(self):
def get(self) -> Optional[Tuple[Any, ...]]:
try:
with open(self.fullpath, "r", encoding="utf-8") as file:
return tuple(json.loads(line.strip()) for line in file.readlines())
except Exception:
pass # keep silent
return None

def put(self, data, lease_period=0) -> Union[StorageExportResult, str]:
def put(self, data: List[Any], lease_period: int = 0) -> Union[StorageExportResult, str]:
try:
fullpath = self.fullpath + ".tmp"
with open(fullpath, "w", encoding="utf-8") as file:
Expand All @@ -80,7 +80,7 @@ def put(self, data, lease_period=0) -> Union[StorageExportResult, str]:
except Exception as ex:
return str(ex)

def lease(self, period):
def lease(self, period: int) -> Optional['LocalFileBlob']:
timestamp = _now() + _seconds(period)
fullpath = self.fullpath
if fullpath.endswith(".lock"):
Expand All @@ -98,14 +98,14 @@ def lease(self, period):
class LocalFileStorage:
def __init__(
self,
path,
max_size=50 * 1024 * 1024, # 50MiB
maintenance_period=60, # 1 minute
retention_period=48 * 60 * 60, # 48 hours
write_timeout=60, # 1 minute,
name=None,
lease_period=60, # 1 minute
):
path: str,
max_size: int = 50 * 1024 * 1024, # 50MiB
maintenance_period: int = 60, # 1 minute
retention_period: int = 48 * 60 * 60, # 48 hours
write_timeout: int = 60, # 1 minute,
name: Optional[str] = None,
lease_period: int = 60, # 1 minute
) -> None:
self._path = os.path.abspath(path)
self._max_size = max_size
self._retention_period = retention_period
Expand All @@ -124,19 +124,24 @@ def __init__(
else:
logger.error("Could not set secure permissions on storage folder, local storage is disabled.")

def close(self):
def close(self) -> None:
if self._enabled:
self._maintenance_task.cancel()
self._maintenance_task.join()

def __enter__(self):
def __enter__(self) -> 'LocalFileStorage':
return self

# pylint: disable=redefined-builtin
def __exit__(self, type, value, traceback):
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[Any]
) -> None:
self.close()

def _maintenance_routine(self):
def _maintenance_routine(self) -> None:
try:
# pylint: disable=unused-variable
for blob in self.gets():
Expand All @@ -145,7 +150,7 @@ def _maintenance_routine(self):
pass # keep silent

# pylint: disable=too-many-nested-blocks
def gets(self):
def gets(self) -> Generator[LocalFileBlob, None, None]:
if self._enabled:
now = _now()
lease_deadline = _fmt(now)
Expand Down Expand Up @@ -184,7 +189,7 @@ def gets(self):
else:
pass

def get(self):
def get(self) -> Optional['LocalFileBlob']:
if not self._enabled:
return None
cursor = self.gets()
Expand All @@ -194,7 +199,7 @@ def get(self):
pass
return None

def put(self, data, lease_period=None) -> Union[StorageExportResult, str]:
def put(self, data: List[Any], lease_period: Optional[int] = None) -> Union[StorageExportResult, str]:
try:
if not self._enabled:
if get_local_storage_setup_state_readonly():
Expand All @@ -221,7 +226,7 @@ def put(self, data, lease_period=None) -> Union[StorageExportResult, str]:
return str(ex)


def _check_and_set_folder_permissions(self):
def _check_and_set_folder_permissions(self) -> bool:
"""
Validate and set folder permissions where the telemetry data will be stored.
:return: True if folder was created and permissions set successfully, False otherwise.
Expand Down Expand Up @@ -266,7 +271,7 @@ def _check_and_set_folder_permissions(self):
set_local_storage_setup_state_exception(str(ex))
return False

def _check_storage_size(self):
def _check_storage_size(self) -> bool:
size = 0
# pylint: disable=unused-variable
for dirpath, dirnames, filenames in os.walk(self._path):
Expand Down Expand Up @@ -295,7 +300,7 @@ def _check_storage_size(self):
return False
return True

def _get_current_user(self):
def _get_current_user(self) -> str:
user = ""
domain = os.environ.get("USERDOMAIN")
username = os.environ.get("USERNAME")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,10 +380,13 @@ def _get_scope(aad_audience=None):

class Singleton(type):
_instance = None
_lock = threading.Lock()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going forward, we will use the Singleton class in utils wherever we need one. For example, live metrics uses the same pattern.


def __call__(cls, *args, **kwargs):
def __call__(cls, *args: Any, **kwargs: Any):
if not cls._instance:
cls._instance = super(Singleton, cls).__call__(*args, **kwargs)
with cls._lock:
if not cls._instance:
cls._instance = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instance

def _get_telemetry_type(item: TelemetryItem):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
from azure.monitor.opentelemetry.exporter.statsbeat._state import (
get_statsbeat_initial_success,
get_statsbeat_shutdown,
get_customer_sdkstats_shutdown,
increment_and_check_statsbeat_failure_count,
is_statsbeat_enabled,
set_statsbeat_initial_success,
Expand Down Expand Up @@ -99,9 +100,11 @@ def __init__(self, **kwargs: Any) -> None:
# self._configuration_manager = _ConfigurationManager()

self._api_version = kwargs.get("api_version") or _SERVICE_API_LATEST
# We do not need to use entra Id if this is a sdkStats exporter
if self._is_stats_exporter():
self._credential = None
else:
# We use the credential on a regular exporter or customer sdkStats exporter
self._credential = _get_authentication_credential(**kwargs)
self._consecutive_redirects = 0 # To prevent circular redirects
self._disable_offline_storage = kwargs.get("disable_offline_storage", False)
Expand Down Expand Up @@ -157,8 +160,8 @@ def __init__(self, **kwargs: Any) -> None:
)
self.storage = None
if not self._disable_offline_storage:
self.storage = LocalFileStorage(
path=self._storage_directory,
self.storage = LocalFileStorage( # pyright: ignore
path=self._storage_directory, # type: ignore
max_size=self._storage_max_size,
maintenance_period=self._storage_maintenance_period,
retention_period=self._storage_retention_period,
Expand All @@ -170,10 +173,12 @@ def __init__(self, **kwargs: Any) -> None:

# statsbeat initialization
if self._should_collect_stats():
# Import here to avoid circular dependencies
from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat import collect_statsbeat_metrics

collect_statsbeat_metrics(self)
try:
# Import here to avoid circular dependencies
from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat import collect_statsbeat_metrics
collect_statsbeat_metrics(self)
except Exception as e: # pylint: disable=broad-except
logger.warning("Failed to initialize statsbeat metrics: %s", e)

# Initialize customer sdkstats if enabled
self._customer_sdkstats_metrics = None
Expand Down Expand Up @@ -453,21 +458,20 @@ def _should_collect_stats(self):
is_statsbeat_enabled()
and not get_statsbeat_shutdown()
and not self._is_stats_exporter()
and not self._is_customer_sdkstats_exporter()
and not self._instrumentation_collection
)


# check to see whether its the case of customer sdkstats collection
def _should_collect_customer_sdkstats(self):
# Import here to avoid circular dependencies
from azure.monitor.opentelemetry.exporter.statsbeat._state import get_customer_sdkstats_shutdown

env_value = os.environ.get("APPLICATIONINSIGHTS_SDKSTATS_ENABLED_PREVIEW", "")
is_customer_sdkstats_enabled = env_value.lower() == "true"
# Don't collect customer sdkstats for instrumentation collection or customer sdkstats exporter
# Don't collect customer sdkstats for instrumentation collection, sdkstats exporter or customer sdkstats exporter
return (
is_customer_sdkstats_enabled
and not get_customer_sdkstats_shutdown()
and not self._is_stats_exporter()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an important change: we do not want to collect customer sdkstats on the regular stats exporter.

and not self._is_customer_sdkstats_exporter()
and not self._instrumentation_collection
)
Expand All @@ -477,7 +481,7 @@ def _is_statsbeat_initializing_state(self):
return self._is_stats_exporter() and not get_statsbeat_shutdown() and not get_statsbeat_initial_success()

def _is_stats_exporter(self):
return self.__class__.__name__ == "_StatsBeatExporter"
return getattr(self, "_is_sdkstats", False)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the _StatsBeatExporter class and use the regular metrics exporter instead, so a field on the exporter now determines whether it is a stats exporter.


def _is_customer_sdkstats_exporter(self):
return getattr(self, '_is_customer_sdkstats', False)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
_APPLICATIONINSIGHTS_METRIC_NAMESPACE_OPT_IN,
_AUTOCOLLECTED_INSTRUMENT_NAMES,
_METRIC_ENVELOPE_NAME,
_STATSBEAT_METRIC_NAME_MAPPINGS,
)
from azure.monitor.opentelemetry.exporter import _utils
from azure.monitor.opentelemetry.exporter._generated.models import (
Expand Down Expand Up @@ -75,13 +76,15 @@ class AzureMonitorMetricExporter(BaseExporter, MetricExporter):
"""Azure Monitor Metric exporter for OpenTelemetry."""

def __init__(self, **kwargs: Any) -> None:
self._is_sdkstats = kwargs.get("is_sdkstats", False)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We set these attributes BEFORE the parent __init__ calls because BaseExporter.__init__ checks these fields while it runs.

self._is_customer_sdkstats = kwargs.get("is_customer_sdkstats", False)
self._metrics_to_log_analytics = self._determine_metrics_to_log_analytics()
BaseExporter.__init__(self, **kwargs)
MetricExporter.__init__(
self,
preferred_temporality=APPLICATION_INSIGHTS_METRIC_TEMPORALITIES, # type: ignore
preferred_aggregation=kwargs.get("preferred_aggregation"), # type: ignore
)
self._metrics_to_log_analytics = self._determine_metrics_to_log_analytics()

# pylint: disable=R1702
def export(
Expand Down Expand Up @@ -157,7 +160,13 @@ def _point_to_envelope(
# When Metrics to Log Analytics is disabled, only send Standard metrics and _OTELRESOURCE_
if not self._metrics_to_log_analytics and name not in _AUTOCOLLECTED_INSTRUMENT_NAMES:
return None
envelope = _convert_point_to_envelope(point, name, resource, scope)

# Apply statsbeat metric name mapping if this is a statsbeat exporter
final_metric_name = name
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is added since we got rid of _StatsBeatExporter

if self._is_sdkstats and name in _STATSBEAT_METRIC_NAME_MAPPINGS:
final_metric_name = _STATSBEAT_METRIC_NAME_MAPPINGS[name]

envelope = _convert_point_to_envelope(point, final_metric_name, resource, scope)
if name in _AUTOCOLLECTED_INSTRUMENT_NAMES:
envelope = _handle_std_metric_envelope(envelope, name, point.attributes) # type: ignore
if envelope is not None:
Expand All @@ -182,8 +191,11 @@ def _determine_metrics_to_log_analytics(self) -> bool:
:return: False if metrics should not be sent to Log Analytics, True otherwise.
:rtype: bool
"""
# If sdkStats exporter, always send to LA
if self._is_sdkstats:
return True
# Disabling metrics to Log Analytics via env var is currently only specified for AKS Attach scenarios.
if not _utils._is_on_aks() or not _utils._is_attach_enabled() or self._is_stats_exporter():
if not _utils._is_on_aks() or not _utils._is_attach_enabled():
return True
env_var = os.environ.get(_APPLICATIONINSIGHTS_METRICS_TO_LOGANALYTICS_ENABLED)
if not env_var:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

"""
Statsbeat metrics collection module.

This module provides a singleton-based, thread-safe manager for collecting
and reporting statsbeat metrics.
"""

from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat import (
collect_statsbeat_metrics,
shutdown_statsbeat_metrics,
)
from azure.monitor.opentelemetry.exporter.statsbeat._manager import (
StatsbeatConfig,
StatsbeatManager,
)

__all__ = [
'StatsbeatConfig',
'StatsbeatManager',
'collect_statsbeat_metrics',
'shutdown_statsbeat_metrics',
]
Loading