Whamcloud - gitweb
- land b_ver_recov
authortappro <tappro>
Thu, 24 Jul 2008 11:38:02 +0000 (11:38 +0000)
committertappro <tappro>
Thu, 24 Jul 2008 11:38:02 +0000 (11:38 +0000)
lustre/obdfilter/filter.c

index 2f39aa0..4375077 100644 (file)
@@ -86,11 +86,38 @@ cfs_mem_cache_t *ll_fmd_cachep;
 static void filter_commit_cb(struct obd_device *obd, __u64 transno,
                              void *cb_data, int error)
 {
-        obd_transno_commit_cb(obd, transno, error);
+        struct obd_export *exp = cb_data;
+        obd_transno_commit_cb(obd, transno, exp, error);
+}
+
+int filter_version_get_check(struct obd_export *exp,
+                             struct obd_trans_info *oti, struct inode *inode)
+{
+        __u64 curr_version;
+
+        if (inode == NULL || oti == NULL)
+                RETURN(0);
+
+        curr_version = fsfilt_get_version(exp->exp_obd, inode);
+        if ((__s64)curr_version == -EOPNOTSUPP)
+                RETURN(0);
+        /* VBR: version is checked always because costs nothing */
+        if (oti->oti_pre_version != 0 &&
+            oti->oti_pre_version != curr_version) {
+                CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n",
+                       oti->oti_pre_version, curr_version);
+                spin_lock(&exp->exp_lock);
+                exp->exp_vbr_failed = 1;
+                spin_unlock(&exp->exp_lock);
+                RETURN (-EOVERFLOW);
+        }
+        oti->oti_pre_version = curr_version;
+        RETURN(0);
 }
 
 /* Assumes caller has already pushed us into the kernel context. */
-int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
+int filter_finish_transno(struct obd_export *exp, struct inode *inode,
+                          struct obd_trans_info *oti,
                           int rc, int force_sync)
 {
         struct filter_obd *filter = &exp->exp_obd->u.filter;
@@ -108,24 +135,28 @@ int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
                 RETURN(rc);
 
         /* we don't allocate new transnos for replayed requests */
+        spin_lock(&filter->fo_translock);
         if (oti->oti_transno == 0) {
-                spin_lock(&filter->fo_translock);
                 last_rcvd = le64_to_cpu(filter->fo_fsd->lsd_last_transno) + 1;
                 filter->fo_fsd->lsd_last_transno = cpu_to_le64(last_rcvd);
-                spin_unlock(&filter->fo_translock);
-                oti->oti_transno = last_rcvd;
         } else {
-                spin_lock(&filter->fo_translock);
                 last_rcvd = oti->oti_transno;
                 if (last_rcvd > le64_to_cpu(filter->fo_fsd->lsd_last_transno))
-                        filter->fo_fsd->lsd_last_transno =
-                                cpu_to_le64(last_rcvd);
+                        filter->fo_fsd->lsd_last_transno = cpu_to_le64(last_rcvd);
+        }
+        oti->oti_transno = last_rcvd;
+        if (last_rcvd <= le64_to_cpu(lcd->lcd_last_transno)) {
                 spin_unlock(&filter->fo_translock);
+                LBUG();
         }
         lcd->lcd_last_transno = cpu_to_le64(last_rcvd);
+        lcd->lcd_pre_versions[0] = cpu_to_le64(oti->oti_pre_version);
+        lcd->lcd_last_xid = cpu_to_le64(oti->oti_xid);
+
+        spin_unlock(&filter->fo_translock);
 
-        /* could get xid from oti, if it's ever needed */
-        lcd->lcd_last_xid = 0;
+        if (inode)
+                fsfilt_set_version(exp->exp_obd, inode, last_rcvd);
 
         off = fed->fed_lr_off;
         if (off <= 0) {
@@ -134,17 +165,17 @@ int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
                 err = -EINVAL;
         } else {
                 if (!force_sync)
-                        force_sync = fsfilt_add_journal_cb(exp->exp_obd, 
+                        force_sync = fsfilt_add_journal_cb(exp->exp_obd,
                                                            last_rcvd,
                                                            oti->oti_handle,
                                                            filter_commit_cb,
-                                                           NULL);
+                                                           exp);
 
                 err = fsfilt_write_record(exp->exp_obd, filter->fo_rcvd_filp,
                                           lcd, sizeof(*lcd), &off,
                                           force_sync | exp->exp_need_sync);
                 if (force_sync)
-                        filter_commit_cb(exp->exp_obd, last_rcvd, NULL, err);
+                        filter_commit_cb(exp->exp_obd, last_rcvd, exp, err);
         }
         if (err) {
                 log_pri = D_ERROR;
@@ -246,6 +277,73 @@ static int filter_export_stats_init(struct obd_device *obd,
         RETURN(0);
 }
 
+/* VBR: to determine the delayed client the lcd should be updated for each new
+ * epoch */
+static int filter_update_client_epoch(struct obd_export *exp)
+{
+        struct filter_export_data *fed = &exp->exp_filter_data;
+        struct filter_obd *filter = &exp->exp_obd->u.filter;
+        struct lvfs_run_ctxt saved;
+        loff_t off = fed->fed_lr_off;
+        int rc = 0;
+
+        /* VBR: set client last_epoch to current epoch */
+        if (le32_to_cpu(fed->fed_lcd->lcd_last_epoch) >=
+                        le32_to_cpu(filter->fo_fsd->lsd_start_epoch))
+                return rc;
+        fed->fed_lcd->lcd_last_epoch = filter->fo_fsd->lsd_start_epoch;
+        push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+        rc = fsfilt_write_record(exp->exp_obd, filter->fo_rcvd_filp,
+                                 fed->fed_lcd, sizeof(*fed->fed_lcd), &off,
+                                 exp->exp_delayed);
+        pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+
+        CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
+               fed->fed_lr_idx, le32_to_cpu(fed->fed_lcd->lcd_last_epoch),
+               le32_to_cpu(filter->fo_fsd->lsd_start_epoch));
+
+        return rc;
+}
+
+/* Called after recovery is done on server */
+static void filter_update_last_epoch(struct obd_device *obd)
+{
+        struct ptlrpc_request *req;
+        struct filter_obd *filter = &obd->u.filter;
+        struct lr_server_data *fsd = filter->fo_fsd;
+        __u32 start_epoch;
+
+        /* Increase server epoch after recovery */
+        spin_lock(&filter->fo_translock);
+        /* VBR: increase the epoch and store it in lsd */
+        start_epoch = lr_epoch(le64_to_cpu(fsd->lsd_last_transno)) + 1;
+        fsd->lsd_last_transno = cpu_to_le64((__u64)start_epoch << LR_EPOCH_BITS);
+        fsd->lsd_start_epoch = cpu_to_le32(start_epoch);
+        spin_unlock(&filter->fo_translock);
+
+        /* go through delayed reply queue to find all exports participate in
+         * recovery and set new epoch for them */
+        list_for_each_entry(req, &obd->obd_delayed_reply_queue, rq_list) {
+                LASSERT(!req->rq_export->exp_delayed);
+                filter_update_client_epoch(req->rq_export);
+        }
+        filter_update_server_data(obd, filter->fo_rcvd_filp, fsd, 1);
+}
+
+static int filter_postrecov(struct obd_device *obd)
+{
+        ENTRY;
+
+        if (obd->obd_fail)
+                RETURN(0);
+
+        LASSERT(!obd->obd_recovering);
+        /* VBR: update start_epoch on server */
+        filter_update_last_epoch(obd);
+
+        RETURN(0);
+}
+
 /* Add client data to the FILTER.  We use a bitmap to locate a free space
  * in the last_rcvd file if cl_idx is -1 (i.e. a new client).
  * Otherwise, we have just read the data from the last_rcvd file and
@@ -317,6 +415,8 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp,
                         rc = PTR_ERR(handle);
                         CERROR("unable to start transaction: rc %d\n", rc);
                 } else {
+                        fed->fed_lcd->lcd_last_epoch =
+                                              filter->fo_fsd->lsd_start_epoch;
                         rc = fsfilt_add_journal_cb(obd, 0, handle,
                                                    target_client_add_cb, exp);
                         if (rc == 0) {
@@ -362,7 +462,7 @@ static int filter_client_free(struct obd_export *exp)
                 GOTO(free, 0);
 
         CDEBUG(D_INFO, "freeing client at idx %u, offset %lld with UUID '%s'\n",
-               fed->fed_lr_idx, off, fed->fed_lcd->lcd_uuid);
+               fed->fed_lr_idx, fed->fed_lr_off, fed->fed_lcd->lcd_uuid);
 
         LASSERT(filter->fo_last_rcvd_slots != NULL);
 
@@ -594,7 +694,7 @@ static int filter_init_export(struct obd_export *exp)
 {
         spin_lock_init(&exp->exp_filter_data.fed_lock);
         INIT_LIST_HEAD(&exp->exp_filter_data.fed_mod_list);
-        
+
         spin_lock(&exp->exp_lock);
         exp->exp_connecting = 1;
         spin_unlock(&exp->exp_lock);
@@ -669,6 +769,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
         struct inode *inode = filp->f_dentry->d_inode;
         unsigned long last_rcvd_size = i_size_read(inode);
         __u64 mount_count;
+        __u32 start_epoch;
         int cl_idx;
         loff_t off = 0;
         int rc;
@@ -740,7 +841,11 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
                 GOTO(err_fsd, rc = -EINVAL);
         }
 
-        CDEBUG(D_INODE, "%s: server last_transno : "LPU64"\n",
+        start_epoch = le32_to_cpu(fsd->lsd_start_epoch);
+
+        CDEBUG(D_INODE, "%s: server start_epoch : %#x\n",
+               obd->obd_name, start_epoch);
+        CDEBUG(D_INODE, "%s: server last_transno : "LPX64"\n",
                obd->obd_name, le64_to_cpu(fsd->lsd_last_transno));
         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
                obd->obd_name, mount_count + 1);
@@ -818,20 +923,30 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
                         /* can't fail for existing client */
                         LASSERTF(rc == 0, "rc = %d\n", rc);
 
-                        lcd = NULL;
+                        /* VBR: set export last committed */
+                        exp->exp_last_committed = last_rcvd;
 
                         spin_lock(&exp->exp_lock);
                         exp->exp_replay_needed = 1;
                         exp->exp_connecting = 0;
+                        exp->exp_in_recovery = 0;
                         spin_unlock(&exp->exp_lock);
 
+                        spin_lock_bh(&obd->obd_processing_task_lock);
                         obd->obd_recoverable_clients++;
                         obd->obd_max_recoverable_clients++;
+                        spin_unlock_bh(&obd->obd_processing_task_lock);
+
+                        /* VBR: if epoch too old mark export as delayed */
+                        if (start_epoch > le32_to_cpu(lcd->lcd_last_epoch))
+                                class_set_export_delayed(exp);
+
+                        lcd = NULL;
                         class_export_put(exp);
                 }
 
                 /* Need to check last_rcvd even for duplicated exports. */
-                CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
+                CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPX64"\n",
                        cl_idx, last_rcvd);
 
                 if (last_rcvd > le64_to_cpu(fsd->lsd_last_transno))
@@ -845,8 +960,9 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
 
         if (obd->obd_recoverable_clients) {
                 CWARN("RECOVERY: service %s, %d recoverable clients, "
-                      "last_rcvd "LPU64"\n", obd->obd_name,
-                      obd->obd_recoverable_clients,
+                      "%d delayed clients, last_rcvd "LPU64"\n",
+                      obd->obd_name, obd->obd_recoverable_clients,
+                      obd->obd_delayed_clients,
                       le64_to_cpu(fsd->lsd_last_transno));
                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
                 obd->obd_recovering = 1;
@@ -1838,8 +1954,8 @@ int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
                               label ?: "", label ? "/" : "", str,
                               obd->obd_recovery_timeout / 60,
                               obd->obd_recovery_timeout % 60,
-                              obd->obd_max_recoverable_clients,
-                              (obd->obd_max_recoverable_clients == 1) ? "":"s",
+                              obd->obd_recoverable_clients,
+                              (obd->obd_recoverable_clients == 1) ? "":"s",
                               obd->obd_name);
         } else {
                 LCONSOLE_INFO("OST %s now serving %s (%s%s%s) with recovery "
@@ -2421,6 +2537,9 @@ static int filter_ping(struct obd_export *exp)
 {
         filter_fmd_expire(exp);
 
+        if (exp->exp_delayed)
+                filter_update_client_epoch(exp);
+
         return 0;
 }
 
@@ -2550,6 +2669,11 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry,
                 locked = 1;
         }
 
+        /* VBR: version recovery check */
+        rc = filter_version_get_check(exp, oti, inode);
+        if (rc)
+                GOTO(out_unlock, rc);
+
         /* If the inode still has SUID+SGID bits set (see filter_precreate())
          * then we will accept the UID+GID sent by the client during write for
          * initializing the ownership of this inode.  We only allow this to
@@ -2614,7 +2738,7 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry,
         /* The truncate might have used up our transaction credits.  Make
          * sure we have one left for the last_rcvd update. */
         err = fsfilt_extend(exp->exp_obd, inode, 1, handle);
-        rc = filter_finish_transno(exp, oti, rc, sync);
+        rc = filter_finish_transno(exp, inode, oti, rc, sync);
         if (sync) {
                 filter_cancel_cookies_cb(exp->exp_obd, 0, fcc, rc);
                 fcc = NULL;
@@ -3288,7 +3412,7 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa,
                 }
                 GOTO(cleanup, rc = -ENOENT);
         }
-        
+
         filter_prepare_destroy(obd, oa->o_id);
 
         /* Our MDC connection is established by the MDS to us */
@@ -3307,6 +3431,12 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa,
          * (see BUG 4180) -bzzz
          */
         LOCK_INODE_MUTEX(dchild->d_inode);
+
+        /* VBR: version recovery check */
+        rc = filter_version_get_check(exp, oti, dchild->d_inode);
+        if (rc)
+                GOTO(cleanup, rc);
+
         handle = fsfilt_start_log(obd, dchild->d_inode, FSFILT_OP_SETATTR,
                                   NULL, 1);
         if (IS_ERR(handle)) {
@@ -3363,7 +3493,7 @@ cleanup:
                  * on commit. then we call callback directly to free 
                  * the fcc. 
                  */
-                rc = filter_finish_transno(exp, oti, rc, sync);
+                rc = filter_finish_transno(exp, NULL, oti, rc, sync);
                 if (sync) {
                         filter_cancel_cookies_cb(obd, 0, fcc, rc); 
                         fcc = NULL;
@@ -3701,6 +3831,7 @@ static struct obd_ops filter_obd_ops = {
         .o_iocontrol      = filter_iocontrol,
         .o_health_check   = filter_health_check,
         .o_process_config = filter_process_config,
+        .o_postrecov      = filter_postrecov,
 };
 
 quota_interface_t *filter_quota_interface_ref;