Skip to content
10 changes: 9 additions & 1 deletion go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,15 @@ func NewMigrator(context *base.MigrationContext, appVersion string) *Migrator {
ghostTableMigrated: make(chan bool),
firstThrottlingCollected: make(chan bool, 3),
rowCopyComplete: make(chan error),
allEventsUpToLockProcessed: make(chan *lockProcessedStruct),
// Buffered to MaxRetries() to prevent a deadlock when waitForEventsUpToLock times
// out. The sentinel applyEventFunc may still be queued in applyEventsQueue when the
// timeout fires; when the worker eventually executes it, it sends on this channel
// with no active receiver. An unbuffered channel would block the worker permanently:
// the queue fills, the listener goroutine stalls, heartbeat lag grows unboundedly,
// and no further cutover attempts are made. With a buffer sized to the retry limit
// the send always completes immediately. Stale sentinels accumulate in the buffer
// and are discarded by the stale-skip loop in waitForEventsUpToLock.
allEventsUpToLockProcessed: make(chan *lockProcessedStruct, context.MaxRetries()),

copyRowsQueue: make(chan tableWriteFunc),
applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize),
Expand Down
Loading