Whamcloud - gitweb
b=15272
[fs/lustre-release.git] / lnet / lnet / lib-msg.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * lib/lib-msg.c
5  * Message decoding, parsing and finalizing routines
6  *
7  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
8  *
9  *   This file is part of Lustre, http://www.lustre.org
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LNET
26
27 #include <lnet/lib-lnet.h>
28
29 void
30 lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
31 {
32         memset(ev, 0, sizeof(*ev));
33
34         ev->status   = 0;
35         ev->unlinked = 1;
36         ev->type     = LNET_EVENT_UNLINK;
37         lnet_md_deconstruct(md, &ev->md);
38         lnet_md2handle(&ev->md_handle, md);
39 }
40
41 void
42 lnet_enq_event_locked (lnet_eq_t *eq, lnet_event_t *ev)
43 {
44         lnet_event_t  *eq_slot;
45
46         /* Allocate the next queue slot */
47         ev->sequence = eq->eq_enq_seq++;
48
49         /* size must be a power of 2 to handle sequence # overflow */
50         LASSERT (eq->eq_size != 0 &&
51                  eq->eq_size == LOWEST_BIT_SET (eq->eq_size));
52         eq_slot = eq->eq_events + (ev->sequence & (eq->eq_size - 1));
53
54         /* There is no race since both event consumers and event producers
55          * take the LNET_LOCK, so we don't screw around with memory
56          * barriers, setting the sequence number last or wierd structure
57          * layout assertions. */
58         *eq_slot = *ev;
59
60         /* Call the callback handler (if any) */
61         if (eq->eq_callback != NULL)
62                 eq->eq_callback (eq_slot);
63
64 #ifdef __KERNEL__
65         /* Wake anyone waiting in LNetEQPoll() */
66         if (cfs_waitq_active(&the_lnet.ln_waitq))
67                 cfs_waitq_broadcast(&the_lnet.ln_waitq);
68 #else
69 # ifndef HAVE_LIBPTHREAD
70         /* LNetEQPoll() calls into _the_ LND to wait for action */
71 # else
72         /* Wake anyone waiting in LNetEQPoll() */
73         pthread_cond_broadcast(&the_lnet.ln_cond);
74 # endif
75 #endif
76 }
77
78 void
79 lnet_complete_msg_locked(lnet_msg_t *msg)
80 {
81         lnet_handle_wire_t ack_wmd;
82         int                rc;
83         int                status = msg->msg_ev.status;
84
85         LASSERT (msg->msg_onactivelist);
86
87         if (status == 0 && msg->msg_ack) {
88                 /* Only send an ACK if the PUT completed successfully */
89
90                 lnet_return_credits_locked(msg);
91
92                 msg->msg_ack = 0;
93                 LNET_UNLOCK();
94         
95                 LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
96                 LASSERT(!msg->msg_routing);
97
98                 ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
99                 
100                 lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
101
102                 msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
103                 msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
104                 msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
105                 
106                 rc = lnet_send(msg->msg_ev.target.nid, msg);
107
108                 LNET_LOCK();
109
110                 if (rc == 0)
111                         return;
112         } else if (status == 0 &&               /* OK so far */
113                    (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */
114                 
115                 LASSERT (!msg->msg_receiving);  /* called back recv already */
116         
117                 LNET_UNLOCK();
118                 
119                 rc = lnet_send(LNET_NID_ANY, msg);
120
121                 LNET_LOCK();
122
123                 if (rc == 0)
124                         return;
125         }
126
127         lnet_return_credits_locked(msg);
128
129         LASSERT (msg->msg_onactivelist);
130         msg->msg_onactivelist = 0;
131         list_del (&msg->msg_activelist);
132         the_lnet.ln_counters.msgs_alloc--;
133         lnet_msg_free(msg);
134 }
135
136
137 void
138 lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
139 {
140 #ifdef __KERNEL__
141         int                i;
142         int                my_slot;
143 #endif
144         lnet_libmd_t      *md;
145
146         LASSERT (!in_interrupt ());
147
148         if (msg == NULL)
149                 return;
150 #if 0
151         CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
152                lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
153                msg->msg_target_is_router ? "t" : "",
154                msg->msg_routing ? "X" : "",
155                msg->msg_ack ? "A" : "",
156                msg->msg_sending ? "S" : "",
157                msg->msg_receiving ? "R" : "",
158                msg->msg_delayed ? "d" : "",
159                msg->msg_txcredit ? "C" : "",
160                msg->msg_peertxcredit ? "c" : "",
161                msg->msg_rtrcredit ? "F" : "",
162                msg->msg_peerrtrcredit ? "f" : "",
163                msg->msg_onactivelist ? "!" : "",
164                msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
165                msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
166 #endif
167         LNET_LOCK();
168
169         LASSERT (msg->msg_onactivelist);
170
171         msg->msg_ev.status = status;
172
173         md = msg->msg_md;
174         if (md != NULL) {
175                 int      unlink;
176
177                 /* Now it's safe to drop my caller's ref */
178                 md->md_refcount--;
179                 LASSERT (md->md_refcount >= 0);
180
181                 unlink = lnet_md_unlinkable(md);
182
183                 msg->msg_ev.unlinked = unlink;
184
185                 if (md->md_eq != NULL)
186                         lnet_enq_event_locked(md->md_eq, &msg->msg_ev);
187
188                 if (unlink)
189                         lnet_md_unlink(md);
190
191                 msg->msg_md = NULL;
192         }
193
194         list_add_tail (&msg->msg_list, &the_lnet.ln_finalizeq);
195
196         /* Recursion breaker.  Don't complete the message here if I am (or
197          * enough other threads are) already completing messages */
198
199 #ifdef __KERNEL__
200         my_slot = -1;
201         for (i = 0; i < the_lnet.ln_nfinalizers; i++) {
202                 if (the_lnet.ln_finalizers[i] == cfs_current())
203                         goto out;
204                 if (my_slot < 0 && the_lnet.ln_finalizers[i] == NULL)
205                         my_slot = i;
206         }
207         if (my_slot < 0)
208                 goto out;
209
210         the_lnet.ln_finalizers[my_slot] = cfs_current();
211 #else
212         if (the_lnet.ln_finalizing)
213                 goto out;
214
215         the_lnet.ln_finalizing = 1;
216 #endif
217
218         while (!list_empty(&the_lnet.ln_finalizeq)) {
219                 msg = list_entry(the_lnet.ln_finalizeq.next,
220                                  lnet_msg_t, msg_list);
221                 
222                 list_del(&msg->msg_list);
223
224                 /* NB drops and regains the lnet lock if it actually does
225                  * anything, so my finalizing friends can chomp along too */
226                 lnet_complete_msg_locked(msg);
227         }
228
229 #ifdef __KERNEL__
230         the_lnet.ln_finalizers[my_slot] = NULL;
231 #else
232         the_lnet.ln_finalizing = 0;
233 #endif
234
235  out:
236         LNET_UNLOCK();
237 }
238