Whamcloud - gitweb
b=21776 Use atomic memory allocation when VM is flushing to free memory.
[fs/lustre-release.git] / lnet / lnet / lib-msg.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/lnet/lib-msg.c
37  *
38  * Message decoding, parsing and finalizing routines
39  */
40
41 #define DEBUG_SUBSYSTEM S_LNET
42
43 #include <lnet/lib-lnet.h>
44
45 void
46 lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
47 {
48         ENTRY;
49
50         memset(ev, 0, sizeof(*ev));
51
52         ev->status   = 0;
53         ev->unlinked = 1;
54         ev->type     = LNET_EVENT_UNLINK;
55         lnet_md_deconstruct(md, &ev->md);
56         lnet_md2handle(&ev->md_handle, md);
57         EXIT;
58 }
59
60 void
61 lnet_enq_event_locked (lnet_eq_t *eq, lnet_event_t *ev)
62 {
63         lnet_event_t  *eq_slot;
64
65         /* Allocate the next queue slot */
66         ev->sequence = eq->eq_enq_seq++;
67
68         /* size must be a power of 2 to handle sequence # overflow */
69         LASSERT (eq->eq_size != 0 &&
70                  eq->eq_size == LOWEST_BIT_SET (eq->eq_size));
71         eq_slot = eq->eq_events + (ev->sequence & (eq->eq_size - 1));
72
73         /* There is no race since both event consumers and event producers
74          * take the LNET_LOCK, so we don't screw around with memory
75          * barriers, setting the sequence number last or wierd structure
76          * layout assertions. */
77         *eq_slot = *ev;
78
79         /* Call the callback handler (if any) */
80         if (eq->eq_callback != NULL)
81                 eq->eq_callback (eq_slot);
82
83 #ifdef __KERNEL__
84         /* Wake anyone waiting in LNetEQPoll() */
85         if (cfs_waitq_active(&the_lnet.ln_waitq))
86                 cfs_waitq_broadcast(&the_lnet.ln_waitq);
87 #else
88 # ifndef HAVE_PTHREAD
89         /* LNetEQPoll() calls into _the_ LND to wait for action */
90 # else
91         /* Wake anyone waiting in LNetEQPoll() */
92         pthread_cond_broadcast(&the_lnet.ln_cond);
93 # endif
94 #endif
95 }
96
97 void
98 lnet_complete_msg_locked(lnet_msg_t *msg)
99 {
100         lnet_handle_wire_t ack_wmd;
101         int                rc;
102         int                status = msg->msg_ev.status;
103
104         LASSERT (msg->msg_onactivelist);
105
106         if (status == 0 && msg->msg_ack) {
107                 /* Only send an ACK if the PUT completed successfully */
108
109                 lnet_return_credits_locked(msg);
110
111                 msg->msg_ack = 0;
112                 LNET_UNLOCK();
113
114                 LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
115                 LASSERT(!msg->msg_routing);
116
117                 ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
118
119                 lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
120
121                 msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
122                 msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
123                 msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
124
125                 rc = lnet_send(msg->msg_ev.target.nid, msg);
126
127                 LNET_LOCK();
128
129                 if (rc == 0)
130                         return;
131         } else if (status == 0 &&               /* OK so far */
132                    (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */
133                 
134                 LASSERT (!msg->msg_receiving);  /* called back recv already */
135         
136                 LNET_UNLOCK();
137                 
138                 rc = lnet_send(LNET_NID_ANY, msg);
139
140                 LNET_LOCK();
141
142                 if (rc == 0)
143                         return;
144         }
145
146         lnet_return_credits_locked(msg);
147
148         LASSERT (msg->msg_onactivelist);
149         msg->msg_onactivelist = 0;
150         cfs_list_del (&msg->msg_activelist);
151         the_lnet.ln_counters.msgs_alloc--;
152         lnet_msg_free(msg);
153 }
154
155
156 void
157 lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
158 {
159 #ifdef __KERNEL__
160         int                i;
161         int                my_slot;
162 #endif
163         lnet_libmd_t      *md;
164
165         LASSERT (!cfs_in_interrupt ());
166
167         if (msg == NULL)
168                 return;
169 #if 0
170         CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
171                lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
172                msg->msg_target_is_router ? "t" : "",
173                msg->msg_routing ? "X" : "",
174                msg->msg_ack ? "A" : "",
175                msg->msg_sending ? "S" : "",
176                msg->msg_receiving ? "R" : "",
177                msg->msg_delayed ? "d" : "",
178                msg->msg_txcredit ? "C" : "",
179                msg->msg_peertxcredit ? "c" : "",
180                msg->msg_rtrcredit ? "F" : "",
181                msg->msg_peerrtrcredit ? "f" : "",
182                msg->msg_onactivelist ? "!" : "",
183                msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
184                msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
185 #endif
186         LNET_LOCK();
187
188         LASSERT (msg->msg_onactivelist);
189
190         msg->msg_ev.status = status;
191
192         md = msg->msg_md;
193         if (md != NULL) {
194                 int      unlink;
195
196                 /* Now it's safe to drop my caller's ref */
197                 md->md_refcount--;
198                 LASSERT (md->md_refcount >= 0);
199
200                 unlink = lnet_md_unlinkable(md);
201
202                 msg->msg_ev.unlinked = unlink;
203
204                 if (md->md_eq != NULL)
205                         lnet_enq_event_locked(md->md_eq, &msg->msg_ev);
206
207                 if (unlink)
208                         lnet_md_unlink(md);
209
210                 msg->msg_md = NULL;
211         }
212
213         cfs_list_add_tail (&msg->msg_list, &the_lnet.ln_finalizeq);
214
215         /* Recursion breaker.  Don't complete the message here if I am (or
216          * enough other threads are) already completing messages */
217
218 #ifdef __KERNEL__
219         my_slot = -1;
220         for (i = 0; i < the_lnet.ln_nfinalizers; i++) {
221                 if (the_lnet.ln_finalizers[i] == cfs_current())
222                         goto out;
223                 if (my_slot < 0 && the_lnet.ln_finalizers[i] == NULL)
224                         my_slot = i;
225         }
226         if (my_slot < 0)
227                 goto out;
228
229         the_lnet.ln_finalizers[my_slot] = cfs_current();
230 #else
231         if (the_lnet.ln_finalizing)
232                 goto out;
233
234         the_lnet.ln_finalizing = 1;
235 #endif
236
237         while (!cfs_list_empty(&the_lnet.ln_finalizeq)) {
238                 msg = cfs_list_entry(the_lnet.ln_finalizeq.next,
239                                      lnet_msg_t, msg_list);
240
241                 cfs_list_del(&msg->msg_list);
242
243                 /* NB drops and regains the lnet lock if it actually does
244                  * anything, so my finalizing friends can chomp along too */
245                 lnet_complete_msg_locked(msg);
246         }
247
248 #ifdef __KERNEL__
249         the_lnet.ln_finalizers[my_slot] = NULL;
250 #else
251         the_lnet.ln_finalizing = 0;
252 #endif
253
254  out:
255         LNET_UNLOCK();
256 }