Skip to content

Commit

Permalink
[filter] Added connection state callback to the connectHTTPTunnel() f…
Browse files Browse the repository at this point in the history
…ilter
pajama-coder committed Dec 17, 2024

Verified

This commit was signed with the committer’s verified signature.
agliga Andrew Gliga
1 parent 7d98726 commit e0d52c1
Showing 4 changed files with 58 additions and 5 deletions.
9 changes: 5 additions & 4 deletions src/api/pipeline-api.cpp
Original file line number Diff line number Diff line change
@@ -174,8 +174,8 @@ void PipelineDesigner::connect(const pjs::Value &target, pjs::Object *options) {
}
}

void PipelineDesigner::connect_http_tunnel(pjs::Object *handshake) {
require_sub_pipeline(append_filter(new http::TunnelClient(handshake)));
void PipelineDesigner::connect_http_tunnel(pjs::Object *handshake, pjs::Object *options) {
require_sub_pipeline(append_filter(new http::TunnelClient(handshake, options)));
}

void PipelineDesigner::connect_proxy_protocol(const pjs::Value &address) {
@@ -750,8 +750,9 @@ template<> void ClassDef<PipelineDesigner>::init() {
// PipelineDesigner.connectHTTPTunnel
filter("connectHTTPTunnel", [](Context &ctx, PipelineDesigner *obj) {
Object *handshake;
if (!ctx.arguments(1, &handshake)) return;
obj->connect_http_tunnel(handshake);
Object *options = nullptr;
if (!ctx.arguments(1, &handshake, &options)) return;
obj->connect_http_tunnel(handshake, options);
});

// PipelineDesigner.connectProxyProtocol
2 changes: 1 addition & 1 deletion src/api/pipeline-api.hpp
Original file line number Diff line number Diff line change
@@ -63,7 +63,7 @@ class PipelineDesigner : public pjs::ObjectTemplate<PipelineDesigner> {
void compress(const pjs::Value &algorithm);
void compress_http(const pjs::Value &algorithm);
void connect(const pjs::Value &target, pjs::Object *options);
void connect_http_tunnel(pjs::Object *handshake);
void connect_http_tunnel(pjs::Object *handshake, pjs::Object *options);
void connect_proxy_protocol(const pjs::Value &address);
void connect_socks(const pjs::Value &address);
void connect_tls(pjs::Object *options);
36 changes: 36 additions & 0 deletions src/filters/http.cpp
Original file line number Diff line number Diff line change
@@ -1909,14 +1909,27 @@ void TunnelServer::start_tunnel(Message *response) {
// TunnelClient
//

TunnelClient::Options::Options(pjs::Object *options) {
Value(options, "onState")
.get(on_state_f)
.check_nullable();
}

TunnelClient::TunnelClient(pjs::Object *handshake)
: m_handshake(handshake)
{
}

TunnelClient::TunnelClient(pjs::Object *handshake, const Options &options)
: m_handshake(handshake)
, m_options(options)
{
}

TunnelClient::TunnelClient(const TunnelClient &r)
: Filter(r)
, m_handshake(r.m_handshake)
, m_options(r.m_options)
{
}

@@ -1941,11 +1954,18 @@ void TunnelClient::reset() {
m_request_head = nullptr;
m_response_head = nullptr;
m_eos = nullptr;
m_on_state_change = nullptr;
m_is_tunnel_started = false;
}

void TunnelClient::process(Event *evt) {
if (!m_pipeline) {
if (m_options.on_state_f) {
m_on_state_change = [=](State state) {
pjs::Value arg(pjs::EnumDef<State>::name(state)), ret;
Filter::callback(m_options.on_state_f, 1, &arg, ret);
};
}
pjs::Ref<pjs::Object> handshake;
if (m_handshake) {
if (m_handshake->is_instance_of<Message>()) {
@@ -1964,6 +1984,9 @@ void TunnelClient::process(Event *evt) {
m_request_head = pjs::coerce<RequestHead>(msg->head());
m_pipeline = sub_pipeline(0, true, EventSource::reply())->start();
Filter::output(msg->as<Message>(), m_pipeline->input());
if (m_on_state_change) {
m_on_state_change(State::connecting);
}
}

if (m_is_tunnel_started) {
@@ -1992,12 +2015,18 @@ void TunnelClient::on_reply(Event *evt) {
auto tt = m_request_head->tunnel_type();
if (m_response_head->is_tunnel_ok(tt)) {
m_is_tunnel_started = true;
if (m_on_state_change) {
m_on_state_change(State::connected);
}
if (m_eos) {
EventFunction::input()->input_async(m_eos);
} else {
EventFunction::input()->flush_async();
}
} else {
if (m_on_state_change) {
m_on_state_change(State::closed);
}
Filter::output(StreamEnd::make());
}
m_request_head = nullptr;
@@ -2024,4 +2053,11 @@ template<> void ClassDef<Server::Handler>::init() {
super<Promise::Callback>();
}

template<> void EnumDef<TunnelClient::State>::init() {
define(TunnelClient::State::idle, "idle");
define(TunnelClient::State::connecting, "connecting");
define(TunnelClient::State::connected, "connected");
define(TunnelClient::State::closed, "closed");
}

} // namespace pjs
16 changes: 16 additions & 0 deletions src/filters/http.hpp
Original file line number Diff line number Diff line change
@@ -529,7 +529,21 @@ class TunnelServer : public Filter {

class TunnelClient : public Filter, public EventSource {
public:
enum class State {
idle,
connecting,
connected,
closed,
};

struct Options : public pipy::Options {
pjs::Ref<pjs::Function> on_state_f;
Options() {}
Options(pjs::Object *options);
};

TunnelClient(pjs::Object *handshake);
TunnelClient(pjs::Object *handshake, const Options &options);

private:
TunnelClient(const TunnelClient &r);
@@ -547,6 +561,8 @@ class TunnelClient : public Filter, public EventSource {
pjs::Ref<ResponseHead> m_response_head;
pjs::Ref<StreamEnd> m_eos;
Data m_buffer;
Options m_options;
std::function<void(State state)> m_on_state_change;
bool m_is_tunnel_started = false;
};

0 comments on commit e0d52c1

Please sign in to comment.