Skip to content

Commit ba3a95e

Browse files
committed
riverlog: use raw messages internally
Appending a new river:log entry previously decoded the whole historical array into []logAttempt and then re-encoded it. For large metadata this creates substantial transient allocations for strings and slices. Switch middleware internals to decode river:log as []json.RawMessage. Each run now marshals only the newest entry and re-serializes the raw entry array while preserving the existing total-cap pruning behavior. This removes per-entry struct decoding from the append path.
1 parent f4440e0 commit ba3a95e

1 file changed

Lines changed: 19 additions & 6 deletions

File tree

riverlog/river_log.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,14 @@ type metadataWithLog struct {
162162
RiverLog []logAttempt `json:"river:log"`
163163
}
164164

165+
type metadataWithRawLog struct {
166+
RiverLog []json.RawMessage `json:"river:log"`
167+
}
168+
165169
func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error) error {
166170
var (
167-
existingLogData metadataWithLog
168-
logBuf bytes.Buffer
171+
existingRawLogData metadataWithRawLog
172+
logBuf bytes.Buffer
169173
)
170174

171175
switch {
@@ -178,7 +182,7 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu
178182
return errors.New("expected either newContextLogger or newSlogHandler to be set")
179183
}
180184

181-
if err := json.Unmarshal(job.Metadata, &existingLogData); err != nil {
185+
if err := json.Unmarshal(job.Metadata, &existingRawLogData); err != nil {
182186
return err
183187
}
184188

@@ -206,14 +210,23 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu
206210
logData = logData[0:m.config.MaxSizeBytes]
207211
}
208212

209-
allLogDataBytes, numDroppedEntries, err := marshalLogDataWithCap(append(existingLogData.RiverLog, logAttempt{
213+
newLogEntryBytes, err := json.Marshal(logAttempt{
210214
Attempt: job.Attempt,
211215
Log: logData,
212-
}), m.config.MaxTotalBytes)
216+
})
217+
if err != nil {
218+
m.Logger.ErrorContext(ctx, m.Name+": Error marshaling log data",
219+
slog.Any("error", err),
220+
)
221+
return
222+
}
223+
224+
allLogDataBytes, numDroppedEntries, err := marshalRawLogDataWithCap(append(existingRawLogData.RiverLog, json.RawMessage(newLogEntryBytes)), m.config.MaxTotalBytes)
213225
if err != nil {
214226
m.Logger.ErrorContext(ctx, m.Name+": Error marshaling log data",
215227
slog.Any("error", err),
216228
)
229+
return
217230
}
218231

219232
if numDroppedEntries > 0 {
@@ -229,7 +242,7 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu
229242
return doInner(ctx)
230243
}
231244

232-
func marshalLogDataWithCap(allLogData []logAttempt, maxTotalBytes int) ([]byte, int, error) {
245+
func marshalRawLogDataWithCap(allLogData []json.RawMessage, maxTotalBytes int) ([]byte, int, error) {
233246
allLogDataBytes, err := json.Marshal(allLogData)
234247
if err != nil {
235248
return nil, 0, err

0 commit comments

Comments
 (0)