
Commit

Merge pull request filecoin-project#5611 from filecoin-project/feat/config-migration-worker-limit

feat: optionally set max migration workers
diwufeiwen authored Dec 26, 2022
2 parents 0182082 + 2a48978 commit c3d9edc
Showing 1 changed file with 39 additions and 14 deletions.
53 changes: 39 additions & 14 deletions pkg/fork/fork.go
@@ -7,8 +7,10 @@ import (
"encoding/binary"
"errors"
"fmt"
"os"
"runtime"
"sort"
"strconv"
"sync"
"time"

@@ -69,6 +71,29 @@ var log = logging.Logger("fork")

var ErrExpensiveFork = errors.New("refusing explicit call due to state fork at epoch")

var (
MigrationMaxWorkerCount int
EnvMigrationMaxWorkerCount = "VENUS_MIGRATION_MAX_WORKER_COUNT"
)

func init() {
// the default calculation used for migration worker count
MigrationMaxWorkerCount = runtime.NumCPU()
// check if an alternative value was requested via the environment
if mwcs := os.Getenv(EnvMigrationMaxWorkerCount); mwcs != "" {
mwc, err := strconv.ParseInt(mwcs, 10, 32)
if err != nil {
log.Warnf("invalid value for %s (%s) defaulting to %d: %s", EnvMigrationMaxWorkerCount, mwcs, MigrationMaxWorkerCount, err)
return
}
// use value from environment
log.Infof("migration worker cound set from %s (%d)", EnvMigrationMaxWorkerCount, mwc)
MigrationMaxWorkerCount = int(mwc)
return
}
log.Infof("migration worker count: %d", MigrationMaxWorkerCount)
}

// MigrationCache can be used to cache information used by a migration. This is primarily useful to
// "pre-compute" some migration state ahead of time, and make it accessible in the migration itself.
type MigrationCache interface {
@@ -1603,7 +1628,7 @@ func terminateActor(ctx context.Context, tree *vmstate.State, addr address.Addre

func (c *ChainFork) UpgradeActorsV3(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
- workerCount := runtime.NumCPU() - 3
+ workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
@@ -1642,7 +1667,7 @@ func (c *ChainFork) PreUpgradeActorsV3(ctx context.Context, cache MigrationCache, r
func (c *ChainFork) PreUpgradeActorsV3(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
log.Info("PreUpgradeActorsV3 ......")
// Use half the CPUs for pre-migration, but leave at least 3.
- workerCount := runtime.NumCPU()
+ workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
@@ -1706,7 +1731,7 @@ func (c *ChainFork) upgradeActorsV3Common(

func (c *ChainFork) UpgradeActorsV4(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
- workerCount := runtime.NumCPU() - 3
+ workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
@@ -1728,7 +1753,7 @@ func (c *ChainFork) UpgradeActorsV4(ctx context.Context, cache MigrationCache, r

func (c *ChainFork) PreUpgradeActorsV4(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3.
- workerCount := runtime.NumCPU()
+ workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
@@ -1792,7 +1817,7 @@ func (c *ChainFork) upgradeActorsV4Common(

func (c *ChainFork) UpgradeActorsV5(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
- workerCount := runtime.NumCPU() - 3
+ workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
@@ -1814,7 +1839,7 @@ func (c *ChainFork) UpgradeActorsV5(ctx context.Context, cache MigrationCache, r

func (c *ChainFork) PreUpgradeActorsV5(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3.
- workerCount := runtime.NumCPU()
+ workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
@@ -1878,7 +1903,7 @@ func (c *ChainFork) upgradeActorsV5Common(

func (c *ChainFork) UpgradeActorsV6(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
- workerCount := runtime.NumCPU() - 3
+ workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
@@ -1900,7 +1925,7 @@ func (c *ChainFork) UpgradeActorsV6(ctx context.Context, cache MigrationCache, r

func (c *ChainFork) PreUpgradeActorsV6(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3.
- workerCount := runtime.NumCPU()
+ workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
@@ -1966,7 +1991,7 @@ func (c *ChainFork) upgradeActorsV6Common(

func (c *ChainFork) UpgradeActorsV7(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
- workerCount := runtime.NumCPU() - 3
+ workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
@@ -1988,7 +2013,7 @@ func (c *ChainFork) UpgradeActorsV7(ctx context.Context, cache MigrationCache, r

func (c *ChainFork) PreUpgradeActorsV7(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3.
- workerCount := runtime.NumCPU()
+ workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
@@ -2063,7 +2088,7 @@ func (c *ChainFork) upgradeActorsV7Common(

func (c *ChainFork) UpgradeActorsV8(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
- workerCount := runtime.NumCPU() - 3
+ workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
@@ -2087,7 +2112,7 @@ func (c *ChainFork) UpgradeActorsV8(ctx context.Context, cache MigrationCache, r

func (c *ChainFork) PreUpgradeActorsV8(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3.
- workerCount := runtime.NumCPU()
+ workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
@@ -2174,7 +2199,7 @@ func (c *ChainFork) upgradeActorsV8Common(

func (c *ChainFork) UpgradeActorsV9(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
- workerCount := runtime.NumCPU() - 3
+ workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
@@ -2201,7 +2226,7 @@ func (c *ChainFork) PreUpgradeActorsV9(ctx context.Context,
ts *types.TipSet,
) error {
// Use half the CPUs for pre-migration, but leave at least 3.
- workerCount := runtime.NumCPU()
+ workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
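
The new init hook above defaults MigrationMaxWorkerCount to runtime.NumCPU() and lets the VENUS_MIGRATION_MAX_WORKER_COUNT environment variable override it. Below is a minimal, self-contained sketch of that resolution logic for trying the override outside the fork package; the migrationWorkerCount helper and the main function are illustrative only and are not part of this patch.

package main

import (
	"fmt"
	"os"
	"runtime"
	"strconv"
)

// migrationWorkerCount mirrors the override added in this commit: default to
// the CPU count, and let VENUS_MIGRATION_MAX_WORKER_COUNT replace it when set
// to a valid integer. (Illustrative helper, not part of pkg/fork.)
func migrationWorkerCount() int {
	count := runtime.NumCPU()
	if s := os.Getenv("VENUS_MIGRATION_MAX_WORKER_COUNT"); s != "" {
		if v, err := strconv.ParseInt(s, 10, 32); err == nil {
			count = int(v)
		}
	}
	return count
}

func main() {
	// e.g. running with VENUS_MIGRATION_MAX_WORKER_COUNT=8 prints 8.
	fmt.Println("migration worker count:", migrationWorkerCount())
}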
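Each upgrade now derives its worker pool from MigrationMaxWorkerCount rather than calling runtime.NumCPU() directly: full migrations use the configured count minus 3 (but at least 1), while pre-migrations follow the "half the CPUs" scheme whose else branch is cut off in the diff above. The sketch below works through the fully visible full-migration clamp only; the function name is illustrative, since the real code inlines this logic in each UpgradeActorsVN method.

package main

import "fmt"

// fullMigrationWorkers reproduces the clamp used by UpgradeActorsV3..V9 in the
// diff: all configured workers except 3, never fewer than 1.
func fullMigrationWorkers(max int) int {
	workers := max - 3
	if workers <= 0 {
		workers = 1
	}
	return workers
}

func main() {
	for _, max := range []int{2, 4, 8, 16} {
		// e.g. with VENUS_MIGRATION_MAX_WORKER_COUNT=4, a full migration runs 1 worker.
		fmt.Printf("MigrationMaxWorkerCount=%d -> %d full-migration workers\n", max, fullMigrationWorkers(max))
	}
}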
