Skip to content

Commit

Permalink
NTH_VALUE reverse support (apache#8327)
Browse files Browse the repository at this point in the history
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
  • Loading branch information
mustafasrepo and ozankabak authored Nov 29, 2023
1 parent 2a69244 commit e21b031
Show file tree
Hide file tree
Showing 11 changed files with 269 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use std::sync::Arc;
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::utils::{
add_sort_above, get_children_exectrees, get_plan_string, is_coalesce_partitions,
is_repartition, is_sort_preserving_merge, ExecTree,
add_sort_above, get_children_exectrees, is_coalesce_partitions, is_repartition,
is_sort_preserving_merge, ExecTree,
};
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
Expand All @@ -54,8 +54,8 @@ use datafusion_physical_expr::utils::map_columns_before_projection;
use datafusion_physical_expr::{
physical_exprs_equal, EquivalenceProperties, PhysicalExpr,
};
use datafusion_physical_plan::unbounded_output;
use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
use datafusion_physical_plan::{get_plan_string, unbounded_output};

use itertools::izip;

Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -765,9 +765,8 @@ mod tests {
repartition_exec, sort_exec, sort_expr, sort_expr_options, sort_merge_join_exec,
sort_preserving_merge_exec, spr_repartition_exec, union_exec,
};
use crate::physical_optimizer::utils::get_plan_string;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::{displayable, Partitioning};
use crate::physical_plan::{displayable, get_plan_string, Partitioning};
use crate::prelude::{SessionConfig, SessionContext};
use crate::test::csv_exec_sorted;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1130,7 +1130,6 @@ mod tests {
use crate::physical_optimizer::projection_pushdown::{
join_table_borders, update_expr, ProjectionPushdown,
};
use crate::physical_optimizer::utils::get_plan_string;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::filter::FilterExec;
Expand All @@ -1141,7 +1140,7 @@ mod tests {
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::{get_plan_string, ExecutionPlan};

use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions};
use datafusion_common::config::ConfigOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ mod tests {
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::{displayable, Partitioning};
use crate::physical_plan::{displayable, get_plan_string, Partitioning};
use crate::prelude::SessionConfig;

use arrow::compute::SortOptions;
Expand Down Expand Up @@ -958,11 +958,4 @@ mod tests {
FileCompressionType::UNCOMPRESSED,
))
}

// Util function to get string representation of a physical plan.
// The plan is formatted through `displayable(..).indent(true)`, leading and
// trailing whitespace is trimmed, and one `String` is returned per line.
fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
    let formatted = displayable(plan.as_ref()).indent(true).to_string();
    // Convert each line straight into an owned `String`; collecting into a
    // temporary `Vec<&str>` first would only add an extra allocation.
    formatted.trim().lines().map(String::from).collect()
}
}
9 changes: 1 addition & 8 deletions datafusion/core/src/physical_optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{displayable, ExecutionPlan};
use crate::physical_plan::{get_plan_string, ExecutionPlan};

use datafusion_physical_expr::{LexRequirementRef, PhysicalSortRequirement};

Expand Down Expand Up @@ -154,10 +154,3 @@ pub fn is_union(plan: &Arc<dyn ExecutionPlan>) -> bool {
pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
    // View the plan through `as_any` so its concrete type can be compared
    // against `RepartitionExec`.
    let plan_as_any = plan.as_any();
    plan_as_any.is::<RepartitionExec>()
}

/// Utility function yielding a string representation of the given [`ExecutionPlan`].
///
/// The plan is formatted through `displayable(..).indent(true)`, leading and
/// trailing whitespace is trimmed, and one `String` is returned per line.
pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
    let formatted = displayable(plan.as_ref()).indent(true).to_string();
    // Convert each line straight into an owned `String`; collecting into a
    // temporary `Vec<&str>` first would only add an extra allocation.
    formatted.trim().lines().map(String::from).collect()
}
89 changes: 64 additions & 25 deletions datafusion/physical-expr/src/window/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,24 @@
// specific language governing permissions and limitations
// under the License.

//! Defines physical expressions for `first_value`, `last_value`, and `nth_value`
//! that can evaluated at runtime during query execution
//! Defines physical expressions for `FIRST_VALUE`, `LAST_VALUE`, and `NTH_VALUE`
//! functions that can be evaluated at run time during query execution.
use std::any::Any;
use std::cmp::Ordering;
use std::ops::Range;
use std::sync::Arc;

use crate::window::window_expr::{NthValueKind, NthValueState};
use crate::window::BuiltInWindowFunctionExpr;
use crate::PhysicalExpr;

use arrow::array::{Array, ArrayRef};
use arrow::datatypes::{DataType, Field};
use datafusion_common::{exec_err, ScalarValue};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::window_state::WindowAggState;
use datafusion_expr::PartitionEvaluator;
use std::any::Any;
use std::ops::Range;
use std::sync::Arc;

/// nth_value expression
#[derive(Debug)]
Expand Down Expand Up @@ -77,17 +80,17 @@ impl NthValue {
n: u32,
) -> Result<Self> {
match n {
0 => exec_err!("nth_value expect n to be > 0"),
0 => exec_err!("NTH_VALUE expects n to be non-zero"),
_ => Ok(Self {
name: name.into(),
expr,
data_type,
kind: NthValueKind::Nth(n),
kind: NthValueKind::Nth(n as i64),
}),
}
}

/// Get nth_value kind
/// Get the NTH_VALUE kind, i.e. the [`NthValueKind`] stored in this
/// expression. The kind is returned by value (a copy of the `kind` field).
pub fn get_kind(&self) -> NthValueKind {
self.kind
}
Expand Down Expand Up @@ -125,7 +128,7 @@ impl BuiltInWindowFunctionExpr for NthValue {
let reversed_kind = match self.kind {
NthValueKind::First => NthValueKind::Last,
NthValueKind::Last => NthValueKind::First,
NthValueKind::Nth(_) => return None,
NthValueKind::Nth(idx) => NthValueKind::Nth(-idx),
};
Some(Arc::new(Self {
name: self.name.clone(),
Expand All @@ -143,16 +146,17 @@ pub(crate) struct NthValueEvaluator {
}

impl PartitionEvaluator for NthValueEvaluator {
/// When the window frame has a fixed beginning (e.g UNBOUNDED
/// PRECEDING), for some functions such as FIRST_VALUE, LAST_VALUE and
/// NTH_VALUE we can memoize result. Once result is calculated it
/// will always stay same. Hence, we do not need to keep past data
/// as we process the entire dataset. This feature enables us to
/// prune rows from table. The default implementation does nothing
/// When the window frame has a fixed beginning (e.g. UNBOUNDED PRECEDING),
/// for some functions such as FIRST_VALUE, LAST_VALUE and NTH_VALUE, we
/// can memoize the result. Once the result is calculated, it will always
/// stay the same. Hence, we do not need to keep past data as we process
/// the entire dataset.
fn memoize(&mut self, state: &mut WindowAggState) -> Result<()> {
let out = &state.out_col;
let size = out.len();
let (is_prunable, is_last) = match self.state.kind {
let mut buffer_size = 1;
// Decide if we arrived at a final result yet:
let (is_prunable, is_reverse_direction) = match self.state.kind {
NthValueKind::First => {
let n_range =
state.window_frame_range.end - state.window_frame_range.start;
Expand All @@ -162,16 +166,30 @@ impl PartitionEvaluator for NthValueEvaluator {
NthValueKind::Nth(n) => {
let n_range =
state.window_frame_range.end - state.window_frame_range.start;
(n_range >= (n as usize) && size >= (n as usize), false)
match n.cmp(&0) {
Ordering::Greater => {
(n_range >= (n as usize) && size > (n as usize), false)
}
Ordering::Less => {
let reverse_index = (-n) as usize;
buffer_size = reverse_index;
// Negative index represents reverse direction.
(n_range >= reverse_index, true)
}
Ordering::Equal => {
// The case n = 0 is not valid for the NTH_VALUE function.
unreachable!();
}
}
}
};
if is_prunable {
if self.state.finalized_result.is_none() && !is_last {
if self.state.finalized_result.is_none() && !is_reverse_direction {
let result = ScalarValue::try_from_array(out, size - 1)?;
self.state.finalized_result = Some(result);
}
state.window_frame_range.start =
state.window_frame_range.end.saturating_sub(1);
state.window_frame_range.end.saturating_sub(buffer_size);
}
Ok(())
}
Expand All @@ -195,12 +213,33 @@ impl PartitionEvaluator for NthValueEvaluator {
NthValueKind::First => ScalarValue::try_from_array(arr, range.start),
NthValueKind::Last => ScalarValue::try_from_array(arr, range.end - 1),
NthValueKind::Nth(n) => {
// We are certain that n > 0.
let index = (n as usize) - 1;
if index >= n_range {
ScalarValue::try_from(arr.data_type())
} else {
ScalarValue::try_from_array(arr, range.start + index)
match n.cmp(&0) {
Ordering::Greater => {
// SQL indices are not 0-based.
let index = (n as usize) - 1;
if index >= n_range {
// Outside the range, return NULL:
ScalarValue::try_from(arr.data_type())
} else {
ScalarValue::try_from_array(arr, range.start + index)
}
}
Ordering::Less => {
let reverse_index = (-n) as usize;
if n_range >= reverse_index {
ScalarValue::try_from_array(
arr,
range.start + n_range - reverse_index,
)
} else {
// Outside the range, return NULL:
ScalarValue::try_from(arr.data_type())
}
}
Ordering::Equal => {
// The case n = 0 is not valid for the NTH_VALUE function.
unreachable!();
}
}
}
}
Expand Down
16 changes: 9 additions & 7 deletions datafusion/physical-expr/src/window/window_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;

use crate::{PhysicalExpr, PhysicalSortExpr};

use arrow::array::{new_empty_array, Array, ArrayRef};
use arrow::compute::kernels::sort::SortColumn;
use arrow::compute::SortOptions;
Expand All @@ -25,13 +31,9 @@ use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::window_state::{
PartitionBatchState, WindowAggState, WindowFrameContext,
};
use datafusion_expr::PartitionEvaluator;
use datafusion_expr::{Accumulator, WindowFrame};
use datafusion_expr::{Accumulator, PartitionEvaluator, WindowFrame};

use indexmap::IndexMap;
use std::any::Any;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;

/// Common trait for [window function] implementations
///
Expand Down Expand Up @@ -292,7 +294,7 @@ pub struct NumRowsState {
pub enum NthValueKind {
First,
Last,
Nth(u32),
Nth(i64),
}

#[derive(Debug, Clone)]
Expand Down
7 changes: 7 additions & 0 deletions datafusion/physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,5 +570,12 @@ pub fn unbounded_output(plan: &Arc<dyn ExecutionPlan>) -> bool {
.unwrap_or(true)
}

/// Utility function yielding a string representation of the given [`ExecutionPlan`].
///
/// The plan is formatted through `displayable(..).indent(true)`, leading and
/// trailing whitespace is trimmed, and one `String` is returned per line.
pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
    let formatted = displayable(plan.as_ref()).indent(true).to_string();
    // Convert each line straight into an owned `String`; collecting into a
    // temporary `Vec<&str>` first would only add an extra allocation.
    formatted.trim().lines().map(String::from).collect()
}

#[cfg(test)]
pub mod test;
Loading

0 comments on commit e21b031

Please sign in to comment.