Skip to content

Commit ac5dde9

Browse files
authored
Use context cancellation tokens in topic actor (#2246)
1 parent ea799dc commit ac5dde9

File tree

1 file changed

+23
-20
lines changed

1 file changed

+23
-20
lines changed

src/Proto.Cluster/PubSub/TopicActor.cs

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public Task ReceiveAsync(IContext context) =>
4646
PubSubBatch batch => OnPubSubBatch(context, batch),
4747
NotifyAboutFailingSubscribersRequest msg => OnNotifyAboutFailingSubscribers(context, msg),
4848
ClusterTopology msg => OnClusterTopologyChanged(context, msg),
49-
DeadLetterResponse msg => OnDeadLetterResponse(msg),
49+
DeadLetterResponse msg => OnDeadLetterResponse(context, msg),
5050
_ => Task.CompletedTask
5151
};
5252

@@ -55,7 +55,7 @@ private async Task OnStarted(IContext context)
5555
_topic = context.Get<ClusterIdentity>()!.Identity;
5656
_topologySubscription = context.System.EventStream.Subscribe<ClusterTopology>(context, context.Self);
5757

58-
var subs = await LoadSubscriptions(_topic).ConfigureAwait(false);
58+
var subs = await LoadSubscriptions(_topic, context.CancellationToken).ConfigureAwait(false);
5959

6060
if (subs.Subscribers_ is not null)
6161
{
@@ -169,7 +169,7 @@ private static Subscribers GetSubscribersForAddress(
169169

170170
private async Task OnNotifyAboutFailingSubscribers(IContext context, NotifyAboutFailingSubscribersRequest msg)
171171
{
172-
await UnsubscribeUnreachablePidSubscribers(msg.InvalidDeliveries).ConfigureAwait(false);
172+
await UnsubscribeUnreachablePidSubscribers(msg.InvalidDeliveries, context.CancellationToken).ConfigureAwait(false);
173173
LogDeliveryErrors(msg.InvalidDeliveries);
174174

175175
context.Respond(new NotifyAboutFailingSubscribersResponse());
@@ -188,7 +188,7 @@ private void LogDeliveryErrors(IReadOnlyCollection<SubscriberDeliveryReport> all
188188
}
189189

190190
private async Task UnsubscribeUnreachablePidSubscribers(
191-
IReadOnlyCollection<SubscriberDeliveryReport> allInvalidDeliveryReports)
191+
IReadOnlyCollection<SubscriberDeliveryReport> allInvalidDeliveryReports, CancellationToken ct)
192192
{
193193
var allUnreachable = allInvalidDeliveryReports
194194
.Where(r => r is
@@ -200,7 +200,7 @@ private async Task UnsubscribeUnreachablePidSubscribers(
200200
.Select(s => s.Subscriber)
201201
.ToList();
202202

203-
await RemoveSubscribers(allUnreachable).ConfigureAwait(false);
203+
await RemoveSubscribers(allUnreachable, ct).ConfigureAwait(false);
204204
}
205205

206206
private async Task OnClusterTopologyChanged(IContext context, ClusterTopology topology)
@@ -214,7 +214,7 @@ private async Task OnClusterTopologyChanged(IContext context, ClusterTopology to
214214
addressesThatLeft.Contains(s.Pid.Address))
215215
.ToList();
216216

217-
await RemoveSubscribers(subscribersThatLeft).ConfigureAwait(false);
217+
await RemoveSubscribers(subscribersThatLeft, context.CancellationToken).ConfigureAwait(false);
218218
}
219219
}
220220

@@ -227,10 +227,10 @@ private async Task UnsubscribeSubscribersOnMembersThatLeft(IContext ctx)
227227
!activeMemberAddresses.Contains(s.Pid.Address))
228228
.ToList();
229229

230-
await RemoveSubscribers(subscribersThatLeft).ConfigureAwait(false);
230+
await RemoveSubscribers(subscribersThatLeft, ctx.CancellationToken).ConfigureAwait(false);
231231
}
232232

233-
private async Task RemoveSubscribers(IReadOnlyCollection<SubscriberIdentity> subscribersThatLeft)
233+
private async Task RemoveSubscribers(IReadOnlyCollection<SubscriberIdentity> subscribersThatLeft, CancellationToken ct)
234234
{
235235
if (subscribersThatLeft.Count > 0)
236236
{
@@ -247,16 +247,15 @@ private async Task RemoveSubscribers(IReadOnlyCollection<SubscriberIdentity> sub
247247
string.Join(", ", subscribersThatLeft));
248248
}
249249

250-
await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }).ConfigureAwait(false);
250+
await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }, ct).ConfigureAwait(false);
251251
}
252252
}
253253

254-
private async Task<Subscribers> LoadSubscriptions(string topic)
254+
private async Task<Subscribers> LoadSubscriptions(string topic, CancellationToken ct)
255255
{
256256
try
257257
{
258-
//TODO: cancellation token config?
259-
var state = await _subscriptionStore.GetAsync(topic, CancellationToken.None).ConfigureAwait(false);
258+
var state = await _subscriptionStore.GetAsync(topic, ct).ConfigureAwait(false);
260259
Logger.LogDebug("Topic {Topic} loaded subscriptions {Subscriptions}", _topic, state);
261260

262261
return state ?? new Subscribers();
@@ -272,13 +271,12 @@ private async Task<Subscribers> LoadSubscriptions(string topic)
272271
}
273272
}
274273

275-
private async Task SaveSubscriptions(string topic, Subscribers subs)
274+
private async Task SaveSubscriptions(string topic, Subscribers subs, CancellationToken ct)
276275
{
277276
try
278277
{
279-
//TODO: cancellation token config?
280278
Logger.LogDebug("Topic {Topic} saved subscriptions {Subscriptions}", _topic, subs);
281-
await _subscriptionStore.SetAsync(topic, subs, CancellationToken.None).ConfigureAwait(false);
279+
await _subscriptionStore.SetAsync(topic, subs, ct).ConfigureAwait(false);
282280
}
283281
catch (Exception e)
284282
{
@@ -293,26 +291,31 @@ private async Task OnUnsubscribe(IContext context, UnsubscribeRequest unsub)
293291
{
294292
_subscribers = _subscribers.Remove(unsub.Subscriber);
295293
Logger.LogDebug("Topic {Topic} - {Subscriber} unsubscribed", _topic, unsub);
296-
await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }).ConfigureAwait(false);
294+
await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }, context.CancellationToken)
295+
.ConfigureAwait(false);
297296
context.Respond(new UnsubscribeResponse());
298297
}
299298

300299
private async Task OnSubscribe(IContext context, SubscribeRequest sub)
301300
{
302301
_subscribers = _subscribers.Add(sub.Subscriber);
303302
Logger.LogDebug("Topic {Topic} - {Subscriber} subscribed", _topic, sub);
304-
await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }).ConfigureAwait(false);
303+
await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }, context.CancellationToken)
304+
.ConfigureAwait(false);
305305
context.Respond(new SubscribeResponse());
306306
}
307307

308-
private async Task OnDeadLetterResponse(DeadLetterResponse msg)
308+
private async Task OnDeadLetterResponse(IContext context, DeadLetterResponse msg)
309309
{
310-
var deadLetterSub = msg.Target == null ? null : _subscribers.FirstOrDefault(s => s.Pid.Address == msg.Target.Address);
310+
var deadLetterSub = msg.Target == null
311+
? null
312+
: _subscribers.FirstOrDefault(s => s.Pid.Address == msg.Target.Address);
311313
if (deadLetterSub != null)
312314
{
313315
_subscribers = _subscribers.Remove(deadLetterSub);
314316
Logger.LogDebug("Topic {Topic} - {Subscriber} unsubscribed due to dead letter", _topic, deadLetterSub);
315-
await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }).ConfigureAwait(false);
317+
await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }, context.CancellationToken)
318+
.ConfigureAwait(false);
316319
}
317320
}
318321
}

0 commit comments

Comments
 (0)