Whamcloud - gitweb
- minor debug messages fixes
[fs/lustre-release.git] / lustre / ldlm / ldlm_lib.c
index dcedbbb..38d9feb 100644 (file)
@@ -666,9 +666,10 @@ int target_handle_connect(struct ptlrpc_request *req)
 
         /* Tell the client if we're in recovery. */
         /* If this is the first client, start the recovery timer */
-        CWARN("%s: connection from %s@%s/%lu %s\n", target->obd_name, cluuid.uuid,
+        CWARN("%s: connection from %s@%s/%lu %st"LPU64"\n", target->obd_name, cluuid.uuid,
               ptlrpc_peernid2str(&req->rq_peer, peer_str), *cfp,
-              target->obd_recovering ? "(recovering)" : "");
+              target->obd_recovering ? "recovering/" : "",
+              conn_data->transno);
 
         if (target->obd_recovering) {
                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
@@ -767,8 +768,18 @@ int target_handle_connect(struct ptlrpc_request *req)
                 GOTO(out, rc = 0);
         }
 
-        if (target->obd_recovering)
+        spin_lock_bh(&target->obd_processing_task_lock);
+        if (target->obd_recovering && export->exp_connected == 0) {
+                __u64 t = conn_data->transno;
+                export->exp_connected = 1;
+                if ((lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_TRANSNO)
+                                && t < target->obd_next_recovery_transno)
+                        target->obd_next_recovery_transno = t;
                 target->obd_connected_clients++;
+                if (target->obd_connected_clients == target->obd_max_recoverable_clients)
+                        wake_up(&target->obd_next_transno_waitq);
+        }
+        spin_unlock_bh(&target->obd_processing_task_lock);
 
         memcpy(&conn, lustre_msg_buf(req->rq_reqmsg, offset + 2, sizeof(conn)),
                sizeof(conn));
@@ -1094,7 +1105,7 @@ static int check_for_next_transno(struct obd_device *obd)
 
         max = obd->obd_max_recoverable_clients;
         connected = obd->obd_connected_clients;
-        completed = max - atomic_read(&obd->obd_req_replay_clients);
+        completed = max - obd->obd_recoverable_clients;
         queue_len = obd->obd_requests_queued_for_recovery;
         next_transno = obd->obd_next_recovery_transno;
 
@@ -1112,12 +1123,19 @@ static int check_for_next_transno(struct obd_device *obd)
                 wake_up = 1;
         } else if (queue_len + completed == max) {
                 LASSERT(req->rq_reqmsg->transno >= next_transno);
-                CDEBUG(D_ERROR,
+                CDEBUG(req_transno > obd->obd_last_committed ? D_ERROR : D_HA,
                        "waking for skipped transno (skip: "LPD64
                        ", ql: %d, comp: %d, conn: %d, next: "LPD64")\n",
                        next_transno, queue_len, completed, max, req_transno);
                 obd->obd_next_recovery_transno = req_transno;
                 wake_up = 1;
+        } else if (queue_len == atomic_read(&obd->obd_req_replay_clients)) {
+                /* some clients haven't connected in time, but we need
+                 * their requests to continue recovery. so, we abort ... */
+                CDEBUG(D_ERROR, "abort due to missed clients: queue: %d max: %d\n",
+                       queue_len, max);
+                obd->obd_abort_recovery = 1;
+                wake_up = 1;
         }
         spin_unlock_bh(&obd->obd_processing_task_lock);
         
@@ -1228,12 +1246,30 @@ static int lock_replay_done(struct obd_export *exp)
         return 1;
 }
 
+static int connect_done(struct obd_export *exp)
+{
+        if (exp->exp_connected)
+                return 1;
+        return 0;
+}
+
+static int check_for_clients(struct obd_device *obd)
+{
+        if (obd->obd_abort_recovery)
+                return 1;
+        LASSERT(obd->obd_connected_clients <= obd->obd_max_recoverable_clients);
+        if (obd->obd_connected_clients == obd->obd_max_recoverable_clients)
+                return 1;
+        return 0;
+}
+
 static int target_recovery_thread(void *arg)
 {
         struct obd_device *obd = arg;
         struct ptlrpc_request *req;
         struct target_recovery_data *trd = &obd->obd_recovery_data;
         char peer_str[PTL_NALFMT_SIZE];
+        struct l_wait_info lwi = { 0 };
         unsigned long flags;
         ENTRY;
 
@@ -1251,9 +1287,29 @@ static int target_recovery_thread(void *arg)
         obd->obd_recovering = 1;
         complete(&trd->trd_starting);
 
-        /* The first stage: replay requests */
-        CWARN("1: request replay stage - %d clients\n",
-              atomic_read(&obd->obd_req_replay_clients));
+        /* first of all, we have to know the first transno to replay */
+        obd->obd_abort_recovery = 0;
+        l_wait_event(obd->obd_next_transno_waitq,
+                     check_for_clients(obd), &lwi);
+        
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        target_cancel_recovery_timer(obd);
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+
+        /* If some clients haven't connected in time, evict them */
+        if (obd->obd_abort_recovery) {
+                int stale;
+                CERROR("some clients haven't connect in time, evict them ...\n");
+                obd->obd_abort_recovery = 0;
+                stale = class_disconnect_stale_exports(obd, connect_done, 0);
+                atomic_sub(stale, &obd->obd_req_replay_clients);
+                atomic_sub(stale, &obd->obd_lock_replay_clients);
+        }
+
+        /* next stage: replay requests */
+        CWARN("1: request replay stage - %d clients from t"LPU64"\n",
+              atomic_read(&obd->obd_req_replay_clients),
+              obd->obd_next_recovery_transno);
         while ((req = target_next_replay_req(obd))) {
                 LASSERT(trd->trd_processing_task == current->pid);
                 DEBUG_REQ(D_HA, req, "processing t"LPD64" from %s: ", 
@@ -1345,9 +1401,10 @@ int target_start_recovery_thread(struct obd_device *obd, svc_handler_t handler)
         init_completion(&trd->trd_finishing);
         trd->trd_recovery_handler = handler;
 
-        if (kernel_thread(target_recovery_thread, obd, 0) == 0) 
+        if (kernel_thread(target_recovery_thread, obd, 0) > 0) {
                 wait_for_completion(&trd->trd_starting);
-        else
+                LASSERT(obd->obd_recovering != 0);
+        } else
                 rc = -ECHILD;
 
         return rc;
@@ -1380,6 +1437,7 @@ int target_process_req_flags(struct obd_device *obd, struct ptlrpc_request *req)
                         LASSERT(atomic_read(&obd->obd_req_replay_clients) > 0);
                         exp->exp_req_replay_needed = 0;
                         atomic_dec(&obd->obd_req_replay_clients);
+                        obd->obd_recoverable_clients--;
                         if (atomic_read(&obd->obd_req_replay_clients) == 0) {
                                 CDEBUG(D_HA, "all clients have replayed reqs\n");
                                 wake_up(&obd->obd_next_transno_waitq);
@@ -1468,7 +1526,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
          * handled will pass through here and be processed immediately.
          */
         spin_lock_bh(&obd->obd_processing_task_lock);
-        if (transno < obd->obd_next_recovery_transno) {
+        if (transno < obd->obd_next_recovery_transno && check_for_clients(obd)) {
                 /* Processing the queue right now, don't re-add. */
                 LASSERT(list_empty(&req->rq_list));
                 spin_unlock_bh(&obd->obd_processing_task_lock);