diff --git a/src/Proto.Actor/ActorSystem.cs b/src/Proto.Actor/ActorSystem.cs index 129b5eb594..bcc309911c 100644 --- a/src/Proto.Actor/ActorSystem.cs +++ b/src/Proto.Actor/ActorSystem.cs @@ -45,32 +45,71 @@ public ActorSystem(ActorSystemConfig config) RunThreadPoolStats(); } + /// + /// Unique Id of the actor system. Used as member id for cluster members. + /// This is generated by the actor system and is used to identify the actor system in the network. + /// public string Id { get; } = Guid.NewGuid().ToString("N"); + /// + /// Address of the actor system when used in Proto.Remote or Proto.Cluster. + /// public string Address { get; private set; } = NoHost; + /// + /// Configuration used to create the actor system. + /// public ActorSystemConfig Config { get; } + /// + /// Manages all processes in the actor system (actors, futures, event stream, etc.). + /// public ProcessRegistry ProcessRegistry { get; } + /// + /// Root context of the actor system. Use it to spawn actors or send messages from outside of an actor context. + /// public IRootContext Root { get; } + /// + /// Allows to access the stop cancellation token and stop reason. + /// Use to stop the actor system. + /// public Stopper Stopper { get; } + /// + /// Manages all the guardians in the actor system. + /// public Guardians Guardians { get; } + /// + /// DeadLetter process that receives all messages that could not be delivered to an actor. + /// public DeadLetterProcess DeadLetter { get; } + /// + /// Allows to broadcast messages across the actor system to anyone who explicitly subscribed. + /// public EventStream EventStream { get; } + /// + /// Diagnostics and metrics for the actor system. + /// public ProtoMetrics Metrics { get; } + /// + /// Contains extensions for the actor system. Examples: Cluster, PubSub, etc. + /// public ActorSystemExtensions Extensions { get; } private Lazy DeferredFuture { get; } internal FutureFactory Future => DeferredFuture.Value; + /// + /// Cancellation token use to stop the actor system. Register a callback for this token to be notified when the ActorSystem begins stopping. + /// Use to stop the actor system. + /// public CancellationToken Shutdown => Stopper.Token; private void RunThreadPoolStats() @@ -97,6 +136,11 @@ private void RunThreadPoolStats() ); } + /// + /// Stops the actor system and records the reason. + /// + /// Shutdown reason + /// public Task ShutdownAsync(string reason="") { try @@ -112,6 +156,11 @@ public Task ShutdownAsync(string reason="") return Task.CompletedTask; } + /// + /// Sets the network address of the actor system. Used by Proto.Remote. + /// + /// + /// public void SetAddress(string host, int port) { _host = host; @@ -119,6 +168,9 @@ public void SetAddress(string host, int port) Address = $"{host}:{port}"; } + /// + /// Sets the address of the actor system to client address. Used by Proto.Remote. + /// public void SetClientAddress() { Address = $"{Client}/{Id}"; @@ -129,12 +181,30 @@ public void SetClientAddress() public RootContext NewRoot(MessageHeader? headers = null, params Func[] middleware) => new(this, headers, middleware); + /// + /// Gets the network address of the actor system. Used by Proto.Remote. + /// + /// public (string Host, int Port) GetAddress() => (_host, _port); + /// + /// Applies props configuration delegate from actor system configuration. + /// + /// + /// public Props ConfigureProps(Props props) => Config.ConfigureProps(props); + /// + /// Applies props configuration delegate for system actors. + /// + /// + /// + /// public Props ConfigureSystemProps(string name, Props props) => Config.ConfigureSystemProps(name, props); + /// + /// Stops the actor system with reason = "Disposed" + /// public async ValueTask DisposeAsync() { await ShutdownAsync("Disposed"); diff --git a/src/Proto.Actor/Behavior.cs b/src/Proto.Actor/Behavior.cs index a5ab5825da..ffc5c3c076 100644 --- a/src/Proto.Actor/Behavior.cs +++ b/src/Proto.Actor/Behavior.cs @@ -8,26 +8,52 @@ namespace Proto; +/// +/// Utility class for creating state machines. See Behaviors for more information. +/// public class Behavior { private readonly Stack _behaviors = new(); + /// + /// Initializes the behavior with an initial message handler. Use to set initial message handler. + /// public Behavior() { } + /// + /// Initializes the behavior with an initial message handler. + /// + /// Function to process actor's messages public Behavior(Receive receive) => Become(receive); + /// + /// Switches to a new behavior. Previous behavior stack is cleared. + /// + /// Function to process actor's messages public void Become(Receive receive) { _behaviors.Clear(); _behaviors.Push(receive); } + /// + /// Switches to a new behavior. Previous behavior is stored on a stack. + /// + /// public void BecomeStacked(Receive receive) => _behaviors.Push(receive); + /// + /// Restores previous behavior from the stack. + /// public void UnbecomeStacked() => _behaviors.Pop(); + /// + /// Handle the message with currently active message handler (behavior). + /// + /// Actor context to process + /// public Task ReceiveAsync(IContext context) { var behavior = _behaviors.Peek(); diff --git a/src/Proto.Actor/CancellationTokens.cs b/src/Proto.Actor/CancellationTokens.cs index 0a21d5d048..70f6a33ef0 100644 --- a/src/Proto.Actor/CancellationTokens.cs +++ b/src/Proto.Actor/CancellationTokens.cs @@ -12,19 +12,34 @@ namespace Proto; public static class CancellationTokens { private record TokenEntry(DateTimeOffset Timestamp, CancellationToken Token); - + private static readonly ConcurrentDictionary Tokens = new(); + /// + /// Gets a cancellation token that will be cancelled after the specified time rounded up to nearest second. + /// + /// The tokens creation is optimized and they are reused, hence cancellation timeout can be off by (at most) 500ms. + /// Because of that it is best to use this method when the timeout is at least several seconds + /// + /// public static CancellationToken FromSeconds(TimeSpan duration) { - var seconds = (int)Math.Ceiling(duration.TotalSeconds); + var seconds = (int) Math.Ceiling(duration.TotalSeconds); return FromSeconds(seconds); } + /// + /// Gets a cancellation token that will be cancelled after specified number of seconds. + /// + /// + /// The tokens creation is optimized and they are reused, hence cancellation timeout can be off by (at most) 500ms. + /// Because of that it is best to use this method when the timeout is at least several seconds + /// + /// Thrown if seconds is less than 1 public static CancellationToken FromSeconds(int seconds) { if (seconds < 1) throw new ArgumentOutOfRangeException(nameof(seconds)); - + static TokenEntry ValueFactory(int seconds) { var cts = new CancellationTokenSource(seconds * 1000); @@ -45,7 +60,14 @@ static TokenEntry ValueFactory(int seconds) Tokens.TryRemove(seconds, out _); } } - [Obsolete("Use and dispose CancellationTokenSource, or use CancellationTokens.FromSeconds",true)] + + [Obsolete("Use and dispose CancellationTokenSource, or use CancellationTokens.FromSeconds", true)] public static CancellationToken WithTimeout(int ms) => new CancellationTokenSource(ms).Token; + + /// + /// Creates a new CancellationTokenSource that will be cancelled after the specified time. + /// + /// + /// public static CancellationToken WithTimeout(TimeSpan timeSpan) => new CancellationTokenSource(timeSpan).Token; } \ No newline at end of file diff --git a/src/Proto.Actor/Channels/ChannelPublisherActor.cs b/src/Proto.Actor/Channels/ChannelPublisherActor.cs index 807288fb1b..b6f905c9ad 100644 --- a/src/Proto.Actor/Channels/ChannelPublisherActor.cs +++ b/src/Proto.Actor/Channels/ChannelPublisherActor.cs @@ -14,8 +14,10 @@ namespace Proto.Channels; public static class ChannelPublisher { /// - /// Starts a new publisher actor + /// Starts a new channel publisher actor. This actor will read from the given channel + /// and send the received messages to the subscribers. The actor will poison itself when the channel is closed. /// + /// Use to subscribe /// The parent context used to spawn the actor /// The source channel /// The name of the publisher actor diff --git a/src/Proto.Actor/Channels/ChannelSubscriberActor.cs b/src/Proto.Actor/Channels/ChannelSubscriberActor.cs index adb3f6ce82..ebb75a517f 100644 --- a/src/Proto.Actor/Channels/ChannelSubscriberActor.cs +++ b/src/Proto.Actor/Channels/ChannelSubscriberActor.cs @@ -13,7 +13,8 @@ namespace Proto.Channels; public static class ChannelSubscriber { /// - /// Starts a new subscriber actor + /// Starts a new subscriber actor, that subscribes to messages from . + /// Received messages will be sent to the specified channel. /// /// The parent context used to spawn /// The PID of the publisher actor to subscribe to diff --git a/src/Proto.Actor/Configuration/ActorSystemConfig.cs b/src/Proto.Actor/Configuration/ActorSystemConfig.cs index 5cd87aedec..8106f5b698 100644 --- a/src/Proto.Actor/Configuration/ActorSystemConfig.cs +++ b/src/Proto.Actor/Configuration/ActorSystemConfig.cs @@ -16,19 +16,19 @@ namespace Proto; public record ActorSystemConfig { /// - /// The interval used to trigger DeadLetter throttling + /// The interval used to trigger throttling of deadletter message logs /// public TimeSpan DeadLetterThrottleInterval { get; init; } /// - /// The counter used to trigger DeadLetter throttling + /// The counter used to trigger throttling of deadletter message logs /// DeadLetter throttling triggers when there are DeadLetterThrottleCount deadletters in DeadLetterThrottleInterval /// time /// public int DeadLetterThrottleCount { get; init; } /// - /// Enables logging for DeadLetter responses in Request/RequestAsync + /// Enables logging for DeadLetter events in Request/RequestAsync (when a message reaches DeadLetter instead of target actor) /// When disabled, the requesting code is responsible for logging manually /// public bool DeadLetterRequestLogging { get; set; } = true; @@ -39,7 +39,7 @@ public record ActorSystemConfig public bool DeveloperSupervisionLogging { get; init; } /// - /// Enables actor metrics + /// Enables actor metrics. Set to true if you want to export the metrics with OpenTelemetry exporters. /// public bool MetricsEnabled { get; init; } @@ -59,6 +59,7 @@ public record ActorSystemConfig /// /// Allows ActorSystem-wide augmentation of system Props /// All system props are translated via this function + /// By default, DeadlineDecorator, LoggingContextDecorator are used. Additionally, the supervision strategy is set to AlwaysRestart. /// public Func ConfigureSystemProps { get; init; } = (_,props) => { var logger = Log.CreateLogger("Proto.SystemActors"); @@ -101,6 +102,10 @@ public record ActorSystemConfig /// The default timeout for RequestAsync calls /// public TimeSpan ActorRequestTimeout { get; init; } = TimeSpan.FromSeconds(5); + + /// + /// Enables logging for DeadLetter responses in Request/RequestAsync (responses returned from DeadLetter to original sender) + /// public bool DeadLetterResponseLogging { get; init; } = false; /// @@ -109,40 +114,103 @@ public record ActorSystemConfig /// The new ActorSystemConfig public static ActorSystemConfig Setup() => new(); + /// + /// The interval used to trigger throttling of deadletter message logs + /// public ActorSystemConfig WithDeadLetterThrottleInterval(TimeSpan deadLetterThrottleInterval) => this with {DeadLetterThrottleInterval = deadLetterThrottleInterval}; + /// + /// The counter used to trigger throttling of deadletter message logs + /// DeadLetter throttling triggers when there are DeadLetterThrottleCount deadletters in DeadLetterThrottleInterval + /// time + /// public ActorSystemConfig WithDeadLetterThrottleCount(int deadLetterThrottleCount) => this with {DeadLetterThrottleCount = deadLetterThrottleCount}; + /// + /// Enables logging for DeadLetter responses in Request/RequestAsync + /// When disabled, the requesting code is responsible for logging manually + /// public ActorSystemConfig WithDeadLetterRequestLogging(bool enabled) => this with {DeadLetterRequestLogging = enabled}; + /// + /// Enables SharedFutures + /// SharedFutures allows the ActorSystem to avoid registering a new temporary process for each request + /// Instead registering a SharedFuture that can handle multiple requests internally + /// + /// The number of requests that can be handled by a SharedFuture public ActorSystemConfig WithSharedFutures(int size = 5000) => this with {SharedFutures = true, SharedFutureSize = size}; + /// + /// Developer debugging feature, enables extended logging for actor supervision failures + /// public ActorSystemConfig WithDeveloperSupervisionLogging(bool enabled) => this with {DeveloperSupervisionLogging = enabled}; - + + /// + /// Enables actor metrics. Set to true if you want to export the metrics with OpenTelemetry exporters. + /// public ActorSystemConfig WithMetrics(bool enabled = true) => this with {MetricsEnabled = enabled}; + /// + /// Function used to serialize actor state to a diagnostics string + /// Can be used together with RemoteDiagnostics to view the state of remote actors + /// public ActorSystemConfig WithDiagnosticsSerializer(Func serializer) => this with {DiagnosticsSerializer = serializer}; + /// + /// Allows adding middleware to the root context exposed by the ActorSystem. + /// The result from this will be used as the default sender for all requests, + /// except requests overriding the sender context by parameter + /// public ActorSystemConfig WithConfigureRootContext(Func configureContext) => this with {ConfigureRootContext = configureContext}; + + /// + /// Allows ActorSystem-wide augmentation of any Props + /// All props are translated via this function + /// public ActorSystemConfig WithConfigureProps(Func configureProps) => this with {ConfigureProps = configureProps}; + /// + /// Allows ActorSystem-wide augmentation of system Props + /// All system props are translated via this function + /// By default, DeadlineDecorator, LoggingContextDecorator are used. Additionally, the supervision strategy is set to AlwaysRestart. + /// public ActorSystemConfig WithConfigureSystemProps(Func configureSystemProps) => this with {ConfigureSystemProps = configureSystemProps}; + /// + /// Measures the time it takes from scheduling a Task, until the task starts to execute + /// If this deadline expires, the ActorSystem logs that the threadpool is running hot + /// public ActorSystemConfig WithThreadPoolStatsTimeout(TimeSpan threadPoolStatsTimeout) => this with {ThreadPoolStatsTimeout = threadPoolStatsTimeout}; + /// + /// Enables more extensive threadpool stats logging + /// public ActorSystemConfig WithDeveloperThreadPoolStatsLogging(bool enabled) => this with {DeveloperThreadPoolStatsLogging = enabled}; + /// + /// The default timeout for RequestAsync calls + /// public ActorSystemConfig WithActorRequestTimeout(TimeSpan timeout) => this with {ActorRequestTimeout = timeout}; + /// + /// Enables logging for DeadLetter responses in Request/RequestAsync (responses returned from DeadLetter to original sender) + /// public ActorSystemConfig WithDeadLetterResponseLogging(bool enabled) => this with {DeadLetterResponseLogging = enabled}; } //Not part of the contract, but still shipped out of the box public static class ActorSystemConfigExtensions { + /// + /// Enables logging when the Receive method on an actor takes too long. This method appends to wraps existing ConfigureProps delegate. + /// + /// + /// Time allowed for Receive call + /// Log level to use + /// public static ActorSystemConfig WithDeveloperReceiveLogging( this ActorSystemConfig self, TimeSpan receiveDeadline, diff --git a/src/Proto.Actor/Context/ActorContextDecorator.cs b/src/Proto.Actor/Context/ActorContextDecorator.cs index df03005229..7ce0af1141 100644 --- a/src/Proto.Actor/Context/ActorContextDecorator.cs +++ b/src/Proto.Actor/Context/ActorContextDecorator.cs @@ -12,6 +12,9 @@ // ReSharper disable once CheckNamespace namespace Proto; +/// +/// Base class for decorators that decorate (extend) an . +/// public abstract class ActorContextDecorator : IContext { private readonly IContext _context; diff --git a/src/Proto.Actor/Context/ActorLoggingContext.cs b/src/Proto.Actor/Context/ActorLoggingContext.cs index 94e65744b4..4e32574ad9 100644 --- a/src/Proto.Actor/Context/ActorLoggingContext.cs +++ b/src/Proto.Actor/Context/ActorLoggingContext.cs @@ -24,6 +24,9 @@ public static Props WithLoggingContextDecorator( props.WithContextDecorator(ctx => new ActorLoggingContext(ctx, logger, logLevel, infrastructureLogLevel, exceptionLogLevel)); } +/// +/// A decorator for that logs events related to message delivery to the actor. +/// public class ActorLoggingContext : ActorContextDecorator { private readonly ILogger _logger; diff --git a/src/Proto.Actor/Context/DeadlineContextDecorator.cs b/src/Proto.Actor/Context/DeadlineContextDecorator.cs index b7893439e9..d1aba3c2d8 100644 --- a/src/Proto.Actor/Context/DeadlineContextDecorator.cs +++ b/src/Proto.Actor/Context/DeadlineContextDecorator.cs @@ -15,6 +15,13 @@ namespace Proto.Context; [PublicAPI] public static class DeadlineContextExtensions { + /// + /// Adds a decorator for a that logs warning message if Receive takes more time than specified timeout. + /// + /// + /// The timeout for Receive to complete + /// + /// public static Props WithDeadlineDecorator( this Props props, TimeSpan deadline, @@ -22,6 +29,10 @@ ILogger logger ) => props.WithContextDecorator(ctx => new DeadlineContextDecorator(ctx, deadline, logger)); } + +/// +/// A decorator for a that logs warning message if Receive takes more time than specified timeout. +/// public class DeadlineContextDecorator : ActorContextDecorator { private readonly TimeSpan _deadline; diff --git a/src/Proto.Actor/Context/IContext.cs b/src/Proto.Actor/Context/IContext.cs index a71bda00ab..7f69714ef6 100644 --- a/src/Proto.Actor/Context/IContext.cs +++ b/src/Proto.Actor/Context/IContext.cs @@ -11,6 +11,9 @@ // ReSharper disable once CheckNamespace namespace Proto; +/// +/// All contextual information available for a given actor +/// public interface IContext : ISenderContext, IReceiverContext, ISpawnerContext, IStopperContext { /// @@ -19,7 +22,7 @@ public interface IContext : ISenderContext, IReceiverContext, ISpawnerContext, I CancellationToken CancellationToken { get; } /// - /// Gets the receive timeout. + /// Gets the receive timeout. It will be TimeSpan.Zero if receive timeout was not set /// TimeSpan ReceiveTimeout { get; } @@ -29,13 +32,13 @@ public interface IContext : ISenderContext, IReceiverContext, ISpawnerContext, I IReadOnlyCollection Children { get; } /// - /// Sends a response to the current Sender. If the Sender is null, the actor will panic. + /// Sends a response to the current Sender. If the Sender is null, this call has no effect apart from warning log entry. /// /// The message to send void Respond(object message); - + /// - /// Sends a response to the current Sender, including message header + /// Sends a response to the current Sender, including message header. If the Sender is null, this call has no effect apart from warning log entry. /// /// The message to send /// @@ -47,7 +50,7 @@ public interface IContext : ISenderContext, IReceiverContext, ISpawnerContext, I void Stash(); /// - /// Registers the actor as a watcher for the specified PID. + /// Registers the actor as a watcher for the specified PID. When the PID terminates the watcher is notified with message. /// /// The PID to watch void Watch(PID pid); @@ -59,15 +62,22 @@ public interface IContext : ISenderContext, IReceiverContext, ISpawnerContext, I void Unwatch(PID pid); /// - /// Sets the receive timeout. If no message is received for the given duration, a ReceiveTimeout message will be sent + /// Sets the receive timeout. If no message is received for the given duration, a message will be sent /// to the actor. If a message is received within the given duration, the timer is reset, unless the message implements - /// INotInfluenceReceiveTimeout. + /// /// /// The receive timeout duration void SetReceiveTimeout(TimeSpan duration); + /// + /// Cancels the receive timeout. + /// void CancelReceiveTimeout(); + /// + /// Forwards the current message in the context to another actor. + /// + /// Actor to forward to void Forward(PID target); /// @@ -77,7 +87,7 @@ public interface IContext : ISenderContext, IReceiverContext, ISpawnerContext, I /// asynchronous operation completes. /// /// the Task to await - /// the continuation to call once the task is completed + /// The continuation to call once the task is completed. The awaited task is passed in as a parameter. /// The generic type of the task void ReenterAfter(Task target, Func, Task> action); @@ -87,32 +97,33 @@ public interface IContext : ISenderContext, IReceiverContext, ISpawnerContext, I /// The concept is called Reentrancy, where an actor can continue to process messages while also awaiting that some /// asynchronous operation completes. /// - /// the Task to await - /// the continuation to call once the task is completed + /// The Task to await + /// The continuation to call once the task is completed void ReenterAfter(Task target, Action action); - + /// /// Awaits the given target task and once completed, the given action is then completed within the actors concurrency /// constraint. /// The concept is called Reentrancy, where an actor can continue to process messages while also awaiting that some /// asynchronous operation completes. /// - /// the Task to await - /// the continuation to call once the task is completed + /// The Task to await + /// The continuation to call once the task is completed. The awaited task is passed in as a parameter. void ReenterAfter(Task target, Action action); - + /// /// Awaits the given target task and once completed, the given action is then completed within the actors concurrency /// constraint. /// The concept is called Reentrancy, where an actor can continue to process messages while also awaiting that some /// asynchronous operation completes. /// - /// the Task to await - /// the continuation to call once the task is completed - void ReenterAfter(Task target, Func action); + /// The Task to await + /// The continuation to call once the task is completed. The awaited task is passed in as a parameter. + void ReenterAfter(Task target, Func action); /// - /// Captures the current MessageOrEnvelope for the ActorContext + /// Captures the current MessageOrEnvelope for the ActorContext. Use this to stash messages for later processing. Use + /// to process stored messages. /// /// The Captured Context CapturedContext Capture(); @@ -125,7 +136,8 @@ public interface IContext : ISenderContext, IReceiverContext, ISpawnerContext, I void Apply(CapturedContext capturedContext); /// - /// Calls the callback on token cancellation. If CancellationToken is non-cancellable, this is a noop. + /// Calls the callback when specified cancellation token gets cancelled. The callback runs within actor's concurrency constrins. + /// If CancellationToken is non-cancellable, this is a noop. /// /// The CancellationToken to continue after /// The callback diff --git a/src/Proto.Actor/Context/IContextStore.cs b/src/Proto.Actor/Context/IContextStore.cs index b5eb2d468f..6811eef0ad 100644 --- a/src/Proto.Actor/Context/IContextStore.cs +++ b/src/Proto.Actor/Context/IContextStore.cs @@ -17,14 +17,14 @@ public interface IContextStore T? Get(); /// - /// Sets a value from the actor context + /// Sets a value on the actor context /// /// The value to set /// The Type key of the value void Set(T obj) => Set(obj); /// - /// Sets a value from the actor context + /// Sets a value on the actor context /// /// The value to set /// The Type key of the value diff --git a/src/Proto.Actor/Context/ISenderContext.cs b/src/Proto.Actor/Context/ISenderContext.cs index a8df119cd4..1ced3c0d41 100644 --- a/src/Proto.Actor/Context/ISenderContext.cs +++ b/src/Proto.Actor/Context/ISenderContext.cs @@ -35,7 +35,7 @@ public interface ISenderContext : IInfoContext /// /// The target PID /// The message to send - /// Message sender + /// Message sender that will receive the response void Request(PID target, object message, PID? sender); /// @@ -74,9 +74,9 @@ public static class SenderContextExtensions public static BatchContext CreateBatchContext(this ISenderContext context, int size, CancellationToken ct) => new(context, size, ct); /// - /// Sends a message together with a Sender PID, this allows the target to respond async to the Sender + /// Sends a message together with a Sender PID, this allows the target to respond async to the Sender. /// - /// the context used to issue the request + /// The context used to issue the request. Response will be sent back to self.Self. /// The target PID /// The message to send public static void Request(this ISenderContext self, PID target, object message) => @@ -118,7 +118,7 @@ public static async Task RequestAsync(this ISenderContext self, PID target /// the context used to issue the request /// The target PID /// The message to send - /// + /// Callback gets the request task passed in as a parameter /// /// Expected return message type public static void RequestReenter(this IContext self, PID target, object message, Func, Task> callback, CancellationToken ct) diff --git a/src/Proto.Actor/Context/ISpawnerContext.cs b/src/Proto.Actor/Context/ISpawnerContext.cs index fa35c55743..73fac23db8 100644 --- a/src/Proto.Actor/Context/ISpawnerContext.cs +++ b/src/Proto.Actor/Context/ISpawnerContext.cs @@ -12,7 +12,7 @@ namespace Proto; public interface ISpawnerContext : ISystemContext { /// - /// Spawns a new child actor based on props and named using the specified name. + /// Spawns a new child actor based on props and specified name. /// /// The Props used to spawn the actor /// The actor name diff --git a/src/Proto.Actor/Context/RootContext.cs b/src/Proto.Actor/Context/RootContext.cs index a4cd23e99f..e4fbebb275 100644 --- a/src/Proto.Actor/Context/RootContext.cs +++ b/src/Proto.Actor/Context/RootContext.cs @@ -16,6 +16,12 @@ namespace Proto; public interface IRootContext : ISpawnerContext, ISenderContext, IStopperContext { + /// + /// Add sender middleware to the root context. Every message sent through the root context will be passed through the middleware. + /// The middleware will overwrite any other middleware previously added to the root context. + /// + /// Middleware to use. First entry is the outermost middleware, while last entry is innermost. + /// IRootContext WithSenderMiddleware(params Func[] middleware); } diff --git a/src/Proto.Actor/Context/RootContextDecorator.cs b/src/Proto.Actor/Context/RootContextDecorator.cs index 715b6deee9..186d8814ed 100644 --- a/src/Proto.Actor/Context/RootContextDecorator.cs +++ b/src/Proto.Actor/Context/RootContextDecorator.cs @@ -12,6 +12,9 @@ // ReSharper disable once CheckNamespace namespace Proto; +/// +/// A base class for decorators that decorate (extend) a . +/// [PublicAPI] public abstract class RootContextDecorator : IRootContext { diff --git a/src/Proto.Actor/Context/SystemContext.cs b/src/Proto.Actor/Context/SystemContext.cs index b639561c83..da14eb24d8 100644 --- a/src/Proto.Actor/Context/SystemContext.cs +++ b/src/Proto.Actor/Context/SystemContext.cs @@ -12,6 +12,14 @@ public static class SystemContext { private static readonly ILogger Logger = Log.CreateLogger(nameof(SystemContext)); + /// + /// Spawns a system actor with the given name. + /// + /// + /// Props of the actor + /// Name of the actor (must start with $) + /// + /// public static PID SpawnNamedSystem(this IRootContext self, Props props, string name) { if (!name.StartsWith("$")) diff --git a/src/Proto.Actor/Deduplication/DeduplicationContext.cs b/src/Proto.Actor/Deduplication/DeduplicationContext.cs index 4b1387ba32..938f42fa55 100644 --- a/src/Proto.Actor/Deduplication/DeduplicationContext.cs +++ b/src/Proto.Actor/Deduplication/DeduplicationContext.cs @@ -12,8 +12,18 @@ namespace Proto.Deduplication; +/// +/// Extracts the deduplication key from the message. +/// +/// Type of the key +/// The key should be returned in this variable +/// Returns true if the key was successfully extracted, false otherwise public delegate bool TryGetDeduplicationKey(MessageEnvelope envelope, out T? key); +/// +/// A decorator for actor context that de-duplicates incoming messages based on the message's deduplication key. +/// +/// Type of the deduplication key public class DeduplicationContext : ActorContextDecorator where T : IEquatable { private readonly DeDuplicator _deDuplicator; diff --git a/src/Proto.Actor/DependencyInjection/DIExtension.cs b/src/Proto.Actor/DependencyInjection/DIExtension.cs index f070415498..68c06d1e02 100644 --- a/src/Proto.Actor/DependencyInjection/DIExtension.cs +++ b/src/Proto.Actor/DependencyInjection/DIExtension.cs @@ -20,6 +20,12 @@ public class DIExtension : IActorSystemExtension [PublicAPI] public static class Extensions { + /// + /// Adds the DI extension to the actor system, that helps to create Props based on the DI container. + /// + /// + /// Service provider to use to resolve actors + /// public static ActorSystem WithServiceProvider(this ActorSystem actorSystem, IServiceProvider serviceProvider) { var dependencyResolver = new DependencyResolver(serviceProvider); @@ -28,6 +34,11 @@ public static ActorSystem WithServiceProvider(this ActorSystem actorSystem, ISer return actorSystem; } + /// + /// Access the from the DI extension. Requires that the actor system was configured with . + /// + /// + /// // ReSharper disable once InconsistentNaming public static IDependencyResolver DI(this ActorSystem system) => system.Extensions.GetRequired().Resolver; } \ No newline at end of file diff --git a/src/Proto.Actor/DependencyInjection/IDependencyResolver.cs b/src/Proto.Actor/DependencyInjection/IDependencyResolver.cs index 5b0ef8cfad..578cc39a87 100644 --- a/src/Proto.Actor/DependencyInjection/IDependencyResolver.cs +++ b/src/Proto.Actor/DependencyInjection/IDependencyResolver.cs @@ -7,11 +7,28 @@ namespace Proto.DependencyInjection; +/// +/// Utility to create Props based on +/// public interface IDependencyResolver { + /// + /// Creates Props that resolve the actor by type, with possibility to supply additional constructor arguments + /// + /// Additional constructor arguments + /// Props PropsFor(params object[] args) where TActor : IActor; - + + /// + /// Creates Props that resolve the actor by type + /// + /// Props PropsFor() where TActor : IActor; + /// + /// Creates Props that resolve the actor by type + /// + /// + /// Props PropsFor(Type actorType); } \ No newline at end of file diff --git a/src/Proto.Actor/Diagnostics/DiagnosticTools.cs b/src/Proto.Actor/Diagnostics/DiagnosticTools.cs index 6d4a494889..6add1ab287 100644 --- a/src/Proto.Actor/Diagnostics/DiagnosticTools.cs +++ b/src/Proto.Actor/Diagnostics/DiagnosticTools.cs @@ -11,6 +11,12 @@ namespace Proto.Diagnostics; [PublicAPI] public static class DiagnosticTools { + /// + /// Asks an actor (or any other process) to provide diagnostics string by sending a message. + /// + /// + /// + /// public static async Task GetDiagnosticsString(ActorSystem system, PID pid) { var tcs = new TaskCompletionSource(); diff --git a/src/Proto.Actor/Diagnostics/IActorDiagnostics.cs b/src/Proto.Actor/Diagnostics/IActorDiagnostics.cs index 75ef319d23..b3697d3b20 100644 --- a/src/Proto.Actor/Diagnostics/IActorDiagnostics.cs +++ b/src/Proto.Actor/Diagnostics/IActorDiagnostics.cs @@ -5,7 +5,14 @@ // ----------------------------------------------------------------------- namespace Proto.Diagnostics; +/// +/// Adds the ability to return a diagnostic string for an actor +/// public interface IActorDiagnostics { + /// + /// Return a diagnostic string for the actor + /// + /// string GetDiagnosticsString(); } \ No newline at end of file diff --git a/src/Proto.Actor/EventStream/DeadLetter.cs b/src/Proto.Actor/EventStream/DeadLetter.cs index 13b0eee107..2d166c8532 100644 --- a/src/Proto.Actor/EventStream/DeadLetter.cs +++ b/src/Proto.Actor/EventStream/DeadLetter.cs @@ -12,6 +12,11 @@ // ReSharper disable once CheckNamespace namespace Proto; +/// +/// A wrapper for a message that could not be delivered to the original recipient. Such message is wrapped in +/// a by the and forwarded +/// to the +/// [PublicAPI] public class DeadLetterEvent { @@ -27,15 +32,34 @@ public DeadLetterEvent(PID pid, object message, PID? sender, MessageHeader? head Header = header ?? MessageHeader.Empty; } + /// + /// The PID of the actor that was the original recipient of the message. + /// public PID Pid { get; } + + /// + /// The message that could not be delivered to the original recipient. + /// public object Message { get; } + + /// + /// Sender of the message. + /// public PID? Sender { get; } + + /// + /// Headers of the message. + /// public MessageHeader Header { get; } public override string ToString() => $"DeadLetterEvent: [ Pid: {Pid}, Message: {Message.GetType()}:{Message}, Sender: {Sender}, Headers: {Header} ]"; } +/// +/// A process that receives messages, that cannot be handled by the original recipients e.g. because they have been stopped. +/// The message is then forwarded to the as a +/// public class DeadLetterProcess : Process { public DeadLetterProcess(ActorSystem system) : base(system) diff --git a/src/Proto.Actor/EventStream/EventStream.cs b/src/Proto.Actor/EventStream/EventStream.cs index f6c744fb98..a3c6ea8ed9 100644 --- a/src/Proto.Actor/EventStream/EventStream.cs +++ b/src/Proto.Actor/EventStream/EventStream.cs @@ -16,6 +16,9 @@ namespace Proto; +/// +/// Event stream global to an actor system. +/// [PublicAPI] public class EventStream : EventStream { @@ -53,7 +56,7 @@ internal EventStream(ActorSystem system) } /// -/// Global event stream of a specific message type +/// Event stream of a specific message type global to an actor system. /// /// Message type [PublicAPI] @@ -68,7 +71,7 @@ internal EventStream() } /// - /// Subscribe to the specified message type + /// Subscribe to messages /// /// Synchronous message handler /// Optional: the dispatcher, will use by default @@ -88,7 +91,7 @@ public EventStreamSubscription Subscribe(Action action, IDispatcher? dispa } /// - /// Subscribe to the specified message type and yields the result onto a Channel + /// Subscribe to messages and yields the result onto a Channel /// /// a Channel which receives the event /// Optional: the dispatcher, will use by default @@ -107,7 +110,7 @@ public EventStreamSubscription Subscribe(Channel channel, IDispatcher? dis } /// - /// Subscribe to the specified message type with an asynchronous handler + /// Subscribe to messages with an asynchronous handler /// /// Asynchronous message handler /// Optional: the dispatcher, will use by default @@ -120,7 +123,7 @@ public EventStreamSubscription Subscribe(Func action, IDispatcher? d } /// - /// Subscribe to the specified message type, which is a derived type from + /// Subscribe to a message type, which is a derived type from /// /// Synchronous message handler /// Optional: the dispatcher, will use by default @@ -143,7 +146,7 @@ public EventStreamSubscription Subscribe(Action action, IDispatch } /// - /// Subscribe to the specified message type, which is a derived type from + /// Subscribe to a message type, which is a derived type from /// /// Additional filter upon the typed message /// Synchronous message handler @@ -217,7 +220,7 @@ public EventStreamSubscription Subscribe(Func action, IDisp } /// - /// Publish a message to the event stream + /// Publish a message to the event stream /// /// A message to publish public void Publish(T msg) @@ -243,13 +246,13 @@ public void Publish(T msg) } /// - /// Remove a subscription by id + /// Remove a subscription by id /// /// Subscription id public void Unsubscribe(Guid id) => _subscriptions.TryRemove(id, out _); /// - /// Remove a subscription + /// Remove a subscription /// /// A subscription to remove public void Unsubscribe(EventStreamSubscription? subscription) diff --git a/src/Proto.Actor/Extensions/ActorSystemExtensions.cs b/src/Proto.Actor/Extensions/ActorSystemExtensions.cs index 0587dba62c..db542a7e47 100644 --- a/src/Proto.Actor/Extensions/ActorSystemExtensions.cs +++ b/src/Proto.Actor/Extensions/ActorSystemExtensions.cs @@ -8,6 +8,9 @@ namespace Proto.Extensions; +/// +/// Contains extensions for the actor system. Examples: Cluster, PubSub, etc. +/// public class ActorSystemExtensions { private readonly ActorSystem _actorSystem; @@ -16,12 +19,24 @@ public class ActorSystemExtensions public ActorSystemExtensions(ActorSystem actorSystem) => _actorSystem = actorSystem; + /// + /// Gets the extension by the given type. + /// + /// + /// public T? Get() where T : IActorSystemExtension { var id = IActorSystemExtension.Id; return (T) _extensions[id]; } + /// + /// Gets the extension by the given type or throws if not found. + /// + /// Message to put on the exception + /// + /// + /// public T GetRequired(string? notFoundMessage=null) where T : IActorSystemExtension { var id = IActorSystemExtension.Id; @@ -35,6 +50,11 @@ public T GetRequired(string? notFoundMessage=null) where T : IActorSystemExte return res; } + /// + /// Registers a new extension by its type. + /// + /// Extension to register + /// public void Register(IActorSystemExtension extension) where T : IActorSystemExtension { lock (_lockObject) diff --git a/src/Proto.Actor/Extensions/IActorSystemExtension.cs b/src/Proto.Actor/Extensions/IActorSystemExtension.cs index 31c1d09030..b871a97e2d 100644 --- a/src/Proto.Actor/Extensions/IActorSystemExtension.cs +++ b/src/Proto.Actor/Extensions/IActorSystemExtension.cs @@ -7,6 +7,9 @@ namespace Proto.Extensions; +/// +/// Marks a class as an actor system extension +/// public interface IActorSystemExtension { private static int _nextId; @@ -14,6 +17,9 @@ public interface IActorSystemExtension internal static int GetNextId() => Interlocked.Increment(ref _nextId); } +/// +/// Marks a class as an actor system extension +/// public interface IActorSystemExtension : IActorSystemExtension where T : IActorSystemExtension { public static int Id = GetNextId(); diff --git a/src/Proto.Actor/FunctionActor.cs b/src/Proto.Actor/FunctionActor.cs index 3de7cd3d2b..e041aa1542 100644 --- a/src/Proto.Actor/FunctionActor.cs +++ b/src/Proto.Actor/FunctionActor.cs @@ -7,7 +7,9 @@ namespace Proto; -//this is used when creating actors from a Func +/// +/// Used when creating actors from a Func +/// class FunctionActor : IActor { private readonly Receive _receive; diff --git a/src/Proto.Actor/Future/Futures.cs b/src/Proto.Actor/Future/Futures.cs index a7b1ec31ed..eb38bbaba9 100644 --- a/src/Proto.Actor/Future/Futures.cs +++ b/src/Proto.Actor/Future/Futures.cs @@ -12,11 +12,30 @@ namespace Proto.Future; +/// +/// A future allows to asynchronously wait for a value to be available. +/// The value is sent to the future by some other process using the future's . +/// It is used e.g. to provide a request-response abstraction on top of asynchronous messaging. +/// public interface IFuture : IDisposable { + /// + /// Future's PID. + /// public PID Pid { get; } + + /// + /// A task that will be completed when the future is receives the expected value. The expected value is then + /// available in . + /// public Task Task { get; } + /// + /// A task that will be completed when the future is receives the expected value or provided cancellation token is cancelled. + /// The value is available in if the task was completed successfully. + /// + /// + /// public Task GetTask(CancellationToken cancellationToken); } diff --git a/src/Proto.Actor/Logging/InstanceLogger.cs b/src/Proto.Actor/Logging/InstanceLogger.cs index c683234c23..d5263d1089 100644 --- a/src/Proto.Actor/Logging/InstanceLogger.cs +++ b/src/Proto.Actor/Logging/InstanceLogger.cs @@ -11,6 +11,10 @@ namespace Proto.Logging; +/// +/// A logging abstraction that stores the log entries in the (in memory) and/or writes them a . +/// Mostly used for testing and debugging. +/// [PublicAPI] public class InstanceLogger : IActorSystemExtension { @@ -19,10 +23,25 @@ public class InstanceLogger : IActorSystemExtension private readonly LogStore? _logStore; private readonly string _category; + /// + /// Get new InstanceLogger with the calling member name appended to the category hierarchy. + /// + /// Auto filled + /// public InstanceLogger BeginMethodScope([CallerMemberName] string caller = "") => new(_logLevel, _logStore, _logger, $"{_category}/{caller}"); + /// + /// Get new InstanceLogger with the name of appended to the category hierarchy. + /// + /// + /// public InstanceLogger BeginScope() => new(_logLevel, _logStore, _logger, typeof(T).Name); + /// + /// Get new InstanceLogger with the postfix appended to the category hierarchy. + /// + /// Postfix + /// public InstanceLogger BeginScope(string category) => new(_logLevel, _logStore, _logger, $"{_category}/{category}"); public InstanceLogger(LogLevel logLevel, LogStore? logStore = null, ILogger? logger = null, string category = "default") diff --git a/src/Proto.Actor/Logging/Log.cs b/src/Proto.Actor/Logging/Log.cs index a7d712a6b1..7cd427ab32 100644 --- a/src/Proto.Actor/Logging/Log.cs +++ b/src/Proto.Actor/Logging/Log.cs @@ -10,16 +10,37 @@ // ReSharper disable once CheckNamespace namespace Proto; +/// +/// Utility to create a from a globally shared . +/// [PublicAPI] public static class Log { private static ILoggerFactory _loggerFactory = new NullLoggerFactory(); + /// + /// Configure the global logger factory. + /// + /// public static void SetLoggerFactory(ILoggerFactory loggerFactory) => _loggerFactory = loggerFactory; + /// + /// Get the global logger factory. + /// + /// public static ILoggerFactory GetLoggerFactory() => _loggerFactory; + /// + /// Create a logger for the given category name. + /// + /// + /// public static ILogger CreateLogger(string categoryName) => _loggerFactory.CreateLogger(categoryName); + /// + /// Create a logger for the given type. + /// + /// + /// public static ILogger CreateLogger() => _loggerFactory.CreateLogger(); } \ No newline at end of file diff --git a/src/Proto.Actor/Logging/LogStore.cs b/src/Proto.Actor/Logging/LogStore.cs index a1507f7161..18208e5cd5 100644 --- a/src/Proto.Actor/Logging/LogStore.cs +++ b/src/Proto.Actor/Logging/LogStore.cs @@ -12,6 +12,9 @@ namespace Proto.Logging; +/// +/// Stores and queries log entries created by the +/// [PublicAPI] public class LogStore { diff --git a/src/Proto.Actor/Mailbox/Dispatcher.cs b/src/Proto.Actor/Mailbox/Dispatcher.cs index 68956ef5de..eb86dbd691 100644 --- a/src/Proto.Actor/Mailbox/Dispatcher.cs +++ b/src/Proto.Actor/Mailbox/Dispatcher.cs @@ -20,19 +20,39 @@ public interface IMessageInvoker void EscalateFailure(Exception reason, object? message); } +/// +/// An abstraction for running tasks +/// public interface IDispatcher { + /// + /// Used by mailbox to determine how many messages to process before yielding + /// int Throughput { get; } + /// + /// Runs the task + /// + /// void Schedule(Func runner); } public static class Dispatchers { + /// + /// Schedules the task on the + /// public static ThreadPoolDispatcher DefaultDispatcher { get; } = new(); + + /// + /// Runs and awaits the task + /// public static SynchronousDispatcher SynchronousDispatcher { get; } = new(); } +/// +/// Runs and awaits the task +/// public sealed class SynchronousDispatcher : IDispatcher { private const int DefaultThroughput = 300; @@ -44,6 +64,9 @@ public sealed class SynchronousDispatcher : IDispatcher public void Schedule(Func runner) => runner().Wait(); } +/// +/// Schedules the task on the +/// public sealed class ThreadPoolDispatcher : IDispatcher { private const int DefaultThroughput = 300; @@ -76,6 +99,9 @@ public void Schedule(Func runner) => public int Throughput { get; } } +/// +/// Throws instead of running the task +/// class NoopDispatcher : IDispatcher { internal static readonly IDispatcher Instance = new NoopDispatcher(); diff --git a/src/Proto.Actor/Messages/IAutoRespond.cs b/src/Proto.Actor/Messages/IAutoRespond.cs index f95f17449a..1040c00432 100644 --- a/src/Proto.Actor/Messages/IAutoRespond.cs +++ b/src/Proto.Actor/Messages/IAutoRespond.cs @@ -5,7 +5,15 @@ // ----------------------------------------------------------------------- namespace Proto; +/// +/// A message implementing with IAutoRespond will not be passed to actor's Receive method, but instead an automatic response will be returned to the sender. +/// public interface IAutoRespond { + /// + /// Gets the automatic response to return to the sender + /// + /// + /// object GetAutoResponse(IContext context); } \ No newline at end of file diff --git a/src/Proto.Actor/Messages/IMessageBatch.cs b/src/Proto.Actor/Messages/IMessageBatch.cs index 0fe565f10f..a614fdf728 100644 --- a/src/Proto.Actor/Messages/IMessageBatch.cs +++ b/src/Proto.Actor/Messages/IMessageBatch.cs @@ -7,8 +7,16 @@ namespace Proto; +/// +/// Marks a message as a batch. This batch will be unpacked into the recipient's queue. If additionally the batch message implements +/// , the batch message itself will be posted into the recipient's queue immediately after unpacked messages. +/// public interface IMessageBatch { // ReSharper disable once ReturnTypeCanBeEnumerable.Global + /// + /// Unpack the messages from the batch + /// + /// Collection of messages in the batch IReadOnlyCollection GetMessages(); } \ No newline at end of file diff --git a/src/Proto.Actor/Messages/MessageEnvelope.cs b/src/Proto.Actor/Messages/MessageEnvelope.cs index 8b338cff91..a4afa85269 100644 --- a/src/Proto.Actor/Messages/MessageEnvelope.cs +++ b/src/Proto.Actor/Messages/MessageEnvelope.cs @@ -9,9 +9,18 @@ namespace Proto; +/// +/// Adds headers and sender information to a message. +/// [PublicAPI] public record MessageEnvelope { + /// + /// Creates a new message envelope. + /// + /// Message to wrap + /// Sender + /// Headers public MessageEnvelope(object message, PID? sender, MessageHeader? header = null) { Sender = sender; @@ -19,25 +28,67 @@ public MessageEnvelope(object message, PID? sender, MessageHeader? header = null Header = header ?? MessageHeader.Empty; } + /// + /// Message sender + /// public PID? Sender { get; init; } + + /// + /// Wrapped message + /// public object Message { get; init; } + + /// + /// Message headers + /// public MessageHeader Header { get; init; } + /// + /// Creates a new message envelope. + /// + /// Message to wrap + /// public static MessageEnvelope Wrap(object message) => message is MessageEnvelope env ? env : new MessageEnvelope(message, null); + /// + /// Creates a new message envelope. + /// + /// Message to wrap + /// Message headers + /// public static MessageEnvelope Wrap(object message, MessageHeader header) => message is MessageEnvelope env ? env.MergeHeader(header) : new MessageEnvelope(message, null, header); + /// + /// Adds a sender to the message envelope. + /// + /// + /// New envelope public MessageEnvelope WithSender(PID sender) => this with {Sender = sender}; + /// + /// Adds the wrapped message to the message envelope. + /// + /// + /// New envelope public MessageEnvelope WithMessage(object message) => this with {Message = message}; + /// + /// Adds the headers to the message envelope. + /// + /// + /// New envelope public MessageEnvelope WithHeader(MessageHeader header) => this with {Header = header}; + /// + /// Extends the message envelope with additional headers. + /// + /// + /// New envelope public MessageEnvelope MergeHeader(MessageHeader header) { if (header.Count == 0) @@ -48,30 +99,61 @@ public MessageEnvelope MergeHeader(MessageHeader header) return this with {Header = Header.With(header)}; } + /// + /// Adds a header to the message envelope. + /// + /// + /// + /// New envelope public MessageEnvelope WithHeader(string key, string value) { var header = Header.With(key, value); return this with {Header = header}; } + /// + /// Extends the message envelope with additional headers. + /// + /// + /// New envelope public MessageEnvelope WithHeaders(IEnumerable> items) { var header = Header.With(items); return this with {Header = header}; } + /// + /// Unpacks the message envelope to a tuple. If provided message is not a , it is returned as is. + /// + /// or any other message object + /// public static (object message, PID? sender, MessageHeader headers) Unwrap(object message) => message is MessageEnvelope envelope ? (envelope.Message, envelope.Sender, envelope.Header) : (message, null, MessageHeader.Empty); + /// + /// Returns the message headers if provided message is a . + /// + /// or any other message object + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public static MessageHeader UnwrapHeader(object? message) => (message as MessageEnvelope)?.Header ?? MessageHeader.Empty; + /// + /// Returns the wrapped message if provided message is a . + /// + /// or any other message object + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public static object? UnwrapMessage(object? message) => message is MessageEnvelope r ? r.Message : message; + /// + /// Returns the message sender if provided message is a . + /// + /// or any other message object + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public static PID? UnwrapSender(object? message) => (message as MessageEnvelope)?.Sender; } \ No newline at end of file diff --git a/src/Proto.Actor/Messages/MessageHeader.cs b/src/Proto.Actor/Messages/MessageHeader.cs index 06dbe8c414..d17b992b32 100644 --- a/src/Proto.Actor/Messages/MessageHeader.cs +++ b/src/Proto.Actor/Messages/MessageHeader.cs @@ -11,6 +11,9 @@ // ReSharper disable once CheckNamespace namespace Proto; +/// +/// A collection of message headers +/// [PublicAPI] public record MessageHeader : IReadOnlyDictionary { diff --git a/src/Proto.Actor/Messages/Messages.cs b/src/Proto.Actor/Messages/Messages.cs index d13cdc2c36..1095c4109c 100644 --- a/src/Proto.Actor/Messages/Messages.cs +++ b/src/Proto.Actor/Messages/Messages.cs @@ -10,20 +10,30 @@ // ReSharper disable once CheckNamespace namespace Proto; -//marker interface for all built in message types +/// +/// Marker interface for all built in message types +/// public interface InfrastructureMessage { } - -//messages with this marker interface should not be deadletter logged + +/// +/// Marker interface for all built in message types +/// public interface IIgnoreDeadLetterLogging { } +/// +/// Notifies about actor termination, used together with +/// public sealed partial class Terminated : SystemMessage { } +/// +/// Notifies about actor restarting +/// public sealed class Restarting : InfrastructureMessage { public static readonly Restarting Instance = new(); @@ -33,6 +43,9 @@ private Restarting() } } +/// +/// Diagnostic message to determine if an actor is responsive. Mostly used for debugging problems. +/// public sealed partial class Touch : IAutoRespond, InfrastructureMessage { public object GetAutoResponse(IContext context) => new Touched() @@ -41,11 +54,17 @@ public sealed partial class Touch : IAutoRespond, InfrastructureMessage }; } +/// +/// A user-level message that signals the actor to stop. +/// public sealed partial class PoisonPill : IIgnoreDeadLetterLogging, InfrastructureMessage { public static readonly PoisonPill Instance = new(); } +/// +/// Signals failure up the supervision hierarchy. +/// public class Failure : SystemMessage { public Failure(PID who, Exception reason, RestartStatistics crs, object? message) @@ -62,16 +81,25 @@ public Failure(PID who, Exception reason, RestartStatistics crs, object? message public object? Message { get; } } +/// +/// A message to subscribe to actor termination, used togeter with +/// public sealed partial class Watch : SystemMessage { public Watch(PID watcher) => Watcher = watcher; } +/// +/// Unsubscribe from the termination notifications of the specified actor. +/// public sealed partial class Unwatch : SystemMessage { public Unwatch(PID watcher) => Watcher = watcher; } +/// +/// Signals the actor to restart +/// public sealed class Restart : SystemMessage { public Restart(Exception reason) => Reason = reason; @@ -79,11 +107,17 @@ public sealed class Restart : SystemMessage public Exception Reason { get; } } +/// +/// A system-level message that signals the actor to stop. +/// public partial class Stop : SystemMessage, IIgnoreDeadLetterLogging { public static readonly Stop Instance = new(); } +/// +/// A message sent to the actor to indicate that it is about to stop. Handle this message in order to clean up. +/// public sealed class Stopping : SystemMessage { public static readonly Stopping Instance = new(); @@ -93,6 +127,9 @@ private Stopping() } } +/// +/// A message sent to the actor to indicate that it has started. Handle this message to run additional initialization logic. +/// public sealed class Started : SystemMessage { public static readonly Started Instance = new(); @@ -102,6 +139,9 @@ private Started() } } +/// +/// A message sent to the actor to indicate that it has stopped. +/// public sealed class Stopped : SystemMessage { public static readonly Stopped Instance = new(); @@ -111,6 +151,9 @@ private Stopped() } } +/// +/// When receive timeout expires, this message is sent to the actor to notify it. See +/// public class ReceiveTimeout : SystemMessage { public static readonly ReceiveTimeout Instance = new(); @@ -120,10 +163,16 @@ private ReceiveTimeout() } } +/// +/// Messages marked with this interface will not reset the receive timeout timer. See +/// public interface INotInfluenceReceiveTimeout { } +/// +/// Related to reentrancy, this message is sent to the actor after the awaited task is finished and actor can handle the result. See +/// public class Continuation : SystemMessage { public Continuation(Func? fun, object? message, IActor actor) @@ -140,8 +189,12 @@ public Continuation(Func? fun, object? message, IActor actor) public IActor Actor { get; } } +/// +/// Request diagnostic information for the actor +/// +/// public record ProcessDiagnosticsRequest(TaskCompletionSource Result) : SystemMessage; - + public static class Nothing { public static readonly Google.Protobuf.WellKnownTypes.Empty Instance = new(); diff --git a/src/Proto.Actor/PID.cs b/src/Proto.Actor/PID.cs index ef953750c6..b259617598 100644 --- a/src/Proto.Actor/PID.cs +++ b/src/Proto.Actor/PID.cs @@ -8,11 +8,19 @@ namespace Proto; +/// +/// PID is a reference to an actor (or any other process). It consists of actor system address and an identifier. +/// // ReSharper disable once InconsistentNaming public partial class PID : ICustomDiagnosticMessage { private Process? _process; + /// + /// Creates a new PID instance from address and identifier. + /// + /// Actor system address + /// Actor identifier public PID(string address, string id) { Address = address; @@ -23,6 +31,11 @@ public PID(string address, string id) public string ToDiagnosticString() => $"{Address}/{Id}"; + /// + /// Creates a new PID instance from address and identifier. + /// + /// Actor system address + /// Actor identifier public static PID FromAddress(string address, string id) => new(address, id); internal Process? Ref(ActorSystem system) @@ -54,12 +67,21 @@ public void SendSystemMessage(ActorSystem system, SystemMessage sys) reff.SendSystemMessage(this, sys); } + /// + /// Stops the referenced actor. + /// + /// Actor system this PID belongs to public void Stop(ActorSystem system) { var reff = _process ?? system.ProcessRegistry.Get(this); reff.Stop(this); } + /// + /// Used internally to track requests in context of shared futures and future batches. + /// + /// + /// public PID WithRequestId(uint requestId) => new() { Id = Id, diff --git a/src/Proto.Actor/Process/ProcessRegistry.cs b/src/Proto.Actor/Process/ProcessRegistry.cs index 8a2fd5634b..f04c70c8c2 100644 --- a/src/Proto.Actor/Process/ProcessRegistry.cs +++ b/src/Proto.Actor/Process/ProcessRegistry.cs @@ -11,6 +11,9 @@ // ReSharper disable once CheckNamespace namespace Proto; +/// +/// Manages all processes in the actor system (actors, futures, event stream, etc.). +/// public class ProcessRegistry { private readonly List> _hostResolvers = new(); diff --git a/src/Proto.Actor/Props/Props.cs b/src/Proto.Actor/Props/Props.cs index 9ab33ac91d..3bc6f1172f 100644 --- a/src/Proto.Actor/Props/Props.cs +++ b/src/Proto.Actor/Props/Props.cs @@ -12,32 +12,68 @@ namespace Proto; +/// +/// Use Props to specify how a new actor should be created. +/// [PublicAPI] public sealed record Props { private static IActor NullProducer(ActorSystem _, IContext __) => null!; + public static readonly Props Empty = new(); + /// + /// Delegate used to create the actor. + /// public ProducerWithSystemAndContext Producer { get; init; } = NullProducer; + + /// + /// Deletegate used to create the mailbox. + /// public MailboxProducer MailboxProducer { get; init; } = () => UnboundedMailbox.Create(); + + /// + /// Used when actor is spawned at the root of the system. A guardian process will be created to handle failures of this actor, + /// according to the supervision strategy specified here. + /// public ISupervisorStrategy? GuardianStrategy { get; init; } + + /// + /// Supervision strategy for handling failures in actor's children. + /// public ISupervisorStrategy SupervisorStrategy { get; init; } = Supervision.DefaultStrategy; + + /// + /// Dispatcher to be used by the actor's mailbox. + /// public IDispatcher Dispatcher { get; init; } = Dispatchers.DefaultDispatcher; + /// + /// Middleware used when receiving a message + /// public ImmutableList> ReceiverMiddleware { get; init; } = ImmutableList>.Empty; + /// + /// Middleware used when sending a message + /// public ImmutableList> SenderMiddleware { get; init; } = ImmutableList>.Empty; public Receiver? ReceiverMiddlewareChain { get; init; } public Sender? SenderMiddlewareChain { get; init; } + /// + /// List of decorators for the actor context + /// public ImmutableList> ContextDecorator { get; init; } = ImmutableList>.Empty; public Func? ContextDecoratorChain { get; init; } + /// + /// Delegate that creates the actor and wires it with context and mailbox. + /// public Spawner Spawner { get; init; } = DefaultSpawner; private static IContext DefaultContextDecorator(IContext context) => context; @@ -132,9 +168,39 @@ public Props WithSpawner(Spawner spawner) => internal PID Spawn(ActorSystem system, string name, PID? parent, Action? callback=null) => Spawner(system, name, this, parent, callback); + /// + /// Props that spawn actors by calling the provided producer delegate. + /// + /// Returns a new instance of the actor + /// + /// + /// + /// var props = Props.FromProducer(() => new MyActor()); + /// + /// public static Props FromProducer(Producer producer) => Empty.WithProducer(_ => producer()); + /// + /// Props that spawn actors by calling the provided producer delegate. + /// + /// Returns a new instance of the actor. Gets as a parameter + /// public static Props FromProducer(ProducerWithSystem producer) => Empty.WithProducer(producer); + /// + /// Props that spawn actors based on provided delegate implementation. Useful when you don't want to create an actor class. + /// The Receive delegate will be wrapped in a instance. + /// + /// + /// + /// + /// + /// var props = Props.FromFunc(ctx => { + /// if (ctx.Message is Hello msg) + /// Console.WriteLine($"Hello {msg.Name}"); + /// return Task.CompletedTask; + /// }); + /// + /// public static Props FromFunc(Receive receive) => FromProducer(() => new FunctionActor(receive)); } \ No newline at end of file diff --git a/src/Proto.Actor/Router/IHashable.cs b/src/Proto.Actor/Router/IHashable.cs index 0d9fe5070e..247a65006e 100644 --- a/src/Proto.Actor/Router/IHashable.cs +++ b/src/Proto.Actor/Router/IHashable.cs @@ -5,7 +5,14 @@ // ----------------------------------------------------------------------- namespace Proto.Router; +/// +/// Adds a capability to a message to return a hash key. Used in conjunction with routers. +/// public interface IHashable { + /// + /// Return the hash key for this message. + /// + /// string HashBy(); } \ No newline at end of file diff --git a/src/Proto.Actor/Router/RouterExtensions.cs b/src/Proto.Actor/Router/RouterExtensions.cs index 17db35d9d0..68ed776d20 100644 --- a/src/Proto.Actor/Router/RouterExtensions.cs +++ b/src/Proto.Actor/Router/RouterExtensions.cs @@ -10,12 +10,35 @@ namespace Proto.Router; public static class RouterExtensions { + /// + /// Creates props for a router, that broadcasts messages to all of its routees. + /// + /// Context to send the messages through + /// List of routee + /// public static Props NewBroadcastGroup(this ISenderContext senderContext, params PID[] routees) => new BroadcastGroupRouterConfig(senderContext, routees).Props(); + /// + /// Creates props for a router, that routes the messages to the routees by calculating the hash of the message key and finding a routee on a hash ring. + /// The message has to implement . Uses as hash function. + /// + /// Context to send the messages through + /// List of routee + /// public static Props NewConsistentHashGroup(this ISenderContext senderContext, params PID[] routees) => new ConsistentHashGroupRouterConfig(senderContext, MurmurHash2.Hash, 100, null, routees).Props(); + /// + /// Creates props for a router, that routes the messages to the routees by calculating the hash of the message key and finding a routee on a hash ring. + /// If the message is , then the key extracted with takes precedence. Otherwise, the hash key + /// is extracted from the message with a provided delegate. + /// Uses as hash function. + /// + /// Context to send the messages through + /// Gets the message and returns a hash key for it. + /// List of routee List of routee + /// public static Props NewConsistentHashGroup( this ISenderContext senderContext, Func messageHasher, @@ -24,6 +47,15 @@ params PID[] routees => new ConsistentHashGroupRouterConfig(senderContext, MurmurHash2.Hash, 100, messageHasher, routees) .Props(); + /// + /// Creates props for a router, that routes the messages to the routees by calculating the hash of the message key and finding a routee on a hash ring. + /// The message has to implement . + /// + /// Context to send the messages through + /// Hashing function + /// Number of virtual copies of the routee PID on the hash ring + /// List of routee + /// public static Props NewConsistentHashGroup( this ISenderContext senderContext, Func hash, @@ -32,6 +64,17 @@ params PID[] routees ) => new ConsistentHashGroupRouterConfig(senderContext, hash, replicaCount, null, routees).Props(); + /// + /// Creates props for a router, that routes the messages to the routees by calculating the hash of the message key and finding a routee on a hash ring. + /// If the message is , then the key extracted with takes precedence. Otherwise, the hash key + /// is extracted from the message with a provided delegate. + /// + /// Context to send the messages through + /// Hashing function + /// Number of virtual copies of the routee on the hash ring + /// Gets the message and returns a hash key for it. + /// List of routee + /// public static Props NewConsistentHashGroup( this ISenderContext senderContext, Func hash, @@ -41,18 +84,56 @@ params PID[] routees ) => new ConsistentHashGroupRouterConfig(senderContext, hash, replicaCount, messageHasher, routees).Props(); + /// + /// Creates props for a router, that routes messages to a random routee. + /// + /// Context to send the messages through + /// List of routee + /// public static Props NewRandomGroup(this ISenderContext senderContext, params PID[] routees) => new RandomGroupRouterConfig(senderContext, routees).Props(); + /// + /// Creates props for a router, that routes messages to a random routee. + /// + /// Context to send the messages through + /// Random seed + /// List of routee + /// public static Props NewRandomGroup(this ISenderContext senderContext, int seed, params PID[] routees) => new RandomGroupRouterConfig(senderContext, seed, routees).Props(); + /// + /// Creates props for a router, that routes messages to its routees in a round robin fashion. + /// + /// Context to send the messages through + /// List of routee + /// public static Props NewRoundRobinGroup(this ISenderContext senderContext, params PID[] routees) => new RoundRobinGroupRouterConfig(senderContext, routees).Props(); + /// + /// Creates props for a router that broadcasts the message to all the actors in the pool it maintains. + /// + /// Context to send the messages through + /// Props to spawn actors - members of the pool + /// Size of the pool + /// public static Props NewBroadcastPool(this ISenderContext senderContext, Props props, int poolSize) => new BroadcastPoolRouterConfig(senderContext, poolSize, props).Props(); + /// + /// Creates props for a router, that routes the messages to the pool of actors it maintains by calculating the hash of the message key and finding a pool member on a hash ring. + /// If the message is , then the key extracted with takes precedence. Otherwise, the hash key + /// is extracted from the message with a provided delegate. + /// + /// Context to send the messages through + /// Props to spawn actors - members of the pool + /// Size of the pool + /// Hashing function + /// Number of virtual copies of the pool member on the hash ring + /// Gets the message and returns a hash key for it. + /// public static Props NewConsistentHashPool( this ISenderContext senderContext, Props props, @@ -66,6 +147,14 @@ public static Props NewConsistentHashPool( ) .Props(); + /// + /// Creates props for a router, that routes messages to a random member of the pool it maintains. + /// + /// Context to send the messages through + /// Props to spawn actors - members of the pool + /// Size of the pool + /// Random seed + /// public static Props NewRandomPool( this ISenderContext senderContext, Props props, @@ -74,6 +163,13 @@ public static Props NewRandomPool( ) => new RandomPoolRouterConfig(senderContext, poolSize, props, seed).Props(); + /// + /// Creates props for a router, that routes messages to member of the pool it maintains in a round robin fashion. + /// + /// Context to send the messages through + /// Props to spawn actors - members of the pool + /// Size of the pool + /// public static Props NewRoundRobinPool(this ISenderContext senderContext, Props props, int poolSize) => new RoundRobinPoolRouterConfig(senderContext, poolSize, props).Props(); } \ No newline at end of file diff --git a/src/Proto.Actor/Stashing/CapturedContext.cs b/src/Proto.Actor/Stashing/CapturedContext.cs index a832668a07..35907997dd 100644 --- a/src/Proto.Actor/Stashing/CapturedContext.cs +++ b/src/Proto.Actor/Stashing/CapturedContext.cs @@ -7,6 +7,11 @@ namespace Proto; +/// +/// Holds a reference to actor context and a message +/// +/// Message to store +/// Context to store public record CapturedContext(MessageEnvelope MessageEnvelope, IContext Context){ public async Task Receive() { @@ -15,5 +20,8 @@ public async Task Receive() current.Apply(); } + /// + /// Restores the stored message to the actor context so that it can be re-processed by the actor + /// public void Apply() => Context.Apply(this); } \ No newline at end of file diff --git a/src/Proto.Actor/Supervision/AllForOneStrategy.cs b/src/Proto.Actor/Supervision/AllForOneStrategy.cs index 5b187ded73..78e2816b72 100644 --- a/src/Proto.Actor/Supervision/AllForOneStrategy.cs +++ b/src/Proto.Actor/Supervision/AllForOneStrategy.cs @@ -11,8 +11,8 @@ namespace Proto; /// -/// AllForOneStrategy returns a new SupervisorStrategy which applies the given fault Directive from the decider to the -/// failing child and all its children. +/// Supervision strategy that applies the supervision directive to all the children. +/// See One-For-One strategy vs All-For-One strategy /// This strategy is appropriate when the children have a strong dependency, such that and any single one failing would /// place them all into a potentially invalid state. /// @@ -23,6 +23,12 @@ public class AllForOneStrategy : ISupervisorStrategy private readonly int _maxNrOfRetries; private readonly TimeSpan? _withinTimeSpan; + /// + /// Creates a new instance of the + /// + /// A delegate that provided with failing child and the exception returns a + /// Number of restart retries before stopping the the children of the supervisor + /// A time window to count in public AllForOneStrategy(Decider decider, int maxNrOfRetries, TimeSpan? withinTimeSpan) { _decider = decider; diff --git a/src/Proto.Actor/Supervision/AlwaysRestartStrategy.cs b/src/Proto.Actor/Supervision/AlwaysRestartStrategy.cs index 578e433e2c..5ca4cffdc9 100644 --- a/src/Proto.Actor/Supervision/AlwaysRestartStrategy.cs +++ b/src/Proto.Actor/Supervision/AlwaysRestartStrategy.cs @@ -8,6 +8,9 @@ // ReSharper disable once CheckNamespace namespace Proto; +/// +/// A supervision strategy that always restarts all of the children of the actor. +/// public class AlwaysRestartStrategy : ISupervisorStrategy { //always restart diff --git a/src/Proto.Actor/Supervision/ExponentialBackoffStrategy.cs b/src/Proto.Actor/Supervision/ExponentialBackoffStrategy.cs index 67c18e58af..0419b63a3d 100644 --- a/src/Proto.Actor/Supervision/ExponentialBackoffStrategy.cs +++ b/src/Proto.Actor/Supervision/ExponentialBackoffStrategy.cs @@ -9,12 +9,20 @@ // ReSharper disable once CheckNamespace namespace Proto; +/// +/// A supervision strategy that will try to restart the failing child while backing off exponentially. +/// public class ExponentialBackoffStrategy : ISupervisorStrategy { private readonly TimeSpan _backoffWindow; private readonly TimeSpan _initialBackoff; private readonly Random _random = new(); + /// + /// Creates a new instance of . + /// + /// Maximum time for the retries + /// Initial delay that will be multiplied by retry count on subsequent retries public ExponentialBackoffStrategy(TimeSpan backoffWindow, TimeSpan initialBackoff) { _backoffWindow = backoffWindow; diff --git a/src/Proto.Actor/Supervision/ISupervisorStrategy.cs b/src/Proto.Actor/Supervision/ISupervisorStrategy.cs index d93e37bf80..cb7d7c224d 100644 --- a/src/Proto.Actor/Supervision/ISupervisorStrategy.cs +++ b/src/Proto.Actor/Supervision/ISupervisorStrategy.cs @@ -8,7 +8,18 @@ // ReSharper disable once CheckNamespace namespace Proto; +/// +/// Handles failures of children actors. +/// public interface ISupervisorStrategy { + /// + /// Handle the failure + /// + /// Supervisor of the children + /// The failing child's + /// Restart statistics + /// Exception thrown by the child + /// Message being processed at the time of the failure void HandleFailure(ISupervisor supervisor, PID child, RestartStatistics rs, Exception cause, object? message); } \ No newline at end of file diff --git a/src/Proto.Actor/Supervision/OneForOneStrategy.cs b/src/Proto.Actor/Supervision/OneForOneStrategy.cs index 979cfddad0..13c4d3fa2b 100644 --- a/src/Proto.Actor/Supervision/OneForOneStrategy.cs +++ b/src/Proto.Actor/Supervision/OneForOneStrategy.cs @@ -9,6 +9,11 @@ // ReSharper disable once CheckNamespace namespace Proto; +/// +/// Supervision strategy that applies the supervision directive only to the failing child. +/// See One-For-One strategy vs All-For-One strategy +/// This strategy is appropriate when the failing child can be restarted independently from other children of the supervisor. +/// public class OneForOneStrategy : ISupervisorStrategy { private static readonly ILogger Logger = Log.CreateLogger(); @@ -17,6 +22,12 @@ public class OneForOneStrategy : ISupervisorStrategy private readonly int _maxNrOfRetries; private readonly TimeSpan? _withinTimeSpan; + /// + /// Creates a new instance of the class. + /// + /// A delegate that provided with failing child and the exception returns a + /// Number of restart retries before stopping the failing child of the supervisor + /// A time window to count in public OneForOneStrategy(Decider decider, int maxNrOfRetries, TimeSpan? withinTimeSpan) { _decider = decider; diff --git a/src/Proto.Actor/Supervision/Supervision.cs b/src/Proto.Actor/Supervision/Supervision.cs index a21702f780..fc14315235 100644 --- a/src/Proto.Actor/Supervision/Supervision.cs +++ b/src/Proto.Actor/Supervision/Supervision.cs @@ -10,10 +10,21 @@ namespace Proto; public static class Supervision { + /// + /// Default supervision strategy is + /// public static ISupervisorStrategy DefaultStrategy { get; } = new OneForOneStrategy((who, reason) => SupervisorDirective.Restart, 10, TimeSpan.FromSeconds(10)); + /// + /// Restarts the actor regardless of the failure reason + /// public static ISupervisorStrategy AlwaysRestartStrategy { get; } = new AlwaysRestartStrategy(); } +/// +/// Decides how to handle the failure +/// +/// of the actor that failed +/// Exception thrown by the failing actor public delegate SupervisorDirective Decider(PID pid, Exception reason); \ No newline at end of file diff --git a/src/Proto.Actor/Supervision/SupervisorDirective.cs b/src/Proto.Actor/Supervision/SupervisorDirective.cs index 19d98af0ba..2779e60157 100644 --- a/src/Proto.Actor/Supervision/SupervisorDirective.cs +++ b/src/Proto.Actor/Supervision/SupervisorDirective.cs @@ -7,10 +7,28 @@ // ReSharper disable once CheckNamespace namespace Proto; +/// +/// Tells the supervisor what to do in case of failure. +/// public enum SupervisorDirective { + /// + /// Subject continues processing after failure, starting with the next message in the mailbox. + /// Resume, + + /// + /// Restarts the subject + /// Restart, + + /// + /// Permanently stops the subject + /// Stop, + + /// + /// Escalates the failure to the parent supervisor + /// Escalate } \ No newline at end of file diff --git a/src/Proto.Actor/Timers/Scheduler.cs b/src/Proto.Actor/Timers/Scheduler.cs index 9ab6149c5a..4602f06e16 100644 --- a/src/Proto.Actor/Timers/Scheduler.cs +++ b/src/Proto.Actor/Timers/Scheduler.cs @@ -5,13 +5,27 @@ namespace Proto.Timers; +/// +/// Scheduler can be used to schedule a message to be sent in the future. It is useful e.g., when actor needs to do some work after a certain time. +/// [PublicAPI] public class Scheduler { private readonly ISenderContext _context; + /// + /// Creates a new scheduler. + /// + /// Context to send the scheduled message through public Scheduler(ISenderContext context) => _context = context; + /// + /// Schedules a single message to be sent in the future. + /// + /// Delay before sending the message + /// of the recipient. + /// Message to be sent + /// that can be used to cancel the scheduled message public CancellationTokenSource SendOnce(TimeSpan delay, PID target, object message) { var cts = new CancellationTokenSource(); @@ -26,9 +40,24 @@ public CancellationTokenSource SendOnce(TimeSpan delay, PID target, object messa return cts; } + /// + /// Schedules message sending on a periodic basis. + /// + /// Interval between sends, and also the initial delay + /// of the recipient. + /// Message to be sent + /// that can be used to cancel the scheduled messages public CancellationTokenSource SendRepeatedly(TimeSpan interval, PID target, object message) => SendRepeatedly(interval, interval, target, message); + /// + /// Schedules message sending on a periodic basis. + /// + /// Initial delay + /// Interval between sends + /// of the recipient. + /// Message to be sent + /// that can be used to cancel the scheduled messages public CancellationTokenSource SendRepeatedly(TimeSpan delay, TimeSpan interval, PID target, object message) { var cts = new CancellationTokenSource(); @@ -56,6 +85,14 @@ async Task Trigger() return cts; } + /// + /// Schedules a request on a periodic basis. The response will arrive to the actor context for which the was created. + /// + /// Initial delay + /// Interval between requests + /// of the recipient. + /// Message to be sent + /// that can be used to cancel the scheduled messages public CancellationTokenSource RequestRepeatedly(TimeSpan delay, TimeSpan interval, PID target, object message) { var cts = new CancellationTokenSource(); diff --git a/src/Proto.Actor/Timers/TimerExtensions.cs b/src/Proto.Actor/Timers/TimerExtensions.cs index 0ada947623..82683a778f 100644 --- a/src/Proto.Actor/Timers/TimerExtensions.cs +++ b/src/Proto.Actor/Timers/TimerExtensions.cs @@ -2,5 +2,10 @@ namespace Proto.Timers; public static class TimerExtensions { + /// + /// Gets a new scheduler that allows to schedule messages in the future + /// + /// + /// public static Scheduler Scheduler(this ISenderContext context) => new(context); } \ No newline at end of file diff --git a/src/Proto.Actor/Utils/AsyncSemaphore.cs b/src/Proto.Actor/Utils/AsyncSemaphore.cs index bf93ccebf7..f16d420dcd 100644 --- a/src/Proto.Actor/Utils/AsyncSemaphore.cs +++ b/src/Proto.Actor/Utils/AsyncSemaphore.cs @@ -9,15 +9,28 @@ namespace Proto.Utils; +/// +/// AsyncSemaphore allows to limit the number of concurrent tasks to a maximum number. +/// public class AsyncSemaphore { private readonly SemaphoreSlim _semaphore; + /// + /// Creates a new instance of . + /// + /// Maximum number of concurrent tasks public AsyncSemaphore(int maxConcurrency) => _semaphore = new SemaphoreSlim( maxConcurrency, maxConcurrency ); + /// + /// Starts and awaits a task when a slot within the maximum number of concurrent tasks is available. + /// + /// Delegate to start the task + /// + /// public async Task WaitAsync(Func> producer) { await _semaphore.WaitAsync(); @@ -34,6 +47,10 @@ public async Task WaitAsync(Func> producer) } } + /// + /// Starts and awaits a task when a slot within the maximum number of concurrent tasks is available. + /// + /// Delegate to start the task public async Task WaitAsync(Func producer) { await _semaphore.WaitAsync(); @@ -49,6 +66,11 @@ public async Task WaitAsync(Func producer) } } + /// + /// Starts a task when a slot within the maximum number of concurrent tasks is available. The caller will be blocked until the slot is available, + /// however then the task is run asynchronously and the method returns. + /// + /// Delegate to start the task public void Wait(Func producer) { //block caller diff --git a/src/Proto.Actor/Utils/ConcurrentKeyValueStore.cs b/src/Proto.Actor/Utils/ConcurrentKeyValueStore.cs index 47c881a838..c0643538bd 100644 --- a/src/Proto.Actor/Utils/ConcurrentKeyValueStore.cs +++ b/src/Proto.Actor/Utils/ConcurrentKeyValueStore.cs @@ -10,11 +10,19 @@ namespace Proto.Utils; +/// +/// A base class for a key value store, that limits the number of concurrent operations +/// +/// [PublicAPI] public abstract class ConcurrentKeyValueStore : IKeyValueStore { private readonly AsyncSemaphore _semaphore; + /// + /// Creates a new instance of + /// + /// that defines the concurrency limits protected ConcurrentKeyValueStore(AsyncSemaphore semaphore) => _semaphore = semaphore; public Task GetAsync(string id, CancellationToken ct) => _semaphore.WaitAsync(() => InnerGetStateAsync(id, ct)); diff --git a/src/Proto.Actor/Utils/ConcurrentSet.cs b/src/Proto.Actor/Utils/ConcurrentSet.cs index 66a273ff40..710eeac1cd 100644 --- a/src/Proto.Actor/Utils/ConcurrentSet.cs +++ b/src/Proto.Actor/Utils/ConcurrentSet.cs @@ -9,6 +9,10 @@ namespace Proto.Utils; +/// +/// A collection with set semantics built on top of . +/// +/// [PublicAPI] public class ConcurrentSet { diff --git a/src/Proto.Actor/Utils/EmptyKeyValueStore.cs b/src/Proto.Actor/Utils/EmptyKeyValueStore.cs index 7121f05c3e..4d77311ab8 100644 --- a/src/Proto.Actor/Utils/EmptyKeyValueStore.cs +++ b/src/Proto.Actor/Utils/EmptyKeyValueStore.cs @@ -8,6 +8,10 @@ namespace Proto.Utils; +/// +/// Noop key value store. +/// +/// public class EmptyKeyValueStore : IKeyValueStore { public Task GetAsync(string id, CancellationToken ct) => Task.FromResult(default(T)); diff --git a/src/Proto.Actor/Utils/IKeyValueStore.cs b/src/Proto.Actor/Utils/IKeyValueStore.cs index ca20716dc3..2e6145771e 100644 --- a/src/Proto.Actor/Utils/IKeyValueStore.cs +++ b/src/Proto.Actor/Utils/IKeyValueStore.cs @@ -9,13 +9,36 @@ namespace Proto.Utils; +/// +/// A key value store abstraction. +/// +/// [PublicAPI] public interface IKeyValueStore { + /// + /// Get the value for the given key. + /// + /// Key + /// + /// Task GetAsync(string id, CancellationToken ct); + /// + /// Set the value for the given key. + /// + /// Key + /// Value + /// + /// Task SetAsync(string id, T state, CancellationToken ct); + /// + /// Clear the value for the given key. + /// + /// Key + /// + /// Task ClearAsync(string id, CancellationToken ct); } \ No newline at end of file diff --git a/src/Proto.Actor/Utils/TaskClock.cs b/src/Proto.Actor/Utils/TaskClock.cs index 7e8001a7c6..23c444cfbb 100644 --- a/src/Proto.Actor/Utils/TaskClock.cs +++ b/src/Proto.Actor/Utils/TaskClock.cs @@ -9,6 +9,9 @@ namespace Proto.Utils; +/// +/// Provides a clock that "ticks" at a certain interval and the next tick can be awaited. +/// public class TaskClock { private readonly TimeSpan _bucketSize; @@ -16,10 +19,20 @@ public class TaskClock private readonly CancellationToken _ct; private Task _currentBucket = Task.CompletedTask; + /// + /// This task will complete when the clock "ticks" and will be replaced with a new task that will complete on the next "tick" + /// public Task CurrentBucket { get => Volatile.Read(ref _currentBucket); private set => Volatile.Write(ref _currentBucket, value); } + + /// + /// Creates a new TaskClock + /// + /// Initial delay + /// Tick interval + /// Used to stop the clock public TaskClock(TimeSpan timeout, TimeSpan updateInterval, CancellationToken ct) { _bucketSize = timeout + updateInterval; @@ -27,6 +40,9 @@ public TaskClock(TimeSpan timeout, TimeSpan updateInterval, CancellationToken ct _ct = ct; } + /// + /// Starts the clock + /// public void Start() { CurrentBucket = Task.Delay(_bucketSize, _ct); diff --git a/src/Proto.Actor/Utils/TaskExtensions.cs b/src/Proto.Actor/Utils/TaskExtensions.cs index 3c5b19feac..bbd4a7f3a8 100644 --- a/src/Proto.Actor/Utils/TaskExtensions.cs +++ b/src/Proto.Actor/Utils/TaskExtensions.cs @@ -11,6 +11,15 @@ namespace Proto.Utils; public static class TaskExtensions { + /// + /// Adds a timeout to a task. If the task times out (or provided token is cancelled), is thrown. + /// + /// Task to be awaited + /// Timeout + /// + /// + /// + /// public static async Task WithTimeout(this Task task, TimeSpan timeout, CancellationToken? ct = null) { if (task.IsCompleted) return await task.ConfigureAwait(false); // Very important in order to propagate exceptions @@ -31,7 +40,7 @@ public static async Task WithTimeout(this Task task, } /// - /// Waits up to given timeout, returns true if task completed, false if it timed out + /// Waits up to given timeout (or provided token is cancelled), returns true if task completed, false otherwise /// public static async Task WaitUpTo(this Task task, TimeSpan timeout, CancellationToken? ct = null) { @@ -51,7 +60,7 @@ public static async Task WaitUpTo(this Task task, TimeSpan timeout, Cancel } /// - /// Waits up to given timeout, returns (true,value) if task completed, (false, default) if it timed out + /// Waits up to given timeout (or provided token is cancelled), returns (true,value) if task completed, (false, default) otherwise /// public static async Task<(bool completed, TResult result)> WaitUpTo( this Task task, diff --git a/src/Proto.Actor/Utils/TaskFactory.cs b/src/Proto.Actor/Utils/TaskFactory.cs index 7623a6c713..7a5416befa 100644 --- a/src/Proto.Actor/Utils/TaskFactory.cs +++ b/src/Proto.Actor/Utils/TaskFactory.cs @@ -15,6 +15,13 @@ public static class SafeTask { private static readonly ILogger Logger = Log.CreateLogger(); + /// + /// Runs a task and handles exceptions. If is thrown, it is ignored. + /// If any other exception is thrown, it is logged. + /// + /// + /// + /// public static async Task Run(Func body, CancellationToken cancellationToken = default, [CallerMemberName] string name = "") { Task? t = null; diff --git a/src/Proto.Actor/Utils/Throttle.cs b/src/Proto.Actor/Utils/Throttle.cs index c693f1032e..17e8f85a6f 100644 --- a/src/Proto.Actor/Utils/Throttle.cs +++ b/src/Proto.Actor/Utils/Throttle.cs @@ -9,25 +9,44 @@ namespace Proto.Utils; +/// +/// Records an event when called, and returns current state of the throttle valve +/// public delegate Throttle.Valve ShouldThrottle(); +/// +/// Used for throttling events in a given time window. +/// public static class Throttle { public enum Valve { + /// + /// Business as usual, continue processing events + /// Open, + + /// + /// Next event will close the valve + /// Closing, + + /// + /// Limit exceeded, stop processing events for now + /// Closed } /// - /// This has no guarantees that the throttle opens exactly after the period, since it is reset asynchronously - /// Throughput has been prioritized over exact re-opening + /// Creates a new throttle with the given window and rate. After first event is recorded, a timer starts to reset the number of events back to 0. + /// If the number of events in the meantime exceeds the limit, the valve will be closed. + /// This has no guarantees that the throttle opens exactly after the period, since it is reset asynchronously + /// Throughput has been prioritized over exact re-opening /// - /// - /// - /// This will be called with the number of events what was throttled after the period - /// + /// Event limit + /// Time window to verify event limit + /// This will be called with the number of events that was throttled after the period + /// delegate that records an event when called, and returns current state of the throttle valve public static ShouldThrottle Create( int maxEventsInPeriod, TimeSpan period, @@ -65,6 +84,4 @@ public static ShouldThrottle Create( public static bool IsOpen(this Valve valve) => valve != Valve.Closed; } -public record ThrottleOptions(int MaxEventsInPeriod, TimeSpan Period) -{ -} \ No newline at end of file +public record ThrottleOptions(int MaxEventsInPeriod, TimeSpan Period); \ No newline at end of file