/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 *
 */

#define DEBUG_SUBSYSTEM S_RPC

#ifndef __KERNEL__
# include <liblustre.h>
#else
# ifdef __mips64__
#  include <linux/kernel.h>
# endif
#endif
#include <obd_class.h>
#include <lustre_net.h>
#include <lustre_sec.h>
#include "ptlrpc_internal.h"

lnet_handle_eq_t   ptlrpc_eq_h;

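/*
 * All ptlrpc network traffic shares this single event queue (allocated
 * in ptlrpc_ni_init() below).  Every MD posted by ptlrpc stashes a
 * struct ptlrpc_cb_id in md.user_ptr, and ptlrpc_master_callback()
 * dispatches each event through it.  A sketch of the pattern the
 * callers follow (names as used elsewhere in ptlrpc):
 *
 *      cbid->cbid_fn  = reply_in_callback;
 *      cbid->cbid_arg = req;
 *      md.user_ptr    = cbid;
 *
 * so each handler below recovers its object via cbid->cbid_arg.
 */
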
/*
 *  Client's outgoing request callback
 */
void request_out_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
        struct ptlrpc_request *req = cbid->cbid_arg;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->unlinked);

        DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req,
                  "type %d, status %d", ev->type, ev->status);

        sptlrpc_request_out_callback(req);

        if (ev->type == LNET_EVENT_UNLINK || ev->status != 0) {

                /* Failed send: make it seem like the reply timed out, just
                 * as failing sends in client.c do currently...  */

                spin_lock(&req->rq_lock);
                req->rq_net_err = 1;
                spin_unlock(&req->rq_lock);

                ptlrpc_wake_client_req(req);
        }

        ptlrpc_req_finished(req);

        EXIT;
}

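/*
 * NB reply buffers are posted with LNET_MD_MANAGE_REMOTE, so the server
 * controls where each PUT lands: when the client advertises
 * MSGHDR_AT_SUPPORT, an early (adaptive-timeout) reply is PUT at offset
 * 0 and the final reply at a non-zero offset, which is how the callback
 * below tells them apart.
 */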
/*
 * Client's incoming reply callback
 */
void reply_in_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
        struct ptlrpc_request *req = cbid->cbid_arg;
        ENTRY;

        DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req,
                  "type %d, status %d", ev->type, ev->status);

        LASSERT (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->md.start == req->rq_repbuf);
        LASSERT (ev->mlength <= req->rq_repbuf_len);
        /* We've set LNET_MD_MANAGE_REMOTE for all outgoing requests
           for adaptive timeouts' early reply. */
        LASSERT((ev->md.options & LNET_MD_MANAGE_REMOTE) != 0);

        spin_lock(&req->rq_lock);

        req->rq_receiving_reply = 0;
        req->rq_early = 0;

        if (ev->status)
                goto out_wake;
        if (ev->type == LNET_EVENT_UNLINK) {
                req->rq_must_unlink = 0;
                DEBUG_REQ(D_RPCTRACE, req, "unlink");
                goto out_wake;
        }

        if ((ev->offset == 0) &&
            ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT))) {
                /* Early reply */
                DEBUG_REQ(D_ADAPTTO, req,
                          "Early reply received: mlen=%u offset=%d replen=%d "
                          "replied=%d unlinked=%d", ev->mlength, ev->offset,
                          req->rq_replen, req->rq_replied, ev->unlinked);

                req->rq_early_count++; /* number received, client side */
                if (req->rq_replied) {
                        /* If we already got the real reply, then we need to
                         * check if lnet_finalize() unlinked the md.  In that
                         * case, there will be no further callback of type
                         * LNET_EVENT_UNLINK.
                         */
                        if (ev->unlinked)
                                req->rq_must_unlink = 0;
                        else
                                DEBUG_REQ(D_RPCTRACE, req, "unlinked in reply");
                        goto out_wake;
                }
                req->rq_early = 1;
                req->rq_reply_off = ev->offset;
                req->rq_nob_received = ev->mlength;
                /* And we're still receiving */
                req->rq_receiving_reply = 1;
        } else {
                /* Real reply */
                req->rq_replied = 1;
                req->rq_reply_off = ev->offset;
                req->rq_nob_received = ev->mlength;
                /* LNetMDUnlink can't be called under the LNET_LOCK,
                   so we must unlink in ptlrpc_unregister_reply */
                DEBUG_REQ(D_INFO, req,
                          "reply in flags=%x mlen=%u offset=%d replen=%d",
                          lustre_msg_get_flags(req->rq_reqmsg),
                          ev->mlength, ev->offset, req->rq_replen);
        }

        req->rq_import->imp_last_reply_time = cfs_time_current_sec();

out_wake:
        /* NB don't unlock till after wakeup; req can disappear under us
         * since we don't have our own ref */
        ptlrpc_wake_client_req(req);
        spin_unlock(&req->rq_lock);
        EXIT;
}

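/*
 * NB the client's bulk MD is set up to auto-unlink after a single
 * network operation (see ptlrpc_register_bulk()), so exactly one event
 * arrives here per descriptor and ev->unlinked is always set.
 */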
/*
 * Client's bulk has been written/read
 */
void client_bulk_callback (lnet_event_t *ev)
{
        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
        ENTRY;

        LASSERT ((desc->bd_type == BULK_PUT_SINK &&
                  ev->type == LNET_EVENT_PUT) ||
                 (desc->bd_type == BULK_GET_SOURCE &&
                  ev->type == LNET_EVENT_GET) ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->unlinked);

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, desc %p\n",
               ev->type, ev->status, desc);

        spin_lock(&desc->bd_lock);

        LASSERT(desc->bd_network_rw);
        desc->bd_network_rw = 0;

        if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) {
                desc->bd_success = 1;
                desc->bd_nob_transferred = ev->mlength;
                desc->bd_sender = ev->sender;
        }

        sptlrpc_enc_pool_put_pages(desc);

        /* NB don't unlock till after wakeup; desc can disappear under us
         * otherwise */
        ptlrpc_wake_client_req(desc->bd_req);

        spin_unlock(&desc->bd_lock);
        EXIT;
}

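/*
 * NB this runs in LNet's event-delivery context and must not sleep:
 * request descriptors are allocated with CFS_ALLOC_ATOMIC_TRY, and on
 * allocation failure the RPC is simply dropped and left to the client's
 * timeout/resend logic.
 */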
/*
 * Server's incoming request callback
 */
void request_in_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id               *cbid = ev->md.user_ptr;
        struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;
        struct ptlrpc_service             *service = rqbd->rqbd_service;
        struct ptlrpc_request             *req;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_PUT ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT ((char *)ev->md.start >= rqbd->rqbd_buffer);
        LASSERT ((char *)ev->md.start + ev->offset + ev->mlength <=
                 rqbd->rqbd_buffer + service->srv_buf_size);

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, service %s\n",
               ev->type, ev->status, service->srv_name);

        if (ev->unlinked) {
                /* If this is the last request message to fit in the
                 * request buffer we can use the request object embedded in
                 * rqbd.  Note that if we failed to allocate a request,
                 * we'd have to re-post the rqbd, which we can't do in this
                 * context. */
                req = &rqbd->rqbd_req;
                memset(req, 0, sizeof (*req));
        } else {
                LASSERT (ev->type == LNET_EVENT_PUT);
                if (ev->status != 0) {
                        /* We moaned above already... */
                        return;
                }
                OBD_ALLOC_GFP(req, sizeof(*req), CFS_ALLOC_ATOMIC_TRY);
                if (req == NULL) {
                        CERROR("Can't allocate incoming request descriptor: "
                               "Dropping %s RPC from %s\n",
                               service->srv_name,
                               libcfs_id2str(ev->initiator));
                        return;
                }
        }

        /* NB we ABSOLUTELY RELY on req being zeroed, so pointers are NULL,
         * flags are reset and scalars are zero.  We only set the message
         * size to non-zero if this was a successful receive. */
        req->rq_xid = ev->match_bits;
        req->rq_reqbuf = ev->md.start + ev->offset;
        if (ev->type == LNET_EVENT_PUT && ev->status == 0)
                req->rq_reqdata_len = ev->mlength;
        do_gettimeofday(&req->rq_arrival_time);
        req->rq_peer = ev->initiator;
        req->rq_self = ev->target.nid;
        req->rq_rqbd = rqbd;
        req->rq_phase = RQ_PHASE_NEW;
#ifdef CRAY_XT3
        req->rq_uid = ev->uid;
#endif
        spin_lock_init(&req->rq_lock);
        CFS_INIT_LIST_HEAD(&req->rq_timed_list);
        atomic_set(&req->rq_refcount, 1);
        if (ev->type == LNET_EVENT_PUT)
                DEBUG_REQ(D_RPCTRACE, req, "incoming req");

        CDEBUG(D_RPCTRACE, "peer: %s\n", libcfs_id2str(req->rq_peer));

        spin_lock(&service->srv_lock);

        req->rq_history_seq = service->srv_request_seq++;
        list_add_tail(&req->rq_history_list, &service->srv_request_history);

        if (ev->unlinked) {
                service->srv_nrqbd_receiving--;
                CDEBUG(D_INFO, "Buffer complete: %d buffers still posted\n",
                       service->srv_nrqbd_receiving);

                /* Normally, don't complain about 0 buffers posted; LNET won't
                 * drop incoming reqs since we set the portal lazy */
                if (test_req_buffer_pressure &&
                    ev->type != LNET_EVENT_UNLINK &&
                    service->srv_nrqbd_receiving == 0)
                        CWARN("All %s request buffers busy\n",
                              service->srv_name);

                /* req takes over the network's ref on rqbd */
        } else {
                /* req takes a ref on rqbd */
                rqbd->rqbd_refcount++;
        }

        list_add_tail(&req->rq_list, &service->srv_req_in_queue);
        service->srv_n_queued_reqs++;

        /* NB everything can disappear under us once the request
         * has been queued and we unlock, so do the wake now... */
        cfs_waitq_signal(&service->srv_waitq);

        spin_unlock(&service->srv_lock);
        EXIT;
}

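/*
 * Replies come in two flavours: 'easy' replies are fire-and-forget and
 * the net's reference on the reply state is dropped at the first event,
 * while 'difficult' replies (rs_difficult) stay on the net until the
 * final event and are then handed back to the service via
 * ptlrpc_schedule_difficult_reply().
 */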
/*
 *  Server's outgoing reply callback
 */
void reply_out_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id       *cbid = ev->md.user_ptr;
        struct ptlrpc_reply_state *rs = cbid->cbid_arg;
        struct ptlrpc_service     *svc = rs->rs_service;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_ACK ||
                 ev->type == LNET_EVENT_UNLINK);

        if (!rs->rs_difficult) {
                /* 'Easy' replies have no further processing so I drop the
                 * net's ref on 'rs' */
                LASSERT (ev->unlinked);
                ptlrpc_rs_decref(rs);
                atomic_dec (&svc->srv_outstanding_replies);
                EXIT;
                return;
        }

        LASSERT (rs->rs_on_net);

        if (ev->unlinked) {
                /* Last network callback.  The net's ref on 'rs' stays put
                 * until ptlrpc_server_handle_reply() is done with it */
                spin_lock(&svc->srv_lock);
                rs->rs_on_net = 0;
                ptlrpc_schedule_difficult_reply (rs);
                spin_unlock(&svc->srv_lock);
        }

        EXIT;
}

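/*
 * NB bulk events may arrive in any order: the ACK/REPLY that tells us
 * how much the peer transferred can beat the SEND event, so completion
 * is detected via ev->unlinked rather than the event type.
 */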
/*
 * Server's bulk completion callback
 */
void server_bulk_callback (lnet_event_t *ev)
{
        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_UNLINK ||
                 (desc->bd_type == BULK_PUT_SOURCE &&
                  ev->type == LNET_EVENT_ACK) ||
                 (desc->bd_type == BULK_GET_SINK &&
                  ev->type == LNET_EVENT_REPLY));

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, desc %p\n",
               ev->type, ev->status, desc);

        spin_lock(&desc->bd_lock);

        if ((ev->type == LNET_EVENT_ACK ||
             ev->type == LNET_EVENT_REPLY) &&
            ev->status == 0) {
                /* We heard back from the peer, so even if we get this
                 * before the SENT event (oh yes we can), we know we
                 * read/wrote the peer buffer and how much... */
                desc->bd_success = 1;
                desc->bd_nob_transferred = ev->mlength;
                desc->bd_sender = ev->sender;
        }

        if (ev->unlinked) {
                /* This is the last callback no matter what... */
                desc->bd_network_rw = 0;
                cfs_waitq_signal(&desc->bd_waitq);
        }

        spin_unlock(&desc->bd_lock);
        EXIT;
}

static void ptlrpc_master_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
        void (*callback)(lnet_event_t *ev) = cbid->cbid_fn;

        /* Honestly, it's best to find out early. */
        LASSERT (cbid->cbid_arg != LP_POISON);
        LASSERT (callback == request_out_callback ||
                 callback == reply_in_callback ||
                 callback == client_bulk_callback ||
                 callback == request_in_callback ||
                 callback == reply_out_callback ||
                 callback == server_bulk_callback);

        callback (ev);
}

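/*
 * Map a UUID onto the LNet peer to contact and the local NID to send
 * from: a distance-0 NID means the peer is local, so use the loopback
 * LND; otherwise prefer the NID with the smallest LNetDist(), breaking
 * ties by network order.
 */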
int ptlrpc_uuid_to_peer (struct obd_uuid *uuid,
                         lnet_process_id_t *peer, lnet_nid_t *self)
{
        int               best_dist = 0;
        __u32             best_order = 0;
        int               count = 0;
        int               rc = -ENOENT;
        int               portals_compatibility;
        int               dist;
        __u32             order;
        lnet_nid_t        dst_nid;
        lnet_nid_t        src_nid;

        portals_compatibility = LNetCtl(IOC_LIBCFS_PORTALS_COMPATIBILITY, NULL);

        peer->pid = LUSTRE_SRV_LNET_PID;

        /* Choose the matching UUID that's closest */
        while (lustre_uuid_to_peer(uuid->uuid, &dst_nid, count++) == 0) {
                dist = LNetDist(dst_nid, &src_nid, &order);
                if (dist < 0)
                        continue;

                if (dist == 0) {                /* local! use loopback LND */
                        peer->nid = *self = LNET_MKNID(LNET_MKNET(LOLND, 0), 0);
                        rc = 0;
                        break;
                }

                if (rc < 0 ||
                    dist < best_dist ||
                    (dist == best_dist && order < best_order)) {
                        best_dist = dist;
                        best_order = order;

                        if (portals_compatibility > 1) {
                                /* Strong portals compatibility: Zero the nid's
                                 * NET, so if I'm reading new config logs, or
                                 * getting configured by (new) lconf I can
                                 * still talk to old servers. */
                                dst_nid = LNET_MKNID(0, LNET_NIDADDR(dst_nid));
                                src_nid = LNET_MKNID(0, LNET_NIDADDR(src_nid));
                        }
                        peer->nid = dst_nid;
                        *self = src_nid;
                        rc = 0;
                }
        }

        CDEBUG(D_NET,"%s->%s\n", uuid->uuid, libcfs_id2str(*peer));
        if (rc != 0)
                CERROR("No NID found for %s\n", uuid->uuid);
        return rc;
}

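/*
 * Tear down the network: the event queue can't be freed while events
 * are still outstanding, so retry LNetEQFree() every couple of seconds
 * until the in-flight fire-and-forget messages have drained.
 */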
void ptlrpc_ni_fini(void)
{
        cfs_waitq_t         waitq;
        struct l_wait_info  lwi;
        int                 rc;
        int                 retries;

        /* Wait for the event queue to become idle since there may still be
         * messages in flight with pending events (i.e. the fire-and-forget
         * messages == client requests and "non-difficult" server
         * replies) */

        for (retries = 0;; retries++) {
                rc = LNetEQFree(ptlrpc_eq_h);
                switch (rc) {
                default:
                        LBUG();

                case 0:
                        LNetNIFini();
                        return;

                case -EBUSY:
                        if (retries != 0)
                                CWARN("Event queue still busy\n");

                        /* Wait for a bit */
                        cfs_waitq_init(&waitq);
                        lwi = LWI_TIMEOUT(cfs_time_seconds(2), NULL, NULL);
                        l_wait_event(waitq, 0, &lwi);
                        break;
                }
        }
        /* notreached */
}

lnet_pid_t ptl_get_pid(void)
{
        lnet_pid_t        pid;

#ifndef __KERNEL__
        pid = getpid();
#else
        pid = LUSTRE_SRV_LNET_PID;
#endif
        return pid;
}

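/*
 * Bring up the network interface and allocate the global event queue.
 * In the kernel LNet delivers events through ptlrpc_master_callback();
 * liblustre has no asynchronous callbacks, so it polls the queue
 * instead (see liblustre_check_events() below).
 */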
int ptlrpc_ni_init(void)
{
        int              rc;
        lnet_pid_t       pid;

        pid = ptl_get_pid();
        CDEBUG(D_NET, "My pid is: %x\n", pid);

        /* We're not passing any limits yet... */
        rc = LNetNIInit(pid);
        if (rc < 0) {
                CDEBUG (D_NET, "Can't init network interface: %d\n", rc);
                return (-ENOENT);
        }

        /* CAVEAT EMPTOR: how we process portals events is _radically_
         * different depending on... */
#ifdef __KERNEL__
        /* kernel portals calls our master callback when events are added to
         * the event queue.  In fact lustre never pulls events off this queue,
         * so it's only sized for some debug history. */
        rc = LNetEQAlloc(1024, ptlrpc_master_callback, &ptlrpc_eq_h);
#else
        /* liblustre calls the master callback when it removes events from the
         * event queue.  The event queue has to be big enough not to drop
         * anything */
        rc = LNetEQAlloc(10240, LNET_EQ_HANDLER_NONE, &ptlrpc_eq_h);
#endif
        if (rc == 0)
                return 0;

        CERROR ("Failed to allocate event queue: %d\n", rc);
        LNetNIFini();

        return (-ENOMEM);
}

#ifndef __KERNEL__
CFS_LIST_HEAD(liblustre_wait_callbacks);
CFS_LIST_HEAD(liblustre_idle_callbacks);
void *liblustre_services_callback;

void *
liblustre_register_waitidle_callback (struct list_head *callback_list,
                                      const char *name,
                                      int (*fn)(void *arg), void *arg)
{
        struct liblustre_wait_callback *llwc;

        OBD_ALLOC(llwc, sizeof(*llwc));
        LASSERT (llwc != NULL);

        llwc->llwc_name = name;
        llwc->llwc_fn = fn;
        llwc->llwc_arg = arg;
        list_add_tail(&llwc->llwc_list, callback_list);

        return (llwc);
}

void
liblustre_deregister_waitidle_callback (void *opaque)
{
        struct liblustre_wait_callback *llwc = opaque;

        list_del(&llwc->llwc_list);
        OBD_FREE(llwc, sizeof(*llwc));
}

void *
liblustre_register_wait_callback (const char *name,
                                  int (*fn)(void *arg), void *arg)
{
        return liblustre_register_waitidle_callback(&liblustre_wait_callbacks,
                                                    name, fn, arg);
}

void
liblustre_deregister_wait_callback (void *opaque)
{
        liblustre_deregister_waitidle_callback(opaque);
}

void *
liblustre_register_idle_callback (const char *name,
                                  int (*fn)(void *arg), void *arg)
{
        return liblustre_register_waitidle_callback(&liblustre_idle_callbacks,
                                                    name, fn, arg);
}

void
liblustre_deregister_idle_callback (void *opaque)
{
        liblustre_deregister_waitidle_callback(opaque);
}

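/*
 * Poll the event queue and dispatch at most one event through the
 * master callback.  Returns 1 if an event was handled, 0 on timeout;
 * -EOVERFLOW from LNetEQPoll is fatal since a dropped event can never
 * be recovered.
 */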
int
liblustre_check_events (int timeout)
{
        lnet_event_t ev;
        int         rc;
        int         i;
        ENTRY;

        rc = LNetEQPoll(&ptlrpc_eq_h, 1, timeout * 1000, &ev, &i);
        if (rc == 0)
                RETURN(0);

        LASSERT (rc == -EOVERFLOW || rc == 1);

        /* liblustre: no async callback, so we can't afford to miss any
         * events... */
        if (rc == -EOVERFLOW) {
                CERROR ("Dropped an event!!!\n");
                abort();
        }

        ptlrpc_master_callback (&ev);
        RETURN(1);
}

int liblustre_waiting = 0;

int
liblustre_wait_event (int timeout)
{
        struct list_head               *tmp;
        struct liblustre_wait_callback *llwc;
        int                             found_something = 0;

        /* single threaded recursion check... */
        liblustre_waiting = 1;

        for (;;) {
                /* Deal with all pending events */
                while (liblustre_check_events(0))
                        found_something = 1;

                /* Give all registered callbacks a bite at the cherry */
                list_for_each(tmp, &liblustre_wait_callbacks) {
                        llwc = list_entry(tmp, struct liblustre_wait_callback,
                                          llwc_list);

                        if (llwc->llwc_fn(llwc->llwc_arg))
                                found_something = 1;
                }

                if (found_something || timeout == 0)
                        break;

                /* Nothing so far, but I'm allowed to block... */
                found_something = liblustre_check_events(timeout);
                if (!found_something)           /* still nothing */
                        break;                  /* I timed out */
        }

        liblustre_waiting = 0;

        return found_something;
}

void
liblustre_wait_idle(void)
{
        static int recursed = 0;

        struct list_head               *tmp;
        struct liblustre_wait_callback *llwc;
        int                             idle = 0;

        LASSERT(!recursed);
        recursed = 1;

        do {
                liblustre_wait_event(0);

                idle = 1;

                list_for_each(tmp, &liblustre_idle_callbacks) {
                        llwc = list_entry(tmp, struct liblustre_wait_callback,
                                          llwc_list);

                        if (!llwc->llwc_fn(llwc->llwc_arg)) {
                                idle = 0;
                                break;
                        }
                }

        } while (!idle);

        recursed = 0;
}

#endif /* __KERNEL__ */

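/*
 * Bring up everything ptlrpc needs from the network: LNet itself, the
 * liblustre service poll callback when built for userspace, and the
 * ptlrpcd threads; unwind in reverse order if any step fails.
 */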
int ptlrpc_init_portals(void)
{
        int   rc = ptlrpc_ni_init();

        if (rc != 0) {
                CERROR("network initialisation failed\n");
                return -EIO;
        }
#ifndef __KERNEL__
        liblustre_services_callback =
                liblustre_register_wait_callback("liblustre_check_services",
                                                 &liblustre_check_services,
                                                 NULL);
#endif
        rc = ptlrpcd_addref();
        if (rc == 0)
                return 0;

        CERROR("rpcd initialisation failed\n");
#ifndef __KERNEL__
        liblustre_deregister_wait_callback(liblustre_services_callback);
#endif
        ptlrpc_ni_fini();
        return rc;
}

void ptlrpc_exit_portals(void)
{
#ifndef __KERNEL__
        liblustre_deregister_wait_callback(liblustre_services_callback);
#endif
        ptlrpcd_decref();
        ptlrpc_ni_fini();
}