Whamcloud - gitweb
landing b_cmobd_merge on HEAD
[fs/lustre-release.git] / lustre / ptlrpc / events.c
index 77334a2..d865e1f 100644 (file)
 #include <linux/lustre_net.h>
 #include "ptlrpc_internal.h"
 
+#if !defined(__KERNEL__) && defined(CRAY_PORTALS)
+/* forward ref in events.c */
+static void cray_portals_callback(ptl_event_t *ev);
+#endif
+
+
 struct ptlrpc_ni  ptlrpc_interfaces[NAL_MAX_NR];
 int               ptlrpc_ninterfaces;
 
@@ -158,7 +164,8 @@ void request_in_callback(ptl_event_t *ev)
         struct ptlrpc_srv_ni              *srv_ni = rqbd->rqbd_srv_ni;
         struct ptlrpc_service             *service = srv_ni->sni_service;
         struct ptlrpc_request             *req;
-        long                               flags;
+        char                              str[PTL_NALFMT_SIZE];
+        unsigned long                     flags;
         ENTRY;
 
         LASSERT (ev->type == PTL_EVENT_PUT_END ||
@@ -188,8 +195,10 @@ void request_in_callback(ptl_event_t *ev)
                 OBD_ALLOC_GFP(req, sizeof(*req), GFP_ATOMIC);
                 if (req == NULL) {
                         CERROR("Can't allocate incoming request descriptor: "
-                               "Dropping %s RPC from "LPX64"\n",
-                               service->srv_name, ev->initiator.nid);
+                               "Dropping %s RPC from %s\n",
+                               service->srv_name, 
+                               portals_nid2str(srv_ni->sni_ni->pni_number,
+                                               ev->initiator.nid, str));
                         return;
                 }
         }
@@ -319,13 +328,13 @@ void server_bulk_callback (ptl_event_t *ev)
         EXIT;
 }
 
-static int ptlrpc_master_callback(ptl_event_t *ev)
+static void ptlrpc_master_callback(ptl_event_t *ev)
 {
         struct ptlrpc_cb_id *cbid = ev->mem_desc.user_ptr;
         void (*callback)(ptl_event_t *ev) = cbid->cbid_fn;
 
         /* Honestly, it's best to find out early. */
-        LASSERT (cbid->cbid_arg != (void *)0x5a5a5a5a5a5a5a5a);
+        LASSERT (cbid->cbid_arg != LP_POISON);
         LASSERT (callback == request_out_callback ||
                  callback == reply_in_callback ||
                  callback == client_bulk_callback ||
@@ -334,35 +343,32 @@ static int ptlrpc_master_callback(ptl_event_t *ev)
                  callback == server_bulk_callback);
         
         callback (ev);
-        return (0);
 }
 
 int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, struct ptlrpc_peer *peer)
 {
         struct ptlrpc_ni   *pni;
+        __u32               peer_nal;
         ptl_nid_t           peer_nid;
-        ptl_handle_ni_t     peer_ni;
         int                 i;
-        char                str[20];
+        char                str[PTL_NALFMT_SIZE];
         int                 rc = lustre_uuid_to_peer(uuid->uuid, 
-                                                     &peer_ni, &peer_nid);
+                                                     &peer_nal, &peer_nid);
         if (rc != 0)
                 RETURN (rc);
 
         for (i = 0; i < ptlrpc_ninterfaces; i++) {
                 pni = &ptlrpc_interfaces[i];
 
-                if (!memcmp(&peer_ni, &pni->pni_ni_h,
-                            sizeof (peer_ni))) {
+                if (pni->pni_number == peer_nal) {
                         peer->peer_nid = peer_nid;
                         peer->peer_ni = pni;
                         return (0);
                 }
         }
 
-        PtlSnprintHandle(str, sizeof(str), peer_ni);
-        CERROR("Can't find ptlrpc interface for "LPX64" ni %s\n",
-               peer_nid, str);
+        CERROR("Can't find ptlrpc interface for NAL %d, NID %s\n",
+               peer_nal, portals_nid2str(peer_nal, peer_nid, str));
         return (-ENOENT);
 }
 
@@ -385,7 +391,7 @@ void ptlrpc_ni_fini(struct ptlrpc_ni *pni)
                         LBUG();
 
                 case PTL_OK:
-                        kportal_put_ni (pni->pni_number);
+                        PtlNIFini(pni->pni_ni_h);
                         return;
                         
                 case PTL_EQ_IN_USE:
@@ -407,25 +413,48 @@ int ptlrpc_ni_init(int number, char *name, struct ptlrpc_ni *pni)
 {
         int              rc;
         char             str[20];
-        ptl_handle_ni_t *nip = kportal_get_ni (number);
+        ptl_handle_ni_t  nih;
 
-        if (nip == NULL) {
-                CDEBUG (D_NET, "Network interface %s not loaded\n", name);
+        /* We're not passing any limits yet... */
+        rc = PtlNIInit(number, 0, NULL, NULL, &nih);
+        if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
+                CDEBUG (D_NET, "Can't init network interface %s: %d\n", 
+                        name, rc);
                 return (-ENOENT);
         }
 
-        PtlSnprintHandle(str, sizeof(str), *nip);
+        PtlSnprintHandle(str, sizeof(str), nih);
         CDEBUG (D_NET, "init %d %s: %s\n", number, name, str);
 
         pni->pni_name = name;
         pni->pni_number = number;
-        pni->pni_ni_h = *nip;
+        pni->pni_ni_h = nih;
 
         pni->pni_eq_h = PTL_INVALID_HANDLE;
 
-        rc = PtlEQAlloc(pni->pni_ni_h, PTLRPC_NUM_EQ, PTLRPC_EQ_CALLBACK,
+        /* CAVEAT EMPTOR: how we process portals events is _radically_
+         * different depending on... */
+#ifdef __KERNEL__
+        /* kernel portals calls our master callback when events are added to
+         * the event queue.  In fact lustre never pulls events off this queue,
+         * so it's only sized for some debug history. */
+        rc = PtlEQAlloc(pni->pni_ni_h, 1024, ptlrpc_master_callback,
                         &pni->pni_eq_h);
-
+#else
+        /* liblustre calls the master callback when it removes events from the
+         * event queue.  The event queue has to be big enough not to drop
+         * anything */
+# if CRAY_PORTALS
+        /* cray portals implements a non-standard callback to notify us there
+         * are buffered events even when the app is not doing a filesystem
+         * call. */
+        rc = PtlEQAlloc(pni->pni_ni_h, 10240, cray_portals_callback,
+                        &pni->pni_eq_h);
+# else
+        rc = PtlEQAlloc(pni->pni_ni_h, 10240, PTL_EQ_HANDLER_NONE,
+                        &pni->pni_eq_h);
+# endif
+#endif
         if (rc != PTL_OK)
                 GOTO (fail, rc = -ENOMEM);
 
@@ -533,11 +562,12 @@ liblustre_wait_event (int timeout)
         return found_something;
 }
 
-static int cray_portals_callback(ptl_event_t *ev)
+#ifdef CRAY_PORTALS
+static void cray_portals_callback(ptl_event_t *ev)
 {
         /* We get a callback from the client Cray portals implementation
          * whenever anyone calls PtlEQPoll(), and an event queue with a
-         * callback handler has outstanding events.  
+         * callback handler has outstanding events.
          *
          * If it's not liblustre calling PtlEQPoll(), this lets us know we
          * have outstanding events which we handle with
@@ -545,13 +575,20 @@ static int cray_portals_callback(ptl_event_t *ev)
          *
          * Otherwise, we're already eagerly consuming events and we'd
          * handle events out of order if we recursed. */
-        if (liblustre_waiting)
-                return;
-        
-        liblustre_wait_event(0);
+        if (!liblustre_waiting)
+                liblustre_wait_event(0);
 }
+#endif
 #endif /* __KERNEL__ */
 
+int ptlrpc_default_nal(void)
+{
+        if (ptlrpc_ninterfaces == 0)
+                return (-ENOENT);
+
+        return (ptlrpc_interfaces[0].pni_number);
+}
+
 int ptlrpc_init_portals(void)
 {
         /* Add new portals network interfaces here.
@@ -565,7 +602,7 @@ int ptlrpc_init_portals(void)
                 {GMNAL,   "gmnal"},
                 {IBNAL,   "ibnal"},
                 {TCPNAL,  "tcpnal"},
-                {SCIMACNAL, "scimacnal"}};
+                {CRAY_KB_ERNAL, "cray_kb_ernal"}};
         int   rc;
         int   i;