Whamcloud - gitweb
Merge b_md into HEAD
[fs/lustre-release.git] / lustre / ptlrpc / events.c
index c260f5d..e7a1e08 100644 (file)
@@ -1,7 +1,7 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- *  Copyright (C) 2002 Cluster File Systems, Inc.
+ *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
  *
  *   This file is part of Lustre, http://www.lustre.org.
  *
@@ -26,8 +26,9 @@
 #include <linux/obd_support.h>
 #include <linux/lustre_net.h>
 
-ptl_handle_eq_t request_out_eq, reply_in_eq, reply_out_eq, bulk_source_eq,
-        bulk_sink_eq;
+ptl_handle_eq_t request_out_eq, reply_in_eq, reply_out_eq, 
+        bulk_put_source_eq, bulk_put_sink_eq, 
+        bulk_get_source_eq, bulk_get_sink_eq;
 static const ptl_handle_ni_t *socknal_nip = NULL, *toenal_nip = NULL, 
         *qswnal_nip = NULL, *gmnal_nip = NULL;
 
@@ -149,7 +150,7 @@ int request_in_callback(ptl_event_t *ev)
         return 0;
 }
 
-static int bulk_source_callback(ptl_event_t *ev)
+static int bulk_put_source_callback(ptl_event_t *ev)
 {
         struct ptlrpc_bulk_desc *desc = ev->mem_desc.user_ptr;
         struct ptlrpc_bulk_page *bulk;
@@ -196,7 +197,7 @@ static int bulk_source_callback(ptl_event_t *ev)
         RETURN(0);
 }
 
-static int bulk_sink_callback(ptl_event_t *ev)
+static int bulk_put_sink_callback(ptl_event_t *ev)
 {
         struct ptlrpc_bulk_desc *desc = ev->mem_desc.user_ptr;
         struct ptlrpc_bulk_page *bulk;
@@ -241,6 +242,100 @@ static int bulk_sink_callback(ptl_event_t *ev)
         RETURN(1);
 }
 
+static int bulk_get_source_callback(ptl_event_t *ev)
+{
+        struct ptlrpc_bulk_desc *desc = ev->mem_desc.user_ptr;
+        struct ptlrpc_bulk_page *bulk;
+        struct list_head        *tmp;
+        struct list_head        *next;
+        ptl_size_t               total = 0;
+        void                   (*event_handler)(struct ptlrpc_bulk_desc *);
+        ENTRY;
+
+        LASSERT(ev->type == PTL_EVENT_GET);
+
+        /* put with zero offset */
+        LASSERT(ev->offset == 0);
+        /* used iovs */
+        LASSERT((ev->mem_desc.options & PTL_MD_IOV) != 0);
+        /* 1 fragment for each page always */
+        LASSERT(ev->mem_desc.niov == desc->bd_page_count);
+
+        list_for_each_safe (tmp, next, &desc->bd_page_list) {
+                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
+
+                total += bulk->bp_buflen;
+
+                if (bulk->bp_cb != NULL)
+                        bulk->bp_cb(bulk);
+        }
+
+        LASSERT(ev->mem_desc.length == total);
+
+        /* We need to make a note of whether there's an event handler
+         * before we call wake_up, because if there is no event
+         * handler, 'desc' might be freed before we're scheduled again. */
+        event_handler = desc->bd_ptl_ev_hdlr;
+
+        desc->bd_flags |= PTL_BULK_FL_SENT;
+        wake_up(&desc->bd_waitq);
+        if (event_handler) {
+                LASSERT(desc->bd_ptl_ev_hdlr == event_handler);
+                event_handler(desc);
+        }
+
+        RETURN(1);
+}
+
+
+static int bulk_get_sink_callback(ptl_event_t *ev)
+{
+        struct ptlrpc_bulk_desc *desc = ev->mem_desc.user_ptr;
+        struct ptlrpc_bulk_page *bulk;
+        struct list_head        *tmp;
+        struct list_head        *next;
+        ENTRY;
+
+        CDEBUG(D_NET, "got %s event %d\n",
+               (ev->type == PTL_EVENT_SENT) ? "SENT" :
+               (ev->type == PTL_EVENT_REPLY)  ? "REPLY"  : "UNEXPECTED", 
+               ev->type);
+
+        LASSERT(ev->type == PTL_EVENT_SENT || ev->type == PTL_EVENT_REPLY);
+
+        LASSERT(atomic_read(&desc->bd_source_callback_count) > 0 &&
+                atomic_read(&desc->bd_source_callback_count) <= 2);
+
+        /* 1 fragment for each page always */
+        LASSERT(ev->mem_desc.niov == desc->bd_page_count);
+
+        if (atomic_dec_and_test(&desc->bd_source_callback_count)) {
+                void (*event_handler)(struct ptlrpc_bulk_desc *);
+
+                list_for_each_safe(tmp, next, &desc->bd_page_list) {
+                        bulk = list_entry(tmp, struct ptlrpc_bulk_page,
+                                          bp_link);
+
+                        if (bulk->bp_cb != NULL)
+                                bulk->bp_cb(bulk);
+                }
+
+                /* We need to make a note of whether there's an event handler
+                 * before we call wake_up, because if there is no event handler,
+                 * 'desc' might be freed before we're scheduled again. */
+                event_handler = desc->bd_ptl_ev_hdlr;
+
+                desc->bd_flags |= PTL_BULK_FL_RCVD;
+                wake_up(&desc->bd_waitq);
+                if (event_handler) {
+                        LASSERT(desc->bd_ptl_ev_hdlr == event_handler);
+                        event_handler(desc);
+                }
+        }
+
+        RETURN(0);
+}
+
 int ptlrpc_init_portals(void)
 {
         int rc;
@@ -272,11 +367,21 @@ int ptlrpc_init_portals(void)
         if (rc != PTL_OK)
                 CERROR("PtlEQAlloc failed: %d\n", rc);
 
-        rc = PtlEQAlloc(ni, 1024, bulk_source_callback, &bulk_source_eq);
+        rc = PtlEQAlloc(ni, 1024, bulk_put_source_callback, 
+                        &bulk_put_source_eq);
+        if (rc != PTL_OK)
+                CERROR("PtlEQAlloc failed: %d\n", rc);
+
+        rc = PtlEQAlloc(ni, 1024, bulk_put_sink_callback, &bulk_put_sink_eq);
+        if (rc != PTL_OK)
+                CERROR("PtlEQAlloc failed: %d\n", rc);
+
+        rc = PtlEQAlloc(ni, 1024, bulk_get_source_callback, 
+                        &bulk_get_source_eq);
         if (rc != PTL_OK)
                 CERROR("PtlEQAlloc failed: %d\n", rc);
 
-        rc = PtlEQAlloc(ni, 1024, bulk_sink_callback, &bulk_sink_eq);
+        rc = PtlEQAlloc(ni, 1024, bulk_get_sink_callback, &bulk_get_sink_eq);
         if (rc != PTL_OK)
                 CERROR("PtlEQAlloc failed: %d\n", rc);
 
@@ -288,8 +393,10 @@ void ptlrpc_exit_portals(void)
         PtlEQFree(request_out_eq);
         PtlEQFree(reply_out_eq);
         PtlEQFree(reply_in_eq);
-        PtlEQFree(bulk_source_eq);
-        PtlEQFree(bulk_sink_eq);
+        PtlEQFree(bulk_put_source_eq);
+        PtlEQFree(bulk_put_sink_eq);
+        PtlEQFree(bulk_get_source_eq);
+        PtlEQFree(bulk_get_sink_eq);
 
         if (qswnal_nip != NULL)
                 inter_module_put("kqswnal_ni");