X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lnet%2Flnet%2Flib-msg.c;h=a6ffe8e72c1f03a5000de7e926c34352a35f4227;hb=76adbed805e71995d521d1a26e1e3d93f3dfd7b7;hp=9840ff53d086ffd30d162a8d738d48a7bb8915a6;hpb=c5050e412572b00cbe93d8517d2d1f767bebfa92;p=fs%2Flustre-release.git diff --git a/lnet/lnet/lib-msg.c b/lnet/lnet/lib-msg.c index 9840ff5..a6ffe8e 100644 --- a/lnet/lnet/lib-msg.c +++ b/lnet/lnet/lib-msg.c @@ -1,164 +1,635 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: +/* + * GPL HEADER START * - * lib/lib-msg.c - * Message decoding, parsing and finalizing routines + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. * - * Copyright (c) 2001-2003 Cluster File Systems, Inc. - * Copyright (c) 2001-2002 Sandia National Laboratories + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * - * This file is part of Lustre, http://www.sf.net/projects/lustre/ + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.gnu.org/licenses/gpl-2.0.html * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. + * GPL HEADER END + */ +/* + * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. + * Use is subject to license terms. + * + * Copyright (c) 2012, 2014, Intel Corporation. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. + * lnet/lnet/lib-msg.c * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
+ * Message decoding, parsing and finalizing routines + */ + +#define DEBUG_SUBSYSTEM S_LNET + +#include + +void +lnet_build_unlink_event(lnet_libmd_t *md, lnet_event_t *ev) +{ + ENTRY; + + memset(ev, 0, sizeof(*ev)); + + ev->status = 0; + ev->unlinked = 1; + ev->type = LNET_EVENT_UNLINK; + lnet_md_deconstruct(md, &ev->md); + lnet_md2handle(&ev->md_handle, md); + EXIT; +} + +/* + * Don't need any lock, must be called after lnet_commit_md */ +void +lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type) +{ + lnet_hdr_t *hdr = &msg->msg_hdr; + lnet_event_t *ev = &msg->msg_ev; + + LASSERT(!msg->msg_routing); + + ev->type = ev_type; + + if (ev_type == LNET_EVENT_SEND) { + /* event for active message */ + ev->target.nid = le64_to_cpu(hdr->dest_nid); + ev->target.pid = le32_to_cpu(hdr->dest_pid); + ev->initiator.nid = LNET_NID_ANY; + ev->initiator.pid = the_lnet.ln_pid; + ev->source.nid = LNET_NID_ANY; + ev->source.pid = the_lnet.ln_pid; + ev->sender = LNET_NID_ANY; + + } else { + /* event for passive message */ + ev->target.pid = hdr->dest_pid; + ev->target.nid = hdr->dest_nid; + ev->initiator.pid = hdr->src_pid; + /* Multi-Rail: resolve src_nid to "primary" peer NID */ + ev->initiator.nid = msg->msg_initiator; + /* Multi-Rail: track source NID. */ + ev->source.pid = hdr->src_pid; + ev->source.nid = hdr->src_nid; + ev->rlength = hdr->payload_length; + ev->sender = msg->msg_from; + ev->mlength = msg->msg_wanted; + ev->offset = msg->msg_offset; + } + + switch (ev_type) { + default: + LBUG(); + + case LNET_EVENT_PUT: /* passive PUT */ + ev->pt_index = hdr->msg.put.ptl_index; + ev->match_bits = hdr->msg.put.match_bits; + ev->hdr_data = hdr->msg.put.hdr_data; + return; + + case LNET_EVENT_GET: /* passive GET */ + ev->pt_index = hdr->msg.get.ptl_index; + ev->match_bits = hdr->msg.get.match_bits; + ev->hdr_data = 0; + return; + + case LNET_EVENT_ACK: /* ACK */ + ev->match_bits = hdr->msg.ack.match_bits; + ev->mlength = hdr->msg.ack.mlength; + return; + + case LNET_EVENT_REPLY: /* REPLY */ + return; + + case LNET_EVENT_SEND: /* active message */ + if (msg->msg_type == LNET_MSG_PUT) { + ev->pt_index = le32_to_cpu(hdr->msg.put.ptl_index); + ev->match_bits = le64_to_cpu(hdr->msg.put.match_bits); + ev->offset = le32_to_cpu(hdr->msg.put.offset); + ev->mlength = + ev->rlength = le32_to_cpu(hdr->payload_length); + ev->hdr_data = le64_to_cpu(hdr->msg.put.hdr_data); + + } else { + LASSERT(msg->msg_type == LNET_MSG_GET); + ev->pt_index = le32_to_cpu(hdr->msg.get.ptl_index); + ev->match_bits = le64_to_cpu(hdr->msg.get.match_bits); + ev->mlength = + ev->rlength = le32_to_cpu(hdr->msg.get.sink_length); + ev->offset = le32_to_cpu(hdr->msg.get.src_offset); + ev->hdr_data = 0; + } + return; + } +} + +void +lnet_msg_commit(lnet_msg_t *msg, int cpt) +{ + struct lnet_msg_container *container = the_lnet.ln_msg_containers[cpt]; + lnet_counters_t *counters = the_lnet.ln_counters[cpt]; + + /* routed message can be committed for both receiving and sending */ + LASSERT(!msg->msg_tx_committed); + + if (msg->msg_sending) { + LASSERT(!msg->msg_receiving); + + msg->msg_tx_cpt = cpt; + msg->msg_tx_committed = 1; + if (msg->msg_rx_committed) { /* routed message REPLY */ + LASSERT(msg->msg_onactivelist); + return; + } + } else { + LASSERT(!msg->msg_sending); + msg->msg_rx_cpt = cpt; + msg->msg_rx_committed = 1; + } + + LASSERT(!msg->msg_onactivelist); + msg->msg_onactivelist = 1; + list_add(&msg->msg_activelist, &container->msc_active); + + counters->msgs_alloc++; + if (counters->msgs_alloc > counters->msgs_max) + 
counters->msgs_max = counters->msgs_alloc; +} + +static void +lnet_msg_decommit_tx(lnet_msg_t *msg, int status) +{ + lnet_counters_t *counters; + lnet_event_t *ev = &msg->msg_ev; + + LASSERT(msg->msg_tx_committed); + if (status != 0) + goto out; + + counters = the_lnet.ln_counters[msg->msg_tx_cpt]; + switch (ev->type) { + default: /* routed message */ + LASSERT(msg->msg_routing); + LASSERT(msg->msg_rx_committed); + LASSERT(ev->type == 0); + + counters->route_length += msg->msg_len; + counters->route_count++; + goto out; + + case LNET_EVENT_PUT: + /* should have been decommitted */ + LASSERT(!msg->msg_rx_committed); + /* overwritten while sending ACK */ + LASSERT(msg->msg_type == LNET_MSG_ACK); + msg->msg_type = LNET_MSG_PUT; /* fix type */ + break; + + case LNET_EVENT_SEND: + LASSERT(!msg->msg_rx_committed); + if (msg->msg_type == LNET_MSG_PUT) + counters->send_length += msg->msg_len; + break; + + case LNET_EVENT_GET: + LASSERT(msg->msg_rx_committed); + /* overwritten while sending reply, we should never be + * here for optimized GET */ + LASSERT(msg->msg_type == LNET_MSG_REPLY); + msg->msg_type = LNET_MSG_GET; /* fix type */ + break; + } + + counters->send_count++; + if (msg->msg_txpeer) + atomic_inc(&msg->msg_txpeer->lpni_stats.send_count); + if (msg->msg_txni) + atomic_inc(&msg->msg_txni->ni_stats.send_count); + out: + lnet_return_tx_credits_locked(msg); + msg->msg_tx_committed = 0; +} + +static void +lnet_msg_decommit_rx(lnet_msg_t *msg, int status) +{ + lnet_counters_t *counters; + lnet_event_t *ev = &msg->msg_ev; + + LASSERT(!msg->msg_tx_committed); /* decommitted or never committed */ + LASSERT(msg->msg_rx_committed); + + if (status != 0) + goto out; + + counters = the_lnet.ln_counters[msg->msg_rx_cpt]; + switch (ev->type) { + default: + LASSERT(ev->type == 0); + LASSERT(msg->msg_routing); + goto out; + + case LNET_EVENT_ACK: + LASSERT(msg->msg_type == LNET_MSG_ACK); + break; + + case LNET_EVENT_GET: + /* type is "REPLY" if it's an optimized GET on passive side, + * because optimized GET will never be committed for sending, + * so message type wouldn't be changed back to "GET" by + * lnet_msg_decommit_tx(), see details in lnet_parse_get() */ + LASSERT(msg->msg_type == LNET_MSG_REPLY || + msg->msg_type == LNET_MSG_GET); + counters->send_length += msg->msg_wanted; + break; + + case LNET_EVENT_PUT: + LASSERT(msg->msg_type == LNET_MSG_PUT); + break; + + case LNET_EVENT_REPLY: + /* type is "GET" if it's an optimized GET on active side, + * see details in lnet_create_reply_msg() */ + LASSERT(msg->msg_type == LNET_MSG_GET || + msg->msg_type == LNET_MSG_REPLY); + break; + } + + counters->recv_count++; + if (msg->msg_rxpeer) + atomic_inc(&msg->msg_rxpeer->lpni_stats.recv_count); + if (msg->msg_rxni) + atomic_inc(&msg->msg_rxni->ni_stats.recv_count); + if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY) + counters->recv_length += msg->msg_wanted; + + out: + lnet_return_rx_credits_locked(msg); + msg->msg_rx_committed = 0; +} + +void +lnet_msg_decommit(lnet_msg_t *msg, int cpt, int status) +{ + int cpt2 = cpt; + + LASSERT(msg->msg_tx_committed || msg->msg_rx_committed); + LASSERT(msg->msg_onactivelist); + + if (msg->msg_tx_committed) { /* always decommit for sending first */ + LASSERT(cpt == msg->msg_tx_cpt); + lnet_msg_decommit_tx(msg, status); + } + + if (msg->msg_rx_committed) { + /* forwarding msg committed for both receiving and sending */ + if (cpt != msg->msg_rx_cpt) { + lnet_net_unlock(cpt); + cpt2 = msg->msg_rx_cpt; + lnet_net_lock(cpt2); + } + lnet_msg_decommit_rx(msg, 
status); + } + + list_del(&msg->msg_activelist); + msg->msg_onactivelist = 0; + + the_lnet.ln_counters[cpt2]->msgs_alloc--; + + if (cpt2 != cpt) { + lnet_net_unlock(cpt2); + lnet_net_lock(cpt); + } +} + +void +lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md, + unsigned int offset, unsigned int mlen) +{ + /* NB: @offset and @len are only useful for receiving */ + /* Here, we attach the MD on lnet_msg and mark it busy and + * decrementing its threshold. Come what may, the lnet_msg "owns" + * the MD until a call to lnet_msg_detach_md or lnet_finalize() + * signals completion. */ + LASSERT(!msg->msg_routing); + + msg->msg_md = md; + if (msg->msg_receiving) { /* committed for receiving */ + msg->msg_offset = offset; + msg->msg_wanted = mlen; + } + + md->md_refcount++; + if (md->md_threshold != LNET_MD_THRESH_INF) { + LASSERT(md->md_threshold > 0); + md->md_threshold--; + } + + /* build umd in event */ + lnet_md2handle(&msg->msg_ev.md_handle, md); + lnet_md_deconstruct(md, &msg->msg_ev.md); +} + +void +lnet_msg_detach_md(lnet_msg_t *msg, int status) +{ + lnet_libmd_t *md = msg->msg_md; + int unlink; + + /* Now it's safe to drop my caller's ref */ + md->md_refcount--; + LASSERT(md->md_refcount >= 0); + + unlink = lnet_md_unlinkable(md); + if (md->md_eq != NULL) { + msg->msg_ev.status = status; + msg->msg_ev.unlinked = unlink; + lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev); + } + + if (unlink) + lnet_md_unlink(md); + + msg->msg_md = NULL; +} + +static int +lnet_complete_msg_locked(lnet_msg_t *msg, int cpt) +{ + struct lnet_handle_wire ack_wmd; + int rc; + int status = msg->msg_ev.status; + + LASSERT(msg->msg_onactivelist); + + if (status == 0 && msg->msg_ack) { + /* Only send an ACK if the PUT completed successfully */ + + lnet_msg_decommit(msg, cpt, 0); + + msg->msg_ack = 0; + lnet_net_unlock(cpt); + + LASSERT(msg->msg_ev.type == LNET_EVENT_PUT); + LASSERT(!msg->msg_routing); + + ack_wmd = msg->msg_hdr.msg.put.ack_wmd; + + lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.source, 0, 0); + + msg->msg_hdr.msg.ack.dst_wmd = ack_wmd; + msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits; + msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength); + + /* NB: we probably want to use NID of msg::msg_from as 3rd + * parameter (router NID) if it's routed message */ + rc = lnet_send(msg->msg_ev.target.nid, msg, LNET_NID_ANY); + + lnet_net_lock(cpt); + /* + * NB: message is committed for sending, we should return + * on success because LND will finalize this message later. + * + * Also, there is possibility that message is committed for + * sending and also failed before delivering to LND, + * i.e: ENOMEM, in that case we can't fall through either + * because CPT for sending can be different with CPT for + * receiving, so we should return back to lnet_finalize() + * to make sure we are locking the correct partition. + */ + return rc; + + } else if (status == 0 && /* OK so far */ + (msg->msg_routing && !msg->msg_sending)) { + /* not forwarded */ + LASSERT(!msg->msg_receiving); /* called back recv already */ + lnet_net_unlock(cpt); + + rc = lnet_send(LNET_NID_ANY, msg, LNET_NID_ANY); + + lnet_net_lock(cpt); + /* + * NB: message is committed for sending, we should return + * on success because LND will finalize this message later. 
+ * + * Also, there is possibility that message is committed for + * sending and also failed before delivering to LND, + * i.e: ENOMEM, in that case we can't fall through either: + * - The rule is message must decommit for sending first if + * the it's committed for both sending and receiving + * - CPT for sending can be different with CPT for receiving, + * so we should return back to lnet_finalize() to make + * sure we are locking the correct partition. + */ + return rc; + } + + lnet_msg_decommit(msg, cpt, status); + lnet_msg_free(msg); + return 0; +} + +void +lnet_finalize(lnet_ni_t *ni, lnet_msg_t *msg, int status) +{ + struct lnet_msg_container *container; + int my_slot; + int cpt; + int rc; + int i; + + LASSERT(!in_interrupt()); + + if (msg == NULL) + return; + + msg->msg_ev.status = status; + + if (msg->msg_md != NULL) { + cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie); + + lnet_res_lock(cpt); + lnet_msg_detach_md(msg, status); + lnet_res_unlock(cpt); + } + + again: + rc = 0; + if (!msg->msg_tx_committed && !msg->msg_rx_committed) { + /* not committed to network yet */ + LASSERT(!msg->msg_onactivelist); + lnet_msg_free(msg); + return; + } + + /* + * NB: routed message can be committed for both receiving and sending, + * we should finalize in LIFO order and keep counters correct. + * (finalize sending first then finalize receiving) + */ + cpt = msg->msg_tx_committed ? msg->msg_tx_cpt : msg->msg_rx_cpt; + lnet_net_lock(cpt); + + container = the_lnet.ln_msg_containers[cpt]; + list_add_tail(&msg->msg_list, &container->msc_finalizing); + + /* Recursion breaker. Don't complete the message here if I am (or + * enough other threads are) already completing messages */ -#ifndef __KERNEL__ -# include -#else -# define DEBUG_SUBSYSTEM S_PORTALS -# include -#endif + my_slot = -1; + for (i = 0; i < container->msc_nfinalizers; i++) { + if (container->msc_finalizers[i] == current) + break; -#include + if (my_slot < 0 && container->msc_finalizers[i] == NULL) + my_slot = i; + } -int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t *msg) + if (i < container->msc_nfinalizers || my_slot < 0) { + lnet_net_unlock(cpt); + return; + } + + container->msc_finalizers[my_slot] = current; + + while (!list_empty(&container->msc_finalizing)) { + msg = list_entry(container->msc_finalizing.next, + lnet_msg_t, msg_list); + + list_del(&msg->msg_list); + + /* NB drops and regains the lnet lock if it actually does + * anything, so my finalizing friends can chomp along too */ + rc = lnet_complete_msg_locked(msg, cpt); + if (rc != 0) + break; + } + + if (unlikely(!list_empty(&the_lnet.ln_delay_rules))) { + lnet_net_unlock(cpt); + lnet_delay_rule_check(); + lnet_net_lock(cpt); + } + + container->msc_finalizers[my_slot] = NULL; + lnet_net_unlock(cpt); + + if (rc != 0) + goto again; +} +EXPORT_SYMBOL(lnet_finalize); + +void +lnet_msg_container_cleanup(struct lnet_msg_container *container) +{ + int count = 0; + + if (container->msc_init == 0) + return; + + while (!list_empty(&container->msc_active)) { + lnet_msg_t *msg = list_entry(container->msc_active.next, + lnet_msg_t, msg_activelist); + + LASSERT(msg->msg_onactivelist); + msg->msg_onactivelist = 0; + list_del(&msg->msg_activelist); + lnet_msg_free(msg); + count++; + } + + if (count > 0) + CERROR("%d active msg on exit\n", count); + + if (container->msc_finalizers != NULL) { + LIBCFS_FREE(container->msc_finalizers, + container->msc_nfinalizers * + sizeof(*container->msc_finalizers)); + container->msc_finalizers = NULL; + } + container->msc_init = 0; +} + +int 
+lnet_msg_container_setup(struct lnet_msg_container *container, int cpt) +{ + int rc; + + container->msc_init = 1; + + INIT_LIST_HEAD(&container->msc_active); + INIT_LIST_HEAD(&container->msc_finalizing); + + rc = 0; + /* number of CPUs */ + container->msc_nfinalizers = cfs_cpt_weight(lnet_cpt_table(), cpt); + + LIBCFS_CPT_ALLOC(container->msc_finalizers, lnet_cpt_table(), cpt, + container->msc_nfinalizers * + sizeof(*container->msc_finalizers)); + + if (container->msc_finalizers == NULL) { + CERROR("Failed to allocate message finalizers\n"); + lnet_msg_container_cleanup(container); + return -ENOMEM; + } + + return rc; +} + +void +lnet_msg_containers_destroy(void) +{ + struct lnet_msg_container *container; + int i; + + if (the_lnet.ln_msg_containers == NULL) + return; + + cfs_percpt_for_each(container, i, the_lnet.ln_msg_containers) + lnet_msg_container_cleanup(container); + + cfs_percpt_free(the_lnet.ln_msg_containers); + the_lnet.ln_msg_containers = NULL; +} + +int +lnet_msg_containers_create(void) { - lib_md_t *md; - lib_eq_t *eq; - int rc; - unsigned long flags; - - /* ni went down while processing this message */ - if (nal->ni.up == 0) { - return -1; - } - - if (msg == NULL) - return 0; - - rc = 0; - if (msg->send_ack) { - ptl_hdr_t ack; - - LASSERT (!ptl_is_wire_handle_none (&msg->ack_wmd)); - - memset (&ack, 0, sizeof (ack)); - ack.type = HTON__u32 (PTL_MSG_ACK); - ack.dest_nid = HTON__u64 (msg->nid); - ack.src_nid = HTON__u64 (nal->ni.nid); - ack.dest_pid = HTON__u32 (msg->pid); - ack.src_pid = HTON__u32 (nal->ni.pid); - ack.payload_length = 0; - - ack.msg.ack.dst_wmd = msg->ack_wmd; - ack.msg.ack.match_bits = msg->ev.match_bits; - ack.msg.ack.mlength = HTON__u32 (msg->ev.mlength); - - rc = lib_send (nal, private, NULL, &ack, PTL_MSG_ACK, - msg->nid, msg->pid, NULL, 0, 0); - /* If this send fails, there's nothing else to clean up */ - } - - md = msg->md; - LASSERT (md->pending > 0); /* I've not dropped my ref yet */ - eq = md->eq; - - state_lock(nal, &flags); - - if (eq != NULL) { - ptl_event_t *ev = &msg->ev; - ptl_event_t *eq_slot; - - /* I have to hold the lock while I bump the sequence number - * and copy the event into the queue. If not, and I was - * interrupted after bumping the sequence number, other - * events could fill the queue, including the slot I just - * allocated to this event. On resuming, I would overwrite - * a more 'recent' event with old event state, and - * processes taking events off the queue would not detect - * overflow correctly. - */ - - ev->sequence = eq->sequence++;/* Allocate the next queue slot */ - - /* size must be a power of 2 to handle a wrapped sequence # */ - LASSERT (eq->size != 0 && - eq->size == LOWEST_BIT_SET (eq->size)); - eq_slot = eq->base + (ev->sequence & (eq->size - 1)); - - /* Invalidate unlinked_me unless this is the last - * event for an auto-unlinked MD. Note that if md was - * auto-unlinked, md->pending can only decrease - */ - if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINKED) == 0 || /* not auto-unlinked */ - md->pending != 1) /* not last ref */ - ev->unlinked_me = PTL_HANDLE_NONE; - - /* Copy the event into the allocated slot, ensuring all the - * rest of the event's contents have been copied _before_ - * the sequence number gets updated. A processes 'getting' - * an event waits on the next queue slot's sequence to be - * 'new'. When it is, _all_ other event fields had better - * be consistent. I assert 'sequence' is the last member, - * so I only need a 2 stage copy. 
- */ - LASSERT(sizeof (ptl_event_t) == - offsetof(ptl_event_t, sequence) + sizeof(ev->sequence)); - - rc = nal->cb_write (nal, private, (user_ptr)eq_slot, ev, - offsetof (ptl_event_t, sequence)); - LASSERT (rc == 0); - -#ifdef __KERNEL__ - barrier(); -#endif - /* Updating the sequence number is what makes the event 'new' */ - - /* cb_write is not necessarily atomic, so this could - cause a race with PtlEQGet */ - rc = nal->cb_write(nal, private, (user_ptr)&eq_slot->sequence, - (void *)&ev->sequence,sizeof (ev->sequence)); - LASSERT (rc == 0); - -#ifdef __KERNEL__ - barrier(); -#endif - - /* I must also ensure that (a) callbacks are made in the - * same order as the events land in the queue, and (b) the - * callback occurs before the event can be removed from the - * queue, so I can't drop the lock during the callback. */ - if (nal->cb_callback != NULL) - nal->cb_callback(nal, private, eq, ev); - else if (eq->event_callback != NULL) - (void)((eq->event_callback) (ev)); - } - - LASSERT ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINKED) == 0 || - (md->md_flags & PTL_MD_FLAG_UNLINK) != 0); - - md->pending--; - if (md->pending == 0 && /* no more outstanding operations on this md */ - (md->threshold == 0 || /* done its business */ - (md->md_flags & PTL_MD_FLAG_UNLINK) != 0)) /* marked for death */ - lib_md_unlink(nal, md); - - list_del (&msg->msg_list); - nal->ni.counters.msgs_alloc--; - lib_msg_free(nal, msg); - - state_unlock(nal, &flags); - - return rc; + struct lnet_msg_container *container; + int rc; + int i; + + the_lnet.ln_msg_containers = cfs_percpt_alloc(lnet_cpt_table(), + sizeof(*container)); + + if (the_lnet.ln_msg_containers == NULL) { + CERROR("Failed to allocate cpu-partition data for network\n"); + return -ENOMEM; + } + + cfs_percpt_for_each(container, i, the_lnet.ln_msg_containers) { + rc = lnet_msg_container_setup(container, i); + if (rc != 0) { + lnet_msg_containers_destroy(); + return rc; + } + } + + return 0; }
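
The rewritten lnet_finalize() above queues every completed message on the container's msc_finalizing list and only drains that list if the calling thread is not already finalizing and a free slot exists in msc_finalizers[]; otherwise it just enqueues and returns, so completing one message that in turn sends another (the ACK for a PUT, or the forwarding of a routed message) cannot re-enter lnet_finalize() without bound. Below is a minimal user-space sketch of that recursion-breaker pattern, assuming hypothetical names (struct work, fin_add(), complete_one(), NFINALIZERS) and a pthread mutex standing in for lnet_net_lock(); it illustrates the idea only and is not LNet code.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct work {
	struct work	*next;
	int		 id;
};

#define NFINALIZERS 2

static struct work	*queue_head;
static struct work	**queue_tail = &queue_head;
static pthread_t	 finalizers[NFINALIZERS];
static int		 slot_busy[NFINALIZERS];
static pthread_mutex_t	 lock = PTHREAD_MUTEX_INITIALIZER;

static void complete_one(struct work *w)
{
	/* completing one item may call fin_add() again; the slot check
	 * below turns that into a plain enqueue instead of recursion */
	printf("completed work item %d\n", w->id);
	free(w);
}

void fin_add(struct work *w)
{
	int i;
	int my_slot = -1;

	pthread_mutex_lock(&lock);
	w->next = NULL;
	*queue_tail = w;
	queue_tail = &w->next;

	/* recursion breaker: bail out if this thread is already draining
	 * or every finalizer slot is taken; whoever owns a slot will pick
	 * the new item up from the queue */
	for (i = 0; i < NFINALIZERS; i++) {
		if (slot_busy[i] &&
		    pthread_equal(finalizers[i], pthread_self()))
			break;
		if (my_slot < 0 && !slot_busy[i])
			my_slot = i;
	}
	if (i < NFINALIZERS || my_slot < 0) {
		pthread_mutex_unlock(&lock);
		return;
	}

	finalizers[my_slot] = pthread_self();
	slot_busy[my_slot] = 1;

	while (queue_head != NULL) {
		struct work *next = queue_head;

		queue_head = next->next;
		if (queue_head == NULL)
			queue_tail = &queue_head;

		/* drop the lock around the completion, as
		 * lnet_complete_msg_locked() does with the LNet lock */
		pthread_mutex_unlock(&lock);
		complete_one(next);
		pthread_mutex_lock(&lock);
	}

	slot_busy[my_slot] = 0;
	pthread_mutex_unlock(&lock);
}

int main(void)
{
	struct work *w = malloc(sizeof(*w));

	if (w == NULL)
		return 1;
	w->id = 1;
	fin_add(w);
	return 0;
}

In the patch itself the number of slots is cfs_cpt_weight() for the CPT, and lnet_complete_msg_locked() drops and retakes the LNet lock around any send it issues, which is why the drain loop here releases the mutex around complete_one().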
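
The patch also replaces the single global active-message list and counter block of the old lib_finalize() (nal->ni.counters) with per-CPT state: lnet_msg_containers_create() allocates one lnet_msg_container per CPU partition, and lnet_msg_commit() updates the counters of the partition the message was committed on, under that partition's lock. A rough user-space analogue of that layout is sketched below, again with made-up names (containers_create(), msg_commit(), msg_decommit()) rather than the real cfs_percpt_* API.

#include <pthread.h>
#include <stdlib.h>

struct msg_container {
	pthread_mutex_t	mc_lock;	/* stands in for lnet_net_lock(cpt) */
	int		mc_msgs_alloc;	/* currently committed messages */
	int		mc_msgs_max;	/* high-water mark */
};

static struct msg_container	*containers;
static int			 ncpts;

int containers_create(int n)
{
	int i;

	containers = calloc(n, sizeof(*containers));
	if (containers == NULL)
		return -1;		/* -ENOMEM in the kernel code */

	ncpts = n;
	for (i = 0; i < n; i++)
		pthread_mutex_init(&containers[i].mc_lock, NULL);
	return 0;
}

/* analogue of the counter update in lnet_msg_commit() */
void msg_commit(int cpt)
{
	struct msg_container *c = &containers[cpt % ncpts];

	pthread_mutex_lock(&c->mc_lock);
	c->mc_msgs_alloc++;
	if (c->mc_msgs_alloc > c->mc_msgs_max)
		c->mc_msgs_max = c->mc_msgs_alloc;
	pthread_mutex_unlock(&c->mc_lock);
}

/* analogue of the counter update in lnet_msg_decommit() */
void msg_decommit(int cpt)
{
	struct msg_container *c = &containers[cpt % ncpts];

	pthread_mutex_lock(&c->mc_lock);
	c->mc_msgs_alloc--;
	pthread_mutex_unlock(&c->mc_lock);
}

int main(void)
{
	if (containers_create(4) != 0)
		return 1;
	msg_commit(1);
	msg_decommit(1);
	return 0;
}

Note that in the real lnet_msg_decommit() a routed message may have been committed on different partitions for sending and receiving, so the code drops the lock for the send CPT and takes the receive CPT's lock before touching that partition's counters.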