Whamcloud - gitweb
LU-56 lnet: add lnet_*_free_locked for LNet
[fs/lustre-release.git] / lnet / lnet / lib-msg.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lnet/lnet/lib-msg.c
35  *
36  * Message decoding, parsing and finalizing routines
37  */
38
39 #define DEBUG_SUBSYSTEM S_LNET
40
41 #include <lnet/lib-lnet.h>
42
43 void
44 lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
45 {
46         ENTRY;
47
48         memset(ev, 0, sizeof(*ev));
49
50         ev->status   = 0;
51         ev->unlinked = 1;
52         ev->type     = LNET_EVENT_UNLINK;
53         lnet_md_deconstruct(md, &ev->md);
54         lnet_md2handle(&ev->md_handle, md);
55         EXIT;
56 }
57
58 void
59 lnet_enq_event_locked (lnet_eq_t *eq, lnet_event_t *ev)
60 {
61         lnet_event_t  *eq_slot;
62
63         /* Allocate the next queue slot */
64         ev->sequence = eq->eq_enq_seq++;
65
66         /* size must be a power of 2 to handle sequence # overflow */
67         LASSERT (eq->eq_size != 0 &&
68                  eq->eq_size == LOWEST_BIT_SET (eq->eq_size));
69         eq_slot = eq->eq_events + (ev->sequence & (eq->eq_size - 1));
70
71         /* There is no race since both event consumers and event producers
72          * take the LNET_LOCK, so we don't screw around with memory
73          * barriers, setting the sequence number last or weird structure
74          * layout assertions. */
75         *eq_slot = *ev;
76
77         /* Call the callback handler (if any) */
78         if (eq->eq_callback != NULL)
79                 eq->eq_callback (eq_slot);
80
81 #ifdef __KERNEL__
82         /* Wake anyone waiting in LNetEQPoll() */
83         if (cfs_waitq_active(&the_lnet.ln_waitq))
84                 cfs_waitq_broadcast(&the_lnet.ln_waitq);
85 #else
86 # ifndef HAVE_LIBPTHREAD
87         /* LNetEQPoll() calls into _the_ LND to wait for action */
88 # else
89         /* Wake anyone waiting in LNetEQPoll() */
90         pthread_cond_broadcast(&the_lnet.ln_cond);
91 # endif
92 #endif
93 }
94
95 void
96 lnet_complete_msg_locked(lnet_msg_t *msg)
97 {
98         lnet_handle_wire_t ack_wmd;
99         int                rc;
100         int                status = msg->msg_ev.status;
101
102         LASSERT (msg->msg_onactivelist);
103
104         if (status == 0 && msg->msg_ack) {
105                 /* Only send an ACK if the PUT completed successfully */
106
107                 lnet_return_credits_locked(msg);
108
109                 msg->msg_ack = 0;
110                 LNET_UNLOCK();
111
112                 LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
113                 LASSERT(!msg->msg_routing);
114
115                 ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
116
117                 lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
118
119                 msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
120                 msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
121                 msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
122
123                 rc = lnet_send(msg->msg_ev.target.nid, msg);
124
125                 LNET_LOCK();
126
127                 if (rc == 0)
128                         return;
129         } else if (status == 0 &&               /* OK so far */
130                    (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */
131                 
132                 LASSERT (!msg->msg_receiving);  /* called back recv already */
133         
134                 LNET_UNLOCK();
135                 
136                 rc = lnet_send(LNET_NID_ANY, msg);
137
138                 LNET_LOCK();
139
140                 if (rc == 0)
141                         return;
142         }
143
144         lnet_return_credits_locked(msg);
145
146         LASSERT (msg->msg_onactivelist);
147         msg->msg_onactivelist = 0;
148         cfs_list_del (&msg->msg_activelist);
149         the_lnet.ln_counters.msgs_alloc--;
150         lnet_msg_free_locked(msg);
151 }
152
153
154 void
155 lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
156 {
157 #ifdef __KERNEL__
158         int                i;
159         int                my_slot;
160 #endif
161         lnet_libmd_t      *md;
162
163         LASSERT (!cfs_in_interrupt ());
164
165         if (msg == NULL)
166                 return;
167 #if 0
168         CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
169                lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
170                msg->msg_target_is_router ? "t" : "",
171                msg->msg_routing ? "X" : "",
172                msg->msg_ack ? "A" : "",
173                msg->msg_sending ? "S" : "",
174                msg->msg_receiving ? "R" : "",
175                msg->msg_delayed ? "d" : "",
176                msg->msg_txcredit ? "C" : "",
177                msg->msg_peertxcredit ? "c" : "",
178                msg->msg_rtrcredit ? "F" : "",
179                msg->msg_peerrtrcredit ? "f" : "",
180                msg->msg_onactivelist ? "!" : "",
181                msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
182                msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
183 #endif
184         LNET_LOCK();
185
186         LASSERT (msg->msg_onactivelist);
187
188         msg->msg_ev.status = status;
189
190         md = msg->msg_md;
191         if (md != NULL) {
192                 int      unlink;
193
194                 /* Now it's safe to drop my caller's ref */
195                 md->md_refcount--;
196                 LASSERT (md->md_refcount >= 0);
197
198                 unlink = lnet_md_unlinkable(md);
199
200                 msg->msg_ev.unlinked = unlink;
201
202                 if (md->md_eq != NULL)
203                         lnet_enq_event_locked(md->md_eq, &msg->msg_ev);
204
205                 if (unlink)
206                         lnet_md_unlink(md);
207
208                 msg->msg_md = NULL;
209         }
210
211         cfs_list_add_tail (&msg->msg_list, &the_lnet.ln_finalizeq);
212
213         /* Recursion breaker.  Don't complete the message here if I am (or
214          * enough other threads are) already completing messages */
215
216 #ifdef __KERNEL__
217         my_slot = -1;
218         for (i = 0; i < the_lnet.ln_nfinalizers; i++) {
219                 if (the_lnet.ln_finalizers[i] == cfs_current())
220                         goto out;
221                 if (my_slot < 0 && the_lnet.ln_finalizers[i] == NULL)
222                         my_slot = i;
223         }
224         if (my_slot < 0)
225                 goto out;
226
227         the_lnet.ln_finalizers[my_slot] = cfs_current();
228 #else
229         if (the_lnet.ln_finalizing)
230                 goto out;
231
232         the_lnet.ln_finalizing = 1;
233 #endif
234
235         while (!cfs_list_empty(&the_lnet.ln_finalizeq)) {
236                 msg = cfs_list_entry(the_lnet.ln_finalizeq.next,
237                                      lnet_msg_t, msg_list);
238
239                 cfs_list_del(&msg->msg_list);
240
241                 /* NB drops and regains the lnet lock if it actually does
242                  * anything, so my finalizing friends can chomp along too */
243                 lnet_complete_msg_locked(msg);
244         }
245
246 #ifdef __KERNEL__
247         the_lnet.ln_finalizers[my_slot] = NULL;
248 #else
249         the_lnet.ln_finalizing = 0;
250 #endif
251
252  out:
253         LNET_UNLOCK();
254 }