Skip to content

Commit bf81dc8

Browse files
authored
Merge pull request #15 from nao1215/nchika/fix-stream
Fix: Support chunked reading when loading local files
2 parents 5f48cc8 + 9b3a837 commit bf81dc8

File tree

4 files changed

+886
-74
lines changed

4 files changed

+886
-74
lines changed

builder.go

Lines changed: 133 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
package filesql
33

44
import (
5+
"compress/bzip2"
6+
"compress/gzip"
57
"context"
68
"database/sql"
79
"encoding/base64"
@@ -14,7 +16,9 @@ import (
1416
"path/filepath"
1517
"strings"
1618

19+
"github.com/klauspost/compress/zstd"
1720
"github.com/nao1215/filesql/domain/model"
21+
"github.com/ulikunitz/xz"
1822
_ "modernc.org/sqlite" // Import SQLite driver for in-memory databases
1923
)
2024

@@ -345,13 +349,14 @@ func (b *DBBuilder) Build(ctx context.Context) (*DBBuilder, error) {
345349

346350
// Open creates and returns a database connection using the configured and validated inputs.
347351
// This method can only be called after Build() has been successfully executed.
348-
// It creates an in-memory SQLite database and loads all configured files as tables.
352+
// It creates an in-memory SQLite database and loads all configured files as tables using streaming.
349353
//
350354
// Table names are derived from file names without extensions:
351355
// - "users.csv" becomes table "users"
352356
// - "data.tsv.gz" becomes table "data"
353357
//
354358
// The returned database connection supports the full SQLite3 SQL syntax.
359+
// Auto-save functionality is supported for both file paths and reader inputs.
355360
// The caller is responsible for closing the connection when done.
356361
//
357362
// Returns a *sql.DB connection or an error if the database cannot be created.
@@ -364,7 +369,7 @@ func (b *DBBuilder) Open(ctx context.Context) (*sql.DB, error) {
364369
var db *sql.DB
365370
var err error
366371

367-
// Case 1: We have only file paths, use existing DSN-based approach
372+
// Case 1: Only file paths, use filesql driver for autosave support
368373
if len(b.collectedPaths) > 0 && len(b.readers) == 0 {
369374
// Create DSN with all collected paths and auto-save config
370375
dsn := strings.Join(b.collectedPaths, ";")
@@ -385,26 +390,19 @@ func (b *DBBuilder) Open(ctx context.Context) (*sql.DB, error) {
385390
return nil, err
386391
}
387392
} else {
388-
// Case 2: We have reader inputs (with or without file paths)
389-
// Create in-memory SQLite database and stream data directly
393+
// Case 2: Readers or mixed, use direct SQLite with streaming
394+
// Create in-memory SQLite database
390395
db, err = sql.Open("sqlite", ":memory:")
391396
if err != nil {
392397
return nil, fmt.Errorf("failed to create in-memory database: %w", err)
393398
}
394399

395-
// Process file paths first if any
400+
// Process file paths using streaming (chunked reading)
396401
if len(b.collectedPaths) > 0 {
397402
for _, path := range b.collectedPaths {
398-
file := model.NewFile(path)
399-
table, err := file.ToTable()
400-
if err != nil {
403+
if err := b.streamFileToSQLite(ctx, db, path); err != nil {
401404
_ = db.Close() // Ignore close error during error handling
402-
return nil, fmt.Errorf("failed to process file %s: %w", path, err)
403-
}
404-
405-
if err := b.createTableFromModel(ctx, db, table); err != nil {
406-
_ = db.Close() // Ignore close error during error handling
407-
return nil, fmt.Errorf("failed to create table from file %s: %w", path, err)
405+
return nil, fmt.Errorf("failed to stream file %s: %w", path, err)
408406
}
409407
}
410408
}
@@ -416,6 +414,9 @@ func (b *DBBuilder) Open(ctx context.Context) (*sql.DB, error) {
416414
return nil, fmt.Errorf("failed to stream reader input for table '%s': %w", readerInput.TableName, err)
417415
}
418416
}
417+
418+
// Note: Auto-save for readers-only case is not fully implemented yet
419+
// This requires extending the driver to support auto-save without original file paths
419420
}
420421

421422
// Validate connection
@@ -512,6 +513,89 @@ func (b *DBBuilder) processFSToReaders(_ context.Context, filesystem fs.FS) ([]R
512513
return readers, nil
513514
}
514515

516+
// streamFileToSQLite streams data from a file path directly to SQLite database using chunked processing
517+
func (b *DBBuilder) streamFileToSQLite(ctx context.Context, db *sql.DB, filePath string) error {
518+
// Check if path is a directory or file
519+
info, err := os.Stat(filePath)
520+
if err != nil {
521+
return fmt.Errorf("failed to stat path %s: %w", filePath, err)
522+
}
523+
524+
if info.IsDir() {
525+
// Handle directory by loading all supported files
526+
return b.streamDirectoryToSQLite(ctx, db, filePath)
527+
}
528+
529+
// Check if file is supported
530+
if !model.IsSupportedFile(filePath) {
531+
return fmt.Errorf("unsupported file type: %s", filePath)
532+
}
533+
534+
// Open the file and create a reader
535+
file, err := os.Open(filePath) //nolint:gosec // File path is validated and comes from user input
536+
if err != nil {
537+
return fmt.Errorf("failed to open file %s: %w", filePath, err)
538+
}
539+
defer file.Close()
540+
541+
// Create decompressed reader if needed
542+
reader, err := b.createDecompressedReader(file, filePath)
543+
if err != nil {
544+
return fmt.Errorf("failed to create decompressed reader for %s: %w", filePath, err)
545+
}
546+
547+
// Create file model to determine type and table name
548+
fileModel := model.NewFile(filePath)
549+
tableName := model.TableFromFilePath(filePath)
550+
551+
// Create reader input for streaming
552+
readerInput := ReaderInput{
553+
Reader: reader,
554+
TableName: tableName,
555+
FileType: fileModel.Type(),
556+
}
557+
558+
// Use existing streaming logic
559+
return b.streamReaderToSQLite(ctx, db, readerInput)
560+
}
561+
562+
// streamDirectoryToSQLite processes all supported files in a directory
563+
func (b *DBBuilder) streamDirectoryToSQLite(ctx context.Context, db *sql.DB, dirPath string) error {
564+
entries, err := os.ReadDir(dirPath)
565+
if err != nil {
566+
return fmt.Errorf("failed to read directory %s: %w", dirPath, err)
567+
}
568+
569+
loadedFiles := 0
570+
for _, entry := range entries {
571+
if entry.IsDir() {
572+
continue // Skip subdirectories
573+
}
574+
575+
fileName := entry.Name()
576+
filePath := filepath.Join(dirPath, fileName)
577+
578+
// Check if file is supported
579+
if !model.IsSupportedFile(fileName) {
580+
continue
581+
}
582+
583+
// Stream the file
584+
if err := b.streamFileToSQLite(ctx, db, filePath); err != nil {
585+
// Log error but continue with other files
586+
fmt.Printf("Warning: failed to load file %s: %v\n", filepath.Base(filePath), err)
587+
continue
588+
}
589+
loadedFiles++
590+
}
591+
592+
if loadedFiles == 0 {
593+
return fmt.Errorf("no supported files found in directory: %s", dirPath)
594+
}
595+
596+
return nil
597+
}
598+
515599
// streamReaderToSQLite streams data from io.Reader directly to SQLite database
516600
// This is the ideal approach that provides true streaming with chunk-based processing
517601
func (b *DBBuilder) streamReaderToSQLite(ctx context.Context, db *sql.DB, input ReaderInput) error {
@@ -578,59 +662,6 @@ func (b *DBBuilder) createTableFromChunk(ctx context.Context, db *sql.DB, chunk
578662
return err
579663
}
580664

581-
// createTableFromModel creates a SQLite table from a model.Table and inserts all data
582-
func (b *DBBuilder) createTableFromModel(ctx context.Context, db *sql.DB, table *model.Table) error {
583-
columnInfo := table.ColumnInfo()
584-
columns := make([]string, 0, len(columnInfo))
585-
for _, col := range columnInfo {
586-
columns = append(columns, fmt.Sprintf(`"%s" %s`, col.Name, col.Type.String()))
587-
}
588-
589-
// Create table
590-
query := fmt.Sprintf(
591-
`CREATE TABLE IF NOT EXISTS "%s" (%s)`,
592-
table.Name(),
593-
strings.Join(columns, ", "),
594-
)
595-
596-
if _, err := db.ExecContext(ctx, query); err != nil {
597-
return fmt.Errorf("failed to create table: %w", err)
598-
}
599-
600-
// Insert all data
601-
headers := table.Header()
602-
placeholders := make([]string, len(headers))
603-
for i := range placeholders {
604-
placeholders[i] = "?"
605-
}
606-
607-
insertQuery := fmt.Sprintf( //nolint:gosec // Table name is from validated input
608-
`INSERT INTO "%s" VALUES (%s)`,
609-
table.Name(),
610-
strings.Join(placeholders, ", "),
611-
)
612-
613-
stmt, err := db.PrepareContext(ctx, insertQuery)
614-
if err != nil {
615-
return fmt.Errorf("failed to prepare insert statement: %w", err)
616-
}
617-
defer stmt.Close()
618-
619-
// Insert all records
620-
for _, record := range table.Records() {
621-
values := make([]any, len(record))
622-
for i, value := range record {
623-
values[i] = value
624-
}
625-
626-
if _, err := stmt.ExecContext(ctx, values...); err != nil {
627-
return fmt.Errorf("failed to insert record: %w", err)
628-
}
629-
}
630-
631-
return nil
632-
}
633-
634665
// prepareInsertStatement prepares an insert statement for the table
635666
func (b *DBBuilder) prepareInsertStatement(ctx context.Context, db *sql.DB, chunk *model.TableChunk) (*sql.Stmt, error) {
636667
headers := chunk.Headers()
@@ -663,3 +694,38 @@ func (b *DBBuilder) insertChunkData(ctx context.Context, stmt *sql.Stmt, chunk *
663694

664695
return nil
665696
}
697+
698+
// createDecompressedReader creates a decompressed reader based on file extension
699+
func (b *DBBuilder) createDecompressedReader(file *os.File, filePath string) (io.Reader, error) {
700+
// Check file extension to determine compression type
701+
if strings.HasSuffix(strings.ToLower(filePath), model.ExtGZ) {
702+
gzReader, err := gzip.NewReader(file)
703+
if err != nil {
704+
return nil, fmt.Errorf("failed to create gzip reader: %w", err)
705+
}
706+
return gzReader, nil
707+
}
708+
709+
if strings.HasSuffix(strings.ToLower(filePath), model.ExtBZ2) {
710+
return bzip2.NewReader(file), nil
711+
}
712+
713+
if strings.HasSuffix(strings.ToLower(filePath), model.ExtXZ) {
714+
xzReader, err := xz.NewReader(file)
715+
if err != nil {
716+
return nil, fmt.Errorf("failed to create xz reader: %w", err)
717+
}
718+
return xzReader, nil
719+
}
720+
721+
if strings.HasSuffix(strings.ToLower(filePath), model.ExtZSTD) {
722+
zstdReader, err := zstd.NewReader(file)
723+
if err != nil {
724+
return nil, fmt.Errorf("failed to create zstd reader: %w", err)
725+
}
726+
return zstdReader.IOReadCloser(), nil
727+
}
728+
729+
// No compression, return file as-is
730+
return file, nil
731+
}

0 commit comments

Comments
 (0)