@aronchick

Summary

This pull request introduces a streaming_file input component designed for continuously reading from files with high reliability. It functions similarly to tail -F but adds critical features for production environments, including crash recovery, seamless log rotation handling, and at-least-once delivery guarantees.

The component monitors a target file for new data and persists its read position to a state file, which allows it to resume from the last acknowledged position after a restart. It handles common operational scenarios such as file rotation and truncation automatically, so data is not lost. Because delivery is at-least-once, messages that were read but not yet acknowledged when the process stopped may be delivered again after recovery.

Key Features

  • Persistent State: The reader's position (byte offset and inode) is durably persisted to a state directory, enabling seamless recovery from crashes or restarts.
  • Automatic File Rotation: Uses file inodes (on Unix systems) to reliably detect file rotations. It finishes reading the old file before automatically switching to the new one and resetting its position.
  • Truncation Handling: Detects when a file has been truncated (i.e., its size is smaller than the last read offset) and correctly resets its position to the beginning of the file.
  • At-Least-Once Semantics: Implements an acknowledgment (AckFunc) mechanism. The read position is only advanced and checkpointed after a message has been successfully processed and acknowledged downstream.
  • Efficient Monitoring: Uses fsnotify to react to file system events (writes, creates, renames) in real-time, avoiding inefficient polling.
  • Comprehensive Observability: Integrated with OpenTelemetry to provide key metrics, including lines read, bytes read, file rotations, and buffer saturation.
  • Graceful Shutdown: Ensures all in-flight messages are acknowledged before closing (with a configurable timeout) and performs a final state save to prevent data loss.
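As a rough illustration of the rotation and truncation checks described in the list above, the decision can be sketched as a pure function over the saved position and the file's current inode and size. The names `position`, `fileChange`, and `detectChange` are hypothetical and are not taken from this PR; the real component may combine these checks with fsnotify events and other heuristics.

```go
package main

// position is a hypothetical stand-in for the persisted reader state
// (byte offset plus inode), not the struct used in the PR.
type position struct {
	Inode  uint64
	Offset int64
}

// fileChange classifies what happened to the watched path.
type fileChange int

const (
	unchanged fileChange = iota
	rotated
	truncated
)

// detectChange compares the saved position with the inode and size
// currently observed at the watched path.
func detectChange(saved position, curInode uint64, curSize int64) fileChange {
	switch {
	case curInode != saved.Inode:
		// A different inode at the same path means the file was replaced.
		return rotated
	case curSize < saved.Offset:
		// Same file, but shorter than what was already read.
		return truncated
	default:
		return unchanged
	}
}
```

A size comparison alone cannot detect a file that was truncated and then rewritten past the old offset between two checks, so this sketch should be read as the minimal case only.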

Notes for Reviewer

This is a large but self-contained feature. The core logic resides in internal/impl/io/input_streaming_file.go.

📝 Design Highlights

  1. Durable State Saving: The savePositionDurable function implements a robust save pattern: write to a temporary file, fsync() the file, rename() it to the final destination, and then fsync() the parent directory. This ensures the state file is never left in a corrupted state, even if the process crashes mid-write.
  2. Platform-Specific Inode Handling: Logic to retrieve a file's inode is split into inode_unix.go and inode_other.go using Go build tags. This provides the most reliable method for rotation detection on Unix while gracefully falling back to other heuristics on systems like Windows.
  3. Stale ACK Prevention: A generation counter (atomic.Uint64) is incremented every time a file is rotated or truncated. The acknowledgment function captures the generation number at the time of message creation and will ignore any acks from a previous "generation." This is critical to prevent an ack from an old, rotated-away file from incorrectly advancing the byte offset of the new file.
  4. Concurrency Model:
    • The primary monitorFile goroutine listens for fsnotify events and triggers data draining.
    • Data is read into a buffered channel (sfi.buffer) to decouple file I/O from downstream consumption in the Read method.
    • Atomic operations are used for high-frequency updates (metrics counters, offsets), while mutexes protect access to shared resources like the file handle and position struct.
  5. Testing: A comprehensive test suite (input_streaming_file_test.go) is included, covering the main success paths and edge cases, including position persistence, file rotation, file truncation, and concurrent reads.

- Add drainBufferChannel() method to clear stale buffered data when truncation is detected
- Increment generation counter to invalidate in-flight acks from old file
- Update tests to use event-driven approach with fsnotify
- Remove old IdleTimeout config fields from tests
- Add FileTruncation test to verify truncation detection and handling

This ensures that when a file is truncated, any buffered data from before the
truncation is discarded, and new data from the truncated file is read correctly.
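
The interaction between the generation counter and the buffer drain can be sketched as follows. This is an illustrative reduction, not the PR's code: `tracker`, `ackFor`, `reset`, and `drainBuffer` are hypothetical names, and the real drainBufferChannel() and AckFunc carry more state.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// tracker is a hypothetical reduction of the input's position state.
type tracker struct {
	generation atomic.Uint64
	mu         sync.Mutex
	offset     int64
}

// ackFor returns an ack callback bound to the generation that was current
// when the message was created. It reports whether the ack was applied.
func (t *tracker) ackFor(endOffset int64) func() bool {
	gen := t.generation.Load()
	return func() bool {
		t.mu.Lock()
		defer t.mu.Unlock()
		if t.generation.Load() != gen {
			return false // stale: the file was rotated or truncated since
		}
		if endOffset > t.offset {
			t.offset = endOffset
		}
		return true
	}
}

// reset is called on rotation or truncation: it invalidates in-flight
// acks and restarts reading from the beginning of the file.
func (t *tracker) reset() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.generation.Add(1)
	t.offset = 0
}

// drainBuffer discards everything currently queued without blocking and
// returns how many items were dropped.
func drainBuffer(ch chan []byte) int {
	n := 0
	for {
		select {
		case <-ch:
			n++
		default:
			return n
		}
	}
}
```

In this sketch an ack created before `reset` returns false and leaves the offset at zero, which is the property that keeps a late ack for the old file from moving the position in the new one.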