Skip to content

Commit ab4dd6f

Browse files
committed
Cover all commands
1 parent 62b4ddc commit ab4dd6f

4 files changed

Lines changed: 192 additions & 39 deletions

File tree

README.md

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
# RDPROXY
22

3-
This is a redis proxy to make ACL more convenient. It prefixes keys with the ACL username before sending it to upstream and undoing it when it about to send downstream.
3+
This is a redis proxy to make ACL more convenient. It prefixes keys with the ACL username before sending it to upstream and undoing it when it about to send downstream. This makes the connection appear truly support redis for multi tenancy.
44

5-
This software is currently WIP. Only support RESP2.
5+
At the moment this proxy does.
66

77
Your app can connect to this instance listening by default at port `6479`.
88

@@ -12,3 +12,18 @@ Your app can connect to this instance listening by default at port `6479`.
1212
|:--|:--|
1313
|`LISTEN`|`:6479`|
1414
|`UPSTREAM_REDIS`|`:6379`|
15+
16+
## TODO
17+
18+
+ RESP3 protocol (aka `HELLO 3`)
19+
+ Pub/Sub implementations
20+
+ Use cluster for read operations
21+
+ Unit tests
22+
23+
## Acknowledgements
24+
25+
Some parts of the code are inspired from these related projects
26+
27+
- [redis-namespace](https://github.com/resque/redis-namespace/)
28+
- [redigo](https://github.com/gomodule/redigo)
29+
- [redcon](github.com/tidwall/redcon)

handler.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ func (m *Handler) ServeRESP(conn redcon.Conn, cmd redcon.Command) {
4141

4242
upConn := context.upstreamConn
4343
command := strings.ToUpper(string(cmd.Args[0]))
44-
reviver := modSingleCommand(command, context.username, cmd.Args)
44+
newArgs, reviver := modSingleCommand(command, context.username, cmd.Args)
45+
cmd.Args = newArgs
4546

4647
// Construct RESP command & send to redis
4748
request := buildRESPCommand(cmd.Args)
@@ -56,7 +57,7 @@ func (m *Handler) ServeRESP(conn redcon.Conn, cmd redcon.Command) {
5657
var b bytes.Buffer
5758

5859
reader := newRespReader(bufio.NewReader(upConn), &b, reviver)
59-
if err = reader.readReply(); err != nil {
60+
if err = reader.ReadReply(); err != nil {
6061
log.Printf("Failed to read response: %v", err)
6162
conn.Close()
6263
return

mod.go

Lines changed: 162 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,48 +2,182 @@ package main
22

33
import "strings"
44

5-
var modCommands = map[string]bool{
6-
"GET": true, "SET": true, "MGET": true, "MSET": true, "DEL": true,
7-
"EXISTS": true, "INCR": true, "DECR": true, "HGET": true, "HSET": true,
8-
"LPUSH": true, "RPUSH": true, "LPOP": true, "RPOP": true,
9-
"TYPE": true, "TTL": true,
5+
// Command modification types
6+
const (
7+
ModifyFirst = "first"
8+
ModifyFirstTwo = "first_two"
9+
ModifyAll = "all"
10+
ModifyExcludeFirst = "exclude_first"
11+
ModifyExcludeLast = "exclude_last"
12+
ModifyExcludeOpts = "exclude_options"
13+
ModifyAlternate = "alternate"
14+
ModifySort = "sort"
15+
ModifyEvalStyle = "eval_style"
16+
ModifyScanStyle = "scan_style"
17+
18+
ReviveFirst = "first"
19+
ReviveSecond = "second"
20+
ReviveAll = "all"
21+
)
22+
23+
// Define namespaced command rules (extracted from redis-namespace)
24+
var namespacedCommands = map[string]string{
25+
"APPEND": ModifyFirst, "BITCOUNT": ModifyFirst, "BITFIELD": ModifyFirst,
26+
"BITOP": ModifyExcludeFirst, "BITPOS": ModifyFirst, "BLPOP": ModifyExcludeLast,
27+
"BRPOP": ModifyExcludeLast, "BRPOPLPUSH": ModifyExcludeLast, "BZPOPMIN": ModifyFirst,
28+
"BZPOPMAX": ModifyFirst, "DECR": ModifyFirst, "DECRBY": ModifyFirst, "DEL": ModifyAll,
29+
"DUMP": ModifyFirst, "EXISTS": ModifyAll, "EXPIRE": ModifyFirst, "EXPIREAT": ModifyFirst,
30+
"EXPIRETIME": ModifyFirst, "EVAL": ModifyEvalStyle, "EVALSHA": ModifyEvalStyle,
31+
"GET": ModifyFirst, "GETEX": ModifyFirst, "GETBIT": ModifyFirst, "GETRANGE": ModifyFirst,
32+
"GETSET": ModifyFirst, "HSET": ModifyFirst, "HSETNX": ModifyFirst, "HGET": ModifyFirst,
33+
"HINCRBY": ModifyFirst, "HINCRBYFLOAT": ModifyFirst, "HMGET": ModifyFirst,
34+
"HMSET": ModifyFirst, "HDEL": ModifyFirst, "HEXISTS": ModifyFirst, "HLEN": ModifyFirst,
35+
"HKEYS": ModifyFirst, "HSCAN": ModifyFirst, "HSCAN_EACH": ModifyFirst, "HVALS": ModifyFirst,
36+
"HGETALL": ModifyFirst, "INCR": ModifyFirst, "INCRBY": ModifyFirst, "INCRBYFLOAT": ModifyFirst,
37+
"KEYS": ModifyFirst, "LINDEX": ModifyFirst, "LINSERT": ModifyFirst, "LLEN": ModifyFirst,
38+
"LMOVE": ModifyFirstTwo, "LPOP": ModifyFirst, "LPOS": ModifyFirst, "LPUSH": ModifyFirst, "LPUSHX": ModifyFirst,
39+
"LRANGE": ModifyFirst, "LREM": ModifyFirst, "LSET": ModifyFirst, "LTRIM": ModifyFirst,
40+
"MAPPED_HMSET": ModifyFirst, "MAPPED_HMGET": ModifyFirst, "MAPPED_MGET": ModifyAll,
41+
"MAPPED_MSET": ModifyAll, "MAPPED_MSETNX": ModifyAll, "MGET": ModifyAll, "MOVE": ModifyFirst,
42+
"MSET": ModifyAlternate, "MSETNX": ModifyAlternate, "OBJECT": ModifyExcludeFirst,
43+
"PERSIST": ModifyFirst, "PEXPIRE": ModifyFirst, "PEXPIREAT": ModifyFirst, "PEXPIRETIME": ModifyFirst,
44+
"PSUBSCRIBE": ModifyAll, "PTTL": ModifyFirst, "PUBLISH": ModifyFirst, "PUNSUBSCRIBE": ModifyAll,
45+
"RENAME": ModifyAll, "RENAMENX": ModifyAll, "RESTORE": ModifyFirst, "RPOP": ModifyFirst,
46+
"RPOPLPUSH": ModifyAll, "RPUSH": ModifyFirst, "RPUSHX": ModifyFirst, "SADD": ModifyFirst,
47+
"SCARD": ModifyFirst, "SCAN": ModifyScanStyle, "SCAN_EACH": ModifyScanStyle,
48+
"SDIFF": ModifyAll, "SDIFFSTORE": ModifyAll, "SET": ModifyFirst, "SETBIT": ModifyFirst,
49+
"SETEX": ModifyFirst, "SETNX": ModifyFirst, "SETRANGE": ModifyFirst, "SINTER": ModifyAll,
50+
"SINTERSTORE": ModifyAll, "SISMEMBER": ModifyFirst, "SMEMBERS": ModifyFirst,
51+
"SMISMEMBER": ModifyFirst, "SMOVE": ModifyExcludeLast, "SORT": ModifySort, "SPOP": ModifyFirst,
52+
"SRANDMEMBER": ModifyFirst, "SREM": ModifyFirst, "SSCAN": ModifyFirst, "SSCAN_EACH": ModifyFirst,
53+
"STRLEN": ModifyFirst, "SUBSCRIBE": ModifyAll, "SUNION": ModifyAll, "SUNIONSTORE": ModifyAll,
54+
"TTL": ModifyFirst, "TYPE": ModifyFirst, "UNLINK": ModifyAll, "UNSUBSCRIBE": ModifyAll,
55+
"ZADD": ModifyFirst, "ZCARD": ModifyFirst, "ZCOUNT": ModifyFirst, "ZINCRBY": ModifyFirst,
56+
"ZINTERSTORE": ModifyExcludeOpts, "ZPOPMIN": ModifyFirst, "ZPOPMAX": ModifyFirst,
57+
"ZRANGE": ModifyFirst, "ZRANGEBYSCORE": ModifyFirst, "ZRANGEBYLEX": ModifyFirst,
58+
"ZRANK": ModifyFirst, "ZREM": ModifyFirst, "ZREMRANGEBYRANK": ModifyFirst,
59+
"ZREMRANGEBYSCORE": ModifyFirst, "ZREMRANGEBYLEX": ModifyFirst, "ZREVRANGE": ModifyFirst,
60+
"ZREVRANGEBYSCORE": ModifyFirst, "ZREVRANGEBYLEX": ModifyFirst, "ZREVRANK": ModifyFirst,
61+
"ZSCAN": ModifyFirst, "ZSCAN_EACH": ModifyFirst, "ZSCORE": ModifyFirst, "ZUNIONSTORE": ModifyExcludeOpts,
1062
}
1163

12-
func modSingleCommand(command, username string, args [][]byte) transformerFn {
13-
if modCommands[command] && len(args) > 1 {
14-
args[1] = append([]byte(username+":"), args[1]...)
15-
return nil
16-
}
64+
var reviverCommands = map[string]string{
65+
"BLPOP": ReviveFirst,
66+
"BRPOP": ReviveFirst,
67+
"KEYS": ReviveAll,
68+
"MAPPED_MGET": ReviveAll,
69+
"SCAN": ReviveSecond,
70+
"SCAN_EACH": ReviveAll,
71+
}
1772

18-
if command == "OBJECT" && len(args) == 3 {
19-
args[2] = append([]byte(username+":"), args[2]...)
20-
return nil
73+
func modSingleCommand(command, username string, args [][]byte) ([][]byte, reviverFn) {
74+
modType, exists := namespacedCommands[strings.ToUpper(command)]
75+
if !exists {
76+
return args, nil // No modification needed
2177
}
2278

23-
if command == "KEYS" && len(args) == 2 {
24-
args[1] = append([]byte(username+":"), args[1]...)
25-
return func(code byte, line []byte) []byte {
26-
if len(line) == 0 || (line[0] >= '0' && line[0] <= '9') {
27-
return line
79+
namespacePrefix := []byte(username + ":")
80+
81+
switch modType {
82+
case ModifyFirst:
83+
if len(args) > 1 {
84+
args[1] = append(namespacePrefix, args[1]...)
85+
}
86+
case ModifyFirstTwo:
87+
if len(args) > 2 {
88+
args[1] = append(namespacePrefix, args[1]...)
89+
args[2] = append(namespacePrefix, args[2]...)
90+
}
91+
case ModifyAll:
92+
for i := 1; i < len(args); i++ {
93+
args[i] = append(namespacePrefix, args[i]...)
94+
}
95+
case ModifyExcludeFirst:
96+
for i := 2; i < len(args); i++ {
97+
args[i] = append(namespacePrefix, args[i]...)
98+
}
99+
case ModifyExcludeLast:
100+
for i := 1; i < len(args)-1; i++ {
101+
args[i] = append(namespacePrefix, args[i]...)
102+
}
103+
case ModifyExcludeOpts:
104+
if len(args) > 2 && len(args[len(args)-1]) > 0 {
105+
// Check if last argument is an option (e.g., weight, aggregate)
106+
for i := 1; i < len(args)-1; i++ {
107+
args[i] = append(namespacePrefix, args[i]...)
108+
}
109+
} else {
110+
for i := 1; i < len(args); i++ {
111+
args[i] = append(namespacePrefix, args[i]...)
28112
}
29-
return line[len(username)+1:]
30113
}
31-
}
32-
33-
if command == "SCAN" {
34-
// Find "MATCH" argument and modify its pattern
114+
case ModifyAlternate:
115+
for i := 2; i < len(args); i += 2 {
116+
args[i] = append(namespacePrefix, args[i]...)
117+
}
118+
case ModifySort:
119+
if len(args) > 1 {
120+
args[1] = append(namespacePrefix, args[1]...)
121+
}
122+
// If second argument is not hash, modify 'by', 'store', and 'get' keys
123+
for i := 2; i+1 < len(args); i += 1 {
124+
key := strings.ToUpper(string(args[i]))
125+
if key == "BY" || key == "STORE" {
126+
i += 1
127+
args[i] = append(namespacePrefix, args[i]...)
128+
}
129+
if key == "LIMIT" {
130+
i += 2
131+
}
132+
if key == "GET" {
133+
i += 1
134+
}
135+
}
136+
case ModifyEvalStyle:
137+
for i := 2; i < 1+len(args)/2; i++ {
138+
args[i] = append(namespacePrefix, args[i]...)
139+
}
140+
case ModifyScanStyle:
141+
// Modify MATCH argument
142+
found := false
35143
for i := 1; i < len(args)-1; i++ {
36144
if strings.ToUpper(string(args[i])) == "MATCH" {
37-
args[i+1] = append([]byte(username+":"), args[i+1]...)
145+
args[i+1] = append(namespacePrefix, args[i+1]...)
146+
found = true
38147
break
39148
}
40149
}
41-
return func(code byte, line []byte) []byte {
42-
if len(line) == 0 || (line[0] >= '0' && line[0] <= '9') {
150+
if !found {
151+
args = append(args, []byte("MATCH"), []byte(username+":*"))
152+
}
153+
}
154+
155+
revType, exists := reviverCommands[strings.ToUpper(command)]
156+
if !exists {
157+
return args, nil // No reviver needed
158+
}
159+
160+
var reviver reviverFn
161+
switch revType {
162+
case ReviveFirst:
163+
reviver = func(pos, depth int, line []byte) []byte {
164+
if depth == 1 && pos != 0 {
165+
return line
166+
}
167+
return line[len(namespacePrefix):]
168+
}
169+
case ReviveSecond:
170+
reviver = func(pos, depth int, line []byte) []byte {
171+
if depth == 1 && pos != 1 {
43172
return line
44173
}
45-
return line[len(username)+1:]
174+
return line[len(namespacePrefix):]
175+
}
176+
case ReviveAll:
177+
reviver = func(pos, depth int, line []byte) []byte {
178+
return line[len(namespacePrefix):]
46179
}
47180
}
48-
return nil
181+
182+
return args, reviver
49183
}

reader.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ import (
88
"strconv"
99
)
1010

11-
type transformerFn func(code byte, line []byte) []byte
11+
type reviverFn func(pos int, depth int, line []byte) []byte
1212

13-
func newRespReader(b *bufio.Reader, w *bytes.Buffer, fn transformerFn) RespReader {
13+
func newRespReader(b *bufio.Reader, w *bytes.Buffer, fn reviverFn) RespReader {
1414
return RespReader{
1515
br: b,
1616
bw: w,
@@ -21,7 +21,7 @@ func newRespReader(b *bufio.Reader, w *bytes.Buffer, fn transformerFn) RespReade
2121
type RespReader struct {
2222
br *bufio.Reader
2323
bw *bytes.Buffer
24-
fn transformerFn
24+
fn reviverFn
2525
}
2626

2727
type readerError string
@@ -30,7 +30,10 @@ func (pe readerError) Error() string {
3030
return fmt.Sprintf("parse erro: %s", string(pe))
3131
}
3232

33-
func (c *RespReader) readReply() error {
33+
func (c *RespReader) ReadReply() error {
34+
return c.readReply(0, 0)
35+
}
36+
func (c *RespReader) readReply(pos, depth int) error {
3437
line, err := c.readLine()
3538
if err != nil {
3639
return err
@@ -57,7 +60,7 @@ func (c *RespReader) readReply() error {
5760
return err
5861
}
5962
if c.fn != nil {
60-
p = c.fn('$', p)
63+
p = c.fn(pos, depth, p)
6164
}
6265
c.bw.WriteString("$" + strconv.Itoa(len(p)-2) + "\r\n")
6366
c.bw.Write(p)
@@ -72,8 +75,8 @@ func (c *RespReader) readReply() error {
7275
return nil
7376
}
7477
c.bw.Write(line)
75-
for range n {
76-
err = c.readReply()
78+
for i := range n {
79+
err = c.readReply(i, depth+1)
7780
if err != nil {
7881
return err
7982
}

0 commit comments

Comments
 (0)