Skip to content

Commit e99cdbe

Browse files
abrichr and claude authored
feat: add heartbeat + bot-side reaper for worker reliability (#26)
* feat: add heartbeat + bot-side reaper for worker reliability

  Worker writes heartbeat_at every 30s during processing. Bot reaper runs every 60s, detects stale running jobs (heartbeat >90s old), and re-queues them with CAS safety. Includes Telegram notifications and startup crash recovery.

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: add missing .select() to reaper update query to fix TS error

  The Supabase `.update()` without `.select()` returns `null` for data, causing TypeScript to infer it as `never`. Adding `.select('id')` matches the pattern used in the re-queue path above.

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 1db5df3 commit e99cdbe

5 files changed

Lines changed: 225 additions & 3 deletions

File tree

apps/bot/src/index.ts

Lines changed: 137 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@
88

99
import { execFileSync } from 'child_process'
1010
import { Bot, InlineKeyboard, type Context } from 'grammy'
11-
import { JOB_STATUS, type Job, type JobEvent } from '@wright/shared'
11+
import { JOB_STATUS, REAPER_INTERVAL_MS, STALE_HEARTBEAT_MS, STALE_CLAIMED_MS, type Job, type JobEvent } from '@wright/shared'
1212
import {
13+
getSupabase,
1314
insertJob,
1415
getJob,
1516
getJobByPrefix,
@@ -889,6 +890,138 @@ bot.catch((err) => {
889890
console.error('Unhandled bot error:', err)
890891
})
891892

893+
// ---------------------------------------------------------------------------
894+
// Stale job reaper — detects dead workers via heartbeat expiry
895+
// ---------------------------------------------------------------------------
896+
897+
function startReaper(): void {
898+
const sb = getSupabase()
899+
900+
setInterval(async () => {
901+
try {
902+
const cutoff = new Date(Date.now() - STALE_HEARTBEAT_MS).toISOString()
903+
904+
// Find running jobs with stale or missing heartbeats
905+
const { data: staleJobs, error } = await sb
906+
.from('job_queue')
907+
.select('id, attempt, max_attempts, worker_id, telegram_chat_id, task, heartbeat_at, started_at')
908+
.eq('status', 'running')
909+
.or(`heartbeat_at.lt.${cutoff},and(heartbeat_at.is.null,started_at.lt.${cutoff})`)
910+
911+
if (error) {
912+
console.error('[reaper] Query error:', error.message)
913+
return
914+
}
915+
916+
if (!staleJobs || staleJobs.length === 0) return
917+
918+
console.log(`[reaper] Found ${staleJobs.length} stale running job(s)`)
919+
920+
for (const job of staleJobs) {
921+
if (job.attempt < job.max_attempts) {
922+
// Re-queue for retry
923+
const { data: updated } = await sb
924+
.from('job_queue')
925+
.update({
926+
status: 'queued',
927+
worker_id: null,
928+
claimed_at: null,
929+
started_at: null,
930+
heartbeat_at: null,
931+
attempt: job.attempt + 1,
932+
error: `Re-queued by reaper: worker stopped responding (attempt ${job.attempt + 1}/${job.max_attempts})`,
933+
})
934+
.eq('id', job.id)
935+
.eq('status', 'running') // CAS: only if still running
936+
.select('id')
937+
938+
if (updated && updated.length > 0) {
939+
console.log(`[reaper] Re-queued job ${job.id} (attempt ${job.attempt + 1}/${job.max_attempts})`)
940+
941+
if (job.telegram_chat_id) {
942+
try {
943+
await bot.api.sendMessage(
944+
job.telegram_chat_id,
945+
`<b>[${job.id.slice(0, 8)}]</b> Worker stopped responding. `
946+
+ `Re-queuing automatically (attempt ${job.attempt + 1}/${job.max_attempts}).`,
947+
{ parse_mode: 'HTML' },
948+
)
949+
} catch {
950+
// Best effort notification
951+
}
952+
}
953+
954+
wakeWorker()
955+
}
956+
} else {
957+
// Max attempts exceeded — mark as permanently failed
958+
const { data: updated } = await sb
959+
.from('job_queue')
960+
.update({
961+
status: 'failed',
962+
completed_at: new Date().toISOString(),
963+
heartbeat_at: null,
964+
error: `Failed: worker stopped responding after ${job.max_attempts} attempts`,
965+
})
966+
.eq('id', job.id)
967+
.eq('status', 'running') // CAS
968+
.select('id')
969+
970+
if (updated && updated.length > 0) {
971+
console.log(`[reaper] Job ${job.id} permanently failed (max attempts)`)
972+
973+
if (job.telegram_chat_id) {
974+
try {
975+
await bot.api.sendMessage(
976+
job.telegram_chat_id,
977+
`<b>[${job.id.slice(0, 8)}]</b> Worker stopped responding. `
978+
+ `Job has failed permanently after ${job.max_attempts} attempts.`,
979+
{ parse_mode: 'HTML' },
980+
)
981+
} catch {
982+
// Best effort notification
983+
}
984+
}
985+
}
986+
}
987+
}
988+
989+
// Also check for stale claimed jobs (worker died before transitioning to running)
990+
const claimedCutoff = new Date(Date.now() - STALE_CLAIMED_MS).toISOString()
991+
const { data: staleClaimed } = await sb
992+
.from('job_queue')
993+
.select('id, worker_id')
994+
.eq('status', 'claimed')
995+
.lt('claimed_at', claimedCutoff)
996+
997+
if (staleClaimed && staleClaimed.length > 0) {
998+
for (const job of staleClaimed) {
999+
await sb
1000+
.from('job_queue')
1001+
.update({
1002+
status: 'queued',
1003+
worker_id: null,
1004+
claimed_at: null,
1005+
heartbeat_at: null,
1006+
error: `Re-queued by reaper: claimed by ${job.worker_id} but never started`,
1007+
})
1008+
.eq('id', job.id)
1009+
.eq('status', 'claimed') // CAS
1010+
1011+
console.log(`[reaper] Reset stale claimed job ${job.id}`)
1012+
}
1013+
wakeWorker()
1014+
}
1015+
} catch (err) {
1016+
console.error('[reaper] Unexpected error:', err)
1017+
}
1018+
}, REAPER_INTERVAL_MS)
1019+
1020+
console.log(
1021+
`[reaper] Stale job reaper started (interval: ${REAPER_INTERVAL_MS}ms, staleness: ${STALE_HEARTBEAT_MS}ms)`,
1022+
)
1023+
}
1024+
8921025
// ---------------------------------------------------------------------------
8931026
// Startup
8941027
// ---------------------------------------------------------------------------
@@ -901,6 +1034,9 @@ async function main(): Promise<void> {
9011034
// intentional -- we want a loud failure at startup.
9021035
startRealtimeBridge()
9031036

1037+
// Start the stale job reaper — detects dead workers via heartbeat expiry
1038+
startReaper()
1039+
9041040
// Start long polling. This will block until the process is stopped.
9051041
console.log('Bot is now polling for updates.')
9061042
await bot.start({

apps/worker/src/queue-poller.ts

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { createClient, type SupabaseClient } from '@supabase/supabase-js'
22
import type { Job } from '@wright/shared'
3-
import { POLL_INTERVAL_MS, STALE_CLAIMED_MS, STALE_RUNNING_MS } from '@wright/shared'
3+
import { POLL_INTERVAL_MS, STALE_CLAIMED_MS, STALE_RUNNING_MS, HEARTBEAT_INTERVAL_MS } from '@wright/shared'
44
import { runDevLoop } from './dev-loop.js'
55

66
// Worker identity — use Fly machine ID if available, otherwise hostname
@@ -120,6 +120,7 @@ export async function requeueCurrentJob(): Promise<string | null> {
120120
worker_id: null,
121121
claimed_at: null,
122122
started_at: null,
123+
heartbeat_at: null,
123124
attempt: job.attempt + 1,
124125
error: `Re-queued: worker shutdown (SIGTERM), attempt ${job.attempt + 1}/${job.max_attempts}`,
125126
})
@@ -141,6 +142,7 @@ export async function requeueCurrentJob(): Promise<string | null> {
141142
.update({
142143
status: 'failed',
143144
completed_at: new Date().toISOString(),
145+
heartbeat_at: null,
144146
error: `Failed after ${job.max_attempts} attempts (worker restarts)`,
145147
})
146148
.eq('id', job.id)
@@ -237,13 +239,26 @@ async function processJob(job: Job): Promise<void> {
237239
`[queue-poller] Processing job ${job.id} (attempt ${job.attempt})`,
238240
)
239241

242+
// Start heartbeat interval — proves this worker is alive while processing
243+
const heartbeatTimer = setInterval(async () => {
244+
try {
245+
await supabase!
246+
.from('job_queue')
247+
.update({ heartbeat_at: new Date().toISOString() })
248+
.eq('id', job.id)
249+
} catch (err) {
250+
console.error(`[heartbeat] Failed to update heartbeat for ${job.id}:`, err)
251+
}
252+
}, HEARTBEAT_INTERVAL_MS)
253+
240254
try {
241-
// Mark as running
255+
// Mark as running with initial heartbeat
242256
await supabase
243257
.from('job_queue')
244258
.update({
245259
status: 'running',
246260
started_at: new Date().toISOString(),
261+
heartbeat_at: new Date().toISOString(),
247262
})
248263
.eq('id', job.id)
249264

@@ -297,6 +312,7 @@ async function processJob(job: Job): Promise<void> {
297312
})
298313
.eq('id', job.id)
299314
} finally {
315+
clearInterval(heartbeatTimer)
300316
if (onJobEnd) onJobEnd(job.id)
301317
currentJob = null
302318
currentAbortController = null
@@ -329,11 +345,53 @@ async function startupCleanup(): Promise<void> {
329345
status: 'queued',
330346
worker_id: null,
331347
claimed_at: null,
348+
heartbeat_at: null,
332349
})
333350
.eq('id', job.id)
334351
}
335352
}
336353

354+
// 1b. Reset jobs still 'running' for this worker (interrupted by crash)
355+
const { data: staleRunningThisWorker } = await supabase
356+
.from('job_queue')
357+
.select('id, attempt, max_attempts')
358+
.eq('status', 'running')
359+
.eq('worker_id', WORKER_ID)
360+
361+
if (staleRunningThisWorker && staleRunningThisWorker.length > 0) {
362+
console.log(
363+
`[queue-poller] Found ${staleRunningThisWorker.length} running job(s) from this worker (crash recovery)`,
364+
)
365+
for (const job of staleRunningThisWorker) {
366+
if (job.attempt < job.max_attempts) {
367+
await supabase
368+
.from('job_queue')
369+
.update({
370+
status: 'queued',
371+
worker_id: null,
372+
claimed_at: null,
373+
started_at: null,
374+
heartbeat_at: null,
375+
attempt: job.attempt + 1,
376+
error: `Re-queued: worker crash recovery on startup (attempt ${job.attempt + 1}/${job.max_attempts})`,
377+
})
378+
.eq('id', job.id)
379+
console.log(`[queue-poller] Re-queued running job ${job.id} (attempt ${job.attempt + 1}/${job.max_attempts})`)
380+
} else {
381+
await supabase
382+
.from('job_queue')
383+
.update({
384+
status: 'failed',
385+
completed_at: new Date().toISOString(),
386+
heartbeat_at: null,
387+
error: `Failed: worker crashed after ${job.max_attempts} attempts`,
388+
})
389+
.eq('id', job.id)
390+
console.log(`[queue-poller] Job ${job.id} permanently failed (max attempts exceeded)`)
391+
}
392+
}
393+
}
394+
337395
// 2. Reset jobs claimed by ANY worker for too long
338396
const staleClaimedCutoff = new Date(
339397
Date.now() - STALE_CLAIMED_MS,
@@ -355,6 +413,7 @@ async function startupCleanup(): Promise<void> {
355413
status: 'queued',
356414
worker_id: null,
357415
claimed_at: null,
416+
heartbeat_at: null,
358417
error: `Reset: claimed by ${job.worker_id} but never started`,
359418
})
360419
.eq('id', job.id)
@@ -384,6 +443,7 @@ async function startupCleanup(): Promise<void> {
384443
worker_id: null,
385444
claimed_at: null,
386445
started_at: null,
446+
heartbeat_at: null,
387447
attempt: job.attempt + 1,
388448
error: `Re-queued: abandoned running job (attempt ${job.attempt + 1}/${job.max_attempts})`,
389449
})
@@ -394,6 +454,7 @@ async function startupCleanup(): Promise<void> {
394454
.update({
395455
status: 'failed',
396456
completed_at: new Date().toISOString(),
457+
heartbeat_at: null,
397458
error: `Failed: abandoned after ${job.max_attempts} attempts`,
398459
})
399460
.eq('id', job.id)

packages/shared/src/constants.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,25 @@ export const STALE_CLAIMED_MS = 2 * 60 * 1000 // 2 minutes
5050
*/
5151
export const STALE_RUNNING_MS = 30 * 60 * 1000 // 30 minutes
5252

53+
/**
54+
* How often the worker sends a heartbeat while processing a job (ms).
55+
*/
56+
export const HEARTBEAT_INTERVAL_MS = 30_000 // 30 seconds
57+
58+
/**
59+
* How long a running job can go without a heartbeat before being
60+
* considered stale (ms). Must be > HEARTBEAT_INTERVAL_MS.
61+
*
62+
* Set to 3x the heartbeat interval to tolerate transient delays
63+
* (slow DB writes, GC pauses, etc.)
64+
*/
65+
export const STALE_HEARTBEAT_MS = 90_000 // 90 seconds
66+
67+
/**
68+
* How often the bot checks for stale running jobs (ms).
69+
*/
70+
export const REAPER_INTERVAL_MS = 60_000 // 60 seconds
71+
5372
/**
5473
* Supabase table names.
5574
*/

packages/shared/src/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ export interface Job {
6464
claimed_at?: string
6565
started_at?: string
6666
completed_at?: string
67+
heartbeat_at?: string
6768

6869
// Error details on failure
6970
error?: string
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
-- Add heartbeat_at column for worker liveness detection.
-- Workers update this timestamp every 30s while processing a job.
-- The bot-side reaper checks for stale heartbeats every 60s and
-- re-queues jobs whose workers have stopped responding.
-- IF NOT EXISTS makes the migration idempotent (safe to re-run).
ALTER TABLE job_queue ADD COLUMN IF NOT EXISTS heartbeat_at TIMESTAMPTZ;

0 commit comments

Comments
 (0)