-
Notifications
You must be signed in to change notification settings - Fork 4.6k
Add and use connectivity package for states #1430
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,14 +20,14 @@ package grpc | |
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"net" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
"golang.org/x/net/context" | ||
"golang.org/x/net/trace" | ||
"google.golang.org/grpc/connectivity" | ||
"google.golang.org/grpc/credentials" | ||
"google.golang.org/grpc/grpclog" | ||
"google.golang.org/grpc/keepalive" | ||
|
@@ -445,39 +445,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * | |
return cc, nil | ||
} | ||
|
||
// ConnectivityState indicates the state of a client connection. | ||
type ConnectivityState int | ||
|
||
const ( | ||
// Idle indicates the ClientConn is idle. | ||
Idle ConnectivityState = iota | ||
// Connecting indicates the ClienConn is connecting. | ||
Connecting | ||
// Ready indicates the ClientConn is ready for work. | ||
Ready | ||
// TransientFailure indicates the ClientConn has seen a failure but expects to recover. | ||
TransientFailure | ||
// Shutdown indicates the ClientConn has started shutting down. | ||
Shutdown | ||
) | ||
|
||
func (s ConnectivityState) String() string { | ||
switch s { | ||
case Idle: | ||
return "IDLE" | ||
case Connecting: | ||
return "CONNECTING" | ||
case Ready: | ||
return "READY" | ||
case TransientFailure: | ||
return "TRANSIENT_FAILURE" | ||
case Shutdown: | ||
return "SHUTDOWN" | ||
default: | ||
panic(fmt.Sprintf("unknown connectivity state: %d", s)) | ||
} | ||
} | ||
|
||
// connectivityStateEvaluator gets updated by addrConns when their | ||
// states transition, based on which it evaluates the state of | ||
// ClientConn. | ||
|
@@ -492,55 +459,55 @@ type connectivityStateEvaluator struct { | |
|
||
// recordTransition records state change happening in every addrConn and based on | ||
// that it evaluates what state the ClientConn is in. | ||
// It can only transition between Ready, Connecting and TransientFailure. Other states, | ||
// Idle and Shutdown are transitioned into by ClientConn; in the begining of the connection | ||
// It can only transition between connectivity.Ready, connectivity.Connecting and connectivity.TransientFailure. Other states, | ||
// Idle and connectivity.Shutdown are transitioned into by ClientConn; in the begining of the connection | ||
// before any addrConn is created ClientConn is in idle state. In the end when ClientConn | ||
// closes it is in Shutdown state. | ||
// closes it is in connectivity.Shutdown state. | ||
// TODO Note that in later releases, a ClientConn with no activity will be put into an Idle state. | ||
func (cse *connectivityStateEvaluator) recordTransition(oldState, newState ConnectivityState) { | ||
func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) { | ||
cse.mu.Lock() | ||
defer cse.mu.Unlock() | ||
|
||
// Update counters. | ||
for idx, state := range []ConnectivityState{oldState, newState} { | ||
for idx, state := range []connectivity.State{oldState, newState} { | ||
updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new. | ||
switch state { | ||
case Ready: | ||
case connectivity.Ready: | ||
cse.numReady += updateVal | ||
case Connecting: | ||
case connectivity.Connecting: | ||
cse.numConnecting += updateVal | ||
case TransientFailure: | ||
case connectivity.TransientFailure: | ||
cse.numTransientFailure += updateVal | ||
} | ||
} | ||
|
||
// Evaluate. | ||
if cse.numReady > 0 { | ||
cse.csMgr.updateState(Ready) | ||
cse.csMgr.updateState(connectivity.Ready) | ||
return | ||
} | ||
if cse.numConnecting > 0 { | ||
cse.csMgr.updateState(Connecting) | ||
cse.csMgr.updateState(connectivity.Connecting) | ||
return | ||
} | ||
cse.csMgr.updateState(TransientFailure) | ||
cse.csMgr.updateState(connectivity.TransientFailure) | ||
} | ||
|
||
// connectivityStateManager keeps the ConnectivityState of ClientConn. | ||
// connectivityStateManager keeps the connectivity.State of ClientConn. | ||
// This struct will eventually be exported so the balancers can access it. | ||
type connectivityStateManager struct { | ||
mu sync.Mutex | ||
state ConnectivityState | ||
state connectivity.State | ||
notifyChan chan struct{} | ||
} | ||
|
||
// updateState updates the ConnectivityState of ClientConn. | ||
// updateState updates the connectivity.State of ClientConn. | ||
// If there's a change it notifies goroutines waiting on state change to | ||
// happen. | ||
func (csm *connectivityStateManager) updateState(state ConnectivityState) { | ||
func (csm *connectivityStateManager) updateState(state connectivity.State) { | ||
csm.mu.Lock() | ||
defer csm.mu.Unlock() | ||
if csm.state == Shutdown { | ||
if csm.state == connectivity.Shutdown { | ||
return | ||
} | ||
if csm.state == state { | ||
|
@@ -554,7 +521,7 @@ func (csm *connectivityStateManager) updateState(state ConnectivityState) { | |
} | ||
} | ||
|
||
func (csm *connectivityStateManager) getState() ConnectivityState { | ||
func (csm *connectivityStateManager) getState() connectivity.State { | ||
csm.mu.Lock() | ||
defer csm.mu.Unlock() | ||
return csm.state | ||
|
@@ -587,9 +554,10 @@ type ClientConn struct { | |
mkp keepalive.ClientParameters | ||
} | ||
|
||
// WaitForStateChange waits until the ConnectivityState of ClientConn changes from sourceState or | ||
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or | ||
// ctx expires. A true value is returned in former case and false in latter. | ||
func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) bool { | ||
// This is an EXPERIMENTAL API. | ||
func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool { | ||
ch := cc.csMgr.getNotifyChan() | ||
if cc.csMgr.getState() != sourceState { | ||
return true | ||
|
@@ -602,8 +570,9 @@ func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState Connec | |
} | ||
} | ||
|
||
// GetState returns the ConnectivityState of ClientConn. | ||
func (cc *ClientConn) GetState() ConnectivityState { | ||
// GetState returns the connectivity.State of ClientConn. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
// This is an EXPERIMENTAL API. | ||
func (cc *ClientConn) GetState() connectivity.State { | ||
return cc.csMgr.getState() | ||
} | ||
|
||
|
@@ -855,7 +824,7 @@ func (cc *ClientConn) Close() error { | |
} | ||
conns := cc.conns | ||
cc.conns = nil | ||
cc.csMgr.updateState(Shutdown) | ||
cc.csMgr.updateState(connectivity.Shutdown) | ||
cc.mu.Unlock() | ||
if cc.dopts.balancer != nil { | ||
cc.dopts.balancer.Close() | ||
|
@@ -879,7 +848,7 @@ type addrConn struct { | |
csEvltr *connectivityStateEvaluator | ||
|
||
mu sync.Mutex | ||
state ConnectivityState | ||
state connectivity.State | ||
down func(error) // the handler called when a connection is down. | ||
// ready is closed and becomes nil when a new transport is up or failed | ||
// due to timeout. | ||
|
@@ -926,7 +895,7 @@ func (ac *addrConn) errorf(format string, a ...interface{}) { | |
// - otherwise, it will be closed. | ||
func (ac *addrConn) resetTransport(drain bool) error { | ||
ac.mu.Lock() | ||
if ac.state == Shutdown { | ||
if ac.state == connectivity.Shutdown { | ||
ac.mu.Unlock() | ||
return errConnClosing | ||
} | ||
|
@@ -936,7 +905,7 @@ func (ac *addrConn) resetTransport(drain bool) error { | |
ac.down = nil | ||
} | ||
oldState := ac.state | ||
ac.state = Connecting | ||
ac.state = connectivity.Connecting | ||
ac.csEvltr.recordTransition(oldState, ac.state) | ||
t := ac.transport | ||
ac.transport = nil | ||
|
@@ -949,7 +918,7 @@ func (ac *addrConn) resetTransport(drain bool) error { | |
ac.cc.mu.RUnlock() | ||
for retries := 0; ; retries++ { | ||
ac.mu.Lock() | ||
if ac.state == Shutdown { | ||
if ac.state == connectivity.Shutdown { | ||
// ac.tearDown(...) has been invoked. | ||
ac.mu.Unlock() | ||
return errConnClosing | ||
|
@@ -977,14 +946,14 @@ func (ac *addrConn) resetTransport(drain bool) error { | |
} | ||
grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr) | ||
ac.mu.Lock() | ||
if ac.state == Shutdown { | ||
if ac.state == connectivity.Shutdown { | ||
// ac.tearDown(...) has been invoked. | ||
ac.mu.Unlock() | ||
return errConnClosing | ||
} | ||
ac.errorf("transient failure: %v", err) | ||
oldState = ac.state | ||
ac.state = TransientFailure | ||
ac.state = connectivity.TransientFailure | ||
ac.csEvltr.recordTransition(oldState, ac.state) | ||
if ac.ready != nil { | ||
close(ac.ready) | ||
|
@@ -1003,14 +972,14 @@ func (ac *addrConn) resetTransport(drain bool) error { | |
} | ||
ac.mu.Lock() | ||
ac.printf("ready") | ||
if ac.state == Shutdown { | ||
if ac.state == connectivity.Shutdown { | ||
// ac.tearDown(...) has been invoked. | ||
ac.mu.Unlock() | ||
newTransport.Close() | ||
return errConnClosing | ||
} | ||
oldState = ac.state | ||
ac.state = Ready | ||
ac.state = connectivity.Ready | ||
ac.csEvltr.recordTransition(oldState, ac.state) | ||
ac.transport = newTransport | ||
if ac.ready != nil { | ||
|
@@ -1081,13 +1050,13 @@ func (ac *addrConn) transportMonitor() { | |
default: | ||
} | ||
ac.mu.Lock() | ||
if ac.state == Shutdown { | ||
if ac.state == connectivity.Shutdown { | ||
// ac has been shutdown. | ||
ac.mu.Unlock() | ||
return | ||
} | ||
oldState := ac.state | ||
ac.state = TransientFailure | ||
ac.state = connectivity.TransientFailure | ||
ac.csEvltr.recordTransition(oldState, ac.state) | ||
ac.mu.Unlock() | ||
if err := ac.resetTransport(false); err != nil { | ||
|
@@ -1107,12 +1076,12 @@ func (ac *addrConn) transportMonitor() { | |
} | ||
|
||
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or | ||
// iv) transport is in TransientFailure and there is a balancer/failfast is true. | ||
// iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true. | ||
func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) { | ||
for { | ||
ac.mu.Lock() | ||
switch { | ||
case ac.state == Shutdown: | ||
case ac.state == connectivity.Shutdown: | ||
if failfast || !hasBalancer { | ||
// RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr. | ||
err := ac.tearDownErr | ||
|
@@ -1121,11 +1090,11 @@ func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (trans | |
} | ||
ac.mu.Unlock() | ||
return nil, errConnClosing | ||
case ac.state == Ready: | ||
case ac.state == connectivity.Ready: | ||
ct := ac.transport | ||
ac.mu.Unlock() | ||
return ct, nil | ||
case ac.state == TransientFailure: | ||
case ac.state == connectivity.TransientFailure: | ||
if failfast || hasBalancer { | ||
ac.mu.Unlock() | ||
return nil, errConnUnavailable | ||
|
@@ -1167,11 +1136,11 @@ func (ac *addrConn) tearDown(err error) { | |
// address removal and GoAway. | ||
ac.transport.GracefulClose() | ||
} | ||
if ac.state == Shutdown { | ||
if ac.state == connectivity.Shutdown { | ||
return | ||
} | ||
oldState := ac.state | ||
ac.state = Shutdown | ||
ac.state = connectivity.Shutdown | ||
ac.tearDownErr = err | ||
ac.csEvltr.recordTransition(oldState, ac.state) | ||
if ac.events != nil { | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeesh, we didn't write the "experimental" disclaimer here. Can you add it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.