Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
for _, opt := range opts {
opt(&cc.dopts)
}
cc.mkp = cc.dopts.copts.KeepaliveParams

grpcUA := "grpc-go/" + Version
if cc.dopts.copts.UserAgent != "" {
Expand Down Expand Up @@ -458,6 +459,9 @@ type ClientConn struct {
mu sync.RWMutex
sc ServiceConfig
conns map[Address]*addrConn

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove the empty line here to indicate that mkp is protected by mu?

// Keepalive parameter can be udated if a GoAway is received.
mkp keepalive.ClientParameters
}

// lbWatcher watches the Notify channel of the balancer in cc and manages
Expand Down Expand Up @@ -533,6 +537,9 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error)
addr: addr,
dopts: cc.dopts,
}
cc.mu.RLock()
ac.dopts.copts.KeepaliveParams = cc.mkp
cc.mu.RUnlock()
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
ac.stateCV = sync.NewCond(&ac.mu)
if EnableTracing {
Expand Down Expand Up @@ -714,6 +721,20 @@ type addrConn struct {
tearDownErr error
}

// adjustParams updates parameters used to create transports upon
// receiving a GoAway.
func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
switch r {
case transport.TooManyPings:
v := 2 * ac.dopts.copts.KeepaliveParams.Time
ac.cc.mu.Lock()
if v > ac.cc.mkp.Time {
ac.cc.mkp.Time = v
}
ac.cc.mu.Unlock()
}
}

// printf records an event in ac's event log, unless ac has been closed.
// REQUIRES ac.mu is held.
func (ac *addrConn) printf(format string, a ...interface{}) {
Expand Down Expand Up @@ -870,6 +891,7 @@ func (ac *addrConn) transportMonitor() {
}
return
case <-t.GoAway():
ac.adjustParams(t.GetGoAwayReason())
// If GoAway happens without any network I/O error, ac is closed without shutting down the
// underlying transport (the transport will be closed when all the pending RPCs finished or
// failed.).
Expand All @@ -889,6 +911,7 @@ func (ac *addrConn) transportMonitor() {
t.Close()
return
case <-t.GoAway():
ac.adjustParams(t.GetGoAwayReason())
ac.cc.resetAddrConn(ac.addr, false, errNetworkIO)
return
default:
Expand Down
69 changes: 69 additions & 0 deletions clientconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (
"golang.org/x/net/context"

"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/transport"
)

const tlsDir = "testdata/"
Expand Down Expand Up @@ -306,3 +308,70 @@ func TestNonblockingDialWithEmptyBalancer(t *testing.T) {
<-dialDone
cancel()
}

type testserver struct {
tr transport.ServerTransport
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just start a gRPC server here?

done chan struct{}
}

func (s *testserver) start(t *testing.T) string {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Failed to listen. Error: %v", err)
}
go func() {
defer func() {
s.done <- struct{}{}
}()
conn, err := lis.Accept()
if err != nil {
t.Errorf("Server failed to accept. Error: %v", err)
return
}
lis.Close()
if s.tr, err = transport.NewServerTransport("http2", conn, &transport.ServerConfig{
KeepalivePolicy: keepalive.EnforcementPolicy{
MinTime: 100 * time.Millisecond,
PermitWithoutStream: true,
},
}); err != nil {
conn.Close()
t.Errorf("Failed to create server transport. Error: %v", err)
return
}
go s.tr.HandleStreams(func(stream *transport.Stream) {
}, func(ctx context.Context, method string) context.Context {
return ctx
})
}()
return lis.Addr().String()
}

func (s *testserver) stop() {
if s.tr != nil {
s.tr.Close()
}
}

func TestClientUpdatesParamsAfterGoAway(t *testing.T) {
server := testserver{done: make(chan struct{}, 1)}
addr := server.start(t)
defer server.stop()
cc, err := Dial(addr, WithBlock(), WithInsecure(), WithKeepaliveParams(keepalive.ClientParameters{
Time: 50 * time.Millisecond,
Timeout: 1 * time.Millisecond,
PermitWithoutStream: true,
}))
if err != nil {
t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
}
defer cc.Close()
<-server.done
time.Sleep(1 * time.Second)
cc.mu.RLock()
defer cc.mu.RUnlock()
v := cc.mkp.Time
if v < 100*time.Millisecond {
t.Fatalf("cc.dopts.copts.Keepalive.Time = %v , want 100ms", v)
}
}
24 changes: 24 additions & 0 deletions transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ type http2Client struct {
goAwayID uint32
// prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
prevGoAwayID uint32
// goAwayReason records the http2.ErrCode and debug data received with the
// GoAway frame.
goAwayReason GoAwayReason
}

func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
Expand Down Expand Up @@ -917,13 +920,34 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
t.mu.Unlock()
return
default:
t.setGoAwayReason(f)
}
t.goAwayID = f.LastStreamID
close(t.goAway)
}
t.mu.Unlock()
}

// setGoAwayReason sets the value of t.goAwayReason based
// on the GoAway frame received.
// It expects a lock on transport's mutext to be held by
// the caller.
func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
t.goAwayReason = NoReason
switch f.ErrCode {
case http2.ErrCodeEnhanceYourCalm:
if string(f.DebugData()) == "too_many_pings" {
t.goAwayReason = TooManyPings
}
}
}

func (t *http2Client) GetGoAwayReason() GoAwayReason {
t.mu.Lock()
defer t.mu.Unlock()
return t.goAwayReason
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this atomic read?
The lock is probably even not necessary...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following offline discussion leaving as is.

}

func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
id := f.Header().StreamID
incr := f.Increment
Expand Down
16 changes: 16 additions & 0 deletions transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,9 @@ type ClientTransport interface {
// receives the draining signal from the server (e.g., GOAWAY frame in
// HTTP/2).
GoAway() <-chan struct{}

// GetGoAwayReason returns the reason why GoAway frame was received.
GetGoAwayReason() GoAwayReason
}

// ServerTransport is the common interface for all gRPC server-side transport
Expand Down Expand Up @@ -620,3 +623,16 @@ func wait(ctx context.Context, done, goAway, closing <-chan struct{}, proceed <-
return i, nil
}
}

// GoAwayReason contains the reason for the GoAway frame received.
type GoAwayReason uint8

const (
// Invalid indicates that no GoAway frame is received.
Invalid GoAwayReason = 0
// NoReason is the default value when GoAway frame is received.
NoReason GoAwayReason = 1
// TooManyPings indicates that a GoAway frame with ErrCodeEnhanceYourCalm
// was recieved and that the debug data said "too_many_pings".
TooManyPings GoAwayReason = 2
)