Skip to content

Commit

Permalink
[filter] Support Promises from the callback to acceptHTTPTunnel() filter
Browse files Browse the repository at this point in the history
  • Loading branch information
pajama-coder committed Jul 22, 2024
1 parent 4425fab commit 40ec6b3
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 7 deletions.
47 changes: 40 additions & 7 deletions src/filters/http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1797,14 +1797,22 @@ auto TunnelServer::clone() -> Filter* {
void TunnelServer::reset() {
Filter::reset();
m_pipeline = nullptr;
m_request_head = nullptr;
m_promise_callback = nullptr;
m_buffer.clear();
m_message_reader.reset();
}

void TunnelServer::process(Event *evt) {
if (m_pipeline) {
m_pipeline->input()->input(evt);

} else if (m_promise_callback) {
m_buffer.push(evt);

} else if (auto req = m_message_reader.read(evt)) {
m_request_head = pjs::coerce<RequestHead>(req->head());

pjs::Value arg(req), ret;
req->release();
if (!callback(m_handler, 1, &arg, ret)) return;
Expand All @@ -1814,20 +1822,45 @@ void TunnelServer::process(Event *evt) {
res = Message::make();
} else if (ret.is_instance_of<Message>()) {
res = ret.as<Message>();
} else if (ret.is_promise()) {
m_promise_callback = pjs::Promise::Callback::make(
[this](pjs::Promise::State state, const pjs::Value &v) {
on_resolve(state, v);
}
);
ret.as<pjs::Promise>()->then(Filter::context(), m_promise_callback->resolved());
return;
} else {
Filter::error("handler did not return a Message");
return;
}

pjs::Ref<RequestHead> req_head = pjs::coerce<RequestHead>(req->head());
pjs::Ref<ResponseHead> res_head = pjs::coerce<ResponseHead>(res->head());
if (res_head->is_tunnel_ok(req_head->tunnel_type())) {
m_pipeline = sub_pipeline(0, true, Filter::output());
}
start_tunnel(res);
}
}

void TunnelServer::on_resolve(pjs::Promise::State state, const pjs::Value &value) {
pjs::Ref<Message> res;
if (value.is_nullish()) {
start_tunnel(Message::make());
} else if (value.is_instance_of<Message>()) {
start_tunnel(value.as<Message>());
} else {
Filter::error("Promise did not resolve to a Message");
}
}

void TunnelServer::start_tunnel(Message *response) {
pjs::Ref<ResponseHead> response_head = pjs::coerce<ResponseHead>(response->head());
if (response_head->is_tunnel_ok(m_request_head->tunnel_type())) {
m_pipeline = sub_pipeline(0, true, Filter::output());
}

res->write(Filter::output());
response->write(Filter::output());

if (m_pipeline) m_pipeline->start();
if (m_pipeline) {
m_pipeline->start();
m_buffer.flush(m_pipeline->input());
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/filters/http.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,13 @@ class TunnelServer : public Filter {

pjs::Ref<pjs::Function> m_handler;
pjs::Ref<Pipeline> m_pipeline;
pjs::Ref<RequestHead> m_request_head;
pjs::Ref<pjs::Promise::Callback> m_promise_callback;
EventBuffer m_buffer;
MessageReader m_message_reader;

void on_resolve(pjs::Promise::State state, const pjs::Value &value);
void start_tunnel(Message *response);
};

//
Expand Down

0 comments on commit 40ec6b3

Please sign in to comment.