Skip to content

Commit f7f627e

Browse files
CSHARP-3671: Better wait queue timeout errors for load balanced clusters.
1 parent cfa834f commit f7f627e

File tree

19 files changed

+477
-122
lines changed

19 files changed

+477
-122
lines changed

src/MongoDB.Driver.Core/Core/Bindings/ReadPreferenceBinding.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public sealed class ReadPreferenceBinding : IReadBinding
3434
private readonly ReadPreference _readPreference;
3535
private readonly IServerSelector _serverSelector;
3636
private readonly ICoreSessionHandle _session;
37+
private readonly TrackedOperationRunContext _trackedOperationRunContext;
3738

3839
// constructors
3940
/// <summary>
@@ -43,11 +44,17 @@ public sealed class ReadPreferenceBinding : IReadBinding
4344
/// <param name="readPreference">The read preference.</param>
4445
/// <param name="session">The session.</param>
4546
public ReadPreferenceBinding(ICluster cluster, ReadPreference readPreference, ICoreSessionHandle session)
47+
: this(cluster, readPreference, session, trackedOperationRunContext: null)
48+
{
49+
}
50+
51+
internal ReadPreferenceBinding(ICluster cluster, ReadPreference readPreference, ICoreSessionHandle session, TrackedOperationRunContext trackedOperationRunContext)
4652
{
4753
_cluster = Ensure.IsNotNull(cluster, nameof(cluster));
4854
_readPreference = Ensure.IsNotNull(readPreference, nameof(readPreference));
4955
_session = Ensure.IsNotNull(session, nameof(session));
5056
_serverSelector = new ReadPreferenceServerSelector(readPreference);
57+
_trackedOperationRunContext = trackedOperationRunContext; // can be null
5158
}
5259

5360
// properties
@@ -82,7 +89,7 @@ public async Task<IChannelSourceHandle> GetReadChannelSourceAsync(CancellationTo
8289

8390
private IChannelSourceHandle GetChannelSourceHelper(IServer server)
8491
{
85-
return new ChannelSourceHandle(new ServerChannelSource(server, _session.Fork()));
92+
return new ChannelSourceHandle(new ServerChannelSource(server, _session.Fork(), _trackedOperationRunContext));
8693
}
8794

8895
/// <inheritdoc/>

src/MongoDB.Driver.Core/Core/Bindings/ServerChannelSource.cs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
using System;
1717
using System.Threading;
1818
using System.Threading.Tasks;
19-
using MongoDB.Driver.Core.Connections;
2019
using MongoDB.Driver.Core.Misc;
2120
using MongoDB.Driver.Core.Servers;
2221

@@ -31,6 +30,7 @@ public sealed class ServerChannelSource : IChannelSource
3130
private bool _disposed;
3231
private readonly IServer _server;
3332
private readonly ICoreSessionHandle _session;
33+
private readonly TrackedOperationRunContext _trackedOperationRunContext;
3434

3535
// constructors
3636
/// <summary>
@@ -39,9 +39,15 @@ public sealed class ServerChannelSource : IChannelSource
3939
/// <param name="server">The server.</param>
4040
/// <param name="session">The session.</param>
4141
public ServerChannelSource(IServer server, ICoreSessionHandle session)
42+
: this(server, session, trackedOperationRunContext: null)
43+
{
44+
}
45+
46+
internal ServerChannelSource(IServer server, ICoreSessionHandle session, TrackedOperationRunContext trackedOperationRunContext)
4247
{
4348
_server = Ensure.IsNotNull(server, nameof(server));
4449
_session = Ensure.IsNotNull(session, nameof(session));
50+
_trackedOperationRunContext = trackedOperationRunContext; // can be null
4551
}
4652

4753
// properties
@@ -78,14 +84,28 @@ public void Dispose()
7884
public IChannelHandle GetChannel(CancellationToken cancellationToken)
7985
{
8086
ThrowIfDisposed();
81-
return _server.GetChannel(cancellationToken);
87+
if (_trackedOperationRunContext != null && _server is IServerWithTrackedGetChannel trackedServer)
88+
{
89+
return trackedServer.GetChannel(_trackedOperationRunContext, cancellationToken);
90+
}
91+
else
92+
{
93+
return _server.GetChannel(cancellationToken);
94+
}
8295
}
8396

8497
/// <inheritdoc/>
8598
public Task<IChannelHandle> GetChannelAsync(CancellationToken cancellationToken)
8699
{
87100
ThrowIfDisposed();
88-
return _server.GetChannelAsync(cancellationToken);
101+
if (_trackedOperationRunContext != null && _server is IServerWithTrackedGetChannel trackedServer)
102+
{
103+
return trackedServer.GetChannelAsync(_trackedOperationRunContext, cancellationToken);
104+
}
105+
else
106+
{
107+
return _server.GetChannelAsync(cancellationToken);
108+
}
89109
}
90110

91111
private void ThrowIfDisposed()
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/* Copyright 2021-present MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
namespace MongoDB.Driver.Core.Bindings
17+
{
18+
/// <summary>
19+
/// Shows running context for tracking operation.
20+
/// </summary>
21+
public class TrackedOperationRunContext
22+
{
23+
#region static
24+
/// <summary>
25+
/// Create not tracked running context.
26+
/// </summary>
27+
public static TrackedOperationRunContext CreateEmpty() => new TrackedOperationRunContext(false, false);
28+
#endregion
29+
30+
private readonly bool _isInTransaction;
31+
private readonly bool _withCursorResult;
32+
33+
/// <summary>
34+
/// Initializes a new instance of the <see cref="TrackedOperationRunContext"/> class.
35+
/// </summary>
36+
/// <param name="isInTransaction">Determine whether the operation is under transaction.</param>
37+
/// <param name="withCursorResult">Determine whether the operation initiates a cursor result.</param>
38+
public TrackedOperationRunContext(bool isInTransaction, bool withCursorResult = false)
39+
{
40+
_isInTransaction = isInTransaction;
41+
_withCursorResult = withCursorResult;
42+
}
43+
44+
/// <summary>
45+
/// Determine whether the operation is under transaction.
46+
/// </summary>
47+
public bool IsInTransaction => _isInTransaction;
48+
49+
/// <summary>
50+
/// Determine whether the operation initiates a cursor result.
51+
/// </summary>
52+
public bool WithCursorResult => _withCursorResult;
53+
}
54+
}

src/MongoDB.Driver.Core/Core/Bindings/WritableServerBinding.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public sealed class WritableServerBinding : IReadWriteBinding
3232
private readonly ICluster _cluster;
3333
private bool _disposed;
3434
private readonly ICoreSessionHandle _session;
35+
private readonly TrackedOperationRunContext _trackedOperationRunContext;
3536

3637
// constructors
3738
/// <summary>
@@ -40,9 +41,15 @@ public sealed class WritableServerBinding : IReadWriteBinding
4041
/// <param name="cluster">The cluster.</param>
4142
/// <param name="session">The session.</param>
4243
public WritableServerBinding(ICluster cluster, ICoreSessionHandle session)
44+
: this(cluster, session, trackedOperationRunContext: null)
45+
{
46+
}
47+
48+
internal WritableServerBinding(ICluster cluster, ICoreSessionHandle session, TrackedOperationRunContext trackedOperationRunContext)
4349
{
4450
_cluster = Ensure.IsNotNull(cluster, nameof(cluster));
4551
_session = Ensure.IsNotNull(session, nameof(session));
52+
_trackedOperationRunContext = trackedOperationRunContext; // can be null
4653
}
4754

4855
// properties
@@ -94,7 +101,7 @@ public async Task<IChannelSourceHandle> GetWriteChannelSourceAsync(CancellationT
94101

95102
private IChannelSourceHandle GetChannelSourceHelper(IServer server)
96103
{
97-
return new ChannelSourceHandle(new ServerChannelSource(server, _session.Fork()));
104+
return new ChannelSourceHandle(new ServerChannelSource(server, _session.Fork(), _trackedOperationRunContext));
98105
}
99106

100107
/// <inheritdoc/>

src/MongoDB.Driver.Core/Core/ChannelPinningHelper.cs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@ public static class ChannelPinningHelper
3232
/// <param name="cluster">The cluster,</param>
3333
/// <param name="session">The session.</param>
3434
/// <param name="readPreference">The read preference.</param>
35+
/// <param name="trackedOperationRunContext">The tracked operation run context.</param>
3536
/// <returns>An effective read binging.</returns>
36-
public static IReadBindingHandle CreateReadBinding(ICluster cluster, ICoreSessionHandle session, ReadPreference readPreference)
37+
public static IReadBindingHandle CreateReadBinding(ICluster cluster, ICoreSessionHandle session, ReadPreference readPreference, TrackedOperationRunContext trackedOperationRunContext = null)
3738
{
3839
IReadBinding readBinding;
3940
if (session.IsInTransaction &&
@@ -52,7 +53,9 @@ public static IReadBindingHandle CreateReadBinding(ICluster cluster, ICoreSessio
5253
// unpin if the next operation is not under transaction
5354
session.CurrentTransaction.UnpinAll();
5455
}
55-
readBinding = new ReadPreferenceBinding(cluster, readPreference, session);
56+
57+
trackedOperationRunContext = trackedOperationRunContext ?? new TrackedOperationRunContext(session.IsInTransaction, withCursorResult: false); // the default
58+
readBinding = new ReadPreferenceBinding(cluster, readPreference, session, trackedOperationRunContext);
5659
}
5760

5861
return new ReadBindingHandle(readBinding);
@@ -63,8 +66,9 @@ public static IReadBindingHandle CreateReadBinding(ICluster cluster, ICoreSessio
6366
/// </summary>
6467
/// <param name="cluster">The cluster.</param>
6568
/// <param name="session">The session.</param>
69+
/// <param name="trackedOperationRunContext">The tracked operation run context.</param>
6670
/// <returns>An effective read write binging.</returns>
67-
public static IReadWriteBindingHandle CreateReadWriteBinding(ICluster cluster, ICoreSessionHandle session)
71+
public static IReadWriteBindingHandle CreateReadWriteBinding(ICluster cluster, ICoreSessionHandle session, TrackedOperationRunContext trackedOperationRunContext = null)
6872
{
6973
IReadWriteBinding readWriteBinding;
7074
if (session.IsInTransaction &&
@@ -83,7 +87,9 @@ public static IReadWriteBindingHandle CreateReadWriteBinding(ICluster cluster, I
8387
// unpin if the next operation is not under transaction
8488
session.CurrentTransaction.UnpinAll();
8589
}
86-
readWriteBinding = new WritableServerBinding(cluster, session);
90+
91+
trackedOperationRunContext = trackedOperationRunContext ?? new TrackedOperationRunContext(session.IsInTransaction, withCursorResult: false); // the default
92+
readWriteBinding = new WritableServerBinding(cluster, session, trackedOperationRunContext);
8793
}
8894

8995
return new ReadWriteBindingHandle(readWriteBinding);

src/MongoDB.Driver.Core/Core/ConnectionPools/ExclusiveConnectionPool.Helpers.cs

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929

3030
namespace MongoDB.Driver.Core.ConnectionPools
3131
{
32-
internal sealed partial class ExclusiveConnectionPool : IConnectionPool
32+
internal sealed partial class ExclusiveConnectionPool
3333
{
3434
// nested classes
3535
private static class State
@@ -42,14 +42,16 @@ private static class State
4242
private sealed class AcquireConnectionHelper
4343
{
4444
// private fields
45+
private readonly CheckedOutReason _checkedOutReason;
4546
private readonly ExclusiveConnectionPool _pool;
4647
private bool _enteredPool;
4748
private bool _enteredWaitQueue;
4849
private Stopwatch _stopwatch;
4950

5051
// constructors
51-
public AcquireConnectionHelper(ExclusiveConnectionPool pool)
52+
public AcquireConnectionHelper(ExclusiveConnectionPool pool, CheckedOutReason checkedOutReason)
5253
{
54+
_checkedOutReason = checkedOutReason;
5355
_pool = pool;
5456
}
5557

@@ -82,7 +84,7 @@ public IConnectionHandle EnteredPool(bool enteredPool, CancellationToken cancell
8284
if (enteredPool)
8385
{
8486
var timeSpentInWaitQueue = _stopwatch.Elapsed;
85-
using (var connectionCreator = new ConnectionCreator(_pool, _pool._settings.WaitQueueTimeout - timeSpentInWaitQueue))
87+
using (var connectionCreator = new ConnectionCreator(_pool, _pool._settings.WaitQueueTimeout - timeSpentInWaitQueue, _checkedOutReason))
8688
{
8789
connection = connectionCreator.CreateOpenedOrReuse(cancellationToken);
8890
}
@@ -99,7 +101,7 @@ public async Task<IConnectionHandle> EnteredPoolAsync(bool enteredPool, Cancella
99101
if (enteredPool)
100102
{
101103
var timeSpentInWaitQueue = _stopwatch.Elapsed;
102-
using (var connectionCreator = new ConnectionCreator(_pool, _pool._settings.WaitQueueTimeout - timeSpentInWaitQueue))
104+
using (var connectionCreator = new ConnectionCreator(_pool, _pool._settings.WaitQueueTimeout - timeSpentInWaitQueue, _checkedOutReason))
103105
{
104106
connection = await connectionCreator.CreateOpenedOrReuseAsync(cancellationToken).ConfigureAwait(false);
105107
}
@@ -118,13 +120,20 @@ private AcquiredConnection FinalizePoolEnterance(PooledConnection pooledConnecti
118120
var checkedOutConnectionEvent = new ConnectionPoolCheckedOutConnectionEvent(connectionHandle.ConnectionId, _stopwatch.Elapsed, EventContext.OperationId);
119121
_pool._checkedOutConnectionEventHandler?.Invoke(checkedOutConnectionEvent);
120122

123+
_pool._checkedOutTracker.CheckOut(_checkedOutReason);
124+
121125
return connectionHandle;
122126
}
123127
else
124128
{
125129
_stopwatch.Stop();
126130

127-
var message = $"Timed out waiting for a connection after {_stopwatch.ElapsedMilliseconds}ms.";
131+
var message =
132+
"Timeout waiting for connection from the connection pool. " +
133+
$"maxPoolSize: {_pool._settings.MaxConnections}, " +
134+
$"connections in use by cursors: {_pool._checkedOutTracker.GetCheckedOutNumber(CheckedOutReason.Cursor)}, " +
135+
$"connections in use by transactions: {_pool._checkedOutTracker.GetCheckedOutNumber(CheckedOutReason.Transaction)}, " +
136+
$"connections in use by other operations: {_pool._checkedOutTracker.GetCheckedOutNumber(CheckedOutReason.NotSet)}";
128137
throw new TimeoutException(message);
129138
}
130139
}
@@ -175,18 +184,25 @@ public void HandleException(Exception ex)
175184

176185
private sealed class PooledConnection : IConnection
177186
{
187+
private readonly CheckedOutReason _checkedOutReason;
178188
private readonly IConnection _connection;
179189
private readonly ExclusiveConnectionPool _connectionPool;
180190
private int _generation;
181191
private bool _disposed;
182192

183-
public PooledConnection(ExclusiveConnectionPool connectionPool, IConnection connection)
193+
public PooledConnection(ExclusiveConnectionPool connectionPool, IConnection connection, CheckedOutReason checkedOutReason)
184194
{
195+
_checkedOutReason = checkedOutReason;
185196
_connectionPool = connectionPool;
186197
_connection = connection;
187198
_generation = connectionPool._generation;
188199
}
189200

201+
public CheckedOutReason CheckedOutReason
202+
{
203+
get { return _checkedOutReason; }
204+
}
205+
190206
public ConnectionId ConnectionId
191207
{
192208
get { return _connection.ConnectionId; }
@@ -621,6 +637,7 @@ private void SignalOrReset()
621637

622638
private sealed class ConnectionCreator : IDisposable
623639
{
640+
private readonly CheckedOutReason _checkedOutReason;
624641
private readonly ExclusiveConnectionPool _pool;
625642
private readonly TimeSpan _connectingTimeout;
626643

@@ -631,8 +648,9 @@ private sealed class ConnectionCreator : IDisposable
631648

632649
private Stopwatch _stopwatch;
633650

634-
public ConnectionCreator(ExclusiveConnectionPool pool, TimeSpan connectingTimeout)
651+
public ConnectionCreator(ExclusiveConnectionPool pool, TimeSpan connectingTimeout, CheckedOutReason checkedOutReason)
635652
{
653+
_checkedOutReason = checkedOutReason;
636654
_pool = pool;
637655
_connectingTimeout = connectingTimeout;
638656
_connectingWaitStatus = SemaphoreSlimSignalable.SemaphoreWaitResult.None;
@@ -768,7 +786,7 @@ private void StartCreating(CancellationToken cancellationToken)
768786
cancellationToken.ThrowIfCancellationRequested();
769787

770788
_stopwatch = Stopwatch.StartNew();
771-
_connection = _pool.CreateNewConnection();
789+
_connection = _pool.CreateNewConnection(_checkedOutReason);
772790
}
773791

774792
private void FinishCreating(ConnectionDescription description)

0 commit comments

Comments
 (0)