RIP 52 Optimize Building ConsumeQueue
- Current State: Discussing
- Authors: nowinkeyy
- Shepherds: lizhanhui
- Mailing List discussion: [email protected]
- Pull Request:
- Released:
- Related Docs: English | Chinese
- Will we add a new module? No.
- Will we add new APIs? Yes.
- Will we add a new feature? Yes.
- Are there any problems with our current project?
Messages published by producers are eventually appended to CommitLog files on broker nodes, from which the ReputMessageService builds consume queues. This building procedure is both IO- and CPU-intensive: when message production traffic is heavy, building the ConsumeQueue hits a bottleneck, causing significant commit-log dispatch lag warnings and further delaying message consumption. For applications that are sensitive to end-to-end latency, a spike is observed in APM (application performance monitoring) tools.
- What can we benefit from proposed changes?
Downstream of the ReputMessageService thread, add a thread pool that processes messages concurrently.
The ReputMessageService first takes CommitLog messages in order and assigns a unique, monotonically increasing logical number to each, then wraps it as a task and dispatches it to the thread pool. Workers in the thread pool execute these tasks concurrently to 1) build ConsumeQueue entries, and 2) group-commit entries for each message queue.
Even though the building of consume queues is parallelized, we can ensure that the entries in each ConsumeQueue are still written in order.
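The numbering step above can be sketched as follows. This is a hypothetical illustration, not RocketMQ's actual API: class and method names (`ReputDispatchSketch`, `dispatch`) are invented for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: the single ReputMessageService thread assigns each
// DispatchRequest a per-queue sequence number, then hands the request to a
// worker pool. Because numbering happens on one thread in CommitLog order,
// the numbers are unique and strictly increasing within each ConsumeQueue.
class ReputDispatchSketch {
    static class DispatchRequest {
        final String topic;
        final int queueId;
        long seq; // logical number within the ConsumeQueue, assigned below
        DispatchRequest(String topic, int queueId) { this.topic = topic; this.queueId = queueId; }
    }

    private final Map<String, AtomicLong> seqPerQueue = new ConcurrentHashMap<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    // Called only from the ReputMessageService thread, in CommitLog order.
    void dispatch(DispatchRequest req) {
        String key = req.topic + "-" + req.queueId;
        req.seq = seqPerQueue.computeIfAbsent(key, k -> new AtomicLong()).getAndIncrement();
        pool.submit(() -> buildConsumeQueueEntry(req)); // workers run concurrently
    }

    private void buildConsumeQueueEntry(DispatchRequest req) {
        // Worker: build the ConsumeQueue entry, then commit it in sequence order.
    }

    void shutdown() { pool.shutdown(); }
}
```

Since sequence assignment is serialized on the reput thread, only the entry-building and committing work is spread across the pool.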
- What problem is this proposal designed to solve?
To solve the problem that the ReputMessageService is single-threaded and therefore generates ConsumeQueue entries too slowly. Refer to the section above for details.
- To what degree should we solve the problem?
- What problem is this proposal NOT designed to solve?
- Are there any limits of this proposal?
- The old way
1. The producer sends a message to the broker.
2. The broker writes the message to the CommitLog.
3. The ReputMessageService thread scans CommitLog messages sequentially and wraps them as DispatchRequests.
4. The ReputMessageService thread executes each DispatchRequest sequentially, generates ConsumeQueue entries, and writes them to the ConsumeQueue.
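The old flow can be sketched in a few lines. This is an illustrative simplification (names like `doReput` and the list-backed logs are stand-ins, not RocketMQ internals):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the old, single-threaded flow: one thread scans the
// CommitLog in order and builds ConsumeQueue entries one by one, so ordering
// is trivial but throughput is bounded by a single core.
class SequentialReputSketch {
    final List<String> commitLog = new ArrayList<>();    // messages appended by the broker
    final List<String> consumeQueue = new ArrayList<>(); // entries built so far
    private int reputFromOffset = 0;                     // next CommitLog position to dispatch

    // Steps 3-4: scan new messages sequentially and write ConsumeQueue entries.
    void doReput() {
        while (reputFromOffset < commitLog.size()) {
            String msg = commitLog.get(reputFromOffset++); // wrap as a DispatchRequest
            consumeQueue.add(msg);                         // build and write the entry
        }
    }
}
```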
- The new way
1. The producer sends a message to the broker.
2. The broker writes the message to the CommitLog.
3. The ReputMessageService thread scans CommitLog messages sequentially and wraps them as DispatchRequests.
4. The ReputMessageService thread assigns a number (unique and sorted within the same ConsumeQueue) to each DispatchRequest, wraps the DispatchRequest as a task, and submits it to the thread pool.
5. A thread in the thread pool executes the task to generate a ConsumeQueue entry and write it to the ConsumeQueue. Before committing to the ConsumeQueue, the thread queries a bitmap to ensure that the entry is committed in order: if any earlier entry has not yet been written to the ConsumeQueue, this entry is not committed and stays invisible; once all earlier entries have been written, the entry is committed and becomes visible.
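The in-order commit check in step 5 can be sketched with a `BitSet` standing in for the bitmap. This is a minimal illustration under assumed names (`InOrderCommitSketch`, `onEntryWritten`), not the proposal's actual implementation:

```java
import java.util.BitSet;

// Hypothetical sketch of step 5: a worker marks its sequence number as
// written, then advances the queue's commit position only across a
// contiguous prefix of written entries, so entries become visible to
// consumers strictly in order.
class InOrderCommitSketch {
    private final BitSet written = new BitSet(); // bit i set => entry i written to ConsumeQueue
    private long commitPosition = -1;            // highest sequence visible to consumers

    // Called by a worker after it has written entry `seq` to the ConsumeQueue.
    synchronized long onEntryWritten(long seq) {
        written.set((int) seq);
        // A gap (an earlier entry still in flight) stops the advance and
        // keeps all later entries invisible until the gap is filled.
        while (written.get((int) (commitPosition + 1))) {
            commitPosition++;
        }
        return commitPosition;
    }

    synchronized long commitPosition() { return commitPosition; }
}
```

For example, if entries 0 and 2 are written first, the commit position stays at 0; writing entry 1 then advances it to 2 in one step.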
- Take an example from the picture above
The ReputMessageService thread scans the CommitLog, gets five messages (same topic and queueId), obtains a unique, sorted number for each from the bitmap in turn, and submits them to the thread pool for parallel execution.
m1, m2, and m5 have been written to the ConsumeQueue, and the commit-position is m2. Although m5 has been written to the ConsumeQueue, the commit-position cannot be updated; it can only advance once the position immediately after the commit-position (m3) has been written.
Suppose m3 and m4 are then written to the ConsumeQueue. The commit-position is not simply updated to m3, because there may be a larger committable position, so the bitmap must be queried to find the largest committable position.
At this point, m4 and m5 have been written to the ConsumeQueue but m6 has not, so the largest committable position is m5, and the commit-position is finally updated to m5.
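This walkthrough can be replayed against a bitmap. The mapping of m1..m6 to bits 0..5 and the helper name `largestCommittable` are assumptions for illustration:

```java
import java.util.BitSet;

// Hypothetical replay of the example above, with m1..m6 mapped to bits 0..5
// of a bitmap. Scanning forward from the current commit-position stops at
// the first gap, which is why the commit-position jumps from m2 straight to
// m5 once m3 and m4 land (m6, bit 5, is still unwritten).
class CommitPositionExample {
    // Returns the index of the largest committable entry: the end of the
    // contiguous written prefix starting just after commitPos.
    static int largestCommittable(BitSet written, int commitPos) {
        while (written.get(commitPos + 1)) {
            commitPos++;
        }
        return commitPos;
    }
}
```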
NOTE: Access to the same ConsumeQueue is serialized and may be changed to limited parallelization later.