X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fptlrpc%2Fservice.c;h=cfb798f0c7ce259ca82eeb22499991377d12146c;hp=a1d1060034fd0455708e40f1755a675c3f218817;hb=9afee0366ab03c9feba88cc8eb0027aa0fb9bf6b;hpb=96b814bda9b2d923885291849ae0f14f660c90e1

diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c
index a1d1060..cfb798f 100644
--- a/lustre/ptlrpc/service.c
+++ b/lustre/ptlrpc/service.c
@@ -26,7 +26,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  */
 /*
@@ -161,6 +161,10 @@ ptlrpc_grow_req_bufs(struct ptlrpc_service *svc)
         return (0);
 }
 
+/**
+ * Part of Rep-Ack logic.
+ * Puts a lock and its mode into the reply state associated with the
+ * request's reply.
+ */
 void ptlrpc_save_lock(struct ptlrpc_request *req,
                       struct lustre_handle *lock, int mode, int no_ack)
 {
@@ -282,9 +286,9 @@ static void rs_batch_add(struct rs_batch *b, struct ptlrpc_reply_state *rs)
         if (svc != b->rsb_svc || b->rsb_n_replies >= MAX_SCHEDULED) {
                 if (b->rsb_svc != NULL) {
                         rs_batch_dispatch(b);
-                        cfs_spin_unlock(&b->rsb_svc->srv_lock);
+                        cfs_spin_unlock(&b->rsb_svc->srv_rs_lock);
                 }
-                cfs_spin_lock(&svc->srv_lock);
+                cfs_spin_lock(&svc->srv_rs_lock);
                 b->rsb_svc = svc;
         }
         cfs_spin_lock(&rs->rs_lock);
@@ -309,7 +313,7 @@ static void rs_batch_fini(struct rs_batch *b)
 {
         if (b->rsb_svc != 0) {
                 rs_batch_dispatch(b);
-                cfs_spin_unlock(&b->rsb_svc->srv_lock);
+                cfs_spin_unlock(&b->rsb_svc->srv_rs_lock);
         }
 }
 
@@ -324,6 +328,10 @@ static void rs_batch_fini(struct rs_batch *b)
 
 #endif /* __KERNEL__ */
 
+/**
+ * Put reply state into a queue for processing because we received
+ * an ACK from the client
+ */
 void ptlrpc_dispatch_difficult_reply(struct ptlrpc_reply_state *rs)
 {
 #ifdef __KERNEL__
@@ -349,12 +357,12 @@ ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
 {
         ENTRY;
 
-        LASSERT_SPIN_LOCKED(&rs->rs_service->srv_lock);
+        LASSERT_SPIN_LOCKED(&rs->rs_service->srv_rs_lock);
         LASSERT_SPIN_LOCKED(&rs->rs_lock);
         LASSERT (rs->rs_difficult);
-        rs->rs_scheduled_ever = 1;  /* flag any notification attempt */
+        rs->rs_scheduled_ever = 1;  /* flag any notification attempt */
 
-        if (rs->rs_scheduled) {     /* being set up or already notified */
+        if (rs->rs_scheduled) {     /* being set up or already notified */
                 EXIT;
                 return;
         }
@@ -439,10 +447,14 @@ ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc)
         return (-1);
 }
 
+/**
+ * Start a service with parameters from struct ptlrpc_service_conf \a c
+ * as opposed to directly calling ptlrpc_init_svc with tons of arguments.
+ */
 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
                                             svc_handler_t h, char *name,
                                             struct proc_dir_entry *proc_entry,
-                                            svcreq_printfn_t prntfn,
+                                            svc_req_printfn_t prntfn,
                                             char *threadname)
 {
         return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
@@ -463,13 +475,31 @@ static void ptlrpc_at_timer(unsigned long castmeharder)
         cfs_waitq_signal(&svc->srv_waitq);
 }
 
-/* @threadname should be 11 characters or less - 3 will be added on */
+/**
+ * Initialize service on a given portal.
+ * This includes starting service threads, allocating and posting rqbds and
+ * so on.
+ * \a nbufs is how many buffers to post
+ * \a bufsize is buffer size to post
+ * \a max_req_size - maximum request size to be accepted for this service
+ * \a max_reply_size maximum reply size this service can ever send
+ * \a req_portal - portal to listen for requests on
+ * \a rep_portal - portal to send replies to
+ * \a watchdog_factor soft watchdog timeout multiplier to print stuck service traces.
+ * \a handler - function to process every new request
+ * \a name - service name
+ * \a proc_entry - entry in the /proc tree for statistics reporting
+ * \a min_threads \a max_threads - min/max number of service threads to start.
+ * \a threadname should be 11 characters or less - 3 will be added on
+ * \a hp_handler - function to determine priority of the request, also called
+ *                 on every new request.
+ */
 struct ptlrpc_service *
 ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
                 int req_portal, int rep_portal, int watchdog_factor,
                 svc_handler_t handler, char *name,
                 cfs_proc_dir_entry_t *proc_entry,
-                svcreq_printfn_t svcreq_printfn,
+                svc_req_printfn_t svcreq_printfn,
                 int min_threads, int max_threads,
                 char *threadname, __u32 ctx_tags,
                 svc_hpreq_handler_t hp_handler)
@@ -492,6 +522,8 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
 
         service->srv_name = name;
         cfs_spin_lock_init(&service->srv_lock);
+        cfs_spin_lock_init(&service->srv_rq_lock);
+        cfs_spin_lock_init(&service->srv_rs_lock);
         CFS_INIT_LIST_HEAD(&service->srv_threads);
         cfs_waitq_init(&service->srv_waitq);
 
@@ -502,7 +534,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
         service->srv_req_portal = req_portal;
         service->srv_watchdog_factor = watchdog_factor;
         service->srv_handler = handler;
-        service->srv_request_history_print_fn = svcreq_printfn;
+        service->srv_req_printfn = svcreq_printfn;
         service->srv_request_seq = 1;           /* valid seq #s start at 1 */
         service->srv_request_max_cull_seq = 0;
         service->srv_threads_min = min_threads;
@@ -512,7 +544,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
         service->srv_hpreq_handler = hp_handler;
         service->srv_hpreq_ratio = PTLRPC_SVC_HP_RATIO;
         service->srv_hpreq_count = 0;
-        service->srv_n_hpreq = 0;
+        service->srv_n_active_hpreq = 0;
 
         rc = LNetSetLazyPortal(service->srv_req_portal);
         LASSERT (rc == 0);
@@ -618,9 +650,9 @@ void ptlrpc_server_active_request_inc(struct ptlrpc_request *req)
         struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
         struct ptlrpc_service *svc = rqbd->rqbd_service;
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
         svc->srv_n_active_reqs++;
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
 }
 
 /**
@@ -631,9 +663,9 @@ void ptlrpc_server_active_request_dec(struct ptlrpc_request *req)
         struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
         struct ptlrpc_service *svc = rqbd->rqbd_service;
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
         svc->srv_n_active_reqs--;
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
 }
 
 /**
@@ -675,7 +707,6 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
 
         cfs_spin_lock(&svc->srv_lock);
 
-        svc->srv_n_active_reqs--;
         cfs_list_add(&req->rq_list, &rqbd->rqbd_reqs);
 
         refcount = --(rqbd->rqbd_refcount);
@@ -746,14 +777,23 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
  * to finish a request: stop sending more early replies, and release
  * the request. should be called after we finished handling the request.
  */
-static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
+static void ptlrpc_server_finish_request(struct ptlrpc_service *svc,
+                                         struct ptlrpc_request *req)
 {
+        cfs_spin_lock(&svc->srv_rq_lock);
+        svc->srv_n_active_reqs--;
+        if (req->rq_hp)
+                svc->srv_n_active_hpreq--;
+        cfs_spin_unlock(&svc->srv_rq_lock);
+
         ptlrpc_server_drop_request(req);
 }
 
-/* This function makes sure dead exports are evicted in a timely manner.
-   This function is only called when some export receives a message (i.e.,
-   the network is up.) */
+/**
+ * This function makes sure dead exports are evicted in a timely manner.
+ * This function is only called when some export receives a message (i.e.,
+ * the network is up.)
+ */
 static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
 {
         struct obd_export *oldest_exp;
@@ -833,6 +873,10 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
         EXIT;
 }
 
+/**
+ * Sanity check request \a req.
+ * Return 0 if all is ok, error code otherwise.
+ */
 static int ptlrpc_check_req(struct ptlrpc_request *req)
 {
         if (unlikely(lustre_msg_get_conn_cnt(req->rq_reqmsg) <
@@ -1201,10 +1245,10 @@ static int ptlrpc_hpreq_init(struct ptlrpc_service *svc,
                         RETURN(rc);
         }
         if (req->rq_export && req->rq_ops) {
-                cfs_spin_lock(&req->rq_export->exp_lock);
+                cfs_spin_lock_bh(&req->rq_export->exp_rpc_lock);
                 cfs_list_add(&req->rq_exp_list,
                              &req->rq_export->exp_queued_rpc);
-                cfs_spin_unlock(&req->rq_export->exp_lock);
+                cfs_spin_unlock_bh(&req->rq_export->exp_rpc_lock);
         }
 
         RETURN(0);
@@ -1215,9 +1259,9 @@ static void ptlrpc_hpreq_fini(struct ptlrpc_request *req)
 {
         ENTRY;
         if (req->rq_export && req->rq_ops) {
-                cfs_spin_lock(&req->rq_export->exp_lock);
+                cfs_spin_lock_bh(&req->rq_export->exp_rpc_lock);
                 cfs_list_del_init(&req->rq_exp_list);
-                cfs_spin_unlock(&req->rq_export->exp_lock);
+                cfs_spin_unlock_bh(&req->rq_export->exp_rpc_lock);
         }
         EXIT;
 }
@@ -1251,17 +1295,20 @@ static void ptlrpc_hpreq_reorder_nolock(struct ptlrpc_service *svc,
         EXIT;
 }
 
+/**
+ * \see ptlrpc_hpreq_reorder_nolock
+ */
 void ptlrpc_hpreq_reorder(struct ptlrpc_request *req)
 {
         struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
         ENTRY;
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
         /* It may happen that the request is already taken for the processing
          * but still in the export list, do not re-add it into the HP list. */
         if (req->rq_phase == RQ_PHASE_NEW)
                 ptlrpc_hpreq_reorder_nolock(svc, req);
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
         EXIT;
 }
 
@@ -1293,7 +1340,7 @@ static int ptlrpc_server_request_add(struct ptlrpc_service *svc,
         if (rc < 0)
                 RETURN(rc);
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
         /* Before inserting the request into the queue, check if it is not
          * inserted yet, or even already handled -- it may happen due to
          * a racing ldlm_server_blocking_ast().
  */
@@ -1304,51 +1351,110 @@ static int ptlrpc_server_request_add(struct ptlrpc_service *svc,
                         cfs_list_add_tail(&req->rq_list,
                                           &svc->srv_request_queue);
         }
 
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
 
         RETURN(0);
 }
 
-/* Only allow normal priority requests on a service that has a high-priority
+/**
+ * Check whether we are allowed to handle a high-priority request.
+ * The caller may call it without any lock, but must hold
+ * ptlrpc_service::srv_rq_lock to get a reliable result.
+ */
+static int ptlrpc_server_allow_high(struct ptlrpc_service *svc, int force)
+{
+        if (force)
+                return 1;
+
+        if (svc->srv_n_active_reqs >= svc->srv_threads_running - 1)
+                return 0;
+
+        return cfs_list_empty(&svc->srv_request_queue) ||
+               svc->srv_hpreq_count < svc->srv_hpreq_ratio;
+}
+
+static int ptlrpc_server_high_pending(struct ptlrpc_service *svc, int force)
+{
+        return ptlrpc_server_allow_high(svc, force) &&
+               !cfs_list_empty(&svc->srv_request_hpq);
+}
+
+/**
+ * Only allow normal priority requests on a service that has a high-priority
  * queue if forced (i.e. cleanup), if there are other high priority requests
  * already being processed (i.e. those threads can service more high-priority
  * requests), or if there are enough idle threads that a later thread can do
- * a high priority request. */
+ * a high priority request.
+ * The caller may call it without any lock, but must hold
+ * ptlrpc_service::srv_rq_lock to get a reliable result.
+ */
 static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
 {
-        return force || !svc->srv_hpreq_handler || svc->srv_n_hpreq > 0 ||
-               svc->srv_threads_running < svc->srv_threads_started - 2;
+        if (force ||
+            svc->srv_n_active_reqs < svc->srv_threads_running - 2)
+                return 1;
+
+        if (svc->srv_n_active_reqs >= svc->srv_threads_running - 1)
+                return 0;
+
+        return svc->srv_n_active_hpreq > 0 || svc->srv_hpreq_handler == NULL;
+}
+
+static int ptlrpc_server_normal_pending(struct ptlrpc_service *svc, int force)
+{
+        return ptlrpc_server_allow_normal(svc, force) &&
+               !cfs_list_empty(&svc->srv_request_queue);
 }
 
+/**
+ * Returns true if there are requests available in the incoming
+ * request queue for processing and it is allowed to fetch them.
+ * The caller may call it without any lock, but must hold
+ * ptlrpc_service::srv_rq_lock to get a reliable result.
+ * \see ptlrpc_server_allow_normal
+ * \see ptlrpc_server_allow_high
+ */
+static inline int
+ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force)
+{
+        return ptlrpc_server_high_pending(svc, force) ||
+               ptlrpc_server_normal_pending(svc, force);
+}
+
+/**
+ * Fetch a request for processing from the queue of unprocessed requests.
+ * Favors high-priority requests.
+ * Returns a pointer to the fetched request.
+ */
 static struct ptlrpc_request *
-ptlrpc_server_request_get(struct ptlrpc_service *svc)
+ptlrpc_server_request_get(struct ptlrpc_service *svc, int force)
 {
-        struct ptlrpc_request *req = NULL;
+        struct ptlrpc_request *req;
         ENTRY;
 
-        if (!cfs_list_empty(&svc->srv_request_queue) &&
-            (cfs_list_empty(&svc->srv_request_hpq) ||
-             svc->srv_hpreq_count >= svc->srv_hpreq_ratio)) {
-                req = cfs_list_entry(svc->srv_request_queue.next,
-                                     struct ptlrpc_request, rq_list);
-                svc->srv_hpreq_count = 0;
-        } else if (!cfs_list_empty(&svc->srv_request_hpq)) {
+        if (ptlrpc_server_high_pending(svc, force)) {
                 req = cfs_list_entry(svc->srv_request_hpq.next,
                                      struct ptlrpc_request, rq_list);
                 svc->srv_hpreq_count++;
+                RETURN(req);
+        }
 
-        RETURN(req);
-}
-static int ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force)
-{
-        return ((ptlrpc_server_allow_normal(svc, force) &&
-                 !cfs_list_empty(&svc->srv_request_queue)) ||
-                !cfs_list_empty(&svc->srv_request_hpq));
+        if (ptlrpc_server_normal_pending(svc, force)) {
+                req = cfs_list_entry(svc->srv_request_queue.next,
+                                     struct ptlrpc_request, rq_list);
+                svc->srv_hpreq_count = 0;
+                RETURN(req);
+        }
+        RETURN(NULL);
 }
 
-/* Handle freshly incoming reqs, add to timed early reply list,
-   pass on to regular request queue */
+/**
+ * Handle freshly incoming reqs, add to timed early reply list,
+ * pass on to regular request queue.
+ * All incoming requests pass through here before getting into
+ * ptlrpc_server_handle_req later on.
+ */
 static int
 ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
 {
@@ -1368,8 +1474,14 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
         req = cfs_list_entry(svc->srv_req_in_queue.next,
                              struct ptlrpc_request, rq_list);
         cfs_list_del_init (&req->rq_list);
+        svc->srv_n_queued_reqs--;
         /* Consider this still a "queued" request as far as stats are
            concerned */
+        /* ptlrpc_hpreq_init() inserts it into the export list and by the time
+         * of ptlrpc_server_request_add() it could be already handled and
+         * released. To avoid losing the request in between, take an extra
+         * reference on the request. */
+        ptlrpc_request_addref(req);
         cfs_spin_unlock(&svc->srv_lock);
 
         /* go through security check/transform */
@@ -1479,18 +1591,23 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
         if (rc)
                 GOTO(err_req, rc);
         cfs_waitq_signal(&svc->srv_waitq);
+        ptlrpc_server_drop_request(req);
         RETURN(1);
 
 err_req:
-        cfs_spin_lock(&svc->srv_lock);
-        svc->srv_n_queued_reqs--;
+        ptlrpc_server_drop_request(req);
+        cfs_spin_lock(&svc->srv_rq_lock);
         svc->srv_n_active_reqs++;
-        cfs_spin_unlock(&svc->srv_lock);
-        ptlrpc_server_finish_request(req);
+        cfs_spin_unlock(&svc->srv_rq_lock);
+        ptlrpc_server_finish_request(svc, req);
 
         RETURN(1);
 }
 
+/**
+ * Main incoming request handling logic.
+ * Calls handler function from service to do actual processing.
+ */
 static int
 ptlrpc_server_handle_request(struct ptlrpc_service *svc,
                              struct ptlrpc_thread *thread)
@@ -1506,17 +1623,17 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
 
         LASSERT(svc);
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
 #ifndef __KERNEL__
         /* !@%$# liblustre only has 1 thread */
         if (cfs_atomic_read(&svc->srv_n_difficult_replies) != 0) {
-                cfs_spin_unlock(&svc->srv_lock);
+                cfs_spin_unlock(&svc->srv_rq_lock);
                 RETURN(0);
         }
 #endif
-        request = ptlrpc_server_request_get(svc);
+        request = ptlrpc_server_request_get(svc, 0);
         if (request == NULL) {
-                cfs_spin_unlock(&svc->srv_lock);
+                cfs_spin_unlock(&svc->srv_rq_lock);
                 RETURN(0);
         }
 
@@ -1528,27 +1645,26 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
 
         if (unlikely(fail_opc)) {
                 if (request->rq_export && request->rq_ops) {
-                        cfs_spin_unlock(&svc->srv_lock);
+                        cfs_spin_unlock(&svc->srv_rq_lock);
                         OBD_FAIL_TIMEOUT(fail_opc, 4);
-                        cfs_spin_lock(&svc->srv_lock);
-                        request = ptlrpc_server_request_get(svc);
+                        cfs_spin_lock(&svc->srv_rq_lock);
+                        request = ptlrpc_server_request_get(svc, 0);
                         if (request == NULL) {
-                                cfs_spin_unlock(&svc->srv_lock);
+                                cfs_spin_unlock(&svc->srv_rq_lock);
                                 RETURN(0);
                         }
                 }
         }
 
         cfs_list_del_init(&request->rq_list);
-        svc->srv_n_queued_reqs--;
         svc->srv_n_active_reqs++;
         if (request->rq_hp)
-                svc->srv_n_hpreq++;
+                svc->srv_n_active_hpreq++;
 
         /* The phase is changed under the lock here because we need to know
          * the request is under processing (see ptlrpc_hpreq_reorder()). */
         ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
 
         ptlrpc_hpreq_fini(request);
 
@@ -1680,11 +1796,7 @@ put_conn:
         }
 
 out_req:
-        cfs_spin_lock(&svc->srv_lock);
-        if (request->rq_hp)
-                svc->srv_n_hpreq--;
-        cfs_spin_unlock(&svc->srv_lock);
-        ptlrpc_server_finish_request(request);
+        ptlrpc_server_finish_request(svc, request);
 
         RETURN(1);
 }
@@ -1785,7 +1897,6 @@ ptlrpc_handle_rs (struct ptlrpc_reply_state *rs)
                 class_export_put (exp);
                 rs->rs_export = NULL;
                 ptlrpc_rs_decref (rs);
-                cfs_atomic_dec (&svc->srv_outstanding_replies);
                 if (cfs_atomic_dec_and_test(&svc->srv_n_difficult_replies) &&
                     svc->srv_is_stopping)
                         cfs_waitq_broadcast(&svc->srv_waitq);
@@ -1813,14 +1924,14 @@ ptlrpc_server_handle_reply(struct ptlrpc_service *svc)
         struct ptlrpc_reply_state *rs = NULL;
         ENTRY;
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rs_lock);
         if (!cfs_list_empty(&svc->srv_reply_queue)) {
                 rs = cfs_list_entry(svc->srv_reply_queue.prev,
                                     struct ptlrpc_reply_state, rs_list);
                 cfs_list_del_init(&rs->rs_list);
         }
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rs_lock);
         if (rs != NULL)
                 ptlrpc_handle_rs(rs);
         RETURN(rs != NULL);
@@ -1900,93 +2011,77 @@ ptlrpc_retry_rqbds(void *arg)
         return (-ETIMEDOUT);
 }
 
-/**
- * Status bits to pass todo info from
- * ptlrpc_main_check_event to ptlrpc_main.
- */
-#define PTLRPC_MAIN_STOPPING    0x01
-#define PTLRPC_MAIN_IN_REQ      0x02
-#define PTLRPC_MAIN_ACTIVE_REQ  0x04
-#define PTLRPC_MAIN_CHECK_TIMED 0x08
-#define PTLRPC_MAIN_REPOST      0x10
+static inline int
+ptlrpc_threads_enough(struct ptlrpc_service *svc)
+{
+        return svc->srv_n_active_reqs <
+               svc->srv_threads_running - 1 - (svc->srv_hpreq_handler != NULL);
+}
 
 /**
- * A container to share per-thread status variables between
- * ptlrpc_main_check_event and ptlrpc_main functions.
+ * Check if we are allowed to create more threads.
+ * The caller may call it without any lock, but must hold
+ * ptlrpc_service::srv_lock to get a reliable result.
  */
-struct ptlrpc_main_check_s {
-        /** todo info for the ptrlrpc_main */
-        int todo;
-        /** is this thread counted as running or not? */
-        int running;
-};
+static inline int
+ptlrpc_threads_increasable(struct ptlrpc_service *svc)
+{
+        return svc->srv_threads_running +
+               svc->srv_threads_starting < svc->srv_threads_max;
+}
 
 /**
- * Check whether current service thread has work to do.
+ * Check if there are too many requests and we are allowed to create more
+ * threads.
  */
-static int ptlrpc_main_check_event(struct ptlrpc_thread *t,
-                                   struct ptlrpc_main_check_s *status)
+static inline int
+ptlrpc_threads_need_create(struct ptlrpc_service *svc)
 {
-        struct ptlrpc_service *svc = t->t_svc;
-        ENTRY;
+        return !ptlrpc_threads_enough(svc) && ptlrpc_threads_increasable(svc);
+}
 
-        status->todo = 0;
+static inline int
+ptlrpc_thread_stopping(struct ptlrpc_thread *thread)
+{
+        return (thread->t_flags & SVC_STOPPING) != 0 ||
+               thread->t_svc->srv_is_stopping;
+}
 
-        /* check the stop flags w/o any locking to make all
-         * concurrently running threads stop faster. */
-        if (unlikely((t->t_flags & SVC_STOPPING) ||
-                     svc->srv_is_stopping)) {
-                status->todo |= PTLRPC_MAIN_STOPPING;
-                goto out;
-        }
+static inline int
+ptlrpc_rqbd_pending(struct ptlrpc_service *svc)
+{
+        return !cfs_list_empty(&svc->srv_idle_rqbds) &&
+               svc->srv_rqbd_timeout == 0;
+}
 
-        cfs_spin_lock(&svc->srv_lock);
-        /* count this thread as not running before possible sleep in
-         * the outer wait event if it is not done yet. */
-        if (status->running) {
-                LASSERT(svc->srv_threads_running > 0);
-                svc->srv_threads_running--;
-                status->running = 0;
-        }
-        /* Process all incoming reqs before handling any */
-        if (!cfs_list_empty(&svc->srv_req_in_queue)) {
-                status->todo |= PTLRPC_MAIN_IN_REQ;
-        }
-        /* Don't handle regular requests in the last thread, in order
-         * to handle any incoming reqs, early replies, etc. */
-        if (ptlrpc_server_request_pending(svc, 0) &&
-            (svc->srv_threads_running < (svc->srv_threads_started - 1))) {
-                status->todo |= PTLRPC_MAIN_ACTIVE_REQ;
-        }
-        if (svc->srv_at_check) {
-                status->todo |= PTLRPC_MAIN_CHECK_TIMED;
-        }
-        if ((!cfs_list_empty(&svc->srv_idle_rqbds) &&
-             svc->srv_rqbd_timeout == 0)) {
-                status->todo |= PTLRPC_MAIN_REPOST;
-        }
-        /* count this thread as active if it goes out the outer
-         * wait event */
-        if (status->todo) {
-                svc->srv_threads_running++;
-                status->running = 1;
-        }
-        cfs_spin_unlock(&svc->srv_lock);
- out:
-        RETURN(status->todo);
+static inline int
+ptlrpc_at_check(struct ptlrpc_service *svc)
+{
+        return svc->srv_at_check;
 }
 
 /**
- * Main prlrpc service thread routine.
+ * Check if there are requests waiting for preprocessing.
+ * The caller may call it without any lock, but must hold
+ * ptlrpc_service::srv_lock to get a reliable result.
+ */
+static inline int
+ptlrpc_server_request_waiting(struct ptlrpc_service *svc)
+{
+        return !cfs_list_empty(&svc->srv_req_in_queue);
+}
+
+/**
+ * Main thread body for service threads.
+ * Waits in a loop for new requests to process.
+ * Every time an incoming request is added to its queue, a waitq
+ * is woken up and one of the threads will handle it.
  */
 static int ptlrpc_main(void *arg)
 {
         struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
         struct ptlrpc_service  *svc = data->svc;
         struct ptlrpc_thread   *thread = data->thread;
-        struct obd_device      *dev = data->dev;
         struct ptlrpc_reply_state *rs;
-        struct ptlrpc_main_check_s st;
 #ifdef WITH_GROUP_INFO
         cfs_group_info_t *ginfo = NULL;
 #endif
@@ -2050,11 +2145,17 @@ static int ptlrpc_main(void *arg)
         }
 
         cfs_spin_lock(&svc->srv_lock);
+
+        LASSERT((thread->t_flags & SVC_STARTING) != 0);
+        thread->t_flags &= ~SVC_STARTING;
+        svc->srv_threads_starting--;
+
         /* SVC_STOPPING may already be set here if someone else is trying
          * to stop the service while this new thread has been dynamically
          * forked. We still set SVC_RUNNING to let our creator know that
          * we are now running, however we will exit as soon as possible */
         thread->t_flags |= SVC_RUNNING;
+        svc->srv_threads_running++;
         cfs_spin_unlock(&svc->srv_lock);
 
         /*
@@ -2065,20 +2166,16 @@ static int ptlrpc_main(void *arg)
         thread->t_watchdog = lc_watchdog_add(CFS_GET_TIMEOUT(svc), NULL, NULL);
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rs_lock);
         cfs_list_add(&rs->rs_list, &svc->srv_free_rs_list);
-        cfs_spin_unlock(&svc->srv_lock);
         cfs_waitq_signal(&svc->srv_free_rs_waitq);
+        cfs_spin_unlock(&svc->srv_rs_lock);
 
         CDEBUG(D_NET, "service thread %d (#%d) started\n", thread->t_id,
                svc->srv_threads_running);
 
         /* XXX maintain a list of all managed devices: insert here */
-
-        st.running = 0;
-        st.todo = 0;
-
-        while (!(st.todo & PTLRPC_MAIN_STOPPING)) {
+        while (!ptlrpc_thread_stopping(thread)) {
                 /* Don't exit while there are replies to be handled */
                 struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
                                                      ptlrpc_retry_rqbds, svc);
@@ -2087,35 +2184,44 @@ static int ptlrpc_main(void *arg)
 
                 cfs_cond_resched();
 
-                l_wait_event_exclusive (svc->srv_waitq,
-                                        ptlrpc_main_check_event(thread, &st),
-                                        &lwi);
+                l_wait_event_exclusive_head(svc->srv_waitq,
+                                       ptlrpc_thread_stopping(thread) ||
+                                       ptlrpc_server_request_waiting(svc) ||
+                                       ptlrpc_server_request_pending(svc, 0) ||
+                                       ptlrpc_rqbd_pending(svc) ||
+                                       ptlrpc_at_check(svc), &lwi);
+
+                if (ptlrpc_thread_stopping(thread))
+                        break;
 
                 lc_watchdog_touch(thread->t_watchdog, CFS_GET_TIMEOUT(svc));
 
                 ptlrpc_check_rqbd_pool(svc);
 
-                if (svc->srv_threads_started < svc->srv_threads_max &&
-                    svc->srv_n_active_reqs >= (svc->srv_threads_started - 1))
+                if (ptlrpc_threads_need_create(svc)) {
                         /* Ignore return code - we tried... */
-                        ptlrpc_start_thread(dev, svc);
+                        ptlrpc_start_thread(svc);
+                }
 
-                if (st.todo & PTLRPC_MAIN_IN_REQ) {
+                /* Process all incoming reqs before handling any */
+                if (ptlrpc_server_request_waiting(svc)) {
                         ptlrpc_server_handle_req_in(svc);
                         /* but limit ourselves in case of flood */
-                        if (counter++ < 1000)
+                        if (counter++ < 100)
                                 continue;
                         counter = 0;
                 }
-                if (st.todo & PTLRPC_MAIN_CHECK_TIMED) {
+
+                if (ptlrpc_at_check(svc))
                         ptlrpc_at_check_timed(svc);
-                }
-                if (st.todo & PTLRPC_MAIN_ACTIVE_REQ) {
+
+                if (ptlrpc_server_request_pending(svc, 0)) {
                         lu_context_enter(&env.le_ctx);
                         ptlrpc_server_handle_request(svc, thread);
                         lu_context_exit(&env.le_ctx);
                 }
-                if ((st.todo & PTLRPC_MAIN_REPOST) &&
+
+                if (ptlrpc_rqbd_pending(svc) &&
                     ptlrpc_server_post_idle_rqbds(svc) < 0) {
                         /* I just failed to repost request buffers.
                          * Wait for a timeout (unless something else
@@ -2141,16 +2247,24 @@
 out:
         CDEBUG(D_RPCTRACE, "service thread [ %p : %u ] %d exiting: rc %d\n",
                thread, thread->t_pid, thread->t_id, rc);
 
-        if (st.running) {
-                cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_lock);
+        if ((thread->t_flags & SVC_STARTING) != 0) {
+                svc->srv_threads_starting--;
+                thread->t_flags &= ~SVC_STARTING;
+        }
+
+        if ((thread->t_flags & SVC_RUNNING) != 0) {
+                /* must know immediately */
                 svc->srv_threads_running--;
-                cfs_spin_unlock(&svc->srv_lock);
+                thread->t_flags &= ~SVC_RUNNING;
         }
 
-        thread->t_id = rc;
-        thread->t_flags = SVC_STOPPED;
+        thread->t_id = rc;
+        thread->t_flags |= SVC_STOPPED;
 
         cfs_waitq_signal(&thread->t_ctl_waitq);
+        cfs_spin_unlock(&svc->srv_lock);
 
         return rc;
 }
@@ -2173,6 +2287,10 @@ static int hrt_dont_sleep(struct ptlrpc_hr_thread *t,
         return result;
 }
 
+/**
+ * Main body of "handle reply" function.
+ * It processes acked reply states
+ */
 static int ptlrpc_hr_main(void *arg)
 {
         struct ptlrpc_hr_args * hr_args = arg;
@@ -2194,7 +2312,7 @@ static int ptlrpc_hr_main(void *arg)
 
         while (!cfs_test_bit(HRT_STOPPING, &t->hrt_flags)) {
 
-                l_cfs_wait_event(t->hrt_wait, hrt_dont_sleep(t, &replies));
+                l_wait_condition(t->hrt_wait, hrt_dont_sleep(t, &replies));
 
                 while (!cfs_list_empty(&replies)) {
                         struct ptlrpc_reply_state *rs;
@@ -2229,7 +2347,7 @@ static int ptlrpc_start_hr_thread(struct ptlrpc_hr_service *hr, int n, int cpu)
                 cfs_complete(&t->hrt_completion);
                 GOTO(out, rc);
         }
-        l_cfs_wait_event(t->hrt_wait, cfs_test_bit(HRT_RUNNING, &t->hrt_flags));
+        l_wait_condition(t->hrt_wait, cfs_test_bit(HRT_RUNNING, &t->hrt_flags));
         RETURN(0);
  out:
         return rc;
@@ -2318,6 +2436,9 @@ static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
         EXIT;
 }
 
+/**
+ * Stops all threads of a particular service \a svc
+ */
 void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
 {
         struct ptlrpc_thread *thread;
@@ -2337,7 +2458,7 @@ void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
         EXIT;
 }
 
-int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
+int ptlrpc_start_threads(struct ptlrpc_service *svc)
 {
         int i, rc = 0;
         ENTRY;
@@ -2346,10 +2467,12 @@ int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
         /* We require 2 threads min - see note in
            ptlrpc_server_handle_request */
         LASSERT(svc->srv_threads_min >= 2);
         for (i = 0; i < svc->srv_threads_min; i++) {
-                rc = ptlrpc_start_thread(dev, svc);
+                rc = ptlrpc_start_thread(svc);
                 /* We have enough threads, don't start more. b=15759 */
-                if (rc == -EMFILE)
+                if (rc == -EMFILE) {
+                        rc = 0;
                         break;
+                }
                 if (rc) {
                         CERROR("cannot start %s thread #%d: rc %d\n",
                                svc->srv_thread_name, i, rc);
@@ -2360,25 +2483,25 @@ int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
         RETURN(rc);
 }
 
-int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc)
+int ptlrpc_start_thread(struct ptlrpc_service *svc)
 {
         struct l_wait_info lwi = { 0 };
         struct ptlrpc_svc_data d;
         struct ptlrpc_thread *thread;
         char name[32];
-        int id, rc;
+        int rc;
         ENTRY;
 
         CDEBUG(D_RPCTRACE, "%s started %d min %d max %d running %d\n",
-               svc->srv_name, svc->srv_threads_started, svc->srv_threads_min,
+               svc->srv_name, svc->srv_threads_running, svc->srv_threads_min,
                svc->srv_threads_max, svc->srv_threads_running);
 
         if (unlikely(svc->srv_is_stopping))
                 RETURN(-ESRCH);
 
-        if (unlikely(svc->srv_threads_started >= svc->srv_threads_max) ||
+        if (!ptlrpc_threads_increasable(svc) ||
             (OBD_FAIL_CHECK(OBD_FAIL_TGT_TOOMANY_THREADS) &&
-             svc->srv_threads_started == svc->srv_threads_min - 1))
+             svc->srv_threads_running == svc->srv_threads_min - 1))
                 RETURN(-EMFILE);
 
         OBD_ALLOC_PTR(thread);
@@ -2387,19 +2510,21 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc)
         cfs_waitq_init(&thread->t_ctl_waitq);
 
         cfs_spin_lock(&svc->srv_lock);
-        if (svc->srv_threads_started >= svc->srv_threads_max) {
+        if (!ptlrpc_threads_increasable(svc)) {
                 cfs_spin_unlock(&svc->srv_lock);
                 OBD_FREE_PTR(thread);
                 RETURN(-EMFILE);
         }
+
+        svc->srv_threads_starting++;
+        thread->t_id    = svc->srv_threads_next_id++;
+        thread->t_flags |= SVC_STARTING;
+        thread->t_svc   = svc;
+
         cfs_list_add(&thread->t_link, &svc->srv_threads);
-        id = svc->srv_threads_started++;
         cfs_spin_unlock(&svc->srv_lock);
 
-        thread->t_svc = svc;
-        thread->t_id = id;
-        sprintf(name, "%s_%02d", svc->srv_thread_name, id);
-        d.dev = dev;
+        sprintf(name, "%s_%02d", svc->srv_thread_name, thread->t_id);
         d.svc = svc;
         d.name = name;
         d.thread = thread;
@@ -2415,7 +2540,7 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc)
 
                 cfs_spin_lock(&svc->srv_lock);
                 cfs_list_del(&thread->t_link);
-                --svc->srv_threads_started;
+                --svc->srv_threads_starting;
                 cfs_spin_unlock(&svc->srv_lock);
 
                 OBD_FREE(thread, sizeof(*thread));
@@ -2557,7 +2682,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         }
 
         /* schedule all outstanding replies to terminate them */
-        cfs_spin_lock(&service->srv_lock);
+        cfs_spin_lock(&service->srv_rs_lock);
         while (!cfs_list_empty(&service->srv_active_replies)) {
                 struct ptlrpc_reply_state *rs =
                         cfs_list_entry(service->srv_active_replies.next,
                                        struct ptlrpc_reply_state, rs_list);
                 cfs_spin_lock(&rs->rs_lock);
                 ptlrpc_schedule_difficult_reply(rs);
                 cfs_spin_unlock(&rs->rs_lock);
         }
-        cfs_spin_unlock(&service->srv_lock);
+        cfs_spin_unlock(&service->srv_rs_lock);
 
         /* purge the request queue.  NB No new replies (rqbds all unlinked)
          * and no service threads, so I'm the only thread noodling the
          * request queue now */
@@ -2580,17 +2705,17 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         while (!cfs_list_empty(&service->srv_req_in_queue)) {
                 struct ptlrpc_request *req =
                         cfs_list_entry(service->srv_req_in_queue.next,
                                        struct ptlrpc_request, rq_list);
 
                 cfs_list_del(&req->rq_list);
                 service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;
-                ptlrpc_server_finish_request(req);
+                ptlrpc_server_finish_request(service, req);
         }
         while (ptlrpc_server_request_pending(service, 1)) {
                 struct ptlrpc_request *req;
 
-                req = ptlrpc_server_request_get(service);
+                req = ptlrpc_server_request_get(service, 1);
                 cfs_list_del(&req->rq_list);
                 service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;
                 ptlrpc_hpreq_fini(req);
-                ptlrpc_server_finish_request(req);
+                ptlrpc_server_finish_request(service, req);
         }
         LASSERT(service->srv_n_queued_reqs == 0);
         LASSERT(service->srv_n_active_reqs == 0);
@@ -2635,7 +2760,8 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         RETURN(0);
 }
 
-/* Returns 0 if the service is healthy.
+/**
+ * Returns 0 if the service is healthy.
  *
  * Right now, it just checks to make sure that requests aren't languishing
  * in the queue.  We'll use this health check to govern whether a node needs
@@ -2651,9 +2777,9 @@ int ptlrpc_service_health_check(struct ptlrpc_service *svc)
 
         cfs_gettimeofday(&right_now);
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
         if (!ptlrpc_server_request_pending(svc, 1)) {
-                cfs_spin_unlock(&svc->srv_lock);
+                cfs_spin_unlock(&svc->srv_rq_lock);
                 return 0;
         }
 
@@ -2665,7 +2791,7 @@ int ptlrpc_service_health_check(struct ptlrpc_service *svc)
         request = cfs_list_entry(svc->srv_request_queue.next,
                                  struct ptlrpc_request, rq_list);
         timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL);
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
 
         if ((timediff / ONE_MILLION) >
             (AT_OFF ? obd_timeout * 3/2 : at_max)) {
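
The structural theme of this patch is splitting the single per-service spinlock
(srv_lock) into separate locks for independent data: srv_rq_lock for the request
queues and srv_rs_lock for reply states, so request dispatch and reply handling
no longer serialize on one lock. The following is a minimal standalone sketch of
that lock-splitting pattern only; it is not part of the patch, and it uses POSIX
mutexes and made-up names (struct service, request_add, reply_get) in place of
the kernel's cfs_spin_lock wrappers and ptlrpc types.

/* Illustrative sketch, NOT from the patch above: one lock per
 * independent queue, mirroring srv_rq_lock vs srv_rs_lock. */
#include <pthread.h>
#include <stdio.h>

struct node { struct node *next; };

struct service {
        pthread_mutex_t rq_lock;   /* protects req_head only */
        struct node    *req_head;

        pthread_mutex_t rs_lock;   /* protects rep_head only */
        struct node    *rep_head;
};

/* enqueue a request; reply traffic never contends on rq_lock */
static void request_add(struct service *svc, struct node *req)
{
        pthread_mutex_lock(&svc->rq_lock);
        req->next = svc->req_head;
        svc->req_head = req;
        pthread_mutex_unlock(&svc->rq_lock);
}

/* dequeue a reply state; request traffic never contends on rs_lock */
static struct node *reply_get(struct service *svc)
{
        struct node *rs;

        pthread_mutex_lock(&svc->rs_lock);
        rs = svc->rep_head;
        if (rs != NULL)
                svc->rep_head = rs->next;
        pthread_mutex_unlock(&svc->rs_lock);
        return rs;
}

int main(void)
{
        struct service svc = {
                .rq_lock = PTHREAD_MUTEX_INITIALIZER,
                .rs_lock = PTHREAD_MUTEX_INITIALIZER,
        };
        struct node req = { .next = NULL };
        struct node rep = { .next = NULL };

        svc.rep_head = &rep;
        request_add(&svc, &req);
        printf("queued req %p, got rs %p\n",
               (void *)&req, (void *)reply_get(&svc));
        return 0;
}

The design point, as in the patch, is that the two locks guard disjoint state,
so a thread posting requests and a thread draining replies can proceed in
parallel; correctness only requires that each field is always accessed under
its own lock.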