Whamcloud - gitweb
b=2776
[fs/lustre-release.git] / lustre / ptlrpc / events.c
index b1f8221..6e61236 100644 (file)
@@ -29,6 +29,7 @@
 #endif
 #include <linux/obd_class.h>
 #include <linux/lustre_net.h>
+#include "ptlrpc_internal.h"
 
 struct ptlrpc_ni  ptlrpc_interfaces[NAL_MAX_NR];
 int               ptlrpc_ninterfaces;
@@ -43,15 +44,15 @@ void request_out_callback(ptl_event_t *ev)
         unsigned long          flags;
         ENTRY;
 
-        LASSERT (ev->type == PTL_EVENT_SENT ||
+        LASSERT (ev->type == PTL_EVENT_SEND_END ||
                  ev->type == PTL_EVENT_UNLINK);
         LASSERT (ev->unlinked);
 
-        DEBUG_REQ((ev->status == PTL_OK) ? D_NET : D_ERROR, req,
-                  "type %d, status %d", ev->type, ev->status);
+        DEBUG_REQ((ev->ni_fail_type == PTL_NI_OK) ? D_NET : D_ERROR, req,
+                  "type %d, status %d", ev->type, ev->ni_fail_type);
 
         if (ev->type == PTL_EVENT_UNLINK ||
-            ev->status != PTL_OK) {
+            ev->ni_fail_type != PTL_NI_OK) {
 
                 /* Failed send: make it seem like the reply timed out, just
                  * like failing sends in client.c does currently...  */
@@ -78,23 +79,23 @@ void reply_in_callback(ptl_event_t *ev)
         unsigned long flags;
         ENTRY;
 
-        LASSERT (ev->type == PTL_EVENT_PUT ||
+        LASSERT (ev->type == PTL_EVENT_PUT_END ||
                  ev->type == PTL_EVENT_UNLINK);
         LASSERT (ev->unlinked);
         LASSERT (ev->mem_desc.start == req->rq_repmsg);
         LASSERT (ev->offset == 0);
         LASSERT (ev->mlength <= req->rq_replen);
         
-        DEBUG_REQ((ev->status == PTL_OK) ? D_NET : D_ERROR, req,
-                  "type %d, status %d", ev->type, ev->status);
+        DEBUG_REQ((ev->ni_fail_type == PTL_NI_OK) ? D_NET : D_ERROR, req,
+                  "type %d, status %d", ev->type, ev->ni_fail_type);
 
         spin_lock_irqsave (&req->rq_lock, flags);
 
         LASSERT (req->rq_receiving_reply);
         req->rq_receiving_reply = 0;
 
-        if (ev->type == PTL_EVENT_PUT &&
-            ev->status == PTL_OK) {
+        if (ev->type == PTL_EVENT_PUT_END &&
+            ev->ni_fail_type == PTL_NI_OK) {
                 req->rq_replied = 1;
                 req->rq_nob_received = ev->mlength;
         }
@@ -118,15 +119,15 @@ void client_bulk_callback (ptl_event_t *ev)
         ENTRY;
 
         LASSERT ((desc->bd_type == BULK_PUT_SINK && 
-                  ev->type == PTL_EVENT_PUT) ||
+                  ev->type == PTL_EVENT_PUT_END) ||
                  (desc->bd_type == BULK_GET_SOURCE &&
-                  ev->type == PTL_EVENT_GET) ||
+                  ev->type == PTL_EVENT_GET_END) ||
                  ev->type == PTL_EVENT_UNLINK);
         LASSERT (ev->unlinked);
 
-        CDEBUG((ev->status == PTL_OK) ? D_NET : D_ERROR,
+        CDEBUG((ev->ni_fail_type == PTL_NI_OK) ? D_NET : D_ERROR,
                "event type %d, status %d, desc %p\n", 
-               ev->type, ev->status, desc);
+               ev->type, ev->ni_fail_type, desc);
 
         spin_lock_irqsave (&desc->bd_lock, flags);
 
@@ -134,7 +135,7 @@ void client_bulk_callback (ptl_event_t *ev)
         desc->bd_network_rw = 0;
 
         if (ev->type != PTL_EVENT_UNLINK &&
-            ev->status == PTL_OK) {
+            ev->ni_fail_type == PTL_NI_OK) {
                 desc->bd_success = 1;
                 desc->bd_nob_transferred = ev->mlength;
         }
@@ -160,15 +161,15 @@ void request_in_callback(ptl_event_t *ev)
         long                               flags;
         ENTRY;
 
-        LASSERT (ev->type == PTL_EVENT_PUT ||
+        LASSERT (ev->type == PTL_EVENT_PUT_END ||
                  ev->type == PTL_EVENT_UNLINK);
         LASSERT ((char *)ev->mem_desc.start >= rqbd->rqbd_buffer);
         LASSERT ((char *)ev->mem_desc.start + ev->offset + ev->mlength <=
                  rqbd->rqbd_buffer + service->srv_buf_size);
 
-        CDEBUG((ev->status == PTL_OK) ? D_NET : D_ERROR,
+        CDEBUG((ev->ni_fail_type == PTL_OK) ? D_NET : D_ERROR,
                "event type %d, status %d, service %s\n", 
-               ev->type, ev->status, service->srv_name);
+               ev->type, ev->ni_fail_type, service->srv_name);
 
         if (ev->unlinked) {
                 /* If this is the last request message to fit in the
@@ -179,8 +180,8 @@ void request_in_callback(ptl_event_t *ev)
                 req = &rqbd->rqbd_req;
                 memset(req, 0, sizeof (*req));
         } else {
-                LASSERT (ev->type == PTL_EVENT_PUT);
-                if (ev->status != PTL_OK) {
+                LASSERT (ev->type == PTL_EVENT_PUT_END);
+                if (ev->ni_fail_type != PTL_NI_OK) {
                         /* We moaned above already... */
                         return;
                 }
@@ -198,10 +199,10 @@ void request_in_callback(ptl_event_t *ev)
          * size to non-zero if this was a successful receive. */
         req->rq_xid = ev->match_bits;
         req->rq_reqmsg = ev->mem_desc.start + ev->offset;
-        if (ev->type == PTL_EVENT_PUT &&
-            ev->status == PTL_OK)
+        if (ev->type == PTL_EVENT_PUT_END &&
+            ev->ni_fail_type == PTL_NI_OK)
                 req->rq_reqlen = ev->mlength;
-        req->rq_arrival_time = ev->arrival_time;
+        do_gettimeofday(&req->rq_arrival_time);
         req->rq_peer.peer_nid = ev->initiator.nid;
         req->rq_peer.peer_ni = rqbd->rqbd_srv_ni->sni_ni;
         req->rq_rqbd = rqbd;
@@ -249,7 +250,7 @@ void reply_out_callback(ptl_event_t *ev)
         unsigned long              flags;
         ENTRY;
 
-        LASSERT (ev->type == PTL_EVENT_SENT ||
+        LASSERT (ev->type == PTL_EVENT_SEND_END ||
                  ev->type == PTL_EVENT_ACK ||
                  ev->type == PTL_EVENT_UNLINK);
 
@@ -285,22 +286,22 @@ void server_bulk_callback (ptl_event_t *ev)
         unsigned long            flags;
         ENTRY;
 
-        LASSERT (ev->type == PTL_EVENT_SENT ||
+        LASSERT (ev->type == PTL_EVENT_SEND_END ||
                  ev->type == PTL_EVENT_UNLINK ||
                  (desc->bd_type == BULK_PUT_SOURCE &&
                   ev->type == PTL_EVENT_ACK) ||
                  (desc->bd_type == BULK_GET_SINK &&
-                  ev->type == PTL_EVENT_REPLY));
+                  ev->type == PTL_EVENT_REPLY_END));
 
-        CDEBUG((ev->status == PTL_OK) ? D_NET : D_ERROR,
+        CDEBUG((ev->ni_fail_type == PTL_NI_OK) ? D_NET : D_ERROR,
                "event type %d, status %d, desc %p\n", 
-               ev->type, ev->status, desc);
+               ev->type, ev->ni_fail_type, desc);
 
         spin_lock_irqsave (&desc->bd_lock, flags);
         
         if ((ev->type == PTL_EVENT_ACK ||
-             ev->type == PTL_EVENT_REPLY) &&
-            ev->status == PTL_OK) {
+             ev->type == PTL_EVENT_REPLY_END) &&
+            ev->ni_fail_type == PTL_NI_OK) {
                 /* We heard back from the peer, so even if we get this
                  * before the SENT event (oh yes we can), we know we
                  * read/wrote the peer buffer and how much... */
@@ -339,26 +340,29 @@ static int ptlrpc_master_callback(ptl_event_t *ev)
 int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, struct ptlrpc_peer *peer)
 {
         struct ptlrpc_ni   *pni;
-        struct lustre_peer  lpeer;
+        ptl_nid_t           peer_nid;
+        ptl_handle_ni_t     peer_ni;
         int                 i;
-        int                 rc = lustre_uuid_to_peer (uuid->uuid, &lpeer);
-
+        char                str[20];
+        int                 rc = lustre_uuid_to_peer(uuid->uuid, 
+                                                     &peer_ni, &peer_nid);
         if (rc != 0)
                 RETURN (rc);
 
         for (i = 0; i < ptlrpc_ninterfaces; i++) {
                 pni = &ptlrpc_interfaces[i];
 
-                if (!memcmp(&lpeer.peer_ni, &pni->pni_ni_h,
-                            sizeof (lpeer.peer_ni))) {
-                        peer->peer_nid = lpeer.peer_nid;
+                if (!memcmp(&peer_ni, &pni->pni_ni_h,
+                            sizeof (peer_ni))) {
+                        peer->peer_nid = peer_nid;
                         peer->peer_ni = pni;
                         return (0);
                 }
         }
 
-        CERROR("Can't find ptlrpc interface for "LPX64" ni handle %08lx."LPX64"\n",
-               lpeer.peer_nid, lpeer.peer_ni.nal_idx, lpeer.peer_ni.cookie);
+        PtlSnprintHandle(str, sizeof(str), peer_ni);
+        CERROR("Can't find ptlrpc interface for "LPX64" ni %s\n",
+               peer_nid, str);
         return (-ENOENT);
 }
 
@@ -384,7 +388,7 @@ void ptlrpc_ni_fini(struct ptlrpc_ni *pni)
                         kportal_put_ni (pni->pni_number);
                         return;
                         
-                case PTL_EQ_INUSE:
+                case PTL_EQ_IN_USE:
                         if (retries != 0)
                                 CWARN("Event queue for %s still busy\n",
                                       pni->pni_name);
@@ -402,6 +406,7 @@ void ptlrpc_ni_fini(struct ptlrpc_ni *pni)
 int ptlrpc_ni_init(int number, char *name, struct ptlrpc_ni *pni)
 {
         int              rc;
+        char             str[20];
         ptl_handle_ni_t *nip = kportal_get_ni (number);
 
         if (nip == NULL) {
@@ -409,24 +414,18 @@ int ptlrpc_ni_init(int number, char *name, struct ptlrpc_ni *pni)
                 return (-ENOENT);
         }
 
-        CDEBUG (D_NET, "init %d %s: nal_idx %ld\n", number, name, nip->nal_idx);
+        PtlSnprintHandle(str, sizeof(str), *nip);
+        CDEBUG (D_NET, "init %d %s: %s\n", number, name, str);
 
         pni->pni_name = name;
         pni->pni_number = number;
         pni->pni_ni_h = *nip;
 
-        pni->pni_eq_h = PTL_HANDLE_NONE;
+        pni->pni_eq_h = PTL_INVALID_HANDLE;
 
-#ifdef __KERNEL__
-        /* kernel: portals calls the callback when the event is added to the
-         * queue, so we don't care if we lose events */
-        rc = PtlEQAlloc(pni->pni_ni_h, 1024, ptlrpc_master_callback,
+        rc = PtlEQAlloc(pni->pni_ni_h, PTLRPC_NUM_EQ, PTLRPC_EQ_CALLBACK,
                         &pni->pni_eq_h);
-#else
-        /* liblustre: no asynchronous callback and allocate a nice big event
-         * queue so we don't drop any events... */
-        rc = PtlEQAlloc(pni->pni_ni_h, 10240, NULL, &pni->pni_eq_h);
-#endif
+
         if (rc != PTL_OK)
                 GOTO (fail, rc = -ENOMEM);
 
@@ -473,19 +472,16 @@ liblustre_check_events (int timeout)
 {
         ptl_event_t ev;
         int         rc;
+        int         i;
         ENTRY;
 
-        if (timeout) {
-                rc = PtlEQWait_timeout(ptlrpc_interfaces[0].pni_eq_h, &ev, timeout);
-        } else {
-                rc = PtlEQGet (ptlrpc_interfaces[0].pni_eq_h, &ev);
-        }
+        rc = PtlEQPoll(&ptlrpc_interfaces[0].pni_eq_h, 1, timeout * 1000,
+                       &ev, &i);
         if (rc == PTL_EQ_EMPTY)
                 RETURN(0);
         
         LASSERT (rc == PTL_EQ_DROPPED || rc == PTL_OK);
         
-#ifndef __KERNEL__
         /* liblustre: no asynch callback so we can't affort to miss any
          * events... */
         if (rc == PTL_EQ_DROPPED) {
@@ -494,10 +490,11 @@ liblustre_check_events (int timeout)
         }
         
         ptlrpc_master_callback (&ev);
-#endif
         RETURN(1);
 }
 
+int liblustre_waiting = 0;
+
 int
 liblustre_wait_event (int timeout)
 {
@@ -505,40 +502,55 @@ liblustre_wait_event (int timeout)
         struct liblustre_wait_callback *llwc;
         int                             found_something = 0;
 
-        /* First check for any new events */
-        if (liblustre_check_events(0))
-                found_something = 1;
+        /* single threaded recursion check... */
+        liblustre_waiting = 1;
 
-        /* Now give all registered callbacks a bite at the cherry */
-        list_for_each(tmp, &liblustre_wait_callbacks) {
-                llwc = list_entry(tmp, struct liblustre_wait_callback, 
-                                  llwc_list);
-                
-                if (llwc->llwc_fn(llwc->llwc_arg))
+        for (;;) {
+                /* Deal with all pending events */
+                while (liblustre_check_events(0))
                         found_something = 1;
-        }
 
-        /* return to caller if something happened */
-        if (found_something)
-                return 1;
-        
-        /* block for an event, returning immediately on timeout */
-        if (!liblustre_check_events(timeout))
-                return 0;
-
-        /* an event occurred; let all registered callbacks progress... */
-        list_for_each(tmp, &liblustre_wait_callbacks) {
-                llwc = list_entry(tmp, struct liblustre_wait_callback, 
-                                  llwc_list);
+                /* Give all registered callbacks a bite at the cherry */
+                list_for_each(tmp, &liblustre_wait_callbacks) {
+                        llwc = list_entry(tmp, struct liblustre_wait_callback, 
+                                          llwc_list);
                 
-                if (llwc->llwc_fn(llwc->llwc_arg))
-                        found_something = 1;
+                        if (llwc->llwc_fn(llwc->llwc_arg))
+                                found_something = 1;
+                }
+
+                if (found_something || timeout == 0)
+                        break;
+
+                /* Nothing so far, but I'm allowed to block... */
+                found_something = liblustre_check_events(timeout);
+                if (!found_something)           /* still nothing */
+                        break;                  /* I timed out */
         }
 
-        /* ...and tell caller something happened */
-        return 1;
+        liblustre_waiting = 0;
+
+        return found_something;
 }
-#endif
+
+static int cray_portals_callback(ptl_event_t *ev)
+{
+        /* We get a callback from the client Cray portals implementation
+         * whenever anyone calls PtlEQPoll(), and an event queue with a
+         * callback handler has outstanding events.
+         *
+         * If it's not liblustre calling PtlEQPoll(), this lets us know we
+         * have outstanding events which we handle with
+         * liblustre_wait_event().
+         *
+         * Otherwise, we're already eagerly consuming events and we'd
+         * handle events out of order if we recursed.  'ev' is not examined;
+         * always returns 0 (NOTE(review): confirm callers expect 0). */
+        if (!liblustre_waiting)
+                liblustre_wait_event(0);
+        return 0;
+}
+#endif /* __KERNEL__ */
 
 int ptlrpc_init_portals(void)
 {