Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,6 @@ public static Writer createWriter(
"CI Visibility functionality is limited. Please upgrade to Agent v6.40+ or v7.40+ or enable Agentless mode.");
}
}
if (DD_AGENT_WRITER_TYPE.equals(configuredType) && (config.isLlmObsEnabled())) {
featuresDiscovery.discoverIfOutdated();
if (featuresDiscovery.supportsEvpProxy() || config.isLlmObsAgentlessEnabled()) {
configuredType = DD_INTAKE_WRITER_TYPE;
} else {
log.info("LLM Observability functionality is limited.");
// TODO: add supported agent version to this log line for llm obs
}
}

RemoteWriter remoteWriter;
if (DD_INTAKE_WRITER_TYPE.equals(configuredType)) {
Expand Down Expand Up @@ -190,8 +181,21 @@ private static RemoteApi createDDIntakeRemoteApi(
TrackType trackType) {
featuresDiscovery.discoverIfOutdated();
boolean evpProxySupported = featuresDiscovery.supportsEvpProxy();

boolean useLlmObsAgentless = config.isLlmObsAgentlessEnabled() || !evpProxySupported;
if (useLlmObsAgentless && !config.isLlmObsAgentlessEnabled()) {
boolean agentRunning = null != featuresDiscovery.getTraceEndpoint();
log.info(
"LLM Observability configured to use agent proxy, but is not compatible or agent is not running (agentRunning={}, compatible={})",
agentRunning,
evpProxySupported);
log.info(
"LLM Observability will use agentless data submission instead. Compatible agent versions are >=7.55.0 (found version={}",
featuresDiscovery.getVersion());
}

boolean useProxyApi =
(evpProxySupported && TrackType.LLMOBS == trackType && !config.isLlmObsAgentlessEnabled())
(TrackType.LLMOBS == trackType && !useLlmObsAgentless)
|| (evpProxySupported
&& (TrackType.CITESTCOV == trackType || TrackType.CITESTCYCLE == trackType)
&& !config.isCiVisibilityAgentlessEnabled());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import static datadog.trace.api.config.TracerConfig.PRIORITIZATION_TYPE
import datadog.communication.ddagent.DDAgentFeaturesDiscovery
import datadog.communication.ddagent.SharedCommunicationObjects
import datadog.trace.api.Config
import datadog.trace.api.intake.TrackType
import datadog.trace.common.sampling.Sampler
import datadog.trace.common.writer.ddagent.DDAgentApi
import datadog.trace.common.writer.ddagent.Prioritization
Expand Down Expand Up @@ -97,6 +98,73 @@ class WriterFactoryTest extends DDSpecification {
"not-found" | false | false | false | DDAgentWriter | [DDAgentApi] | false
}

def "test writer creation for #configuredType when agentHasEvpProxy=#hasEvpProxy llmObsAgentless=#isLlmObsAgentlessEnabled for LLM Observability"() {
setup:
def config = Mock(Config)
config.apiKey >> "my-api-key"
config.agentUrl >> "http://my-agent.url"
config.getEnumValue(PRIORITIZATION_TYPE, _, _) >> Prioritization.FAST_LANE
config.tracerMetricsEnabled >> true
config.isLlmObsEnabled() >> true

// Mock agent info response
def response
if (agentRunning) {
response = buildHttpResponse(hasEvpProxy, true, HttpUrl.parse(config.agentUrl + "/info"))
} else {
response = buildHttpResponseNotOk(HttpUrl.parse(config.agentUrl + "/info"))
}

// Mock HTTP client that simulates delayed response for async feature discovery
def mockCall = Mock(Call)
def mockHttpClient = Mock(OkHttpClient)
mockCall.execute() >> {
// Add a delay
sleep(400)
return response
}
mockHttpClient.newCall(_ as Request) >> mockCall

// Create SharedCommunicationObjects with mocked HTTP client
def sharedComm = new SharedCommunicationObjects()
sharedComm.okHttpClient = mockHttpClient
sharedComm.agentUrl = HttpUrl.parse(config.agentUrl)
sharedComm.createRemaining(config)

def sampler = Mock(Sampler)

when:
config.llmObsAgentlessEnabled >> isLlmObsAgentlessEnabled

def writer = WriterFactory.createWriter(config, sharedComm, sampler, null, HealthMetrics.NO_OP, configuredType)
def llmObsApiClasses = ((RemoteWriter) writer).apis
.stream()
.filter(api -> {
try {
def trackTypeField = api.class.getDeclaredField("trackType")
trackTypeField.setAccessible(true)
return trackTypeField.get(api) == TrackType.LLMOBS
} catch (Exception e) {
return false
}
})
.map(Object::getClass)
.collect(Collectors.toList())

then:
writer.class == expectedWriterClass
llmObsApiClasses == expectedLlmObsApiClasses

where:
configuredType | agentRunning | hasEvpProxy | isLlmObsAgentlessEnabled |expectedWriterClass | expectedLlmObsApiClasses
"DDIntakeWriter" | true | true | false | DDIntakeWriter | [DDEvpProxyApi]
"DDIntakeWriter" | true | false | false | DDIntakeWriter | [DDIntakeApi]
"DDIntakeWriter" | false | false | false | DDIntakeWriter | [DDIntakeApi]
"DDIntakeWriter" | true | true | true | DDIntakeWriter | [DDIntakeApi]
"DDIntakeWriter" | true | false | true | DDIntakeWriter | [DDIntakeApi]
"DDIntakeWriter" | false | false | true | DDIntakeWriter | [DDIntakeApi]
}

Response buildHttpResponse(boolean hasEvpProxy, boolean evpProxySupportsCompression, HttpUrl agentUrl) {
def endpoints = []
if (hasEvpProxy && evpProxySupportsCompression) {
Expand All @@ -120,4 +188,13 @@ class WriterFactoryTest extends DDSpecification {
.body(ResponseBody.create(MediaType.parse("application/json"), new JsonBuilder(response).toString()))
return builder.build()
}

Response buildHttpResponseNotOk(HttpUrl agentUrl) {
def builder = new Response.Builder()
.code(500)
.message("ERROR")
.protocol(Protocol.HTTP_1_1)
.request(new Request.Builder().url(agentUrl.resolve("/info")).build())
return builder.build()
}
}
Loading