Whamcloud - gitweb
- add flag no_lov_create in create_spec
authortappro <tappro>
Wed, 13 Sep 2006 16:39:36 +0000 (16:39 +0000)
committertappro <tappro>
Wed, 13 Sep 2006 16:39:36 +0000 (16:39 +0000)
- update mdd lov related code to handle no_lov_create and don't try to create
  objects during replay open|create
- call ldo_recovery_complete() at the end of mdt_init0()
- set mdd obd to recovering state initially and reset recovering flag from
  mdd_recovery_complete()

lustre/include/lustre_mds.h
lustre/include/md_object.h
lustre/mdd/mdd_handler.c
lustre/mdd/mdd_lov.c
lustre/mds/handler.c
lustre/mds/mds_lov.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_lib.c
lustre/mdt/mdt_recovery.c

index 8eee98d..cfdcd10 100644 (file)
@@ -87,6 +87,9 @@ int mds_log_op_setattr(struct obd_device *obd, struct inode *inode,
 
 int mds_lov_write_objids(struct obd_device *obd);
 void mds_lov_update_objids(struct obd_device *obd, obd_id *ids);
+void mds_objids_from_lmm(obd_id *, struct lov_mds_md *, struct lov_desc *);
+int mds_postrecov(struct obd_device *);
+
 /* ioctls for trying requests */
 #define IOC_REQUEST_TYPE                   'f'
 #define IOC_REQUEST_MIN_NR                 30
index c14434b..74de51e 100644 (file)
@@ -77,6 +77,8 @@ struct md_create_spec {
                 const struct lu_fid      *sp_pfid;
                 /* eadata for regular files */
                 struct md_spec_reg {
+                        /* lov objs exist already */
+                        int no_lov_create;
                         const void *eadata;
                         int  eadatalen;
                 } sp_ea;
index 01099fd..50a15d5 100644 (file)
@@ -595,8 +595,15 @@ static int mdd_recovery_complete(const struct lu_context *ctxt,
                    obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
                    OBD_NOTIFY_SYNC, NULL);
 */
+        LASSERT(mdd);
+        LASSERT(mdd->mdd_obd_dev);
+
+        mdd->mdd_obd_dev->obd_recovering = 0;
+        //mdd->mdd_obd_dev->obd_type->typ_dt_ops->
+        mds_postrecov(mdd->mdd_obd_dev);
         /* TODO: orphans handling */
         rc = next->ld_ops->ldo_recovery_complete(ctxt, next);
+        
         RETURN(rc);
 }
 
@@ -1808,7 +1815,14 @@ static int mdd_create(const struct lu_context *ctxt, struct md_object *pobj,
                 GOTO(cleanup, rc);
 
         inserted = 1;
-        rc = mdd_lov_set_md(ctxt, mdd_pobj, son, lmm, lmm_size, handle, 0);
+        /* replay creates has objects already */
+        if (spec->u.sp_ea.no_lov_create)
+                rc = mdd_lov_set_md(ctxt, mdd_pobj, son,
+                                    (struct lov_mds_md *)spec->u.sp_ea.eadata,
+                                    spec->u.sp_ea.eadatalen, handle, 0);
+        else
+                rc = mdd_lov_set_md(ctxt, mdd_pobj, son, lmm,
+                                    lmm_size, handle, 0);
         if (rc) {
                 CERROR("error on stripe info copy %d \n", rc);
                 GOTO(cleanup, rc);
index ac7099e..b80fb7b 100644 (file)
@@ -124,6 +124,7 @@ int mdd_init_obd(const struct lu_context *ctxt, struct mdd_device *mdd,
                 LBUG();
         }
         obd->u.mds.mds_id = index;
+        obd->obd_recovering = 1;
         rc = class_setup(obd, lcfg);
         if (rc)
                 GOTO(class_detach, rc);
@@ -133,7 +134,6 @@ int mdd_init_obd(const struct lu_context *ctxt, struct mdd_device *mdd,
          */
         obd->obd_upcall.onu_owner = mdd;
         obd->obd_upcall.onu_upcall = mdd_lov_update;
-
         mdd->mdd_obd_dev = obd;
 class_detach:
         if (rc)
@@ -340,6 +340,15 @@ static void mdd_lov_objid_update(const struct lu_context *ctxt,
         mds_lov_update_objids(mdd->mdd_obd_dev, info->mti_oti.oti_objid);
 }
 
+static void mdd_lov_objid_from_lmm(const struct lu_context *ctx,
+                                   struct mdd_device *mdd, 
+                                   struct lov_mds_md *lmm)
+{
+        struct mds_obd *mds = &mdd->mdd_obd_dev->u.mds;
+        struct mdd_thread_info *info = mdd_ctx_info(ctx);
+        mds_objids_from_lmm(info->mti_oti.oti_objid, lmm, &mds->mds_lov_desc);
+}
+
 static void mdd_lov_objid_free(const struct lu_context *ctxt,
                                struct mdd_device *mdd)
 {
@@ -382,12 +391,24 @@ int mdd_lov_create(const struct lu_context *ctxt, struct mdd_device *mdd,
                         !(create_flags & FMODE_WRITE))
                 RETURN(0);
 
-        OBD_FAIL_RETURN((OBD_FAIL_MDS_ALLOC_OBDO), -ENOMEM);
+        oti_init(oti, NULL);
+        rc = mdd_lov_objid_alloc(ctxt, mdd);
+        if (rc != 0)
+                RETURN(rc);
+
+        /* replay case, should get lov from eadata */
+        if (spec->u.sp_ea.no_lov_create != 0) {
+                mdd_lov_objid_from_lmm(ctxt, mdd, (struct lov_mds_md *)eadata);
+                RETURN(0);
+        }
+        
+        if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_ALLOC_OBDO))
+                        GOTO(out_ids, rc = -ENOMEM);
 
         LASSERT(lov_exp != NULL);
         oa = obdo_alloc();
         if (oa == NULL)
-                RETURN(-ENOMEM);
+                GOTO(out_ids, rc = -ENOMEM);
 
         oa->o_uid = 0; /* must have 0 uid / gid on OST */
         oa->o_gid = 0;
@@ -398,11 +419,6 @@ int mdd_lov_create(const struct lu_context *ctxt, struct mdd_device *mdd,
                 OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLGROUP;
         oa->o_size = 0;
         
-        oti_init(oti, NULL);
-        rc = mdd_lov_objid_alloc(ctxt, mdd);
-        if (rc != 0)
-                GOTO(out_oa, rc);
-
         if (!(create_flags & MDS_OPEN_HAS_OBJS)) {
                 if (create_flags & MDS_OPEN_HAS_EA) {
                         LASSERT(eadata != NULL);
@@ -503,6 +519,7 @@ int mdd_lov_create(const struct lu_context *ctxt, struct mdd_device *mdd,
 out_oa:
         oti_free_cookies(oti);
         obdo_free(oa);
+out_ids:
         if (lsm)
                 obd_free_memmd(lov_exp, &lsm);
         if (rc != 0) 
index 994e3ad..0107b8a 100644 (file)
@@ -2157,6 +2157,7 @@ int mds_postrecov(struct obd_device *obd)
 out:
         RETURN(rc);
 }
+EXPORT_SYMBOL(mds_postrecov);
 
 /* We need to be able to stop an mds_lov_synchronize */
 static int mds_lov_early_clean(struct obd_device *obd)
@@ -2901,7 +2902,7 @@ static struct obd_ops mds_cmd_obd_ops = {
         .o_llog_init       = mds_llog_init,
         .o_llog_finish     = mds_llog_finish,
         .o_notify          = mds_notify,
-     //   .o_health_check    = mds_cmd_health_check,
+        //   .o_health_check    = mds_cmd_health_check,
 };
 
 static int __init mds_cmd_init(void)
index cb5330c..cc92186 100644 (file)
@@ -389,8 +389,10 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
          * set_nextid().  The class driver can help us here, because
          * it can use the obd_recovering flag to determine when the
          * the OBD is full available. */
+        /* MDD device will care about that
         if (!obd->obd_recovering)
                 rc = mds_postrecov(obd);
+         */
         RETURN(rc);
 
 err_reg:
@@ -874,4 +876,4 @@ void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
                         le64_to_cpu(lmm->lmm_objects[i].l_object_id);
         }
 }
-
+EXPORT_SYMBOL(mds_objids_from_lmm);
index 265a610..f5bd241 100644 (file)
@@ -2686,6 +2686,8 @@ static void mdt_fini(const struct lu_context *ctx, struct mdt_device *m)
         EXIT;
 }
 
+int mdt_postrecov(const struct lu_context *, struct mdt_device *);
+
 static int mdt_init0(const struct lu_context *ctx, struct mdt_device *m,
                      struct lu_device_type *ldt, struct lustre_cfg *cfg)
 {
@@ -2768,6 +2770,8 @@ static int mdt_init0(const struct lu_context *ctx, struct mdt_device *m,
         rc = mdt_fs_setup(ctx, m);
         if (rc)
                 GOTO(err_stop_service, rc);
+        if(obd->obd_recovering == 0)
+                mdt_postrecov(ctx, m);
         RETURN(0);
 
 err_stop_service:
@@ -3161,22 +3165,28 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         RETURN(rc);
 }
 
-int mdt_postrecov(struct obd_device *obd)
+int mdt_postrecov(const struct lu_context *ctx, struct mdt_device *mdt)
 {
-        struct lu_context ctxt;
-        struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
         struct lu_device *ld = md2lu_dev(mdt->mdt_child);
         int rc;
         ENTRY;
+        rc = ld->ld_ops->ldo_recovery_complete(ctx, ld);
+        RETURN(rc);
+}
+
+int mdt_obd_postrecov(struct obd_device *obd)
+{
+        struct lu_context ctxt;
+        int rc;
 
         rc = lu_context_init(&ctxt, LCT_MD_THREAD);
         if (rc)
                 RETURN(rc);
         lu_context_enter(&ctxt);
-        rc = ld->ld_ops->ldo_recovery_complete(&ctxt, ld);
+        rc = mdt_postrecov(&ctxt, mdt_dev(obd->obd_lu_dev));
         lu_context_exit(&ctxt);
         lu_context_fini(&ctxt);
-        RETURN(rc);
+        return rc;
 }
 
 static struct obd_ops mdt_obd_device_ops = {
@@ -3187,7 +3197,7 @@ static struct obd_ops mdt_obd_device_ops = {
         .o_init_export    = mdt_init_export,
         .o_destroy_export = mdt_destroy_export,
         .o_iocontrol      = mdt_iocontrol,
-        .o_postrecov      = mdt_postrecov
+        .o_postrecov      = mdt_obd_postrecov
 
 };
 
index 630e255..5bd3d56 100644 (file)
@@ -366,11 +366,14 @@ static int mdt_open_unpack(struct mdt_thread_info *info)
 
         if (req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)) {
                 struct md_create_spec *sp = &info->mti_spec;
+                struct ptlrpc_request *req = mdt_info_req(info);
                 sp->u.sp_ea.eadata = req_capsule_client_get(pill,
                                                             &RMF_EADATA);
                 sp->u.sp_ea.eadatalen = req_capsule_get_size(pill,
                                                              &RMF_EADATA,
                                                              RCL_CLIENT);
+                if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
+                        sp->u.sp_ea.no_lov_create = 1;
         }
 
         RETURN(result);
index e9c6868..f0d08ff 100644 (file)
@@ -706,13 +706,10 @@ int mdt_client_del(const struct lu_context *ctx,
         CDEBUG(rc == 0 ? D_INFO : D_ERROR,
                         "zeroing out client idx %u in %s rc %d\n",
                         med->med_lr_idx, LAST_RCVD, rc);
-        
-        if (!test_and_clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) {
-                CERROR("MDS client %u: bit already clear in bitmap!!\n",
-                       med->med_lr_idx);
-                LBUG();
-        }
-
+       
+        spin_lock(&mdt->mdt_client_bitmap_lock);
+        clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap);
+        spin_unlock(&mdt->mdt_client_bitmap_lock);
         /* Make sure the server's last_transno is up to date. Do this
          * after the client is freed so we know all the client's
          * transactions have been committed. */
@@ -836,7 +833,6 @@ static int mdt_txn_stop_cb(const struct lu_context *ctx,
                 if (mti->mti_transno > mdt->mdt_last_transno)
                         mdt->mdt_last_transno = mti->mti_transno;
         }
-        spin_unlock(&mdt->mdt_transno_lock);
 
         /* sometimes the reply message has not been successfully packed */
         LASSERT(req != NULL && req->rq_repmsg != NULL);
@@ -845,7 +841,6 @@ static int mdt_txn_stop_cb(const struct lu_context *ctx,
         CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
                mti->mti_transno, req->rq_export->exp_obd->obd_last_committed);
 
-        spin_lock(&mdt->mdt_transno_lock);
         req->rq_transno = mti->mti_transno;
         lustre_msg_set_transno(req->rq_repmsg, mti->mti_transno);
         target_committed_to_req(req);