/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *
 * This file is part of Lustre, http://www.lustre.org.
 *
 * Lustre is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Lustre is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Lustre; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#define DEBUG_SUBSYSTEM S_RPC

#ifdef __KERNEL__
#include <linux/module.h>
#else
#include <liblustre.h>
#endif
#include <linux/obd_class.h>
#include <linux/lustre_net.h>
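
/* One ptlrpc_ni per Portals network interface brought up at startup;
 * ptlrpc_ninterfaces counts how many are live. */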
struct ptlrpc_ni  ptlrpc_interfaces[NAL_MAX_NR];
int               ptlrpc_ninterfaces;

/*
 * Free the packet when it has gone out
 */
static int request_out_callback(ptl_event_t *ev)
{
        struct ptlrpc_request *req = ev->mem_desc.user_ptr;

        /* requests always contiguous */
        LASSERT((ev->mem_desc.options & (PTL_MD_IOV | PTL_MD_KIOV)) == 0);

        if (ev->type != PTL_EVENT_SENT) {
                // XXX make sure we understand all events, including ACK's
                CERROR("Unknown event %d\n", ev->type);
                LBUG();
        }

        /* this balances the atomic_inc in ptl_send_rpc() */
        ptlrpc_req_finished(req);
        RETURN(1);
}

/*
 * Free the packet when it has gone out
 */
static int reply_out_callback(ptl_event_t *ev)
{
        struct ptlrpc_request *req = ev->mem_desc.user_ptr;
        unsigned long          flags;

        /* replies always contiguous */
        LASSERT((ev->mem_desc.options & (PTL_MD_IOV | PTL_MD_KIOV)) == 0);

        if (ev->type == PTL_EVENT_SENT) {
                /* NB don't even know if this is the current reply! In fact
                 * we can't touch any state in the request, since the
                 * service handler zeros it on each incoming request. */
                OBD_FREE(ev->mem_desc.start, ev->mem_desc.length);
        } else if (ev->type == PTL_EVENT_ACK) {
                LASSERT(req->rq_want_ack);
                spin_lock_irqsave(&req->rq_lock, flags);
                req->rq_want_ack = 0;
                wake_up(&req->rq_reply_waitq);
                spin_unlock_irqrestore(&req->rq_lock, flags);
        } else {
                // XXX make sure we understand all events
                CERROR("Unknown event %d\n", ev->type);
                LBUG();
        }

        RETURN(1);
}

/*
 * Wake up the thread waiting for the reply once it comes in.
 */
int reply_in_callback(ptl_event_t *ev)
{
        struct ptlrpc_request *req = ev->mem_desc.user_ptr;
        unsigned long          flags;

        /* replies always contiguous */
        LASSERT((ev->mem_desc.options & (PTL_MD_IOV | PTL_MD_KIOV)) == 0);

        if (req->rq_xid == 0x5a5a5a5a5a5a5a5aULL) {
                CERROR("Reply received for freed request!  Probably a missing "
                       "ptlrpc_set_wait\n");
                LBUG();
        }

        if (req->rq_xid != ev->match_bits) {
                CERROR("Reply packet for wrong request\n");
                LBUG();
        }

        if (ev->type == PTL_EVENT_PUT) {
                /* Bug 1190: should handle non-zero offset as a protocol
                 * error */
                LASSERT (ev->offset == 0);

                spin_lock_irqsave (&req->rq_lock, flags);
                LASSERT (req->rq_receiving_reply);
                req->rq_receiving_reply = 0;
                req->rq_replied = 1;
                if (req->rq_set != NULL)
                        wake_up(&req->rq_set->set_waitq);
                else
                        wake_up(&req->rq_reply_waitq);
                spin_unlock_irqrestore (&req->rq_lock, flags);
        } else {
                // XXX make sure we understand all events, including ACKs
                CERROR("Unknown event %d\n", ev->type);
                LBUG();
        }

        RETURN(1);
}

int request_in_callback(ptl_event_t *ev)
{
        struct ptlrpc_request_buffer_desc *rqbd = ev->mem_desc.user_ptr;
        struct ptlrpc_srv_ni  *srv_ni = rqbd->rqbd_srv_ni;
        struct ptlrpc_service *service = srv_ni->sni_service;

        /* requests always contiguous */
        LASSERT((ev->mem_desc.options & (PTL_MD_IOV | PTL_MD_KIOV)) == 0);
        /* we only enable puts */
        LASSERT(ev->type == PTL_EVENT_PUT);
        LASSERT(atomic_read(&srv_ni->sni_nrqbds_receiving) > 0);
        LASSERT(atomic_read(&rqbd->rqbd_refcount) > 0);

        if (ev->rlength != ev->mlength)
                CERROR("Warning: Possibly truncated rpc (%d/%d)\n",
                       ev->mlength, ev->rlength);

        if (!PtlHandleEqual (ev->unlinked_me, PTL_HANDLE_NONE)) {
                /* This is the last request to be received into this
                 * request buffer.  We don't bump the refcount, since the
                 * thread servicing this event is effectively taking over
                 * portals' reference.
                 */
                /* NB ev->unlinked_me.nal_idx is not set properly in a callback */
                LASSERT(ev->unlinked_me.cookie == rqbd->rqbd_me_h.cookie);

                /* we're off the air */
                /* we'll probably start dropping packets in portals soon */
                if (atomic_dec_and_test(&srv_ni->sni_nrqbds_receiving))
                        CERROR("All request buffers busy\n");
        } else {
                /* +1 ref for service thread */
                atomic_inc(&rqbd->rqbd_refcount);
        }

        wake_up(&service->srv_waitq);

        return 0;
}

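/*
 * Source-side completion of a bulk PUT: SENT and ACK events both land
 * here, and the transfer is finished once both callbacks have run.
 */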
static int bulk_put_source_callback(ptl_event_t *ev)
{
        struct ptlrpc_bulk_desc *desc = ev->mem_desc.user_ptr;
        unsigned long            flags;

        CDEBUG(D_NET, "got %s event %d\n",
               (ev->type == PTL_EVENT_SENT) ? "SENT" :
               (ev->type == PTL_EVENT_ACK) ? "ACK" : "UNEXPECTED", ev->type);

        LASSERT(ev->type == PTL_EVENT_SENT || ev->type == PTL_EVENT_ACK);

        /* 1 fragment for each page always */
        LASSERT(ev->mem_desc.niov == desc->bd_page_count);

        spin_lock_irqsave (&desc->bd_lock, flags);

        LASSERT(desc->bd_callback_count > 0 &&
                desc->bd_callback_count <= 2);

        if (--desc->bd_callback_count == 0) {
                desc->bd_network_rw = 0;
                desc->bd_complete = 1;
                wake_up(&desc->bd_waitq);
        }

        spin_unlock_irqrestore (&desc->bd_lock, flags);
        RETURN(0);
}

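/* Stash the last event/descriptor pair that failed the sanity checks in
 * bulk_put_sink_callback() below, for post-mortem inspection. */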
struct ptlrpc_bulk_desc ptlrpc_bad_desc;
ptl_event_t             ptlrpc_bad_event;

static int bulk_put_sink_callback(ptl_event_t *ev)
{
        struct ptlrpc_bulk_desc *desc = ev->mem_desc.user_ptr;
        unsigned long            flags;

        LASSERT(ev->type == PTL_EVENT_PUT);

        /* used iovs */
        LASSERT((ev->mem_desc.options & (PTL_MD_IOV | PTL_MD_KIOV)) ==
                PTL_MD_KIOV);
        /* Honestly, it's best to find out early. */
        if (desc->bd_page_count == 0x5a5a5a5a ||
            desc->bd_page_count != ev->mem_desc.niov ||
            ev->mem_desc.start != &desc->bd_iov) {
                /* not guaranteed (don't LASSERT) but good for this bug hunt */
                ptlrpc_bad_event = *ev;
                ptlrpc_bad_desc = *desc;
                CERROR ("XXX ev %p type %d portal %d match "LPX64", seq %ld\n",
                        ev, ev->type, ev->portal, ev->match_bits, ev->sequence);
                CERROR ("XXX desc %p, export %p import %p gen %d portal %d\n",
                        desc, desc->bd_export,
                        desc->bd_import, desc->bd_import_generation,
                        desc->bd_portal);
                LBUG();
        }

        LASSERT(desc->bd_page_count != 0x5a5a5a5a);
        /* 1 fragment for each page always */
        LASSERT(ev->mem_desc.niov == desc->bd_page_count);
        LASSERT(ev->match_bits == desc->bd_req->rq_xid);

        /* peer must put with zero offset */
        if (ev->offset != 0) {
                /* Bug 1190: handle this as a protocol failure */
                CERROR ("Bad offset %d\n", ev->offset);
                LBUG ();
        }

        /* No check for total # bytes; this could be a short read */

        spin_lock_irqsave (&desc->bd_lock, flags);
        desc->bd_network_rw = 0;
        desc->bd_complete = 1;
        if (desc->bd_req->rq_set != NULL)
                wake_up (&desc->bd_req->rq_set->set_waitq);
        else
                wake_up (&desc->bd_req->rq_reply_waitq);
        spin_unlock_irqrestore (&desc->bd_lock, flags);

        RETURN(1);
}

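/*
 * Source-side completion of a bulk GET: the peer has fetched our pages
 * (PTL_EVENT_GET); sanity-check the transfer and wake whoever is waiting
 * on the request.
 */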
static int bulk_get_source_callback(ptl_event_t *ev)
{
        struct ptlrpc_bulk_desc *desc = ev->mem_desc.user_ptr;
        struct ptlrpc_bulk_page *bulk;
        struct list_head        *tmp;
        unsigned long            flags;
        ptl_size_t               total = 0;

        LASSERT(ev->type == PTL_EVENT_GET);

        /* used iovs */
        LASSERT((ev->mem_desc.options & (PTL_MD_IOV | PTL_MD_KIOV)) ==
                PTL_MD_KIOV);
        /* 1 fragment for each page always */
        LASSERT(ev->mem_desc.niov == desc->bd_page_count);
        LASSERT(ev->match_bits == desc->bd_req->rq_xid);

        /* peer must get with zero offset */
        if (ev->offset != 0) {
                /* Bug 1190: handle this as a protocol failure */
                CERROR ("Bad offset %d\n", ev->offset);
                LBUG ();
        }

        list_for_each (tmp, &desc->bd_page_list) {
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);

                total += bulk->bp_buflen;
        }

        /* peer must get everything */
        if (ev->mem_desc.length != total) {
                /* Bug 1190: handle this as a protocol failure */
                CERROR ("Bad length/total %d/%d\n", ev->mem_desc.length, total);
                LBUG ();
        }

        spin_lock_irqsave (&desc->bd_lock, flags);
        desc->bd_network_rw = 0;
        desc->bd_complete = 1;
        if (desc->bd_req->rq_set != NULL)
                wake_up (&desc->bd_req->rq_set->set_waitq);
        else
                wake_up (&desc->bd_req->rq_reply_waitq);
        spin_unlock_irqrestore (&desc->bd_lock, flags);

        RETURN(1);
}

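/*
 * Sink-side completion of a bulk GET: SENT and REPLY events both land
 * here, and the transfer is finished once both callbacks have run.
 */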
static int bulk_get_sink_callback(ptl_event_t *ev)
{
        struct ptlrpc_bulk_desc *desc = ev->mem_desc.user_ptr;
        unsigned long            flags;

        CDEBUG(D_NET, "got %s event %d desc %p\n",
               (ev->type == PTL_EVENT_SENT) ? "SENT" :
               (ev->type == PTL_EVENT_REPLY) ? "REPLY" : "UNEXPECTED",
               ev->type, desc);

        LASSERT(ev->type == PTL_EVENT_SENT || ev->type == PTL_EVENT_REPLY);

        /* 1 fragment for each page always */
        LASSERT(ev->mem_desc.niov == desc->bd_page_count);

        spin_lock_irqsave (&desc->bd_lock, flags);
        LASSERT(desc->bd_callback_count > 0 &&
                desc->bd_callback_count <= 2);

        if (--desc->bd_callback_count == 0) {
                desc->bd_network_rw = 0;
                desc->bd_complete = 1;
                wake_up(&desc->bd_waitq);
        }

        spin_unlock_irqrestore (&desc->bd_lock, flags);
        RETURN(0);
}

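/*
 * Map a peer UUID onto its NID and the local ptlrpc interface it is
 * reached through.
 */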
int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, struct ptlrpc_peer *peer)
{
        struct ptlrpc_ni   *pni;
        struct lustre_peer  lpeer;
        int                 i;
        int                 rc = lustre_uuid_to_peer (uuid->uuid, &lpeer);

        if (rc != 0)
                RETURN (rc);

        for (i = 0; i < ptlrpc_ninterfaces; i++) {
                pni = &ptlrpc_interfaces[i];

                if (!memcmp(&lpeer.peer_ni, &pni->pni_ni_h,
                            sizeof (lpeer.peer_ni))) {
                        peer->peer_nid = lpeer.peer_nid;
                        peer->peer_ni = pni;
                        RETURN (0);
                }
        }

        CERROR("Can't find ptlrpc interface for "LPX64" ni handle %08lx."LPX64"\n",
               lpeer.peer_nid, lpeer.peer_ni.nal_idx, lpeer.peer_ni.cookie);
        RETURN (-ENOENT);
}

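/*
 * Tear down one interface: free its event queues and drop the reference
 * on the underlying portals NI.
 */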
void ptlrpc_ni_fini(struct ptlrpc_ni *pni)
{
        PtlEQFree(pni->pni_request_out_eq_h);
        PtlEQFree(pni->pni_reply_out_eq_h);
        PtlEQFree(pni->pni_reply_in_eq_h);
        PtlEQFree(pni->pni_bulk_put_source_eq_h);
        PtlEQFree(pni->pni_bulk_put_sink_eq_h);
        PtlEQFree(pni->pni_bulk_get_source_eq_h);
        PtlEQFree(pni->pni_bulk_get_sink_eq_h);

        kportal_put_ni (pni->pni_number);
}

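/*
 * Bring up one interface: look up the portals NI and allocate one
 * callback-only event queue per message type.
 */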
int ptlrpc_ni_init(int number, char *name, struct ptlrpc_ni *pni)
{
        int              rc;
        ptl_handle_ni_t *nip = kportal_get_ni (number);

        if (nip == NULL) {
                CDEBUG (D_NET, "Network interface %s not loaded\n", name);
                return (-ENOENT);
        }

        CDEBUG (D_NET, "init %d %s: nal_idx %ld\n", number, name, nip->nal_idx);

        pni->pni_name = name;
        pni->pni_number = number;
        pni->pni_ni_h = *nip;

        pni->pni_request_out_eq_h = PTL_HANDLE_NONE;
        pni->pni_reply_out_eq_h = PTL_HANDLE_NONE;
        pni->pni_reply_in_eq_h = PTL_HANDLE_NONE;
        pni->pni_bulk_put_source_eq_h = PTL_HANDLE_NONE;
        pni->pni_bulk_put_sink_eq_h = PTL_HANDLE_NONE;
        pni->pni_bulk_get_source_eq_h = PTL_HANDLE_NONE;
        pni->pni_bulk_get_sink_eq_h = PTL_HANDLE_NONE;

        /* NB We never actually PtlEQGet() out of these event queues since
         * we're only interested in the event callback, so we can just let
         * them wrap.  Their sizes aren't a big deal, apart from providing
         * a little history for debugging... */

        rc = PtlEQAlloc(pni->pni_ni_h, 1024, request_out_callback,
                        &pni->pni_request_out_eq_h);
        if (rc != PTL_OK)
                GOTO (fail, rc = -ENOMEM);

        rc = PtlEQAlloc(pni->pni_ni_h, 1024, reply_out_callback,
                        &pni->pni_reply_out_eq_h);
        if (rc != PTL_OK)
                GOTO (fail, rc = -ENOMEM);

        rc = PtlEQAlloc(pni->pni_ni_h, 1024, reply_in_callback,
                        &pni->pni_reply_in_eq_h);
        if (rc != PTL_OK)
                GOTO (fail, rc = -ENOMEM);

        rc = PtlEQAlloc(pni->pni_ni_h, 1024, bulk_put_source_callback,
                        &pni->pni_bulk_put_source_eq_h);
        if (rc != PTL_OK)
                GOTO (fail, rc = -ENOMEM);

        rc = PtlEQAlloc(pni->pni_ni_h, 1024, bulk_put_sink_callback,
                        &pni->pni_bulk_put_sink_eq_h);
        if (rc != PTL_OK)
                GOTO (fail, rc = -ENOMEM);

        rc = PtlEQAlloc(pni->pni_ni_h, 1024, bulk_get_source_callback,
                        &pni->pni_bulk_get_source_eq_h);
        if (rc != PTL_OK)
                GOTO (fail, rc = -ENOMEM);

        rc = PtlEQAlloc(pni->pni_ni_h, 1024, bulk_get_sink_callback,
                        &pni->pni_bulk_get_sink_eq_h);
        if (rc != PTL_OK)
                GOTO (fail, rc = -ENOMEM);

        RETURN (0);
 fail:
        CERROR ("Failed to initialise network interface %s: %d\n",
                name, rc);

        /* OK to do complete teardown since we invalidated the handles above */
        ptlrpc_ni_fini (pni);
        return (rc);
}

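/*
 * liblustre (userspace) polls for network events and pending service work
 * explicitly, rather than being driven by event-queue callbacks.
 */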
#ifndef __KERNEL__
int liblustre_check_events (int block)
{
        ptl_event_t ev;
        int         rc;

        if (block) {
                /* XXX to accelerate recovery tests XXX */
                rc = PtlEQWait_timeout(ptlrpc_interfaces[0].pni_eq_h, &ev, block);
        } else {
                rc = PtlEQGet (ptlrpc_interfaces[0].pni_eq_h, &ev);
        }
        if (rc == PTL_EQ_EMPTY)
                RETURN(0);

        LASSERT (rc == PTL_EQ_DROPPED || rc == PTL_OK);

#if PORTALS_DOES_NOT_SUPPORT_CALLBACKS
        if (rc == PTL_EQ_DROPPED)
                CERROR ("Dropped an event!!!\n");

        ptlrpc_master_callback (&ev);
#endif
        RETURN(1);
}

int liblustre_wait_event(struct l_wait_info *lwi)
{
        /* non-blocking checks (actually we might block in a service for
         * bulk but we won't block in a blocked service)
         */
        if (liblustre_check_events(0) ||
            liblustre_check_services()) {
                /* the condition the caller is waiting for may now hold */
                RETURN(1);
        }

        /* block for an event */
        liblustre_check_events(lwi->lwi_timeout);

        /* check it's not for some service */
        liblustre_check_services ();

        RETURN(1);
}
#endif /* !__KERNEL__ */

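/*
 * Bring up every NAL we know about.  Failing to find a particular NAL is
 * not fatal as long as at least one interface initialises.
 */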
int ptlrpc_init_portals(void)
{
        /* Add new portals network interfaces here.
         * Order is irrelevant! */
        static struct {
                int   number;
                char *name;
        } ptl_nis[] = {
                {SOCKNAL,   "socknal"},
                {SCIMACNAL, "scimacnal"}};
        int   rc;
        int   i;

        LASSERT(ptlrpc_ninterfaces == 0);

        for (i = 0; i < sizeof (ptl_nis) / sizeof (ptl_nis[0]); i++) {
                LASSERT(ptlrpc_ninterfaces < (sizeof(ptlrpc_interfaces) /
                                              sizeof(ptlrpc_interfaces[0])));

                rc = ptlrpc_ni_init(ptl_nis[i].number, ptl_nis[i].name,
                                    &ptlrpc_interfaces[ptlrpc_ninterfaces]);
                if (rc == 0)
                        ptlrpc_ninterfaces++;
        }

        if (ptlrpc_ninterfaces == 0) {
                CERROR("network initialisation failed: is a NAL module "
                       "loaded?\n");
                return -EIO;
        }
        return 0;
}

void ptlrpc_exit_portals(void)
{
        while (ptlrpc_ninterfaces > 0)
                ptlrpc_ni_fini (&ptlrpc_interfaces[--ptlrpc_ninterfaces]);
}