Whamcloud - gitweb
LU-56 lnet: cleanup for LNet Event Queue
[fs/lustre-release.git] / lnet / lnet / lib-msg.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lnet/lnet/lib-msg.c
35  *
36  * Message decoding, parsing and finalizing routines
37  */
38
39 #define DEBUG_SUBSYSTEM S_LNET
40
41 #include <lnet/lib-lnet.h>
42
43 void
44 lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
45 {
46         ENTRY;
47
48         memset(ev, 0, sizeof(*ev));
49
50         ev->status   = 0;
51         ev->unlinked = 1;
52         ev->type     = LNET_EVENT_UNLINK;
53         lnet_md_deconstruct(md, &ev->md);
54         lnet_md2handle(&ev->md_handle, md);
55         EXIT;
56 }
57
58 void
59 lnet_complete_msg_locked(lnet_msg_t *msg)
60 {
61         lnet_handle_wire_t ack_wmd;
62         int                rc;
63         int                status = msg->msg_ev.status;
64
65         LASSERT (msg->msg_onactivelist);
66
67         if (status == 0 && msg->msg_ack) {
68                 /* Only send an ACK if the PUT completed successfully */
69
70                 lnet_return_credits_locked(msg);
71
72                 msg->msg_ack = 0;
73                 LNET_UNLOCK();
74
75                 LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
76                 LASSERT(!msg->msg_routing);
77
78                 ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
79
80                 lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
81
82                 msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
83                 msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
84                 msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
85
86                 rc = lnet_send(msg->msg_ev.target.nid, msg);
87
88                 LNET_LOCK();
89
90                 if (rc == 0)
91                         return;
92         } else if (status == 0 &&               /* OK so far */
93                    (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */
94                 
95                 LASSERT (!msg->msg_receiving);  /* called back recv already */
96         
97                 LNET_UNLOCK();
98                 
99                 rc = lnet_send(LNET_NID_ANY, msg);
100
101                 LNET_LOCK();
102
103                 if (rc == 0)
104                         return;
105         }
106
107         lnet_return_credits_locked(msg);
108
109         LASSERT (msg->msg_onactivelist);
110         msg->msg_onactivelist = 0;
111         cfs_list_del (&msg->msg_activelist);
112         the_lnet.ln_counters.msgs_alloc--;
113         lnet_msg_free_locked(msg);
114 }
115
116
117 void
118 lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
119 {
120         struct lnet_msg_container       *container;
121         lnet_libmd_t                    *md;
122         int                             my_slot;
123         int                             i;
124
125         LASSERT (!cfs_in_interrupt ());
126
127         if (msg == NULL)
128                 return;
129 #if 0
130         CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
131                lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
132                msg->msg_target_is_router ? "t" : "",
133                msg->msg_routing ? "X" : "",
134                msg->msg_ack ? "A" : "",
135                msg->msg_sending ? "S" : "",
136                msg->msg_receiving ? "R" : "",
137                msg->msg_delayed ? "d" : "",
138                msg->msg_txcredit ? "C" : "",
139                msg->msg_peertxcredit ? "c" : "",
140                msg->msg_rtrcredit ? "F" : "",
141                msg->msg_peerrtrcredit ? "f" : "",
142                msg->msg_onactivelist ? "!" : "",
143                msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
144                msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
145 #endif
146         LNET_LOCK();
147
148         LASSERT (msg->msg_onactivelist);
149
150         msg->msg_ev.status = status;
151
152         md = msg->msg_md;
153         if (md != NULL) {
154                 int      unlink;
155
156                 /* Now it's safe to drop my caller's ref */
157                 md->md_refcount--;
158                 LASSERT (md->md_refcount >= 0);
159
160                 unlink = lnet_md_unlinkable(md);
161
162                 msg->msg_ev.unlinked = unlink;
163
164                 if (md->md_eq != NULL)
165                         lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev);
166
167                 if (unlink)
168                         lnet_md_unlink(md);
169
170                 msg->msg_md = NULL;
171         }
172
173         container = &the_lnet.ln_msg_container;
174         cfs_list_add_tail(&msg->msg_list, &container->msc_finalizing);
175
176         /* Recursion breaker.  Don't complete the message here if I am (or
177          * enough other threads are) already completing messages */
178
179 #ifdef __KERNEL__
180         my_slot = -1;
181         for (i = 0; i < container->msc_nfinalizers; i++) {
182                 if (container->msc_finalizers[i] == cfs_current())
183                         goto out;
184
185                 if (my_slot < 0 && container->msc_finalizers[i] == NULL)
186                         my_slot = i;
187         }
188
189         if (my_slot < 0)
190                 goto out;
191
192         container->msc_finalizers[my_slot] = cfs_current();
193 #else
194         LASSERT(container->msc_nfinalizers == 1);
195         if (container->msc_finalizers[0] != NULL)
196                 goto out;
197
198         my_slot = i = 0;
199         container->msc_finalizers[0] = (struct lnet_msg_container *)1;
200 #endif
201
202         while (!cfs_list_empty(&container->msc_finalizing)) {
203                 msg = cfs_list_entry(container->msc_finalizing.next,
204                                      lnet_msg_t, msg_list);
205
206                 cfs_list_del(&msg->msg_list);
207
208                 /* NB drops and regains the lnet lock if it actually does
209                  * anything, so my finalizing friends can chomp along too */
210                 lnet_complete_msg_locked(msg);
211         }
212
213         container->msc_finalizers[my_slot] = NULL;
214  out:
215         LNET_UNLOCK();
216 }
217
218 void
219 lnet_msg_container_cleanup(struct lnet_msg_container *container)
220 {
221         int     count = 0;
222
223         if (container->msc_init == 0)
224                 return;
225
226         while (!cfs_list_empty(&container->msc_active)) {
227                 lnet_msg_t *msg = cfs_list_entry(container->msc_active.next,
228                                                  lnet_msg_t, msg_activelist);
229
230                 LASSERT(msg->msg_onactivelist);
231                 msg->msg_onactivelist = 0;
232                 cfs_list_del(&msg->msg_activelist);
233                 lnet_msg_free(msg);
234                 count++;
235         }
236
237         if (count > 0)
238                 CERROR("%d active msg on exit\n", count);
239
240         if (container->msc_finalizers != NULL) {
241                 LIBCFS_FREE(container->msc_finalizers,
242                             container->msc_nfinalizers *
243                             sizeof(*container->msc_finalizers));
244                 container->msc_finalizers = NULL;
245         }
246 #ifdef LNET_USE_LIB_FREELIST
247         lnet_freelist_fini(&container->msc_freelist);
248 #endif
249         container->msc_init = 0;
250 }
251
252 int
253 lnet_msg_container_setup(struct lnet_msg_container *container)
254 {
255         int     rc;
256
257         container->msc_init = 1;
258
259         CFS_INIT_LIST_HEAD(&container->msc_active);
260         CFS_INIT_LIST_HEAD(&container->msc_finalizing);
261
262 #ifdef LNET_USE_LIB_FREELIST
263         memset(&container->msc_freelist, 0, sizeof(lnet_freelist_t));
264
265         rc = lnet_freelist_init(&container->msc_freelist,
266                                 LNET_FL_MAX_MSGS, sizeof(lnet_msg_t));
267         if (rc != 0) {
268                 CERROR("Failed to init freelist for message container\n");
269                 lnet_msg_container_cleanup(container);
270                 return rc;
271         }
272 #else
273         rc = 0;
274 #endif
275         /* number of CPUs */
276         container->msc_nfinalizers = cfs_cpt_weight(cfs_cpt_table,
277                                                     CFS_CPT_ANY);
278         LIBCFS_ALLOC(container->msc_finalizers,
279                      container->msc_nfinalizers *
280                      sizeof(*container->msc_finalizers));
281
282         if (container->msc_finalizers == NULL) {
283                 CERROR("Failed to allocate message finalizers\n");
284                 lnet_msg_container_cleanup(container);
285                 return -ENOMEM;
286         }
287
288         return 0;
289 }