X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fptlrpc%2Fclient.c;h=a41cd32edec11c7cf98ced6209196cd13ec9345a;hb=a643e3860f5397f904cc3cc937134c3ac841d7db;hp=2e1d16163ff051a9dc66d8b81fd79c16f5fded44;hpb=18d78c77953017e5a76cd10bc74a0d078217a626;p=fs%2Flustre-release.git

diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index 2e1d161..a41cd32 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
@@ -54,6 +52,8 @@
 
 #include "ptlrpc_internal.h"
 
+static int ptlrpc_send_new_req(struct ptlrpc_request *req);
+
 /**
  * Initialize passed in client structure \a cl.
  */
@@ -64,6 +64,7 @@ void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
         cl->cli_reply_portal = rep_portal;
         cl->cli_name = name;
 }
+EXPORT_SYMBOL(ptlrpc_init_client);
 
 /**
  * Return PortalRPC connection for remore uud \a uuid
@@ -91,6 +92,7 @@ struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
 
         return c;
 }
+EXPORT_SYMBOL(ptlrpc_uuid_to_connection);
 
 /**
  * Allocate and initialize new bulk descriptor
@@ -146,14 +148,15 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
 
         return desc;
 }
+EXPORT_SYMBOL(ptlrpc_prep_bulk_imp);
 
 /**
  * Add a page \a page to the bulk descriptor \a desc.
  * Data to transfer in the page starts at offset \a pageoffset and
  * amount of data to transfer from the page is \a len
  */
-void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
-                           cfs_page_t *page, int pageoffset, int len)
+void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
+                             cfs_page_t *page, int pageoffset, int len, int pin)
 {
         LASSERT(desc->bd_iov_count < desc->bd_max_iov);
         LASSERT(page != NULL);
@@ -163,15 +166,18 @@ void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
 
         desc->bd_nob += len;
 
-        cfs_page_pin(page);
+        if (pin)
+                cfs_page_pin(page);
+
         ptlrpc_add_bulk_page(desc, page, pageoffset, len);
 }
+EXPORT_SYMBOL(__ptlrpc_prep_bulk_page);
 
 /**
  * Uninitialize and free bulk descriptor \a desc.
  * Works on bulk descriptors both from server and client side.
  */
-void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
+void __ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc, int unpin)
 {
         int i;
         ENTRY;
@@ -188,13 +194,16 @@ void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
         else
                 class_import_put(desc->bd_import);
 
-        for (i = 0; i < desc->bd_iov_count ; i++)
-                cfs_page_unpin(desc->bd_iov[i].kiov_page);
+        if (unpin) {
+                for (i = 0; i < desc->bd_iov_count ; i++)
+                        cfs_page_unpin(desc->bd_iov[i].kiov_page);
+        }
 
         OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
                                 bd_iov[desc->bd_max_iov]));
         EXIT;
 }
+EXPORT_SYMBOL(__ptlrpc_free_bulk);
 
 /**
  * Set server timelimit for this req, i.e. how long are we willing to wait
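The bulk-descriptor hunks above generalize the old entry points rather than replace them: __ptlrpc_prep_bulk_page() and __ptlrpc_free_bulk() take an explicit pin/unpin flag so callers that do not own the pages (for example server-side or zero-copy paths) can skip cfs_page_pin()/cfs_page_unpin(). The old pin-by-default behaviour is presumably kept through thin wrappers; a minimal sketch of one plausible form follows. The wrapper names and their header location are assumptions for illustration, not part of this diff.

/* Hypothetical compatibility wrappers (would normally live in a header such
 * as lustre_net.h).  They pin/unpin by default, matching the old behaviour
 * of ptlrpc_prep_bulk_page() and ptlrpc_free_bulk(). */
static inline void ptlrpc_prep_bulk_page_pin(struct ptlrpc_bulk_desc *desc,
                                             cfs_page_t *page,
                                             int pageoffset, int len)
{
        __ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 1);
}

static inline void ptlrpc_free_bulk_pin(struct ptlrpc_bulk_desc *desc)
{
        __ptlrpc_free_bulk(desc, 1);
}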
@@ -233,6 +242,7 @@ void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req)
            reqmsg*/
         lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
 }
+EXPORT_SYMBOL(ptlrpc_at_set_req_timeout);
 
 /* Adjust max service estimate based on server value */
 static void ptlrpc_at_adj_service(struct ptlrpc_request *req,
@@ -387,6 +397,7 @@ void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool)
         cfs_spin_unlock(&pool->prp_lock);
         OBD_FREE(pool, sizeof(*pool));
 }
+EXPORT_SYMBOL(ptlrpc_free_rq_pool);
 
 /**
  * Allocates, initializes and adds \a num_rq requests to the pool \a pool
@@ -428,6 +439,7 @@ void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
         cfs_spin_unlock(&pool->prp_lock);
         return;
 }
+EXPORT_SYMBOL(ptlrpc_add_rqs_to_pool);
 
 /**
  * Create and initialize new request pool with given attributes:
@@ -464,6 +476,7 @@ ptlrpc_init_rq_pool(int num_rq, int msgsize,
         }
         return pool;
 }
+EXPORT_SYMBOL(ptlrpc_init_rq_pool);
 
 /**
  * Fetches one request from pool \a pool
@@ -608,8 +621,32 @@ EXPORT_SYMBOL(ptlrpc_request_bufs_pack);
 int ptlrpc_request_pack(struct ptlrpc_request *request,
                         __u32 version, int opcode)
 {
-        return ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL);
-}
+        int rc;
+        rc = ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL);
+        if (rc)
+                return rc;
+
+        /* For some old 1.8 clients (< 1.8.7), they will LASSERT the size of
+         * ptlrpc_body sent from server equal to local ptlrpc_body size, so we
+         * have to send old ptlrpc_body to keep interoprability with these
+         * clients.
+         *
+         * Only three kinds of server->client RPCs so far:
+         *  - LDLM_BL_CALLBACK
+         *  - LDLM_CP_CALLBACK
+         *  - LDLM_GL_CALLBACK
+         *
+         * XXX This should be removed whenever we drop the interoprability with
+         * the these old clients.
+         */
+        if (opcode == LDLM_BL_CALLBACK || opcode == LDLM_CP_CALLBACK ||
+            opcode == LDLM_GL_CALLBACK)
+                req_capsule_shrink(&request->rq_pill, &RMF_PTLRPC_BODY,
+                                   sizeof(struct ptlrpc_body_v2), RCL_CLIENT);
+
+        return rc;
+}
+EXPORT_SYMBOL(ptlrpc_request_pack);
 
 /**
  * Helper function to allocate new request on import \a imp
@@ -675,6 +712,7 @@ struct ptlrpc_request *ptlrpc_request_alloc(struct obd_import *imp,
 {
         return ptlrpc_request_alloc_internal(imp, NULL, format);
 }
+EXPORT_SYMBOL(ptlrpc_request_alloc);
 
 /**
  * Allocate new request structure for import \a imp from pool \a pool and
@@ -686,6 +724,7 @@ struct ptlrpc_request *ptlrpc_request_alloc_pool(struct obd_import *imp,
 {
         return ptlrpc_request_alloc_internal(imp, pool, format);
 }
+EXPORT_SYMBOL(ptlrpc_request_alloc_pool);
 
 /**
  * For requests not from pool, free memory of the request structure.
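With ptlrpc_request_alloc(), ptlrpc_request_pack() and the pool helpers above now exported, a separate module can drive the allocate/pack/send/free cycle itself. A minimal sketch of that calling pattern follows; it assumes the stock Lustre client API (RQF_OBD_PING, LUSTRE_OBD_VERSION, OBD_PING and ptlrpc_request_set_replen() are standard names used purely for illustration and are not part of this diff).

/* Illustrative only: send a synchronous OBD_PING over import "imp".
 * Error handling is abbreviated; a real caller would also tune timeouts,
 * resend policy, reply interpreters, etc. */
static int example_ping(struct obd_import *imp)
{
        struct ptlrpc_request *req;
        int rc;

        req = ptlrpc_request_alloc(imp, &RQF_OBD_PING);
        if (req == NULL)
                return -ENOMEM;

        rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, OBD_PING);
        if (rc) {
                /* not packed yet: free the structure, do not "finish" it */
                ptlrpc_request_free(req);
                return rc;
        }

        ptlrpc_request_set_replen(req);  /* reply size from the capsule */

        rc = ptlrpc_queue_wait(req);     /* send and wait for the reply */
        ptlrpc_req_finished(req);        /* drop our reference */
        return rc;
}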
@@ -698,6 +737,7 @@ void ptlrpc_request_free(struct ptlrpc_request *request)
         else
                 OBD_FREE_PTR(request);
 }
+EXPORT_SYMBOL(ptlrpc_request_free);
 
 /**
  * Allocate new request for operatione \a opcode and immediatelly pack it for
@@ -722,6 +762,7 @@ struct ptlrpc_request *ptlrpc_request_alloc_pack(struct obd_import *imp,
         }
         return req;
 }
+EXPORT_SYMBOL(ptlrpc_request_alloc_pack);
 
 /**
  * Prepare request (fetched from pool \a poolif not NULL) on import \a imp
@@ -751,6 +792,7 @@ ptlrpc_prep_req_pool(struct obd_import *imp,
         }
         return request;
 }
+EXPORT_SYMBOL(ptlrpc_prep_req_pool);
 
 /**
  * Same as ptlrpc_prep_req_pool, but without pool
@@ -762,6 +804,7 @@ ptlrpc_prep_req(struct obd_import *imp, __u32 version, int opcode, int count,
         return ptlrpc_prep_req_pool(imp, version, opcode, count, lengths,
                                     bufs, NULL);
 }
+EXPORT_SYMBOL(ptlrpc_prep_req);
 
 /**
  * Allocate "fake" request that would not be sent anywhere in the end.
@@ -816,6 +859,7 @@ struct ptlrpc_request *ptlrpc_prep_fakereq(struct obd_import *imp,
 
         RETURN(request);
 }
+EXPORT_SYMBOL(ptlrpc_prep_fakereq);
 
 /**
  * Indicate that processing of "fake" request is finished.
@@ -841,6 +885,7 @@ void ptlrpc_fakereq_finished(struct ptlrpc_request *req)
                 cfs_waitq_signal(&set->set_waitq);
         ptlrpc_req_finished(req);
 }
+EXPORT_SYMBOL(ptlrpc_fakereq_finished);
 
 /**
  * Allocate and initialize new request set structure.
@@ -848,23 +893,54 @@ void ptlrpc_fakereq_finished(struct ptlrpc_request *req)
  */
 struct ptlrpc_request_set *ptlrpc_prep_set(void)
 {
-        struct ptlrpc_request_set *set;
+        struct ptlrpc_request_set *set;
+
+        ENTRY;
+        OBD_ALLOC(set, sizeof *set);
+        if (!set)
+                RETURN(NULL);
+        cfs_atomic_set(&set->set_refcount, 1);
+        CFS_INIT_LIST_HEAD(&set->set_requests);
+        cfs_waitq_init(&set->set_waitq);
+        cfs_atomic_set(&set->set_new_count, 0);
+        cfs_atomic_set(&set->set_remaining, 0);
+        cfs_spin_lock_init(&set->set_new_req_lock);
+        CFS_INIT_LIST_HEAD(&set->set_new_requests);
+        CFS_INIT_LIST_HEAD(&set->set_cblist);
+        set->set_max_inflight = UINT_MAX;
+        set->set_producer = NULL;
+        set->set_producer_arg = NULL;
+        set->set_rc = 0;
+
+        RETURN(set);
+}
+EXPORT_SYMBOL(ptlrpc_prep_set);
-        ENTRY;
-        OBD_ALLOC(set, sizeof *set);
-        if (!set)
-                RETURN(NULL);
-        cfs_atomic_set(&set->set_refcount, 1);
-        CFS_INIT_LIST_HEAD(&set->set_requests);
-        cfs_waitq_init(&set->set_waitq);
-        cfs_atomic_set(&set->set_new_count, 0);
-        cfs_atomic_set(&set->set_remaining, 0);
-        cfs_spin_lock_init(&set->set_new_req_lock);
-        CFS_INIT_LIST_HEAD(&set->set_new_requests);
-        CFS_INIT_LIST_HEAD(&set->set_cblist);
+/**
+ * Allocate and initialize new request set structure with flow control
+ * extension. This extension allows to control the number of requests in-flight
+ * for the whole set. A callback function to generate requests must be provided
+ * and the request set will keep the number of requests sent over the wire to
+ * @max_inflight.
+ * Returns a pointer to the newly allocated set structure or NULL on error.
+ */
+struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func,
+                                             void *arg)
+
+{
+        struct ptlrpc_request_set *set;
+
+        set = ptlrpc_prep_set();
+        if (!set)
+                RETURN(NULL);
-        RETURN(set);
+        set->set_max_inflight = max;
+        set->set_producer = func;
+        set->set_producer_arg = arg;
+
+        RETURN(set);
 }
+EXPORT_SYMBOL(ptlrpc_prep_fcset);
 
 /**
  * Wind down and free request set structure previously allocated with
@@ -924,6 +1000,7 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
         ptlrpc_reqset_put(set);
         EXIT;
 }
+EXPORT_SYMBOL(ptlrpc_set_destroy);
 
 /**
  * Add a callback function \a fn to the set.
@@ -945,6 +1022,7 @@ int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
 
         RETURN(0);
 }
+EXPORT_SYMBOL(ptlrpc_set_add_cb);
 
 /**
  * Add a new request to the general purpose request set.
@@ -953,14 +1031,23 @@ int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
                         struct ptlrpc_request *req)
 {
-        LASSERT(cfs_list_empty(&req->rq_set_chain));
+        LASSERT(cfs_list_empty(&req->rq_set_chain));
 
-        /* The set takes over the caller's request reference */
-        cfs_list_add_tail(&req->rq_set_chain, &set->set_requests);
-        req->rq_set = set;
-        cfs_atomic_inc(&set->set_remaining);
-        req->rq_queued_time = cfs_time_current();
+        /* The set takes over the caller's request reference */
+        cfs_list_add_tail(&req->rq_set_chain, &set->set_requests);
+        req->rq_set = set;
+        cfs_atomic_inc(&set->set_remaining);
+        req->rq_queued_time = cfs_time_current();
+
+        if (req->rq_reqmsg != NULL)
+                lustre_msg_set_jobid(req->rq_reqmsg, NULL);
+
+        if (set->set_producer != NULL)
+                /* If the request set has a producer callback, the RPC must be
+                 * sent straight away */
+                ptlrpc_send_new_req(req);
 }
+EXPORT_SYMBOL(ptlrpc_set_add_req);
 
 /**
  * Add a request to a request with dedicated server thread
@@ -997,6 +1084,7 @@ void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
                 cfs_waitq_signal(&pc->pc_partners[i]->pc_set->set_waitq);
         }
 }
+EXPORT_SYMBOL(ptlrpc_set_add_new_req);
 
 /**
  * Based on the current state of the import, determine if the request
@@ -1022,7 +1110,6 @@ static int ptlrpc_import_delay_req(struct obd_import *imp,
         } else if (imp->imp_state == LUSTRE_IMP_NEW) {
                 DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
                 *status = -EIO;
-                LBUG();
         } else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
                 DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
                 *status = -EIO;
@@ -1108,24 +1195,6 @@ static int ptlrpc_check_status(struct ptlrpc_request *req)
         if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
                 struct obd_import *imp = req->rq_import;
                 __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
-                LCONSOLE_ERROR_MSG(0x011,"an error occurred while communicating"
-                                   " with %s. The %s operation failed with %d\n",
-                                   libcfs_nid2str(imp->imp_connection->c_peer.nid),
-                                   ll_opcode2str(opc), err);
-                RETURN(err < 0 ? err : -EINVAL);
-        }
-
-        if (err < 0) {
-                DEBUG_REQ(D_INFO, req, "status is %d", err);
-        } else if (err > 0) {
-                /* XXX: translate this error from net to host */
-                DEBUG_REQ(D_INFO, req, "status is %d", err);
-        }
-
-        if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
-                struct obd_import *imp = req->rq_import;
-                __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
-
                 if (ptlrpc_console_allow(req))
                         LCONSOLE_ERROR_MSG(0x011,"an error occurred while "
                                            "communicating with %s. The %s "
@@ -1133,10 +1202,16 @@ static int ptlrpc_check_status(struct ptlrpc_request *req)
                                            libcfs_nid2str(
                                            imp->imp_connection->c_peer.nid),
                                            ll_opcode2str(opc), err);
-                RETURN(err < 0 ? err : -EINVAL);
         }
 
+        if (err < 0) {
+                DEBUG_REQ(D_INFO, req, "status is %d", err);
+        } else if (err > 0) {
+                /* XXX: translate this error from net to host */
+                DEBUG_REQ(D_INFO, req, "status is %d", err);
+        }
+
         RETURN(err);
 }
 
@@ -1212,6 +1287,27 @@ static int after_reply(struct ptlrpc_request *req)
                 RETURN(rc);
         }
 
+        /* retry indefinitely on EINPROGRESS */
+        if (lustre_msg_get_status(req->rq_repmsg) == -EINPROGRESS &&
+            ptlrpc_no_resend(req) == 0 && !req->rq_no_retry_einprogress) {
+                time_t now = cfs_time_current_sec();
+
+                DEBUG_REQ(D_RPCTRACE, req, "Resending request on EINPROGRESS");
+                req->rq_resend = 1;
+                req->rq_nr_resend++;
+
+                /* Readjust the timeout for current conditions */
+                ptlrpc_at_set_req_timeout(req);
+                /* delay resend to give a chance to the server to get ready.
+                 * The delay is increased by 1s on every resend and is capped to
+                 * the current request timeout (i.e. obd_timeout if AT is off,
+                 * or AT service time x 125% + 5s, see at_est2timeout) */
+                if (req->rq_nr_resend > req->rq_timeout)
+                        req->rq_sent = now + req->rq_timeout;
+                else
+                        req->rq_sent = now + req->rq_nr_resend;
+        }
+
         /*
          * Security layer unwrap might ask resend this request.
          */
@@ -1317,23 +1413,25 @@ static int after_reply(struct ptlrpc_request *req)
  * Helper function to send request \a req over the network for the first time
  * Also adjusts request phase.
  * Returns 0 on success or error code.
- */
+ */
 static int ptlrpc_send_new_req(struct ptlrpc_request *req)
 {
-        struct obd_import *imp;
+        struct obd_import *imp = req->rq_import;
         int rc;
         ENTRY;
 
         LASSERT(req->rq_phase == RQ_PHASE_NEW);
-        if (req->rq_sent && (req->rq_sent > cfs_time_current_sec()))
+        if (req->rq_sent && (req->rq_sent > cfs_time_current_sec()) &&
+            (!req->rq_generation_set ||
+             req->rq_import_generation == imp->imp_generation))
                 RETURN (0);
 
         ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
 
-        imp = req->rq_import;
         cfs_spin_lock(&imp->imp_lock);
 
-        req->rq_import_generation = imp->imp_generation;
+        if (!req->rq_generation_set)
+                req->rq_import_generation = imp->imp_generation;
 
         if (ptlrpc_import_delay_req(imp, req, &rc)) {
                 cfs_spin_lock(&req->rq_lock);
@@ -1392,6 +1490,30 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
         RETURN(0);
 }
 
+static inline int ptlrpc_set_producer(struct ptlrpc_request_set *set)
+{
+        int remaining, rc;
+        ENTRY;
+
+        LASSERT(set->set_producer != NULL);
+
+        remaining = cfs_atomic_read(&set->set_remaining);
+
+        /* populate the ->set_requests list with requests until we
+         * reach the maximum number of RPCs in flight for this set */
+        while (cfs_atomic_read(&set->set_remaining) < set->set_max_inflight) {
+                rc = set->set_producer(set, set->set_producer_arg);
+                if (rc == -ENOENT) {
+                        /* no more RPC to produce */
+                        set->set_producer = NULL;
+                        set->set_producer_arg = NULL;
+                        RETURN(0);
+                }
+        }
+
+        RETURN((cfs_atomic_read(&set->set_remaining) - remaining));
+}
+
 /**
  * this sends any unsent RPCs in \a set and returns 1 if all are sent
  * and no more replies are expected.
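ptlrpc_set_producer() above is the refill helper for the flow-controlled sets created by ptlrpc_prep_fcset(): it keeps invoking the producer callback until set_max_inflight requests are queued, and retires the producer once it returns -ENOENT. A sketch of a producer-driven caller is shown below; the work-queue structure and the example_* helpers are invented for illustration, while the ptlrpc_prep_fcset()/ptlrpc_set_wait() calls and the -ENOENT convention come from this patch.

/* Hypothetical per-caller state, handed to the producer via set_producer_arg. */
struct example_producer_args {
        cfs_list_t         epa_pending;  /* work items not yet turned into RPCs */
        struct obd_import *epa_imp;
};

/* Called repeatedly by ptlrpc_set_producer() until it returns -ENOENT. */
static int example_producer(struct ptlrpc_request_set *set, void *arg)
{
        struct example_producer_args *epa = arg;
        struct ptlrpc_request *req;

        if (cfs_list_empty(&epa->epa_pending))
                return -ENOENT;                 /* nothing left to produce */

        req = example_build_next_request(epa);  /* hypothetical helper */
        if (req == NULL)
                return -ENOENT;                 /* stop producing on failure */

        /* The set takes over the reference; with a producer attached,
         * ptlrpc_set_add_req() sends the RPC straight away. */
        ptlrpc_set_add_req(set, req);
        return 0;
}

static int example_send_all(struct example_producer_args *epa)
{
        struct ptlrpc_request_set *set;
        int rc;

        set = ptlrpc_prep_fcset(8 /* max in flight */, example_producer, epa);
        if (set == NULL)
                return -ENOMEM;

        rc = ptlrpc_set_wait(set);      /* primes the producer, then waits */
        ptlrpc_set_destroy(set);
        return rc;
}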
@@ -1400,14 +1522,14 @@
  */
 int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 {
-        cfs_list_t *tmp;
+        cfs_list_t *tmp, *next;
         int force_timer_recalc = 0;
         ENTRY;
 
         if (cfs_atomic_read(&set->set_remaining) == 0)
                 RETURN(1);
 
-        cfs_list_for_each(tmp, &set->set_requests) {
+        cfs_list_for_each_safe(tmp, next, &set->set_requests) {
                 struct ptlrpc_request *req =
                         cfs_list_entry(tmp, struct ptlrpc_request,
                                        rq_set_chain);
@@ -1422,7 +1544,12 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 
                 /* delayed send - skip */
                 if (req->rq_phase == RQ_PHASE_NEW && req->rq_sent)
-                        continue;
+                        continue;
+
+                /* delayed resend - skip */
+                if (req->rq_phase == RQ_PHASE_RPC && req->rq_resend &&
+                    req->rq_sent > cfs_time_current_sec())
+                        continue;
 
                 if (!(req->rq_phase == RQ_PHASE_RPC ||
                       req->rq_phase == RQ_PHASE_BULK ||
@@ -1709,13 +1836,13 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 
                 ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
 
-                CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:nid:"
-                       "opc %s:%s:%d:"LPU64":%s:%d\n", cfs_curproc_comm(),
-                       imp->imp_obd->obd_uuid.uuid,
-                       req->rq_reqmsg ? lustre_msg_get_status(req->rq_reqmsg):-1,
-                       req->rq_xid,
-                       libcfs_nid2str(imp->imp_connection->c_peer.nid),
-                       req->rq_reqmsg ? lustre_msg_get_opc(req->rq_reqmsg) : -1);
+                CDEBUG(req->rq_reqmsg != NULL ? D_RPCTRACE : 0,
+                       "Completed RPC pname:cluuid:pid:xid:nid:"
+                       "opc %s:%s:%d:"LPU64":%s:%d\n",
+                       cfs_curproc_comm(), imp->imp_obd->obd_uuid.uuid,
+                       lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
+                       libcfs_nid2str(imp->imp_connection->c_peer.nid),
+                       lustre_msg_get_opc(req->rq_reqmsg));
 
                 cfs_spin_lock(&imp->imp_lock);
                 /* Request already may be not on sending or delaying list. This
@@ -1730,11 +1857,31 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 
                 cfs_atomic_dec(&set->set_remaining);
                 cfs_waitq_broadcast(&imp->imp_recovery_waitq);
+
+                if (set->set_producer) {
+                        /* produce a new request if possible */
+                        if (ptlrpc_set_producer(set) > 0)
+                                force_timer_recalc = 1;
+
+                        /* free the request that has just been completed
+                         * in order not to pollute set->set_requests */
+                        cfs_list_del_init(&req->rq_set_chain);
+                        cfs_spin_lock(&req->rq_lock);
+                        req->rq_set = NULL;
+                        req->rq_invalid_rqset = 0;
+                        cfs_spin_unlock(&req->rq_lock);
+
+                        /* record rq_status to compute the final status later */
+                        if (req->rq_status != 0)
+                                set->set_rc = req->rq_status;
+                        ptlrpc_req_finished(req);
+                }
         }
 
         /* If we hit an error, we want to recover promptly. */
         RETURN(cfs_atomic_read(&set->set_remaining) == 0 || force_timer_recalc);
 }
+EXPORT_SYMBOL(ptlrpc_check_set);
 
 /**
  * Time out request \a req. is \a async_unlink is set, that means do not wait
@@ -1859,6 +2006,7 @@ int ptlrpc_expired_set(void *data)
          */
         RETURN(1);
 }
+EXPORT_SYMBOL(ptlrpc_expired_set);
 
 /**
  * Sets rq_intr flag in \a req under spinlock.
@@ -1869,6 +2017,7 @@ void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
         req->rq_intr = 1;
         cfs_spin_unlock(&req->rq_lock);
 }
+EXPORT_SYMBOL(ptlrpc_mark_interrupted);
 
 /**
  * Interrupts (sets interrupted flag) all uncompleted requests in
@@ -1894,6 +2043,7 @@ void ptlrpc_interrupted_set(void *data)
                         ptlrpc_mark_interrupted(req);
         }
 }
+EXPORT_SYMBOL(ptlrpc_interrupted_set);
 
 /**
  * Get the smallest timeout in the set; this does NOT set a timeout.
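The switch from cfs_list_for_each() to cfs_list_for_each_safe() in ptlrpc_check_set() is required because the producer branch now unlinks and frees completed requests (cfs_list_del_init() plus ptlrpc_req_finished()) while the loop is still walking set_requests. A generic illustration of that delete-while-iterating pattern follows, using only the list primitives already present in this file; example_is_done() is a hypothetical stand-in for the real completion checks.

/* Sketch only: reap finished requests from a set without corrupting the
 * traversal.  The _safe variant caches the next pointer before the loop
 * body runs, so freeing the current entry is harmless. */
static void example_reap_done(struct ptlrpc_request_set *set)
{
        cfs_list_t *tmp, *next;

        cfs_list_for_each_safe(tmp, next, &set->set_requests) {
                struct ptlrpc_request *req =
                        cfs_list_entry(tmp, struct ptlrpc_request,
                                       rq_set_chain);

                if (!example_is_done(req))      /* hypothetical check */
                        continue;

                cfs_list_del_init(&req->rq_set_chain);  /* unlink first... */
                ptlrpc_req_finished(req);               /* ...then drop the ref */
                /* "tmp" is stale now, but "next" was saved before the body
                 * ran, so the iteration continues safely. */
        }
}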
@@ -1934,6 +2084,8 @@ int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
 
                 if (req->rq_phase == RQ_PHASE_NEW)
                         deadline = req->rq_sent;
+                else if (req->rq_phase == RQ_PHASE_RPC && req->rq_resend)
+                        deadline = req->rq_sent;
                 else
                         deadline = req->rq_sent + req->rq_timeout;
 
@@ -1944,6 +2096,7 @@ int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
         }
         RETURN(timeout);
 }
+EXPORT_SYMBOL(ptlrpc_set_next_timeout);
 
 /**
  * Send all unset request from the set and then wait untill all
@@ -1959,15 +2112,19 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
         int rc, timeout;
         ENTRY;
 
+        if (set->set_producer)
+                (void)ptlrpc_set_producer(set);
+        else
+                cfs_list_for_each(tmp, &set->set_requests) {
+                        req = cfs_list_entry(tmp, struct ptlrpc_request,
+                                             rq_set_chain);
+                        if (req->rq_phase == RQ_PHASE_NEW)
+                                (void)ptlrpc_send_new_req(req);
+                }
+
         if (cfs_list_empty(&set->set_requests))
                 RETURN(0);
 
-        cfs_list_for_each(tmp, &set->set_requests) {
-                req = cfs_list_entry(tmp, struct ptlrpc_request, rq_set_chain);
-                if (req->rq_phase == RQ_PHASE_NEW)
-                        (void)ptlrpc_send_new_req(req);
-        }
-
         do {
                 timeout = ptlrpc_set_next_timeout(set);
 
@@ -2011,7 +2168,7 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
                          * reentrant from userspace again */
                         if (cfs_signal_pending())
                                 ptlrpc_interrupted_set(set);
-                        cfs_block_sigs(blocked_sigs);
+                        cfs_restore_sigs(blocked_sigs);
                 }
 
                 LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
@@ -2036,7 +2193,7 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
 
         LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
 
-        rc = 0;
+        rc = set->set_rc; /* rq_status of already freed requests if any */
         cfs_list_for_each(tmp, &set->set_requests) {
                 req = cfs_list_entry(tmp, struct ptlrpc_request, rq_set_chain);
 
@@ -2065,6 +2222,7 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
 
         RETURN(rc);
 }
+EXPORT_SYMBOL(ptlrpc_set_wait);
 
 /**
  * Helper fuction for request freeing.
@@ -2144,6 +2302,7 @@ void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
         LASSERT_SPIN_LOCKED(&request->rq_import->imp_lock);
         (void)__ptlrpc_req_finished(request, 1);
 }
+EXPORT_SYMBOL(ptlrpc_req_finished_with_imp_lock);
 
 /**
  * Helper function
@@ -2182,6 +2341,7 @@ void ptlrpc_req_finished(struct ptlrpc_request *request)
 {
         __ptlrpc_req_finished(request, 0);
 }
+EXPORT_SYMBOL(ptlrpc_req_finished);
 
 /**
  * Returns xid of a \a request
@@ -2271,6 +2431,7 @@ int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
         }
         RETURN(0);
 }
+EXPORT_SYMBOL(ptlrpc_unregister_reply);
 
 /**
  * Iterates through replay_list on import and prunes
@@ -2355,6 +2516,7 @@ void ptlrpc_cleanup_client(struct obd_import *imp)
         EXIT;
         return;
 }
+EXPORT_SYMBOL(ptlrpc_cleanup_client);
 
 /**
  * Schedule previously sent request for resend.
@@ -2383,6 +2545,7 @@ void ptlrpc_resend_req(struct ptlrpc_request *req)
         ptlrpc_client_wake_req(req);
         cfs_spin_unlock(&req->rq_lock);
 }
+EXPORT_SYMBOL(ptlrpc_resend_req);
 
 /* XXX: this function and rq_status are currently unused */
 void ptlrpc_restart_req(struct ptlrpc_request *req)
@@ -2396,6 +2559,7 @@ void ptlrpc_restart_req(struct ptlrpc_request *req)
         ptlrpc_client_wake_req(req);
         cfs_spin_unlock(&req->rq_lock);
 }
+EXPORT_SYMBOL(ptlrpc_restart_req);
 
 /**
  * Grab additional reference on a request \a req
@@ -2406,6 +2570,7 @@ struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
         cfs_atomic_inc(&req->rq_refcount);
         RETURN(req);
 }
+EXPORT_SYMBOL(ptlrpc_request_addref);
 
 /**
  * Add a request to import replay_list.
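The cfs_block_sigs() to cfs_restore_sigs() change in ptlrpc_set_wait() fixes the exit path: the mask that was saved when signals were blocked earlier in the function (not visible in this hunk) must be restored on the way out, not blocked again. The same save/restore pairing is illustrated below with plain POSIX calls; this is a userspace analogy of the pattern, not Lustre code.

#include <signal.h>

/* Run a blocking operation with SIGINT/SIGTERM held off, then restore the
 * caller's original mask, analogous to the block/restore pairing used in
 * ptlrpc_set_wait().  Illustration only. */
static int run_uninterruptible(int (*op)(void *), void *arg)
{
        sigset_t blocked, saved;
        int rc;

        sigemptyset(&blocked);
        sigaddset(&blocked, SIGINT);
        sigaddset(&blocked, SIGTERM);

        /* block and remember the previous mask */
        sigprocmask(SIG_BLOCK, &blocked, &saved);

        rc = op(arg);

        /* restore the saved mask; blocking the same set again (the old
         * behaviour) would leave the signals masked for the caller */
        sigprocmask(SIG_SETMASK, &saved, NULL);
        return rc;
}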
@@ -2462,6 +2627,7 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
 
         cfs_list_add(&req->rq_replay_list, &imp->imp_replay_list);
 }
+EXPORT_SYMBOL(ptlrpc_retain_replayable_request);
 
 /**
  * Send request and wait until it completes.
@@ -2493,6 +2659,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
 
         RETURN(rc);
 }
+EXPORT_SYMBOL(ptlrpc_queue_wait);
 
 struct ptlrpc_replay_async_args {
         int praa_old_state;
@@ -2635,6 +2802,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
         RETURN(0);
 }
+EXPORT_SYMBOL(ptlrpc_replay_req);
 
 /**
  * Aborts all in-flight request on import \a imp sending and delayed lists
@@ -2663,7 +2831,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
                 cfs_spin_lock (&req->rq_lock);
                 if (req->rq_import_generation < imp->imp_generation) {
                         req->rq_err = 1;
-                        req->rq_status = -EINTR;
+                        req->rq_status = -EIO;
                         ptlrpc_client_wake_req(req);
                 }
                 cfs_spin_unlock (&req->rq_lock);
@@ -2678,7 +2846,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
                 cfs_spin_lock (&req->rq_lock);
                 if (req->rq_import_generation < imp->imp_generation) {
                         req->rq_err = 1;
-                        req->rq_status = -EINTR;
+                        req->rq_status = -EIO;
                         ptlrpc_client_wake_req(req);
                 }
                 cfs_spin_unlock (&req->rq_lock);
@@ -2693,6 +2861,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
 
         EXIT;
 }
+EXPORT_SYMBOL(ptlrpc_abort_inflight);
 
 /**
  * Abort all uncompleted requests in request set \a set
@@ -2765,6 +2934,7 @@ __u64 ptlrpc_next_xid(void)
         cfs_spin_unlock(&ptlrpc_last_xid_lock);
         return tmp;
 }
+EXPORT_SYMBOL(ptlrpc_next_xid);
 
 /**
  * Get a glimpse at what next xid value might have been.