Skip to content

Conversation

@pettyjamesm
Copy link
Member

@pettyjamesm pettyjamesm commented Oct 29, 2025

Description

Avoids starting join probe leaf splits for tasks that are awaiting the build side completion. Otherwise, build side leaf split concurrency will be starved by probe side splits that start and then immediately block.

Additional context and related issues

Relates to #27121

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

## Section
* Fix some things. ({issue}`issuenumber`)

Summary by Sourcery

Defer broadcast join probe-side split scheduling until build pipelines complete to avoid starving build-side split concurrency

Enhancements:

  • Track pipeline dependency readiness in DriverFactory and OperatorFactory to expose a pipelineDependenciesSatisfied future
  • Buffer and batch pending splits in SqlTaskExecution, scheduling them only when pipeline dependencies are satisfied
  • Extend WorkProcessorOperatorAdapter, JoinBridgeManager, and join operator factories to expose buildPipelineReady futures and register listeners

@cla-bot cla-bot bot added the cla-signed label Oct 29, 2025
@sourcery-ai
Copy link

sourcery-ai bot commented Oct 29, 2025

Reviewer's Guide

This PR defers probe-side partitioned source splits until all upstream pipeline dependencies—especially the build side of broadcast joins—are satisfied. It tracks and propagates a pipelineDependenciesSatisfied future through operator and driver factories, signals build-side completion via new buildPipelineReady futures in join operators, and refactors split scheduling in SqlTaskExecution to prevent starvation of build-side splits.

Sequence diagram for deferred probe-side split scheduling in broadcast joins

sequenceDiagram
    participant "SqlTaskExecution"
    participant "DriverFactory"
    participant "OperatorFactory"
    participant "JoinOperatorFactory"
    participant "JoinBridgeManager"
    participant "Build Side"
    participant "Probe Side"

    "SqlTaskExecution"->>"DriverFactory": getPipelineDependenciesSatisfied()
    "DriverFactory"->>"OperatorFactory": pipelineDependenciesSatisfied()
    "OperatorFactory"->>"JoinOperatorFactory": pipelineDependenciesSatisfied()
    "JoinOperatorFactory"->>"JoinBridgeManager": buildPipelineReady()
    "JoinBridgeManager"->>"Build Side": getBuildFinishedFuture()
    "Build Side"-->>"JoinBridgeManager": Signal build finished
    "JoinBridgeManager"-->>"JoinOperatorFactory": buildPipelineReady future completed
    "JoinOperatorFactory"-->>"OperatorFactory": pipelineDependenciesSatisfied future completed
    "OperatorFactory"-->>"DriverFactory": pipelineDependenciesSatisfied future completed
    "DriverFactory"-->>"SqlTaskExecution": pipelineDependenciesSatisfied future completed
    "SqlTaskExecution"->>"Probe Side": Schedule probe-side splits
Loading

Class diagram for pipeline dependency tracking and join operator changes

classDiagram
    class OperatorFactory {
        +createOperator(driverContext)
        +noMoreOperators()
        +duplicate()
        +pipelineDependenciesSatisfied() ListenableFuture<Void>
    }
    class DriverFactory {
        +getPipelineDependenciesSatisfied() ListenableFuture<Void>
        -pipelineDependenciesSatisfied: ListenableFuture<Void>
    }
    class WorkProcessorOperatorAdapter {
        +buildPipelineReady() ListenableFuture<Void>
        +pipelineDependenciesSatisfied() ListenableFuture<Void>
    }
    class JoinOperatorFactory {
        +createOuterOperatorFactory()
        +buildPipelineReady() ListenableFuture<Void>
    }
    class LookupJoinOperatorFactory {
        +buildPipelineReady() ListenableFuture<Void>
    }
    class JoinBridgeManager {
        +getBuildFinishedFuture() ListenableFuture<Void>
        -whenBuildFinishes: ListenableFuture<Void>
    }

    OperatorFactory <|-- LookupJoinOperatorFactory
    OperatorFactory <|-- JoinOperatorFactory
    DriverFactory o-- OperatorFactory
    WorkProcessorOperatorAdapter o-- OperatorFactory
    JoinOperatorFactory o-- JoinBridgeManager
    LookupJoinOperatorFactory o-- JoinBridgeManager
Loading

Flow diagram for split scheduling with pipeline dependency check

flowchart TD
    A["Split assignment received"] --> B["Merge into pending splits"]
    B --> C["Check pipelineDependenciesSatisfied future"]
    C -- "Not done" --> D["Register listener to reschedule after unblocked"]
    C -- "Done" --> E["Schedule partitioned source pending splits"]
    D --> E
    E --> F["Check task completion"]
Loading

File-Level Changes

Change Details Files
Defer probe-side split scheduling until pipeline dependencies are satisfied
  • Track pipelineDependenciesSatisfied future in SqlTaskExecution
  • Replace direct scheduling with pending-splits queue and listener callback
  • Refactor schedulePartitionedSource into conditionally rescheduled logic
core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java
Introduce and propagate pipelineDependenciesSatisfied futures across pipeline components
  • Add default pipelineDependenciesSatisfied in OperatorFactory
  • Collect and combine operator factory futures in DriverFactory
  • Expose getPipelineDependenciesSatisfied in DriverSplitRunnerFactory
core/trino-main/src/main/java/io/trino/operator/OperatorFactory.java
core/trino-main/src/main/java/io/trino/operator/DriverFactory.java
core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java
Propagate build-side completion signals in join operators
  • Add whenBuildFinishes future and use it in JoinBridgeManager and JoinLifecycle
  • Override buildPipelineReady in WorkProcessorOperatorAdapter
  • Implement buildPipelineReady in all JoinOperatorFactory variants
core/trino-main/src/main/java/io/trino/operator/WorkProcessorOperatorAdapter.java
core/trino-main/src/main/java/io/trino/operator/join/JoinBridgeManager.java
core/trino-main/src/main/java/io/trino/operator/join/LookupJoinOperatorFactory.java
core/trino-main/src/main/java/io/trino/operator/join/unspilled/LookupJoinOperatorFactory.java
core/trino-main/src/main/java/io/trino/operator/join/JoinOperatorFactory.java

Possibly linked issues


Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@pettyjamesm pettyjamesm force-pushed the block-probe-leaf-pipelines-on-build branch 6 times, most recently from 78b86c9 to 669d3d0 Compare October 30, 2025 14:14
@pettyjamesm pettyjamesm force-pushed the block-probe-leaf-pipelines-on-build branch from 669d3d0 to 12a6064 Compare October 31, 2025 13:13
Avoids starting join probe leaf splits for tasks that are awaiting the
build side completion. Otherwise, build side leaf split concurrency will
be starved by probe side splits that start and then immediately block.
@pettyjamesm pettyjamesm force-pushed the block-probe-leaf-pipelines-on-build branch from 12a6064 to 0524103 Compare October 31, 2025 16:42
@pettyjamesm pettyjamesm marked this pull request as ready for review October 31, 2025 20:20
Copy link

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes and they look great!

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location> `core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java:365-367` </location>
<code_context>
+            ListenableFuture<Void> pipelineDependenciesSatisfied = partitionedDriverRunnerFactory.getPipelineDependenciesSatisfied();
+            if (!pipelineDependenciesSatisfied.isDone()) {
+                // Only register a single re-schedule listener if we're blocked on pipeline dependencies
+                if (this.pipelineDependenciesSatisfied.isDone()) {
+                    this.pipelineDependenciesSatisfied = pipelineDependenciesSatisfied;
+                    pipelineDependenciesSatisfied.addListener(this::scheduleSourcePartitionedSplitsAfterPipelineUnblocked, notificationExecutor);
+                }
+                break;
</code_context>

<issue_to_address>
**issue (bug_risk):** Potential for missed scheduling if pipelineDependenciesSatisfied transitions to done between check and listener registration.

Using isDone() before addListener introduces a race condition where the callback may not be registered if the future completes between these calls. To avoid missing the callback, use Futures.addCallback or an equivalent method that guarantees invocation regardless of the future's completion state.
</issue_to_address>

### Comment 2
<location> `core/trino-main/src/main/java/io/trino/operator/OperatorFactory.java:34-37` </location>
<code_context>

     OperatorFactory duplicate();
+
+    default ListenableFuture<Void> pipelineDependenciesSatisfied()
+    {
+        return Futures.immediateVoidFuture();
+    }
 }
</code_context>

<issue_to_address>
**suggestion:** Default implementation may mask missing dependencies in custom OperatorFactories.

This default may lead to overlooked dependency handling in custom implementations. Recommend documenting the need to override or making the method abstract.

```suggestion
    /**
     * Returns a future that will be completed when all pipeline dependencies for this operator are satisfied.
     * <p>
     * Implementors must override this method to ensure that any required dependencies are handled.
     * Failing to do so may result in incorrect operator behavior.
     */
    ListenableFuture<Void> pipelineDependenciesSatisfied();
```
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +34 to +37
default ListenableFuture<Void> pipelineDependenciesSatisfied()
{
return Futures.immediateVoidFuture();
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Default implementation may mask missing dependencies in custom OperatorFactories.

This default may lead to overlooked dependency handling in custom implementations. Recommend documenting the need to override or making the method abstract.

Suggested change
default ListenableFuture<Void> pipelineDependenciesSatisfied()
{
return Futures.immediateVoidFuture();
}
/**
* Returns a future that will be completed when all pipeline dependencies for this operator are satisfied.
* <p>
* Implementors must override this method to ensure that any required dependencies are handled.
* Failing to do so may result in incorrect operator behavior.
*/
ListenableFuture<Void> pipelineDependenciesSatisfied();

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Development

Successfully merging this pull request may close these issues.

1 participant