Split adding in a separate commit to make things easier to review
mpatou-openai committed Jan 22, 2025
1 parent 0856ff5 commit 6a5f19f
Showing 1 changed file with 383 additions and 1 deletion.
384 changes: 383 additions & 1 deletion table/block_based/block_based_table_builder.cc
@@ -259,7 +259,6 @@ class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
bool prefix_filtering_;
};


struct BlockBasedTableBuilder::ParallelCompressionRep {
// Keys is a wrapper around a vector of strings that avoids
// releasing the strings' memory during vector clear()
@@ -567,6 +566,389 @@ struct BlockBasedTableBuilder::ParallelCompressionRep {
return block_rep;
}
};
struct BlockBasedTableBuilder::Rep {
const ImmutableOptions ioptions;
// BEGIN from MutableCFOptions
std::shared_ptr<const SliceTransform> prefix_extractor;
// END from MutableCFOptions
const WriteOptions write_options;
const BlockBasedTableOptions table_options;
const InternalKeyComparator& internal_comparator;
// Size in bytes for the user-defined timestamps.
size_t ts_sz;
// When `ts_sz` > 0 and this flag is false, the user-defined timestamp in the
// user key will be stripped when creating the block based table. This
// stripping happens for all user keys, including the keys in data block,
// index block for data block, index block for index block (if index type is
// `kTwoLevelIndexSearch`), index for filter blocks (if using partitioned
// filters), the `first_internal_key` in `IndexValue`, the `end_key` for range
// deletion entries.
// As long as the user keys are sorted when added via the `Add` API, their
// logical ordering won't change after timestamps are stripped. However, for
// each user key to be logically equivalent before and after its timestamp is
// stripped, the user key should contain the minimum timestamp.
bool persist_user_defined_timestamps;
WritableFileWriter* file;
std::atomic<uint64_t> offset;
size_t alignment;
BlockBuilder data_block;
// Buffers uncompressed data blocks to replay later. Needed when
// compression dictionary is enabled so we can finalize the dictionary before
// compressing any data blocks.
std::vector<std::string> data_block_buffers;
BlockBuilder range_del_block;

InternalKeySliceTransform internal_prefix_transform;
std::unique_ptr<IndexBuilder> index_builder;
PartitionedIndexBuilder* p_index_builder_ = nullptr;

std::string last_key;
const Slice* first_key_in_next_block = nullptr;
CompressionType compression_type;
uint64_t sample_for_compression;
std::atomic<uint64_t> compressible_input_data_bytes;
std::atomic<uint64_t> uncompressible_input_data_bytes;
std::atomic<uint64_t> sampled_input_data_bytes;
std::atomic<uint64_t> sampled_output_slow_data_bytes;
std::atomic<uint64_t> sampled_output_fast_data_bytes;
CompressionOptions compression_opts;
std::unique_ptr<CompressionDict> compression_dict;
std::vector<std::unique_ptr<CompressionContext>> compression_ctxs;
std::vector<std::unique_ptr<UncompressionContext>> verify_ctxs;
std::unique_ptr<UncompressionDict> verify_dict;

size_t data_begin_offset = 0;

TableProperties props;

// States of the builder.
//
// - `kBuffered`: This is the initial state where zero or more data blocks are
// accumulated uncompressed in-memory. From this state, call
// `EnterUnbuffered()` to finalize the compression dictionary if enabled,
// compress/write out any buffered blocks, and proceed to the `kUnbuffered`
// state.
//
// - `kUnbuffered`: This is the state once the compression dictionary has been
// finalized, either because it wasn't enabled in the first place or because
// it has been created from sampling previously buffered data. In this state,
// blocks are simply compressed/written out as they fill up. From this state,
// call `Finish()` to complete the file (write meta-blocks, etc.), or
// `Abandon()` to delete the partially created file.
//
// - `kClosed`: This indicates either `Finish()` or `Abandon()` has been
// called, so the table builder is no longer usable. We must be in this
// state by the time the destructor runs.
enum class State {
kBuffered,
kUnbuffered,
kClosed,
};
State state;
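// A rough sketch of the lifecycle these states describe (assuming a
// compression dictionary is configured, so the builder starts in
// `kBuffered`; not part of the original source):
//
//   BlockBasedTableBuilder b(...);   // state == kBuffered
//   b.Add(k1, v1); b.Add(k2, v2);    // data blocks buffered uncompressed
//   // Once buffering would exceed `buffer_limit` (or `Finish()` is called),
//   // `EnterUnbuffered()` trains the dictionary, compresses and writes the
//   // buffered blocks, and moves to kUnbuffered.
//   b.Finish();                      // write meta-blocks; state == kClosed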
// `kBuffered` state is allowed only as long as the buffering of uncompressed
// data blocks (see `data_block_buffers`) does not exceed `buffer_limit`.
uint64_t buffer_limit;
std::shared_ptr<CacheReservationManager>
compression_dict_buffer_cache_res_mgr;
const bool use_delta_encoding_for_index_values;
std::unique_ptr<FilterBlockBuilder> filter_builder;
OffsetableCacheKey base_cache_key;
const TableFileCreationReason reason;

BlockHandle pending_handle; // Handle to add to index block

std::string compressed_output;
std::unique_ptr<FlushBlockPolicy> flush_block_policy;

std::vector<std::unique_ptr<InternalTblPropColl>> table_properties_collectors;

std::unique_ptr<ParallelCompressionRep> pc_rep;
BlockCreateContext create_context;

// The size of the "tail" part of a SST file. "Tail" refers to
// all blocks after data blocks till the end of the SST file.
uint64_t tail_size;

// See class Footer
uint32_t base_context_checksum;

uint64_t get_offset() { return offset.load(std::memory_order_relaxed); }
void set_offset(uint64_t o) { offset.store(o, std::memory_order_relaxed); }

bool IsParallelCompressionEnabled() const {
return compression_opts.parallel_threads > 1;
}

Status GetStatus() {
// Modifications to status must be visible by the time status_ok is set
// to false; status_mutex ensures that, so no special memory order for
// status_ok is required.
if (status_ok.load(std::memory_order_relaxed)) {
return Status::OK();
} else {
return CopyStatus();
}
}

Status CopyStatus() {
std::lock_guard<std::mutex> lock(status_mutex);
return status;
}

IOStatus GetIOStatus() {
// Modifications to io_status must be visible by the time io_status_ok is
// set to false; io_status_mutex ensures that, so no special memory order
// for io_status_ok is required.
if (io_status_ok.load(std::memory_order_relaxed)) {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED // Avoid unnecessary lock acquisition
auto ios = CopyIOStatus();
ios.PermitUncheckedError();
// Assume no races in unit tests
assert(ios.ok());
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
return IOStatus::OK();
} else {
return CopyIOStatus();
}
}

IOStatus CopyIOStatus() {
std::lock_guard<std::mutex> lock(io_status_mutex);
return io_status;
}

// Never erase an existing status that is not OK.
void SetStatus(Status s) {
if (!s.ok() && status_ok.load(std::memory_order_relaxed)) {
// Locking is overkill in the non-parallel (compression_opts.parallel_threads
// == 1) case, but since it's unlikely that s is not OK, we accept this
// cost for the sake of simplicity.
std::lock_guard<std::mutex> lock(status_mutex);
status = s;
status_ok.store(false, std::memory_order_relaxed);
}
}

// Never erase an existing I/O status that is not OK.
// Calling this will also call SetStatus(ios).
void SetIOStatus(IOStatus ios) {
if (!ios.ok() && io_status_ok.load(std::memory_order_relaxed)) {
// Locking is overkill in the non-parallel (compression_opts.parallel_threads
// == 1) case, but since it's unlikely that ios is not OK, we accept this
// cost for the sake of simplicity.
std::lock_guard<std::mutex> lock(io_status_mutex);
io_status = ios;
io_status_ok.store(false, std::memory_order_relaxed);
}
SetStatus(ios);
}
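// Taken together, SetStatus()/GetStatus() and their I/O counterparts form a
// cheap fast path plus a locked slow path: readers poll a relaxed atomic flag
// and only take the mutex after a writer has published a failure. A minimal
// sketch of the same pattern (illustrative names, not from this file):
//
//   std::atomic<bool> ok{true};
//   std::mutex mu;
//   Status st;
//   // writer: { std::lock_guard<std::mutex> l(mu);
//   //           st = s;
//   //           ok.store(false, std::memory_order_relaxed); }
//   // reader: if (ok.load(std::memory_order_relaxed)) return Status::OK();
//   //         { std::lock_guard<std::mutex> l(mu); return st; }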

Rep(const BlockBasedTableOptions& table_opt, const TableBuilderOptions& tbo,
WritableFileWriter* f)
: ioptions(tbo.ioptions),
prefix_extractor(tbo.moptions.prefix_extractor),
write_options(tbo.write_options),
table_options(table_opt),
internal_comparator(tbo.internal_comparator),
ts_sz(tbo.internal_comparator.user_comparator()->timestamp_size()),
persist_user_defined_timestamps(
tbo.ioptions.persist_user_defined_timestamps),
file(f),
offset(0),
alignment(table_options.block_align
? std::min(static_cast<size_t>(table_options.block_size),
kDefaultPageSize)
: 0),
data_block(table_options.block_restart_interval,
table_options.use_delta_encoding,
false /* use_value_delta_encoding */,
tbo.internal_comparator.user_comparator()
->CanKeysWithDifferentByteContentsBeEqual()
? BlockBasedTableOptions::kDataBlockBinarySearch
: table_options.data_block_index_type,
table_options.data_block_hash_table_util_ratio, ts_sz,
persist_user_defined_timestamps),
range_del_block(
1 /* block_restart_interval */, true /* use_delta_encoding */,
false /* use_value_delta_encoding */,
BlockBasedTableOptions::kDataBlockBinarySearch /* index_type */,
0.75 /* data_block_hash_table_util_ratio */, ts_sz,
persist_user_defined_timestamps),
internal_prefix_transform(prefix_extractor.get()),
compression_type(tbo.compression_type),
sample_for_compression(tbo.moptions.sample_for_compression),
compressible_input_data_bytes(0),
uncompressible_input_data_bytes(0),
sampled_input_data_bytes(0),
sampled_output_slow_data_bytes(0),
sampled_output_fast_data_bytes(0),
compression_opts(tbo.compression_opts),
compression_dict(),
compression_ctxs(tbo.compression_opts.parallel_threads),
verify_ctxs(tbo.compression_opts.parallel_threads),
verify_dict(),
state((tbo.compression_opts.max_dict_bytes > 0 &&
tbo.compression_type != kNoCompression)
? State::kBuffered
: State::kUnbuffered),
use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
!table_opt.block_align),
reason(tbo.reason),
flush_block_policy(
table_options.flush_block_policy_factory->NewFlushBlockPolicy(
table_options, data_block)),
create_context(&table_options, &ioptions, ioptions.stats,
compression_type == kZSTD ||
compression_type == kZSTDNotFinalCompression,
tbo.moptions.block_protection_bytes_per_key,
tbo.internal_comparator.user_comparator(),
!use_delta_encoding_for_index_values,
table_opt.index_type ==
BlockBasedTableOptions::kBinarySearchWithFirstKey),
tail_size(0),
status_ok(true),
io_status_ok(true) {
if (tbo.target_file_size == 0) {
buffer_limit = compression_opts.max_dict_buffer_bytes;
} else if (compression_opts.max_dict_buffer_bytes == 0) {
buffer_limit = tbo.target_file_size;
} else {
buffer_limit = std::min(tbo.target_file_size,
compression_opts.max_dict_buffer_bytes);
}
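// A worked example of the choice above: with target_file_size == 64 MB and
// max_dict_buffer_bytes == 16 MB, buffer_limit == min(64 MB, 16 MB) == 16 MB.
// If either value is 0 (unset), the other is used as-is, so 0 effectively
// imposes no cap from that side.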

const auto compress_dict_build_buffer_charged =
table_options.cache_usage_options.options_overrides
.at(CacheEntryRole::kCompressionDictionaryBuildingBuffer)
.charged;
if (table_options.block_cache &&
(compress_dict_build_buffer_charged ==
CacheEntryRoleOptions::Decision::kEnabled ||
compress_dict_build_buffer_charged ==
CacheEntryRoleOptions::Decision::kFallback)) {
compression_dict_buffer_cache_res_mgr =
std::make_shared<CacheReservationManagerImpl<
CacheEntryRole::kCompressionDictionaryBuildingBuffer>>(
table_options.block_cache);
} else {
compression_dict_buffer_cache_res_mgr = nullptr;
}
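// A sketch of options that would take the charged branch above (the option
// names appear in this file, but the exact initialization spelling may vary
// by RocksDB version):
//
//   BlockBasedTableOptions opts;
//   opts.block_cache = NewLRUCache(64 << 20);
//   CacheEntryRoleOptions role_opts;
//   role_opts.charged = CacheEntryRoleOptions::Decision::kEnabled;
//   opts.cache_usage_options.options_overrides
//       [CacheEntryRole::kCompressionDictionaryBuildingBuffer] = role_opts;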

assert(compression_ctxs.size() >= compression_opts.parallel_threads);
for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
compression_ctxs[i].reset(
new CompressionContext(compression_type, compression_opts));
}
if (table_options.index_type ==
BlockBasedTableOptions::kTwoLevelIndexSearch) {
p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
&internal_comparator, use_delta_encoding_for_index_values,
table_options, ts_sz, persist_user_defined_timestamps);
index_builder.reset(p_index_builder_);
} else {
index_builder.reset(IndexBuilder::CreateIndexBuilder(
table_options.index_type, &internal_comparator,
&this->internal_prefix_transform, use_delta_encoding_for_index_values,
table_options, ts_sz, persist_user_defined_timestamps));
}
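// For reference, the partitioned (two-level) path above corresponds to a
// configuration like the following sketch:
//
//   BlockBasedTableOptions opts;
//   opts.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
//   opts.partition_filters = true;  // partitioned filters require this index
//                                   // type and reuse p_index_builder_ below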
if (ioptions.optimize_filters_for_hits && tbo.is_bottommost) {
// Apply optimize_filters_for_hits setting here when applicable by
// skipping filter generation
filter_builder.reset();
} else if (tbo.skip_filters) {
// For SstFileWriter skip_filters
filter_builder.reset();
} else if (!table_options.filter_policy) {
// Null filter_policy -> no filter
filter_builder.reset();
} else {
FilterBuildingContext filter_context(table_options);

filter_context.info_log = ioptions.logger;
filter_context.column_family_name = tbo.column_family_name;
filter_context.reason = reason;

// Only populate the remaining fields if the file is known to be part of the
// LSM tree rather than an externally generated SST file.
if (reason != TableFileCreationReason::kMisc) {
filter_context.compaction_style = ioptions.compaction_style;
filter_context.num_levels = ioptions.num_levels;
filter_context.level_at_creation = tbo.level_at_creation;
filter_context.is_bottommost = tbo.is_bottommost;
assert(filter_context.level_at_creation < filter_context.num_levels);
}

filter_builder.reset(CreateFilterBlockBuilder(
ioptions, tbo.moptions, filter_context,
use_delta_encoding_for_index_values, p_index_builder_, ts_sz,
persist_user_defined_timestamps));
}

assert(tbo.internal_tbl_prop_coll_factories);
for (auto& factory : *tbo.internal_tbl_prop_coll_factories) {
assert(factory);

std::unique_ptr<InternalTblPropColl> collector{
factory->CreateInternalTblPropColl(tbo.column_family_id,
tbo.level_at_creation)};
if (collector) {
table_properties_collectors.emplace_back(std::move(collector));
}
}
table_properties_collectors.emplace_back(
new BlockBasedTablePropertiesCollector(
table_options.index_type, table_options.whole_key_filtering,
prefix_extractor != nullptr));
if (ts_sz > 0 && persist_user_defined_timestamps) {
table_properties_collectors.emplace_back(
new TimestampTablePropertiesCollector(
tbo.internal_comparator.user_comparator()));
}
if (table_options.verify_compression) {
for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
verify_ctxs[i].reset(new UncompressionContext(compression_type));
}
}

// These are only needed for populating table properties
props.column_family_id = tbo.column_family_id;
props.column_family_name = tbo.column_family_name;
props.oldest_key_time = tbo.oldest_key_time;
props.file_creation_time = tbo.file_creation_time;
props.orig_file_number = tbo.cur_file_num;
props.db_id = tbo.db_id;
props.db_session_id = tbo.db_session_id;
props.db_host_id = ioptions.db_host_id;
if (!ReifyDbHostIdProperty(ioptions.env, &props.db_host_id).ok()) {
ROCKS_LOG_INFO(ioptions.logger, "db_host_id property will not be set");
}

if (FormatVersionUsesContextChecksum(table_options.format_version)) {
// Must be non-zero and semi- or quasi-random
// TODO: ideally guaranteed different for related files (e.g. use file
// number and db_session, for benefit of SstFileWriter)
do {
base_context_checksum = Random::GetTLSInstance()->Next();
} while (UNLIKELY(base_context_checksum == 0));
} else {
base_context_checksum = 0;
}
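// The do/while above is simple rejection sampling: re-draw on the (1 in 2^32)
// chance the RNG returns 0, so a file with context checksums can never be
// confused with the base_context_checksum == 0 case used when the format
// version predates context checksums.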
}

Rep(const Rep&) = delete;
Rep& operator=(const Rep&) = delete;

private:
// Synchronize status & io_status accesses across threads from main thread,
// compression thread and write thread in parallel compression.
std::mutex status_mutex;
std::atomic<bool> status_ok;
Status status;
std::mutex io_status_mutex;
std::atomic<bool> io_status_ok;
IOStatus io_status;
};


BlockBasedTableBuilder::BlockBasedTableBuilder(
const BlockBasedTableOptions& table_options, const TableBuilderOptions& tbo,
