|
| 1 | +import { assert } from "https://deno.land/std@0.208.0/assert/mod.ts"; |
| 2 | +import { ThrottleRequest, ThrottleResponse } from "../actors/limiter.ts"; |
1 | 3 | import { RuntimeError, ScriptContext } from "../module.gen.ts"; |
2 | 4 |
|
3 | 5 | export interface Request { |
@@ -28,58 +30,23 @@ export async function run( |
28 | 30 | ctx: ScriptContext, |
29 | 31 | req: Request, |
30 | 32 | ): Promise<Response> { |
31 | | - interface TokenBucket { |
32 | | - tokens: number; |
33 | | - lastRefill: Date; |
34 | | - } |
| 33 | + assert(req.requests > 0); |
| 34 | + assert(req.period > 0); |
| 35 | + |
| 36 | + // Create key |
| 37 | + const key = `${JSON.stringify(req.type)}.${JSON.stringify(req.key)}`; |
35 | 38 |
|
36 | | - // Update the token bucket |
37 | | - // |
38 | | - // `TokenBucket` is an unlogged table which are significantly faster to |
39 | | - // write to than regular tables, but are not durable. This is important |
40 | | - // because this script will be called on every request. |
41 | | - const rows = await ctx.db.$queryRawUnsafe<TokenBucket[]>( |
42 | | - ` |
43 | | - WITH |
44 | | - "UpdateBucket" AS ( |
45 | | - UPDATE "${ctx.dbSchema}"."TokenBuckets" b |
46 | | - SET |
47 | | - "tokens" = CASE |
48 | | - -- Reset the bucket and consume 1 token |
49 | | - WHEN now() > b."lastRefill" + make_interval(secs => $4) THEN $3 - 1 |
50 | | - -- Consume 1 token |
51 | | - ELSE b.tokens - 1 |
52 | | - END, |
53 | | - "lastRefill" = CASE |
54 | | - WHEN now() > b."lastRefill" + make_interval(secs => $4) THEN now() |
55 | | - ELSE b."lastRefill" |
56 | | - END |
57 | | - WHERE b."type" = $1 AND b."key" = $2 |
58 | | - RETURNING b."tokens", b."lastRefill" |
59 | | - ), |
60 | | - inserted AS ( |
61 | | - INSERT INTO "${ctx.dbSchema}"."TokenBuckets" ("type", "key", "tokens", "lastRefill") |
62 | | - SELECT $1, $2, $3 - 1, now() |
63 | | - WHERE NOT EXISTS (SELECT 1 FROM "UpdateBucket") |
64 | | - RETURNING "tokens", "lastRefill" |
65 | | - ) |
66 | | - SELECT * FROM "UpdateBucket" |
67 | | - UNION ALL |
68 | | - SELECT * FROM inserted; |
69 | | - `, |
70 | | - req.type, |
71 | | - req.key, |
72 | | - req.requests, |
73 | | - req.period, |
74 | | - ); |
75 | | - const { tokens, lastRefill } = rows[0]; |
| 39 | + // Throttle request |
| 40 | + const res = await ctx.actors.limiter.getOrCreateAndCall<undefined, ThrottleRequest, ThrottleResponse>(key, undefined, "throttle", { |
| 41 | + requests: req.requests, |
| 42 | + period: req.period, |
| 43 | + }); |
76 | 44 |
|
77 | | - // If the bucket is empty, throw an error |
78 | | - if (tokens < 0) { |
| 45 | + // Check if allowed |
| 46 | + if (!res.success) { |
79 | 47 | throw new RuntimeError("RATE_LIMIT_EXCEEDED", { |
80 | 48 | meta: { |
81 | | - retryAfter: new Date(lastRefill.getTime() + req.period * 1000) |
82 | | - .toUTCString(), |
| 49 | + retryAfter: new Date(res.refillAt).toUTCString(), |
83 | 50 | }, |
84 | 51 | }); |
85 | 52 | } |
|
0 commit comments