103 changes: 76 additions & 27 deletions collector/body.go
@@ -1,6 +1,7 @@
package collector

import (
"bytes"
"errors"
"io"
"sync"
@@ -37,6 +38,64 @@ func NewBody(rc io.ReadCloser, limit int) *Body {
return b
}

// PreReadBody creates a new Body wrapper that immediately pre-reads data from the body.
// This ensures body content is captured even if the underlying connection is closed early.
// The returned Body reads from an io.MultiReader that replays the pre-read bytes ahead of the original reader, so consumers still see the full stream.
func PreReadBody(rc io.ReadCloser, limit int) *Body {
if rc == nil {
return NewBody(rc, limit)
}

b := &Body{}

preReadBuffer := new(bytes.Buffer)

// Pre-read up to limit+1 bytes into the capture buffer; the extra byte
// distinguishes a body of exactly limit bytes from a truncated one
n, err := io.CopyN(preReadBuffer, rc, int64(limit)+1)

truncated := n > int64(limit)

if err == io.EOF {
// We've read everything (the body fit within the limit).
b.consumedOriginal = true
b.isFullyCaptured = !truncated
}

multiReader := io.MultiReader(preReadBuffer, rc)

// Wrap in a ReadCloser so Close still reaches the original body
b.reader = &preReadBodyWrapper{
Reader: multiReader,
closer: rc,
}

// Set up the buffer with pre-read data but only up to the limit
preReadBytes := preReadBuffer.Bytes()
if len(preReadBytes) > limit {
preReadBytes = preReadBytes[:limit]
}
b.buffer = &LimitedBuffer{
Buffer: bytes.NewBuffer(preReadBytes),
limit: limit,
truncated: truncated,
}

return b
}

// preReadBodyWrapper wraps an io.Reader with Close functionality
type preReadBodyWrapper struct {
io.Reader
closer io.Closer
}

func (w *preReadBodyWrapper) Close() error {
if w.closer != nil {
return w.closer.Close()
}
return nil
}
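
For reviewers, a minimal usage sketch of the new helper (the import path is hypothetical; the accessor names come from the tests added below in this diff):

package main

import (
	"fmt"
	"io"
	"strings"

	"example.com/yourmodule/collector" // hypothetical import path
)

func main() {
	rc := io.NopCloser(strings.NewReader(strings.Repeat("x", 2000)))
	body := collector.PreReadBody(rc, 100) // capture at most 100 bytes up front

	data, _ := io.ReadAll(body) // the consumer still sees all 2000 bytes
	_ = body.Close()

	fmt.Println(len(data))              // 2000
	fmt.Println(body.Size())            // 100: only the captured prefix
	fmt.Println(body.IsTruncated())     // true
	fmt.Println(body.IsFullyCaptured()) // false
}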

func (b *Body) Read(p []byte) (n int, err error) {
b.mu.Lock()
defer b.mu.Unlock()
@@ -52,13 +111,18 @@ func (b *Body) Read(p []byte) (n int, err error) {
// Read from the original reader
n, err = b.reader.Read(p)

// Only write to buffer if it's not a preReadBodyWrapper
// (preReadBodyWrapper means we already captured the data in PreReadBody)
if n > 0 {
b.buffer.Write(p[:n])
if _, isPreRead := b.reader.(*preReadBodyWrapper); !isPreRead {
_, _ = b.buffer.Write(p[:n])
}
}

// If EOF, mark as fully consumed
if err == io.EOF {
b.consumedOriginal = true
b.isFullyCaptured = !b.buffer.IsTruncated()

// Remove original body
b.reader = nil
@@ -68,7 +132,6 @@
}

// Close closes the original body and finalizes the buffer.
// This will attempt to read any unread data from the original body up to the maximum size limit.
func (b *Body) Close() error {
b.mu.Lock()
defer b.mu.Unlock()
@@ -81,39 +144,25 @@
return nil
}

// Mark as closed before capturing remaining data to avoid potential recursive calls
// Mark as closed
b.closed = true

// Check state to determine if we need to read more data
fullyConsumed := b.consumedOriginal

// If the body wasn't fully read, read the rest of it into our buffer
if !fullyConsumed {
// Create a buffer for reading
buf := make([]byte, 32*1024) // 32KB chunks

// Try to read more data
for {
var n int
var readErr error
n, readErr = b.reader.Read(buf)

if n > 0 {
b.buffer.Write(buf[:n])
}
// For PreReadBody cases (identified by preReadBodyWrapper),
// the data is already captured, so just close the original reader
if _, isPreRead := b.reader.(*preReadBodyWrapper); isPreRead {
return b.reader.Close()
}

if readErr != nil {
// We've read all we can
break
}
}
// For legacy NewBody usage (when not using PreReadBody),
// we still need to drain any remaining data into the capture buffer
if !b.consumedOriginal {
_, _ = io.Copy(b.buffer, b.reader)
}

// Now close the original reader - its implementation should handle any cleanup
// Close the original reader
err := b.reader.Close()

if !b.buffer.IsTruncated() {
// Mark as fully captured
b.isFullyCaptured = true
}

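Continuing the sketch above with the same imports: the legacy NewBody path defers capture to Close, which now drains any unread data with a single io.Copy (assuming LimitedBuffer caps writes at the limit, as the tests below show for PreReadBody):

	rc2 := io.NopCloser(strings.NewReader("hello world")) // 11 bytes
	b := collector.NewBody(rc2, 10)                       // nothing buffered yet
	_ = b.Close()                                         // drains the body into the capture buffer
	fmt.Println(b.String())                               // "hello worl": capped at the 10-byte limit
	fmt.Println(b.IsTruncated())                          // true
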
187 changes: 187 additions & 0 deletions collector/body_test.go
@@ -65,3 +65,190 @@ func TestBody_ReadAfterClose(t *testing.T) {
assert.Error(t, err)
assert.Equal(t, collector.ErrBodyClosed, err)
}

// Test PreReadBody with small body that handler doesn't read
func TestPreReadBody_SmallBodyUnread(t *testing.T) {
data := "small test data"
reader := io.NopCloser(strings.NewReader(data))

body := collector.PreReadBody(reader, 1024) // limit > data size

// Close without reading
err := body.Close()
require.NoError(t, err)

// Captured data should be available
assert.Equal(t, data, body.String())
assert.Equal(t, []byte(data), body.Bytes())
assert.Equal(t, int64(len(data)), body.Size())
assert.True(t, body.IsFullyCaptured())
assert.False(t, body.IsTruncated())
}

// Test PreReadBody with small body that handler fully reads
func TestPreReadBody_SmallBodyRead(t *testing.T) {
data := "small test data for reading"
reader := io.NopCloser(strings.NewReader(data))

body := collector.PreReadBody(reader, 1024) // limit > data size

// Handler reads the entire body
handlerData, err := io.ReadAll(body)
require.NoError(t, err)
assert.Equal(t, data, string(handlerData))

// Close the body
err = body.Close()
require.NoError(t, err)

// Captured data should STILL be available after reading + closing
assert.Equal(t, data, body.String())
assert.Equal(t, []byte(data), body.Bytes())
assert.Equal(t, int64(len(data)), body.Size())
assert.True(t, body.IsFullyCaptured())
assert.False(t, body.IsTruncated())
}

// Test PreReadBody with large body that handler doesn't read
func TestPreReadBody_LargeBodyUnread(t *testing.T) {
data := strings.Repeat("x", 2000) // Large data
reader := io.NopCloser(strings.NewReader(data))

body := collector.PreReadBody(reader, 100) // limit < data size

// Close without reading
err := body.Close()
require.NoError(t, err)

// Only first 100 bytes should be captured
expectedCaptured := data[:100]
assert.Equal(t, expectedCaptured, body.String())
assert.Equal(t, []byte(expectedCaptured), body.Bytes())
assert.Equal(t, int64(100), body.Size())
assert.False(t, body.IsFullyCaptured())
assert.True(t, body.IsTruncated())
}

// Test PreReadBody with large body that handler fully reads - CRITICAL TEST
func TestPreReadBody_LargeBodyRead(t *testing.T) {
data := strings.Repeat("y", 2000) // Large data
reader := io.NopCloser(strings.NewReader(data))

body := collector.PreReadBody(reader, 100) // limit < data size

// Handler reads the entire body (pre-read + remaining)
handlerData, err := io.ReadAll(body)
require.NoError(t, err)
assert.Equal(t, data, string(handlerData)) // Handler should get full data

// Close the body
err = body.Close()
require.NoError(t, err)

// Captured portion should STILL be available after reading
expectedCaptured := data[:100]
assert.Equal(t, expectedCaptured, body.String())
assert.Equal(t, []byte(expectedCaptured), body.Bytes())
assert.Equal(t, int64(100), body.Size())
assert.False(t, body.IsFullyCaptured()) // Only partial capture
assert.True(t, body.IsTruncated())
}

// Test closing behavior - close without reading small body
func TestPreReadBody_CloseWithoutReading_SmallBody(t *testing.T) {
data := "test data"
reader := io.NopCloser(strings.NewReader(data))

body := collector.PreReadBody(reader, 1024)

// Close immediately without any reading
err := body.Close()
require.NoError(t, err)

// Captured data should be preserved
assert.Equal(t, data, body.String())
assert.True(t, body.IsFullyCaptured())
}

// Test closing behavior - close without reading large body
func TestPreReadBody_CloseWithoutReading_LargeBody(t *testing.T) {
data := strings.Repeat("z", 500)
reader := io.NopCloser(strings.NewReader(data))

body := collector.PreReadBody(reader, 100)

// Close immediately without any reading
err := body.Close()
require.NoError(t, err)

// Captured portion should be preserved
expectedCaptured := data[:100]
assert.Equal(t, expectedCaptured, body.String())
assert.True(t, body.IsTruncated())
}

// Test closing behavior - close after partial reading
func TestPreReadBody_CloseAfterPartialReading(t *testing.T) {
data := strings.Repeat("a", 500)
reader := io.NopCloser(strings.NewReader(data))

body := collector.PreReadBody(reader, 100)

// Handler reads only part of the body
buf := make([]byte, 50)
n, err := body.Read(buf)
require.NoError(t, err)
assert.Equal(t, 50, n)

// Close the body
err = body.Close()
require.NoError(t, err)

// Captured data should still be available
expectedCaptured := data[:100]
assert.Equal(t, expectedCaptured, body.String())
assert.True(t, body.IsTruncated())
}

// Test double close safety
func TestPreReadBody_DoubleClose(t *testing.T) {
data := "test data"
reader := io.NopCloser(strings.NewReader(data))

body := collector.PreReadBody(reader, 1024)

// First close
err := body.Close()
require.NoError(t, err)

// Second close should be safe
err = body.Close()
require.NoError(t, err)

// Captured data should remain available
assert.Equal(t, data, body.String())
}

// Test read after close
func TestPreReadBody_ReadAfterClose(t *testing.T) {
data := "test data"
reader := io.NopCloser(strings.NewReader(data))

body := collector.PreReadBody(reader, 1024)

// Close first
err := body.Close()
require.NoError(t, err)

// Try to read after close
buf := make([]byte, 10)
_, err = body.Read(buf)

// Should return ErrBodyClosed
assert.Error(t, err)
assert.Equal(t, collector.ErrBodyClosed, err)

// Captured data should still be accessible
assert.Equal(t, data, body.String())
assert.Equal(t, []byte(data), body.Bytes())
}
8 changes: 4 additions & 4 deletions collector/http_client.go
@@ -144,8 +144,8 @@ func (t *httpClientTransport) RoundTrip(req *http.Request) (*http.Response, error) {

// Capture request body if present and configured to do so
if req.Body != nil && t.collector.options.CaptureRequestBody {
// Wrap the body to capture it
body := NewBody(req.Body, t.collector.options.MaxBodySize)
// Pre-read the body to ensure capture
body := PreReadBody(req.Body, t.collector.options.MaxBodySize)

// Store the body in the request record
httpReq.RequestBody = body
@@ -187,8 +187,8 @@ func (t *httpClientTransport) RoundTrip(req *http.Request) (*http.Response, error) {
// Create a copy of the response to read the body even if the client doesn't
originalRespBody := resp.Body

// Wrap the body to capture it
body := NewBody(originalRespBody, t.collector.options.MaxBodySize)
// Pre-read the body to ensure capture even if client doesn't read it
body := PreReadBody(originalRespBody, t.collector.options.MaxBodySize)

// Store the body in the request record
httpReq.ResponseBody = body
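From the caller's side, the intended effect is roughly the following (the client wiring is illustrative; only PreReadBody and MaxBodySize come from this diff):

	resp, err := client.Do(req) // RoundTrip wrapped resp.Body via PreReadBody
	if err != nil {
		return err
	}
	// Close immediately without reading: the first MaxBodySize bytes were
	// already pre-read into the collector's record during RoundTrip.
	resp.Body.Close()
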
4 changes: 2 additions & 2 deletions collector/http_server.go
@@ -140,8 +140,8 @@ func (c *HTTPServerCollector) Middleware(next http.Handler) http.Handler {
// Save the original body
originalBody := r.Body

// Create a body wrapper
requestBody = NewBody(originalBody, c.options.MaxBodySize)
// Pre-read the body to ensure it is captured even if the handler writes a large response (Go's net/http closes the request body in that case)
requestBody = PreReadBody(originalBody, c.options.MaxBodySize)

// Replace the request body with our wrapper
r.Body = requestBody
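A sketch of the failure mode this guards against (the handler wiring is illustrative; c stands for an HTTPServerCollector instance):

	mux := http.NewServeMux()
	mux.Handle("/", c.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// The handler never reads r.Body and streams a large response.
		// net/http may then close the request body, so NewBody's lazy
		// capture in Close found nothing; PreReadBody already holds the
		// first MaxBodySize bytes.
		io.Copy(w, strings.NewReader(strings.Repeat("x", 1<<20)))
	})))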