From 8403f1403fd221d17f5c26fad7c0462900e63e54 Mon Sep 17 00:00:00 2001 From: tappro Date: Thu, 24 Jul 2008 11:38:02 +0000 Subject: [PATCH] - land b_ver_recov --- lustre/obdfilter/filter.c | 181 +++++++++++++++++++++++++++++++++++++++------- 1 file changed, 156 insertions(+), 25 deletions(-) diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 2f39aa0..4375077 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -86,11 +86,38 @@ cfs_mem_cache_t *ll_fmd_cachep; static void filter_commit_cb(struct obd_device *obd, __u64 transno, void *cb_data, int error) { - obd_transno_commit_cb(obd, transno, error); + struct obd_export *exp = cb_data; + obd_transno_commit_cb(obd, transno, exp, error); +} + +int filter_version_get_check(struct obd_export *exp, + struct obd_trans_info *oti, struct inode *inode) +{ + __u64 curr_version; + + if (inode == NULL || oti == NULL) + RETURN(0); + + curr_version = fsfilt_get_version(exp->exp_obd, inode); + if ((__s64)curr_version == -EOPNOTSUPP) + RETURN(0); + /* VBR: version is checked always because costs nothing */ + if (oti->oti_pre_version != 0 && + oti->oti_pre_version != curr_version) { + CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n", + oti->oti_pre_version, curr_version); + spin_lock(&exp->exp_lock); + exp->exp_vbr_failed = 1; + spin_unlock(&exp->exp_lock); + RETURN (-EOVERFLOW); + } + oti->oti_pre_version = curr_version; + RETURN(0); } /* Assumes caller has already pushed us into the kernel context. */ -int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti, +int filter_finish_transno(struct obd_export *exp, struct inode *inode, + struct obd_trans_info *oti, int rc, int force_sync) { struct filter_obd *filter = &exp->exp_obd->u.filter; @@ -108,24 +135,28 @@ int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti, RETURN(rc); /* we don't allocate new transnos for replayed requests */ + spin_lock(&filter->fo_translock); if (oti->oti_transno == 0) { - spin_lock(&filter->fo_translock); last_rcvd = le64_to_cpu(filter->fo_fsd->lsd_last_transno) + 1; filter->fo_fsd->lsd_last_transno = cpu_to_le64(last_rcvd); - spin_unlock(&filter->fo_translock); - oti->oti_transno = last_rcvd; } else { - spin_lock(&filter->fo_translock); last_rcvd = oti->oti_transno; if (last_rcvd > le64_to_cpu(filter->fo_fsd->lsd_last_transno)) - filter->fo_fsd->lsd_last_transno = - cpu_to_le64(last_rcvd); + filter->fo_fsd->lsd_last_transno = cpu_to_le64(last_rcvd); + } + oti->oti_transno = last_rcvd; + if (last_rcvd <= le64_to_cpu(lcd->lcd_last_transno)) { spin_unlock(&filter->fo_translock); + LBUG(); } lcd->lcd_last_transno = cpu_to_le64(last_rcvd); + lcd->lcd_pre_versions[0] = cpu_to_le64(oti->oti_pre_version); + lcd->lcd_last_xid = cpu_to_le64(oti->oti_xid); + + spin_unlock(&filter->fo_translock); - /* could get xid from oti, if it's ever needed */ - lcd->lcd_last_xid = 0; + if (inode) + fsfilt_set_version(exp->exp_obd, inode, last_rcvd); off = fed->fed_lr_off; if (off <= 0) { @@ -134,17 +165,17 @@ int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti, err = -EINVAL; } else { if (!force_sync) - force_sync = fsfilt_add_journal_cb(exp->exp_obd, + force_sync = fsfilt_add_journal_cb(exp->exp_obd, last_rcvd, oti->oti_handle, filter_commit_cb, - NULL); + exp); err = fsfilt_write_record(exp->exp_obd, filter->fo_rcvd_filp, lcd, sizeof(*lcd), &off, force_sync | exp->exp_need_sync); if (force_sync) - filter_commit_cb(exp->exp_obd, last_rcvd, NULL, err); + filter_commit_cb(exp->exp_obd, last_rcvd, exp, err); } if (err) { log_pri = D_ERROR; @@ -246,6 +277,73 @@ static int filter_export_stats_init(struct obd_device *obd, RETURN(0); } +/* VBR: to determine the delayed client the lcd should be updated for each new + * epoch */ +static int filter_update_client_epoch(struct obd_export *exp) +{ + struct filter_export_data *fed = &exp->exp_filter_data; + struct filter_obd *filter = &exp->exp_obd->u.filter; + struct lvfs_run_ctxt saved; + loff_t off = fed->fed_lr_off; + int rc = 0; + + /* VBR: set client last_epoch to current epoch */ + if (le32_to_cpu(fed->fed_lcd->lcd_last_epoch) >= + le32_to_cpu(filter->fo_fsd->lsd_start_epoch)) + return rc; + fed->fed_lcd->lcd_last_epoch = filter->fo_fsd->lsd_start_epoch; + push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); + rc = fsfilt_write_record(exp->exp_obd, filter->fo_rcvd_filp, + fed->fed_lcd, sizeof(*fed->fed_lcd), &off, + exp->exp_delayed); + pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); + + CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n", + fed->fed_lr_idx, le32_to_cpu(fed->fed_lcd->lcd_last_epoch), + le32_to_cpu(filter->fo_fsd->lsd_start_epoch)); + + return rc; +} + +/* Called after recovery is done on server */ +static void filter_update_last_epoch(struct obd_device *obd) +{ + struct ptlrpc_request *req; + struct filter_obd *filter = &obd->u.filter; + struct lr_server_data *fsd = filter->fo_fsd; + __u32 start_epoch; + + /* Increase server epoch after recovery */ + spin_lock(&filter->fo_translock); + /* VBR: increase the epoch and store it in lsd */ + start_epoch = lr_epoch(le64_to_cpu(fsd->lsd_last_transno)) + 1; + fsd->lsd_last_transno = cpu_to_le64((__u64)start_epoch << LR_EPOCH_BITS); + fsd->lsd_start_epoch = cpu_to_le32(start_epoch); + spin_unlock(&filter->fo_translock); + + /* go through delayed reply queue to find all exports participate in + * recovery and set new epoch for them */ + list_for_each_entry(req, &obd->obd_delayed_reply_queue, rq_list) { + LASSERT(!req->rq_export->exp_delayed); + filter_update_client_epoch(req->rq_export); + } + filter_update_server_data(obd, filter->fo_rcvd_filp, fsd, 1); +} + +static int filter_postrecov(struct obd_device *obd) +{ + ENTRY; + + if (obd->obd_fail) + RETURN(0); + + LASSERT(!obd->obd_recovering); + /* VBR: update start_epoch on server */ + filter_update_last_epoch(obd); + + RETURN(0); +} + /* Add client data to the FILTER. We use a bitmap to locate a free space * in the last_rcvd file if cl_idx is -1 (i.e. a new client). * Otherwise, we have just read the data from the last_rcvd file and @@ -317,6 +415,8 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp, rc = PTR_ERR(handle); CERROR("unable to start transaction: rc %d\n", rc); } else { + fed->fed_lcd->lcd_last_epoch = + filter->fo_fsd->lsd_start_epoch; rc = fsfilt_add_journal_cb(obd, 0, handle, target_client_add_cb, exp); if (rc == 0) { @@ -362,7 +462,7 @@ static int filter_client_free(struct obd_export *exp) GOTO(free, 0); CDEBUG(D_INFO, "freeing client at idx %u, offset %lld with UUID '%s'\n", - fed->fed_lr_idx, off, fed->fed_lcd->lcd_uuid); + fed->fed_lr_idx, fed->fed_lr_off, fed->fed_lcd->lcd_uuid); LASSERT(filter->fo_last_rcvd_slots != NULL); @@ -594,7 +694,7 @@ static int filter_init_export(struct obd_export *exp) { spin_lock_init(&exp->exp_filter_data.fed_lock); INIT_LIST_HEAD(&exp->exp_filter_data.fed_mod_list); - + spin_lock(&exp->exp_lock); exp->exp_connecting = 1; spin_unlock(&exp->exp_lock); @@ -669,6 +769,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) struct inode *inode = filp->f_dentry->d_inode; unsigned long last_rcvd_size = i_size_read(inode); __u64 mount_count; + __u32 start_epoch; int cl_idx; loff_t off = 0; int rc; @@ -740,7 +841,11 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) GOTO(err_fsd, rc = -EINVAL); } - CDEBUG(D_INODE, "%s: server last_transno : "LPU64"\n", + start_epoch = le32_to_cpu(fsd->lsd_start_epoch); + + CDEBUG(D_INODE, "%s: server start_epoch : %#x\n", + obd->obd_name, start_epoch); + CDEBUG(D_INODE, "%s: server last_transno : "LPX64"\n", obd->obd_name, le64_to_cpu(fsd->lsd_last_transno)); CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n", obd->obd_name, mount_count + 1); @@ -818,20 +923,30 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) /* can't fail for existing client */ LASSERTF(rc == 0, "rc = %d\n", rc); - lcd = NULL; + /* VBR: set export last committed */ + exp->exp_last_committed = last_rcvd; spin_lock(&exp->exp_lock); exp->exp_replay_needed = 1; exp->exp_connecting = 0; + exp->exp_in_recovery = 0; spin_unlock(&exp->exp_lock); + spin_lock_bh(&obd->obd_processing_task_lock); obd->obd_recoverable_clients++; obd->obd_max_recoverable_clients++; + spin_unlock_bh(&obd->obd_processing_task_lock); + + /* VBR: if epoch too old mark export as delayed */ + if (start_epoch > le32_to_cpu(lcd->lcd_last_epoch)) + class_set_export_delayed(exp); + + lcd = NULL; class_export_put(exp); } /* Need to check last_rcvd even for duplicated exports. */ - CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n", + CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPX64"\n", cl_idx, last_rcvd); if (last_rcvd > le64_to_cpu(fsd->lsd_last_transno)) @@ -845,8 +960,9 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) if (obd->obd_recoverable_clients) { CWARN("RECOVERY: service %s, %d recoverable clients, " - "last_rcvd "LPU64"\n", obd->obd_name, - obd->obd_recoverable_clients, + "%d delayed clients, last_rcvd "LPU64"\n", + obd->obd_name, obd->obd_recoverable_clients, + obd->obd_delayed_clients, le64_to_cpu(fsd->lsd_last_transno)); obd->obd_next_recovery_transno = obd->obd_last_committed + 1; obd->obd_recovering = 1; @@ -1838,8 +1954,8 @@ int filter_common_setup(struct obd_device *obd, obd_count len, void *buf, label ?: "", label ? "/" : "", str, obd->obd_recovery_timeout / 60, obd->obd_recovery_timeout % 60, - obd->obd_max_recoverable_clients, - (obd->obd_max_recoverable_clients == 1) ? "":"s", + obd->obd_recoverable_clients, + (obd->obd_recoverable_clients == 1) ? "":"s", obd->obd_name); } else { LCONSOLE_INFO("OST %s now serving %s (%s%s%s) with recovery " @@ -2421,6 +2537,9 @@ static int filter_ping(struct obd_export *exp) { filter_fmd_expire(exp); + if (exp->exp_delayed) + filter_update_client_epoch(exp); + return 0; } @@ -2550,6 +2669,11 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry, locked = 1; } + /* VBR: version recovery check */ + rc = filter_version_get_check(exp, oti, inode); + if (rc) + GOTO(out_unlock, rc); + /* If the inode still has SUID+SGID bits set (see filter_precreate()) * then we will accept the UID+GID sent by the client during write for * initializing the ownership of this inode. We only allow this to @@ -2614,7 +2738,7 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry, /* The truncate might have used up our transaction credits. Make * sure we have one left for the last_rcvd update. */ err = fsfilt_extend(exp->exp_obd, inode, 1, handle); - rc = filter_finish_transno(exp, oti, rc, sync); + rc = filter_finish_transno(exp, inode, oti, rc, sync); if (sync) { filter_cancel_cookies_cb(exp->exp_obd, 0, fcc, rc); fcc = NULL; @@ -3288,7 +3412,7 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa, } GOTO(cleanup, rc = -ENOENT); } - + filter_prepare_destroy(obd, oa->o_id); /* Our MDC connection is established by the MDS to us */ @@ -3307,6 +3431,12 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa, * (see BUG 4180) -bzzz */ LOCK_INODE_MUTEX(dchild->d_inode); + + /* VBR: version recovery check */ + rc = filter_version_get_check(exp, oti, dchild->d_inode); + if (rc) + GOTO(cleanup, rc); + handle = fsfilt_start_log(obd, dchild->d_inode, FSFILT_OP_SETATTR, NULL, 1); if (IS_ERR(handle)) { @@ -3363,7 +3493,7 @@ cleanup: * on commit. then we call callback directly to free * the fcc. */ - rc = filter_finish_transno(exp, oti, rc, sync); + rc = filter_finish_transno(exp, NULL, oti, rc, sync); if (sync) { filter_cancel_cookies_cb(obd, 0, fcc, rc); fcc = NULL; @@ -3701,6 +3831,7 @@ static struct obd_ops filter_obd_ops = { .o_iocontrol = filter_iocontrol, .o_health_check = filter_health_check, .o_process_config = filter_process_config, + .o_postrecov = filter_postrecov, }; quota_interface_t *filter_quota_interface_ref; -- 1.8.3.1