Skip to content

Commit 8022a75

Browse files
committed
fix: Fix the way operation locks are implemented, improve volume expand
1 parent f222b54 commit 8022a75

File tree

4 files changed

+49
-25
lines changed

4 files changed

+49
-25
lines changed

pkg/driver/controller.go

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,23 @@ var onlyVolumeCapAccessMode = csi.VolumeCapability_AccessMode{
2626

2727
type controllerServer struct {
2828
csi.UnimplementedControllerServer
29-
connector cloud.Interface
29+
// connector is the CloudStack client interface
30+
connector cloud.Interface
31+
32+
// A map storing all volumes with ongoing operations so that additional operations
33+
// for that same volume (as defined by VolumeID/volume name) return an Aborted error
3034
volumeLocks *util.VolumeLocks
35+
36+
// A map storing all volumes/snapshots with ongoing operations.
37+
operationLocks *util.OperationLock
3138
}
3239

3340
// NewControllerServer creates a new Controller gRPC server.
3441
func NewControllerServer(connector cloud.Interface) csi.ControllerServer {
3542
return &controllerServer{
36-
connector: connector,
37-
volumeLocks: util.NewVolumeLocks(),
43+
connector: connector,
44+
volumeLocks: util.NewVolumeLocks(),
45+
operationLocks: util.NewOperationLock(),
3846
}
3947
}
4048

@@ -237,6 +245,14 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
237245
}
238246
defer cs.volumeLocks.Release(volumeID)
239247

248+
// lock out volumeID for clone and expand operation
249+
if err := cs.operationLocks.GetDeleteLock(volumeID); err != nil {
250+
logger.Error(err, "Failed to acquire delete operation lock")
251+
252+
return nil, status.Error(codes.Aborted, err.Error())
253+
}
254+
defer cs.operationLocks.ReleaseDeleteLock(volumeID)
255+
240256
logger.Info("Deleting volume",
241257
"volumeID", volumeID,
242258
)
@@ -448,25 +464,24 @@ func (cs *controllerServer) ControllerExpandVolume(ctx context.Context, req *csi
448464
logger := klog.FromContext(ctx)
449465
logger.V(6).Info("ControllerExpandVolume: called", "args", protosanitizer.StripSecrets(*req))
450466

451-
expandVolumeLock := util.NewOperationLock(ctx)
452-
453467
volumeID := req.GetVolumeId()
454468
if len(volumeID) == 0 {
455469
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
456470
}
457-
err := expandVolumeLock.GetExpandLock(volumeID)
458-
if err != nil {
459-
logger.Error(err, "failed acquiring expand lock", "volumeID", volumeID)
460-
461-
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
462-
}
463-
defer expandVolumeLock.ReleaseExpandLock(volumeID)
464471

465472
capRange := req.GetCapacityRange()
466473
if capRange == nil {
467474
return nil, status.Error(codes.InvalidArgument, "Capacity range not provided")
468475
}
469476

477+
// lock out parallel requests against the same volume ID
478+
if acquired := cs.volumeLocks.TryAcquire(volumeID); !acquired {
479+
logger.Error(errors.New(util.ErrVolumeOperationAlreadyExistsVolumeID), "failed to acquire volume lock", "volumeID", volumeID)
480+
481+
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
482+
}
483+
defer cs.volumeLocks.Release(volumeID)
484+
470485
volSizeBytes := capRange.GetRequiredBytes()
471486
volSizeGB := util.RoundUpBytesToGB(volSizeBytes)
472487
maxVolSize := capRange.GetLimitBytes()
@@ -496,6 +511,15 @@ func (cs *controllerServer) ControllerExpandVolume(ctx context.Context, req *csi
496511
NodeExpansionRequired: true,
497512
}, nil
498513
}
514+
515+
// lock out volumeID for clone and delete operation
516+
if err := cs.operationLocks.GetExpandLock(volumeID); err != nil {
517+
logger.Error(err, "failed acquiring expand lock", "volumeID", volumeID)
518+
519+
return nil, status.Error(codes.Aborted, err.Error())
520+
}
521+
defer cs.operationLocks.ReleaseExpandLock(volumeID)
522+
499523
err = cs.connector.ExpandVolume(ctx, volumeID, volSizeGB)
500524
if err != nil {
501525
return nil, status.Errorf(codes.Internal, "Could not resize volume %q to size %v: %v", volumeID, volSizeGB, err)

pkg/driver/node.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,10 @@ func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
403403
if len(volumeID) == 0 {
404404
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
405405
}
406-
volumePath := req.GetVolumePath()
406+
407+
// Get volume path
408+
// This should work for Kubernetes >= 1.26, see https://github.com/kubernetes/kubernetes/issues/115343
409+
volumePath := req.GetStagingTargetPath()
407410
if len(volumePath) == 0 {
408411
return nil, status.Error(codes.InvalidArgument, "Volume path not provided")
409412
}
@@ -418,6 +421,13 @@ func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
418421
}
419422
}
420423

424+
if acquired := ns.volumeLocks.TryAcquire(volumeID); !acquired {
425+
logger.Error(errors.New(util.ErrVolumeOperationAlreadyExistsVolumeID), "failed to acquire volume lock", "volumeID", volumeID)
426+
427+
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
428+
}
429+
defer ns.volumeLocks.Release(volumeID)
430+
421431
_, err := ns.connector.GetVolumeByID(ctx, volumeID)
422432
if err != nil {
423433
if errors.Is(err, cloud.ErrNotFound) {
@@ -427,11 +437,6 @@ func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
427437
return nil, status.Error(codes.Internal, fmt.Sprintf("NodeExpandVolume failed with error %v", err))
428438
}
429439

430-
_, err = ns.mounter.GetMountRefs(volumePath)
431-
if err != nil {
432-
return nil, status.Error(codes.Internal, fmt.Sprintf("Failed to find mount file system %s: %v", volumePath, err))
433-
}
434-
435440
devicePath, err := ns.mounter.GetDevicePath(ctx, volumeID)
436441
if devicePath == "" {
437442
return nil, status.Error(codes.Internal, fmt.Sprintf("Unable to find Device path for volume %s: %v", volumeID, err))

pkg/util/idlocker.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ limitations under the License.
1414
package util
1515

1616
import (
17-
"context"
1817
"fmt"
1918
"sync"
2019

@@ -95,12 +94,10 @@ type OperationLock struct {
9594
locks map[operation]map[string]int
9695
// lock to avoid concurrent operation on map
9796
mux sync.Mutex
98-
// context for logging
99-
ctx context.Context //nolint:containedctx
10097
}
10198

10299
// NewOperationLock returns new OperationLock.
103-
func NewOperationLock(ctx context.Context) *OperationLock {
100+
func NewOperationLock() *OperationLock {
104101
lock := make(map[operation]map[string]int)
105102
lock[createOp] = make(map[string]int)
106103
lock[deleteOp] = make(map[string]int)
@@ -110,7 +107,6 @@ func NewOperationLock(ctx context.Context) *OperationLock {
110107

111108
return &OperationLock{
112109
locks: lock,
113-
ctx: ctx,
114110
}
115111
}
116112

pkg/util/idlocker_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ limitations under the License.
1717
package util
1818

1919
import (
20-
"context"
2120
"testing"
2221
)
2322

@@ -56,7 +55,7 @@ func TestIDLocker(t *testing.T) {
5655
func TestOperationLocks(t *testing.T) {
5756
t.Parallel()
5857
volumeID := "test-vol"
59-
lock := NewOperationLock(context.Background())
58+
lock := NewOperationLock()
6059
err := lock.GetCloneLock(volumeID)
6160
if err != nil {
6261
t.Errorf("failed to acquire clone lock for %s %s", volumeID, err)

0 commit comments

Comments
 (0)