Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datafusion-cli/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use datafusion::catalog::{Session, TableFunctionImpl};
use datafusion::common::{Column, plan_err};
use datafusion::common::plan_err;
use datafusion::datasource::TableProvider;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::error::Result;
Expand Down Expand Up @@ -329,7 +329,7 @@ impl TableFunctionImpl for ParquetMetadataFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
let filename = match exprs.first() {
Some(Expr::Literal(ScalarValue::Utf8(Some(s)), _)) => s, // single quote: parquet_metadata('x.parquet')
Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet")
Some(Expr::Column(col)) => &col.name, // double quote: parquet_metadata("x.parquet")
_ => {
return plan_err!(
"parquet_metadata requires string argument as its input"
Expand Down
16 changes: 8 additions & 8 deletions datafusion/catalog-listing/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryStreamExt, stream::BoxStream};
use log::{debug, trace};

use datafusion_common::DFSchema;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{Column, DFSchema};
use datafusion_expr::{Expr, Volatility};
use datafusion_physical_expr::create_physical_expr;
use object_store::path::Path;
Expand All @@ -51,8 +51,8 @@ use object_store::{ObjectMeta, ObjectStore};
pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
let mut is_applicable = true;
expr.apply(|expr| match expr {
Expr::Column(Column { name, .. }) => {
is_applicable &= col_names.contains(&name.as_str());
Expr::Column(col) => {
is_applicable &= col_names.contains(&col.name.as_str());
if is_applicable {
Ok(TreeNodeRecursion::Jump)
} else {
Expand All @@ -61,7 +61,7 @@ pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
}
Expr::Literal(_, _)
| Expr::Alias(_)
| Expr::OuterReferenceColumn(_, _)
| Expr::OuterReferenceColumn(_)
| Expr::ScalarVariable(_, _)
| Expr::Not(_)
| Expr::IsNotNull(_)
Expand Down Expand Up @@ -251,13 +251,13 @@ fn populate_partition_values<'a>(
if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = filter {
match op {
Operator::Eq => match (left.as_ref(), right.as_ref()) {
(Expr::Column(Column { name, .. }), Expr::Literal(val, _))
| (Expr::Literal(val, _), Expr::Column(Column { name, .. })) => {
(Expr::Column(col), Expr::Literal(val, _))
| (Expr::Literal(val, _), Expr::Column(col)) => {
if partition_values
.insert(name, PartitionValue::Single(val.to_string()))
.insert(&col.name, PartitionValue::Single(val.to_string()))
.is_some()
{
partition_values.insert(name, PartitionValue::Multi);
partition_values.insert(&col.name, PartitionValue::Multi);
}
}
_ => {}
Expand Down
44 changes: 33 additions & 11 deletions datafusion/common/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ use std::fmt;
#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Column {
/// relation/table reference.
pub relation: Option<TableReference>,
///
/// Boxed to reduce the size of the `Column` struct (and thus `Expr` enum).
/// `TableReference` is ~56 bytes but `Box<TableReference>` is only 8 bytes.
/// This keeps `Column` small enough to be stored inline in `Expr` without
/// boxing the entire `Column`, avoiding heap allocation on the hottest path.
pub relation: Option<Box<TableReference>>,
/// field/column name.
pub name: String,
/// Original source code location, if known
Expand Down Expand Up @@ -57,7 +62,7 @@ impl Column {
name: impl Into<String>,
) -> Self {
Self {
relation: relation.map(|r| r.into()),
relation: relation.map(|r| Box::new(r.into())),
name: name.into(),
spans: Spans::new(),
}
Expand Down Expand Up @@ -91,24 +96,24 @@ impl Column {
let (relation, name) = match idents.len() {
1 => (None, idents.remove(0)),
2 => (
Some(TableReference::Bare {
Some(Box::new(TableReference::Bare {
table: idents.remove(0).into(),
}),
})),
idents.remove(0),
),
3 => (
Some(TableReference::Partial {
Some(Box::new(TableReference::Partial {
schema: idents.remove(0).into(),
table: idents.remove(0).into(),
}),
})),
idents.remove(0),
),
4 => (
Some(TableReference::Full {
Some(Box::new(TableReference::Full {
catalog: idents.remove(0).into(),
schema: idents.remove(0).into(),
table: idents.remove(0).into(),
}),
})),
idents.remove(0),
),
// any expression that failed to parse or has more than 4 period delimited
Expand Down Expand Up @@ -321,7 +326,7 @@ impl Column {
/// Qualifies the column with the given table reference.
pub fn with_relation(&self, relation: TableReference) -> Self {
Self {
relation: Some(relation),
relation: Some(Box::new(relation)),
..self.clone()
}
}
Expand Down Expand Up @@ -350,14 +355,22 @@ impl From<String> for Column {
/// Creates a column from an optional qualifier and the field's name.
impl From<(Option<&TableReference>, &Field)> for Column {
fn from((relation, field): (Option<&TableReference>, &Field)) -> Self {
Self::new(relation.cloned(), field.name())
Self {
relation: relation.map(|r| Box::new(r.clone())),
name: field.name().to_string(),
spans: Spans::new(),
}
}
}

/// Creates a column from an optional qualifier and the field's name.
impl From<(Option<&TableReference>, &FieldRef)> for Column {
fn from((relation, field): (Option<&TableReference>, &FieldRef)) -> Self {
Self::new(relation.cloned(), field.name())
Self {
relation: relation.map(|r| Box::new(r.clone())),
name: field.name().to_string(),
spans: Spans::new(),
}
}
}

Expand All @@ -380,8 +393,17 @@ impl fmt::Display for Column {
mod tests {
use super::*;
use arrow::datatypes::{DataType, SchemaBuilder};
use std::mem::size_of;
use std::sync::Arc;

#[test]
fn test_column_size() {
// Pins the in-memory size of `Column`: the assertion below fails whenever
// the struct's size changes, so any growth has to be a deliberate decision.
//
// Column should be small enough to fit inline in Expr without boxing.
// Boxing TableReference (56 bytes -> 8 bytes pointer) shrinks Column
// from 104 to 56 bytes, keeping it inline in Expr.
//
// NOTE(review): the 8-byte pointer and the expected value of 56 assume a
// 64-bit target — confirm whether this test is ever run on 32-bit targets.
assert_eq!(size_of::<Column>(), 56);
}

fn create_qualified_schema(qualifier: &str, names: Vec<&str>) -> Result<DFSchema> {
let mut schema_builder = SchemaBuilder::new();
schema_builder.extend(
Expand Down
11 changes: 6 additions & 5 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ impl DFSchema {
/// See [Self::index_of_column] for a version that returns an error if the
/// column is not found
pub fn maybe_index_of_column(&self, col: &Column) -> Option<usize> {
self.index_of_column_by_name(col.relation.as_ref(), &col.name)
self.index_of_column_by_name(col.relation.as_deref(), &col.name)
}

/// Find the index of the column with the given qualifier and name,
Expand All @@ -408,13 +408,14 @@ impl DFSchema {
/// See [Self::maybe_index_of_column] for a version that returns `None` if
/// the column is not found
pub fn index_of_column(&self, col: &Column) -> Result<usize> {
self.maybe_index_of_column(col)
.ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self))
self.maybe_index_of_column(col).ok_or_else(|| {
field_not_found(col.relation.clone().map(|r| *r), &col.name, self)
})
}

/// Check if the column is in the current schema
pub fn is_column_from_schema(&self, col: &Column) -> bool {
self.index_of_column_by_name(col.relation.as_ref(), &col.name)
self.index_of_column_by_name(col.relation.as_deref(), &col.name)
.is_some()
}

Expand Down Expand Up @@ -557,7 +558,7 @@ impl DFSchema {
&self,
column: &Column,
) -> Result<(Option<&TableReference>, &FieldRef)> {
self.qualified_field_with_name(column.relation.as_ref(), &column.name)
self.qualified_field_with_name(column.relation.as_deref(), &column.name)
}

/// Find if the field exists with the given name
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2466,15 +2466,15 @@ impl DataFrame {
if cols.contains(field) {
// Try to cast fill value to column type. If the cast fails, fallback to the original column.
match value.clone().cast_to(field.data_type()) {
Ok(fill_value) => Expr::Alias(Alias {
Ok(fill_value) => Expr::Alias(Box::new(Alias {
expr: Box::new(Expr::ScalarFunction(ScalarFunction {
func: coalesce(),
args: vec![col(field.name()), lit(fill_value)],
})),
relation: None,
name: field.name().to_string(),
metadata: None,
}),
})),
Err(_) => col(field.name()),
}
} else {
Expand Down
16 changes: 10 additions & 6 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::memory::MemorySourceConfig;
use datafusion_expr::dml::{CopyTo, InsertOp};
use datafusion_expr::expr::{
AggregateFunction, AggregateFunctionParams, Alias, GroupingSet, NullTreatment,
AggregateFunction, AggregateFunctionParams, GroupingSet, NullTreatment,
WindowFunction, WindowFunctionParams, physical_name,
};
use datafusion_expr::expr_rewriter::unnormalize_cols;
Expand Down Expand Up @@ -719,9 +719,9 @@ impl DefaultPhysicalPlanner {
} = &window_fun.as_ref().params;
generate_sort_key(partition_by, order_by)
}
Expr::Alias(Alias { expr, .. }) => {
Expr::Alias(alias) => {
// Convert &Box<T> to &T
match &**expr {
match alias.expr.as_ref() {
Expr::WindowFunction(window_fun) => {
let WindowFunctionParams {
partition_by,
Expand Down Expand Up @@ -2151,7 +2151,7 @@ pub fn create_window_expr(
) -> Result<Arc<dyn WindowExpr>> {
// unpack aliased logical expressions, e.g. "sum(col) over () as total"
let (name, e) = match e {
Expr::Alias(Alias { expr, name, .. }) => (name.clone(), expr.as_ref()),
Expr::Alias(alias) => (alias.name.clone(), alias.expr.as_ref()),
_ => (e.schema_name().to_string(), e),
};
create_window_expr_with_name(e, name, logical_schema, execution_props)
Expand Down Expand Up @@ -2244,9 +2244,13 @@ pub fn create_aggregate_expr_and_maybe_filter(
// Some functions like `count_all()` create internal aliases, so
// unwrap all alias layers to get to the underlying aggregate function.
let (name, human_display, e) = match e {
Expr::Alias(Alias { name, .. }) => {
Expr::Alias(alias) => {
let unaliased = e.clone().unalias_nested().data;
(Some(name.clone()), e.human_display().to_string(), unaliased)
(
Some(alias.name.clone()),
e.human_display().to_string(),
unaliased,
)
}
Expr::AggregateFunction(_) => (
Some(e.schema_name().to_string()),
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/tests/dataframe/dataframe_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,11 +442,11 @@ async fn test_fn_approx_percentile_cont() -> Result<()> {
");

// the arg2 parameter is a complex expr, but it can be evaluated to the literal value
let alias_expr = Expr::Alias(Alias::new(
let alias_expr = Expr::Alias(Box::new(Alias::new(
cast(lit(0.5), DataType::Float32),
None::<&str>,
"arg_2".to_string(),
));
)));
let expr = approx_percentile_cont(col("b").sort(true, false), alias_expr, None);
let df = create_test_table().await?;
let batches = df.aggregate(vec![], vec![expr]).unwrap().collect().await?;
Expand All @@ -462,11 +462,11 @@ async fn test_fn_approx_percentile_cont() -> Result<()> {
"
);

let alias_expr = Expr::Alias(Alias::new(
let alias_expr = Expr::Alias(Box::new(Alias::new(
cast(lit(0.1), DataType::Float32),
None::<&str>,
"arg_2".to_string(),
));
)));
let expr = approx_percentile_cont(col("b").sort(false, false), alias_expr, None);
let df = create_test_table().await?;
let batches = df.aggregate(vec![], vec![expr]).unwrap().collect().await?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6162,7 +6162,7 @@ async fn test_alias() -> Result<()> {
.alias("table_alias")?;
// All output column qualifiers are changed to "table_alias"
df.schema().columns().iter().for_each(|c| {
assert_eq!(c.relation, Some("table_alias".into()));
assert_eq!(c.relation, Some(Box::new("table_alias".into())));
});

let plan = df
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/user_defined/expr_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ impl ExprPlanner for MyCustomPlanner {
})))
}
BinaryOperator::Question => {
Ok(PlannerResult::Planned(Expr::Alias(Alias::new(
Ok(PlannerResult::Planned(Expr::Alias(Box::new(Alias::new(
Expr::Literal(ScalarValue::Boolean(Some(true)), None),
None::<&str>,
format!("{} ? {}", expr.left, expr.right),
))))
)))))
}
_ => Ok(PlannerResult::Original(expr)),
}
Expand Down
Loading
Loading