Whamcloud - gitweb
* Move recovery state into connection from client, and fallout therefrom.
[fs/lustre-release.git] / lustre / ptlrpc / client.c
index c775a5e..cce98a1 100644 (file)
 #include <linux/lustre_lib.h>
 #include <linux/lustre_ha.h>
 
-void ptlrpc_init_client(struct recovd_obd *recovd, 
-                        int (*recover)(struct ptlrpc_client *recover),
-                        int req_portal,
-                        int rep_portal, struct ptlrpc_client *cl)
+void ptlrpc_init_client(int req_portal, int rep_portal, struct ptlrpc_client *cl,
+                        struct ptlrpc_connection *conn)
 {
         memset(cl, 0, sizeof(*cl));
-        cl->cli_recover = recover;
-        if (recovd)
-                recovd_cli_manage(recovd, cl);
+        /* Some things, like the LDLM, can call us without a connection.
+         * I don't like it one bit.
+         */
+        if (conn) {
+                cl->cli_connection = conn;
+                list_add(&cl->cli_client_chain, &conn->c_clients);
+        }
         cl->cli_obd = NULL;
         cl->cli_request_portal = req_portal;
         cl->cli_reply_portal = rep_portal;
-        INIT_LIST_HEAD(&cl->cli_delayed_head);
-        INIT_LIST_HEAD(&cl->cli_sending_head);
-        INIT_LIST_HEAD(&cl->cli_dying_head);
-        spin_lock_init(&cl->cli_lock);
         sema_init(&cl->cli_rpc_sem, 32);
 }
 
@@ -153,11 +151,11 @@ void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *bulk)
 }
 
 struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
-                                       struct ptlrpc_connection *conn,
                                        int opcode, int count, int *lengths,
                                        char **bufs)
 {
         struct ptlrpc_request *request;
+        struct ptlrpc_connection *conn = cl->cli_connection;
         int rc;
         ENTRY;
 
@@ -212,8 +210,7 @@ struct ptlrpc_request *ptlrpc_prep_req2(struct lustre_handle *conn,
         }
 
         clobd = &export->exp_obd->u.cli;
-        req = ptlrpc_prep_req(clobd->cl_client, clobd->cl_conn, 
-                              opcode, count, lengths, bufs);
+        req = ptlrpc_prep_req(clobd->cl_client, opcode, count, lengths, bufs);
         ptlrpc_hdl2req(req, &clobd->cl_exporth);
         return req;
 }
@@ -246,10 +243,10 @@ void ptlrpc_free_req(struct ptlrpc_request *request)
         if (request->rq_reqmsg != NULL)
                 OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
 
-        if (request->rq_client) {
-                spin_lock(&request->rq_client->cli_lock);
+        if (request->rq_connection) {
+                spin_lock(&request->rq_connection->c_lock);
                 list_del_init(&request->rq_list);
-                spin_unlock(&request->rq_client->cli_lock);
+                spin_unlock(&request->rq_connection->c_lock);
         }
 
         ptlrpc_put_connection(request->rq_connection);
@@ -335,12 +332,12 @@ static int ptlrpc_abort(struct ptlrpc_request *request)
 }
 
 /* caller must lock cli */
-void ptlrpc_free_committed(struct ptlrpc_client *cli)
+void ptlrpc_free_committed(struct ptlrpc_connection *conn)
 {
         struct list_head *tmp, *saved;
         struct ptlrpc_request *req;
 
-        list_for_each_safe(tmp, saved, &cli->cli_sending_head) {
+        list_for_each_safe(tmp, saved, &conn->c_sending_head) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
 
                 if ( (req->rq_flags & PTL_RPC_FL_REPLAY) ) { 
@@ -350,13 +347,13 @@ void ptlrpc_free_committed(struct ptlrpc_client *cli)
                 }
 
                 /* not yet committed */
-                if (req->rq_transno > cli->cli_last_committed)
+                if (req->rq_transno > conn->c_last_committed)
                         break;
 
                 CDEBUG(D_INFO, "Marking request xid %Ld as committed ("
                        "transno=%Lu, last_committed=%Lu\n",
                        (long long)req->rq_xid, (long long)req->rq_transno,
-                       (long long)cli->cli_last_committed);
+                       (long long)conn->c_last_committed);
                 if (atomic_dec_and_test(&req->rq_refcount)) {
                         /* we do this to prevent free_req deadlock */
                         list_del_init(&req->rq_list); 
@@ -364,7 +361,7 @@ void ptlrpc_free_committed(struct ptlrpc_client *cli)
                         ptlrpc_free_req(req);
                 } else {
                         list_del_init(&req->rq_list);
-                        list_add(&req->rq_list, &cli->cli_dying_head);
+                        list_add(&req->rq_list, &conn->c_dying_head);
                 }
         }
 
@@ -376,24 +373,34 @@ void ptlrpc_cleanup_client(struct ptlrpc_client *cli)
 {
         struct list_head *tmp, *saved;
         struct ptlrpc_request *req;
+        struct ptlrpc_connection *conn = cli->cli_connection;
         ENTRY;
 
-        spin_lock(&cli->cli_lock);
-        list_for_each_safe(tmp, saved, &cli->cli_sending_head) {
+        if (!conn) {
+                EXIT;
+                return;
+        }
+
+        spin_lock(&conn->c_lock);
+        list_for_each_safe(tmp, saved, &conn->c_sending_head) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
+                if (req->rq_client != cli)
+                        continue;
                 CDEBUG(D_INFO, "Cleaning req %p from sending list.\n", req);
                 list_del_init(&req->rq_list);
                 req->rq_client = NULL;
                 ptlrpc_free_req(req); 
         }
-        list_for_each_safe(tmp, saved, &cli->cli_dying_head) {
+        list_for_each_safe(tmp, saved, &conn->c_dying_head) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
+                if (req->rq_client != cli)
+                        continue;
                 CERROR("Request %p is on the dying list at cleanup!\n", req);
                 list_del_init(&req->rq_list);
                 req->rq_client = NULL;
                 ptlrpc_free_req(req); 
         }
-        spin_unlock(&cli->cli_lock);
+        spin_unlock(&conn->c_lock);
 
         EXIT;
         return;
@@ -444,8 +451,9 @@ static int expired_request(void *data)
         req->rq_connection->c_level = LUSTRE_CONN_RECOVD;
         req->rq_flags |= PTL_RPC_FL_TIMEOUT;
         /* Activate the recovd for this client, if there is one. */
-        if (req->rq_client && req->rq_client->cli_recovd)
-                recovd_cli_fail(req->rq_client);
+        if (req->rq_client && req->rq_client->cli_connection &&
+            req->rq_client->cli_connection->c_recovd)
+                recovd_conn_fail(req->rq_client->cli_connection);
 
         /* If this request is for recovery or other primordial tasks,
          * don't go back to sleep.
@@ -468,6 +476,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
         int rc = 0;
         struct l_wait_info lwi;
         struct ptlrpc_client *cli = req->rq_client;
+        struct ptlrpc_connection *conn = cli->cli_connection;
         ENTRY;
 
         init_waitqueue_head(&req->rq_wait_for_rep);
@@ -480,19 +489,19 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                 CERROR("process %d waiting for recovery (%d > %d)\n", 
                        current->pid, req->rq_level, req->rq_connection->c_level);
 
-                spin_lock(&cli->cli_lock);
+                spin_lock(&conn->c_lock);
                 list_del_init(&req->rq_list);
-                list_add_tail(&req->rq_list, &cli->cli_delayed_head);
-                spin_unlock(&cli->cli_lock);
+                list_add_tail(&req->rq_list, &conn->c_delayed_head);
+                spin_unlock(&conn->c_lock);
 
                 lwi = LWI_INTR(NULL, NULL);
                 rc = l_wait_event(req->rq_wait_for_rep,
                                   req->rq_level <= req->rq_connection->c_level,
                                   &lwi);
 
-                spin_lock(&cli->cli_lock);
+                spin_lock(&conn->c_lock);
                 list_del_init(&req->rq_list);
-                spin_unlock(&cli->cli_lock);
+                spin_unlock(&conn->c_lock);
                 
                 if (rc)
                         RETURN(rc);
@@ -512,10 +521,10 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                 RETURN(-rc);
         }
 
-        spin_lock(&cli->cli_lock);
+        spin_lock(&conn->c_lock);
         list_del_init(&req->rq_list);
-        list_add_tail(&req->rq_list, &cli->cli_sending_head);
-        spin_unlock(&cli->cli_lock);
+        list_add_tail(&req->rq_list, &conn->c_sending_head);
+        spin_unlock(&conn->c_lock);
 
         CDEBUG(D_OTHER, "-- sleeping\n");
         lwi = LWI_TIMEOUT_INTR(req->rq_timeout * HZ, expired_request,
@@ -555,11 +564,11 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                 CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg,
                        req->rq_replen, req->rq_repmsg->status);
 
-        spin_lock(&cli->cli_lock);
-        cli->cli_last_xid = req->rq_repmsg->last_xid;
-        cli->cli_last_committed = req->rq_repmsg->last_committed;
-        ptlrpc_free_committed(cli); 
-        spin_unlock(&cli->cli_lock);
+        spin_lock(&conn->c_lock);
+        conn->c_last_xid = req->rq_repmsg->last_xid;
+        conn->c_last_committed = req->rq_repmsg->last_committed;
+        ptlrpc_free_committed(conn); 
+        spin_unlock(&conn->c_lock);
 
         EXIT;
  out: