Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 175 additions & 0 deletions vortex-file/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ use vortex_buffer::ByteBufferMut;
use vortex_buffer::buffer;
use vortex_error::VortexResult;
use vortex_io::session::RuntimeSession;
use vortex_layout::Layout;
use vortex_layout::scan::scan_builder::ScanBuilder;
use vortex_layout::session::LayoutSession;
use vortex_session::VortexSession;
Expand All @@ -71,6 +72,7 @@ use crate::V1_FOOTER_FBS_SIZE;
use crate::VERSION;
use crate::VortexFile;
use crate::WriteOptionsSessionExt;
use crate::footer::SegmentSpec;

static SESSION: LazyLock<VortexSession> = LazyLock::new(|| {
let mut session = VortexSession::empty()
Expand Down Expand Up @@ -1696,3 +1698,176 @@ async fn timestamp_unit_mismatch_errors_with_constant_children()

Ok(())
}

/// Gather the byte offset of every segment reachable from `layout`,
/// walking the layout tree depth-first.
fn collect_segment_offsets(layout: &dyn Layout, segment_specs: &[SegmentSpec]) -> Vec<u64> {
    let mut offsets = Vec::new();
    collect_segment_offsets_inner(layout, segment_specs, &mut offsets);
    offsets
}

/// Depth-first worker for `collect_segment_offsets`: records the offsets of
/// the segments owned directly by this node, then recurses into each child.
fn collect_segment_offsets_inner(
    layout: &dyn Layout,
    segment_specs: &[SegmentSpec],
    out: &mut Vec<u64>,
) {
    // Segments referenced directly by this layout node.
    for seg_id in layout.segment_ids() {
        let spec = &segment_specs[*seg_id as usize];
        out.push(spec.offset);
    }
    // Everything reachable through the children, in order.
    for child in layout.children().unwrap() {
        collect_segment_offsets_inner(child.as_ref(), segment_specs, out);
    }
}

/// Panic unless every offset in `before` is strictly smaller than every
/// offset in `after`. Vacuously succeeds when either slice is empty, since
/// there is nothing to compare.
fn assert_offsets_ordered(before: &[u64], after: &[u64], context: &str) {
    let (Some(&max_before), Some(&min_after)) = (before.iter().max(), after.iter().min()) else {
        // At least one side is empty: the ordering constraint holds trivially.
        return;
    };
    assert!(
        max_before < min_after,
        "{context}: expected all 'before' offsets < all 'after' offsets, \
        but max before = {max_before} >= min after = {min_after}"
    );
}

#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn test_segment_ordering_dict_codes_before_values() -> VortexResult<()> {
    // Recursive layout walk: whenever a dict layout is found, assert that all
    // of its codes child's segments precede its values child's segments in
    // on-disk byte order.
    fn check_dict_ordering(layout: &dyn Layout, segment_specs: &[SegmentSpec]) {
        if layout.encoding_id().as_ref() == "vortex.dict" {
            // child 0 = values, child 1 = codes
            let values_offsets =
                collect_segment_offsets(layout.child(0).unwrap().as_ref(), segment_specs);
            let codes_offsets =
                collect_segment_offsets(layout.child(1).unwrap().as_ref(), segment_specs);
            assert_offsets_ordered(
                &codes_offsets,
                &values_offsets,
                "dict: codes should come before values",
            );
        }

        // Descend into every child regardless of this node's encoding.
        for child in layout.children().unwrap() {
            check_dict_ordering(child.as_ref(), segment_specs);
        }
    }

    // Repeating three distinct strings across many rows gives the writer a
    // low-cardinality column to dict-encode; the integers are a plain sibling.
    let row_count = 100_000;
    let words: Vec<&str> = (0..row_count)
        .map(|i| ["alpha", "beta", "gamma"][i % 3])
        .collect();
    let strings = VarBinArray::from(words).into_array();
    let numbers = PrimitiveArray::from_iter(0..row_count as i32).into_array();
    let st = StructArray::from_fields(&[("strings", strings), ("numbers", numbers)]).unwrap();

    // Write the struct through the shared session, then inspect the footer.
    let mut buf = ByteBufferMut::empty();
    let summary = SESSION
        .write_options()
        .write(&mut buf, st.to_array_stream())
        .await?;
    let footer = summary.footer();

    check_dict_ordering(footer.layout().as_ref(), footer.segment_map());

    Ok(())
}

/// End-to-end check that the file writer places zone-map ("vortex.stats")
/// segments after the data segments they summarize — both within each zoned
/// layout and globally across all columns of the file.
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn test_segment_ordering_zonemaps_after_data() -> VortexResult<()> {
    // Create a multi-column struct with enough rows to produce zone maps.
    let n = 100_000;
    let values: Vec<&str> = (0..n).map(|i| ["alpha", "beta", "gamma"][i % 3]).collect();
    let strings = VarBinArray::from(values).into_array();
    let numbers = PrimitiveArray::from_iter(0..n as i32).into_array();
    let floats = PrimitiveArray::from_iter((0..n).map(|i| i as f64 * 0.1)).into_array();

    let st = StructArray::from_fields(&[
        ("strings", strings),
        ("numbers", numbers),
        ("floats", floats),
    ])
    .unwrap();

    // Write through the shared session; the summary exposes the file footer,
    // which carries both the layout tree and the segment map we inspect below.
    let mut buf = ByteBufferMut::empty();
    let summary = SESSION
        .write_options()
        .write(&mut buf, st.to_array_stream())
        .await?;

    let footer = summary.footer();
    let segment_specs = footer.segment_map();
    let root = footer.layout();

    // Find all zoned layouts and verify data segments come before zone map segments.
    fn check_zoned_ordering(layout: &dyn Layout, segment_specs: &[SegmentSpec]) {
        if layout.encoding_id().as_ref() == "vortex.stats" {
            // child 0 = data, child 1 = zones
            let data_offsets =
                collect_segment_offsets(layout.child(0).unwrap().as_ref(), segment_specs);
            let zones_offsets =
                collect_segment_offsets(layout.child(1).unwrap().as_ref(), segment_specs);

            assert_offsets_ordered(
                &data_offsets,
                &zones_offsets,
                "zoned: data should come before zones",
            );
        }

        // Recurse into every node's children (including a stats node's) so
        // nested zoned layouts are found too.
        for child in layout.children().unwrap() {
            check_zoned_ordering(child.as_ref(), segment_specs);
        }
    }

    check_zoned_ordering(root.as_ref(), segment_specs);

    // Additionally: all zone map segments across all columns should appear after
    // all data segments across all columns.
    let mut all_data_offsets = Vec::new();
    let mut all_zones_offsets = Vec::new();

    // Accumulate the data/zones offsets of every stats layout in the tree.
    // Unlike `check_zoned_ordering`, this stops descending at a stats node:
    // its two children are classified wholesale via the early `return`.
    fn collect_all_zoned(
        layout: &dyn Layout,
        segment_specs: &[SegmentSpec],
        all_data: &mut Vec<u64>,
        all_zones: &mut Vec<u64>,
    ) {
        if layout.encoding_id().as_ref() == "vortex.stats" {
            // child 0 = data, child 1 = zones
            all_data.extend(collect_segment_offsets(
                layout.child(0).unwrap().as_ref(),
                segment_specs,
            ));
            all_zones.extend(collect_segment_offsets(
                layout.child(1).unwrap().as_ref(),
                segment_specs,
            ));
            return;
        }
        for child in layout.children().unwrap() {
            collect_all_zoned(child.as_ref(), segment_specs, all_data, all_zones);
        }
    }

    collect_all_zoned(
        root.as_ref(),
        segment_specs,
        &mut all_data_offsets,
        &mut all_zones_offsets,
    );

    assert_offsets_ordered(
        &all_data_offsets,
        &all_zones_offsets,
        "global: all data segments should come before all zone map segments",
    );

    Ok(())
}
34 changes: 19 additions & 15 deletions vortex-layout/src/layouts/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::sync::atomic::Ordering;
use async_stream::try_stream;
use async_trait::async_trait;
use futures::StreamExt as _;
use futures::pin_mut;
use vortex_array::ArrayContext;
use vortex_error::VortexResult;
use vortex_io::runtime::Handle;
Expand Down Expand Up @@ -44,20 +45,18 @@ impl LayoutStrategy for BufferedStrategy {
&self,
ctx: ArrayContext,
segment_sink: SegmentSinkRef,
mut stream: SendableSequentialStream,
mut eof: SequencePointer,
stream: SendableSequentialStream,
eof: SequencePointer,
handle: Handle,
) -> VortexResult<LayoutRef> {
let dtype = stream.dtype().clone();
let buffer_size = self.buffer_size;

// We have no choice but to put our final buffers here!
// We cannot hold on to sequence ids across iterations of the stream, otherwise we can
// cause deadlocks with other columns that are waiting for us to flush.
let mut final_flush = eof.split_off();

let buffered_bytes_counter = self.buffered_bytes.clone();
let buffered_stream = try_stream! {
let stream = stream.peekable();
pin_mut!(stream);

let mut nbytes = 0u64;
let mut chunks = VecDeque::new();

Expand All @@ -68,11 +67,23 @@ impl LayoutStrategy for BufferedStrategy {
buffered_bytes_counter.fetch_add(chunk_size, Ordering::Relaxed);
chunks.push_back(chunk);

// If this is the last element, flush everything.
if stream.as_mut().peek().await.is_none() {
let mut sequence_ptr = sequence_id.descend();
while let Some(chunk) = chunks.pop_front() {
let chunk_size = chunk.nbytes();
nbytes -= chunk_size;
buffered_bytes_counter.fetch_sub(chunk_size, Ordering::Relaxed);
yield (sequence_ptr.advance(), chunk)
}
break;
}

if nbytes < 2 * buffer_size {
continue;
};

// Wait until we're at 2x the buffer size before flushing 1x the buffer size
// Wait until we're at 2x the buffer size before flushing 1x the buffer size.
// This avoids small tail stragglers being flushed at the end of the file.
let mut sequence_ptr = sequence_id.descend();
while nbytes > buffer_size {
Expand All @@ -85,13 +96,6 @@ impl LayoutStrategy for BufferedStrategy {
yield (sequence_ptr.advance(), chunk)
}
}

// Now the input stream has ended, flush everything
while let Some(chunk) = chunks.pop_front() {
let chunk_size = chunk.nbytes();
buffered_bytes_counter.fetch_sub(chunk_size, Ordering::Relaxed);
yield (final_flush.advance(), chunk)
}
};

self.child
Expand Down
Loading