Skip to content

Commit

Permalink
Traffic generator V2 (Layr-Labs#1055)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmanc authored Jan 14, 2025
1 parent 17300a3 commit 2c7a414
Show file tree
Hide file tree
Showing 27 changed files with 644 additions and 1,493 deletions.
1 change: 1 addition & 0 deletions api/clients/v2/accountant.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (a *Accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint32, quo
}
return 0, a.cumulativePayment, nil
}

return 0, big.NewInt(0), fmt.Errorf("neither reservation nor on-demand payment is available")
}

Expand Down
1 change: 1 addition & 0 deletions api/clients/v2/disperser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func (c *disperserClient) PopulateAccountant(ctx context.Context) error {
if err != nil {
return fmt.Errorf("error setting payment state for accountant: %w", err)
}

return nil
}

Expand Down
18 changes: 10 additions & 8 deletions docker-bake.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ group "all" {
"churner",
"dataapi",
"traffic-generator",
"traffic-generator-v2",
"controller",
"relay"
]
Expand Down Expand Up @@ -84,6 +85,7 @@ group "internal-release" {
"churner-internal",
"dataapi-internal",
"traffic-generator-internal",
"traffic-generator-v2-internal",
"controller-internal",
"relay-internal"
]
Expand Down Expand Up @@ -201,19 +203,19 @@ target "traffic-generator-internal" {
]
}

target "traffic-generator2" {
target "traffic-generator-v2" {
context = "."
dockerfile = "./trafficgenerator2.Dockerfile"
dockerfile = "./trafficgenerator-v2.Dockerfile"
target = "generator2"
tags = ["${REGISTRY}/${REPO}/traffic-generator2:${BUILD_TAG}"]
tags = ["${REGISTRY}/${REPO}/traffic-generator-v2:${BUILD_TAG}"]
}

target "traffic-generator2-internal" {
inherits = ["traffic-generator2"]
target "traffic-generator-v2-internal" {
inherits = ["traffic-generator-v2"]
tags = [
"${REGISTRY}/eigenda-traffic-generator2:${BUILD_TAG}",
"${REGISTRY}/eigenda-traffic-generator2:${GIT_SHA}",
"${REGISTRY}/eigenda-traffic-generator2:sha-${GIT_SHORT_SHA}"
"${REGISTRY}/eigenda-traffic-generator-v2:${BUILD_TAG}",
"${REGISTRY}/eigenda-traffic-generator-v2:${GIT_SHA}",
"${REGISTRY}/eigenda-traffic-generator-v2:sha-${GIT_SHORT_SHA}"
]
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
golang.org/x/sync v0.8.0
golang.org/x/time v0.5.0
google.golang.org/grpc v1.64.1
gopkg.in/yaml.v2 v2.4.0
)

require (
Expand Down Expand Up @@ -156,7 +157,6 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

require (
Expand Down
24 changes: 23 additions & 1 deletion tools/traffic/cmd2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"log"
"os"
"os/signal"
"syscall"

"github.com/Layr-Labs/eigenda/tools/traffic"
"github.com/Layr-Labs/eigenda/tools/traffic/config"
Expand Down Expand Up @@ -40,5 +42,25 @@ func trafficGeneratorMain(ctx *cli.Context) error {
panic(fmt.Sprintf("failed to create new traffic generator\n%s", err))
}

return generator.Start()
// Set up signal handling
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)

// Run the generator in a goroutine
errChan := make(chan error, 1)
go func() {
errChan <- generator.Start()
}()

// Wait for either an error or a signal
select {
case err := <-errChan:
return err
case sig := <-sigChan:
fmt.Printf("\nReceived signal %v, shutting down...\n", sig)
if err := generator.Stop(); err != nil {
fmt.Printf("Failed to stop generator: %v\n", err)
}
return nil
}
}
95 changes: 12 additions & 83 deletions tools/traffic/config/config.go
Original file line number Diff line number Diff line change
@@ -1,124 +1,53 @@
package config

import (
"errors"
"fmt"
"time"

"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/core/thegraph"
"github.com/Layr-Labs/eigenda/retriever"

"github.com/Layr-Labs/eigenda/api/clients/v2"
"github.com/Layr-Labs/eigenda/common"
"github.com/urfave/cli"
)

// Config configures a traffic generator.
type Config struct {

// Logging configuration.
LoggingConfig common.LoggerConfig

// Configuration for the disperser client.
DisperserClientConfig *clients.Config

// Configuration for the retriever client.
RetrievalClientConfig *retriever.Config

// Configuration for the graph.
TheGraphConfig *thegraph.Config
DisperserClientConfig *clients.DisperserClientConfig

// Configuration for the EigenDA client.
EigenDAClientConfig *clients.EigenDAClientConfig

// Configures the traffic generator workers.
WorkerConfig WorkerConfig
// Signer private key
SignerPrivateKey string

// The port at which the metrics server listens for HTTP requests.
MetricsHTTPPort string

// The timeout for the node client.
NodeClientTimeout time.Duration
// The amount of time to sleep after launching each worker thread.
InstanceLaunchInterval time.Duration

// Path to the runtime configuration file that defines writer groups.
RuntimeConfigPath string
}

func NewConfig(ctx *cli.Context) (*Config, error) {
loggerConfig, err := common.ReadLoggerCLIConfig(ctx, FlagPrefix)
if err != nil {
return nil, err
}
customQuorums := ctx.GlobalIntSlice(CustomQuorumNumbersFlag.Name)
customQuorumsUint8 := make([]uint8, len(customQuorums))
for i, q := range customQuorums {
if q < 0 || q > 255 {
return nil, errors.New("invalid custom quorum number")
}
customQuorumsUint8[i] = uint8(q)
}

retrieverConfig, err := retriever.NewConfig(ctx)
if err != nil {
return nil, err
}

config := &Config{
DisperserClientConfig: &clients.Config{
DisperserClientConfig: &clients.DisperserClientConfig{
Hostname: ctx.GlobalString(HostnameFlag.Name),
Port: ctx.GlobalString(GrpcPortFlag.Name),
Timeout: ctx.Duration(TimeoutFlag.Name),
UseSecureGrpcFlag: ctx.GlobalBool(UseSecureGrpcFlag.Name),
},

RetrievalClientConfig: retrieverConfig,

TheGraphConfig: &thegraph.Config{
Endpoint: ctx.String(TheGraphUrlFlag.Name),
PullInterval: ctx.Duration(TheGraphPullIntervalFlag.Name),
MaxRetries: ctx.Int(TheGraphRetriesFlag.Name),
},

EigenDAClientConfig: &clients.EigenDAClientConfig{
RPC: fmt.Sprintf("%s:%s", ctx.GlobalString(HostnameFlag.Name), ctx.GlobalString(GrpcPortFlag.Name)),
SignerPrivateKeyHex: ctx.String(SignerPrivateKeyFlag.Name),
DisableTLS: ctx.GlobalBool(DisableTLSFlag.Name),
},

LoggingConfig: *loggerConfig,
SignerPrivateKey: ctx.String(SignerPrivateKeyFlag.Name),
LoggingConfig: *loggerConfig,

MetricsHTTPPort: ctx.GlobalString(MetricsHTTPPortFlag.Name),
NodeClientTimeout: ctx.Duration(NodeClientTimeoutFlag.Name),

InstanceLaunchInterval: ctx.Duration(InstanceLaunchIntervalFlag.Name),

WorkerConfig: WorkerConfig{
NumWriteInstances: ctx.GlobalUint(NumWriteInstancesFlag.Name),
WriteRequestInterval: ctx.Duration(WriteRequestIntervalFlag.Name),
DataSize: ctx.GlobalUint64(DataSizeFlag.Name),
RandomizeBlobs: !ctx.GlobalBool(UniformBlobsFlag.Name),
WriteTimeout: ctx.Duration(WriteTimeoutFlag.Name),

TrackerInterval: ctx.Duration(VerifierIntervalFlag.Name),
GetBlobStatusTimeout: ctx.Duration(GetBlobStatusTimeoutFlag.Name),

NumReadInstances: ctx.GlobalUint(NumReadInstancesFlag.Name),
ReadRequestInterval: ctx.Duration(ReadRequestIntervalFlag.Name),
RequiredDownloads: ctx.Float64(RequiredDownloadsFlag.Name),
FetchBatchHeaderTimeout: ctx.Duration(FetchBatchHeaderTimeoutFlag.Name),
RetrieveBlobChunksTimeout: ctx.Duration(RetrieveBlobChunksTimeoutFlag.Name),
StatusTrackerChannelCapacity: ctx.Uint(VerificationChannelCapacityFlag.Name),

EigenDAServiceManager: retrieverConfig.EigenDAServiceManagerAddr,
SignerPrivateKey: ctx.String(SignerPrivateKeyFlag.Name),
CustomQuorums: customQuorumsUint8,

MetricsBlacklist: ctx.StringSlice(MetricsBlacklistFlag.Name),
MetricsFuzzyBlacklist: ctx.StringSlice(MetricsFuzzyBlacklistFlag.Name),
},
}

err = config.EigenDAClientConfig.CheckAndSetDefaults()
if err != nil {
return nil, err
RuntimeConfigPath: ctx.GlobalString(RuntimeConfigPathFlag.Name),
}

return config, nil
Expand Down
21 changes: 21 additions & 0 deletions tools/traffic/config/example_runtime_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Example runtime configuration file for the traffic generator
# This file can be modified while the traffic generator is running

writer_groups:
# 1kb/s writer (1024 bytes, 1 instance)
- name: "small_frequent"
num_write_instances: 1
write_request_interval: 1s
data_size: 1024
randomize_blobs: true
write_timeout: 10s
custom_quorums: [1]

# 2kb/s writer (1024 bytes, 2 instances)
- name: "medium_frequent"
num_write_instances: 2
write_request_interval: 1s
data_size: 1024
randomize_blobs: true
write_timeout: 10s
custom_quorums: [1]
Loading

0 comments on commit 2c7a414

Please sign in to comment.