Support sink_parquet for anonymous scan #8719

Open
sid-6581 opened this issue May 7, 2023 · 6 comments
Labels
enhancement New feature or an improvement of an existing feature

Comments

@sid-6581

sid-6581 commented May 7, 2023

Problem description

I have a use case that I imagine isn't too unusual. I have many files in a format that doesn't yet have a reader, and I would like to convert them to a parquet file in a streaming fashion. They don't all fit in memory at the same time, so it's important that they are read individually and appended to the parquet file. I tried writing a lazy reader using AnonymousScan, but I get the error "sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'" with the following minimal reproduction:

use std::env;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use polars::prelude::*;

#[derive(Clone)]
struct LazyReader {
    path: PathBuf,
    rechunk: bool,
    row_count: Option<RowCount>,
    n_rows: Option<usize>,
}

impl AnonymousScan for LazyReader {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn scan(&self, _scan_opts: AnonymousScanOptions) -> PolarsResult<DataFrame> {
        df!["test" => ["testValue1", "testValue2"]]
    }

    fn schema(&self, _infer_schema_length: Option<usize>) -> PolarsResult<Schema> {
        Ok(Schema::from(
            [Field::new("test", DataType::Utf8)].into_iter(),
        ))
    }
}

impl LazyFileListReader for LazyReader {
    fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
        let options = ScanArgsAnonymous::default();
        LazyFrame::anonymous_scan(Arc::new(self), options)
    }

    fn path(&self) -> &Path {
        &self.path
    }

    fn with_path(mut self, path: PathBuf) -> Self {
        self.path = path;
        self
    }

    fn rechunk(&self) -> bool {
        self.rechunk
    }

    #[must_use]
    fn with_rechunk(mut self, toggle: bool) -> Self {
        self.rechunk = toggle;
        self
    }

    fn n_rows(&self) -> Option<usize> {
        self.n_rows
    }

    fn row_count(&self) -> Option<&RowCount> {
        self.row_count.as_ref()
    }
}

fn main() -> PolarsResult<()> {
    let args: Vec<String> = env::args().collect();

    LazyReader {
        path: "test".into(),
        rechunk: false,
        row_count: None,
        n_rows: None,
    }
    .finish()?
    .sink_parquet(
        args[2].clone().into(),
        ParquetWriteOptions {
            compression: ParquetCompression::Zstd(None),
            ..ParquetWriteOptions::default()
        },
    )?;

    Ok(())
}

I found barely any examples of using AnonymousScan, so it's possible I missed something, but based on the example in the polars repo I can't tell what that might be. That example uses .collect().write_parquet(), which won't work for me because the data doesn't fit in memory.

@sid-6581 sid-6581 added the enhancement New feature or an improvement of an existing feature label May 7, 2023
@osawyerr

@sid-6581 really good example here - https://github.com/universalmind303/polars-mongo

@sid-6581
Author

> @sid-6581 really good example here - https://github.com/universalmind303/polars-mongo

Thanks for the reply! I had seen that example as well, but I didn't see anything significantly different about it that would make it work with sink_parquet. Did I miss something?

@Simon-Will

Just want to add that I have basically the same use case: downloading large volumes of data via HTTP and streaming them to a parquet file. I'm trying to use the Python bindings and can reduce the error to the following minimal example:

import pickle
import polars as pl

SCHEMA = {"foo": pl.Int64, "bar": pl.Utf8}

def _pseudo_scan(*args, **kwargs):
    return pl.DataFrame(
        {"foo": [1, 2], "bar": ["a", "b"]},
        schema=SCHEMA
    )

def pseudo_scan():
    return pl.LazyFrame._scan_python_function(SCHEMA, _pseudo_scan)


pseudo_scan().sink_parquet("test.parquet")

@abealcantara
Contributor

Hi 😄 I just started contributing to Polars and want to help with this issue. I noticed that neither AnonymousScan nor PythonScan is currently supported in streaming mode, so the scan operation is called only once in both cases.
For this to work, we need to extend both the AnonymousScan and PythonScan APIs to support streaming, so that they can be called multiple times to read the data in small batches.
I will start working on a proposal for AnonymousScan, as this will also add support for the ndjson format in streaming mode. Later on we can work on a version for PythonScan.
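
To illustrate the idea, here is a minimal, library-independent sketch in Python. It is not the Polars API and not the proposed design: `one_shot_scan`, `batched_scan`, `sink`, and `fake_read_file` are hypothetical names, and plain lists of dicts stand in for DataFrames. It only contrasts a scan that is called once and must return everything with a scan that hands out one small batch at a time.

```python
from typing import Callable, Dict, Iterator, List

# Stand-in for a DataFrame: a list of row dicts (assumption for illustration).
Batch = List[Dict[str, object]]


def one_shot_scan(files: List[str], read_file: Callable[[str], Batch]) -> Batch:
    """Scan called once: every file must be materialized before returning."""
    rows: Batch = []
    for path in files:
        rows.extend(read_file(path))
    return rows


def batched_scan(files: List[str], read_file: Callable[[str], Batch]) -> Iterator[Batch]:
    """Hypothetical streaming scan: yields one batch per file, lazily."""
    for path in files:
        yield read_file(path)


def sink(batches: Iterator[Batch], write_batch: Callable[[Batch], None]) -> int:
    """Consume batches one at a time and return the total number of rows written."""
    total = 0
    for batch in batches:
        write_batch(batch)  # only the current batch needs to be in memory
        total += len(batch)
    return total


def fake_read_file(path: str) -> Batch:
    """Hypothetical reader for a custom format; returns two rows per file."""
    return [{"test": f"{path}-value1"}, {"test": f"{path}-value2"}]


# Record the size of each batch the sink receives instead of writing a real file.
written_sizes: List[int] = []
total_rows = sink(
    batched_scan(["a", "b", "c"], fake_read_file),
    lambda batch: written_sizes.append(len(batch)),
)
```

With the batched variant, the sink never holds more than one file's rows at once, which is the property the use cases above depend on.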

@eitsupi
Contributor

eitsupi commented Feb 18, 2024

Any update on this?

@dwpeng

dwpeng commented Dec 13, 2024

Streaming support for AnonymousScan would be a great feature for parsing large files in custom formats.
