Skip to content

Commit 21672cd

Browse files
authored
reorg specific blocks (#314)
* reorg selected blocks for chain * log fix * publish missing block * skip idempotency check
1 parent 1230c51 commit 21672cd

9 files changed

Lines changed: 602 additions & 10 deletions

File tree

cmd/reorgapi.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package cmd
2+
3+
import (
4+
"github.com/rs/zerolog/log"
5+
"github.com/spf13/cobra"
6+
"github.com/thirdweb-dev/indexer/internal/libs"
7+
"github.com/thirdweb-dev/indexer/internal/reorgapi"
8+
)
9+
10+
var reorgAPICmd = &cobra.Command{
11+
Use: "reorg-api",
12+
Short: "HTTP API to publish manual reorg fixes to Kafka",
13+
Long: `Loads old block data from ClickHouse, fetches canonical data from RPC, and publishes
14+
to Kafka using the same reorg semantics as automatic reorg handling (old blocks as deleted, then new blocks).
15+
16+
Requires the same env as committer for RPC, ClickHouse, and Kafka (no S3).
17+
18+
Example:
19+
curl -sS -X POST http://localhost:8080/v1/reorg/publish \
20+
-H 'Content-Type: application/json' \
21+
-d '{"chain_id":8453,"block_numbers":[12345,12346]}'`,
22+
Run: runReorgAPI,
23+
}
24+
25+
func runReorgAPI(cmd *cobra.Command, args []string) {
26+
libs.InitRPCClient()
27+
libs.InitNewClickHouseV2()
28+
libs.InitKafkaV2ForRole("reorg-api")
29+
libs.InitRedis()
30+
31+
log.Info().Str("chain_id", libs.ChainIdStr).Msg("starting reorg-api")
32+
33+
if err := reorgapi.RunHTTPServer(); err != nil {
34+
log.Fatal().Err(err).Msg("reorg-api server exited with error")
35+
}
36+
}

cmd/root.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ func init() {
3333
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "optional config file path (defaults to env-only when unset)")
3434
rootCmd.AddCommand(committerCmd)
3535
rootCmd.AddCommand(backfillCmd)
36+
rootCmd.AddCommand(reorgAPICmd)
3637
}
3738

3839
func initConfig() {

configs/config.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@ type LogConfig struct {
1616
}
1717

1818
// KafkaConfig holds the connection settings handed to a Kafka publisher.
type KafkaConfig struct {
	Brokers   string `mapstructure:"brokers"`
	Username  string `mapstructure:"username"`
	Password  string `mapstructure:"password"`
	EnableTLS bool   `mapstructure:"enableTLS"`
	// ProducerRole identifies which process role is producing (e.g.
	// "committer" or "reorg-api"). It intentionally carries no mapstructure
	// tag: it is set programmatically by the caller, not loaded from config.
	ProducerRole string
}
2425

2526
type RPCBatchRequestConfig struct {
@@ -90,6 +91,12 @@ type Config struct {
9091
RedisDB int `env:"REDIS_DB" envDefault:"0"`
9192
ValidationMode string `env:"VALIDATION_MODE" envDefault:"minimal"`
9293
EnableReorgValidation bool `env:"ENABLE_REORG_VALIDATION" envDefault:"true"`
94+
// ReorgAPIListenAddr is the bind address for the manual reorg publish HTTP server (reorg-api command).
95+
ReorgAPIListenAddr string `env:"REORG_API_LISTEN_ADDR" envDefault:":8080"`
96+
// ReorgAPIKey, when non-empty, requires requests to send Authorization: Bearer <ReorgAPIKey>.
97+
ReorgAPIKey string `env:"REORG_API_KEY"`
98+
// ReorgAPIClickhouseBatchSize is how many block numbers to load from ClickHouse per reorg-api sub-request (manual reorg publish).
99+
ReorgAPIClickhouseBatchSize uint64 `env:"REORG_API_CLICKHOUSE_BATCH_SIZE" envDefault:"10"`
93100
}
94101

95102
var Cfg Config

internal/libs/clickhouse.go

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ import (
55
"crypto/tls"
66
"fmt"
77
"math/big"
8+
"slices"
89
"strconv"
910
"strings"
1011
"sync"
1112

1213
"github.com/rs/zerolog/log"
14+
"golang.org/x/sync/errgroup"
1315

1416
"github.com/ClickHouse/clickhouse-go/v2"
1517
config "github.com/thirdweb-dev/indexer/configs"
@@ -250,6 +252,153 @@ func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBl
250252
return blockData, nil
251253
}
252254

255+
// joinUint64sForIN renders nums as a comma-separated decimal list suitable
// for interpolation into a SQL IN (...) clause. An empty slice yields "".
func joinUint64sForIN(nums []uint64) string {
	parts := make([]string, len(nums))
	for i, n := range nums {
		parts[i] = strconv.FormatUint(n, 10)
	}
	return strings.Join(parts, ",")
}
266+
267+
func queryBlocksByBlockNumbers(chainId uint64, nums []uint64) ([]common.Block, error) {
268+
if len(nums) == 0 {
269+
return nil, nil
270+
}
271+
q := fmt.Sprintf(
272+
"SELECT %s FROM %s.blocks FINAL WHERE chain_id = %d AND block_number IN (%s) ORDER BY block_number",
273+
strings.Join(defaultBlockFields, ", "),
274+
config.Cfg.CommitterClickhouseDatabase,
275+
chainId,
276+
joinUint64sForIN(nums),
277+
)
278+
return execQueryV2[common.Block](q)
279+
}
280+
281+
func queryTransactionsByBlockNumbers(chainId uint64, nums []uint64) ([]common.Transaction, error) {
282+
if len(nums) == 0 {
283+
return nil, nil
284+
}
285+
q := fmt.Sprintf(
286+
"SELECT %s FROM %s.transactions FINAL WHERE chain_id = %d AND block_number IN (%s) ORDER BY block_number, transaction_index",
287+
strings.Join(defaultTransactionFields, ", "),
288+
config.Cfg.CommitterClickhouseDatabase,
289+
chainId,
290+
joinUint64sForIN(nums),
291+
)
292+
return execQueryV2[common.Transaction](q)
293+
}
294+
295+
func queryLogsByBlockNumbers(chainId uint64, nums []uint64) ([]common.Log, error) {
296+
if len(nums) == 0 {
297+
return nil, nil
298+
}
299+
q := fmt.Sprintf(
300+
"SELECT %s FROM %s.logs FINAL WHERE chain_id = %d AND block_number IN (%s) ORDER BY block_number, log_index",
301+
strings.Join(defaultLogFields, ", "),
302+
config.Cfg.CommitterClickhouseDatabase,
303+
chainId,
304+
joinUint64sForIN(nums),
305+
)
306+
return execQueryV2[common.Log](q)
307+
}
308+
309+
func queryTracesByBlockNumbers(chainId uint64, nums []uint64) ([]common.Trace, error) {
310+
if len(nums) == 0 {
311+
return nil, nil
312+
}
313+
q := fmt.Sprintf(
314+
"SELECT %s FROM %s.traces FINAL WHERE chain_id = %d AND block_number IN (%s) ORDER BY block_number, transaction_index",
315+
strings.Join(defaultTraceFields, ", "),
316+
config.Cfg.CommitterClickhouseDatabase,
317+
chainId,
318+
joinUint64sForIN(nums),
319+
)
320+
return execQueryV2[common.Trace](q)
321+
}
322+
323+
// GetBlockDataFromClickHouseForBlockNumbers loads stored block data for specific block numbers using
324+
// block_number IN (...). Callers that send very large lists should chunk requests (e.g. reorg-api batches by REORG_API_CLICKHOUSE_BATCH_SIZE).
325+
func GetBlockDataFromClickHouseForBlockNumbers(chainId uint64, blockNumbers []uint64) ([]*common.BlockData, error) {
326+
if len(blockNumbers) == 0 {
327+
return nil, nil
328+
}
329+
nums := slices.Clone(blockNumbers)
330+
slices.Sort(nums)
331+
nums = slices.Compact(nums)
332+
333+
var blocks []common.Block
334+
var txs []common.Transaction
335+
var logs []common.Log
336+
var traces []common.Trace
337+
g := new(errgroup.Group)
338+
g.Go(func() (err error) {
339+
blocks, err = queryBlocksByBlockNumbers(chainId, nums)
340+
return err
341+
})
342+
g.Go(func() (err error) {
343+
txs, err = queryTransactionsByBlockNumbers(chainId, nums)
344+
return err
345+
})
346+
g.Go(func() (err error) {
347+
logs, err = queryLogsByBlockNumbers(chainId, nums)
348+
return err
349+
})
350+
g.Go(func() (err error) {
351+
traces, err = queryTracesByBlockNumbers(chainId, nums)
352+
return err
353+
})
354+
if err := g.Wait(); err != nil {
355+
return nil, err
356+
}
357+
358+
blocksByNum := make(map[uint64]common.Block, len(blocks))
359+
for _, b := range blocks {
360+
if b.Number != nil {
361+
blocksByNum[b.Number.Uint64()] = b
362+
}
363+
}
364+
txByNum := make(map[uint64][]common.Transaction)
365+
for _, t := range txs {
366+
if t.BlockNumber != nil {
367+
bn := t.BlockNumber.Uint64()
368+
txByNum[bn] = append(txByNum[bn], t)
369+
}
370+
}
371+
logsByNum := make(map[uint64][]common.Log)
372+
for _, l := range logs {
373+
if l.BlockNumber != nil {
374+
bn := l.BlockNumber.Uint64()
375+
logsByNum[bn] = append(logsByNum[bn], l)
376+
}
377+
}
378+
tracesByNum := make(map[uint64][]common.Trace)
379+
for _, tr := range traces {
380+
if tr.BlockNumber != nil {
381+
bn := tr.BlockNumber.Uint64()
382+
tracesByNum[bn] = append(tracesByNum[bn], tr)
383+
}
384+
}
385+
386+
out := make([]*common.BlockData, 0, len(nums))
387+
for _, bn := range nums {
388+
b, ok := blocksByNum[bn]
389+
if !ok || b.ChainId == nil || b.Number == nil || b.ChainId.Uint64() == 0 {
390+
continue
391+
}
392+
out = append(out, &common.BlockData{
393+
Block: b,
394+
Transactions: txByNum[bn],
395+
Logs: logsByNum[bn],
396+
Traces: tracesByNum[bn],
397+
})
398+
}
399+
return out, nil
400+
}
401+
253402
// GetTransactionMismatchRangeFromClickHouseV2 checks, for blocks in the given range,
254403
// where the stored transaction_count in the blocks table does not match the number
255404
// of transactions in the transactions table. It returns the minimum and maximum

internal/libs/kafka.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,17 @@ import (
99
var KafkaPublisherV2 *storage.KafkaPublisher
1010

1111
func InitKafkaV2() {
12+
InitKafkaV2ForRole("committer")
13+
}
14+
15+
func InitKafkaV2ForRole(role string) {
1216
var err error
1317
KafkaPublisherV2, err = storage.NewKafkaPublisher(&config.KafkaConfig{
14-
Brokers: config.Cfg.CommitterKafkaBrokers,
15-
Username: config.Cfg.CommitterKafkaUsername,
16-
Password: config.Cfg.CommitterKafkaPassword,
17-
EnableTLS: config.Cfg.CommitterKafkaEnableTLS,
18+
Brokers: config.Cfg.CommitterKafkaBrokers,
19+
Username: config.Cfg.CommitterKafkaUsername,
20+
Password: config.Cfg.CommitterKafkaPassword,
21+
EnableTLS: config.Cfg.CommitterKafkaEnableTLS,
22+
ProducerRole: role,
1823
})
1924
if err != nil {
2025
log.Fatal().Err(err).Msg("Failed to initialize Kafka publisher")

0 commit comments

Comments
 (0)