18
18
19
19
import static com .google .cloud .datastore .telemetry .TraceUtil .ATTRIBUTES_KEY_DEFERRED ;
20
20
import static com .google .cloud .datastore .telemetry .TraceUtil .ATTRIBUTES_KEY_DOCUMENT_COUNT ;
21
- import static com .google .cloud .datastore .telemetry .TraceUtil .ATTRIBUTES_KEY_EXCEPTION_MESSAGE ;
22
- import static com .google .cloud .datastore .telemetry .TraceUtil .ATTRIBUTES_KEY_EXCEPTION_STACKTRACE ;
23
- import static com .google .cloud .datastore .telemetry .TraceUtil .ATTRIBUTES_KEY_EXCEPTION_TYPE ;
24
21
import static com .google .cloud .datastore .telemetry .TraceUtil .ATTRIBUTES_KEY_MISSING ;
25
22
import static com .google .cloud .datastore .telemetry .TraceUtil .ATTRIBUTES_KEY_MORE_RESULTS ;
26
23
import static com .google .cloud .datastore .telemetry .TraceUtil .ATTRIBUTES_KEY_READ_CONSISTENCY ;
37
34
import com .google .cloud .ServiceOptions ;
38
35
import com .google .cloud .datastore .execution .AggregationQueryExecutor ;
39
36
import com .google .cloud .datastore .spi .v1 .DatastoreRpc ;
37
+ import com .google .cloud .datastore .telemetry .TraceUtil ;
40
38
import com .google .cloud .datastore .telemetry .TraceUtil .Scope ;
41
39
import com .google .common .base .MoreObjects ;
42
40
import com .google .common .base .Preconditions ;
43
- import com .google .common .base .Throwables ;
44
41
import com .google .common .collect .AbstractIterator ;
45
42
import com .google .common .collect .ImmutableList ;
46
43
import com .google .common .collect .ImmutableMap ;
53
50
import com .google .datastore .v1 .RunQueryResponse ;
54
51
import com .google .datastore .v1 .TransactionOptions ;
55
52
import com .google .protobuf .ByteString ;
56
- import io .opentelemetry .api .common .Attributes ;
57
- import io .opentelemetry .api .trace .SpanBuilder ;
58
- import io .opentelemetry .api .trace .SpanKind ;
59
- import io .opentelemetry .api .trace .StatusCode ;
60
53
import io .opentelemetry .context .Context ;
61
54
import java .util .ArrayList ;
62
55
import java .util .Arrays ;
@@ -115,25 +108,24 @@ public Transaction newTransaction() {
115
108
return new TransactionImpl (this );
116
109
}
117
110
118
- static class ReadWriteTransactionCallable <T > implements Callable <T > {
119
-
111
+ static class TracedReadWriteTransactionCallable <T > implements Callable <T > {
120
112
private final Datastore datastore ;
121
113
private final TransactionCallable <T > callable ;
122
114
private volatile TransactionOptions options ;
123
115
private volatile Transaction transaction ;
124
116
125
- private final com . google . cloud . datastore . telemetry . TraceUtil .SpanContext parentSpanContext ;
117
+ private final TraceUtil .Span parentSpan ;
126
118
127
- ReadWriteTransactionCallable (
119
+ TracedReadWriteTransactionCallable (
128
120
Datastore datastore ,
129
121
TransactionCallable <T > callable ,
130
122
TransactionOptions options ,
131
- @ Nullable com .google .cloud .datastore .telemetry .TraceUtil .SpanContext parentSpanContext ) {
123
+ @ Nullable com .google .cloud .datastore .telemetry .TraceUtil .Span parentSpan ) {
132
124
this .datastore = datastore ;
133
125
this .callable = callable ;
134
126
this .options = options ;
135
127
this .transaction = null ;
136
- this .parentSpanContext = parentSpanContext ;
128
+ this .parentSpan = parentSpan ;
137
129
}
138
130
139
131
Datastore getDatastore () {
@@ -154,57 +146,75 @@ void setPrevTransactionId(ByteString transactionId) {
154
146
options = options .toBuilder ().setReadWrite (readWrite ).build ();
155
147
}
156
148
157
- private io .opentelemetry .api .trace .Span startSpanWithParentContext (
158
- String spanName ,
159
- com .google .cloud .datastore .telemetry .TraceUtil .SpanContext parentSpanContext ) {
160
- com .google .cloud .datastore .telemetry .TraceUtil otelTraceUtil =
161
- datastore .getOptions ().getTraceUtil ();
162
- SpanBuilder spanBuilder =
163
- otelTraceUtil
164
- .getTracer ()
165
- .spanBuilder (com .google .cloud .datastore .telemetry .TraceUtil .SPAN_NAME_TRANSACTION_RUN )
166
- .setSpanKind (SpanKind .PRODUCER )
167
- .setParent (
168
- Context .current ()
169
- .with (
170
- io .opentelemetry .api .trace .Span .wrap (
171
- parentSpanContext .getSpanContext ())));
172
- return otelTraceUtil .addSettingsAttributesToCurrentSpan (spanBuilder ).startSpan ();
149
+ @ Override
150
+ public T call () throws DatastoreException {
151
+ try (io .opentelemetry .context .Scope ignored =
152
+ Context .current ().with (parentSpan .getSpan ()).makeCurrent ()) {
153
+ transaction = datastore .newTransaction (options );
154
+ T value = callable .run (transaction );
155
+ transaction .commit ();
156
+ return value ;
157
+ } catch (Exception ex ) {
158
+ transaction .rollback ();
159
+ throw DatastoreException .propagateUserException (ex );
160
+ } finally {
161
+ if (transaction .isActive ()) {
162
+ transaction .rollback ();
163
+ }
164
+ if (options != null
165
+ && options .getModeCase ().equals (TransactionOptions .ModeCase .READ_WRITE )) {
166
+ setPrevTransactionId (transaction .getTransactionId ());
167
+ }
168
+ }
169
+ }
170
+ }
171
+
172
+ static class ReadWriteTransactionCallable <T > implements Callable <T > {
173
+ private final Datastore datastore ;
174
+ private final TransactionCallable <T > callable ;
175
+ private volatile TransactionOptions options ;
176
+ private volatile Transaction transaction ;
177
+
178
+ ReadWriteTransactionCallable (
179
+ Datastore datastore , TransactionCallable <T > callable , TransactionOptions options ) {
180
+ this .datastore = datastore ;
181
+ this .callable = callable ;
182
+ this .options = options ;
183
+ this .transaction = null ;
184
+ }
185
+
186
+ Datastore getDatastore () {
187
+ return datastore ;
188
+ }
189
+
190
+ TransactionOptions getOptions () {
191
+ return options ;
192
+ }
193
+
194
+ Transaction getTransaction () {
195
+ return transaction ;
196
+ }
197
+
198
+ void setPrevTransactionId (ByteString transactionId ) {
199
+ TransactionOptions .ReadWrite readWrite =
200
+ TransactionOptions .ReadWrite .newBuilder ().setPreviousTransaction (transactionId ).build ();
201
+ options = options .toBuilder ().setReadWrite (readWrite ).build ();
173
202
}
174
203
175
204
@ Override
176
205
public T call () throws DatastoreException {
177
- // TODO Instead of using OTel Spans directly, TraceUtil.Span should be used here. However,
178
- // the same code in startSpanInternal doesn't work when EnabledTraceUtil.StartSpan is called
179
- // probably because of some thread-local caching that is getting lost. This needs more
180
- // debugging. The code below works and is idiomatic but could be prettier and more consistent
181
- // with the use of TraceUtil-provided framework.
182
- io .opentelemetry .api .trace .Span span =
183
- startSpanWithParentContext (
184
- com .google .cloud .datastore .telemetry .TraceUtil .SPAN_NAME_TRANSACTION_RUN ,
185
- parentSpanContext );
186
- try (io .opentelemetry .context .Scope ignored = span .makeCurrent ()) {
206
+ try {
187
207
transaction = datastore .newTransaction (options );
188
208
T value = callable .run (transaction );
189
209
transaction .commit ();
190
210
return value ;
191
211
} catch (Exception ex ) {
192
212
transaction .rollback ();
193
- span .setStatus (StatusCode .ERROR , ex .getMessage ());
194
- span .recordException (
195
- ex ,
196
- Attributes .builder ()
197
- .put (ATTRIBUTES_KEY_EXCEPTION_MESSAGE , ex .getMessage ())
198
- .put (ATTRIBUTES_KEY_EXCEPTION_TYPE , ex .getClass ().getName ())
199
- .put (ATTRIBUTES_KEY_EXCEPTION_STACKTRACE , Throwables .getStackTraceAsString (ex ))
200
- .build ());
201
- span .end ();
202
213
throw DatastoreException .propagateUserException (ex );
203
214
} finally {
204
215
if (transaction .isActive ()) {
205
216
transaction .rollback ();
206
217
}
207
- span .end ();
208
218
if (options != null
209
219
&& options .getModeCase ().equals (TransactionOptions .ModeCase .READ_WRITE )) {
210
220
setPrevTransactionId (transaction .getTransactionId ());
@@ -215,30 +225,51 @@ public T call() throws DatastoreException {
215
225
216
226
@ Override
217
227
public <T > T runInTransaction (final TransactionCallable <T > callable ) {
218
- try {
228
+ TraceUtil .Span span =
229
+ otelTraceUtil .startSpan (
230
+ com .google .cloud .datastore .telemetry .TraceUtil .SPAN_NAME_TRANSACTION_RUN );
231
+ Callable <T > transactionCallable =
232
+ (getOptions ().getOpenTelemetryOptions ().isEnabled ()
233
+ ? new TracedReadWriteTransactionCallable <T >(
234
+ this , callable , /*transactionOptions=*/ null , span )
235
+ : new ReadWriteTransactionCallable <T >(this , callable , /*transactionOptions=*/ null ));
236
+ try (Scope ignored = span .makeCurrent ()) {
219
237
return RetryHelper .runWithRetries (
220
- new ReadWriteTransactionCallable <T >(
221
- this , callable , null , otelTraceUtil .getCurrentSpanContext ()),
238
+ transactionCallable ,
222
239
retrySettings ,
223
240
TRANSACTION_EXCEPTION_HANDLER ,
224
241
getOptions ().getClock ());
225
242
} catch (RetryHelperException e ) {
243
+ span .end (e );
226
244
throw DatastoreException .translateAndThrow (e );
245
+ } finally {
246
+ span .end ();
227
247
}
228
248
}
229
249
230
250
@ Override
231
251
public <T > T runInTransaction (
232
252
final TransactionCallable <T > callable , TransactionOptions transactionOptions ) {
233
- try {
253
+ TraceUtil .Span span =
254
+ otelTraceUtil .startSpan (
255
+ com .google .cloud .datastore .telemetry .TraceUtil .SPAN_NAME_TRANSACTION_RUN );
256
+
257
+ Callable <T > transactionCallable =
258
+ (getOptions ().getOpenTelemetryOptions ().isEnabled ()
259
+ ? new TracedReadWriteTransactionCallable <T >(this , callable , transactionOptions , span )
260
+ : new ReadWriteTransactionCallable <T >(this , callable , transactionOptions ));
261
+
262
+ try (Scope ignored = span .makeCurrent ()) {
234
263
return RetryHelper .runWithRetries (
235
- new ReadWriteTransactionCallable <T >(
236
- this , callable , transactionOptions , otelTraceUtil .getCurrentSpanContext ()),
264
+ transactionCallable ,
237
265
retrySettings ,
238
266
TRANSACTION_EXCEPTION_HANDLER ,
239
267
getOptions ().getClock ());
240
268
} catch (RetryHelperException e ) {
269
+ span .end (e );
241
270
throw DatastoreException .translateAndThrow (e );
271
+ } finally {
272
+ span .end ();
242
273
}
243
274
}
244
275
@@ -747,8 +778,7 @@ com.google.datastore.v1.BeginTransactionResponse beginTransaction(
747
778
final com .google .datastore .v1 .BeginTransactionRequest requestPb ) {
748
779
com .google .cloud .datastore .telemetry .TraceUtil .Span span =
749
780
otelTraceUtil .startSpan (
750
- com .google .cloud .datastore .telemetry .TraceUtil .SPAN_NAME_BEGIN_TRANSACTION ,
751
- otelTraceUtil .getCurrentSpanContext ());
781
+ com .google .cloud .datastore .telemetry .TraceUtil .SPAN_NAME_BEGIN_TRANSACTION );
752
782
try (com .google .cloud .datastore .telemetry .TraceUtil .Scope scope = span .makeCurrent ()) {
753
783
return RetryHelper .runWithRetries (
754
784
() -> datastoreRpc .beginTransaction (requestPb ),
0 commit comments