Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 41 additions & 72 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ package grpc

import (
"errors"
"fmt"
"net"
"strings"
"sync"
"time"

"golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
Expand Down Expand Up @@ -445,39 +445,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
return cc, nil
}

// ConnectivityState indicates the state of a client connection.
type ConnectivityState int

const (
// Idle indicates the ClientConn is idle.
Idle ConnectivityState = iota
// Connecting indicates the ClienConn is connecting.
Connecting
// Ready indicates the ClientConn is ready for work.
Ready
// TransientFailure indicates the ClientConn has seen a failure but expects to recover.
TransientFailure
// Shutdown indicates the ClientConn has started shutting down.
Shutdown
)

func (s ConnectivityState) String() string {
switch s {
case Idle:
return "IDLE"
case Connecting:
return "CONNECTING"
case Ready:
return "READY"
case TransientFailure:
return "TRANSIENT_FAILURE"
case Shutdown:
return "SHUTDOWN"
default:
panic(fmt.Sprintf("unknown connectivity state: %d", s))
}
}

// connectivityStateEvaluator gets updated by addrConns when their
// states transition, based on which it evaluates the state of
// ClientConn.
Expand All @@ -492,55 +459,55 @@ type connectivityStateEvaluator struct {

// recordTransition records state change happening in every addrConn and based on
// that it evaluates what state the ClientConn is in.
// It can only transition between Ready, Connecting and TransientFailure. Other states,
// Idle and Shutdown are transitioned into by ClientConn; in the begining of the connection
// It can only transition between connectivity.Ready, connectivity.Connecting and connectivity.TransientFailure. Other states,
// Idle and connectivity.Shutdown are transitioned into by ClientConn; in the begining of the connection
// before any addrConn is created ClientConn is in idle state. In the end when ClientConn
// closes it is in Shutdown state.
// closes it is in connectivity.Shutdown state.
// TODO Note that in later releases, a ClientConn with no activity will be put into an Idle state.
func (cse *connectivityStateEvaluator) recordTransition(oldState, newState ConnectivityState) {
func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) {
cse.mu.Lock()
defer cse.mu.Unlock()

// Update counters.
for idx, state := range []ConnectivityState{oldState, newState} {
for idx, state := range []connectivity.State{oldState, newState} {
updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
switch state {
case Ready:
case connectivity.Ready:
cse.numReady += updateVal
case Connecting:
case connectivity.Connecting:
cse.numConnecting += updateVal
case TransientFailure:
case connectivity.TransientFailure:
cse.numTransientFailure += updateVal
}
}

// Evaluate.
if cse.numReady > 0 {
cse.csMgr.updateState(Ready)
cse.csMgr.updateState(connectivity.Ready)
return
}
if cse.numConnecting > 0 {
cse.csMgr.updateState(Connecting)
cse.csMgr.updateState(connectivity.Connecting)
return
}
cse.csMgr.updateState(TransientFailure)
cse.csMgr.updateState(connectivity.TransientFailure)
}

// connectivityStateManager keeps the ConnectivityState of ClientConn.
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
mu sync.Mutex
state ConnectivityState
state connectivity.State
notifyChan chan struct{}
}

// updateState updates the ConnectivityState of ClientConn.
// updateState updates the connectivity.State of ClientConn.
// If there's a change it notifies goroutines waiting on state change to
// happen.
func (csm *connectivityStateManager) updateState(state ConnectivityState) {
func (csm *connectivityStateManager) updateState(state connectivity.State) {
csm.mu.Lock()
defer csm.mu.Unlock()
if csm.state == Shutdown {
if csm.state == connectivity.Shutdown {
return
}
if csm.state == state {
Expand All @@ -554,7 +521,7 @@ func (csm *connectivityStateManager) updateState(state ConnectivityState) {
}
}

func (csm *connectivityStateManager) getState() ConnectivityState {
func (csm *connectivityStateManager) getState() connectivity.State {
csm.mu.Lock()
defer csm.mu.Unlock()
return csm.state
Expand Down Expand Up @@ -587,9 +554,10 @@ type ClientConn struct {
mkp keepalive.ClientParameters
}

// WaitForStateChange waits until the ConnectivityState of ClientConn changes from sourceState or
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
// ctx expires. A true value is returned in former case and false in latter.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeesh, we didn't write the "experimental" disclaimer here. Can you add it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) bool {
// This is an EXPERIMENTAL API.
func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
ch := cc.csMgr.getNotifyChan()
if cc.csMgr.getState() != sourceState {
return true
Expand All @@ -602,8 +570,9 @@ func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState Connec
}
}

// GetState returns the ConnectivityState of ClientConn.
func (cc *ClientConn) GetState() ConnectivityState {
// GetState returns the connectivity.State of ClientConn.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// This is an EXPERIMENTAL API.
func (cc *ClientConn) GetState() connectivity.State {
return cc.csMgr.getState()
}

Expand Down Expand Up @@ -855,7 +824,7 @@ func (cc *ClientConn) Close() error {
}
conns := cc.conns
cc.conns = nil
cc.csMgr.updateState(Shutdown)
cc.csMgr.updateState(connectivity.Shutdown)
cc.mu.Unlock()
if cc.dopts.balancer != nil {
cc.dopts.balancer.Close()
Expand All @@ -879,7 +848,7 @@ type addrConn struct {
csEvltr *connectivityStateEvaluator

mu sync.Mutex
state ConnectivityState
state connectivity.State
down func(error) // the handler called when a connection is down.
// ready is closed and becomes nil when a new transport is up or failed
// due to timeout.
Expand Down Expand Up @@ -926,7 +895,7 @@ func (ac *addrConn) errorf(format string, a ...interface{}) {
// - otherwise, it will be closed.
func (ac *addrConn) resetTransport(drain bool) error {
ac.mu.Lock()
if ac.state == Shutdown {
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return errConnClosing
}
Expand All @@ -936,7 +905,7 @@ func (ac *addrConn) resetTransport(drain bool) error {
ac.down = nil
}
oldState := ac.state
ac.state = Connecting
ac.state = connectivity.Connecting
ac.csEvltr.recordTransition(oldState, ac.state)
t := ac.transport
ac.transport = nil
Expand All @@ -949,7 +918,7 @@ func (ac *addrConn) resetTransport(drain bool) error {
ac.cc.mu.RUnlock()
for retries := 0; ; retries++ {
ac.mu.Lock()
if ac.state == Shutdown {
if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
return errConnClosing
Expand Down Expand Up @@ -977,14 +946,14 @@ func (ac *addrConn) resetTransport(drain bool) error {
}
grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
ac.mu.Lock()
if ac.state == Shutdown {
if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
return errConnClosing
}
ac.errorf("transient failure: %v", err)
oldState = ac.state
ac.state = TransientFailure
ac.state = connectivity.TransientFailure
ac.csEvltr.recordTransition(oldState, ac.state)
if ac.ready != nil {
close(ac.ready)
Expand All @@ -1003,14 +972,14 @@ func (ac *addrConn) resetTransport(drain bool) error {
}
ac.mu.Lock()
ac.printf("ready")
if ac.state == Shutdown {
if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
newTransport.Close()
return errConnClosing
}
oldState = ac.state
ac.state = Ready
ac.state = connectivity.Ready
ac.csEvltr.recordTransition(oldState, ac.state)
ac.transport = newTransport
if ac.ready != nil {
Expand Down Expand Up @@ -1081,13 +1050,13 @@ func (ac *addrConn) transportMonitor() {
default:
}
ac.mu.Lock()
if ac.state == Shutdown {
if ac.state == connectivity.Shutdown {
// ac has been shutdown.
ac.mu.Unlock()
return
}
oldState := ac.state
ac.state = TransientFailure
ac.state = connectivity.TransientFailure
ac.csEvltr.recordTransition(oldState, ac.state)
ac.mu.Unlock()
if err := ac.resetTransport(false); err != nil {
Expand All @@ -1107,12 +1076,12 @@ func (ac *addrConn) transportMonitor() {
}

// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
// iv) transport is in TransientFailure and there is a balancer/failfast is true.
// iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true.
func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
for {
ac.mu.Lock()
switch {
case ac.state == Shutdown:
case ac.state == connectivity.Shutdown:
if failfast || !hasBalancer {
// RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
err := ac.tearDownErr
Expand All @@ -1121,11 +1090,11 @@ func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (trans
}
ac.mu.Unlock()
return nil, errConnClosing
case ac.state == Ready:
case ac.state == connectivity.Ready:
ct := ac.transport
ac.mu.Unlock()
return ct, nil
case ac.state == TransientFailure:
case ac.state == connectivity.TransientFailure:
if failfast || hasBalancer {
ac.mu.Unlock()
return nil, errConnUnavailable
Expand Down Expand Up @@ -1167,11 +1136,11 @@ func (ac *addrConn) tearDown(err error) {
// address removal and GoAway.
ac.transport.GracefulClose()
}
if ac.state == Shutdown {
if ac.state == connectivity.Shutdown {
return
}
oldState := ac.state
ac.state = Shutdown
ac.state = connectivity.Shutdown
ac.tearDownErr = err
ac.csEvltr.recordTransition(oldState, ac.state)
if ac.events != nil {
Expand Down
11 changes: 6 additions & 5 deletions clientconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@ import (

"golang.org/x/net/context"

"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/naming"
"google.golang.org/grpc/testdata"
)

func assertState(wantState ConnectivityState, cc *ClientConn) (ConnectivityState, bool) {
func assertState(wantState connectivity.State, cc *ClientConn) (connectivity.State, bool) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
var state ConnectivityState
var state connectivity.State
for state = cc.GetState(); state != wantState && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
}
return state, state == wantState
Expand All @@ -54,7 +55,7 @@ func TestConnectivityStates(t *testing.T) {
t.Fatalf("Dial(\"foo.bar.com\", WithBalancer(_)) = _, %v, want _ <nil>", err)
}
defer cc.Close()
wantState := Ready
wantState := connectivity.Ready
if state, ok := assertState(wantState, cc); !ok {
t.Fatalf("asserState(%s) = %s, false, want %s, true", wantState, state, wantState)
}
Expand All @@ -66,7 +67,7 @@ func TestConnectivityStates(t *testing.T) {
},
}
resolver.w.inject(update)
wantState = TransientFailure
wantState = connectivity.TransientFailure
if state, ok := assertState(wantState, cc); !ok {
t.Fatalf("asserState(%s) = %s, false, want %s, true", wantState, state, wantState)
}
Expand All @@ -75,7 +76,7 @@ func TestConnectivityStates(t *testing.T) {
Addr: "localhost:" + servers[1].port,
}
resolver.w.inject(update)
wantState = Ready
wantState = connectivity.Ready
if state, ok := assertState(wantState, cc); !ok {
t.Fatalf("asserState(%s) = %s, false, want %s, true", wantState, state, wantState)
}
Expand Down
Loading