Skip to content

Commit

Permalink
[python 0.6.7] undo performance regression on large no. of threads (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Mar 2, 2021
1 parent 99b5457 commit 09e6c2e
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 7 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
### py-polars 0.6.7
* performance
- \[python | rust\] use mimalloc global allocator
- \[python | rust\] undo performance regression on large number of threads
* bug fix
- \[python\] fix accidental over-allocation in csv-parser
- \[python | rust\] fix accidental over-allocation in csv-parser
- \[python\] support agg (dictionary aggregation) for downsample

### py-polars 0.6.6
* performance
Expand Down
22 changes: 22 additions & 0 deletions polars/polars-arrow/src/buffer.rs
Original file line number Diff line number Diff line change
@@ -1 +1,23 @@
use arrow::{buffer::Buffer, util::bit_util};

/// Unchecked queries against an Arrow validity (null) bitmap.
///
/// Implementations look up a single bit per call; see the `Buffer`
/// impl below, which reads the raw bit at position `i`.
pub trait IsValid {
    /// Returns `true` when the validity bit at position `i` is set,
    /// i.e. the value at that index is valid (non-null).
    ///
    /// # Safety
    /// no bound checks — `i` must be in bounds of the bitmap, otherwise
    /// this reads out of bounds memory.
    unsafe fn is_valid_unchecked(&self, i: usize) -> bool;

    /// Returns `true` when the validity bit at position `i` is unset,
    /// i.e. the value at that index is null. Logical negation of
    /// [`IsValid::is_valid_unchecked`].
    ///
    /// # Safety
    /// no bound checks — `i` must be in bounds of the bitmap, otherwise
    /// this reads out of bounds memory.
    unsafe fn is_null_unchecked(&self, i: usize) -> bool;
}

/// Validity lookups on the raw bits of an Arrow `Buffer` used as a
/// null bitmap (bit set => value valid).
impl IsValid for Buffer {
    #[inline]
    unsafe fn is_valid_unchecked(&self, i: usize) -> bool {
        // SAFETY (caller contract): `i` is in bounds of this bitmap.
        let bitmap = self.as_ptr();
        bit_util::get_bit_raw(bitmap, i)
    }

    #[inline]
    unsafe fn is_null_unchecked(&self, i: usize) -> bool {
        // A value is null exactly when its validity bit is not set.
        // SAFETY (caller contract): `i` is in bounds of this bitmap.
        !bit_util::get_bit_raw(self.as_ptr(), i)
    }
}
7 changes: 6 additions & 1 deletion polars/polars-core/src/chunked_array/kernels/take_agg.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! kernels that combine take and aggregations.
use crate::prelude::*;
use arrow::array::{Array, PrimitiveArray};
use polars_arrow::buffer::IsValid;

/// Take kernel for single chunk without nulls and an iterator as index.
pub(crate) unsafe fn take_agg_no_null_primitive_iter_unchecked<
Expand Down Expand Up @@ -39,9 +40,13 @@ pub(crate) unsafe fn take_agg_primitive_iter_unchecked<
}

let array_values = arr.values();
let buf = arr
.data_ref()
.null_buffer()
.expect("null buffer should be there");

let out = indices.into_iter().fold(init, |acc, idx| {
if arr.is_valid(idx) {
if buf.is_valid_unchecked(idx) {
f(acc, *array_values.get_unchecked(idx))
} else {
acc
Expand Down
2 changes: 1 addition & 1 deletion py-polars/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "py-polars"
version = "0.6.6"
version = "0.6.7"
authors = ["ritchie46 <[email protected]>"]
edition = "2018"
readme = "README.md"
Expand Down
12 changes: 8 additions & 4 deletions py-polars/pypolars/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -1328,8 +1328,6 @@ def agg(
-------
Result of groupby split apply operations.
"""
if self.downsample:
raise ValueError("agg not suppore in downsample operation")
if isinstance(column_to_agg, dict):
column_to_agg = [
(column, [agg] if isinstance(agg, str) else agg)
Expand All @@ -1340,6 +1338,12 @@ def agg(
(column, [agg] if isinstance(agg, str) else agg)
for (column, agg) in column_to_agg
]
if self.downsample:
return wrap_df(
self._df.downsample_agg(
self.by, self.rule, self.downsample_n, column_to_agg
)
)

return wrap_df(self._df.groupby_agg(self.by, column_to_agg))

Expand All @@ -1353,7 +1357,7 @@ def select(self, columns: "Union[str, List[str]]") -> "GBSelection":
One or multiple columns
"""
if self.downsample:
raise ValueError("select not suppore in downsample operation")
raise ValueError("select not supported in downsample operation")
if isinstance(columns, str):
columns = [columns]
return GBSelection(self._df, self.by, columns)
Expand All @@ -1378,7 +1382,7 @@ def pivot(self, pivot_column: str, values_column: str) -> "PivotOps":
Column that will be aggregated
"""
if self.downsample:
raise ValueError("pivot not suppore in downsample operation")
raise ValueError("pivot not supported in downsample operation")
return PivotOps(self._df, self.by, pivot_column, values_column)

def first(self) -> DataFrame:
Expand Down
22 changes: 22 additions & 0 deletions py-polars/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,28 @@ impl PyDataFrame {
}
}

/// Downsample the frame on the temporal column `by` using the textual
/// `rule` ("second" | "minute" | "hour" | "day") with window size `n`,
/// apply the requested per-column aggregations, and return the result
/// sorted ascending on `by`.
///
/// `column_to_agg` pairs each column name with the aggregation names to
/// run on it. Returns an error for an unrecognised `rule` or when any
/// underlying polars operation fails.
pub fn downsample_agg(
    &self,
    by: &str,
    rule: &str,
    n: u32,
    column_to_agg: Vec<(&str, Vec<&str>)>,
) -> PyResult<Self> {
    // Translate the textual rule into a typed sampling rule.
    let sample_rule = match rule {
        "second" => SampleRule::Second(n),
        "minute" => SampleRule::Minute(n),
        "hour" => SampleRule::Hour(n),
        "day" => SampleRule::Day(n),
        other => return Err(PyPolarsEr::Other(format!("rule {} not supported", other)).into()),
    };
    let grouped = self.df.downsample(by, sample_rule).map_err(PyPolarsEr::from)?;
    let aggregated = grouped.agg(&column_to_agg).map_err(PyPolarsEr::from)?;
    // Present the result ordered by the downsample key, ascending.
    let sorted = aggregated.sort(by, false).map_err(PyPolarsEr::from)?;
    Ok(sorted.into())
}

pub fn downsample(&self, by: &str, rule: &str, n: u32, agg: &str) -> PyResult<Self> {
let rule = match rule {
"second" => SampleRule::Second(n),
Expand Down
6 changes: 6 additions & 0 deletions py-polars/tests/test_df.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ def test_downsample():
out = df.downsample("a", rule="minute", n=5).first()
assert out.shape == (4, 2)

# OHLC (open-high-low-close) style aggregation
out = df.downsample("a", rule="minute", n=5).agg(
{"b": ["first", "min", "max", "last"]}
)
assert out.shape == (4, 5)

# test to_pandas as well.
out = df.to_pandas()
assert out["a"].dtype == "datetime64[ns]"
Expand Down

0 comments on commit 09e6c2e

Please sign in to comment.