Whamcloud - gitweb
Landing the ldlm_testing branch; now the only difference is that the locking
[fs/lustre-release.git] / lustre / ptlrpc / client.c
index 9252705..9523a21 100644 (file)
 
 #define DEBUG_SUBSYSTEM S_RPC
 
-#include <linux/lustre_net.h>
+#include <linux/lustre_ha.h>
 
-void llite_ha_conn_manage(struct lustre_ha_mgr *mgr, struct ptlrpc_client *cli)
-{
-        ENTRY;
-        cli->cli_ha_mgr = mgr;
-        spin_lock(&mgr->mgr_lock);
-        list_add(&cli->cli_ha_item, &mgr->mgr_connections_lh); 
-        spin_unlock(&mgr->mgr_lock); 
-        EXIT;
-}
-
-void llite_ha_conn_fail(struct ptlrpc_client *cli)
-{
-        ENTRY;
-        spin_lock(&cli->cli_ha_mgr->mgr_lock);
-        list_del(&cli->cli_ha_item);
-        list_add(&cli->cli_ha_item, &cli->cli_ha_mgr->mgr_troubled_lh); 
-        spin_unlock(&cli->cli_ha_mgr->mgr_lock); 
-        wake_up(&cli->cli_ha_mgr->mgr_waitq);
-        EXIT;
-}
-
-void ptlrpc_init_client(struct lustre_ha_mgr *mgr, int req_portal,
+/*
+ * Initialise the ptlrpc client structure pointed to by cl.
+ *
+ * The structure is zeroed first, so every field not set explicitly below
+ * starts out as 0/NULL.  The function then records the recovery daemon
+ * (recovd) and the recovery callback (recover); when recovd is non-NULL the
+ * client is also handed to recovd_cli_manage().  Finally it stores the
+ * request and reply portals, initialises the five request lists (sending,
+ * sent, replied, replay, dying) and cli_lock, and sets cli_rpc_sem to an
+ * initial count of 32.
+ *
+ * NOTE(review): recovd_cli_manage() is called before the list heads and
+ * cli_lock are initialised -- confirm it does not touch them.
+ * NOTE(review): the count of 32 presumably caps concurrent RPCs per client;
+ * verify against the users of cli_rpc_sem.
+ */
+void ptlrpc_init_client(struct recovd_obd *recovd, 
+                        void (*recover)(struct ptlrpc_client *recover),
+                        int req_portal,
                         int rep_portal, struct ptlrpc_client *cl)
 {
         memset(cl, 0, sizeof(*cl));
-        cl->cli_ha_mgr = mgr;
-        if (mgr)
-                llite_ha_conn_manage(mgr, cl);
+        cl->cli_recovd = recovd;
+        cl->cli_recover = recover;
+        if (recovd)
+                recovd_cli_manage(recovd, cl);
         cl->cli_obd = NULL;
         cl->cli_request_portal = req_portal;
         cl->cli_reply_portal = rep_portal;
         INIT_LIST_HEAD(&cl->cli_sending_head);
         INIT_LIST_HEAD(&cl->cli_sent_head);
+        INIT_LIST_HEAD(&cl->cli_replied_head);
+        INIT_LIST_HEAD(&cl->cli_replay_head);
+        INIT_LIST_HEAD(&cl->cli_dying_head);
+        spin_lock_init(&cl->cli_lock);
         sema_init(&cl->cli_rpc_sem, 32);
 }
 
-struct ptlrpc_connection *ptlrpc_connect_client(char *uuid)
+/*
+ * Return the c_remote_uuid field of the connection attached to req.
+ *
+ * Neither req nor req->rq_connection is checked for NULL; the caller must
+ * pass a request that already has a connection.
+ */
+__u8 *ptlrpc_req_to_uuid(struct ptlrpc_request *req)
+{
+        return req->rq_connection->c_remote_uuid;
+}
+
+struct ptlrpc_connection *ptlrpc_uuid_to_connection(char *uuid)
 {
         struct ptlrpc_connection *c;
         struct lustre_peer peer;
@@ -94,6 +85,20 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *conn)
         return bulk;
 }
 
+/*
+ * Release a bulk descriptor.
+ *
+ * A NULL bulk is accepted and ignored.  Otherwise the descriptor's
+ * b_connection is passed to ptlrpc_put_connection() and the descriptor
+ * itself is released with OBD_FREE().
+ */
+void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk)
+{
+        ENTRY;
+        if (bulk == NULL) {
+                EXIT;
+                return;
+        }
+
+        ptlrpc_put_connection(bulk->b_connection);
+
+        OBD_FREE(bulk, sizeof(*bulk));
+        EXIT;
+}
+
 struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
                                        struct ptlrpc_connection *conn,
                                        int opcode, int count, int *lengths,
@@ -116,24 +121,41 @@ struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
                 RETURN(NULL);
         }
 
-        request->rq_time = CURRENT_TIME;
-        request->rq_type = PTL_RPC_REQUEST;
+        request->rq_type = PTL_RPC_TYPE_REQUEST;
         request->rq_connection = ptlrpc_connection_addref(conn);
 
-        request->rq_reqmsg->conn = (__u64)(unsigned long)conn;
-        request->rq_reqmsg->token = conn->c_token;
+        request->rq_reqmsg->conn = (__u64)(unsigned long)conn->c_remote_conn;
+        request->rq_reqmsg->token = conn->c_remote_token;
         request->rq_reqmsg->opc = HTON__u32(opcode);
-        request->rq_reqmsg->type = HTON__u32(request->rq_type);
+        request->rq_reqmsg->type = HTON__u32(PTL_RPC_MSG_REQUEST);
+        INIT_LIST_HEAD(&request->rq_list);
+
+        /* this will be dec()d once in req_finished, once in free_committed */
+        atomic_set(&request->rq_refcount, 2);
 
         spin_lock(&conn->c_lock);
         request->rq_reqmsg->xid = HTON__u32(++conn->c_xid_out);
-        spin_unlock(&c->c_lock);
+        spin_unlock(&conn->c_lock);
 
         request->rq_client = cl;
 
         RETURN(request);
 }
 
+/*
+ * Drop one reference on a request and discard its reply buffer.
+ *
+ * A NULL request is ignored.  If a reply message is attached it is freed
+ * and rq_repmsg is reset to NULL.  rq_refcount is then decremented; when it
+ * reaches zero the request is handed to ptlrpc_free_req().
+ */
+void ptlrpc_req_finished(struct ptlrpc_request *request)
+{
+        if (request == NULL)
+                return;
+
+        /* clear the pointer after freeing so the buffer is not freed twice */
+        if (request->rq_repmsg != NULL) { 
+                OBD_FREE(request->rq_repmsg, request->rq_replen);
+                request->rq_repmsg = NULL;
+        }
+
+        if (atomic_dec_and_test(&request->rq_refcount))
+                ptlrpc_free_req(request);
+}
+
 void ptlrpc_free_req(struct ptlrpc_request *request)
 {
         if (request == NULL)
@@ -141,6 +163,14 @@ void ptlrpc_free_req(struct ptlrpc_request *request)
 
         if (request->rq_repmsg != NULL)
                 OBD_FREE(request->rq_repmsg, request->rq_replen);
+        if (request->rq_reqmsg != NULL)
+                OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
+
+        if (request->rq_client) {
+                spin_lock(&request->rq_client->cli_lock);
+                list_del(&request->rq_list);
+                spin_unlock(&request->rq_client->cli_lock);
+        }
 
         ptlrpc_put_connection(request->rq_connection);
 
@@ -151,23 +181,36 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req)
 {
         int rc = 0;
 
-        schedule_timeout(3 * HZ);  /* 3 second timeout */
         if (req->rq_repmsg != NULL) {
-                req->rq_flags = PTL_RPC_REPLY;
+                req->rq_transno = NTOH__u64(req->rq_repmsg->transno);
+                req->rq_flags |= PTL_RPC_FL_REPLY;
+                GOTO(out, rc = 1);
+        }
+
+        if (req->rq_flags & PTL_RPC_FL_RESEND) { 
+                CERROR("-- RESEND --\n");
+                req->rq_status = -EAGAIN;
                 GOTO(out, rc = 1);
         }
 
-        if (CURRENT_TIME - req->rq_time >= 3) { 
-                CERROR("-- REQ TIMEOUT --\n"); 
-                if (req->rq_client && req->rq_client->cli_ha_mgr)
-                        llite_ha_conn_fail(req->rq_client); 
-                return 0;
+        if (CURRENT_TIME - req->rq_time >= req->rq_timeout) {
+                CERROR("-- REQ TIMEOUT --\n");
+                /* clear the timeout */
+                req->rq_timeout = 0;
+                req->rq_flags |= PTL_RPC_FL_TIMEOUT;
+                if (req->rq_client && req->rq_client->cli_recovd)
+                        recovd_cli_fail(req->rq_client);
+                GOTO(out, rc = 0);
+        }
+
+        if (req->rq_timeout) { 
+                schedule_timeout(req->rq_timeout * HZ);
         }
 
         if (sigismember(&(current->pending.signal), SIGKILL) ||
             sigismember(&(current->pending.signal), SIGTERM) ||
             sigismember(&(current->pending.signal), SIGINT)) {
-                req->rq_flags = PTL_RPC_INTR;
+                req->rq_flags |= PTL_RPC_FL_INTR;
                 GOTO(out, rc = 1);
         }
 
@@ -194,6 +237,11 @@ int ptlrpc_check_status(struct ptlrpc_request *req, int err)
                 RETURN(-ENOMEM);
         }
 
+        if (req->rq_repmsg->type == NTOH__u32(PTL_RPC_MSG_ERR)) {
+                CERROR("req->rq_repmsg->type == PTL_RPC_MSG_ERR\n");
+                RETURN(-EINVAL);
+        }
+
         if (req->rq_repmsg->status != 0) {
                 CERROR("req->rq_repmsg->status is %d\n",
                        req->rq_repmsg->status);
@@ -223,13 +271,97 @@ static int ptlrpc_abort(struct ptlrpc_request *request)
         return 0;
 }
 
+/*
+ * Process the replied list of a client after cli_last_committed changed.
+ *
+ * The caller must hold cli->cli_lock.
+ *
+ * Walks cli_replied_head from the front and stops at the first request
+ * whose rq_transno is greater than cli_last_committed.  Every request
+ * before that point is unlinked and then either:
+ *   - moved to cli_replay_head when PTL_RPC_FL_RETAIN is set, or
+ *   - has one reference dropped; it is freed if that was the last
+ *     reference, otherwise it is parked on cli_dying_head.
+ *
+ * NOTE(review): the early break assumes the replied list is ordered by
+ * transno -- confirm against the code that inserts into it.
+ * NOTE(review): EXIT is used below without a matching ENTRY in this
+ * function.
+ */
+void ptlrpc_free_committed(struct ptlrpc_client *cli)
+{
+        struct list_head *tmp, *saved;
+        struct ptlrpc_request *req;
+
+        list_for_each_safe(tmp, saved, &cli->cli_replied_head) {
+                req = list_entry(tmp, struct ptlrpc_request, rq_list);
+
+                /* not yet committed: stop scanning here */
+                if (req->rq_transno > cli->cli_last_committed)
+                        break; 
+
+                /* committed: unlink, then retain for replay if flagged */
+                list_del(&req->rq_list);
+                if (req->rq_flags & PTL_RPC_FL_RETAIN) {
+                        list_add(&req->rq_list, &cli->cli_replay_head);
+                } else {
+                        /* NOTE(review): the "(" opened in this message is
+                         * never closed */
+                        CDEBUG(D_INFO, "Marking request %p as committed ("
+                               "transno=%Lu, last_committed=%Lu\n", req,
+                               req->rq_transno, cli->cli_last_committed);
+                        if (atomic_dec_and_test(&req->rq_refcount)) {
+                                /* ptlrpc_free_req() takes cli_lock when
+                                 * rq_client is set; the caller already holds
+                                 * it, so clear rq_client to avoid deadlock */
+                                req->rq_client = NULL;
+                                ptlrpc_free_req(req);
+                        } else
+                                list_add(&req->rq_list, &cli->cli_dying_head);
+                }
+        }
+
+        EXIT;
+        return;
+}
+
+/*
+ * Free every request still linked on any of the client's request lists.
+ *
+ * Takes cli->cli_lock and empties, in this order, the replied, sent,
+ * replay, sending and dying lists.  Each request is unlinked, has its
+ * rq_client cleared and is passed to ptlrpc_free_req().  Requests found on
+ * the replay or dying lists are reported with CERROR; the others are only
+ * logged with CDEBUG.
+ *
+ * NOTE(review): requests are freed here without looking at rq_refcount --
+ * confirm that no other holder can still reference them at cleanup time.
+ */
+void ptlrpc_cleanup_client(struct ptlrpc_client *cli)
+{
+        struct list_head *tmp, *saved;
+        struct ptlrpc_request *req;
+        ENTRY;
+
+        spin_lock(&cli->cli_lock);
+        list_for_each_safe(tmp, saved, &cli->cli_replied_head) {
+                req = list_entry(tmp, struct ptlrpc_request, rq_list);
+                /* rq_client is cleared below (here and in every later loop)
+                 * to prevent ptlrpc_free_req from taking cli_lock, which is
+                 * already held */
+                CDEBUG(D_INFO, "Cleaning req %p from replied list.\n", req);
+                list_del(&req->rq_list);
+                req->rq_client = NULL;
+                ptlrpc_free_req(req); 
+        }
+        list_for_each_safe(tmp, saved, &cli->cli_sent_head) {
+                req = list_entry(tmp, struct ptlrpc_request, rq_list);
+                CDEBUG(D_INFO, "Cleaning req %p from sent list.\n", req);
+                list_del(&req->rq_list);
+                req->rq_client = NULL;
+                ptlrpc_free_req(req); 
+        }
+        list_for_each_safe(tmp, saved, &cli->cli_replay_head) {
+                req = list_entry(tmp, struct ptlrpc_request, rq_list);
+                CERROR("Request %p is on the replay list at cleanup!\n", req);
+                list_del(&req->rq_list);
+                req->rq_client = NULL;
+                ptlrpc_free_req(req); 
+        }
+        list_for_each_safe(tmp, saved, &cli->cli_sending_head) {
+                req = list_entry(tmp, struct ptlrpc_request, rq_list);
+                CDEBUG(D_INFO, "Cleaning req %p from sending list.\n", req);
+                list_del(&req->rq_list);
+                req->rq_client = NULL;
+                ptlrpc_free_req(req); 
+        }
+        list_for_each_safe(tmp, saved, &cli->cli_dying_head) {
+                req = list_entry(tmp, struct ptlrpc_request, rq_list);
+                CERROR("Request %p is on the dying list at cleanup!\n", req);
+                list_del(&req->rq_list);
+                req->rq_client = NULL;
+                ptlrpc_free_req(req); 
+        }
+        spin_unlock(&cli->cli_lock);
+        EXIT;
+        return;
+}
+
 int ptlrpc_queue_wait(struct ptlrpc_request *req)
 {
         int rc = 0;
         ENTRY;
 
         init_waitqueue_head(&req->rq_wait_for_rep);
-
+ resend:
+        req->rq_time = CURRENT_TIME;
+        req->rq_timeout = 30;
         rc = ptl_send_rpc(req);
         if (rc) {
                 CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
@@ -241,15 +373,21 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
         CDEBUG(D_OTHER, "-- sleeping\n");
         wait_event_interruptible(req->rq_wait_for_rep, ptlrpc_check_reply(req));
         CDEBUG(D_OTHER, "-- done\n");
-        ptlrpc_cleanup_request_buf(req);
+
+        if (req->rq_flags & PTL_RPC_FL_RESEND) {
+                req->rq_flags &= ~PTL_RPC_FL_RESEND;
+                goto resend;
+        }
+
+        //ptlrpc_cleanup_request_buf(req);
         up(&req->rq_client->cli_rpc_sem);
-        if (req->rq_flags == PTL_RPC_INTR) {
+        if (req->rq_flags & PTL_RPC_FL_INTR) {
                 /* Clean up the dangling reply buffers */
                 ptlrpc_abort(req);
                 GOTO(out, rc = -EINTR);
         }
 
-        if (req->rq_flags != PTL_RPC_REPLY) {
+        if (! (req->rq_flags & PTL_RPC_FL_REPLY)) {
                 CERROR("Unknown reason for wakeup\n");
                 /* XXX Phil - I end up here when I kill obdctl */
                 ptlrpc_abort(req);
@@ -262,11 +400,20 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                 GOTO(out, rc);
         }
         CDEBUG(D_NET, "got rep %d\n", req->rq_repmsg->xid);
-
         if (req->rq_repmsg->status == 0)
                 CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg,
                        req->rq_replen, req->rq_repmsg->status);
 
+        spin_lock(&req->rq_client->cli_lock);
+        /* add to the tail of the replied head */
+        list_del(&req->rq_list);
+        list_add(&req->rq_list, req->rq_client->cli_replied_head.prev); 
+
+        req->rq_client->cli_last_rcvd = req->rq_repmsg->last_rcvd;
+        req->rq_client->cli_last_committed = req->rq_repmsg->last_committed;
+        ptlrpc_free_committed(req->rq_client); 
+        spin_unlock(&req->rq_client->cli_lock);
+
         EXIT;
  out:
         return rc;