Skip to content

Commit b0cce58

Browse files
Hirobxcodec
andauthored
fix: multi master testing (#14)
* refactor: support multi-master connection * chore: add comments * chore: fix typo * chore: fix lint * refactor: increase module version * chore: add TODO * chore: fix data-struct * chore: fix linter * chore: remove todo * chore: fix struct's name * 🎨 just refactor * 🚸 Signed-off-by: Hiro <goferHiro@gmail.com> * 🚸 mostly refactoring Signed-off-by: Hiro <goferHiro@gmail.com> * ✅ mw tests + mocking Signed-off-by: Hiro <goferHiro@gmail.com> * 🐛 mandating 1 primary db connection Signed-off-by: Hiro <goferHiro@gmail.com> * ➖ sql loggers Signed-off-by: Hiro <goferHiro@gmail.com> * 🔀 main v2 Signed-off-by: Hiro <goferHiro@gmail.com> * 🚑 replicas count overlooked Signed-off-by: Hiro <goferHiro@gmail.com> * ✅ prepare Signed-off-by: Hiro <goferHiro@gmail.com> * 🚑 bugs being fixed Signed-off-by: Hiro <goferHiro@gmail.com> * ✅ ping Signed-off-by: Hiro <goferHiro@gmail.com> * ✅ ping context Signed-off-by: Hiro <goferHiro@gmail.com> * 🚧 prepare test Signed-off-by: Hiro <goferHiro@gmail.com> * 🚑 Prepare Context, Prepare Signed-off-by: Hiro <goferHiro@gmail.com> * 🚚 more tests, refactoring, Signed-off-by: Hiro <goferHiro@gmail.com> * ✅ table tests Signed-off-by: Hiro <goferHiro@gmail.com> * ✅ table tests Signed-off-by: Hiro <goferHiro@gmail.com> * 🎨 tests Signed-off-by: Hiro <goferHiro@gmail.com> * 🎨 tests Signed-off-by: Hiro <goferHiro@gmail.com> * 🎨 tests Signed-off-by: Hiro <goferHiro@gmail.com> * 🐛 tests Signed-off-by: Hiro <goferHiro@gmail.com> * 🐛 tests Signed-off-by: Hiro <goferHiro@gmail.com> * ⬆️ deps Signed-off-by: Hiro <goferHiro@gmail.com> * chore: fix some linter issue * chore: fix linter issue * chore: fix race condition test * chore: fix linter * chore: remove extra line * chore: fix the predict function * chore: fix lint * chore: remove unused log * chore: add CODEOWNERS Signed-off-by: Hiro <goferHiro@gmail.com> Co-authored-by: Iman Tumorang <iman.tumorang@gmail.com>
1 parent 94a1937 commit b0cce58

12 files changed

Lines changed: 377 additions & 219 deletions

.golangci.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,11 @@ issues:
132132
- path: _test\.go
133133
linters:
134134
- gomnd
135+
- goconst
136+
- funlen
137+
- gocyclo
138+
- path: db_test.go
139+
text: "deferInLoop: Possible resource leak, 'defer' is called in the 'for' loop"
135140
- path: loadbalancer.go
136141
text: "G404: Use of weak random number generator" #expected, just for randomLB policy
137142
run:

CODEOWNERS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
* @bxcodec @goferHiro

db.go

Lines changed: 56 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"errors"
88
"strings"
99
"time"
10+
11+
"go.uber.org/multierr"
1012
)
1113

1214
// DB interface is a contract that supported by this library.
@@ -58,40 +60,6 @@ type sqlDB struct {
5860
stmtLoadBalancer StmtLoadBalancer
5961
}
6062

61-
// Open concurrently opens each underlying db connection
62-
// dataSourceNames must be a semi-comma separated list of DSNs with the first
63-
// one being used as the RW-database(primary) and the rest as RO databases (replicas).
64-
func Open(driverName, dataSourceNames string) (res DB, err error) {
65-
conns := strings.Split(dataSourceNames, ";")
66-
if len(conns) == 0 {
67-
return nil, errors.New("invalid data source name")
68-
}
69-
opt := defaultOption()
70-
db := &sqlDB{
71-
replicas: make([]*sql.DB, len(conns)-1),
72-
primaries: make([]*sql.DB, 1),
73-
loadBalancer: opt.DBLB,
74-
stmtLoadBalancer: opt.StmtLB,
75-
}
76-
77-
db.totalConnection = len(conns)
78-
err = doParallely(db.totalConnection, func(i int) (err error) {
79-
if i == 0 {
80-
db.primaries[0], err = sql.Open(driverName, conns[i])
81-
return err
82-
}
83-
var roDB *sql.DB
84-
roDB, err = sql.Open(driverName, conns[i])
85-
if err != nil {
86-
return
87-
}
88-
db.replicas[i-1] = roDB
89-
return err
90-
})
91-
92-
return db, err
93-
}
94-
9563
// OpenMultiPrimary concurrently opens each underlying db connection
9664
// both primaryDataSourceNames and readOnlyDataSourceNames must be a semi-comma separated list of DSNs
9765
// primaryDataSourceNames will be used as the RW-database(primary)
@@ -138,14 +106,13 @@ func (db *sqlDB) ReplicaDBs() []*sql.DB {
138106

139107
// Close closes all physical databases concurrently, releasing any open resources.
140108
func (db *sqlDB) Close() error {
141-
return doParallely(db.totalConnection, func(i int) (err error) {
142-
if i < len(db.primaries) {
143-
return db.primaries[i].Close()
144-
}
145-
146-
roIndex := i - len(db.primaries)
147-
return db.replicas[roIndex].Close()
109+
errPrimaries := doParallely(len(db.primaries), func(i int) error {
110+
return db.primaries[i].Close()
111+
})
112+
errReplicas := doParallely(len(db.replicas), func(i int) error {
113+
return db.replicas[i].Close()
148114
})
115+
return multierr.Combine(errPrimaries, errReplicas)
149116
}
150117

151118
// Driver returns the physical database's underlying driver.
@@ -184,87 +151,89 @@ func (db *sqlDB) ExecContext(ctx context.Context, query string, args ...interfac
184151
// Ping verifies if a connection to each physical database is still alive,
185152
// establishing a connection if necessary.
186153
func (db *sqlDB) Ping() error {
187-
return doParallely(db.totalConnection, func(i int) error {
188-
if i < len(db.primaries) {
189-
return db.primaries[i].Ping()
190-
}
191-
192-
roIndex := i - len(db.primaries)
193-
return db.replicas[roIndex].Ping()
154+
errPrimaries := doParallely(len(db.primaries), func(i int) error {
155+
return db.primaries[i].Ping()
156+
})
157+
errReplicas := doParallely(len(db.replicas), func(i int) error {
158+
return db.replicas[i].Ping()
194159
})
160+
return multierr.Combine(errPrimaries, errReplicas)
195161
}
196162

197163
// PingContext verifies if a connection to each physical database is still
198164
// alive, establishing a connection if necessary.
199165
func (db *sqlDB) PingContext(ctx context.Context) error {
200-
return doParallely(db.totalConnection, func(i int) error {
201-
if i < len(db.primaries) {
202-
return db.primaries[i].PingContext(ctx)
203-
}
204-
roIndex := i - len(db.primaries)
205-
return db.replicas[roIndex].PingContext(ctx)
166+
errPrimaries := doParallely(len(db.primaries), func(i int) error {
167+
return db.primaries[i].PingContext(ctx)
168+
})
169+
errReplicas := doParallely(len(db.replicas), func(i int) error {
170+
return db.replicas[i].PingContext(ctx)
206171
})
172+
return multierr.Combine(errPrimaries, errReplicas)
207173
}
208174

209175
// Prepare creates a prepared statement for later queries or executions
210176
// on each physical database, concurrently.
211-
func (db *sqlDB) Prepare(query string) (Stmt, error) {
212-
stmt := &stmt{
213-
db: db,
214-
loadBalancer: db.stmtLoadBalancer,
215-
}
177+
func (db *sqlDB) Prepare(query string) (_stmt Stmt, err error) {
216178
roStmts := make([]*sql.Stmt, len(db.replicas))
217179
primaryStmts := make([]*sql.Stmt, len(db.primaries))
218-
err := doParallely(db.totalConnection, func(i int) (err error) {
219-
if i < len(db.primaries) {
220-
primaryStmts[i], err = db.primaries[i].Prepare(query)
221-
return err
222-
}
223180

224-
roIndex := i - len(db.primaries)
225-
roStmts[roIndex], err = db.replicas[roIndex].Prepare(query)
181+
errPrimaries := doParallely(len(db.primaries), func(i int) (err error) {
182+
primaryStmts[i], err = db.primaries[i].Prepare(query)
183+
return
184+
})
185+
errReplicas := doParallely(len(db.replicas), func(i int) (err error) {
186+
roStmts[i], err = db.replicas[i].Prepare(query)
226187
return err
227188
})
228189

190+
err = multierr.Combine(errPrimaries, errReplicas)
191+
229192
if err != nil {
230-
return nil, err
193+
return
231194
}
232195

233-
stmt.replicaStmts = roStmts
234-
stmt.primaryStmts = primaryStmts
235-
return stmt, nil
196+
_stmt = &stmt{
197+
db: db,
198+
loadBalancer: db.stmtLoadBalancer,
199+
primaryStmts: primaryStmts,
200+
replicaStmts: roStmts,
201+
}
202+
203+
return
236204
}
237205

238206
// PrepareContext creates a prepared statement for later queries or executions
239207
// on each physical database, concurrently.
240208
//
241209
// The provided context is used for the preparation of the statement, not for
242210
// the execution of the statement.
243-
func (db *sqlDB) PrepareContext(ctx context.Context, query string) (Stmt, error) {
244-
stmt := &stmt{
245-
db: db,
246-
loadBalancer: db.stmtLoadBalancer,
247-
}
211+
func (db *sqlDB) PrepareContext(ctx context.Context, query string) (_stmt Stmt, err error) {
248212
roStmts := make([]*sql.Stmt, len(db.replicas))
249213
primaryStmts := make([]*sql.Stmt, len(db.primaries))
250-
err := doParallely(db.totalConnection, func(i int) (err error) {
251-
if i < len(db.primaries) {
252-
primaryStmts[i], err = db.primaries[i].PrepareContext(ctx, query)
253-
return err
254-
}
255214

256-
roIndex := i - len(db.primaries)
257-
roStmts[roIndex], err = db.replicas[roIndex].PrepareContext(ctx, query)
215+
errPrimaries := doParallely(len(db.primaries), func(i int) (err error) {
216+
primaryStmts[i], err = db.primaries[i].PrepareContext(ctx, query)
217+
return
218+
})
219+
errReplicas := doParallely(len(db.replicas), func(i int) (err error) {
220+
roStmts[i], err = db.replicas[i].PrepareContext(ctx, query)
258221
return err
259222
})
260223

224+
err = multierr.Combine(errPrimaries, errReplicas)
225+
261226
if err != nil {
262-
return nil, err
227+
return
263228
}
264229

265-
stmt.replicaStmts = roStmts
266-
stmt.primaryStmts = primaryStmts
267-
return stmt, nil
230+
_stmt = &stmt{
231+
db: db,
232+
loadBalancer: db.stmtLoadBalancer,
233+
primaryStmts: primaryStmts,
234+
replicaStmts: roStmts,
235+
}
236+
return
268237
}
269238

270239
// Query executes a query that returns rows, typically a SELECT.
@@ -354,7 +323,7 @@ func (db *sqlDB) SetConnMaxIdleTime(d time.Duration) {
354323

355324
// ReadOnly returns the readonly database
356325
func (db *sqlDB) ReadOnly() *sql.DB {
357-
if db.totalConnection == len(db.primaries) {
326+
if len(db.replicas) == 0 {
358327
return db.loadBalancer.Resolve(db.primaries)
359328
}
360329
return db.loadBalancer.Resolve(db.replicas)

0 commit comments

Comments
 (0)