Skip to content

Commit 832a86c

Browse files
committed
addressed review comments - 1
1 parent 5de4424 commit 832a86c

File tree

12 files changed

+51
-52
lines changed

12 files changed

+51
-52
lines changed
Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,18 @@
2323

2424
@Getter
2525
@AllArgsConstructor
26-
public enum GaaSOpenTelemetryMetrics {
27-
GAAS_JOB_STATUS("gaas_job_status", "Gaas job status counter", "1", OpenTelemetryMetricType.LONG_COUNTER),
28-
GAAS_JOB_STATE_LATENCY("gaas_job_state_latency", "Gaas job state latency", "s", OpenTelemetryMetricType.DOUBLE_HISTOGRAM);
26+
public enum GobblinOpenTelemetryMetrics {
27+
/**
28+
* Metric to track the count of Gobblin Jobs for each of its state (GenerateWorkUnit, ProcessWorkUnit, CommitStep).
29+
* Metric Unit: 1 represents each increment will add one data point to the counter.
30+
* */
31+
GOBBLIN_JOB_STATE("gobblin_job_state", "Gobblin job state counter", "1", OpenTelemetryMetricType.LONG_COUNTER),
32+
33+
/**
34+
* Metric to track the latency of each Gobblin Job state (GenerateWorkUnit, ProcessWorkUnit, CommitStep).
35+
* Metric Unit: seconds (s) represents the time taken for each state.
36+
* */
37+
GOBBLIN_JOB_STATE_LATENCY("gobblin_job_state_latency", "Gobblin job state latency", "s", OpenTelemetryMetricType.DOUBLE_HISTOGRAM);
2938

3039
private final String metricName;
3140
private final String metricDescription;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.gobblin.metrics.opentelemetry;
1919

20-
public class GaaSOpenTelemetryMetricsConstants {
20+
public class GobblinOpenTelemetryMetricsConstants {
2121

2222
public static class DimensionKeys {
2323
public static final String STATE = "state";

gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryHelper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
@UtilityClass
3535
public class OpenTelemetryHelper {
3636

37-
private static final String DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE = "-";
37+
private static final String DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE = "UNKNOWN";
3838

3939
/**
4040
* Returns the provided attribute value when it is non-null and non-empty;
@@ -44,7 +44,7 @@ public class OpenTelemetryHelper {
4444
* @return the original value if not empty, or DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE otherwise
4545
*/
4646
public static String getOrDefaultOpenTelemetryAttrValue(String value) {
47-
return StringUtils.isNotEmpty(value) ? value : DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE;
47+
return StringUtils.defaultIfBlank(value, DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE);
4848
}
4949

5050
/**

gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryInstrumentation.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
* Provides OpenTelemetry instrumentation for metrics.
4242
*
4343
* <p>Maintains a singleton instance that holds common attributes {@link Attributes} and a Meter {@link Meter}.
44-
* Exposes methods to retrieve or create metric instruments defined in {@link GaaSOpenTelemetryMetrics}.
44+
* Exposes methods to retrieve or create metric instruments defined in {@link GobblinOpenTelemetryMetrics}.
4545
*/
4646
@Slf4j
4747
@Getter
@@ -102,14 +102,14 @@ public static OpenTelemetryInstrumentation getInstance(final Properties props) {
102102
/**
103103
* Retrieves an existing metric by its enum definition or creates it if absent.
104104
*
105-
* @param metric the {@link GaaSOpenTelemetryMetrics} enum defining name, description, unit, and type {@link OpenTelemetryMetricType}
105+
* @param metric the {@link GobblinOpenTelemetryMetrics} enum defining name, description, unit, and type {@link OpenTelemetryMetricType}
106106
* @return an {@link OpenTelemetryMetric} instance corresponding to the provided enum
107107
*/
108-
public OpenTelemetryMetric getOrCreate(GaaSOpenTelemetryMetrics metric) {
108+
public OpenTelemetryMetric getOrCreate(GobblinOpenTelemetryMetrics metric) {
109109
return this.metrics.computeIfAbsent(metric.getMetricName(), name -> createMetric(metric));
110110
}
111111

112-
private OpenTelemetryMetric createMetric(GaaSOpenTelemetryMetrics metric) {
112+
private OpenTelemetryMetric createMetric(GobblinOpenTelemetryMetrics metric) {
113113
String name = metric.getMetricName();
114114
String description = metric.getMetricDescription();
115115
String unit = metric.getMetricUnit();
@@ -157,7 +157,7 @@ private Attributes buildCommonAttributes(final State state) {
157157
return attributesBuilder.build();
158158
}
159159

160-
private String getFlowEdgeId(final State state, String fullFlowEdgeId) {
160+
private static String getFlowEdgeId(final State state, String fullFlowEdgeId) {
161161
// Parse the flowEdgeId from fullFlowEdgeId that is stored in format sourceNode_destinationNode_flowEdgeId
162162
return StringUtils.substringAfter(
163163
StringUtils.substringAfter(fullFlowEdgeId, state.getProp(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "")),

gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricType.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,4 @@ public enum OpenTelemetryMetricType {
2727
/** Represents a metric of type DoubleHistogram. */
2828
DOUBLE_HISTOGRAM;
2929

30-
@Override
31-
public String toString() {
32-
return this.name();
33-
}
34-
3530
}

gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryInstrumentationTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,11 @@ public void flowEdgeIdParsedCorrectly() {
101101
public void metricsAreCreatedAndCached() {
102102
instrumentation = OpenTelemetryInstrumentation.getInstance(state);
103103
Assert.assertEquals(instrumentation.getMetrics().size(), 0, "Metrics map should be empty initially");
104-
instrumentation.getOrCreate(GaaSOpenTelemetryMetrics.GAAS_JOB_STATUS);
104+
instrumentation.getOrCreate(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE);
105105
Assert.assertEquals(instrumentation.getMetrics().size(), 1, "Metrics map should contain one metric after creation");
106-
instrumentation.getOrCreate(GaaSOpenTelemetryMetrics.GAAS_JOB_STATUS);
106+
instrumentation.getOrCreate(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE);
107107
Assert.assertEquals(instrumentation.getMetrics().size(), 1, "Metrics map should still contain one metric after duplicate creation");
108-
instrumentation.getOrCreate(GaaSOpenTelemetryMetrics.GAAS_JOB_STATE_LATENCY);
108+
instrumentation.getOrCreate(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE_LATENCY);
109109
Assert.assertEquals(instrumentation.getMetrics().size(), 2, "Metrics map should contain two metrics after creating another");
110110
}
111111

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/EmitOTelMetrics.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@
2323
import io.temporal.activity.ActivityInterface;
2424
import io.temporal.activity.ActivityMethod;
2525

26-
import org.apache.gobblin.metrics.opentelemetry.GaaSOpenTelemetryMetrics;
26+
import org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetrics;
2727

2828
@ActivityInterface
2929
public interface EmitOTelMetrics {
3030

3131
@ActivityMethod
32-
void emitLongCounterMetric(GaaSOpenTelemetryMetrics metric, long value, Map<String, String> attributes, Properties jobProps);
32+
void emitLongCounterMetric(GobblinOpenTelemetryMetrics metric, long value, Map<String, String> attributes, Properties jobProps);
3333

3434
@ActivityMethod
35-
void emitDoubleHistogramMetric(GaaSOpenTelemetryMetrics metric, double value, Map<String, String> attributes, Properties jobProps);
35+
void emitDoubleHistogramMetric(GobblinOpenTelemetryMetrics metric, double value, Map<String, String> attributes, Properties jobProps);
3636
}

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/EmitOTelMetricsImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import java.util.Map;
2121
import java.util.Properties;
2222

23-
import org.apache.gobblin.metrics.opentelemetry.GaaSOpenTelemetryMetrics;
23+
import org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetrics;
2424
import org.apache.gobblin.metrics.opentelemetry.OpenTelemetryDoubleHistogram;
2525
import org.apache.gobblin.metrics.opentelemetry.OpenTelemetryHelper;
2626
import org.apache.gobblin.metrics.opentelemetry.OpenTelemetryInstrumentation;
@@ -31,13 +31,13 @@
3131
public class EmitOTelMetricsImpl implements EmitOTelMetrics {
3232

3333
@Override
34-
public void emitLongCounterMetric(GaaSOpenTelemetryMetrics metric, long value, Map<String, String> attributes, Properties jobProps) {
34+
public void emitLongCounterMetric(GobblinOpenTelemetryMetrics metric, long value, Map<String, String> attributes, Properties jobProps) {
3535
OpenTelemetryLongCounter longCounter = (OpenTelemetryLongCounter) OpenTelemetryInstrumentation.getInstance(jobProps).getOrCreate(metric);
3636
longCounter.add(value, OpenTelemetryHelper.toOpenTelemetryAttributes(attributes));
3737
}
3838

3939
@Override
40-
public void emitDoubleHistogramMetric(GaaSOpenTelemetryMetrics metric, double value, Map<String, String> attributes, Properties jobProps) {
40+
public void emitDoubleHistogramMetric(GobblinOpenTelemetryMetrics metric, double value, Map<String, String> attributes, Properties jobProps) {
4141
OpenTelemetryDoubleHistogram doubleHistogram = (OpenTelemetryDoubleHistogram) OpenTelemetryInstrumentation.getInstance(jobProps).getOrCreate(metric);
4242
doubleHistogram.record(value, OpenTelemetryHelper.toOpenTelemetryAttributes(attributes));
4343
}

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
3636
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
3737
import org.apache.gobblin.configuration.ConfigurationKeys;
38-
import org.apache.gobblin.metrics.opentelemetry.GaaSOpenTelemetryMetrics;
38+
import org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetrics;
3939
import org.apache.gobblin.metrics.Tag;
4040
import org.apache.gobblin.runtime.JobContext;
4141
import org.apache.gobblin.runtime.JobLauncher;
@@ -54,8 +54,8 @@
5454
import org.apache.gobblin.util.JobLauncherUtils;
5555
import org.apache.gobblin.util.PropertiesUtils;
5656

57-
import static org.apache.gobblin.metrics.opentelemetry.GaaSOpenTelemetryMetricsConstants.DimensionKeys.*;
58-
import static org.apache.gobblin.metrics.opentelemetry.GaaSOpenTelemetryMetricsConstants.DimensionValues.*;
57+
import static org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetricsConstants.DimensionKeys.*;
58+
import static org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetricsConstants.DimensionValues.*;
5959

6060

6161
/**
@@ -107,17 +107,17 @@ public void submitJob(List<WorkUnit> workunits) {
107107
Map<String, String> attributes = new HashMap<>();
108108
attributes.put(CURR_STATE, JOB_START);
109109
EmitOTelMetricsImpl emitOTelMetrics = new EmitOTelMetricsImpl();
110-
emitOTelMetrics.emitLongCounterMetric(GaaSOpenTelemetryMetrics.GAAS_JOB_STATUS, 1L, attributes, finalProps);
110+
emitOTelMetrics.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE, 1L, attributes, finalProps);
111111

112112
long startTimeMillis = System.currentTimeMillis();
113113
ExecGobblinStats execGobblinStats = workflow.execute(finalProps, eventSubmitterContext);
114114
double timeTaken = (System.currentTimeMillis() - startTimeMillis) / 1000.0;
115115
log.info("FINISHED - ExecuteGobblinWorkflow.execute = {}", execGobblinStats);
116116
attributes.put(CURR_STATE, JOB_COMPLETE);
117-
emitOTelMetrics.emitLongCounterMetric(GaaSOpenTelemetryMetrics.GAAS_JOB_STATUS, 1L, attributes, finalProps);
117+
emitOTelMetrics.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE, 1L, attributes, finalProps);
118118
attributes.remove(CURR_STATE);
119119
attributes.put(STATE, JOB_COMPLETE);
120-
emitOTelMetrics.emitDoubleHistogramMetric(GaaSOpenTelemetryMetrics.GAAS_JOB_STATE_LATENCY, timeTaken, attributes, finalProps);
120+
emitOTelMetrics.emitDoubleHistogramMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE_LATENCY, timeTaken, attributes, finalProps);
121121
} catch (Exception e) {
122122
throw new RuntimeException(e);
123123
}

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import lombok.extern.slf4j.Slf4j;
3030

3131
import org.apache.gobblin.metrics.event.TimingEvent;
32-
import org.apache.gobblin.metrics.opentelemetry.GaaSOpenTelemetryMetrics;
32+
import org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetrics;
3333
import org.apache.gobblin.runtime.DatasetTaskSummary;
3434
import org.apache.gobblin.temporal.ddm.activity.ActivityType;
3535
import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
@@ -47,11 +47,6 @@ public class CommitStepWorkflowImpl implements CommitStepWorkflow {
4747
@Override
4848
public CommitStats commit(WUProcessingSpec workSpec, final Properties props) {
4949
final CommitActivity activityStub = Workflow.newActivityStub(CommitActivity.class, ActivityType.COMMIT.buildActivityOptions(props, true));
50-
Map<String, String> attributes = new HashMap<>();
51-
attributes.put("currState", "processWUStart");
52-
final EmitOTelMetrics emitOTelMetricsActivityStub = Workflow.newActivityStub(EmitOTelMetrics.class,
53-
ActivityType.EMIT_OTEL_METRICS.buildActivityOptions(props, false));
54-
emitOTelMetricsActivityStub.emitLongCounterMetric(GaaSOpenTelemetryMetrics.GAAS_JOB_STATUS, 1L, attributes, props);
5550
CommitStats commitGobblinStats = activityStub.commit(workSpec);
5651
if (!commitGobblinStats.getOptFailure().isPresent() || commitGobblinStats.getNumCommittedWorkUnits() > 0) {
5752
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(workSpec.getEventSubmitterContext(), props);

0 commit comments

Comments
 (0)