Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sampling to our ParquetSource #773

Merged
merged 5 commits into from
Oct 20, 2023
Merged

Add sampling to our ParquetSource #773

merged 5 commits into from
Oct 20, 2023

Conversation

dsmilkov
Copy link
Collaborator

@dsmilkov dsmilkov commented Oct 20, 2023

  • Add sampling for parquet files via duckdb
  • Improve memory usage by reading smaller batches from huge parquet files
  • Add support for parquet files on S3

This isn't yet addressing the problem of having large number of shards and thus needing to sample from random shards, but that's a follow up.

Towards #770

""")
res = self._con.execute('SELECT COUNT(*) FROM t').fetchone()
num_items = cast(tuple[int], res)[0]
self._reader = self._con.execute('SELECT * from t').fetch_record_batch(rows_per_batch=10_000)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these computations seem to belong in process(), not in setup()

Copy link
Collaborator Author

@dsmilkov dsmilkov Oct 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great q.

self._reader = self._con.execute('SELECT * from t').fetch_record_batch(rows_per_batch=10_000)

returns a lazy iterator, so no data is being read yet, but we found that executing this in setup catches a lot of "setup" bugs like file not found, unrecognized parquet format (broken head), unauthorized S3/GCS bucket read etc.

In addition to this, once you have a reader , you can read the inferred schema before reading the data, and our sources need the schema before process() so they can setup a parquet writer with buffer ahead of time.

@dsmilkov dsmilkov changed the title Add parquet sampling and use duckdb to read files Add sampling to our ParquetSource Oct 20, 2023
.env Outdated Show resolved Hide resolved
@@ -62,7 +61,7 @@ def setup(self) -> None:
@override
def source_schema(self) -> SourceSchema:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not for now but maybe we should make schemas optional and let duckdb infer types to reduce cognitive overhead of both sources and signals

then signals are very close to a map

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's our pq.ParquetWriter that needs a schema ahead of time to setup a writer, before writing a single row to disk. And that schema needs to be consistent with 100% of the rows that are going in that writer to avoid write error. That means we need to see the entire data in order to correctly infer the schema, if not provided by the user. Or we circumvent our writer and get duckdb to read the format and dump to paquet directly.

Copy link
Collaborator Author

@dsmilkov dsmilkov Oct 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note that pq.ParquetWriter also doesn't hold everything in memory, it dumbs to parquet every 128MB row_group_buffer_size with 10k items per rowgroup.

lilac/sources/parquet_source.py Outdated Show resolved Hide resolved
@dsmilkov dsmilkov merged commit ca44094 into main Oct 20, 2023
4 checks passed
@dsmilkov dsmilkov deleted the ds-smaple branch October 20, 2023 17:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants