From b59c628da398cf4cf2f002e56d70b3b7d261d5c9 Mon Sep 17 00:00:00 2001
From: Jian Yu
Date: Wed, 17 Nov 2010 10:15:26 +0800
Subject: [PATCH] b=23289 fine-grained locking for ptlrpc service

- cacheline optimization for struct ptlrpc_service
- split ptlrpc_service::srv_lock into three locks:
  . ptlrpc_service::srv_lock
    serializes operations on rqbds and on requests queued on
    srv_req_in_queue
  . ptlrpc_service::srv_rq_lock
    serializes operations on active requests
  . ptlrpc_service::srv_rs_lock
    serializes operations on reply states (RS)
- clean up logic in ptlrpc_main()
- remove unused atomic counter srv_outstanding_replies

o=liang
i=andreas.dilger
i=mikhail.pershin
---
lustre/include/lustre/lustre_idl.h | 7 -
lustre/include/lustre_net.h | 269 +++++++++++++++++---------
lustre/ldlm/ldlm_lib.c | 9 +-
lustre/mdt/mdt_recovery.c | 4 +-
lustre/ptlrpc/events.c | 5 +-
lustre/ptlrpc/lproc_ptlrpc.c | 8 +-
lustre/ptlrpc/niobuf.c | 5 +-
lustre/ptlrpc/pack_generic.c | 12 +-
lustre/ptlrpc/service.c | 382 +++++++++++++++++++++----------
9 files changed, 415 insertions(+), 286 deletions(-)

diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h
index eefff59..4dc97ad 100644
--- a/lustre/include/lustre/lustre_idl.h
+++ b/lustre/include/lustre/lustre_idl.h
@@ -140,13 +140,6 @@
/* Portal 63 is reserved for the Cray Inc DVS - nic@cray.com, roe@cray.com, n8851@cray.com */
-#define SVC_KILLED 1
-#define SVC_EVENT 2
-#define SVC_SIGNAL 4
-#define SVC_RUNNING 8
-#define SVC_STOPPING 16
-#define SVC_STOPPED 32
-
/* packet types */
#define PTL_RPC_MSG_REQUEST 4711
#define PTL_RPC_MSG_ERR 4712
diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h
index be610d7..2c6ba51 100644
--- a/lustre/include/lustre_net.h
+++ b/lustre/include/lustre_net.h
@@ -945,6 +945,15 @@ struct ptlrpc_bulk_desc {
#endif
};

+enum {
+ SVC_STOPPED = 1 << 0,
+ SVC_STOPPING = 1 << 1,
+ SVC_STARTING = 1 << 2,
+ SVC_RUNNING = 1 << 3,
+ SVC_EVENT = 1 << 4,
+ SVC_SIGNAL = 1 << 5,
+};
+
/**
* Definition of server service thread structure
*/
@@ -1005,9 +1014,16 @@ struct ptlrpc_request_buffer_desc {
struct ptlrpc_request rqbd_req;
};

-typedef int (*svc_handler_t)(struct ptlrpc_request *req);
-typedef void (*svcreq_printfn_t)(void *, struct ptlrpc_request *);
-typedef int (*svc_hpreq_handler_t)(struct ptlrpc_request *);
+typedef int (*svc_thr_init_t)(struct ptlrpc_thread *thread);
+typedef void (*svc_thr_done_t)(struct ptlrpc_thread *thread);
+typedef int (*svc_handler_t)(struct ptlrpc_request *req);
+typedef int (*svc_hpreq_handler_t)(struct ptlrpc_request *);
+typedef void (*svc_req_printfn_t)(void *, struct ptlrpc_request *);
+
+#ifndef __cfs_cacheline_aligned
+/* NB: put it here to reduce patch dependencies */
+# define __cfs_cacheline_aligned
+#endif

/**
* How many high priority requests to serve before serving one normal
@@ -1020,106 +1036,181 @@ typedef int (*svc_hpreq_handler_t)(struct ptlrpc_request *);
* The service is listening on a particular portal (like tcp port)
* and perform actions for a specific server like IO service for OST
* or general metadata service for MDS.
+ *
+ * ptlrpc service has four locks:
+ * \a srv_lock
+ *    serializes operations on rqbds and on requests waiting for
+ *    preprocessing
+ * \a srv_rq_lock
+ *    serializes operations on active requests sent to this portal
+ * \a srv_at_lock
+ *    serializes adaptive timeout handling
+ * \a srv_rs_lock
+ *    serializes operations on the RS list (reply states)
+ *
+ * We don't have any use-case that takes two or more of these locks at the
+ * same time for now, so there is no lock ordering issue.
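+ *
+ * As an illustrative sketch only (not code from this patch), a handler
+ * thread dequeueing a request takes \a srv_rq_lock alone:
+ *
+ *      cfs_spin_lock(&svc->srv_rq_lock);
+ *      req = ptlrpc_server_request_get(svc, 0);
+ *      svc->srv_n_active_reqs++;
+ *      cfs_spin_unlock(&svc->srv_rq_lock);
+ *
+ * while reply-state handling contends only on \a srv_rs_lock.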
*/ struct ptlrpc_service { - cfs_list_t srv_list; /* chain thru all services */ - int srv_max_req_size; /* biggest request to receive */ - int srv_max_reply_size; /* biggest reply to send */ - int srv_buf_size; /* size of individual buffers */ - int srv_nbuf_per_group; /* # buffers to allocate in 1 group */ - int srv_nbufs; /* total # req buffer descs allocated */ - int srv_threads_min; /* threads to start at SOW */ - int srv_threads_max; /* thread upper limit */ - int srv_threads_started; /* index of last started thread */ - int srv_threads_running; /* # running threads */ - cfs_atomic_t srv_n_difficult_replies; /* # 'difficult' replies */ - int srv_n_active_reqs; /* # reqs being served */ - int srv_n_hpreq; /* # HPreqs being served */ - cfs_duration_t srv_rqbd_timeout; /* timeout before re-posting reqs, in tick */ - int srv_watchdog_factor; /* soft watchdog timeout multiplier */ - unsigned srv_cpu_affinity:1; /* bind threads to CPUs */ - unsigned srv_at_check:1; /* check early replies */ - unsigned srv_is_stopping:1; /* under unregister_service */ - cfs_time_t srv_at_checktime; /* debug */ - - /** Local portal on which to receive requests */ - __u32 srv_req_portal; - /** Portal on the client to send replies to */ - __u32 srv_rep_portal; - - /** AT stuff */ + /** most often accessed fields */ + /** chain thru all services */ + cfs_list_t srv_list; + /** only statically allocated strings here; we don't clean them */ + char *srv_name; + /** only statically allocated strings here; we don't clean them */ + char *srv_thread_name; + /** service thread list */ + cfs_list_t srv_threads; + /** threads to start at beginning of service */ + int srv_threads_min; + /** thread upper limit */ + int srv_threads_max; + /** always increasing number */ + unsigned srv_threads_next_id; + /** # of starting threads */ + int srv_threads_starting; + /** # running threads */ + int srv_threads_running; + + /** service operations, move to ptlrpc_svc_ops_t in the future */ /** @{ */ - struct adaptive_timeout srv_at_estimate;/* estimated rpc service time */ - cfs_spinlock_t srv_at_lock; - struct ptlrpc_at_array srv_at_array; /* reqs waiting for replies */ - cfs_timer_t srv_at_timer; /* early reply timer */ - /** @} */ - - int srv_n_queued_reqs; /* # reqs in either of the queues below */ - int srv_hpreq_count; /* # hp requests handled */ - int srv_hpreq_ratio; /* # hp per lp reqs to handle */ - cfs_list_t srv_req_in_queue; /* incoming reqs */ - cfs_list_t srv_request_queue; /* reqs waiting for service */ - cfs_list_t srv_request_hpq; /* high priority queue */ - - cfs_list_t srv_request_history; /* request history */ - __u64 srv_request_seq; /* next request sequence # */ - __u64 srv_request_max_cull_seq; /* highest seq culled from history */ - svcreq_printfn_t srv_request_history_print_fn; /* service-specific print fn */ - - cfs_list_t srv_idle_rqbds; /* request buffers to be reposted */ - cfs_list_t srv_active_rqbds; /* req buffers receiving */ - cfs_list_t srv_history_rqbds; /* request buffer history */ - int srv_nrqbd_receiving; /* # posted request buffers */ - int srv_n_history_rqbds; /* # request buffers in history */ - int srv_max_history_rqbds;/* max # request buffers in history */ - - cfs_atomic_t srv_outstanding_replies; - cfs_list_t srv_active_replies; /* all the active replies */ -#ifndef __KERNEL__ - cfs_list_t srv_reply_queue; /* replies waiting for service */ -#endif - cfs_waitq_t srv_waitq; /* all threads sleep on this. 
This - * wait-queue is signalled when new - * incoming request arrives and when - * difficult reply has to be handled. */ - - cfs_list_t srv_threads; /* service thread list */ + /** + * if non-NULL called during thread creation (ptlrpc_start_thread()) + * to initialize service specific per-thread state. + */ + svc_thr_init_t srv_init; + /** + * if non-NULL called during thread shutdown (ptlrpc_main()) to + * destruct state created by ->srv_init(). + */ + svc_thr_done_t srv_done; /** Handler function for incoming requests for this service */ - svc_handler_t srv_handler; - svc_hpreq_handler_t srv_hpreq_handler; /* hp request handler */ - - char *srv_name; /* only statically allocated strings here; we don't clean them */ - char *srv_thread_name; /* only statically allocated strings here; we don't clean them */ - - cfs_spinlock_t srv_lock; + svc_handler_t srv_handler; + /** hp request handler */ + svc_hpreq_handler_t srv_hpreq_handler; + /** service-specific print fn */ + svc_req_printfn_t srv_req_printfn; + /** @} */ /** Root of /proc dir tree for this service */ - cfs_proc_dir_entry_t *srv_procroot; + cfs_proc_dir_entry_t *srv_procroot; /** Pointer to statistic data for this service */ - struct lprocfs_stats *srv_stats; - - /** List of free reply_states */ - cfs_list_t srv_free_rs_list; - /** waitq to run, when adding stuff to srv_free_rs_list */ - cfs_waitq_t srv_free_rs_waitq; - + struct lprocfs_stats *srv_stats; + /** # hp per lp reqs to handle */ + int srv_hpreq_ratio; + /** biggest request to receive */ + int srv_max_req_size; + /** biggest reply to send */ + int srv_max_reply_size; + /** size of individual buffers */ + int srv_buf_size; + /** # buffers to allocate in 1 group */ + int srv_nbuf_per_group; + /** Local portal on which to receive requests */ + __u32 srv_req_portal; + /** Portal on the client to send replies to */ + __u32 srv_rep_portal; /** * Tags for lu_context associated with this thread, see struct * lu_context. */ - __u32 srv_ctx_tags; + __u32 srv_ctx_tags; + /** soft watchdog timeout multiplier */ + int srv_watchdog_factor; + /** bind threads to CPUs */ + unsigned srv_cpu_affinity:1; + /** under unregister_service */ + unsigned srv_is_stopping:1; + /** - * if non-NULL called during thread creation (ptlrpc_start_thread()) - * to initialize service specific per-thread state. + * serialize the following fields, used for protecting + * rqbd list and incoming requests waiting for preprocess */ - int (*srv_init)(struct ptlrpc_thread *thread); + cfs_spinlock_t srv_lock __cfs_cacheline_aligned; + /** incoming reqs */ + cfs_list_t srv_req_in_queue; + /** total # req buffer descs allocated */ + int srv_nbufs; + /** # posted request buffers */ + int srv_nrqbd_receiving; + /** timeout before re-posting reqs, in tick */ + cfs_duration_t srv_rqbd_timeout; + /** request buffers to be reposted */ + cfs_list_t srv_idle_rqbds; + /** req buffers receiving */ + cfs_list_t srv_active_rqbds; + /** request buffer history */ + cfs_list_t srv_history_rqbds; + /** # request buffers in history */ + int srv_n_history_rqbds; + /** max # request buffers in history */ + int srv_max_history_rqbds; + /** request history */ + cfs_list_t srv_request_history; + /** next request sequence # */ + __u64 srv_request_seq; + /** highest seq culled from history */ + __u64 srv_request_max_cull_seq; /** - * if non-NULL called during thread shutdown (ptlrpc_main()) to - * destruct state created by ->srv_init(). + * all threads sleep on this. 
This wait-queue is signalled when a new
+ * incoming request arrives and when a difficult reply has to be handled.
+ */
+ cfs_waitq_t srv_waitq;
+
+ /**
+ * serialize the following fields, used for processing requests
+ * sent to this portal
 */
- void (*srv_done)(struct ptlrpc_thread *thread);
+ cfs_spinlock_t srv_rq_lock __cfs_cacheline_aligned;
+ /** reqs waiting for service */
+ cfs_list_t srv_request_queue;
+ /** high priority queue */
+ cfs_list_t srv_request_hpq;
+ /** # incoming reqs */
+ int srv_n_queued_reqs;
+ /** # reqs being served */
+ int srv_n_active_reqs;
+ /** # HPreqs being served */
+ int srv_n_active_hpreq;
+ /** # hp requests handled */
+ int srv_hpreq_count;
+
+ /** AT stuff */
+ /** @{ */
+ /**
+ * serialize the following fields, used for changes on
+ * adaptive timeout
+ */
+ cfs_spinlock_t srv_at_lock __cfs_cacheline_aligned;
+ /** estimated rpc service time */
+ struct adaptive_timeout srv_at_estimate;
+ /** reqs waiting for replies */
+ struct ptlrpc_at_array srv_at_array;
+ /** early reply timer */
+ cfs_timer_t srv_at_timer;
+ /** check early replies */
+ unsigned srv_at_check;
+ /** debug */
+ cfs_time_t srv_at_checktime;
+ /** @} */
+ /**
+ * serialize the following fields, used for processing
+ * replies for this portal
+ */
+ cfs_spinlock_t srv_rs_lock __cfs_cacheline_aligned;
+ /** all the active replies */
+ cfs_list_t srv_active_replies;
+#ifndef __KERNEL__
+ /** replies waiting for service */
+ cfs_list_t srv_reply_queue;
+#endif
+ /** List of free reply_states */
+ cfs_list_t srv_free_rs_list;
+ /** waitq to run, when adding stuff to srv_free_rs_list */
+ cfs_waitq_t srv_free_rs_waitq;
+ /** # 'difficult' replies */
+ cfs_atomic_t srv_n_difficult_replies;

//struct ptlrpc_srv_ni srv_interfaces[0];
};
@@ -1392,7 +1483,7 @@ void ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs);
struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
svc_handler_t h, char *name,
struct proc_dir_entry *proc_entry,
- svcreq_printfn_t prntfn,
+ svc_req_printfn_t prntfn,
char *threadname);
struct ptlrpc_service *ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size,
@@ -1401,7 +1492,7 @@ struct ptlrpc_service *ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size,
int watchdog_factor,
svc_handler_t, char *name,
cfs_proc_dir_entry_t *proc_entry,
- svcreq_printfn_t,
+ svc_req_printfn_t,
int min_threads, int max_threads,
char *threadname, __u32 ctx_tags,
svc_hpreq_handler_t);
diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c
index 93f2366..5530278 100644
--- a/lustre/ldlm/ldlm_lib.c
+++ b/lustre/ldlm/ldlm_lib.c
@@ -568,12 +568,12 @@ int server_disconnect_export(struct obd_export *exp)
struct ptlrpc_reply_state, rs_exp_list);
struct ptlrpc_service *svc = rs->rs_service;

- cfs_spin_lock(&svc->srv_lock);
+ cfs_spin_lock(&svc->srv_rs_lock);
cfs_list_del_init(&rs->rs_exp_list);
cfs_spin_lock(&rs->rs_lock);
ptlrpc_schedule_difficult_reply(rs);
cfs_spin_unlock(&rs->rs_lock);
- cfs_spin_unlock(&svc->srv_lock);
+ cfs_spin_unlock(&svc->srv_rs_lock);
}

cfs_spin_unlock(&exp->exp_lock);
@@ -2184,7 +2184,7 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)

netrc = target_send_reply_msg (req, rc, fail_id);

- cfs_spin_lock(&svc->srv_lock);
+ cfs_spin_lock(&svc->srv_rs_lock);

cfs_atomic_inc(&svc->srv_n_difficult_replies);

@@ -2196,7 +2196,6 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
* reply_out_callback leaves alone) */
rs->rs_on_net = 0;
ptlrpc_rs_addref(rs); - cfs_atomic_inc (&svc->srv_outstanding_replies); } cfs_spin_lock(&rs->rs_lock); @@ -2211,7 +2210,7 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id) rs->rs_scheduled = 0; /* allow notifier to schedule */ } cfs_spin_unlock(&rs->rs_lock); - cfs_spin_unlock(&svc->srv_lock); + cfs_spin_unlock(&svc->srv_rs_lock); EXIT; } diff --git a/lustre/mdt/mdt_recovery.c b/lustre/mdt/mdt_recovery.c index a8327bb..e125d5b 100644 --- a/lustre/mdt/mdt_recovery.c +++ b/lustre/mdt/mdt_recovery.c @@ -978,7 +978,7 @@ static void mdt_steal_ack_locks(struct ptlrpc_request *req) oldrep->rs_opc); svc = oldrep->rs_service; - cfs_spin_lock (&svc->srv_lock); + cfs_spin_lock (&svc->srv_rs_lock); cfs_list_del_init (&oldrep->rs_exp_list); @@ -998,7 +998,7 @@ static void mdt_steal_ack_locks(struct ptlrpc_request *req) ptlrpc_schedule_difficult_reply (oldrep); cfs_spin_unlock(&oldrep->rs_lock); - cfs_spin_unlock (&svc->srv_lock); + cfs_spin_unlock (&svc->srv_rs_lock); break; } cfs_spin_unlock(&exp->exp_lock); diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c index 2c93be1..5c37070 100644 --- a/lustre/ptlrpc/events.c +++ b/lustre/ptlrpc/events.c @@ -339,7 +339,6 @@ void reply_out_callback(lnet_event_t *ev) * net's ref on 'rs' */ LASSERT (ev->unlinked); ptlrpc_rs_decref(rs); - cfs_atomic_dec (&svc->srv_outstanding_replies); EXIT; return; } @@ -349,14 +348,14 @@ void reply_out_callback(lnet_event_t *ev) if (ev->unlinked) { /* Last network callback. The net's ref on 'rs' stays put * until ptlrpc_handle_rs() is done with it */ - cfs_spin_lock(&svc->srv_lock); + cfs_spin_lock(&svc->srv_rs_lock); cfs_spin_lock(&rs->rs_lock); rs->rs_on_net = 0; if (!rs->rs_no_ack || rs->rs_transno <= rs->rs_export->exp_obd->obd_last_committed) ptlrpc_schedule_difficult_reply (rs); cfs_spin_unlock(&rs->rs_lock); - cfs_spin_unlock(&svc->srv_lock); + cfs_spin_unlock(&svc->srv_rs_lock); } EXIT; diff --git a/lustre/ptlrpc/lproc_ptlrpc.c b/lustre/ptlrpc/lproc_ptlrpc.c index 8b77941a..b42c41b 100644 --- a/lustre/ptlrpc/lproc_ptlrpc.c +++ b/lustre/ptlrpc/lproc_ptlrpc.c @@ -333,7 +333,7 @@ ptlrpc_lprocfs_rd_threads_started(char *page, char **start, off_t off, { struct ptlrpc_service *svc = data; - return snprintf(page, count, "%d\n", svc->srv_threads_started); + return snprintf(page, count, "%d\n", svc->srv_threads_running); } static int @@ -470,7 +470,7 @@ ptlrpc_lprocfs_svc_req_history_next(struct seq_file *s, return srhi; } -/* common ost/mdt srv_request_history_print_fn */ +/* common ost/mdt srv_req_printfn */ void target_print_req(void *seq_file, struct ptlrpc_request *req) { /* Called holding srv_lock with irqs disabled. 
@@ -527,10 +527,10 @@ static int ptlrpc_lprocfs_svc_req_history_show(struct seq_file *s, void *iter) req->rq_arrival_time.tv_sec, req->rq_sent - req->rq_arrival_time.tv_sec, req->rq_sent - req->rq_deadline); - if (svc->srv_request_history_print_fn == NULL) + if (svc->srv_req_printfn == NULL) seq_printf(s, "\n"); else - svc->srv_request_history_print_fn(s, srhi->srhi_req); + svc->srv_req_printfn(s, srhi->srhi_req); } cfs_spin_unlock(&svc->srv_lock); diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 515014e..240b226 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -463,7 +463,6 @@ int ptlrpc_send_reply(struct ptlrpc_request *req, int flags) CERROR("not replying on NULL connection\n"); /* bug 9635 */ return -ENOTCONN; } - cfs_atomic_inc (&svc->srv_outstanding_replies); ptlrpc_rs_addref(rs); /* +1 ref for the network */ rc = sptlrpc_svc_wrap_reply(req); @@ -478,10 +477,8 @@ int ptlrpc_send_reply(struct ptlrpc_request *req, int flags) &rs->rs_cb_id, conn, svc->srv_rep_portal, req->rq_xid, req->rq_reply_off); out: - if (unlikely(rc != 0)) { - cfs_atomic_dec (&svc->srv_outstanding_replies); + if (unlikely(rc != 0)) ptlrpc_req_drop_rs(req); - } ptlrpc_connection_put(conn); return rc; } diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index 56558e5..da4719f 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -270,12 +270,12 @@ struct ptlrpc_reply_state *lustre_get_emerg_rs(struct ptlrpc_service *svc) { struct ptlrpc_reply_state *rs = NULL; - cfs_spin_lock(&svc->srv_lock); + cfs_spin_lock(&svc->srv_rs_lock); /* See if we have anything in a pool, and wait if nothing */ while (cfs_list_empty(&svc->srv_free_rs_list)) { struct l_wait_info lwi; int rc; - cfs_spin_unlock(&svc->srv_lock); + cfs_spin_unlock(&svc->srv_rs_lock); /* If we cannot get anything for some long time, we better bail out instead of waiting infinitely */ lwi = LWI_TIMEOUT(cfs_time_seconds(10), NULL, NULL); @@ -284,13 +284,13 @@ struct ptlrpc_reply_state *lustre_get_emerg_rs(struct ptlrpc_service *svc) &lwi); if (rc) goto out; - cfs_spin_lock(&svc->srv_lock); + cfs_spin_lock(&svc->srv_rs_lock); } rs = cfs_list_entry(svc->srv_free_rs_list.next, struct ptlrpc_reply_state, rs_list); cfs_list_del(&rs->rs_list); - cfs_spin_unlock(&svc->srv_lock); + cfs_spin_unlock(&svc->srv_rs_lock); LASSERT(rs); memset(rs, 0, svc->srv_max_reply_size); rs->rs_service = svc; @@ -305,9 +305,9 @@ void lustre_put_emerg_rs(struct ptlrpc_reply_state *rs) LASSERT(svc); - cfs_spin_lock(&svc->srv_lock); + cfs_spin_lock(&svc->srv_rs_lock); cfs_list_add(&rs->rs_list, &svc->srv_free_rs_list); - cfs_spin_unlock(&svc->srv_lock); + cfs_spin_unlock(&svc->srv_rs_lock); cfs_waitq_signal(&svc->srv_free_rs_waitq); } diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index f3cf022..299398d 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -286,9 +286,9 @@ static void rs_batch_add(struct rs_batch *b, struct ptlrpc_reply_state *rs) if (svc != b->rsb_svc || b->rsb_n_replies >= MAX_SCHEDULED) { if (b->rsb_svc != NULL) { rs_batch_dispatch(b); - cfs_spin_unlock(&b->rsb_svc->srv_lock); + cfs_spin_unlock(&b->rsb_svc->srv_rs_lock); } - cfs_spin_lock(&svc->srv_lock); + cfs_spin_lock(&svc->srv_rs_lock); b->rsb_svc = svc; } cfs_spin_lock(&rs->rs_lock); @@ -313,7 +313,7 @@ static void rs_batch_fini(struct rs_batch *b) { if (b->rsb_svc != 0) { rs_batch_dispatch(b); - cfs_spin_unlock(&b->rsb_svc->srv_lock); + cfs_spin_unlock(&b->rsb_svc->srv_rs_lock); } } @@ -357,12 +357,12 
@@ ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs) { ENTRY; - LASSERT_SPIN_LOCKED(&rs->rs_service->srv_lock); + LASSERT_SPIN_LOCKED(&rs->rs_service->srv_rs_lock); LASSERT_SPIN_LOCKED(&rs->rs_lock); LASSERT (rs->rs_difficult); - rs->rs_scheduled_ever = 1; /* flag any notification attempt */ + rs->rs_scheduled_ever = 1; /* flag any notification attempt */ - if (rs->rs_scheduled) { /* being set up or already notified */ + if (rs->rs_scheduled) { /* being set up or already notified */ EXIT; return; } @@ -454,7 +454,7 @@ ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc) struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c, svc_handler_t h, char *name, struct proc_dir_entry *proc_entry, - svcreq_printfn_t prntfn, + svc_req_printfn_t prntfn, char *threadname) { return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize, @@ -499,7 +499,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size, int req_portal, int rep_portal, int watchdog_factor, svc_handler_t handler, char *name, cfs_proc_dir_entry_t *proc_entry, - svcreq_printfn_t svcreq_printfn, + svc_req_printfn_t svcreq_printfn, int min_threads, int max_threads, char *threadname, __u32 ctx_tags, svc_hpreq_handler_t hp_handler) @@ -522,6 +522,8 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size, service->srv_name = name; cfs_spin_lock_init(&service->srv_lock); + cfs_spin_lock_init(&service->srv_rq_lock); + cfs_spin_lock_init(&service->srv_rs_lock); CFS_INIT_LIST_HEAD(&service->srv_threads); cfs_waitq_init(&service->srv_waitq); @@ -532,7 +534,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size, service->srv_req_portal = req_portal; service->srv_watchdog_factor = watchdog_factor; service->srv_handler = handler; - service->srv_request_history_print_fn = svcreq_printfn; + service->srv_req_printfn = svcreq_printfn; service->srv_request_seq = 1; /* valid seq #s start at 1 */ service->srv_request_max_cull_seq = 0; service->srv_threads_min = min_threads; @@ -542,7 +544,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size, service->srv_hpreq_handler = hp_handler; service->srv_hpreq_ratio = PTLRPC_SVC_HP_RATIO; service->srv_hpreq_count = 0; - service->srv_n_hpreq = 0; + service->srv_n_active_hpreq = 0; rc = LNetSetLazyPortal(service->srv_req_portal); LASSERT (rc == 0); @@ -648,9 +650,9 @@ void ptlrpc_server_active_request_inc(struct ptlrpc_request *req) struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd; struct ptlrpc_service *svc = rqbd->rqbd_service; - cfs_spin_lock(&svc->srv_lock); + cfs_spin_lock(&svc->srv_rq_lock); svc->srv_n_active_reqs++; - cfs_spin_unlock(&svc->srv_lock); + cfs_spin_unlock(&svc->srv_rq_lock); } /** @@ -661,9 +663,9 @@ void ptlrpc_server_active_request_dec(struct ptlrpc_request *req) struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd; struct ptlrpc_service *svc = rqbd->rqbd_service; - cfs_spin_lock(&svc->srv_lock); + cfs_spin_lock(&svc->srv_rq_lock); svc->srv_n_active_reqs--; - cfs_spin_unlock(&svc->srv_lock); + cfs_spin_unlock(&svc->srv_rq_lock); } /** @@ -705,7 +707,6 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req) cfs_spin_lock(&svc->srv_lock); - svc->srv_n_active_reqs--; cfs_list_add(&req->rq_list, &rqbd->rqbd_reqs); refcount = --(rqbd->rqbd_refcount); @@ -776,8 +777,15 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req) * to finish a request: stop sending more early replies, and release * the request. 
should be called after we finished handling the request. */
-static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
+static void ptlrpc_server_finish_request(struct ptlrpc_service *svc,
+ struct ptlrpc_request *req)
{
+ cfs_spin_lock(&svc->srv_rq_lock);
+ svc->srv_n_active_reqs--;
+ if (req->rq_hp)
+ svc->srv_n_active_hpreq--;
+ cfs_spin_unlock(&svc->srv_rq_lock);
+
ptlrpc_server_drop_request(req);
}
@@ -1295,12 +1303,12 @@ void ptlrpc_hpreq_reorder(struct ptlrpc_request *req)
struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
ENTRY;

- cfs_spin_lock(&svc->srv_lock);
+ cfs_spin_lock(&svc->srv_rq_lock);
/* It may happen that the request is already taken for the processing
* but still in the export list, do not re-add it into the HP list. */
if (req->rq_phase == RQ_PHASE_NEW)
ptlrpc_hpreq_reorder_nolock(svc, req);
- cfs_spin_unlock(&svc->srv_lock);
+ cfs_spin_unlock(&svc->srv_rq_lock);
EXIT;
}
@@ -1332,7 +1340,7 @@ static int ptlrpc_server_request_add(struct ptlrpc_service *svc,
if (rc < 0)
RETURN(rc);

- cfs_spin_lock(&svc->srv_lock);
+ cfs_spin_lock(&svc->srv_rq_lock);
/* Before inserting the request into the queue, check if it is not
* inserted yet, or even already handled -- it may happen due to
* a racing ldlm_server_blocking_ast(). */
@@ -1343,22 +1351,74 @@
cfs_list_add_tail(&req->rq_list, &svc->srv_request_queue);
}

- cfs_spin_unlock(&svc->srv_lock);
+ cfs_spin_unlock(&svc->srv_rq_lock);

RETURN(0);
}

/**
+ * Whether we are allowed to handle a high priority request
+ * May be called w/o any lock, but the caller must hold
+ * ptlrpc_service::srv_rq_lock to get a reliable result
+ */
+static int ptlrpc_server_allow_high(struct ptlrpc_service *svc, int force)
+{
+ if (force)
+ return 1;
+
+ if (svc->srv_n_active_reqs >= svc->srv_threads_running - 1)
+ return 0;
+
+ return cfs_list_empty(&svc->srv_request_queue) ||
+ svc->srv_hpreq_count < svc->srv_hpreq_ratio;
+}
+
+static int ptlrpc_server_high_pending(struct ptlrpc_service *svc, int force)
+{
+ return ptlrpc_server_allow_high(svc, force) &&
+ !cfs_list_empty(&svc->srv_request_hpq);
+}
+
+/**
* Only allow normal priority requests on a service that has a high-priority
* queue if forced (i.e. cleanup), if there are other high priority requests
* already being processed (i.e. those threads can service more high-priority
* requests), or if there are enough idle threads that a later thread can do
* a high priority request.
+ * May be called w/o any lock, but the caller must hold
+ * ptlrpc_service::srv_rq_lock to get a reliable result
*/
static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
{
- return force || !svc->srv_hpreq_handler || svc->srv_n_hpreq > 0 ||
- svc->srv_threads_running <= svc->srv_threads_started - 2;
+ if (force ||
+ svc->srv_n_active_reqs < svc->srv_threads_running - 2)
+ return 1;
+
+ if (svc->srv_n_active_reqs >= svc->srv_threads_running - 1)
+ return 0;
+
+ return svc->srv_n_active_hpreq > 0 || svc->srv_hpreq_handler == NULL;
+}
+
+static int ptlrpc_server_normal_pending(struct ptlrpc_service *svc, int force)
+{
+ return ptlrpc_server_allow_normal(svc, force) &&
+ !cfs_list_empty(&svc->srv_request_queue);
+}
+
+/**
+ * Returns true if there are requests available in the incoming
+ * request queue for processing and we are allowed to fetch them.
+ * May be called w/o any lock, but the caller must hold
+ * ptlrpc_service::srv_rq_lock to get a reliable result
+ * \see ptlrpc_server_allow_normal
+ * \see ptlrpc_server_allow_high
+ */
+static inline int
+ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force)
+{
+ return ptlrpc_server_high_pending(svc, force) ||
+ ptlrpc_server_normal_pending(svc, force);
}

/**
@@ -1369,34 +1429,24 @@ static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
static struct ptlrpc_request *
ptlrpc_server_request_get(struct ptlrpc_service *svc, int force)
{
- struct ptlrpc_request *req = NULL;
+ struct ptlrpc_request *req;
ENTRY;

- if (ptlrpc_server_allow_normal(svc, force) &&
- !cfs_list_empty(&svc->srv_request_queue) &&
- (cfs_list_empty(&svc->srv_request_hpq) ||
- svc->srv_hpreq_count >= svc->srv_hpreq_ratio)) {
- req = cfs_list_entry(svc->srv_request_queue.next,
- struct ptlrpc_request, rq_list);
- svc->srv_hpreq_count = 0;
- } else if (!cfs_list_empty(&svc->srv_request_hpq)) {
+ if (ptlrpc_server_high_pending(svc, force)) {
req = cfs_list_entry(svc->srv_request_hpq.next,
struct ptlrpc_request, rq_list);
svc->srv_hpreq_count++;
+ RETURN(req);
+ }

- RETURN(req);
-}
-/**
- * Returns true if there are requests available in incoming
- * request queue for processing and it is allowed to fetch them
- * \see ptlrpc_server_allow_normal
- */
-static int ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force)
-{
- return ((ptlrpc_server_allow_normal(svc, force) &&
- !cfs_list_empty(&svc->srv_request_queue)) ||
- !cfs_list_empty(&svc->srv_request_hpq));
+ if (ptlrpc_server_normal_pending(svc, force)) {
+ req = cfs_list_entry(svc->srv_request_queue.next,
+ struct ptlrpc_request, rq_list);
+ svc->srv_hpreq_count = 0;
+ RETURN(req);
+ }
+ RETURN(NULL);
}

/**
@@ -1424,6 +1474,7 @@
req = cfs_list_entry(svc->srv_req_in_queue.next,
struct ptlrpc_request, rq_list);
cfs_list_del_init (&req->rq_list);
+ svc->srv_n_queued_reqs--;
/* Consider this still a "queued" request as far as stats are concerned */
cfs_spin_unlock(&svc->srv_lock);
@@ -1538,11 +1589,10 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
RETURN(1);

err_req:
- cfs_spin_lock(&svc->srv_lock);
- svc->srv_n_queued_reqs--;
+ cfs_spin_lock(&svc->srv_rq_lock);
svc->srv_n_active_reqs++;
- cfs_spin_unlock(&svc->srv_lock);
- ptlrpc_server_finish_request(req);
+ cfs_spin_unlock(&svc->srv_rq_lock);
+ ptlrpc_server_finish_request(svc, req);
RETURN(1);
}

@@ -1566,17 +1616,17 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
LASSERT(svc);

- cfs_spin_lock(&svc->srv_lock);
+ cfs_spin_lock(&svc->srv_rq_lock);
#ifndef __KERNEL__
/* !@%$# liblustre only has 1 thread */
if (cfs_atomic_read(&svc->srv_n_difficult_replies) != 0) {
- cfs_spin_unlock(&svc->srv_lock);
+ cfs_spin_unlock(&svc->srv_rq_lock);
RETURN(0);
}
#endif
request = ptlrpc_server_request_get(svc, 0);
if (request == NULL) {
- cfs_spin_unlock(&svc->srv_lock);
+ cfs_spin_unlock(&svc->srv_rq_lock);
RETURN(0);
}
@@ -1588,27 +1638,26 @@
if (unlikely(fail_opc)) {
if (request->rq_export && request->rq_ops) {
- cfs_spin_unlock(&svc->srv_lock);
+ cfs_spin_unlock(&svc->srv_rq_lock);
OBD_FAIL_TIMEOUT(fail_opc, 4);
- cfs_spin_lock(&svc->srv_lock);
+ cfs_spin_lock(&svc->srv_rq_lock);
request = ptlrpc_server_request_get(svc, 0);
if (request == NULL) {
- cfs_spin_unlock(&svc->srv_lock);
+ cfs_spin_unlock(&svc->srv_rq_lock);
RETURN(0);
}
}
}
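+ /* NB: srv_n_queued_reqs was already decremented in
+ * ptlrpc_server_handle_req_in() when this request left
+ * srv_req_in_queue, so only the active counters change here. */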
cfs_list_del_init(&request->rq_list);
- svc->srv_n_queued_reqs--;
svc->srv_n_active_reqs++;
if (request->rq_hp)
- svc->srv_n_hpreq++;
+ svc->srv_n_active_hpreq++;

/* The phase is changed under the lock here because we need to know
* the request is under processing (see ptlrpc_hpreq_reorder()). */
ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
- cfs_spin_unlock(&svc->srv_lock);
+ cfs_spin_unlock(&svc->srv_rq_lock);

ptlrpc_hpreq_fini(request);
@@ -1740,11 +1789,7 @@ put_conn:
}

out_req:
- cfs_spin_lock(&svc->srv_lock);
- if (request->rq_hp)
- svc->srv_n_hpreq--;
- cfs_spin_unlock(&svc->srv_lock);
- ptlrpc_server_finish_request(request);
+ ptlrpc_server_finish_request(svc, request);

RETURN(1);
}
@@ -1845,7 +1890,6 @@ ptlrpc_handle_rs (struct ptlrpc_reply_state *rs)
class_export_put (exp);
rs->rs_export = NULL;
ptlrpc_rs_decref (rs);
- cfs_atomic_dec (&svc->srv_outstanding_replies);
if (cfs_atomic_dec_and_test(&svc->srv_n_difficult_replies) &&
svc->srv_is_stopping)
cfs_waitq_broadcast(&svc->srv_waitq);
@@ -1873,14 +1917,14 @@ ptlrpc_server_handle_reply(struct ptlrpc_service *svc)
struct ptlrpc_reply_state *rs = NULL;
ENTRY;

- cfs_spin_lock(&svc->srv_lock);
+ cfs_spin_lock(&svc->srv_rs_lock);
if (!cfs_list_empty(&svc->srv_reply_queue)) {
rs = cfs_list_entry(svc->srv_reply_queue.prev,
struct ptlrpc_reply_state,
rs_list);
cfs_list_del_init(&rs->rs_list);
}
- cfs_spin_unlock(&svc->srv_lock);
+ cfs_spin_unlock(&svc->srv_rs_lock);
if (rs != NULL)
ptlrpc_handle_rs(rs);
RETURN(rs != NULL);
@@ -1960,79 +2004,63 @@ ptlrpc_retry_rqbds(void *arg)
return (-ETIMEDOUT);
}

-/**
- * Status bits to pass todo info from
- * ptlrpc_main_check_event to ptlrpc_main.
- */
-#define PTLRPC_MAIN_STOPPING 0x01
-#define PTLRPC_MAIN_IN_REQ 0x02
-#define PTLRPC_MAIN_ACTIVE_REQ 0x04
-#define PTLRPC_MAIN_CHECK_TIMED 0x08
-#define PTLRPC_MAIN_REPOST 0x10
+static inline int
+ptlrpc_threads_enough(struct ptlrpc_service *svc)
+{
+ return svc->srv_n_active_reqs <
+ svc->srv_threads_running - 1 - (svc->srv_hpreq_handler != NULL);
+}

/**
- * A container to share per-thread status variables between
- * ptlrpc_main_check_event and ptlrpc_main functions.
+ * Whether we are allowed to create more threads
+ * May be called w/o any lock, but the caller must hold
+ * ptlrpc_service::srv_lock to get a reliable result
*/
-struct ptlrpc_main_check_s {
- /** todo info for the ptrlrpc_main */
- int todo;
- /** is this thread counted as running or not? */
- int running;
-};
+static inline int
+ptlrpc_threads_increasable(struct ptlrpc_service *svc)
+{
+ return svc->srv_threads_running +
+ svc->srv_threads_starting < svc->srv_threads_max;
+}

/**
- * Check whether current service thread has work to do.
+ * too many requests and we are allowed to create more threads
*/
-static int ptlrpc_main_check_event(struct ptlrpc_thread *t,
- struct ptlrpc_main_check_s *status)
+static inline int
+ptlrpc_threads_need_create(struct ptlrpc_service *svc)
{
- struct ptlrpc_service *svc = t->t_svc;
- ENTRY;
+ return !ptlrpc_threads_enough(svc) && ptlrpc_threads_increasable(svc);
+}

- status->todo = 0;
+static inline int
+ptlrpc_thread_stopping(struct ptlrpc_thread *thread)
+{
+ return (thread->t_flags & SVC_STOPPING) != 0 ||
+ thread->t_svc->srv_is_stopping;
+}

- /* check the stop flags w/o any locking to make all
- * concurrently running threads stop faster. */
- if (unlikely((t->t_flags & SVC_STOPPING) ||
- svc->srv_is_stopping)) {
- status->todo |= PTLRPC_MAIN_STOPPING;
- goto out;
- }
+static inline int
+ptlrpc_rqbd_pending(struct ptlrpc_service *svc)
+{
+ return !cfs_list_empty(&svc->srv_idle_rqbds) &&
+ svc->srv_rqbd_timeout == 0;
+}

- cfs_spin_lock(&svc->srv_lock);
- /* ptlrpc_server_request_pending() needs this thread to be
- * counted as running. */
- if (!status->running) {
- svc->srv_threads_running++;
- status->running = 1;
- }
- /* Process all incoming reqs before handling any */
- if (!cfs_list_empty(&svc->srv_req_in_queue)) {
- status->todo |= PTLRPC_MAIN_IN_REQ;
- }
- /* Don't handle regular requests in the last thread, in order
- * to handle any incoming reqs, early replies, etc. */
- if (ptlrpc_server_request_pending(svc, 0) &&
- svc->srv_threads_running <= svc->srv_threads_started - 1) {
- status->todo |= PTLRPC_MAIN_ACTIVE_REQ;
- }
- if (svc->srv_at_check) {
- status->todo |= PTLRPC_MAIN_CHECK_TIMED;
- }
- if (!cfs_list_empty(&svc->srv_idle_rqbds) &&
- svc->srv_rqbd_timeout == 0) {
- status->todo |= PTLRPC_MAIN_REPOST;
- }
- /* count this thread as not running if it is going to sleep in
- * the outer wait event */
- if (!status->todo) {
- svc->srv_threads_running--;
- status->running = 0;
- }
- cfs_spin_unlock(&svc->srv_lock);
- out:
- RETURN(status->todo);
+static inline int
+ptlrpc_at_check(struct ptlrpc_service *svc)
+{
+ return svc->srv_at_check;
+}
+
+/**
+ * Whether requests are waiting for preprocessing
+ * May be called w/o any lock, but the caller must hold
+ * ptlrpc_service::srv_lock to get a reliable result
+ */
+static inline int
+ptlrpc_server_request_waiting(struct ptlrpc_service *svc)
+{
+ return !cfs_list_empty(&svc->srv_req_in_queue);
}

/**
@@ -2047,7 +2075,6 @@ static int ptlrpc_main(void *arg)
struct ptlrpc_service *svc = data->svc;
struct ptlrpc_thread *thread = data->thread;
struct ptlrpc_reply_state *rs;
- struct ptlrpc_main_check_s st;
#ifdef WITH_GROUP_INFO
cfs_group_info_t *ginfo = NULL;
#endif
@@ -2111,11 +2138,17 @@
}

cfs_spin_lock(&svc->srv_lock);
+
+ LASSERT((thread->t_flags & SVC_STARTING) != 0);
+ thread->t_flags &= ~SVC_STARTING;
+ svc->srv_threads_starting--;
+
/* SVC_STOPPING may already be set here if someone else is trying
* to stop the service while this new thread has been dynamically
* forked.
We still set SVC_RUNNING to let our creator know that * we are now running, however we will exit as soon as possible */ thread->t_flags |= SVC_RUNNING; + svc->srv_threads_running++; cfs_spin_unlock(&svc->srv_lock); /* @@ -2126,20 +2159,16 @@ static int ptlrpc_main(void *arg) thread->t_watchdog = lc_watchdog_add(CFS_GET_TIMEOUT(svc), NULL, NULL); - cfs_spin_lock(&svc->srv_lock); + cfs_spin_lock(&svc->srv_rs_lock); cfs_list_add(&rs->rs_list, &svc->srv_free_rs_list); - cfs_spin_unlock(&svc->srv_lock); cfs_waitq_signal(&svc->srv_free_rs_waitq); + cfs_spin_unlock(&svc->srv_rs_lock); CDEBUG(D_NET, "service thread %d (#%d) started\n", thread->t_id, svc->srv_threads_running); /* XXX maintain a list of all managed devices: insert here */ - - st.running = 0; - st.todo = 0; - - while (!(st.todo & PTLRPC_MAIN_STOPPING)) { + while (!ptlrpc_thread_stopping(thread)) { /* Don't exit while there are replies to be handled */ struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout, ptlrpc_retry_rqbds, svc); @@ -2148,36 +2177,44 @@ static int ptlrpc_main(void *arg) cfs_cond_resched(); - l_wait_event_exclusive (svc->srv_waitq, - ptlrpc_main_check_event(thread, &st), - &lwi); + l_wait_event_exclusive(svc->srv_waitq, + ptlrpc_thread_stopping(thread) || + ptlrpc_server_request_waiting(svc) || + ptlrpc_server_request_pending(svc, 0) || + ptlrpc_rqbd_pending(svc) || + ptlrpc_at_check(svc), &lwi); + + if (ptlrpc_thread_stopping(thread)) + break; lc_watchdog_touch(thread->t_watchdog, CFS_GET_TIMEOUT(svc)); ptlrpc_check_rqbd_pool(svc); - if (svc->srv_threads_started < svc->srv_threads_max && - svc->srv_n_active_reqs >= (svc->srv_threads_started - 2)) + if (ptlrpc_threads_need_create(svc)) { /* Ignore return code - we tried... */ ptlrpc_start_thread(svc); + } /* Process all incoming reqs before handling any */ - if (st.todo & PTLRPC_MAIN_IN_REQ) { + if (ptlrpc_server_request_waiting(svc)) { ptlrpc_server_handle_req_in(svc); /* but limit ourselves in case of flood */ - if (counter++ < 1000) + if (counter++ < 100) continue; counter = 0; } - if (st.todo & PTLRPC_MAIN_CHECK_TIMED) { + + if (ptlrpc_at_check(svc)) ptlrpc_at_check_timed(svc); - } - if (st.todo & PTLRPC_MAIN_ACTIVE_REQ) { + + if (ptlrpc_server_request_pending(svc, 0)) { lu_context_enter(&env.le_ctx); ptlrpc_server_handle_request(svc, thread); lu_context_exit(&env.le_ctx); } - if ((st.todo & PTLRPC_MAIN_REPOST) && + + if (ptlrpc_rqbd_pending(svc) && ptlrpc_server_post_idle_rqbds(svc) < 0) { /* I just failed to repost request buffers. 
* Wait for a timeout (unless something else @@ -2204,10 +2241,20 @@ out: thread, thread->t_pid, thread->t_id, rc); cfs_spin_lock(&svc->srv_lock); - if (st.running) + if ((thread->t_flags & SVC_STARTING) != 0) { + svc->srv_threads_starting--; + thread->t_flags &= ~SVC_STARTING; + } + + if ((thread->t_flags & SVC_RUNNING) != 0) { + /* must know immediately */ svc->srv_threads_running--; - thread->t_id = rc; - thread->t_flags = SVC_STOPPED; + thread->t_flags &= ~SVC_RUNNING; + } + + thread->t_id = rc; + thread->t_flags |= SVC_STOPPED; + cfs_waitq_signal(&thread->t_ctl_waitq); cfs_spin_unlock(&svc->srv_lock); @@ -2433,19 +2480,19 @@ int ptlrpc_start_thread(struct ptlrpc_service *svc) struct ptlrpc_svc_data d; struct ptlrpc_thread *thread; char name[32]; - int id, rc; + int rc; ENTRY; CDEBUG(D_RPCTRACE, "%s started %d min %d max %d running %d\n", - svc->srv_name, svc->srv_threads_started, svc->srv_threads_min, + svc->srv_name, svc->srv_threads_running, svc->srv_threads_min, svc->srv_threads_max, svc->srv_threads_running); if (unlikely(svc->srv_is_stopping)) RETURN(-ESRCH); - if (unlikely(svc->srv_threads_started >= svc->srv_threads_max) || + if (!ptlrpc_threads_increasable(svc) || (OBD_FAIL_CHECK(OBD_FAIL_TGT_TOOMANY_THREADS) && - svc->srv_threads_started == svc->srv_threads_min - 1)) + svc->srv_threads_running == svc->srv_threads_min - 1)) RETURN(-EMFILE); OBD_ALLOC_PTR(thread); @@ -2454,18 +2501,21 @@ int ptlrpc_start_thread(struct ptlrpc_service *svc) cfs_waitq_init(&thread->t_ctl_waitq); cfs_spin_lock(&svc->srv_lock); - if (svc->srv_threads_started >= svc->srv_threads_max) { + if (!ptlrpc_threads_increasable(svc)) { cfs_spin_unlock(&svc->srv_lock); OBD_FREE_PTR(thread); RETURN(-EMFILE); } + + svc->srv_threads_starting++; + thread->t_id = svc->srv_threads_next_id++; + thread->t_flags |= SVC_STARTING; + thread->t_svc = svc; + cfs_list_add(&thread->t_link, &svc->srv_threads); - id = svc->srv_threads_started++; cfs_spin_unlock(&svc->srv_lock); - thread->t_svc = svc; - thread->t_id = id; - sprintf(name, "%s_%02d", svc->srv_thread_name, id); + sprintf(name, "%s_%02d", svc->srv_thread_name, thread->t_id); d.svc = svc; d.name = name; d.thread = thread; @@ -2481,7 +2531,7 @@ int ptlrpc_start_thread(struct ptlrpc_service *svc) cfs_spin_lock(&svc->srv_lock); cfs_list_del(&thread->t_link); - --svc->srv_threads_started; + --svc->srv_threads_starting; cfs_spin_unlock(&svc->srv_lock); OBD_FREE(thread, sizeof(*thread)); @@ -2623,7 +2673,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service) } /* schedule all outstanding replies to terminate them */ - cfs_spin_lock(&service->srv_lock); + cfs_spin_lock(&service->srv_rs_lock); while (!cfs_list_empty(&service->srv_active_replies)) { struct ptlrpc_reply_state *rs = cfs_list_entry(service->srv_active_replies.next, @@ -2632,7 +2682,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service) ptlrpc_schedule_difficult_reply(rs); cfs_spin_unlock(&rs->rs_lock); } - cfs_spin_unlock(&service->srv_lock); + cfs_spin_unlock(&service->srv_rs_lock); /* purge the request queue. 
NB No new replies (rqbds all unlinked) * and no service threads, so I'm the only thread noodling the @@ -2646,7 +2696,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service) cfs_list_del(&req->rq_list); service->srv_n_queued_reqs--; service->srv_n_active_reqs++; - ptlrpc_server_finish_request(req); + ptlrpc_server_finish_request(service, req); } while (ptlrpc_server_request_pending(service, 1)) { struct ptlrpc_request *req; @@ -2656,7 +2706,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service) service->srv_n_queued_reqs--; service->srv_n_active_reqs++; ptlrpc_hpreq_fini(req); - ptlrpc_server_finish_request(req); + ptlrpc_server_finish_request(service, req); } LASSERT(service->srv_n_queued_reqs == 0); LASSERT(service->srv_n_active_reqs == 0); @@ -2718,9 +2768,9 @@ int ptlrpc_service_health_check(struct ptlrpc_service *svc) cfs_gettimeofday(&right_now); - cfs_spin_lock(&svc->srv_lock); + cfs_spin_lock(&svc->srv_rq_lock); if (!ptlrpc_server_request_pending(svc, 1)) { - cfs_spin_unlock(&svc->srv_lock); + cfs_spin_unlock(&svc->srv_rq_lock); return 0; } @@ -2732,7 +2782,7 @@ int ptlrpc_service_health_check(struct ptlrpc_service *svc) request = cfs_list_entry(svc->srv_request_queue.next, struct ptlrpc_request, rq_list); timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL); - cfs_spin_unlock(&svc->srv_lock); + cfs_spin_unlock(&svc->srv_rq_lock); if ((timediff / ONE_MILLION) > (AT_OFF ? obd_timeout * 3/2 : at_max)) { -- 1.8.3.1
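
Note: the following is a minimal illustrative sketch only, not part of the
patch. It shows the reply-side discipline the patch establishes: per-reply
work nests rs_lock inside the new srv_rs_lock, exactly as
server_disconnect_export() and reply_out_callback() do above. The helper
name is hypothetical.

    static void sketch_schedule_reply(struct ptlrpc_service *svc,
                                      struct ptlrpc_reply_state *rs)
    {
            /* serialize against other users of the reply-state lists */
            cfs_spin_lock(&svc->srv_rs_lock);
            /* then take the per-reply lock; ptlrpc_schedule_difficult_reply()
             * asserts that both locks are held */
            cfs_spin_lock(&rs->rs_lock);
            ptlrpc_schedule_difficult_reply(rs);
            cfs_spin_unlock(&rs->rs_lock);
            cfs_spin_unlock(&svc->srv_rs_lock);
    }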