Whamcloud - gitweb
- extN-wantedi accepts generation as well as ino to create an inode with
authoralex <alex>
Fri, 21 May 2004 17:16:44 +0000 (17:16 +0000)
committeralex <alex>
Fri, 21 May 2004 17:16:44 +0000 (17:16 +0000)
  given ino/generation. we need this for cross-node mkdir() which creates
  inode on remote MDS and adds dir entry then. after that remote MDS may
  fail and primary MDS will have to recreate inode with already gotten
  ino and generation numbers
- export refcounting bug in lmv fixed
- mdc_obj_create() stores just received ino/generation back to request.
  if request replay will happen mdt_obj_create() will use stored numbers
  to recreate lost inode
- mds_filter_recovery_request() accepts OST_CREATE request. need to be
  discussed though
- mdt_obj_create() rewritten to support recovery phase
- needless mds_lmv_disconnect() in mds_fs_cleanup() removed
- possible bug in retrieve_generation_numbers() fixed: it should understand
  cross-node dir entries and put proper mdsnum's

NOTE: please, re-apply extN-wantedi patch

lustre/kernel_patches/patches/extN-wantedi.patch
lustre/lmv/lmv_obd.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/mds/mds_fs.c
lustre/mds/mds_internal.h
lustre/mds/mds_lmv.c

index 359dc63..cf095c8 100644 (file)
@@ -6,29 +6,35 @@
  include/linux/ext3_fs.h |    5 ++++-
  6 files changed, 90 insertions(+), 8 deletions(-)
 
---- linux-2.4.18-chaos-uml/fs/ext3/namei.c~extN-wantedi        2003-09-18 12:17:23.000000000 +0400
-+++ linux-2.4.18-chaos-uml-alexey/fs/ext3/namei.c      2003-09-18 12:17:26.000000000 +0400
-@@ -1531,6 +1531,19 @@ static int ext3_add_nondir(handle_t *han
+Index: linux-2.4.24/fs/ext3/namei.c
+===================================================================
+--- linux-2.4.24.orig/fs/ext3/namei.c  2004-05-19 13:58:43.000000000 +0400
++++ linux-2.4.24/fs/ext3/namei.c       2004-05-21 18:26:43.000000000 +0400
+@@ -1533,6 +1533,23 @@
        return err;
  }
  
 +static struct inode * ext3_new_inode_wantedi(handle_t *handle, struct inode *dir,
 +                                              int mode, struct dentry *dentry)
 +{
++      struct dentry_params *param = (struct dentry_params *) dentry->d_fsdata;
 +      unsigned long inum = 0;
++      struct inode *inode;
 +
-+      if (dentry->d_fsdata != NULL) {
-+              struct dentry_params *param =
-+                      (struct dentry_params *) dentry->d_fsdata;
++      if (param != NULL)
 +              inum = param->p_inum;
++      inode = ext3_new_inode(handle, dir, mode, inum);
++      if (inode && param && inum && param->p_generation) {
++              inode->i_generation = param->p_generation;
++              ext3_mark_inode_dirty(handle, inode);
 +      }
-+      return ext3_new_inode(handle, dir, mode, inum);
++      return inode;
 +}
 +
  /*
   * By the time this is called, we already have created
   * the directory cache entry for the new file, but it
-@@ -1554,7 +1567,7 @@ static int ext3_create (struct inode * d
+@@ -1556,7 +1573,7 @@
        if (IS_SYNC(dir))
                handle->h_sync = 1;
  
@@ -37,7 +43,7 @@
        err = PTR_ERR(inode);
        if (!IS_ERR(inode)) {
                inode->i_op = &ext3_file_inode_operations;
-@@ -1583,7 +1596,7 @@ static int ext3_mknod (struct inode * di
+@@ -1584,7 +1601,7 @@
        if (IS_SYNC(dir))
                handle->h_sync = 1;
  
@@ -46,7 +52,7 @@
        err = PTR_ERR(inode);
        if (!IS_ERR(inode)) {
                init_special_inode(inode, mode, rdev);
-@@ -1614,7 +1627,7 @@ static int ext3_mkdir(struct inode * dir
+@@ -1614,7 +1631,7 @@
        if (IS_SYNC(dir))
                handle->h_sync = 1;
  
@@ -55,7 +61,7 @@
        err = PTR_ERR(inode);
        if (IS_ERR(inode))
                goto out_stop;
-@@ -2008,7 +2021,7 @@ static int ext3_symlink (struct inode * 
+@@ -2041,7 +2058,7 @@
        if (IS_SYNC(dir))
                handle->h_sync = 1;
  
        err = PTR_ERR(inode);
        if (IS_ERR(inode))
                goto out_stop;
---- linux-2.4.18-chaos-uml/fs/ext3/ialloc.c~extN-wantedi       2003-09-18 12:17:25.000000000 +0400
-+++ linux-2.4.18-chaos-uml-alexey/fs/ext3/ialloc.c     2003-09-18 12:17:26.000000000 +0400
-@@ -330,7 +330,8 @@ int ext3_itable_block_used(struct super_
+Index: linux-2.4.24/fs/ext3/ialloc.c
+===================================================================
+--- linux-2.4.24.orig/fs/ext3/ialloc.c 2004-05-19 13:58:43.000000000 +0400
++++ linux-2.4.24/fs/ext3/ialloc.c      2004-05-19 13:58:43.000000000 +0400
+@@ -330,7 +330,8 @@
   * group to find a free inode.
   */
  struct inode * ext3_new_inode (handle_t *handle,
@@ -76,7 +84,7 @@
  {
        struct super_block * sb;
        struct buffer_head * bh;
-@@ -323,7 +324,41 @@ struct inode * ext3_new_inode (handle_t 
+@@ -355,7 +356,41 @@
        init_rwsem(&inode->u.ext3_i.truncate_sem);
  
        lock_super (sb);
  repeat:
        gdp = NULL;
        i = 0;
-@@ -438,6 +471,7 @@ repeat:
+@@ -470,6 +505,7 @@
                }
                goto repeat;
        }
        j += i * EXT3_INODES_PER_GROUP(sb) + 1;
        if (j < EXT3_FIRST_INO(sb) || j > le32_to_cpu(es->s_inodes_count)) {
                ext3_error (sb, "ext3_new_inode",
---- linux-2.4.18-18.8.0-l15/fs/ext3/inode.c~extN-wantedi       Thu Jul  3 00:15:41 2003
-+++ linux-2.4.18-18.8.0-l15-adilger/fs/ext3/inode.c    Thu Jul  3 00:17:28 2003
-@@ -2070,7 +2070,7 @@ void ext3_truncate_thread(struct inode *
+Index: linux-2.4.24/fs/ext3/inode.c
+===================================================================
+--- linux-2.4.24.orig/fs/ext3/inode.c  2004-05-19 13:58:43.000000000 +0400
++++ linux-2.4.24/fs/ext3/inode.c       2004-05-19 13:58:43.000000000 +0400
+@@ -2605,7 +2605,7 @@
        if (IS_ERR(handle))
                goto out_truncate;
  
        if (IS_ERR(new_inode)) {
                ext3_debug("truncate inode %lu directly (no new inodes)\n",
                           old_inode->i_ino);
---- linux-2.4.20/fs/ext3/ioctl.c~extN-wantedi  2003-04-08 23:35:55.000000000 -0600
-+++ linux-2.4.20-braam/fs/ext3/ioctl.c 2003-04-08 23:35:55.000000000 -0600
-@@ -23,6 +23,31 @@ int ext3_ioctl (struct inode * inode, st
+Index: linux-2.4.24/fs/ext3/ioctl.c
+===================================================================
+--- linux-2.4.24.orig/fs/ext3/ioctl.c  2004-01-10 17:04:42.000000000 +0300
++++ linux-2.4.24/fs/ext3/ioctl.c       2004-05-19 13:58:43.000000000 +0400
+@@ -23,6 +23,31 @@
        ext3_debug ("cmd = %u, arg = %lu\n", cmd, arg);
  
        switch (cmd) {
        case EXT3_IOC_GETFLAGS:
                flags = inode->u.ext3_i.i_flags & EXT3_FL_USER_VISIBLE;
                return put_user(flags, (int *) arg);
---- linux-2.4.20/include/linux/ext3_fs.h~extN-wantedi  2003-04-08 23:35:55.000000000 -0600
-+++ linux-2.4.20-braam/include/linux/ext3_fs.h 2003-04-08 23:35:55.000000000 -0600
-@@ -201,6 +201,7 @@ struct ext3_group_desc
+Index: linux-2.4.24/include/linux/ext3_fs.h
+===================================================================
+--- linux-2.4.24.orig/include/linux/ext3_fs.h  2004-05-19 13:58:43.000000000 +0400
++++ linux-2.4.24/include/linux/ext3_fs.h       2004-05-19 13:58:43.000000000 +0400
+@@ -202,6 +202,7 @@
  #define       EXT3_IOC_SETFLAGS               _IOW('f', 2, long)
  #define       EXT3_IOC_GETVERSION             _IOR('f', 3, long)
  #define       EXT3_IOC_SETVERSION             _IOW('f', 4, long)
  #define       EXT3_IOC_GETVERSION_OLD         _IOR('v', 1, long)
  #define       EXT3_IOC_SETVERSION_OLD         _IOW('v', 2, long)
  #ifdef CONFIG_JBD_DEBUG
-@@ -671,7 +672,8 @@ extern int ext3fs_dirhash(const char *na
+@@ -674,7 +675,8 @@
                          dx_hash_info *hinfo);
  
  /* ialloc.c */
  extern void ext3_free_inode (handle_t *, struct inode *);
  extern struct inode * ext3_orphan_get (struct super_block *, unsigned long);
  extern unsigned long ext3_count_free_inodes (struct super_block *);
-@@ -776,4 +778,5 @@ extern struct inode_operations ext3_fast
+@@ -766,4 +768,5 @@
  
  #endif        /* __KERNEL__ */
  
 +#define EXT3_IOC_CREATE_INUM                  _IOW('f', 5, long)
  #endif        /* _LINUX_EXT3_FS_H */
---- linux-2.4.18-chaos-uml/include/linux/dcache.h~extN-wantedi 2003-09-18 12:17:17.000000000 +0400
-+++ linux-2.4.18-chaos-uml-alexey/include/linux/dcache.h       2003-09-18 12:18:47.000000000 +0400
-@@ -62,6 +62,11 @@ static inline void intent_init(struct lo
+Index: linux-2.4.24/include/linux/dcache.h
+===================================================================
+--- linux-2.4.24.orig/include/linux/dcache.h   2004-05-19 13:58:42.000000000 +0400
++++ linux-2.4.24/include/linux/dcache.h        2004-05-21 18:20:34.000000000 +0400
+@@ -63,6 +63,12 @@
  
  #define IS_ROOT(x) ((x) == (x)->d_parent)
  
 +struct dentry_params {
 +      unsigned long   p_inum;
++      unsigned long   p_generation;
 +      void            *p_ptr;
 +};
 +
  /*
   * "quick string" -- eases parameter passing, but more importantly
   * saves "metadata" about the string (ie length and the hash).
-
-_
index 0441c14..7ab2c43 100644 (file)
@@ -80,6 +80,7 @@ static int lmv_connect_fake(struct lustre_handle *conn,
                             struct obd_uuid *cluuid)
 {
         struct lmv_obd *lmv = &obd->u.lmv;
+        struct obd_export *exp;
         int rc;
         ENTRY;
 
@@ -89,11 +90,18 @@ static int lmv_connect_fake(struct lustre_handle *conn,
                 RETURN(rc);
         }
 
-        lmv->exp = class_conn2export(conn);
-        LASSERT(lmv->exp != NULL);
+        exp = class_conn2export(conn);
+        /* We don't want to actually do the underlying connections more than
+         * once, so keep track. */
+        lmv->refcount++;
+        if (lmv->refcount > 1) {
+                class_export_put(exp);
+                RETURN(0);
+        }
 
         lmv->cluuid = *cluuid;
         lmv->connected = 0;
+        lmv->exp = exp;
 
         RETURN(0);
 }
@@ -116,14 +124,6 @@ int lmv_connect(struct obd_device *obd)
         CDEBUG(D_OTHER, "time to connect %s to %s\n",
                         cluuid->uuid, obd->obd_name);
 
-        /* We don't want to actually do the underlying connections more than
-         * once, so keep track. */
-        lmv->refcount++;
-        if (lmv->refcount > 1) {
-                class_export_put(exp);
-                RETURN(0);
-        }
-
         for (i = 0, tgts = lmv->tgts; i < lmv->count; i++, tgts++) {
                 struct obd_device *tgt_obd;
                 struct obd_uuid lmv_osc_uuid = { "LMV_OSC_UUID" };
index 08f853e..d6b156d 100644 (file)
@@ -1037,6 +1037,11 @@ int mdc_obj_create(struct obd_export *exp, struct obdo *oa,
 
         memcpy(oa, &body->oa, sizeof(*oa));
 
+        /* store ino/generation for recovery */
+        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
+        body->oa.o_id = oa->o_id;
+        body->oa.o_generation = oa->o_generation;
+
         CDEBUG(D_HA, "transno: "LPD64"\n", request->rq_repmsg->transno);
         EXIT;
 out_req:
index 4279f54..d42bbc4 100644 (file)
@@ -1143,6 +1143,7 @@ static int mds_filter_recovery_request(struct ptlrpc_request *req,
         case OBD_PING:
         case MDS_REINT:
         case LDLM_ENQUEUE:
+        case OST_CREATE:
                 *process = target_queue_recovery_request(req, obd);
                 RETURN(0);
 
@@ -1169,11 +1170,23 @@ static char *reint_names[] = {
                             OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME|\
                             OBD_MD_FLID) 
 
+static void reconstruct_create(struct ptlrpc_request *req)
+{
+        struct mds_export_data *med = &req->rq_export->exp_mds_data;
+        struct mds_client_data *mcd = med->med_mcd;
+        struct ost_body *body;
+
+        body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
+
+        /* copy rc, transno and disp; steal locks */
+        mds_req_from_mcd(req, mcd);
+        CERROR("reconstruct reply for x"LPU64"\n", req->rq_xid);
+}
+
 static int mdt_obj_create(struct ptlrpc_request *req)
 {
+        struct obd_device *obd = req->rq_export->exp_obd;
         struct ldlm_res_id res_id = { .name = {0} };
-        struct obd_export *exp = req->rq_export;
-        struct obd_device *obd = exp->exp_obd;
         struct mds_obd *mds = &obd->u.mds;
         struct ost_body *body, *repbody;
         int rc, size = sizeof(*repbody);
@@ -1182,6 +1195,7 @@ static int mdt_obj_create(struct ptlrpc_request *req)
         struct lustre_handle lockh;
         struct obd_run_ctxt saved;
         ldlm_policy_data_t policy;
+        struct dentry_params dp;
         int mealen, flags = 0;
         unsigned int tmpname;
         struct obd_ucred uc;
@@ -1190,12 +1204,17 @@ static int mdt_obj_create(struct ptlrpc_request *req)
         void *handle;
         ENTRY;
        
+        DEBUG_REQ(D_HA, req, "create remote object");
+
         parent_inode = mds->mds_objects_dir->d_inode;
 
-        body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
+        body = lustre_swab_reqbuf(req, 0, sizeof(*body),
+                                  lustre_swab_ost_body);
         if (body == NULL)
                 RETURN(-EFAULT);
 
+        MDS_CHECK_RESENT(req, reconstruct_create(req));
+
         uc.ouc_fsuid = body->oa.o_uid;
         uc.ouc_fsgid = body->oa.o_gid;
 
@@ -1207,74 +1226,97 @@ static int mdt_obj_create(struct ptlrpc_request *req)
 
         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
 
-repeat:
+
+        down(&parent_inode->i_sem);
         handle = fsfilt_start(obd, parent_inode, FSFILT_OP_MKDIR, NULL);
         LASSERT(!IS_ERR(handle));
 
+repeat:
         tmpname = ll_insecure_random_int();
-        sprintf(fidname, "%u", tmpname);
-        new = simple_mkdir(mds->mds_objects_dir, fidname,
-                        body->oa.o_mode, 1);
+        rc = sprintf(fidname, "%u", tmpname);
+        new = lookup_one_len(fidname, mds->mds_objects_dir, rc);
         if (IS_ERR(new)) {
-                CERROR("%s: can't create new inode %s) for mkdir: %d\n",
+                CERROR("%s: can't lookup new inode (%s) for mkdir: %d\n",
                        obd->obd_name, fidname, (int) PTR_ERR(new));
-                if (PTR_ERR(new) == -EEXIST) {
-                        fsfilt_commit(obd, parent_inode, handle, 0);
-                        goto repeat;
+                fsfilt_commit(obd, new->d_inode, handle, 0);
+                up(&parent_inode->i_sem);
+                RETURN(PTR_ERR(new));
+        } else if (new->d_inode) {
+                CERROR("%s: name exists. repeat\n", obd->obd_name);
+                goto repeat;
+        }
+
+        new->d_fsdata = (void *) &dp;
+        dp.p_inum = 0;
+        dp.p_ptr = req;
+
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+                DEBUG_REQ(D_HA, req, "replay create obj %lu/%lu",
+                          (unsigned long) body->oa.o_id,
+                          (unsigned long) body->oa.o_generation);
+                dp.p_inum = body->oa.o_id;
+                dp.p_generation = body->oa.o_generation;
+        }
+        rc = vfs_mkdir(parent_inode, new, body->oa.o_mode);
+        if (rc == 0) {
+                obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS);
+                repbody->oa.o_id = new->d_inode->i_ino;
+                repbody->oa.o_generation = new->d_inode->i_generation;
+                repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
+
+                rc = fsfilt_del_dir_entry(obd, new);
+                up(&parent_inode->i_sem);
+
+                if (rc) {
+                        CERROR("can't remove name for object: %d\n", rc);
+                        GOTO(cleanup, rc);
                 }
+                        
+                /* this lock should be taken to serialize MDS modifications
+                 * in failure case */
+                res_id.name[0] = new->d_inode->i_ino;
+                res_id.name[1] = new->d_inode->i_generation;
+                policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+                rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
+                                res_id, LDLM_IBITS, &policy,
+                                LCK_EX, &flags, mds_blocking_ast,
+                                ldlm_completion_ast, NULL, NULL,
+                                NULL, 0, NULL, &lockh);
+                if (rc != ELDLM_OK)
+                        GOTO(cleanup, rc);
+
+                CDEBUG(D_OTHER, "created dirobj: %lu/%lu mode %o\n",
+                                (unsigned long) new->d_inode->i_ino,
+                                (unsigned long) new->d_inode->i_generation,
+                                (unsigned) new->d_inode->i_mode);
+        } else {
+                up(&parent_inode->i_sem);
+                CERROR("%s: can't create dirobj: %d\n", obd->obd_name, rc);
         }
-        LASSERT(!IS_ERR(new));
-        LASSERT(new->d_inode != NULL);
 
-        if (body->oa.o_valid & OBD_MD_FLID) {
+        if (rc == 0 && body->oa.o_valid & OBD_MD_FLID) {
                 /* this is new object for splitted dir. we have to
                  * prevent recursive splitting on it -bzzz */
                 mealen = obd_size_diskmd(mds->mds_lmv_exp, NULL);
                 OBD_ALLOC(mea, mealen);
-                LASSERT(mea != NULL);
+                if (mea == NULL)
+                        GOTO(cleanup, rc = -ENOMEM);
                 mea->mea_count = 0;
                 down(&new->d_inode->i_sem);
-                handle = fsfilt_start(obd, new->d_inode, FSFILT_OP_SETATTR, NULL);
-                LASSERT(!IS_ERR(handle));
                 rc = fsfilt_set_md(obd, new->d_inode, handle, mea, mealen);
-                LASSERT(rc == 0);
-                fsfilt_commit(obd, new->d_inode, handle, 0);
-                LASSERT(rc == 0);
                 up(&new->d_inode->i_sem);
                 OBD_FREE(mea, mealen);
         }
-        obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS);
-        repbody->oa.o_id = new->d_inode->i_ino;
-        repbody->oa.o_generation = new->d_inode->i_generation;
-        repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
-
-        down(&parent_inode->i_sem);
-        rc = fsfilt_del_dir_entry(obd, new);
-        up(&parent_inode->i_sem);
-        LASSERT(rc == 0);
 
+cleanup:
         rc = mds_finish_transno(mds, parent_inode, handle, req, rc, 0);
-        LASSERT(rc == 0);
-
-        res_id.name[0] = new->d_inode->i_ino;
-        res_id.name[1] = new->d_inode->i_generation;
-        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
-        rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
-                        res_id, LDLM_IBITS, &policy,
-                        LCK_EX, &flags, mds_blocking_ast,
-                        ldlm_completion_ast, NULL, NULL,
-                        NULL, 0, NULL, &lockh);
-        LASSERT(rc == ELDLM_OK);
-
-        CDEBUG(D_OTHER, "created dirobj: %lu/%lu mode %o\n",
-                        (unsigned long) new->d_inode->i_ino,
-                        (unsigned long) new->d_inode->i_generation,
-                        (unsigned) new->d_inode->i_mode);
-
+        if (rc == 0)
+                ptlrpc_save_lock(req, &lockh, LCK_EX);
+        else
+                ldlm_lock_decref(&lockh, LCK_EX);
         l_dput(new);
         pop_ctxt(&saved, &obd->obd_ctxt, &uc);
-        ptlrpc_save_lock(req, &lockh, LCK_EX);
-        RETURN(0);
+        RETURN(rc);
 }
 
 static int mds_get_info(struct obd_export *exp, __u32 keylen,
index 1d1594f..5fdb5bd 100644 (file)
@@ -513,11 +513,9 @@ int mds_fs_cleanup(struct obd_device *obd, int flags)
         struct obd_run_ctxt saved;
         int rc = 0;
 
-        if (flags & OBD_OPT_FAILOVER) {
+        if (flags & OBD_OPT_FAILOVER)
                 CERROR("%s: shutting down for failover; client state will"
                        " be preserved.\n", obd->obd_name);
-                mds_lmv_disconnect(obd, flags);
-        }
 
         class_disconnect_exports(obd, flags); /* cleans up client info too */
         mds_server_free_data(mds);
index c07a5c6..e11bbc7 100644 (file)
@@ -119,5 +119,6 @@ int mds_try_to_split_dir(struct obd_device *, struct dentry *, struct mea **,
                          int);
 int mds_get_lmv_attr(struct obd_device *, struct inode *, struct mea **, int *);
 int mds_choose_mdsnum(struct obd_device *, const char *, int);
+int mds_lmv_postsetup(struct obd_device *);
 
 #endif /* _MDS_INTERNAL_H */
index d61a0f3..9ea4849 100644 (file)
@@ -226,6 +226,7 @@ static int dc_new_page_to_cache(struct dir_cache * dirc)
 
 static int retrieve_generation_numbers(struct dirsplit_control *dc, void *buf)
 {
+        struct mds_obd *mds = &dc->obd->u.mds;
         struct dir_entry *de;
         struct dentry *dentry;
         char * end;
@@ -233,20 +234,29 @@ static int retrieve_generation_numbers(struct dirsplit_control *dc, void *buf)
         end = buf + PAGE_SIZE;
         de = (struct dir_entry *) buf;
         while ((char *) de < end && de->namelen) {
-                LASSERT(de->namelen <= 255);
                 /* lookup an inode */
+                LASSERT(de->namelen <= 255);
                 dentry = ll_lookup_one_len(de->name, dc->dentry, de->namelen);
                 if (IS_ERR(dentry)) {
-                        CERROR("can't lookup '%*s'/%u in %lu: %d\n",
-                                (int) de->namelen, de->name,
-                                (unsigned) de->namelen,
-                                (unsigned long) dc->dentry->d_inode->i_ino,
-                                (int) PTR_ERR(dentry));
+                        CERROR("can't lookup %*s: %d\n", de->namelen,
+                               de->name, (int) PTR_ERR(dentry));
+                        goto next;
+                }
+                if (dentry->d_inode != NULL) {
+                        de->mds = mds->mds_num;
+                        de->ino = dentry->d_inode->i_ino;
+                        de->generation = dentry->d_inode->i_generation;
+                } else if (dentry->d_flags & DCACHE_CROSS_REF) {
+                        de->mds = dentry->d_mdsnum;
+                        de->ino = dentry->d_inum;
+                        de->generation = dentry->d_generation;
+                } else {
+                        CERROR("can't lookup %*s\n", de->namelen, de->name);
+                        goto next;
                 }
-                LASSERT(!IS_ERR(dentry));
-                LASSERT(dentry->d_inode != NULL);
-                de->generation = dentry->d_inode->i_generation;
                 l_dput(dentry);
+
+next:
                 de = (struct dir_entry *)
                         ((char *) de + DIR_REC_LEN(de->namelen));
         }
@@ -428,8 +438,9 @@ int mds_try_to_split_dir(struct obd_device *obd,
         if (mea && *mea)
                 RETURN(0);
 
-        CDEBUG(D_OTHER, "%s: split directory %lu/%lu\n",
-               obd->obd_name, dir->i_ino, (unsigned long) dir->i_generation);
+        CDEBUG(D_OTHER, "%s: split directory %u/%lu/%lu\n",
+               obd->obd_name, mds->mds_num, dir->i_ino,
+               (unsigned long) dir->i_generation);
 
         if (mea == NULL)
                 mea = &tmea;