Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions internal/fslock/fslock.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ import "os"

// Lock represents an acquired advisory file lock.
type Lock struct {
f *os.File
}

// Release releases the lock and closes the file. The lock file is left on
// disk so that concurrent waiters do not race on a deleted path.
func (l Lock) Release() error {
return l.f.Close()
f *os.File
path string
}
56 changes: 56 additions & 0 deletions internal/fslock/fslock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package fslock_test

import (
"os"
"path/filepath"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -48,6 +50,60 @@ func TestAcquireTwiceSequential(t *testing.T) {
lock2.Release()
}

func TestReleaseRemovesFile(t *testing.T) {
path := filepath.Join(t.TempDir(), "test.lock")
lock, err := fslock.Acquire(path, 5*time.Second)
if err != nil {
t.Fatalf("Acquire: %v", err)
}
if _, err := os.Stat(path); err != nil {
t.Fatalf("lock file missing while held: %v", err)
}
if err := lock.Release(); err != nil {
t.Fatalf("Release: %v", err)
}
if _, err := os.Stat(path); !os.IsNotExist(err) {
t.Fatalf("lock file still on disk after Release: stat err=%v", err)
}
}

func TestConcurrentAcquireIsExclusive(t *testing.T) {
path := filepath.Join(t.TempDir(), "test.lock")
const workers = 8
var (
wg sync.WaitGroup
mu sync.Mutex
holding int
maxSeen int
)
for range workers {
wg.Go(func() {
lock, err := fslock.Acquire(path, 10*time.Second)
if err != nil {
t.Errorf("Acquire: %v", err)
return
}
mu.Lock()
holding++
if holding > maxSeen {
maxSeen = holding
}
mu.Unlock()
time.Sleep(10 * time.Millisecond)
mu.Lock()
holding--
mu.Unlock()
if err := lock.Release(); err != nil {
t.Errorf("Release: %v", err)
}
})
}
wg.Wait()
if maxSeen != 1 {
t.Fatalf("mutual exclusion violated: saw %d concurrent holders", maxSeen)
}
}

func TestAcquireTimeout(t *testing.T) {
path := filepath.Join(t.TempDir(), "test.lock")
lock1, err := fslock.Acquire(path, 5*time.Second)
Expand Down
85 changes: 77 additions & 8 deletions internal/fslock/fslock_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package fslock

import (
"errors"
"fmt"
"os"
"syscall"
Expand All @@ -26,21 +27,89 @@ import (
// Acquire acquires an exclusive advisory lock on the file at path, retrying
// until timeout elapses. Returns an error if the lock cannot be acquired.
func Acquire(path string, timeout time.Duration) (Lock, error) {
f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
return Lock{}, fmt.Errorf("fslock: open %s: %w", path, err)
deadline := time.Now().Add(timeout)
for {
f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
return Lock{}, fmt.Errorf("fslock: open %s: %w", path, err)
}

lock, err := lockFile(f, path, deadline)
if err == nil {
return lock, nil
}
f.Close()
if errors.Is(err, errStaleInode) {
if time.Now().Before(deadline) {
// Previous holder unlinked the file between our open and
// our flock; the path now refers to a different inode.
// Reopen and try again within the remaining budget.
continue
}
return Lock{}, fmt.Errorf("fslock: could not acquire lock on %s within %s: %w", path, timeout, err)
}
return Lock{}, err
}
}

deadline := time.Now().Add(timeout)
// errStaleInode signals that the opened fd refers to an inode that has been
// unlinked (or replaced) since we opened it — we need to reopen and retry.
var errStaleInode = errors.New("fslock: stale inode")

func lockFile(f *os.File, path string, deadline time.Time) (Lock, error) {
for {
err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
if err == nil {
return Lock{f: f}, nil
// Confirm the path still points at our inode. If the previous
// holder unlinked it (or it was replaced), the lock we just took
// is on a dead inode and a new caller could lock the new file.
if stale, serr := inodeIsStale(f, path); serr != nil {
syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
return Lock{}, fmt.Errorf("fslock: stat %s: %w", path, serr)
} else if stale {
syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
return Lock{}, errStaleInode
}
return Lock{f: f, path: path}, nil
}
if time.Now().After(deadline) {
f.Close()
return Lock{}, fmt.Errorf("fslock: could not acquire lock on %s within %s: %w", path, timeout, err)
return Lock{}, fmt.Errorf("fslock: could not acquire lock on %s: %w", path, err)
}
time.Sleep(50 * time.Millisecond)
}
}

func inodeIsStale(f *os.File, path string) (bool, error) {
var fdStat syscall.Stat_t
if err := syscall.Fstat(int(f.Fd()), &fdStat); err != nil {
return false, err
}
var pathStat syscall.Stat_t
if err := syscall.Stat(path, &pathStat); err != nil {
if errors.Is(err, syscall.ENOENT) {
return true, nil
}
return false, err
}
return fdStat.Ino != pathStat.Ino || fdStat.Dev != pathStat.Dev, nil
}

// Release unlinks the lock file and releases the lock. Unlinking before
// closing ensures no new caller can open the same inode and take the lock
// while we still hold it; combined with the inode recheck in Acquire, this
// guarantees that a fresh file at the same path is never raced against a
// soon-to-be-deleted one.
func (l Lock) Release() error {
if l.f == nil {
return nil
}
rmErr := os.Remove(l.path)
closeErr := l.f.Close()
if closeErr != nil {
return closeErr
}
if rmErr != nil && !errors.Is(rmErr, os.ErrNotExist) {
return rmErr
}
return nil
}
33 changes: 32 additions & 1 deletion internal/fslock/fslock_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package fslock

import (
"errors"
"fmt"
"os"
"time"
Expand All @@ -39,7 +40,7 @@ func Acquire(path string, timeout time.Duration) (Lock, error) {
windows.LOCKFILE_EXCLUSIVE_LOCK|windows.LOCKFILE_FAIL_IMMEDIATELY,
0, 1, 0, ol)
if err == nil {
return Lock{f: f}, nil
return Lock{f: f, path: path}, nil
}
if time.Now().After(deadline) {
f.Close()
Expand All @@ -48,3 +49,33 @@ func Acquire(path string, timeout time.Duration) (Lock, error) {
time.Sleep(50 * time.Millisecond)
}
}

// Release releases the lock and removes the lock file. On Windows, Go opens
// files without FILE_SHARE_DELETE, so os.Remove returns ERROR_SHARING_VIOLATION
// if another process still has the file open — making the delete inherently
// safe. We close first so our own handle doesn't block the remove, then
// swallow only ERROR_SHARING_VIOLATION (deferred cleanup) and ErrNotExist.
// Other errors, including ERROR_ACCESS_DENIED, propagate.
func (l Lock) Release() error {
if l.f == nil {
return nil
}
if err := l.f.Close(); err != nil {
return err
}
if err := os.Remove(l.path); err != nil && !isBenignRemoveErr(err) {
return err
}
return nil
}

func isBenignRemoveErr(err error) bool {
// ERROR_SHARING_VIOLATION is the unambiguous "another handle is open
// without FILE_SHARE_DELETE" case — cleanup is simply deferred to
// whoever closes last. ERROR_ACCESS_DENIED is *not* safe to swallow:
// Windows returns it for read-only attributes, ACL changes, or when
// the path has been replaced with a directory, none of which should
// be reported as a successful Release.
return errors.Is(err, os.ErrNotExist) ||
errors.Is(err, windows.ERROR_SHARING_VIOLATION)
}
47 changes: 47 additions & 0 deletions internal/fslock/fslock_windows_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2026 Columnar Technologies Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build windows

package fslock

import (
"errors"
"io/fs"
"os"
"testing"

"golang.org/x/sys/windows"
)

func TestIsBenignRemoveErr(t *testing.T) {
cases := []struct {
name string
err error
benign bool
}{
{"sharing violation", &fs.PathError{Op: "remove", Err: windows.ERROR_SHARING_VIOLATION}, true},
{"not exist", os.ErrNotExist, true},
{"wrapped not exist", &fs.PathError{Op: "remove", Err: os.ErrNotExist}, true},
{"access denied", &fs.PathError{Op: "remove", Err: windows.ERROR_ACCESS_DENIED}, false},
{"generic io error", errors.New("boom"), false},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
if got := isBenignRemoveErr(tc.err); got != tc.benign {
t.Fatalf("isBenignRemoveErr(%v) = %v, want %v", tc.err, got, tc.benign)
}
})
}
}
Loading