@@ -248,91 +248,104 @@ def extract_context_from_sqs_or_sns_event_or_context(
248248 except Exception :
249249 logger .debug ("Failed extracting context as EventBridge to SQS." )
250250
251- try :
252- first_record = event .get ("Records" )[0 ]
253- source_arn = first_record .get ("eventSourceARN" , "" )
254-
255- # logic to deal with SNS => SQS event
256- if "body" in first_record :
257- body_str = first_record .get ("body" )
258- try :
259- body = json .loads (body_str )
260- if body .get ("Type" , "" ) == "Notification" and "TopicArn" in body :
261- logger .debug ("Found SNS message inside SQS event" )
262- first_record = get_first_record (create_sns_event (body ))
263- except Exception :
264- pass
265-
266- msg_attributes = first_record .get ("messageAttributes" )
267- if msg_attributes is None :
268- sns_record = first_record .get ("Sns" ) or {}
269- # SNS->SQS event would extract SNS arn without this check
270- if event_source .equals (EventTypes .SNS ):
271- source_arn = sns_record .get ("TopicArn" , "" )
272- msg_attributes = sns_record .get ("MessageAttributes" ) or {}
273- dd_payload = msg_attributes .get ("_datadog" )
274- if dd_payload :
275- # SQS uses dataType and binaryValue/stringValue
276- # SNS uses Type and Value
277- dd_json_data = None
278- dd_json_data_type = dd_payload .get ("Type" ) or dd_payload .get ("dataType" )
279- if dd_json_data_type == "Binary" :
280- import base64
251+ context = None
252+ records = (
253+ event .get ("Records" , [])
254+ if config .data_streams_enabled
255+ else [event .get ("Records" )[0 ]]
256+ )
257+ is_first_record = True
258+ for record in records :
259+ try :
260+ source_arn = record .get ("eventSourceARN" , "" )
261+ dsm_data = None
262+
263+ # logic to deal with SNS => SQS event
264+ if "body" in record :
265+ body_str = record .get ("body" )
266+ try :
267+ body = json .loads (body_str )
268+ if body .get ("Type" , "" ) == "Notification" and "TopicArn" in body :
269+ logger .debug ("Found SNS message inside SQS event" )
270+ record = get_first_record (create_sns_event (body ))
271+ except Exception :
272+ pass
273+
274+ msg_attributes = record .get ("messageAttributes" )
275+ if msg_attributes is None :
276+ sns_record = record .get ("Sns" ) or {}
277+ # SNS->SQS event would extract SNS arn without this check
278+ if event_source .equals (EventTypes .SNS ):
279+ source_arn = sns_record .get ("TopicArn" , "" )
280+ msg_attributes = sns_record .get ("MessageAttributes" ) or {}
281+ dd_payload = msg_attributes .get ("_datadog" )
282+ if dd_payload :
283+ # SQS uses dataType and binaryValue/stringValue
284+ # SNS uses Type and Value
285+ dd_json_data = None
286+ dd_json_data_type = dd_payload .get ("Type" ) or dd_payload .get ("dataType" )
287+ if dd_json_data_type == "Binary" :
288+ import base64
289+
290+ dd_json_data = dd_payload .get ("binaryValue" ) or dd_payload .get (
291+ "Value"
292+ )
293+ if dd_json_data :
294+ dd_json_data = base64 .b64decode (dd_json_data )
295+ elif dd_json_data_type == "String" :
296+ dd_json_data = dd_payload .get ("stringValue" ) or dd_payload .get (
297+ "Value"
298+ )
299+ else :
300+ logger .debug (
301+ "Datadog Lambda Python only supports extracting trace"
302+ "context from String or Binary SQS/SNS message attributes"
303+ )
281304
282- dd_json_data = dd_payload .get ("binaryValue" ) or dd_payload .get ("Value" )
283305 if dd_json_data :
284- dd_json_data = base64 .b64decode (dd_json_data )
285- elif dd_json_data_type == "String" :
286- dd_json_data = dd_payload .get ("stringValue" ) or dd_payload .get ("Value" )
306+ dd_data = json .loads (dd_json_data )
307+
308+ if is_step_function_event (dd_data ):
309+ try :
310+ return extract_context_from_step_functions (dd_data , None )
311+ except Exception :
312+ logger .debug (
313+ "Failed to extract Step Functions context from SQS/SNS event."
314+ )
315+ if is_first_record :
316+ context = propagator .extract (dd_data )
317+ dsm_data = dd_data
287318 else :
288- logger .debug (
289- "Datadog Lambda Python only supports extracting trace"
290- "context from String or Binary SQS/SNS message attributes"
291- )
319+ # Handle case where trace context is injected into attributes.AWSTraceHeader
320+ # example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
321+ attrs = event .get ("Records" )[0 ].get ("attributes" )
322+ if attrs :
323+ x_ray_header = attrs .get ("AWSTraceHeader" )
324+ if x_ray_header :
325+ x_ray_context = parse_xray_header (x_ray_header )
326+ trace_id_parts = x_ray_context .get ("trace_id" , "" ).split ("-" )
327+ if len (trace_id_parts ) > 2 and trace_id_parts [2 ].startswith (
328+ DD_TRACE_JAVA_TRACE_ID_PADDING
329+ ):
330+ # If it starts with eight 0's padding,
331+ # then this AWSTraceHeader contains Datadog injected trace context
332+ logger .debug (
333+ "Found dd-trace injected trace context from AWSTraceHeader"
334+ )
335+ if is_first_record :
336+ context = Context (
337+ trace_id = int (trace_id_parts [2 ][8 :], 16 ),
338+ span_id = int (x_ray_context ["parent_id" ], 16 ),
339+ sampling_priority = float (x_ray_context ["sampled" ]),
340+ )
341+ except Exception as e :
342+ logger .debug ("The trace extractor returned with error %s" , e )
292343
293- if dd_json_data :
294- dd_data = json .loads (dd_json_data )
344+ # Set DSM checkpoint once per record
345+ _dsm_set_checkpoint (dsm_data , event_type , source_arn )
346+ is_first_record = False
295347
296- if is_step_function_event (dd_data ):
297- try :
298- return extract_context_from_step_functions (dd_data , None )
299- except Exception :
300- logger .debug (
301- "Failed to extract Step Functions context from SQS/SNS event."
302- )
303- context = propagator .extract (dd_data )
304- _dsm_set_checkpoint (dd_data , event_type , source_arn )
305- return context
306- else :
307- # Handle case where trace context is injected into attributes.AWSTraceHeader
308- # example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
309- attrs = event .get ("Records" )[0 ].get ("attributes" )
310- if attrs :
311- x_ray_header = attrs .get ("AWSTraceHeader" )
312- if x_ray_header :
313- x_ray_context = parse_xray_header (x_ray_header )
314- trace_id_parts = x_ray_context .get ("trace_id" , "" ).split ("-" )
315- if len (trace_id_parts ) > 2 and trace_id_parts [2 ].startswith (
316- DD_TRACE_JAVA_TRACE_ID_PADDING
317- ):
318- # If it starts with eight 0's padding,
319- # then this AWSTraceHeader contains Datadog injected trace context
320- logger .debug (
321- "Found dd-trace injected trace context from AWSTraceHeader"
322- )
323- return Context (
324- trace_id = int (trace_id_parts [2 ][8 :], 16 ),
325- span_id = int (x_ray_context ["parent_id" ], 16 ),
326- sampling_priority = float (x_ray_context ["sampled" ]),
327- )
328- # Still want to set a DSM checkpoint even if DSM context not propagated
329- _dsm_set_checkpoint (None , event_type , source_arn )
330- return extract_context_from_lambda_context (lambda_context )
331- except Exception as e :
332- logger .debug ("The trace extractor returned with error %s" , e )
333- # Still want to set a DSM checkpoint even if DSM context not propagated
334- _dsm_set_checkpoint (None , event_type , source_arn )
335- return extract_context_from_lambda_context (lambda_context )
348+ return context if context else extract_context_from_lambda_context (lambda_context )
336349
337350
338351def _extract_context_from_eventbridge_sqs_event (event ):
@@ -393,30 +406,44 @@ def extract_context_from_kinesis_event(event, lambda_context):
393406 Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
394407 """
395408 source_arn = ""
396- try :
397- record = get_first_record (event )
398- source_arn = record .get ("eventSourceARN" , "" )
399- kinesis = record .get ("kinesis" )
400- if not kinesis :
401- return extract_context_from_lambda_context (lambda_context )
402- data = kinesis .get ("data" )
403- if data :
404- import base64
405-
406- b64_bytes = data .encode ("ascii" )
407- str_bytes = base64 .b64decode (b64_bytes )
408- data_str = str_bytes .decode ("ascii" )
409- data_obj = json .loads (data_str )
410- dd_ctx = data_obj .get ("_datadog" )
411- if dd_ctx :
412- context = propagator .extract (dd_ctx )
413- _dsm_set_checkpoint (dd_ctx , "kinesis" , source_arn )
414- return context
415- except Exception as e :
416- logger .debug ("The trace extractor returned with error %s" , e )
417- # Still want to set a DSM checkpoint even if DSM context not propagated
418- _dsm_set_checkpoint (None , "kinesis" , source_arn )
419- return extract_context_from_lambda_context (lambda_context )
409+ records = (
410+ [get_first_record (event )]
411+ if not config .data_streams_enabled
412+ else event .get ("Records" )
413+ )
414+ context = None
415+ is_first_record = True
416+ for record in records :
417+ dsm_data = None
418+ try :
419+ source_arn = record .get ("eventSourceARN" , "" )
420+ kinesis = record .get ("kinesis" )
421+ if not kinesis :
422+ context = (
423+ extract_context_from_lambda_context (lambda_context )
424+ if is_first_record
425+ else context
426+ )
427+ is_first_record = False
428+ continue
429+ data = kinesis .get ("data" )
430+ if data :
431+ import base64
432+
433+ b64_bytes = data .encode ("ascii" )
434+ str_bytes = base64 .b64decode (b64_bytes )
435+ data_str = str_bytes .decode ("ascii" )
436+ data_obj = json .loads (data_str )
437+ dd_ctx = data_obj .get ("_datadog" )
438+ if dd_ctx :
439+ if is_first_record :
440+ context = propagator .extract (dd_ctx )
441+ dsm_data = dd_ctx
442+ except Exception as e :
443+ logger .debug ("The trace extractor returned with error %s" , e )
444+ _dsm_set_checkpoint (dsm_data , "kinesis" , source_arn )
445+ is_first_record = False
446+ return context if context else extract_context_from_lambda_context (lambda_context )
420447
421448
422449def _deterministic_sha256_hash (s : str , part : str ) -> int :
0 commit comments