Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmarks/src/sort_tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ impl RunOpt {
let options = if self.sorted {
let key_column_name = schema.fields()[0].name();
options
.with_file_sort_order(vec![vec![col(key_column_name).sort(true, false)]])
.with_file_sort_order(vec![vec![col(key_column_name).sort().asc().nulls_last()]])
} else {
options
};
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/tpcds/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ impl RunOpt {
let options = if self.sorted {
let key_column_name = schema.fields()[0].name();
options
.with_file_sort_order(vec![vec![col(key_column_name).sort(true, false)]])
.with_file_sort_order(vec![vec![col(key_column_name).sort().asc().nulls_last()]])
} else {
options
};
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ impl RunOpt {
let options = if self.sorted {
let key_column_name = schema.fields()[0].name();
options
.with_file_sort_order(vec![vec![col(key_column_name).sort(true, false)]])
.with_file_sort_order(vec![vec![col(key_column_name).sort().asc().nulls_last()]])
} else {
options
};
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ mod tests {
"file_size_bytes",
"etag",
])?
.sort(vec![col("filename").sort(true, false)])?;
.sort(vec![col("filename").sort().asc().nulls_last()])?;
let rbs = df.collect().await?;
assert_snapshot!(batches_to_string(&rbs),@r"
+---------------------+-----------+-----------------+------+
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ mod non_windows {
]));

// Specify the ordering:
let order = vec![vec![datafusion::logical_expr::col("a1").sort(true, false)]];
let order = vec![vec![datafusion::logical_expr::col("a1").sort().asc().nulls_last()]];

let provider = fifo_table(schema.clone(), fifo_path, order.clone());
ctx.register_table("fifo", provider)?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/dataframe/cache_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub async fn cache_dataframe_with_custom_logic() -> Result<()> {
.await?;

let df1 = df_cached.clone().filter(col("car").eq(lit("red")))?;
let df2 = df1.clone().sort(vec![col("car").sort(true, false)])?;
let df2 = df1.clone().sort(vec![col("car").sort().asc().nulls_last()])?;

// should see log for caching only once
df_cached.show().await?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/query_planning/expr_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ fn expr_fn_demo() -> Result<()> {
// such as `FIRST_VALUE(price FILTER quantity > 100 ORDER BY ts )
let agg = first_value
.call(vec![col("price")])
.order_by(vec![col("ts").sort(false, false)])
.order_by(vec![col("ts").sort().desc().nulls_last()])
.filter(col("quantity").gt(lit(100)))
.build()?; // build the aggregate
assert_eq!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ async fn query_parquet_demo() -> Result<()> {
)?
// Directly parsing the SQL text into a sort expression is not supported yet, so
// construct it programmatically
.sort(vec![col("car").sort(false, false)])?
.sort(vec![col("car").sort().desc().nulls_last()])?
.limit(0, Some(1))?;

let result = df.collect().await?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/udf/advanced_udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ pub async fn advanced_udwf() -> Result<()> {
let window_expr = smooth_it
.call(vec![col("speed")]) // smooth_it(speed)
.partition_by(vec![col("car")]) // PARTITION BY car
.order_by(vec![col("time").sort(true, true)]) // ORDER BY time ASC
.order_by(vec![col("time").sort().asc().nulls_first()]) // ORDER BY time ASC
.window_frame(WindowFrame::new(None))
.build()?;
let df = ctx.table("cars").await?.window(vec![window_expr])?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/udf/simple_udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ pub async fn simple_udwf() -> Result<()> {
let window_expr = smooth_it
.call(vec![col("speed")]) // smooth_it(speed)
.partition_by(vec![col("car")]) // PARTITION BY car
.order_by(vec![col("time").sort(true, true)]) // ORDER BY time ASC
.order_by(vec![col("time").sort().asc().nulls_first()]) // ORDER BY time ASC
.window_frame(WindowFrame::new(None))
.build()?;
let df = ctx.table("cars").await?.window(vec![window_expr])?;
Expand Down
6 changes: 3 additions & 3 deletions datafusion-examples/src/utils/csv_to_parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl ParquetTemp {
/// let parquet_dir = write_csv_to_parquet(&ctx, &csv_path).await?;
/// let df = ctx.read_parquet(parquet_dir.path_str()?, ParquetReadOptions::default()).await?;
/// let rows = df
/// .sort(vec![col("speed").sort(true, true)])?
/// .sort(vec![col("speed").sort().asc().nulls_first()])?
/// .limit(0, Some(5))?;
/// assert_batches_eq!(
/// &[
Expand Down Expand Up @@ -146,7 +146,7 @@ mod tests {
.read_parquet(parquet_dir.path_str()?, ParquetReadOptions::default())
.await?;

let rows = df.sort(vec![col("speed").sort(true, true)])?;
let rows = df.sort(vec![col("speed").sort().asc().nulls_first()])?;
assert_batches_eq!(
&[
"+-------+-------+---------------------+",
Expand Down Expand Up @@ -198,7 +198,7 @@ mod tests {
.read_parquet(parquet_dir.path_str()?, ParquetReadOptions::default())
.await?;

let rows = df.sort(vec![col("values").sort(true, true)])?;
let rows = df.sort(vec![col("values").sort().asc().nulls_first()])?;
assert_batches_eq!(
&[
"+------------+--------------------------------------+-------------+-------+",
Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog-listing/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ impl ListingOptions {
/// # use datafusion_datasource_parquet::file_format::ParquetFormat;
///
/// // Tell datafusion that the files are sorted by column "a"
/// let file_sort_order = vec![vec![col("a").sort(true, true)]];
/// let file_sort_order = vec![vec![col("a").sort().asc().nulls_first()]];
///
/// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
/// .with_file_sort_order(file_sort_order.clone());
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/benches/preserve_file_partitioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ fn preserve_order_bench(
GROUP BY f_dkey \
ORDER BY f_dkey";

let file_sort_order = vec![vec![col("f_dkey").sort(true, false)]];
let file_sort_order = vec![vec![col("f_dkey").sort().asc().nulls_last()]];

run_benchmark(
c,
Expand Down Expand Up @@ -643,7 +643,7 @@ fn preserve_order_join_bench(
GROUP BY f.f_dkey \
ORDER BY f.f_dkey";

let file_sort_order = vec![vec![col("f_dkey").sort(true, false)]];
let file_sort_order = vec![vec![col("f_dkey").sort().asc().nulls_last()]];

run_benchmark(
c,
Expand Down Expand Up @@ -745,8 +745,8 @@ fn preserve_order_window_bench(
LIMIT 1000";

let file_sort_order = vec![vec![
col("f_dkey").sort(true, false),
col("timestamp").sort(true, false),
col("f_dkey").sort().asc().nulls_last(),
col("timestamp").sort().asc().nulls_last(),
]];

run_benchmark(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/benches/sql_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ fn register_union_order_table_generic<T>(

// tell DataFusion that the table is sorted by all columns
let sort_order = (0..num_columns)
.map(|i| col(format!("c{i}")).sort(true, true))
.map(|i| col(format!("c{i}")).sort().asc().nulls_first())
.collect::<Vec<_>>();

// create the table
Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1179,7 +1179,7 @@ impl DataFrame {
pub fn sort_by(self, expr: Vec<Expr>) -> Result<DataFrame> {
self.sort(
expr.into_iter()
.map(|e| e.sort(true, false))
.map(|e| e.sort().asc().nulls_last())
.collect::<Vec<SortExpr>>(),
)
}
Expand All @@ -1202,8 +1202,8 @@ impl DataFrame {
/// .read_csv("tests/data/example_long.csv", CsvReadOptions::new())
/// .await?;
/// let df = df.sort(vec![
/// col("a").sort(false, true), // a DESC, nulls first
/// col("b").sort(true, false), // b ASC, nulls last
/// col("a").sort().desc().nulls_first(), // a DESC, nulls first
/// col("b").sort().asc().nulls_last(), // b ASC, nulls last
/// ])?;
/// let expected = vec![
/// "+---+---+---+",
Expand Down Expand Up @@ -2023,7 +2023,7 @@ impl DataFrame {
/// // Sort the data by column "b" and write it to a new location
/// ctx.read_csv("tests/data/example.csv", CsvReadOptions::new())
/// .await?
/// .sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
/// .sort(vec![col("b").sort().asc().nulls_first()])? // sort by b asc, nulls first
/// .write_csv(
/// "output.csv",
/// DataFrameWriteOptions::new(),
Expand Down Expand Up @@ -2097,7 +2097,7 @@ impl DataFrame {
/// // Sort the data by column "b" and write it to a new location
/// ctx.read_csv("tests/data/example.csv", CsvReadOptions::new())
/// .await?
/// .sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
/// .sort(vec![col("b").sort().asc().nulls_first()])? // sort by b asc, nulls first
/// .write_json("output.json", DataFrameWriteOptions::new(), None)
/// .await?;
/// # fs::remove_file("output.json")?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/dataframe/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl DataFrame {
/// // Sort the data by column "b" and write it to a new location
/// ctx.read_csv("tests/data/example.csv", CsvReadOptions::new())
/// .await?
/// .sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
/// .sort(vec![col("b").sort().asc().nulls_first()])? // sort by b asc, nulls first
/// .write_parquet(
/// "output.parquet",
/// DataFrameWriteOptions::new(),
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ mod tests {
),
// sort expr, but non column
(
vec![vec![col("int_col").add(lit(1)).sort(true, true)]],
vec![vec![col("int_col").add(lit(1)).sort().asc().nulls_first()]],
Ok(vec![
[PhysicalSortExpr {
expr: binary(
Expand All @@ -301,7 +301,7 @@ mod tests {
),
// ok with one column
(
vec![vec![col("string_col").sort(true, false)]],
vec![vec![col("string_col").sort().asc().nulls_last()]],
Ok(vec![
[PhysicalSortExpr {
expr: physical_col("string_col", &schema).unwrap(),
Expand All @@ -316,8 +316,8 @@ mod tests {
// ok with two columns, different options
(
vec![vec![
col("string_col").sort(true, false),
col("int_col").sort(false, true),
col("string_col").sort().asc().nulls_last(),
col("int_col").sort().desc().nulls_first(),
]],
Ok(vec![
[
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2926,7 +2926,7 @@ mod tests {
.filter(col("c7").lt(lit(5_u8)))?
.project(vec![col("c1"), col("c2")])?
.aggregate(vec![col("c1")], vec![sum(col("c2"))])?
.sort(vec![col("c1").sort(true, true)])?
.sort(vec![col("c1").sort().asc().nulls_first()])?
.limit(3, Some(10))?
.build()?;

Expand Down
14 changes: 7 additions & 7 deletions datafusion/core/tests/dataframe/dataframe_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ async fn test_fn_approx_median() -> Result<()> {

#[tokio::test]
async fn test_fn_approx_percentile_cont() -> Result<()> {
let expr = approx_percentile_cont(col("b").sort(true, false), lit(0.5), None);
let expr = approx_percentile_cont(col("b").sort().asc().nulls_last(), lit(0.5), None);

let df = create_test_table().await?;
let batches = df.aggregate(vec![], vec![expr]).unwrap().collect().await?;
Expand All @@ -426,7 +426,7 @@ async fn test_fn_approx_percentile_cont() -> Result<()> {
+---------------------------------------------------------------------------+
");

let expr = approx_percentile_cont(col("b").sort(false, false), lit(0.1), None);
let expr = approx_percentile_cont(col("b").sort().desc().nulls_last(), lit(0.1), None);

let df = create_test_table().await?;
let batches = df.aggregate(vec![], vec![expr]).unwrap().collect().await?;
Expand All @@ -447,7 +447,7 @@ async fn test_fn_approx_percentile_cont() -> Result<()> {
None::<&str>,
"arg_2".to_string(),
));
let expr = approx_percentile_cont(col("b").sort(true, false), alias_expr, None);
let expr = approx_percentile_cont(col("b").sort().asc().nulls_last(), alias_expr, None);
let df = create_test_table().await?;
let batches = df.aggregate(vec![], vec![expr]).unwrap().collect().await?;

Expand All @@ -467,7 +467,7 @@ async fn test_fn_approx_percentile_cont() -> Result<()> {
None::<&str>,
"arg_2".to_string(),
));
let expr = approx_percentile_cont(col("b").sort(false, false), alias_expr, None);
let expr = approx_percentile_cont(col("b").sort().desc().nulls_last(), alias_expr, None);
let df = create_test_table().await?;
let batches = df.aggregate(vec![], vec![expr]).unwrap().collect().await?;

Expand All @@ -483,7 +483,7 @@ async fn test_fn_approx_percentile_cont() -> Result<()> {
);

// with number of centroids set
let expr = approx_percentile_cont(col("b").sort(true, false), lit(0.5), Some(lit(2)));
let expr = approx_percentile_cont(col("b").sort().asc().nulls_last(), lit(0.5), Some(lit(2)));

let df = create_test_table().await?;
let batches = df.aggregate(vec![], vec![expr]).unwrap().collect().await?;
Expand All @@ -499,7 +499,7 @@ async fn test_fn_approx_percentile_cont() -> Result<()> {
");

let expr =
approx_percentile_cont(col("b").sort(false, false), lit(0.1), Some(lit(2)));
approx_percentile_cont(col("b").sort().desc().nulls_last(), lit(0.1), Some(lit(2)));

let df = create_test_table().await?;
let batches = df.aggregate(vec![], vec![expr]).unwrap().collect().await?;
Expand Down Expand Up @@ -1327,7 +1327,7 @@ async fn test_count_wildcard() -> Result<()> {
.unwrap()
.project(vec![count_all()])
.unwrap()
.sort(vec![count_all().sort(true, false)])
.sort(vec![count_all().sort().asc().nulls_last()])
.unwrap()
.build()
.unwrap();
Expand Down
Loading