Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,15 +360,19 @@ type Balancer interface {
// call SubConn.Shutdown for its existing SubConns; however, this will be
// required in a future release, so it is recommended.
Close()
// ExitIdle instructs the LB policy to reconnect to backends / exit the
// IDLE state, if appropriate and possible. Note that SubConns that enter
// the IDLE state will not reconnect until SubConn.Connect is called.
ExitIdle()
}

// ExitIdler is an optional interface for balancers to implement. If
// implemented, ExitIdle will be called when ClientConn.Connect is called, if
// the ClientConn is idle. If unimplemented, ClientConn.Connect will cause
// all SubConns to connect.
//
// Notice: it will be required for all balancers to implement this in a future
// release.
// Deprecated: All balancers must implement this interface. This interface will
// be removed in a future release.
type ExitIdler interface {
// ExitIdle instructs the LB policy to reconnect to backends / exit the
// IDLE state, if appropriate and possible. Note that SubConns that enter
Expand Down
40 changes: 30 additions & 10 deletions balancer/endpointsharding/endpointsharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,15 @@ type ChildState struct {

// Balancer exposes only the ExitIdler interface of the child LB policy.
// Other methods of the child policy are called only by endpointsharding.
Balancer balancer.ExitIdler
Balancer ExitIdler
}

// ExitIdler provides access to only the ExitIdle method of the child balancer.
// It is the type of ChildState.Balancer, so that holders of a ChildState can
// ask a child to exit IDLE without reaching the child's other methods.
type ExitIdler interface {
// ExitIdle instructs the LB policy to reconnect to backends / exit the
// IDLE state, if appropriate and possible. Note that SubConns that enter
// the IDLE state will not reconnect until SubConn.Connect is called.
ExitIdle()
}

// Options are the options to configure the behaviour of the
Expand Down Expand Up @@ -205,6 +213,20 @@ func (es *endpointSharding) Close() {
}
}

// ExitIdle asks every child balancer that has not been closed to exit the
// IDLE state. The children are called synchronously while childMu is held.
func (es *endpointSharding) ExitIdle() {
	es.childMu.Lock()
	defer es.childMu.Unlock()
	for _, bw := range es.children.Load().Values() {
		// Closed children are skipped. ExitIdle is invoked directly on the
		// child with no interface assertion, so reconnection relies entirely
		// on the child's ExitIdle implementation: if the child does nothing
		// there, no SubConns will be connected.
		if bw.isClosed {
			continue
		}
		bw.child.ExitIdle()
	}
}

// updateState updates this component's state. It sends the aggregated state,
// and a picker with round robin behavior with all the child states present if
// needed.
Expand Down Expand Up @@ -326,15 +348,13 @@ func (bw *balancerWrapper) UpdateState(state balancer.State) {
// ExitIdle pings an IDLE child balancer to exit idle in a new goroutine to
// avoid deadlocks due to synchronous balancer state updates.
func (bw *balancerWrapper) ExitIdle() {
if ei, ok := bw.child.(balancer.ExitIdler); ok {
go func() {
bw.es.childMu.Lock()
if !bw.isClosed {
ei.ExitIdle()
}
bw.es.childMu.Unlock()
}()
}
go func() {
bw.es.childMu.Lock()
if !bw.isClosed {
bw.child.ExitIdle()
}
bw.es.childMu.Unlock()
}()
}

// updateClientConnStateLocked delivers the ClientConnState to the child
Expand Down
70 changes: 70 additions & 0 deletions balancer/endpointsharding/endpointsharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ type fakePetiole struct {
bOpts balancer.BuildOptions
}

func (fp *fakePetiole) ExitIdle() {
fp.Balancer.ExitIdle()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't necessary, is it? Balancer is embedded so this should happen automatically?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, removed it. I was working on top of the previous PR that added ExitIdle implementations but didn't add the method to the Balancer interface. This was an artifact of that.

}

func (fp *fakePetiole) UpdateClientConnState(state balancer.ClientConnState) error {
if el := state.ResolverState.Endpoints; len(el) != 2 {
return fmt.Errorf("UpdateClientConnState wants two endpoints, got: %v", el)
Expand Down Expand Up @@ -285,3 +289,69 @@ func (s) TestEndpointShardingReconnectDisabled(t *testing.T) {
}
}
}

// Tests that endpointsharding doesn't automatically re-connect IDLE children
// until cc.Connect() is called. The test creates an endpoint with a single
// address. The client is connected and the active server is closed to make the
// child pickfirst enter IDLE state. The test verifies that the child pickfirst
// doesn't re-connect automatically. The test then calls cc.Connect() and
// verifies that the balancer attempts to re-connect; since the server has been
// stopped, the attempt fails and the channel enters TransientFailure.
func (s) TestEndpointShardingExitIdle(t *testing.T) {
backend := stubserver.StartTestService(t, nil)
defer backend.Stop()

mr := manual.NewBuilderWithScheme("e2e-test")
defer mr.Close()

// Derive the stub balancer's name from the test name, lower-cased and with
// any "/" removed, and register the stub under that name.
name := strings.ReplaceAll(strings.ToLower(t.Name()), "/", "")
bf := stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
// Auto-reconnect is disabled so that an IDLE child stays IDLE until
// ExitIdle is called explicitly.
epOpts := endpointsharding.Options{DisableAutoReconnect: true}
bd.Data = endpointsharding.NewBalancer(bd.ClientConn, bd.BuildOptions, balancer.Get(pickfirstleaf.Name).Build, epOpts)
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs)
},
Close: func(bd *stub.BalancerData) {
bd.Data.(balancer.Balancer).Close()
},
ExitIdle: func(bd *stub.BalancerData) {
bd.Data.(balancer.Balancer).ExitIdle()
},
}
stub.Register(name, bf)

// Select the stub balancer through the service config and hand the resolver
// a single endpoint containing only the backend's address.
json := fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, name)
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(json)
mr.InitialState(resolver.State{
Endpoints: []resolver.Endpoint{
{Addresses: []resolver.Address{{Addr: backend.Address}}},
},
ServiceConfig: sc,
})

cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("Failed to create new client: %v", err)
}
defer cc.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Make one RPC so that the channel is connected to the backend before the
// backend is stopped.
client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Errorf("client.EmptyCall() returned unexpected error: %v", err)
}

// On closing the first server, the first child balancer should enter
// IDLE. Since endpointsharding is configured not to auto-reconnect, it will
// remain IDLE and will not try to re-connect.
backend.Stop()
testutils.AwaitState(ctx, t, cc, connectivity.Idle)
shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer shortCancel()
testutils.AwaitNoStateChange(shortCtx, t, cc, connectivity.Idle)

// The balancer should try to re-connect and fail.
cc.Connect()
testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
}
4 changes: 1 addition & 3 deletions balancer/lazy/lazy.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,7 @@ func (lb *lazyBalancer) ExitIdle() {
lb.mu.Lock()
defer lb.mu.Unlock()
if lb.delegate != nil {
if d, ok := lb.delegate.(balancer.ExitIdler); ok {
d.ExitIdle()
}
lb.delegate.ExitIdle()
return
}
lb.delegate = lb.childBuilder(lb.cc, lb.buildOptions)
Expand Down
4 changes: 2 additions & 2 deletions balancer/lazy/lazy_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (s) TestExitIdle(t *testing.T) {
bd.Data = lazy.NewBalancer(bd.ClientConn, bd.BuildOptions, balancer.Get(pickfirstleaf.Name).Build)
},
ExitIdle: func(bd *stub.BalancerData) {
bd.Data.(balancer.ExitIdler).ExitIdle()
bd.Data.(balancer.Balancer).ExitIdle()
},
ResolverError: func(bd *stub.BalancerData, err error) {
bd.Data.(balancer.Balancer).ResolverError(err)
Expand Down Expand Up @@ -410,7 +410,7 @@ func (s) TestExitIdlePassthrough(t *testing.T) {
bd.Data = lazy.NewBalancer(bd.ClientConn, bd.BuildOptions, balancer.Get(pickfirstleaf.Name).Build)
},
ExitIdle: func(bd *stub.BalancerData) {
bd.Data.(balancer.ExitIdler).ExitIdle()
bd.Data.(balancer.Balancer).ExitIdle()
},
ResolverError: func(bd *stub.BalancerData, err error) {
bd.Data.(balancer.Balancer).ResolverError(err)
Expand Down
4 changes: 1 addition & 3 deletions balancer/leastrequest/leastrequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,7 @@ func (lrb *leastRequestBalancer) ResolverError(err error) {
}

func (lrb *leastRequestBalancer) ExitIdle() {
if ei, ok := lrb.child.(balancer.ExitIdler); ok { // Should always be ok, as child is endpoint sharding.
ei.ExitIdle()
}
lrb.child.ExitIdle()
}

func (lrb *leastRequestBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
Expand Down
4 changes: 1 addition & 3 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1593,9 +1593,7 @@ func (b *stateStoringBalancer) Close() {
}

func (b *stateStoringBalancer) ExitIdle() {
if ib, ok := b.Balancer.(balancer.ExitIdler); ok {
ib.ExitIdle()
}
b.Balancer.ExitIdle()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above? And more below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed. Fixed throughout.

}

type stateStoringBalancerBuilder struct {
Expand Down
4 changes: 2 additions & 2 deletions balancer/ringhash/ringhash.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (b *ringhashBalancer) updatePickerLocked() {
sort.Slice(endpointStates, func(i, j int) bool {
return endpointStates[i].hashKey < endpointStates[j].hashKey
})
var idleBalancer balancer.ExitIdler
var idleBalancer endpointsharding.ExitIdler
for _, es := range endpointStates {
connState := es.state.ConnectivityState
if connState == connectivity.Connecting {
Expand Down Expand Up @@ -394,7 +394,7 @@ type endpointState struct {
// overridden, for example based on EDS endpoint metadata.
hashKey string
weight uint32
balancer balancer.ExitIdler
balancer endpointsharding.ExitIdler

// state is updated by the balancer while receiving resolver updates from
// the channel and picker updates from its children. Access to it is guarded
Expand Down
5 changes: 1 addition & 4 deletions balancer/roundrobin/roundrobin.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,5 @@ func (b *rrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
}

func (b *rrBalancer) ExitIdle() {
// Should always be ok, as child is endpoint sharding.
if ei, ok := b.Balancer.(balancer.ExitIdler); ok {
ei.ExitIdle()
}
b.Balancer.ExitIdle()
}
4 changes: 1 addition & 3 deletions balancer/weightedroundrobin/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,7 @@
}

func (b *wrrBalancer) ExitIdle() {
if ei, ok := b.child.(balancer.ExitIdler); ok { // Should always be ok, as child is endpoint sharding.
ei.ExitIdle()
}
b.child.ExitIdle()

Check warning on line 407 in balancer/weightedroundrobin/balancer.go

View check run for this annotation

Codecov / codecov/patch

balancer/weightedroundrobin/balancer.go#L407

Added line #L407 was not covered by tests
}

// picker is the WRR policy's picker. It uses live-updating backend weights to
Expand Down
4 changes: 4 additions & 0 deletions balancer/weightedtarget/weightedtarget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ func getConfigKey(attr *attributes.Attributes) (string, bool) {
return name, ok
}

// ExitIdle forwards the request to the wrapped Balancer.
func (b *testConfigBalancer) ExitIdle() {
b.Balancer.ExitIdle()
}

func (b *testConfigBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
c, ok := s.BalancerConfig.(stringBalancerConfig)
if !ok {
Expand Down
8 changes: 8 additions & 0 deletions examples/features/orca/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func (orcaLBBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) bala
// designed to run within.
type orcaLB struct {
// cc is presumably the ClientConn supplied to Build — confirm against the
// builder, which is not fully visible here.
cc balancer.ClientConn
// sc is the SubConn created in UpdateClientConnState. It is nil until that
// assignment happens; ExitIdle calls Connect on it when it is non-nil.
sc balancer.SubConn
}

func (o *orcaLB) UpdateClientConnState(ccs balancer.ClientConnState) error {
Expand All @@ -112,6 +113,7 @@ func (o *orcaLB) UpdateClientConnState(ccs balancer.ClientConnState) error {
return fmt.Errorf("orcaLB: error creating SubConn: %v", err)
}
sc.Connect()
o.sc = sc

// Register a simple ORCA OOB listener on the SubConn. We request a 1
// second report interval, but in this example the server indicated the
Expand All @@ -124,6 +126,12 @@ func (o *orcaLB) UpdateClientConnState(ccs balancer.ClientConnState) error {

func (o *orcaLB) ResolverError(error) {}

// ExitIdle asks the stored SubConn, if any, to connect. It does nothing when
// no SubConn has been recorded yet.
func (o *orcaLB) ExitIdle() {
	if o.sc == nil {
		return
	}
	o.sc.Connect()
}

// TODO: unused; remove when no longer required.
func (o *orcaLB) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {}

Expand Down
10 changes: 1 addition & 9 deletions internal/balancer/gracefulswitch/gracefulswitch.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,15 +223,7 @@ func (gsb *Balancer) ExitIdle() {
// There is no need to protect this read with a mutex, as the write to the
// Balancer field happens in SwitchTo, which completes before this can be
// called.
if ei, ok := balToUpdate.Balancer.(balancer.ExitIdler); ok {
ei.ExitIdle()
return
}
gsb.mu.Lock()
defer gsb.mu.Unlock()
for sc := range balToUpdate.subconns {
sc.Connect()
}
balToUpdate.Balancer.ExitIdle()
}

// updateSubConnState forwards the update to the appropriate child.
Expand Down
24 changes: 5 additions & 19 deletions internal/balancer/gracefulswitch/gracefulswitch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,9 +795,7 @@ func (s) TestInlineCallbackInBuild(t *testing.T) {
}
}

// TestExitIdle tests the ExitIdle operation on the Graceful Switch Balancer for
// both possible codepaths, one where the child implements ExitIdler interface
// and one where the child doesn't implement ExitIdler interface.
// TestExitIdle tests the ExitIdle operation on the Graceful Switch Balancer.
func (s) TestExitIdle(t *testing.T) {
_, gsb := setup(t)
// switch to a balancer that implements ExitIdle{} (will populate current).
Expand All @@ -811,22 +809,6 @@ func (s) TestExitIdle(t *testing.T) {
if err := currBal.waitForExitIdle(ctx); err != nil {
t.Fatal(err)
}

// switch to a balancer that doesn't implement ExitIdle{} (will populate
// pending).
gsb.SwitchTo(verifyBalancerBuilder{})
// call exitIdle concurrently with newSubConn to make sure there is not a
// data race.
done := make(chan struct{})
go func() {
gsb.ExitIdle()
close(done)
}()
pendBal := gsb.balancerPending.Balancer.(*verifyBalancer)
for i := 0; i < 10; i++ {
pendBal.newSubConn([]resolver.Address{}, balancer.NewSubConnOptions{})
}
<-done
}

const balancerName1 = "mock_balancer_1"
Expand Down Expand Up @@ -1010,6 +992,8 @@ func (vb *verifyBalancer) UpdateClientConnState(balancer.ClientConnState) error
return nil
}

// ExitIdle is a no-op.
func (vb *verifyBalancer) ExitIdle() {}

func (vb *verifyBalancer) ResolverError(error) {}

func (vb *verifyBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
Expand Down Expand Up @@ -1068,6 +1052,8 @@ func (bcb *buildCallbackBal) UpdateClientConnState(balancer.ClientConnState) err

func (bcb *buildCallbackBal) ResolverError(error) {}

// ExitIdle is a no-op.
func (bcb *buildCallbackBal) ExitIdle() {}

func (bcb *buildCallbackBal) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
panic(fmt.Sprintf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state))
}
Expand Down
3 changes: 3 additions & 0 deletions internal/balancer/nop/nop.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,6 @@

// Close is a no-op.
func (b *bal) Close() {}

// ExitIdle is a no-op: this balancer takes no action when asked to exit the
// IDLE state.
func (b *bal) ExitIdle() {}

Check warning on line 65 in internal/balancer/nop/nop.go

View check run for this annotation

Codecov / codecov/patch

internal/balancer/nop/nop.go#L65

Added line #L65 was not covered by tests
6 changes: 6 additions & 0 deletions interop/orcalb.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ type orcab struct {
report *v3orcapb.OrcaLoadReport
}

// ExitIdle asks the balancer's SubConn, if one is set, to connect. It does
// nothing while sc is nil.
func (o *orcab) ExitIdle() {
	if o.sc == nil {
		return
	}
	o.sc.Connect()
}

func (o *orcab) UpdateClientConnState(s balancer.ClientConnState) error {
if o.sc != nil {
o.sc.UpdateAddresses(s.ResolverState.Addresses)
Expand Down
2 changes: 1 addition & 1 deletion test/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1034,7 +1034,7 @@ func (s) TestSubConn_RegisterHealthListener(t *testing.T) {
return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs)
},
ExitIdle: func(bd *stub.BalancerData) {
bd.Data.(balancer.ExitIdler).ExitIdle()
bd.Data.(balancer.Balancer).ExitIdle()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a very common pattern. I wonder if it makes sense for the stub balancer stuff to support an embedded child balancer by default? (Not in this PR for sure.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened an issue: #8371

},
}

Expand Down
Loading
Loading