Update retry bq table lookup #87
base: main
kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/GcsToBqWriter.java
@Claudenw
Please update the javadoc as requested and I will approve.
import java.util.Random;

/**
 * Simple exponential backoff helper with jitter.
delay doubling backoff, not exponential.
}

/**
 * Sleep for the current backoff period and increase the delay exponentially.
Comment should state that backoff period doubles
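To make the doubling behaviour concrete, here is a minimal sketch of a delay-doubling backoff with symmetric (positive and negative) jitter. It is not the class from this PR: `DoublingBackoff`, `nextDelayMs`, and `jitterFraction` are hypothetical names chosen for illustration, and the helper returns the delay rather than sleeping so that it is easy to test.

```java
import java.util.Random;

/** Hypothetical delay-doubling backoff with symmetric jitter; a sketch, not the PR's class. */
class DoublingBackoff {
  private final long maxDelayMs;
  private final double jitterFraction; // assumption: 0.1 means +/-10% of the base delay
  private final Random random;
  private long currentDelayMs;

  DoublingBackoff(long initialDelayMs, long maxDelayMs, double jitterFraction, Random random) {
    this.currentDelayMs = initialDelayMs;
    this.maxDelayMs = maxDelayMs;
    this.jitterFraction = jitterFraction;
    this.random = random;
  }

  /** Returns the next delay in milliseconds, then doubles the base delay (capped at maxDelayMs). */
  long nextDelayMs() {
    long base = Math.min(currentDelayMs, maxDelayMs);
    // Scale by a factor in [1 - jitterFraction, 1 + jitterFraction) so threads diverge both ways.
    double factor = 1.0 + (random.nextDouble() * 2.0 - 1.0) * jitterFraction;
    // The backoff period doubles on every call until it reaches the cap.
    currentDelayMs = Math.min(currentDelayMs * 2, maxDelayMs);
    return Math.max(0L, Math.round(base * factor));
  }
}
```

With `jitterFraction` set to 0 the sequence is exactly 100, 200, 400, ... up to the cap, which is the behaviour the javadoc should describe.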
while (!timer.isExpired()) {
  try {
    return func.get();
  } catch (BaseServiceException e) {
    retryCount++;
    if (e.isRetryable()) {
      logger.warn("Retryable exception on attempt {}: {}", retryCount, e.getMessage());
      backoff.delay();
    } else {
      logger.error("Non-retryable exception on attempt {}", retryCount, e);
      throw e;
    }
  }
This delay code has a couple of edge cases.
backoff.delay() may delay longer than the timer has left, so execution may stall for several seconds before the timer expiration is detected. At the high end of the delay time it may make sense to adjust the backoff to delay only for a period slightly less than the time remaining, or to abort the delay altogether.
By default there are 10 retries of 1 second each, based on the default config options, so the timeout is 10 seconds.
The initial backoff delay is 100 ms and is multiplied by 2 on each backoff call.
next delay | ms remaining | ms used |
---|---|---|
100 | 10000 | 0 |
200 | 9900 | 100 |
400 | 9700 | 300 |
800 | 9300 | 700 |
1600 | 8500 | 1500 |
3200 | 6900 | 3100 |
6400 | 3700 | 6300 |
12800 | -2700 | 12700 |
On the 7th iteration we have spent 6.3 seconds waiting; 3.7 seconds remain on the timer, but the backoff calls for a 6.4-second delay.
We delay 6.4 seconds, by which point the timer has been expired for 2.7 seconds, so we exit the loop without retrying.
Had we known the timer would expire, we could have exited the loop before the delay and saved 6.4 seconds of execution time.
If the outer while loop were changed to a do/while loop, and the backoff started with no delay, then you could call backoff.delay() and then func.get(). That way you only delay if there is still time left after the get call, and if you adjust backoff.delay() to account for the time remaining on the timer, you can ensure that your total time is close to the maximum time + jitter.
The original code I pointed you to handles this case. It also applies negative jitter as well as positive jitter so that multiple threads will diverge more rapidly.
Since this is in a private method and can be adjusted easily in the future, I will approve this change.
However, if you decide that you want to make adjustments, I will be happy to accept them.
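The edge case described above can be reproduced without any real sleeping. The following is a rough simulation, assuming every call fails with a retryable exception and that the call itself takes no time; `BackoffSimulation` and its methods are hypothetical names, not code from the connector. The uncapped variant mirrors the current loop, and the capped variant aborts when the next delay would outlast the timer.

```java
/** Hypothetical simulation of the retry loop's delays; not the connector's code. */
class BackoffSimulation {

  /** Mirrors the current loop: sleep the full backoff period, then check the timer. */
  static long totalDelayUncapped(long initialDelayMs, long timeoutMs) {
    long elapsedMs = 0;
    long delayMs = initialDelayMs;
    while (elapsedMs < timeoutMs) { // timer has not expired
      elapsedMs += delayMs;         // backoff.delay() sleeps for the whole period
      delayMs *= 2;                 // the backoff period doubles
    }
    return elapsedMs;
  }

  /** Variant that gives up instead of sleeping past the deadline. */
  static long totalDelayCapped(long initialDelayMs, long timeoutMs) {
    long elapsedMs = 0;
    long delayMs = initialDelayMs;
    while (elapsedMs < timeoutMs) {
      long remainingMs = timeoutMs - elapsedMs;
      if (delayMs >= remainingMs) {
        break;                      // the next sleep would outlast the timer, so abort
      }
      elapsedMs += delayMs;
      delayMs *= 2;
    }
    return elapsedMs;
  }

  public static void main(String[] args) {
    System.out.println(totalDelayUncapped(100, 10_000)); // 12700
    System.out.println(totalDelayCapped(100, 10_000));   // 6300
  }
}
```

With the default values, the uncapped loop spends 12,700 ms before noticing the expiry, while the capped variant stops at 6,300 ms. Sleeping for the remaining time minus a small margin, rather than aborting, would be an equally reasonable choice.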
Hey Claude,
Thanks for your detailed comment 🙏
I have two points I'd like to highlight.
First, I completely agree with your statements here. Therefore, I added this line to ensure delay doesn't exceed maxDelayMs. If the retry value is 10, then it can be a maximum of 10 seconds, as you mentioned:
long delay = Math.min(currentDelayMs, maxDelayMs);
On the other hand, the retry count actually comes from the BIGQUERY_RETRY_DEFAULT parameter. At first, I also assumed the value was retrieved from MAX_RETRIES_DEFAULT, but I realized otherwise when I was running the unit tests.
The default value of BIGQUERY_RETRY_DEFAULT is 0, and it will retry only once because we set this condition here:
if (retries == 0) {
  timeout = Duration.ofMillis(retryWaitMs);
}
I would really appreciate it if you could confirm or reject my understanding here.
Let's start with retries = 0. The timeout is then retryWaitMs (default 100, minimum 0).
If retrywait = 0, then executeWithTimout will not execute the command at all.
If retries = 1, the timeout is again retryWaitMs (default 100, minimum 0).
Assume the first call fails with a retryable exception.
The thread will wait for 100 ms.
The test at line 231 will fail and a retry will not occur.
Looking at the default values: retries = 10, retryWaitMs = 100, maxdelay = 10000 ms.
On the 7th iteration (using the table above), currentDelayMs is the value in the first column (6400).
currentDelayMs is less than maxdelay, so the delay time will be 6400 ms.
The total elapsed time thus far is 6300 ms, so the timer has not expired.
The system will delay for 6400 ms, so the total elapsed time will be 12700 ms and the timer will expire after 7 retries.
This code neither attempts the configured number of retries nor stops at maxdelay.
The example code calculates the number of retries based on the amount of time remaining before the clock expires. Your code might switch to calculating the retries.
If you calculate int(log2(maxdelay)) and subtract int(log2(retrywait)) from it, you will have the number of loops necessary to reach maxdelay (assuming retrywait > 1).
Example:
maxdelay = 10000, log2(10000) = 13.288, int(13.288) = 13
retrywait = 100, log2(100) = 6.644, int(6.644) = 6
13 - 6 = 7 loops.
loop | delay | total delay |
---|---|---|
1 | 100 | 100 |
2 | 200 | 300 |
3 | 400 | 700 |
4 | 800 | 1500 |
5 | 1600 | 3100 |
6 | 3200 | 6300 |
7 | 6400 | 12700 |
7 loops will have a total delay that just exceeds or matches the limit.
6 loops will stop one short of exceeding the limit.
Given that the actual call we are attempting to time takes some time itself, I would think that 6 is the number of loops you actually want to execute, based on the maxdelay and retrywait values.
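The loop-count arithmetic above can be sketched in Java as follows. `RetryLoopEstimate` and its method names are hypothetical; since `java.lang.Math` has no `log2`, the sketch uses `Long.numberOfLeadingZeros` to get the integer part of the base-2 logarithm, and it computes the cumulative delay from the geometric sum retrywait * (2^n - 1).

```java
/** Hypothetical helper for the loop-count estimate; not part of the connector. */
class RetryLoopEstimate {

  /** Integer part of log2(value) for positive values, i.e. int(log2(value)). */
  static int floorLog2(long value) {
    if (value <= 0) {
      throw new IllegalArgumentException("value must be positive");
    }
    return 63 - Long.numberOfLeadingZeros(value);
  }

  /** int(log2(maxdelay)) - int(log2(retrywait)): loops needed to reach maxdelay. */
  static int loopsToReach(long maxDelayMs, long retryWaitMs) {
    return floorLog2(maxDelayMs) - floorLog2(retryWaitMs);
  }

  /** Cumulative delay after the given number of doubling loops: retrywait * (2^loops - 1). */
  static long totalDelayMs(long retryWaitMs, int loops) {
    return retryWaitMs * ((1L << loops) - 1);
  }

  public static void main(String[] args) {
    int loops = loopsToReach(10_000, 100);
    System.out.println(loops);                         // 7
    System.out.println(totalDelayMs(100, loops));      // 12700
    System.out.println(totalDelayMs(100, loops - 1));  // 6300
  }
}
```

Subtracting one from the computed loop count gives the 6 loops suggested above, which leaves room for the time spent in the calls themselves.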
Hello @Claudenw,
I will approve with the changes as they exist, or you can fix the retry.
@veliuenal please add the proper license headers to your submission
Overview - Support Ticket ID: 98024
The getTable operation can intermittently time out due to network or API hiccups. Retrying is safe because the lookup is idempotent, and it avoids unnecessary pipeline failures.
This PR makes the table lookup operation more resilient by adding a bounded retry loop.
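As an illustration of what a bounded retry loop for an idempotent lookup can look like, here is a minimal sketch. It is not the code in this PR: `BoundedRetry.call` is a hypothetical helper, and it takes a plain `Predicate` in place of `BaseServiceException.isRetryable()` so that the example has no Google Cloud dependency. It always makes at least one attempt, and it gives up rather than sleeping past the deadline.

```java
import java.time.Duration;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical bounded-retry helper; names are illustrative, not the connector's API. */
class BoundedRetry {

  static <T> T call(Supplier<T> operation, Predicate<RuntimeException> retryable,
                    Duration timeout, long initialDelayMs) {
    long deadlineNanos = System.nanoTime() + timeout.toNanos();
    long delayMs = initialDelayMs;
    while (true) {
      try {
        return operation.get(); // the first attempt always runs
      } catch (RuntimeException e) {
        long remainingMs = (deadlineNanos - System.nanoTime()) / 1_000_000;
        // Give up on non-retryable errors, or when the next sleep would outlast the deadline.
        if (!retryable.test(e) || delayMs >= remainingMs) {
          throw e;
        }
        try {
          Thread.sleep(delayMs);
        } catch (InterruptedException interrupted) {
          Thread.currentThread().interrupt(); // preserve the interrupt flag
          throw e;
        }
        delayMs *= 2; // the delay doubles after every failed attempt
      }
    }
  }
}
```

A caller would wrap the lookup in the supplier, for example `BoundedRetry.call(() -> lookupTable(id), e -> isTransient(e), Duration.ofSeconds(10), 100)`, where `lookupTable` and `isTransient` stand in for whatever the caller actually uses.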
Error:
Tests: