
Commit

refactor(rust): cluster file scans in one node (pola-rs#9799)
ritchie46 authored Jul 11, 2023
1 parent 243951c commit ff09ec3
Showing 31 changed files with 459 additions and 1,127 deletions.
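The refactor replaces the separate CsvScan, ParquetScan, and IpcScan logical-plan variants with a single Scan node that pairs a per-format `scan_type` enum with format-agnostic `FileScanOptions`. Below is a minimal, self-contained sketch of that shape, inferred from the diffs that follow — the stub option types and exact field layout are assumptions, not the actual Polars definitions:

```rust
#![allow(dead_code)] // sketch only; not every field is read

use std::path::PathBuf;
use std::sync::Arc;

// Stub stand-ins for polars' CsvParserOptions / ParquetOptions / CloudOptions.
#[derive(Clone, Debug, Default, PartialEq)]
struct CsvParserOptions;
#[derive(Clone, Debug, Default, PartialEq)]
struct ParquetOptions;
#[derive(Clone, Debug, Default, PartialEq)]
struct CloudOptions;

// Options shared by every file scan, regardless of format.
#[derive(Clone, Debug, Default)]
struct FileScanOptions {
    with_columns: Option<Arc<Vec<String>>>, // projection pushed into the scan
    n_rows: Option<usize>,                  // row limit pushed into the scan
    row_count: Option<(String, u32)>,       // optional row-count column (name, offset)
}

// The per-format details collapse into one enum...
#[derive(Clone, Debug, PartialEq)]
enum FileScan {
    Csv { options: CsvParserOptions },
    Parquet { options: ParquetOptions, cloud_options: Option<CloudOptions> },
}

// ...so a single Scan node can describe any file scan.
#[derive(Clone, Debug)]
struct Scan {
    path: PathBuf,
    scan_type: FileScan,
    file_options: FileScanOptions,
}

fn main() {
    let scan = Scan {
        path: PathBuf::from("data.parquet"),
        scan_type: FileScan::Parquet {
            options: ParquetOptions,
            cloud_options: None,
        },
        file_options: FileScanOptions {
            n_rows: Some(1_000),
            ..FileScanOptions::default()
        },
    };
    // Downstream code dispatches on the format only where it matters,
    // as pipeline/convert.rs now does.
    match &scan.scan_type {
        FileScan::Csv { .. } => println!("CSV scan of {:?}", scan.path),
        FileScan::Parquet { .. } => println!("Parquet scan of {:?}", scan.path),
    }
    println!("row limit: {:?}", scan.file_options.n_rows);
}
```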
2 changes: 1 addition & 1 deletion polars/polars-core/src/cloud.rs
@@ -31,7 +31,7 @@ use crate::error::{PolarsError, PolarsResult};
#[allow(dead_code)]
type Configs<T> = Vec<(T, String)>;

#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, PartialEq)]
#[cfg_attr(feature = "serde-lazy", derive(Serialize, Deserialize))]
/// Options to connect to various cloud providers.
pub struct CloudOptions {
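CloudOptions picks up PartialEq here, presumably because a type that now embeds it (such as the new scan options) derives PartialEq itself, and a derive only compiles when every field type implements the trait. A tiny illustration of that constraint — the embedding struct below is hypothetical:

```rust
#[derive(Clone, Debug, Default, PartialEq)]
struct CloudOptions; // stand-in for the real struct with provider settings

// This derive only compiles because CloudOptions is PartialEq as well.
#[derive(Clone, Debug, PartialEq)]
struct ScanConnection {
    cloud_options: Option<CloudOptions>,
}

fn main() {
    let remote = ScanConnection { cloud_options: Some(CloudOptions) };
    let local = ScanConnection { cloud_options: None };
    assert_ne!(remote, local);
    println!("connections differ: {}", remote != local);
}
```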
12 changes: 8 additions & 4 deletions polars/polars-lazy/polars-pipe/src/executors/sources/csv.rs
@@ -6,7 +6,7 @@ use polars_core::POOL;
use polars_io::csv::read_impl::{BatchedCsvReaderMmap, BatchedCsvReaderRead};
use polars_io::csv::{CsvEncoding, CsvReader};
use polars_plan::global::_set_n_rows_for_scan;
use polars_plan::prelude::CsvParserOptions;
use polars_plan::prelude::{CsvParserOptions, FileScanOptions};

use super::*;
use crate::pipeline::determine_chunk_size;
@@ -22,6 +22,7 @@ pub(crate) struct CsvSource {
chunk_index: IdxSize,
path: Option<PathBuf>,
options: Option<CsvParserOptions>,
file_options: Option<FileScanOptions>,
verbose: bool,
}

@@ -31,8 +32,9 @@ impl CsvSource {
// leading to Too many Open files error
fn init_reader(&mut self) -> PolarsResult<()> {
let options = self.options.take().unwrap();
let file_options = self.file_options.take().unwrap();
let path = self.path.take().unwrap();
let mut with_columns = options.with_columns;
let mut with_columns = file_options.with_columns;
let mut projected_len = 0;
with_columns.as_ref().map(|columns| {
projected_len = columns.len();
@@ -48,7 +50,7 @@
} else {
self.schema.len()
};
let n_rows = _set_n_rows_for_scan(options.n_rows);
let n_rows = _set_n_rows_for_scan(file_options.n_rows);
// inversely scale the chunk size by the number of threads so that we reduce memory pressure
// in streaming
let chunk_size = determine_chunk_size(n_cols, POOL.current_num_threads())?;
@@ -76,7 +78,7 @@ impl CsvSource {
// never rechunk in streaming
.with_rechunk(false)
.with_chunk_size(chunk_size)
.with_row_count(options.row_count)
.with_row_count(file_options.row_count)
.with_try_parse_dates(options.try_parse_dates);

let reader = Box::new(reader);
@@ -100,6 +102,7 @@ impl CsvSource {
path: PathBuf,
schema: SchemaRef,
options: CsvParserOptions,
file_options: FileScanOptions,
verbose: bool,
) -> PolarsResult<Self> {
Ok(CsvSource {
@@ -110,6 +113,7 @@
chunk_index: 0,
path: Some(path),
options: Some(options),
file_options: Some(file_options),
verbose,
})
}
16 changes: 10 additions & 6 deletions polars/polars-lazy/polars-pipe/src/executors/sources/parquet.rs
@@ -8,7 +8,7 @@ use polars_io::parquet::{BatchedParquetReader, ParquetReader};
#[cfg(feature = "async")]
use polars_io::prelude::ParquetAsyncReader;
use polars_io::{is_cloud_url, SerReader};
use polars_plan::prelude::ParquetOptions;
use polars_plan::prelude::{FileScanOptions, ParquetOptions};
use polars_utils::IdxSize;

use crate::operators::{DataChunk, PExecutionContext, Source, SourceResult};
@@ -20,6 +20,7 @@ pub struct ParquetSource {
chunk_index: IdxSize,
path: Option<PathBuf>,
options: Option<ParquetOptions>,
file_options: Option<FileScanOptions>,
#[allow(dead_code)]
cloud_options: Option<CloudOptions>,
schema: Option<SchemaRef>,
@@ -33,8 +34,9 @@ impl ParquetSource {
fn init_reader(&mut self) -> PolarsResult<()> {
let path = self.path.take().unwrap();
let options = self.options.take().unwrap();
let file_options = self.file_options.take().unwrap();
let schema = self.schema.take().unwrap();
let projection: Option<Vec<_>> = options.with_columns.map(|with_columns| {
let projection: Option<Vec<_>> = file_options.with_columns.map(|with_columns| {
with_columns
.iter()
.map(|name| schema.index_of(name).unwrap())
@@ -59,8 +61,8 @@
{
let uri = path.to_string_lossy();
ParquetAsyncReader::from_uri(&uri, self.cloud_options.as_ref())?
.with_n_rows(options.n_rows)
.with_row_count(options.row_count)
.with_n_rows(file_options.n_rows)
.with_row_count(file_options.row_count)
.with_projection(projection)
.use_statistics(options.use_statistics)
.batched(chunk_size)?
@@ -69,8 +71,8 @@
let file = std::fs::File::open(path).unwrap();

ParquetReader::new(file)
.with_n_rows(options.n_rows)
.with_row_count(options.row_count)
.with_n_rows(file_options.n_rows)
.with_row_count(file_options.row_count)
.with_projection(projection)
.use_statistics(options.use_statistics)
.batched(chunk_size)?
@@ -84,6 +86,7 @@
path: PathBuf,
options: ParquetOptions,
cloud_options: Option<CloudOptions>,
file_options: FileScanOptions,
schema: SchemaRef,
verbose: bool,
) -> PolarsResult<Self> {
@@ -94,6 +97,7 @@
n_threads,
chunk_index: 0,
options: Some(options),
file_options: Some(file_options),
path: Some(path),
cloud_options,
schema: Some(schema),
78 changes: 35 additions & 43 deletions polars/polars-lazy/polars-pipe/src/pipeline/convert.rs
@@ -65,14 +65,13 @@ where
}
Ok(Box::new(sources::DataFrameSource::from_df(df)) as Box<dyn Source>)
}
#[cfg(feature = "csv")]
CsvScan {
Scan {
path,
file_info,
options,
file_options,
predicate,
output_schema,
..
scan_type,
} => {
// add predicate to operators
if let (true, Some(predicate)) = (push_predicate, predicate) {
@@ -81,36 +80,39 @@
let op = Box::new(op) as Box<dyn Operator>;
operator_objects.push(op)
}
let src = sources::CsvSource::new(path, file_info.schema, options, verbose)?;
Ok(Box::new(src) as Box<dyn Source>)
}
#[cfg(feature = "parquet")]
ParquetScan {
path,
file_info,
options,
cloud_options,
predicate,
output_schema,
..
} => {
// add predicate to operators
if let (true, Some(predicate)) = (push_predicate, predicate) {
let predicate = to_physical(predicate, expr_arena, output_schema.as_ref())?;
let op = operators::FilterOperator { predicate };
let op = Box::new(op) as Box<dyn Operator>;
operator_objects.push(op)
match scan_type {
#[cfg(feature = "csv")]
FileScan::Csv {
options: csv_options,
} => {
let src = sources::CsvSource::new(
path,
file_info.schema,
csv_options,
file_options,
verbose,
)?;
Ok(Box::new(src) as Box<dyn Source>)
}
#[cfg(feature = "parquet")]
FileScan::Parquet {
options: parquet_options,
cloud_options,
} => {
let src = sources::ParquetSource::new(
path,
parquet_options,
cloud_options,
file_options,
file_info.schema,
verbose,
)?;
Ok(Box::new(src) as Box<dyn Source>)
}
_ => todo!(),
}
let src = sources::ParquetSource::new(
path,
options,
cloud_options,
file_info.schema,
verbose,
)?;
Ok(Box::new(src) as Box<dyn Source>)
}
_ => todo!(),
_ => unreachable!(),
}
}

@@ -480,17 +482,7 @@
true,
verbose,
)?,
#[cfg(feature = "csv")]
lp @ CsvScan { .. } => get_source(
lp.clone(),
&mut operator_objects,
expr_arena,
&to_physical,
true,
verbose,
)?,
#[cfg(feature = "parquet")]
lp @ ParquetScan { .. } => get_source(
lp @ Scan { .. } => get_source(
lp.clone(),
&mut operator_objects,
expr_arena,
73 changes: 19 additions & 54 deletions polars/polars-lazy/polars-plan/src/dot.rs
@@ -340,63 +340,28 @@ impl LogicalPlan {
self.write_dot(acc_str, prev_node, current_node, id_map)
}
}
#[cfg(feature = "csv")]
CsvScan {
path,
options,
file_info,
predicate,
..
} => self.write_scan(
acc_str,
prev_node,
"CSV",
path.as_ref(),
options.with_columns.as_deref().map(|cols| cols.as_slice()),
file_info.schema.len(),
predicate,
branch,
id,
id_map,
),
#[cfg(feature = "parquet")]
ParquetScan {
Scan {
path,
file_info,
predicate,
options,
..
} => self.write_scan(
acc_str,
prev_node,
"PARQUET",
path.as_ref(),
options.with_columns.as_deref().map(|cols| cols.as_slice()),
file_info.schema.len(),
predicate,
branch,
id,
id_map,
),
#[cfg(feature = "ipc")]
IpcScan {
path,
file_info,
options,
predicate,
..
} => self.write_scan(
acc_str,
prev_node,
"IPC",
path.as_ref(),
options.with_columns.as_deref().map(|cols| cols.as_slice()),
file_info.schema.len(),
predicate,
branch,
id,
id_map,
),
scan_type,
file_options: options,
} => {
let name: &str = scan_type.into();

self.write_scan(
acc_str,
prev_node,
name,
path.as_ref(),
options.with_columns.as_ref().map(|cols| cols.as_slice()),
file_info.schema.len(),
predicate,
branch,
id,
id_map,
)
}
Join {
input_left,
input_right,
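Where the old dot.rs arms hard-coded the labels "CSV", "PARQUET", and "IPC", the new Scan arm obtains the label via `scan_type.into()`. A minimal sketch of one way such a conversion could be written — the actual Polars impl is not shown in this diff, and the simplified enum below is hypothetical:

```rust
// Simplified, field-less version of the scan-type enum, for illustration only.
enum FileScan {
    Csv,
    Parquet,
    Ipc,
}

// One way `let name: &str = scan_type.into()` can work: map each variant
// to the label the removed per-format arms used to pass to write_scan.
impl From<&FileScan> for &'static str {
    fn from(scan: &FileScan) -> Self {
        match scan {
            FileScan::Csv => "CSV",
            FileScan::Parquet => "PARQUET",
            FileScan::Ipc => "IPC",
        }
    }
}

fn main() {
    for scan_type in [FileScan::Csv, FileScan::Parquet, FileScan::Ipc] {
        let name: &str = (&scan_type).into();
        println!("scan node label: {name}");
    }
}
```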
(Diffs for the remaining 26 changed files are not shown.)
