Whamcloud - gitweb
Branch HEAD
authortappro <tappro>
Wed, 15 Jul 2009 11:42:33 +0000 (11:42 +0000)
committertappro <tappro>
Wed, 15 Jul 2009 11:42:33 +0000 (11:42 +0000)
b=19844
i=rread
i=zam

- fix the bug with timeout due to missed clients during recovery, introduce
  mechanism to track such events and re-charge timer.
- fix incorrect handling of some counters in class_unlink_export
- correct procfs output information about recovery
- add regression tests

lustre/include/lustre_export.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/ldlm/ldlm_lib.c
lustre/mdt/mdt_handler.c
lustre/obdclass/genops.c
lustre/obdclass/lprocfs_status.c
lustre/obdfilter/filter.c

index 5c328f7..7ff4284 100644 (file)
@@ -146,6 +146,7 @@ struct obd_export {
         atomic_t                  exp_rpc_count;
         atomic_t                  exp_cb_count;
         atomic_t                  exp_locks_count;
+        atomic_t                  exp_replay_count;
         struct obd_uuid           exp_client_uuid;
         struct list_head          exp_obd_chain;
         struct hlist_node         exp_uuid_hash; /* uuid-export hash*/
index 79b7391..dbfd961 100644 (file)
@@ -1025,6 +1025,7 @@ struct obd_device {
                       obd_recovering:1,    /* there are recoverable clients */
                       obd_abort_recovery:1,/* recovery expired */
                       obd_version_recov:1, /* obd uses version checking */
+                      obd_recovery_expired:1,
                       obd_replayable:1,    /* recovery is enabled; inform clients */
                       obd_no_transno:1,    /* no committed-transno notification */
                       obd_no_recov:1,      /* fail instead of retry messages */
@@ -1077,7 +1078,6 @@ struct obd_device {
 
         int                              obd_max_recoverable_clients;
         int                              obd_connected_clients;
-        int                              obd_recoverable_clients;
         int                              obd_stale_clients;
         int                              obd_delayed_clients;
         spinlock_t                       obd_processing_task_lock; /* BH lock (timer) */
index c296b1d..a9e36ba 100644 (file)
@@ -210,9 +210,7 @@ void class_fail_export(struct obd_export *exp);
 void class_disconnect_exports(struct obd_device *obddev);
 int class_manual_cleanup(struct obd_device *obd);
 void class_disconnect_stale_exports(struct obd_device *,
-                                    int (*test_export)(struct obd_export *),
-                                    enum obd_option flags);
-  
+                                    int (*test_export)(struct obd_export *));
 static inline enum obd_option exp_flags_from_obd(struct obd_device *obd)
 {
         return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
index f45f7e8..a55a9b0 100644 (file)
@@ -859,7 +859,7 @@ no_export:
                                "%d clients in recovery for "CFS_TIME_T"s\n",
                                target->obd_name,
                                libcfs_nid2str(req->rq_peer.nid), cluuid.uuid,
-                               target->obd_recoverable_clients,
+                               atomic_read(&target->obd_lock_replay_clients),
                                cfs_duration_sec(t));
                         rc = -EBUSY;
                 } else {
@@ -956,8 +956,6 @@ dont_check_exports:
                      && data->ocd_transno < target->obd_next_recovery_transno)
                         target->obd_next_recovery_transno = data->ocd_transno;
                 target->obd_connected_clients++;
-                /* each connected client is counted as recoverable */
-                target->obd_recoverable_clients++;
                 atomic_inc(&target->obd_req_replay_clients);
                 atomic_inc(&target->obd_lock_replay_clients);
                 if (target->obd_connected_clients ==
@@ -1057,10 +1055,10 @@ void target_destroy_export(struct obd_export *exp)
         if (exp->exp_imp_reverse != NULL)
                 client_destroy_import(exp->exp_imp_reverse);
 
-        /* We cancel locks at disconnect time, but this will catch any locks
-         * granted in a race with recovery-induced disconnect. */
-        if (exp->exp_obd->obd_namespace != NULL)
-                ldlm_cancel_locks_for_export(exp);
+        LASSERT(atomic_read(&exp->exp_locks_count) == 0);
+        LASSERT(atomic_read(&exp->exp_rpc_count) == 0);
+        LASSERT(atomic_read(&exp->exp_cb_count) == 0);
+        LASSERT(atomic_read(&exp->exp_replay_count) == 0);
 }
 
 /*
@@ -1112,7 +1110,8 @@ struct ptlrpc_request *ptlrpc_clone_req( struct ptlrpc_request *orig_req)
                 orig_req->rq_repmsg = NULL;
                 orig_req->rq_replen = 0;
         }
-
+        /** let export know it has replays to be handled */
+        atomic_inc(&copy_req->rq_export->exp_replay_count);
         return copy_req;
 }
 
@@ -1122,6 +1121,8 @@ void ptlrpc_free_clone(struct ptlrpc_request *req)
 
         ptlrpc_req_drop_rs(req);
         sptlrpc_svc_ctx_decref(req);
+        LASSERT(atomic_read(&req->rq_export->exp_replay_count) > 0);
+        atomic_dec(&req->rq_export->exp_replay_count);
         class_export_rpc_put(req->rq_export);
         list_del_init(&req->rq_list);
 
@@ -1407,10 +1408,53 @@ target_start_and_reset_recovery_timer(struct obd_device *obd,
 }
 
 #ifdef __KERNEL__
+
+/** Health checking routines */
+static inline int exp_connect_healthy(struct obd_export *exp)
+{
+        return (exp->exp_in_recovery);
+}
+
+/** if export done req_replay or has replay in queue */
+static inline int exp_req_replay_healthy(struct obd_export *exp)
+{
+        return (!exp->exp_req_replay_needed ||
+                atomic_read(&exp->exp_replay_count) > 0);
+}
+/** if export done lock_replay or has replay in queue */
+static inline int exp_lock_replay_healthy(struct obd_export *exp)
+{
+        return (!exp->exp_lock_replay_needed ||
+                atomic_read(&exp->exp_replay_count) > 0);
+}
+
+static inline int exp_vbr_healthy(struct obd_export *exp)
+{
+        return (!exp->exp_vbr_failed);
+}
+
+static inline int exp_finished(struct obd_export *exp)
+{
+        return (exp->exp_in_recovery && !exp->exp_lock_replay_needed);
+}
+
+/** Checking routines for recovery */
+static int check_for_clients(struct obd_device *obd)
+{
+        if (obd->obd_abort_recovery || obd->obd_recovery_expired)
+                return 1;
+        LASSERT(obd->obd_connected_clients <= obd->obd_max_recoverable_clients);
+        if (obd->obd_no_conn == 0 &&
+            obd->obd_connected_clients + obd->obd_stale_clients ==
+            obd->obd_max_recoverable_clients)
+                return 1;
+        return 0;
+}
+
 static int check_for_next_transno(struct obd_device *obd)
 {
         struct ptlrpc_request *req = NULL;
-        int wake_up = 0, connected, completed, queue_len, max;
+        int wake_up = 0, connected, completed, queue_len;
         __u64 next_transno, req_transno;
         ENTRY;
         spin_lock_bh(&obd->obd_processing_task_lock);
@@ -1423,37 +1467,44 @@ static int check_for_next_transno(struct obd_device *obd)
                 req_transno = 0;
         }
 
-        max = obd->obd_max_recoverable_clients;
         connected = obd->obd_connected_clients;
-        completed = connected - obd->obd_recoverable_clients;
+        completed = connected - atomic_read(&obd->obd_req_replay_clients);
         queue_len = obd->obd_requests_queued_for_recovery;
         next_transno = obd->obd_next_recovery_transno;
 
         CDEBUG(D_HA, "max: %d, connected: %d, completed: %d, queue_len: %d, "
                "req_transno: "LPU64", next_transno: "LPU64"\n",
-               max, connected, completed, queue_len, req_transno, next_transno);
+               obd->obd_max_recoverable_clients, connected, completed,
+               queue_len, req_transno, next_transno);
 
         if (obd->obd_abort_recovery) {
                 CDEBUG(D_HA, "waking for aborted recovery\n");
                 wake_up = 1;
+        } else if (obd->obd_recovery_expired) {
+                CDEBUG(D_HA, "waking for expired recovery\n");
+                wake_up = 1;
         } else if (atomic_read(&obd->obd_req_replay_clients) == 0) {
                 CDEBUG(D_HA, "waking for completed recovery\n");
                 wake_up = 1;
         } else if (req_transno == next_transno) {
                 CDEBUG(D_HA, "waking for next ("LPD64")\n", next_transno);
                 wake_up = 1;
-        } else if (queue_len + completed == max) {
-                /* handle gaps occured due to lost reply. It is allowed gaps
-                 * because all clients are connected and there will be resend
-                 * for missed transaction */
+        } else if (queue_len == atomic_read(&obd->obd_req_replay_clients)) {
+                int d_lvl = D_HA;
+                /** handle gaps occured due to lost reply or VBR */
                 LASSERTF(req_transno >= next_transno,
                          "req_transno: "LPU64", next_transno: "LPU64"\n",
                          req_transno, next_transno);
-
-                CDEBUG(req_transno > obd->obd_last_committed ? D_ERROR : D_HA,
-                       "waking for skipped transno (skip: "LPD64
-                       ", ql: %d, comp: %d, conn: %d, next: "LPD64")\n",
-                       next_transno, queue_len, completed, connected, req_transno);
+                if (req_transno > obd->obd_last_committed &&
+                    !obd->obd_version_recov)
+                        d_lvl = D_ERROR;
+                CDEBUG(d_lvl,
+                       "%s: waking for gap in transno, VBR is %s (skip: "
+                       LPD64", ql: %d, comp: %d, conn: %d, next: "LPD64
+                       ", last_committed: "LPD64")\n",
+                       obd->obd_name, obd->obd_version_recov ? "ON" : "OFF",
+                       next_transno, queue_len, completed, connected,
+                       req_transno, obd->obd_last_committed);
                 obd->obd_next_recovery_transno = req_transno;
                 wake_up = 1;
         } else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_RECOVERY_ACCEPTS_GAPS)) {
@@ -1461,64 +1512,17 @@ static int check_for_next_transno(struct obd_device *obd)
                        " by fail_lock, waking up ("LPD64")\n", next_transno);
                 obd->obd_next_recovery_transno = req_transno;
                 wake_up = 1;
-        } else if (queue_len == atomic_read(&obd->obd_req_replay_clients)) {
-                /* some clients haven't connected in time, but we can try
-                 * to replay requests that demand on already committed ones
-                 * also, we can replay first non-committed transation */
-                LASSERT(req_transno != 0);
-                if (obd->obd_version_recov ||
-                    req_transno == obd->obd_last_committed + 1) {
-                        obd->obd_next_recovery_transno = req_transno;
-                } else if (req_transno > obd->obd_last_committed) {
-                        /* can't continue recovery: have no needed transno */
-                        obd->obd_abort_recovery = 1;
-                        CDEBUG(D_ERROR, "abort due to missed clients. max: %d, "
-                               "connected: %d, completed: %d, queue_len: %d, "
-                               "req_transno: "LPU64", next_transno: "LPU64"\n",
-                               max, connected, completed, queue_len,
-                               req_transno, next_transno);
-                }
-                wake_up = 1;
         }
-
         spin_unlock_bh(&obd->obd_processing_task_lock);
         return wake_up;
 }
 
-static struct ptlrpc_request *target_next_replay_req(struct obd_device *obd)
-{
-        struct l_wait_info lwi = { 0 };
-        struct ptlrpc_request *req;
-
-        CDEBUG(D_HA, "Waiting for transno "LPD64"\n",
-               obd->obd_next_recovery_transno);
-        l_wait_event(obd->obd_next_transno_waitq,
-                     check_for_next_transno(obd), &lwi);
-
-        spin_lock_bh(&obd->obd_processing_task_lock);
-        if (obd->obd_abort_recovery) {
-                req = NULL;
-        } else if (!list_empty(&obd->obd_req_replay_queue)) {
-                req = list_entry(obd->obd_req_replay_queue.next,
-                                 struct ptlrpc_request, rq_list);
-                list_del_init(&req->rq_list);
-                obd->obd_requests_queued_for_recovery--;
-        } else {
-                req = NULL;
-        }
-        spin_unlock_bh(&obd->obd_processing_task_lock);
-        RETURN(req);
-}
-
 static int check_for_next_lock(struct obd_device *obd)
 {
-        struct ptlrpc_request *req = NULL;
         int wake_up = 0;
 
         spin_lock_bh(&obd->obd_processing_task_lock);
         if (!list_empty(&obd->obd_lock_replay_queue)) {
-                req = list_entry(obd->obd_lock_replay_queue.next,
-                                 struct ptlrpc_request, rq_list);
                 CDEBUG(D_HA, "waking for next lock\n");
                 wake_up = 1;
         } else if (atomic_read(&obd->obd_lock_replay_clients) == 0) {
@@ -1527,38 +1531,117 @@ static int check_for_next_lock(struct obd_device *obd)
         } else if (obd->obd_abort_recovery) {
                 CDEBUG(D_HA, "waking for aborted recovery\n");
                 wake_up = 1;
+        } else if (obd->obd_recovery_expired) {
+                CDEBUG(D_HA, "waking for expired recovery\n");
+                wake_up = 1;
         }
         spin_unlock_bh(&obd->obd_processing_task_lock);
 
         return wake_up;
 }
 
+/**
+ * wait for recovery events,
+ * check its status with help of check_routine
+ * evict dead clients via health_check
+ */
+static int target_recovery_overseer(struct obd_device *obd,
+                                    int (*check_routine)(struct obd_device *),
+                                    int (*health_check)(struct obd_export *))
+{
+        int abort = 0, expired = 1;
+
+        do {
+                cfs_wait_event(obd->obd_next_transno_waitq, check_routine(obd));
+                spin_lock_bh(&obd->obd_processing_task_lock);
+                abort = obd->obd_abort_recovery;
+                expired = obd->obd_recovery_expired;
+                obd->obd_recovery_expired = 0;
+                spin_unlock_bh(&obd->obd_processing_task_lock);
+                if (abort) {
+                        CWARN("recovery is aborted, evict exports in recovery\n");
+                        /** evict exports which didn't finish recovery yet */
+                        class_disconnect_stale_exports(obd, exp_finished);
+                } else if (expired) {
+                        /** If some clients died being recovered, evict them */
+                        CDEBUG(D_WARNING, "recovery is timed out, evict stale exports\n");
+                        /** evict cexports with no replay in queue, they are stalled */
+                        class_disconnect_stale_exports(obd, health_check);
+                        /** continue with VBR */
+                        spin_lock_bh(&obd->obd_processing_task_lock);
+                        obd->obd_version_recov = 1;
+                        spin_unlock_bh(&obd->obd_processing_task_lock);
+                        /**
+                         * reset timer, recovery will proceed with versions now,
+                         * timeout is set just to handle reconnection delays
+                         */
+                        reset_recovery_timer(obd, RECONNECT_DELAY_MAX * 2, 1);
+                        /** Wait for recovery events again, after evicting bad clients */
+                }
+        } while (!abort && expired);
+
+        return abort;
+}
+
+static struct ptlrpc_request *target_next_replay_req(struct obd_device *obd)
+{
+        struct ptlrpc_request *req = NULL;
+        ENTRY;
+
+        CDEBUG(D_HA, "Waiting for transno "LPD64"\n",
+               obd->obd_next_recovery_transno);
+
+        if (target_recovery_overseer(obd, check_for_next_transno,
+                                     exp_req_replay_healthy)) {
+                abort_req_replay_queue(obd);
+                abort_lock_replay_queue(obd);
+        }
+
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        if (!list_empty(&obd->obd_req_replay_queue)) {
+                req = list_entry(obd->obd_req_replay_queue.next,
+                                 struct ptlrpc_request, rq_list);
+                list_del_init(&req->rq_list);
+                obd->obd_requests_queued_for_recovery--;
+                spin_unlock_bh(&obd->obd_processing_task_lock);
+        } else {
+                spin_unlock_bh(&obd->obd_processing_task_lock);
+                LASSERT(list_empty(&obd->obd_req_replay_queue));
+                LASSERT(atomic_read(&obd->obd_req_replay_clients) == 0);
+                /** evict exports failed VBR */
+                class_disconnect_stale_exports(obd, exp_vbr_healthy);
+        }
+        RETURN(req);
+}
+
 static struct ptlrpc_request *target_next_replay_lock(struct obd_device *obd)
 {
-        struct l_wait_info lwi = { 0 };
-        struct ptlrpc_request *req;
+        struct ptlrpc_request *req = NULL;
 
         CDEBUG(D_HA, "Waiting for lock\n");
-        l_wait_event(obd->obd_next_transno_waitq,
-                     check_for_next_lock(obd), &lwi);
+        if (target_recovery_overseer(obd, check_for_next_lock,
+                                     exp_lock_replay_healthy))
+                abort_lock_replay_queue(obd);
 
         spin_lock_bh(&obd->obd_processing_task_lock);
-        if (obd->obd_abort_recovery) {
-                req = NULL;
-        } else if (!list_empty(&obd->obd_lock_replay_queue)) {
+        if (!list_empty(&obd->obd_lock_replay_queue)) {
                 req = list_entry(obd->obd_lock_replay_queue.next,
                                  struct ptlrpc_request, rq_list);
                 list_del_init(&req->rq_list);
+                spin_unlock_bh(&obd->obd_processing_task_lock);
         } else {
-                req = NULL;
+                spin_unlock_bh(&obd->obd_processing_task_lock);
+                LASSERT(list_empty(&obd->obd_lock_replay_queue));
+                LASSERT(atomic_read(&obd->obd_lock_replay_clients) == 0);
+                /** evict exports failed VBR */
+                class_disconnect_stale_exports(obd, exp_vbr_healthy);
         }
-        spin_unlock_bh(&obd->obd_processing_task_lock);
         return req;
 }
 
 static struct ptlrpc_request *target_next_final_ping(struct obd_device *obd)
 {
-        struct ptlrpc_request *req;
+        struct ptlrpc_request *req = NULL;
 
         spin_lock_bh(&obd->obd_processing_task_lock);
         if (!list_empty(&obd->obd_final_req_queue)) {
@@ -1570,44 +1653,11 @@ static struct ptlrpc_request *target_next_final_ping(struct obd_device *obd)
                         req->rq_export->exp_in_recovery = 0;
                         spin_unlock(&req->rq_export->exp_lock);
                 }
-        } else {
-                req = NULL;
         }
         spin_unlock_bh(&obd->obd_processing_task_lock);
         return req;
 }
 
-static inline int req_vbr_done(struct obd_export *exp)
-{
-        return (exp->exp_vbr_failed == 0);
-}
-
-static inline int req_replay_done(struct obd_export *exp)
-{
-        return (exp->exp_req_replay_needed == 0);
-}
-
-static inline int lock_replay_done(struct obd_export *exp)
-{
-        return (exp->exp_lock_replay_needed == 0);
-}
-
-static inline int connect_done(struct obd_export *exp)
-{
-        return (exp->exp_in_recovery != 0);
-}
-
-static int check_for_clients(struct obd_device *obd)
-{
-        if (obd->obd_abort_recovery || obd->obd_version_recov)
-                return 1;
-        LASSERT(obd->obd_connected_clients <= obd->obd_max_recoverable_clients);
-        if (obd->obd_no_conn == 0 &&
-            obd->obd_connected_clients == obd->obd_max_recoverable_clients)
-                return 1;
-        return 0;
-}
-
 static int handle_recovery_req(struct ptlrpc_thread *thread,
                                struct ptlrpc_request *req,
                                svc_handler_t handler)
@@ -1618,8 +1668,15 @@ static int handle_recovery_req(struct ptlrpc_thread *thread,
         rc = lu_context_init(&req->rq_session, LCT_SESSION);
         if (rc) {
                 CERROR("Failure to initialize session: %d\n", rc);
-                return rc;
+                GOTO(free_clone, rc);
         }
+        /**
+         * export can be evicted during recovery, no need to handle replays for
+         * it after that, discard such request silently
+         */
+        if (req->rq_export->exp_disconnected)
+                GOTO(free_clone, rc);
+
         req->rq_session.lc_thread = thread;
         lu_context_enter(&req->rq_session);
         req->rq_svc_thread = thread;
@@ -1633,8 +1690,7 @@ static int handle_recovery_req(struct ptlrpc_thread *thread,
         lu_context_exit(&req->rq_session);
         lu_context_fini(&req->rq_session);
         /* don't reset timer for final stage */
-        if (!req_replay_done(req->rq_export) ||
-            !lock_replay_done(req->rq_export))
+        if (!exp_finished(req->rq_export))
                 reset_recovery_timer(class_exp2obd(req->rq_export),
                                      AT_OFF ? obd_timeout :
                        at_get(&req->rq_rqbd->rqbd_service->srv_at_estimate), 1);
@@ -1643,12 +1699,13 @@ static int handle_recovery_req(struct ptlrpc_thread *thread,
          * bz18031: increase next_recovery_transno before ptlrpc_free_clone()
          * will drop exp_rpc reference
          */
-        if (!req_replay_done(req->rq_export)) {
+        if (req->rq_export->exp_req_replay_needed) {
                 spin_lock_bh(&req->rq_export->exp_obd->obd_processing_task_lock);
                 req->rq_export->exp_obd->obd_next_recovery_transno++;
                 spin_unlock_bh(&req->rq_export->exp_obd->obd_processing_task_lock);
                 target_exp_dequeue_req_replay(req);
         }
+free_clone:
         ptlrpc_free_clone(req);
         RETURN(0);
 }
@@ -1659,7 +1716,6 @@ static int target_recovery_thread(void *arg)
         struct obd_device *obd = lut->lut_obd;
         struct ptlrpc_request *req;
         struct target_recovery_data *trd = &obd->obd_recovery_data;
-        struct l_wait_info lwi = { 0 };
         unsigned long delta;
         unsigned long flags;
         struct lu_env env;
@@ -1689,25 +1745,10 @@ static int target_recovery_thread(void *arg)
         complete(&trd->trd_starting);
 
         /* first of all, we have to know the first transno to replay */
-        obd->obd_abort_recovery = 0;
-        l_wait_event(obd->obd_next_transno_waitq,
-                     check_for_clients(obd), &lwi);
-
-        /* If some clients haven't connected in time, evict them */
-        if (obd->obd_connected_clients < obd->obd_max_recoverable_clients) {
-                CWARN("Some clients haven't connect in time (%d/%d),"
-                       "evict them\n", obd->obd_connected_clients,
-                       obd->obd_max_recoverable_clients);
-                class_disconnect_stale_exports(obd, connect_done,
-                                               exp_flags_from_obd(obd) |
-                                               OBD_OPT_ABORT_RECOV);
-                /**
-                 * if recovery proceeds with versions then some clients may be
-                 * timed out waiting for others and trying to reconnect.
-                 * Extend timer for such reconnect cases.
-                 */
-                if (obd->obd_version_recov)
-                        reset_recovery_timer(obd, RECONNECT_DELAY_MAX * 2, 1);
+        if (target_recovery_overseer(obd, check_for_clients,
+                                     exp_connect_healthy)) {
+                abort_req_replay_queue(obd);
+                abort_lock_replay_queue(obd);
         }
 
         /* next stage: replay requests */
@@ -1726,17 +1767,9 @@ static int target_recovery_thread(void *arg)
                 obd->obd_replayed_requests++;
         }
 
-        /* If some clients haven't replayed requests in time, evict them */
-        if (obd->obd_abort_recovery) {
-                CDEBUG(D_WARNING, "req replay is aborted\n");
-                class_disconnect_stale_exports(obd, req_replay_done,
-                                               exp_flags_from_obd(obd) |
-                                               OBD_OPT_ABORT_RECOV);
-                abort_req_replay_queue(obd);
-        }
-        LASSERT(list_empty(&obd->obd_req_replay_queue));
-
-        /* The second stage: replay locks */
+        /**
+         * The second stage: replay locks
+         */
         CDEBUG(D_INFO, "2: lock replay stage - %d clients\n",
                atomic_read(&obd->obd_lock_replay_clients));
         while ((req = target_next_replay_lock(obd))) {
@@ -1748,22 +1781,11 @@ static int target_recovery_thread(void *arg)
                 obd->obd_replayed_locks++;
         }
 
-        /* If some clients haven't replayed requests in time, evict them */
-        if (obd->obd_abort_recovery) {
-                CERROR("lock replay is aborted\n");
-                class_disconnect_stale_exports(obd, lock_replay_done,
-                                               exp_flags_from_obd(obd) |
-                                               OBD_OPT_ABORT_RECOV);
-                abort_lock_replay_queue(obd);
-        }
-        LASSERT(list_empty(&obd->obd_lock_replay_queue));
-
-        /* The third stage: reply on final pings */
+        /**
+         * The third stage: reply on final pings, at this moment all clients
+         * must have request in final queue
+         */
         CDEBUG(D_INFO, "3: final stage - process recovery completion pings\n");
-        /** evict exports failed VBR */
-        class_disconnect_stale_exports(obd, req_vbr_done,
-                                       exp_flags_from_obd(obd) |
-                                       OBD_OPT_ABORT_RECOV);
         /** Update server last boot epoch */
         lut_boot_epoch_update(lut);
         /* We drop recoverying flag to forward all new requests
@@ -1783,8 +1805,6 @@ static int target_recovery_thread(void *arg)
         delta = (jiffies - delta) / HZ;
         CDEBUG(D_INFO,"4: recovery completed in %lus - %d/%d reqs/locks\n",
               delta, obd->obd_replayed_requests, obd->obd_replayed_locks);
-        LASSERT(atomic_read(&obd->obd_req_replay_clients) == 0);
-        LASSERT(atomic_read(&obd->obd_lock_replay_clients) == 0);
         if (delta > obd_timeout * OBD_RECOVERY_FACTOR) {
                 CWARN("too long recovery - read logs\n");
                 libcfs_debug_dumplog();
@@ -1845,17 +1865,14 @@ EXPORT_SYMBOL(target_recovery_fini);
 static void target_recovery_expired(unsigned long castmeharder)
 {
         struct obd_device *obd = (struct obd_device *)castmeharder;
-        CDEBUG(D_HA, "%s: recovery timed out; %d clients never reconnected "
-               "after %lds (%d clients did)\n",
-               obd->obd_name, obd->obd_recoverable_clients,
+        CDEBUG(D_HA, "%s: recovery timed out; %d clients are still in recovery"
+               " after %lds (%d clients connected)\n",
+               obd->obd_name, atomic_read(&obd->obd_lock_replay_clients),
                cfs_time_current_sec()- obd->obd_recovery_start,
                obd->obd_connected_clients);
 
         spin_lock_bh(&obd->obd_processing_task_lock);
-        obd->obd_version_recov = 1;
-        CDEBUG(D_INFO, "VBR is used for %d clients from t"LPU64"\n",
-               atomic_read(&obd->obd_req_replay_clients),
-               obd->obd_next_recovery_transno);
+        obd->obd_recovery_expired = 1;
         cfs_waitq_signal(&obd->obd_next_transno_waitq);
         spin_unlock_bh(&obd->obd_processing_task_lock);
 }
@@ -1900,10 +1917,6 @@ static int target_process_req_flags(struct obd_device *obd,
                         exp->exp_req_replay_needed = 0;
                         spin_unlock(&exp->exp_lock);
                         atomic_dec(&obd->obd_req_replay_clients);
-                        LASSERT(obd->obd_recoverable_clients > 0);
-                        obd->obd_recoverable_clients--;
-                        if (atomic_read(&obd->obd_req_replay_clients) == 0)
-                                CDEBUG(D_HA, "all clients have replayed reqs\n");
                 }
                 spin_unlock_bh(&obd->obd_processing_task_lock);
         }
@@ -1917,8 +1930,6 @@ static int target_process_req_flags(struct obd_device *obd,
                         exp->exp_lock_replay_needed = 0;
                         spin_unlock(&exp->exp_lock);
                         atomic_dec(&obd->obd_lock_replay_clients);
-                        if (atomic_read(&obd->obd_lock_replay_clients) == 0)
-                                CDEBUG(D_HA, "all clients have replayed locks\n");
                 }
                 spin_unlock_bh(&obd->obd_processing_task_lock);
         }
@@ -2071,17 +2082,6 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
         RETURN(0);
 }
 
-struct obd_device * target_req2obd(struct ptlrpc_request *req)
-{
-        return req->rq_export->exp_obd;
-}
-
-static inline struct ldlm_pool *ldlm_exp2pl(struct obd_export *exp)
-{
-        LASSERT(exp != NULL);
-        return &exp->exp_obd->obd_namespace->ns_pool;
-}
-
 /**
  * Packs current SLV and Limit into \a req.
  */
index dafe372..3b77972 100644 (file)
@@ -4343,7 +4343,8 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
         m->mdt_identity_cache = NULL;
 
         if (m->mdt_namespace != NULL) {
-                ldlm_namespace_free(m->mdt_namespace, NULL, d->ld_obd->obd_force);
+                ldlm_namespace_free(m->mdt_namespace, NULL,
+                                    d->ld_obd->obd_force);
                 d->ld_obd->obd_namespace = m->mdt_namespace = NULL;
         }
 
index f4099e0..e9f337b 100644 (file)
@@ -62,6 +62,7 @@ spinlock_t        obd_zombie_impexp_lock;
 static void obd_zombie_impexp_notify(void);
 static void obd_zombie_export_add(struct obd_export *exp);
 static void obd_zombie_import_add(struct obd_import *imp);
+static void print_export_data(struct obd_export *exp, const char *status);
 
 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
 
@@ -783,6 +784,7 @@ struct obd_export *class_new_export(struct obd_device *obd,
         atomic_set(&export->exp_rpc_count, 0);
         atomic_set(&export->exp_cb_count, 0);
         atomic_set(&export->exp_locks_count, 0);
+        atomic_set(&export->exp_replay_count, 0);
         export->exp_obd = obd;
         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
         spin_lock_init(&export->exp_uncommitted_replies_lock);
@@ -842,16 +844,6 @@ void class_unlink_export(struct obd_export *exp)
         list_del_init(&exp->exp_obd_chain_timed);
         exp->exp_obd->obd_num_exports--;
         spin_unlock(&exp->exp_obd->obd_dev_lock);
-
-        /* Keep these counter valid always */
-        spin_lock_bh(&exp->exp_obd->obd_processing_task_lock);
-        if (exp->exp_delayed)
-                exp->exp_obd->obd_delayed_clients--;
-        else if (exp->exp_in_recovery)
-                exp->exp_obd->obd_recoverable_clients--;
-        else if (exp->exp_obd->obd_recovering)
-                exp->exp_obd->obd_max_recoverable_clients--;
-        spin_unlock_bh(&exp->exp_obd->obd_processing_task_lock);
         class_export_put(exp);
 }
 EXPORT_SYMBOL(class_unlink_export);
@@ -1017,27 +1009,30 @@ void class_export_recovery_cleanup(struct obd_export *exp)
         struct obd_device *obd = exp->exp_obd;
 
         spin_lock_bh(&obd->obd_processing_task_lock);
+        if (exp->exp_delayed)
+                obd->obd_delayed_clients--;
         if (obd->obd_recovering && exp->exp_in_recovery) {
                 spin_lock(&exp->exp_lock);
                 exp->exp_in_recovery = 0;
                 spin_unlock(&exp->exp_lock);
+                LASSERT(obd->obd_connected_clients);
                 obd->obd_connected_clients--;
-                /* each connected client is counted as recoverable */
-                obd->obd_recoverable_clients--;
-                if (exp->exp_req_replay_needed) {
-                        spin_lock(&exp->exp_lock);
-                        exp->exp_req_replay_needed = 0;
-                        spin_unlock(&exp->exp_lock);
-                        LASSERT(atomic_read(&obd->obd_req_replay_clients));
-                        atomic_dec(&obd->obd_req_replay_clients);
-                }
-                if (exp->exp_lock_replay_needed) {
-                        spin_lock(&exp->exp_lock);
-                        exp->exp_lock_replay_needed = 0;
-                        spin_unlock(&exp->exp_lock);
-                        LASSERT(atomic_read(&obd->obd_lock_replay_clients));
-                        atomic_dec(&obd->obd_lock_replay_clients);
-                }
+        }
+        /** Cleanup req replay fields */
+        if (exp->exp_req_replay_needed) {
+                spin_lock(&exp->exp_lock);
+                exp->exp_req_replay_needed = 0;
+                spin_unlock(&exp->exp_lock);
+                LASSERT(atomic_read(&obd->obd_req_replay_clients));
+                atomic_dec(&obd->obd_req_replay_clients);
+        }
+        /** Cleanup lock replay data */
+        if (exp->exp_lock_replay_needed) {
+                spin_lock(&exp->exp_lock);
+                exp->exp_lock_replay_needed = 0;
+                spin_unlock(&exp->exp_lock);
+                LASSERT(atomic_read(&obd->obd_lock_replay_clients));
+                atomic_dec(&obd->obd_lock_replay_clients);
         }
         spin_unlock_bh(&obd->obd_processing_task_lock);
 }
@@ -1159,40 +1154,43 @@ EXPORT_SYMBOL(class_disconnect_exports);
 /* Remove exports that have not completed recovery.
  */
 void class_disconnect_stale_exports(struct obd_device *obd,
-                                    int (*test_export)(struct obd_export *),
-                                    enum obd_option flags)
+                                    int (*test_export)(struct obd_export *))
 {
         struct list_head work_list;
         struct list_head *pos, *n;
         struct obd_export *exp;
+        int evicted = 0;
         ENTRY;
 
         CFS_INIT_LIST_HEAD(&work_list);
         spin_lock(&obd->obd_dev_lock);
-        obd->obd_stale_clients = 0;
         list_for_each_safe(pos, n, &obd->obd_exports) {
                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
                 if (test_export(exp))
                         continue;
 
-                list_move(&exp->exp_obd_chain, &work_list);
                 /* don't count self-export as client */
                 if (obd_uuid_equals(&exp->exp_client_uuid,
-                                     &exp->exp_obd->obd_uuid))
+                                    &exp->exp_obd->obd_uuid))
                         continue;
 
-                obd->obd_stale_clients++;
+                list_move(&exp->exp_obd_chain, &work_list);
+                evicted++;
                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
                        obd->obd_name, exp->exp_client_uuid.uuid,
                        exp->exp_connection == NULL ? "<unknown>" :
                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
+                print_export_data(exp, "EVICTING");
         }
         spin_unlock(&obd->obd_dev_lock);
 
-        CDEBUG(D_HA, "%s: disconnecting %d stale clients\n", obd->obd_name,
-               obd->obd_stale_clients);
-
-        class_disconnect_export_list(&work_list, flags);
+        if (evicted) {
+                CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
+                       obd->obd_name, evicted);
+                obd->obd_stale_clients += evicted;
+        }
+        class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
+                                                 OBD_OPT_ABORT_RECOV);
         EXIT;
 }
 EXPORT_SYMBOL(class_disconnect_stale_exports);
index 8885773..6c5e052 100644 (file)
@@ -2141,7 +2141,7 @@ int lprocfs_obd_rd_recovery_status(char *page, char **start, off_t off,
                 if (lprocfs_obd_snprintf(&page, size, &len,
                                          "completed_clients: %d/%d\n",
                                          obd->obd_max_recoverable_clients -
-                                         obd->obd_recoverable_clients,
+                                         obd->obd_stale_clients,
                                          obd->obd_max_recoverable_clients) <= 0)
                         goto out;
                 if (lprocfs_obd_snprintf(&page, size, &len,
@@ -2152,6 +2152,9 @@ int lprocfs_obd_rd_recovery_status(char *page, char **start, off_t off,
                                          "last_transno: "LPD64"\n",
                                          obd->obd_next_recovery_transno - 1)<=0)
                         goto out;
+                if (lprocfs_obd_snprintf(&page, size, &len, "VBR: %s\n",
+                                         obd->obd_version_recov ? "ON" : "OFF")<=0)
+                        goto out;
                 goto fclose;
         }
 
@@ -2169,12 +2172,20 @@ int lprocfs_obd_rd_recovery_status(char *page, char **start, off_t off,
                                  obd->obd_max_recoverable_clients) <= 0)
                 goto out;
         /* Number of clients that have completed recovery */
-        if (lprocfs_obd_snprintf(&page, size, &len,"completed_clients: %d/%d\n",
-                                 obd->obd_max_recoverable_clients -
-                                 obd->obd_recoverable_clients,
-                                 obd->obd_max_recoverable_clients) <= 0)
+        if (lprocfs_obd_snprintf(&page, size, &len,"req_replay_clients: %d\n",
+                                 atomic_read(&obd->obd_req_replay_clients))<= 0)
+                goto out;
+        if (lprocfs_obd_snprintf(&page, size, &len,"lock_repay_clients: %d\n",
+                                 atomic_read(&obd->obd_lock_replay_clients))<=0)
+                goto out;
+        if (lprocfs_obd_snprintf(&page, size, &len,"completed_clients: %d\n",
+                                 obd->obd_connected_clients -
+                                 atomic_read(&obd->obd_lock_replay_clients))<=0)
+                goto out;
+        if (lprocfs_obd_snprintf(&page, size, &len,"evicted_clients: %d\n",
+                                 obd->obd_stale_clients) <= 0)
                 goto out;
-        if (lprocfs_obd_snprintf(&page, size, &len,"replayed_requests: %d/??\n",
+        if (lprocfs_obd_snprintf(&page, size, &len,"replayed_requests: %d\n",
                                  obd->obd_replayed_requests) <= 0)
                 goto out;
         if (lprocfs_obd_snprintf(&page, size, &len, "queued_requests: %d\n",
index e3987c5..7fabcb4 100644 (file)
@@ -909,9 +909,6 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
                 OBD_FREE_PTR(lcd);
 
         obd->obd_last_committed = le64_to_cpu(fsd->lsd_last_transno);
-
-        target_recovery_init(&filter->fo_lut, ost_handle);
-
 out:
         filter->fo_mount_count = mount_count + 1;
         fsd->lsd_mount_count = cpu_to_le64(filter->fo_mount_count);
@@ -924,7 +921,7 @@ out:
         RETURN(0);
 
 err_client:
-        target_recovery_fini(obd);
+        class_disconnect_exports(obd);
 err_fsd:
         filter_free_server_data(filter);
         RETURN(rc);
@@ -1370,6 +1367,9 @@ static int filter_prep(struct obd_device *obd)
                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
                 GOTO(err_filp, rc);
         }
+
+        target_recovery_init(&filter->fo_lut, ost_handle);
+
         /* open and create health check io file*/
         file = filp_open(HEALTH_CHECK, O_RDWR | O_CREAT, 0644);
         if (IS_ERR(file)) {