Specialized GroupValues for primitive and large_primitive #16136
Conversation
Force-pushed afceb44 to bd002ad
Force-pushed bd002ad to 1b5cde9
///
-map: HashTable<(usize, u64)>,
+map: HashTable<(usize, T::Native)>,
Is storing values: Vec<T::Native> necessary in this case? It could be rebuilt in emit_internal by traversing the map items, I think (create a Vec and update it by group index). This avoids doing that work while inserting.
Something like:
diff --git i/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive/mod.rs w/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive/mod.rs
index 693cc997fa..c166c16f8e 100644
--- i/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive/mod.rs
+++ w/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive/mod.rs
@@ -25,7 +25,6 @@ use arrow::array::{
use arrow::datatypes::{i256, DataType};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
-use datafusion_execution::memory_pool::proxy::VecAllocExt;
use datafusion_expr::EmitTo;
use half::f16;
use hashbrown::hash_table::HashTable;
@@ -33,7 +32,6 @@ use std::mem::size_of;
use std::sync::Arc;
mod large_primitive;
-pub use large_primitive::GroupValuesLargePrimitive;
/// A trait to allow hashing of floating point numbers
pub(crate) trait HashValue {
@@ -94,8 +92,6 @@ pub struct GroupValuesPrimitive<T: ArrowPrimitiveType> {
map: HashTable<(usize, T::Native)>,
/// The group index of the null value if any
null_group: Option<usize>,
- /// The values for each group index
- values: Vec<T::Native>,
/// The random state used to generate hashes
random_state: RandomState,
}
@@ -106,7 +102,6 @@ impl<T: ArrowPrimitiveType> GroupValuesPrimitive<T> {
Self {
data_type,
map: HashTable::with_capacity(128),
- values: Vec::with_capacity(128),
null_group: None,
random_state: Default::default(),
}
@@ -124,13 +119,14 @@ where
for v in cols[0].as_primitive::<T>() {
let group_id = match v {
None => *self.null_group.get_or_insert_with(|| {
- let group_id = self.values.len();
- self.values.push(Default::default());
+ let group_id = self.map.len();
group_id
}),
Some(key) => {
let state = &self.random_state;
let hash = key.hash(state);
+ let group_id = self.map.len();
+
let insert = self.map.entry(
hash,
|&(_, v)| v.is_eq(key),
@@ -140,10 +136,8 @@ where
match insert {
hashbrown::hash_table::Entry::Occupied(o) => o.get().0,
hashbrown::hash_table::Entry::Vacant(v) => {
- let g = self.values.len();
- v.insert((g, key));
- self.values.push(key);
- g
+ v.insert((group_id, key));
+ group_id
}
}
}
@@ -155,21 +149,19 @@ where
fn size(&self) -> usize {
self.map.capacity() * size_of::<(usize, T::Native)>()
- + self.values.allocated_size()
}
fn is_empty(&self) -> bool {
- self.values.is_empty()
+ self.map.is_empty()
}
fn len(&self) -> usize {
- self.values.len()
+ self.map.len()
}
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
- emit_internal::<T, T::Native>(
+ emit_internal::<T>(
emit_to,
- &mut self.values,
&mut self.null_group,
&mut self.map,
self.data_type.clone(),
@@ -178,65 +170,64 @@ where
fn clear_shrink(&mut self, batch: &RecordBatch) {
let count = batch.num_rows();
- self.values.clear();
- self.values.shrink_to(count);
self.map.clear();
self.map.shrink_to(count, |_| 0); // hasher does not matter since the map is cleared
}
}
-pub(crate) fn emit_internal<T: ArrowPrimitiveType, K>(
+pub(crate) fn emit_internal<T: ArrowPrimitiveType>(
emit_to: EmitTo,
- values: &mut Vec<T::Native>,
null_group: &mut Option<usize>,
- map: &mut HashTable<(usize, K)>,
+ map: &mut HashTable<(usize, T::Native)>,
data_type: DataType,
) -> Result<Vec<ArrayRef>> {
fn build_primitive<T: ArrowPrimitiveType>(
- values: Vec<T::Native>,
+ map: HashTable<(usize, T::Native)>,
null_idx: Option<usize>,
) -> PrimitiveArray<T> {
let nulls = null_idx.map(|null_idx| {
- let mut buffer = NullBufferBuilder::new(values.len());
+ let mut buffer = NullBufferBuilder::new(map.len());
buffer.append_n_non_nulls(null_idx);
buffer.append_null();
- buffer.append_n_non_nulls(values.len() - null_idx - 1);
+ buffer.append_n_non_nulls(map.len() - null_idx - 1);
// NOTE: The inner builder must be constructed as there is at least one null
buffer.finish().unwrap()
});
+ let mut values: Vec<T::Native> = vec![T::default_value(); map.len()];
+ map.iter().for_each(|i| values[i.0] = i.1);
PrimitiveArray::<T>::new(values.into(), nulls)
}
let array: PrimitiveArray<T> = match emit_to {
EmitTo::All => {
- map.clear();
- build_primitive(std::mem::take(values), null_group.take())
+ build_primitive(std::mem::take(map), null_group.take())
}
EmitTo::First(n) => {
- map.retain(|entry| {
- // Decrement group index by n
- let group_idx = entry.0;
- match group_idx.checked_sub(n) {
- // Group index was >= n, shift value down
- Some(sub) => {
- entry.0 = sub;
- true
- }
- // Group index was < n, so remove from table
- None => false,
- }
- });
- let null_group = match null_group {
- Some(v) if *v >= n => {
- *v -= n;
- None
- }
- Some(_) => null_group.take(),
- None => None,
- };
- let mut split = values.split_off(n);
- std::mem::swap(values, &mut split);
- build_primitive(split, null_group)
+ todo!("");
+ // map.retain(|entry| {
+ // // Decrement group index by n
+ // let group_idx = entry.0;
+ // match group_idx.checked_sub(n) {
+ // // Group index was >= n, shift value down
+ // Some(sub) => {
+ // entry.0 = sub;
+ // true
+ // }
+ // // Group index was < n, so remove from table
+ // None => false,
+ // }
+ // });
+ // let null_group = match null_group {
+ // Some(v) if *v >= n => {
+ // *v -= n;
+ // None
+ // }
+ // Some(_) => null_group.take(),
+ // None => None,
+ // };
+ // let mut split = values.split_off(n);
+ // std::mem::swap(values, &mut split);
+ // build_primitive(split, null_group)
}
};
I want to try it, too. But I think it will lead to a regression, because that approach introduces many extra random writes during emit.
@@ -130,15 +133,15 @@ where
let hash = key.hash(state);
let insert = self.map.entry(
hash,
|&(g, _)| unsafe { self.values.get_unchecked(g).is_eq(key) },
I wonder if the existing performance couldn't already be improved by doing h == hash && unsafe { self.values.get_unchecked(g).is_eq(key) } (i.e. avoiding fetching the value when hash equality doesn't hold), like we do in other places. That check already filters out most candidates, since two different keys almost never share a hash.
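A minimal, self-contained sketch of that pattern (the name lookup_or_insert and the concrete u64 types are hypothetical; entries are assumed to be (group_idx, hash) pairs, matching the original map: HashTable<(usize, u64)>):

```rust
use hashbrown::hash_table::{Entry, HashTable};

// Compare the cached hash before touching the out-of-line values Vec, so the
// random read into `values` only happens on a genuine 64-bit hash collision.
fn lookup_or_insert(
    map: &mut HashTable<(usize, u64)>,
    values: &mut Vec<u64>,
    key: u64,
    hash: u64,
) -> usize {
    match map.entry(
        hash,
        // Cheap hash comparison first, value fetch only if hashes match
        |&(g, h)| h == hash && values[g] == key,
        |&(_, h)| h,
    ) {
        Entry::Occupied(o) => o.get().0,
        Entry::Vacant(v) => {
            let g = values.len();
            values.push(key);
            v.insert((g, hash));
            g
        }
    }
}
```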
I think for the large primitives (> 64 bits), we can do that to improve things. But for normal primitives (<= 64 bits), the new way will be better; I am testing it.
///
/// This specialization is significantly faster than using the more general
/// purpose `Row`s format
pub struct GroupValuesLargePrimitive<T: ArrowPrimitiveType> {
Why use large primitive? Is 128 bits slower otherwise?
Why use large primitive? Is 128 bits slower otherwise?

🤔 I currently decide which types use GroupValuesLargePrimitive according to whether their bit width is > 64. Because the hash is 64 bits, when the bit width is > 64 the map entries will get larger, and I am not sure it is still better to go the new way there.
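A hypothetical sketch of that decision (the helper name and the exact set of wide types are assumptions, not the PR's actual dispatch code):

```rust
use arrow::datatypes::DataType;

// Values whose native width fits in the 64-bit hash keep the inline-value
// table; wider native values (e.g. 128/256-bit decimals) would go through
// GroupValuesLargePrimitive instead.
fn use_large_primitive(data_type: &DataType) -> bool {
    match data_type {
        // 128- and 256-bit native values: wider than the 64-bit hash
        DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => true,
        // 8/16/32/64-bit integers, floats, dates, times, ...
        _ => false,
    }
}

fn main() {
    assert!(use_large_primitive(&DataType::Decimal128(38, 10)));
    assert!(!use_large_primitive(&DataType::Int64));
}
```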
Hm, I think the storage of a value inside HashTable will not be very different from Vec, besides being inline?
Hm, I think the storage of a value inside HashTable will not be very different from Vec, besides being inline?

I think for values <= 64 bits, storing them inside the HashTable:

- Can decrease the size of the HashTable compared to storing their hashes?
- Keeps probing and rehashing of the HashTable entirely in place, eliminating the random reads into the outside Vec?

But I am indeed not sure, for the large primitives, whether doing this is a benefit, because storing them inline in the HashTable will make the table larger than storing their hashes... Also, comparing them directly is more expensive than comparing their hashes...
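To make the size part of that trade-off concrete, a small sketch (sizes are printed rather than asserted, since tuple layout depends on target and alignment):

```rust
use arrow::datatypes::i256;
use std::mem::size_of;

// An inline 64-bit value costs the same per entry as a stored 64-bit hash,
// while an inline 256-bit value widens every entry, so probing and rehashing
// touch more memory.
fn main() {
    println!("(group_idx, u64 value or hash): {} bytes", size_of::<(usize, u64)>());
    println!("(group_idx, i256 value):        {} bytes", size_of::<(usize, i256)>());
}
```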
@@ -222,3 +184,61 @@ where
self.map.shrink_to(count, |_| 0); // hasher does not matter since the map is cleared
}
}

pub(crate) fn emit_internal<T: ArrowPrimitiveType, K>(
K is unneeded here; this can be emit_internal<T: ArrowPrimitiveType>.
🤖: Benchmark completed

Thanks, q4 and q15 are the targets, and they indeed got faster!
Force-pushed 313ccfc to cf053cb
/// This specialization is significantly faster than using the more general
/// purpose `Row`s format
I think it would help me a lot if you could document the rationale behind this change and why it is different than GroupValuesPrimitive
-/// This specialization is significantly faster than using the more general
-/// purpose `Row`s format
+/// This specialization is faster than [`GroupValuesPrimitive`] because it does not store values directly
+/// in the hash table, but instead stores an index into self.values
Or something like that
}
}
};
groups.push(group_id)
It can extend into groups here by writing this as an iterator, e.g. the sketch below.
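A minimal, self-contained sketch of that shape (group_id_for is a hypothetical stand-in for the match over Option values shown above, and u64 stands in for T::Native):

```rust
// Build the per-row group ids with a single extend over an iterator instead
// of pushing one id per loop iteration.
fn assign_groups(
    keys: &[Option<u64>],
    mut group_id_for: impl FnMut(Option<u64>) -> usize,
) -> Vec<usize> {
    let mut groups = Vec::with_capacity(keys.len());
    groups.extend(keys.iter().map(|&v| group_id_for(v)));
    groups
}
```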
let g = num_total_groups;
v.insert((g, key));
self.append_row_indices.push(row_index as u32);
num_total_groups += 1;
g
}
}
}
};
groups.push(group_id)
This can also extend into groups, as in the earlier comment.
self.values.extend_from_slice(col.values());
} else {
let col_values = col.values();
for &row_index in self.append_row_indices.iter() {
This can be written as self.values.extend(self.append_row_indices.iter().map(...)); see the sketch below.
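A self-contained sketch of that form (u64 stands in for T::Native; the parameter names follow the context above):

```rust
// Gather the values for the newly appended rows in one extend call instead
// of a manual push loop.
fn gather_values(values: &mut Vec<u64>, col_values: &[u64], append_row_indices: &[u32]) {
    values.extend(
        append_row_indices
            .iter()
            .map(|&row_index| col_values[row_index as usize]),
    );
}
```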
Actually... I found no obvious improvement when switching to extend... The bottleneck is still the hash table; I think it is better to just keep the original push logic, because it is simpler and actually efficient enough.
I think this PR makes things better so approving. Nice work @Rachelint.
Thanks @alamb, I think there are still two blocking things before merging it:
🤖: Benchmark completed
Which issue does this PR close?
Closes GroupValues for primitive and large_primitive #16135

Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?