Add the ability to resend a request if it doesn't fit in the reply buffer.
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_RPC

#ifndef __KERNEL__
# include <liblustre.h>
#else
# ifdef __mips64__
#  include <linux/kernel.h>
# endif
#endif
#include <obd_class.h>
#include <lustre_net.h>
#include "ptlrpc_internal.h"

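/* The single LNet event queue handle shared by all of ptlrpc; every
 * network event is delivered here and dispatched by
 * ptlrpc_master_callback() below. */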
lnet_handle_eq_t   ptlrpc_eq_h;

/*
 *  Client's outgoing request callback
 */
void request_out_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
        struct ptlrpc_request *req = cbid->cbid_arg;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->unlinked);

        DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req,
                  "type %d, status %d", ev->type, ev->status);

        if (ev->type == LNET_EVENT_UNLINK || ev->status != 0) {

                /* Failed send: make it seem like the reply timed out, just
                 * like failing sends in client.c does currently...  */

                spin_lock(&req->rq_lock);
                req->rq_net_err = 1;
                spin_unlock(&req->rq_lock);

                ptlrpc_client_wake_req(req);
        }

        ptlrpc_req_finished(req);

        EXIT;
}

/*
 * Client's incoming reply callback
 */
void reply_in_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
        struct ptlrpc_request *req = cbid->cbid_arg;
        ENTRY;

        DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req,
                  "type %d, status %d", ev->type, ev->status);

        LASSERT(ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_UNLINK);
        LASSERT(ev->md.start == req->rq_repbuf);
        LASSERT(ev->mlength <= req->rq_replen);
        /* We've set LNET_MD_MANAGE_REMOTE for all outgoing requests
           for adaptive timeouts' early reply. */
        LASSERT((ev->md.options & LNET_MD_MANAGE_REMOTE) != 0);

        spin_lock(&req->rq_lock);

        req->rq_receiving_reply = 0;
        req->rq_early = 0;
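        /* Once the net has unlinked the reply MD, there is nothing left
         * for ptlrpc_unregister_reply() to unlink. */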
        if (ev->unlinked)
                req->rq_must_unlink = 0;

        if (ev->status)
                goto out_wake;

        if (ev->type == LNET_EVENT_UNLINK) {
                LASSERT(ev->unlinked);
                DEBUG_REQ(D_RPCTRACE, req, "unlink");
                goto out_wake;
        }

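        /* The reply did not fit in the buffer we posted: mark the request
         * truncated and record the size actually needed so the caller can
         * post a larger reply buffer and resend. */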
        if (ev->mlength < ev->rlength) {
                CDEBUG(D_RPCTRACE, "truncate req %p rpc %d - %d+%d\n", req,
                       req->rq_replen, ev->rlength, ev->offset);
                req->rq_reply_truncate = 1;
                req->rq_replied = 1;
                req->rq_status = -EOVERFLOW;
                req->rq_nob_received = ev->rlength + ev->offset;
                goto out_wake;
        }

        if ((ev->offset == 0) &&
            (lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT)) {
                /* Early reply */
                DEBUG_REQ(D_ADAPTTO, req,
                          "Early reply received: mlen=%u offset=%d replen=%d "
                          "replied=%d unlinked=%d", ev->mlength, ev->offset,
                          req->rq_replen, req->rq_replied, ev->unlinked);

                if (unlikely(ev->mlength != lustre_msg_early_size(req)))
                        CERROR("early reply sized %u, expect %u\n",
                               ev->mlength, lustre_msg_early_size(req));

                req->rq_early_count++; /* number received, client side */

                if (req->rq_replied)   /* already got the real reply */
                        goto out_wake;

                req->rq_early = 1;
                req->rq_nob_received = ev->mlength;
                /* repmsg points to early reply */
                req->rq_repmsg = req->rq_repbuf;
                /* And we're still receiving */
                req->rq_receiving_reply = 1;
        } else {
                /* Real reply */
                req->rq_rep_swab_mask = 0;
                req->rq_replied = 1;
                req->rq_nob_received = ev->mlength;
                /* repmsg points to real reply */
                req->rq_repmsg = (struct lustre_msg *)((char *)req->rq_repbuf +
                                                       ev->offset);
                /* LNetMDUnlink can't be called under the LNET_LOCK,
                   so we must unlink in ptlrpc_unregister_reply */
                DEBUG_REQ(D_INFO, req,
                          "reply in flags=%x mlen=%u offset=%d replen=%d",
                          lustre_msg_get_flags(req->rq_reqmsg),
                          ev->mlength, ev->offset, req->rq_replen);
        }

        req->rq_import->imp_last_reply_time = cfs_time_current_sec();

out_wake:
        /* NB don't unlock till after wakeup; req can disappear under us
         * since we don't have our own ref */
        ptlrpc_client_wake_req(req);
        spin_unlock(&req->rq_lock);
        EXIT;
}

/*
 * Client's bulk has been written/read
 */
void client_bulk_callback (lnet_event_t *ev)
{
        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
        ENTRY;

        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB))
                ev->status = -EIO;

        LASSERT ((desc->bd_type == BULK_PUT_SINK &&
                  ev->type == LNET_EVENT_PUT) ||
                 (desc->bd_type == BULK_GET_SOURCE &&
                  ev->type == LNET_EVENT_GET) ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->unlinked);

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, desc %p\n",
               ev->type, ev->status, desc);

        spin_lock(&desc->bd_lock);

        LASSERT(desc->bd_network_rw);
        desc->bd_network_rw = 0;

        if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) {
                desc->bd_success = 1;
                desc->bd_nob_transferred = ev->mlength;
                desc->bd_sender = ev->sender;
        }

        /* NB don't unlock till after wakeup; desc can disappear under us
         * otherwise */
        ptlrpc_client_wake_req(desc->bd_req);

        spin_unlock(&desc->bd_lock);
        EXIT;
}

/*
 * Server's incoming request callback
 */
void request_in_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id               *cbid = ev->md.user_ptr;
        struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;
        struct ptlrpc_service             *service = rqbd->rqbd_service;
        struct ptlrpc_request             *req;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_PUT ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT ((char *)ev->md.start >= rqbd->rqbd_buffer);
        LASSERT ((char *)ev->md.start + ev->offset + ev->mlength <=
                 rqbd->rqbd_buffer + service->srv_buf_size);

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, service %s\n",
               ev->type, ev->status, service->srv_name);

        if (ev->unlinked) {
                /* If this is the last request message to fit in the
                 * request buffer we can use the request object embedded in
                 * rqbd.  Note that if we failed to allocate a request,
                 * we'd have to re-post the rqbd, which we can't do in this
                 * context. */
                req = &rqbd->rqbd_req;
                memset(req, 0, sizeof (*req));
        } else {
                LASSERT (ev->type == LNET_EVENT_PUT);
                if (ev->status != 0) {
                        /* We moaned above already... */
                        return;
                }
                OBD_ALLOC_GFP(req, sizeof(*req), CFS_ALLOC_ATOMIC_TRY);
                if (req == NULL) {
                        CERROR("Can't allocate incoming request descriptor: "
                               "Dropping %s RPC from %s\n",
                               service->srv_name,
                               libcfs_id2str(ev->initiator));
                        return;
                }
        }

        /* NB we ABSOLUTELY RELY on req being zeroed, so pointers are NULL,
         * flags are reset and scalars are zero.  We only set the message
         * size to non-zero if this was a successful receive. */
        req->rq_xid = ev->match_bits;
        req->rq_reqmsg = ev->md.start + ev->offset;
        if (ev->type == LNET_EVENT_PUT && ev->status == 0)
                req->rq_reqlen = ev->mlength;
        do_gettimeofday(&req->rq_arrival_time);
        req->rq_peer = ev->initiator;
        req->rq_self = ev->target.nid;
        req->rq_rqbd = rqbd;
        req->rq_phase = RQ_PHASE_NEW;
#ifdef CRAY_XT3
        req->rq_uid = ev->uid;
#endif
        spin_lock_init(&req->rq_lock);
        CFS_INIT_LIST_HEAD(&req->rq_timed_list);
        atomic_set(&req->rq_refcount, 1);
        if (ev->type == LNET_EVENT_PUT)
                DEBUG_REQ(D_RPCTRACE, req, "incoming req");

        spin_lock(&service->srv_lock);

        req->rq_history_seq = service->srv_request_seq++;
        list_add_tail(&req->rq_history_list, &service->srv_request_history);

        if (ev->unlinked) {
                service->srv_nrqbd_receiving--;
                CDEBUG(D_INFO, "Buffer complete: %d buffers still posted\n",
                       service->srv_nrqbd_receiving);

                /* Normally, don't complain about 0 buffers posted; LNET won't
                 * drop incoming reqs since we set the portal lazy */
                if (test_req_buffer_pressure &&
                    ev->type != LNET_EVENT_UNLINK &&
                    service->srv_nrqbd_receiving == 0)
                        CWARN("All %s request buffers busy\n",
                              service->srv_name);

                /* req takes over the network's ref on rqbd */
        } else {
                /* req takes a ref on rqbd */
                rqbd->rqbd_refcount++;
        }

        list_add_tail(&req->rq_list, &service->srv_req_in_queue);
        service->srv_n_queued_reqs++;

        /* NB everything can disappear under us once the request
         * has been queued and we unlock, so do the wake now... */
        cfs_waitq_signal(&service->srv_waitq);

        spin_unlock(&service->srv_lock);
        EXIT;
}

/*
 *  Server's outgoing reply callback
 */
void reply_out_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id       *cbid = ev->md.user_ptr;
        struct ptlrpc_reply_state *rs = cbid->cbid_arg;
        struct ptlrpc_service     *svc = rs->rs_service;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_ACK ||
                 ev->type == LNET_EVENT_UNLINK);

        if (!rs->rs_difficult) {
                /* 'Easy' replies have no further processing so I drop the
                 * net's ref on 'rs' */
                LASSERT (ev->unlinked);
                ptlrpc_rs_decref(rs);
                atomic_dec (&svc->srv_outstanding_replies);
                EXIT;
                return;
        }

        LASSERT (rs->rs_on_net);

        if (ev->unlinked) {
                /* Last network callback.  The net's ref on 'rs' stays put
                 * until ptlrpc_server_handle_reply() is done with it */
                spin_lock(&svc->srv_lock);
                rs->rs_on_net = 0;
                ptlrpc_schedule_difficult_reply (rs);
                spin_unlock(&svc->srv_lock);
        }

        EXIT;
}

/*
 * Server's bulk completion callback
 */
void server_bulk_callback (lnet_event_t *ev)
{
        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
        ENTRY;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_UNLINK ||
                 (desc->bd_type == BULK_PUT_SOURCE &&
                  ev->type == LNET_EVENT_ACK) ||
                 (desc->bd_type == BULK_GET_SINK &&
                  ev->type == LNET_EVENT_REPLY));

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, desc %p\n",
               ev->type, ev->status, desc);

        spin_lock(&desc->bd_lock);

        if ((ev->type == LNET_EVENT_ACK ||
             ev->type == LNET_EVENT_REPLY) &&
            ev->status == 0) {
                /* We heard back from the peer, so even if we get this
                 * before the SENT event (oh yes we can), we know we
                 * read/wrote the peer buffer and how much... */
                desc->bd_success = 1;
                desc->bd_nob_transferred = ev->mlength;
                desc->bd_sender = ev->sender;
        }

        if (ev->unlinked) {
                /* This is the last callback no matter what... */
                desc->bd_network_rw = 0;
                cfs_waitq_signal(&desc->bd_waitq);
        }

        spin_unlock(&desc->bd_lock);
        EXIT;
}

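/* Demultiplex an LNet event to the per-message ptlrpc callback stashed
 * in the event's user pointer. */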
static void ptlrpc_master_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
        void (*callback)(lnet_event_t *ev) = cbid->cbid_fn;

        /* Honestly, it's best to find out early. */
        LASSERT (cbid->cbid_arg != LP_POISON);
        LASSERT (callback == request_out_callback ||
                 callback == reply_in_callback ||
                 callback == client_bulk_callback ||
                 callback == request_in_callback ||
                 callback == reply_out_callback ||
                 callback == server_bulk_callback);

        callback (ev);
}

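/* Map a UUID onto the closest LNet peer: on success returns 0 and fills
 * in *peer and *self; returns -ENOENT if the UUID has no known NID. */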
int ptlrpc_uuid_to_peer (struct obd_uuid *uuid,
                         lnet_process_id_t *peer, lnet_nid_t *self)
{
        int               best_dist = 0;
        __u32             best_order = 0;
        int               count = 0;
        int               rc = -ENOENT;
        int               portals_compatibility;
        int               dist;
        __u32             order;
        lnet_nid_t        dst_nid;
        lnet_nid_t        src_nid;

        portals_compatibility = LNetCtl(IOC_LIBCFS_PORTALS_COMPATIBILITY, NULL);

        peer->pid = LUSTRE_SRV_LNET_PID;

        /* Choose the matching UUID that's closest */
        while (lustre_uuid_to_peer(uuid->uuid, &dst_nid, count++) == 0) {
                dist = LNetDist(dst_nid, &src_nid, &order);
                if (dist < 0)
                        continue;

                if (dist == 0) {                /* local! use loopback LND */
                        peer->nid = *self = LNET_MKNID(LNET_MKNET(LOLND, 0), 0);
                        rc = 0;
                        break;
                }

                if (rc < 0 ||
                    dist < best_dist ||
                    (dist == best_dist && order < best_order)) {
                        best_dist = dist;
                        best_order = order;

                        if (portals_compatibility > 1) {
                                /* Strong portals compatibility: Zero the nid's
                                 * NET, so if I'm reading new config logs, or
                                 * getting configured by (new) lconf I can
                                 * still talk to old servers. */
                                dst_nid = LNET_MKNID(0, LNET_NIDADDR(dst_nid));
                                src_nid = LNET_MKNID(0, LNET_NIDADDR(src_nid));
                        }
                        peer->nid = dst_nid;
                        *self = src_nid;
                        rc = 0;
                }
        }

        CDEBUG(D_NET, "%s->%s\n", uuid->uuid, libcfs_id2str(*peer));
        if (rc != 0)
                CERROR("No NID found for %s\n", uuid->uuid);
        return rc;
}

void ptlrpc_ni_fini(void)
{
        cfs_waitq_t         waitq;
        struct l_wait_info  lwi;
        int                 rc;
        int                 retries;

        /* Wait for the event queue to become idle since there may still be
         * messages in flight with pending events (i.e. the fire-and-forget
         * messages == client requests and "non-difficult" server
         * replies). */

        for (retries = 0;; retries++) {
                rc = LNetEQFree(ptlrpc_eq_h);
                switch (rc) {
                default:
                        LBUG();

                case 0:
                        LNetNIFini();
                        return;

                case -EBUSY:
                        if (retries != 0)
                                CWARN("Event queue still busy\n");

                        /* Wait for a bit */
                        cfs_waitq_init(&waitq);
                        lwi = LWI_TIMEOUT(cfs_time_seconds(2), NULL, NULL);
                        l_wait_event(waitq, 0, &lwi);
                        break;
                }
        }
        /* notreached */
}

lnet_pid_t ptl_get_pid(void)
{
        lnet_pid_t        pid;

#ifndef __KERNEL__
        pid = getpid();
#else
        pid = LUSTRE_SRV_LNET_PID;
#endif
        return pid;
}

int ptlrpc_ni_init(void)
{
        int              rc;
        lnet_pid_t       pid;

        pid = ptl_get_pid();
        CDEBUG(D_NET, "My pid is: %x\n", pid);

        /* We're not passing any limits yet... */
        rc = LNetNIInit(pid);
        if (rc < 0) {
                CDEBUG (D_NET, "Can't init network interface: %d\n", rc);
                return (-ENOENT);
        }

        /* CAVEAT EMPTOR: how we process portals events is _radically_
         * different depending on... */
#ifdef __KERNEL__
        /* kernel portals calls our master callback when events are added to
         * the event queue.  In fact lustre never pulls events off this queue,
         * so it's only sized for some debug history. */
        rc = LNetEQAlloc(1024, ptlrpc_master_callback, &ptlrpc_eq_h);
#else
        /* liblustre calls the master callback when it removes events from the
         * event queue.  The event queue has to be big enough not to drop
         * anything */
        rc = LNetEQAlloc(10240, LNET_EQ_HANDLER_NONE, &ptlrpc_eq_h);
#endif
        if (rc == 0)
                return 0;

        CERROR ("Failed to allocate event queue: %d\n", rc);
        LNetNIFini();

        return (-ENOMEM);
}

#ifndef __KERNEL__
CFS_LIST_HEAD(liblustre_wait_callbacks);
CFS_LIST_HEAD(liblustre_idle_callbacks);
void *liblustre_services_callback;

void *
liblustre_register_waitidle_callback (struct list_head *callback_list,
                                      const char *name,
                                      int (*fn)(void *arg), void *arg)
{
        struct liblustre_wait_callback *llwc;

        OBD_ALLOC(llwc, sizeof(*llwc));
        LASSERT (llwc != NULL);

        llwc->llwc_name = name;
        llwc->llwc_fn = fn;
        llwc->llwc_arg = arg;
        list_add_tail(&llwc->llwc_list, callback_list);

        return (llwc);
}

void
liblustre_deregister_waitidle_callback (void *opaque)
{
        struct liblustre_wait_callback *llwc = opaque;

        list_del(&llwc->llwc_list);
        OBD_FREE(llwc, sizeof(*llwc));
}

void *
liblustre_register_wait_callback (const char *name,
                                  int (*fn)(void *arg), void *arg)
{
        return liblustre_register_waitidle_callback(&liblustre_wait_callbacks,
                                                    name, fn, arg);
}

void
liblustre_deregister_wait_callback (void *opaque)
{
        liblustre_deregister_waitidle_callback(opaque);
}

void *
liblustre_register_idle_callback (const char *name,
                                  int (*fn)(void *arg), void *arg)
{
        return liblustre_register_waitidle_callback(&liblustre_idle_callbacks,
                                                    name, fn, arg);
}

void
liblustre_deregister_idle_callback (void *opaque)
{
        liblustre_deregister_waitidle_callback(opaque);
}

int
liblustre_check_events (int timeout)
{
        lnet_event_t ev;
        int         rc;
        int         i;
        ENTRY;

        rc = LNetEQPoll(&ptlrpc_eq_h, 1, timeout * 1000, &ev, &i);
        if (rc == 0)
                RETURN(0);

        LASSERT (rc == -EOVERFLOW || rc == 1);

        /* liblustre: no async callback so we can't afford to miss any
         * events... */
        if (rc == -EOVERFLOW) {
                CERROR ("Dropped an event!!!\n");
                abort();
        }

        ptlrpc_master_callback (&ev);
        RETURN(1);
}

int liblustre_waiting = 0;

int
liblustre_wait_event (int timeout)
{
        struct list_head               *tmp;
        struct liblustre_wait_callback *llwc;
        int                             found_something = 0;

        /* single threaded recursion check... */
        liblustre_waiting = 1;

        for (;;) {
                /* Deal with all pending events */
                while (liblustre_check_events(0))
                        found_something = 1;

                /* Give all registered callbacks a bite at the cherry */
                list_for_each(tmp, &liblustre_wait_callbacks) {
                        llwc = list_entry(tmp, struct liblustre_wait_callback,
                                          llwc_list);

                        if (llwc->llwc_fn(llwc->llwc_arg))
                                found_something = 1;
                }

                if (found_something || timeout == 0)
                        break;

                /* Nothing so far, but I'm allowed to block... */
                found_something = liblustre_check_events(timeout);
                if (!found_something)           /* still nothing */
                        break;                  /* I timed out */
        }

        liblustre_waiting = 0;

        return found_something;
}

void
liblustre_wait_idle(void)
{
        static int recursed = 0;

        struct list_head               *tmp;
        struct liblustre_wait_callback *llwc;
        int                             idle = 0;

        LASSERT(!recursed);
        recursed = 1;

        do {
                liblustre_wait_event(0);

                idle = 1;

                list_for_each(tmp, &liblustre_idle_callbacks) {
                        llwc = list_entry(tmp, struct liblustre_wait_callback,
                                          llwc_list);

                        if (!llwc->llwc_fn(llwc->llwc_arg)) {
                                idle = 0;
                                break;
                        }
                }

        } while (!idle);

        recursed = 0;
}

#endif /* !__KERNEL__ */

int ptlrpc_init_portals(void)
{
        int   rc = ptlrpc_ni_init();

        if (rc != 0) {
                CERROR("network initialisation failed\n");
                return -EIO;
        }
#ifndef __KERNEL__
        liblustre_services_callback =
                liblustre_register_wait_callback("liblustre_check_services",
                                                 &liblustre_check_services, NULL);
#endif
        rc = ptlrpcd_addref();
        if (rc == 0)
                return 0;

        CERROR("rpcd initialisation failed\n");
#ifndef __KERNEL__
        liblustre_deregister_wait_callback(liblustre_services_callback);
#endif
        ptlrpc_ni_fini();
        return rc;
}

void ptlrpc_exit_portals(void)
{
#ifndef __KERNEL__
        liblustre_deregister_wait_callback(liblustre_services_callback);
#endif
        ptlrpcd_decref();
        ptlrpc_ni_fini();
}