Event Handlers
The consumers are implemented using event handlers.
For every event handler, the Disruptor will run a dedicated thread with the following pseudo-code:
var nextSequence = 0L;
while (IsRunning)
{
    // (1) Wait for the next event to be:
    // - published and
    // - available (if other event handlers are configured to run before this one).
    var availableSequence = WaitForAvailableSequence();

    // (2) Get the events and invoke the event handler.
    while (nextSequence <= availableSequence)
    {
        var evt = RingBuffer[nextSequence];
        var endOfBatch = nextSequence == availableSequence;
        EventHandler.Handle(evt, endOfBatch);
        nextSequence++;
    }

    // (3) Mark the sequence as available, typically to allow the following event handlers
    // to process the event.
    SetSequenceAsAvailable(availableSequence);
}
Step (1) is the only part that requires synchronization. It can be lock-based or lock-free depending on the chosen wait strategy.
It is often the case that multiple events are available, either because multiple events were published together or simply because individual events are published faster than the handler can process them. The list of available events is called a batch. The goal of the inner loop (2) is to process the batch as fast as possible, without any synchronization. The handler is notified of the end of the batch, which can be very helpful for I/O based operations or to implement conflation.
Step (3) is a simple write of the current sequence and does not use any synchronization.
IEventHandler<T>
void OnEvent(T data, long sequence, bool endOfBatch);
It is the simplest interface, with a method that is invoked for every event. The endOfBatch parameter indicates whether the event is the last one of the current batch.
Note that there is an equivalent interface for value type disruptors: IValueEventHandler<T>.
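As a sketch, a handler for a hypothetical TradeEvent type could look like this (the event type and its Price property are illustrative, not part of the library):

```csharp
using System;
using Disruptor;

// Hypothetical event type used for illustration.
public class TradeEvent
{
    public double Price { get; set; }
}

public class TradeEventHandler : IEventHandler<TradeEvent>
{
    public void OnEvent(TradeEvent data, long sequence, bool endOfBatch)
    {
        // Per-event processing.
        Console.WriteLine($"Price: {data.Price} (sequence: {sequence})");

        if (endOfBatch)
        {
            // Batch boundary: a good place to flush buffered I/O
            // once per batch instead of once per event.
        }
    }
}
```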
IBatchEventHandler<T>
void OnBatch(EventBatch<T> batch, long sequence);
Here the method will be invoked once per batch. EventBatch<T> is very similar to Span<T>, except that it is not a ref struct.
This handler has better performance than the default IEventHandler<T> for processing batches of multiple events.
It is also much more convenient for explicit batch management.
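A minimal sketch of a batch handler, reusing the hypothetical TradeEvent type and assuming EventBatch<T> supports foreach enumeration like Span<T>:

```csharp
using System;
using Disruptor;

public class TradeBatchHandler : IBatchEventHandler<TradeEvent>
{
    public void OnBatch(EventBatch<TradeEvent> batch, long sequence)
    {
        // The whole batch is exposed at once, so batch-level logic
        // (buffering, conflation, a single flush) is explicit.
        foreach (var data in batch)
        {
            Console.WriteLine($"Price: {data.Price}");
        }

        // One flush per batch; no per-event endOfBatch check is needed.
    }
}
```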
IAsyncBatchEventHandler<T>
ValueTask OnBatch(EventBatch<T> batch, long sequence);
This event handler is quite unique because:
- The processing of this handler can generate heap allocations.
- The processing of this handler runs on thread-pool threads (other handlers run on dedicated threads).
It is intended to be used in applications that can accept a reasonable amount of heap allocations and that use async APIs to process events.
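A sketch of an async batch handler, again using the hypothetical TradeEvent type; the await here stands in for any async API (database write, network call, etc.):

```csharp
using System.Threading.Tasks;
using Disruptor;

public class TradeAsyncBatchHandler : IAsyncBatchEventHandler<TradeEvent>
{
    public async ValueTask OnBatch(EventBatch<TradeEvent> batch, long sequence)
    {
        foreach (var data in batch)
        {
            // Buffer or transform the events of the batch here.
        }

        // Placeholder for an async I/O call, e.g. persisting the batch.
        await Task.Yield();
    }
}
```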
The disruptor will create a dedicated thread and event processing loop for each registered event handler.
disruptor.HandleEventsWith(new Handler1())
.Then(new Handler2())
.Then(new Handler3());
Here the disruptor will create 3 threads and 3 event processing loops. Having independent handlers can be necessary, for example if Handler2 is relatively slow and you need Handler1 to process events as fast as possible. But using independent handlers comes at a cost: it uses more CPU resources and increases end-to-end latency because there is more coordination.
If both Handler1 and Handler2 are very fast and you want them to run in the same event processing loop, you can simply merge them into one handler, or use the AggregateEventHandler:
disruptor.HandleEventsWith(new AggregateEventHandler<Event>(new Handler1(), new Handler2()))
.Then(new Handler3());
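For context, a minimal end-to-end setup might look like the following sketch; the TradeEvent type and the handler classes are hypothetical, and the ring buffer size is arbitrary:

```csharp
using Disruptor;
using Disruptor.Dsl;

var disruptor = new Disruptor<TradeEvent>(() => new TradeEvent(), ringBufferSize: 1024);

// Handler1 and Handler2 share one event processing loop; Handler3 runs after them.
disruptor.HandleEventsWith(new AggregateEventHandler<TradeEvent>(new Handler1(), new Handler2()))
         .Then(new Handler3());

disruptor.Start();

// Publish an event to the ring buffer.
using (var scope = disruptor.RingBuffer.PublishEvent())
{
    scope.Event().Price = 42.0;
}
```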