Whamcloud - gitweb
LU-506 kernel: 2.6.38 kernel macro check cleanup
[fs/lustre-release.git] / lnet / lnet / lib-msg.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lnet/lnet/lib-msg.c
35  *
36  * Message decoding, parsing and finalizing routines
37  */
38
39 #define DEBUG_SUBSYSTEM S_LNET
40
41 #include <lnet/lib-lnet.h>
42
43 void
44 lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
45 {
46         ENTRY;
47
48         memset(ev, 0, sizeof(*ev));
49
50         ev->status   = 0;
51         ev->unlinked = 1;
52         ev->type     = LNET_EVENT_UNLINK;
53         lnet_md_deconstruct(md, &ev->md);
54         lnet_md2handle(&ev->md_handle, md);
55         EXIT;
56 }
57
58 /*
59  * Don't need any lock, must be called after lnet_commit_md
60  */
61 void
62 lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
63 {
64         lnet_hdr_t      *hdr = &msg->msg_hdr;
65         lnet_event_t    *ev  = &msg->msg_ev;
66
67         LASSERT(!msg->msg_routing);
68
69         ev->type = ev_type;
70
71         if (ev_type == LNET_EVENT_SEND) {
72                 /* event for active message */
73                 ev->target.nid    = le64_to_cpu(hdr->dest_nid);
74                 ev->target.pid    = le32_to_cpu(hdr->dest_pid);
75                 ev->initiator.nid = LNET_NID_ANY;
76                 ev->initiator.pid = the_lnet.ln_pid;
77                 ev->sender        = LNET_NID_ANY;
78
79         } else {
80                 /* event for passive message */
81                 ev->target.pid    = hdr->dest_pid;
82                 ev->target.nid    = hdr->dest_nid;
83                 ev->initiator.pid = hdr->src_pid;
84                 ev->initiator.nid = hdr->src_nid;
85                 ev->rlength       = hdr->payload_length;
86                 ev->sender        = msg->msg_from;
87                 ev->mlength       = msg->msg_wanted;
88                 ev->offset        = msg->msg_offset;
89         }
90
91         switch (ev_type) {
92         default:
93                 LBUG();
94
95         case LNET_EVENT_PUT: /* passive PUT */
96                 ev->pt_index   = hdr->msg.put.ptl_index;
97                 ev->match_bits = hdr->msg.put.match_bits;
98                 ev->hdr_data   = hdr->msg.put.hdr_data;
99                 return;
100
101         case LNET_EVENT_GET: /* passive GET */
102                 ev->pt_index   = hdr->msg.get.ptl_index;
103                 ev->match_bits = hdr->msg.get.match_bits;
104                 ev->hdr_data   = 0;
105                 return;
106
107         case LNET_EVENT_ACK: /* ACK */
108                 ev->match_bits = hdr->msg.ack.match_bits;
109                 ev->mlength    = hdr->msg.ack.mlength;
110                 return;
111
112         case LNET_EVENT_REPLY: /* REPLY */
113                 return;
114
115         case LNET_EVENT_SEND: /* active message */
116                 if (msg->msg_type == LNET_MSG_PUT) {
117                         ev->pt_index   = le32_to_cpu(hdr->msg.put.ptl_index);
118                         ev->match_bits = le64_to_cpu(hdr->msg.put.match_bits);
119                         ev->offset     = le32_to_cpu(hdr->msg.put.offset);
120                         ev->mlength    =
121                         ev->rlength    = le32_to_cpu(hdr->payload_length);
122                         ev->hdr_data   = le64_to_cpu(hdr->msg.put.hdr_data);
123
124                 } else {
125                         LASSERT(msg->msg_type == LNET_MSG_GET);
126                         ev->pt_index   = le32_to_cpu(hdr->msg.get.ptl_index);
127                         ev->match_bits = le64_to_cpu(hdr->msg.get.match_bits);
128                         ev->mlength    =
129                         ev->rlength    = le32_to_cpu(hdr->msg.get.sink_length);
130                         ev->offset     = le32_to_cpu(hdr->msg.get.src_offset);
131                         ev->hdr_data   = 0;
132                 }
133                 return;
134         }
135 }
136
137 void
138 lnet_msg_commit(lnet_msg_t *msg, int sending)
139 {
140         struct lnet_msg_container *container = &the_lnet.ln_msg_container;
141         lnet_counters_t           *counters  = &the_lnet.ln_counters;
142
143         /* routed message can be committed for both receiving and sending */
144         LASSERT(!msg->msg_tx_committed);
145
146         if (msg->msg_rx_committed) { /* routed message, or reply for GET */
147                 LASSERT(sending);
148                 LASSERT(msg->msg_onactivelist);
149                 msg->msg_tx_committed = 1;
150                 return;
151         }
152
153         LASSERT(!msg->msg_onactivelist);
154         msg->msg_onactivelist = 1;
155         cfs_list_add(&msg->msg_activelist, &container->msc_active);
156
157         counters->msgs_alloc++;
158         if (counters->msgs_alloc > counters->msgs_max)
159                 counters->msgs_max = counters->msgs_alloc;
160
161         if (sending)
162                 msg->msg_tx_committed = 1;
163         else
164                 msg->msg_rx_committed = 1;
165 }
166
167 static void
168 lnet_msg_tx_decommit(lnet_msg_t *msg, int status)
169 {
170         lnet_counters_t *counters = &the_lnet.ln_counters;
171         lnet_event_t *ev = &msg->msg_ev;
172
173         LASSERT(msg->msg_tx_committed);
174         if (status != 0)
175                 goto out;
176
177         switch (ev->type) {
178         default: /* routed message */
179                 LASSERT(msg->msg_routing);
180                 LASSERT(msg->msg_rx_committed);
181                 LASSERT(ev->type == 0);
182
183                 counters->route_length += msg->msg_len;
184                 counters->route_count++;
185                 goto out;
186
187         case LNET_EVENT_PUT:
188                 /* should have been decommitted */
189                 LASSERT(!msg->msg_rx_committed);
190                 /* overwritten while sending ACK */
191                 LASSERT(msg->msg_type == LNET_MSG_ACK);
192                 msg->msg_type = LNET_MSG_PUT; /* fix type */
193                 break;
194
195         case LNET_EVENT_SEND:
196                 LASSERT(!msg->msg_rx_committed);
197                 if (msg->msg_type == LNET_MSG_PUT)
198                         counters->send_length += msg->msg_len;
199                 break;
200
201         case LNET_EVENT_GET:
202                 LASSERT(msg->msg_rx_committed);
203                 /* overwritten while sending reply */
204                 LASSERT(msg->msg_type == LNET_MSG_REPLY);
205
206                 msg->msg_type = LNET_MSG_GET; /* fix type */
207                 counters->send_length += msg->msg_len;
208                 break;
209         }
210
211         counters->send_count++;
212  out:
213         lnet_return_tx_credits_locked(msg);
214         msg->msg_tx_committed = 0;
215 }
216
217 static void
218 lnet_msg_rx_decommit(lnet_msg_t *msg, int status)
219 {
220         lnet_counters_t *counters = &the_lnet.ln_counters;
221         lnet_event_t *ev = &msg->msg_ev;
222
223         LASSERT(!msg->msg_tx_committed); /* decommitted or uncommitted */
224         LASSERT(msg->msg_rx_committed);
225
226         if (status != 0)
227                 goto out;
228
229         switch (ev->type) {
230         default:
231                 LASSERT(ev->type == 0);
232                 LASSERT(msg->msg_routing);
233                 goto out;
234
235         case LNET_EVENT_ACK:
236                 LASSERT(msg->msg_type == LNET_MSG_ACK);
237                 break;
238
239         case LNET_EVENT_GET:
240                 LASSERT(msg->msg_type == LNET_MSG_GET);
241                 break;
242
243         case LNET_EVENT_PUT:
244                 LASSERT(msg->msg_type == LNET_MSG_PUT);
245                 break;
246
247         case LNET_EVENT_REPLY:
248                 LASSERT(msg->msg_type == LNET_MSG_REPLY ||
249                         msg->msg_type == LNET_MSG_GET); /* optimized GET */
250                 break;
251         }
252
253         counters->recv_count++;
254         if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY)
255                 counters->recv_length += msg->msg_wanted;
256
257  out:
258         lnet_return_rx_credits_locked(msg);
259         msg->msg_rx_committed = 0;
260 }
261
262 void
263 lnet_msg_decommit(lnet_msg_t *msg, int status)
264 {
265         lnet_counters_t *counters = &the_lnet.ln_counters;
266
267         LASSERT(msg->msg_tx_committed || msg->msg_rx_committed);
268         LASSERT(msg->msg_onactivelist);
269
270         if (msg->msg_tx_committed) /* always decommit for sending first */
271                 lnet_msg_tx_decommit(msg, status);
272
273         if (msg->msg_rx_committed)
274                 lnet_msg_rx_decommit(msg, status);
275
276         cfs_list_del(&msg->msg_activelist);
277         msg->msg_onactivelist = 0;
278         counters->msgs_alloc--;
279 }
280
281 void
282 lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md,
283                    unsigned int offset, unsigned int mlen)
284 {
285         /* Here, we attach the MD on lnet_msg and mark it busy and
286          * decrementing its threshold. Come what may, the lnet_msg "owns"
287          * the MD until a call to lnet_msg_detach_md or lnet_finalize()
288          * signals completion. */
289         LASSERT(!msg->msg_routing);
290
291         msg->msg_md = md;
292         if (msg->msg_receiving) { /* commited for receiving */
293                 msg->msg_offset = offset;
294                 msg->msg_wanted = mlen;
295         }
296
297         md->md_refcount++;
298         if (md->md_threshold != LNET_MD_THRESH_INF) {
299                 LASSERT(md->md_threshold > 0);
300                 md->md_threshold--;
301         }
302
303         /* build umd in event */
304         lnet_md2handle(&msg->msg_ev.md_handle, md);
305         lnet_md_deconstruct(md, &msg->msg_ev.md);
306 }
307
308 void
309 lnet_msg_detach_md(lnet_msg_t *msg, int status)
310 {
311         lnet_libmd_t    *md = msg->msg_md;
312         int             unlink;
313
314         /* Now it's safe to drop my caller's ref */
315         md->md_refcount--;
316         LASSERT(md->md_refcount >= 0);
317
318         unlink = lnet_md_unlinkable(md);
319         if (md->md_eq != NULL) {
320                 msg->msg_ev.status   = status;
321                 msg->msg_ev.unlinked = unlink;
322                 lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev);
323         }
324
325         if (unlink)
326                 lnet_md_unlink(md);
327
328         msg->msg_md = NULL;
329 }
330
331 void
332 lnet_complete_msg_locked(lnet_msg_t *msg)
333 {
334         lnet_handle_wire_t ack_wmd;
335         int                rc;
336         int                status = msg->msg_ev.status;
337
338         LASSERT (msg->msg_onactivelist);
339
340         if (status == 0 && msg->msg_ack) {
341                 /* Only send an ACK if the PUT completed successfully */
342
343                 lnet_msg_decommit(msg, 0);
344
345                 msg->msg_ack = 0;
346                 LNET_UNLOCK();
347
348                 LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
349                 LASSERT(!msg->msg_routing);
350
351                 ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
352
353                 lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
354
355                 msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
356                 msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
357                 msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
358
359                 rc = lnet_send(msg->msg_ev.target.nid, msg);
360
361                 LNET_LOCK();
362
363                 if (rc == 0)
364                         return;
365         } else if (status == 0 &&               /* OK so far */
366                    (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */
367                 
368                 LASSERT (!msg->msg_receiving);  /* called back recv already */
369         
370                 LNET_UNLOCK();
371                 
372                 rc = lnet_send(LNET_NID_ANY, msg);
373
374                 LNET_LOCK();
375
376                 if (rc == 0)
377                         return;
378         }
379
380         lnet_msg_decommit(msg, status);
381         lnet_msg_free_locked(msg);
382 }
383
384
385 void
386 lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
387 {
388         struct lnet_msg_container       *container;
389         int                             my_slot;
390         int                             i;
391
392         LASSERT (!cfs_in_interrupt ());
393
394         if (msg == NULL)
395                 return;
396 #if 0
397         CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
398                lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
399                msg->msg_target_is_router ? "t" : "",
400                msg->msg_routing ? "X" : "",
401                msg->msg_ack ? "A" : "",
402                msg->msg_sending ? "S" : "",
403                msg->msg_receiving ? "R" : "",
404                msg->msg_delayed ? "d" : "",
405                msg->msg_txcredit ? "C" : "",
406                msg->msg_peertxcredit ? "c" : "",
407                msg->msg_rtrcredit ? "F" : "",
408                msg->msg_peerrtrcredit ? "f" : "",
409                msg->msg_onactivelist ? "!" : "",
410                msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
411                msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
412 #endif
413
414         LASSERT (msg->msg_onactivelist);
415
416         msg->msg_ev.status = status;
417
418         if (msg->msg_md != NULL) {
419                 lnet_res_lock();
420                 lnet_msg_detach_md(msg, status);
421                 lnet_res_unlock();
422         }
423
424         if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
425                 /* not commited to network yet */
426                 LASSERT(!msg->msg_onactivelist);
427                 lnet_msg_free(msg);
428                 return;
429         }
430
431         LNET_LOCK();
432         container = &the_lnet.ln_msg_container;
433         cfs_list_add_tail(&msg->msg_list, &container->msc_finalizing);
434
435         /* Recursion breaker.  Don't complete the message here if I am (or
436          * enough other threads are) already completing messages */
437
438 #ifdef __KERNEL__
439         my_slot = -1;
440         for (i = 0; i < container->msc_nfinalizers; i++) {
441                 if (container->msc_finalizers[i] == cfs_current())
442                         goto out;
443
444                 if (my_slot < 0 && container->msc_finalizers[i] == NULL)
445                         my_slot = i;
446         }
447
448         if (my_slot < 0)
449                 goto out;
450
451         container->msc_finalizers[my_slot] = cfs_current();
452 #else
453         LASSERT(container->msc_nfinalizers == 1);
454         if (container->msc_finalizers[0] != NULL)
455                 goto out;
456
457         my_slot = i = 0;
458         container->msc_finalizers[0] = (struct lnet_msg_container *)1;
459 #endif
460
461         while (!cfs_list_empty(&container->msc_finalizing)) {
462                 msg = cfs_list_entry(container->msc_finalizing.next,
463                                      lnet_msg_t, msg_list);
464
465                 cfs_list_del(&msg->msg_list);
466
467                 /* NB drops and regains the lnet lock if it actually does
468                  * anything, so my finalizing friends can chomp along too */
469                 lnet_complete_msg_locked(msg);
470         }
471
472         container->msc_finalizers[my_slot] = NULL;
473  out:
474         LNET_UNLOCK();
475 }
476
477 void
478 lnet_msg_container_cleanup(struct lnet_msg_container *container)
479 {
480         int     count = 0;
481
482         if (container->msc_init == 0)
483                 return;
484
485         while (!cfs_list_empty(&container->msc_active)) {
486                 lnet_msg_t *msg = cfs_list_entry(container->msc_active.next,
487                                                  lnet_msg_t, msg_activelist);
488
489                 LASSERT(msg->msg_onactivelist);
490                 msg->msg_onactivelist = 0;
491                 cfs_list_del(&msg->msg_activelist);
492                 lnet_msg_free(msg);
493                 count++;
494         }
495
496         if (count > 0)
497                 CERROR("%d active msg on exit\n", count);
498
499         if (container->msc_finalizers != NULL) {
500                 LIBCFS_FREE(container->msc_finalizers,
501                             container->msc_nfinalizers *
502                             sizeof(*container->msc_finalizers));
503                 container->msc_finalizers = NULL;
504         }
505 #ifdef LNET_USE_LIB_FREELIST
506         lnet_freelist_fini(&container->msc_freelist);
507 #endif
508         container->msc_init = 0;
509 }
510
511 int
512 lnet_msg_container_setup(struct lnet_msg_container *container)
513 {
514         int     rc;
515
516         container->msc_init = 1;
517
518         CFS_INIT_LIST_HEAD(&container->msc_active);
519         CFS_INIT_LIST_HEAD(&container->msc_finalizing);
520
521 #ifdef LNET_USE_LIB_FREELIST
522         memset(&container->msc_freelist, 0, sizeof(lnet_freelist_t));
523
524         rc = lnet_freelist_init(&container->msc_freelist,
525                                 LNET_FL_MAX_MSGS, sizeof(lnet_msg_t));
526         if (rc != 0) {
527                 CERROR("Failed to init freelist for message container\n");
528                 lnet_msg_container_cleanup(container);
529                 return rc;
530         }
531 #else
532         rc = 0;
533 #endif
534         /* number of CPUs */
535         container->msc_nfinalizers = cfs_cpt_weight(cfs_cpt_table,
536                                                     CFS_CPT_ANY);
537         LIBCFS_ALLOC(container->msc_finalizers,
538                      container->msc_nfinalizers *
539                      sizeof(*container->msc_finalizers));
540
541         if (container->msc_finalizers == NULL) {
542                 CERROR("Failed to allocate message finalizers\n");
543                 lnet_msg_container_cleanup(container);
544                 return -ENOMEM;
545         }
546
547         return rc;
548 }