Skip to content
Open
41 changes: 26 additions & 15 deletions go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ retract (
)

require (
cloud.google.com/go/bigquery v1.69.0
cloud.google.com/go/firestore v1.18.0
cloud.google.com/go/logging v1.13.0
firebase.google.com/go/v4 v4.15.2
Expand Down Expand Up @@ -35,27 +36,37 @@ require (
go.opentelemetry.io/otel/trace v1.35.0
golang.org/x/exp v0.0.0-20250408133849-7e4ce0ab07d0
golang.org/x/tools v0.32.0
google.golang.org/api v0.230.0
google.golang.org/api v0.232.0
google.golang.org/genai v1.8.0
)

require (
github.com/apache/arrow/go/v15 v15.0.2 // indirect
github.com/goccy/go-json v0.10.4 // indirect
github.com/google/flatbuffers v23.5.26+incompatible // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.2.9 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/spf13/cast v1.7.1 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
golang.org/x/mod v0.24.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
)

require (
cel.dev/expr v0.20.0 // indirect
cloud.google.com/go v0.118.0 // indirect
cloud.google.com/go/auth v0.16.0 // indirect
cloud.google.com/go v0.121.0 // indirect
cloud.google.com/go/auth v0.16.1 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect
cloud.google.com/go/compute/metadata v0.6.0 // indirect
cloud.google.com/go/iam v1.3.1 // indirect
cloud.google.com/go/longrunning v0.6.4 // indirect
cloud.google.com/go/monitoring v1.22.1 // indirect
cloud.google.com/go/storage v1.49.0 // indirect
cloud.google.com/go/iam v1.5.2 // indirect
cloud.google.com/go/longrunning v0.6.7 // indirect
cloud.google.com/go/monitoring v1.24.0 // indirect
cloud.google.com/go/storage v1.53.0 // indirect
cloud.google.com/go/trace v1.11.3 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.26.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.27.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.51.0 // indirect
github.com/MicahParks/keyfunc v1.9.0 // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
Expand Down Expand Up @@ -85,7 +96,7 @@ require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/s2a-go v0.1.9 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
github.com/googleapis/gax-go/v2 v2.14.1 // indirect
github.com/googleapis/gax-go/v2 v2.14.1
github.com/gorilla/websocket v1.5.3 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/mailru/easyjson v0.9.0 // indirect
Expand All @@ -108,20 +119,20 @@ require (
github.com/zeebo/errs v1.4.0 // indirect
go.mongodb.org/mongo-driver v1.14.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/detectors/gcp v1.34.0 // indirect
go.opentelemetry.io/contrib/detectors/gcp v1.35.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect
golang.org/x/crypto v0.37.0 // indirect
golang.org/x/net v0.39.0 // indirect
golang.org/x/oauth2 v0.29.0 // indirect
golang.org/x/sync v0.13.0 // indirect
golang.org/x/oauth2 v0.30.0
golang.org/x/sync v0.14.0 // indirect
golang.org/x/sys v0.32.0 // indirect
golang.org/x/text v0.24.0 // indirect
golang.org/x/time v0.11.0 // indirect
google.golang.org/appengine/v2 v2.0.6 // indirect
google.golang.org/genproto v0.0.0-20250106144421-5f5ef82da422 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250414145226-207652e42e2e // indirect
google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250505200425-f936aa4a68b2 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250505200425-f936aa4a68b2 // indirect
google.golang.org/grpc v1.72.0 // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
86 changes: 56 additions & 30 deletions go/go.sum

Large diffs are not rendered by default.

124 changes: 124 additions & 0 deletions go/plugins/vertexai/vectorsearch/bigquery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package vectorsearch

import (
"context"
"encoding/json"
"fmt"
"log"
"math/rand"
"time"

"cloud.google.com/go/bigquery"
"github.com/firebase/genkit/go/ai"
"google.golang.org/api/iterator"
)

// BigQueryDocumentRow defines the structure of a row in the BigQuery table.
type BigQueryDocumentRow struct {
ID string `bigquery:"id"`
Content string `bigquery:"content"` // Stored as JSON string
Metadata string `bigquery:"metadata"` // Stored as JSON string
}

// GetBigQueryDocumentRetriever creates a BigQuery Document Retriever.
// This function returns a DocumentRetriever function that retrieves documents
// from a BigQuery table based on the provided neighbors' IDs.
func GetBigQueryDocumentRetriever(bqClient *bigquery.Client, datasetID, tableID string) DocumentRetriever {
return func(ctx context.Context, neighbors []Neighbor, options any) ([]*ai.Document, error) {
var ids []string
for _, neighbor := range neighbors {
if neighbor.Datapoint.DatapointId != "" {
ids = append(ids, neighbor.Datapoint.DatapointId)
}
}

if len(ids) == 0 {
return []*ai.Document{}, nil
}

// Constructing the query with UNNEST for array parameters
// BigQuery expects parameters for IN clauses with UNNEST to be arrays.
query := fmt.Sprintf("SELECT id, content, metadata FROM `%s.%s.%s` WHERE id IN UNNEST(@ids)", bqClient.Project(), datasetID, tableID)

q := bqClient.Query(query)
q.Parameters = []bigquery.QueryParameter{
{Name: "ids", Value: ids},
}

it, err := q.Read(ctx)
if err != nil {
log.Printf("Failed to execute BigQuery query: %v", err)
return nil, fmt.Errorf("failed to query BigQuery: %w", err)
}

var documents []*ai.Document
for {
var row BigQueryDocumentRow
err := it.Next(&row)
if err == iterator.Done {
break
}
if err != nil {
log.Printf("Error reading BigQuery row: %v", err)
return nil, fmt.Errorf("error reading BigQuery row: %w", err)
}

var doc ai.Document

if err := json.Unmarshal([]byte(row.Content), &doc.Content); err != nil {
log.Printf("Failed to parse content for document ID %s: %v", row.ID, err)
}
if err := json.Unmarshal([]byte(row.Metadata), &doc.Metadata); err != nil {
log.Printf("Failed to parse metadata for document ID %s: %v", row.ID, err)
}
documents = append(documents, &doc)
}

return documents, nil
}
}

func GetBigQueryDocumentIndexer(bqClient *bigquery.Client, datasetID, tableID string) func(ctx context.Context, docs []*ai.Document) ([]string, error) {
return func(ctx context.Context, docs []*ai.Document) ([]string, error) {
var ids []string
var rows []*BigQueryDocumentRow

// Seed the random number generator for generating unique IDs.
rand.Seed(time.Now().UnixNano())

for _, doc := range docs {
id := fmt.Sprintf("%x", rand.Int63()) // Generate a random ID.
ids = append(ids, id)

content, err := json.Marshal(doc.Content)
if err != nil {
return nil, fmt.Errorf("failed to marshal document content: %w", err)
}

metadata, err := json.Marshal(doc.Metadata)
if err != nil {
return nil, fmt.Errorf("failed to marshal document metadata: %w", err)
}

row := &BigQueryDocumentRow{
ID: id,
Content: string(content),
Metadata: string(metadata),
}
rows = append(rows, row)
}

// Log rows for debugging.
for _, row := range rows {
log.Printf("Inserting row: %+v", row)
}

// Insert rows into the BigQuery table.
inserter := bqClient.Dataset(datasetID).Table(tableID).Inserter()
if err := inserter.Put(ctx, rows); err != nil {
return nil, fmt.Errorf("failed to insert rows into BigQuery: %w", err)
}

return ids, nil
}
}
77 changes: 77 additions & 0 deletions go/plugins/vertexai/vectorsearch/firestore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package vectorsearch

import (
"context"
"fmt"
"log"

"cloud.google.com/go/firestore"
"github.com/firebase/genkit/go/ai"
"github.com/googleapis/gax-go/v2/apierror"
)

// GetFirestoreDocumentRetriever creates a Firestore Document Retriever.
// This function returns a DocumentRetriever function that retrieves documents
// from a Firestore collection based on the provided Vertex AI Vector Search neighbors' IDs.
func GetFirestoreDocumentRetriever(db *firestore.Client, collectionName string) DocumentRetriever {
return func(ctx context.Context, neighbors []Neighbor, options any) ([]*ai.Document, error) {
docs := []*ai.Document{}
for _, neighbor := range neighbors {
if neighbor.Datapoint.DatapointId == "" {
log.Printf("Skipping neighbor with empty or nil DatapointId: %+v", neighbor)
continue
}

docRef := db.Collection(collectionName).Doc(neighbor.Datapoint.DatapointId)
docSnapshot, err := docRef.Get(ctx)
if err != nil {
// Log the error but continue to try other neighbors.
log.Printf("Failed to get document %s from Firestore: %v", neighbor.Datapoint.DatapointId, err)
continue
}

if !docSnapshot.Exists() {
log.Printf("Document %s does not exist in collection %s. Skipping.", neighbor.Datapoint.DatapointId, collectionName)
continue
}

var firestoreData ai.Document
if err := docSnapshot.DataTo(&firestoreData); err != nil {
log.Printf("Failed to unmarshal document data for ID %s: %v", neighbor.Datapoint.DatapointId, err)
continue
}

docs = append(docs, &firestoreData)
}
return docs, nil
}
}

// GetFirestoreDocumentIndexer creates a Firestore Document Indexer.
// This function returns a DocumentIndexer function that indexes documents
// into a Firestore collection.
func GetFirestoreDocumentIndexer(db *firestore.Client, collectionName string) DocumentIndexer {
return func(ctx context.Context, docs []*ai.Document) ([]string, error) {
batch := db.Batch()
var ids []string

for _, doc := range docs {
docRef := db.Collection(collectionName).NewDoc() // Generate a new document reference.
batch.Set(docRef, map[string]interface{}{
"content": doc.Content,
"metadata": doc.Metadata,
})
ids = append(ids, docRef.ID)
}

// Commit the batch operation.
if _, err := batch.Commit(ctx); err != nil {
if apiErr, ok := err.(*apierror.APIError); ok {
log.Printf("Firestore API Error: %v, DebugInfo: %v", apiErr, apiErr.Details())
}
return nil, fmt.Errorf("failed to commit Firestore batch: %w", err)
}

return ids, nil
}
}
Loading