Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support cleaning up spare examples correctly in read_span_flatbuffer() #4684

Merged
merged 12 commits into from
Feb 15, 2024
Merged
3 changes: 3 additions & 0 deletions vowpalwabbit/core/include/vw/core/error_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ ERROR_CODE_DEFINITION(
ERROR_CODE_DEFINITION(
13, fb_parser_size_mismatch_ft_names_ft_values, "Size of feature names and feature values do not match. ")
ERROR_CODE_DEFINITION(14, unknown_label_type, "Label type in Flatbuffer not understood. ")
ERROR_CODE_DEFINITION(15, fb_parser_span_misaligned, "Input Flatbuffer span is not aligned to an 8-byte boundary. ")
ERROR_CODE_DEFINITION(
16, fb_parser_span_length_mismatch, "Input Flatbuffer span does not match flatbuffer size prefix. ")

// TODO: This is temporary until we switch to the new error handling mechanism.
ERROR_CODE_DEFINITION(10000, vw_exception, "vw_exception: ")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

#pragma once

#include "vw/core/api_status.h"
#include "vw/core/example.h"
#include "vw/core/multi_ex.h"
#include "vw/core/shared_data.h"
Expand All @@ -14,15 +13,21 @@
namespace VW
{

namespace experimental
{
class api_status;
}

using example_sink_f = std::function<void(VW::multi_ex&& spare_examples)>;

namespace parsers
{
namespace flatbuffer
{
int flatbuffer_to_examples(VW::workspace* all, io_buf& buf, VW::multi_ex& examples);
bool read_span_flatbuffer(
VW::workspace* all, const uint8_t* span, size_t length, example_factory_t example_factory, VW::multi_ex& examples);

int read_span_flatbuffer(VW::workspace* all, const uint8_t* span, size_t length, example_factory_t example_factory,
VW::multi_ex& examples, example_sink_f example_sink = nullptr, VW::experimental::api_status* status = nullptr);

class parser
{
Expand Down Expand Up @@ -57,6 +62,19 @@ class parser
VW::experimental::api_status* status = nullptr);
int get_namespace_index(const Namespace* ns, namespace_index& ni, VW::experimental::api_status* status = nullptr);

/// Returns the parser to the "no multi-example in progress" state: forgets the
/// cached multi-example object, clears the active flag, and rewinds the
/// multi-example index to zero.
inline void reset_active_multi_ex()
{
  _multi_example_object = nullptr;
  _active_multi_ex = false;
  _multi_ex_index = 0;
}

/// Returns the parser to the "no example collection in progress" state: clears
/// the active flag and rewinds the collection index to zero.
inline void reset_active_collection()
{
  _active_collection = false;
  _example_index = 0;
}

void parse_simple_label(shared_data* sd, polylabel* l, reduction_features* red_features, const SimpleLabel* label);
void parse_cb_label(polylabel* l, const CBLabel* label);
void parse_ccb_label(polylabel* l, const CCBLabel* label);
Expand Down
130 changes: 80 additions & 50 deletions vowpalwabbit/fb_parser/src/parse_example_flatbuffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
#include "vw/fb_parser/parse_example_flatbuffer.h"

#include "vw/core/action_score.h"
#include "vw/core/api_status.h"
#include "vw/core/best_constant.h"
#include "vw/core/cb.h"
#include "vw/core/constant.h"
#include "vw/core/error_constants.h"
#include "vw/core/global_data.h"
#include "vw/core/parser.h"
#include "vw/core/scope_exit.h"
#include "vw/core/vw.h"

#include <cfloat>
Expand Down Expand Up @@ -43,8 +45,8 @@ int flatbuffer_to_examples(VW::workspace* all, io_buf& buf, VW::multi_ex& exampl
return static_cast<int>(status.get_error_code() == VW::experimental::error_code::success);
}

bool read_span_flatbuffer(
VW::workspace* all, const uint8_t* span, size_t length, example_factory_t example_factory, VW::multi_ex& examples)
int read_span_flatbuffer(VW::workspace* all, const uint8_t* span, size_t length, example_factory_t example_factory,
VW::multi_ex& examples, example_sink_f example_sink, VW::experimental::api_status* status)
{
// we expect context to contain a size_prefixed flatbuffer (technically a binary string)
// which means:
Expand All @@ -59,16 +61,15 @@ bool read_span_flatbuffer(
// thus context.size() = sizeof(length) + length
io_buf unused;

// TODO: How do we report errors out of here? (This is a general API problem with the parsers)
size_t address = reinterpret_cast<size_t>(span);
if (address % 8 != 0)
{
std::stringstream sstream;
sstream << "fb_parser error: flatbuffer data not aligned to 8 bytes" << std::endl;
sstream << " span => @" << std::hex << address << std::dec << " % " << 8 << " = " << address % 8
<< " (vs desired = " << 0 << ")";
THROW(sstream.str());
return false;

RETURN_ERROR_LS(status, fb_parser_span_misaligned) << sstream.str();
lokitoth marked this conversation as resolved.
Show resolved Hide resolved
}

flatbuffers::uoffset_t flatbuffer_object_size =
Expand All @@ -79,42 +80,80 @@ bool read_span_flatbuffer(
sstream << "fb_parser error: flatbuffer size prefix does not match actual size" << std::endl;
sstream << " span => @" << std::hex << address << std::dec << " size_prefix = " << flatbuffer_object_size
<< " length = " << length;
THROW(sstream.str());
return false;

RETURN_ERROR_LS(status, fb_parser_span_length_mismatch) << sstream.str();
}

VW::multi_ex temp_ex;
temp_ex.push_back(&example_factory());

// Use scope_exit because the parser reports errors by throwing exceptions (the code path in the vw driver
// uses the return value to signal completion, not errors).
auto scope_guard = VW::scope_exit(
[&temp_ex, &all, &example_sink]()
{
if (example_sink == nullptr) { VW::finish_example(*all, temp_ex); }
else { example_sink(std::move(temp_ex)); }
});

// There is a bit of unhappiness with the interface of the read_XYZ_<format>() functions, because they often
// expect the input multi_ex to have a single "empty" example there. This contributes, in part, to the large
// proliferation of entry points into the JSON parser(s). We want to avoid exposing that insofar as possible,
// so we will check whether we already received a perfectly good example and use that, or create a new one if
// needed.
if (examples.size() > 0)
lokitoth marked this conversation as resolved.
Show resolved Hide resolved
lokitoth marked this conversation as resolved.
Show resolved Hide resolved
{
assert(examples.size() == 1);
temp_ex.push_back(examples[0]);
examples.pop_back();
}
else { temp_ex.push_back(&example_factory()); }

bool has_more = true;
VW::experimental::api_status status;
do {
switch (all->parser_runtime.flat_converter->parse_examples(all, unused, temp_ex, span, &status))
switch (int result = all->parser_runtime.flat_converter->parse_examples(all, unused, temp_ex, span, status))
{
case VW::experimental::error_code::success:
has_more = true;
break;
// Because nothing_to_parse is not an error we have to filter it out here, otherwise
// we could simply do RETURN_IF_FAIL(result) and let the macro handle it.
case VW::experimental::error_code::nothing_to_parse:
has_more = false;
break;
default:
std::stringstream sstream;
sstream << "Error parsing examples: " << std::endl;
THROW(sstream.str());
return false;
RETURN_IF_FAIL(result);
}

// The underlying parser will emit a newline example when terminating the parsing
// of a multi_ex block. Since we are collecting it into a multi_ex, we want to
// swallow it here, but should the parser not have followed its contract w.r.t.
// the return value, we should use the presence of the newline example to override
// has_more.
has_more &= !temp_ex[0]->is_newline;
lokitoth marked this conversation as resolved.
Show resolved Hide resolved

// If this is a real example, we need to move it to the output multi_ex; we also
// need to create a new example to replace it for the next run through the parser.
if (!temp_ex[0]->is_newline)
{
examples.push_back(&example_factory());
std::swap(examples[examples.size() - 1], temp_ex[0]);
// We avoid doing moves or copy construction here because multi_ex contains
// example pointers. The compile-time check below is meant to draw attention
// to this spot if the underlying type changes.
using temp_ex_element_t = std::remove_reference<decltype(temp_ex[0])>::type;
using examples_element_t = std::remove_reference<decltype(examples[0])>::type;

static_assert(std::is_same<temp_ex_element_t, examples_element_t>::value &&
std::is_same<temp_ex_element_t, VW::example*>::value,
"temp_ex and example must be vector-like over VW::example*");

examples.push_back(temp_ex[0]);

// Since we are using a vector of pointers, we can simply reassign the slot to
// the pointer of the newly created destination example for the parser.
temp_ex[0] = &example_factory();
}
} while (has_more);

VW::finish_example(*all, temp_ex);
return true;
return VW::experimental::error_code::success;
}

const VW::parsers::flatbuffer::ExampleRoot* parser::data() { return _data; }
Expand Down Expand Up @@ -198,16 +237,17 @@ int parser::process_collection_item(VW::workspace* all, VW::multi_ex& examples,
{
_active_multi_ex = true;
_multi_example_object = _data->example_obj_as_ExampleCollection()->multi_examples()->Get(_example_index);

// read from active multi_ex
RETURN_IF_FAIL(parse_multi_example(all, examples[0], _multi_example_object, status));
// read from active collection

// if we are done with the multi example, move to the next one, or finish the collection
if (!_active_multi_ex)
{
_example_index++;
if (_example_index == _data->example_obj_as_ExampleCollection()->multi_examples()->size())
{
_example_index = 0;
_active_collection = false;
reset_active_collection();
}
}
}
Expand All @@ -216,11 +256,7 @@ int parser::process_collection_item(VW::workspace* all, VW::multi_ex& examples,
const auto ex = _data->example_obj_as_ExampleCollection()->examples()->Get(_example_index);
RETURN_IF_FAIL(parse_example(all, examples[0], ex, status));
_example_index++;
if (_example_index == _data->example_obj_as_ExampleCollection()->examples()->size())
{
_example_index = 0;
_active_collection = false;
}
if (_example_index == _data->example_obj_as_ExampleCollection()->examples()->size()) { reset_active_collection(); }
}
return VW::experimental::error_code::success;
}
Expand All @@ -231,6 +267,20 @@ int parser::parse_examples(VW::workspace* all, io_buf& buf, VW::multi_ex& exampl
#define RETURN_SUCCESS_FINISHED() \
return buffer_pointer ? VW::experimental::error_code::nothing_to_parse : VW::experimental::error_code::success;

// If we are re-using a single parser instance across multiple invocations, we need to reset
// the state when we get a new buffer_pointer. Otherwise we may be in the middle of a multi_ex
// or example_collection, and the following parse will attempt to reuse the object references
// from the previous buffer, which may have been deallocated.
// TODO: Rewrite the parser to avoid this convoluted, re-entrant logic.
if (buffer_pointer && _flatbuffer_pointer != buffer_pointer)
{
reset_active_multi_ex();
reset_active_collection();
}

// The ExampleCollection processing code owns dispatching to parse_multi_example to handle
// iteration through the outer collection correctly, thus it must get the first chance to
// handle an incoming parse request.
if (_active_collection)
{
RETURN_IF_FAIL(process_collection_item(all, examples, status));
Expand Down Expand Up @@ -307,9 +357,7 @@ int parser::parse_multi_example(
{
// done with multi example, send a newline example and reset
ae->is_newline = true;
_multi_ex_index = 0;
_active_multi_ex = false;
_multi_example_object = nullptr;
reset_active_multi_ex();
return VW::experimental::error_code::success;
}

Expand All @@ -325,30 +373,11 @@ int parser::get_namespace_index(const Namespace* ns, namespace_index& ni, VW::ex
ni = static_cast<uint8_t>(ns->name()->c_str()[0]);
return VW::experimental::error_code::success;
}
else if (flatbuffers::IsFieldPresent(ns, Namespace::VT_HASH))
else
{
ni = ns->hash();
return VW::experimental::error_code::success;
}

if (_active_collection && _active_multi_ex)
{
RETURN_ERROR_LS(status, fb_parser_name_hash_missing)
<< "Either name or hash field must be specified to get the namespace index in collection item with example "
"index "
<< _example_index << "and multi example index " << _multi_ex_index;
}
else if (_active_multi_ex)
{
RETURN_ERROR_LS(status, fb_parser_name_hash_missing)
<< "Either name or hash field must be specified to get the namespace index in multi example index "
<< _multi_ex_index;
}
else
{
RETURN_ERROR_LS(status, fb_parser_name_hash_missing)
<< "Either name or hash field must be specified to get the namespace index";
}
}

bool get_namespace_hash(VW::workspace* all, const Namespace* ns, uint64_t& hash)
Expand Down Expand Up @@ -462,7 +491,7 @@ int parser::parse_namespaces(VW::workspace* all, example* ae, const Namespace* n
}
else
{
if (!has_hashes) { RETURN_NS_PARSER_ERROR(status, fb_parser_name_hash_missing) }
if (!has_hashes) { RETURN_NS_PARSER_ERROR(status, fb_parser_feature_hashes_names_missing) }

if (ns->feature_hashes()->size() != ns->feature_values()->size())
{
Expand Down Expand Up @@ -541,6 +570,7 @@ int parser::parse_flat_label(
break;
}
case Label_NONE:
case Label_no_label:
break;
default:
if (_active_collection && _active_multi_ex)
Expand Down
1 change: 1 addition & 0 deletions vowpalwabbit/fb_parser/src/parse_label.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// license as described in the file LICENSE.

#include "vw/core/action_score.h"
#include "vw/core/api_status.h"
#include "vw/core/best_constant.h"
#include "vw/core/cb.h"
#include "vw/core/constant.h"
Expand Down
Loading
Loading