lustre/ptlrpc/events.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 *
 */

#define DEBUG_SUBSYSTEM S_RPC

#ifndef __KERNEL__
# include <liblustre.h>
#else
# ifdef __mips64__
#  include <linux/kernel.h>
# endif
#endif
#include <obd_class.h>
#include <lustre_net.h>
#include <lustre_sec.h>
#include "ptlrpc_internal.h"

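/*
 * Handle for the single event queue used by all ptlrpc LNet traffic.
 * It is allocated in ptlrpc_ni_init() below with ptlrpc_master_callback()
 * as its handler, so every network event in this file arrives through
 * that one dispatch point.
 */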
lnet_handle_eq_t   ptlrpc_eq_h;

/*
 *  Client's outgoing request callback
 */
void request_out_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
        struct ptlrpc_request *req = cbid->cbid_arg;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->unlinked);

        DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req,
                  "type %d, status %d", ev->type, ev->status);

        sptlrpc_request_out_callback(req);

        if (ev->type == LNET_EVENT_UNLINK || ev->status != 0) {

                /* Failed send: make it seem like the reply timed out, just
                 * like failing sends in client.c does currently...  */

                spin_lock(&req->rq_lock);
                req->rq_net_err = 1;
                spin_unlock(&req->rq_lock);

                ptlrpc_wake_client_req(req);
        }

        ptlrpc_req_finished(req);

        EXIT;
}

/*
 * Client's incoming reply callback
 */
void reply_in_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
        struct ptlrpc_request *req = cbid->cbid_arg;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_PUT ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->unlinked);
        LASSERT (ev->md.start == req->rq_repbuf);
        LASSERT (ev->offset == 0);
        LASSERT (ev->mlength <= req->rq_repbuf_len);

        DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req,
                  "type %d, status %d", ev->type, ev->status);

        spin_lock(&req->rq_lock);

        LASSERT (req->rq_receiving_reply);
        req->rq_receiving_reply = 0;

        if (ev->type == LNET_EVENT_PUT && ev->status == 0) {
                req->rq_replied = 1;
                req->rq_nob_received = ev->mlength;
        }

        /* NB don't unlock till after wakeup; req can disappear under us
         * since we don't have our own ref */
        ptlrpc_wake_client_req(req);

        spin_unlock(&req->rq_lock);
        EXIT;
}

/*
 * Client's bulk has been written/read
 */
void client_bulk_callback (lnet_event_t *ev)
{
        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
        ENTRY;

        LASSERT ((desc->bd_type == BULK_PUT_SINK &&
                  ev->type == LNET_EVENT_PUT) ||
                 (desc->bd_type == BULK_GET_SOURCE &&
                  ev->type == LNET_EVENT_GET) ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->unlinked);

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, desc %p\n",
               ev->type, ev->status, desc);

        spin_lock(&desc->bd_lock);

        LASSERT(desc->bd_network_rw);
        desc->bd_network_rw = 0;

        if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) {
                desc->bd_success = 1;
                desc->bd_nob_transferred = ev->mlength;
                desc->bd_sender = ev->sender;
        }

        sptlrpc_enc_pool_put_pages(desc);

        /* NB don't unlock till after wakeup; desc can disappear under us
         * otherwise */
        ptlrpc_wake_client_req(desc->bd_req);

        spin_unlock(&desc->bd_lock);
        EXIT;
}

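/*
 * NB: a single request buffer (rqbd) can receive several incoming RPCs
 * at increasing offsets; ev->unlinked is set once the buffer is finished
 * with (the last message that fits, or an explicit unlink), at which
 * point the request below takes over the network's reference on the
 * rqbd.
 */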
/*
 * Server's incoming request callback
 */
void request_in_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id               *cbid = ev->md.user_ptr;
        struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;
        struct ptlrpc_service             *service = rqbd->rqbd_service;
        struct ptlrpc_request             *req;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_PUT ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT ((char *)ev->md.start >= rqbd->rqbd_buffer);
        LASSERT ((char *)ev->md.start + ev->offset + ev->mlength <=
                 rqbd->rqbd_buffer + service->srv_buf_size);

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, service %s\n",
               ev->type, ev->status, service->srv_name);

        if (ev->unlinked) {
                /* If this is the last request message to fit in the
                 * request buffer we can use the request object embedded in
                 * rqbd.  Note that if we failed to allocate a request,
                 * we'd have to re-post the rqbd, which we can't do in this
                 * context. */
                req = &rqbd->rqbd_req;
                memset(req, 0, sizeof (*req));
        } else {
                LASSERT (ev->type == LNET_EVENT_PUT);
                if (ev->status != 0) {
                        /* We moaned above already... */
                        return;
                }
                OBD_ALLOC_GFP(req, sizeof(*req), CFS_ALLOC_ATOMIC_TRY);
                if (req == NULL) {
                        CERROR("Can't allocate incoming request descriptor: "
                               "Dropping %s RPC from %s\n",
                               service->srv_name,
                               libcfs_id2str(ev->initiator));
                        return;
                }
        }

        /* NB we ABSOLUTELY RELY on req being zeroed, so pointers are NULL,
         * flags are reset and scalars are zero.  We only set the message
         * size to non-zero if this was a successful receive. */
        req->rq_xid = ev->match_bits;
        req->rq_reqbuf = ev->md.start + ev->offset;
        if (ev->type == LNET_EVENT_PUT && ev->status == 0)
                req->rq_reqdata_len = ev->mlength;
        do_gettimeofday(&req->rq_arrival_time);
        req->rq_peer = ev->initiator;
        req->rq_self = ev->target.nid;
        req->rq_rqbd = rqbd;
        req->rq_phase = RQ_PHASE_NEW;
#ifdef CRAY_XT3
        req->rq_uid = ev->uid;
#endif

        CDEBUG(D_RPCTRACE, "peer: %s\n", libcfs_id2str(req->rq_peer));

        spin_lock(&service->srv_lock);

        req->rq_history_seq = service->srv_request_seq++;
        list_add_tail(&req->rq_history_list, &service->srv_request_history);

        if (ev->unlinked) {
                service->srv_nrqbd_receiving--;
                CDEBUG(D_INFO, "Buffer complete: %d buffers still posted\n",
                       service->srv_nrqbd_receiving);

                /* Normally, don't complain about 0 buffers posted; LNET won't
                 * drop incoming reqs since we set the portal lazy */
                if (test_req_buffer_pressure &&
                    ev->type != LNET_EVENT_UNLINK &&
                    service->srv_nrqbd_receiving == 0)
                        CWARN("All %s request buffers busy\n",
                              service->srv_name);

                /* req takes over the network's ref on rqbd */
        } else {
                /* req takes a ref on rqbd */
                rqbd->rqbd_refcount++;
        }

        list_add_tail(&req->rq_list, &service->srv_request_queue);
        service->srv_n_queued_reqs++;

        /* NB everything can disappear under us once the request
         * has been queued and we unlock, so do the wake now... */
        cfs_waitq_signal(&service->srv_waitq);

        spin_unlock(&service->srv_lock);
        EXIT;
}

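/*
 * NB: an 'easy' reply is fire-and-forget, so the net's reference on the
 * reply state is dropped as soon as it unlinks.  A 'difficult' reply
 * (rs_difficult) still needs ptlrpc_server_handle_reply(), so the
 * callback below only clears rs_on_net and schedules it.
 */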
/*
 *  Server's outgoing reply callback
 */
void reply_out_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id       *cbid = ev->md.user_ptr;
        struct ptlrpc_reply_state *rs = cbid->cbid_arg;
        struct ptlrpc_service     *svc = rs->rs_service;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_ACK ||
                 ev->type == LNET_EVENT_UNLINK);

        if (!rs->rs_difficult) {
                /* 'Easy' replies have no further processing so I drop the
                 * net's ref on 'rs' */
                LASSERT (ev->unlinked);
                ptlrpc_rs_decref(rs);
                atomic_dec (&svc->srv_outstanding_replies);
                EXIT;
                return;
        }

        LASSERT (rs->rs_on_net);

        if (ev->unlinked) {
                /* Last network callback.  The net's ref on 'rs' stays put
                 * until ptlrpc_server_handle_reply() is done with it */
                spin_lock(&svc->srv_lock);
                rs->rs_on_net = 0;
                ptlrpc_schedule_difficult_reply (rs);
                spin_unlock(&svc->srv_lock);
        }

        EXIT;
}

/*
 * Server's bulk completion callback
 */
void server_bulk_callback (lnet_event_t *ev)
{
        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_UNLINK ||
                 (desc->bd_type == BULK_PUT_SOURCE &&
                  ev->type == LNET_EVENT_ACK) ||
                 (desc->bd_type == BULK_GET_SINK &&
                  ev->type == LNET_EVENT_REPLY));

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, desc %p\n",
               ev->type, ev->status, desc);

        spin_lock(&desc->bd_lock);

        if ((ev->type == LNET_EVENT_ACK ||
             ev->type == LNET_EVENT_REPLY) &&
            ev->status == 0) {
                /* We heard back from the peer, so even if we get this
                 * before the SENT event (oh yes we can), we know we
                 * read/wrote the peer buffer and how much... */
                desc->bd_success = 1;
                desc->bd_nob_transferred = ev->mlength;
                desc->bd_sender = ev->sender;
        }

        if (ev->unlinked) {
                /* This is the last callback no matter what... */
                desc->bd_network_rw = 0;
                cfs_waitq_signal(&desc->bd_waitq);
        }

        spin_unlock(&desc->bd_lock);
        EXIT;
}

static void ptlrpc_master_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
        void (*callback)(lnet_event_t *ev) = cbid->cbid_fn;

        /* Honestly, it's best to find out early. */
        LASSERT (cbid->cbid_arg != LP_POISON);
        LASSERT (callback == request_out_callback ||
                 callback == reply_in_callback ||
                 callback == client_bulk_callback ||
                 callback == request_in_callback ||
                 callback == reply_out_callback ||
                 callback == server_bulk_callback);

        callback (ev);
}

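/*
 * Map a peer UUID to the closest matching LNet process id.  On success
 * returns 0 and sets *peer and *self (the source NID to reach it);
 * returns -ENOENT if no NID is known for the UUID.  A distance of 0
 * means the peer is local, in which case the loopback LND is used.
 */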
int ptlrpc_uuid_to_peer (struct obd_uuid *uuid,
                         lnet_process_id_t *peer, lnet_nid_t *self)
{
        int               best_dist = 0;
        __u32             best_order = 0;
        int               count = 0;
        int               rc = -ENOENT;
        int               portals_compatibility;
        int               dist;
        __u32             order;
        lnet_nid_t        dst_nid;
        lnet_nid_t        src_nid;

        portals_compatibility = LNetCtl(IOC_LIBCFS_PORTALS_COMPATIBILITY, NULL);

        peer->pid = LUSTRE_SRV_LNET_PID;

        /* Choose the matching UUID that's closest */
        while (lustre_uuid_to_peer(uuid->uuid, &dst_nid, count++) == 0) {
                dist = LNetDist(dst_nid, &src_nid, &order);
                if (dist < 0)
                        continue;

                if (dist == 0) {                /* local! use loopback LND */
                        peer->nid = *self = LNET_MKNID(LNET_MKNET(LOLND, 0), 0);
                        rc = 0;
                        break;
                }

                if (rc < 0 ||
                    dist < best_dist ||
                    (dist == best_dist && order < best_order)) {
                        best_dist = dist;
                        best_order = order;

                        if (portals_compatibility > 1) {
                                /* Strong portals compatibility: Zero the nid's
                                 * NET, so if I'm reading new config logs, or
                                 * getting configured by (new) lconf I can
                                 * still talk to old servers. */
                                dst_nid = LNET_MKNID(0, LNET_NIDADDR(dst_nid));
                                src_nid = LNET_MKNID(0, LNET_NIDADDR(src_nid));
                        }
                        peer->nid = dst_nid;
                        *self = src_nid;
                        rc = 0;
                }
        }

        CDEBUG(D_NET, "%s->%s\n", uuid->uuid, libcfs_id2str(*peer));
        if (rc != 0)
                CERROR("No NID found for %s\n", uuid->uuid);
        return rc;
}

void ptlrpc_ni_fini(void)
{
        cfs_waitq_t         waitq;
        struct l_wait_info  lwi;
        int                 rc;
        int                 retries;

        /* Wait for the event queue to become idle since there may still be
         * messages in flight with pending events (i.e. the fire-and-forget
         * messages == client requests and "non-difficult" server
         * replies) */

        for (retries = 0;; retries++) {
                rc = LNetEQFree(ptlrpc_eq_h);
                switch (rc) {
                default:
                        LBUG();

                case 0:
                        LNetNIFini();
                        return;

                case -EBUSY:
                        if (retries != 0)
                                CWARN("Event queue still busy\n");

                        /* Wait for a bit */
                        cfs_waitq_init(&waitq);
                        lwi = LWI_TIMEOUT(cfs_time_seconds(2), NULL, NULL);
                        l_wait_event(waitq, 0, &lwi);
                        break;
                }
        }
        /* notreached */
}

lnet_pid_t ptl_get_pid(void)
{
        lnet_pid_t        pid;

#ifndef __KERNEL__
        pid = getpid();
#else
        pid = LUSTRE_SRV_LNET_PID;
#endif
        return pid;
}

int ptlrpc_ni_init(void)
{
        int              rc;
        lnet_pid_t       pid;

        pid = ptl_get_pid();
        CDEBUG(D_NET, "My pid is: %x\n", pid);

        /* We're not passing any limits yet... */
        rc = LNetNIInit(pid);
        if (rc < 0) {
                CDEBUG (D_NET, "Can't init network interface: %d\n", rc);
                return (-ENOENT);
        }

        /* CAVEAT EMPTOR: how we process portals events is _radically_
         * different depending on... */
#ifdef __KERNEL__
        /* kernel portals calls our master callback when events are added to
         * the event queue.  In fact lustre never pulls events off this queue,
         * so it's only sized for some debug history. */
        rc = LNetEQAlloc(1024, ptlrpc_master_callback, &ptlrpc_eq_h);
#else
        /* liblustre calls the master callback when it removes events from the
         * event queue.  The event queue has to be big enough not to drop
         * anything */
        rc = LNetEQAlloc(10240, LNET_EQ_HANDLER_NONE, &ptlrpc_eq_h);
#endif
        if (rc == 0)
                return 0;

        CERROR ("Failed to allocate event queue: %d\n", rc);
        LNetNIFini();

        return (-ENOMEM);
}

#ifndef __KERNEL__
CFS_LIST_HEAD(liblustre_wait_callbacks);
CFS_LIST_HEAD(liblustre_idle_callbacks);
void *liblustre_services_callback;

void *
liblustre_register_waitidle_callback (struct list_head *callback_list,
                                      const char *name,
                                      int (*fn)(void *arg), void *arg)
{
        struct liblustre_wait_callback *llwc;

        OBD_ALLOC(llwc, sizeof(*llwc));
        LASSERT (llwc != NULL);

        llwc->llwc_name = name;
        llwc->llwc_fn = fn;
        llwc->llwc_arg = arg;
        list_add_tail(&llwc->llwc_list, callback_list);

        return (llwc);
}

void
liblustre_deregister_waitidle_callback (void *opaque)
{
        struct liblustre_wait_callback *llwc = opaque;

        list_del(&llwc->llwc_list);
        OBD_FREE(llwc, sizeof(*llwc));
}

void *
liblustre_register_wait_callback (const char *name,
                                  int (*fn)(void *arg), void *arg)
{
        return liblustre_register_waitidle_callback(&liblustre_wait_callbacks,
                                                    name, fn, arg);
}

void
liblustre_deregister_wait_callback (void *opaque)
{
        liblustre_deregister_waitidle_callback(opaque);
}

void *
liblustre_register_idle_callback (const char *name,
                                  int (*fn)(void *arg), void *arg)
{
        return liblustre_register_waitidle_callback(&liblustre_idle_callbacks,
                                                    name, fn, arg);
}

void
liblustre_deregister_idle_callback (void *opaque)
{
        liblustre_deregister_waitidle_callback(opaque);
}

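/*
 * Poll the event queue once, handling at most one event.  Returns 1 if
 * an event was handled, 0 if the poll timed out.  liblustre has no
 * asynchronous callback, so a dropped (overflowed) event is fatal.
 */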
int
liblustre_check_events (int timeout)
{
        lnet_event_t ev;
        int         rc;
        int         i;
        ENTRY;

        rc = LNetEQPoll(&ptlrpc_eq_h, 1, timeout * 1000, &ev, &i);
        if (rc == 0)
                RETURN(0);

        LASSERT (rc == -EOVERFLOW || rc == 1);

        /* liblustre: no async callback, so we can't afford to miss any
         * events... */
        if (rc == -EOVERFLOW) {
                CERROR ("Dropped an event!!!\n");
                abort();
        }

        ptlrpc_master_callback (&ev);
        RETURN(1);
}

int liblustre_waiting = 0;

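/*
 * Handle all pending events and give each registered wait callback a
 * chance to run; if nothing turned up and a timeout was given, block in
 * the event queue for up to that long.  Returns non-zero if anything was
 * found.  liblustre_waiting guards against single-threaded recursion.
 */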
int
liblustre_wait_event (int timeout)
{
        struct list_head               *tmp;
        struct liblustre_wait_callback *llwc;
        int                             found_something = 0;

        /* single threaded recursion check... */
        liblustre_waiting = 1;

        for (;;) {
                /* Deal with all pending events */
                while (liblustre_check_events(0))
                        found_something = 1;

                /* Give all registered callbacks a bite at the cherry */
                list_for_each(tmp, &liblustre_wait_callbacks) {
                        llwc = list_entry(tmp, struct liblustre_wait_callback,
                                          llwc_list);

                        if (llwc->llwc_fn(llwc->llwc_arg))
                                found_something = 1;
                }

                if (found_something || timeout == 0)
                        break;

                /* Nothing so far, but I'm allowed to block... */
                found_something = liblustre_check_events(timeout);
                if (!found_something)           /* still nothing */
                        break;                  /* I timed out */
        }

        liblustre_waiting = 0;

        return found_something;
}

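/*
 * Spin handling events until every registered idle callback reports
 * idle.  Not re-entrant: the 'recursed' flag asserts against recursion.
 */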
void
liblustre_wait_idle(void)
{
        static int recursed = 0;

        struct list_head               *tmp;
        struct liblustre_wait_callback *llwc;
        int                             idle = 0;

        LASSERT(!recursed);
        recursed = 1;

        do {
                liblustre_wait_event(0);

                idle = 1;

                list_for_each(tmp, &liblustre_idle_callbacks) {
                        llwc = list_entry(tmp, struct liblustre_wait_callback,
                                          llwc_list);

                        if (!llwc->llwc_fn(llwc->llwc_arg)) {
                                idle = 0;
                                break;
                        }
                }

        } while (!idle);

        recursed = 0;
}

#endif /* !__KERNEL__ */

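/*
 * Set up LNet and the ptlrpc event queue, register the liblustre
 * services callback (userspace only) and take a reference on ptlrpcd.
 * Undone by ptlrpc_exit_portals().
 */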
int ptlrpc_init_portals(void)
{
        int   rc = ptlrpc_ni_init();

        if (rc != 0) {
                CERROR("network initialisation failed\n");
                return -EIO;
        }
#ifndef __KERNEL__
        liblustre_services_callback =
                liblustre_register_wait_callback("liblustre_check_services",
                                                 &liblustre_check_services,
                                                 NULL);
#endif
        rc = ptlrpcd_addref();
        if (rc == 0)
                return 0;

        CERROR("rpcd initialisation failed\n");
#ifndef __KERNEL__
        liblustre_deregister_wait_callback(liblustre_services_callback);
#endif
        ptlrpc_ni_fini();
        return rc;
}

void ptlrpc_exit_portals(void)
{
#ifndef __KERNEL__
        liblustre_deregister_wait_callback(liblustre_services_callback);
#endif
        ptlrpcd_decref();
        ptlrpc_ni_fini();
}