97 changes: 97 additions & 0 deletions kv/sharded_coordinator.go
@@ -5,6 +5,8 @@ import (
	"context"
	"log/slog"
	"slices"
	"sync"
	"sync/atomic"
	"time"

	"github.com/bootjp/elastickv/distribution"
@@ -68,6 +70,13 @@ func (c *ShardedCoordinator) Dispatch(ctx context.Context, reqs *OperationGroup[
		return nil, err
	}

	// DEL_PREFIX cannot be routed to a single shard because the prefix may
	// span multiple shards (or be nil, meaning "all keys"). Broadcast the
	// operation to every shard group so each FSM scans locally.
	if hasDelPrefixElem(reqs.Elems) {
		return c.dispatchDelPrefixBroadcast(reqs.IsTxn, reqs.Elems)
	}
Comment on lines +76 to +78
high

Broadcasting an entire OperationGroup that contains mixed operation types (e.g., PUT and DEL_PREFIX) is problematic for two reasons:

  1. Data Loss: The kvFSM.handleRawRequest implementation (in fsm.go) only processes the first DEL_PREFIX mutation it finds and ignores all other mutations in the same request. Thus, any PUT or DEL operations in the same group would be lost.
  2. Transaction Bypass: DEL_PREFIX is not supported within transactions by the FSM. This broadcast logic currently intercepts transactions before they are validated, potentially bypassing intended transactional semantics.

You should ensure that DEL_PREFIX is only allowed in non-transactional groups and that it is not mixed with other operation types.

	if hasDelPrefixElem(reqs.Elems) {
		if reqs.IsTxn {
			return nil, errors.Wrapf(ErrInvalidRequest, "DEL_PREFIX not supported in transactions")
		}
		for _, e := range reqs.Elems {
			if e.Op != DelPrefix {
				return nil, errors.Wrapf(ErrInvalidRequest, "mixed DEL_PREFIX and other operations not supported")
			}
		}
		return c.dispatchDelPrefixBroadcast(reqs.Elems)
	}


	if reqs.IsTxn && reqs.StartTS == 0 {
		startTS, err := c.nextStartTS(ctx, reqs.Elems)
		if err != nil {
@@ -97,6 +106,94 @@ func (c *ShardedCoordinator) Dispatch(ctx context.Context, reqs *OperationGroup[
	return &CoordinateResponse{CommitIndex: r.CommitIndex}, nil
}

// hasDelPrefixElem returns true if any element is a DelPrefix operation.
func hasDelPrefixElem(elems []*Elem[OP]) bool {
	for _, e := range elems {
		if e != nil && e.Op == DelPrefix {
			return true
		}
	}
	return false
}

// validateDelPrefixOnly ensures all elements are DelPrefix operations.
// Mixing DEL_PREFIX with other operations (PUT, DEL) in a single dispatch is
// not allowed because the FSM handles DEL_PREFIX exclusively.
func validateDelPrefixOnly(elems []*Elem[OP]) error {
	for _, e := range elems {
		if e != nil && e.Op != DelPrefix {
			return errors.Wrap(ErrInvalidRequest, "DEL_PREFIX cannot be mixed with other operations")
		}
	}
	return nil
}

// dispatchDelPrefixBroadcast validates and broadcasts DEL_PREFIX operations
// to every shard group. Each element becomes a separate pb.Request (the FSM's
// extractDelPrefix processes only the first DEL_PREFIX mutation per request).
// All requests are batched into a single Commit call per shard group.
func (c *ShardedCoordinator) dispatchDelPrefixBroadcast(isTxn bool, elems []*Elem[OP]) (*CoordinateResponse, error) {
	if isTxn {
		return nil, errors.Wrap(ErrInvalidRequest, "DEL_PREFIX not supported in transactions")
	}
	if err := validateDelPrefixOnly(elems); err != nil {
		return nil, err
	}

	ts := c.clock.Next()
	requests := make([]*pb.Request, 0, len(elems))
	for _, elem := range elems {
		requests = append(requests, &pb.Request{
			IsTxn:     false,
			Phase:     pb.Phase_NONE,
			Ts:        ts,
			Mutations: []*pb.Mutation{elemToMutation(elem)},
		})
	}

	return c.broadcastToAllGroups(requests)
}
Comment on lines +135 to +155
high

In dispatchDelPrefixBroadcast, each DEL_PREFIX element in a batch is currently assigned a unique timestamp via c.clock.Next(). This violates the requirement that all operations within a single transaction in an MVCC system must share the same start timestamp to ensure atomicity and a consistent snapshot view. Please update the implementation to use a single timestamp for all operations in the batch to maintain atomicity.

References
  1. In an MVCC system, all operations within a single transaction must share the same start timestamp to ensure atomicity and a consistent snapshot view.
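The fix this comment asks for amounts to drawing the timestamp once per batch instead of once per element. A minimal sketch, where the `hlc` clock and `request` type are hypothetical stand-ins for `c.clock` and `pb.Request`:

```go
package main

import "fmt"

// hlc is a hypothetical monotonic clock standing in for c.clock.
type hlc struct{ ts uint64 }

func (c *hlc) Next() uint64 { c.ts++; return c.ts }

// request stands in for pb.Request; only the timestamp matters here.
type request struct{ Ts uint64 }

// buildBatch stamps every request with one shared timestamp, drawn once
// before the loop, so the whole batch sees a single snapshot.
func buildBatch(c *hlc, n int) []request {
	ts := c.Next() // one draw per batch, not per element
	reqs := make([]request, 0, n)
	for i := 0; i < n; i++ {
		reqs = append(reqs, request{Ts: ts})
	}
	return reqs
}

func main() {
	c := &hlc{}
	batch := buildBatch(c, 3)
	fmt.Println(batch[0].Ts == batch[1].Ts && batch[1].Ts == batch[2].Ts) // prints true
}
```

This matches the committed version above, which hoists `ts := c.clock.Next()` out of the request-building loop.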


// broadcastToAllGroups sends the same set of requests to every shard group in
// parallel and returns the maximum commit index.
func (c *ShardedCoordinator) broadcastToAllGroups(requests []*pb.Request) (*CoordinateResponse, error) {
	var (
		maxIndex atomic.Uint64
		firstErr error
		errMu    sync.Mutex
		wg       sync.WaitGroup
	)
	for _, g := range c.groups {
		wg.Add(1)
		go func(g *ShardGroup) {
Comment on lines +166 to +168
medium

The loop iterates over c.groups and starts a goroutine for each group. However, there is no check to ensure that the ShardGroup pointer g or its Txn field are non-nil. If any entry in the map is nil or has a nil transaction manager, the goroutine will panic when calling g.Txn.Commit. While the coordinator is typically initialized with valid groups, adding a defensive check here improves robustness and aligns with the nil checks performed in other methods like txnGroupForID and commitSecondaryTxns.

	for gid, g := range c.groups {
		if g == nil || g.Txn == nil {
			errMu.Lock()
			if firstErr == nil {
				firstErr = errors.Wrapf(ErrInvalidRequest, "group %d has no transaction manager", gid)
			}
			errMu.Unlock()
			continue
		}
		wg.Add(1)
		go func(g *ShardGroup) {

			defer wg.Done()
			r, err := g.Txn.Commit(requests)
			if err != nil {
				errMu.Lock()
				if firstErr == nil {
					firstErr = err
				}
				errMu.Unlock()
				return
			}
			if r != nil {
				for {
					cur := maxIndex.Load()
					if r.CommitIndex <= cur || maxIndex.CompareAndSwap(cur, r.CommitIndex) {
						break
					}
				}
			}
		}(g)
	}
Comment on lines +166 to +188
medium

The broadcast of DEL_PREFIX operations to all shard groups is performed sequentially. In a sharded environment with a significant number of shard groups, this will lead to high latency (O(N_shards * Raft_latency)). Since ShardedCoordinator is used in production, this sequential execution could easily exceed client timeouts for operations like FLUSHALL (nil prefix). Consider parallelizing these commits using goroutines and a sync.WaitGroup or similar mechanism to improve performance.

	wg.Wait()

	if firstErr != nil {
		return nil, errors.WithStack(firstErr)
	}
	return &CoordinateResponse{CommitIndex: maxIndex.Load()}, nil
}

func (c *ShardedCoordinator) dispatchTxn(startTS uint64, commitTS uint64, elems []*Elem[OP]) (*CoordinateResponse, error) {
	grouped, gids, err := c.groupMutations(elems)
	if err != nil {