Whamcloud - gitweb
LU-56 lnet: split lnet_commit_md and cleanup
[fs/lustre-release.git] / lnet / lnet / lib-msg.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lnet/lnet/lib-msg.c
35  *
36  * Message decoding, parsing and finalizing routines
37  */
38
39 #define DEBUG_SUBSYSTEM S_LNET
40
41 #include <lnet/lib-lnet.h>
42
43 void
44 lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
45 {
46         ENTRY;
47
48         memset(ev, 0, sizeof(*ev));
49
50         ev->status   = 0;
51         ev->unlinked = 1;
52         ev->type     = LNET_EVENT_UNLINK;
53         lnet_md_deconstruct(md, &ev->md);
54         lnet_md2handle(&ev->md_handle, md);
55         EXIT;
56 }
57
58 /*
59  * Don't need any lock, must be called after lnet_commit_md
60  */
61 void
62 lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
63 {
64         lnet_hdr_t      *hdr = &msg->msg_hdr;
65         lnet_event_t    *ev  = &msg->msg_ev;
66
67         LASSERT(!msg->msg_routing);
68
69         ev->type = ev_type;
70
71         if (ev_type == LNET_EVENT_SEND) {
72                 /* event for active message */
73                 ev->target.nid    = le64_to_cpu(hdr->dest_nid);
74                 ev->target.pid    = le32_to_cpu(hdr->dest_pid);
75                 ev->initiator.nid = LNET_NID_ANY;
76                 ev->initiator.pid = the_lnet.ln_pid;
77                 ev->sender        = LNET_NID_ANY;
78
79         } else {
80                 /* event for passive message */
81                 ev->target.pid    = hdr->dest_pid;
82                 ev->target.nid    = hdr->dest_nid;
83                 ev->initiator.pid = hdr->src_pid;
84                 ev->initiator.nid = hdr->src_nid;
85                 ev->rlength       = hdr->payload_length;
86                 ev->sender        = msg->msg_from;
87                 ev->mlength       = msg->msg_wanted;
88                 ev->offset        = msg->msg_offset;
89         }
90
91         switch (ev_type) {
92         default:
93                 LBUG();
94
95         case LNET_EVENT_PUT: /* passive PUT */
96                 ev->pt_index   = hdr->msg.put.ptl_index;
97                 ev->match_bits = hdr->msg.put.match_bits;
98                 ev->hdr_data   = hdr->msg.put.hdr_data;
99                 return;
100
101         case LNET_EVENT_GET: /* passive GET */
102                 ev->pt_index   = hdr->msg.get.ptl_index;
103                 ev->match_bits = hdr->msg.get.match_bits;
104                 ev->hdr_data   = 0;
105                 return;
106
107         case LNET_EVENT_ACK: /* ACK */
108                 ev->match_bits = hdr->msg.ack.match_bits;
109                 ev->mlength    = hdr->msg.ack.mlength;
110                 return;
111
112         case LNET_EVENT_REPLY: /* REPLY */
113                 return;
114
115         case LNET_EVENT_SEND: /* active message */
116                 if (msg->msg_type == LNET_MSG_PUT) {
117                         ev->pt_index   = le32_to_cpu(hdr->msg.put.ptl_index);
118                         ev->match_bits = le64_to_cpu(hdr->msg.put.match_bits);
119                         ev->offset     = le32_to_cpu(hdr->msg.put.offset);
120                         ev->mlength    =
121                         ev->rlength    = le32_to_cpu(hdr->payload_length);
122                         ev->hdr_data   = le64_to_cpu(hdr->msg.put.hdr_data);
123
124                 } else {
125                         LASSERT(msg->msg_type == LNET_MSG_GET);
126                         ev->pt_index   = le32_to_cpu(hdr->msg.get.ptl_index);
127                         ev->match_bits = le64_to_cpu(hdr->msg.get.match_bits);
128                         ev->mlength    =
129                         ev->rlength    = le32_to_cpu(hdr->msg.get.sink_length);
130                         ev->offset     = le32_to_cpu(hdr->msg.get.src_offset);
131                         ev->hdr_data   = 0;
132                 }
133                 return;
134         }
135 }
136
137 void
138 lnet_msg_commit(lnet_msg_t *msg, int sending)
139 {
140         struct lnet_msg_container *container = &the_lnet.ln_msg_container;
141         lnet_counters_t           *counters  = &the_lnet.ln_counters;
142
143         /* routed message can be committed for both receiving and sending */
144         LASSERT(!msg->msg_tx_committed);
145
146         if (msg->msg_rx_committed) { /* routed message, or reply for GET */
147                 LASSERT(sending);
148                 LASSERT(msg->msg_onactivelist);
149                 msg->msg_tx_committed = 1;
150                 return;
151         }
152
153         LASSERT(!msg->msg_onactivelist);
154         msg->msg_onactivelist = 1;
155         cfs_list_add(&msg->msg_activelist, &container->msc_active);
156
157         counters->msgs_alloc++;
158         if (counters->msgs_alloc > counters->msgs_max)
159                 counters->msgs_max = counters->msgs_alloc;
160
161         if (sending)
162                 msg->msg_tx_committed = 1;
163         else
164                 msg->msg_rx_committed = 1;
165 }
166
167 static void
168 lnet_msg_tx_decommit(lnet_msg_t *msg, int status)
169 {
170         lnet_counters_t *counters = &the_lnet.ln_counters;
171         lnet_event_t *ev = &msg->msg_ev;
172
173         LASSERT(msg->msg_tx_committed);
174         if (status != 0)
175                 goto out;
176
177         switch (ev->type) {
178         default: /* routed message */
179                 LASSERT(msg->msg_routing);
180                 LASSERT(msg->msg_rx_committed);
181                 LASSERT(ev->type == 0);
182
183                 counters->route_length += msg->msg_len;
184                 counters->route_count++;
185                 goto out;
186
187         case LNET_EVENT_PUT:
188                 /* should have been decommitted */
189                 LASSERT(!msg->msg_rx_committed);
190                 /* overwritten while sending ACK */
191                 LASSERT(msg->msg_type == LNET_MSG_ACK);
192                 msg->msg_type = LNET_MSG_PUT; /* fix type */
193                 break;
194
195         case LNET_EVENT_SEND:
196                 LASSERT(!msg->msg_rx_committed);
197                 if (msg->msg_type == LNET_MSG_PUT)
198                         counters->send_length += msg->msg_len;
199                 break;
200
201         case LNET_EVENT_GET:
202                 LASSERT(msg->msg_rx_committed);
203                 /* overwritten while sending reply */
204                 LASSERT(msg->msg_type == LNET_MSG_REPLY);
205
206                 msg->msg_type = LNET_MSG_GET; /* fix type */
207                 counters->send_length += msg->msg_len;
208                 break;
209         }
210
211         counters->send_count++;
212  out:
213         lnet_return_tx_credits_locked(msg);
214         msg->msg_tx_committed = 0;
215 }
216
217 static void
218 lnet_msg_rx_decommit(lnet_msg_t *msg, int status)
219 {
220         lnet_counters_t *counters = &the_lnet.ln_counters;
221         lnet_event_t *ev = &msg->msg_ev;
222
223         LASSERT(!msg->msg_tx_committed); /* decommitted or uncommitted */
224         LASSERT(msg->msg_rx_committed);
225
226         if (status != 0)
227                 goto out;
228
229         switch (ev->type) {
230         default:
231                 LASSERT(ev->type == 0);
232                 LASSERT(msg->msg_routing);
233                 goto out;
234
235         case LNET_EVENT_ACK:
236                 LASSERT(msg->msg_type == LNET_MSG_ACK);
237                 break;
238
239         case LNET_EVENT_GET:
240                 LASSERT(msg->msg_type == LNET_MSG_GET);
241                 break;
242
243         case LNET_EVENT_PUT:
244                 LASSERT(msg->msg_type == LNET_MSG_PUT);
245                 break;
246
247         case LNET_EVENT_REPLY:
248                 LASSERT(msg->msg_type == LNET_MSG_REPLY ||
249                         msg->msg_type == LNET_MSG_GET); /* optimized GET */
250                 break;
251         }
252
253         counters->recv_count++;
254         if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY)
255                 counters->recv_length += msg->msg_wanted;
256
257  out:
258         lnet_return_rx_credits_locked(msg);
259         msg->msg_rx_committed = 0;
260 }
261
262 void
263 lnet_msg_decommit(lnet_msg_t *msg, int status)
264 {
265         lnet_counters_t *counters = &the_lnet.ln_counters;
266
267         LASSERT(msg->msg_tx_committed || msg->msg_rx_committed);
268         LASSERT(msg->msg_onactivelist);
269
270         if (msg->msg_tx_committed) /* always decommit for sending first */
271                 lnet_msg_tx_decommit(msg, status);
272
273         if (msg->msg_rx_committed)
274                 lnet_msg_rx_decommit(msg, status);
275
276         cfs_list_del(&msg->msg_activelist);
277         msg->msg_onactivelist = 0;
278         counters->msgs_alloc--;
279 }
280
281 void
282 lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md,
283                    unsigned int offset, unsigned int mlen)
284 {
285         /* Here, we attach the MD on lnet_msg and mark it busy and
286          * decrementing its threshold. Come what may, the lnet_msg "owns"
287          * the MD until a call to lnet_msg_detach_md or lnet_finalize()
288          * signals completion. */
289         LASSERT(!msg->msg_routing);
290
291         msg->msg_md = md;
292         if (msg->msg_receiving) { /* commited for receiving */
293                 msg->msg_offset = offset;
294                 msg->msg_wanted = mlen;
295         }
296
297         md->md_refcount++;
298         if (md->md_threshold != LNET_MD_THRESH_INF) {
299                 LASSERT(md->md_threshold > 0);
300                 md->md_threshold--;
301         }
302
303         /* build umd in event */
304         lnet_md2handle(&msg->msg_ev.md_handle, md);
305         lnet_md_deconstruct(md, &msg->msg_ev.md);
306 }
307
308 void
309 lnet_msg_detach_md(lnet_msg_t *msg, int status)
310 {
311         lnet_libmd_t    *md = msg->msg_md;
312         int             unlink;
313
314         if (md == NULL)
315                 return;
316
317         /* Now it's safe to drop my caller's ref */
318         md->md_refcount--;
319         LASSERT(md->md_refcount >= 0);
320
321         unlink = lnet_md_unlinkable(md);
322         if (md->md_eq != NULL) {
323                 msg->msg_ev.status   = status;
324                 msg->msg_ev.unlinked = unlink;
325                 lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev);
326         }
327
328         if (unlink)
329                 lnet_md_unlink(md);
330
331         msg->msg_md = NULL;
332 }
333
334 void
335 lnet_complete_msg_locked(lnet_msg_t *msg)
336 {
337         lnet_handle_wire_t ack_wmd;
338         int                rc;
339         int                status = msg->msg_ev.status;
340
341         LASSERT (msg->msg_onactivelist);
342
343         if (status == 0 && msg->msg_ack) {
344                 /* Only send an ACK if the PUT completed successfully */
345
346                 lnet_msg_decommit(msg, 0);
347
348                 msg->msg_ack = 0;
349                 LNET_UNLOCK();
350
351                 LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
352                 LASSERT(!msg->msg_routing);
353
354                 ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
355
356                 lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
357
358                 msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
359                 msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
360                 msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
361
362                 rc = lnet_send(msg->msg_ev.target.nid, msg);
363
364                 LNET_LOCK();
365
366                 if (rc == 0)
367                         return;
368         } else if (status == 0 &&               /* OK so far */
369                    (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */
370                 
371                 LASSERT (!msg->msg_receiving);  /* called back recv already */
372         
373                 LNET_UNLOCK();
374                 
375                 rc = lnet_send(LNET_NID_ANY, msg);
376
377                 LNET_LOCK();
378
379                 if (rc == 0)
380                         return;
381         }
382
383         lnet_msg_decommit(msg, status);
384         lnet_msg_free_locked(msg);
385 }
386
387
388 void
389 lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
390 {
391         struct lnet_msg_container       *container;
392         int                             my_slot;
393         int                             i;
394
395         LASSERT (!cfs_in_interrupt ());
396
397         if (msg == NULL)
398                 return;
399 #if 0
400         CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
401                lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
402                msg->msg_target_is_router ? "t" : "",
403                msg->msg_routing ? "X" : "",
404                msg->msg_ack ? "A" : "",
405                msg->msg_sending ? "S" : "",
406                msg->msg_receiving ? "R" : "",
407                msg->msg_delayed ? "d" : "",
408                msg->msg_txcredit ? "C" : "",
409                msg->msg_peertxcredit ? "c" : "",
410                msg->msg_rtrcredit ? "F" : "",
411                msg->msg_peerrtrcredit ? "f" : "",
412                msg->msg_onactivelist ? "!" : "",
413                msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
414                msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
415 #endif
416         LNET_LOCK();
417
418         LASSERT (msg->msg_onactivelist);
419
420         msg->msg_ev.status = status;
421
422         if (msg->msg_md != NULL)
423                 lnet_msg_detach_md(msg, status);
424
425         if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
426                 LNET_UNLOCK();
427                 /* not commited to network yet */
428                 LASSERT(!msg->msg_onactivelist);
429                 lnet_msg_free(msg);
430                 return;
431         }
432
433         container = &the_lnet.ln_msg_container;
434         cfs_list_add_tail(&msg->msg_list, &container->msc_finalizing);
435
436         /* Recursion breaker.  Don't complete the message here if I am (or
437          * enough other threads are) already completing messages */
438
439 #ifdef __KERNEL__
440         my_slot = -1;
441         for (i = 0; i < container->msc_nfinalizers; i++) {
442                 if (container->msc_finalizers[i] == cfs_current())
443                         goto out;
444
445                 if (my_slot < 0 && container->msc_finalizers[i] == NULL)
446                         my_slot = i;
447         }
448
449         if (my_slot < 0)
450                 goto out;
451
452         container->msc_finalizers[my_slot] = cfs_current();
453 #else
454         LASSERT(container->msc_nfinalizers == 1);
455         if (container->msc_finalizers[0] != NULL)
456                 goto out;
457
458         my_slot = i = 0;
459         container->msc_finalizers[0] = (struct lnet_msg_container *)1;
460 #endif
461
462         while (!cfs_list_empty(&container->msc_finalizing)) {
463                 msg = cfs_list_entry(container->msc_finalizing.next,
464                                      lnet_msg_t, msg_list);
465
466                 cfs_list_del(&msg->msg_list);
467
468                 /* NB drops and regains the lnet lock if it actually does
469                  * anything, so my finalizing friends can chomp along too */
470                 lnet_complete_msg_locked(msg);
471         }
472
473         container->msc_finalizers[my_slot] = NULL;
474  out:
475         LNET_UNLOCK();
476 }
477
478 void
479 lnet_msg_container_cleanup(struct lnet_msg_container *container)
480 {
481         int     count = 0;
482
483         if (container->msc_init == 0)
484                 return;
485
486         while (!cfs_list_empty(&container->msc_active)) {
487                 lnet_msg_t *msg = cfs_list_entry(container->msc_active.next,
488                                                  lnet_msg_t, msg_activelist);
489
490                 LASSERT(msg->msg_onactivelist);
491                 msg->msg_onactivelist = 0;
492                 cfs_list_del(&msg->msg_activelist);
493                 lnet_msg_free(msg);
494                 count++;
495         }
496
497         if (count > 0)
498                 CERROR("%d active msg on exit\n", count);
499
500         if (container->msc_finalizers != NULL) {
501                 LIBCFS_FREE(container->msc_finalizers,
502                             container->msc_nfinalizers *
503                             sizeof(*container->msc_finalizers));
504                 container->msc_finalizers = NULL;
505         }
506 #ifdef LNET_USE_LIB_FREELIST
507         lnet_freelist_fini(&container->msc_freelist);
508 #endif
509         container->msc_init = 0;
510 }
511
512 int
513 lnet_msg_container_setup(struct lnet_msg_container *container)
514 {
515         int     rc;
516
517         container->msc_init = 1;
518
519         CFS_INIT_LIST_HEAD(&container->msc_active);
520         CFS_INIT_LIST_HEAD(&container->msc_finalizing);
521
522 #ifdef LNET_USE_LIB_FREELIST
523         memset(&container->msc_freelist, 0, sizeof(lnet_freelist_t));
524
525         rc = lnet_freelist_init(&container->msc_freelist,
526                                 LNET_FL_MAX_MSGS, sizeof(lnet_msg_t));
527         if (rc != 0) {
528                 CERROR("Failed to init freelist for message container\n");
529                 lnet_msg_container_cleanup(container);
530                 return rc;
531         }
532 #else
533         rc = 0;
534 #endif
535         /* number of CPUs */
536         container->msc_nfinalizers = cfs_cpt_weight(cfs_cpt_table,
537                                                     CFS_CPT_ANY);
538         LIBCFS_ALLOC(container->msc_finalizers,
539                      container->msc_nfinalizers *
540                      sizeof(*container->msc_finalizers));
541
542         if (container->msc_finalizers == NULL) {
543                 CERROR("Failed to allocate message finalizers\n");
544                 lnet_msg_container_cleanup(container);
545                 return -ENOMEM;
546         }
547
548         return 0;
549 }