return 0;
}
+/* VBR: to determine the delayed client the lcd should be updated for each new
+ * epoch */
+int mds_update_client_epoch(struct obd_export *exp)
+{
+ struct mds_export_data *med = &exp->exp_mds_data;
+ struct mds_obd *mds = &exp->exp_obd->u.mds;
+ struct lvfs_run_ctxt saved;
+ loff_t off = med->med_lr_off;
+ int rc = 0;
+
+ /* VBR: set client last_epoch to current epoch */
+ if (le32_to_cpu(med->med_lcd->lcd_last_epoch) >=
+ le32_to_cpu(mds->mds_server_data->lsd_start_epoch))
+ return rc;
+
+ med->med_lcd->lcd_last_epoch = mds->mds_server_data->lsd_start_epoch;
+ push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+ rc = fsfilt_write_record(exp->exp_obd, mds->mds_rcvd_filp,
+ med->med_lcd, sizeof(*med->med_lcd), &off,
+ exp->exp_delayed);
+ pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+
+ CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
+ med->med_lr_idx, le32_to_cpu(med->med_lcd->lcd_last_epoch),
+ le32_to_cpu(mds->mds_server_data->lsd_start_epoch));
+
+ return rc;
+}
+
+/* Called after recovery is done on server */
+void mds_update_last_epoch(struct obd_device *obd)
+{
+ struct ptlrpc_request *req;
+ struct mds_obd *mds = &obd->u.mds;
+ __u32 start_epoch;
+
+ /* Increase server epoch after recovery */
+ spin_lock(&mds->mds_transno_lock);
+ start_epoch = lr_epoch(mds->mds_last_transno) + 1;
+ mds->mds_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
+ mds->mds_server_data->lsd_start_epoch = cpu_to_le32(start_epoch);
+ spin_unlock(&mds->mds_transno_lock);
+
+ /* go through delayed reply queue to find all exports participate in
+ * recovery and set new epoch for them */
+ list_for_each_entry(req, &obd->obd_delayed_reply_queue, rq_list) {
+ LASSERT(!req->rq_export->exp_delayed);
+ mds_update_client_epoch(req->rq_export);
+ }
+ mds_update_server_data(obd, 1);
+}
+
/* Add client data to the MDS. We use a bitmap to locate a free space
* in the last_rcvd file if cl_off is -1 (i.e. a new client).
* Otherwise, we have just read the data from the last_rcvd file and
struct mds_export_data *med = &exp->exp_mds_data;
unsigned long *bitmap = mds->mds_client_bitmap;
int new_client = (cl_idx == -1);
- int rc;
+ int rc = 0;
ENTRY;
LASSERT(bitmap != NULL);
rc = PTR_ERR(handle);
CERROR("unable to start transaction: rc %d\n", rc);
} else {
+ /* VBR: set client last_transno as mds_last_transno to
+ * remember last epoch for this client */
+ med->med_lcd->lcd_last_epoch =
+ mds->mds_server_data->lsd_start_epoch;
rc = fsfilt_add_journal_cb(obd, 0, handle,
target_client_add_cb, exp);
if (rc == 0) {
med->med_lr_idx, med->med_lr_off,
(unsigned int)sizeof(*med->med_lcd));
}
- return 0;
+ return rc;
}
int mds_client_free(struct obd_export *exp)
loff_t off = 0;
unsigned long last_rcvd_size = i_size_read(file->f_dentry->d_inode);
__u64 mount_count;
+ __u32 start_epoch;
int cl_idx, rc = 0;
ENTRY;
lsd->lsd_feature_compat = cpu_to_le32(OBD_COMPAT_MDT);
mds->mds_last_transno = le64_to_cpu(lsd->lsd_last_transno);
+ start_epoch = le32_to_cpu(lsd->lsd_start_epoch);
- CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
+ CDEBUG(D_INODE, "%s: server start_epoch: %#x\n",
+ obd->obd_name, start_epoch);
+ CDEBUG(D_INODE, "%s: server last_transno: "LPX64"\n",
obd->obd_name, mds->mds_last_transno);
CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
obd->obd_name, mount_count + 1);
for (cl_idx = 0, off = le32_to_cpu(lsd->lsd_client_start);
off < last_rcvd_size; cl_idx++) {
__u64 last_transno;
+ __u32 last_epoch;
struct obd_export *exp;
struct mds_export_data *med;
le64_to_cpu(lcd->lcd_last_transno) :
le64_to_cpu(lcd->lcd_last_close_transno);
+ last_epoch = le32_to_cpu(lcd->lcd_last_epoch);
+
/* These exports are cleaned up by mds_disconnect(), so they
* need to be set up like real exports as mds_connect() does.
*/
/* can't fail for existing client */
LASSERTF(rc == 0, "rc = %d\n", rc);
+ /* VBR: set export last committed version */
+ exp->exp_last_committed = last_transno;
+
lcd = NULL;
spin_lock(&exp->exp_lock);
exp->exp_replay_needed = 1;
exp->exp_connecting = 0;
+ exp->exp_in_recovery = 0;
spin_unlock(&exp->exp_lock);
+ spin_lock_bh(&obd->obd_processing_task_lock);
obd->obd_recoverable_clients++;
obd->obd_max_recoverable_clients++;
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+
+ /* VBR: if epoch too old mark export as delayed */
+ if (start_epoch > last_epoch)
+ class_set_export_delayed(exp);
class_export_put(exp);
}
/* Need to check last_rcvd even for duplicated exports. */
- CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
- cl_idx, last_transno);
+ CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPX64","
+ "last_epoch %#x\n", cl_idx, last_transno, last_epoch);
if (last_transno > mds->mds_last_transno)
mds->mds_last_transno = last_transno;
if (obd->obd_recoverable_clients) {
CWARN("RECOVERY: service %s, %d recoverable clients, "
- "last_transno "LPU64"\n", obd->obd_name,
- obd->obd_recoverable_clients, mds->mds_last_transno);
+ "%d delayed clients, last_transno "LPU64"\n",
+ obd->obd_name, obd->obd_recoverable_clients,
+ obd->obd_delayed_clients, mds->mds_last_transno);
obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
obd->obd_recovering = 1;
obd->obd_recovery_start = 0;