Whamcloud - gitweb
Fix lost unlinked ev in reply_in_callback (Isaac)
[fs/lustre-release.git] / lustre / ptlrpc / events.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_RPC
38
39 #ifndef __KERNEL__
40 # include <liblustre.h>
41 #else
42 # include <libcfs/libcfs.h>
43 # ifdef __mips64__
44 #  include <linux/kernel.h>
45 # endif
46 #endif
47
48 #include <obd_class.h>
49 #include <lustre_net.h>
50 #include <lustre_sec.h>
51 #include "ptlrpc_internal.h"
52
/* The single LNet event queue shared by all ptlrpc network activity;
 * allocated in ptlrpc_ni_init() and freed in ptlrpc_ni_fini(). */
lnet_handle_eq_t   ptlrpc_eq_h;
54
55 /*
56  *  Client's outgoing request callback
57  */
/* Fired by LNet when the send of a client request completes, or when its
 * MD is unlinked without sending.  Runs in network-callback context and
 * drops the network's reference on the request before returning. */
void request_out_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
        struct ptlrpc_request *req = cbid->cbid_arg;
        ENTRY;

        /* The request MD generates exactly one final event: SEND or UNLINK,
         * always with ev->unlinked set */
        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->unlinked);

        DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req,
                  "type %d, status %d", ev->type, ev->status);

        /* Let the security layer release per-send state */
        sptlrpc_request_out_callback(req);

        if (ev->type == LNET_EVENT_UNLINK || ev->status != 0) {

                /* Failed send: make it seem like the reply timed out, just
                 * like failing sends in client.c does currently...  */

                spin_lock(&req->rq_lock);
                req->rq_net_err = 1;
                spin_unlock(&req->rq_lock);

                ptlrpc_client_wake_req(req);
        }

        /* Drop the reference the network held on this request */
        ptlrpc_req_finished(req);

        EXIT;
}
89
90 /*
91  * Client's incoming reply callback
92  */
/* Handles incoming replies: a real reply, an adaptive-timeout early reply
 * (written at offset 0 of the reply buffer), a failed receive, or the
 * final UNLINK of the reply MD.  In every case the waiter is woken while
 * rq_lock is still held, since we hold no reference of our own on req. */
void reply_in_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
        struct ptlrpc_request *req = cbid->cbid_arg;
        ENTRY;

        DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req,
                  "type %d, status %d", ev->type, ev->status);

        LASSERT (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->md.start == req->rq_repbuf);
        LASSERT (ev->mlength <= req->rq_repbuf_len);
        /* We've set LNET_MD_MANAGE_REMOTE for all outgoing requests
           for adaptive timeouts' early reply. */
        LASSERT((ev->md.options & LNET_MD_MANAGE_REMOTE) != 0);

        spin_lock(&req->rq_lock);

        req->rq_receiving_reply = 0;
        req->rq_early = 0;
        /* The reply MD can survive an early-reply PUT; only clear the
         * must-unlink flag once LNet reports the MD is really gone */
        if (ev->unlinked)
                req->rq_must_unlink = 0;

        if (ev->status)
                goto out_wake;
        if (ev->type == LNET_EVENT_UNLINK) {
                /* UNLINK events are always final */
                LASSERT(ev->unlinked);
                DEBUG_REQ(D_RPCTRACE, req, "unlink");
                goto out_wake;
        }

        /* offset 0 + AT support on the request marks an early reply; real
         * replies to AT-capable peers land at a non-zero offset */
        if ((ev->offset == 0) &&
            ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT))) {
                /* Early reply */
                DEBUG_REQ(D_ADAPTTO, req,
                          "Early reply received: mlen=%u offset=%d replen=%d "
                          "replied=%d unlinked=%d", ev->mlength, ev->offset,
                          req->rq_replen, req->rq_replied, ev->unlinked);

                req->rq_early_count++; /* number received, client side */

                if (req->rq_replied)   /* already got the real reply */
                        goto out_wake;

                req->rq_early = 1;
                req->rq_reply_off = ev->offset;
                req->rq_nob_received = ev->mlength;
                /* And we're still receiving */
                req->rq_receiving_reply = 1;
        } else {
                /* Real reply */
                req->rq_replied = 1;
                req->rq_reply_off = ev->offset;
                req->rq_nob_received = ev->mlength;
                /* LNetMDUnlink can't be called under the LNET_LOCK,
                   so we must unlink in ptlrpc_unregister_reply */
                DEBUG_REQ(D_INFO, req,
                          "reply in flags=%x mlen=%u offset=%d replen=%d",
                          lustre_msg_get_flags(req->rq_reqmsg),
                          ev->mlength, ev->offset, req->rq_replen);
        }

        /* Remember when we last heard from this import's server */
        req->rq_import->imp_last_reply_time = cfs_time_current_sec();

out_wake:
        /* NB don't unlock till after wakeup; req can disappear under us
         * since we don't have our own ref */
        ptlrpc_client_wake_req(req);
        spin_unlock(&req->rq_lock);
        EXIT;
}
164
165 /*
166  * Client's bulk has been written/read
167  */
/* Final event for a client bulk descriptor: records how many bytes moved
 * (on success) and wakes the request waiting on the bulk. */
void client_bulk_callback (lnet_event_t *ev)
{
        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
        ENTRY;

        /* The event type must match the bulk direction: PUT into a sink,
         * GET from a source; UNLINK covers the abort path */
        LASSERT ((desc->bd_type == BULK_PUT_SINK &&
                  ev->type == LNET_EVENT_PUT) ||
                 (desc->bd_type == BULK_GET_SOURCE &&
                  ev->type == LNET_EVENT_GET) ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->unlinked);

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, desc %p\n",
               ev->type, ev->status, desc);

        spin_lock(&desc->bd_lock);

        /* Network I/O on this descriptor is now over */
        LASSERT(desc->bd_network_rw);
        desc->bd_network_rw = 0;

        if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) {
                /* Successful transfer: record size and actual sender */
                desc->bd_success = 1;
                desc->bd_nob_transferred = ev->mlength;
                desc->bd_sender = ev->sender;
        }

        /* Return any encryption-pool pages backing this bulk */
        sptlrpc_enc_pool_put_pages(desc);

        /* NB don't unlock till after wakeup; desc can disappear under us
         * otherwise */
        ptlrpc_client_wake_req(desc->bd_req);

        spin_unlock(&desc->bd_lock);
        EXIT;
}
205
206 /*
207  * Server's incoming request callback
208  */
/* A request message has arrived in (or the network has finished with) one
 * of this service's request buffers.  Builds a ptlrpc_request around the
 * received message and queues it for the service threads. */
void request_in_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id               *cbid = ev->md.user_ptr;
        struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;
        struct ptlrpc_service             *service = rqbd->rqbd_service;
        struct ptlrpc_request             *req;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_PUT ||
                 ev->type == LNET_EVENT_UNLINK);
        /* The incoming message must lie entirely within this rqbd's buffer */
        LASSERT ((char *)ev->md.start >= rqbd->rqbd_buffer);
        LASSERT ((char *)ev->md.start + ev->offset + ev->mlength <=
                 rqbd->rqbd_buffer + service->srv_buf_size);

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, service %s\n",
               ev->type, ev->status, service->srv_name);

        if (ev->unlinked) {
                /* If this is the last request message to fit in the
                 * request buffer we can use the request object embedded in
                 * rqbd.  Note that if we failed to allocate a request,
                 * we'd have to re-post the rqbd, which we can't do in this
                 * context. */
                req = &rqbd->rqbd_req;
                memset(req, 0, sizeof (*req));
        } else {
                LASSERT (ev->type == LNET_EVENT_PUT);
                if (ev->status != 0) {
                        /* We moaned above already... */
                        return;
                }
                /* Atomic-safe allocation: we are in network-callback
                 * context and must not sleep */
                OBD_ALLOC_GFP(req, sizeof(*req), CFS_ALLOC_ATOMIC_TRY);
                if (req == NULL) {
                        CERROR("Can't allocate incoming request descriptor: "
                               "Dropping %s RPC from %s\n",
                               service->srv_name,
                               libcfs_id2str(ev->initiator));
                        return;
                }
        }

        /* NB we ABSOLUTELY RELY on req being zeroed, so pointers are NULL,
         * flags are reset and scalars are zero.  We only set the message
         * size to non-zero if this was a successful receive. */
        req->rq_xid = ev->match_bits;
        req->rq_reqbuf = ev->md.start + ev->offset;
        if (ev->type == LNET_EVENT_PUT && ev->status == 0)
                req->rq_reqdata_len = ev->mlength;
        do_gettimeofday(&req->rq_arrival_time);
        req->rq_peer = ev->initiator;
        req->rq_self = ev->target.nid;
        req->rq_rqbd = rqbd;
        req->rq_phase = RQ_PHASE_NEW;
#ifdef CRAY_XT3
        req->rq_uid = ev->uid;
#endif
        spin_lock_init(&req->rq_lock);
        CFS_INIT_LIST_HEAD(&req->rq_timed_list);
        atomic_set(&req->rq_refcount, 1);
        if (ev->type == LNET_EVENT_PUT)
                DEBUG_REQ(D_RPCTRACE, req, "incoming req");

        CDEBUG(D_RPCTRACE, "peer: %s\n", libcfs_id2str(req->rq_peer));

        spin_lock(&service->srv_lock);

        /* Record the request in the service's history list */
        req->rq_history_seq = service->srv_request_seq++;
        list_add_tail(&req->rq_history_list, &service->srv_request_history);

        if (ev->unlinked) {
                service->srv_nrqbd_receiving--;
                CDEBUG(D_INFO, "Buffer complete: %d buffers still posted\n",
                       service->srv_nrqbd_receiving);

                /* Normally, don't complain about 0 buffers posted; LNET won't
                 * drop incoming reqs since we set the portal lazy */
                if (test_req_buffer_pressure &&
                    ev->type != LNET_EVENT_UNLINK &&
                    service->srv_nrqbd_receiving == 0)
                        CWARN("All %s request buffers busy\n",
                              service->srv_name);

                /* req takes over the network's ref on rqbd */
        } else {
                /* req takes a ref on rqbd */
                rqbd->rqbd_refcount++;
        }

        list_add_tail(&req->rq_list, &service->srv_req_in_queue);
        service->srv_n_queued_reqs++;

        /* NB everything can disappear under us once the request
         * has been queued and we unlock, so do the wake now... */
        cfs_waitq_signal(&service->srv_waitq);

        spin_unlock(&service->srv_lock);
        EXIT;
}
308
309 /*
310  *  Server's outgoing reply callback
311  */
/* Network event for a server reply.  'Easy' replies are done after the
 * single final event; 'difficult' replies stay alive until
 * ptlrpc_server_handle_reply() finishes with them. */
void reply_out_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id       *cbid = ev->md.user_ptr;
        struct ptlrpc_reply_state *rs = cbid->cbid_arg;
        struct ptlrpc_service     *svc = rs->rs_service;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_ACK ||
                 ev->type == LNET_EVENT_UNLINK);

        if (!rs->rs_difficult) {
                /* 'Easy' replies have no further processing so I drop the
                 * net's ref on 'rs' */
                LASSERT (ev->unlinked);
                ptlrpc_rs_decref(rs);
                atomic_dec (&svc->srv_outstanding_replies);
                EXIT;
                return;
        }

        /* Difficult replies must still be marked on the net when any of
         * their events arrives */
        LASSERT (rs->rs_on_net);

        if (ev->unlinked) {
                /* Last network callback.  The net's ref on 'rs' stays put
                 * until ptlrpc_server_handle_reply() is done with it */
                spin_lock(&svc->srv_lock);
                spin_lock(&rs->rs_lock);
                rs->rs_on_net = 0;
                /* Schedule final handling unless rs_no_ack is set and the
                 * reply's transno is not yet committed (presumably the
                 * commit path schedules it later in that case) */
                if (!rs->rs_no_ack ||
                    rs->rs_transno <= rs->rs_export->exp_obd->obd_last_committed)
                        ptlrpc_schedule_difficult_reply (rs);
                spin_unlock(&rs->rs_lock);
                spin_unlock(&svc->srv_lock);
        }

        EXIT;
}
350
351 /*
352  * Server's bulk completion callback
353  */
/* Network event for a server-side bulk transfer.  Unlike the client side,
 * several events may arrive per descriptor (SEND plus ACK/REPLY, possibly
 * out of order); only the one with ev->unlinked set is final. */
void server_bulk_callback (lnet_event_t *ev)
{
        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
        ENTRY;

        /* PUT-source bulk (server write to client) completes with an ACK;
         * GET-sink bulk (server read from client) with a REPLY */
        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_UNLINK ||
                 (desc->bd_type == BULK_PUT_SOURCE &&
                  ev->type == LNET_EVENT_ACK) ||
                 (desc->bd_type == BULK_GET_SINK &&
                  ev->type == LNET_EVENT_REPLY));

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, desc %p\n",
               ev->type, ev->status, desc);

        spin_lock(&desc->bd_lock);

        if ((ev->type == LNET_EVENT_ACK ||
             ev->type == LNET_EVENT_REPLY) &&
            ev->status == 0) {
                /* We heard back from the peer, so even if we get this
                 * before the SENT event (oh yes we can), we know we
                 * read/wrote the peer buffer and how much... */
                desc->bd_success = 1;
                desc->bd_nob_transferred = ev->mlength;
                desc->bd_sender = ev->sender;
        }

        if (ev->unlinked) {
                /* This is the last callback no matter what... */
                desc->bd_network_rw = 0;
                cfs_waitq_signal(&desc->bd_waitq);
        }

        spin_unlock(&desc->bd_lock);
        EXIT;
}
393
394 static void ptlrpc_master_callback(lnet_event_t *ev)
395 {
396         struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
397         void (*callback)(lnet_event_t *ev) = cbid->cbid_fn;
398
399         /* Honestly, it's best to find out early. */
400         LASSERT (cbid->cbid_arg != LP_POISON);
401         LASSERT (callback == request_out_callback ||
402                  callback == reply_in_callback ||
403                  callback == client_bulk_callback ||
404                  callback == request_in_callback ||
405                  callback == reply_out_callback ||
406                  callback == server_bulk_callback);
407
408         callback (ev);
409 }
410
/* Map a Lustre UUID to the best-matching LNet peer (*peer) and the local
 * NID (*self) used to reach it.  Returns 0 on success, -ENOENT when no
 * reachable NID exists for the UUID. */
int ptlrpc_uuid_to_peer (struct obd_uuid *uuid,
                         lnet_process_id_t *peer, lnet_nid_t *self)
{
        int               best_dist = 0;
        __u32             best_order = 0;
        int               count = 0;
        int               rc = -ENOENT;
        int               portals_compatibility;
        int               dist;
        __u32             order;
        lnet_nid_t        dst_nid;
        lnet_nid_t        src_nid;

        portals_compatibility = LNetCtl(IOC_LIBCFS_PORTALS_COMPATIBILITY, NULL);

        peer->pid = LUSTRE_SRV_LNET_PID;

        /* Choose the matching UUID that's closest */
        while (lustre_uuid_to_peer(uuid->uuid, &dst_nid, count++) == 0) {
                dist = LNetDist(dst_nid, &src_nid, &order);
                if (dist < 0)           /* unreachable NID; try the next one */
                        continue;

                if (dist == 0) {                /* local! use loopback LND */
                        peer->nid = *self = LNET_MKNID(LNET_MKNET(LOLND, 0), 0);
                        rc = 0;
                        break;
                }

                /* First reachable NID, or strictly better than the best so
                 * far: shorter distance, or equal distance & lower order */
                if (rc < 0 ||
                    dist < best_dist ||
                    (dist == best_dist && order < best_order)) {
                        best_dist = dist;
                        best_order = order;

                        if (portals_compatibility > 1) {
                                /* Strong portals compatibility: Zero the nid's
                                 * NET, so if I'm reading new config logs, or
                                 * getting configured by (new) lconf I can
                                 * still talk to old servers. */
                                dst_nid = LNET_MKNID(0, LNET_NIDADDR(dst_nid));
                                src_nid = LNET_MKNID(0, LNET_NIDADDR(src_nid));
                        }
                        peer->nid = dst_nid;
                        *self = src_nid;
                        rc = 0;
                }
        }

        CDEBUG(D_NET,"%s->%s\n", uuid->uuid, libcfs_id2str(*peer));
        if (rc != 0)
                CERROR("No NID found for %s\n", uuid->uuid);
        return rc;
}
465
466 void ptlrpc_ni_fini(void)
467 {
468         cfs_waitq_t         waitq;
469         struct l_wait_info  lwi;
470         int                 rc;
471         int                 retries;
472
473         /* Wait for the event queue to become idle since there may still be
474          * messages in flight with pending events (i.e. the fire-and-forget
475          * messages == client requests and "non-difficult" server
476          * replies */
477
478         for (retries = 0;; retries++) {
479                 rc = LNetEQFree(ptlrpc_eq_h);
480                 switch (rc) {
481                 default:
482                         LBUG();
483
484                 case 0:
485                         LNetNIFini();
486                         return;
487
488                 case -EBUSY:
489                         if (retries != 0)
490                                 CWARN("Event queue still busy\n");
491
492                         /* Wait for a bit */
493                         cfs_waitq_init(&waitq);
494                         lwi = LWI_TIMEOUT(cfs_time_seconds(2), NULL, NULL);
495                         l_wait_event(waitq, 0, &lwi);
496                         break;
497                 }
498         }
499         /* notreached */
500 }
501
502 lnet_pid_t ptl_get_pid(void)
503 {
504         lnet_pid_t        pid;
505
506 #ifndef  __KERNEL__
507         pid = getpid();
508 #else
509         pid = LUSTRE_SRV_LNET_PID;
510 #endif
511         return pid;
512 }
513
514 int ptlrpc_ni_init(void)
515 {
516         int              rc;
517         lnet_pid_t       pid;
518
519         pid = ptl_get_pid();
520         CDEBUG(D_NET, "My pid is: %x\n", pid);
521
522         /* We're not passing any limits yet... */
523         rc = LNetNIInit(pid);
524         if (rc < 0) {
525                 CDEBUG (D_NET, "Can't init network interface: %d\n", rc);
526                 return (-ENOENT);
527         }
528
529         /* CAVEAT EMPTOR: how we process portals events is _radically_
530          * different depending on... */
531 #ifdef __KERNEL__
532         /* kernel portals calls our master callback when events are added to
533          * the event queue.  In fact lustre never pulls events off this queue,
534          * so it's only sized for some debug history. */
535         rc = LNetEQAlloc(1024, ptlrpc_master_callback, &ptlrpc_eq_h);
536 #else
537         /* liblustre calls the master callback when it removes events from the
538          * event queue.  The event queue has to be big enough not to drop
539          * anything */
540         rc = LNetEQAlloc(10240, LNET_EQ_HANDLER_NONE, &ptlrpc_eq_h);
541 #endif
542         if (rc == 0)
543                 return 0;
544
545         CERROR ("Failed to allocate event queue: %d\n", rc);
546         LNetNIFini();
547
548         return (-ENOMEM);
549 }
550
551 #ifndef __KERNEL__
/* Callback lists polled while liblustre is waiting for events / checking
 * for idleness, and the handle of the service-check callback registered
 * in ptlrpc_init_portals(). */
CFS_LIST_HEAD(liblustre_wait_callbacks);
CFS_LIST_HEAD(liblustre_idle_callbacks);
void *liblustre_services_callback;
555
556 void *
557 liblustre_register_waitidle_callback (struct list_head *callback_list,
558                                       const char *name,
559                                       int (*fn)(void *arg), void *arg)
560 {
561         struct liblustre_wait_callback *llwc;
562
563         OBD_ALLOC(llwc, sizeof(*llwc));
564         LASSERT (llwc != NULL);
565
566         llwc->llwc_name = name;
567         llwc->llwc_fn = fn;
568         llwc->llwc_arg = arg;
569         list_add_tail(&llwc->llwc_list, callback_list);
570
571         return (llwc);
572 }
573
574 void
575 liblustre_deregister_waitidle_callback (void *opaque)
576 {
577         struct liblustre_wait_callback *llwc = opaque;
578
579         list_del(&llwc->llwc_list);
580         OBD_FREE(llwc, sizeof(*llwc));
581 }
582
583 void *
584 liblustre_register_wait_callback (const char *name,
585                                   int (*fn)(void *arg), void *arg)
586 {
587         return liblustre_register_waitidle_callback(&liblustre_wait_callbacks,
588                                                     name, fn, arg);
589 }
590
/* Remove a callback registered with liblustre_register_wait_callback(). */
void
liblustre_deregister_wait_callback (void *opaque)
{
        liblustre_deregister_waitidle_callback(opaque);
}
596
597 void *
598 liblustre_register_idle_callback (const char *name,
599                                   int (*fn)(void *arg), void *arg)
600 {
601         return liblustre_register_waitidle_callback(&liblustre_idle_callbacks,
602                                                     name, fn, arg);
603 }
604
/* Remove a callback registered with liblustre_register_idle_callback(). */
void
liblustre_deregister_idle_callback (void *opaque)
{
        liblustre_deregister_waitidle_callback(opaque);
}
610
/* Poll the ptlrpc EQ for up to 'timeout' seconds and dispatch at most one
 * event through ptlrpc_master_callback().  Returns 1 if an event was
 * handled, 0 if the poll timed out; aborts on EQ overflow. */
int
liblustre_check_events (int timeout)
{
        lnet_event_t ev;
        int         rc;
        int         i;
        ENTRY;

        /* timeout is in seconds; LNetEQPoll takes milliseconds */
        rc = LNetEQPoll(&ptlrpc_eq_h, 1, timeout * 1000, &ev, &i);
        if (rc == 0)
                RETURN(0);

        LASSERT (rc == -EOVERFLOW || rc == 1);

        /* liblustre: no asynch callback so we can't afford to miss any
         * events... */
        if (rc == -EOVERFLOW) {
                CERROR ("Dropped an event!!!\n");
                abort();
        }

        ptlrpc_master_callback (&ev);
        RETURN(1);
}
635
/* Set while liblustre_wait_event() is executing (single-threaded
 * recursion indicator; see the check in that function). */
int liblustre_waiting = 0;
637
/* Handle pending network events and run all registered wait callbacks.
 * With timeout == 0 this is a single non-blocking pass; otherwise it may
 * block up to 'timeout' seconds waiting for something to happen.
 * Returns non-zero iff any event or callback made progress. */
int
liblustre_wait_event (int timeout)
{
        struct list_head               *tmp;
        struct liblustre_wait_callback *llwc;
        int                             found_something = 0;

        /* single threaded recursion check... */
        liblustre_waiting = 1;

        for (;;) {
                /* Deal with all pending events */
                while (liblustre_check_events(0))
                        found_something = 1;

                /* Give all registered callbacks a bite at the cherry */
                list_for_each(tmp, &liblustre_wait_callbacks) {
                        llwc = list_entry(tmp, struct liblustre_wait_callback,
                                          llwc_list);

                        if (llwc->llwc_fn(llwc->llwc_arg))
                                found_something = 1;
                }

                /* Progress was made, or caller asked for a single pass */
                if (found_something || timeout == 0)
                        break;

                /* Nothing so far, but I'm allowed to block... */
                found_something = liblustre_check_events(timeout);
                if (!found_something)           /* still nothing */
                        break;                  /* I timed out */
        }

        liblustre_waiting = 0;

        return found_something;
}
675
/* Loop handling events until every registered idle callback reports its
 * subsystem idle.  Must not be re-entered (guarded by 'recursed'). */
void
liblustre_wait_idle(void)
{
        static int recursed = 0;

        struct list_head               *tmp;
        struct liblustre_wait_callback *llwc;
        int                             idle = 0;

        LASSERT(!recursed);
        recursed = 1;

        do {
                /* Non-blocking pass over any pending events */
                liblustre_wait_event(0);

                idle = 1;

                /* Any callback voting "busy" forces another iteration */
                list_for_each(tmp, &liblustre_idle_callbacks) {
                        llwc = list_entry(tmp, struct liblustre_wait_callback,
                                          llwc_list);

                        if (!llwc->llwc_fn(llwc->llwc_arg)) {
                                idle = 0;
                                break;
                        }
                }

        } while (!idle);

        recursed = 0;
}
707
708 #endif /* __KERNEL__ */
709
/* Module-level setup: bring up LNet and the ptlrpc EQ, register the
 * liblustre service-poll callback (userspace only), and take a ptlrpcd
 * reference.  Returns 0 on success; unwinds everything on failure. */
int ptlrpc_init_portals(void)
{
        int   rc = ptlrpc_ni_init();

        if (rc != 0) {
                CERROR("network initialisation failed\n");
                return -EIO;
        }
#ifndef __KERNEL__
        liblustre_services_callback =
                liblustre_register_wait_callback("liblustre_check_services",
                                                 &liblustre_check_services,
                                                 NULL);
        init_completion_module(liblustre_wait_event);
#endif
        rc = ptlrpcd_addref();
        if (rc == 0)
                return 0;

        /* ptlrpcd failed to start: undo everything done above */
        CERROR("rpcd initialisation failed\n");
#ifndef __KERNEL__
        liblustre_deregister_wait_callback(liblustre_services_callback);
#endif
        ptlrpc_ni_fini();
        return rc;
}
736
/* Module cleanup, mirroring ptlrpc_init_portals(): drop the ptlrpcd
 * reference before tearing down the EQ and network interface. */
void ptlrpc_exit_portals(void)
{
#ifndef __KERNEL__
        liblustre_deregister_wait_callback(liblustre_services_callback);
#endif
        ptlrpcd_decref();
        ptlrpc_ni_fini();
}