Add initial instrumentation of kafka connect SinkTask #14478
Conversation
Do I understand correctly that this instrumentation only creates a span that is linked to the producer when the message is consumed by the sink? Or does it do something else, like allowing the trace to propagate to where the message is read after the sink has sent it? I don't know anything about Kafka Connect, so sorry if this didn't make sense.
@laurit Great question!! Yes, this instrumentation creates a span that is linked to the producer span(s) when the messages are consumed by the sink. Whether the trace then propagates automatically to the downstream system depends on the specific connector implementation: if the database operation that the connector performs is itself instrumented, automatic trace propagation happens; if not, the trace is not propagated beyond the Kafka Connect span. Examples:
- JDBC Kafka Connector: uses JDBC calls, which are instrumented, so the trace continues into the database spans.
- MongoDB Kafka Connector: uses the MongoDB driver, which is instrumented, so the trace continues.
- Cosmos DB Connector: uses Cosmos DB SDK calls. Since Cosmos DB doesn't have OpenTelemetry instrumentation, the trace stops at the Kafka Connect span, but span links to producers are preserved.

Summary: this instrumentation provides span linking (connecting to producer spans) and sets up the foundation for trace propagation by making the Kafka Connect span the active context. Whether downstream spans are created depends entirely on whether the specific database operations used by each connector are instrumented by OpenTelemetry. Let me know if this answers your question!!
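To make the behavior described in this comment concrete, here is a minimal, purely illustrative sketch (not this PR's code; class and method names are invented): the Kafka Connect span is made current around put(), so any instrumented client call inside the connector becomes a child span, while links point back to the producer span contexts extracted from record headers.

```java
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Scope;
import java.util.List;

final class KafkaConnectSpanSketch {

  // Illustrative only: producerContexts would come from extracting the
  // propagation headers of each consumed SinkRecord.
  static void runPutWithSpan(Runnable actualPut, List<SpanContext> producerContexts) {
    SpanBuilder builder =
        GlobalOpenTelemetry.getTracer("kafka-connect-sketch")
            .spanBuilder("KafkaConnect.put")
            .setSpanKind(SpanKind.INTERNAL);
    producerContexts.forEach(builder::addLink); // links back to the producer spans

    Span span = builder.startSpan();
    try (Scope ignored = span.makeCurrent()) {
      // While the span is current, instrumented calls made by the connector
      // (JDBC, MongoDB driver, ...) are recorded as children of this span.
      actualPut.run();
    } finally {
      span.end();
    }
  }

  private KafkaConnectSpanSketch() {}
}
```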
Generally we don't expect databases to propagate context. Even when the context is propagated to the database, the intention is to tag the database query. For example, context propagation to a SQL database could be used to get the trace id for a slow query, so you could track down what executed that query.
You are misreading this. https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/jdbc/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jdbc/PreparedStatementInstrumentation.java#L58 excludes these methods because they are instrumented by the code referenced at line 57 (commit eb7ea9e).
I'm wondering why you chose to create an internal span; wouldn't a consumer span be more appropriate?
📋 PR Description
Overview
This PR adds OpenTelemetry instrumentation for Apache Kafka Connect SinkTask operations, providing observability into data pipeline processing from Kafka topics to external systems.
What This Adds
- SinkTask.put() method: creates spans for batch processing operations (see the sketch below)
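For readers unfamiliar with the javaagent's mechanics, a simplified sketch of how a SinkTask.put() instrumentation can be wired up with ByteBuddy follows. Names such as SinkTaskInstrumentationSketch and KafkaConnectSingletonsSketch are illustrative (a similar helper is sketched under Classloader Considerations below); the actual classes in this PR may be structured differently:

```java
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.extendsClass;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.Collection;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.connect.sink.SinkRecord;

public class SinkTaskInstrumentationSketch implements TypeInstrumentation {

  @Override
  public ElementMatcher<TypeDescription> typeMatcher() {
    // Match concrete SinkTask implementations loaded by the Connect runtime.
    return extendsClass(named("org.apache.kafka.connect.sink.SinkTask"));
  }

  @Override
  public void transform(TypeTransformer transformer) {
    transformer.applyAdviceToMethod(
        named("put").and(takesArguments(1)), getClass().getName() + "$PutAdvice");
  }

  @SuppressWarnings("unused")
  public static class PutAdvice {

    @Advice.OnMethodEnter(suppress = Throwable.class)
    public static void onEnter(
        @Advice.Argument(0) Collection<SinkRecord> records,
        @Advice.Local("otelContext") Context context,
        @Advice.Local("otelScope") Scope scope) {
      Context parentContext = Java8BytecodeBridge.currentContext();
      if (!KafkaConnectSingletonsSketch.instrumenter().shouldStart(parentContext, records)) {
        return;
      }
      context = KafkaConnectSingletonsSketch.instrumenter().start(parentContext, records);
      scope = context.makeCurrent();
    }

    @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
    public static void onExit(
        @Advice.Argument(0) Collection<SinkRecord> records,
        @Advice.Thrown Throwable throwable,
        @Advice.Local("otelContext") Context context,
        @Advice.Local("otelScope") Scope scope) {
      if (scope == null) {
        return;
      }
      scope.close();
      KafkaConnectSingletonsSketch.instrumenter().end(context, records, null, throwable);
    }
  }
}
```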
Key Features

🔍 Span Creation
- Span name: KafkaConnect.put
- Span kind: INTERNAL
- Thread attributes captured (thread.name, thread.id)

🔗 Trace Context Propagation
- Extracts trace context from Kafka record headers using the configured TextMapPropagator (see the sketch below)
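A minimal sketch of what such a header getter for Connect records might look like (illustrative; the PR's actual SinkRecordHeadersGetter may differ):

```java
import io.opentelemetry.context.propagation.TextMapGetter;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;

/** Reads propagation headers (e.g. traceparent) from a Connect SinkRecord. */
enum SinkRecordHeadersGetterSketch implements TextMapGetter<SinkRecord> {
  INSTANCE;

  @Override
  public Iterable<String> keys(SinkRecord record) {
    List<String> keys = new ArrayList<>();
    for (Header header : record.headers()) {
      keys.add(header.key());
    }
    return keys;
  }

  @Override
  public String get(SinkRecord record, String key) {
    Header header = record.headers().lastWithName(key);
    // Connect header values are typed; propagation headers are expected to be strings.
    return header == null || header.value() == null ? null : header.value().toString();
  }
}
```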
🧪 Testing Approach
Uses a service-side instrumentation testing pattern (similar to the JMX metrics instrumentation), as sketched below:
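As a rough illustration of that pattern (purely a sketch: image names, ports, env vars, and paths are placeholders, not this PR's actual test setup), the test could start Kafka and a Kafka Connect container with the javaagent attached and spans exported over OTLP back to the test process:

```java
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;

class KafkaConnectTestSetupSketch {

  void start() {
    Network network = Network.newNetwork();

    KafkaContainer kafka =
        new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0"))
            .withNetwork(network)
            .withNetworkAliases("kafka");

    // Kafka Connect runs in its own JVM inside this container; the javaagent is
    // attached via JVM options and spans are exported over OTLP to the test host.
    // Env var names depend on the image; these are illustrative.
    GenericContainer<?> connect =
        new GenericContainer<>(DockerImageName.parse("confluentinc/cp-kafka-connect:7.5.0"))
            .withNetwork(network)
            .dependsOn(kafka)
            .withCopyFileToContainer(
                MountableFile.forHostPath("/path/to/opentelemetry-javaagent.jar"),
                "/opentelemetry-javaagent.jar")
            .withEnv("KAFKA_OPTS", "-javaagent:/opentelemetry-javaagent.jar")
            .withEnv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://host.testcontainers.internal:4318")
            .withEnv("CONNECT_BOOTSTRAP_SERVERS", "kafka:9092");

    kafka.start();
    connect.start();
  }
}
```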
Technical Implementation
Instrumentation Details
Context Extraction
- PropagatorBasedSpanLinksExtractor for creating span links
- SinkRecordHeadersGetter for extracting headers from Kafka records

Classloader Considerations
Kafka Connect's PluginClassLoader isolation required the Singletons static reference pattern, sketched below:
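Very roughly, the pattern looks like this (names are illustrative; SinkRecordHeadersGetterSketch is from the earlier sketch): the Instrumenter is created once and held in a static field on a helper class loaded by the agent, so advice injected into connector classes loaded by the PluginClassLoader can still reach it.

```java
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import java.util.Collection;
import org.apache.kafka.connect.sink.SinkRecord;

public final class KafkaConnectSingletonsSketch {

  private static final Instrumenter<Collection<SinkRecord>, Void> INSTRUMENTER =
      Instrumenter.<Collection<SinkRecord>, Void>builder(
              GlobalOpenTelemetry.get(),
              "io.opentelemetry.kafka-connect-2.6",
              records -> "KafkaConnect.put")
          // Link each consumed record's extracted producer context to the put() span.
          .addSpanLinksExtractor(
              (spanLinks, parentContext, records) -> {
                for (SinkRecord record : records) {
                  Context extracted =
                      GlobalOpenTelemetry.getPropagators()
                          .getTextMapPropagator()
                          .extract(parentContext, record, SinkRecordHeadersGetterSketch.INSTANCE);
                  spanLinks.addLink(Span.fromContext(extracted).getSpanContext());
                }
              })
          .buildInstrumenter(SpanKindExtractor.alwaysInternal());

  public static Instrumenter<Collection<SinkRecord>, Void> instrumenter() {
    return INSTRUMENTER;
  }

  private KafkaConnectSingletonsSketch() {}
}
```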
Testing Strategy

Why Not InstrumentationExtension?

Unlike client-side instrumentations, Kafka Connect runs as a separate service in its own JVM. The standard InstrumentationExtension cannot collect spans from containerized services, so an OTLP-based approach is required.

Test Architecture
Test Coverage
Performance Considerations
Only the put() method is wrapped during batch processing, keeping overhead minimal.

Future Enhancements
Possible follow-up work (e.g., SourceTask instrumentation).

Testing Instructions
Related Issues
Thanks in advance and please let me know your thoughts.