Whamcloud - gitweb
- land b_ver_recov
authortappro <tappro>
Thu, 24 Jul 2008 11:37:40 +0000 (11:37 +0000)
committertappro <tappro>
Thu, 24 Jul 2008 11:37:40 +0000 (11:37 +0000)
lustre/mds/mds_fs.c

index 4b76093..f9886eb 100644 (file)
@@ -97,6 +97,58 @@ static int mds_export_stats_init(struct obd_device *obd,
         return 0;
 }
 
+/* VBR: to determine the delayed client the lcd should be updated for each new
+ * epoch */
+int mds_update_client_epoch(struct obd_export *exp)
+{
+        struct mds_export_data *med = &exp->exp_mds_data;
+        struct mds_obd *mds = &exp->exp_obd->u.mds;
+        struct lvfs_run_ctxt saved;
+        loff_t off = med->med_lr_off;
+        int rc = 0;
+
+        /* VBR: set client last_epoch to current epoch */
+        if (le32_to_cpu(med->med_lcd->lcd_last_epoch) >=
+                        le32_to_cpu(mds->mds_server_data->lsd_start_epoch))
+                return rc;
+
+        med->med_lcd->lcd_last_epoch = mds->mds_server_data->lsd_start_epoch;
+        push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+        rc = fsfilt_write_record(exp->exp_obd, mds->mds_rcvd_filp,
+                                 med->med_lcd, sizeof(*med->med_lcd), &off,
+                                 exp->exp_delayed);
+        pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+
+        CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
+               med->med_lr_idx, le32_to_cpu(med->med_lcd->lcd_last_epoch),
+               le32_to_cpu(mds->mds_server_data->lsd_start_epoch));
+
+        return rc;
+}
+
+/* Called after recovery is done on server */
+void mds_update_last_epoch(struct obd_device *obd)
+{
+        struct ptlrpc_request *req;
+        struct mds_obd *mds = &obd->u.mds;
+        __u32 start_epoch;
+
+        /* Increase server epoch after recovery */
+        spin_lock(&mds->mds_transno_lock);
+        start_epoch = lr_epoch(mds->mds_last_transno) + 1;
+        mds->mds_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
+        mds->mds_server_data->lsd_start_epoch = cpu_to_le32(start_epoch);
+        spin_unlock(&mds->mds_transno_lock);
+
+        /* go through delayed reply queue to find all exports participate in
+         * recovery and set new epoch for them */
+        list_for_each_entry(req, &obd->obd_delayed_reply_queue, rq_list) {
+                LASSERT(!req->rq_export->exp_delayed);
+                mds_update_client_epoch(req->rq_export);
+        }
+        mds_update_server_data(obd, 1);
+}
+
 /* Add client data to the MDS.  We use a bitmap to locate a free space
  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
  * Otherwise, we have just read the data from the last_rcvd file and
@@ -112,7 +164,7 @@ int mds_client_add(struct obd_device *obd, struct obd_export *exp,
         struct mds_export_data *med = &exp->exp_mds_data;
         unsigned long *bitmap = mds->mds_client_bitmap;
         int new_client = (cl_idx == -1);
-        int rc;
+        int rc = 0;
         ENTRY;
 
         LASSERT(bitmap != NULL);
@@ -169,6 +221,10 @@ int mds_client_add(struct obd_device *obd, struct obd_export *exp,
                         rc = PTR_ERR(handle);
                         CERROR("unable to start transaction: rc %d\n", rc);
                 } else {
+                        /* VBR: set client last_transno as mds_last_transno to
+                         * remember last epoch for this client */
+                        med->med_lcd->lcd_last_epoch =
+                                        mds->mds_server_data->lsd_start_epoch;
                         rc = fsfilt_add_journal_cb(obd, 0, handle,
                                                    target_client_add_cb, exp);
                         if (rc == 0) {
@@ -190,7 +246,7 @@ int mds_client_add(struct obd_device *obd, struct obd_export *exp,
                        med->med_lr_idx, med->med_lr_off,
                        (unsigned int)sizeof(*med->med_lcd));
         }
-        return 0;
+        return rc;
 }
 
 int mds_client_free(struct obd_export *exp)
@@ -287,6 +343,7 @@ static int mds_init_server_data(struct obd_device *obd, struct file *file)
         loff_t off = 0;
         unsigned long last_rcvd_size = i_size_read(file->f_dentry->d_inode);
         __u64 mount_count;
+        __u32 start_epoch;
         int cl_idx, rc = 0;
         ENTRY;
 
@@ -358,8 +415,11 @@ static int mds_init_server_data(struct obd_device *obd, struct file *file)
         lsd->lsd_feature_compat = cpu_to_le32(OBD_COMPAT_MDT);
 
         mds->mds_last_transno = le64_to_cpu(lsd->lsd_last_transno);
+        start_epoch = le32_to_cpu(lsd->lsd_start_epoch);
 
-        CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
+        CDEBUG(D_INODE, "%s: server start_epoch: %#x\n",
+               obd->obd_name, start_epoch);
+        CDEBUG(D_INODE, "%s: server last_transno: "LPX64"\n",
                obd->obd_name, mds->mds_last_transno);
         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
                obd->obd_name, mount_count + 1);
@@ -388,6 +448,7 @@ static int mds_init_server_data(struct obd_device *obd, struct file *file)
         for (cl_idx = 0, off = le32_to_cpu(lsd->lsd_client_start);
              off < last_rcvd_size; cl_idx++) {
                 __u64 last_transno;
+                __u32 last_epoch;
                 struct obd_export *exp;
                 struct mds_export_data *med;
 
@@ -420,6 +481,8 @@ static int mds_init_server_data(struct obd_device *obd, struct file *file)
                                le64_to_cpu(lcd->lcd_last_transno) :
                                le64_to_cpu(lcd->lcd_last_close_transno);
 
+                last_epoch = le32_to_cpu(lcd->lcd_last_epoch);
+
                 /* These exports are cleaned up by mds_disconnect(), so they
                  * need to be set up like real exports as mds_connect() does.
                  */
@@ -443,21 +506,31 @@ static int mds_init_server_data(struct obd_device *obd, struct file *file)
                         /* can't fail for existing client */
                         LASSERTF(rc == 0, "rc = %d\n", rc);
 
+                        /* VBR: set export last committed version */
+                        exp->exp_last_committed = last_transno;
+
                         lcd = NULL;
 
                         spin_lock(&exp->exp_lock);
                         exp->exp_replay_needed = 1;
                         exp->exp_connecting = 0;
+                        exp->exp_in_recovery = 0;
                         spin_unlock(&exp->exp_lock);
 
+                        spin_lock_bh(&obd->obd_processing_task_lock);
                         obd->obd_recoverable_clients++;
                         obd->obd_max_recoverable_clients++;
+                        spin_unlock_bh(&obd->obd_processing_task_lock);
+
+                        /* VBR: if epoch too old mark export as delayed */
+                        if (start_epoch > last_epoch)
+                                class_set_export_delayed(exp);
                         class_export_put(exp);
                 }
 
                 /* Need to check last_rcvd even for duplicated exports. */
-                CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
-                       cl_idx, last_transno);
+                CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPX64","
+                       "last_epoch %#x\n", cl_idx, last_transno, last_epoch);
 
                 if (last_transno > mds->mds_last_transno)
                         mds->mds_last_transno = last_transno;
@@ -470,8 +543,9 @@ static int mds_init_server_data(struct obd_device *obd, struct file *file)
 
         if (obd->obd_recoverable_clients) {
                 CWARN("RECOVERY: service %s, %d recoverable clients, "
-                      "last_transno "LPU64"\n", obd->obd_name,
-                      obd->obd_recoverable_clients, mds->mds_last_transno);
+                      "%d delayed clients, last_transno "LPU64"\n",
+                      obd->obd_name, obd->obd_recoverable_clients,
+                      obd->obd_delayed_clients, mds->mds_last_transno);
                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
                 obd->obd_recovering = 1;
                 obd->obd_recovery_start = 0;