Commits
80 commits
34963ed
Implement morsel-driven execution for ParquetExec
google-labs-jules[bot] Feb 22, 2026
a67f9ac
Proto
Dandandan Feb 22, 2026
d0da5da
Proto
Dandandan Feb 22, 2026
32eec3c
Fmt
Dandandan Feb 22, 2026
5dc895c
Merge remote-tracking branch 'upstream/main' into parquet-morsel-driv…
Dandandan Feb 22, 2026
cc73788
Proto
Dandandan Feb 22, 2026
d517b5d
Fix
Dandandan Feb 22, 2026
de1606d
Fix
Dandandan Feb 22, 2026
950f6db
Clippy
Dandandan Feb 22, 2026
7f57317
Refactor
Dandandan Feb 23, 2026
fd6d7fd
WIP
Dandandan Feb 23, 2026
37126bf
WIP
Dandandan Feb 23, 2026
2d3c33e
WIP
Dandandan Feb 23, 2026
98f0ea9
WIP
Dandandan Feb 23, 2026
a389b02
WIP
Dandandan Feb 23, 2026
4065448
Update
Dandandan Feb 23, 2026
415315d
Update
Dandandan Feb 23, 2026
13b4977
Config
Dandandan Feb 23, 2026
a30c3f8
Test
Dandandan Feb 23, 2026
8b32ca8
Refactor
Dandandan Feb 23, 2026
876c296
Update test
Dandandan Feb 23, 2026
d2df36b
Update test
Dandandan Feb 23, 2026
869b7d3
Autofix
Dandandan Feb 23, 2026
67ea9ab
Prune files
Dandandan Feb 23, 2026
e845675
Update test
Dandandan Feb 23, 2026
6885981
Update test
Dandandan Feb 23, 2026
3384b8f
Update morsel_driven
Dandandan Feb 23, 2026
211d4fc
Update morsel_driven
Dandandan Feb 23, 2026
2db61f1
fmt
Dandandan Feb 23, 2026
c859d6a
move pruning
Dandandan Feb 23, 2026
24b95fb
Revert "move pruning"
Dandandan Feb 24, 2026
80fa1ec
Reapply "move pruning"
Dandandan Feb 24, 2026
1dcd401
Autofix
Dandandan Feb 24, 2026
04b08a6
Autofix
Dandandan Feb 24, 2026
9799b96
Autofix
Dandandan Feb 24, 2026
de29e40
Autofix
Dandandan Feb 24, 2026
aa27a43
Clippy
Dandandan Feb 24, 2026
9a4aa84
Undo submodule
Dandandan Feb 24, 2026
692bff6
Also change open to be consistent
Dandandan Feb 24, 2026
9a9cf0b
Move page index back to morselize
Dandandan Feb 24, 2026
f79fe63
Move page index back to morselize
Dandandan Feb 25, 2026
976d8dc
Add back lost optimizations
Dandandan Feb 25, 2026
f937f98
Tweak
Dandandan Feb 25, 2026
1bc9375
Merge
Dandandan Feb 25, 2026
e0e8520
Autofix
Dandandan Feb 25, 2026
25b044b
Fmt
Dandandan Feb 25, 2026
b872660
Merge
Dandandan Feb 27, 2026
eb7dfa3
Use builder API
Dandandan Feb 27, 2026
0ee12ec
Adaptive filter pushdown for Parquet
adriangb Feb 22, 2026
eec49cf
fmt
adriangb Feb 22, 2026
24b2d1a
Delete benchmarks/results.txt
adriangb Feb 22, 2026
3c07682
update default
adriangb Feb 25, 2026
fb407e6
Fix filter sort fallback: sort by ascending scan size (cheapest first)
adriangb Feb 28, 2026
dc37461
Improve adaptive filter pushdown: projection-aware placement, sort or…
adriangb Feb 28, 2026
d61fb49
lint
adriangb Feb 28, 2026
d3711b9
perf: Improve adaptive filter placement with projection-aware I/O cos…
adriangb Feb 28, 2026
5d83baf
fix: resolve clippy warnings (too_many_arguments, iter_cloned_collect…
adriangb Feb 28, 2026
8f9827d
use expect
adriangb Feb 28, 2026
4ed7af3
WIP
Dandandan Mar 1, 2026
b440313
WIP
Dandandan Mar 1, 2026
a11409c
Simplify morsel-driven execution code
Dandandan Mar 1, 2026
e95c2d7
Move morsel queue ownership to DataSourceExecStream
Dandandan Mar 1, 2026
b71a2a7
Use Arc::strong_count instead of Weak for morsel queue lifecycle
Dandandan Mar 1, 2026
c383e6f
Fix morsel queue reset using partition counter
Dandandan Mar 1, 2026
ccd21c8
Add FileStream::with_shared_queue builder method
Dandandan Mar 1, 2026
763fff2
Use TaskContext::query_id to detect morsel queue execution cycles
Dandandan Mar 1, 2026
2e2b68b
Remove query_id
Dandandan Mar 1, 2026
6ee74d6
Change tests
Dandandan Mar 1, 2026
040eacc
Remove allow_morsel_driven=false workarounds from tests and examples
Dandandan Mar 1, 2026
06e254b
Plan whole files instead of byte-range splits for morsel-driven execu…
Dandandan Mar 1, 2026
c769c2d
Improve I/O locality by pushing morsels to front of work queue
Dandandan Mar 1, 2026
74b7fce
Merge remote-tracking branch 'upstream/main' into parquet-morsel-driv…
Dandandan Mar 1, 2026
7238c1c
Simplify morsel handling: push all morsels to front of queue
Dandandan Mar 1, 2026
f7614b7
Use separate queues for files and morsels to improve I/O locality
Dandandan Mar 1, 2026
28e64c9
Fix
Dandandan Mar 1, 2026
8eda140
Fix time_opening metric not stopped after morselizing
Dandandan Mar 1, 2026
4c735cf
Merge filter-pushdown-dynamic-bytes onto morsel-driven PR #20481
adriangb Mar 1, 2026
9cb068b
fix: resolve post-merge issues
adriangb Mar 1, 2026
40a8307
feat: replace WorkQueue with SharedPipeline using buffer_unordered
adriangb Mar 2, 2026
217e391
Revert "feat: replace WorkQueue with SharedPipeline using buffer_unor…
adriangb Mar 2, 2026
49 changes: 49 additions & 0 deletions after.txt
@@ -0,0 +1,49 @@
=== PhysicalExprAdapter Serialization Example ===

Step 1: Creating sample Parquet data...
Step 2: Setting up session with custom adapter...
Step 3: Creating physical plan with filter...
[Verify] original plan adapter check: true
Original plan has adapter: true

Step 4: Serializing plan with AdapterPreservingCodec...
[Serialize] Found DataSourceExec with adapter tag: v1
Serialized 407 bytes
(DataSourceExec with adapter wrapped as PhysicalExtensionNode with child plan)

Step 5: Deserializing plan with AdapterPreservingCodec...
[Deserialize] Found adapter extension with tag: v1
[Verify] restored plan adapter check: true
Restored plan has adapter: true

Step 6: Executing plans and comparing results...

Original plan results:
+----+
| id |
+----+
| 6 |
| 7 |
| 8 |
| 9 |
| 10 |
+----+

Restored plan results:
+----+
| id |
+----+
| 6 |
| 7 |
| 8 |
| 9 |
| 10 |
+----+

=== Example Complete! ===
Key takeaways:
1. PhysicalExtensionCodec provides serialize_physical_plan/deserialize_physical_plan hooks
2. The wrapper node pattern: metadata in JSON payload, structure in inputs field
3. Inner plans are included as children via extension.inputs, not encoded to bytes
4. Both plans produce identical results despite serialization round-trip
5. Adapters are fully preserved through the serialization round-trip
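The wrapper-node pattern named in the takeaways — adapter metadata carried in a small payload while the child plan travels through the extension node's `inputs` rather than being re-encoded to bytes — can be illustrated with a stripped-down sketch. The types below (`Plan`, `ExtensionNode`, `wrap`, `unwrap`) are invented for illustration; the real code goes through DataFusion's `PhysicalExtensionCodec` and protobuf messages.

```rust
/// Stand-in for a physical plan node.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { adapter_tag: Option<String> },
    Filter { input: Box<Plan> },
}

/// Stand-in for a serialized extension node: a small metadata
/// payload (JSON in the real codec) plus child plans kept as
/// structured inputs instead of opaque bytes.
struct ExtensionNode {
    payload: String,
    inputs: Vec<Plan>,
}

/// "Serialize": wrap an adapter-bearing scan into an extension node,
/// moving the adapter tag into the payload.
fn wrap(plan: &Plan) -> Option<ExtensionNode> {
    match plan {
        Plan::Scan { adapter_tag: Some(tag) } => Some(ExtensionNode {
            payload: tag.clone(),
            inputs: vec![Plan::Scan { adapter_tag: None }],
        }),
        _ => None,
    }
}

/// "Deserialize": restore the adapter from the payload onto the child.
fn unwrap(node: &ExtensionNode) -> Plan {
    match &node.inputs[0] {
        Plan::Scan { .. } => Plan::Scan {
            adapter_tag: Some(node.payload.clone()),
        },
        other => other.clone(),
    }
}
```

The round-trip restores a plan equal to the original, mirroring takeaway 5 ("adapters are fully preserved").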
49 changes: 49 additions & 0 deletions before.txt
@@ -0,0 +1,49 @@
=== PhysicalExprAdapter Serialization Example ===

Step 1: Creating sample Parquet data...
Step 2: Setting up session with custom adapter...
Step 3: Creating physical plan with filter...
[Verify] original plan adapter check: true
Original plan has adapter: true

Step 4: Serializing plan with AdapterPreservingCodec...
[Serialize] Found DataSourceExec with adapter tag: v1
Serialized 1049 bytes
(DataSourceExec with adapter was wrapped as PhysicalExtensionNode)

Step 5: Deserializing plan with AdapterPreservingCodec...
[Deserialize] Found adapter extension with tag: v1
[Verify] restored plan adapter check: true
Restored plan has adapter: true

Step 6: Executing plans and comparing results...

Original plan results:
+----+
| id |
+----+
| 6 |
| 7 |
| 8 |
| 9 |
| 10 |
+----+

Restored plan results:
+----+
| id |
+----+
| 6 |
| 7 |
| 8 |
| 9 |
| 10 |
+----+

=== Example Complete! ===
Key takeaways:
1. PhysicalExtensionCodec provides serialize_physical_plan/deserialize_physical_plan hooks
2. Custom metadata can be wrapped as PhysicalExtensionNode
3. Nested serialization (protobuf + JSON) works seamlessly
4. Both plans produce identical results despite serialization round-trip
5. Adapters are fully preserved through the serialization round-trip
2 changes: 1 addition & 1 deletion benchmarks/bench.sh
@@ -783,7 +783,7 @@ run_clickbench_partitioned() {
run_clickbench_pushdown() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_pushdown.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (partitioned, 100 files) benchmark with pushdown_filters=true, reorder_filters=true..."
echo "Running clickbench (partitioned, 100 files) benchmark with pushdown_filters=true..."
debug_run $CARGO_COMMAND --bin dfbench -- clickbench --pushdown --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG}
}

3 changes: 0 additions & 3 deletions benchmarks/src/clickbench.rs
@@ -58,7 +58,6 @@ pub struct RunOpt {
///
/// Specifically, it enables:
/// * `pushdown_filters = true`
/// * `reorder_filters = true`
#[arg(long = "pushdown")]
pushdown: bool,

@@ -196,14 +195,12 @@ impl RunOpt {
// Turn on Parquet filter pushdown if requested
if self.pushdown {
parquet_options.pushdown_filters = true;
parquet_options.reorder_filters = true;
}

if self.sorted_by.is_some() {
// We should compare the dynamic topk optimization when data is sorted, so we make the
// assumption that filter pushdown is also enabled in this case.
parquet_options.pushdown_filters = true;
parquet_options.reorder_filters = true;
}
}

5 changes: 4 additions & 1 deletion datafusion-examples/examples/data_io/json_shredding.rs
@@ -91,7 +91,10 @@ pub async fn json_shredding() -> Result<()> {
store.put(&path, payload).await?;

// Set up query execution
let mut cfg = SessionConfig::new();
let mut cfg = SessionConfig::default().set(
"datafusion.execution.parquet.filter_pushdown_min_bytes_per_sec",
&ScalarValue::Float64(Some(0.0)),
);
cfg.options_mut().execution.parquet.pushdown_filters = true;
let ctx = SessionContext::new_with_config(cfg);
ctx.runtime_env().register_object_store(
52 changes: 47 additions & 5 deletions datafusion/common/src/config.rs
@@ -709,11 +709,6 @@ config_namespace! {
/// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
pub pushdown_filters: bool, default = false

/// (reading) If true, filter expressions evaluated during the parquet decoding operation
/// will be reordered heuristically to minimize the cost of evaluation. If false,
/// the filters are applied in the same order as written in the query
pub reorder_filters: bool, default = false

/// (reading) Force the use of RowSelections for filter results, when
/// pushdown_filters is enabled. If false, the reader will automatically
/// choose between a RowSelection and a Bitmap based on the number and
@@ -743,6 +738,10 @@ config_namespace! {
/// (reading) Use any available bloom filters when reading parquet files
pub bloom_filter_on_read: bool, default = true

/// (reading) If true, the parquet reader will share work between partitions
/// using morsel-driven execution. This can help mitigate data skew.
pub allow_morsel_driven: bool, default = true

/// (reading) The maximum predicate cache size, in bytes. When
/// `pushdown_filters` is enabled, sets the maximum memory used to cache
/// the results of predicate evaluation between filter evaluation and
@@ -751,6 +750,49 @@
/// parquet reader setting. 0 means no caching.
pub max_predicate_cache_size: Option<usize>, default = None

/// (reading) Minimum bytes/sec throughput for adaptive filter pushdown.
/// Filters that achieve at least this throughput (bytes_saved / eval_time)
/// are promoted to row filters.
/// f64::INFINITY = no filters promoted (feature disabled).
/// 0.0 = all filters pushed as row filters (no adaptive logic).
/// Default: 104,857,600 bytes/sec (100 MiB/sec), empirically chosen based on
/// TPC-H, TPC-DS, and ClickBench benchmarks on an M4 MacBook Pro.
/// The optimal value for this setting likely depends on the relative
/// cost of CPU vs. IO in your environment, and to some extent the shape
/// of your query.
///
/// **Interaction with `pushdown_filters`:**
/// This option only takes effect when `pushdown_filters = true`.
/// When pushdown is disabled, all filters run post-scan and this
/// threshold is ignored.
pub filter_pushdown_min_bytes_per_sec: f64, default = 104_857_600.0

/// (reading) Byte-ratio threshold for applying filters one at a time
/// (iterative pruning; aka row-level) vs. all at once (post-scan).
/// The ratio is computed as: (extra filter bytes not in projection) / (projected bytes).
/// Filters whose extra columns consume a smaller fraction than this threshold are placed as row filters.
/// Filters whose extra columns consume a larger fraction are placed as post-scan filters.
/// Note: filter columns that are already in the query projection have zero extra cost,
/// so such filters always start as row filters regardless of this threshold.
/// Default: 0.05 meaning filters that require less than 5% additional bytes beyond the projection
/// are placed as row filters.
/// Set to INF to place all filters as row filters (skip byte-ratio check).
/// Set to 0 to place all filters as post-scan filters (no filter passes the ratio check).
///
/// **Interaction with `pushdown_filters`:**
/// Only takes effect when `pushdown_filters = true`.
pub filter_collecting_byte_ratio_threshold: f64, default = 0.05

/// (reading) Z-score for confidence intervals on filter effectiveness.
/// Controls how much statistical evidence is required before promoting
/// or demoting a filter. Lower values = faster decisions with less
/// confidence. Higher values = more conservative, requiring more data.
/// Default: 2.0 (~95% confidence).
///
/// **Interaction with `pushdown_filters`:**
/// Only takes effect when `pushdown_filters = true`.
pub filter_confidence_z: f64, default = 2.0

// The following options affect writing to parquet files
// and map to parquet::file::properties::WriterProperties

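Taken together, the three new options describe a two-stage decision: a filter starts as a row filter only if the ratio of its extra (non-projected) column bytes to the projected bytes is below `filter_collecting_byte_ratio_threshold`, and it is then promoted or demoted at runtime once its measured throughput (bytes saved per second of evaluation) is confidently above or below `filter_pushdown_min_bytes_per_sec`, where "confidently" means the z-score interval around the estimate clears the threshold. The sketch below is a rough, hypothetical rendering of that logic; the real implementation lives in the Parquet reader and uses different types.

```rust
/// Initial placement: row filter only if the filter's extra columns
/// (those not already in the projection) cost a small fraction of the
/// projected bytes. Filters whose columns are all projected have zero
/// extra bytes and a ratio of 0.
fn initial_is_row_filter(
    extra_filter_bytes: f64,
    projected_bytes: f64,
    ratio_threshold: f64,
) -> bool {
    extra_filter_bytes / projected_bytes < ratio_threshold
}

/// Runtime adaptation outcome for a filter.
#[derive(Debug)]
enum Decision {
    Promote,   // confidently above the throughput threshold
    Demote,    // confidently below it
    Undecided, // keep gathering evidence
}

/// Promote when the lower bound of the z-score confidence interval on
/// throughput clears the threshold; demote when the upper bound falls
/// below it; otherwise remain undecided.
fn adapt(
    mean_bytes_per_sec: f64,
    std_dev: f64,
    samples: f64,
    min_bytes_per_sec: f64,
    z: f64,
) -> Decision {
    let half_width = z * std_dev / samples.sqrt();
    if mean_bytes_per_sec - half_width >= min_bytes_per_sec {
        Decision::Promote
    } else if mean_bytes_per_sec + half_width < min_bytes_per_sec {
        Decision::Demote
    } else {
        Decision::Undecided
    }
}
```

With the defaults (ratio 0.05, threshold 100 MiB/sec, z = 2.0), a filter needing 2% extra bytes starts as a row filter, and a noisy throughput estimate straddling the threshold stays put until more samples shrink the interval.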
21 changes: 18 additions & 3 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -198,7 +198,7 @@ impl ParquetOptions {
skip_metadata: _,
metadata_size_hint: _,
pushdown_filters: _,
reorder_filters: _,

force_filter_selections: _, // not used for writer props
allow_single_file_parallelism: _,
maximum_parallel_row_group_writers: _,
Expand All @@ -208,7 +208,11 @@ impl ParquetOptions {
binary_as_string: _, // not used for writer props
coerce_int96: _, // not used for writer props
skip_arrow_metadata: _,
allow_morsel_driven: _,
max_predicate_cache_size: _,
filter_pushdown_min_bytes_per_sec: _, // not used for writer props
filter_collecting_byte_ratio_threshold: _, // not used for writer props
filter_confidence_z: _, // not used for writer props
} = self;

let mut builder = WriterProperties::builder()
@@ -447,7 +451,7 @@ mod tests {
skip_metadata: defaults.skip_metadata,
metadata_size_hint: defaults.metadata_size_hint,
pushdown_filters: defaults.pushdown_filters,
reorder_filters: defaults.reorder_filters,

force_filter_selections: defaults.force_filter_selections,
allow_single_file_parallelism: defaults.allow_single_file_parallelism,
maximum_parallel_row_group_writers: defaults
@@ -460,6 +464,11 @@
skip_arrow_metadata: defaults.skip_arrow_metadata,
coerce_int96: None,
max_predicate_cache_size: defaults.max_predicate_cache_size,
allow_morsel_driven: defaults.allow_morsel_driven,
filter_pushdown_min_bytes_per_sec: defaults.filter_pushdown_min_bytes_per_sec,
filter_collecting_byte_ratio_threshold: defaults
.filter_collecting_byte_ratio_threshold,
filter_confidence_z: defaults.filter_confidence_z,
}
}

@@ -561,7 +570,7 @@ mod tests {
skip_metadata: global_options_defaults.skip_metadata,
metadata_size_hint: global_options_defaults.metadata_size_hint,
pushdown_filters: global_options_defaults.pushdown_filters,
reorder_filters: global_options_defaults.reorder_filters,

force_filter_selections: global_options_defaults.force_filter_selections,
allow_single_file_parallelism: global_options_defaults
.allow_single_file_parallelism,
Expand All @@ -575,7 +584,13 @@ mod tests {
schema_force_view_types: global_options_defaults.schema_force_view_types,
binary_as_string: global_options_defaults.binary_as_string,
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
allow_morsel_driven: global_options_defaults.allow_morsel_driven,
coerce_int96: None,
filter_pushdown_min_bytes_per_sec: global_options_defaults
.filter_pushdown_min_bytes_per_sec,
filter_collecting_byte_ratio_threshold: global_options_defaults
.filter_collecting_byte_ratio_threshold,
filter_confidence_z: global_options_defaults.filter_confidence_z,
},
column_specific_options,
key_value_metadata,
9 changes: 8 additions & 1 deletion datafusion/core/src/dataframe/parquet.rs
@@ -152,7 +152,14 @@ mod tests {
let plan = df.explain(false, false)?.collect().await?;
// Filters all the way to Parquet
let formatted = pretty::pretty_format_batches(&plan)?.to_string();
assert!(formatted.contains("FilterExec: id@0 = 1"), "{formatted}");
let data_source_exec_row = formatted
.lines()
.find(|line| line.contains("DataSourceExec:"))
.unwrap();
assert!(
data_source_exec_row.contains("predicate=id@0 = 1"),
"{formatted}"
);

Ok(())
}