From 9d5123b29e8af39d0ebb28e18c18912b262d481c Mon Sep 17 00:00:00 2001 From: Niran Babalola Date: Fri, 15 May 2026 17:27:09 -0500 Subject: [PATCH 1/4] feat(load-test): capture and display load test results from benchmark runs Wire the load-test payload worker to write a JSON result file on graceful shutdown and surface it in the report UI. After the benchmark window ends the runner sends SIGINT to the load-test binary and continues proposing settlement blocks until the worker exits (up to 90s), giving the binary time to flush its result. The output path is stored as an artifact in RunResult and shown as a "Load test" link in the run list that opens a dedicated detail page. Also fixes an inverted slices.Contains check in the importer that caused required files to be silently skipped on error and optional files to abort the import. --- report/src/App.tsx | 5 + report/src/components/RunList.tsx | 21 +- report/src/pages/BenchmarkLoadTestDetail.tsx | 50 +++++ report/src/pages/LoadTestDetail.tsx | 202 +++++++++++-------- report/src/services/dataService.ts | 17 ++ report/src/types.ts | 4 + report/src/utils/useDataSeries.ts | 26 +++ runner/benchmark/result_metadata.go | 5 +- runner/clients/baserethnode/client.go | 4 + runner/clients/builder/client.go | 9 + runner/clients/common/proxy/proxy.go | 77 ++++++- runner/clients/common/proxy/proxy_test.go | 104 ++++++++++ runner/clients/geth/client.go | 4 + runner/clients/reth/client.go | 4 + runner/clients/types/types.go | 3 + runner/importer/service.go | 3 +- runner/network/network_benchmark.go | 11 + runner/network/sequencer_benchmark.go | 70 ++++++- runner/network/types/types.go | 4 + runner/payload/factory.go | 2 +- runner/payload/loadtest/load_test_worker.go | 147 +++++++++++--- runner/payload/txfuzz/tx_fuzz_worker.go | 3 +- runner/payload/worker/types.go | 7 + runner/service.go | 13 +- 24 files changed, 658 insertions(+), 137 deletions(-) create mode 100644 report/src/pages/BenchmarkLoadTestDetail.tsx create mode 100644 runner/clients/common/proxy/proxy_test.go diff --git a/report/src/App.tsx b/report/src/App.tsx index cef213a2..a1050e76 100644 --- a/report/src/App.tsx +++ b/report/src/App.tsx @@ -5,6 +5,7 @@ import RedirectToLatestRun from "./pages/RedirectToLatestRun"; import LoadTestLanding from "./pages/LoadTestLanding"; import LoadTestAllRuns from "./pages/LoadTestAllRuns"; import LoadTestDetail from "./pages/LoadTestDetail"; +import BenchmarkLoadTestDetail from "./pages/BenchmarkLoadTestDetail"; import ErrorBoundary from "./components/ErrorBoundary"; function App() { @@ -22,6 +23,10 @@ function App() { path="/load-tests/:network/:timestamp" element={} /> + } + /> } /> - +
+ + {run.result?.artifacts?.loadTestResult && ( + + Load test + + )} +
{Object.keys(COLUMN_DEFINITIONS).map((column) => ( { + const { outputDir } = useParams(); + const { + data: result, + isLoading, + error, + } = useBenchmarkLoadTestResult(outputDir); + + return ( +
+ +
+ {isLoading && ( +
+ Loading benchmark load test… +
+ )} + + {error && ( +
+ Failed to load benchmark load test result: {String(error)} +
+ )} + + {result && ( + + Output dir: {outputDir} + + } + backLink={{ + to: "/latest", + label: "View benchmark runs →", + }} + /> + )} +
+
+ ); +}; + +export default BenchmarkLoadTestDetail; diff --git a/report/src/pages/LoadTestDetail.tsx b/report/src/pages/LoadTestDetail.tsx index 808ab814..83c02fb5 100644 --- a/report/src/pages/LoadTestDetail.tsx +++ b/report/src/pages/LoadTestDetail.tsx @@ -1,5 +1,5 @@ import { Link, useParams } from "react-router-dom"; -import { useMemo } from "react"; +import { type ReactNode, useMemo } from "react"; import Navbar from "../components/Navbar"; import StatCard, { Stat, StatGrid } from "../components/StatCard"; import PercentileBarChart, { @@ -141,50 +141,113 @@ const SummarySection = ({ result }: { result: LoadTestResult }) => { ); }; -const LoadTestDetail = () => { - const { network, timestamp } = useParams(); - const { - data: result, - isLoading, - error, - } = useLoadTestResult(network, timestamp); +interface LoadTestReportContentProps { + result: LoadTestResult; + title: string; + subtitle: ReactNode; + backLink?: { + to: string; + label: string; + }; +} +export const LoadTestReportContent = ({ + result, + title, + subtitle, + backLink, +}: LoadTestReportContentProps) => { const blockLatencyRows = useMemo( - () => (result ? buildLatencyRows(result.block_latency) : []), + () => buildLatencyRows(result.block_latency), [result], ); const flashblocksLatencyRows = useMemo( - () => (result ? buildLatencyRows(result.flashblocks_latency) : []), + () => buildLatencyRows(result.flashblocks_latency), [result], ); return ( -
- -
-
-
+ <> +
+
+ {backLink && ( - View all runs → + {backLink.label} -

- {timestamp ? formatLoadTestTimestamp(timestamp) : "Load test"} -

-

- Network: {network} - {timestamp && ( - <> - {" · "} - {timestamp} - - )} -

-
-
+ )} +

+ {title} +

+

{subtitle}

+
+
+ + + {result.throughput_timeseries && + result.throughput_timeseries.length > 1 && ( + + + + )} + + {result.config && } + + + + + + + + + + + + + {result.top_failure_reasons.length === 0 ? ( +
No failures recorded.
+ ) : ( +
    + {result.top_failure_reasons.map(([reason, count]) => ( +
  • + {reason} + {count.toLocaleString()} +
  • + ))} +
+ )} +
+ + ); +}; + +const LoadTestDetail = () => { + const { network, timestamp } = useParams(); + const { + data: result, + isLoading, + error, + } = useLoadTestResult(network, timestamp); + + return ( +
+ +
{isLoading && (
Loading load test…
)} @@ -196,62 +259,27 @@ const LoadTestDetail = () => { )} {result && ( - <> - - - {result.throughput_timeseries && - result.throughput_timeseries.length > 1 && ( - - - - )} - - {result.config && } - - - - - - - - - - - - - {result.top_failure_reasons.length === 0 ? ( -
- No failures recorded. -
- ) : ( -
    - {result.top_failure_reasons.map(([reason, count]) => ( -
  • - {reason} - - {count.toLocaleString()} - -
  • - ))} -
- )} -
- + + Network: {network} + {timestamp && ( + <> + {" · "} + + {timestamp} + + + )} + + } + backLink={{ + to: `/load-tests/${network ?? "sepolia"}/all`, + label: "View all runs →", + }} + /> )}
diff --git a/report/src/services/dataService.ts b/report/src/services/dataService.ts index 458c4b66..45236ac5 100644 --- a/report/src/services/dataService.ts +++ b/report/src/services/dataService.ts @@ -79,6 +79,23 @@ export class DataService { return await response.json(); } + + async getBenchmarkLoadTestResult( + outputDir: string, + artifactPath = "load-test-result.json", + ): Promise { + const response = await fetch( + `${this.baseUrl}output/${encodeURIComponent(outputDir)}/${encodeURIComponent(artifactPath)}`, + ); + + if (!response.ok) { + throw new Error( + `Failed to fetch benchmark load test result: ${response.status} ${response.statusText}`, + ); + } + + return await response.json(); + } } // Configuration helper to determine base URL from environment diff --git a/report/src/types.ts b/report/src/types.ts index 321ad631..2d889950 100644 --- a/report/src/types.ts +++ b/report/src/types.ts @@ -87,6 +87,10 @@ export interface BenchmarkRun { gasPerSecond: number; newPayload: number; }; + artifacts?: { + loadTestResult?: string; + [key: string]: string | undefined; + }; } | null; } diff --git a/report/src/utils/useDataSeries.ts b/report/src/utils/useDataSeries.ts index 24f08eca..5f36a7b7 100644 --- a/report/src/utils/useDataSeries.ts +++ b/report/src/utils/useDataSeries.ts @@ -139,3 +139,29 @@ export const useLoadTestResult = ( }, ); }; + +export const useBenchmarkLoadTestResult = ( + outputDir: string | undefined, + artifactPath?: string, +) => { + const fetcher = useCallback(async (): Promise => { + if (!outputDir) { + throw new Error("outputDir required"); + } + const dataService = getDataService(); + return await dataService.getBenchmarkLoadTestResult(outputDir, artifactPath); + }, [outputDir, artifactPath]); + + return useSWR( + outputDir + ? `benchmark-load-test-${outputDir}-${artifactPath ?? "load-test-result.json"}` + : null, + fetcher, + { + dedupingInterval: 12 * 60 * 60 * 1000, + revalidateOnFocus: false, + errorRetryCount: 3, + errorRetryInterval: 5000, + }, + ); +}; diff --git a/runner/benchmark/result_metadata.go b/runner/benchmark/result_metadata.go index e31bdbff..b06f755a 100644 --- a/runner/benchmark/result_metadata.go +++ b/runner/benchmark/result_metadata.go @@ -12,6 +12,7 @@ type RunResult struct { SequencerMetrics types.SequencerKeyMetrics `json:"sequencerMetrics"` ValidatorMetrics types.ValidatorKeyMetrics `json:"validatorMetrics"` ClientVersion string `json:"clientVersion,omitempty"` + Artifacts map[string]string `json:"artifacts,omitempty"` } // MachineInfo contains information about the machine running the benchmark @@ -51,7 +52,9 @@ func (runs *RunGroup) AddResult(testIdx int, runResult RunResult) { } const ( - BenchmarkRunTag = "BenchmarkRun" + BenchmarkRunTag = "BenchmarkRun" + LoadTestResultArtifactKey = "loadTestResult" + LoadTestResultFileName = "load-test-result.json" ) func RunGroupFromTestPlans(testPlans []TestPlan, machineInfo *MachineInfo) RunGroup { diff --git a/runner/clients/baserethnode/client.go b/runner/clients/baserethnode/client.go index 844d33b3..3a84cff4 100644 --- a/runner/clients/baserethnode/client.go +++ b/runner/clients/baserethnode/client.go @@ -276,3 +276,7 @@ func (r *BaseRethNodeClient) FlashblocksClient() types.FlashblocksClient { func (r *BaseRethNodeClient) SupportsFlashblocks() bool { return true } + +func (r *BaseRethNodeClient) FlashblocksWsURL() string { + return "" +} diff --git a/runner/clients/builder/client.go b/runner/clients/builder/client.go index b229905c..605f93e1 100644 --- a/runner/clients/builder/client.go +++ b/runner/clients/builder/client.go @@ -128,3 +128,12 @@ func (r *BuilderClient) FlashblocksClient() types.FlashblocksClient { func (r *BuilderClient) SupportsFlashblocks() bool { return false } + +// FlashblocksWsURL returns the local WebSocket URL of the flashblocks server +// hosted by the builder. +func (r *BuilderClient) FlashblocksWsURL() string { + if r.websocketPort == 0 { + return "" + } + return fmt.Sprintf("ws://localhost:%d", r.websocketPort) +} diff --git a/runner/clients/common/proxy/proxy.go b/runner/clients/common/proxy/proxy.go index f261c39e..b212ec0b 100644 --- a/runner/clients/common/proxy/proxy.go +++ b/runner/clients/common/proxy/proxy.go @@ -17,6 +17,7 @@ import ( "fmt" "io" "net/http" + "sync" "github.com/base/base-bench/runner/network/mempool" "github.com/ethereum/go-ethereum/common" @@ -31,6 +32,7 @@ type ProxyServer struct { pendingTxs []*ethTypes.Transaction clientURL string mempool *mempool.StaticWorkloadMempool + mu sync.Mutex } func NewProxyServer(clientURL string, log log.Logger, port int, mempool *mempool.StaticWorkloadMempool) *ProxyServer { @@ -62,11 +64,29 @@ func (p *ProxyServer) Run(ctx context.Context) error { } func (p *ProxyServer) PendingTxs() []*ethTypes.Transaction { - return p.pendingTxs + p.mu.Lock() + defer p.mu.Unlock() + + txs := make([]*ethTypes.Transaction, len(p.pendingTxs)) + copy(txs, p.pendingTxs) + return txs } func (p *ProxyServer) ClearPendingTxs() { + p.mu.Lock() + defer p.mu.Unlock() + + p.pendingTxs = make([]*ethTypes.Transaction, 0) +} + +func (p *ProxyServer) DrainPendingTxs() []*ethTypes.Transaction { + p.mu.Lock() + defer p.mu.Unlock() + + txs := make([]*ethTypes.Transaction, len(p.pendingTxs)) + copy(txs, p.pendingTxs) p.pendingTxs = make([]*ethTypes.Transaction, 0) + return txs } // Stop stops both the proxy server and the underlying client @@ -89,13 +109,20 @@ func (p *ProxyServer) handleRequest(w http.ResponseWriter, r *http.Request) { return } - var request struct { + type rpcRequest struct { Method string `json:"method"` Params json.RawMessage `json:"params"` ID interface{} `json:"id"` JSONRPC string `json:"jsonrpc"` } + if len(body) > 0 && body[0] == '[' { + p.handleBatchRequest(w, body) + return + } + + var request rpcRequest + if err := json.Unmarshal(body, &request); err != nil { http.Error(w, "Error parsing request", http.StatusBadRequest) return @@ -164,6 +191,50 @@ func (p *ProxyServer) handleRequest(w http.ResponseWriter, r *http.Request) { p.DebugResponse(request.Method, request.Params, respBody) } +func (p *ProxyServer) handleBatchRequest(w http.ResponseWriter, body []byte) { + type rpcRequest struct { + Method string `json:"method"` + Params json.RawMessage `json:"params"` + ID interface{} `json:"id"` + JSONRPC string `json:"jsonrpc"` + } + + var requests []rpcRequest + if err := json.Unmarshal(body, &requests); err != nil { + http.Error(w, "Error parsing batch request", http.StatusBadRequest) + return + } + + type rpcResponse struct { + JSONRPC string `json:"jsonrpc"` + ID interface{} `json:"id"` + Result json.RawMessage `json:"result,omitempty"` + Error interface{} `json:"error,omitempty"` + } + + responses := make([]rpcResponse, 0, len(requests)) + for _, request := range requests { + handled, result, err := p.OverrideRequest(request.Method, request.Params) + response := rpcResponse{ + JSONRPC: "2.0", + ID: request.ID, + } + if err != nil { + response.Error = map[string]interface{}{"code": -32000, "message": err.Error()} + } else if handled { + response.Result = result + } else { + response.Error = map[string]interface{}{"code": -32601, "message": "method not supported in proxy batch mode"} + } + responses = append(responses, response) + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(responses); err != nil { + p.log.Error("Error encoding batch response", "err", err) + } +} + func (p *ProxyServer) OverrideRequest(method string, rawParams json.RawMessage) (bool, json.RawMessage, error) { switch method { case "eth_getTransactionCount": @@ -204,7 +275,9 @@ func (p *ProxyServer) OverrideRequest(method string, rawParams json.RawMessage) return false, nil, fmt.Errorf("failed to decode transaction: %w", err) } + p.mu.Lock() p.pendingTxs = append(p.pendingTxs, &tx) + p.mu.Unlock() txHash := tx.Hash().Hex() jsonResponse, _ := json.Marshal(txHash) diff --git a/runner/clients/common/proxy/proxy_test.go b/runner/clients/common/proxy/proxy_test.go new file mode 100644 index 00000000..a5c0678f --- /dev/null +++ b/runner/clients/common/proxy/proxy_test.go @@ -0,0 +1,104 @@ +package proxy + +import ( + "bytes" + "encoding/json" + "math/big" + "net/http" + "net/http/httptest" + "testing" + + "github.com/base/base-bench/runner/network/mempool" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" +) + +func TestHandleBatchRequestCapturesRawTransactions(t *testing.T) { + chainID := big.NewInt(8453) + tx := signedTestTx(t, chainID) + rawTx, err := tx.MarshalBinary() + if err != nil { + t.Fatalf("marshal tx: %v", err) + } + + server := NewProxyServer( + "http://127.0.0.1:8545", + log.New(), + 0, + mempool.NewStaticWorkloadMempool(log.New(), chainID), + ) + + body, err := json.Marshal([]map[string]any{ + { + "jsonrpc": "2.0", + "id": 0, + "method": "eth_sendRawTransaction", + "params": []string{hexutil.Encode(rawTx)}, + }, + }) + if err != nil { + t.Fatalf("marshal request: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(body)) + rec := httptest.NewRecorder() + + server.handleRequest(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", rec.Code, rec.Body.String()) + } + + var responses []struct { + Result string `json:"result"` + Error map[string]any `json:"error"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &responses); err != nil { + t.Fatalf("unmarshal response: %v", err) + } + if len(responses) != 1 { + t.Fatalf("expected 1 response, got %d", len(responses)) + } + if responses[0].Error != nil { + t.Fatalf("expected successful response, got error %v", responses[0].Error) + } + if responses[0].Result != tx.Hash().Hex() { + t.Fatalf("expected tx hash %s, got %s", tx.Hash().Hex(), responses[0].Result) + } + + pending := server.DrainPendingTxs() + if len(pending) != 1 { + t.Fatalf("expected 1 pending tx, got %d", len(pending)) + } + if pending[0].Hash() != tx.Hash() { + t.Fatalf("expected pending tx %s, got %s", tx.Hash(), pending[0].Hash()) + } +} + +func signedTestTx(t *testing.T, chainID *big.Int) *types.Transaction { + t.Helper() + + key, err := crypto.GenerateKey() + if err != nil { + t.Fatalf("generate key: %v", err) + } + + tx := types.NewTx(&types.DynamicFeeTx{ + ChainID: chainID, + Nonce: 0, + GasTipCap: big.NewInt(1), + GasFeeCap: big.NewInt(1), + Gas: 21_000, + To: &common.Address{1}, + Value: big.NewInt(1), + }) + + signed, err := types.SignTx(tx, types.NewIsthmusSigner(chainID), key) + if err != nil { + t.Fatalf("sign tx: %v", err) + } + return signed +} diff --git a/runner/clients/geth/client.go b/runner/clients/geth/client.go index 89e40b67..a4efd25b 100644 --- a/runner/clients/geth/client.go +++ b/runner/clients/geth/client.go @@ -280,3 +280,7 @@ func (g *GethClient) FlashblocksClient() types.FlashblocksClient { func (g *GethClient) SupportsFlashblocks() bool { return false } + +func (g *GethClient) FlashblocksWsURL() string { + return "" +} diff --git a/runner/clients/reth/client.go b/runner/clients/reth/client.go index 7a4a25c8..35739d4e 100644 --- a/runner/clients/reth/client.go +++ b/runner/clients/reth/client.go @@ -296,3 +296,7 @@ func (r *RethClient) FlashblocksClient() types.FlashblocksClient { func (r *RethClient) SupportsFlashblocks() bool { return true } + +func (r *RethClient) FlashblocksWsURL() string { + return "" +} diff --git a/runner/clients/types/types.go b/runner/clients/types/types.go index a0be0080..c88b6212 100644 --- a/runner/clients/types/types.go +++ b/runner/clients/types/types.go @@ -32,4 +32,7 @@ type ExecutionClient interface { SetHead(ctx context.Context, blockNumber uint64) error FlashblocksClient() FlashblocksClient // returns nil for clients that don't support flashblocks SupportsFlashblocks() bool // returns true if the client supports receiving flashblock payloads + // FlashblocksWsURL returns the local WebSocket URL hosted by this client, + // or an empty string when the client does not expose one. + FlashblocksWsURL() string } diff --git a/runner/importer/service.go b/runner/importer/service.go index e771abb6..1ed2008d 100644 --- a/runner/importer/service.go +++ b/runner/importer/service.go @@ -75,6 +75,7 @@ func (s *Service) downloadOutputFiles(baseURL, runID, runOutputDir string) error "result-sequencer.json", "metrics-validator.json", "metrics-sequencer.json", + benchmark.LoadTestResultFileName, } requiredFiles := []string{ @@ -97,7 +98,7 @@ func (s *Service) downloadOutputFiles(baseURL, runID, runOutputDir string) error // Try to download the file err := s.downloadFile(fileURL, localFilePath) if err != nil { - if !slices.Contains(requiredFiles, fileName) { + if slices.Contains(requiredFiles, fileName) { return errors.Wrap(err, "failed to download file") } s.log.Warn("Failed to download file (continuing)", "file", fileName, "url", fileURL, "error", err) diff --git a/runner/network/network_benchmark.go b/runner/network/network_benchmark.go index 2bf13775..52a38b1b 100644 --- a/runner/network/network_benchmark.go +++ b/runner/network/network_benchmark.go @@ -247,11 +247,22 @@ func (nb *NetworkBenchmark) GetResult() (*benchmark.RunResult, error) { return nil, errors.New("metrics not collected") } + artifacts := make(map[string]string) + if nb.testConfig.LoadTestOutputPath != "" { + if _, err := os.Stat(nb.testConfig.LoadTestOutputPath); err == nil { + artifacts[benchmark.LoadTestResultArtifactKey] = benchmark.LoadTestResultFileName + } + } + if len(artifacts) == 0 { + artifacts = nil + } + return &benchmark.RunResult{ SequencerMetrics: *nb.collectedSequencerMetrics, ValidatorMetrics: *nb.collectedValidatorMetrics, Success: true, Complete: true, + Artifacts: artifacts, }, nil } diff --git a/runner/network/sequencer_benchmark.go b/runner/network/sequencer_benchmark.go index 29aa8c28..1a7e790d 100644 --- a/runner/network/sequencer_benchmark.go +++ b/runner/network/sequencer_benchmark.go @@ -15,6 +15,7 @@ import ( "github.com/base/base-bench/runner/network/proofprogram/fakel1" benchtypes "github.com/base/base-bench/runner/network/types" "github.com/base/base-bench/runner/payload" + payloadworker "github.com/base/base-bench/runner/payload/worker" "github.com/ethereum-optimism/optimism/op-service/retry" "github.com/ethereum/go-ethereum/beacon/engine" "github.com/ethereum/go-ethereum/common" @@ -24,6 +25,8 @@ import ( "github.com/pkg/errors" ) +const gracefulWorkerShutdownTimeout = 90 * time.Second + type sequencerBenchmark struct { log log.Logger sequencerClient types.ExecutionClient @@ -282,8 +285,13 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics. payloads = append(payloads, *payload) } - err = consensusClient.Stop(benchmarkCtx) - if err != nil { + pendingTxs, shutdownErr := nb.settleGracefulWorkerShutdown(benchmarkCtx, transactionWorker, consensusClient, pendingTxs) + if shutdownErr != nil { + errChan <- shutdownErr + return + } + + if err := consensusClient.Stop(benchmarkCtx); err != nil { nb.log.Warn("failed to stop consensus client", "err", err) } @@ -309,6 +317,64 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics. } } +func (nb *sequencerBenchmark) settleGracefulWorkerShutdown( + ctx context.Context, + transactionWorker payloadworker.Worker, + consensusClient *consensus.SequencerConsensusClient, + pendingTxs int, +) (int, error) { + gracefulWorker, ok := transactionWorker.(payloadworker.GracefulShutdownWorker) + if !ok { + return pendingTxs, nil + } + + if err := gracefulWorker.BeginGracefulShutdown(ctx); err != nil { + return pendingTxs, errors.Wrap(err, "failed to begin graceful payload worker shutdown") + } + + timeout := time.NewTimer(gracefulWorkerShutdownTimeout) + defer timeout.Stop() + + settlementBlock := 0 + for { + select { + case <-gracefulWorker.Done(): + nb.log.Info("Payload worker stopped gracefully", "settlement_blocks", settlementBlock) + return pendingTxs, nil + case <-timeout.C: + nb.log.Warn("Timed out waiting for payload worker to stop gracefully", "settlement_blocks", settlementBlock) + return pendingTxs, nil + default: + } + + blockMetrics := metrics.NewBlockMetrics() + txsSent, err := transactionWorker.SendTxs(ctx, pendingTxs) + if err != nil { + return pendingTxs, errors.Wrap(err, "failed to collect settlement transactions") + } + + payload, err := consensusClient.Propose(ctx, blockMetrics, true) + if err != nil { + return pendingTxs, errors.Wrap(err, "failed to propose settlement block") + } + if payload == nil { + return pendingTxs, errors.New("received nil settlement payload from consensus client") + } + + userTxsIncluded := len(payload.Transactions) - 1 + if userTxsIncluded < 0 { + userTxsIncluded = 0 + } + pendingTxs = pendingTxs + txsSent - userTxsIncluded + if pendingTxs < 0 { + pendingTxs = 0 + } + + settlementBlock++ + time.Sleep(nb.config.Params.BlockTime) + } +} + // flashblockCollector implements FlashblockListener to collect flashblocks. type flashblockCollector struct { log log.Logger diff --git a/runner/network/types/types.go b/runner/network/types/types.go index c219c72a..2b93c1e5 100644 --- a/runner/network/types/types.go +++ b/runner/network/types/types.go @@ -40,6 +40,10 @@ type TestConfig struct { PrefundPrivateKey ecdsa.PrivateKey PrefundAmount big.Int + + // LoadTestOutputPath is the optional JSON summary path used by the + // load-test payload worker. + LoadTestOutputPath string } // BatcherAddr returns the batcher address, computing it if necessary diff --git a/runner/payload/factory.go b/runner/payload/factory.go index b8e7176e..7894563f 100644 --- a/runner/payload/factory.go +++ b/runner/payload/factory.go @@ -38,7 +38,7 @@ func NewPayloadWorker(ctx context.Context, log log.Logger, testConfig *benchtype def = &loadtest.LoadTestPayloadDefinition{} } worker, err = loadtest.NewLoadTestPayloadWorker( - log, sequencerClient.ClientURL(), params, privateKey, amount, config, genesis.Config.ChainID, *def) + log, sequencerClient.ClientURL(), sequencerClient.FlashblocksWsURL(), params, privateKey, amount, config, genesis.Config.ChainID, *def, testConfig.LoadTestOutputPath) case "transfer-only": worker, err = transferonly.NewTransferPayloadWorker( ctx, log, sequencerClient.ClientURL(), params, privateKey, amount, &genesis, definition.Params) diff --git a/runner/payload/loadtest/load_test_worker.go b/runner/payload/loadtest/load_test_worker.go index 5b6d62ff..0723496c 100644 --- a/runner/payload/loadtest/load_test_worker.go +++ b/runner/payload/loadtest/load_test_worker.go @@ -9,6 +9,9 @@ import ( "math/big" "os" "os/exec" + "path/filepath" + "sync" + "time" "github.com/base/base-bench/runner/clients/common/proxy" "github.com/base/base-bench/runner/config" @@ -32,13 +35,16 @@ type LoadTestPayloadDefinition struct { // loadTestConfig is the YAML config written to a temp file for the load-test binary. type loadTestConfig struct { - RPC string `yaml:"rpc"` - SenderCount uint64 `yaml:"sender_count"` - TargetGPS uint64 `yaml:"target_gps"` - Duration string `yaml:"duration"` - Seed uint64 `yaml:"seed"` - FundingAmount string `yaml:"funding_amount"` - Transactions yaml.Node `yaml:"transactions"` + RPC string `yaml:"rpc,omitempty"` + TransactionSubmissionRPCs []string `yaml:"transaction_submission_rpcs"` + QueryRPC string `yaml:"query_rpc"` + FlashblocksWs string `yaml:"flashblocks_ws"` + SenderCount uint64 `yaml:"sender_count"` + TargetGPS uint64 `yaml:"target_gps"` + Duration string `yaml:"duration"` + Seed uint64 `yaml:"seed"` + FundingAmount string `yaml:"funding_amount"` + Transactions yaml.Node `yaml:"transactions"` } type loadTestPayloadWorker struct { @@ -46,13 +52,19 @@ type loadTestPayloadWorker struct { prefundSK string loadTestBin string elRPCURL string + flashblocksURL string gasLimit uint64 blockTimeSec uint64 params LoadTestPayloadDefinition mempool *mempool.StaticWorkloadMempool proxyServer *proxy.ProxyServer cmd *exec.Cmd + done chan struct{} + waitErr error + waitMu sync.Mutex + shutdownOnce sync.Once configFilePath string + outputPath string } // NewLoadTestPayloadWorker creates a worker that runs the base-load-test binary @@ -60,12 +72,14 @@ type loadTestPayloadWorker struct { func NewLoadTestPayloadWorker( log log.Logger, elRPCURL string, + flashblocksURL string, params types.RunParams, prefundedPrivateKey ecdsa.PrivateKey, prefundAmount *big.Int, cfg config.Config, chainID *big.Int, definition LoadTestPayloadDefinition, + outputPath string, ) (worker.Worker, error) { mp := mempool.NewStaticWorkloadMempool(log, chainID) ps := proxy.NewProxyServer(elRPCURL, log, cfg.ProxyPort(), mp) @@ -76,15 +90,17 @@ func NewLoadTestPayloadWorker( } w := &loadTestPayloadWorker{ - log: log, - prefundSK: hex.EncodeToString(prefundedPrivateKey.D.Bytes()), - loadTestBin: cfg.LoadTestBinary(), - elRPCURL: elRPCURL, - gasLimit: params.GasLimit, - blockTimeSec: blockTimeSec, - params: definition, - mempool: mp, - proxyServer: ps, + log: log, + prefundSK: hex.EncodeToString(prefundedPrivateKey.D.Bytes()), + loadTestBin: cfg.LoadTestBinary(), + elRPCURL: elRPCURL, + flashblocksURL: flashblocksURL, + gasLimit: params.GasLimit, + blockTimeSec: blockTimeSec, + params: definition, + mempool: mp, + proxyServer: ps, + outputPath: outputPath, } return w, nil @@ -111,23 +127,85 @@ func (w *loadTestPayloadWorker) Setup(ctx context.Context) error { cmd.Stdout = os.Stdout cmd.Stderr = os.Stdout cmd.Env = append(os.Environ(), fmt.Sprintf("FUNDER_KEY=%s", w.prefundSK)) + if w.outputPath != "" { + if err := os.MkdirAll(filepath.Dir(w.outputPath), 0755); err != nil { + return errors.Wrap(err, "failed to create load-test output directory") + } + cmd.Env = append(cmd.Env, fmt.Sprintf("LOAD_TEST_OUTPUT=%s", w.outputPath)) + } if err := cmd.Start(); err != nil { return errors.Wrap(err, "failed to start load test binary") } w.cmd = cmd + w.done = make(chan struct{}) + go func() { + err := cmd.Wait() + w.waitMu.Lock() + w.waitErr = err + w.waitMu.Unlock() + close(w.done) + }() return nil } +func (w *loadTestPayloadWorker) BeginGracefulShutdown(ctx context.Context) error { + if w.cmd == nil || w.cmd.Process == nil { + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-w.Done(): + return nil + default: + } + + var signalErr error + w.shutdownOnce.Do(func() { + w.log.Info("Stopping load test process gracefully", "pid", w.cmd.Process.Pid, "output", w.outputPath) + signalErr = w.cmd.Process.Signal(os.Interrupt) + }) + if signalErr != nil { + select { + case <-w.Done(): + return nil + default: + } + } + return signalErr +} + +func (w *loadTestPayloadWorker) Done() <-chan struct{} { + if w.done != nil { + return w.done + } + + done := make(chan struct{}) + close(done) + return done +} + func (w *loadTestPayloadWorker) Stop(ctx context.Context) error { if w.cmd != nil && w.cmd.Process != nil { - w.log.Info("Stopping load test process", "pid", w.cmd.Process.Pid) - if err := w.cmd.Process.Kill(); err != nil { - w.log.Warn("failed to kill load test process", "err", err) - } else { - // Reap the process to avoid zombies. - _, _ = w.cmd.Process.Wait() + if err := w.BeginGracefulShutdown(ctx); err != nil { + w.log.Warn("failed to signal load test process", "err", err) + } + + select { + case <-w.Done(): + case <-time.After(10 * time.Second): + w.log.Warn("load test process did not stop gracefully, killing", "pid", w.cmd.Process.Pid) + if err := w.cmd.Process.Kill(); err != nil { + w.log.Warn("failed to kill load test process", "err", err) + } + select { + case <-w.Done(): + case <-time.After(5 * time.Second): + w.log.Warn("timed out waiting for killed load test process") + } } } @@ -144,8 +222,7 @@ func (w *loadTestPayloadWorker) Stop(ctx context.Context) error { func (w *loadTestPayloadWorker) SendTxs(ctx context.Context, _ int) (int, error) { w.log.Info("Collecting txs from load test") - pendingTxs := w.proxyServer.PendingTxs() - w.proxyServer.ClearPendingTxs() + pendingTxs := w.proxyServer.DrainPendingTxs() w.mempool.AddTransactions(pendingTxs) return len(pendingTxs), nil @@ -206,14 +283,22 @@ func (w *loadTestPayloadWorker) writeConfig() (string, error) { transactions = defaultTransactions() } + flashblocksURL := w.flashblocksURL + if flashblocksURL == "" { + flashblocksURL = "ws://localhost:7111" + } + config := loadTestConfig{ - RPC: w.proxyServer.ClientURL(), - SenderCount: senderCount, - TargetGPS: targetGPS, - Duration: "99999s", - Seed: randomSeed(), - FundingAmount: fundingAmount, - Transactions: transactions, + RPC: w.proxyServer.ClientURL(), + TransactionSubmissionRPCs: []string{w.proxyServer.ClientURL()}, + QueryRPC: w.proxyServer.ClientURL(), + FlashblocksWs: flashblocksURL, + SenderCount: senderCount, + TargetGPS: targetGPS, + Duration: "99999s", + Seed: randomSeed(), + FundingAmount: fundingAmount, + Transactions: transactions, } data, err := yaml.Marshal(&config) diff --git a/runner/payload/txfuzz/tx_fuzz_worker.go b/runner/payload/txfuzz/tx_fuzz_worker.go index b5167d42..87fd5010 100644 --- a/runner/payload/txfuzz/tx_fuzz_worker.go +++ b/runner/payload/txfuzz/tx_fuzz_worker.go @@ -84,8 +84,7 @@ func (t *txFuzzPayloadWorker) Stop(ctx context.Context) error { func (t *txFuzzPayloadWorker) SendTxs(ctx context.Context, _ int) (int, error) { t.log.Info("Sending txs in tx-fuzz mode") - pending := t.proxyServer.PendingTxs() - t.proxyServer.ClearPendingTxs() + pending := t.proxyServer.DrainPendingTxs() t.mempool.AddTransactions(pending) return len(pending), nil diff --git a/runner/payload/worker/types.go b/runner/payload/worker/types.go index 69906ffe..a0e05725 100644 --- a/runner/payload/worker/types.go +++ b/runner/payload/worker/types.go @@ -18,3 +18,10 @@ type Worker interface { Stop(ctx context.Context) error Mempool() mempool.FakeMempool } + +// GracefulShutdownWorker can stop generating transactions while the benchmark +// sequencer keeps producing settlement blocks. +type GracefulShutdownWorker interface { + BeginGracefulShutdown(ctx context.Context) error + Done() <-chan struct{} +} diff --git a/runner/service.go b/runner/service.go index 2e1c0309..6d280136 100644 --- a/runner/service.go +++ b/runner/service.go @@ -423,12 +423,13 @@ func (s *service) runTest(ctx context.Context, params types.RunParams, workingDi prefundAmount := new(big.Int).Mul(big.NewInt(1e6), big.NewInt(ethparams.Ether)) config := &types.TestConfig{ - Params: params, - Config: s.config, - Genesis: *genesis, - BatcherKey: *batcherKey, - PrefundPrivateKey: *prefundKey, - PrefundAmount: *prefundAmount, + Params: params, + Config: s.config, + Genesis: *genesis, + BatcherKey: *batcherKey, + PrefundPrivateKey: *prefundKey, + PrefundAmount: *prefundAmount, + LoadTestOutputPath: path.Join(outputDir, benchmark.LoadTestResultFileName), } // Run benchmark From e60e7c97adf8579f28e97da82ba66576b0e027ea Mon Sep 17 00:00:00 2001 From: Niran Babalola Date: Fri, 15 May 2026 20:46:20 -0500 Subject: [PATCH 2/4] feat(load-test): write load-test reports to output/load-tests//.json Drop the benchmark-specific UI page and report-api route added in the previous commit. Instead the load-test worker writes a normal load-test JSON report to output/load-tests//.json, which the benchmarking uploader can publish alongside regular benchmark runs. Network is resolved from BASE_BENCH_LOAD_TEST_NETWORK env var, then the payload's network field, then a chain-ID fallback map. The path is passed to base-load-test via LOAD_TEST_OUTPUT. The worker now shuts down gracefully so the sequencer can drain remaining transactions before stopping. Also fixes an inverted slices.Contains guard in the importer that would have returned an error for optional (rather than required) missing files. --- report/src/App.tsx | 5 - report/src/components/RunList.tsx | 21 +- report/src/pages/BenchmarkLoadTestDetail.tsx | 50 ----- report/src/pages/LoadTestDetail.tsx | 202 ++++++++----------- report/src/services/dataService.ts | 17 -- report/src/types.ts | 4 - report/src/utils/useDataSeries.ts | 26 --- runner/benchmark/result_metadata.go | 7 +- runner/importer/service.go | 3 +- runner/network/network_benchmark.go | 17 +- runner/network/sequencer_benchmark.go | 21 +- runner/network/types/types.go | 4 +- runner/payload/loadtest/load_test_worker.go | 8 +- runner/service.go | 51 ++++- 14 files changed, 162 insertions(+), 274 deletions(-) delete mode 100644 report/src/pages/BenchmarkLoadTestDetail.tsx diff --git a/report/src/App.tsx b/report/src/App.tsx index a1050e76..cef213a2 100644 --- a/report/src/App.tsx +++ b/report/src/App.tsx @@ -5,7 +5,6 @@ import RedirectToLatestRun from "./pages/RedirectToLatestRun"; import LoadTestLanding from "./pages/LoadTestLanding"; import LoadTestAllRuns from "./pages/LoadTestAllRuns"; import LoadTestDetail from "./pages/LoadTestDetail"; -import BenchmarkLoadTestDetail from "./pages/BenchmarkLoadTestDetail"; import ErrorBoundary from "./components/ErrorBoundary"; function App() { @@ -23,10 +22,6 @@ function App() { path="/load-tests/:network/:timestamp" element={} /> - } - /> } /> -
- - {run.result?.artifacts?.loadTestResult && ( - - Load test - - )} -
+ {Object.keys(COLUMN_DEFINITIONS).map((column) => ( { - const { outputDir } = useParams(); - const { - data: result, - isLoading, - error, - } = useBenchmarkLoadTestResult(outputDir); - - return ( -
- -
- {isLoading && ( -
- Loading benchmark load test… -
- )} - - {error && ( -
- Failed to load benchmark load test result: {String(error)} -
- )} - - {result && ( - - Output dir: {outputDir} - - } - backLink={{ - to: "/latest", - label: "View benchmark runs →", - }} - /> - )} -
-
- ); -}; - -export default BenchmarkLoadTestDetail; diff --git a/report/src/pages/LoadTestDetail.tsx b/report/src/pages/LoadTestDetail.tsx index 83c02fb5..808ab814 100644 --- a/report/src/pages/LoadTestDetail.tsx +++ b/report/src/pages/LoadTestDetail.tsx @@ -1,5 +1,5 @@ import { Link, useParams } from "react-router-dom"; -import { type ReactNode, useMemo } from "react"; +import { useMemo } from "react"; import Navbar from "../components/Navbar"; import StatCard, { Stat, StatGrid } from "../components/StatCard"; import PercentileBarChart, { @@ -141,113 +141,50 @@ const SummarySection = ({ result }: { result: LoadTestResult }) => { ); }; -interface LoadTestReportContentProps { - result: LoadTestResult; - title: string; - subtitle: ReactNode; - backLink?: { - to: string; - label: string; - }; -} +const LoadTestDetail = () => { + const { network, timestamp } = useParams(); + const { + data: result, + isLoading, + error, + } = useLoadTestResult(network, timestamp); -export const LoadTestReportContent = ({ - result, - title, - subtitle, - backLink, -}: LoadTestReportContentProps) => { const blockLatencyRows = useMemo( - () => buildLatencyRows(result.block_latency), + () => (result ? buildLatencyRows(result.block_latency) : []), [result], ); const flashblocksLatencyRows = useMemo( - () => buildLatencyRows(result.flashblocks_latency), + () => (result ? buildLatencyRows(result.flashblocks_latency) : []), [result], ); return ( - <> -
-
- {backLink && ( +
+ +
+
+
- {backLink.label} + View all runs → - )} -

- {title} -

-

{subtitle}

-
-
- - - - {result.throughput_timeseries && - result.throughput_timeseries.length > 1 && ( - - - - )} - - {result.config && } - - - - - - - - - - - - - {result.top_failure_reasons.length === 0 ? ( -
No failures recorded.
- ) : ( -
    - {result.top_failure_reasons.map(([reason, count]) => ( -
  • - {reason} - {count.toLocaleString()} -
  • - ))} -
- )} -
- - ); -}; - -const LoadTestDetail = () => { - const { network, timestamp } = useParams(); - const { - data: result, - isLoading, - error, - } = useLoadTestResult(network, timestamp); +

+ {timestamp ? formatLoadTestTimestamp(timestamp) : "Load test"} +

+

+ Network: {network} + {timestamp && ( + <> + {" · "} + {timestamp} + + )} +

+
+
- return ( -
- -
{isLoading && (
Loading load test…
)} @@ -259,27 +196,62 @@ const LoadTestDetail = () => { )} {result && ( - - Network: {network} - {timestamp && ( - <> - {" · "} - - {timestamp} - - - )} - - } - backLink={{ - to: `/load-tests/${network ?? "sepolia"}/all`, - label: "View all runs →", - }} - /> + <> + + + {result.throughput_timeseries && + result.throughput_timeseries.length > 1 && ( + + + + )} + + {result.config && } + + + + + + + + + + + + + {result.top_failure_reasons.length === 0 ? ( +
+ No failures recorded. +
+ ) : ( +
    + {result.top_failure_reasons.map(([reason, count]) => ( +
  • + {reason} + + {count.toLocaleString()} + +
  • + ))} +
+ )} +
+ )}
diff --git a/report/src/services/dataService.ts b/report/src/services/dataService.ts index 45236ac5..458c4b66 100644 --- a/report/src/services/dataService.ts +++ b/report/src/services/dataService.ts @@ -79,23 +79,6 @@ export class DataService { return await response.json(); } - - async getBenchmarkLoadTestResult( - outputDir: string, - artifactPath = "load-test-result.json", - ): Promise { - const response = await fetch( - `${this.baseUrl}output/${encodeURIComponent(outputDir)}/${encodeURIComponent(artifactPath)}`, - ); - - if (!response.ok) { - throw new Error( - `Failed to fetch benchmark load test result: ${response.status} ${response.statusText}`, - ); - } - - return await response.json(); - } } // Configuration helper to determine base URL from environment diff --git a/report/src/types.ts b/report/src/types.ts index 2d889950..321ad631 100644 --- a/report/src/types.ts +++ b/report/src/types.ts @@ -87,10 +87,6 @@ export interface BenchmarkRun { gasPerSecond: number; newPayload: number; }; - artifacts?: { - loadTestResult?: string; - [key: string]: string | undefined; - }; } | null; } diff --git a/report/src/utils/useDataSeries.ts b/report/src/utils/useDataSeries.ts index 5f36a7b7..24f08eca 100644 --- a/report/src/utils/useDataSeries.ts +++ b/report/src/utils/useDataSeries.ts @@ -139,29 +139,3 @@ export const useLoadTestResult = ( }, ); }; - -export const useBenchmarkLoadTestResult = ( - outputDir: string | undefined, - artifactPath?: string, -) => { - const fetcher = useCallback(async (): Promise => { - if (!outputDir) { - throw new Error("outputDir required"); - } - const dataService = getDataService(); - return await dataService.getBenchmarkLoadTestResult(outputDir, artifactPath); - }, [outputDir, artifactPath]); - - return useSWR( - outputDir - ? `benchmark-load-test-${outputDir}-${artifactPath ?? "load-test-result.json"}` - : null, - fetcher, - { - dedupingInterval: 12 * 60 * 60 * 1000, - revalidateOnFocus: false, - errorRetryCount: 3, - errorRetryInterval: 5000, - }, - ); -}; diff --git a/runner/benchmark/result_metadata.go b/runner/benchmark/result_metadata.go index b06f755a..03336e26 100644 --- a/runner/benchmark/result_metadata.go +++ b/runner/benchmark/result_metadata.go @@ -12,7 +12,6 @@ type RunResult struct { SequencerMetrics types.SequencerKeyMetrics `json:"sequencerMetrics"` ValidatorMetrics types.ValidatorKeyMetrics `json:"validatorMetrics"` ClientVersion string `json:"clientVersion,omitempty"` - Artifacts map[string]string `json:"artifacts,omitempty"` } // MachineInfo contains information about the machine running the benchmark @@ -52,9 +51,9 @@ func (runs *RunGroup) AddResult(testIdx int, runResult RunResult) { } const ( - BenchmarkRunTag = "BenchmarkRun" - LoadTestResultArtifactKey = "loadTestResult" - LoadTestResultFileName = "load-test-result.json" + BenchmarkRunTag = "BenchmarkRun" + LoadTestResultsDir = "load-tests" + LoadTestTimestampLayout = "2006-01-02-15-04-05" ) func RunGroupFromTestPlans(testPlans []TestPlan, machineInfo *MachineInfo) RunGroup { diff --git a/runner/importer/service.go b/runner/importer/service.go index 1ed2008d..e771abb6 100644 --- a/runner/importer/service.go +++ b/runner/importer/service.go @@ -75,7 +75,6 @@ func (s *Service) downloadOutputFiles(baseURL, runID, runOutputDir string) error "result-sequencer.json", "metrics-validator.json", "metrics-sequencer.json", - benchmark.LoadTestResultFileName, } requiredFiles := []string{ @@ -98,7 +97,7 @@ func (s *Service) downloadOutputFiles(baseURL, runID, runOutputDir string) error // Try to download the file err := s.downloadFile(fileURL, localFilePath) if err != nil { - if slices.Contains(requiredFiles, fileName) { + if !slices.Contains(requiredFiles, fileName) { return errors.Wrap(err, "failed to download file") } s.log.Warn("Failed to download file (continuing)", "file", fileName, "url", fileURL, "error", err) diff --git a/runner/network/network_benchmark.go b/runner/network/network_benchmark.go index 52a38b1b..fd83d113 100644 --- a/runner/network/network_benchmark.go +++ b/runner/network/network_benchmark.go @@ -44,9 +44,9 @@ type NetworkBenchmark struct { testConfig *benchtypes.TestConfig proofConfig *benchmark.ProofProgramOptions - transactionPayload payload.Definition - ports portmanager.PortManager - flashblocksBlockTime string + transactionPayload payload.Definition + ports portmanager.PortManager + flashblocksBlockTime string } // NewNetworkBenchmark creates a new network benchmark and initializes the payload worker and consensus client @@ -247,22 +247,11 @@ func (nb *NetworkBenchmark) GetResult() (*benchmark.RunResult, error) { return nil, errors.New("metrics not collected") } - artifacts := make(map[string]string) - if nb.testConfig.LoadTestOutputPath != "" { - if _, err := os.Stat(nb.testConfig.LoadTestOutputPath); err == nil { - artifacts[benchmark.LoadTestResultArtifactKey] = benchmark.LoadTestResultFileName - } - } - if len(artifacts) == 0 { - artifacts = nil - } - return &benchmark.RunResult{ SequencerMetrics: *nb.collectedSequencerMetrics, ValidatorMetrics: *nb.collectedValidatorMetrics, Success: true, Complete: true, - Artifacts: artifacts, }, nil } diff --git a/runner/network/sequencer_benchmark.go b/runner/network/sequencer_benchmark.go index 1a7e790d..83e493c4 100644 --- a/runner/network/sequencer_benchmark.go +++ b/runner/network/sequencer_benchmark.go @@ -285,9 +285,8 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics. payloads = append(payloads, *payload) } - pendingTxs, shutdownErr := nb.settleGracefulWorkerShutdown(benchmarkCtx, transactionWorker, consensusClient, pendingTxs) - if shutdownErr != nil { - errChan <- shutdownErr + if err := nb.settleGracefulWorkerShutdown(benchmarkCtx, transactionWorker, consensusClient, pendingTxs); err != nil { + errChan <- err return } @@ -322,14 +321,14 @@ func (nb *sequencerBenchmark) settleGracefulWorkerShutdown( transactionWorker payloadworker.Worker, consensusClient *consensus.SequencerConsensusClient, pendingTxs int, -) (int, error) { +) error { gracefulWorker, ok := transactionWorker.(payloadworker.GracefulShutdownWorker) if !ok { - return pendingTxs, nil + return nil } if err := gracefulWorker.BeginGracefulShutdown(ctx); err != nil { - return pendingTxs, errors.Wrap(err, "failed to begin graceful payload worker shutdown") + return errors.Wrap(err, "failed to begin graceful payload worker shutdown") } timeout := time.NewTimer(gracefulWorkerShutdownTimeout) @@ -340,25 +339,25 @@ func (nb *sequencerBenchmark) settleGracefulWorkerShutdown( select { case <-gracefulWorker.Done(): nb.log.Info("Payload worker stopped gracefully", "settlement_blocks", settlementBlock) - return pendingTxs, nil + return nil case <-timeout.C: nb.log.Warn("Timed out waiting for payload worker to stop gracefully", "settlement_blocks", settlementBlock) - return pendingTxs, nil + return nil default: } blockMetrics := metrics.NewBlockMetrics() txsSent, err := transactionWorker.SendTxs(ctx, pendingTxs) if err != nil { - return pendingTxs, errors.Wrap(err, "failed to collect settlement transactions") + return errors.Wrap(err, "failed to collect settlement transactions") } payload, err := consensusClient.Propose(ctx, blockMetrics, true) if err != nil { - return pendingTxs, errors.Wrap(err, "failed to propose settlement block") + return errors.Wrap(err, "failed to propose settlement block") } if payload == nil { - return pendingTxs, errors.New("received nil settlement payload from consensus client") + return errors.New("received nil settlement payload from consensus client") } userTxsIncluded := len(payload.Transactions) - 1 diff --git a/runner/network/types/types.go b/runner/network/types/types.go index 2b93c1e5..9bb0b34a 100644 --- a/runner/network/types/types.go +++ b/runner/network/types/types.go @@ -41,8 +41,8 @@ type TestConfig struct { PrefundPrivateKey ecdsa.PrivateKey PrefundAmount big.Int - // LoadTestOutputPath is the optional JSON summary path used by the - // load-test payload worker. + // LoadTestOutputPath is the optional normal load-test report JSON path used + // by the load-test payload worker. LoadTestOutputPath string } diff --git a/runner/payload/loadtest/load_test_worker.go b/runner/payload/loadtest/load_test_worker.go index 0723496c..a236924d 100644 --- a/runner/payload/loadtest/load_test_worker.go +++ b/runner/payload/loadtest/load_test_worker.go @@ -30,6 +30,7 @@ import ( type LoadTestPayloadDefinition struct { SenderCount uint64 `yaml:"sender_count"` FundingAmount string `yaml:"funding_amount"` + Network string `yaml:"network"` Transactions yaml.Node `yaml:"transactions"` } @@ -60,8 +61,6 @@ type loadTestPayloadWorker struct { proxyServer *proxy.ProxyServer cmd *exec.Cmd done chan struct{} - waitErr error - waitMu sync.Mutex shutdownOnce sync.Once configFilePath string outputPath string @@ -140,10 +139,7 @@ func (w *loadTestPayloadWorker) Setup(ctx context.Context) error { w.cmd = cmd w.done = make(chan struct{}) go func() { - err := cmd.Wait() - w.waitMu.Lock() - w.waitErr = err - w.waitMu.Unlock() + _ = cmd.Wait() close(w.done) }() diff --git a/runner/service.go b/runner/service.go index 6d280136..8916094c 100644 --- a/runner/service.go +++ b/runner/service.go @@ -26,6 +26,7 @@ import ( "github.com/base/base-bench/runner/network" "github.com/base/base-bench/runner/network/types" "github.com/base/base-bench/runner/payload" + "github.com/base/base-bench/runner/payload/loadtest" "github.com/base/base-bench/runner/utils" "github.com/ethereum/go-ethereum/core" ethparams "github.com/ethereum/go-ethereum/params" @@ -368,6 +369,54 @@ func (s *service) setupBlobsDir(workingDir string) error { return nil } +func loadTestNetwork(genesis *core.Genesis, transactionPayload payload.Definition) string { + if envNetwork := os.Getenv("BASE_BENCH_LOAD_TEST_NETWORK"); envNetwork != "" { + return envNetwork + } + + if transactionPayload.Type == "load-test" { + if def, ok := transactionPayload.Params.(*loadtest.LoadTestPayloadDefinition); ok && def.Network != "" { + return def.Network + } + } + + if genesis == nil || genesis.Config == nil || genesis.Config.ChainID == nil { + return "unknown" + } + + switch genesis.Config.ChainID.Uint64() { + case 8453: + return "mainnet" + case 84532: + return "sepolia" + case 13371337: + return "devnet" + default: + return fmt.Sprintf("chain-%s", genesis.Config.ChainID.String()) + } +} + +func (s *service) loadTestOutputPath(genesis *core.Genesis, transactionPayload payload.Definition) string { + if transactionPayload.Type != "load-test" { + return "" + } + + network := loadTestNetwork(genesis, transactionPayload) + baseTime := time.Now().UTC() + for i := 0; ; i++ { + timestamp := baseTime.Add(time.Duration(i) * time.Second).Format(benchmark.LoadTestTimestampLayout) + outputPath := path.Join( + s.config.OutputDir(), + benchmark.LoadTestResultsDir, + network, + fmt.Sprintf("%s.json", timestamp), + ) + if _, err := os.Stat(outputPath); err != nil { + return outputPath + } + } +} + func (s *service) runTest(ctx context.Context, params types.RunParams, workingDir string, outputDir string, snapshotConfig *benchmark.SnapshotDefinition, proofConfig *benchmark.ProofProgramOptions, transactionPayload payload.Definition, datadirsConfig *benchmark.DatadirConfig, flashblocksBlockTime string) (*benchmark.RunResult, error) { s.log.Info(fmt.Sprintf("Running benchmark with params: %+v", params)) @@ -429,7 +478,7 @@ func (s *service) runTest(ctx context.Context, params types.RunParams, workingDi BatcherKey: *batcherKey, PrefundPrivateKey: *prefundKey, PrefundAmount: *prefundAmount, - LoadTestOutputPath: path.Join(outputDir, benchmark.LoadTestResultFileName), + LoadTestOutputPath: s.loadTestOutputPath(genesis, transactionPayload), } // Run benchmark From c83ce2e8f480b2e01c12c756cb4bd6ee06eb1bc7 Mon Sep 17 00:00:00 2001 From: Niran Babalola Date: Mon, 18 May 2026 16:21:29 -0500 Subject: [PATCH 3/4] feat(load-test): use native config files --- configs/examples/load-test-config.yml | 20 ++ configs/examples/load-test.yml | 12 +- runner/payload/loadtest/load_test_worker.go | 230 ++++++++++-------- .../payload/loadtest/load_test_worker_test.go | 125 ++++++++++ 4 files changed, 269 insertions(+), 118 deletions(-) create mode 100644 configs/examples/load-test-config.yml create mode 100644 runner/payload/loadtest/load_test_worker_test.go diff --git a/configs/examples/load-test-config.yml b/configs/examples/load-test-config.yml new file mode 100644 index 00000000..77074cdd --- /dev/null +++ b/configs/examples/load-test-config.yml @@ -0,0 +1,20 @@ +transaction_submission_rpcs: + - "http://localhost:8545" +query_rpc: "http://localhost:8545" +txpool_nodes: [] +flashblocks_ws: "ws://localhost:7111" + +sender_count: 10 +target_gps: 500000000 +duration: "60s" +funding_amount: "10000000000000000000" + +transactions: + - weight: 70 + type: transfer + - weight: 20 + type: calldata + max_size: 256 + - weight: 10 + type: precompile + target: sha256 diff --git a/configs/examples/load-test.yml b/configs/examples/load-test.yml index 6743ca60..163513ce 100644 --- a/configs/examples/load-test.yml +++ b/configs/examples/load-test.yml @@ -4,16 +4,8 @@ payloads: - name: Load Test type: load-test id: load-test - sender_count: 10 - transactions: - - weight: 70 - type: transfer - - weight: 20 - type: calldata - max_size: 256 - - weight: 10 - type: precompile - target: sha256 + network: devnet + config_file: ./load-test-config.yml benchmarks: - variables: diff --git a/runner/payload/loadtest/load_test_worker.go b/runner/payload/loadtest/load_test_worker.go index a236924d..92527aa8 100644 --- a/runner/payload/loadtest/load_test_worker.go +++ b/runner/payload/loadtest/load_test_worker.go @@ -3,13 +3,13 @@ package loadtest import ( "context" "crypto/ecdsa" - cryptorand "crypto/rand" "encoding/hex" "fmt" "math/big" "os" "os/exec" "path/filepath" + "strconv" "sync" "time" @@ -24,46 +24,30 @@ import ( ) // LoadTestPayloadDefinition is the YAML payload params for the load-test type. -// Fields map directly to the Rust base-load-test config format. -// The `transactions` field is passed through as raw YAML to support the full -// Rust config schema (transfer, calldata, precompile, erc20, etc.). +// The load-test workload itself lives in a native base-load-tester config file; +// benchmark mode only overlays the RPC and timing fields it must control. type LoadTestPayloadDefinition struct { - SenderCount uint64 `yaml:"sender_count"` - FundingAmount string `yaml:"funding_amount"` - Network string `yaml:"network"` - Transactions yaml.Node `yaml:"transactions"` -} - -// loadTestConfig is the YAML config written to a temp file for the load-test binary. -type loadTestConfig struct { - RPC string `yaml:"rpc,omitempty"` - TransactionSubmissionRPCs []string `yaml:"transaction_submission_rpcs"` - QueryRPC string `yaml:"query_rpc"` - FlashblocksWs string `yaml:"flashblocks_ws"` - SenderCount uint64 `yaml:"sender_count"` - TargetGPS uint64 `yaml:"target_gps"` - Duration string `yaml:"duration"` - Seed uint64 `yaml:"seed"` - FundingAmount string `yaml:"funding_amount"` - Transactions yaml.Node `yaml:"transactions"` + ConfigFile string `yaml:"config_file"` + Network string `yaml:"network"` } type loadTestPayloadWorker struct { - log log.Logger - prefundSK string - loadTestBin string - elRPCURL string - flashblocksURL string - gasLimit uint64 - blockTimeSec uint64 - params LoadTestPayloadDefinition - mempool *mempool.StaticWorkloadMempool - proxyServer *proxy.ProxyServer - cmd *exec.Cmd - done chan struct{} - shutdownOnce sync.Once - configFilePath string - outputPath string + log log.Logger + prefundSK string + loadTestBin string + elRPCURL string + flashblocksURL string + gasLimit uint64 + blockTimeSec uint64 + params LoadTestPayloadDefinition + mempool *mempool.StaticWorkloadMempool + proxyServer *proxy.ProxyServer + cmd *exec.Cmd + done chan struct{} + shutdownOnce sync.Once + sourceConfigPath string + renderedConfigPath string + outputPath string } // NewLoadTestPayloadWorker creates a worker that runs the base-load-test binary @@ -88,18 +72,24 @@ func NewLoadTestPayloadWorker( blockTimeSec = 1 } + sourceConfigPath, err := resolveConfigFilePath(cfg.ConfigPath(), definition.ConfigFile) + if err != nil { + return nil, err + } + w := &loadTestPayloadWorker{ - log: log, - prefundSK: hex.EncodeToString(prefundedPrivateKey.D.Bytes()), - loadTestBin: cfg.LoadTestBinary(), - elRPCURL: elRPCURL, - flashblocksURL: flashblocksURL, - gasLimit: params.GasLimit, - blockTimeSec: blockTimeSec, - params: definition, - mempool: mp, - proxyServer: ps, - outputPath: outputPath, + log: log, + prefundSK: hex.EncodeToString(prefundedPrivateKey.D.Bytes()), + loadTestBin: cfg.LoadTestBinary(), + elRPCURL: elRPCURL, + flashblocksURL: flashblocksURL, + gasLimit: params.GasLimit, + blockTimeSec: blockTimeSec, + params: definition, + mempool: mp, + proxyServer: ps, + sourceConfigPath: sourceConfigPath, + outputPath: outputPath, } return w, nil @@ -118,7 +108,7 @@ func (w *loadTestPayloadWorker) Setup(ctx context.Context) error { if err != nil { return errors.Wrap(err, "failed to write load-test config") } - w.configFilePath = configPath + w.renderedConfigPath = configPath w.log.Info("Starting load test", "binary", w.loadTestBin, "config", configPath) @@ -207,9 +197,9 @@ func (w *loadTestPayloadWorker) Stop(ctx context.Context) error { w.proxyServer.Stop() - if w.configFilePath != "" { - if err := os.Remove(w.configFilePath); err != nil { - w.log.Warn("failed to remove load-test config", "path", w.configFilePath, "err", err) + if w.renderedConfigPath != "" { + if err := os.Remove(w.renderedConfigPath); err != nil { + w.log.Warn("failed to remove load-test config", "path", w.renderedConfigPath, "err", err) } } @@ -224,80 +214,104 @@ func (w *loadTestPayloadWorker) SendTxs(ctx context.Context, _ int) (int, error) return len(pendingTxs), nil } -// defaultTransactions returns the default transaction mix as a yaml.Node. -func defaultTransactions() yaml.Node { - var node yaml.Node - // Default: 70% transfer, 20% calldata, 10% precompile - defaultYAML := ` -- weight: 70 - type: transfer -- weight: 20 - type: calldata - max_size: 256 -- weight: 10 - type: precompile - target: sha256 -` - if err := yaml.Unmarshal([]byte(defaultYAML), &node); err != nil { - panic(fmt.Sprintf("failed to parse default transactions YAML: %v", err)) +func resolveConfigFilePath(benchmarkConfigPath string, loadTestConfigPath string) (string, error) { + if loadTestConfigPath == "" { + return "", errors.New("load-test payload requires config_file") } - // yaml.Unmarshal wraps in a document node; return the inner sequence - if node.Kind == yaml.DocumentNode && len(node.Content) > 0 { - return *node.Content[0] + if filepath.IsAbs(loadTestConfigPath) { + return loadTestConfigPath, nil } - return node + return filepath.Join(filepath.Dir(benchmarkConfigPath), loadTestConfigPath), nil } -// randomSeed returns a cryptographically random uint64 seed. -func randomSeed() uint64 { - var b [8]byte - if _, err := cryptorand.Read(b[:]); err != nil { - return 42 +func (w *loadTestPayloadWorker) targetGPS() uint64 { + blockTimeSec := w.blockTimeSec + if blockTimeSec == 0 { + blockTimeSec = 1 } - return uint64(b[0]) | uint64(b[1])<<8 | uint64(b[2])<<16 | uint64(b[3])<<24 | - uint64(b[4])<<32 | uint64(b[5])<<40 | uint64(b[6])<<48 | uint64(b[7])<<56 + return w.gasLimit / blockTimeSec } -// writeConfig generates a temporary YAML config file for the load-test binary -// with the RPC URL pointing to the proxy server. -func (w *loadTestPayloadWorker) writeConfig() (string, error) { - senderCount := w.params.SenderCount - if senderCount == 0 { - senderCount = 10 +func (w *loadTestPayloadWorker) buildConfig() (*yaml.Node, error) { + data, err := os.ReadFile(w.sourceConfigPath) + if err != nil { + return nil, errors.Wrap(err, "failed to read load-test config file") } - fundingAmount := w.params.FundingAmount - if fundingAmount == "" { - fundingAmount = "10000000000000000000" + var doc yaml.Node + if err := yaml.Unmarshal(data, &doc); err != nil { + return nil, errors.Wrap(err, "failed to parse load-test config file") } - // Compute target GPS from gas limit and block time - targetGPS := w.gasLimit / w.blockTimeSec - - transactions := w.params.Transactions - if transactions.Kind == 0 { - transactions = defaultTransactions() + config, err := mappingRoot(&doc) + if err != nil { + return nil, err } + proxyURL := w.proxyServer.ClientURL() + setMappingValue(config, "transaction_submission_rpcs", stringSequenceNode(proxyURL)) + setMappingValue(config, "query_rpc", stringNode(proxyURL)) + flashblocksURL := w.flashblocksURL if flashblocksURL == "" { flashblocksURL = "ws://localhost:7111" } + setMappingValue(config, "flashblocks_ws", stringNode(flashblocksURL)) + setMappingValue(config, "target_gps", uintNode(w.targetGPS())) + setMappingValue(config, "duration", stringNode("99999s")) + + return config, nil +} + +func mappingRoot(doc *yaml.Node) (*yaml.Node, error) { + root := doc + if doc.Kind == yaml.DocumentNode { + if len(doc.Content) == 0 { + return nil, errors.New("load-test config file is empty") + } + root = doc.Content[0] + } - config := loadTestConfig{ - RPC: w.proxyServer.ClientURL(), - TransactionSubmissionRPCs: []string{w.proxyServer.ClientURL()}, - QueryRPC: w.proxyServer.ClientURL(), - FlashblocksWs: flashblocksURL, - SenderCount: senderCount, - TargetGPS: targetGPS, - Duration: "99999s", - Seed: randomSeed(), - FundingAmount: fundingAmount, - Transactions: transactions, + if root.Kind != yaml.MappingNode { + return nil, fmt.Errorf("load-test config file must be a YAML mapping, got kind %d", root.Kind) + } + return root, nil +} + +func setMappingValue(mapping *yaml.Node, key string, value *yaml.Node) { + for i := 0; i < len(mapping.Content)-1; i += 2 { + if mapping.Content[i].Value == key { + mapping.Content[i+1] = value + return + } } + mapping.Content = append(mapping.Content, stringNode(key), value) +} + +func stringNode(value string) *yaml.Node { + return &yaml.Node{Kind: yaml.ScalarNode, Tag: "!!str", Value: value} +} + +func uintNode(value uint64) *yaml.Node { + return &yaml.Node{Kind: yaml.ScalarNode, Tag: "!!int", Value: strconv.FormatUint(value, 10)} +} - data, err := yaml.Marshal(&config) +func stringSequenceNode(values ...string) *yaml.Node { + node := &yaml.Node{Kind: yaml.SequenceNode, Tag: "!!seq"} + for _, value := range values { + node.Content = append(node.Content, stringNode(value)) + } + return node +} + +// writeConfig generates a temporary YAML config file for the load-test binary +// with the RPC URL pointing to the proxy server. +func (w *loadTestPayloadWorker) writeConfig() (string, error) { + config, err := w.buildConfig() + if err != nil { + return "", err + } + data, err := yaml.Marshal(config) if err != nil { return "", errors.Wrap(err, "failed to marshal load-test config") } @@ -317,8 +331,8 @@ func (w *loadTestPayloadWorker) writeConfig() (string, error) { } w.log.Info("Generated load-test config", - "sender_count", senderCount, - "target_gps", targetGPS, + "source_config", w.sourceConfigPath, + "target_gps", w.targetGPS(), "gas_limit", w.gasLimit, "block_time_sec", w.blockTimeSec, ) diff --git a/runner/payload/loadtest/load_test_worker_test.go b/runner/payload/loadtest/load_test_worker_test.go new file mode 100644 index 00000000..b69e8784 --- /dev/null +++ b/runner/payload/loadtest/load_test_worker_test.go @@ -0,0 +1,125 @@ +package loadtest + +import ( + "os" + "path/filepath" + "testing" + + "github.com/base/base-bench/runner/clients/common/proxy" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v3" +) + +func TestBuildConfigOverlaysBenchmarkFieldsAndPreservesLoadTestConfig(t *testing.T) { + configPath := filepath.Join(t.TempDir(), "mainnet-state-weth-usdc-swaps.yaml") + err := os.WriteFile(configPath, []byte(` +transaction_submission_rpcs: + - "http://standalone-submitter.invalid" +query_rpc: "http://standalone-query.invalid" +flashblocks_ws: "ws://standalone-flashblocks.invalid" +target_gps: 123 +duration: "60s" +chain_id: 8453 +sender_count: 250 +in_flight_per_sender: 64 +batch_size: 20 +batch_timeout: "10ms" +seed: 654789 +funding_amount: "200000000000000000" +real_token_setup: + enabled: true + allow_chain_id_8453: true + weth: "0x4200000000000000000000000000000000000006" + weth_amount_per_sender: "50000000000000000" + pair_token: + token: "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913" + amount_per_sender: "10000000" + acquisition: + type: uniswap_v3_exact_input + router: "0x2626664c2603336E57B271c5C0b26F421741e481" + fee: 500 + amount_in: "10000000000000000" + min_amount_out: "0" +transactions: + - weight: 50 + type: uniswap_v3 + router: "0x2626664c2603336E57B271c5C0b26F421741e481" + token_in: "0x4200000000000000000000000000000000000006" + token_out: "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913" + fee: 500 + min_amount: "10000000000000" + max_amount: "100000000000000" + reverse_min_amount: "100000" + reverse_max_amount: "1000000" + - weight: 50 + type: aerodrome_cl + router: "0xBE6D8f0d05cC4be24d5167a3eF062215bE6D18a5" + token_in: "0x4200000000000000000000000000000000000006" + token_out: "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913" + tick_spacing: 100 + min_amount: "10000000000000" + max_amount: "100000000000000" + reverse_min_amount: "100000" + reverse_max_amount: "1000000" +`), 0644) + require.NoError(t, err) + + worker := &loadTestPayloadWorker{ + flashblocksURL: "ws://benchmark-flashblocks.example", + gasLimit: 150_000_000, + blockTimeSec: 2, + proxyServer: proxy.NewProxyServer("http://sequencer.example", nil, 18546, nil), + sourceConfigPath: configPath, + } + + config, err := worker.buildConfig() + require.NoError(t, err) + + encoded, err := yaml.Marshal(config) + require.NoError(t, err) + output := string(encoded) + + for _, want := range []string{ + "transaction_submission_rpcs:\n - http://localhost:18546", + "query_rpc: http://localhost:18546", + "flashblocks_ws: ws://benchmark-flashblocks.example", + "target_gps: 75000000", + "duration: 99999s", + "chain_id: 8453", + "sender_count: 250", + "in_flight_per_sender: 64", + "batch_size: 20", + "batch_timeout: \"10ms\"", + "seed: 654789", + "real_token_setup:", + "allow_chain_id_8453: true", + "type: uniswap_v3", + "type: aerodrome_cl", + "reverse_min_amount: \"100000\"", + } { + require.Contains(t, output, want) + } + for _, oldValue := range []string{ + "standalone-submitter.invalid", + "standalone-query.invalid", + "standalone-flashblocks.invalid", + "target_gps: 123", + "duration: 60s", + } { + require.NotContains(t, output, oldValue) + } +} + +func TestResolveConfigFilePath(t *testing.T) { + resolved, err := resolveConfigFilePath("/tmp/configs/benchmark.yml", "load-tests/mainnet.yaml") + require.NoError(t, err) + require.Equal(t, "/tmp/configs/load-tests/mainnet.yaml", resolved) + + resolved, err = resolveConfigFilePath("/tmp/configs/benchmark.yml", "/var/load-tests/mainnet.yaml") + require.NoError(t, err) + require.Equal(t, "/var/load-tests/mainnet.yaml", resolved) + + _, err = resolveConfigFilePath("/tmp/configs/benchmark.yml", "") + require.Error(t, err) + require.Contains(t, err.Error(), "config_file") +} From 432e245467b399a4f60296c9ea9660694c9e9a08 Mon Sep 17 00:00:00 2001 From: Niran Babalola Date: Tue, 19 May 2026 15:19:11 -0500 Subject: [PATCH 4/4] fix(load-test): proxy snapshot nonces from upstream Forward pass-through JSON-RPC batch items to the upstream node while continuing to capture raw transactions. Fetch eth_getTransactionCount from upstream and merge it with transactions already accepted by the proxy so snapshot-backed accounts keep their real chain nonce. Co-authored-by: Codex --- runner/clients/common/proxy/proxy.go | 199 +++++++++++++--- runner/clients/common/proxy/proxy_test.go | 266 +++++++++++++++++++++- 2 files changed, 433 insertions(+), 32 deletions(-) diff --git a/runner/clients/common/proxy/proxy.go b/runner/clients/common/proxy/proxy.go index b212ec0b..bf3f8147 100644 --- a/runner/clients/common/proxy/proxy.go +++ b/runner/clients/common/proxy/proxy.go @@ -21,6 +21,7 @@ import ( "github.com/base/base-bench/runner/network/mempool" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" ethTypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" ) @@ -32,15 +33,31 @@ type ProxyServer struct { pendingTxs []*ethTypes.Transaction clientURL string mempool *mempool.StaticWorkloadMempool + nextNonce map[common.Address]uint64 mu sync.Mutex } +type rpcRequest struct { + Method string `json:"method"` + Params json.RawMessage `json:"params"` + ID interface{} `json:"id"` + JSONRPC string `json:"jsonrpc"` +} + +type rpcResponse struct { + JSONRPC string `json:"jsonrpc"` + ID interface{} `json:"id"` + Result json.RawMessage `json:"result,omitempty"` + Error interface{} `json:"error,omitempty"` +} + func NewProxyServer(clientURL string, log log.Logger, port int, mempool *mempool.StaticWorkloadMempool) *ProxyServer { return &ProxyServer{ clientURL: clientURL, log: log, port: port, mempool: mempool, + nextNonce: make(map[common.Address]uint64), } } @@ -109,13 +126,6 @@ func (p *ProxyServer) handleRequest(w http.ResponseWriter, r *http.Request) { return } - type rpcRequest struct { - Method string `json:"method"` - Params json.RawMessage `json:"params"` - ID interface{} `json:"id"` - JSONRPC string `json:"jsonrpc"` - } - if len(body) > 0 && body[0] == '[' { p.handleBatchRequest(w, body) return @@ -135,11 +145,7 @@ func (p *ProxyServer) handleRequest(w http.ResponseWriter, r *http.Request) { } if handled { - resp := struct { - JSONRPC string `json:"jsonrpc"` - ID interface{} `json:"id"` - Result json.RawMessage `json:"result"` - }{ + resp := rpcResponse{ JSONRPC: request.JSONRPC, ID: request.ID, Result: response, @@ -192,26 +198,12 @@ func (p *ProxyServer) handleRequest(w http.ResponseWriter, r *http.Request) { } func (p *ProxyServer) handleBatchRequest(w http.ResponseWriter, body []byte) { - type rpcRequest struct { - Method string `json:"method"` - Params json.RawMessage `json:"params"` - ID interface{} `json:"id"` - JSONRPC string `json:"jsonrpc"` - } - var requests []rpcRequest if err := json.Unmarshal(body, &requests); err != nil { http.Error(w, "Error parsing batch request", http.StatusBadRequest) return } - type rpcResponse struct { - JSONRPC string `json:"jsonrpc"` - ID interface{} `json:"id"` - Result json.RawMessage `json:"result,omitempty"` - Error interface{} `json:"error,omitempty"` - } - responses := make([]rpcResponse, 0, len(requests)) for _, request := range requests { handled, result, err := p.OverrideRequest(request.Method, request.Params) @@ -224,7 +216,12 @@ func (p *ProxyServer) handleBatchRequest(w http.ResponseWriter, body []byte) { } else if handled { response.Result = result } else { - response.Error = map[string]interface{}{"code": -32601, "message": "method not supported in proxy batch mode"} + forwardedResponse, err := p.forwardRPCRequest(request) + if err != nil { + response.Error = map[string]interface{}{"code": -32000, "message": err.Error()} + } else { + response = forwardedResponse + } } responses = append(responses, response) } @@ -235,6 +232,43 @@ func (p *ProxyServer) handleBatchRequest(w http.ResponseWriter, body []byte) { } } +func (p *ProxyServer) forwardRPCRequest(request rpcRequest) (rpcResponse, error) { + body, err := json.Marshal(request) + if err != nil { + return rpcResponse{}, fmt.Errorf("failed to marshal upstream request: %w", err) + } + + resp, err := http.Post(p.clientURL, "application/json", bytes.NewReader(body)) + if err != nil { + return rpcResponse{}, fmt.Errorf("failed to forward request: %w", err) + } + defer func() { + if err := resp.Body.Close(); err != nil { + p.log.Error("Error closing response body", "err", err) + } + }() + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return rpcResponse{}, fmt.Errorf("failed to read upstream response: %w", err) + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return rpcResponse{}, fmt.Errorf("upstream request returned status %d: %s", resp.StatusCode, string(respBody)) + } + + var forwardedResponse rpcResponse + if err := json.Unmarshal(respBody, &forwardedResponse); err != nil { + return rpcResponse{}, fmt.Errorf("failed to decode upstream response: %w", err) + } + if forwardedResponse.JSONRPC == "" { + forwardedResponse.JSONRPC = request.JSONRPC + } + if forwardedResponse.ID == nil { + forwardedResponse.ID = request.ID + } + return forwardedResponse, nil +} + func (p *ProxyServer) OverrideRequest(method string, rawParams json.RawMessage) (bool, json.RawMessage, error) { switch method { case "eth_getTransactionCount": @@ -242,8 +276,22 @@ func (p *ProxyServer) OverrideRequest(method string, rawParams json.RawMessage) if err := json.Unmarshal(rawParams, ¶ms); err != nil { return false, nil, fmt.Errorf("failed to unmarshal params: %w", err) } + if len(params) == 0 { + return false, nil, fmt.Errorf("no params found") + } - nonce := p.mempool.GetTransactionCount(common.HexToAddress(params[0])) + address := common.HexToAddress(params[0]) + nonce, err := p.upstreamTransactionCount(rawParams) + if err != nil { + if observedNonce, ok := p.observedTransactionCount(address); ok { + jsonResponse, _ := json.Marshal(fmt.Sprintf("0x%x", observedNonce)) + return true, jsonResponse, nil + } + return false, nil, err + } + if observedNonce, ok := p.observedTransactionCount(address); ok && observedNonce > nonce { + nonce = observedNonce + } jsonResponse, _ := json.Marshal(fmt.Sprintf("0x%x", nonce)) return true, jsonResponse, nil @@ -275,9 +323,7 @@ func (p *ProxyServer) OverrideRequest(method string, rawParams json.RawMessage) return false, nil, fmt.Errorf("failed to decode transaction: %w", err) } - p.mu.Lock() - p.pendingTxs = append(p.pendingTxs, &tx) - p.mu.Unlock() + p.recordPendingTransaction(&tx) txHash := tx.Hash().Hex() jsonResponse, _ := json.Marshal(txHash) @@ -287,10 +333,101 @@ func (p *ProxyServer) OverrideRequest(method string, rawParams json.RawMessage) } } +func (p *ProxyServer) upstreamTransactionCount(rawParams json.RawMessage) (uint64, error) { + body, err := json.Marshal(struct { + JSONRPC string `json:"jsonrpc"` + ID int `json:"id"` + Method string `json:"method"` + Params json.RawMessage `json:"params"` + }{ + JSONRPC: "2.0", + ID: 1, + Method: "eth_getTransactionCount", + Params: rawParams, + }) + if err != nil { + return 0, fmt.Errorf("failed to marshal upstream nonce request: %w", err) + } + + resp, err := http.Post(p.clientURL, "application/json", bytes.NewReader(body)) + if err != nil { + return 0, fmt.Errorf("failed to fetch upstream transaction count: %w", err) + } + defer func() { + if err := resp.Body.Close(); err != nil { + p.log.Error("Error closing response body", "err", err) + } + }() + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return 0, fmt.Errorf("failed to read upstream transaction count response: %w", err) + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return 0, fmt.Errorf("upstream transaction count request returned status %d: %s", resp.StatusCode, string(respBody)) + } + + var rpcResp struct { + Result json.RawMessage `json:"result"` + Error *struct { + Code int `json:"code"` + Message string `json:"message"` + } `json:"error"` + } + if err := json.Unmarshal(respBody, &rpcResp); err != nil { + return 0, fmt.Errorf("failed to decode upstream transaction count response: %w", err) + } + if rpcResp.Error != nil { + return 0, fmt.Errorf("upstream transaction count error %d: %s", rpcResp.Error.Code, rpcResp.Error.Message) + } + + var nonceHex string + if err := json.Unmarshal(rpcResp.Result, &nonceHex); err != nil { + return 0, fmt.Errorf("failed to decode upstream transaction count result: %w", err) + } + nonce, err := hexutil.DecodeUint64(nonceHex) + if err != nil { + return 0, fmt.Errorf("failed to parse upstream transaction count %q: %w", nonceHex, err) + } + return nonce, nil +} + +func (p *ProxyServer) observedTransactionCount(address common.Address) (uint64, bool) { + p.mu.Lock() + defer p.mu.Unlock() + + nonce, ok := p.nextNonce[address] + return nonce, ok +} + +func (p *ProxyServer) recordPendingTransaction(tx *ethTypes.Transaction) { + from, err := ethTypes.Sender(ethTypes.LatestSignerForChainID(tx.ChainId()), tx) + if err != nil { + p.log.Warn("failed to recover sender for observed transaction", "err", err, "hash", tx.Hash()) + p.mu.Lock() + p.pendingTxs = append(p.pendingTxs, tx) + p.mu.Unlock() + return + } + + nextNonce := tx.Nonce() + 1 + p.mu.Lock() + p.pendingTxs = append(p.pendingTxs, tx) + if nextNonce > p.nextNonce[from] { + p.nextNonce[from] = nextNonce + } + p.mu.Unlock() +} + func (p *ProxyServer) DebugResponse(method string, params json.RawMessage, respBody []byte) { p.log.Debug("method", "method", method) p.log.Debug("params", "params", params) + if !bytes.HasPrefix(respBody, []byte{0x1f, 0x8b}) { + p.log.Debug("Response body", "body", string(respBody)) + return + } + gzipReader, err := gzip.NewReader(bytes.NewReader(respBody)) if err != nil { p.log.Error("Error creating gzip reader", "err", err) diff --git a/runner/clients/common/proxy/proxy_test.go b/runner/clients/common/proxy/proxy_test.go index a5c0678f..013f3ebe 100644 --- a/runner/clients/common/proxy/proxy_test.go +++ b/runner/clients/common/proxy/proxy_test.go @@ -78,7 +78,271 @@ func TestHandleBatchRequestCapturesRawTransactions(t *testing.T) { } } +func TestHandleBatchRequestForwardsPassThroughMethods(t *testing.T) { + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var req rpcRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + t.Fatalf("decode upstream request: %v", err) + } + if req.Method != "eth_chainId" { + t.Fatalf("expected eth_chainId, got %s", req.Method) + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(map[string]any{ + "jsonrpc": "2.0", + "id": req.ID, + "result": "0x2105", + }); err != nil { + t.Fatalf("encode upstream response: %v", err) + } + })) + defer upstream.Close() + + server := NewProxyServer( + upstream.URL, + log.New(), + 0, + mempool.NewStaticWorkloadMempool(log.New(), big.NewInt(8453)), + ) + + body, err := json.Marshal([]map[string]any{ + { + "jsonrpc": "2.0", + "id": 7, + "method": "eth_chainId", + "params": []string{}, + }, + }) + if err != nil { + t.Fatalf("marshal request: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(body)) + rec := httptest.NewRecorder() + + server.handleRequest(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", rec.Code, rec.Body.String()) + } + + var responses []struct { + Result string `json:"result"` + Error map[string]any `json:"error"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &responses); err != nil { + t.Fatalf("unmarshal response: %v", err) + } + if len(responses) != 1 { + t.Fatalf("expected 1 response, got %d", len(responses)) + } + if responses[0].Error != nil { + t.Fatalf("expected successful response, got error %v", responses[0].Error) + } + if responses[0].Result != "0x2105" { + t.Fatalf("expected forwarded result 0x2105, got %s", responses[0].Result) + } +} + +func TestHandleBatchRequestSupportsMixedForwardAndCapture(t *testing.T) { + chainID := big.NewInt(8453) + tx := signedTestTx(t, chainID) + rawTx, err := tx.MarshalBinary() + if err != nil { + t.Fatalf("marshal tx: %v", err) + } + + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var req rpcRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + t.Fatalf("decode upstream request: %v", err) + } + if req.Method != "eth_gasPrice" { + t.Fatalf("expected eth_gasPrice, got %s", req.Method) + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(map[string]any{ + "jsonrpc": "2.0", + "id": req.ID, + "result": "0x3b9aca00", + }); err != nil { + t.Fatalf("encode upstream response: %v", err) + } + })) + defer upstream.Close() + + server := NewProxyServer( + upstream.URL, + log.New(), + 0, + mempool.NewStaticWorkloadMempool(log.New(), chainID), + ) + + body, err := json.Marshal([]map[string]any{ + { + "jsonrpc": "2.0", + "id": 0, + "method": "eth_gasPrice", + "params": []string{}, + }, + { + "jsonrpc": "2.0", + "id": 1, + "method": "eth_sendRawTransaction", + "params": []string{hexutil.Encode(rawTx)}, + }, + }) + if err != nil { + t.Fatalf("marshal request: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(body)) + rec := httptest.NewRecorder() + + server.handleRequest(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", rec.Code, rec.Body.String()) + } + + var responses []struct { + Result string `json:"result"` + Error map[string]any `json:"error"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &responses); err != nil { + t.Fatalf("unmarshal response: %v", err) + } + if len(responses) != 2 { + t.Fatalf("expected 2 responses, got %d", len(responses)) + } + if responses[0].Error != nil { + t.Fatalf("expected successful forwarded response, got error %v", responses[0].Error) + } + if responses[0].Result != "0x3b9aca00" { + t.Fatalf("expected forwarded gas price, got %s", responses[0].Result) + } + if responses[1].Error != nil { + t.Fatalf("expected successful captured tx response, got error %v", responses[1].Error) + } + if responses[1].Result != tx.Hash().Hex() { + t.Fatalf("expected tx hash %s, got %s", tx.Hash().Hex(), responses[1].Result) + } + + pending := server.DrainPendingTxs() + if len(pending) != 1 { + t.Fatalf("expected 1 pending tx, got %d", len(pending)) + } + if pending[0].Hash() != tx.Hash() { + t.Fatalf("expected pending tx %s, got %s", tx.Hash(), pending[0].Hash()) + } +} + +func TestGetTransactionCountForwardsUpstreamNonce(t *testing.T) { + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(map[string]any{ + "jsonrpc": "2.0", + "id": 1, + "result": "0xfa", + }); err != nil { + t.Fatalf("encode upstream response: %v", err) + } + })) + defer upstream.Close() + + server := NewProxyServer( + upstream.URL, + log.New(), + 0, + mempool.NewStaticWorkloadMempool(log.New(), big.NewInt(8453)), + ) + + result := callProxyRPC(t, server, "eth_getTransactionCount", []string{common.Address{1}.Hex(), "pending"}) + if result != "0xfa" { + t.Fatalf("expected upstream nonce 0xfa, got %s", result) + } +} + +func TestGetTransactionCountIncludesObservedPendingTransactions(t *testing.T) { + chainID := big.NewInt(8453) + tx := signedTestTxWithNonce(t, chainID, 250) + from, err := types.Sender(types.LatestSignerForChainID(chainID), tx) + if err != nil { + t.Fatalf("recover sender: %v", err) + } + + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(map[string]any{ + "jsonrpc": "2.0", + "id": 1, + "result": "0xfa", + }); err != nil { + t.Fatalf("encode upstream response: %v", err) + } + })) + defer upstream.Close() + + server := NewProxyServer( + upstream.URL, + log.New(), + 0, + mempool.NewStaticWorkloadMempool(log.New(), chainID), + ) + + rawTx, err := tx.MarshalBinary() + if err != nil { + t.Fatalf("marshal tx: %v", err) + } + callProxyRPC(t, server, "eth_sendRawTransaction", []string{hexutil.Encode(rawTx)}) + + result := callProxyRPC(t, server, "eth_getTransactionCount", []string{from.Hex(), "pending"}) + if result != "0xfb" { + t.Fatalf("expected observed nonce 0xfb, got %s", result) + } +} + +func callProxyRPC(t *testing.T, server *ProxyServer, method string, params any) string { + t.Helper() + + body, err := json.Marshal(map[string]any{ + "jsonrpc": "2.0", + "id": 1, + "method": method, + "params": params, + }) + if err != nil { + t.Fatalf("marshal request: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(body)) + rec := httptest.NewRecorder() + server.handleRequest(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", rec.Code, rec.Body.String()) + } + + var response struct { + Result string `json:"result"` + Error map[string]any `json:"error"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &response); err != nil { + t.Fatalf("unmarshal response: %v", err) + } + if response.Error != nil { + t.Fatalf("expected successful response, got error %v", response.Error) + } + return response.Result +} + func signedTestTx(t *testing.T, chainID *big.Int) *types.Transaction { + return signedTestTxWithNonce(t, chainID, 0) +} + +func signedTestTxWithNonce(t *testing.T, chainID *big.Int, nonce uint64) *types.Transaction { t.Helper() key, err := crypto.GenerateKey() @@ -88,7 +352,7 @@ func signedTestTx(t *testing.T, chainID *big.Int) *types.Transaction { tx := types.NewTx(&types.DynamicFeeTx{ ChainID: chainID, - Nonce: 0, + Nonce: nonce, GasTipCap: big.NewInt(1), GasFeeCap: big.NewInt(1), Gas: 21_000,