fix: broadcast DEL_PREFIX to all shard groups in ShardedCoordinator#482
Conversation
DEL_PREFIX with a nil key (FLUSHALL) or a prefix spanning multiple shards could not be routed by groupMutations, causing "invalid request" errors in production where ShardedCoordinator is always used.
Code Review
This pull request implements broadcasting for DEL_PREFIX operations within the ShardedCoordinator to ensure all shards process the prefix deletion. The review feedback identifies several critical issues: broadcasting mixed operation types (e.g., PUT and DEL_PREFIX) can lead to data loss because the underlying FSM only processes the first DEL_PREFIX mutation, and DEL_PREFIX is currently unsupported within transactions. Additionally, the HLC timestamp should be generated once outside the broadcast loop to maintain consistency across shards, and the FSM's limitation of only processing a single DEL_PREFIX mutation per request should be handled.
if hasDelPrefixElem(reqs.Elems) {
	return c.dispatchDelPrefixBroadcast(reqs.Elems)
}
Broadcasting an entire OperationGroup that contains mixed operation types (e.g., PUT and DEL_PREFIX) is problematic for two reasons:
- Data Loss: The kvFSM.handleRawRequest implementation (in fsm.go) only processes the first DEL_PREFIX mutation it finds and ignores all other mutations in the same request. Thus, any PUT or DEL operations in the same group would be lost.
- Transaction Bypass: DEL_PREFIX is not supported within transactions by the FSM. This broadcast logic currently intercepts transactions before they are validated, potentially bypassing intended transactional semantics.
You should ensure that DEL_PREFIX is only allowed in non-transactional groups and that it is not mixed with other operation types.
if hasDelPrefixElem(reqs.Elems) {
	if reqs.IsTxn {
		return nil, errors.Wrapf(ErrInvalidRequest, "DEL_PREFIX not supported in transactions")
	}
	for _, e := range reqs.Elems {
		if e.Op != DelPrefix {
			return nil, errors.Wrapf(ErrInvalidRequest, "mixed DEL_PREFIX and other operations not supported")
		}
	}
	return c.dispatchDelPrefixBroadcast(reqs.Elems)
}
kv/sharded_coordinator.go
Outdated
func (c *ShardedCoordinator) dispatchDelPrefixBroadcast(elems []*Elem[OP]) (*CoordinateResponse, error) {
	muts := make([]*pb.Mutation, 0, len(elems))
	for _, elem := range elems {
		muts = append(muts, elemToMutation(elem))
	}
Note that if elems contains multiple DEL_PREFIX operations, only the first one will be executed by the FSM due to the logic in extractDelPrefix (in fsm.go). While this PR enables broadcasting, the underlying FSM still expects a DEL_PREFIX request to be the sole mutation or at least only processes the first one. If multiple prefixes need to be deleted, they should be sent as separate requests or the FSM should be updated to iterate over all mutations.
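One way to respect that FSM limitation on the coordinator side is to wrap each DEL_PREFIX element in its own single-mutation request. The sketch below uses simplified stand-in types (`Mutation`, `Request`, and `buildDelPrefixRequests` are illustrative placeholders, not the repository's `pb.Request` or `elemToMutation`):

```go
package main

import "fmt"

// Stand-in types; the repository's pb.Request and pb.Mutation are richer.
type Mutation struct{ Prefix string }
type Request struct {
	IsTxn     bool
	Ts        uint64
	Mutations []*Mutation
}

// buildDelPrefixRequests wraps each prefix in its own single-mutation
// request, since the FSM only honors the first DEL_PREFIX per request.
// All requests carry the same caller-provided timestamp.
func buildDelPrefixRequests(prefixes []string, ts uint64) []*Request {
	reqs := make([]*Request, 0, len(prefixes))
	for _, p := range prefixes {
		reqs = append(reqs, &Request{
			IsTxn:     false,
			Ts:        ts,
			Mutations: []*Mutation{{Prefix: p}},
		})
	}
	return reqs
}

func main() {
	reqs := buildDelPrefixRequests([]string{"user:", "session:"}, 100)
	fmt.Println(len(reqs), len(reqs[0].Mutations)) // prints: 2 1
}
```

Each request then carries exactly one DEL_PREFIX mutation, so nothing is silently dropped by the FSM.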
kv/sharded_coordinator.go
Outdated
for _, g := range c.groups {
	req := &pb.Request{
		IsTxn: false,
		Phase: pb.Phase_NONE,
		Ts: c.clock.Next(),
		Mutations: muts,
	}
The HLC timestamp should be generated once outside the loop to ensure consistency across all shards. Using c.clock.Next() inside the loop assigns a different timestamp to each shard for what is logically a single broadcast operation. In an MVCC system, operations within a single logical unit must share the same start timestamp to ensure atomicity and a consistent snapshot view.
Suggested change:

+	ts := c.clock.Next()
 	for _, g := range c.groups {
 		req := &pb.Request{
 			IsTxn: false,
 			Phase: pb.Phase_NONE,
-			Ts: c.clock.Next(),
+			Ts: ts,
 			Mutations: muts,
 		}
References
- In an MVCC system, all operations within a single transaction must share the same start timestamp to ensure atomicity and a consistent snapshot view.
Unit tests verify broadcast to all shard groups, partial failure handling, and single-shard scenarios. Integration tests verify end-to-end deletion across shards and txn-internal key preservation.
- Reject DEL_PREFIX in transactional groups (unsupported by FSM)
- Reject mixing DEL_PREFIX with other operations (PUT/DEL)
- Generate HLC timestamp once per element, shared across all shards
- Dispatch each DEL_PREFIX element as a separate request (FSM only processes the first DEL_PREFIX mutation per request)
- Add tests for validation and timestamp consistency
/gemini review
Code Review
This pull request introduces support for DEL_PREFIX operations within the ShardedCoordinator. Because these operations may span multiple shards, they are broadcast to all shard groups. The implementation includes validation to prevent DEL_PREFIX from being used in transactions or mixed with other operation types, along with comprehensive unit and integration tests. Feedback focuses on optimizing the broadcast logic by batching requests per shard group to improve performance, enforcing a maximum batch size to prevent potential OOM issues, and adding safety checks for response objects.
kv/sharded_coordinator.go
Outdated
func (c *ShardedCoordinator) dispatchDelPrefixBroadcast(elems []*Elem[OP]) (*CoordinateResponse, error) {
	var maxIndex uint64
	var firstErr error
	for _, elem := range elems {
		mut := elemToMutation(elem)
		ts := c.clock.Next()
		for _, g := range c.groups {
			req := &pb.Request{
				IsTxn: false,
				Phase: pb.Phase_NONE,
				Ts: ts,
				Mutations: []*pb.Mutation{mut},
			}
			r, err := g.Txn.Commit([]*pb.Request{req})
			if err != nil {
				if firstErr == nil {
					firstErr = err
				}
				continue
			}
			if r.CommitIndex > maxIndex {
				maxIndex = r.CommitIndex
			}
		}
	}
	if firstErr != nil {
		return nil, errors.WithStack(firstErr)
	}
	return &CoordinateResponse{CommitIndex: maxIndex}, nil
}
The current implementation performs N_elements * N_groups synchronous Commit calls, which can be extremely slow in production environments with many shards. Since g.Txn.Commit accepts a slice of requests, you can optimize this by pre-building the list of pb.Request objects and sending them in a single call per shard group. Additionally, to prevent unbounded memory growth and potential OOM issues as per repository guidelines, you should ensure the input collection is bounded and reject requests that exceed this limit. A nil check for the response r should also be added to prevent potential panics.
func (c *ShardedCoordinator) dispatchDelPrefixBroadcast(elems []*Elem[OP]) (*CoordinateResponse, error) {
	if len(elems) > MaxBatchSize {
		return nil, errors.Errorf("batch size %d exceeds maximum allowed %d", len(elems), MaxBatchSize)
	}
	requests := make([]*pb.Request, 0, len(elems))
	for _, elem := range elems {
		requests = append(requests, &pb.Request{
			IsTxn: false,
			Phase: pb.Phase_NONE,
			Ts: c.clock.Next(),
			Mutations: []*pb.Mutation{elemToMutation(elem)},
		})
	}
	var maxIndex uint64
	var firstErr error
	for _, g := range c.groups {
		r, err := g.Txn.Commit(requests)
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}
		if r != nil && r.CommitIndex > maxIndex {
			maxIndex = r.CommitIndex
		}
	}
	if firstErr != nil {
		return nil, errors.WithStack(firstErr)
	}
	return &CoordinateResponse{CommitIndex: maxIndex}, nil
}

References
- To prevent unbounded memory growth and potential OOM issues, apply a fixed bound to collections that can grow from external requests. Reject new requests when the bound is reached.
- Extract DEL_PREFIX validation into dispatchDelPrefixBroadcast to reduce Dispatch cyclomatic complexity below threshold (cyclop)
- Batch all DEL_PREFIX requests into a single Commit call per shard group instead of N_elements * N_groups synchronous calls
- Add nil check for Commit response
- Fix gofmt alignment in test file
/gemini review
Code Review
This pull request introduces support for DEL_PREFIX operations within the ShardedCoordinator, allowing for prefix-based deletions that are broadcast across all shard groups. The changes include new validation logic to ensure DEL_PREFIX is not mixed with other operation types or executed within transactions, along with comprehensive unit and integration tests. Feedback was provided regarding the sequential nature of the broadcast mechanism, suggesting that parallelizing the commits to shard groups would significantly reduce latency in environments with many shards.
for _, g := range c.groups {
	r, err := g.Txn.Commit(requests)
	if err != nil {
		if firstErr == nil {
			firstErr = err
		}
		continue
	}
	if r != nil && r.CommitIndex > maxIndex {
		maxIndex = r.CommitIndex
	}
}
The broadcast of DEL_PREFIX operations to all shard groups is performed sequentially. In a sharded environment with a significant number of shard groups, this will lead to high latency (O(N_shards * Raft_latency)). Since ShardedCoordinator is used in production, this sequential execution could easily exceed client timeouts for operations like FLUSHALL (nil prefix). Consider parallelizing these commits using goroutines and a sync.WaitGroup or similar mechanism to improve performance.
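A parallel broadcast along the lines suggested might look like the following sketch. The types here (`ShardGroup`, `Request`, `Response`) are simplified stand-ins for the repository's real definitions, and the aggregation mirrors the existing first-error / max-commit-index logic:

```go
package main

import (
	"fmt"
	"sync"
)

// Stand-in types; the real ShardGroup and pb.Request live in the repository.
type Request struct{ Ts uint64 }
type Response struct{ CommitIndex uint64 }
type ShardGroup struct {
	Commit func(reqs []*Request) (*Response, error)
}

// broadcastToAllGroups commits the same request batch to every shard group
// concurrently, keeping the first error and the maximum commit index.
func broadcastToAllGroups(groups []*ShardGroup, reqs []*Request) (*Response, error) {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex // guards maxIndex and firstErr
		maxIndex uint64
		firstErr error
	)
	for _, g := range groups {
		wg.Add(1)
		go func(g *ShardGroup) {
			defer wg.Done()
			r, err := g.Commit(reqs)
			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				if firstErr == nil {
					firstErr = err
				}
				return
			}
			if r != nil && r.CommitIndex > maxIndex {
				maxIndex = r.CommitIndex
			}
		}(g)
	}
	wg.Wait()
	if firstErr != nil {
		return nil, firstErr
	}
	return &Response{CommitIndex: maxIndex}, nil
}

func main() {
	groups := []*ShardGroup{
		{Commit: func([]*Request) (*Response, error) { return &Response{CommitIndex: 7}, nil }},
		{Commit: func([]*Request) (*Response, error) { return &Response{CommitIndex: 42}, nil }},
	}
	r, err := broadcastToAllGroups(groups, nil)
	fmt.Println(r.CommitIndex, err) // prints: 42 <nil>
}
```

With all Commit calls in flight at once, end-to-end latency drops from O(N_shards * raft_latency) to roughly O(raft_latency), at the cost of a mutex around the aggregated state.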
Extract broadcastToAllGroups to dispatch Commit calls concurrently using goroutines, reducing latency from O(N_shards * raft_latency) to O(raft_latency). Also resolves cyclop complexity warning by splitting validation and broadcast into separate functions.
/gemini review
Code Review
This pull request introduces support for DEL_PREFIX operations within the sharded coordinator. Because prefix deletions can span multiple shards, the implementation adds logic to broadcast these requests to all shard groups in parallel and aggregate the resulting commit indices. The changes include validation to ensure DEL_PREFIX is not mixed with other operations or used within transactions, along with comprehensive unit and integration tests. Feedback was provided regarding the timestamping logic in dispatchDelPrefixBroadcast, where assigning unique timestamps to each element in a single batch may violate MVCC consistency requirements.
func (c *ShardedCoordinator) dispatchDelPrefixBroadcast(isTxn bool, elems []*Elem[OP]) (*CoordinateResponse, error) {
	if isTxn {
		return nil, errors.Wrap(ErrInvalidRequest, "DEL_PREFIX not supported in transactions")
	}
	if err := validateDelPrefixOnly(elems); err != nil {
		return nil, err
	}

	requests := make([]*pb.Request, 0, len(elems))
	for _, elem := range elems {
		requests = append(requests, &pb.Request{
			IsTxn: false,
			Phase: pb.Phase_NONE,
			Ts: c.clock.Next(),
			Mutations: []*pb.Mutation{elemToMutation(elem)},
		})
	}

	return c.broadcastToAllGroups(requests)
}
In dispatchDelPrefixBroadcast, each DEL_PREFIX element in a batch is currently assigned a unique timestamp via c.clock.Next(). This violates the requirement that all operations within a single transaction in an MVCC system must share the same start timestamp to ensure atomicity and a consistent snapshot view. Please update the implementation to use a single timestamp for all operations in the batch to maintain atomicity.
References
- In an MVCC system, all operations within a single transaction must share the same start timestamp to ensure atomicity and a consistent snapshot view.
All elements in a DEL_PREFIX batch now share one timestamp to ensure atomicity and consistent MVCC snapshot view across shards.
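The shape of that fix is to hoist the single clock read out of the element loop. A minimal sketch with stand-in types (`Mutation`, `Request`, and the `next` clock are illustrative placeholders, not the repository's HLC or pb types):

```go
package main

import "fmt"

type Mutation struct{ Key string }
type Request struct {
	Ts        uint64
	Mutations []*Mutation
}

var now uint64

// next simulates an HLC clock read; each call returns a new timestamp.
func next() uint64 { now++; return now }

// buildBatch assigns one shared timestamp to every request in the batch,
// giving all shards a consistent MVCC snapshot view of the deletion.
func buildBatch(keys []string) []*Request {
	ts := next() // single clock read, hoisted out of the loop
	reqs := make([]*Request, 0, len(keys))
	for _, k := range keys {
		reqs = append(reqs, &Request{Ts: ts, Mutations: []*Mutation{{Key: k}}})
	}
	return reqs
}

func main() {
	reqs := buildBatch([]string{"a:", "b:", "c:"})
	fmt.Println(reqs[0].Ts == reqs[2].Ts) // prints: true
}
```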
/gemini reiview |
It looks like you made a small typo in your command (/gemini reiview).
/gemini reiview |
It appears you are still using the command /gemini reiview.
/gemini review |
Code Review
This pull request introduces support for the DEL_PREFIX operation in the ShardedCoordinator. Since DEL_PREFIX can span multiple shards, the implementation broadcasts the operation to all shard groups in parallel and returns the maximum commit index. The changes include validation logic to prevent mixing DEL_PREFIX with other operations or using it within transactions. Additionally, a comprehensive suite of unit and integration tests has been added to verify the broadcast behavior, error handling, and preservation of internal transaction metadata. I have no feedback to provide.
/gemini review
Code Review
This pull request introduces DEL_PREFIX support in the ShardedCoordinator, broadcasting the operation to all shard groups to handle keys spanning multiple shards. It includes validation to prohibit mixing DEL_PREFIX with other operations or using it in transactions. Feedback was provided to address a potential nil pointer dereference in the broadcast loop when accessing shard group transactions.
for _, g := range c.groups {
	wg.Add(1)
	go func(g *ShardGroup) {
The loop iterates over c.groups and starts a goroutine for each group. However, there is no check to ensure that the ShardGroup pointer g or its Txn field are non-nil. If any entry in the map is nil or has a nil transaction manager, the goroutine will panic when calling g.Txn.Commit. While the coordinator is typically initialized with valid groups, adding a defensive check here improves robustness and aligns with the nil checks performed in other methods like txnGroupForID and commitSecondaryTxns.
for gid, g := range c.groups {
	if g == nil || g.Txn == nil {
		errMu.Lock()
		if firstErr == nil {
			firstErr = errors.Wrapf(ErrInvalidRequest, "group %d has no transaction manager", gid)
		}
		errMu.Unlock()
		continue
	}
	wg.Add(1)
	go func(g *ShardGroup) {