Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/Proto.Actor/Mailbox/BatchingMailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ public class BatchingMailbox : IMailbox
private IDispatcher _dispatcher = null!;
private IMessageInvoker _invoker = null!;

private int _status = MailboxStatus.Idle;
private const int Idle = 0;
private const int Busy = 1;

private int _status = Idle;
private bool _suspended;

public BatchingMailbox(int batchSize)
Expand Down Expand Up @@ -99,7 +102,7 @@ private async Task RunAsync()
_invoker.EscalateFailure(x, currentMessage);
}

Interlocked.Exchange(ref _status, MailboxStatus.Idle);
Interlocked.Exchange(ref _status, Idle);

if (_systemMessages.HasMessages || (_userMessages.HasMessages && !_suspended))
{
Expand All @@ -109,7 +112,7 @@ private async Task RunAsync()

private void Schedule()
{
if (Interlocked.CompareExchange(ref _status, MailboxStatus.Busy, MailboxStatus.Idle) == MailboxStatus.Idle)
if (Interlocked.CompareExchange(ref _status, Busy, Idle) == Idle)
{
_dispatcher.Schedule(RunAsync);
}
Expand Down
21 changes: 21 additions & 0 deletions src/Proto.Actor/Mailbox/BoundedMailbox.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// -----------------------------------------------------------------------
// <copyright file="BoundedMailbox.cs" company="Asynkron AB">
// Copyright (C) 2015-2025 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------

using System.Threading.Channels;

namespace Proto.Mailbox;

/// <summary>
/// Factory helpers for creating bounded mailboxes.
/// </summary>
public static class BoundedMailbox
{
public static IMailbox Create(int size, params IMailboxStatistics[] stats) =>
new DefaultMailbox(new LockingUnboundedMailboxQueue(4), new BoundedMailboxQueue(size), stats);

public static IMailbox Create(int size, BoundedChannelFullMode fullMode, params IMailboxStatistics[] stats) =>
new DefaultMailbox(new LockingUnboundedMailboxQueue(4), new BoundedMailboxQueue(size, fullMode), stats);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,55 +6,23 @@

using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace Proto.Mailbox;

internal static class MailboxStatus
{
public const int Idle = 0;
public const int Busy = 1;
}

public interface IMailbox
{
int UserMessageCount { get; }

void PostUserMessage(object msg);

void PostSystemMessage(object msg);

void RegisterHandlers(IMessageInvoker invoker, IDispatcher dispatcher);

void Start();
}

public static class BoundedMailbox
{
public static IMailbox Create(int size, params IMailboxStatistics[] stats) =>
new DefaultMailbox(new LockingUnboundedMailboxQueue(4), new BoundedMailboxQueue(size), stats);

public static IMailbox Create(int size, BoundedChannelFullMode fullMode, params IMailboxStatistics[] stats) =>
new DefaultMailbox(new LockingUnboundedMailboxQueue(4), new BoundedMailboxQueue(size, fullMode), stats);
}

public static class UnboundedMailbox
{
public static IMailbox Create(params IMailboxStatistics[] stats) =>
new DefaultMailbox(new LockingUnboundedMailboxQueue(4), new UnboundedMailboxQueue(), stats);
}

public sealed class DefaultMailbox : IMailbox, IThreadPoolWorkItem

{
private const int Idle = 0;
private const int Busy = 1;

private readonly IMailboxStatistics[] _stats;
private readonly IMailboxQueue _systemMessages;
private readonly IMailboxQueue _userMailbox;
private IDispatcher _dispatcher;
private IMessageInvoker _invoker;

private long _status = MailboxStatus.Idle;
private long _status = Idle;
private bool _suspended;

public DefaultMailbox(
Expand Down Expand Up @@ -174,7 +142,7 @@ private static Task RunAsync(DefaultMailbox mailbox)
return Await(mailbox, task);
}

Interlocked.Exchange(ref mailbox._status, MailboxStatus.Idle);
Interlocked.Exchange(ref mailbox._status, Idle);

if (mailbox._systemMessages.HasMessages || mailbox is { _suspended: false, _userMailbox.HasMessages: true })
{
Expand All @@ -194,7 +162,7 @@ static async Task Await(DefaultMailbox self, Task task)
{
await task.ConfigureAwait(false);

Interlocked.Exchange(ref self._status, MailboxStatus.Idle);
Interlocked.Exchange(ref self._status, Idle);

if (self._systemMessages.HasMessages || self is { _suspended: false, _userMailbox.HasMessages: true })
{
Expand Down Expand Up @@ -300,7 +268,7 @@ static async Task Await(object msg, Task task, DefaultMailbox self)

private void Schedule()
{
if (Interlocked.CompareExchange(ref _status, MailboxStatus.Busy, MailboxStatus.Idle) == MailboxStatus.Idle)
if (Interlocked.CompareExchange(ref _status, Busy, Idle) == Idle)
{
if (_dispatcher == Dispatchers.DefaultDispatcher)
{
Expand Down
23 changes: 23 additions & 0 deletions src/Proto.Actor/Mailbox/IMailbox.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// -----------------------------------------------------------------------
// <copyright file="IMailbox.cs" company="Asynkron AB">
// Copyright (C) 2015-2025 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------

namespace Proto.Mailbox;

/// <summary>
/// Defines the core operations of a mailbox used by actors.
/// </summary>
public interface IMailbox
{
int UserMessageCount { get; }

void PostUserMessage(object msg);

void PostSystemMessage(object msg);

void RegisterHandlers(IMessageInvoker invoker, IDispatcher dispatcher);

void Start();
}
16 changes: 16 additions & 0 deletions src/Proto.Actor/Mailbox/UnboundedMailbox.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// -----------------------------------------------------------------------
// <copyright file="UnboundedMailbox.cs" company="Asynkron AB">
// Copyright (C) 2015-2025 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------

namespace Proto.Mailbox;

/// <summary>
/// Factory helpers for creating unbounded mailboxes.
/// </summary>
public static class UnboundedMailbox
{
public static IMailbox Create(params IMailboxStatistics[] stats) =>
new DefaultMailbox(new LockingUnboundedMailboxQueue(4), new UnboundedMailboxQueue(), stats);
}
3 changes: 2 additions & 1 deletion tests/Proto.Actor.Tests/Mailbox/MailboxSchedulingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public async Task GivenNonCompletedUserMessage_ShouldSetMailboxToIdleAfterComple
msg1.TaskCompletionSource.SetResult(0);
await Task.Delay(1000);

Assert.Equal(MailboxStatus.Idle, mailbox.Status);
// Mailbox becomes idle (status 0) after completing the user message
Assert.Equal(0, mailbox.Status);
}
}
Loading