X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fptlrpc%2Fclient.c;h=59f4cc5935d36a387a25480d89729fed9d3480eb;hp=0c62264e17a86f385952f38773d192549410cc55;hb=bb75072cb679bf52e00537c19e42f8e4e95255b6;hpb=39160ccc60eb88c4a665d5d129abceacd2860de2

diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index 0c62264..59f4cc5 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -128,19 +128,21 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned nfrags, unsigned max_brw,
 		(ptlrpc_is_bulk_desc_kvec(type) &&
 		 ops->add_iov_frag != NULL));
 
+	OBD_ALLOC_PTR(desc);
+	if (desc == NULL)
+		return NULL;
 	if (type & PTLRPC_BULK_BUF_KIOV) {
-		OBD_ALLOC(desc,
-			  offsetof(struct ptlrpc_bulk_desc,
-				   bd_u.bd_kiov.bd_vec[nfrags]));
+		OBD_ALLOC_LARGE(GET_KIOV(desc),
+				nfrags * sizeof(*GET_KIOV(desc)));
+		if (GET_KIOV(desc) == NULL)
+			goto out;
 	} else {
-		OBD_ALLOC(desc,
-			  offsetof(struct ptlrpc_bulk_desc,
-				   bd_u.bd_kvec.bd_kvec[nfrags]));
+		OBD_ALLOC_LARGE(GET_KVEC(desc),
+				nfrags * sizeof(*GET_KVEC(desc)));
+		if (GET_KVEC(desc) == NULL)
+			goto out;
 	}
 
-	if (!desc)
-		return NULL;
-
 	spin_lock_init(&desc->bd_lock);
 	init_waitqueue_head(&desc->bd_waitq);
 	desc->bd_max_iov = nfrags;
@@ -157,6 +159,9 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned nfrags, unsigned max_brw,
 		LNetInvalidateHandle(&desc->bd_mds[i]);
 
 	return desc;
+out:
+	OBD_FREE_PTR(desc);
+	return NULL;
 }
 
 /**
@@ -207,7 +212,7 @@ void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
 	LASSERT(page != NULL);
 	LASSERT(pageoffset >= 0);
 	LASSERT(len > 0);
-	LASSERT(pageoffset + len <= PAGE_CACHE_SIZE);
+	LASSERT(pageoffset + len <= PAGE_SIZE);
 	LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
 
 	kiov = &BD_GET_KIOV(desc, desc->bd_iov_count);
@@ -271,13 +276,12 @@ void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
 		desc->bd_frag_ops->release_frags(desc);
 
 	if (ptlrpc_is_bulk_desc_kiov(desc->bd_type))
-		OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
-					bd_u.bd_kiov.bd_vec[desc->bd_max_iov]));
+		OBD_FREE_LARGE(GET_KIOV(desc),
+			       desc->bd_max_iov * sizeof(*GET_KIOV(desc)));
 	else
-		OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
-					bd_u.bd_kvec.bd_kvec[desc->
-							     bd_max_iov]));
-
+		OBD_FREE_LARGE(GET_KVEC(desc),
+			       desc->bd_max_iov * sizeof(*GET_KVEC(desc)));
+	OBD_FREE_PTR(desc);
 	EXIT;
 }
 EXPORT_SYMBOL(ptlrpc_free_bulk);
@@ -553,10 +557,10 @@ int ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
 		if (!msg) {
 			ptlrpc_request_cache_free(req);
 			return i;
-                }
-                req->rq_reqbuf = msg;
-                req->rq_reqbuf_len = size;
-                req->rq_pool = pool;
+		}
+		req->rq_reqbuf = msg;
+		req->rq_reqbuf_len = size;
+		req->rq_pool = pool;
 		spin_lock(&pool->prp_lock);
 		list_add_tail(&req->rq_list, &pool->prp_req_list);
 	}
@@ -727,6 +731,8 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
 	request->rq_reply_cbid.cbid_arg = request;
 
 	request->rq_reply_deadline = 0;
+	request->rq_bulk_deadline = 0;
+	request->rq_req_deadline = 0;
 	request->rq_phase = RQ_PHASE_NEW;
 	request->rq_next_phase = RQ_PHASE_UNDEFINED;
 
@@ -738,6 +744,35 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
 	lustre_msg_set_opc(request->rq_reqmsg, opcode);
 	ptlrpc_assign_next_xid(request);
 
+	/* Let's setup deadline for req/reply/bulk unlink for opcode. */
+	if (cfs_fail_val == opcode) {
+		time_t *fail_t = NULL, *fail2_t = NULL;
+
+		if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK))
+			fail_t = &request->rq_bulk_deadline;
+		else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK))
+			fail_t = &request->rq_reply_deadline;
+		else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REQ_UNLINK))
+			fail_t = &request->rq_req_deadline;
+		else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BOTH_UNLINK)) {
+			fail_t = &request->rq_reply_deadline;
+			fail2_t = &request->rq_bulk_deadline;
+		}
+
+		if (fail_t) {
+			*fail_t = cfs_time_current_sec() + LONG_UNLINK;
+
+			if (fail2_t)
+				*fail2_t = cfs_time_current_sec() + LONG_UNLINK;
+
+			/* The RPC is infected, let the test to change the
+			 * fail_loc */
+			set_current_state(TASK_UNINTERRUPTIBLE);
+			schedule_timeout(cfs_time_seconds(2));
+			set_current_state(TASK_RUNNING);
+		}
+	}
+
 	RETURN(0);
 
 out_ctx:
@@ -1048,6 +1083,9 @@ void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
 {
 	LASSERT(list_empty(&req->rq_set_chain));
 
+	if (req->rq_allow_intr)
+		set->set_allow_intr = 1;
+
 	/* The set takes over the caller's request reference */
 	list_add_tail(&req->rq_set_chain, &set->set_requests);
 	req->rq_set = set;
@@ -1256,7 +1294,7 @@ static void ptlrpc_save_versions(struct ptlrpc_request *req)
 
 	LASSERT(versions);
 	lustre_msg_set_versions(reqmsg, versions);
-	CDEBUG(D_INFO, "Client save versions ["LPX64"/"LPX64"]\n",
+	CDEBUG(D_INFO, "Client save versions [%#llx/%#llx]\n",
 	       versions[0], versions[1]);
 
 	EXIT;
@@ -1272,7 +1310,7 @@ __u64 ptlrpc_known_replied_xid(struct obd_import *imp)
 
 	req = list_entry(imp->imp_unreplied_list.next,
 			 struct ptlrpc_request, rq_unreplied_list);
-	LASSERTF(req->rq_xid >= 1, "XID:"LPU64"\n", req->rq_xid);
+	LASSERTF(req->rq_xid >= 1, "XID:%llu\n", req->rq_xid);
 
 	if (imp->imp_known_replied_xid < req->rq_xid - 1)
 		imp->imp_known_replied_xid = req->rq_xid - 1;
@@ -1575,7 +1613,7 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
 	}
 
 	CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:nid:opc"
-	       " %s:%s:%d:"LPU64":%s:%d\n", current_comm(),
+	       " %s:%s:%d:%llu:%s:%d\n", current_comm(),
 	       imp->imp_obd->obd_uuid.uuid,
 	       lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
 	       libcfs_nid2str(imp->imp_connection->c_peer.nid),
@@ -1651,8 +1689,14 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 				   rq_set_chain);
 		struct obd_import *imp = req->rq_import;
 		int unregistered = 0;
+		int async = 1;
 		int rc = 0;
 
+		if (req->rq_phase == RQ_PHASE_COMPLETE) {
+			list_move_tail(&req->rq_set_chain, &comp_reqs);
+			continue;
+		}
+
 		/* This schedule point is mainly for the ptlrpcd caller of this
 		 * function. Most ptlrpc sets are not long-lived and unbounded
 		 * in length, but at the least the set used by the ptlrpcd is.
@@ -1669,16 +1713,18 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 			req->rq_status = -EINTR;
 			ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
 
+			/* Since it is interpreted and we have to wait for
+			 * the reply to be unlinked, then use sync mode. */
+			async = 0;
+
 			GOTO(interpret, req->rq_status);
 		}
 
-		if (req->rq_phase == RQ_PHASE_NEW &&
-		    ptlrpc_send_new_req(req)) {
-			force_timer_recalc = 1;
-		}
+		if (req->rq_phase == RQ_PHASE_NEW && ptlrpc_send_new_req(req))
+			force_timer_recalc = 1;
 
-                /* delayed send - skip */
-                if (req->rq_phase == RQ_PHASE_NEW && req->rq_sent)
+		/* delayed send - skip */
+		if (req->rq_phase == RQ_PHASE_NEW && req->rq_sent)
 			continue;
 
 		/* delayed resend - skip */
@@ -1686,29 +1732,43 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 		    req->rq_sent > cfs_time_current_sec())
 			continue;
 
-                if (!(req->rq_phase == RQ_PHASE_RPC ||
-                      req->rq_phase == RQ_PHASE_BULK ||
-                      req->rq_phase == RQ_PHASE_INTERPRET ||
-                      req->rq_phase == RQ_PHASE_UNREGISTERING ||
-                      req->rq_phase == RQ_PHASE_COMPLETE)) {
-                        DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
-                        LBUG();
-                }
+		if (!(req->rq_phase == RQ_PHASE_RPC ||
+		      req->rq_phase == RQ_PHASE_BULK ||
+		      req->rq_phase == RQ_PHASE_INTERPRET ||
+		      req->rq_phase == RQ_PHASE_UNREG_RPC ||
+		      req->rq_phase == RQ_PHASE_UNREG_BULK)) {
+			DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
+			LBUG();
+		}
 
-                if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
-                        LASSERT(req->rq_next_phase != req->rq_phase);
-                        LASSERT(req->rq_next_phase != RQ_PHASE_UNDEFINED);
+		if (req->rq_phase == RQ_PHASE_UNREG_RPC ||
+		    req->rq_phase == RQ_PHASE_UNREG_BULK) {
+			LASSERT(req->rq_next_phase != req->rq_phase);
+			LASSERT(req->rq_next_phase != RQ_PHASE_UNDEFINED);
+
+			if (req->rq_req_deadline &&
+			    !OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REQ_UNLINK))
+				req->rq_req_deadline = 0;
+			if (req->rq_reply_deadline &&
+			    !OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK))
+				req->rq_reply_deadline = 0;
+			if (req->rq_bulk_deadline &&
+			    !OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK))
+				req->rq_bulk_deadline = 0;
 
-                        /*
-                         * Skip processing until reply is unlinked. We
-                         * can't return to pool before that and we can't
-                         * call interpret before that. We need to make
-                         * sure that all rdma transfers finished and will
-                         * not corrupt any data.
-                         */
-                        if (ptlrpc_client_recv_or_unlink(req) ||
-                            ptlrpc_client_bulk_active(req))
-                                continue;
+			/*
+			 * Skip processing until reply is unlinked. We
+			 * can't return to pool before that and we can't
+			 * call interpret before that. We need to make
+			 * sure that all rdma transfers finished and will
+			 * not corrupt any data.
+			 */
+			if (req->rq_phase == RQ_PHASE_UNREG_RPC &&
+			    ptlrpc_client_recv_or_unlink(req))
+				continue;
+			if (req->rq_phase == RQ_PHASE_UNREG_BULK &&
+			    ptlrpc_client_bulk_active(req))
+				continue;
 
 			/*
 			 * Turn fail_loc off to prevent it from looping
@@ -1730,11 +1790,6 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 			ptlrpc_rqphase_move(req, req->rq_next_phase);
 		}
 
-		if (req->rq_phase == RQ_PHASE_COMPLETE) {
-			list_move_tail(&req->rq_set_chain, &comp_reqs);
-			continue;
-		}
-
 		if (req->rq_phase == RQ_PHASE_INTERPRET)
 			GOTO(interpret, req->rq_status);
 
@@ -1951,27 +2006,27 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 
 				req->rq_status = -EIO;
 			}
 
-                        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
+			ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
 
-        interpret:
-                LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
+	interpret:
+		LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
 
-                /* This moves to "unregistering" phase we need to wait for
-                 * reply unlink. */
-                if (!unregistered && !ptlrpc_unregister_reply(req, 1)) {
-                        /* start async bulk unlink too */
-                        ptlrpc_unregister_bulk(req, 1);
-                        continue;
-                }
+		/* This moves to "unregistering" phase we need to wait for
+		 * reply unlink. */
+		if (!unregistered && !ptlrpc_unregister_reply(req, async)) {
+			/* start async bulk unlink too */
+			ptlrpc_unregister_bulk(req, 1);
+			continue;
+		}
 
-                if (!ptlrpc_unregister_bulk(req, 1))
-                        continue;
+		if (!ptlrpc_unregister_bulk(req, async))
+			continue;
 
-                /* When calling interpret receiving already should be
-                 * finished. */
-                LASSERT(!req->rq_receiving_reply);
+		/* When calling interpret receiving already should be
+		 * finished. */
+		LASSERT(!req->rq_receiving_reply);
 
-                ptlrpc_req_interpret(env, req, req->rq_status);
+		ptlrpc_req_interpret(env, req, req->rq_status);
 
 		if (ptlrpcd_check_work(req)) {
 			atomic_dec(&set->set_remaining);
@@ -1981,7 +2036,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 
 		CDEBUG(req->rq_reqmsg != NULL ? D_RPCTRACE : 0,
 		       "Completed RPC pname:cluuid:pid:xid:nid:"
-		       "opc %s:%s:%d:"LPU64":%s:%d\n",
+		       "opc %s:%s:%d:%llu:%s:%d\n",
 		       current_comm(), imp->imp_obd->obd_uuid.uuid,
 		       lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
 		       libcfs_nid2str(imp->imp_connection->c_peer.nid),
@@ -2180,8 +2235,11 @@ static void ptlrpc_interrupted_set(void *data)
 		struct ptlrpc_request *req =
 			list_entry(tmp, struct ptlrpc_request, rq_set_chain);
 
+		if (req->rq_intr)
+			continue;
+
 		if (req->rq_phase != RQ_PHASE_RPC &&
-		    req->rq_phase != RQ_PHASE_UNREGISTERING &&
+		    req->rq_phase != RQ_PHASE_UNREG_RPC &&
 		    !req->rq_allow_intr)
 			continue;
 
@@ -2274,17 +2332,12 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
 		CDEBUG(D_RPCTRACE, "set %p going to sleep for %d seconds\n",
 		       set, timeout);
 
-		if (timeout == 0 && !signal_pending(current))
-			/*
-			 * No requests are in-flight (ether timed out
-			 * or delayed), so we can allow interrupts.
-			 * We still want to block for a limited time,
-			 * so we allow interrupts during the timeout.
-			 */
-			lwi = LWI_TIMEOUT_INTR_ALL(cfs_time_seconds(1),
-						   ptlrpc_expired_set,
-						   ptlrpc_interrupted_set, set);
-		else if (set->set_allow_intr)
+		if ((timeout == 0 && !signal_pending(current)) ||
+		    set->set_allow_intr)
+			/* No requests are in-flight (ether timed out
+			 * or delayed), so we can allow interrupts.
+			 * We still want to block for a limited time,
+			 * so we allow interrupts during the timeout. */
 			lwi = LWI_TIMEOUT_INTR_ALL(
 					cfs_time_seconds(timeout ? timeout : 1),
 					ptlrpc_expired_set,
@@ -2512,12 +2565,11 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
 	 */
 	LASSERT(!in_interrupt());
 
-	/*
-	 * Let's setup deadline for reply unlink.
-	 */
-	if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
-	    async && request->rq_reply_deadline == 0)
-		request->rq_reply_deadline = cfs_time_current_sec()+LONG_UNLINK;
+	/* Let's setup deadline for reply unlink. */
+	if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
+	    async && request->rq_reply_deadline == 0 && cfs_fail_val == 0)
+		request->rq_reply_deadline =
+			cfs_time_current_sec() + LONG_UNLINK;
 
 	/*
 	 * Nothing left to do.
@@ -2533,10 +2585,8 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
 	if (!ptlrpc_client_recv_or_unlink(request))
 		RETURN(1);
 
-	/*
-	 * Move to "Unregistering" phase as reply was not unlinked yet.
-	 */
-	ptlrpc_rqphase_move(request, RQ_PHASE_UNREGISTERING);
+	/* Move to "Unregistering" phase as reply was not unlinked yet. */
+	ptlrpc_rqphase_move(request, RQ_PHASE_UNREG_RPC);
 
 	/*
 	 * Do not wait for unlink to finish.
@@ -2628,11 +2678,11 @@ void ptlrpc_free_committed(struct obd_import *imp)
 
 	if (imp->imp_peer_committed_transno == imp->imp_last_transno_checked &&
 	    imp->imp_generation == imp->imp_last_generation_checked) {
-		CDEBUG(D_INFO, "%s: skip recheck: last_committed "LPU64"\n",
+		CDEBUG(D_INFO, "%s: skip recheck: last_committed %llu\n",
 		       imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
 		RETURN_EXIT;
 	}
 
-	CDEBUG(D_RPCTRACE, "%s: committing for last_committed "LPU64" gen %d\n",
+	CDEBUG(D_RPCTRACE, "%s: committing for last_committed %llu gen %d\n",
 	       imp->imp_obd->obd_name, imp->imp_peer_committed_transno,
 	       imp->imp_generation);
@@ -2671,7 +2721,7 @@ void ptlrpc_free_committed(struct obd_import *imp)
 			continue;
 		}
 
-		DEBUG_REQ(D_INFO, req, "commit (last_committed "LPU64")",
+		DEBUG_REQ(D_INFO, req, "commit (last_committed %llu)",
 			  imp->imp_peer_committed_transno);
 free_req:
 		ptlrpc_free_request(req);
@@ -2834,9 +2884,6 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
 		RETURN(-ENOMEM);
 	}
 
-	if (req->rq_allow_intr)
-		set->set_allow_intr = 1;
-
 	/* for distributed debugging */
 	lustre_msg_set_status(req->rq_reqmsg, current_pid());
 
@@ -2894,7 +2941,7 @@ static int ptlrpc_replay_interpret(const struct lu_env *env,
 		LASSERTF(lustre_msg_get_transno(req->rq_reqmsg) ==
 			 lustre_msg_get_transno(req->rq_repmsg) ||
 			 lustre_msg_get_transno(req->rq_repmsg) == 0,
-			 LPX64"/"LPX64"\n",
+			 "%#llx/%#llx\n",
 			 lustre_msg_get_transno(req->rq_reqmsg),
 			 lustre_msg_get_transno(req->rq_repmsg));
 	}
@@ -2910,8 +2957,8 @@ static int ptlrpc_replay_interpret(const struct lu_env *env,
 	/* transaction number shouldn't be bigger than the latest replayed */
 	if (req->rq_transno > lustre_msg_get_transno(req->rq_reqmsg)) {
 		DEBUG_REQ(D_ERROR, req,
-			  "Reported transno "LPU64" is bigger than the "
-			  "replayed one: "LPU64, req->rq_transno,
+			  "Reported transno %llu is bigger than the "
+			  "replayed one: %llu", req->rq_transno,
 			  lustre_msg_get_transno(req->rq_reqmsg));
 		GOTO(out, rc = -EINVAL);
 	}
@@ -3202,13 +3249,16 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
 
 	} else {
 		/* needs to generate a new matchbits for resend */
 		__u64 old_mbits = req->rq_mbits;
 
-		if ((bd->bd_import->imp_connect_data.ocd_connect_flags &
-		    OBD_CONNECT_BULK_MBITS) != 0)
+		if (OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, BULK_MBITS)){
 			req->rq_mbits = ptlrpc_next_xid();
-		else /* old version transfers rq_xid to peer as matchbits */
-			req->rq_mbits = req->rq_xid = ptlrpc_next_xid();
-
-		CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
+		} else {/* old version transfers rq_xid to peer as matchbits */
+			spin_lock(&req->rq_import->imp_lock);
+			list_del_init(&req->rq_unreplied_list);
+			ptlrpc_assign_next_xid_nolock(req);
+			req->rq_mbits = req->rq_xid;
+			spin_unlock(&req->rq_import->imp_lock);
+		}
+		CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n",
 		       old_mbits, req->rq_mbits);
@@ -3217,6 +3267,11 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
 
 	 * see LU-1431 */
 	req->rq_mbits += ((bd->bd_iov_count + LNET_MAX_IOV - 1) /
			  LNET_MAX_IOV) - 1;
+
+	/* Set rq_xid as rq_mbits to indicate the final bulk for the old
+	 * server which does not support OBD_CONNECT_BULK_MBITS. LU-6808 */
+	if (!OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, BULK_MBITS))
+		req->rq_xid = req->rq_mbits;
 }
 
@@ -3269,7 +3324,6 @@ static void ptlrpcd_add_work_req(struct ptlrpc_request *req)
 	req->rq_timeout = obd_timeout;
 	req->rq_sent = cfs_time_current_sec();
 	req->rq_deadline = req->rq_sent + req->rq_timeout;
-	req->rq_reply_deadline = req->rq_deadline;
 	req->rq_phase = RQ_PHASE_INTERPRET;
 	req->rq_next_phase = RQ_PHASE_COMPLETE;
 	req->rq_xid = ptlrpc_next_xid();