Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 16 additions & 65 deletions src/Proto.Actor/Context/ActorContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,10 @@ public void ReenterAfterCancellation(CancellationToken token, Action onCancelled
{
if (token.IsCancellationRequested)
{
ReenterAfter(Task.CompletedTask, onCancelled);
((IContext)this).ReenterAfter(Task.CompletedTask, () =>
{
onCancelled();
});

return;
}
Expand All @@ -238,84 +241,32 @@ public void ReenterAfterCancellation(CancellationToken token, Action onCancelled
// Ensures registration is disposed with the actor
var inceptionRegistration = CancellationToken.Register(() => registration.Dispose());

ReenterAfter(tcs.Task, () =>
{
inceptionRegistration.Dispose();
onCancelled();
}
);
((IContext)this).ReenterAfter(tcs.Task, () =>
{
inceptionRegistration.Dispose();
onCancelled();
});
}

public void ReenterAfter<T>(Task<T> target, Func<Task<T>, Task> action)
private void ContinueReenter<T>(Task<T> target, Func<Task<T>, Task> action)
{
var msg = _messageOrEnvelope;
var cont = new Continuation(() => action(target), msg, Actor);

ScheduleContinuation(target, cont);
}

public void ReenterAfter(Task target, Action action)
{
var msg = _messageOrEnvelope;

var cont = new Continuation(
() =>
{
action();

return Task.CompletedTask;
},
msg,
Actor);

ScheduleContinuation(target, cont);
}

public void ReenterAfter(Task target, Action<Task> action)
{
var msg = _messageOrEnvelope;

var cont = new Continuation(
() =>
{
action(target);

return Task.CompletedTask;
},
msg,
Actor);

ScheduleContinuation(target, cont);
}

public void ReenterAfter<T>(Task<T> target, Action<Task<T>> action)
private void ContinueReenter(Task target, Func<Task, Task> action)
{
var msg = _messageOrEnvelope;

var cont = new Continuation(
() =>
{
action(target);

return Task.CompletedTask;
},
msg,
Actor);

var cont = new Continuation(() => action(target), msg, Actor);
ScheduleContinuation(target, cont);
}

public void ReenterAfter(Task target, Func<Task, Task> action)
{
var msg = _messageOrEnvelope;

var cont = new Continuation(
() => action(target),
msg,
Actor);
public void ReenterAfter<T>(Task<T> target, Func<Task<T>, Task> action) =>
ContinueReenter(target, action);

ScheduleContinuation(target, cont);
}
public void ReenterAfter(Task target, Func<Task, Task> action) =>
ContinueReenter(target, action);

public Task Receive(MessageEnvelope envelope)
{
Expand Down
9 changes: 2 additions & 7 deletions src/Proto.Actor/Context/ActorContextDecorator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,8 @@ public virtual PID SpawnNamed(Props props, string name, Action<IContext>? callba
public virtual void ReenterAfter<T>(Task<T> target, Func<Task<T>, Task> action) =>
_context.ReenterAfter(target, action);

public virtual void ReenterAfter(Task target, Action action) => _context.ReenterAfter(target, action);

public virtual void ReenterAfter(Task target, Action<Task> action) => _context.ReenterAfter(target, action);

public virtual void ReenterAfter<T>(Task<T> target, Action<Task<T>> action) => _context.ReenterAfter(target, action);

public virtual void ReenterAfter(Task target, Func<Task, Task> action) => _context.ReenterAfter(target, action);
public virtual void ReenterAfter(Task target, Func<Task, Task> action) =>
_context.ReenterAfter(target, action);

public CapturedContext Capture() => _context.Capture();

Expand Down
2 changes: 1 addition & 1 deletion src/Proto.Actor/Context/ActorLoggingContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public override void ReenterAfter<T>(Task<T> target, Func<Task<T>, Task> action)
base.ReenterAfter(target, action);
}

public override void ReenterAfter(Task target, Action action)
public override void ReenterAfter(Task target, Func<Task, Task> action)
{
if (_logLevel != LogLevel.None && _logger.IsEnabled(_logLevel))
{
Expand Down
35 changes: 2 additions & 33 deletions src/Proto.Actor/Context/IContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,39 +93,8 @@ public interface IContext : ISenderContext, IReceiverContext, ISpawnerContext, I

/// <summary>
/// Awaits the given target task and once completed, the given action is then completed within the actors concurrency
/// constraint.
/// The concept is called Reentrancy, where an actor can continue to process messages while also awaiting that some
/// asynchronous operation completes.
/// </summary>
/// <param name="target">The Task to await</param>
/// <param name="action">The continuation to call once the task is completed</param>
void ReenterAfter(Task target, Action action);

/// <summary>
/// Awaits the given target task and once completed, the given action is then completed within the actors concurrency
/// constraint.
/// The concept is called Reentrancy, where an actor can continue to process messages while also awaiting that some
/// asynchronous operation completes.
/// </summary>
/// <param name="target">The Task to await</param>
/// <param name="action">The continuation to call once the task is completed. The awaited task is passed in as a parameter.</param>
void ReenterAfter(Task target, Action<Task> action);

/// <summary>
/// Awaits the given target task and once completed, the given action is then completed within the actors concurrency
/// constraint.
/// The concept is called Reentrancy, where an actor can continue to process messages while also awaiting that some
/// asynchronous operation completes.
/// </summary>
/// <param name="target">The Task to await</param>
/// <param name="action">The continuation to call once the task is completed. The awaited task is passed in as a parameter.</param>
void ReenterAfter<T>(Task<T> target, Action<Task<T>> action);

/// <summary>
/// Awaits the given target task and once completed, the given action is then completed within the actors concurrency
/// constraint.
/// The concept is called Reentrancy, where an actor can continue to process messages while also awaiting that some
/// asynchronous operation completes.
/// constraint. The concept is called Reentrancy, where an actor can continue to process messages while also awaiting
/// that some asynchronous operation completes.
/// </summary>
/// <param name="target">The Task to await</param>
/// <param name="action">The continuation to call once the task is completed. The awaited task is passed in as a parameter.</param>
Expand Down
61 changes: 61 additions & 0 deletions src/Proto.Actor/Context/ReenterAfterExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using System;
using System.Threading.Tasks;
using JetBrains.Annotations;

// ReSharper disable once CheckNamespace
namespace Proto;

/// <summary>
/// Convenience overloads for <see cref="IContext.ReenterAfter"/> to avoid boilerplate when the
/// continuation does not require the completed task or when it is synchronous.
/// </summary>
[PublicAPI]
public static class ReenterAfterExtensions
{
public static void ReenterAfter(this IContext context, Task target, Action action)
=> context.ReenterAfter(target, _ =>
{
action();
return Task.CompletedTask;
});

public static void ReenterAfter(this IContext context, Task target, Func<Task> action)
=> context.ReenterAfter(target, _ => action());

public static void ReenterAfter(this IContext context, Task target, Action<Task> action)
=> context.ReenterAfter(target, t =>
{
action(t);
return Task.CompletedTask;
});

public static void ReenterAfter<T>(this IContext context, Task<T> target, Action action)
=> context.ReenterAfter(target, _ =>
{
action();
return Task.CompletedTask;
});

public static void ReenterAfter<T>(this IContext context, Task<T> target, Func<Task> action)
=> context.ReenterAfter(target, _ => action());

public static void ReenterAfter<T>(this IContext context, Task<T> target, Action<Task<T>> action)
=> context.ReenterAfter(target, t =>
{
action(t);
return Task.CompletedTask;
});

public static void ReenterAfter<T>(this IContext context, Task<T> target, Action<T> action)
=> context.ReenterAfter(target, async t =>
{
action(await t.ConfigureAwait(false));
});

public static void ReenterAfter<T>(this IContext context, Task<T> target, Func<T, Task> action)
=> context.ReenterAfter(target, async t =>
{
var res = await t.ConfigureAwait(false);
await action(res).ConfigureAwait(false);
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private Task OnRegisterMember(IContext context, RegisterMember command)
var registerMemberTask = RegisterMemberInternal();

// Reenter after the member has been registered.
context.ReenterAfter(registerMemberTask, _ =>
context.ReenterAfter(registerMemberTask, () =>
{
// Schedule the first update.
ScheduleUpdate(context);
Expand All @@ -120,7 +120,7 @@ private Task OnUpdateMembers(IContext context)
return Task.CompletedTask;

var updateMembersTask = UpdateMembersAsync();
context.ReenterAfter(updateMembersTask, _ =>
context.ReenterAfter(updateMembersTask, () =>
{
// Schedule the next update.
ScheduleUpdate(context);
Expand Down
2 changes: 1 addition & 1 deletion src/Proto.Cluster/Identity/IdentityActivatorProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ private Task ReplaceActivation(ClusterIdentity identity, PID replacedPid, IConte
}

context.ReenterAfter(Task.Delay(50 * attempt),
() => ReplaceActivation(identity, replacedPid, context, attempt + 1));
_ => ReplaceActivation(identity, replacedPid, context, attempt + 1));

return;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Proto.Cluster/Partition/PartitionIdentityActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ private Task OnActivationRequest(ActivationRequest msg, IContext context)
msg.ClusterIdentity);
}

context.ReenterAfter(_rebalanceTcs.Task, _ => OnActivationRequest(msg, context));
context.ReenterAfter(_rebalanceTcs.Task, () => OnActivationRequest(msg, context));

return Task.CompletedTask;
}
Expand Down
16 changes: 8 additions & 8 deletions src/Proto.Cluster/Partition/PartitionIdentityRebalanceWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,10 @@ private Task OnPartitionFailed(PartitionFailed response, IContext context)
default:
Logger.LogWarning("[PartitionIdentity] Partition {Member} unreachable", response.MemberAddress);

context.ReenterAfter(Task.Delay(200, _cancellationToken),
() => StartRebalanceFromMember(_request!, context, response.MemberAddress)
);
context.ReenterAfter(Task.Delay(200, _cancellationToken), () =>
{
StartRebalanceFromMember(_request!, context, response.MemberAddress);
});

break;
}
Expand Down Expand Up @@ -197,11 +198,10 @@ private void OnIdentityHandoverRequest(IdentityHandoverRequest msg, IContext con
context.SetReceiveTimeout(_timeout);

context.ReenterAfter(_completionSource.Task, () =>
{
context.Send(context.Parent!, _completionSource.Task.Result);
context.Stop(context.Self);
}
);
{
context.Send(context.Parent!, _completionSource.Task.Result);
context.Stop(context.Self);
});
}

private void OnIdentityHandover(IdentityHandover response, IContext context)
Expand Down
11 changes: 5 additions & 6 deletions src/Proto.Cluster/Partition/PartitionPlacementActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,13 @@ private Task OnClusterTopology(IContext context, ClusterTopology msg)
.WaitUntilInFlightActivationsAreCompleted(_config.RebalanceActivationsCompletionTimeout, cancellationToken);

// Waits until all members agree on a cluster topology and have no more in-flight activation requests
context.ReenterAfter(activationsCompleted, async _ =>
context.ReenterAfter(activationsCompleted, async () =>
{
if (!cancellationToken.IsCancellationRequested)
{
if (!cancellationToken.IsCancellationRequested)
{
await Rebalance(context, msg).ConfigureAwait(false);
}
await Rebalance(context, msg).ConfigureAwait(false);
}
);
});

return Task.CompletedTask;
}
Expand Down
6 changes: 4 additions & 2 deletions src/Proto.Cluster/PubSub/PubSubMemberDeliveryActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ public Task ReceiveAsync(IContext context)
.Select(sub => DeliverBatch(context, topicBatch, sub))
.ToArray();

context.ReenterAfter(Task.WhenAll(tasks),
() => NotifyAboutInvalidDeliveries(tasks, deliveryBatch.Topic, context));
context.ReenterAfter(Task.WhenAll(tasks), () =>
{
NotifyAboutInvalidDeliveries(tasks, deliveryBatch.Topic, context);
});
}

return Task.CompletedTask;
Expand Down
42 changes: 0 additions & 42 deletions src/Proto.OpenTelemetry/OpenTelemetryActorContextDecorator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,6 @@ public override void Respond(object message)=>
OpenTelemetryMethodsDecorators.Respond(message,
() => base.Respond(message));

public override void ReenterAfter(Task target, Action action)
{
var current = Activity.Current?.Context ?? default;
var message = base.Message!;
var a2 = () =>
{
using var x = OpenTelemetryHelpers.BuildStartedActivity(current, Source, nameof(ReenterAfter), message,
_sendActivitySetup);
x?.SetTag(ProtoTags.ActionType, nameof(ReenterAfter));
action();
};
base.ReenterAfter(target, a2);
}

public override void ReenterAfter<T>(Task<T> target, Func<Task<T>, Task> action)
{
var current = Activity.Current?.Context ?? default;
Expand All @@ -105,34 +91,6 @@ public override void ReenterAfter<T>(Task<T> target, Func<Task<T>, Task> action)
base.ReenterAfter(target, a2);
}

public override void ReenterAfter(Task target, Action<Task> action)
{
var current = Activity.Current?.Context ?? default;
var message = base.Message!;
Action<Task> a2 = t =>
{
using var x = OpenTelemetryHelpers.BuildStartedActivity(current, Source, nameof(ReenterAfter), message,
_sendActivitySetup);
x?.SetTag(ProtoTags.ActionType, nameof(ReenterAfter));
action(t);
};
base.ReenterAfter(target, a2);
}

public override void ReenterAfter<T>(Task<T> target, Action<Task<T>> action)
{
var current = Activity.Current?.Context ?? default;
var message = base.Message!;
Action<Task<T>> a2 = t =>
{
using var x = OpenTelemetryHelpers.BuildStartedActivity(current, Source, nameof(ReenterAfter), message,
_sendActivitySetup);
x?.SetTag(ProtoTags.ActionType, nameof(ReenterAfter));
action(t);
};
base.ReenterAfter(target, a2);
}

public override void ReenterAfter(Task target, Func<Task, Task> action)
{
var current = Activity.Current?.Context ?? default;
Expand Down
Loading
Loading