Whamcloud - gitweb
LU-7955 gnilnd: Add ability to set bte_get/put_dlvr_mode
[fs/lustre-release.git] / lnet / lnet / lib-msg.c
index a252718..82c625e 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, Intel Corporation.
+ * Copyright (c) 2012, 2014, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -43,7 +43,7 @@
 #include <lnet/lib-lnet.h>
 
 void
-lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
+lnet_build_unlink_event(lnet_libmd_t *md, lnet_event_t *ev)
 {
         ENTRY;
 
@@ -162,7 +162,7 @@ lnet_msg_commit(lnet_msg_t *msg, int cpt)
 
        LASSERT(!msg->msg_onactivelist);
        msg->msg_onactivelist = 1;
-       cfs_list_add(&msg->msg_activelist, &container->msc_active);
+       list_add(&msg->msg_activelist, &container->msc_active);
 
        counters->msgs_alloc++;
        if (counters->msgs_alloc > counters->msgs_max)
@@ -296,7 +296,7 @@ lnet_msg_decommit(lnet_msg_t *msg, int cpt, int status)
                lnet_msg_decommit_rx(msg, status);
        }
 
-       cfs_list_del(&msg->msg_activelist);
+       list_del(&msg->msg_activelist);
        msg->msg_onactivelist = 0;
 
        the_lnet.ln_counters[cpt2]->msgs_alloc--;
@@ -319,7 +319,7 @@ lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md,
        LASSERT(!msg->msg_routing);
 
        msg->msg_md = md;
-       if (msg->msg_receiving) { /* commited for receiving */
+       if (msg->msg_receiving) { /* committed for receiving */
                msg->msg_offset = offset;
                msg->msg_wanted = mlen;
        }
@@ -365,7 +365,7 @@ lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
         int                rc;
         int                status = msg->msg_ev.status;
 
-        LASSERT (msg->msg_onactivelist);
+       LASSERT(msg->msg_onactivelist);
 
         if (status == 0 && msg->msg_ack) {
                 /* Only send an ACK if the PUT completed successfully */
@@ -395,7 +395,7 @@ lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
                 * NB: message is committed for sending, we should return
                 * on success because LND will finalize this message later.
                 *
-                * Also, there is possibility that message is commited for
+                * Also, there is possibility that message is committed for
                 * sending and also failed before delivering to LND,
                 * i.e: ENOMEM, in that case we can't fall through either
                 * because CPT for sending can be different with CPT for
@@ -417,7 +417,7 @@ lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
                 * NB: message is committed for sending, we should return
                 * on success because LND will finalize this message later.
                 *
-                * Also, there is possibility that message is commited for
+                * Also, there is possibility that message is committed for
                 * sending and also failed before delivering to LND,
                 * i.e: ENOMEM, in that case we can't fall through either:
                 * - The rule is message must decommit for sending first if
@@ -430,12 +430,12 @@ lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
        }
 
        lnet_msg_decommit(msg, cpt, status);
-       lnet_msg_free_locked(msg);
+       lnet_msg_free(msg);
        return 0;
 }
 
 void
-lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
+lnet_finalize(lnet_ni_t *ni, lnet_msg_t *msg, int status)
 {
        struct lnet_msg_container       *container;
        int                             my_slot;
@@ -443,10 +443,10 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
        int                             rc;
        int                             i;
 
-        LASSERT (!cfs_in_interrupt ());
+       LASSERT(!in_interrupt());
 
-        if (msg == NULL)
-                return;
+       if (msg == NULL)
+               return;
 #if 0
         CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
                lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
@@ -477,14 +477,14 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
  again:
        rc = 0;
        if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
-               /* not commited to network yet */
+               /* not committed to network yet */
                LASSERT(!msg->msg_onactivelist);
                lnet_msg_free(msg);
                return;
        }
 
        /*
-        * NB: routed message can be commited for both receiving and sending,
+        * NB: routed message can be committed for both receiving and sending,
         * we should finalize in LIFO order and keep counters correct.
         * (finalize sending first then finalize receiving)
         */
@@ -492,15 +492,14 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
        lnet_net_lock(cpt);
 
        container = the_lnet.ln_msg_containers[cpt];
-       cfs_list_add_tail(&msg->msg_list, &container->msc_finalizing);
+       list_add_tail(&msg->msg_list, &container->msc_finalizing);
 
        /* Recursion breaker.  Don't complete the message here if I am (or
         * enough other threads are) already completing messages */
 
-#ifdef __KERNEL__
        my_slot = -1;
        for (i = 0; i < container->msc_nfinalizers; i++) {
-               if (container->msc_finalizers[i] == cfs_current())
+               if (container->msc_finalizers[i] == current)
                        break;
 
                if (my_slot < 0 && container->msc_finalizers[i] == NULL)
@@ -512,23 +511,13 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
                return;
        }
 
-       container->msc_finalizers[my_slot] = cfs_current();
-#else
-       LASSERT(container->msc_nfinalizers == 1);
-       if (container->msc_finalizers[0] != NULL) {
-               lnet_net_unlock(cpt);
-               return;
-       }
+       container->msc_finalizers[my_slot] = current;
 
-       my_slot = i = 0;
-       container->msc_finalizers[0] = (struct lnet_msg_container *)1;
-#endif
-
-       while (!cfs_list_empty(&container->msc_finalizing)) {
-               msg = cfs_list_entry(container->msc_finalizing.next,
-                                    lnet_msg_t, msg_list);
+       while (!list_empty(&container->msc_finalizing)) {
+               msg = list_entry(container->msc_finalizing.next,
+                                lnet_msg_t, msg_list);
 
-               cfs_list_del(&msg->msg_list);
+               list_del(&msg->msg_list);
 
                /* NB drops and regains the lnet lock if it actually does
                 * anything, so my finalizing friends can chomp along too */
@@ -537,6 +526,12 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
                        break;
        }
 
+       if (unlikely(!list_empty(&the_lnet.ln_delay_rules))) {
+               lnet_net_unlock(cpt);
+               lnet_delay_rule_check();
+               lnet_net_lock(cpt);
+       }
+
        container->msc_finalizers[my_slot] = NULL;
        lnet_net_unlock(cpt);
 
@@ -553,13 +548,13 @@ lnet_msg_container_cleanup(struct lnet_msg_container *container)
        if (container->msc_init == 0)
                return;
 
-       while (!cfs_list_empty(&container->msc_active)) {
-               lnet_msg_t *msg = cfs_list_entry(container->msc_active.next,
-                                                lnet_msg_t, msg_activelist);
+       while (!list_empty(&container->msc_active)) {
+               lnet_msg_t *msg = list_entry(container->msc_active.next,
+                                            lnet_msg_t, msg_activelist);
 
                LASSERT(msg->msg_onactivelist);
                msg->msg_onactivelist = 0;
-               cfs_list_del(&msg->msg_activelist);
+               list_del(&msg->msg_activelist);
                lnet_msg_free(msg);
                count++;
        }
@@ -573,9 +568,6 @@ lnet_msg_container_cleanup(struct lnet_msg_container *container)
                            sizeof(*container->msc_finalizers));
                container->msc_finalizers = NULL;
        }
-#ifdef LNET_USE_LIB_FREELIST
-       lnet_freelist_fini(&container->msc_freelist);
-#endif
        container->msc_init = 0;
 }
 
@@ -586,22 +578,10 @@ lnet_msg_container_setup(struct lnet_msg_container *container, int cpt)
 
        container->msc_init = 1;
 
-       CFS_INIT_LIST_HEAD(&container->msc_active);
-       CFS_INIT_LIST_HEAD(&container->msc_finalizing);
-
-#ifdef LNET_USE_LIB_FREELIST
-       memset(&container->msc_freelist, 0, sizeof(lnet_freelist_t));
+       INIT_LIST_HEAD(&container->msc_active);
+       INIT_LIST_HEAD(&container->msc_finalizing);
 
-       rc = lnet_freelist_init(&container->msc_freelist,
-                               LNET_FL_MAX_MSGS, sizeof(lnet_msg_t));
-       if (rc != 0) {
-               CERROR("Failed to init freelist for message container\n");
-               lnet_msg_container_cleanup(container);
-               return rc;
-       }
-#else
        rc = 0;
-#endif
        /* number of CPUs */
        container->msc_nfinalizers = cfs_cpt_weight(lnet_cpt_table(), cpt);