-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathslowpoke.go
103 lines (82 loc) · 2.27 KB
/
slowpoke.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package main
import (
	"io"
	"net"
	"sync"
	"time"

	"github.com/op/go-logging"
)
// Slowpoke relays data between an accepted connection and a TCP target,
// pausing for a configurable latency before each forwarded chunk.
type Slowpoke struct {
// conn is the client-side connection; StartTransfer closes it on return.
conn net.Conn
// targetAddr is the TCP address StartTransfer dials.
targetAddr *net.TCPAddr
// latency is how long transferWithLatency sleeps before each write; zero skips the sleep.
latency time.Duration
// bufferSize is the length in bytes of the buffer createBuffer allocates.
bufferSize int
// isClosed is set by handleError once the first error or EOF has been handled.
isClosed bool
// close is sent on by handleError and received from by StartTransfer to end the transfer.
close chan bool
// logger receives the debug, info, warning and error messages of the transfer.
logger *logging.Logger
}
// NewSlowpoke builds a Slowpoke that relays between conn and targetAddr.
//
// latency is the pause applied before each forwarded chunk, bufferSize is the
// size of the read buffer, and logger receives the transfer's log output. The
// returned value starts in the open state with a fresh, unbuffered close
// channel; nothing is dialed until StartTransfer is called.
func NewSlowpoke(conn net.Conn, targetAddr *net.TCPAddr, latency time.Duration, bufferSize int, logger *logging.Logger) *Slowpoke {
	proxy := new(Slowpoke)

	proxy.conn = conn
	proxy.targetAddr = targetAddr
	proxy.latency = latency
	proxy.bufferSize = bufferSize
	proxy.logger = logger

	// Transfer state: not closed yet, and an unbuffered channel for the close signal.
	proxy.isClosed = false
	proxy.close = make(chan bool)

	return proxy
}
// StartTransfer dials the target address and relays data in both directions
// between the client connection and the target.
//
// It blocks until handleError signals the close channel, then returns. The
// deferred Close calls tear down the client connection and, once dialed, the
// target connection. If the dial fails, the error is logged and the method
// returns without transferring anything.
func (s *Slowpoke) StartTransfer() {
	defer s.conn.Close()

	upstream, dialErr := net.DialTCP("tcp", nil, s.targetAddr)
	if dialErr != nil {
		s.logger.Errorf("Failed to connect to target address %s:\n%v", s.targetAddr, dialErr)
		return
	}
	defer upstream.Close()

	s.logger.Debugf("Established connection to %s", upstream.RemoteAddr())

	// One goroutine per direction: client -> target and target -> client.
	go s.transferWithLatency(s.conn, upstream)
	go s.transferWithLatency(upstream, s.conn)

	// Wait for the first direction to report an error or EOF.
	<-s.close

	s.logger.Infof(
		"Connection between client %s and target %s closed",
		s.conn.RemoteAddr(),
		upstream.RemoteAddr(),
	)
}
// createBuffer allocates the scratch buffer used by one transfer direction.
//
// The buffer is s.bufferSize bytes long. A non-positive bufferSize falls back
// to 32 KiB instead of being passed to make: a negative length makes make
// panic, and a zero-length buffer lets Read return (0, nil) without blocking,
// which would turn the transfer loop into a busy spin.
func (s *Slowpoke) createBuffer() []byte {
	const fallbackSize = 32 * 1024

	size := s.bufferSize
	if size <= 0 {
		size = fallbackSize
	}
	return make([]byte, size)
}
// transferWithLatency copies data from source to target until a read or write
// fails, sleeping for s.latency before every write.
//
// Each chunk that was read is forwarded before the read error (if any) from
// the same Read call is looked at, so data returned together with an error is
// still delivered. The first failure is passed to handleError and ends the
// loop.
func (s *Slowpoke) transferWithLatency(source net.Conn, target net.Conn) {
	chunk := s.createBuffer()

	// relay delays and forwards one chunk. It reports false once the write
	// side has failed and the loop must stop.
	relay := func(payload []byte) bool {
		s.logger.Debugf("Transferring %d bytes from %s to %s with %s added latency", len(payload), source.RemoteAddr(), target.RemoteAddr(), s.latency)
		if s.latency != 0 {
			time.Sleep(s.latency)
		}

		sent, writeErr := target.Write(payload)
		if writeErr != nil {
			s.handleError("Error during write: %v", writeErr)
			return false
		}
		if sent != len(payload) {
			s.logger.Warningf("Read %d bytes but could only write %d bytes", len(payload), sent)
		}
		return true
	}

	for {
		n, readErr := source.Read(chunk)
		if n > 0 && !relay(chunk[:n]) {
			return
		}
		if readErr != nil {
			s.handleError("Error during read: %v", readErr)
			return
		}
	}
}
// slowpokeCloseMu makes the check-and-set of Slowpoke.isClosed atomic.
// handleError is called from both transfer goroutines; without this guard
// both can observe isClosed == false and both send on the unbuffered close
// channel, leaving the second sender blocked forever because StartTransfer
// receives only once. It is package-level so the Slowpoke struct layout stays
// unchanged; the critical section is two memory operations.
var slowpokeCloseMu sync.Mutex

// handleError records the first error or EOF of a transfer and signals
// StartTransfer to shut the connection pair down.
//
// msg is a format string taking err as its single argument. Only the first
// call per Slowpoke logs and signals; later calls return immediately. io.EOF
// is logged at debug level, since it is the expected way for a stream to end;
// any other error is logged at error level. The cause is logged before the
// close signal is sent.
func (s *Slowpoke) handleError(msg string, err error) {
	slowpokeCloseMu.Lock()
	alreadyClosed := s.isClosed
	s.isClosed = true
	slowpokeCloseMu.Unlock()

	if alreadyClosed {
		// One of the send/receive streams was already closed. Nothing to do.
		return
	}

	if err == io.EOF {
		// EOF is expected and not really an error.
		s.logger.Debug("Received EOF")
	} else {
		s.logger.Errorf(msg, err)
	}

	// Sent outside the lock: this blocks until StartTransfer receives.
	s.close <- true
}