From bbad8bbdaff7d49a2963c0fc2fb4f1e65bf45f3e Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 17 Oct 2023 18:05:34 +0800 Subject: [PATCH] update --- cmd/haobase/assets/unfreeze_assets.go | 2 +- cmd/haobase/clearing/clearing.go | 67 ++++++++++++++++----------- cmd/haobase/clearing/clearing_test.go | 9 ++-- 3 files changed, 44 insertions(+), 34 deletions(-) diff --git a/cmd/haobase/assets/unfreeze_assets.go b/cmd/haobase/assets/unfreeze_assets.go index 9e999986..878e62d9 100644 --- a/cmd/haobase/assets/unfreeze_assets.go +++ b/cmd/haobase/assets/unfreeze_assets.go @@ -17,7 +17,7 @@ func UnfreezeAllAssets(db *xorm.Session, user_id string, business_id string) (su func unfreezeAssets(db *xorm.Session, user_id string, business_id, unfreeze_amount string) (success bool, err error) { - if utils.D(unfreeze_amount).Cmp(utils.D("0")) >= 0 { + if utils.D(unfreeze_amount).Cmp(utils.D("0")) < 0 { return false, fmt.Errorf("解冻金额必须大于等于0") } diff --git a/cmd/haobase/clearing/clearing.go b/cmd/haobase/clearing/clearing.go index 46dbf691..c1f22433 100644 --- a/cmd/haobase/clearing/clearing.go +++ b/cmd/haobase/clearing/clearing.go @@ -33,7 +33,6 @@ func run_clearing(symbol string) { func watch_redis_list(symbol string) { key := types.FormatTradeResult.Format(symbol) - quote_key := types.FormatQuoteTradeResult.Format(symbol) logrus.Infof("正在监听%s成交日志 结算...", symbol) for { func() { @@ -48,36 +47,44 @@ func watch_redis_list(symbol string) { raw, _ := redis.Bytes(rdc.Do("Lpop", key)) logrus.Infof("%s成交记录: %s", symbol, raw) - var data trading_core.TradeResult - err := json.Unmarshal(raw, &data) - if err != nil { - logrus.Warnf("%s 解析json: %s 错误: %s", key, raw, err) - return - } + go clearing_trade_order(symbol, raw) + }() - lock(data.AskOrderId) - lock(data.BidOrderId) - - if data.Last != "" { - go func() { - for { - time.Sleep(time.Duration(50) * time.Millisecond) - if getlock(data.Last) == 1 { - newClean(data) - break - } - } - }() - } else { - go newClean(data) - } + } +} + +func clearing_trade_order(symbol string, raw []byte) { + var data trading_core.TradeResult + err := json.Unmarshal(raw, &data) + if err != nil { + logrus.Errorf("%s成交日志格式错误: %s %s", symbol, err.Error(), raw) + return + } - //通知kline系统 - if _, err := rdc.Do("RPUSH", quote_key, raw); err != nil { - logrus.Errorf("rpush %s err: %s", quote_key, err.Error()) + lock(data.AskOrderId) + lock(data.BidOrderId) + + if data.Last == "" { + go newClean(data) + } else { + go func() { + for { + time.Sleep(time.Duration(50) * time.Millisecond) + logrus.Infof("等待其他订单结算完成....") + if getlock(data.Last) == 1 { + newClean(data) + break + } } }() + } + //通知kline系统 + rdc := app.RedisPool().Get() + defer rdc.Close() + quote_key := types.FormatQuoteTradeResult.Format(symbol) + if _, err := rdc.Do("RPUSH", quote_key, raw); err != nil { + logrus.Errorf("rpush %s err: %s", quote_key, err.Error()) } } @@ -93,7 +100,6 @@ func lock(order_id string) { key := fmt.Sprintf("clearing.lock.%s", order_id) rdc.Do("INCR", key) - rdc.Do("expire", key, time.Duration(1)*time.Minute) } func unlock(order_id string) { @@ -101,7 +107,12 @@ func unlock(order_id string) { defer rdc.Close() key := fmt.Sprintf("clearing.lock.%s", order_id) - rdc.Do("DECR", key) + if _, err := rdc.Do("DECR", key); err != nil { + logrus.Warnf("clearing unlock %s err: %s", order_id, err.Error()) + } + if _, err := rdc.Do("Expire", key, 300); err != nil { + logrus.Warnf("clearing unlock %s set expire err: %s", order_id, err.Error()) + } } func getlock(order_id string) int64 { diff --git a/cmd/haobase/clearing/clearing_test.go b/cmd/haobase/clearing/clearing_test.go index 4ce5a4cd..98bfdc0d 100644 --- a/cmd/haobase/clearing/clearing_test.go +++ b/cmd/haobase/clearing/clearing_test.go @@ -92,7 +92,7 @@ func TestLimitOrder(t *testing.T) { TradeQuantity: utils.D("1"), TradeTime: time.Now().UnixNano(), } - newClean(result) + clearing_trade_order(testSymbol, result.Json()) //检查资产 sell_assets_target := assets.FindSymbol(sellUser, testTargetSymbol) @@ -124,8 +124,6 @@ func TestMarket(t *testing.T) { initdb(t) Convey("市价买指定的数量", t, func() { initAssets(t) - // defer cleanOrders(t) - // defer cleanAssets(t) s1, err := orders.NewLimitOrder(sellUser, testSymbol, trading_core.OrderSideSell, "1.00", "1") So(err, ShouldBeNil) @@ -153,7 +151,8 @@ func TestMarket(t *testing.T) { Last: buy.OrderId, } - newClean(result1) - newClean(result2) + clearing_trade_order(testSymbol, result1.Json()) + clearing_trade_order(testSymbol, result2.Json()) + time.Sleep(5 * time.Second) }) }