X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fptlrpc%2Fclient.c;h=0577f6aa898425dbc9d4d2e1f4482a49d6948d53;hb=5c89fa57cb2dfff667ddd75eaeb13f942f5b155d;hp=c13eb7370fa1bd9b3042a2e723a05708701fc42e;hpb=6c6f247bc08bb6c3a3c07d2da2774a54fd244c9a;p=fs%2Flustre-release.git diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index c13eb73..0577f6a 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -169,6 +169,12 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned int nfrags, LASSERT(ops->add_kiov_frag != NULL); + if (max_brw > PTLRPC_BULK_OPS_COUNT) + RETURN(NULL); + + if (nfrags > LNET_MAX_IOV * max_brw) + RETURN(NULL); + OBD_ALLOC_PTR(desc); if (!desc) return NULL; @@ -185,6 +191,7 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned int nfrags, desc->bd_portal = portal; desc->bd_type = type; desc->bd_md_count = 0; + desc->bd_nob_last = LNET_MTU; desc->bd_frag_ops = ops; LASSERT(max_brw > 0); desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT); @@ -253,6 +260,15 @@ void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc, kiov = &desc->bd_vec[desc->bd_iov_count]; + if (((desc->bd_iov_count % LNET_MAX_IOV) == 0) || + ((desc->bd_nob_last + len) > LNET_MTU)) { + desc->bd_mds_off[desc->bd_md_count] = desc->bd_iov_count; + desc->bd_md_count++; + desc->bd_nob_last = 0; + LASSERT(desc->bd_md_count <= PTLRPC_BULK_OPS_COUNT); + } + + desc->bd_nob_last += len; desc->bd_nob += len; if (pin) @@ -272,7 +288,7 @@ void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc) LASSERT(desc != NULL); LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */ - LASSERT(desc->bd_md_count == 0); /* network hands off */ + LASSERT(desc->bd_refs == 0); /* network hands off */ LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL)); LASSERT(desc->bd_frag_ops != NULL); @@ -535,14 +551,14 @@ void ptlrpc_request_cache_free(struct ptlrpc_request *req) */ void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool) { - struct list_head *l, *tmp; struct ptlrpc_request *req; LASSERT(pool != NULL); spin_lock(&pool->prp_lock); - list_for_each_safe(l, tmp, &pool->prp_req_list) { - req = list_entry(l, struct ptlrpc_request, rq_list); + while ((req = list_first_entry_or_null(&pool->prp_req_list, + struct ptlrpc_request, + rq_list))) { list_del(&req->rq_list); LASSERT(req->rq_reqbuf); LASSERT(req->rq_reqbuf_len == pool->prp_rq_size); @@ -686,17 +702,14 @@ static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request) void ptlrpc_add_unreplied(struct ptlrpc_request *req) { struct obd_import *imp = req->rq_import; - struct list_head *tmp; struct ptlrpc_request *iter; assert_spin_locked(&imp->imp_lock); LASSERT(list_empty(&req->rq_unreplied_list)); /* unreplied list is sorted by xid in ascending order */ - list_for_each_prev(tmp, &imp->imp_unreplied_list) { - iter = list_entry(tmp, struct ptlrpc_request, - rq_unreplied_list); - + list_for_each_entry_reverse(iter, &imp->imp_unreplied_list, + rq_unreplied_list) { LASSERT(req->rq_xid != iter->rq_xid); if (req->rq_xid < iter->rq_xid) continue; @@ -829,11 +842,12 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request, } if (fail_t) { - *fail_t = ktime_get_real_seconds() + LONG_UNLINK; + *fail_t = ktime_get_real_seconds() + + PTLRPC_REQ_LONG_UNLINK; if (fail2_t) *fail2_t = ktime_get_real_seconds() + - LONG_UNLINK; + PTLRPC_REQ_LONG_UNLINK; /* * The RPC is infected, let the test to change the @@ -850,6 +864,7 @@ out_ctx: LASSERT(!request->rq_pool); sptlrpc_cli_ctx_put(request->rq_cli_ctx, 1); out_free: + 
atomic_dec(&imp->imp_reqs); class_import_put(imp); return rc; @@ -887,13 +902,14 @@ struct ptlrpc_request *__ptlrpc_request_alloc(struct obd_import *imp, if (request) { ptlrpc_cli_req_init(request); - LASSERTF((unsigned long)imp > 0x1000, "%p", imp); + LASSERTF((unsigned long)imp > 0x1000, "%p\n", imp); LASSERT(imp != LP_POISON); LASSERTF((unsigned long)imp->imp_client > 0x1000, "%p\n", imp->imp_client); LASSERT(imp->imp_client != LP_POISON); request->rq_import = class_import_get(imp); + atomic_inc(&imp->imp_reqs); } else { CERROR("request allocation out of memory\n"); } @@ -901,6 +917,33 @@ struct ptlrpc_request *__ptlrpc_request_alloc(struct obd_import *imp, return request; } +static int ptlrpc_reconnect_if_idle(struct obd_import *imp) +{ + int rc; + + /* + * initiate connection if needed when the import has been + * referenced by the new request to avoid races with disconnect. + * serialize this check against conditional state=IDLE + * in ptlrpc_disconnect_idle_interpret() + */ + spin_lock(&imp->imp_lock); + if (imp->imp_state == LUSTRE_IMP_IDLE) { + imp->imp_generation++; + imp->imp_initiated_at = imp->imp_generation; + imp->imp_state = LUSTRE_IMP_NEW; + + /* connect_import_locked releases imp_lock */ + rc = ptlrpc_connect_import_locked(imp); + if (rc) + return rc; + ptlrpc_pinger_add_import(imp); + } else { + spin_unlock(&imp->imp_lock); + } + return 0; +} + /** * Helper function for creating a request. * Calls __ptlrpc_request_alloc to allocate new request sturcture and inits @@ -918,32 +961,13 @@ ptlrpc_request_alloc_internal(struct obd_import *imp, if (!request) return NULL; - /* - * initiate connection if needed when the import has been - * referenced by the new request to avoid races with disconnect - */ - if (unlikely(imp->imp_state == LUSTRE_IMP_IDLE)) { - int rc; - - CDEBUG_LIMIT(imp->imp_idle_debug, - "%s: reconnect after %llds idle\n", - imp->imp_obd->obd_name, ktime_get_real_seconds() - - imp->imp_last_reply_time); - spin_lock(&imp->imp_lock); - if (imp->imp_state == LUSTRE_IMP_IDLE) { - imp->imp_generation++; - imp->imp_initiated_at = imp->imp_generation; - imp->imp_state = LUSTRE_IMP_NEW; - - /* connect_import_locked releases imp_lock */ - rc = ptlrpc_connect_import_locked(imp); - if (rc < 0) { - ptlrpc_request_free(request); - return NULL; - } - ptlrpc_pinger_add_import(imp); - } else { - spin_unlock(&imp->imp_lock); + /* don't make expensive check for idling connection + * if it's already connected */ + if (unlikely(imp->imp_state != LUSTRE_IMP_FULL)) { + if (ptlrpc_reconnect_if_idle(imp) < 0) { + atomic_dec(&imp->imp_reqs); + ptlrpc_request_free(request); + return NULL; } } @@ -1079,8 +1103,7 @@ struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func, */ void ptlrpc_set_destroy(struct ptlrpc_request_set *set) { - struct list_head *tmp; - struct list_head *next; + struct ptlrpc_request *req; int expected_phase; int n = 0; @@ -1089,11 +1112,7 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set) /* Requests on the set should either all be completed, or all be new */ expected_phase = (atomic_read(&set->set_remaining) == 0) ? 
RQ_PHASE_COMPLETE : RQ_PHASE_NEW; - list_for_each(tmp, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, - rq_set_chain); - + list_for_each_entry(req, &set->set_requests, rq_set_chain) { LASSERT(req->rq_phase == expected_phase); n++; } @@ -1102,10 +1121,9 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set) atomic_read(&set->set_remaining) == n, "%d / %d\n", atomic_read(&set->set_remaining), n); - list_for_each_safe(tmp, next, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, - rq_set_chain); + while ((req = list_first_entry_or_null(&set->set_requests, + struct ptlrpc_request, + rq_set_chain))) { list_del_init(&req->rq_set_chain); LASSERT(req->rq_phase == expected_phase); @@ -1264,7 +1282,7 @@ static int ptlrpc_import_delay_req(struct obd_import *imp, } else if (req->rq_no_delay && imp->imp_generation != imp->imp_initiated_at) { /* ignore nodelay for requests initiating connections */ - *status = -EWOULDBLOCK; + *status = -EAGAIN; } else if (req->rq_allow_replay && (imp->imp_state == LUSTRE_IMP_REPLAY || imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS || @@ -1759,7 +1777,7 @@ static inline int ptlrpc_set_producer(struct ptlrpc_request_set *set) */ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) { - struct list_head *tmp, *next; + struct ptlrpc_request *req, *next; LIST_HEAD(comp_reqs); int force_timer_recalc = 0; @@ -1767,10 +1785,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) if (atomic_read(&set->set_remaining) == 0) RETURN(1); - list_for_each_safe(tmp, next, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, - rq_set_chain); + list_for_each_entry_safe(req, next, &set->set_requests, + rq_set_chain) { struct obd_import *imp = req->rq_import; int unregistered = 0; int async = 1; @@ -1853,7 +1869,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) * not corrupt any data. */ if (req->rq_phase == RQ_PHASE_UNREG_RPC && - ptlrpc_client_recv_or_unlink(req)) + ptlrpc_cli_wait_unlink(req)) continue; if (req->rq_phase == RQ_PHASE_UNREG_BULK && ptlrpc_client_bulk_active(req)) @@ -1891,7 +1907,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) /* * Check if we still need to wait for unlink. */ - if (ptlrpc_client_recv_or_unlink(req) || + if (ptlrpc_cli_wait_unlink(req) || ptlrpc_client_bulk_active(req)) continue; /* If there is no need to resend, fail it now. */ @@ -1974,6 +1990,27 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) GOTO(interpret, req->rq_status); } + /* don't resend too fast in case of network + * errors. + */ + if (ktime_get_real_seconds() < (req->rq_sent + 1) + && req->rq_net_err && req->rq_timedout) { + + DEBUG_REQ(D_INFO, req, + "throttle request"); + /* Don't try to resend RPC right away + * as it is likely it will fail again + * and ptlrpc_check_set() will be + * called again, keeping this thread + * busy. Instead, wait for the next + * timeout. Flag it as resend to + * ensure we don't wait to long. 
+ */ + req->rq_resend = 1; + spin_unlock(&imp->imp_lock); + continue; + } + list_move_tail(&req->rq_list, &imp->imp_sending_list); @@ -2290,7 +2327,7 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink) */ void ptlrpc_expired_set(struct ptlrpc_request_set *set) { - struct list_head *tmp; + struct ptlrpc_request *req; time64_t now = ktime_get_real_seconds(); ENTRY; @@ -2299,11 +2336,7 @@ void ptlrpc_expired_set(struct ptlrpc_request_set *set) /* * A timeout expired. See which reqs it applies to... */ - list_for_each(tmp, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, - rq_set_chain); - + list_for_each_entry(req, &set->set_requests, rq_set_chain) { /* don't expire request waiting for context */ if (req->rq_wait_ctx) continue; @@ -2323,6 +2356,12 @@ void ptlrpc_expired_set(struct ptlrpc_request_set *set) * ptlrpcd thread. */ ptlrpc_expire_one_request(req, 1); + /* + * Loops require that we resched once in a while to avoid + * RCU stalls and a few other problems. + */ + cond_resched(); + } } @@ -2333,15 +2372,12 @@ void ptlrpc_expired_set(struct ptlrpc_request_set *set) */ static void ptlrpc_interrupted_set(struct ptlrpc_request_set *set) { - struct list_head *tmp; + struct ptlrpc_request *req; LASSERT(set != NULL); CDEBUG(D_RPCTRACE, "INTERRUPTED SET %p\n", set); - list_for_each(tmp, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, rq_set_chain); - + list_for_each_entry(req, &set->set_requests, rq_set_chain) { if (req->rq_intr) continue; @@ -2361,16 +2397,13 @@ static void ptlrpc_interrupted_set(struct ptlrpc_request_set *set) */ time64_t ptlrpc_set_next_timeout(struct ptlrpc_request_set *set) { - struct list_head *tmp; time64_t now = ktime_get_real_seconds(); int timeout = 0; struct ptlrpc_request *req; time64_t deadline; ENTRY; - list_for_each(tmp, &set->set_requests) { - req = list_entry(tmp, struct ptlrpc_request, rq_set_chain); - + list_for_each_entry(req, &set->set_requests, rq_set_chain) { /* Request in-flight? 
*/ if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) || (req->rq_phase == RQ_PHASE_BULK) || @@ -2408,7 +2441,6 @@ time64_t ptlrpc_set_next_timeout(struct ptlrpc_request_set *set) */ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) { - struct list_head *tmp; struct ptlrpc_request *req; time64_t timeout; int rc; @@ -2417,9 +2449,7 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) if (set->set_producer) (void)ptlrpc_set_producer(set); else - list_for_each(tmp, &set->set_requests) { - req = list_entry(tmp, struct ptlrpc_request, - rq_set_chain); + list_for_each_entry(req, &set->set_requests, rq_set_chain) { if (req->rq_phase == RQ_PHASE_NEW) (void)ptlrpc_send_new_req(req); } @@ -2483,10 +2513,10 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) */ if (rc == -ETIMEDOUT && signal_pending(current)) { - sigset_t blocked_sigs; + sigset_t old, new; - cfs_block_sigsinv(LUSTRE_FATAL_SIGS, - &blocked_sigs); + siginitset(&new, LUSTRE_FATAL_SIGS); + sigprocmask(SIG_BLOCK, &new, &old); /* * In fact we only interrupt for the * "fatal" signals like SIGINT or @@ -2497,7 +2527,7 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) */ if (signal_pending(current)) ptlrpc_interrupted_set(set); - cfs_restore_sigs(&blocked_sigs); + sigprocmask(SIG_SETMASK, &old, NULL); } } @@ -2513,9 +2543,8 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) * the error cases -eeb. */ if (rc == 0 && atomic_read(&set->set_remaining) == 0) { - list_for_each(tmp, &set->set_requests) { - req = list_entry(tmp, struct ptlrpc_request, - rq_set_chain); + list_for_each_entry(req, &set->set_requests, + rq_set_chain) { spin_lock(&req->rq_lock); req->rq_invalid_rqset = 1; spin_unlock(&req->rq_lock); @@ -2526,9 +2555,7 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set) LASSERT(atomic_read(&set->set_remaining) == 0); rc = set->set_rc; /* rq_status of already freed requests if any */ - list_for_each(tmp, &set->set_requests) { - req = list_entry(tmp, struct ptlrpc_request, rq_set_chain); - + list_for_each_entry(req, &set->set_requests, rq_set_chain) { LASSERT(req->rq_phase == RQ_PHASE_COMPLETE); if (req->rq_status != 0) rc = req->rq_status; @@ -2586,6 +2613,10 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked) sptlrpc_cli_free_repbuf(request); if (request->rq_import) { + if (!ptlrpcd_check_work(request)) { + LASSERT(atomic_read(&request->rq_import->imp_reqs) > 0); + atomic_dec(&request->rq_import->imp_reqs); + } class_import_put(request->rq_import); request->rq_import = NULL; } @@ -2700,6 +2731,7 @@ EXPORT_SYMBOL(ptlrpc_req_xid); */ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async) { + bool discard = false; /* * Might sleep. */ @@ -2709,20 +2741,23 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async) if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) && async && request->rq_reply_deadline == 0 && cfs_fail_val == 0) request->rq_reply_deadline = ktime_get_real_seconds() + - LONG_UNLINK; + PTLRPC_REQ_LONG_UNLINK; /* * Nothing left to do. */ - if (!ptlrpc_client_recv_or_unlink(request)) + if (!__ptlrpc_cli_wait_unlink(request, &discard)) RETURN(1); LNetMDUnlink(request->rq_reply_md_h); + if (discard) /* Discard the request-out callback */ + __LNetMDUnlink(request->rq_req_md_h, discard); + /* * Let's check it once again. 
*/ - if (!ptlrpc_client_recv_or_unlink(request)) + if (!ptlrpc_cli_wait_unlink(request)) RETURN(1); /* Move to "Unregistering" phase as reply was not unlinked yet. */ @@ -2743,7 +2778,7 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async) wait_queue_head_t *wq = (request->rq_set) ? &request->rq_set->set_waitq : &request->rq_reply_waitq; - int seconds = LONG_UNLINK; + int seconds = PTLRPC_REQ_LONG_UNLINK; /* * Network access will complete in finite time but the HUGE * timeout lets us CWARN for visibility of sluggish NALs @@ -2751,7 +2786,7 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async) while (seconds > 0 && wait_event_idle_timeout( *wq, - !ptlrpc_client_recv_or_unlink(request), + !ptlrpc_cli_wait_unlink(request), cfs_time_seconds(1)) == 0) seconds -= 1; if (seconds > 0) { @@ -2964,7 +2999,7 @@ EXPORT_SYMBOL(ptlrpc_request_addref); void ptlrpc_retain_replayable_request(struct ptlrpc_request *req, struct obd_import *imp) { - struct list_head *tmp; + struct ptlrpc_request *iter; assert_spin_locked(&imp->imp_lock); @@ -2992,11 +3027,8 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req, LASSERT(imp->imp_replayable); /* Balanced in ptlrpc_free_committed, usually. */ ptlrpc_request_addref(req); - list_for_each_prev(tmp, &imp->imp_replay_list) { - struct ptlrpc_request *iter = list_entry(tmp, - struct ptlrpc_request, - rq_replay_list); - + list_for_each_entry_reverse(iter, &imp->imp_replay_list, + rq_replay_list) { /* * We may have duplicate transnos if we create and then * open a file, or for closes retained if to match creating @@ -3244,7 +3276,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req) */ void ptlrpc_abort_inflight(struct obd_import *imp) { - struct list_head *tmp, *n; + struct ptlrpc_request *req; ENTRY; /* @@ -3259,11 +3291,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp) * locked? Also, how do we know if the requests on the list are * being freed at this time? 
*/ - list_for_each_safe(tmp, n, &imp->imp_sending_list) { - struct ptlrpc_request *req = list_entry(tmp, - struct ptlrpc_request, - rq_list); - + list_for_each_entry(req, &imp->imp_sending_list, rq_list) { DEBUG_REQ(D_RPCTRACE, req, "inflight"); spin_lock(&req->rq_lock); @@ -3275,10 +3303,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp) spin_unlock(&req->rq_lock); } - list_for_each_safe(tmp, n, &imp->imp_delayed_list) { - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, rq_list); - + list_for_each_entry(req, &imp->imp_delayed_list, rq_list) { DEBUG_REQ(D_RPCTRACE, req, "aborting waiting req"); spin_lock(&req->rq_lock); @@ -3305,15 +3330,11 @@ void ptlrpc_abort_inflight(struct obd_import *imp) */ void ptlrpc_abort_set(struct ptlrpc_request_set *set) { - struct list_head *tmp, *pos; + struct ptlrpc_request *req; LASSERT(set != NULL); - list_for_each_safe(pos, tmp, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(pos, struct ptlrpc_request, - rq_set_chain); - + list_for_each_entry(req, &set->set_requests, rq_set_chain) { spin_lock(&req->rq_lock); if (req->rq_phase != RQ_PHASE_RPC) { spin_unlock(&req->rq_lock); @@ -3429,9 +3450,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req) || req->rq_mbits == 0) { req->rq_mbits = req->rq_xid; } else { - int total_md = (bd->bd_iov_count + LNET_MAX_IOV - 1) / - LNET_MAX_IOV; - req->rq_mbits -= total_md - 1; + req->rq_mbits -= bd->bd_md_count - 1; } } else { /* @@ -3446,8 +3465,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req) * that server can infer the number of bulks that were prepared, * see LU-1431 */ - req->rq_mbits += ((bd->bd_iov_count + LNET_MAX_IOV - 1) / - LNET_MAX_IOV) - 1; + req->rq_mbits += bd->bd_md_count - 1; /* * Set rq_xid as rq_mbits to indicate the final bulk for the old
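Reviewer note on the bulk MD accounting introduced above: ptlrpc_new_bulk() now rejects max_brw > PTLRPC_BULK_OPS_COUNT and nfrags > LNET_MAX_IOV * max_brw, and __ptlrpc_prep_bulk_page() starts a new memory descriptor whenever the fragment count hits an LNET_MAX_IOV boundary or the bytes queued in the current MD would exceed LNET_MTU; bd_md_count is then used directly in ptlrpc_set_bulk_mbits() instead of recomputing the MD total from bd_iov_count. The standalone sketch below is not Lustre code: the constants, struct, and function names are simplified assumptions for illustration only, showing the same splitting rule applied to 512 pages of 4 KiB, which land in two 1 MiB MDs.

/*
 * Standalone sketch (illustration only, not the Lustre code) of the
 * MD-splitting rule added to __ptlrpc_prep_bulk_page(): a new memory
 * descriptor (MD) is opened whenever the fragment count reaches the
 * per-MD fragment limit or the bytes queued in the current MD would
 * exceed the MTU.  Constants below are assumptions for the example.
 */
#include <stdio.h>

#define SKETCH_LNET_MTU      (1U << 20)   /* assumed 1 MiB, as in LNet */
#define SKETCH_LNET_MAX_IOV  256          /* assumed max fragments per MD */

struct sketch_bulk_desc {
	unsigned int md_count;   /* number of MDs started so far */
	unsigned int iov_count;  /* total fragments added */
	unsigned int nob_last;   /* bytes queued in the current MD */
	unsigned int nob;        /* total bytes in the descriptor */
};

/* Mirrors the boundary test added to __ptlrpc_prep_bulk_page(). */
static void sketch_add_frag(struct sketch_bulk_desc *d, unsigned int len)
{
	if ((d->iov_count % SKETCH_LNET_MAX_IOV) == 0 ||
	    d->nob_last + len > SKETCH_LNET_MTU) {
		d->md_count++;       /* start a new MD for this fragment */
		d->nob_last = 0;
	}
	d->nob_last += len;
	d->nob += len;
	d->iov_count++;
}

int main(void)
{
	/* seeding nob_last with the MTU forces the first fragment to open MD 0 */
	struct sketch_bulk_desc d = { .nob_last = SKETCH_LNET_MTU };
	unsigned int i;

	/* 512 x 4 KiB pages: 2 MiB total, so two 1 MiB MDs are expected */
	for (i = 0; i < 512; i++)
		sketch_add_frag(&d, 4096);

	printf("frags=%u bytes=%u mds=%u\n", d.iov_count, d.nob, d.md_count);
	return 0;
}

Seeding nob_last with the MTU in the initializer mirrors the new desc->bd_nob_last = LNET_MTU in ptlrpc_new_bulk(), which guarantees the very first fragment always opens MD 0; the resulting bd_md_count is what the hunks at the end of the patch feed into the rq_mbits arithmetic in place of the old (bd_iov_count + LNET_MAX_IOV - 1) / LNET_MAX_IOV recomputation.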