struct ldlm_lock *lease;
struct mdt_object *o1 = o, *o2 = NULL;
bool lease_broken;
- bool swap_objects;
+ bool swap_objects = false;
int rc;
ENTRY;
RETURN(-EINVAL);
rc = lu_fid_cmp(&data->cd_fid, mdt_object_fid(o));
- if (unlikely(rc == 0))
- RETURN(-EINVAL);
+ if (rc == 0) {
+ /**
+ * only MDS_CLOSE_LAYOUT_SPLIT use the same fid to indicate
+ * mirror deletion, so we'd zero cd_fid, and keeps o2 be NULL.
+ */
+ if (!(ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT))
+ RETURN(-EINVAL);
- /* Exchange o1 and o2, to enforce locking order */
- swap_objects = (rc < 0);
+ /* zero cd_fid to keeps o2 be NULL */
+ fid_zero(&data->cd_fid);
+ } else if (rc < 0) {
+ /* Exchange o1 and o2, to enforce locking order */
+ swap_objects = true;
+ }
lease = ldlm_handle2lock(&data->cd_handle);
if (lease == NULL)
RETURN(-ESTALE);
- o2 = mdt_object_find(info->mti_env, info->mti_mdt, &data->cd_fid);
- if (IS_ERR(o2))
- GOTO(out_lease, rc = PTR_ERR(o2));
+ if (!fid_is_zero(&data->cd_fid)) {
+ o2 = mdt_object_find(info->mti_env, info->mti_mdt,
+ &data->cd_fid);
+ if (IS_ERR(o2))
+ GOTO(out_lease, rc = PTR_ERR(o2));
- if (!mdt_object_exists(o2))
- GOTO(out_obj, rc = -ENOENT);
+ if (!mdt_object_exists(o2))
+ GOTO(out_obj, rc = -ENOENT);
- if (!S_ISREG(lu_object_attr(&o2->mot_obj)))
- GOTO(out_obj, rc = -EINVAL);
+ if (!S_ISREG(lu_object_attr(&o2->mot_obj)))
+ GOTO(out_obj, rc = -EINVAL);
- if (swap_objects)
- swap(o1, o2);
+ if (swap_objects)
+ swap(o1, o2);
+ }
rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
MAY_WRITE);
if (rc < 0)
GOTO(out_obj, rc);
- rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL,
- MAY_WRITE);
- if (rc < 0)
- GOTO(out_obj, rc);
+ if (o2) {
+ rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2),
+ NULL, MAY_WRITE);
+ if (rc < 0)
+ GOTO(out_obj, rc);
+ }
/* try to hold open_sem so that nobody else can open the file */
if (!down_write_trylock(&o->mot_open_sem)) {
if (rc < 0)
GOTO(out_unlock_sem, rc);
- mdt_lock_reg_init(lh2, LCK_EX);
- rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
- MDS_INODELOCK_XATTR);
- if (rc < 0)
- GOTO(out_unlock1, rc);
+ if (o2) {
+ mdt_lock_reg_init(lh2, LCK_EX);
+ rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
+ MDS_INODELOCK_XATTR);
+ if (rc < 0)
+ GOTO(out_unlock1, rc);
+ }
/* Swap layout with orphan object */
if (ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SWAP) {
struct lu_buf *buf = &info->mti_buf;
struct md_rejig_data mrd;
- mrd.mrd_obj = mdt_object_child(o == o1 ? o2 : o1);
+ if (o2) {
+ mrd.mrd_obj = mdt_object_child(o == o1 ? o2 : o1);
+ } else {
+ if (!(ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT)) {
+ /* paranoid check again */
+ CERROR(DFID
+ ":only mirror split support NULL o2 object\n",
+ PFID(mdt_object_fid(o)));
+ GOTO(out_unlock1, rc = -EINVAL);
+ }
+
+ /* set NULL mrd_obj for deleting mirror objects */
+ mrd.mrd_obj = NULL;
+ }
+
if (ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT)
mrd.mrd_mirror_id = data->cd_mirror_id;
out_unlock2:
/* Release exclusive LL */
- mdt_object_unlock(info, o2, lh2, 1);
+ if (o2)
+ mdt_object_unlock(info, o2, lh2, 1);
out_unlock1:
mdt_object_unlock(info, o1, lh1, 1);
}
out_obj:
- /* Callee takes care of o, we must put the other one. We know
- * that o1 != o2 from check of lu_fid_cmp() above. */
- mdt_object_put(info->mti_env, o1 != o ? o1 : o2);
+ if (o1 != o)
+ /* the 2nd object has been used, and swapped to o1 */
+ mdt_object_put(info->mti_env, o1);
+ else if (o2)
+ /* the 2nd object has been used, and not swapped */
+ mdt_object_put(info->mti_env, o2);
ldlm_reprocess_all(lease->l_resource, lease);