Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 53 additions & 119 deletions tests/Proto.Actor.Tests/ReceiveTimeoutTests.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
using System;
using System.Runtime.CompilerServices;
using System;
using System.Threading;
using System.Threading.Tasks;
using Proto.TestKit;
using Proto.Timers;
using Xunit;

namespace Proto.Tests;
Expand All @@ -11,25 +12,20 @@ public class ReceiveTimeoutTests
[Fact]
public async Task receive_timeout_received_within_expected_time()
{
var system = new ActorSystem();
await using var _ = system;
await using var system = new ActorSystem();
var context = system.Root;

var timeoutReceived = false;
var receiveTimeoutWaiter = GetExpiringTaskCompletionSource();
var probe = new TestProbe();
context.Spawn(Props.FromProducer(() => probe));

var props = Props.FromFunc(ctx =>
{
switch (ctx.Message)
{
case Started _:
case Started:
ctx.SetReceiveTimeout(TimeSpan.FromMilliseconds(150));

break;
case ReceiveTimeout _:
timeoutReceived = true;
receiveTimeoutWaiter.SetResult(0);

case ReceiveTimeout msg:
ctx.Send(probe, msg);
break;
}

Expand All @@ -39,32 +35,26 @@ public async Task receive_timeout_received_within_expected_time()

context.Spawn(props);

await GetSafeAwaitableTask(receiveTimeoutWaiter);
Assert.True(timeoutReceived);
await probe.GetNextMessageAsync<ReceiveTimeout>();
}

[Fact]
public async Task receive_timeout_received_within_expected_time_when_sending_ignored_messages()
{

await using var system = new ActorSystem();
var context = system.Root;

var timeoutReceived = false;
var receiveTimeoutWaiter = GetExpiringTaskCompletionSource(1000);
var probe = new TestProbe();
context.Spawn(Props.FromProducer(() => probe));

var props = Props.FromFunc(ctx =>
{
switch (ctx.Message)
{
case Started _:
case Started:
ctx.SetReceiveTimeout(TimeSpan.FromMilliseconds(150));

break;
case ReceiveTimeout _:
timeoutReceived = true;
receiveTimeoutWaiter.SetResult(0);

case ReceiveTimeout msg:
ctx.Send(probe, msg);
break;
}

Expand All @@ -73,45 +63,33 @@ public async Task receive_timeout_received_within_expected_time_when_sending_ign
);

var pid = context.Spawn(props);
var scheduler = context.Scheduler();
var cts = scheduler.SendRepeatedly(TimeSpan.Zero, TimeSpan.FromMilliseconds(100), pid, new IgnoreMe());

_ = Task.Run(async () =>
{
while (!receiveTimeoutWaiter.Task.IsCompleted)
{
context.Send(pid, new IgnoreMe());
await Task.Delay(100);
}
}
);

await GetSafeAwaitableTask(receiveTimeoutWaiter);
Assert.True(timeoutReceived);
await probe.GetNextMessageAsync<ReceiveTimeout>();
cts.Cancel();
}

[Fact]
public async Task receive_timeout_is_reset_by_influencing_messages()
{
await using var system = new ActorSystem();
var context = system.Root;

var timeoutReceived = false;
var receiveTimeoutWaiter = GetExpiringTaskCompletionSource(2000);
var probe = new TestProbe();
context.Spawn(Props.FromProducer(() => probe));

var props = Props.FromFunc(ctx =>
{
switch (ctx.Message)
{
case Started _:
case Started:
ctx.SetReceiveTimeout(TimeSpan.FromMilliseconds(200));

break;
case string _:
case string:
// regular messages reset the receive timeout
break;
case ReceiveTimeout _:
timeoutReceived = true;
receiveTimeoutWaiter.TrySetResult(0);

case ReceiveTimeout msg:
ctx.Send(probe, msg);
break;
}

Expand All @@ -120,49 +98,32 @@ public async Task receive_timeout_is_reset_by_influencing_messages()
);

var pid = context.Spawn(props);
var scheduler = context.Scheduler();
var cts = scheduler.SendRepeatedly(TimeSpan.Zero, TimeSpan.FromMilliseconds(50), pid, "tick");

using var cts = new CancellationTokenSource();

_ = Task.Run(async () =>
{
while (!cts.IsCancellationRequested && !receiveTimeoutWaiter.Task.IsCompleted)
{
context.Send(pid, "tick");
await Task.Delay(50);
}
});

await Task.Delay(400);
Assert.False(timeoutReceived);

await probe.ExpectNoMessageAsync(TimeSpan.FromMilliseconds(400));
cts.Cancel();

await GetSafeAwaitableTask(receiveTimeoutWaiter);
Assert.True(timeoutReceived);
await probe.GetNextMessageAsync<ReceiveTimeout>();
}

[Fact]
public async Task receive_timeout_not_received_within_expected_time()
{
var system = new ActorSystem();
await using var _ = system;
await using var system = new ActorSystem();
var context = system.Root;

var timeoutReceived = false;
var actorStartedWaiter = GetExpiringTaskCompletionSource();
var probe = new TestProbe();
context.Spawn(Props.FromProducer(() => probe));

var props = Props.FromFunc(ctx =>
{
switch (ctx.Message)
{
case Started:
ctx.SetReceiveTimeout(TimeSpan.FromMilliseconds(1500));
actorStartedWaiter.SetResult(0);

ctx.Send(probe, "started");
break;
case ReceiveTimeout:
timeoutReceived = true;

case ReceiveTimeout msg:
ctx.Send(probe, msg);
break;
}

Expand All @@ -172,35 +133,31 @@ public async Task receive_timeout_not_received_within_expected_time()

context.Spawn(props);

await GetSafeAwaitableTask(actorStartedWaiter);
Assert.False(timeoutReceived);
await probe.GetNextMessageAsync<string>();
await probe.ExpectNoMessageAsync(TimeSpan.FromMilliseconds(200));
}

[Fact]
public async Task can_cancel_receive_timeout()
{
var system = new ActorSystem();
await using var _ = system;
await using var system = new ActorSystem();
var context = system.Root;
var probe = new TestProbe();
context.Spawn(Props.FromProducer(() => probe));

var timeoutReceived = false;
var endingTimeout = TimeSpan.MaxValue;
var autoExpiringWaiter = GetExpiringTaskCompletionSource(1500);

var props = Props.FromFunc(ctx =>
{
switch (ctx.Message)
{
case Started _:
case Started:
ctx.SetReceiveTimeout(TimeSpan.FromMilliseconds(150));
ctx.CancelReceiveTimeout();
endingTimeout = ctx.ReceiveTimeout;

break;
case ReceiveTimeout _:
timeoutReceived = true;
autoExpiringWaiter.SetResult(0); // should never happen

case ReceiveTimeout msg:
ctx.Send(probe, msg); // should never happen
break;
}

Expand All @@ -210,38 +167,30 @@ public async Task can_cancel_receive_timeout()

context.Spawn(props);

// this task should auto cancel
await GetSafeAwaitableTask(autoExpiringWaiter);
await probe.ExpectNoMessageAsync(TimeSpan.FromMilliseconds(200));

Assert.True(autoExpiringWaiter.Task.IsCanceled);
Assert.Equal(TimeSpan.Zero, endingTimeout);
Assert.False(timeoutReceived);
}

[Fact]
public async Task can_still_set_receive_timeout_after_cancelling()
{
var system = new ActorSystem();
await using var _ = system;
await using var system = new ActorSystem();
var context = system.Root;

var timeoutReceived = false;
var receiveTimeoutWaiter = GetExpiringTaskCompletionSource();
var probe = new TestProbe();
context.Spawn(Props.FromProducer(() => probe));

var props = Props.FromFunc(ctx =>
{
switch (ctx.Message)
{
case Started _:
case Started:
ctx.SetReceiveTimeout(TimeSpan.FromMilliseconds(150));
ctx.CancelReceiveTimeout();
ctx.SetReceiveTimeout(TimeSpan.FromMilliseconds(150));

break;
case ReceiveTimeout _:
timeoutReceived = true;
receiveTimeoutWaiter.SetResult(0);

case ReceiveTimeout msg:
ctx.Send(probe, msg);
break;
}

Expand All @@ -251,24 +200,9 @@ public async Task can_still_set_receive_timeout_after_cancelling()

context.Spawn(props);

await GetSafeAwaitableTask(receiveTimeoutWaiter);
Assert.True(timeoutReceived);
await probe.GetNextMessageAsync<ReceiveTimeout>();
}

private TaskCompletionSource<int> GetExpiringTaskCompletionSource(int timeoutMs = 60000)
{
var tcs = new TaskCompletionSource<int>();
var ct = new CancellationTokenSource();
ct.Token.Register(() => tcs.TrySetCanceled());
ct.CancelAfter(timeoutMs);

return tcs;
}

private ConfiguredTaskAwaitable<Task<int>> GetSafeAwaitableTask(TaskCompletionSource<int> tcs) =>
tcs.Task
.ContinueWith(t => t) // suppress any TaskCanceledException
.ConfigureAwait(false);

private record IgnoreMe : INotInfluenceReceiveTimeout;
}
}

Loading