Skip to content

Commit

Permalink
Merge pull request #1780 from fatedier/dev
Browse files Browse the repository at this point in the history
bump version
  • Loading branch information
fatedier authored Apr 27, 2020
2 parents 8668fef + 7266154 commit 2406ecd
Show file tree
Hide file tree
Showing 28 changed files with 1,161 additions and 98 deletions.
10 changes: 10 additions & 0 deletions client/admin_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type StatusResp struct {
Https []ProxyStatusResp `json:"https"`
Stcp []ProxyStatusResp `json:"stcp"`
Xtcp []ProxyStatusResp `json:"xtcp"`
Sudp []ProxyStatusResp `json:"sudp"`
}

type ProxyStatusResp struct {
Expand Down Expand Up @@ -155,6 +156,11 @@ func NewProxyStatusResp(status *proxy.ProxyStatus, serverAddr string) ProxyStatu
psr.LocalAddr = fmt.Sprintf("%s:%d", cfg.LocalIp, cfg.LocalPort)
}
psr.Plugin = cfg.Plugin
case *config.SudpProxyConf:
if cfg.LocalPort != 0 {
psr.LocalAddr = fmt.Sprintf("%s:%d", cfg.LocalIp, cfg.LocalPort)
}
psr.Plugin = cfg.Plugin
}
return psr
}
Expand All @@ -171,6 +177,7 @@ func (svr *Service) apiStatus(w http.ResponseWriter, r *http.Request) {
res.Https = make([]ProxyStatusResp, 0)
res.Stcp = make([]ProxyStatusResp, 0)
res.Xtcp = make([]ProxyStatusResp, 0)
res.Sudp = make([]ProxyStatusResp, 0)

log.Info("Http request [/api/status]")
defer func() {
Expand All @@ -194,6 +201,8 @@ func (svr *Service) apiStatus(w http.ResponseWriter, r *http.Request) {
res.Stcp = append(res.Stcp, NewProxyStatusResp(status, svr.cfg.ServerAddr))
case "xtcp":
res.Xtcp = append(res.Xtcp, NewProxyStatusResp(status, svr.cfg.ServerAddr))
case "sudp":
res.Sudp = append(res.Sudp, NewProxyStatusResp(status, svr.cfg.ServerAddr))
}
}
sort.Sort(ByProxyStatusResp(res.Tcp))
Expand All @@ -202,6 +211,7 @@ func (svr *Service) apiStatus(w http.ResponseWriter, r *http.Request) {
sort.Sort(ByProxyStatusResp(res.Https))
sort.Sort(ByProxyStatusResp(res.Stcp))
sort.Sort(ByProxyStatusResp(res.Xtcp))
sort.Sort(ByProxyStatusResp(res.Sudp))
return
}

Expand Down
151 changes: 151 additions & 0 deletions client/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ func NewProxy(ctx context.Context, pxyConf config.ProxyConf, clientCfg config.Cl
BaseProxy: &baseProxy,
cfg: cfg,
}
case *config.SudpProxyConf:
pxy = &SudpProxy{
BaseProxy: &baseProxy,
cfg: cfg,
closeCh: make(chan struct{}),
}
}
return
}
Expand Down Expand Up @@ -540,6 +546,151 @@ func (pxy *UdpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
udp.Forwarder(pxy.localAddr, pxy.readCh, pxy.sendCh)
}

type SudpProxy struct {
*BaseProxy

cfg *config.SudpProxyConf

localAddr *net.UDPAddr

closeCh chan struct{}
}

func (pxy *SudpProxy) Run() (err error) {
pxy.localAddr, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", pxy.cfg.LocalIp, pxy.cfg.LocalPort))
if err != nil {
return
}
return
}

func (pxy *SudpProxy) Close() {
pxy.mu.Lock()
defer pxy.mu.Unlock()
select {
case <-pxy.closeCh:
return
default:
close(pxy.closeCh)
}
}

func (pxy *SudpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
xl := pxy.xl
xl.Info("incoming a new work connection for sudp proxy, %s", conn.RemoteAddr().String())

if pxy.limiter != nil {
rwc := frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
return conn.Close()
})
conn = frpNet.WrapReadWriteCloserToConn(rwc, conn)
}

workConn := conn
readCh := make(chan *msg.UdpPacket, 1024)
sendCh := make(chan msg.Message, 1024)
isClose := false

mu := &sync.Mutex{}

closeFn := func() {
mu.Lock()
defer mu.Unlock()
if isClose {
return
}

isClose = true
if workConn != nil {
workConn.Close()
}
close(readCh)
close(sendCh)
}

// udp service <- frpc <- frps <- frpc visitor <- user
workConnReaderFn := func(conn net.Conn, readCh chan *msg.UdpPacket) {
defer closeFn()

for {
// first to check sudp proxy is closed or not
select {
case <-pxy.closeCh:
xl.Trace("frpc sudp proxy is closed")
return
default:
}

var udpMsg msg.UdpPacket
if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil {
xl.Warn("read from workConn for sudp error: %v", errRet)
return
}

if errRet := errors.PanicToError(func() {
readCh <- &udpMsg
}); errRet != nil {
xl.Warn("reader goroutine for sudp work connection closed: %v", errRet)
return
}
}
}

// udp service -> frpc -> frps -> frpc visitor -> user
workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) {
defer func() {
closeFn()
xl.Info("writer goroutine for sudp work connection closed")
}()

var errRet error
for rawMsg := range sendCh {
switch m := rawMsg.(type) {
case *msg.UdpPacket:
xl.Trace("frpc send udp package to frpc visitor, [udp local: %v, remote: %v], [tcp work conn local: %v, remote: %v]",
m.LocalAddr.String(), m.RemoteAddr.String(), conn.LocalAddr().String(), conn.RemoteAddr().String())
case *msg.Ping:
xl.Trace("frpc send ping message to frpc visitor")
}

if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil {
xl.Error("sudp work write error: %v", errRet)
return
}
}
}

heartbeatFn := func(conn net.Conn, sendCh chan msg.Message) {
ticker := time.NewTicker(30 * time.Second)
defer func() {
ticker.Stop()
closeFn()
}()

var errRet error
for {
select {
case <-ticker.C:
if errRet = errors.PanicToError(func() {
sendCh <- &msg.Ping{}
}); errRet != nil {
xl.Warn("heartbeat goroutine for sudp work connection closed")
return
}
case <-pxy.closeCh:
xl.Trace("frpc sudp proxy is closed")
return
}
}
}

go workConnSenderFn(workConn, sendCh)
go workConnReaderFn(workConn, readCh)
go heartbeatFn(workConn, sendCh)

udp.Forwarder(pxy.localAddr, readCh, sendCh)
}

// Common handler for tcp work connections.
func HandleTcpWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin,
baseInfo *config.BaseProxyConf, limiter *rate.Limiter, workConn net.Conn, encKey []byte, m *msg.StartWorkConn) {
Expand Down
Loading

0 comments on commit 2406ecd

Please sign in to comment.