Whamcloud - gitweb
LU-3963 libcfs: remove proc handler wrappers
[fs/lustre-release.git] / lnet / lnet / lib-msg.c
index 6c705f0..b1995f2 100644 (file)
@@ -26,6 +26,8 @@
 /*
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2012, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -74,7 +76,7 @@ lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
                ev->target.pid    = le32_to_cpu(hdr->dest_pid);
                ev->initiator.nid = LNET_NID_ANY;
                ev->initiator.pid = the_lnet.ln_pid;
-               ev->sender        = LNET_NID_ANY;
+               ev->sender        = LNET_NID_ANY;
 
        } else {
                /* event for passive message */
@@ -83,7 +85,7 @@ lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
                ev->initiator.pid = hdr->src_pid;
                ev->initiator.nid = hdr->src_nid;
                ev->rlength       = hdr->payload_length;
-               ev->sender        = msg->msg_from;
+               ev->sender        = msg->msg_from;
                ev->mlength       = msg->msg_wanted;
                ev->offset        = msg->msg_offset;
        }
@@ -160,7 +162,7 @@ lnet_msg_commit(lnet_msg_t *msg, int cpt)
 
        LASSERT(!msg->msg_onactivelist);
        msg->msg_onactivelist = 1;
-       cfs_list_add(&msg->msg_activelist, &container->msc_active);
+       list_add(&msg->msg_activelist, &container->msc_active);
 
        counters->msgs_alloc++;
        if (counters->msgs_alloc > counters->msgs_max)
@@ -204,11 +206,10 @@ lnet_msg_decommit_tx(lnet_msg_t *msg, int status)
 
        case LNET_EVENT_GET:
                LASSERT(msg->msg_rx_committed);
-               /* overwritten while sending reply */
+               /* overwritten while sending reply, we should never be
+                * here for optimized GET */
                LASSERT(msg->msg_type == LNET_MSG_REPLY);
-
                msg->msg_type = LNET_MSG_GET; /* fix type */
-               counters->send_length += msg->msg_len;
                break;
        }
 
@@ -230,6 +231,7 @@ lnet_msg_decommit_rx(lnet_msg_t *msg, int status)
        if (status != 0)
                goto out;
 
+       counters = the_lnet.ln_counters[msg->msg_rx_cpt];
        switch (ev->type) {
        default:
                LASSERT(ev->type == 0);
@@ -241,7 +243,13 @@ lnet_msg_decommit_rx(lnet_msg_t *msg, int status)
                break;
 
        case LNET_EVENT_GET:
-               LASSERT(msg->msg_type == LNET_MSG_GET);
+               /* type is "REPLY" if it's an optimized GET on passive side,
+                * because optimized GET will never be committed for sending,
+                * so message type wouldn't be changed back to "GET" by
+                * lnet_msg_decommit_tx(), see details in lnet_parse_get() */
+               LASSERT(msg->msg_type == LNET_MSG_REPLY ||
+                       msg->msg_type == LNET_MSG_GET);
+               counters->send_length += msg->msg_wanted;
                break;
 
        case LNET_EVENT_PUT:
@@ -249,12 +257,13 @@ lnet_msg_decommit_rx(lnet_msg_t *msg, int status)
                break;
 
        case LNET_EVENT_REPLY:
-               LASSERT(msg->msg_type == LNET_MSG_REPLY ||
-                       msg->msg_type == LNET_MSG_GET); /* optimized GET */
+               /* type is "GET" if it's an optimized GET on active side,
+                * see details in lnet_create_reply_msg() */
+               LASSERT(msg->msg_type == LNET_MSG_GET ||
+                       msg->msg_type == LNET_MSG_REPLY);
                break;
        }
 
-       counters = the_lnet.ln_counters[msg->msg_rx_cpt];
        counters->recv_count++;
        if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY)
                counters->recv_length += msg->msg_wanted;
@@ -287,7 +296,7 @@ lnet_msg_decommit(lnet_msg_t *msg, int cpt, int status)
                lnet_msg_decommit_rx(msg, status);
        }
 
-       cfs_list_del(&msg->msg_activelist);
+       list_del(&msg->msg_activelist);
        msg->msg_onactivelist = 0;
 
        the_lnet.ln_counters[cpt2]->msgs_alloc--;
@@ -310,7 +319,7 @@ lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md,
        LASSERT(!msg->msg_routing);
 
        msg->msg_md = md;
-       if (msg->msg_receiving) { /* commited for receiving */
+       if (msg->msg_receiving) { /* committed for receiving */
                msg->msg_offset = offset;
                msg->msg_wanted = mlen;
        }
@@ -349,7 +358,7 @@ lnet_msg_detach_md(lnet_msg_t *msg, int status)
        msg->msg_md = NULL;
 }
 
-void
+static int
 lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
 {
         lnet_handle_wire_t ack_wmd;
@@ -382,9 +391,19 @@ lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
                rc = lnet_send(msg->msg_ev.target.nid, msg, LNET_NID_ANY);
 
                lnet_net_lock(cpt);
+               /*
+                * NB: message is committed for sending, we should return
+                * on success because LND will finalize this message later.
+                *
+                * Also, there is possibility that message is committed for
+                * sending and also failed before delivering to LND,
+                * i.e: ENOMEM, in that case we can't fall through either
+                * because CPT for sending can be different with CPT for
+                * receiving, so we should return back to lnet_finalize()
+                * to make sure we are locking the correct partition.
+                */
+               return rc;
 
-               if (rc == 0)
-                       return;
        } else if (status == 0 &&       /* OK so far */
                   (msg->msg_routing && !msg->msg_sending)) {
                /* not forwarded */
@@ -394,13 +413,25 @@ lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
                rc = lnet_send(LNET_NID_ANY, msg, LNET_NID_ANY);
 
                lnet_net_lock(cpt);
-
-               if (rc == 0)
-                       return;
+               /*
+                * NB: message is committed for sending, we should return
+                * on success because LND will finalize this message later.
+                *
+                * Also, there is possibility that message is committed for
+                * sending and also failed before delivering to LND,
+                * i.e: ENOMEM, in that case we can't fall through either:
+                * - The rule is message must decommit for sending first if
+                *   the it's committed for both sending and receiving
+                * - CPT for sending can be different with CPT for receiving,
+                *   so we should return back to lnet_finalize() to make
+                *   sure we are locking the correct partition.
+                */
+               return rc;
        }
 
        lnet_msg_decommit(msg, cpt, status);
        lnet_msg_free_locked(msg);
+       return 0;
 }
 
 void
@@ -409,12 +440,13 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
        struct lnet_msg_container       *container;
        int                             my_slot;
        int                             cpt;
+       int                             rc;
        int                             i;
 
-        LASSERT (!cfs_in_interrupt ());
+       LASSERT (!in_interrupt ());
 
-        if (msg == NULL)
-                return;
+       if (msg == NULL)
+               return;
 #if 0
         CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
                lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
@@ -432,9 +464,6 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
                msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
                msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
 #endif
-
-        LASSERT (msg->msg_onactivelist);
-
         msg->msg_ev.status = status;
 
        if (msg->msg_md != NULL) {
@@ -445,15 +474,17 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
                lnet_res_unlock(cpt);
        }
 
+ again:
+       rc = 0;
        if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
-               /* not commited to network yet */
+               /* not committed to network yet */
                LASSERT(!msg->msg_onactivelist);
                lnet_msg_free(msg);
                return;
        }
 
        /*
-        * NB: routed message can be commited for both receiving and sending,
+        * NB: routed message can be committed for both receiving and sending,
         * we should finalize in LIFO order and keep counters correct.
         * (finalize sending first then finalize receiving)
         */
@@ -461,7 +492,7 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
        lnet_net_lock(cpt);
 
        container = the_lnet.ln_msg_containers[cpt];
-       cfs_list_add_tail(&msg->msg_list, &container->msc_finalizing);
+       list_add_tail(&msg->msg_list, &container->msc_finalizing);
 
        /* Recursion breaker.  Don't complete the message here if I am (or
         * enough other threads are) already completing messages */
@@ -469,41 +500,50 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
 #ifdef __KERNEL__
        my_slot = -1;
        for (i = 0; i < container->msc_nfinalizers; i++) {
-               if (container->msc_finalizers[i] == cfs_current())
-                       goto out;
+               if (container->msc_finalizers[i] == current)
+                       break;
 
                if (my_slot < 0 && container->msc_finalizers[i] == NULL)
                        my_slot = i;
        }
 
-       if (my_slot < 0)
-               goto out;
+       if (i < container->msc_nfinalizers || my_slot < 0) {
+               lnet_net_unlock(cpt);
+               return;
+       }
 
-       container->msc_finalizers[my_slot] = cfs_current();
+       container->msc_finalizers[my_slot] = current;
 #else
        LASSERT(container->msc_nfinalizers == 1);
-       if (container->msc_finalizers[0] != NULL)
-               goto out;
+       if (container->msc_finalizers[0] != NULL) {
+               lnet_net_unlock(cpt);
+               return;
+       }
 
        my_slot = i = 0;
        container->msc_finalizers[0] = (struct lnet_msg_container *)1;
 #endif
 
-       while (!cfs_list_empty(&container->msc_finalizing)) {
-               msg = cfs_list_entry(container->msc_finalizing.next,
-                                    lnet_msg_t, msg_list);
+       while (!list_empty(&container->msc_finalizing)) {
+               msg = list_entry(container->msc_finalizing.next,
+                                lnet_msg_t, msg_list);
 
-               cfs_list_del(&msg->msg_list);
+               list_del(&msg->msg_list);
 
                /* NB drops and regains the lnet lock if it actually does
                 * anything, so my finalizing friends can chomp along too */
-               lnet_complete_msg_locked(msg, cpt);
+               rc = lnet_complete_msg_locked(msg, cpt);
+               if (rc != 0)
+                       break;
        }
 
        container->msc_finalizers[my_slot] = NULL;
- out:
        lnet_net_unlock(cpt);
+
+       if (rc != 0)
+               goto again;
 }
+EXPORT_SYMBOL(lnet_finalize);
 
 void
 lnet_msg_container_cleanup(struct lnet_msg_container *container)
@@ -513,13 +553,13 @@ lnet_msg_container_cleanup(struct lnet_msg_container *container)
        if (container->msc_init == 0)
                return;
 
-       while (!cfs_list_empty(&container->msc_active)) {
-               lnet_msg_t *msg = cfs_list_entry(container->msc_active.next,
-                                                lnet_msg_t, msg_activelist);
+       while (!list_empty(&container->msc_active)) {
+               lnet_msg_t *msg = list_entry(container->msc_active.next,
+                                            lnet_msg_t, msg_activelist);
 
                LASSERT(msg->msg_onactivelist);
                msg->msg_onactivelist = 0;
-               cfs_list_del(&msg->msg_activelist);
+               list_del(&msg->msg_activelist);
                lnet_msg_free(msg);
                count++;
        }
@@ -546,8 +586,8 @@ lnet_msg_container_setup(struct lnet_msg_container *container, int cpt)
 
        container->msc_init = 1;
 
-       CFS_INIT_LIST_HEAD(&container->msc_active);
-       CFS_INIT_LIST_HEAD(&container->msc_finalizing);
+       INIT_LIST_HEAD(&container->msc_active);
+       INIT_LIST_HEAD(&container->msc_finalizing);
 
 #ifdef LNET_USE_LIB_FREELIST
        memset(&container->msc_freelist, 0, sizeof(lnet_freelist_t));