Whamcloud - gitweb
- land b_ver_recov
authortappro <tappro>
Thu, 24 Jul 2008 11:37:15 +0000 (11:37 +0000)
committertappro <tappro>
Thu, 24 Jul 2008 11:37:15 +0000 (11:37 +0000)
lustre/ldlm/ldlm_lib.c

index c67d48b..1a68cf0 100644 (file)
@@ -777,6 +777,12 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
                        "cookies not random?\n", target->obd_name,
                        libcfs_nid2str(req->rq_peer.nid), cluuid.uuid);
                 GOTO(out, rc = -EALREADY);
+        } else if (export->exp_delayed && target->obd_recovering) {
+                /* VBR: don't allow delayed connection during recovery */
+                CWARN("%s: NID %s (%s) export was already marked as delayed "
+                      "and will wait for end of recovery\n", target->obd_name,
+                       libcfs_nid2str(req->rq_peer.nid), cluuid.uuid);
+                GOTO(out, rc = -EBUSY);
         } else {
                 OBD_FAIL_TIMEOUT(OBD_FAIL_TGT_DELAY_RECONNECT, 2 * obd_timeout);
         }
@@ -808,6 +814,17 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE);
         client_nid = &req->rq_peer.nid;
 
+        /* VBR: for delayed connections we start recovery */
+        if (export && export->exp_delayed && !export->exp_in_recovery) {
+                LASSERT(!target->obd_recovering);
+                lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_DELAYED |
+                                        MSG_CONNECT_RECOVERING);
+                spin_lock_bh(&target->obd_processing_task_lock);
+                target->obd_version_recov = 1;
+                spin_unlock_bh(&target->obd_processing_task_lock);
+                target_start_and_reset_recovery_timer(target, handler, req, 0);
+        }
+
         if (export == NULL) {
                 if (target->obd_recovering) {
                         CERROR("%s: denying connection for new client %s (%s): "
@@ -889,6 +906,7 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
 
         if (export->exp_connection != NULL)
                 ptlrpc_put_connection(export->exp_connection);
+
         export->exp_connection = ptlrpc_get_connection(req->rq_peer,
                                                        req->rq_self,
                                                        &remote_uuid);
@@ -906,9 +924,12 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
                 GOTO(set_flags, rc = 0);
         }
 
-        if (target->obd_recovering)
+        if (target->obd_recovering && !export->exp_in_recovery) {
+                spin_lock(&export->exp_lock);
+                export->exp_in_recovery = 1;
+                spin_unlock(&export->exp_lock);
                 target->obd_connected_clients++;
-
+        }
         memcpy(&conn,
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, sizeof conn),
                sizeof conn);
@@ -1035,13 +1056,24 @@ static void target_release_saved_req(struct ptlrpc_request *req)
         OBD_FREE(req, sizeof *req);
 }
 
-static void target_finish_recovery(struct obd_device *obd)
+static void target_send_delayed_replies(struct obd_device *obd)
 {
-        struct list_head *tmp, *n;
+        struct ptlrpc_request *req, *tmp;
 
         LCONSOLE_INFO("%s: sending delayed replies to recovered clients\n",
                       obd->obd_name);
 
+        list_for_each_entry_safe(req, tmp, &obd->obd_delayed_reply_queue, rq_list) {
+                list_del_init(&req->rq_list);
+                DEBUG_REQ(D_HA, req, "delayed:");
+                ptlrpc_reply(req);
+                target_release_saved_req(req);
+        }
+        obd->obd_recovery_end = cfs_time_current_sec();
+}
+
+static void target_finish_recovery(struct obd_device *obd)
+{
         ldlm_reprocess_all_ns(obd->obd_namespace);
 
         /* when recovery finished, cleanup orphans on mds and ost */
@@ -1050,16 +1082,7 @@ static void target_finish_recovery(struct obd_device *obd)
                 LCONSOLE_WARN("%s: recovery %s: rc %d\n", obd->obd_name,
                               rc < 0 ? "failed" : "complete", rc);
         }
-
-        list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
-                struct ptlrpc_request *req;
-                req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                list_del(&req->rq_list);
-                DEBUG_REQ(D_HA, req, "delayed:");
-                ptlrpc_reply(req);
-                target_release_saved_req(req);
-        }
-        obd->obd_recovery_end = cfs_time_current_sec();
+        target_send_delayed_replies(obd);
 }
 
 static void abort_recovery_queue(struct obd_device *obd)
@@ -1132,6 +1155,11 @@ void target_abort_recovery(void *data)
         struct obd_device *obd = data;
         ENTRY;
 
+        LCONSOLE_WARN("%s: recovery is aborted by administrative request;"
+                      "%d clients are not recovered (%d clients did)\n",
+                      obd->obd_name, obd->obd_recoverable_clients,
+                      obd->obd_connected_clients);
+
         spin_lock_bh(&obd->obd_processing_task_lock);
         if (!obd->obd_recovering) {
                 spin_unlock_bh(&obd->obd_processing_task_lock);
@@ -1139,14 +1167,11 @@ void target_abort_recovery(void *data)
                 return;
         }
         obd->obd_recovering = obd->obd_abort_recovery = 0;
+        obd->obd_version_recov = 0;
+        obd->obd_recoverable_clients = 0;
         target_cancel_recovery_timer(obd);
         spin_unlock_bh(&obd->obd_processing_task_lock);
 
-        LCONSOLE_WARN("%s: recovery period over; %d clients never reconnected "
-                      "after %lds (%d clients did)\n",
-                      obd->obd_name, obd->obd_recoverable_clients,
-                      cfs_time_current_sec()- obd->obd_recovery_start,
-                      obd->obd_connected_clients);
         class_disconnect_stale_exports(obd);
         abort_recovery_queue(obd);
 
@@ -1155,15 +1180,31 @@ void target_abort_recovery(void *data)
         EXIT;
 }
 
+static void reset_recovery_timer(struct obd_device *, int, int);
 static void target_recovery_expired(unsigned long castmeharder)
 {
         struct obd_device *obd = (struct obd_device *)castmeharder;
-        CERROR("%s: recovery timed out, aborting\n", obd->obd_name);
+        int version_recov = obd->obd_version_recov;
+        LCONSOLE_WARN("%s: recovery period over; %d clients never reconnected "
+                      "after %lds (%d clients did)\n",
+                      obd->obd_name, obd->obd_recoverable_clients,
+                      cfs_time_current_sec()- obd->obd_recovery_start,
+                      obd->obd_connected_clients);
+
+
+        /* VBR: stale exports should be marked as delayed */
+        class_handle_stale_exports(obd);
+
         spin_lock_bh(&obd->obd_processing_task_lock);
-        if (obd->obd_recovering)
-                obd->obd_abort_recovery = 1;
+        /* VBR: not all clients are connected, so there can be gap */
+        if (obd->obd_recovering && !version_recov) {
+                obd->obd_version_recov = 1;
+        }
         cfs_waitq_signal(&obd->obd_next_transno_waitq);
         spin_unlock_bh(&obd->obd_processing_task_lock);
+        /* reset timer if recovery will proceed with versions now */
+        reset_recovery_timer(obd, OBD_RECOVERY_FACTOR * obd_timeout,
+                             !version_recov);
 }
 
 
@@ -1251,7 +1292,7 @@ target_start_and_reset_recovery_timer(struct obd_device *obd,
                                       struct ptlrpc_request *req,
                                       int new_client)
 {
-        int req_timeout = OBD_RECOVERY_FACTOR * 
+        int req_timeout = OBD_RECOVERY_FACTOR *
                           lustre_msg_get_timeout(req->rq_reqmsg);
 
         check_and_start_recovery_timer(obd, handler);
@@ -1272,13 +1313,15 @@ static int check_for_next_transno(struct obd_device *obd)
         max = obd->obd_max_recoverable_clients;
         req_transno = lustre_msg_get_transno(req->rq_reqmsg);
         connected = obd->obd_connected_clients;
-        completed = max - obd->obd_recoverable_clients;
+        completed = max - obd->obd_recoverable_clients -
+                    obd->obd_delayed_clients;
         queue_len = obd->obd_requests_queued_for_recovery;
         next_transno = obd->obd_next_recovery_transno;
 
-        CDEBUG(D_HA,"max: %d, connected: %d, completed: %d, queue_len: %d, "
-               "req_transno: "LPU64", next_transno: "LPU64"\n",
-               max, connected, completed, queue_len, req_transno, next_transno);
+        CDEBUG(D_HA,"max: %d, connected: %d, delayed %d, completed: %d, "
+               "queue_len: %d, req_transno: "LPU64", next_transno: "LPU64"\n",
+               max, connected, obd->obd_delayed_clients, completed, queue_len,
+               req_transno, next_transno);
         if (obd->obd_abort_recovery) {
                 CDEBUG(D_HA, "waking for aborted recovery\n");
                 wake_up = 1;
@@ -1288,7 +1331,7 @@ static int check_for_next_transno(struct obd_device *obd)
         } else if (req_transno == next_transno) {
                 CDEBUG(D_HA, "waking for next ("LPD64")\n", next_transno);
                 wake_up = 1;
-        } else if (queue_len + completed == max) {
+        } else if (queue_len == obd->obd_recoverable_clients) {
                 CDEBUG(D_ERROR,
                        "waking for skipped transno (skip: "LPD64
                        ", ql: %d, comp: %d, conn: %d, next: "LPD64")\n",
@@ -1493,7 +1536,8 @@ int target_queue_last_replay_reply(struct ptlrpc_request *req, int rc)
         struct obd_device *obd = target_req2obd(req);
         struct ptlrpc_request *saved_req;
         struct lustre_msg *reqmsg;
-        int recovery_done = 0;
+        struct obd_export *exp = req->rq_export;
+        int recovery_done = 0, delayed_done = 0;
 
         LASSERT ((rc == 0) == req->rq_packed_final);
 
@@ -1523,47 +1567,126 @@ int target_queue_last_replay_reply(struct ptlrpc_request *req, int rc)
         spin_lock_bh(&obd->obd_processing_task_lock);
         if (obd->obd_stopping) {
                 spin_unlock_bh(&obd->obd_processing_task_lock);
-                OBD_FREE(reqmsg, req->rq_reqlen);
-                OBD_FREE(saved_req, sizeof *req);
-                req->rq_status = -ENOTCONN;
-                /* rv is ignored anyhow */
-                return -ENOTCONN;
+                goto out_noconn;
+        }
+
+        if (!exp->exp_vbr_failed) {
+                ptlrpc_rs_addref(req->rq_reply_state);  /* +1 ref for saved reply */
+                req = saved_req;
+                req->rq_reqmsg = reqmsg;
+                CFS_INIT_LIST_HEAD(&req->rq_list);
+                CFS_INIT_LIST_HEAD(&req->rq_replay_list);
+                class_export_get(exp);
+                list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
         }
-        ptlrpc_rs_addref(req->rq_reply_state);  /* +1 ref for saved reply */
-        req = saved_req;
-        req->rq_reqmsg = reqmsg;
-        class_export_get(req->rq_export);
-        list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
 
         /* only count the first "replay over" request from each
            export */
-        if (req->rq_export->exp_replay_needed) {
-                --obd->obd_recoverable_clients;
-                
-                spin_lock(&req->rq_export->exp_lock);
-                req->rq_export->exp_replay_needed = 0;
-                spin_unlock(&req->rq_export->exp_lock);
+        if (exp->exp_replay_needed) {
+                spin_lock(&exp->exp_lock);
+                exp->exp_replay_needed = 0;
+                spin_unlock(&exp->exp_lock);
+
+                if (!exp->exp_delayed) {
+                        --obd->obd_recoverable_clients;
+                } else {
+                        spin_lock(&exp->exp_lock);
+                        exp->exp_delayed = 0;
+                        spin_unlock(&exp->exp_lock);
+                        delayed_done = 1;
+                        if (obd->obd_delayed_clients == 0) {
+                                spin_unlock_bh(&obd->obd_processing_task_lock);
+                                LBUG();
+                        }
+                        --obd->obd_delayed_clients;
+                }
         }
         recovery_done = (obd->obd_recoverable_clients == 0);
         spin_unlock_bh(&obd->obd_processing_task_lock);
 
+        if (delayed_done) {
+                /* start pinging export */
+                spin_lock(&obd->obd_dev_lock);
+                list_add_tail(&exp->exp_obd_chain_timed,
+                              &obd->obd_exports_timed);
+                spin_unlock(&obd->obd_dev_lock);
+                target_send_delayed_replies(obd);
+        }
+
         OBD_RACE(OBD_FAIL_LDLM_RECOV_CLIENTS);
         if (recovery_done) {
                 spin_lock_bh(&obd->obd_processing_task_lock);
-                obd->obd_recovering = obd->obd_abort_recovery = 0;
+                obd->obd_recovering = 0;
+                obd->obd_version_recov = 0;
+                obd->obd_abort_recovery = 0;
                 target_cancel_recovery_timer(obd);
                 spin_unlock_bh(&obd->obd_processing_task_lock);
 
+
+                if (!delayed_done)
                 target_finish_recovery(obd);
                 CDEBUG(D_HA, "%s: recovery complete\n",
                        obd_uuid2str(&obd->obd_uuid));
         } else {
                 CWARN("%s: %d recoverable clients remain\n",
-                       obd->obd_name, obd->obd_recoverable_clients);
+                      obd->obd_name, obd->obd_recoverable_clients);
                 cfs_waitq_signal(&obd->obd_next_transno_waitq);
         }
 
+        /* VBR: disconnect export with failed recovery */
+        if (exp->exp_vbr_failed) {
+                CWARN("%s: disconnect export %s\n", obd->obd_name,
+                      exp->exp_client_uuid.uuid);
+                class_fail_export(exp);
+                goto out_noconn;
+        }
+
         return 1;
+
+out_noconn:
+        OBD_FREE(reqmsg, req->rq_reqlen);
+        OBD_FREE(saved_req, sizeof *req);
+        req->rq_status = -ENOTCONN;
+        /* rv is ignored anyhow */
+        return -ENOTCONN;
+}
+
+int target_handle_reply(struct ptlrpc_request *req, int rc, int fail)
+{
+        struct obd_device *obd = NULL;
+
+        if (req->rq_export)
+                obd = target_req2obd(req);
+
+        /* handle replay reply for version recovery */
+        if (obd && obd->obd_version_recov &&
+            (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) {
+                LASSERT(req->rq_repmsg);
+                lustre_msg_add_flags(req->rq_repmsg, MSG_VERSION_REPLAY);
+        }
+
+        /* handle last replay */
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
+                if (obd &&
+                    lustre_msg_get_flags(req->rq_reqmsg) & MSG_DELAY_REPLAY) {
+                        DEBUG_REQ(D_HA, req,
+                                  "delayed LAST_REPLAY, queuing reply");
+                        rc = target_queue_last_replay_reply(req, rc);
+                        LASSERT(req->rq_export->exp_delayed == 0);
+                        return rc;
+                }
+
+                if (obd && obd->obd_recovering) { /* normal recovery */
+                        DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
+                        rc = target_queue_last_replay_reply(req, rc);
+                        return rc;
+                }
+
+                /* Lost a race with recovery; let the error path DTRT. */
+                rc = req->rq_status = -ENOTCONN;
+        }
+        target_send_reply(req, rc, fail);
+        return 0;
 }
 
 static inline struct ldlm_pool *ldlm_exp2pl(struct obd_export *exp)
@@ -1576,9 +1699,9 @@ int target_pack_pool_reply(struct ptlrpc_request *req)
 {
         struct obd_device *obd;
         ENTRY;
-   
-        /* 
-         * Check that we still have all structures alive as this may 
+
+        /*
+         * Check that we still have all structures alive as this may
          * be some late rpc in shutdown time.
          */
         if (unlikely(!req->rq_export || !req->rq_export->exp_obd ||
@@ -1588,8 +1711,8 @@ int target_pack_pool_reply(struct ptlrpc_request *req)
                 RETURN(0);
         }
 
-        /* 
-         * OBD is alive here as export is alive, which we checked above. 
+        /*
+         * OBD is alive here as export is alive, which we checked above.
          */
         obd = req->rq_export->exp_obd;
 
@@ -1652,7 +1775,7 @@ target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
         LASSERT (list_empty(&rs->rs_obd_list));
         LASSERT (list_empty(&rs->rs_exp_list));
 
-        exp = class_export_get (req->rq_export);
+        exp = class_export_get(req->rq_export);
         obd = exp->exp_obd;
 
         /* disable reply scheduling onto srv_reply_queue while I'm setting up */
@@ -1662,15 +1785,16 @@ target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
         rs->rs_transno   = req->rq_transno;
         rs->rs_export    = exp;
 
-        spin_lock(&obd->obd_uncommitted_replies_lock);
+        spin_lock(&exp->exp_uncommitted_replies_lock);
 
-        if (rs->rs_transno > obd->obd_last_committed) {
+        /* VBR: use exp_last_committed */
+        if (rs->rs_transno > exp->exp_last_committed) {
                 /* not committed already */
                 list_add_tail (&rs->rs_obd_list,
-                               &obd->obd_uncommitted_replies);
+                               &exp->exp_uncommitted_replies);
         }
 
-        spin_unlock (&obd->obd_uncommitted_replies_lock);
+        spin_unlock (&exp->exp_uncommitted_replies_lock);
         spin_lock (&exp->exp_lock);
 
         list_add_tail (&rs->rs_exp_list, &exp->exp_outstanding_replies);
@@ -1709,23 +1833,29 @@ target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
 
 int target_handle_ping(struct ptlrpc_request *req)
 {
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY &&
+            req->rq_export->exp_in_recovery) {
+                spin_lock(&req->rq_export->exp_lock);
+                req->rq_export->exp_in_recovery = 0;
+                spin_unlock(&req->rq_export->exp_lock);
+        }
         obd_ping(req->rq_export);
         return lustre_pack_reply(req, 1, NULL, NULL);
 }
 
 void target_committed_to_req(struct ptlrpc_request *req)
 {
-        struct obd_device *obd = req->rq_export->exp_obd;
-
-        if (!obd->obd_no_transno && req->rq_repmsg != NULL)
+        struct obd_export *exp = req->rq_export;
+        if (!exp->exp_obd->obd_no_transno && req->rq_repmsg != NULL) {
                 lustre_msg_set_last_committed(req->rq_repmsg,
-                                              obd->obd_last_committed);
-        else
+                                              exp->exp_last_committed);
+        } else {
                 DEBUG_REQ(D_IOCTL, req, "not sending last_committed update (%d/"
-                          "%d)", obd->obd_no_transno, req->rq_repmsg == NULL);
-
+                          "%d)", exp->exp_obd->obd_no_transno,
+                          req->rq_repmsg == NULL);
+        }
         CDEBUG(D_INFO, "last_committed x"LPU64", this req x"LPU64"\n",
-               obd->obd_last_committed, req->rq_xid);
+               exp->exp_obd->obd_last_committed, req->rq_xid);
 }
 
 EXPORT_SYMBOL(target_committed_to_req);
@@ -1739,7 +1869,7 @@ int target_handle_qc_callback(struct ptlrpc_request *req)
         oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
                                    lustre_swab_obd_quotactl);
         if (oqctl == NULL) {
-                CERROR("Can't unpack obd_quotactl\n");
+                CERROR("Can't unpack obd_quatactl\n");
                 RETURN(-EPROTO);
         }