Whamcloud - gitweb
Alex's fix for cross-MDS mkdir recovery (b=3869)
authoradilger <adilger>
Mon, 9 Aug 2004 04:02:18 +0000 (04:02 +0000)
committeradilger <adilger>
Mon, 9 Aug 2004 04:02:18 +0000 (04:02 +0000)
Don't hold journal transaction open across OST_CREATE (b=3313)

lustre/lvfs/fsfilt_ext3.c
lustre/mds/handler.c
lustre/mds/mds_fs.c
lustre/mds/mds_open.c
lustre/mds/mds_reint.c

index be023a8..62850b9 100644 (file)
@@ -1116,6 +1116,8 @@ static int fsfilt_ext3_add_dir_entry(struct obd_device *obd,
 #ifdef EXT3_FEATURE_INCOMPAT_MDSNUM
         struct dentry *dentry;
         int err;
+        LASSERT(ino != 0);
+        LASSERT(namelen != 0);
         dentry = ll_lookup_one_len(name, parent, namelen);
         if (IS_ERR(dentry)) {
                 CERROR("can't lookup %*s in %lu/%lu: %d\n", dentry->d_name.len,
index cd5121b..27bb918 100644 (file)
@@ -1330,14 +1330,47 @@ static char *reint_names[] = {
 static void reconstruct_create(struct ptlrpc_request *req)
 {
         struct mds_export_data *med = &req->rq_export->exp_mds_data;
+        struct mds_obd *mds = &req->rq_export->exp_obd->u.mds;
         struct mds_client_data *mcd = med->med_mcd;
+        struct dentry *dentry;
         struct ost_body *body;
-
-        body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
+        struct ll_fid fid;
+        ENTRY;
 
         /* copy rc, transno and disp; steal locks */
         mds_req_from_mcd(req, mcd);
-        CERROR("reconstruct reply for x"LPU64"\n", req->rq_xid);
+        if (req->rq_status) {
+                EXIT;
+                return;
+        }
+
+        fid.mds = 0;
+        fid.id = mcd->mcd_last_data;
+        fid.generation = 0;
+
+        LASSERT(fid.id != 0);
+
+        dentry = mds_fid2dentry(mds, &fid, NULL);
+        if (IS_ERR(dentry)) {
+                CERROR("can't find inode "LPU64"\n", fid.id);
+                req->rq_status = PTR_ERR(dentry);
+                EXIT;
+                return;
+        }
+
+        CWARN("reconstruct reply for x"LPU64" (remote ino) "LPU64" -> %lu/%u\n",
+              req->rq_xid, fid.id, dentry->d_inode->i_ino,
+              dentry->d_inode->i_generation);
+
+        body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
+        obdo_from_inode(&body->oa, dentry->d_inode, FILTER_VALID_FLAGS);
+        body->oa.o_id = dentry->d_inode->i_ino;
+        body->oa.o_generation = dentry->d_inode->i_generation;
+        body->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
+                
+        l_dput(dentry);
+        EXIT;
+        return;
 }
 
 static int mdt_obj_create(struct ptlrpc_request *req)
@@ -1354,6 +1387,7 @@ static int mdt_obj_create(struct ptlrpc_request *req)
         struct lvfs_ucred uc;
         struct mea *mea;
         void *handle = NULL;
+        unsigned long cr_inum = 0;
         ENTRY;
        
         DEBUG_REQ(D_HA, req, "create remote object");
@@ -1396,6 +1430,7 @@ static int mdt_obj_create(struct ptlrpc_request *req)
                         repbody->oa.o_generation = new->d_inode->i_generation;
                         repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
                         cleanup_phase = 1;
+                        cr_inum = new->d_inode->i_ino;
                         GOTO(cleanup, rc = 0);
                 }
                 CWARN("hmm. for some reason dir %lu/%lu (or reply) got lost\n",
@@ -1469,10 +1504,10 @@ repeat:
                         
                 cleanup_phase = 2; /* created directory object */
 
-                CDEBUG(D_OTHER, "created dirobj: %lu/%lu mode %o\n",
-                                (unsigned long) new->d_inode->i_ino,
-                                (unsigned long) new->d_inode->i_generation,
-                                (unsigned) new->d_inode->i_mode);
+                cr_inum = new->d_inode->i_ino;
+                CDEBUG(D_OTHER, "created dirobj: %lu/%u mode %o\n",
+                       new->d_inode->i_ino, new->d_inode->i_generation,
+                       new->d_inode->i_mode);
         } else {
                 up(&parent_inode->i_sem);
                 CERROR("%s: can't create dirobj: %d\n", obd->obd_name, rc);
@@ -1509,7 +1544,7 @@ cleanup:
                 if (rc == 0)
                         ptlrpc_require_repack(req);
         case 1: /* transaction */
-                rc = mds_finish_transno(mds, parent_inode, handle, req, rc, 0);
+                rc = mds_finish_transno(mds, parent_inode, handle, req, rc, cr_inum);
         }
 
         l_dput(new);
index 6c8b722..1d79843 100644 (file)
@@ -642,6 +642,8 @@ int mds_obd_create(struct obd_export *exp, struct obdo *oa,
                         struct dentry_params dp;
                         struct inode *inode;
 
+                        CWARN("creating log with ID "LPU64"\n", oa->o_id);
+                        
                         dchild->d_fsdata = (void *) &dp;
                         dp.p_ptr = NULL;
                         dp.p_inum = oa->o_id;
@@ -683,6 +685,7 @@ int mds_obd_create(struct obd_export *exp, struct obdo *oa,
         oa->o_id = filp->f_dentry->d_inode->i_ino;
         oa->o_generation = filp->f_dentry->d_inode->i_generation;
         namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation);
+        CWARN("created log anonymous "LPU64"/%u\n", oa->o_id, oa->o_generation);
 
         dchild = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
 
index 9074f3b..2d96fe9 100644 (file)
@@ -333,14 +333,6 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset,
                 RETURN(-ENOMEM);
         oti.oti_objid = *ids;
 
-        if (*handle == NULL)
-                *handle = fsfilt_start(obd, inode, FSFILT_OP_CREATE, NULL);
-        if (IS_ERR(*handle)) {
-                rc = PTR_ERR(*handle);
-                *handle = NULL;
-                GOTO(out_ids, rc);
-        }
-
         /* replay case */
         if(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
                 LASSERT (rec->ur_fid2->id);
@@ -349,6 +341,14 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset,
                 lmm = rec->ur_eadata;
                 LASSERT(lmm);
 
+                if (*handle == NULL)
+                        *handle = fsfilt_start(obd,inode,FSFILT_OP_CREATE,NULL);
+                if (IS_ERR(*handle)) {
+                        rc = PTR_ERR(*handle);
+                        *handle = NULL;
+                        GOTO(out_ids, rc);
+                }
+
                 mds_objids_from_lmm(*ids, lmm, &mds->mds_lov_desc);
 
                 lmm_buf = lustre_msg_buf(req->rq_repmsg, offset, 0);
@@ -446,6 +446,15 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset,
         LASSERT(rc >= 0);
         lmm_size = rc;
         body->eadatasize = rc;
+
+        if (*handle == NULL)
+                *handle = fsfilt_start(obd, inode, FSFILT_OP_CREATE, NULL);
+        if (IS_ERR(*handle)) {
+                rc = PTR_ERR(*handle);
+                *handle = NULL;
+                GOTO(out_ids, rc);
+        }
+
         rc = fsfilt_set_md(obd, inode, *handle, lmm, lmm_size);
         lmm_buf = lustre_msg_buf(req->rq_repmsg, offset, 0);
         lmm_bufsize = req->rq_repmsg->buflens[offset];
@@ -1055,6 +1064,12 @@ got_child:
                 else
                         MDS_UPDATE_COUNTER(mds, MDS_CREATE_COUNT);
 
+                if (!(rec->ur_flags & O_EXCL)) { /* bug 3313 */
+                        rc = fsfilt_commit(obd, dchild->d_inode->i_sb,
+                                           dchild->d_inode, handle, 0);
+                        handle = NULL;
+                }
+
                 acc_mode = 0;           /* Don't check for permissions */
         }
 
index 457f327..a9e1f40 100644 (file)
@@ -552,7 +552,7 @@ static void reconstruct_reint_create(struct mds_update_record *rec, int offset,
         child = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
         LASSERT(!IS_ERR(child));
         if ((child->d_flags & DCACHE_CROSS_REF)) {
-                LASSERTF(child->d_inode == NULL, "BUG 3869");
+                LASSERTF(child->d_inode == NULL, "BUG 3869\n");
                 body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
                 mds_pack_dentry2fid(&body->fid1, child);
                 mds_pack_dentry2body(body, child);
@@ -561,7 +561,7 @@ static void reconstruct_reint_create(struct mds_update_record *rec, int offset,
                 DEBUG_REQ(D_ERROR, req, "parent "LPU64"/%u name %s mode %o",
                           rec->ur_fid1->id, rec->ur_fid1->generation,
                           rec->ur_name, rec->ur_mode);
-                LASSERTF(child->d_inode != NULL, "BUG 3869");
+                LASSERTF(child->d_inode != NULL, "BUG 3869\n");
         } else {
                 body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
                 mds_pack_inode2fid(req2obd(req), &body->fid1, child->d_inode);
@@ -752,7 +752,11 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
 
                         rc = obd_create(mds->mds_lmv_exp, oa, NULL, NULL);
                         if (rc) {
-                               obdo_free(oa);
+                                CERROR("can't create remote inode: %d\n", rc);
+                                DEBUG_REQ(D_ERROR, req, "parent "LPU64"/%u name %s mode %o",
+                                          rec->ur_fid1->id, rec->ur_fid1->generation,
+                                          rec->ur_name, rec->ur_mode);
+                                obdo_free(oa);
                                 GOTO(cleanup, rc);
                         }