Skip to content

Commit f4440e0

Browse files
committed
riverlog: cap total persisted log size
Snoozed jobs can run repeatedly without consuming attempts, so river:log entries may grow without bound for long-running jobs. Large metadata payloads then amplify memory use on each future append. Add a new `MiddlewareConfig` field `MaxTotalBytes` to bound the total serialized `river:log` payload. When the new attempt is appended and the result exceeds the cap, the oldest entries are dropped first while always retaining the newest entry. The middleware now defaults `MaxTotalBytes` to 8 MB. New tests verify that oldest entries are pruned when the cap is reached, a tiny cap still retains the newest entry, and the default is applied.
1 parent 23921c2 commit f4440e0

3 files changed

Lines changed: 143 additions & 5 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Changed
11+
12+
- `riverlog.Middleware` now supports `MiddlewareConfig.MaxTotalBytes` (default 8 MB) to cap total persisted `river:log` history per job. When the cap is exceeded, oldest log entries are dropped first while retaining the newest entry. [PR #1157](https://github.com/riverqueue/river/pull/1157).
13+
1014
## [0.31.0] - 2026-02-21
1115

1216
### Added

riverlog/river_log.go

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@ import (
1717
)
1818

1919
const (
20-
maxSizeMB = 2
21-
maxSizeBytes = maxSizeMB * 1024 * 1024
22-
metadataKey = "river:log"
20+
maxSizeMB = 2
21+
maxSizeBytes = maxSizeMB * 1024 * 1024
22+
maxTotalSizeMB = 8
23+
maxTotalBytes = maxTotalSizeMB * 1024 * 1024
24+
metadataKey = "river:log"
2325
)
2426

2527
type contextKey struct{}
@@ -77,6 +79,16 @@ type MiddlewareConfig struct {
7779
//
7880
// Defaults to 2 MB (which is per job attempt).
7981
MaxSizeBytes int
82+
83+
// MaxTotalBytes is the maximum total size of all persisted river logs for a
84+
// job attempt history. If appending the latest attempt would exceed this
85+
// size, oldest log entries are dropped first.
86+
//
87+
// The latest entry is always retained, even if doing so means the resulting
88+
// payload exceeds MaxTotalBytes.
89+
//
90+
// Defaults to 8 MB.
91+
MaxTotalBytes int
8092
}
8193

8294
// NewMiddleware initializes a new Middleware with the given slog handler
@@ -136,6 +148,7 @@ func defaultConfig(config *MiddlewareConfig) *MiddlewareConfig {
136148
}
137149

138150
config.MaxSizeBytes = cmp.Or(config.MaxSizeBytes, maxSizeBytes)
151+
config.MaxTotalBytes = cmp.Or(config.MaxTotalBytes, maxTotalBytes)
139152

140153
return config
141154
}
@@ -193,18 +206,49 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu
193206
logData = logData[0:m.config.MaxSizeBytes]
194207
}
195208

196-
allLogDataBytes, err := json.Marshal(append(existingLogData.RiverLog, logAttempt{
209+
allLogDataBytes, numDroppedEntries, err := marshalLogDataWithCap(append(existingLogData.RiverLog, logAttempt{
197210
Attempt: job.Attempt,
198211
Log: logData,
199-
}))
212+
}), m.config.MaxTotalBytes)
200213
if err != nil {
201214
m.Logger.ErrorContext(ctx, m.Name+": Error marshaling log data",
202215
slog.Any("error", err),
203216
)
204217
}
205218

219+
if numDroppedEntries > 0 {
220+
m.Logger.WarnContext(ctx, m.Name+": Logs size exceeded total maximum; dropping oldest entries",
221+
slog.Int("max_total_size", m.config.MaxTotalBytes),
222+
slog.Int("num_entries_dropped", numDroppedEntries),
223+
)
224+
}
225+
206226
metadataUpdates[metadataKey] = json.RawMessage(allLogDataBytes)
207227
}()
208228

209229
return doInner(ctx)
210230
}
231+
232+
func marshalLogDataWithCap(allLogData []logAttempt, maxTotalBytes int) ([]byte, int, error) {
233+
allLogDataBytes, err := json.Marshal(allLogData)
234+
if err != nil {
235+
return nil, 0, err
236+
}
237+
238+
if maxTotalBytes <= 0 || len(allLogDataBytes) <= maxTotalBytes {
239+
return allLogDataBytes, 0, nil
240+
}
241+
242+
// Drop oldest entries first, while always retaining the latest one.
243+
var numDroppedEntries int
244+
for numDroppedEntries < len(allLogData)-1 && len(allLogDataBytes) > maxTotalBytes {
245+
numDroppedEntries++
246+
247+
allLogDataBytes, err = json.Marshal(allLogData[numDroppedEntries:])
248+
if err != nil {
249+
return nil, numDroppedEntries, err
250+
}
251+
}
252+
253+
return allLogDataBytes, numDroppedEntries, nil
254+
}

riverlog/river_log_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,96 @@ func TestMiddleware(t *testing.T) {
270270
)
271271
})
272272

273+
t.Run("PrunesOldestEntriesAtMaxTotalBytes", func(t *testing.T) {
274+
t.Parallel()
275+
276+
maxTotalBytes, err := json.Marshal([]logAttempt{
277+
{Attempt: 1, Log: `msg="Logged from worker"` + "\n"},
278+
{Attempt: 2, Log: `msg="Logged from worker"` + "\n"},
279+
})
280+
require.NoError(t, err)
281+
282+
testWorker, bundle := setup(t, &MiddlewareConfig{
283+
MaxTotalBytes: len(maxTotalBytes),
284+
})
285+
286+
workRes, err := testWorker.Work(ctx, t, bundle.tx, loggingArgs{Message: "Logged from worker"}, nil)
287+
require.NoError(t, err)
288+
289+
// Set state back to available and unfinalize the job to make it runnable again.
290+
workRes.Job, err = bundle.driver.UnwrapExecutor(bundle.tx).JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{
291+
ID: workRes.Job.ID,
292+
FinalizedAtDoUpdate: true,
293+
FinalizedAt: nil,
294+
StateDoUpdate: true,
295+
State: rivertype.JobStateAvailable,
296+
})
297+
require.NoError(t, err)
298+
299+
workRes, err = testWorker.WorkJob(ctx, t, bundle.tx, workRes.Job)
300+
require.NoError(t, err)
301+
302+
// Set state back to available and unfinalize the job to make it runnable again.
303+
workRes.Job, err = bundle.driver.UnwrapExecutor(bundle.tx).JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{
304+
ID: workRes.Job.ID,
305+
FinalizedAtDoUpdate: true,
306+
FinalizedAt: nil,
307+
StateDoUpdate: true,
308+
State: rivertype.JobStateAvailable,
309+
})
310+
require.NoError(t, err)
311+
312+
workRes, err = testWorker.WorkJob(ctx, t, bundle.tx, workRes.Job)
313+
require.NoError(t, err)
314+
315+
var metadataWithLog metadataWithLog
316+
require.NoError(t, json.Unmarshal(workRes.Job.Metadata, &metadataWithLog))
317+
318+
require.Equal(t, []logAttempt{
319+
{Attempt: 2, Log: `msg="Logged from worker"` + "\n"},
320+
{Attempt: 3, Log: `msg="Logged from worker"` + "\n"},
321+
}, metadataWithLog.RiverLog)
322+
})
323+
324+
t.Run("RetainsLatestEntryWhenTotalLimitTiny", func(t *testing.T) {
325+
t.Parallel()
326+
327+
testWorker, bundle := setup(t, &MiddlewareConfig{
328+
MaxTotalBytes: 1,
329+
})
330+
331+
workRes, err := testWorker.Work(ctx, t, bundle.tx, loggingArgs{Message: "Logged from worker"}, nil)
332+
require.NoError(t, err)
333+
334+
// Set state back to available and unfinalize the job to make it runnable again.
335+
workRes.Job, err = bundle.driver.UnwrapExecutor(bundle.tx).JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{
336+
ID: workRes.Job.ID,
337+
FinalizedAtDoUpdate: true,
338+
FinalizedAt: nil,
339+
StateDoUpdate: true,
340+
State: rivertype.JobStateAvailable,
341+
})
342+
require.NoError(t, err)
343+
344+
workRes, err = testWorker.WorkJob(ctx, t, bundle.tx, workRes.Job)
345+
require.NoError(t, err)
346+
347+
var metadataWithLog metadataWithLog
348+
require.NoError(t, json.Unmarshal(workRes.Job.Metadata, &metadataWithLog))
349+
350+
require.Equal(t, []logAttempt{
351+
{Attempt: 2, Log: `msg="Logged from worker"` + "\n"},
352+
}, metadataWithLog.RiverLog)
353+
})
354+
355+
t.Run("DefaultsMaxTotalBytes", func(t *testing.T) {
356+
t.Parallel()
357+
358+
_, bundle := setup(t, nil)
359+
360+
require.Equal(t, maxTotalBytes, bundle.middleware.config.MaxTotalBytes)
361+
})
362+
273363
t.Run("RawMiddleware", func(t *testing.T) {
274364
t.Parallel()
275365

0 commit comments

Comments
 (0)