File tree Expand file tree Collapse file tree 1 file changed +17
-14
lines changed Expand file tree Collapse file tree 1 file changed +17
-14
lines changed Original file line number Diff line number Diff line change @@ -44,6 +44,7 @@ public <T> List<WatchResponse> chunkedPush(
44
44
) {
45
45
List< WatchResponse> responses = new ArrayList<> ();
46
46
List< T> records = new ArrayList<> ();
47
+ int offset = 0;
47
48
int waitBatchSize = batchSize / 10;
48
49
if (waitBatchSize < 1) {
49
50
waitBatchSize = batchSize;
@@ -53,23 +54,24 @@ public <T> List<WatchResponse> chunkedPush(
53
54
T current = it.next();
54
55
55
56
while (true) {
57
+ records.add(current);
58
+
56
59
if (records.size() == batchSize || ! it.hasNext()) {
57
- WatchResponse watch =
58
- this.push(
59
- indexName,
60
- new PushTaskPayload().setAction(action).setRecords(this.objectsToPushTaskRecords(records)),
61
- waitForTasks,
62
- referenceIndexName,
63
- requestOptions
64
- );
60
+ WatchResponse watch = this.push(
61
+ indexName,
62
+ new PushTaskPayload().setAction(action).setRecords(this.objectsToPushTaskRecords(records)),
63
+ waitForTasks,
64
+ referenceIndexName,
65
+ requestOptions
66
+ );
65
67
responses.add(watch);
66
68
records.clear();
67
69
}
68
70
69
- records.add(current);
70
-
71
- if (waitForTasks && (responses.size() % waitBatchSize == 0 || !it.hasNext ())) {
72
- responses.subList(Math.max(responses.size() - waitBatchSize, 0), responses.size()) .forEach(response -> {
71
+ if (waitForTasks && responses.size() > 0 && (responses.size() % waitBatchSize == 0 || !it.hasNext())) {
72
+ responses
73
+ .subList(offset, Math.min(offset + waitBatchSize, responses.size ()))
74
+ .forEach(response -> {
73
75
TaskUtils.retryUntil(
74
76
() -> {
75
77
try {
@@ -88,8 +90,9 @@ public <T> List<WatchResponse> chunkedPush(
88
90
50,
89
91
null
90
92
);
91
- }
92
- );
93
+ });
94
+
95
+ offset += waitBatchSize;
93
96
}
94
97
95
98
if (!it.hasNext()) {
You can’t perform that action at this time.
0 commit comments