Whamcloud - gitweb
b=21485 Keep in-memory client data until export destroy
[fs/lustre-release.git] / lustre / ptlrpc / target.c
index 4704c20..2b2d522 100644 (file)
@@ -41,6 +41,7 @@
 
 #include <obd.h>
 #include <lustre_fsfilt.h>
+
 /**
  * Update client data in last_rcvd file. An obd API
  */
@@ -48,6 +49,7 @@ static int obt_client_data_update(struct obd_export *exp)
 {
         struct lu_export_data *led = &exp->exp_target_data;
         struct obd_device_target *obt = &exp->exp_obd->u.obt;
+        struct lu_target *lut = class_exp2tgt(exp);
         loff_t off = led->led_lr_off;
         int rc = 0;
 
@@ -56,7 +58,7 @@ static int obt_client_data_update(struct obd_export *exp)
 
         CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
                led->led_lr_idx, le32_to_cpu(led->led_lcd->lcd_last_epoch),
-               le32_to_cpu(obt->obt_lsd->lsd_start_epoch));
+               le32_to_cpu(lut->lut_lsd.lsd_start_epoch));
 
         return rc;
 }
@@ -64,21 +66,22 @@ static int obt_client_data_update(struct obd_export *exp)
 /**
  * Update server data in last_rcvd file. An obd API
  */
-int obt_server_data_update(struct obd_device *obd, int force_sync)
+int obt_server_data_update(struct lu_target *lut, int force_sync)
 {
-        struct obd_device_target *obt = &obd->u.obt;
+        struct obd_device_target *obt = &lut->lut_obd->u.obt;
         loff_t off = 0;
         int rc;
         ENTRY;
 
         CDEBUG(D_SUPER,
                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
-               obt->obt_lsd->lsd_uuid,
-               le64_to_cpu(obt->obt_lsd->lsd_mount_count),
-               le64_to_cpu(obt->obt_lsd->lsd_last_transno));
+               lut->lut_lsd.lsd_uuid,
+               le64_to_cpu(lut->lut_lsd.lsd_mount_count),
+               le64_to_cpu(lut->lut_lsd.lsd_last_transno));
 
-        rc = fsfilt_write_record(obd, obt->obt_rcvd_filp, obt->obt_lsd,
-                                 sizeof(*obt->obt_lsd), &off, force_sync);
+        rc = fsfilt_write_record(lut->lut_obd, obt->obt_rcvd_filp,
+                                 &lut->lut_lsd, sizeof(lut->lut_lsd),
+                                 &off, force_sync);
         if (rc)
                 CERROR("error writing lr_server_data: rc = %d\n", rc);
 
@@ -91,32 +94,32 @@ int obt_server_data_update(struct obd_device *obd, int force_sync)
 void obt_client_epoch_update(struct obd_export *exp)
 {
         struct lsd_client_data *lcd = exp->exp_target_data.led_lcd;
-        struct obd_device_target *obt = &exp->exp_obd->u.obt;
+        struct lu_target *lut = class_exp2tgt(exp);
 
         /** VBR: set client last_epoch to current epoch */
         if (le32_to_cpu(lcd->lcd_last_epoch) >=
-            le32_to_cpu(obt->obt_lsd->lsd_start_epoch))
+            le32_to_cpu(lut->lut_lsd.lsd_start_epoch))
                 return;
-        lcd->lcd_last_epoch = obt->obt_lsd->lsd_start_epoch;
+        lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
         obt_client_data_update(exp);
 }
 
 /**
  * Increment server epoch. An obd API
  */
-static void obt_boot_epoch_update(struct obd_device *obd)
+static void obt_boot_epoch_update(struct lu_target *lut)
 {
+        struct obd_device *obd = lut->lut_obd;
         __u32 start_epoch;
-        struct obd_device_target *obt = &obd->u.obt;
         struct ptlrpc_request *req;
         cfs_list_t client_list;
 
-        cfs_spin_lock(&obt->obt_translock);
-        start_epoch = lr_epoch(le64_to_cpu(obt->obt_last_transno)) + 1;
-        obt->obt_last_transno = cpu_to_le64((__u64)start_epoch <<
+        cfs_spin_lock(&lut->lut_translock);
+        start_epoch = lr_epoch(le64_to_cpu(lut->lut_last_transno)) + 1;
+        lut->lut_last_transno = cpu_to_le64((__u64)start_epoch <<
                                             LR_EPOCH_BITS);
-        obt->obt_lsd->lsd_start_epoch = cpu_to_le32(start_epoch);
-        cfs_spin_unlock(&obt->obt_translock);
+        lut->lut_lsd.lsd_start_epoch = cpu_to_le32(start_epoch);
+        cfs_spin_unlock(&lut->lut_translock);
 
         CFS_INIT_LIST_HEAD(&client_list);
         cfs_spin_lock_bh(&obd->obd_processing_task_lock);
@@ -135,7 +138,7 @@ static void obt_boot_epoch_update(struct obd_device *obd)
         cfs_spin_lock_bh(&obd->obd_processing_task_lock);
         cfs_list_splice_init(&client_list, &obd->obd_final_req_queue);
         cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
-        obt_server_data_update(obd, 1);
+        obt_server_data_update(lut, 1);
 }
 
 /**
@@ -168,12 +171,33 @@ static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut,
 }
 
 /**
+ * Free in-memory data for client slot related to export.
+ */
+void lut_client_free(struct obd_export *exp)
+{
+        struct lu_export_data *led = &exp->exp_target_data;
+        struct lu_target *lut = class_exp2tgt(exp);
+
+        OBD_FREE_PTR(led->led_lcd);
+        led->led_lcd = NULL;
+        /* Clear bit when lcd is freed */
+        spin_lock(&lut->lut_client_bitmap_lock);
+        if (!test_and_clear_bit(led->led_lr_idx, lut->lut_client_bitmap)) {
+                CERROR("%s: client %u bit already clear in bitmap\n",
+                       exp->exp_obd->obd_name, led->led_lr_idx);
+                LBUG();
+        }
+        spin_unlock(&lut->lut_client_bitmap_lock);
+}
+EXPORT_SYMBOL(lut_client_free);
+
+/**
  * Update client data in last_rcvd
  */
-int lut_client_data_update(const struct lu_env *env, struct lu_target *lut,
-                            struct obd_export *exp)
+int lut_client_data_update(const struct lu_env *env, struct obd_export *exp)
 {
         struct lu_export_data *led = &exp->exp_target_data;
+        struct lu_target *lut = class_exp2tgt(exp);
         struct lsd_client_data tmp_lcd;
         loff_t tmp_off = led->led_lr_off;
         struct lu_buf tmp_buf = {
@@ -219,17 +243,17 @@ static int lut_server_data_update(const struct lu_env *env,
         RETURN(rc);
 }
 
-void lut_client_epoch_update(const struct lu_env *env, struct lu_target *lut,
-                             struct obd_export *exp)
+void lut_client_epoch_update(const struct lu_env *env, struct obd_export *exp)
 {
         struct lsd_client_data *lcd = exp->exp_target_data.led_lcd;
+        struct lu_target *lut = class_exp2tgt(exp);
 
         LASSERT(lut->lut_bottom);
         /** VBR: set client last_epoch to current epoch */
         if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch)
                 return;
         lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
-        lut_client_data_update(env, lut, exp);
+        lut_client_data_update(env, exp);
 }
 
 /**
@@ -247,7 +271,7 @@ void lut_boot_epoch_update(struct lu_target *lut)
                 return;
         /** Increase server epoch after recovery */
         if (lut->lut_bottom == NULL)
-                return obt_boot_epoch_update(lut->lut_obd);
+                return obt_boot_epoch_update(lut);
 
         rc = lu_env_init(&env, LCT_DT_THREAD);
         if (rc) {
@@ -277,7 +301,7 @@ void lut_boot_epoch_update(struct lu_target *lut)
         cfs_list_for_each_entry(req, &client_list, rq_list) {
                 LASSERT(!req->rq_export->exp_delayed);
                 if (!req->rq_export->exp_vbr_failed)
-                        lut_client_epoch_update(&env, lut, req->rq_export);
+                        lut_client_epoch_update(&env, req->rq_export);
         }
         /** return list back at once */
         cfs_spin_lock_bh(&lut->lut_obd->obd_processing_task_lock);
@@ -332,13 +356,19 @@ int lut_init(const struct lu_env *env, struct lu_target *lut,
         int rc = 0;
         ENTRY;
 
+        LASSERT(lut);
+        LASSERT(obd);
         lut->lut_obd = obd;
         lut->lut_bottom = dt;
         lut->lut_last_rcvd = NULL;
+        obd->u.obt.obt_lut = lut;
 
         cfs_spin_lock_init(&lut->lut_translock);
         cfs_spin_lock_init(&lut->lut_client_bitmap_lock);
-        cfs_spin_lock_init(&lut->lut_trans_table_lock);
+
+        OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
+        if (lut->lut_client_bitmap == NULL)
+                RETURN(-ENOMEM);
 
         /** obdfilter has no lu_device stack yet */
         if (dt == NULL)
@@ -347,6 +377,8 @@ int lut_init(const struct lu_env *env, struct lu_target *lut,
         if (!IS_ERR(o)) {
                 lut->lut_last_rcvd = o;
         } else {
+                OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
+                lut->lut_client_bitmap = NULL;
                 rc = PTR_ERR(o);
                 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
         }
@@ -358,9 +390,15 @@ EXPORT_SYMBOL(lut_init);
 void lut_fini(const struct lu_env *env, struct lu_target *lut)
 {
         ENTRY;
-        if (lut->lut_last_rcvd)
+
+        if (lut->lut_client_bitmap) {
+                OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
+                lut->lut_client_bitmap = NULL;
+        }
+        if (lut->lut_last_rcvd) {
                 lu_object_put(env, &lut->lut_last_rcvd->do_lu);
-        lut->lut_last_rcvd = NULL;
+                lut->lut_last_rcvd = NULL;
+        }
         EXIT;
 }
 EXPORT_SYMBOL(lut_fini);