refactor(rust): use SchemaRef in CSV modules #7250

Merged · 3 commits · Mar 2, 2023
Changes from all commits
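
The refactor follows one pattern throughout: fields and parameters of type `Option<&'a Schema>` become `Option<SchemaRef>`, where `SchemaRef` is polars' alias for `Arc<Schema>`. Owning a reference-counted schema lets the CSV readers shed lifetime gymnastics and the unsafe lifetime-extending transmutes removed below. A minimal sketch of the pattern, using simplified stand-in types rather than the real polars API:

use std::sync::Arc;

// Simplified stand-in for polars' `Schema`.
#[derive(Debug, Default)]
struct Schema {
    fields: Vec<(String, String)>, // (column name, dtype name)
}

// The alias this PR standardizes on; polars defines `SchemaRef = Arc<Schema>`.
type SchemaRef = Arc<Schema>;

struct CsvReader {
    // Before: `schema: Option<&'a Schema>` forced a lifetime parameter on the
    // reader and self-referential tricks when the schema was computed internally.
    // After: the reader shares ownership, so no borrow ever escapes.
    schema: Option<SchemaRef>,
}

impl CsvReader {
    fn with_schema(mut self, schema: SchemaRef) -> Self {
        self.schema = Some(schema);
        self
    }
}

fn main() {
    let schema = Arc::new(Schema {
        fields: vec![("id".into(), "i64".into())],
    });
    let reader = CsvReader { schema: None }.with_schema(schema.clone());
    // Cloning an Arc only bumps a reference count; both handles share one Schema.
    assert!(Arc::ptr_eq(&schema, reader.schema.as_ref().unwrap()));
}

Cloning the Arc is cheap, which is why call sites in the diff can freely pass `schema.clone()`.
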
1 change: 0 additions & 1 deletion polars/polars-io/src/csv/mod.rs
@@ -52,7 +52,6 @@ pub mod utils;
 mod write;
 pub(super) mod write_impl;
 
-use std::borrow::Cow;
 use std::fs::File;
 use std::io::Write;
 use std::path::PathBuf;
48 changes: 18 additions & 30 deletions polars/polars-io/src/csv/read.rs
@@ -111,11 +111,11 @@ where
     delimiter: Option<u8>,
     has_header: bool,
     ignore_errors: bool,
-    pub(crate) schema: Option<&'a Schema>,
+    pub(crate) schema: Option<SchemaRef>,
     encoding: CsvEncoding,
     n_threads: Option<usize>,
     path: Option<PathBuf>,
-    schema_overwrite: Option<&'a Schema>,
+    schema_overwrite: Option<SchemaRef>,
    dtype_overwrite: Option<&'a [DataType]>,
    sample_size: usize,
    chunk_size: usize,
@@ -129,8 +129,6 @@ where
     skip_rows_after_header: usize,
     try_parse_dates: bool,
     row_count: Option<RowCount>,
-    // temporary schema needed for batch lifetimes
-    owned_schema: Option<Box<Schema>>,
 }
 
 impl<'a, R> CsvReader<'a, R>
@@ -178,7 +176,7 @@ where
     /// in the csv parser and expects a complete Schema.
     ///
     /// It is recommended to use [with_dtypes](Self::with_dtypes) instead.
-    pub fn with_schema(mut self, schema: &'a Schema) -> Self {
+    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
         self.schema = Some(schema);
         self
     }
@@ -233,7 +231,7 @@ where
 
     /// Overwrite the schema with the dtypes in this given Schema. The given schema may be a subset
     /// of the total schema.
-    pub fn with_dtypes(mut self, schema: Option<&'a Schema>) -> Self {
+    pub fn with_dtypes(mut self, schema: Option<SchemaRef>) -> Self {
         self.schema_overwrite = schema;
         self
     }
@@ -331,7 +329,7 @@ impl<'a> CsvReader<'a, File> {
 impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> {
     fn core_reader<'b>(
         &'b mut self,
-        schema: Option<&'b Schema>,
+        schema: Option<SchemaRef>,
         to_cast: Vec<Field>,
     ) -> PolarsResult<CoreReader<'b>>
     where
@@ -347,7 +345,7 @@ impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> {
             self.delimiter,
             self.has_header,
             self.ignore_errors,
-            self.schema,
+            self.schema.clone(),
             std::mem::take(&mut self.columns),
             self.encoding,
             self.n_threads,
@@ -404,24 +402,14 @@ impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> {
     }
 
     pub fn batched_borrowed(&'a mut self) -> PolarsResult<BatchedCsvReader<'a>> {
-        if let Some(schema) = self.schema_overwrite {
+        if let Some(schema) = self.schema_overwrite.as_deref() {
             let (schema, to_cast, has_cat) = self.prepare_schema_overwrite(schema);
-            self.owned_schema = Some(Box::new(schema));
-
-            // safety
-            // we boxed the schema and we refer to the boxed pointer
-            // the schema will drop once self drops
-            // so it is bound to 'a
-            let schema = unsafe {
-                std::mem::transmute::<Option<&Schema>, Option<&Schema>>(
-                    self.owned_schema.as_ref().map(|b| b.as_ref()),
-                )
-            };
+            let schema = Arc::new(schema);
 
-            let csv_reader = self.core_reader(schema, to_cast)?;
+            let csv_reader = self.core_reader(Some(schema), to_cast)?;
             csv_reader.batched(has_cat)
         } else {
-            let csv_reader = self.core_reader(self.schema, vec![])?;
+            let csv_reader = self.core_reader(self.schema.clone(), vec![])?;
             csv_reader.batched(false)
         }
     }
@@ -490,36 +478,36 @@ where
             skip_rows_after_header: 0,
             try_parse_dates: false,
             row_count: None,
-            owned_schema: None,
         }
     }
 
     /// Read the file and create the DataFrame.
     fn finish(mut self) -> PolarsResult<DataFrame> {
         let rechunk = self.rechunk;
-        let schema_overwrite = self.schema_overwrite;
+        let schema_overwrite = self.schema_overwrite.clone();
         let dtype_overwrite = self.dtype_overwrite;
         let should_parse_dates = self.try_parse_dates;
         let low_memory = self.low_memory;
 
         #[cfg(feature = "dtype-categorical")]
         let mut _cat_lock = None;
 
-        let mut df = if let Some(schema) = schema_overwrite {
+        let mut df = if let Some(schema) = schema_overwrite.as_deref() {
             let (schema, to_cast, _has_cat) = self.prepare_schema_overwrite(schema);
 
             #[cfg(feature = "dtype-categorical")]
             if _has_cat {
                 _cat_lock = Some(polars_core::IUseStringCache::new())
             }
 
-            let mut csv_reader = self.core_reader(Some(&schema), to_cast)?;
+            let mut csv_reader = self.core_reader(Some(Arc::new(schema)), to_cast)?;
             csv_reader.as_df()?
         } else {
             #[cfg(feature = "dtype-categorical")]
             {
                 let has_cat = self
                     .schema
+                    .clone()
                     .map(|schema| {
                         schema
                             .iter_dtypes()
@@ -530,7 +518,7 @@ where
                     _cat_lock = Some(polars_core::IUseStringCache::new())
                 }
             }
-            let mut csv_reader = self.core_reader(self.schema, vec![])?;
+            let mut csv_reader = self.core_reader(self.schema.clone(), vec![])?;
             csv_reader.as_df()?
         };
 
@@ -549,16 +537,16 @@ where
         if should_parse_dates {
             // determine the schema that's given by the user. That should not be changed
             let fixed_schema = match (schema_overwrite, dtype_overwrite) {
-                (Some(schema), _) => Cow::Borrowed(schema),
+                (Some(schema), _) => schema,
                 (None, Some(dtypes)) => {
                     let fields = dtypes
                         .iter()
                         .zip(df.get_column_names())
                         .map(|(dtype, name)| Field::new(name, dtype.clone()));
 
-                    Cow::Owned(Schema::from(fields))
+                    Arc::new(Schema::from(fields))
                 }
-                _ => Cow::Owned(Schema::default()),
+                _ => Arc::default(),
             };
             df = parse_dates(df, &fixed_schema)
         }
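
The `batched_borrowed` change above is the heart of the refactor: instead of boxing the prepared schema, stashing it in `self.owned_schema`, and transmuting a borrow to stretch its lifetime, the reader now wraps the freshly prepared schema in an `Arc` and passes shared ownership down. A sketch of the resulting ownership story, with stand-in types rather than the real polars code:

use std::sync::Arc;

// Stand-in for the schema produced by `prepare_schema_overwrite`.
struct Prepared(String);

struct Reader;

impl Reader {
    // After the PR: prepare the schema, wrap it in an Arc, and hand out
    // shared ownership. No `owned_schema: Option<Box<Schema>>` field and no
    // `mem::transmute` are needed: the Arc keeps the schema alive for as
    // long as any consumer holds a handle.
    fn batched(&mut self) -> Arc<Prepared> {
        Arc::new(Prepared("id:i64".to_string()))
    }
}

fn main() {
    let mut reader = Reader;
    let schema = reader.batched();
    drop(reader); // the schema can even outlive the reader now
    assert_eq!(schema.0, "id:i64");
}
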
13 changes: 4 additions & 9 deletions polars/polars-io/src/csv/read_impl/batched.rs
@@ -13,12 +13,7 @@ impl<'a> CoreReader<'a> {
             .determine_file_chunks_and_statistics(&mut n_threads, &reader_bytes, logging, true)?;
         let projection = self.get_projection();
 
-        // safety
-        // we extend the lifetime because we are sure they are bound
-        // to 'a, as the &str refer to the &schema which is bound by 'a
-        let str_columns = unsafe {
-            std::mem::transmute::<Vec<&str>, Vec<&'a str>>(self.get_string_columns(&projection)?)
-        };
+        let str_columns = self.get_string_columns(&projection)?;
 
         // RAII structure that will ensure we maintain a global stringcache
         #[cfg(feature = "dtype-categorical")]
@@ -64,7 +59,7 @@ pub struct BatchedCsvReader<'a> {
     file_chunks: Vec<(usize, usize)>,
     chunk_offset: IdxSize,
     str_capacities: Vec<RunningSize>,
-    str_columns: Vec<&'a str>,
+    str_columns: StringColumns,
     projection: Vec<usize>,
     starting_point_offset: Option<usize>,
     row_count: Option<RowCount>,
@@ -78,7 +73,7 @@ pub struct BatchedCsvReader<'a> {
     n_rows: Option<usize>,
     encoding: CsvEncoding,
     delimiter: u8,
-    schema: Cow<'a, Schema>,
+    schema: SchemaRef,
     rows_read: IdxSize,
     #[cfg(feature = "dtype-categorical")]
     _cat_lock: Option<polars_core::IUseStringCache>,
@@ -193,7 +188,7 @@ pub fn to_batched_owned(
 ) -> OwnedBatchedCsvReader {
     // make sure that the schema is bound to the schema we have
     // we will keep ownership of the schema so that the lifetime remains bound to ourselves
-    let reader = reader.with_schema(schema.as_ref());
+    let reader = reader.with_schema(schema.clone());
     // extend the lifetime
     // the lifetime was bound to schema, which we own and will store on the heap
     let reader = unsafe {
61 changes: 45 additions & 16 deletions polars/polars-io/src/csv/read_impl/mod.rs
@@ -1,6 +1,5 @@
 mod batched;
 
-use std::borrow::Cow;
 use std::fmt;
 use std::ops::Deref;
 use std::sync::atomic::{AtomicUsize, Ordering};
@@ -77,7 +76,7 @@ pub(crate) fn cast_columns(
 pub(crate) struct CoreReader<'a> {
     reader_bytes: Option<ReaderBytes<'a>>,
     /// Explicit schema for the CSV file
-    schema: Cow<'a, Schema>,
+    schema: SchemaRef,
     /// Optional projection for which columns to load (zero-based column indices)
     projection: Option<Vec<usize>>,
     /// Current line number, used in error reporting
@@ -173,11 +172,11 @@ impl<'a> CoreReader<'a> {
         delimiter: Option<u8>,
         has_header: bool,
         ignore_errors: bool,
-        schema: Option<&'a Schema>,
+        schema: Option<SchemaRef>,
         columns: Option<Vec<String>>,
         encoding: CsvEncoding,
         n_threads: Option<usize>,
-        schema_overwrite: Option<&'a Schema>,
+        schema_overwrite: Option<SchemaRef>,
         dtype_overwrite: Option<&'a [DataType]>,
         sample_size: usize,
         chunk_size: usize,
@@ -205,7 +204,7 @@ impl<'a> CoreReader<'a> {
         let delimiter = delimiter.unwrap_or(b',');
 
         let mut schema = match schema {
-            Some(schema) => Cow::Borrowed(schema),
+            Some(schema) => schema,
             None => {
                 {
                     // We keep track of the inferred schema bool
@@ -223,7 +222,7 @@ impl<'a> CoreReader<'a> {
                         delimiter,
                         max_records,
                         has_header,
-                        schema_overwrite,
+                        schema_overwrite.as_deref(),
                         &mut skip_rows,
                         skip_rows_after_header,
                         comment_char,
@@ -232,16 +231,15 @@ impl<'a> CoreReader<'a> {
                         null_values.as_ref(),
                         try_parse_dates,
                     )?;
-                    Cow::Owned(inferred_schema)
+                    Arc::new(inferred_schema)
                 }
             }
         };
         if let Some(dtypes) = dtype_overwrite {
-            let mut s = schema.into_owned();
+            let s = Arc::make_mut(&mut schema);
             for (index, dt) in dtypes.iter().enumerate() {
                 s.coerce_by_index(index, dt.clone()).unwrap();
             }
-            schema = Cow::Owned(s);
        }
 
        // create a null value for every column
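
The `dtype_overwrite` branch above uses `Arc::make_mut`, which gives copy-on-write semantics: it clones the inner `Schema` only if the `Arc` is shared, replacing the old `Cow::into_owned` dance. A small self-contained example of that behavior:

use std::sync::Arc;

fn main() {
    let mut schema = Arc::new(vec!["i64".to_string(), "utf8".to_string()]);
    let other_owner = schema.clone(); // a second handle to the same allocation

    // Because `other_owner` still points at the data, make_mut clones it
    // first (copy-on-write); with a uniquely-owned Arc it mutates in place.
    Arc::make_mut(&mut schema)[0] = "f64".to_string();

    assert_eq!(schema[0], "f64");
    assert_eq!(other_owner[0], "i64"); // the other handle is unaffected
}
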
@@ -445,26 +443,33 @@ impl<'a> CoreReader<'a> {
             .unwrap_or_else(|| (0..self.schema.len()).collect())
     }
 
-    fn get_string_columns(&self, projection: &[usize]) -> PolarsResult<Vec<&str>> {
+    fn get_string_columns(&self, projection: &[usize]) -> PolarsResult<StringColumns> {
         // keep track of the maximum capacity that needs to be allocated for the utf8-builder
         // Per string column we keep a statistic of the maximum length of string bytes per chunk
         // We must use the names, not the indices (the indices are incorrect due to projection
         // pushdown)
-        let mut str_columns = Vec::with_capacity(projection.len());
+        let mut new_projection = Vec::with_capacity(projection.len());
 
         for i in projection {
-            let (name, dtype) = self.schema.get_index(*i).ok_or_else(||
+            let (_, dtype) = self.schema.get_index(*i).ok_or_else(||
                 PolarsError::ComputeError(
                     format!("the given projection index: {} is out of bounds for csv schema with {} columns", i, self.schema.len()).into())
             )?;
 
             if dtype == &DataType::Utf8 {
-                str_columns.push(name.as_str())
+                new_projection.push(*i)
            }
        }
-        Ok(str_columns)
+
+        Ok(StringColumns::new(self.schema.clone(), new_projection))
     }
 
-    fn init_string_size_stats(&self, str_columns: &[&str], capacity: usize) -> Vec<RunningSize> {
+    fn init_string_size_stats(
+        &self,
+        str_columns: &StringColumns,
+        capacity: usize,
+    ) -> Vec<RunningSize> {
         // assume 10 chars per str
         // this is not updated in low memory mode
         let init_str_bytes = capacity * 10;
@@ -690,7 +695,7 @@ impl<'a> CoreReader<'a> {
 
     fn update_string_stats(
         str_capacities: &[RunningSize],
-        str_columns: &[&str],
+        str_columns: &StringColumns,
         local_df: &DataFrame,
     ) -> PolarsResult<()> {
         // update the running str bytes statistics
@@ -769,3 +774,27 @@ fn read_chunk(
             .collect::<PolarsResult<_>>()?,
     ))
 }
+
+/// List of strings, which are stored inside of a [Schema].
+///
+/// Conceptually it is a `Vec<&str>` with each `&str` tied to the lifetime of
+/// the [Schema].
+struct StringColumns {
+    schema: SchemaRef,
+    fields: Vec<usize>,
+}
+
+impl StringColumns {
+    /// Creates a new [StringColumns], where `fields` holds the indices
+    /// of the string fields in `schema`.
+    fn new(schema: SchemaRef, fields: Vec<usize>) -> Self {
+        Self { schema, fields }
+    }
+
+    fn iter(&self) -> impl Iterator<Item = &str> {
+        self.fields.iter().map(|schema_i| {
+            let (name, _) = self.schema.get_index(*schema_i).unwrap();
+            name.as_str()
+        })
+    }
+}
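
The new `StringColumns` helper above sidesteps the old `Vec<&'a str>` borrow by storing a `SchemaRef` plus column indices and resolving names on demand. Roughly how that plays out in use, as a sketch with a simplified schema type standing in for the real one:

use std::sync::Arc;

// Simplified schema: a list of (name, dtype) pairs with index lookup.
type SchemaRef = Arc<Vec<(String, String)>>;

struct StringColumns {
    schema: SchemaRef,
    fields: Vec<usize>,
}

impl StringColumns {
    fn new(schema: SchemaRef, fields: Vec<usize>) -> Self {
        Self { schema, fields }
    }

    // Names are borrowed from the shared schema at iteration time, so no
    // `&'a str` has to be smuggled past the borrow checker up front.
    fn iter(&self) -> impl Iterator<Item = &str> {
        self.fields.iter().map(|i| self.schema[*i].0.as_str())
    }
}

fn main() {
    let schema: SchemaRef = Arc::new(vec![
        ("id".to_string(), "i64".to_string()),
        ("name".to_string(), "utf8".to_string()),
    ]);
    let cols = StringColumns::new(schema.clone(), vec![1]);
    assert_eq!(cols.iter().collect::<Vec<_>>(), ["name"]);
}
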
9 changes: 2 additions & 7 deletions polars/polars-lazy/polars-pipe/src/executors/sources/csv.rs
@@ -37,15 +37,10 @@ impl CsvSource {
         }
         let n_rows = _set_n_rows_for_scan(options.n_rows);
 
-        // Safety:
-        // schema will be owned by CsvSource and have a valid lifetime until CsvSource is dropped
-        let schema_ref =
-            unsafe { std::mem::transmute::<&Schema, &'static Schema>(schema.as_ref()) };
-
         let n_cols = if projected_len > 0 {
             projected_len
         } else {
-            schema_ref.len()
+            schema.len()
         };
         // inversely scale the chunk size by the number of threads so that we reduce memory pressure
         // in streaming
@@ -55,7 +50,7 @@ impl CsvSource {
         let reader = CsvReader::from_path(&path)
             .unwrap()
             .has_header(options.has_header)
-            .with_schema(schema_ref)
+            .with_schema(schema.clone())
             .with_delimiter(options.delimiter)
             .with_ignore_errors(options.ignore_errors)
             .with_skip_rows(options.skip_rows)
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/physical_plan/executors/scan/csv.rs
@@ -28,7 +28,7 @@ impl CsvExec {
         CsvReader::from_path(&self.path)
             .unwrap()
             .has_header(self.options.has_header)
-            .with_schema(&self.schema)
+            .with_schema(self.schema.clone())
             .with_delimiter(self.options.delimiter)
             .with_ignore_errors(self.options.ignore_errors)
             .with_skip_rows(self.options.skip_rows)