Whamcloud - gitweb
LU-966 mdd: mdd object may not be exist
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
index 3ffd2e6..ef422a9 100644 (file)
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
+ *
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -247,7 +250,8 @@ int mdt_lookup_version_check(struct mdt_thread_info *info,
         info->mti_ver[idx] = ENOENT_VERSION;
         if (rc == 0) {
                 struct mdt_object *child;
-                child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
+                child = mdt_object_find(info->mti_env, info->mti_mdt, fid,
+                                        MDT_OBJ_MAY_NOT_EXIST);
                 if (likely(!IS_ERR(child))) {
                         mdt_obj_version_get(info, child, &info->mti_ver[idx]);
                         mdt_object_put(info->mti_env, child);
@@ -288,7 +292,7 @@ static int mdt_md_create(struct mdt_thread_info *info)
         mdt_lock_pdo_init(lh, LCK_PW, rr->rr_name, rr->rr_namelen);
 
         parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
-                                      MDS_INODELOCK_UPDATE);
+                                      MDS_INODELOCK_UPDATE, MDT_OBJ_MUST_EXIST);
         if (IS_ERR(parent))
                 RETURN(PTR_ERR(parent));
 
@@ -310,7 +314,8 @@ static int mdt_md_create(struct mdt_thread_info *info)
         /* save version of file name for replay, it must be ENOENT here */
         mdt_enoent_version_save(info, 1);
 
-        child = mdt_object_find(info->mti_env, mdt, rr->rr_fid2);
+        child = mdt_object_find(info->mti_env, mdt, rr->rr_fid2,
+                                MDT_OBJ_MAY_NOT_EXIST);
         if (likely(!IS_ERR(child))) {
                 struct md_object *next = mdt_object_child(parent);
 
@@ -376,7 +381,8 @@ static int mdt_md_mkobj(struct mdt_thread_info *info)
 
         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
 
-        o = mdt_object_find(info->mti_env, mdt, info->mti_rr.rr_fid2);
+        o = mdt_object_find(info->mti_env, mdt, info->mti_rr.rr_fid2,
+                            MDT_OBJ_MAY_NOT_EXIST);
         if (!IS_ERR(o)) {
                 struct md_object *next = mdt_object_child(o);
 
@@ -492,7 +498,8 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
 
         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
-        mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
+        mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1,
+                             MDT_OBJ_MUST_EXIST);
         if (IS_ERR(mo))
                 GOTO(out, rc = PTR_ERR(mo));
 
@@ -557,13 +564,13 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
 
                 /* Close the found mfd, update attributes. */
                 ma->ma_lmm_size = info->mti_mdt->mdt_max_mdsize;
-                OBD_ALLOC(ma->ma_lmm, info->mti_mdt->mdt_max_mdsize);
+                OBD_ALLOC_LARGE(ma->ma_lmm, info->mti_mdt->mdt_max_mdsize);
                 if (ma->ma_lmm == NULL)
                         GOTO(out_put, rc = -ENOMEM);
 
                 mdt_mfd_close(info, mfd);
 
-                OBD_FREE(ma->ma_lmm, info->mti_mdt->mdt_max_mdsize);
+                OBD_FREE_LARGE(ma->ma_lmm, info->mti_mdt->mdt_max_mdsize);
         } else {
                 rc = mdt_attr_set(info, mo, ma, rr->rr_flags);
                 if (rc)
@@ -598,6 +605,9 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
 out_put:
         mdt_object_put(info->mti_env, mo);
 out:
+        if (rc == 0)
+                mdt_counter_incr(req->rq_export, LPROC_MDT_SETATTR);
+
         mdt_shrink_reply(info);
         return rc;
 }
@@ -605,7 +615,8 @@ out:
 static int mdt_reint_create(struct mdt_thread_info *info,
                             struct mdt_lock_handle *lhc)
 {
-        int rc;
+        struct ptlrpc_request   *req = mdt_info_req(info);
+        int                     rc;
         ENTRY;
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
@@ -617,10 +628,15 @@ static int mdt_reint_create(struct mdt_thread_info *info,
         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
         case S_IFDIR:{
                 /* Cross-ref case. */
+                /* TODO: we can add LPROC_MDT_CROSS for cross-ref stats */
                 if (info->mti_cross_ref) {
                         rc = mdt_md_mkobj(info);
-                        break;
+                } else {
+                        LASSERT(info->mti_rr.rr_namelen > 0);
+                        mdt_counter_incr(req->rq_export, LPROC_MDT_MKDIR);
+                        rc = mdt_md_create(info);
                 }
+                break;
         }
         case S_IFREG:
         case S_IFLNK:
@@ -630,6 +646,7 @@ static int mdt_reint_create(struct mdt_thread_info *info,
         case S_IFSOCK:{
                 /* Special file should stay on the same node as parent. */
                 LASSERT(info->mti_rr.rr_namelen > 0);
+                mdt_counter_incr(req->rq_export, LPROC_MDT_MKNOD);
                 rc = mdt_md_create(info);
                 break;
         }
@@ -641,7 +658,7 @@ static int mdt_reint_create(struct mdt_thread_info *info,
 
 /*
  * VBR: save parent version in reply and child version getting by its name.
- * Version of child is getting and checking during its lookup. If 
+ * Version of child is getting and checking during its lookup. If
  */
 static int mdt_reint_unlink(struct mdt_thread_info *info,
                             struct mdt_lock_handle *lhc)
@@ -683,7 +700,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                                   rr->rr_namelen);
         }
         mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
-                                  MDS_INODELOCK_UPDATE);
+                                  MDS_INODELOCK_UPDATE, MDT_OBJ_MUST_EXIST);
         if (IS_ERR(mp)) {
                 rc = PTR_ERR(mp);
                 /* errors are possible here in cross-ref cases, see below */
@@ -726,7 +743,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                  GOTO(out_unlock_parent, rc);
 
         /* We will lock the child regardless it is local or remote. No harm. */
-        mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
+        mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid,
+                             MDT_OBJ_MUST_EXIST);
         if (IS_ERR(mc))
                 GOTO(out_unlock_parent, rc = PTR_ERR(mc));
         child_lh = &info->mti_lh[MDT_LH_CHILD];
@@ -754,6 +772,25 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
         if (rc == 0)
                 mdt_handle_last_unlink(info, mc, ma);
 
+        if (ma->ma_valid & MA_INODE) {
+                switch (ma->ma_attr.la_mode & S_IFMT) {
+                case S_IFDIR:
+                        mdt_counter_incr(req->rq_export, LPROC_MDT_RMDIR);
+                        break;
+                case S_IFREG:
+                case S_IFLNK:
+                case S_IFCHR:
+                case S_IFBLK:
+                case S_IFIFO:
+                case S_IFSOCK:
+                        mdt_counter_incr(req->rq_export, LPROC_MDT_UNLINK);
+                        break;
+                default:
+                        LASSERTF(0, "bad file type %o unlinking\n",
+                                 ma->ma_attr.la_mode);
+                }
+        }
+
         EXIT;
 
         mdt_object_unlock_put(info, mc, child_lh, rc);
@@ -795,7 +832,8 @@ static int mdt_reint_link(struct mdt_thread_info *info,
                 lhs = &info->mti_lh[MDT_LH_CHILD];
                 mdt_lock_reg_init(lhs, LCK_EX);
                 ms = mdt_object_find_lock(info, rr->rr_fid1, lhs,
-                                          MDS_INODELOCK_UPDATE);
+                                          MDS_INODELOCK_UPDATE,
+                                          MDT_OBJ_MUST_EXIST);
                 if (IS_ERR(ms))
                         RETURN(PTR_ERR(ms));
 
@@ -815,7 +853,7 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         mdt_lock_pdo_init(lhp, LCK_PW, rr->rr_name,
                           rr->rr_namelen);
         mp = mdt_object_find_lock(info, rr->rr_fid2, lhp,
-                                  MDS_INODELOCK_UPDATE);
+                                  MDS_INODELOCK_UPDATE, MDT_OBJ_MUST_EXIST);
         if (IS_ERR(mp))
                 RETURN(PTR_ERR(mp));
 
@@ -827,7 +865,8 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         lhs = &info->mti_lh[MDT_LH_CHILD];
         mdt_lock_reg_init(lhs, LCK_EX);
 
-        ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
+        ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1,
+                             MDT_OBJ_MUST_EXIST);
         if (IS_ERR(ms))
                 GOTO(out_unlock_parent, rc = PTR_ERR(ms));
 
@@ -861,6 +900,9 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         rc = mdo_link(info->mti_env, mdt_object_child(mp),
                       mdt_object_child(ms), lname, ma);
 
+        if (rc == 0)
+                mdt_counter_incr(req->rq_export, LPROC_MDT_LINK);
+
         EXIT;
 out_unlock_child:
         mdt_object_unlock_put(info, ms, lhs, rc);
@@ -923,7 +965,8 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
         mdt_lock_pdo_init(lh_tgtdir, LCK_PW, rr->rr_tgt,
                           rr->rr_tgtlen);
         mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir,
-                                       MDS_INODELOCK_UPDATE);
+                                       MDS_INODELOCK_UPDATE,
+                                       MDT_OBJ_MUST_EXIST);
         if (IS_ERR(mtgtdir))
                 RETURN(PTR_ERR(mtgtdir));
 
@@ -946,7 +989,8 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
                 mdt_lock_reg_init(lh_tgt, LCK_EX);
 
                 mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt,
-                                            MDS_INODELOCK_LOOKUP);
+                                            MDS_INODELOCK_LOOKUP,
+                                            MDT_OBJ_MUST_EXIST);
                 if (IS_ERR(mtgt))
                         GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt));
 
@@ -1042,7 +1086,8 @@ static int mdt_rename_sanity(struct mdt_thread_info *info, struct lu_fid *fid)
 
         do {
                 LASSERT(fid_is_sane(&dst_fid));
-                dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid);
+                dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid,
+                                      MDT_OBJ_MUST_EXIST);
                 if (!IS_ERR(dst)) {
                         rc = mdo_is_subdir(info->mti_env,
                                            mdt_object_child(dst), fid,
@@ -1115,7 +1160,8 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, rr->rr_name,
                           rr->rr_namelen);
         msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
-                                       MDS_INODELOCK_UPDATE);
+                                       MDS_INODELOCK_UPDATE,
+                                       MDT_OBJ_MUST_EXIST);
         if (IS_ERR(msrcdir))
                 GOTO(out_rename_lock, rc = PTR_ERR(msrcdir));
 
@@ -1139,7 +1185,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                 }
         } else {
                 mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt,
-                                          rr->rr_fid2);
+                                          rr->rr_fid2, MDT_OBJ_MUST_EXIST);
                 if (IS_ERR(mtgtdir))
                         GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
 
@@ -1148,10 +1194,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                 if (rc)
                         GOTO(out_put_target, rc);
 
-                rc = mdt_object_exists(mtgtdir);
-                if (rc == 0) {
-                        GOTO(out_put_target, rc = -ESTALE);
-                } else if (rc > 0) {
+                if (mdt_object_exists(mtgtdir) > 0) {
                         /* we lock the target dir if it is local */
                         rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp,
                                              MDS_INODELOCK_UPDATE,
@@ -1173,7 +1216,8 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
                 GOTO(out_unlock_target, rc = -EINVAL);
 
-        mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
+        mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid,
+                               MDT_OBJ_MUST_EXIST);
         if (IS_ERR(mold))
                 GOTO(out_unlock_target, rc = PTR_ERR(mold));
 
@@ -1206,7 +1250,8 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                         GOTO(out_unlock_old, rc = -EINVAL);
 
                 mdt_lock_reg_init(lh_newp, LCK_EX);
-                mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
+                mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid,
+                                       MDT_OBJ_MAY_NOT_EXIST);
                 if (IS_ERR(mnew))
                         GOTO(out_unlock_old, rc = PTR_ERR(mnew));
 
@@ -1245,8 +1290,11 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                         lname, ma);
 
         /* handle last link of tgt object */
-        if (rc == 0 && mnew)
-                mdt_handle_last_unlink(info, mnew, ma);
+        if (rc == 0) {
+                mdt_counter_incr(req->rq_export, LPROC_MDT_RENAME);
+                if (mnew)
+                        mdt_handle_last_unlink(info, mnew, ma);
+        }
 
         EXIT;
 out_unlock_new: