Whamcloud - gitweb
- landing of b_hd_cleanup_merge to HEAD.
[fs/lustre-release.git] / lustre / mds / mds_reint.c
index a8af3c6..1de982c 100644 (file)
@@ -31,6 +31,8 @@
 #define DEBUG_SUBSYSTEM S_MDS
 
 #include <linux/fs.h>
+#include <linux/jbd.h>
+#include <linux/ext3_fs.h>
 #include <linux/obd_support.h>
 #include <linux/obd_class.h>
 #include <linux/obd.h>
@@ -40,7 +42,6 @@
 #include <linux/lustre_dlm.h>
 #include <linux/lustre_log.h>
 #include <linux/lustre_fsfilt.h>
-
 #include "mds_internal.h"
 
 void mds_commit_cb(struct obd_device *obd, __u64 transno, void *data,
@@ -794,7 +795,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 if (rec->ur_tgt == NULL)        /* no target supplied */
                         rc = -EINVAL;           /* -EPROTO? */
                 else
-                        rc = vfs_symlink(dir, dchild, rec->ur_tgt);
+                        rc = ll_vfs_symlink(dir, dchild, rec->ur_tgt, S_IALLUGO);
                 EXIT;
                 break;
         }
@@ -1058,8 +1059,7 @@ int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id,
         int rc, i, j, sorted, flags;
         ENTRY;
 
-        CDEBUG(D_DLMTRACE,
-               "locks before: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n",
+        CDEBUG(D_DLMTRACE, "locks before: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n",
                res_id[0]->name[0], res_id[1]->name[0], res_id[2]->name[0],
                res_id[3]->name[0]);
 
@@ -1091,8 +1091,7 @@ int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id,
                 policies[j + 1] = policies[4];
         }
 
-        CDEBUG(D_DLMTRACE,
-               "lock order: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n",
+        CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n",
                res_id[0]->name[0], res_id[1]->name[0], res_id[2]->name[0],
                res_id[3]->name[0]);
 
@@ -1248,7 +1247,7 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
                                 char *name, int namelen,
                                 struct lustre_handle *child_lockh,
                                 struct dentry **dchildp, int child_mode,
-                                __u64 child_lockpart, void *clone_info)
+                                __u64 child_lockpart)
 {
         struct ldlm_res_id child_res_id = { .name = {0} };
         struct ldlm_res_id parent_res_id = { .name = {0} };
@@ -1302,13 +1301,6 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
 #endif
 
         cleanup_phase = 1; /* parent dentry */
-#ifdef CONFIG_SNAPFS
-        if (clone_info) {
-                /*FIXME is there any other FUNC will use d_fsdata, 
-                 *excepet creating inode according inum*/
-                (*dparentp)->d_fsdata = clone_info;         
-        }
-#endif
 
         /* Step 2: Lookup child (without DLM lock, to get resource name) */
         *dchildp = ll_lookup_one_len(name, *dparentp, namelen - 1);
@@ -1394,6 +1386,85 @@ void mds_reconstruct_generic(struct ptlrpc_request *req)
 
         mds_req_from_mcd(req, med->med_mcd);
 }
+/* If we are unlinking an open file/dir (i.e. creating an orphan) then
+ * we instead link the inode into the PENDING directory until it is
+ * finally released.  We can't simply call mds_reint_rename() or some
+ * part thereof, because we don't have the inode to check for link
+ * count/open status until after it is locked.
+ *
+ * For lock ordering, caller must get child->i_sem first, then pending->i_sem
+ * before starting journal transaction.
+ *
+ * returns 1 on success
+ * returns 0 if we lost a race and didn't make a new link
+ * returns negative on error
+ */
+static int mds_orphan_add_link(struct mds_update_record *rec,
+                               struct obd_device *obd, struct dentry *dentry)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct inode *pending_dir = mds->mds_pending_dir->d_inode;
+        struct inode *inode = dentry->d_inode;
+        struct dentry *pending_child;
+        char fidname[LL_FID_NAMELEN];
+        int fidlen = 0, rc, mode;
+        ENTRY;
+
+        LASSERT(inode != NULL);
+        LASSERT(!mds_inode_is_orphan(inode));
+#ifndef HAVE_I_ALLOC_SEM
+        LASSERT(down_trylock(&inode->i_sem) != 0);
+#endif
+        LASSERT(down_trylock(&pending_dir->i_sem) != 0);
+
+        fidlen = ll_fid2str(fidname, inode->i_ino, inode->i_generation);
+
+        CDEBUG(D_INODE, "pending destroy of %dx open %d linked %s %s = %s\n",
+               mds_orphan_open_count(inode), inode->i_nlink,
+               S_ISDIR(inode->i_mode) ? "dir" :
+                S_ISREG(inode->i_mode) ? "file" : "other",rec->ur_name,fidname);
+
+        if (mds_orphan_open_count(inode) == 0 || inode->i_nlink != 0)
+                RETURN(0);
+
+        pending_child = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
+        if (IS_ERR(pending_child))
+                RETURN(PTR_ERR(pending_child));
+
+        if (pending_child->d_inode != NULL) {
+                CERROR("re-destroying orphan file %s?\n", rec->ur_name);
+                LASSERT(pending_child->d_inode == inode);
+                GOTO(out_dput, rc = 0);
+        }
+
+        /* link() is semanticaly-wrong for S_IFDIR, so we set S_IFREG
+         * for linking and return real mode back then -bzzz */
+        mode = inode->i_mode;
+        inode->i_mode = S_IFREG;
+        rc = vfs_link(dentry, pending_dir, pending_child);
+        if (rc)
+                CERROR("error linking orphan %s to PENDING: rc = %d\n",
+                       rec->ur_name, rc);
+        else
+                mds_inode_set_orphan(inode);
+
+        /* return mode and correct i_nlink if inode is directory */
+        inode->i_mode = mode;
+        LASSERTF(inode->i_nlink == 1, "%s nlink == %d\n",
+                 S_ISDIR(mode) ? "dir" : S_ISREG(mode) ? "file" : "other",
+                 inode->i_nlink);
+        if (S_ISDIR(mode)) {
+                inode->i_nlink++;
+                pending_dir->i_nlink++;
+                mark_inode_dirty(inode);
+                mark_inode_dirty(pending_dir);
+        }
+
+        GOTO(out_dput, rc = 1);
+out_dput:
+        l_dput(pending_child);
+        RETURN(rc);
+}
 
 int mds_create_local_dentry(struct mds_update_record *rec,
                            struct obd_device *obd)
@@ -1622,11 +1693,11 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                             struct ptlrpc_request *req,
                             struct lustre_handle *lh)
 {
-        struct dentry *dparent, *dchild;
+        struct dentry *dparent = NULL, *dchild;
         struct mds_obd *mds = mds_req2mds(req);
         struct obd_device *obd = req->rq_export->exp_obd;
         struct mds_body *body = NULL;
-        struct inode *child_inode;
+        struct inode *child_inode = NULL;
         struct lustre_handle parent_lockh[2] = {{0}, {0}}; 
         struct lustre_handle child_lockh = {0};
 #if 0
@@ -1636,7 +1707,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         struct llog_create_locks *lcl = NULL;
         char fidname[LL_FID_NAMELEN];
         void *handle = NULL;
-        int rc = 0, log_unlink = 0, cleanup_phase = 0;
+        int rc = 0, cleanup_phase = 0;
         int unlink_by_fid = 0;
         int update_mode;
         ENTRY;
@@ -1703,7 +1774,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                                                  rec->ur_namelen, &child_lockh,
                                                  &dchild, LCK_EX,
                                                  MDS_INODELOCK_LOOKUP |
-                                                 MDS_INODELOCK_UPDATE, NULL);
+                                                 MDS_INODELOCK_UPDATE);
         }
         if (rc)
                 GOTO(cleanup, rc);
@@ -1768,22 +1839,24 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
         LASSERT(body != NULL);
 
-        /* If this is the last reference to this inode, get the OBD EA
-         * data first so the client can destroy OST objects.
-         * we only do the object removal if no open files remain.
-         * Nobody can get at this name anymore because of the locks so
-         * we make decisions here as to whether to remove the inode */
-        if (S_ISREG(child_inode->i_mode) && child_inode->i_nlink == 1 &&
-            mds_open_orphan_count(child_inode) == 0) {
-                mds_pack_inode2fid(obd, &body->fid1, child_inode);
-                mds_pack_inode2body(obd, body, child_inode);
-                mds_pack_md(obd, req->rq_repmsg, offset + 1, body,
-                            child_inode, 1);
-                if (!(body->valid & OBD_MD_FLEASIZE)) {
-                        body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
-                                        OBD_MD_FLATIME | OBD_MD_FLMTIME);
-                } else {
-                        log_unlink = 1;
+        /* child i_alloc_sem protects orphan_dec_test && is_orphan race */
+        DOWN_READ_I_ALLOC_SEM(child_inode);
+        cleanup_phase = 4; /* up(&child_inode->i_sem) when finished */
+
+        /* If this is potentially the last reference to this inode, get the
+         * OBD EA data first so the client can destroy OST objects.  We
+         * only do the object removal later if no open files/links remain. */
+        if ((S_ISDIR(child_inode->i_mode) && child_inode->i_nlink == 2) ||
+            child_inode->i_nlink == 1) {
+                if (mds_orphan_open_count(child_inode) > 0) {
+                        /* need to lock pending_dir before transaction */
+                        down(&mds->mds_pending_dir->d_inode->i_sem);
+                        cleanup_phase = 5; /* up(&pending_dir->i_sem) */
+                } else if (S_ISREG(child_inode->i_mode)) {
+                        mds_pack_inode2fid(obd, &body->fid1, child_inode);
+                        mds_pack_inode2body(obd, body, child_inode);
+                        mds_pack_md(obd, req->rq_repmsg, offset + 1, body,
+                                    child_inode, MDS_PACK_MD_LOCK);
                 }
         }
 
@@ -1798,7 +1871,6 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                                       NULL);
                 if (IS_ERR(handle))
                         GOTO(cleanup, rc = PTR_ERR(handle));
-                cleanup_phase = 4; /* transaction */
                 rc = vfs_rmdir(dparent->d_inode, dchild);
                 break;
         case S_IFREG: {
@@ -1810,17 +1882,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                                           le32_to_cpu(lmm->lmm_stripe_count));
                 if (IS_ERR(handle))
                         GOTO(cleanup, rc = PTR_ERR(handle));
-
-                cleanup_phase = 4; /* transaction */
                 rc = vfs_unlink(dparent->d_inode, dchild);
-
-                if (!rc && log_unlink)
-                        if (mds_log_op_unlink(obd, child_inode,
-                                lustre_msg_buf(req->rq_repmsg, offset + 1, 0),
-                                req->rq_repmsg->buflens[offset + 1],
-                                lustre_msg_buf(req->rq_repmsg, offset + 2, 0),
-                                req->rq_repmsg->buflens[offset + 2], &lcl) > 0)
-                                body->valid |= OBD_MD_FLCOOKIE;
                 break;
         }
         case S_IFLNK:
@@ -1832,7 +1894,6 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                                       NULL);
                 if (IS_ERR(handle))
                         GOTO(cleanup, rc = PTR_ERR(handle));
-                cleanup_phase = 4; /* transaction */
                 rc = vfs_unlink(dparent->d_inode, dchild);
                 break;
         default:
@@ -1842,7 +1903,32 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                 GOTO(cleanup, rc = -EINVAL);
         }
 
- cleanup:
+        if (rc == 0 && child_inode->i_nlink == 0) {
+                if (mds_orphan_open_count(child_inode) > 0)
+                        rc = mds_orphan_add_link(rec, obd, dchild);
+
+                if (rc == 1)
+                        GOTO(cleanup, rc = 0);
+
+                if (!S_ISREG(child_inode->i_mode))
+                        GOTO(cleanup, rc);
+
+                if (!(body->valid & OBD_MD_FLEASIZE)) {
+                        body->valid |=(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
+                                       OBD_MD_FLATIME | OBD_MD_FLMTIME);
+                } else if (mds_log_op_unlink(obd, child_inode,
+                                lustre_msg_buf(req->rq_repmsg, offset + 1, 0),
+                                        req->rq_repmsg->buflens[offset + 1],
+                                lustre_msg_buf(req->rq_repmsg, offset + 2, 0),
+                                        req->rq_repmsg->buflens[offset+2], 
+                                &lcl) > 0){
+                        body->valid |= OBD_MD_FLCOOKIE;
+                }
+        }
+
+        GOTO(cleanup, rc);
+
+cleanup:
         if (rc == 0) {
                 struct iattr iattr;
                 int err;
@@ -1855,28 +1941,21 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                 if (err)
                         CERROR("error on parent setattr: rc = %d\n", err);
         }
-
+        rc = mds_finish_transno(mds, dparent ? dparent->d_inode : NULL,
+                                handle, req, rc, 0);
+        if (!rc)
+                (void)obd_set_info(mds->mds_osc_exp, strlen("unlinked"),
+                                   "unlinked", 0, NULL);
         switch(cleanup_phase) {
-        case 4:
-                LASSERT(dchild != NULL && dchild->d_inode != NULL);
-                LASSERT(atomic_read(&dchild->d_inode->i_count) > 0);
-                if (rc == 0 && dchild->d_inode->i_nlink == 0 &&
-                                mds_open_orphan_count(dchild->d_inode) > 0) {
-
-                        /* filesystem is really going to destroy an inode
-                         * we have to delay this till inode is opened -bzzz */
-                        mds_open_unlink_rename(rec, obd, dparent, dchild, NULL);
-                }
-                /* handle splitted dir */
+        case 5: /* pending_dir semaphore */
+                up(&mds->mds_pending_dir->d_inode->i_sem);
+        case 4: /* child inode semaphore */
+                UP_READ_I_ALLOC_SEM(child_inode);
+                 /* handle splitted dir */
                 if (rc == 0) {
                         /* master directory can be non-empty or something else ... */
                         mds_unlink_slave_objs(obd, dchild);
                 }
-                rc = mds_finish_transno(mds, dparent->d_inode, handle, req,
-                                        rc, 0);
-                if (!rc)
-                        (void)obd_set_info(mds->mds_osc_exp, strlen("unlinked"),
-                                           "unlinked", 0, NULL);
                 if (lcl != NULL)
                         ptlrpc_save_llog_lock(req, lcl);
         case 3: /* child ino-reuse lock */
@@ -2101,7 +2180,9 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
         ldlm_policy_data_t tgt_dir_policy =
                                        {.l_inodebits = {MDS_INODELOCK_UPDATE}};
         int rc = 0, cleanup_phase = 0;
+#ifdef S_PDIROPS
         int update_mode = 0;
+#endif
         ENTRY;
 
         LASSERT(offset == 1);
@@ -2250,56 +2331,6 @@ cleanup:
         return 0;
 }
 
-/*
- * add a hard link in the PENDING directory, only used by rename()
- */
-static int mds_add_link_orphan(struct mds_update_record *rec,
-                               struct obd_device *obd,
-                               struct dentry *dentry)
-{
-        struct mds_obd *mds = &obd->u.mds;
-        struct inode *pending_dir = mds->mds_pending_dir->d_inode;
-        struct dentry *pending_child;
-        char fidname[LL_FID_NAMELEN];
-        int fidlen = 0, rc;
-        ENTRY;
-
-        LASSERT(dentry->d_inode);
-        LASSERT(!mds_inode_is_orphan(dentry->d_inode));
-
-        down(&pending_dir->i_sem);
-        fidlen = ll_fid2str(fidname, dentry->d_inode->i_ino,
-                            dentry->d_inode->i_generation);
-
-        CDEBUG(D_ERROR, "pending destroy of %dx open file %s = %s\n",
-               mds_open_orphan_count(dentry->d_inode),
-               rec->ur_name, fidname);
-
-        pending_child = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
-        if (IS_ERR(pending_child))
-                GOTO(out_lock, rc = PTR_ERR(pending_child));
-
-        if (pending_child->d_inode != NULL) {
-                CERROR("re-destroying orphan file %s?\n", rec->ur_name);
-                LASSERT(pending_child->d_inode == dentry->d_inode);
-                GOTO(out_dput, rc = 0);
-        }
-
-        lock_kernel();
-        rc = vfs_link(dentry, pending_dir, pending_child);
-        unlock_kernel();
-        if (rc)
-                CERROR("error addlink orphan %s to PENDING: rc = %d\n",
-                       rec->ur_name, rc);
-        else
-                mds_inode_set_orphan(dentry->d_inode);
-out_dput:
-        l_dput(pending_child);
-out_lock:
-        up(&pending_dir->i_sem);
-        RETURN(rc);
-}
-
 /* The idea here is that we need to get four locks in the end:
  * one on each parent directory, one on each child.  We need to take
  * these locks in some kind of order (to avoid deadlocks), and the order
@@ -2414,6 +2445,7 @@ static int mds_get_parents_children_locked(struct obd_device *obd,
         cleanup_phase = 4; /* original name dentry */
 
         inode = (*de_oldp)->d_inode;
+        
         if (inode != NULL) {
                 inode = igrab(inode);
                 if (inode == NULL)
@@ -2439,6 +2471,10 @@ static int mds_get_parents_children_locked(struct obd_device *obd,
         cleanup_phase = 5; /* target dentry */
 
         inode = (*de_newp)->d_inode;
+        
+        if (inode == NULL)
+                goto retry_locks;
+        
         if (inode != NULL) {
                 inode = igrab(inode);
                 if (inode == NULL)
@@ -2539,6 +2575,95 @@ cleanup:
 
         return rc;
 }
+                                                                                                                                                                                                     
+static int mds_add_local_dentry(struct mds_update_record *rec, int offset,
+                                struct ptlrpc_request *req, struct dentry *dentry,
+                                struct dentry *de_dir, 
+                                struct dentry *de)
+{
+        struct obd_device *obd = req->rq_export->exp_obd;
+        struct mds_obd *mds = mds_req2mds(req);
+        void *handle = NULL;
+        int rc = 0;
+        ENTRY;
+                                                                                                                                                                                                     
+        if (de->d_inode) {
+                /*
+                 * name exists and points to local inode try to unlink this name
+                 * and create new one.
+                 */
+                CDEBUG(D_OTHER, "%s: %s points to local inode %lu/%lu\n",
+                       obd->obd_name, rec->ur_tgt,
+                       (unsigned long)de->d_inode->i_ino,
+                       (unsigned long)de->d_inode->i_generation);
+                handle = fsfilt_start(obd, de_dir->d_inode,
+                                      FSFILT_OP_RENAME, NULL);
+                if (IS_ERR(handle))
+                        GOTO(cleanup, rc = PTR_ERR(handle));
+                rc = fsfilt_del_dir_entry(req->rq_export->exp_obd, de);
+                if (rc)
+                        GOTO(cleanup, rc);
+        } else if (de->d_flags & DCACHE_CROSS_REF) {
+                /* name exists and points to remove inode */
+                CDEBUG(D_OTHER, "%s: %s points to remote inode %lu/%lu/%lu\n",
+                       obd->obd_name, rec->ur_tgt, (unsigned long)de->d_mdsnum,
+                       (unsigned long)de->d_inum, 
+                       (unsigned long)de->d_generation);
+                handle = fsfilt_start(obd, de_dir->d_inode,
+                                      FSFILT_OP_RENAME, NULL);
+                if (IS_ERR(handle))
+                        GOTO(cleanup, rc = PTR_ERR(handle));
+                rc = fsfilt_del_dir_entry(req->rq_export->exp_obd, de);
+                if (rc)
+                        GOTO(cleanup, rc);
+        } else {
+                /* name doesn't exist. the simplest case. */
+                handle = fsfilt_start(obd, de_dir->d_inode,
+                                      FSFILT_OP_LINK, NULL);
+                if (IS_ERR(handle))
+                        GOTO(cleanup, rc = PTR_ERR(handle));
+        }
+                                                                                                                                                                                                     
+        rc = fsfilt_add_dir_entry(obd, de_dir, rec->ur_tgt,
+                                  rec->ur_tgtlen - 1, dentry->d_inum,
+                                  dentry->d_generation, dentry->d_mdsnum);
+        if (rc) {
+                CERROR("add_dir_entry() returned error %d\n", rc);
+                GOTO(cleanup, rc);
+        }
+                                                                                                                                                                                                     
+cleanup:
+        EXIT;
+        rc = mds_finish_transno(mds, de_dir ? de_dir->d_inode : NULL,
+                                handle, req, rc, 0);
+                                                                                                                                                                                                     
+        RETURN(rc);
+}
+
+static int mds_del_local_dentry(struct mds_update_record *rec, int offset,
+                                struct ptlrpc_request *req, 
+                                struct dentry *dentry, struct dentry *de_dir, 
+                                struct dentry *de)
+{
+        struct obd_device *obd = req->rq_export->exp_obd;
+        struct mds_obd *mds = mds_req2mds(req);
+        void *handle = NULL;
+        int rc = 0;
+        ENTRY;
+                                                                                                                                                                                                     
+        handle = fsfilt_start(obd, de_dir->d_inode, FSFILT_OP_UNLINK, NULL);
+        if (IS_ERR(handle))
+                GOTO(cleanup, rc = PTR_ERR(handle));
+        rc = fsfilt_del_dir_entry(obd, de);
+        d_drop(de);
+                                                                                                                                                                                                     
+cleanup:
+        EXIT;
+        rc = mds_finish_transno(mds, de_dir ? de_dir->d_inode : NULL,
+                                handle, req, rc, 0);
+        RETURN(0);
+}
+
 
 static int mds_reint_rename_create_name(struct mds_update_record *rec,
                                         int offset, struct ptlrpc_request *req)
@@ -2570,7 +2695,7 @@ static int mds_reint_rename_create_name(struct mds_update_record *rec,
                                          &de_srcdir,LCK_PW,MDS_INODELOCK_UPDATE,
                                          &update_mode, rec->ur_tgt, rec->ur_tgtlen,
                                          &child_lockh, &de_new, LCK_EX,
-                                         MDS_INODELOCK_LOOKUP, NULL);
+                                         MDS_INODELOCK_LOOKUP);
         if (rc)
                 GOTO(cleanup, rc);
 
@@ -2652,7 +2777,6 @@ static int mds_reint_rename_to_remote(struct mds_update_record *rec, int offset,
         struct lustre_handle parent_lockh[2] = {{0}, {0}};
         struct lustre_handle child_lockh = {0};
         struct mdc_op_data opdata;
-        void *handle = NULL;
         int update_mode, rc = 0;
         ENTRY;
 
@@ -2665,7 +2789,7 @@ static int mds_reint_rename_to_remote(struct mds_update_record *rec, int offset,
                                          &de_srcdir,LCK_PW,MDS_INODELOCK_UPDATE,
                                          &update_mode, rec->ur_name, 
                                          rec->ur_namelen, &child_lockh, &de_old,
-                                         LCK_EX, MDS_INODELOCK_LOOKUP, NULL);
+                                         LCK_EX, MDS_INODELOCK_LOOKUP);
         LASSERT(rc == 0);
         LASSERT(de_srcdir);
         LASSERT(de_srcdir->d_inode);
@@ -2698,17 +2822,11 @@ static int mds_reint_rename_to_remote(struct mds_update_record *rec, int offset,
        
         if (rc)
                 GOTO(cleanup, rc);
-
-        handle = fsfilt_start(obd, de_srcdir->d_inode, FSFILT_OP_UNLINK, NULL);
-        if (IS_ERR(handle))
-                GOTO(cleanup, rc = PTR_ERR(handle));
-        rc = fsfilt_del_dir_entry(obd, de_old);
-        d_drop(de_old);
-
+        
+        rc = mds_del_local_dentry(rec, offset, req, NULL, de_srcdir, de_old);
 cleanup:
         EXIT;
-        rc = mds_finish_transno(mds, de_srcdir ? de_srcdir->d_inode : NULL,
-                                handle, req, rc, 0);
+
         if (req2)
                 ptlrpc_req_finished(req2);
 
@@ -2736,12 +2854,14 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
         struct dentry *de_tgtdir = NULL;
         struct dentry *de_old = NULL;
         struct dentry *de_new = NULL;
+        struct inode *old_inode = NULL, *new_inode = NULL;
         struct mds_obd *mds = mds_req2mds(req);
         struct lustre_handle dlm_handles[7] = {{0},{0},{0},{0},{0},{0},{0}};
         struct mds_body *body = NULL;
         struct llog_create_locks *lcl = NULL;
-        int rc = 0, log_unlink = 0, lock_count = 3;
-        int cleanup_phase = 0;
+        struct lov_mds_md *lmm = NULL;
+        int rc = 0, cleanup_phase = 0;
+
         void *handle = NULL;
         ENTRY;
 
@@ -2783,21 +2903,60 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
 
         cleanup_phase = 1; /* parent(s), children, locks */
 
-        if (de_new->d_inode)
-                lock_count = 4;
+        old_inode = de_old->d_inode;
+        new_inode = de_new->d_inode;
 
         /* sanity check for src inode */
-        if (de_old->d_inode->i_ino == de_srcdir->d_inode->i_ino ||
-            de_old->d_inode->i_ino == de_tgtdir->d_inode->i_ino)
-                GOTO(cleanup, rc = -EINVAL);
-        
+        if (de_old->d_flags & DCACHE_CROSS_REF) {
+                LASSERT(de_old->d_inode == NULL);
+                                                                                                                                                                                                     
+                /*
+                 * in the case of cross-ref dir, we can perform this check only
+                 * if child and parent lie on the same mds. This is because
+                 * otherwise they can have the same inodes.
+                 */
+                if (de_old->d_mdsnum == mds->mds_num) {
+                        if (de_old->d_inum == de_srcdir->d_inode->i_ino ||
+                            de_old->d_inum == de_tgtdir->d_inode->i_ino)
+                                GOTO(cleanup, rc = -EINVAL);
+                }
+        } else {
+                LASSERT(de_old->d_inode != NULL);
+                if (de_old->d_inode->i_ino == de_srcdir->d_inode->i_ino ||
+                    de_old->d_inode->i_ino == de_tgtdir->d_inode->i_ino)
+                        GOTO(cleanup, rc = -EINVAL);
+        }
+                                                                                                                                                                                                     
         /* sanity check for dest inode */
-        if (de_new->d_inode &&
-            (de_new->d_inode->i_ino == de_srcdir->d_inode->i_ino ||
-             de_new->d_inode->i_ino == de_tgtdir->d_inode->i_ino))
-                GOTO(cleanup, rc = -EINVAL);
-
-        if (de_old->d_inode == de_new->d_inode)
+        if (de_new->d_flags & DCACHE_CROSS_REF) {
+                LASSERT(new_inode == NULL);
+                                                                                                                                                                                                     
+                /* the same check about target dentry. */
+                if (de_new->d_mdsnum == mds->mds_num) {
+                        if (de_new->d_inum == de_srcdir->d_inode->i_ino ||
+                            de_new->d_inum == de_tgtdir->d_inode->i_ino)
+                                GOTO(cleanup, rc = -EINVAL);
+                }
+                                                                                                                                                                                                     
+                /*
+                 * regular files usualy do not have ->rename() implemented. But
+                 * we handle only this case when @de_new is cross-ref entry,
+                 * because in other cases it will be handled by vfs_rename().
+                 */
+                if (de_old->d_inode && (!de_old->d_inode->i_op || 
+                    !de_old->d_inode->i_op->rename))
+                        GOTO(cleanup, rc = -EPERM);
+        } else {
+                if (new_inode &&
+                    (new_inode->i_ino == de_srcdir->d_inode->i_ino ||
+                     new_inode->i_ino == de_tgtdir->d_inode->i_ino))
+                        GOTO(cleanup, rc = -EINVAL);
+        }
+        
+        /* check if inodes point to each other. */
+        if (!(de_old->d_flags & DCACHE_CROSS_REF) &&
+            !(de_new->d_flags & DCACHE_CROSS_REF) &&
+            old_inode == new_inode)
                 GOTO(cleanup, rc = 0);
 
         /* if we are about to remove the target at first, pass the EA of
@@ -2805,52 +2964,78 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
         LASSERT(body != NULL);
 
-        if (de_new->d_inode &&
-            S_ISREG(de_new->d_inode->i_mode) &&
-            de_new->d_inode->i_nlink == 1 &&
-            mds_open_orphan_count(de_new->d_inode) == 0) {
-                mds_pack_inode2fid(obd, &body->fid1, de_new->d_inode);
-                mds_pack_inode2body(obd, body, de_new->d_inode);
-                mds_pack_md(obd, req->rq_repmsg, 1, body, de_new->d_inode, 1);
-                if (!(body->valid & OBD_MD_FLEASIZE)) {
-                        body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
-                        OBD_MD_FLATIME | OBD_MD_FLMTIME);
-                } else {
-                        log_unlink = 1; 
-                }
+        /* child i_alloc_sem protects orphan_dec_test && is_orphan race */
+        if (new_inode) 
+                DOWN_READ_I_ALLOC_SEM(new_inode);
+        cleanup_phase = 2; /* up(&new_inode->i_sem) when finished */
+
+        if (new_inode && ((S_ISDIR(new_inode->i_mode) && 
+            new_inode->i_nlink == 2) ||
+            new_inode->i_nlink == 1)) {
+                if (mds_orphan_open_count(new_inode) > 0) {
+                        /* need to lock pending_dir before transaction */
+                        down(&mds->mds_pending_dir->d_inode->i_sem);
+                        cleanup_phase = 3; /* up(&pending_dir->i_sem) */
+                } else if (S_ISREG(new_inode->i_mode)) {
+                        mds_pack_inode2fid(obd, &body->fid1, new_inode);
+                        mds_pack_inode2body(obd, body, new_inode);
+                        mds_pack_md(obd, req->rq_repmsg, 1, body, new_inode, 
+                                    MDS_PACK_MD_LOCK);
+                 }
         }
 
         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
                        de_srcdir->d_inode->i_sb);
 
-        handle = fsfilt_start(obd, de_tgtdir->d_inode, FSFILT_OP_RENAME, NULL);
-        if (IS_ERR(handle))
-                GOTO(cleanup, rc = PTR_ERR(handle));
-
-        /* FIXME need adjust the journal block count? */
-        /* if the target should be moved to PENDING, we at first increase the
-         * link and later vfs_rename() will decrease the link count again */
-        if (de_new->d_inode &&
-            S_ISREG(de_new->d_inode->i_mode) &&
-            de_new->d_inode->i_nlink == 1 &&
-            mds_open_orphan_count(de_new->d_inode) > 0) {
-                rc = mds_add_link_orphan(rec, obd, de_new);
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)
+        /* Check if we are moving old entry into its child. 2.6 does not
+           check for this in vfs_rename() anymore */
+        if (is_subdir(de_new, de_old))
+                GOTO(cleanup, rc = -EINVAL);
+#endif
+        if (de_old->d_flags & DCACHE_CROSS_REF) {
+                rc = mds_add_local_dentry(rec, offset, req, de_old, de_tgtdir, 
+                                          de_new);
                 if (rc)
                         GOTO(cleanup, rc);
+                                                                                                                                                                                                     
+                rc = mds_del_local_dentry(rec, offset, req, de_old, de_srcdir, 
+                                          de_old);
+                GOTO(cleanup, rc);
         }
 
+        lmm = lustre_msg_buf(req->rq_repmsg, 1, 0);
+        handle = fsfilt_start_log(obd, de_tgtdir->d_inode, FSFILT_OP_RENAME,
+                                  NULL, le32_to_cpu(lmm->lmm_stripe_count));
+
+        if (IS_ERR(handle))
+                GOTO(cleanup, rc = PTR_ERR(handle));
+
         lock_kernel();
         de_old->d_fsdata = req;
         de_new->d_fsdata = req;
         rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new);
         unlock_kernel();
 
-        if (!rc && log_unlink) {
-                if (mds_log_op_unlink(obd, de_tgtdir->d_inode,
-                                lustre_msg_buf(req->rq_repmsg, 1, 0),
-                                req->rq_repmsg->buflens[1],
-                                lustre_msg_buf(req->rq_repmsg, 2, 0),
-                                req->rq_repmsg->buflens[2], &lcl) > 0) {
+        if (rc == 0 && new_inode != NULL && new_inode->i_nlink == 0) {
+                if (mds_orphan_open_count(new_inode) > 0)
+                        rc = mds_orphan_add_link(rec, obd, de_new);
+
+                if (rc == 1)
+                        GOTO(cleanup, rc = 0);
+
+                if (!S_ISREG(new_inode->i_mode))
+                        GOTO(cleanup, rc);
+
+                if (!(body->valid & OBD_MD_FLEASIZE)) {
+                        body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
+                                        OBD_MD_FLATIME | OBD_MD_FLMTIME);
+                } else if (mds_log_op_unlink(obd, new_inode,
+                                             lustre_msg_buf(req->rq_repmsg,1,0),
+                                             req->rq_repmsg->buflens[1],
+                                             lustre_msg_buf(req->rq_repmsg,2,0),
+                                             req->rq_repmsg->buflens[2], 
+                                             &lcl) > 0) {
                         body->valid |= OBD_MD_FLCOOKIE;
                 }
         }
@@ -2860,6 +3045,11 @@ cleanup:
         rc = mds_finish_transno(mds, de_tgtdir ? de_tgtdir->d_inode : NULL,
                                 handle, req, rc, 0);
         switch (cleanup_phase) {
+        case 3:
+                up(&mds->mds_pending_dir->d_inode->i_sem);
+        case 2:
+                if (new_inode)
+                        UP_READ_I_ALLOC_SEM(new_inode);
         case 1:
 #ifdef S_PDIROPS
                 if (dlm_handles[5].cookie != 0)
@@ -2871,13 +3061,13 @@ cleanup:
                         ptlrpc_save_llog_lock(req, lcl);
 
                 if (rc) {
-                        if (lock_count == 4)
+                        if (dlm_handles[3].cookie != 0)
                                 ldlm_lock_decref(&(dlm_handles[3]), LCK_EX);
                         ldlm_lock_decref(&(dlm_handles[2]), LCK_EX);
                         ldlm_lock_decref(&(dlm_handles[1]), LCK_PW);
                         ldlm_lock_decref(&(dlm_handles[0]), LCK_PW);
                 } else {
-                        if (lock_count == 4)
+                        if (dlm_handles[3].cookie != 0)
                                 ptlrpc_save_lock(req,&(dlm_handles[3]), LCK_EX);
                         ptlrpc_save_lock(req, &(dlm_handles[2]), LCK_EX);
                         ptlrpc_save_lock(req, &(dlm_handles[1]), LCK_PW);