X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fptlrpc%2Fservice.c;h=37d7bf9d12ca5967e4285277e40e7296f0c2e3da;hp=fa23b912af732583df6039a2690b168ad388a2d8;hb=62499626b81f83f9e2ceceaa11d9b33861581cb6;hpb=6869932b552ac705f411de3362f01bd50c1f6f7d diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index fa23b91..37d7bf9 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -169,7 +169,7 @@ ptlrpc_grow_req_bufs(struct ptlrpc_service *svc) void ptlrpc_save_lock (struct ptlrpc_request *req, - struct lustre_handle *lock, int mode) + struct lustre_handle *lock, int mode, int no_ack) { struct ptlrpc_reply_state *rs = req->rq_reply_state; int idx; @@ -181,12 +181,14 @@ ptlrpc_save_lock (struct ptlrpc_request *req, rs->rs_locks[idx] = *lock; rs->rs_modes[idx] = mode; rs->rs_difficult = 1; + rs->rs_no_ack = !!no_ack; } void ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs) { struct ptlrpc_service *svc = rs->rs_service; + ENTRY; #ifdef CONFIG_SMP LASSERT (spin_is_locked (&svc->srv_lock)); @@ -194,13 +196,16 @@ ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs) LASSERT (rs->rs_difficult); rs->rs_scheduled_ever = 1; /* flag any notification attempt */ - if (rs->rs_scheduled) /* being set up or already notified */ + if (rs->rs_scheduled) { /* being set up or already notified */ + EXIT; return; + } rs->rs_scheduled = 1; list_del (&rs->rs_list); list_add (&rs->rs_list, &svc->srv_reply_queue); cfs_waitq_signal (&svc->srv_waitq); + EXIT; } void @@ -208,6 +213,7 @@ ptlrpc_commit_replies (struct obd_device *obd) { struct list_head *tmp; struct list_head *nxt; + ENTRY; /* Find any replies that have been committed and get their service * to attend to complete them. */ @@ -232,6 +238,7 @@ ptlrpc_commit_replies (struct obd_device *obd) } spin_unlock(&obd->obd_uncommitted_replies_lock); + EXIT; } static int @@ -400,41 +407,34 @@ failed: return NULL; } -static void ptlrpc_server_req_decref(struct ptlrpc_request *req) +/** + * to actually free the request, must be called without holding svc_lock. + * note it's caller's responsibility to unlink req->rq_list. + */ +static void ptlrpc_server_free_request(struct ptlrpc_request *req) { - struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd; + LASSERT(atomic_read(&req->rq_refcount) == 0); + LASSERT(list_empty(&req->rq_timed_list)); - if (!atomic_dec_and_test(&req->rq_refcount)) - return; + /* DEBUG_REQ() assumes the reply state of a request with a valid + * ref will not be destroyed until that reference is dropped. */ + ptlrpc_req_drop_rs(req); sptlrpc_svc_ctx_decref(req); - LASSERT(list_empty(&req->rq_timed_list)); - if (req != &rqbd->rqbd_req) { + if (req != &req->rq_rqbd->rqbd_req) { /* NB request buffers use an embedded * req if the incoming req unlinked the * MD; this isn't one of them! */ OBD_FREE(req, sizeof(*req)); - } else { - struct ptlrpc_service *svc = rqbd->rqbd_service; - /* schedule request buffer for re-use. - * NB I can only do this after I've disposed of their - * reqs; particularly the embedded req */ - spin_lock(&svc->srv_lock); - list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds); - spin_unlock(&svc->srv_lock); } } -static void __ptlrpc_server_free_request(struct ptlrpc_request *req) -{ - list_del(&req->rq_list); - ptlrpc_req_drop_rs(req); - ptlrpc_server_req_decref(req); -} - -static void -ptlrpc_server_free_request(struct ptlrpc_request *req) +/** + * drop a reference count of the request. 
+ * if it reaches 0, we either
+ * put it into the history list, or free it immediately.
+ */
+static void ptlrpc_server_drop_request(struct ptlrpc_request *req)
 {
         struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
         struct ptlrpc_service             *svc = rqbd->rqbd_service;
@@ -442,12 +442,8 @@ ptlrpc_server_free_request(struct ptlrpc_request *req)
         struct list_head                  *tmp;
         struct list_head                  *nxt;
 
-        if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
-                DEBUG_REQ(D_INFO, req, "free req");
-        spin_lock(&svc->srv_at_lock);
-        req->rq_sent_final = 1;
-        list_del_init(&req->rq_timed_list);
-        spin_unlock(&svc->srv_at_lock);
+        if (!atomic_dec_and_test(&req->rq_refcount))
+                return;
 
         spin_lock(&svc->srv_lock);
 
@@ -490,20 +486,49 @@ ptlrpc_server_free_request(struct ptlrpc_request *req)
                         req = list_entry(rqbd->rqbd_reqs.next,
                                          struct ptlrpc_request,
                                          rq_list);
-                        __ptlrpc_server_free_request(req);
+                        list_del(&req->rq_list);
+                        ptlrpc_server_free_request(req);
                 }
 
                 spin_lock(&svc->srv_lock);
+                /*
+                 * now all reqs including the embedded req have been
+                 * disposed of; schedule the request buffer for re-use.
+                 */
+                LASSERT(atomic_read(&rqbd->rqbd_req.rq_refcount) == 0);
+                list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
         }
+
+        spin_unlock(&svc->srv_lock);
         } else if (req->rq_reply_state && req->rq_reply_state->rs_prealloc) {
-                /* If we are low on memory, we are not interested in
-                   history */
-                list_del(&req->rq_history_list);
-                __ptlrpc_server_free_request(req);
+                /* If we are low on memory, we are not interested in history */
+                list_del(&req->rq_list);
+                list_del_init(&req->rq_history_list);
+                spin_unlock(&svc->srv_lock);
+
+                ptlrpc_server_free_request(req);
+        } else {
+                spin_unlock(&svc->srv_lock);
         }
+}
 
-        spin_unlock(&svc->srv_lock);
+/**
+ * to finish a request: stop sending more early replies, and release
+ * the request. should be called after we have finished handling the request.
+ */
+static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
+{
+        struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
+
+        if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
+                DEBUG_REQ(D_INFO, req, "free req");
+        spin_lock(&svc->srv_at_lock);
+        req->rq_sent_final = 1;
+        list_del_init(&req->rq_timed_list);
+        spin_unlock(&svc->srv_at_lock);
+
+        ptlrpc_server_drop_request(req);
 }
 
 /* This function makes sure dead exports are evicted in a timely manner. 
@@ -585,26 +610,6 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay) EXIT; } -#ifndef __KERNEL__ -int lu_context_init(struct lu_context *ctx, __u32 tags) -{ - return 0; -} - -void lu_context_fini(struct lu_context *ctx) -{ -} - -void lu_context_enter(struct lu_context *ctx) -{ -} - -void lu_context_exit(struct lu_context *ctx) -{ -} - -#endif - static int ptlrpc_check_req(struct ptlrpc_request *req) { if (unlikely(lustre_msg_get_conn_cnt(req->rq_reqmsg) < @@ -644,7 +649,7 @@ static void ptlrpc_at_set_timer(struct ptlrpc_service *svc) /* Set timer for closest deadline */ rq = list_entry(svc->srv_at_list.next, struct ptlrpc_request, rq_timed_list); - next = (__s32)(rq->rq_deadline - cfs_time_current_sec() - + next = (__s32)(rq->rq_deadline - cfs_time_current_sec() - at_early_margin); if (next <= 0) ptlrpc_at_timer((unsigned long)svc); @@ -670,9 +675,6 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req) if ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) == 0) return(-ENOSYS); - DEBUG_REQ(D_ADAPTTO, req, "add timed %lds", - req->rq_deadline - cfs_time_current_sec()); - spin_lock(&svc->srv_at_lock); if (unlikely(req->rq_sent_final)) { @@ -684,7 +686,7 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req) /* Add to sorted list. Presumably latest rpcs will have the latest deadlines, so search backward. */ list_for_each_entry_reverse(rq, &svc->srv_at_list, rq_timed_list) { - if (req->rq_deadline > rq->rq_deadline) { + if (req->rq_deadline >= rq->rq_deadline) { list_add(&req->rq_timed_list, &rq->rq_timed_list); found++; break; @@ -711,7 +713,7 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req, struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service; struct ptlrpc_request *reqcopy; struct lustre_msg *reqmsg; - long olddl = req->rq_deadline - cfs_time_current_sec(); + cfs_duration_t olddl = req->rq_deadline - cfs_time_current_sec(); time_t newdl; int rc; ENTRY; @@ -728,34 +730,43 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req, RETURN(0); if (olddl < 0) { - CDEBUG(D_WARNING, "x"LPU64": Already past deadline (%+lds), not" - " sending early reply. Increase at_early_margin (%d)?\n", - req->rq_xid, olddl, at_early_margin); + DEBUG_REQ(D_WARNING, req, "Already past deadline (%+lds), " + "not sending early reply. Consider increasing " + "at_early_margin (%d)?", olddl, at_early_margin); + /* Return an error so we're not re-added to the timed list. */ RETURN(-ETIMEDOUT); } if ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) == 0){ - CDEBUG(D_INFO, "Wanted to ask client for more time, but no AT " - "support\n"); + DEBUG_REQ(D_INFO, req, "Wanted to ask client for more time, " + "but no AT support"); RETURN(-ENOSYS); } - if (extra_time) { - /* Fake our processing time into the future to ask the - clients for some extra amount of time */ - extra_time += cfs_time_current_sec() - - req->rq_arrival_time.tv_sec; - at_add(&svc->srv_at_estimate, extra_time); + if (req->rq_export && req->rq_export->exp_in_recovery) { + /* don't increase server estimates during recovery, and give + clients the full recovery time. 
+                   */
+                newdl = cfs_time_current_sec() +
+                        req->rq_export->exp_obd->obd_recovery_timeout;
+        } else {
+                if (extra_time) {
+                        /* Fake our processing time into the future to ask the
+                           clients for some extra amount of time */
+                        extra_time += cfs_time_current_sec() -
+                                      req->rq_arrival_time.tv_sec;
+                        at_add(&svc->srv_at_estimate, extra_time);
+                }
+                newdl = req->rq_arrival_time.tv_sec +
+                        at_get(&svc->srv_at_estimate);
         }
-
-        newdl = req->rq_arrival_time.tv_sec + at_get(&svc->srv_at_estimate);
         if (req->rq_deadline >= newdl) {
                 /* We're not adding any time, no need to send an early reply
                    (e.g. maybe at adaptive_max) */
-                CDEBUG(D_ADAPTTO, "x"LPU64": Couldn't add any time (%ld/%ld), "
-                       "not sending early reply\n", req->rq_xid, olddl,
-                       newdl - cfs_time_current_sec());
+                DEBUG_REQ(D_WARNING, req, "Couldn't add any time ("
+                          CFS_DURATION_T"/"CFS_DURATION_T"), "
+                          "not sending early reply\n", olddl,
+                          cfs_time_sub(newdl, cfs_time_current_sec()));
                 RETURN(-ETIMEDOUT);
         }
 
@@ -780,8 +791,8 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
         memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
 
         if (req->rq_sent_final) {
-                CDEBUG(D_ADAPTTO, "x"LPU64": normal reply already sent out, "
-                       "abort sending early reply\n", req->rq_xid);
+                DEBUG_REQ(D_ADAPTTO, reqcopy, "Normal reply already sent out, "
+                          "abort sending early reply\n");
                 GOTO(out, rc = 0);
         }
 
@@ -890,8 +901,8 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
                        at_get(&svc->srv_at_estimate), delay);
         }
 
-        /* ptlrpc_server_free_request may delete an entry out of the work
-           list */
+        /* ptlrpc_server_finish_request may delete an entry out of
+         * the work list */
         spin_lock(&svc->srv_at_lock);
         while (!list_empty(&work_list)) {
                 rq = list_entry(work_list.next, struct ptlrpc_request,
@@ -905,7 +916,7 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
                 if (ptlrpc_at_send_early_reply(rq, at_extra) == 0)
                         ptlrpc_at_add_timed(rq);
 
-                ptlrpc_server_req_decref(rq);
+                ptlrpc_server_drop_request(rq);
                 spin_lock(&svc->srv_at_lock);
         }
         spin_unlock(&svc->srv_at_lock);
@@ -1000,8 +1011,9 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
 
         /* req_in handling should/must be fast */
         if (cfs_time_current_sec() - req->rq_arrival_time.tv_sec > 5)
-                DEBUG_REQ(D_WARNING, req, "Slow req_in handling %lus",
-                          cfs_time_current_sec() - req->rq_arrival_time.tv_sec);
+                DEBUG_REQ(D_WARNING, req, "Slow req_in handling "CFS_DURATION_T"s",
+                          cfs_time_sub(cfs_time_current_sec(),
+                                       req->rq_arrival_time.tv_sec));
 
         /* Set rpc server deadline and add it to the timed list */
         deadline = (lustre_msghdr_get_flags(req->rq_reqmsg) &
@@ -1028,7 +1040,7 @@ err_req:
         svc->srv_n_queued_reqs--;
         svc->srv_n_active_reqs++;
         spin_unlock(&svc->srv_lock);
-        ptlrpc_server_free_request(req);
+        ptlrpc_server_finish_request(req);
         RETURN(1);
 }
 
@@ -1071,6 +1083,9 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
 
         spin_unlock(&svc->srv_lock);
 
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DUMP_LOG))
+                libcfs_debug_dumplog();
+
         do_gettimeofday(&work_start);
         timediff = cfs_timeval_sub(&work_start, &request->rq_arrival_time,NULL);
         if (likely(svc->srv_stats != NULL)) {
@@ -1092,7 +1107,7 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
         request->rq_session.lc_thread = thread;
         lu_context_enter(&request->rq_session);
 
-        CDEBUG(D_NET, "got req "LPD64"\n", request->rq_xid);
+        CDEBUG(D_NET, "got req "LPU64"\n", request->rq_xid);
 
         request->rq_svc_thread = thread;
         if (thread)
@@ -1112,11 +1127,12 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
            The deadline is increased if we send an early reply. */
         if (cfs_time_current_sec() > request->rq_deadline) {
                 DEBUG_REQ(D_ERROR, request, "Dropping timed-out request from %s"
-                          ": deadline %ld%+lds ago\n",
+                          ": deadline "CFS_DURATION_T":"CFS_DURATION_T"s ago\n",
                           libcfs_id2str(request->rq_peer),
-                          request->rq_deadline -
-                          request->rq_arrival_time.tv_sec,
-                          cfs_time_current_sec() - request->rq_deadline);
+                          cfs_time_sub(request->rq_deadline,
+                                       request->rq_arrival_time.tv_sec),
+                          cfs_time_sub(cfs_time_current_sec(),
+                                       request->rq_deadline));
                 goto put_rpc_export;
         }
 
@@ -1160,10 +1176,12 @@ put_conn:
 
         if (unlikely(cfs_time_current_sec() > request->rq_deadline)) {
                 DEBUG_REQ(D_WARNING, request, "Request x"LPU64" took longer "
-                          "than estimated (%ld%+lds); client may timeout.",
-                          request->rq_xid, request->rq_deadline -
-                          request->rq_arrival_time.tv_sec,
-                          cfs_time_current_sec() - request->rq_deadline);
+                          "than estimated ("CFS_DURATION_T":"CFS_DURATION_T"s);"
+                          " client may timeout.",
+                          request->rq_xid, cfs_time_sub(request->rq_deadline,
+                                       request->rq_arrival_time.tv_sec),
+                          cfs_time_sub(cfs_time_current_sec(),
+                                       request->rq_deadline));
         }
 
         do_gettimeofday(&work_end);
@@ -1189,13 +1207,15 @@ put_conn:
         }
         if (unlikely(request->rq_early_count)) {
                 DEBUG_REQ(D_ADAPTTO, request,
-                          "sent %d early replies before finishing in %lds",
+                          "sent %d early replies before finishing in "
+                          CFS_DURATION_T"s",
                           request->rq_early_count,
-                          work_end.tv_sec - request->rq_arrival_time.tv_sec);
+                          cfs_time_sub(work_end.tv_sec,
+                                       request->rq_arrival_time.tv_sec));
         }
 
 out_req:
-        ptlrpc_server_free_request(request);
+        ptlrpc_server_finish_request(request);
 
         RETURN(1);
 }
@@ -1280,6 +1300,11 @@ ptlrpc_server_handle_reply (struct ptlrpc_service *svc)
         if (!rs->rs_on_net) {
                 /* Off the net */
                 svc->srv_n_difficult_replies--;
+                if (svc->srv_n_difficult_replies == 0 && svc->srv_is_stopping)
+                        /* wake up threads that are being stopped by
+                           ptlrpc_unregister_service/ptlrpc_stop_threads
+                           and sleep waiting for srv_n_difficult_replies == 0 */
+                        cfs_waitq_broadcast(&svc->srv_waitq);
                 spin_unlock(&svc->srv_lock);
 
                 class_export_put (exp);
@@ -1567,7 +1592,9 @@ static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
                                struct ptlrpc_thread *thread)
 {
         struct l_wait_info lwi = { 0 };
+        ENTRY;
 
+        CDEBUG(D_RPCTRACE, "Stopping thread %p\n", thread);
         spin_lock(&svc->srv_lock);
         thread->t_flags = SVC_STOPPING;
         spin_unlock(&svc->srv_lock);
@@ -1581,11 +1608,13 @@ static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
         spin_unlock(&svc->srv_lock);
 
         OBD_FREE_PTR(thread);
+        EXIT;
 }
 
 void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
 {
         struct ptlrpc_thread *thread;
+        ENTRY;
 
         spin_lock(&svc->srv_lock);
         while (!list_empty(&svc->srv_threads)) {
@@ -1598,6 +1627,7 @@ void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
         }
 
         spin_unlock(&svc->srv_lock);
+        EXIT;
 }
 
 int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
@@ -1614,7 +1644,7 @@ int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
                 if (rc == -EMFILE)
                         break;
                 if (rc) {
-                        CERROR("cannot start %s thread #%d: rc %d\n", 
+                        CERROR("cannot start %s thread #%d: rc %d\n",
                                svc->srv_thread_name, i, rc);
                         ptlrpc_stop_all_threads(svc);
                 }
@@ -1662,7 +1692,7 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc)
         d.thread = thread;
 
         CDEBUG(D_RPCTRACE, "starting thread '%s'\n", name);
-        
+
         /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
          * just drop the VM and FILES in ptlrpc_daemonize() right away. */
@@ -1692,7 +1722,9 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         struct l_wait_info    lwi;
         struct list_head     *tmp;
         struct ptlrpc_reply_state *rs, *t;
+        ENTRY;
 
+        service->srv_is_stopping = 1;
         cfs_timer_disarm(&service->srv_at_timer);
 
         ptlrpc_stop_all_threads(service);
@@ -1767,7 +1799,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 list_del(&req->rq_list);
                 service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;
-                ptlrpc_server_free_request(req);
+                ptlrpc_server_finish_request(req);
         }
         while (!list_empty(&service->srv_request_queue)) {
                 struct ptlrpc_request *req =
@@ -1779,7 +1811,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 list_del(&req->rq_list);
                 service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;
 
-                ptlrpc_server_free_request(req);
+                ptlrpc_server_finish_request(req);
         }
         LASSERT(service->srv_n_queued_reqs == 0);
         LASSERT(service->srv_n_active_reqs == 0);
@@ -1822,7 +1854,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         cfs_timer_disarm(&service->srv_at_timer);
 
         OBD_FREE_PTR(service);
-        return 0;
+        RETURN(0);
 }
 
 /* Returns 0 if the service is healthy.
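
For readers tracing the refactor above: the patch splits the old ptlrpc_server_free_request() into three stages. ptlrpc_server_finish_request() silences the early-reply machinery and drops the handler's reference; ptlrpc_server_drop_request() releases one reference and, on the last one, either parks the request on the history list or (when its reply state was preallocated under memory pressure) frees it at once; ptlrpc_server_free_request() performs the actual teardown. The standalone C sketch below models that lifecycle. It is illustrative only: the sketch_* names are invented for this note, a plain int stands in for the atomic rq_refcount, and the srv_lock/srv_at_lock locking is omitted.

#include <stdio.h>
#include <stdlib.h>

struct sketch_req {
        int refcount;     /* stands in for the atomic rq_refcount */
        int sent_final;   /* cf. rq_sent_final: no more early replies */
        int rs_prealloc;  /* cf. rs_prealloc: low-memory reply state */
        int in_history;   /* cf. rq_history_list membership */
};

/* Final teardown; cf. ptlrpc_server_free_request(), which must be
 * called with no service locks held and the refcount already zero. */
static void sketch_free_request(struct sketch_req *req)
{
        printf("req %p: freed\n", (void *)req);
        free(req);
}

/* Release one reference; on the last one, archive the request to the
 * history list or free it immediately, the policy that
 * ptlrpc_server_drop_request() implements. */
static void sketch_drop_request(struct sketch_req *req)
{
        if (--req->refcount > 0)
                return;

        if (req->rs_prealloc) {
                /* Low on memory: not interested in history. */
                sketch_free_request(req);
        } else {
                /* Keep the request around in the request history. */
                req->in_history = 1;
                printf("req %p: parked in history\n", (void *)req);
        }
}

/* Stop early replies, then drop the owner's reference;
 * cf. ptlrpc_server_finish_request(). */
static void sketch_finish_request(struct sketch_req *req)
{
        req->sent_final = 1; /* the early-reply path now skips this req */
        sketch_drop_request(req);
}

int main(void)
{
        struct sketch_req *req = calloc(1, sizeof(*req));

        /* One reference for the handler, one for the early-reply timer. */
        req->refcount = 2;

        sketch_finish_request(req); /* handler done: refcount 2 -> 1 */
        sketch_drop_request(req);   /* timer done: 1 -> 0, parked in history */

        if (req->in_history)
                sketch_free_request(req); /* history culled at shutdown */
        return 0;
}

The split is visible in the call sites the patch touches: ptlrpc_at_check_timed() only ever held a reference, so it now calls the drop stage, while the req_in error path and ptlrpc_unregister_service() own the request outright and must also stop early replies, hence the finish stage.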