-
Notifications
You must be signed in to change notification settings - Fork 2
fix: broadcast DEL_PREFIX to all shard groups in ShardedCoordinator #482
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ce01f1c
622917d
15402a2
c9f16e5
1857c74
e51d41c
6a4df79
960d3fb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,8 @@ import ( | |
| "context" | ||
| "log/slog" | ||
| "slices" | ||
| "sync" | ||
| "sync/atomic" | ||
| "time" | ||
|
|
||
| "github.com/bootjp/elastickv/distribution" | ||
|
|
@@ -68,6 +70,13 @@ func (c *ShardedCoordinator) Dispatch(ctx context.Context, reqs *OperationGroup[ | |
| return nil, err | ||
| } | ||
|
|
||
| // DEL_PREFIX cannot be routed to a single shard because the prefix may | ||
| // span multiple shards (or be nil, meaning "all keys"). Broadcast the | ||
| // operation to every shard group so each FSM scans locally. | ||
| if hasDelPrefixElem(reqs.Elems) { | ||
| return c.dispatchDelPrefixBroadcast(reqs.IsTxn, reqs.Elems) | ||
| } | ||
|
|
||
| if reqs.IsTxn && reqs.StartTS == 0 { | ||
| startTS, err := c.nextStartTS(ctx, reqs.Elems) | ||
| if err != nil { | ||
|
|
@@ -97,6 +106,94 @@ func (c *ShardedCoordinator) Dispatch(ctx context.Context, reqs *OperationGroup[ | |
| return &CoordinateResponse{CommitIndex: r.CommitIndex}, nil | ||
| } | ||
|
|
||
| // hasDelPrefixElem returns true if any element is a DelPrefix operation. | ||
| func hasDelPrefixElem(elems []*Elem[OP]) bool { | ||
| for _, e := range elems { | ||
| if e != nil && e.Op == DelPrefix { | ||
| return true | ||
| } | ||
| } | ||
| return false | ||
| } | ||
|
|
||
| // validateDelPrefixOnly ensures all elements are DelPrefix operations. | ||
| // Mixing DEL_PREFIX with other operations (PUT, DEL) in a single dispatch is | ||
| // not allowed because the FSM handles DEL_PREFIX exclusively. | ||
| func validateDelPrefixOnly(elems []*Elem[OP]) error { | ||
| for _, e := range elems { | ||
| if e != nil && e.Op != DelPrefix { | ||
| return errors.Wrap(ErrInvalidRequest, "DEL_PREFIX cannot be mixed with other operations") | ||
| } | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // dispatchDelPrefixBroadcast validates and broadcasts DEL_PREFIX operations | ||
| // to every shard group. Each element becomes a separate pb.Request (the FSM's | ||
| // extractDelPrefix processes only the first DEL_PREFIX mutation per request). | ||
| // All requests are batched into a single Commit call per shard group. | ||
| func (c *ShardedCoordinator) dispatchDelPrefixBroadcast(isTxn bool, elems []*Elem[OP]) (*CoordinateResponse, error) { | ||
| if isTxn { | ||
| return nil, errors.Wrap(ErrInvalidRequest, "DEL_PREFIX not supported in transactions") | ||
| } | ||
| if err := validateDelPrefixOnly(elems); err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| ts := c.clock.Next() | ||
| requests := make([]*pb.Request, 0, len(elems)) | ||
| for _, elem := range elems { | ||
| requests = append(requests, &pb.Request{ | ||
| IsTxn: false, | ||
| Phase: pb.Phase_NONE, | ||
| Ts: ts, | ||
| Mutations: []*pb.Mutation{elemToMutation(elem)}, | ||
| }) | ||
| } | ||
|
|
||
| return c.broadcastToAllGroups(requests) | ||
| } | ||
|
Comment on lines
+135
to
+155
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In References
|
||
|
|
||
| // broadcastToAllGroups sends the same set of requests to every shard group in | ||
| // parallel and returns the maximum commit index. | ||
| func (c *ShardedCoordinator) broadcastToAllGroups(requests []*pb.Request) (*CoordinateResponse, error) { | ||
| var ( | ||
| maxIndex atomic.Uint64 | ||
| firstErr error | ||
| errMu sync.Mutex | ||
| wg sync.WaitGroup | ||
| ) | ||
| for _, g := range c.groups { | ||
| wg.Add(1) | ||
| go func(g *ShardGroup) { | ||
|
Comment on lines
+166
to
+168
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
The loop iterates over `c.groups` without guarding against a nil group or a nil `g.Txn`. Suggested change:
for gid, g := range c.groups {
if g == nil || g.Txn == nil {
errMu.Lock()
if firstErr == nil {
firstErr = errors.Wrapf(ErrInvalidRequest, "group %d has no transaction manager", gid)
}
errMu.Unlock()
continue
}
wg.Add(1)
go func(g *ShardGroup) { |
||
| defer wg.Done() | ||
| r, err := g.Txn.Commit(requests) | ||
| if err != nil { | ||
| errMu.Lock() | ||
| if firstErr == nil { | ||
| firstErr = err | ||
| } | ||
| errMu.Unlock() | ||
| return | ||
| } | ||
| if r != nil { | ||
| for { | ||
| cur := maxIndex.Load() | ||
| if r.CommitIndex <= cur || maxIndex.CompareAndSwap(cur, r.CommitIndex) { | ||
| break | ||
| } | ||
| } | ||
| } | ||
| }(g) | ||
| } | ||
|
Comment on lines
+166
to
+188
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The broadcast of |
||
| wg.Wait() | ||
|
|
||
| if firstErr != nil { | ||
| return nil, errors.WithStack(firstErr) | ||
| } | ||
| return &CoordinateResponse{CommitIndex: maxIndex.Load()}, nil | ||
| } | ||
|
|
||
| func (c *ShardedCoordinator) dispatchTxn(startTS uint64, commitTS uint64, elems []*Elem[OP]) (*CoordinateResponse, error) { | ||
| grouped, gids, err := c.groupMutations(elems) | ||
| if err != nil { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Broadcasting an entire `OperationGroup` that contains mixed operation types (e.g., `PUT` and `DEL_PREFIX`) is problematic for two reasons:

1. The `kvFSM.handleRawRequest` implementation (in `fsm.go`) only processes the first `DEL_PREFIX` mutation it finds and ignores all other mutations in the same request. Thus, any `PUT` or `DEL` operations in the same group would be lost.
2. `DEL_PREFIX` is not supported within transactions by the FSM. This broadcast logic currently intercepts transactions before they are validated, potentially bypassing intended transactional semantics.

You should ensure that `DEL_PREFIX` is only allowed in non-transactional groups and that it is not mixed with other operation types.