Skip to content

Commit ece2bd0

Browse files
committed
riverlog: use sjson for log append and pruning
The raw-message iteration still rebuilt log arrays through json marshaling, which left append performance significantly worse than the baseline implementation used on master. Switch riverlog history to gjson/sjson so operations stay on raw JSON arrays. The middleware reads only `river:log` with `gjson`, appends with `sjson.SetRawBytes` using `-1`, and enforces `MaxTotalBytes` by deleting oldest entries with `sjson.DeleteBytes("0")`. Comparison for `BenchmarkMiddlewareWorkAppend` (`-benchtime=100ms`): case: 256 KB impl ns/op B/op allocs/op baseline 976052 645856 291 rawmessage 1605106 698233 293 sjson 614433 1167265 25 case: 2 MB impl ns/op B/op allocs/op baseline 7762435 5920954 2096 rawmessage 12516198 6651551 2098 sjson 4485274 6482082 32 The pruning test changed only to avoid a brittle exact-byte boundary. `MaxTotalBytes` is now set slightly above the two-entry payload so the test consistently verifies the intended behavior: appending a third entry drops the oldest and keeps the newest two.
1 parent ba3a95e commit ece2bd0

3 files changed

Lines changed: 32 additions & 20 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1010
### Changed
1111

1212
- `riverlog.Middleware` now supports `MiddlewareConfig.MaxTotalBytes` (default 8 MB) to cap total persisted `river:log` history per job. When the cap is exceeded, oldest log entries are dropped first while retaining the newest entry. [PR #1157](https://github.com/riverqueue/river/pull/1157).
13+
- Improved `riverlog` performance and reduced memory amplification when appending to large persisted `river:log` histories. [PR #1157](https://github.com/riverqueue/river/pull/1157).
1314

1415
## [0.31.0] - 2026-02-21
1516

riverlog/river_log.go

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,13 @@ import (
88
"context"
99
"encoding/json"
1010
"errors"
11+
"fmt"
1112
"io"
1213
"log/slog"
1314

15+
"github.com/tidwall/gjson"
16+
"github.com/tidwall/sjson"
17+
1418
"github.com/riverqueue/river/internal/jobexecutor"
1519
"github.com/riverqueue/river/rivershared/baseservice"
1620
"github.com/riverqueue/river/rivertype"
@@ -162,15 +166,8 @@ type metadataWithLog struct {
162166
RiverLog []logAttempt `json:"river:log"`
163167
}
164168

165-
type metadataWithRawLog struct {
166-
RiverLog []json.RawMessage `json:"river:log"`
167-
}
168-
169169
func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error) error {
170-
var (
171-
existingRawLogData metadataWithRawLog
172-
logBuf bytes.Buffer
173-
)
170+
var logBuf bytes.Buffer
174171

175172
switch {
176173
case m.newCustomContext != nil:
@@ -182,10 +179,6 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu
182179
return errors.New("expected either newContextLogger or newSlogHandler to be set")
183180
}
184181

185-
if err := json.Unmarshal(job.Metadata, &existingRawLogData); err != nil {
186-
return err
187-
}
188-
189182
metadataUpdates, hasMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx)
190183
if !hasMetadataUpdates {
191184
return errors.New("expected to find metadata updates in context, but didn't")
@@ -221,7 +214,7 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu
221214
return
222215
}
223216

224-
allLogDataBytes, numDroppedEntries, err := marshalRawLogDataWithCap(append(existingRawLogData.RiverLog, json.RawMessage(newLogEntryBytes)), m.config.MaxTotalBytes)
217+
allLogDataBytes, numDroppedEntries, err := appendLogDataWithCap(job.Metadata, newLogEntryBytes, m.config.MaxTotalBytes)
225218
if err != nil {
226219
m.Logger.ErrorContext(ctx, m.Name+": Error marshaling log data",
227220
slog.Any("error", err),
@@ -242,8 +235,24 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu
242235
return doInner(ctx)
243236
}
244237

245-
func marshalRawLogDataWithCap(allLogData []json.RawMessage, maxTotalBytes int) ([]byte, int, error) {
246-
allLogDataBytes, err := json.Marshal(allLogData)
238+
func appendLogDataWithCap(metadataBytes, newLogEntryBytes []byte, maxTotalBytes int) ([]byte, int, error) {
239+
if !json.Valid(metadataBytes) {
240+
return nil, 0, errors.New("metadata is not valid JSON")
241+
}
242+
243+
existingLogData := gjson.GetBytes(metadataBytes, metadataKey)
244+
var allLogDataBytes []byte
245+
switch {
246+
case !existingLogData.Exists():
247+
allLogDataBytes = []byte("[]")
248+
case existingLogData.IsArray():
249+
allLogDataBytes = []byte(existingLogData.Raw)
250+
default:
251+
return nil, 0, fmt.Errorf("%q value is not an array", metadataKey)
252+
}
253+
254+
var err error
255+
allLogDataBytes, err = sjson.SetRawBytes(allLogDataBytes, "-1", newLogEntryBytes)
247256
if err != nil {
248257
return nil, 0, err
249258
}
@@ -253,14 +262,15 @@ func marshalRawLogDataWithCap(allLogData []json.RawMessage, maxTotalBytes int) (
253262
}
254263

255264
// Drop oldest entries first, while always retaining the latest one.
265+
numEntries := len(gjson.ParseBytes(allLogDataBytes).Array())
256266
var numDroppedEntries int
257-
for numDroppedEntries < len(allLogData)-1 && len(allLogDataBytes) > maxTotalBytes {
258-
numDroppedEntries++
259-
260-
allLogDataBytes, err = json.Marshal(allLogData[numDroppedEntries:])
267+
for numEntries > 1 && len(allLogDataBytes) > maxTotalBytes {
268+
allLogDataBytes, err = sjson.DeleteBytes(allLogDataBytes, "0")
261269
if err != nil {
262270
return nil, numDroppedEntries, err
263271
}
272+
numEntries--
273+
numDroppedEntries++
264274
}
265275

266276
return allLogDataBytes, numDroppedEntries, nil

riverlog/river_log_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,8 @@ func TestMiddleware(t *testing.T) {
280280
require.NoError(t, err)
281281

282282
testWorker, bundle := setup(t, &MiddlewareConfig{
283-
MaxTotalBytes: len(maxTotalBytes),
283+
// Keep two entries, force pruning once a third is appended.
284+
MaxTotalBytes: len(maxTotalBytes) + 16,
284285
})
285286

286287
workRes, err := testWorker.Work(ctx, t, bundle.tx, loggingArgs{Message: "Logged from worker"}, nil)

0 commit comments

Comments
 (0)