Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion runner/benchmark/result_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ func (runs *RunGroup) AddResult(testIdx int, runResult RunResult) {
}

const (
BenchmarkRunTag = "BenchmarkRun"
BenchmarkRunTag = "BenchmarkRun"
LoadTestResultsDir = "load-tests"
LoadTestTimestampLayout = "2006-01-02-15-04-05"
)

func RunGroupFromTestPlans(testPlans []TestPlan, machineInfo *MachineInfo) RunGroup {
Expand Down
4 changes: 4 additions & 0 deletions runner/clients/baserethnode/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,7 @@ func (r *BaseRethNodeClient) FlashblocksClient() types.FlashblocksClient {
func (r *BaseRethNodeClient) SupportsFlashblocks() bool {
return true
}

func (r *BaseRethNodeClient) FlashblocksWsURL() string {
return ""
}
9 changes: 9 additions & 0 deletions runner/clients/builder/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,12 @@ func (r *BuilderClient) FlashblocksClient() types.FlashblocksClient {
func (r *BuilderClient) SupportsFlashblocks() bool {
return false
}

// FlashblocksWsURL returns the local WebSocket URL of the flashblocks server
// hosted by the builder.
func (r *BuilderClient) FlashblocksWsURL() string {
if r.websocketPort == 0 {
return ""
}
return fmt.Sprintf("ws://localhost:%d", r.websocketPort)
}
77 changes: 75 additions & 2 deletions runner/clients/common/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"io"
"net/http"
"sync"

"github.com/base/base-bench/runner/network/mempool"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -31,6 +32,7 @@ type ProxyServer struct {
pendingTxs []*ethTypes.Transaction
clientURL string
mempool *mempool.StaticWorkloadMempool
mu sync.Mutex
}

func NewProxyServer(clientURL string, log log.Logger, port int, mempool *mempool.StaticWorkloadMempool) *ProxyServer {
Expand Down Expand Up @@ -62,11 +64,29 @@ func (p *ProxyServer) Run(ctx context.Context) error {
}

func (p *ProxyServer) PendingTxs() []*ethTypes.Transaction {
return p.pendingTxs
p.mu.Lock()
defer p.mu.Unlock()

txs := make([]*ethTypes.Transaction, len(p.pendingTxs))
copy(txs, p.pendingTxs)
return txs
}

func (p *ProxyServer) ClearPendingTxs() {
p.mu.Lock()
defer p.mu.Unlock()

p.pendingTxs = make([]*ethTypes.Transaction, 0)
}

func (p *ProxyServer) DrainPendingTxs() []*ethTypes.Transaction {
p.mu.Lock()
defer p.mu.Unlock()

txs := make([]*ethTypes.Transaction, len(p.pendingTxs))
copy(txs, p.pendingTxs)
p.pendingTxs = make([]*ethTypes.Transaction, 0)
return txs
}

// Stop stops both the proxy server and the underlying client
Expand All @@ -89,13 +109,20 @@ func (p *ProxyServer) handleRequest(w http.ResponseWriter, r *http.Request) {
return
}

var request struct {
type rpcRequest struct {
Method string `json:"method"`
Params json.RawMessage `json:"params"`
ID interface{} `json:"id"`
JSONRPC string `json:"jsonrpc"`
}

if len(body) > 0 && body[0] == '[' {
p.handleBatchRequest(w, body)
return
}

var request rpcRequest

if err := json.Unmarshal(body, &request); err != nil {
http.Error(w, "Error parsing request", http.StatusBadRequest)
return
Expand Down Expand Up @@ -164,6 +191,50 @@ func (p *ProxyServer) handleRequest(w http.ResponseWriter, r *http.Request) {
p.DebugResponse(request.Method, request.Params, respBody)
}

func (p *ProxyServer) handleBatchRequest(w http.ResponseWriter, body []byte) {
type rpcRequest struct {
Method string `json:"method"`
Params json.RawMessage `json:"params"`
ID interface{} `json:"id"`
JSONRPC string `json:"jsonrpc"`
}

var requests []rpcRequest
if err := json.Unmarshal(body, &requests); err != nil {
http.Error(w, "Error parsing batch request", http.StatusBadRequest)
return
}

type rpcResponse struct {
JSONRPC string `json:"jsonrpc"`
ID interface{} `json:"id"`
Result json.RawMessage `json:"result,omitempty"`
Error interface{} `json:"error,omitempty"`
}

responses := make([]rpcResponse, 0, len(requests))
for _, request := range requests {
handled, result, err := p.OverrideRequest(request.Method, request.Params)
response := rpcResponse{
JSONRPC: "2.0",
ID: request.ID,
}
if err != nil {
response.Error = map[string]interface{}{"code": -32000, "message": err.Error()}
} else if handled {
response.Result = result
} else {
response.Error = map[string]interface{}{"code": -32601, "message": "method not supported in proxy batch mode"}
}
responses = append(responses, response)
}

w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(responses); err != nil {
p.log.Error("Error encoding batch response", "err", err)
}
}

func (p *ProxyServer) OverrideRequest(method string, rawParams json.RawMessage) (bool, json.RawMessage, error) {
switch method {
case "eth_getTransactionCount":
Expand Down Expand Up @@ -204,7 +275,9 @@ func (p *ProxyServer) OverrideRequest(method string, rawParams json.RawMessage)
return false, nil, fmt.Errorf("failed to decode transaction: %w", err)
}

p.mu.Lock()
p.pendingTxs = append(p.pendingTxs, &tx)
p.mu.Unlock()

txHash := tx.Hash().Hex()
jsonResponse, _ := json.Marshal(txHash)
Expand Down
104 changes: 104 additions & 0 deletions runner/clients/common/proxy/proxy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package proxy

import (
"bytes"
"encoding/json"
"math/big"
"net/http"
"net/http/httptest"
"testing"

"github.com/base/base-bench/runner/network/mempool"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
)

func TestHandleBatchRequestCapturesRawTransactions(t *testing.T) {
chainID := big.NewInt(8453)
tx := signedTestTx(t, chainID)
rawTx, err := tx.MarshalBinary()
if err != nil {
t.Fatalf("marshal tx: %v", err)
}

server := NewProxyServer(
"http://127.0.0.1:8545",
log.New(),
0,
mempool.NewStaticWorkloadMempool(log.New(), chainID),
)

body, err := json.Marshal([]map[string]any{
{
"jsonrpc": "2.0",
"id": 0,
"method": "eth_sendRawTransaction",
"params": []string{hexutil.Encode(rawTx)},
},
})
if err != nil {
t.Fatalf("marshal request: %v", err)
}

req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(body))
rec := httptest.NewRecorder()

server.handleRequest(rec, req)

if rec.Code != http.StatusOK {
t.Fatalf("expected status 200, got %d: %s", rec.Code, rec.Body.String())
}

var responses []struct {
Result string `json:"result"`
Error map[string]any `json:"error"`
}
if err := json.Unmarshal(rec.Body.Bytes(), &responses); err != nil {
t.Fatalf("unmarshal response: %v", err)
}
if len(responses) != 1 {
t.Fatalf("expected 1 response, got %d", len(responses))
}
if responses[0].Error != nil {
t.Fatalf("expected successful response, got error %v", responses[0].Error)
}
if responses[0].Result != tx.Hash().Hex() {
t.Fatalf("expected tx hash %s, got %s", tx.Hash().Hex(), responses[0].Result)
}

pending := server.DrainPendingTxs()
if len(pending) != 1 {
t.Fatalf("expected 1 pending tx, got %d", len(pending))
}
if pending[0].Hash() != tx.Hash() {
t.Fatalf("expected pending tx %s, got %s", tx.Hash(), pending[0].Hash())
}
}

func signedTestTx(t *testing.T, chainID *big.Int) *types.Transaction {
t.Helper()

key, err := crypto.GenerateKey()
if err != nil {
t.Fatalf("generate key: %v", err)
}

tx := types.NewTx(&types.DynamicFeeTx{
ChainID: chainID,
Nonce: 0,
GasTipCap: big.NewInt(1),
GasFeeCap: big.NewInt(1),
Gas: 21_000,
To: &common.Address{1},
Value: big.NewInt(1),
})

signed, err := types.SignTx(tx, types.NewIsthmusSigner(chainID), key)
if err != nil {
t.Fatalf("sign tx: %v", err)
}
return signed
}
4 changes: 4 additions & 0 deletions runner/clients/geth/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,7 @@ func (g *GethClient) FlashblocksClient() types.FlashblocksClient {
func (g *GethClient) SupportsFlashblocks() bool {
return false
}

func (g *GethClient) FlashblocksWsURL() string {
return ""
}
4 changes: 4 additions & 0 deletions runner/clients/reth/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,3 +296,7 @@ func (r *RethClient) FlashblocksClient() types.FlashblocksClient {
func (r *RethClient) SupportsFlashblocks() bool {
return true
}

func (r *RethClient) FlashblocksWsURL() string {
return ""
}
3 changes: 3 additions & 0 deletions runner/clients/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ type ExecutionClient interface {
SetHead(ctx context.Context, blockNumber uint64) error
FlashblocksClient() FlashblocksClient // returns nil for clients that don't support flashblocks
SupportsFlashblocks() bool // returns true if the client supports receiving flashblock payloads
// FlashblocksWsURL returns the local WebSocket URL hosted by this client,
// or an empty string when the client does not expose one.
FlashblocksWsURL() string
}
6 changes: 3 additions & 3 deletions runner/network/network_benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ type NetworkBenchmark struct {
testConfig *benchtypes.TestConfig
proofConfig *benchmark.ProofProgramOptions

transactionPayload payload.Definition
ports portmanager.PortManager
flashblocksBlockTime string
transactionPayload payload.Definition
ports portmanager.PortManager
flashblocksBlockTime string
}

// NewNetworkBenchmark creates a new network benchmark and initializes the payload worker and consensus client
Expand Down
Loading
Loading