Whamcloud - gitweb
LU-56 lnet: LNet message event cleanup
[fs/lustre-release.git] / lnet / lnet / lib-msg.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lnet/lnet/lib-msg.c
35  *
36  * Message decoding, parsing and finalizing routines
37  */
38
39 #define DEBUG_SUBSYSTEM S_LNET
40
41 #include <lnet/lib-lnet.h>
42
43 void
44 lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
45 {
46         ENTRY;
47
48         memset(ev, 0, sizeof(*ev));
49
50         ev->status   = 0;
51         ev->unlinked = 1;
52         ev->type     = LNET_EVENT_UNLINK;
53         lnet_md_deconstruct(md, &ev->md);
54         lnet_md2handle(&ev->md_handle, md);
55         EXIT;
56 }
57
58 /*
59  * Don't need any lock, must be called after lnet_commit_md
60  */
61 void
62 lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
63 {
64         lnet_hdr_t      *hdr = &msg->msg_hdr;
65         lnet_event_t    *ev  = &msg->msg_ev;
66
67         LASSERT(!msg->msg_routing);
68
69         ev->type = ev_type;
70
71         if (ev_type == LNET_EVENT_SEND) {
72                 /* event for active message */
73                 ev->target.nid    = le64_to_cpu(hdr->dest_nid);
74                 ev->target.pid    = le32_to_cpu(hdr->dest_pid);
75                 ev->initiator.nid = LNET_NID_ANY;
76                 ev->initiator.pid = the_lnet.ln_pid;
77                 ev->sender        = LNET_NID_ANY;
78
79         } else {
80                 /* event for passive message */
81                 ev->target.pid    = hdr->dest_pid;
82                 ev->target.nid    = hdr->dest_nid;
83                 ev->initiator.pid = hdr->src_pid;
84                 ev->initiator.nid = hdr->src_nid;
85                 ev->rlength       = hdr->payload_length;
86                 ev->sender        = msg->msg_from;
87                 ev->mlength       = msg->msg_wanted;
88                 ev->offset        = msg->msg_offset;
89         }
90
91         switch (ev_type) {
92         default:
93                 LBUG();
94
95         case LNET_EVENT_PUT: /* passive PUT */
96                 ev->pt_index   = hdr->msg.put.ptl_index;
97                 ev->match_bits = hdr->msg.put.match_bits;
98                 ev->hdr_data   = hdr->msg.put.hdr_data;
99                 return;
100
101         case LNET_EVENT_GET: /* passive GET */
102                 ev->pt_index   = hdr->msg.get.ptl_index;
103                 ev->match_bits = hdr->msg.get.match_bits;
104                 ev->hdr_data   = 0;
105                 return;
106
107         case LNET_EVENT_ACK: /* ACK */
108                 ev->match_bits = hdr->msg.ack.match_bits;
109                 ev->mlength    = hdr->msg.ack.mlength;
110                 return;
111
112         case LNET_EVENT_REPLY: /* REPLY */
113                 return;
114
115         case LNET_EVENT_SEND: /* active message */
116                 if (msg->msg_type == LNET_MSG_PUT) {
117                         ev->pt_index   = le32_to_cpu(hdr->msg.put.ptl_index);
118                         ev->match_bits = le64_to_cpu(hdr->msg.put.match_bits);
119                         ev->offset     = le32_to_cpu(hdr->msg.put.offset);
120                         ev->mlength    =
121                         ev->rlength    = le32_to_cpu(hdr->payload_length);
122                         ev->hdr_data   = le64_to_cpu(hdr->msg.put.hdr_data);
123
124                 } else {
125                         LASSERT(msg->msg_type == LNET_MSG_GET);
126                         ev->pt_index   = le32_to_cpu(hdr->msg.get.ptl_index);
127                         ev->match_bits = le64_to_cpu(hdr->msg.get.match_bits);
128                         ev->mlength    =
129                         ev->rlength    = le32_to_cpu(hdr->msg.get.sink_length);
130                         ev->offset     = le32_to_cpu(hdr->msg.get.src_offset);
131                         ev->hdr_data   = 0;
132                 }
133                 return;
134         }
135 }
136
137 void
138 lnet_complete_msg_locked(lnet_msg_t *msg)
139 {
140         lnet_handle_wire_t ack_wmd;
141         int                rc;
142         int                status = msg->msg_ev.status;
143
144         LASSERT (msg->msg_onactivelist);
145
146         if (status == 0 && msg->msg_ack) {
147                 /* Only send an ACK if the PUT completed successfully */
148
149                 lnet_return_credits_locked(msg);
150
151                 msg->msg_ack = 0;
152                 LNET_UNLOCK();
153
154                 LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
155                 LASSERT(!msg->msg_routing);
156
157                 ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
158
159                 lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
160
161                 msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
162                 msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
163                 msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
164
165                 rc = lnet_send(msg->msg_ev.target.nid, msg);
166
167                 LNET_LOCK();
168
169                 if (rc == 0)
170                         return;
171         } else if (status == 0 &&               /* OK so far */
172                    (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */
173                 
174                 LASSERT (!msg->msg_receiving);  /* called back recv already */
175         
176                 LNET_UNLOCK();
177                 
178                 rc = lnet_send(LNET_NID_ANY, msg);
179
180                 LNET_LOCK();
181
182                 if (rc == 0)
183                         return;
184         }
185
186         lnet_return_credits_locked(msg);
187
188         LASSERT (msg->msg_onactivelist);
189         msg->msg_onactivelist = 0;
190         cfs_list_del (&msg->msg_activelist);
191         the_lnet.ln_counters.msgs_alloc--;
192         lnet_msg_free_locked(msg);
193 }
194
195
196 void
197 lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
198 {
199         struct lnet_msg_container       *container;
200         lnet_libmd_t                    *md;
201         int                             my_slot;
202         int                             i;
203
204         LASSERT (!cfs_in_interrupt ());
205
206         if (msg == NULL)
207                 return;
208 #if 0
209         CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
210                lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
211                msg->msg_target_is_router ? "t" : "",
212                msg->msg_routing ? "X" : "",
213                msg->msg_ack ? "A" : "",
214                msg->msg_sending ? "S" : "",
215                msg->msg_receiving ? "R" : "",
216                msg->msg_delayed ? "d" : "",
217                msg->msg_txcredit ? "C" : "",
218                msg->msg_peertxcredit ? "c" : "",
219                msg->msg_rtrcredit ? "F" : "",
220                msg->msg_peerrtrcredit ? "f" : "",
221                msg->msg_onactivelist ? "!" : "",
222                msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
223                msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
224 #endif
225         LNET_LOCK();
226
227         LASSERT (msg->msg_onactivelist);
228
229         msg->msg_ev.status = status;
230
231         md = msg->msg_md;
232         if (md != NULL) {
233                 int      unlink;
234
235                 /* Now it's safe to drop my caller's ref */
236                 md->md_refcount--;
237                 LASSERT (md->md_refcount >= 0);
238
239                 unlink = lnet_md_unlinkable(md);
240
241                 msg->msg_ev.unlinked = unlink;
242
243                 if (md->md_eq != NULL)
244                         lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev);
245
246                 if (unlink)
247                         lnet_md_unlink(md);
248
249                 msg->msg_md = NULL;
250         }
251
252         container = &the_lnet.ln_msg_container;
253         cfs_list_add_tail(&msg->msg_list, &container->msc_finalizing);
254
255         /* Recursion breaker.  Don't complete the message here if I am (or
256          * enough other threads are) already completing messages */
257
258 #ifdef __KERNEL__
259         my_slot = -1;
260         for (i = 0; i < container->msc_nfinalizers; i++) {
261                 if (container->msc_finalizers[i] == cfs_current())
262                         goto out;
263
264                 if (my_slot < 0 && container->msc_finalizers[i] == NULL)
265                         my_slot = i;
266         }
267
268         if (my_slot < 0)
269                 goto out;
270
271         container->msc_finalizers[my_slot] = cfs_current();
272 #else
273         LASSERT(container->msc_nfinalizers == 1);
274         if (container->msc_finalizers[0] != NULL)
275                 goto out;
276
277         my_slot = i = 0;
278         container->msc_finalizers[0] = (struct lnet_msg_container *)1;
279 #endif
280
281         while (!cfs_list_empty(&container->msc_finalizing)) {
282                 msg = cfs_list_entry(container->msc_finalizing.next,
283                                      lnet_msg_t, msg_list);
284
285                 cfs_list_del(&msg->msg_list);
286
287                 /* NB drops and regains the lnet lock if it actually does
288                  * anything, so my finalizing friends can chomp along too */
289                 lnet_complete_msg_locked(msg);
290         }
291
292         container->msc_finalizers[my_slot] = NULL;
293  out:
294         LNET_UNLOCK();
295 }
296
297 void
298 lnet_msg_container_cleanup(struct lnet_msg_container *container)
299 {
300         int     count = 0;
301
302         if (container->msc_init == 0)
303                 return;
304
305         while (!cfs_list_empty(&container->msc_active)) {
306                 lnet_msg_t *msg = cfs_list_entry(container->msc_active.next,
307                                                  lnet_msg_t, msg_activelist);
308
309                 LASSERT(msg->msg_onactivelist);
310                 msg->msg_onactivelist = 0;
311                 cfs_list_del(&msg->msg_activelist);
312                 lnet_msg_free(msg);
313                 count++;
314         }
315
316         if (count > 0)
317                 CERROR("%d active msg on exit\n", count);
318
319         if (container->msc_finalizers != NULL) {
320                 LIBCFS_FREE(container->msc_finalizers,
321                             container->msc_nfinalizers *
322                             sizeof(*container->msc_finalizers));
323                 container->msc_finalizers = NULL;
324         }
325 #ifdef LNET_USE_LIB_FREELIST
326         lnet_freelist_fini(&container->msc_freelist);
327 #endif
328         container->msc_init = 0;
329 }
330
331 int
332 lnet_msg_container_setup(struct lnet_msg_container *container)
333 {
334         int     rc;
335
336         container->msc_init = 1;
337
338         CFS_INIT_LIST_HEAD(&container->msc_active);
339         CFS_INIT_LIST_HEAD(&container->msc_finalizing);
340
341 #ifdef LNET_USE_LIB_FREELIST
342         memset(&container->msc_freelist, 0, sizeof(lnet_freelist_t));
343
344         rc = lnet_freelist_init(&container->msc_freelist,
345                                 LNET_FL_MAX_MSGS, sizeof(lnet_msg_t));
346         if (rc != 0) {
347                 CERROR("Failed to init freelist for message container\n");
348                 lnet_msg_container_cleanup(container);
349                 return rc;
350         }
351 #else
352         rc = 0;
353 #endif
354         /* number of CPUs */
355         container->msc_nfinalizers = cfs_cpt_weight(cfs_cpt_table,
356                                                     CFS_CPT_ANY);
357         LIBCFS_ALLOC(container->msc_finalizers,
358                      container->msc_nfinalizers *
359                      sizeof(*container->msc_finalizers));
360
361         if (container->msc_finalizers == NULL) {
362                 CERROR("Failed to allocate message finalizers\n");
363                 lnet_msg_container_cleanup(container);
364                 return -ENOMEM;
365         }
366
367         return 0;
368 }