Skip to content

Commit

Permalink
Verify quantized index after compression
Browse files Browse the repository at this point in the history
Make the `--check` flag take effect when compressing an index.

Signed-off-by: Michal Siedlaczek <michal@siedlaczek.me>
  • Loading branch information
elshize committed Feb 7, 2024
1 parent ef68dab commit 57cd7be
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 56 deletions.
28 changes: 27 additions & 1 deletion include/pisa/scorer/quantized.hpp
Original file line number Diff line number Diff line change
@@ -1,19 +1,45 @@
#pragma once

#include <algorithm>
#include <cmath>
#include <cstdint>
#include <utility>

#include "index_scorer.hpp"
#include "linear_quantizer.hpp"

namespace pisa {

/**
 * Index scorer whose term scorers return the posting's `freq` value unchanged.
 *
 * Both the term ID and the document ID are ignored.
 * NOTE(review): presumably meant for indexes whose frequency slots already
 * hold precomputed quantized scores -- confirm against callers.
 */
template <typename Wand>
struct quantized: public index_scorer<Wand> {
    using index_scorer<Wand>::index_scorer;

    term_scorer_t term_scorer([[maybe_unused]] uint64_t term_id) const {
        // Identity on the stored value; conversion to the scorer's result type
        // happens inside `term_scorer_t`.
        auto passthrough = []([[maybe_unused]] uint32_t doc, uint32_t freq) { return freq; };
        return passthrough;
    }
};

/**
* Uses internal scorer and quantizer to produce quantized scores.
*
* This is not inheriting from `index_scorer` because it returns int scores.
*/
template <typename Wand>
class QuantizingScorer {
private:
std::unique_ptr<index_scorer<Wand>> m_scorer;
LinearQuantizer m_quantizer;

public:
QuantizingScorer(std::unique_ptr<index_scorer<Wand>> scorer, LinearQuantizer quantizer)
: m_scorer(std::move(scorer)), m_quantizer(quantizer) {}

[[nodiscard]] auto term_scorer(std::uint64_t term_id) const
-> std::function<std::uint32_t(std::uint32_t, std::uint32_t)> {
return
[this, scorer = m_scorer->term_scorer(term_id)](std::uint32_t doc, std::uint32_t freq) {
return this->m_quantizer(scorer(doc, freq));
};
}
};

} // namespace pisa
34 changes: 26 additions & 8 deletions include/pisa/util/verify_collection.hpp
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
#pragma once

#include <optional>

#include "spdlog/spdlog.h"

#include "mappable/mapper.hpp"
#include "memory_source.hpp"
#include "util/util.hpp"
#include "scorer/quantized.hpp"

namespace pisa {

template <typename InputCollection, typename Collection>
void verify_collection(InputCollection const& input, const char* filename) {
template <typename InputCollection, typename Collection, typename Wand>
void verify_collection(
InputCollection const& input,
const char* filename,
std::optional<QuantizingScorer<Wand>> quantizing_scorer = std::nullopt
) {
Collection coll;
auto source = MemorySource::mapped_file(std::filesystem::path(filename));
pisa::mapper::map(coll, source.data());
Expand All @@ -21,8 +27,13 @@ void verify_collection(InputCollection const& input, const char* filename) {
auto e = coll[s];
if (e.size() != size) {
spdlog::error("sequence {} has wrong length! ({} != {})", s, e.size(), size);
exit(1);
throw std::runtime_error("oops");
}
auto term_scorer = quantizing_scorer.has_value()
? std::make_optional<std::function<std::uint32_t(std::uint32_t, std::uint32_t)>>(
quantizing_scorer->term_scorer(s)
)
: std::nullopt;
for (size_t i = 0; i < e.size(); ++i, e.next()) {
uint64_t docid = *(seq.docs.begin() + i);
uint64_t freq = *(seq.freqs.begin() + i);
Expand All @@ -31,16 +42,23 @@ void verify_collection(InputCollection const& input, const char* filename) {
spdlog::error("docid in sequence {} differs at position {}!", s, i);
spdlog::error("{} != {}", e.docid(), docid);
spdlog::error("sequence length: {}", seq.docs.size());

exit(1);
throw std::runtime_error("oops");
}

if (freq != e.freq()) {
if (!term_scorer.has_value() && freq != e.freq()) {
spdlog::error("freq in sequence {} differs at position {}!", s, i);
spdlog::error("{} != {}", e.freq(), freq);
spdlog::error("sequence length: {}", seq.docs.size());
throw std::runtime_error("oops");
}

exit(1);
if (term_scorer.has_value()) {
if ((*term_scorer)(docid, freq) != e.freq()) {
spdlog::error("quantized score in sequence {} differs at position {}!", s, i);
spdlog::error("{} != {}", e.freq(), (*term_scorer)(docid, freq));
spdlog::error("sequence length: {}", seq.docs.size());
throw std::runtime_error("oops");
}
}
}
s += 1;
Expand Down
69 changes: 29 additions & 40 deletions src/compress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,12 @@ void dump_index_specific_stats(pisa::pefopt_index const& coll, std::string const
);
}

// Pairs an owned index scorer with the linear quantizer to be applied to its scores.
// Both members are public and declared in the order (scorer, quantizer); code that
// destructures this struct with structured bindings depends on that order.
template <typename Wand>
struct QuantizedScorer {
// Takes ownership of `scorer` and stores a copy of `quantizer`.
QuantizedScorer(std::unique_ptr<index_scorer<Wand>> scorer, LinearQuantizer quantizer)
: scorer(std::move(scorer)), quantizer(quantizer) {}
// Scorer that produces the raw (unquantized) term scores.
std::unique_ptr<index_scorer<Wand>> scorer;
// Quantizer applied to the scores produced by `scorer`.
LinearQuantizer quantizer;
};

template <typename CollectionType, typename Wand>
void compress_index_streaming(
binary_freq_collection const& input,
pisa::global_parameters const& params,
std::string const& output_filename,
std::optional<QuantizedScorer<Wand>> quantized_scorer,
std::optional<QuantizingScorer<Wand>> quantizing_scorer,
bool check
) {
spdlog::info("Processing {} documents (streaming)", input.num_docs());
Expand All @@ -68,17 +60,16 @@ void compress_index_streaming(
pisa::progress progress("Create index", input.size());

size_t term_id = 0;
if (quantized_scorer) {
auto&& [scorer, quantizer] = *quantized_scorer;
if (quantizing_scorer) {
std::vector<std::uint64_t> quantized_scores;
for (auto const& plist: input) {
auto term_scorer = scorer->term_scorer(term_id);
auto term_scorer = quantizing_scorer->term_scorer(term_id);
std::size_t size = plist.docs.size();
for (size_t pos = 0; pos < size; ++pos) {
auto doc = *(plist.docs.begin() + pos);
auto freq = *(plist.freqs.begin() + pos);
auto score = term_scorer(doc, freq);
quantized_scores.push_back(quantizer(score));
quantized_scores.push_back(score);
}
auto sum = std::accumulate(
quantized_scores.begin(), quantized_scores.end(), std::uint64_t(0)
Expand All @@ -104,8 +95,10 @@ void compress_index_streaming(
double elapsed_secs = (get_time_usecs() - tick) / 1000000;
spdlog::info("Index compressed in {} seconds", elapsed_secs);

if (check && not quantized_scorer) {
verify_collection<binary_freq_collection, CollectionType>(input, output_filename.c_str());
if (check) {
verify_collection<binary_freq_collection, CollectionType>(
input, output_filename.c_str(), std::move(quantizing_scorer)
);
}
}

Expand All @@ -122,8 +115,8 @@ void compress_index(
ScorerParams const& scorer_params,
std::optional<Size> quantization_bits
) {
std::optional<QuantizingScorer<WandType>> quantizing_scorer{};
if constexpr (std::is_same_v<typename CollectionType::index_layout_tag, BlockIndexTag>) {
std::optional<QuantizedScorer<WandType>> quantized_scorer{};
WandType wdata;
mio::mmap_source wdata_source;
if (quantization_bits.has_value()) {
Expand All @@ -138,46 +131,45 @@ void compress_index(
mapper::map(wdata, wdata_source, mapper::map_flags::warmup);
auto scorer = scorer::from_params(scorer_params, wdata);
LinearQuantizer quantizer(wdata.index_max_term_weight(), quantization_bits->as_int());
quantized_scorer = QuantizedScorer(std::move(scorer), quantizer);
quantizing_scorer = QuantizingScorer(std::move(scorer), quantizer);
}
compress_index_streaming<CollectionType, WandType>(
input, params, *output_filename, std::move(quantized_scorer), check
input, params, *output_filename, std::move(quantizing_scorer), check
);
return;
}

spdlog::info("Processing {} documents", input.num_docs());
double tick = get_time_usecs();

WandType const wdata = [&] {
if (wand_data_filename) {
return WandType(MemorySource::mapped_file(*wand_data_filename));
}
return WandType{};
}();

if (quantization_bits.has_value()) {
std::unique_ptr<index_scorer<WandType>> scorer = scorer::from_params(scorer_params, wdata);
LinearQuantizer quantizer(wdata.index_max_term_weight(), quantization_bits->as_int());
quantizing_scorer = QuantizingScorer(std::move(scorer), quantizer);
}

typename CollectionType::builder builder(input.num_docs(), params);
size_t postings = 0;
{
pisa::progress progress("Create index", input.size());
WandType const wdata = [&] {
if (wand_data_filename) {
return WandType(MemorySource::mapped_file(*wand_data_filename));
}
return WandType{};
}();

std::unique_ptr<index_scorer<WandType>> scorer;

if (quantization_bits.has_value()) {
scorer = scorer::from_params(scorer_params, wdata);
}

size_t term_id = 0;
for (auto const& plist: input) {
size_t size = plist.docs.size();
if (quantization_bits.has_value()) {
LinearQuantizer quantizer(wdata.index_max_term_weight(), quantization_bits->as_int());
auto term_scorer = scorer->term_scorer(term_id);
if (quantizing_scorer.has_value()) {
auto term_scorer = quantizing_scorer->term_scorer(term_id);
std::vector<uint64_t> quants;
for (size_t pos = 0; pos < size; ++pos) {
uint64_t doc = *(plist.docs.begin() + pos);
uint64_t freq = *(plist.freqs.begin() + pos);
float score = term_scorer(doc, freq);
uint64_t quant_score = quantizer(score);
uint64_t quant_score = term_scorer(doc, freq);
quants.push_back(quant_score);
}
assert(quants.size() == size);
Expand Down Expand Up @@ -210,12 +202,9 @@ void compress_index(

if (output_filename) {
mapper::freeze(coll, (*output_filename).c_str());
if (check and quantization_bits.has_value()) {
spdlog::warn("Index construction cannot be verified for quantized indexes.");
}
if (check and not quantization_bits.has_value()) {
if (check) {
verify_collection<binary_freq_collection, CollectionType>(
input, (*output_filename).c_str()
input, (*output_filename).c_str(), std::move(quantizing_scorer)
);
}
}
Expand Down
87 changes: 87 additions & 0 deletions test/test_compress.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#define CATCH_CONFIG_MAIN
#include "catch2/catch.hpp"

#include "pisa/compress.hpp"
#include "pisa/scorer/scorer.hpp"
#include "pisa/wand_data.hpp"
#include "pisa_config.hpp"
#include "temporary_directory.hpp"
#include "type_safe.hpp"
#include "wand_utils.hpp"

// Compresses the test collection once per listed encoding, without wand data,
// scorer, or quantization, and with verification (`check`) enabled.
TEST_CASE("Compress block index", "[index][compress]") {
    std::string encoding = GENERATE(
        "ef",
        "single",
        "pefuniform",
        "pefopt",
        "block_optpfor",
        "block_varintg8iu",
        "block_streamvbyte",
        "block_maskedvbyte",
        "block_varintgb",
        "block_interpolative",
        "block_qmx",
        "block_simple8b",
        "block_simple16",
        "block_simdbp"
    );
    // Record the generated encoding so that a failure report names the case that broke,
    // consistent with the quantized test case.
    CAPTURE(encoding);

    pisa::TemporaryDirectory tmp;
    pisa::compress(
        PISA_SOURCE_DIR "/test/test_data/test_collection",
        std::nullopt,  // no wand
        encoding,
        (tmp.path() / encoding).string(),
        ScorerParams(""),  // no scorer
        std::nullopt,  // no quantization
        true  // check=true
    );
}

// For each scorer/encoding combination: writes wand data into a temporary directory,
// then compresses the test collection using that wand data, the same scorer
// parameters, 8 quantization bits, and verification (`check`) enabled.
TEST_CASE("Compress quantized block index", "[index][compress]") {
    auto collection = PISA_SOURCE_DIR "/test/test_data/test_collection";

    std::string scorer = GENERATE("bm25");
    CAPTURE(scorer);
    ScorerParams scorer_params(scorer);

    pisa::TemporaryDirectory temp_dir;
    // Resolves a file name inside the temporary directory to a string path.
    auto const in_temp_dir = [&temp_dir](std::string const& name) {
        return (temp_dir.path() / name).string();
    };

    pisa::create_wand_data(
        in_temp_dir("wand"),
        collection,
        pisa::FixedBlock(64),
        scorer_params,
        false,
        false,
        pisa::Size(8),
        std::unordered_set<std::size_t>()
    );

    std::string encoding = GENERATE(
        "ef",
        "single",
        "pefuniform",
        "pefopt",
        "block_optpfor",
        "block_varintg8iu",
        "block_streamvbyte",
        "block_maskedvbyte",
        "block_varintgb",
        "block_interpolative",
        "block_qmx",
        "block_simple8b",
        "block_simple16",
        "block_simdbp"
    );
    CAPTURE(encoding);

    pisa::compress(
        collection,
        in_temp_dir("wand"),
        encoding,
        in_temp_dir(encoding),
        scorer_params,
        pisa::Size(8),
        true  // check=true
    );
}
7 changes: 0 additions & 7 deletions test/test_stream_builder.cpp
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
#define CATCH_CONFIG_MAIN

#include <catch2/catch.hpp>
#include <functional>

#include "accumulator/lazy_accumulator.hpp"
#include "cursor/block_max_scored_cursor.hpp"
#include "cursor/max_scored_cursor.hpp"
#include "cursor/scored_cursor.hpp"
#include "index_types.hpp"
#include "io.hpp"
#include "pisa_config.hpp"
#include "query/algorithm.hpp"
#include "temporary_directory.hpp"
#include "test_common.hpp"

using namespace pisa;

Expand Down

0 comments on commit 57cd7be

Please sign in to comment.