Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions src/Proto.Actor/ActorSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,32 +45,71 @@ public ActorSystem(ActorSystemConfig config)
RunThreadPoolStats();
}

/// <summary>
/// Unique Id of the actor system. Used as member id for cluster members.
/// This is generated by the actor system and is used to identify the actor system in the network.
/// </summary>
public string Id { get; } = Guid.NewGuid().ToString("N");

/// <summary>
/// Address of the actor system when used in Proto.Remote or Proto.Cluster.
/// </summary>
public string Address { get; private set; } = NoHost;

/// <summary>
/// Configuration used to create the actor system.
/// </summary>
public ActorSystemConfig Config { get; }

/// <summary>
/// Manages all processes in the actor system (actors, futures, event stream, etc.).
/// </summary>
public ProcessRegistry ProcessRegistry { get; }

/// <summary>
/// Root context of the actor system. Use it to spawn actors or send messages from outside of an actor context.
/// </summary>
public IRootContext Root { get; }

/// <summary>
/// Allows to access the stop cancellation token and stop reason.
/// Use <see cref="ShutdownAsync"/> to stop the actor system.
/// </summary>
public Stopper Stopper { get; }

/// <summary>
/// Manages all the guardians in the actor system.
/// </summary>
public Guardians Guardians { get; }

/// <summary>
/// DeadLetter process that receives all messages that could not be delivered to an actor.
/// </summary>
public DeadLetterProcess DeadLetter { get; }

/// <summary>
/// Allows to broadcast messages across the actor system to anyone who explicitly subscribed.
/// </summary>
public EventStream EventStream { get; }

/// <summary>
/// Diagnostics and metrics for the actor system.
/// </summary>
public ProtoMetrics Metrics { get; }

/// <summary>
/// Contains extensions for the actor system. Examples: Cluster, PubSub, etc.
/// </summary>
public ActorSystemExtensions Extensions { get; }

private Lazy<FutureFactory> DeferredFuture { get; }

internal FutureFactory Future => DeferredFuture.Value;

/// <summary>
/// Cancellation token use to stop the actor system. Register a callback for this token to be notified when the ActorSystem begins stopping.
/// Use <see cref="ShutdownAsync"/> to stop the actor system.
/// </summary>
public CancellationToken Shutdown => Stopper.Token;

private void RunThreadPoolStats()
Expand All @@ -97,6 +136,11 @@ private void RunThreadPoolStats()
);
}

/// <summary>
/// Stops the actor system and records the reason.
/// </summary>
/// <param name="reason">Shutdown reason</param>
/// <returns></returns>
public Task ShutdownAsync(string reason="")
{
try
Expand All @@ -112,13 +156,21 @@ public Task ShutdownAsync(string reason="")
return Task.CompletedTask;
}

/// <summary>
/// Sets the network address of the actor system. Used by Proto.Remote.
/// </summary>
/// <param name="host"></param>
/// <param name="port"></param>
public void SetAddress(string host, int port)
{
_host = host;
_port = port;
Address = $"{host}:{port}";
}

/// <summary>
/// Sets the address of the actor system to client address. Used by Proto.Remote.
/// </summary>
public void SetClientAddress()
{
Address = $"{Client}/{Id}";
Expand All @@ -129,12 +181,30 @@ public void SetClientAddress()
public RootContext NewRoot(MessageHeader? headers = null, params Func<Sender, Sender>[] middleware) =>
new(this, headers, middleware);

/// <summary>
/// Gets the network address of the actor system. Used by Proto.Remote.
/// </summary>
/// <returns></returns>
public (string Host, int Port) GetAddress() => (_host, _port);

/// <summary>
/// Applies props configuration delegate from actor system configuration.
/// </summary>
/// <param name="props"></param>
/// <returns></returns>
public Props ConfigureProps(Props props) => Config.ConfigureProps(props);

/// <summary>
/// Applies props configuration delegate for system actors.
/// </summary>
/// <param name="name"></param>
/// <param name="props"></param>
/// <returns></returns>
public Props ConfigureSystemProps(string name, Props props) => Config.ConfigureSystemProps(name, props);

/// <summary>
/// Stops the actor system with reason = "Disposed"
/// </summary>
public async ValueTask DisposeAsync()
{
await ShutdownAsync("Disposed");
Expand Down
26 changes: 26 additions & 0 deletions src/Proto.Actor/Behavior.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,52 @@

namespace Proto;

/// <summary>
/// Utility class for creating state machines. See <a href="https://proto.actor/docs/behaviors/">Behaviors</a> for more information.
/// </summary>
public class Behavior
{
private readonly Stack<Receive> _behaviors = new();

/// <summary>
/// Initializes the behavior with an initial message handler. Use <see cref="Become"/> to set initial message handler.
/// </summary>
public Behavior()
{
}

/// <summary>
/// Initializes the behavior with an initial message handler.
/// </summary>
/// <param name="receive">Function to process actor's messages</param>
public Behavior(Receive receive) => Become(receive);

/// <summary>
/// Switches to a new behavior. Previous behavior stack is cleared.
/// </summary>
/// <param name="receive">Function to process actor's messages</param>
public void Become(Receive receive)
{
_behaviors.Clear();
_behaviors.Push(receive);
}

/// <summary>
/// Switches to a new behavior. Previous behavior is stored on a stack.
/// </summary>
/// <param name="receive"></param>
public void BecomeStacked(Receive receive) => _behaviors.Push(receive);

/// <summary>
/// Restores previous behavior from the stack.
/// </summary>
public void UnbecomeStacked() => _behaviors.Pop();

/// <summary>
/// Handle the message with currently active message handler (behavior).
/// </summary>
/// <param name="context">Actor context to process</param>
/// <returns></returns>
public Task ReceiveAsync(IContext context)
{
var behavior = _behaviors.Peek();
Expand Down
30 changes: 26 additions & 4 deletions src/Proto.Actor/CancellationTokens.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,34 @@ namespace Proto;
public static class CancellationTokens
{
private record TokenEntry(DateTimeOffset Timestamp, CancellationToken Token);

private static readonly ConcurrentDictionary<int, TokenEntry> Tokens = new();

/// <summary>
/// Gets a cancellation token that will be cancelled after the specified time rounded up to nearest second.
/// </summary>
/// <remarks>The tokens creation is optimized and they are reused, hence cancellation timeout can be off by (at most) 500ms.
/// Because of that it is best to use this method when the timeout is at least several seconds</remarks>
/// <param name="duration"></param>
/// <returns></returns>
public static CancellationToken FromSeconds(TimeSpan duration)
{
var seconds = (int)Math.Ceiling(duration.TotalSeconds);
var seconds = (int) Math.Ceiling(duration.TotalSeconds);
return FromSeconds(seconds);
}

/// <summary>
/// Gets a cancellation token that will be cancelled after specified number of seconds.
/// </summary>
/// <param name="seconds"></param>
/// <remarks>The tokens creation is optimized and they are reused, hence cancellation timeout can be off by (at most) 500ms.
/// Because of that it is best to use this method when the timeout is at least several seconds</remarks>
/// <returns></returns>
/// <exception cref="ArgumentOutOfRangeException">Thrown if seconds is less than 1</exception>
public static CancellationToken FromSeconds(int seconds)
{
if (seconds < 1) throw new ArgumentOutOfRangeException(nameof(seconds));

static TokenEntry ValueFactory(int seconds)
{
var cts = new CancellationTokenSource(seconds * 1000);
Expand All @@ -45,7 +60,14 @@ static TokenEntry ValueFactory(int seconds)
Tokens.TryRemove(seconds, out _);
}
}
[Obsolete("Use and dispose CancellationTokenSource, or use CancellationTokens.FromSeconds",true)]

[Obsolete("Use and dispose CancellationTokenSource, or use CancellationTokens.FromSeconds", true)]
public static CancellationToken WithTimeout(int ms) => new CancellationTokenSource(ms).Token;

/// <summary>
/// Creates a new CancellationTokenSource that will be cancelled after the specified time.
/// </summary>
/// <param name="timeSpan"></param>
/// <returns></returns>
public static CancellationToken WithTimeout(TimeSpan timeSpan) => new CancellationTokenSource(timeSpan).Token;
}
4 changes: 3 additions & 1 deletion src/Proto.Actor/Channels/ChannelPublisherActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ namespace Proto.Channels;
public static class ChannelPublisher
{
/// <summary>
/// Starts a new publisher actor
/// Starts a new channel publisher actor. This actor will read from the given channel
/// and send the received messages to the subscribers. The actor will poison itself when the channel is closed.
/// </summary>
/// <remarks>Use <see cref="ChannelSubscriber.StartNew{T}"/> to subscribe</remarks>
/// <param name="context">The parent context used to spawn the actor</param>
/// <param name="channel">The source channel</param>
/// <param name="name">The name of the publisher actor</param>
Expand Down
3 changes: 2 additions & 1 deletion src/Proto.Actor/Channels/ChannelSubscriberActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ namespace Proto.Channels;
public static class ChannelSubscriber
{
/// <summary>
/// Starts a new subscriber actor
/// Starts a new subscriber actor, that subscribes to messages from <see cref="ChannelPublisherActor{T}"/>.
/// Received messages will be sent to the specified channel.
/// </summary>
/// <param name="context">The parent context used to spawn</param>
/// <param name="publisher">The PID of the publisher actor to subscribe to</param>
Expand Down
Loading