Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ type http2Client struct {

initialWindowSize int32

bdpEst *bdpEstimator
bdpEst *bdpEstimator
outQuotaVersion uint64
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move version into quota.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, don't. There should only be one version per connection, not one per stream-quota.


mu sync.Mutex // guard the following variables
state transportState // the state of underlying connection
Expand Down Expand Up @@ -699,9 +700,13 @@ func (t *http2Client) GracefulClose() error {
// if it improves the performance.
func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
r := bytes.NewBuffer(data)
var (
p []byte
oqv uint64
)
for {
var p []byte
if r.Len() > 0 {
oqv = atomic.LoadUint64(&t.outQuotaVersion)
if r.Len() > 0 || p != nil {
size := http2MaxFrameLen
// Wait until the stream has some quota to send the data.
sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire())
Expand All @@ -719,7 +724,9 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
if tq < size {
size = tq
}
p = r.Next(size)
if p == nil {
p = r.Next(size)
}
ps := len(p)
if ps < sq {
// Overbooked stream quota. Return it back.
Expand Down Expand Up @@ -764,6 +771,18 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
return ContextErr(s.ctx.Err())
default:
}
if oqv != atomic.LoadUint64(&t.outQuotaVersion) {
// InitialWindowSize settings frame must have been received after we
// acquired send quota but before we got the writable channel.
// We must forsake this write.
t.sendQuotaPool.add(len(p))
s.sendQuotaPool.add(len(p))
if t.framer.adjustNumWriters(-1) == 0 {
t.controlBuf.put(&flushIO{})
}
t.writableChan <- 0
continue
}
if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 {
// Do a force flush iff this is last frame for the entire gRPC message
// and the caller is the only writer at this moment.
Expand All @@ -776,6 +795,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
t.notifyError(err)
return connectionErrorf(true, err, "transport: %v", err)
}
p = nil
if t.framer.adjustNumWriters(-1) == 0 {
t.framer.flushWrite()
}
Expand Down Expand Up @@ -1216,6 +1236,7 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
}
t.streamSendQuota = s.Val
t.mu.Unlock()
atomic.AddUint64(&t.outQuotaVersion, 1)
}
}
}
Expand Down
28 changes: 25 additions & 3 deletions transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ type http2Server struct {

initialWindowSize int32

bdpEst *bdpEstimator
bdpEst *bdpEstimator
outQuotaVersion uint64

mu sync.Mutex // guard the following
state transportState
Expand Down Expand Up @@ -828,10 +829,15 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
t.WriteHeader(s, nil)
}
r := bytes.NewBuffer(data)
var (
p []byte
oqv uint64
)
for {
if r.Len() == 0 {
if r.Len() == 0 && p == nil {
return nil
}
oqv = atomic.LoadUint64(&t.outQuotaVersion)
size := http2MaxFrameLen
// Wait until the stream has some quota to send the data.
sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire())
Expand All @@ -849,7 +855,9 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
if tq < size {
size = tq
}
p := r.Next(size)
if p == nil {
p = r.Next(size)
}
ps := len(p)
if ps < sq {
// Overbooked stream quota. Return it back.
Expand Down Expand Up @@ -886,6 +894,18 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
return ContextErr(s.ctx.Err())
default:
}
if oqv != atomic.LoadUint64(&t.outQuotaVersion) {
// InitialWindowSize settings frame must have been received after we
// acquired send quota but before we got the writable channel.
// We must forsake this write.
t.sendQuotaPool.add(ps)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to also return the stream quota?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, to keep things consistent. When the settings frame is received it adds to the current quota. Since we took something out of it earlier, we must add that back since we're not going to send anything.

s.sendQuotaPool.add(ps)
if t.framer.adjustNumWriters(-1) == 0 {
t.controlBuf.put(&flushIO{})
}
t.writableChan <- 0
continue
}
var forceFlush bool
if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && !opts.Last {
forceFlush = true
Expand All @@ -897,6 +917,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
t.Close()
return connectionErrorf(true, err, "transport: %v", err)
}
p = nil
if t.framer.adjustNumWriters(-1) == 0 {
t.framer.flushWrite()
}
Expand All @@ -914,6 +935,7 @@ func (t *http2Server) applySettings(ss []http2.Setting) {
stream.sendQuotaPool.add(int(s.Val) - int(t.streamSendQuota))
}
t.streamSendQuota = s.Val
atomic.AddUint64(&t.outQuotaVersion, 1)
}

}
Expand Down