Skip to content

Commit a17fdb3

Browse files
authored
Add regression test for unreachable subscribers (#2233)
1 parent 38a152e commit a17fdb3

File tree

3 files changed

+141
-2
lines changed

3 files changed

+141
-2
lines changed

src/Proto.Remote/Endpoints/Endpoint.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ private int DropMessagesInBatch(RemoteMessage remoteMessage)
196196

197197
for (var i = 0; i < batch.Targets.Count; i++)
198198
{
199-
var target = new PID(System.Address, batch.Targets[i]);
199+
var target = new PID(RemoteAddress, batch.Targets[i]);
200200

201201
if (target.TryTranslateToLocalClientPID(out var pid))
202202
{
@@ -215,7 +215,7 @@ private int DropMessagesInBatch(RemoteMessage remoteMessage)
215215

216216
if (string.IsNullOrEmpty(s.Address))
217217
{
218-
s.Address = RemoteAddress;
218+
s.Address = System.Address;
219219
}
220220

221221
s.Ref(System);
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
using System.Collections.Concurrent;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using Proto.Cluster.PubSub;
5+
using Proto.Utils;
6+
7+
namespace Proto.Cluster.Tests;
8+
9+
public class InMemorySubscribersStore : IKeyValueStore<Subscribers>
10+
{
11+
private readonly ConcurrentDictionary<string, Subscribers> _store = new();
12+
13+
public Task<Subscribers> GetAsync(string id, CancellationToken ct)
14+
{
15+
_store.TryGetValue(id, out var subscribers);
16+
return subscribers == null ? Task.FromResult(new Subscribers()) : Task.FromResult(subscribers);
17+
}
18+
19+
public Task SetAsync(string id, Subscribers state, CancellationToken ct)
20+
{
21+
_store[id] = state;
22+
return Task.CompletedTask;
23+
}
24+
25+
public Task ClearAsync(string id, CancellationToken ct)
26+
{
27+
_store.TryRemove(id, out _);
28+
return Task.CompletedTask;
29+
}
30+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using FluentAssertions;
7+
using Proto;
8+
using Proto.Cluster.PubSub;
9+
using Proto.Utils;
10+
using Xunit;
11+
12+
namespace Proto.Cluster.Tests;
13+
14+
[Collection("ClusterTests")]
15+
public class UnreachableSubscriberTests
16+
{
17+
[Fact]
18+
public async Task Unreachable_subscribers_are_removed_after_dropped_message()
19+
{
20+
const string topic = "unreachable-drop";
21+
22+
var fixture = new Fixture();
23+
await using var _ = fixture;
24+
await fixture.InitializeAsync();
25+
26+
var leavingMember = fixture.Members.Last();
27+
var stayingMember = fixture.Members.First();
28+
29+
var props = Props.FromFunc(ctx =>
30+
{
31+
if (ctx.Message is DataPublished msg)
32+
{
33+
fixture.Deliveries.Add(new Delivery(ctx.Self.ToDiagnosticString(), msg.Data));
34+
ctx.Respond(new Response());
35+
}
36+
37+
return Task.CompletedTask;
38+
});
39+
40+
var subscriberPid = leavingMember.System.Root.Spawn(props);
41+
await stayingMember.Subscribe(topic, subscriberPid);
42+
43+
var subscribers = await fixture.GetSubscribersForTopic(topic);
44+
subscribers.Subscribers_.Should().Contain(s => s.Pid.Equals(subscriberPid));
45+
46+
await fixture.PublishData(topic, 1);
47+
await WaitUntil(() => fixture.Deliveries.Count == 1, "initial delivery");
48+
49+
await fixture.RemoveNode(leavingMember, graceful: false);
50+
51+
await fixture.PublishData(topic, 2);
52+
53+
await WaitUntil(async () =>
54+
{
55+
var subs = await fixture.GetSubscribersForTopic(topic);
56+
return subs.Subscribers_.Count == 0;
57+
}, "Subscriber should be removed");
58+
59+
fixture.Deliveries.Count.Should().Be(1);
60+
}
61+
62+
private static async Task WaitUntil(Func<bool> condition, string? message = null, int timeoutMs = 5000, int delayMs = 100)
63+
{
64+
var stop = DateTime.UtcNow + TimeSpan.FromMilliseconds(timeoutMs);
65+
while (DateTime.UtcNow < stop)
66+
{
67+
if (condition()) return;
68+
await Task.Delay(delayMs);
69+
}
70+
throw new TimeoutException(message ?? "Condition not met");
71+
}
72+
73+
private static async Task WaitUntil(Func<Task<bool>> condition, string? message = null, int timeoutMs = 5000, int delayMs = 100)
74+
{
75+
var stop = DateTime.UtcNow + TimeSpan.FromMilliseconds(timeoutMs);
76+
while (DateTime.UtcNow < stop)
77+
{
78+
if (await condition()) return;
79+
await Task.Delay(delayMs);
80+
}
81+
throw new TimeoutException(message ?? "Condition not met");
82+
}
83+
84+
private record DataPublished(int Data);
85+
private record Delivery(string Identity, int Data);
86+
private record Response;
87+
88+
private class Fixture : BaseInMemoryClusterFixture
89+
{
90+
public readonly ConcurrentBag<Delivery> Deliveries = new();
91+
private readonly InMemorySubscribersStore _store = new();
92+
93+
public Fixture() : base(2, config => config
94+
.WithActorRequestTimeout(TimeSpan.FromSeconds(1))
95+
.WithPubSubConfig(PubSubConfig.Setup().WithSubscriberTimeout(TimeSpan.FromSeconds(1))))
96+
{
97+
}
98+
99+
protected override ClusterKind[] ClusterKinds =>
100+
base.ClusterKinds
101+
.Concat(new[] { new ClusterKind(TopicActor.Kind, Props.FromProducer(() => new TopicActor(_store))) })
102+
.ToArray();
103+
104+
public Task<Subscribers> GetSubscribersForTopic(string topic) => _store.GetAsync(topic, CancellationToken.None);
105+
106+
public Task<PublishResponse> PublishData(string topic, int data) =>
107+
Members.First().Publisher().Publish(topic, new DataPublished(data), CancellationTokens.FromSeconds(5));
108+
}
109+
}

0 commit comments

Comments
 (0)