Skip to content

Commit

Permalink
redis database改pool
Browse files Browse the repository at this point in the history
  • Loading branch information
Your Name committed Oct 16, 2023
1 parent 92361b0 commit a6d9b92
Show file tree
Hide file tree
Showing 28 changed files with 354 additions and 299 deletions.
7 changes: 6 additions & 1 deletion cmd/haobase/assets/assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/shopspring/decimal"
"github.com/yzimhao/trading_engine/utils"
"github.com/yzimhao/trading_engine/utils/app"
)

// 用户资产冻结记录
Expand Down Expand Up @@ -62,8 +63,12 @@ type assetsFreeze struct {
}

func FindSymbol(user_id string, symbol string) *Assets {

db := app.Database().NewSession()
defer db.Close()

var row Assets
db_engine.Table(new(Assets)).Where("user_id=? and symbol=?", user_id, symbol).Get(&row)
db.Table(new(Assets)).Where("user_id=? and symbol=?", user_id, symbol).Get(&row)
return &row
}

Expand Down
18 changes: 6 additions & 12 deletions cmd/haobase/assets/init.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
package assets

import (
"github.com/redis/go-redis/v9"
"github.com/shopspring/decimal"
"github.com/sirupsen/logrus"
"xorm.io/xorm"
)

var (
db_engine *xorm.Engine
"github.com/yzimhao/trading_engine/utils/app"
)

const (
Expand All @@ -18,8 +13,8 @@ const (
UserSystemFee string = "system_fee"
)

func Init(db *xorm.Engine, rdc *redis.Client) {
db_engine = db
func Init() {
db_engine := app.Database()

//同步表结构
err := db_engine.Sync2(
Expand All @@ -33,6 +28,9 @@ func Init(db *xorm.Engine, rdc *redis.Client) {
}

func UserAssets(user_id string, symbol []string) []Assets {
db_engine := app.Database().NewSession()
defer db_engine.Close()

rows := []Assets{}
q := db_engine.Table(new(Assets)).Where("user_id=?", user_id)
if len(symbol) > 0 {
Expand All @@ -43,10 +41,6 @@ func UserAssets(user_id string, symbol []string) []Assets {
return rows
}

func DB() *xorm.Engine {
return db_engine
}

func d(s string) decimal.Decimal {
ss, _ := decimal.NewFromString(s)
return ss
Expand Down
3 changes: 2 additions & 1 deletion cmd/haobase/assets/transfer_assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"strings"

"github.com/yzimhao/trading_engine/utils/app"
"xorm.io/xorm"
)

Expand All @@ -12,7 +13,7 @@ func Transfer(db *xorm.Session, from, to string, symbol string, amount string, b
}

func SysRecharge(to string, symbol string, amount string, business_id string) (success bool, err error) {
db := db_engine.NewSession()
db := app.Database().NewSession()
defer db.Close()

db.Begin()
Expand Down
21 changes: 3 additions & 18 deletions cmd/haobase/base/base.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,11 @@
package base

import (
"github.com/redis/go-redis/v9"
"github.com/yzimhao/trading_engine/cmd/haobase/base/symbols"
"xorm.io/xorm"
)

var (
db *xorm.Engine
rdc *redis.Client
)

func Init(_db *xorm.Engine, _rdc *redis.Client) {
db = _db
rdc = _rdc
symbols.Init(db, rdc)
}

func DB() *xorm.Engine {
return db
}
var ()

func RDC() *redis.Client {
return rdc
func Init() {
symbols.Init()
}
11 changes: 11 additions & 0 deletions cmd/haobase/base/symbols/func.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package symbols

import "github.com/yzimhao/trading_engine/utils/app"

func NewVarieties(symbol string) *Varieties {
db := app.Database().NewSession()
defer db.Close()

var row Varieties
db.Where("symbol=?", symbol).Get(&row)
return &row
}

func NewTradingVarieties(symbol string) *TradingVarieties {
db := app.Database().NewSession()
defer db.Close()

var row TradingVarieties

db.Where("symbol=?", symbol).Get(&row)
Expand All @@ -18,6 +26,9 @@ func NewTradingVarieties(symbol string) *TradingVarieties {
}

func newVarietiesById(id int) *Varieties {
db := app.Database().NewSession()
defer db.Close()

var row Varieties
db.Where("id=?", id).Get(&row)
return &row
Expand Down
18 changes: 8 additions & 10 deletions cmd/haobase/base/symbols/init.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
package symbols

import (
"github.com/redis/go-redis/v9"
"github.com/sirupsen/logrus"
"xorm.io/xorm"
"github.com/yzimhao/trading_engine/utils/app"
)

var (
db *xorm.Engine
rdc *redis.Client
)

func Init(_db *xorm.Engine, _rdc *redis.Client) {
db = _db
rdc = _rdc
var ()

func Init() {
init_db()
}

func init_db() {
db := app.Database()

err := db.Sync2(
new(Varieties),
new(TradingVarieties),
Expand Down Expand Up @@ -54,6 +49,9 @@ func DemoData() {
},
}

db := app.Database().NewSession()
defer db.Close()

_, err := db.Insert(symbols)
if err != nil {
logrus.Error(err)
Expand Down
17 changes: 9 additions & 8 deletions cmd/haobase/clearing/clearing.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
package clearing

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/gomodule/redigo/redis"
"github.com/sirupsen/logrus"
"github.com/yzimhao/trading_engine/cmd/haobase/base"
"github.com/yzimhao/trading_engine/cmd/haobase/base/symbols"
"github.com/yzimhao/trading_engine/trading_core"
"github.com/yzimhao/trading_engine/types"
"github.com/yzimhao/trading_engine/utils"
"github.com/yzimhao/trading_engine/utils/app"
)

func Run() {
//load symbols
db := base.DB().NewSession()
db := app.Database().NewSession()
defer db.Close()

var rows []symbols.TradingVarieties
Expand All @@ -37,16 +37,15 @@ func watch_redis_list(symbol string) {
logrus.Infof("结算,正在监听%s成交日志...", symbol)
for {
func() {
cx := context.Background()
rdc := base.RDC()
rdc := app.RedisPool().Get()
defer rdc.Close()

if n, _ := rdc.LLen(cx, key).Result(); n == 0 {
if n, _ := redis.Int64(rdc.Do("LLen", key)); n == 0 {
time.Sleep(time.Duration(50) * time.Millisecond)
return
}

raw, _ := rdc.LPop(cx, key).Bytes()
raw, _ := redis.Bytes(rdc.Do("Lpop", key))

var data trading_core.TradeResult
err := json.Unmarshal(raw, &data)
Expand All @@ -64,7 +63,9 @@ func watch_redis_list(symbol string) {
}

//通知kline系统
rdc.RPush(cx, quote_key, raw)
if _, err := rdc.Do("RPUSH", quote_key, raw); err != nil {
logrus.Errorf("rpush %s err: %s", quote_key, err.Error())
}

// if !data.Last {
// go newClean(data)
Expand Down
4 changes: 2 additions & 2 deletions cmd/haobase/clearing/clearing_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"fmt"

"github.com/yzimhao/trading_engine/cmd/haobase/assets"
"github.com/yzimhao/trading_engine/cmd/haobase/base"
"github.com/yzimhao/trading_engine/cmd/haobase/base/symbols"
"github.com/yzimhao/trading_engine/cmd/haobase/orders"
"github.com/yzimhao/trading_engine/trading_core"
"github.com/yzimhao/trading_engine/utils"
"github.com/yzimhao/trading_engine/utils/app"
"xorm.io/xorm"
)

Expand All @@ -23,7 +23,7 @@ type clean struct {
}

func newClean(raw trading_core.TradeResult) error {
db := base.DB().NewSession()
db := app.Database().NewSession()
defer db.Close()

item := clean{
Expand Down
39 changes: 18 additions & 21 deletions cmd/haobase/clearing/clearing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@ import (
"testing"
"time"

"github.com/redis/go-redis/v9"
"github.com/yzimhao/trading_engine/cmd/haobase/assets"
"github.com/yzimhao/trading_engine/cmd/haobase/base"
"github.com/yzimhao/trading_engine/cmd/haobase/base/symbols"
"github.com/yzimhao/trading_engine/cmd/haobase/orders"
"github.com/yzimhao/trading_engine/trading_core"
"github.com/yzimhao/trading_engine/utils"
"xorm.io/xorm"
"github.com/yzimhao/trading_engine/utils/app"

_ "github.com/go-sql-driver/mysql"
_ "github.com/lib/pq"
Expand All @@ -28,22 +27,18 @@ var (
)

func initdb(t *testing.T) {
db, err := xorm.NewEngine("mysql", "root:root@tcp(localhost:3306)/test?charset=utf8&loc=Local")
if err != nil {
t.Logf("mysql err: %s", err)
}
app.DatabaseInit("mysql", "root:root@tcp(localhost:3306)/test?charset=utf8&loc=Local", true)
app.RedisInit("127.0.0.1:6379", "", 0)

rdc := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379", DB: 0})
base.Init(db, rdc)
db.ShowSQL(true)
base.Init()

cleanAssets(t)
cleanOrders(t)

}

func initAssets(t *testing.T) {
assets.Init(base.DB(), base.RDC())
assets.Init()
symbols.DemoData()

assets.SysRecharge("user1", "usd", "10000.00", "C001")
Expand All @@ -53,24 +48,26 @@ func initAssets(t *testing.T) {
}

func cleanAssets(t *testing.T) {
base.DB().DropIndexes(new(assets.Assets))
base.DB().DropIndexes("assets_freeze")
base.DB().DropIndexes("assets_log")
err := base.DB().DropTables(new(assets.Assets), "assets_freeze", "assets_log")
db := app.Database()
db.DropIndexes(new(assets.Assets))
db.DropIndexes("assets_freeze")
db.DropIndexes("assets_log")
err := db.DropTables(new(assets.Assets), "assets_freeze", "assets_log")
if err != nil {
t.Logf("mysql droptables: %s", err)
}

}

func cleanOrders(t *testing.T) {
base.DB().DropIndexes(orders.GetOrderTableName(testSymbol))
base.DB().DropIndexes(new(orders.UnfinishedOrder))
base.DB().DropIndexes(orders.GetTradelogTableName(testSymbol))

base.DB().DropTables(orders.GetOrderTableName(testSymbol))
base.DB().DropTables(new(orders.UnfinishedOrder))
base.DB().DropTables(orders.GetTradelogTableName(testSymbol))
db := app.Database()
db.DropIndexes(orders.GetOrderTableName(testSymbol))
db.DropIndexes(new(orders.UnfinishedOrder))
db.DropIndexes(orders.GetTradelogTableName(testSymbol))

db.DropTables(orders.GetOrderTableName(testSymbol))
db.DropTables(new(orders.UnfinishedOrder))
db.DropTables(orders.GetTradelogTableName(testSymbol))

}

Expand Down
8 changes: 4 additions & 4 deletions cmd/haobase/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ func main() {

Before: func(ctx *cli.Context) error {
app.ConfigInit(ctx.String("config"))
db := app.DatabaseInit()
rc := app.RedisInit()
app.DatabaseInit(app.Cstring("database.driver"), app.Cstring("database.dsn"), app.Cbool("database.show_sql"))
app.RedisInit(app.Cstring("redis.host"), app.Cstring("redis.password"), app.Cint("redis.db"))

base.Init(db, rc)
assets.Init(db, rc)
base.Init()
assets.Init()
return nil
},

Expand Down
3 changes: 2 additions & 1 deletion cmd/haobase/orders/cancel_order.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package orders
import (
"github.com/sirupsen/logrus"
"github.com/yzimhao/trading_engine/cmd/haobase/assets"
"github.com/yzimhao/trading_engine/utils/app"
)

func cancel_order(symbol, order_id string) (order *Order, err error) {
db := assets.DB().NewSession()
db := app.Database().NewSession()
defer db.Close()

err = db.Begin()
Expand Down
Loading

0 comments on commit a6d9b92

Please sign in to comment.