Whamcloud - gitweb
- fixes about using lmv_get_export(), fixes possible memory leaks;
authoryury <yury>
Mon, 9 Oct 2006 14:00:48 +0000 (14:00 +0000)
committeryury <yury>
Mon, 9 Oct 2006 14:00:48 +0000 (14:00 +0000)
- fixed bug with split. If after split repeat intent_lock() return error we have to free req and zero it out to not confuse client.

lustre/cmm/cmm_object.c
lustre/lmv/lmv_intent.c
lustre/lmv/lmv_obd.c
lustre/mdc/mdc_locks.c
lustre/mdt/mdt_reint.c
lustre/tests/sanity-lmv.sh

index 953845e..a126289 100644 (file)
@@ -764,7 +764,6 @@ static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
  * MDS involved and to avoid the RPC inside transaction.
  * 2) only one RPC can be sent - also due to epoch negotiation.
  * For more details see rollback HLD/DLD.
- *
  */
 static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
                       const char *child_name, struct md_object *mo_c,
index 6ecfd08..408716d 100644 (file)
@@ -77,6 +77,10 @@ int lmv_intent_remote(struct obd_export *exp, void *lmm,
         if (!(body->valid & OBD_MD_MDS))
                 RETURN(0);
 
+        tgt_exp = lmv_get_export(lmv, &body->fid1);
+        if (IS_ERR(tgt_exp))
+                RETURN(PTR_ERR(tgt_exp));
+
         /*
          * oh, MDS reports that this is remote inode case i.e. we have to ask
          * for real attrs on another MDS.
@@ -107,10 +111,6 @@ int lmv_intent_remote(struct obd_export *exp, void *lmm,
 
         op_data->fid1 = body->fid1;
 
-        tgt_exp = lmv_get_export(lmv, &body->fid1);
-        if (IS_ERR(tgt_exp))
-                RETURN(PTR_ERR(tgt_exp));
-
         rc = md_intent_lock(tgt_exp, op_data, lmm, lmmsize, it, flags,
                             &req, cb_blocking, extra_lock_flags);
 
@@ -242,12 +242,19 @@ repeat:
                 rc = lmv_handle_split(exp, &rpid);
                 if (rc == 0) {
                         ptlrpc_req_finished(*reqp);
+
+                        /* 
+                         * Zero out reqp to not confuse client. In many cases it
+                         * tries to free req even if error is returned.
+                         */
+                        it->d.lustre.it_data = 0;
+                        *reqp = NULL;
+
                        /* We shoudld reallocate the FID for the object */
                         rc = lmv_alloc_fid_for_split(obd, &rpid, op_data,
                                                      &sop_data->fid2);
                         if (rc)
                                 GOTO(out_free_sop_data, rc);
-                        /* client switches to new sequence, setup fld */
                         goto repeat;
                 }
         }
@@ -306,7 +313,7 @@ repeat:
         }
 
         if (obj) {
-                /* this is split dir and we'd want to get attrs */
+                /* This is split dir and we'd want to get attrs. */
                 CDEBUG(D_OTHER, "attrs from slaves for "DFID"\n",
                        PFID(&body->fid1));
 
@@ -711,7 +718,7 @@ repeat:
                  * reason llite calls lookup, not revalidate.
                  */
                 CDEBUG(D_OTHER, "lookup for "DFID" and data should be uptodate\n",
-                      PFID(&rpid));
+                       PFID(&rpid));
                 LASSERT(*reqp == NULL);
                 GOTO(out_free_sop_data, rc);
         }
@@ -719,7 +726,7 @@ repeat:
         if (rc == 0 && *reqp == NULL) {
                 /* once again, we're asked for lookup, not revalidate */
                 CDEBUG(D_OTHER, "lookup for "DFID" and data should be uptodate\n",
-                      PFID(&rpid));
+                       PFID(&rpid));
                 GOTO(out_free_sop_data, rc);
         }
 
@@ -838,11 +845,13 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
         if (op_data == NULL)
                 RETURN(-ENOMEM);
 
-        /* we have to loop over the subobjects, check validity and update them
+        /*
+         * We have to loop over the subobjects, check validity and update them
          * from MDSs if needed. it's very useful that we need not to update all
          * the fields. say, common fields (that are equal on all the subojects
          * need not to be update, another fields (i_size, for example) are
-         * cached all the time */
+         * cached all the time.
+         */
         obj = lmv_obj_grab(obd, mid);
         LASSERT(obj != NULL);
 
@@ -900,7 +909,7 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
                 rc = md_intent_lock(tgt_exp, op_data, NULL, 0, &it, 0, &req, cb,
                                     extra_lock_flags);
 
-                lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
+                lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle;
                 if (rc > 0 && req == NULL) {
                         /* nice, this slave is valid */
                         LASSERT(req == NULL);
@@ -908,10 +917,9 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
                         goto release_lock;
                 }
 
-                if (rc < 0) {
-                        /* error during revalidation */
+                if (rc < 0)
                         GOTO(cleanup, rc);
-                }
+                
                 if (master) {
                         LASSERT(master_valid == 0);
                         /* save lock on master to be returned to the caller */
@@ -971,7 +979,7 @@ release_lock:
 
                 if (mreq == NULL) {
                         /*
-                         * very important to maintain mds num the same because
+                         * Very important to maintain mds num the same because
                          * of revalidation. mreq == NULL means that caller has
                          * no reply and the only attr we can return is size.
                          */
index 75c3084..55eaa02 100644 (file)
@@ -1315,7 +1315,7 @@ repeat:
 
                 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
                                    op_data->name, op_data->namelen);
-                op_data->fid1      = obj->lo_inodes[mds].li_fid;
+                op_data->fid1 = obj->lo_inodes[mds].li_fid;
                 lmv_obj_put(obj);
         }
 
@@ -1466,7 +1466,11 @@ lmv_enqueue_remote(struct obd_export *exp, int lock_type,
         CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID" -> "DFID"\n",
                LL_IT2STR(it), PFID(&op_data->fid1), PFID(&body->fid1));
 
-        /* we got LOOKUP lock, but we really need attrs */
+        tgt_exp = lmv_get_export(lmv, &body->fid1);
+        if (IS_ERR(tgt_exp))
+                RETURN(PTR_ERR(tgt_exp));
+
+        /* We got LOOKUP lock, but we really need attrs */
         pmode = it->d.lustre.it_lock_mode;
         LASSERT(pmode != 0);
         memcpy(&plock, lockh, sizeof(plock));
@@ -1483,10 +1487,6 @@ lmv_enqueue_remote(struct obd_export *exp, int lock_type,
         it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
         ptlrpc_req_finished(req);
 
-        tgt_exp = lmv_get_export(lmv, &rdata->fid1);
-        if (IS_ERR(tgt_exp))
-                GOTO(out_free_rdata, rc = PTR_ERR(tgt_exp));
-
         rc = md_enqueue(tgt_exp, lock_type, it, lock_mode, rdata,
                         lockh, lmm, lmmsize, cb_compl, cb_blocking,
                         cb_data, extra_lock_flags);
index b44aba5..30edcfe 100644 (file)
@@ -532,7 +532,7 @@ int mdc_enqueue(struct obd_export *exp,
                 if ((body->valid & OBD_MD_FLDIREA) != 0) {
                         if (body->eadatasize) {
                                 eadata = lustre_swab_repbuf(req, reply_off++,
-                                                body->eadatasize, NULL);
+                                                            body->eadatasize, NULL);
                                 if (eadata == NULL) {
                                         CERROR ("Missing/short eadata\n");
                                         RETURN (-EPROTO);
@@ -669,9 +669,10 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
         LASSERT(it);
 
         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
-                           ", intent: %s flags %#o\n",
-               op_data->namelen, op_data->name, PFID(&op_data->fid2), 
-               PFID(&op_data->fid1), ldlm_it2str(it->it_op), it->it_flags);
+               ", intent: %s flags %#o\n", op_data->namelen,
+               op_data->name, PFID(&op_data->fid2), 
+               PFID(&op_data->fid1), ldlm_it2str(it->it_op),
+               it->it_flags);
 
         if (fid_is_sane((struct lu_fid *)&op_data->fid2) &&
             (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
@@ -703,13 +704,13 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
                         mode = LCK_CW;
                         rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
                                              LDLM_FL_BLOCK_GRANTED, &res_id,
-                                             LDLM_IBITS, &policy,LCK_CW,&lockh);
+                                             LDLM_IBITS, &policy, LCK_CW, &lockh);
                 }
                 if (!rc) {
                         mode = LCK_PR;
                         rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
                                              LDLM_FL_BLOCK_GRANTED, &res_id,
-                                             LDLM_IBITS, &policy,LCK_PR,&lockh);
+                                             LDLM_IBITS, &policy, LCK_PR, &lockh);
                 }
                 if (rc) {
                         memcpy(&it->d.lustre.it_lock_handle, &lockh,
index ae488e5..8af3f53 100644 (file)
@@ -369,9 +369,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                 GOTO(out_unlock_parent, rc = -EINVAL);
 
         if (strlen(rr->rr_name) == 0) {
-                /* remote partial operation */
+                /* MDT holding directory name ask to remove local inode. */
                 rc = mo_ref_del(info->mti_env, mdt_object_child(mp), ma);
-                
                 mdt_handle_last_unlink(info, mp, ma);
                 GOTO(out_unlock_parent, rc);
         }
@@ -408,7 +407,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
 
         mdt_handle_last_unlink(info, mc, ma);
 
-        GOTO(out_unlock_child, rc);
+        EXIT;
 out_unlock_child:
         mdt_object_unlock(info, mc, child_lh, rc);
 out_put_child:
@@ -431,7 +430,6 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         struct mdt_lock_handle  *lhs;
         struct mdt_lock_handle  *lhp;
         int rc;
-
         ENTRY;
 
         DEBUG_REQ(D_INODE, req, "link original "DFID" to "DFID" %s",
@@ -449,11 +447,12 @@ static int mdt_reint_link(struct mdt_thread_info *info,
                 RETURN(PTR_ERR(ms));
 
         if (strlen(rr->rr_name) == 0) {
-                /* remote partial operation */
+                /* MDT holding name ask to add ref. */
                 mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
                 rc = mo_ref_add(info->mti_env, mdt_object_child(ms));
                 GOTO(out_unlock_source, rc);
         }
+        
         /*step 2: find & lock the target parent dir*/
         lhp = &info->mti_lh[MDT_LH_CHILD];
         lhp->mlh_mode = LCK_EX;
@@ -463,14 +462,13 @@ static int mdt_reint_link(struct mdt_thread_info *info,
                 GOTO(out_unlock_source, rc = PTR_ERR(mp));
 
         /* step 4: link it */
-
         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                        OBD_FAIL_MDS_REINT_LINK_WRITE);
 
         rc = mdo_link(info->mti_env, mdt_object_child(mp),
                       mdt_object_child(ms), rr->rr_name, ma);
-        GOTO(out_unlock_target, rc);
 
+        EXIT;
 out_unlock_target:
         mdt_object_unlock_put(info, mp, lhp, rc);
 out_unlock_source:
index 1a20243..0a1b351 100644 (file)
@@ -230,7 +230,7 @@ umask 077
 
 test_1a() {
        mkdir $DIR/1a0 || error 
-       createmany -o $DIR/1a0/f 4000
+       createmany -o $DIR/1a0/f 4000 || error
        rmdir $DIR/1a0 && error
        rm -rf $DIR/1a0 || error
 }
@@ -238,16 +238,16 @@ run_test 1a " remove splitted dir ============================="
 
 test_1b() {
        mkdir $DIR/1b0 || error
-       createmany -o $DIR/1b0/f 4000
+       createmany -o $DIR/1b0/f 4000 || error
        for file in $DIR/1b0/*; do rm -f $file; done
        NUM=`ls $DIR/1b0 | wc -l`
        if [ $NUM -ne 0 ] ; then
                echo "dir must be empty"
                error
        fi
-       touch $DIR/1b0/file0
-       touch $DIR/1b0/file1
-       touch $DIR/1b0/file2
+       touch $DIR/1b0/file0 || error
+       touch $DIR/1b0/file1 || error
+       touch $DIR/1b0/file2 || error
 
        echo "3 files left"
        rmdir $DIR/1b0 && error
@@ -268,16 +268,16 @@ run_test 1b " remove splitted dir ============================="
 
 test_1c() {
        mkdir $DIR/1b1 || error
-       createmany -o $DIR/1b1/f 4000
+       createmany -o $DIR/1b1/f 4000 || error
        for file in $DIR/1b1/*; do rm -f $file; done
        NUM=`ls $DIR/1b1 | wc -l`
        if [ $NUM -ne 0 ] ; then
                echo "dir must be empty"
                error
        fi
-       touch $DIR/1b1/file0
-       touch $DIR/1b1/file1
-       touch $DIR/1b1/file2
+       touch $DIR/1b1/file0 || error
+       touch $DIR/1b1/file1 || error
+       touch $DIR/1b1/file2 || error
 
        ls $DIR/1b1/
        log "3 files left"
@@ -302,7 +302,7 @@ run_test 1c " remove splitted cross-node dir ============================="
 
 test_2a() {
        mkdir $DIR/2a0 || error 
-       createmany -o $DIR/2a0/f 5000
+       createmany -o $DIR/2a0/f 5000 || error
        NUM=`ls $DIR/2a0 | wc -l`
        echo "found $NUM files"
        if [ $NUM -ne 5000 ]; then
@@ -315,7 +315,7 @@ run_test 2a " list splitted dir ============================="
 
 test_2b() {
        mkdir $DIR/2b1 || error 
-       createmany -o $DIR/2b1/f 5000
+       createmany -o $DIR/2b1/f 5000 || error
        $CLEAN
        $START
        statmany -l $DIR/2b1/f 5000 5000 || error