Whamcloud - gitweb
LU-1303 lod: introduce lod device
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
index 3ffd2e6..c2227e7 100644 (file)
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -44,9 +44,6 @@
  * Author: Yury Umanets <umka@clusterfs.com>
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
 #define DEBUG_SUBSYSTEM S_MDS
 
 #include "mdt_internal.h"
@@ -106,8 +103,8 @@ static void mdt_obj_version_get(struct mdt_thread_info *info,
 {
         LASSERT(o);
         LASSERT(mdt_object_exists(o) >= 0);
-        if (mdt_object_exists(o) > 0)
-                *version = mo_version_get(info->mti_env, mdt_object_child(o));
+       if (mdt_object_exists(o) > 0 && !mdt_object_obf(o))
+                *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
         else
                 *version = ENOENT_VERSION;
         CDEBUG(D_INODE, "FID "DFID" version is "LPX64"\n",
@@ -292,6 +289,9 @@ static int mdt_md_create(struct mdt_thread_info *info)
         if (IS_ERR(parent))
                 RETURN(PTR_ERR(parent));
 
+        if (mdt_object_obf(parent))
+                GOTO(out_put_parent, rc = -EPERM);
+
         rc = mdt_version_get_check_save(info, parent, 0);
         if (rc)
                 GOTO(out_put_parent, rc);
@@ -303,14 +303,17 @@ static int mdt_md_create(struct mdt_thread_info *info)
         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
         rc = mdt_lookup_version_check(info, parent, lname,
                                       &info->mti_tmp_fid1, 1);
-        /* -ENOENT is expected here */
-        if (rc != 0 && rc != -ENOENT)
-                GOTO(out_put_parent, rc);
+       if (rc == 0)
+               GOTO(out_put_parent, rc = -EEXIST);
 
-        /* save version of file name for replay, it must be ENOENT here */
-        mdt_enoent_version_save(info, 1);
+       /* -ENOENT is expected here */
+       if (rc != -ENOENT)
+               GOTO(out_put_parent, rc);
+
+       /* save version of file name for replay, it must be ENOENT here */
+       mdt_enoent_version_save(info, 1);
 
-        child = mdt_object_find(info->mti_env, mdt, rr->rr_fid2);
+       child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
         if (likely(!IS_ERR(child))) {
                 struct md_object *next = mdt_object_child(parent);
 
@@ -334,16 +337,19 @@ static int mdt_md_create(struct mdt_thread_info *info)
                 info->mti_spec.sp_cr_mode =
                         mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
 
-                /*
-                 * Do perform lookup sanity check. We do not know if name exists
-                 * or not.
-                 */
-                info->mti_spec.sp_cr_lookup = 1;
+               /*
+                * Do not perform lookup sanity check. We know that name does
+                * not exist.
+                */
+               info->mti_spec.sp_cr_lookup = 0;
                 info->mti_spec.sp_feat = &dt_directory_features;
 
                 rc = mdo_create(info->mti_env, next, lname,
                                 mdt_object_child(child),
                                 &info->mti_spec, ma);
+               if (rc == 0)
+                       rc = mdt_attr_get_complex(info, child, ma);
+
                 if (rc == 0) {
                         /* Return fid & attr to client. */
                         if (ma->ma_valid & MA_INODE)
@@ -388,7 +394,7 @@ static int mdt_md_mkobj(struct mdt_thread_info *info)
                  * recovery, just get attr in that case.
                  */
                 if (mdt_object_exists(o) == 1) {
-                        rc = mo_attr_get(info->mti_env, next, ma);
+                       rc = mdt_attr_get_complex(info, o, ma);
                 } else {
                         /*
                          * Here, NO permission check for object_create,
@@ -411,16 +417,12 @@ static int mdt_md_mkobj(struct mdt_thread_info *info)
         RETURN(rc);
 }
 
-/* In the raw-setattr case, we lock the child inode.
- * In the write-back case or if being called from open,
- *               the client holds a lock already.
- * We use the ATTR_FROM_OPEN (translated into MRF_SETATTR_LOCKED by
- * mdt_setattr_unpack()) flag to tell these cases apart. */
 int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
                  struct md_attr *ma, int flags)
 {
         struct mdt_lock_handle  *lh;
         int do_vbr = ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID|LA_FLAGS);
+        __u64 lockpart = MDS_INODELOCK_UPDATE;
         int rc;
         ENTRY;
 
@@ -430,15 +432,12 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
         lh = &info->mti_lh[MDT_LH_PARENT];
         mdt_lock_reg_init(lh, LCK_PW);
 
-        if (!(flags & MRF_SETATTR_LOCKED)) {
-                __u64 lockpart = MDS_INODELOCK_UPDATE;
-                if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
-                        lockpart |= MDS_INODELOCK_LOOKUP;
+        if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
+                lockpart |= MDS_INODELOCK_LOOKUP;
 
-                rc = mdt_object_lock(info, mo, lh, lockpart, MDT_LOCAL_LOCK);
-                if (rc != 0)
-                        RETURN(rc);
-        }
+        rc = mdt_object_lock(info, mo, lh, lockpart, MDT_LOCAL_LOCK);
+        if (rc != 0)
+                RETURN(rc);
 
         if (mdt_object_exists(mo) == 0)
                 GOTO(out_unlock, rc = -ENOENT);
@@ -480,9 +479,8 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
         struct mdt_export_data  *med = &req->rq_export->exp_mdt_data;
         struct mdt_file_data    *mfd;
         struct mdt_object       *mo;
-        struct md_object        *next;
         struct mdt_body         *repbody;
-        int                      som_au, rc;
+        int                      som_au, rc, rc2;
         ENTRY;
 
         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
@@ -496,10 +494,13 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
         if (IS_ERR(mo))
                 GOTO(out, rc = PTR_ERR(mo));
 
+       if (mdt_object_obf(mo))
+               GOTO(out_put, rc = -EPERM);
+
         /* start a log jounal handle if needed */
         if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM)) {
                 if ((ma->ma_attr.la_valid & LA_SIZE) ||
-                    (rr->rr_flags & MRF_SETATTR_LOCKED)) {
+                    (rr->rr_flags & MRF_OPEN_TRUNC)) {
                         /* Check write access for the O_TRUNC case */
                         if (mdt_write_read(mo) < 0)
                                 GOTO(out_put, rc = -ETXTBSY);
@@ -521,7 +522,7 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
                 repbody->ioepoch = mo->mot_ioepoch;
 
                 mdt_object_get(info->mti_env, mo);
-                mdt_mfd_set_mode(mfd, FMODE_TRUNC);
+                mdt_mfd_set_mode(mfd, MDS_FMODE_TRUNC);
                 mfd->mfd_object = mo;
                 mfd->mfd_xid = req->rq_xid;
 
@@ -548,7 +549,7 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
                                info->mti_ioepoch->handle.cookie);
                         GOTO(out_put, rc = -ESTALE);
                 }
-                LASSERT(mfd->mfd_mode == FMODE_SOM);
+                LASSERT(mfd->mfd_mode == MDS_FMODE_SOM);
                 LASSERT(!(info->mti_ioepoch->flags & MF_EPOCH_CLOSE));
 
                 class_handle_unhash(&mfd->mfd_handle);
@@ -557,13 +558,13 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
 
                 /* Close the found mfd, update attributes. */
                 ma->ma_lmm_size = info->mti_mdt->mdt_max_mdsize;
-                OBD_ALLOC(ma->ma_lmm, info->mti_mdt->mdt_max_mdsize);
+                OBD_ALLOC_LARGE(ma->ma_lmm, info->mti_mdt->mdt_max_mdsize);
                 if (ma->ma_lmm == NULL)
                         GOTO(out_put, rc = -ENOMEM);
 
                 mdt_mfd_close(info, mfd);
 
-                OBD_FREE(ma->ma_lmm, info->mti_mdt->mdt_max_mdsize);
+                OBD_FREE_LARGE(ma->ma_lmm, info->mti_mdt->mdt_max_mdsize);
         } else {
                 rc = mdt_attr_set(info, mo, ma, rr->rr_flags);
                 if (rc)
@@ -572,8 +573,7 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
 
         ma->ma_need = MA_INODE;
         ma->ma_valid = 0;
-        next = mdt_object_child(mo);
-        rc = mo_attr_get(info->mti_env, next, ma);
+       rc = mdt_attr_get_complex(info, mo, ma);
         if (rc != 0)
                 GOTO(out_put, rc);
 
@@ -598,14 +598,21 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
 out_put:
         mdt_object_put(info->mti_env, mo);
 out:
-        mdt_shrink_reply(info);
+        if (rc == 0)
+               mdt_counter_incr(req, LPROC_MDT_SETATTR);
+
+        mdt_client_compatibility(info);
+        rc2 = mdt_fix_reply(info);
+        if (rc == 0)
+                rc = rc2;
         return rc;
 }
 
 static int mdt_reint_create(struct mdt_thread_info *info,
                             struct mdt_lock_handle *lhc)
 {
-        int rc;
+        struct ptlrpc_request   *req = mdt_info_req(info);
+        int                     rc;
         ENTRY;
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
@@ -617,10 +624,15 @@ static int mdt_reint_create(struct mdt_thread_info *info,
         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
         case S_IFDIR:{
                 /* Cross-ref case. */
+                /* TODO: we can add LPROC_MDT_CROSS for cross-ref stats */
                 if (info->mti_cross_ref) {
                         rc = mdt_md_mkobj(info);
-                        break;
+                } else {
+                        LASSERT(info->mti_rr.rr_namelen > 0);
+                       mdt_counter_incr(req, LPROC_MDT_MKDIR);
+                        rc = mdt_md_create(info);
                 }
+                break;
         }
         case S_IFREG:
         case S_IFLNK:
@@ -630,6 +642,7 @@ static int mdt_reint_create(struct mdt_thread_info *info,
         case S_IFSOCK:{
                 /* Special file should stay on the same node as parent. */
                 LASSERT(info->mti_rr.rr_namelen > 0);
+               mdt_counter_incr(req, LPROC_MDT_MKNOD);
                 rc = mdt_md_create(info);
                 break;
         }
@@ -641,7 +654,7 @@ static int mdt_reint_create(struct mdt_thread_info *info,
 
 /*
  * VBR: save parent version in reply and child version getting by its name.
- * Version of child is getting and checking during its lookup. If 
+ * Version of child is getting and checking during its lookup. If
  */
 static int mdt_reint_unlink(struct mdt_thread_info *info,
                             struct mdt_lock_handle *lhc)
@@ -692,6 +705,9 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                 GOTO(out, rc);
         }
 
+        if (mdt_object_obf(mp))
+                GOTO(out_unlock_parent, rc = -EPERM);
+
         rc = mdt_version_get_check_save(info, mp, 0);
         if (rc)
                 GOTO(out_unlock_parent, rc);
@@ -721,6 +737,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
         /* step 2: find & lock the child */
         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
         /* lookup child object along with version checking */
+        fid_zero(child_fid);
         rc = mdt_lookup_version_check(info, mp, lname, child_fid, 1);
         if (rc != 0)
                  GOTO(out_unlock_parent, rc);
@@ -751,9 +768,30 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
                         mdt_object_child(mc), lname, ma);
+       if (rc == 0 && !lu_object_is_dying(&mc->mot_header))
+               rc = mdt_attr_get_complex(info, mc, ma);
         if (rc == 0)
                 mdt_handle_last_unlink(info, mc, ma);
 
+        if (ma->ma_valid & MA_INODE) {
+                switch (ma->ma_attr.la_mode & S_IFMT) {
+                case S_IFDIR:
+                       mdt_counter_incr(req, LPROC_MDT_RMDIR);
+                        break;
+                case S_IFREG:
+                case S_IFLNK:
+                case S_IFCHR:
+                case S_IFBLK:
+                case S_IFIFO:
+                case S_IFSOCK:
+                       mdt_counter_incr(req, LPROC_MDT_UNLINK);
+                        break;
+                default:
+                        LASSERTF(0, "bad file type %o unlinking\n",
+                                 ma->ma_attr.la_mode);
+                }
+        }
+
         EXIT;
 
         mdt_object_unlock_put(info, mc, child_lh, rc);
@@ -819,6 +857,9 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         if (IS_ERR(mp))
                 RETURN(PTR_ERR(mp));
 
+        if (mdt_object_obf(mp))
+                GOTO(out_unlock_parent, rc = -EPERM);
+
         rc = mdt_version_get_check_save(info, mp, 0);
         if (rc)
                 GOTO(out_unlock_parent, rc);
@@ -861,6 +902,9 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         rc = mdo_link(info->mti_env, mdt_object_child(mp),
                       mdt_object_child(ms), lname, ma);
 
+        if (rc == 0)
+               mdt_counter_incr(req, LPROC_MDT_LINK);
+
         EXIT;
 out_unlock_child:
         mdt_object_unlock_put(info, ms, lhs, rc);
@@ -1119,6 +1163,9 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
         if (IS_ERR(msrcdir))
                 GOTO(out_rename_lock, rc = PTR_ERR(msrcdir));
 
+        if (mdt_object_obf(msrcdir))
+                GOTO(out_unlock_source, rc = -EPERM);
+
         rc = mdt_version_get_check_save(info, msrcdir, 0);
         if (rc)
                 GOTO(out_unlock_source, rc);
@@ -1143,6 +1190,9 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                 if (IS_ERR(mtgtdir))
                         GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
 
+                if (mdt_object_obf(mtgtdir))
+                        GOTO(out_put_target, rc = -EPERM);
+
                 /* check early, the real version will be saved after locking */
                 rc = mdt_version_get_check(info, mtgtdir, 1);
                 if (rc)
@@ -1166,6 +1216,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
         /* step 3: find & lock the old object. */
         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
         mdt_name_copy(&slname, lname);
+        fid_zero(old_fid);
         rc = mdt_lookup_version_check(info, msrcdir, &slname, old_fid, 2);
         if (rc != 0)
                 GOTO(out_unlock_target, rc);
@@ -1177,6 +1228,11 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
         if (IS_ERR(mold))
                 GOTO(out_unlock_target, rc = PTR_ERR(mold));
 
+       if (mdt_object_obf(mold)) {
+               mdt_object_put(info->mti_env, mold);
+               GOTO(out_unlock_target, rc = -EPERM);
+       }
+
         lh_oldp = &info->mti_lh[MDT_LH_OLD];
         mdt_lock_reg_init(lh_oldp, LCK_EX);
         rc = mdt_object_lock(info, mold, lh_oldp, MDS_INODELOCK_LOOKUP,
@@ -1195,6 +1251,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
         /* new target object may not exist now */
         lname = mdt_name(info->mti_env, (char *)rr->rr_tgt, rr->rr_tgtlen);
         /* lookup with version checking */
+        fid_zero(new_fid);
         rc = mdt_lookup_version_check(info, mtgtdir, lname, new_fid, 3);
         if (rc == 0) {
                 /* the new_fid should have been filled at this moment */
@@ -1210,6 +1267,11 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                 if (IS_ERR(mnew))
                         GOTO(out_unlock_old, rc = PTR_ERR(mnew));
 
+               if (mdt_object_obf(mnew)) {
+                       mdt_object_put(info->mti_env, mnew);
+                       GOTO(out_unlock_old, rc = -EPERM);
+               }
+
                 rc = mdt_object_lock(info, mnew, lh_newp,
                                      MDS_INODELOCK_FULL, MDT_CROSS_LOCK);
                 if (rc != 0) {
@@ -1245,8 +1307,14 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                         lname, ma);
 
         /* handle last link of tgt object */
-        if (rc == 0 && mnew)
-                mdt_handle_last_unlink(info, mnew, ma);
+        if (rc == 0) {
+               mdt_counter_incr(req, LPROC_MDT_RENAME);
+                if (mnew)
+                        mdt_handle_last_unlink(info, mnew, ma);
+
+               mdt_rename_counter_tally(info, info->mti_mdt, req,
+                                         msrcdir, mtgtdir);
+        }
 
         EXIT;
 out_unlock_new: