From f8c9c4de66541d6c370363e898d809fa759ffbf2 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Tue, 31 Mar 2026 15:58:45 +0100 Subject: [PATCH 1/2] buffered strategy to not use eof for the final chunk Signed-off-by: Onur Satici --- vortex-layout/src/layouts/buffered.rs | 34 +++++++++++++++------------ 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/vortex-layout/src/layouts/buffered.rs b/vortex-layout/src/layouts/buffered.rs index 26d9e1a394f..049e1d295da 100644 --- a/vortex-layout/src/layouts/buffered.rs +++ b/vortex-layout/src/layouts/buffered.rs @@ -9,6 +9,7 @@ use std::sync::atomic::Ordering; use async_stream::try_stream; use async_trait::async_trait; use futures::StreamExt as _; +use futures::pin_mut; use vortex_array::ArrayContext; use vortex_error::VortexResult; use vortex_io::runtime::Handle; @@ -44,20 +45,18 @@ impl LayoutStrategy for BufferedStrategy { &self, ctx: ArrayContext, segment_sink: SegmentSinkRef, - mut stream: SendableSequentialStream, - mut eof: SequencePointer, + stream: SendableSequentialStream, + eof: SequencePointer, handle: Handle, ) -> VortexResult { let dtype = stream.dtype().clone(); let buffer_size = self.buffer_size; - // We have no choice but to put our final buffers here! - // We cannot hold on to sequence ids across iterations of the stream, otherwise we can - // cause deadlocks with other columns that are waiting for us to flush. - let mut final_flush = eof.split_off(); - let buffered_bytes_counter = self.buffered_bytes.clone(); let buffered_stream = try_stream! { + let stream = stream.peekable(); + pin_mut!(stream); + let mut nbytes = 0u64; let mut chunks = VecDeque::new(); @@ -68,11 +67,23 @@ impl LayoutStrategy for BufferedStrategy { buffered_bytes_counter.fetch_add(chunk_size, Ordering::Relaxed); chunks.push_back(chunk); + // If this is the last element, flush everything. 
+ if stream.as_mut().peek().await.is_none() { + let mut sequence_ptr = sequence_id.descend(); + while let Some(chunk) = chunks.pop_front() { + let chunk_size = chunk.nbytes(); + nbytes -= chunk_size; + buffered_bytes_counter.fetch_sub(chunk_size, Ordering::Relaxed); + yield (sequence_ptr.advance(), chunk) + } + break; + } + if nbytes < 2 * buffer_size { continue; }; - // Wait until we're at 2x the buffer size before flushing 1x the buffer size + // Wait until we're at 2x the buffer size before flushing 1x the buffer size. // This avoids small tail stragglers being flushed at the end of the file. let mut sequence_ptr = sequence_id.descend(); while nbytes > buffer_size { @@ -85,13 +96,6 @@ impl LayoutStrategy for BufferedStrategy { yield (sequence_ptr.advance(), chunk) } } - - // Now the input stream has ended, flush everything - while let Some(chunk) = chunks.pop_front() { - let chunk_size = chunk.nbytes(); - buffered_bytes_counter.fetch_sub(chunk_size, Ordering::Relaxed); - yield (final_flush.advance(), chunk) - } }; self.child From 5a3cfa1d11c2165862d317d1e767021c684b3dc2 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Wed, 1 Apr 2026 11:12:24 +0100 Subject: [PATCH 2/2] segment offset tests Signed-off-by: Onur Satici --- vortex-file/src/tests.rs | 175 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 175 insertions(+) diff --git a/vortex-file/src/tests.rs b/vortex-file/src/tests.rs index 51b3b97383a..6733200aa20 100644 --- a/vortex-file/src/tests.rs +++ b/vortex-file/src/tests.rs @@ -62,6 +62,7 @@ use vortex_buffer::ByteBufferMut; use vortex_buffer::buffer; use vortex_error::VortexResult; use vortex_io::session::RuntimeSession; +use vortex_layout::Layout; use vortex_layout::scan::scan_builder::ScanBuilder; use vortex_layout::session::LayoutSession; use vortex_session::VortexSession; @@ -71,6 +72,7 @@ use crate::V1_FOOTER_FBS_SIZE; use crate::VERSION; use crate::VortexFile; use crate::WriteOptionsSessionExt; +use crate::footer::SegmentSpec; static SESSION: 
LazyLock<VortexSession> = LazyLock::new(|| { let mut session = VortexSession::empty() --- @@ -1696,3 +1698,176 @@ async fn timestamp_unit_mismatch_errors_with_constant_children() Ok(()) } + +/// Collect all segment byte offsets reachable from a layout node. +fn collect_segment_offsets(layout: &dyn Layout, segment_specs: &[SegmentSpec]) -> Vec<u64> { + let mut result = Vec::new(); + collect_segment_offsets_inner(layout, segment_specs, &mut result); + result +} + +fn collect_segment_offsets_inner( + layout: &dyn Layout, + segment_specs: &[SegmentSpec], + result: &mut Vec<u64>, +) { + for seg_id in layout.segment_ids() { + result.push(segment_specs[*seg_id as usize].offset); + } + for child in layout.children().unwrap() { + collect_segment_offsets_inner(child.as_ref(), segment_specs, result); + } +} + +/// Assert that all offsets in `before` are less than all offsets in `after`. +fn assert_offsets_ordered(before: &[u64], after: &[u64], context: &str) { + if let (Some(&max_before), Some(&min_after)) = (before.iter().max(), after.iter().min()) { + assert!( + max_before < min_after, + "{context}: expected all 'before' offsets < all 'after' offsets, \ + but max before = {max_before} >= min after = {min_after}" + ); + } +} + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn test_segment_ordering_dict_codes_before_values() -> VortexResult<()> { + // Create low-cardinality strings to trigger dict encoding, plus an integer column. 
+ let n = 100_000; + let values: Vec<&str> = (0..n).map(|i| ["alpha", "beta", "gamma"][i % 3]).collect(); + let strings = VarBinArray::from(values).into_array(); + let numbers = PrimitiveArray::from_iter(0..n as i32).into_array(); + + let st = StructArray::from_fields(&[("strings", strings), ("numbers", numbers)]).unwrap(); + + let mut buf = ByteBufferMut::empty(); + let summary = SESSION + .write_options() + .write(&mut buf, st.to_array_stream()) + .await?; + + let footer = summary.footer(); + let segment_specs = footer.segment_map(); + let root = footer.layout(); + + // Walk the layout tree and find all dict layouts. + // Verify codes segments come before values segments in byte order within each run. + fn check_dict_ordering(layout: &dyn Layout, segment_specs: &[SegmentSpec]) { + if layout.encoding_id().as_ref() == "vortex.dict" { + // child 0 = values, child 1 = codes + let values_offsets = + collect_segment_offsets(layout.child(0).unwrap().as_ref(), segment_specs); + let codes_offsets = + collect_segment_offsets(layout.child(1).unwrap().as_ref(), segment_specs); + + assert_offsets_ordered( + &codes_offsets, + &values_offsets, + "dict: codes should come before values", + ); + } + + for child in layout.children().unwrap() { + check_dict_ordering(child.as_ref(), segment_specs); + } + } + + check_dict_ordering(root.as_ref(), segment_specs); + + Ok(()) +} + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn test_segment_ordering_zonemaps_after_data() -> VortexResult<()> { + // Create a multi-column struct with enough rows to produce zone maps. 
+ let n = 100_000; + let values: Vec<&str> = (0..n).map(|i| ["alpha", "beta", "gamma"][i % 3]).collect(); + let strings = VarBinArray::from(values).into_array(); + let numbers = PrimitiveArray::from_iter(0..n as i32).into_array(); + let floats = PrimitiveArray::from_iter((0..n).map(|i| i as f64 * 0.1)).into_array(); + + let st = StructArray::from_fields(&[ + ("strings", strings), + ("numbers", numbers), + ("floats", floats), + ]) + .unwrap(); + + let mut buf = ByteBufferMut::empty(); + let summary = SESSION + .write_options() + .write(&mut buf, st.to_array_stream()) + .await?; + + let footer = summary.footer(); + let segment_specs = footer.segment_map(); + let root = footer.layout(); + + // Find all zoned layouts and verify data segments come before zone map segments. + fn check_zoned_ordering(layout: &dyn Layout, segment_specs: &[SegmentSpec]) { + if layout.encoding_id().as_ref() == "vortex.stats" { + // child 0 = data, child 1 = zones + let data_offsets = + collect_segment_offsets(layout.child(0).unwrap().as_ref(), segment_specs); + let zones_offsets = + collect_segment_offsets(layout.child(1).unwrap().as_ref(), segment_specs); + + assert_offsets_ordered( + &data_offsets, + &zones_offsets, + "zoned: data should come before zones", + ); + } + + for child in layout.children().unwrap() { + check_zoned_ordering(child.as_ref(), segment_specs); + } + } + + check_zoned_ordering(root.as_ref(), segment_specs); + + // Additionally: all zone map segments across all columns should appear after + // all data segments across all columns. 
+ let mut all_data_offsets = Vec::new(); + let mut all_zones_offsets = Vec::new(); + + fn collect_all_zoned( + layout: &dyn Layout, + segment_specs: &[SegmentSpec], + all_data: &mut Vec<u64>, + all_zones: &mut Vec<u64>, + ) { + if layout.encoding_id().as_ref() == "vortex.stats" { + // child 0 = data, child 1 = zones + all_data.extend(collect_segment_offsets( + layout.child(0).unwrap().as_ref(), + segment_specs, + )); + all_zones.extend(collect_segment_offsets( + layout.child(1).unwrap().as_ref(), + segment_specs, + )); + return; + } + for child in layout.children().unwrap() { + collect_all_zoned(child.as_ref(), segment_specs, all_data, all_zones); + } + } + + collect_all_zoned( + root.as_ref(), + segment_specs, + &mut all_data_offsets, + &mut all_zones_offsets, + ); + + assert_offsets_ordered( + &all_data_offsets, + &all_zones_offsets, + "global: all data segments should come before all zone map segments", + ); + + Ok(()) +}