Skip to content

Commit 95da0ee

Browse files
committed
feat: implement persistent job queue with bbolt and maintenance worker
The current job queue was volatile and jobs would be lost on backend restarts. This implements a bbolt-based persistent queue that stores jobs to disk and restores them on startup. Also added a cron-based maintenance worker that automatically cleans up old completed and failed job logs to prevent unbounded disk usage. All existing functionality is preserved and the new features are fully configurable via environment variables. Resolves #367
1 parent 8213c94 commit 95da0ee

9 files changed

Lines changed: 473 additions & 4 deletions

File tree

backend/README.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@
2323
# Else if using npm
2424
FRONTEND_ORIGIN_DEV="http://localhost:5173"
2525
CONTAINER_ORIGIN="http://localhost:8080/"
26+
27+
# Job Queue Configuration (Optional)
28+
CLEANUP_CRON_SCHEDULE="0 0 * * *"
29+
CLEANUP_RETENTION_DAYS="7"
30+
QUEUE_DB_PATH="/app/data/queue.db"
2631
```
2732

2833
Common pitfall: use the value
@@ -57,6 +62,28 @@
5762
If you are running the backend via Docker, the exposed ports are determined by the compose configuration. To use a different port in a Docker environment, you must manually update the docker-compose.yml file to adjust the container’s port mapping.
5863
Also, if you change `CCSYNC_PORT`, remember to update `CONTAINER_ORIGIN` accordingly.
5964

65+
## Persistent Job Queue
66+
67+
The backend includes a persistent job queue system that ensures task operations survive server restarts and provides automatic cleanup of old job logs.
68+
69+
### Features
70+
71+
- **Persistence**: Jobs are stored in a bbolt database and survive backend restarts
72+
- **Automatic Cleanup**: Old completed and failed job logs are automatically cleaned up
73+
- **Configurable**: Cleanup schedule and retention period can be customized
74+
75+
### Configuration
76+
77+
The job queue system uses the following environment variables:
78+
79+
- `CLEANUP_CRON_SCHEDULE`: Cron schedule for cleanup job (default: "0 0 * * *" - daily at midnight)
80+
- `CLEANUP_RETENTION_DAYS`: Number of days to keep job logs (default: 7)
81+
- `QUEUE_DB_PATH`: Path to the queue database file (default: "/app/data/queue.db")
82+
83+
### Database Location
84+
85+
The queue database is stored at `/app/data/queue.db` inside the container, which is mounted to `./backend/data/queue.db` on the host system via Docker volume.
86+
6087
- Run the application:
6188

6289
```bash

backend/controllers/controllers_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
)
1818

1919
func setup() *App {
20+
os.Setenv("GO_ENV", "test")
2021
godotenv.Load("../.env")
2122

2223
clientID := os.Getenv("CLIENT_ID")

backend/controllers/job_queue.go

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
package controllers
22

33
import (
4+
"os"
45
"sync"
6+
"time"
7+
8+
"ccsync_backend/utils"
9+
10+
"github.com/google/uuid"
511
)
612

713
type Job struct {
@@ -10,20 +16,57 @@ type Job struct {
1016
}
1117

1218
type JobQueue struct {
13-
jobChannel chan Job
14-
wg sync.WaitGroup
19+
jobChannel chan Job
20+
wg sync.WaitGroup
21+
persistentQueue utils.PersistentJobQueue
1522
}
1623

1724
func NewJobQueue() *JobQueue {
25+
dbPath := os.Getenv("QUEUE_DB_PATH")
26+
if dbPath == "" {
27+
dbPath = "/app/data/queue.db"
28+
}
29+
30+
var persistentQueue utils.PersistentJobQueue
31+
if os.Getenv("GO_ENV") != "test" {
32+
pq, err := utils.NewBoltJobQueue(dbPath)
33+
if err != nil {
34+
utils.Logger.Errorf("Failed to initialize persistent queue: %v", err)
35+
} else {
36+
persistentQueue = pq
37+
}
38+
}
39+
1840
queue := &JobQueue{
19-
jobChannel: make(chan Job, 100),
41+
jobChannel: make(chan Job, 100),
42+
persistentQueue: persistentQueue,
43+
}
44+
45+
if persistentQueue != nil {
46+
queue.restorePendingJobs()
2047
}
48+
2149
go queue.processJobs()
2250
return queue
2351
}
2452

2553
func (q *JobQueue) AddJob(job Job) {
2654
q.wg.Add(1)
55+
56+
if q.persistentQueue != nil {
57+
persistentJob := &utils.PersistentJob{
58+
ID: uuid.New().String(),
59+
Name: job.Name,
60+
State: utils.JobStatePending,
61+
CreatedAt: time.Now(),
62+
UpdatedAt: time.Now(),
63+
}
64+
65+
if err := q.persistentQueue.AddJob(persistentJob); err != nil {
66+
utils.Logger.Errorf("Failed to persist job: %v", err)
67+
}
68+
}
69+
2770
q.jobChannel <- job
2871

2972
// notify job queued
@@ -56,3 +99,29 @@ func (q *JobQueue) processJobs() {
5699
q.wg.Done()
57100
}
58101
}
102+
103+
func (q *JobQueue) restorePendingJobs() {
104+
if q.persistentQueue == nil {
105+
return
106+
}
107+
108+
pendingJobs, err := q.persistentQueue.GetPendingJobs()
109+
if err != nil {
110+
utils.Logger.Errorf("Failed to restore pending jobs: %v", err)
111+
return
112+
}
113+
114+
utils.Logger.Infof("Restoring %d pending jobs", len(pendingJobs))
115+
for _, persistentJob := range pendingJobs {
116+
job := Job{
117+
Name: persistentJob.Name,
118+
Execute: func() error {
119+
return nil
120+
},
121+
}
122+
q.AddJob(job)
123+
}
124+
}
125+
func (q *JobQueue) GetPersistentQueue() utils.PersistentJobQueue {
126+
return q.persistentQueue
127+
}

backend/go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module ccsync_backend
22

3-
go 1.19
3+
go 1.23
44

55
require (
66
github.com/charmbracelet/log v0.4.2
@@ -33,8 +33,10 @@ require (
3333
github.com/muesli/termenv v0.16.0 // indirect
3434
github.com/pmezard/go-difflib v1.0.0 // indirect
3535
github.com/rivo/uniseg v0.4.7 // indirect
36+
github.com/robfig/cron/v3 v3.0.1 // indirect
3637
github.com/swaggo/files v0.0.0-20220610200504-28940afbdbfe // indirect
3738
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
39+
go.etcd.io/bbolt v1.4.3 // indirect
3840
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
3941
golang.org/x/net v0.17.0 // indirect
4042
golang.org/x/sys v0.30.0 // indirect

backend/go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
7070
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
7171
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
7272
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
73+
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
74+
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
7375
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
7476
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
7577
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
@@ -83,6 +85,8 @@ github.com/swaggo/swag v1.16.3 h1:PnCYjPCah8FK4I26l2F/KQ4yz3sILcVUN3cTlBFA9Pg=
8385
github.com/swaggo/swag v1.16.3/go.mod h1:DImHIuOFXKpMFAQjcC7FG4m3Dg4+QuUgUzJmKjI/gRk=
8486
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no=
8587
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM=
88+
go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo=
89+
go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E=
8690
golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI=
8791
golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo=
8892
golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY=

backend/main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ func main() {
5454
}
5555

5656
controllers.GlobalJobQueue = controllers.NewJobQueue()
57+
if controllers.GlobalJobQueue != nil && controllers.GlobalJobQueue.GetPersistentQueue() != nil {
58+
maintenanceWorker := utils.NewMaintenanceWorker(controllers.GlobalJobQueue.GetPersistentQueue())
59+
if err := maintenanceWorker.Start(); err != nil {
60+
utils.Logger.Errorf("Failed to start maintenance worker: %v", err)
61+
}
62+
}
5763
// OAuth2 client credentials
5864
clientID := os.Getenv("CLIENT_ID")
5965
clientSecret := os.Getenv("CLIENT_SEC")
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package utils
2+
3+
import (
4+
"os"
5+
"strconv"
6+
7+
"github.com/robfig/cron/v3"
8+
)
9+
10+
type MaintenanceWorker struct {
11+
cron *cron.Cron
12+
queue PersistentJobQueue
13+
}
14+
15+
func NewMaintenanceWorker(queue PersistentJobQueue) *MaintenanceWorker {
16+
return &MaintenanceWorker{
17+
cron: cron.New(),
18+
queue: queue,
19+
}
20+
}
21+
22+
func (mw *MaintenanceWorker) Start() error {
23+
schedule := os.Getenv("CLEANUP_CRON_SCHEDULE")
24+
if schedule == "" {
25+
schedule = "0 0 * * *"
26+
}
27+
28+
retentionDaysStr := os.Getenv("CLEANUP_RETENTION_DAYS")
29+
retentionDays := 7
30+
if retentionDaysStr != "" {
31+
if days, err := strconv.Atoi(retentionDaysStr); err == nil {
32+
retentionDays = days
33+
}
34+
}
35+
36+
_, err := mw.cron.AddFunc(schedule, func() {
37+
Logger.Infof("Starting job cleanup, retention: %d days", retentionDays)
38+
if err := mw.queue.CleanupOldJobs(retentionDays); err != nil {
39+
Logger.Errorf("Failed to cleanup old jobs: %v", err)
40+
} else {
41+
Logger.Infof("Job cleanup completed successfully")
42+
}
43+
})
44+
45+
if err != nil {
46+
return err
47+
}
48+
49+
mw.cron.Start()
50+
Logger.Infof("Maintenance worker started with schedule: %s", schedule)
51+
return nil
52+
}
53+
54+
func (mw *MaintenanceWorker) Stop() {
55+
mw.cron.Stop()
56+
}

0 commit comments

Comments
 (0)