Skip to content

Commit 299a61e

Browse files
authored
util: Outlier detection tracer delegation (#10459) (#10483)
OutlierDetectionLoadBalancer did not delegate calls to an existing ClientStreamTracer from the tracer it installed. This change has the OD tracer delegate all calls to the underlying one.
1 parent df9148c commit 299a61e

File tree

2 files changed

+111
-27
lines changed

2 files changed

+111
-27
lines changed

core/src/main/java/io/grpc/util/OutlierDetectionLoadBalancer.java

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -394,47 +394,55 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
394394

395395
Subchannel subchannel = pickResult.getSubchannel();
396396
if (subchannel != null) {
397-
return PickResult.withSubchannel(subchannel,
398-
new ResultCountingClientStreamTracerFactory(
399-
subchannel.getAttributes().get(ADDRESS_TRACKER_ATTR_KEY)));
397+
return PickResult.withSubchannel(subchannel, new ResultCountingClientStreamTracerFactory(
398+
subchannel.getAttributes().get(ADDRESS_TRACKER_ATTR_KEY),
399+
pickResult.getStreamTracerFactory()));
400400
}
401401

402402
return pickResult;
403403
}
404404

405405
/**
406-
* Builds instances of {@link ResultCountingClientStreamTracer}.
406+
* Builds instances of a {@link ClientStreamTracer} that increments the call count in the
407+
* tracker for each closed stream.
407408
*/
408409
class ResultCountingClientStreamTracerFactory extends ClientStreamTracer.Factory {
409410

410411
private final AddressTracker tracker;
411412

412-
ResultCountingClientStreamTracerFactory(AddressTracker tracker) {
413-
this.tracker = tracker;
414-
}
415-
416-
@Override
417-
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
418-
return new ResultCountingClientStreamTracer(tracker);
419-
}
420-
}
413+
@Nullable
414+
private final ClientStreamTracer.Factory delegateFactory;
421415

422-
/**
423-
* Counts the results (successful/unsuccessful) of a particular {@link
424-
* OutlierDetectionSubchannel}s streams and increments the counter in the associated {@link
425-
* AddressTracker}.
426-
*/
427-
class ResultCountingClientStreamTracer extends ClientStreamTracer {
428-
429-
AddressTracker tracker;
430-
431-
public ResultCountingClientStreamTracer(AddressTracker tracker) {
416+
ResultCountingClientStreamTracerFactory(AddressTracker tracker,
417+
@Nullable ClientStreamTracer.Factory delegateFactory) {
432418
this.tracker = tracker;
419+
this.delegateFactory = delegateFactory;
433420
}
434421

435422
@Override
436-
public void streamClosed(Status status) {
437-
tracker.incrementCallCount(status.isOk());
423+
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
424+
if (delegateFactory != null) {
425+
ClientStreamTracer delegateTracer = delegateFactory.newClientStreamTracer(info, headers);
426+
return new ForwardingClientStreamTracer() {
427+
@Override
428+
protected ClientStreamTracer delegate() {
429+
return delegateTracer;
430+
}
431+
432+
@Override
433+
public void streamClosed(Status status) {
434+
tracker.incrementCallCount(status.isOk());
435+
delegate().streamClosed(status);
436+
}
437+
};
438+
} else {
439+
return new ClientStreamTracer() {
440+
@Override
441+
public void streamClosed(Status status) {
442+
tracker.incrementCallCount(status.isOk());
443+
}
444+
};
445+
}
438446
}
439447
}
440448
}

core/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.mockito.Mockito.mock;
2525
import static org.mockito.Mockito.times;
2626
import static org.mockito.Mockito.verify;
27+
import static org.mockito.Mockito.verifyNoInteractions;
2728
import static org.mockito.Mockito.when;
2829

2930
import com.google.common.collect.ImmutableList;
@@ -46,6 +47,7 @@
4647
import io.grpc.LoadBalancer.SubchannelPicker;
4748
import io.grpc.LoadBalancer.SubchannelStateListener;
4849
import io.grpc.LoadBalancerProvider;
50+
import io.grpc.Metadata;
4951
import io.grpc.Status;
5052
import io.grpc.SynchronizationContext;
5153
import io.grpc.internal.FakeClock;
@@ -96,6 +98,10 @@ public class OutlierDetectionLoadBalancerTest {
9698
private Helper mockHelper;
9799
@Mock
98100
private SocketAddress mockSocketAddress;
101+
@Mock
102+
private ClientStreamTracer.Factory mockStreamTracerFactory;
103+
@Mock
104+
private ClientStreamTracer mockStreamTracer;
99105

100106
@Captor
101107
private ArgumentCaptor<ConnectivityState> connectivityStateCaptor;
@@ -193,6 +199,9 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
193199
}
194200
});
195201

202+
when(mockStreamTracerFactory.newClientStreamTracer(any(),
203+
any())).thenReturn(mockStreamTracer);
204+
196205
loadBalancer = new OutlierDetectionLoadBalancer(mockHelper, fakeClock.getTimeProvider());
197206
}
198207

@@ -355,6 +364,72 @@ public void delegatePick() throws Exception {
355364
readySubchannel);
356365
}
357366

367+
/**
368+
* Any ClientStreamTracer.Factory set by the delegate picker should still get used.
369+
*/
370+
@Test
371+
public void delegatePickTracerFactoryPreserved() throws Exception {
372+
OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder()
373+
.setSuccessRateEjection(new SuccessRateEjection.Builder().build())
374+
.setChildPolicy(new PolicySelection(fakeLbProvider, null)).build();
375+
376+
loadBalancer.acceptResolvedAddresses(buildResolvedAddress(config, servers.get(0)));
377+
378+
// Make one of the subchannels READY.
379+
final Subchannel readySubchannel = subchannels.values().iterator().next();
380+
deliverSubchannelState(readySubchannel, ConnectivityStateInfo.forNonError(READY));
381+
382+
verify(mockHelper, times(2)).updateBalancingState(stateCaptor.capture(),
383+
pickerCaptor.capture());
384+
385+
// Make sure that we can pick the single READY subchannel.
386+
SubchannelPicker picker = pickerCaptor.getAllValues().get(1);
387+
PickResult pickResult = picker.pickSubchannel(mock(PickSubchannelArgs.class));
388+
389+
// Calls to a stream tracer created with the factory in the result should make it to a stream
390+
// tracer the underlying LB/picker is using.
391+
ClientStreamTracer clientStreamTracer = pickResult.getStreamTracerFactory()
392+
.newClientStreamTracer(ClientStreamTracer.StreamInfo.newBuilder().build(), new Metadata());
393+
clientStreamTracer.inboundHeaders();
394+
// The underlying fake LB provider is configured with a factory that returns a mock stream
395+
// tracer.
396+
verify(mockStreamTracer).inboundHeaders();
397+
}
398+
399+
/**
400+
* Assure the tracer works even when the underlying LB does not have a tracer to delegate to.
401+
*/
402+
@Test
403+
public void delegatePickTracerFactoryNotSet() throws Exception {
404+
// We set the mock factory to null to indicate that the delegate does not have its own tracer.
405+
mockStreamTracerFactory = null;
406+
407+
OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder()
408+
.setSuccessRateEjection(new SuccessRateEjection.Builder().build())
409+
.setChildPolicy(new PolicySelection(fakeLbProvider, null)).build();
410+
411+
loadBalancer.acceptResolvedAddresses(buildResolvedAddress(config, servers.get(0)));
412+
413+
// Make one of the subchannels READY.
414+
final Subchannel readySubchannel = subchannels.values().iterator().next();
415+
deliverSubchannelState(readySubchannel, ConnectivityStateInfo.forNonError(READY));
416+
417+
verify(mockHelper, times(2)).updateBalancingState(stateCaptor.capture(),
418+
pickerCaptor.capture());
419+
420+
// Make sure that we can pick the single READY subchannel.
421+
SubchannelPicker picker = pickerCaptor.getAllValues().get(1);
422+
PickResult pickResult = picker.pickSubchannel(mock(PickSubchannelArgs.class));
423+
424+
// With no delegate tracers factory a call to the OD tracer should still work
425+
ClientStreamTracer clientStreamTracer = pickResult.getStreamTracerFactory()
426+
.newClientStreamTracer(ClientStreamTracer.StreamInfo.newBuilder().build(), new Metadata());
427+
clientStreamTracer.inboundHeaders();
428+
429+
// Sanity check to make sure the delegate tracer does not get called.
430+
verifyNoInteractions(mockStreamTracer);
431+
}
432+
358433
/**
359434
* The success rate algorithm leaves a healthy set of addresses alone.
360435
*/
@@ -1121,7 +1196,7 @@ void assertEjectedSubchannels(Set<SocketAddress> addresses) {
11211196
}
11221197

11231198
/** Round robin like fake load balancer. */
1124-
private static final class FakeLoadBalancer extends LoadBalancer {
1199+
private final class FakeLoadBalancer extends LoadBalancer {
11251200
private final Helper helper;
11261201

11271202
List<Subchannel> subchannelList;
@@ -1159,7 +1234,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
11591234
if (lastPickIndex < 0 || lastPickIndex > subchannelList.size() - 1) {
11601235
lastPickIndex = 0;
11611236
}
1162-
return PickResult.withSubchannel(subchannelList.get(lastPickIndex++));
1237+
return PickResult.withSubchannel(subchannelList.get(lastPickIndex++),
1238+
mockStreamTracerFactory);
11631239
}
11641240
};
11651241
helper.updateBalancingState(state, picker);

0 commit comments

Comments
 (0)