Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

200 changes: 152 additions & 48 deletions benchmarks/datafusion-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use clap::Parser;
use clap::value_parser;
use custom_labels::asynchronous::Label;
use datafusion::arrow::array::RecordBatch;
use datafusion::arrow::util::display::ArrayFormatter;
use datafusion::arrow::util::display::FormatOptions;
use datafusion::common::runtime::set_join_set_tracer;
use datafusion::datasource::listing::ListingOptions;
use datafusion::datasource::listing::ListingTable;
Expand Down Expand Up @@ -97,6 +99,14 @@ struct Args {
#[arg(long, default_value_t = false)]
explain: bool,

/// Validate query results against reference files.
#[arg(long, default_value_t = false, conflicts_with = "explain")]
validate: bool,

/// Print query results to stdout after execution.
#[arg(long, default_value_t = false, conflicts_with = "explain")]
print_results: bool,

#[arg(long, value_delimiter = ',', value_parser = value_parser!(Format))]
formats: Vec<Format>,

Expand Down Expand Up @@ -161,59 +171,106 @@ async fn main() -> anyhow::Result<()> {
Arc::new(Mutex::new(Vec::new()));
let show_metrics = args.show_metrics;

let mode = if args.explain {
BenchmarkMode::Explain
} else {
BenchmarkMode::Run {
iterations: args.iterations,
}
};

runner
.run_all_async(
&filtered_queries,
mode,
|format| {
let benchmark = &*benchmark;
async move {
let session = datafusion_bench::get_session_context();
datafusion_bench::make_object_store(&session, benchmark.data_url())?;
register_benchmark_tables(&session, benchmark, format).await?;
Ok((session, format))
}
},
|query_idx, (session, format), query| {
let plans = Arc::clone(&collected_plans);
let validate = args.validate || std::env::var("CI").is_ok();
let iterations = if args.validate { 1 } else { args.iterations };

if let Some(slt_path) = benchmark.slt_path("datafusion") {
for &format in &args.formats {
let session = Arc::new(datafusion_bench::get_session_context());
datafusion_bench::make_object_store(&session, benchmark.data_url())?;
register_benchmark_tables(&session, &*benchmark, format).await?;

runner
.run_slt_async(
&slt_path,
"datafusion",
format,
iterations,
validate,
args.queries.as_ref(),
args.exclude_queries.as_ref(),
|sql| {
let session = Arc::clone(&session);
let sql = sql.to_string();
async move {
session.sql(&sql).await?.collect().await?;
Ok(())
}
},
|query| {
let session = Arc::clone(&session);
let plans = Arc::clone(&collected_plans);
Box::pin(async move {
let timer = Instant::now();
let (batches, plan) = execute_query(&session, query).await?;
let time = timer.elapsed();

if show_metrics {
let mut plans_mut = plans.lock();
plans_mut.push((0, format, plan.clone()));
}

let labelset = set_labels(benchmark_name.clone(), query_idx, *format);
anyhow::Ok((Some(time), DataFusionQueryResult(batches)))
})
},
)
.await?;
}
} else {
let mode = if args.explain {
BenchmarkMode::Explain
} else {
BenchmarkMode::Run {
iterations,
validate,
print_results: args.print_results,
}
};

Box::pin(
runner
.run_all_async(
&filtered_queries,
mode,
|format| {
let benchmark = &*benchmark;
async move {
let timer = Instant::now();
let (batches, plan) = execute_query(session, query)
.with_labelset(get_labelset_from_global())
.await?;
let time = timer.elapsed();

// Store plan for metrics (only store once per query/format combination)
if show_metrics {
let mut plans_mut = plans.lock();
// Only store if we don't already have this query/format combo
if !plans_mut
.iter()
.any(|(idx, f, _)| *idx == query_idx && *f == *format)
{
plans_mut.push((query_idx, *format, plan.clone()));
let session = datafusion_bench::get_session_context();
datafusion_bench::make_object_store(&session, benchmark.data_url())?;
register_benchmark_tables(&session, benchmark, format).await?;
Ok((session, format))
}
},
|query_idx, (session, format), query| {
let plans = Arc::clone(&collected_plans);

let labelset = set_labels(benchmark_name.clone(), query_idx, *format);

Box::pin(
async move {
let timer = Instant::now();
let (batches, plan) = execute_query(session, query)
.with_labelset(get_labelset_from_global())
.await?;
let time = timer.elapsed();

if show_metrics {
let mut plans_mut = plans.lock();
if !plans_mut
.iter()
.any(|(idx, f, _)| *idx == query_idx && *f == *format)
{
plans_mut.push((query_idx, *format, plan.clone()));
}
}
}

anyhow::Ok((Some(time), DataFusionQueryResult(batches)))
}
.with_labelset(labelset),
)
},
)
.await?;
anyhow::Ok((Some(time), DataFusionQueryResult(batches)))
}
.with_labelset(labelset),
)
},
)
.await?;
}

if !args.explain {
// Print metrics if requested
Expand Down Expand Up @@ -400,6 +457,53 @@ impl BenchmarkQueryResult for DataFusionQueryResult {
.map(|d| d.to_string())
.unwrap_or_else(|e| format!("<error: {e}>"))
}

/// Return the column names and stringified per-row cell values of this
/// result, delegating the Arrow-to-string conversion to
/// `extract_record_batch_rows`.
fn result_rows(&self) -> (Vec<String>, Vec<Vec<String>>) {
    extract_record_batch_rows(&self.0)
}
}

/// Extract raw string values from Arrow `RecordBatch`es.
///
/// Uses `ArrayFormatter` to produce `to_string()` values for every cell.
/// NULL cells are represented as `"NULL"`. No type-specific normalization is
/// applied — each engine's per-engine `.slt` reference files contain the
/// exact expected output.
/// Extract raw string values from Arrow `RecordBatch`es.
///
/// Returns `(column_names, rows)`. Column names come from the first batch's
/// schema (empty when there are no batches). Each cell is rendered with
/// `ArrayFormatter::value(..).to_string()`; NULL cells are represented as
/// `"NULL"`. No type-specific normalization is applied — each engine's
/// per-engine `.slt` reference files contain the exact expected output.
fn extract_record_batch_rows(batches: &[RecordBatch]) -> (Vec<String>, Vec<Vec<String>>) {
    use vortex::error::VortexExpect;

    // All batches of one query result share a schema, so the first batch
    // suffices for the header.
    let column_names: Vec<String> = batches
        .first()
        .map(|b| {
            b.schema()
                .fields()
                .iter()
                .map(|f| f.name().clone())
                .collect()
        })
        .unwrap_or_default();

    let format_opts = FormatOptions::default().with_null("NULL");

    // The total row count is known up front — preallocate instead of
    // repeatedly growing the Vec while pushing rows.
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    let mut rows = Vec::with_capacity(total_rows);

    for batch in batches {
        // One formatter per column, reused for every row in the batch.
        let formatters: Vec<ArrayFormatter> = batch
            .columns()
            .iter()
            .map(|col| ArrayFormatter::try_new(col.as_ref(), &format_opts))
            .collect::<Result<Vec<_>, _>>()
            .vortex_expect("ArrayFormatter creation should not fail");

        for row_idx in 0..batch.num_rows() {
            let mut row = Vec::with_capacity(batch.num_columns());
            for formatter in &formatters {
                row.push(formatter.value(row_idx).to_string());
            }
            rows.push(row);
        }
    }

    (column_names, rows)
}

pub async fn execute_query(
Expand Down
75 changes: 64 additions & 11 deletions benchmarks/duckdb-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,25 +199,78 @@ impl DuckClient {
let time_instant = Instant::now();
let result = self.connection().query(query)?;
let query_time = time_instant.elapsed();
Ok((Some(query_time), DuckQueryResult(result)))
Ok((Some(query_time), DuckQueryResult::from_query_result(result)))
}
}

/// Wrapper around DuckDB's `QueryResult` implementing `BenchmarkQueryResult`.
pub struct DuckQueryResult(pub QueryResult);
/// Eagerly materialized wrapper around DuckDB query results.
///
/// Materializes the result on construction so that both `row_count()`,
/// `display()`, and `result_rows()` can be called via shared reference.
pub struct DuckQueryResult {
    // Total row count reported by DuckDB (falls back to 0 if the value
    // does not fit in usize).
    row_count: usize,
    // Concatenated human-readable rendering of every chunk, returned by
    // `display()`.
    display_string: String,
    // Column names in schema order.
    column_names: Vec<String>,
    // Stringified cell values, one inner Vec per row; NULL cells are
    // rendered as the literal string "NULL".
    normalized_rows: Vec<Vec<String>>,
}

impl DuckQueryResult {
    /// Consume a DuckDB `QueryResult` and materialize its contents so the
    /// result can later be inspected through shared references.
    pub fn from_query_result(result: QueryResult) -> Self {
        let row_count = usize::try_from(result.row_count()).unwrap_or(0);
        let col_count = usize::try_from(result.column_count()).unwrap_or(0);

        // Header: one name per column, in schema order.
        let column_names: Vec<String> = (0..col_count)
            .map(|col_idx| {
                result
                    .column_name(col_idx)
                    .vortex_expect("column name should be valid")
                    .to_string()
            })
            .collect();

        let mut display_string = String::new();
        let mut normalized_rows = Vec::new();

        for chunk in result {
            // Whole-chunk rendering feeds the display() output.
            let rendered_chunk =
                String::try_from(chunk.deref()).unwrap_or_else(|_| "<error>".to_string());
            display_string.push_str(&rendered_chunk);

            let chunk_rows = chunk.len();
            let chunk_cols = chunk.column_count();
            for row_idx in 0..chunk_rows {
                let mut cells = Vec::with_capacity(chunk_cols);
                for col_idx in 0..chunk_cols {
                    let vector = chunk.get_vector(col_idx);
                    // NULL cells are rendered as the literal string "NULL".
                    let cell = vector
                        .get_value(row_idx, chunk_rows)
                        .map_or_else(|| "NULL".to_string(), |value| value.to_string());
                    cells.push(cell);
                }
                normalized_rows.push(cells);
            }
        }

        Self {
            row_count,
            display_string,
            column_names,
            normalized_rows,
        }
    }
}

// NOTE(review): the scraped diff flattened removed and added lines into one
// body here (e.g. both `usize::try_from(self.0.row_count())` and
// `self.row_count` were present). `self.0` no longer exists on the
// materialized struct, so this reconstructs the post-change impl.
impl BenchmarkQueryResult for DuckQueryResult {
    /// Number of rows in the materialized result.
    fn row_count(&self) -> usize {
        self.row_count
    }

    /// Human-readable rendering of the result, built once at construction.
    fn display(self) -> String {
        self.display_string
    }

    /// Column names plus stringified cell values, for result validation.
    fn result_rows(&self) -> (Vec<String>, Vec<Vec<String>>) {
        (self.column_names.clone(), self.normalized_rows.clone())
    }
}
Loading
Loading