|
1 | | --- We are declaring the return type to be queue_classic_jobs. |
2 | | --- This is ok since I am assuming that all of the users added queues will |
3 | | --- have identical columns to queue_classic_jobs. |
4 | | --- When QC supports queues with columns other than the default, we will have to change this. |
5 | | - |
6 | | -CREATE OR REPLACE FUNCTION lock_head(q_name varchar, top_boundary integer) |
7 | | -RETURNS SETOF queue_classic_jobs AS $$ |
8 | | -DECLARE |
9 | | - unlocked bigint; |
10 | | - relative_top integer; |
11 | | - job_count integer; |
12 | | -BEGIN |
13 | | - -- The purpose is to release contention for the first spot in the table. |
14 | | - -- The select count(*) is going to slow down dequeue performance but allow |
15 | | - -- for more workers. Would love to see some optimization here... |
16 | | - |
17 | | - EXECUTE 'SELECT count(*) FROM ' |
18 | | - || '(SELECT * FROM queue_classic_jobs ' |
19 | | - || ' WHERE locked_at IS NULL' |
20 | | - || ' AND q_name = ' |
21 | | - || quote_literal(q_name) |
22 | | - || ' AND scheduled_at <= ' |
23 | | - || quote_literal(now()) |
24 | | - || ' LIMIT ' |
25 | | - || quote_literal(top_boundary) |
26 | | - || ') limited' |
27 | | - INTO job_count; |
28 | | - |
29 | | - SELECT TRUNC(random() * (top_boundary - 1)) |
30 | | - INTO relative_top; |
31 | | - |
32 | | - IF job_count < top_boundary THEN |
33 | | - relative_top = 0; |
34 | | - END IF; |
35 | | - |
36 | | - LOOP |
37 | | - BEGIN |
38 | | - EXECUTE 'SELECT id FROM queue_classic_jobs ' |
39 | | - || ' WHERE locked_at IS NULL' |
40 | | - || ' AND q_name = ' |
41 | | - || quote_literal(q_name) |
42 | | - || ' AND scheduled_at <= ' |
43 | | - || quote_literal(now()) |
44 | | - || ' ORDER BY id ASC' |
45 | | - || ' LIMIT 1' |
46 | | - || ' OFFSET ' || quote_literal(relative_top) |
47 | | - || ' FOR UPDATE NOWAIT' |
48 | | - INTO unlocked; |
49 | | - EXIT; |
50 | | - EXCEPTION |
51 | | - WHEN lock_not_available THEN |
52 | | - -- do nothing. loop again and hope we get a lock |
53 | | - END; |
54 | | - END LOOP; |
55 | | - |
56 | | - RETURN QUERY EXECUTE 'UPDATE queue_classic_jobs ' |
57 | | - || ' SET locked_at = (CURRENT_TIMESTAMP),' |
58 | | - || ' locked_by = (select pg_backend_pid())' |
59 | | - || ' WHERE id = $1' |
60 | | - || ' AND locked_at is NULL' |
61 | | - || ' RETURNING *' |
62 | | - USING unlocked; |
63 | | - |
64 | | - RETURN; |
65 | | -END $$ LANGUAGE plpgsql; |
66 | | - |
67 | | -CREATE OR REPLACE FUNCTION lock_head(tname varchar) RETURNS SETOF queue_classic_jobs AS $$ BEGIN |
68 | | - RETURN QUERY EXECUTE 'SELECT * FROM lock_head($1,10)' USING tname; |
69 | | -END $$ LANGUAGE plpgsql; |
70 | | - |
71 | 1 | -- queue_classic_notify function and trigger |
72 | 2 | CREATE FUNCTION queue_classic_notify() RETURNS TRIGGER AS $$ BEGIN |
73 | 3 | perform pg_notify(new.q_name, ''); RETURN NULL; |
|
0 commit comments