Skip to content

Commit 5b51089

Browse files
committed
fix bug
1 parent fad7c6e commit 5b51089

2 files changed

Lines changed: 37 additions & 29 deletions

File tree

_example/main.go

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,29 +26,16 @@ func init() {
2626
func main() {
2727
flag.Parse()
2828

29-
c1 := &redisBuff.Config{
30-
SendBuff: 100,
31-
MsgBatch: 5,
32-
RunnerInterval: 5 * time.Second,
33-
CacheName: "message-test",
34-
LockDuration: 3 * time.Second,
35-
RLockDuration: 3 * time.Second,
36-
ClearMsgFunc: func(msg []string) {
37-
fmt.Println("from client: ", msg)
38-
},
39-
Debug: true,
40-
}
41-
29+
c1 := newConfig("message-test-1")
4230
s1 := redisBuff.New(c1)
4331
closeRunner := s1.SendMsgRunner()
4432

4533
go testAdd(s1)
4634
go loopAdd(s1)
4735
go testReadList(s1)
4836

49-
c2 := *c1
50-
c2.CacheName += "-2"
51-
s2 := redisBuff.New(&c2)
37+
c2 := newConfig("message-test-2")
38+
s2 := redisBuff.New(c2)
5239
closeRunner2 := s2.SendMsgRunner()
5340

5441
go testAdd(s2)
@@ -60,9 +47,24 @@ func main() {
6047
fmt.Println("send closeRunner signal")
6148
closeRunner <- true
6249
closeRunner2 <- true
50+
time.Sleep(50 * time.Millisecond)
6351
}
6452
}
6553

54+
func newConfig(name string) *redisBuff.Config {
55+
return &redisBuff.Config{
56+
SendBuff: 100,
57+
MsgBatch: 5,
58+
RunnerInterval: 5 * time.Second,
59+
CacheName: name,
60+
LockDuration: 3 * time.Second,
61+
RLockDuration: 3 * time.Second,
62+
ClearMsgFunc: func(msg []string) {
63+
fmt.Println("from client: ", msg)
64+
},
65+
// Debug: true,
66+
}
67+
}
6668
func testAdd(b *redisBuff.Buff) {
6769
var wg sync.WaitGroup
6870
for i := 0; i < 30; i++ {
@@ -81,11 +83,6 @@ func testReadList(b *redisBuff.Buff) {
8183
fmt.Println("from testReadList-1: ", b.List())
8284
}()
8385
}
84-
85-
for {
86-
fmt.Println("from testReadList-2: ", b.List())
87-
time.Sleep(500 * time.Millisecond)
88-
}
8986
}
9087

9188
func loopAdd(b *redisBuff.Buff) {

buff.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ type Buff struct {
102102
debug bool
103103
readLockName string
104104
writeLockName string
105+
L sync.RWMutex
106+
close bool
105107
}
106108

107109
// execute runner
@@ -116,7 +118,11 @@ func (b *Buff) SendMsgRunner() chan<- bool {
116118
case msg := <-b.send:
117119
b.pushMsgWithLock(msg)
118120
case <-done:
121+
b.L.Lock()
119122
close(b.send)
123+
b.close = true
124+
b.L.Unlock()
125+
120126
s := make(chan bool)
121127
go func() { // 等chan上的讯息处理完后才能关闭
122128
for len(b.send) > 0 {
@@ -162,7 +168,11 @@ func (b *Buff) getIntervalLock() (ok bool) {
162168
// add msg
163169
// sendBuff满时,将堵塞
164170
func (b *Buff) Add(data interface{}) {
165-
b.send <- data
171+
b.L.RLock()
172+
defer b.L.RUnlock()
173+
if !b.close {
174+
b.send <- data
175+
}
166176
}
167177

168178
// 获取目前缓存上的资料
@@ -184,17 +194,17 @@ func (b *Buff) RLock() func() { // 怪怪的,打印频率有点怪
184194
if rdb.Exists(ctx, b.writeLockName).Val() < 1 {
185195
break
186196
}
187-
b.debugMsg("[%s RLock] - 等待写锁释放\n", b.cacheName)
197+
// b.debugMsg("[%s RLock] - 等待写锁释放\n", b.cacheName)
188198
time.Sleep(100 * time.Millisecond)
189199
}
190200

191201
t := time.Now().Nanosecond()
192202
rdb.Set(ctx, b.readLockName, t, b.rlockDuration)
193-
b.debugMsg("[%s RLock] - 获得读锁\n", b.cacheName)
203+
// b.debugMsg("[%s RLock] - 获得读锁\n", b.cacheName)
194204
return func() {
195205
v, _ := strconv.Atoi(rdb.Get(ctx, b.readLockName).Val())
196206
if v == t {
197-
b.debugMsg("[%s RLock] - 释放读锁\n", b.cacheName)
207+
// b.debugMsg("[%s RLock] - 释放读锁\n", b.cacheName)
198208
rdb.Del(ctx, b.readLockName)
199209
}
200210
}
@@ -206,21 +216,22 @@ func (b *Buff) lock() func() {
206216
if rdb.Exists(ctx, b.readLockName).Val() < 1 {
207217
break
208218
}
209-
b.debugMsg("[%s Lock] - 等待读锁释放\n", b.cacheName)
210-
time.Sleep(100 * time.Second)
219+
// b.debugMsg("[%s Lock] - 等待读锁释放\n", b.cacheName)
220+
time.Sleep(100 * time.Millisecond)
211221
}
212222

213223
for {
214224
ok := rdb.SetNX(ctx, b.writeLockName, 1, b.lockDuration).Val()
215225
if ok {
216-
b.debugMsg("[%s Lock] - 获得写锁\n", b.cacheName)
226+
// b.debugMsg("[%s Lock] - 获得写锁\n", b.cacheName)
217227
break
218228
}
229+
// b.debugMsg("[%s Lock] - 读锁抢占失败\n", b.cacheName)
219230
time.Sleep(time.Millisecond * 50)
220231
}
221232

222233
return func() {
223-
b.debugMsg("[%s Lock] - 释放写锁\n", b.cacheName)
234+
// b.debugMsg("[%s Lock] - 释放写锁\n", b.cacheName)
224235
rdb.Del(ctx, b.writeLockName)
225236
}
226237
}

0 commit comments

Comments
 (0)