72 changes: 72 additions & 0 deletions block/DIVERGENCE.md
@@ -0,0 +1,72 @@
# Divergence from Main Branch

This branch (`perf/block-optimization`) introduces breaking changes to maximize performance in the `block/` package. It is **not wire-compatible** with the main branch.

## 1. Combined Header+Data Blobs

**Main**: Headers and data are submitted as separate blobs to separate DA namespaces (`HeaderNamespace`, `DataNamespace`). On retrieval, the DA retriever fetches from both namespaces, decodes headers and data independently, then matches them by block height.

**This branch**: Headers and data are combined into a single blob using a custom binary encoding (`common.MarshalBlockBlob`/`UnmarshalBlockBlob`). Each blob contains the proto-encoded header, the proto-encoded data, and the envelope signature, delimited by length prefixes, with the whole blob preceded by a magic number (`0x45564E44`, ASCII `EVND`). Only the `HeaderNamespace` is used.

### Why
- Eliminates matching overhead (no separate header/data pending maps)
- Halves DA submission round trips (one blob per block instead of two)
- Simplifies DA inclusion tracking (single check per block)
- Removes the `DAHeaderEnvelope` protobuf wrapper and the separate `SignedData` protobuf wrapper
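
To make the single-namespace flow concrete, here is a sketch of the retrieval side under this scheme. The `fetchBlobsFn` indirection, the `blockParts` struct, and `retrieveBlocks` are illustrative stand-ins rather than the branch's actual API; only `common.UnmarshalBlockBlob` (sketched in the next section) is named by this document, and its import path is assumed.

```go
package syncing

import (
	"context"

	"github.com/evstack/ev-node/block/internal/common" // assumed import path
)

// fetchBlobsFn stands in for the DA client's blob query at a given DA height;
// the real method name and signature live in the da package and are not shown here.
type fetchBlobsFn func(ctx context.Context, daHeight uint64, namespace []byte) ([][]byte, error)

// blockParts holds the three sections recovered from one combined blob.
type blockParts struct {
	Header, Data, Signature []byte
}

// retrieveBlocks fetches from the header namespace only: every blob already
// carries its header, data, and signature together, so no height-keyed
// matching between separate header and data namespaces is needed.
func retrieveBlocks(ctx context.Context, fetch fetchBlobsFn, daHeight uint64, headerNS []byte) ([]blockParts, error) {
	blobs, err := fetch(ctx, daHeight, headerNS)
	if err != nil {
		return nil, err
	}
	out := make([]blockParts, 0, len(blobs))
	for _, blob := range blobs {
		header, data, sig, err := common.UnmarshalBlockBlob(blob)
		if err != nil {
			continue // foreign or malformed blob in the namespace; skip it
		}
		out = append(out, blockParts{Header: header, Data: data, Signature: sig})
	}
	return out, nil
}
```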

## 2. Custom Binary Blob Encoding

**Main**: DA blobs use protobuf encoding (`DAHeaderEnvelope` for headers, `SignedData` for data). Each involves allocating proto message structs, converting Go types to proto types, and calling `proto.Marshal`.

**This branch**: The combined blob wrapper uses a custom binary format: `[magic 4B][header_len 4B][header_bytes][data_len 4B][data_bytes][sig_len 4B][sig_bytes]`. Individual header and data fields are still proto-encoded internally (hash computation requires it), but the envelope wrapper avoids all proto overhead.

### Why
- Zero allocation for the blob wrapper (direct length-prefixed binary)
- No proto message pool management for the envelope
- No `ToProto`/`FromProto` conversion for the DA envelope or signed data
- Simpler and faster encode/decode path
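
A minimal sketch of the wire layout, assuming big-endian length prefixes (the document does not state the byte order) and plain `error` values; the real `common.MarshalBlockBlob`/`UnmarshalBlockBlob` may differ in such details:

```go
package common

import (
	"encoding/binary"
	"errors"
)

// blockBlobMagic is the magic prefix from the format above ("EVND" in ASCII).
const blockBlobMagic uint32 = 0x45564E44

// MarshalBlockBlob writes
// [magic 4B][header_len 4B][header_bytes][data_len 4B][data_bytes][sig_len 4B][sig_bytes]
// into a single pre-sized buffer, with no intermediate proto structs.
func MarshalBlockBlob(header, data, sig []byte) []byte {
	buf := make([]byte, 0, 16+len(header)+len(data)+len(sig))
	buf = binary.BigEndian.AppendUint32(buf, blockBlobMagic)
	buf = binary.BigEndian.AppendUint32(buf, uint32(len(header)))
	buf = append(buf, header...)
	buf = binary.BigEndian.AppendUint32(buf, uint32(len(data)))
	buf = append(buf, data...)
	buf = binary.BigEndian.AppendUint32(buf, uint32(len(sig)))
	buf = append(buf, sig...)
	return buf
}

// UnmarshalBlockBlob validates the magic and each length prefix, then returns
// subslices of the input (no copying).
func UnmarshalBlockBlob(blob []byte) (header, data, sig []byte, err error) {
	if len(blob) < 4 || binary.BigEndian.Uint32(blob) != blockBlobMagic {
		return nil, nil, nil, errors.New("block blob: bad magic")
	}
	rest := blob[4:]
	next := func() ([]byte, error) {
		if len(rest) < 4 {
			return nil, errors.New("block blob: truncated length prefix")
		}
		n := binary.BigEndian.Uint32(rest)
		rest = rest[4:]
		if uint64(len(rest)) < uint64(n) {
			return nil, errors.New("block blob: truncated field")
		}
		field := rest[:n]
		rest = rest[n:]
		return field, nil
	}
	if header, err = next(); err != nil {
		return nil, nil, nil, err
	}
	if data, err = next(); err != nil {
		return nil, nil, nil, err
	}
	if sig, err = next(); err != nil {
		return nil, nil, nil, err
	}
	return header, data, sig, nil
}
```

Returning subslices keeps the decode path allocation-free at the envelope layer; only the subsequent proto decode of the header and data fields allocates.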

## 3. P2P Sync Removed

**Main**: Full nodes sync from both P2P (via `go-header` `HeaderSyncService`/`DataSyncService`) and DA. The executor broadcasts produced blocks to P2P peers. P2P events include DA height hints for targeted DA retrieval. The syncer runs a P2P worker loop alongside the DA follower.

**This branch**: All P2P sync is removed. Nodes sync exclusively from DA. No P2P broadcasting, no P2P stores, no P2P handler, no DA height hints.

### Removed code
- `syncing/p2p_handler.go` — entire file deleted
- `syncing/syncer_mock.go` — P2P handler mock deleted
- `common/expected_interfaces.go` — `HeaderP2PBroadcaster`/`DataP2PBroadcaster` types removed
- P2P broadcasting in `executing/executor.go` removed
- P2P worker loop in syncer removed
- `SourceP2P` event source removed
- `DaHeightHints` field removed from `DAHeightEvent`
- `headerStore`/`dataStore` parameters removed from `NewSyncer` and component constructors
- `headerSyncService`/`dataSyncService` parameters removed from aggregator component constructors
- `DAHintAppender` interface removed from DA submitter
- Separate `SubmitHeaders`/`SubmitData` replaced with single `SubmitBlocks`

### Why
- Removes network overhead from P2P gossip
- Eliminates the complexity of two sync sources competing
- DA is the single source of truth, reducing consistency issues
- Removes libp2p dependency from the block package's hot path
- Simplifies the syncer from 3 worker loops to 2 (process loop + pending worker)

## 4. DA Submitter Simplified

**Main**: `DASubmitterAPI` has two methods: `SubmitHeaders` and `SubmitData`, each with separate batching strategies, mutex locks, and retry loops.

**This branch**: `DASubmitterAPI` has a single `SubmitBlocks` method that takes headers and data together, creates combined blobs, signs them, and submits to a single namespace. One batching strategy, one mutex, one retry loop.
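
A sketch of the narrowed surface; the document fixes the method name but not its parameters, so the argument types below are assumptions for illustration only:

```go
package submitting

import "context"

// DASubmitterAPI on this branch exposes a single submission entry point.
type DASubmitterAPI interface {
	// SubmitBlocks replaces the former SubmitHeaders/SubmitData pair: it
	// builds one combined blob per block, signs each envelope, and submits
	// the batch to the header namespace under one mutex and one retry loop.
	// Parameter types are illustrative, not the branch's actual signature.
	SubmitBlocks(ctx context.Context, headers [][]byte, data [][]byte) error
}
```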

### Why
- Halves the submission loop complexity
- Eliminates the envelope cache (no more retry-signing concern)
- Single retry loop instead of two
- Combined blobs submitted atomically — no partial header-without-data states

## Migration Notes

- Existing DA data from main branch is **not readable** by this branch (different blob format)
- This branch requires a fresh start or a migration tool
- The `P2PSignedHeader` and `P2PData` types still exist in `types/` but are no longer used by the block package
- External consumers of `NewSyncComponents` and `NewAggregatorWithCatchupComponents` must update their call sites
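
For a migration tool, the magic prefix gives a cheap format check. This helper is hypothetical and assumes the big-endian magic from the sketch above; note that a raw protobuf blob could in principle begin with the same four bytes, so a real tool should fall back to a full decode before deciding:

```go
package migrate

import "encoding/binary"

// isCombinedBlob reports whether a blob looks like the new combined format
// (magic 0x45564E44, "EVND") rather than a main-branch protobuf blob.
func isCombinedBlob(blob []byte) bool {
	return len(blob) >= 4 && binary.BigEndian.Uint32(blob) == 0x45564E44
}
```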
43 changes: 13 additions & 30 deletions block/components.go
@@ -5,7 +5,6 @@ import (
"errors"
"fmt"

"github.com/celestiaorg/go-header"
"github.com/rs/zerolog"

"github.com/evstack/ev-node/block/internal/cache"
@@ -22,9 +21,7 @@ import (
"github.com/evstack/ev-node/pkg/genesis"
"github.com/evstack/ev-node/pkg/signer"
"github.com/evstack/ev-node/pkg/store"
"github.com/evstack/ev-node/pkg/sync"
"github.com/evstack/ev-node/pkg/telemetry"
"github.com/evstack/ev-node/types"
)

// Components represents the block-related components
@@ -133,18 +130,14 @@ func (bc *Components) Stop() error {
}

// NewSyncComponents creates components for a non-aggregator full node that can only sync blocks.
- // Non-aggregator full nodes can sync from P2P and DA but cannot produce blocks or submit to DA.
+ // Non-aggregator full nodes can sync from DA but cannot produce blocks or submit to DA.
// They have more sync capabilities than light nodes but no block production. No signer required.
func NewSyncComponents(
config config.Config,
genesis genesis.Genesis,
store store.Store,
exec coreexecutor.Executor,
daClient da.Client,
- headerStore header.Store[*types.P2PSignedHeader],
- dataStore header.Store[*types.P2PData],
- headerDAHintAppender submitting.DAHintAppender,
- dataDAHintAppender submitting.DAHintAppender,
logger zerolog.Logger,
metrics *Metrics,
blockOpts BlockOptions,
@@ -166,8 +159,6 @@
metrics,
config,
genesis,
- headerStore,
- dataStore,
logger,
blockOpts,
errorCh,
@@ -182,10 +173,10 @@
if p, ok := exec.(coreexecutor.ExecPruner); ok {
execPruner = p
}
- pruner := pruner.New(logger, store, execPruner, config.Pruning, config.Node.BlockTime.Duration, config.DA.Address)
+ prunerObj := pruner.New(logger, store, execPruner, config.Pruning, config.Node.BlockTime.Duration, config.DA.Address)

// Create submitter for sync nodes (no signer, only DA inclusion processing)
- var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger, headerDAHintAppender, dataDAHintAppender)
+ var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger)
if config.Instrumentation.IsTracingEnabled() {
daSubmitter = submitting.WithTracingDASubmitter(daSubmitter)
}
@@ -207,13 +198,13 @@
Syncer: syncer,
Submitter: submitter,
Cache: cacheManager,
- Pruner: pruner,
+ Pruner: prunerObj,
errorCh: errorCh,
}, nil
}

// newAggregatorComponents creates components for an aggregator full node that can produce and sync blocks.
- // Aggregator nodes have full capabilities - they can produce blocks, sync from P2P and DA,
+ // Aggregator nodes have full capabilities - they can produce blocks, sync from DA,
// and submit headers/data to DA. Requires a signer for block production and DA submission.
func newAggregatorComponents(
config config.Config,
@@ -223,8 +214,6 @@
sequencer coresequencer.Sequencer,
daClient da.Client,
signer signer.Signer,
- headerSyncService *sync.HeaderSyncService,
- dataSyncService *sync.DataSyncService,
logger zerolog.Logger,
metrics *Metrics,
blockOpts BlockOptions,
@@ -252,8 +241,6 @@
metrics,
config,
genesis,
- headerSyncService,
- dataSyncService,
logger,
blockOpts,
errorCh,
@@ -271,7 +258,7 @@
if p, ok := exec.(coreexecutor.ExecPruner); ok {
execPruner = p
}
- pruner := pruner.New(logger, store, execPruner, config.Pruning, config.Node.BlockTime.Duration, config.DA.Address)
+ prunerObj := pruner.New(logger, store, execPruner, config.Pruning, config.Node.BlockTime.Duration, config.DA.Address)

reaper, err := reaping.NewReaper(
exec,
@@ -286,17 +273,17 @@
return nil, fmt.Errorf("failed to create reaper: %w", err)
}

- if config.Node.BasedSequencer { // no submissions needed for bases sequencer
+ if config.Node.BasedSequencer { // no submissions needed for based sequencer
return &Components{
Executor: executor,
- Pruner: pruner,
+ Pruner: prunerObj,
Reaper: reaper,
Cache: cacheManager,
errorCh: errorCh,
}, nil
}

- var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger, headerSyncService, dataSyncService)
+ var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger)
if config.Instrumentation.IsTracingEnabled() {
daSubmitter = submitting.WithTracingDASubmitter(daSubmitter)
}
@@ -316,7 +303,7 @@

return &Components{
Executor: executor,
- Pruner: pruner,
+ Pruner: prunerObj,
Reaper: reaper,
Submitter: submitter,
Cache: cacheManager,
@@ -325,10 +312,10 @@
}

// NewAggregatorWithCatchupComponents creates aggregator components that include a Syncer
- // for DA/P2P catchup before block production begins.
+ // for DA catchup before block production begins.
//
// The caller should:
- // 1. Start the Syncer and wait for DA head + P2P catchup
+ // 1. Start the Syncer and wait for DA head catchup
// 2. Stop the Syncer and set Components.Syncer = nil
// 3. Call Components.Start() — which will start the Executor and other components
func NewAggregatorWithCatchupComponents(
@@ -339,16 +326,14 @@
sequencer coresequencer.Sequencer,
daClient da.Client,
signer signer.Signer,
- headerSyncService *sync.HeaderSyncService,
- dataSyncService *sync.DataSyncService,
logger zerolog.Logger,
metrics *Metrics,
blockOpts BlockOptions,
raftNode common.RaftNode,
) (*Components, error) {
bc, err := newAggregatorComponents(
config, genesis, store, exec, sequencer, daClient, signer,
- headerSyncService, dataSyncService, logger, metrics, blockOpts, raftNode,
+ logger, metrics, blockOpts, raftNode,
)
if err != nil {
return nil, err
@@ -364,8 +349,6 @@
metrics,
config,
genesis,
- headerSyncService.Store(),
- dataSyncService.Store(),
logger,
blockOpts,
catchupErrCh,
42 changes: 7 additions & 35 deletions block/components_test.go
@@ -23,20 +23,10 @@ import (
"github.com/evstack/ev-node/pkg/signer/noop"
"github.com/evstack/ev-node/pkg/store"
testmocks "github.com/evstack/ev-node/test/mocks"
- extmocks "github.com/evstack/ev-node/test/mocks/external"
- "github.com/evstack/ev-node/types"
)

- // noopDAHintAppender is a no-op implementation of DAHintAppender for testing
- type noopDAHintAppender struct{}
-
- func (n noopDAHintAppender) AppendDAHint(ctx context.Context, daHeight uint64, heights ...uint64) error {
- return nil
- }

+ // Test the error channel mechanism works as intended
func TestBlockComponents_ExecutionClientFailure_StopsNode(t *testing.T) {
- // Test the error channel mechanism works as intended

// Create a mock component that simulates execution client failure
errorCh := make(chan error, 1)
criticalError := errors.New("execution client connection lost")
@@ -62,13 +52,13 @@ func TestBlockComponents_ExecutionClientFailure_StopsNode(t *testing.T) {
assert.Contains(t, err.Error(), "execution client connection lost")
}

+ // Simple lifecycle test without creating full components
func TestBlockComponents_StartStop_Lifecycle(t *testing.T) {
- // Simple lifecycle test without creating full components
+ // Test that Start and Stop work without hanging
bc := &Components{
errorCh: make(chan error, 1),
}

- // Test that Start and Stop work without hanging
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

@@ -96,26 +86,12 @@ func TestNewSyncComponents_Creation(t *testing.T) {
daClient.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe()
daClient.On("HasForcedInclusionNamespace").Return(false).Maybe()

- // Create mock P2P stores
- mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t)
- mockDataStore := extmocks.NewMockStore[*types.P2PData](t)
-
- // Create noop DAHintAppenders for testing
- headerHintAppender := noopDAHintAppender{}
- dataHintAppender := noopDAHintAppender{}

// Just test that the constructor doesn't panic - don't start the components
- // to avoid P2P store dependencies
components, err := NewSyncComponents(
cfg,
gen,
memStore,
mockExec,
daClient,
- mockHeaderStore,
- mockDataStore,
- headerHintAppender,
- dataHintAppender,
zerolog.Nop(),
NopMetrics(),
DefaultBlockOptions(),
@@ -171,12 +147,10 @@
mockSeq,
daClient,
mockSigner,
- nil, // header broadcaster
- nil, // data broadcaster
zerolog.Nop(),
NopMetrics(),
DefaultBlockOptions(),
- nil, // raftNode
+ nil,
)

require.NoError(t, err)
@@ -189,9 +163,9 @@
assert.Nil(t, components.Syncer) // Aggregator nodes currently don't create syncers in this constructor
}

+ // This test verifies that when the executor's execution client calls fail,
+ // the error is properly propagated through the error channel and stops the node
func TestExecutor_RealExecutionClientFailure_StopsNode(t *testing.T) {
- // This test verifies that when the executor's execution client calls fail,
- // the error is properly propagated through the error channel and stops the node
synctest.Test(t, func(t *testing.T) {
ds := sync.MutexWrap(datastore.NewMapDatastore())
memStore := store.New(ds)
@@ -255,12 +229,10 @@
mockSeq,
daClient,
testSigner,
- nil, // header broadcaster
- nil, // data broadcaster
zerolog.Nop(),
NopMetrics(),
DefaultBlockOptions(),
- nil, // raftNode
+ nil,
)
require.NoError(t, err)
