From 87ed9a5f1e49dfd544e42dff9743cb09ce205370 Mon Sep 17 00:00:00 2001 From: Jacob Alber <jaalber@microsoft.com> Date: Wed, 14 Feb 2024 14:10:13 -0500 Subject: [PATCH] feat: read_span_flatbuffer: support cleaning up spare examples correctly Consumers of VW as a library can provide their own event pools, etc. Previous parsers were always able to predict when an event would be needed ahead of time, so they would only allocate when necessary. This was done by relying on a single incoming event preallocation to let the external host deallocate in the case of nothing to be parsed. This does not work for the FB parser due to how it handles re-entrancy, and we do not want to spend the time re-architecting it to avoid this. The fix, in this case, is to expand the API to include a callback to return spare events back to the host's event pool. --- .../vw/fb_parser/parse_example_flatbuffer.h | 6 ++++- .../fb_parser/src/parse_example_flatbuffer.cc | 24 +++++++++++++++---- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/vowpalwabbit/fb_parser/include/vw/fb_parser/parse_example_flatbuffer.h b/vowpalwabbit/fb_parser/include/vw/fb_parser/parse_example_flatbuffer.h index fa181c1ea46..70836775c22 100644 --- a/vowpalwabbit/fb_parser/include/vw/fb_parser/parse_example_flatbuffer.h +++ b/vowpalwabbit/fb_parser/include/vw/fb_parser/parse_example_flatbuffer.h @@ -16,13 +16,17 @@ namespace VW class api_status; +using example_sink_f = std::function<void(VW::multi_ex&& spare_examples)>; + namespace parsers { namespace flatbuffer { int flatbuffer_to_examples(VW::workspace* all, io_buf& buf, VW::multi_ex& examples); + + bool read_span_flatbuffer( - VW::workspace* all, const uint8_t* span, size_t length, example_factory_t example_factory, VW::multi_ex& examples); + VW::workspace* all, const uint8_t* span, size_t length, example_factory_t example_factory, VW::multi_ex& examples, example_sink_f example_sink = nullptr); class parser { diff --git 
a/vowpalwabbit/fb_parser/src/parse_example_flatbuffer.cc b/vowpalwabbit/fb_parser/src/parse_example_flatbuffer.cc index f70e61f6a93..41d27e7a953 100644 --- a/vowpalwabbit/fb_parser/src/parse_example_flatbuffer.cc +++ b/vowpalwabbit/fb_parser/src/parse_example_flatbuffer.cc @@ -43,9 +43,12 @@ int flatbuffer_to_examples(VW::workspace* all, io_buf& buf, VW::multi_ex& exampl return static_cast<int>(status.get_error_code() == VW::experimental::error_code::success); } -bool read_span_flatbuffer( - VW::workspace* all, const uint8_t* span, size_t length, example_factory_t example_factory, VW::multi_ex& examples) +bool read_span_flatbuffer(VW::workspace* all, const uint8_t* span, size_t length, example_factory_t example_factory, + VW::multi_ex& examples, example_sink_f example_sink) { + int a = 0; + a++; + // we expect context to contain a size_prefixed flatbuffer (technically a binary string) // which means: // @@ -84,7 +87,18 @@ bool read_span_flatbuffer( } VW::multi_ex temp_ex; - temp_ex.push_back(&example_factory()); + + // There is a bit of unhappiness with the interface of the read_XYZ_<format>() functions, because they often + // expect the input multi_ex to have a single "empty" example there. This contributes, in part, to the large + // proliferation of entry points into the JSON parser(s). We want to avoid exposing that insofar as possible, + // so we will check whether we already received a perfectly good example and use that, or create a new one if + // needed. + if (examples.size() > 0) + { + temp_ex.push_back(examples[0]); + examples.pop_back(); + } + else { temp_ex.push_back(&example_factory()); } bool has_more = true; VW::experimental::api_status status; @@ -113,7 +127,9 @@ bool read_span_flatbuffer( } } while (has_more); - VW::finish_example(*all, temp_ex); + if (example_sink == nullptr) { VW::finish_example(*all, temp_ex); } + else { example_sink(std::move(temp_ex)); } + return true; }