Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Proto.Actor/EventStream/EventStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public EventStreamSubscription<T> Subscribe<TMsg>(
/// <returns>A new subscription that can be used to unsubscribe</returns>
public EventStreamSubscription<T> Subscribe<TMsg>(ISenderContext context, params PID[] pids) where TMsg : T
{
var caller = pids.First().ToDiagnosticString();
var caller = pids.First().ToDiagnosticString().Split("/").Last();
var sub = new EventStreamSubscription<T>(
this,
Dispatchers.SynchronousDispatcher,
Expand Down
14 changes: 13 additions & 1 deletion src/Proto.Actor/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Proto.Mailbox;
using Proto.Utils;

namespace Proto;

Expand All @@ -20,10 +23,19 @@ internal static void SendSystemMessage(this IEnumerable<PID> self, SystemMessage
pid.SendSystemMessage(system, message);
}
}

public static async Task StopMany(this IEnumerable<PID> self, IContext context)
{
foreach (var chunk in self.Chunk(20))
{
var tasks = chunk.Select(context.StopAsync);
await Task.WhenAll(tasks).WaitUpTo(TimeSpan.FromSeconds(10));
}
}

[UsedImplicitly]
public static void Deconstruct<TKey, TValue>(
//DONT TOUCH THIS, it tries to deconstruct the deconstruct method...
//DON'T TOUCH THIS, it tries to deconstruct the deconstruct method...
// ReSharper disable once UseDeconstructionOnParameter
this KeyValuePair<TKey, TValue> self,
out TKey key,
Expand Down
10 changes: 6 additions & 4 deletions src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand All @@ -17,7 +18,9 @@ namespace Proto.Cluster.Identity;
internal class IdentityStoragePlacementActor : IActor
{
private const int PersistenceRetries = 3;
#pragma warning disable CS0618 // Type or member is obsolete
private static readonly ILogger Logger = Log.CreateLogger<IdentityStoragePlacementActor>();
#pragma warning restore CS0618 // Type or member is obsolete

//pid -> the actor that we have created here
//kind -> the actor kind
Expand All @@ -39,7 +42,7 @@ public Task ReceiveAsync(IContext context) =>
context.Message switch
{
Started => OnStarted(context),
Stopping _ => Stopping(),
Stopping _ => Stopping(context),
Stopped _ => Stopped(),
ActivationTerminating msg => OnActivationTerminating(context, msg),
ActivationRequest msg => OnActivationRequest(context, msg),
Expand All @@ -53,12 +56,11 @@ private Task OnStarted(IContext context)
return Task.CompletedTask;
}

private Task Stopping()
private async Task Stopping(IContext context)
{
Logger.LogInformation("Stopping placement actor");
_subscription?.Unsubscribe();

return Task.CompletedTask;
await _actors.Values.StopMany(context);
}

private Task Stopped()
Expand Down
9 changes: 9 additions & 0 deletions src/Proto.Cluster/Partition/PartitionPlacementActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ namespace Proto.Cluster.Partition;

internal class PartitionPlacementActor : IActor, IDisposable
{
#pragma warning disable CS0618 // Type or member is obsolete
private static readonly ILogger Logger = Log.CreateLogger<PartitionPlacementActor>();
#pragma warning restore CS0618 // Type or member is obsolete

//pid -> the actor that we have created here
//kind -> the actor kind
Expand All @@ -40,13 +42,20 @@ public Task ReceiveAsync(IContext context) =>
context.Message switch
{
Started => OnStarted(context),
Stopping => OnStopping(context),
ActivationTerminating msg => OnActivationTerminating(msg),
IdentityHandoverRequest msg => OnIdentityHandoverRequest(context, msg),
ClusterTopology msg => OnClusterTopology(context, msg),
ActivationRequest msg => OnActivationRequest(context, msg),
_ => Task.CompletedTask
};

private async Task OnStopping(IContext context)
{
var pids = _actors.Values;
await pids.StopMany(context);
}

public void Dispose() => _subscription?.Unsubscribe();

private Task OnClusterTopology(IContext context, ClusterTopology msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,20 @@ public Task ReceiveAsync(IContext context) =>
context.Message switch
{
Started => OnStarted(context),
Stopping => OnStopping(context),
ActivationRequest msg => OnActivationRequest(msg, context),
ActivationTerminated msg => OnActivationTerminated(msg),
ActivationTerminating msg => OnActivationTerminating(msg),
ClusterTopology msg => OnClusterTopology(msg, context),
_ => Task.CompletedTask
};

private async Task OnStopping(IContext context)
{
var pids = _actors.Values;
await pids.StopMany(context);
}

private Task OnStarted(IContext context)
{
var self = context.Self;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,5 +122,8 @@ public Task SetupAsync(Cluster cluster, string[] kinds, bool isClient)
return Task.CompletedTask;
}

public Task ShutdownAsync() => _partitionManager.ShutdownAsync();
public Task ShutdownAsync()
{
return _partitionManager.ShutdownAsync();
}
}