41
41
import io .grpc .Status .Code ;
42
42
import java .io .IOException ;
43
43
import java .util .Map ;
44
+ import java .util .concurrent .ExecutorService ;
45
+ import java .util .concurrent .Executors ;
44
46
import java .util .concurrent .Phaser ;
45
47
import javax .annotation .concurrent .GuardedBy ;
46
48
import org .json .JSONArray ;
@@ -193,6 +195,8 @@ static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsRespo
193
195
194
196
private final DataWriter parent ;
195
197
private final AppendContext appendContext ;
198
+ // Prepare a thread pool
199
+ static ExecutorService pool = Executors .newFixedThreadPool (50 );
196
200
197
201
public AppendCompleteCallback (DataWriter parent , AppendContext appendContext ) {
198
202
this .parent = parent ;
@@ -213,19 +217,18 @@ public void onFailure(Throwable throwable) {
213
217
&& RETRIABLE_ERROR_CODES .contains (status .getCode ())) {
214
218
appendContext .retryCount ++;
215
219
// Use a separate thread to avoid potentially blocking while we are in a callback.
216
- new Thread (
217
- () -> {
218
- try {
219
- // Since default stream appends are not ordered, we can simply retry the
220
- // appends.
221
- // Retrying with exclusive streams requires more careful consideration.
222
- this .parent .append (appendContext );
223
- } catch (Exception e ) {
224
- // Fall through to return error.
225
- System .out .format ("Failed to retry append: %s%n" , e );
226
- }
227
- })
228
- .start ();
220
+ pool .submit (
221
+ () -> {
222
+ try {
223
+ // Since default stream appends are not ordered, we can simply retry the
224
+ // appends.
225
+ // Retrying with exclusive streams requires more careful consideration.
226
+ this .parent .append (appendContext );
227
+ } catch (Exception e ) {
228
+ // Fall through to return error.
229
+ System .out .format ("Failed to retry append: %s%n" , e );
230
+ }
231
+ });
229
232
// Mark the existing attempt as done since it's being retried.
230
233
done ();
231
234
return ;
@@ -251,15 +254,14 @@ public void onFailure(Throwable throwable) {
251
254
// Retry the remaining valid rows, but using a separate thread to
252
255
// avoid potentially blocking while we are in a callback.
253
256
if (dataNew .length () > 0 ) {
254
- new Thread (
255
- () -> {
256
- try {
257
- this .parent .append (new AppendContext (dataNew , 0 ));
258
- } catch (Exception e2 ) {
259
- System .out .format ("Failed to retry append with filtered rows: %s%n" , e2 );
260
- }
261
- })
262
- .start ();
257
+ pool .submit (
258
+ () -> {
259
+ try {
260
+ this .parent .append (new AppendContext (dataNew , 0 ));
261
+ } catch (Exception e2 ) {
262
+ System .out .format ("Failed to retry append with filtered rows: %s%n" , e2 );
263
+ }
264
+ });
263
265
}
264
266
return ;
265
267
}
0 commit comments