Skip to content

Commit 5119b40

Browse files
Use batch logic to push logs
Signed-off-by: Mohamed Abokammer <mahmednabil109@gmail.com>
1 parent 2ba91cd commit 5119b40

5 files changed

Lines changed: 119 additions & 42 deletions

File tree

cmds/admin_server/server/server.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -107,24 +107,28 @@ func (r *RouteHandler) status(c *gin.Context) {
107107
c.JSON(http.StatusOK, gin.H{"status": "live"})
108108
}
109109

110-
// addLog inserts a new log entry inside the database
111-
func (r *RouteHandler) addLog(c *gin.Context) {
112-
113-
var log Log
114-
if err := c.Bind(&log); err != nil {
115-
c.JSON(http.StatusBadRequest, gin.H{"status": "err", "msg": "badly formatted log"})
110+
// addLogs inserts log's batches into the database
111+
func (r *RouteHandler) addLogs(c *gin.Context) {
112+
var logs []Log
113+
if err := c.Bind(&logs); err != nil {
114+
c.JSON(http.StatusBadRequest, gin.H{"status": "err", "msg": "badly formatted logs"})
116115
r.log.Errorf("Err while binding request body %v", err)
117116
return
118117
}
119118

119+
storageLogs := make([]storage.Log, 0, len(logs))
120+
for _, log := range logs {
121+
storageLogs = append(storageLogs, log.ToStorageLog())
122+
}
123+
120124
ctx, cancel := xcontext.WithTimeout(xcontext.Background(), DefaultDBAccessTimeout)
121125
defer cancel()
122126
ctx = ctx.WithLogger(r.log)
123-
err := r.storage.StoreLog(ctx, log.ToStorageLog())
127+
err := r.storage.StoreLogs(ctx, storageLogs)
124128
if err != nil {
125129
switch {
126130
case errors.Is(err, storage.ErrInsert):
127-
c.JSON(http.StatusInternalServerError, gin.H{"status": "err", "msg": "error while storing the log"})
131+
c.JSON(http.StatusInternalServerError, gin.H{"status": "err", "msg": "error while storing the batch"})
128132
case errors.Is(err, storage.ErrReadOnlyStorage):
129133
c.JSON(http.StatusNotImplemented, gin.H{"status": "err", "msg": "not supported action"})
130134
default:
@@ -168,7 +172,7 @@ func initRouter(ctx xcontext.Context, rh RouteHandler, middlewares []gin.Handler
168172
}
169173

170174
r.GET("/status", rh.status)
171-
r.POST("/log", rh.addLog)
175+
r.POST("/log", rh.addLogs)
172176
r.GET("/log", rh.getLogs)
173177

174178
// serve the frontend app

cmds/admin_server/storage/mongo/mongo.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,14 @@ func toMongoQuery(query storage.Query) bson.D {
106106
return q
107107
}
108108

109-
func (s *MongoStorage) StoreLog(ctx xcontext.Context, log storage.Log) error {
110-
mongoLog := toMongoLog(&log)
111-
_, err := s.collection.InsertOne(ctx, mongoLog)
109+
func (s *MongoStorage) StoreLogs(ctx xcontext.Context, logs []storage.Log) error {
110+
var mongoLogs []interface{}
111+
for _, log := range logs {
112+
mongoLogs = append(mongoLogs, toMongoLog(&log))
113+
}
114+
_, err := s.collection.InsertMany(ctx, mongoLogs)
112115
if err != nil {
113-
// for better debugging
114-
ctx.Errorf("Error while inserting into the db: %v", err)
116+
ctx.Errorf("Error while inserting a batch of logs: %v", err)
115117
return storage.ErrInsert
116118
}
117119
return nil

cmds/admin_server/storage/storage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ var (
1919
)
2020

2121
type Storage interface {
22-
StoreLog(ctx xcontext.Context, entry Log) error
22+
StoreLogs(ctx xcontext.Context, logs []Log) error
2323
GetLogs(ctx xcontext.Context, query Query) (*Result, error)
2424

2525
Close(ctx xcontext.Context) error

pkg/loggerhook/httphook.go

Lines changed: 77 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,59 @@ import (
1818

1919
var (
2020
DefaultBufferSize = 10
21+
MaxBatchSize = 500000 // size in bytes
22+
MaxBatchCount = 100
23+
BatchSendFreq = 1 * time.Second
2124
DefaultLogTimeout = 1 * time.Second
2225
)
2326

27+
// Batch defines a log batch that handles the size in bytes of the logs
28+
type Batch struct {
29+
addr string
30+
logs []server.Log
31+
size uint64
32+
}
33+
34+
func NewBatch(addr string) Batch {
35+
return Batch{
36+
addr: addr,
37+
}
38+
}
39+
40+
func (b *Batch) Add(log server.Log) {
41+
b.logs = append(b.logs, log)
42+
b.size += uint64(len(log.LogData))
43+
}
44+
45+
func (b *Batch) Size() uint64 {
46+
return b.size
47+
}
48+
49+
func (b *Batch) Count() int {
50+
return len(b.logs)
51+
}
52+
53+
// PostAndReset makes a post request sending hh.batch and reseting the batch
54+
func (b *Batch) PostAndReset() error {
55+
logJson, err := json.Marshal(b.logs)
56+
if err != nil {
57+
return fmt.Errorf("Marshal Err: %v", err)
58+
}
59+
requestBody := bytes.NewBuffer(logJson)
60+
_, err = http.Post(b.addr, "application/json", requestBody)
61+
if err != nil {
62+
return fmt.Errorf("Http Logger Err: %v", err)
63+
}
64+
65+
b.logs = nil
66+
b.size = 0
67+
return nil
68+
}
69+
2470
type HttpHook struct {
25-
Addr string
71+
batch Batch
72+
batchTicker *time.Ticker
73+
2674
logChan chan server.Log
2775
closeChan chan struct{}
2876
}
@@ -36,9 +84,10 @@ func NewHttpHook(addr string) (*HttpHook, error) {
3684
url.Path = path.Join(url.Path, "log")
3785

3886
hh := HttpHook{
39-
Addr: url.String(),
40-
logChan: make(chan server.Log, DefaultBufferSize),
41-
closeChan: make(chan struct{}),
87+
batch: NewBatch(url.String()),
88+
batchTicker: time.NewTicker(BatchSendFreq),
89+
logChan: make(chan server.Log, DefaultBufferSize),
90+
closeChan: make(chan struct{}),
4291
}
4392

4493
go hh.logHandler()
@@ -90,16 +139,32 @@ func (hh *HttpHook) logHandler() {
90139
for {
91140
select {
92141
case log := <-hh.logChan:
93-
logJson, err := json.Marshal(log)
94-
if err != nil {
95-
fmt.Fprintf(os.Stderr, "Marshal Err: %v", err)
142+
hh.batch.Add(log)
143+
if hh.batch.Count() > MaxBatchCount || hh.batch.Size() > uint64(MaxBatchSize) {
144+
err := hh.batch.PostAndReset()
145+
if err != nil {
146+
fmt.Fprintf(os.Stderr, "Send Batch failed: %v", err)
147+
break
148+
}
149+
// if the batch is sent
150+
// to avoid ticking on an empty batch
151+
hh.batchTicker.Reset(BatchSendFreq)
96152
}
97-
requestBody := bytes.NewBuffer(logJson)
98-
_, err = http.Post(hh.Addr, "application/json", requestBody)
99-
if err != nil {
100-
fmt.Fprintf(os.Stderr, "Http Logger Err: %v", err)
153+
case <-hh.batchTicker.C:
154+
if hh.batch.Size() > 0 {
155+
err := hh.batch.PostAndReset()
156+
if err != nil {
157+
fmt.Fprintf(os.Stderr, "Send Batch failed: %v", err)
158+
}
101159
}
102160
case <-hh.closeChan:
161+
// if there are logs in the buffered batch, send them
162+
if hh.batch.Count() > 0 {
163+
err := hh.batch.PostAndReset()
164+
if err != nil {
165+
fmt.Fprintf(os.Stderr, "Send Batch failed: %v", err)
166+
}
167+
}
103168
fmt.Fprintf(os.Stderr, "Closing http logger")
104169
return
105170
}
@@ -109,6 +174,7 @@ func (hh *HttpHook) logHandler() {
109174
// Close ends the logHandler goroutine
110175
func (hh *HttpHook) Close() {
111176
hh.closeChan <- struct{}{}
177+
hh.batchTicker.Stop()
112178
// to mark further Close as no-op
113179
hh.closeChan = nil
114180
}

tests/integ/admin_server/logendpoint_test.go

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,12 @@ func setupCleanDB(uri string) (*mongo.Client, error) {
4646
return client, err
4747
}
4848

49-
func submitLog(addr string, log server.Log) error {
50-
logJson, err := json.Marshal(log)
49+
func submitLogs(addr string, logs []server.Log) error {
50+
batchJson, err := json.Marshal(logs)
5151
if err != nil {
5252
fmt.Fprintf(os.Stderr, "Marshal Err: %v", err)
5353
}
54-
requestBody := bytes.NewBuffer(logJson)
54+
requestBody := bytes.NewBuffer(batchJson)
5555
_, err = http.Post(addr, "application/json", requestBody)
5656

5757
return err
@@ -77,34 +77,39 @@ func getAllLogs(t *testing.T, db *mongo.Client) []mongoStorage.Log {
7777
return dbLogs
7878
}
7979

80-
func TestLogPush(t *testing.T) {
80+
func TestBatchPush(t *testing.T) {
8181
var (
8282
logData = "test log push"
8383
loglevel = "info"
84-
logDate = time.Now()
84+
date = time.Now()
85+
logsNum = 100
86+
logs []server.Log
8587
)
86-
log := server.Log{
87-
LogData: logData,
88-
LogLevel: loglevel,
89-
Date: logDate,
88+
89+
for i := 0; i < logsNum; i++ {
90+
logs = append(logs, server.Log{
91+
LogData: fmt.Sprintf("%s %d", logData, i),
92+
LogLevel: loglevel,
93+
Date: date,
94+
})
9095
}
9196

9297
db, err := setupCleanDB(*flagMongoEndpoint)
9398
if err != nil {
9499
t.Fatal(err)
95100
}
96-
ctx, cancel := context.WithTimeout(context.Background(), *flagOperationTimeout)
97-
defer cancel()
98-
defer db.Disconnect(ctx)
99101

100-
err = submitLog(*flagAdminEndpoint, log)
102+
err = submitLogs(*flagAdminEndpoint, logs)
101103
if err != nil {
102104
t.Fatal(err)
103105
}
104106

107+
ctx, cancel := context.WithTimeout(context.Background(), *flagOperationTimeout)
108+
defer cancel()
109+
defer db.Disconnect(ctx)
110+
105111
dbLogs := getAllLogs(t, db)
106112

107-
require.Equal(t, 1, len(dbLogs))
108-
require.Equal(t, logData, dbLogs[0].LogData)
109-
require.Equal(t, loglevel, dbLogs[0].LogLevel)
113+
require.Equal(t, len(logs), len(dbLogs))
114+
assertEqualResults(t, dbLogs, logs)
110115
}

0 commit comments

Comments
 (0)