From e2b1919cb1d72425851e764e9924905bd2665bd8 Mon Sep 17 00:00:00 2001 From: simlecode <69969590+simlecode@users.noreply.github.com> Date: Mon, 25 Nov 2024 17:48:31 +0800 Subject: [PATCH 01/11] fix: remove context when update nonce --- models/mysql/address.go | 4 ++-- models/mysql/address_test.go | 3 +-- models/repo/address_repo.go | 2 +- models/sqlite/address.go | 4 ++-- models/sqlite/address_test.go | 4 ++-- service/address_service.go | 4 ++-- service/message_selector.go | 4 ++-- 7 files changed, 12 insertions(+), 13 deletions(-) diff --git a/models/mysql/address.go b/models/mysql/address.go index 61f4b870..804d225b 100644 --- a/models/mysql/address.go +++ b/models/mysql/address.go @@ -170,8 +170,8 @@ func (s mysqlAddressRepo) DelAddress(ctx context.Context, addr address.Address) UpdateColumns(map[string]interface{}{"is_deleted": repo.Deleted, "state": types.AddressStateRemoved, "updated_at": time.Now()}).Error } -func (s mysqlAddressRepo) UpdateNonce(ctx context.Context, addr address.Address, nonce uint64) error { - return s.DB.WithContext(ctx).Model(&mysqlAddress{}).Where("addr = ? and is_deleted = ?", addr.String(), repo.NotDeleted). +func (s mysqlAddressRepo) UpdateNonce(addr address.Address, nonce uint64) error { + return s.DB.Model(&mysqlAddress{}).Where("addr = ? and is_deleted = ?", addr.String(), repo.NotDeleted). UpdateColumns(map[string]interface{}{"nonce": nonce, "updated_at": time.Now()}).Error } diff --git a/models/mysql/address_test.go b/models/mysql/address_test.go index e8d88958..18245be5 100644 --- a/models/mysql/address_test.go +++ b/models/mysql/address_test.go @@ -163,7 +163,6 @@ func testDelAddress(t *testing.T, r repo.Repo, mock sqlmock.Sqlmock) { } func testUpdateNonce(t *testing.T, r repo.Repo, mock sqlmock.Sqlmock) { - ctx := context.Background() addr := testutil.AddressProvider()(t) nonce := uint64(10) @@ -174,7 +173,7 @@ func testUpdateNonce(t *testing.T, r repo.Repo, mock sqlmock.Sqlmock) { WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() - err := r.AddressRepo().UpdateNonce(ctx, addr, nonce) + err := r.AddressRepo().UpdateNonce(addr, nonce) assert.NoError(t, err) } diff --git a/models/repo/address_repo.go b/models/repo/address_repo.go index 8110b93a..5a2914c3 100644 --- a/models/repo/address_repo.go +++ b/models/repo/address_repo.go @@ -19,7 +19,7 @@ type AddressRepo interface { ListAddress(ctx context.Context) ([]*types.Address, error) ListActiveAddress(ctx context.Context) ([]*types.Address, error) DelAddress(ctx context.Context, addr address.Address) error - UpdateNonce(ctx context.Context, addr address.Address, nonce uint64) error + UpdateNonce(addr address.Address, nonce uint64) error UpdateState(ctx context.Context, addr address.Address, state types.AddressState) error UpdateSelectMsgNum(ctx context.Context, addr address.Address, num uint64) error UpdateFeeParams(ctx context.Context, addr address.Address, gasOverEstimation, gasOverPremium float64, maxFee, gasFeeCap, baseFee big.Int) error diff --git a/models/sqlite/address.go b/models/sqlite/address.go index b05b8840..0867ea7d 100644 --- a/models/sqlite/address.go +++ b/models/sqlite/address.go @@ -164,8 +164,8 @@ func (s sqliteAddressRepo) ListActiveAddress(ctx context.Context) ([]*types.Addr return result, nil } -func (s sqliteAddressRepo) UpdateNonce(ctx context.Context, addr address.Address, nonce uint64) error { - return s.DB.WithContext(ctx).Model((*sqliteAddress)(nil)).Where("addr = ? and is_deleted = -1", addr.String()). +func (s sqliteAddressRepo) UpdateNonce(addr address.Address, nonce uint64) error { + return s.DB.Model((*sqliteAddress)(nil)).Where("addr = ? and is_deleted = -1", addr.String()). UpdateColumns(map[string]interface{}{"nonce": nonce, "updated_at": time.Now()}).Error } diff --git a/models/sqlite/address_test.go b/models/sqlite/address_test.go index c59aacdd..84e84b35 100644 --- a/models/sqlite/address_test.go +++ b/models/sqlite/address_test.go @@ -116,13 +116,13 @@ func TestAddress(t *testing.T) { t.Run("UpdateNonce", func(t *testing.T) { nonce := uint64(5) - assert.NoError(t, addressRepo.UpdateNonce(ctx, addrInfo.Addr, nonce)) + assert.NoError(t, addressRepo.UpdateNonce(addrInfo.Addr, nonce)) r, err := addressRepo.GetAddress(ctx, addrInfo.Addr) assert.NoError(t, err) assert.Equal(t, nonce, r.Nonce) // set nonce for a not exist address - err = addressRepo.UpdateNonce(ctx, randAddr, nonce) + err = addressRepo.UpdateNonce(randAddr, nonce) assert.NoError(t, err) _, err = addressRepo.GetAddress(ctx, randAddr) assert.Contains(t, err.Error(), gorm.ErrRecordNotFound.Error()) diff --git a/service/address_service.go b/service/address_service.go index 911a7d12..be870a72 100644 --- a/service/address_service.go +++ b/service/address_service.go @@ -75,8 +75,8 @@ func (addressService *AddressService) SaveAddress(ctx context.Context, address * return address.ID, err } -func (addressService *AddressService) UpdateNonce(ctx context.Context, addr address.Address, nonce uint64) error { - return addressService.repo.AddressRepo().UpdateNonce(ctx, addr, nonce) +func (addressService *AddressService) UpdateNonce(_ context.Context, addr address.Address, nonce uint64) error { + return addressService.repo.AddressRepo().UpdateNonce(addr, nonce) } func (addressService *AddressService) GetAddress(ctx context.Context, addr address.Address) (*types.Address, error) { diff --git a/service/message_selector.go b/service/message_selector.go index 09e33d30..9d2b8653 100644 --- a/service/message_selector.go +++ b/service/message_selector.go @@ -351,7 +351,7 @@ func (w *work) selectMessage(ctx context.Context, appliedNonce *utils.NonceMap, w.log.Warnf("nonce in db %d is smaller than nonce on chain %d, update to latest", addrInfo.Nonce, nonceInLatestTs) addrInfo.Nonce = nonceInLatestTs addrInfo.UpdatedAt = time.Now() - err := w.repo.AddressRepo().UpdateNonce(ctx, addrInfo.Addr, addrInfo.Nonce) + err := w.repo.AddressRepo().UpdateNonce(addrInfo.Addr, addrInfo.Nonce) if err != nil { return nil, fmt.Errorf("update nonce failed: %v", err) } @@ -590,7 +590,7 @@ func (w *work) saveSelectedMessages(ctx context.Context, selectResult *MsgSelect } addrInfo := selectResult.Address - if err := txRepo.AddressRepo().UpdateNonce(ctx, addrInfo.Addr, addrInfo.Nonce); err != nil { + if err := txRepo.AddressRepo().UpdateNonce(addrInfo.Addr, addrInfo.Nonce); err != nil { return err } } From dc5cd25d44735f969d0076d62c165b40e14d45f8 Mon Sep 17 00:00:00 2001 From: simlecode <69969590+simlecode@users.noreply.github.com> Date: Mon, 25 Nov 2024 17:51:38 +0800 Subject: [PATCH 02/11] chore: fix lint --- service/message_selector.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/message_selector.go b/service/message_selector.go index 9d2b8653..34a7ceaf 100644 --- a/service/message_selector.go +++ b/service/message_selector.go @@ -580,7 +580,7 @@ func (w *work) signMessage(ctx context.Context, msg *types.Message, accounts []s return sigI.(*crypto.Signature), nil } -func (w *work) saveSelectedMessages(ctx context.Context, selectResult *MsgSelectResult) error { +func (w *work) saveSelectedMessages(_ context.Context, selectResult *MsgSelectResult) error { startSaveDB := time.Now() w.log.Infof("start save messages to database") err := w.repo.Transaction(func(txRepo repo.TxRepo) error { From 7fa1789fb4d411435a0f4fdc22da22a71226c60d Mon Sep 17 00:00:00 2001 From: simlecode <69969590+simlecode@users.noreply.github.com> Date: Tue, 26 Nov 2024 15:43:41 +0800 Subject: [PATCH 03/11] chore: reduce log --- service/message_selector.go | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/service/message_selector.go b/service/message_selector.go index 34a7ceaf..3342c2b3 100644 --- a/service/message_selector.go +++ b/service/message_selector.go @@ -308,14 +308,19 @@ func (w *work) startSelectMessage( w.log.Errorf("select message failed: %v", err) return } - w.log.Infof("select message result | SelectMsg: %d | ToPushMsg: %d | ErrMsg: %d | took: %v", len(selectResult.SelectMsg), - len(selectResult.ToPushMsg), len(selectResult.ErrMsg), time.Since(w.start)) - recordMetric(ctx, w.addr, selectResult) + if len(selectResult.SelectMsg) != 0 || len(selectResult.ToPushMsg) != 0 || len(selectResult.ErrMsg) != 0 { + w.log.Infof("select message result | SelectMsg: %d | ToPushMsg: %d | ErrMsg: %d | took: %v", len(selectResult.SelectMsg), + len(selectResult.ToPushMsg), len(selectResult.ErrMsg), time.Since(w.start)) - if err := w.saveSelectedMessages(ctx, selectResult); err != nil { - w.log.Errorf("failed to save selected messages to db %v", err) - return + recordMetric(ctx, w.addr, selectResult) + + if len(selectResult.SelectMsg) > 0 || len(selectResult.ErrMsg) > 0 { + if err := w.saveSelectedMessages(ctx, selectResult); err != nil { + w.log.Errorf("failed to save selected messages to db %v", err) + return + } + } } for _, msg := range selectResult.SelectMsg { @@ -362,14 +367,13 @@ func (w *work) selectMessage(ctx context.Context, appliedNonce *utils.NonceMap, // calc the message needed nonceGap := addrInfo.Nonce - nonceInLatestTs if nonceGap >= maxAllowPendingMessage { - w.log.Errorf("there are %d message not to be package, nonce gap: %d", len(toPushMessage), nonceGap) + w.log.Warnf("there are %d message not to be package, nonce gap: %d", len(toPushMessage), nonceGap) return &MsgSelectResult{ ToPushMsg: toPushMessage, Address: addrInfo, }, nil } wantCount := maxAllowPendingMessage - nonceGap - w.log.Infof("state actor nonce %d, latest nonce in ts %d, assigned nonce %d, nonce gap %d, want %d", actorNonce, nonceInLatestTs, addrInfo.Nonce, nonceGap, wantCount) // get unfill message selectCount := mathutil.MinUint64(wantCount, 100) @@ -379,12 +383,14 @@ func (w *work) selectMessage(ctx context.Context, appliedNonce *utils.NonceMap, } if len(messages) == 0 { - w.log.Infof("have no unfill message") + w.log.Debugf("have no unfill message") return &MsgSelectResult{ ToPushMsg: toPushMessage, Address: addrInfo, }, nil } + w.log.Infof("state actor nonce %d, latest nonce in ts %d, assigned nonce %d, nonce gap %d, want %d", actorNonce, + nonceInLatestTs, addrInfo.Nonce, nonceGap, wantCount) var errMsg []msgErrInfo count := uint64(0) @@ -477,7 +483,7 @@ func (w *work) getNonce(ctx context.Context, ts *venusTypes.TipSet, appliedNonce nonceInLatestTs := actor.Nonce // todo actor nonce maybe the latest ts. not need appliedNonce if nonceInTs, ok := appliedNonce.Get(w.addr); ok { - w.log.Infof("nonce in ts %d, nonce in actor %d", nonceInTs, nonceInLatestTs) + w.log.Debugf("nonce in ts %d, nonce in actor %d", nonceInTs, nonceInLatestTs) nonceInLatestTs = nonceInTs } From dbc551d8aab390d3da7e03fce66180b7c029fdd2 Mon Sep 17 00:00:00 2001 From: simlecode <69969590+simlecode@users.noreply.github.com> Date: Wed, 27 Nov 2024 15:21:09 +0800 Subject: [PATCH 04/11] fix: get max message nonce from db --- service/message_selector.go | 55 ++++++++++++++++++++++++++++++-- service/message_selector_test.go | 2 +- 2 files changed, 53 insertions(+), 4 deletions(-) diff --git a/service/message_selector.go b/service/message_selector.go index 3342c2b3..57099bf5 100644 --- a/service/message_selector.go +++ b/service/message_selector.go @@ -316,7 +316,7 @@ func (w *work) startSelectMessage( recordMetric(ctx, w.addr, selectResult) if len(selectResult.SelectMsg) > 0 || len(selectResult.ErrMsg) > 0 { - if err := w.saveSelectedMessages(ctx, selectResult); err != nil { + if err := w.saveSelectedMessages(selectResult); err != nil { w.log.Errorf("failed to save selected messages to db %v", err) return } @@ -356,7 +356,14 @@ func (w *work) selectMessage(ctx context.Context, appliedNonce *utils.NonceMap, w.log.Warnf("nonce in db %d is smaller than nonce on chain %d, update to latest", addrInfo.Nonce, nonceInLatestTs) addrInfo.Nonce = nonceInLatestTs addrInfo.UpdatedAt = time.Now() - err := w.repo.AddressRepo().UpdateNonce(addrInfo.Addr, addrInfo.Nonce) + maxMsgNonce, err := w.getMaxMessageNonceFromDB(addrInfo.Addr) + if err == nil { + if maxMsgNonce > addrInfo.Nonce { + addrInfo.Nonce = maxMsgNonce + 1 + w.log.Warnf("max message nonce in db %d", maxMsgNonce) + } + } + err = w.repo.AddressRepo().UpdateNonce(addrInfo.Addr, addrInfo.Nonce) if err != nil { return nil, fmt.Errorf("update nonce failed: %v", err) } @@ -490,6 +497,47 @@ func (w *work) getNonce(ctx context.Context, ts *venusTypes.TipSet, appliedNonce return nonceInLatestTs, actor.Nonce, nil } +func (w *work) getMaxMessageNonceFromDB(addr address.Address) (uint64, error) { + var maxNonce uint64 + msgs, err := w.repo.MessageRepo().ListMessageByParams(&types.MsgQueryParams{ + State: []types.MessageState{ + types.FillMsg, + }, + From: []address.Address{ + addr, + }, + Limit: 100, + }) + if err == nil { + for _, msg := range msgs { + if maxNonce < msg.Nonce { + maxNonce = msg.Nonce + } + } + return maxNonce, nil + } + + msgs, err = w.repo.MessageRepo().ListMessageByParams(&types.MsgQueryParams{ + State: []types.MessageState{ + types.OnChainMsg, + }, + From: []address.Address{ + addr, + }, + Limit: 10, + }) + if err != nil { + return 0, err + } + for _, msg := range msgs { + if maxNonce < msg.Nonce { + maxNonce = msg.Nonce + } + } + + return maxNonce, nil +} + func (w *work) getFilledMessage(nonceInLatestTs uint64) []*venusTypes.SignedMessage { filledMessage, err := w.repo.MessageRepo().ListFilledMessageByAddress(w.addr) if err != nil { @@ -586,7 +634,7 @@ func (w *work) signMessage(ctx context.Context, msg *types.Message, accounts []s return sigI.(*crypto.Signature), nil } -func (w *work) saveSelectedMessages(_ context.Context, selectResult *MsgSelectResult) error { +func (w *work) saveSelectedMessages(selectResult *MsgSelectResult) error { startSaveDB := time.Now() w.log.Infof("start save messages to database") err := w.repo.Transaction(func(txRepo repo.TxRepo) error { @@ -599,6 +647,7 @@ func (w *work) saveSelectedMessages(_ context.Context, selectResult *MsgSelectRe if err := txRepo.AddressRepo().UpdateNonce(addrInfo.Addr, addrInfo.Nonce); err != nil { return err } + w.log.Infof("update nonce to %v", addrInfo.Nonce) } for _, m := range selectResult.ErrMsg { diff --git a/service/message_selector_test.go b/service/message_selector_test.go index e87dc6f9..e5544fa5 100644 --- a/service/message_selector_test.go +++ b/service/message_selector_test.go @@ -718,7 +718,7 @@ func selectMsgWithAddress(ctx context.Context, } allSelectRes.ErrMsg = append(allSelectRes.ErrMsg, selectResult.ErrMsg...) - assert.NoError(t, work.saveSelectedMessages(ctx, selectResult)) + assert.NoError(t, work.saveSelectedMessages(selectResult)) } return allSelectRes From 5f810aa667594cb644783c436d60a0e0b25ccdbf Mon Sep 17 00:00:00 2001 From: simlecode <69969590+simlecode@users.noreply.github.com> Date: Fri, 29 Nov 2024 10:56:17 +0800 Subject: [PATCH 05/11] fix: save nonce failed --- service/message_selector.go | 58 +++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 31 deletions(-) diff --git a/service/message_selector.go b/service/message_selector.go index 57099bf5..cc11564d 100644 --- a/service/message_selector.go +++ b/service/message_selector.go @@ -363,7 +363,7 @@ func (w *work) selectMessage(ctx context.Context, appliedNonce *utils.NonceMap, w.log.Warnf("max message nonce in db %d", maxMsgNonce) } } - err = w.repo.AddressRepo().UpdateNonce(addrInfo.Addr, addrInfo.Nonce) + err = w.repo.AddressRepo().SaveAddress(ctx, addrInfo) if err != nil { return nil, fmt.Errorf("update nonce failed: %v", err) } @@ -499,39 +499,35 @@ func (w *work) getNonce(ctx context.Context, ts *venusTypes.TipSet, appliedNonce func (w *work) getMaxMessageNonceFromDB(addr address.Address) (uint64, error) { var maxNonce uint64 - msgs, err := w.repo.MessageRepo().ListMessageByParams(&types.MsgQueryParams{ - State: []types.MessageState{ - types.FillMsg, + queryParams := []*types.MsgQueryParams{ + { + State: []types.MessageState{ + types.FillMsg, + }, + From: []address.Address{ + addr, + }, }, - From: []address.Address{ - addr, + { + State: []types.MessageState{ + types.OnChainMsg, + }, + From: []address.Address{ + addr, + }, + Limit: 50, }, - Limit: 100, - }) - if err == nil { - for _, msg := range msgs { - if maxNonce < msg.Nonce { - maxNonce = msg.Nonce - } - } - return maxNonce, nil } - msgs, err = w.repo.MessageRepo().ListMessageByParams(&types.MsgQueryParams{ - State: []types.MessageState{ - types.OnChainMsg, - }, - From: []address.Address{ - addr, - }, - Limit: 10, - }) - if err != nil { - return 0, err - } - for _, msg := range msgs { - if maxNonce < msg.Nonce { - maxNonce = msg.Nonce + for _, param := range queryParams { + msgs, err := w.repo.MessageRepo().ListMessageByParams(param) + if err == nil && len(msgs) > 0 { + for _, msg := range msgs { + if maxNonce < msg.Nonce { + maxNonce = msg.Nonce + } + } + return maxNonce, nil } } @@ -644,7 +640,7 @@ func (w *work) saveSelectedMessages(selectResult *MsgSelectResult) error { } addrInfo := selectResult.Address - if err := txRepo.AddressRepo().UpdateNonce(addrInfo.Addr, addrInfo.Nonce); err != nil { + if err := txRepo.AddressRepo().SaveAddress(w.ctx, addrInfo); err != nil { return err } w.log.Infof("update nonce to %v", addrInfo.Nonce) From b3f1ab6eff3d645d133b7ba2d3578d6d64ad51c5 Mon Sep 17 00:00:00 2001 From: simlecode <69969590+simlecode@users.noreply.github.com> Date: Mon, 2 Dec 2024 14:18:18 +0800 Subject: [PATCH 06/11] fix: save nonce failed --- models/mysql/address.go | 7 ++++--- models/mysql/address_test.go | 2 +- models/repo/address_repo.go | 2 +- models/sqlite/address.go | 7 ++++--- models/sqlite/address_test.go | 5 +++-- service/address_service.go | 3 ++- service/message_selector.go | 5 +++-- 7 files changed, 18 insertions(+), 13 deletions(-) diff --git a/models/mysql/address.go b/models/mysql/address.go index 804d225b..43c2d9d2 100644 --- a/models/mysql/address.go +++ b/models/mysql/address.go @@ -170,9 +170,10 @@ func (s mysqlAddressRepo) DelAddress(ctx context.Context, addr address.Address) UpdateColumns(map[string]interface{}{"is_deleted": repo.Deleted, "state": types.AddressStateRemoved, "updated_at": time.Now()}).Error } -func (s mysqlAddressRepo) UpdateNonce(addr address.Address, nonce uint64) error { - return s.DB.Model(&mysqlAddress{}).Where("addr = ? and is_deleted = ?", addr.String(), repo.NotDeleted). - UpdateColumns(map[string]interface{}{"nonce": nonce, "updated_at": time.Now()}).Error +func (s mysqlAddressRepo) UpdateNonce(addr address.Address, nonce uint64) (int64, error) { + query := s.DB.Model(&mysqlAddress{}).Where("addr = ? and is_deleted = ?", addr.String(), repo.NotDeleted). + UpdateColumns(map[string]interface{}{"nonce": nonce, "updated_at": time.Now()}) + return query.RowsAffected, query.Error } func (s mysqlAddressRepo) UpdateState(ctx context.Context, addr address.Address, state types.AddressState) error { diff --git a/models/mysql/address_test.go b/models/mysql/address_test.go index 18245be5..aaa7f2b5 100644 --- a/models/mysql/address_test.go +++ b/models/mysql/address_test.go @@ -173,7 +173,7 @@ func testUpdateNonce(t *testing.T, r repo.Repo, mock sqlmock.Sqlmock) { WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() - err := r.AddressRepo().UpdateNonce(addr, nonce) + _, err := r.AddressRepo().UpdateNonce(addr, nonce) assert.NoError(t, err) } diff --git a/models/repo/address_repo.go b/models/repo/address_repo.go index 5a2914c3..d9576fa7 100644 --- a/models/repo/address_repo.go +++ b/models/repo/address_repo.go @@ -19,7 +19,7 @@ type AddressRepo interface { ListAddress(ctx context.Context) ([]*types.Address, error) ListActiveAddress(ctx context.Context) ([]*types.Address, error) DelAddress(ctx context.Context, addr address.Address) error - UpdateNonce(addr address.Address, nonce uint64) error + UpdateNonce(addr address.Address, nonce uint64) (int64, error) UpdateState(ctx context.Context, addr address.Address, state types.AddressState) error UpdateSelectMsgNum(ctx context.Context, addr address.Address, num uint64) error UpdateFeeParams(ctx context.Context, addr address.Address, gasOverEstimation, gasOverPremium float64, maxFee, gasFeeCap, baseFee big.Int) error diff --git a/models/sqlite/address.go b/models/sqlite/address.go index 0867ea7d..019d0887 100644 --- a/models/sqlite/address.go +++ b/models/sqlite/address.go @@ -164,9 +164,10 @@ func (s sqliteAddressRepo) ListActiveAddress(ctx context.Context) ([]*types.Addr return result, nil } -func (s sqliteAddressRepo) UpdateNonce(addr address.Address, nonce uint64) error { - return s.DB.Model((*sqliteAddress)(nil)).Where("addr = ? and is_deleted = -1", addr.String()). - UpdateColumns(map[string]interface{}{"nonce": nonce, "updated_at": time.Now()}).Error +func (s sqliteAddressRepo) UpdateNonce(addr address.Address, nonce uint64) (int64, error) { + query := s.DB.Model((*sqliteAddress)(nil)).Where("addr = ? and is_deleted = -1", addr.String()). + UpdateColumns(map[string]interface{}{"nonce": nonce, "updated_at": time.Now()}) + return query.RowsAffected, query.Error } func (s sqliteAddressRepo) UpdateState(ctx context.Context, addr address.Address, state types.AddressState) error { diff --git a/models/sqlite/address_test.go b/models/sqlite/address_test.go index 84e84b35..d1b4671a 100644 --- a/models/sqlite/address_test.go +++ b/models/sqlite/address_test.go @@ -116,13 +116,14 @@ func TestAddress(t *testing.T) { t.Run("UpdateNonce", func(t *testing.T) { nonce := uint64(5) - assert.NoError(t, addressRepo.UpdateNonce(addrInfo.Addr, nonce)) + _, err := addressRepo.UpdateNonce(addrInfo.Addr, nonce) + assert.NoError(t, err) r, err := addressRepo.GetAddress(ctx, addrInfo.Addr) assert.NoError(t, err) assert.Equal(t, nonce, r.Nonce) // set nonce for a not exist address - err = addressRepo.UpdateNonce(randAddr, nonce) + _, err = addressRepo.UpdateNonce(randAddr, nonce) assert.NoError(t, err) _, err = addressRepo.GetAddress(ctx, randAddr) assert.Contains(t, err.Error(), gorm.ErrRecordNotFound.Error()) diff --git a/service/address_service.go b/service/address_service.go index be870a72..59690b33 100644 --- a/service/address_service.go +++ b/service/address_service.go @@ -76,7 +76,8 @@ func (addressService *AddressService) SaveAddress(ctx context.Context, address * } func (addressService *AddressService) UpdateNonce(_ context.Context, addr address.Address, nonce uint64) error { - return addressService.repo.AddressRepo().UpdateNonce(addr, nonce) + _, err := addressService.repo.AddressRepo().UpdateNonce(addr, nonce) + return err } func (addressService *AddressService) GetAddress(ctx context.Context, addr address.Address) (*types.Address, error) { diff --git a/service/message_selector.go b/service/message_selector.go index cc11564d..6584e974 100644 --- a/service/message_selector.go +++ b/service/message_selector.go @@ -640,10 +640,11 @@ func (w *work) saveSelectedMessages(selectResult *MsgSelectResult) error { } addrInfo := selectResult.Address - if err := txRepo.AddressRepo().SaveAddress(w.ctx, addrInfo); err != nil { + row, err := txRepo.AddressRepo().UpdateNonce(addrInfo.Addr, addrInfo.Nonce) + if err != nil { return err } - w.log.Infof("update nonce to %v", addrInfo.Nonce) + w.log.Infof("update nonce to %v, row affected %v", addrInfo.Nonce, row) } for _, m := range selectResult.ErrMsg { From b778cf5ce420416fc6944b32213ea43c3e38f4e4 Mon Sep 17 00:00:00 2001 From: simlecode <69969590+simlecode@users.noreply.github.com> Date: Mon, 2 Dec 2024 15:16:40 +0800 Subject: [PATCH 07/11] chore: add network flag --- main.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/main.go b/main.go index 7e7e0e5d..89b836f3 100644 --- a/main.go +++ b/main.go @@ -13,6 +13,7 @@ import ( "github.com/ipfs-force-community/sophon-messager/publisher" "github.com/ipfs-force-community/sophon-messager/publisher/pubsub" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/venus/fixtures/networks" v1 "github.com/filecoin-project/venus/venus-shared/api/chain/v1" gatewayAPI "github.com/filecoin-project/venus/venus-shared/api/gateway/v2" @@ -123,6 +124,18 @@ var runCmd = &cli.Command{ &cli.StringFlag{ Name: "rate-limit-redis", }, + &cli.StringFlag{ + Name: "network", + Usage: "network name, eg. mainnet, calibnet", + Value: "calibnet", + Hidden: true, + }, + }, + Before: func(cctx *cli.Context) error { + if cctx.String("network") != "mainnet" { + address.CurrentNetwork = address.Testnet + } + return nil }, Action: runAction, } From ef9c24f476798b0386b2f3520187ef57f43ad69d Mon Sep 17 00:00:00 2001 From: simlecode <69969590+simlecode@users.noreply.github.com> Date: Mon, 2 Dec 2024 15:45:57 +0800 Subject: [PATCH 08/11] opt: migrate address --- main.go | 13 --------- models/modules.go | 54 +++++++++++++++++++++++++++++++++++ models/mysql/address.go | 8 +++--- models/mysql/address_test.go | 4 +-- models/repo/address_repo.go | 4 +-- models/sqlite/address.go | 8 +++--- models/sqlite/address_test.go | 6 ++-- service/address_service.go | 2 +- 8 files changed, 70 insertions(+), 29 deletions(-) diff --git a/main.go b/main.go index 89b836f3..7e7e0e5d 100644 --- a/main.go +++ b/main.go @@ -13,7 +13,6 @@ import ( "github.com/ipfs-force-community/sophon-messager/publisher" "github.com/ipfs-force-community/sophon-messager/publisher/pubsub" - "github.com/filecoin-project/go-address" "github.com/filecoin-project/venus/fixtures/networks" v1 "github.com/filecoin-project/venus/venus-shared/api/chain/v1" gatewayAPI "github.com/filecoin-project/venus/venus-shared/api/gateway/v2" @@ -124,18 +123,6 @@ var runCmd = &cli.Command{ &cli.StringFlag{ Name: "rate-limit-redis", }, - &cli.StringFlag{ - Name: "network", - Usage: "network name, eg. mainnet, calibnet", - Value: "calibnet", - Hidden: true, - }, - }, - Before: func(cctx *cli.Context) error { - if cctx.String("network") != "mainnet" { - address.CurrentNetwork = address.Testnet - } - return nil }, Action: runAction, } diff --git a/models/modules.go b/models/modules.go index 7ad4b995..d286ed19 100644 --- a/models/modules.go +++ b/models/modules.go @@ -1,15 +1,23 @@ package models import ( + "context" + "errors" "fmt" + "time" + types "github.com/filecoin-project/venus/venus-shared/types/messager" "github.com/ipfs-force-community/sophon-messager/filestore" "github.com/ipfs-force-community/sophon-messager/models/mysql" "github.com/ipfs-force-community/sophon-messager/models/repo" "github.com/ipfs-force-community/sophon-messager/models/sqlite" + logging "github.com/ipfs/go-log/v2" "go.uber.org/fx" + "gorm.io/gorm" ) +var log = logging.Logger("db") + func SetDataBase(fsRepo filestore.FSRepo) (repo.Repo, error) { switch fsRepo.Config().DB.Type { case "sqlite": @@ -22,6 +30,9 @@ func SetDataBase(fsRepo filestore.FSRepo) (repo.Repo, error) { } func AutoMigrate(repo repo.Repo) error { + if err := MigrateAddress(repo); err != nil { + return fmt.Errorf("migrate address: %w", err) + } return repo.AutoMigrate() } @@ -34,3 +45,46 @@ func Options() fx.Option { fx.Provide(repo.NewINodeProvider), ) } + +func MigrateAddress(r repo.Repo) error { + list, err := r.AddressRepo().ListAddress(context.Background()) + if err != nil { + return err + } + + return r.Transaction(func(txRepo repo.TxRepo) error { + for _, addrInfo := range list { + fAddr := addrInfo.Addr.String() + _, err := txRepo.AddressRepo().GetOneRecord(context.Background(), fAddr) + if err == nil { + continue + } + if !errors.Is(err, gorm.ErrRecordNotFound) { + return err + } + tAddr := "t" + fAddr[1:] + + log.Infof("migrate address %s to %s", tAddr, fAddr) + now := time.Now() + newAddrInfo := &types.Address{ + Addr: addrInfo.Addr, + Nonce: addrInfo.Nonce, + SelMsgNum: addrInfo.SelMsgNum, + State: addrInfo.State, + IsDeleted: repo.NotDeleted, + FeeSpec: addrInfo.FeeSpec, + CreatedAt: now, + UpdatedAt: now, + } + if err := txRepo.AddressRepo().SaveAddress(context.Background(), newAddrInfo); err != nil { + return err + } + log.Infof("migrate address %s to %s success", tAddr, fAddr) + if err := txRepo.AddressRepo().DelAddress(context.Background(), tAddr); err != nil { + return err + } + log.Infof("delete address %s success", tAddr) + } + return nil + }) +} diff --git a/models/mysql/address.go b/models/mysql/address.go index 43c2d9d2..b31f40fd 100644 --- a/models/mysql/address.go +++ b/models/mysql/address.go @@ -111,9 +111,9 @@ func (s mysqlAddressRepo) GetAddressByID(ctx context.Context, id shared.UUID) (* return a.Address() } -func (s mysqlAddressRepo) GetOneRecord(ctx context.Context, addr address.Address) (*types.Address, error) { +func (s mysqlAddressRepo) GetOneRecord(ctx context.Context, addr string) (*types.Address, error) { var a mysqlAddress - if err := s.DB.WithContext(ctx).Take(&a, "addr = ?", addr.String()).Error; err != nil { + if err := s.DB.WithContext(ctx).Take(&a, "addr = ?", addr).Error; err != nil { return nil, err } @@ -165,8 +165,8 @@ func (s mysqlAddressRepo) ListActiveAddress(ctx context.Context) ([]*types.Addre return result, nil } -func (s mysqlAddressRepo) DelAddress(ctx context.Context, addr address.Address) error { - return s.DB.WithContext(ctx).Model((*mysqlAddress)(nil)).Where("addr = ? and is_deleted = ?", addr.String(), repo.NotDeleted). +func (s mysqlAddressRepo) DelAddress(ctx context.Context, addr string) error { + return s.DB.WithContext(ctx).Model((*mysqlAddress)(nil)).Where("addr = ? and is_deleted = ?", addr, repo.NotDeleted). UpdateColumns(map[string]interface{}{"is_deleted": repo.Deleted, "state": types.AddressStateRemoved, "updated_at": time.Now()}).Error } diff --git a/models/mysql/address_test.go b/models/mysql/address_test.go index aaa7f2b5..2b0a12c4 100644 --- a/models/mysql/address_test.go +++ b/models/mysql/address_test.go @@ -102,7 +102,7 @@ func testGetOneRecord(t *testing.T, r repo.Repo, mock sqlmock.Sqlmock) { WithArgs(addr.String()). WillReturnRows(sqlmock.NewRows([]string{"addr"}).AddRow(addr.String())) - res, err := r.AddressRepo().GetOneRecord(ctx, addr) + res, err := r.AddressRepo().GetOneRecord(ctx, addr.String()) assert.NoError(t, err) assert.Equal(t, addr, res.Addr) } @@ -158,7 +158,7 @@ func testDelAddress(t *testing.T, r repo.Repo, mock sqlmock.Sqlmock) { WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() - err := r.AddressRepo().DelAddress(ctx, addr) + err := r.AddressRepo().DelAddress(ctx, addr.String()) assert.NoError(t, err) } diff --git a/models/repo/address_repo.go b/models/repo/address_repo.go index d9576fa7..5b915b30 100644 --- a/models/repo/address_repo.go +++ b/models/repo/address_repo.go @@ -14,11 +14,11 @@ type AddressRepo interface { SaveAddress(ctx context.Context, address *types.Address) error GetAddress(ctx context.Context, addr address.Address) (*types.Address, error) GetAddressByID(ctx context.Context, id shared.UUID) (*types.Address, error) - GetOneRecord(ctx context.Context, addr address.Address) (*types.Address, error) + GetOneRecord(ctx context.Context, addr string) (*types.Address, error) HasAddress(ctx context.Context, addr address.Address) (bool, error) ListAddress(ctx context.Context) ([]*types.Address, error) ListActiveAddress(ctx context.Context) ([]*types.Address, error) - DelAddress(ctx context.Context, addr address.Address) error + DelAddress(ctx context.Context, addr string) error UpdateNonce(addr address.Address, nonce uint64) (int64, error) UpdateState(ctx context.Context, addr address.Address, state types.AddressState) error UpdateSelectMsgNum(ctx context.Context, addr address.Address, num uint64) error diff --git a/models/sqlite/address.go b/models/sqlite/address.go index 019d0887..483c1d57 100644 --- a/models/sqlite/address.go +++ b/models/sqlite/address.go @@ -110,9 +110,9 @@ func (s sqliteAddressRepo) GetAddressByID(ctx context.Context, id shared.UUID) ( return a.Address() } -func (s sqliteAddressRepo) GetOneRecord(ctx context.Context, addr address.Address) (*types.Address, error) { +func (s sqliteAddressRepo) GetOneRecord(ctx context.Context, addr string) (*types.Address, error) { var a sqliteAddress - if err := s.DB.WithContext(ctx).Take(&a, "addr = ?", addr.String()).Error; err != nil { + if err := s.DB.WithContext(ctx).Take(&a, "addr = ?", addr).Error; err != nil { return nil, err } @@ -206,8 +206,8 @@ func (s sqliteAddressRepo) UpdateFeeParams(ctx context.Context, addr address.Add return s.DB.WithContext(ctx).Model((*sqliteAddress)(nil)).Where("addr = ? and is_deleted = -1", addr.String()).UpdateColumns(updateColumns).Error } -func (s sqliteAddressRepo) DelAddress(ctx context.Context, addr address.Address) error { - return s.DB.WithContext(ctx).Model((*sqliteAddress)(nil)).Where("addr = ? and is_deleted = -1", addr.String()). +func (s sqliteAddressRepo) DelAddress(ctx context.Context, addr string) error { + return s.DB.WithContext(ctx).Model((*sqliteAddress)(nil)).Where("addr = ? and is_deleted = -1", addr). UpdateColumns(map[string]interface{}{"is_deleted": repo.Deleted, "state": types.AddressStateRemoved, "updated_at": time.Now()}).Error } diff --git a/models/sqlite/address_test.go b/models/sqlite/address_test.go index d1b4671a..e2794c92 100644 --- a/models/sqlite/address_test.go +++ b/models/sqlite/address_test.go @@ -192,19 +192,19 @@ func TestAddress(t *testing.T) { }) t.Run("DelAddress", func(t *testing.T) { - assert.NoError(t, addressRepo.DelAddress(ctx, addrInfo2.Addr)) + assert.NoError(t, addressRepo.DelAddress(ctx, addrInfo2.Addr.String())) r, err := addressRepo.GetAddress(ctx, addrInfo2.Addr) assert.Error(t, err) assert.Nil(t, r) - r, err = addressRepo.GetOneRecord(ctx, addrInfo2.Addr) + r, err = addressRepo.GetOneRecord(ctx, addrInfo2.Addr.String()) assert.NoError(t, err) assert.Equal(t, types.AddressStateRemoved, r.State) assert.Equal(t, repo.Deleted, r.IsDeleted) // delete a not exist address - err = addressRepo.DelAddress(ctx, randAddr) + err = addressRepo.DelAddress(ctx, randAddr.String()) assert.NoError(t, err) _, err = addressRepo.GetAddress(ctx, randAddr) assert.Contains(t, err.Error(), gorm.ErrRecordNotFound.Error()) diff --git a/service/address_service.go b/service/address_service.go index 59690b33..7b825ee3 100644 --- a/service/address_service.go +++ b/service/address_service.go @@ -121,7 +121,7 @@ func (addressService *AddressService) ListActiveAddress(ctx context.Context) ([] } func (addressService *AddressService) DeleteAddress(ctx context.Context, addr address.Address) error { - return addressService.repo.AddressRepo().DelAddress(ctx, addr) + return addressService.repo.AddressRepo().DelAddress(ctx, addr.String()) } func (addressService *AddressService) ForbiddenAddress(ctx context.Context, addr address.Address) error { From 7c8d65ef4e59a437b1242088ccd87ae0f0034996 Mon Sep 17 00:00:00 2001 From: simlecode <69969590+simlecode@users.noreply.github.com> Date: Mon, 2 Dec 2024 15:54:11 +0800 Subject: [PATCH 09/11] chore: fix test --- models/mysql/address.go | 3 +-- models/mysql/address_test.go | 4 ++-- models/sqlite/address.go | 3 +-- models/sqlite/address_test.go | 6 ++---- 4 files changed, 6 insertions(+), 10 deletions(-) diff --git a/models/mysql/address.go b/models/mysql/address.go index b31f40fd..f6d833ee 100644 --- a/models/mysql/address.go +++ b/models/mysql/address.go @@ -166,8 +166,7 @@ func (s mysqlAddressRepo) ListActiveAddress(ctx context.Context) ([]*types.Addre } func (s mysqlAddressRepo) DelAddress(ctx context.Context, addr string) error { - return s.DB.WithContext(ctx).Model((*mysqlAddress)(nil)).Where("addr = ? and is_deleted = ?", addr, repo.NotDeleted). - UpdateColumns(map[string]interface{}{"is_deleted": repo.Deleted, "state": types.AddressStateRemoved, "updated_at": time.Now()}).Error + return s.DB.WithContext(ctx).Where("addr = ?", addr).Delete(&mysqlAddress{}).Error } func (s mysqlAddressRepo) UpdateNonce(addr address.Address, nonce uint64) (int64, error) { diff --git a/models/mysql/address_test.go b/models/mysql/address_test.go index 2b0a12c4..dd4e6cb9 100644 --- a/models/mysql/address_test.go +++ b/models/mysql/address_test.go @@ -153,8 +153,8 @@ func testDelAddress(t *testing.T, r repo.Repo, mock sqlmock.Sqlmock) { mock.ExpectBegin() mock.ExpectExec(regexp.QuoteMeta( - "UPDATE `addresses` SET `is_deleted`=?,`state`=?,`updated_at`=? WHERE addr = ? and is_deleted = ?")). - WithArgs(repo.Deleted, types.AddressStateRemoved, anyTime{}, addr.String(), repo.NotDeleted). + "DELETE FROM `addresses` WHERE addr = ?")). + WithArgs(addr.String()). WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() diff --git a/models/sqlite/address.go b/models/sqlite/address.go index 483c1d57..94223faf 100644 --- a/models/sqlite/address.go +++ b/models/sqlite/address.go @@ -207,8 +207,7 @@ func (s sqliteAddressRepo) UpdateFeeParams(ctx context.Context, addr address.Add } func (s sqliteAddressRepo) DelAddress(ctx context.Context, addr string) error { - return s.DB.WithContext(ctx).Model((*sqliteAddress)(nil)).Where("addr = ? and is_deleted = -1", addr). - UpdateColumns(map[string]interface{}{"is_deleted": repo.Deleted, "state": types.AddressStateRemoved, "updated_at": time.Now()}).Error + return s.DB.WithContext(ctx).Where("addr = ?", addr).Delete(&sqliteAddress{}).Error } var _ repo.AddressRepo = &sqliteAddressRepo{} diff --git a/models/sqlite/address_test.go b/models/sqlite/address_test.go index e2794c92..1629bacc 100644 --- a/models/sqlite/address_test.go +++ b/models/sqlite/address_test.go @@ -14,7 +14,6 @@ import ( "github.com/stretchr/testify/assert" "gorm.io/gorm" - "github.com/ipfs-force-community/sophon-messager/models/repo" "github.com/ipfs-force-community/sophon-messager/testhelper" ) @@ -199,9 +198,8 @@ func TestAddress(t *testing.T) { assert.Nil(t, r) r, err = addressRepo.GetOneRecord(ctx, addrInfo2.Addr.String()) - assert.NoError(t, err) - assert.Equal(t, types.AddressStateRemoved, r.State) - assert.Equal(t, repo.Deleted, r.IsDeleted) + assert.Error(t, err) + assert.Nil(t, r) // delete a not exist address err = addressRepo.DelAddress(ctx, randAddr.String()) From 3f05a33cf51abe9a8bc0d647d03b5852b26f0722 Mon Sep 17 00:00:00 2001 From: simlecode <69969590+simlecode@users.noreply.github.com> Date: Mon, 2 Dec 2024 16:20:01 +0800 Subject: [PATCH 10/11] fix: address id is empty --- models/modules.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/models/modules.go b/models/modules.go index d286ed19..90cc00fa 100644 --- a/models/modules.go +++ b/models/modules.go @@ -6,6 +6,7 @@ import ( "fmt" "time" + shared "github.com/filecoin-project/venus/venus-shared/types" types "github.com/filecoin-project/venus/venus-shared/types/messager" "github.com/ipfs-force-community/sophon-messager/filestore" "github.com/ipfs-force-community/sophon-messager/models/mysql" @@ -30,10 +31,10 @@ func SetDataBase(fsRepo filestore.FSRepo) (repo.Repo, error) { } func AutoMigrate(repo repo.Repo) error { - if err := MigrateAddress(repo); err != nil { - return fmt.Errorf("migrate address: %w", err) + if err := repo.AutoMigrate(); err != nil { + return fmt.Errorf("migrate: %w", err) } - return repo.AutoMigrate() + return MigrateAddress(repo) } func Options() fx.Option { @@ -67,6 +68,7 @@ func MigrateAddress(r repo.Repo) error { log.Infof("migrate address %s to %s", tAddr, fAddr) now := time.Now() newAddrInfo := &types.Address{ + ID: shared.NewUUID(), Addr: addrInfo.Addr, Nonce: addrInfo.Nonce, SelMsgNum: addrInfo.SelMsgNum, From 00a1103aa20dccebf5ce76e10a395893d1c7243f Mon Sep 17 00:00:00 2001 From: simlecode <69969590+simlecode@users.noreply.github.com> Date: Mon, 2 Dec 2024 17:08:28 +0800 Subject: [PATCH 11/11] fix: delete old address --- models/modules.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/models/modules.go b/models/modules.go index 90cc00fa..5fefabc9 100644 --- a/models/modules.go +++ b/models/modules.go @@ -48,22 +48,30 @@ func Options() fx.Option { } func MigrateAddress(r repo.Repo) error { - list, err := r.AddressRepo().ListAddress(context.Background()) + ctx := context.Background() + list, err := r.AddressRepo().ListAddress(ctx) if err != nil { return err } return r.Transaction(func(txRepo repo.TxRepo) error { + for _, addrInfo := range list { fAddr := addrInfo.Addr.String() - _, err := txRepo.AddressRepo().GetOneRecord(context.Background(), fAddr) + tAddr := "t" + fAddr[1:] + _, err := txRepo.AddressRepo().GetOneRecord(ctx, fAddr) if err == nil { + if _, err := txRepo.AddressRepo().GetOneRecord(ctx, tAddr); err == nil { + if err := txRepo.AddressRepo().DelAddress(ctx, tAddr); err != nil { + return err + } + log.Infof("delete address %s success", tAddr) + } continue } if !errors.Is(err, gorm.ErrRecordNotFound) { return err } - tAddr := "t" + fAddr[1:] log.Infof("migrate address %s to %s", tAddr, fAddr) now := time.Now()