Whamcloud - gitweb
land 0.5.20.3 b_devel onto HEAD (b_devel will remain)
[fs/lustre-release.git] / lustre / lib / target.c
index 81638f1..590ae4b 100644 (file)
@@ -69,7 +69,8 @@ int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp,
         RETURN(0);
 }
 
-int target_handle_connect(struct ptlrpc_request *req)
+
+int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
 {
         struct obd_device *target;
         struct obd_export *export = NULL;
@@ -103,6 +104,11 @@ int target_handle_connect(struct ptlrpc_request *req)
         if (!target)
                 GOTO(out, rc = -ENODEV);
 
+        spin_lock_bh(&target->obd_processing_task_lock);
+        if (target->obd_flags & OBD_ABORT_RECOVERY)
+                target_abort_recovery(target);
+        spin_unlock_bh(&target->obd_processing_task_lock);
+
         conn.addr = req->rq_reqmsg->addr;
         conn.cookie = req->rq_reqmsg->cookie;
 
@@ -132,8 +138,11 @@ int target_handle_connect(struct ptlrpc_request *req)
                 spin_unlock(&target->obd_dev_lock);
 
         /* Tell the client if we're in recovery. */
-        if (target->obd_flags & OBD_RECOVERING)
+        /* If this is the first client, start the recovery timer */
+        if (target->obd_flags & OBD_RECOVERING) {
                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
+                target_start_recovery_timer(target, handler);
+        }
 
         /* Tell the client if we support replayable requests */
         if (target->obd_flags & OBD_REPLAYABLE)
@@ -151,16 +160,11 @@ int target_handle_connect(struct ptlrpc_request *req)
                 }
         }
 
-        if (rc == EALREADY) {
-                /* We indicate the reconnection in a flag, not an error code. */
-                lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT);
-                rc = 0;
-        } else if (rc) {
-                GOTO(out, rc);
-        }
-
         /* If all else goes well, this is our RPC return code. */
-        req->rq_status = rc;
+        req->rq_status = 0;
+
+        if (rc && rc != EALREADY)
+                GOTO(out, rc);
 
         req->rq_repmsg->addr = conn.addr;
         req->rq_repmsg->cookie = conn.cookie;
@@ -174,6 +178,12 @@ int target_handle_connect(struct ptlrpc_request *req)
                 ptlrpc_put_connection(req->rq_connection);
         req->rq_connection = ptlrpc_connection_addref(export->exp_connection);
 
+        if (rc == EALREADY) {
+                /* We indicate the reconnection in a flag, not an error code. */
+                lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT);
+                GOTO(out, rc = 0);
+        }
+
         spin_lock(&export->exp_connection->c_lock);
         list_add(&export->exp_conn_chain, &export->exp_connection->c_exports);
         spin_unlock(&export->exp_connection->c_lock);
@@ -224,6 +234,8 @@ static int target_disconnect_client(struct ptlrpc_connection *conn)
         list_for_each_safe(expiter, n, &conn->c_exports) {
                 exp = list_entry(expiter, struct obd_export, exp_conn_chain);
 
+                CDEBUG(D_HA, "disconnecting export %p/%s\n",
+                       exp, exp->exp_client_uuid.uuid);
                 hdl.addr = (__u64)(unsigned long)exp;
                 hdl.cookie = exp->exp_cookie;
                 rc = obd_disconnect(&hdl);
@@ -264,3 +276,248 @@ int target_revoke_connection(struct recovd_data *rd, int phase)
         LBUG();
         RETURN(-ENOSYS);
 }
+
+/*
+ * Recovery functions 
+ */
+
+static void abort_delayed_replies(struct obd_device *obd)
+{
+        struct ptlrpc_request *req;
+        struct list_head *tmp, *n;
+        list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
+                req = list_entry(tmp, struct ptlrpc_request, rq_list);
+                DEBUG_REQ(D_ERROR, req, "aborted:");
+                req->rq_status = -ENOTCONN;
+                req->rq_type = PTL_RPC_MSG_ERR;
+                ptlrpc_reply(req->rq_svc, req);
+                list_del(&req->rq_list);
+                OBD_FREE(req, sizeof *req);
+        }
+}
+
+void target_abort_recovery(void *data)
+{
+        struct obd_device *obd = data;
+        CERROR("disconnecting clients and aborting recovery\n");
+        obd->obd_recoverable_clients = 0;
+        obd->obd_flags &= ~(OBD_RECOVERING | OBD_ABORT_RECOVERY);
+        abort_delayed_replies(obd);
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+        class_disconnect_all(obd);
+        spin_lock_bh(&obd->obd_processing_task_lock);
+}
+
+static void target_recovery_expired(unsigned long castmeharder)
+{
+        struct obd_device *obd = (struct obd_device *)castmeharder;
+        CERROR("recovery timed out, aborting\n");
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        obd->obd_flags |= OBD_ABORT_RECOVERY;
+        wake_up(&obd->obd_next_transno_waitq);
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+}
+
+static void reset_recovery_timer(struct obd_device *obd)
+{
+        CDEBUG(D_ERROR, "timer will expire in %ld seconds\n",
+               OBD_RECOVERY_TIMEOUT / HZ);
+        mod_timer(&obd->obd_recovery_timer, jiffies + OBD_RECOVERY_TIMEOUT);
+}
+
+
+/* Only start it the first time called */
+void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler)
+{
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        if (obd->obd_recovery_handler) {
+                spin_unlock_bh(&obd->obd_processing_task_lock);
+                return;
+        }
+        CERROR("%s: starting recovery timer\n", obd->obd_name);
+        obd->obd_recovery_handler = handler;
+        obd->obd_recovery_timer.function = target_recovery_expired;
+        obd->obd_recovery_timer.data = (unsigned long)obd;
+        init_timer(&obd->obd_recovery_timer);
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+
+        reset_recovery_timer(obd);
+}
+
+static void cancel_recovery_timer(struct obd_device *obd)
+{
+        del_timer(&obd->obd_recovery_timer);
+}
+
+static int check_for_next_transno(struct obd_device *obd)
+{
+        struct ptlrpc_request *req;
+        req = list_entry(obd->obd_recovery_queue.next,
+                         struct ptlrpc_request, rq_list);
+        LASSERT(req->rq_reqmsg->transno >= obd->obd_next_recovery_transno);
+
+        return req->rq_reqmsg->transno == obd->obd_next_recovery_transno ||
+                (obd->obd_flags & OBD_RECOVERING) == 0;
+}
+
+static void process_recovery_queue(struct obd_device *obd)
+{
+        struct ptlrpc_request *req;
+        int aborted = 0;
+        ENTRY;
+
+        for (;;) {
+                spin_lock_bh(&obd->obd_processing_task_lock);
+                LASSERT(obd->obd_processing_task == current->pid);
+                req = list_entry(obd->obd_recovery_queue.next,
+                                 struct ptlrpc_request, rq_list);
+
+                if (req->rq_reqmsg->transno != obd->obd_next_recovery_transno) {
+                        spin_unlock_bh(&obd->obd_processing_task_lock);
+                        CDEBUG(D_HA, "Waiting for transno "LPD64" (1st is "
+                               LPD64")\n",
+                               obd->obd_next_recovery_transno,
+                               req->rq_reqmsg->transno);
+                        wait_event(obd->obd_next_transno_waitq,
+                                   check_for_next_transno(obd));
+                        spin_lock_bh(&obd->obd_processing_task_lock);
+                        if (obd->obd_flags & OBD_ABORT_RECOVERY) {
+                                target_abort_recovery(obd);
+                                aborted = 1;
+                        }
+                        spin_unlock_bh(&obd->obd_processing_task_lock);
+                        if (aborted)
+                                return;
+                        continue;
+                }
+                list_del_init(&req->rq_list);
+                spin_unlock_bh(&obd->obd_processing_task_lock);
+
+                DEBUG_REQ(D_ERROR, req, "processing: ");
+                (void)obd->obd_recovery_handler(req);
+                reset_recovery_timer(obd);
+#warning FIXME: mds_fsync_super(mds->mds_sb);
+                OBD_FREE(req, sizeof *req);
+                spin_lock_bh(&obd->obd_processing_task_lock);
+                obd->obd_next_recovery_transno++;
+                if (list_empty(&obd->obd_recovery_queue)) {
+                        obd->obd_processing_task = 0;
+                        spin_unlock_bh(&obd->obd_processing_task_lock);
+                        break;
+                }
+                spin_unlock_bh(&obd->obd_processing_task_lock);
+        }
+        EXIT;
+}
+
+int target_queue_recovery_request(struct ptlrpc_request *req,
+                                  struct obd_device *obd)
+{
+        struct list_head *tmp;
+        int inserted = 0;
+        __u64 transno = req->rq_reqmsg->transno;
+        struct ptlrpc_request *saved_req;
+
+        if (!transno) {
+                INIT_LIST_HEAD(&req->rq_list);
+                DEBUG_REQ(D_HA, req, "not queueing");
+                return 1;
+        }
+
+        spin_lock_bh(&obd->obd_processing_task_lock);
+
+        if (obd->obd_processing_task == current->pid) {
+                /* Processing the queue right now, don't re-add. */
+                LASSERT(list_empty(&req->rq_list));
+                spin_unlock_bh(&obd->obd_processing_task_lock);
+                return 1;
+        }
+
+        OBD_ALLOC(saved_req, sizeof *saved_req);
+        if (!saved_req)
+                LBUG();
+        memcpy(saved_req, req, sizeof *req);
+        req = saved_req;
+        INIT_LIST_HEAD(&req->rq_list);
+
+        /* XXX O(n^2) */
+        list_for_each(tmp, &obd->obd_recovery_queue) {
+                struct ptlrpc_request *reqiter =
+                        list_entry(tmp, struct ptlrpc_request, rq_list);
+
+                if (reqiter->rq_reqmsg->transno > transno) {
+                        list_add_tail(&req->rq_list, &reqiter->rq_list);
+                        inserted = 1;
+                        break;
+                }
+        }
+
+        if (!inserted) {
+                list_add_tail(&req->rq_list, &obd->obd_recovery_queue);
+        }
+
+        if (obd->obd_processing_task != 0) {
+                /* Someone else is processing this queue, we'll leave it to
+                 * them.
+                 */
+                if (transno == obd->obd_next_recovery_transno)
+                        wake_up(&obd->obd_next_transno_waitq);
+                spin_unlock_bh(&obd->obd_processing_task_lock);
+                return 0;
+        }
+
+        /* Nobody is processing, and we know there's (at least) one to process
+         * now, so we'll do the honours.
+         */
+        obd->obd_processing_task = current->pid;
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+
+        process_recovery_queue(obd);
+        return 0;
+}
+
+struct obd_device * target_req2obd(struct ptlrpc_request *req)
+{
+        return req->rq_export->exp_obd;
+}
+
+int target_queue_final_reply(struct ptlrpc_request *req, int rc)
+{
+        struct obd_device *obd = target_req2obd(req);
+        struct ptlrpc_request *saved_req;
+
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        if (rc) {
+                /* Just like ptlrpc_error, but without the sending. */
+                lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
+                                &req->rq_repmsg);
+                req->rq_type = PTL_RPC_MSG_ERR;
+        }
+
+        LASSERT(list_empty(&req->rq_list));
+        OBD_ALLOC(saved_req, sizeof *saved_req);
+        memcpy(saved_req, req, sizeof *saved_req);
+        req = saved_req;
+        list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
+        if (--obd->obd_recoverable_clients == 0) {
+                struct list_head *tmp, *n;
+                ldlm_reprocess_all_ns(req->rq_export->exp_obd->obd_namespace);
+                CDEBUG(D_ERROR,
+                       "all clients recovered, sending delayed replies\n");
+                obd->obd_flags &= ~OBD_RECOVERING;
+                list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
+                        req = list_entry(tmp, struct ptlrpc_request, rq_list);
+                        DEBUG_REQ(D_ERROR, req, "delayed:");
+                        ptlrpc_reply(req->rq_svc, req);
+                        list_del(&req->rq_list);
+                        OBD_FREE(req, sizeof *req);
+                }
+                cancel_recovery_timer(obd);
+        } else {
+                CERROR("%d recoverable clients remain\n",
+                       obd->obd_recoverable_clients);
+        }
+
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+        return 1;
+}