/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */
#define DEBUG_SUBSYSTEM S_RPC

#ifdef __KERNEL__
#include <linux/module.h>
#else
#include <liblustre.h>
#endif
#include <linux/obd_class.h>
#include <linux/lustre_net.h>
33 struct ptlrpc_ni ptlrpc_interfaces[NAL_MAX_NR];
34 int ptlrpc_ninterfaces;
37 * Client's outgoing request callback
39 void request_out_callback(ptl_event_t *ev)
41 struct ptlrpc_cb_id *cbid = ev->mem_desc.user_ptr;
42 struct ptlrpc_request *req = cbid->cbid_arg;
46 LASSERT (ev->type == PTL_EVENT_SENT ||
47 ev->type == PTL_EVENT_UNLINK);
48 LASSERT (ev->unlinked);
50 DEBUG_REQ((ev->status == PTL_OK) ? D_NET : D_ERROR, req,
51 "type %d, status %d", ev->type, ev->status);
53 if (ev->type == PTL_EVENT_UNLINK ||
54 ev->status != PTL_OK) {
56 /* Failed send: make it seem like the reply timed out, just
57 * like failing sends in client.c does currently... */
59 spin_lock_irqsave(&req->rq_lock, flags);
61 spin_unlock_irqrestore(&req->rq_lock, flags);
63 ptlrpc_wake_client_req(req);
66 /* this balances the atomic_inc in ptl_send_rpc() */
67 ptlrpc_req_finished(req);
72 * Client's incoming reply callback
74 void reply_in_callback(ptl_event_t *ev)
76 struct ptlrpc_cb_id *cbid = ev->mem_desc.user_ptr;
77 struct ptlrpc_request *req = cbid->cbid_arg;
81 LASSERT (ev->type == PTL_EVENT_PUT ||
82 ev->type == PTL_EVENT_UNLINK);
83 LASSERT (ev->unlinked);
84 LASSERT (ev->mem_desc.start == req->rq_repmsg);
85 LASSERT (ev->offset == 0);
86 LASSERT (ev->mlength <= req->rq_replen);
88 DEBUG_REQ((ev->status == PTL_OK) ? D_NET : D_ERROR, req,
89 "type %d, status %d", ev->type, ev->status);
91 spin_lock_irqsave (&req->rq_lock, flags);
93 LASSERT (req->rq_receiving_reply);
94 req->rq_receiving_reply = 0;
96 if (ev->type == PTL_EVENT_PUT &&
97 ev->status == PTL_OK) {
99 req->rq_nob_received = ev->mlength;
102 /* NB don't unlock till after wakeup; req can disappear under us
103 * since we don't have our own ref */
104 ptlrpc_wake_client_req(req);
106 spin_unlock_irqrestore (&req->rq_lock, flags);
111 * Client's bulk has been written/read
113 void client_bulk_callback (ptl_event_t *ev)
115 struct ptlrpc_cb_id *cbid = ev->mem_desc.user_ptr;
116 struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
120 LASSERT ((desc->bd_type == BULK_PUT_SINK &&
121 ev->type == PTL_EVENT_PUT) ||
122 (desc->bd_type == BULK_GET_SOURCE &&
123 ev->type == PTL_EVENT_GET) ||
124 ev->type == PTL_EVENT_UNLINK);
125 LASSERT (ev->unlinked);
127 CDEBUG((ev->status == PTL_OK) ? D_NET : D_ERROR,
128 "event type %d, status %d, desc %p\n",
129 ev->type, ev->status, desc);
131 spin_lock_irqsave (&desc->bd_lock, flags);
133 LASSERT(desc->bd_network_rw);
134 desc->bd_network_rw = 0;
136 if (ev->type != PTL_EVENT_UNLINK &&
137 ev->status == PTL_OK) {
138 desc->bd_success = 1;
139 desc->bd_nob_transferred = ev->mlength;
142 /* NB don't unlock till after wakeup; desc can disappear under us
144 ptlrpc_wake_client_req(desc->bd_req);
146 spin_unlock_irqrestore (&desc->bd_lock, flags);
151 * Server's incoming request callback
153 void request_in_callback(ptl_event_t *ev)
155 struct ptlrpc_cb_id *cbid = ev->mem_desc.user_ptr;
156 struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;
157 struct ptlrpc_srv_ni *srv_ni = rqbd->rqbd_srv_ni;
158 struct ptlrpc_service *service = srv_ni->sni_service;
159 struct ptlrpc_request *req;
163 LASSERT (ev->type == PTL_EVENT_PUT ||
164 ev->type == PTL_EVENT_UNLINK);
165 LASSERT ((char *)ev->mem_desc.start >= rqbd->rqbd_buffer);
166 LASSERT ((char *)ev->mem_desc.start + ev->offset + ev->mlength <=
167 rqbd->rqbd_buffer + service->srv_buf_size);
169 CDEBUG((ev->status == PTL_OK) ? D_NET : D_ERROR,
170 "event type %d, status %d, service %s\n",
171 ev->type, ev->status, service->srv_name);
174 /* If this is the last request message to fit in the
175 * request buffer we can use the request object embedded in
176 * rqbd. Note that if we failed to allocate a request,
177 * we'd have to re-post the rqbd, which we can't do in this
179 req = &rqbd->rqbd_req;
180 memset(req, 0, sizeof (*req));
182 LASSERT (ev->type == PTL_EVENT_PUT);
183 if (ev->status != PTL_OK) {
184 /* We moaned above already... */
187 OBD_ALLOC_GFP(req, sizeof(*req), GFP_ATOMIC);
189 CERROR("Can't allocate incoming request descriptor: "
190 "Dropping %s RPC from "LPX64"\n",
191 service->srv_name, ev->initiator.nid);
196 /* NB we ABSOLUTELY RELY on req being zeroed, so pointers are NULL,
197 * flags are reset and scalars are zero. We only set the message
198 * size to non-zero if this was a successful receive. */
199 req->rq_xid = ev->match_bits;
200 req->rq_reqmsg = ev->mem_desc.start + ev->offset;
201 if (ev->type == PTL_EVENT_PUT &&
202 ev->status == PTL_OK)
203 req->rq_reqlen = ev->mlength;
204 req->rq_arrival_time = ev->arrival_time;
205 req->rq_peer.peer_nid = ev->initiator.nid;
206 req->rq_peer.peer_ni = rqbd->rqbd_srv_ni->sni_ni;
209 spin_lock_irqsave (&service->srv_lock, flags);
212 srv_ni->sni_nrqbd_receiving--;
213 if (ev->type != PTL_EVENT_UNLINK &&
214 srv_ni->sni_nrqbd_receiving == 0) {
215 /* This service is off-air on this interface because
216 * all its request buffers are busy. Portals will
217 * start dropping incoming requests until more buffers
218 * get posted. NB don't moan if it's because we're
219 * tearing down the service. */
220 CWARN("All %s %s request buffers busy\n",
221 service->srv_name, srv_ni->sni_ni->pni_name);
223 /* req takes over the network's ref on rqbd */
225 /* req takes a ref on rqbd */
226 rqbd->rqbd_refcount++;
229 list_add_tail(&req->rq_list, &service->srv_request_queue);
230 service->srv_n_queued_reqs++;
231 rqbd->rqbd_eventcount++;
233 /* NB everything can disappear under us once the request
234 * has been queued and we unlock, so do the wake now... */
235 wake_up(&service->srv_waitq);
237 spin_unlock_irqrestore(&service->srv_lock, flags);
242 * Server's outgoing reply callback
244 void reply_out_callback(ptl_event_t *ev)
246 struct ptlrpc_cb_id *cbid = ev->mem_desc.user_ptr;
247 struct ptlrpc_reply_state *rs = cbid->cbid_arg;
248 struct ptlrpc_srv_ni *sni = rs->rs_srv_ni;
249 struct ptlrpc_service *svc = sni->sni_service;
253 LASSERT (ev->type == PTL_EVENT_SENT ||
254 ev->type == PTL_EVENT_ACK ||
255 ev->type == PTL_EVENT_UNLINK);
257 if (!rs->rs_difficult) {
258 /* I'm totally responsible for freeing "easy" replies */
259 LASSERT (ev->unlinked);
260 lustre_free_reply_state (rs);
261 atomic_dec (&svc->srv_outstanding_replies);
266 LASSERT (rs->rs_on_net);
269 /* Last network callback */
270 spin_lock_irqsave (&svc->srv_lock, flags);
272 ptlrpc_schedule_difficult_reply (rs);
273 spin_unlock_irqrestore (&svc->srv_lock, flags);
280 * Server's bulk completion callback
282 void server_bulk_callback (ptl_event_t *ev)
284 struct ptlrpc_cb_id *cbid = ev->mem_desc.user_ptr;
285 struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
289 LASSERT (ev->type == PTL_EVENT_SENT ||
290 ev->type == PTL_EVENT_UNLINK ||
291 (desc->bd_type == BULK_PUT_SOURCE &&
292 ev->type == PTL_EVENT_ACK) ||
293 (desc->bd_type == BULK_GET_SINK &&
294 ev->type == PTL_EVENT_REPLY));
296 CDEBUG((ev->status == PTL_OK) ? D_NET : D_ERROR,
297 "event type %d, status %d, desc %p\n",
298 ev->type, ev->status, desc);
300 spin_lock_irqsave (&desc->bd_lock, flags);
302 if ((ev->type == PTL_EVENT_ACK ||
303 ev->type == PTL_EVENT_REPLY) &&
304 ev->status == PTL_OK) {
305 /* We heard back from the peer, so even if we get this
306 * before the SENT event (oh yes we can), we know we
307 * read/wrote the peer buffer and how much... */
308 desc->bd_success = 1;
309 desc->bd_nob_transferred = ev->mlength;
313 /* This is the last callback no matter what... */
314 desc->bd_network_rw = 0;
315 wake_up(&desc->bd_waitq);
318 spin_unlock_irqrestore (&desc->bd_lock, flags);
322 static int ptlrpc_master_callback(ptl_event_t *ev)
324 struct ptlrpc_cb_id *cbid = ev->mem_desc.user_ptr;
325 void (*callback)(ptl_event_t *ev) = cbid->cbid_fn;
327 /* Honestly, it's best to find out early. */
328 LASSERT (cbid->cbid_arg != (void *)0x5a5a5a5a5a5a5a5a);
329 LASSERT (callback == request_out_callback ||
330 callback == reply_in_callback ||
331 callback == client_bulk_callback ||
332 callback == request_in_callback ||
333 callback == reply_out_callback ||
334 callback == server_bulk_callback);
340 int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, struct ptlrpc_peer *peer)
342 struct ptlrpc_ni *pni;
343 struct lustre_peer lpeer;
345 int rc = lustre_uuid_to_peer (uuid->uuid, &lpeer);
350 for (i = 0; i < ptlrpc_ninterfaces; i++) {
351 pni = &ptlrpc_interfaces[i];
353 if (!memcmp(&lpeer.peer_ni, &pni->pni_ni_h,
354 sizeof (lpeer.peer_ni))) {
355 peer->peer_nid = lpeer.peer_nid;
361 CERROR("Can't find ptlrpc interface for "LPX64" ni handle %08lx."LPX64"\n",
362 lpeer.peer_nid, lpeer.peer_ni.nal_idx, lpeer.peer_ni.cookie);
366 void ptlrpc_ni_fini(struct ptlrpc_ni *pni)
368 wait_queue_head_t waitq;
369 struct l_wait_info lwi;
373 /* Wait for the event queue to become idle since there may still be
374 * messages in flight with pending events (i.e. the fire-and-forget
375 * messages == client requests and "non-difficult" server
378 for (retries = 0;; retries++) {
379 rc = PtlEQFree(pni->pni_eq_h);
385 kportal_put_ni (pni->pni_number);
390 CWARN("Event queue for %s still busy\n",
394 init_waitqueue_head(&waitq);
395 lwi = LWI_TIMEOUT(2*HZ, NULL, NULL);
396 l_wait_event(waitq, 0, &lwi);
403 int ptlrpc_ni_init(int number, char *name, struct ptlrpc_ni *pni)
406 ptl_handle_ni_t *nip = kportal_get_ni (number);
409 CDEBUG (D_NET, "Network interface %s not loaded\n", name);
413 CDEBUG (D_NET, "init %d %s: nal_idx %ld\n", number, name, nip->nal_idx);
415 pni->pni_name = name;
416 pni->pni_number = number;
417 pni->pni_ni_h = *nip;
419 pni->pni_eq_h = PTL_HANDLE_NONE;
422 /* kernel: portals calls the callback when the event is added to the
423 * queue, so we don't care if we lose events */
424 rc = PtlEQAlloc(pni->pni_ni_h, 1024, ptlrpc_master_callback,
427 /* liblustre: no asynchronous callback and allocate a nice big event
428 * queue so we don't drop any events... */
429 rc = PtlEQAlloc(pni->pni_ni_h, 10240, NULL, &pni->pni_eq_h);
432 GOTO (fail, rc = -ENOMEM);
436 CERROR ("Failed to initialise network interface %s: %d\n",
439 /* OK to do complete teardown since we invalidated the handles above */
440 ptlrpc_ni_fini (pni);
/* List of callbacks polled by liblustre_wait_event(), and the opaque
 * handle for the services callback registered by ptlrpc_init_portals(). */
LIST_HEAD(liblustre_wait_callbacks);
void *liblustre_services_callback;
449 liblustre_register_wait_callback (int (*fn)(void *arg), void *arg)
451 struct liblustre_wait_callback *llwc;
453 OBD_ALLOC(llwc, sizeof(*llwc));
454 LASSERT (llwc != NULL);
457 llwc->llwc_arg = arg;
458 list_add_tail(&llwc->llwc_list, &liblustre_wait_callbacks);
464 liblustre_deregister_wait_callback (void *opaque)
466 struct liblustre_wait_callback *llwc = opaque;
468 list_del(&llwc->llwc_list);
469 OBD_FREE(llwc, sizeof(*llwc));
473 liblustre_check_events (int timeout)
480 rc = PtlEQWait_timeout(ptlrpc_interfaces[0].pni_eq_h, &ev, timeout);
482 rc = PtlEQGet (ptlrpc_interfaces[0].pni_eq_h, &ev);
484 if (rc == PTL_EQ_EMPTY)
487 LASSERT (rc == PTL_EQ_DROPPED || rc == PTL_OK);
490 /* liblustre: no asynch callback so we can't affort to miss any
492 if (rc == PTL_EQ_DROPPED) {
493 CERROR ("Dropped an event!!!\n");
497 ptlrpc_master_callback (&ev);
503 liblustre_wait_event (int timeout)
505 struct list_head *tmp;
506 struct liblustre_wait_callback *llwc;
507 int found_something = 0;
509 /* First check for any new events */
510 if (liblustre_check_events(0))
513 /* Now give all registered callbacks a bite at the cherry */
514 list_for_each(tmp, &liblustre_wait_callbacks) {
515 llwc = list_entry(tmp, struct liblustre_wait_callback,
518 if (llwc->llwc_fn(llwc->llwc_arg))
522 /* return to caller if something happened */
526 /* block for an event, returning immediately on timeout */
527 if (!liblustre_check_events(timeout))
530 /* an event occurred; let all registered callbacks progress... */
531 list_for_each(tmp, &liblustre_wait_callbacks) {
532 llwc = list_entry(tmp, struct liblustre_wait_callback,
535 if (llwc->llwc_fn(llwc->llwc_arg))
539 /* ...and tell caller something happened */
544 int ptlrpc_init_portals(void)
546 /* Add new portals network interfaces here.
547 * Order is irrelevent! */
553 {SOCKNAL, "socknal"},
557 {SCIMACNAL, "scimacnal"}};
561 LASSERT(ptlrpc_ninterfaces == 0);
563 for (i = 0; i < sizeof (ptl_nis) / sizeof (ptl_nis[0]); i++) {
564 LASSERT(ptlrpc_ninterfaces < (sizeof(ptlrpc_interfaces) /
565 sizeof(ptlrpc_interfaces[0])));
567 rc = ptlrpc_ni_init(ptl_nis[i].number, ptl_nis[i].name,
568 &ptlrpc_interfaces[ptlrpc_ninterfaces]);
570 ptlrpc_ninterfaces++;
573 if (ptlrpc_ninterfaces == 0) {
574 CERROR("network initialisation failed: is a NAL module "
579 liblustre_services_callback =
580 liblustre_register_wait_callback(&liblustre_check_services, NULL);
585 void ptlrpc_exit_portals(void)
588 liblustre_deregister_wait_callback(liblustre_services_callback);
590 while (ptlrpc_ninterfaces > 0)
591 ptlrpc_ni_fini (&ptlrpc_interfaces[--ptlrpc_ninterfaces]);