Whamcloud - gitweb
land b_md onto HEAD. the highlights:
[fs/lustre-release.git] / lustre / ptlrpc / events.c
index 0bdf0d8..c260f5d 100644 (file)
@@ -170,6 +170,8 @@ static int bulk_source_callback(ptl_event_t *ev)
         LASSERT(ev->mem_desc.niov == desc->bd_page_count);
 
         if (atomic_dec_and_test(&desc->bd_source_callback_count)) {
+                void (*event_handler)(struct ptlrpc_bulk_desc *);
+
                 list_for_each_safe(tmp, next, &desc->bd_page_list) {
                         bulk = list_entry(tmp, struct ptlrpc_bulk_page,
                                           bp_link);
@@ -177,10 +179,18 @@ static int bulk_source_callback(ptl_event_t *ev)
                         if (bulk->bp_cb != NULL)
                                 bulk->bp_cb(bulk);
                 }
+
+                /* We need to make a note of whether there's an event handler
+                 * before we call wake_up, because if there is no event handler,
+                 * 'desc' might be freed before we're scheduled again. */
+                event_handler = desc->bd_ptl_ev_hdlr;
+
                 desc->bd_flags |= PTL_BULK_FL_SENT;
                 wake_up(&desc->bd_waitq);
-                if (desc->bd_ptl_ev_hdlr != NULL)
-                        desc->bd_ptl_ev_hdlr(desc);
+                if (event_handler) {
+                        LASSERT(desc->bd_ptl_ev_hdlr == event_handler);
+                        event_handler(desc);
+                }
         }
 
         RETURN(0);
@@ -193,35 +203,39 @@ static int bulk_sink_callback(ptl_event_t *ev)
         struct list_head        *tmp;
         struct list_head        *next;
         ptl_size_t               total = 0;
+        void                   (*event_handler)(struct ptlrpc_bulk_desc *);
         ENTRY;
 
-        if (ev->type == PTL_EVENT_PUT) {
-                /* put with zero offset */
-                LASSERT(ev->offset == 0);
-                /* used iovs */
-                LASSERT((ev->mem_desc.options & PTL_MD_IOV) != 0);
-                /* 1 fragment for each page always */
-                LASSERT(ev->mem_desc.niov == desc->bd_page_count);
-
-                list_for_each_safe (tmp, next, &desc->bd_page_list) {
-                        bulk = list_entry(tmp, struct ptlrpc_bulk_page,
-                                          bp_link);
+        LASSERT(ev->type == PTL_EVENT_PUT);
+
+        /* put with zero offset */
+        LASSERT(ev->offset == 0);
+        /* used iovs */
+        LASSERT((ev->mem_desc.options & PTL_MD_IOV) != 0);
+        /* 1 fragment for each page always */
+        LASSERT(ev->mem_desc.niov == desc->bd_page_count);
 
-                        total += bulk->bp_buflen;
+        list_for_each_safe (tmp, next, &desc->bd_page_list) {
+                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
 
-                        if (bulk->bp_cb != NULL)
-                                bulk->bp_cb(bulk);
-                }
+                total += bulk->bp_buflen;
+
+                if (bulk->bp_cb != NULL)
+                        bulk->bp_cb(bulk);
+        }
 
-                LASSERT(ev->mem_desc.length == total);
+        LASSERT(ev->mem_desc.length == total);
 
-                desc->bd_flags |= PTL_BULK_FL_RCVD;
-                wake_up(&desc->bd_waitq);
-                if (desc->bd_ptl_ev_hdlr != NULL)
-                        desc->bd_ptl_ev_hdlr(desc);
-        } else {
-                CERROR("Unexpected event type!\n");
-                LBUG();
+        /* We need to make a note of whether there's an event handler
+         * before we call wake_up, because if there is no event
+         * handler, 'desc' might be freed before we're scheduled again. */
+        event_handler = desc->bd_ptl_ev_hdlr;
+
+        desc->bd_flags |= PTL_BULK_FL_RCVD;
+        wake_up(&desc->bd_waitq);
+        if (event_handler) {
+                LASSERT(desc->bd_ptl_ev_hdlr == event_handler);
+                event_handler(desc);
         }
 
         RETURN(1);