Whamcloud - gitweb
* Applied fix for 1888
[fs/lustre-release.git] / lustre / ptlrpc / events.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24
25 #ifdef __KERNEL__
26 #include <linux/module.h>
27 #else
28 #include <liblustre.h>
29 #endif
30 #include <linux/obd_class.h>
31 #include <linux/lustre_net.h>
32
33 struct ptlrpc_ni  ptlrpc_interfaces[NAL_MAX_NR];
34 int               ptlrpc_ninterfaces;
35
36 /*  
37  *  Client's outgoing request callback
38  */
39 void request_out_callback(ptl_event_t *ev)
40 {
41         struct ptlrpc_cb_id   *cbid = ev->mem_desc.user_ptr;
42         struct ptlrpc_request *req = cbid->cbid_arg;
43         unsigned long          flags;
44         ENTRY;
45
46         LASSERT (ev->type == PTL_EVENT_SENT ||
47                  ev->type == PTL_EVENT_UNLINK);
48         LASSERT (ev->unlinked);
49
50         DEBUG_REQ((ev->status == PTL_OK) ? D_NET : D_ERROR, req,
51                   "type %d, status %d", ev->type, ev->status);
52
53         if (ev->type == PTL_EVENT_UNLINK ||
54             ev->status != PTL_OK) {
55
56                 /* Failed send: make it seem like the reply timed out, just
57                  * like failing sends in client.c does currently...  */
58
59                 spin_lock_irqsave(&req->rq_lock, flags);
60                 req->rq_timeout = 0;
61                 spin_unlock_irqrestore(&req->rq_lock, flags);
62                 
63                 ptlrpc_wake_client_req(req);
64         }
65
66         /* this balances the atomic_inc in ptl_send_rpc() */
67         ptlrpc_req_finished(req);
68         EXIT;
69 }
70
71 /*
72  * Client's incoming reply callback
73  */
74 void reply_in_callback(ptl_event_t *ev)
75 {
76         struct ptlrpc_cb_id   *cbid = ev->mem_desc.user_ptr;
77         struct ptlrpc_request *req = cbid->cbid_arg;
78         unsigned long flags;
79         ENTRY;
80
81         LASSERT (ev->type == PTL_EVENT_PUT ||
82                  ev->type == PTL_EVENT_UNLINK);
83         LASSERT (ev->unlinked);
84         LASSERT (ev->mem_desc.start == req->rq_repmsg);
85         LASSERT (ev->offset == 0);
86         LASSERT (ev->mlength <= req->rq_replen);
87         
88         DEBUG_REQ((ev->status == PTL_OK) ? D_NET : D_ERROR, req,
89                   "type %d, status %d", ev->type, ev->status);
90
91         spin_lock_irqsave (&req->rq_lock, flags);
92
93         LASSERT (req->rq_receiving_reply);
94         req->rq_receiving_reply = 0;
95
96         if (ev->type == PTL_EVENT_PUT &&
97             ev->status == PTL_OK) {
98                 req->rq_replied = 1;
99                 req->rq_nob_received = ev->mlength;
100         }
101
102         /* NB don't unlock till after wakeup; req can disappear under us
103          * since we don't have our own ref */
104         ptlrpc_wake_client_req(req);
105
106         spin_unlock_irqrestore (&req->rq_lock, flags);
107         EXIT;
108 }
109
110 /* 
111  * Client's bulk has been written/read
112  */
113 void client_bulk_callback (ptl_event_t *ev)
114 {
115         struct ptlrpc_cb_id     *cbid = ev->mem_desc.user_ptr;
116         struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
117         unsigned long            flags;
118         ENTRY;
119
120         LASSERT ((desc->bd_type == BULK_PUT_SINK && 
121                   ev->type == PTL_EVENT_PUT) ||
122                  (desc->bd_type == BULK_GET_SOURCE &&
123                   ev->type == PTL_EVENT_GET) ||
124                  ev->type == PTL_EVENT_UNLINK);
125         LASSERT (ev->unlinked);
126
127         CDEBUG((ev->status == PTL_OK) ? D_NET : D_ERROR,
128                "event type %d, status %d, desc %p\n", 
129                ev->type, ev->status, desc);
130
131         spin_lock_irqsave (&desc->bd_lock, flags);
132
133         LASSERT(desc->bd_network_rw);
134         desc->bd_network_rw = 0;
135
136         if (ev->type != PTL_EVENT_UNLINK &&
137             ev->status == PTL_OK) {
138                 desc->bd_success = 1;
139                 desc->bd_nob_transferred = ev->mlength;
140         }
141
142         /* NB don't unlock till after wakeup; desc can disappear under us
143          * otherwise */
144         ptlrpc_wake_client_req(desc->bd_req);
145
146         spin_unlock_irqrestore (&desc->bd_lock, flags);
147         EXIT;
148 }
149
150 /* 
151  * Server's incoming request callback
152  */
153 void request_in_callback(ptl_event_t *ev)
154 {
155         struct ptlrpc_cb_id               *cbid = ev->mem_desc.user_ptr;
156         struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;
157         struct ptlrpc_srv_ni              *srv_ni = rqbd->rqbd_srv_ni;
158         struct ptlrpc_service             *service = srv_ni->sni_service;
159         struct ptlrpc_request             *req;
160         long                               flags;
161         ENTRY;
162
163         LASSERT (ev->type == PTL_EVENT_PUT ||
164                  ev->type == PTL_EVENT_UNLINK);
165         LASSERT ((char *)ev->mem_desc.start >= rqbd->rqbd_buffer);
166         LASSERT ((char *)ev->mem_desc.start + ev->offset + ev->mlength <=
167                  rqbd->rqbd_buffer + service->srv_buf_size);
168
169         CDEBUG((ev->status == PTL_OK) ? D_NET : D_ERROR,
170                "event type %d, status %d, service %s\n", 
171                ev->type, ev->status, service->srv_name);
172
173         if (ev->unlinked) {
174                 /* If this is the last request message to fit in the
175                  * request buffer we can use the request object embedded in
176                  * rqbd.  Note that if we failed to allocate a request,
177                  * we'd have to re-post the rqbd, which we can't do in this
178                  * context. */
179                 req = &rqbd->rqbd_req;
180                 memset(req, 0, sizeof (*req));
181         } else {
182                 LASSERT (ev->type == PTL_EVENT_PUT);
183                 if (ev->status != PTL_OK) {
184                         /* We moaned above already... */
185                         return;
186                 }
187                 OBD_ALLOC_GFP(req, sizeof(*req), GFP_ATOMIC);
188                 if (req == NULL) {
189                         CERROR("Can't allocate incoming request descriptor: "
190                                "Dropping %s RPC from "LPX64"\n",
191                                service->srv_name, ev->initiator.nid);
192                         return;
193                 }
194         }
195
196         /* NB we ABSOLUTELY RELY on req being zeroed, so pointers are NULL,
197          * flags are reset and scalars are zero.  We only set the message
198          * size to non-zero if this was a successful receive. */
199         req->rq_xid = ev->match_bits;
200         req->rq_reqmsg = ev->mem_desc.start + ev->offset;
201         if (ev->type == PTL_EVENT_PUT &&
202             ev->status == PTL_OK)
203                 req->rq_reqlen = ev->mlength;
204         req->rq_arrival_time = ev->arrival_time;
205         req->rq_peer.peer_nid = ev->initiator.nid;
206         req->rq_peer.peer_ni = rqbd->rqbd_srv_ni->sni_ni;
207         req->rq_rqbd = rqbd;
208
209         spin_lock_irqsave (&service->srv_lock, flags);
210
211         if (ev->unlinked) {
212                 srv_ni->sni_nrqbd_receiving--;
213                 if (ev->type != PTL_EVENT_UNLINK &&
214                     srv_ni->sni_nrqbd_receiving == 0) {
215                         /* This service is off-air on this interface because
216                          * all its request buffers are busy.  Portals will
217                          * start dropping incoming requests until more buffers
218                          * get posted.  NB don't moan if it's because we're
219                          * tearing down the service. */
220                         CWARN("All %s %s request buffers busy\n",
221                               service->srv_name, srv_ni->sni_ni->pni_name);
222                 }
223                 /* req takes over the network's ref on rqbd */
224         } else {
225                 /* req takes a ref on rqbd */
226                 rqbd->rqbd_refcount++;
227         }
228
229         list_add_tail(&req->rq_list, &service->srv_request_queue);
230         service->srv_n_queued_reqs++;
231         rqbd->rqbd_eventcount++;
232
233         /* NB everything can disappear under us once the request
234          * has been queued and we unlock, so do the wake now... */
235         wake_up(&service->srv_waitq);
236
237         spin_unlock_irqrestore(&service->srv_lock, flags);
238         EXIT;
239 }
240
241 /*  
242  *  Server's outgoing reply callback
243  */
244 void reply_out_callback(ptl_event_t *ev)
245 {
246         struct ptlrpc_cb_id       *cbid = ev->mem_desc.user_ptr;
247         struct ptlrpc_reply_state *rs = cbid->cbid_arg;
248         struct ptlrpc_srv_ni      *sni = rs->rs_srv_ni;
249         struct ptlrpc_service     *svc = sni->sni_service;
250         unsigned long              flags;
251         ENTRY;
252
253         LASSERT (ev->type == PTL_EVENT_SENT ||
254                  ev->type == PTL_EVENT_ACK ||
255                  ev->type == PTL_EVENT_UNLINK);
256
257         if (!rs->rs_difficult) {
258                 /* I'm totally responsible for freeing "easy" replies */
259                 LASSERT (ev->unlinked);
260                 lustre_free_reply_state (rs);
261                 atomic_dec (&svc->srv_outstanding_replies);
262                 EXIT;
263                 return;
264         }
265
266         LASSERT (rs->rs_on_net);
267
268         if (ev->unlinked) {
269                 /* Last network callback */
270                 spin_lock_irqsave (&svc->srv_lock, flags);
271                 rs->rs_on_net = 0;
272                 ptlrpc_schedule_difficult_reply (rs);
273                 spin_unlock_irqrestore (&svc->srv_lock, flags);
274         }
275
276         EXIT;
277 }
278
279 /*
280  * Server's bulk completion callback
281  */
282 void server_bulk_callback (ptl_event_t *ev)
283 {
284         struct ptlrpc_cb_id     *cbid = ev->mem_desc.user_ptr;
285         struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
286         unsigned long            flags;
287         ENTRY;
288
289         LASSERT (ev->type == PTL_EVENT_SENT ||
290                  ev->type == PTL_EVENT_UNLINK ||
291                  (desc->bd_type == BULK_PUT_SOURCE &&
292                   ev->type == PTL_EVENT_ACK) ||
293                  (desc->bd_type == BULK_GET_SINK &&
294                   ev->type == PTL_EVENT_REPLY));
295
296         CDEBUG((ev->status == PTL_OK) ? D_NET : D_ERROR,
297                "event type %d, status %d, desc %p\n", 
298                ev->type, ev->status, desc);
299
300         spin_lock_irqsave (&desc->bd_lock, flags);
301         
302         if ((ev->type == PTL_EVENT_ACK ||
303              ev->type == PTL_EVENT_REPLY) &&
304             ev->status == PTL_OK) {
305                 /* We heard back from the peer, so even if we get this
306                  * before the SENT event (oh yes we can), we know we
307                  * read/wrote the peer buffer and how much... */
308                 desc->bd_success = 1;
309                 desc->bd_nob_transferred = ev->mlength;
310         }
311
312         if (ev->unlinked) {
313                 /* This is the last callback no matter what... */
314                 desc->bd_network_rw = 0;
315                 wake_up(&desc->bd_waitq);
316         }
317
318         spin_unlock_irqrestore (&desc->bd_lock, flags);
319         EXIT;
320 }
321
322 static int ptlrpc_master_callback(ptl_event_t *ev)
323 {
324         struct ptlrpc_cb_id *cbid = ev->mem_desc.user_ptr;
325         void (*callback)(ptl_event_t *ev) = cbid->cbid_fn;
326
327         /* Honestly, it's best to find out early. */
328         LASSERT (cbid->cbid_arg != (void *)0x5a5a5a5a5a5a5a5a);
329         LASSERT (callback == request_out_callback ||
330                  callback == reply_in_callback ||
331                  callback == client_bulk_callback ||
332                  callback == request_in_callback ||
333                  callback == reply_out_callback ||
334                  callback == server_bulk_callback);
335         
336         callback (ev);
337         return (0);
338 }
339
340 int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, struct ptlrpc_peer *peer)
341 {
342         struct ptlrpc_ni   *pni;
343         struct lustre_peer  lpeer;
344         int                 i;
345         int                 rc = lustre_uuid_to_peer (uuid->uuid, &lpeer);
346
347         if (rc != 0)
348                 RETURN (rc);
349
350         for (i = 0; i < ptlrpc_ninterfaces; i++) {
351                 pni = &ptlrpc_interfaces[i];
352
353                 if (!memcmp(&lpeer.peer_ni, &pni->pni_ni_h,
354                             sizeof (lpeer.peer_ni))) {
355                         peer->peer_nid = lpeer.peer_nid;
356                         peer->peer_ni = pni;
357                         return (0);
358                 }
359         }
360
361         CERROR("Can't find ptlrpc interface for "LPX64" ni handle %08lx."LPX64"\n",
362                lpeer.peer_nid, lpeer.peer_ni.nal_idx, lpeer.peer_ni.cookie);
363         return (-ENOENT);
364 }
365
366 void ptlrpc_ni_fini(struct ptlrpc_ni *pni)
367 {
368         wait_queue_head_t   waitq;
369         struct l_wait_info  lwi;
370         int                 rc;
371         int                 retries;
372         
373         /* Wait for the event queue to become idle since there may still be
374          * messages in flight with pending events (i.e. the fire-and-forget
375          * messages == client requests and "non-difficult" server
376          * replies */
377
378         for (retries = 0;; retries++) {
379                 rc = PtlEQFree(pni->pni_eq_h);
380                 switch (rc) {
381                 default:
382                         LBUG();
383
384                 case PTL_OK:
385                         kportal_put_ni (pni->pni_number);
386                         return;
387                         
388                 case PTL_EQ_INUSE:
389                         if (retries != 0)
390                                 CWARN("Event queue for %s still busy\n",
391                                       pni->pni_name);
392                         
393                         /* Wait for a bit */
394                         init_waitqueue_head(&waitq);
395                         lwi = LWI_TIMEOUT(2*HZ, NULL, NULL);
396                         l_wait_event(waitq, 0, &lwi);
397                         break;
398                 }
399         }
400         /* notreached */
401 }
402
403 int ptlrpc_ni_init(int number, char *name, struct ptlrpc_ni *pni)
404 {
405         int              rc;
406         ptl_handle_ni_t *nip = kportal_get_ni (number);
407
408         if (nip == NULL) {
409                 CDEBUG (D_NET, "Network interface %s not loaded\n", name);
410                 return (-ENOENT);
411         }
412
413         CDEBUG (D_NET, "init %d %s: nal_idx %ld\n", number, name, nip->nal_idx);
414
415         pni->pni_name = name;
416         pni->pni_number = number;
417         pni->pni_ni_h = *nip;
418
419         pni->pni_eq_h = PTL_HANDLE_NONE;
420
421 #ifdef __KERNEL__
422         /* kernel: portals calls the callback when the event is added to the
423          * queue, so we don't care if we lose events */
424         rc = PtlEQAlloc(pni->pni_ni_h, 1024, ptlrpc_master_callback,
425                         &pni->pni_eq_h);
426 #else
427         /* liblustre: no asynchronous callback and allocate a nice big event
428          * queue so we don't drop any events... */
429         rc = PtlEQAlloc(pni->pni_ni_h, 10240, NULL, &pni->pni_eq_h);
430 #endif
431         if (rc != PTL_OK)
432                 GOTO (fail, rc = -ENOMEM);
433
434         return (0);
435  fail:
436         CERROR ("Failed to initialise network interface %s: %d\n",
437                 name, rc);
438
439         /* OK to do complete teardown since we invalidated the handles above */
440         ptlrpc_ni_fini (pni);
441         return (rc);
442 }
443
444 #ifndef __KERNEL__
445 LIST_HEAD(liblustre_wait_callbacks);
446 void *liblustre_services_callback;
447
448 void *
449 liblustre_register_wait_callback (int (*fn)(void *arg), void *arg)
450 {
451         struct liblustre_wait_callback *llwc;
452         
453         OBD_ALLOC(llwc, sizeof(*llwc));
454         LASSERT (llwc != NULL);
455         
456         llwc->llwc_fn = fn;
457         llwc->llwc_arg = arg;
458         list_add_tail(&llwc->llwc_list, &liblustre_wait_callbacks);
459         
460         return (llwc);
461 }
462
463 void
464 liblustre_deregister_wait_callback (void *opaque)
465 {
466         struct liblustre_wait_callback *llwc = opaque;
467         
468         list_del(&llwc->llwc_list);
469         OBD_FREE(llwc, sizeof(*llwc));
470 }
471
472 int
473 liblustre_check_events (int timeout)
474 {
475         ptl_event_t ev;
476         int         rc;
477         ENTRY;
478
479         if (timeout) {
480                 rc = PtlEQWait_timeout(ptlrpc_interfaces[0].pni_eq_h, &ev, timeout);
481         } else {
482                 rc = PtlEQGet (ptlrpc_interfaces[0].pni_eq_h, &ev);
483         }
484         if (rc == PTL_EQ_EMPTY)
485                 RETURN(0);
486         
487         LASSERT (rc == PTL_EQ_DROPPED || rc == PTL_OK);
488         
489 #ifndef __KERNEL__
490         /* liblustre: no asynch callback so we can't affort to miss any
491          * events... */
492         if (rc == PTL_EQ_DROPPED) {
493                 CERROR ("Dropped an event!!!\n");
494                 abort();
495         }
496         
497         ptlrpc_master_callback (&ev);
498 #endif
499         RETURN(1);
500 }
501
502 int
503 liblustre_wait_event (int timeout)
504 {
505         struct list_head               *tmp;
506         struct liblustre_wait_callback *llwc;
507         int                             found_something = 0;
508
509         /* First check for any new events */
510         if (liblustre_check_events(0))
511                 found_something = 1;
512
513         /* Now give all registered callbacks a bite at the cherry */
514         list_for_each(tmp, &liblustre_wait_callbacks) {
515                 llwc = list_entry(tmp, struct liblustre_wait_callback, 
516                                   llwc_list);
517                 
518                 if (llwc->llwc_fn(llwc->llwc_arg))
519                         found_something = 1;
520         }
521
522         /* return to caller if something happened */
523         if (found_something)
524                 return 1;
525         
526         /* block for an event, returning immediately on timeout */
527         if (!liblustre_check_events(timeout))
528                 return 0;
529
530         /* an event occurred; let all registered callbacks progress... */
531         list_for_each(tmp, &liblustre_wait_callbacks) {
532                 llwc = list_entry(tmp, struct liblustre_wait_callback, 
533                                   llwc_list);
534                 
535                 if (llwc->llwc_fn(llwc->llwc_arg))
536                         found_something = 1;
537         }
538
539         /* ...and tell caller something happened */
540         return 1;
541 }
542 #endif
543
544 int ptlrpc_init_portals(void)
545 {
546         /* Add new portals network interfaces here.
547          * Order is irrelevent! */
548         static struct {
549                 int   number;
550                 char *name;
551         } ptl_nis[] = {
552                 {QSWNAL,  "qswnal"},
553                 {SOCKNAL, "socknal"},
554                 {GMNAL,   "gmnal"},
555                 {IBNAL,   "ibnal"},
556                 {TCPNAL,  "tcpnal"},
557                 {SCIMACNAL, "scimacnal"}};
558         int   rc;
559         int   i;
560
561         LASSERT(ptlrpc_ninterfaces == 0);
562
563         for (i = 0; i < sizeof (ptl_nis) / sizeof (ptl_nis[0]); i++) {
564                 LASSERT(ptlrpc_ninterfaces < (sizeof(ptlrpc_interfaces) /
565                                               sizeof(ptlrpc_interfaces[0])));
566
567                 rc = ptlrpc_ni_init(ptl_nis[i].number, ptl_nis[i].name,
568                                     &ptlrpc_interfaces[ptlrpc_ninterfaces]);
569                 if (rc == 0)
570                         ptlrpc_ninterfaces++;
571         }
572
573         if (ptlrpc_ninterfaces == 0) {
574                 CERROR("network initialisation failed: is a NAL module "
575                        "loaded?\n");
576                 return -EIO;
577         }
578 #ifndef __KERNEL__
579         liblustre_services_callback = 
580                 liblustre_register_wait_callback(&liblustre_check_services, NULL);
581 #endif
582         return 0;
583 }
584
585 void ptlrpc_exit_portals(void)
586 {
587 #ifndef __KERNEL__
588         liblustre_deregister_wait_callback(liblustre_services_callback);
589 #endif
590         while (ptlrpc_ninterfaces > 0)
591                 ptlrpc_ni_fini (&ptlrpc_interfaces[--ptlrpc_ninterfaces]);
592 }