From e0d52c1a2447f8a07e1f39aceb6fd190ad9a3cac Mon Sep 17 00:00:00 2001 From: pajama-coder Date: Tue, 17 Dec 2024 12:22:27 +0800 Subject: [PATCH] [filter] Added connection state callback to the connectHTTPTunnel() filter --- src/api/pipeline-api.cpp | 9 +++++---- src/api/pipeline-api.hpp | 2 +- src/filters/http.cpp | 36 ++++++++++++++++++++++++++++++++++++ src/filters/http.hpp | 16 ++++++++++++++++ 4 files changed, 58 insertions(+), 5 deletions(-) diff --git a/src/api/pipeline-api.cpp b/src/api/pipeline-api.cpp index 12be36a5..4cab5aab 100644 --- a/src/api/pipeline-api.cpp +++ b/src/api/pipeline-api.cpp @@ -174,8 +174,8 @@ void PipelineDesigner::connect(const pjs::Value &target, pjs::Object *options) { } } -void PipelineDesigner::connect_http_tunnel(pjs::Object *handshake) { - require_sub_pipeline(append_filter(new http::TunnelClient(handshake))); +void PipelineDesigner::connect_http_tunnel(pjs::Object *handshake, pjs::Object *options) { + require_sub_pipeline(append_filter(new http::TunnelClient(handshake, options))); } void PipelineDesigner::connect_proxy_protocol(const pjs::Value &address) { @@ -750,8 +750,9 @@ template<> void ClassDef::init() { // PipelineDesigner.connectHTTPTunnel filter("connectHTTPTunnel", [](Context &ctx, PipelineDesigner *obj) { Object *handshake; - if (!ctx.arguments(1, &handshake)) return; - obj->connect_http_tunnel(handshake); + Object *options = nullptr; + if (!ctx.arguments(1, &handshake, &options)) return; + obj->connect_http_tunnel(handshake, options); }); // PipelineDesigner.connectProxyProtocol diff --git a/src/api/pipeline-api.hpp b/src/api/pipeline-api.hpp index 5c2b14ba..9245d13b 100644 --- a/src/api/pipeline-api.hpp +++ b/src/api/pipeline-api.hpp @@ -63,7 +63,7 @@ class PipelineDesigner : public pjs::ObjectTemplate { void compress(const pjs::Value &algorithm); void compress_http(const pjs::Value &algorithm); void connect(const pjs::Value &target, pjs::Object *options); - void connect_http_tunnel(pjs::Object *handshake); + void connect_http_tunnel(pjs::Object *handshake, pjs::Object *options); void connect_proxy_protocol(const pjs::Value &address); void connect_socks(const pjs::Value &address); void connect_tls(pjs::Object *options); diff --git a/src/filters/http.cpp b/src/filters/http.cpp index 8ee80a28..f4dcc974 100644 --- a/src/filters/http.cpp +++ b/src/filters/http.cpp @@ -1909,14 +1909,27 @@ void TunnelServer::start_tunnel(Message *response) { // TunnelClient // +TunnelClient::Options::Options(pjs::Object *options) { + Value(options, "onState") + .get(on_state_f) + .check_nullable(); +} + TunnelClient::TunnelClient(pjs::Object *handshake) : m_handshake(handshake) { } +TunnelClient::TunnelClient(pjs::Object *handshake, const Options &options) + : m_handshake(handshake) + , m_options(options) +{ +} + TunnelClient::TunnelClient(const TunnelClient &r) : Filter(r) , m_handshake(r.m_handshake) + , m_options(r.m_options) { } @@ -1941,11 +1954,18 @@ void TunnelClient::reset() { m_request_head = nullptr; m_response_head = nullptr; m_eos = nullptr; + m_on_state_change = nullptr; m_is_tunnel_started = false; } void TunnelClient::process(Event *evt) { if (!m_pipeline) { + if (m_options.on_state_f) { + m_on_state_change = [=](State state) { + pjs::Value arg(pjs::EnumDef::name(state)), ret; + Filter::callback(m_options.on_state_f, 1, &arg, ret); + }; + } pjs::Ref handshake; if (m_handshake) { if (m_handshake->is_instance_of()) { @@ -1964,6 +1984,9 @@ void TunnelClient::process(Event *evt) { m_request_head = pjs::coerce(msg->head()); m_pipeline = sub_pipeline(0, true, EventSource::reply())->start(); Filter::output(msg->as(), m_pipeline->input()); + if (m_on_state_change) { + m_on_state_change(State::connecting); + } } if (m_is_tunnel_started) { @@ -1992,12 +2015,18 @@ void TunnelClient::on_reply(Event *evt) { auto tt = m_request_head->tunnel_type(); if (m_response_head->is_tunnel_ok(tt)) { m_is_tunnel_started = true; + if (m_on_state_change) { + m_on_state_change(State::connected); + } if (m_eos) { EventFunction::input()->input_async(m_eos); } else { EventFunction::input()->flush_async(); } } else { + if (m_on_state_change) { + m_on_state_change(State::closed); + } Filter::output(StreamEnd::make()); } m_request_head = nullptr; @@ -2024,4 +2053,11 @@ template<> void ClassDef::init() { super(); } +template<> void EnumDef::init() { + define(TunnelClient::State::idle, "idle"); + define(TunnelClient::State::connecting, "connecting"); + define(TunnelClient::State::connected, "connected"); + define(TunnelClient::State::closed, "closed"); +} + } // namespace pjs diff --git a/src/filters/http.hpp b/src/filters/http.hpp index 5c26899c..4cb65779 100644 --- a/src/filters/http.hpp +++ b/src/filters/http.hpp @@ -529,7 +529,21 @@ class TunnelServer : public Filter { class TunnelClient : public Filter, public EventSource { public: + enum class State { + idle, + connecting, + connected, + closed, + }; + + struct Options : public pipy::Options { + pjs::Ref on_state_f; + Options() {} + Options(pjs::Object *options); + }; + TunnelClient(pjs::Object *handshake); + TunnelClient(pjs::Object *handshake, const Options &options); private: TunnelClient(const TunnelClient &r); @@ -547,6 +561,8 @@ class TunnelClient : public Filter, public EventSource { pjs::Ref m_response_head; pjs::Ref m_eos; Data m_buffer; + Options m_options; + std::function m_on_state_change; bool m_is_tunnel_started = false; };