Table retention #556
purplefox started this conversation in Design discussions
Motivation
Sometimes users ingest into a source from a Topic with a high volume of traffic, and they are only interested in the most recent data (e.g. 7 days' worth). By default the source will store all data, so storage size will grow without bound.
We would like a user to be able to configure an optional retention time on a source; rows whose key has not been upserted within that retention time will then be deleted automatically.
This feature is similar to Kafka log retention.
Approach
If retention is configured for a source we will add a hidden column containing the last update time, in ms since the Unix epoch, as a BIGINT (we use a BIGINT because it is simpler and quicker to parse than a TIMESTAMP). The column value will be set to the current time whenever the row is upserted.
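As a rough sketch of the write path (Row, SetInt64 and stampLastUpdate are illustrative stand-ins, not the engine's actual types):

```go
package retention

import "time"

// Row stands in for the engine's internal row type (illustrative only).
type Row struct {
	cols []int64 // simplified: a real row holds heterogeneous column values
}

func (r *Row) SetInt64(colIdx int, v int64) { r.cols[colIdx] = v }

// stampLastUpdate writes the current time, as ms since the Unix epoch, into
// the hidden BIGINT column; this would be called on every upsert of the row.
func stampLastUpdate(r *Row, hiddenColIdx int) {
	r.SetInt64(hiddenColIdx, time.Now().UnixMilli())
}
```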
We will also create an index on the source which indexes on the last update time column. This enables us to scan the index in last update time order and efficiently locate the oldest entries in the source for deletion. Without the index we would have to continuously scan the entire source to find the oldest rows, which would become prohibitively expensive for large data sets.
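One detail worth making explicit: assuming the index keys live in Pebble's ordered keyspace (as the mention of Pebble batches later in this design suggests), the BIGINT must be encoded big-endian so that byte-wise key order matches numeric time order. A sketch, with a made-up key layout:

```go
package retention

import "encoding/binary"

// indexKey builds a key of the form <indexID><lastUpdateMs><rowKey> so that a
// prefix scan over the index visits rows oldest-first. Encoding the BIGINT
// big-endian makes lexicographic byte order agree with numeric order; since
// the timestamps are non-negative, casting int64 to uint64 preserves order.
// (This key layout is illustrative, not the actual storage format.)
func indexKey(indexID uint64, lastUpdateMs int64, rowKey []byte) []byte {
	k := make([]byte, 16, 16+len(rowKey))
	binary.BigEndian.PutUint64(k[0:8], indexID)
	binary.BigEndian.PutUint64(k[8:16], uint64(lastUpdateMs))
	return append(k, rowKey...)
}
```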
On each node, and for each shard which has a replica on that node, we will create an instance of a "reaper" - this is a component which scans the index for the shard, and deletes any entries which have exceeded the retention time.
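To sketch the shape of this ownership (all types and names here are hypothetical):

```go
package retention

// A reaper scans the retention indexes of a single shard replica and deletes
// expired rows. One instance exists per (node, local shard) pair.
type reaper struct {
	shardID uint64
	tables  []tableRetention // tables in this shard with retention configured
}

// tableRetention pairs a table with its configured retention period in ms.
type tableRetention struct {
	tableID     uint64
	retentionMs int64
}

// startReapers creates one reaper per shard that has a replica on this node.
func startReapers(localShards []uint64, tables []tableRetention) []*reaper {
	reapers := make([]*reaper, 0, len(localShards))
	for _, sid := range localShards {
		reapers = append(reapers, &reaper{shardID: sid, tables: tables})
	}
	return reapers
}
```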
At any one time there may be multiple sources in Prana with retention times that vary greatly. It would be inefficient for the reaper to continuously scan the index in the hope that there are rows to be deleted, and setting a fixed periodic scan interval (e.g. the shortest table retention time) does not work well either.
Instead, the reaper will scan the index for each table, iterating through the oldest entries and deleting those which have exceeded the retention time. When it finds the first entry that has not yet exceeded the retention time, it pushes an entry onto a min heap recording the time at which that row will become eligible for deletion. This is repeated for all tables with a retention time. A timer is then set for the entry at the top of the heap - that is, for the next row that needs to be deleted at some point in the future. When the timer fires, the top of the heap is popped, the index for that table is scanned again, expired entries are deleted, and a new entry is pushed onto the heap when we find the next entry that is not yet ready to delete.
If no more entries are found when scanning the index for a table, then an entry is added to the heap with a time equal to now + table retention time.
In a single execution we iterate over the min heap, popping entries, deleting rows and pushing back the next deletion times, until the entry at the top of the heap is not yet ready for deletion or a maximum batch size is exceeded.
We provide a maximum batch size to limit the number of rows that can be deleted in one pass of the reaper, so that the Pebble batch does not grow without bound. A sketch of this scheduling loop follows.
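As a minimal sketch of the loop described above, using Go's container/heap (the deadline type, the injected scanAndDelete callback and runOnce itself are hypothetical stand-ins, not the actual implementation):

```go
package retention

import (
	"container/heap"
	"time"
)

// deadline is one pending wake-up: the time at which the oldest surviving row
// of a table becomes eligible for deletion.
type deadline struct {
	tableID  uint64
	deleteAt time.Time
}

// deadlineHeap is a min heap ordered by deleteAt.
type deadlineHeap []deadline

func (h deadlineHeap) Len() int           { return len(h) }
func (h deadlineHeap) Less(i, j int) bool { return h[i].deleteAt.Before(h[j].deleteAt) }
func (h deadlineHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *deadlineHeap) Push(x any)        { *h = append(*h, x.(deadline)) }
func (h *deadlineHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// runOnce pops every table whose deadline has passed, reaps it and pushes the
// table's next deadline back, stopping early once maxBatch rows have been
// deleted in total. scanAndDelete stands in for the real index scan: it
// deletes up to budget expired rows for a table and returns how many it
// deleted plus the time at which the next surviving row expires (or
// now + the table's retention time if the index is empty). The returned
// duration is how long the caller should wait before the next pass.
func runOnce(h *deadlineHeap, maxBatch int,
	scanAndDelete func(tableID uint64, budget int) (int, time.Time)) time.Duration {
	deleted := 0
	now := time.Now()
	for h.Len() > 0 && !(*h)[0].deleteAt.After(now) && deleted < maxBatch {
		d := heap.Pop(h).(deadline)
		n, next := scanAndDelete(d.tableID, maxBatch-deleted)
		deleted += n
		heap.Push(h, deadline{tableID: d.tableID, deleteAt: next})
	}
	if h.Len() == 0 {
		return 0 // in practice every table with retention always has a deadline
	}
	return time.Until((*h)[0].deleteAt)
}
```

The caller arms a timer with the returned duration and calls runOnce again when it fires; note that if the batch cap was hit while rows were still due, the duration will be zero or negative, so the next pass runs immediately and deletion continues.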
Future improvements
In the case of a high throughput ingest into a source with a retention time, it might be possible (this needs to be confirmed) for rows to be ingested faster than old rows can be deleted. This would result in the source growing without bound.
For this case we should consider throttling the ingest rate, using a rate limiter, to some figure below the actual delete rate, which we can measure.
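If confirmed, a token-bucket limiter on the ingest path would be one way to do this, e.g. with golang.org/x/time/rate; the 0.9 safety factor and the helper names below are illustrative choices, not part of this design:

```go
package retention

import (
	"context"

	"golang.org/x/time/rate"
)

// newIngestLimiter caps the ingest rate at 90% of the measured delete rate so
// the reaper can always keep up. measuredDeleteRate is rows/sec as observed
// from the reaper; the safety factor and burst size are illustrative.
func newIngestLimiter(measuredDeleteRate float64) *rate.Limiter {
	return rate.NewLimiter(rate.Limit(measuredDeleteRate*0.9), 1000 /* burst */)
}

// ingestRow shows where the limiter would sit: each incoming row (or batch)
// first waits for a token before being upserted into the source.
func ingestRow(ctx context.Context, lim *rate.Limiter, row []byte) error {
	if err := lim.Wait(ctx); err != nil {
		return err
	}
	// ... upsert row into the source ...
	return nil
}
```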
Proposed PR
The PR for this is here: #552