Skip to content

Commit 1230c51

Browse files
authored
lag tip by x blocks (#313)
* lag tip by x blocks
1 parent 0168ca2 commit 1230c51

2 files changed

Lines changed: 62 additions & 47 deletions

File tree

configs/config.go

Lines changed: 45 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -42,50 +42,54 @@ type RPCConfig struct {
4242
}
4343

4444
type Config struct {
45-
RPC RPCConfig `mapstructure:"rpc"`
46-
Log LogConfig `mapstructure:"log"`
47-
ZeetProjectName string `env:"ZEET_PROJECT_NAME" envDefault:"insight-indexer"`
48-
ZeetDeploymentId string `env:"ZEET_DEPLOYMENT_ID"`
49-
ZeetClusterId string `env:"ZEET_CLUSTER_ID"`
50-
CommitterClickhouseDatabase string `env:"COMMITTER_CLICKHOUSE_DATABASE"`
51-
CommitterClickhouseHost string `env:"COMMITTER_CLICKHOUSE_HOST"`
52-
CommitterClickhousePort int `env:"COMMITTER_CLICKHOUSE_PORT"`
53-
CommitterClickhouseUsername string `env:"COMMITTER_CLICKHOUSE_USERNAME"`
54-
CommitterClickhousePassword string `env:"COMMITTER_CLICKHOUSE_PASSWORD"`
55-
CommitterClickhouseEnableTLS bool `env:"COMMITTER_CLICKHOUSE_ENABLE_TLS" envDefault:"true"`
56-
CommitterKafkaBrokers string `env:"COMMITTER_KAFKA_BROKERS"`
57-
CommitterKafkaUsername string `env:"COMMITTER_KAFKA_USERNAME"`
58-
CommitterKafkaPassword string `env:"COMMITTER_KAFKA_PASSWORD"`
59-
CommitterKafkaEnableTLS bool `env:"COMMITTER_KAFKA_ENABLE_TLS" envDefault:"true"`
60-
CommitterMaxMemoryMB int `env:"COMMITTER_MAX_MEMORY_MB" envDefault:"512"`
61-
CommitterCompressionThresholdMB int `env:"COMMITTER_COMPRESSION_THRESHOLD_MB" envDefault:"50"`
62-
CommitterKafkaBatchSize int `env:"COMMITTER_KAFKA_BATCH_SIZE" envDefault:"500"`
63-
CommitterIsLive bool `env:"COMMITTER_IS_LIVE" envDefault:"false"`
64-
CommitterLagByBlocks uint64 `env:"COMMITTER_LAG_BY_BLOCKS" envDefault:"0"`
45+
RPC RPCConfig `mapstructure:"rpc"`
46+
Log LogConfig `mapstructure:"log"`
47+
ZeetProjectName string `env:"ZEET_PROJECT_NAME" envDefault:"insight-indexer"`
48+
ZeetDeploymentId string `env:"ZEET_DEPLOYMENT_ID"`
49+
ZeetClusterId string `env:"ZEET_CLUSTER_ID"`
50+
CommitterClickhouseDatabase string `env:"COMMITTER_CLICKHOUSE_DATABASE"`
51+
CommitterClickhouseHost string `env:"COMMITTER_CLICKHOUSE_HOST"`
52+
CommitterClickhousePort int `env:"COMMITTER_CLICKHOUSE_PORT"`
53+
CommitterClickhouseUsername string `env:"COMMITTER_CLICKHOUSE_USERNAME"`
54+
CommitterClickhousePassword string `env:"COMMITTER_CLICKHOUSE_PASSWORD"`
55+
CommitterClickhouseEnableTLS bool `env:"COMMITTER_CLICKHOUSE_ENABLE_TLS" envDefault:"true"`
56+
CommitterKafkaBrokers string `env:"COMMITTER_KAFKA_BROKERS"`
57+
CommitterKafkaUsername string `env:"COMMITTER_KAFKA_USERNAME"`
58+
CommitterKafkaPassword string `env:"COMMITTER_KAFKA_PASSWORD"`
59+
CommitterKafkaEnableTLS bool `env:"COMMITTER_KAFKA_ENABLE_TLS" envDefault:"true"`
60+
CommitterMaxMemoryMB int `env:"COMMITTER_MAX_MEMORY_MB" envDefault:"512"`
61+
CommitterCompressionThresholdMB int `env:"COMMITTER_COMPRESSION_THRESHOLD_MB" envDefault:"50"`
62+
CommitterKafkaBatchSize int `env:"COMMITTER_KAFKA_BATCH_SIZE" envDefault:"500"`
63+
CommitterIsLive bool `env:"COMMITTER_IS_LIVE" envDefault:"false"`
64+
CommitterLagByBlocks uint64 `env:"COMMITTER_LAG_BY_BLOCKS" envDefault:"0"`
65+
// PollerLag subtracts this many blocks from the RPC head only when COMMITTER_IS_LIVE
66+
// is true, so the highest published block stays roughly this far behind the true tip.
67+
// Ignored for non-live catch-up (effective head stays rpcLatest; CommitterLagByBlocks unchanged).
68+
PollerLag uint64 `env:"POLLER_LAG" envDefault:"0"`
6569
// CommitterStartBlock, when set (>0), forces the committer to start publishing
6670
// from this block number regardless of what ClickHouse says is already committed.
6771
// This can cause duplicate publishing if ClickHouse already contains higher blocks.
68-
CommitterStartBlock uint64 `env:"COMMITTER_START_BLOCK" envDefault:"0"`
69-
StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"`
70-
StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"`
71-
StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"`
72-
StagingS3SecretAccessKey string `env:"STAGING_S3_SECRET_ACCESS_KEY"`
73-
StagingS3MaxParallelFileDownload int `env:"STAGING_S3_MAX_PARALLEL_FILE_DOWNLOAD" envDefault:"2"`
74-
BackfillStartBlock uint64 `env:"BACKFILL_START_BLOCK"`
75-
BackfillEndBlock uint64 `env:"BACKFILL_END_BLOCK"`
76-
RPCNumParallelCalls uint64 `env:"RPC_NUM_PARALLEL_CALLS" envDefault:"20"`
77-
RPCBatchSize uint64 `env:"RPC_BATCH_SIZE" envDefault:"10"`
78-
RPCBatchMaxMemoryUsageMB uint64 `env:"RPC_BATCH_MAX_MEMORY_USAGE_MB" envDefault:"32"`
79-
RPCDisableBlockReceipts bool `env:"RPC_DISABLE_BLOCK_RECEIPTS" envDefault:"false"`
80-
ParquetMaxFileSizeMB int64 `env:"PARQUET_MAX_FILE_SIZE_MB" envDefault:"512"`
81-
InsightServiceUrl string `env:"INSIGHT_SERVICE_URL" envDefault:"https://insight.thirdweb.com"`
82-
InsightServiceApiKey string `env:"INSIGHT_SERVICE_API_KEY"`
83-
RedisAddr string `env:"REDIS_ADDR" envDefault:"localhost:6379"`
84-
RedisUsername string `env:"REDIS_USERNAME"`
85-
RedisPassword string `env:"REDIS_PASSWORD"`
86-
RedisDB int `env:"REDIS_DB" envDefault:"0"`
87-
ValidationMode string `env:"VALIDATION_MODE" envDefault:"minimal"`
88-
EnableReorgValidation bool `env:"ENABLE_REORG_VALIDATION" envDefault:"true"`
72+
CommitterStartBlock uint64 `env:"COMMITTER_START_BLOCK" envDefault:"0"`
73+
StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"`
74+
StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"`
75+
StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"`
76+
StagingS3SecretAccessKey string `env:"STAGING_S3_SECRET_ACCESS_KEY"`
77+
StagingS3MaxParallelFileDownload int `env:"STAGING_S3_MAX_PARALLEL_FILE_DOWNLOAD" envDefault:"2"`
78+
BackfillStartBlock uint64 `env:"BACKFILL_START_BLOCK"`
79+
BackfillEndBlock uint64 `env:"BACKFILL_END_BLOCK"`
80+
RPCNumParallelCalls uint64 `env:"RPC_NUM_PARALLEL_CALLS" envDefault:"20"`
81+
RPCBatchSize uint64 `env:"RPC_BATCH_SIZE" envDefault:"10"`
82+
RPCBatchMaxMemoryUsageMB uint64 `env:"RPC_BATCH_MAX_MEMORY_USAGE_MB" envDefault:"32"`
83+
RPCDisableBlockReceipts bool `env:"RPC_DISABLE_BLOCK_RECEIPTS" envDefault:"false"`
84+
ParquetMaxFileSizeMB int64 `env:"PARQUET_MAX_FILE_SIZE_MB" envDefault:"512"`
85+
InsightServiceUrl string `env:"INSIGHT_SERVICE_URL" envDefault:"https://insight.thirdweb.com"`
86+
InsightServiceApiKey string `env:"INSIGHT_SERVICE_API_KEY"`
87+
RedisAddr string `env:"REDIS_ADDR" envDefault:"localhost:6379"`
88+
RedisUsername string `env:"REDIS_USERNAME"`
89+
RedisPassword string `env:"REDIS_PASSWORD"`
90+
RedisDB int `env:"REDIS_DB" envDefault:"0"`
91+
ValidationMode string `env:"VALIDATION_MODE" envDefault:"minimal"`
92+
EnableReorgValidation bool `env:"ENABLE_REORG_VALIDATION" envDefault:"true"`
8993
}
9094

9195
var Cfg Config

internal/committer/poollatest.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,26 @@ func pollLatest() error {
2626
time.Sleep(250 * time.Millisecond)
2727
continue
2828
}
29-
// Update latest block number metric
30-
metrics.CommitterLatestBlockNumber.WithLabelValues(indexerName, chainIdStr).Set(float64(latestBlock.Uint64()))
29+
rpcLatest := latestBlock.Uint64()
30+
effectiveLatest := rpcLatest
31+
if config.Cfg.CommitterIsLive {
32+
if config.Cfg.PollerLag < rpcLatest {
33+
effectiveLatest = rpcLatest - config.Cfg.PollerLag
34+
} else {
35+
effectiveLatest = 0
36+
}
37+
}
38+
39+
// Update latest block number metric (RPC head, not poller-lag-adjusted)
40+
metrics.CommitterLatestBlockNumber.WithLabelValues(indexerName, chainIdStr).Set(float64(rpcLatest))
3141

32-
if nextBlockNumber+config.Cfg.CommitterLagByBlocks >= latestBlock.Uint64() {
42+
if nextBlockNumber+config.Cfg.CommitterLagByBlocks >= effectiveLatest {
3343
time.Sleep(250 * time.Millisecond)
3444
continue
3545
}
3646

3747
// will panic if any block is invalid
38-
blockDataArray := libblockdata.GetValidBlockDataInBatch(latestBlock.Uint64(), nextBlockNumber)
48+
blockDataArray := libblockdata.GetValidBlockDataInBatch(effectiveLatest, nextBlockNumber)
3949

4050
// Validate that all blocks are sequential and nothing is missing
4151
expectedBlockNumber := nextBlockNumber
@@ -80,9 +90,10 @@ func pollLatest() error {
8090
metrics.CommitterIsLive.WithLabelValues(indexerName, chainIdStr).Set(1)
8191
}
8292

83-
if !config.Cfg.CommitterIsLive && latestBlock.Int64()-int64(nextBlockNumber) < 20 && !hasRightsized {
93+
if !config.Cfg.CommitterIsLive && int64(effectiveLatest)-int64(nextBlockNumber) < 20 && !hasRightsized {
8494
log.Debug().
85-
Uint64("latest_block", latestBlock.Uint64()).
95+
Uint64("rpc_latest_block", rpcLatest).
96+
Uint64("effective_latest_block", effectiveLatest).
8697
Uint64("next_commit_block", nextBlockNumber).
8798
Msg("Latest block is close to next commit block. Resizing s3 committer")
8899
libs.RightsizeS3Committer()

0 commit comments

Comments
 (0)