Commit 8bf2a7b

fix(scheduler): resolve SetSnapshot race conditions and remove usage throttle
Backend: Fix three race conditions in SetSnapshot that caused account scheduling anomalies and broken sticky sessions:

- Use a Lua CAS script for atomic version activation, preventing version rollback when concurrent goroutines write snapshots simultaneously.
- Add UnlockBucket to release the rebuild lock immediately after completion instead of waiting for the 30s TTL to expire.
- Replace the immediate DEL of old snapshots with a 60s EXPIRE grace period, so readers no longer hit an empty ZRANGE during version switches.

Frontend: Remove the serial queue throttle (1-2s delay per request) from usage loading, since the backend now uses passive sampling. All usage requests execute immediately in parallel.
1 parent 40feb86 commit 8bf2a7b

7 files changed

Lines changed: 91 additions & 95 deletions
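
The invariant behind the backend fix is that a bucket's active snapshot version only moves forward. The code below is a minimal, self-contained Go sketch of that compare-and-set rule, written as an in-memory stand-in for illustration only (the type and function names here are illustrative, not from this repository); the real implementation is the Redis Lua script added to backend/internal/repository/scheduler_cache.go further down.

package main

import (
    "fmt"
    "sync"
)

// activeVersion mimics the "activate only if newer" rule: a writer holding an
// older version can no longer roll the active pointer back.
type activeVersion struct {
    mu  sync.Mutex
    cur int64
}

// activateIfNewer moves the pointer only when v >= cur and reports whether it did.
func (a *activeVersion) activateIfNewer(v int64) bool {
    a.mu.Lock()
    defer a.mu.Unlock()
    if v < a.cur {
        return false // stale writer: its snapshot would be discarded
    }
    a.cur = v
    return true
}

func main() {
    av := &activeVersion{}
    var wg sync.WaitGroup
    for _, v := range []int64{7, 8} { // two concurrent rebuilds of the same bucket
        wg.Add(1)
        go func(v int64) {
            defer wg.Done()
            fmt.Printf("version %d activated: %v\n", v, av.activateIfNewer(v))
        }(v)
    }
    wg.Wait()
    fmt.Println("active version:", av.cur) // always 8, never rolls back to 7
}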


backend/internal/handler/gateway_handler_warmup_intercept_unit_test.go

Lines changed: 3 additions & 0 deletions
@@ -50,6 +50,9 @@ func (f *fakeSchedulerCache) UpdateLastUsed(_ context.Context, _ map[int64]time.
 func (f *fakeSchedulerCache) TryLockBucket(_ context.Context, _ service.SchedulerBucket, _ time.Duration) (bool, error) {
     return true, nil
 }
+func (f *fakeSchedulerCache) UnlockBucket(_ context.Context, _ service.SchedulerBucket) error {
+    return nil
+}
 func (f *fakeSchedulerCache) ListBuckets(_ context.Context) ([]service.SchedulerBucket, error) {
     return nil, nil
 }

backend/internal/repository/account_repo_integration_test.go

Lines changed: 4 additions & 0 deletions
@@ -64,6 +64,10 @@ func (s *schedulerCacheRecorder) TryLockBucket(ctx context.Context, bucket servi
     return true, nil
 }
 
+func (s *schedulerCacheRecorder) UnlockBucket(ctx context.Context, bucket service.SchedulerBucket) error {
+    return nil
+}
+
 func (s *schedulerCacheRecorder) ListBuckets(ctx context.Context) ([]service.SchedulerBucket, error) {
     return nil, nil
 }

backend/internal/repository/scheduler_cache.go

Lines changed: 69 additions & 14 deletions
@@ -24,6 +24,49 @@ const (
 
     defaultSchedulerSnapshotMGetChunkSize  = 128
     defaultSchedulerSnapshotWriteChunkSize = 256
+
+    // snapshotGraceTTLSeconds is the grace period (in seconds) before an old snapshot expires.
+    // It replaces the immediate DEL so readers still on the old version have enough time to finish their ZRANGE.
+    snapshotGraceTTLSeconds = 60
+)
+
+var (
+    // activateSnapshotScript switches the snapshot version with an atomic CAS.
+    // It only activates when the new version >= the currently active version, preventing version rollback under concurrent writes.
+    // The old snapshot is given an EXPIRE grace period instead of an immediate DEL, avoiding races with readers.
+    //
+    // KEYS[1] = activeKey (sched:active:{bucket})
+    // KEYS[2] = readyKey (sched:ready:{bucket})
+    // KEYS[3] = bucketSetKey (sched:buckets)
+    // KEYS[4] = snapshotKey (the newly written snapshot key)
+    // ARGV[1] = new version number as a string
+    // ARGV[2] = bucket string (used for SADD)
+    // ARGV[3] = snapshot key prefix (used to build the old snapshot key)
+    // ARGV[4] = grace-period TTL in seconds
+    //
+    // Returns 1 = activated, 0 = version too old, not activated
+    activateSnapshotScript = redis.NewScript(`
+local currentActive = redis.call('GET', KEYS[1])
+local newVersion = tonumber(ARGV[1])
+
+if currentActive ~= false then
+    local curVersion = tonumber(currentActive)
+    if curVersion and newVersion < curVersion then
+        redis.call('DEL', KEYS[4])
+        return 0
+    end
+end
+
+redis.call('SET', KEYS[1], ARGV[1])
+redis.call('SET', KEYS[2], '1')
+redis.call('SADD', KEYS[3], ARGV[2])
+
+if currentActive ~= false and currentActive ~= ARGV[1] then
+    redis.call('EXPIRE', ARGV[3] .. currentActive, tonumber(ARGV[4]))
+end
+
+return 1
+`)
 )
 
 type schedulerCache struct {
@@ -108,9 +151,9 @@ func (c *schedulerCache) GetSnapshot(ctx context.Context, bucket service.Schedul
 }
 
 func (c *schedulerCache) SetSnapshot(ctx context.Context, bucket service.SchedulerBucket, accounts []service.Account) error {
-    activeKey := schedulerBucketKey(schedulerActivePrefix, bucket)
-    oldActive, _ := c.rdb.Get(ctx, activeKey).Result()
-
+    // Phase 1: allocate a new version number and write the snapshot data.
+    // INCR guarantees each caller gets a unique, monotonically increasing version number.
+    // The snapshot is written to a new versioned key that readers do not yet know about, so there is no race.
     versionKey := schedulerBucketKey(schedulerVersionPrefix, bucket)
     version, err := c.rdb.Incr(ctx, versionKey).Result()
     if err != nil {
@@ -124,7 +167,6 @@ func (c *schedulerCache) SetSnapshot(ctx context.Context, bucket service.Schedul
         return err
     }
 
-    pipe := c.rdb.Pipeline()
     if len(accounts) > 0 {
         // Use the index as the score to preserve the ordering returned by the database.
         members := make([]redis.Z, 0, len(accounts))
@@ -134,25 +176,33 @@ func (c *schedulerCache) SetSnapshot(ctx context.Context, bucket service.Schedul
                 Member: strconv.FormatInt(account.ID, 10),
             })
         }
+        pipe := c.rdb.Pipeline()
         for start := 0; start < len(members); start += c.writeChunkSize {
             end := start + c.writeChunkSize
             if end > len(members) {
                 end = len(members)
             }
             pipe.ZAdd(ctx, snapshotKey, members[start:end]...)
         }
-    } else {
-        pipe.Del(ctx, snapshotKey)
-    }
-    pipe.Set(ctx, activeKey, versionStr, 0)
-    pipe.Set(ctx, schedulerBucketKey(schedulerReadyPrefix, bucket), "1", 0)
-    pipe.SAdd(ctx, schedulerBucketSetKey, bucket.String())
-    if _, err := pipe.Exec(ctx); err != nil {
-        return err
+        if _, err := pipe.Exec(ctx); err != nil {
+            return err
+        }
     }
 
-    if oldActive != "" && oldActive != versionStr {
-        _ = c.rdb.Del(ctx, schedulerSnapshotKey(bucket, oldActive)).Err()
+    // Phase 2: activate the version with an atomic CAS.
+    // The Lua script guarantees the active pointer only switches when the new version >= the currently active one,
+    // preventing version rollback under concurrent writes.
+    // The old snapshot gets an EXPIRE grace period instead of an immediate DEL, avoiding races with readers.
+    activeKey := schedulerBucketKey(schedulerActivePrefix, bucket)
+    readyKey := schedulerBucketKey(schedulerReadyPrefix, bucket)
+    snapshotKeyPrefix := fmt.Sprintf("%s%d:%s:%s:v", schedulerSnapshotPrefix, bucket.GroupID, bucket.Platform, bucket.Mode)
+
+    keys := []string{activeKey, readyKey, schedulerBucketSetKey, snapshotKey}
+    args := []any{versionStr, bucket.String(), snapshotKeyPrefix, snapshotGraceTTLSeconds}
+
+    _, err = activateSnapshotScript.Run(ctx, c.rdb, keys, args...).Result()
+    if err != nil {
+        return err
     }
 
     return nil
@@ -232,6 +282,11 @@ func (c *schedulerCache) TryLockBucket(ctx context.Context, bucket service.Sched
     return c.rdb.SetNX(ctx, key, time.Now().UnixNano(), ttl).Result()
 }
 
+func (c *schedulerCache) UnlockBucket(ctx context.Context, bucket service.SchedulerBucket) error {
+    key := schedulerBucketKey(schedulerLockPrefix, bucket)
+    return c.rdb.Del(ctx, key).Err()
+}
+
 func (c *schedulerCache) ListBuckets(ctx context.Context) ([]service.SchedulerBucket, error) {
     raw, err := c.rdb.SMembers(ctx, schedulerBucketSetKey).Result()
     if err != nil {

backend/internal/service/scheduler_cache.go

Lines changed: 2 additions & 0 deletions
@@ -59,6 +59,8 @@ type SchedulerCache interface {
     UpdateLastUsed(ctx context.Context, updates map[int64]time.Time) error
     // TryLockBucket attempts to acquire the per-bucket rebuild lock.
     TryLockBucket(ctx context.Context, bucket SchedulerBucket, ttl time.Duration) (bool, error)
+    // UnlockBucket releases the per-bucket rebuild lock.
+    UnlockBucket(ctx context.Context, bucket SchedulerBucket) error
     // ListBuckets returns the set of registered buckets.
     ListBuckets(ctx context.Context) ([]SchedulerBucket, error)
     // GetOutboxWatermark reads the outbox watermark.

backend/internal/service/scheduler_snapshot_hydration_test.go

Lines changed: 4 additions & 0 deletions
@@ -44,6 +44,10 @@ func (c *snapshotHydrationCache) TryLockBucket(ctx context.Context, bucket Sched
     return true, nil
 }
 
+func (c *snapshotHydrationCache) UnlockBucket(ctx context.Context, bucket SchedulerBucket) error {
+    return nil
+}
+
 func (c *snapshotHydrationCache) ListBuckets(ctx context.Context) ([]SchedulerBucket, error) {
     return nil, nil
 }

backend/internal/service/scheduler_snapshot_service.go

Lines changed: 3 additions & 0 deletions
@@ -544,6 +544,9 @@ func (s *SchedulerSnapshotService) rebuildBucket(ctx context.Context, bucket Sch
     if !ok {
         return nil
     }
+    defer func() {
+        _ = s.cache.UnlockBucket(ctx, bucket)
+    }()
 
     rebuildCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
     defer cancel()

Lines changed: 6 additions & 81 deletions
@@ -1,93 +1,18 @@
 /**
- * Usage request scheduler — throttles Anthropic API calls by proxy exit.
+ * Usage request scheduler.
  *
- * Anthropic OAuth/setup-token accounts sharing the same proxy exit are placed
- * into a serial queue with a random 1–2s delay between requests, preventing
- * upstream 429 rate-limit errors.
- *
- * Proxy identity = host:port:username — two proxy records pointing to the
- * same exit share a single queue. Accounts without a proxy go into a
- * "direct" queue.
- *
- * All other platforms bypass the queue and execute immediately.
+ * All platforms execute immediately without queuing — the backend uses
+ * passive sampling so upstream 429 rate-limit errors are no longer a concern.
  */
 
 import type { Account } from '@/types'
 
-const GROUP_DELAY_MIN_MS = 1000
-const GROUP_DELAY_MAX_MS = 2000
-
-type Task<T> = {
-  fn: () => Promise<T>
-  resolve: (value: T) => void
-  reject: (reason: unknown) => void
-}
-
-const queues = new Map<string, Task<unknown>[]>()
-const running = new Set<string>()
-
-/** Whether this account needs throttled queuing. */
-function needsThrottle(account: Account): boolean {
-  return (
-    account.platform === 'anthropic' &&
-    (account.type === 'oauth' || account.type === 'setup-token')
-  )
-}
-
-/** Build a queue key from proxy connection details. */
-function buildGroupKey(account: Account): string {
-  const proxy = account.proxy
-  const proxyIdentity = proxy
-    ? `${proxy.host}:${proxy.port}:${proxy.username || ''}`
-    : 'direct'
-  return `anthropic:${proxyIdentity}`
-}
-
-async function drain(groupKey: string) {
-  if (running.has(groupKey)) return
-  running.add(groupKey)
-
-  const queue = queues.get(groupKey)
-  while (queue && queue.length > 0) {
-    const task = queue.shift()!
-    try {
-      const result = await task.fn()
-      task.resolve(result)
-    } catch (err) {
-      task.reject(err)
-    }
-    if (queue.length > 0) {
-      const jitter = GROUP_DELAY_MIN_MS + Math.random() * (GROUP_DELAY_MAX_MS - GROUP_DELAY_MIN_MS)
-      await new Promise((r) => setTimeout(r, jitter))
-    }
-  }
-
-  running.delete(groupKey)
-  queues.delete(groupKey)
-}
-
 /**
- * Schedule a usage fetch. Anthropic accounts are queued by proxy exit;
- * all other platforms execute immediately.
+ * Schedule a usage fetch. All requests execute immediately.
  */
 export function enqueueUsageRequest<T>(
-  account: Account,
+  _account: Account,
   fn: () => Promise<T>
 ): Promise<T> {
-  // Non-Anthropic → fire immediately, no queuing
-  if (!needsThrottle(account)) {
-    return fn()
-  }
-
-  const key = buildGroupKey(account)
-
-  return new Promise<T>((resolve, reject) => {
-    let queue = queues.get(key)
-    if (!queue) {
-      queue = []
-      queues.set(key, queue)
-    }
-    queue.push({ fn, resolve, reject } as Task<unknown>)
-    drain(key)
-  })
+  return fn()
 }
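
With the serial queue removed, callers can fan usage requests out concurrently. The following is a minimal TypeScript sketch of that call pattern; the import path, loadAllUsage, and fetchAccountUsage are hypothetical placeholders, and only enqueueUsageRequest and the Account type come from this commit.

import type { Account } from '@/types'
// Hypothetical module path for the scheduler file shown above.
import { enqueueUsageRequest } from '@/utils/usageRequestScheduler'

// Hypothetical stand-in for the app's real usage API call.
async function fetchAccountUsage(account: Account): Promise<unknown> {
  return account
}

// Every request starts immediately; previously, Anthropic accounts behind the
// same proxy exit were serialized with a random 1-2s delay between requests.
export function loadAllUsage(accounts: Account[]): Promise<unknown[]> {
  return Promise.all(
    accounts.map((account) => enqueueUsageRequest(account, () => fetchAccountUsage(account)))
  )
}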
