
Commit eb76a6f

Refactor BatchLogRecordProcessor
1 parent 3644a1e commit eb76a6f

File tree

2 files changed: +173 -277 lines changed


opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py

Lines changed: 79 additions & 146 deletions
@@ -21,8 +21,7 @@
 import sys
 import threading
 from os import environ, linesep
-from time import time_ns
-from typing import IO, Callable, Deque, List, Optional, Sequence
+from typing import IO, Callable, Deque, Optional, Sequence
 
 from opentelemetry.context import (
     _SUPPRESS_INSTRUMENTATION_KEY,
@@ -55,6 +54,12 @@ class LogExportResult(enum.Enum):
     FAILURE = 1
 
 
+class BatchLogExportStrategy(enum.Enum):
+    EXPORT_ALL = 0
+    EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD = 1
+    EXPORT_AT_LEAST_ONE_BATCH = 2
+
+
 class LogExporter(abc.ABC):
     """Interface for exporting logs.
 
@@ -140,14 +145,6 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: # pylint: disable=n
         return True
 
 
-class _FlushRequest:
-    __slots__ = ["event", "num_log_records"]
-
-    def __init__(self):
-        self.event = threading.Event()
-        self.num_log_records = 0
-
-
 _BSP_RESET_ONCE = Once()
 
 
@@ -166,8 +163,6 @@ class BatchLogRecordProcessor(LogRecordProcessor):
     """
 
     _queue: Deque[LogData]
-    _flush_request: _FlushRequest | None
-    _log_records: List[LogData | None]
 
     def __init__(
         self,
@@ -189,7 +184,7 @@ def __init__(
             max_export_batch_size = (
                 BatchLogRecordProcessor._default_max_export_batch_size()
             )
-
+        # Not used. No way currently to pass timeout to export.
        if export_timeout_millis is None:
            export_timeout_millis = (
                BatchLogRecordProcessor._default_export_timeout_millis()
@@ -198,29 +193,44 @@
         BatchLogRecordProcessor._validate_arguments(
             max_queue_size, schedule_delay_millis, max_export_batch_size
         )
-
         self._exporter = exporter
         self._max_queue_size = max_queue_size
-        self._schedule_delay_millis = schedule_delay_millis
+        self._schedule_delay = schedule_delay_millis / 1e3
         self._max_export_batch_size = max_export_batch_size
+        # Not used. No way currently to pass timeout to export.
         self._export_timeout_millis = export_timeout_millis
+        # Deque is thread safe.
         self._queue = collections.deque([], max_queue_size)
         self._worker_thread = threading.Thread(
             name="OtelBatchLogRecordProcessor",
             target=self.worker,
             daemon=True,
         )
-        self._condition = threading.Condition(threading.Lock())
         self._shutdown = False
-        self._flush_request = None
-        self._log_records = [None] * self._max_export_batch_size
+        self._export_lock = threading.Lock()
+        self._worker_sleep = threading.Event()
         self._worker_thread.start()
         if hasattr(os, "register_at_fork"):
             os.register_at_fork(after_in_child=self._at_fork_reinit)  # pylint: disable=protected-access
         self._pid = os.getpid()
 
+    def _should_export_batch(
+        self, batch_strategy: BatchLogExportStrategy, num_iterations: int
+    ) -> bool:
+        if not self._queue:
+            return False
+        # Always continue to export while queue length exceeds max batch size.
+        if len(self._queue) >= self._max_export_batch_size:
+            return True
+        if batch_strategy == BatchLogExportStrategy.EXPORT_ALL:
+            return True
+        if batch_strategy == BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH:
+            return num_iterations == 0
+        return False
+
     def _at_fork_reinit(self):
-        self._condition = threading.Condition(threading.Lock())
+        self._export_lock = threading.Lock()
+        self._worker_sleep = threading.Event()
         self._queue.clear()
         self._worker_thread = threading.Thread(
             name="OtelBatchLogRecordProcessor",
@@ -231,152 +241,75 @@ def _at_fork_reinit(self):
         self._pid = os.getpid()
 
     def worker(self):
-        timeout = self._schedule_delay_millis / 1e3
-        flush_request: Optional[_FlushRequest] = None
         while not self._shutdown:
-            with self._condition:
-                if self._shutdown:
-                    # shutdown may have been called, avoid further processing
-                    break
-                flush_request = self._get_and_unset_flush_request()
-                if (
-                    len(self._queue) < self._max_export_batch_size
-                    and flush_request is None
-                ):
-                    self._condition.wait(timeout)
-
-                    flush_request = self._get_and_unset_flush_request()
-                    if not self._queue:
-                        timeout = self._schedule_delay_millis / 1e3
-                        self._notify_flush_request_finished(flush_request)
-                        flush_request = None
-                        continue
-                    if self._shutdown:
-                        break
-
-            start_ns = time_ns()
-            self._export(flush_request)
-            end_ns = time_ns()
-            # subtract the duration of this export call to the next timeout
-            timeout = self._schedule_delay_millis / 1e3 - (
-                (end_ns - start_ns) / 1e9
-            )
-
-            self._notify_flush_request_finished(flush_request)
-            flush_request = None
-
-        # there might have been a new flush request while export was running
-        # and before the done flag switched to true
-        with self._condition:
-            shutdown_flush_request = self._get_and_unset_flush_request()
-
-        # flush the remaining logs
-        self._drain_queue()
-        self._notify_flush_request_finished(flush_request)
-        self._notify_flush_request_finished(shutdown_flush_request)
-
-    def _export(self, flush_request: Optional[_FlushRequest] = None):
-        """Exports logs considering the given flush_request.
-
-        If flush_request is not None then logs are exported in batches
-        until the number of exported logs reached or exceeded the num of logs in
-        flush_request, otherwise exports at max max_export_batch_size logs.
-        """
-        if flush_request is None:
-            self._export_batch()
-            return
-
-        num_log_records = flush_request.num_log_records
-        while self._queue:
-            exported = self._export_batch()
-            num_log_records -= exported
-
-            if num_log_records <= 0:
+            # Lots of strategies in the spec for setting next timeout.
+            # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#batching-processor.
+            # Shutdown will interrupt this sleep. Emit will interrupt this sleep only if the queue is bigger then threshold.
+            sleep_interrupted = self._worker_sleep.wait(self._schedule_delay)
+            if self._shutdown:
                 break
-
-    def _export_batch(self) -> int:
-        """Exports at most max_export_batch_size logs and returns the number of
-        exported logs.
-        """
-        idx = 0
-        while idx < self._max_export_batch_size and self._queue:
-            record = self._queue.pop()
-            self._log_records[idx] = record
-            idx += 1
-        token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
-        try:
-            self._exporter.export(self._log_records[:idx])  # type: ignore
-        except Exception:  # pylint: disable=broad-exception-caught
-            _logger.exception("Exception while exporting logs.")
-        detach(token)
-
-        for index in range(idx):
-            self._log_records[index] = None
-        return idx
-
-    def _drain_queue(self):
-        """Export all elements until queue is empty.
-
-        Can only be called from the worker thread context because it invokes
-        `export` that is not thread safe.
-        """
-        while self._queue:
-            self._export_batch()
-
-    def _get_and_unset_flush_request(self) -> Optional[_FlushRequest]:
-        flush_request = self._flush_request
-        self._flush_request = None
-        if flush_request is not None:
-            flush_request.num_log_records = len(self._queue)
-        return flush_request
-
-    @staticmethod
-    def _notify_flush_request_finished(
-        flush_request: Optional[_FlushRequest] = None,
-    ):
-        if flush_request is not None:
-            flush_request.event.set()
-
-    def _get_or_create_flush_request(self) -> _FlushRequest:
-        if self._flush_request is None:
-            self._flush_request = _FlushRequest()
-        return self._flush_request
+            self._export(
+                BatchLogExportStrategy.EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD
+                if sleep_interrupted
+                else BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH
+            )
+            self._worker_sleep.clear()
+        self._export(BatchLogExportStrategy.EXPORT_ALL)
+
+    def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
+        with self._export_lock:
+            iteration = 0
+            # We could see concurrent export calls from worker and force_flush. We call _should_export_batch
+            # once the lock is obtained to see if we still need to make the requested export.
+            while self._should_export_batch(batch_strategy, iteration):
+                iteration += 1
+                token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
+                try:
+                    self._exporter.export(
+                        [
+                            # Oldest records are at the back, so pop from there.
+                            self._queue.pop()
+                            for _ in range(
+                                min(
+                                    self._max_export_batch_size,
+                                    len(self._queue),
+                                )
+                            )
+                        ]
+                    )
+                except Exception:  # pylint: disable=broad-exception-caught
+                    _logger.exception("Exception while exporting logs.")
+                detach(token)
 
     def emit(self, log_data: LogData) -> None:
-        """Adds the `LogData` to queue and notifies the waiting threads
-        when size of queue reaches max_export_batch_size.
-        """
         if self._shutdown:
+            _logger.warning("Shutdown called, ignoring log.")
             return
         if self._pid != os.getpid():
             _BSP_RESET_ONCE.do_once(self._at_fork_reinit)
 
+        if len(self._queue) == self._max_queue_size:
+            _logger.warning("Queue full, dropping log.")
         self._queue.appendleft(log_data)
         if len(self._queue) >= self._max_export_batch_size:
-            with self._condition:
-                self._condition.notify()
+            self._worker_sleep.set()
 
     def shutdown(self):
+        if self._shutdown:
+            return
+        # Prevents emit and force_flush from further calling export.
         self._shutdown = True
-        with self._condition:
-            self._condition.notify_all()
+        # Interrupts sleep in the worker, if it's sleeping.
+        self._worker_sleep.set()
+        # Main worker loop should exit after one final export call with flush all strategy.
         self._worker_thread.join()
         self._exporter.shutdown()
 
     def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
-        if timeout_millis is None:
-            timeout_millis = self._export_timeout_millis
         if self._shutdown:
-            return True
-
-        with self._condition:
-            flush_request = self._get_or_create_flush_request()
-            self._condition.notify_all()
-
-        ret = flush_request.event.wait(timeout_millis / 1e3)
-        if not ret:
-            _logger.warning("Timeout was exceeded in force_flush().")
-        return ret
+            return
+        # Blocking call to export.
+        self._export(BatchLogExportStrategy.EXPORT_ALL)
 
     @staticmethod
     def _default_max_queue_size():
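
The refactor replaces the flush-request and condition-variable machinery with a single export lock plus three BatchLogExportStrategy values. As a reading aid, here is a minimal standalone sketch of that batching decision over a plain deque; the should_export_batch helper, the MAX_BATCH constant, and the toy queue contents are invented for this example and are not part of the SDK.

import collections
import enum


class BatchLogExportStrategy(enum.Enum):
    EXPORT_ALL = 0
    EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD = 1
    EXPORT_AT_LEAST_ONE_BATCH = 2


MAX_BATCH = 4  # stands in for self._max_export_batch_size in the diff above


def should_export_batch(
    queue: collections.deque,
    strategy: BatchLogExportStrategy,
    num_iterations: int,
) -> bool:
    # Mirrors _should_export_batch: an empty queue never exports.
    if not queue:
        return False
    # A full batch is always exported, whatever the strategy.
    if len(queue) >= MAX_BATCH:
        return True
    # EXPORT_ALL keeps draining partial batches until the queue is empty
    # (the shutdown / force_flush path).
    if strategy is BatchLogExportStrategy.EXPORT_ALL:
        return True
    # EXPORT_AT_LEAST_ONE_BATCH sends exactly one, possibly partial, batch
    # (the path taken when the schedule-delay sleep times out).
    if strategy is BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH:
        return num_iterations == 0
    # EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD stops once fewer than a full batch
    # remains (the path taken when emit() wakes the worker early).
    return False


# Six queued records: the emit-triggered strategy exports the first full batch...
assert should_export_batch(
    collections.deque(range(6)),
    BatchLogExportStrategy.EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD,
    0,
)
# ...but leaves a partial batch of two behind.
assert not should_export_batch(
    collections.deque(range(2)),
    BatchLogExportStrategy.EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD,
    1,
)
# The timer path flushes a partial batch once; shutdown drains everything.
assert should_export_batch(
    collections.deque(range(2)), BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH, 0
)
assert should_export_batch(
    collections.deque(range(2)), BatchLogExportStrategy.EXPORT_ALL, 5
)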

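For context on how these code paths are reached, a minimal usage sketch follows, assuming the opentelemetry-sdk package at roughly the version this commit targets (the _logs API is still private). The 500 ms schedule delay and the logger name are arbitrary example values; the behavior comments reflect the diff above, not separate documentation.

import logging

from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import (
    BatchLogRecordProcessor,
    ConsoleLogExporter,
)

provider = LoggerProvider()
# emit() appends each record to the processor's deque and only sets the
# worker's wake-up event once the queue reaches max_export_batch_size.
provider.add_log_record_processor(
    BatchLogRecordProcessor(ConsoleLogExporter(), schedule_delay_millis=500)
)

logging.getLogger().addHandler(LoggingHandler(logger_provider=provider))
logging.getLogger(__name__).warning("hello, batch processor")

# After this change, force_flush() takes the export lock and drains the queue
# with EXPORT_ALL instead of signalling the worker and waiting on an event.
provider.force_flush()
# shutdown() sets the shutdown flag, wakes the worker, joins it (the worker
# performs one final EXPORT_ALL pass), then shuts the exporter down.
provider.shutdown()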