Whamcloud - gitweb
- fixes in split about using correct byte order;
authoryury <yury>
Wed, 1 Nov 2006 19:44:13 +0000 (19:44 +0000)
committeryury <yury>
Wed, 1 Nov 2006 19:44:13 +0000 (19:44 +0000)
- in cmm_split_try() init la_size used in split info messages;
- cleanups and debug in llite close thread related stuff;
- fixes in mdd about link() operation. Fixed deadlock for case when tgt and src is same object. Fixed lost error code after insert name which is possibly existing. Cleanups;
- in mdt_reint_open() use MDT_CROSS_LOCK for opened file in case it is located on remote MDT (by huanghua);
- in mdt_attr_set() fixed wrong goto to label out;
- cleanups in mdt_reint_link(), mdt_reint_unlink(), mdt_reint_rename() (by huanghua);
- fixed lock mode in md_reint_link(). It should LCK_EX instead of LCK_PW;
- fixed deadlock in mdt_reint_link() in case src and dst are same object;
- use MDT_CROSS_LOCK for src in mdt_reint_link() as it maybe located on remote MDT (by huanghua);
- used MDT_CROSS_LOCK for in mdt_reint_rename() as name may be located on remote MDT (by huanghua);
- fixed -ENODATA case in mdt_getxattr_pack_reply(). It should cause err_serious() with errors in console. Cleanups;

lustre/cmm/cmm_internal.h
lustre/cmm/cmm_split.c
lustre/include/lustre/lustre_idl.h
lustre/llite/llite_close.c
lustre/mdd/mdd_dir.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_open.c
lustre/mdt/mdt_reint.c
lustre/mdt/mdt_xattr.c

index b8d0983..1d3357d 100644 (file)
@@ -111,6 +111,7 @@ struct cmr_object {
 struct cmm_thread_info {
         struct md_attr  cmi_ma;
         struct lu_buf   cmi_buf;
+        struct lu_fid   cmi_fid; /* used for le/cpu convertions */
         char            cmi_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
 };
 
index d6bd3e2..5d052f8 100644 (file)
@@ -55,9 +55,8 @@ enum {
 int cmm_split_check(const struct lu_env *env, struct md_object *mp,
                     const char *name)
 {
-        struct cml_object *clo = md2cml_obj(mp);
-        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp));
         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
+        struct cml_object *clo = md2cml_obj(mp);
         int rc;
         ENTRY;
         
@@ -197,8 +196,8 @@ int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
         }
 
         /*
-         * Assumption: ma_valid = 0 here, we only need get
-         * inode and lmv_size for this get_attr
+         * Assumption: ma_valid = 0 here, we only need get inode and lmv_size
+         * for this get_attr.
          */
         LASSERT(ma->ma_valid == 0); 
         ma->ma_need = MA_INODE | MA_LMV;
@@ -370,22 +369,26 @@ cleanup:
         return rc;
 }
 
-/* Remove one entry from local MDT. */
+/*
+ * Remove one entry from local MDT. Do not corrupt byte order in page, it will
+ * be sent to remote MDT.
+ */
 static int cmm_split_remove_entry(const struct lu_env *env,
                                   struct md_object *mo,
                                   struct lu_dirent *ent)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
         struct cmm_object *obj;
-        char *name;
         int is_dir, rc;
+        char *name;
         ENTRY;
 
-        if (!strncmp(ent->lde_name, ".", ent->lde_namelen) ||
-            !strncmp(ent->lde_name, "..", ent->lde_namelen))
+        if (!strncmp(ent->lde_name, ".", le16_to_cpu(ent->lde_namelen)) ||
+            !strncmp(ent->lde_name, "..", le16_to_cpu(ent->lde_namelen)))
                 RETURN(0);
 
-        obj = cmm_object_find(env, cmm, &ent->lde_fid);
+        fid_le_to_cpu(&cmm_env_info(env)->cmi_fid, &ent->lde_fid);
+        obj = cmm_object_find(env, cmm, &cmm_env_info(env)->cmi_fid);
         if (IS_ERR(obj))
                 RETURN(PTR_ERR(obj));
 
@@ -400,14 +403,14 @@ static int cmm_split_remove_entry(const struct lu_env *env,
                  */
                 is_dir = 1;
 
-        OBD_ALLOC(name, ent->lde_namelen + 1);
+        OBD_ALLOC(name, le16_to_cpu(ent->lde_namelen) + 1);
         if (!name)
                 GOTO(cleanup, rc = -ENOMEM);
 
-        memcpy(name, ent->lde_name, ent->lde_namelen);
+        memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen));
         rc = mdo_name_remove(env, md_object_next(mo),
                              name, is_dir);
-        OBD_FREE(name, ent->lde_namelen + 1);
+        OBD_FREE(name, le16_to_cpu(ent->lde_namelen) + 1);
         if (rc)
                 GOTO(cleanup, rc);
 
@@ -417,8 +420,10 @@ static int cmm_split_remove_entry(const struct lu_env *env,
          * use the highest bit of the hash to indicate that (because we do not
          * use highest bit of hash).
          */
-        if (is_dir)
-                ent->lde_hash |= MAX_HASH_HIGHEST_BIT;
+        if (is_dir) {
+                ent->lde_hash = le32_to_cpu(ent->lde_hash);
+                ent->lde_hash = cpu_to_le32(ent->lde_hash | MAX_HASH_HIGHEST_BIT);
+        }
         EXIT;
 cleanup:
         cmm_object_put(env, obj);
@@ -443,25 +448,26 @@ static int cmm_split_remove_page(const struct lu_env *env,
         dp = page_address(rdpg->rp_pages[0]);
         *len = 0;
         for (ent = lu_dirent_start(dp);
-             ent != NULL && ent->lde_hash < hash_end;
+             ent != NULL && le32_to_cpu(ent->lde_hash) < hash_end;
              ent = lu_dirent_next(ent)) {
                 rc = cmm_split_remove_entry(env, mo, ent);
                 if (rc) {
                         /*
-                         * XXX Error handler to insert remove name back,
-                         * currently we assumed it will success anyway
-                         * in verfication test.
+                         * XXX: Error handler to insert remove name back,
+                         * currently we assumed it will success anyway in
+                         * verfication test.
                          */
-                        CWARN("Can not del %*.*s rc %d\n", ent->lde_namelen,
-                                ent->lde_namelen, ent->lde_name, rc);
+                        CWARN("Can not del %*.*s rc %d\n", le16_to_cpu(ent->lde_namelen),
+                              le16_to_cpu(ent->lde_namelen), ent->lde_name, rc);
                         GOTO(unmap, rc);
                 }
-                if (ent->lde_reclen == 0)
+                if (le16_to_cpu(ent->lde_reclen) == 0)
                         /*
                          * This is the last ent, whose rec size set to 0
                          * so recompute here
                          */
-                        *len += (sizeof *ent + le16_to_cpu(ent->lde_namelen) +
+                        *len += (sizeof(*ent) +
+                                 le16_to_cpu(ent->lde_namelen) +
                                  3) & ~3;
                 else
                         *len += le16_to_cpu(ent->lde_reclen);
@@ -553,10 +559,10 @@ static int cmm_split_process_stripe(const struct lu_env *env,
 
                 kmap(rdpg->rp_pages[0]);
                 ldp = page_address(rdpg->rp_pages[0]);
-                if (ldp->ldp_hash_end >= end)
+                if (le32_to_cpu(ldp->ldp_hash_end) >= end)
                         done = 1;
 
-                rdpg->rp_hash = ldp->ldp_hash_end;
+                rdpg->rp_hash = le32_to_cpu(ldp->ldp_hash_end);
                 kunmap(rdpg->rp_pages[0]);
         } while (!done);
 
@@ -631,7 +637,7 @@ int cmm_split_try(const struct lu_env *env, struct md_object *mo)
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
         struct md_attr    *ma = &cmm_env_info(env)->cmi_ma;
         int                rc = 0, split;
-        __u64              la_size = 0;
+        __u64              la_size;
         struct lu_buf     *buf;
         ENTRY;
 
@@ -647,6 +653,8 @@ int cmm_split_try(const struct lu_env *env, struct md_object *mo)
                 /* No split is needed, caller may proceed with create. */
                 RETURN(0);
         }
+
+        la_size = ma->ma_attr.la_size;
         
         /* Split should be done now, let's do it. */
         CWARN("Dir "DFID" is going to split\n",
index 04b6e71..8f0b9c9 100644 (file)
@@ -309,7 +309,7 @@ static inline struct lu_dirent *lu_dirent_next(struct lu_dirent *ent)
 {
         struct lu_dirent *next;
 
-        if (ent->lde_reclen != 0)
+        if (le16_to_cpu(ent->lde_reclen) != 0)
                 next = ((void *)ent) + le16_to_cpu(ent->lde_reclen);
         else
                 next = NULL;
index 8a36eda..9c74070 100644 (file)
@@ -89,8 +89,8 @@ void ll_queue_done_writing(struct inode *inode, unsigned long flags)
                                inode->i_ino, inode->i_generation);
                         list_add_tail(&lli->lli_close_list, &lcq->lcq_head);
                 } else {
-                        CWARN("Inode %d is already queued for done writing!\n",
-                              inode->i_ino);
+                        CWARN("Inode %lu/%u is already queued for done writing!\n",
+                              inode->i_ino, inode->i_generation);
                 }
                 wake_up(&lcq->lcq_waitq);
                 spin_unlock(&lcq->lcq_lock);
@@ -119,7 +119,7 @@ void ll_epoch_close(struct inode *inode, struct md_op_data *op_data,
                 
                 inode = igrab(inode);
                 LASSERT(inode);
-                goto out;
+                GOTO(out, 0);
         }
 
         CDEBUG(D_INODE, "Epoch "LPU64" closed on "DFID"\n",
@@ -136,14 +136,14 @@ void ll_epoch_close(struct inode *inode, struct md_op_data *op_data,
                 /* Pack Size-on-MDS inode attributes only if they has changed */
                 if (!(lli->lli_flags & LLIF_SOM_DIRTY)) {
                         spin_unlock(&lli->lli_lock);
-                        goto out;
+                        GOTO(out, 0);
                 }
 
                 /* There is already 1 pending DONE_WRITE, do not create another
                  * one -- close epoch with no attribute change. */
                 if (lli->lli_flags & LLIF_EPOCH_PENDING) {
                         spin_unlock(&lli->lli_lock);
-                        goto out;
+                        GOTO(out, 0);
                 }
         }
         
@@ -157,8 +157,9 @@ void ll_epoch_close(struct inode *inode, struct md_op_data *op_data,
                 op_data->attr.ia_valid |= ATTR_MTIME_SET | ATTR_CTIME_SET |
                                           ATTR_SIZE | ATTR_BLOCKS;
         }
-out:
         EXIT;
+out:
+        return;
 }
 
 int ll_sizeonmds_update(struct inode *inode, struct lustre_handle *fh)
@@ -276,10 +277,13 @@ static int ll_close_thread(void *arg)
                         break;
 
                 inode = ll_info2i(lli);
+                CDEBUG(D_INFO, "done_writting for inode %lu/%u\n",
+                       inode->i_ino, inode->i_generation);
                 ll_done_writing(inode);
                 iput(inode);
         }
 
+        CDEBUG(D_INFO, "ll_close exiting\n");
         complete(&lcq->lcq_comp);
         RETURN(0);
 }
index 7e76c2b..4530819 100644 (file)
@@ -176,10 +176,9 @@ static int mdd_is_subdir(const struct lu_env *env,
         RETURN(rc);
 }
 
-/*Check whether it may create the cobj under the pobj*/
-static int mdd_may_create(const struct lu_env *env,
-                          struct mdd_object *pobj, struct mdd_object *cobj,
-                          int need_check)
+/* Check whether it may create the cobj under the pobj */
+static int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
+                          struct mdd_object *cobj, int need_check, int lock)
 {
         int rc = 0;
         ENTRY;
@@ -190,11 +189,16 @@ static int mdd_may_create(const struct lu_env *env,
         if (mdd_is_dead_obj(pobj))
                 RETURN(-ENOENT);
 
-        /*check pobj may create or not*/
-        if (need_check)
-                rc = mdd_permission_internal_locked(env, pobj,
-                                             MAY_WRITE | MAY_EXEC);
-
+        if (need_check) {
+                if (lock) {
+                        rc = mdd_permission_internal_locked(env, pobj,
+                                                            (MAY_WRITE |
+                                                             MAY_EXEC));
+                } else {
+                        rc = mdd_permission_internal(env, pobj, (MAY_WRITE |
+                                                                 MAY_EXEC));
+                }
+        }
         RETURN(rc);
 }
 
@@ -256,7 +260,7 @@ static int mdd_may_delete(const struct lu_env *env,
                         RETURN(-EBUSY);
 
         } else if (S_ISDIR(mdd_object_type(cobj))) {
-                        RETURN(-EISDIR);
+                RETURN(-EISDIR);
         }
 
         if (pobj) {
@@ -281,15 +285,20 @@ int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj,
         ENTRY;
 
         if (tgt_obj) {
-                rc = mdd_may_create(env, tgt_obj, NULL, 1);
+                /* 
+                 * Lock only if tgt and src not same object. This is because
+                 * mdd_link() already locked src and we try to lock it again we
+                 * have a problem.
+                 */
+                rc = mdd_may_create(env, tgt_obj, NULL, 1, (src_obj != tgt_obj));
                 if (rc)
                         RETURN(rc);
         }
 
-        if (S_ISDIR(mdd_object_type(src_obj)))
+        if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
                 RETURN(-EPERM);
 
-        if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
+        if (S_ISDIR(mdd_object_type(src_obj)))
                 RETURN(-EPERM);
 
         RETURN(rc);
@@ -382,16 +391,18 @@ static int __mdd_index_insert_only(const struct lu_env *env,
                                    const char *name, struct thandle *th,
                                    struct lustre_capa *capa)
 {
-        int rc;
         struct dt_object *next = mdd_object_child(pobj);
+        int rc;
         ENTRY;
 
-        if (dt_try_as_dir(env, next))
+        if (dt_try_as_dir(env, next)) {
                 rc = next->do_index_ops->dio_insert(env, next,
-                                         __mdd_fid_rec(env, lf),
-                                         (const struct dt_key *)name, th, capa);
-        else
+                                                    __mdd_fid_rec(env, lf),
+                                                    (const struct dt_key *)name,
+                                                    th, capa);
+        } else {
                 rc = -ENOTDIR;
+        }
         RETURN(rc);
 }
 
@@ -399,12 +410,12 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
                     struct md_object *src_obj, const char *name,
                     struct md_attr *ma)
 {
+        struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
         struct mdd_device *mdd = mdo2mdd(src_obj);
-        struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
-        struct thandle *handle;
         struct dynlock_handle *dlh;
+        struct thandle *handle;
         int rc;
         ENTRY;
 
@@ -420,24 +431,26 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
 
         rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj);
         if (rc)
-                GOTO(out, rc);
+                GOTO(out_unlock, rc);
 
         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
                                      name, handle,
                                      mdd_object_capa(env, mdd_tobj));
-        if (rc == 0)
-                mdd_ref_add_internal(env, mdd_sobj, handle);
+        if (rc)
+                GOTO(out_unlock, rc);
+        
+        mdd_ref_add_internal(env, mdd_sobj, handle);
 
         *la_copy = ma->ma_attr;
         la_copy->la_valid = LA_CTIME;
         rc = mdd_attr_set_internal(env, mdd_sobj, la_copy, handle, 0);
         if (rc)
-                GOTO(out, rc);
+                GOTO(out_unlock, rc);
 
         la_copy->la_valid = LA_CTIME | LA_MTIME;
         rc = mdd_attr_set_internal_locked(env, mdd_tobj, la_copy, handle, 0);
 
-out:
+out_unlock:
         mdd_write_unlock(env, mdd_sobj);
         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
 out_trans:
@@ -664,11 +677,12 @@ static int mdd_name_insert(const struct lu_env *env,
         rc = __mdd_index_insert(env, mdd_obj, fid, name, isdir, handle,
                                 BYPASS_CAPA);
 
+        EXIT;
 out_unlock:
         mdd_pdo_write_unlock(env, mdd_obj, dlh);
 out_trans:
         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
-        RETURN(rc);
+        return rc;
 }
 
 /*
@@ -732,6 +746,7 @@ out_trans:
         mdd_trans_stop(env, mdd, rc, handle);
         RETURN(rc);
 }
+
 static int mdd_rt_sanity_check(const struct lu_env *env,
                                struct mdd_object *tgt_pobj,
                                struct mdd_object *tobj,
@@ -752,7 +767,7 @@ static int mdd_rt_sanity_check(const struct lu_env *env,
                      mdd_dir_is_empty(env, tobj))
                                 RETURN(-ENOTEMPTY);
         } else {
-                rc = mdd_may_create(env, tgt_pobj, NULL, 1);
+                rc = mdd_may_create(env, tgt_pobj, NULL, 1, 1);
         }
 
         RETURN(rc);
@@ -898,7 +913,7 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
         struct dt_object    *dir = mdd_object_child(mdd_obj);
         struct dt_rec       *rec = (struct dt_rec *)fid;
         const struct dt_key *key = (const struct dt_key *)name;
-        struct timeval      start;
+        struct timeval       start;
         int rc;
         ENTRY;
 
@@ -1293,7 +1308,7 @@ static int mdd_rename_sanity_check(const struct lu_env *env,
 
         if (!tobj) {
                 rc = mdd_may_create(env, tgt_pobj, NULL,
-                                    (src_pobj != tgt_pobj));
+                                    (src_pobj != tgt_pobj), 1);
         } else {
                 mdd_read_lock(env, tobj);
                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir,
index 3614a55..98c7529 100644 (file)
@@ -818,7 +818,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                 GOTO(out, rc);
         }
 
-        /*step 1: lock parent */
+        /* step 1: lock parent */
         lhp = &info->mti_lh[MDT_LH_PARENT];
         mdt_lock_pdo_init(lhp, LCK_PR, name, namelen);
         rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
index 9927f56..fc83a3a 100644 (file)
@@ -889,7 +889,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
 
                                 rc = mdt_object_lock(info, child, lhc,
                                                      MDS_INODELOCK_LOOKUP,
-                                                     MDT_LOCAL_LOCK);
+                                                     MDT_CROSS_LOCK);
                         }
                         repbody->fid1 = *mdt_object_fid(child);
                         repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
index 1413d96..2a6a25f 100644 (file)
@@ -165,17 +165,17 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags)
 
                 rc = mdt_object_lock(info, mo, lh, lockpart, MDT_LOCAL_LOCK);
                 if (rc != 0)
-                        GOTO(out, rc);
+                        RETURN(rc);
         }
 
         /* Setattrs are syncronized through dlm lock taken above. If another
          * epoch started, its attributes may be already flushed on disk,
          * skip setattr. */
         if (som_update && (info->mti_epoch->ioepoch != mo->mot_ioepoch))
-                GOTO(out, rc = 0);
+                GOTO(out_unlock, rc = 0);
 
         if (lu_object_assert_not_exists(&mo->mot_obj.mo_lu))
-                GOTO(out, rc = -ENOENT);
+                GOTO(out_unlock, rc = -ENOENT);
 
         /* all attrs are packed into mti_attr in unpack_setattr */
         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
@@ -184,21 +184,20 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags)
         /* all attrs are packed into mti_attr in unpack_setattr */
         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
         if (rc != 0)
-                GOTO(out, rc);
+                GOTO(out_unlock, rc);
 
         /* Re-enable SIZEONMDS. */
         if (som_update) {
                 CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". Count %d\n",
                        mo->mot_ioepoch, PFID(mdt_object_fid(mo)),
                        mo->mot_epochcount);
-
                 mdt_sizeonmds_enable(info, mo);
         }
 
         EXIT;
-out:
+out_unlock:
         mdt_object_unlock(info, mo, lh, rc);
-        return(rc);
+        return rc;
 }
 
 static int mdt_reint_setattr(struct mdt_thread_info *info,
@@ -407,7 +406,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
         if (rc != 0)
                  GOTO(out_unlock_parent, rc);
 
-        /* we will lock the child regardless it is local or remote. No harm. */
+        /* We will lock the child regardless it is local or remote. No harm. */
         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
         if (IS_ERR(mc))
                 GOTO(out_unlock_parent, rc = PTR_ERR(mc));
@@ -415,8 +414,10 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
         mdt_lock_reg_init(child_lh, LCK_EX);
         rc = mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_FULL,
                              MDT_CROSS_LOCK);
-        if (rc != 0)
-                GOTO(out_put_child, rc);
+        if (rc != 0) {
+                mdt_object_put(info->mti_env, mc);
+                GOTO(out_unlock_parent, rc);
+        }
 
         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
@@ -430,16 +431,11 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
                         mdt_object_child(mc), rr->rr_name, ma);
-        if (rc)
-                GOTO(out_unlock_child, rc);
-
-        mdt_handle_last_unlink(info, mc, ma);
+        if (rc == 0)
+                mdt_handle_last_unlink(info, mc, ma);
 
         EXIT;
-out_unlock_child:
-        mdt_object_unlock(info, mc, child_lh, rc);
-out_put_child:
-        mdt_object_put(info->mti_env, mc);
+        mdt_object_unlock_put(info, mc, child_lh, rc);
 out_unlock_parent:
         mdt_object_unlock_put(info, mp, parent_lh, rc);
 out:
@@ -483,7 +479,7 @@ static int mdt_reint_link(struct mdt_thread_info *info,
 
         /* step 1: find & lock the target parent dir */
         lhp = &info->mti_lh[MDT_LH_PARENT];
-        mdt_lock_pdo_init(lhp, LCK_PW, rr->rr_name,
+        mdt_lock_pdo_init(lhp, LCK_EX, rr->rr_name,
                           rr->rr_namelen);
         mp = mdt_object_find_lock(info, rr->rr_fid2, lhp,
                                   MDS_INODELOCK_UPDATE);
@@ -493,14 +489,22 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         /* step 2: find & lock the source */
         lhs = &info->mti_lh[MDT_LH_CHILD];
         mdt_lock_reg_init(lhs, LCK_EX);
-        ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
-        if (IS_ERR(ms))
-                GOTO(out_unlock_parent, rc = PTR_ERR(ms));
 
-        rc = mdt_object_lock(info, ms, lhs, MDS_INODELOCK_UPDATE,
-                             MDT_CROSS_LOCK);
-        if (rc != 0)
-                GOTO(out_unlock_source, rc);
+        if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
+                mdt_object_get(info->mti_env, mp);
+                ms = mp;
+        } else {
+                ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
+                if (IS_ERR(ms))
+                        GOTO(out_unlock_parent, rc = PTR_ERR(ms));
+                
+                rc = mdt_object_lock(info, ms, lhs, MDS_INODELOCK_UPDATE,
+                                     MDT_CROSS_LOCK);
+                if (rc != 0) {
+                        mdt_object_put(info->mti_env, ms);
+                        GOTO(out_unlock_parent, rc);
+                }
+        }
 
         /* step 3: link it */
         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
@@ -510,7 +514,6 @@ static int mdt_reint_link(struct mdt_thread_info *info,
                       mdt_object_child(ms), rr->rr_name, ma);
 
         EXIT;
-out_unlock_source:
         mdt_object_unlock_put(info, ms, lhs, rc);
 out_unlock_parent:
         mdt_object_unlock_put(info, mp, lhp, rc);
@@ -752,13 +755,19 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
                 GOTO(out_unlock_target, rc = -EINVAL);
 
-        lh_oldp = &info->mti_lh[MDT_LH_OLD];
-        mdt_lock_reg_init(lh_oldp, LCK_EX);
-        mold = mdt_object_find_lock(info, old_fid, lh_oldp,
-                                    MDS_INODELOCK_LOOKUP);
+        mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
         if (IS_ERR(mold))
                 GOTO(out_unlock_target, rc = PTR_ERR(mold));
 
+        lh_oldp = &info->mti_lh[MDT_LH_OLD];
+        mdt_lock_reg_init(lh_oldp, LCK_EX);
+        rc = mdt_object_lock(info, mold, lh_oldp, MDS_INODELOCK_LOOKUP,
+                             MDT_CROSS_LOCK);
+        if (rc != 0) {
+                mdt_object_put(info->mti_env, mold);
+                GOTO(out_unlock_target, rc);
+        }
+
         /* step 4: find & lock the new object. */
         /* new target object may not exist now */
         rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
index cd207d2..c2949db 100644 (file)
@@ -53,20 +53,21 @@ static int mdt_getxattr_pack_reply(struct mdt_thread_info * info)
         __u64                   valid = info->mti_body->valid;
         static const char       user_string[] = "user.";
         int                     size, rc;
-
+        ENTRY;
+        
         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETXATTR_PACK))
-                return -ENOMEM;
+                RETURN(-ENOMEM);
 
         /* Determine how many bytes we need */
         if ((valid & OBD_MD_FLXATTR) == OBD_MD_FLXATTR) {
                 xattr_name = req_capsule_client_get(pill, &RMF_NAME);
                 if (!xattr_name)
-                        return -EFAULT;
+                        RETURN(-EFAULT);
 
                 if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_XATTR) &&
                     !strncmp(xattr_name, user_string, sizeof(user_string) - 1))
-                        return -EOPNOTSUPP;
-
+                        RETURN(-EOPNOTSUPP);
+                
                 if (!strcmp(xattr_name, XATTR_NAME_LUSTRE_ACL))
                         size = RMTACL_SIZE_MAX;
                 else
@@ -78,19 +79,22 @@ static int mdt_getxattr_pack_reply(struct mdt_thread_info * info)
                                      mdt_object_child(info->mti_object),
                                      &LU_BUF_NULL);
         } else {
-                CERROR("valid bits: "LPX64"\n", info->mti_body->valid);
-                return -EINVAL;
+                CERROR("Valid bits: "LPX64"\n", info->mti_body->valid);
+                RETURN(-EINVAL);
         }
 
         if (size < 0) {
-                if (size != -ENODATA && size != -EOPNOTSUPP)
-                        CERROR("get EA size error: %d\n", size);
-                return size;
+                if (size == -ENODATA)
+                        size = 0;
+                else if (size != -EOPNOTSUPP) {
+                        CERROR("Error geting EA size: %d\n", size);
+                        RETURN(size);
+                }
         }
 
         if (info->mti_body->eadatasize != 0 &&
             info->mti_body->eadatasize < size)
-                return -ERANGE;
+                RETURN(-ERANGE);
 
         req_capsule_set_size(pill, &RMF_EADATA, RCL_SERVER,
                              min_t(int, size, info->mti_body->eadatasize));
@@ -98,10 +102,10 @@ static int mdt_getxattr_pack_reply(struct mdt_thread_info * info)
         rc = req_capsule_pack(pill);
         if (rc) {
                 LASSERT(rc < 0);
-                return rc;
+                RETURN(rc);
         }
 
-        return size;
+        RETURN(size);
 }
 
 static int do_remote_getfacl(struct mdt_thread_info *info,
@@ -152,13 +156,13 @@ int mdt_getxattr(struct mdt_thread_info *info)
                 RETURN(err_serious(easize));
 
         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
-        LASSERT(repbody);
+        LASSERT(repbody != NULL);
 
         rc = mdt_init_ucred(info, reqbody);
         if (rc)
                 RETURN(rc);
 
-        /* no need further getxattr */
+        /* No need further getxattr. */
         if (easize == 0 || reqbody->eadatasize == 0)
                 GOTO(out, rc = easize);
 
@@ -192,13 +196,14 @@ int mdt_getxattr(struct mdt_thread_info *info)
         } else
                 LBUG();
 
+        EXIT;
 out:
         if (rc >= 0) {
                 repbody->eadatasize = rc;
                 rc = 0;
         }
         mdt_exit_ucred(info);
-        RETURN(rc);
+        return rc;
 }
 
 /* return EADATA length to the caller. negative value means error */