/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_RPC

#ifndef __KERNEL__
# include <liblustre.h>
#else
# include <libcfs/libcfs.h>
# ifdef __mips64__
#  include <linux/kernel.h>
# endif
#endif

#include <obd_class.h>
#include <lustre_net.h>
#include <lustre_sec.h>
#include "ptlrpc_internal.h"

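/* The single event queue handle shared by all of ptlrpc: every MD posted
 * by the client and server paths below attaches to this EQ, and LNet
 * completion events are delivered through ptlrpc_master_callback(). */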
lnet_handle_eq_t   ptlrpc_eq_h;

/*
 *  Client's outgoing request callback
 */
void request_out_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
        struct ptlrpc_request *req = cbid->cbid_arg;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->unlinked);

        DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req,
                  "type %d, status %d", ev->type, ev->status);

        sptlrpc_request_out_callback(req);

        if (ev->type == LNET_EVENT_UNLINK || ev->status != 0) {

                /* Failed send: make it seem like the reply timed out, just
                 * as failing sends in client.c do currently...  */

                cfs_spin_lock(&req->rq_lock);
                req->rq_net_err = 1;
                cfs_spin_unlock(&req->rq_lock);

                ptlrpc_client_wake_req(req);
        }

        ptlrpc_req_finished(req);

        EXIT;
}

/*
 * Client's incoming reply callback
 */
void reply_in_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
        struct ptlrpc_request *req = cbid->cbid_arg;
        ENTRY;

        DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req,
                  "type %d, status %d", ev->type, ev->status);

        LASSERT (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->md.start == req->rq_repbuf);
        LASSERT (ev->mlength <= req->rq_repbuf_len);
        /* We've set LNET_MD_MANAGE_REMOTE for all outgoing requests
           for adaptive timeouts' early reply. */
        LASSERT((ev->md.options & LNET_MD_MANAGE_REMOTE) != 0);

        cfs_spin_lock(&req->rq_lock);

        req->rq_receiving_reply = 0;
        req->rq_early = 0;
        if (ev->unlinked)
                req->rq_must_unlink = 0;

        if (ev->status)
                goto out_wake;

        if (ev->type == LNET_EVENT_UNLINK) {
                LASSERT(ev->unlinked);
                DEBUG_REQ(D_RPCTRACE, req, "unlink");
                goto out_wake;
        }

        if (ev->mlength < ev->rlength) {
                CDEBUG(D_RPCTRACE, "truncate req %p rpc %d - %d+%d\n", req,
                       req->rq_replen, ev->rlength, ev->offset);
                req->rq_reply_truncate = 1;
                req->rq_replied = 1;
                req->rq_status = -EOVERFLOW;
                req->rq_nob_received = ev->rlength + ev->offset;
                goto out_wake;
        }

        if ((ev->offset == 0) &&
            ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT))) {
                /* Early reply */
                DEBUG_REQ(D_ADAPTTO, req,
                          "Early reply received: mlen=%u offset=%d replen=%d "
                          "replied=%d unlinked=%d", ev->mlength, ev->offset,
                          req->rq_replen, req->rq_replied, ev->unlinked);

                req->rq_early_count++; /* number received, client side */

                if (req->rq_replied)   /* already got the real reply */
                        goto out_wake;

                req->rq_early = 1;
                req->rq_reply_off = ev->offset;
                req->rq_nob_received = ev->mlength;
                /* And we're still receiving */
                req->rq_receiving_reply = 1;
        } else {
                /* Real reply */
                req->rq_rep_swab_mask = 0;
                req->rq_replied = 1;
                req->rq_reply_off = ev->offset;
                req->rq_nob_received = ev->mlength;
                /* LNetMDUnlink can't be called under the LNET_LOCK,
                   so we must unlink in ptlrpc_unregister_reply */
                DEBUG_REQ(D_INFO, req,
                          "reply in flags=%x mlen=%u offset=%d replen=%d",
                          lustre_msg_get_flags(req->rq_reqmsg),
                          ev->mlength, ev->offset, req->rq_replen);
        }

        req->rq_import->imp_last_reply_time = cfs_time_current_sec();

out_wake:
        /* NB don't unlock till after wakeup; req can disappear under us
         * since we don't have our own ref */
        ptlrpc_client_wake_req(req);
        cfs_spin_unlock(&req->rq_lock);
        EXIT;
}

/*
 * Client's bulk has been written/read
 */
void client_bulk_callback (lnet_event_t *ev)
{
        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
        ENTRY;

        LASSERT ((desc->bd_type == BULK_PUT_SINK &&
                  ev->type == LNET_EVENT_PUT) ||
                 (desc->bd_type == BULK_GET_SOURCE &&
                  ev->type == LNET_EVENT_GET) ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->unlinked);

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, desc %p\n",
               ev->type, ev->status, desc);

        cfs_spin_lock(&desc->bd_lock);

        LASSERT(desc->bd_network_rw);
        desc->bd_network_rw = 0;

        if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) {
                desc->bd_success = 1;
                desc->bd_nob_transferred = ev->mlength;
                desc->bd_sender = ev->sender;
        }

        /* release the encrypted pages for write */
        if (desc->bd_req->rq_bulk_write)
                sptlrpc_enc_pool_put_pages(desc);

        /* NB don't unlock till after wakeup; desc can disappear under us
         * otherwise */
        ptlrpc_client_wake_req(desc->bd_req);

        cfs_spin_unlock(&desc->bd_lock);
        EXIT;
}

/*
 * Server's incoming request callback
 */
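/*
 * Incoming messages land in a posted request buffer (rqbd).  When the
 * event reports ev->unlinked, this was the last message to fit and the
 * request object embedded in the rqbd is reused, since a fresh rqbd
 * cannot be re-posted from event context; otherwise a request descriptor
 * is allocated atomically and takes its own reference on the rqbd.
 */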
void request_in_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id               *cbid = ev->md.user_ptr;
        struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;
        struct ptlrpc_service             *service = rqbd->rqbd_service;
        struct ptlrpc_request             *req;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_PUT ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT ((char *)ev->md.start >= rqbd->rqbd_buffer);
        LASSERT ((char *)ev->md.start + ev->offset + ev->mlength <=
                 rqbd->rqbd_buffer + service->srv_buf_size);

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, service %s\n",
               ev->type, ev->status, service->srv_name);

        if (ev->unlinked) {
                /* If this is the last request message to fit in the
                 * request buffer we can use the request object embedded in
                 * rqbd.  Note that if we failed to allocate a request,
                 * we'd have to re-post the rqbd, which we can't do in this
                 * context. */
                req = &rqbd->rqbd_req;
                memset(req, 0, sizeof (*req));
        } else {
                LASSERT (ev->type == LNET_EVENT_PUT);
                if (ev->status != 0) {
                        /* We moaned above already... */
                        return;
                }
                OBD_ALLOC_GFP(req, sizeof(*req), CFS_ALLOC_ATOMIC_TRY);
                if (req == NULL) {
                        CERROR("Can't allocate incoming request descriptor: "
                               "Dropping %s RPC from %s\n",
                               service->srv_name,
                               libcfs_id2str(ev->initiator));
                        return;
                }
        }

        /* NB we ABSOLUTELY RELY on req being zeroed, so pointers are NULL,
         * flags are reset and scalars are zero.  We only set the message
         * size to non-zero if this was a successful receive. */
        req->rq_xid = ev->match_bits;
        req->rq_reqbuf = ev->md.start + ev->offset;
        if (ev->type == LNET_EVENT_PUT && ev->status == 0)
                req->rq_reqdata_len = ev->mlength;
        cfs_gettimeofday(&req->rq_arrival_time);
        req->rq_peer = ev->initiator;
        req->rq_self = ev->target.nid;
        req->rq_rqbd = rqbd;
        req->rq_phase = RQ_PHASE_NEW;
#ifdef CRAY_XT3
        req->rq_uid = ev->uid;
#endif
        cfs_spin_lock_init(&req->rq_lock);
        CFS_INIT_LIST_HEAD(&req->rq_timed_list);
        cfs_atomic_set(&req->rq_refcount, 1);
        if (ev->type == LNET_EVENT_PUT)
                CDEBUG(D_RPCTRACE, "incoming req@%p x"LPU64" msgsize %u\n",
                       req, req->rq_xid, ev->mlength);

        CDEBUG(D_RPCTRACE, "peer: %s\n", libcfs_id2str(req->rq_peer));

        cfs_spin_lock(&service->srv_lock);

        req->rq_history_seq = service->srv_request_seq++;
        cfs_list_add_tail(&req->rq_history_list, &service->srv_request_history);

        if (ev->unlinked) {
                service->srv_nrqbd_receiving--;
                CDEBUG(D_INFO, "Buffer complete: %d buffers still posted\n",
                       service->srv_nrqbd_receiving);

                /* Normally, don't complain about 0 buffers posted; LNET won't
                 * drop incoming reqs since we set the portal lazy */
                if (test_req_buffer_pressure &&
                    ev->type != LNET_EVENT_UNLINK &&
                    service->srv_nrqbd_receiving == 0)
                        CWARN("All %s request buffers busy\n",
                              service->srv_name);

                /* req takes over the network's ref on rqbd */
        } else {
                /* req takes a ref on rqbd */
                rqbd->rqbd_refcount++;
        }

        cfs_list_add_tail(&req->rq_list, &service->srv_req_in_queue);
        service->srv_n_queued_reqs++;

        /* NB everything can disappear under us once the request
         * has been queued and we unlock, so do the wake now... */
        cfs_waitq_signal(&service->srv_waitq);

        cfs_spin_unlock(&service->srv_lock);
        EXIT;
}

/*
 *  Server's outgoing reply callback
 */
void reply_out_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id       *cbid = ev->md.user_ptr;
        struct ptlrpc_reply_state *rs = cbid->cbid_arg;
        struct ptlrpc_service     *svc = rs->rs_service;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_ACK ||
                 ev->type == LNET_EVENT_UNLINK);

        if (!rs->rs_difficult) {
                /* 'Easy' replies have no further processing so I drop the
                 * net's ref on 'rs' */
                LASSERT (ev->unlinked);
                ptlrpc_rs_decref(rs);
                cfs_atomic_dec (&svc->srv_outstanding_replies);
                EXIT;
                return;
        }

        LASSERT (rs->rs_on_net);

        if (ev->unlinked) {
                /* Last network callback. The net's ref on 'rs' stays put
                 * until ptlrpc_handle_rs() is done with it */
                cfs_spin_lock(&svc->srv_lock);
                cfs_spin_lock(&rs->rs_lock);
                rs->rs_on_net = 0;
                if (!rs->rs_no_ack ||
                    rs->rs_transno <= rs->rs_export->exp_obd->obd_last_committed)
                        ptlrpc_schedule_difficult_reply (rs);
                cfs_spin_unlock(&rs->rs_lock);
                cfs_spin_unlock(&svc->srv_lock);
        }

        EXIT;
}

/*
 * Server's bulk completion callback
 */
void server_bulk_callback (lnet_event_t *ev)
{
        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_UNLINK ||
                 (desc->bd_type == BULK_PUT_SOURCE &&
                  ev->type == LNET_EVENT_ACK) ||
                 (desc->bd_type == BULK_GET_SINK &&
                  ev->type == LNET_EVENT_REPLY));

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, desc %p\n",
               ev->type, ev->status, desc);

        cfs_spin_lock(&desc->bd_lock);

        if ((ev->type == LNET_EVENT_ACK ||
             ev->type == LNET_EVENT_REPLY) &&
            ev->status == 0) {
                /* We heard back from the peer, so even if we get this
                 * before the SENT event (oh yes we can), we know we
                 * read/wrote the peer buffer and how much... */
                desc->bd_success = 1;
                desc->bd_nob_transferred = ev->mlength;
                desc->bd_sender = ev->sender;
        }

        if (ev->unlinked) {
                /* This is the last callback no matter what... */
                desc->bd_network_rw = 0;
                cfs_waitq_signal(&desc->bd_waitq);
        }

        cfs_spin_unlock(&desc->bd_lock);
        EXIT;
}

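/*
 * Dispatch an LNet event to the ptlrpc callback registered in its cb_id.
 * Every MD that ptlrpc posts carries a struct ptlrpc_cb_id in md.user_ptr,
 * so this is the single entry point for all of the callbacks above.
 */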
static void ptlrpc_master_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
        void (*callback)(lnet_event_t *ev) = cbid->cbid_fn;

        /* Honestly, it's best to find out early. */
        LASSERT (cbid->cbid_arg != LP_POISON);
        LASSERT (callback == request_out_callback ||
                 callback == reply_in_callback ||
                 callback == client_bulk_callback ||
                 callback == request_in_callback ||
                 callback == reply_out_callback ||
                 callback == server_bulk_callback);

        callback (ev);
}

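/*
 * Resolve a peer UUID to the closest LNet process id and pick the local
 * NID to send from.  Distance 0 short-circuits to the loopback LND;
 * otherwise the smallest (distance, order) pair wins.  A hypothetical
 * caller might look like:
 *
 *      lnet_process_id_t peer;
 *      lnet_nid_t        self;
 *      if (ptlrpc_uuid_to_peer(uuid, &peer, &self) != 0)
 *              ... no route to that UUID ...
 */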
int ptlrpc_uuid_to_peer (struct obd_uuid *uuid,
                         lnet_process_id_t *peer, lnet_nid_t *self)
{
        int               best_dist = 0;
        __u32             best_order = 0;
        int               count = 0;
        int               rc = -ENOENT;
        int               portals_compatibility;
        int               dist;
        __u32             order;
        lnet_nid_t        dst_nid;
        lnet_nid_t        src_nid;

        portals_compatibility = LNetCtl(IOC_LIBCFS_PORTALS_COMPATIBILITY, NULL);

        peer->pid = LUSTRE_SRV_LNET_PID;

        /* Choose the matching UUID that's closest */
        while (lustre_uuid_to_peer(uuid->uuid, &dst_nid, count++) == 0) {
                dist = LNetDist(dst_nid, &src_nid, &order);
                if (dist < 0)
                        continue;

                if (dist == 0) {                /* local! use loopback LND */
                        peer->nid = *self = LNET_MKNID(LNET_MKNET(LOLND, 0), 0);
                        rc = 0;
                        break;
                }

                if (rc < 0 ||
                    dist < best_dist ||
                    (dist == best_dist && order < best_order)) {
                        best_dist = dist;
                        best_order = order;

                        if (portals_compatibility > 1) {
                                /* Strong portals compatibility: Zero the nid's
                                 * NET, so if I'm reading new config logs, or
                                 * getting configured by (new) lconf I can
                                 * still talk to old servers. */
                                dst_nid = LNET_MKNID(0, LNET_NIDADDR(dst_nid));
                                src_nid = LNET_MKNID(0, LNET_NIDADDR(src_nid));
                        }
                        peer->nid = dst_nid;
                        *self = src_nid;
                        rc = 0;
                }
        }

        CDEBUG(D_NET, "%s->%s\n", uuid->uuid, libcfs_id2str(*peer));
        if (rc != 0)
                CERROR("No NID found for %s\n", uuid->uuid);
        return rc;
}

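/*
 * Tear down the ptlrpc network interface.  LNetEQFree() returns -EBUSY
 * while any posted MD still references the event queue, so retry with a
 * short sleep until the in-flight fire-and-forget messages drain.
 */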
void ptlrpc_ni_fini(void)
{
        cfs_waitq_t         waitq;
        struct l_wait_info  lwi;
        int                 rc;
        int                 retries;

        /* Wait for the event queue to become idle since there may still be
         * messages in flight with pending events (i.e. the fire-and-forget
         * messages == client requests and "non-difficult" server
         * replies) */

        for (retries = 0;; retries++) {
                rc = LNetEQFree(ptlrpc_eq_h);
                switch (rc) {
                default:
                        LBUG();

                case 0:
                        LNetNIFini();
                        return;

                case -EBUSY:
                        if (retries != 0)
                                CWARN("Event queue still busy\n");

                        /* Wait for a bit */
                        cfs_waitq_init(&waitq);
                        lwi = LWI_TIMEOUT(cfs_time_seconds(2), NULL, NULL);
                        l_wait_event(waitq, 0, &lwi);
                        break;
                }
        }
        /* notreached */
}

lnet_pid_t ptl_get_pid(void)
{
        lnet_pid_t        pid;

#ifndef  __KERNEL__
        pid = getpid();
#else
        pid = LUSTRE_SRV_LNET_PID;
#endif
        return pid;
}

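/*
 * Bring up the LNet network interface and allocate the global event
 * queue.  In the kernel the EQ callback fires as events arrive and the
 * queue is only sized for debug history; liblustre has no asynchronous
 * callback, so its queue must be big enough to hold every event until
 * liblustre_check_events() polls it.
 */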
int ptlrpc_ni_init(void)
{
        int              rc;
        lnet_pid_t       pid;

        pid = ptl_get_pid();
        CDEBUG(D_NET, "My pid is: %x\n", pid);

        /* We're not passing any limits yet... */
        rc = LNetNIInit(pid);
        if (rc < 0) {
                CDEBUG (D_NET, "Can't init network interface: %d\n", rc);
                return (-ENOENT);
        }

        /* CAVEAT EMPTOR: how we process portals events is _radically_
         * different depending on... */
#ifdef __KERNEL__
        /* kernel portals calls our master callback when events are added to
         * the event queue.  In fact lustre never pulls events off this queue,
         * so it's only sized for some debug history. */
        rc = LNetEQAlloc(1024, ptlrpc_master_callback, &ptlrpc_eq_h);
#else
        /* liblustre calls the master callback when it removes events from the
         * event queue.  The event queue has to be big enough not to drop
         * anything */
        rc = LNetEQAlloc(10240, LNET_EQ_HANDLER_NONE, &ptlrpc_eq_h);
#endif
        if (rc == 0)
                return 0;

        CERROR ("Failed to allocate event queue: %d\n", rc);
        LNetNIFini();

        return (-ENOMEM);
}

#ifndef __KERNEL__
CFS_LIST_HEAD(liblustre_wait_callbacks);
CFS_LIST_HEAD(liblustre_idle_callbacks);
void *liblustre_services_callback;

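/*
 * Register a callback to be polled while liblustre waits for events.
 * "Wait" callbacks run on every pass of liblustre_wait_event(); "idle"
 * callbacks are consulted by liblustre_wait_idle() to decide whether the
 * client is quiescent.  A hypothetical user might pair the calls like:
 *
 *      void *cb = liblustre_register_wait_callback("my_poller",
 *                                                  my_poll_fn, my_arg);
 *      ...
 *      liblustre_deregister_wait_callback(cb);
 */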
void *
liblustre_register_waitidle_callback (cfs_list_t *callback_list,
                                      const char *name,
                                      int (*fn)(void *arg), void *arg)
{
        struct liblustre_wait_callback *llwc;

        OBD_ALLOC(llwc, sizeof(*llwc));
        LASSERT (llwc != NULL);

        llwc->llwc_name = name;
        llwc->llwc_fn = fn;
        llwc->llwc_arg = arg;
        cfs_list_add_tail(&llwc->llwc_list, callback_list);

        return (llwc);
}

void
liblustre_deregister_waitidle_callback (void *opaque)
{
        struct liblustre_wait_callback *llwc = opaque;

        cfs_list_del(&llwc->llwc_list);
        OBD_FREE(llwc, sizeof(*llwc));
}

void *
liblustre_register_wait_callback (const char *name,
                                  int (*fn)(void *arg), void *arg)
{
        return liblustre_register_waitidle_callback(&liblustre_wait_callbacks,
                                                    name, fn, arg);
}

void
liblustre_deregister_wait_callback (void *opaque)
{
        liblustre_deregister_waitidle_callback(opaque);
}

void *
liblustre_register_idle_callback (const char *name,
                                  int (*fn)(void *arg), void *arg)
{
        return liblustre_register_waitidle_callback(&liblustre_idle_callbacks,
                                                    name, fn, arg);
}

void
liblustre_deregister_idle_callback (void *opaque)
{
        liblustre_deregister_waitidle_callback(opaque);
}

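/*
 * Poll the event queue for at most one event, blocking for up to
 * 'timeout' seconds, and dispatch it through ptlrpc_master_callback().
 * Returns 1 if an event was handled, 0 on timeout.  An EQ overflow is
 * fatal here because liblustre has no other way to learn of the lost
 * event.
 */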
int
liblustre_check_events (int timeout)
{
        lnet_event_t ev;
        int         rc;
        int         i;
        ENTRY;

        rc = LNetEQPoll(&ptlrpc_eq_h, 1, timeout * 1000, &ev, &i);
        if (rc == 0)
                RETURN(0);

        LASSERT (rc == -EOVERFLOW || rc == 1);

        /* liblustre: no asynch callback so we can't afford to miss any
         * events... */
        if (rc == -EOVERFLOW) {
                CERROR ("Dropped an event!!!\n");
                abort();
        }

        ptlrpc_master_callback (&ev);
        RETURN(1);
}

int liblustre_waiting = 0;

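/*
 * Drain all pending events and give every registered wait callback a
 * chance to run.  If nothing happened and 'timeout' is non-zero, block
 * for up to 'timeout' seconds waiting for one further event.  Returns
 * non-zero if any event or callback made progress.
 */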
int
liblustre_wait_event (int timeout)
{
        cfs_list_t                     *tmp;
        struct liblustre_wait_callback *llwc;
        int                             found_something = 0;

        /* single threaded recursion check... */
        liblustre_waiting = 1;

        for (;;) {
                /* Deal with all pending events */
                while (liblustre_check_events(0))
                        found_something = 1;

                /* Give all registered callbacks a bite at the cherry */
                cfs_list_for_each(tmp, &liblustre_wait_callbacks) {
                        llwc = cfs_list_entry(tmp,
                                              struct liblustre_wait_callback,
                                              llwc_list);

                        if (llwc->llwc_fn(llwc->llwc_arg))
                                found_something = 1;
                }

                if (found_something || timeout == 0)
                        break;

                /* Nothing so far, but I'm allowed to block... */
                found_something = liblustre_check_events(timeout);
                if (!found_something)           /* still nothing */
                        break;                  /* I timed out */
        }

        liblustre_waiting = 0;

        return found_something;
}

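/*
 * Spin handling events until every registered idle callback reports the
 * client is idle.  Not re-entrant: the 'recursed' flag asserts that no
 * idle callback winds up back in here.
 */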
void
liblustre_wait_idle(void)
{
        static int recursed = 0;

        cfs_list_t                     *tmp;
        struct liblustre_wait_callback *llwc;
        int                             idle = 0;

        LASSERT(!recursed);
        recursed = 1;

        do {
                liblustre_wait_event(0);

                idle = 1;

                cfs_list_for_each(tmp, &liblustre_idle_callbacks) {
                        llwc = cfs_list_entry(tmp,
                                              struct liblustre_wait_callback,
                                              llwc_list);

                        if (!llwc->llwc_fn(llwc->llwc_arg)) {
                                idle = 0;
                                break;
                        }
                }

        } while (!idle);

        recursed = 0;
}

#endif /* __KERNEL__ */

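/*
 * Module-level entry point: bring up the network interface and event
 * queue, register the liblustre service poller where applicable, and
 * take a reference on ptlrpcd.  On failure, everything set up so far is
 * torn down again.
 */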
int ptlrpc_init_portals(void)
{
        int   rc = ptlrpc_ni_init();

        if (rc != 0) {
                CERROR("network initialisation failed\n");
                return -EIO;
        }
#ifndef __KERNEL__
        liblustre_services_callback =
                liblustre_register_wait_callback("liblustre_check_services",
                                                 &liblustre_check_services,
                                                 NULL);
        cfs_init_completion_module(liblustre_wait_event);
#endif
        rc = ptlrpcd_addref();
        if (rc == 0)
                return 0;

        CERROR("rpcd initialisation failed\n");
#ifndef __KERNEL__
        liblustre_deregister_wait_callback(liblustre_services_callback);
#endif
        ptlrpc_ni_fini();
        return rc;
}

void ptlrpc_exit_portals(void)
{
#ifndef __KERNEL__
        liblustre_deregister_wait_callback(liblustre_services_callback);
#endif
        ptlrpcd_decref();
        ptlrpc_ni_fini();
}