X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fptlrpc%2Fevents.c;h=777ef75d94c4aac5965cf4c0b05398eeae69ca18;hp=a016f35f063d72820eb5160a731f54169a2eb6fa;hb=859678cc6b075f7c81903e44b99bdbd18c635cbb;hpb=b339c5a672ed7d20747bb3653ee72865835b4d08 diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c index a016f35..777ef75 100644 --- a/lustre/ptlrpc/events.c +++ b/lustre/ptlrpc/events.c @@ -1,233 +1,676 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * Copyright (C) 2002 Cluster File Systems, Inc. + * Copyright (c) 2002, 2003 Cluster File Systems, Inc. * - * This file is part of Lustre, http://www.lustre.org. + * This file is part of the Lustre file system, http://www.lustre.org + * Lustre is a trademark of Cluster File Systems, Inc. * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. + * You may have signed or agreed to another license before downloading + * this software. If so, you are bound by the terms and conditions + * of that agreement, and the following does not apply to you. See the + * LICENSE file included with this distribution for more information. * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. + * If you did not agree to a different license, then this copy of Lustre + * is open source software; you can redistribute it and/or modify it + * under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * In either case, Lustre is distributed in the hope that it will be + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * license text for more details. * */ -#define EXPORT_SYMTAB #define DEBUG_SUBSYSTEM S_RPC -#include -#include +#ifndef __KERNEL__ +# include +#else +# ifdef __mips64__ +# include +# endif +#endif +#include +#include +#include "ptlrpc_internal.h" + +lnet_handle_eq_t ptlrpc_eq_h; + +/* + * Client's outgoing request callback + */ +void request_out_callback(lnet_event_t *ev) +{ + struct ptlrpc_cb_id *cbid = ev->md.user_ptr; + struct ptlrpc_request *req = cbid->cbid_arg; + ENTRY; + + LASSERT (ev->type == LNET_EVENT_SEND || + ev->type == LNET_EVENT_UNLINK); + LASSERT (ev->unlinked); -ptl_handle_eq_t request_out_eq, reply_in_eq, reply_out_eq, bulk_source_eq, - bulk_sink_eq; -static const ptl_handle_ni_t *socknal_nip = NULL, *qswnal_nip = NULL; + DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req, + "type %d, status %d", ev->type, ev->status); + + if (ev->type == LNET_EVENT_UNLINK || ev->status != 0) { + + /* Failed send: make it seem like the reply timed out, just + * like failing sends in client.c does currently... 
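+                 * (Setting rq_net_err and waking the waiter below lets
+                 * the client-side state machine spot the failure and
+                 * resend or fail the request.)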
+                 */
+
+                spin_lock(&req->rq_lock);
+                req->rq_net_err = 1;
+                spin_unlock(&req->rq_lock);
+
+                ptlrpc_wake_client_req(req);
+        }
+
+        /* these balance the references in ptl_send_rpc() */
+        atomic_dec(&req->rq_import->imp_inflight);
+        ptlrpc_req_finished(req);
+
+        EXIT;
+}
 
 /*
- * Free the packet when it has gone out
+ * Client's incoming reply callback
  */
-static int request_out_callback(ptl_event_t *ev, void *data)
+void reply_in_callback(lnet_event_t *ev)
 {
+        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
+        struct ptlrpc_request *req = cbid->cbid_arg;
         ENTRY;
 
-        if (ev->type != PTL_EVENT_SENT) {
-                // XXX make sure we understand all events, including ACK's
-                CERROR("Unknown event %d\n", ev->type);
-                LBUG();
+        LASSERT (ev->type == LNET_EVENT_PUT ||
+                 ev->type == LNET_EVENT_UNLINK);
+        LASSERT (ev->unlinked);
+        LASSERT (ev->md.start == req->rq_repmsg);
+        LASSERT (ev->offset == 0);
+        LASSERT (ev->mlength <= req->rq_replen);
+
+        DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req,
+                  "type %d, status %d", ev->type, ev->status);
+
+        spin_lock(&req->rq_lock);
+
+        LASSERT (req->rq_receiving_reply);
+        req->rq_receiving_reply = 0;
+
+        if (ev->type == LNET_EVENT_PUT && ev->status == 0) {
+                req->rq_replied = 1;
+                req->rq_nob_received = ev->mlength;
         }
 
-        RETURN(1);
+        /* NB don't unlock till after wakeup; req can disappear under us
+         * since we don't have our own ref */
+        ptlrpc_wake_client_req(req);
+
+        spin_unlock(&req->rq_lock);
+        EXIT;
 }
 
+/*
+ * Client's bulk has been written/read
+ */
+void client_bulk_callback (lnet_event_t *ev)
+{
+        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
+        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
+        ENTRY;
 
-/*
- * Free the packet when it has gone out
+        LASSERT ((desc->bd_type == BULK_PUT_SINK &&
+                  ev->type == LNET_EVENT_PUT) ||
+                 (desc->bd_type == BULK_GET_SOURCE &&
+                  ev->type == LNET_EVENT_GET) ||
+                 ev->type == LNET_EVENT_UNLINK);
+        LASSERT (ev->unlinked);
+
+        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
+               "event type %d, status %d, desc %p\n",
+               ev->type, ev->status, desc);
+
+        spin_lock(&desc->bd_lock);
+
+        LASSERT(desc->bd_network_rw);
+        desc->bd_network_rw = 0;
+
+        if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) {
+                desc->bd_success = 1;
+                desc->bd_nob_transferred = ev->mlength;
+        }
+
+        /* NB don't unlock till after wakeup; desc can disappear under us
+         * otherwise */
+        ptlrpc_wake_client_req(desc->bd_req);
+
+        spin_unlock(&desc->bd_lock);
+        EXIT;
+}
+
+/*
+ * Server's incoming request callback
  */
-static int reply_out_callback(ptl_event_t *ev, void *data)
+void request_in_callback(lnet_event_t *ev)
 {
+        struct ptlrpc_cb_id               *cbid = ev->md.user_ptr;
+        struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;
+        struct ptlrpc_service             *service = rqbd->rqbd_service;
+        struct ptlrpc_request             *req;
         ENTRY;
 
-        if (ev->type == PTL_EVENT_SENT) {
-                OBD_FREE(ev->mem_desc.start, ev->mem_desc.length);
+        LASSERT (ev->type == LNET_EVENT_PUT ||
+                 ev->type == LNET_EVENT_UNLINK);
+        LASSERT ((char *)ev->md.start >= rqbd->rqbd_buffer);
+        LASSERT ((char *)ev->md.start + ev->offset + ev->mlength <=
+                 rqbd->rqbd_buffer + service->srv_buf_size);
+
+        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
+               "event type %d, status %d, service %s\n",
+               ev->type, ev->status, service->srv_name);
+
+        if (ev->unlinked) {
+                /* If this is the last request message to fit in the
+                 * request buffer we can use the request object embedded in
+                 * rqbd.  Note that if we failed to allocate a request,
+                 * we'd have to re-post the rqbd, which we can't do in this
+                 * context.
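+                 * (If the atomic allocation below fails, the RPC is
+                 * simply dropped; the sender times out and retries.)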
+                 */
+                req = &rqbd->rqbd_req;
+                memset(req, 0, sizeof (*req));
         } else {
-                // XXX make sure we understand all events, including ACK's
-                CERROR("Unknown event %d\n", ev->type);
-                LBUG();
+                LASSERT (ev->type == LNET_EVENT_PUT);
+                if (ev->status != 0) {
+                        /* We moaned above already... */
+                        return;
+                }
+                OBD_ALLOC_GFP(req, sizeof(*req), CFS_ALLOC_ATOMIC_TRY);
+                if (req == NULL) {
+                        CERROR("Can't allocate incoming request descriptor: "
+                               "Dropping %s RPC from %s\n",
+                               service->srv_name,
+                               libcfs_id2str(ev->initiator));
+                        return;
+                }
         }
 
-        RETURN(1);
+        /* NB we ABSOLUTELY RELY on req being zeroed, so pointers are NULL,
+         * flags are reset and scalars are zero.  We only set the message
+         * size to non-zero if this was a successful receive. */
+        req->rq_xid = ev->match_bits;
+        req->rq_reqmsg = ev->md.start + ev->offset;
+        if (ev->type == LNET_EVENT_PUT && ev->status == 0)
+                req->rq_reqlen = ev->mlength;
+        do_gettimeofday(&req->rq_arrival_time);
+        req->rq_peer = ev->initiator;
+        req->rq_self = ev->target.nid;
+        req->rq_rqbd = rqbd;
+        req->rq_phase = RQ_PHASE_NEW;
+#ifdef CRAY_XT3
+        req->rq_uid = ev->uid;
+#endif
+
+        spin_lock(&service->srv_lock);
+
+        req->rq_history_seq = service->srv_request_seq++;
+        list_add_tail(&req->rq_history_list, &service->srv_request_history);
+
+        if (ev->unlinked) {
+                service->srv_nrqbd_receiving--;
+                CDEBUG(D_RPCTRACE,"Buffer complete: %d buffers still posted\n",
+                       service->srv_nrqbd_receiving);
+
+                /* Normally, don't complain about 0 buffers posted; LNET won't
+                 * drop incoming reqs since we set the portal lazy */
+                if (test_req_buffer_pressure &&
+                    ev->type != LNET_EVENT_UNLINK &&
+                    service->srv_nrqbd_receiving == 0)
+                        CWARN("All %s request buffers busy\n",
+                              service->srv_name);
+
+                /* req takes over the network's ref on rqbd */
+        } else {
+                /* req takes a ref on rqbd */
+                rqbd->rqbd_refcount++;
+        }
+
+        list_add_tail(&req->rq_list, &service->srv_request_queue);
+        service->srv_n_queued_reqs++;
+
+        /* NB everything can disappear under us once the request
+         * has been queued and we unlock, so do the wake now... */
+        cfs_waitq_signal(&service->srv_waitq);
+
+        spin_unlock(&service->srv_lock);
+        EXIT;
 }
 
 /*
- * Wake up the thread waiting for the reply once it comes in.
+ * Server's outgoing reply callback
  */
-static int reply_in_callback(ptl_event_t *ev, void *data)
+void reply_out_callback(lnet_event_t *ev)
 {
-        struct ptlrpc_request *rpc = ev->mem_desc.user_ptr;
+        struct ptlrpc_cb_id       *cbid = ev->md.user_ptr;
+        struct ptlrpc_reply_state *rs = cbid->cbid_arg;
+        struct ptlrpc_service     *svc = rs->rs_service;
         ENTRY;
 
-        if (ev->type == PTL_EVENT_PUT) {
-                rpc->rq_repmsg = ev->mem_desc.start + ev->offset;
-                barrier();
-                wake_up_interruptible(&rpc->rq_wait_for_rep);
-        } else {
-                // XXX make sure we understand all events, including ACK's
-                CERROR("Unknown event %d\n", ev->type);
-                LBUG();
+        LASSERT (ev->type == LNET_EVENT_SEND ||
+                 ev->type == LNET_EVENT_ACK ||
+                 ev->type == LNET_EVENT_UNLINK);
+
+        if (!rs->rs_difficult) {
+                /* 'Easy' replies have no further processing so I drop the
+                 * net's ref on 'rs' */
+                LASSERT (ev->unlinked);
+                ptlrpc_rs_decref(rs);
+                atomic_dec (&svc->srv_outstanding_replies);
+                EXIT;
+                return;
         }
 
-        RETURN(1);
+        LASSERT (rs->rs_on_net);
+
+        if (ev->unlinked) {
+                /* Last network callback.
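+                 * (This can be the SEND, the ACK or an UNLINK,
+                 * whichever finally unlinks the MD.)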
+                 * The net's ref on 'rs' stays put until
+                 * ptlrpc_server_handle_reply() is done with it */
+                spin_lock(&svc->srv_lock);
+                rs->rs_on_net = 0;
+                ptlrpc_schedule_difficult_reply (rs);
+                spin_unlock(&svc->srv_lock);
+        }
+
+        EXIT;
 }
 
-int request_in_callback(ptl_event_t *ev, void *data)
+/*
+ * Server's bulk completion callback
+ */
+void server_bulk_callback (lnet_event_t *ev)
 {
-        struct ptlrpc_service *service = data;
-        int index;
+        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
+        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
+        ENTRY;
 
-        if (ev->rlength != ev->mlength)
-                CERROR("Warning: Possibly truncated rpc (%d/%d)\n",
-                       ev->mlength, ev->rlength);
+        LASSERT (ev->type == LNET_EVENT_SEND ||
+                 ev->type == LNET_EVENT_UNLINK ||
+                 (desc->bd_type == BULK_PUT_SOURCE &&
+                  ev->type == LNET_EVENT_ACK) ||
+                 (desc->bd_type == BULK_GET_SINK &&
+                  ev->type == LNET_EVENT_REPLY));
+
+        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
+               "event type %d, status %d, desc %p\n",
+               ev->type, ev->status, desc);
+
+        spin_lock(&desc->bd_lock);
+
+        if ((ev->type == LNET_EVENT_ACK ||
+             ev->type == LNET_EVENT_REPLY) &&
+            ev->status == 0) {
+                /* We heard back from the peer, so even if we get this
+                 * before the SENT event (oh yes we can), we know we
+                 * read/wrote the peer buffer and how much... */
+                desc->bd_success = 1;
+                desc->bd_nob_transferred = ev->mlength;
+        }
 
-        spin_lock(&service->srv_lock);
-        for (index = 0; index < service->srv_ring_length; index++)
-                if ( service->srv_buf[index] == ev->mem_desc.start)
-                        break;
+        if (ev->unlinked) {
+                /* This is the last callback no matter what... */
+                desc->bd_network_rw = 0;
+                cfs_waitq_signal(&desc->bd_waitq);
+        }
+
+        spin_unlock(&desc->bd_lock);
+        EXIT;
+}
 
-        if (index == service->srv_ring_length)
-                LBUG();
+static void ptlrpc_master_callback(lnet_event_t *ev)
+{
+        struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
+        void (*callback)(lnet_event_t *ev) = cbid->cbid_fn;
+
+        /* Honestly, it's best to find out early. */
+        LASSERT (cbid->cbid_arg != LP_POISON);
+        LASSERT (callback == request_out_callback ||
+                 callback == reply_in_callback ||
+                 callback == client_bulk_callback ||
+                 callback == request_in_callback ||
+                 callback == reply_out_callback ||
+                 callback == server_bulk_callback);
+
+        callback (ev);
+}
 
-        service->srv_ref_count[index]++;
+int ptlrpc_uuid_to_peer (struct obd_uuid *uuid,
+                         lnet_process_id_t *peer, lnet_nid_t *self)
+{
+        int                best_dist = 0;
+        int                best_order = 0;
+        int                count = 0;
+        int                rc = -ENOENT;
+        int                portals_compatibility;
+        int                dist;
+        int                order;
+        lnet_nid_t         dst_nid;
+        lnet_nid_t         src_nid;
+
+        portals_compatibility = LNetCtl(IOC_LIBCFS_PORTALS_COMPATIBILITY, NULL);
+
+        peer->pid = LUSTRE_SRV_LNET_PID;
+
+        /* Choose the matching UUID that's closest */
+        while (lustre_uuid_to_peer(uuid->uuid, &dst_nid, count++) == 0) {
+                dist = LNetDist(dst_nid, &src_nid, &order);
+                if (dist < 0)
+                        continue;
+
+                if (dist == 0) {                /* local! use loopback LND */
+                        peer->nid = *self = LNET_MKNID(LNET_MKNET(LOLND, 0), 0);
+                        rc = 0;
+                        break;
+                }
+
+                LASSERT (order >= 0);
+                if (rc < 0 ||
+                    dist < best_dist ||
+                    (dist == best_dist && order < best_order)) {
+                        best_dist = dist;
+                        best_order = order;
+
+                        if (portals_compatibility > 1) {
+                                /* Strong portals compatibility: Zero the nid's
+                                 * NET, so if I'm reading new config logs, or
+                                 * getting configured by (new) lconf I can
+                                 * still talk to old servers.
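+                                 * (LNET_MKNID(0, LNET_NIDADDR(nid)) keeps
+                                 * just the address bits and zeroes the net
+                                 * number.)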
+                                 */
+                                dst_nid = LNET_MKNID(0, LNET_NIDADDR(dst_nid));
+                                src_nid = LNET_MKNID(0, LNET_NIDADDR(src_nid));
+                        }
+                        peer->nid = dst_nid;
+                        *self = src_nid;
+                        rc = 0;
+                }
+        }
 
-        if (ptl_is_valid_handle(&ev->unlinked_me)) {
-                int idx;
+        CDEBUG(D_NET,"%s->%s\n", uuid->uuid, libcfs_id2str(*peer));
+        if (rc != 0)
+                CERROR("No NID found for %s\n", uuid->uuid);
+        return rc;
+}
 
-                for (idx = 0; idx < service->srv_ring_length; idx++)
-                        if (service->srv_me_h[idx].handle_idx ==
-                            ev->unlinked_me.handle_idx)
-                                break;
-                if (idx == service->srv_ring_length)
+void ptlrpc_ni_fini(void)
+{
+        cfs_waitq_t         waitq;
+        struct l_wait_info  lwi;
+        int                 rc;
+        int                 retries;
+
+        /* Wait for the event queue to become idle since there may still be
+         * messages in flight with pending events (i.e. the fire-and-forget
+         * messages == client requests and "non-difficult" server
+         * replies) */
+
+        for (retries = 0;; retries++) {
+                rc = LNetEQFree(ptlrpc_eq_h);
+                switch (rc) {
+                default:
                         LBUG();
-                CDEBUG(D_NET, "unlinked %d\n", idx);
-                ptl_set_inv_handle(&(service->srv_me_h[idx]));
+                case 0:
+                        LNetNIFini();
+                        return;
+
+                case -EBUSY:
+                        if (retries != 0)
+                                CWARN("Event queue still busy\n");
+
+                        /* Wait for a bit */
+                        cfs_waitq_init(&waitq);
+                        lwi = LWI_TIMEOUT(cfs_time_seconds(2), NULL, NULL);
+                        l_wait_event(waitq, 0, &lwi);
+                        break;
+                }
+        }
+        /* notreached */
+}
 
-                if (service->srv_ref_count[idx] == 0)
-                        ptlrpc_link_svc_me(service, idx);
+lnet_pid_t ptl_get_pid(void)
+{
+        lnet_pid_t        pid;
+
+#ifndef __KERNEL__
+        pid = getpid();
+#else
+        pid = LUSTRE_SRV_LNET_PID;
+#endif
+        return pid;
+}
+
+int ptlrpc_ni_init(void)
+{
+        int               rc;
+        lnet_pid_t        pid;
+
+        pid = ptl_get_pid();
+        CDEBUG(D_NET, "My pid is: %x\n", pid);
+
+        /* We're not passing any limits yet... */
+        rc = LNetNIInit(pid);
+        if (rc < 0) {
+                CDEBUG (D_NET, "Can't init network interface: %d\n", rc);
+                return (-ENOENT);
         }
 
-        spin_unlock(&service->srv_lock);
-        if (ev->type == PTL_EVENT_PUT)
-                wake_up(&service->srv_waitq);
-        else
-                CERROR("Unexpected event type: %d\n", ev->type);
+        /* CAVEAT EMPTOR: how we process portals events is _radically_
+         * different depending on... */
+#ifdef __KERNEL__
+        /* kernel portals calls our master callback when events are added to
+         * the event queue.  In fact lustre never pulls events off this queue,
+         * so it's only sized for some debug history. */
+        rc = LNetEQAlloc(1024, ptlrpc_master_callback, &ptlrpc_eq_h);
+#else
+        /* liblustre calls the master callback when it removes events from the
+         * event queue.
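+         * (Nothing is delivered asynchronously; liblustre_check_events()
+         * polls the queue and invokes the master callback by hand.)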
+         * The event queue has to be big enough not to drop
+         * anything */
+        rc = LNetEQAlloc(10240, LNET_EQ_HANDLER_NONE, &ptlrpc_eq_h);
+#endif
+        if (rc == 0)
+                return 0;
+
+        CERROR ("Failed to allocate event queue: %d\n", rc);
+        LNetNIFini();
+
+        return (-ENOMEM);
+}
+
+#ifndef __KERNEL__
+CFS_LIST_HEAD(liblustre_wait_callbacks);
+CFS_LIST_HEAD(liblustre_idle_callbacks);
+void *liblustre_services_callback;
 
-        return 0;
+void *
+liblustre_register_waitidle_callback (struct list_head *callback_list,
+                                      const char *name,
+                                      int (*fn)(void *arg), void *arg)
+{
+        struct liblustre_wait_callback *llwc;
+
+        OBD_ALLOC(llwc, sizeof(*llwc));
+        LASSERT (llwc != NULL);
+
+        llwc->llwc_name = name;
+        llwc->llwc_fn = fn;
+        llwc->llwc_arg = arg;
+        list_add_tail(&llwc->llwc_list, callback_list);
+
+        return (llwc);
 }
 
-static int bulk_source_callback(ptl_event_t *ev, void *data)
+void
+liblustre_deregister_waitidle_callback (void *opaque)
 {
-        struct ptlrpc_bulk_page *bulk = ev->mem_desc.user_ptr;
-        struct ptlrpc_bulk_desc *desc = bulk->b_desc;
-        ENTRY;
+        struct liblustre_wait_callback *llwc = opaque;
+
+        list_del(&llwc->llwc_list);
+        OBD_FREE(llwc, sizeof(*llwc));
+}
 
-        if (ev->type == PTL_EVENT_SENT) {
-                CDEBUG(D_NET, "got SENT event\n");
-        } else if (ev->type == PTL_EVENT_ACK) {
-                CDEBUG(D_NET, "got ACK event\n");
-                desc->b_flags |= PTL_BULK_FL_SENT;
-                wake_up_interruptible(&desc->b_waitq);
-        } else {
-                CERROR("Unexpected event type!\n");
-                LBUG();
-        }
+void *
+liblustre_register_wait_callback (const char *name,
+                                  int (*fn)(void *arg), void *arg)
+{
+        return liblustre_register_waitidle_callback(&liblustre_wait_callbacks,
+                                                    name, fn, arg);
+}
 
-        RETURN(1);
+void
+liblustre_deregister_wait_callback (void *opaque)
+{
+        liblustre_deregister_waitidle_callback(opaque);
 }
 
-static int bulk_sink_callback(ptl_event_t *ev, void *data)
+void *
+liblustre_register_idle_callback (const char *name,
+                                  int (*fn)(void *arg), void *arg)
 {
-        struct ptlrpc_bulk_page *bulk = ev->mem_desc.user_ptr;
-        struct ptlrpc_bulk_desc *desc = bulk->b_desc;
+        return liblustre_register_waitidle_callback(&liblustre_idle_callbacks,
+                                                    name, fn, arg);
+}
+
+void
+liblustre_deregister_idle_callback (void *opaque)
+{
+        liblustre_deregister_waitidle_callback(opaque);
+}
+
+int
+liblustre_check_events (int timeout)
+{
+        lnet_event_t ev;
+        int          rc;
+        int          i;
         ENTRY;
 
-        if (ev->type == PTL_EVENT_PUT) {
-                if (bulk->b_buf != ev->mem_desc.start + ev->offset)
-                        CERROR("bulkbuf != mem_desc -- why?\n");
-                desc->b_finished_count++;
-                if (desc->b_finished_count == desc->b_page_count) {
-                        desc->b_flags |= PTL_BULK_FL_RCVD;
-                        wake_up_interruptible(&desc->b_waitq);
-                        if (desc->b_cb != NULL)
-                                desc->b_cb(desc);
-                }
-                if (bulk->b_cb != NULL)
-                        bulk->b_cb(bulk);
-        } else {
-                CERROR("Unexpected event type!\n");
-                LBUG();
+        rc = LNetEQPoll(&ptlrpc_eq_h, 1, timeout * 1000, &ev, &i);
+        if (rc == 0)
+                RETURN(0);
+
+        LASSERT (rc == -EOVERFLOW || rc == 1);
+
+        /* liblustre: no asynch callback so we can't afford to miss any
+         * events... */
+        if (rc == -EOVERFLOW) {
+                CERROR ("Dropped an event!!!\n");
+                abort();
         }
-
+
+        ptlrpc_master_callback (&ev);
         RETURN(1);
 }
 
-int ptlrpc_init_portals(void)
+int liblustre_waiting = 0;
+
+int
+liblustre_wait_event (int timeout)
 {
-        int rc;
-        ptl_handle_ni_t ni;
+        struct list_head               *tmp;
+        struct liblustre_wait_callback *llwc;
+        int                             found_something = 0;
+
+        /* single threaded recursion check...
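+         * (liblustre_waiting lets callers detect that this event loop
+         * is already running above them and avoid re-entering it.)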
+         */
+        liblustre_waiting = 1;
+
+        for (;;) {
+                /* Deal with all pending events */
+                while (liblustre_check_events(0))
+                        found_something = 1;
+
+                /* Give all registered callbacks a bite at the cherry */
+                list_for_each(tmp, &liblustre_wait_callbacks) {
+                        llwc = list_entry(tmp, struct liblustre_wait_callback,
+                                          llwc_list);
+
+                        if (llwc->llwc_fn(llwc->llwc_arg))
+                                found_something = 1;
+                }
 
-        socknal_nip = inter_module_get_request("ksocknal_ni", "ksocknal");
-        qswnal_nip = inter_module_get_request("kqswnal_ni", "kqswnal");
-        if (socknal_nip == NULL && qswnal_nip == NULL) {
-                CERROR("get_ni failed: is a NAL module loaded?\n");
-                return -EIO;
+                if (found_something || timeout == 0)
+                        break;
+
+                /* Nothing so far, but I'm allowed to block... */
+                found_something = liblustre_check_events(timeout);
+                if (!found_something)           /* still nothing */
+                        break;                  /* I timed out */
         }
 
-        /* Use the qswnal if it's there */
-        if (qswnal_nip != NULL)
-                ni = *qswnal_nip;
-        else
-                ni = *socknal_nip;
+        liblustre_waiting = 0;
 
-        rc = PtlEQAlloc(ni, 128, request_out_callback, NULL, &request_out_eq);
-        if (rc != PTL_OK)
-                CERROR("PtlEQAlloc failed: %d\n", rc);
+        return found_something;
+}
 
-        rc = PtlEQAlloc(ni, 128, reply_out_callback, NULL, &reply_out_eq);
-        if (rc != PTL_OK)
-                CERROR("PtlEQAlloc failed: %d\n", rc);
+void
+liblustre_wait_idle(void)
+{
+        static int recursed = 0;
+
+        struct list_head               *tmp;
+        struct liblustre_wait_callback *llwc;
+        int                             idle = 0;
+
+        LASSERT(!recursed);
+        recursed = 1;
+
+        do {
+                liblustre_wait_event(0);
+
+                idle = 1;
+
+                list_for_each(tmp, &liblustre_idle_callbacks) {
+                        llwc = list_entry(tmp, struct liblustre_wait_callback,
+                                          llwc_list);
+
+                        if (!llwc->llwc_fn(llwc->llwc_arg)) {
+                                idle = 0;
+                                break;
+                        }
+                }
+
+        } while (!idle);
 
-        rc = PtlEQAlloc(ni, 128, reply_in_callback, NULL, &reply_in_eq);
-        if (rc != PTL_OK)
-                CERROR("PtlEQAlloc failed: %d\n", rc);
+        recursed = 0;
+}
 
-        rc = PtlEQAlloc(ni, 128, bulk_source_callback, NULL, &bulk_source_eq);
-        if (rc != PTL_OK)
-                CERROR("PtlEQAlloc failed: %d\n", rc);
+#endif /* !__KERNEL__ */
 
-        rc = PtlEQAlloc(ni, 128, bulk_sink_callback, NULL, &bulk_sink_eq);
-        if (rc != PTL_OK)
-                CERROR("PtlEQAlloc failed: %d\n", rc);
+int ptlrpc_init_portals(void)
+{
+        int   rc = ptlrpc_ni_init();
+
+        if (rc != 0) {
+                CERROR("network initialisation failed\n");
+                return -EIO;
+        }
+#ifndef __KERNEL__
+        liblustre_services_callback =
+                liblustre_register_wait_callback("liblustre_check_services",
+                                                 &liblustre_check_services,
+                                                 NULL);
+#endif
+        rc = ptlrpcd_addref();
+        if (rc == 0)
+                return 0;
+
+        CERROR("rpcd initialisation failed\n");
+#ifndef __KERNEL__
+        liblustre_deregister_wait_callback(liblustre_services_callback);
+#endif
+        ptlrpc_ni_fini();
         return rc;
 }
 
 void ptlrpc_exit_portals(void)
 {
-        PtlEQFree(request_out_eq);
-        PtlEQFree(reply_out_eq);
-        PtlEQFree(reply_in_eq);
-        PtlEQFree(bulk_source_eq);
-        PtlEQFree(bulk_sink_eq);
-
-        if (qswnal_nip != NULL)
-                inter_module_put("kqswnal_ni");
-        if (socknal_nip != NULL)
-                inter_module_put("ksocknal_ni");
+#ifndef __KERNEL__
+        liblustre_deregister_wait_callback(liblustre_services_callback);
+#endif
+        ptlrpcd_decref();
+        ptlrpc_ni_fini();
 }
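
A note on the shape of this patch: every LNet memory descriptor created by
ptlrpc now stashes a struct ptlrpc_cb_id (a callback function pointer plus
its argument) in md.user_ptr, so the single ptlrpc_master_callback
registered at LNetEQAlloc() time can dispatch any event to the right
handler.  A minimal, self-contained sketch of that dispatch technique
follows; my_event_t and the handler names are hypothetical stand-ins for
lnet_event_t and the real callbacks, not part of the patch.

#include <assert.h>
#include <stdio.h>

typedef struct my_event {
        void *user_ptr;         /* copied from the MD when the event fires */
} my_event_t;

struct cb_id {
        void (*cbid_fn)(my_event_t *ev);  /* event-specific handler */
        void  *cbid_arg;                  /* request/bulk desc/rqbd... */
};

static void my_reply_in_callback(my_event_t *ev)
{
        struct cb_id *cbid = ev->user_ptr;

        printf("reply event for %s\n", (const char *)cbid->cbid_arg);
}

/* The one handler registered with the network: recover the cb_id stashed
 * in user_ptr and jump through it, exactly as ptlrpc_master_callback does
 * (minus its LASSERT whitelist of known callbacks). */
static void master_callback(my_event_t *ev)
{
        struct cb_id *cbid = ev->user_ptr;

        assert(cbid != NULL && cbid->cbid_fn != NULL);
        cbid->cbid_fn(ev);
}

int main(void)
{
        struct cb_id cbid = { my_reply_in_callback, "req#1" };
        my_event_t   ev = { &cbid };

        master_callback(&ev);   /* prints "reply event for req#1" */
        return 0;
}

The LASSERT whitelist and LP_POISON check in the real ptlrpc_master_callback
buy early detection of a stale or corrupted cbid, at the cost of having to
list every legal callback in one place.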