X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fptlrpc%2Fevents.c;h=96e68d45a3207dd4da23fe09652b4338d7ab08d0;hb=3485238de84848740aaaa05f78da2cb53b81d3a0;hp=8441c6cf88a634247ea4bfa66e3291ce9e09b363;hpb=f95393b0d0a59cf3dc2f29cffc35dcc4cc9d7728;p=fs%2Flustre-release.git diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c index 8441c6c..96e68d4 100644 --- a/lustre/ptlrpc/events.c +++ b/lustre/ptlrpc/events.c @@ -1,6 +1,4 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * +/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. @@ -28,6 +26,8 @@ /* * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2012, 2013, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -65,22 +65,23 @@ void request_out_callback(lnet_event_t *ev) ev->type == LNET_EVENT_UNLINK); LASSERT (ev->unlinked); - DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req, - "type %d, status %d", ev->type, ev->status); + DEBUG_REQ(D_NET, req, "type %d, status %d", ev->type, ev->status); sptlrpc_request_out_callback(req); + spin_lock(&req->rq_lock); + req->rq_real_sent = cfs_time_current_sec(); + if (ev->unlinked) + req->rq_req_unlink = 0; if (ev->type == LNET_EVENT_UNLINK || ev->status != 0) { /* Failed send: make it seem like the reply timed out, just * like failing sends in client.c does currently... */ - cfs_spin_lock(&req->rq_lock); - req->rq_net_err = 1; - cfs_spin_unlock(&req->rq_lock); - + req->rq_net_err = 1; ptlrpc_client_wake_req(req); } + spin_unlock(&req->rq_lock); ptlrpc_req_finished(req); @@ -96,8 +97,7 @@ void reply_in_callback(lnet_event_t *ev) struct ptlrpc_request *req = cbid->cbid_arg; ENTRY; - DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req, - "type %d, status %d", ev->type, ev->status); + DEBUG_REQ(D_NET, req, "type %d, status %d", ev->type, ev->status); LASSERT (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_UNLINK); LASSERT (ev->md.start == req->rq_repbuf); @@ -106,19 +106,19 @@ void reply_in_callback(lnet_event_t *ev) for adaptive timeouts' early reply. 
*/ LASSERT((ev->md.options & LNET_MD_MANAGE_REMOTE) != 0); - cfs_spin_lock(&req->rq_lock); + spin_lock(&req->rq_lock); req->rq_receiving_reply = 0; req->rq_early = 0; if (ev->unlinked) - req->rq_must_unlink = 0; + req->rq_reply_unlink = 0; if (ev->status) goto out_wake; if (ev->type == LNET_EVENT_UNLINK) { LASSERT(ev->unlinked); - DEBUG_REQ(D_RPCTRACE, req, "unlink"); + DEBUG_REQ(D_NET, req, "unlink"); goto out_wake; } @@ -154,6 +154,8 @@ void reply_in_callback(lnet_event_t *ev) /* Real reply */ req->rq_rep_swab_mask = 0; req->rq_replied = 1; + /* Got reply, no resend required */ + req->rq_resend = 0; req->rq_reply_off = ev->offset; req->rq_nob_received = ev->mlength; /* LNetMDUnlink can't be called under the LNET_LOCK, @@ -170,8 +172,8 @@ out_wake: /* NB don't unlock till after wakeup; req can disappear under us * since we don't have our own ref */ ptlrpc_client_wake_req(req); - cfs_spin_unlock(&req->rq_lock); - EXIT; + spin_unlock(&req->rq_lock); + EXIT; } /* @@ -181,6 +183,7 @@ void client_bulk_callback (lnet_event_t *ev) { struct ptlrpc_cb_id *cbid = ev->md.user_ptr; struct ptlrpc_bulk_desc *desc = cbid->cbid_arg; + struct ptlrpc_request *req; ENTRY; LASSERT ((desc->bd_type == BULK_PUT_SINK && @@ -190,31 +193,97 @@ void client_bulk_callback (lnet_event_t *ev) ev->type == LNET_EVENT_UNLINK); LASSERT (ev->unlinked); + if (CFS_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_CLIENT_BULK_CB, CFS_FAIL_ONCE)) + ev->status = -EIO; + + if (CFS_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2,CFS_FAIL_ONCE)) + ev->status = -EIO; + CDEBUG((ev->status == 0) ? D_NET : D_ERROR, "event type %d, status %d, desc %p\n", ev->type, ev->status, desc); - cfs_spin_lock(&desc->bd_lock); - - LASSERT(desc->bd_network_rw); - desc->bd_network_rw = 0; + spin_lock(&desc->bd_lock); + req = desc->bd_req; + LASSERT(desc->bd_md_count > 0); + desc->bd_md_count--; + + if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) { + desc->bd_nob_transferred += ev->mlength; + desc->bd_sender = ev->sender; + } else { + /* start reconnect and resend if network error hit */ + spin_lock(&req->rq_lock); + req->rq_net_err = 1; + spin_unlock(&req->rq_lock); + } + + if (ev->status != 0) + desc->bd_failure = 1; + + /* NB don't unlock till after wakeup; desc can disappear under us + * otherwise */ + if (desc->bd_md_count == 0) + ptlrpc_client_wake_req(desc->bd_req); + + spin_unlock(&desc->bd_lock); + EXIT; +} - if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) { - desc->bd_success = 1; - desc->bd_nob_transferred = ev->mlength; - desc->bd_sender = ev->sender; - } +/* + * We will have percpt request history list for ptlrpc service in upcoming + * patches because we don't want to be serialized by current per-service + * history operations. So we require history ID can (somehow) show arriving + * order w/o grabbing global lock, and user can sort them in userspace. + * + * This is how we generate history ID for ptlrpc_request: + * ---------------------------------------------------- + * | 32 bits | 16 bits | (16 - X)bits | X bits | + * ---------------------------------------------------- + * | seconds | usec / 16 | sequence | CPT id | + * ---------------------------------------------------- + * + * it might not be precise but should be good enough. 
+ */ - /* release the encrypted pages for write */ - if (desc->bd_req->rq_bulk_write) - sptlrpc_enc_pool_put_pages(desc); +#define REQS_CPT_BITS(svcpt) ((svcpt)->scp_service->srv_cpt_bits) - /* NB don't unlock till after wakeup; desc can disappear under us - * otherwise */ - ptlrpc_client_wake_req(desc->bd_req); +#define REQS_SEC_SHIFT 32 +#define REQS_USEC_SHIFT 16 +#define REQS_SEQ_SHIFT(svcpt) REQS_CPT_BITS(svcpt) - cfs_spin_unlock(&desc->bd_lock); - EXIT; +static void ptlrpc_req_add_history(struct ptlrpc_service_part *svcpt, + struct ptlrpc_request *req) +{ + __u64 sec = req->rq_arrival_time.tv_sec; + __u32 usec = req->rq_arrival_time.tv_usec >> 4; /* usec / 16 */ + __u64 new_seq; + + /* set sequence ID for request and add it to history list, + * it must be called with hold svcpt::scp_lock */ + + new_seq = (sec << REQS_SEC_SHIFT) | + (usec << REQS_USEC_SHIFT) | + (svcpt->scp_cpt < 0 ? 0 : svcpt->scp_cpt); + + if (new_seq > svcpt->scp_hist_seq) { + /* This handles the initial case of scp_hist_seq == 0 or + * we just jumped into a new time window */ + svcpt->scp_hist_seq = new_seq; + } else { + LASSERT(REQS_SEQ_SHIFT(svcpt) < REQS_USEC_SHIFT); + /* NB: increase sequence number in current usec bucket, + * however, it's possible that we used up all bits for + * sequence and jumped into the next usec bucket (future time), + * then we hope there will be less RPCs per bucket at some + * point, and sequence will catch up again */ + svcpt->scp_hist_seq += (1U << REQS_SEQ_SHIFT(svcpt)); + new_seq = svcpt->scp_hist_seq; + } + + req->rq_history_seq = new_seq; + + list_add_tail(&req->rq_history_list, &svcpt->scp_hist_reqs); } /* @@ -222,9 +291,10 @@ void client_bulk_callback (lnet_event_t *ev) */ void request_in_callback(lnet_event_t *ev) { - struct ptlrpc_cb_id *cbid = ev->md.user_ptr; - struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg; - struct ptlrpc_service *service = rqbd->rqbd_service; + struct ptlrpc_cb_id *cbid = ev->md.user_ptr; + struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg; + struct ptlrpc_service_part *svcpt = rqbd->rqbd_svcpt; + struct ptlrpc_service *service = svcpt->scp_service; struct ptlrpc_request *req; ENTRY; @@ -252,7 +322,7 @@ void request_in_callback(lnet_event_t *ev) /* We moaned above already... */ return; } - OBD_ALLOC_GFP(req, sizeof(*req), CFS_ALLOC_ATOMIC_TRY); + req = ptlrpc_request_cache_alloc(ALLOC_ATOMIC_TRY); if (req == NULL) { CERROR("Can't allocate incoming request descriptor: " "Dropping %s RPC from %s\n", @@ -267,40 +337,37 @@ void request_in_callback(lnet_event_t *ev) * size to non-zero if this was a successful receive. 
*/ req->rq_xid = ev->match_bits; req->rq_reqbuf = ev->md.start + ev->offset; - if (ev->type == LNET_EVENT_PUT && ev->status == 0) - req->rq_reqdata_len = ev->mlength; - cfs_gettimeofday(&req->rq_arrival_time); - req->rq_peer = ev->initiator; - req->rq_self = ev->target.nid; - req->rq_rqbd = rqbd; + if (ev->type == LNET_EVENT_PUT && ev->status == 0) + req->rq_reqdata_len = ev->mlength; + do_gettimeofday(&req->rq_arrival_time); + req->rq_peer = ev->initiator; + req->rq_self = ev->target.nid; + req->rq_rqbd = rqbd; req->rq_phase = RQ_PHASE_NEW; -#ifdef CRAY_XT3 - req->rq_uid = ev->uid; -#endif - cfs_spin_lock_init(&req->rq_lock); - CFS_INIT_LIST_HEAD(&req->rq_timed_list); - cfs_atomic_set(&req->rq_refcount, 1); - if (ev->type == LNET_EVENT_PUT) - CDEBUG(D_RPCTRACE, "incoming req@%p x"LPU64" msgsize %u\n", - req, req->rq_xid, ev->mlength); - - CDEBUG(D_RPCTRACE, "peer: %s\n", libcfs_id2str(req->rq_peer)); - - cfs_spin_lock(&service->srv_lock); - - req->rq_history_seq = service->srv_request_seq++; - cfs_list_add_tail(&req->rq_history_list, &service->srv_request_history); - - if (ev->unlinked) { - service->srv_nrqbd_receiving--; - CDEBUG(D_INFO, "Buffer complete: %d buffers still posted\n", - service->srv_nrqbd_receiving); - - /* Normally, don't complain about 0 buffers posted; LNET won't - * drop incoming reqs since we set the portal lazy */ - if (test_req_buffer_pressure && - ev->type != LNET_EVENT_UNLINK && - service->srv_nrqbd_receiving == 0) + spin_lock_init(&req->rq_lock); + INIT_LIST_HEAD(&req->rq_timed_list); + INIT_LIST_HEAD(&req->rq_exp_list); + atomic_set(&req->rq_refcount, 1); + if (ev->type == LNET_EVENT_PUT) + CDEBUG(D_INFO, "incoming req@%p x"LPU64" msgsize %u\n", + req, req->rq_xid, ev->mlength); + + CDEBUG(D_RPCTRACE, "peer: %s\n", libcfs_id2str(req->rq_peer)); + + spin_lock(&svcpt->scp_lock); + + ptlrpc_req_add_history(svcpt, req); + + if (ev->unlinked) { + svcpt->scp_nrqbds_posted--; + CDEBUG(D_INFO, "Buffer complete: %d buffers still posted\n", + svcpt->scp_nrqbds_posted); + + /* Normally, don't complain about 0 buffers posted; LNET won't + * drop incoming reqs since we set the portal lazy */ + if (test_req_buffer_pressure && + ev->type != LNET_EVENT_UNLINK && + svcpt->scp_nrqbds_posted == 0) CWARN("All %s request buffers busy\n", service->srv_name); @@ -310,15 +377,15 @@ void request_in_callback(lnet_event_t *ev) rqbd->rqbd_refcount++; } - cfs_list_add_tail(&req->rq_list, &service->srv_req_in_queue); - service->srv_n_queued_reqs++; + list_add_tail(&req->rq_list, &svcpt->scp_req_incoming); + svcpt->scp_nreqs_incoming++; - /* NB everything can disappear under us once the request - * has been queued and we unlock, so do the wake now... */ - cfs_waitq_signal(&service->srv_waitq); + /* NB everything can disappear under us once the request + * has been queued and we unlock, so do the wake now... 
*/ + wake_up(&svcpt->scp_waitq); - cfs_spin_unlock(&service->srv_lock); - EXIT; + spin_unlock(&svcpt->scp_lock); + EXIT; } /* @@ -326,9 +393,9 @@ void request_in_callback(lnet_event_t *ev) */ void reply_out_callback(lnet_event_t *ev) { - struct ptlrpc_cb_id *cbid = ev->md.user_ptr; - struct ptlrpc_reply_state *rs = cbid->cbid_arg; - struct ptlrpc_service *svc = rs->rs_service; + struct ptlrpc_cb_id *cbid = ev->md.user_ptr; + struct ptlrpc_reply_state *rs = cbid->cbid_arg; + struct ptlrpc_service_part *svcpt = rs->rs_svcpt; ENTRY; LASSERT (ev->type == LNET_EVENT_SEND || @@ -340,7 +407,6 @@ void reply_out_callback(lnet_event_t *ev) * net's ref on 'rs' */ LASSERT (ev->unlinked); ptlrpc_rs_decref(rs); - cfs_atomic_dec (&svc->srv_outstanding_replies); EXIT; return; } @@ -350,61 +416,70 @@ void reply_out_callback(lnet_event_t *ev) if (ev->unlinked) { /* Last network callback. The net's ref on 'rs' stays put * until ptlrpc_handle_rs() is done with it */ - cfs_spin_lock(&svc->srv_lock); - cfs_spin_lock(&rs->rs_lock); - rs->rs_on_net = 0; - if (!rs->rs_no_ack || - rs->rs_transno <= rs->rs_export->exp_obd->obd_last_committed) - ptlrpc_schedule_difficult_reply (rs); - cfs_spin_unlock(&rs->rs_lock); - cfs_spin_unlock(&svc->srv_lock); - } - - EXIT; + spin_lock(&svcpt->scp_rep_lock); + spin_lock(&rs->rs_lock); + + rs->rs_on_net = 0; + if (!rs->rs_no_ack || + rs->rs_transno <= + rs->rs_export->exp_obd->obd_last_committed) + ptlrpc_schedule_difficult_reply(rs); + + spin_unlock(&rs->rs_lock); + spin_unlock(&svcpt->scp_rep_lock); + } + EXIT; } +#ifdef HAVE_SERVER_SUPPORT /* * Server's bulk completion callback */ void server_bulk_callback (lnet_event_t *ev) { - struct ptlrpc_cb_id *cbid = ev->md.user_ptr; - struct ptlrpc_bulk_desc *desc = cbid->cbid_arg; - ENTRY; + struct ptlrpc_cb_id *cbid = ev->md.user_ptr; + struct ptlrpc_bulk_desc *desc = cbid->cbid_arg; + ENTRY; - LASSERT (ev->type == LNET_EVENT_SEND || - ev->type == LNET_EVENT_UNLINK || - (desc->bd_type == BULK_PUT_SOURCE && - ev->type == LNET_EVENT_ACK) || - (desc->bd_type == BULK_GET_SINK && - ev->type == LNET_EVENT_REPLY)); + LASSERT(ev->type == LNET_EVENT_SEND || + ev->type == LNET_EVENT_UNLINK || + (desc->bd_type == BULK_PUT_SOURCE && + ev->type == LNET_EVENT_ACK) || + (desc->bd_type == BULK_GET_SINK && + ev->type == LNET_EVENT_REPLY)); CDEBUG((ev->status == 0) ? D_NET : D_ERROR, "event type %d, status %d, desc %p\n", ev->type, ev->status, desc); - cfs_spin_lock(&desc->bd_lock); - - if ((ev->type == LNET_EVENT_ACK || - ev->type == LNET_EVENT_REPLY) && - ev->status == 0) { - /* We heard back from the peer, so even if we get this - * before the SENT event (oh yes we can), we know we - * read/wrote the peer buffer and how much... */ - desc->bd_success = 1; - desc->bd_nob_transferred = ev->mlength; - desc->bd_sender = ev->sender; - } + spin_lock(&desc->bd_lock); - if (ev->unlinked) { - /* This is the last callback no matter what... */ - desc->bd_network_rw = 0; - cfs_waitq_signal(&desc->bd_waitq); - } + LASSERT(desc->bd_md_count > 0); - cfs_spin_unlock(&desc->bd_lock); - EXIT; + if ((ev->type == LNET_EVENT_ACK || + ev->type == LNET_EVENT_REPLY) && + ev->status == 0) { + /* We heard back from the peer, so even if we get this + * before the SENT event (oh yes we can), we know we + * read/wrote the peer buffer and how much... */ + desc->bd_nob_transferred += ev->mlength; + desc->bd_sender = ev->sender; + } + + if (ev->status != 0) + desc->bd_failure = 1; + + if (ev->unlinked) { + desc->bd_md_count--; + /* This is the last callback no matter what... 
*/ + if (desc->bd_md_count == 0) + wake_up(&desc->bd_waitq); + } + + spin_unlock(&desc->bd_lock); + EXIT; } +#endif static void ptlrpc_master_callback(lnet_event_t *ev) { @@ -417,8 +492,11 @@ static void ptlrpc_master_callback(lnet_event_t *ev) callback == reply_in_callback || callback == client_bulk_callback || callback == request_in_callback || - callback == reply_out_callback || - callback == server_bulk_callback); + callback == reply_out_callback +#ifdef HAVE_SERVER_SUPPORT + || callback == server_bulk_callback +#endif + ); callback (ev); } @@ -478,38 +556,38 @@ int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, void ptlrpc_ni_fini(void) { - cfs_waitq_t waitq; - struct l_wait_info lwi; - int rc; - int retries; - - /* Wait for the event queue to become idle since there may still be - * messages in flight with pending events (i.e. the fire-and-forget - * messages == client requests and "non-difficult" server - * replies */ - - for (retries = 0;; retries++) { - rc = LNetEQFree(ptlrpc_eq_h); - switch (rc) { - default: - LBUG(); - - case 0: - LNetNIFini(); - return; - - case -EBUSY: - if (retries != 0) - CWARN("Event queue still busy\n"); - - /* Wait for a bit */ - cfs_waitq_init(&waitq); - lwi = LWI_TIMEOUT(cfs_time_seconds(2), NULL, NULL); - l_wait_event(waitq, 0, &lwi); - break; - } - } - /* notreached */ + wait_queue_head_t waitq; + struct l_wait_info lwi; + int rc; + int retries; + + /* Wait for the event queue to become idle since there may still be + * messages in flight with pending events (i.e. the fire-and-forget + * messages == client requests and "non-difficult" server + * replies */ + + for (retries = 0;; retries++) { + rc = LNetEQFree(ptlrpc_eq_h); + switch (rc) { + default: + LBUG(); + + case 0: + LNetNIFini(); + return; + + case -EBUSY: + if (retries != 0) + CWARN("Event queue still busy\n"); + + /* Wait for a bit */ + init_waitqueue_head(&waitq); + lwi = LWI_TIMEOUT(cfs_time_seconds(2), NULL, NULL); + l_wait_event(waitq, 0, &lwi); + break; + } + } + /* notreached */ } lnet_pid_t ptl_get_pid(void) @@ -536,16 +614,17 @@ int ptlrpc_ni_init(void) rc = LNetNIInit(pid); if (rc < 0) { CDEBUG (D_NET, "Can't init network interface: %d\n", rc); - return (-ENOENT); + return rc; } /* CAVEAT EMPTOR: how we process portals events is _radically_ * different depending on... */ #ifdef __KERNEL__ - /* kernel portals calls our master callback when events are added to - * the event queue. In fact lustre never pulls events off this queue, - * so it's only sized for some debug history. */ - rc = LNetEQAlloc(1024, ptlrpc_master_callback, &ptlrpc_eq_h); + /* kernel LNet calls our master callback when there are new event, + * because we are guaranteed to get every event via callback, + * so we just set EQ size to 0 to avoid overhread of serializing + * enqueue/dequeue operations in LNet. */ + rc = LNetEQAlloc(0, ptlrpc_master_callback, &ptlrpc_eq_h); #else /* liblustre calls the master callback when it removes events from the * event queue. 
The event queue has to be big enough not to drop @@ -558,18 +637,18 @@ int ptlrpc_ni_init(void) CERROR ("Failed to allocate event queue: %d\n", rc); LNetNIFini(); - return (-ENOMEM); + return rc; } #ifndef __KERNEL__ -CFS_LIST_HEAD(liblustre_wait_callbacks); -CFS_LIST_HEAD(liblustre_idle_callbacks); +struct list_head liblustre_wait_callbacks; +struct list_head liblustre_idle_callbacks; void *liblustre_services_callback; void * -liblustre_register_waitidle_callback (cfs_list_t *callback_list, - const char *name, - int (*fn)(void *arg), void *arg) +liblustre_register_waitidle_callback(struct list_head *callback_list, + const char *name, + int (*fn)(void *arg), void *arg) { struct liblustre_wait_callback *llwc; @@ -579,7 +658,7 @@ liblustre_register_waitidle_callback (cfs_list_t *callback_list, llwc->llwc_name = name; llwc->llwc_fn = fn; llwc->llwc_arg = arg; - cfs_list_add_tail(&llwc->llwc_list, callback_list); + list_add_tail(&llwc->llwc_list, callback_list); return (llwc); } @@ -587,10 +666,10 @@ liblustre_register_waitidle_callback (cfs_list_t *callback_list, void liblustre_deregister_waitidle_callback (void *opaque) { - struct liblustre_wait_callback *llwc = opaque; + struct liblustre_wait_callback *llwc = opaque; - cfs_list_del(&llwc->llwc_list); - OBD_FREE(llwc, sizeof(*llwc)); + list_del(&llwc->llwc_list); + OBD_FREE(llwc, sizeof(*llwc)); } void * @@ -651,7 +730,7 @@ int liblustre_waiting = 0; int liblustre_wait_event (int timeout) { - cfs_list_t *tmp; + struct list_head *tmp; struct liblustre_wait_callback *llwc; int found_something = 0; @@ -664,8 +743,8 @@ liblustre_wait_event (int timeout) found_something = 1; /* Give all registered callbacks a bite at the cherry */ - cfs_list_for_each(tmp, &liblustre_wait_callbacks) { - llwc = cfs_list_entry(tmp, + list_for_each(tmp, &liblustre_wait_callbacks) { + llwc = list_entry(tmp, struct liblustre_wait_callback, llwc_list); @@ -691,10 +770,9 @@ void liblustre_wait_idle(void) { static int recursed = 0; - - cfs_list_t *tmp; - struct liblustre_wait_callback *llwc; - int idle = 0; + struct list_head *tmp; + struct liblustre_wait_callback *llwc; + int idle = 0; LASSERT(!recursed); recursed = 1; @@ -704,11 +782,9 @@ liblustre_wait_idle(void) idle = 1; - cfs_list_for_each(tmp, &liblustre_idle_callbacks) { - llwc = cfs_list_entry(tmp, - struct liblustre_wait_callback, - llwc_list); - + list_for_each(tmp, &liblustre_idle_callbacks) { + llwc = list_entry(tmp, struct liblustre_wait_callback, + llwc_list); if (!llwc->llwc_fn(llwc->llwc_arg)) { idle = 0; break; @@ -728,14 +804,17 @@ int ptlrpc_init_portals(void) if (rc != 0) { CERROR("network initialisation failed\n"); - return -EIO; + return rc; } #ifndef __KERNEL__ + INIT_LIST_HEAD(&liblustre_wait_callbacks); + INIT_LIST_HEAD(&liblustre_idle_callbacks); + liblustre_services_callback = liblustre_register_wait_callback("liblustre_check_services", &liblustre_check_services, NULL); - cfs_init_completion_module(liblustre_wait_event); + init_completion_module(liblustre_wait_event); #endif rc = ptlrpcd_addref(); if (rc == 0)
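A note on the request_out_callback() hunk near the top of the diff: the send-completion handler now records rq_real_sent and handles a failed send inside one rq_lock critical section, instead of taking the lock only around rq_net_err as the old code did; the handler just marks the request and wakes any waiter, and reconnect/resend is left to the interpreting thread in client.c. A rough user-space model of that pattern follows; the struct, the field names and the pthread primitives are stand-ins for the kernel spinlock and wait queue, not Lustre code.

/* build: cc -pthread sketch.c */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <errno.h>

struct fake_req {
	pthread_mutex_t lock;		/* stands in for rq_lock */
	pthread_cond_t  wq;		/* stands in for the request waitqueue */
	bool            net_err;	/* stands in for rq_net_err */
	long            real_sent;	/* stands in for rq_real_sent */
};

/* called from the (simulated) network-event context when the send completes */
static void send_completed(struct fake_req *req, int status, long now)
{
	pthread_mutex_lock(&req->lock);
	req->real_sent = now;			/* record when the send really left */
	if (status != 0) {
		req->net_err = true;		/* make it look like a timed-out reply ... */
		pthread_cond_broadcast(&req->wq); /* ... and wake whoever is waiting */
	}
	pthread_mutex_unlock(&req->lock);
}

int main(void)
{
	struct fake_req req = {
		.lock = PTHREAD_MUTEX_INITIALIZER,
		.wq   = PTHREAD_COND_INITIALIZER,
	};

	send_completed(&req, -EIO, 12345);
	printf("net_err=%d real_sent=%ld\n", req.net_err, req.real_sent);
	return 0;
}

Doing the error marking and the wakeup in the same locked region also avoids the extra lock/unlock pair the old code paid on the failure path.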
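The client_bulk_callback() and server_bulk_callback() hunks replace the old bd_network_rw / bd_success booleans with a bd_md_count of outstanding MDs: every MD backing the descriptor delivers its own event, transferred bytes are accumulated with += rather than overwritten, an error sets bd_failure, and the waiter is woken only when the count drops to zero. The self-contained model below shows just that accounting; the struct and function names are illustrative and are not the kernel's ptlrpc_bulk_desc.

#include <stdio.h>
#include <stdbool.h>

struct bulk_desc {
	int  md_count;		/* MDs still outstanding */
	int  nob_transferred;	/* bytes moved so far */
	bool failure;		/* some event reported an error */
};

/* one completion event for one MD of the descriptor;
 * returns true when the waiter should be woken */
static bool bulk_event(struct bulk_desc *desc, int status, int mlength)
{
	desc->md_count--;

	if (status == 0)
		desc->nob_transferred += mlength;	/* accumulate, don't overwrite */
	else
		desc->failure = true;

	/* wake only once, when the last MD has completed */
	return desc->md_count == 0;
}

int main(void)
{
	struct bulk_desc desc = { .md_count = 2 };

	printf("event 1: wake=%d\n", bulk_event(&desc, 0, 4096));
	printf("event 2: wake=%d\n", bulk_event(&desc, 0, 4096));
	printf("transferred=%d failure=%d\n",
	       desc.nob_transferred, desc.failure);
	return 0;
}

Counting MDs instead of keeping a single in-flight flag is what allows a bulk transfer to be split over several MDs later without changing the wake-up logic, which appears to be the point of this refactoring.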
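The block comment and ptlrpc_req_add_history() added in the middle of the diff pack the history ID as | 32-bit seconds | 16-bit usec/16 | (16 - X)-bit sequence | X-bit CPT id |. Dividing microseconds by 16 keeps the largest value (999999 / 16 = 62499) inside the 16-bit field, and when the sequence bits of a usec/16 bucket are exhausted the counter simply spills into the next bucket, i.e. borrows slightly future time, as the in-code comment says. The standalone sketch below reproduces that arithmetic in user space; SEC_SHIFT, USEC_SHIFT, hist_seq and next_history_id() are local names for this example, and the real function runs under scp_lock with one counter per service partition.

#include <stdio.h>
#include <stdint.h>
#include <assert.h>
#include <sys/time.h>

#define SEC_SHIFT	32
#define USEC_SHIFT	16

static uint64_t hist_seq;	/* per-partition in the patch, guarded by scp_lock */

static uint64_t next_history_id(unsigned int cpt, unsigned int cpt_bits)
{
	struct timeval tv;
	uint64_t new_seq;

	gettimeofday(&tv, NULL);
	new_seq = ((uint64_t)tv.tv_sec << SEC_SHIFT) |
		  ((uint64_t)(tv.tv_usec >> 4) << USEC_SHIFT) |	/* usec / 16 */
		  cpt;

	if (new_seq > hist_seq) {
		/* first ID in this usec/16 bucket (or hist_seq == 0) */
		hist_seq = new_seq;
	} else {
		/* same bucket: bump the sequence field, which sits just
		 * above the CPT id bits; it may spill into the next bucket */
		assert(cpt_bits < USEC_SHIFT);
		hist_seq += (uint64_t)1 << cpt_bits;
		new_seq = hist_seq;
	}
	return new_seq;
}

int main(void)
{
	int i;

	/* 2 CPT-id bits leave 14 bits of per-bucket sequence */
	for (i = 0; i < 4; i++)
		printf("id[%d] = %#llx\n", i,
		       (unsigned long long)next_history_id(1, 2));
	return 0;
}

Because the timestamp occupies the high bits, sorting records by this ID in user space approximately recovers arrival order across partitions without a global lock, which is exactly what the "might not be precise but should be good enough" comment is after.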