Skip to content

Commit

Permalink
fix proposing block to BDLS and enableing TPS cal in both protocols
Browse files Browse the repository at this point in the history
Signed-off-by: Ahmed Al-Salih <a.alsalih2@gmail.com>
  • Loading branch information
ahmed82 committed Nov 30, 2023
1 parent 03ff847 commit b679a3f
Show file tree
Hide file tree
Showing 9 changed files with 249 additions and 111 deletions.
16 changes: 16 additions & 0 deletions orderer/common/multichannel/blockwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ SPDX-License-Identifier: Apache-2.0
package multichannel

import (
"math"
"sync"
"time"

"github.com/golang/protobuf/proto"
cb "github.com/hyperledger/fabric-protos-go/common"
Expand Down Expand Up @@ -201,6 +203,20 @@ func (bw *BlockWriter) commitBlock(encodedMetadataValue []byte) {
logger.Panicf("[channel: %s] Could not append block: %s", bw.support.ChannelID(), err)
}
logger.Debugf("[channel: %s] Wrote block [%d]", bw.support.ChannelID(), bw.lastBlock.GetHeader().Number)
SetTPSEndTime()
total := endTime.Sub(startTime)
logger.Infof(" **************************************** The Total time is %v , The TPS value is %v", total, float64(10000*math.Pow(10, 9))/float64(total))
}

var startTime time.Time
var endTime time.Time

func SetTPSStart() {
startTime = time.Now()
}

func SetTPSEndTime() {
endTime = time.Now()
}

func (bw *BlockWriter) addBlockSignature(block *cb.Block, consenterMetadata []byte) {
Expand Down
25 changes: 13 additions & 12 deletions orderer/common/multichannel/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import (
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
"github.com/hyperledger/fabric/orderer/common/types"
"github.com/hyperledger/fabric/orderer/consensus"
"github.com/hyperledger/fabric/orderer/consensus/etcdraft"

//"github.com/hyperledger/fabric/orderer/consensus/etcdraft"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -443,19 +444,19 @@ func (r *Registrar) newChain(configtx *cb.Envelope) {
r.lock.Lock()
defer r.lock.Unlock()

channelName, err := channelNameFromConfigTx(configtx)
if err != nil {
logger.Warnf("Failed extracting channel name: %v", err)
return
}

// fixes https://github.com/hyperledger/fabric/issues/2931
if existingChain, exists := r.chains[channelName]; exists {
if _, isRaftChain := existingChain.Chain.(*etcdraft.Chain); isRaftChain {
logger.Infof("Channel %s already created, skipping its creation", channelName)
/* channelName, err := channelNameFromConfigTx(configtx)
if err != nil {
logger.Warnf("Failed extracting channel name: %v", err)
return
}
}
// fixes https://github.com/hyperledger/fabric/issues/2931
if existingChain, exists := r.chains[channelName]; exists {
if _, isRaftChain := existingChain.Chain.(*etcdraft.Chain); isRaftChain {
logger.Infof("Channel %s already created, skipping its creation", channelName)
return
}
}*/

cs := r.createNewChain(configtx)
cs.start()
Expand Down
125 changes: 49 additions & 76 deletions orderer/consensus/bdls/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ type Chain struct {
bdlsId uint64
Channel string

ActiveNodes atomic.Value

//agent *agent

//BDLS
Expand Down Expand Up @@ -174,20 +176,21 @@ type Options struct {

// Order accepts a message which has been processed at a given configSeq.
func (c *Chain) Order(env *common.Envelope, configSeq uint64) error {

c.Metrics.NormalProposalsReceived.Add(1)
seq := c.support.Sequence()
if configSeq < seq {
c.Logger.Warnf("Normal message was validated against %d, although current config seq has advanced (%d)", configSeq, seq)
if _, err := c.support.ProcessNormalMsg(env); err != nil {
// No need to ProcessNormalMsg. this process must be in Ordered func
/*if _, err := c.support.ProcessNormalMsg(env); err != nil {
return errors.Errorf("bad normal message: %s", err)
}
}*/
}

return c.submit(env, configSeq)
}

// Configure accepts a message which reconfigures the channel
func (c *Chain) Configure(env *common.Envelope, configSeq uint64) error {
c.Metrics.ConfigProposalsReceived.Add(1)
seq := c.support.Sequence()
if configSeq < seq {
c.Logger.Warnf("Normal message was validated against %d, although current config seq has advanced (%d)", configSeq, seq)
Expand Down Expand Up @@ -269,70 +272,57 @@ func NewChain(
}

c := &Chain{
//RuntimeConfig: &atomic.Value{},
Channel: support.ChannelID(),
//Config: config,
lastBlock: b,
WALDir: walDir,
Comm: comm,
support: support,

SignerSerializer: signerSerializer,
Channel: support.ChannelID(),
lastBlock: b,
WALDir: walDir,
Comm: comm,
support: support,
PolicyManager: policyManager,
BlockPuller: blockPuller,
Logger: logger,
opts: opts,
bdlsId: selfID,

applyC: make(chan apply),
submitC: make(chan *submit),
haltC: make(chan struct{}),
doneC: make(chan struct{}),
startC: make(chan struct{}),
errorC: make(chan struct{}),

applyC: make(chan apply),
submitC: make(chan *submit),
haltC: make(chan struct{}),
doneC: make(chan struct{}),
startC: make(chan struct{}),
errorC: make(chan struct{}),
//RuntimeConfig: &atomic.Value{},
//Config: config,
clock: opts.Clock,
consensusRelation: types2.ConsensusRelationConsenter,
status: types2.StatusActive,

Metrics: &Metrics{
ClusterSize: metrics.ClusterSize.With("channel", support.ChannelID()),
CommittedBlockNumber: metrics.CommittedBlockNumber.With("channel", support.ChannelID()),
IsLeader: metrics.IsLeader.With("channel", support.ChannelID()),
LeaderID: metrics.LeaderID.With("channel", support.ChannelID()),
ClusterSize: metrics.ClusterSize.With("channel", support.ChannelID()),
CommittedBlockNumber: metrics.CommittedBlockNumber.With("channel", support.ChannelID()),
ActiveNodes: metrics.ActiveNodes.With("channel", support.ChannelID()),
IsLeader: metrics.IsLeader.With("channel", support.ChannelID()),
LeaderID: metrics.LeaderID.With("channel", support.ChannelID()),
NormalProposalsReceived: metrics.NormalProposalsReceived.With("channel", support.ChannelID()),
ConfigProposalsReceived: metrics.ConfigProposalsReceived.With("channel", support.ChannelID()),
},
bccsp: bccsp,

chConsensusMessages: make(chan struct{}, 1),
}

// Sets initial values for metrics
c.Metrics.ClusterSize.Set(float64(len(c.opts.Consenters)))
c.Metrics.IsLeader.Set(float64(0)) // all nodes start out as followers
c.Metrics.ActiveNodes.Set(float64(0))
c.Metrics.CommittedBlockNumber.Set(float64(c.lastBlock.Header.Number))

/*
lastBlock := LastBlockFromLedgerOrPanic(support, c.Logger)
lastConfigBlock := LastConfigBlockFromLedgerOrPanic(support, c.Logger)
rtc := RuntimeConfig{
logger: logger,
id: selfID,
}*/
/* rtc, err := rtc.BlockCommitted(lastConfigBlock, bccsp)
if err != nil {
return nil, errors.Wrap(err, "failed constructing RuntimeConfig")
}
rtc, err = rtc.BlockCommitted(lastBlock, bccsp)
if err != nil {
return nil, errors.Wrap(err, "failed constructing RuntimeConfig")
}
c.RuntimeConfig.Store(rtc)
*/
//c.verifier = buildVerifier(cv, c.RuntimeConfig, support, requestInspector, policyManager)
//c.consensus = bftSmartConsensusBuild(c, requestInspector)

// Setup communication with list of remotes notes for the new channel

//if err != nil {
// return nil, errors.WithStack(err)
//}

/*privateKey, err := ecdsa.GenerateKey(S256Curve, rand.Reader)
if err != nil {
c.Logger.Warnf("error generating privateKey value:", err)
Expand Down Expand Up @@ -369,9 +359,7 @@ func NewChain(
priv.D = i
priv.PublicKey.X, priv.PublicKey.Y = bdls.S256Curve.ScalarBaseMult(priv.D.Bytes())
// myself
logger.Info("XXXXXXX ", c.bdlsId, k, " XXXXXXXX ")
if int(c.bdlsId) == k+1 {
logger.Info("----- T r u e -----")
config.PrivateKey = priv
}

Expand All @@ -381,32 +369,12 @@ func NewChain(

c.config = config

/*if err := c.startConsensus(config); err != nil {
logger.Error(err)
}*/

// create the consensus object
/*consensus, err := bdls.NewConsensus(config)
if err != nil {
logger.Warnf(" ^^^^^^^ Cannot init BDLS instincas: %v", err)
return nil, errors.WithStack(err)
}*/

//c.consensus = consensus

//c.opts.Consenters
nodes, err := c.remotePeers()

if err != nil {
return nil, errors.WithStack(err)
}
c.Comm.Configure(c.support.ChannelID(), nodes)

/* if err := c.consensus.ValidateConfiguration(rtc.Nodes); err != nil {
return nil, errors.Wrap(err, "failed to verify SmartBFT-Go configuration")
}
*/
logger.Infof("BDLS is now servicing chain %s", support.ChannelID())

return c, nil
Expand Down Expand Up @@ -568,7 +536,7 @@ func (c *Chain) writeBlock(block *common.Block, index uint64) {
}

if c.blockInflight > 0 {
c.blockInflight-- // only reduce on leader
c.blockInflight-- // Reduce on All Orderer
}
c.lastBlock = block

Expand Down Expand Up @@ -711,21 +679,21 @@ func (c *Chain) startConsensus(config *bdls.Config) error {
}(peers[k])
}

//go c.TestMultiClients()
go c.TestMultiClients()

c.transportLayer = transportLayer
//updateTick := time.NewTicker(updatePeriod)
updateTick := time.NewTicker(updatePeriod)

for {
//<-updateTick.C
//c.transportLayer.Update()
<-updateTick.C
c.transportLayer.Update()
// Check for confirmed new block
height /*round*/, _, state := c.transportLayer.GetLatestState()
if height > c.lastBlock.Header.Number {
c.applyC <- apply{state}
}
}
//defer c.testOrder()

/*
// c.Order(env, 0)
var bc *blockCreator
Expand Down Expand Up @@ -893,18 +861,19 @@ func (c *Chain) run() {
<-updateTick.C
c.transportLayer.Update()
// Check for confirmed new block
height , _, state := c.transportLayer.GetLatestState()
height, _, state := c.transportLayer.GetLatestState()
if height > c.lastBlock.Header.Number {
c.applyC <- apply{state}
}
}
}()*/
for {
select {
case <-updateTick.C:
//case <-updateTick.C:

// Calling BDLS consensus Update() function
c.transportLayer.Update()
// Calling BDLS consensus Update() function
// c.transportLayer.Update() Due to new bug

// check if new block confirmed
/*height, round, state := c.transportLayer.GetLatestState()
Expand All @@ -927,6 +896,7 @@ func (c *Chain) run() {
// polled by `WaitReady`
continue
}
// Direct Ordered for the Payload
//batches, pending := c.support.BlockCutter().Ordered(s.req.Payload)

batches, pending, err := c.ordered(s.req)
Expand All @@ -952,6 +922,8 @@ func (c *Chain) run() {
}

c.propose(ch, bc, batches...)

// The code block bellow direct cut the block and propse to the consensus.
/*
batch := c.support.BlockCutter().Cut()
Expand Down Expand Up @@ -985,6 +957,7 @@ func (c *Chain) run() {
} else if c.blockInflight < c.opts.MaxInflightBlocks {
submitC = c.submitC
}
//The code section is direct writeBlock
/*//if app.state != nil {
newBlock, err := protoutil.UnmarshalBlock(app.state)
if err != nil {
Expand Down
11 changes: 4 additions & 7 deletions orderer/consensus/bdls/consenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ import (
"github.com/hyperledger/fabric-protos-go/msp"
ab "github.com/hyperledger/fabric-protos-go/orderer"

//"github.com/hyperledger/fabric-protos-go/orderer/smartbft"
"github.com/pkg/errors"

//cb "github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/bccsp"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/crypto"
Expand Down Expand Up @@ -106,7 +104,7 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co
*/
opts := Options{
Consenters: consenters,
MaxInflightBlocks: 5,
MaxInflightBlocks: 2000,
Clock: clock.NewClock(),
}

Expand Down Expand Up @@ -162,10 +160,9 @@ func New(
Logger: logger,
Chains: r,
SignerSerializer: signerSerializer,
//WALBaseDir: walConfig.WALDir,
Metrics: NewMetrics(metricsProvider),
CreateChain: r.CreateChain,
BCCSP: BCCSP,
Metrics: NewMetrics(metricsProvider),
CreateChain: r.CreateChain,
BCCSP: BCCSP,
}

identity, _ := signerSerializer.Serialize()
Expand Down
Loading

0 comments on commit b679a3f

Please sign in to comment.