Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 34 additions & 24 deletions src/Proto.Cluster/Gossip/GossipActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ int gossipMaxSend
public Task ReceiveAsync(IContext context) =>
context.Message switch
{
SetGossipStateKey setState => OnSetGossipStateKey(context, setState),
GetGossipStateRequest getState => OnGetGossipStateKey(context, getState),
SetGossipStateKey setState => OnSetGossipStateKey(context, setState),
GetGossipStateRequest getState => OnGetGossipStateKey(context, getState),
GetGossipStateEntryRequest getState => OnGetGossipStateEntryKey(context, getState),
GetGossipStateSnapshot => OnGetGossipStateSnapshot(context),
GossipRequest gossipRequest => OnGossipRequest(context, gossipRequest),
SendGossipStateRequest => OnSendGossipState(context),
AddConsensusCheck request => OnAddConsensusCheck(context, request),
ClusterTopology clusterTopology => OnClusterTopology(clusterTopology),
_ => Task.CompletedTask
GetGossipStateSnapshot => OnGetGossipStateSnapshot(context),
GossipRequest gossipRequest => OnGossipRequest(context, gossipRequest),
SendGossipStateRequest => OnSendGossipState(context),
AddConsensusCheck request => OnAddConsensusCheck(context, request),
ClusterTopology clusterTopology => OnClusterTopology(clusterTopology),
_ => Task.CompletedTask
};

private Task OnGetGossipStateEntryKey(IContext context, GetGossipStateEntryRequest getState)
Expand Down Expand Up @@ -94,10 +94,10 @@ private Task OnGossipRequest(IContext context, GossipRequest gossipRequest)
}

ReceiveState(context, gossipRequest.State);

context.Respond(new GossipResponse());


return Task.CompletedTask;
}

Expand Down Expand Up @@ -126,20 +126,20 @@ private Task OnSetGossipStateKey(IContext context, SetGossipStateKey setStateKey

private Task OnSendGossipState(IContext context)
{
_internal.SendState((memberState, member, logger) => SendGossipForMember(context, member, logger, memberState));
_internal.SendState((memberState, member, logger) => SendGossipForMember(context, member, memberState));
context.Respond(new SendGossipStateResponse());

return Task.CompletedTask;
}

private void SendGossipForMember(IContext context, Member member, InstanceLogger? logger,
private void SendGossipForMember(IContext context, Member targetMember,
MemberStateDelta memberStateDelta)
{
var pid = PID.FromAddress(member.Address, Gossiper.GossipActorName);
var pid = PID.FromAddress(targetMember.Address, Gossiper.GossipActorName);

if (Logger.IsEnabled(LogLevel.Debug))
{
Logger.LogDebug("Sending GossipRequest to {MemberId}", member.Id);
Logger.LogDebug("Sending GossipRequest to {MemberId}", targetMember.Id);
}

var start = DateTime.UtcNow;
Expand All @@ -151,26 +151,36 @@ private void SendGossipForMember(IContext context, Member member, InstanceLogger
async task =>
{
var delta = DateTime.UtcNow - start;
var self = context.Cluster().MemberList.Self;

//if the target is no longer part of the cluster. don't log. the failure is expected.. issue #1992
if (!context.Cluster().MemberList.TryGetMember(targetMember.Id, out _))
{
return;
}

try
{
await task.ConfigureAwait(false);
memberStateDelta.CommitOffsets();
}
catch (DeadLetterException)
{
Logger.LogWarning("DeadLetter in GossipReenterAfterSend, elapsed {Delta}ms",delta.TotalMilliseconds);
}
catch (OperationCanceledException)
{
Logger.LogWarning("OperationCancel in GossipReenterAfterSend, elapsed {Delta}ms",delta.TotalMilliseconds);
}
catch (TimeoutException)
{
Logger.LogWarning("Timeout in GossipReenterAfterSend, elapsed {Delta}ms",delta.TotalMilliseconds);
//log member issue #1993
Logger.LogWarning(
"Timeout in GossipReenterAfterSend, elapsed {Delta}ms for target member {TargetMember} from {SelfMember}",
delta.TotalMilliseconds, targetMember, self);
}
catch (Exception x)
{
Logger.LogError(x, "GossipReenterAfterSend failed, elapsed {Delta}ms",delta.TotalMilliseconds);
//if the target is no longer part of the cluster. don't log. the failure is expected.. issue #1992
if (context.Cluster().MemberList.TryGetMember(targetMember.Id, out _))
{
//log member issue #1993
Logger.LogError(x,
"GossipReenterAfterSend failed, elapsed {Delta}ms for target member {TargetMember} from {SelfMember}",
delta.TotalMilliseconds, targetMember, self);
}
}
},
CancellationTokens.WithTimeout(_gossipRequestTimeout)
Expand Down