Skip to content

Commit

Permalink
feat(rust, python): comm_subexpr_elim in streaming 'select/with_colum…
Browse files Browse the repository at this point in the history
…ns' (pola-rs#10050)
  • Loading branch information
ritchie46 authored Jul 24, 2023
1 parent cdacbfe commit 42c21bd
Show file tree
Hide file tree
Showing 9 changed files with 204 additions and 27 deletions.
2 changes: 1 addition & 1 deletion polars/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ meta = ["polars-plan/meta"]
pivot = ["polars-core/rows", "polars-ops/pivot"]
top_k = ["polars-plan/top_k"]
semi_anti_join = ["polars-plan/semi_anti_join"]
cse = ["polars-plan/cse"]
cse = ["polars-plan/cse", "polars-pipe/cse"]
propagate_nans = ["polars-plan/propagate_nans"]
coalesce = ["polars-plan/coalesce"]
regex = ["polars-plan/regex"]
Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/polars-pipe/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,4 @@ dtype-array = ["polars-core/dtype-array"]
dtype-categorical = ["polars-core/dtype-categorical"]
trigger_ooc = []
test = ["compile", "polars-core/chunked_ids"]
cse = []
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::sync::Arc;
use polars_core::error::PolarsResult;
use polars_core::frame::DataFrame;
use polars_core::schema::SchemaRef;
#[cfg(feature = "cse")]
use polars_plan::utils::rename_cse_tmp_series;

use crate::expressions::PhysicalPipedExpr;
use crate::operators::{DataChunk, Operator, OperatorResult, PExecutionContext};
Expand Down Expand Up @@ -46,6 +48,8 @@ impl Operator for FastProjectionOperator {
#[derive(Clone)]
pub(crate) struct ProjectionOperator {
pub(crate) exprs: Vec<Arc<dyn PhysicalPipedExpr>>,
#[cfg(feature = "cse")]
pub(crate) cse_exprs: Option<HstackOperator>,
}

impl Operator for ProjectionOperator {
Expand All @@ -54,19 +58,36 @@ impl Operator for ProjectionOperator {
context: &PExecutionContext,
chunk: &DataChunk,
) -> PolarsResult<OperatorResult> {
// add temporary cse column to the chunk
#[cfg(feature = "cse")]
let cse_owned_chunk;
#[cfg(feature = "cse")]
let chunk = if let Some(hstack) = &mut self.cse_exprs {
let OperatorResult::Finished(out) = hstack.execute(context, chunk)? else { unreachable!() };
cse_owned_chunk = out;
&cse_owned_chunk
} else {
chunk
};

let mut has_literals = false;
let mut has_empty = false;
let mut projected = self
.exprs
.iter()
.map(|e| {
let s = e.evaluate(chunk, context.execution_state.as_any())?;
if s.len() == 1 {
has_literals = true;
}
if s.len() == 0 {
has_empty = true;
#[allow(unused_mut)]
let mut s = e.evaluate(chunk, context.execution_state.as_any())?;

// correct the cse name
#[cfg(feature = "cse")]
if self.cse_exprs.is_some() {
rename_cse_tmp_series(&mut s);
}

has_literals |= s.len() == 1;
has_empty |= s.len() == 0;

Ok(s)
})
.collect::<PolarsResult<Vec<_>>>()?;
Expand All @@ -92,6 +113,15 @@ impl Operator for ProjectionOperator {
Box::new(self.clone())
}
fn fmt(&self) -> &str {
#[cfg(feature = "cse")]
{
if self.cse_exprs.is_some() {
"projection[cse]"
} else {
"projection"
}
}
#[cfg(not(feature = "cse"))]
"projection"
}
}
Expand All @@ -100,6 +130,8 @@ impl Operator for ProjectionOperator {
pub(crate) struct HstackOperator {
pub(crate) exprs: Vec<Arc<dyn PhysicalPipedExpr>>,
pub(crate) input_schema: SchemaRef,
#[cfg(feature = "cse")]
pub(crate) cse_exprs: Option<Box<Self>>,
}

impl Operator for HstackOperator {
Expand All @@ -108,13 +140,43 @@ impl Operator for HstackOperator {
context: &PExecutionContext,
chunk: &DataChunk,
) -> PolarsResult<OperatorResult> {
// add temporary cse column to the chunk
#[cfg(feature = "cse")]
let width = chunk.data.width();
#[cfg(feature = "cse")]
let cse_owned_chunk;
#[cfg(feature = "cse")]
let chunk = if let Some(hstack) = &mut self.cse_exprs {
let OperatorResult::Finished(out) = hstack.execute(context, chunk)? else { unreachable!() };
cse_owned_chunk = out;
&cse_owned_chunk
} else {
chunk
};

let projected = self
.exprs
.iter()
.map(|e| e.evaluate(chunk, context.execution_state.as_any()))
.map(|e| {
#[allow(unused_mut)]
let mut res = e.evaluate(chunk, context.execution_state.as_any());

#[cfg(feature = "cse")]
if self.cse_exprs.is_some() {
res = res.map(|mut s| {
rename_cse_tmp_series(&mut s);
s
})
}
res
})
.collect::<PolarsResult<Vec<_>>>()?;

#[cfg(feature = "cse")]
let mut df = DataFrame::new_no_checks(chunk.data.get_columns()[..width].to_vec());
#[cfg(not(feature = "cse"))]
let mut df = chunk.data.clone();

let schema = &*self.input_schema;
df._add_columns(projected, schema)?;

Expand All @@ -125,6 +187,15 @@ impl Operator for HstackOperator {
Box::new(self.clone())
}
fn fmt(&self) -> &str {
#[cfg(feature = "cse")]
{
if self.cse_exprs.is_some() {
"hstack[cse]"
} else {
"hstack"
}
}
#[cfg(not(feature = "cse"))]
"hstack"
}
}
72 changes: 67 additions & 5 deletions polars/polars-lazy/polars-pipe/src/pipeline/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use polars_core::prelude::*;
use polars_core::with_match_physical_integer_polars_type;
use polars_plan::prelude::*;

use crate::executors::operators::HstackOperator;
use crate::executors::sinks::groupby::aggregates::convert_to_hash_agg;
use crate::executors::sinks::groupby::GenericGroupby2;
use crate::executors::sinks::*;
Expand Down Expand Up @@ -395,6 +396,24 @@ pub fn get_dummy_operator() -> Box<dyn Operator> {
Box::new(operators::PlaceHolder {})
}

fn get_hstack<F>(
exprs: &[Node],
expr_arena: &mut Arena<AExpr>,
to_physical: &F,
input_schema: SchemaRef,
#[cfg(feature = "cse")] cse_exprs: Option<Box<HstackOperator>>,
) -> PolarsResult<HstackOperator>
where
F: Fn(Node, &Arena<AExpr>, Option<&SchemaRef>) -> PolarsResult<Arc<dyn PhysicalPipedExpr>>,
{
Ok(operators::HstackOperator {
exprs: exprs_to_physical(exprs, expr_arena, &to_physical, Some(&input_schema))?,
input_schema,
#[cfg(feature = "cse")]
cse_exprs,
})
}

pub fn get_operator<F>(
node: Node,
lp_arena: &mut Arena<ALogicalPlan>,
Expand All @@ -408,17 +427,60 @@ where
let op = match lp_arena.get(node) {
Projection { expr, input, .. } => {
let input_schema = lp_arena.get(*input).schema(lp_arena);

#[cfg(feature = "cse")]
let cse_exprs = expr.cse_exprs();
#[cfg(feature = "cse")]
let cse_exprs = if cse_exprs.is_empty() {
None
} else {
Some(get_hstack(
cse_exprs,
expr_arena,
to_physical,
(*input_schema).clone(),
None,
)?)
};

let op = operators::ProjectionOperator {
exprs: exprs_to_physical(expr, expr_arena, &to_physical, Some(&input_schema))?,
exprs: exprs_to_physical(
expr.default_exprs(),
expr_arena,
&to_physical,
Some(&input_schema),
)?,
#[cfg(feature = "cse")]
cse_exprs,
};
Box::new(op) as Box<dyn Operator>
}
HStack { exprs, input, .. } => {
let input_schema = (*lp_arena.get(*input).schema(lp_arena)).clone();
let op = operators::HstackOperator {
exprs: exprs_to_physical(exprs, expr_arena, &to_physical, Some(&input_schema))?,
input_schema,
let input_schema = lp_arena.get(*input).schema(lp_arena);

#[cfg(feature = "cse")]
let cse_exprs = exprs.cse_exprs();
#[cfg(feature = "cse")]
let cse_exprs = if cse_exprs.is_empty() {
None
} else {
Some(Box::new(get_hstack(
cse_exprs,
expr_arena,
to_physical,
(*input_schema).clone(),
None,
)?))
};
let op = get_hstack(
exprs.default_exprs(),
expr_arena,
to_physical,
(*input_schema).clone(),
#[cfg(feature = "cse")]
cse_exprs,
)?;

Box::new(op) as Box<dyn Operator>
}
Selection { predicate, input } => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,9 @@ impl ALogicalPlan {
input,
schema,
} => {
debug_assert!(!expr.has_sub_exprs());
if expr.has_sub_exprs() {
polars_warn!("some columns show temporary names because of cse optimization")
}
let i = convert_to_lp(input, lp_arena);

LogicalPlan::Projection {
Expand Down
13 changes: 13 additions & 0 deletions polars/polars-lazy/polars-plan/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;
use polars_core::prelude::*;
use smartstring::alias::String as SmartString;

use crate::constants::CSE_REPLACED;
use crate::logical_plan::iterator::ArenaExprIter;
use crate::logical_plan::Context;
use crate::prelude::names::COUNT;
Expand Down Expand Up @@ -411,3 +412,15 @@ pub fn expr_is_projected_upstream(
let output_name = output_field.name();
projected_names.contains(output_name.as_str())
}

pub fn rename_cse_tmp_series(s: &mut Series) {
if s.name().starts_with(CSE_REPLACED) {
let field = s.field().into_owned();
let name = &field.name;
let pat = r#"col("#;
let offset = name.rfind(pat).unwrap() + pat.len();
// -1 is `)` of `col(foo)`
let name = &name[offset..name.len() - 1];
s.rename(name);
}
}
6 changes: 3 additions & 3 deletions polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,10 +508,10 @@ impl LazyFrame {
let streaming = self.opt_state.streaming;
#[cfg(feature = "cse")]
if streaming && self.opt_state.comm_subplan_elim {
polars_warn!("Cannot combine 'streaming' with 'common_subplan_elimination'. CSE will be turned off.");
polars_warn!(
"Cannot combine 'streaming' with 'comm_subplan_elim'. CSE will be turned off."
);
opt_state.comm_subplan_elim = false;
// TODO! toggle on once implemented on the physical side
opt_state.comm_subexpr_elim = false;
}
let lp_top = optimize(self.logical_plan, opt_state, lp_arena, expr_arena, scratch)?;

Expand Down
11 changes: 1 addition & 10 deletions polars/polars-lazy/src/physical_plan/executors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use std::borrow::Cow;

pub use executor::*;
use polars_core::POOL;
use polars_plan::constants::CSE_REPLACED;
use polars_plan::global::FETCH_ROWS;
use polars_plan::utils::*;
use rayon::prelude::*;
Expand Down Expand Up @@ -177,15 +176,7 @@ pub(super) fn evaluate_physical_expressions(
// the replace CSE has a temporary name
// we don't want this name in the result
for s in result.iter_mut() {
let field = s.field().into_owned();
let name = &field.name;
if name.starts_with(CSE_REPLACED) {
let pat = r#"col("#;
let offset = name.rfind(pat).unwrap() + pat.len();
// -1 is `)` of `col(foo)`
let name = &name[offset..name.len() - 1];
s.rename(name);
}
rename_cse_tmp_series(s);
}

result
Expand Down
37 changes: 37 additions & 0 deletions py-polars/tests/unit/test_cse.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,40 @@ def test_cse_expr_selection_context(monkeypatch: Any, capfd: Any) -> None:
out = capfd.readouterr().out
assert "run ProjectionExec with 2 CSE" in out
assert "run StackExec with 2 CSE" in out


def test_cse_expr_selection_streaming(monkeypatch: Any, capfd: Any) -> None:
monkeypatch.setenv("POLARS_VERBOSE", "1")
q = pl.LazyFrame(
{
"a": [1, 2, 3, 4],
"b": [1, 2, 3, 4],
"c": [1, 2, 3, 4],
}
)

derived = pl.col("a") * pl.col("b")
derived2 = derived * derived

exprs = [
derived.alias("d1"),
derived2.alias("d2"),
(derived2 * 10).alias("d3"),
]

assert q.select(exprs).collect(comm_subexpr_elim=True, streaming=True).to_dict(
False
) == {"d1": [1, 4, 9, 16], "d2": [1, 16, 81, 256], "d3": [10, 160, 810, 2560]}
assert q.with_columns(exprs).collect(
comm_subexpr_elim=True, streaming=True
).to_dict(False) == {
"a": [1, 2, 3, 4],
"b": [1, 2, 3, 4],
"c": [1, 2, 3, 4],
"d1": [1, 4, 9, 16],
"d2": [1, 16, 81, 256],
"d3": [10, 160, 810, 2560],
}
err = capfd.readouterr().err
assert "df -> projection[cse] -> ordered_sink" in err
assert "df -> hstack[cse] -> ordered_sink" in err

0 comments on commit 42c21bd

Please sign in to comment.