Whamcloud - gitweb
- landing b_fid.
[fs/lustre-release.git] / lustre / mds / mds_reint.c
index f1a1fa9..10e1ea8 100644 (file)
 #include <linux/lustre_fsfilt.h>
 #include "mds_internal.h"
 
-void mds_commit_cb(struct obd_device *obd, __u64 transno, void *data,
-                   int error)
-{
-        obd_transno_commit_cb(obd, transno, error);
-}
-
 struct mds_logcancel_data {
         struct lov_mds_md      *mlcd_lmm;
         int                     mlcd_size;
@@ -58,9 +52,9 @@ struct mds_logcancel_data {
         struct llog_cookie      mlcd_cookies[0];
 };
 
-
-static void mds_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
-                                  void *cb_data, int error)
+static void mds_cancel_cookies_cb(struct obd_device *obd,
+                                  __u64 transno, void *cb_data,
+                                  int error)
 {
         struct mds_logcancel_data *mlcd = cb_data;
         struct lov_stripe_md *lsm = NULL;
@@ -72,7 +66,7 @@ static void mds_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
         CDEBUG(D_HA, "cancelling %d cookies\n",
                (int)(mlcd->mlcd_cookielen / sizeof(*mlcd->mlcd_cookies)));
 
-        rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, mlcd->mlcd_lmm,
+        rc = obd_unpackmd(obd->u.mds.mds_lov_exp, &lsm, mlcd->mlcd_lmm,
                           mlcd->mlcd_eadatalen);
         if (rc < 0) {
                 CERROR("bad LSM cancelling %d log cookies: rc %d\n",
@@ -99,19 +93,18 @@ int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle,
                        struct ptlrpc_request *req, int rc, __u32 op_data)
 {
         struct mds_export_data *med = &req->rq_export->exp_mds_data;
-        struct mds_client_data *mcd = med->med_mcd;
         struct obd_device *obd = req->rq_export->exp_obd;
-        int err;
+        struct mds_client_data *mcd = med->med_mcd;
+        int err, log_pri = D_HA;
         __u64 transno;
         loff_t off;
-        int log_pri = D_HA;
         ENTRY;
 
         /* if the export has already been failed, we have no last_rcvd slot */
         if (req->rq_export->exp_failed) {
                 CERROR("committing transaction for disconnected client\n");
                 if (handle)
-                        GOTO(commit, rc);
+                        GOTO(out_commit, rc);
                 RETURN(rc);
         }
 
@@ -149,12 +142,11 @@ int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle,
         mcd->mcd_last_result = cpu_to_le32(rc);
         mcd->mcd_last_data = cpu_to_le32(op_data);
 
-
-        fsfilt_add_journal_cb(req->rq_export->exp_obd, mds->mds_sb,
-                              transno, handle, mds_commit_cb, NULL);
+        fsfilt_add_journal_cb(obd, mds->mds_sb, transno, handle,
+                              mds_commit_last_transno_cb, NULL);
         
-        err = fsfilt_write_record(obd, mds->mds_rcvd_filp, mcd, sizeof(*mcd),
-                                  &off, 0);
+        err = fsfilt_write_record(obd, mds->mds_rcvd_filp, mcd,
+                                  sizeof(*mcd), &off, 0);
 
         if (err) {
                 log_pri = D_ERROR;
@@ -166,6 +158,13 @@ int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle,
                   "wrote trans #"LPU64" client %s at idx %u: err = %d",
                   transno, mcd->mcd_uuid, med->med_idx, err);
 
+        err = mds_update_last_fid(obd, handle, 0);
+        if (err) {
+                log_pri = D_ERROR;
+                if (rc == 0)
+                        rc = err;
+        }
+                
         err = mds_lov_write_objids(obd);
         if (err) {
                 log_pri = D_ERROR;
@@ -174,7 +173,7 @@ int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle,
         }
         CDEBUG(log_pri, "wrote objids: err = %d\n", err);
 
-commit:
+out_commit:
         err = fsfilt_commit(obd, mds->mds_sb, inode, handle, 0);
         if (err) {
                 CERROR("error committing transaction: %d\n", err);
@@ -214,7 +213,7 @@ int mds_fix_attr(struct inode *inode, struct mds_update_record *rec)
 
         /* times */
         if ((ia_valid & (ATTR_MTIME|ATTR_ATIME)) == (ATTR_MTIME|ATTR_ATIME)) {
-                if (rec->_ur_fsuid != inode->i_uid &&
+                if (rec->ur_fsuid != inode->i_uid &&
                     (error = ll_permission(inode, MAY_WRITE, NULL)) != 0)
                         RETURN(error);
         }
@@ -346,21 +345,19 @@ static void reconstruct_reint_setattr(struct mds_update_record *rec,
                                       int offset, struct ptlrpc_request *req)
 {
         struct mds_export_data *med = &req->rq_export->exp_mds_data;
-        struct mds_obd *obd = &req->rq_export->exp_obd->u.mds;
-        struct dentry *de;
         struct mds_body *body;
+        struct dentry *de;
 
         mds_req_from_mcd(req, med->med_mcd);
 
-        de = mds_fid2dentry(obd, rec->ur_fid1, NULL);
+        de = mds_id2dentry(req2obd(req), rec->ur_id1, NULL);
         if (IS_ERR(de)) {
                 LASSERT(PTR_ERR(de) == req->rq_status);
                 return;
         }
 
-        body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
-        mds_pack_inode2fid(req2obd(req), &body->fid1, de->d_inode);
-        mds_pack_inode2body(req2obd(req), body, de->d_inode);
+        body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
+        mds_pack_inode2body(req2obd(req), body, de->d_inode, 0);
 
         /* Don't return OST-specific attributes if we didn't just set them */
         if (rec->ur_iattr.ia_valid & ATTR_SIZE)
@@ -379,8 +376,7 @@ static void reconstruct_reint_setattr(struct mds_update_record *rec,
  *
  * We use the ATTR_FROM_OPEN flag to tell these cases apart. */
 static int mds_reint_setattr(struct mds_update_record *rec, int offset,
-                             struct ptlrpc_request *req,
-                             struct lustre_handle *lh)
+                             struct ptlrpc_request *req, struct lustre_handle *lh)
 {
         struct mds_obd *mds = mds_req2mds(req);
         struct obd_device *obd = req->rq_export->exp_obd;
@@ -391,28 +387,30 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
         int parent_mode;
         void *handle = NULL;
         struct mds_logcancel_data *mlcd = NULL;
-        int rc = 0, cleanup_phase = 0, err, locked = 0;
+        int rc = 0, cleanup_phase = 0, err;
+        int locked = 0;
         ENTRY;
 
         LASSERT(offset == 1);
 
-        DEBUG_REQ(D_INODE, req, "setattr "LPU64"/%u %x", rec->ur_fid1->id,
-                  rec->ur_fid1->generation, rec->ur_iattr.ia_valid);
+        DEBUG_REQ(D_INODE, req, "setattr "LPU64"/%u %x",
+                  id_ino(rec->ur_id1), id_gen(rec->ur_id1),
+                  rec->ur_iattr.ia_valid);
 
         MDS_CHECK_RESENT(req, reconstruct_reint_setattr(rec, offset, req));
 
         MDS_UPDATE_COUNTER(mds, MDS_SETATTR_COUNT);
 
         if (rec->ur_iattr.ia_valid & ATTR_FROM_OPEN) {
-                de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
+                de = mds_id2dentry(obd, rec->ur_id1, NULL);
                 if (IS_ERR(de))
                         GOTO(cleanup, rc = PTR_ERR(de));
         } else {
                 __u64 lockpart = MDS_INODELOCK_UPDATE;
-                if (rec->ur_iattr.ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID) )
+                if (rec->ur_iattr.ia_valid & (ATTR_MODE | ATTR_UID | ATTR_GID))
                         lockpart |= MDS_INODELOCK_LOOKUP;
-                de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
-                                           lockh, &parent_mode, NULL, 0, lockpart);
+                de = mds_id2locked_dentry(obd, rec->ur_id1, NULL, LCK_PW,
+                                          lockh, &parent_mode, NULL, 0, lockpart);
                 if (IS_ERR(de))
                         GOTO(cleanup, rc = PTR_ERR(de));
                 locked = 1;
@@ -454,12 +452,12 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
                 if (rc < 0)
                         GOTO(cleanup, rc);
 
-                rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE,
-                                   mds->mds_osc_exp, 0, &lsm, rec->ur_eadata);
+                rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, mds->mds_lov_exp,
+                                   0, &lsm, rec->ur_eadata);
                 if (rc)
                         GOTO(cleanup, rc);
 
-                obd_free_memmd(mds->mds_osc_exp, &lsm);
+                obd_free_memmd(mds->mds_lov_exp, &lsm);
 
                 rc = fsfilt_set_md(obd, inode, handle, rec->ur_eadata,
                                    rec->ur_eadatalen);
@@ -468,8 +466,8 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
         }
 
         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
-        mds_pack_inode2fid(obd, &body->fid1, inode);
-        mds_pack_inode2body(obd, body, inode);
+        mds_pack_inode2body(obd, body, inode, 0);
+        body->id1 = *rec->ur_id1;
 
         /* Don't return OST-specific attributes if we didn't just set them */
         if (rec->ur_iattr.ia_valid & ATTR_SIZE)
@@ -479,7 +477,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
         if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
                 body->valid |= OBD_MD_FLATIME;
 
-        if (rc == 0 && rec->ur_cookielen && !IS_ERR(mds->mds_osc_obd)) {
+        if (rc == 0 && rec->ur_cookielen && !IS_ERR(mds->mds_lov_obd)) {
                 OBD_ALLOC(mlcd, sizeof(*mlcd) + rec->ur_cookielen +
                           rec->ur_eadatalen);
                 if (mlcd) {
@@ -536,7 +534,6 @@ static void reconstruct_reint_create(struct mds_update_record *rec, int offset,
                                      struct ptlrpc_request *req)
 {
         struct mds_export_data *med = &req->rq_export->exp_mds_data;
-        struct mds_obd *obd = &req->rq_export->exp_obd->u.mds;
         struct dentry *parent, *child;
         struct mds_body *body;
         ENTRY;
@@ -548,25 +545,22 @@ static void reconstruct_reint_create(struct mds_update_record *rec, int offset,
                 return;
         }
 
-        parent = mds_fid2dentry(obd, rec->ur_fid1, NULL);
+        parent = mds_id2dentry(req2obd(req), rec->ur_id1, NULL);
         LASSERT(!IS_ERR(parent));
-        child = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
+        child = ll_lookup_one_len(rec->ur_name, parent,
+                                  rec->ur_namelen - 1);
         LASSERT(!IS_ERR(child));
         if ((child->d_flags & DCACHE_CROSS_REF)) {
                 LASSERTF(child->d_inode == NULL, "BUG 3869\n");
-                body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
-                mds_pack_dentry2fid(&body->fid1, child);
-                mds_pack_dentry2body(body, child);
-                body->valid |= OBD_MD_MDS;
+                body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
+                mds_pack_dentry2body(req2obd(req), body, child, 1);
         } else if (child->d_inode == NULL) {
-                DEBUG_REQ(D_ERROR, req, "parent "LPU64"/%u name %s mode %o",
-                          rec->ur_fid1->id, rec->ur_fid1->generation,
-                          rec->ur_name, rec->ur_mode);
+                DEBUG_REQ(D_ERROR, req, "parent "DLID4" name %s mode %o",
+                          OLID4(rec->ur_id1), rec->ur_name, rec->ur_mode);
                 LASSERTF(child->d_inode != NULL, "BUG 3869\n");
         } else {
-                body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
-                mds_pack_inode2fid(req2obd(req), &body->fid1, child->d_inode);
-                mds_pack_inode2body(req2obd(req), body, child->d_inode);
+                body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
+                mds_pack_inode2body(req2obd(req), body, child->d_inode, 1);
         }
         l_dput(parent);
         l_dput(child);
@@ -593,21 +587,22 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         ENTRY;
 
         LASSERT(offset == 1);
-        LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, LUSTRE_MDS_NAME));
+        
+        LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name,
+                        LUSTRE_MDS_NAME));
 
         DEBUG_REQ(D_INODE, req, "parent "LPU64"/%u name %s mode %o",
-                  rec->ur_fid1->id, rec->ur_fid1->generation,
+                  id_ino(rec->ur_id1), id_gen(rec->ur_id1),
                   rec->ur_name, rec->ur_mode);
 
         MDS_CHECK_RESENT(req, reconstruct_reint_create(rec, offset, req));
 
-
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
                 GOTO(cleanup, rc = -ESTALE);
 
-        dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW, lockh,
-                                        &parent_mode, rec->ur_name,
-                                        rec->ur_namelen - 1, MDS_INODELOCK_UPDATE);
+        dparent = mds_id2locked_dentry(obd, rec->ur_id1, NULL, LCK_PW,
+                                       lockh, &parent_mode, rec->ur_name,
+                                       rec->ur_namelen - 1, MDS_INODELOCK_UPDATE);
         if (IS_ERR(dparent)) {
                 rc = PTR_ERR(dparent);
                 CERROR("parent lookup error %d\n", rc);
@@ -625,24 +620,27 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 GOTO(cleanup, rc);
 
         if (mea != NULL) {
-                /* dir is already splitted, check is requested filename
-                 * should live at this MDS or at another one */
-                int i;
-                i = mea_name2idx(mea, rec->ur_name, rec->ur_namelen - 1);
-                if (mea->mea_master != mea->mea_fids[i].mds) {
+                /*
+                 * dir is already splitted, check is requested filename should
+                 * live at this MDS or at another one.
+                 */
+                int i = mea_name2idx(mea, rec->ur_name, rec->ur_namelen - 1);
+                if (mea->mea_master != id_group(&mea->mea_ids[i])) {
                         CDEBUG(D_OTHER, "inapropriate MDS(%d) for %lu/%u:%s."
-                               " should be %d(%d)\n",
+                               " should be %lu(%d)\n",
                                mea->mea_master, dparent->d_inode->i_ino,
                                dparent->d_inode->i_generation, rec->ur_name,
-                               mea->mea_fids[i].mds, i);
+                               (unsigned long)id_group(&mea->mea_ids[i]), i);
                         GOTO(cleanup, rc = -ERESTART);
                 }
         }
 
-        dchild = ll_lookup_one_len(rec->ur_name, dparent, rec->ur_namelen - 1);
+        dchild = ll_lookup_one_len(rec->ur_name, dparent, 
+                                   rec->ur_namelen - 1);
         if (IS_ERR(dchild)) {
                 rc = PTR_ERR(dchild);
-                CERROR("child lookup error %d\n", rc);
+                CERROR("Can't find "DLID4"/%s, error %d\n",
+                       OLID4(rec->ur_id1), rec->ur_name, rc);
                 GOTO(cleanup, rc);
         }
 
@@ -669,8 +667,13 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                         rec->ur_mode |= S_ISGID;
         }
 
+        /*
+         * here inode number should be used only in the case of replaying. It is
+         * needed to check if object already created in the case of creating
+         * remote inode.
+         */
         dchild->d_fsdata = (void *)&dp;
-        dp.p_inum = (unsigned long)rec->ur_fid2->id;
+        dp.p_inum = (unsigned long)id_ino(rec->ur_id2);
         dp.p_ptr = req;
 
         switch (type) {
@@ -683,13 +686,15 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 break;
         }
         case S_IFDIR:{
-                int nstripes = 0;
-                int i;
+                int i, nstripes = 0;
+                struct lustre_id sid;
                 
-                /* as Peter asked, mkdir() should distribute new directories
+                /*
+                 * as Peter asked, mkdir() should distribute new directories
                  * over the whole cluster in order to distribute namespace
                  * processing load. first, we calculate which MDS to use to put
-                 * new directory's inode in. */
+                 * new directory's inode in.
+                 */
                 i = mds_choose_mdsnum(obd, rec->ur_name, rec->ur_namelen - 1, 
                                       rec->ur_flags);
                 if (i == mds->mds_num) {
@@ -699,15 +704,46 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                                 GOTO(cleanup, rc = PTR_ERR(handle));
 
                         rc = vfs_mkdir(dir, dchild, rec->ur_mode);
+                        if (rc) {
+                                CERROR("Can't create dir %s, rc = %d\n",
+                                       dchild->d_name.name, rc);
+                                GOTO(cleanup, rc);
+                        }
 
+                        down(&dchild->d_inode->i_sem);
+                        if (dp.p_inum) {
+                                rc = mds_update_inode_sid(obd, dchild->d_inode,
+                                                          handle, rec->ur_id2);
+                                if (rc) {
+                                        CERROR("mds_update_inode_sid() failed, inode %lu, "
+                                               "rc %d\n", dchild->d_inode->i_ino, rc);
+                                }
+
+                                /* 
+                                 * make sure, that fid is up-to-date.
+                                 */
+                                mds_set_last_fid(obd, id_fid(rec->ur_id2));
+                        } else {
+                                rc = mds_alloc_inode_sid(obd, dchild->d_inode,
+                                                         handle, &sid);
+                                if (rc) {
+                                        CERROR("mds_alloc_inode_sid() failed, inode %lu, "
+                                               "rc %d\n", dchild->d_inode->i_ino, rc);
+                                }
+                        }
+                        up(&dchild->d_inode->i_sem);
+                        
+                        if (rc)
+                                GOTO(cleanup, rc);
+                        
                         if (rec->ur_eadata)
                                 nstripes = *(u16 *)rec->ur_eadata;
 
                         if (rc == 0 && nstripes) {
-                                /* we pass LCK_EX to split routine to
-                                 * signalthat we have exclusive access
-                                 * to the directory. simple because
-                                 * nobody knows it already exists -bzzz */
+                                /* we pass LCK_EX to split routine to signal,
+                                 * that we have exclusive access to the
+                                 * directory. Simple because nobody knows it
+                                 * already exists -bzzz */
                                 rc = mds_try_to_split_dir(obd, dchild,
                                                           NULL, nstripes,
                                                           LCK_EX);
@@ -727,9 +763,12 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                         
                         /* first, create that inode */
                         oa = obdo_alloc();
-                        LASSERT(oa != NULL);
+                        if (!oa)
+                                GOTO(cleanup, rc = -ENOMEM);
+
                         oa->o_mds = i;
                         oa->o_easize = 0;
+
                         if (rec->ur_eadata) {
                                 /* user asks for creating splitted dir */
                                 oa->o_easize = *((u16 *) rec->ur_eadata);
@@ -738,24 +777,35 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                         obdo_from_inode(oa, dir, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                                         OBD_MD_FLMTIME | OBD_MD_FLCTIME |
                                         OBD_MD_FLUID | OBD_MD_FLGID);
+                        
                         oa->o_mode = dir->i_mode;
+                        
                         CDEBUG(D_OTHER, "%s: create dir on MDS %u\n",
-                                        obd->obd_name, i);
+                               obd->obd_name, i);
+
                         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+                                /*
+                                 * here inode number and generation are
+                                 * important, as this is replay request and we
+                                 * need them to check if such an object is
+                                 * already created.
+                                 */
                                 CDEBUG(D_HA, "%s: replay dir creation %*s -> %u/%u\n",
                                        obd->obd_name, rec->ur_namelen - 1,
-                                       rec->ur_name, (unsigned) rec->ur_fid2->id,
-                                       (unsigned) rec->ur_fid2->generation);
-                                oa->o_id = rec->ur_fid2->id;
-                                oa->o_generation = rec->ur_fid2->generation;
+                                       rec->ur_name, (unsigned)id_ino(rec->ur_id2),
+                                       (unsigned)id_gen(rec->ur_id2));
+                                oa->o_id = id_ino(rec->ur_id2);
+                                oa->o_fid = id_fid(rec->ur_id2);
+                                oa->o_generation = id_gen(rec->ur_id2);
                                 oa->o_flags |= OBD_FL_RECREATE_OBJS;
                         }
 
+                        /* before obd_create() is called, o_fid is not known. */
                         rc = obd_create(mds->mds_lmv_exp, oa, NULL, NULL);
                         if (rc) {
                                 CERROR("can't create remote inode: %d\n", rc);
                                 DEBUG_REQ(D_ERROR, req, "parent "LPU64"/%u name %s mode %o",
-                                          rec->ur_fid1->id, rec->ur_fid1->generation,
+                                          id_ino(rec->ur_id1), id_gen(rec->ur_id1),
                                           rec->ur_name, rec->ur_mode);
                                 obdo_free(oa);
                                 GOTO(cleanup, rc);
@@ -767,19 +817,29 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                                obdo_free(oa);
                                 GOTO(cleanup, rc = PTR_ERR(handle));
                         }
+
+                        /* creating local dentry for remote inode. */
                         rc = fsfilt_add_dir_entry(obd, dparent, rec->ur_name,
-                                                  rec->ur_namelen - 1,
-                                                  oa->o_id, oa->o_generation,
-                                                  i);
-                        LASSERT(rc == 0);
+                                                  rec->ur_namelen - 1, oa->o_id,
+                                                  oa->o_generation, i, oa->o_fid);
+
+                        if (rc) {
+                                CERROR("Can't create local entry %*s for "
+                                       "remote inode.\n", rec->ur_namelen - 1,
+                                        rec->ur_name);
+                                GOTO(cleanup, rc);
+                        }
 
                         /* fill reply */
-                        body = lustre_msg_buf(req->rq_repmsg, 0,
-                                              sizeof (*body));
-                        body->valid |= OBD_MD_FLID | OBD_MD_MDS;
-                        body->fid1.id = oa->o_id;
-                        body->fid1.mds = i;
-                        body->fid1.generation = oa->o_generation;
+                        body = lustre_msg_buf(req->rq_repmsg,
+                                              0, sizeof(*body));
+                        body->valid |= OBD_MD_FLID | OBD_MD_MDS |
+                                OBD_MD_FID;
+
+                        id_group(&body->id1) = i;
+                        id_ino(&body->id1) = oa->o_id;
+                        id_fid(&body->id1) = oa->o_fid;
+                        id_gen(&body->id1) = oa->o_generation;
                        obdo_free(oa);
                 } else {
                         /* requested name exists in the directory */
@@ -818,7 +878,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         }
 
         /* In case we stored the desired inum in here, we want to clean up. */
-        if (dchild->d_fsdata == (void *)(unsigned long)rec->ur_fid2->id)
+        if (dchild->d_fsdata == (void *)(unsigned long)id_ino(rec->ur_id2))
                 dchild->d_fsdata = NULL;
 
         if (rc) {
@@ -826,50 +886,93 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 GOTO(cleanup, rc);
         } else if (dchild->d_inode) {
                 struct iattr iattr;
-                struct inode *inode = dchild->d_inode;
                 struct mds_body *body;
+                struct inode *inode = dchild->d_inode;
 
                 created = 1;
+                iattr.ia_uid = rec->ur_fsuid;
                 LTIME_S(iattr.ia_atime) = rec->ur_time;
                 LTIME_S(iattr.ia_ctime) = rec->ur_time;
                 LTIME_S(iattr.ia_mtime) = rec->ur_time;
-                iattr.ia_uid = rec->_ur_fsuid;
+
                 if (dir->i_mode & S_ISGID)
                         iattr.ia_gid = dir->i_gid;
                 else
-                        iattr.ia_gid = rec->_ur_fsgid;
+                        iattr.ia_gid = rec->ur_fsgid;
+
                 iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
                         ATTR_MTIME | ATTR_CTIME;
 
-                if (rec->ur_fid2->id) {
-                        LASSERT(rec->ur_fid2->id == inode->i_ino);
-                        inode->i_generation = rec->ur_fid2->generation;
-                        /* Dirtied and committed by the upcoming setattr. */
+                if (id_ino(rec->ur_id2)) {
+                        LASSERT(id_ino(rec->ur_id2) == inode->i_ino);
+                        inode->i_generation = id_gen(rec->ur_id2);
+
+                        if (type != S_IFDIR) {
+                                /* 
+                                 * updating inode self id, as inode already
+                                 * exists and we should make sure, its sid will
+                                 * be the same as we reveived.
+                                 */
+                                down(&inode->i_sem);
+                                rc = mds_update_inode_sid(obd, inode,
+                                                          handle, rec->ur_id2);
+                                up(&inode->i_sem);
+                                if (rc) {
+                                        CERROR("Can't update inode self id, "
+                                               "rc = %d.\n", rc);
+                                }
+
+                                /* 
+                                 * make sure, that fid is up-to-date.
+                                 */
+                                mds_set_last_fid(obd, id_fid(rec->ur_id2));
+                        }
+                        
+                        /* dirtied and committed by the upcoming setattr. */
                         CDEBUG(D_INODE, "recreated ino %lu with gen %u\n",
                                inode->i_ino, inode->i_generation);
                 } else {
-#if 0
                         struct lustre_handle child_ino_lockh;
-#endif
 
                         CDEBUG(D_INODE, "created ino %lu with gen %x\n",
                                inode->i_ino, inode->i_generation);
 
-#if 0
-                        /* The inode we were allocated may have just been freed
-                         * by an unlink operation.  We take this lock to
-                         * synchronize against the matching reply-ack-lock taken
-                         * in unlink, to avoid replay problems if this reply
-                         * makes it out to the client but the unlink's does not.
-                         * See bug 2029 for more detail.*/
-                        rc = mds_lock_new_child(obd, inode, &child_ino_lockh);
-                        if (rc != ELDLM_OK) {
-                                CERROR("error locking for unlink/create sync: "
-                                       "%d\n", rc);
-                        } else {
-                                ldlm_lock_decref(&child_ino_lockh, LCK_EX);
+                        if (type != S_IFDIR) {
+                                struct lustre_id sid;
+                                
+                                /* 
+                                 * allocate new id for @inode if it is not dir,
+                                 * because for dir it was already done.
+                                 */
+                                down(&inode->i_sem);
+                                rc = mds_alloc_inode_sid(obd, inode,
+                                                         handle, &sid);
+                                up(&inode->i_sem);
+                                if (rc) {
+                                        CERROR("mds_alloc_inode_sid() failed, "
+                                               "inode %lu, rc %d\n", inode->i_ino,
+                                               rc);
+                                }
+                        }
+
+                        if (rc == 0) {
+                                /*
+                                 * the inode we were allocated may have just
+                                 * been freed by an unlink operation.  We take
+                                 * this lock to synchronize against the matching
+                                 * reply-ack-lock taken in unlink, to avoid
+                                 * replay problems if this reply makes it out to
+                                 * the client but the unlink's does not.  See
+                                 * bug 2029 for more detail.
+                                 */
+                                rc = mds_lock_new_child(obd, inode, &child_ino_lockh);
+                                if (rc != ELDLM_OK) {
+                                        CERROR("error locking for unlink/create sync: "
+                                               "%d\n", rc);
+                                } else {
+                                        ldlm_lock_decref(&child_ino_lockh, LCK_EX);
+                                }
                         }
-#endif
                 }
 
                 rc = fsfilt_setattr(obd, dchild, handle, &iattr, 0);
@@ -883,9 +986,8 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 else
                         MDS_UPDATE_COUNTER(mds, MDS_CREATE_COUNT);
 
-                body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
-                mds_pack_inode2fid(obd, &body->fid1, inode);
-                mds_pack_inode2body(obd, body, inode);
+                body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
+                mds_pack_inode2body(obd, body, inode, 1);
         }
         EXIT;
 
@@ -938,19 +1040,21 @@ cleanup:
         return 0;
 }
 
-static int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2,
-           ldlm_policy_data_t *p1, ldlm_policy_data_t *p2)
+static inline int
+res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2,
+       ldlm_policy_data_t *p1, ldlm_policy_data_t *p2)
 {
         int i;
 
         for (i = 0; i < RES_NAME_SIZE; i++) {
-                /* return 1 here, because enqueue_ordered will skip resources
-                 * of all zeroes if they're sorted to the end of the list. */
+                /* 
+                 * this is needed to make zeroed res_id entries to be put at the
+                 * end of list in *ordered_locks() .
+                 */
                 if (res1->name[i] == 0 && res2->name[i] != 0)
                         return 1;
                 if (res2->name[i] == 0 && res1->name[i] != 0)
                         return 0;
-
                 if (res1->name[i] > res2->name[i])
                         return 1;
                 if (res1->name[i] < res2->name[i])
@@ -979,17 +1083,17 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
                           struct lustre_handle *p2_lockh, int p2_lock_mode,
                           ldlm_policy_data_t *p2_policy)
 {
+        int lock_modes[2] = { p1_lock_mode, p2_lock_mode };
         struct ldlm_res_id *res_id[2] = { p1_res_id, p2_res_id };
         struct lustre_handle *handles[2] = { p1_lockh, p2_lockh };
-        int lock_modes[2] = { p1_lock_mode, p2_lock_mode };
         ldlm_policy_data_t *policies[2] = { p1_policy, p2_policy };
         int rc, flags;
         ENTRY;
 
         LASSERT(p1_res_id != NULL && p2_res_id != NULL);
 
-        CDEBUG(D_INFO, "locks before: "LPU64"/"LPU64"\n", res_id[0]->name[0],
-               res_id[1]->name[0]);
+        CDEBUG(D_INFO, "locks before: "LPU64"/"LPU64"\n",
+               res_id[0]->name[0], res_id[1]->name[0]);
 
         if (res_gt(p1_res_id, p2_res_id, p1_policy, p2_policy)) {
                 handles[1] = p1_lockh;
@@ -1063,7 +1167,10 @@ int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id,
                res_id[0]->name[0], res_id[1]->name[0], res_id[2]->name[0],
                res_id[3]->name[0]);
 
-        /* simple insertion sort - we have at most 4 elements */
+        /* 
+         * simple insertion sort - we have at most 4 elements. Note, that zeroed
+         * res_id should be at the end of list after sorting is finished.
+         */
         for (i = 1; i < 4; i++) {
                 j = i - 1;
                 dlm_handles[4] = dlm_handles[i];
@@ -1098,8 +1205,18 @@ int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id,
         /* XXX we could send ASTs on all these locks first before blocking? */
         for (i = 0; i < 4; i++) {
                 flags = 0;
+
+                /* 
+                 * nevertheless zeroed res_ids should be at the end of list, and
+                 * could use break here, I think, that it is more correctly for
+                 * clear understanding of code to have continue here, as it
+                 * clearly means, that zeroed res_id should be skipped and does
+                 * not mean, that if we meet zeroed res_id we should stop
+                 * locking loop.
+                 */
                 if (res_id[i]->name[0] == 0)
                         continue;
+                
                 if (i != 0 &&
                     !memcmp(res_id[i], res_id[i-1], sizeof(*res_id[i])) &&
                     (policies[i]->l_inodebits.bits &
@@ -1150,8 +1267,11 @@ static int mds_verify_child(struct obd_device *obd,
                             struct dentry **dchildp, int child_mode,
                             ldlm_policy_data_t *child_policy,
                             const char *name, int namelen,
-                            struct ldlm_res_id *maxres)
+                            struct ldlm_res_id *maxres,
+                            unsigned long child_ino,
+                            __u32 child_gen)
 {
+        struct lustre_id sid;
         struct dentry *vchild, *dchild = *dchildp;
         int rc = 0, cleanup_phase = 2; /* parent, child locks */
         ENTRY;
@@ -1161,9 +1281,9 @@ static int mds_verify_child(struct obd_device *obd,
                 GOTO(cleanup, rc = PTR_ERR(vchild));
 
         if ((vchild->d_flags & DCACHE_CROSS_REF)) {
-                if  (child_res_id->name[0] == vchild->d_inum &&
-                                child_res_id->name[1] == vchild->d_generation) {
-                        if (dchild != NULL)
+                if (child_gen == vchild->d_generation &&
+                    child_ino == vchild->d_inum) {
+                        if (dchild)
                                 l_dput(dchild);
                         *dchildp = vchild;
                         RETURN(0);
@@ -1173,12 +1293,11 @@ static int mds_verify_child(struct obd_device *obd,
 
         if (likely((vchild->d_inode == NULL && child_res_id->name[0] == 0) ||
                    (vchild->d_inode != NULL &&
-                    child_res_id->name[0] == vchild->d_inode->i_ino &&
-                    child_res_id->name[1] == vchild->d_inode->i_generation))) {
-                if (dchild != NULL)
+                    child_gen == vchild->d_inode->i_generation &&
+                    child_ino == vchild->d_inode->i_ino))) {
+                if (dchild)
                         l_dput(dchild);
                 *dchildp = vchild;
-
                 RETURN(0);
         }
 
@@ -1187,6 +1306,7 @@ changed:
                vchild->d_inode, dchild ? dchild->d_inode : 0,
                vchild->d_inode ? vchild->d_inode->i_ino : 0,
                child_res_id->name[0]);
+
         if (child_res_id->name[0] != 0)
                 ldlm_lock_decref(child_lockh, child_mode);
         if (dchild)
@@ -1197,12 +1317,21 @@ changed:
 
         if (dchild->d_inode || (dchild->d_flags & DCACHE_CROSS_REF)) {
                 int flags = 0;
+                
                 if (dchild->d_inode) {
-                        child_res_id->name[0] = dchild->d_inode->i_ino;
-                        child_res_id->name[1] = dchild->d_inode->i_generation;
+                        down(&dchild->d_inode->i_sem);
+                        rc = mds_read_inode_sid(obd, dchild->d_inode, &sid);
+                        up(&dchild->d_inode->i_sem);
+                        if (rc) {
+                                CERROR("Can't read inode self id, inode %lu,"
+                                       " rc %d\n",  dchild->d_inode->i_ino, rc);
+                                GOTO(cleanup, rc);
+                        }
+                        child_res_id->name[0] = id_fid(&sid);
+                        child_res_id->name[1] = id_group(&sid);
                 } else {
-                        child_res_id->name[0] = dchild->d_inum;
-                        child_res_id->name[1] = dchild->d_generation;
+                        child_res_id->name[0] = dchild->d_fid;
+                        child_res_id->name[1] = dchild->d_mdsnum;
                 }
 
                 if (res_gt(parent_res_id, child_res_id, NULL, NULL) ||
@@ -1225,7 +1354,6 @@ changed:
                 memset(child_res_id, 0, sizeof(*child_res_id));
         }
 
-        EXIT;
 cleanup:
         if (rc) {
                 switch(cleanup_phase) {
@@ -1236,11 +1364,11 @@ cleanup:
                         ldlm_lock_decref(parent_lockh, parent_mode);
                 }
         }
-        return rc;
+        RETURN(rc);
 }
 
 int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
-                                struct ll_fid *fid,
+                                struct lustre_id *id,
                                 struct lustre_handle *parent_lockh,
                                 struct dentry **dparentp, int parent_mode,
                                 __u64 parent_lockpart, int *update_mode,
@@ -1249,16 +1377,19 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
                                 struct dentry **dchildp, int child_mode,
                                 __u64 child_lockpart)
 {
-        struct ldlm_res_id child_res_id = { .name = {0} };
-        struct ldlm_res_id parent_res_id = { .name = {0} };
         ldlm_policy_data_t parent_policy = {.l_inodebits = { parent_lockpart }};
         ldlm_policy_data_t child_policy = {.l_inodebits = { child_lockpart }};
-        struct inode *inode;
+        struct ldlm_res_id parent_res_id = { .name = {0} };
+        struct ldlm_res_id child_res_id = { .name = {0} };
         int rc = 0, cleanup_phase = 0;
+        unsigned long child_ino;
+        struct lustre_id sid;
+        __u32 child_gen = 0;
+        struct inode *inode;
         ENTRY;
 
         /* Step 1: Lookup parent */
-        *dparentp = mds_fid2dentry(mds, fid, NULL);
+        *dparentp = mds_id2dentry(obd, id, NULL);
         if (IS_ERR(*dparentp)) {
                 rc = PTR_ERR(*dparentp);
                 *dparentp = NULL;
@@ -1268,19 +1399,22 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
         CDEBUG(D_INODE, "parent ino %lu, name %s\n",
                (*dparentp)->d_inode->i_ino, name);
 
-        parent_res_id.name[0] = (*dparentp)->d_inode->i_ino;
-        parent_res_id.name[1] = (*dparentp)->d_inode->i_generation;
+        parent_res_id.name[0] = id_fid(id);
+        parent_res_id.name[1] = id_group(id);
+        
 #ifdef S_PDIROPS
         parent_lockh[1].cookie = 0;
         if (name && IS_PDIROPS((*dparentp)->d_inode)) {
                 struct ldlm_res_id res_id = { .name = {0} };
                 ldlm_policy_data_t policy;
                 int flags = 0;
+
                 *update_mode = mds_lock_mode_for_dir(obd, *dparentp, parent_mode);
                 if (*update_mode) {
-                        res_id.name[0] = (*dparentp)->d_inode->i_ino;
-                        res_id.name[1] = (*dparentp)->d_inode->i_generation;
+                        res_id.name[0] = id_fid(id);
+                        res_id.name[1] = id_group(id);
                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+
                         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
                                               res_id, LDLM_IBITS, &policy,
                                               *update_mode, &flags,
@@ -1293,9 +1427,9 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
                 }
 
                 parent_res_id.name[2] = full_name_hash(name, namelen - 1);
-                CDEBUG(D_INFO, "take lock on %lu:%u:"LPX64"\n",
-                       (*dparentp)->d_inode->i_ino, 
-                       (*dparentp)->d_inode->i_generation,
+                
+                CDEBUG(D_INFO, "take lock on %lu:%lu:"LPX64"\n",
+                       (unsigned long)id_fid(id), (unsigned long)id_group(id),
                        parent_res_id.name[2]);
         }
 #endif
@@ -1311,12 +1445,17 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
         }
 
         if ((*dchildp)->d_flags & DCACHE_CROSS_REF) {
-                /* inode lives on another MDS: return * mds/ino/gen
-                 * and LOOKUP lock. drop possible UPDATE lock! */
+                /*
+                 * inode lives on another MDS: return * fid/mdsnum and LOOKUP
+                 * lock. Drop possible UPDATE lock!
+                 */
                 child_policy.l_inodebits.bits &= ~MDS_INODELOCK_UPDATE;
                 child_policy.l_inodebits.bits |= MDS_INODELOCK_LOOKUP;
-                child_res_id.name[0] = (*dchildp)->d_inum;
-                child_res_id.name[1] = (*dchildp)->d_generation;
+
+                child_res_id.name[0] = (*dchildp)->d_fid;
+                child_res_id.name[1] = (*dchildp)->d_mdsnum;
+                child_gen = (*dchildp)->d_generation;
+                child_ino = (*dchildp)->d_inum;
                 goto retry_locks;
         }
 
@@ -1326,17 +1465,28 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
         if (inode == NULL)
                 goto retry_locks;
 
-        child_res_id.name[0] = inode->i_ino;
-        child_res_id.name[1] = inode->i_generation;
-
+        down(&inode->i_sem);
+        rc = mds_read_inode_sid(obd, inode, &sid);
+        up(&inode->i_sem);
+        if (rc) {
+                CERROR("Can't read inode self id, inode %lu, "
+                       "rc %d\n", inode->i_ino, rc);
+                iput(inode);
+                GOTO(cleanup, rc);
+        }
+        
+        child_res_id.name[0] = id_fid(&sid);
+        child_res_id.name[1] = id_group(&sid);
+        child_gen = inode->i_generation;
+        child_ino = inode->i_ino;
         iput(inode);
 
 retry_locks:
         cleanup_phase = 2; /* child dentry */
 
         /* Step 3: Lock parent and child in resource order.  If child doesn't
-         *         exist, we still have to lock the parent and re-lookup. */
-        rc = enqueue_ordered_locks(obd,&parent_res_id,parent_lockh,parent_mode,
+         * exist, we still have to lock the parent and re-lookup. */
+        rc = enqueue_ordered_locks(obd, &parent_res_id, parent_lockh, parent_mode,
                                    &parent_policy, &child_res_id, child_lockh,
                                    child_mode, &child_policy);
         if (rc)
@@ -1351,7 +1501,8 @@ retry_locks:
         rc = mds_verify_child(obd, &parent_res_id, parent_lockh, *dparentp,
                               parent_mode, &child_res_id, child_lockh, 
                               dchildp, child_mode, &child_policy,
-                              name, namelen, &parent_res_id);
+                              name, namelen, &parent_res_id,
+                              child_ino, child_gen);
         if (rc > 0)
                 goto retry_locks;
         if (rc < 0) {
@@ -1374,10 +1525,9 @@ cleanup:
                                 ldlm_lock_decref(parent_lockh + 1, *update_mode);
 #endif
                         l_dput(*dparentp);
-                default: ;
                 }
         }
-        return rc;
+        RETURN(rc);
 }
 
 void mds_reconstruct_generic(struct ptlrpc_request *req)
@@ -1406,8 +1556,8 @@ static int mds_orphan_add_link(struct mds_update_record *rec,
         struct inode *pending_dir = mds->mds_pending_dir->d_inode;
         struct inode *inode = dentry->d_inode;
         struct dentry *pending_child;
-        char fidname[LL_FID_NAMELEN];
-        int fidlen = 0, rc, mode;
+        char idname[LL_ID_NAMELEN];
+        int idlen = 0, rc, mode;
         ENTRY;
 
         LASSERT(inode != NULL);
@@ -1417,17 +1567,18 @@ static int mds_orphan_add_link(struct mds_update_record *rec,
 #endif
         LASSERT(down_trylock(&pending_dir->i_sem) != 0);
 
-        fidlen = ll_fid2str(fidname, inode->i_ino, inode->i_generation);
+        idlen = ll_id2str(idname, inode->i_ino, inode->i_generation);
 
         CDEBUG(D_INODE, "pending destroy of %dx open %d linked %s %s = %s\n",
                mds_orphan_open_count(inode), inode->i_nlink,
                S_ISDIR(inode->i_mode) ? "dir" :
-                S_ISREG(inode->i_mode) ? "file" : "other",rec->ur_name,fidname);
+               S_ISREG(inode->i_mode) ? "file" : "other",
+               rec->ur_name, idname);
 
         if (mds_orphan_open_count(inode) == 0 || inode->i_nlink != 0)
                 RETURN(0);
 
-        pending_child = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
+        pending_child = lookup_one_len(idname, mds->mds_pending_dir, idlen);
         if (IS_ERR(pending_child))
                 RETURN(PTR_ERR(pending_child));
 
@@ -1467,41 +1618,58 @@ out_dput:
 }
 
 int mds_create_local_dentry(struct mds_update_record *rec,
-                           struct obd_device *obd)
+                            struct obd_device *obd)
 {
         struct mds_obd *mds = &obd->u.mds;
-        struct inode *fids_dir = mds->mds_fids_dir->d_inode;
-        int fidlen = 0, rc, cleanup_phase = 0;
+        struct inode *id_dir = mds->mds_id_dir->d_inode;
+        int idlen = 0, rc, cleanup_phase = 0;
         struct dentry *new_child = NULL;
-        char *fidname = rec->ur_name;
+        char *idname = rec->ur_name;
         struct dentry *child = NULL;
         struct lustre_handle lockh[2] = {{0}, {0}};
+        struct lustre_id sid;
         void *handle;
         ENTRY;
 
-        down(&fids_dir->i_sem);
-        fidlen = ll_fid2str(fidname, rec->ur_fid1->id, rec->ur_fid1->generation);
-        CDEBUG(D_OTHER, "look for local dentry '%s' for %u/%u\n",
-                        fidname, (unsigned) rec->ur_fid1->id,
-                        (unsigned) rec->ur_fid1->generation);
+        down(&id_dir->i_sem);
+        idlen = ll_id2str(idname, id_ino(rec->ur_id1),
+                          id_gen(rec->ur_id1));
+        
+        CDEBUG(D_OTHER, "look for local dentry '%s' for "DLID4"\n",
+               idname, OLID4(rec->ur_id1));
 
-        new_child = lookup_one_len(fidname, mds->mds_fids_dir, fidlen);
-        up(&fids_dir->i_sem);
+        new_child = ll_lookup_one_len(idname, mds->mds_id_dir, 
+                                      idlen);
+        up(&id_dir->i_sem);
         if (IS_ERR(new_child)) {
-                CERROR("can't lookup %s: %d\n", fidname,
-                                (int) PTR_ERR(new_child));
+                CERROR("can't lookup %s: %d\n", idname,
+                       (int) PTR_ERR(new_child));
                 GOTO(cleanup, rc = PTR_ERR(new_child));
         }
         cleanup_phase = 1;
 
+        down(&id_dir->i_sem);
+        rc = mds_read_inode_sid(obd, id_dir, &sid);
+        up(&id_dir->i_sem);
+        if (rc) {
+                CERROR("Can't read inode self id, inode %lu, "
+                       "rc %d\n", id_dir->i_ino, rc);
+                GOTO(cleanup, rc);
+        }
+        
         if (new_child->d_inode != NULL) {
                 /* nice. we've already have local dentry! */
                 CDEBUG(D_OTHER, "found dentry in FIDS/: %u/%u\n", 
-                       (unsigned) new_child->d_inode->i_ino,
-                       (unsigned) new_child->d_inode->i_generation);
-                rec->ur_fid1->id = fids_dir->i_ino;
-                rec->ur_fid1->generation = fids_dir->i_generation;
-                rec->ur_namelen = fidlen + 1;
+                       (unsigned)new_child->d_inode->i_ino,
+                       (unsigned)new_child->d_inode->i_generation);
+                
+                id_ino(rec->ur_id1) = id_dir->i_ino;
+                id_gen(rec->ur_id1) = id_dir->i_generation;
+                rec->ur_namelen = idlen + 1;
+
+                id_fid(rec->ur_id1) = id_fid(&sid);
+                id_group(rec->ur_id1) = id_group(&sid);
+                
                 GOTO(cleanup, rc = 0);
         }
 
@@ -1509,11 +1677,11 @@ int mds_create_local_dentry(struct mds_update_record *rec,
         d_drop(new_child);
 
         if (rec->ur_mode & MDS_MODE_DONT_LOCK) {
-                child = mds_fid2dentry(mds, rec->ur_fid1, NULL);
+                child = mds_id2dentry(obd, rec->ur_id1, NULL);
         } else {
-                child = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL,
-                                              LCK_EX, lockh, NULL, NULL, 0,
-                                              MDS_INODELOCK_UPDATE);
+                child = mds_id2locked_dentry(obd, rec->ur_id1, NULL,
+                                             LCK_EX, lockh, NULL, NULL, 0,
+                                             MDS_INODELOCK_UPDATE);
         }
 
         if (IS_ERR(child)) {
@@ -1524,30 +1692,33 @@ int mds_create_local_dentry(struct mds_update_record *rec,
         }
         cleanup_phase = 2;
 
-        handle = fsfilt_start(obd, fids_dir, FSFILT_OP_LINK, NULL);
+        handle = fsfilt_start(obd, id_dir, FSFILT_OP_LINK, NULL);
         if (IS_ERR(handle))
                 GOTO(cleanup, rc = PTR_ERR(handle));
 
-        rc = fsfilt_add_dir_entry(obd, mds->mds_fids_dir, fidname, fidlen,
-                                  rec->ur_fid1->id, rec->ur_fid1->generation,
-                                  mds->mds_num);
+        rc = fsfilt_add_dir_entry(obd, mds->mds_id_dir, idname,
+                                  idlen, id_ino(rec->ur_id1),
+                                  id_gen(rec->ur_id1), mds->mds_num,
+                                  id_fid(rec->ur_id1));
         if (rc)
                 CERROR("error linking orphan %lu/%lu to FIDS: rc = %d\n",
-                       (unsigned long) child->d_inode->i_ino,
-                       (unsigned long) child->d_inode->i_generation, rc);
+                       (unsigned long)child->d_inode->i_ino,
+                       (unsigned long)child->d_inode->i_generation, rc);
         else {
                 if (S_ISDIR(child->d_inode->i_mode)) {
-                        fids_dir->i_nlink++;
-                        mark_inode_dirty(fids_dir);
+                        id_dir->i_nlink++;
+                        mark_inode_dirty(id_dir);
                 }
                 mark_inode_dirty(child->d_inode);
         }
-        fsfilt_commit(obd, mds->mds_sb, fids_dir, handle, 0);
+        fsfilt_commit(obd, mds->mds_sb, id_dir, handle, 0);
 
-        rec->ur_fid1->id = fids_dir->i_ino;
-        rec->ur_fid1->generation = fids_dir->i_generation;
-        rec->ur_namelen = fidlen + 1;
+        id_ino(rec->ur_id1) = id_dir->i_ino;
+        id_gen(rec->ur_id1) = id_dir->i_generation;
+        rec->ur_namelen = idlen + 1;
 
+        id_fid(rec->ur_id1) = id_fid(&sid);
+        id_group(rec->ur_id1) = id_group(&sid);
 cleanup:
         switch(cleanup_phase) {
                 case 2:
@@ -1563,7 +1734,7 @@ cleanup:
 }
 
 static int mds_copy_unlink_reply(struct ptlrpc_request *master,
-                                        struct ptlrpc_request *slave)
+                                 struct ptlrpc_request *slave)
 {
         void *cookie, *cookie2;
         struct mds_body *body2;
@@ -1577,9 +1748,8 @@ static int mds_copy_unlink_reply(struct ptlrpc_request *master,
         body2 = lustre_msg_buf(master->rq_repmsg, 0, sizeof (*body));
         LASSERT(body2 != NULL);
 
-        if (!(body->valid & (OBD_MD_FLID | OBD_MD_FLGENER))) {
+        if (!(body->valid & (OBD_MD_FLID | OBD_MD_FLGENER)))
                 RETURN(0);
-        }
 
         memcpy(body2, body, sizeof(*body));
         body2->valid &= ~OBD_MD_FLCOOKIE;
@@ -1619,49 +1789,48 @@ static int mds_copy_unlink_reply(struct ptlrpc_request *master,
         RETURN(0);
 }
 
-static int mds_reint_unlink_remote(struct mds_update_record *rec, int offset,
-                                   struct ptlrpc_request *req,
+static int mds_reint_unlink_remote(struct mds_update_record *rec,
+                                   int offset, struct ptlrpc_request *req,
                                    struct lustre_handle *parent_lockh,
-                                   int update_mode,
-                                   struct dentry *dparent,
+                                   int update_mode, struct dentry *dparent,
                                    struct lustre_handle *child_lockh,
                                    struct dentry *dchild)
 {
         struct obd_device *obd = req->rq_export->exp_obd;
         struct mds_obd *mds = mds_req2mds(req);
-        struct mdc_op_data op_data;
-        int rc = 0, cleanup_phase = 0;
         struct ptlrpc_request *request = NULL;
+        int rc = 0, cleanup_phase = 0;
+        struct mdc_op_data op_data;
         void *handle;
         ENTRY;
 
         LASSERT(offset == 1 || offset == 3);
 
-        DEBUG_REQ(D_INODE, req, "unlink %*s (remote inode %u/%u/%u)",
-                  rec->ur_namelen - 1, rec->ur_name, (unsigned)dchild->d_mdsnum,
-                  (unsigned) dchild->d_inum, (unsigned) dchild->d_generation);
-        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
-                DEBUG_REQ(D_HA, req, "unlink %*s (remote inode %u/%u/%u)",
-                          rec->ur_namelen - 1, rec->ur_name,
-                          (unsigned)dchild->d_mdsnum,
-                          (unsigned) dchild->d_inum,
-                          (unsigned) dchild->d_generation);
-
         /* time to drop i_nlink on remote MDS */
         memset(&op_data, 0, sizeof(op_data));
-        op_data.fid1.mds = dchild->d_mdsnum;
-        op_data.fid1.id = dchild->d_inum;
-        op_data.fid1.generation = dchild->d_generation;
+        mds_pack_dentry2id(obd, &op_data.id1, dchild, 1);
         op_data.create_mode = rec->ur_mode;
+
+        DEBUG_REQ(D_INODE, req, "unlink %*s (remote inode "DLID4")",
+                  rec->ur_namelen - 1, rec->ur_name, OLID4(&op_data.id1));
+        
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+                DEBUG_REQ(D_HA, req, "unlink %*s (remote inode "DLID4")",
+                          rec->ur_namelen - 1, rec->ur_name, OLID4(&op_data.id1));
+        }
+
         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
                 op_data.create_mode |= MDS_MODE_REPLAY;
+        
         rc = md_unlink(mds->mds_lmv_exp, &op_data, &request);
         cleanup_phase = 2;
+
         if (request) {
                 if (rc == 0)
                         mds_copy_unlink_reply(req, request);
                 ptlrpc_req_finished(request);
         }
+
         if (rc == 0) {
                 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_RMDIR,
                                       NULL);
@@ -1671,6 +1840,7 @@ static int mds_reint_unlink_remote(struct mds_update_record *rec, int offset,
                 rc = mds_finish_transno(mds, dparent->d_inode, handle, req,
                                         rc, 0);
         }
+        EXIT;
 cleanup:
         req->rq_status = rc;
 
@@ -1700,22 +1870,21 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         struct inode *child_inode = NULL;
         struct lustre_handle parent_lockh[2] = {{0}, {0}}; 
         struct lustre_handle child_lockh = {0};
-#if 0
         struct lustre_handle child_reuse_lockh = {0};
-#endif
-        struct lustre_handle * slave_lockh = NULL;
+        struct lustre_handle *slave_lockh = NULL;
+        char idname[LL_ID_NAMELEN];
         struct llog_create_locks *lcl = NULL;
-        char fidname[LL_FID_NAMELEN];
         void *handle = NULL;
         int rc = 0, cleanup_phase = 0;
-        int unlink_by_fid = 0;
+        int unlink_by_id = 0;
         int update_mode;
         ENTRY;
 
         LASSERT(offset == 1 || offset == 3);
 
         DEBUG_REQ(D_INODE, req, "parent ino "LPU64"/%u, child %s",
-                  rec->ur_fid1->id, rec->ur_fid1->generation, rec->ur_name);
+                  id_ino(rec->ur_id1), id_gen(rec->ur_id1),
+                  rec->ur_name);
 
         MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
 
@@ -1734,31 +1903,29 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
 
         if (rec->ur_namelen == 1) {
                 /* this is request to drop i_nlink on local inode */
-                unlink_by_fid = 1;
-                rec->ur_name = fidname;
+                unlink_by_id = 1;
+                rec->ur_name = idname;
                 rc = mds_create_local_dentry(rec, obd);
                 if (rc == -ENOENT || (rec->ur_mode & MDS_MODE_REPLAY)) {
                         DEBUG_REQ(D_HA, req,
-                                  "drop nlink on inode %u/%u/%u (replay)",
-                                  (unsigned) rec->ur_fid1->mds,
-                                  (unsigned) rec->ur_fid1->id,
-                                  (unsigned) rec->ur_fid1->generation);
+                                  "drop nlink on inode "DLID4" (replay)",
+                                  OLID4(rec->ur_id1));
                         req->rq_status = 0;
                         RETURN(0);
                 }
         }
 
         if (rec->ur_mode & MDS_MODE_DONT_LOCK) {
-                /* master mds for directory asks slave removing
-                 * inode is already locked */
-                dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL,
+                /* master mds for directory asks slave removing inode is already
+                 * locked */
+                dparent = mds_id2locked_dentry(obd, rec->ur_id1, NULL,
                                                LCK_PW, parent_lockh,
                                                &update_mode, rec->ur_name,
                                                rec->ur_namelen,
                                                MDS_INODELOCK_UPDATE);
                 if (IS_ERR(dparent))
                         GOTO(cleanup, rc = PTR_ERR(dparent));
-                dchild = ll_lookup_one_len(rec->ur_name, dparent,
+                dchild = ll_lookup_one_len(rec->ur_name, dparent, 
                                            rec->ur_namelen - 1);
                 if (IS_ERR(dchild))
                         GOTO(cleanup, rc = PTR_ERR(dchild));
@@ -1767,7 +1934,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                 LASSERT(dchild->d_inode != NULL);
                 LASSERT(S_ISDIR(dchild->d_inode->i_mode));
         } else {
-                rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1,
+                rc = mds_get_parent_child_locked(obd, mds, rec->ur_id1,
                                                  parent_lockh, &dparent,
                                                  LCK_PW, MDS_INODELOCK_UPDATE,
                                                  &update_mode, rec->ur_name,
@@ -1781,7 +1948,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
 
         if (dchild->d_flags & DCACHE_CROSS_REF) {
                 /* we should have parent lock only here */
-                LASSERT(unlink_by_fid == 0);
+                LASSERT(unlink_by_id == 0);
                 LASSERT(dchild->d_mdsnum != mds->mds_num);
                 mds_reint_unlink_remote(rec, offset, req, parent_lockh,
                                         update_mode, dparent, &child_lockh, dchild);
@@ -1819,13 +1986,11 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         if (rc)
                 GOTO(cleanup, rc);
 
-#if 0
         /* Step 4: Get a lock on the ino to sync with creation WRT inode
          * reuse (see bug 2029). */
         rc = mds_lock_new_child(obd, child_inode, &child_reuse_lockh);
         if (rc != ELDLM_OK)
                 GOTO(cleanup, rc);
-#endif
         cleanup_phase = 3; /* child inum lock */
 
         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE, dparent->d_inode->i_sb);
@@ -1853,10 +2018,9 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                         down(&mds->mds_pending_dir->d_inode->i_sem);
                         cleanup_phase = 5; /* up(&pending_dir->i_sem) */
                 } else if (S_ISREG(child_inode->i_mode)) {
-                        mds_pack_inode2fid(obd, &body->fid1, child_inode);
-                        mds_pack_inode2body(obd, body, child_inode);
-                        mds_pack_md(obd, req->rq_repmsg, offset + 1, body,
-                                    child_inode, MDS_PACK_MD_LOCK);
+                        mds_pack_inode2body(obd, body, child_inode, 0);
+                        mds_pack_md(obd, req->rq_repmsg, offset + 1,
+                                    body, child_inode, MDS_PACK_MD_LOCK);
                 }
         }
 
@@ -1944,7 +2108,7 @@ cleanup:
         rc = mds_finish_transno(mds, dparent ? dparent->d_inode : NULL,
                                 handle, req, rc, 0);
         if (!rc)
-                (void)obd_set_info(mds->mds_osc_exp, strlen("unlinked"),
+                (void)obd_set_info(mds->mds_lov_exp, strlen("unlinked"),
                                    "unlinked", 0, NULL);
         switch(cleanup_phase) {
         case 5: /* pending_dir semaphore */
@@ -1959,7 +2123,6 @@ cleanup:
                 if (lcl != NULL)
                         ptlrpc_save_llog_lock(req, lcl);
         case 3: /* child ino-reuse lock */
-#if 0
                 if (rc && body != NULL) {
                         // Don't unlink the OST objects if the MDS unlink failed
                         body->valid = 0;
@@ -1968,7 +2131,6 @@ cleanup:
                         ldlm_lock_decref(&child_reuse_lockh, LCK_EX);
                 else
                         ptlrpc_save_lock(req, &child_reuse_lockh, LCK_EX);
-#endif
         case 2: /* child lock */
                 mds_unlock_slave_objs(obd, dchild, slave_lockh);
                 if (child_lockh.cookie)
@@ -1999,8 +2161,8 @@ cleanup:
  * to service requests from remote MDS to increment i_nlink
  */
 static int mds_reint_link_acquire(struct mds_update_record *rec,
-                                 int offset, struct ptlrpc_request *req,
-                                 struct lustre_handle *lh)
+                                  int offset, struct ptlrpc_request *req,
+                                  struct lustre_handle *lh)
 {
         struct obd_device *obd = req->rq_export->exp_obd;
         struct ldlm_res_id src_res_id = { .name = {0} };
@@ -2012,26 +2174,24 @@ static int mds_reint_link_acquire(struct mds_update_record *rec,
         int flags = 0;
         ENTRY;
 
-        DEBUG_REQ(D_INODE, req, "%s: request to acquire i_nlinks %u/%u/%u\n",
-                  obd->obd_name, (unsigned) rec->ur_fid1->mds,
-                  (unsigned) rec->ur_fid1->id,
-                  (unsigned) rec->ur_fid1->generation);
+        DEBUG_REQ(D_INODE, req, "%s: request to acquire i_nlinks "DLID4"\n",
+                  obd->obd_name, OLID4(rec->ur_id1));
 
-        /* Step 1: Lookup the source inode and target directory by FID */
-        de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL);
+        /* Step 1: Lookup the source inode and target directory by ID */
+        de_src = mds_id2dentry(obd, rec->ur_id1, NULL);
         if (IS_ERR(de_src))
                 GOTO(cleanup, rc = PTR_ERR(de_src));
         cleanup_phase = 1; /* source dentry */
 
-        src_res_id.name[0] = de_src->d_inode->i_ino;
-        src_res_id.name[1] = de_src->d_inode->i_generation;
+        src_res_id.name[0] = id_fid(rec->ur_id1);
+        src_res_id.name[1] = id_group(rec->ur_id1);
         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
 
         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
-                        src_res_id, LDLM_IBITS, &policy,
-                        LCK_EX, &flags, mds_blocking_ast,
-                        ldlm_completion_ast, NULL, NULL,
-                        NULL, 0, NULL, &src_lockh);
+                              src_res_id, LDLM_IBITS, &policy,
+                              LCK_EX, &flags, mds_blocking_ast,
+                              ldlm_completion_ast, NULL, NULL,
+                              NULL, 0, NULL, &src_lockh);
         if (rc != ELDLM_OK)
                 GOTO(cleanup, rc = -ENOLCK);
         cleanup_phase = 2; /* lock */
@@ -2087,25 +2247,20 @@ static int mds_reint_link_to_remote(struct mds_update_record *rec,
         int update_mode;
         ENTRY;
 
-#define fmt     "%s: request to link %u/%u/%u:%*s to foreign inode %u/%u/%u\n"
-        DEBUG_REQ(D_INODE, req, fmt, obd->obd_name,
-                  (unsigned) rec->ur_fid2->mds,
-                  (unsigned) rec->ur_fid2->id,
-                  (unsigned) rec->ur_fid2->generation,
-                  rec->ur_namelen - 1, rec->ur_name,
-                  (unsigned) rec->ur_fid1->mds,
-                  (unsigned) rec->ur_fid1->id,
-                  (unsigned)rec->ur_fid1->generation);
-
-        de_tgt_dir = mds_fid2locked_dentry(obd, rec->ur_fid2, NULL, LCK_EX,
-                                           tgt_dir_lockh, &update_mode,
-                                           rec->ur_name, rec->ur_namelen - 1,
-                                           MDS_INODELOCK_UPDATE);
+        DEBUG_REQ(D_INODE, req, "%s: request to link "DLID4
+                  ":%*s to foreign inode "DLID4"\n", obd->obd_name,
+                  OLID4(rec->ur_id2), rec->ur_namelen - 1, rec->ur_name,
+                  OLID4(rec->ur_id1));
+
+        de_tgt_dir = mds_id2locked_dentry(obd, rec->ur_id2, NULL, LCK_EX,
+                                          tgt_dir_lockh, &update_mode,
+                                          rec->ur_name, rec->ur_namelen - 1,
+                                          MDS_INODELOCK_UPDATE);
         if (IS_ERR(de_tgt_dir))
                 GOTO(cleanup, rc = PTR_ERR(de_tgt_dir));
         cleanup_phase = 1;
 
-        op_data.fid1 = *(rec->ur_fid1);
+        op_data.id1 = *(rec->ur_id1);
         op_data.namelen = 0;
         op_data.name = NULL;
         rc = md_link(mds->mds_lmv_exp, &op_data, &request);
@@ -2125,8 +2280,9 @@ static int mds_reint_link_to_remote(struct mds_update_record *rec,
         }
         
         rc = fsfilt_add_dir_entry(obd, de_tgt_dir, rec->ur_name,
-                                  rec->ur_namelen - 1, rec->ur_fid1->id,
-                                  rec->ur_fid1->generation, rec->ur_fid1->mds);
+                                  rec->ur_namelen - 1, id_ino(rec->ur_id1),
+                                  id_gen(rec->ur_id1), id_group(rec->ur_id1),
+                                  id_fid(rec->ur_id1));
         cleanup_phase = 3;
 
 cleanup:
@@ -2188,18 +2344,18 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
         LASSERT(offset == 1);
 
         DEBUG_REQ(D_INODE, req, "original "LPU64"/%u to "LPU64"/%u %s",
-                  rec->ur_fid1->id, rec->ur_fid1->generation,
-                  rec->ur_fid2->id, rec->ur_fid2->generation, rec->ur_name);
+                  id_ino(rec->ur_id1), id_gen(rec->ur_id1),
+                  id_ino(rec->ur_id2), id_gen(rec->ur_id2),
+                  rec->ur_name);
 
         MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
-        
         MDS_UPDATE_COUNTER(mds, MDS_LINK_COUNT);
         
 //      memset(tgt_dir_lockh, 0, 2*sizeof(tgt_dir_lockh[0]));
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
                 GOTO(cleanup, rc = -ENOENT);
 
-        if (rec->ur_fid1->mds != mds->mds_num) {
+        if (id_group(rec->ur_id1) != mds->mds_num) {
                 rc = mds_reint_link_to_remote(rec, offset, req, lh);
                 RETURN(rc);
         }
@@ -2209,14 +2365,14 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
                 RETURN(rc);
         }
 
-        /* Step 1: Lookup the source inode and target directory by FID */
-        de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL);
+        /* Step 1: Lookup the source inode and target directory by ID */
+        de_src = mds_id2dentry(obd, rec->ur_id1, NULL);
         if (IS_ERR(de_src))
                 GOTO(cleanup, rc = PTR_ERR(de_src));
 
         cleanup_phase = 1; /* source dentry */
 
-        de_tgt_dir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
+        de_tgt_dir = mds_id2dentry(obd, rec->ur_id2, NULL);
         if (IS_ERR(de_tgt_dir)) {
                 rc = PTR_ERR(de_tgt_dir);
                 de_tgt_dir = NULL;
@@ -2226,14 +2382,15 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
         cleanup_phase = 2; /* target directory dentry */
 
         CDEBUG(D_INODE, "linking %*s/%s to inode %lu\n",
-               de_tgt_dir->d_name.len, de_tgt_dir->d_name.name, rec->ur_name,
-               de_src->d_inode->i_ino);
+               de_tgt_dir->d_name.len, de_tgt_dir->d_name.name,
+               rec->ur_name, de_src->d_inode->i_ino);
 
         /* Step 2: Take the two locks */
-        src_res_id.name[0] = de_src->d_inode->i_ino;
-        src_res_id.name[1] = de_src->d_inode->i_generation;
-        tgt_dir_res_id.name[0] = de_tgt_dir->d_inode->i_ino;
-        tgt_dir_res_id.name[1] = de_tgt_dir->d_inode->i_generation;
+        src_res_id.name[0] = id_fid(rec->ur_id1);
+        src_res_id.name[1] = id_group(rec->ur_id1);
+        tgt_dir_res_id.name[0] = id_fid(rec->ur_id2);
+        tgt_dir_res_id.name[1] = id_group(rec->ur_id2);
+        
 #ifdef S_PDIROPS
         if (IS_PDIROPS(de_tgt_dir->d_inode)) {
                 int flags = 0;
@@ -2251,23 +2408,23 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
 
                 tgt_dir_res_id.name[2] = full_name_hash(rec->ur_name,
                                                         rec->ur_namelen - 1);
-                CDEBUG(D_INFO, "take lock on %lu:%u:"LPX64"\n",
-                       de_tgt_dir->d_inode->i_ino,
-                       de_tgt_dir->d_inode->i_generation,
+                CDEBUG(D_INFO, "take lock on %lu:%lu:"LPX64"\n",
+                       (unsigned long)id_fid(rec->ur_id2),
+                       (unsigned long)id_group(rec->ur_id2),
                        tgt_dir_res_id.name[2]);
         }
 #endif
         rc = enqueue_ordered_locks(obd, &src_res_id, &src_lockh, LCK_EX,
-                                   &src_policy,
-                                   &tgt_dir_res_id, tgt_dir_lockh, LCK_EX,
-                                   &tgt_dir_policy);
+                                   &src_policy, &tgt_dir_res_id, tgt_dir_lockh,
+                                   LCK_EX, &tgt_dir_policy);
         if (rc)
                 GOTO(cleanup, rc);
 
         cleanup_phase = 3; /* locks */
 
         /* Step 3: Lookup the child */
-        dchild = ll_lookup_one_len(rec->ur_name, de_tgt_dir, rec->ur_namelen-1);
+        dchild = ll_lookup_one_len(rec->ur_name, de_tgt_dir, 
+                                   rec->ur_namelen - 1);
         if (IS_ERR(dchild)) {
                 rc = PTR_ERR(dchild);
                 if (rc != -EPERM && rc != -EACCES)
@@ -2355,9 +2512,9 @@ cleanup:
  */
 static int mds_get_parents_children_locked(struct obd_device *obd,
                                            struct mds_obd *mds,
-                                           struct ll_fid *p1_fid,
+                                           struct lustre_id *p1_id,
                                            struct dentry **de_srcdirp,
-                                           struct ll_fid *p2_fid,
+                                           struct lustre_id *p2_id,
                                            struct dentry **de_tgtdirp,
                                            int parent_mode,
                                            const char *old_name, int old_len,
@@ -2381,24 +2538,28 @@ static int mds_get_parents_children_locked(struct obd_device *obd,
                                                         MDS_INODELOCK_UPDATE}};
         struct ldlm_res_id *maxres_src, *maxres_tgt;
         struct inode *inode;
+        __u32 child1_gen = 0;
+        __u32 child2_gen = 0;
+        unsigned long child1_ino;
+        unsigned long child2_ino;
         int rc = 0, cleanup_phase = 0;
         ENTRY;
 
         /* Step 1: Lookup the source directory */
-        *de_srcdirp = mds_fid2dentry(mds, p1_fid, NULL);
+        *de_srcdirp = mds_id2dentry(obd, p1_id, NULL);
         if (IS_ERR(*de_srcdirp))
                 GOTO(cleanup, rc = PTR_ERR(*de_srcdirp));
 
         cleanup_phase = 1; /* source directory dentry */
 
-        p1_res_id.name[0] = (*de_srcdirp)->d_inode->i_ino;
-        p1_res_id.name[1] = (*de_srcdirp)->d_inode->i_generation;
+        p1_res_id.name[0] = id_fid(p1_id);
+        p1_res_id.name[1] = id_group(p1_id);
 
         /* Step 2: Lookup the target directory */
-        if (memcmp(p1_fid, p2_fid, sizeof(*p1_fid)) == 0) {
+        if (id_equal_stc(p1_id, p2_id)) {
                 *de_tgtdirp = dget(*de_srcdirp);
         } else {
-                *de_tgtdirp = mds_fid2dentry(mds, p2_fid, NULL);
+                *de_tgtdirp = mds_id2dentry(obd, p2_id, NULL);
                 if (IS_ERR(*de_tgtdirp)) {
                         rc = PTR_ERR(*de_tgtdirp);
                         *de_tgtdirp = NULL;
@@ -2408,33 +2569,38 @@ static int mds_get_parents_children_locked(struct obd_device *obd,
 
         cleanup_phase = 2; /* target directory dentry */
 
-        p2_res_id.name[0] = (*de_tgtdirp)->d_inode->i_ino;
-        p2_res_id.name[1] = (*de_tgtdirp)->d_inode->i_generation;
+        p2_res_id.name[0] = id_fid(p2_id);
+        p2_res_id.name[1] = id_group(p2_id);
 
 #ifdef S_PDIROPS
         dlm_handles[5].cookie = 0;
         dlm_handles[6].cookie = 0;
+        
         if (IS_PDIROPS((*de_srcdirp)->d_inode)) {
-                /* Get a temp lock on just ino, gen to flush client cache and
-                 * to protect dirs from concurrent splitting */
-                rc = enqueue_ordered_locks(obd, &p1_res_id, &(dlm_handles[5]),
+                /*
+                 * get a temp lock on just fid, group to flush client cache and
+                 * to protect dirs from concurrent splitting.
+                 */
+                rc = enqueue_ordered_locks(obd, &p1_res_id, &dlm_handles[5],
                                            LCK_PW, &p_policy, &p2_res_id,
-                                           &(dlm_handles[6]),LCK_PW,&p_policy);
+                                           &dlm_handles[6], LCK_PW, &p_policy);
                 if (rc != ELDLM_OK)
                         GOTO(cleanup, rc);
+                
                 p1_res_id.name[2] = full_name_hash(old_name, old_len - 1);
                 p2_res_id.name[2] = full_name_hash(new_name, new_len - 1);
-                CDEBUG(D_INFO, "take locks on %lu:%u:"LPX64", %lu:%u:"LPX64"\n",
-                       (*de_srcdirp)->d_inode->i_ino,
-                       (*de_srcdirp)->d_inode->i_generation, p1_res_id.name[2],
-                       (*de_tgtdirp)->d_inode->i_ino,
-                       (*de_tgtdirp)->d_inode->i_generation, p2_res_id.name[2]);
+
+                CDEBUG(D_INFO, "take locks on "
+                       LPX64":"LPX64":"LPX64", "LPX64":"LPX64":"LPX64"\n",
+                       p1_res_id.name[0], p1_res_id.name[1], p1_res_id.name[2],
+                       p2_res_id.name[0], p2_res_id.name[1], p2_res_id.name[2]);
         }
         cleanup_phase = 3;
 #endif
 
         /* Step 3: Lookup the source child entry */
-        *de_oldp = ll_lookup_one_len(old_name, *de_srcdirp, old_len - 1);
+        *de_oldp = ll_lookup_one_len(old_name, *de_srcdirp, 
+                                     old_len - 1);
         if (IS_ERR(*de_oldp)) {
                 rc = PTR_ERR(*de_oldp);
                 CERROR("old child lookup error (%*s): %d\n",
@@ -2445,22 +2611,40 @@ static int mds_get_parents_children_locked(struct obd_device *obd,
         cleanup_phase = 4; /* original name dentry */
 
         inode = (*de_oldp)->d_inode;
-        
         if (inode != NULL) {
+                struct lustre_id sid;
+                
                 inode = igrab(inode);
                 if (inode == NULL)
                         GOTO(cleanup, rc = -ENOENT);
 
-                c1_res_id.name[0] = inode->i_ino;
-                c1_res_id.name[1] = inode->i_generation;
+                down(&inode->i_sem);
+                rc = mds_read_inode_sid(obd, inode, &sid);
+                up(&inode->i_sem);
+                if (rc) {
+                        CERROR("Can't read inode self id, inode %lu, "
+                               "rc %d\n", inode->i_ino, rc);
+                        iput(inode);
+                        GOTO(cleanup, rc);
+                }
+
+                c1_res_id.name[0] = id_fid(&sid);
+                c1_res_id.name[1] = id_group(&sid);
+                child1_gen = inode->i_generation;
+                child1_ino = inode->i_ino;
                 iput(inode);
         } else if ((*de_oldp)->d_flags & DCACHE_CROSS_REF) {
-                c1_res_id.name[0] = (*de_oldp)->d_inum;
-                c1_res_id.name[1] = (*de_oldp)->d_generation;
+                c1_res_id.name[0] = (*de_oldp)->d_fid;
+                c1_res_id.name[1] = (*de_oldp)->d_mdsnum;
+                child1_gen = (*de_oldp)->d_generation;
+                child1_ino = (*de_oldp)->d_inum;
+        } else {
+                GOTO(cleanup, rc = -ENOENT);
         }
 
         /* Step 4: Lookup the target child entry */
-        *de_newp = ll_lookup_one_len(new_name, *de_tgtdirp, new_len - 1);
+        *de_newp = ll_lookup_one_len(new_name, *de_tgtdirp, 
+                                     new_len - 1);
         if (IS_ERR(*de_newp)) {
                 rc = PTR_ERR(*de_newp);
                 CERROR("new child lookup error (%*s): %d\n",
@@ -2471,21 +2655,32 @@ static int mds_get_parents_children_locked(struct obd_device *obd,
         cleanup_phase = 5; /* target dentry */
 
         inode = (*de_newp)->d_inode;
-        
-        if (inode == NULL)
-                goto retry_locks;
-        
         if (inode != NULL) {
+                struct lustre_id sid;
+
                 inode = igrab(inode);
                 if (inode == NULL)
                         goto retry_locks;
 
-                c2_res_id.name[0] = inode->i_ino;
-                c2_res_id.name[1] = inode->i_generation;
+                down(&inode->i_sem);
+                rc = mds_read_inode_sid(obd, inode, &sid);
+                up(&inode->i_sem);
+                if (rc) {
+                        CERROR("Can't read inode self id, inode %lu, "
+                               "rc %d\n", inode->i_ino, rc);
+                        GOTO(cleanup, rc);
+                }
+
+                c2_res_id.name[0] = id_fid(&sid);
+                c2_res_id.name[1] = id_group(&sid);
+                child2_gen = inode->i_generation;
+                child2_ino = inode->i_ino;
                 iput(inode);
         } else if ((*de_newp)->d_flags & DCACHE_CROSS_REF) {
-                c2_res_id.name[0] = (*de_newp)->d_inum;
-                c2_res_id.name[1] = (*de_newp)->d_generation;
+                c2_res_id.name[0] = (*de_newp)->d_fid;
+                c2_res_id.name[1] = (*de_newp)->d_mdsnum;
+                child2_gen = (*de_newp)->d_generation;
+                child2_ino = (*de_newp)->d_inum;
         }
 
 retry_locks:
@@ -2494,12 +2689,12 @@ retry_locks:
         maxres_tgt = &p2_res_id;
         cleanup_phase = 5; /* target dentry */
 
-        if (c1_res_id.name[0] != 0 && res_gt(&c1_res_id, &p1_res_id, NULL,NULL))
+        if (c1_res_id.name[0] != 0 && res_gt(&c1_res_id, &p1_res_id, NULL, NULL))
                 maxres_src = &c1_res_id;
-        if (c2_res_id.name[0] != 0 && res_gt(&c2_res_id, &p2_res_id, NULL,NULL))
+        if (c2_res_id.name[0] != 0 && res_gt(&c2_res_id, &p2_res_id, NULL, NULL))
                 maxres_tgt = &c2_res_id;
 
-        rc = enqueue_4ordered_locks(obd, &p1_res_id,&dlm_handles[0],parent_mode,
+        rc = enqueue_4ordered_locks(obd, &p1_res_id, &dlm_handles[0], parent_mode,
                                     &p_policy,
                                     &p2_res_id, &dlm_handles[1], parent_mode,
                                     &p_policy,
@@ -2516,7 +2711,7 @@ retry_locks:
         rc = mds_verify_child(obd, &p1_res_id, &dlm_handles[0], *de_srcdirp,
                               parent_mode, &c1_res_id, &dlm_handles[2],
                               de_oldp, child_mode, &c1_policy, old_name,old_len,
-                              maxres_tgt);
+                              maxres_tgt, child1_ino, child1_gen);
         if (rc) {
                 if (c2_res_id.name[0] != 0)
                         ldlm_lock_decref(&dlm_handles[3], child_mode);
@@ -2534,7 +2729,7 @@ retry_locks:
         rc = mds_verify_child(obd, &p2_res_id, &dlm_handles[1], *de_tgtdirp,
                               parent_mode, &c2_res_id, &dlm_handles[3],
                               de_newp, child_mode, &c2_policy, new_name,
-                              new_len, maxres_src);
+                              new_len, maxres_src, child2_ino, child2_gen);
         if (rc) {
                 ldlm_lock_decref(&dlm_handles[2], child_mode);
                 ldlm_lock_decref(&dlm_handles[0], parent_mode);
@@ -2553,8 +2748,10 @@ cleanup:
                                 ldlm_lock_decref(&dlm_handles[3], child_mode);
                         if (c1_res_id.name[0] != 0)
                                 ldlm_lock_decref(&dlm_handles[2], child_mode);
-                        ldlm_lock_decref(&dlm_handles[1], parent_mode);
-                        ldlm_lock_decref(&dlm_handles[0], parent_mode);
+                        if (dlm_handles[1].cookie != 0)
+                                ldlm_lock_decref(&dlm_handles[1], parent_mode);
+                        if (dlm_handles[0].cookie != 0)
+                                ldlm_lock_decref(&dlm_handles[0], parent_mode);
                 case 5: /* target dentry */
                         l_dput(*de_newp);
                 case 4: /* source dentry */
@@ -2575,27 +2772,88 @@ cleanup:
 
         return rc;
 }
-                                                                                                                                                                                                     
+
+/*
+ * checks if dentry can be removed. This function also handles cross-ref
+ * dentries.
+ */
+static int mds_check_for_rename(struct obd_device *obd,
+                                struct dentry *dentry)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct lustre_handle *rlockh;
+        struct ptlrpc_request *req;
+        struct mdc_op_data op_data;
+        struct lookup_intent it;
+        int handle_size, rc = 0;
+        ENTRY;
+
+        LASSERT(dentry != NULL);
+
+        if (dentry->d_inode) {
+                if (S_ISDIR(dentry->d_inode->i_mode) &&
+                    !mds_is_dir_empty(obd, dentry))
+                        rc = -ENOTEMPTY;
+        } else {
+                LASSERT((dentry->d_flags & DCACHE_CROSS_REF));
+                handle_size = sizeof(struct lustre_handle);
+        
+                OBD_ALLOC(rlockh, handle_size);
+                if (rlockh == NULL)
+                        RETURN(-ENOMEM);
+
+                memset(rlockh, 0, handle_size);
+                memset(&op_data, 0, sizeof(op_data));
+                mds_pack_dentry2id(obd, &op_data.id1, dentry, 1);
+
+                it.it_op = IT_UNLINK;
+                rc = md_enqueue(mds->mds_lmv_exp, LDLM_IBITS, &it, LCK_EX,
+                                &op_data, rlockh, NULL, 0, ldlm_completion_ast,
+                                mds_blocking_ast, NULL);
+
+                if (rc)
+                        RETURN(rc);
+
+                if (rlockh->cookie != 0)
+                        ldlm_lock_decref(rlockh, LCK_EX);
+                
+                if (it.d.lustre.it_data) {
+                        req = (struct ptlrpc_request *)it.d.lustre.it_data;
+                        ptlrpc_req_finished(req);
+                }
+
+                if (it.d.lustre.it_status)
+                        rc = it.d.lustre.it_status;
+                OBD_FREE(rlockh, handle_size);
+        }
+        RETURN(rc);
+}
+
 static int mds_add_local_dentry(struct mds_update_record *rec, int offset,
-                                struct ptlrpc_request *req, struct dentry *dentry,
-                                struct dentry *de_dir, 
-                                struct dentry *de)
+                                struct ptlrpc_request *req, struct lustre_id *id,
+                                struct dentry *de_dir, struct dentry *de,
+                                int del_cross_ref)
 {
         struct obd_device *obd = req->rq_export->exp_obd;
         struct mds_obd *mds = mds_req2mds(req);
         void *handle = NULL;
         int rc = 0;
         ENTRY;
-                                                                                                                                                                                                     
+
         if (de->d_inode) {
                 /*
                  * name exists and points to local inode try to unlink this name
                  * and create new one.
                  */
                 CDEBUG(D_OTHER, "%s: %s points to local inode %lu/%lu\n",
-                       obd->obd_name, rec->ur_tgt,
-                       (unsigned long)de->d_inode->i_ino,
+                       obd->obd_name, rec->ur_tgt, (unsigned long)de->d_inode->i_ino,
                        (unsigned long)de->d_inode->i_generation);
+
+                /* checking if we can remove local dentry. */
+                rc = mds_check_for_rename(obd, de);
+                if (rc)
+                        GOTO(cleanup, rc);
+
                 handle = fsfilt_start(obd, de_dir->d_inode,
                                       FSFILT_OP_RENAME, NULL);
                 if (IS_ERR(handle))
@@ -2604,18 +2862,46 @@ static int mds_add_local_dentry(struct mds_update_record *rec, int offset,
                 if (rc)
                         GOTO(cleanup, rc);
         } else if (de->d_flags & DCACHE_CROSS_REF) {
-                /* name exists and points to remove inode */
-                CDEBUG(D_OTHER, "%s: %s points to remote inode %lu/%lu/%lu\n",
-                       obd->obd_name, rec->ur_tgt, (unsigned long)de->d_mdsnum,
-                       (unsigned long)de->d_inum, 
-                       (unsigned long)de->d_generation);
-                handle = fsfilt_start(obd, de_dir->d_inode,
-                                      FSFILT_OP_RENAME, NULL);
-                if (IS_ERR(handle))
-                        GOTO(cleanup, rc = PTR_ERR(handle));
-                rc = fsfilt_del_dir_entry(req->rq_export->exp_obd, de);
+                struct lustre_id de_id;
+
+                /* name exists and points to remote inode */
+                mds_pack_dentry2id(obd, &de_id, de, 1);
+                
+                CDEBUG(D_OTHER, "%s: %s points to remote inode "DLID4"\n",
+                       obd->obd_name, rec->ur_tgt, OLID4(&de_id));
+
+                /* checking if we can remove local dentry. */
+                rc = mds_check_for_rename(obd, de);
                 if (rc)
                         GOTO(cleanup, rc);
+
+                /*
+                 * to be fully POSIX compatible, we should add one more check:
+                 *
+                 * if de_new is subdir of dir rec->ur_id1. If so - return
+                 * -EINVAL.
+                 *
+                 * I do not know how to implement it right now, because
+                 * inodes/dentries for new and old names lie on different MDS,
+                 * so add this notice here just to make it visible for the rest
+                 * of developers and do not forget about. And when this check
+                 * will be added, del_cross_ref should gone, that is local
+                 * dentry is able to be removed if all checks passed.
+                 * 
+                 * Currently -EEXISTS is returned by fsfilt_add_dir_entry() what
+                 * is not fully correct. --umka
+                 */
+
+                if (del_cross_ref) {
+                        handle = fsfilt_start(obd, de_dir->d_inode,
+                                              FSFILT_OP_RENAME, NULL);
+                        if (IS_ERR(handle))
+                                GOTO(cleanup, rc = PTR_ERR(handle));
+                        rc = fsfilt_del_dir_entry(req->rq_export->exp_obd, de);
+                        if (rc)
+                                GOTO(cleanup, rc);
+                }
+                
         } else {
                 /* name doesn't exist. the simplest case. */
                 handle = fsfilt_start(obd, de_dir->d_inode,
@@ -2623,26 +2909,25 @@ static int mds_add_local_dentry(struct mds_update_record *rec, int offset,
                 if (IS_ERR(handle))
                         GOTO(cleanup, rc = PTR_ERR(handle));
         }
-                                                                                                                                                                                                     
+
         rc = fsfilt_add_dir_entry(obd, de_dir, rec->ur_tgt,
-                                  rec->ur_tgtlen - 1, dentry->d_inum,
-                                  dentry->d_generation, dentry->d_mdsnum);
+                                  rec->ur_tgtlen - 1, id_ino(id),
+                                  id_gen(id), id_group(id), id_fid(id));
         if (rc) {
                 CERROR("add_dir_entry() returned error %d\n", rc);
                 GOTO(cleanup, rc);
         }
-                                                                                                                                                                                                     
+
 cleanup:
         EXIT;
         rc = mds_finish_transno(mds, de_dir ? de_dir->d_inode : NULL,
                                 handle, req, rc, 0);
-                                                                                                                                                                                                     
+
         RETURN(rc);
 }
 
 static int mds_del_local_dentry(struct mds_update_record *rec, int offset,
-                                struct ptlrpc_request *req, 
-                                struct dentry *dentry, struct dentry *de_dir, 
+                                struct ptlrpc_request *req, struct dentry *de_dir,
                                 struct dentry *de)
 {
         struct obd_device *obd = req->rq_export->exp_obd;
@@ -2650,13 +2935,13 @@ static int mds_del_local_dentry(struct mds_update_record *rec, int offset,
         void *handle = NULL;
         int rc = 0;
         ENTRY;
-                                                                                                                                                                                                     
+
         handle = fsfilt_start(obd, de_dir->d_inode, FSFILT_OP_UNLINK, NULL);
         if (IS_ERR(handle))
                 GOTO(cleanup, rc = PTR_ERR(handle));
         rc = fsfilt_del_dir_entry(obd, de);
         d_drop(de);
-                                                                                                                                                                                                     
+
 cleanup:
         EXIT;
         rc = mds_finish_transno(mds, de_dir ? de_dir->d_inode : NULL,
@@ -2664,35 +2949,31 @@ cleanup:
         RETURN(0);
 }
 
-
 static int mds_reint_rename_create_name(struct mds_update_record *rec,
                                         int offset, struct ptlrpc_request *req)
 {
+        struct lustre_handle parent_lockh[2] = {{0}, {0}};
         struct obd_device *obd = req->rq_export->exp_obd;
-        struct dentry *de_srcdir = NULL;
-        struct dentry *de_new = NULL;
         struct mds_obd *mds = mds_req2mds(req);
-        struct lustre_handle parent_lockh[2] = {{0}, {0}};
         struct lustre_handle child_lockh = {0};
+        struct dentry *de_tgtdir = NULL;
+        struct dentry *de_new = NULL;
         int cleanup_phase = 0;
-        void *handle = NULL;
         int update_mode, rc = 0;
         ENTRY;
 
-        /* another MDS executing rename operation has asked us
-         * to create target name. such a creation should destroy
-         * existing target name */
-
-        CDEBUG(D_OTHER, "%s: request to create name %s for %lu/%lu/%lu\n",
-                        obd->obd_name, rec->ur_tgt,
-                        (unsigned long) rec->ur_fid1->mds,
-                        (unsigned long) rec->ur_fid1->id,
-                        (unsigned long) rec->ur_fid1->generation);
+        /*
+         * another MDS executing rename operation has asked us to create target
+         * name. such a creation should destroy existing target name.
+         */
+        CDEBUG(D_OTHER, "%s: request to create name %s for "DLID4"\n",
+               obd->obd_name, rec->ur_tgt, OLID4(rec->ur_id1));
 
         /* first, lookup the target */
         child_lockh.cookie = 0;
-        rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid2, parent_lockh,
-                                         &de_srcdir,LCK_PW,MDS_INODELOCK_UPDATE,
+        
+        rc = mds_get_parent_child_locked(obd, mds, rec->ur_id2, parent_lockh,
+                                         &de_tgtdir, LCK_PW, MDS_INODELOCK_UPDATE,
                                          &update_mode, rec->ur_tgt, rec->ur_tgtlen,
                                          &child_lockh, &de_new, LCK_EX,
                                          MDS_INODELOCK_LOOKUP);
@@ -2701,68 +2982,30 @@ static int mds_reint_rename_create_name(struct mds_update_record *rec,
 
         cleanup_phase = 1;
 
-        LASSERT(de_srcdir);
-        LASSERT(de_srcdir->d_inode);
+        LASSERT(de_tgtdir);
+        LASSERT(de_tgtdir->d_inode);
         LASSERT(de_new);
 
-        if (de_new->d_inode) {
-                /* name exists and points to local inode
-                 * try to unlink this name and create new one */
-                CERROR("%s: %s points to local inode %lu/%lu\n",
-                       obd->obd_name, rec->ur_tgt,
-                       (unsigned long) de_new->d_inode->i_ino,
-                       (unsigned long) de_new->d_inode->i_generation);
-                handle = fsfilt_start(obd, de_srcdir->d_inode,
-                                      FSFILT_OP_RENAME, NULL);
-                if (IS_ERR(handle))
-                        GOTO(cleanup, rc = PTR_ERR(handle));
-                rc = fsfilt_del_dir_entry(req->rq_export->exp_obd, de_new);
-                if (rc)
-                        GOTO(cleanup, rc);
-        } else if (de_new->d_flags & DCACHE_CROSS_REF) {
-                /* name exists adn points to remove inode */
-                CERROR("%s: %s points to remote inode %lu/%lu/%lu\n",
-                       obd->obd_name, rec->ur_tgt,
-                       (unsigned long) de_new->d_mdsnum,
-                       (unsigned long) de_new->d_inum,
-                       (unsigned long) de_new->d_generation);
-        } else {
-                /* name doesn't exist. the simplest case */
-                handle = fsfilt_start(obd, de_srcdir->d_inode,
-                                      FSFILT_OP_LINK, NULL);
-                if (IS_ERR(handle))
-                        GOTO(cleanup, rc = PTR_ERR(handle));
-        }
-       
-        cleanup_phase = 2;
-        rc = fsfilt_add_dir_entry(obd, de_srcdir, rec->ur_tgt,
-                        rec->ur_tgtlen - 1, rec->ur_fid1->id,
-                        rec->ur_fid1->generation, rec->ur_fid1->mds);
-        if (rc)
-                CERROR("add_dir_entry() returned error %d\n", rc);
+        rc = mds_add_local_dentry(rec, offset, req, rec->ur_id1,
+                                  de_tgtdir, de_new, 0);
+
+        GOTO(cleanup, rc);
 cleanup:
         EXIT;
-        rc = mds_finish_transno(mds, de_srcdir ? de_srcdir->d_inode : NULL,
-                                handle, req, rc, 0);
-        switch(cleanup_phase) {
-                case 2:
-                case 1:
+        
+        if (cleanup_phase == 1) {
 #ifdef S_PDIROPS
-                        if (parent_lockh[1].cookie != 0)
-                                ldlm_lock_decref(&parent_lockh[1], update_mode);
+                if (parent_lockh[1].cookie != 0)
+                        ldlm_lock_decref(&parent_lockh[1], update_mode);
 #endif
-                        ldlm_lock_decref(&parent_lockh[0], LCK_PW);
-                        if (child_lockh.cookie != 0)
-                                ldlm_lock_decref(&child_lockh, LCK_EX);
-                        l_dput(de_new);
-                        l_dput(de_srcdir);
-                        break;
-                default:
-                        LBUG();
+                ldlm_lock_decref(&parent_lockh[0], LCK_PW);
+                if (child_lockh.cookie != 0)
+                        ldlm_lock_decref(&child_lockh, LCK_EX);
+                l_dput(de_new);
+                l_dput(de_tgtdir);
         }
 
         req->rq_status = rc;
-
         RETURN(0);
 }
 
@@ -2780,13 +3023,13 @@ static int mds_reint_rename_to_remote(struct mds_update_record *rec, int offset,
         int update_mode, rc = 0;
         ENTRY;
 
-        CDEBUG(D_OTHER, "%s: move name %s onto another mds%u\n",
-               obd->obd_name, rec->ur_name, rec->ur_fid2->mds + 1);
+        CDEBUG(D_OTHER, "%s: move name %s onto another mds #%lu\n",
+               obd->obd_name, rec->ur_name, (unsigned long)id_group(rec->ur_id2));
         memset(&opdata, 0, sizeof(opdata));
 
         child_lockh.cookie = 0;
-        rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1, parent_lockh,
-                                         &de_srcdir,LCK_PW,MDS_INODELOCK_UPDATE,
+        rc = mds_get_parent_child_locked(obd, mds, rec->ur_id1, parent_lockh,
+                                         &de_srcdir, LCK_PW, MDS_INODELOCK_UPDATE,
                                          &update_mode, rec->ur_name, 
                                          rec->ur_namelen, &child_lockh, &de_old,
                                          LCK_EX, MDS_INODELOCK_LOOKUP);
@@ -2795,38 +3038,52 @@ static int mds_reint_rename_to_remote(struct mds_update_record *rec, int offset,
         LASSERT(de_srcdir->d_inode);
         LASSERT(de_old);
        
-        /* we already know the target should be created on another MDS
-         * so, we have to request that MDS to do it */
+        /*
+         * we already know the target should be created on another MDS so, we
+         * have to request that MDS to do it.
+         */
 
-        /* prepare source fid */
+        /* prepare source id */
         if (de_old->d_flags & DCACHE_CROSS_REF) {
                 LASSERT(de_old->d_inode == NULL);
                 CDEBUG(D_OTHER, "request to move remote name\n");
-                opdata.fid1.mds = de_old->d_mdsnum;
-                opdata.fid1.id = de_old->d_inum;
-                opdata.fid1.generation = de_old->d_generation;
+                mds_pack_dentry2id(obd, &opdata.id1, de_old, 1);
         } else if (de_old->d_inode == NULL) {
                 /* oh, source doesn't exist */
                 GOTO(cleanup, rc = -ENOENT);
         } else {
-                LASSERT(de_old->d_inode != NULL);
+                struct lustre_id sid;
+                struct inode *inode = de_old->d_inode;
+                
+                LASSERT(inode != NULL);
                 CDEBUG(D_OTHER, "request to move local name\n");
-                opdata.fid1.mds = mds->mds_num;
-                opdata.fid1.id = de_old->d_inode->i_ino;
-                opdata.fid1.generation = de_old->d_inode->i_generation;
+                id_ino(&opdata.id1) = inode->i_ino;
+                id_group(&opdata.id1) = mds->mds_num;
+                id_gen(&opdata.id1) = inode->i_generation;
+
+                down(&inode->i_sem);
+                rc = mds_read_inode_sid(obd, inode, &sid);
+                up(&inode->i_sem);
+                if (rc) {
+                        CERROR("Can't read inode self id, "
+                               "inode %lu, rc = %d\n",
+                               inode->i_ino, rc);
+                        GOTO(cleanup, rc);
+                }
+
+                id_fid(&opdata.id1) = id_fid(&sid);
         }
 
-        opdata.fid2 = *(rec->ur_fid2);
-        rc = md_rename(mds->mds_lmv_exp, &opdata, NULL, 0, rec->ur_tgt,
-                       rec->ur_tgtlen - 1, &req2);
+        opdata.id2 = *rec->ur_id2;
+        rc = md_rename(mds->mds_lmv_exp, &opdata, NULL, 0,
+                       rec->ur_tgt, rec->ur_tgtlen - 1, &req2);
        
         if (rc)
                 GOTO(cleanup, rc);
         
-        rc = mds_del_local_dentry(rec, offset, req, NULL, de_srcdir, de_old);
+        rc = mds_del_local_dentry(rec, offset, req, de_srcdir,
+                                  de_old);
 cleanup:
-        EXIT;
-
         if (req2)
                 ptlrpc_req_finished(req2);
 
@@ -2843,7 +3100,6 @@ cleanup:
 
         req->rq_status = rc;
         RETURN(0);
-
 }
 
 static int mds_reint_rename(struct mds_update_record *rec, int offset,
@@ -2861,15 +3117,14 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
         struct llog_create_locks *lcl = NULL;
         struct lov_mds_md *lmm = NULL;
         int rc = 0, cleanup_phase = 0;
-
         void *handle = NULL;
         ENTRY;
 
         LASSERT(offset == 1);
 
-        DEBUG_REQ(D_INODE, req, "parent "LPU64"/%u %s to "LPU64"/%u %s",
-                  rec->ur_fid1->id, rec->ur_fid1->generation, rec->ur_name,
-                  rec->ur_fid2->id, rec->ur_fid2->generation, rec->ur_tgt);
+        DEBUG_REQ(D_INODE, req, "parent "DLID4" %s to "DLID4" %s",
+                  OLID4(rec->ur_id1), rec->ur_name, OLID4(rec->ur_id2),
+                  rec->ur_tgt);
 
         MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
 
@@ -2887,13 +3142,14 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
                 RETURN(rc);
         }
 
-        if (rec->ur_fid2->mds != mds->mds_num) {
+        /* check if new name should be located on remote target. */
+        if (id_group(rec->ur_id2) != mds->mds_num) {
                 rc = mds_reint_rename_to_remote(rec, offset, req);
                 RETURN(rc);
         }
         
-        rc = mds_get_parents_children_locked(obd, mds, rec->ur_fid1, &de_srcdir,
-                                             rec->ur_fid2, &de_tgtdir, LCK_PW,
+        rc = mds_get_parents_children_locked(obd, mds, rec->ur_id1, &de_srcdir,
+                                             rec->ur_id2, &de_tgtdir, LCK_PW,
                                              rec->ur_name, rec->ur_namelen,
                                              &de_old, rec->ur_tgt,
                                              rec->ur_tgtlen, &de_new,
@@ -2902,18 +3158,17 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
                 GOTO(cleanup, rc);
 
         cleanup_phase = 1; /* parent(s), children, locks */
-
         old_inode = de_old->d_inode;
         new_inode = de_new->d_inode;
 
         /* sanity check for src inode */
         if (de_old->d_flags & DCACHE_CROSS_REF) {
                 LASSERT(de_old->d_inode == NULL);
-                                                                                                                                                                                                     
+
                 /*
                  * in the case of cross-ref dir, we can perform this check only
                  * if child and parent lie on the same mds. This is because
-                 * otherwise they can have the same inodes.
+                 * otherwise they can have the same inode numbers.
                  */
                 if (de_old->d_mdsnum == mds->mds_num) {
                         if (de_old->d_inum == de_srcdir->d_inode->i_ino ||
@@ -2926,18 +3181,18 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
                     de_old->d_inode->i_ino == de_tgtdir->d_inode->i_ino)
                         GOTO(cleanup, rc = -EINVAL);
         }
-                                                                                                                                                                                                     
+
         /* sanity check for dest inode */
         if (de_new->d_flags & DCACHE_CROSS_REF) {
                 LASSERT(new_inode == NULL);
-                                                                                                                                                                                                     
+
                 /* the same check about target dentry. */
                 if (de_new->d_mdsnum == mds->mds_num) {
                         if (de_new->d_inum == de_srcdir->d_inode->i_ino ||
                             de_new->d_inum == de_tgtdir->d_inode->i_ino)
                                 GOTO(cleanup, rc = -EINVAL);
                 }
-                                                                                                                                                                                                     
+                
                 /*
                  * regular files usualy do not have ->rename() implemented. But
                  * we handle only this case when @de_new is cross-ref entry,
@@ -2951,22 +3206,35 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
                     (new_inode->i_ino == de_srcdir->d_inode->i_ino ||
                      new_inode->i_ino == de_tgtdir->d_inode->i_ino))
                         GOTO(cleanup, rc = -EINVAL);
+
         }
         
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)
+        /*
+         * check if we are moving old entry into its child. 2.6 does not check
+         * for this in vfs_rename() anymore.
+         */
+        if (is_subdir(de_new, de_old))
+                GOTO(cleanup, rc = -EINVAL);
+#endif
+        
         /* check if inodes point to each other. */
         if (!(de_old->d_flags & DCACHE_CROSS_REF) &&
             !(de_new->d_flags & DCACHE_CROSS_REF) &&
             old_inode == new_inode)
                 GOTO(cleanup, rc = 0);
 
-        /* if we are about to remove the target at first, pass the EA of
-         * that inode to client to perform and cleanup on OST */
+        /*
+         * if we are about to remove the target at first, pass the EA of that
+         * inode to client to perform and cleanup on OST.
+         */
         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
         LASSERT(body != NULL);
 
         /* child i_alloc_sem protects orphan_dec_test && is_orphan race */
         if (new_inode) 
                 DOWN_READ_I_ALLOC_SEM(new_inode);
+        
         cleanup_phase = 2; /* up(&new_inode->i_sem) when finished */
 
         if (new_inode && ((S_ISDIR(new_inode->i_mode) && 
@@ -2977,29 +3245,26 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
                         down(&mds->mds_pending_dir->d_inode->i_sem);
                         cleanup_phase = 3; /* up(&pending_dir->i_sem) */
                 } else if (S_ISREG(new_inode->i_mode)) {
-                        mds_pack_inode2fid(obd, &body->fid1, new_inode);
-                        mds_pack_inode2body(obd, body, new_inode);
-                        mds_pack_md(obd, req->rq_repmsg, 1, body, new_inode, 
-                                    MDS_PACK_MD_LOCK);
+                        mds_pack_inode2body(obd, body, new_inode, 0);
+                        mds_pack_md(obd, req->rq_repmsg, 1, body, 
+                                    new_inode, MDS_PACK_MD_LOCK);
                  }
         }
 
         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
                        de_srcdir->d_inode->i_sb);
 
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)
-        /* Check if we are moving old entry into its child. 2.6 does not
-           check for this in vfs_rename() anymore */
-        if (is_subdir(de_new, de_old))
-                GOTO(cleanup, rc = -EINVAL);
-#endif
         if (de_old->d_flags & DCACHE_CROSS_REF) {
-                rc = mds_add_local_dentry(rec, offset, req, de_old, de_tgtdir, 
-                                          de_new);
+                struct lustre_id old_id;
+
+                mds_pack_dentry2id(obd, &old_id, de_old, 1);
+                
+                rc = mds_add_local_dentry(rec, offset, req, &old_id,
+                                          de_tgtdir, de_new, 1);
                 if (rc)
                         GOTO(cleanup, rc);
-                                                                                                                                                                                                     
-                rc = mds_del_local_dentry(rec, offset, req, de_old, de_srcdir, 
+
+                rc = mds_del_local_dentry(rec, offset, req, de_srcdir,
                                           de_old);
                 GOTO(cleanup, rc);
         }
@@ -3042,8 +3307,10 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
 
         GOTO(cleanup, rc);
 cleanup:
-        rc = mds_finish_transno(mds, de_tgtdir ? de_tgtdir->d_inode : NULL,
+        EXIT;
+        rc = mds_finish_transno(mds, (de_tgtdir ? de_tgtdir->d_inode : NULL),
                                 handle, req, rc, 0);
+
         switch (cleanup_phase) {
         case 3:
                 up(&mds->mds_pending_dir->d_inode->i_sem);