lustre/ptlrpc/events.c (fs/lustre-release.git, LU-56 lnet: allow to create EQ with zero eq_size)
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_RPC

#ifndef __KERNEL__
# include <liblustre.h>
#else
# include <libcfs/libcfs.h>
# ifdef __mips64__
#  include <linux/kernel.h>
# endif
#endif

#include <obd_class.h>
#include <lustre_net.h>
#include <lustre_sec.h>
#include "ptlrpc_internal.h"

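/* The single LNet event queue used by all of ptlrpc: allocated in
 * ptlrpc_ni_init() with ptlrpc_master_callback() as its handler and
 * released again in ptlrpc_ni_fini(). */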
lnet_handle_eq_t   ptlrpc_eq_h;

/*
 *  Client's outgoing request callback
 */
void request_out_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
        struct ptlrpc_request *req = cbid->cbid_arg;
        ENTRY;

        LASSERT(ev->type == LNET_EVENT_SEND ||
                ev->type == LNET_EVENT_UNLINK);
        LASSERT(ev->unlinked);

        DEBUG_REQ(D_NET, req, "type %d, status %d", ev->type, ev->status);

        sptlrpc_request_out_callback(req);
        req->rq_real_sent = cfs_time_current_sec();

        if (ev->type == LNET_EVENT_UNLINK || ev->status != 0) {

                /* Failed send: make it seem like the reply timed out, just
                 * like failing sends in client.c does currently...  */

                cfs_spin_lock(&req->rq_lock);
                req->rq_net_err = 1;
                cfs_spin_unlock(&req->rq_lock);

                ptlrpc_client_wake_req(req);
        }

        ptlrpc_req_finished(req);

        EXIT;
}

/*
 * Client's incoming reply callback
 */
void reply_in_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
        struct ptlrpc_request *req = cbid->cbid_arg;
        ENTRY;

        DEBUG_REQ(D_NET, req, "type %d, status %d", ev->type, ev->status);

        LASSERT(ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_UNLINK);
        LASSERT(ev->md.start == req->rq_repbuf);
        LASSERT(ev->offset + ev->mlength <= req->rq_repbuf_len);
        /* We've set LNET_MD_MANAGE_REMOTE for all outgoing requests
           for adaptive timeouts' early reply. */
        LASSERT((ev->md.options & LNET_MD_MANAGE_REMOTE) != 0);

        cfs_spin_lock(&req->rq_lock);

        req->rq_receiving_reply = 0;
        req->rq_early = 0;
        if (ev->unlinked)
                req->rq_must_unlink = 0;

        if (ev->status)
                goto out_wake;

        if (ev->type == LNET_EVENT_UNLINK) {
                LASSERT(ev->unlinked);
                DEBUG_REQ(D_NET, req, "unlink");
                goto out_wake;
        }

        if (ev->mlength < ev->rlength) {
                CDEBUG(D_RPCTRACE, "truncate req %p rpc %d - %d+%d\n", req,
                       req->rq_replen, ev->rlength, ev->offset);
                req->rq_reply_truncate = 1;
                req->rq_replied = 1;
                req->rq_status = -EOVERFLOW;
                req->rq_nob_received = ev->rlength + ev->offset;
                goto out_wake;
        }

        if ((ev->offset == 0) &&
            (lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT)) {
                /* Early reply */
                DEBUG_REQ(D_ADAPTTO, req,
                          "Early reply received: mlen=%u offset=%d replen=%d "
                          "replied=%d unlinked=%d", ev->mlength, ev->offset,
                          req->rq_replen, req->rq_replied, ev->unlinked);

                req->rq_early_count++; /* number received, client side */

                if (req->rq_replied)   /* already got the real reply */
                        goto out_wake;

                req->rq_early = 1;
                req->rq_reply_off = ev->offset;
                req->rq_nob_received = ev->mlength;
                /* And we're still receiving */
                req->rq_receiving_reply = 1;
        } else {
                /* Real reply */
                req->rq_rep_swab_mask = 0;
                req->rq_replied = 1;
                req->rq_reply_off = ev->offset;
                req->rq_nob_received = ev->mlength;
                /* LNetMDUnlink can't be called under the LNET_LOCK,
                   so we must unlink in ptlrpc_unregister_reply */
                DEBUG_REQ(D_INFO, req,
                          "reply in flags=%x mlen=%u offset=%d replen=%d",
                          lustre_msg_get_flags(req->rq_reqmsg),
                          ev->mlength, ev->offset, req->rq_replen);
        }

        req->rq_import->imp_last_reply_time = cfs_time_current_sec();

out_wake:
        /* NB don't unlock till after wakeup; req can disappear under us
         * since we don't have our own ref */
        ptlrpc_client_wake_req(req);
        cfs_spin_unlock(&req->rq_lock);
        EXIT;
}

/*
 * Client's bulk has been written/read
 */
void client_bulk_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
        ENTRY;

        LASSERT((desc->bd_type == BULK_PUT_SINK &&
                 ev->type == LNET_EVENT_PUT) ||
                (desc->bd_type == BULK_GET_SOURCE &&
                 ev->type == LNET_EVENT_GET) ||
                ev->type == LNET_EVENT_UNLINK);
        LASSERT(ev->unlinked);

        if (CFS_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_CLIENT_BULK_CB, CFS_FAIL_ONCE))
                ev->status = -EIO;

        if (CFS_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2,CFS_FAIL_ONCE))
                ev->status = -EIO;

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, desc %p\n",
               ev->type, ev->status, desc);

        cfs_spin_lock(&desc->bd_lock);

        LASSERT(desc->bd_network_rw);
        desc->bd_network_rw = 0;

        if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) {
                desc->bd_success = 1;
                desc->bd_nob_transferred = ev->mlength;
                desc->bd_sender = ev->sender;
        }

        /* release the encrypted pages for write */
        if (desc->bd_req->rq_bulk_write)
                sptlrpc_enc_pool_put_pages(desc);

        /* NB don't unlock till after wakeup; desc can disappear under us
         * otherwise */
        ptlrpc_client_wake_req(desc->bd_req);

        cfs_spin_unlock(&desc->bd_lock);
        EXIT;
}

/*
 * Server's incoming request callback
 */
void request_in_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id               *cbid = ev->md.user_ptr;
        struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;
        struct ptlrpc_service             *service = rqbd->rqbd_service;
        struct ptlrpc_request             *req;
        ENTRY;

        LASSERT(ev->type == LNET_EVENT_PUT ||
                ev->type == LNET_EVENT_UNLINK);
        LASSERT((char *)ev->md.start >= rqbd->rqbd_buffer);
        LASSERT((char *)ev->md.start + ev->offset + ev->mlength <=
                rqbd->rqbd_buffer + service->srv_buf_size);

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, service %s\n",
               ev->type, ev->status, service->srv_name);

        if (ev->unlinked) {
                /* If this is the last request message to fit in the
                 * request buffer we can use the request object embedded in
                 * rqbd.  Note that if we failed to allocate a request,
                 * we'd have to re-post the rqbd, which we can't do in this
                 * context. */
                req = &rqbd->rqbd_req;
                memset(req, 0, sizeof(*req));
        } else {
                LASSERT(ev->type == LNET_EVENT_PUT);
                if (ev->status != 0) {
                        /* We moaned above already... */
                        return;
                }
                OBD_ALLOC_GFP(req, sizeof(*req), CFS_ALLOC_ATOMIC_TRY);
                if (req == NULL) {
                        CERROR("Can't allocate incoming request descriptor: "
                               "Dropping %s RPC from %s\n",
                               service->srv_name,
                               libcfs_id2str(ev->initiator));
                        return;
                }
        }

        /* NB we ABSOLUTELY RELY on req being zeroed, so pointers are NULL,
         * flags are reset and scalars are zero.  We only set the message
         * size to non-zero if this was a successful receive. */
        req->rq_xid = ev->match_bits;
        req->rq_reqbuf = ev->md.start + ev->offset;
        if (ev->type == LNET_EVENT_PUT && ev->status == 0)
                req->rq_reqdata_len = ev->mlength;
        cfs_gettimeofday(&req->rq_arrival_time);
        req->rq_peer = ev->initiator;
        req->rq_self = ev->target.nid;
        req->rq_rqbd = rqbd;
        req->rq_phase = RQ_PHASE_NEW;
#ifdef CRAY_XT3
        req->rq_uid = ev->uid;
#endif
        cfs_spin_lock_init(&req->rq_lock);
        CFS_INIT_LIST_HEAD(&req->rq_timed_list);
        cfs_atomic_set(&req->rq_refcount, 1);
        if (ev->type == LNET_EVENT_PUT)
                CDEBUG(D_INFO, "incoming req@%p x"LPU64" msgsize %u\n",
                       req, req->rq_xid, ev->mlength);

        CDEBUG(D_RPCTRACE, "peer: %s\n", libcfs_id2str(req->rq_peer));

        cfs_spin_lock(&service->srv_lock);

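        /* Record the request in this service's history; rq_history_seq
         * gives it a per-service sequence number so it can be identified
         * in the request history later. */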
        req->rq_history_seq = service->srv_request_seq++;
        cfs_list_add_tail(&req->rq_history_list, &service->srv_request_history);

        if (ev->unlinked) {
                service->srv_nrqbd_receiving--;
                CDEBUG(D_INFO, "Buffer complete: %d buffers still posted\n",
                       service->srv_nrqbd_receiving);

                /* Normally, don't complain about 0 buffers posted; LNET won't
                 * drop incoming reqs since we set the portal lazy */
                if (test_req_buffer_pressure &&
                    ev->type != LNET_EVENT_UNLINK &&
                    service->srv_nrqbd_receiving == 0)
                        CWARN("All %s request buffers busy\n",
                              service->srv_name);

                /* req takes over the network's ref on rqbd */
        } else {
                /* req takes a ref on rqbd */
                rqbd->rqbd_refcount++;
        }

        cfs_list_add_tail(&req->rq_list, &service->srv_req_in_queue);
        service->srv_n_queued_reqs++;

        /* NB everything can disappear under us once the request
         * has been queued and we unlock, so do the wake now... */
        cfs_waitq_signal(&service->srv_waitq);

        cfs_spin_unlock(&service->srv_lock);
        EXIT;
}

/*
 *  Server's outgoing reply callback
 */
void reply_out_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id       *cbid = ev->md.user_ptr;
        struct ptlrpc_reply_state *rs = cbid->cbid_arg;
        struct ptlrpc_service     *svc = rs->rs_service;
        ENTRY;

        LASSERT(ev->type == LNET_EVENT_SEND ||
                ev->type == LNET_EVENT_ACK ||
                ev->type == LNET_EVENT_UNLINK);

        if (!rs->rs_difficult) {
                /* 'Easy' replies have no further processing so I drop the
                 * net's ref on 'rs' */
                LASSERT(ev->unlinked);
                ptlrpc_rs_decref(rs);
                EXIT;
                return;
        }

        LASSERT(rs->rs_on_net);

        if (ev->unlinked) {
                /* Last network callback. The net's ref on 'rs' stays put
                 * until ptlrpc_handle_rs() is done with it */
                cfs_spin_lock(&svc->srv_rs_lock);
                cfs_spin_lock(&rs->rs_lock);
                rs->rs_on_net = 0;
                if (!rs->rs_no_ack ||
                    rs->rs_transno <= rs->rs_export->exp_obd->obd_last_committed)
                        ptlrpc_schedule_difficult_reply(rs);
                cfs_spin_unlock(&rs->rs_lock);
                cfs_spin_unlock(&svc->srv_rs_lock);
        }

        EXIT;
}

#ifdef HAVE_SERVER_SUPPORT
/*
 * Server's bulk completion callback
 */
void server_bulk_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
        ENTRY;

        LASSERT(ev->type == LNET_EVENT_SEND ||
                ev->type == LNET_EVENT_UNLINK ||
                (desc->bd_type == BULK_PUT_SOURCE &&
                 ev->type == LNET_EVENT_ACK) ||
                (desc->bd_type == BULK_GET_SINK &&
                 ev->type == LNET_EVENT_REPLY));

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, desc %p\n",
               ev->type, ev->status, desc);

        cfs_spin_lock(&desc->bd_lock);

        if ((ev->type == LNET_EVENT_ACK ||
             ev->type == LNET_EVENT_REPLY) &&
            ev->status == 0) {
                /* We heard back from the peer, so even if we get this
                 * before the SENT event (oh yes we can), we know we
                 * read/wrote the peer buffer and how much... */
                desc->bd_success = 1;
                desc->bd_nob_transferred = ev->mlength;
                desc->bd_sender = ev->sender;
        }

        if (ev->unlinked) {
                /* This is the last callback no matter what... */
                desc->bd_network_rw = 0;
                cfs_waitq_signal(&desc->bd_waitq);
        }

        cfs_spin_unlock(&desc->bd_lock);
        EXIT;
}
#endif

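/* Every MD that ptlrpc hands to LNet carries a struct ptlrpc_cb_id in its
 * user_ptr, pairing one of the handlers above with its argument.  As an
 * illustrative sketch (the actual wiring lives with the code that attaches
 * each MD), a sender sets up roughly:
 *
 *      cbid->cbid_fn  = request_out_callback;
 *      cbid->cbid_arg = req;
 *      md.user_ptr    = cbid;
 *
 * so the master callback below can fan each event out to its handler. */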
static void ptlrpc_master_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
        void (*callback)(lnet_event_t *ev) = cbid->cbid_fn;

        /* Honestly, it's best to find out early. */
        LASSERT(cbid->cbid_arg != LP_POISON);
        LASSERT(callback == request_out_callback ||
                callback == reply_in_callback ||
                callback == client_bulk_callback ||
                callback == request_in_callback ||
                callback == reply_out_callback
#ifdef HAVE_SERVER_SUPPORT
                || callback == server_bulk_callback
#endif
                );

        callback(ev);
}

int ptlrpc_uuid_to_peer(struct obd_uuid *uuid,
                        lnet_process_id_t *peer, lnet_nid_t *self)
{
        int               best_dist = 0;
        __u32             best_order = 0;
        int               count = 0;
        int               rc = -ENOENT;
        int               portals_compatibility;
        int               dist;
        __u32             order;
        lnet_nid_t        dst_nid;
        lnet_nid_t        src_nid;

        portals_compatibility = LNetCtl(IOC_LIBCFS_PORTALS_COMPATIBILITY, NULL);

        peer->pid = LUSTRE_SRV_LNET_PID;

        /* Choose the matching UUID that's closest */
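        /* LNetDist() reports how far dst_nid is: negative on error, zero
         * when the NID is local, otherwise the distance, with 'order'
         * ranking equally distant interfaces. */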
        while (lustre_uuid_to_peer(uuid->uuid, &dst_nid, count++) == 0) {
                dist = LNetDist(dst_nid, &src_nid, &order);
                if (dist < 0)
                        continue;

                if (dist == 0) {                /* local! use loopback LND */
                        peer->nid = *self = LNET_MKNID(LNET_MKNET(LOLND, 0), 0);
                        rc = 0;
                        break;
                }

                if (rc < 0 ||
                    dist < best_dist ||
                    (dist == best_dist && order < best_order)) {
                        best_dist = dist;
                        best_order = order;

                        if (portals_compatibility > 1) {
                                /* Strong portals compatibility: Zero the nid's
                                 * NET, so if I'm reading new config logs, or
                                 * getting configured by (new) lconf I can
                                 * still talk to old servers. */
                                dst_nid = LNET_MKNID(0, LNET_NIDADDR(dst_nid));
                                src_nid = LNET_MKNID(0, LNET_NIDADDR(src_nid));
                        }
                        peer->nid = dst_nid;
                        *self = src_nid;
                        rc = 0;
                }
        }

        CDEBUG(D_NET, "%s->%s\n", uuid->uuid, libcfs_id2str(*peer));
        return rc;
}

void ptlrpc_ni_fini(void)
{
        cfs_waitq_t         waitq;
        struct l_wait_info  lwi;
        int                 rc;
        int                 retries;

        /* Wait for the event queue to become idle since there may still be
         * messages in flight with pending events (i.e. the fire-and-forget
         * messages == client requests and "non-difficult" server
         * replies) */

        for (retries = 0;; retries++) {
                rc = LNetEQFree(ptlrpc_eq_h);
                switch (rc) {
                default:
                        LBUG();

                case 0:
                        LNetNIFini();
                        return;

                case -EBUSY:
                        if (retries != 0)
                                CWARN("Event queue still busy\n");

                        /* Wait for a bit */
                        cfs_waitq_init(&waitq);
                        lwi = LWI_TIMEOUT(cfs_time_seconds(2), NULL, NULL);
                        l_wait_event(waitq, 0, &lwi);
                        break;
                }
        }
        /* notreached */
}

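/* Pick the LNet PID to bind: liblustre runs in user space and uses its
 * process id, while the kernel always binds the well-known server PID. */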
lnet_pid_t ptl_get_pid(void)
{
        lnet_pid_t        pid;

#ifndef  __KERNEL__
        pid = getpid();
#else
        pid = LUSTRE_SRV_LNET_PID;
#endif
        return pid;
}

int ptlrpc_ni_init(void)
{
        int              rc;
        lnet_pid_t       pid;

        pid = ptl_get_pid();
        CDEBUG(D_NET, "My pid is: %x\n", pid);

        /* We're not passing any limits yet... */
        rc = LNetNIInit(pid);
        if (rc < 0) {
                CDEBUG(D_NET, "Can't init network interface: %d\n", rc);
                return (-ENOENT);
        }

        /* CAVEAT EMPTOR: how we process portals events is _radically_
         * different depending on... */
#ifdef __KERNEL__
        /* Kernel LNet calls our master callback when there are new events,
         * so we are guaranteed to get every event via callback.  Set the EQ
         * size to 0 to avoid the overhead of serializing enqueue/dequeue
         * operations in LNet. */
        rc = LNetEQAlloc(0, ptlrpc_master_callback, &ptlrpc_eq_h);
#else
        /* liblustre calls the master callback when it removes events from the
         * event queue.  The event queue has to be big enough not to drop
         * anything */
        rc = LNetEQAlloc(10240, LNET_EQ_HANDLER_NONE, &ptlrpc_eq_h);
#endif
        if (rc == 0)
                return 0;

        CERROR("Failed to allocate event queue: %d\n", rc);
        LNetNIFini();

        return (-ENOMEM);
}
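
/* NB: passing eq_size == 0 to LNetEQAlloc() above relies on LNet accepting
 * a zero-sized EQ for callback-only consumers; that is the LU-56 change
 * ("allow to create EQ with zero eq_size") this revision corresponds to. */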

#ifndef __KERNEL__
CFS_LIST_HEAD(liblustre_wait_callbacks);
CFS_LIST_HEAD(liblustre_idle_callbacks);
void *liblustre_services_callback;

void *
liblustre_register_waitidle_callback(cfs_list_t *callback_list,
                                     const char *name,
                                     int (*fn)(void *arg), void *arg)
{
        struct liblustre_wait_callback *llwc;

        OBD_ALLOC(llwc, sizeof(*llwc));
        LASSERT(llwc != NULL);

        llwc->llwc_name = name;
        llwc->llwc_fn = fn;
        llwc->llwc_arg = arg;
        cfs_list_add_tail(&llwc->llwc_list, callback_list);

        return (llwc);
}

void
liblustre_deregister_waitidle_callback(void *opaque)
{
        struct liblustre_wait_callback *llwc = opaque;

        cfs_list_del(&llwc->llwc_list);
        OBD_FREE(llwc, sizeof(*llwc));
}

void *
liblustre_register_wait_callback(const char *name,
                                 int (*fn)(void *arg), void *arg)
{
        return liblustre_register_waitidle_callback(&liblustre_wait_callbacks,
                                                    name, fn, arg);
}

void
liblustre_deregister_wait_callback(void *opaque)
{
        liblustre_deregister_waitidle_callback(opaque);
}

void *
liblustre_register_idle_callback(const char *name,
                                 int (*fn)(void *arg), void *arg)
{
        return liblustre_register_waitidle_callback(&liblustre_idle_callbacks,
                                                    name, fn, arg);
}

void
liblustre_deregister_idle_callback(void *opaque)
{
        liblustre_deregister_waitidle_callback(opaque);
}

int
liblustre_check_events(int timeout)
{
        lnet_event_t ev;
        int         rc;
        int         i;
        ENTRY;

        rc = LNetEQPoll(&ptlrpc_eq_h, 1, timeout * 1000, &ev, &i);
        if (rc == 0)
                RETURN(0);

        LASSERT(rc == -EOVERFLOW || rc == 1);

        /* liblustre: no asynch callback so we can't afford to miss any
         * events... */
        if (rc == -EOVERFLOW) {
                CERROR("Dropped an event!!!\n");
                abort();
        }

        ptlrpc_master_callback(&ev);
        RETURN(1);
}

int liblustre_waiting = 0;

int
liblustre_wait_event(int timeout)
{
        cfs_list_t                     *tmp;
        struct liblustre_wait_callback *llwc;
        int                             found_something = 0;

        /* single threaded recursion check... */
        liblustre_waiting = 1;

        for (;;) {
                /* Deal with all pending events */
                while (liblustre_check_events(0))
                        found_something = 1;

                /* Give all registered callbacks a bite at the cherry */
                cfs_list_for_each(tmp, &liblustre_wait_callbacks) {
                        llwc = cfs_list_entry(tmp,
                                              struct liblustre_wait_callback,
                                              llwc_list);

                        if (llwc->llwc_fn(llwc->llwc_arg))
                                found_something = 1;
                }

                if (found_something || timeout == 0)
                        break;

                /* Nothing so far, but I'm allowed to block... */
                found_something = liblustre_check_events(timeout);
                if (!found_something)           /* still nothing */
                        break;                  /* I timed out */
        }

        liblustre_waiting = 0;

        return found_something;
}

void
liblustre_wait_idle(void)
{
        static int recursed = 0;

        cfs_list_t                     *tmp;
        struct liblustre_wait_callback *llwc;
        int                             idle = 0;

        LASSERT(!recursed);
        recursed = 1;

        do {
                liblustre_wait_event(0);

                idle = 1;

                cfs_list_for_each(tmp, &liblustre_idle_callbacks) {
                        llwc = cfs_list_entry(tmp,
                                              struct liblustre_wait_callback,
                                              llwc_list);

                        if (!llwc->llwc_fn(llwc->llwc_arg)) {
                                idle = 0;
                                break;
                        }
                }

        } while (!idle);

        recursed = 0;
}

#endif /* !__KERNEL__ */

int ptlrpc_init_portals(void)
{
        int   rc = ptlrpc_ni_init();

        if (rc != 0) {
                CERROR("network initialisation failed\n");
                return -EIO;
        }
#ifndef __KERNEL__
        liblustre_services_callback =
                liblustre_register_wait_callback("liblustre_check_services",
                                                 &liblustre_check_services,
                                                 NULL);
        cfs_init_completion_module(liblustre_wait_event);
#endif
        rc = ptlrpcd_addref();
        if (rc == 0)
                return 0;

        CERROR("rpcd initialisation failed\n");
#ifndef __KERNEL__
        liblustre_deregister_wait_callback(liblustre_services_callback);
#endif
        ptlrpc_ni_fini();
        return rc;
}

void ptlrpc_exit_portals(void)
{
#ifndef __KERNEL__
        liblustre_deregister_wait_callback(liblustre_services_callback);
#endif
        ptlrpcd_decref();
        ptlrpc_ni_fini();
}