1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2002, 2003 Cluster File Systems, Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 #define DEBUG_SUBSYSTEM S_RPC
25 #include <linux/module.h>
26 #include <linux/obd_support.h>
27 #include <linux/lustre_net.h>
/* Portals event queue handles, one per traffic class (request send, reply
 * in/out, and the four bulk put/get source/sink directions).  Allocated in
 * ptlrpc_init_portals() and released in ptlrpc_exit_portals(). */
29 ptl_handle_eq_t request_out_eq, reply_in_eq, reply_out_eq,
30 bulk_put_source_eq, bulk_put_sink_eq,
31 bulk_get_source_eq, bulk_get_sink_eq;
/* Network-interface handles for whichever NAL modules are loaded, obtained
 * via inter_module_get(); a non-NULL pointer marks a reference that must be
 * dropped with inter_module_put() on exit. */
32 static const ptl_handle_ni_t *socknal_nip = NULL, *toenal_nip = NULL,
33 *qswnal_nip = NULL, *gmnal_nip = NULL;
36 * Free the packet when it has gone out
/* Event handler for request_out_eq: once the outgoing request has been
 * SENT, drop the reference that ptl_send_rpc() took on it.  Any other
 * event type is unexpected and only logged.
 * NOTE(review): the listing omits braces/return lines between the visible
 * statements — the full control flow is not verifiable from this view. */
38 static int request_out_callback(ptl_event_t *ev)
/* user_ptr was set to the ptlrpc_request when the MD was attached. */
40 struct ptlrpc_request *req = ev->mem_desc.user_ptr;
43 /* requests always contiguous */
44 LASSERT((ev->mem_desc.options & PTL_MD_IOV) == 0);
46 if (ev->type != PTL_EVENT_SENT) {
47 // XXX make sure we understand all events, including ACK's
48 CERROR("Unknown event %d\n", ev->type);
52 /* this balances the atomic_inc in ptl_send_rpc */
53 ptlrpc_req_finished(req);
59 * Free the packet when it has gone out
/* Event handler for reply_out_eq: frees the reply buffer once the SENT
 * event confirms the reply left the node; unexpected event types are
 * only logged.  (Error/return lines are not visible in this listing.) */
61 static int reply_out_callback(ptl_event_t *ev)
65 /* replies always contiguous */
66 LASSERT((ev->mem_desc.options & PTL_MD_IOV) == 0);
68 if (ev->type == PTL_EVENT_SENT) {
/* The MD start/length describe the reply buffer allocated for this send. */
69 OBD_FREE(ev->mem_desc.start, ev->mem_desc.length);
71 // XXX make sure we understand all events, including ACK's
72 CERROR("Unknown event %d\n", ev->type);
80 * Wake up the thread waiting for the reply once it comes in.
82 static int reply_in_callback(ptl_event_t *ev)
/* user_ptr was set to the waiting ptlrpc_request when the reply MD was
 * posted. */
84 struct ptlrpc_request *req = ev->mem_desc.user_ptr;
87 /* replies always contiguous */
88 LASSERT((ev->mem_desc.options & PTL_MD_IOV) == 0);
/* 0x5a5a... is the freed-memory poison pattern: a reply landing on a
 * request with this xid means the request was already freed (most likely
 * a missing reference somewhere). */
90 if (req->rq_xid == 0x5a5a5a5a5a5a5a5a) {
91 CERROR("Reply received for freed request! Probably a missing "
/* Sanity check: the reply's match bits must equal the request xid. */
96 if (req->rq_xid != ev->match_bits) {
97 CERROR("Reply packet for wrong request\n");
101 if (ev->type == PTL_EVENT_PUT) {
/* Point the request at the incoming reply message and wake the waiter. */
102 req->rq_repmsg = ev->mem_desc.start + ev->offset;
104 wake_up(&req->rq_wait_for_rep);
106 // XXX make sure we understand all events, including ACK's
107 CERROR("Unknown event %d\n", ev->type);
/* Event handler for incoming RPC requests: accounts for request-buffer
 * usage and wakes a service thread to process the new request.  Exported
 * (non-static), presumably registered per-service elsewhere. */
114 int request_in_callback(ptl_event_t *ev)
116 struct ptlrpc_request_buffer_desc *rqbd = ev->mem_desc.user_ptr;
117 struct ptlrpc_service *service = rqbd->rqbd_service;
119 /* requests always contiguous */
120 LASSERT((ev->mem_desc.options & PTL_MD_IOV) == 0);
121 /* we only enable puts */
122 LASSERT(ev->type == PTL_EVENT_PUT);
123 LASSERT(atomic_read(&service->srv_nrqbds_receiving) > 0);
124 LASSERT(atomic_read(&rqbd->rqbd_refcount) > 0);
/* A requested length larger than the matched length means the sender's
 * message was truncated to fit the buffer — warn but continue. */
126 if (ev->rlength != ev->mlength)
127 CERROR("Warning: Possibly truncated rpc (%d/%d)\n",
128 ev->mlength, ev->rlength);
/* A valid unlinked_me handle means portals auto-unlinked the ME: this
 * buffer can take no more requests. */
130 if (ptl_is_valid_handle(&ev->unlinked_me)) {
131 /* This is the last request to be received into this
132 * request buffer. We don't bump the refcount, since the
133 * thread servicing this event is effectively taking over
134 * portals' reference.
136 #warning ev->unlinked_me.nal_idx is not set properly in a callback
137 LASSERT(ev->unlinked_me.handle_idx==rqbd->rqbd_me_h.handle_idx);
139 /* we're off the air */
140 /* we'll probably start dropping packets in portals soon */
141 if (atomic_dec_and_test(&service->srv_nrqbds_receiving))
142 CERROR("All request buffers busy\n");
144 /* +1 ref for service thread */
145 atomic_inc(&rqbd->rqbd_refcount);
/* Wake a service thread to pick up the newly arrived request. */
148 wake_up(&service->srv_waitq);
/* Event handler on the source side of a bulk PUT.  Two events are expected
 * per transfer (SENT and ACK); only when both have arrived does the
 * completion path run: per-page callbacks, PTL_BULK_FL_SENT, and a wakeup.
 * NOTE(review): the list_entry/bp_cb invocation lines and the final call of
 * event_handler are not visible in this listing — see original lines
 * 178-196. */
155 struct ptlrpc_bulk_desc *desc = ev->mem_desc.user_ptr;
156 struct ptlrpc_bulk_page *bulk;
157 struct list_head *tmp;
158 struct list_head *next;
161 CDEBUG(D_NET, "got %s event %d\n",
162 (ev->type == PTL_EVENT_SENT) ? "SENT" :
163 (ev->type == PTL_EVENT_ACK) ? "ACK" : "UNEXPECTED", ev->type);
165 LASSERT(ev->type == PTL_EVENT_SENT || ev->type == PTL_EVENT_ACK);
/* The counter starts at 2 (one for SENT, one for ACK). */
167 LASSERT(atomic_read(&desc->bd_source_callback_count) > 0 &&
168 atomic_read(&desc->bd_source_callback_count) <= 2);
170 /* 1 fragment for each page always */
171 LASSERT(ev->mem_desc.niov == desc->bd_page_count);
/* Run completion only on the second (final) of the two events. */
173 if (atomic_dec_and_test(&desc->bd_source_callback_count)) {
174 void (*event_handler)(struct ptlrpc_bulk_desc *);
176 list_for_each_safe(tmp, next, &desc->bd_page_list) {
177 bulk = list_entry(tmp, struct ptlrpc_bulk_page,
180 if (bulk->bp_cb != NULL)
184 /* We need to make a note of whether there's an event handler
185 * before we call wake_up, because if there is no event handler,
186 * 'desc' might be freed before we're scheduled again. */
187 event_handler = desc->bd_ptl_ev_hdlr;
189 desc->bd_flags |= PTL_BULK_FL_SENT;
190 wake_up(&desc->bd_waitq);
/* Sanity: the handler pointer must not have changed under us. */
192 LASSERT(desc->bd_ptl_ev_hdlr == event_handler);
/* Event handler on the sink side of a bulk PUT: a single PUT event
 * delivers all pages (IOV MD, one fragment per page).  Validates the
 * total length against the per-page buffer lengths, then marks the
 * descriptor received and wakes the waiter.
 * NOTE(review): the bp_cb invocation and the final event_handler call are
 * not visible in this listing (original lines 225-242). */
202 struct ptlrpc_bulk_desc *desc = ev->mem_desc.user_ptr;
203 struct ptlrpc_bulk_page *bulk;
204 struct list_head *tmp;
205 struct list_head *next;
206 ptl_size_t total = 0;
207 void (*event_handler)(struct ptlrpc_bulk_desc *);
210 LASSERT(ev->type == PTL_EVENT_PUT);
212 /* put with zero offset */
213 LASSERT(ev->offset == 0);
/* Sink side uses an IOV MD, unlike the contiguous request/reply MDs. */
215 LASSERT((ev->mem_desc.options & PTL_MD_IOV) != 0);
216 /* 1 fragment for each page always */
217 LASSERT(ev->mem_desc.niov == desc->bd_page_count);
219 list_for_each_safe (tmp, next, &desc->bd_page_list) {
220 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
222 total += bulk->bp_buflen;
224 if (bulk->bp_cb != NULL)
/* The MD length must equal the sum of all page buffer lengths. */
228 LASSERT(ev->mem_desc.length == total);
230 /* We need to make a note of whether there's an event handler
231 * before we call wake_up, because if there is no event
232 * handler, 'desc' might be freed before we're scheduled again. */
233 event_handler = desc->bd_ptl_ev_hdlr;
235 desc->bd_flags |= PTL_BULK_FL_RCVD;
236 wake_up(&desc->bd_waitq);
/* Sanity: the handler pointer must not have changed under us. */
238 LASSERT(desc->bd_ptl_ev_hdlr == event_handler);
/* Event handler on the source side of a bulk GET: the peer pulls the data,
 * so a single GET event signals completion.  Mirrors bulk_put_sink_callback
 * but sets PTL_BULK_FL_SENT (data left this node).
 * NOTE(review): the bp_cb invocation and the final event_handler call are
 * not visible in this listing (original lines 270-288). */
247 struct ptlrpc_bulk_desc *desc = ev->mem_desc.user_ptr;
248 struct ptlrpc_bulk_page *bulk;
249 struct list_head *tmp;
250 struct list_head *next;
251 ptl_size_t total = 0;
252 void (*event_handler)(struct ptlrpc_bulk_desc *);
255 LASSERT(ev->type == PTL_EVENT_GET);
/* NOTE(review): comment says "put" but this is the GET path — copied from
 * the PUT handler; the zero-offset assertion itself still applies. */
257 /* put with zero offset */
258 LASSERT(ev->offset == 0);
260 LASSERT((ev->mem_desc.options & PTL_MD_IOV) != 0);
261 /* 1 fragment for each page always */
262 LASSERT(ev->mem_desc.niov == desc->bd_page_count);
264 list_for_each_safe (tmp, next, &desc->bd_page_list) {
265 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
267 total += bulk->bp_buflen;
269 if (bulk->bp_cb != NULL)
/* The MD length must equal the sum of all page buffer lengths. */
273 LASSERT(ev->mem_desc.length == total);
275 /* We need to make a note of whether there's an event handler
276 * before we call wake_up, because if there is no event
277 * handler, 'desc' might be freed before we're scheduled again. */
278 event_handler = desc->bd_ptl_ev_hdlr;
280 desc->bd_flags |= PTL_BULK_FL_SENT;
281 wake_up(&desc->bd_waitq);
/* Sanity: the handler pointer must not have changed under us. */
283 LASSERT(desc->bd_ptl_ev_hdlr == event_handler);
/* Event handler on the sink side of a bulk GET.  Two events are expected
 * per transfer (SENT for the GET request, REPLY carrying the data); only
 * when both have arrived does the completion path run: per-page callbacks,
 * PTL_BULK_FL_RCVD, and a wakeup.  Mirrors bulk_put_source_callback.
 * NOTE(review): the list_entry/bp_cb invocation lines and the final
 * event_handler call are not visible in this listing (original lines
 * 317-335). */
293 struct ptlrpc_bulk_desc *desc = ev->mem_desc.user_ptr;
294 struct ptlrpc_bulk_page *bulk;
295 struct list_head *tmp;
296 struct list_head *next;
299 CDEBUG(D_NET, "got %s event %d\n",
300 (ev->type == PTL_EVENT_SENT) ? "SENT" :
301 (ev->type == PTL_EVENT_REPLY) ? "REPLY" : "UNEXPECTED",
304 LASSERT(ev->type == PTL_EVENT_SENT || ev->type == PTL_EVENT_REPLY);
/* The counter starts at 2 (one for SENT, one for REPLY); note it reuses
 * the bd_source_callback_count field despite being the sink side. */
306 LASSERT(atomic_read(&desc->bd_source_callback_count) > 0 &&
307 atomic_read(&desc->bd_source_callback_count) <= 2);
309 /* 1 fragment for each page always */
310 LASSERT(ev->mem_desc.niov == desc->bd_page_count);
/* Run completion only on the second (final) of the two events. */
312 if (atomic_dec_and_test(&desc->bd_source_callback_count)) {
313 void (*event_handler)(struct ptlrpc_bulk_desc *);
315 list_for_each_safe(tmp, next, &desc->bd_page_list) {
316 bulk = list_entry(tmp, struct ptlrpc_bulk_page,
319 if (bulk->bp_cb != NULL)
323 /* We need to make a note of whether there's an event handler
324 * before we call wake_up, because if there is no event handler,
325 * 'desc' might be freed before we're scheduled again. */
326 event_handler = desc->bd_ptl_ev_hdlr;
328 desc->bd_flags |= PTL_BULK_FL_RCVD;
329 wake_up(&desc->bd_waitq);
/* Sanity: the handler pointer must not have changed under us. */
331 LASSERT(desc->bd_ptl_ev_hdlr == event_handler);
/* Module init for the portals layer: picks the first available NAL network
 * interface (preference order: qswnal, gmnal, socknal, toenal) and
 * allocates one 1024-entry event queue per callback above.
 * NOTE(review): the `ni` assignment lines, the `rc != PTL_OK` checks after
 * each PtlEQAlloc, and the return statements are not visible in this
 * listing (the omitted original lines). */
339 int ptlrpc_init_portals(void)
344 /* Use the qswnal if it's there */
345 if ((qswnal_nip = inter_module_get("kqswnal_ni")) != NULL)
347 else if ((gmnal_nip = inter_module_get("kgmnal_ni")) != NULL)
349 else if ((socknal_nip = inter_module_get("ksocknal_ni")) != NULL)
351 else if ((toenal_nip = inter_module_get("ktoenal_ni")) != NULL)
/* No NAL module registered any network interface — cannot proceed. */
354 CERROR("get_ni failed: is a NAL module loaded?\n");
358 rc = PtlEQAlloc(ni, 1024, request_out_callback, &request_out_eq);
360 CERROR("PtlEQAlloc failed: %d\n", rc);
362 rc = PtlEQAlloc(ni, 1024, reply_out_callback, &reply_out_eq);
364 CERROR("PtlEQAlloc failed: %d\n", rc);
366 rc = PtlEQAlloc(ni, 1024, reply_in_callback, &reply_in_eq);
368 CERROR("PtlEQAlloc failed: %d\n", rc);
370 rc = PtlEQAlloc(ni, 1024, bulk_put_source_callback,
371 &bulk_put_source_eq);
373 CERROR("PtlEQAlloc failed: %d\n", rc);
375 rc = PtlEQAlloc(ni, 1024, bulk_put_sink_callback, &bulk_put_sink_eq);
377 CERROR("PtlEQAlloc failed: %d\n", rc);
379 rc = PtlEQAlloc(ni, 1024, bulk_get_source_callback,
380 &bulk_get_source_eq);
382 CERROR("PtlEQAlloc failed: %d\n", rc);
384 rc = PtlEQAlloc(ni, 1024, bulk_get_sink_callback, &bulk_get_sink_eq);
386 CERROR("PtlEQAlloc failed: %d\n", rc);
/* Module teardown: free all seven event queues allocated in
 * ptlrpc_init_portals(), then drop the inter-module reference on
 * whichever NAL module was acquired (non-NULL *_nip). */
391 void ptlrpc_exit_portals(void)
393 PtlEQFree(request_out_eq);
394 PtlEQFree(reply_out_eq);
395 PtlEQFree(reply_in_eq);
396 PtlEQFree(bulk_put_source_eq);
397 PtlEQFree(bulk_put_sink_eq);
398 PtlEQFree(bulk_get_source_eq);
399 PtlEQFree(bulk_get_sink_eq);
401 if (qswnal_nip != NULL)
402 inter_module_put("kqswnal_ni");
403 if (socknal_nip != NULL)
404 inter_module_put("ksocknal_ni");
405 if (gmnal_nip != NULL)
406 inter_module_put("kgmnal_ni");
407 if (toenal_nip != NULL)
408 inter_module_put("ktoenal_ni");