diff --git a/configs/examples/load-test-config.yml b/configs/examples/load-test-config.yml new file mode 100644 index 00000000..77074cdd --- /dev/null +++ b/configs/examples/load-test-config.yml @@ -0,0 +1,20 @@ +transaction_submission_rpcs: + - "http://localhost:8545" +query_rpc: "http://localhost:8545" +txpool_nodes: [] +flashblocks_ws: "ws://localhost:7111" + +sender_count: 10 +target_gps: 500000000 +duration: "60s" +funding_amount: "10000000000000000000" + +transactions: + - weight: 70 + type: transfer + - weight: 20 + type: calldata + max_size: 256 + - weight: 10 + type: precompile + target: sha256 diff --git a/configs/examples/load-test.yml b/configs/examples/load-test.yml index 6743ca60..163513ce 100644 --- a/configs/examples/load-test.yml +++ b/configs/examples/load-test.yml @@ -4,16 +4,8 @@ payloads: - name: Load Test type: load-test id: load-test - sender_count: 10 - transactions: - - weight: 70 - type: transfer - - weight: 20 - type: calldata - max_size: 256 - - weight: 10 - type: precompile - target: sha256 + network: devnet + config_file: ./load-test-config.yml benchmarks: - variables: diff --git a/runner/benchmark/result_metadata.go b/runner/benchmark/result_metadata.go index e31bdbff..03336e26 100644 --- a/runner/benchmark/result_metadata.go +++ b/runner/benchmark/result_metadata.go @@ -51,7 +51,9 @@ func (runs *RunGroup) AddResult(testIdx int, runResult RunResult) { } const ( - BenchmarkRunTag = "BenchmarkRun" + BenchmarkRunTag = "BenchmarkRun" + LoadTestResultsDir = "load-tests" + LoadTestTimestampLayout = "2006-01-02-15-04-05" ) func RunGroupFromTestPlans(testPlans []TestPlan, machineInfo *MachineInfo) RunGroup { diff --git a/runner/clients/baserethnode/client.go b/runner/clients/baserethnode/client.go index 844d33b3..3a84cff4 100644 --- a/runner/clients/baserethnode/client.go +++ b/runner/clients/baserethnode/client.go @@ -276,3 +276,7 @@ func (r *BaseRethNodeClient) FlashblocksClient() types.FlashblocksClient { func (r *BaseRethNodeClient) SupportsFlashblocks() bool { return true } + +func (r *BaseRethNodeClient) FlashblocksWsURL() string { + return "" +} diff --git a/runner/clients/builder/client.go b/runner/clients/builder/client.go index b229905c..605f93e1 100644 --- a/runner/clients/builder/client.go +++ b/runner/clients/builder/client.go @@ -128,3 +128,12 @@ func (r *BuilderClient) FlashblocksClient() types.FlashblocksClient { func (r *BuilderClient) SupportsFlashblocks() bool { return false } + +// FlashblocksWsURL returns the local WebSocket URL of the flashblocks server +// hosted by the builder. +func (r *BuilderClient) FlashblocksWsURL() string { + if r.websocketPort == 0 { + return "" + } + return fmt.Sprintf("ws://localhost:%d", r.websocketPort) +} diff --git a/runner/clients/common/proxy/proxy.go b/runner/clients/common/proxy/proxy.go index f261c39e..b212ec0b 100644 --- a/runner/clients/common/proxy/proxy.go +++ b/runner/clients/common/proxy/proxy.go @@ -17,6 +17,7 @@ import ( "fmt" "io" "net/http" + "sync" "github.com/base/base-bench/runner/network/mempool" "github.com/ethereum/go-ethereum/common" @@ -31,6 +32,7 @@ type ProxyServer struct { pendingTxs []*ethTypes.Transaction clientURL string mempool *mempool.StaticWorkloadMempool + mu sync.Mutex } func NewProxyServer(clientURL string, log log.Logger, port int, mempool *mempool.StaticWorkloadMempool) *ProxyServer { @@ -62,11 +64,29 @@ func (p *ProxyServer) Run(ctx context.Context) error { } func (p *ProxyServer) PendingTxs() []*ethTypes.Transaction { - return p.pendingTxs + p.mu.Lock() + defer p.mu.Unlock() + + txs := make([]*ethTypes.Transaction, len(p.pendingTxs)) + copy(txs, p.pendingTxs) + return txs } func (p *ProxyServer) ClearPendingTxs() { + p.mu.Lock() + defer p.mu.Unlock() + + p.pendingTxs = make([]*ethTypes.Transaction, 0) +} + +func (p *ProxyServer) DrainPendingTxs() []*ethTypes.Transaction { + p.mu.Lock() + defer p.mu.Unlock() + + txs := make([]*ethTypes.Transaction, len(p.pendingTxs)) + copy(txs, p.pendingTxs) p.pendingTxs = make([]*ethTypes.Transaction, 0) + return txs } // Stop stops both the proxy server and the underlying client @@ -89,13 +109,20 @@ func (p *ProxyServer) handleRequest(w http.ResponseWriter, r *http.Request) { return } - var request struct { + type rpcRequest struct { Method string `json:"method"` Params json.RawMessage `json:"params"` ID interface{} `json:"id"` JSONRPC string `json:"jsonrpc"` } + if len(body) > 0 && body[0] == '[' { + p.handleBatchRequest(w, body) + return + } + + var request rpcRequest + if err := json.Unmarshal(body, &request); err != nil { http.Error(w, "Error parsing request", http.StatusBadRequest) return @@ -164,6 +191,50 @@ func (p *ProxyServer) handleRequest(w http.ResponseWriter, r *http.Request) { p.DebugResponse(request.Method, request.Params, respBody) } +func (p *ProxyServer) handleBatchRequest(w http.ResponseWriter, body []byte) { + type rpcRequest struct { + Method string `json:"method"` + Params json.RawMessage `json:"params"` + ID interface{} `json:"id"` + JSONRPC string `json:"jsonrpc"` + } + + var requests []rpcRequest + if err := json.Unmarshal(body, &requests); err != nil { + http.Error(w, "Error parsing batch request", http.StatusBadRequest) + return + } + + type rpcResponse struct { + JSONRPC string `json:"jsonrpc"` + ID interface{} `json:"id"` + Result json.RawMessage `json:"result,omitempty"` + Error interface{} `json:"error,omitempty"` + } + + responses := make([]rpcResponse, 0, len(requests)) + for _, request := range requests { + handled, result, err := p.OverrideRequest(request.Method, request.Params) + response := rpcResponse{ + JSONRPC: "2.0", + ID: request.ID, + } + if err != nil { + response.Error = map[string]interface{}{"code": -32000, "message": err.Error()} + } else if handled { + response.Result = result + } else { + response.Error = map[string]interface{}{"code": -32601, "message": "method not supported in proxy batch mode"} + } + responses = append(responses, response) + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(responses); err != nil { + p.log.Error("Error encoding batch response", "err", err) + } +} + func (p *ProxyServer) OverrideRequest(method string, rawParams json.RawMessage) (bool, json.RawMessage, error) { switch method { case "eth_getTransactionCount": @@ -204,7 +275,9 @@ func (p *ProxyServer) OverrideRequest(method string, rawParams json.RawMessage) return false, nil, fmt.Errorf("failed to decode transaction: %w", err) } + p.mu.Lock() p.pendingTxs = append(p.pendingTxs, &tx) + p.mu.Unlock() txHash := tx.Hash().Hex() jsonResponse, _ := json.Marshal(txHash) diff --git a/runner/clients/common/proxy/proxy_test.go b/runner/clients/common/proxy/proxy_test.go new file mode 100644 index 00000000..a5c0678f --- /dev/null +++ b/runner/clients/common/proxy/proxy_test.go @@ -0,0 +1,104 @@ +package proxy + +import ( + "bytes" + "encoding/json" + "math/big" + "net/http" + "net/http/httptest" + "testing" + + "github.com/base/base-bench/runner/network/mempool" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" +) + +func TestHandleBatchRequestCapturesRawTransactions(t *testing.T) { + chainID := big.NewInt(8453) + tx := signedTestTx(t, chainID) + rawTx, err := tx.MarshalBinary() + if err != nil { + t.Fatalf("marshal tx: %v", err) + } + + server := NewProxyServer( + "http://127.0.0.1:8545", + log.New(), + 0, + mempool.NewStaticWorkloadMempool(log.New(), chainID), + ) + + body, err := json.Marshal([]map[string]any{ + { + "jsonrpc": "2.0", + "id": 0, + "method": "eth_sendRawTransaction", + "params": []string{hexutil.Encode(rawTx)}, + }, + }) + if err != nil { + t.Fatalf("marshal request: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(body)) + rec := httptest.NewRecorder() + + server.handleRequest(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", rec.Code, rec.Body.String()) + } + + var responses []struct { + Result string `json:"result"` + Error map[string]any `json:"error"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &responses); err != nil { + t.Fatalf("unmarshal response: %v", err) + } + if len(responses) != 1 { + t.Fatalf("expected 1 response, got %d", len(responses)) + } + if responses[0].Error != nil { + t.Fatalf("expected successful response, got error %v", responses[0].Error) + } + if responses[0].Result != tx.Hash().Hex() { + t.Fatalf("expected tx hash %s, got %s", tx.Hash().Hex(), responses[0].Result) + } + + pending := server.DrainPendingTxs() + if len(pending) != 1 { + t.Fatalf("expected 1 pending tx, got %d", len(pending)) + } + if pending[0].Hash() != tx.Hash() { + t.Fatalf("expected pending tx %s, got %s", tx.Hash(), pending[0].Hash()) + } +} + +func signedTestTx(t *testing.T, chainID *big.Int) *types.Transaction { + t.Helper() + + key, err := crypto.GenerateKey() + if err != nil { + t.Fatalf("generate key: %v", err) + } + + tx := types.NewTx(&types.DynamicFeeTx{ + ChainID: chainID, + Nonce: 0, + GasTipCap: big.NewInt(1), + GasFeeCap: big.NewInt(1), + Gas: 21_000, + To: &common.Address{1}, + Value: big.NewInt(1), + }) + + signed, err := types.SignTx(tx, types.NewIsthmusSigner(chainID), key) + if err != nil { + t.Fatalf("sign tx: %v", err) + } + return signed +} diff --git a/runner/clients/geth/client.go b/runner/clients/geth/client.go index 89e40b67..a4efd25b 100644 --- a/runner/clients/geth/client.go +++ b/runner/clients/geth/client.go @@ -280,3 +280,7 @@ func (g *GethClient) FlashblocksClient() types.FlashblocksClient { func (g *GethClient) SupportsFlashblocks() bool { return false } + +func (g *GethClient) FlashblocksWsURL() string { + return "" +} diff --git a/runner/clients/reth/client.go b/runner/clients/reth/client.go index 7a4a25c8..35739d4e 100644 --- a/runner/clients/reth/client.go +++ b/runner/clients/reth/client.go @@ -296,3 +296,7 @@ func (r *RethClient) FlashblocksClient() types.FlashblocksClient { func (r *RethClient) SupportsFlashblocks() bool { return true } + +func (r *RethClient) FlashblocksWsURL() string { + return "" +} diff --git a/runner/clients/types/types.go b/runner/clients/types/types.go index a0be0080..c88b6212 100644 --- a/runner/clients/types/types.go +++ b/runner/clients/types/types.go @@ -32,4 +32,7 @@ type ExecutionClient interface { SetHead(ctx context.Context, blockNumber uint64) error FlashblocksClient() FlashblocksClient // returns nil for clients that don't support flashblocks SupportsFlashblocks() bool // returns true if the client supports receiving flashblock payloads + // FlashblocksWsURL returns the local WebSocket URL hosted by this client, + // or an empty string when the client does not expose one. + FlashblocksWsURL() string } diff --git a/runner/network/network_benchmark.go b/runner/network/network_benchmark.go index 2bf13775..fd83d113 100644 --- a/runner/network/network_benchmark.go +++ b/runner/network/network_benchmark.go @@ -44,9 +44,9 @@ type NetworkBenchmark struct { testConfig *benchtypes.TestConfig proofConfig *benchmark.ProofProgramOptions - transactionPayload payload.Definition - ports portmanager.PortManager - flashblocksBlockTime string + transactionPayload payload.Definition + ports portmanager.PortManager + flashblocksBlockTime string } // NewNetworkBenchmark creates a new network benchmark and initializes the payload worker and consensus client diff --git a/runner/network/sequencer_benchmark.go b/runner/network/sequencer_benchmark.go index 29aa8c28..83e493c4 100644 --- a/runner/network/sequencer_benchmark.go +++ b/runner/network/sequencer_benchmark.go @@ -15,6 +15,7 @@ import ( "github.com/base/base-bench/runner/network/proofprogram/fakel1" benchtypes "github.com/base/base-bench/runner/network/types" "github.com/base/base-bench/runner/payload" + payloadworker "github.com/base/base-bench/runner/payload/worker" "github.com/ethereum-optimism/optimism/op-service/retry" "github.com/ethereum/go-ethereum/beacon/engine" "github.com/ethereum/go-ethereum/common" @@ -24,6 +25,8 @@ import ( "github.com/pkg/errors" ) +const gracefulWorkerShutdownTimeout = 90 * time.Second + type sequencerBenchmark struct { log log.Logger sequencerClient types.ExecutionClient @@ -282,8 +285,12 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics. payloads = append(payloads, *payload) } - err = consensusClient.Stop(benchmarkCtx) - if err != nil { + if err := nb.settleGracefulWorkerShutdown(benchmarkCtx, transactionWorker, consensusClient, pendingTxs); err != nil { + errChan <- err + return + } + + if err := consensusClient.Stop(benchmarkCtx); err != nil { nb.log.Warn("failed to stop consensus client", "err", err) } @@ -309,6 +316,64 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics. } } +func (nb *sequencerBenchmark) settleGracefulWorkerShutdown( + ctx context.Context, + transactionWorker payloadworker.Worker, + consensusClient *consensus.SequencerConsensusClient, + pendingTxs int, +) error { + gracefulWorker, ok := transactionWorker.(payloadworker.GracefulShutdownWorker) + if !ok { + return nil + } + + if err := gracefulWorker.BeginGracefulShutdown(ctx); err != nil { + return errors.Wrap(err, "failed to begin graceful payload worker shutdown") + } + + timeout := time.NewTimer(gracefulWorkerShutdownTimeout) + defer timeout.Stop() + + settlementBlock := 0 + for { + select { + case <-gracefulWorker.Done(): + nb.log.Info("Payload worker stopped gracefully", "settlement_blocks", settlementBlock) + return nil + case <-timeout.C: + nb.log.Warn("Timed out waiting for payload worker to stop gracefully", "settlement_blocks", settlementBlock) + return nil + default: + } + + blockMetrics := metrics.NewBlockMetrics() + txsSent, err := transactionWorker.SendTxs(ctx, pendingTxs) + if err != nil { + return errors.Wrap(err, "failed to collect settlement transactions") + } + + payload, err := consensusClient.Propose(ctx, blockMetrics, true) + if err != nil { + return errors.Wrap(err, "failed to propose settlement block") + } + if payload == nil { + return errors.New("received nil settlement payload from consensus client") + } + + userTxsIncluded := len(payload.Transactions) - 1 + if userTxsIncluded < 0 { + userTxsIncluded = 0 + } + pendingTxs = pendingTxs + txsSent - userTxsIncluded + if pendingTxs < 0 { + pendingTxs = 0 + } + + settlementBlock++ + time.Sleep(nb.config.Params.BlockTime) + } +} + // flashblockCollector implements FlashblockListener to collect flashblocks. type flashblockCollector struct { log log.Logger diff --git a/runner/network/types/types.go b/runner/network/types/types.go index c219c72a..9bb0b34a 100644 --- a/runner/network/types/types.go +++ b/runner/network/types/types.go @@ -40,6 +40,10 @@ type TestConfig struct { PrefundPrivateKey ecdsa.PrivateKey PrefundAmount big.Int + + // LoadTestOutputPath is the optional normal load-test report JSON path used + // by the load-test payload worker. + LoadTestOutputPath string } // BatcherAddr returns the batcher address, computing it if necessary diff --git a/runner/payload/factory.go b/runner/payload/factory.go index b8e7176e..7894563f 100644 --- a/runner/payload/factory.go +++ b/runner/payload/factory.go @@ -38,7 +38,7 @@ func NewPayloadWorker(ctx context.Context, log log.Logger, testConfig *benchtype def = &loadtest.LoadTestPayloadDefinition{} } worker, err = loadtest.NewLoadTestPayloadWorker( - log, sequencerClient.ClientURL(), params, privateKey, amount, config, genesis.Config.ChainID, *def) + log, sequencerClient.ClientURL(), sequencerClient.FlashblocksWsURL(), params, privateKey, amount, config, genesis.Config.ChainID, *def, testConfig.LoadTestOutputPath) case "transfer-only": worker, err = transferonly.NewTransferPayloadWorker( ctx, log, sequencerClient.ClientURL(), params, privateKey, amount, &genesis, definition.Params) diff --git a/runner/payload/loadtest/load_test_worker.go b/runner/payload/loadtest/load_test_worker.go index 5b6d62ff..92527aa8 100644 --- a/runner/payload/loadtest/load_test_worker.go +++ b/runner/payload/loadtest/load_test_worker.go @@ -3,12 +3,15 @@ package loadtest import ( "context" "crypto/ecdsa" - cryptorand "crypto/rand" "encoding/hex" "fmt" "math/big" "os" "os/exec" + "path/filepath" + "strconv" + "sync" + "time" "github.com/base/base-bench/runner/clients/common/proxy" "github.com/base/base-bench/runner/config" @@ -21,38 +24,30 @@ import ( ) // LoadTestPayloadDefinition is the YAML payload params for the load-test type. -// Fields map directly to the Rust base-load-test config format. -// The `transactions` field is passed through as raw YAML to support the full -// Rust config schema (transfer, calldata, precompile, erc20, etc.). +// The load-test workload itself lives in a native base-load-tester config file; +// benchmark mode only overlays the RPC and timing fields it must control. type LoadTestPayloadDefinition struct { - SenderCount uint64 `yaml:"sender_count"` - FundingAmount string `yaml:"funding_amount"` - Transactions yaml.Node `yaml:"transactions"` -} - -// loadTestConfig is the YAML config written to a temp file for the load-test binary. -type loadTestConfig struct { - RPC string `yaml:"rpc"` - SenderCount uint64 `yaml:"sender_count"` - TargetGPS uint64 `yaml:"target_gps"` - Duration string `yaml:"duration"` - Seed uint64 `yaml:"seed"` - FundingAmount string `yaml:"funding_amount"` - Transactions yaml.Node `yaml:"transactions"` + ConfigFile string `yaml:"config_file"` + Network string `yaml:"network"` } type loadTestPayloadWorker struct { - log log.Logger - prefundSK string - loadTestBin string - elRPCURL string - gasLimit uint64 - blockTimeSec uint64 - params LoadTestPayloadDefinition - mempool *mempool.StaticWorkloadMempool - proxyServer *proxy.ProxyServer - cmd *exec.Cmd - configFilePath string + log log.Logger + prefundSK string + loadTestBin string + elRPCURL string + flashblocksURL string + gasLimit uint64 + blockTimeSec uint64 + params LoadTestPayloadDefinition + mempool *mempool.StaticWorkloadMempool + proxyServer *proxy.ProxyServer + cmd *exec.Cmd + done chan struct{} + shutdownOnce sync.Once + sourceConfigPath string + renderedConfigPath string + outputPath string } // NewLoadTestPayloadWorker creates a worker that runs the base-load-test binary @@ -60,12 +55,14 @@ type loadTestPayloadWorker struct { func NewLoadTestPayloadWorker( log log.Logger, elRPCURL string, + flashblocksURL string, params types.RunParams, prefundedPrivateKey ecdsa.PrivateKey, prefundAmount *big.Int, cfg config.Config, chainID *big.Int, definition LoadTestPayloadDefinition, + outputPath string, ) (worker.Worker, error) { mp := mempool.NewStaticWorkloadMempool(log, chainID) ps := proxy.NewProxyServer(elRPCURL, log, cfg.ProxyPort(), mp) @@ -75,16 +72,24 @@ func NewLoadTestPayloadWorker( blockTimeSec = 1 } + sourceConfigPath, err := resolveConfigFilePath(cfg.ConfigPath(), definition.ConfigFile) + if err != nil { + return nil, err + } + w := &loadTestPayloadWorker{ - log: log, - prefundSK: hex.EncodeToString(prefundedPrivateKey.D.Bytes()), - loadTestBin: cfg.LoadTestBinary(), - elRPCURL: elRPCURL, - gasLimit: params.GasLimit, - blockTimeSec: blockTimeSec, - params: definition, - mempool: mp, - proxyServer: ps, + log: log, + prefundSK: hex.EncodeToString(prefundedPrivateKey.D.Bytes()), + loadTestBin: cfg.LoadTestBinary(), + elRPCURL: elRPCURL, + flashblocksURL: flashblocksURL, + gasLimit: params.GasLimit, + blockTimeSec: blockTimeSec, + params: definition, + mempool: mp, + proxyServer: ps, + sourceConfigPath: sourceConfigPath, + outputPath: outputPath, } return w, nil @@ -103,7 +108,7 @@ func (w *loadTestPayloadWorker) Setup(ctx context.Context) error { if err != nil { return errors.Wrap(err, "failed to write load-test config") } - w.configFilePath = configPath + w.renderedConfigPath = configPath w.log.Info("Starting load test", "binary", w.loadTestBin, "config", configPath) @@ -111,31 +116,90 @@ func (w *loadTestPayloadWorker) Setup(ctx context.Context) error { cmd.Stdout = os.Stdout cmd.Stderr = os.Stdout cmd.Env = append(os.Environ(), fmt.Sprintf("FUNDER_KEY=%s", w.prefundSK)) + if w.outputPath != "" { + if err := os.MkdirAll(filepath.Dir(w.outputPath), 0755); err != nil { + return errors.Wrap(err, "failed to create load-test output directory") + } + cmd.Env = append(cmd.Env, fmt.Sprintf("LOAD_TEST_OUTPUT=%s", w.outputPath)) + } if err := cmd.Start(); err != nil { return errors.Wrap(err, "failed to start load test binary") } w.cmd = cmd + w.done = make(chan struct{}) + go func() { + _ = cmd.Wait() + close(w.done) + }() return nil } +func (w *loadTestPayloadWorker) BeginGracefulShutdown(ctx context.Context) error { + if w.cmd == nil || w.cmd.Process == nil { + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-w.Done(): + return nil + default: + } + + var signalErr error + w.shutdownOnce.Do(func() { + w.log.Info("Stopping load test process gracefully", "pid", w.cmd.Process.Pid, "output", w.outputPath) + signalErr = w.cmd.Process.Signal(os.Interrupt) + }) + if signalErr != nil { + select { + case <-w.Done(): + return nil + default: + } + } + return signalErr +} + +func (w *loadTestPayloadWorker) Done() <-chan struct{} { + if w.done != nil { + return w.done + } + + done := make(chan struct{}) + close(done) + return done +} + func (w *loadTestPayloadWorker) Stop(ctx context.Context) error { if w.cmd != nil && w.cmd.Process != nil { - w.log.Info("Stopping load test process", "pid", w.cmd.Process.Pid) - if err := w.cmd.Process.Kill(); err != nil { - w.log.Warn("failed to kill load test process", "err", err) - } else { - // Reap the process to avoid zombies. - _, _ = w.cmd.Process.Wait() + if err := w.BeginGracefulShutdown(ctx); err != nil { + w.log.Warn("failed to signal load test process", "err", err) + } + + select { + case <-w.Done(): + case <-time.After(10 * time.Second): + w.log.Warn("load test process did not stop gracefully, killing", "pid", w.cmd.Process.Pid) + if err := w.cmd.Process.Kill(); err != nil { + w.log.Warn("failed to kill load test process", "err", err) + } + select { + case <-w.Done(): + case <-time.After(5 * time.Second): + w.log.Warn("timed out waiting for killed load test process") + } } } w.proxyServer.Stop() - if w.configFilePath != "" { - if err := os.Remove(w.configFilePath); err != nil { - w.log.Warn("failed to remove load-test config", "path", w.configFilePath, "err", err) + if w.renderedConfigPath != "" { + if err := os.Remove(w.renderedConfigPath); err != nil { + w.log.Warn("failed to remove load-test config", "path", w.renderedConfigPath, "err", err) } } @@ -144,79 +208,110 @@ func (w *loadTestPayloadWorker) Stop(ctx context.Context) error { func (w *loadTestPayloadWorker) SendTxs(ctx context.Context, _ int) (int, error) { w.log.Info("Collecting txs from load test") - pendingTxs := w.proxyServer.PendingTxs() - w.proxyServer.ClearPendingTxs() + pendingTxs := w.proxyServer.DrainPendingTxs() w.mempool.AddTransactions(pendingTxs) return len(pendingTxs), nil } -// defaultTransactions returns the default transaction mix as a yaml.Node. -func defaultTransactions() yaml.Node { - var node yaml.Node - // Default: 70% transfer, 20% calldata, 10% precompile - defaultYAML := ` -- weight: 70 - type: transfer -- weight: 20 - type: calldata - max_size: 256 -- weight: 10 - type: precompile - target: sha256 -` - if err := yaml.Unmarshal([]byte(defaultYAML), &node); err != nil { - panic(fmt.Sprintf("failed to parse default transactions YAML: %v", err)) - } - // yaml.Unmarshal wraps in a document node; return the inner sequence - if node.Kind == yaml.DocumentNode && len(node.Content) > 0 { - return *node.Content[0] +func resolveConfigFilePath(benchmarkConfigPath string, loadTestConfigPath string) (string, error) { + if loadTestConfigPath == "" { + return "", errors.New("load-test payload requires config_file") } - return node + if filepath.IsAbs(loadTestConfigPath) { + return loadTestConfigPath, nil + } + return filepath.Join(filepath.Dir(benchmarkConfigPath), loadTestConfigPath), nil } -// randomSeed returns a cryptographically random uint64 seed. -func randomSeed() uint64 { - var b [8]byte - if _, err := cryptorand.Read(b[:]); err != nil { - return 42 +func (w *loadTestPayloadWorker) targetGPS() uint64 { + blockTimeSec := w.blockTimeSec + if blockTimeSec == 0 { + blockTimeSec = 1 } - return uint64(b[0]) | uint64(b[1])<<8 | uint64(b[2])<<16 | uint64(b[3])<<24 | - uint64(b[4])<<32 | uint64(b[5])<<40 | uint64(b[6])<<48 | uint64(b[7])<<56 + return w.gasLimit / blockTimeSec } -// writeConfig generates a temporary YAML config file for the load-test binary -// with the RPC URL pointing to the proxy server. -func (w *loadTestPayloadWorker) writeConfig() (string, error) { - senderCount := w.params.SenderCount - if senderCount == 0 { - senderCount = 10 +func (w *loadTestPayloadWorker) buildConfig() (*yaml.Node, error) { + data, err := os.ReadFile(w.sourceConfigPath) + if err != nil { + return nil, errors.Wrap(err, "failed to read load-test config file") + } + + var doc yaml.Node + if err := yaml.Unmarshal(data, &doc); err != nil { + return nil, errors.Wrap(err, "failed to parse load-test config file") } - fundingAmount := w.params.FundingAmount - if fundingAmount == "" { - fundingAmount = "10000000000000000000" + config, err := mappingRoot(&doc) + if err != nil { + return nil, err + } + + proxyURL := w.proxyServer.ClientURL() + setMappingValue(config, "transaction_submission_rpcs", stringSequenceNode(proxyURL)) + setMappingValue(config, "query_rpc", stringNode(proxyURL)) + + flashblocksURL := w.flashblocksURL + if flashblocksURL == "" { + flashblocksURL = "ws://localhost:7111" } + setMappingValue(config, "flashblocks_ws", stringNode(flashblocksURL)) + setMappingValue(config, "target_gps", uintNode(w.targetGPS())) + setMappingValue(config, "duration", stringNode("99999s")) - // Compute target GPS from gas limit and block time - targetGPS := w.gasLimit / w.blockTimeSec + return config, nil +} - transactions := w.params.Transactions - if transactions.Kind == 0 { - transactions = defaultTransactions() +func mappingRoot(doc *yaml.Node) (*yaml.Node, error) { + root := doc + if doc.Kind == yaml.DocumentNode { + if len(doc.Content) == 0 { + return nil, errors.New("load-test config file is empty") + } + root = doc.Content[0] } - config := loadTestConfig{ - RPC: w.proxyServer.ClientURL(), - SenderCount: senderCount, - TargetGPS: targetGPS, - Duration: "99999s", - Seed: randomSeed(), - FundingAmount: fundingAmount, - Transactions: transactions, + if root.Kind != yaml.MappingNode { + return nil, fmt.Errorf("load-test config file must be a YAML mapping, got kind %d", root.Kind) } + return root, nil +} + +func setMappingValue(mapping *yaml.Node, key string, value *yaml.Node) { + for i := 0; i < len(mapping.Content)-1; i += 2 { + if mapping.Content[i].Value == key { + mapping.Content[i+1] = value + return + } + } + mapping.Content = append(mapping.Content, stringNode(key), value) +} - data, err := yaml.Marshal(&config) +func stringNode(value string) *yaml.Node { + return &yaml.Node{Kind: yaml.ScalarNode, Tag: "!!str", Value: value} +} + +func uintNode(value uint64) *yaml.Node { + return &yaml.Node{Kind: yaml.ScalarNode, Tag: "!!int", Value: strconv.FormatUint(value, 10)} +} + +func stringSequenceNode(values ...string) *yaml.Node { + node := &yaml.Node{Kind: yaml.SequenceNode, Tag: "!!seq"} + for _, value := range values { + node.Content = append(node.Content, stringNode(value)) + } + return node +} + +// writeConfig generates a temporary YAML config file for the load-test binary +// with the RPC URL pointing to the proxy server. +func (w *loadTestPayloadWorker) writeConfig() (string, error) { + config, err := w.buildConfig() + if err != nil { + return "", err + } + data, err := yaml.Marshal(config) if err != nil { return "", errors.Wrap(err, "failed to marshal load-test config") } @@ -236,8 +331,8 @@ func (w *loadTestPayloadWorker) writeConfig() (string, error) { } w.log.Info("Generated load-test config", - "sender_count", senderCount, - "target_gps", targetGPS, + "source_config", w.sourceConfigPath, + "target_gps", w.targetGPS(), "gas_limit", w.gasLimit, "block_time_sec", w.blockTimeSec, ) diff --git a/runner/payload/loadtest/load_test_worker_test.go b/runner/payload/loadtest/load_test_worker_test.go new file mode 100644 index 00000000..b69e8784 --- /dev/null +++ b/runner/payload/loadtest/load_test_worker_test.go @@ -0,0 +1,125 @@ +package loadtest + +import ( + "os" + "path/filepath" + "testing" + + "github.com/base/base-bench/runner/clients/common/proxy" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v3" +) + +func TestBuildConfigOverlaysBenchmarkFieldsAndPreservesLoadTestConfig(t *testing.T) { + configPath := filepath.Join(t.TempDir(), "mainnet-state-weth-usdc-swaps.yaml") + err := os.WriteFile(configPath, []byte(` +transaction_submission_rpcs: + - "http://standalone-submitter.invalid" +query_rpc: "http://standalone-query.invalid" +flashblocks_ws: "ws://standalone-flashblocks.invalid" +target_gps: 123 +duration: "60s" +chain_id: 8453 +sender_count: 250 +in_flight_per_sender: 64 +batch_size: 20 +batch_timeout: "10ms" +seed: 654789 +funding_amount: "200000000000000000" +real_token_setup: + enabled: true + allow_chain_id_8453: true + weth: "0x4200000000000000000000000000000000000006" + weth_amount_per_sender: "50000000000000000" + pair_token: + token: "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913" + amount_per_sender: "10000000" + acquisition: + type: uniswap_v3_exact_input + router: "0x2626664c2603336E57B271c5C0b26F421741e481" + fee: 500 + amount_in: "10000000000000000" + min_amount_out: "0" +transactions: + - weight: 50 + type: uniswap_v3 + router: "0x2626664c2603336E57B271c5C0b26F421741e481" + token_in: "0x4200000000000000000000000000000000000006" + token_out: "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913" + fee: 500 + min_amount: "10000000000000" + max_amount: "100000000000000" + reverse_min_amount: "100000" + reverse_max_amount: "1000000" + - weight: 50 + type: aerodrome_cl + router: "0xBE6D8f0d05cC4be24d5167a3eF062215bE6D18a5" + token_in: "0x4200000000000000000000000000000000000006" + token_out: "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913" + tick_spacing: 100 + min_amount: "10000000000000" + max_amount: "100000000000000" + reverse_min_amount: "100000" + reverse_max_amount: "1000000" +`), 0644) + require.NoError(t, err) + + worker := &loadTestPayloadWorker{ + flashblocksURL: "ws://benchmark-flashblocks.example", + gasLimit: 150_000_000, + blockTimeSec: 2, + proxyServer: proxy.NewProxyServer("http://sequencer.example", nil, 18546, nil), + sourceConfigPath: configPath, + } + + config, err := worker.buildConfig() + require.NoError(t, err) + + encoded, err := yaml.Marshal(config) + require.NoError(t, err) + output := string(encoded) + + for _, want := range []string{ + "transaction_submission_rpcs:\n - http://localhost:18546", + "query_rpc: http://localhost:18546", + "flashblocks_ws: ws://benchmark-flashblocks.example", + "target_gps: 75000000", + "duration: 99999s", + "chain_id: 8453", + "sender_count: 250", + "in_flight_per_sender: 64", + "batch_size: 20", + "batch_timeout: \"10ms\"", + "seed: 654789", + "real_token_setup:", + "allow_chain_id_8453: true", + "type: uniswap_v3", + "type: aerodrome_cl", + "reverse_min_amount: \"100000\"", + } { + require.Contains(t, output, want) + } + for _, oldValue := range []string{ + "standalone-submitter.invalid", + "standalone-query.invalid", + "standalone-flashblocks.invalid", + "target_gps: 123", + "duration: 60s", + } { + require.NotContains(t, output, oldValue) + } +} + +func TestResolveConfigFilePath(t *testing.T) { + resolved, err := resolveConfigFilePath("/tmp/configs/benchmark.yml", "load-tests/mainnet.yaml") + require.NoError(t, err) + require.Equal(t, "/tmp/configs/load-tests/mainnet.yaml", resolved) + + resolved, err = resolveConfigFilePath("/tmp/configs/benchmark.yml", "/var/load-tests/mainnet.yaml") + require.NoError(t, err) + require.Equal(t, "/var/load-tests/mainnet.yaml", resolved) + + _, err = resolveConfigFilePath("/tmp/configs/benchmark.yml", "") + require.Error(t, err) + require.Contains(t, err.Error(), "config_file") +} diff --git a/runner/payload/txfuzz/tx_fuzz_worker.go b/runner/payload/txfuzz/tx_fuzz_worker.go index b5167d42..87fd5010 100644 --- a/runner/payload/txfuzz/tx_fuzz_worker.go +++ b/runner/payload/txfuzz/tx_fuzz_worker.go @@ -84,8 +84,7 @@ func (t *txFuzzPayloadWorker) Stop(ctx context.Context) error { func (t *txFuzzPayloadWorker) SendTxs(ctx context.Context, _ int) (int, error) { t.log.Info("Sending txs in tx-fuzz mode") - pending := t.proxyServer.PendingTxs() - t.proxyServer.ClearPendingTxs() + pending := t.proxyServer.DrainPendingTxs() t.mempool.AddTransactions(pending) return len(pending), nil diff --git a/runner/payload/worker/types.go b/runner/payload/worker/types.go index 69906ffe..a0e05725 100644 --- a/runner/payload/worker/types.go +++ b/runner/payload/worker/types.go @@ -18,3 +18,10 @@ type Worker interface { Stop(ctx context.Context) error Mempool() mempool.FakeMempool } + +// GracefulShutdownWorker can stop generating transactions while the benchmark +// sequencer keeps producing settlement blocks. +type GracefulShutdownWorker interface { + BeginGracefulShutdown(ctx context.Context) error + Done() <-chan struct{} +} diff --git a/runner/service.go b/runner/service.go index 2e1c0309..8916094c 100644 --- a/runner/service.go +++ b/runner/service.go @@ -26,6 +26,7 @@ import ( "github.com/base/base-bench/runner/network" "github.com/base/base-bench/runner/network/types" "github.com/base/base-bench/runner/payload" + "github.com/base/base-bench/runner/payload/loadtest" "github.com/base/base-bench/runner/utils" "github.com/ethereum/go-ethereum/core" ethparams "github.com/ethereum/go-ethereum/params" @@ -368,6 +369,54 @@ func (s *service) setupBlobsDir(workingDir string) error { return nil } +func loadTestNetwork(genesis *core.Genesis, transactionPayload payload.Definition) string { + if envNetwork := os.Getenv("BASE_BENCH_LOAD_TEST_NETWORK"); envNetwork != "" { + return envNetwork + } + + if transactionPayload.Type == "load-test" { + if def, ok := transactionPayload.Params.(*loadtest.LoadTestPayloadDefinition); ok && def.Network != "" { + return def.Network + } + } + + if genesis == nil || genesis.Config == nil || genesis.Config.ChainID == nil { + return "unknown" + } + + switch genesis.Config.ChainID.Uint64() { + case 8453: + return "mainnet" + case 84532: + return "sepolia" + case 13371337: + return "devnet" + default: + return fmt.Sprintf("chain-%s", genesis.Config.ChainID.String()) + } +} + +func (s *service) loadTestOutputPath(genesis *core.Genesis, transactionPayload payload.Definition) string { + if transactionPayload.Type != "load-test" { + return "" + } + + network := loadTestNetwork(genesis, transactionPayload) + baseTime := time.Now().UTC() + for i := 0; ; i++ { + timestamp := baseTime.Add(time.Duration(i) * time.Second).Format(benchmark.LoadTestTimestampLayout) + outputPath := path.Join( + s.config.OutputDir(), + benchmark.LoadTestResultsDir, + network, + fmt.Sprintf("%s.json", timestamp), + ) + if _, err := os.Stat(outputPath); err != nil { + return outputPath + } + } +} + func (s *service) runTest(ctx context.Context, params types.RunParams, workingDir string, outputDir string, snapshotConfig *benchmark.SnapshotDefinition, proofConfig *benchmark.ProofProgramOptions, transactionPayload payload.Definition, datadirsConfig *benchmark.DatadirConfig, flashblocksBlockTime string) (*benchmark.RunResult, error) { s.log.Info(fmt.Sprintf("Running benchmark with params: %+v", params)) @@ -423,12 +472,13 @@ func (s *service) runTest(ctx context.Context, params types.RunParams, workingDi prefundAmount := new(big.Int).Mul(big.NewInt(1e6), big.NewInt(ethparams.Ether)) config := &types.TestConfig{ - Params: params, - Config: s.config, - Genesis: *genesis, - BatcherKey: *batcherKey, - PrefundPrivateKey: *prefundKey, - PrefundAmount: *prefundAmount, + Params: params, + Config: s.config, + Genesis: *genesis, + BatcherKey: *batcherKey, + PrefundPrivateKey: *prefundKey, + PrefundAmount: *prefundAmount, + LoadTestOutputPath: s.loadTestOutputPath(genesis, transactionPayload), } // Run benchmark