Skip to content

Commit f4f6e9a

Browse files
fix: show realtime active account users
1 parent 50dae3e commit f4f6e9a

22 files changed

Lines changed: 331 additions & 82 deletions

backend/cmd/server/wire_gen.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/internal/handler/gateway_handler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
125125
h.errorResponse(c, http.StatusInternalServerError, "api_error", "User context not found")
126126
return
127127
}
128+
c.Request = c.Request.WithContext(service.WithSub2APIUserID(c.Request.Context(), subject.UserID))
128129
reqLog := requestLogger(
129130
c,
130131
"handler.gateway.messages",
@@ -322,7 +323,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
322323
}
323324

324325
for {
325-
selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionKey, reqModel, fs.FailedAccountIDs, "", int64(0)) // Gemini 不使用会话限制
326+
selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionKey, reqModel, fs.FailedAccountIDs, "", subject.UserID) // Gemini 不使用会话限制
326327
if err != nil {
327328
if len(fs.FailedAccountIDs) == 0 {
328329
reqLog.Warn("gateway.select_account_no_available",

backend/internal/handler/gateway_handler_chat_completions.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func (h *GatewayHandler) ChatCompletions(c *gin.Context) {
3636
h.chatCompletionsErrorResponse(c, http.StatusInternalServerError, "api_error", "User context not found")
3737
return
3838
}
39+
c.Request = c.Request.WithContext(service.WithSub2APIUserID(c.Request.Context(), subject.UserID))
3940
reqLog := requestLogger(
4041
c,
4142
"handler.gateway.chat_completions",
@@ -166,7 +167,7 @@ func (h *GatewayHandler) ChatCompletions(c *gin.Context) {
166167
fs := NewFailoverState(h.maxAccountSwitches, false)
167168

168169
for {
169-
selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionHash, reqModel, fs.FailedAccountIDs, "", int64(0))
170+
selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionHash, reqModel, fs.FailedAccountIDs, "", subject.UserID)
170171
if err != nil {
171172
if len(fs.FailedAccountIDs) == 0 {
172173
h.chatCompletionsErrorResponse(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error())

backend/internal/handler/gateway_handler_responses.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func (h *GatewayHandler) Responses(c *gin.Context) {
3636
h.responsesErrorResponse(c, http.StatusInternalServerError, "api_error", "User context not found")
3737
return
3838
}
39+
c.Request = c.Request.WithContext(service.WithSub2APIUserID(c.Request.Context(), subject.UserID))
3940
reqLog := requestLogger(
4041
c,
4142
"handler.gateway.responses",
@@ -171,7 +172,7 @@ func (h *GatewayHandler) Responses(c *gin.Context) {
171172
fs := NewFailoverState(h.maxAccountSwitches, false)
172173

173174
for {
174-
selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionHash, reqModel, fs.FailedAccountIDs, "", int64(0))
175+
selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionHash, reqModel, fs.FailedAccountIDs, "", subject.UserID)
175176
if err != nil {
176177
if len(fs.FailedAccountIDs) == 0 {
177178
h.responsesErrorResponse(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error())

backend/internal/handler/gateway_handler_warmup_intercept_unit_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,9 @@ func (f *fakeConcurrencyCache) GetAccountConcurrencyBatch(_ context.Context, acc
138138
}
139139
func (f *fakeConcurrencyCache) CleanupExpiredAccountSlots(context.Context, int64) error { return nil }
140140
func (f *fakeConcurrencyCache) CleanupStaleProcessSlots(context.Context, string) error { return nil }
141+
func (f *fakeConcurrencyCache) GetAccountActiveUserConcurrency(context.Context, int64) (map[int64]int, error) {
142+
return nil, nil
143+
}
141144

142145
func newTestGatewayHandler(t *testing.T, group *service.Group, accounts []*service.Account) (*GatewayHandler, func()) {
143146
t.Helper()

backend/internal/handler/gateway_helper_fastpath_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ func (m *concurrencyCacheMock) CleanupStaleProcessSlots(ctx context.Context, act
9393
return nil
9494
}
9595

96+
func (m *concurrencyCacheMock) GetAccountActiveUserConcurrency(ctx context.Context, accountID int64) (map[int64]int, error) {
97+
return nil, nil
98+
}
99+
96100
func TestConcurrencyHelper_TryAcquireUserSlot(t *testing.T) {
97101
cache := &concurrencyCacheMock{
98102
acquireUserSlotFn: func(ctx context.Context, userID int64, maxConcurrency int, requestID string) (bool, error) {

backend/internal/handler/gateway_helper_hotpath_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,10 @@ func (s *helperConcurrencyCacheStub) CleanupStaleProcessSlots(ctx context.Contex
124124
return nil
125125
}
126126

127+
func (s *helperConcurrencyCacheStub) GetAccountActiveUserConcurrency(ctx context.Context, accountID int64) (map[int64]int, error) {
128+
return nil, nil
129+
}
130+
127131
func newHelperTestContext(method, path string) (*gin.Context, *httptest.ResponseRecorder) {
128132
gin.SetMode(gin.TestMode)
129133
rec := httptest.NewRecorder()

backend/internal/handler/gemini_v1beta_handler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
143143
googleError(c, http.StatusInternalServerError, "User context not found")
144144
return
145145
}
146+
c.Request = c.Request.WithContext(service.WithSub2APIUserID(c.Request.Context(), authSubject.UserID))
146147
reqLog := requestLogger(
147148
c,
148149
"handler.gemini_v1beta.models",
@@ -369,7 +370,7 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
369370
}
370371

371372
for {
372-
selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionKey, modelName, fs.FailedAccountIDs, "", int64(0)) // Gemini 不使用会话限制
373+
selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionKey, modelName, fs.FailedAccountIDs, "", authSubject.UserID) // Gemini 不使用会话限制
373374
if err != nil {
374375
if len(fs.FailedAccountIDs) == 0 {
375376
googleError(c, http.StatusServiceUnavailable, "No available Gemini accounts: "+err.Error())

backend/internal/handler/openai_gateway_handler.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
113113
if !h.ensureResponsesDependencies(c, reqLog) {
114114
return
115115
}
116+
c.Request = c.Request.WithContext(service.WithSub2APIUserID(c.Request.Context(), subject.UserID))
116117

117118
// Read request body
118119
body, err := pkghttputil.ReadRequestBodyWithPrealloc(c.Request)
@@ -572,6 +573,7 @@ func (h *OpenAIGatewayHandler) Messages(c *gin.Context) {
572573
if !h.ensureResponsesDependencies(c, reqLog) {
573574
return
574575
}
576+
c.Request = c.Request.WithContext(service.WithSub2APIUserID(c.Request.Context(), subject.UserID))
575577

576578
body, err := pkghttputil.ReadRequestBodyWithPrealloc(c.Request)
577579
if err != nil {
@@ -1097,6 +1099,7 @@ func (h *OpenAIGatewayHandler) ResponsesWebSocket(c *gin.Context) {
10971099
if !h.ensureResponsesDependencies(c, reqLog) {
10981100
return
10991101
}
1102+
c.Request = c.Request.WithContext(service.WithSub2APIUserID(c.Request.Context(), subject.UserID))
11001103
reqLog.Info("openai.websocket_ingress_started")
11011104
clientIP := ip.GetClientIP(c)
11021105
userAgent := strings.TrimSpace(c.GetHeader("User-Agent"))

backend/internal/pkg/ctxkey/ctxkey.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,7 @@ const (
5555

5656
// ClaudeCodeVersion stores the extracted Claude Code version from User-Agent (e.g. "2.1.22")
5757
ClaudeCodeVersion Key = "ctx_claude_code_version"
58+
59+
// Sub2APIUserID 当前请求的系统用户 ID,用于账号并发槽位中编码用户信息以支持实时活跃用户统计
60+
Sub2APIUserID Key = "ctx_sub2api_user_id"
5861
)

0 commit comments

Comments
 (0)