Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,24 @@ public class DDAgentFeaturesDiscovery implements DroppingPolicy {
private final String[] evpProxyEndpoints = {V4_EVP_PROXY_ENDPOINT, V2_EVP_PROXY_ENDPOINT};
private final String[] telemetryProxyEndpoints = {TELEMETRY_PROXY_ENDPOINT};

private volatile String traceEndpoint;
private volatile String metricsEndpoint;
private volatile String dataStreamsEndpoint;
private volatile boolean supportsLongRunning;
private volatile boolean supportsDropping;
private volatile String state;
private volatile String configEndpoint;
private volatile String debuggerEndpoint;
private volatile String debuggerDiagnosticsEndpoint;
private volatile String evpProxyEndpoint;
private volatile String version;
private volatile String telemetryProxyEndpoint;
private volatile Set<String> peerTags = emptySet();

private long lastTimeDiscovered;
private static class State {
String traceEndpoint;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

didn't put those final to avoid having a too heavy refactoring to do on this sensible part

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this stuff should basically be an immutable record...

String metricsEndpoint;
String dataStreamsEndpoint;
boolean supportsLongRunning;
boolean supportsDropping;
String state;
String configEndpoint;
String debuggerEndpoint;
String debuggerDiagnosticsEndpoint;
String evpProxyEndpoint;
String version;
String telemetryProxyEndpoint;
Set<String> peerTags = emptySet();
long lastTimeDiscovered;
}

private volatile State discoveryState;

public DDAgentFeaturesDiscovery(
OkHttpClient client,
Expand All @@ -111,23 +114,7 @@ public DDAgentFeaturesDiscovery(
? new String[] {V5_ENDPOINT, V4_ENDPOINT, V3_ENDPOINT}
: new String[] {V4_ENDPOINT, V3_ENDPOINT};
this.discoveryTimer = monitoring.newTimer("trace.agent.discovery.time");
}

private void reset() {
traceEndpoint = null;
metricsEndpoint = null;
supportsDropping = false;
supportsLongRunning = false;
state = null;
configEndpoint = null;
debuggerEndpoint = null;
debuggerDiagnosticsEndpoint = null;
dataStreamsEndpoint = null;
evpProxyEndpoint = null;
version = null;
lastTimeDiscovered = 0;
telemetryProxyEndpoint = null;
peerTags = emptySet();
this.discoveryState = new State();
}

/** Run feature discovery, unconditionally. */
Expand All @@ -146,15 +133,17 @@ protected long getFeaturesDiscoveryMinDelayMillis() {

private synchronized void discoverIfOutdated(final long maxElapsedMs) {
final long now = System.currentTimeMillis();
final long elapsed = now - lastTimeDiscovered;
final long elapsed = now - discoveryState.lastTimeDiscovered;
if (elapsed > maxElapsedMs) {
doDiscovery();
lastTimeDiscovered = now;
final State newState = new State();
doDiscovery(newState);
newState.lastTimeDiscovered = now;
// swap atomically states
discoveryState = newState;
}
}

private void doDiscovery() {
reset();
private void doDiscovery(State newState) {
// 1. try to fetch info about the agent, if the endpoint is there
// 2. try to parse the response, if it can be parsed, finish
// 3. fallback if the endpoint couldn't be found or the response couldn't be parsed
Expand All @@ -169,44 +158,44 @@ private void doDiscovery() {
try (Response response = client.newCall(requestBuilder.build()).execute()) {
if (response.isSuccessful()) {
processInfoResponseHeaders(response);
fallback = !processInfoResponse(response.body().string());
fallback = !processInfoResponse(newState, response.body().string());
}
} catch (Throwable error) {
errorQueryingEndpoint("info", error);
}
if (fallback) {
supportsDropping = false;
supportsLongRunning = false;
newState.supportsDropping = false;
newState.supportsLongRunning = false;
log.debug("Falling back to probing, client dropping will be disabled");
// disable metrics unless the info endpoint is present, which prevents
// sending metrics to 7.26.0, which has a bug in reporting metric origin
metricsEndpoint = null;
newState.metricsEndpoint = null;
}

// don't want to rewire the traces pipeline
if (null == traceEndpoint) {
traceEndpoint = probeTracesEndpoint(traceEndpoints);
} else if (state == null || state.isEmpty()) {
if (null == newState.traceEndpoint) {
newState.traceEndpoint = probeTracesEndpoint(newState, traceEndpoints);
} else if (newState.state == null || newState.state.isEmpty()) {
// Still need to probe so that state is correctly assigned
probeTracesEndpoint(new String[] {traceEndpoint});
probeTracesEndpoint(newState, new String[] {newState.traceEndpoint});
}
}

if (log.isDebugEnabled()) {
log.debug(
"discovered traceEndpoint={}, metricsEndpoint={}, supportsDropping={}, supportsLongRunning={}, dataStreamsEndpoint={}, configEndpoint={}, evpProxyEndpoint={}, telemetryProxyEndpoint={}",
traceEndpoint,
metricsEndpoint,
supportsDropping,
supportsLongRunning,
dataStreamsEndpoint,
configEndpoint,
evpProxyEndpoint,
telemetryProxyEndpoint);
newState.traceEndpoint,
newState.metricsEndpoint,
newState.supportsDropping,
newState.supportsLongRunning,
newState.dataStreamsEndpoint,
newState.configEndpoint,
newState.evpProxyEndpoint,
newState.telemetryProxyEndpoint);
}
}

private String probeTracesEndpoint(String[] endpoints) {
private String probeTracesEndpoint(State newState, String[] endpoints) {
for (String candidate : endpoints) {
try (Response response =
client
Expand All @@ -219,7 +208,7 @@ private String probeTracesEndpoint(String[] endpoints) {
.build())
.execute()) {
if (response.code() != 404) {
state = response.header(DATADOG_AGENT_STATE);
newState.state = response.header(DATADOG_AGENT_STATE);
return candidate;
}
} catch (Throwable e) {
Expand All @@ -243,11 +232,11 @@ private void processInfoResponseHeaders(Response response) {
}

@SuppressWarnings("unchecked")
private boolean processInfoResponse(String response) {
private boolean processInfoResponse(State newState, String response) {
try {
Map<String, Object> map = RESPONSE_ADAPTER.fromJson(response);
discoverStatsDPort(map);
version = (String) map.get("version");
newState.version = (String) map.get("version");
Set<String> endpoints = new HashSet<>((List<String>) map.get("endpoints"));

String foundMetricsEndpoint = null;
Expand All @@ -261,18 +250,18 @@ private boolean processInfoResponse(String response) {
}

// This is done outside of the loop to set metricsEndpoint to null if not found
metricsEndpoint = foundMetricsEndpoint;
newState.metricsEndpoint = foundMetricsEndpoint;

for (String endpoint : traceEndpoints) {
if (containsEndpoint(endpoints, endpoint)) {
traceEndpoint = endpoint;
newState.traceEndpoint = endpoint;
break;
}
}

for (String endpoint : configEndpoints) {
if (containsEndpoint(endpoints, endpoint)) {
configEndpoint = endpoint;
newState.configEndpoint = endpoint;
break;
}
}
Expand All @@ -281,56 +270,58 @@ private boolean processInfoResponse(String response) {
// intake
// because older agents support diagnostics, we fallback to it before falling back to v1
if (containsEndpoint(endpoints, DEBUGGER_ENDPOINT_V2)) {
debuggerEndpoint = DEBUGGER_ENDPOINT_V2;
newState.debuggerEndpoint = DEBUGGER_ENDPOINT_V2;
} else if (containsEndpoint(endpoints, DEBUGGER_DIAGNOSTICS_ENDPOINT)) {
debuggerEndpoint = DEBUGGER_DIAGNOSTICS_ENDPOINT;
newState.debuggerEndpoint = DEBUGGER_DIAGNOSTICS_ENDPOINT;
} else if (containsEndpoint(endpoints, DEBUGGER_ENDPOINT_V1)) {
debuggerEndpoint = DEBUGGER_ENDPOINT_V1;
newState.debuggerEndpoint = DEBUGGER_ENDPOINT_V1;
}
if (containsEndpoint(endpoints, DEBUGGER_DIAGNOSTICS_ENDPOINT)) {
debuggerDiagnosticsEndpoint = DEBUGGER_DIAGNOSTICS_ENDPOINT;
newState.debuggerDiagnosticsEndpoint = DEBUGGER_DIAGNOSTICS_ENDPOINT;
}

for (String endpoint : dataStreamsEndpoints) {
if (containsEndpoint(endpoints, endpoint)) {
dataStreamsEndpoint = endpoint;
newState.dataStreamsEndpoint = endpoint;
break;
}
}

for (String endpoint : evpProxyEndpoints) {
if (containsEndpoint(endpoints, endpoint)) {
evpProxyEndpoint = endpoint;
newState.evpProxyEndpoint = endpoint;
break;
}
}

for (String endpoint : telemetryProxyEndpoints) {
if (containsEndpoint(endpoints, endpoint)) {
telemetryProxyEndpoint = endpoint;
newState.telemetryProxyEndpoint = endpoint;
break;
}
}

supportsLongRunning = Boolean.TRUE.equals(map.getOrDefault("long_running_spans", false));
newState.supportsLongRunning =
Boolean.TRUE.equals(map.getOrDefault("long_running_spans", false));

if (metricsEnabled) {
Object canDrop = map.get("client_drop_p0s");
supportsDropping =
newState.supportsDropping =
null != canDrop
&& ("true".equalsIgnoreCase(String.valueOf(canDrop))
|| Boolean.TRUE.equals(canDrop));

Object peer_tags = map.get("peer_tags");
peerTags =
newState.peerTags =
peer_tags instanceof List
? unmodifiableSet(new HashSet<>((List<String>) peer_tags))
: emptySet();
}
try {
state = Strings.sha256(response);
newState.state = Strings.sha256(response);
} catch (NoSuchAlgorithmException ex) {
log.debug("Failed to hash trace agent /info response. Will probe {}", traceEndpoint, ex);
log.debug(
"Failed to hash trace agent /info response. Will probe {}", newState.traceEndpoint, ex);
}
return true;
} catch (Throwable error) {
Expand Down Expand Up @@ -364,88 +355,89 @@ private static void discoverStatsDPort(final Map<String, Object> info) {
}

public boolean supportsMetrics() {
return metricsEnabled && null != metricsEndpoint;
return metricsEnabled && null != discoveryState.metricsEndpoint;
}

public boolean supportsDebugger() {
return debuggerEndpoint != null;
return discoveryState.debuggerEndpoint != null;
}

public String getDebuggerEndpoint() {
return debuggerEndpoint;
return discoveryState.debuggerEndpoint;
}

public boolean supportsDebuggerDiagnostics() {
return debuggerDiagnosticsEndpoint != null;
return discoveryState.debuggerDiagnosticsEndpoint != null;
}

public boolean supportsDropping() {
return supportsDropping;
return discoveryState.supportsDropping;
}

public boolean supportsLongRunning() {
return supportsLongRunning;
return discoveryState.supportsLongRunning;
}

public Set<String> peerTags() {
return peerTags;
return discoveryState.peerTags;
}

public String getMetricsEndpoint() {
return metricsEndpoint;
return discoveryState.metricsEndpoint;
}

public String getTraceEndpoint() {
return traceEndpoint;
return discoveryState.traceEndpoint;
}

public String getDataStreamsEndpoint() {
return dataStreamsEndpoint;
return discoveryState.dataStreamsEndpoint;
}

public String getEvpProxyEndpoint() {
return evpProxyEndpoint;
return discoveryState.evpProxyEndpoint;
}

public HttpUrl buildUrl(String endpoint) {
return agentBaseUrl.resolve(endpoint);
}

public boolean supportsDataStreams() {
return dataStreamsEndpoint != null;
return discoveryState.dataStreamsEndpoint != null;
}

public boolean supportsEvpProxy() {
return evpProxyEndpoint != null;
return discoveryState.evpProxyEndpoint != null;
}

public boolean supportsContentEncodingHeadersWithEvpProxy() {
// content encoding headers are supported in /v4 and above
final String evpProxyEndpoint = discoveryState.evpProxyEndpoint;
return evpProxyEndpoint != null && V4_EVP_PROXY_ENDPOINT.compareTo(evpProxyEndpoint) <= 0;
}

public String getConfigEndpoint() {
return configEndpoint;
return discoveryState.configEndpoint;
}

public String getVersion() {
return version;
return discoveryState.version;
}

private void errorQueryingEndpoint(String endpoint, Throwable t) {
log.debug(LogCollector.EXCLUDE_TELEMETRY, "Error querying {} at {}", endpoint, agentBaseUrl, t);
}

public String state() {
return state;
return discoveryState.state;
}

@Override
public boolean active() {
return supportsMetrics() && supportsDropping;
return supportsMetrics() && discoveryState.supportsDropping;
}

public boolean supportsTelemetryProxy() {
return telemetryProxyEndpoint != null;
return discoveryState.telemetryProxyEndpoint != null;
}
}
Loading