lustre/ptlrpc/events.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 *
 */

#define DEBUG_SUBSYSTEM S_RPC

#ifndef __KERNEL__
# include <liblustre.h>
#else
# ifdef __mips64__
#  include <linux/kernel.h>
# endif
#endif
#include <obd_class.h>
#include <lustre_net.h>
#include <lustre_sec.h>
#include "ptlrpc_internal.h"

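/* The single event queue through which every ptlrpc network event is
 * delivered.  It is allocated in ptlrpc_ni_init() below: in the kernel,
 * LNet invokes ptlrpc_master_callback() as events are added; in
 * liblustre the queue is polled from liblustre_check_events(). */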
lnet_handle_eq_t   ptlrpc_eq_h;

/*
 *  Client's outgoing request callback
 */
void request_out_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
        struct ptlrpc_request *req = cbid->cbid_arg;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->unlinked);

        DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req,
                  "type %d, status %d", ev->type, ev->status);

        sptlrpc_request_out_callback(req);

        if (ev->type == LNET_EVENT_UNLINK || ev->status != 0) {

                /* Failed send: make it seem like the reply timed out, just
                 * like failing sends in client.c does currently...  */

                spin_lock(&req->rq_lock);
                req->rq_net_err = 1;
                spin_unlock(&req->rq_lock);

                ptlrpc_wake_client_req(req);
        }

        /* these balance the references in ptl_send_rpc() */
        atomic_dec(&req->rq_import->imp_inflight);
        ptlrpc_req_finished(req);

        EXIT;
}
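/* Like all the handlers in this file, the callback above is invoked from
 * LNet's event-delivery path, so the failure case only flags rq_net_err
 * and wakes the waiter; the client state machine in client.c does the
 * real error handling later. */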

/*
 * Client's incoming reply callback
 */
void reply_in_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
        struct ptlrpc_request *req = cbid->cbid_arg;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_PUT ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->unlinked);
        LASSERT (ev->md.start == req->rq_repbuf);
        LASSERT (ev->offset == 0);
        LASSERT (ev->mlength <= req->rq_repbuf_len);

        DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req,
                  "type %d, status %d", ev->type, ev->status);

        spin_lock(&req->rq_lock);

        LASSERT (req->rq_receiving_reply);
        req->rq_receiving_reply = 0;

        if (ev->type == LNET_EVENT_PUT && ev->status == 0) {
                req->rq_replied = 1;
                req->rq_nob_received = ev->mlength;
        }

        /* NB don't unlock till after wakeup; req can disappear under us
         * since we don't have our own ref */
        ptlrpc_wake_client_req(req);

        spin_unlock(&req->rq_lock);
        EXIT;
}

/*
 * Client's bulk has been written/read
 */
void client_bulk_callback (lnet_event_t *ev)
{
        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
        ENTRY;

        LASSERT ((desc->bd_type == BULK_PUT_SINK &&
                  ev->type == LNET_EVENT_PUT) ||
                 (desc->bd_type == BULK_GET_SOURCE &&
                  ev->type == LNET_EVENT_GET) ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->unlinked);

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, desc %p\n",
               ev->type, ev->status, desc);

        spin_lock(&desc->bd_lock);

        LASSERT(desc->bd_network_rw);
        desc->bd_network_rw = 0;

        if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) {
                desc->bd_success = 1;
                desc->bd_nob_transferred = ev->mlength;
                desc->bd_sender = ev->sender;
        }

        sptlrpc_enc_pool_put_pages(desc);

        /* NB don't unlock till after wakeup; desc can disappear under us
         * otherwise */
        ptlrpc_wake_client_req(desc->bd_req);

        spin_unlock(&desc->bd_lock);
        EXIT;
}
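/* Note that sptlrpc_enc_pool_put_pages() above runs on both the success
 * and failure paths; only bd_success tells the waiter which one it was. */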

/*
 * Server's incoming request callback
 */
void request_in_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id               *cbid = ev->md.user_ptr;
        struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;
        struct ptlrpc_service             *service = rqbd->rqbd_service;
        struct ptlrpc_request             *req;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_PUT ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT ((char *)ev->md.start >= rqbd->rqbd_buffer);
        LASSERT ((char *)ev->md.start + ev->offset + ev->mlength <=
                 rqbd->rqbd_buffer + service->srv_buf_size);

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, service %s\n",
               ev->type, ev->status, service->srv_name);

        if (ev->unlinked) {
                /* If this is the last request message to fit in the
                 * request buffer we can use the request object embedded in
                 * rqbd.  Note that if we failed to allocate a request,
                 * we'd have to re-post the rqbd, which we can't do in this
                 * context. */
                req = &rqbd->rqbd_req;
                memset(req, 0, sizeof (*req));
        } else {
                LASSERT (ev->type == LNET_EVENT_PUT);
                if (ev->status != 0) {
                        /* We moaned above already... */
                        return;
                }
                OBD_ALLOC_GFP(req, sizeof(*req), CFS_ALLOC_ATOMIC_TRY);
                if (req == NULL) {
                        CERROR("Can't allocate incoming request descriptor: "
                               "Dropping %s RPC from %s\n",
                               service->srv_name,
                               libcfs_id2str(ev->initiator));
                        return;
                }
        }

        /* NB we ABSOLUTELY RELY on req being zeroed, so pointers are NULL,
         * flags are reset and scalars are zero.  We only set the message
         * size to non-zero if this was a successful receive. */
        req->rq_xid = ev->match_bits;
        req->rq_reqbuf = ev->md.start + ev->offset;
        if (ev->type == LNET_EVENT_PUT && ev->status == 0)
                req->rq_reqdata_len = ev->mlength;
        do_gettimeofday(&req->rq_arrival_time);
        req->rq_peer = ev->initiator;
        req->rq_self = ev->target.nid;
        req->rq_rqbd = rqbd;
        req->rq_phase = RQ_PHASE_NEW;
#ifdef CRAY_XT3
        req->rq_uid = ev->uid;
#endif

        CDEBUG(D_RPCTRACE, "peer: %s\n", libcfs_id2str(req->rq_peer));

        spin_lock(&service->srv_lock);

        req->rq_history_seq = service->srv_request_seq++;
        list_add_tail(&req->rq_history_list, &service->srv_request_history);

        if (ev->unlinked) {
                service->srv_nrqbd_receiving--;
                CDEBUG(D_INFO, "Buffer complete: %d buffers still posted\n",
                       service->srv_nrqbd_receiving);

                /* Normally, don't complain about 0 buffers posted; LNET won't
                 * drop incoming reqs since we set the portal lazy */
                if (test_req_buffer_pressure &&
                    ev->type != LNET_EVENT_UNLINK &&
                    service->srv_nrqbd_receiving == 0)
                        CWARN("All %s request buffers busy\n",
                              service->srv_name);

                /* req takes over the network's ref on rqbd */
        } else {
                /* req takes a ref on rqbd */
                rqbd->rqbd_refcount++;
        }

        list_add_tail(&req->rq_list, &service->srv_request_queue);
        service->srv_n_queued_reqs++;

        /* NB everything can disappear under us once the request
         * has been queued and we unlock, so do the wake now... */
        cfs_waitq_signal(&service->srv_waitq);

        spin_unlock(&service->srv_lock);
        EXIT;
}
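/* Request buffer lifecycle: while the rqbd still has room, each incoming
 * request just takes a reference on it; when LNet unlinks the buffer
 * (it is full), the request embedded in the rqbd carries the network's
 * final reference, and the buffer is presumably reposted by the service
 * threads once that last request has been handled. */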

/*
 *  Server's outgoing reply callback
 */
void reply_out_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id       *cbid = ev->md.user_ptr;
        struct ptlrpc_reply_state *rs = cbid->cbid_arg;
        struct ptlrpc_service     *svc = rs->rs_service;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_ACK ||
                 ev->type == LNET_EVENT_UNLINK);

        if (!rs->rs_difficult) {
                /* 'Easy' replies have no further processing so I drop the
                 * net's ref on 'rs' */
                LASSERT (ev->unlinked);
                ptlrpc_rs_decref(rs);
                atomic_dec (&svc->srv_outstanding_replies);
                EXIT;
                return;
        }

        LASSERT (rs->rs_on_net);

        if (ev->unlinked) {
                /* Last network callback.  The net's ref on 'rs' stays put
                 * until ptlrpc_server_handle_reply() is done with it */
                spin_lock(&svc->srv_lock);
                rs->rs_on_net = 0;
                ptlrpc_schedule_difficult_reply (rs);
                spin_unlock(&svc->srv_lock);
        }

        EXIT;
}
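/* "Easy" replies are fire-and-forget: the net's reference is dropped on
 * the first (and only) event.  "Difficult" replies stay around until
 * ptlrpc_server_handle_reply() is finished with them, presumably so they
 * can be resent during recovery. */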

/*
 * Server's bulk completion callback
 */
void server_bulk_callback (lnet_event_t *ev)
{
        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_UNLINK ||
                 (desc->bd_type == BULK_PUT_SOURCE &&
                  ev->type == LNET_EVENT_ACK) ||
                 (desc->bd_type == BULK_GET_SINK &&
                  ev->type == LNET_EVENT_REPLY));

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, desc %p\n",
               ev->type, ev->status, desc);

        spin_lock(&desc->bd_lock);

        if ((ev->type == LNET_EVENT_ACK ||
             ev->type == LNET_EVENT_REPLY) &&
            ev->status == 0) {
                /* We heard back from the peer, so even if we get this
                 * before the SENT event (oh yes we can), we know we
                 * read/wrote the peer buffer and how much... */
                desc->bd_success = 1;
                desc->bd_nob_transferred = ev->mlength;
                desc->bd_sender = ev->sender;
        }

        if (ev->unlinked) {
                /* This is the last callback no matter what... */
                desc->bd_network_rw = 0;
                cfs_waitq_signal(&desc->bd_waitq);
        }

        spin_unlock(&desc->bd_lock);
        EXIT;
}

static void ptlrpc_master_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
        void (*callback)(lnet_event_t *ev) = cbid->cbid_fn;

        /* Honestly, it's best to find out early. */
        LASSERT (cbid->cbid_arg != LP_POISON);
        LASSERT (callback == request_out_callback ||
                 callback == reply_in_callback ||
                 callback == client_bulk_callback ||
                 callback == request_in_callback ||
                 callback == reply_out_callback ||
                 callback == server_bulk_callback);

        callback (ev);
}
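/* LP_POISON is a poison pattern (presumably what freed memory is filled
 * with in debug builds), so the first LASSERT above should catch an
 * event whose cbid owner has already been freed. */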

int ptlrpc_uuid_to_peer (struct obd_uuid *uuid,
                         lnet_process_id_t *peer, lnet_nid_t *self)
{
        int               best_dist = 0;
        __u32             best_order = 0;
        int               count = 0;
        int               rc = -ENOENT;
        int               portals_compatibility;
        int               dist;
        __u32             order;
        lnet_nid_t        dst_nid;
        lnet_nid_t        src_nid;

        portals_compatibility = LNetCtl(IOC_LIBCFS_PORTALS_COMPATIBILITY, NULL);

        peer->pid = LUSTRE_SRV_LNET_PID;

        /* Choose the matching UUID that's closest */
        while (lustre_uuid_to_peer(uuid->uuid, &dst_nid, count++) == 0) {
                dist = LNetDist(dst_nid, &src_nid, &order);
                if (dist < 0)
                        continue;

                if (dist == 0) {                /* local! use loopback LND */
                        peer->nid = *self = LNET_MKNID(LNET_MKNET(LOLND, 0), 0);
                        rc = 0;
                        break;
                }

                if (rc < 0 ||
                    dist < best_dist ||
                    (dist == best_dist && order < best_order)) {
                        best_dist = dist;
                        best_order = order;

                        if (portals_compatibility > 1) {
                                /* Strong portals compatibility: Zero the nid's
                                 * NET, so if I'm reading new config logs, or
                                 * getting configured by (new) lconf I can
                                 * still talk to old servers. */
                                dst_nid = LNET_MKNID(0, LNET_NIDADDR(dst_nid));
                                src_nid = LNET_MKNID(0, LNET_NIDADDR(src_nid));
                        }
                        peer->nid = dst_nid;
                        *self = src_nid;
                        rc = 0;
                }
        }

        CDEBUG(D_NET,"%s->%s\n", uuid->uuid, libcfs_id2str(*peer));
        if (rc != 0)
                CERROR("No NID found for %s\n", uuid->uuid);
        return rc;
}
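/* LNetDist() appears to return the distance (hop count) to dst_nid, with
 * 0 meaning the node itself, and to set "order" to the interface's
 * position in the local configuration; ties on distance are therefore
 * broken in favour of earlier-configured interfaces. */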

void ptlrpc_ni_fini(void)
{
        cfs_waitq_t         waitq;
        struct l_wait_info  lwi;
        int                 rc;
        int                 retries;

        /* Wait for the event queue to become idle since there may still be
         * messages in flight with pending events (i.e. the fire-and-forget
         * messages == client requests and "non-difficult" server
         * replies) */

        for (retries = 0;; retries++) {
                rc = LNetEQFree(ptlrpc_eq_h);
                switch (rc) {
                default:
                        LBUG();

                case 0:
                        LNetNIFini();
                        return;

                case -EBUSY:
                        if (retries != 0)
                                CWARN("Event queue still busy\n");

                        /* Wait for a bit */
                        cfs_waitq_init(&waitq);
                        lwi = LWI_TIMEOUT(cfs_time_seconds(2), NULL, NULL);
                        l_wait_event(waitq, 0, &lwi);
                        break;
                }
        }
        /* notreached */
}

lnet_pid_t ptl_get_pid(void)
{
        lnet_pid_t        pid;

#ifndef  __KERNEL__
        pid = getpid();
#else
        pid = LUSTRE_SRV_LNET_PID;
#endif
        return pid;
}
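/* In userspace each liblustre process uses its real pid; the kernel
 * always uses LUSTRE_SRV_LNET_PID, presumably so that peers can reach
 * servers at a well-known pid. */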

int ptlrpc_ni_init(void)
{
        int              rc;
        lnet_pid_t       pid;

        pid = ptl_get_pid();
        CDEBUG(D_NET, "My pid is: %x\n", pid);

        /* We're not passing any limits yet... */
        rc = LNetNIInit(pid);
        if (rc < 0) {
                CDEBUG (D_NET, "Can't init network interface: %d\n", rc);
                return (-ENOENT);
        }

        /* CAVEAT EMPTOR: how we process portals events is _radically_
         * different depending on... */
#ifdef __KERNEL__
        /* kernel portals calls our master callback when events are added to
         * the event queue.  In fact lustre never pulls events off this queue,
         * so it's only sized for some debug history. */
        rc = LNetEQAlloc(1024, ptlrpc_master_callback, &ptlrpc_eq_h);
#else
        /* liblustre calls the master callback when it removes events from the
         * event queue.  The event queue has to be big enough not to drop
         * anything */
        rc = LNetEQAlloc(10240, LNET_EQ_HANDLER_NONE, &ptlrpc_eq_h);
#endif
        if (rc == 0)
                return 0;

        CERROR ("Failed to allocate event queue: %d\n", rc);
        LNetNIFini();

        return (-ENOMEM);
}
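/* The asymmetry in the EQ sizes above matters: the kernel queue is
 * callback-driven, so its 1024 slots are only debug history, while
 * liblustre polls the queue and aborts on overflow (see
 * liblustre_check_events() below), so its 10240 slots must be large
 * enough to never drop an event. */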

#ifndef __KERNEL__
CFS_LIST_HEAD(liblustre_wait_callbacks);
CFS_LIST_HEAD(liblustre_idle_callbacks);
void *liblustre_services_callback;

void *
liblustre_register_waitidle_callback (struct list_head *callback_list,
                                      const char *name,
                                      int (*fn)(void *arg), void *arg)
{
        struct liblustre_wait_callback *llwc;

        OBD_ALLOC(llwc, sizeof(*llwc));
        LASSERT (llwc != NULL);

        llwc->llwc_name = name;
        llwc->llwc_fn = fn;
        llwc->llwc_arg = arg;
        list_add_tail(&llwc->llwc_list, callback_list);

        return (llwc);
}

void
liblustre_deregister_waitidle_callback (void *opaque)
{
        struct liblustre_wait_callback *llwc = opaque;

        list_del(&llwc->llwc_list);
        OBD_FREE(llwc, sizeof(*llwc));
}

void *
liblustre_register_wait_callback (const char *name,
                                  int (*fn)(void *arg), void *arg)
{
        return liblustre_register_waitidle_callback(&liblustre_wait_callbacks,
                                                    name, fn, arg);
}

void
liblustre_deregister_wait_callback (void *opaque)
{
        liblustre_deregister_waitidle_callback(opaque);
}

void *
liblustre_register_idle_callback (const char *name,
                                  int (*fn)(void *arg), void *arg)
{
        return liblustre_register_waitidle_callback(&liblustre_idle_callbacks,
                                                    name, fn, arg);
}

void
liblustre_deregister_idle_callback (void *opaque)
{
        liblustre_deregister_waitidle_callback(opaque);
}

int
liblustre_check_events (int timeout)
{
        lnet_event_t ev;
        int         rc;
        int         i;
        ENTRY;

        rc = LNetEQPoll(&ptlrpc_eq_h, 1, timeout * 1000, &ev, &i);
        if (rc == 0)
                RETURN(0);

        LASSERT (rc == -EOVERFLOW || rc == 1);

        /* liblustre: no async callback, so we can't afford to miss any
         * events... */
        if (rc == -EOVERFLOW) {
                CERROR ("Dropped an event!!!\n");
                abort();
        }

        ptlrpc_master_callback (&ev);
        RETURN(1);
}

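/* Nonzero while liblustre_wait_event() is running; liblustre is
 * single-threaded, so this doubles as a recursion check and as a flag
 * other code can test. */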
int liblustre_waiting = 0;

int
liblustre_wait_event (int timeout)
{
        struct list_head               *tmp;
        struct liblustre_wait_callback *llwc;
        int                             found_something = 0;

        /* single threaded recursion check... */
        liblustre_waiting = 1;

        for (;;) {
                /* Deal with all pending events */
                while (liblustre_check_events(0))
                        found_something = 1;

                /* Give all registered callbacks a bite at the cherry */
                list_for_each(tmp, &liblustre_wait_callbacks) {
                        llwc = list_entry(tmp, struct liblustre_wait_callback,
                                          llwc_list);

                        if (llwc->llwc_fn(llwc->llwc_arg))
                                found_something = 1;
                }

                if (found_something || timeout == 0)
                        break;

                /* Nothing so far, but I'm allowed to block... */
                found_something = liblustre_check_events(timeout);
                if (!found_something)           /* still nothing */
                        break;                  /* I timed out */
        }

        liblustre_waiting = 0;

        return found_something;
}
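/* With timeout == 0 the loop above is a single non-blocking sweep of the
 * event queue and the registered callbacks; with a nonzero timeout it
 * blocks at most once in LNetEQPoll() and then gives the callbacks one
 * more pass before returning. */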

void
liblustre_wait_idle(void)
{
        static int recursed = 0;

        struct list_head               *tmp;
        struct liblustre_wait_callback *llwc;
        int                             idle = 0;

        LASSERT(!recursed);
        recursed = 1;

        do {
                liblustre_wait_event(0);

                idle = 1;

                list_for_each(tmp, &liblustre_idle_callbacks) {
                        llwc = list_entry(tmp, struct liblustre_wait_callback,
                                          llwc_list);

                        if (!llwc->llwc_fn(llwc->llwc_arg)) {
                                idle = 0;
                                break;
                        }
                }

        } while (!idle);

        recursed = 0;
}

#endif /* __KERNEL__ */

int ptlrpc_init_portals(void)
{
        int   rc = ptlrpc_ni_init();

        if (rc != 0) {
                CERROR("network initialisation failed\n");
                return -EIO;
        }
#ifndef __KERNEL__
        liblustre_services_callback =
                liblustre_register_wait_callback("liblustre_check_services",
                                                 &liblustre_check_services,
                                                 NULL);
#endif
        rc = ptlrpcd_addref();
        if (rc == 0)
                return 0;

        CERROR("rpcd initialisation failed\n");
#ifndef __KERNEL__
        liblustre_deregister_wait_callback(liblustre_services_callback);
#endif
        ptlrpc_ni_fini();
        return rc;
}

void ptlrpc_exit_portals(void)
{
#ifndef __KERNEL__
        liblustre_deregister_wait_callback(liblustre_services_callback);
#endif
        ptlrpcd_decref();
        ptlrpc_ni_fini();
}