-
Notifications
You must be signed in to change notification settings - Fork 4.6k
Validate send quota again after acquiring writable channel #1367
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -99,7 +99,8 @@ type http2Server struct { | |
|
||
initialWindowSize int32 | ||
|
||
bdpEst *bdpEstimator | ||
bdpEst *bdpEstimator | ||
outQuotaVersion uint64 | ||
|
||
mu sync.Mutex // guard the following | ||
state transportState | ||
|
@@ -828,10 +829,15 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) { | |
t.WriteHeader(s, nil) | ||
} | ||
r := bytes.NewBuffer(data) | ||
var ( | ||
p []byte | ||
oqv uint64 | ||
) | ||
for { | ||
if r.Len() == 0 { | ||
if r.Len() == 0 && p == nil { | ||
return nil | ||
} | ||
oqv = atomic.LoadUint64(&t.outQuotaVersion) | ||
size := http2MaxFrameLen | ||
// Wait until the stream has some quota to send the data. | ||
sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire()) | ||
|
@@ -849,7 +855,9 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) { | |
if tq < size { | ||
size = tq | ||
} | ||
p := r.Next(size) | ||
if p == nil { | ||
p = r.Next(size) | ||
} | ||
ps := len(p) | ||
if ps < sq { | ||
// Overbooked stream quota. Return it back. | ||
|
@@ -886,6 +894,18 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) { | |
return ContextErr(s.ctx.Err()) | ||
default: | ||
} | ||
if oqv != atomic.LoadUint64(&t.outQuotaVersion) { | ||
// InitialWindowSize settings frame must have been received after we | ||
// acquired send quota but before we got the writable channel. | ||
// We must forsake this write. | ||
t.sendQuotaPool.add(ps) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to also return the stream quota? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, to keep things consistent. When the settings frame is received it adds to the current quota. Since we took something out of it earlier, we must add that back since we're not going to send anything. |
||
s.sendQuotaPool.add(ps) | ||
if t.framer.adjustNumWriters(-1) == 0 { | ||
t.controlBuf.put(&flushIO{}) | ||
} | ||
t.writableChan <- 0 | ||
continue | ||
} | ||
var forceFlush bool | ||
if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && !opts.Last { | ||
forceFlush = true | ||
|
@@ -897,6 +917,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) { | |
t.Close() | ||
return connectionErrorf(true, err, "transport: %v", err) | ||
} | ||
p = nil | ||
if t.framer.adjustNumWriters(-1) == 0 { | ||
t.framer.flushWrite() | ||
} | ||
|
@@ -914,6 +935,7 @@ func (t *http2Server) applySettings(ss []http2.Setting) { | |
stream.sendQuotaPool.add(int(s.Val) - int(t.streamSendQuota)) | ||
} | ||
t.streamSendQuota = s.Val | ||
atomic.AddUint64(&t.outQuotaVersion, 1) | ||
} | ||
|
||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move version into quota.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, don't. There should only be one version per connection, not one per stream-quota.