Defer broadcast join probe leaf splits #27149
Reviewer's Guide
This PR defers probe-side partitioned source splits until all upstream pipeline dependencies, especially the build side of broadcast joins, are satisfied. It tracks and propagates a pipelineDependenciesSatisfied future through operator and driver factories, signals build-side completion via new buildPipelineReady futures in join operators, and refactors split scheduling in SqlTaskExecution to prevent starvation of build-side splits.
Sequence diagram for deferred probe-side split scheduling in broadcast joins
sequenceDiagram
    participant "SqlTaskExecution"
    participant "DriverFactory"
    participant "OperatorFactory"
    participant "JoinOperatorFactory"
    participant "JoinBridgeManager"
    participant "Build Side"
    participant "Probe Side"
    "SqlTaskExecution"->>"DriverFactory": getPipelineDependenciesSatisfied()
    "DriverFactory"->>"OperatorFactory": pipelineDependenciesSatisfied()
    "OperatorFactory"->>"JoinOperatorFactory": pipelineDependenciesSatisfied()
    "JoinOperatorFactory"->>"JoinBridgeManager": buildPipelineReady()
    "JoinBridgeManager"->>"Build Side": getBuildFinishedFuture()
    "Build Side"-->>"JoinBridgeManager": Signal build finished
    "JoinBridgeManager"-->>"JoinOperatorFactory": buildPipelineReady future completed
    "JoinOperatorFactory"-->>"OperatorFactory": pipelineDependenciesSatisfied future completed
    "OperatorFactory"-->>"DriverFactory": pipelineDependenciesSatisfied future completed
    "DriverFactory"-->>"SqlTaskExecution": pipelineDependenciesSatisfied future completed
    "SqlTaskExecution"->>"Probe Side": Schedule probe-side splits
Class diagram for pipeline dependency tracking and join operator changes
classDiagram
    class OperatorFactory {
        +createOperator(driverContext)
        +noMoreOperators()
        +duplicate()
        +pipelineDependenciesSatisfied() ListenableFuture<Void>
    }
    class DriverFactory {
        +getPipelineDependenciesSatisfied() ListenableFuture<Void>
        -pipelineDependenciesSatisfied: ListenableFuture<Void>
    }
    class WorkProcessorOperatorAdapter {
        +buildPipelineReady() ListenableFuture<Void>
        +pipelineDependenciesSatisfied() ListenableFuture<Void>
    }
    class JoinOperatorFactory {
        +createOuterOperatorFactory()
        +buildPipelineReady() ListenableFuture<Void>
    }
    class LookupJoinOperatorFactory {
        +buildPipelineReady() ListenableFuture<Void>
    }
    class JoinBridgeManager {
        +getBuildFinishedFuture() ListenableFuture<Void>
        -whenBuildFinishes: ListenableFuture<Void>
    }
    OperatorFactory <|-- LookupJoinOperatorFactory
    OperatorFactory <|-- JoinOperatorFactory
    DriverFactory o-- OperatorFactory
    WorkProcessorOperatorAdapter o-- OperatorFactory
    JoinOperatorFactory o-- JoinBridgeManager
    LookupJoinOperatorFactory o-- JoinBridgeManager
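The propagation described in the diagrams can be sketched roughly as follows. This is an illustration only, not the PR's code: it uses the JDK's `CompletableFuture` as a stand-in for Guava's `ListenableFuture`, and the names `SketchOperatorFactory`, `SketchJoinFactory`, and `combine` are hypothetical. It assumes the driver-level future completes only once every operator factory's future has completed.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class PipelineDependencySketch {
    // Hypothetical stand-in for OperatorFactory: by default an operator
    // has no upstream dependency, so its future is already complete.
    interface SketchOperatorFactory {
        default CompletableFuture<Void> pipelineDependenciesSatisfied() {
            return CompletableFuture.completedFuture(null);
        }
    }

    // Hypothetical stand-in for a join probe factory: it exposes the
    // build-side completion future as its pipeline dependency.
    static final class SketchJoinFactory implements SketchOperatorFactory {
        private final CompletableFuture<Void> buildFinished;

        SketchJoinFactory(CompletableFuture<Void> buildFinished) {
            this.buildFinished = buildFinished;
        }

        @Override
        public CompletableFuture<Void> pipelineDependenciesSatisfied() {
            return buildFinished;
        }
    }

    // Hypothetical stand-in for the driver factory: the combined future
    // completes only when every operator factory's future has completed.
    static CompletableFuture<Void> combine(List<SketchOperatorFactory> factories) {
        CompletableFuture<?>[] futures = factories.stream()
                .map(SketchOperatorFactory::pipelineDependenciesSatisfied)
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> buildFinished = new CompletableFuture<>();
        CompletableFuture<Void> pipelineReady = combine(List.of(
                new SketchOperatorFactory() {},          // e.g. a filter: no dependency
                new SketchJoinFactory(buildFinished)));  // probe side: waits for build

        System.out.println("ready before build: " + pipelineReady.isDone());
        buildFinished.complete(null);                    // build side signals completion
        System.out.println("ready after build: " + pipelineReady.isDone());
    }
}
```

In this sketch a pipeline with no join operators is ready immediately, which matches the intent that only probe-side pipelines are deferred.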
Flow diagram for split scheduling with pipeline dependency check
flowchart TD
    A["Split assignment received"] --> B["Merge into pending splits"]
    B --> C["Check pipelineDependenciesSatisfied future"]
    C -- "Not done" --> D["Register listener to reschedule after unblocked"]
    C -- "Done" --> E["Schedule partitioned source pending splits"]
    D --> E
    E --> F["Check task completion"]
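A minimal sketch of the flow above, under stated assumptions: `CompletableFuture` stands in for Guava's `ListenableFuture`, splits are plain strings, and the class and method names are hypothetical rather than taken from `SqlTaskExecution`. The point is only to show pending splits being held back, with a single reschedule listener, until the dependency future completes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class DeferredSplitSchedulerSketch {
    private final CompletableFuture<Void> dependenciesSatisfied;
    private final Queue<String> pendingSplits = new ArrayDeque<>();
    private final List<String> started = new ArrayList<>();
    private boolean listenerRegistered;

    DeferredSplitSchedulerSketch(CompletableFuture<Void> dependenciesSatisfied) {
        this.dependenciesSatisfied = dependenciesSatisfied;
    }

    // "Split assignment received" -> "Merge into pending splits"
    synchronized void addSplits(List<String> splits) {
        pendingSplits.addAll(splits);
        schedulePending();
    }

    // "Check pipelineDependenciesSatisfied future"
    private synchronized void schedulePending() {
        if (!dependenciesSatisfied.isDone()) {
            // Not done: register one listener that reschedules later.
            if (!listenerRegistered) {
                listenerRegistered = true;
                dependenciesSatisfied.thenRun(this::schedulePending);
            }
            return;
        }
        // Done: start everything that was held back.
        while (!pendingSplits.isEmpty()) {
            started.add(pendingSplits.poll());
        }
    }

    synchronized List<String> started() {
        return List.copyOf(started);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> buildReady = new CompletableFuture<>();
        DeferredSplitSchedulerSketch scheduler = new DeferredSplitSchedulerSketch(buildReady);

        scheduler.addSplits(List.of("probe-1", "probe-2"));
        System.out.println("before build finishes: " + scheduler.started());

        buildReady.complete(null); // build side done; listener drains pending splits
        System.out.println("after build finishes: " + scheduler.started());
    }
}
```

Holding splits in the pending queue, rather than starting drivers that block at once, is what leaves split concurrency available to the build side.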
    
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java:365-367` </location>
<code_context>
+            ListenableFuture<Void> pipelineDependenciesSatisfied = partitionedDriverRunnerFactory.getPipelineDependenciesSatisfied();
+            if (!pipelineDependenciesSatisfied.isDone()) {
+                // Only register a single re-schedule listener if we're blocked on pipeline dependencies
+                if (this.pipelineDependenciesSatisfied.isDone()) {
+                    this.pipelineDependenciesSatisfied = pipelineDependenciesSatisfied;
+                    pipelineDependenciesSatisfied.addListener(this::scheduleSourcePartitionedSplitsAfterPipelineUnblocked, notificationExecutor);
+                }
+                break;
</code_context>
<issue_to_address>
**issue (bug_risk):** Potential for missed scheduling if pipelineDependenciesSatisfied transitions to done between check and listener registration.
Using isDone() before addListener introduces a race condition where the callback may not be registered if the future completes between these calls. To avoid missing the callback, use Futures.addCallback or an equivalent method that guarantees invocation regardless of the future's completion state.
</issue_to_address>
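
When weighing this comment, it helps to recall that Guava documents `ListenableFuture.addListener` as running the listener immediately if the future is already complete. The sketch below illustrates the analogous behaviour with the JDK's `CompletableFuture.thenRun`, used here as a stand-in (an assumption, not the PR's code; the class and method names are hypothetical). It simulates the future completing in the window between the `isDone()` check and the listener registration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class LateListenerSketch {
    // Returns how many times the callback ran when the future completed
    // after the isDone() check but before the listener was registered.
    static int callbacksAfterLateRegistration() {
        CompletableFuture<Void> dependencies = new CompletableFuture<>();
        AtomicInteger callbacks = new AtomicInteger();

        boolean wasDone = dependencies.isDone();   // check observes "not done"
        dependencies.complete(null);               // completion lands in the window
        if (!wasDone) {
            // Registering on an already-completed future still runs the action.
            dependencies.thenRun(callbacks::incrementAndGet);
        }
        return callbacks.get();
    }

    public static void main(String[] args) {
        System.out.println("callbacks run: " + callbacksAfterLateRegistration());
    }
}
```

Under that guarantee, a completion in that window does not by itself drop the callback. Whether the surrounding single-listener guard is otherwise correct is a separate question that this sketch does not address.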
### Comment 2
<location> `core/trino-main/src/main/java/io/trino/operator/OperatorFactory.java:34-37` </location>
<code_context>
     OperatorFactory duplicate();
+
+    default ListenableFuture<Void> pipelineDependenciesSatisfied()
+    {
+        return Futures.immediateVoidFuture();
+    }
 }
</code_context>
<issue_to_address>
**suggestion:** Default implementation may mask missing dependencies in custom OperatorFactories.
This default may lead to overlooked dependency handling in custom implementations. Recommend documenting the need to override or making the method abstract.
```suggestion
    /**
     * Returns a future that will be completed when all pipeline dependencies for this operator are satisfied.
     * <p>
     * Implementors must override this method to ensure that any required dependencies are handled.
     * Failing to do so may result in incorrect operator behavior.
     */
    ListenableFuture<Void> pipelineDependenciesSatisfied();
```
</issue_to_address>
Description
Avoids starting join probe leaf splits for tasks that are awaiting the build side completion. Otherwise, build side leaf split concurrency will be starved by probe side splits that start and then immediately block.
Additional context and related issues
Relates to #27121
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:
Summary by Sourcery
Defer broadcast join probe-side split scheduling until build pipelines complete to avoid starving build-side split concurrency
Enhancements: