X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fptlrpc%2Fclient.c;h=04fe73473d7812362186292fcc5a4df9bb982e48;hp=7ee532494db2d9556f59ce5b840768ebfb24c017;hb=f7f31f8f969f410cca0b4b8b02f81391148e01f2;hpb=babf0232273467b7199ec9a7c36047b1968913df diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 7ee5324..04fe734 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -27,7 +27,6 @@ */ /* * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. */ /** Implementation of client-side PortalRPC interfaces */ @@ -406,10 +405,10 @@ void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req, * resent time, but server sent back service time of original * RPC. */ - CDEBUG((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) ? - D_ADAPTTO : D_WARNING, - "Reported service time %u > total measured time %lld\n", - service_timeout, now - req->rq_sent); + CDEBUG_LIMIT((lustre_msg_get_flags(req->rq_reqmsg) & + MSG_RESENT) ? D_ADAPTTO : D_WARNING, + "Reported service time %u > total measured time %lld\n", + service_timeout, now - req->rq_sent); return; } @@ -551,14 +550,14 @@ void ptlrpc_request_cache_free(struct ptlrpc_request *req) */ void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool) { - struct list_head *l, *tmp; struct ptlrpc_request *req; LASSERT(pool != NULL); spin_lock(&pool->prp_lock); - list_for_each_safe(l, tmp, &pool->prp_req_list) { - req = list_entry(l, struct ptlrpc_request, rq_list); + while ((req = list_first_entry_or_null(&pool->prp_req_list, + struct ptlrpc_request, + rq_list))) { list_del(&req->rq_list); LASSERT(req->rq_reqbuf); LASSERT(req->rq_reqbuf_len == pool->prp_rq_size); @@ -668,8 +667,8 @@ ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool) return NULL; } - request = list_entry(pool->prp_req_list.next, struct ptlrpc_request, - rq_list); + request = list_first_entry(&pool->prp_req_list, struct ptlrpc_request, + rq_list); list_del_init(&request->rq_list); spin_unlock(&pool->prp_lock); @@ -702,17 +701,14 @@ static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request) void ptlrpc_add_unreplied(struct ptlrpc_request *req) { struct obd_import *imp = req->rq_import; - struct list_head *tmp; struct ptlrpc_request *iter; assert_spin_locked(&imp->imp_lock); LASSERT(list_empty(&req->rq_unreplied_list)); /* unreplied list is sorted by xid in ascending order */ - list_for_each_prev(tmp, &imp->imp_unreplied_list) { - iter = list_entry(tmp, struct ptlrpc_request, - rq_unreplied_list); - + list_for_each_entry_reverse(iter, &imp->imp_unreplied_list, + rq_unreplied_list) { LASSERT(req->rq_xid != iter->rq_xid); if (req->rq_xid < iter->rq_xid) continue; @@ -1106,8 +1102,7 @@ struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func, */ void ptlrpc_set_destroy(struct ptlrpc_request_set *set) { - struct list_head *tmp; - struct list_head *next; + struct ptlrpc_request *req; int expected_phase; int n = 0; @@ -1116,11 +1111,7 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set) /* Requests on the set should either all be completed, or all be new */ expected_phase = (atomic_read(&set->set_remaining) == 0) ? 
RQ_PHASE_COMPLETE : RQ_PHASE_NEW; - list_for_each(tmp, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, - rq_set_chain); - + list_for_each_entry(req, &set->set_requests, rq_set_chain) { LASSERT(req->rq_phase == expected_phase); n++; } @@ -1129,10 +1120,9 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set) atomic_read(&set->set_remaining) == n, "%d / %d\n", atomic_read(&set->set_remaining), n); - list_for_each_safe(tmp, next, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, - rq_set_chain); + while ((req = list_first_entry_or_null(&set->set_requests, + struct ptlrpc_request, + rq_set_chain))) { list_del_init(&req->rq_set_chain); LASSERT(req->rq_phase == expected_phase); @@ -1291,7 +1281,7 @@ static int ptlrpc_import_delay_req(struct obd_import *imp, } else if (req->rq_no_delay && imp->imp_generation != imp->imp_initiated_at) { /* ignore nodelay for requests initiating connections */ - *status = -EWOULDBLOCK; + *status = -EAGAIN; } else if (req->rq_allow_replay && (imp->imp_state == LUSTRE_IMP_REPLAY || imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS || @@ -1409,8 +1399,8 @@ __u64 ptlrpc_known_replied_xid(struct obd_import *imp) if (list_empty(&imp->imp_unreplied_list)) return 0; - req = list_entry(imp->imp_unreplied_list.next, struct ptlrpc_request, - rq_unreplied_list); + req = list_first_entry(&imp->imp_unreplied_list, struct ptlrpc_request, + rq_unreplied_list); LASSERTF(req->rq_xid >= 1, "XID:%llu\n", req->rq_xid); if (imp->imp_known_replied_xid < req->rq_xid - 1) @@ -1706,7 +1696,22 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req) lustre_msg_set_status(req->rq_reqmsg, current->pid); - rc = sptlrpc_req_refresh_ctx(req, 0); + /* If the request to be sent is an LDLM callback, do not try to + * refresh context. + * An LDLM callback is sent by a server to a client in order to make + * it release a lock, on a communication channel that uses a reverse + * context. It cannot be refreshed on its own, as it is the 'reverse' + * (server-side) representation of a client context. + * We do not care if the reverse context is expired, and want to send + * the LDLM callback anyway. Once the client receives the AST, it is + * its job to refresh its own context if it has expired, hence + * refreshing the associated reverse context on server side, before + * being able to send the LDLM_CANCEL requested by the server. 
+ */ + if (lustre_msg_get_opc(req->rq_reqmsg) != LDLM_BL_CALLBACK && + lustre_msg_get_opc(req->rq_reqmsg) != LDLM_CP_CALLBACK && + lustre_msg_get_opc(req->rq_reqmsg) != LDLM_GL_CALLBACK) + rc = sptlrpc_req_refresh_ctx(req, 0); if (rc) { if (req->rq_err) { req->rq_status = rc; @@ -1786,7 +1791,7 @@ static inline int ptlrpc_set_producer(struct ptlrpc_request_set *set) */ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) { - struct list_head *tmp, *next; + struct ptlrpc_request *req, *next; LIST_HEAD(comp_reqs); int force_timer_recalc = 0; @@ -1794,10 +1799,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) if (atomic_read(&set->set_remaining) == 0) RETURN(1); - list_for_each_safe(tmp, next, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, - rq_set_chain); + list_for_each_entry_safe(req, next, &set->set_requests, + rq_set_chain) { struct obd_import *imp = req->rq_import; int unregistered = 0; int async = 1; @@ -2154,7 +2157,10 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) * was good after getting the REPLY for her GET or * the ACK for her PUT. */ - DEBUG_REQ(D_ERROR, req, "bulk transfer failed"); + DEBUG_REQ(D_ERROR, req, "bulk transfer failed %d/%d/%d", + req->rq_status, + req->rq_bulk->bd_nob, + req->rq_bulk->bd_nob_transferred); req->rq_status = -EIO; } @@ -2338,7 +2344,7 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink) */ void ptlrpc_expired_set(struct ptlrpc_request_set *set) { - struct list_head *tmp; + struct ptlrpc_request *req; time64_t now = ktime_get_real_seconds(); ENTRY; @@ -2347,11 +2353,7 @@ void ptlrpc_expired_set(struct ptlrpc_request_set *set) /* * A timeout expired. See which reqs it applies to... */ - list_for_each(tmp, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, - rq_set_chain); - + list_for_each_entry(req, &set->set_requests, rq_set_chain) { /* don't expire request waiting for context */ if (req->rq_wait_ctx) continue; @@ -2387,15 +2389,12 @@ void ptlrpc_expired_set(struct ptlrpc_request_set *set) */ static void ptlrpc_interrupted_set(struct ptlrpc_request_set *set) { - struct list_head *tmp; + struct ptlrpc_request *req; LASSERT(set != NULL); CDEBUG(D_RPCTRACE, "INTERRUPTED SET %p\n", set); - list_for_each(tmp, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, rq_set_chain); - + list_for_each_entry(req, &set->set_requests, rq_set_chain) { if (req->rq_intr) continue; @@ -2415,16 +2414,13 @@ static void ptlrpc_interrupted_set(struct ptlrpc_request_set *set) */ time64_t ptlrpc_set_next_timeout(struct ptlrpc_request_set *set) { - struct list_head *tmp; time64_t now = ktime_get_real_seconds(); int timeout = 0; struct ptlrpc_request *req; time64_t deadline; ENTRY; - list_for_each(tmp, &set->set_requests) { - req = list_entry(tmp, struct ptlrpc_request, rq_set_chain); - + list_for_each_entry(req, &set->set_requests, rq_set_chain) { /* Request in-flight? 
*/ if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) || (req->rq_phase == RQ_PHASE_BULK) || @@ -2462,7 +2458,6 @@ time64_t ptlrpc_set_next_timeout(struct ptlrpc_request_set *set) */ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) { - struct list_head *tmp; struct ptlrpc_request *req; time64_t timeout; int rc; @@ -2471,9 +2466,7 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) if (set->set_producer) (void)ptlrpc_set_producer(set); else - list_for_each(tmp, &set->set_requests) { - req = list_entry(tmp, struct ptlrpc_request, - rq_set_chain); + list_for_each_entry(req, &set->set_requests, rq_set_chain) { if (req->rq_phase == RQ_PHASE_NEW) (void)ptlrpc_send_new_req(req); } @@ -2567,9 +2560,8 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) * the error cases -eeb. */ if (rc == 0 && atomic_read(&set->set_remaining) == 0) { - list_for_each(tmp, &set->set_requests) { - req = list_entry(tmp, struct ptlrpc_request, - rq_set_chain); + list_for_each_entry(req, &set->set_requests, + rq_set_chain) { spin_lock(&req->rq_lock); req->rq_invalid_rqset = 1; spin_unlock(&req->rq_lock); @@ -2580,9 +2572,7 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) LASSERT(atomic_read(&set->set_remaining) == 0); rc = set->set_rc; /* rq_status of already freed requests if any */ - list_for_each(tmp, &set->set_requests) { - req = list_entry(tmp, struct ptlrpc_request, rq_set_chain); - + list_for_each_entry(req, &set->set_requests, rq_set_chain) { LASSERT(req->rq_phase == RQ_PHASE_COMPLETE); if (req->rq_status != 0) rc = req->rq_status; @@ -3026,7 +3016,7 @@ EXPORT_SYMBOL(ptlrpc_request_addref); void ptlrpc_retain_replayable_request(struct ptlrpc_request *req, struct obd_import *imp) { - struct list_head *tmp; + struct ptlrpc_request *iter; assert_spin_locked(&imp->imp_lock); @@ -3054,11 +3044,8 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req, LASSERT(imp->imp_replayable); /* Balanced in ptlrpc_free_committed, usually. */ ptlrpc_request_addref(req); - list_for_each_prev(tmp, &imp->imp_replay_list) { - struct ptlrpc_request *iter = list_entry(tmp, - struct ptlrpc_request, - rq_replay_list); - + list_for_each_entry_reverse(iter, &imp->imp_replay_list, + rq_replay_list) { /* * We may have duplicate transnos if we create and then * open a file, or for closes retained if to match creating @@ -3306,7 +3293,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req) */ void ptlrpc_abort_inflight(struct obd_import *imp) { - struct list_head *tmp, *n; + struct ptlrpc_request *req; ENTRY; /* @@ -3321,11 +3308,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp) * locked? Also, how do we know if the requests on the list are * being freed at this time? 
*/ - list_for_each_safe(tmp, n, &imp->imp_sending_list) { - struct ptlrpc_request *req = list_entry(tmp, - struct ptlrpc_request, - rq_list); - + list_for_each_entry(req, &imp->imp_sending_list, rq_list) { DEBUG_REQ(D_RPCTRACE, req, "inflight"); spin_lock(&req->rq_lock); @@ -3337,10 +3320,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp) spin_unlock(&req->rq_lock); } - list_for_each_safe(tmp, n, &imp->imp_delayed_list) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, rq_list); - + list_for_each_entry(req, &imp->imp_delayed_list, rq_list) { DEBUG_REQ(D_RPCTRACE, req, "aborting waiting req"); spin_lock(&req->rq_lock); @@ -3367,15 +3347,11 @@ void ptlrpc_abort_inflight(struct obd_import *imp) */ void ptlrpc_abort_set(struct ptlrpc_request_set *set) { - struct list_head *tmp, *pos; + struct ptlrpc_request *req; LASSERT(set != NULL); - list_for_each_safe(pos, tmp, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(pos, struct ptlrpc_request, - rq_set_chain); - + list_for_each_entry(req, &set->set_requests, rq_set_chain) { spin_lock(&req->rq_lock); if (req->rq_phase != RQ_PHASE_RPC) { spin_unlock(&req->rq_lock); @@ -3448,12 +3424,11 @@ __u64 ptlrpc_next_xid(void) * request to ensure previous bulk fails and avoid problems with lost replies * and therefore several transfers landing into the same buffer from different * sending attempts. + * Also, to avoid previous reply landing to a different sending attempt. */ -void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req) +void ptlrpc_set_mbits(struct ptlrpc_request *req) { - struct ptlrpc_bulk_desc *bd = req->rq_bulk; - - LASSERT(bd != NULL); + int md_count = req->rq_bulk ? req->rq_bulk->bd_md_count : 1; /* * Generate new matchbits for all resend requests, including @@ -3469,7 +3444,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req) * 'resend for the -EINPROGRESS resend'. To make it simple, * we opt to generate mbits for all resend cases. */ - if (OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, + if (OCD_HAS_FLAG(&req->rq_import->imp_connect_data, BULK_MBITS)) { req->rq_mbits = ptlrpc_next_xid(); } else { @@ -3483,15 +3458,16 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req) spin_unlock(&req->rq_import->imp_lock); req->rq_mbits = req->rq_xid; } - CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n", + CDEBUG(D_HA, "resend with new mbits old x%llu new x%llu\n", old_mbits, req->rq_mbits); } else if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) { /* Request being sent first time, use xid as matchbits. */ - if (OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, BULK_MBITS) - || req->rq_mbits == 0) { + if (OCD_HAS_FLAG(&req->rq_import->imp_connect_data, + BULK_MBITS) || req->rq_mbits == 0) + { req->rq_mbits = req->rq_xid; } else { - req->rq_mbits -= bd->bd_md_count - 1; + req->rq_mbits -= md_count - 1; } } else { /* @@ -3506,7 +3482,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req) * that server can infer the number of bulks that were prepared, * see LU-1431 */ - req->rq_mbits += bd->bd_md_count - 1; + req->rq_mbits += md_count - 1; /* * Set rq_xid as rq_mbits to indicate the final bulk for the old @@ -3515,7 +3491,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req) * It's ok to directly set the rq_xid here, since this xid bump * won't affect the request position in unreplied list. */ - if (!OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, BULK_MBITS)) + if (!OCD_HAS_FLAG(&req->rq_import->imp_connect_data, BULK_MBITS)) req->rq_xid = req->rq_mbits; }
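
The hunks in ptlrpc_free_rq_pool() and ptlrpc_set_destroy() above replace list_for_each_safe() plus list_entry() with a drain loop built on list_first_entry_or_null(). A minimal kernel-style sketch of that idiom follows; struct item and drain_items() are illustrative names, not part of the patch:

#include <linux/list.h>
#include <linux/slab.h>

struct item {
	struct list_head link;
	int payload;
};

/* Pop entries off the head until the list is empty, freeing each one. */
static void drain_items(struct list_head *head)
{
	struct item *it;

	/* list_first_entry_or_null() yields NULL once the list is empty */
	while ((it = list_first_entry_or_null(head, struct item, link))) {
		list_del(&it->link);	/* unlink before freeing */
		kfree(it);
	}
}

Compared with list_for_each_safe(), this form needs no temporary cursor variables and makes it explicit that the loop empties the list rather than merely walking it.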
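
ptlrpc_add_unreplied() and ptlrpc_retain_replayable_request() are likewise converted to list_for_each_entry_reverse(), walking an ascending-sorted list from the tail to find the insertion point. A sketch of that pattern, again with a hypothetical struct item standing in for the xid/transno ordering used above:

#include <linux/list.h>
#include <linux/types.h>

struct item {
	struct list_head link;
	u64 key;
};

/* Insert 'new' into a list kept sorted by ascending key. */
static void insert_sorted(struct list_head *head, struct item *new)
{
	struct item *iter;

	/* Walk from the tail: the first smaller key marks the insert point. */
	list_for_each_entry_reverse(iter, head, link) {
		if (new->key > iter->key) {
			/* list_add() links 'new' immediately after 'iter' */
			list_add(&new->link, &iter->link);
			return;
		}
	}
	/* Smallest key seen so far: place the entry at the front. */
	list_add(&new->link, head);
}

Walking in reverse is cheap here because a newly added entry normally carries the largest key, so the loop usually terminates after the first comparison.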
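
The change to ptlrpc_send_new_req() skips sptlrpc_req_refresh_ctx() for the three LDLM callback opcodes, since those RPCs travel over a reverse (server-to-client) context that the sender cannot refresh on its own. The added condition can be read as the predicate below; the helper name is illustrative only and does not appear in the patch:

/* True for server-to-client LDLM callbacks (blocking/completion/glimpse). */
static inline bool req_is_ldlm_callback(struct ptlrpc_request *req)
{
	__u32 opc = lustre_msg_get_opc(req->rq_reqmsg);

	return opc == LDLM_BL_CALLBACK || opc == LDLM_CP_CALLBACK ||
	       opc == LDLM_GL_CALLBACK;
}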