Skip to content

Commit fba274d

Browse files
authored
fix pool router start in case of failing routee props (#1715)
1 parent 2fcd667 commit fba274d

File tree

3 files changed

+65
-9
lines changed

3 files changed

+65
-9
lines changed

src/Proto.Actor/Router/RouterActor.cs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
// Copyright (C) 2015-2022 Asynkron AB All rights reserved
44
// </copyright>
55
// -----------------------------------------------------------------------
6+
using System;
67
using System.Linq;
7-
using System.Threading;
88
using System.Threading.Tasks;
99
using Proto.Router.Messages;
1010
using Proto.Router.Routers;
@@ -15,21 +15,28 @@ public class RouterActor : IActor
1515
{
1616
private readonly RouterConfig _config;
1717
private readonly RouterState _routerState;
18-
private readonly AutoResetEvent _wg;
18+
private readonly RouterStartNotification _startNotification;
1919

20-
public RouterActor(RouterConfig config, RouterState routerState, AutoResetEvent wg)
20+
public RouterActor(RouterConfig config, RouterState routerState, RouterStartNotification startNotification)
2121
{
2222
_config = config;
2323
_routerState = routerState;
24-
_wg = wg;
24+
_startNotification = startNotification;
2525
}
2626

2727
public Task ReceiveAsync(IContext context)
2828
{
2929
if (context.Message is Started)
3030
{
31-
_config.OnStarted(context, _routerState);
32-
_wg.Set();
31+
try
32+
{
33+
_config.OnStarted(context, _routerState);
34+
_startNotification.NotifyStarted();
35+
}
36+
catch (Exception e)
37+
{
38+
_startNotification.NotifyFailed(e);
39+
}
3340
return Task.CompletedTask;
3441
}
3542

src/Proto.Actor/Router/Routers/RouterConfig.cs

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ public abstract record RouterConfig
2020
private PID SpawnRouterProcess(ActorSystem system, string name, Props props, PID? parent, Action<IContext>? callback)
2121
{
2222
var routerState = CreateRouterState();
23-
var wg = new AutoResetEvent(false);
24-
var p = props.WithProducer(() => new RouterActor(this, routerState, wg));
23+
var notifyStarted = new RouterStartNotification();
24+
var p = props.WithProducer(() => new RouterActor(this, routerState, notifyStarted));
2525

2626
var mailbox = props.MailboxProducer();
2727
var dispatcher = props.Dispatcher;
@@ -35,7 +35,42 @@ private PID SpawnRouterProcess(ActorSystem system, string name, Props props, PID
3535
mailbox.RegisterHandlers(ctx, dispatcher);
3636
mailbox.PostSystemMessage(Started.Instance);
3737
mailbox.Start();
38-
wg.WaitOne();
38+
39+
var (startSuccess, startException) = notifyStarted.Wait();
40+
41+
if (!startSuccess)
42+
{
43+
system.Root.Stop(self);
44+
throw new RouterStartFailedException(startException!);
45+
}
46+
3947
return self;
4048
}
49+
}
50+
51+
public class RouterStartNotification
52+
{
53+
private readonly ManualResetEvent _wg = new(false);
54+
private Exception? _exception;
55+
56+
public void NotifyStarted() => _wg.Set();
57+
58+
public void NotifyFailed(Exception exception)
59+
{
60+
_exception = exception;
61+
_wg.Set();
62+
}
63+
64+
public (bool StartSuccess, Exception? Exception) Wait()
65+
{
66+
_wg.WaitOne();
67+
return (_exception is null, _exception);
68+
}
69+
}
70+
71+
public class RouterStartFailedException : Exception
72+
{
73+
public RouterStartFailedException(Exception inner) : base("Router failed to start", inner)
74+
{
75+
}
4176
}

tests/Proto.Actor.Tests/Router/PoolRouterTests.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using System;
22
using System.Threading.Tasks;
3+
using FluentAssertions;
34
using Proto.Router.Messages;
5+
using Proto.Router.Routers;
46
using Proto.TestFixtures;
57
using Xunit;
68

@@ -58,4 +60,16 @@ public async Task RandomPool_CreatesRoutees()
5860
var routees = await system.Root.RequestAsync<Routees>(router, new RouterGetRoutees(), _timeout);
5961
Assert.Equal(3, routees.Pids.Count);
6062
}
63+
64+
[Fact]
65+
public async Task If_routee_props_then_router_creation_fails()
66+
{
67+
await using var system = new ActorSystem();
68+
69+
var failingProps = Props.FromProducer(() => throw new Exception("Failing props"));
70+
71+
system.Invoking(s => s.Root.Spawn(s.Root.NewRandomPool(failingProps, 3, 0)))
72+
.Should().Throw<RouterStartFailedException>()
73+
.WithInnerException<Exception>().WithMessage("Failing props");
74+
}
6175
}

0 commit comments

Comments
 (0)