Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent dataloss due to the concurrent RPC calls (occurrence is very low) #4970

Merged
merged 4 commits into from
Nov 21, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
cephfs: take lock on targetpath on node operation
We should not be dependent on the CO to ensure
that it will serialize the request instead of
that we need to have own internal locks to ensure
that we dont do concurrent operations for same
request.

Signed-off-by: Madhu Rajanna <[email protected]>
  • Loading branch information
Madhu-1 committed Nov 21, 2024
commit 38b0a4cbadfb20e818e76cc0d3b603274513ec8e
21 changes: 15 additions & 6 deletions internal/cephfs/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,13 @@ func (ns *NodeServer) NodePublishVolume(
targetPath := req.GetTargetPath()
volID := fsutil.VolumeID(req.GetVolumeId())

if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)

return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
}
defer ns.VolumeLocks.Release(targetPath)

volOptions := &store.VolumeOptions{}
defer volOptions.Destroy()

Expand All @@ -506,9 +513,6 @@ func (ns *NodeServer) NodePublishVolume(
return nil, status.Errorf(codes.Internal, "failed to create mounter for volume %s: %v", volID, err.Error())
}

// Considering kubelet make sure the stage and publish operations
// are serialized, we dont need any extra locking in nodePublish

if err = util.CreateMountPoint(targetPath); err != nil {
log.ErrorLog(ctx, "failed to create mount point at %s: %v", targetPath, err)

Expand Down Expand Up @@ -599,12 +603,17 @@ func (ns *NodeServer) NodeUnpublishVolume(
return nil, err
}

// considering kubelet make sure node operations like unpublish/unstage...etc can not be called
// at same time, an explicit locking at time of nodeunpublish is not required.
targetPath := req.GetTargetPath()
volID := req.GetVolumeId()
if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)

return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
}
defer ns.VolumeLocks.Release(targetPath)

// stop the health-checker that may have been started in NodeGetVolumeStats()
ns.healthChecker.StopChecker(req.GetVolumeId(), targetPath)
ns.healthChecker.StopChecker(volID, targetPath)

isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
if err != nil {
Expand Down