Skip to content

Commit 3d4199c

Browse files
committed
Fix handing of response metadata in Bolt V3
Metadata keys are different in previous versions of the protocol. This commit makes it possible for keys to be defined by each Bolt protocol implementation. Also fixed couple unit tests and added license headers.
1 parent 5f87040 commit 3d4199c

24 files changed

+238
-54
lines changed

driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public abstract class PullAllResponseHandler implements ResponseHandler
5353

5454
private final Statement statement;
5555
private final RunResponseHandler runResponseHandler;
56+
private final String resultConsumedAfterMetadataKey;
5657
protected final Connection connection;
5758

5859
// initialized lazily when first record arrives
@@ -66,10 +67,11 @@ public abstract class PullAllResponseHandler implements ResponseHandler
6667
private CompletableFuture<Record> recordFuture;
6768
private CompletableFuture<Throwable> failureFuture;
6869

69-
public PullAllResponseHandler( Statement statement, RunResponseHandler runResponseHandler, Connection connection )
70+
public PullAllResponseHandler( Statement statement, RunResponseHandler runResponseHandler, String resultConsumedAfterMetadataKey, Connection connection )
7071
{
7172
this.statement = requireNonNull( statement );
7273
this.runResponseHandler = requireNonNull( runResponseHandler );
74+
this.resultConsumedAfterMetadataKey = requireNonNull( resultConsumedAfterMetadataKey );
7375
this.connection = requireNonNull( connection );
7476
}
7577

@@ -317,6 +319,6 @@ private boolean completeFailureFuture( Throwable error )
317319
private ResultSummary extractResultSummary( Map<String,Value> metadata )
318320
{
319321
long resultAvailableAfter = runResponseHandler.resultAvailableAfter();
320-
return MetadataUtil.extractSummary( statement, connection, resultAvailableAfter, metadata );
322+
return MetadataUtil.extractSummary( statement, connection, resultAvailableAfter, metadata, resultConsumedAfterMetadataKey );
321323
}
322324
}

driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,20 +32,22 @@
3232
public class RunResponseHandler implements ResponseHandler
3333
{
3434
private final CompletableFuture<Void> runCompletedFuture;
35+
private final String resultAvailableAfterMetadataKey;
3536

3637
private List<String> statementKeys = emptyList();
3738
private long resultAvailableAfter = -1;
3839

39-
public RunResponseHandler( CompletableFuture<Void> runCompletedFuture )
40+
public RunResponseHandler( CompletableFuture<Void> runCompletedFuture, String resultAvailableAfterMetadataKey )
4041
{
4142
this.runCompletedFuture = runCompletedFuture;
43+
this.resultAvailableAfterMetadataKey = resultAvailableAfterMetadataKey;
4244
}
4345

4446
@Override
4547
public void onSuccess( Map<String,Value> metadata )
4648
{
4749
statementKeys = extractStatementKeys( metadata );
48-
resultAvailableAfter = extractResultAvailableAfter( metadata );
50+
resultAvailableAfter = extractResultAvailableAfter( metadata, resultAvailableAfterMetadataKey );
4951

5052
completeRunFuture();
5153
}

driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@
2424
public class SessionPullAllResponseHandler extends PullAllResponseHandler
2525
{
2626
public SessionPullAllResponseHandler( Statement statement, RunResponseHandler runResponseHandler,
27-
Connection connection )
27+
String resultConsumedAfterMetadataKey, Connection connection )
2828
{
29-
super( statement, runResponseHandler, connection );
29+
super( statement, runResponseHandler, resultConsumedAfterMetadataKey, connection );
3030
}
3131

3232
@Override

driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@ public class TransactionPullAllResponseHandler extends PullAllResponseHandler
2828
{
2929
private final ExplicitTransaction tx;
3030

31-
public TransactionPullAllResponseHandler( Statement statement, RunResponseHandler runResponseHandler,
31+
public TransactionPullAllResponseHandler( Statement statement, RunResponseHandler runResponseHandler, String resultConsumedAfterMetadataKey,
3232
Connection connection, ExplicitTransaction tx )
3333
{
34-
super( statement, runResponseHandler, connection );
34+
super( statement, runResponseHandler, resultConsumedAfterMetadataKey, connection );
3535
this.tx = requireNonNull( tx );
3636
}
3737

driver/src/main/java/org/neo4j/driver/internal/messaging/v1/BoltProtocolV1.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ public class BoltProtocolV1 implements BoltProtocol
5959

6060
public static final BoltProtocol INSTANCE = new BoltProtocolV1();
6161

62+
public static final String RESULT_AVAILABLE_AFTER_METADATA_KEY = "result_available_after";
63+
public static final String RESULT_CONSUMED_AFTER_METADATA_KEY = "result_consumed_after";
64+
6265
private static final String BEGIN_QUERY = "BEGIN";
6366
private static final Message BEGIN_MESSAGE = new RunMessage( BEGIN_QUERY );
6467
private static final Message COMMIT_MESSAGE = new RunMessage( "COMMIT" );
@@ -150,7 +153,7 @@ private static CompletionStage<InternalStatementResultCursor> runStatement( Conn
150153
Map<String,Value> params = statement.parameters().asMap( ofValue() );
151154

152155
CompletableFuture<Void> runCompletedFuture = new CompletableFuture<>();
153-
RunResponseHandler runHandler = new RunResponseHandler( runCompletedFuture );
156+
RunResponseHandler runHandler = new RunResponseHandler( runCompletedFuture, RESULT_AVAILABLE_AFTER_METADATA_KEY );
154157
PullAllResponseHandler pullAllHandler = newPullAllHandler( statement, runHandler, connection, tx );
155158

156159
connection.writeAndFlush(
@@ -174,8 +177,8 @@ private static PullAllResponseHandler newPullAllHandler( Statement statement, Ru
174177
{
175178
if ( tx != null )
176179
{
177-
return new TransactionPullAllResponseHandler( statement, runHandler, connection, tx );
180+
return new TransactionPullAllResponseHandler( statement, runHandler, RESULT_CONSUMED_AFTER_METADATA_KEY, connection, tx );
178181
}
179-
return new SessionPullAllResponseHandler( statement, runHandler, connection );
182+
return new SessionPullAllResponseHandler( statement, runHandler, RESULT_CONSUMED_AFTER_METADATA_KEY, connection );
180183
}
181184
}

driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ public class BoltProtocolV3 implements BoltProtocol
6161

6262
public static final BoltProtocol INSTANCE = new BoltProtocolV3();
6363

64+
public static final String RESULT_AVAILABLE_AFTER_METADATA_KEY = "t_first";
65+
public static final String RESULT_CONSUMED_AFTER_METADATA_KEY = "t_last";
66+
6467
@Override
6568
public MessageFormat createMessageFormat()
6669
{
@@ -134,7 +137,7 @@ private static CompletionStage<InternalStatementResultCursor> runStatement( Conn
134137

135138
CompletableFuture<Void> runCompletedFuture = new CompletableFuture<>();
136139
Message runMessage = new RunWithMetadataMessage( query, params, null, null, null );
137-
RunResponseHandler runHandler = new RunResponseHandler( runCompletedFuture );
140+
RunResponseHandler runHandler = new RunResponseHandler( runCompletedFuture, RESULT_AVAILABLE_AFTER_METADATA_KEY );
138141
PullAllResponseHandler pullAllHandler = newPullAllHandler( statement, runHandler, connection, tx );
139142

140143
connection.writeAndFlush( runMessage, runHandler, PULL_ALL, pullAllHandler );
@@ -156,8 +159,8 @@ private static PullAllResponseHandler newPullAllHandler( Statement statement, Ru
156159
{
157160
if ( tx != null )
158161
{
159-
return new TransactionPullAllResponseHandler( statement, runHandler, connection, tx );
162+
return new TransactionPullAllResponseHandler( statement, runHandler, RESULT_CONSUMED_AFTER_METADATA_KEY, connection, tx );
160163
}
161-
return new SessionPullAllResponseHandler( statement, runHandler, connection );
164+
return new SessionPullAllResponseHandler( statement, runHandler, RESULT_CONSUMED_AFTER_METADATA_KEY, connection );
162165
}
163166
}

driver/src/main/java/org/neo4j/driver/internal/util/MetadataUtil.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@ public static List<String> extractStatementKeys( Map<String,Value> metadata )
6666
return emptyList();
6767
}
6868

69-
public static long extractResultAvailableAfter( Map<String,Value> metadata )
69+
public static long extractResultAvailableAfter( Map<String,Value> metadata, String key )
7070
{
71-
Value resultAvailableAfterValue = metadata.get( "result_available_after" );
71+
Value resultAvailableAfterValue = metadata.get( key );
7272
if ( resultAvailableAfterValue != null )
7373
{
7474
return resultAvailableAfterValue.asLong();
@@ -77,12 +77,12 @@ public static long extractResultAvailableAfter( Map<String,Value> metadata )
7777
}
7878

7979
public static ResultSummary extractSummary( Statement statement, Connection connection, long resultAvailableAfter,
80-
Map<String,Value> metadata )
80+
Map<String,Value> metadata, String resultConsumedAfterMetadataKey )
8181
{
8282
ServerInfo serverInfo = new InternalServerInfo( connection.serverAddress(), connection.serverVersion() );
8383
return new InternalResultSummary( statement, serverInfo, extractStatementType( metadata ),
8484
extractCounters( metadata ), extractPlan( metadata ), extractProfiledPlan( metadata ),
85-
extractNotifications( metadata ), resultAvailableAfter, extractResultConsumedAfter( metadata ) );
85+
extractNotifications( metadata ), resultAvailableAfter, extractResultConsumedAfter( metadata, resultConsumedAfterMetadataKey ) );
8686
}
8787

8888
private static StatementType extractStatementType( Map<String,Value> metadata )
@@ -153,9 +153,9 @@ private static List<Notification> extractNotifications( Map<String,Value> metada
153153
return Collections.emptyList();
154154
}
155155

156-
private static long extractResultConsumedAfter( Map<String,Value> metadata )
156+
private static long extractResultConsumedAfter( Map<String,Value> metadata, String key )
157157
{
158-
Value resultConsumedAfterValue = metadata.get( "result_consumed_after" );
158+
Value resultConsumedAfterValue = metadata.get( key );
159159
if ( resultConsumedAfterValue != null )
160160
{
161161
return resultConsumedAfterValue.asLong();

driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultCursorTest.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
3030
import org.neo4j.driver.internal.handlers.RunResponseHandler;
31+
import org.neo4j.driver.internal.messaging.v1.BoltProtocolV1;
3132
import org.neo4j.driver.internal.summary.InternalResultSummary;
3233
import org.neo4j.driver.internal.summary.InternalServerInfo;
3334
import org.neo4j.driver.internal.summary.InternalSummaryCounters;
@@ -66,7 +67,7 @@ class InternalStatementResultCursorTest
6667
@Test
6768
void shouldReturnStatementKeys()
6869
{
69-
RunResponseHandler runHandler = new RunResponseHandler( new CompletableFuture<>() );
70+
RunResponseHandler runHandler = newRunResponseHandler();
7071
PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class );
7172

7273
List<String> keys = asList( "key1", "key2", "key3" );
@@ -395,12 +396,16 @@ void shouldPropagateFailureInConsumeAsync()
395396

396397
private static InternalStatementResultCursor newCursor( PullAllResponseHandler pullAllHandler )
397398
{
398-
return new InternalStatementResultCursor( new RunResponseHandler( new CompletableFuture<>() ), pullAllHandler );
399+
return new InternalStatementResultCursor( newRunResponseHandler(), pullAllHandler );
399400
}
400401

401-
private static InternalStatementResultCursor newCursor( RunResponseHandler runHandler,
402-
PullAllResponseHandler pullAllHandler )
402+
private static InternalStatementResultCursor newCursor( RunResponseHandler runHandler, PullAllResponseHandler pullAllHandler )
403403
{
404404
return new InternalStatementResultCursor( runHandler, pullAllHandler );
405405
}
406+
407+
private static RunResponseHandler newRunResponseHandler()
408+
{
409+
return new RunResponseHandler( new CompletableFuture<>(), BoltProtocolV1.RESULT_AVAILABLE_AFTER_METADATA_KEY );
410+
}
406411
}

driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
import static org.mockito.Mockito.mock;
5353
import static org.mockito.Mockito.when;
5454
import static org.neo4j.driver.internal.BoltServerAddress.LOCAL_DEFAULT;
55+
import static org.neo4j.driver.internal.messaging.v1.BoltProtocolV1.RESULT_AVAILABLE_AFTER_METADATA_KEY;
56+
import static org.neo4j.driver.internal.messaging.v1.BoltProtocolV1.RESULT_CONSUMED_AFTER_METADATA_KEY;
5557
import static org.neo4j.driver.v1.Records.column;
5658
import static org.neo4j.driver.v1.Values.ofString;
5759
import static org.neo4j.driver.v1.Values.value;
@@ -348,14 +350,14 @@ void shouldNotPeekIntoTheFutureWhenResultIsEmpty()
348350

349351
private StatementResult createResult( int numberOfRecords )
350352
{
351-
RunResponseHandler runHandler = new RunResponseHandler( new CompletableFuture<>() );
353+
RunResponseHandler runHandler = new RunResponseHandler( new CompletableFuture<>(), RESULT_AVAILABLE_AFTER_METADATA_KEY );
352354
runHandler.onSuccess( singletonMap( "fields", value( Arrays.asList( "k1", "k2" ) ) ) );
353355

354356
Statement statement = new Statement( "<unknown>" );
355357
Connection connection = mock( Connection.class );
356358
when( connection.serverAddress() ).thenReturn( LOCAL_DEFAULT );
357359
when( connection.serverVersion() ).thenReturn( ServerVersion.v3_2_0 );
358-
PullAllResponseHandler pullAllHandler = new SessionPullAllResponseHandler( statement, runHandler, connection );
360+
PullAllResponseHandler pullAllHandler = new SessionPullAllResponseHandler( statement, runHandler, RESULT_CONSUMED_AFTER_METADATA_KEY, connection );
359361

360362
for ( int i = 1; i <= numberOfRecords; i++ )
361363
{

driver/src/test/java/org/neo4j/driver/internal/async/DirectConnectionTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ void shouldWriteAndFlushSingleMessage()
196196
DirectConnection connection = newConnection( channel );
197197

198198
connection.writeAndFlush( PULL_ALL, NO_OP_HANDLER );
199+
channel.runPendingTasks(); // writeAndFlush is scheduled to execute in the event loop thread, trigger its execution
199200

200201
assertEquals( 1, channel.outboundMessages().size() );
201202
assertEquals( PULL_ALL, single( channel.outboundMessages() ) );
@@ -208,6 +209,7 @@ void shouldWriteAndFlushMultipleMessage()
208209
DirectConnection connection = newConnection( channel );
209210

210211
connection.writeAndFlush( PULL_ALL, NO_OP_HANDLER, RESET, NO_OP_HANDLER );
212+
channel.runPendingTasks(); // writeAndFlush is scheduled to execute in the event loop thread, trigger its execution
211213

212214
assertEquals( 2, channel.outboundMessages().size() );
213215
assertEquals( PULL_ALL, channel.outboundMessages().poll() );

0 commit comments

Comments
 (0)