feat(io): Add Streaming File Input Component #514
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Summary
This pull request introduces a
streaming_fileinput component designed for continuously reading from files with high reliability. It functions similarly totail -Fbut adds critical features for production environments, including crash recovery, seamless log rotation handling, and at-least-once delivery guarantees.The component monitors a target file for new data, persisting its read position to a state file. This allows it to resume from the exact last-acknowledged position after a restart. It is built to handle common operational scenarios like file rotation and truncation automatically, ensuring data is not lost or duplicated.
Key Features
AckFunc) mechanism. The read position is only advanced and checkpointed after a message has been successfully processed and acknowledged downstream.fsnotifyto react to file system events (writes, creates, renames) in real-time, avoiding inefficient polling.Notes for Reviewer
This is a large but self-contained feature. The core logic resides in
internal/impl/io/input_streaming_file.go.📝 Design Highlights
savePositionDurablefunction implements a robust save pattern: write to a temporary file,fsync()the file,rename()it to the final destination, and thenfsync()the parent directory. This ensures the state file is never left in a corrupted state, even if the process crashes mid-write.inode_unix.goandinode_other.gousing Go build tags. This provides the most reliable method for rotation detection on Unix while gracefully falling back to other heuristics on systems like Windows.generationcounter (atomic.Uint64) is incremented every time a file is rotated or truncated. The acknowledgment function captures the generation number at the time of message creation and will ignore any acks from a previous "generation." This is critical to prevent an ack from an old, rotated-away file from incorrectly advancing the byte offset of the new file.monitorFilegoroutine listens forfsnotifyevents and triggers data draining.sfi.buffer) to decouple file I/O from downstream consumption in theReadmethod.input_streaming_file_test.go) is included, covering the main success paths and edge cases, including position persistence, file rotation, file truncation, and concurrent reads.