+/* VBR: journal commit callback.  cb_data now carries the export (set up
+ * by filter_finish_transno below) and is forwarded to
+ * obd_transno_commit_cb(), whose signature gains the export — presumably
+ * so committed transnos can be tracked per export; confirm against the
+ * obd_transno_commit_cb definition. */
static void filter_commit_cb(struct obd_device *obd, __u64 transno,
void *cb_data, int error)
{
- obd_transno_commit_cb(obd, transno, error);
+ struct obd_export *exp = cb_data;
+ obd_transno_commit_cb(obd, transno, exp, error);
+}
+
+/* VBR: version recovery check.
+ *
+ * Compare the current on-disk version of @inode against the version the
+ * request carried in @oti->oti_pre_version.  On mismatch, mark the
+ * export VBR-failed (exp_vbr_failed) and return -EOVERFLOW; otherwise
+ * store the version just read back into @oti — filter_finish_transno()
+ * writes it into the client's last_rcvd slot (lcd_pre_versions[0]) —
+ * and return 0.  A missing inode/oti, or a backend filesystem without
+ * version support, passes the check trivially. */
+int filter_version_get_check(struct obd_export *exp,
+ struct obd_trans_info *oti, struct inode *inode)
+{
+ __u64 curr_version;
+
+ if (inode == NULL || oti == NULL)
+ RETURN(0);
+
+ curr_version = fsfilt_get_version(exp->exp_obd, inode);
+ /* backend without object versions reports -EOPNOTSUPP: skip check */
+ if ((__s64)curr_version == -EOPNOTSUPP)
+ RETURN(0);
+ /* VBR: version is checked always because costs nothing */
+ if (oti->oti_pre_version != 0 &&
+ oti->oti_pre_version != curr_version) {
+ CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n",
+ oti->oti_pre_version, curr_version);
+ spin_lock(&exp->exp_lock);
+ exp->exp_vbr_failed = 1;
+ spin_unlock(&exp->exp_lock);
+ RETURN(-EOVERFLOW);
+ }
+ oti->oti_pre_version = curr_version;
+ RETURN(0);
}
/* Assumes caller has already pushed us into the kernel context. */
-int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
+int filter_finish_transno(struct obd_export *exp, struct inode *inode,
+ struct obd_trans_info *oti,
int rc, int force_sync)
{
struct filter_obd *filter = &exp->exp_obd->u.filter;
RETURN(rc);
/* we don't allocate new transnos for replayed requests */
+ spin_lock(&filter->fo_translock);
if (oti->oti_transno == 0) {
- spin_lock(&filter->fo_translock);
last_rcvd = le64_to_cpu(filter->fo_fsd->lsd_last_transno) + 1;
filter->fo_fsd->lsd_last_transno = cpu_to_le64(last_rcvd);
- spin_unlock(&filter->fo_translock);
- oti->oti_transno = last_rcvd;
} else {
- spin_lock(&filter->fo_translock);
last_rcvd = oti->oti_transno;
if (last_rcvd > le64_to_cpu(filter->fo_fsd->lsd_last_transno))
- filter->fo_fsd->lsd_last_transno =
- cpu_to_le64(last_rcvd);
+ filter->fo_fsd->lsd_last_transno = cpu_to_le64(last_rcvd);
+ }
+ oti->oti_transno = last_rcvd;
+ if (last_rcvd <= le64_to_cpu(lcd->lcd_last_transno)) {
spin_unlock(&filter->fo_translock);
+ LBUG();
}
lcd->lcd_last_transno = cpu_to_le64(last_rcvd);
+ lcd->lcd_pre_versions[0] = cpu_to_le64(oti->oti_pre_version);
+ lcd->lcd_last_xid = cpu_to_le64(oti->oti_xid);
+
+ spin_unlock(&filter->fo_translock);
- /* could get xid from oti, if it's ever needed */
- lcd->lcd_last_xid = 0;
+ if (inode)
+ fsfilt_set_version(exp->exp_obd, inode, last_rcvd);
off = fed->fed_lr_off;
if (off <= 0) {
err = -EINVAL;
} else {
if (!force_sync)
- force_sync = fsfilt_add_journal_cb(exp->exp_obd,
+ force_sync = fsfilt_add_journal_cb(exp->exp_obd,
last_rcvd,
oti->oti_handle,
filter_commit_cb,
- NULL);
+ exp);
err = fsfilt_write_record(exp->exp_obd, filter->fo_rcvd_filp,
lcd, sizeof(*lcd), &off,
force_sync | exp->exp_need_sync);
if (force_sync)
- filter_commit_cb(exp->exp_obd, last_rcvd, NULL, err);
+ filter_commit_cb(exp->exp_obd, last_rcvd, exp, err);
}
if (err) {
log_pri = D_ERROR;
RETURN(0);
}
+/* VBR: to determine the delayed client the lcd should be updated for each new
+ * epoch.  Stamps the client's on-disk record (lcd_last_epoch) with the
+ * server's current start epoch and writes it back to its slot in the
+ * last_rcvd file.  No-op if the client is already at (or past) the
+ * current epoch.  Returns the fsfilt_write_record() result, 0 on
+ * success or when nothing needed updating. */
+static int filter_update_client_epoch(struct obd_export *exp)
+{
+ struct filter_export_data *fed = &exp->exp_filter_data;
+ struct filter_obd *filter = &exp->exp_obd->u.filter;
+ struct lvfs_run_ctxt saved;
+ loff_t off = fed->fed_lr_off;
+ int rc = 0;
+
+ /* VBR: set client last_epoch to current epoch */
+ if (le32_to_cpu(fed->fed_lcd->lcd_last_epoch) >=
+ le32_to_cpu(filter->fo_fsd->lsd_start_epoch))
+ return rc;
+ /* both fields are little-endian on disk, so copy without swabbing */
+ fed->fed_lcd->lcd_last_epoch = filter->fo_fsd->lsd_start_epoch;
+ push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+ /* last arg looks like a force-sync flag (cf. the force_sync |
+ * exp_need_sync usage in filter_finish_transno): delayed exports get
+ * a synchronous write — confirm against fsfilt_write_record. */
+ rc = fsfilt_write_record(exp->exp_obd, filter->fo_rcvd_filp,
+ fed->fed_lcd, sizeof(*fed->fed_lcd), &off,
+ exp->exp_delayed);
+ pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+
+ CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
+ fed->fed_lr_idx, le32_to_cpu(fed->fed_lcd->lcd_last_epoch),
+ le32_to_cpu(filter->fo_fsd->lsd_start_epoch));
+
+ return rc;
+}
+
+/* Called after recovery is done on server.  Starts a new VBR epoch:
+ * bumps lsd_start_epoch, restarts transnos at the first value of the
+ * new epoch, stamps the new epoch into every client that took part in
+ * recovery, then persists the server data. */
+static void filter_update_last_epoch(struct obd_device *obd)
+{
+ struct ptlrpc_request *req;
+ struct filter_obd *filter = &obd->u.filter;
+ struct lr_server_data *fsd = filter->fo_fsd;
+ __u32 start_epoch;
+
+ /* Increase server epoch after recovery */
+ spin_lock(&filter->fo_translock);
+ /* VBR: increase the epoch and store it in lsd; the epoch lives in
+ * the high bits of the transno (shift by LR_EPOCH_BITS), so the next
+ * transno issued belongs to the new epoch */
+ start_epoch = lr_epoch(le64_to_cpu(fsd->lsd_last_transno)) + 1;
+ fsd->lsd_last_transno = cpu_to_le64((__u64)start_epoch << LR_EPOCH_BITS);
+ fsd->lsd_start_epoch = cpu_to_le32(start_epoch);
+ spin_unlock(&filter->fo_translock);
+
+ /* go through delayed reply queue to find all exports participate in
+ * recovery and set new epoch for them */
+ /* NOTE(review): the queue is walked without obd_processing_task_lock —
+ * presumably safe because recovery has finished; confirm with callers. */
+ list_for_each_entry(req, &obd->obd_delayed_reply_queue, rq_list) {
+ LASSERT(!req->rq_export->exp_delayed);
+ filter_update_client_epoch(req->rq_export);
+ }
+ /* sync (last arg 1) the updated server data to last_rcvd */
+ filter_update_server_data(obd, filter->fo_rcvd_filp, fsd, 1);
+}
+
+/* obd_ops o_postrecov hook: runs once recovery is over (asserted via
+ * !obd_recovering) and opens a new VBR epoch.  Skipped when obd_fail is
+ * set — presumably a failover/abort path where epoch bookkeeping is
+ * pointless; confirm against obd_fail users.  Always returns 0. */
+static int filter_postrecov(struct obd_device *obd)
+{
+ ENTRY;
+
+ if (obd->obd_fail)
+ RETURN(0);
+
+ LASSERT(!obd->obd_recovering);
+ /* VBR: update start_epoch on server */
+ filter_update_last_epoch(obd);
+
+ RETURN(0);
+}
+
/* Add client data to the FILTER. We use a bitmap to locate a free space
* in the last_rcvd file if cl_idx is -1 (i.e. a new client).
* Otherwise, we have just read the data from the last_rcvd file and
rc = PTR_ERR(handle);
CERROR("unable to start transaction: rc %d\n", rc);
} else {
+ fed->fed_lcd->lcd_last_epoch =
+ filter->fo_fsd->lsd_start_epoch;
rc = fsfilt_add_journal_cb(obd, 0, handle,
target_client_add_cb, exp);
if (rc == 0) {
GOTO(free, 0);
CDEBUG(D_INFO, "freeing client at idx %u, offset %lld with UUID '%s'\n",
- fed->fed_lr_idx, off, fed->fed_lcd->lcd_uuid);
+ fed->fed_lr_idx, fed->fed_lr_off, fed->fed_lcd->lcd_uuid);
LASSERT(filter->fo_last_rcvd_slots != NULL);
{
spin_lock_init(&exp->exp_filter_data.fed_lock);
INIT_LIST_HEAD(&exp->exp_filter_data.fed_mod_list);
-
+
spin_lock(&exp->exp_lock);
exp->exp_connecting = 1;
spin_unlock(&exp->exp_lock);
struct inode *inode = filp->f_dentry->d_inode;
unsigned long last_rcvd_size = i_size_read(inode);
__u64 mount_count;
+ __u32 start_epoch;
int cl_idx;
loff_t off = 0;
int rc;
GOTO(err_fsd, rc = -EINVAL);
}
- CDEBUG(D_INODE, "%s: server last_transno : "LPU64"\n",
+ start_epoch = le32_to_cpu(fsd->lsd_start_epoch);
+
+ CDEBUG(D_INODE, "%s: server start_epoch : %#x\n",
+ obd->obd_name, start_epoch);
+ CDEBUG(D_INODE, "%s: server last_transno : "LPX64"\n",
obd->obd_name, le64_to_cpu(fsd->lsd_last_transno));
CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
obd->obd_name, mount_count + 1);
/* can't fail for existing client */
LASSERTF(rc == 0, "rc = %d\n", rc);
- lcd = NULL;
+ /* VBR: set export last committed */
+ exp->exp_last_committed = last_rcvd;
spin_lock(&exp->exp_lock);
exp->exp_replay_needed = 1;
exp->exp_connecting = 0;
+ exp->exp_in_recovery = 0;
spin_unlock(&exp->exp_lock);
+ spin_lock_bh(&obd->obd_processing_task_lock);
obd->obd_recoverable_clients++;
obd->obd_max_recoverable_clients++;
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+
+ /* VBR: if epoch too old mark export as delayed */
+ if (start_epoch > le32_to_cpu(lcd->lcd_last_epoch))
+ class_set_export_delayed(exp);
+
+ lcd = NULL;
class_export_put(exp);
}
/* Need to check last_rcvd even for duplicated exports. */
- CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
+ CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPX64"\n",
cl_idx, last_rcvd);
if (last_rcvd > le64_to_cpu(fsd->lsd_last_transno))
if (obd->obd_recoverable_clients) {
CWARN("RECOVERY: service %s, %d recoverable clients, "
- "last_rcvd "LPU64"\n", obd->obd_name,
- obd->obd_recoverable_clients,
+ "%d delayed clients, last_rcvd "LPU64"\n",
+ obd->obd_name, obd->obd_recoverable_clients,
+ obd->obd_delayed_clients,
le64_to_cpu(fsd->lsd_last_transno));
obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
obd->obd_recovering = 1;
label ?: "", label ? "/" : "", str,
obd->obd_recovery_timeout / 60,
obd->obd_recovery_timeout % 60,
- obd->obd_max_recoverable_clients,
- (obd->obd_max_recoverable_clients == 1) ? "":"s",
+ obd->obd_recoverable_clients,
+ (obd->obd_recoverable_clients == 1) ? "":"s",
obd->obd_name);
} else {
LCONSOLE_INFO("OST %s now serving %s (%s%s%s) with recovery "
{
filter_fmd_expire(exp);
+ if (exp->exp_delayed)
+ filter_update_client_epoch(exp);
+
return 0;
}
locked = 1;
}
+ /* VBR: version recovery check */
+ rc = filter_version_get_check(exp, oti, inode);
+ if (rc)
+ GOTO(out_unlock, rc);
+
/* If the inode still has SUID+SGID bits set (see filter_precreate())
* then we will accept the UID+GID sent by the client during write for
* initializing the ownership of this inode. We only allow this to
/* The truncate might have used up our transaction credits. Make
* sure we have one left for the last_rcvd update. */
err = fsfilt_extend(exp->exp_obd, inode, 1, handle);
- rc = filter_finish_transno(exp, oti, rc, sync);
+ rc = filter_finish_transno(exp, inode, oti, rc, sync);
if (sync) {
filter_cancel_cookies_cb(exp->exp_obd, 0, fcc, rc);
fcc = NULL;
}
GOTO(cleanup, rc = -ENOENT);
}
-
+
filter_prepare_destroy(obd, oa->o_id);
/* Our MDC connection is established by the MDS to us */
* (see BUG 4180) -bzzz
*/
LOCK_INODE_MUTEX(dchild->d_inode);
+
+ /* VBR: version recovery check */
+ rc = filter_version_get_check(exp, oti, dchild->d_inode);
+ if (rc)
+ GOTO(cleanup, rc);
+
handle = fsfilt_start_log(obd, dchild->d_inode, FSFILT_OP_SETATTR,
NULL, 1);
if (IS_ERR(handle)) {
* on commit. then we call callback directly to free
* the fcc.
*/
- rc = filter_finish_transno(exp, oti, rc, sync);
+ rc = filter_finish_transno(exp, NULL, oti, rc, sync);
if (sync) {
filter_cancel_cookies_cb(obd, 0, fcc, rc);
fcc = NULL;
.o_iocontrol = filter_iocontrol,
.o_health_check = filter_health_check,
.o_process_config = filter_process_config,
+ .o_postrecov = filter_postrecov,
};
quota_interface_t *filter_quota_interface_ref;