Skip to content

Commit

Permalink
给connMap加了个搜
Browse files Browse the repository at this point in the history
  • Loading branch information
zanjie1999 committed Dec 14, 2021
1 parent 3da4beb commit 202550f
Showing 1 changed file with 35 additions and 26 deletions.
61 changes: 35 additions & 26 deletions tcp2ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"regexp"
"time"
"os/signal"
"sync"
)

type tcp2wsSparkle struct {
Expand All @@ -29,10 +30,11 @@ type tcp2wsSparkle struct {
var (
tcp_addr string
ws_addr string
conn_num int
msg_type int = websocket.BinaryMessage
isServer bool
connMap map[string]*tcp2wsSparkle = make(map[string]*tcp2wsSparkle)
// go的map不是线程安全的 读写冲突就会直接exit
connMapLock *sync.RWMutex = new(sync.RWMutex)
)

var upgrader = websocket.Upgrader{
Expand All @@ -41,15 +43,23 @@ var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool{ return true },
}

func deleteConnMap(uuid string) {
defer func() {
err := recover()
if err != nil {
log.Print("del conn Boom!\n", err)
}
}()
func getConn(uuid string) (*tcp2wsSparkle, bool) {
connMapLock.RLock()
defer connMapLock.RUnlock()
conn, haskey := getConn(uuid);
return conn, haskey
}

func setConn(uuid string, conn *tcp2wsSparkle) {
connMapLock.Lock()
defer connMapLock.Unlock()
connMap[uuid] = conn
}

func deleteConn(uuid string) {
defer connMapLock.Unlock()

if conn, haskey := connMap[uuid]; haskey && conn != nil && !conn.del{
if conn, haskey := getConn(uuid); haskey && conn != nil && !conn.del{
conn.del = true
// 等一下再关闭 避免太快多线程锁不到
time.Sleep(100 * time.Millisecond)
Expand All @@ -61,6 +71,7 @@ func deleteConnMap(uuid string) {
conn.wsConn.WriteMessage(websocket.TextMessage, []byte("tcp2wsSparkleClose"))
conn.wsConn.Close()
}
connMapLock.Lock()
delete(connMap, uuid)
}
// panic("炸一下试试")
Expand All @@ -75,7 +86,7 @@ func ReadTcp2Ws(uuid string) (bool) {
}
}()

conn, haskey := connMap[uuid];
conn, haskey := getConn(uuid);
if !haskey {
return false
}
Expand All @@ -87,17 +98,17 @@ func ReadTcp2Ws(uuid string) (bool) {
}
length,err := tcpConn.Read(buf)
if err != nil {
if conn, haskey := connMap[uuid]; haskey && !conn.del {
if conn, haskey := getConn(uuid); haskey && !conn.del {
// tcp中断 关闭所有连接 关过的就不用关了
log.Print(uuid, " tcp read err: ", err)
deleteConnMap(uuid)
deleteConn(uuid)
return false
}
return false
}
if length > 0 {
// 因为tcpConn.Read会阻塞 所以要从connMap中获取最新的wsConn
conn, haskey := connMap[uuid];
conn, haskey := getConn(uuid);
if !haskey || conn.del {
return false
}
Expand Down Expand Up @@ -133,7 +144,7 @@ func ReadWs2Tcp(uuid string) (bool) {
}
}()

conn, haskey := connMap[uuid];
conn, haskey := getConn(uuid);
if !haskey {
return false
}
Expand All @@ -146,7 +157,7 @@ func ReadWs2Tcp(uuid string) (bool) {
t, buf, err := wsConn.ReadMessage()
if err != nil || t == -1 {
wsConn.Close()
if conn, haskey := connMap[uuid]; haskey && !conn.del {
if conn, haskey := getConn(uuid); haskey && !conn.del {
// 外部干涉导致中断 重连ws
log.Print(uuid, " ws read err: ", err)
return true
Expand All @@ -170,7 +181,7 @@ func ReadWs2Tcp(uuid string) (bool) {
msg_type = t
if _, err = tcpConn.Write(buf);err != nil{
log.Print(uuid, " tcp write err: ", err)
deleteConnMap(uuid)
deleteConn(uuid)
return false
}
// if !isServer {
Expand All @@ -188,7 +199,7 @@ func ReadWs2TcpClient(uuid string) {
}

func writeErrorBuf2Ws(uuid string) {
if conn, haskey := connMap[uuid]; haskey && conn.buf != nil {
if conn, haskey := getConn(uuid); haskey && conn.buf != nil {
for i := 0; i < len(conn.buf); i++ {
conn.wsConn.WriteMessage(websocket.BinaryMessage, conn.buf[i])
}
Expand Down Expand Up @@ -219,7 +230,7 @@ func RunServer(wsConn *websocket.Conn) {
if t == websocket.TextMessage {
uuid = string(buf)
// get
if conn, haskey := connMap[uuid]; haskey {
if conn, haskey := getConn(uuid); haskey {
tcpConn = conn.tcpConn
conn.wsConn.Close()
conn.wsConn = wsConn
Expand All @@ -238,8 +249,7 @@ func RunServer(wsConn *websocket.Conn) {
}
if uuid != "" {
// save
conn_num += 1
connMap[uuid] = &tcp2wsSparkle {tcpConn, wsConn, uuid, false, nil}
setConn(uuid, &tcp2wsSparkle {tcpConn, wsConn, uuid, false, nil})
}

go ReadTcp2Ws(uuid)
Expand All @@ -259,7 +269,7 @@ func RunClient(tcpConn net.Conn, uuid string) {
}()
// conn is close?
if tcpConn == nil {
if conn, haskey := connMap[uuid]; haskey {
if conn, haskey := getConn(uuid); haskey {
if conn.del {
return
}
Expand Down Expand Up @@ -290,11 +300,10 @@ func RunClient(tcpConn net.Conn, uuid string) {
// save conn
if tcpConn != nil {
// save
conn_num += 1
connMap[uuid] = &tcp2wsSparkle {tcpConn, wsConn, uuid, false, nil}
setConn(uuid, &tcp2wsSparkle {tcpConn, wsConn, uuid, false, nil})
} else {
// update
if conn, haskey := connMap[uuid]; haskey {
if conn, haskey := getConn(uuid); haskey {
conn.wsConn.Close()
conn.wsConn = wsConn
writeErrorBuf2Ws(uuid)
Expand Down Expand Up @@ -403,7 +412,7 @@ func main() {
log.Print(i.uuid, " timeout close")
i.tcpConn.Close()
i.wsConn.Close()
deleteConnMap(k)
deleteConn(k)
}
}
} else {
Expand All @@ -412,7 +421,7 @@ func main() {
<-c
log.Print(" quit...")
for k, _ := range connMap {
deleteConnMap(k)
deleteConn(k)
}
os.Exit(0)
}
Expand Down

0 comments on commit 202550f

Please sign in to comment.