vasantteja

📋 PR Description

Overview

This PR adds OpenTelemetry instrumentation for Apache Kafka Connect SinkTask operations, providing observability into data pipeline processing from Kafka topics to external systems.

What This Adds

  • Instrumentation for the SinkTask.put() method: Creates spans for batch processing operations
  • Trace context propagation: Links producer spans to sink processing spans using Kafka message headers
  • Comprehensive integration tests: Docker-based tests with MongoDB and PostgreSQL sinks
  • Support for Kafka Connect 2.6+: Compatible with modern Kafka Connect deployments

Key Features

🔍 Span Creation

  • Span name: KafkaConnect.put
  • Span kind: INTERNAL
  • Attributes: Thread information (thread.name, thread.id)
  • Links: Creates span links to producer spans extracted from Kafka message headers (see the sketch after this list)
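
As a rough sketch, this maps onto the upstream Instrumenter API as follows. KafkaConnectTaskRequest and its getRecords() accessor are hypothetical stand-ins for however the PR wraps the put() batch, not the PR's exact code:

// Sketch only. Instrumenter, InstrumenterBuilder, SpanKindExtractor,
// SpanLinksExtractor and PropagatorBasedSpanLinksExtractor live in
// io.opentelemetry.instrumentation.api.instrumenter
InstrumenterBuilder<KafkaConnectTaskRequest, Void> builder =
    Instrumenter.builder(
        GlobalOpenTelemetry.get(),
        "io.opentelemetry.kafka-connect-2.6",
        request -> "KafkaConnect.put");

// Link the batch back to its producer span via the configured propagator
SpanLinksExtractor<SinkRecord> recordLinks =
    new PropagatorBasedSpanLinksExtractor<>(
        GlobalOpenTelemetry.getPropagators().getTextMapPropagator(),
        SinkRecordHeadersGetter.INSTANCE); // sketched under "Context Extraction"

builder.addSpanLinksExtractor(
    (spanLinks, parentContext, request) -> {
      // per the performance notes below, only the first record is consulted
      Iterator<SinkRecord> it = request.getRecords().iterator();
      if (it.hasNext()) {
        recordLinks.extract(spanLinks, parentContext, it.next());
      }
    });

Instrumenter<KafkaConnectTaskRequest, Void> instrumenter =
    builder.buildInstrumenter(SpanKindExtractor.alwaysInternal());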

🔗 Trace Context Propagation

  • Extracts trace context from Kafka message headers using TextMapPropagator
  • Links sink processing spans to original producer spans for end-to-end tracing
  • Handles batch processing scenarios with multiple records

🧪 Testing Approach

Uses the service-side instrumentation testing pattern (similar to the JMX metrics instrumentation):

  • Docker containers for Kafka Connect, Kafka, and sink databases (see the wiring sketch after this list)
  • OTLP backend to collect spans from containerized services
  • Integration tests with real MongoDB and PostgreSQL connectors
  • Robust container lifecycle management with proper cleanup
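
A hedged sketch of that wiring with Testcontainers (the image tags, the OTLP backend image, and the agent path are placeholders; the actual test setup may differ):

// Sketch only: containers share a network so the agent inside the Connect
// container can reach the OTLP backend by alias
Network network = Network.newNetwork();

KafkaContainer kafka =
    new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
        .withNetwork(network)
        .withNetworkAliases("kafka");

// any OTLP-capable collector/backend image works here
GenericContainer<?> backend =
    new GenericContainer<>(DockerImageName.parse("<otlp-backend-image>"))
        .withNetwork(network)
        .withNetworkAliases("backend")
        .withExposedPorts(8080);

// Kafka Connect with the javaagent attached; the OTEL_* variables are the
// standard agent configuration knobs
GenericContainer<?> connect =
    new GenericContainer<>(DockerImageName.parse("confluentinc/cp-kafka-connect:7.4.0"))
        .withNetwork(network)
        .withCopyFileToContainer(
            MountableFile.forHostPath("/path/to/opentelemetry-javaagent.jar"),
            "/opentelemetry-javaagent.jar")
        .withEnv("KAFKA_OPTS", "-javaagent:/opentelemetry-javaagent.jar")
        .withEnv("OTEL_TRACES_EXPORTER", "otlp")
        .withEnv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://backend:4318");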

Technical Implementation

Instrumentation Details

// Instruments all SinkTask implementations
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
  return hasSuperType(named("org.apache.kafka.connect.sink.SinkTask"));
}

// Instruments the put() method
@Override
public void transform(TypeTransformer transformer) {
  transformer.applyAdviceToMethod(
    isPublic().and(named("put")).and(takesArgument(0, Collection.class)),
    SinkTaskInstrumentation.class.getName() + "$SinkTaskPutAdvice");
}

Context Extraction

  • Uses PropagatorBasedSpanLinksExtractor for creating span links
  • Implements SinkRecordHeadersGetter for extracting headers from Kafka records (sketched below)
  • Handles classloader isolation challenges in Kafka Connect plugin architecture
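
A minimal sketch of what SinkRecordHeadersGetter could look like (the enum shape and the byte[]-to-String conversion are assumptions; Connect headers can carry non-string values):

// TextMapGetter is io.opentelemetry.context.propagation.TextMapGetter;
// Header and SinkRecord come from the Kafka Connect API
enum SinkRecordHeadersGetter implements TextMapGetter<SinkRecord> {
  INSTANCE;

  @Override
  public Iterable<String> keys(SinkRecord record) {
    List<String> keys = new ArrayList<>();
    for (Header header : record.headers()) {
      keys.add(header.key());
    }
    return keys;
  }

  @Override
  public String get(@Nullable SinkRecord record, String key) {
    if (record == null) {
      return null;
    }
    Header header = record.headers().lastWithName(key);
    if (header == null || header.value() == null) {
      return null;
    }
    Object value = header.value();
    return value instanceof byte[]
        ? new String((byte[]) value, StandardCharsets.UTF_8)
        : value.toString();
  }
}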

Classloader Considerations

Kafka Connect's PluginClassLoader isolation means injected advice code cannot safely hold direct references to helper classes, so shared helpers are reached through a static singletons class:

// Access TextMapGetter through singletons to avoid classloader issues
Context extractedContext = KafkaConnectSingletons.propagator()
    .extract(parentContext, firstRecord, KafkaConnectSingletons.sinkRecordHeaderGetter());
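
For completeness, a sketch of what the singletons holder might contain (accessor names mirror the snippet above; the real class likely also exposes the Instrumenter):

// Static holder so advice code can reach shared helpers regardless of
// which plugin classloader loaded the SinkTask implementation
public final class KafkaConnectSingletons {

  private static final TextMapPropagator PROPAGATOR =
      GlobalOpenTelemetry.getPropagators().getTextMapPropagator();

  private static final TextMapGetter<SinkRecord> SINK_RECORD_HEADER_GETTER =
      SinkRecordHeadersGetter.INSTANCE; // sketched above

  public static TextMapPropagator propagator() {
    return PROPAGATOR;
  }

  public static TextMapGetter<SinkRecord> sinkRecordHeaderGetter() {
    return SINK_RECORD_HEADER_GETTER;
  }

  private KafkaConnectSingletons() {}
}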

Testing Strategy

Why Not InstrumentationExtension?

Unlike client-side instrumentations, Kafka Connect runs as a separate service in its own JVM. The standard InstrumentationExtension cannot collect spans from containerized services, requiring an OTLP-based approach.

Test Architecture

┌─────────────────┐    ┌───────────────┐    ┌─────────────────┐
│   Test JVM      │    │ Kafka Connect │    │  OTLP Backend   │
│                 │    │   Container   │    │    Container    │
│ • Produces msgs │───▶│ • Processes   │───▶│ • Collects      │
│ • Verifies spans│    │ • Instruments │    │   spans         │
└─────────────────┘    └───────────────┘    └─────────────────┘

Test Coverage

  • MongoDB Sink: Tests with MongoDB connector and document insertion
  • PostgreSQL Sink: Tests with JDBC connector and table insertion
  • Span verification: Validates span names, attributes, and trace linking
  • Container lifecycle: Proper setup, execution, and cleanup
  • ARM64 compatibility: Handles Docker architecture emulation with increased timeouts

Performance Considerations

  • Minimal overhead: Only instruments the put() method during batch processing
  • Efficient context extraction: Extracts context only from the first record in the batch
  • Proper resource cleanup: All containers and connections are properly closed

Future Enhancements

  • Support for source tasks (SourceTask instrumentation)

Testing Instructions

# Run all Kafka Connect tests
./gradlew :instrumentation:kafka:kafka-connect-2.6:testing:test

# Run specific tests
./gradlew :instrumentation:kafka:kafka-connect-2.6:testing:test --tests="*MongoKafkaConnectSinkTaskTest*"
./gradlew :instrumentation:kafka:kafka-connect-2.6:testing:test --tests="*PostgresKafkaConnectSinkTaskTest*"

# Build the instrumentation
./gradlew :instrumentation:kafka:kafka-connect-2.6:javaagent:build

Related Issues

  1. Multiple Trace-ids are generated for Kafka sink connector  #12322
  2. Traces are not propagated from kafka connect to mongoDB  #12261

Thanks in advance and please let me know your thoughts.

@vasantteja vasantteja requested a review from a team as a code owner August 20, 2025 21:49
@laurit laurit left a comment

Do I understand correctly that this instrumentation only creates a span that is linked to the producer when the message is consumed by the sink? Or does it do something else, like allowing the trace to propagate to where the message is later read from the destination the sink wrote it to? I don't know anything about Kafka Connect, so sorry if this didn't make sense.

vasantteja commented Aug 28, 2025

Do I understand correctly that this instrumentation only creates a span that is linked to the producer when the message is consumed by the sink? Or does it do something else, like allowing the trace to propagate to where the message is later read from the destination the sink wrote it to? I don't know anything about Kafka Connect, so sorry if this didn't make sense.

@laurit Great question!! Yes, this instrumentation creates a span that is linked to the producer when the message is consumed by the sink. However, it depends on the specific connector implementation when it comes to automatic trace propagation to downstream databases. If we have instrumentation for the database operation that the connector uses, automatic trace propagation happens; if not, the trace is not propagated beyond the Kafka Connect span.

Examples:

JDBC Kafka Connector: Uses PreparedStatement.executeBatch(). Since PreparedStatement.executeBatch() is explicitly excluded from JDBC instrumentation, we don't see a parent-child relationship between the Kafka Connect span and the database operation.

MongoDB Kafka Connector: Uses collection.bulkWrite(). We don't have instrumentation for the bulkWrite() function (MongoDB instrumentation only covers wire protocol commands like insert, update, delete), hence we will not see parent-child relationships between the Kafka Connect span and the resulting MongoDB spans.

Cosmos DB Connector: Uses Cosmos DB SDK calls. Since Cosmos DB doesn't have OpenTelemetry instrumentation, the trace stops at the Kafka Connect span, but span links to producers are preserved.

Summary: This instrumentation provides span linking (connecting to producer spans) and sets up the foundation for trace propagation by making the Kafka Connect span the active context. Whether downstream spans are created depends entirely on whether the specific database operations used by each connector are instrumented by OpenTelemetry.

Let me know if this answers your question!!


laurit commented Sep 2, 2025

@laurit Great question!! Yes, this instrumentation creates a span that is linked to the producer when the message is consumed by the sink. However, it depends on the specific connector implementation when it comes to automatic trace propagation to downstream databases. If we have instrumentation for the database operation that the connector uses, automatic trace propagation happens; if not, the trace is not propagated beyond the Kafka Connect span.

Generally we don't expect databases to propagate context. Even when the context is propagated to the database, the intention is to tag the database query. For example, context propagation to an SQL database could be used to get the trace id for a slow query, so you could track down what executed that query.

Examples:

JDBC Kafka Connector: Uses PreparedStatement.executeBatch(). Since PreparedStatement.executeBatch() is explicitly excluded from JDBC instrumentation, we don't see a parent-child relationship between the Kafka Connect span and the database operation.

You are misreading this. https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/jdbc/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jdbc/PreparedStatementInstrumentation.java#L58 excludes these methods because they are instrumented by

Summary: This instrumentation provides span linking (connecting to producer spans) and sets up the foundation for trace propagation by making the Kafka Connect span the active context. Whether downstream spans are created depends entirely on whether the specific database operations used by each connector are instrumented by OpenTelemetry.

I'm wondering why you chose to create an internal span; wouldn't a consumer span be more appropriate?
