Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 13 additions & 52 deletions tests/Proto.Actor.Tests/SchedulerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Threading.Tasks;
using System.Threading;
using Microsoft.Extensions.Time.Testing;
using Proto.TestKit;
using Proto.Timers;
using Xunit;

Expand All @@ -14,53 +15,27 @@ public async Task SendOnceWorksWithTimeProvider()
{
await using var system = new ActorSystem();
var context = system.Root;
var tcs = new TaskCompletionSource();
var pid = context.Spawn(Props.FromFunc(context =>
{
if (context.Message is "Wakeup")
{
tcs.SetResult();
}
var probe = new TestProbe();
var pid = context.Spawn(Props.FromProducer(() => probe));

return Task.CompletedTask;
}));
var timeProvider = new FakeTimeProvider();
var hook = new TestSchedulerHook();
var scheduler = context.Scheduler(timeProvider, hook);

scheduler.SendOnce(TimeSpan.FromSeconds(10), pid, "Wakeup");
await hook.WaitAsync();
timeProvider.Advance(TimeSpan.FromMinutes(10));
await tcs.Task.WaitAsync(TimeSpan.FromMilliseconds(10));
var msg = await probe.GetNextMessageAsync<string>(TimeSpan.FromMilliseconds(10));
Assert.Equal("Wakeup", msg);
}

[Fact]
public async Task SendRepeatedlyCanBeCancelled()
{
await using var system = new ActorSystem();
var context = system.Root;
var count = 0;
var firstMessage = new TaskCompletionSource();
var extraMessage = new TaskCompletionSource();

var pid = context.Spawn(Props.FromFunc(ctx =>
{
if (ctx.Message is "Tick")
{
count++;

if (count == 1)
{
firstMessage.SetResult();
}
else
{
extraMessage.SetResult();
}
}

return Task.CompletedTask;
}));
var probe = new TestProbe();
var pid = context.Spawn(Props.FromProducer(() => probe));

var timeProvider = new FakeTimeProvider();
var hook = new TestSchedulerHook();
Expand All @@ -70,33 +45,20 @@ public async Task SendRepeatedlyCanBeCancelled()

await hook.WaitAsync();
timeProvider.Advance(TimeSpan.FromMinutes(1));
await firstMessage.Task.WaitAsync(TimeSpan.FromMilliseconds(10));
await probe.GetNextMessageAsync<string>(TimeSpan.FromMilliseconds(10));

cts.Cancel();

timeProvider.Advance(TimeSpan.FromMinutes(1));
await Task.Delay(50);

Assert.Equal(1, count);
Assert.False(extraMessage.Task.IsCompleted);
await probe.ExpectNoMessageAsync(TimeSpan.FromMilliseconds(50));
}

[Fact]
public async Task SendOnceCanBeCancelled()
{
await using var system = new ActorSystem();
var context = system.Root;
var tcs = new TaskCompletionSource();

var pid = context.Spawn(Props.FromFunc(ctx =>
{
if (ctx.Message is "Wakeup")
{
tcs.SetResult();
}

return Task.CompletedTask;
}));
var probe = new TestProbe();
var pid = context.Spawn(Props.FromProducer(() => probe));

var timeProvider = new FakeTimeProvider();
var hook = new TestSchedulerHook();
Expand All @@ -106,11 +68,10 @@ public async Task SendOnceCanBeCancelled()

await hook.WaitAsync();
cts.Cancel();

timeProvider.Advance(TimeSpan.FromMinutes(1));
await Task.Delay(50);

Assert.False(tcs.Task.IsCompleted);
timeProvider.Advance(TimeSpan.FromMinutes(1));
await probe.ExpectNoMessageAsync(TimeSpan.FromMilliseconds(50));
}

[Fact]
Expand Down
Loading