Skip to content

Commit 4b8855a

Browse files
committed
chore: add multi-statement support to SpannerLib
Adds support for multi-statement SQL strings to SpannerLib. This adds a new exported function NextResultSet that moves the pointer to the next result for a multi-statement SQL string. It also updates the return type for the gRPC server, so this returns an indication whether the result contains more result sets. These changes are backwards-compatible with existing implementations for Rows. Wrappers are free to ignore any result sets after the first one.
1 parent 5e98b10 commit 4b8855a

File tree

56 files changed

+4640
-5145
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+4640
-5145
lines changed

spannerlib/api/connection.go

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"context"
1919
"database/sql"
2020
"database/sql/driver"
21-
"fmt"
2221
"strings"
2322
"sync"
2423
"sync/atomic"
@@ -322,27 +321,14 @@ func execute(ctx, directExecuteContext context.Context, conn *Connection, execut
322321
if err != nil {
323322
return 0, err
324323
}
325-
// The first result set should contain the metadata.
326-
if !it.Next() {
327-
_ = it.Close()
328-
return 0, fmt.Errorf("query returned no metadata")
324+
res := &rows{
325+
backend: it,
329326
}
330-
metadata := &spannerpb.ResultSetMetadata{}
331-
if err := it.Scan(&metadata); err != nil {
332-
_ = it.Close()
327+
if err := res.readMetadata(ctx); err != nil {
333328
return 0, err
334329
}
335-
// Move to the next result set, which contains the normal data.
336-
if !it.NextResultSet() {
337-
_ = it.Close()
338-
return 0, fmt.Errorf("no results found after metadata")
339-
}
340330
id := conn.resultsIdx.Add(1)
341-
res := &rows{
342-
backend: it,
343-
metadata: metadata,
344-
}
345-
if len(metadata.RowType.Fields) == 0 {
331+
if len(res.metadata.RowType.Fields) == 0 {
346332
// No rows returned. Read the stats now.
347333
_ = res.readStats(ctx)
348334
}

spannerlib/api/rows.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"database/sql"
2020
"errors"
21+
"fmt"
2122

2223
"cloud.google.com/go/spanner"
2324
"cloud.google.com/go/spanner/apiv1/spannerpb"
@@ -54,6 +55,16 @@ func ResultSetStats(ctx context.Context, poolId, connId, rowsId int64) (*spanner
5455
return res.ResultSetStats(ctx)
5556
}
5657

58+
// NextResultSet returns the ResultSetMetadata of the next result set in the given rows or nil if there are no
59+
// more result sets in the given Rows object.
60+
func NextResultSet(ctx context.Context, poolId, connId, rowsId int64) (*spannerpb.ResultSetMetadata, error) {
61+
res, err := findRows(poolId, connId, rowsId)
62+
if err != nil {
63+
return nil, err
64+
}
65+
return res.NextResultSet(ctx)
66+
}
67+
5768
// NextEncoded returns the next row data in encoded form.
5869
// Using NextEncoded instead of Next can be more efficient for large result sets,
5970
// as it allows the library to re-use the encoding buffer.
@@ -146,6 +157,32 @@ func (rows *rows) ResultSetStats(ctx context.Context) (*spannerpb.ResultSetStats
146157
return rows.stats, nil
147158
}
148159

160+
func (rows *rows) NextResultSet(ctx context.Context) (*spannerpb.ResultSetMetadata, error) {
161+
if !rows.done {
162+
// The current result set has not been read to the end.
163+
// We therefore need to move the cursor to the next result set contains the stats for the current result set,
164+
// so we can skip those.
165+
rows.backend.NextResultSet()
166+
}
167+
if rows.backend.NextResultSet() {
168+
rows.done = false
169+
rows.stats = nil
170+
if err := rows.readMetadata(ctx); err != nil {
171+
return nil, err
172+
}
173+
if len(rows.metadata.RowType.Fields) == 0 {
174+
if err := rows.readStats(ctx); err != nil {
175+
return nil, err
176+
}
177+
}
178+
return rows.metadata, nil
179+
}
180+
if err := rows.backend.Err(); err != nil {
181+
return nil, err
182+
}
183+
return nil, nil
184+
}
185+
149186
type genericValue struct {
150187
v *structpb.Value
151188
}
@@ -199,6 +236,25 @@ func (rows *rows) Next(ctx context.Context) (*structpb.ListValue, error) {
199236
return rows.values, nil
200237
}
201238

239+
func (rows *rows) readMetadata(ctx context.Context) error {
240+
// The first result set should contain the metadata.
241+
if !rows.backend.Next() {
242+
_ = rows.backend.Close()
243+
return fmt.Errorf("query returned no metadata")
244+
}
245+
rows.metadata = &spannerpb.ResultSetMetadata{}
246+
if err := rows.backend.Scan(&rows.metadata); err != nil {
247+
_ = rows.backend.Close()
248+
return err
249+
}
250+
// Move to the next result set, which contains the normal data.
251+
if !rows.backend.NextResultSet() {
252+
_ = rows.backend.Close()
253+
return fmt.Errorf("no results found after metadata")
254+
}
255+
return nil
256+
}
257+
202258
func (rows *rows) readStats(ctx context.Context) error {
203259
rows.stats = &spannerpb.ResultSetStats{}
204260
if !rows.backend.NextResultSet() {

spannerlib/api/rows_test.go

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,142 @@ func TestExecute(t *testing.T) {
8282
}
8383
}
8484

85+
func TestExecuteMultiStatement(t *testing.T) {
86+
t.Parallel()
87+
88+
ctx := context.Background()
89+
server, teardown := setupMockServer(t)
90+
defer teardown()
91+
dsn := fmt.Sprintf("%s/projects/p/instances/i/databases/d?useplaintext=true", server.Address)
92+
93+
poolId, err := CreatePool(ctx, dsn)
94+
if err != nil {
95+
t.Fatalf("CreatePool returned unexpected error: %v", err)
96+
}
97+
connId, err := CreateConnection(ctx, poolId)
98+
if err != nil {
99+
t.Fatalf("CreateConnection returned unexpected error: %v", err)
100+
}
101+
rowsId, err := Execute(ctx, poolId, connId, &spannerpb.ExecuteSqlRequest{
102+
Sql: fmt.Sprintf("%s;%s", testutil.SelectFooFromBar, testutil.SelectFooFromBar),
103+
})
104+
if rowsId == 0 {
105+
t.Fatal("Execute returned unexpected zero id")
106+
}
107+
108+
totalRowCount := 0
109+
for {
110+
rowCount := 0
111+
for {
112+
row, err := Next(ctx, poolId, connId, rowsId)
113+
if err != nil {
114+
t.Fatalf("Next returned unexpected error: %v", err)
115+
}
116+
if row == nil {
117+
break
118+
}
119+
rowCount++
120+
totalRowCount++
121+
}
122+
if g, w := rowCount, 2; g != w {
123+
t.Fatalf("row count mismatch\n Got: %d\nWant: %d", g, w)
124+
}
125+
metadata, err := NextResultSet(ctx, poolId, connId, rowsId)
126+
if err != nil {
127+
t.Fatalf("NextResultSet returned unexpected error: %v", err)
128+
}
129+
if metadata == nil {
130+
break
131+
}
132+
}
133+
if g, w := totalRowCount, 4; g != w {
134+
t.Fatalf("total row count mismatch\n Got: %d\nWant: %d", g, w)
135+
}
136+
137+
if err := CloseRows(ctx, poolId, connId, rowsId); err != nil {
138+
t.Fatalf("CloseRows returned unexpected error: %v", err)
139+
}
140+
if err := CloseConnection(ctx, poolId, connId); err != nil {
141+
t.Fatalf("CloseConnection returned unexpected error: %v", err)
142+
}
143+
if err := ClosePool(ctx, poolId); err != nil {
144+
t.Fatalf("ClosePool returned unexpected error: %v", err)
145+
}
146+
147+
requests := server.TestSpanner.DrainRequestsFromServer()
148+
executeRequests := testutil.RequestsOfType(requests, reflect.TypeOf(&spannerpb.ExecuteSqlRequest{}))
149+
if g, w := len(executeRequests), 2; g != w {
150+
t.Fatalf("num ExecuteSql requests mismatch\n Got: %d\nWant: %d", g, w)
151+
}
152+
}
153+
154+
func TestExecuteMultiStatement_MoveToNextResultSetHalfway(t *testing.T) {
155+
t.Parallel()
156+
157+
ctx := context.Background()
158+
server, teardown := setupMockServer(t)
159+
defer teardown()
160+
dsn := fmt.Sprintf("%s/projects/p/instances/i/databases/d?useplaintext=true", server.Address)
161+
162+
poolId, err := CreatePool(ctx, dsn)
163+
if err != nil {
164+
t.Fatalf("CreatePool returned unexpected error: %v", err)
165+
}
166+
connId, err := CreateConnection(ctx, poolId)
167+
if err != nil {
168+
t.Fatalf("CreateConnection returned unexpected error: %v", err)
169+
}
170+
rowsId, err := Execute(ctx, poolId, connId, &spannerpb.ExecuteSqlRequest{
171+
Sql: fmt.Sprintf("%s;%s", testutil.SelectFooFromBar, testutil.SelectFooFromBar),
172+
})
173+
if rowsId == 0 {
174+
t.Fatal("Execute returned unexpected zero id")
175+
}
176+
177+
// Read one row from the first result set.
178+
row, err := Next(ctx, poolId, connId, rowsId)
179+
if err != nil {
180+
t.Fatalf("Next returned unexpected error: %v", err)
181+
}
182+
if row == nil {
183+
t.Fatal("Next returned unexpected nil row")
184+
}
185+
// Then move to the next result set.
186+
metadata, err := NextResultSet(ctx, poolId, connId, rowsId)
187+
if err != nil {
188+
t.Fatalf("NextResultSet returned unexpected error: %v", err)
189+
}
190+
if metadata == nil {
191+
t.Fatal("NextResultSet returned unexpected nil metadata")
192+
}
193+
// Try to read two rows from the second result set.
194+
for range 2 {
195+
row, err := Next(ctx, poolId, connId, rowsId)
196+
if err != nil {
197+
t.Fatalf("Next returned unexpected error: %v", err)
198+
}
199+
if row == nil {
200+
t.Fatal("Next returned unexpected nil row")
201+
}
202+
}
203+
204+
if err := CloseRows(ctx, poolId, connId, rowsId); err != nil {
205+
t.Fatalf("CloseRows returned unexpected error: %v", err)
206+
}
207+
if err := CloseConnection(ctx, poolId, connId); err != nil {
208+
t.Fatalf("CloseConnection returned unexpected error: %v", err)
209+
}
210+
if err := ClosePool(ctx, poolId); err != nil {
211+
t.Fatalf("ClosePool returned unexpected error: %v", err)
212+
}
213+
214+
requests := server.TestSpanner.DrainRequestsFromServer()
215+
executeRequests := testutil.RequestsOfType(requests, reflect.TypeOf(&spannerpb.ExecuteSqlRequest{}))
216+
if g, w := len(executeRequests), 2; g != w {
217+
t.Fatalf("num ExecuteSql requests mismatch\n Got: %d\nWant: %d", g, w)
218+
}
219+
}
220+
85221
func TestExecuteUnknownConnection(t *testing.T) {
86222
t.Parallel()
87223

spannerlib/grpc-server/build-protos.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,5 @@ protoc \
2626
google/spannerlib/v1/spannerlib.proto
2727
cd .. || exit 1
2828
rm googleapis/google/spannerlib
29-
git rm googleapis
29+
git rm googleapis -f
3030
rm ../../.gitmodules

spannerlib/grpc-server/google/spannerlib/v1/spannerlib.pb.go

Lines changed: 18 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

spannerlib/grpc-server/google/spannerlib/v1/spannerlib.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ message RowData {
143143
(google.api.field_behavior) = REQUIRED
144144
];
145145
google.spanner.v1.ResultSetStats stats = 4;
146+
bool has_more_results = 5;
146147
}
147148

148149
message MetadataRequest {

spannerlib/grpc-server/server.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ func (s *spannerLibServer) ExecuteStreaming(request *pb.ExecuteRequest, stream g
133133
if err != nil {
134134
return err
135135
}
136+
136137
first := true
137138
for {
138139
if row, err := api.Next(queryContext, request.Connection.Pool.Id, request.Connection.Id, id); err != nil {
@@ -143,14 +144,23 @@ func (s *spannerLibServer) ExecuteStreaming(request *pb.ExecuteRequest, stream g
143144
if err != nil {
144145
return err
145146
}
146-
res := &pb.RowData{Rows: rows, Stats: stats}
147+
nextMetadata, err := api.NextResultSet(queryContext, request.Connection.Pool.Id, request.Connection.Id, id)
148+
if err != nil {
149+
return err
150+
}
151+
res := &pb.RowData{Rows: rows, Stats: stats, HasMoreResults: nextMetadata != nil}
147152
if first {
148153
res.Metadata = metadata
149154
first = false
150155
}
151156
if err := stream.Send(res); err != nil {
152157
return err
153158
}
159+
if res.HasMoreResults {
160+
metadata = nextMetadata
161+
first = true
162+
continue
163+
}
154164
break
155165
}
156166
res := &pb.RowData{Rows: rows, Data: []*structpb.ListValue{row}}

spannerlib/grpc-server/server_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,13 +264,18 @@ func TestExecuteStreamingMultiStatement(t *testing.T) {
264264
if err != nil {
265265
t.Fatalf("failed to execute: %v", err)
266266
}
267+
numResultSets := 1
267268
numRows := 0
268269
for {
269270
row, err := stream.Recv()
270271
if err != nil {
271272
t.Fatalf("failed to receive row: %v", err)
272273
}
273274
if len(row.Data) == 0 {
275+
if row.HasMoreResults {
276+
numResultSets++
277+
continue
278+
}
274279
break
275280
}
276281
if g, w := len(row.Data), 1; g != w {
@@ -281,9 +286,12 @@ func TestExecuteStreamingMultiStatement(t *testing.T) {
281286
}
282287
numRows++
283288
}
284-
if g, w := numRows, 2; g != w {
289+
if g, w := numRows, 4; g != w {
285290
t.Fatalf("num rows mismatch\n Got: %v\nWant: %v", g, w)
286291
}
292+
if g, w := numResultSets, 2; g != w {
293+
t.Fatalf("num result sets mismatch\n Got: %v\nWant: %v", g, w)
294+
}
287295

288296
if _, err := client.ClosePool(ctx, pool); err != nil {
289297
t.Fatalf("failed to close pool: %v", err)

0 commit comments

Comments
 (0)