diff --git a/internal/fslock/fslock.go b/internal/fslock/fslock.go index 99a25c6b..cd6f330a 100644 --- a/internal/fslock/fslock.go +++ b/internal/fslock/fslock.go @@ -20,11 +20,6 @@ import "os" // Lock represents an acquired advisory file lock. type Lock struct { - f *os.File -} - -// Release releases the lock and closes the file. The lock file is left on -// disk so that concurrent waiters do not race on a deleted path. -func (l Lock) Release() error { - return l.f.Close() + f *os.File + path string } diff --git a/internal/fslock/fslock_test.go b/internal/fslock/fslock_test.go index 423f3d8a..629d229b 100644 --- a/internal/fslock/fslock_test.go +++ b/internal/fslock/fslock_test.go @@ -15,7 +15,9 @@ package fslock_test import ( + "os" "path/filepath" + "sync" "testing" "time" @@ -48,6 +50,60 @@ func TestAcquireTwiceSequential(t *testing.T) { lock2.Release() } +func TestReleaseRemovesFile(t *testing.T) { + path := filepath.Join(t.TempDir(), "test.lock") + lock, err := fslock.Acquire(path, 5*time.Second) + if err != nil { + t.Fatalf("Acquire: %v", err) + } + if _, err := os.Stat(path); err != nil { + t.Fatalf("lock file missing while held: %v", err) + } + if err := lock.Release(); err != nil { + t.Fatalf("Release: %v", err) + } + if _, err := os.Stat(path); !os.IsNotExist(err) { + t.Fatalf("lock file still on disk after Release: stat err=%v", err) + } +} + +func TestConcurrentAcquireIsExclusive(t *testing.T) { + path := filepath.Join(t.TempDir(), "test.lock") + const workers = 8 + var ( + wg sync.WaitGroup + mu sync.Mutex + holding int + maxSeen int + ) + for range workers { + wg.Go(func() { + lock, err := fslock.Acquire(path, 10*time.Second) + if err != nil { + t.Errorf("Acquire: %v", err) + return + } + mu.Lock() + holding++ + if holding > maxSeen { + maxSeen = holding + } + mu.Unlock() + time.Sleep(10 * time.Millisecond) + mu.Lock() + holding-- + mu.Unlock() + if err := lock.Release(); err != nil { + t.Errorf("Release: %v", err) + } + }) + } + wg.Wait() + if maxSeen != 1 { + t.Fatalf("mutual exclusion violated: saw %d concurrent holders", maxSeen) + } +} + func TestAcquireTimeout(t *testing.T) { path := filepath.Join(t.TempDir(), "test.lock") lock1, err := fslock.Acquire(path, 5*time.Second) diff --git a/internal/fslock/fslock_unix.go b/internal/fslock/fslock_unix.go index 52c3f4c2..51246c68 100644 --- a/internal/fslock/fslock_unix.go +++ b/internal/fslock/fslock_unix.go @@ -17,6 +17,7 @@ package fslock import ( + "errors" "fmt" "os" "syscall" @@ -26,21 +27,89 @@ import ( // Acquire acquires an exclusive advisory lock on the file at path, retrying // until timeout elapses. Returns an error if the lock cannot be acquired. func Acquire(path string, timeout time.Duration) (Lock, error) { - f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0600) - if err != nil { - return Lock{}, fmt.Errorf("fslock: open %s: %w", path, err) + deadline := time.Now().Add(timeout) + for { + f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0600) + if err != nil { + return Lock{}, fmt.Errorf("fslock: open %s: %w", path, err) + } + + lock, err := lockFile(f, path, deadline) + if err == nil { + return lock, nil + } + f.Close() + if errors.Is(err, errStaleInode) { + if time.Now().Before(deadline) { + // Previous holder unlinked the file between our open and + // our flock; the path now refers to a different inode. + // Reopen and try again within the remaining budget. + continue + } + return Lock{}, fmt.Errorf("fslock: could not acquire lock on %s within %s: %w", path, timeout, err) + } + return Lock{}, err } +} - deadline := time.Now().Add(timeout) +// errStaleInode signals that the opened fd refers to an inode that has been +// unlinked (or replaced) since we opened it — we need to reopen and retry. +var errStaleInode = errors.New("fslock: stale inode") + +func lockFile(f *os.File, path string, deadline time.Time) (Lock, error) { for { - err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) + err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) if err == nil { - return Lock{f: f}, nil + // Confirm the path still points at our inode. If the previous + // holder unlinked it (or it was replaced), the lock we just took + // is on a dead inode and a new caller could lock the new file. + if stale, serr := inodeIsStale(f, path); serr != nil { + syscall.Flock(int(f.Fd()), syscall.LOCK_UN) + return Lock{}, fmt.Errorf("fslock: stat %s: %w", path, serr) + } else if stale { + syscall.Flock(int(f.Fd()), syscall.LOCK_UN) + return Lock{}, errStaleInode + } + return Lock{f: f, path: path}, nil } if time.Now().After(deadline) { - f.Close() - return Lock{}, fmt.Errorf("fslock: could not acquire lock on %s within %s: %w", path, timeout, err) + return Lock{}, fmt.Errorf("fslock: could not acquire lock on %s: %w", path, err) } time.Sleep(50 * time.Millisecond) } } + +func inodeIsStale(f *os.File, path string) (bool, error) { + var fdStat syscall.Stat_t + if err := syscall.Fstat(int(f.Fd()), &fdStat); err != nil { + return false, err + } + var pathStat syscall.Stat_t + if err := syscall.Stat(path, &pathStat); err != nil { + if errors.Is(err, syscall.ENOENT) { + return true, nil + } + return false, err + } + return fdStat.Ino != pathStat.Ino || fdStat.Dev != pathStat.Dev, nil +} + +// Release unlinks the lock file and releases the lock. Unlinking before +// closing ensures no new caller can open the same inode and take the lock +// while we still hold it; combined with the inode recheck in Acquire, this +// guarantees that a fresh file at the same path is never raced against a +// soon-to-be-deleted one. +func (l Lock) Release() error { + if l.f == nil { + return nil + } + rmErr := os.Remove(l.path) + closeErr := l.f.Close() + if closeErr != nil { + return closeErr + } + if rmErr != nil && !errors.Is(rmErr, os.ErrNotExist) { + return rmErr + } + return nil +} diff --git a/internal/fslock/fslock_windows.go b/internal/fslock/fslock_windows.go index f4050261..123fd7e2 100644 --- a/internal/fslock/fslock_windows.go +++ b/internal/fslock/fslock_windows.go @@ -17,6 +17,7 @@ package fslock import ( + "errors" "fmt" "os" "time" @@ -39,7 +40,7 @@ func Acquire(path string, timeout time.Duration) (Lock, error) { windows.LOCKFILE_EXCLUSIVE_LOCK|windows.LOCKFILE_FAIL_IMMEDIATELY, 0, 1, 0, ol) if err == nil { - return Lock{f: f}, nil + return Lock{f: f, path: path}, nil } if time.Now().After(deadline) { f.Close() @@ -48,3 +49,33 @@ func Acquire(path string, timeout time.Duration) (Lock, error) { time.Sleep(50 * time.Millisecond) } } + +// Release releases the lock and removes the lock file. On Windows, Go opens +// files without FILE_SHARE_DELETE, so os.Remove returns ERROR_SHARING_VIOLATION +// if another process still has the file open — making the delete inherently +// safe. We close first so our own handle doesn't block the remove, then +// swallow only ERROR_SHARING_VIOLATION (deferred cleanup) and ErrNotExist. +// Other errors, including ERROR_ACCESS_DENIED, propagate. +func (l Lock) Release() error { + if l.f == nil { + return nil + } + if err := l.f.Close(); err != nil { + return err + } + if err := os.Remove(l.path); err != nil && !isBenignRemoveErr(err) { + return err + } + return nil +} + +func isBenignRemoveErr(err error) bool { + // ERROR_SHARING_VIOLATION is the unambiguous "another handle is open + // without FILE_SHARE_DELETE" case — cleanup is simply deferred to + // whoever closes last. ERROR_ACCESS_DENIED is *not* safe to swallow: + // Windows returns it for read-only attributes, ACL changes, or when + // the path has been replaced with a directory, none of which should + // be reported as a successful Release. + return errors.Is(err, os.ErrNotExist) || + errors.Is(err, windows.ERROR_SHARING_VIOLATION) +} diff --git a/internal/fslock/fslock_windows_test.go b/internal/fslock/fslock_windows_test.go new file mode 100644 index 00000000..845078c1 --- /dev/null +++ b/internal/fslock/fslock_windows_test.go @@ -0,0 +1,47 @@ +// Copyright 2026 Columnar Technologies Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build windows + +package fslock + +import ( + "errors" + "io/fs" + "os" + "testing" + + "golang.org/x/sys/windows" +) + +func TestIsBenignRemoveErr(t *testing.T) { + cases := []struct { + name string + err error + benign bool + }{ + {"sharing violation", &fs.PathError{Op: "remove", Err: windows.ERROR_SHARING_VIOLATION}, true}, + {"not exist", os.ErrNotExist, true}, + {"wrapped not exist", &fs.PathError{Op: "remove", Err: os.ErrNotExist}, true}, + {"access denied", &fs.PathError{Op: "remove", Err: windows.ERROR_ACCESS_DENIED}, false}, + {"generic io error", errors.New("boom"), false}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if got := isBenignRemoveErr(tc.err); got != tc.benign { + t.Fatalf("isBenignRemoveErr(%v) = %v, want %v", tc.err, got, tc.benign) + } + }) + } +}