Whamcloud - gitweb
b=16098
[fs/lustre-release.git] / lnet / lnet / lib-msg.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see [sun.com URL with a
20  * copy of GPLv2].
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/lnet/lib-msg.c
37  *
38  * Message decoding, parsing and finalizing routines
39  */
40
41 #define DEBUG_SUBSYSTEM S_LNET
42
43 #include <lnet/lib-lnet.h>
44
45 void
46 lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
47 {
48         memset(ev, 0, sizeof(*ev));
49
50         ev->status   = 0;
51         ev->unlinked = 1;
52         ev->type     = LNET_EVENT_UNLINK;
53         lnet_md_deconstruct(md, &ev->md);
54         lnet_md2handle(&ev->md_handle, md);
55 }
56
57 void
58 lnet_enq_event_locked (lnet_eq_t *eq, lnet_event_t *ev)
59 {
60         lnet_event_t  *eq_slot;
61
62         /* Allocate the next queue slot */
63         ev->sequence = eq->eq_enq_seq++;
64
65         /* size must be a power of 2 to handle sequence # overflow */
66         LASSERT (eq->eq_size != 0 &&
67                  eq->eq_size == LOWEST_BIT_SET (eq->eq_size));
68         eq_slot = eq->eq_events + (ev->sequence & (eq->eq_size - 1));
69
70         /* There is no race since both event consumers and event producers
71          * take the LNET_LOCK, so we don't screw around with memory
72          * barriers, setting the sequence number last or wierd structure
73          * layout assertions. */
74         *eq_slot = *ev;
75
76         /* Call the callback handler (if any) */
77         if (eq->eq_callback != NULL)
78                 eq->eq_callback (eq_slot);
79
80 #ifdef __KERNEL__
81         /* Wake anyone waiting in LNetEQPoll() */
82         if (cfs_waitq_active(&the_lnet.ln_waitq))
83                 cfs_waitq_broadcast(&the_lnet.ln_waitq);
84 #else
85 # ifndef HAVE_LIBPTHREAD
86         /* LNetEQPoll() calls into _the_ LND to wait for action */
87 # else
88         /* Wake anyone waiting in LNetEQPoll() */
89         pthread_cond_broadcast(&the_lnet.ln_cond);
90 # endif
91 #endif
92 }
93
94 void
95 lnet_complete_msg_locked(lnet_msg_t *msg)
96 {
97         lnet_handle_wire_t ack_wmd;
98         int                rc;
99         int                status = msg->msg_ev.status;
100
101         LASSERT (msg->msg_onactivelist);
102
103         if (status == 0 && msg->msg_ack) {
104                 /* Only send an ACK if the PUT completed successfully */
105
106                 lnet_return_credits_locked(msg);
107
108                 msg->msg_ack = 0;
109                 LNET_UNLOCK();
110
111                 LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
112                 LASSERT(!msg->msg_routing);
113
114                 ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
115
116                 lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
117
118                 msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
119                 msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
120                 msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
121
122                 rc = lnet_send(msg->msg_ev.target.nid, msg);
123
124                 LNET_LOCK();
125
126                 if (rc == 0)
127                         return;
128         } else if (status == 0 &&               /* OK so far */
129                    (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */
130                 
131                 LASSERT (!msg->msg_receiving);  /* called back recv already */
132         
133                 LNET_UNLOCK();
134                 
135                 rc = lnet_send(LNET_NID_ANY, msg);
136
137                 LNET_LOCK();
138
139                 if (rc == 0)
140                         return;
141         }
142
143         lnet_return_credits_locked(msg);
144
145         LASSERT (msg->msg_onactivelist);
146         msg->msg_onactivelist = 0;
147         list_del (&msg->msg_activelist);
148         the_lnet.ln_counters.msgs_alloc--;
149         lnet_msg_free(msg);
150 }
151
152
153 void
154 lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
155 {
156 #ifdef __KERNEL__
157         int                i;
158         int                my_slot;
159 #endif
160         lnet_libmd_t      *md;
161
162         LASSERT (!in_interrupt ());
163
164         if (msg == NULL)
165                 return;
166 #if 0
167         CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
168                lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
169                msg->msg_target_is_router ? "t" : "",
170                msg->msg_routing ? "X" : "",
171                msg->msg_ack ? "A" : "",
172                msg->msg_sending ? "S" : "",
173                msg->msg_receiving ? "R" : "",
174                msg->msg_delayed ? "d" : "",
175                msg->msg_txcredit ? "C" : "",
176                msg->msg_peertxcredit ? "c" : "",
177                msg->msg_rtrcredit ? "F" : "",
178                msg->msg_peerrtrcredit ? "f" : "",
179                msg->msg_onactivelist ? "!" : "",
180                msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
181                msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
182 #endif
183         LNET_LOCK();
184
185         LASSERT (msg->msg_onactivelist);
186
187         msg->msg_ev.status = status;
188
189         md = msg->msg_md;
190         if (md != NULL) {
191                 int      unlink;
192
193                 /* Now it's safe to drop my caller's ref */
194                 md->md_refcount--;
195                 LASSERT (md->md_refcount >= 0);
196
197                 unlink = lnet_md_unlinkable(md);
198
199                 msg->msg_ev.unlinked = unlink;
200
201                 if (md->md_eq != NULL)
202                         lnet_enq_event_locked(md->md_eq, &msg->msg_ev);
203
204                 if (unlink)
205                         lnet_md_unlink(md);
206
207                 msg->msg_md = NULL;
208         }
209
210         list_add_tail (&msg->msg_list, &the_lnet.ln_finalizeq);
211
212         /* Recursion breaker.  Don't complete the message here if I am (or
213          * enough other threads are) already completing messages */
214
215 #ifdef __KERNEL__
216         my_slot = -1;
217         for (i = 0; i < the_lnet.ln_nfinalizers; i++) {
218                 if (the_lnet.ln_finalizers[i] == cfs_current())
219                         goto out;
220                 if (my_slot < 0 && the_lnet.ln_finalizers[i] == NULL)
221                         my_slot = i;
222         }
223         if (my_slot < 0)
224                 goto out;
225
226         the_lnet.ln_finalizers[my_slot] = cfs_current();
227 #else
228         if (the_lnet.ln_finalizing)
229                 goto out;
230
231         the_lnet.ln_finalizing = 1;
232 #endif
233
234         while (!list_empty(&the_lnet.ln_finalizeq)) {
235                 msg = list_entry(the_lnet.ln_finalizeq.next,
236                                  lnet_msg_t, msg_list);
237                 
238                 list_del(&msg->msg_list);
239
240                 /* NB drops and regains the lnet lock if it actually does
241                  * anything, so my finalizing friends can chomp along too */
242                 lnet_complete_msg_locked(msg);
243         }
244
245 #ifdef __KERNEL__
246         the_lnet.ln_finalizers[my_slot] = NULL;
247 #else
248         the_lnet.ln_finalizing = 0;
249 #endif
250
251  out:
252         LNET_UNLOCK();
253 }