1919import com .netflix .concurrency .limits .MetricRegistry ;
2020import com .netflix .concurrency .limits .MetricRegistry .SampleListener ;
2121import com .netflix .concurrency .limits .internal .EmptyMetricRegistry ;
22- import com .netflix .concurrency .limits .internal .Preconditions ;
2322import com .netflix .concurrency .limits .limit .functions .Log10RootFunction ;
2423import org .slf4j .Logger ;
2524import org .slf4j .LoggerFactory ;
2625
2726import java .util .concurrent .ThreadLocalRandom ;
2827import java .util .concurrent .TimeUnit ;
28+ import java .util .function .DoubleUnaryOperator ;
2929import java .util .function .Function ;
30+ import java .util .function .IntUnaryOperator ;
3031
3132/**
3233 * Limiter based on TCP Vegas where the limit increases by alpha if the queue_use is small ({@literal <} alpha)
4142public class VegasLimit extends AbstractLimit {
4243 private static final Logger LOG = LoggerFactory .getLogger (VegasLimit .class );
4344
44- private static final Function < Integer , Integer > LOG10 = Log10RootFunction .create (0 );
45+ private static final IntUnaryOperator LOG10 = Log10RootFunction .create (0 );
4546
4647 public static class Builder {
4748 private int initialLimit = 20 ;
4849 private int maxConcurrency = 1000 ;
4950 private MetricRegistry registry = EmptyMetricRegistry .INSTANCE ;
5051 private double smoothing = 1.0 ;
5152
52- private Function < Integer , Integer > alphaFunc = (limit ) -> 3 * LOG10 .apply (limit . intValue () );
53- private Function < Integer , Integer > betaFunc = (limit ) -> 6 * LOG10 .apply (limit . intValue () );
54- private Function < Integer , Integer > thresholdFunc = ( limit ) -> LOG10 . apply ( limit . intValue ()) ;
55- private Function < Double , Double > increaseFunc = (limit ) -> limit + LOG10 .apply ( limit . intValue () );
56- private Function < Double , Double > decreaseFunc = (limit ) -> limit - LOG10 .apply ( limit . intValue () );
53+ private IntUnaryOperator alphaFunc = (limit ) -> 3 * LOG10 .applyAsInt (limit );
54+ private IntUnaryOperator betaFunc = (limit ) -> 6 * LOG10 .applyAsInt (limit );
55+ private IntUnaryOperator thresholdFunc = LOG10 ;
56+ private DoubleUnaryOperator increaseFunc = (limit ) -> limit + LOG10 .applyAsInt (( int ) limit );
57+ private DoubleUnaryOperator decreaseFunc = (limit ) -> limit - LOG10 .applyAsInt (( int ) limit );
5758 private int probeMultiplier = 30 ;
5859
5960 private Builder () {
@@ -74,13 +75,31 @@ public Builder alpha(int alpha) {
7475 this .alphaFunc = (ignore ) -> alpha ;
7576 return this ;
7677 }
77-
78+
79+ /**
80+ * @deprecated use {@link #thresholdFunction(IntUnaryOperator)}
81+ */
82+ @ Deprecated
7883 public Builder threshold (Function <Integer , Integer > threshold ) {
84+ this .thresholdFunc = threshold ::apply ;
85+ return this ;
86+ }
87+
88+ public Builder thresholdFunction (IntUnaryOperator threshold ) {
7989 this .thresholdFunc = threshold ;
8090 return this ;
8191 }
82-
92+
93+ /**
94+ * @deprecated use {@link #alphaFunction(IntUnaryOperator)}
95+ */
96+ @ Deprecated
8397 public Builder alpha (Function <Integer , Integer > alpha ) {
98+ this .alphaFunc = alpha ::apply ;
99+ return this ;
100+ }
101+
102+ public Builder alphaFunction (IntUnaryOperator alpha ) {
84103 this .alphaFunc = alpha ;
85104 return this ;
86105 }
@@ -89,18 +108,45 @@ public Builder beta(int beta) {
89108 this .betaFunc = (ignore ) -> beta ;
90109 return this ;
91110 }
92-
111+
112+ /**
113+ * @deprecated use {@link #betaFunction(IntUnaryOperator)}
114+ */
115+ @ Deprecated
93116 public Builder beta (Function <Integer , Integer > beta ) {
117+ this .betaFunc = beta ::apply ;
118+ return this ;
119+ }
120+
121+ public Builder betaFunction (IntUnaryOperator beta ) {
94122 this .betaFunc = beta ;
95123 return this ;
96124 }
97-
125+
126+ /**
127+ * @deprecated use {@link #increaseFunction(DoubleUnaryOperator)}
128+ */
129+ @ Deprecated
98130 public Builder increase (Function <Double , Double > increase ) {
131+ this .increaseFunc = increase ::apply ;
132+ return this ;
133+ }
134+
135+ public Builder increaseFunction (DoubleUnaryOperator increase ) {
99136 this .increaseFunc = increase ;
100137 return this ;
101138 }
102-
139+
140+ /**
141+ * @deprecated use {@link #decreaseFunction(DoubleUnaryOperator)}
142+ */
143+ @ Deprecated
103144 public Builder decrease (Function <Double , Double > decrease ) {
145+ this .decreaseFunc = decrease ::apply ;
146+ return this ;
147+ }
148+
149+ public Builder decreaseFunction (DoubleUnaryOperator decrease ) {
104150 this .decreaseFunc = decrease ;
105151 return this ;
106152 }
@@ -164,11 +210,11 @@ public static VegasLimit newDefault() {
164210 private final int maxLimit ;
165211
166212 private final double smoothing ;
167- private final Function < Integer , Integer > alphaFunc ;
168- private final Function < Integer , Integer > betaFunc ;
169- private final Function < Integer , Integer > thresholdFunc ;
170- private final Function < Double , Double > increaseFunc ;
171- private final Function < Double , Double > decreaseFunc ;
213+ private final IntUnaryOperator alphaFunc ;
214+ private final IntUnaryOperator betaFunc ;
215+ private final IntUnaryOperator thresholdFunc ;
216+ private final DoubleUnaryOperator increaseFunc ;
217+ private final DoubleUnaryOperator decreaseFunc ;
172218 private final SampleListener rttSampleListener ;
173219 private final int probeMultiplier ;
174220 private int probeCount = 0 ;
@@ -201,69 +247,77 @@ private boolean shouldProbe() {
201247
202248 @ Override
203249 protected int _update (long startTime , long rtt , int inflight , boolean didDrop ) {
204- Preconditions .checkArgument (rtt > 0 , "rtt must be >0 but got " + rtt );
250+ if (rtt <= 0 ) {
251+ throw new IllegalArgumentException ("rtt must be >0 but got " + rtt );
252+ }
205253
206254 probeCount ++;
207255 if (shouldProbe ()) {
208- LOG .debug ("Probe MinRTT {}" , TimeUnit .NANOSECONDS .toMicros (rtt ) / 1000.0 );
256+ if (LOG .isDebugEnabled ()) {
257+ LOG .debug ("Probe MinRTT {}" , TimeUnit .NANOSECONDS .toMicros (rtt ) / 1000.0 );
258+ }
209259 resetProbeJitter ();
210260 probeCount = 0 ;
211261 rtt_noload = rtt ;
212- return (int )estimatedLimit ;
262+ return (int ) estimatedLimit ;
213263 }
214-
264+
265+ long rtt_noload = this .rtt_noload ;
215266 if (rtt_noload == 0 || rtt < rtt_noload ) {
216- LOG .debug ("New MinRTT {}" , TimeUnit .NANOSECONDS .toMicros (rtt ) / 1000.0 );
217- rtt_noload = rtt ;
218- return (int )estimatedLimit ;
267+ if (LOG .isDebugEnabled ()) {
268+ LOG .debug ("New MinRTT {}" , TimeUnit .NANOSECONDS .toMicros (rtt ) / 1000.0 );
269+ }
270+ this .rtt_noload = rtt ;
271+ return (int ) estimatedLimit ;
219272 }
220-
273+
221274 rttSampleListener .addLongSample (rtt_noload );
222275
223- return updateEstimatedLimit (rtt , inflight , didDrop );
276+ return updateEstimatedLimit (rtt , rtt_noload , inflight , didDrop );
224277 }
225278
226- private int updateEstimatedLimit (long rtt , int inflight , boolean didDrop ) {
227- final int queueSize = (int ) Math .ceil (estimatedLimit * (1 - (double )rtt_noload / rtt ));
279+ private int updateEstimatedLimit (long rtt , long rtt_noload , int inflight , boolean didDrop ) {
280+ double estimatedLimit = this .estimatedLimit ;
281+ final int queueSize = (int ) Math .ceil (estimatedLimit * (1 - (double ) rtt_noload / rtt ));
228282
229283 double newLimit ;
230284 // Treat any drop (i.e timeout) as needing to reduce the limit
231285 if (didDrop ) {
232- newLimit = decreaseFunc .apply (estimatedLimit );
286+ newLimit = decreaseFunc .applyAsDouble (estimatedLimit );
233287 // Prevent upward drift if not close to the limit
234288 } else if (inflight * 2 < estimatedLimit ) {
235- return (int )estimatedLimit ;
289+ return (int ) estimatedLimit ;
236290 } else {
237- int alpha = alphaFunc .apply ((int )estimatedLimit );
238- int beta = betaFunc .apply ((int )estimatedLimit );
239- int threshold = this . thresholdFunc .apply ((int )estimatedLimit );
291+ int alpha = alphaFunc .applyAsInt ((int ) estimatedLimit );
292+ int beta = betaFunc .applyAsInt ((int ) estimatedLimit );
293+ int threshold = thresholdFunc .applyAsInt ((int ) estimatedLimit );
240294
241295 // Aggressive increase when no queuing
242296 if (queueSize <= threshold ) {
243297 newLimit = estimatedLimit + beta ;
244298 // Increase the limit if queue is still manageable
245299 } else if (queueSize < alpha ) {
246- newLimit = increaseFunc .apply (estimatedLimit );
300+ newLimit = increaseFunc .applyAsDouble (estimatedLimit );
247301 // Detecting latency so decrease
248302 } else if (queueSize > beta ) {
249- newLimit = decreaseFunc .apply (estimatedLimit );
303+ newLimit = decreaseFunc .applyAsDouble (estimatedLimit );
250304 // We're within he sweet spot so nothing to do
251305 } else {
252- return (int )estimatedLimit ;
306+ return (int ) estimatedLimit ;
253307 }
254308 }
255309
256310 newLimit = Math .max (1 , Math .min (maxLimit , newLimit ));
257311 newLimit = (1 - smoothing ) * estimatedLimit + smoothing * newLimit ;
258- if ((int )newLimit != (int )estimatedLimit && LOG .isDebugEnabled ()) {
312+ if ((int ) newLimit != (int ) estimatedLimit && LOG .isDebugEnabled ()) {
259313 LOG .debug ("New limit={} minRtt={} ms winRtt={} ms queueSize={}" ,
260- (int )newLimit ,
314+ (int ) newLimit ,
261315 TimeUnit .NANOSECONDS .toMicros (rtt_noload ) / 1000.0 ,
262316 TimeUnit .NANOSECONDS .toMicros (rtt ) / 1000.0 ,
263317 queueSize );
264318 }
265- estimatedLimit = newLimit ;
266- return (int )estimatedLimit ;
319+ this . estimatedLimit = newLimit ;
320+ return (int ) newLimit ;
267321 }
268322
269323 @ Override
0 commit comments