Whamcloud - gitweb
LU-540 misc patch for user identity upcall/downcall
[fs/lustre-release.git] / lustre / mdt / mdt_recovery.c
index bce0088..2065bfd 100644 (file)
@@ -26,7 +26,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  */
 /*
@@ -241,13 +241,17 @@ static int mdt_clients_data_init(const struct lu_env *env,
                                  unsigned long last_size)
 {
         struct lr_server_data  *lsd = &mdt->mdt_lut.lut_lsd;
-        struct lsd_client_data *lcd = NULL;
+        struct lsd_client_data *lcd;
         struct obd_device      *obd = mdt2obd_dev(mdt);
         loff_t off;
         int cl_idx;
         int rc = 0;
         ENTRY;
 
+        OBD_ALLOC_PTR(lcd);
+        if (!lcd)
+                RETURN(-ENOMEM);
+
         /* When we do a clean MDS shutdown, we save the last_transno into
          * the header.  If we find clients with higher last_transno values
          * then those clients may need recovery done. */
@@ -256,12 +260,7 @@ static int mdt_clients_data_init(const struct lu_env *env,
              off < last_size; cl_idx++) {
                 __u64 last_transno;
                 struct obd_export *exp;
-
-                if (!lcd) {
-                        OBD_ALLOC_PTR(lcd);
-                        if (!lcd)
-                                RETURN(-ENOMEM);
-                }
+                struct mdt_thread_info *mti;
 
                 off = lsd->lsd_client_start +
                         cl_idx * lsd->lsd_client_size;
@@ -295,30 +294,29 @@ static int mdt_clients_data_init(const struct lu_env *env,
                 if (IS_ERR(exp)) {
                         if (PTR_ERR(exp) == -EALREADY) {
                                 /* export already exists, zero out this one */
-                                lcd->lcd_uuid[0] = '\0';
-                        } else {
-                                GOTO(err_client, rc = PTR_ERR(exp));
+                                CERROR("Duplicate export %s!\n", lcd->lcd_uuid);
+                                continue;
                         }
-                } else {
-                        struct mdt_thread_info *mti;
-                        mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
-                        LASSERT(mti != NULL);
-                        mti->mti_exp = exp;
-                        exp->exp_target_data.ted_lcd = lcd;
-                        rc = mdt_client_add(env, mdt, cl_idx);
-                        /* can't fail existing */
-                        LASSERTF(rc == 0, "rc = %d\n", rc);
-                        /* VBR: set export last committed version */
-                        exp->exp_last_committed = last_transno;
-                        lcd = NULL;
-                        cfs_spin_lock(&exp->exp_lock);
-                        exp->exp_connecting = 0;
-                        exp->exp_in_recovery = 0;
-                        cfs_spin_unlock(&exp->exp_lock);
-                        obd->obd_max_recoverable_clients++;
-                        class_export_put(exp);
+                        GOTO(err_client, rc = PTR_ERR(exp));
                 }
 
+                mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+                LASSERT(mti != NULL);
+                mti->mti_exp = exp;
+                /* copy on-disk lcd to the export */
+                *exp->exp_target_data.ted_lcd = *lcd;
+                rc = mdt_client_add(env, mdt, cl_idx);
+                /* can't fail existing */
+                LASSERTF(rc == 0, "rc = %d\n", rc);
+                /* VBR: set export last committed version */
+                exp->exp_last_committed = last_transno;
+                cfs_spin_lock(&exp->exp_lock);
+                exp->exp_connecting = 0;
+                exp->exp_in_recovery = 0;
+                cfs_spin_unlock(&exp->exp_lock);
+                obd->obd_max_recoverable_clients++;
+                class_export_put(exp);
+
                 CDEBUG(D_OTHER, "client at idx %d has last_transno="LPU64"\n",
                        cl_idx, last_transno);
                 /* protect __u64 value update */
@@ -329,8 +327,7 @@ static int mdt_clients_data_init(const struct lu_env *env,
         }
 
 err_client:
-        if (lcd)
-                OBD_FREE_PTR(lcd);
+        OBD_FREE_PTR(lcd);
         RETURN(rc);
 }
 
@@ -418,12 +415,15 @@ static int mdt_server_data_init(const struct lu_env *env,
         }
         /** Interop: evict all clients at first boot with 1.8 last_rcvd */
         if (!(lsd->lsd_feature_compat & OBD_COMPAT_20)) {
-                LCONSOLE_WARN("Mounting %s at first time on 1.8 FS, remove all"
-                              " clients for interop needs\n", obd->obd_name);
-                simple_truncate(lsi->lsi_srv_mnt->mnt_sb->s_root,
-                                lsi->lsi_srv_mnt, LAST_RCVD,
-                                lsd->lsd_client_start);
-                last_rcvd_size = lsd->lsd_client_start;
+                if (last_rcvd_size > lsd->lsd_client_start) {
+                        LCONSOLE_WARN("Mounting %s at first time on 1.8 FS, "
+                                      "remove all clients for interop needs\n",
+                                      obd->obd_name);
+                        simple_truncate(lsi->lsi_srv_mnt->mnt_sb->s_root,
+                                        lsi->lsi_srv_mnt, LAST_RCVD,
+                                        lsd->lsd_client_start);
+                        last_rcvd_size = lsd->lsd_client_start;
+                }
                 /** set 2.0 flag to upgrade/downgrade between 1.8 and 2.0 */
                 lsd->lsd_feature_compat |= OBD_COMPAT_20;
         }
@@ -472,8 +472,8 @@ static int mdt_server_data_init(const struct lu_env *env,
         obd->obd_last_committed = mdt->mdt_lut.lut_last_transno;
         cfs_spin_unlock(&mdt->mdt_lut.lut_translock);
 
-        mdt->mdt_lut.lut_mount_count = mount_count + 1;
-        lsd->lsd_mount_count = mdt->mdt_lut.lut_mount_count;
+        obd->u.obt.obt_mount_count = mount_count + 1;
+        lsd->lsd_mount_count = obd->u.obt.obt_mount_count;
 
         /* save it, so mount count and last_transno is current */
         rc = mdt_server_data_update(env, mdt);
@@ -503,7 +503,8 @@ static int mdt_server_data_update(const struct lu_env *env,
                 RETURN(PTR_ERR(th));
 
         CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
-               mdt->mdt_lut.lut_mount_count, mdt->mdt_lut.lut_last_transno);
+               mdt->mdt_lut.lut_obd->u.obt.obt_mount_count,
+               mdt->mdt_lut.lut_last_transno);
 
         cfs_spin_lock(&mdt->mdt_lut.lut_translock);
         mdt->mdt_lut.lut_lsd.lsd_last_transno = mdt->mdt_lut.lut_last_transno;
@@ -752,8 +753,21 @@ static int mdt_last_rcvd_update(struct mdt_thread_info *mti,
         LASSERT(ergo(mti->mti_transno == 0, rc != 0));
         if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE ||
             lustre_msg_get_opc(req->rq_reqmsg) == MDS_DONE_WRITING) {
-                if (mti->mti_transno != 0)
+                if (mti->mti_transno != 0) {
+                        if (lcd->lcd_last_close_transno > mti->mti_transno) {
+                                LASSERT(req_is_replay(req));
+                                CERROR("Trying to overwrite bigger transno:"
+                                       "on-disk: "LPU64", new: "LPU64"\n",
+                                       lcd->lcd_last_close_transno,
+                                       mti->mti_transno);
+                                cfs_spin_lock(&req->rq_export->exp_lock);
+                                req->rq_export->exp_vbr_failed = 1;
+                                cfs_spin_unlock(&req->rq_export->exp_lock);
+                                cfs_mutex_up(&ted->ted_lcd_lock);
+                                RETURN(-EOVERFLOW);
+                        }
                         lcd->lcd_last_close_transno = mti->mti_transno;
+                }
                 lcd->lcd_last_close_xid = req->rq_xid;
                 lcd->lcd_last_close_result = rc;
         } else {
@@ -765,8 +779,21 @@ static int mdt_last_rcvd_update(struct mdt_thread_info *mti,
                         lcd->lcd_pre_versions[2] = pre_versions[2];
                         lcd->lcd_pre_versions[3] = pre_versions[3];
                 }
-                if (mti->mti_transno != 0)
+                if (mti->mti_transno != 0) {
+                        if (lcd->lcd_last_transno > mti->mti_transno) {
+                                LASSERT(req_is_replay(req));
+                                CERROR("Trying to overwrite bigger transno:"
+                                       "on-disk: "LPU64", new: "LPU64"\n",
+                                       lcd->lcd_last_transno,
+                                       mti->mti_transno);
+                                cfs_spin_lock(&req->rq_export->exp_lock);
+                                req->rq_export->exp_vbr_failed = 1;
+                                cfs_spin_unlock(&req->rq_export->exp_lock);
+                                cfs_mutex_up(&ted->ted_lcd_lock);
+                                RETURN(-EOVERFLOW);
+                        }
                         lcd->lcd_last_transno = mti->mti_transno;
+                }
                 lcd->lcd_last_xid = req->rq_xid;
                 lcd->lcd_last_result = rc;
                 /*XXX: save intent_disposition in mdt_thread_info?
@@ -799,14 +826,13 @@ static int mdt_txn_start_cb(const struct lu_env *env,
 }
 
 /* Set new object versions */
-static void mdt_versions_set(struct mdt_thread_info *info)
+static void mdt_version_set(struct mdt_thread_info *info)
 {
-        int i;
-        for (i = 0; i < PTLRPC_NUM_VERSIONS; i++)
-                if (info->mti_mos[i] != NULL)
-                        mo_version_set(info->mti_env,
-                                       mdt_object_child(info->mti_mos[i]),
-                                       info->mti_transno);
+        if (info->mti_mos != NULL) {
+                mo_version_set(info->mti_env, mdt_object_child(info->mti_mos),
+                               info->mti_transno);
+                info->mti_mos = NULL;
+        }
 }
 
 /* Update last_rcvd records with latests transaction data */
@@ -841,7 +867,7 @@ static int mdt_txn_stop_cb(const struct lu_env *env,
         cfs_spin_lock(&mdt->mdt_lut.lut_translock);
         if (txn->th_result != 0) {
                 if (mti->mti_transno != 0) {
-                        CERROR("Replay transno "LPU64" failed: rc %i\n",
+                        CERROR("Replay transno "LPU64" failed: rc %d\n",
                                mti->mti_transno, txn->th_result);
                 }
         } else if (mti->mti_transno == 0) {
@@ -857,7 +883,7 @@ static int mdt_txn_stop_cb(const struct lu_env *env,
 
         /** VBR: set new versions */
         if (txn->th_result == 0)
-                mdt_versions_set(mti);
+                mdt_version_set(mti);
 
         /* filling reply data */
         CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
@@ -978,7 +1004,7 @@ static void mdt_steal_ack_locks(struct ptlrpc_request *req)
                                 oldrep->rs_opc);
 
                 svc = oldrep->rs_service;
-                cfs_spin_lock (&svc->srv_lock);
+                cfs_spin_lock (&svc->srv_rs_lock);
 
                 cfs_list_del_init (&oldrep->rs_exp_list);
 
@@ -998,7 +1024,7 @@ static void mdt_steal_ack_locks(struct ptlrpc_request *req)
                 ptlrpc_schedule_difficult_reply (oldrep);
                 cfs_spin_unlock(&oldrep->rs_lock);
 
-                cfs_spin_unlock (&svc->srv_lock);
+                cfs_spin_unlock (&svc->srv_rs_lock);
                 break;
         }
         cfs_spin_unlock(&exp->exp_lock);