Skip to content

Commit 11fba87

Browse files
committed
Implement basic Pub/Sub
1 parent e20a6ed commit 11fba87

3 files changed

Lines changed: 67 additions & 20 deletions

File tree

README.md

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
This is a redis proxy to make ACL more convenient. It prefixes keys with the ACL username before sending it to upstream and undoing it when it about to send downstream. This makes the connecting clients appear like the whole service is dedicated for that client while actually the redis server is shared or set for multi tenancy.
44

5-
At the moment this proxy does.
6-
75
Your app can connect to this instance listening by default at port `6479`.
86

97
## What it does do
@@ -38,7 +36,6 @@ Note that the revival values doesn't work for KEYS ran via EVAL, that means you
3836
## TODO
3937

4038
+ RESP3 protocol (aka `HELLO 3`)
41-
+ Pub/Sub implementations
4239
+ Use cluster for read operations
4340
+ Unit tests
4441

handler.go

Lines changed: 52 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"bytes"
66
"log"
77
"net"
8-
"strconv"
98
"strings"
109

1110
"github.com/tidwall/redcon"
@@ -19,6 +18,7 @@ type Handler struct {
1918
type HandlerContext struct {
2019
upstreamConn net.Conn
2120
username string
21+
detached bool
2222
}
2323

2424
func NewHandler(config Config) *Handler {
@@ -55,7 +55,6 @@ func (m *Handler) ServeRESP(conn redcon.Conn, cmd redcon.Command) {
5555

5656
// Read response from Redis
5757
var b bytes.Buffer
58-
5958
reader := newRespReader(bufio.NewReader(upConn), &b, reviver)
6059
if err = reader.ReadReply(); err != nil {
6160
log.Printf("Failed to read response: %v", err)
@@ -64,10 +63,16 @@ func (m *Handler) ServeRESP(conn redcon.Conn, cmd redcon.Command) {
6463
}
6564
response := b.Bytes()
6665

67-
if command == "AUTH" && len(cmd.Args) == 3 {
68-
if strings.HasPrefix(string(response), "+OK") {
66+
if len(response) > 0 && response[0] != '-' {
67+
if command == "AUTH" && len(cmd.Args) == 3 {
6968
context.username = string(cmd.Args[1])
7069
conn.SetContext(context)
70+
} else if command == "SUBSCRIBE" || command == "PSUBSRIBE" {
71+
// this will detach connection from this loop
72+
context.detached = true
73+
conn.SetContext(context)
74+
dconn := conn.Detach()
75+
go m.subscriptionLoop(dconn, upConn)
7176
}
7277
}
7378

@@ -91,22 +96,52 @@ func (m *Handler) AcceptConn(conn redcon.Conn) bool {
9196

9297
func (m *Handler) ClosedConn(conn redcon.Conn, err error) {
9398
context, ok := conn.Context().(HandlerContext)
94-
if ok && context.upstreamConn != nil {
99+
if ok && context.upstreamConn != nil && !context.detached {
95100
context.upstreamConn.Close()
101+
context.upstreamConn = nil
96102
}
97103
}
98104

99-
func buildRESPCommand(args [][]byte) []byte {
100-
var sb bytes.Buffer
101-
sb.WriteByte('*')
102-
sb.WriteString(strconv.Itoa(len(args)))
103-
sb.WriteString("\r\n")
104-
for _, arg := range args {
105-
sb.WriteByte('$')
106-
sb.WriteString(strconv.Itoa(len(arg)))
107-
sb.WriteString("\r\n")
108-
sb.Write(arg)
109-
sb.WriteString("\r\n")
105+
func (m *Handler) subscriptionLoop(conn redcon.DetachedConn, upConn net.Conn) {
106+
defer conn.Close()
107+
context, ok := conn.Context().(HandlerContext)
108+
if !ok || context.upstreamConn == nil {
109+
conn.Close()
110+
return
111+
}
112+
113+
go (func() {
114+
cmd, err := conn.ReadCommand()
115+
if err != nil {
116+
log.Printf("Failed to read command: %v", err)
117+
conn.Close()
118+
return
119+
}
120+
command := strings.ToUpper(string(cmd.Args[0]))
121+
newArgs, _ := modSingleCommand(command, context.username, cmd.Args)
122+
cmd.Args = newArgs
123+
124+
// Construct RESP command & send to redis
125+
request := buildRESPCommand(cmd.Args)
126+
_, err = upConn.Write(request)
127+
if err != nil {
128+
log.Printf("Failed to send command: %v", err)
129+
conn.Close()
130+
return
131+
}
132+
})()
133+
134+
for {
135+
conn.Flush()
136+
137+
var b bytes.Buffer
138+
reader := newRespReader(bufio.NewReader(upConn), &b, nil)
139+
if err := reader.ReadReply(); err != nil {
140+
log.Printf("Error reading subscription response: %v", err)
141+
conn.Close()
142+
return
143+
}
144+
145+
conn.WriteRaw(b.Bytes())
110146
}
111-
return sb.Bytes()
112147
}

reader.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,3 +133,18 @@ func parseLen(p []byte) (int, error) {
133133

134134
return n, nil
135135
}
136+
137+
func buildRESPCommand(args [][]byte) []byte {
138+
var sb bytes.Buffer
139+
sb.WriteByte('*')
140+
sb.WriteString(strconv.Itoa(len(args)))
141+
sb.WriteString("\r\n")
142+
for _, arg := range args {
143+
sb.WriteByte('$')
144+
sb.WriteString(strconv.Itoa(len(arg)))
145+
sb.WriteString("\r\n")
146+
sb.Write(arg)
147+
sb.WriteString("\r\n")
148+
}
149+
return sb.Bytes()
150+
}

0 commit comments

Comments
 (0)