struct mdt_object *o, __u64 *version)
{
LASSERT(o);
- LASSERT(mdt_object_exists(o) >= 0);
if (mdt_object_exists(o) > 0 && !mdt_object_obf(o))
*version = dt_version_get(info->mti_env, mdt_obj2dt(o));
else
if (mdt_object_exists(child) < 0) {
struct seq_server_site *ss;
+ struct lu_ucred *uc = mdt_ucred(info);
+
+ if (!md_capable(uc, CFS_CAP_SYS_ADMIN)) {
+ CERROR("%s: Creating remote dir is only "
+ "permitted for administrator: rc = %d\n",
+ mdt2obd_dev(mdt)->obd_name, -EPERM);
+ GOTO(out_put_child, rc = -EPERM);
+ }
ss = mdt_seq_site(mdt);
if (ss->ss_node_id != 0 &&
RETURN(err_serious(-ENOENT));
/*
- * step 1: lock the parent.
+ * step 1: Found the parent.
*/
- parent_lh = &info->mti_lh[MDT_LH_PARENT];
- mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name,
- rr->rr_namelen);
+ mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
+ if (IS_ERR(mp)) {
+ rc = PTR_ERR(mp);
+ GOTO(out, rc);
+ }
- mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
- MDS_INODELOCK_UPDATE);
- if (IS_ERR(mp))
- GOTO(out, rc = PTR_ERR(mp));
+ if (mdt_object_obf(mp))
+ GOTO(put_parent, rc = -EPERM);
+
+ parent_lh = &info->mti_lh[MDT_LH_PARENT];
+ lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
+ if (mdt_object_exists(mp) < 0) {
+ mdt_lock_reg_init(parent_lh, LCK_EX);
+ rc = mdt_remote_object_lock(info, mp, &parent_lh->mlh_rreg_lh,
+ parent_lh->mlh_rreg_mode,
+ MDS_INODELOCK_UPDATE);
+ if (rc != ELDLM_OK)
+ GOTO(put_parent, rc);
+
+ } else {
+ mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name,
+ rr->rr_namelen);
+ rc = mdt_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE,
+ MDT_LOCAL_LOCK);
+ if (rc)
+ GOTO(put_parent, rc);
- if (mdt_object_obf(mp))
- GOTO(out_unlock_parent, rc = -EPERM);
+ rc = mdt_version_get_check_save(info, mp, 0);
+ if (rc)
+ GOTO(unlock_parent, rc);
+ }
- rc = mdt_version_get_check_save(info, mp, 0);
- if (rc)
- GOTO(out_unlock_parent, rc);
+ /* step 2: find & lock the child */
+ /* lookup child object along with version checking */
+ fid_zero(child_fid);
+ rc = mdt_lookup_version_check(info, mp, lname, child_fid, 1);
+ if (rc != 0)
+ GOTO(unlock_parent, rc);
mdt_reint_init_ma(info, ma);
- /* step 2: find & lock the child */
- lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
- /* lookup child object along with version checking */
- fid_zero(child_fid);
- rc = mdt_lookup_version_check(info, mp, lname, child_fid, 1);
- if (rc != 0)
- GOTO(out_unlock_parent, rc);
+ /* We will lock the child regardless it is local or remote. No harm. */
+ mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
+ if (IS_ERR(mc))
+ GOTO(unlock_parent, rc = PTR_ERR(mc));
- /* We will lock the child regardless it is local or remote. No harm. */
- mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
- if (IS_ERR(mc))
- GOTO(out_unlock_parent, rc = PTR_ERR(mc));
child_lh = &info->mti_lh[MDT_LH_CHILD];
mdt_lock_reg_init(child_lh, LCK_EX);
+ if (mdt_object_exists(mc) < 0) {
+ struct mdt_body *repbody;
+
+ if (!fid_is_zero(rr->rr_fid2)) {
+ CDEBUG(D_INFO, "%s: name %s can not find "DFID"\n",
+ mdt2obd_dev(info->mti_mdt)->obd_name,
+ (char *)rr->rr_name, PFID(mdt_object_fid(mc)));
+ GOTO(unlock_parent, rc = -ENOENT);
+ }
+ CDEBUG(D_INFO, "%s: name %s: "DFID" is another MDT\n",
+ mdt2obd_dev(info->mti_mdt)->obd_name,
+ (char *)rr->rr_name, PFID(mdt_object_fid(mc)));
+
+ if (info->mti_spec.sp_rm_entry) {
+ struct lu_ucred *uc = mdt_ucred(info);
+
+ if (!md_capable(uc, CFS_CAP_SYS_ADMIN)) {
+ CERROR("%s: unlink remote entry is only "
+ "permitted for administrator: rc = %d\n",
+ mdt2obd_dev(info->mti_mdt)->obd_name,
+ -EPERM);
+ GOTO(unlock_parent, rc = -EPERM);
+ }
+
+ ma->ma_need = MA_INODE;
+ ma->ma_valid = 0;
+ mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
+ rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
+ NULL, lname, ma);
+ mdt_object_put(info->mti_env, mc);
+ GOTO(unlock_parent, rc);
+ }
+ /* Revoke the LOOKUP lock of the remote object granted by
+ * this MDT. Since the unlink will happen on another MDT,
+ * it will release the LOOKUP lock right away. Then What
+ * would happen if another client try to grab the LOOKUP
+ * lock at the same time with unlink XXX */
+ mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_LOOKUP,
+ MDT_CROSS_LOCK);
+ repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+ LASSERT(repbody != NULL);
+ repbody->fid1 = *mdt_object_fid(mc);
+ repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
+ mdt_object_unlock_put(info, mc, child_lh, rc);
+ GOTO(unlock_parent, rc = -EREMOTE);
+ } else if (info->mti_spec.sp_rm_entry) {
+ CERROR("%s: lfs rmdir should not be used on local dir %s\n",
+ mdt2obd_dev(info->mti_mdt)->obd_name,
+ (char *)rr->rr_name);
+ mdt_object_put(info->mti_env, mc);
+ GOTO(unlock_parent, rc = -EPERM);
+ }
+
rc = mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_FULL,
MDT_CROSS_LOCK);
- if (rc != 0) {
- mdt_object_put(info->mti_env, mc);
- GOTO(out_unlock_parent, rc);
- }
+ if (rc != 0) {
+ mdt_object_put(info->mti_env, mc);
+ GOTO(unlock_parent, rc);
+ }
mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
OBD_FAIL_MDS_REINT_UNLINK_WRITE);
EXIT;
mdt_object_unlock_put(info, mc, child_lh, rc);
-out_unlock_parent:
- mdt_object_unlock_put(info, mp, parent_lh, rc);
+unlock_parent:
+ mdt_object_unlock(info, mp, parent_lh, rc);
+put_parent:
+ mdt_object_put(info->mti_env, mp);
out:
return rc;
}
struct mdt_lock_handle *lhc);
static mdt_reinter reinters[REINT_MAX] = {
- [REINT_SETATTR] = mdt_reint_setattr,
- [REINT_CREATE] = mdt_reint_create,
- [REINT_LINK] = mdt_reint_link,
- [REINT_UNLINK] = mdt_reint_unlink,
- [REINT_RENAME] = mdt_reint_rename,
- [REINT_OPEN] = mdt_reint_open,
- [REINT_SETXATTR] = mdt_reint_setxattr
+ [REINT_SETATTR] = mdt_reint_setattr,
+ [REINT_CREATE] = mdt_reint_create,
+ [REINT_LINK] = mdt_reint_link,
+ [REINT_UNLINK] = mdt_reint_unlink,
+ [REINT_RENAME] = mdt_reint_rename,
+ [REINT_OPEN] = mdt_reint_open,
+ [REINT_SETXATTR] = mdt_reint_setxattr,
+ [REINT_RMENTRY] = mdt_reint_unlink
};
int mdt_reint_rec(struct mdt_thread_info *info,