Skip to content
Olivier Coanet edited this page Jan 29, 2022 · 12 revisions

The consumers are implemented using event handlers.

Event processing

For every event handler, the Disruptor will run a dedicated thread with the following pseudo-code:

var nextSequence = 0L;
while (IsRunning)
{
    // (1) Waits for the next event to be:
    // - published and
    // - available (if other event handlers are configured to run before this one).
    var availableSequence = WaitForAvailableSequence();

    // (2) Gets the events and invoke the event handler.
    while (nextSequence <= availableSequence)
    {
        var evt = RingBuffer[nextSequence];
        var endOfBatch = nextSequence == availableSequence;
        EventHandler.Handle(evt, endOfBatch;
        nextSequence++;
    }

    // (3) Marks the sequence as available, typically to allow following event handlers
    // to process the event.
    SetSequenceAsAvailable(availableSequence);
}

The step (1) is the only part that requires synchronization. It can be lock-based or lock-free depending on your wait strategy choice.

It is often the case that multiple events are available, because multiple events are be published together or simply because individual events are published faster than the handler can process them. The list of available events is called a batch. The goal of the inner loop (2) is to process the batch as fast as possible, without any synchronization. The handler is notified of the end of the batch, which can be very helpful for I/O based operations or to implement conflation.

The step (3) is a simple write of the current sequence and does not uses any synchronization.

Event handler interfaces

  1. IEventHandler<T>
void OnEvent(T data, long sequence, bool endOfBatch);

It is the simplest interface, with a method that it invoked for every event. The endOfBatch parameter

Note that there is an equivalent interface for value type disruptors: IValueEventHandler<T>.

  1. IBatchEventHandler<T>
void OnBatch(EventBatch<T> batch, long sequence);

Here the method will be invoked once per batch. EventBatch<T> is very similar to Span<T> expect that it is not a ref struct. This handler has better performance than the default IEventHandler<T> for processing batches of multiple events. It is also much more convenient for explicit batch management.

  1. IAsyncBatchEventHandler<T>
ValueTask OnBatch(EventBatch<T> batch, long sequence);

This event handler is quite unique because:

  • The processing of this handler can generate heap allocations.
  • The processing of this handler runs on thread-pool threads (other handlers runs on dedicated threads).

It is intended to be used in applications that can accept a reasonable amount of heap allocations and that use async APIs to process events.

Event handlers aggregation

The disruptor will create dedicated threads and event processing loops for each registered event handler.

disruptor.HandleEventsWith(new Handler1())
         .Then(new Handler2())
         .Then(new Handler3());

Here the disruptor will create 3 threads and 3 event processing loops. Having independent handlers can be required, for example if Handler2 is relatively slow and you need Handler1 to process events as fast as possible. But using independent handlers comes at a cost: it uses more CPU resources and the end-to-end latency is relatively greater because there is more coordination.

If both Handler1 and Handler2 are very fast and you want them to run in the same event processing loop, you can simply merge them into one handler, or use the AggregateEventHandler:

disruptor.HandleEventsWith(new AggregateEventHandler<Event>(new Handler1(), new Handler2()))
         .Then(new Handler3());

Additional event handler methods

TODO

Making event available to next event handlers

TODO

Clone this wiki locally