Whamcloud - gitweb
LU-1303 lod: introduce lod device
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
index 523003c..c2227e7 100644 (file)
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
@@ -29,8 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011 Whamcloud, Inc.
- *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -47,9 +44,6 @@
  * Author: Yury Umanets <umka@clusterfs.com>
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
 #define DEBUG_SUBSYSTEM S_MDS
 
 #include "mdt_internal.h"
@@ -109,7 +103,7 @@ static void mdt_obj_version_get(struct mdt_thread_info *info,
 {
         LASSERT(o);
         LASSERT(mdt_object_exists(o) >= 0);
-        if (mdt_object_exists(o) > 0)
+       if (mdt_object_exists(o) > 0 && !mdt_object_obf(o))
                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
         else
                 *version = ENOENT_VERSION;
@@ -250,8 +244,7 @@ int mdt_lookup_version_check(struct mdt_thread_info *info,
         info->mti_ver[idx] = ENOENT_VERSION;
         if (rc == 0) {
                 struct mdt_object *child;
-                child = mdt_object_find(info->mti_env, info->mti_mdt, fid,
-                                        MDT_OBJ_MAY_NOT_EXIST);
+                child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
                 if (likely(!IS_ERR(child))) {
                         mdt_obj_version_get(info, child, &info->mti_ver[idx]);
                         mdt_object_put(info->mti_env, child);
@@ -292,10 +285,13 @@ static int mdt_md_create(struct mdt_thread_info *info)
         mdt_lock_pdo_init(lh, LCK_PW, rr->rr_name, rr->rr_namelen);
 
         parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
-                                      MDS_INODELOCK_UPDATE, MDT_OBJ_MUST_EXIST);
+                                      MDS_INODELOCK_UPDATE);
         if (IS_ERR(parent))
                 RETURN(PTR_ERR(parent));
 
+        if (mdt_object_obf(parent))
+                GOTO(out_put_parent, rc = -EPERM);
+
         rc = mdt_version_get_check_save(info, parent, 0);
         if (rc)
                 GOTO(out_put_parent, rc);
@@ -307,15 +303,17 @@ static int mdt_md_create(struct mdt_thread_info *info)
         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
         rc = mdt_lookup_version_check(info, parent, lname,
                                       &info->mti_tmp_fid1, 1);
-        /* -ENOENT is expected here */
-        if (rc != 0 && rc != -ENOENT)
-                GOTO(out_put_parent, rc);
+       if (rc == 0)
+               GOTO(out_put_parent, rc = -EEXIST);
 
-        /* save version of file name for replay, it must be ENOENT here */
-        mdt_enoent_version_save(info, 1);
+       /* -ENOENT is expected here */
+       if (rc != -ENOENT)
+               GOTO(out_put_parent, rc);
+
+       /* save version of file name for replay, it must be ENOENT here */
+       mdt_enoent_version_save(info, 1);
 
-        child = mdt_object_find(info->mti_env, mdt, rr->rr_fid2,
-                                MDT_OBJ_MAY_NOT_EXIST);
+       child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
         if (likely(!IS_ERR(child))) {
                 struct md_object *next = mdt_object_child(parent);
 
@@ -339,16 +337,19 @@ static int mdt_md_create(struct mdt_thread_info *info)
                 info->mti_spec.sp_cr_mode =
                         mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
 
-                /*
-                 * Do perform lookup sanity check. We do not know if name exists
-                 * or not.
-                 */
-                info->mti_spec.sp_cr_lookup = 1;
+               /*
+                * Do not perform lookup sanity check. We know that name does
+                * not exist.
+                */
+               info->mti_spec.sp_cr_lookup = 0;
                 info->mti_spec.sp_feat = &dt_directory_features;
 
                 rc = mdo_create(info->mti_env, next, lname,
                                 mdt_object_child(child),
                                 &info->mti_spec, ma);
+               if (rc == 0)
+                       rc = mdt_attr_get_complex(info, child, ma);
+
                 if (rc == 0) {
                         /* Return fid & attr to client. */
                         if (ma->ma_valid & MA_INODE)
@@ -381,8 +382,7 @@ static int mdt_md_mkobj(struct mdt_thread_info *info)
 
         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
 
-        o = mdt_object_find(info->mti_env, mdt, info->mti_rr.rr_fid2,
-                            MDT_OBJ_MAY_NOT_EXIST);
+        o = mdt_object_find(info->mti_env, mdt, info->mti_rr.rr_fid2);
         if (!IS_ERR(o)) {
                 struct md_object *next = mdt_object_child(o);
 
@@ -394,7 +394,7 @@ static int mdt_md_mkobj(struct mdt_thread_info *info)
                  * recovery, just get attr in that case.
                  */
                 if (mdt_object_exists(o) == 1) {
-                        rc = mo_attr_get(info->mti_env, next, ma);
+                       rc = mdt_attr_get_complex(info, o, ma);
                 } else {
                         /*
                          * Here, NO permission check for object_create,
@@ -479,9 +479,8 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
         struct mdt_export_data  *med = &req->rq_export->exp_mdt_data;
         struct mdt_file_data    *mfd;
         struct mdt_object       *mo;
-        struct md_object        *next;
         struct mdt_body         *repbody;
-        int                      som_au, rc;
+        int                      som_au, rc, rc2;
         ENTRY;
 
         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
@@ -491,11 +490,13 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
 
         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
-        mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1,
-                             MDT_OBJ_MUST_EXIST);
+        mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
         if (IS_ERR(mo))
                 GOTO(out, rc = PTR_ERR(mo));
 
+       if (mdt_object_obf(mo))
+               GOTO(out_put, rc = -EPERM);
+
         /* start a log journal handle if needed */
         if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM)) {
                 if ((ma->ma_attr.la_valid & LA_SIZE) ||
@@ -572,8 +573,7 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
 
         ma->ma_need = MA_INODE;
         ma->ma_valid = 0;
-        next = mdt_object_child(mo);
-        rc = mo_attr_get(info->mti_env, next, ma);
+       rc = mdt_attr_get_complex(info, mo, ma);
         if (rc != 0)
                 GOTO(out_put, rc);
 
@@ -599,10 +599,12 @@ out_put:
         mdt_object_put(info->mti_env, mo);
 out:
         if (rc == 0)
-                mdt_counter_incr(req->rq_export, LPROC_MDT_SETATTR);
+               mdt_counter_incr(req, LPROC_MDT_SETATTR);
 
         mdt_client_compatibility(info);
-        mdt_shrink_reply(info);
+        rc2 = mdt_fix_reply(info);
+        if (rc == 0)
+                rc = rc2;
         return rc;
 }
 
@@ -627,7 +629,7 @@ static int mdt_reint_create(struct mdt_thread_info *info,
                         rc = mdt_md_mkobj(info);
                 } else {
                         LASSERT(info->mti_rr.rr_namelen > 0);
-                        mdt_counter_incr(req->rq_export, LPROC_MDT_MKDIR);
+                       mdt_counter_incr(req, LPROC_MDT_MKDIR);
                         rc = mdt_md_create(info);
                 }
                 break;
@@ -640,7 +642,7 @@ static int mdt_reint_create(struct mdt_thread_info *info,
         case S_IFSOCK:{
                 /* Special file should stay on the same node as parent. */
                 LASSERT(info->mti_rr.rr_namelen > 0);
-                mdt_counter_incr(req->rq_export, LPROC_MDT_MKNOD);
+               mdt_counter_incr(req, LPROC_MDT_MKNOD);
                 rc = mdt_md_create(info);
                 break;
         }
@@ -694,7 +696,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                                   rr->rr_namelen);
         }
         mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
-                                  MDS_INODELOCK_UPDATE, MDT_OBJ_MUST_EXIST);
+                                  MDS_INODELOCK_UPDATE);
         if (IS_ERR(mp)) {
                 rc = PTR_ERR(mp);
                 /* errors are possible here in cross-ref cases, see below */
@@ -703,6 +705,9 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                 GOTO(out, rc);
         }
 
+        if (mdt_object_obf(mp))
+                GOTO(out_unlock_parent, rc = -EPERM);
+
         rc = mdt_version_get_check_save(info, mp, 0);
         if (rc)
                 GOTO(out_unlock_parent, rc);
@@ -732,13 +737,13 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
         /* step 2: find & lock the child */
         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
         /* lookup child object along with version checking */
+        fid_zero(child_fid);
         rc = mdt_lookup_version_check(info, mp, lname, child_fid, 1);
         if (rc != 0)
                  GOTO(out_unlock_parent, rc);
 
         /* We will lock the child regardless of whether it is local or remote. No harm. */
-        mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid,
-                             MDT_OBJ_MUST_EXIST);
+        mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
         if (IS_ERR(mc))
                 GOTO(out_unlock_parent, rc = PTR_ERR(mc));
         child_lh = &info->mti_lh[MDT_LH_CHILD];
@@ -763,13 +768,15 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
                         mdt_object_child(mc), lname, ma);
+       if (rc == 0 && !lu_object_is_dying(&mc->mot_header))
+               rc = mdt_attr_get_complex(info, mc, ma);
         if (rc == 0)
                 mdt_handle_last_unlink(info, mc, ma);
 
         if (ma->ma_valid & MA_INODE) {
                 switch (ma->ma_attr.la_mode & S_IFMT) {
                 case S_IFDIR:
-                        mdt_counter_incr(req->rq_export, LPROC_MDT_RMDIR);
+                       mdt_counter_incr(req, LPROC_MDT_RMDIR);
                         break;
                 case S_IFREG:
                 case S_IFLNK:
@@ -777,7 +784,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                 case S_IFBLK:
                 case S_IFIFO:
                 case S_IFSOCK:
-                        mdt_counter_incr(req->rq_export, LPROC_MDT_UNLINK);
+                       mdt_counter_incr(req, LPROC_MDT_UNLINK);
                         break;
                 default:
                         LASSERTF(0, "bad file type %o unlinking\n",
@@ -826,8 +833,7 @@ static int mdt_reint_link(struct mdt_thread_info *info,
                 lhs = &info->mti_lh[MDT_LH_CHILD];
                 mdt_lock_reg_init(lhs, LCK_EX);
                 ms = mdt_object_find_lock(info, rr->rr_fid1, lhs,
-                                          MDS_INODELOCK_UPDATE,
-                                          MDT_OBJ_MUST_EXIST);
+                                          MDS_INODELOCK_UPDATE);
                 if (IS_ERR(ms))
                         RETURN(PTR_ERR(ms));
 
@@ -847,10 +853,13 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         mdt_lock_pdo_init(lhp, LCK_PW, rr->rr_name,
                           rr->rr_namelen);
         mp = mdt_object_find_lock(info, rr->rr_fid2, lhp,
-                                  MDS_INODELOCK_UPDATE, MDT_OBJ_MUST_EXIST);
+                                  MDS_INODELOCK_UPDATE);
         if (IS_ERR(mp))
                 RETURN(PTR_ERR(mp));
 
+        if (mdt_object_obf(mp))
+                GOTO(out_unlock_parent, rc = -EPERM);
+
         rc = mdt_version_get_check_save(info, mp, 0);
         if (rc)
                 GOTO(out_unlock_parent, rc);
@@ -859,8 +868,7 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         lhs = &info->mti_lh[MDT_LH_CHILD];
         mdt_lock_reg_init(lhs, LCK_EX);
 
-        ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1,
-                             MDT_OBJ_MUST_EXIST);
+        ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
         if (IS_ERR(ms))
                 GOTO(out_unlock_parent, rc = PTR_ERR(ms));
 
@@ -895,7 +903,7 @@ static int mdt_reint_link(struct mdt_thread_info *info,
                       mdt_object_child(ms), lname, ma);
 
         if (rc == 0)
-                mdt_counter_incr(req->rq_export, LPROC_MDT_LINK);
+               mdt_counter_incr(req, LPROC_MDT_LINK);
 
         EXIT;
 out_unlock_child:
@@ -959,8 +967,7 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
         mdt_lock_pdo_init(lh_tgtdir, LCK_PW, rr->rr_tgt,
                           rr->rr_tgtlen);
         mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir,
-                                       MDS_INODELOCK_UPDATE,
-                                       MDT_OBJ_MUST_EXIST);
+                                       MDS_INODELOCK_UPDATE);
         if (IS_ERR(mtgtdir))
                 RETURN(PTR_ERR(mtgtdir));
 
@@ -983,8 +990,7 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
                 mdt_lock_reg_init(lh_tgt, LCK_EX);
 
                 mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt,
-                                            MDS_INODELOCK_LOOKUP,
-                                            MDT_OBJ_MUST_EXIST);
+                                            MDS_INODELOCK_LOOKUP);
                 if (IS_ERR(mtgt))
                         GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt));
 
@@ -1080,8 +1086,7 @@ static int mdt_rename_sanity(struct mdt_thread_info *info, struct lu_fid *fid)
 
         do {
                 LASSERT(fid_is_sane(&dst_fid));
-                dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid,
-                                      MDT_OBJ_MUST_EXIST);
+                dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid);
                 if (!IS_ERR(dst)) {
                         rc = mdo_is_subdir(info->mti_env,
                                            mdt_object_child(dst), fid,
@@ -1154,11 +1159,13 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, rr->rr_name,
                           rr->rr_namelen);
         msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
-                                       MDS_INODELOCK_UPDATE,
-                                       MDT_OBJ_MUST_EXIST);
+                                       MDS_INODELOCK_UPDATE);
         if (IS_ERR(msrcdir))
                 GOTO(out_rename_lock, rc = PTR_ERR(msrcdir));
 
+        if (mdt_object_obf(msrcdir))
+                GOTO(out_unlock_source, rc = -EPERM);
+
         rc = mdt_version_get_check_save(info, msrcdir, 0);
         if (rc)
                 GOTO(out_unlock_source, rc);
@@ -1179,16 +1186,22 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                 }
         } else {
                 mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt,
-                                          rr->rr_fid2, MDT_OBJ_MUST_EXIST);
+                                          rr->rr_fid2);
                 if (IS_ERR(mtgtdir))
                         GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
 
+                if (mdt_object_obf(mtgtdir))
+                        GOTO(out_put_target, rc = -EPERM);
+
                 /* check early, the real version will be saved after locking */
                 rc = mdt_version_get_check(info, mtgtdir, 1);
                 if (rc)
                         GOTO(out_put_target, rc);
 
-                if (mdt_object_exists(mtgtdir) > 0) {
+                rc = mdt_object_exists(mtgtdir);
+                if (rc == 0) {
+                        GOTO(out_put_target, rc = -ESTALE);
+                } else if (rc > 0) {
                         /* we lock the target dir if it is local */
                         rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp,
                                              MDS_INODELOCK_UPDATE,
@@ -1203,6 +1216,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
         /* step 3: find & lock the old object. */
         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
         mdt_name_copy(&slname, lname);
+        fid_zero(old_fid);
         rc = mdt_lookup_version_check(info, msrcdir, &slname, old_fid, 2);
         if (rc != 0)
                 GOTO(out_unlock_target, rc);
@@ -1210,11 +1224,15 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
                 GOTO(out_unlock_target, rc = -EINVAL);
 
-        mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid,
-                               MDT_OBJ_MUST_EXIST);
+        mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
         if (IS_ERR(mold))
                 GOTO(out_unlock_target, rc = PTR_ERR(mold));
 
+       if (mdt_object_obf(mold)) {
+               mdt_object_put(info->mti_env, mold);
+               GOTO(out_unlock_target, rc = -EPERM);
+       }
+
         lh_oldp = &info->mti_lh[MDT_LH_OLD];
         mdt_lock_reg_init(lh_oldp, LCK_EX);
         rc = mdt_object_lock(info, mold, lh_oldp, MDS_INODELOCK_LOOKUP,
@@ -1233,6 +1251,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
         /* new target object may not exist now */
         lname = mdt_name(info->mti_env, (char *)rr->rr_tgt, rr->rr_tgtlen);
         /* lookup with version checking */
+        fid_zero(new_fid);
         rc = mdt_lookup_version_check(info, mtgtdir, lname, new_fid, 3);
         if (rc == 0) {
                 /* the new_fid should have been filled at this moment */
@@ -1244,11 +1263,15 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                         GOTO(out_unlock_old, rc = -EINVAL);
 
                 mdt_lock_reg_init(lh_newp, LCK_EX);
-                mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid,
-                                       MDT_OBJ_MAY_NOT_EXIST);
+                mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
                 if (IS_ERR(mnew))
                         GOTO(out_unlock_old, rc = PTR_ERR(mnew));
 
+               if (mdt_object_obf(mnew)) {
+                       mdt_object_put(info->mti_env, mnew);
+                       GOTO(out_unlock_old, rc = -EPERM);
+               }
+
                 rc = mdt_object_lock(info, mnew, lh_newp,
                                      MDS_INODELOCK_FULL, MDT_CROSS_LOCK);
                 if (rc != 0) {
@@ -1285,11 +1308,11 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
 
         /* handle last link of tgt object */
         if (rc == 0) {
-                mdt_counter_incr(req->rq_export, LPROC_MDT_RENAME);
+               mdt_counter_incr(req, LPROC_MDT_RENAME);
                 if (mnew)
                         mdt_handle_last_unlink(info, mnew, ma);
 
-                mdt_rename_counter_tally(info, info->mti_mdt, req->rq_export,
+               mdt_rename_counter_tally(info, info->mti_mdt, req,
                                          msrcdir, mtgtdir);
         }