Skip to content

Commit

Permalink
fix(message-watcher): milestone timestamp to change after new receive…
Browse files Browse the repository at this point in the history
…d messages (#306)

Signed-off-by: failfmi <oscurocalma@gmail.com>
  • Loading branch information
failfmi authored Oct 7, 2021
1 parent c3918a6 commit b0b89fe
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 80 deletions.
33 changes: 13 additions & 20 deletions app/process/watcher/message/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,33 +111,26 @@ func (cmw Watcher) beginWatching(q qi.Queue) {
cmw.logger.Infof("Watching for Messages after Timestamp [%s]", timestamp.ToHumanReadable(milestoneTimestamp))

for {
err := cmw.getAndProcessMessages(q, milestoneTimestamp)
messages, err := cmw.client.GetMessagesAfterTimestamp(cmw.topicID, milestoneTimestamp)
if err != nil {
cmw.logger.Errorf("Error while retrieving messages from mirror node. Error [%s]", err)
go cmw.beginWatching(q)
return
}
time.Sleep(cmw.pollingInterval * time.Second)
}
}

func (cmw Watcher) getAndProcessMessages(q qi.Queue, milestoneTimestamp int64) error {
messages, err := cmw.client.GetMessagesAfterTimestamp(cmw.topicID, milestoneTimestamp)
if err != nil {
cmw.logger.Errorf("Error while retrieving messages from mirror node. Error [%s]", err)
return err
}
cmw.logger.Tracef("Polling found [%d] Messages", len(messages))

cmw.logger.Tracef("Polling found [%d] Messages", len(messages))

for _, msg := range messages {
milestoneTimestamp, err = timestamp.FromString(msg.ConsensusTimestamp)
if err != nil {
cmw.logger.Errorf("Unable to parse latest message timestamp. Error - [%s].", err)
continue
for _, msg := range messages {
milestoneTimestamp, err = timestamp.FromString(msg.ConsensusTimestamp)
if err != nil {
cmw.logger.Errorf("Unable to parse latest message timestamp. Error - [%s].", err)
continue
}
cmw.processMessage(msg, q)
cmw.updateStatusTimestamp(milestoneTimestamp)
}
cmw.processMessage(msg, q)
cmw.updateStatusTimestamp(milestoneTimestamp)
time.Sleep(cmw.pollingInterval * time.Second)
}
return nil
}

func (cmw Watcher) processMessage(topicMsg model.Message, q qi.Queue) {
Expand Down
109 changes: 49 additions & 60 deletions app/process/watcher/message/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,16 @@
package message

import (
"encoding/base64"
"errors"
"fmt"
"github.com/golang/protobuf/proto"
"github.com/hashgraph/hedera-sdk-go/v2"
"github.com/limechain/hedera-eth-bridge-validator/app/clients/hedera/mirror-node/model"
"github.com/limechain/hedera-eth-bridge-validator/app/core/queue"
"github.com/limechain/hedera-eth-bridge-validator/app/helper/timestamp"
"github.com/limechain/hedera-eth-bridge-validator/app/model/message"
"github.com/limechain/hedera-eth-bridge-validator/config"
"github.com/limechain/hedera-eth-bridge-validator/constants"
p "github.com/limechain/hedera-eth-bridge-validator/proto"
"github.com/limechain/hedera-eth-bridge-validator/test/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"gorm.io/gorm"
"testing"
Expand All @@ -40,6 +39,8 @@ var (
Realm: 0,
Topic: 1,
}
consensusTimestamp = "1633633534.108746000"
milestoneTimestamp, _ = timestamp.FromString(consensusTimestamp)
)

func Test_UpdateStatusTimestamp(t *testing.T) {
Expand All @@ -48,65 +49,12 @@ func Test_UpdateStatusTimestamp(t *testing.T) {
w.updateStatusTimestamp(1)
}

func Test_GetAndProcessMessages_Fails(t *testing.T) {
setup()
mocks.MStatusRepository.On("Get", topicID.String()).Return(int64(1), nil)
mocks.MHederaMirrorClient.On("GetMessagesAfterTimestamp", topicID, int64(1)).Return([]model.Message{}, errors.New("some-error"))
err := w.getAndProcessMessages(mocks.MQueue, 1)
assert.Equal(t, errors.New("some-error"), err)
}

func Test_ProcessMessage_FromString_Fails(t *testing.T) {
setup()
w.processMessage(model.Message{Contents: "invalid-data"}, mocks.MQueue)
mocks.MQueue.AssertNotCalled(t, "Push", mock.Anything)
}

func Test_GetAndProcessMessages_HappyPath(t *testing.T) {
setup()
actualMessage := &p.TopicEthSignatureMessage{
SourceChainId: 0,
TargetChainId: 1,
TransferID: "some-id",
Asset: constants.Hbar,
Recipient: "0xsomeethereumaddress",
Amount: "100",
TransactionTimestamp: 10000000000000,
}
bytes, e := proto.Marshal(actualMessage)
if e != nil {
t.Fatal(e)
}
contents := base64.StdEncoding.EncodeToString(bytes)
m := model.Message{
ConsensusTimestamp: "10000.0",
TopicId: topicID.String(),
Contents: contents,
}
mocks.MStatusRepository.On("Update", topicID.String(), int64(10000000000000)).Return(nil)
mocks.MStatusRepository.On("Get", topicID.String()).Return(int64(1), nil)
mocks.MHederaMirrorClient.On("GetMessagesAfterTimestamp", topicID, int64(1)).Return([]model.Message{m}, nil)
mocks.MQueue.On("Push", mock.Anything)
err := w.getAndProcessMessages(mocks.MQueue, 1)
assert.Nil(t, err)
mocks.MQueue.AssertCalled(t, "Push", mock.Anything)
}

func Test_GetAndProcessMessages_InvalidConsensusTimestamp(t *testing.T) {
setup()
m := model.Message{
ConsensusTimestamp: "aaaa",
TopicId: topicID.String(),
Contents: "some-content-here",
}
mocks.MStatusRepository.On("Get", topicID.String()).Return(int64(1), nil)
mocks.MHederaMirrorClient.On("GetMessagesAfterTimestamp", topicID, int64(1)).Return([]model.Message{m}, nil)
err := w.getAndProcessMessages(mocks.MQueue, 1)
assert.Nil(t, err)
mocks.MQueue.AssertNotCalled(t, "Push", mock.Anything, mock.Anything)
mocks.MStatusRepository.AssertNotCalled(t, "Update", mock.Anything, mock.Anything)
}

func Test_NewWatcher(t *testing.T) {
mocks.Setup()
mocks.MStatusRepository.On("Get", topicID.String()).Return(int64(0), nil)
Expand All @@ -122,9 +70,50 @@ func Test_NewWatcher_Get_Error(t *testing.T) {

func Test_NewWatcher_WithTS(t *testing.T) {
mocks.Setup()
mocks.MStatusRepository.On("Get", topicID.String()).Return(int64(1), nil)
mocks.MStatusRepository.On("Update", topicID.String(), int64(1)).Return(nil)
NewWatcher(mocks.MHederaMirrorClient, "0.0.1", mocks.MStatusRepository, 1, 1)
mocks.MStatusRepository.On("Get", topicID.String()).Return(int64(6), nil)
mocks.MStatusRepository.On("Update", topicID.String(), int64(6)).Return(nil)
NewWatcher(mocks.MHederaMirrorClient, "0.0.1", mocks.MStatusRepository, 1, 6)
}

func Test_BeginWatch_FailsMessagesRetrieval(t *testing.T) {
setup()
mocks.MStatusRepository.On("Get", topicID.String()).Return(int64(5), nil)
mocks.MHederaMirrorClient.On("GetMessagesAfterTimestamp", topicID, int64(5)).Return([]model.Message{}, errors.New("some-error"))
w.beginWatching(mocks.MQueue)

mocks.MQueue.AssertNotCalled(t, "Push", mock.Anything)
mocks.MStatusRepository.AssertNotCalled(t, "Update", mock.Anything)
}

func Test_BeginWatch_SuccessfulExecution(t *testing.T) {
m := model.Message{
ConsensusTimestamp: consensusTimestamp,
TopicId: "0.0.4321",
Contents: "EIHxBBodMC4wLjE4OTMtMTYzMTI2MDg5MC05NDgyMDg5NDkiKjB4MDg3MkI5RjY1OUYwYjQ" +
"xNGU1M2ZEYWIyQjY2OThDMzRCYWMxY0I5MCoqMHgwZjJGNjYyM2FDNGI5NGUxZDYxQjRDZD" +
"E5NUE2YzI4OTkyMzEwRjk2Mgk5MDAwMDAwMDE6ggE0YThiZmNhMmY2MGVkN2M5NDkwZDBhZ" +
"DNiZWNmODk2YmVjMGYxYmYxZmFiOTlhNWQwMmY4ZjZiYzU1NWZmNTA2NzdiOWRkMWJmOTg4" +
"OGIxMzZhYjhlMzMzMjE0NjJjMGRkZWNiNWQ5NzE3YTY1OGQxYjYyZTliYTkyY2Q4OTlmYjFj",
RunningHash: "0xff3",
}
payload, _ := message.FromString(m.Contents, m.ConsensusTimestamp)
queueMessage := &queue.Message{
Payload: payload,
Topic: constants.TopicMessageValidation,
}

setup()
mocks.MStatusRepository.On("Get", topicID.String()).Return(int64(2), nil).Once()
mocks.MStatusRepository.On("Get", topicID.String()).Return(milestoneTimestamp, nil)
mocks.MHederaMirrorClient.On("GetMessagesAfterTimestamp", topicID, int64(2)).Return([]model.Message{m}, nil).Once()
mocks.MHederaMirrorClient.On("GetMessagesAfterTimestamp", topicID, milestoneTimestamp).Return([]model.Message{}, errors.New("some-error"))
mocks.MQueue.On("Push", queueMessage)
mocks.MStatusRepository.On("Update", topicID.String(), milestoneTimestamp).Return(nil)

w.beginWatching(mocks.MQueue)

mocks.MQueue.AssertCalled(t, "Push", queueMessage)
mocks.MStatusRepository.AssertCalled(t, "Update", topicID.String(), milestoneTimestamp)
}

func setup() {
Expand Down

0 comments on commit b0b89fe

Please sign in to comment.