From 095d731efa763f998614b22515ac552e335aadee Mon Sep 17 00:00:00 2001 From: Nguyen Sy Thanh Son Date: Sat, 6 Jun 2020 05:25:02 +0000 Subject: [PATCH 1/5] add log and fix panic --- services/ohlcv.go | 2 ++ ws/client.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/services/ohlcv.go b/services/ohlcv.go index 1e6d1d7..055d13e 100644 --- a/services/ohlcv.go +++ b/services/ohlcv.go @@ -1300,6 +1300,8 @@ func (s *OHLCVService) GetLastPriceCurrentByTime(symbol string, createAt time.Ti } func (s *OHLCVService) getCachePairByName(pairName string) (*types.Pair, error) { + s.mutex.RLock() + defer s.mutex.RUnlock() now := time.Now().Unix() if pairCache, ok := s.pairCacheByName[pairName]; ok { if now-pairCache.timelife < cacheTimeLifeMax { diff --git a/ws/client.go b/ws/client.go index 63e02a1..9c17fd5 100644 --- a/ws/client.go +++ b/ws/client.go @@ -53,6 +53,8 @@ func (c *Client) SendMessage(channel string, msgType types.SubscriptionEvent, pa e.Hash = h[0].Hex() } + logger.Debug("SendMessage", channel, msgType) + m := types.WebsocketMessage{ Channel: channel, Event: e, From 5d85401fffecc77ad4f3b6dca1876f4c24ebadab Mon Sep 17 00:00:00 2001 From: Nguyen Sy Thanh Son Date: Sat, 6 Jun 2020 05:29:36 +0000 Subject: [PATCH 2/5] remove lock --- services/ohlcv.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/services/ohlcv.go b/services/ohlcv.go index 055d13e..1e6d1d7 100644 --- a/services/ohlcv.go +++ b/services/ohlcv.go @@ -1300,8 +1300,6 @@ func (s *OHLCVService) GetLastPriceCurrentByTime(symbol string, createAt time.Ti } func (s *OHLCVService) getCachePairByName(pairName string) (*types.Pair, error) { - s.mutex.RLock() - defer s.mutex.RUnlock() now := time.Now().Unix() if pairCache, ok := s.pairCacheByName[pairName]; ok { if now-pairCache.timelife < cacheTimeLifeMax { From 6f1823d4bfbd266d1c951eab7f2d66500d559e73 Mon Sep 17 00:00:00 2001 From: Nguyen Sy Thanh Son Date: Sun, 7 Jun 2020 17:56:45 +0000 Subject: [PATCH 3/5] wait 10 s --- ws/connection.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ws/connection.go b/ws/connection.go index 58863df..5e1a43e 100644 --- a/ws/connection.go +++ b/ws/connection.go @@ -11,8 +11,8 @@ import ( ) const ( - writeWait = 60 * time.Second - pongWait = 60 * time.Second + writeWait = 10 * time.Second + pongWait = 10 * time.Second pingPeriod = (pongWait * 9) / 10 ) From 4d7ccb6b303baced1c228e05abd87778949daf30 Mon Sep 17 00:00:00 2001 From: Nguyen Sy Thanh Son Date: Sun, 7 Jun 2020 17:59:47 +0000 Subject: [PATCH 4/5] ws 30 s --- ws/connection.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ws/connection.go b/ws/connection.go index 5e1a43e..8ae6c99 100644 --- a/ws/connection.go +++ b/ws/connection.go @@ -11,8 +11,8 @@ import ( ) const ( - writeWait = 10 * time.Second - pongWait = 10 * time.Second + writeWait = 30 * time.Second + pongWait = 30 * time.Second pingPeriod = (pongWait * 9) / 10 ) From 0b7649fda79bea497f17f146256e82843292b33d Mon Sep 17 00:00:00 2001 From: Nguyen Sy Thanh Son Date: Tue, 9 Jun 2020 04:19:21 +0000 Subject: [PATCH 5/5] change lock, unlock --- ws/lending_markets.go | 11 ++++++++--- ws/lending_orderbook.go | 11 ++++++++--- ws/lending_priceboard.go | 11 ++++++++--- ws/lending_trade.go | 11 ++++++++--- ws/markets.go | 11 ++++++++--- ws/orderbook.go | 11 ++++++++--- ws/price_board.go | 11 ++++++++--- ws/trades.go | 11 ++++++++--- 8 files changed, 64 insertions(+), 24 deletions(-) diff --git a/ws/lending_markets.go b/ws/lending_markets.go index 7fdc991..70379d1 100644 --- a/ws/lending_markets.go +++ b/ws/lending_markets.go @@ -98,11 +98,16 @@ func (s *LendingMarketsSocket) Unsubscribe(c *Client) { } } -// BroadcastMessage streams message to all the subscriptions subscribed to the pair -func (s *LendingMarketsSocket) BroadcastMessage(channelID string, p interface{}) error { +func (s *LendingMarketsSocket) getSubscriptions() map[string]map[*Client]bool { s.subsMutex.RLock() defer s.subsMutex.RUnlock() - for c, status := range s.subscriptions[channelID] { + return s.subscriptions +} + +// BroadcastMessage streams message to all the subscriptions subscribed to the pair +func (s *LendingMarketsSocket) BroadcastMessage(channelID string, p interface{}) error { + subs := s.getSubscriptions() + for c, status := range subs[channelID] { if status { s.SendUpdateMessage(c, p) } diff --git a/ws/lending_orderbook.go b/ws/lending_orderbook.go index dbd480f..549a222 100644 --- a/ws/lending_orderbook.go +++ b/ws/lending_orderbook.go @@ -101,11 +101,16 @@ func (s *LendingOrderBookSocket) Unsubscribe(c *Client) { } } -// BroadcastMessage streams message to all the subscribtions subscribed to the pair -func (s *LendingOrderBookSocket) BroadcastMessage(channelID string, p interface{}) error { +func (s *LendingOrderBookSocket) getSubscriptions() map[string]map[*Client]bool { s.subsMutex.RLock() defer s.subsMutex.RUnlock() - for c, status := range s.subscriptions[channelID] { + return s.subscriptions +} + +// BroadcastMessage streams message to all the subscribtions subscribed to the pair +func (s *LendingOrderBookSocket) BroadcastMessage(channelID string, p interface{}) error { + subs := s.getSubscriptions() + for c, status := range subs[channelID] { if status { s.SendUpdateMessage(c, p) } diff --git a/ws/lending_priceboard.go b/ws/lending_priceboard.go index 4f4ee69..268eb3a 100644 --- a/ws/lending_priceboard.go +++ b/ws/lending_priceboard.go @@ -94,11 +94,16 @@ func (s *LendingPriceBoardSocket) Unsubscribe(c *Client) { } } -// BroadcastMessage streams message to all the subscriptions subscribed to the pair -func (s *LendingPriceBoardSocket) BroadcastMessage(channelID string, p interface{}) error { +func (s *LendingPriceBoardSocket) getSubscriptions() map[string]map[*Client]bool { s.subsMutex.RLock() defer s.subsMutex.RUnlock() - for c, status := range s.subscriptions[channelID] { + return s.subscriptions +} + +// BroadcastMessage streams message to all the subscriptions subscribed to the pair +func (s *LendingPriceBoardSocket) BroadcastMessage(channelID string, p interface{}) error { + subs := s.getSubscriptions() + for c, status := range subs[channelID] { if status { s.SendUpdateMessage(c, p) } diff --git a/ws/lending_trade.go b/ws/lending_trade.go index 752429f..97d7fb0 100644 --- a/ws/lending_trade.go +++ b/ws/lending_trade.go @@ -97,12 +97,17 @@ func (s *LendingTradeSocket) Unsubscribe(c *Client) { } } +func (s *LendingTradeSocket) getSubscriptions() map[string]map[*Client]bool { + s.subsMutex.RLock() + defer s.subsMutex.RUnlock() + return lendingTradeSocket.subscriptions +} + // BroadcastMessage broadcasts trade message to all subscribed sockets func (s *LendingTradeSocket) BroadcastMessage(channelID string, p interface{}) { go func() { - s.subsMutex.RLock() - defer s.subsMutex.RUnlock() - for conn, active := range lendingTradeSocket.subscriptions[channelID] { + subs := s.getSubscriptions() + for conn, active := range subs[channelID] { if active { s.SendUpdateMessage(conn, p) } diff --git a/ws/markets.go b/ws/markets.go index 17d835e..320c5a8 100644 --- a/ws/markets.go +++ b/ws/markets.go @@ -96,11 +96,16 @@ func (s *MarketsSocket) Unsubscribe(c *Client) { } } -// BroadcastMessage streams message to all the subscriptions subscribed to the pair -func (s *MarketsSocket) BroadcastMessage(channelID string, p interface{}) error { +func (s *MarketsSocket) getSubscriptions() map[string]map[*Client]bool { s.subsMutex.RLock() defer s.subsMutex.RUnlock() - for c, status := range s.subscriptions[channelID] { + return s.subscriptions +} + +// BroadcastMessage streams message to all the subscriptions subscribed to the pair +func (s *MarketsSocket) BroadcastMessage(channelID string, p interface{}) error { + subs := s.getSubscriptions() + for c, status := range subs[channelID] { if status { s.SendUpdateMessage(c, p) } diff --git a/ws/orderbook.go b/ws/orderbook.go index 5f134a7..d9f3631 100644 --- a/ws/orderbook.go +++ b/ws/orderbook.go @@ -98,11 +98,16 @@ func (s *OrderBookSocket) Unsubscribe(c *Client) { } } -// BroadcastMessage streams message to all the subscribtions subscribed to the pair -func (s *OrderBookSocket) BroadcastMessage(channelID string, p interface{}) error { +func (s *OrderBookSocket) getSubscriptions() map[string]map[*Client]bool { s.subsMutex.RLock() defer s.subsMutex.RUnlock() - for c, status := range s.subscriptions[channelID] { + return s.subscriptions +} + +// BroadcastMessage streams message to all the subscribtions subscribed to the pair +func (s *OrderBookSocket) BroadcastMessage(channelID string, p interface{}) error { + subs := s.getSubscriptions() + for c, status := range subs[channelID] { if status { s.SendUpdateMessage(c, p) } diff --git a/ws/price_board.go b/ws/price_board.go index a5a1373..419b898 100644 --- a/ws/price_board.go +++ b/ws/price_board.go @@ -95,11 +95,16 @@ func (s *PriceBoardSocket) Unsubscribe(c *Client) { } } -// BroadcastMessage streams message to all the subscriptions subscribed to the pair -func (s *PriceBoardSocket) BroadcastMessage(channelID string, p interface{}) error { +func (s *PriceBoardSocket) getSubscriptions() map[string]map[*Client]bool { s.subsMutex.RLock() defer s.subsMutex.RUnlock() - for c, status := range s.subscriptions[channelID] { + return s.subscriptions +} + +// BroadcastMessage streams message to all the subscriptions subscribed to the pair +func (s *PriceBoardSocket) BroadcastMessage(channelID string, p interface{}) error { + subs := s.getSubscriptions() + for c, status := range subs[channelID] { if status { s.SendUpdateMessage(c, p) } diff --git a/ws/trades.go b/ws/trades.go index 7bba0fc..eb0afe2 100644 --- a/ws/trades.go +++ b/ws/trades.go @@ -93,12 +93,17 @@ func (s *TradeSocket) Unsubscribe(c *Client) { } } +func (s *TradeSocket) getSubscriptions() map[string]map[*Client]bool { + s.subsMutex.RLock() + defer s.subsMutex.RUnlock() + return tradeSocket.subscriptions +} + // BroadcastMessage broadcasts trade message to all subscribed sockets func (s *TradeSocket) BroadcastMessage(channelID string, p interface{}) { go func() { - s.subsMutex.RLock() - defer s.subsMutex.RUnlock() - for conn, active := range tradeSocket.subscriptions[channelID] { + subs := s.getSubscriptions() + for conn, active := range subs[channelID] { if active { s.SendUpdateMessage(conn, p) }