-
Notifications
You must be signed in to change notification settings - Fork 95
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add sampling to our ParquetSource
#773
Conversation
lilac/sources/parquet_source.py
Outdated
""") | ||
res = self._con.execute('SELECT COUNT(*) FROM t').fetchone() | ||
num_items = cast(tuple[int], res)[0] | ||
self._reader = self._con.execute('SELECT * from t').fetch_record_batch(rows_per_batch=10_000) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these computations seem to belong in process(), not in setup()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
great q.
self._reader = self._con.execute('SELECT * from t').fetch_record_batch(rows_per_batch=10_000)
returns a lazy iterator, so no data is being read yet, but we found that executing this in setup catches a lot of "setup" bugs like file not found, unrecognized parquet format (broken head), unauthorized S3/GCS bucket read etc.
In addition to this, once you have a reader , you can read the inferred schema before reading the data, and our sources need the schema before process() so they can setup a parquet writer with buffer ahead of time.
ParquetSource
@@ -62,7 +61,7 @@ def setup(self) -> None: | |||
@override | |||
def source_schema(self) -> SourceSchema: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not for now but maybe we should make schemas optional and let duckdb infer types to reduce cognitive overhead of both sources and signals
then signals are very close to a map
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's our pq.ParquetWriter that needs a schema ahead of time to setup a writer, before writing a single row to disk. And that schema needs to be consistent with 100% of the rows that are going in that writer to avoid write error. That means we need to see the entire data in order to correctly infer the schema, if not provided by the user. Or we circumvent our writer and get duckdb to read the format and dump to paquet directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note that pq.ParquetWriter also doesn't hold everything in memory, it dumbs to parquet every 128MB row_group_buffer_size with 10k items per rowgroup.
This isn't yet addressing the problem of having large number of shards and thus needing to sample from random shards, but that's a follow up.
Towards #770