Whamcloud - gitweb
b=21485 allocate lcd inside obd_init_export()
authorMikhail Pershin <tappro@sun.com>
Thu, 20 May 2010 15:03:05 +0000 (19:03 +0400)
committerRobert Read <robert.read@oracle.com>
Mon, 24 May 2010 16:12:40 +0000 (09:12 -0700)
i=rread
i=zam

lustre/include/lu_target.h
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_recovery.c
lustre/obdfilter/filter.c
lustre/ptlrpc/target.c

index 09c98a6..b13a05f 100644 (file)
@@ -74,5 +74,6 @@ void lut_cb_client(struct lu_target *, __u64, void *, int);
 int lut_init(const struct lu_env *, struct lu_target *,
              struct obd_device *, struct dt_device *);
 void lut_fini(const struct lu_env *, struct lu_target *);
+int lut_client_alloc(struct obd_export *);
 void lut_client_free(struct obd_export *);
 #endif /* __LUSTRE_LU_TARGET_H */
index 0b6b12f..a064ca4 100644 (file)
@@ -4979,7 +4979,6 @@ static int mdt_obd_connect(const struct lu_env *env,
                            void *localdata)
 {
         struct mdt_thread_info *info;
-        struct lsd_client_data *lcd;
         struct obd_export      *lexp;
         struct lustre_handle    conn = { 0 };
         struct mdt_device      *mdt;
@@ -5008,28 +5007,25 @@ static int mdt_obd_connect(const struct lu_env *env,
 
         rc = mdt_connect_internal(lexp, mdt, data);
         if (rc == 0) {
-                OBD_ALLOC_PTR(lcd);
-                if (lcd != NULL) {
-                        struct mdt_thread_info *mti;
-                        mti = lu_context_key_get(&env->le_ctx,
-                                                 &mdt_thread_key);
-                        LASSERT(mti != NULL);
-                        mti->mti_exp = lexp;
-                        memcpy(lcd->lcd_uuid, cluuid, sizeof lcd->lcd_uuid);
-                        lexp->exp_target_data.ted_lcd = lcd;
-                        rc = mdt_client_new(env, mdt);
-                        if (rc == 0)
-                                mdt_export_stats_init(obd, lexp, localdata);
-                } else {
-                        rc = -ENOMEM;
-                }
+                struct mdt_thread_info *mti;
+                struct lsd_client_data *lcd = lexp->exp_target_data.ted_lcd;
+                LASSERT(lcd);
+                mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+                LASSERT(mti != NULL);
+                mti->mti_exp = lexp;
+                memcpy(lcd->lcd_uuid, cluuid, sizeof lcd->lcd_uuid);
+                rc = mdt_client_new(env, mdt);
+                if (rc == 0)
+                        mdt_export_stats_init(obd, lexp, localdata);
         }
 
 out:
-        if (rc != 0)
+        if (rc != 0) {
                 class_disconnect(lexp);
-        else
+                *exp = NULL;
+        } else {
                 *exp = lexp;
+        }
 
         RETURN(rc);
 }
@@ -5176,7 +5172,10 @@ static int mdt_init_export(struct obd_export *exp)
         cfs_spin_lock(&exp->exp_lock);
         exp->exp_connecting = 1;
         cfs_spin_unlock(&exp->exp_lock);
-        rc = ldlm_init_export(exp);
+        rc = lut_client_alloc(exp);
+        if (rc == 0)
+                rc = ldlm_init_export(exp);
+
         if (rc)
                 CERROR("Error %d while initializing export\n", rc);
         RETURN(rc);
@@ -5194,13 +5193,13 @@ static int mdt_destroy_export(struct obd_export *exp)
 
         target_destroy_export(exp);
         ldlm_destroy_export(exp);
+        lut_client_free(exp);
 
         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
         LASSERT(cfs_list_empty(&exp->exp_mdt_data.med_open_head));
         if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid))
                 RETURN(0);
 
-        lut_client_free(exp);
         RETURN(rc);
 }
 
index b92727b..6279111 100644 (file)
@@ -241,13 +241,17 @@ static int mdt_clients_data_init(const struct lu_env *env,
                                  unsigned long last_size)
 {
         struct lr_server_data  *lsd = &mdt->mdt_lut.lut_lsd;
-        struct lsd_client_data *lcd = NULL;
+        struct lsd_client_data *lcd;
         struct obd_device      *obd = mdt2obd_dev(mdt);
         loff_t off;
         int cl_idx;
         int rc = 0;
         ENTRY;
 
+        OBD_ALLOC_PTR(lcd);
+        if (!lcd)
+                RETURN(-ENOMEM);
+
         /* When we do a clean MDS shutdown, we save the last_transno into
          * the header.  If we find clients with higher last_transno values
          * then those clients may need recovery done. */
@@ -256,12 +260,7 @@ static int mdt_clients_data_init(const struct lu_env *env,
              off < last_size; cl_idx++) {
                 __u64 last_transno;
                 struct obd_export *exp;
-
-                if (!lcd) {
-                        OBD_ALLOC_PTR(lcd);
-                        if (!lcd)
-                                RETURN(-ENOMEM);
-                }
+                struct mdt_thread_info *mti;
 
                 off = lsd->lsd_client_start +
                         cl_idx * lsd->lsd_client_size;
@@ -295,30 +294,29 @@ static int mdt_clients_data_init(const struct lu_env *env,
                 if (IS_ERR(exp)) {
                         if (PTR_ERR(exp) == -EALREADY) {
                                 /* export already exists, zero out this one */
-                                lcd->lcd_uuid[0] = '\0';
-                        } else {
-                                GOTO(err_client, rc = PTR_ERR(exp));
+                                CERROR("Duplicate export %s!\n", lcd->lcd_uuid);
+                                continue;
                         }
-                } else {
-                        struct mdt_thread_info *mti;
-                        mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
-                        LASSERT(mti != NULL);
-                        mti->mti_exp = exp;
-                        exp->exp_target_data.ted_lcd = lcd;
-                        rc = mdt_client_add(env, mdt, cl_idx);
-                        /* can't fail existing */
-                        LASSERTF(rc == 0, "rc = %d\n", rc);
-                        /* VBR: set export last committed version */
-                        exp->exp_last_committed = last_transno;
-                        lcd = NULL;
-                        cfs_spin_lock(&exp->exp_lock);
-                        exp->exp_connecting = 0;
-                        exp->exp_in_recovery = 0;
-                        cfs_spin_unlock(&exp->exp_lock);
-                        obd->obd_max_recoverable_clients++;
-                        class_export_put(exp);
+                        GOTO(err_client, rc = PTR_ERR(exp));
                 }
 
+                mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+                LASSERT(mti != NULL);
+                mti->mti_exp = exp;
+                /* copy on-disk lcd to the export */
+                *exp->exp_target_data.ted_lcd = *lcd;
+                rc = mdt_client_add(env, mdt, cl_idx);
+                /* can't fail existing */
+                LASSERTF(rc == 0, "rc = %d\n", rc);
+                /* VBR: set export last committed version */
+                exp->exp_last_committed = last_transno;
+                cfs_spin_lock(&exp->exp_lock);
+                exp->exp_connecting = 0;
+                exp->exp_in_recovery = 0;
+                cfs_spin_unlock(&exp->exp_lock);
+                obd->obd_max_recoverable_clients++;
+                class_export_put(exp);
+
                 CDEBUG(D_OTHER, "client at idx %d has last_transno="LPU64"\n",
                        cl_idx, last_transno);
                 /* protect __u64 value update */
@@ -329,8 +327,7 @@ static int mdt_clients_data_init(const struct lu_env *env,
         }
 
 err_client:
-        if (lcd)
-                OBD_FREE_PTR(lcd);
+        OBD_FREE_PTR(lcd);
         RETURN(rc);
 }
 
index f59732d..1b9078a 100644 (file)
@@ -643,14 +643,18 @@ static void filter_fmd_cleanup(struct obd_export *exp)
 
 static int filter_init_export(struct obd_export *exp)
 {
+        int rc;
         cfs_spin_lock_init(&exp->exp_filter_data.fed_lock);
         CFS_INIT_LIST_HEAD(&exp->exp_filter_data.fed_mod_list);
 
         cfs_spin_lock(&exp->exp_lock);
         exp->exp_connecting = 1;
         cfs_spin_unlock(&exp->exp_lock);
+        rc = lut_client_alloc(exp);
+        if (rc == 0)
+                rc = ldlm_init_export(exp);
 
-        return ldlm_init_export(exp);
+        return rc;
 }
 
 static int filter_free_server_data(struct obd_device_target *obt)
@@ -821,18 +825,16 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
                 GOTO(out, rc = 0);
         }
 
+        OBD_ALLOC_PTR(lcd);
+        if (!lcd)
+                GOTO(err_client, rc = -ENOMEM);
+
         for (cl_idx = 0, off = le32_to_cpu(lsd->lsd_client_start);
              off < last_rcvd_size; cl_idx++) {
                 __u64 last_rcvd;
                 struct obd_export *exp;
                 struct filter_export_data *fed;
 
-                if (!lcd) {
-                        OBD_ALLOC_PTR(lcd);
-                        if (!lcd)
-                                GOTO(err_client, rc = -ENOMEM);
-                }
-
                 /* Don't assume off is incremented properly by
                  * fsfilt_read_record(), in case sizeof(*lcd)
                  * isn't the same as lsd->lsd_client_size.  */
@@ -855,55 +857,47 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
 
                 last_rcvd = le64_to_cpu(lcd->lcd_last_transno);
 
+                CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
+                       " srv lr: "LPU64"\n", lcd->lcd_uuid, cl_idx,
+                       last_rcvd, le64_to_cpu(lsd->lsd_last_transno));
+
                 /* These exports are cleaned up by filter_disconnect(), so they
                  * need to be set up like real exports as filter_connect() does.
                  */
                 exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
-
-                CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
-                       " srv lr: "LPU64"\n", lcd->lcd_uuid, cl_idx,
-                       last_rcvd, le64_to_cpu(lsd->lsd_last_transno));
                 if (IS_ERR(exp)) {
                         if (PTR_ERR(exp) == -EALREADY) {
                                 /* export already exists, zero out this one */
-                                CERROR("Zeroing out duplicate export due to "
-                                       "bug 10479.\n");
-                                lcd->lcd_uuid[0] = '\0';
-                        } else {
-                                GOTO(err_client, rc = PTR_ERR(exp));
+                                CERROR("Duplicate export %s!\n", lcd->lcd_uuid);
+                                continue;
                         }
-                } else {
-                        fed = &exp->exp_filter_data;
-                        fed->fed_ted.ted_lcd = lcd;
-                        fed->fed_group = 0; /* will be assigned at connect */
-                        filter_export_stats_init(obd, exp, NULL);
-                        rc = filter_client_add(obd, exp, cl_idx);
-                        /* can't fail for existing client */
-                        LASSERTF(rc == 0, "rc = %d\n", rc);
-
-                        /* VBR: set export last committed */
-                        exp->exp_last_committed = last_rcvd;
-                        cfs_spin_lock(&exp->exp_lock);
-                        exp->exp_connecting = 0;
-                        exp->exp_in_recovery = 0;
-                        cfs_spin_unlock(&exp->exp_lock);
-                        cfs_spin_lock_bh(&obd->obd_processing_task_lock);
-                        obd->obd_max_recoverable_clients++;
-                        cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
-                        lcd = NULL;
-                        class_export_put(exp);
+                        OBD_FREE_PTR(lcd);
+                        GOTO(err_client, rc = PTR_ERR(exp));
                 }
 
-                /* Need to check last_rcvd even for duplicated exports. */
-                CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
-                       cl_idx, last_rcvd);
+                fed = &exp->exp_filter_data;
+                *fed->fed_ted.ted_lcd = *lcd;
+                fed->fed_group = 0; /* will be assigned at connect */
+                filter_export_stats_init(obd, exp, NULL);
+                rc = filter_client_add(obd, exp, cl_idx);
+                /* can't fail for existing client */
+                LASSERTF(rc == 0, "rc = %d\n", rc);
+
+                /* VBR: set export last committed */
+                exp->exp_last_committed = last_rcvd;
+                cfs_spin_lock(&exp->exp_lock);
+                exp->exp_connecting = 0;
+                exp->exp_in_recovery = 0;
+                cfs_spin_unlock(&exp->exp_lock);
+                cfs_spin_lock_bh(&obd->obd_processing_task_lock);
+                obd->obd_max_recoverable_clients++;
+                cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+                class_export_put(exp);
 
                 if (last_rcvd > le64_to_cpu(lsd->lsd_last_transno))
                         lsd->lsd_last_transno = cpu_to_le64(last_rcvd);
         }
-
-        if (lcd)
-                OBD_FREE_PTR(lcd);
+        OBD_FREE_PTR(lcd);
 
         obd->obd_last_committed = le64_to_cpu(lsd->lsd_last_transno);
 out:
@@ -2775,8 +2769,6 @@ static int filter_connect(const struct lu_env *env,
         struct lvfs_run_ctxt saved;
         struct lustre_handle conn = { 0 };
         struct obd_export *lexp;
-        struct tg_export_data *ted;
-        struct lsd_client_data *lcd = NULL;
         __u32 group;
         int rc;
         ENTRY;
@@ -2790,22 +2782,15 @@ static int filter_connect(const struct lu_env *env,
         lexp = class_conn2export(&conn);
         LASSERT(lexp != NULL);
 
-        ted = &lexp->exp_target_data;
-
         rc = filter_connect_internal(lexp, data, 0);
         if (rc)
                 GOTO(cleanup, rc);
 
         filter_export_stats_init(obd, lexp, localdata);
         if (obd->obd_replayable) {
-                OBD_ALLOC(lcd, sizeof(*lcd));
-                if (!lcd) {
-                        CERROR("filter: out of memory for client data\n");
-                        GOTO(cleanup, rc = -ENOMEM);
-                }
-
+                struct lsd_client_data *lcd = lexp->exp_target_data.ted_lcd;
+                LASSERT(lcd);
                 memcpy(lcd->lcd_uuid, cluuid, sizeof(lcd->lcd_uuid));
-                ted->ted_lcd = lcd;
                 rc = filter_client_add(obd, lexp, -1);
                 if (rc)
                         GOTO(cleanup, rc);
@@ -2960,12 +2945,11 @@ static int filter_destroy_export(struct obd_export *exp)
 
         target_destroy_export(exp);
         ldlm_destroy_export(exp);
+        lut_client_free(exp);
 
         if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid))
                 RETURN(0);
 
-        lut_client_free(exp);
-
         if (!exp->exp_obd->obd_replayable)
                 fsfilt_sync(exp->exp_obd, exp->exp_obd->u.obt.obt_sb);
 
index 94c6e3b..8b9f581 100644 (file)
@@ -171,6 +171,20 @@ static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut,
 }
 
 /**
+ * Allocate in-memory data for client slot related to export.
+ */
+int lut_client_alloc(struct obd_export *exp)
+{
+        OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
+        if (exp->exp_target_data.ted_lcd == NULL)
+                RETURN(-ENOMEM);
+        /* Mark that slot is not yet valid, 0 doesn't work here */
+        exp->exp_target_data.ted_lr_idx = -1;
+        RETURN(0);
+}
+EXPORT_SYMBOL(lut_client_alloc);
+
+/**
  * Free in-memory data for client slot related to export.
  */
 void lut_client_free(struct obd_export *exp)
@@ -180,6 +194,10 @@ void lut_client_free(struct obd_export *exp)
 
         OBD_FREE_PTR(ted->ted_lcd);
         ted->ted_lcd = NULL;
+
+        /* Slot may be not yet assigned */
+        if (ted->ted_lr_idx < 0)
+                return;
         /* Clear bit when lcd is freed */
         spin_lock(&lut->lut_client_bitmap_lock);
         if (!test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {