Skip to content

Commit de6c2d8

Browse files
authored
use concurrent bag for shared futures (#1697)
* use concurrent bag for shared futures
1 parent af14b11 commit de6c2d8

File tree

3 files changed

+104
-102
lines changed

3 files changed

+104
-102
lines changed

src/Proto.Actor/Future/SharedFuture.cs

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
// </copyright>
55
// -----------------------------------------------------------------------
66
using System;
7+
using System.Collections.Concurrent;
78
using System.Collections.Generic;
89
using System.Threading;
910
using System.Threading.Channels;
@@ -16,8 +17,8 @@ namespace Proto.Future;
1617
public sealed class SharedFutureProcess : Process, IDisposable
1718
{
1819
private readonly FutureHandle[] _slots;
19-
private readonly ChannelWriter<FutureHandle> _completedFutures;
20-
private readonly ChannelReader<FutureHandle> _availableFutures;
20+
private readonly ConcurrentBag<FutureHandle> _futures = new();
21+
2122

2223
private long _createdRequests;
2324
private long _completedRequests;
@@ -54,15 +55,13 @@ internal SharedFutureProcess(ActorSystem system, int size) : base(system)
5455

5556
_slots = new FutureHandle[size];
5657

57-
Channel<FutureHandle> channel = Channel.CreateUnbounded<FutureHandle>();
58-
_availableFutures = channel.Reader;
59-
_completedFutures = channel.Writer;
58+
6059

6160
for (var i = 0; i < _slots.Length; i++)
6261
{
6362
var requestSlot = new FutureHandle(this, ToRequestId(i));
6463
_slots[i] = requestSlot;
65-
_completedFutures.TryWrite(requestSlot);
64+
_futures.Add(requestSlot);
6665
}
6766

6867
_maxRequestId = (int.MaxValue - (int.MaxValue % size));
@@ -83,7 +82,8 @@ public int RequestsInFlight {
8382

8483
public IFuture? TryCreateHandle()
8584
{
86-
if (Stopping || !_availableFutures.TryRead(out var requestSlot)) return default;
85+
86+
if (Stopping || !_futures.TryTake(out var requestSlot)) return default;
8787

8888
var pid = requestSlot.Init();
8989
Interlocked.Increment(ref _createdRequests);
@@ -129,7 +129,7 @@ private void Complete(uint requestId, FutureHandle slot)
129129
{
130130
if (slot.TryComplete((int) requestId))
131131
{
132-
_completedFutures.TryWrite(slot);
132+
_futures.Add(slot);
133133

134134
Interlocked.Increment(ref _completedRequests);
135135

@@ -151,21 +151,6 @@ public void Dispose()
151151
}
152152
}
153153

154-
public void Stop()
155-
{
156-
Stopping = true;
157-
_completedFutures.TryComplete();
158-
159-
while (_availableFutures.TryRead(out _))
160-
{
161-
}
162-
163-
if (RequestsInFlight == 0)
164-
{
165-
Stop(Pid);
166-
}
167-
}
168-
169154
private void Cancel(uint requestId)
170155
{
171156
if (!TryGetRequestSlot(requestId, out var slot)) return;

src/Proto.Remote/Endpoints/EndpointReader.cs

Lines changed: 93 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -40,35 +40,38 @@ ServerCallContext context
4040
throw new RpcException(Status.DefaultCancelled, "Suspended");
4141
}
4242

43-
using (_endpointManager.CancellationToken.Register(async () => {
44-
try
45-
{
46-
await responseStream.WriteAsync(new RemoteMessage
47-
{
48-
DisconnectRequest = new DisconnectRequest()
49-
}
50-
).ConfigureAwait(false);
51-
}
52-
catch (Exception x)
53-
{
54-
x.CheckFailFast();
55-
Logger.LogWarning("[EndpointReader][{SystemAddress}] Failed to write disconnect message to the stream", _system.Address);
56-
}
57-
}
58-
))
43+
async void Disconnect()
44+
{
45+
try
46+
{
47+
var disconnectMsg = new RemoteMessage
48+
{
49+
DisconnectRequest = new DisconnectRequest()
50+
};
51+
await responseStream.WriteAsync(disconnectMsg).ConfigureAwait(false);
52+
}
53+
catch (Exception x)
54+
{
55+
x.CheckFailFast();
56+
Logger.LogWarning("[EndpointReader][{SystemAddress}] Failed to write disconnect message to the stream", _system.Address);
57+
}
58+
}
59+
60+
await using (_endpointManager.CancellationToken.Register(Disconnect))
5961
{
6062
IEndpoint endpoint;
6163
string? address = null;
6264
string systemId;
63-
64-
65+
6566
Logger.LogInformation("[EndpointReader][{SystemAddress}] Accepted connection request from {Remote} to {Local}",
6667
_system.Address, context.Peer, context.Host
6768
);
6869

69-
if (await requestStream.MoveNext().ConfigureAwait(false) &&
70+
if (await requestStream.MoveNext(_endpointManager.CancellationToken).ConfigureAwait(false) &&
7071
requestStream.Current.MessageTypeCase != RemoteMessage.MessageTypeOneofCase.ConnectRequest)
72+
{
7173
throw new RpcException(Status.DefaultCancelled, "Expected connection message");
74+
}
7275

7376
var connectRequest = requestStream.Current.ConnectRequest;
7477

@@ -104,54 +107,7 @@ await responseStream.WriteAsync(new RemoteMessage
104107
).ConfigureAwait(false);
105108
systemId = clientConnection.MemberId;
106109
endpoint = _endpointManager.GetOrAddClientEndpoint(systemId);
107-
_ = Task.Run(async () => {
108-
try
109-
{
110-
while (!cancellationTokenSource.Token.IsCancellationRequested)
111-
{
112-
while (!cancellationTokenSource.Token.IsCancellationRequested &&
113-
endpoint.OutgoingStash.TryPop(out var message))
114-
{
115-
try
116-
{
117-
await responseStream.WriteAsync(message).ConfigureAwait(false);
118-
}
119-
catch (Exception)
120-
{
121-
_ = endpoint.OutgoingStash.Append(message);
122-
throw;
123-
}
124-
}
125-
126-
while (endpoint.OutgoingStash.IsEmpty && !cancellationTokenSource.Token.IsCancellationRequested)
127-
{
128-
var message = await endpoint.Outgoing.Reader.ReadAsync(cancellationTokenSource.Token)
129-
.ConfigureAwait(false);
130-
131-
try
132-
{
133-
// Logger.LogInformation($"Sending {message}");
134-
await responseStream.WriteAsync(message).ConfigureAwait(false);
135-
}
136-
catch (Exception)
137-
{
138-
_ = endpoint.OutgoingStash.Append(message);
139-
throw;
140-
}
141-
}
142-
}
143-
}
144-
catch (OperationCanceledException)
145-
{
146-
Logger.LogDebug("[EndpointReader][{SystemAddress}] Writer closed for {SystemId}", _system.Address, systemId);
147-
}
148-
catch (Exception e)
149-
{
150-
e.CheckFailFast();
151-
Logger.LogWarning(e, "[EndpointReader][{SystemAddress}] Writing error to {SystemId}", _system.Address, systemId);
152-
}
153-
}
154-
);
110+
_ = Task.Run(async () => { await RunClientWriter(responseStream, cancellationTokenSource, endpoint, systemId); });
155111
}
156112
break;
157113
case ConnectRequest.ConnectionTypeOneofCase.ServerConnection: {
@@ -207,24 +163,83 @@ await responseStream.WriteAsync(new RemoteMessage
207163
throw new ArgumentOutOfRangeException();
208164
}
209165

210-
try
166+
await RunReader(requestStream, address, cancellationTokenSource, systemId);
167+
}
168+
}
169+
170+
private async Task RunReader(IAsyncStreamReader<RemoteMessage> requestStream, string? address, CancellationTokenSource cancellationTokenSource, string systemId)
171+
{
172+
try
173+
{
174+
while (await requestStream.MoveNext(CancellationToken.None).ConfigureAwait(false))
211175
{
212-
while (await requestStream.MoveNext().ConfigureAwait(false))
176+
var currentMessage = requestStream.Current;
177+
if (_endpointManager.CancellationToken.IsCancellationRequested)
213178
{
214-
var currentMessage = requestStream.Current;
215-
if (_endpointManager.CancellationToken.IsCancellationRequested)
216-
continue;
217-
218-
_endpointManager.RemoteMessageHandler.HandleRemoteMessage(currentMessage, address!);
179+
continue;
219180
}
181+
182+
_endpointManager.RemoteMessageHandler.HandleRemoteMessage(currentMessage, address!);
220183
}
221-
finally
184+
}
185+
finally
186+
{
187+
cancellationTokenSource.Cancel();
188+
189+
if (address is null && systemId is not null)
222190
{
223-
cancellationTokenSource.Cancel();
224-
if (address is null && systemId is not null)
225-
_system.EventStream.Publish(new EndpointTerminatedEvent(false, null, systemId));
191+
_system.EventStream.Publish(new EndpointTerminatedEvent(false, null, systemId));
192+
}
193+
}
194+
}
195+
196+
private async Task RunClientWriter(IAsyncStreamWriter<RemoteMessage> responseStream, CancellationTokenSource cancellationTokenSource, IEndpoint endpoint, string systemId)
197+
{
198+
try
199+
{
200+
while (!cancellationTokenSource.Token.IsCancellationRequested)
201+
{
202+
//consume stash
203+
while (!cancellationTokenSource.Token.IsCancellationRequested && endpoint.OutgoingStash.TryPop(out var message))
204+
{
205+
try
206+
{
207+
await responseStream.WriteAsync(message).ConfigureAwait(false);
208+
}
209+
catch (Exception)
210+
{
211+
_ = endpoint.OutgoingStash.Append(message);
212+
throw;
213+
}
214+
}
215+
216+
//
217+
while (endpoint.OutgoingStash.IsEmpty && !cancellationTokenSource.Token.IsCancellationRequested)
218+
{
219+
var message = await endpoint.Outgoing.Reader.ReadAsync(cancellationTokenSource.Token).ConfigureAwait(false);
220+
221+
try
222+
{
223+
// Logger.LogInformation($"Sending {message}");
224+
await responseStream.WriteAsync(message).ConfigureAwait(false);
225+
}
226+
catch (Exception)
227+
{
228+
_ = endpoint.OutgoingStash.Append(message);
229+
throw;
230+
}
231+
}
226232
}
227233
}
234+
catch (OperationCanceledException)
235+
{
236+
Logger.LogDebug("[EndpointReader][{SystemAddress}] Writer closed for {SystemId}", _system.Address, systemId);
237+
}
238+
catch (Exception e)
239+
{
240+
e.CheckFailFast();
241+
Logger.LogWarning(e, "[EndpointReader][{SystemAddress}] Writing error to {SystemId}", _system.Address, systemId);
242+
}
228243
}
229244

230245
public override Task<ListProcessesResponse> ListProcesses(ListProcessesRequest request, ServerCallContext context)

src/Proto.Remote/GrpcNet/GrpcNetRemote.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ public Task StartAsync()
6161
if (_config.ConfigureKestrel == null)
6262
{
6363
serverOptions.Listen(ipAddress, Config.Port,
64-
listenOptions => listenOptions.Protocols = HttpProtocols.Http2
64+
listenOptions => {
65+
listenOptions.Protocols = HttpProtocols.Http2;
66+
}
6567
);
6668
}
6769
else

0 commit comments

Comments
 (0)