Skip to content

Commit 1c68cd7

Browse files
committed
Merge remote-tracking branch 'upstream/multi-attach-staleness' into release/4.0.0
2 parents dac4de8 + f7ffaa4 commit 1c68cd7

2 files changed

Lines changed: 322 additions & 5 deletions

File tree

driver/controller.go

Lines changed: 63 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -398,6 +398,11 @@ func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest)
398398
return nil, status.Error(codes.InvalidArgument, "DeleteVolume Volume ID must be provided")
399399
}
400400

401+
if acquired := d.volumeLocks.TryAcquire(req.VolumeId); !acquired {
402+
return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", req.VolumeId)
403+
}
404+
defer d.volumeLocks.Release(req.VolumeId)
405+
401406
ll := d.log.WithFields(logrus.Fields{
402407
"volume_id": req.VolumeId,
403408
"method": "delete_volume",
@@ -461,17 +466,58 @@ func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.Controlle
461466
return nil, status.Error(codes.AlreadyExists, "read only Volumes are not supported")
462467
}
463468

469+
if acquired := d.volumeLocks.TryAcquire(req.VolumeId); !acquired {
470+
return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", req.VolumeId)
471+
}
472+
defer d.volumeLocks.Release(req.VolumeId)
473+
464474
ll := d.log.WithFields(logrus.Fields{
465475
"volume_id": req.VolumeId,
466476
"node_id": req.NodeId,
467477
"method": "controller_publish_volume",
468478
})
469479
ll.Info("controller publish volume called")
470480

481+
// Check current attachment state before modifying it. This prevents
482+
// silently moving a volume that is still attached to another node,
483+
// which would cause a stale VolumeAttachment and Multi-Attach errors.
484+
volume, err := d.cloudscaleClient.Volumes.Get(ctx, req.VolumeId)
485+
if err != nil {
486+
return nil, reraiseNotFound(err, ll, "fetch volume for publish")
487+
}
488+
489+
if volume.ServerUUIDs != nil && len(*volume.ServerUUIDs) > 0 {
490+
alreadyAttachedToRequestedNode := false
491+
for _, serverUUID := range *volume.ServerUUIDs {
492+
if serverUUID == req.NodeId {
493+
alreadyAttachedToRequestedNode = true
494+
break
495+
}
496+
}
497+
498+
if alreadyAttachedToRequestedNode {
499+
ll.Info("volume is already attached to the requested node")
500+
return &csi.ControllerPublishVolumeResponse{
501+
PublishContext: map[string]string{
502+
PublishInfoVolumeName: volume.Name,
503+
LuksEncryptedAttribute: req.VolumeContext[LuksEncryptedAttribute],
504+
LuksCipherAttribute: req.VolumeContext[LuksCipherAttribute],
505+
LuksKeySizeAttribute: req.VolumeContext[LuksKeySizeAttribute],
506+
},
507+
}, nil
508+
}
509+
510+
ll.WithField("current_server_uuids", *volume.ServerUUIDs).
511+
Warn("volume is already attached to a different node")
512+
return nil, status.Errorf(codes.FailedPrecondition,
513+
"volume %s is already attached to server(s) %v, must be detached first",
514+
req.VolumeId, *volume.ServerUUIDs)
515+
}
516+
471517
attachRequest := &cloudscale.VolumeUpdateRequest{
472518
ServerUUIDs: &[]string{req.NodeId},
473519
}
474-
err := d.cloudscaleClient.Volumes.Update(ctx, req.VolumeId, attachRequest)
520+
err = d.cloudscaleClient.Volumes.Update(ctx, req.VolumeId, attachRequest)
475521
if err != nil {
476522
if maxVolumesPerServerErrorMessageRe.MatchString(err.Error()) {
477523
return nil, status.Error(codes.ResourceExhausted, err.Error())
@@ -481,10 +527,6 @@ func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.Controlle
481527
}
482528

483529
ll.Info("volume is attached")
484-
volume, err := d.cloudscaleClient.Volumes.Get(ctx, req.VolumeId)
485-
if err != nil {
486-
return nil, reraiseNotFound(err, ll, "fetch volume")
487-
}
488530
return &csi.ControllerPublishVolumeResponse{
489531
PublishContext: map[string]string{
490532
PublishInfoVolumeName: volume.Name,
@@ -501,6 +543,11 @@ func (d *Driver) ControllerUnpublishVolume(ctx context.Context, req *csi.Control
501543
return nil, status.Error(codes.InvalidArgument, "ControllerPublishVolume Volume ID must be provided")
502544
}
503545

546+
if acquired := d.volumeLocks.TryAcquire(req.VolumeId); !acquired {
547+
return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", req.VolumeId)
548+
}
549+
defer d.volumeLocks.Release(req.VolumeId)
550+
504551
ll := d.log.WithFields(logrus.Fields{
505552
"volume_id": req.VolumeId,
506553
"node_id": req.NodeId,
@@ -710,6 +757,11 @@ func (d *Driver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequ
710757
return nil, status.Error(codes.InvalidArgument, "CreateSnapshotRequest Source Volume Id must be provided")
711758
}
712759

760+
if acquired := d.volumeLocks.TryAcquire(req.SourceVolumeId); !acquired {
761+
return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", req.SourceVolumeId)
762+
}
763+
defer d.volumeLocks.Release(req.SourceVolumeId)
764+
713765
ll := d.log.WithFields(logrus.Fields{
714766
"source_volume_id": req.SourceVolumeId,
715767
"name": req.Name,
@@ -941,6 +993,12 @@ func (d *Driver) ControllerExpandVolume(ctx context.Context, req *csi.Controller
941993
if len(volID) == 0 {
942994
return nil, status.Error(codes.InvalidArgument, "ControllerExpandVolume volume ID missing in request")
943995
}
996+
997+
if acquired := d.volumeLocks.TryAcquire(volID); !acquired {
998+
return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", volID)
999+
}
1000+
defer d.volumeLocks.Release(volID)
1001+
9441002
volume, err := d.cloudscaleClient.Volumes.Get(ctx, volID)
9451003
if err != nil {
9461004
return nil, status.Errorf(codes.Internal, "ControllerExpandVolume could not retrieve existing volume: %v", err)

driver/driver_test.go

Lines changed: 259 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1337,3 +1337,262 @@ func TestCreateVolumeFromSnapshot_Idempotent_NeedsExpansion(t *testing.T) {
13371337
assert.NoError(t, err)
13381338
assert.Equal(t, expandedSizeGiB, vol.SizeGB)
13391339
}
1340+
1341+
// TestControllerPublishVolume_RejectsWhenAttachedToDifferentNode tests that
1342+
// ControllerPublishVolume returns FailedPrecondition when the volume is
1343+
// already attached to a different node, preventing silent volume migration.
1344+
func TestControllerPublishVolume_RejectsWhenAttachedToDifferentNode(t *testing.T) {
1345+
serverA := "server-a-uuid"
1346+
serverB := "server-b-uuid"
1347+
initialServers := map[string]*cloudscale.Server{
1348+
serverA: {UUID: serverA},
1349+
serverB: {UUID: serverB},
1350+
}
1351+
cloudscaleClient := NewFakeClient(initialServers)
1352+
1353+
driver := &Driver{
1354+
endpoint: "unix:///tmp/csi-test.sock",
1355+
serverId: serverA,
1356+
zone: DefaultZone.Slug,
1357+
cloudscaleClient: cloudscaleClient,
1358+
mounter: &fakeMounter{mounted: map[string]string{}},
1359+
log: logrus.New().WithField("test_enabled", true),
1360+
volumeLocks: NewVolumeLocks(),
1361+
}
1362+
1363+
ctx := context.Background()
1364+
volumeID := createVolumeForTest(t, driver, "test-vol-multiattach")
1365+
1366+
// Attach volume to server A
1367+
_, err := driver.ControllerPublishVolume(ctx, &csi.ControllerPublishVolumeRequest{
1368+
VolumeId: volumeID,
1369+
NodeId: serverA,
1370+
VolumeCapability: &csi.VolumeCapability{
1371+
AccessMode: &csi.VolumeCapability_AccessMode{
1372+
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
1373+
},
1374+
AccessType: &csi.VolumeCapability_Mount{
1375+
Mount: &csi.VolumeCapability_MountVolume{},
1376+
},
1377+
},
1378+
})
1379+
if err != nil {
1380+
t.Fatalf("Failed to publish volume to server A: %v", err)
1381+
}
1382+
1383+
// Try to attach the same volume to server B — should be rejected
1384+
_, err = driver.ControllerPublishVolume(ctx, &csi.ControllerPublishVolumeRequest{
1385+
VolumeId: volumeID,
1386+
NodeId: serverB,
1387+
VolumeCapability: &csi.VolumeCapability{
1388+
AccessMode: &csi.VolumeCapability_AccessMode{
1389+
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
1390+
},
1391+
AccessType: &csi.VolumeCapability_Mount{
1392+
Mount: &csi.VolumeCapability_MountVolume{},
1393+
},
1394+
},
1395+
})
1396+
if err == nil {
1397+
t.Fatal("Expected FailedPrecondition error when publishing to different node, got nil")
1398+
}
1399+
1400+
st, ok := status.FromError(err)
1401+
if !ok {
1402+
t.Fatalf("Expected gRPC status error, got: %v", err)
1403+
}
1404+
if st.Code() != codes.FailedPrecondition {
1405+
t.Errorf("Expected codes.FailedPrecondition, got %v: %v", st.Code(), err)
1406+
}
1407+
1408+
// Verify the volume is still attached to server A (not silently moved)
1409+
vol, err := cloudscaleClient.Volumes.Get(ctx, volumeID)
1410+
if err != nil {
1411+
t.Fatalf("Failed to get volume: %v", err)
1412+
}
1413+
if len(*vol.ServerUUIDs) != 1 || (*vol.ServerUUIDs)[0] != serverA {
1414+
t.Errorf("Volume should still be attached to server A, got ServerUUIDs=%v", *vol.ServerUUIDs)
1415+
}
1416+
}
1417+
1418+
// TestControllerPublishVolume_IdempotentSameNode tests that calling
1419+
// ControllerPublishVolume for a volume already attached to the same node
1420+
// returns success without error.
1421+
func TestControllerPublishVolume_IdempotentSameNode(t *testing.T) {
1422+
serverA := "server-a-uuid"
1423+
initialServers := map[string]*cloudscale.Server{
1424+
serverA: {UUID: serverA},
1425+
}
1426+
cloudscaleClient := NewFakeClient(initialServers)
1427+
1428+
driver := &Driver{
1429+
endpoint: "unix:///tmp/csi-test.sock",
1430+
serverId: serverA,
1431+
zone: DefaultZone.Slug,
1432+
cloudscaleClient: cloudscaleClient,
1433+
mounter: &fakeMounter{mounted: map[string]string{}},
1434+
log: logrus.New().WithField("test_enabled", true),
1435+
volumeLocks: NewVolumeLocks(),
1436+
}
1437+
1438+
ctx := context.Background()
1439+
volumeID := createVolumeForTest(t, driver, "test-vol-idempotent")
1440+
1441+
publishReq := &csi.ControllerPublishVolumeRequest{
1442+
VolumeId: volumeID,
1443+
NodeId: serverA,
1444+
VolumeCapability: &csi.VolumeCapability{
1445+
AccessMode: &csi.VolumeCapability_AccessMode{
1446+
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
1447+
},
1448+
AccessType: &csi.VolumeCapability_Mount{
1449+
Mount: &csi.VolumeCapability_MountVolume{},
1450+
},
1451+
},
1452+
VolumeContext: map[string]string{
1453+
LuksEncryptedAttribute: "false",
1454+
},
1455+
}
1456+
1457+
// First publish
1458+
resp1, err := driver.ControllerPublishVolume(ctx, publishReq)
1459+
if err != nil {
1460+
t.Fatalf("First publish failed: %v", err)
1461+
}
1462+
1463+
// Second publish to same node — should succeed (idempotent)
1464+
resp2, err := driver.ControllerPublishVolume(ctx, publishReq)
1465+
if err != nil {
1466+
t.Fatalf("Second publish (idempotent) failed: %v", err)
1467+
}
1468+
1469+
// Both responses should have the same publish context
1470+
if resp1.PublishContext[PublishInfoVolumeName] != resp2.PublishContext[PublishInfoVolumeName] {
1471+
t.Errorf("Publish context mismatch: %v vs %v", resp1.PublishContext, resp2.PublishContext)
1472+
}
1473+
}
1474+
1475+
// TestControllerPublishVolume_SucceedsWhenNotAttached tests that
1476+
// ControllerPublishVolume works normally when the volume is not attached.
1477+
func TestControllerPublishVolume_SucceedsWhenNotAttached(t *testing.T) {
1478+
serverA := "server-a-uuid"
1479+
initialServers := map[string]*cloudscale.Server{
1480+
serverA: {UUID: serverA},
1481+
}
1482+
cloudscaleClient := NewFakeClient(initialServers)
1483+
1484+
driver := &Driver{
1485+
endpoint: "unix:///tmp/csi-test.sock",
1486+
serverId: serverA,
1487+
zone: DefaultZone.Slug,
1488+
cloudscaleClient: cloudscaleClient,
1489+
mounter: &fakeMounter{mounted: map[string]string{}},
1490+
log: logrus.New().WithField("test_enabled", true),
1491+
volumeLocks: NewVolumeLocks(),
1492+
}
1493+
1494+
ctx := context.Background()
1495+
volumeID := createVolumeForTest(t, driver, "test-vol-attach")
1496+
1497+
resp, err := driver.ControllerPublishVolume(ctx, &csi.ControllerPublishVolumeRequest{
1498+
VolumeId: volumeID,
1499+
NodeId: serverA,
1500+
VolumeCapability: &csi.VolumeCapability{
1501+
AccessMode: &csi.VolumeCapability_AccessMode{
1502+
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
1503+
},
1504+
AccessType: &csi.VolumeCapability_Mount{
1505+
Mount: &csi.VolumeCapability_MountVolume{},
1506+
},
1507+
},
1508+
})
1509+
if err != nil {
1510+
t.Fatalf("Publish failed: %v", err)
1511+
}
1512+
1513+
if resp.PublishContext[PublishInfoVolumeName] == "" {
1514+
t.Error("Expected non-empty volume name in publish context")
1515+
}
1516+
1517+
// Verify volume is attached to the server
1518+
vol, err := cloudscaleClient.Volumes.Get(ctx, volumeID)
1519+
if err != nil {
1520+
t.Fatalf("Failed to get volume: %v", err)
1521+
}
1522+
if len(*vol.ServerUUIDs) != 1 || (*vol.ServerUUIDs)[0] != serverA {
1523+
t.Errorf("Expected volume attached to server A, got ServerUUIDs=%v", *vol.ServerUUIDs)
1524+
}
1525+
}
1526+
1527+
// TestControllerOperations_VolumeLocks tests that concurrent controller
1528+
// operations on the same volume are properly serialized with volume locks.
1529+
func TestControllerOperations_VolumeLocks(t *testing.T) {
1530+
driver := createDriverForTest(t)
1531+
ctx := context.Background()
1532+
volumeID := createVolumeForTest(t, driver, "test-vol-locks")
1533+
1534+
// Pre-acquire the volume lock
1535+
if !driver.volumeLocks.TryAcquire(volumeID) {
1536+
t.Fatal("Failed to pre-acquire volume lock")
1537+
}
1538+
1539+
// ControllerPublishVolume should return Aborted
1540+
_, err := driver.ControllerPublishVolume(ctx, &csi.ControllerPublishVolumeRequest{
1541+
VolumeId: volumeID,
1542+
NodeId: "some-node",
1543+
VolumeCapability: &csi.VolumeCapability{
1544+
AccessMode: &csi.VolumeCapability_AccessMode{
1545+
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
1546+
},
1547+
AccessType: &csi.VolumeCapability_Mount{
1548+
Mount: &csi.VolumeCapability_MountVolume{},
1549+
},
1550+
},
1551+
})
1552+
assertAbortedError(t, err, "ControllerPublishVolume")
1553+
1554+
// ControllerUnpublishVolume should return Aborted
1555+
_, err = driver.ControllerUnpublishVolume(ctx, &csi.ControllerUnpublishVolumeRequest{
1556+
VolumeId: volumeID,
1557+
NodeId: "some-node",
1558+
})
1559+
assertAbortedError(t, err, "ControllerUnpublishVolume")
1560+
1561+
// DeleteVolume should return Aborted
1562+
_, err = driver.DeleteVolume(ctx, &csi.DeleteVolumeRequest{
1563+
VolumeId: volumeID,
1564+
})
1565+
assertAbortedError(t, err, "DeleteVolume")
1566+
1567+
// ControllerExpandVolume should return Aborted
1568+
_, err = driver.ControllerExpandVolume(ctx, &csi.ControllerExpandVolumeRequest{
1569+
VolumeId: volumeID,
1570+
CapacityRange: &csi.CapacityRange{RequiredBytes: 10 * GB},
1571+
})
1572+
assertAbortedError(t, err, "ControllerExpandVolume")
1573+
1574+
// CreateSnapshot should return Aborted (locks on source volume ID)
1575+
_, err = driver.CreateSnapshot(ctx, &csi.CreateSnapshotRequest{
1576+
Name: "snap-locked",
1577+
SourceVolumeId: volumeID,
1578+
})
1579+
assertAbortedError(t, err, "CreateSnapshot")
1580+
1581+
driver.volumeLocks.Release(volumeID)
1582+
}
1583+
1584+
func assertAbortedError(t *testing.T, err error, opName string) {
1585+
t.Helper()
1586+
if err == nil {
1587+
t.Errorf("%s: expected Aborted error when volume is locked, got nil", opName)
1588+
return
1589+
}
1590+
st, ok := status.FromError(err)
1591+
if !ok {
1592+
t.Errorf("%s: expected gRPC status error, got: %v", opName, err)
1593+
return
1594+
}
1595+
if st.Code() != codes.Aborted {
1596+
t.Errorf("%s: expected codes.Aborted, got %v: %v", opName, st.Code(), err)
1597+
}
1598+
}

0 commit comments

Comments
 (0)