diff --git a/src/MongoDB.Driver.Core/Core/ChannelPinningHelper.cs b/src/MongoDB.Driver.Core/Core/ChannelPinningHelper.cs index 44c57e1de66..12da1bafbc7 100644 --- a/src/MongoDB.Driver.Core/Core/ChannelPinningHelper.cs +++ b/src/MongoDB.Driver.Core/Core/ChannelPinningHelper.cs @@ -16,6 +16,7 @@ using System.Threading; using MongoDB.Driver.Core.Bindings; using MongoDB.Driver.Core.Clusters; +using MongoDB.Driver.Core.ConnectionPools; using MongoDB.Driver.Core.Connections; using MongoDB.Driver.Core.Servers; @@ -89,18 +90,20 @@ public static IReadWriteBindingHandle CreateReadWriteBinding(ICluster cluster, I return new ReadWriteBindingHandle(readWriteBinding); } - internal static IChannelSourceHandle CreateGetMoreChannelSource(IChannelSourceHandle channelSource, long cursorId) + internal static IChannelSourceHandle CreateGetMoreChannelSource(IChannelSourceHandle channelSource, IChannelHandle channel, long cursorId) { IChannelSource effectiveChannelSource; if (IsInLoadBalancedMode(channelSource.ServerDescription) && cursorId != 0) { - var getMoreChannel = channelSource.GetChannel(CancellationToken.None); // no need for cancellation token since we already have channel in the source - var getMoreSession = channelSource.Session.Fork(); + if (channel.Connection is ICheckOutReasonTracker checkOutReasonTracker) + { + checkOutReasonTracker.SetCheckOutReasonIfNotAlreadySet(CheckOutReason.Cursor); + } effectiveChannelSource = new ChannelChannelSource( channelSource.Server, - getMoreChannel, - getMoreSession); + channel.Fork(), + channelSource.Session.Fork()); } else { @@ -110,37 +113,22 @@ internal static IChannelSourceHandle CreateGetMoreChannelSource(IChannelSourceHa return new ChannelSourceHandle(effectiveChannelSource); } - internal static bool PinChannelSourceAndChannelIfRequired( + internal static void PinChannellIfRequired( IChannelSourceHandle channelSource, IChannelHandle channel, - ICoreSessionHandle session, - out IChannelSourceHandle pinnedChannelSource, - out IChannelHandle pinnedChannel) + ICoreSessionHandle session) { - if (IsInLoadBalancedMode(channel.ConnectionDescription)) + if (IsInLoadBalancedMode(channel.ConnectionDescription) && + session.IsInTransaction && + !IsChannelPinned(session.CurrentTransaction)) { - var server = channelSource.Server; - - pinnedChannelSource = new ChannelSourceHandle( - new ChannelChannelSource( - server, - channel.Fork(), - session.Fork())); - - if (session.IsInTransaction && !IsChannelPinned(session.CurrentTransaction)) + if (channel.Connection is ICheckOutReasonTracker checkOutReasonTracker) { - session.CurrentTransaction.PinChannel(channel.Fork()); - session.CurrentTransaction.PinnedServer = server; + checkOutReasonTracker.SetCheckOutReasonIfNotAlreadySet(CheckOutReason.Transaction); } - - pinnedChannel = channel.Fork(); - - return true; + session.CurrentTransaction.PinChannel(channel.Fork()); + session.CurrentTransaction.PinnedServer = channelSource.Server; } - - pinnedChannelSource = null; - pinnedChannel = null; - return false; } // private methods diff --git a/src/MongoDB.Driver.Core/Core/ConnectionPools/CheckOutReasonCounter.cs b/src/MongoDB.Driver.Core/Core/ConnectionPools/CheckOutReasonCounter.cs new file mode 100644 index 00000000000..185ea12c8a0 --- /dev/null +++ b/src/MongoDB.Driver.Core/Core/ConnectionPools/CheckOutReasonCounter.cs @@ -0,0 +1,78 @@ +/* Copyright 2021-present MongoDB Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +using System; +using System.Threading; + +namespace MongoDB.Driver.Core.ConnectionPools +{ + internal enum CheckOutReason + { + Cursor, + Transaction + } + + internal interface ICheckOutReasonTracker + { + CheckOutReason? CheckOutReason { get; } + void SetCheckOutReasonIfNotAlreadySet(CheckOutReason reason); + } + + internal sealed class CheckOutReasonCounter + { + public int _cursorCheckOutsCount = 0; + public int _transactionCheckOutsCount = 0; + + public int GetCheckOutsCount(CheckOutReason reason) => + reason switch + { + CheckOutReason.Cursor => _cursorCheckOutsCount, + CheckOutReason.Transaction => _transactionCheckOutsCount, + _ => throw new InvalidOperationException($"Invalid checkout reason {reason}.") + }; + + public void Increment(CheckOutReason reason) + { + switch (reason) + { + case CheckOutReason.Cursor: + Interlocked.Increment(ref _cursorCheckOutsCount); + break; + case CheckOutReason.Transaction: + Interlocked.Increment(ref _transactionCheckOutsCount); + break; + default: + throw new InvalidOperationException($"Invalid checkout reason {reason}."); + } + } + + public void Decrement(CheckOutReason? reason) + { + switch (reason) + { + case null: + break; + case CheckOutReason.Cursor: + Interlocked.Decrement(ref _cursorCheckOutsCount); + break; + case CheckOutReason.Transaction: + Interlocked.Decrement(ref _transactionCheckOutsCount); + break; + default: + throw new InvalidOperationException($"Invalid checkout reason {reason}."); + } + } + } +} diff --git a/src/MongoDB.Driver.Core/Core/ConnectionPools/ExclusiveConnectionPool.Helpers.cs b/src/MongoDB.Driver.Core/Core/ConnectionPools/ExclusiveConnectionPool.Helpers.cs index 29ce9f83d7b..84947701656 100644 --- a/src/MongoDB.Driver.Core/Core/ConnectionPools/ExclusiveConnectionPool.Helpers.cs +++ b/src/MongoDB.Driver.Core/Core/ConnectionPools/ExclusiveConnectionPool.Helpers.cs @@ -29,8 +29,33 @@ namespace MongoDB.Driver.Core.ConnectionPools { - internal sealed partial class ExclusiveConnectionPool : IConnectionPool + internal sealed partial class ExclusiveConnectionPool { + // private methods + private Exception CreateTimeoutException(Stopwatch stopwatch, string message) + { + var checkOutsForCursorCount = _checkOutReasonCounter.GetCheckOutsCount(CheckOutReason.Cursor); + var checkOutsForTransactionCount = _checkOutReasonCounter.GetCheckOutsCount(CheckOutReason.Transaction); + + // only use the expanded message format when connected to a load balancer + if (checkOutsForCursorCount != 0 || checkOutsForTransactionCount != 0) + { + var maxPoolSize = _settings.MaxConnections; + var availableConnectionsCount = AvailableCount; + var checkOutsCount = maxPoolSize - availableConnectionsCount; + var checkOutsForOtherCount = checkOutsCount - checkOutsForCursorCount - checkOutsForTransactionCount; + + message = + $"Timed out after {stopwatch.ElapsedMilliseconds}ms waiting for a connection from the connection pool. " + + $"maxPoolSize: {maxPoolSize}, " + + $"connections in use by cursors: {checkOutsForCursorCount}, " + + $"connections in use by transactions: {checkOutsForTransactionCount}, " + + $"connections in use by other operations: {checkOutsForOtherCount}."; + } + + return new TimeoutException(message); + } + // nested classes private static class State { @@ -125,7 +150,7 @@ private AcquiredConnection FinalizePoolEnterance(PooledConnection pooledConnecti _stopwatch.Stop(); var message = $"Timed out waiting for a connection after {_stopwatch.ElapsedMilliseconds}ms."; - throw new TimeoutException(message); + throw _pool.CreateTimeoutException(_stopwatch, message); } } @@ -173,8 +198,9 @@ public void HandleException(Exception ex) } } - private sealed class PooledConnection : IConnection + private sealed class PooledConnection : IConnection, ICheckOutReasonTracker { + private CheckOutReason? _checkOutReason; private readonly IConnection _connection; private readonly ExclusiveConnectionPool _connectionPool; private int _generation; @@ -187,6 +213,14 @@ public PooledConnection(ExclusiveConnectionPool connectionPool, IConnection conn _generation = connectionPool._generation; } + public CheckOutReason? CheckOutReason + { + get + { + return _checkOutReason; + } + } + public ConnectionId ConnectionId { get { return _connection.ConnectionId; } @@ -313,6 +347,15 @@ public async Task SendMessagesAsync(IEnumerable messages, Messag } } + public void SetCheckOutReasonIfNotAlreadySet(CheckOutReason reason) + { + if (_checkOutReason == null) + { + _checkOutReason = reason; + _connectionPool._checkOutReasonCounter.Increment(reason); + } + } + public void SetReadTimeout(TimeSpan timeout) { _connection.SetReadTimeout(timeout); @@ -335,7 +378,7 @@ private void SetEffectiveGenerationIfRequired(ConnectionDescription description) } } - private sealed class AcquiredConnection : IConnectionHandle + private sealed class AcquiredConnection : IConnectionHandle, ICheckOutReasonTracker { private ExclusiveConnectionPool _connectionPool; private bool _disposed; @@ -347,6 +390,14 @@ public AcquiredConnection(ExclusiveConnectionPool connectionPool, ReferenceCount _reference = reference; } + public CheckOutReason? CheckOutReason + { + get + { + return _reference.Instance.CheckOutReason; + } + } + public ConnectionId ConnectionId { get { return _reference.Instance.ConnectionId; } @@ -432,6 +483,12 @@ public Task SendMessagesAsync(IEnumerable messages, MessageEncod return _reference.Instance.SendMessagesAsync(messages, messageEncoderSettings, cancellationToken); } + public void SetCheckOutReasonIfNotAlreadySet(CheckOutReason reason) + { + ThrowIfDisposed(); + _reference.Instance.SetCheckOutReasonIfNotAlreadySet(reason); + } + public void SetReadTimeout(TimeSpan timeout) { ThrowIfDisposed(); @@ -674,7 +731,7 @@ public PooledConnection CreateOpenedOrReuse(CancellationToken cancellationToken) { SemaphoreSlimSignalable.SemaphoreWaitResult.Signaled => _pool._connectionHolder.Acquire(), SemaphoreSlimSignalable.SemaphoreWaitResult.Entered => CreateOpenedInternal(cancellationToken), - SemaphoreSlimSignalable.SemaphoreWaitResult.TimedOut => throw new TimeoutException($"Timed out waiting in connecting queue after {stopwatch.ElapsedMilliseconds}ms."), + SemaphoreSlimSignalable.SemaphoreWaitResult.TimedOut => throw CreateTimeoutException(stopwatch), _ => throw new InvalidOperationException($"Invalid wait result {_connectingWaitStatus}") }; @@ -682,7 +739,7 @@ public PooledConnection CreateOpenedOrReuse(CancellationToken cancellationToken) if (connection == null && waitTimeout <= TimeSpan.Zero) { - throw TimoutException(stopwatch); + throw CreateTimeoutException(stopwatch); } } @@ -708,7 +765,7 @@ public async Task CreateOpenedOrReuseAsync(CancellationToken c { SemaphoreSlimSignalable.SemaphoreWaitResult.Signaled => _pool._connectionHolder.Acquire(), SemaphoreSlimSignalable.SemaphoreWaitResult.Entered => await CreateOpenedInternalAsync(cancellationToken).ConfigureAwait(false), - SemaphoreSlimSignalable.SemaphoreWaitResult.TimedOut => throw TimoutException(stopwatch), + SemaphoreSlimSignalable.SemaphoreWaitResult.TimedOut => throw CreateTimeoutException(stopwatch), _ => throw new InvalidOperationException($"Invalid wait result {_connectingWaitStatus}") }; @@ -716,7 +773,7 @@ public async Task CreateOpenedOrReuseAsync(CancellationToken c if (connection == null && waitTimeout <= TimeSpan.Zero) { - throw TimoutException(stopwatch); + throw CreateTimeoutException(stopwatch); } } @@ -783,8 +840,11 @@ private void FinishCreating(ConnectionDescription description) _pool._serviceStates.IncrementConnectionCount(description?.ServiceId); } - private Exception TimoutException(Stopwatch stopwatch) => - new TimeoutException($"Timed out waiting in connecting queue after {stopwatch.ElapsedMilliseconds}ms."); + private Exception CreateTimeoutException(Stopwatch stopwatch) + { + var message = $"Timed out waiting in connecting queue after {stopwatch.ElapsedMilliseconds}ms."; + return _pool.CreateTimeoutException(stopwatch, message); + } } } } diff --git a/src/MongoDB.Driver.Core/Core/ConnectionPools/ExclusiveConnectionPool.cs b/src/MongoDB.Driver.Core/Core/ConnectionPools/ExclusiveConnectionPool.cs index 8ed28939bf0..cb80790e75e 100644 --- a/src/MongoDB.Driver.Core/Core/ConnectionPools/ExclusiveConnectionPool.cs +++ b/src/MongoDB.Driver.Core/Core/ConnectionPools/ExclusiveConnectionPool.cs @@ -29,6 +29,7 @@ namespace MongoDB.Driver.Core.ConnectionPools internal sealed partial class ExclusiveConnectionPool : IConnectionPool { // fields + private readonly CheckOutReasonCounter _checkOutReasonCounter; private readonly IConnectionFactory _connectionFactory; private readonly ListConnectionHolder _connectionHolder; private readonly EndPoint _endPoint; @@ -71,6 +72,7 @@ public ExclusiveConnectionPool( _connectionFactory = Ensure.IsNotNull(connectionFactory, nameof(connectionFactory)); Ensure.IsNotNull(eventSubscriber, nameof(eventSubscriber)); + _checkOutReasonCounter = new CheckOutReasonCounter(); _connectingQueue = new SemaphoreSlimSignalable(MongoInternalDefaults.ConnectionPool.MaxConnecting); _connectionHolder = new ListConnectionHolder(eventSubscriber, _connectingQueue); _serviceStates = new ServiceStates(); @@ -385,6 +387,8 @@ private void ReleaseConnection(PooledConnection connection) _checkedInConnectionEventHandler(new ConnectionPoolCheckedInConnectionEvent(connection.ConnectionId, TimeSpan.Zero, EventContext.OperationId)); } + _checkOutReasonCounter.Decrement(connection.CheckOutReason); + if (!connection.IsExpired && _state.Value != State.Disposed) { _connectionHolder.Return(connection); diff --git a/src/MongoDB.Driver.Core/Core/Operations/AggregateOperation.cs b/src/MongoDB.Driver.Core/Core/Operations/AggregateOperation.cs index 733d00d783f..2d513c92a6d 100644 --- a/src/MongoDB.Driver.Core/Core/Operations/AggregateOperation.cs +++ b/src/MongoDB.Driver.Core/Core/Operations/AggregateOperation.cs @@ -298,7 +298,7 @@ public IAsyncCursor Execute(RetryableReadContext context, CancellationT context.ChannelSource.Session.SetSnapshotTimeIfNeeded(result.AtClusterTime); - return CreateCursor(context.ChannelSource, operation.Command, result); + return CreateCursor(context.ChannelSource, context.Channel, operation.Command, result); } } @@ -326,7 +326,7 @@ public async Task> ExecuteAsync(RetryableReadContext conte context.ChannelSource.Session.SetSnapshotTimeIfNeeded(result.AtClusterTime); - return CreateCursor(context.ChannelSource, operation.Command, result); + return CreateCursor(context.ChannelSource, context.Channel,operation.Command, result); } } @@ -387,11 +387,11 @@ private ReadCommandOperation CreateOperation(RetryableReadConte }; } - private AsyncCursor CreateCursor(IChannelSourceHandle channelSource, BsonDocument command, AggregateResult result) + private AsyncCursor CreateCursor(IChannelSourceHandle channelSource, IChannelHandle channel, BsonDocument command, AggregateResult result) { if (result.CursorId.HasValue) { - return CreateCursorFromCursorResult(channelSource, command, result); + return CreateCursorFromCursorResult(channelSource, channel, command, result); } else { @@ -400,10 +400,10 @@ private AsyncCursor CreateCursor(IChannelSourceHandle channelSource, Bs } } - private AsyncCursor CreateCursorFromCursorResult(IChannelSourceHandle channelSource, BsonDocument command, AggregateResult result) + private AsyncCursor CreateCursorFromCursorResult(IChannelSourceHandle channelSource, IChannelHandle channel, BsonDocument command, AggregateResult result) { var cursorId = result.CursorId.GetValueOrDefault(0); - var getMoreChannelSource = ChannelPinningHelper.CreateGetMoreChannelSource(channelSource, cursorId); + var getMoreChannelSource = ChannelPinningHelper.CreateGetMoreChannelSource(channelSource, channel, cursorId); return new AsyncCursor( getMoreChannelSource, result.CollectionNamespace, diff --git a/src/MongoDB.Driver.Core/Core/Operations/FindCommandOperation.cs b/src/MongoDB.Driver.Core/Core/Operations/FindCommandOperation.cs index 2d94705a029..3635706237a 100644 --- a/src/MongoDB.Driver.Core/Core/Operations/FindCommandOperation.cs +++ b/src/MongoDB.Driver.Core/Core/Operations/FindCommandOperation.cs @@ -479,12 +479,12 @@ internal BsonDocument CreateCommand(ConnectionDescription connectionDescription, }; } - private AsyncCursor CreateCursor(IChannelSourceHandle channelSource, BsonDocument commandResult) + private AsyncCursor CreateCursor(IChannelSourceHandle channelSource, IChannelHandle channel, BsonDocument commandResult) { var cursorDocument = commandResult["cursor"].AsBsonDocument; var collectionNamespace = CollectionNamespace.FromFullName(cursorDocument["ns"].AsString); var firstBatch = CreateFirstCursorBatch(cursorDocument); - var getMoreChannelSource = ChannelPinningHelper.CreateGetMoreChannelSource(channelSource, firstBatch.CursorId); + var getMoreChannelSource = ChannelPinningHelper.CreateGetMoreChannelSource(channelSource, channel, firstBatch.CursorId); if (cursorDocument.TryGetValue("atClusterTime", out var atClusterTime)) { @@ -537,7 +537,7 @@ private CursorBatch CreateFirstCursorBatch(BsonDocument cursorDocumen { var operation = CreateOperation(context); var commandResult = operation.Execute(context, cancellationToken); - return CreateCursor(context.ChannelSource, commandResult); + return CreateCursor(context.ChannelSource, context.Channel, commandResult); } } @@ -562,7 +562,7 @@ private CursorBatch CreateFirstCursorBatch(BsonDocument cursorDocumen { var operation = CreateOperation(context); var commandResult = await operation.ExecuteAsync(context, cancellationToken).ConfigureAwait(false); - return CreateCursor(context.ChannelSource, commandResult); + return CreateCursor(context.ChannelSource, context.Channel, commandResult); } } diff --git a/src/MongoDB.Driver.Core/Core/Operations/FindOpcodeOperation.cs b/src/MongoDB.Driver.Core/Core/Operations/FindOpcodeOperation.cs index ff956a11790..46eeaaef235 100644 --- a/src/MongoDB.Driver.Core/Core/Operations/FindOpcodeOperation.cs +++ b/src/MongoDB.Driver.Core/Core/Operations/FindOpcodeOperation.cs @@ -467,7 +467,7 @@ public IAsyncCursor Execute(RetryableReadContext context, Cancellatio var wrappedQuery = CreateWrappedQuery(serverDescription.Type, readPreference, out var secondaryOk); var batch = ExecuteProtocol(context.Channel, wrappedQuery, secondaryOk, cancellationToken); - return CreateCursor(context.ChannelSource, wrappedQuery, batch); + return CreateCursor(context.ChannelSource, context.Channel, wrappedQuery, batch); } } @@ -495,7 +495,7 @@ public async Task> ExecuteAsync(RetryableReadContext con var wrappedQuery = CreateWrappedQuery(serverDescription.Type, readPreference, out var secondaryOk); var batch = await ExecuteProtocolAsync(context.Channel, wrappedQuery, secondaryOk, cancellationToken).ConfigureAwait(false); - return CreateCursor(context.ChannelSource, wrappedQuery, batch); + return CreateCursor(context.ChannelSource, context.Channel, wrappedQuery, batch); } } @@ -538,9 +538,9 @@ public IReadOperation ToExplainOperation(ExplainVerbosity verbosit } // private methods - private IAsyncCursor CreateCursor(IChannelSourceHandle channelSource, BsonDocument query, CursorBatch batch) + private IAsyncCursor CreateCursor(IChannelSourceHandle channelSource, IChannelHandle channel, BsonDocument query, CursorBatch batch) { - var getMoreChannelSource = ChannelPinningHelper.CreateGetMoreChannelSource(channelSource, batch.CursorId); + var getMoreChannelSource = ChannelPinningHelper.CreateGetMoreChannelSource(channelSource, channel, batch.CursorId); return new AsyncCursor( getMoreChannelSource, diff --git a/src/MongoDB.Driver.Core/Core/Operations/ListCollectionsUsingCommandOperation.cs b/src/MongoDB.Driver.Core/Core/Operations/ListCollectionsUsingCommandOperation.cs index 169ebab4886..b2eac990b86 100644 --- a/src/MongoDB.Driver.Core/Core/Operations/ListCollectionsUsingCommandOperation.cs +++ b/src/MongoDB.Driver.Core/Core/Operations/ListCollectionsUsingCommandOperation.cs @@ -144,7 +144,7 @@ public IAsyncCursor Execute(RetryableReadContext context, Cancella { var operation = CreateOperation(); var result = operation.Execute(context, cancellationToken); - return CreateCursor(context.ChannelSource, operation.Command, result); + return CreateCursor(context.ChannelSource, context.Channel, operation.Command, result); } } @@ -168,7 +168,7 @@ public async Task> ExecuteAsync(RetryableReadContext { var operation = CreateOperation(); var result = await operation.ExecuteAsync(context, cancellationToken).ConfigureAwait(false); - return CreateCursor(context.ChannelSource, operation.Command, result); + return CreateCursor(context.ChannelSource, context.Channel, operation.Command, result); } } @@ -188,11 +188,11 @@ private ReadCommandOperation CreateOperation() }; } - private IAsyncCursor CreateCursor(IChannelSourceHandle channelSource, BsonDocument command, BsonDocument result) + private IAsyncCursor CreateCursor(IChannelSourceHandle channelSource, IChannelHandle channel, BsonDocument command, BsonDocument result) { var cursorDocument = result["cursor"].AsBsonDocument; var cursorId = cursorDocument["id"].ToInt64(); - var getMoreChannelSource = ChannelPinningHelper.CreateGetMoreChannelSource(channelSource, cursorId); + var getMoreChannelSource = ChannelPinningHelper.CreateGetMoreChannelSource(channelSource, channel, cursorId); var cursor = new AsyncCursor( getMoreChannelSource, CollectionNamespace.FromFullName(cursorDocument["ns"].AsString), diff --git a/src/MongoDB.Driver.Core/Core/Operations/ListIndexesUsingCommandOperation.cs b/src/MongoDB.Driver.Core/Core/Operations/ListIndexesUsingCommandOperation.cs index 1fba66678a9..e12ffaa1eaf 100644 --- a/src/MongoDB.Driver.Core/Core/Operations/ListIndexesUsingCommandOperation.cs +++ b/src/MongoDB.Driver.Core/Core/Operations/ListIndexesUsingCommandOperation.cs @@ -123,7 +123,7 @@ public IAsyncCursor Execute(RetryableReadContext context, Cancella try { var result = operation.Execute(context, cancellationToken); - return CreateCursor(context.ChannelSource, result, operation.Command); + return CreateCursor(context.ChannelSource, context.Channel, result, operation.Command); } catch (MongoCommandException ex) when (IsCollectionNotFoundException(ex)) { @@ -154,7 +154,7 @@ public async Task> ExecuteAsync(RetryableReadContext try { var result = await operation.ExecuteAsync(context, cancellationToken).ConfigureAwait(false); - return CreateCursor(context.ChannelSource, result, operation.Command); + return CreateCursor(context.ChannelSource, context.Channel, result, operation.Command); } catch (MongoCommandException ex) when (IsCollectionNotFoundException(ex)) { @@ -178,11 +178,11 @@ private ReadCommandOperation CreateOperation() }; } - private IAsyncCursor CreateCursor(IChannelSourceHandle channelSource, BsonDocument result, BsonDocument command) + private IAsyncCursor CreateCursor(IChannelSourceHandle channelSource, IChannelHandle channel, BsonDocument result, BsonDocument command) { var cursorDocument = result["cursor"].AsBsonDocument; var cursorId = cursorDocument["id"].ToInt64(); - var getMoreChannelSource = ChannelPinningHelper.CreateGetMoreChannelSource(channelSource, cursorId); + var getMoreChannelSource = ChannelPinningHelper.CreateGetMoreChannelSource(channelSource, channel, cursorId); var cursor = new AsyncCursor( getMoreChannelSource, CollectionNamespace.FromFullName(cursorDocument["ns"].AsString), diff --git a/src/MongoDB.Driver.Core/Core/Operations/RetryableReadContext.cs b/src/MongoDB.Driver.Core/Core/Operations/RetryableReadContext.cs index 3f8c050c1fa..720bfbd49e1 100644 --- a/src/MongoDB.Driver.Core/Core/Operations/RetryableReadContext.cs +++ b/src/MongoDB.Driver.Core/Core/Operations/RetryableReadContext.cs @@ -43,16 +43,11 @@ public static RetryableReadContext Create(IReadBinding binding, bool retryReques { context.Initialize(cancellationToken); - if (ChannelPinningHelper.PinChannelSourceAndChannelIfRequired( + ChannelPinningHelper.PinChannellIfRequired( context.ChannelSource, context.Channel, - context.Binding.Session, - out var pinnedChannelSource, - out var pinnedChannel)) - { - context.ReplaceChannelSource(pinnedChannelSource); - context.ReplaceChannel(pinnedChannel); - } + context.Binding.Session); + return context; } catch @@ -76,16 +71,11 @@ public static async Task CreateAsync(IReadBinding binding, { await context.InitializeAsync(cancellationToken).ConfigureAwait(false); - if (ChannelPinningHelper.PinChannelSourceAndChannelIfRequired( + ChannelPinningHelper.PinChannellIfRequired( context.ChannelSource, context.Channel, - context.Binding.Session, - out var pinnedChannelSource, - out var pinnedChannel)) - { - context.ReplaceChannelSource(pinnedChannelSource); - context.ReplaceChannel(pinnedChannel); - } + context.Binding.Session); + return context; } catch diff --git a/src/MongoDB.Driver.Core/Core/Operations/RetryableWriteContext.cs b/src/MongoDB.Driver.Core/Core/Operations/RetryableWriteContext.cs index 2608ac80693..039209c5a77 100644 --- a/src/MongoDB.Driver.Core/Core/Operations/RetryableWriteContext.cs +++ b/src/MongoDB.Driver.Core/Core/Operations/RetryableWriteContext.cs @@ -45,17 +45,10 @@ public static RetryableWriteContext Create(IWriteBinding binding, bool retryRequ { context.Initialize(cancellationToken); - if (context.Binding.Session.IsInTransaction && - ChannelPinningHelper.PinChannelSourceAndChannelIfRequired( + ChannelPinningHelper.PinChannellIfRequired( context.ChannelSource, context.Channel, - context.Binding.Session, - out var pinnedChannelSource, - out var pinnedChannel)) - { - context.ReplaceChannelSource(pinnedChannelSource); - context.ReplaceChannel(pinnedChannel); - } + context.Binding.Session); return context; } @@ -80,17 +73,10 @@ public static async Task CreateAsync(IWriteBinding bindin { await context.InitializeAsync(cancellationToken).ConfigureAwait(false); - if (context.Binding.Session.IsInTransaction && - ChannelPinningHelper.PinChannelSourceAndChannelIfRequired( + ChannelPinningHelper.PinChannellIfRequired( context.ChannelSource, context.Channel, - context.Binding.Session, - out var pinnedChannelSource, - out var pinnedChannel)) - { - context.ReplaceChannelSource(pinnedChannelSource); - context.ReplaceChannel(pinnedChannel); - } + context.Binding.Session); return context; } diff --git a/tests/MongoDB.Driver.Core.Tests/Core/ConnectionPools/ExclusiveConnectionPoolTests.cs b/tests/MongoDB.Driver.Core.Tests/Core/ConnectionPools/ExclusiveConnectionPoolTests.cs index 9c581a10684..38f39e7a3e0 100644 --- a/tests/MongoDB.Driver.Core.Tests/Core/ConnectionPools/ExclusiveConnectionPoolTests.cs +++ b/tests/MongoDB.Driver.Core.Tests/Core/ConnectionPools/ExclusiveConnectionPoolTests.cs @@ -280,6 +280,81 @@ public void AcquireConnection_should_return_a_connection( _capturedEvents.Any().Should().BeFalse(); } + [Theory] + [ParameterAttributeData] + internal void AcquireConnection_should_track_checked_out_reasons( + [Values(CheckOutReason.Cursor, CheckOutReason.Transaction)] CheckOutReason reason, + [Values(1, 3, 5)] int attempts, + [Values(false, true)] bool async) + { + var subjectSettings = new ConnectionPoolSettings(minConnections: 0); + + var mockConnectionFactory = Mock.Of(c => c.CreateConnection(_serverId, _endPoint) == Mock.Of()); + + var subject = CreateSubject(subjectSettings, connectionFactory: mockConnectionFactory); + + InitializeAndWait(subject, subjectSettings); + _capturedEvents.Clear(); + + List connections = new(); + for (int attempt = 1; attempt <= attempts; attempt++) + { + IConnectionHandle connection; + if (async) + { + connection = subject.AcquireConnectionAsync(CancellationToken.None).GetAwaiter().GetResult(); + } + else + { + connection = subject.AcquireConnection(CancellationToken.None); + } + ((ICheckOutReasonTracker)connection).SetCheckOutReasonIfNotAlreadySet(reason); + connections.Add(connection); + + connections.Should().HaveCount(attempt); + subject._checkOutReasonCounter().GetCheckOutsCount(reason).Should().Be(attempt); + foreach (var restItem in GetEnumItemsExcept(reason)) + { + subject._checkOutReasonCounter().GetCheckOutsCount(restItem).Should().Be(0); + } + + _capturedEvents.Next().Should().BeOfType(); + _capturedEvents.Next().Should().BeOfType(); + _capturedEvents.Next().Should().BeOfType(); + _capturedEvents.Next().Should().BeOfType(); + _capturedEvents.Next().Should().BeOfType(); + } + + _capturedEvents.Any().Should().BeFalse(); + + for (int attempt = 1; attempt <= attempts; attempt++) + { + connections[attempt - 1].Dispose(); // return connection to the pool + + subject._checkOutReasonCounter().GetCheckOutsCount(reason).Should().Be(attempts - attempt); + foreach (var restItem in GetEnumItemsExcept(reason)) + { + subject._checkOutReasonCounter().GetCheckOutsCount(restItem).Should().Be(0); + } + + _capturedEvents.Next().Should().BeOfType(); + _capturedEvents.Next().Should().BeOfType(); + } + _capturedEvents.Any().Should().BeFalse(); + + IEnumerable GetEnumItemsExcept(CheckOutReason reason) + { + foreach (var reasonItem in Enum.GetValues(typeof(CheckOutReason)).Cast()) + { + if (reasonItem == reason) + { + continue; + } + yield return reasonItem; + } + } + } + [Theory] [ParameterAttributeData] public void AcquireConnection_should_increase_count_up_to_the_max_number_of_connections( @@ -764,7 +839,7 @@ public void MaintainSizeAsync_should_not_try_new_attempt_after_failing_without_d [Theory] [ParameterAttributeData] - public void PrunePoolAsync_should_remove_all_expired_connections([RandomSeed]int seed) + public void PrunePoolAsync_should_remove_all_expired_connections([RandomSeed] int seed) { const int connectionsCount = 10; @@ -875,22 +950,27 @@ private ExclusiveConnectionPool CreateSubject(ConnectionPoolSettings connectionP _capturedEvents); } - private void InitializeAndWait() + private void InitializeAndWait(ExclusiveConnectionPool pool = null, ConnectionPoolSettings poolSettings = null) { - _subject.Initialize(); + var connectionPool = pool ?? _subject; + var connectionPoolSettings = poolSettings ?? _settings; + + connectionPool.Initialize(); SpinWait.SpinUntil( - () => _subject.CreatedCount == _settings.MinConnections && - _subject.AvailableCount == _settings.MaxConnections && - _subject.DormantCount == _settings.MinConnections && - _subject.UsedCount == 0, + () => + connectionPool.CreatedCount == connectionPoolSettings.MinConnections && + connectionPool.AvailableCount == connectionPoolSettings.MaxConnections && + connectionPool.DormantCount == connectionPoolSettings.MinConnections && + connectionPool.UsedCount == 0, TimeSpan.FromSeconds(5)) - .Should().BeTrue(); + .Should() + .BeTrue(); - _subject.AvailableCount.Should().Be(_settings.MaxConnections); - _subject.CreatedCount.Should().Be(_settings.MinConnections); - _subject.DormantCount.Should().Be(_settings.MinConnections); - _subject.UsedCount.Should().Be(0); + connectionPool.AvailableCount.Should().Be(connectionPoolSettings.MaxConnections); + connectionPool.CreatedCount.Should().Be(connectionPoolSettings.MinConnections); + connectionPool.DormantCount.Should().Be(connectionPoolSettings.MinConnections); + connectionPool.UsedCount.Should().Be(0); } } @@ -906,6 +986,8 @@ public static Task MaintainSizeAsync(this ExclusiveConnectionPool obj) return (Task)Reflector.Invoke(obj, nameof(MaintainSizeAsync)); } + public static CheckOutReasonCounter _checkOutReasonCounter(this ExclusiveConnectionPool obj) => (CheckOutReasonCounter)Reflector.GetFieldValue(obj, nameof(_checkOutReasonCounter)); + public static ServiceStates _serviceStates(this ExclusiveConnectionPool obj) { return (ServiceStates)Reflector.GetFieldValue(obj, nameof(_serviceStates)); diff --git a/tests/MongoDB.Driver.Core.Tests/Core/Operations/FindCommandOperationTests.cs b/tests/MongoDB.Driver.Core.Tests/Core/Operations/FindCommandOperationTests.cs index 83060ebcbb4..b6402ac3838 100644 --- a/tests/MongoDB.Driver.Core.Tests/Core/Operations/FindCommandOperationTests.cs +++ b/tests/MongoDB.Driver.Core.Tests/Core/Operations/FindCommandOperationTests.cs @@ -846,7 +846,7 @@ public void CreateCursor_should_use_ns_field_instead_of_namespace_passed_in_cons mockChannelSource.Setup(x => x.Server).Returns(mockServer.Object); mockChannelSource.Setup(x => x.Session).Returns(mockSession.Object); - var cursor = subject.CreateCursor(mockChannelSource.Object, commandResult); + var cursor = subject.CreateCursor(mockChannelSource.Object, Mock.Of(), commandResult); cursor._collectionNamespace().Should().Be(cursorCollectionNamespace); } @@ -1392,9 +1392,10 @@ public static class FindCommandOperationReflector public static AsyncCursor CreateCursor( this FindCommandOperation obj, IChannelSourceHandle channelSource, + IChannelHandle channel, BsonDocument commandResult) { - return (AsyncCursor)Reflector.Invoke(obj, nameof(CreateCursor), channelSource, commandResult); + return (AsyncCursor)Reflector.Invoke(obj, nameof(CreateCursor), channelSource, channel, commandResult); } } } diff --git a/tests/MongoDB.Driver.Tests/Specifications/load-balancers/LoadBalancersTestRunner.cs b/tests/MongoDB.Driver.Tests/Specifications/load-balancers/LoadBalancersTestRunner.cs index a5905c5cf3b..8866a9af2f1 100644 --- a/tests/MongoDB.Driver.Tests/Specifications/load-balancers/LoadBalancersTestRunner.cs +++ b/tests/MongoDB.Driver.Tests/Specifications/load-balancers/LoadBalancersTestRunner.cs @@ -32,10 +32,6 @@ public sealed class LoadBalancersTestRunner [ClassData(typeof(TestCaseFactory))] public void Run(JsonDrivenTestCase testCase) { - if (testCase.Name.Contains("wait-queue-timeouts")) - { - throw new SkipException("Not implemented yet."); - } #if DEBUG RequirePlatform .Check() diff --git a/tests/MongoDB.Driver.Tests/UnifiedTestOperations/UnifiedListCollectionsOperation.cs b/tests/MongoDB.Driver.Tests/UnifiedTestOperations/UnifiedListCollectionsOperation.cs index 6f1c44e065f..45c1bf896b3 100644 --- a/tests/MongoDB.Driver.Tests/UnifiedTestOperations/UnifiedListCollectionsOperation.cs +++ b/tests/MongoDB.Driver.Tests/UnifiedTestOperations/UnifiedListCollectionsOperation.cs @@ -63,7 +63,7 @@ public async Task ExecuteAsync(CancellationToken cancellationTo ? await _database.ListCollectionsAsync(_options, cancellationToken) : await _database.ListCollectionsAsync(_session, _options, cancellationToken); - _ = cursor.ToListAsync(cancellationToken); + _ = await cursor.ToListAsync(cancellationToken); return OperationResult.Empty(); } diff --git a/tests/MongoDB.Driver.Tests/UnifiedTestOperations/UnifiedListIndexesOperation.cs b/tests/MongoDB.Driver.Tests/UnifiedTestOperations/UnifiedListIndexesOperation.cs index 607b6266b98..441dfebcbfd 100644 --- a/tests/MongoDB.Driver.Tests/UnifiedTestOperations/UnifiedListIndexesOperation.cs +++ b/tests/MongoDB.Driver.Tests/UnifiedTestOperations/UnifiedListIndexesOperation.cs @@ -63,7 +63,7 @@ public async Task ExecuteAsync(CancellationToken cancellationTo ? await _collection.Indexes.ListAsync(_listIndexesOptions, cancellationToken) : await _collection.Indexes.ListAsync(_session, _listIndexesOptions, cancellationToken); - _ = cursor.ToListAsync(cancellationToken); + _ = await cursor.ToListAsync(cancellationToken); return OperationResult.Empty(); }