Skip to content

Commit

Permalink
Separate compilation unit for codec
Browse files Browse the repository at this point in the history
  • Loading branch information
elshize committed Mar 28, 2024
1 parent 7a94fb4 commit b75fcbe
Show file tree
Hide file tree
Showing 7 changed files with 488 additions and 116 deletions.
321 changes: 289 additions & 32 deletions include/pisa/block_inverted_index.hpp
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
#pragma once

#include <fmt/format.h>

#include "bit_vector.hpp"
#include "codec/block_codec.hpp"
#include "codec/block_codecs.hpp"
#include "codec/compact_elias_fano.hpp"
#include "concepts.hpp"
#include "concepts/inverted_index.hpp"
#include "global_parameters.hpp"
#include "mappable/mappable_vector.hpp"
#include "mappable/mapper.hpp"
#include "memory_source.hpp"
#include "temporary_directory.hpp"

namespace pisa {

namespace index::block {
class InMemoryBuilder;
class StreamBuilder;
} // namespace index::block

/**
* Cursor for a block-encoded posting list.
*/
class BlockInvertedIndexCursor {
public:
BlockInvertedIndexCursor(BlockCodec const* block_codec, std::uint8_t const* data, std::uint64_t universe)
Expand Down Expand Up @@ -243,7 +252,6 @@ class BlockInvertedIndexCursor {
};

class BlockInvertedIndex {
private:
global_parameters m_params;
std::size_t m_size{0};
std::size_t m_num_docs{0};
Expand All @@ -252,56 +260,305 @@ class BlockInvertedIndex {
MemorySource m_source;
std::unique_ptr<BlockCodec> m_block_codec;

void check_term_range(std::size_t term_id) const;

friend class index::block::InMemoryBuilder;
friend class index::block::StreamBuilder;

public:
using document_enumerator = BlockInvertedIndexCursor;

explicit BlockInvertedIndex(MemorySource source, std::unique_ptr<BlockCodec> block_codec)
: m_source(std::move(source)), m_block_codec(std::move(block_codec)) {
PISA_ASSERT_CONCEPT(
(concepts::SortedInvertedIndex<BlockInvertedIndex, BlockInvertedIndexCursor>)
);
mapper::map(*this, m_source.data(), mapper::map_flags::warmup);
}
explicit BlockInvertedIndex(MemorySource source, std::unique_ptr<BlockCodec> block_codec);

template <typename Visitor>
void map(Visitor& visit) {
visit(m_params, "m_params")(m_size, "m_size")(m_num_docs, "m_num_docs")(
m_endpoints, "m_endpoints")(m_lists, "m_lists");
}

[[nodiscard]] auto operator[](std::size_t term_id) const -> BlockInvertedIndexCursor {
// check_term_range(term_id);
compact_elias_fano::enumerator endpoints(m_endpoints, 0, m_lists.size(), m_size, m_params);
auto endpoint = endpoints.move(term_id).second;
return BlockInvertedIndexCursor(m_block_codec.get(), m_lists.data() + endpoint, num_docs());
}
[[nodiscard]] auto operator[](std::size_t term_id) const -> BlockInvertedIndexCursor;

/**
* \returns The size of the index, i.e., the number of terms (posting lists).
* The size of the index, i.e., the number of terms (posting lists).
*/
[[nodiscard]] std::size_t size() const noexcept { return m_size; }
[[nodiscard]] auto size() const noexcept -> std::size_t { return m_size; }

/**
* \returns The number of distinct documents in the index.
* The number of distinct documents in the index.
*/
[[nodiscard]] std::uint64_t num_docs() const noexcept { return m_num_docs; }
[[nodiscard]] auto num_docs() const noexcept -> std::uint64_t { return m_num_docs; }

void warmup(std::size_t term_id) const {
// check_term_range(term_id);
compact_elias_fano::enumerator endpoints(m_endpoints, 0, m_lists.size(), m_size, m_params);
void warmup(std::size_t term_id) const;
};

auto begin = endpoints.move(term_id).second;
auto end = m_lists.size();
if (term_id + 1 != size()) {
end = endpoints.move(term_id + 1).second;
namespace index::block {

class BlockPostingWriter {
BlockCodec const* m_block_codec;

public:
explicit BlockPostingWriter(BlockCodec const* block_codec) : m_block_codec(block_codec) {}

template <typename DocsIterator, typename FreqsIterator>
void write(
std::vector<uint8_t>& out, uint32_t n, DocsIterator docs_begin, FreqsIterator freqs_begin
) {
TightVariableByte::encode_single(n, out);

uint64_t block_size = m_block_codec->block_size();
uint64_t blocks = ceil_div(n, block_size);
size_t begin_block_maxs = out.size();
size_t begin_block_endpoints = begin_block_maxs + 4 * blocks;
size_t begin_blocks = begin_block_endpoints + 4 * (blocks - 1);
out.resize(begin_blocks);

DocsIterator docs_it(docs_begin);
FreqsIterator freqs_it(freqs_begin);
std::vector<uint32_t> docs_buf(block_size);
std::vector<uint32_t> freqs_buf(block_size);
int32_t last_doc(-1);
uint32_t block_base = 0;
for (size_t b = 0; b < blocks; ++b) {
uint32_t cur_block_size = ((b + 1) * block_size <= n) ? block_size : (n % block_size);

for (size_t i = 0; i < cur_block_size; ++i) {
uint32_t doc(*docs_it++);
docs_buf[i] = doc - last_doc - 1;
last_doc = doc;

freqs_buf[i] = *freqs_it++ - 1;
}
*((uint32_t*)&out[begin_block_maxs + 4 * b]) = last_doc;

m_block_codec->encode(
docs_buf.data(), last_doc - block_base - (cur_block_size - 1), cur_block_size, out
);
m_block_codec->encode(freqs_buf.data(), uint32_t(-1), cur_block_size, out);
if (b != blocks - 1) {
*((uint32_t*)&out[begin_block_endpoints + 4 * b]) = out.size() - begin_blocks;
}
block_base = last_doc + 1;
}
}
};

volatile std::uint32_t tmp;
for (std::size_t i = begin; i != end; ++i) {
tmp = m_lists[i];
template <typename BlockDataRange>
void write_blocks(std::vector<uint8_t>& out, uint32_t n, BlockDataRange const& input_blocks) {
TightVariableByte::encode_single(n, out);
assert(input_blocks.front().index == 0); // first block must remain first

uint64_t blocks = input_blocks.size();
size_t begin_block_maxs = out.size();
size_t begin_block_endpoints = begin_block_maxs + 4 * blocks;
size_t begin_blocks = begin_block_endpoints + 4 * (blocks - 1);
out.resize(begin_blocks);

for (auto const& block: input_blocks) {
size_t b = block.index;
// write endpoint
if (b != 0) {
*((uint32_t*)&out[begin_block_endpoints + 4 * (b - 1)]) = out.size() - begin_blocks;
}

// write max
*((uint32_t*)&out[begin_block_maxs + 4 * b]) = block.max;

// copy block
block.append_docs_block(out);
block.append_freqs_block(out);
}
(void)tmp;
}
};

/**
* In-memory block index builder.
*
* Builds the index in memory, which is eventually flushed to disk at the very end.
*/
class InMemoryBuilder {
global_parameters m_params;
std::size_t m_num_docs;
BlockPostingWriter m_posting_writer;
std::vector<std::uint64_t> m_endpoints{};
std::vector<std::uint8_t> m_lists{};

public:
/**
* Constructs a builder for an index containing the given number of documents.
*/
InMemoryBuilder(
std::uint64_t num_docs, global_parameters const& params, BlockPostingWriter posting_writer
);

/**
* Records a new posting list.
*
* Postings are written to an in-memory buffer.
*
* \param n Posting list length.
* \param docs_begin Iterator that points at the first document ID.
* \param freqs_begin Iterator that points at the first frequency.
* \param occurrences Not used in this builder.
*
* \throws std::invalid_argument Thrown if `n == 0`.
*/
template <typename DocsIterator, typename FreqsIterator>
void
add_posting_list(std::uint64_t n, DocsIterator docs_begin, FreqsIterator freqs_begin, std::uint64_t /* occurrences */) {
if (!n) {
throw std::invalid_argument("List must be nonempty");
}
m_posting_writer.write(m_lists, n, docs_begin, freqs_begin);
m_endpoints.push_back(m_lists.size());
}

/**
* Adds multiple posting list blocks.
*
* \tparam BlockDataRange This intended to be a vector of structs of the type
* block_posting_list<BlockCodec, Profile>::document_enumerator::block_data
*
* \param n Posting list length.
* \param blocks Encoded blocks.
*
* \throws std::invalid_argument Thrown if `n == 0`.
*/
template <typename BlockDataRange>
void add_posting_list(std::uint64_t n, BlockDataRange const& blocks) {
if (!n) {
throw std::invalid_argument("List must be nonempty");
}
write_blocks(m_lists, n, blocks);
m_endpoints.push_back(m_lists.size());
}

/**
* Adds a posting list that is already fully encoded.
*
* \tparam BytesRange A collection of bytes, e.g., std::vector<std::uint8_t>
*
* \param data Encoded data.
*/
template <typename BytesRange>
void add_posting_list(BytesRange const& data) {
m_lists.insert(m_lists.end(), std::begin(data), std::end(data));
m_endpoints.push_back(m_lists.size());
}

/**
* Builds an index.
*
* \param sq Inverted index object that will take ownership of the data.
*/
void build(BlockInvertedIndex& sq);
};

/**
* Stream block index builder.
*
* Buffers postings on disk in order to support building indexes larger than memory.
*/
class StreamBuilder {
global_parameters m_params{};
std::size_t m_num_docs = 0;
std::vector<std::uint64_t> m_endpoints{};
TemporaryDirectory tmp{};
std::ofstream m_postings_output;
std::size_t m_postings_bytes_written{0};
std::unique_ptr<BlockCodec> m_block_codec;
BlockPostingWriter m_posting_writer;

public:
/**
* Constructs a builder for an index containing the given number of documents.
*
* This constructor opens a temporary file to write. This file is used for
* buffering postings. This buffer is flushed to the right destination when the
* `build` member function is called.
*
* \throws std::ios_base::failure Thrown if the the temporary buffer file cannot be opened.
*/
StreamBuilder(
std::uint64_t num_docs, global_parameters const& params, BlockPostingWriter posting_writer
);

/**
* Records a new posting list.
*
* Postings are written to the temporary file, while some other data is accumulated
* within the builder struct.
*
* \param n Posting list length.
* \param docs_begin Iterator that points at the first document ID.
* \param freqs_begin Iterator that points at the first frequency.
* \param occurrences Not used in this builder.
*
* \throws std::invalid_argument Thrown if `n == 0`.
* \throws std::ios_base::failure Thrown if failed to write to the temporary file buffer.
*/
template <typename DocsIterator, typename FreqsIterator>
void
add_posting_list(std::uint64_t n, DocsIterator docs_begin, FreqsIterator freqs_begin, std::uint64_t /* occurrences */) {
if (!n) {
throw std::invalid_argument("List must be nonempty");
}
std::vector<std::uint8_t> buf;
m_posting_writer.write(buf, n, docs_begin, freqs_begin);
m_postings_bytes_written += buf.size();
m_postings_output.write(reinterpret_cast<char const*>(buf.data()), buf.size());
m_endpoints.push_back(m_postings_bytes_written);
}

/**
* Adds multiple posting list blocks.
*
* \tparam BlockDataRange This intended to be a vector of structs of the type
* block_posting_list<BlockCodec, Profile>::document_enumerator::block_data
*
* \param n Posting list length.
* \param blocks Encoded blocks.
*
* \throws std::invalid_argument Thrown if `n == 0`.
* \throws std::ios_base::failure Thrown if failed to write to the temporary file buffer.
*/
template <typename BlockDataRange>
void add_posting_list(std::uint64_t n, BlockDataRange const& blocks) {
if (!n) {
throw std::invalid_argument("List must be nonempty");
}
std::vector<std::uint8_t> buf;
write_blocks(buf, n, blocks);
m_postings_bytes_written += buf.size();
m_postings_output.write(reinterpret_cast<char const*>(buf.data()), buf.size());
m_endpoints.push_back(m_postings_bytes_written);
}

/**
* Adds a posting list that is already fully encoded.
*
* \tparam BytesRange A collection of bytes, e.g., std::vector<std::uint8_t>
*
* \param data Encoded data.
*
* \throws std::ios_base::failure Thrown if failed to write to the temporary file buffer.
*/
template <typename BytesRange>
void add_posting_list(BytesRange const& data) {
m_postings_bytes_written += data.size();
m_postings_output.write(reinterpret_cast<char const*>(data.data()), data.size());
m_endpoints.push_back(m_postings_bytes_written);
}

/**
* Flushes index data to disk.
*
* \param index_path Output index path.
*
* \throws std::ios_base::failure Thrown if failed to write to any file
* or failed to read from the temporary buffer.
*/
void build(std::string const& index_path);
};

}; // namespace index::block

}; // namespace pisa
Loading

0 comments on commit b75fcbe

Please sign in to comment.