lustre/ptlrpc/events.c (fs/lustre-release.git)
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_RPC

#ifndef __KERNEL__
# include <liblustre.h>
#else
# include <libcfs/libcfs.h>
# ifdef __mips64__
#  include <linux/kernel.h>
# endif
#endif

#include <obd_class.h>
#include <lustre_net.h>
#include <lustre_sec.h>
#include "ptlrpc_internal.h"

lnet_handle_eq_t   ptlrpc_eq_h;

/*
 *  Client's outgoing request callback
 */
void request_out_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
        struct ptlrpc_request *req = cbid->cbid_arg;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->unlinked);

        DEBUG_REQ(D_NET, req, "type %d, status %d", ev->type, ev->status);

        sptlrpc_request_out_callback(req);
        req->rq_real_sent = cfs_time_current_sec();

        if (ev->type == LNET_EVENT_UNLINK || ev->status != 0) {

                /* Failed send: make it seem like the reply timed out, just
                 * like failing sends in client.c does currently...  */

                cfs_spin_lock(&req->rq_lock);
                req->rq_net_err = 1;
                cfs_spin_unlock(&req->rq_lock);

                ptlrpc_client_wake_req(req);
        }

        ptlrpc_req_finished(req);

        EXIT;
}

/*
 * Client's incoming reply callback
 */
void reply_in_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
        struct ptlrpc_request *req = cbid->cbid_arg;
        ENTRY;

        DEBUG_REQ(D_NET, req, "type %d, status %d", ev->type, ev->status);

        LASSERT (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->md.start == req->rq_repbuf);
        LASSERT (ev->offset + ev->mlength <= req->rq_repbuf_len);
        /* We've set LNET_MD_MANAGE_REMOTE for all outgoing requests
           for adaptive timeouts' early reply. */
        LASSERT((ev->md.options & LNET_MD_MANAGE_REMOTE) != 0);

        cfs_spin_lock(&req->rq_lock);

        req->rq_receiving_reply = 0;
        req->rq_early = 0;
        if (ev->unlinked)
                req->rq_must_unlink = 0;

        if (ev->status)
                goto out_wake;

        if (ev->type == LNET_EVENT_UNLINK) {
                LASSERT(ev->unlinked);
                DEBUG_REQ(D_NET, req, "unlink");
                goto out_wake;
        }

        if (ev->mlength < ev->rlength) {
                CDEBUG(D_RPCTRACE, "truncate req %p rpc %d - %d+%d\n", req,
                       req->rq_replen, ev->rlength, ev->offset);
                req->rq_reply_truncate = 1;
                req->rq_replied = 1;
                req->rq_status = -EOVERFLOW;
                req->rq_nob_received = ev->rlength + ev->offset;
                goto out_wake;
        }

        if ((ev->offset == 0) &&
            ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT))) {
                /* Early reply */
                DEBUG_REQ(D_ADAPTTO, req,
                          "Early reply received: mlen=%u offset=%d replen=%d "
                          "replied=%d unlinked=%d", ev->mlength, ev->offset,
                          req->rq_replen, req->rq_replied, ev->unlinked);

                req->rq_early_count++; /* number received, client side */

                if (req->rq_replied)   /* already got the real reply */
                        goto out_wake;

                req->rq_early = 1;
                req->rq_reply_off = ev->offset;
                req->rq_nob_received = ev->mlength;
                /* And we're still receiving */
                req->rq_receiving_reply = 1;
        } else {
                /* Real reply */
                req->rq_rep_swab_mask = 0;
                req->rq_replied = 1;
                req->rq_reply_off = ev->offset;
                req->rq_nob_received = ev->mlength;
                /* LNetMDUnlink can't be called under the LNET_LOCK,
                   so we must unlink in ptlrpc_unregister_reply */
                DEBUG_REQ(D_INFO, req,
                          "reply in flags=%x mlen=%u offset=%d replen=%d",
                          lustre_msg_get_flags(req->rq_reqmsg),
                          ev->mlength, ev->offset, req->rq_replen);
        }

        req->rq_import->imp_last_reply_time = cfs_time_current_sec();

out_wake:
        /* NB don't unlock till after wakeup; req can disappear under us
         * since we don't have our own ref */
        ptlrpc_client_wake_req(req);
        cfs_spin_unlock(&req->rq_lock);
        EXIT;
}

/*
 * Client's bulk has been written/read
 */
void client_bulk_callback (lnet_event_t *ev)
{
        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
        ENTRY;

        LASSERT ((desc->bd_type == BULK_PUT_SINK &&
                  ev->type == LNET_EVENT_PUT) ||
                 (desc->bd_type == BULK_GET_SOURCE &&
                  ev->type == LNET_EVENT_GET) ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->unlinked);

        if (CFS_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_CLIENT_BULK_CB, CFS_FAIL_ONCE))
                ev->status = -EIO;

        if (CFS_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2, CFS_FAIL_ONCE))
                ev->status = -EIO;

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, desc %p\n",
               ev->type, ev->status, desc);

        cfs_spin_lock(&desc->bd_lock);

        LASSERT(desc->bd_network_rw);
        desc->bd_network_rw = 0;

        if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) {
                desc->bd_success = 1;
                desc->bd_nob_transferred = ev->mlength;
                desc->bd_sender = ev->sender;
        }

        /* release the encrypted pages for write */
        if (desc->bd_req->rq_bulk_write)
                sptlrpc_enc_pool_put_pages(desc);

        /* NB don't unlock till after wakeup; desc can disappear under us
         * otherwise */
        ptlrpc_client_wake_req(desc->bd_req);

        cfs_spin_unlock(&desc->bd_lock);
        EXIT;
}

/*
 * Server's incoming request callback
 */
void request_in_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id               *cbid = ev->md.user_ptr;
        struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;
        struct ptlrpc_service             *service = rqbd->rqbd_service;
        struct ptlrpc_request             *req;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_PUT ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT ((char *)ev->md.start >= rqbd->rqbd_buffer);
        LASSERT ((char *)ev->md.start + ev->offset + ev->mlength <=
                 rqbd->rqbd_buffer + service->srv_buf_size);

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, service %s\n",
               ev->type, ev->status, service->srv_name);

        if (ev->unlinked) {
                /* If this is the last request message to fit in the
                 * request buffer we can use the request object embedded in
                 * rqbd.  Note that if we failed to allocate a request,
                 * we'd have to re-post the rqbd, which we can't do in this
                 * context. */
                req = &rqbd->rqbd_req;
                memset(req, 0, sizeof (*req));
        } else {
                LASSERT (ev->type == LNET_EVENT_PUT);
                if (ev->status != 0) {
                        /* We moaned above already... */
                        return;
                }
                OBD_ALLOC_GFP(req, sizeof(*req), CFS_ALLOC_ATOMIC_TRY);
                if (req == NULL) {
                        CERROR("Can't allocate incoming request descriptor: "
                               "Dropping %s RPC from %s\n",
                               service->srv_name,
                               libcfs_id2str(ev->initiator));
                        return;
                }
        }

        /* NB we ABSOLUTELY RELY on req being zeroed, so pointers are NULL,
         * flags are reset and scalars are zero.  We only set the message
         * size to non-zero if this was a successful receive. */
        req->rq_xid = ev->match_bits;
        req->rq_reqbuf = ev->md.start + ev->offset;
        if (ev->type == LNET_EVENT_PUT && ev->status == 0)
                req->rq_reqdata_len = ev->mlength;
        cfs_gettimeofday(&req->rq_arrival_time);
        req->rq_peer = ev->initiator;
        req->rq_self = ev->target.nid;
        req->rq_rqbd = rqbd;
        req->rq_phase = RQ_PHASE_NEW;
#ifdef CRAY_XT3
        req->rq_uid = ev->uid;
#endif
        cfs_spin_lock_init(&req->rq_lock);
        CFS_INIT_LIST_HEAD(&req->rq_timed_list);
        cfs_atomic_set(&req->rq_refcount, 1);
        if (ev->type == LNET_EVENT_PUT)
                CDEBUG(D_INFO, "incoming req@%p x"LPU64" msgsize %u\n",
                       req, req->rq_xid, ev->mlength);

        CDEBUG(D_RPCTRACE, "peer: %s\n", libcfs_id2str(req->rq_peer));

        cfs_spin_lock(&service->srv_lock);

        req->rq_history_seq = service->srv_request_seq++;
        cfs_list_add_tail(&req->rq_history_list, &service->srv_request_history);

        if (ev->unlinked) {
                service->srv_nrqbd_receiving--;
                CDEBUG(D_INFO, "Buffer complete: %d buffers still posted\n",
                       service->srv_nrqbd_receiving);

                /* Normally, don't complain about 0 buffers posted; LNET won't
                 * drop incoming reqs since we set the portal lazy */
                if (test_req_buffer_pressure &&
                    ev->type != LNET_EVENT_UNLINK &&
                    service->srv_nrqbd_receiving == 0)
                        CWARN("All %s request buffers busy\n",
                              service->srv_name);

                /* req takes over the network's ref on rqbd */
        } else {
                /* req takes a ref on rqbd */
                rqbd->rqbd_refcount++;
        }

        cfs_list_add_tail(&req->rq_list, &service->srv_req_in_queue);
        service->srv_n_queued_reqs++;

        /* NB everything can disappear under us once the request
         * has been queued and we unlock, so do the wake now... */
        cfs_waitq_signal(&service->srv_waitq);

        cfs_spin_unlock(&service->srv_lock);
        EXIT;
}

/*
 *  Server's outgoing reply callback
 */
void reply_out_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id       *cbid = ev->md.user_ptr;
        struct ptlrpc_reply_state *rs = cbid->cbid_arg;
        struct ptlrpc_service     *svc = rs->rs_service;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_ACK ||
                 ev->type == LNET_EVENT_UNLINK);

        if (!rs->rs_difficult) {
                /* 'Easy' replies have no further processing so I drop the
                 * net's ref on 'rs' */
                LASSERT (ev->unlinked);
                ptlrpc_rs_decref(rs);
                EXIT;
                return;
        }

        LASSERT (rs->rs_on_net);

        if (ev->unlinked) {
                /* Last network callback. The net's ref on 'rs' stays put
                 * until ptlrpc_handle_rs() is done with it */
                cfs_spin_lock(&svc->srv_rs_lock);
                cfs_spin_lock(&rs->rs_lock);
                rs->rs_on_net = 0;
                if (!rs->rs_no_ack ||
                    rs->rs_transno <= rs->rs_export->exp_obd->obd_last_committed)
                        ptlrpc_schedule_difficult_reply (rs);
                cfs_spin_unlock(&rs->rs_lock);
                cfs_spin_unlock(&svc->srv_rs_lock);
        }

        EXIT;
}

#ifdef HAVE_SERVER_SUPPORT
/*
 * Server's bulk completion callback
 */
void server_bulk_callback (lnet_event_t *ev)
{
        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_UNLINK ||
                 (desc->bd_type == BULK_PUT_SOURCE &&
                  ev->type == LNET_EVENT_ACK) ||
                 (desc->bd_type == BULK_GET_SINK &&
                  ev->type == LNET_EVENT_REPLY));

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, desc %p\n",
               ev->type, ev->status, desc);

        cfs_spin_lock(&desc->bd_lock);

        if ((ev->type == LNET_EVENT_ACK ||
             ev->type == LNET_EVENT_REPLY) &&
            ev->status == 0) {
                /* We heard back from the peer, so even if we get this
                 * before the SENT event (oh yes we can), we know we
                 * read/wrote the peer buffer and how much... */
                desc->bd_success = 1;
                desc->bd_nob_transferred = ev->mlength;
                desc->bd_sender = ev->sender;
        }

        if (ev->unlinked) {
                /* This is the last callback no matter what... */
                desc->bd_network_rw = 0;
                cfs_waitq_signal(&desc->bd_waitq);
        }

        cfs_spin_unlock(&desc->bd_lock);
        EXIT;
}
#endif

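/*
 * Master event handler: every MD that ptlrpc posts stores a ptlrpc_cb_id
 * in its user pointer, identifying the per-message callback (one of the
 * handlers above) that should process the event.
 */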
static void ptlrpc_master_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
        void (*callback)(lnet_event_t *ev) = cbid->cbid_fn;

        /* Honestly, it's best to find out early. */
        LASSERT (cbid->cbid_arg != LP_POISON);
        LASSERT (callback == request_out_callback ||
                 callback == reply_in_callback ||
                 callback == client_bulk_callback ||
                 callback == request_in_callback ||
                 callback == reply_out_callback
#ifdef HAVE_SERVER_SUPPORT
                 || callback == server_bulk_callback
#endif
                 );

        callback (ev);
}

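/*
 * Map a server UUID onto the "closest" of its LNET NIDs: walk every NID
 * registered for the UUID, preferring the smallest LNetDist() distance
 * (ties broken on the order LNetDist() returns), and short-circuit to
 * the loopback LND when the target turns out to be local.
 */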
int ptlrpc_uuid_to_peer (struct obd_uuid *uuid,
                         lnet_process_id_t *peer, lnet_nid_t *self)
{
        int               best_dist = 0;
        __u32             best_order = 0;
        int               count = 0;
        int               rc = -ENOENT;
        int               portals_compatibility;
        int               dist;
        __u32             order;
        lnet_nid_t        dst_nid;
        lnet_nid_t        src_nid;

        portals_compatibility = LNetCtl(IOC_LIBCFS_PORTALS_COMPATIBILITY, NULL);

        peer->pid = LUSTRE_SRV_LNET_PID;

        /* Choose the matching UUID that's closest */
        while (lustre_uuid_to_peer(uuid->uuid, &dst_nid, count++) == 0) {
                dist = LNetDist(dst_nid, &src_nid, &order);
                if (dist < 0)
                        continue;

                if (dist == 0) {                /* local! use loopback LND */
                        peer->nid = *self = LNET_MKNID(LNET_MKNET(LOLND, 0), 0);
                        rc = 0;
                        break;
                }

                if (rc < 0 ||
                    dist < best_dist ||
                    (dist == best_dist && order < best_order)) {
                        best_dist = dist;
                        best_order = order;

                        if (portals_compatibility > 1) {
                                /* Strong portals compatibility: Zero the nid's
                                 * NET, so if I'm reading new config logs, or
                                 * getting configured by (new) lconf I can
                                 * still talk to old servers. */
                                dst_nid = LNET_MKNID(0, LNET_NIDADDR(dst_nid));
                                src_nid = LNET_MKNID(0, LNET_NIDADDR(src_nid));
                        }
                        peer->nid = dst_nid;
                        *self = src_nid;
                        rc = 0;
                }
        }

        CDEBUG(D_NET, "%s->%s\n", uuid->uuid, libcfs_id2str(*peer));
        return rc;
}

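/*
 * Shut networking down: free the event queue, retrying every two seconds
 * while fire-and-forget messages still have events pending, then
 * finalise LNET itself.
 */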
void ptlrpc_ni_fini(void)
{
        cfs_waitq_t         waitq;
        struct l_wait_info  lwi;
        int                 rc;
        int                 retries;

        /* Wait for the event queue to become idle since there may still be
         * messages in flight with pending events (i.e. the fire-and-forget
         * messages == client requests and "non-difficult" server
         * replies) */

        for (retries = 0;; retries++) {
                rc = LNetEQFree(ptlrpc_eq_h);
                switch (rc) {
                default:
                        LBUG();

                case 0:
                        LNetNIFini();
                        return;

                case -EBUSY:
                        if (retries != 0)
                                CWARN("Event queue still busy\n");

                        /* Wait for a bit */
                        cfs_waitq_init(&waitq);
                        lwi = LWI_TIMEOUT(cfs_time_seconds(2), NULL, NULL);
                        l_wait_event(waitq, 0, &lwi);
                        break;
                }
        }
        /* notreached */
}

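/* The LNET pid to run under: the well-known server pid in the kernel,
 * the process pid for liblustre. */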
lnet_pid_t ptl_get_pid(void)
{
        lnet_pid_t        pid;

#ifndef __KERNEL__
        pid = getpid();
#else
        pid = LUSTRE_SRV_LNET_PID;
#endif
        return pid;
}

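/*
 * Bring LNET up and allocate the single event queue through which all
 * ptlrpc network events are delivered (see ptlrpc_master_callback above).
 */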
int ptlrpc_ni_init(void)
{
        int              rc;
        lnet_pid_t       pid;

        pid = ptl_get_pid();
        CDEBUG(D_NET, "My pid is: %x\n", pid);

        /* We're not passing any limits yet... */
        rc = LNetNIInit(pid);
        if (rc < 0) {
                CDEBUG (D_NET, "Can't init network interface: %d\n", rc);
                return (-ENOENT);
        }

        /* CAVEAT EMPTOR: how we process portals events is _radically_
         * different depending on... */
#ifdef __KERNEL__
        /* kernel portals calls our master callback when events are added to
         * the event queue.  In fact lustre never pulls events off this queue,
         * so it's only sized for some debug history. */
        rc = LNetEQAlloc(1024, ptlrpc_master_callback, &ptlrpc_eq_h);
#else
        /* liblustre calls the master callback when it removes events from the
         * event queue.  The event queue has to be big enough not to drop
         * anything */
        rc = LNetEQAlloc(10240, LNET_EQ_HANDLER_NONE, &ptlrpc_eq_h);
#endif
        if (rc == 0)
                return 0;

        CERROR ("Failed to allocate event queue: %d\n", rc);
        LNetNIFini();

        return (-ENOMEM);
}

#ifndef __KERNEL__
CFS_LIST_HEAD(liblustre_wait_callbacks);
CFS_LIST_HEAD(liblustre_idle_callbacks);
void *liblustre_services_callback;

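/* Allocate a callback record and append it to the given wait or idle
 * list; the registered function is polled by liblustre_wait_event() and
 * liblustre_wait_idle() below. */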
void *
liblustre_register_waitidle_callback (cfs_list_t *callback_list,
                                      const char *name,
                                      int (*fn)(void *arg), void *arg)
{
        struct liblustre_wait_callback *llwc;

        OBD_ALLOC(llwc, sizeof(*llwc));
        LASSERT (llwc != NULL);

        llwc->llwc_name = name;
        llwc->llwc_fn = fn;
        llwc->llwc_arg = arg;
        cfs_list_add_tail(&llwc->llwc_list, callback_list);

        return (llwc);
}

void
liblustre_deregister_waitidle_callback (void *opaque)
{
        struct liblustre_wait_callback *llwc = opaque;

        cfs_list_del(&llwc->llwc_list);
        OBD_FREE(llwc, sizeof(*llwc));
}

void *
liblustre_register_wait_callback (const char *name,
                                  int (*fn)(void *arg), void *arg)
{
        return liblustre_register_waitidle_callback(&liblustre_wait_callbacks,
                                                    name, fn, arg);
}

void
liblustre_deregister_wait_callback (void *opaque)
{
        liblustre_deregister_waitidle_callback(opaque);
}

void *
liblustre_register_idle_callback (const char *name,
                                  int (*fn)(void *arg), void *arg)
{
        return liblustre_register_waitidle_callback(&liblustre_idle_callbacks,
                                                    name, fn, arg);
}

void
liblustre_deregister_idle_callback (void *opaque)
{
        liblustre_deregister_waitidle_callback(opaque);
}

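/* Poll the event queue for up to 'timeout' seconds and dispatch a single
 * event through ptlrpc_master_callback().  An overflowed queue is fatal:
 * with no asynchronous handler, a dropped event can never be recovered. */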
int
liblustre_check_events (int timeout)
{
        lnet_event_t ev;
        int         rc;
        int         i;
        ENTRY;

        rc = LNetEQPoll(&ptlrpc_eq_h, 1, timeout * 1000, &ev, &i);
        if (rc == 0)
                RETURN(0);

        LASSERT (rc == -EOVERFLOW || rc == 1);

        /* liblustre: no asynch callback so we can't afford to miss any
         * events... */
        if (rc == -EOVERFLOW) {
                CERROR ("Dropped an event!!!\n");
                abort();
        }

        ptlrpc_master_callback (&ev);
        RETURN(1);
}

int liblustre_waiting = 0;

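/* Handle all pending events and poll the registered wait callbacks,
 * blocking for up to 'timeout' seconds if nothing has happened yet.
 * Returns non-zero if any progress was made. */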
int
liblustre_wait_event (int timeout)
{
        cfs_list_t                     *tmp;
        struct liblustre_wait_callback *llwc;
        int                             found_something = 0;

        /* single threaded recursion check... */
        liblustre_waiting = 1;

        for (;;) {
                /* Deal with all pending events */
                while (liblustre_check_events(0))
                        found_something = 1;

                /* Give all registered callbacks a bite at the cherry */
                cfs_list_for_each(tmp, &liblustre_wait_callbacks) {
                        llwc = cfs_list_entry(tmp,
                                              struct liblustre_wait_callback,
                                              llwc_list);

                        if (llwc->llwc_fn(llwc->llwc_arg))
                                found_something = 1;
                }

                if (found_something || timeout == 0)
                        break;

                /* Nothing so far, but I'm allowed to block... */
                found_something = liblustre_check_events(timeout);
                if (!found_something)           /* still nothing */
                        break;                  /* I timed out */
        }

        liblustre_waiting = 0;

        return found_something;
}

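/* Keep handling events until every registered idle callback reports
 * idle. */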
void
liblustre_wait_idle(void)
{
        static int recursed = 0;

        cfs_list_t                     *tmp;
        struct liblustre_wait_callback *llwc;
        int                             idle = 0;

        LASSERT(!recursed);
        recursed = 1;

        do {
                liblustre_wait_event(0);

                idle = 1;

                cfs_list_for_each(tmp, &liblustre_idle_callbacks) {
                        llwc = cfs_list_entry(tmp,
                                              struct liblustre_wait_callback,
                                              llwc_list);

                        if (!llwc->llwc_fn(llwc->llwc_arg)) {
                                idle = 0;
                                break;
                        }
                }

        } while (!idle);

        recursed = 0;
}

#endif /* !__KERNEL__ */

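/* Module setup: bring up networking and take a reference on the ptlrpcd
 * threads; the liblustre build also registers its service-polling
 * callback here. */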
int ptlrpc_init_portals(void)
{
        int   rc = ptlrpc_ni_init();

        if (rc != 0) {
                CERROR("network initialisation failed\n");
                return -EIO;
        }
#ifndef __KERNEL__
        liblustre_services_callback =
                liblustre_register_wait_callback("liblustre_check_services",
                                                 &liblustre_check_services,
                                                 NULL);
        cfs_init_completion_module(liblustre_wait_event);
#endif
        rc = ptlrpcd_addref();
        if (rc == 0)
                return 0;

        CERROR("rpcd initialisation failed\n");
#ifndef __KERNEL__
        liblustre_deregister_wait_callback(liblustre_services_callback);
#endif
        ptlrpc_ni_fini();
        return rc;
}

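/* Undo ptlrpc_init_portals(): drop the ptlrpcd reference and finalise
 * networking. */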
void ptlrpc_exit_portals(void)
{
#ifndef __KERNEL__
        liblustre_deregister_wait_callback(liblustre_services_callback);
#endif
        ptlrpcd_decref();
        ptlrpc_ni_fini();
}