Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions logs/log1755975701.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- created RemoteGossipTests exercising gossip exchange between standalone remote nodes
- ensures decoupled Gossiper and GossipActor operate without cluster infrastructure
- added two scenarios: one-way replication and bidirectional synchronization
164 changes: 164 additions & 0 deletions tests/Proto.Cluster.Tests/RemoteGossipTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
using System;
using System.Collections.Immutable;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using Proto;
using Proto.Cluster;
using Proto.Cluster.Gossip;
using Proto.Remote;
using Proto.Remote.GrpcNet;
using Xunit;
using static Proto.TestKit.TestKit;

namespace Proto.Cluster.Tests;

[Collection("ClusterTests")]
public class RemoteGossipTests
{
private sealed class TestMemberList : IMemberList
{
private readonly ImmutableDictionary<string, Member> _members;

public TestMemberList(Member self, params Member[] members)
{
Self = self;
_members = members.ToImmutableDictionary(m => m.Id);
}

public bool ContainsMemberId(string memberId) => _members.ContainsKey(memberId);

public bool TryGetMember(string memberId, out Member? value) => _members.TryGetValue(memberId, out value);

public Member Self { get; }

public ImmutableHashSet<string> GetMembers() => _members.Keys.ToImmutableHashSet();
}

private static Gossiper CreateGossiper(ActorSystem system, IMemberList memberList)
{
var options = new GossiperOptions(
system.Root,
memberList,
system.Remote().BlockList,
system.EventStream,
system.Id,
Task.CompletedTask,
CancellationToken.None,
() => new ActorStatistics(),
GossipFanout: 1,
GossipMaxSend: 1,
GossipInterval: TimeSpan.FromMilliseconds(200),
GossipRequestTimeout: TimeSpan.FromSeconds(2),
GossipDebugLogging: false,
HeartbeatExpiration: TimeSpan.Zero,
HeartbeatExpirationHandler: () => Task.CompletedTask);

return new Gossiper(options);
}

private static RemoteConfig ConfigureRemote() =>
RemoteConfig.BindToLocalhost().WithProtoMessages(GossipContractsReflection.Descriptor);

[Fact]
public async Task Should_replicate_state_between_two_remote_nodes()
{
var remote1 = new GrpcNetRemote(new ActorSystem(), ConfigureRemote());
var remote2 = new GrpcNetRemote(new ActorSystem(), ConfigureRemote());
await remote1.StartAsync();
await remote2.StartAsync();

var system1 = remote1.System;
var system2 = remote2.System;

var (host1, port1) = system1.GetAddress();
var (host2, port2) = system2.GetAddress();
var member1 = new Member { Id = system1.Id, Host = host1, Port = port1 };
var member2 = new Member { Id = system2.Id, Host = host2, Port = port2 };

var memberList1 = new TestMemberList(member1, member1, member2);
var memberList2 = new TestMemberList(member2, member1, member2);

var gossiper1 = CreateGossiper(system1, memberList1);
var gossiper2 = CreateGossiper(system2, memberList2);
await gossiper1.StartGossipActorAsync();
await gossiper2.StartGossipActorAsync();

var topology = new ClusterTopology();
topology.Members.Add(member1);
topology.Members.Add(member2);
system1.EventStream.Publish(topology);
system2.EventStream.Publish(topology);

gossiper1.SetState("test", new Int32Value { Value = 42 });

var pid1 = PID.FromAddress(system1.Address, Gossiper.GossipActorName);
await system1.Root.RequestAsync<SendGossipStateResponse>(pid1, new SendGossipStateRequest(),
CancellationTokens.FromSeconds(5));

await AwaitConditionAsync(async () =>
{
var state = await gossiper2.GetState<Int32Value>("test");
return state.TryGetValue(member1.Id, out var v) && v.Value == 42;
}, TimeSpan.FromSeconds(5));

await gossiper1.ShutdownAsync();
await gossiper2.ShutdownAsync();
await Task.WhenAll(remote1.ShutdownAsync(), remote2.ShutdownAsync());
}

[Fact]
public async Task Should_gossip_state_in_both_directions()
{
var remote1 = new GrpcNetRemote(new ActorSystem(), ConfigureRemote());
var remote2 = new GrpcNetRemote(new ActorSystem(), ConfigureRemote());
await remote1.StartAsync();
await remote2.StartAsync();

var system1 = remote1.System;
var system2 = remote2.System;

var (host1, port1) = system1.GetAddress();
var (host2, port2) = system2.GetAddress();
var member1 = new Member { Id = system1.Id, Host = host1, Port = port1 };
var member2 = new Member { Id = system2.Id, Host = host2, Port = port2 };

var memberList1 = new TestMemberList(member1, member1, member2);
var memberList2 = new TestMemberList(member2, member1, member2);

var gossiper1 = CreateGossiper(system1, memberList1);
var gossiper2 = CreateGossiper(system2, memberList2);
await gossiper1.StartGossipActorAsync();
await gossiper2.StartGossipActorAsync();

var topology = new ClusterTopology();
topology.Members.Add(member1);
topology.Members.Add(member2);
system1.EventStream.Publish(topology);
system2.EventStream.Publish(topology);

gossiper1.SetState("key1", new Int32Value { Value = 1 });
gossiper2.SetState("key2", new Int32Value { Value = 2 });

var pid1 = PID.FromAddress(system1.Address, Gossiper.GossipActorName);
var pid2 = PID.FromAddress(system2.Address, Gossiper.GossipActorName);

await system1.Root.RequestAsync<SendGossipStateResponse>(pid1, new SendGossipStateRequest(),
CancellationTokens.FromSeconds(5));
await system2.Root.RequestAsync<SendGossipStateResponse>(pid2, new SendGossipStateRequest(),
CancellationTokens.FromSeconds(5));

await AwaitConditionAsync(async () =>
{
var a = await gossiper2.GetState<Int32Value>("key1");
var b = await gossiper1.GetState<Int32Value>("key2");
return a.TryGetValue(member1.Id, out var va) && va.Value == 1 &&
b.TryGetValue(member2.Id, out var vb) && vb.Value == 2;
}, TimeSpan.FromSeconds(5));

await gossiper1.ShutdownAsync();
await gossiper2.ShutdownAsync();
await Task.WhenAll(remote1.ShutdownAsync(), remote2.ShutdownAsync());
}
}

Loading