Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Leader lane multi-chain gas price reports #1071

Merged
merged 17 commits into from
Jun 27, 2024
Merged
218 changes: 130 additions & 88 deletions core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,25 @@ func (r *CommitReportingPlugin) Observation(ctx context.Context, epochAndRound t
return nil, err
}

sourceGasPriceUSD, tokenPricesUSD, err := r.observePriceUpdates(ctx)
// Fetches multi-lane gasPricesUSD and tokenPricesUSD for the same dest chain
gasPricesUSD, tokenPricesUSD, err := r.observePriceUpdates(ctx)
if err != nil {
return nil, err
}
// Set prices to empty maps if nil to be friendlier to JSON encoding
if gasPricesUSD == nil {
gasPricesUSD = map[uint64]*big.Int{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe better to do this inside r.observePriceUpdates(ctx)

}
if tokenPricesUSD == nil {
tokenPricesUSD = map[cciptypes.Address]*big.Int{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

}
// For backwards compatibility with the older release during phased rollout, set the default gas price on this lane
defaultGasPricesUSD := gasPricesUSD[r.sourceChainSelector]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename var to defaultGasPriceUSD and check if it exists in the map?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed to sourceGasPriceUSD, it's ok if it does not exist, nil for this value is supported


lggr.Infow("Observation",
"minSeqNr", minSeqNr,
"maxSeqNr", maxSeqNr,
"sourceGasPriceUSD", sourceGasPriceUSD,
"gasPricesUSD", gasPricesUSD,
"tokenPricesUSD", tokenPricesUSD,
"epochAndRound", epochAndRound,
"messageIDs", messageIDs,
Expand All @@ -134,11 +144,32 @@ func (r *CommitReportingPlugin) Observation(ctx context.Context, epochAndRound t
Min: minSeqNr,
Max: maxSeqNr,
},
TokenPricesUSD: tokenPricesUSD,
SourceGasPriceUSD: sourceGasPriceUSD,
TokenPricesUSD: tokenPricesUSD,
SourceGasPriceUSD: defaultGasPricesUSD,
SourceGasPriceUSDPerChain: gasPricesUSD,
}.Marshal()
}

// observePriceUpdates fetches latest gas and token prices from DB as long as price reporting is not disabled.
// The prices are aggregated for all lanes for the same destination chain.
func (r *CommitReportingPlugin) observePriceUpdates(
ctx context.Context,
) (gasPricesUSD map[uint64]*big.Int, tokenPricesUSD map[cciptypes.Address]*big.Int, err error) {
// Do not observe prices if price reporting is disabled. Price reporting will be disabled for lanes that are not leader lanes.
if r.offchainConfig.PriceReportingDisabled {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth logging in these scenarios or nah?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also sidenote I wish this was PriceReportingEnabled instead

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah me too, this was defined in April under a different leader lane architecture that got killed due to LOOPPS, but Engops already update the tooling to include this flag, didn't wanna change it again

r.lggr.Infow("Price reporting disabled, skipping gas and token price reads")
return nil, nil, nil
}

// Fetches multi-lane gas prices and token prices, for the given dest chain
gasPricesUSD, tokenPricesUSD, err = r.priceService.GetGasAndTokenPrices(ctx, r.destChainSelector)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth checking that gasPricesUSD and tokenPricesUSD are not nil and erroring if they are? Thats a breakage in the API contract

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after the API cleanup, added explicit nil check

if err != nil {
return nil, nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrap err?

}

return gasPricesUSD, tokenPricesUSD, nil
}

func (r *CommitReportingPlugin) calculateMinMaxSequenceNumbers(ctx context.Context, lggr logger.Logger) (uint64, uint64, []cciptypes.Hash, error) {
nextSeqNum, err := r.commitStoreReader.GetExpectedNextSequenceNumber(ctx)
if err != nil {
Expand Down Expand Up @@ -174,23 +205,6 @@ func (r *CommitReportingPlugin) calculateMinMaxSequenceNumbers(ctx context.Conte
return minSeqNr, maxSeqNr, messageIDs, nil
}

func (r *CommitReportingPlugin) observePriceUpdates(
ctx context.Context,
) (sourceGasPriceUSD *big.Int, tokenPricesUSD map[cciptypes.Address]*big.Int, err error) {
gasPricesUSD, tokenPricesUSD, err := r.priceService.GetGasAndTokenPrices(ctx, r.destChainSelector)
if err != nil {
return nil, nil, err
}

// Reduce to single gas price for compatibility. In a followup PR, Commit plugin will make use of all source chain gas prices.
sourceGasPriceUSD = gasPricesUSD[r.sourceChainSelector]
if sourceGasPriceUSD == nil {
return nil, nil, fmt.Errorf("missing gas price for sourceChainSelector %d", r.sourceChainSelector)
}

return sourceGasPriceUSD, tokenPricesUSD, nil
}

// Gets the latest token price updates based on logs within the heartbeat
// The updates returned by this function are guaranteed to not contain nil values.
func (r *CommitReportingPlugin) getLatestTokenPriceUpdates(ctx context.Context, now time.Time) (map[cciptypes.Address]update, error) {
Expand Down Expand Up @@ -219,33 +233,34 @@ func (r *CommitReportingPlugin) getLatestTokenPriceUpdates(ctx context.Context,
return latestUpdates, nil
}

// getLatestGasPriceUpdate returns the latest gas price update based on logs within the heartbeat.
// If an update is found, it is not expected to contain a nil value. If no updates found, empty update with nil value is returned.
func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now time.Time) (gasUpdate update, error error) {
// If there are no price updates inflight, check latest prices onchain
gasPriceUpdates, err := r.destPriceRegistryReader.GetGasPriceUpdatesCreatedAfter(
// getLatestGasPriceUpdate returns the latest gas price updates based on logs within the heartbeat.
// If an update is found, it is not expected to contain a nil value.
func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now time.Time) (map[uint64]update, error) {
gasPriceUpdates, err := r.destPriceRegistryReader.GetAllGasPriceUpdatesCreatedAfter(
ctx,
r.sourceChainSelector,
now.Add(-r.offchainConfig.GasPriceHeartBeat),
0,
)

if err != nil {
return update{}, err
return nil, err
}

for _, priceUpdate := range gasPriceUpdates {
latestUpdates := make(map[uint64]update)
for _, gasUpdate := range gasPriceUpdates {
priceUpdate := gasUpdate.GasPriceUpdate
// Ordered by ascending timestamps
timestamp := time.Unix(priceUpdate.GasPriceUpdate.TimestampUnixSec.Int64(), 0)
if !timestamp.Before(gasUpdate.timestamp) {
gasUpdate = update{
timestamp := time.Unix(priceUpdate.TimestampUnixSec.Int64(), 0)
if priceUpdate.Value != nil && !timestamp.Before(latestUpdates[priceUpdate.DestChainSelector].timestamp) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check if latestUpdates[priceUpdate.DestChainSelector] exists?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update is a struct, I think it's intended that latestUpdates[priceUpdate.DestChainSelector] may not exist, if it doesn't, then timestamp is 0 so !timestamp.Before is always true, and that initializes the value

latestUpdates[priceUpdate.DestChainSelector] = update{
timestamp: timestamp,
value: priceUpdate.Value,
}
}
}

r.lggr.Infow("Latest gas price from log poller", "gasPriceUpdateVal", gasUpdate.value, "gasPriceUpdateTs", gasUpdate.timestamp)
return gasUpdate, nil
r.lggr.Infow("Latest gas price from log poller", "latestUpdates", latestUpdates)
return latestUpdates, nil
}

func (r *CommitReportingPlugin) Report(ctx context.Context, epochAndRound types.ReportTimestamp, _ types.Query, observations []types.AttributedObservation) (bool, types.Report, error) {
Expand All @@ -259,7 +274,7 @@ func (r *CommitReportingPlugin) Report(ctx context.Context, epochAndRound types.

parsableObservations := ccip.GetParsableObservations[ccip.CommitObservation](lggr, observations)

intervals, gasPriceObs, tokenPriceObs, err := extractObservationData(lggr, r.F, parsableObservations)
intervals, gasPriceObs, tokenPriceObs, err := extractObservationData(lggr, r.F, r.sourceChainSelector, parsableObservations)
if err != nil {
return false, nil, err
}
Expand Down Expand Up @@ -363,18 +378,26 @@ func calculateIntervalConsensus(intervals []cciptypes.CommitStoreInterval, f int

// extractObservationData extracts observation fields into their own slices
// and filters out observation data that are invalid
func extractObservationData(lggr logger.Logger, f int, observations []ccip.CommitObservation) (intervals []cciptypes.CommitStoreInterval, gasPrices []*big.Int, tokenPrices map[cciptypes.Address][]*big.Int, err error) {
// We require at least f+1 observations to each consensus. Checking to ensure there are at least f+1 parsed observations.
func extractObservationData(lggr logger.Logger, f int, sourceChainSelector uint64, observations []ccip.CommitObservation) (intervals []cciptypes.CommitStoreInterval, gasPrices map[uint64][]*big.Int, tokenPrices map[cciptypes.Address][]*big.Int, err error) {
// We require at least f+1 observations to reach consensus. Checking to ensure there are at least f+1 parsed observations.
if len(observations) <= f {
return nil, nil, nil, fmt.Errorf("not enough observations to form consensus: #obs=%d, f=%d", len(observations), f)
}

gasPriceObservations := make(map[uint64][]*big.Int)
tokenPriceObservations := make(map[cciptypes.Address][]*big.Int)
for _, obs := range observations {
intervals = append(intervals, obs.Interval)

if obs.SourceGasPriceUSD != nil {
gasPrices = append(gasPrices, obs.SourceGasPriceUSD)
for selector, price := range obs.SourceGasPriceUSDPerChain {
if price != nil {
gasPriceObservations[selector] = append(gasPriceObservations[selector], price)
}
}
// During phased rollout, NOPs running old release only report SourceGasPriceUSD.
// An empty `SourceGasPriceUSDPerChain` with a non-nil `SourceGasPriceUSD` can only happen with old release.
if len(obs.SourceGasPriceUSDPerChain) == 0 && obs.SourceGasPriceUSD != nil {
gasPriceObservations[sourceChainSelector] = append(gasPriceObservations[sourceChainSelector], obs.SourceGasPriceUSD)
}

for token, price := range obs.TokenPricesUSD {
Expand All @@ -384,29 +407,39 @@ func extractObservationData(lggr logger.Logger, f int, observations []ccip.Commi
}
}

// Observations are invalid if observed gas price is nil, we require at least f+1 valid observations.
if len(gasPrices) <= f {
return nil, nil, nil, fmt.Errorf("not enough valid observations with non-nil gas prices: #obs=%d, f=%d", len(gasPrices), f)
// Price is dropped if there are not enough valid observations. With a threshold of 2*(f-1) + 1, we achieve a balance between safety and liveness.
// During phased-rollout where some honest nodes may not have started observing the token yet, it requires 5 malicious node with 1 being the leader to successfully alter price.
// During regular operation, it requires 3 malicious nodes with 1 being the leader to temporarily delay price update for the token.
priceReportingThreshold := 2*(f-1) + 1

gasPrices = make(map[uint64][]*big.Int)
for selector, perChainPriceObservations := range gasPriceObservations {
if len(perChainPriceObservations) < priceReportingThreshold {
lggr.Warnf("Skipping chain with selector %d due to not enough valid observations: #obs=%d, f=%d, threshold=%d", selector, len(perChainPriceObservations), f, priceReportingThreshold)
continue
}
gasPrices[selector] = perChainPriceObservations
}

tokenPrices = make(map[cciptypes.Address][]*big.Int)
for token, perTokenPriceObservations := range tokenPriceObservations {
// Token price is dropped if there are not enough valid observations. With a threshold of 2*(f-1) + 1, we achieve a balance between safety and liveness.
// During phased-rollout where some honest nodes may not have started observing the token yet, it requires 5 malicious node with 1 being the leader to successfully alter price.
// During regular operation, it requires 3 malicious nodes with 1 being the leader to temporarily delay price update for the token.
if len(perTokenPriceObservations) < (2*(f-1) + 1) {
lggr.Warnf("Skipping token %s due to not enough valid observations: #obs=%d, f=%d", string(token), len(perTokenPriceObservations), f)
if len(perTokenPriceObservations) < priceReportingThreshold {
lggr.Warnf("Skipping token %s due to not enough valid observations: #obs=%d, f=%d, threshold=%d", string(token), len(perTokenPriceObservations), f, priceReportingThreshold)
continue
}

tokenPrices[token] = perTokenPriceObservations
}

return intervals, gasPrices, tokenPrices, nil
}

// selectPriceUpdates filters out gas and token price updates that are already inflight
func (r *CommitReportingPlugin) selectPriceUpdates(ctx context.Context, now time.Time, gasPriceObs []*big.Int, tokenPriceObs map[cciptypes.Address][]*big.Int) ([]cciptypes.GasPrice, []cciptypes.TokenPrice, error) {
func (r *CommitReportingPlugin) selectPriceUpdates(ctx context.Context, now time.Time, gasPriceObs map[uint64][]*big.Int, tokenPriceObs map[cciptypes.Address][]*big.Int) ([]cciptypes.GasPrice, []cciptypes.TokenPrice, error) {
// If price reporting is disabled, there is no need to select price updates.
if r.offchainConfig.PriceReportingDisabled {
return nil, nil, nil
}

latestGasPrice, err := r.getLatestGasPriceUpdate(ctx, now)
if err != nil {
return nil, nil, err
Expand All @@ -421,8 +454,9 @@ func (r *CommitReportingPlugin) selectPriceUpdates(ctx context.Context, now time
}

// Note priceUpdates must be deterministic.
// The provided latestTokenPrices should not contain nil values.
func (r *CommitReportingPlugin) calculatePriceUpdates(gasPriceObs []*big.Int, tokenPriceObs map[cciptypes.Address][]*big.Int, latestGasPrice update, latestTokenPrices map[cciptypes.Address]update) ([]cciptypes.GasPrice, []cciptypes.TokenPrice, error) {
// The provided gasPriceObs and tokenPriceObs should not contain nil values.
// The returned latestGasPrice and latestTokenPrices should not contain nil values.
func (r *CommitReportingPlugin) calculatePriceUpdates(gasPriceObs map[uint64][]*big.Int, tokenPriceObs map[cciptypes.Address][]*big.Int, latestGasPrice map[uint64]update, latestTokenPrices map[cciptypes.Address]update) ([]cciptypes.GasPrice, []cciptypes.TokenPrice, error) {
var tokenPriceUpdates []cciptypes.TokenPrice
for token, tokenPriceObservations := range tokenPriceObs {
medianPrice := ccipcalc.BigIntSortedMiddle(tokenPriceObservations)
Expand All @@ -432,7 +466,7 @@ func (r *CommitReportingPlugin) calculatePriceUpdates(gasPriceObs []*big.Int, to
tokenPriceUpdatedRecently := time.Since(latestTokenPrice.timestamp) < r.offchainConfig.TokenPriceHeartBeat
tokenPriceNotChanged := !ccipcalc.Deviates(medianPrice, latestTokenPrice.value, int64(r.offchainConfig.TokenPriceDeviationPPB))
if tokenPriceUpdatedRecently && tokenPriceNotChanged {
r.lggr.Debugw("price was updated recently, skipping the update",
r.lggr.Debugw("token price was updated recently, skipping the update",
"token", token, "newPrice", medianPrice, "existingPrice", latestTokenPrice.value)
continue // skip the update if we recently had a price update close to the new value
}
Expand All @@ -449,30 +483,38 @@ func (r *CommitReportingPlugin) calculatePriceUpdates(gasPriceObs []*big.Int, to
return tokenPriceUpdates[i].Token < tokenPriceUpdates[j].Token
})

newGasPrice, err := r.gasPriceEstimator.Median(gasPriceObs) // Compute the median price
if err != nil {
return nil, nil, err
}
destChainSelector := r.sourceChainSelector // Assuming plugin lane is A->B, we write to B the gas price of A

var gasPriceUpdate []cciptypes.GasPrice
// Default to updating so that we update if there are no prior updates.
shouldUpdate := true
if latestGasPrice.value != nil {
gasPriceUpdatedRecently := time.Since(latestGasPrice.timestamp) < r.offchainConfig.GasPriceHeartBeat
gasPriceDeviated, err := r.gasPriceEstimator.Deviates(newGasPrice, latestGasPrice.value)
for chainSelector, gasPriceObservations := range gasPriceObs {
newGasPrice, err := r.gasPriceEstimator.Median(gasPriceObservations) // Compute the median price
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to calculate median gas price for chain selector %d: %w", chainSelector, err)
}
if gasPriceUpdatedRecently && !gasPriceDeviated {
shouldUpdate = false

// Default to updating so that we update if there are no prior updates.
latestGasPrice, exists := latestGasPrice[chainSelector]
if exists && latestGasPrice.value != nil {
gasPriceUpdatedRecently := time.Since(latestGasPrice.timestamp) < r.offchainConfig.GasPriceHeartBeat
gasPriceDeviated, err := r.gasPriceEstimator.Deviates(newGasPrice, latestGasPrice.value)
if err != nil {
return nil, nil, err
}
if gasPriceUpdatedRecently && !gasPriceDeviated {
r.lggr.Debugw("gas price was updated recently and not deviated sufficiently, skipping the update",
"chainSelector", chainSelector, "newPrice", newGasPrice, "existingPrice", latestGasPrice.value)
continue
}
}
}
if shouldUpdate {
// Although onchain interface accepts multi gas updates, we only do 1 gas price per report for now.
gasPriceUpdate = append(gasPriceUpdate, cciptypes.GasPrice{DestChainSelector: destChainSelector, Value: newGasPrice})

gasPriceUpdate = append(gasPriceUpdate, cciptypes.GasPrice{
DestChainSelector: chainSelector,
Value: newGasPrice,
})
}

sort.Slice(gasPriceUpdate, func(i, j int) bool {
return gasPriceUpdate[i].DestChainSelector < gasPriceUpdate[j].DestChainSelector
})

return gasPriceUpdate, tokenPriceUpdates, nil
}

Expand Down Expand Up @@ -608,14 +650,9 @@ func (r *CommitReportingPlugin) isStaleReport(ctx context.Context, lggr logger.L
if !hasGasPriceUpdate && !hasTokenPriceUpdates {
return true
}
// Commit plugin currently only supports 1 gas price per report. If report contains more than 1, reject the report.
if len(report.GasPrices) > 1 {
lggr.Errorw("Report is stale because it contains more than 1 gas price update", "GasPriceUpdates", report.GasPrices)
return true
}

// We consider a price update as stale when, there isn't an update or there is an update that is stale.
gasPriceStale := !hasGasPriceUpdate || r.isStaleGasPrice(ctx, lggr, report.GasPrices[0])
gasPriceStale := !hasGasPriceUpdate || r.isStaleGasPrice(ctx, lggr, report.GasPrices)
tokenPricesStale := !hasTokenPriceUpdates || r.isStaleTokenPrices(ctx, lggr, report.TokenPrices)

if gasPriceStale && tokenPricesStale {
Expand Down Expand Up @@ -654,30 +691,35 @@ func (r *CommitReportingPlugin) isStaleMerkleRoot(ctx context.Context, lggr logg
return false
}

func (r *CommitReportingPlugin) isStaleGasPrice(ctx context.Context, lggr logger.Logger, gasPrice cciptypes.GasPrice) bool {
func (r *CommitReportingPlugin) isStaleGasPrice(ctx context.Context, lggr logger.Logger, gasPriceUpdates []cciptypes.GasPrice) bool {
latestGasPrice, err := r.getLatestGasPriceUpdate(ctx, time.Now())
if err != nil {
lggr.Errorw("Report is stale because getLatestGasPriceUpdate failed", "err", err)
lggr.Errorw("Gas price is stale because getLatestGasPriceUpdate failed", "err", err)
return true
}

if latestGasPrice.value != nil {
gasPriceDeviated, err := r.gasPriceEstimator.Deviates(gasPrice.Value, latestGasPrice.value)
for _, gasPriceUpdate := range gasPriceUpdates {
latestUpdate, exists := latestGasPrice[gasPriceUpdate.DestChainSelector]
if !exists || latestUpdate.value == nil {
lggr.Infow("Found non-stale gas price", "chainSelector", gasPriceUpdate.DestChainSelector, "gasPriceUSd", gasPriceUpdate.Value)
return false
}

gasPriceDeviated, err := r.gasPriceEstimator.Deviates(gasPriceUpdate.Value, latestUpdate.value)
if err != nil {
lggr.Errorw("Report is stale because deviation check failed", "err", err)
lggr.Errorw("Gas price is stale because deviation check failed", "err", err)
return true
}

if !gasPriceDeviated {
lggr.Infow("Report is stale because of gas price",
"latestGasPriceUpdate", latestGasPrice.value,
"currentUsdPerUnitGas", gasPrice.Value,
"destChainSelector", gasPrice.DestChainSelector)
return true
if gasPriceDeviated {
lggr.Infow("Found non-stale gas price", "chainSelector", gasPriceUpdate.DestChainSelector, "gasPriceUSd", gasPriceUpdate.Value, "latestUpdate", latestUpdate.value)
return false
}
lggr.Infow("Gas price is stale", "chainSelector", gasPriceUpdate.DestChainSelector, "gasPriceUSd", gasPriceUpdate.Value, "latestGasPrice", latestUpdate.value)
}

return false
lggr.Infow("All gas prices are stale")
return true
}

func (r *CommitReportingPlugin) isStaleTokenPrices(ctx context.Context, lggr logger.Logger, priceUpdates []cciptypes.TokenPrice) bool {
Expand Down
Loading
Loading