@@ -16,17 +16,40 @@ import { blobToUint8Array } from "@App/pkg/utils/datatype";
1616import { readBlobContent } from "@App/pkg/utils/encoding" ;
1717import { Semaphore , withTimeoutNotify } from "@App/pkg/utils/concurrency-control" ;
1818
19- /** 同时发起的最大 fetch 数量,避免大量请求冲击同一服务器 */
20- const MAX_CONCURRENT_FETCHES = 5 ;
21- /** fetch 前的随机延迟范围(ms),分散请求时间 */
22- const FETCH_DELAY_MIN_MS = 100 ;
23- const FETCH_DELAY_MAX_MS = 150 ;
24- /** fetch 超时后释放信号量的时间(ms),不会中止 fetch 本身 */
25- const FETCH_SEMAPHORE_TIMEOUT_MS = 800 ;
19+ /**
20+ * 滑动窗口并发上限:同时"已启动、尚未归还槽位"的 fetch 数量。
21+ * 超过此数量的请求会排队等待槽位释放后再启动,
22+ * 避免瞬间大量请求冲击同一 server(被误判为 DDoS)。
23+ */
24+ const MAX_ACTIVE_FETCHES = 5 ;
25+
26+ /** fetch 启动前的随机抖动范围(ms),分散对同一 server 的请求时间 */
27+ const FETCH_JITTER_MIN_MS = 100 ;
28+ const FETCH_JITTER_MAX_MS = 150 ;
29+
30+ /**
31+ * 滑动窗口超时(ms):
32+ * fetch 启动后若超过此时间仍无响应,提前归还并发槽位,
33+ * 允许队列中的下一个请求启动——但原 fetch 继续运行,
34+ * 响应回来后仍会被正常处理。
35+ *
36+ * 这是"槽位滑动"而非"取消请求":
37+ * - 慢响应不会阻塞后续请求的启动(需求 3)
38+ * - 慢响应最终到达时仍会被处理(需求 4)
39+ * - 同时活跃的 fetch 数受 MAX_ACTIVE_FETCHES 控制(需求 1 & 2)
40+ */
41+ const FETCH_SLOT_SLIDE_TIMEOUT_MS = 800 ;
42+
2643/** 资源缓存过期时间(ms),24小时 */
27- const RESOURCE_CACHE_TTL_MS = 86400_000 ;
44+ const RESOURCE_CACHE_TTL_MS = 86_400_000 ;
2845
29- const fetchSemaphore = new Semaphore ( MAX_CONCURRENT_FETCHES ) ;
46+ /**
47+ * 滑动窗口并发控制器(Sliding Window Semaphore)。
48+ * 持有槽位 = "已启动 fetch 且尚未超时或完成"。
49+ * 超时后槽位提前归还,让下一个 fetch 可以启动,
50+ * 而超时的 fetch 本身继续跑直到响应或网络错误。
51+ */
52+ const concurrentFetchSlots = new Semaphore ( MAX_ACTIVE_FETCHES ) ;
3053
3154export class ResourceService {
3255 logger : Logger ;
@@ -274,26 +297,30 @@ export class ResourceService {
274297 async createResourceByUrlFetch ( u : TUrlSRIInfo , type : ResourceType ) : Promise < Resource > {
275298 const url = u . url ; // 无 URI Integrity Hash
276299
277- let released = false ;
278- await fetchSemaphore . acquire ( ) ;
279- // Semaphore 锁 - 同期只有五个 fetch 一起执行
280- const delay = randNum ( FETCH_DELAY_MIN_MS , FETCH_DELAY_MAX_MS ) ;
281- await sleep ( delay ) ;
282- // 执行 fetch, 若超时则不中止 fetch 但释放信号量,让下一个任务启动
283- const { result, err } = await withTimeoutNotify (
284- fetch ( url ) ,
285- FETCH_SEMAPHORE_TIMEOUT_MS ,
286- ( { done, timeouted, err } ) => {
287- if ( timeouted || done || err ) {
288- // fetch 成功 或 发生错误 或 timeout 时解锁
289- if ( ! released ) {
290- released = true ;
291- fetchSemaphore . release ( ) ;
292- }
293- }
300+ // 等待并发槽位(滑动窗口入口)
301+ await concurrentFetchSlots . acquire ( ) ;
302+
303+ // releaseSlotOnce 保证槽位只归还一次,无论经由 timeout 路径还是正常完成路径
304+ let slotReleased = false ;
305+ const releaseSlotOnce = ( ) => {
306+ if ( ! slotReleased ) {
307+ slotReleased = true ;
308+ concurrentFetchSlots . release ( ) ;
294309 }
295- ) ;
296- // Semaphore 锁已解锁。继续处理 fetch Response 的结果
310+ } ;
311+
312+ // 随机抖动:分散对同一 server 的请求启动时间,降低被限速的概率
313+ await sleep ( randNum ( FETCH_JITTER_MIN_MS , FETCH_JITTER_MAX_MS ) ) ;
314+
315+ // 滑动窗口语义:
316+ // - fetch 超时 (timeouted=true) → 提前归还槽位,下一个请求可以启动
317+ // - fetch 完成/失败 (done=true) → 归还槽位(若 timeout 已归还则为 no-op)
318+ // 原 fetch 在超时后仍继续运行,响应到达时照常处理(不会被取消)
319+ const { result, err } = await withTimeoutNotify ( fetch ( url ) , FETCH_SLOT_SLIDE_TIMEOUT_MS , ( { done, timeouted } ) => {
320+ if ( timeouted || done ) {
321+ releaseSlotOnce ( ) ;
322+ }
323+ } ) ;
297324
298325 if ( err ) {
299326 throw new Error ( `resource fetch failed: ${ err . message || err } ` ) ;
0 commit comments