44 "context"
55 "database/sql"
66 "database/sql/driver"
7- "errors"
8- "strings"
97 "time"
108
119 "go.uber.org/multierr"
@@ -55,45 +53,10 @@ type StmtLoadBalancer LoadBalancer[*sql.Stmt]
5553type sqlDB struct {
5654 primaries []* sql.DB
5755 replicas []* sql.DB
58- totalConnection int
5956 loadBalancer DBLoadBalancer
6057 stmtLoadBalancer StmtLoadBalancer
6158}
6259
63- // OpenMultiPrimary concurrently opens each underlying db connection
64- // both primaryDataSourceNames and readOnlyDataSourceNames must be a semi-comma separated list of DSNs
65- // primaryDataSourceNames will be used as the RW-database(primary)
66- // and readOnlyDataSourceNames as RO databases (replicas).
67- func OpenMultiPrimary (driverName , primaryDataSourceNames , readOnlyDataSourceNames string ) (res DB , err error ) {
68- primaryConns := strings .Split (primaryDataSourceNames , ";" )
69- readOnlyConns := strings .Split (readOnlyDataSourceNames , ";" )
70-
71- if len (primaryConns ) == 0 {
72- return nil , errors .New ("require primary data source name" )
73- }
74-
75- opt := defaultOption ()
76- db := & sqlDB {
77- replicas : make ([]* sql.DB , len (readOnlyConns )),
78- primaries : make ([]* sql.DB , len (primaryConns )),
79- loadBalancer : opt .DBLB ,
80- stmtLoadBalancer : opt .StmtLB ,
81- }
82-
83- db .totalConnection = len (primaryConns ) + len (readOnlyConns )
84- err = doParallely (db .totalConnection , func (i int ) (err error ) {
85- if i < len (primaryConns ) {
86- db .primaries [0 ], err = sql .Open (driverName , primaryConns [i ])
87- return err
88- }
89- roIndex := i - len (primaryConns )
90- db .replicas [roIndex ], err = sql .Open (driverName , readOnlyConns [roIndex ])
91- return err
92- })
93-
94- return db , err
95- }
96-
9760// PrimaryDBs return all the active primary DB
9861func (db * sqlDB ) PrimaryDBs () []* sql.DB {
9962 return db .primaries
@@ -138,7 +101,7 @@ func (db *sqlDB) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, err
138101// The args are for any placeholder parameters in the query.
139102// Exec uses the RW-database as the underlying db connection
140103func (db * sqlDB ) Exec (query string , args ... interface {}) (sql.Result , error ) {
141- return db .ReadWrite (). Exec ( query , args ... )
104+ return db .ExecContext ( context . Background (), query , args ... )
142105}
143106
144107// ExecContext executes a query without returning any rows.
@@ -151,13 +114,7 @@ func (db *sqlDB) ExecContext(ctx context.Context, query string, args ...interfac
151114// Ping verifies if a connection to each physical database is still alive,
152115// establishing a connection if necessary.
153116func (db * sqlDB ) Ping () error {
154- errPrimaries := doParallely (len (db .primaries ), func (i int ) error {
155- return db .primaries [i ].Ping ()
156- })
157- errReplicas := doParallely (len (db .replicas ), func (i int ) error {
158- return db .replicas [i ].Ping ()
159- })
160- return multierr .Combine (errPrimaries , errReplicas )
117+ return db .PingContext (context .Background ())
161118}
162119
163120// PingContext verifies if a connection to each physical database is still
@@ -175,32 +132,7 @@ func (db *sqlDB) PingContext(ctx context.Context) error {
175132// Prepare creates a prepared statement for later queries or executions
176133// on each physical database, concurrently.
177134func (db * sqlDB ) Prepare (query string ) (_stmt Stmt , err error ) {
178- roStmts := make ([]* sql.Stmt , len (db .replicas ))
179- primaryStmts := make ([]* sql.Stmt , len (db .primaries ))
180-
181- errPrimaries := doParallely (len (db .primaries ), func (i int ) (err error ) {
182- primaryStmts [i ], err = db .primaries [i ].Prepare (query )
183- return
184- })
185- errReplicas := doParallely (len (db .replicas ), func (i int ) (err error ) {
186- roStmts [i ], err = db .replicas [i ].Prepare (query )
187- return err
188- })
189-
190- err = multierr .Combine (errPrimaries , errReplicas )
191-
192- if err != nil {
193- return
194- }
195-
196- _stmt = & stmt {
197- db : db ,
198- loadBalancer : db .stmtLoadBalancer ,
199- primaryStmts : primaryStmts ,
200- replicaStmts : roStmts ,
201- }
202-
203- return
135+ return db .PrepareContext (context .Background (), query )
204136}
205137
206138// PrepareContext creates a prepared statement for later queries or executions
@@ -240,7 +172,7 @@ func (db *sqlDB) PrepareContext(ctx context.Context, query string) (_stmt Stmt,
240172// The args are for any placeholder parameters in the query.
241173// Query uses a radonly db as the physical db.
242174func (db * sqlDB ) Query (query string , args ... interface {}) (* sql.Rows , error ) {
243- return db .ReadOnly (). Query ( query , args ... )
175+ return db .QueryContext ( context . Background (), query , args ... )
244176}
245177
246178// QueryContext executes a query that returns rows, typically a SELECT.
@@ -255,7 +187,7 @@ func (db *sqlDB) QueryContext(ctx context.Context, query string, args ...interfa
255187// Errors are deferred until Row's Scan method is called.
256188// QueryRow uses a radonly db as the physical db.
257189func (db * sqlDB ) QueryRow (query string , args ... interface {}) * sql.Row {
258- return db .ReadOnly (). QueryRow ( query , args ... )
190+ return db .QueryRowContext ( context . Background (), query , args ... )
259191}
260192
261193// QueryRowContext executes a query that is expected to return at most one row.
0 commit comments