Skip to content

Commit 277831c

Browse files
Event Hubs - Don't pass cancelled token to TryExecute when draining (#38067)
* Event Hubs - Don't pass cancelled token to TryExecute when draining * test * Fix nullref and add comment * Update sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs * Fix test * PR fb * fix * Respect drain mode * rename * Fix disposed check * Fix token behavior * Fix * Fix namespace on test class * Fix token link * revert stopasync calls in tests
1 parent 11736d0 commit 277831c

File tree

10 files changed

+237
-106
lines changed

10 files changed

+237
-106
lines changed

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
using Microsoft.Azure.WebJobs.Description;
1010
using Microsoft.Azure.WebJobs.EventHubs.Listeners;
1111
using Microsoft.Azure.WebJobs.EventHubs.Processor;
12+
using Microsoft.Azure.WebJobs.Host;
1213
using Microsoft.Azure.WebJobs.Host.Bindings;
1314
using Microsoft.Azure.WebJobs.Host.Config;
1415
using Microsoft.Azure.WebJobs.Host.Configuration;
@@ -28,19 +29,22 @@ internal class EventHubExtensionConfigProvider : IExtensionConfigProvider
2829
private readonly IConverterManager _converterManager;
2930
private readonly IWebJobsExtensionConfiguration<EventHubExtensionConfigProvider> _configuration;
3031
private readonly EventHubClientFactory _clientFactory;
32+
private readonly IDrainModeManager _drainModeManager;
3133

3234
public EventHubExtensionConfigProvider(
3335
IOptions<EventHubOptions> options,
3436
ILoggerFactory loggerFactory,
3537
IConverterManager converterManager,
3638
IWebJobsExtensionConfiguration<EventHubExtensionConfigProvider> configuration,
37-
EventHubClientFactory clientFactory)
39+
EventHubClientFactory clientFactory,
40+
IDrainModeManager drainModeManager)
3841
{
3942
_options = options;
4043
_loggerFactory = loggerFactory;
4144
_converterManager = converterManager;
4245
_configuration = configuration;
4346
_clientFactory = clientFactory;
47+
_drainModeManager = drainModeManager;
4448
}
4549

4650
internal Action<ExceptionReceivedEventArgs> ExceptionHandler { get; set; }
@@ -71,7 +75,12 @@ public void Initialize(ExtensionConfigContext context)
7175
.AddOpenConverter<OpenType.Poco, EventData>(ConvertPocoToEventData);
7276

7377
// register our trigger binding provider
74-
var triggerBindingProvider = new EventHubTriggerAttributeBindingProvider(_converterManager, _options, _loggerFactory, _clientFactory);
78+
var triggerBindingProvider = new EventHubTriggerAttributeBindingProvider(
79+
_converterManager,
80+
_options,
81+
_loggerFactory,
82+
_clientFactory,
83+
_drainModeManager);
7584
context.AddBindingRule<EventHubTriggerAttribute>()
7685
.BindToTrigger(triggerBindingProvider);
7786

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@ internal sealed partial class EventHubListener
2828
/// </summary>
2929
internal class PartitionProcessor : IEventProcessor, IDisposable
3030
{
31-
private readonly CancellationTokenSource _cts = new();
32-
3331
private readonly ITriggeredFunctionExecutor _executor;
3432
private readonly bool _singleDispatch;
3533
private readonly ILogger _logger;
@@ -44,13 +42,15 @@ internal class PartitionProcessor : IEventProcessor, IDisposable
4442
private Task _cachedEventsBackgroundTask;
4543
private CancellationTokenSource _cachedEventsBackgroundTaskCts;
4644
private SemaphoreSlim _cachedEventsGuard;
45+
private readonly CancellationToken _functionExecutionToken;
46+
private readonly CancellationTokenSource _ownershipLostTokenSource;
4747

4848
/// <summary>
4949
/// When we have a minimum batch size greater than 1, this class manages caching events.
5050
/// </summary>
5151
internal PartitionProcessorEventsManager CachedEventsManager { get; }
5252

53-
public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch)
53+
public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch, CancellationToken functionExecutionToken)
5454
{
5555
_executor = executor;
5656
_singleDispatch = singleDispatch;
@@ -59,6 +59,8 @@ public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor ex
5959
_firstFunctionInvocation = true;
6060
_maxWaitTime = options.MaxWaitTime;
6161
_minimumBatchesEnabled = options.MinEventBatchSize > 1; // 1 is the default
62+
_functionExecutionToken = functionExecutionToken;
63+
_ownershipLostTokenSource = new CancellationTokenSource();
6264

6365
// Events are only cached when building a batch of minimum size.
6466
if (_minimumBatchesEnabled)
@@ -70,8 +72,12 @@ public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor ex
7072

7173
public Task CloseAsync(EventProcessorHostPartition context, ProcessingStoppedReason reason)
7274
{
73-
// signal cancellation for any in progress executions and clear the cached events
74-
_cts.Cancel();
75+
if (reason == ProcessingStoppedReason.OwnershipLost)
76+
{
77+
_ownershipLostTokenSource.Cancel();
78+
}
79+
80+
// clear the cached events
7581
CachedEventsManager?.ClearEventCache();
7682

7783
_logger.LogDebug(GetOperationDetails(context, $"CloseAsync, {reason}"));
@@ -98,11 +104,10 @@ public Task ProcessErrorAsync(EventProcessorHostPartition context, Exception err
98104
/// </summary>
99105
/// <param name="context">The partition information for this partition.</param>
100106
/// <param name="messages">The events to process.</param>
101-
/// <param name="partitionProcessingCancellationToken">The cancellation token to respect if processing for the partition is canceled.</param>
102107
/// <returns></returns>
103-
public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable<EventData> messages, CancellationToken partitionProcessingCancellationToken)
108+
public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable<EventData> messages)
104109
{
105-
using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, partitionProcessingCancellationToken);
110+
using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_functionExecutionToken, _ownershipLostTokenSource.Token);
106111
_mostRecentPartitionContext = context;
107112
var events = messages.ToArray();
108113
EventData eventToCheckpoint = null;
@@ -135,7 +140,7 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
135140
TriggerDetails = eventHubTriggerInput.GetTriggerDetails(context)
136141
};
137142

138-
await _executor.TryExecuteAsync(input, linkedCts.Token).ConfigureAwait(false);
143+
await _executor.TryExecuteAsync(input, _functionExecutionToken).ConfigureAwait(false);
139144
_firstFunctionInvocation = false;
140145
eventToCheckpoint = events[i];
141146
}
@@ -168,7 +173,7 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
168173
_logger.LogDebug($"Partition Processor received events and is attempting to invoke function ({details})");
169174

170175
UpdateCheckpointContext(triggerEvents, context);
171-
await TriggerExecute(triggerEvents, context, linkedCts.Token).ConfigureAwait(false);
176+
await TriggerExecute(triggerEvents, context, _functionExecutionToken).ConfigureAwait(false);
172177
eventToCheckpoint = triggerEvents.Last();
173178

174179
// If there is a background timer task, cancel it and dispose of the cancellation token. If there
@@ -186,7 +191,8 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
186191
if (_cachedEventsBackgroundTaskCts == null && CachedEventsManager.HasCachedEvents)
187192
{
188193
// If there are events waiting to be processed, and no background task running, start a monitoring cycle.
189-
_cachedEventsBackgroundTaskCts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token);
194+
// Don't reference linkedCts in the class level background task, as it will be disposed when the method goes out of scope.
195+
_cachedEventsBackgroundTaskCts = CancellationTokenSource.CreateLinkedTokenSource(_functionExecutionToken, _ownershipLostTokenSource.Token);
190196
_cachedEventsBackgroundTask = MonitorCachedEvents(context.ProcessorHost.GetLastReadCheckpoint(context.PartitionId)?.LastModified, _cachedEventsBackgroundTaskCts);
191197
}
192198
}
@@ -201,7 +207,7 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
201207
else
202208
{
203209
UpdateCheckpointContext(events, context);
204-
await TriggerExecute(events, context, linkedCts.Token).ConfigureAwait(false);
210+
await TriggerExecute(events, context, _functionExecutionToken).ConfigureAwait(false);
205211
eventToCheckpoint = events.LastOrDefault();
206212
}
207213

@@ -276,7 +282,7 @@ private async Task MonitorCachedEvents(DateTimeOffset? lastCheckpointTime, Cance
276282
var details = GetOperationDetails(_mostRecentPartitionContext, "MaxWaitTimeElapsed");
277283
_logger.LogDebug($"Partition Processor has waited MaxWaitTime since last invocation and is attempting to invoke function on all held events ({details})");
278284

279-
await TriggerExecute(triggerEvents, _mostRecentPartitionContext, backgroundCancellationTokenSource.Token).ConfigureAwait(false);
285+
await TriggerExecute(triggerEvents, _mostRecentPartitionContext, _functionExecutionToken).ConfigureAwait(false);
280286
if (!backgroundCancellationTokenSource.Token.IsCancellationRequested)
281287
{
282288
await CheckpointAsync(triggerEvents.Last(), _mostRecentPartitionContext).ConfigureAwait(false);
@@ -408,7 +414,6 @@ protected virtual void Dispose(bool disposing)
408414
{
409415
if (disposing)
410416
{
411-
_cts.Dispose();
412417
_cachedEventsBackgroundTaskCts?.Dispose();
413418
_cachedEventsGuard?.Dispose();
414419
}

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using Azure.Messaging.EventHubs.Primitives;
88
using Microsoft.Azure.WebJobs.EventHubs.Processor;
99
using Microsoft.Azure.WebJobs.Extensions.EventHubs.Listeners;
10+
using Microsoft.Azure.WebJobs.Host;
1011
using Microsoft.Azure.WebJobs.Host.Executors;
1112
using Microsoft.Azure.WebJobs.Host.Listeners;
1213
using Microsoft.Azure.WebJobs.Host.Scale;
@@ -27,6 +28,9 @@ internal sealed partial class EventHubListener : IListener, IEventProcessorFacto
2728
private readonly ILoggerFactory _loggerFactory;
2829
private readonly ILogger _logger;
2930
private string _details;
31+
private CancellationTokenSource _functionExecutionCancellationTokenSource;
32+
private readonly IDrainModeManager _drainModeManager;
33+
private volatile bool _disposed;
3034

3135
public EventHubListener(
3236
string functionId,
@@ -36,7 +40,8 @@ public EventHubListener(
3640
IEventHubConsumerClient consumerClient,
3741
BlobCheckpointStoreInternal checkpointStore,
3842
EventHubOptions options,
39-
ILoggerFactory loggerFactory)
43+
ILoggerFactory loggerFactory,
44+
IDrainModeManager drainModeManager)
4045
{
4146
_loggerFactory = loggerFactory;
4247
_executor = executor;
@@ -45,6 +50,8 @@ public EventHubListener(
4550
_checkpointStore = checkpointStore;
4651
_options = options;
4752
_logger = _loggerFactory.CreateLogger<EventHubListener>();
53+
_functionExecutionCancellationTokenSource = new CancellationTokenSource();
54+
_drainModeManager = drainModeManager;
4855

4956
EventHubMetricsProvider metricsProvider = new EventHubMetricsProvider(functionId, consumerClient, checkpointStore, _loggerFactory.CreateLogger<EventHubMetricsProvider>());
5057

@@ -68,20 +75,29 @@ public EventHubListener(
6875
}
6976

7077
/// <summary>
71-
/// Cancel any in progress listen operation.
78+
/// Cancel should be called prior to Dispose. We just validate that we are not already disposed.
79+
/// This is consistent with the Service Bus listener behavior.
7280
/// </summary>
7381
void IListener.Cancel()
7482
{
75-
#pragma warning disable AZC0102
76-
StopAsync(CancellationToken.None).GetAwaiter().GetResult();
77-
#pragma warning restore AZC0102
83+
if (_disposed)
84+
{
85+
throw new ObjectDisposedException(nameof(IListener));
86+
}
7887
}
7988

8089
void IDisposable.Dispose()
8190
{
91+
_functionExecutionCancellationTokenSource.Cancel();
92+
8293
#pragma warning disable AZC0102
8394
StopAsync(CancellationToken.None).GetAwaiter().GetResult();
8495
#pragma warning restore AZC0102
96+
97+
// No need to dispose the _disposingCancellationTokenSource since we don't create it as a linked token and
98+
// it won't use a timer, so the Dispose method is essentially a no-op. The downside to disposing it is that
99+
// any customers who are trying to use it to cancel their own operations would get an ObjectDisposedException.
100+
_disposed = true;
85101
}
86102

87103
public async Task StartAsync(CancellationToken cancellationToken)
@@ -94,14 +110,19 @@ public async Task StartAsync(CancellationToken cancellationToken)
94110

95111
public async Task StopAsync(CancellationToken cancellationToken)
96112
{
113+
if (!_drainModeManager.IsDrainModeEnabled)
114+
{
115+
_functionExecutionCancellationTokenSource.Cancel();
116+
}
117+
97118
await _eventProcessorHost.StopProcessingAsync(cancellationToken).ConfigureAwait(false);
98119

99120
_logger.LogDebug($"EventHub listener stopped ({_details})");
100121
}
101122

102123
IEventProcessor IEventProcessorFactory.CreatePartitionProcessor()
103124
{
104-
return new PartitionProcessor(_options, _executor, _loggerFactory.CreateLogger<PartitionProcessor>(), _singleDispatch);
125+
return new PartitionProcessor(_options, _executor, _loggerFactory.CreateLogger<PartitionProcessor>(), _singleDispatch, _functionExecutionCancellationTokenSource.Token);
105126
}
106127

107128
public IScaleMonitor GetMonitor()

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ protected override async Task<EventProcessorCheckpoint> GetCheckpointAsync(strin
6666

6767
if (checkpoint is BlobCheckpointStoreInternal.BlobStorageCheckpoint blobCheckpoint && blobCheckpoint is not null)
6868
{
69-
_lastReadCheckpoint[partitionId] = new CheckpointInfo(blobCheckpoint.Offset ?? -1, blobCheckpoint.SequenceNumber ?? -1, blobCheckpoint.LastModified);
69+
_lastReadCheckpoint[partitionId] = new CheckpointInfo(blobCheckpoint.Offset ?? -1, blobCheckpoint.SequenceNumber ?? -1,
70+
blobCheckpoint.LastModified);
7071
}
7172

7273
return checkpoint;
@@ -112,7 +113,7 @@ protected override Task OnProcessingEventBatchAsync(IEnumerable<EventData> event
112113
return Task.CompletedTask;
113114
}
114115

115-
return partition.EventProcessor.ProcessEventsAsync(partition, events, cancellationToken);
116+
return partition.EventProcessor.ProcessEventsAsync(partition, events);
116117
}
117118

118119
protected override async Task OnInitializingPartitionAsync(EventProcessorHostPartition partition, CancellationToken cancellationToken)

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/IEventProcessor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@ internal interface IEventProcessor
1515
Task CloseAsync(EventProcessorHostPartition context, ProcessingStoppedReason reason);
1616
Task OpenAsync(EventProcessorHostPartition context);
1717
Task ProcessErrorAsync(EventProcessorHostPartition context, Exception error);
18-
Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable<EventData> messages, CancellationToken cancellationToken);
18+
Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable<EventData> messages);
1919
}
2020
}

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using Azure.Messaging.EventHubs.Core;
88
using Azure.Messaging.EventHubs.Primitives;
99
using Microsoft.Azure.WebJobs.EventHubs.Listeners;
10+
using Microsoft.Azure.WebJobs.Host;
1011
using Microsoft.Azure.WebJobs.Host.Bindings;
1112
using Microsoft.Azure.WebJobs.Host.Listeners;
1213
using Microsoft.Azure.WebJobs.Host.Triggers;
@@ -21,17 +22,20 @@ internal class EventHubTriggerAttributeBindingProvider : ITriggerBindingProvider
2122
private readonly IOptions<EventHubOptions> _options;
2223
private readonly EventHubClientFactory _clientFactory;
2324
private readonly IConverterManager _converterManager;
25+
private readonly IDrainModeManager _drainModeManager;
2426

2527
public EventHubTriggerAttributeBindingProvider(
2628
IConverterManager converterManager,
2729
IOptions<EventHubOptions> options,
2830
ILoggerFactory loggerFactory,
29-
EventHubClientFactory clientFactory)
31+
EventHubClientFactory clientFactory,
32+
IDrainModeManager drainModeManager)
3033
{
3134
_converterManager = converterManager;
3235
_options = options;
3336
_clientFactory = clientFactory;
3437
_loggerFactory = loggerFactory;
38+
_drainModeManager = drainModeManager;
3539
}
3640

3741
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
@@ -67,7 +71,8 @@ public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext contex
6771
_clientFactory.GetEventHubConsumerClient(attribute.EventHubName, attribute.Connection, attribute.ConsumerGroup),
6872
checkpointStore,
6973
options,
70-
_loggerFactory);
74+
_loggerFactory,
75+
_drainModeManager);
7176
return Task.FromResult(listener);
7277
};
7378
#pragma warning disable 618

0 commit comments

Comments
 (0)