Skip to content

Commit 5896050

Browse files
committed
riverlog: build capped log arrays in one pass
Appending to large river:log histories still did extra work in the sjson version: it built an appended array first, then repeatedly deleted index 0 while over cap. In high-volume histories this caused multiple full-array rewrites before reaching the final payload.

Rework append/prune to compute the kept suffix first and then build the final array once. The new flow marshals only the new entry, derives bounds for existing array elements, decides how many oldest entries to drop to fit MaxTotalBytes, and emits a single final JSON array. It also truncates log bytes before converting to string so oversized log buffers are not copied in full.

Benchmark comparison for `BenchmarkMiddlewareWorkAppend` (`-benchtime=100ms`, same machine):

    case: 256 KB
        impl    ns/op    B/op     allocs/op
        before  821178   1167325  25
        after   177355   884454   31

    case: 2 MB
        impl    ns/op    B/op     allocs/op
        before  4854185  6479606  29
        after   1247496  7006818  46

Added focused unit coverage for appendLogDataWithCap, including missing-key behavior, non-array metadata errors, minimal oldest-first pruning, keeping the newest entry when over cap, and no-cap behavior.
1 parent c87f47e commit 5896050

2 files changed

Lines changed: 217 additions & 28 deletions

File tree

riverlog/river_log.go

Lines changed: 110 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"log/slog"
1414

1515
"github.com/tidwall/gjson"
16-
"github.com/tidwall/sjson"
1716

1817
"github.com/riverqueue/river/internal/jobexecutor"
1918
"github.com/riverqueue/river/rivershared/baseservice"
@@ -186,26 +185,26 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu
186185

187186
// This all runs invariant of whether the job panics or returns an error.
188187
defer func() {
189-
logData := logBuf.String()
188+
logBytes := logBuf.Bytes()
190189

191190
// Return early if nothing ended up getting logged.
192-
if len(logData) < 1 {
191+
if len(logBytes) < 1 {
193192
return
194193
}
195194

196195
// Postgres JSONB is limited to 255MB, but it would be a bad idea to get
197196
// anywhere close to that limit here.
198-
if len(logData) > m.config.MaxSizeBytes {
197+
if len(logBytes) > m.config.MaxSizeBytes {
199198
m.Logger.WarnContext(ctx, m.Name+": Logs size exceeded maximum; truncating",
200-
slog.Int("logs_size", len(logData)),
199+
slog.Int("logs_size", len(logBytes)),
201200
slog.Int("max_size", m.config.MaxSizeBytes),
202201
)
203-
logData = logData[0:m.config.MaxSizeBytes]
202+
logBytes = logBytes[0:m.config.MaxSizeBytes]
204203
}
205204

206205
newLogEntryBytes, err := json.Marshal(logAttempt{
207206
Attempt: job.Attempt,
208-
Log: logData,
207+
Log: string(logBytes),
209208
})
210209
if err != nil {
211210
m.Logger.ErrorContext(ctx, m.Name+": Error marshaling log data",
@@ -236,42 +235,125 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu
236235
}
237236

238237
func appendLogDataWithCap(metadataBytes, newLogEntryBytes []byte, maxTotalBytes int) ([]byte, int, error) {
239-
if !json.Valid(metadataBytes) {
240-
return nil, 0, errors.New("metadata is not valid JSON")
241-
}
242-
243238
existingLogData := gjson.GetBytes(metadataBytes, metadataKey)
244-
var allLogDataBytes []byte
239+
var existingLogArrayBytes []byte
245240
switch {
246241
case !existingLogData.Exists():
247-
allLogDataBytes = []byte("[]")
242+
existingLogArrayBytes = []byte("[]")
248243
case existingLogData.IsArray():
249-
allLogDataBytes = []byte(existingLogData.Raw)
244+
// Slice raw JSON straight from metadata bytes to avoid an extra copy.
245+
existingLogArrayBytes = metadataBytes[existingLogData.Index : existingLogData.Index+len(existingLogData.Raw)]
250246
default:
251247
return nil, 0, fmt.Errorf("%q value is not an array", metadataKey)
252248
}
253249

254-
var err error
255-
allLogDataBytes, err = sjson.SetRawBytes(allLogDataBytes, "-1", newLogEntryBytes)
250+
existingElementBounds, err := getArrayElementBounds(existingLogArrayBytes)
256251
if err != nil {
257252
return nil, 0, err
258253
}
259254

260-
if maxTotalBytes <= 0 || len(allLogDataBytes) <= maxTotalBytes {
261-
return allLogDataBytes, 0, nil
255+
// Determine the smallest suffix to keep that still fits with the new entry.
256+
// This keeps pruning oldest-first while avoiding repeated full rewrites.
257+
keepStart := getKeepStart(existingElementBounds, len(newLogEntryBytes), maxTotalBytes)
258+
259+
// Build the final array once from the kept suffix plus the new entry.
260+
appendedLogDataBytes := buildAppendedArray(existingLogArrayBytes, existingElementBounds, keepStart, newLogEntryBytes)
261+
numDroppedEntries := keepStart
262+
if numDroppedEntries > len(existingElementBounds) {
263+
numDroppedEntries = len(existingElementBounds)
262264
}
263265

264-
// Drop oldest entries first, while always retaining the latest one.
265-
numEntries := len(gjson.ParseBytes(allLogDataBytes).Array())
266-
var numDroppedEntries int
267-
for numEntries > 1 && len(allLogDataBytes) > maxTotalBytes {
268-
allLogDataBytes, err = sjson.DeleteBytes(allLogDataBytes, "0")
269-
if err != nil {
270-
return nil, numDroppedEntries, err
266+
return appendedLogDataBytes, numDroppedEntries, nil
267+
}
268+
269+
type arrayElementBounds struct {
270+
Start int
271+
End int
272+
}
273+
274+
func getArrayElementBounds(arrayBytes []byte) ([]arrayElementBounds, error) {
275+
arrResult := gjson.ParseBytes(arrayBytes)
276+
if !arrResult.IsArray() {
277+
return nil, errors.New("expected a JSON array")
278+
}
279+
280+
elements := arrResult.Array()
281+
bounds := make([]arrayElementBounds, len(elements))
282+
for i, elem := range elements {
283+
if elem.Index < 0 {
284+
return nil, errors.New("failed to determine array element index")
285+
}
286+
bounds[i] = arrayElementBounds{
287+
Start: elem.Index,
288+
End: elem.Index + len(elem.Raw),
271289
}
272-
numEntries--
273-
numDroppedEntries++
274290
}
291+
return bounds, nil
292+
}
293+
294+
func getKeepStart(bounds []arrayElementBounds, newEntryLen, maxTotalBytes int) int {
295+
if maxTotalBytes <= 0 {
296+
return 0
297+
}
298+
299+
// Keep newest entry even if it's larger than the configured cap.
300+
newOnlyLen := 2 + newEntryLen // `[` + entry + `]`
301+
if newOnlyLen > maxTotalBytes {
302+
return len(bounds)
303+
}
304+
305+
if len(bounds) == 0 {
306+
return 0
307+
}
308+
309+
// suffixContentLen[i] is the serialized content length for elements i..end
310+
// inside the array, including commas between elements.
311+
suffixContentLen := make([]int, len(bounds))
312+
for i := len(bounds) - 1; i >= 0; i-- {
313+
elemLen := bounds[i].End - bounds[i].Start
314+
if i == len(bounds)-1 {
315+
suffixContentLen[i] = elemLen
316+
} else {
317+
suffixContentLen[i] = elemLen + 1 + suffixContentLen[i+1]
318+
}
319+
}
320+
321+
// Iterate from oldest to newest so we drop the minimum number of entries
322+
// necessary to fit the configured cap.
323+
for keepStart := 0; keepStart <= len(bounds); keepStart++ {
324+
contentLen := newEntryLen
325+
if keepStart < len(bounds) {
326+
contentLen += 1 + suffixContentLen[keepStart] // comma between kept suffix and new entry
327+
}
328+
totalLen := 2 + contentLen // `[` + content + `]`
329+
if totalLen <= maxTotalBytes {
330+
return keepStart
331+
}
332+
}
333+
334+
return len(bounds)
335+
}
336+
337+
func buildAppendedArray(existingArrayBytes []byte, bounds []arrayElementBounds, keepStart int, newEntryBytes []byte) []byte {
338+
var totalLen int
339+
if keepStart >= len(bounds) {
340+
totalLen = 2 + len(newEntryBytes)
341+
} else {
342+
// Kept suffix is contiguous in the original array bytes, so copy once.
343+
keptContentLen := bounds[len(bounds)-1].End - bounds[keepStart].Start
344+
totalLen = 2 + keptContentLen + 1 + len(newEntryBytes) // [] + comma + new entry
345+
}
346+
347+
result := make([]byte, 0, totalLen)
348+
result = append(result, '[')
349+
350+
if keepStart < len(bounds) {
351+
result = append(result, existingArrayBytes[bounds[keepStart].Start:bounds[len(bounds)-1].End]...)
352+
result = append(result, ',')
353+
}
354+
355+
result = append(result, newEntryBytes...)
356+
result = append(result, ']')
275357

276-
return allLogDataBytes, numDroppedEntries, nil
358+
return result
277359
}

riverlog/river_log_test.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"io"
88
"log"
99
"log/slog"
10+
"strings"
1011
"testing"
1112

1213
"github.com/jackc/pgx/v5"
@@ -44,6 +45,112 @@ func TestLogger(t *testing.T) {
4445
})
4546
}
4647

48+
func TestAppendLogDataWithCap(t *testing.T) {
49+
t.Parallel()
50+
51+
marshalLog := func(tb testing.TB, attempt int, log string) []byte {
52+
tb.Helper()
53+
b, err := json.Marshal(logAttempt{Attempt: attempt, Log: log})
54+
require.NoError(tb, err)
55+
return b
56+
}
57+
58+
marshalMetadataWithLogs := func(tb testing.TB, logs []logAttempt) []byte {
59+
tb.Helper()
60+
b, err := json.Marshal(map[string]any{
61+
metadataKey: logs,
62+
})
63+
require.NoError(tb, err)
64+
return b
65+
}
66+
67+
unmarshalLogs := func(tb testing.TB, rawArray []byte) []logAttempt {
68+
tb.Helper()
69+
var logs []logAttempt
70+
require.NoError(tb, json.Unmarshal(rawArray, &logs))
71+
return logs
72+
}
73+
74+
t.Run("MissingKeyStartsFromEmptyArray", func(t *testing.T) {
75+
t.Parallel()
76+
77+
newEntry := marshalLog(t, 1, "new")
78+
result, dropped, err := appendLogDataWithCap([]byte(`{"other":"value"}`), newEntry, maxTotalBytes)
79+
require.NoError(t, err)
80+
require.Zero(t, dropped)
81+
require.Equal(t, []logAttempt{{Attempt: 1, Log: "new"}}, unmarshalLogs(t, result))
82+
})
83+
84+
t.Run("NonArrayLogValueReturnsError", func(t *testing.T) {
85+
t.Parallel()
86+
87+
newEntry := marshalLog(t, 1, "new")
88+
_, _, err := appendLogDataWithCap([]byte(`{"river:log":{"not":"array"}}`), newEntry, maxTotalBytes)
89+
require.EqualError(t, err, `"river:log" value is not an array`)
90+
})
91+
92+
t.Run("PrunesOldestEntriesOnlyAsNeeded", func(t *testing.T) {
93+
t.Parallel()
94+
95+
existing := []logAttempt{
96+
{Attempt: 1, Log: "a"},
97+
{Attempt: 2, Log: "b"},
98+
{Attempt: 3, Log: "c"},
99+
}
100+
newEntry := marshalLog(t, 4, "d")
101+
102+
target, err := json.Marshal([]logAttempt{
103+
{Attempt: 2, Log: "b"},
104+
{Attempt: 3, Log: "c"},
105+
{Attempt: 4, Log: "d"},
106+
})
107+
require.NoError(t, err)
108+
109+
result, dropped, err := appendLogDataWithCap(marshalMetadataWithLogs(t, existing), newEntry, len(target))
110+
require.NoError(t, err)
111+
require.Equal(t, 1, dropped)
112+
require.Equal(t, []logAttempt{
113+
{Attempt: 2, Log: "b"},
114+
{Attempt: 3, Log: "c"},
115+
{Attempt: 4, Log: "d"},
116+
}, unmarshalLogs(t, result))
117+
})
118+
119+
t.Run("KeepsNewestEntryEvenIfOverCap", func(t *testing.T) {
120+
t.Parallel()
121+
122+
existing := []logAttempt{
123+
{Attempt: 1, Log: "a"},
124+
{Attempt: 2, Log: "b"},
125+
}
126+
newEntry := marshalLog(t, 3, strings.Repeat("x", 64))
127+
128+
result, dropped, err := appendLogDataWithCap(marshalMetadataWithLogs(t, existing), newEntry, 8)
129+
require.NoError(t, err)
130+
require.Equal(t, len(existing), dropped)
131+
require.Equal(t, []logAttempt{
132+
{Attempt: 3, Log: strings.Repeat("x", 64)},
133+
}, unmarshalLogs(t, result))
134+
})
135+
136+
t.Run("NoCapKeepsEverything", func(t *testing.T) {
137+
t.Parallel()
138+
139+
existing := []logAttempt{
140+
{Attempt: 1, Log: "a"},
141+
}
142+
newEntry := marshalLog(t, 2, "b")
143+
144+
result, dropped, err := appendLogDataWithCap(marshalMetadataWithLogs(t, existing), newEntry, 0)
145+
require.NoError(t, err)
146+
require.Zero(t, dropped)
147+
require.Equal(t, []logAttempt{
148+
{Attempt: 1, Log: "a"},
149+
{Attempt: 2, Log: "b"},
150+
}, unmarshalLogs(t, result))
151+
})
152+
}
153+
47154
func TestLoggerSafely(t *testing.T) {
48155
t.Parallel()
49156

0 commit comments

Comments
 (0)