Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions logs/log1756098266.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
## Variable name audit

- Renamed ambiguous variables like `t`, `t1`, `t2`, and `tcs` across actor, remote, and cluster core libraries to descriptive names (e.g., `receiveTask`, `gracefullyLeftEntries`, `completionSource`).
- Updated foreach loop variables to meaningful names (`stat`, `terminated`, etc.) to improve readability.
- Motivation: clearer intent of local variables simplifies maintenance and reduces cognitive load.
6 changes: 3 additions & 3 deletions src/Proto.Actor/Context/ActorContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -235,13 +235,13 @@ public void ReenterAfterCancellation(CancellationToken token, Action onCancelled
return;
}

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var registration = token.Register(() => tcs.SetResult(true));
var completionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var registration = token.Register(() => completionSource.SetResult(true));

// Ensures registration is disposed with the actor
var inceptionRegistration = CancellationToken.Register(() => registration.Dispose());

((IContext)this).ReenterAfter(tcs.Task, () =>
((IContext)this).ReenterAfter(completionSource.Task, () =>
{
inceptionRegistration.Dispose();
onCancelled();
Expand Down
8 changes: 4 additions & 4 deletions src/Proto.Actor/Context/DeadlineContextDecorator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,24 @@ public DeadlineContextDecorator(IContext context, TimeSpan deadline, ILogger log

public override async Task Receive(MessageEnvelope envelope)
{
var t = base.Receive(envelope);
var receiveTask = base.Receive(envelope);

if (t.IsCompleted)
if (receiveTask.IsCompleted)
{
return;
}

try
{
await t.WaitAsync(_deadline).ConfigureAwait(false);
await receiveTask.WaitAsync(_deadline).ConfigureAwait(false);
}
catch (TimeoutException)
{
_logger.ActorDeadlineExceededOnMessage(_context.Self, _deadline, envelope.Message);

// keep waiting, we cannot just ignore and continue as an async task might still be running and updating state of the actor
// if we return here, actor concurrency guarantees could break
await t.ConfigureAwait(false);
await receiveTask.ConfigureAwait(false);
}
}
}
8 changes: 4 additions & 4 deletions src/Proto.Actor/Context/StartupDeadlineContextDecorator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,24 +52,24 @@ public override async Task Receive(MessageEnvelope envelope)
var (m,_,_) = MessageEnvelope.Unwrap(envelope);
if (m is Started)
{
var t = base.Receive(envelope);
var receiveTask = base.Receive(envelope);

if (t.IsCompleted)
if (receiveTask.IsCompleted)
{
return;
}

try
{
await t.WaitAsync(_deadline).ConfigureAwait(false);
await receiveTask.WaitAsync(_deadline).ConfigureAwait(false);
}
catch (TimeoutException)
{
_logger.ActorDeadlineExceededOnStart(_context.Self, _deadline);

// keep waiting, we cannot just ignore and continue as an async task might still be running and updating state of the actor
// if we return here, actor concurrency guarantees could break
await t.ConfigureAwait(false);
await receiveTask.ConfigureAwait(false);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/Proto.Actor/Diagnostics/DiagnosticTools.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ public static class DiagnosticTools
/// <returns></returns>
public static async Task<string> GetDiagnosticsString(ActorSystem system, PID pid)
{
var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
var request = new ProcessDiagnosticsRequest(tcs);
var completionSource = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
var request = new ProcessDiagnosticsRequest(completionSource);
pid.SendSystemMessage(system, request);
var res = await tcs.Task.ConfigureAwait(false);
var res = await completionSource.Task.ConfigureAwait(false);

return res;
}
Expand Down
40 changes: 20 additions & 20 deletions src/Proto.Actor/Future/FutureBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ public FutureBatchProcess(ActorSystem system, int size, CancellationToken ct) :
{
_cancellation = ct.Register(() =>
{
foreach (var tcs in _completionSources)
foreach (var completionSource in _completionSources)
{
if (tcs?.TrySetException(
if (completionSource?.TrySetException(
new TimeoutException("Request didn't receive any Response within the expected time.")
) == true)
{
Expand All @@ -82,30 +82,30 @@ public void Dispose()

if (index < _completionSources.Length)
{
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
_completionSources[index] = tcs;
var completionSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
_completionSources[index] = completionSource;

if (System.Metrics.Enabled)
{
ActorMetrics.FuturesStartedCount.Add(1, _metricTags);
}

return new SimpleFutureHandle(Pid.WithRequestId(ToRequestId(index)), tcs, _onTimeout);
return new SimpleFutureHandle(Pid.WithRequestId(ToRequestId(index)), completionSource, _onTimeout);
}

return null;
}

protected internal override void SendUserMessage(PID pid, object message)
{
if (!TryGetTaskCompletionSource(pid.RequestId, out var index, out var tcs))
if (!TryGetTaskCompletionSource(pid.RequestId, out var index, out var completionSource))
{
return;
}

try
{
tcs.TrySetResult(message);
completionSource.TrySetResult(message);
_completionSources[index] = default;
}
finally
Expand All @@ -126,14 +126,14 @@ protected internal override void SendSystemMessage(PID pid, SystemMessage messag
return;
}

if (!TryGetTaskCompletionSource(pid.RequestId, out var index, out var tcs))
if (!TryGetTaskCompletionSource(pid.RequestId, out var index, out var completionSource))
{
return;
}

try
{
tcs.TrySetResult(default!);
completionSource.TrySetResult(default!);
_completionSources[index] = default;
}
finally
Expand All @@ -154,48 +154,48 @@ private bool TryGetIndex(uint requestId, out int index)

private static uint ToRequestId(int index) => (uint)(index + 1);

private bool TryGetTaskCompletionSource(uint requestId, out int index, out TaskCompletionSource<object> tcs)
private bool TryGetTaskCompletionSource(uint requestId, out int index, out TaskCompletionSource<object> completionSource)
{
if (!TryGetIndex(requestId, out index))
{
tcs = default!;
completionSource = default!;

return false;
}

tcs = _completionSources[index]!;
completionSource = _completionSources[index]!;

return tcs != default!;
return completionSource != default!;
}

private sealed class SimpleFutureHandle : IFuture
{
private readonly Action? _onTimeout;

public SimpleFutureHandle(PID pid, TaskCompletionSource<object> tcs, Action? onTimeout)
public SimpleFutureHandle(PID pid, TaskCompletionSource<object> completionSource, Action? onTimeout)
{
_onTimeout = onTimeout;
Pid = pid;
Tcs = tcs;
CompletionSource = completionSource;
}

internal TaskCompletionSource<object> Tcs { get; }
internal TaskCompletionSource<object> CompletionSource { get; }

public PID Pid { get; }
public Task<object> Task => Tcs.Task;
public Task<object> Task => CompletionSource.Task;

public async Task<object> GetTask(CancellationToken cancellationToken)
{
try
{
if (cancellationToken == default)
{
return await Tcs.Task.ConfigureAwait(false);
return await CompletionSource.Task.ConfigureAwait(false);
}

await using (cancellationToken.Register(() => Tcs.TrySetCanceled()).ConfigureAwait(false))
await using (cancellationToken.Register(() => CompletionSource.TrySetCanceled()).ConfigureAwait(false))
{
return await Tcs.Task.ConfigureAwait(false);
return await CompletionSource.Task.ConfigureAwait(false);
}
}
catch
Expand Down
14 changes: 7 additions & 7 deletions src/Proto.Actor/Future/SharedFuture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -215,30 +215,30 @@ private sealed class SharedFutureHandle : IFuture
{
private readonly SharedFutureProcess _parent;

private readonly TaskCompletionSource<object> _tcs;
private readonly TaskCompletionSource<object> _completionSource;

public SharedFutureHandle(SharedFutureProcess parent, PID pid, TaskCompletionSource<object> tcs)
public SharedFutureHandle(SharedFutureProcess parent, PID pid, TaskCompletionSource<object> completionSource)
{
_parent = parent;
Pid = pid;
_tcs = tcs;
_completionSource = completionSource;
}

public PID Pid { get; }
public Task<object> Task => _tcs.Task;
public Task<object> Task => _completionSource.Task;

public async Task<object> GetTask(CancellationToken cancellationToken)
{
try
{
if (cancellationToken == default)
{
return await _tcs.Task.ConfigureAwait(false);
return await _completionSource.Task.ConfigureAwait(false);
}

await using (cancellationToken.Register(() => _tcs.TrySetCanceled()).ConfigureAwait(false))
await using (cancellationToken.Register(() => _completionSource.TrySetCanceled()).ConfigureAwait(false))
{
return await _tcs.Task.ConfigureAwait(false);
return await _completionSource.Task.ConfigureAwait(false);
}
}
catch
Expand Down
52 changes: 26 additions & 26 deletions src/Proto.Actor/Mailbox/DefaultMailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ public void PostUserMessage(object msg)
{
_userMailbox.Push(message);

foreach (var t in _stats)
foreach (var stat in _stats)
{
t.MessagePosted(message);
stat.MessagePosted(message);
}
}

Expand All @@ -82,9 +82,9 @@ public void PostUserMessage(object msg)
_userMailbox.Push(msg);
}

foreach (var t in _stats)
foreach (var stat in _stats)
{
t.MessagePosted(msg);
stat.MessagePosted(msg);
}

Schedule();
Expand All @@ -93,9 +93,9 @@ public void PostUserMessage(object msg)
{
_userMailbox.Push(msg);

foreach (var t in _stats)
foreach (var stat in _stats)
{
t.MessagePosted(msg);
stat.MessagePosted(msg);
}

Schedule();
Expand All @@ -111,9 +111,9 @@ public void PostSystemMessage(object msg)
_invoker?.CancellationTokenSource?.Cancel();
}

foreach (var t in _stats)
foreach (var stat in _stats)
{
t.MessagePosted(msg);
stat.MessagePosted(msg);
}

Schedule();
Expand All @@ -127,9 +127,9 @@ public void RegisterHandlers(IMessageInvoker invoker, IDispatcher dispatcher)

public void Start()
{
foreach (var t in _stats)
foreach (var stat in _stats)
{
t.MailboxStarted();
stat.MailboxStarted();
}
}

Expand All @@ -150,9 +150,9 @@ private static Task RunAsync(DefaultMailbox mailbox)
}
else
{
foreach (var t in mailbox._stats)
foreach (var stat in mailbox._stats)
{
t.MailboxEmpty();
stat.MailboxEmpty();
}
}

Expand All @@ -170,9 +170,9 @@ static async Task Await(DefaultMailbox self, Task task)
}
else
{
foreach (var t in self._stats)
foreach (var stat in self._stats)
{
t.MailboxEmpty();
stat.MailboxEmpty();
}
}
}
Expand All @@ -197,16 +197,16 @@ private Task ProcessMessages()
_ => _suspended
};

var t = _invoker.InvokeSystemMessageAsync(sys);
var systemMessageTask = _invoker.InvokeSystemMessageAsync(sys);

if (!t.IsCompletedSuccessfully)
if (!systemMessageTask.IsCompletedSuccessfully)
{
return Await(msg, t, this);
return Await(msg, systemMessageTask, this);
}

foreach (var t1 in _stats)
foreach (var stat in _stats)
{
t1.MessageReceived(msg);
stat.MessageReceived(msg);
}

continue;
Expand All @@ -221,16 +221,16 @@ private Task ProcessMessages()

if (msg is not null)
{
var t = _invoker.InvokeUserMessageAsync(msg);
var userMessageTask = _invoker.InvokeUserMessageAsync(msg);

if (!t.IsCompletedSuccessfully)
if (!userMessageTask.IsCompletedSuccessfully)
{
return Await(msg, t, this);
return Await(msg, userMessageTask, this);
}

foreach (var t1 in _stats)
foreach (var stat in _stats)
{
t1.MessageReceived(msg);
stat.MessageReceived(msg);
}
}
else
Expand All @@ -253,9 +253,9 @@ static async Task Await(object msg, Task task, DefaultMailbox self)
{
await task.ConfigureAwait(false);

foreach (var t1 in self._stats)
foreach (var stat in self._stats)
{
t1.MessageReceived(msg);
stat.MessageReceived(msg);
}
}
catch (Exception e)
Expand Down
Loading
Loading