Whamcloud - gitweb
LU-56 lnet: cleanup for rtrpool and LNet counter
[fs/lustre-release.git] / lnet / lnet / lib-msg.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lnet/lnet/lib-msg.c
35  *
36  * Message decoding, parsing and finalizing routines
37  */
38
39 #define DEBUG_SUBSYSTEM S_LNET
40
41 #include <lnet/lib-lnet.h>
42
43 void
44 lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
45 {
46         ENTRY;
47
48         memset(ev, 0, sizeof(*ev));
49
50         ev->status   = 0;
51         ev->unlinked = 1;
52         ev->type     = LNET_EVENT_UNLINK;
53         lnet_md_deconstruct(md, &ev->md);
54         lnet_md2handle(&ev->md_handle, md);
55         EXIT;
56 }
57
58 /*
59  * Don't need any lock, must be called after lnet_commit_md
60  */
61 void
62 lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
63 {
64         lnet_hdr_t      *hdr = &msg->msg_hdr;
65         lnet_event_t    *ev  = &msg->msg_ev;
66
67         LASSERT(!msg->msg_routing);
68
69         ev->type = ev_type;
70
71         if (ev_type == LNET_EVENT_SEND) {
72                 /* event for active message */
73                 ev->target.nid    = le64_to_cpu(hdr->dest_nid);
74                 ev->target.pid    = le32_to_cpu(hdr->dest_pid);
75                 ev->initiator.nid = LNET_NID_ANY;
76                 ev->initiator.pid = the_lnet.ln_pid;
77                 ev->sender        = LNET_NID_ANY;
78
79         } else {
80                 /* event for passive message */
81                 ev->target.pid    = hdr->dest_pid;
82                 ev->target.nid    = hdr->dest_nid;
83                 ev->initiator.pid = hdr->src_pid;
84                 ev->initiator.nid = hdr->src_nid;
85                 ev->rlength       = hdr->payload_length;
86                 ev->sender        = msg->msg_from;
87                 ev->mlength       = msg->msg_wanted;
88                 ev->offset        = msg->msg_offset;
89         }
90
91         switch (ev_type) {
92         default:
93                 LBUG();
94
95         case LNET_EVENT_PUT: /* passive PUT */
96                 ev->pt_index   = hdr->msg.put.ptl_index;
97                 ev->match_bits = hdr->msg.put.match_bits;
98                 ev->hdr_data   = hdr->msg.put.hdr_data;
99                 return;
100
101         case LNET_EVENT_GET: /* passive GET */
102                 ev->pt_index   = hdr->msg.get.ptl_index;
103                 ev->match_bits = hdr->msg.get.match_bits;
104                 ev->hdr_data   = 0;
105                 return;
106
107         case LNET_EVENT_ACK: /* ACK */
108                 ev->match_bits = hdr->msg.ack.match_bits;
109                 ev->mlength    = hdr->msg.ack.mlength;
110                 return;
111
112         case LNET_EVENT_REPLY: /* REPLY */
113                 return;
114
115         case LNET_EVENT_SEND: /* active message */
116                 if (msg->msg_type == LNET_MSG_PUT) {
117                         ev->pt_index   = le32_to_cpu(hdr->msg.put.ptl_index);
118                         ev->match_bits = le64_to_cpu(hdr->msg.put.match_bits);
119                         ev->offset     = le32_to_cpu(hdr->msg.put.offset);
120                         ev->mlength    =
121                         ev->rlength    = le32_to_cpu(hdr->payload_length);
122                         ev->hdr_data   = le64_to_cpu(hdr->msg.put.hdr_data);
123
124                 } else {
125                         LASSERT(msg->msg_type == LNET_MSG_GET);
126                         ev->pt_index   = le32_to_cpu(hdr->msg.get.ptl_index);
127                         ev->match_bits = le64_to_cpu(hdr->msg.get.match_bits);
128                         ev->mlength    =
129                         ev->rlength    = le32_to_cpu(hdr->msg.get.sink_length);
130                         ev->offset     = le32_to_cpu(hdr->msg.get.src_offset);
131                         ev->hdr_data   = 0;
132                 }
133                 return;
134         }
135 }
136
137 void
138 lnet_msg_commit(lnet_msg_t *msg, int sending)
139 {
140         struct lnet_msg_container *container = &the_lnet.ln_msg_container;
141         lnet_counters_t           *counters  = the_lnet.ln_counters;
142
143         /* routed message can be committed for both receiving and sending */
144         LASSERT(!msg->msg_tx_committed);
145
146         if (msg->msg_rx_committed) { /* routed message, or reply for GET */
147                 LASSERT(sending);
148                 LASSERT(msg->msg_onactivelist);
149                 msg->msg_tx_committed = 1;
150                 return;
151         }
152
153         LASSERT(!msg->msg_onactivelist);
154         msg->msg_onactivelist = 1;
155         cfs_list_add(&msg->msg_activelist, &container->msc_active);
156
157         counters->msgs_alloc++;
158         if (counters->msgs_alloc > counters->msgs_max)
159                 counters->msgs_max = counters->msgs_alloc;
160
161         if (sending)
162                 msg->msg_tx_committed = 1;
163         else
164                 msg->msg_rx_committed = 1;
165 }
166
167 static void
168 lnet_msg_tx_decommit(lnet_msg_t *msg, int status)
169 {
170         lnet_counters_t *counters = the_lnet.ln_counters;
171         lnet_event_t *ev = &msg->msg_ev;
172
173         LASSERT(msg->msg_tx_committed);
174         if (status != 0)
175                 goto out;
176
177         switch (ev->type) {
178         default: /* routed message */
179                 LASSERT(msg->msg_routing);
180                 LASSERT(msg->msg_rx_committed);
181                 LASSERT(ev->type == 0);
182
183                 counters->route_length += msg->msg_len;
184                 counters->route_count++;
185                 goto out;
186
187         case LNET_EVENT_PUT:
188                 /* should have been decommitted */
189                 LASSERT(!msg->msg_rx_committed);
190                 /* overwritten while sending ACK */
191                 LASSERT(msg->msg_type == LNET_MSG_ACK);
192                 msg->msg_type = LNET_MSG_PUT; /* fix type */
193                 break;
194
195         case LNET_EVENT_SEND:
196                 LASSERT(!msg->msg_rx_committed);
197                 if (msg->msg_type == LNET_MSG_PUT)
198                         counters->send_length += msg->msg_len;
199                 break;
200
201         case LNET_EVENT_GET:
202                 LASSERT(msg->msg_rx_committed);
203                 /* overwritten while sending reply */
204                 LASSERT(msg->msg_type == LNET_MSG_REPLY);
205
206                 msg->msg_type = LNET_MSG_GET; /* fix type */
207                 counters->send_length += msg->msg_len;
208                 break;
209         }
210
211         counters->send_count++;
212  out:
213         lnet_return_tx_credits_locked(msg);
214         msg->msg_tx_committed = 0;
215 }
216
217 static void
218 lnet_msg_rx_decommit(lnet_msg_t *msg, int status)
219 {
220         lnet_counters_t *counters = the_lnet.ln_counters;
221         lnet_event_t *ev = &msg->msg_ev;
222
223         LASSERT(!msg->msg_tx_committed); /* decommitted or uncommitted */
224         LASSERT(msg->msg_rx_committed);
225
226         if (status != 0)
227                 goto out;
228
229         switch (ev->type) {
230         default:
231                 LASSERT(ev->type == 0);
232                 LASSERT(msg->msg_routing);
233                 goto out;
234
235         case LNET_EVENT_ACK:
236                 LASSERT(msg->msg_type == LNET_MSG_ACK);
237                 break;
238
239         case LNET_EVENT_GET:
240                 LASSERT(msg->msg_type == LNET_MSG_GET);
241                 break;
242
243         case LNET_EVENT_PUT:
244                 LASSERT(msg->msg_type == LNET_MSG_PUT);
245                 break;
246
247         case LNET_EVENT_REPLY:
248                 LASSERT(msg->msg_type == LNET_MSG_REPLY ||
249                         msg->msg_type == LNET_MSG_GET); /* optimized GET */
250                 break;
251         }
252
253         counters->recv_count++;
254         if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY)
255                 counters->recv_length += msg->msg_wanted;
256
257  out:
258         lnet_return_rx_credits_locked(msg);
259         msg->msg_rx_committed = 0;
260 }
261
262 void
263 lnet_msg_decommit(lnet_msg_t *msg, int status)
264 {
265         lnet_counters_t *counters = the_lnet.ln_counters;
266
267         LASSERT(msg->msg_tx_committed || msg->msg_rx_committed);
268         LASSERT(msg->msg_onactivelist);
269
270         if (msg->msg_tx_committed) /* always decommit for sending first */
271                 lnet_msg_tx_decommit(msg, status);
272
273         if (msg->msg_rx_committed)
274                 lnet_msg_rx_decommit(msg, status);
275
276         cfs_list_del(&msg->msg_activelist);
277         msg->msg_onactivelist = 0;
278         counters->msgs_alloc--;
279 }
280
281 void
282 lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md,
283                    unsigned int offset, unsigned int mlen)
284 {
285         /* Here, we attach the MD on lnet_msg and mark it busy and
286          * decrementing its threshold. Come what may, the lnet_msg "owns"
287          * the MD until a call to lnet_msg_detach_md or lnet_finalize()
288          * signals completion. */
289         LASSERT(!msg->msg_routing);
290
291         msg->msg_md = md;
292         if (msg->msg_receiving) { /* commited for receiving */
293                 msg->msg_offset = offset;
294                 msg->msg_wanted = mlen;
295         }
296
297         md->md_refcount++;
298         if (md->md_threshold != LNET_MD_THRESH_INF) {
299                 LASSERT(md->md_threshold > 0);
300                 md->md_threshold--;
301         }
302
303         /* build umd in event */
304         lnet_md2handle(&msg->msg_ev.md_handle, md);
305         lnet_md_deconstruct(md, &msg->msg_ev.md);
306 }
307
308 void
309 lnet_msg_detach_md(lnet_msg_t *msg, int status)
310 {
311         lnet_libmd_t    *md = msg->msg_md;
312         int             unlink;
313
314         /* Now it's safe to drop my caller's ref */
315         md->md_refcount--;
316         LASSERT(md->md_refcount >= 0);
317
318         unlink = lnet_md_unlinkable(md);
319         if (md->md_eq != NULL) {
320                 msg->msg_ev.status   = status;
321                 msg->msg_ev.unlinked = unlink;
322                 lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev);
323         }
324
325         if (unlink)
326                 lnet_md_unlink(md);
327
328         msg->msg_md = NULL;
329 }
330
331 void
332 lnet_complete_msg_locked(lnet_msg_t *msg)
333 {
334         lnet_handle_wire_t ack_wmd;
335         int                rc;
336         int                status = msg->msg_ev.status;
337
338         LASSERT (msg->msg_onactivelist);
339
340         if (status == 0 && msg->msg_ack) {
341                 /* Only send an ACK if the PUT completed successfully */
342
343                 lnet_msg_decommit(msg, 0);
344
345                 msg->msg_ack = 0;
346                 LNET_UNLOCK();
347
348                 LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
349                 LASSERT(!msg->msg_routing);
350
351                 ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
352
353                 lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
354
355                 msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
356                 msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
357                 msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
358
359                 rc = lnet_send(msg->msg_ev.target.nid, msg);
360
361                 LNET_LOCK();
362
363                 if (rc == 0)
364                         return;
365         } else if (status == 0 &&               /* OK so far */
366                    (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */
367                 
368                 LASSERT (!msg->msg_receiving);  /* called back recv already */
369         
370                 LNET_UNLOCK();
371                 
372                 rc = lnet_send(LNET_NID_ANY, msg);
373
374                 LNET_LOCK();
375
376                 if (rc == 0)
377                         return;
378         }
379
380         lnet_msg_decommit(msg, status);
381         lnet_msg_free_locked(msg);
382 }
383
384
385 void
386 lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
387 {
388         struct lnet_msg_container       *container;
389         int                             my_slot;
390         int                             cpt;
391         int                             i;
392
393         LASSERT (!cfs_in_interrupt ());
394
395         if (msg == NULL)
396                 return;
397 #if 0
398         CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
399                lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
400                msg->msg_target_is_router ? "t" : "",
401                msg->msg_routing ? "X" : "",
402                msg->msg_ack ? "A" : "",
403                msg->msg_sending ? "S" : "",
404                msg->msg_receiving ? "R" : "",
405                msg->msg_delayed ? "d" : "",
406                msg->msg_txcredit ? "C" : "",
407                msg->msg_peertxcredit ? "c" : "",
408                msg->msg_rtrcredit ? "F" : "",
409                msg->msg_peerrtrcredit ? "f" : "",
410                msg->msg_onactivelist ? "!" : "",
411                msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
412                msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
413 #endif
414
415         LASSERT (msg->msg_onactivelist);
416
417         msg->msg_ev.status = status;
418
419         if (msg->msg_md != NULL) {
420                 cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
421
422                 lnet_res_lock(cpt);
423                 lnet_msg_detach_md(msg, status);
424                 lnet_res_unlock(cpt);
425         }
426
427         if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
428                 /* not commited to network yet */
429                 LASSERT(!msg->msg_onactivelist);
430                 lnet_msg_free(msg);
431                 return;
432         }
433
434         LNET_LOCK();
435         container = &the_lnet.ln_msg_container;
436         cfs_list_add_tail(&msg->msg_list, &container->msc_finalizing);
437
438         /* Recursion breaker.  Don't complete the message here if I am (or
439          * enough other threads are) already completing messages */
440
441 #ifdef __KERNEL__
442         my_slot = -1;
443         for (i = 0; i < container->msc_nfinalizers; i++) {
444                 if (container->msc_finalizers[i] == cfs_current())
445                         goto out;
446
447                 if (my_slot < 0 && container->msc_finalizers[i] == NULL)
448                         my_slot = i;
449         }
450
451         if (my_slot < 0)
452                 goto out;
453
454         container->msc_finalizers[my_slot] = cfs_current();
455 #else
456         LASSERT(container->msc_nfinalizers == 1);
457         if (container->msc_finalizers[0] != NULL)
458                 goto out;
459
460         my_slot = i = 0;
461         container->msc_finalizers[0] = (struct lnet_msg_container *)1;
462 #endif
463
464         while (!cfs_list_empty(&container->msc_finalizing)) {
465                 msg = cfs_list_entry(container->msc_finalizing.next,
466                                      lnet_msg_t, msg_list);
467
468                 cfs_list_del(&msg->msg_list);
469
470                 /* NB drops and regains the lnet lock if it actually does
471                  * anything, so my finalizing friends can chomp along too */
472                 lnet_complete_msg_locked(msg);
473         }
474
475         container->msc_finalizers[my_slot] = NULL;
476  out:
477         LNET_UNLOCK();
478 }
479
480 void
481 lnet_msg_container_cleanup(struct lnet_msg_container *container)
482 {
483         int     count = 0;
484
485         if (container->msc_init == 0)
486                 return;
487
488         while (!cfs_list_empty(&container->msc_active)) {
489                 lnet_msg_t *msg = cfs_list_entry(container->msc_active.next,
490                                                  lnet_msg_t, msg_activelist);
491
492                 LASSERT(msg->msg_onactivelist);
493                 msg->msg_onactivelist = 0;
494                 cfs_list_del(&msg->msg_activelist);
495                 lnet_msg_free(msg);
496                 count++;
497         }
498
499         if (count > 0)
500                 CERROR("%d active msg on exit\n", count);
501
502         if (container->msc_finalizers != NULL) {
503                 LIBCFS_FREE(container->msc_finalizers,
504                             container->msc_nfinalizers *
505                             sizeof(*container->msc_finalizers));
506                 container->msc_finalizers = NULL;
507         }
508 #ifdef LNET_USE_LIB_FREELIST
509         lnet_freelist_fini(&container->msc_freelist);
510 #endif
511         container->msc_init = 0;
512 }
513
514 int
515 lnet_msg_container_setup(struct lnet_msg_container *container)
516 {
517         int     rc;
518
519         container->msc_init = 1;
520
521         CFS_INIT_LIST_HEAD(&container->msc_active);
522         CFS_INIT_LIST_HEAD(&container->msc_finalizing);
523
524 #ifdef LNET_USE_LIB_FREELIST
525         memset(&container->msc_freelist, 0, sizeof(lnet_freelist_t));
526
527         rc = lnet_freelist_init(&container->msc_freelist,
528                                 LNET_FL_MAX_MSGS, sizeof(lnet_msg_t));
529         if (rc != 0) {
530                 CERROR("Failed to init freelist for message container\n");
531                 lnet_msg_container_cleanup(container);
532                 return rc;
533         }
534 #else
535         rc = 0;
536 #endif
537         /* number of CPUs */
538         container->msc_nfinalizers = cfs_cpt_weight(cfs_cpt_table,
539                                                     CFS_CPT_ANY);
540         LIBCFS_ALLOC(container->msc_finalizers,
541                      container->msc_nfinalizers *
542                      sizeof(*container->msc_finalizers));
543
544         if (container->msc_finalizers == NULL) {
545                 CERROR("Failed to allocate message finalizers\n");
546                 lnet_msg_container_cleanup(container);
547                 return -ENOMEM;
548         }
549
550         return rc;
551 }