Skip to content

Commit 45a1a15

Browse files
authored
Merge pull request #518 from lutovich/1.7-tx-meta-and-timeout
Expose transaction timeout and metadata in the API
2 parents 0b55f09 + d902238 commit 45a1a15

30 files changed

+1768
-130
lines changed

driver/src/main/java/org/neo4j/driver/internal/AbstractStatementRunner.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.CompletionStage;
2323

2424
import org.neo4j.driver.internal.types.InternalTypeSystem;
25+
import org.neo4j.driver.internal.util.Extract;
2526
import org.neo4j.driver.internal.value.MapValue;
2627
import org.neo4j.driver.v1.Record;
2728
import org.neo4j.driver.v1.Statement;
@@ -32,10 +33,6 @@
3233
import org.neo4j.driver.v1.Values;
3334
import org.neo4j.driver.v1.types.TypeSystem;
3435

35-
import static org.neo4j.driver.internal.util.Extract.assertParameter;
36-
import static org.neo4j.driver.internal.util.Iterables.newHashMapWithSize;
37-
import static org.neo4j.driver.v1.Values.value;
38-
3936
abstract class AbstractStatementRunner implements StatementRunner
4037
{
4138
@Override
@@ -103,14 +100,6 @@ private static Value parameters( Map<String,Object> map )
103100
{
104101
return Values.EmptyMap;
105102
}
106-
107-
Map<String,Value> asValues = newHashMapWithSize( map.size() );
108-
for ( Map.Entry<String,Object> entry : map.entrySet() )
109-
{
110-
Object value = entry.getValue();
111-
assertParameter( value );
112-
asValues.put( entry.getKey(), value( value ) );
113-
}
114-
return new MapValue( asValues );
103+
return new MapValue( Extract.mapOfValues( map ) );
115104
}
116105
}

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.neo4j.driver.v1.StatementResult;
3232
import org.neo4j.driver.v1.StatementResultCursor;
3333
import org.neo4j.driver.v1.Transaction;
34+
import org.neo4j.driver.v1.TransactionConfig;
3435
import org.neo4j.driver.v1.exceptions.ClientException;
3536

3637
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
@@ -78,9 +79,9 @@ public ExplicitTransaction( Connection connection, NetworkSession session )
7879
this.resultCursors = new ResultCursorsHolder();
7980
}
8081

81-
public CompletionStage<ExplicitTransaction> beginAsync( Bookmarks initialBookmarks )
82+
public CompletionStage<ExplicitTransaction> beginAsync( Bookmarks initialBookmarks, TransactionConfig config )
8283
{
83-
return protocol.beginTransaction( connection, initialBookmarks )
84+
return protocol.beginTransaction( connection, initialBookmarks, config )
8485
.handle( ( ignore, beginError ) ->
8586
{
8687
if ( beginError != null )

driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,7 @@ class LeakLoggingNetworkSession extends NetworkSession
3030
{
3131
private final String stackTrace;
3232

33-
LeakLoggingNetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic,
34-
Logging logging )
33+
LeakLoggingNetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic, Logging logging )
3534
{
3635
super( connectionProvider, mode, retryLogic, logging );
3736
this.stackTrace = captureStackTrace();

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 95 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21+
import java.util.Map;
2122
import java.util.concurrent.CompletableFuture;
2223
import java.util.concurrent.CompletionException;
2324
import java.util.concurrent.CompletionStage;
@@ -36,9 +37,11 @@
3637
import org.neo4j.driver.v1.StatementResult;
3738
import org.neo4j.driver.v1.StatementResultCursor;
3839
import org.neo4j.driver.v1.Transaction;
40+
import org.neo4j.driver.v1.TransactionConfig;
3941
import org.neo4j.driver.v1.TransactionWork;
4042
import org.neo4j.driver.v1.exceptions.ClientException;
4143

44+
import static java.util.Collections.emptyMap;
4245
import static java.util.concurrent.CompletableFuture.completedFuture;
4346
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
4447
import static org.neo4j.driver.internal.util.Futures.failedFuture;
@@ -59,8 +62,7 @@ public class NetworkSession extends AbstractStatementRunner implements Session
5962

6063
private final AtomicBoolean open = new AtomicBoolean( true );
6164

62-
public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic,
63-
Logging logging )
65+
public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic, Logging logging )
6466
{
6567
this.connectionProvider = connectionProvider;
6668
this.mode = mode;
@@ -71,7 +73,25 @@ public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, R
7173
@Override
7274
public StatementResult run( Statement statement )
7375
{
74-
StatementResultCursor cursor = Futures.blockingGet( run( statement, false ),
76+
return run( statement, TransactionConfig.empty() );
77+
}
78+
79+
@Override
80+
public StatementResult run( String statement, TransactionConfig config )
81+
{
82+
return run( statement, emptyMap(), config );
83+
}
84+
85+
@Override
86+
public StatementResult run( String statement, Map<String,Object> parameters, TransactionConfig config )
87+
{
88+
return run( new Statement( statement, parameters ), config );
89+
}
90+
91+
@Override
92+
public StatementResult run( Statement statement, TransactionConfig config )
93+
{
94+
StatementResultCursor cursor = Futures.blockingGet( run( statement, config, false ),
7595
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in session" ) );
7696

7797
// query executed, it is safe to obtain a connection in a blocking way
@@ -81,9 +101,27 @@ public StatementResult run( Statement statement )
81101

82102
@Override
83103
public CompletionStage<StatementResultCursor> runAsync( Statement statement )
104+
{
105+
return runAsync( statement, TransactionConfig.empty() );
106+
}
107+
108+
@Override
109+
public CompletionStage<StatementResultCursor> runAsync( String statement, TransactionConfig config )
110+
{
111+
return runAsync( statement, emptyMap(), config );
112+
}
113+
114+
@Override
115+
public CompletionStage<StatementResultCursor> runAsync( String statement, Map<String,Object> parameters, TransactionConfig config )
116+
{
117+
return runAsync( new Statement( statement, parameters ), config );
118+
}
119+
120+
@Override
121+
public CompletionStage<StatementResultCursor> runAsync( Statement statement, TransactionConfig config )
84122
{
85123
//noinspection unchecked
86-
return (CompletionStage) run( statement, true );
124+
return (CompletionStage) run( statement, config, true );
87125
}
88126

89127
@Override
@@ -131,7 +169,13 @@ public CompletionStage<Void> closeAsync()
131169
@Override
132170
public Transaction beginTransaction()
133171
{
134-
return beginTransaction( mode );
172+
return beginTransaction( TransactionConfig.empty() );
173+
}
174+
175+
@Override
176+
public Transaction beginTransaction( TransactionConfig config )
177+
{
178+
return beginTransaction( mode, config );
135179
}
136180

137181
@Deprecated
@@ -144,33 +188,63 @@ public Transaction beginTransaction( String bookmark )
144188

145189
@Override
146190
public CompletionStage<Transaction> beginTransactionAsync()
191+
{
192+
return beginTransactionAsync( TransactionConfig.empty() );
193+
}
194+
195+
@Override
196+
public CompletionStage<Transaction> beginTransactionAsync( TransactionConfig config )
147197
{
148198
//noinspection unchecked
149-
return (CompletionStage) beginTransactionAsync( mode );
199+
return (CompletionStage) beginTransactionAsync( mode, config );
150200
}
151201

152202
@Override
153203
public <T> T readTransaction( TransactionWork<T> work )
154204
{
155-
return transaction( AccessMode.READ, work );
205+
return readTransaction( work, TransactionConfig.empty() );
206+
}
207+
208+
@Override
209+
public <T> T readTransaction( TransactionWork<T> work, TransactionConfig config )
210+
{
211+
return transaction( AccessMode.READ, work, config );
156212
}
157213

158214
@Override
159215
public <T> CompletionStage<T> readTransactionAsync( TransactionWork<CompletionStage<T>> work )
160216
{
161-
return transactionAsync( AccessMode.READ, work );
217+
return readTransactionAsync( work, TransactionConfig.empty() );
218+
}
219+
220+
@Override
221+
public <T> CompletionStage<T> readTransactionAsync( TransactionWork<CompletionStage<T>> work, TransactionConfig config )
222+
{
223+
return transactionAsync( AccessMode.READ, work, config );
162224
}
163225

164226
@Override
165227
public <T> T writeTransaction( TransactionWork<T> work )
166228
{
167-
return transaction( AccessMode.WRITE, work );
229+
return writeTransaction( work, TransactionConfig.empty() );
230+
}
231+
232+
@Override
233+
public <T> T writeTransaction( TransactionWork<T> work, TransactionConfig config )
234+
{
235+
return transaction( AccessMode.WRITE, work, config );
168236
}
169237

170238
@Override
171239
public <T> CompletionStage<T> writeTransactionAsync( TransactionWork<CompletionStage<T>> work )
172240
{
173-
return transactionAsync( AccessMode.WRITE, work );
241+
return writeTransactionAsync( work, TransactionConfig.empty() );
242+
}
243+
244+
@Override
245+
public <T> CompletionStage<T> writeTransactionAsync( TransactionWork<CompletionStage<T>> work, TransactionConfig config )
246+
{
247+
return transactionAsync( AccessMode.WRITE, work, config );
174248
}
175249

176250
void setBookmarks( Bookmarks bookmarks )
@@ -225,15 +299,15 @@ CompletionStage<Boolean> currentConnectionIsOpen()
225299
connection.isOpen() ); // and it's still open
226300
}
227301

228-
private <T> T transaction( AccessMode mode, TransactionWork<T> work )
302+
private <T> T transaction( AccessMode mode, TransactionWork<T> work, TransactionConfig config )
229303
{
230304
// use different code path compared to async so that work is executed in the caller thread
231305
// caller thread will also be the one who sleeps between retries;
232306
// it is unsafe to execute retries in the event loop threads because this can cause a deadlock
233307
// event loop thread will bock and wait for itself to read some data
234308
return retryLogic.retry( () ->
235309
{
236-
try ( Transaction tx = beginTransaction( mode ) )
310+
try ( Transaction tx = beginTransaction( mode, config ) )
237311
{
238312
try
239313
{
@@ -252,12 +326,12 @@ private <T> T transaction( AccessMode mode, TransactionWork<T> work )
252326
} );
253327
}
254328

255-
private <T> CompletionStage<T> transactionAsync( AccessMode mode, TransactionWork<CompletionStage<T>> work )
329+
private <T> CompletionStage<T> transactionAsync( AccessMode mode, TransactionWork<CompletionStage<T>> work, TransactionConfig config )
256330
{
257331
return retryLogic.retryAsync( () ->
258332
{
259333
CompletableFuture<T> resultFuture = new CompletableFuture<>();
260-
CompletionStage<ExplicitTransaction> txFuture = beginTransactionAsync( mode );
334+
CompletionStage<ExplicitTransaction> txFuture = beginTransactionAsync( mode, config );
261335

262336
txFuture.whenComplete( ( tx, completionError ) ->
263337
{
@@ -358,26 +432,27 @@ private <T> void closeTxAfterSucceededTransactionWork( ExplicitTransaction tx, C
358432
}
359433
}
360434

361-
private CompletionStage<InternalStatementResultCursor> run( Statement statement, boolean waitForRunResponse )
435+
private CompletionStage<InternalStatementResultCursor> run( Statement statement, TransactionConfig config, boolean waitForRunResponse )
362436
{
363437
ensureSessionIsOpen();
364438

365439
CompletionStage<InternalStatementResultCursor> newResultCursorStage = ensureNoOpenTxBeforeRunningQuery()
366440
.thenCompose( ignore -> acquireConnection( mode ) )
367-
.thenCompose( connection -> connection.protocol().runInAutoCommitTransaction( connection, statement, waitForRunResponse ) );
441+
.thenCompose( connection ->
442+
connection.protocol().runInAutoCommitTransaction( connection, statement, bookmarks, config, waitForRunResponse ) );
368443

369444
resultCursorStage = newResultCursorStage.exceptionally( error -> null );
370445

371446
return newResultCursorStage;
372447
}
373448

374-
private Transaction beginTransaction( AccessMode mode )
449+
private Transaction beginTransaction( AccessMode mode, TransactionConfig config )
375450
{
376-
return Futures.blockingGet( beginTransactionAsync( mode ),
451+
return Futures.blockingGet( beginTransactionAsync( mode, config ),
377452
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while starting a transaction" ) );
378453
}
379454

380-
private CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode mode )
455+
private CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode mode, TransactionConfig config )
381456
{
382457
ensureSessionIsOpen();
383458

@@ -387,7 +462,7 @@ private CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode m
387462
.thenCompose( connection ->
388463
{
389464
ExplicitTransaction tx = new ExplicitTransaction( connection, NetworkSession.this );
390-
return tx.beginAsync( bookmarks );
465+
return tx.beginAsync( bookmarks, config );
391466
} );
392467

393468
// update the reference to the only known transaction

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
import java.util.concurrent.CompletionException;
2323
import java.util.concurrent.CompletionStage;
2424

25+
import org.neo4j.driver.internal.Bookmarks;
2526
import org.neo4j.driver.internal.spi.Connection;
2627
import org.neo4j.driver.internal.util.Futures;
2728
import org.neo4j.driver.internal.util.ServerVersion;
2829
import org.neo4j.driver.v1.Record;
2930
import org.neo4j.driver.v1.Statement;
3031
import org.neo4j.driver.v1.StatementResultCursor;
32+
import org.neo4j.driver.v1.TransactionConfig;
3133
import org.neo4j.driver.v1.exceptions.ClientException;
3234

3335
import static org.neo4j.driver.internal.util.ServerVersion.v3_2_0;
@@ -60,7 +62,7 @@ public CompletionStage<RoutingProcedureResponse> run( CompletionStage<Connection
6062
CompletionStage<List<Record>> runProcedure( Connection connection, Statement procedure )
6163
{
6264
return connection.protocol()
63-
.runInAutoCommitTransaction( connection, procedure, true )
65+
.runInAutoCommitTransaction( connection, procedure, Bookmarks.empty(), TransactionConfig.empty(), true )
6466
.thenCompose( StatementResultCursor::listAsync );
6567
}
6668

driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.neo4j.driver.v1.Session;
3535
import org.neo4j.driver.v1.Statement;
3636
import org.neo4j.driver.v1.Transaction;
37+
import org.neo4j.driver.v1.TransactionConfig;
3738
import org.neo4j.driver.v1.Value;
3839
import org.neo4j.driver.v1.exceptions.ClientException;
3940

@@ -62,9 +63,10 @@ public interface BoltProtocol
6263
*
6364
* @param connection the connection to use.
6465
* @param bookmarks the bookmarks. Never null, should be {@link Bookmarks#empty()} when absent.
66+
* @param config the transaction configuration. Never null, should be {@link TransactionConfig#empty()} when absent.
6567
* @return a completion stage completed when transaction is started or completed exceptionally when there was a failure.
6668
*/
67-
CompletionStage<Void> beginTransaction( Connection connection, Bookmarks bookmarks );
69+
CompletionStage<Void> beginTransaction( Connection connection, Bookmarks bookmarks, TransactionConfig config );
6870

6971
/**
7072
* Commit the explicit transaction.
@@ -87,12 +89,15 @@ public interface BoltProtocol
8789
*
8890
* @param connection the network connection to use.
8991
* @param statement the cypher to execute.
92+
* @param bookmarks the bookmarks. Never null, should be {@link Bookmarks#empty()} when absent.
93+
* @param config the transaction config for the implicitly started auto-commit transaction.
9094
* @param waitForRunResponse {@code true} for async query execution and {@code false} for blocking query
9195
* execution. Makes returned cursor stage be chained after the RUN response arrives. Needed to have statement
9296
* keys populated.
9397
* @return stage with cursor.
9498
*/
95-
CompletionStage<InternalStatementResultCursor> runInAutoCommitTransaction( Connection connection, Statement statement, boolean waitForRunResponse );
99+
CompletionStage<InternalStatementResultCursor> runInAutoCommitTransaction( Connection connection, Statement statement,
100+
Bookmarks bookmarks, TransactionConfig config, boolean waitForRunResponse );
96101

97102
/**
98103
* Execute the given statement in a running explicit transaction, i.e. {@link Transaction#run(Statement)}.

driver/src/main/java/org/neo4j/driver/internal/messaging/request/BeginMessage.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,18 @@
2323
import java.util.Objects;
2424

2525
import org.neo4j.driver.internal.Bookmarks;
26+
import org.neo4j.driver.v1.TransactionConfig;
2627
import org.neo4j.driver.v1.Value;
2728

2829
public class BeginMessage extends TransactionStartingMessage
2930
{
3031
public static final byte SIGNATURE = 0x11;
3132

33+
public BeginMessage( Bookmarks bookmarks, TransactionConfig config )
34+
{
35+
this( bookmarks, config.timeout(), config.metadata() );
36+
}
37+
3238
public BeginMessage( Bookmarks bookmarks, Duration txTimeout, Map<String,Value> txMetadata )
3339
{
3440
super( bookmarks, txTimeout, txMetadata );

0 commit comments

Comments
 (0)