Whamcloud - gitweb
land 0.5.20.3 b_devel onto HEAD (b_devel will remain)
[fs/lustre-release.git] / lustre / ptlrpc / client.c
index 48e11b5..7d80d5f 100644 (file)
  */
 
 #define DEBUG_SUBSYSTEM S_RPC
+#ifndef __KERNEL__
+#include <errno.h>
+#include <signal.h>
+#include <liblustre.h>
+#endif
 
 #include <linux/obd_support.h>
 #include <linux/obd_class.h>
@@ -44,10 +49,10 @@ struct obd_uuid *ptlrpc_req_to_uuid(struct ptlrpc_request *req)
 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
 {
         struct ptlrpc_connection *c;
-        struct lustre_peer peer;
+        struct ptlrpc_peer peer;
         int err;
 
-        err = kportal_uuid_to_peer(uuid->uuid, &peer);
+        err = ptlrpc_uuid_to_peer(uuid, &peer);
         if (err != 0) {
                 CERROR("cannot find peer %s!\n", uuid->uuid);
                 return NULL;
@@ -67,16 +72,16 @@ struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
 
 void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,struct obd_uuid *uuid)
 {
-        struct lustre_peer peer;
+        struct ptlrpc_peer peer;
         int err;
 
-        err = kportal_uuid_to_peer(uuid->uuid, &peer);
+        err = ptlrpc_uuid_to_peer (uuid, &peer);
         if (err != 0) {
                 CERROR("cannot find peer %s!\n", uuid->uuid);
                 return;
         }
 
-        memcpy(&conn->c_peer, &peer, sizeof(peer));
+        memcpy (&conn->c_peer, &peer, sizeof (peer));
         return;
 }
 
@@ -183,11 +188,19 @@ static int ll_sync_brw_timeout(void *data)
 
                 LASSERT(desc->bd_connection);
 
-                /* If PtlMDUnlink succeeds, then it hasn't completed yet.  If it
-                 * fails, the bulk finished _just_ in time (after the timeout
-                 * fired but before we got this far) and we'll let it live.
+                /* If PtlMDUnlink succeeds, then bulk I/O on the MD hasn't
+                 * even started yet.  XXX where do we kunmup the thing?
+                 *
+                 * If it fail with PTL_MD_BUSY, then the network is still
+                 * reading/writing the buffers and we must wait for it to
+                 * complete (which it will within finite time, most
+                 * probably with failure; we really need portals error
+                 * events to detect that).
+                 *
+                 * Otherwise (PTL_INV_MD) it completed after the bd_flags
+                 * test above!
                  */
-                if (PtlMDUnlink(desc->bd_md_h) != 0) {
+                if (PtlMDUnlink(desc->bd_md_h) != PTL_OK) {
                         CERROR("Near-miss on OST %s -- need to adjust "
                                "obd_timeout?\n",
                                desc->bd_connection->c_remote_uuid.uuid);
@@ -311,13 +324,25 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
                 return;
         }
 
+        /* We must take it off the imp_replay_list first.  Otherwise, we'll set
+         * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
+        if (request->rq_import) {
+                unsigned long flags = 0;
+                if (!locked)
+                        spin_lock_irqsave(&request->rq_import->imp_lock, flags);
+                list_del_init(&request->rq_list);
+                if (!locked)
+                        spin_unlock_irqrestore(&request->rq_import->imp_lock,
+                                               flags);
+        }
+
         if (atomic_read(&request->rq_refcount) != 0) {
                 CERROR("freeing request %p (%d->%s:%d) with refcount %d\n",
                        request, request->rq_reqmsg->opc,
                        request->rq_connection->c_remote_uuid.uuid,
                        request->rq_import->imp_client->cli_request_portal,
                        atomic_read (&request->rq_refcount));
-                /* LBUG(); */
+                LBUG();
         }
 
         if (request->rq_repmsg != NULL) {
@@ -330,16 +355,6 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
                 request->rq_reqmsg = NULL;
         }
 
-        if (request->rq_import) {
-                unsigned long flags = 0;
-                if (!locked)
-                        spin_lock_irqsave(&request->rq_import->imp_lock, flags);
-                list_del_init(&request->rq_list);
-                if (!locked)
-                        spin_unlock_irqrestore(&request->rq_import->imp_lock,
-                                               flags);
-        }
-
         ptlrpc_put_connection(request->rq_connection);
         OBD_FREE(request, sizeof(*request));
         EXIT;
@@ -392,7 +407,6 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req)
         }
 
         if (req->rq_flags & PTL_RPC_FL_RESEND) {
-                ENTRY;
                 DEBUG_REQ(D_ERROR, req, "RESEND:");
                 GOTO(out, rc = 1);
         }
@@ -442,7 +456,7 @@ static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request)
 }
 
 /* Abort this request and cleanup any resources associated with it. */
-static int ptlrpc_abort(struct ptlrpc_request *request)
+int ptlrpc_abort(struct ptlrpc_request *request)
 {
         /* First remove the ME for the reply; in theory, this means
          * that we can tear down the buffer safely. */
@@ -469,8 +483,8 @@ void ptlrpc_free_committed(struct obd_import *imp)
         LASSERT(spin_is_locked(&imp->imp_lock));
 #endif
 
-        CDEBUG(D_HA, "committing for last_committed "LPU64"\n",
-               imp->imp_peer_committed_transno);
+        CDEBUG(D_HA, "%s: committing for last_committed "LPU64"\n",
+               imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
 
         list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
@@ -524,17 +538,14 @@ void ptlrpc_cleanup_client(struct obd_import *imp)
 
 void ptlrpc_continue_req(struct ptlrpc_request *req)
 {
-        ENTRY;
         DEBUG_REQ(D_HA, req, "continuing delayed request");
         req->rq_reqmsg->addr = req->rq_import->imp_handle.addr;
         req->rq_reqmsg->cookie = req->rq_import->imp_handle.cookie;
         wake_up(&req->rq_wait_for_rep);
-        EXIT;
 }
 
 void ptlrpc_resend_req(struct ptlrpc_request *req)
 {
-        ENTRY;
         DEBUG_REQ(D_HA, req, "resending");
         req->rq_reqmsg->addr = req->rq_import->imp_handle.addr;
         req->rq_reqmsg->cookie = req->rq_import->imp_handle.cookie;
@@ -543,18 +554,15 @@ void ptlrpc_resend_req(struct ptlrpc_request *req)
         req->rq_flags |= PTL_RPC_FL_RESEND;
         req->rq_flags &= ~PTL_RPC_FL_TIMEOUT;
         wake_up(&req->rq_wait_for_rep);
-        EXIT;
 }
 
 void ptlrpc_restart_req(struct ptlrpc_request *req)
 {
-        ENTRY;
         DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
         req->rq_status = -ERESTARTSYS;
         req->rq_flags |= PTL_RPC_FL_RESTART;
         req->rq_flags &= ~PTL_RPC_FL_TIMEOUT;
         wake_up(&req->rq_wait_for_rep);
-        EXIT;
 }
 
 static int expired_request(void *data)
@@ -659,15 +667,14 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
 
         init_waitqueue_head(&req->rq_wait_for_rep);
 
-        spin_lock_irqsave(&imp->imp_lock, flags);
-        req->rq_xid = HTON__u32(++imp->imp_last_xid);
-        spin_unlock_irqrestore(&imp->imp_lock, flags);
+        req->rq_xid = HTON__u32(ptlrpc_next_xid());
 
         /* for distributed debugging */
         req->rq_reqmsg->status = HTON__u32(current->pid);
-        CDEBUG(D_RPCTRACE, "Sending RPC pid:xid:nid:opc %d:"LPU64":%x:%d\n",
-               NTOH__u32(req->rq_reqmsg->status), req->rq_xid,
-               conn->c_peer.peer_nid, NTOH__u32(req->rq_reqmsg->opc));
+        CDEBUG(D_RPCTRACE, "Sending RPC pid:xid:nid:opc %d:"LPU64":%s:"LPX64
+               ":%d\n", NTOH__u32(req->rq_reqmsg->status), req->rq_xid,
+               conn->c_peer.peer_ni->pni_name, conn->c_peer.peer_nid,
+               NTOH__u32(req->rq_reqmsg->opc));
 
         spin_lock_irqsave(&imp->imp_lock, flags);
 
@@ -726,7 +733,17 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                 lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, expired_request,
                                        interrupted_request, req);
         }
+#ifdef __KERNEL__
         l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi);
+#else 
+        { 
+                extern int reply_in_callback(ptl_event_t *ev);
+                ptl_event_t reply_ev;
+                PtlEQWait(req->rq_connection->c_peer.peer_ni->pni_reply_in_eq_h, &reply_ev);
+                reply_in_callback(&reply_ev); 
+        }
+#endif 
+
         DEBUG_REQ(D_NET, req, "-- done sleeping");
 
         spin_lock_irqsave(&imp->imp_lock, flags);
@@ -741,6 +758,11 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
         /* Don't resend if we were interrupted. */
         if ((req->rq_flags & (PTL_RPC_FL_RESEND | PTL_RPC_FL_INTR)) ==
             PTL_RPC_FL_RESEND) {
+                if (req->rq_flags & PTL_RPC_FL_NO_RESEND) {
+                        ptlrpc_abort(req); /* clean up reply buffers */
+                        req->rq_flags &= ~PTL_RPC_FL_NO_RESEND;
+                        GOTO(out, rc = -ETIMEDOUT);
+                }
                 req->rq_flags &= ~PTL_RPC_FL_RESEND;
                 lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
                 DEBUG_REQ(D_HA, req, "resending: ");
@@ -900,9 +922,11 @@ void ptlrpc_abort_inflight(struct obd_import *imp, int dying_import)
          * ptlrpc_queue_wait must (and does) hold imp_lock while testing this
          * flag and then putting requests on sending_list or delayed_list.
          */
-        spin_lock_irqsave(&imp->imp_lock, flags);
-        imp->imp_flags |= IMP_INVALID;
-        spin_unlock_irqrestore(&imp->imp_lock, flags);
+        if ((imp->imp_flags & IMP_REPLAYABLE) == 0) {
+                spin_lock_irqsave(&imp->imp_lock, flags);
+                imp->imp_flags |= IMP_INVALID;
+                spin_unlock_irqrestore(&imp->imp_lock, flags);
+        }
 
         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
                 struct ptlrpc_request *req =