Skip to content

Commit

Permalink
Merge pull request #36 from zaihang365/33_using_writev
Browse files Browse the repository at this point in the history
Impl: reduce syscall and memcopy for multiple package
  • Loading branch information
AlexStocks authored Apr 8, 2020
2 parents 52cde59 + 7df84f8 commit 45d3d7d
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 51 deletions.
27 changes: 27 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
)
Expand Down Expand Up @@ -122,7 +123,21 @@ func TestTCPClient(t *testing.T) {
ss.SetCompressType(CompressNone)
conn := ss.(*session).Connection.(*gettyTCPConn)
assert.True(t, conn.compress == CompressNone)
beforeWriteBytes := atomic.LoadUint32(&conn.writeBytes)
beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum)
_, err = conn.send([]byte("hello"))
assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum))
assert.Nil(t, err)
assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&conn.writeBytes))
err = ss.WriteBytes([]byte("hello"))
assert.Equal(t, beforeWriteBytes+10, atomic.LoadUint32(&conn.writeBytes))
assert.Equal(t, beforeWritePkgNum+2, atomic.LoadUint32(&conn.writePkgNum))
assert.Nil(t, err)
var pkgs [][]byte
pkgs = append(pkgs, []byte("hello"), []byte("hello"))
_, err = conn.send(pkgs)
assert.Equal(t, beforeWritePkgNum+4, atomic.LoadUint32(&conn.writePkgNum))
assert.Equal(t, beforeWriteBytes+20, atomic.LoadUint32(&conn.writeBytes))
assert.Nil(t, err)
ss.SetCompressType(CompressSnappy)
assert.True(t, conn.compress == CompressSnappy)
Expand Down Expand Up @@ -194,7 +209,14 @@ func TestUDPClient(t *testing.T) {
_, err = udpConn.send(udpCtx)
assert.NotNil(t, err)
udpCtx.Pkg = []byte("hello")
beforeWriteBytes := atomic.LoadUint32(&udpConn.writeBytes)
_, err = udpConn.send(udpCtx)
assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&udpConn.writeBytes))
assert.Nil(t, err)

beforeWritePkgNum := atomic.LoadUint32(&udpConn.writePkgNum)
err = ss.WritePkg(udpCtx, 0)
assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&udpConn.writePkgNum))
assert.Nil(t, err)

clt.Close()
Expand Down Expand Up @@ -247,8 +269,13 @@ func TestNewWSClient(t *testing.T) {
assert.Nil(t, err)
_, err = conn.send("hello")
assert.NotNil(t, err)
beforeWriteBytes := atomic.LoadUint32(&conn.writeBytes)
_, err = conn.send([]byte("hello"))
assert.Nil(t, err)
assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&conn.writeBytes))
beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum)
err = ss.WriteBytes([]byte("hello"))
assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum))
err = conn.writePing()
assert.Nil(t, err)

Expand Down
35 changes: 24 additions & 11 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,6 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) {
length int
)

if p, ok = pkg.([]byte); !ok {
return 0, perrors.Errorf("illegal @pkg{%#v} type", pkg)
}
if t.compress == CompressNone && t.wTimeout > 0 {
// Optimization: update write deadline only if more than 25%
// of the last write deadline exceeded.
Expand All @@ -281,12 +278,28 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) {
}
}

if length, err = t.writer.Write(p); err == nil {
atomic.AddUint32(&t.writeBytes, (uint32)(len(p)))
if buffers, ok := pkg.([][]byte); ok {
netBuf := net.Buffers(buffers)
if length, err := netBuf.WriteTo(t.conn); err == nil {
atomic.AddUint32(&t.writeBytes, (uint32)(length))
atomic.AddUint32(&t.writePkgNum, (uint32)(len(buffers)))
}
log.Debugf("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%s",
t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err)
return int(length), perrors.WithStack(err)
}
log.Debugf("now:%s, length:%d, err:%v", currentTime, length, err)
return length, perrors.WithStack(err)
//return length, err

if p, ok = pkg.([]byte); ok {
if length, err = t.writer.Write(p); err == nil {
atomic.AddUint32(&t.writeBytes, (uint32)(len(p)))
atomic.AddUint32(&t.writePkgNum, 1)
}
log.Debugf("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%s",
t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err)
return length, perrors.WithStack(err)
}

return 0, perrors.Errorf("illegal @pkg{%#v} type", pkg)
}

// close tcp connection
Expand Down Expand Up @@ -437,11 +450,11 @@ func (u *gettyUDPConn) send(udpCtx interface{}) (int, error) {

if length, _, err = u.conn.WriteMsgUDP(buf, nil, peerAddr); err == nil {
atomic.AddUint32(&u.writeBytes, (uint32)(len(buf)))
atomic.AddUint32(&u.writePkgNum, 1)
}
log.Debugf("WriteMsgUDP(peerAddr:%s) = {length:%d, error:%v}", peerAddr, length, err)

return length, perrors.WithStack(err)
//return length, err
}

// close udp connection
Expand Down Expand Up @@ -531,7 +544,7 @@ func (w *gettyWSConn) recv() ([]byte, error) {
// gorilla/websocket/conn.go:NextReader will always fail when got a timeout error.
_, b, e := w.conn.ReadMessage() // the first return value is message type.
if e == nil {
w.incReadPkgNum()
atomic.AddUint32(&w.readBytes, (uint32)(len(b)))
} else {
if websocket.IsUnexpectedCloseError(e, websocket.CloseGoingAway) {
log.Warnf("websocket unexpected close error: %v", e)
Expand Down Expand Up @@ -579,9 +592,9 @@ func (w *gettyWSConn) send(pkg interface{}) (int, error) {
w.updateWriteDeadline()
if err = w.conn.WriteMessage(websocket.BinaryMessage, p); err == nil {
atomic.AddUint32(&w.writeBytes, (uint32)(len(p)))
atomic.AddUint32(&w.writePkgNum, 1)
}
return len(p), perrors.WithStack(err)
//return len(p), err
}

func (w *gettyWSConn) writePing() error {
Expand Down
47 changes: 7 additions & 40 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (s *session) SetWQLen(writeQLen int) {
s.lock.Lock()
defer s.lock.Unlock()
s.wQ = make(chan interface{}, writeQLen)
log.Debug("%s, [session.SetWQLen] wQ{len:%d, cap:%d}", s.Stat(), len(s.wQ), cap(s.wQ))
log.Debugf("%s, [session.SetWQLen] wQ{len:%d, cap:%d}", s.Stat(), len(s.wQ), cap(s.wQ))
}

// set maximum wait time when session got error or got exit signal
Expand Down Expand Up @@ -404,10 +404,9 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
}
_, err = s.Connection.send(pkg)
if err != nil {
log.Warn("%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%v", s.Stat(), pkg, err)
log.Warnf("%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%v", s.Stat(), pkg, err)
return perrors.WithStack(err)
}
s.incWritePkgNum()
return nil
}
select {
Expand All @@ -432,9 +431,6 @@ func (s *session) WriteBytes(pkg []byte) error {
if _, err := s.Connection.send(pkg); err != nil {
return perrors.Wrapf(err, "s.Connection.Write(pkg len:%d)", len(pkg))
}

s.incWritePkgNum()

return nil
}

Expand All @@ -449,39 +445,10 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
return s.WriteBytes(pkgs[0])
}

// get len
var (
l int
err error
length int
arrp *[]byte
arr []byte
)
length = 0
for i := 0; i < len(pkgs); i++ {
length += len(pkgs[i])
}

// merge the pkgs
// arr = make([]byte, length)
arrp = gxbytes.GetBytes(length)
defer gxbytes.PutBytes(arrp)
arr = *arrp
l = 0
for i := 0; i < len(pkgs); i++ {
copy(arr[l:], pkgs[i])
l += len(pkgs[i])
// TODO Currently, only TCP is supported.
if _, err := s.Connection.send(pkgs); err != nil {
return perrors.Wrapf(err, "s.Connection.Write(pkgs num:%d)", len(pkgs))
}

if err = s.WriteBytes(arr); err != nil {
return perrors.WithStack(err)
}

num := len(pkgs) - 1
for i := 0; i < num; i++ {
s.incWritePkgNum()
}

return nil
}

Expand Down Expand Up @@ -567,7 +534,7 @@ LOOP:
continue
}
if !flag {
log.Warn("[session.handleLoop] drop write out package %#v", outPkg)
log.Warnf("[session.handleLoop] drop write out package %#v", outPkg)
continue
}

Expand Down Expand Up @@ -958,6 +925,6 @@ func (s *session) gc() {
// or (session)handleLoop automatically. It's thread safe.
func (s *session) Close() {
s.stop()
log.Info("%s closed now. its current gr num is %d",
log.Infof("%s closed now. its current gr num is %d",
s.sessionToken(), atomic.LoadInt32(&(s.grNum)))
}

0 comments on commit 45d3d7d

Please sign in to comment.