-
Notifications
You must be signed in to change notification settings - Fork 4.6k
Client should update keepalive parameters upon receiving GoAway with … #1169
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,8 @@ import ( | |
"golang.org/x/net/context" | ||
|
||
"google.golang.org/grpc/credentials" | ||
"google.golang.org/grpc/keepalive" | ||
"google.golang.org/grpc/transport" | ||
) | ||
|
||
const tlsDir = "testdata/" | ||
|
@@ -306,3 +308,70 @@ func TestNonblockingDialWithEmptyBalancer(t *testing.T) { | |
<-dialDone | ||
cancel() | ||
} | ||
|
||
type testserver struct { | ||
tr transport.ServerTransport | ||
|
||
done chan struct{} | ||
} | ||
|
||
func (s *testserver) start(t *testing.T) string { | ||
lis, err := net.Listen("tcp", "localhost:0") | ||
if err != nil { | ||
t.Fatalf("Failed to listen. Error: %v", err) | ||
} | ||
go func() { | ||
defer func() { | ||
s.done <- struct{}{} | ||
}() | ||
conn, err := lis.Accept() | ||
if err != nil { | ||
t.Errorf("Server failed to accept. Error: %v", err) | ||
return | ||
} | ||
lis.Close() | ||
if s.tr, err = transport.NewServerTransport("http2", conn, &transport.ServerConfig{ | ||
KeepalivePolicy: keepalive.EnforcementPolicy{ | ||
MinTime: 100 * time.Millisecond, | ||
PermitWithoutStream: true, | ||
}, | ||
}); err != nil { | ||
conn.Close() | ||
t.Errorf("Failed to create server transport. Error: %v", err) | ||
return | ||
} | ||
go s.tr.HandleStreams(func(stream *transport.Stream) { | ||
}, func(ctx context.Context, method string) context.Context { | ||
return ctx | ||
}) | ||
}() | ||
return lis.Addr().String() | ||
} | ||
|
||
func (s *testserver) stop() { | ||
if s.tr != nil { | ||
s.tr.Close() | ||
} | ||
} | ||
|
||
func TestClientUpdatesParamsAfterGoAway(t *testing.T) { | ||
server := testserver{done: make(chan struct{}, 1)} | ||
addr := server.start(t) | ||
defer server.stop() | ||
cc, err := Dial(addr, WithBlock(), WithInsecure(), WithKeepaliveParams(keepalive.ClientParameters{ | ||
Time: 50 * time.Millisecond, | ||
Timeout: 1 * time.Millisecond, | ||
PermitWithoutStream: true, | ||
})) | ||
if err != nil { | ||
t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err) | ||
} | ||
defer cc.Close() | ||
<-server.done | ||
time.Sleep(1 * time.Second) | ||
cc.mu.RLock() | ||
defer cc.mu.RUnlock() | ||
v := cc.mkp.Time | ||
if v < 100*time.Millisecond { | ||
t.Fatalf("cc.dopts.copts.Keepalive.Time = %v , want 100ms", v) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -121,6 +121,9 @@ type http2Client struct { | |
goAwayID uint32 | ||
// prevGoAway ID records the Last-Stream-ID in the previous GOAway frame. | ||
prevGoAwayID uint32 | ||
// goAwayReason records the http2.ErrCode and debug data received with the | ||
// GoAway frame. | ||
goAwayReason GoAwayReason | ||
} | ||
|
||
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) { | ||
|
@@ -917,13 +920,34 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { | |
t.mu.Unlock() | ||
return | ||
default: | ||
t.setGoAwayReason(f) | ||
} | ||
t.goAwayID = f.LastStreamID | ||
close(t.goAway) | ||
} | ||
t.mu.Unlock() | ||
} | ||
|
||
// setGoAwayReason sets the value of t.goAwayReason based | ||
// on the GoAway frame received. | ||
// It expects a lock on transport's mutext to be held by | ||
// the caller. | ||
func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) { | ||
t.goAwayReason = NoReason | ||
switch f.ErrCode { | ||
case http2.ErrCodeEnhanceYourCalm: | ||
if string(f.DebugData()) == "too_many_pings" { | ||
t.goAwayReason = TooManyPings | ||
} | ||
} | ||
} | ||
|
||
func (t *http2Client) GetGoAwayReason() GoAwayReason { | ||
t.mu.Lock() | ||
defer t.mu.Unlock() | ||
return t.goAwayReason | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make this atomic read? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Following offline discussion leaving as is. |
||
} | ||
|
||
func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) { | ||
id := f.Header().StreamID | ||
incr := f.Increment | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove the empty line here to indicate that mkp is protected by mu?