Skip to content

Commit

Permalink
more
Browse files Browse the repository at this point in the history
  • Loading branch information
scsmithr committed Jan 5, 2025
1 parent edb8875 commit 35de02d
Show file tree
Hide file tree
Showing 5 changed files with 390 additions and 470 deletions.
170 changes: 67 additions & 103 deletions crates/rayexec_execution/src/functions/aggregate/builtin/first.rs
Original file line number Diff line number Diff line change
@@ -1,53 +1,38 @@
use std::borrow::Borrow;
use std::fmt::{self, Debug};
use std::fmt::Debug;
use std::marker::PhantomData;

use half::f16;
use rayexec_error::{not_implemented, Result};

use crate::arrays::array::ArrayData2;
use crate::arrays::buffer::physical_type::{
AddressableMut,
MutablePhysicalStorage,
PhysicalBinary,
PhysicalBool,
PhysicalF16,
PhysicalF32,
PhysicalF64,
PhysicalI128,
PhysicalI16,
PhysicalI32,
PhysicalI64,
PhysicalI8,
PhysicalInterval,
PhysicalType,
PhysicalU128,
PhysicalU16,
PhysicalU32,
PhysicalU64,
PhysicalU8,
PhysicalUntypedNull,
PhysicalUtf8,
};
use crate::arrays::datatype::{DataType, DataTypeId};
use crate::arrays::executor::aggregate::{AggregateState2, StateFinalizer};
use crate::arrays::executor::builder::{ArrayBuilder, GermanVarlenBuffer};
use crate::arrays::executor::physical_type::{
PhysicalBinary_2,
PhysicalBool_2,
PhysicalF16_2,
PhysicalF32_2,
PhysicalF64_2,
PhysicalI128_2,
PhysicalI16_2,
PhysicalI32_2,
PhysicalI64_2,
PhysicalI8_2,
PhysicalInterval_2,
PhysicalStorage2,
PhysicalType2,
PhysicalU128_2,
PhysicalU16_2,
PhysicalU32_2,
PhysicalU64_2,
PhysicalU8_2,
PhysicalUntypedNull_2,
};
use crate::arrays::datatype::DataTypeId;
use crate::arrays::executor_exp::aggregate::AggregateState;
use crate::arrays::executor_exp::PutBuffer;
use crate::arrays::scalar::interval::Interval;
use crate::arrays::storage::{PrimitiveStorage, UntypedNull};
use crate::expr::Expression;
use crate::functions::aggregate::states::{
boolean_finalize,
drain,
new_unary_aggregate_states2,
primitive_finalize,
unary_update,
untyped_null_finalize,
AggregateGroupStates,
TypedAggregateGroupStates,
};
Expand Down Expand Up @@ -94,67 +79,27 @@ impl AggregateFunction for First {
let datatype = inputs[0].datatype(table_list)?;

let function_impl: Box<dyn AggregateFunctionImpl> = match datatype.physical_type() {
// PhysicalType::Boolean
PhysicalType::UntypedNull => Box::new(FirstPrimitiveImpl::<PhysicalUntypedNull>::new()),
PhysicalType::Boolean => Box::new(FirstPrimitiveImpl::<PhysicalBool>::new()),
PhysicalType::Int8 => Box::new(FirstPrimitiveImpl::<PhysicalI8>::new()),
PhysicalType::Int16 => Box::new(FirstPrimitiveImpl::<PhysicalI16>::new()),
PhysicalType::Int32 => Box::new(FirstPrimitiveImpl::<PhysicalI32>::new()),
PhysicalType::Int64 => Box::new(FirstPrimitiveImpl::<PhysicalI64>::new()),
PhysicalType::Int128 => Box::new(FirstPrimitiveImpl::<PhysicalI128>::new()),
PhysicalType::UInt8 => Box::new(FirstPrimitiveImpl::<PhysicalU8>::new()),
PhysicalType::UInt16 => Box::new(FirstPrimitiveImpl::<PhysicalU16>::new()),
PhysicalType::UInt32 => Box::new(FirstPrimitiveImpl::<PhysicalU32>::new()),
PhysicalType::UInt64 => Box::new(FirstPrimitiveImpl::<PhysicalU64>::new()),
PhysicalType::UInt128 => Box::new(FirstPrimitiveImpl::<PhysicalU128>::new()),
PhysicalType::Float16 => Box::new(FirstPrimitiveImpl::<PhysicalF16>::new()),
PhysicalType::Float32 => Box::new(FirstPrimitiveImpl::<PhysicalF32>::new()),
PhysicalType::Float64 => Box::new(FirstPrimitiveImpl::<PhysicalF64>::new()),
PhysicalType::Interval => Box::new(FirstPrimitiveImpl::<PhysicalInterval>::new()),
PhysicalType::Utf8 => Box::new(FirstStringImpl),
PhysicalType::Binary => Box::new(FirstBinaryImpl),
other => not_implemented!("FIRST for physical type: {other}"),
};

// let function_impl: Box<dyn AggregateFunctionImpl> = match datatype.physical_type2()? {
// PhysicalType2::UntypedNull => Box::new(FirstUntypedNullImpl),
// PhysicalType2::Boolean => Box::new(FirstBoolImpl),
// PhysicalType2::Float16 => Box::new(FirstPrimitiveImpl::<PhysicalF16_2, f16>::new(
// datatype.clone(),
// )),
// PhysicalType2::Float32 => Box::new(FirstPrimitiveImpl::<PhysicalF32_2, f32>::new(
// datatype.clone(),
// )),
// PhysicalType2::Float64 => Box::new(FirstPrimitiveImpl::<PhysicalF64_2, f64>::new(
// datatype.clone(),
// )),
// PhysicalType2::Int8 => Box::new(FirstPrimitiveImpl::<PhysicalI8_2, i8>::new(
// datatype.clone(),
// )),
// PhysicalType2::Int16 => Box::new(FirstPrimitiveImpl::<PhysicalI16_2, i16>::new(
// datatype.clone(),
// )),
// PhysicalType2::Int32 => Box::new(FirstPrimitiveImpl::<PhysicalI32_2, i32>::new(
// datatype.clone(),
// )),
// PhysicalType2::Int64 => Box::new(FirstPrimitiveImpl::<PhysicalI64_2, i64>::new(
// datatype.clone(),
// )),
// PhysicalType2::Int128 => Box::new(FirstPrimitiveImpl::<PhysicalI128_2, i128>::new(
// datatype.clone(),
// )),
// PhysicalType2::UInt8 => Box::new(FirstPrimitiveImpl::<PhysicalU8_2, u8>::new(
// datatype.clone(),
// )),
// PhysicalType2::UInt16 => Box::new(FirstPrimitiveImpl::<PhysicalU16_2, u16>::new(
// datatype.clone(),
// )),
// PhysicalType2::UInt32 => Box::new(FirstPrimitiveImpl::<PhysicalU32_2, u32>::new(
// datatype.clone(),
// )),
// PhysicalType2::UInt64 => Box::new(FirstPrimitiveImpl::<PhysicalU64_2, u64>::new(
// datatype.clone(),
// )),
// PhysicalType2::UInt128 => Box::new(FirstPrimitiveImpl::<PhysicalU128_2, u128>::new(
// datatype.clone(),
// )),
// PhysicalType2::Interval => Box::new(
// FirstPrimitiveImpl::<PhysicalInterval_2, Interval>::new(datatype.clone()),
// ),
// PhysicalType2::Binary => Box::new(FirstBinaryImpl {
// datatype: datatype.clone(),
// }),
// PhysicalType2::Utf8 => Box::new(FirstBinaryImpl {
// datatype: datatype.clone(),
// }),
// PhysicalType2::List => {
// // TODO: Easy, clone underlying array and select.
// not_implemented!("FIRST for list arrays")
// }
// };

Ok(PlannedAggregateFunction {
function: Box::new(*self),
return_type: datatype,
Expand All @@ -169,6 +114,12 @@ pub struct FirstPrimitiveImpl<S> {
_s: PhantomData<S>,
}

impl<S> FirstPrimitiveImpl<S> {
const fn new() -> Self {
FirstPrimitiveImpl { _s: PhantomData }
}
}

impl<S> AggregateFunctionImpl for FirstPrimitiveImpl<S>
where
S: MutablePhysicalStorage,
Expand All @@ -183,18 +134,31 @@ where
}
}

// #[derive(Debug, Clone, Copy)]
// pub struct FirstBinaryImpl;
#[derive(Debug, Clone, Copy)]
pub struct FirstBinaryImpl;

// impl AggregateFunctionImpl for FirstBinaryImpl {
// fn new_states(&self) -> Box<dyn AggregateGroupStates> {
// Box::new(TypedAggregateGroupStates::new(
// FirstBinaryState::default,
// unary_update::<PhysicalBinary, PhysicalBinary, _>,
// drain::<PhysicalBinary, _, _>,
// ))
// }
// }
impl AggregateFunctionImpl for FirstBinaryImpl {
fn new_states(&self) -> Box<dyn AggregateGroupStates> {
Box::new(TypedAggregateGroupStates::new(
FirstBinaryState::default,
unary_update::<PhysicalBinary, PhysicalBinary, _>,
drain::<PhysicalBinary, _, _>,
))
}
}

#[derive(Debug, Clone, Copy)]
pub struct FirstStringImpl;

impl AggregateFunctionImpl for FirstStringImpl {
fn new_states(&self) -> Box<dyn AggregateGroupStates> {
Box::new(TypedAggregateGroupStates::new(
FirstStringState::default,
unary_update::<PhysicalUtf8, PhysicalUtf8, _>,
drain::<PhysicalUtf8, _, _>,
))
}
}

#[derive(Debug, Default)]
pub struct FirstPrimitiveState<T> {
Expand Down
Loading

0 comments on commit 35de02d

Please sign in to comment.