Skip to content

Commit ed6393f

Browse files
authored
Merge pull request #482 from bootjp/feat/fix-jepsen
fix: broadcast DEL_PREFIX to all shard groups in ShardedCoordinator
2 parents 46dffb7 + 960d3fb commit ed6393f

2 files changed

Lines changed: 481 additions & 0 deletions

File tree

kv/sharded_coordinator.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"context"
66
"log/slog"
77
"slices"
8+
"sync"
9+
"sync/atomic"
810
"time"
911

1012
"github.com/bootjp/elastickv/distribution"
@@ -68,6 +70,13 @@ func (c *ShardedCoordinator) Dispatch(ctx context.Context, reqs *OperationGroup[
6870
return nil, err
6971
}
7072

73+
// DEL_PREFIX cannot be routed to a single shard because the prefix may
74+
// span multiple shards (or be nil, meaning "all keys"). Broadcast the
75+
// operation to every shard group so each FSM scans locally.
76+
if hasDelPrefixElem(reqs.Elems) {
77+
return c.dispatchDelPrefixBroadcast(reqs.IsTxn, reqs.Elems)
78+
}
79+
7180
if reqs.IsTxn && reqs.StartTS == 0 {
7281
startTS, err := c.nextStartTS(ctx, reqs.Elems)
7382
if err != nil {
@@ -97,6 +106,94 @@ func (c *ShardedCoordinator) Dispatch(ctx context.Context, reqs *OperationGroup[
97106
return &CoordinateResponse{CommitIndex: r.CommitIndex}, nil
98107
}
99108

109+
// hasDelPrefixElem returns true if any element is a DelPrefix operation.
110+
func hasDelPrefixElem(elems []*Elem[OP]) bool {
111+
for _, e := range elems {
112+
if e != nil && e.Op == DelPrefix {
113+
return true
114+
}
115+
}
116+
return false
117+
}
118+
119+
// validateDelPrefixOnly ensures all elements are DelPrefix operations.
120+
// Mixing DEL_PREFIX with other operations (PUT, DEL) in a single dispatch is
121+
// not allowed because the FSM handles DEL_PREFIX exclusively.
122+
func validateDelPrefixOnly(elems []*Elem[OP]) error {
123+
for _, e := range elems {
124+
if e != nil && e.Op != DelPrefix {
125+
return errors.Wrap(ErrInvalidRequest, "DEL_PREFIX cannot be mixed with other operations")
126+
}
127+
}
128+
return nil
129+
}
130+
131+
// dispatchDelPrefixBroadcast validates and broadcasts DEL_PREFIX operations
132+
// to every shard group. Each element becomes a separate pb.Request (the FSM's
133+
// extractDelPrefix processes only the first DEL_PREFIX mutation per request).
134+
// All requests are batched into a single Commit call per shard group.
135+
func (c *ShardedCoordinator) dispatchDelPrefixBroadcast(isTxn bool, elems []*Elem[OP]) (*CoordinateResponse, error) {
136+
if isTxn {
137+
return nil, errors.Wrap(ErrInvalidRequest, "DEL_PREFIX not supported in transactions")
138+
}
139+
if err := validateDelPrefixOnly(elems); err != nil {
140+
return nil, err
141+
}
142+
143+
ts := c.clock.Next()
144+
requests := make([]*pb.Request, 0, len(elems))
145+
for _, elem := range elems {
146+
requests = append(requests, &pb.Request{
147+
IsTxn: false,
148+
Phase: pb.Phase_NONE,
149+
Ts: ts,
150+
Mutations: []*pb.Mutation{elemToMutation(elem)},
151+
})
152+
}
153+
154+
return c.broadcastToAllGroups(requests)
155+
}
156+
157+
// broadcastToAllGroups sends the same set of requests to every shard group in
158+
// parallel and returns the maximum commit index.
159+
func (c *ShardedCoordinator) broadcastToAllGroups(requests []*pb.Request) (*CoordinateResponse, error) {
160+
var (
161+
maxIndex atomic.Uint64
162+
firstErr error
163+
errMu sync.Mutex
164+
wg sync.WaitGroup
165+
)
166+
for _, g := range c.groups {
167+
wg.Add(1)
168+
go func(g *ShardGroup) {
169+
defer wg.Done()
170+
r, err := g.Txn.Commit(requests)
171+
if err != nil {
172+
errMu.Lock()
173+
if firstErr == nil {
174+
firstErr = err
175+
}
176+
errMu.Unlock()
177+
return
178+
}
179+
if r != nil {
180+
for {
181+
cur := maxIndex.Load()
182+
if r.CommitIndex <= cur || maxIndex.CompareAndSwap(cur, r.CommitIndex) {
183+
break
184+
}
185+
}
186+
}
187+
}(g)
188+
}
189+
wg.Wait()
190+
191+
if firstErr != nil {
192+
return nil, errors.WithStack(firstErr)
193+
}
194+
return &CoordinateResponse{CommitIndex: maxIndex.Load()}, nil
195+
}
196+
100197
func (c *ShardedCoordinator) dispatchTxn(startTS uint64, commitTS uint64, elems []*Elem[OP]) (*CoordinateResponse, error) {
101198
grouped, gids, err := c.groupMutations(elems)
102199
if err != nil {

0 commit comments

Comments
 (0)