From 2c7a414e61d36296799b18dc16ac63644e5bab1e Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Tue, 14 Jan 2025 10:32:20 -0800 Subject: [PATCH] Traffic generator V2 (#1055) --- api/clients/v2/accountant.go | 1 + api/clients/v2/disperser_client.go | 1 + docker-bake.hcl | 18 +- go.mod | 2 +- tools/traffic/cmd2/main.go | 24 +- tools/traffic/config/config.go | 95 +----- .../config/example_runtime_config.yaml | 21 ++ tools/traffic/config/flags.go | 185 +---------- tools/traffic/config/runtime_config.go | 155 +++++++++ tools/traffic/config/worker_config.go | 53 ---- tools/traffic/config/writer_config.go | 25 ++ tools/traffic/generator_v2.go | 298 ++++++++++-------- tools/traffic/metrics/count_metric.go | 5 - tools/traffic/metrics/metrics.go | 104 +++--- tools/traffic/metrics/mock_metrics.go | 8 +- tools/traffic/table/blob_metadata.go | 9 +- tools/traffic/table/blob_store_test.go | 3 +- tools/traffic/workers/blob_reader.go | 233 -------------- tools/traffic/workers/blob_reader_test.go | 151 --------- tools/traffic/workers/blob_status_tracker.go | 256 --------------- .../workers/blob_status_tracker_test.go | 205 ------------ tools/traffic/workers/blob_writer.go | 156 +++++---- tools/traffic/workers/blob_writer_test.go | 73 ++--- tools/traffic/workers/mock_disperser.go | 35 +- tools/traffic/workers/unconfirmed_key.go | 12 +- ...ckerfile => trafficgenerator-v2.Dockerfile | 5 +- trafficgenerator.Dockerfile | 4 +- 27 files changed, 644 insertions(+), 1493 deletions(-) create mode 100644 tools/traffic/config/example_runtime_config.yaml create mode 100644 tools/traffic/config/runtime_config.go delete mode 100644 tools/traffic/config/worker_config.go create mode 100644 tools/traffic/config/writer_config.go delete mode 100644 tools/traffic/workers/blob_reader.go delete mode 100644 tools/traffic/workers/blob_reader_test.go delete mode 100644 tools/traffic/workers/blob_status_tracker.go delete mode 100644 tools/traffic/workers/blob_status_tracker_test.go rename trafficgenerator2.Dockerfile => trafficgenerator-v2.Dockerfile (77%) diff --git a/api/clients/v2/accountant.go b/api/clients/v2/accountant.go index be0030d3f..9ad17d7f2 100644 --- a/api/clients/v2/accountant.go +++ b/api/clients/v2/accountant.go @@ -104,6 +104,7 @@ func (a *Accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint32, quo } return 0, a.cumulativePayment, nil } + return 0, big.NewInt(0), fmt.Errorf("neither reservation nor on-demand payment is available") } diff --git a/api/clients/v2/disperser_client.go b/api/clients/v2/disperser_client.go index 3ef20404b..4f4415fea 100644 --- a/api/clients/v2/disperser_client.go +++ b/api/clients/v2/disperser_client.go @@ -103,6 +103,7 @@ func (c *disperserClient) PopulateAccountant(ctx context.Context) error { if err != nil { return fmt.Errorf("error setting payment state for accountant: %w", err) } + return nil } diff --git a/docker-bake.hcl b/docker-bake.hcl index ecee0f2ae..9f9b136c1 100644 --- a/docker-bake.hcl +++ b/docker-bake.hcl @@ -44,6 +44,7 @@ group "all" { "churner", "dataapi", "traffic-generator", + "traffic-generator-v2", "controller", "relay" ] @@ -84,6 +85,7 @@ group "internal-release" { "churner-internal", "dataapi-internal", "traffic-generator-internal", + "traffic-generator-v2-internal", "controller-internal", "relay-internal" ] @@ -201,19 +203,19 @@ target "traffic-generator-internal" { ] } -target "traffic-generator2" { +target "traffic-generator-v2" { context = "." - dockerfile = "./trafficgenerator2.Dockerfile" + dockerfile = "./trafficgenerator-v2.Dockerfile" target = "generator2" - tags = ["${REGISTRY}/${REPO}/traffic-generator2:${BUILD_TAG}"] + tags = ["${REGISTRY}/${REPO}/traffic-generator-v2:${BUILD_TAG}"] } -target "traffic-generator2-internal" { - inherits = ["traffic-generator2"] +target "traffic-generator-v2-internal" { + inherits = ["traffic-generator-v2"] tags = [ - "${REGISTRY}/eigenda-traffic-generator2:${BUILD_TAG}", - "${REGISTRY}/eigenda-traffic-generator2:${GIT_SHA}", - "${REGISTRY}/eigenda-traffic-generator2:sha-${GIT_SHORT_SHA}" + "${REGISTRY}/eigenda-traffic-generator-v2:${BUILD_TAG}", + "${REGISTRY}/eigenda-traffic-generator-v2:${GIT_SHA}", + "${REGISTRY}/eigenda-traffic-generator-v2:sha-${GIT_SHORT_SHA}" ] } diff --git a/go.mod b/go.mod index b5de6d4e7..1aab1d295 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,7 @@ require ( golang.org/x/sync v0.8.0 golang.org/x/time v0.5.0 google.golang.org/grpc v1.64.1 + gopkg.in/yaml.v2 v2.4.0 ) require ( @@ -156,7 +157,6 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect ) require ( diff --git a/tools/traffic/cmd2/main.go b/tools/traffic/cmd2/main.go index 51b41f56b..6c9a412eb 100644 --- a/tools/traffic/cmd2/main.go +++ b/tools/traffic/cmd2/main.go @@ -4,6 +4,8 @@ import ( "fmt" "log" "os" + "os/signal" + "syscall" "github.com/Layr-Labs/eigenda/tools/traffic" "github.com/Layr-Labs/eigenda/tools/traffic/config" @@ -40,5 +42,25 @@ func trafficGeneratorMain(ctx *cli.Context) error { panic(fmt.Sprintf("failed to create new traffic generator\n%s", err)) } - return generator.Start() + // Set up signal handling + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM, syscall.SIGINT) + + // Run the generator in a goroutine + errChan := make(chan error, 1) + go func() { + errChan <- generator.Start() + }() + + // Wait for either an error or a signal + select { + case err := <-errChan: + return err + case sig := <-sigChan: + fmt.Printf("\nReceived signal %v, shutting down...\n", sig) + if err := generator.Stop(); err != nil { + fmt.Printf("Failed to stop generator: %v\n", err) + } + return nil + } } diff --git a/tools/traffic/config/config.go b/tools/traffic/config/config.go index 9702c7be8..1b21c4db9 100644 --- a/tools/traffic/config/config.go +++ b/tools/traffic/config/config.go @@ -1,45 +1,32 @@ package config import ( - "errors" - "fmt" "time" - "github.com/Layr-Labs/eigenda/api/clients" - "github.com/Layr-Labs/eigenda/core/thegraph" - "github.com/Layr-Labs/eigenda/retriever" - + "github.com/Layr-Labs/eigenda/api/clients/v2" "github.com/Layr-Labs/eigenda/common" "github.com/urfave/cli" ) // Config configures a traffic generator. type Config struct { - // Logging configuration. LoggingConfig common.LoggerConfig // Configuration for the disperser client. - DisperserClientConfig *clients.Config - - // Configuration for the retriever client. - RetrievalClientConfig *retriever.Config - - // Configuration for the graph. - TheGraphConfig *thegraph.Config + DisperserClientConfig *clients.DisperserClientConfig - // Configuration for the EigenDA client. - EigenDAClientConfig *clients.EigenDAClientConfig - - // Configures the traffic generator workers. - WorkerConfig WorkerConfig + // Signer private key + SignerPrivateKey string // The port at which the metrics server listens for HTTP requests. MetricsHTTPPort string + // The timeout for the node client. NodeClientTimeout time.Duration - // The amount of time to sleep after launching each worker thread. - InstanceLaunchInterval time.Duration + + // Path to the runtime configuration file that defines writer groups. + RuntimeConfigPath string } func NewConfig(ctx *cli.Context) (*Config, error) { @@ -47,78 +34,20 @@ func NewConfig(ctx *cli.Context) (*Config, error) { if err != nil { return nil, err } - customQuorums := ctx.GlobalIntSlice(CustomQuorumNumbersFlag.Name) - customQuorumsUint8 := make([]uint8, len(customQuorums)) - for i, q := range customQuorums { - if q < 0 || q > 255 { - return nil, errors.New("invalid custom quorum number") - } - customQuorumsUint8[i] = uint8(q) - } - - retrieverConfig, err := retriever.NewConfig(ctx) - if err != nil { - return nil, err - } config := &Config{ - DisperserClientConfig: &clients.Config{ + DisperserClientConfig: &clients.DisperserClientConfig{ Hostname: ctx.GlobalString(HostnameFlag.Name), Port: ctx.GlobalString(GrpcPortFlag.Name), - Timeout: ctx.Duration(TimeoutFlag.Name), UseSecureGrpcFlag: ctx.GlobalBool(UseSecureGrpcFlag.Name), }, - RetrievalClientConfig: retrieverConfig, - - TheGraphConfig: &thegraph.Config{ - Endpoint: ctx.String(TheGraphUrlFlag.Name), - PullInterval: ctx.Duration(TheGraphPullIntervalFlag.Name), - MaxRetries: ctx.Int(TheGraphRetriesFlag.Name), - }, - - EigenDAClientConfig: &clients.EigenDAClientConfig{ - RPC: fmt.Sprintf("%s:%s", ctx.GlobalString(HostnameFlag.Name), ctx.GlobalString(GrpcPortFlag.Name)), - SignerPrivateKeyHex: ctx.String(SignerPrivateKeyFlag.Name), - DisableTLS: ctx.GlobalBool(DisableTLSFlag.Name), - }, - - LoggingConfig: *loggerConfig, + SignerPrivateKey: ctx.String(SignerPrivateKeyFlag.Name), + LoggingConfig: *loggerConfig, MetricsHTTPPort: ctx.GlobalString(MetricsHTTPPortFlag.Name), NodeClientTimeout: ctx.Duration(NodeClientTimeoutFlag.Name), - - InstanceLaunchInterval: ctx.Duration(InstanceLaunchIntervalFlag.Name), - - WorkerConfig: WorkerConfig{ - NumWriteInstances: ctx.GlobalUint(NumWriteInstancesFlag.Name), - WriteRequestInterval: ctx.Duration(WriteRequestIntervalFlag.Name), - DataSize: ctx.GlobalUint64(DataSizeFlag.Name), - RandomizeBlobs: !ctx.GlobalBool(UniformBlobsFlag.Name), - WriteTimeout: ctx.Duration(WriteTimeoutFlag.Name), - - TrackerInterval: ctx.Duration(VerifierIntervalFlag.Name), - GetBlobStatusTimeout: ctx.Duration(GetBlobStatusTimeoutFlag.Name), - - NumReadInstances: ctx.GlobalUint(NumReadInstancesFlag.Name), - ReadRequestInterval: ctx.Duration(ReadRequestIntervalFlag.Name), - RequiredDownloads: ctx.Float64(RequiredDownloadsFlag.Name), - FetchBatchHeaderTimeout: ctx.Duration(FetchBatchHeaderTimeoutFlag.Name), - RetrieveBlobChunksTimeout: ctx.Duration(RetrieveBlobChunksTimeoutFlag.Name), - StatusTrackerChannelCapacity: ctx.Uint(VerificationChannelCapacityFlag.Name), - - EigenDAServiceManager: retrieverConfig.EigenDAServiceManagerAddr, - SignerPrivateKey: ctx.String(SignerPrivateKeyFlag.Name), - CustomQuorums: customQuorumsUint8, - - MetricsBlacklist: ctx.StringSlice(MetricsBlacklistFlag.Name), - MetricsFuzzyBlacklist: ctx.StringSlice(MetricsFuzzyBlacklistFlag.Name), - }, - } - - err = config.EigenDAClientConfig.CheckAndSetDefaults() - if err != nil { - return nil, err + RuntimeConfigPath: ctx.GlobalString(RuntimeConfigPathFlag.Name), } return config, nil diff --git a/tools/traffic/config/example_runtime_config.yaml b/tools/traffic/config/example_runtime_config.yaml new file mode 100644 index 000000000..f5d746b41 --- /dev/null +++ b/tools/traffic/config/example_runtime_config.yaml @@ -0,0 +1,21 @@ +# Example runtime configuration file for the traffic generator +# This file can be modified while the traffic generator is running + +writer_groups: + # 1kb/s writer (1024 bytes, 1 instance) + - name: "small_frequent" + num_write_instances: 1 + write_request_interval: 1s + data_size: 1024 + randomize_blobs: true + write_timeout: 10s + custom_quorums: [1] + + # 2kb/s writer (1024 bytes, 2 instances) + - name: "medium_frequent" + num_write_instances: 2 + write_request_interval: 1s + data_size: 1024 + randomize_blobs: true + write_timeout: 10s + custom_quorums: [1] diff --git a/tools/traffic/config/flags.go b/tools/traffic/config/flags.go index c4218e52c..4d72e8a56 100644 --- a/tools/traffic/config/flags.go +++ b/tools/traffic/config/flags.go @@ -1,14 +1,10 @@ package config import ( - "github.com/Layr-Labs/eigenda/common/geth" - "github.com/Layr-Labs/eigenda/core/thegraph" - "github.com/Layr-Labs/eigenda/encoding/kzg" - "github.com/Layr-Labs/eigenda/indexer" - "github.com/Layr-Labs/eigenda/retriever/flags" "time" "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/indexer" "github.com/urfave/cli" ) @@ -51,12 +47,6 @@ var ( Required: false, EnvVar: common.PrefixEnvVar(envPrefix, "SIGNER_PRIVATE_KEY_HEX"), } - CustomQuorumNumbersFlag = cli.IntSliceFlag{ - Name: common.PrefixFlag(FlagPrefix, "custom-quorum-numbers"), - Usage: "Custom quorum numbers to use for the traffic generator.", - Required: false, - EnvVar: common.PrefixEnvVar(envPrefix, "CUSTOM_QUORUM_NUMBERS"), - } DisableTLSFlag = cli.BoolFlag{ Name: common.PrefixFlag(FlagPrefix, "disable-tls"), Usage: "Whether to disable TLS for an insecure connection.", @@ -69,87 +59,6 @@ var ( Required: false, EnvVar: common.PrefixEnvVar(envPrefix, "METRICS_HTTP_PORT"), } - - /* Common Configuration. */ - - InstanceLaunchIntervalFlag = cli.DurationFlag{ - Name: common.PrefixFlag(FlagPrefix, "instance-launch-interva"), - Usage: "Duration between generator instance launches.", - Required: false, - Value: 1 * time.Second, - EnvVar: common.PrefixEnvVar(envPrefix, "INSTANCE_LAUNCH_INTERVAL"), - } - - MetricsBlacklistFlag = cli.StringSliceFlag{ - Name: common.PrefixFlag(FlagPrefix, "metrics-blacklist"), - Usage: "Any metric with a label exactly matching this string will not be sent to the metrics server.", - Required: false, - EnvVar: common.PrefixEnvVar(envPrefix, "METRICS_BLACKLIST"), - } - - MetricsFuzzyBlacklistFlag = cli.StringSliceFlag{ - Name: common.PrefixFlag(FlagPrefix, "metrics-fuzzy-blacklist"), - Usage: "Any metric that contains any string in this list will not be sent to the metrics server.", - Required: false, - EnvVar: common.PrefixEnvVar(envPrefix, "METRICS_FUZZY_BLACKLIST"), - } - - /* Configuration for the blob writer. */ - - NumWriteInstancesFlag = cli.UintFlag{ - Name: common.PrefixFlag(FlagPrefix, "num-write-instances"), - Usage: "Number of writer instances producing traffic to run in parallel.", - Required: false, - Value: 1, - EnvVar: common.PrefixEnvVar(envPrefix, "NUM_WRITE_INSTANCES"), - } - WriteRequestIntervalFlag = cli.DurationFlag{ - Name: common.PrefixFlag(FlagPrefix, "write-request-interval"), - Usage: "Time between write requests.", - Required: false, - Value: 30 * time.Second, - EnvVar: common.PrefixEnvVar(envPrefix, "WRITE_REQUEST_INTERVAL"), - } - DataSizeFlag = cli.Uint64Flag{ - Name: common.PrefixFlag(FlagPrefix, "data-size"), - Usage: "Size of the data blob.", - Required: false, - Value: 1024, - EnvVar: common.PrefixEnvVar(envPrefix, "DATA_SIZE"), - } - UniformBlobsFlag = cli.BoolFlag{ - Name: common.PrefixFlag(FlagPrefix, "uniform-blobs"), - Usage: "If set, do not randomize blobs.", - Required: false, - EnvVar: common.PrefixEnvVar(envPrefix, "UNIFORM_BLOBS"), - } - WriteTimeoutFlag = cli.DurationFlag{ - Name: common.PrefixFlag(FlagPrefix, "write-timeout"), - Usage: "Amount of time to wait for a blob to be written.", - Required: false, - Value: 10 * time.Second, - EnvVar: common.PrefixEnvVar(envPrefix, "WRITE_TIMEOUT"), - } - TheGraphUrlFlag = cli.StringFlag{ - Name: common.PrefixFlag(FlagPrefix, "the-graph-url"), - Usage: "URL of the subgraph instance.", - Required: true, - EnvVar: common.PrefixEnvVar(envPrefix, "THE_GRAPH_URL"), - } - TheGraphPullIntervalFlag = cli.DurationFlag{ - Name: common.PrefixFlag(FlagPrefix, "the-graph-pull-interval"), - Usage: "Interval at which to pull data from the subgraph.", - Required: false, - Value: 100 * time.Millisecond, - EnvVar: common.PrefixEnvVar(envPrefix, "THE_GRAPH_PULL_INTERVAL"), - } - TheGraphRetriesFlag = cli.UintFlag{ - Name: common.PrefixFlag(FlagPrefix, "the-graph-retries"), - Usage: "Number of times to retry a subgraph request.", - Required: false, - Value: 5, - EnvVar: common.PrefixEnvVar(envPrefix, "THE_GRAPH_RETRIES"), - } NodeClientTimeoutFlag = cli.DurationFlag{ Name: common.PrefixFlag(FlagPrefix, "node-client-timeout"), Usage: "The timeout for the node client.", @@ -157,103 +66,27 @@ var ( Value: 10 * time.Second, EnvVar: common.PrefixEnvVar(envPrefix, "NODE_CLIENT_TIMEOUT"), } - - /* Configuration for the blob validator. */ - - VerifierIntervalFlag = cli.DurationFlag{ - Name: common.PrefixFlag(FlagPrefix, "verifier-interval"), - Usage: "Amount of time between verifier checks.", - Required: false, - Value: time.Second, - EnvVar: common.PrefixEnvVar(envPrefix, "VERIFIER_INTERVAL"), - } - GetBlobStatusTimeoutFlag = cli.DurationFlag{ - Name: common.PrefixFlag(FlagPrefix, "get-blob-status-timeout"), - Usage: "Amount of time to wait for a blob status to be fetched.", - Required: false, - Value: 5 * time.Second, - EnvVar: common.PrefixEnvVar(envPrefix, "GET_BLOB_STATUS_TIMEOUT"), - } - VerificationChannelCapacityFlag = cli.UintFlag{ - Name: common.PrefixFlag(FlagPrefix, "verification-channel-capacity"), - Usage: "Size of the channel used to communicate between the writer and verifier.", - Required: false, - Value: 1000, - EnvVar: common.PrefixEnvVar(envPrefix, "VERIFICATION_CHANNEL_CAPACITY"), - } - - /* Configuration for the blob reader. */ - - NumReadInstancesFlag = cli.UintFlag{ - Name: common.PrefixFlag(FlagPrefix, "num-read-instances"), - Usage: "Number of reader instances producing traffic to run in parallel.", - Required: false, - Value: 1, - EnvVar: common.PrefixEnvVar(envPrefix, "NUM_READ_INSTANCES"), - } - ReadRequestIntervalFlag = cli.DurationFlag{ - Name: common.PrefixFlag(FlagPrefix, "read-request-interval"), - Usage: "Time between read requests.", - Required: false, - Value: time.Second / 5, - EnvVar: common.PrefixEnvVar(envPrefix, "READ_REQUEST_INTERVAL"), - } - RequiredDownloadsFlag = cli.Float64Flag{ - Name: common.PrefixFlag(FlagPrefix, "required-downloads"), - Usage: "Number of required downloads. Numbers between 0.0 and 1.0 are treated as probabilities, " + - "numbers greater than 1.0 are treated as the number of downloads. -1 allows unlimited downloads.", - Required: false, - Value: 3.0, - EnvVar: common.PrefixEnvVar(envPrefix, "REQUIRED_DOWNLOADS"), - } - FetchBatchHeaderTimeoutFlag = cli.DurationFlag{ - Name: common.PrefixFlag(FlagPrefix, "fetch-batch-header-timeout"), - Usage: "Amount of time to wait for a batch header to be fetched.", - Required: false, - Value: 5 * time.Second, - EnvVar: common.PrefixEnvVar(envPrefix, "FETCH_BATCH_HEADER_TIMEOUT"), - } - RetrieveBlobChunksTimeoutFlag = cli.DurationFlag{ - Name: common.PrefixFlag(FlagPrefix, "retrieve-blob-chunks-timeout"), - Usage: "Amount of time to wait for a blob to be retrieved.", - Required: false, - Value: 5 * time.Second, - EnvVar: common.PrefixEnvVar(envPrefix, "RETRIEVE_BLOB_CHUNKS_TIMEOUT"), + RuntimeConfigPathFlag = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "runtime-config-path"), + Usage: "Path to the runtime configuration file that defines writer groups.", + Required: true, + EnvVar: common.PrefixEnvVar(envPrefix, "RUNTIME_CONFIG_PATH"), } ) var requiredFlags = []cli.Flag{ HostnameFlag, GrpcPortFlag, - TheGraphUrlFlag, + RuntimeConfigPathFlag, } var optionalFlags = []cli.Flag{ TimeoutFlag, - UniformBlobsFlag, - InstanceLaunchIntervalFlag, UseSecureGrpcFlag, SignerPrivateKeyFlag, - CustomQuorumNumbersFlag, - NumWriteInstancesFlag, - WriteRequestIntervalFlag, - DataSizeFlag, - NumReadInstancesFlag, - ReadRequestIntervalFlag, - RequiredDownloadsFlag, DisableTLSFlag, MetricsHTTPPortFlag, - TheGraphPullIntervalFlag, - TheGraphRetriesFlag, - VerifierIntervalFlag, NodeClientTimeoutFlag, - FetchBatchHeaderTimeoutFlag, - RetrieveBlobChunksTimeoutFlag, - GetBlobStatusTimeoutFlag, - WriteTimeoutFlag, - VerificationChannelCapacityFlag, - MetricsBlacklistFlag, - MetricsFuzzyBlacklistFlag, } // Flags contains the list of configuration options available to the binary. @@ -261,10 +94,6 @@ var Flags []cli.Flag func init() { Flags = append(requiredFlags, optionalFlags...) - Flags = append(Flags, flags.RetrieverFlags(envPrefix)...) - Flags = append(Flags, kzg.CLIFlags(envPrefix)...) Flags = append(Flags, common.LoggerCLIFlags(envPrefix, FlagPrefix)...) - Flags = append(Flags, geth.EthClientFlags(envPrefix)...) Flags = append(Flags, indexer.CLIFlags(envPrefix)...) - Flags = append(Flags, thegraph.CLIFlags(envPrefix)...) } diff --git a/tools/traffic/config/runtime_config.go b/tools/traffic/config/runtime_config.go new file mode 100644 index 000000000..759976b04 --- /dev/null +++ b/tools/traffic/config/runtime_config.go @@ -0,0 +1,155 @@ +package config + +import ( + "context" + "fmt" + "os" + "sync" + "time" + + "gopkg.in/yaml.v2" +) + +// RuntimeConfig represents the configuration that can be modified while the traffic generator is running +type RuntimeConfig struct { + // WriterGroups defines different groups of writers with their own configurations + WriterGroups []WriterGroupConfig `yaml:"writer_groups"` +} + +// WriterGroupConfig represents the configuration for a group of writers with the same settings +type WriterGroupConfig struct { + // Name of the writer group for identification + Name string `yaml:"name"` + + // The number of worker threads that generate write traffic. + NumWriteInstances uint `yaml:"num_write_instances"` + + // The period of the submission rate of new blobs for each write worker thread. + WriteRequestInterval time.Duration `yaml:"write_request_interval"` + + // The Size of each blob dispersed, in bytes. + DataSize uint64 `yaml:"data_size"` + + // If true, then each blob will contain unique random data. If false, the same random data + // will be dispersed for each blob by a particular worker thread. + RandomizeBlobs bool `yaml:"randomize_blobs"` + + // The amount of time to wait for a blob to be written. + WriteTimeout time.Duration `yaml:"write_timeout"` + + // Custom quorum numbers to use for the traffic generator. + CustomQuorums []uint8 `yaml:"custom_quorums"` +} + +// RuntimeConfigManager handles loading and watching of runtime configuration +type RuntimeConfigManager struct { + sync.RWMutex + currentConfig *RuntimeConfig + configPath string + onChange func(*RuntimeConfig) +} + +// NewRuntimeConfigManager creates a new runtime config manager +func NewRuntimeConfigManager(configPath string, onChange func(*RuntimeConfig)) (*RuntimeConfigManager, error) { + manager := &RuntimeConfigManager{ + configPath: configPath, + onChange: onChange, + } + + // Load initial config + if err := manager.loadConfig(); err != nil { + return nil, err + } + + return manager, nil +} + +// GetConfig returns the current runtime configuration +func (m *RuntimeConfigManager) GetConfig() *RuntimeConfig { + m.RLock() + defer m.RUnlock() + return m.currentConfig +} + +// loadConfig loads the configuration from disk +func (m *RuntimeConfigManager) loadConfig() error { + data, err := os.ReadFile(m.configPath) + if err != nil { + return fmt.Errorf("failed to read config file: %w", err) + } + + var config RuntimeConfig + if err := yaml.Unmarshal(data, &config); err != nil { + return fmt.Errorf("failed to parse config file: %w", err) + } + + // Validate each writer group if any exist + for i, group := range config.WriterGroups { + if group.Name == "" { + return fmt.Errorf("writer group at index %d must have a name", i) + } + if group.NumWriteInstances == 0 { + return fmt.Errorf("writer group '%s' must have at least one writer instance", group.Name) + } + if group.WriteRequestInterval == 0 { + return fmt.Errorf("writer group '%s' must have a non-zero write request interval", group.Name) + } + if group.DataSize == 0 { + return fmt.Errorf("writer group '%s' must have a non-zero data size", group.Name) + } + if group.WriteTimeout == 0 { + return fmt.Errorf("writer group '%s' must have a non-zero write timeout", group.Name) + } + if len(group.CustomQuorums) == 0 { + return fmt.Errorf("writer group '%s' must have at least one custom quorum", group.Name) + } + } + + m.Lock() + defer m.Unlock() + + // Check if config has actually changed + if m.currentConfig != nil { + // Convert both configs to YAML for comparison + currentYAML, err := yaml.Marshal(m.currentConfig) + if err != nil { + return fmt.Errorf("failed to marshal current config: %w", err) + } + newYAML, err := yaml.Marshal(&config) + if err != nil { + return fmt.Errorf("failed to marshal new config: %w", err) + } + + if string(currentYAML) == string(newYAML) { + // No changes, skip update + return nil + } + } + + m.currentConfig = &config + + if m.onChange != nil { + m.onChange(&config) + } + + return nil +} + +// StartWatching begins watching the config file for changes +func (m *RuntimeConfigManager) StartWatching(ctx context.Context) { + ticker := time.NewTicker(5 * time.Second) + go func() { + for { + select { + case <-ctx.Done(): + ticker.Stop() + return + case <-ticker.C: + if err := m.loadConfig(); err != nil { + // Just log the error and continue + fmt.Printf("Error reloading config: %v\n", err) + } + } + } + }() +} diff --git a/tools/traffic/config/worker_config.go b/tools/traffic/config/worker_config.go deleted file mode 100644 index 78eb7a6fc..000000000 --- a/tools/traffic/config/worker_config.go +++ /dev/null @@ -1,53 +0,0 @@ -package config - -import "time" - -// WorkerConfig configures the traffic generator workers. -type WorkerConfig struct { - // The number of worker threads that generate write traffic. - NumWriteInstances uint - // The period of the submission rate of new blobs for each write worker thread. - WriteRequestInterval time.Duration - // The Size of each blob dispersed, in bytes. - DataSize uint64 - // If true, then each blob will contain unique random data. If false, the same random data - // will be dispersed for each blob by a particular worker thread. - RandomizeBlobs bool - // The amount of time to wait for a blob to be written. - WriteTimeout time.Duration - - // The amount of time between attempts by the status tracker to confirm the status of blobs. - TrackerInterval time.Duration - // The amount of time to wait for a blob status to be fetched. - GetBlobStatusTimeout time.Duration - // The size of the channel used to communicate between the writer and status tracker. - StatusTrackerChannelCapacity uint - - // The number of worker threads that generate read traffic. - NumReadInstances uint - // The period of the submission rate of read requests for each read worker thread. - ReadRequestInterval time.Duration - // For each blob, how many times should it be downloaded? If between 0.0 and 1.0, blob will be downloaded - // 0 or 1 times with the specified probability (e.g. 0.2 means each blob has a 20% chance of being downloaded). - // If greater than 1.0, then each blob will be downloaded the specified number of times. - RequiredDownloads float64 - // The amount of time to wait for a batch header to be fetched. - FetchBatchHeaderTimeout time.Duration - // The amount of time to wait for a blob to be retrieved. - RetrieveBlobChunksTimeout time.Duration - - // The address of the EigenDA service manager smart contract, in hex. - EigenDAServiceManager string - // The private key to use for signing requests. - SignerPrivateKey string - // Custom quorum numbers to use for the traffic generator. - CustomQuorums []uint8 - - // Any metric with a label exactly matching one of the strings in this list will not be sent to the metrics server. - MetricsBlacklist []string - - // Any metric that contains any string in this list will not be sent to the metrics server. For example, - // including the string "_returned_chunk" will cause all metrics in the form of - // "operator_fb390a64122db3957fb220c3c42d5f71e97ab0c995da4e1e5cc3261602dac527_returned_chunk" to be omitted. - MetricsFuzzyBlacklist []string -} diff --git a/tools/traffic/config/writer_config.go b/tools/traffic/config/writer_config.go new file mode 100644 index 000000000..fde5f2102 --- /dev/null +++ b/tools/traffic/config/writer_config.go @@ -0,0 +1,25 @@ +package config + +import "time" + +// BlobWriterConfig configures the blob writer. +type BlobWriterConfig struct { + // The number of worker threads that generate write traffic. + NumWriteInstances uint + + // The period of the submission rate of new blobs for each write worker thread. + WriteRequestInterval time.Duration + + // The Size of each blob dispersed, in bytes. + DataSize uint64 + + // If true, then each blob will contain unique random data. If false, the same random data + // will be dispersed for each blob by a particular worker thread. + RandomizeBlobs bool + + // The amount of time to wait for a blob to be written. + WriteTimeout time.Duration + + // Custom quorum numbers to use for the traffic generator. + CustomQuorums []uint8 +} diff --git a/tools/traffic/generator_v2.go b/tools/traffic/generator_v2.go index beec5e393..5772a3502 100644 --- a/tools/traffic/generator_v2.go +++ b/tools/traffic/generator_v2.go @@ -3,29 +3,18 @@ package traffic import ( "context" "fmt" - "os" - "os/signal" "sync" - "syscall" "time" - "github.com/Layr-Labs/eigenda/common/geth" - "github.com/Layr-Labs/eigenda/core/auth" - "github.com/Layr-Labs/eigenda/core/eth" - "github.com/Layr-Labs/eigenda/core/thegraph" - "github.com/Layr-Labs/eigenda/encoding/kzg/verifier" - retrivereth "github.com/Layr-Labs/eigenda/retriever/eth" + clientsv2 "github.com/Layr-Labs/eigenda/api/clients/v2" + "github.com/Layr-Labs/eigenda/common" + auth "github.com/Layr-Labs/eigenda/core/auth/v2" "github.com/Layr-Labs/eigenda/tools/traffic/config" + trafficconfig "github.com/Layr-Labs/eigenda/tools/traffic/config" "github.com/Layr-Labs/eigenda/tools/traffic/metrics" - "github.com/Layr-Labs/eigenda/tools/traffic/table" "github.com/Layr-Labs/eigenda/tools/traffic/workers" "github.com/Layr-Labs/eigensdk-go/logging" gethcommon "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" - - "github.com/Layr-Labs/eigenda/api/clients" - "github.com/Layr-Labs/eigenda/common" - "github.com/Layr-Labs/eigenda/core" ) // Generator simulates read/write traffic to the DA service. @@ -37,25 +26,29 @@ import ( // └------------┘ └------------┘ // // The traffic generator is built from three principal components: one or more writers -// that write blobs, a statusTracker that polls the dispenser service until blobs are confirmed, +// that write blobs, a statusTracker that polls the disperser service until blobs are confirmed, // and one or more readers that read blobs. // // When a writer finishes writing a blob, it sends information about that blob to the statusTracker. // When the statusTracker observes that a blob has been confirmed, it sends information about the blob // to the readers. The readers only attempt to read blobs that have been confirmed by the statusTracker. +type WriterGroup struct { + name string + writers []*workers.BlobWriter + cancels map[*workers.BlobWriter]context.CancelFunc +} + type Generator struct { ctx *context.Context cancel *context.CancelFunc waitGroup *sync.WaitGroup generatorMetrics metrics.Metrics - logger *logging.Logger - disperserClient clients.DisperserClient - eigenDAClient *clients.EigenDAClient + logger logging.Logger + disperserClient clientsv2.DisperserClient config *config.Config - - writers []*workers.BlobWriter - statusTracker *workers.BlobStatusTracker - readers []*workers.BlobReader + writerGroups map[string]*WriterGroup + configManager *config.RuntimeConfigManager + mu sync.RWMutex } func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { @@ -64,15 +57,23 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { return nil, err } - var signer core.BlobRequestSigner - if config.EigenDAClientConfig.SignerPrivateKeyHex != "" { - signer = auth.NewLocalBlobRequestSigner(config.EigenDAClientConfig.SignerPrivateKeyHex) + var signer *auth.LocalBlobRequestSigner + if config.SignerPrivateKey != "" { + signer = auth.NewLocalBlobRequestSigner(config.SignerPrivateKey) + } else { + logger.Error("signer private key is required") + return nil, fmt.Errorf("signer private key is required") } - logger2 := log.NewLogger(log.NewTerminalHandler(os.Stderr, true)) - client, err := clients.NewEigenDAClient(logger2, *config.EigenDAClientConfig) + signerAccountId, err := signer.GetAccountID() if err != nil { - return nil, err + return nil, fmt.Errorf("error getting account ID: %w", err) + } + accountId := gethcommon.HexToAddress(signerAccountId) + logger.Info("Initializing traffic generator", "accountId", accountId) + + if config.RuntimeConfigPath == "" { + return nil, fmt.Errorf("runtime config path is required") } ctx, cancel := context.WithCancel(context.Background()) @@ -81,148 +82,167 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { generatorMetrics := metrics.NewMetrics( config.MetricsHTTPPort, logger, - config.WorkerConfig.MetricsBlacklist, - config.WorkerConfig.MetricsFuzzyBlacklist) - - blobTable := table.NewBlobStore() + ) - unconfirmedKeyChannel := make(chan *workers.UnconfirmedKey, 100) - - // TODO: create a dedicated reservation for traffic generator - disperserClient, err := clients.NewDisperserClient(config.DisperserClientConfig, signer) + disperserClient, err := clientsv2.NewDisperserClient(config.DisperserClientConfig, signer, nil, nil) if err != nil { cancel() return nil, fmt.Errorf("new disperser-client: %w", err) } - statusVerifier := workers.NewBlobStatusTracker( - &ctx, - &waitGroup, - logger, - &config.WorkerConfig, - unconfirmedKeyChannel, - blobTable, - disperserClient, - generatorMetrics) - - writers := make([]*workers.BlobWriter, 0) - for i := 0; i < int(config.WorkerConfig.NumWriteInstances); i++ { - writer := workers.NewBlobWriter( - &ctx, - &waitGroup, - logger, - &config.WorkerConfig, - disperserClient, - unconfirmedKeyChannel, - generatorMetrics) - writers = append(writers, &writer) - } - - retriever, chainClient := buildRetriever(config) - - readers := make([]*workers.BlobReader, 0) - for i := 0; i < int(config.WorkerConfig.NumReadInstances); i++ { - reader := workers.NewBlobReader( - &ctx, - &waitGroup, - logger, - &config.WorkerConfig, - retriever, - chainClient, - blobTable, - generatorMetrics) - readers = append(readers, &reader) - } - return &Generator{ + generator := &Generator{ ctx: &ctx, cancel: &cancel, waitGroup: &waitGroup, generatorMetrics: generatorMetrics, - logger: &logger, + logger: logger, disperserClient: disperserClient, - eigenDAClient: client, config: config, - writers: writers, - statusTracker: &statusVerifier, - readers: readers, - }, nil -} - -// buildRetriever creates a retriever client for the traffic generator. -func buildRetriever(config *config.Config) (clients.RetrievalClient, retrivereth.ChainClient) { - loggerConfig := common.DefaultLoggerConfig() - - logger, err := common.NewLogger(loggerConfig) - if err != nil { - panic(fmt.Sprintf("Unable to instantiate logger: %s", err)) - } - - gethClient, err := geth.NewMultiHomingClient(config.RetrievalClientConfig.EthClientConfig, gethcommon.Address{}, logger) - if err != nil { - panic(fmt.Sprintf("Unable to instantiate geth client: %s", err)) + writerGroups: make(map[string]*WriterGroup), } - tx, err := eth.NewReader( - logger, - gethClient, - config.RetrievalClientConfig.BLSOperatorStateRetrieverAddr, - config.RetrievalClientConfig.EigenDAServiceManagerAddr) + // Initialize runtime config manager + configManager, err := trafficconfig.NewRuntimeConfigManager(config.RuntimeConfigPath, generator.handleConfigUpdate) if err != nil { - panic(fmt.Sprintf("Unable to instantiate transactor: %s", err)) + cancel() + return nil, fmt.Errorf("failed to initialize runtime config manager: %w", err) } + generator.configManager = configManager - cs := eth.NewChainState(tx, gethClient) + return generator, nil +} - chainState := thegraph.MakeIndexedChainState(*config.TheGraphConfig, cs, logger) +// handleConfigUpdate is called when the runtime configuration changes +func (generator *Generator) handleConfigUpdate(runtimeConfig *trafficconfig.RuntimeConfig) { + generator.mu.Lock() + defer generator.mu.Unlock() - var assignmentCoordinator core.AssignmentCoordinator = &core.StdAssignmentCoordinator{} + generator.logger.Info("Received runtime configuration update") - nodeClient := clients.NewNodeClient(config.NodeClientTimeout) + // Track existing groups to identify which ones to remove + existingGroups := make(map[string]bool) + for name := range generator.writerGroups { + existingGroups[name] = true + } - config.RetrievalClientConfig.EncoderConfig.LoadG2Points = true - v, err := verifier.NewVerifier(&config.RetrievalClientConfig.EncoderConfig, nil) - if err != nil { - panic(fmt.Sprintf("Unable to build statusTracker: %s", err)) + // Update or create writer groups + for _, groupConfig := range runtimeConfig.WriterGroups { + delete(existingGroups, groupConfig.Name) + + writerConfig := &trafficconfig.BlobWriterConfig{ + NumWriteInstances: groupConfig.NumWriteInstances, + WriteRequestInterval: groupConfig.WriteRequestInterval, + DataSize: groupConfig.DataSize, + RandomizeBlobs: groupConfig.RandomizeBlobs, + WriteTimeout: groupConfig.WriteTimeout, + CustomQuorums: groupConfig.CustomQuorums, + } + + group, exists := generator.writerGroups[groupConfig.Name] + if !exists { + group = &WriterGroup{ + name: groupConfig.Name, + writers: make([]*workers.BlobWriter, 0), + cancels: make(map[*workers.BlobWriter]context.CancelFunc), + } + generator.writerGroups[groupConfig.Name] = group + } + + // Update writer count + currentWriters := len(group.writers) + targetWriters := int(groupConfig.NumWriteInstances) + + // Scale down if needed + if targetWriters < currentWriters { + for i := targetWriters; i < currentWriters; i++ { + if cancel, exists := group.cancels[group.writers[i]]; exists { + cancel() + delete(group.cancels, group.writers[i]) + } + } + group.writers = group.writers[:targetWriters] + } + + // Scale up if needed + if targetWriters > currentWriters { + for i := currentWriters; i < targetWriters; i++ { + writerCtx, writerCancel := context.WithCancel(*generator.ctx) + writer := workers.NewBlobWriter( + groupConfig.Name, + &writerCtx, + writerConfig, + generator.waitGroup, + generator.logger, + generator.disperserClient, + generator.generatorMetrics) + group.writers = append(group.writers, &writer) + group.cancels[&writer] = writerCancel + writer.Start() + } + } + + // Update configuration for existing writers + for _, writer := range group.writers[:min(currentWriters, targetWriters)] { + writer.UpdateConfig(writerConfig) + } } - retriever, err := clients.NewRetrievalClient( - logger, - chainState, - assignmentCoordinator, - nodeClient, - v, - config.RetrievalClientConfig.NumConnections) + // Remove any groups that are no longer in the config + for name := range existingGroups { + group := generator.writerGroups[name] + for _, writer := range group.writers { + if cancel, exists := group.cancels[writer]; exists { + cancel() + } + } + delete(generator.writerGroups, name) + } +} - if err != nil { - panic(fmt.Sprintf("Unable to build retriever: %s", err)) +// Start instantiates goroutines that generate read/write traffic. +func (generator *Generator) Start() error { + // Start metrics server + if err := generator.generatorMetrics.Start(); err != nil { + return fmt.Errorf("failed to start metrics server: %w", err) } - chainClient := retrivereth.NewChainClient(gethClient, logger) + // Start runtime config watcher if configured + if generator.configManager != nil { + generator.configManager.StartWatching(*generator.ctx) + } - return retriever, chainClient + // Wait for context cancellation to keep the process running + <-(*generator.ctx).Done() + generator.logger.Info("Generator received stop signal") + return nil } -// Start instantiates goroutines that generate read/write traffic, continues until a SIGTERM is observed. -func (generator *Generator) Start() error { +func (generator *Generator) Stop() error { + // Cancel context to stop all workers + (*generator.cancel)() - generator.generatorMetrics.Start() - generator.statusTracker.Start() + // Set a timeout for graceful shutdown + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer shutdownCancel() - for _, writer := range generator.writers { - writer.Start() - time.Sleep(generator.config.InstanceLaunchInterval) + // Shutdown metrics server + if err := generator.generatorMetrics.Shutdown(); err != nil { + generator.logger.Error("Failed to shutdown metrics server", "err", err) } - for _, reader := range generator.readers { - reader.Start() - time.Sleep(generator.config.InstanceLaunchInterval) + // Wait for all workers with timeout + done := make(chan struct{}) + go func() { + generator.waitGroup.Wait() + close(done) + }() + + select { + case <-done: + generator.logger.Info("All workers shut down gracefully") + return nil + case <-shutdownCtx.Done(): + generator.logger.Warn("Shutdown timed out, forcing exit") + return fmt.Errorf("shutdown timed out after 10 seconds") } - - signals := make(chan os.Signal, 1) - signal.Notify(signals, os.Interrupt, syscall.SIGTERM) - <-signals - - (*generator.cancel)() - generator.waitGroup.Wait() - return nil } diff --git a/tools/traffic/metrics/count_metric.go b/tools/traffic/metrics/count_metric.go index daa508bb8..114a69c6e 100644 --- a/tools/traffic/metrics/count_metric.go +++ b/tools/traffic/metrics/count_metric.go @@ -14,15 +14,10 @@ type CountMetric interface { type countMetric struct { metrics *metrics description string - // disabled specifies whether the metrics should behave as a no-op - disabled bool } // Increment increments the count of a type of event. func (metric *countMetric) Increment() { - if metric.disabled { - return - } metric.metrics.count.WithLabelValues(metric.description).Inc() } diff --git a/tools/traffic/metrics/metrics.go b/tools/traffic/metrics/metrics.go index e24a52dfc..2aa5c8160 100644 --- a/tools/traffic/metrics/metrics.go +++ b/tools/traffic/metrics/metrics.go @@ -1,19 +1,23 @@ package metrics import ( + "context" "fmt" + "net/http" + "time" + "github.com/Layr-Labs/eigensdk-go/logging" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promhttp" - "net/http" - "strings" ) // Metrics allows the creation of metrics for the traffic generator. type Metrics interface { // Start starts the metrics server. - Start() + Start() error + // Shutdown shuts down the metrics server. + Shutdown() error // NewLatencyMetric creates a new LatencyMetric instance. Useful for reporting the latency of an operation. NewLatencyMetric(description string) LatencyMetric // NewCountMetric creates a new CountMetric instance. Useful for tracking the count of a type of event. @@ -33,55 +37,73 @@ type metrics struct { httpPort string logger logging.Logger - metricsBlacklist []string - metricsFuzzyBlacklist []string + shutdown func() error } // NewMetrics creates a new Metrics instance. func NewMetrics( httpPort string, logger logging.Logger, - metricsBlacklist []string, - metricsFuzzyBlacklist []string) Metrics { +) Metrics { namespace := "eigenda_generator" reg := prometheus.NewRegistry() reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) reg.MustRegister(collectors.NewGoCollector()) - if metricsBlacklist == nil { - metricsBlacklist = []string{} - } - if metricsFuzzyBlacklist == nil { - metricsFuzzyBlacklist = []string{} - } - metrics := &metrics{ - count: buildCounterCollector(namespace, reg), - latency: buildLatencyCollector(namespace, reg), - gauge: buildGaugeCollector(namespace, reg), - registry: reg, - httpPort: httpPort, - logger: logger.With("component", "GeneratorMetrics"), - metricsBlacklist: metricsBlacklist, - metricsFuzzyBlacklist: metricsFuzzyBlacklist, + count: buildCounterCollector(namespace, reg), + latency: buildLatencyCollector(namespace, reg), + gauge: buildGaugeCollector(namespace, reg), + registry: reg, + httpPort: httpPort, + logger: logger.With("component", "GeneratorMetrics"), } return metrics } -// Start starts the metrics server. -func (metrics *metrics) Start() { - metrics.logger.Info("Starting metrics server at ", "port", metrics.httpPort) +func (metrics *metrics) Start() error { + metrics.logger.Info("Starting metrics server", "port", metrics.httpPort) addr := fmt.Sprintf(":%s", metrics.httpPort) + // Create mux and add /metrics handler + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.HandlerFor( + metrics.registry, + promhttp.HandlerOpts{}, + )) + + srv := &http.Server{ + Addr: addr, + Handler: mux, + } + go func() { - mux := http.NewServeMux() - mux.Handle("/metrics", promhttp.HandlerFor( - metrics.registry, - promhttp.HandlerOpts{}, - )) - err := http.ListenAndServe(addr, mux) - panic(fmt.Sprintf("Prometheus server failed: %s", err)) + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + metrics.logger.Error("Prometheus server failed", "err", err) + } }() + + // Store shutdown function + metrics.shutdown = func() error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + metrics.logger.Info("Shutting down metrics server") + if err := srv.Shutdown(ctx); err != nil { + metrics.logger.Error("Metrics server shutdown failed", "err", err) + return err + } + return nil + } + + return nil +} + +func (metrics *metrics) Shutdown() error { + if metrics.shutdown != nil { + return metrics.shutdown() + } + return nil } // NewLatencyMetric creates a new LatencyMetric instance. @@ -89,7 +111,6 @@ func (metrics *metrics) NewLatencyMetric(description string) LatencyMetric { return &latencyMetric{ metrics: metrics, description: description, - disabled: metrics.isBlacklisted(description), } } @@ -98,7 +119,6 @@ func (metrics *metrics) NewCountMetric(description string) CountMetric { return &countMetric{ metrics: metrics, description: description, - disabled: metrics.isBlacklisted(description), } } @@ -107,21 +127,5 @@ func (metrics *metrics) NewGaugeMetric(description string) GaugeMetric { return &gaugeMetric{ metrics: metrics, description: description, - disabled: metrics.isBlacklisted(description), - } -} - -// isBlacklisted returns true if the metric name is blacklisted. -func (metrics *metrics) isBlacklisted(metricName string) bool { - for _, blacklisted := range metrics.metricsBlacklist { - if metricName == blacklisted { - return true - } - } - for _, blacklisted := range metrics.metricsFuzzyBlacklist { - if strings.Contains(metricName, blacklisted) { - return true - } } - return false } diff --git a/tools/traffic/metrics/mock_metrics.go b/tools/traffic/metrics/mock_metrics.go index 88eb11447..fb4018a61 100644 --- a/tools/traffic/metrics/mock_metrics.go +++ b/tools/traffic/metrics/mock_metrics.go @@ -48,8 +48,12 @@ func (m *MockMetrics) GetLatency(description string) time.Duration { return m.latencies[description] } -func (m *MockMetrics) Start() { - // intentional no-op +func (m *MockMetrics) Start() error { + return nil +} + +func (m *MockMetrics) Shutdown() error { + return nil } func (m *MockMetrics) NewLatencyMetric(description string) LatencyMetric { diff --git a/tools/traffic/table/blob_metadata.go b/tools/traffic/table/blob_metadata.go index 40a0b415e..24ee9dc81 100644 --- a/tools/traffic/table/blob_metadata.go +++ b/tools/traffic/table/blob_metadata.go @@ -1,6 +1,10 @@ package table -import "errors" +import ( + "errors" + + corev2 "github.com/Layr-Labs/eigenda/core/v2" +) // BlobMetadata encapsulates various information about a blob written by the traffic generator. type BlobMetadata struct { @@ -13,6 +17,9 @@ type BlobMetadata struct { // Hash of the batch header that the blob was written in. BatchHeaderHash [32]byte + // Blob header of the blob. + BlobHeader *corev2.BlobHeader + // Checksum of the blob. Checksum [16]byte diff --git a/tools/traffic/table/blob_store_test.go b/tools/traffic/table/blob_store_test.go index 99c9c91a0..37992675b 100644 --- a/tools/traffic/table/blob_store_test.go +++ b/tools/traffic/table/blob_store_test.go @@ -1,10 +1,11 @@ package table import ( + "testing" + tu "github.com/Layr-Labs/eigenda/common/testutils" "github.com/stretchr/testify/assert" "golang.org/x/exp/rand" - "testing" ) // randomMetadata generates a random BlobMetadata instance. diff --git a/tools/traffic/workers/blob_reader.go b/tools/traffic/workers/blob_reader.go deleted file mode 100644 index 033eb9ee6..000000000 --- a/tools/traffic/workers/blob_reader.go +++ /dev/null @@ -1,233 +0,0 @@ -package workers - -import ( - "context" - "crypto/md5" - "fmt" - "github.com/Layr-Labs/eigenda/api/clients" - "github.com/Layr-Labs/eigenda/core" - "github.com/Layr-Labs/eigenda/encoding" - "github.com/Layr-Labs/eigenda/retriever/eth" - "github.com/Layr-Labs/eigenda/tools/traffic/config" - "github.com/Layr-Labs/eigenda/tools/traffic/metrics" - "github.com/Layr-Labs/eigenda/tools/traffic/table" - "github.com/Layr-Labs/eigensdk-go/logging" - gcommon "github.com/ethereum/go-ethereum/common" - "math/big" - "sync" - "time" -) - -// BlobReader reads blobs from the DA network at a configured rate. -type BlobReader struct { - // The context for the generator. All work should cease when this context is cancelled. - ctx *context.Context - - // Tracks the number of active goroutines within the generator. - waitGroup *sync.WaitGroup - - // All logs should be written using this logger. - logger logging.Logger - - // config contains the configuration for the generator. - config *config.WorkerConfig - - retriever clients.RetrievalClient - chainClient eth.ChainClient - - // blobsToRead blobs we are required to read a certain number of times. - blobsToRead *table.BlobStore - - // metrics for the blob reader. - metrics *blobReaderMetrics -} - -type blobReaderMetrics struct { - generatorMetrics metrics.Metrics - fetchBatchHeaderMetric metrics.LatencyMetric - fetchBatchHeaderSuccess metrics.CountMetric - fetchBatchHeaderFailure metrics.CountMetric - readLatencyMetric metrics.LatencyMetric - readSuccessMetric metrics.CountMetric - readFailureMetric metrics.CountMetric - recombinationSuccessMetric metrics.CountMetric - recombinationFailureMetric metrics.CountMetric - validBlobMetric metrics.CountMetric - invalidBlobMetric metrics.CountMetric - operatorSuccessMetrics map[core.OperatorID]metrics.CountMetric - operatorFailureMetrics map[core.OperatorID]metrics.CountMetric - requiredReadPoolSizeMetric metrics.GaugeMetric - optionalReadPoolSizeMetric metrics.GaugeMetric -} - -// NewBlobReader creates a new BlobReader instance. -func NewBlobReader( - ctx *context.Context, - waitGroup *sync.WaitGroup, - logger logging.Logger, - config *config.WorkerConfig, - retriever clients.RetrievalClient, - chainClient eth.ChainClient, - blobStore *table.BlobStore, - generatorMetrics metrics.Metrics) BlobReader { - - return BlobReader{ - ctx: ctx, - waitGroup: waitGroup, - logger: logger, - config: config, - retriever: retriever, - chainClient: chainClient, - blobsToRead: blobStore, - metrics: &blobReaderMetrics{ - generatorMetrics: generatorMetrics, - fetchBatchHeaderMetric: generatorMetrics.NewLatencyMetric("fetch_batch_header"), - fetchBatchHeaderSuccess: generatorMetrics.NewCountMetric("fetch_batch_header_success"), - fetchBatchHeaderFailure: generatorMetrics.NewCountMetric("fetch_batch_header_failure"), - recombinationSuccessMetric: generatorMetrics.NewCountMetric("recombination_success"), - recombinationFailureMetric: generatorMetrics.NewCountMetric("recombination_failure"), - readLatencyMetric: generatorMetrics.NewLatencyMetric("read"), - validBlobMetric: generatorMetrics.NewCountMetric("valid_blob"), - invalidBlobMetric: generatorMetrics.NewCountMetric("invalid_blob"), - readSuccessMetric: generatorMetrics.NewCountMetric("read_success"), - readFailureMetric: generatorMetrics.NewCountMetric("read_failure"), - operatorSuccessMetrics: make(map[core.OperatorID]metrics.CountMetric), - operatorFailureMetrics: make(map[core.OperatorID]metrics.CountMetric), - requiredReadPoolSizeMetric: generatorMetrics.NewGaugeMetric("required_read_pool_size"), - optionalReadPoolSizeMetric: generatorMetrics.NewGaugeMetric("optional_read_pool_size"), - }, - } -} - -// Start begins a blob reader goroutine. -func (r *BlobReader) Start() { - r.waitGroup.Add(1) - ticker := time.NewTicker(r.config.ReadRequestInterval) - go func() { - defer r.waitGroup.Done() - for { - select { - case <-(*r.ctx).Done(): - err := (*r.ctx).Err() - if err != nil { - r.logger.Info("blob reader context closed", "err:", err) - } - return - case <-ticker.C: - r.randomRead() - } - } - }() -} - -// randomRead reads a random blob. -func (r *BlobReader) randomRead() { - metadata := r.blobsToRead.GetNext() - if metadata == nil { - // There are no blobs that we are required to read. - return - } - - r.metrics.requiredReadPoolSizeMetric.Set(float64(r.blobsToRead.Size())) - - ctxTimeout, cancel := context.WithTimeout(*r.ctx, r.config.FetchBatchHeaderTimeout) - defer cancel() - - start := time.Now() - batchHeader, err := r.chainClient.FetchBatchHeader( - ctxTimeout, - gcommon.HexToAddress(r.config.EigenDAServiceManager), - metadata.BatchHeaderHash[:], - big.NewInt(int64(0)), - nil) - if err != nil { - r.logger.Error("failed to get batch header", "err:", err) - r.metrics.fetchBatchHeaderFailure.Increment() - return - } - r.metrics.fetchBatchHeaderMetric.ReportLatency(time.Since(start)) - - r.metrics.fetchBatchHeaderSuccess.Increment() - - ctxTimeout, cancel = context.WithTimeout(*r.ctx, r.config.RetrieveBlobChunksTimeout) - defer cancel() - - start = time.Now() - chunks, err := r.retriever.RetrieveBlobChunks( - ctxTimeout, - metadata.BatchHeaderHash, - uint32(metadata.BlobIndex), - uint(batchHeader.ReferenceBlockNumber), - batchHeader.BlobHeadersRoot, - core.QuorumID(0)) - if err != nil { - r.logger.Error("failed to read chunks", "err:", err) - r.metrics.readFailureMetric.Increment() - return - } - r.metrics.readLatencyMetric.ReportLatency(time.Since(start)) - - r.metrics.readSuccessMetric.Increment() - - assignments := chunks.Assignments - - data, err := r.retriever.CombineChunks(chunks) - if err != nil { - r.logger.Error("failed to combine chunks", "err:", err) - r.metrics.recombinationFailureMetric.Increment() - return - } - r.metrics.recombinationSuccessMetric.Increment() - - r.verifyBlob(metadata, &data) - - indexSet := make(map[encoding.ChunkNumber]bool) - for index := range chunks.Indices { - indexSet[chunks.Indices[index]] = true - } - - for id, assignment := range assignments { - for index := assignment.StartIndex; index < assignment.StartIndex+assignment.NumChunks; index++ { - if indexSet[index] { - r.reportChunk(id) - } else { - r.reportMissingChunk(id) - } - } - } -} - -// reportChunk reports a successful chunk read. -func (r *BlobReader) reportChunk(operatorId core.OperatorID) { - metric, exists := r.metrics.operatorSuccessMetrics[operatorId] - if !exists { - metric = r.metrics.generatorMetrics.NewCountMetric(fmt.Sprintf("operator_%x_returned_chunk", operatorId)) - r.metrics.operatorSuccessMetrics[operatorId] = metric - } - - metric.Increment() -} - -// reportMissingChunk reports a missing chunk. -func (r *BlobReader) reportMissingChunk(operatorId core.OperatorID) { - metric, exists := r.metrics.operatorFailureMetrics[operatorId] - if !exists { - metric = r.metrics.generatorMetrics.NewCountMetric(fmt.Sprintf("operator_%x_witheld_chunk", operatorId)) - r.metrics.operatorFailureMetrics[operatorId] = metric - } - - metric.Increment() -} - -// verifyBlob performs sanity checks on the blob. -func (r *BlobReader) verifyBlob(metadata *table.BlobMetadata, blob *[]byte) { - // Trim off the padding. - truncatedBlob := (*blob)[:metadata.Size] - recomputedChecksum := md5.Sum(truncatedBlob) - - if metadata.Checksum == recomputedChecksum { - r.metrics.validBlobMetric.Increment() - } else { - r.metrics.invalidBlobMetric.Increment() - } -} diff --git a/tools/traffic/workers/blob_reader_test.go b/tools/traffic/workers/blob_reader_test.go deleted file mode 100644 index ea9d5f077..000000000 --- a/tools/traffic/workers/blob_reader_test.go +++ /dev/null @@ -1,151 +0,0 @@ -package workers - -import ( - "context" - "crypto/md5" - "sync" - "testing" - "time" - - "github.com/Layr-Labs/eigenda/api/clients" - apiMock "github.com/Layr-Labs/eigenda/api/clients/mock" - "github.com/Layr-Labs/eigenda/common" - tu "github.com/Layr-Labs/eigenda/common/testutils" - binding "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDAServiceManager" - retrieverMock "github.com/Layr-Labs/eigenda/retriever/mock" - "github.com/Layr-Labs/eigenda/tools/traffic/config" - "github.com/Layr-Labs/eigenda/tools/traffic/metrics" - "github.com/Layr-Labs/eigenda/tools/traffic/table" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "golang.org/x/exp/rand" -) - -// TestBlobReaderNoOptionalReads tests the BlobReader's basic functionality' -func TestBlobReader(t *testing.T) { - tu.InitializeRandom() - - ctx, cancel := context.WithCancel(context.Background()) - waitGroup := sync.WaitGroup{} - logger, err := common.NewLogger(common.DefaultLoggerConfig()) - assert.Nil(t, err) - - blobTable := table.NewBlobStore() - - readerMetrics := metrics.NewMockMetrics() - - chainClient := &retrieverMock.MockChainClient{} - chainClient.On( - "FetchBatchHeader", - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything).Return(&binding.BatchHeader{}, nil) - retrievalClient := &apiMock.MockRetrievalClient{} - - blobReader := NewBlobReader( - &ctx, - &waitGroup, - logger, - &config.WorkerConfig{}, - retrievalClient, - chainClient, - blobTable, - readerMetrics) - - blobSize := 1024 - readPermits := 2 - blobCount := 100 - - invalidBlobCount := 0 - - // Insert some blobs into the table. - for i := 0; i < blobCount; i++ { - - key := make([]byte, 32) - _, err = rand.Read(key) - assert.Nil(t, err) - - blobData := make([]byte, blobSize) - _, err = rand.Read(blobData) - assert.Nil(t, err) - - var checksum [16]byte - if i%10 == 0 { - // Simulate an invalid blob - invalidBlobCount++ - _, err = rand.Read(checksum[:]) - assert.Nil(t, err) - } else { - checksum = md5.Sum(blobData) - } - - batchHeaderHash := [32]byte{} - _, err = rand.Read(batchHeaderHash[:]) - assert.Nil(t, err) - - blobMetadata, err := table.NewBlobMetadata( - key, - checksum, - uint(blobSize), - uint(i), - batchHeaderHash, - readPermits) - assert.Nil(t, err) - - // Simplify tracking by hijacking the BlobHeaderLength field to store the blob index, - // which is used as a unique identifier within this test. - chunks := &clients.BlobChunks{BlobHeaderLength: blobMetadata.BlobIndex} - retrievalClient.On("RetrieveBlobChunks", - blobMetadata.BatchHeaderHash, - uint32(blobMetadata.BlobIndex), - mock.Anything, - mock.Anything, - mock.Anything).Return(chunks, nil) - retrievalClient.On("CombineChunks", chunks).Return(blobData, nil) - - blobTable.Add(blobMetadata) - } - - // Do a bunch of reads. - expectedTotalReads := uint(readPermits * blobCount) - for i := uint(0); i < expectedTotalReads; i++ { - blobReader.randomRead() - - chainClient.AssertNumberOfCalls(t, "FetchBatchHeader", int(i+1)) - retrievalClient.AssertNumberOfCalls(t, "RetrieveBlobChunks", int(i+1)) - retrievalClient.AssertNumberOfCalls(t, "CombineChunks", int(i+1)) - - remainingPermits := uint(0) - for _, metadata := range blobTable.GetAll() { - remainingPermits += uint(metadata.RemainingReadPermits) - } - assert.Equal(t, remainingPermits, expectedTotalReads-i-1) - - assert.Equal(t, i+1, uint(readerMetrics.GetCount("read_success"))) - assert.Equal(t, i+1, uint(readerMetrics.GetCount("fetch_batch_header_success"))) - assert.Equal(t, i+1, uint(readerMetrics.GetCount("recombination_success"))) - } - - expectedInvalidBlobs := uint(invalidBlobCount * readPermits) - expectedValidBlobs := expectedTotalReads - expectedInvalidBlobs - - assert.Equal(t, expectedValidBlobs, uint(readerMetrics.GetCount("valid_blob"))) - assert.Equal(t, expectedInvalidBlobs, uint(readerMetrics.GetCount("invalid_blob"))) - assert.Equal(t, uint(0), uint(readerMetrics.GetGaugeValue("required_read_pool_size"))) - assert.Equal(t, uint(0), uint(readerMetrics.GetGaugeValue("optional_read_pool_size"))) - - // Table is empty, so doing a random read should have no effect. - blobReader.randomRead() - - // Give the system a moment to attempt to do work. This should not result in any reads. - time.Sleep(time.Second / 10) - assert.Equal(t, expectedTotalReads, uint(readerMetrics.GetCount("read_success"))) - assert.Equal(t, expectedTotalReads, uint(readerMetrics.GetCount("fetch_batch_header_success"))) - assert.Equal(t, expectedTotalReads, uint(readerMetrics.GetCount("recombination_success"))) - assert.Equal(t, expectedValidBlobs, uint(readerMetrics.GetCount("valid_blob"))) - assert.Equal(t, expectedInvalidBlobs, uint(readerMetrics.GetCount("invalid_blob"))) - - cancel() -} diff --git a/tools/traffic/workers/blob_status_tracker.go b/tools/traffic/workers/blob_status_tracker.go deleted file mode 100644 index 15bcca84b..000000000 --- a/tools/traffic/workers/blob_status_tracker.go +++ /dev/null @@ -1,256 +0,0 @@ -package workers - -import ( - "context" - "math/rand" - "sync" - "time" - - "github.com/Layr-Labs/eigenda/api/clients" - "github.com/Layr-Labs/eigenda/api/grpc/disperser" - "github.com/Layr-Labs/eigenda/tools/traffic/config" - "github.com/Layr-Labs/eigenda/tools/traffic/metrics" - "github.com/Layr-Labs/eigenda/tools/traffic/table" - "github.com/Layr-Labs/eigensdk-go/logging" -) - -// BlobStatusTracker periodically polls the disperser service to verify the status of blobs that were recently written. -// When blobs become confirmed, the status tracker updates the blob blobsToRead accordingly. -// This is a thread safe data structure. -type BlobStatusTracker struct { - - // The context for the generator. All work should cease when this context is cancelled. - ctx *context.Context - - // Tracks the number of active goroutines within the generator. - waitGroup *sync.WaitGroup - - // All logs should be written using this logger. - logger logging.Logger - - // config contains the configuration for the generator. - config *config.WorkerConfig - - // Contains confirmed blobs. Blobs are added here when they are confirmed by the disperser service. - confirmedBlobs *table.BlobStore - - // The disperser client used to monitor the disperser service. - disperser clients.DisperserClient - - // The keys of blobs that have not yet been confirmed by the disperser service. - unconfirmedBlobs []*UnconfirmedKey - - // Newly added keys that require verification. - keyChannel chan *UnconfirmedKey - - blobsInFlightMetric metrics.GaugeMetric - getStatusLatencyMetric metrics.LatencyMetric - confirmationLatencyMetric metrics.LatencyMetric - getStatusErrorCountMetric metrics.CountMetric - unknownCountMetric metrics.CountMetric - processingCountMetric metrics.CountMetric - dispersingCountMetric metrics.CountMetric - failedCountMetric metrics.CountMetric - insufficientSignaturesCountMetric metrics.CountMetric - confirmedCountMetric metrics.CountMetric - finalizedCountMetric metrics.CountMetric -} - -// NewBlobStatusTracker creates a new BlobStatusTracker instance. -func NewBlobStatusTracker( - ctx *context.Context, - waitGroup *sync.WaitGroup, - logger logging.Logger, - config *config.WorkerConfig, - keyChannel chan *UnconfirmedKey, - table *table.BlobStore, - disperser clients.DisperserClient, - generatorMetrics metrics.Metrics) BlobStatusTracker { - - return BlobStatusTracker{ - ctx: ctx, - waitGroup: waitGroup, - logger: logger, - config: config, - keyChannel: keyChannel, - confirmedBlobs: table, - disperser: disperser, - unconfirmedBlobs: make([]*UnconfirmedKey, 0), - blobsInFlightMetric: generatorMetrics.NewGaugeMetric("blobs_in_flight"), - getStatusLatencyMetric: generatorMetrics.NewLatencyMetric("get_status"), - confirmationLatencyMetric: generatorMetrics.NewLatencyMetric("confirmation"), - getStatusErrorCountMetric: generatorMetrics.NewCountMetric("get_status_ERROR"), - unknownCountMetric: generatorMetrics.NewCountMetric("get_status_UNKNOWN"), - processingCountMetric: generatorMetrics.NewCountMetric("get_status_PROCESSING"), - dispersingCountMetric: generatorMetrics.NewCountMetric("get_status_DISPERSING"), - failedCountMetric: generatorMetrics.NewCountMetric("get_status_FAILED"), - insufficientSignaturesCountMetric: generatorMetrics.NewCountMetric("get_status_INSUFFICIENT_SIGNATURES"), - confirmedCountMetric: generatorMetrics.NewCountMetric("get_status_CONFIRMED"), - finalizedCountMetric: generatorMetrics.NewCountMetric("get_status_FINALIZED"), - } -} - -// Start begins the status goroutine, which periodically polls -// the disperser service to verify the status of blobs. -func (tracker *BlobStatusTracker) Start() { - tracker.waitGroup.Add(1) - go tracker.monitor() -} - -// monitor periodically polls the disperser service to verify the status of blobs. -func (tracker *BlobStatusTracker) monitor() { - ticker := time.NewTicker(tracker.config.TrackerInterval) - for { - select { - case <-(*tracker.ctx).Done(): - tracker.waitGroup.Done() - return - case key := <-tracker.keyChannel: - tracker.unconfirmedBlobs = append(tracker.unconfirmedBlobs, key) - case <-ticker.C: - tracker.poll() - } - } -} - -// poll checks all unconfirmed keys to see if they have been confirmed by the disperser service. -// If a Key is confirmed, it is added to the blob confirmedBlobs and removed from the list of unconfirmed keys. -func (tracker *BlobStatusTracker) poll() { - - // FUTURE WORK If the number of unconfirmed blobs is high and the time to confirm is high, this is not efficient. - // Revisit this method if there are performance problems. - - nonFinalBlobs := make([]*UnconfirmedKey, 0) - for _, key := range tracker.unconfirmedBlobs { - - blobStatus, err := tracker.getBlobStatus(key) - if err != nil { - tracker.logger.Error("failed to get blob status: ", "err:", err) - // There was an error getting status. Try again later. - nonFinalBlobs = append(nonFinalBlobs, key) - continue - } - - tracker.updateStatusMetrics(blobStatus.Status) - if isBlobStatusTerminal(blobStatus.Status) { - if isBlobStatusConfirmed(blobStatus.Status) { - tracker.forwardToReader(key, blobStatus) - } - } else { - // try again later - nonFinalBlobs = append(nonFinalBlobs, key) - } - } - tracker.unconfirmedBlobs = nonFinalBlobs - tracker.blobsInFlightMetric.Set(float64(len(tracker.unconfirmedBlobs))) -} - -// isBlobStatusTerminal returns true if the status is a terminal status. -func isBlobStatusTerminal(status disperser.BlobStatus) bool { - switch status { - case disperser.BlobStatus_FAILED: - return true - case disperser.BlobStatus_INSUFFICIENT_SIGNATURES: - return true - case disperser.BlobStatus_CONFIRMED: - // Technically this isn't terminal, as confirmed blobs eventually should become finalized. - // But it is terminal from the status tracker's perspective, since we stop tracking the blob - // once it becomes either confirmed or finalized. - return true - case disperser.BlobStatus_FINALIZED: - return true - default: - return false - } -} - -// isBlobStatusConfirmed returns true if the status is a confirmed status. -func isBlobStatusConfirmed(status disperser.BlobStatus) bool { - switch status { - case disperser.BlobStatus_CONFIRMED: - return true - case disperser.BlobStatus_FINALIZED: - return true - default: - return false - } -} - -// updateStatusMetrics updates the metrics for the reported status of a blob. -func (tracker *BlobStatusTracker) updateStatusMetrics(status disperser.BlobStatus) { - switch status { - case disperser.BlobStatus_UNKNOWN: - tracker.unknownCountMetric.Increment() - case disperser.BlobStatus_PROCESSING: - tracker.processingCountMetric.Increment() - case disperser.BlobStatus_DISPERSING: - tracker.dispersingCountMetric.Increment() - case disperser.BlobStatus_FAILED: - tracker.failedCountMetric.Increment() - case disperser.BlobStatus_INSUFFICIENT_SIGNATURES: - tracker.insufficientSignaturesCountMetric.Increment() - case disperser.BlobStatus_CONFIRMED: - tracker.confirmedCountMetric.Increment() - case disperser.BlobStatus_FINALIZED: - tracker.finalizedCountMetric.Increment() - default: - tracker.logger.Error("unknown blob status", "status:", status) - } -} - -// getBlobStatus gets the status of a blob from the disperser service. Returns nil if there was an error -// getting the status. -func (tracker *BlobStatusTracker) getBlobStatus(key *UnconfirmedKey) (*disperser.BlobStatusReply, error) { - ctxTimeout, cancel := context.WithTimeout(*tracker.ctx, tracker.config.GetBlobStatusTimeout) - defer cancel() - - start := time.Now() - status, err := tracker.disperser.GetBlobStatus(ctxTimeout, key.Key) - - if err != nil { - tracker.getStatusErrorCountMetric.Increment() - return nil, err - } - tracker.getStatusLatencyMetric.ReportLatency(time.Since(start)) - - return status, nil -} - -// forwardToReader forwards a blob to the reader. Only called once the blob is ready to be read. -func (tracker *BlobStatusTracker) forwardToReader(key *UnconfirmedKey, status *disperser.BlobStatusReply) { - batchHeaderHash := [32]byte(status.GetInfo().BlobVerificationProof.BatchMetadata.BatchHeaderHash) - blobIndex := status.GetInfo().BlobVerificationProof.GetBlobIndex() - - confirmationTime := time.Now() - confirmationLatency := confirmationTime.Sub(key.SubmissionTime) - tracker.confirmationLatencyMetric.ReportLatency(confirmationLatency) - - requiredDownloads := tracker.config.RequiredDownloads - var downloadCount int32 - if requiredDownloads < 0 { - // Allow unlimited downloads. - downloadCount = -1 - } else if requiredDownloads == 0 { - // Do not download blob. - return - } else if requiredDownloads < 1 { - // Download blob with probability equal to requiredDownloads. - if rand.Float64() < requiredDownloads { - // Download the blob once. - downloadCount = 1 - } else { - // Do not download blob. - return - } - } else { - // Download blob requiredDownloads times. - downloadCount = int32(requiredDownloads) - } - - blobMetadata, err := table.NewBlobMetadata(key.Key, key.Checksum, key.Size, uint(blobIndex), batchHeaderHash, int(downloadCount)) - if err != nil { - tracker.logger.Error("failed to create blob metadata", "err:", err) - return - } - tracker.confirmedBlobs.Add(blobMetadata) -} diff --git a/tools/traffic/workers/blob_status_tracker_test.go b/tools/traffic/workers/blob_status_tracker_test.go deleted file mode 100644 index 9246e2052..000000000 --- a/tools/traffic/workers/blob_status_tracker_test.go +++ /dev/null @@ -1,205 +0,0 @@ -package workers - -import ( - "context" - "fmt" - disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser" - "github.com/Layr-Labs/eigenda/common" - tu "github.com/Layr-Labs/eigenda/common/testutils" - "github.com/Layr-Labs/eigenda/tools/traffic/config" - "github.com/Layr-Labs/eigenda/tools/traffic/metrics" - "github.com/Layr-Labs/eigenda/tools/traffic/table" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "golang.org/x/exp/rand" - "sync" - "testing" - "time" -) - -func getRandomStatus() disperser_rpc.BlobStatus { - return disperser_rpc.BlobStatus(rand.Intn(7)) -} - -func isStatusTerminal(status disperser_rpc.BlobStatus) bool { - switch status { - case disperser_rpc.BlobStatus_UNKNOWN: - return false - case disperser_rpc.BlobStatus_PROCESSING: - return false - case disperser_rpc.BlobStatus_DISPERSING: - return false - - case disperser_rpc.BlobStatus_INSUFFICIENT_SIGNATURES: - return true - case disperser_rpc.BlobStatus_FAILED: - return true - case disperser_rpc.BlobStatus_FINALIZED: - return true - case disperser_rpc.BlobStatus_CONFIRMED: - return true - default: - panic("unknown status") - } -} - -func isStatusSuccess(status disperser_rpc.BlobStatus) bool { - switch status { - case disperser_rpc.BlobStatus_CONFIRMED: - return true - case disperser_rpc.BlobStatus_FINALIZED: - return true - default: - return false - } -} - -func TestStatusTracker(t *testing.T) { - tu.InitializeRandom() - - ctx, cancel := context.WithCancel(context.Background()) - waitGroup := sync.WaitGroup{} - logger, err := common.NewLogger(common.DefaultLoggerConfig()) - assert.Nil(t, err) - - requiredDownloads := rand.Intn(10) + 1 - config := &config.WorkerConfig{ - RequiredDownloads: float64(requiredDownloads), - } - - blobStore := table.NewBlobStore() - - trackerMetrics := metrics.NewMockMetrics() - - disperserClient := &MockDisperserClient{} - - tracker := NewBlobStatusTracker( - &ctx, - &waitGroup, - logger, - config, - make(chan *UnconfirmedKey), - blobStore, - disperserClient, - trackerMetrics) - - expectedGetStatusCount := 0 - statusCounts := make(map[disperser_rpc.BlobStatus]int) - checksums := make(map[string][16]byte) - sizes := make(map[string]uint) - - statusMap := make(map[string]disperser_rpc.BlobStatus) - - for i := 0; i < 100; i++ { - - // Add some new keys to track. - newKeys := rand.Intn(10) - for j := 0; j < newKeys; j++ { - key := make([]byte, 16) - checksum := [16]byte{} - size := rand.Uint32() - - _, err = rand.Read(key) - assert.Nil(t, err) - _, err = rand.Read(checksum[:]) - assert.Nil(t, err) - - checksums[string(key)] = checksum - sizes[string(key)] = uint(size) - - stringifiedKey := string(key) - statusMap[stringifiedKey] = disperser_rpc.BlobStatus_UNKNOWN - - unconfirmedKey := &UnconfirmedKey{ - Key: key, - Checksum: checksum, - Size: uint(size), - SubmissionTime: time.Now(), - } - - tracker.unconfirmedBlobs = append(tracker.unconfirmedBlobs, unconfirmedKey) - } - - // Reset the mock disperser client. - disperserClient.mock = mock.Mock{} - expectedGetStatusCount = 0 - - // Choose some new statuses to be returned. - // Count the number of status queries we expect to see in this iteration. - for key, status := range statusMap { - var newStatus disperser_rpc.BlobStatus - if isStatusTerminal(status) { - newStatus = status - } else { - // Blobs in a non-terminal status will be queried again. - expectedGetStatusCount += 1 - // Set the next status to be returned. - newStatus = getRandomStatus() - statusMap[key] = newStatus - - statusCounts[newStatus] += 1 - } - disperserClient.mock.On("GetBlobStatus", []byte(key)).Return( - &disperser_rpc.BlobStatusReply{ - Status: newStatus, - Info: &disperser_rpc.BlobInfo{ - BlobVerificationProof: &disperser_rpc.BlobVerificationProof{ - BatchMetadata: &disperser_rpc.BatchMetadata{ - BatchHeaderHash: make([]byte, 32), - }, - }, - }, - }, nil) - } - - // Simulate advancement of time, allowing the tracker to process the new keys. - tracker.poll() - - // Validate the number of calls made to the disperser client. - disperserClient.mock.AssertNumberOfCalls(t, "GetBlobStatus", expectedGetStatusCount) - - // Read the data in the confirmedBlobs into a map for quick lookup. - tableData := make(map[string]*table.BlobMetadata) - for _, metadata := range blobStore.GetAll() { - tableData[string(metadata.Key)] = metadata - } - - blobsInFlight := 0 - for key, status := range statusMap { - metadata, present := tableData[key] - - if !isStatusTerminal(status) { - blobsInFlight++ - } - - if isStatusSuccess(status) { - // Successful blobs should be in the confirmedBlobs. - assert.True(t, present) - } else { - // Non-successful blobs should not be in the confirmedBlobs. - assert.False(t, present) - } - - // Verify metadata. - if present { - assert.Equal(t, checksums[key], metadata.Checksum) - assert.Equal(t, sizes[key], metadata.Size) - assert.Equal(t, requiredDownloads, metadata.RemainingReadPermits) - } - } - - // Verify metrics. - for status, count := range statusCounts { // TODO - metricName := fmt.Sprintf("get_status_%s", status.String()) - assert.Equal(t, float64(count), trackerMetrics.GetCount(metricName), "status: %s", status.String()) - } - if float64(blobsInFlight) != trackerMetrics.GetGaugeValue("blobs_in_flight") { - assert.Equal(t, float64(blobsInFlight), trackerMetrics.GetGaugeValue("blobs_in_flight")) - } - } - - cancel() - tu.ExecuteWithTimeout(func() { - waitGroup.Wait() - }, time.Second) -} diff --git a/tools/traffic/workers/blob_writer.go b/tools/traffic/workers/blob_writer.go index e0add4b35..03f759cd7 100644 --- a/tools/traffic/workers/blob_writer.go +++ b/tools/traffic/workers/blob_writer.go @@ -2,13 +2,13 @@ package workers import ( "context" - "crypto/md5" "crypto/rand" "fmt" "sync" "time" - "github.com/Layr-Labs/eigenda/api/clients" + "github.com/Layr-Labs/eigenda/api/clients/v2" + v2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/encoding/utils/codec" "github.com/Layr-Labs/eigenda/tools/traffic/config" "github.com/Layr-Labs/eigenda/tools/traffic/metrics" @@ -17,6 +17,12 @@ import ( // BlobWriter sends blobs to a disperser at a configured rate. type BlobWriter struct { + // Name of the writer group this writer belongs to + name string + + // Config contains the configuration for the blob writer. + config *config.BlobWriterConfig + // The context for the generator. All work should cease when this context is cancelled. ctx *context.Context @@ -26,15 +32,9 @@ type BlobWriter struct { // All logs should be written using this logger. logger logging.Logger - // Config contains the configuration for the generator. - config *config.WorkerConfig - // disperser is the client used to send blobs to the disperser. disperser clients.DisperserClient - // Unconfirmed keys are sent here. - unconfirmedKeyChannel chan *UnconfirmedKey - // fixedRandomData contains random data for blobs if RandomizeBlobs is false, and nil otherwise. fixedRandomData []byte @@ -46,16 +46,22 @@ type BlobWriter struct { // writeFailureMetric is used to record the number of failed write requests. writeFailureMetric metrics.CountMetric + + // Mutex to protect configuration updates + configMutex sync.RWMutex + + // Ticker for controlling write intervals + ticker *time.Ticker } // NewBlobWriter creates a new BlobWriter instance. func NewBlobWriter( + name string, ctx *context.Context, + config *config.BlobWriterConfig, waitGroup *sync.WaitGroup, logger logging.Logger, - config *config.WorkerConfig, disperser clients.DisperserClient, - unconfirmedKeyChannel chan *UnconfirmedKey, generatorMetrics metrics.Metrics) BlobWriter { var fixedRandomData []byte @@ -73,71 +79,106 @@ func NewBlobWriter( } return BlobWriter{ - ctx: ctx, - waitGroup: waitGroup, - logger: logger, - config: config, - disperser: disperser, - unconfirmedKeyChannel: unconfirmedKeyChannel, - fixedRandomData: fixedRandomData, - writeLatencyMetric: generatorMetrics.NewLatencyMetric("write"), - writeSuccessMetric: generatorMetrics.NewCountMetric("write_success"), - writeFailureMetric: generatorMetrics.NewCountMetric("write_failure"), + name: name, + ctx: ctx, + waitGroup: waitGroup, + logger: logger, + config: config, + disperser: disperser, + fixedRandomData: fixedRandomData, + writeLatencyMetric: generatorMetrics.NewLatencyMetric("write"), + writeSuccessMetric: generatorMetrics.NewCountMetric("write_success"), + writeFailureMetric: generatorMetrics.NewCountMetric("write_failure"), } } // Start begins the blob writer goroutine. func (writer *BlobWriter) Start() { + writer.logger.Info("Starting blob writer", "name", writer.name) writer.waitGroup.Add(1) - ticker := time.NewTicker(writer.config.WriteRequestInterval) + writer.configMutex.Lock() + writer.ticker = time.NewTicker(writer.config.WriteRequestInterval) + writer.configMutex.Unlock() go func() { defer writer.waitGroup.Done() + defer writer.ticker.Stop() for { select { case <-(*writer.ctx).Done(): + writer.logger.Info("context cancelled, stopping blob writer", "name", writer.name) return - case <-ticker.C: - writer.writeNextBlob() + case <-writer.ticker.C: + if err := writer.writeNextBlob(); err != nil { + writer.logger.Error("failed to write blob", "name", writer.name, "err", err) + } } } }() } +// UpdateConfig updates the writer's configuration +func (writer *BlobWriter) UpdateConfig(config *config.BlobWriterConfig) { + writer.configMutex.Lock() + defer writer.configMutex.Unlock() + + // Update the ticker if the interval changed + if writer.config.WriteRequestInterval != config.WriteRequestInterval { + writer.ticker.Reset(config.WriteRequestInterval) + } + + // Update the fixed random data if needed + if writer.config.RandomizeBlobs != config.RandomizeBlobs || writer.config.DataSize != config.DataSize { + if config.RandomizeBlobs { + writer.fixedRandomData = nil + } else { + writer.fixedRandomData = make([]byte, config.DataSize) + _, err := rand.Read(writer.fixedRandomData) + if err != nil { + writer.logger.Error("failed to generate new fixed random data", "name", writer.name, "err", err) + return + } + writer.fixedRandomData = codec.ConvertByPaddingEmptyByte(writer.fixedRandomData) + } + } + + writer.config = config + writer.logger.Info("Updated blob writer configuration", + "name", writer.name, + "writeInterval", config.WriteRequestInterval, + "dataSize", config.DataSize, + "randomizeBlobs", config.RandomizeBlobs) +} + // writeNextBlob attempts to send a random blob to the disperser. -func (writer *BlobWriter) writeNextBlob() { +func (writer *BlobWriter) writeNextBlob() error { data, err := writer.getRandomData() if err != nil { - writer.logger.Error("failed to get random data", "err", err) - return + writer.logger.Error("failed to get random data", "name", writer.name, "err", err) + return err } start := time.Now() - key, err := writer.sendRequest(data) + _, err = writer.sendRequest(data) if err != nil { writer.writeFailureMetric.Increment() - writer.logger.Error("failed to send blob request", "err", err) - return - } else { - end := time.Now() - duration := end.Sub(start) - writer.writeLatencyMetric.ReportLatency(duration) + writer.logger.Error("failed to send blob request", "name", writer.name, "err", err) + return err } + end := time.Now() + duration := end.Sub(start) + writer.writeLatencyMetric.ReportLatency(duration) writer.writeSuccessMetric.Increment() - checksum := md5.Sum(data) - - writer.unconfirmedKeyChannel <- &UnconfirmedKey{ - Key: key, - Checksum: checksum, - Size: uint(len(data)), - SubmissionTime: time.Now(), - } + return nil } // getRandomData returns a slice of random data to be used for a blob. func (writer *BlobWriter) getRandomData() ([]byte, error) { + writer.configMutex.RLock() + defer writer.configMutex.RUnlock() + if writer.fixedRandomData != nil { return writer.fixedRandomData, nil } @@ -153,20 +194,29 @@ func (writer *BlobWriter) getRandomData() ([]byte, error) { } // sendRequest sends a blob to a disperser. -func (writer *BlobWriter) sendRequest(data []byte) (key []byte, err error) { - ctxTimeout, cancel := context.WithTimeout(*writer.ctx, writer.config.WriteTimeout) +func (writer *BlobWriter) sendRequest(data []byte) (key v2.BlobKey, err error) { + writer.configMutex.RLock() + writeTimeout := writer.config.WriteTimeout + customQuorums := writer.config.CustomQuorums + writer.configMutex.RUnlock() + + ctxTimeout, cancel := context.WithTimeout(*writer.ctx, writeTimeout) defer cancel() - if writer.config.SignerPrivateKey != "" { - _, key, err = writer.disperser.DisperseBlobAuthenticated( - ctxTimeout, - data, - writer.config.CustomQuorums) - } else { - _, key, err = writer.disperser.DisperseBlob( - ctxTimeout, - data, - writer.config.CustomQuorums) + writer.logger.Info("sending blob request", "name", writer.name, "size", len(data)) + status, key, err := writer.disperser.DisperseBlob( + ctxTimeout, + data, + 0, + customQuorums, + 0, + ) + if err != nil { + writer.logger.Error("failed to send blob request", "name", writer.name, "err", err) + return } + + writer.logger.Info("blob request sent", "name", writer.name, "key", key.Hex(), "status", status.String()) + return } diff --git a/tools/traffic/workers/blob_writer_test.go b/tools/traffic/workers/blob_writer_test.go index dcd70841c..2d0bfab82 100644 --- a/tools/traffic/workers/blob_writer_test.go +++ b/tools/traffic/workers/blob_writer_test.go @@ -2,19 +2,20 @@ package workers import ( "context" - "crypto/md5" "fmt" + "sync" + "testing" + "github.com/Layr-Labs/eigenda/common" tu "github.com/Layr-Labs/eigenda/common/testutils" - "github.com/Layr-Labs/eigenda/disperser" + corev2 "github.com/Layr-Labs/eigenda/core/v2" + dispv2 "github.com/Layr-Labs/eigenda/disperser/common/v2" "github.com/Layr-Labs/eigenda/encoding/utils/codec" "github.com/Layr-Labs/eigenda/tools/traffic/config" "github.com/Layr-Labs/eigenda/tools/traffic/metrics" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "golang.org/x/exp/rand" - "sync" - "testing" ) func TestBlobWriter(t *testing.T) { @@ -26,47 +27,30 @@ func TestBlobWriter(t *testing.T) { assert.Nil(t, err) dataSize := rand.Uint64()%1024 + 64 - encodedDataSize := len(codec.ConvertByPaddingEmptyByte(make([]byte, dataSize))) - - authenticated := rand.Intn(2) == 0 - var signerPrivateKey string - if authenticated { - signerPrivateKey = "asdf" - } - var functionName string - if authenticated { - functionName = "DisperseBlobAuthenticated" - } else { - functionName = "DisperseBlob" - } randomizeBlobs := rand.Intn(2) == 0 - useCustomQuorum := rand.Intn(2) == 0 var customQuorum []uint8 if useCustomQuorum { customQuorum = []uint8{1, 2, 3} } - config := &config.WorkerConfig{ - DataSize: dataSize, - SignerPrivateKey: signerPrivateKey, - RandomizeBlobs: randomizeBlobs, - CustomQuorums: customQuorum, + config := &config.BlobWriterConfig{ + DataSize: dataSize, + RandomizeBlobs: randomizeBlobs, + CustomQuorums: customQuorum, } disperserClient := &MockDisperserClient{} - unconfirmedKeyChannel := make(chan *UnconfirmedKey, 100) - generatorMetrics := metrics.NewMockMetrics() writer := NewBlobWriter( + "test-writer", &ctx, + config, &waitGroup, logger, - config, disperserClient, - unconfirmedKeyChannel, generatorMetrics) errorCount := 0 @@ -83,37 +67,38 @@ func TestBlobWriter(t *testing.T) { } // This is the Key that will be assigned to the next blob. - keyToReturn := make([]byte, 32) - _, err = rand.Read(keyToReturn) + var keyToReturn corev2.BlobKey + _, err = rand.Read(keyToReturn[:]) assert.Nil(t, err) - status := disperser.Processing + status := dispv2.Queued disperserClient.mock = mock.Mock{} // reset mock state - disperserClient.mock.On(functionName, mock.Anything, customQuorum).Return(&status, keyToReturn, errorToReturn) + disperserClient.mock.On("DisperseBlob", + mock.Anything, + mock.AnythingOfType("[]uint8"), + mock.AnythingOfType("uint16"), + mock.AnythingOfType("[]uint8"), + mock.AnythingOfType("uint32"), + ).Return(&status, keyToReturn, errorToReturn) // Simulate the advancement of time (i.e. allow the writer to write the next blob). - writer.writeNextBlob() + err = writer.writeNextBlob() + if errorToReturn != nil { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } - disperserClient.mock.AssertNumberOfCalls(t, functionName, 1) + disperserClient.mock.AssertNumberOfCalls(t, "DisperseBlob", 1) if errorToReturn == nil { - dataSentToDisperser := disperserClient.mock.Calls[0].Arguments.Get(0).([]byte) + dataSentToDisperser := disperserClient.mock.Calls[0].Arguments.Get(1).([]byte) assert.NotNil(t, dataSentToDisperser) // Strip away the extra encoding bytes. We should have data of the expected Size. decodedData := codec.RemoveEmptyByteFromPaddedBytes(dataSentToDisperser) assert.Equal(t, dataSize, uint64(len(decodedData))) - // Verify that the proper data was sent to the unconfirmed Key handler. - checksum := md5.Sum(dataSentToDisperser) - - unconfirmedKey, ok := <-unconfirmedKeyChannel - - assert.True(t, ok) - assert.Equal(t, keyToReturn, unconfirmedKey.Key) - assert.Equal(t, uint(encodedDataSize), unconfirmedKey.Size) - assert.Equal(t, checksum, unconfirmedKey.Checksum) - // Verify that data has the proper amount of randomness. if previousData != nil { if randomizeBlobs { diff --git a/tools/traffic/workers/mock_disperser.go b/tools/traffic/workers/mock_disperser.go index 43d9880ce..ae0e0f7c2 100644 --- a/tools/traffic/workers/mock_disperser.go +++ b/tools/traffic/workers/mock_disperser.go @@ -3,9 +3,11 @@ package workers import ( "context" - "github.com/Layr-Labs/eigenda/api/clients" - disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser" - "github.com/Layr-Labs/eigenda/disperser" + "github.com/Layr-Labs/eigenda/api/clients/v2" + disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2" + "github.com/Layr-Labs/eigenda/core" + corev2 "github.com/Layr-Labs/eigenda/core/v2" + dispv2 "github.com/Layr-Labs/eigenda/disperser/common/v2" "github.com/stretchr/testify/mock" ) @@ -18,29 +20,22 @@ type MockDisperserClient struct { func (m *MockDisperserClient) DisperseBlob( ctx context.Context, data []byte, - customQuorums []uint8) (*disperser.BlobStatus, []byte, error) { + blobVersion corev2.BlobVersion, + quorums []core.QuorumID, + salt uint32) (*dispv2.BlobStatus, corev2.BlobKey, error) { - args := m.mock.Called(data, customQuorums) - return args.Get(0).(*disperser.BlobStatus), args.Get(1).([]byte), args.Error(2) + args := m.mock.Called(ctx, data, blobVersion, quorums, salt) + return args.Get(0).(*dispv2.BlobStatus), args.Get(1).(corev2.BlobKey), args.Error(2) } -func (m *MockDisperserClient) DisperseBlobAuthenticated( - ctx context.Context, - data []byte, - customQuorums []uint8) (*disperser.BlobStatus, []byte, error) { - - args := m.mock.Called(data, customQuorums) - return args.Get(0).(*disperser.BlobStatus), args.Get(1).([]byte), args.Error(2) -} - -func (m *MockDisperserClient) GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error) { - args := m.mock.Called(key) +func (m *MockDisperserClient) GetBlobStatus(ctx context.Context, blobKey corev2.BlobKey) (*disperser_rpc.BlobStatusReply, error) { + args := m.mock.Called(blobKey) return args.Get(0).(*disperser_rpc.BlobStatusReply), args.Error(1) } -func (m *MockDisperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error) { - args := m.mock.Called(batchHeaderHash, blobIndex) - return args.Get(0).([]byte), args.Error(1) +func (m *MockDisperserClient) GetBlobCommitment(ctx context.Context, data []byte) (*disperser_rpc.BlobCommitmentReply, error) { + args := m.mock.Called(data) + return args.Get(0).(*disperser_rpc.BlobCommitmentReply), args.Error(1) } func (m *MockDisperserClient) Close() error { diff --git a/tools/traffic/workers/unconfirmed_key.go b/tools/traffic/workers/unconfirmed_key.go index c86b8f1dc..9523cd04c 100644 --- a/tools/traffic/workers/unconfirmed_key.go +++ b/tools/traffic/workers/unconfirmed_key.go @@ -1,11 +1,15 @@ package workers -import "time" +import ( + "time" -// UnconfirmedKey is a Key that has not yet been confirmed by the disperser service. -type UnconfirmedKey struct { + v2 "github.com/Layr-Labs/eigenda/core/v2" +) + +// UncertifiedKey is a Key that has not yet been certified by the disperser service. +type UncertifiedKey struct { // The Key of the blob. - Key []byte + Key v2.BlobKey // The Size of the blob in bytes. Size uint // The Checksum of the blob. diff --git a/trafficgenerator2.Dockerfile b/trafficgenerator-v2.Dockerfile similarity index 77% rename from trafficgenerator2.Dockerfile rename to trafficgenerator-v2.Dockerfile index 8f9dc149b..092d0e6e7 100644 --- a/trafficgenerator2.Dockerfile +++ b/trafficgenerator-v2.Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.21.1-alpine3.18 as builder +FROM golang:1.21.1-alpine3.18 AS builder RUN apk add --no-cache make musl-dev linux-headers gcc git jq bash @@ -13,9 +13,8 @@ RUN --mount=type=cache,target=/go/pkg/mod \ --mount=type=cache,target=/root/.cache/go-build \ go build -o ./bin/generator ./cmd2 -FROM alpine:3.18 as generator2 +FROM alpine:3.18 AS generator2 COPY --from=builder /app/tools/traffic/bin/generator /usr/local/bin -COPY --from=builder /app/inabox/resources /resources ENTRYPOINT ["generator"] diff --git a/trafficgenerator.Dockerfile b/trafficgenerator.Dockerfile index 8d39eaf33..5ffc767c2 100644 --- a/trafficgenerator.Dockerfile +++ b/trafficgenerator.Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.21.1-alpine3.18 as builder +FROM golang:1.21.1-alpine3.18 AS builder RUN apk add --no-cache make musl-dev linux-headers gcc git jq bash @@ -13,7 +13,7 @@ RUN --mount=type=cache,target=/go/pkg/mod \ --mount=type=cache,target=/root/.cache/go-build \ go build -o ./bin/generator ./cmd -FROM alpine:3.18 as generator +FROM alpine:3.18 AS generator COPY --from=builder /app/tools/traffic/bin/generator /usr/local/bin