lustre/ptlrpc/events.c (fs/lustre-release.git, commit 41352f1f79d4b21feb52b9c7bbe31987f4df40d8)
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 *
 */

#define DEBUG_SUBSYSTEM S_RPC

#ifndef __KERNEL__
# include <liblustre.h>
#else
# ifdef __mips64__
#  include <linux/kernel.h>
# endif
#endif
#include <obd_class.h>
#include <lustre_net.h>
#include <lustre_sec.h>
#include "ptlrpc_internal.h"

lnet_handle_eq_t   ptlrpc_eq_h;

/*
 *  Client's outgoing request callback
 */
void request_out_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
        struct ptlrpc_request *req = cbid->cbid_arg;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->unlinked);

        DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req,
                  "type %d, status %d", ev->type, ev->status);

        sptlrpc_request_out_callback(req);

        if (ev->type == LNET_EVENT_UNLINK || ev->status != 0) {

                /* Failed send: make it seem like the reply timed out, just
                 * like failing sends in client.c does currently...  */

                spin_lock(&req->rq_lock);
                req->rq_net_err = 1;
                spin_unlock(&req->rq_lock);

                ptlrpc_wake_client_req(req);
        }

        /* these balance the references in ptl_send_rpc() */
        atomic_dec(&req->rq_import->imp_inflight);
        ptlrpc_req_finished(req);

        EXIT;
}

/*
 * Client's incoming reply callback
 */
void reply_in_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
        struct ptlrpc_request *req = cbid->cbid_arg;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_PUT ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->unlinked);
        LASSERT (ev->md.start == req->rq_repbuf);
        LASSERT (ev->offset == 0);
        LASSERT (ev->mlength <= req->rq_repbuf_len);

        DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req,
                  "type %d, status %d", ev->type, ev->status);

        spin_lock(&req->rq_lock);

        LASSERT (req->rq_receiving_reply);
        req->rq_receiving_reply = 0;

        if (ev->type == LNET_EVENT_PUT && ev->status == 0) {
                req->rq_replied = 1;
                req->rq_nob_received = ev->mlength;
        }

        /* NB don't unlock till after wakeup; req can disappear under us
         * since we don't have our own ref */
        ptlrpc_wake_client_req(req);

        spin_unlock(&req->rq_lock);
        EXIT;
}
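
/*
 * Note on the wake/unlock ordering above (an illustrative sketch, not
 * code from this file): the waiter must take rq_lock to observe
 * completion, so waking while the lock is still held keeps 'req' alive
 * until we drop it:
 *
 *     spin_lock(&req->rq_lock);
 *     req->rq_replied = 1;             publish completion
 *     ptlrpc_wake_client_req(req);     wake while still locked
 *     spin_unlock(&req->rq_lock);      only now can the waiter free req
 *
 * The same pattern appears with bd_lock in client_bulk_callback() below.
 */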

/*
 * Client's bulk has been written/read
 */
void client_bulk_callback (lnet_event_t *ev)
{
        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
        ENTRY;

        LASSERT ((desc->bd_type == BULK_PUT_SINK &&
                  ev->type == LNET_EVENT_PUT) ||
                 (desc->bd_type == BULK_GET_SOURCE &&
                  ev->type == LNET_EVENT_GET) ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->unlinked);

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, desc %p\n",
               ev->type, ev->status, desc);

        spin_lock(&desc->bd_lock);

        LASSERT(desc->bd_network_rw);
        desc->bd_network_rw = 0;

        if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) {
                desc->bd_success = 1;
                desc->bd_nob_transferred = ev->mlength;
                desc->bd_sender = ev->sender;
        }

        sptlrpc_enc_pool_put_pages(desc);

        /* NB don't unlock till after wakeup; desc can disappear under us
         * otherwise */
        ptlrpc_wake_client_req(desc->bd_req);

        spin_unlock(&desc->bd_lock);
        EXIT;
}

/*
 * Server's incoming request callback
 */
void request_in_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id               *cbid = ev->md.user_ptr;
        struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;
        struct ptlrpc_service             *service = rqbd->rqbd_service;
        struct ptlrpc_request             *req;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_PUT ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT ((char *)ev->md.start >= rqbd->rqbd_buffer);
        LASSERT ((char *)ev->md.start + ev->offset + ev->mlength <=
                 rqbd->rqbd_buffer + service->srv_buf_size);

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, service %s\n",
               ev->type, ev->status, service->srv_name);

        if (ev->unlinked) {
                /* If this is the last request message to fit in the
                 * request buffer we can use the request object embedded in
                 * rqbd.  Note that if we failed to allocate a request,
                 * we'd have to re-post the rqbd, which we can't do in this
                 * context. */
                req = &rqbd->rqbd_req;
                memset(req, 0, sizeof (*req));
        } else {
                LASSERT (ev->type == LNET_EVENT_PUT);
                if (ev->status != 0) {
                        /* We moaned above already... */
                        return;
                }
                OBD_ALLOC_GFP(req, sizeof(*req), CFS_ALLOC_ATOMIC_TRY);
                if (req == NULL) {
                        CERROR("Can't allocate incoming request descriptor: "
                               "Dropping %s RPC from %s\n",
                               service->srv_name,
                               libcfs_id2str(ev->initiator));
                        return;
                }
        }

        /* NB we ABSOLUTELY RELY on req being zeroed, so pointers are NULL,
         * flags are reset and scalars are zero.  We only set the message
         * size to non-zero if this was a successful receive. */
        req->rq_xid = ev->match_bits;
        req->rq_reqbuf = ev->md.start + ev->offset;
        if (ev->type == LNET_EVENT_PUT && ev->status == 0)
                req->rq_reqdata_len = ev->mlength;
        do_gettimeofday(&req->rq_arrival_time);
        req->rq_peer = ev->initiator;
        req->rq_self = ev->target.nid;
        req->rq_rqbd = rqbd;
        req->rq_phase = RQ_PHASE_NEW;
#ifdef CRAY_XT3
        req->rq_uid = ev->uid;
#endif

        CDEBUG(D_RPCTRACE, "peer: %s\n", libcfs_id2str(req->rq_peer));

        spin_lock(&service->srv_lock);

        req->rq_history_seq = service->srv_request_seq++;
        list_add_tail(&req->rq_history_list, &service->srv_request_history);

        if (ev->unlinked) {
                service->srv_nrqbd_receiving--;
                CDEBUG(D_INFO, "Buffer complete: %d buffers still posted\n",
                       service->srv_nrqbd_receiving);

                /* Normally, don't complain about 0 buffers posted; LNET won't
                 * drop incoming reqs since we set the portal lazy */
                if (test_req_buffer_pressure &&
                    ev->type != LNET_EVENT_UNLINK &&
                    service->srv_nrqbd_receiving == 0)
                        CWARN("All %s request buffers busy\n",
                              service->srv_name);

                /* req takes over the network's ref on rqbd */
        } else {
                /* req takes a ref on rqbd */
                rqbd->rqbd_refcount++;
        }

        list_add_tail(&req->rq_list, &service->srv_request_queue);
        service->srv_n_queued_reqs++;

        /* NB everything can disappear under us once the request
         * has been queued and we unlock, so do the wake now... */
        cfs_waitq_signal(&service->srv_waitq);

        spin_unlock(&service->srv_lock);
        EXIT;
}
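
/*
 * Reference-counting sketch for the two receive paths above (an
 * illustration; the finalization side lives elsewhere in ptlrpc):
 *
 *     ev->unlinked:   req = &rqbd->rqbd_req;    embedded request;
 *                                               inherits the network's
 *                                               ref on rqbd
 *     !ev->unlinked:  OBD_ALLOC_GFP(req, ...);  heap request;
 *                     rqbd->rqbd_refcount++;    takes its own ref
 *
 * Either way, exactly one rqbd reference is held per queued request.
 */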

/*
 *  Server's outgoing reply callback
 */
void reply_out_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id       *cbid = ev->md.user_ptr;
        struct ptlrpc_reply_state *rs = cbid->cbid_arg;
        struct ptlrpc_service     *svc = rs->rs_service;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_ACK ||
                 ev->type == LNET_EVENT_UNLINK);

        if (!rs->rs_difficult) {
                /* 'Easy' replies have no further processing so I drop the
                 * net's ref on 'rs' */
                LASSERT (ev->unlinked);
                ptlrpc_rs_decref(rs);
                atomic_dec (&svc->srv_outstanding_replies);
                EXIT;
                return;
        }

        LASSERT (rs->rs_on_net);

        if (ev->unlinked) {
                /* Last network callback.  The net's ref on 'rs' stays put
                 * until ptlrpc_server_handle_reply() is done with it */
                spin_lock(&svc->srv_lock);
                rs->rs_on_net = 0;
                ptlrpc_schedule_difficult_reply (rs);
                spin_unlock(&svc->srv_lock);
        }

        EXIT;
}

/*
 * Server's bulk completion callback
 */
void server_bulk_callback (lnet_event_t *ev)
{
        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_UNLINK ||
                 (desc->bd_type == BULK_PUT_SOURCE &&
                  ev->type == LNET_EVENT_ACK) ||
                 (desc->bd_type == BULK_GET_SINK &&
                  ev->type == LNET_EVENT_REPLY));

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, desc %p\n",
               ev->type, ev->status, desc);

        spin_lock(&desc->bd_lock);

        if ((ev->type == LNET_EVENT_ACK ||
             ev->type == LNET_EVENT_REPLY) &&
            ev->status == 0) {
                /* We heard back from the peer, so even if we get this
                 * before the SENT event (oh yes we can), we know we
                 * read/wrote the peer buffer and how much... */
                desc->bd_success = 1;
                desc->bd_nob_transferred = ev->mlength;
                desc->bd_sender = ev->sender;
        }

        if (ev->unlinked) {
                /* This is the last callback no matter what... */
                desc->bd_network_rw = 0;
                cfs_waitq_signal(&desc->bd_waitq);
        }

        spin_unlock(&desc->bd_lock);
        EXIT;
}
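
/*
 * Event-ordering sketch for the callback above (an illustration, not
 * exhaustive): the server can see the peer's completion event before
 * its own SEND event, so the code keys off the event type rather than
 * arrival order:
 *
 *     BULK_PUT_SOURCE:  LNET_EVENT_SEND and LNET_EVENT_ACK
 *     BULK_GET_SINK:    LNET_EVENT_SEND and LNET_EVENT_REPLY
 *
 * Only ACK/REPLY carry mlength/sender, and whichever event has
 * ev->unlinked set is the final callback for the descriptor.
 */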

static void ptlrpc_master_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
        void (*callback)(lnet_event_t *ev) = cbid->cbid_fn;

        /* Honestly, it's best to find out early. */
        LASSERT (cbid->cbid_arg != LP_POISON);
        LASSERT (callback == request_out_callback ||
                 callback == reply_in_callback ||
                 callback == client_bulk_callback ||
                 callback == request_in_callback ||
                 callback == reply_out_callback ||
                 callback == server_bulk_callback);

        callback (ev);
}
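
/*
 * Dispatch sketch (an illustration; the cbid wiring is done by the
 * send paths elsewhere in ptlrpc, e.g. when a request is initialized):
 *
 *     cbid->cbid_fn  = request_out_callback;
 *     cbid->cbid_arg = req;
 *     md.user_ptr    = cbid;
 *
 * Every MD ptlrpc posts carries such a cbid, so each LNet event funnels
 * through ptlrpc_master_callback() and fans out to the handler named in
 * the cbid; the LASSERT above catches anything else.
 */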

int ptlrpc_uuid_to_peer (struct obd_uuid *uuid,
                         lnet_process_id_t *peer, lnet_nid_t *self)
{
        int               best_dist = 0;
        int               best_order = 0;
        int               count = 0;
        int               rc = -ENOENT;
        int               portals_compatibility;
        int               dist;
        int               order;
        lnet_nid_t        dst_nid;
        lnet_nid_t        src_nid;

        portals_compatibility = LNetCtl(IOC_LIBCFS_PORTALS_COMPATIBILITY, NULL);

        peer->pid = LUSTRE_SRV_LNET_PID;

        /* Choose the matching UUID that's closest */
        while (lustre_uuid_to_peer(uuid->uuid, &dst_nid, count++) == 0) {
                dist = LNetDist(dst_nid, &src_nid, &order);
                if (dist < 0)
                        continue;

                if (dist == 0) {                /* local! use loopback LND */
                        peer->nid = *self = LNET_MKNID(LNET_MKNET(LOLND, 0), 0);
                        rc = 0;
                        break;
                }

                LASSERT (order >= 0);
                if (rc < 0 ||
                    dist < best_dist ||
                    (dist == best_dist && order < best_order)) {
                        best_dist = dist;
                        best_order = order;

                        if (portals_compatibility > 1) {
                                /* Strong portals compatibility: Zero the nid's
                                 * NET, so if I'm reading new config logs, or
                                 * getting configured by (new) lconf I can
                                 * still talk to old servers. */
                                dst_nid = LNET_MKNID(0, LNET_NIDADDR(dst_nid));
                                src_nid = LNET_MKNID(0, LNET_NIDADDR(src_nid));
                        }
                        peer->nid = dst_nid;
                        *self = src_nid;
                        rc = 0;
                }
        }

        CDEBUG(D_NET,"%s->%s\n", uuid->uuid, libcfs_id2str(*peer));
        if (rc != 0)
                CERROR("No NID found for %s\n", uuid->uuid);
        return rc;
}
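
/*
 * Usage sketch for ptlrpc_uuid_to_peer() (hypothetical caller; error
 * handling elided):
 *
 *     lnet_process_id_t peer;
 *     lnet_nid_t        self;
 *
 *     if (ptlrpc_uuid_to_peer(uuid, &peer, &self) != 0)
 *             return -ENOENT;           no route to that UUID
 *
 * On success 'peer' is the closest NID advertised for the UUID and
 * 'self' is the local NID to send from.
 */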

void ptlrpc_ni_fini(void)
{
        cfs_waitq_t         waitq;
        struct l_wait_info  lwi;
        int                 rc;
        int                 retries;

        /* Wait for the event queue to become idle since there may still be
         * messages in flight with pending events (i.e. the fire-and-forget
         * messages == client requests and "non-difficult" server
         * replies) */

        for (retries = 0;; retries++) {
                rc = LNetEQFree(ptlrpc_eq_h);
                switch (rc) {
                default:
                        LBUG();

                case 0:
                        LNetNIFini();
                        return;

                case -EBUSY:
                        if (retries != 0)
                                CWARN("Event queue still busy\n");

                        /* Wait for a bit */
                        cfs_waitq_init(&waitq);
                        lwi = LWI_TIMEOUT(cfs_time_seconds(2), NULL, NULL);
                        l_wait_event(waitq, 0, &lwi);
                        break;
                }
        }
        /* notreached */
}

lnet_pid_t ptl_get_pid(void)
{
        lnet_pid_t        pid;

#ifndef __KERNEL__
        pid = getpid();
#else
        pid = LUSTRE_SRV_LNET_PID;
#endif
        return pid;
}

int ptlrpc_ni_init(void)
{
        int              rc;
        lnet_pid_t       pid;

        pid = ptl_get_pid();
        CDEBUG(D_NET, "My pid is: %x\n", pid);

        /* We're not passing any limits yet... */
        rc = LNetNIInit(pid);
        if (rc < 0) {
                CDEBUG (D_NET, "Can't init network interface: %d\n", rc);
                return (-ENOENT);
        }

        /* CAVEAT EMPTOR: how we process portals events is _radically_
         * different depending on... */
#ifdef __KERNEL__
        /* kernel portals calls our master callback when events are added to
         * the event queue.  In fact lustre never pulls events off this queue,
         * so it's only sized for some debug history. */
        rc = LNetEQAlloc(1024, ptlrpc_master_callback, &ptlrpc_eq_h);
#else
        /* liblustre calls the master callback when it removes events from the
         * event queue.  The event queue has to be big enough not to drop
         * anything */
        rc = LNetEQAlloc(10240, LNET_EQ_HANDLER_NONE, &ptlrpc_eq_h);
#endif
        if (rc == 0)
                return 0;

        CERROR ("Failed to allocate event queue: %d\n", rc);
        LNetNIFini();

        return (-ENOMEM);
}

#ifndef __KERNEL__
CFS_LIST_HEAD(liblustre_wait_callbacks);
CFS_LIST_HEAD(liblustre_idle_callbacks);
void *liblustre_services_callback;

void *
liblustre_register_waitidle_callback (struct list_head *callback_list,
                                      const char *name,
                                      int (*fn)(void *arg), void *arg)
{
        struct liblustre_wait_callback *llwc;

        OBD_ALLOC(llwc, sizeof(*llwc));
        LASSERT (llwc != NULL);

        llwc->llwc_name = name;
        llwc->llwc_fn = fn;
        llwc->llwc_arg = arg;
        list_add_tail(&llwc->llwc_list, callback_list);

        return (llwc);
}

void
liblustre_deregister_waitidle_callback (void *opaque)
{
        struct liblustre_wait_callback *llwc = opaque;

        list_del(&llwc->llwc_list);
        OBD_FREE(llwc, sizeof(*llwc));
}

void *
liblustre_register_wait_callback (const char *name,
                                  int (*fn)(void *arg), void *arg)
{
        return liblustre_register_waitidle_callback(&liblustre_wait_callbacks,
                                                    name, fn, arg);
}

void
liblustre_deregister_wait_callback (void *opaque)
{
        liblustre_deregister_waitidle_callback(opaque);
}

void *
liblustre_register_idle_callback (const char *name,
                                  int (*fn)(void *arg), void *arg)
{
        return liblustre_register_waitidle_callback(&liblustre_idle_callbacks,
                                                    name, fn, arg);
}

void
liblustre_deregister_idle_callback (void *opaque)
{
        liblustre_deregister_waitidle_callback(opaque);
}
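
/*
 * Usage sketch for the registration API above (hypothetical callback;
 * by convention a wait callback returns non-zero when it found work,
 * and an idle callback returns non-zero when its subsystem is idle;
 * see liblustre_wait_event() and liblustre_wait_idle() below):
 *
 *     static int my_poll(void *arg)
 *     {
 *             return 0;
 *     }
 *
 *     void *h = liblustre_register_wait_callback("my_poll", my_poll,
 *                                                NULL);
 *     ...
 *     liblustre_deregister_wait_callback(h);
 */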

int
liblustre_check_events (int timeout)
{
        lnet_event_t ev;
        int         rc;
        int         i;
        ENTRY;

        rc = LNetEQPoll(&ptlrpc_eq_h, 1, timeout * 1000, &ev, &i);
        if (rc == 0)
                RETURN(0);

        LASSERT (rc == -EOVERFLOW || rc == 1);

        /* liblustre: no async callback so we can't afford to miss any
         * events... */
        if (rc == -EOVERFLOW) {
                CERROR ("Dropped an event!!!\n");
                abort();
        }

        ptlrpc_master_callback (&ev);
        RETURN(1);
}

int liblustre_waiting = 0;

int
liblustre_wait_event (int timeout)
{
        struct list_head               *tmp;
        struct liblustre_wait_callback *llwc;
        int                             found_something = 0;

        /* single threaded recursion check... */
        liblustre_waiting = 1;

        for (;;) {
                /* Deal with all pending events */
                while (liblustre_check_events(0))
                        found_something = 1;

                /* Give all registered callbacks a bite at the cherry */
                list_for_each(tmp, &liblustre_wait_callbacks) {
                        llwc = list_entry(tmp, struct liblustre_wait_callback,
                                          llwc_list);

                        if (llwc->llwc_fn(llwc->llwc_arg))
                                found_something = 1;
                }

                if (found_something || timeout == 0)
                        break;

                /* Nothing so far, but I'm allowed to block... */
                found_something = liblustre_check_events(timeout);
                if (!found_something)           /* still nothing */
                        break;                  /* I timed out */
        }

        liblustre_waiting = 0;

        return found_something;
}

void
liblustre_wait_idle(void)
{
        static int recursed = 0;

        struct list_head               *tmp;
        struct liblustre_wait_callback *llwc;
        int                             idle = 0;

        LASSERT(!recursed);
        recursed = 1;

        do {
                liblustre_wait_event(0);

                idle = 1;

                list_for_each(tmp, &liblustre_idle_callbacks) {
                        llwc = list_entry(tmp, struct liblustre_wait_callback,
                                          llwc_list);

                        if (!llwc->llwc_fn(llwc->llwc_arg)) {
                                idle = 0;
                                break;
                        }
                }

        } while (!idle);

        recursed = 0;
}

#endif /* !__KERNEL__ */

int ptlrpc_init_portals(void)
{
        int   rc = ptlrpc_ni_init();

        if (rc != 0) {
                CERROR("network initialisation failed\n");
                return -EIO;
        }
#ifndef __KERNEL__
        liblustre_services_callback =
                liblustre_register_wait_callback("liblustre_check_services",
                                                 &liblustre_check_services,
                                                 NULL);
#endif
        rc = ptlrpcd_addref();
        if (rc == 0)
                return 0;

        CERROR("rpcd initialisation failed\n");
#ifndef __KERNEL__
        liblustre_deregister_wait_callback(liblustre_services_callback);
#endif
        ptlrpc_ni_fini();
        return rc;
}
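
/*
 * Usage sketch (hypothetical caller): ptlrpc_init_portals() and
 * ptlrpc_exit_portals() are intended to bracket the life of the
 * ptlrpc layer:
 *
 *     if (ptlrpc_init_portals() != 0)
 *             return -EIO;              no network, no RPCs
 *     ...
 *     ptlrpc_exit_portals();
 */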

void ptlrpc_exit_portals(void)
{
#ifndef __KERNEL__
        liblustre_deregister_wait_callback(liblustre_services_callback);
#endif
        ptlrpcd_decref();
        ptlrpc_ni_fini();
}