From 3306aff2b617178f557e368acabea37dd0fa0b55 Mon Sep 17 00:00:00 2001 From: vitaly Date: Tue, 18 Nov 2008 21:40:20 +0000 Subject: [PATCH] Branch b1_8_gate b=16129 i=adilger i=green - a high priority request list is added into service; - once a lock is canceled, all the IO requests, including coming ones, under this lock, are moved into this list; - PING is also added into this list; - once a lock cancel timeout occurs, the timeout is prolonged if there is an IO rpc under this lock; - another request list is added into the export, used to speed up the rpc-lock matching. --- lustre/ptlrpc/service.c | 277 ++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 243 insertions(+), 34 deletions(-) diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index 44405f5..175b62f 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -303,7 +303,8 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size, svc_handler_t handler, char *name, cfs_proc_dir_entry_t *proc_entry, svcreq_printfn_t svcreq_printfn, - int min_threads, int max_threads, char *threadname) + int min_threads, int max_threads, char *threadname, + svc_hpreq_handler_t hp_handler) { int rc; struct ptlrpc_service *service; @@ -336,11 +337,16 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size, service->srv_threads_min = min_threads; service->srv_threads_max = max_threads; service->srv_thread_name = threadname; + service->srv_hpreq_handler = hp_handler; + service->srv_hpreq_ratio = PTLRPC_SVC_HP_RATIO; + service->srv_hpreq_count = 0; + service->srv_n_hpreq = 0; rc = LNetSetLazyPortal(service->srv_req_portal); LASSERT (rc == 0); CFS_INIT_LIST_HEAD(&service->srv_request_queue); + CFS_INIT_LIST_HEAD(&service->srv_request_hpq); CFS_INIT_LIST_HEAD(&service->srv_idle_rqbds); CFS_INIT_LIST_HEAD(&service->srv_active_rqbds); CFS_INIT_LIST_HEAD(&service->srv_history_rqbds); @@ -498,6 +504,11 @@ static void ptlrpc_server_finish_request(struct ptlrpc_request *req) { struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service; + if (req->rq_export) { + class_export_put(req->rq_export); + req->rq_export = NULL; + } + if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */ DEBUG_REQ(D_INFO, req, "free req"); @@ -515,7 +526,7 @@ static void ptlrpc_server_finish_request(struct ptlrpc_request *req) static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay) { struct obd_export *oldest_exp; - time_t oldest_time; + time_t oldest_time, new_time; ENTRY; @@ -526,9 +537,13 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay) of the list, we can be really lazy here - we don't have to evict at the exact right moment. Eventually, all silent exports will make it to the top of the list. */ - exp->exp_last_request_time = max(exp->exp_last_request_time, - cfs_time_current_sec() + extra_delay); + /* Do not pay attention on 1sec or smaller renewals. */ + new_time = cfs_time_current_sec() + extra_delay; + if (exp->exp_last_request_time + 1 /*second */ >= new_time) + RETURN_EXIT; + + exp->exp_last_request_time = new_time; CDEBUG(D_INFO, "updating export %s at %ld\n", exp->exp_client_uuid.uuid, exp->exp_last_request_time); @@ -541,8 +556,7 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay) if (list_empty(&exp->exp_obd_chain_timed)) { /* this one is not timed */ spin_unlock(&exp->exp_obd->obd_dev_lock); - EXIT; - return; + RETURN_EXIT; } list_move_tail(&exp->exp_obd_chain_timed, @@ -892,6 +906,167 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc) RETURN(0); } +/** + * Put the request to the export list if the request may become + * a high priority one. + */ +static int ptlrpc_hpreq_init(struct ptlrpc_service *svc, + struct ptlrpc_request *req) +{ + int rc; + ENTRY; + + if (svc->srv_hpreq_handler) { + rc = svc->srv_hpreq_handler(req); + if (rc) + RETURN(rc); + } + if (req->rq_export && req->rq_ops) { + spin_lock(&req->rq_export->exp_lock); + list_add(&req->rq_exp_list, &req->rq_export->exp_queued_rpc); + spin_unlock(&req->rq_export->exp_lock); + } + + RETURN(0); +} + +/** Remove the request from the export list. */ +static void ptlrpc_hpreq_fini(struct ptlrpc_request *req) +{ + ENTRY; + if (req->rq_export && req->rq_ops) { + spin_lock(&req->rq_export->exp_lock); + list_del_init(&req->rq_exp_list); + spin_unlock(&req->rq_export->exp_lock); + } + EXIT; +} + +/** + * Make the request a high priority one. + * + * All the high priority requests are queued in a separate FIFO + * ptlrpc_service::srv_request_hpq list which is parallel to + * ptlrpc_service::srv_request_queue list but has a higher priority + * for handling. + * + * \see ptlrpc_server_handle_request(). + */ +static void ptlrpc_hpreq_reorder_nolock(struct ptlrpc_service *svc, + struct ptlrpc_request *req) +{ + ENTRY; + LASSERT(svc != NULL); + spin_lock(&req->rq_lock); + if (req->rq_hp == 0) { + int opc = lustre_msg_get_opc(req->rq_reqmsg); + + /* Add to the high priority queue. */ + list_move_tail(&req->rq_list, &svc->srv_request_hpq); + req->rq_hp = 1; + if (opc != OBD_PING) + DEBUG_REQ(D_NET, req, "high priority req"); + } + spin_unlock(&req->rq_lock); + EXIT; +} + +void ptlrpc_hpreq_reorder(struct ptlrpc_request *req) +{ + struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service; + ENTRY; + + spin_lock(&svc->srv_lock); + /* It may happen that the request is already taken for the processing + * but still in the export list, do not re-add it into the HP list. */ + if (req->rq_phase == RQ_PHASE_NEW) + ptlrpc_hpreq_reorder_nolock(svc, req); + spin_unlock(&svc->srv_lock); + EXIT; +} + +/** Check if the request if a high priority one. */ +static int ptlrpc_server_hpreq_check(struct ptlrpc_request *req) +{ + int opc, rc = 0; + ENTRY; + + /* Check by request opc. */ + opc = lustre_msg_get_opc(req->rq_reqmsg); + if (opc == OBD_PING) + RETURN(1); + + /* Perform request specific check. */ + if (req->rq_ops && req->rq_ops->hpreq_check) + rc = req->rq_ops->hpreq_check(req); + RETURN(rc); +} + +/** Check if a request is a high priority one. */ +static int ptlrpc_server_request_add(struct ptlrpc_service *svc, + struct ptlrpc_request *req) +{ + int rc; + ENTRY; + + rc = ptlrpc_server_hpreq_check(req); + if (rc < 0) + RETURN(rc); + + spin_lock(&svc->srv_lock); + /* Before inserting the request into the queue, check if it is not + * inserted yet, or even already handled -- it may happen due to + * a racing ldlm_server_blocking_ast(). */ + if (req->rq_phase == RQ_PHASE_NEW && list_empty(&req->rq_list)) { + if (rc) + ptlrpc_hpreq_reorder_nolock(svc, req); + else + list_add_tail(&req->rq_list, &svc->srv_request_queue); + } + spin_unlock(&svc->srv_lock); + + RETURN(0); +} + +/* Only allow normal priority requests on a service that has a high-priority + * queue if forced (i.e. cleanup), if there are other high priority requests + * already being processed (i.e. those threads can service more high-priority + * requests), or if there are enough idle threads that a later thread can do + * a high priority request. */ +static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force) +{ + return force || !svc->srv_hpreq_handler || svc->srv_n_hpreq > 0 || + svc->srv_n_active_reqs < svc->srv_threads_running - 2; +} + +static struct ptlrpc_request * +ptlrpc_server_request_get(struct ptlrpc_service *svc, int force) +{ + struct ptlrpc_request *req = NULL; + ENTRY; + + if (ptlrpc_server_allow_normal(svc, force) && + !list_empty(&svc->srv_request_queue) && + (list_empty(&svc->srv_request_hpq) || + svc->srv_hpreq_count >= svc->srv_hpreq_ratio)) { + req = list_entry(svc->srv_request_queue.next, + struct ptlrpc_request, rq_list); + svc->srv_hpreq_count = 0; + } else if (!list_empty(&svc->srv_request_hpq)) { + req = list_entry(svc->srv_request_hpq.next, + struct ptlrpc_request, rq_list); + svc->srv_hpreq_count++; + } + RETURN(req); +} + +static int ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force) +{ + return ((ptlrpc_server_allow_normal(svc, force) && + !list_empty(&svc->srv_request_queue)) || + !list_empty(&svc->srv_request_hpq)); +} + /* Handle freshly incoming reqs, add to timed early reply list, pass on to regular request queue */ static int @@ -953,10 +1128,9 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc) lustre_msg_get_handle(req->rq_reqmsg)); if (req->rq_export) { rc = ptlrpc_check_req(req); - class_export_put(req->rq_export); - req->rq_export = NULL; if (rc) goto err_req; + ptlrpc_update_export_timer(req->rq_export, 0); } /* req_in handling should/must be fast */ @@ -976,12 +1150,15 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc) } ptlrpc_at_add_timed(req); + rc = ptlrpc_hpreq_init(svc, req); + if (rc) + GOTO(err_req, rc); /* Move it over to the request processing queue */ - spin_lock(&svc->srv_lock); - list_add_tail(&req->rq_list, &svc->srv_request_queue); + rc = ptlrpc_server_request_add(svc, req); + if (rc) + GOTO(err_req, rc); cfs_waitq_signal(&svc->srv_waitq); - spin_unlock(&svc->srv_lock); RETURN(1); err_req: @@ -1003,13 +1180,14 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc, struct timeval work_start; struct timeval work_end; long timediff; - int rc; + int opc, rc; + int fail_opc = 0; ENTRY; LASSERT(svc); spin_lock(&svc->srv_lock); - if (list_empty (&svc->srv_request_queue) || + if (!ptlrpc_server_request_pending(svc, 0) || ( #ifndef __KERNEL__ /* !@%$# liblustre only has 1 thread */ @@ -1024,14 +1202,46 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc, RETURN(0); } - request = list_entry(svc->srv_request_queue.next, - struct ptlrpc_request, rq_list); - list_del_init (&request->rq_list); + request = ptlrpc_server_request_get(svc, 0); + if (request == NULL) { + spin_unlock(&svc->srv_lock); + RETURN(0); + } + + opc = lustre_msg_get_opc(request->rq_reqmsg); + if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT)) + fail_opc = OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT; + else if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT)) + fail_opc = OBD_FAIL_PTLRPC_HPREQ_TIMEOUT; + + if (unlikely(fail_opc)) { + if (request->rq_export && request->rq_ops) { + spin_unlock(&svc->srv_lock); + OBD_FAIL_TIMEOUT(fail_opc, 4); + spin_lock(&svc->srv_lock); + request = ptlrpc_server_request_get(svc, 0); + if (request == NULL) { + spin_unlock(&svc->srv_lock); + RETURN(0); + } + LASSERT(ptlrpc_server_request_pending(svc, 0)); + } + } + + list_del_init(&request->rq_list); svc->srv_n_queued_reqs--; svc->srv_n_active_reqs++; + if (request->rq_hp) + svc->srv_n_hpreq++; + + /* The phase is changed under the lock here because we need to know + * the request is under processing (see ptlrpc_hpreq_reorder()). */ + ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET); spin_unlock(&svc->srv_lock); + ptlrpc_hpreq_fini(request); + if(OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DUMP_LOG)) libcfs_debug_dumplog(); @@ -1051,9 +1261,6 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc, CDEBUG(D_NET, "got req "LPD64"\n", request->rq_xid); request->rq_svc_thread = thread; - request->rq_export = class_conn2export( - lustre_msg_get_handle(request->rq_reqmsg)); - if (request->rq_export) { if (ptlrpc_check_req(request)) goto put_conn; @@ -1073,8 +1280,6 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc, goto put_rpc_export; } - ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET); - CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid+ref:pid:xid:nid:opc " "%s:%s+%d:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(), (request->rq_export ? @@ -1106,9 +1311,6 @@ put_rpc_export: class_export_rpc_put(export); put_conn: - if (request->rq_export != NULL) - class_export_put(request->rq_export); - if (cfs_time_current_sec() > request->rq_deadline) { DEBUG_REQ(D_WARNING, request, "Request x"LPU64" took longer " "than estimated (%ld%+lds); client may timeout.", @@ -1145,6 +1347,10 @@ put_conn: work_end.tv_sec - request->rq_arrival_time.tv_sec); } + spin_lock(&svc->srv_lock); + if (request->rq_hp) + svc->srv_n_hpreq--; + spin_unlock(&svc->srv_lock); ptlrpc_server_finish_request(request); RETURN(1); @@ -1426,7 +1632,7 @@ static int ptlrpc_main(void *arg) svc->srv_rqbd_timeout == 0) || !list_empty(&svc->srv_req_in_queue) || !list_empty(&svc->srv_reply_queue) || - (!list_empty(&svc->srv_request_queue) && + (ptlrpc_server_request_pending(svc, 0) && (svc->srv_n_active_reqs < (svc->srv_threads_running - 1))) || svc->srv_at_check, @@ -1461,7 +1667,7 @@ static int ptlrpc_main(void *arg) ptlrpc_at_check_timed(svc); /* don't handle requests in the last thread */ - if (!list_empty (&svc->srv_request_queue) && + if (ptlrpc_server_request_pending(svc, 0) && (svc->srv_n_active_reqs < (svc->srv_threads_running - 1))) ptlrpc_server_handle_request(svc, thread); @@ -1707,15 +1913,14 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service) service->srv_n_active_reqs++; ptlrpc_server_finish_request(req); } - while (!list_empty(&service->srv_request_queue)) { - struct ptlrpc_request *req = - list_entry(service->srv_request_queue.next, - struct ptlrpc_request, - rq_list); + while (ptlrpc_server_request_pending(service, 1)) { + struct ptlrpc_request *req; + req = ptlrpc_server_request_get(service, 1); list_del(&req->rq_list); service->srv_n_queued_reqs--; service->srv_n_active_reqs++; + ptlrpc_hpreq_fini(req); ptlrpc_server_finish_request(req); } LASSERT(service->srv_n_queued_reqs == 0); @@ -1779,14 +1984,18 @@ int ptlrpc_service_health_check(struct ptlrpc_service *svc) do_gettimeofday(&right_now); spin_lock(&svc->srv_lock); - if (list_empty(&svc->srv_request_queue)) { + if (!ptlrpc_server_request_pending(svc, 1)) { spin_unlock(&svc->srv_lock); return 0; } /* How long has the next entry been waiting? */ - request = list_entry(svc->srv_request_queue.next, - struct ptlrpc_request, rq_list); + if (list_empty(&svc->srv_request_queue)) + request = list_entry(svc->srv_request_hpq.next, + struct ptlrpc_request, rq_list); + else + request = list_entry(svc->srv_request_queue.next, + struct ptlrpc_request, rq_list); timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL); spin_unlock(&svc->srv_lock); -- 1.8.3.1