Whamcloud - gitweb
b=23289 fine grained lock for ptlrpc service
authorJian Yu <jian.yu@oracle.com>
Wed, 17 Nov 2010 02:15:26 +0000 (10:15 +0800)
committerVitaly Fertman <vitaly.fertman@sun.com>
Thu, 18 Nov 2010 22:36:20 +0000 (01:36 +0300)
- cacheline optimization for struct ptlrpc_service
- split ptlrpc_service::srv_lock into three locks:
  . ptlrpc_service::srv_lock
    serialize operations on rqbd and requests queued on srv_req_in_queue
  . ptlrpc_service::srv_rq_lock
    serialize operations on active requests
  . ptlrpc_service::srv_rs_lock
    serialize operations on RS
- cleanup logic in ptlrpc_main()
- remove unused atomic srv_outstanding_replies

o=liang
i=andreas.dilger
i=mikhail.pershin

lustre/include/lustre/lustre_idl.h
lustre/include/lustre_net.h
lustre/ldlm/ldlm_lib.c
lustre/mdt/mdt_recovery.c
lustre/ptlrpc/events.c
lustre/ptlrpc/lproc_ptlrpc.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/service.c

index eefff59..4dc97ad 100644 (file)
 
 /* Portal 63 is reserved for the Cray Inc DVS - nic@cray.com, roe@cray.com, n8851@cray.com */
 
-#define SVC_KILLED               1
-#define SVC_EVENT                2
-#define SVC_SIGNAL               4
-#define SVC_RUNNING              8
-#define SVC_STOPPING            16
-#define SVC_STOPPED             32
-
 /* packet types */
 #define PTL_RPC_MSG_REQUEST 4711
 #define PTL_RPC_MSG_ERR     4712
index be610d7..2c6ba51 100644 (file)
@@ -945,6 +945,15 @@ struct ptlrpc_bulk_desc {
 #endif
 };
 
+enum {
+        SVC_STOPPED     = 1 << 0,
+        SVC_STOPPING    = 1 << 1,
+        SVC_STARTING    = 1 << 2,
+        SVC_RUNNING     = 1 << 3,
+        SVC_EVENT       = 1 << 4,
+        SVC_SIGNAL      = 1 << 5,
+};
+
 /**
  * Definition of server service thread structure
  */
@@ -1005,9 +1014,16 @@ struct ptlrpc_request_buffer_desc {
         struct ptlrpc_request  rqbd_req;
 };
 
-typedef int (*svc_handler_t)(struct ptlrpc_request *req);
-typedef void (*svcreq_printfn_t)(void *, struct ptlrpc_request *);
-typedef int (*svc_hpreq_handler_t)(struct ptlrpc_request *);
+typedef int  (*svc_thr_init_t)(struct ptlrpc_thread *thread);
+typedef void (*svc_thr_done_t)(struct ptlrpc_thread *thread);
+typedef int  (*svc_handler_t)(struct ptlrpc_request *req);
+typedef int  (*svc_hpreq_handler_t)(struct ptlrpc_request *);
+typedef void (*svc_req_printfn_t)(void *, struct ptlrpc_request *);
+
+#ifndef __cfs_cacheline_aligned
+/* NB: put it here for reducing patch dependence */
+# define __cfs_cacheline_aligned
+#endif
 
 /**
  * How many high priority requests to serve before serving one normal
@@ -1020,106 +1036,181 @@ typedef int (*svc_hpreq_handler_t)(struct ptlrpc_request *);
  * The service is listening on a particular portal (like tcp port)
  * and perform actions for a specific server like IO service for OST
  * or general metadata service for MDS.
+ *
+ * ptlrpc service has four locks:
+ * \a srv_lock
+ *    serialize operations on rqbd and requests waiting for preprocess
+ * \a srv_rq_lock
+ *    serialize operations on active requests sent to this portal
+ * \a srv_at_lock
+ *    serialize adaptive timeout stuff
+ * \a srv_rs_lock
+ *    serialize operations on RS list (reply states)
+ *
+ * We don't have any use-case to take two or more locks at the same time
+ * for now, so there is no lock order issue.
  */
 struct ptlrpc_service {
-        cfs_list_t       srv_list;              /* chain thru all services */
-        int              srv_max_req_size;      /* biggest request to receive */
-        int              srv_max_reply_size;    /* biggest reply to send */
-        int              srv_buf_size;          /* size of individual buffers */
-        int              srv_nbuf_per_group;    /* # buffers to allocate in 1 group */
-        int              srv_nbufs;             /* total # req buffer descs allocated */
-        int              srv_threads_min;       /* threads to start at SOW */
-        int              srv_threads_max;       /* thread upper limit */
-        int              srv_threads_started;   /* index of last started thread */
-        int              srv_threads_running;   /* # running threads */
-        cfs_atomic_t     srv_n_difficult_replies; /* # 'difficult' replies */
-        int              srv_n_active_reqs;     /* # reqs being served */
-        int              srv_n_hpreq;           /* # HPreqs being served */
-        cfs_duration_t   srv_rqbd_timeout;      /* timeout before re-posting reqs, in tick */
-        int              srv_watchdog_factor;   /* soft watchdog timeout multiplier */
-        unsigned         srv_cpu_affinity:1;    /* bind threads to CPUs */
-        unsigned         srv_at_check:1;        /* check early replies */
-        unsigned         srv_is_stopping:1;     /* under unregister_service */
-        cfs_time_t       srv_at_checktime;      /* debug */
-
-        /** Local portal on which to receive requests */
-        __u32            srv_req_portal;
-        /** Portal on the client to send replies to */
-        __u32            srv_rep_portal;
-
-        /** AT stuff */
+        /** most often accessed fields */
+        /** chain thru all services */
+        cfs_list_t                      srv_list;
+        /** only statically allocated strings here; we don't clean them */
+        char                           *srv_name;
+        /** only statically allocated strings here; we don't clean them */
+        char                           *srv_thread_name;
+        /** service thread list */
+        cfs_list_t                      srv_threads;
+        /** threads to start at beginning of service */
+        int                             srv_threads_min;
+        /** thread upper limit */
+        int                             srv_threads_max;
+        /** always increasing number */
+        unsigned                        srv_threads_next_id;
+        /** # of starting threads */
+        int                             srv_threads_starting;
+        /** # running threads */
+        int                             srv_threads_running;
+
+        /** service operations, move to ptlrpc_svc_ops_t in the future */
         /** @{ */
-        struct adaptive_timeout srv_at_estimate;/* estimated rpc service time */
-        cfs_spinlock_t   srv_at_lock;
-        struct ptlrpc_at_array  srv_at_array;   /* reqs waiting for replies */
-        cfs_timer_t      srv_at_timer;      /* early reply timer */
-        /** @} */
-
-        int              srv_n_queued_reqs; /* # reqs in either of the queues below */
-        int              srv_hpreq_count;   /* # hp requests handled */
-        int              srv_hpreq_ratio;   /* # hp per lp reqs to handle */
-        cfs_list_t       srv_req_in_queue;  /* incoming reqs */
-        cfs_list_t       srv_request_queue; /* reqs waiting for service */
-        cfs_list_t       srv_request_hpq;   /* high priority queue */
-
-        cfs_list_t       srv_request_history;  /* request history */
-        __u64            srv_request_seq;      /* next request sequence # */
-        __u64            srv_request_max_cull_seq; /* highest seq culled from history */
-        svcreq_printfn_t srv_request_history_print_fn; /* service-specific print fn */
-
-        cfs_list_t       srv_idle_rqbds;    /* request buffers to be reposted */
-        cfs_list_t       srv_active_rqbds;  /* req buffers receiving */
-        cfs_list_t       srv_history_rqbds; /* request buffer history */
-        int              srv_nrqbd_receiving; /* # posted request buffers */
-        int              srv_n_history_rqbds;  /* # request buffers in history */
-        int              srv_max_history_rqbds;/* max # request buffers in history */
-
-        cfs_atomic_t     srv_outstanding_replies;
-        cfs_list_t       srv_active_replies;   /* all the active replies */
-#ifndef __KERNEL__
-        cfs_list_t       srv_reply_queue;  /* replies waiting for service */
-#endif
-        cfs_waitq_t      srv_waitq; /* all threads sleep on this. This
-                                     * wait-queue is signalled when new
-                                     * incoming request arrives and when
-                                     * difficult reply has to be handled. */
-
-        cfs_list_t       srv_threads;       /* service thread list */
+        /**
+         * if non-NULL called during thread creation (ptlrpc_start_thread())
+         * to initialize service specific per-thread state.
+         */
+        svc_thr_init_t                  srv_init;
+        /**
+         * if non-NULL called during thread shutdown (ptlrpc_main()) to
+         * destruct state created by ->srv_init().
+         */
+        svc_thr_done_t                  srv_done;
         /** Handler function for incoming requests for this service */
-        svc_handler_t    srv_handler;
-        svc_hpreq_handler_t  srv_hpreq_handler; /* hp request handler */
-
-        char *srv_name; /* only statically allocated strings here; we don't clean them */
-        char *srv_thread_name; /* only statically allocated strings here; we don't clean them */
-
-        cfs_spinlock_t        srv_lock;
+        svc_handler_t                   srv_handler;
+        /** hp request handler */
+        svc_hpreq_handler_t             srv_hpreq_handler;
+        /** service-specific print fn */
+        svc_req_printfn_t               srv_req_printfn;
+        /** @} */
 
         /** Root of /proc dir tree for this service */
-        cfs_proc_dir_entry_t *srv_procroot;
+        cfs_proc_dir_entry_t           *srv_procroot;
         /** Pointer to statistic data for this service */
-        struct lprocfs_stats *srv_stats;
-
-        /** List of free reply_states */
-        cfs_list_t           srv_free_rs_list;
-        /** waitq to run, when adding stuff to srv_free_rs_list */
-        cfs_waitq_t          srv_free_rs_waitq;
-
+        struct lprocfs_stats           *srv_stats;
+        /** # hp per lp reqs to handle */
+        int                             srv_hpreq_ratio;
+        /** biggest request to receive */
+        int                             srv_max_req_size;
+        /** biggest reply to send */
+        int                             srv_max_reply_size;
+        /** size of individual buffers */
+        int                             srv_buf_size;
+        /** # buffers to allocate in 1 group */
+        int                             srv_nbuf_per_group;
+        /** Local portal on which to receive requests */
+        __u32                           srv_req_portal;
+        /** Portal on the client to send replies to */
+        __u32                           srv_rep_portal;
         /**
          * Tags for lu_context associated with this thread, see struct
          * lu_context.
          */
-        __u32                srv_ctx_tags;
+        __u32                           srv_ctx_tags;
+        /** soft watchdog timeout multiplier */
+        int                             srv_watchdog_factor;
+        /** bind threads to CPUs */
+        unsigned                        srv_cpu_affinity:1;
+        /** under unregister_service */
+        unsigned                        srv_is_stopping:1;
+
         /**
-         * if non-NULL called during thread creation (ptlrpc_start_thread())
-         * to initialize service specific per-thread state.
+         * serialize the following fields, used for protecting
+         * rqbd list and incoming requests waiting for preprocess
          */
-        int (*srv_init)(struct ptlrpc_thread *thread);
+        cfs_spinlock_t                  srv_lock  __cfs_cacheline_aligned;
+        /** incoming reqs */
+        cfs_list_t                      srv_req_in_queue;
+        /** total # req buffer descs allocated */
+        int                             srv_nbufs;
+        /** # posted request buffers */
+        int                             srv_nrqbd_receiving;
+        /** timeout before re-posting reqs, in tick */
+        cfs_duration_t                  srv_rqbd_timeout;
+        /** request buffers to be reposted */
+        cfs_list_t                      srv_idle_rqbds;
+        /** req buffers receiving */
+        cfs_list_t                      srv_active_rqbds;
+        /** request buffer history */
+        cfs_list_t                      srv_history_rqbds;
+        /** # request buffers in history */
+        int                             srv_n_history_rqbds;
+        /** max # request buffers in history */
+        int                             srv_max_history_rqbds;
+        /** request history */
+        cfs_list_t                      srv_request_history;
+        /** next request sequence # */
+        __u64                           srv_request_seq;
+        /** highest seq culled from history */
+        __u64                           srv_request_max_cull_seq;
         /**
-         * if non-NULL called during thread shutdown (ptlrpc_main()) to
-         * destruct state created by ->srv_init().
+         * all threads sleep on this. This wait-queue is signalled when new
+         * incoming request arrives and when difficult reply has to be handled.
+         */
+        cfs_waitq_t                     srv_waitq;
+
+        /**
+         * serialize the following fields, used for processing requests
+         * sent to this portal
          */
-        void (*srv_done)(struct ptlrpc_thread *thread);
+        cfs_spinlock_t                  srv_rq_lock __cfs_cacheline_aligned;
+        /** # reqs in either of the queues below */
+        /** reqs waiting for service */
+        cfs_list_t                      srv_request_queue;
+        /** high priority queue */
+        cfs_list_t                      srv_request_hpq;
+        /** # incoming reqs */
+        int                             srv_n_queued_reqs;
+        /** # reqs being served */
+        int                             srv_n_active_reqs;
+        /** # HPreqs being served */
+        int                             srv_n_active_hpreq;
+        /** # hp requests handled */
+        int                             srv_hpreq_count;
+
+        /** AT stuff */
+        /** @{ */
+        /**
+         * serialize the following fields, used for changes on
+         * adaptive timeout
+         */
+        cfs_spinlock_t                  srv_at_lock __cfs_cacheline_aligned;
+        /** estimated rpc service time */
+        struct adaptive_timeout         srv_at_estimate;
+        /** reqs waiting for replies */
+        struct ptlrpc_at_array          srv_at_array;
+        /** early reply timer */
+        cfs_timer_t                     srv_at_timer;
+        /** check early replies */
+        unsigned                        srv_at_check;
+        /** debug */
+        cfs_time_t                      srv_at_checktime;
+        /** @} */
 
+        /**
+         * serialize the following fields, used for processing
+         * replies for this portal
+         */
+        cfs_spinlock_t                  srv_rs_lock __cfs_cacheline_aligned;
+        /** all the active replies */
+        cfs_list_t                      srv_active_replies;
+#ifndef __KERNEL__
+        /** replies waiting for service */
+        cfs_list_t                      srv_reply_queue;
+#endif
+        /** List of free reply_states */
+        cfs_list_t                      srv_free_rs_list;
+        /** waitq to run, when adding stuff to srv_free_rs_list */
+        cfs_waitq_t                     srv_free_rs_waitq;
+        /** # 'difficult' replies */
+        cfs_atomic_t                    srv_n_difficult_replies;
         //struct ptlrpc_srv_ni srv_interfaces[0];
 };
 
@@ -1392,7 +1483,7 @@ void ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs);
 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
                                             svc_handler_t h, char *name,
                                             struct proc_dir_entry *proc_entry,
-                                            svcreq_printfn_t prntfn,
+                                            svc_req_printfn_t prntfn,
                                             char *threadname);
 
 struct ptlrpc_service *ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size,
@@ -1401,7 +1492,7 @@ struct ptlrpc_service *ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size,
                                        int watchdog_factor,
                                        svc_handler_t, char *name,
                                        cfs_proc_dir_entry_t *proc_entry,
-                                       svcreq_printfn_t,
+                                       svc_req_printfn_t,
                                        int min_threads, int max_threads,
                                        char *threadname, __u32 ctx_tags,
                                        svc_hpreq_handler_t);
index 93f2366..5530278 100644 (file)
@@ -568,12 +568,12 @@ int server_disconnect_export(struct obd_export *exp)
                                        struct ptlrpc_reply_state, rs_exp_list);
                 struct ptlrpc_service *svc = rs->rs_service;
 
-                cfs_spin_lock(&svc->srv_lock);
+                cfs_spin_lock(&svc->srv_rs_lock);
                 cfs_list_del_init(&rs->rs_exp_list);
                 cfs_spin_lock(&rs->rs_lock);
                 ptlrpc_schedule_difficult_reply(rs);
                 cfs_spin_unlock(&rs->rs_lock);
-                cfs_spin_unlock(&svc->srv_lock);
+                cfs_spin_unlock(&svc->srv_rs_lock);
         }
         cfs_spin_unlock(&exp->exp_lock);
 
@@ -2184,7 +2184,7 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
 
         netrc = target_send_reply_msg (req, rc, fail_id);
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rs_lock);
 
         cfs_atomic_inc(&svc->srv_n_difficult_replies);
 
@@ -2196,7 +2196,6 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
                  * reply_out_callback leaves alone) */
                 rs->rs_on_net = 0;
                 ptlrpc_rs_addref(rs);
-                cfs_atomic_inc (&svc->srv_outstanding_replies);
         }
 
         cfs_spin_lock(&rs->rs_lock);
@@ -2211,7 +2210,7 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
                 rs->rs_scheduled = 0;           /* allow notifier to schedule */
         }
         cfs_spin_unlock(&rs->rs_lock);
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rs_lock);
         EXIT;
 }
 
index a8327bb..e125d5b 100644 (file)
@@ -978,7 +978,7 @@ static void mdt_steal_ack_locks(struct ptlrpc_request *req)
                                 oldrep->rs_opc);
 
                 svc = oldrep->rs_service;
-                cfs_spin_lock (&svc->srv_lock);
+                cfs_spin_lock (&svc->srv_rs_lock);
 
                 cfs_list_del_init (&oldrep->rs_exp_list);
 
@@ -998,7 +998,7 @@ static void mdt_steal_ack_locks(struct ptlrpc_request *req)
                 ptlrpc_schedule_difficult_reply (oldrep);
                 cfs_spin_unlock(&oldrep->rs_lock);
 
-                cfs_spin_unlock (&svc->srv_lock);
+                cfs_spin_unlock (&svc->srv_rs_lock);
                 break;
         }
         cfs_spin_unlock(&exp->exp_lock);
index 2c93be1..5c37070 100644 (file)
@@ -339,7 +339,6 @@ void reply_out_callback(lnet_event_t *ev)
                  * net's ref on 'rs' */
                 LASSERT (ev->unlinked);
                 ptlrpc_rs_decref(rs);
-                cfs_atomic_dec (&svc->srv_outstanding_replies);
                 EXIT;
                 return;
         }
@@ -349,14 +348,14 @@ void reply_out_callback(lnet_event_t *ev)
         if (ev->unlinked) {
                 /* Last network callback. The net's ref on 'rs' stays put
                  * until ptlrpc_handle_rs() is done with it */
-                cfs_spin_lock(&svc->srv_lock);
+                cfs_spin_lock(&svc->srv_rs_lock);
                 cfs_spin_lock(&rs->rs_lock);
                 rs->rs_on_net = 0;
                 if (!rs->rs_no_ack ||
                     rs->rs_transno <= rs->rs_export->exp_obd->obd_last_committed)
                         ptlrpc_schedule_difficult_reply (rs);
                 cfs_spin_unlock(&rs->rs_lock);
-                cfs_spin_unlock(&svc->srv_lock);
+                cfs_spin_unlock(&svc->srv_rs_lock);
         }
 
         EXIT;
index 8b77941..b42c41b 100644 (file)
@@ -333,7 +333,7 @@ ptlrpc_lprocfs_rd_threads_started(char *page, char **start, off_t off,
 {
         struct ptlrpc_service *svc = data;
 
-        return snprintf(page, count, "%d\n", svc->srv_threads_started);
+        return snprintf(page, count, "%d\n", svc->srv_threads_running);
 }
 
 static int
@@ -470,7 +470,7 @@ ptlrpc_lprocfs_svc_req_history_next(struct seq_file *s,
         return srhi;
 }
 
-/* common ost/mdt srv_request_history_print_fn */
+/* common ost/mdt srv_req_printfn */
 void target_print_req(void *seq_file, struct ptlrpc_request *req)
 {
         /* Called holding srv_lock with irqs disabled.
@@ -527,10 +527,10 @@ static int ptlrpc_lprocfs_svc_req_history_show(struct seq_file *s, void *iter)
                            req->rq_arrival_time.tv_sec,
                            req->rq_sent - req->rq_arrival_time.tv_sec,
                            req->rq_sent - req->rq_deadline);
-                if (svc->srv_request_history_print_fn == NULL)
+                if (svc->srv_req_printfn == NULL)
                         seq_printf(s, "\n");
                 else
-                        svc->srv_request_history_print_fn(s, srhi->srhi_req);
+                        svc->srv_req_printfn(s, srhi->srhi_req);
         }
 
         cfs_spin_unlock(&svc->srv_lock);
index 515014e..240b226 100644 (file)
@@ -463,7 +463,6 @@ int ptlrpc_send_reply(struct ptlrpc_request *req, int flags)
                 CERROR("not replying on NULL connection\n"); /* bug 9635 */
                 return -ENOTCONN;
         }
-        cfs_atomic_inc (&svc->srv_outstanding_replies);
         ptlrpc_rs_addref(rs);                   /* +1 ref for the network */
 
         rc = sptlrpc_svc_wrap_reply(req);
@@ -478,10 +477,8 @@ int ptlrpc_send_reply(struct ptlrpc_request *req, int flags)
                            &rs->rs_cb_id, conn, svc->srv_rep_portal,
                            req->rq_xid, req->rq_reply_off);
 out:
-        if (unlikely(rc != 0)) {
-                cfs_atomic_dec (&svc->srv_outstanding_replies);
+        if (unlikely(rc != 0))
                 ptlrpc_req_drop_rs(req);
-        }
         ptlrpc_connection_put(conn);
         return rc;
 }
index 56558e5..da4719f 100644 (file)
@@ -270,12 +270,12 @@ struct ptlrpc_reply_state *lustre_get_emerg_rs(struct ptlrpc_service *svc)
 {
         struct ptlrpc_reply_state *rs = NULL;
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rs_lock);
         /* See if we have anything in a pool, and wait if nothing */
         while (cfs_list_empty(&svc->srv_free_rs_list)) {
                 struct l_wait_info lwi;
                 int rc;
-                cfs_spin_unlock(&svc->srv_lock);
+                cfs_spin_unlock(&svc->srv_rs_lock);
                 /* If we cannot get anything for some long time, we better
                    bail out instead of waiting infinitely */
                 lwi = LWI_TIMEOUT(cfs_time_seconds(10), NULL, NULL);
@@ -284,13 +284,13 @@ struct ptlrpc_reply_state *lustre_get_emerg_rs(struct ptlrpc_service *svc)
                                   &lwi);
                 if (rc)
                         goto out;
-                cfs_spin_lock(&svc->srv_lock);
+                cfs_spin_lock(&svc->srv_rs_lock);
         }
 
         rs = cfs_list_entry(svc->srv_free_rs_list.next,
                             struct ptlrpc_reply_state, rs_list);
         cfs_list_del(&rs->rs_list);
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rs_lock);
         LASSERT(rs);
         memset(rs, 0, svc->srv_max_reply_size);
         rs->rs_service = svc;
@@ -305,9 +305,9 @@ void lustre_put_emerg_rs(struct ptlrpc_reply_state *rs)
 
         LASSERT(svc);
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rs_lock);
         cfs_list_add(&rs->rs_list, &svc->srv_free_rs_list);
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rs_lock);
         cfs_waitq_signal(&svc->srv_free_rs_waitq);
 }
 
index f3cf022..299398d 100644 (file)
@@ -286,9 +286,9 @@ static void rs_batch_add(struct rs_batch *b, struct ptlrpc_reply_state *rs)
         if (svc != b->rsb_svc || b->rsb_n_replies >= MAX_SCHEDULED) {
                 if (b->rsb_svc != NULL) {
                         rs_batch_dispatch(b);
-                        cfs_spin_unlock(&b->rsb_svc->srv_lock);
+                        cfs_spin_unlock(&b->rsb_svc->srv_rs_lock);
                 }
-                cfs_spin_lock(&svc->srv_lock);
+                cfs_spin_lock(&svc->srv_rs_lock);
                 b->rsb_svc = svc;
         }
         cfs_spin_lock(&rs->rs_lock);
@@ -313,7 +313,7 @@ static void rs_batch_fini(struct rs_batch *b)
 {
         if (b->rsb_svc != 0) {
                 rs_batch_dispatch(b);
-                cfs_spin_unlock(&b->rsb_svc->srv_lock);
+                cfs_spin_unlock(&b->rsb_svc->srv_rs_lock);
         }
 }
 
@@ -357,12 +357,12 @@ ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
 {
         ENTRY;
 
-        LASSERT_SPIN_LOCKED(&rs->rs_service->srv_lock);
+        LASSERT_SPIN_LOCKED(&rs->rs_service->srv_rs_lock);
         LASSERT_SPIN_LOCKED(&rs->rs_lock);
         LASSERT (rs->rs_difficult);
-        rs->rs_scheduled_ever = 1;              /* flag any notification attempt */
+        rs->rs_scheduled_ever = 1;  /* flag any notification attempt */
 
-        if (rs->rs_scheduled) {                  /* being set up or already notified */
+        if (rs->rs_scheduled) {     /* being set up or already notified */
                 EXIT;
                 return;
         }
@@ -454,7 +454,7 @@ ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc)
 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
                                             svc_handler_t h, char *name,
                                             struct proc_dir_entry *proc_entry,
-                                            svcreq_printfn_t prntfn,
+                                            svc_req_printfn_t prntfn,
                                             char *threadname)
 {
         return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
@@ -499,7 +499,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
                 int req_portal, int rep_portal, int watchdog_factor,
                 svc_handler_t handler, char *name,
                 cfs_proc_dir_entry_t *proc_entry,
-                svcreq_printfn_t svcreq_printfn,
+                svc_req_printfn_t svcreq_printfn,
                 int min_threads, int max_threads,
                 char *threadname, __u32 ctx_tags,
                 svc_hpreq_handler_t hp_handler)
@@ -522,6 +522,8 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
 
         service->srv_name = name;
         cfs_spin_lock_init(&service->srv_lock);
+        cfs_spin_lock_init(&service->srv_rq_lock);
+        cfs_spin_lock_init(&service->srv_rs_lock);
         CFS_INIT_LIST_HEAD(&service->srv_threads);
         cfs_waitq_init(&service->srv_waitq);
 
@@ -532,7 +534,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
         service->srv_req_portal = req_portal;
         service->srv_watchdog_factor = watchdog_factor;
         service->srv_handler = handler;
-        service->srv_request_history_print_fn = svcreq_printfn;
+        service->srv_req_printfn = svcreq_printfn;
         service->srv_request_seq = 1;           /* valid seq #s start at 1 */
         service->srv_request_max_cull_seq = 0;
         service->srv_threads_min = min_threads;
@@ -542,7 +544,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
         service->srv_hpreq_handler = hp_handler;
         service->srv_hpreq_ratio = PTLRPC_SVC_HP_RATIO;
         service->srv_hpreq_count = 0;
-        service->srv_n_hpreq = 0;
+        service->srv_n_active_hpreq = 0;
 
         rc = LNetSetLazyPortal(service->srv_req_portal);
         LASSERT (rc == 0);
@@ -648,9 +650,9 @@ void ptlrpc_server_active_request_inc(struct ptlrpc_request *req)
         struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
         struct ptlrpc_service *svc = rqbd->rqbd_service;
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
         svc->srv_n_active_reqs++;
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
 }
 
 /**
@@ -661,9 +663,9 @@ void ptlrpc_server_active_request_dec(struct ptlrpc_request *req)
         struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
         struct ptlrpc_service *svc = rqbd->rqbd_service;
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
         svc->srv_n_active_reqs--;
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
 }
 
 /**
@@ -705,7 +707,6 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
 
         cfs_spin_lock(&svc->srv_lock);
 
-        svc->srv_n_active_reqs--;
         cfs_list_add(&req->rq_list, &rqbd->rqbd_reqs);
 
         refcount = --(rqbd->rqbd_refcount);
@@ -776,8 +777,15 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
  * to finish a request: stop sending more early replies, and release
  * the request. should be called after we finished handling the request.
  */
-static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
+static void ptlrpc_server_finish_request(struct ptlrpc_service *svc,
+                                         struct ptlrpc_request *req)
 {
+        cfs_spin_lock(&svc->srv_rq_lock);
+        svc->srv_n_active_reqs--;
+        if (req->rq_hp)
+                svc->srv_n_active_hpreq--;
+        cfs_spin_unlock(&svc->srv_rq_lock);
+
         ptlrpc_server_drop_request(req);
 }
 
@@ -1295,12 +1303,12 @@ void ptlrpc_hpreq_reorder(struct ptlrpc_request *req)
         struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
         ENTRY;
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
         /* It may happen that the request is already taken for the processing
          * but still in the export list, do not re-add it into the HP list. */
         if (req->rq_phase == RQ_PHASE_NEW)
                 ptlrpc_hpreq_reorder_nolock(svc, req);
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
         EXIT;
 }
 
@@ -1332,7 +1340,7 @@ static int ptlrpc_server_request_add(struct ptlrpc_service *svc,
         if (rc < 0)
                 RETURN(rc);
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
         /* Before inserting the request into the queue, check if it is not
          * inserted yet, or even already handled -- it may happen due to
          * a racing ldlm_server_blocking_ast(). */
@@ -1343,22 +1351,74 @@ static int ptlrpc_server_request_add(struct ptlrpc_service *svc,
                         cfs_list_add_tail(&req->rq_list,
                                           &svc->srv_request_queue);
         }
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
 
         RETURN(0);
 }
 
 /**
+ * Decide whether a high priority request may be handled now.
+ * Callers may invoke this without any lock, but must hold
+ * ptlrpc_service::srv_rq_lock to get a reliable result.
+ */
+static int ptlrpc_server_allow_high(struct ptlrpc_service *svc, int force)
+{
+        if (force)
+                return 1;
+
+        if (svc->srv_n_active_reqs >= svc->srv_threads_running - 1)
+                return 0;
+
+        return cfs_list_empty(&svc->srv_request_queue) ||
+               svc->srv_hpreq_count < svc->srv_hpreq_ratio;
+}
+
+static int ptlrpc_server_high_pending(struct ptlrpc_service *svc, int force)
+{
+        return ptlrpc_server_allow_high(svc, force) &&
+               !cfs_list_empty(&svc->srv_request_hpq);
+}
+
+/**
  * Only allow normal priority requests on a service that has a high-priority
  * queue if forced (i.e. cleanup), if there are other high priority requests
  * already being processed (i.e. those threads can service more high-priority
  * requests), or if there are enough idle threads that a later thread can do
  * a high priority request.
+ * Callers may invoke this without any lock, but must hold
+ * ptlrpc_service::srv_rq_lock to get a reliable result.
  */
 static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
 {
-        return force || !svc->srv_hpreq_handler || svc->srv_n_hpreq > 0 ||
-                svc->srv_threads_running <= svc->srv_threads_started - 2;
+        if (force ||
+            svc->srv_n_active_reqs < svc->srv_threads_running - 2)
+                return 1;
+
+        if (svc->srv_n_active_reqs >= svc->srv_threads_running - 1)
+                return 0;
+
+        return svc->srv_n_active_hpreq > 0 || svc->srv_hpreq_handler == NULL;
+}
+
+static int ptlrpc_server_normal_pending(struct ptlrpc_service *svc, int force)
+{
+        return ptlrpc_server_allow_normal(svc, force) &&
+               !cfs_list_empty(&svc->srv_request_queue);
+}
+
+/**
+ * Returns true if there are requests available in incoming
+ * request queue for processing and it is allowed to fetch them.
+ * Callers may invoke this without any lock, but must hold
+ * ptlrpc_service::srv_rq_lock to get a reliable result.
+ * \see ptlrpc_server_allow_normal
+ * \see ptlrpc_server_allow_high
+ */
+static inline int
+ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force)
+{
+        return ptlrpc_server_high_pending(svc, force) ||
+               ptlrpc_server_normal_pending(svc, force);
 }
 
 /**
@@ -1369,34 +1429,24 @@ static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
 static struct ptlrpc_request *
 ptlrpc_server_request_get(struct ptlrpc_service *svc, int force)
 {
-        struct ptlrpc_request *req = NULL;
+        struct ptlrpc_request *req;
         ENTRY;
 
-        if (ptlrpc_server_allow_normal(svc, force) &&
-            !cfs_list_empty(&svc->srv_request_queue) &&
-            (cfs_list_empty(&svc->srv_request_hpq) ||
-             svc->srv_hpreq_count >= svc->srv_hpreq_ratio)) {
-                req = cfs_list_entry(svc->srv_request_queue.next,
-                                     struct ptlrpc_request, rq_list);
-                svc->srv_hpreq_count = 0;
-        } else if (!cfs_list_empty(&svc->srv_request_hpq)) {
+        if (ptlrpc_server_high_pending(svc, force)) {
                 req = cfs_list_entry(svc->srv_request_hpq.next,
                                      struct ptlrpc_request, rq_list);
                 svc->srv_hpreq_count++;
+                RETURN(req);
+
         }
-        RETURN(req);
-}
 
-/**
- * Returns true if there are requests available in incoming
- * request queue for processing and it is allowed to fetch them
- * \see ptlrpc_server_allow_normal
- */
-static int ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force)
-{
-        return ((ptlrpc_server_allow_normal(svc, force) &&
-                 !cfs_list_empty(&svc->srv_request_queue)) ||
-                !cfs_list_empty(&svc->srv_request_hpq));
+        if (ptlrpc_server_normal_pending(svc, force)) {
+                req = cfs_list_entry(svc->srv_request_queue.next,
+                                     struct ptlrpc_request, rq_list);
+                svc->srv_hpreq_count = 0;
+                RETURN(req);
+        }
+        RETURN(NULL);
 }
 
 /**
@@ -1424,6 +1474,7 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
         req = cfs_list_entry(svc->srv_req_in_queue.next,
                              struct ptlrpc_request, rq_list);
         cfs_list_del_init (&req->rq_list);
+        svc->srv_n_queued_reqs--;
         /* Consider this still a "queued" request as far as stats are
            concerned */
         cfs_spin_unlock(&svc->srv_lock);
@@ -1538,11 +1589,10 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
         RETURN(1);
 
 err_req:
-        cfs_spin_lock(&svc->srv_lock);
-        svc->srv_n_queued_reqs--;
+        cfs_spin_lock(&svc->srv_rq_lock);
         svc->srv_n_active_reqs++;
-        cfs_spin_unlock(&svc->srv_lock);
-        ptlrpc_server_finish_request(req);
+        cfs_spin_unlock(&svc->srv_rq_lock);
+        ptlrpc_server_finish_request(svc, req);
 
         RETURN(1);
 }
@@ -1566,17 +1616,17 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
 
         LASSERT(svc);
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
 #ifndef __KERNEL__
         /* !@%$# liblustre only has 1 thread */
         if (cfs_atomic_read(&svc->srv_n_difficult_replies) != 0) {
-                cfs_spin_unlock(&svc->srv_lock);
+                cfs_spin_unlock(&svc->srv_rq_lock);
                 RETURN(0);
         }
 #endif
         request = ptlrpc_server_request_get(svc, 0);
         if  (request == NULL) {
-                cfs_spin_unlock(&svc->srv_lock);
+                cfs_spin_unlock(&svc->srv_rq_lock);
                 RETURN(0);
         }
 
@@ -1588,27 +1638,26 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
 
         if (unlikely(fail_opc)) {
                 if (request->rq_export && request->rq_ops) {
-                        cfs_spin_unlock(&svc->srv_lock);
+                        cfs_spin_unlock(&svc->srv_rq_lock);
                         OBD_FAIL_TIMEOUT(fail_opc, 4);
-                        cfs_spin_lock(&svc->srv_lock);
+                        cfs_spin_lock(&svc->srv_rq_lock);
                         request = ptlrpc_server_request_get(svc, 0);
                         if  (request == NULL) {
-                                cfs_spin_unlock(&svc->srv_lock);
+                                cfs_spin_unlock(&svc->srv_rq_lock);
                                 RETURN(0);
                         }
                 }
         }
 
         cfs_list_del_init(&request->rq_list);
-        svc->srv_n_queued_reqs--;
         svc->srv_n_active_reqs++;
         if (request->rq_hp)
-                svc->srv_n_hpreq++;
+                svc->srv_n_active_hpreq++;
 
         /* The phase is changed under the lock here because we need to know
          * the request is under processing (see ptlrpc_hpreq_reorder()). */
         ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
 
         ptlrpc_hpreq_fini(request);
 
@@ -1740,11 +1789,7 @@ put_conn:
         }
 
 out_req:
-        cfs_spin_lock(&svc->srv_lock);
-        if (request->rq_hp)
-                svc->srv_n_hpreq--;
-        cfs_spin_unlock(&svc->srv_lock);
-        ptlrpc_server_finish_request(request);
+        ptlrpc_server_finish_request(svc, request);
 
         RETURN(1);
 }
@@ -1845,7 +1890,6 @@ ptlrpc_handle_rs (struct ptlrpc_reply_state *rs)
                 class_export_put (exp);
                 rs->rs_export = NULL;
                 ptlrpc_rs_decref (rs);
-                cfs_atomic_dec (&svc->srv_outstanding_replies);
                 if (cfs_atomic_dec_and_test(&svc->srv_n_difficult_replies) &&
                     svc->srv_is_stopping)
                         cfs_waitq_broadcast(&svc->srv_waitq);
@@ -1873,14 +1917,14 @@ ptlrpc_server_handle_reply(struct ptlrpc_service *svc)
         struct ptlrpc_reply_state *rs = NULL;
         ENTRY;
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rs_lock);
         if (!cfs_list_empty(&svc->srv_reply_queue)) {
                 rs = cfs_list_entry(svc->srv_reply_queue.prev,
                                     struct ptlrpc_reply_state,
                                     rs_list);
                 cfs_list_del_init(&rs->rs_list);
         }
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rs_lock);
         if (rs != NULL)
                 ptlrpc_handle_rs(rs);
         RETURN(rs != NULL);
@@ -1960,79 +2004,63 @@ ptlrpc_retry_rqbds(void *arg)
         return (-ETIMEDOUT);
 }
 
-/**
- *  Status bits to pass todo info from
- *  ptlrpc_main_check_event to ptlrpc_main.
- */
-#define PTLRPC_MAIN_STOPPING    0x01
-#define PTLRPC_MAIN_IN_REQ      0x02
-#define PTLRPC_MAIN_ACTIVE_REQ  0x04
-#define PTLRPC_MAIN_CHECK_TIMED 0x08
-#define PTLRPC_MAIN_REPOST      0x10
+static inline int
+ptlrpc_threads_enough(struct ptlrpc_service *svc)
+{
+        return svc->srv_n_active_reqs <
+               svc->srv_threads_running - 1 - (svc->srv_hpreq_handler != NULL);
+}
 
 /**
- * A container to share per-thread status variables between
- * ptlrpc_main_check_event and ptlrpc_main functions.
+ * Check whether more service threads may be created.
+ * Callers may invoke this without any lock, but must hold
+ * ptlrpc_service::srv_lock to get a reliable result.
  */
-struct ptlrpc_main_check_s {
-        /** todo info for the ptrlrpc_main */
-        int todo;
-        /** is this thread counted as running or not? */
-        int running;
-};
+static inline int
+ptlrpc_threads_increasable(struct ptlrpc_service *svc)
+{
+        return svc->srv_threads_running +
+               svc->srv_threads_starting < svc->srv_threads_max;
+}
 
 /**
- * Check whether current service thread has work to do.
+ * Check whether there are too many requests and more threads may be created.
  */
-static int ptlrpc_main_check_event(struct ptlrpc_thread *t,
-                                   struct ptlrpc_main_check_s *status)
+static inline int
+ptlrpc_threads_need_create(struct ptlrpc_service *svc)
 {
-        struct ptlrpc_service *svc = t->t_svc;
-        ENTRY;
+        return !ptlrpc_threads_enough(svc) && ptlrpc_threads_increasable(svc);
+}
 
-        status->todo = 0;
+static inline int
+ptlrpc_thread_stopping(struct ptlrpc_thread *thread)
+{
+        return (thread->t_flags & SVC_STOPPING) != 0 ||
+                thread->t_svc->srv_is_stopping;
+}
 
-        /* check the stop flags w/o any locking to make all
-         * concurrently running threads stop faster. */
-        if (unlikely((t->t_flags & SVC_STOPPING) ||
-                     svc->srv_is_stopping)) {
-                status->todo |= PTLRPC_MAIN_STOPPING;
-                goto out;
-        }
+static inline int
+ptlrpc_rqbd_pending(struct ptlrpc_service *svc)
+{
+        return !cfs_list_empty(&svc->srv_idle_rqbds) &&
+               svc->srv_rqbd_timeout == 0;
+}
 
-        cfs_spin_lock(&svc->srv_lock);
-        /* ptlrpc_server_request_pending() needs this thread to be
-         * counted as running. */
-        if (!status->running) {
-                svc->srv_threads_running++;
-                status->running = 1;
-        }
-        /* Process all incoming reqs before handling any */
-        if (!cfs_list_empty(&svc->srv_req_in_queue)) {
-                status->todo |= PTLRPC_MAIN_IN_REQ;
-        }
-        /* Don't handle regular requests in the last thread, in order
-         * to handle any incoming reqs, early replies, etc. */
-        if (ptlrpc_server_request_pending(svc, 0) &&
-            svc->srv_threads_running <= svc->srv_threads_started - 1) {
-                status->todo |= PTLRPC_MAIN_ACTIVE_REQ;
-        }
-        if (svc->srv_at_check) {
-                status->todo |= PTLRPC_MAIN_CHECK_TIMED;
-        }
-        if (!cfs_list_empty(&svc->srv_idle_rqbds) &&
-            svc->srv_rqbd_timeout == 0) {
-                status->todo |= PTLRPC_MAIN_REPOST;
-        }
-        /* count this thread as not running if it is going to sleep in
-         * the outer wait event */
-        if (!status->todo) {
-                svc->srv_threads_running--;
-                status->running = 0;
-        }
-        cfs_spin_unlock(&svc->srv_lock);
- out:
-        RETURN(status->todo);
+static inline int
+ptlrpc_at_check(struct ptlrpc_service *svc)
+{
+        return svc->srv_at_check;
+}
+
+/**
+ * Check whether requests are waiting for preprocessing.
+ * Callers may invoke this without any lock, but must hold
+ * ptlrpc_service::srv_lock to get a reliable result.
+ */
+static inline int
+ptlrpc_server_request_waiting(struct ptlrpc_service *svc)
+{
+        return !cfs_list_empty(&svc->srv_req_in_queue);
 }
 
 /**
@@ -2047,7 +2075,6 @@ static int ptlrpc_main(void *arg)
         struct ptlrpc_service  *svc = data->svc;
         struct ptlrpc_thread   *thread = data->thread;
         struct ptlrpc_reply_state *rs;
-        struct ptlrpc_main_check_s st;
 #ifdef WITH_GROUP_INFO
         cfs_group_info_t *ginfo = NULL;
 #endif
@@ -2111,11 +2138,17 @@ static int ptlrpc_main(void *arg)
         }
 
         cfs_spin_lock(&svc->srv_lock);
+
+        LASSERT((thread->t_flags & SVC_STARTING) != 0);
+        thread->t_flags &= ~SVC_STARTING;
+        svc->srv_threads_starting--;
+
         /* SVC_STOPPING may already be set here if someone else is trying
          * to stop the service while this new thread has been dynamically
          * forked. We still set SVC_RUNNING to let our creator know that
          * we are now running, however we will exit as soon as possible */
         thread->t_flags |= SVC_RUNNING;
+        svc->srv_threads_running++;
         cfs_spin_unlock(&svc->srv_lock);
 
         /*
@@ -2126,20 +2159,16 @@ static int ptlrpc_main(void *arg)
 
         thread->t_watchdog = lc_watchdog_add(CFS_GET_TIMEOUT(svc), NULL, NULL);
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rs_lock);
         cfs_list_add(&rs->rs_list, &svc->srv_free_rs_list);
-        cfs_spin_unlock(&svc->srv_lock);
         cfs_waitq_signal(&svc->srv_free_rs_waitq);
+        cfs_spin_unlock(&svc->srv_rs_lock);
 
         CDEBUG(D_NET, "service thread %d (#%d) started\n", thread->t_id,
                svc->srv_threads_running);
 
         /* XXX maintain a list of all managed devices: insert here */
-
-        st.running = 0;
-        st.todo = 0;
-
-        while (!(st.todo & PTLRPC_MAIN_STOPPING)) {
+        while (!ptlrpc_thread_stopping(thread)) {
                 /* Don't exit while there are replies to be handled */
                 struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
                                                      ptlrpc_retry_rqbds, svc);
@@ -2148,36 +2177,44 @@ static int ptlrpc_main(void *arg)
 
                 cfs_cond_resched();
 
-                l_wait_event_exclusive (svc->srv_waitq,
-                                        ptlrpc_main_check_event(thread, &st),
-                                        &lwi);
+                l_wait_event_exclusive(svc->srv_waitq,
+                                       ptlrpc_thread_stopping(thread) ||
+                                       ptlrpc_server_request_waiting(svc) ||
+                                       ptlrpc_server_request_pending(svc, 0) ||
+                                       ptlrpc_rqbd_pending(svc) ||
+                                       ptlrpc_at_check(svc), &lwi);
+
+                if (ptlrpc_thread_stopping(thread))
+                        break;
 
                 lc_watchdog_touch(thread->t_watchdog, CFS_GET_TIMEOUT(svc));
 
                 ptlrpc_check_rqbd_pool(svc);
 
-                if (svc->srv_threads_started < svc->srv_threads_max &&
-                    svc->srv_n_active_reqs >= (svc->srv_threads_started - 2))
+                if (ptlrpc_threads_need_create(svc)) {
                         /* Ignore return code - we tried... */
                         ptlrpc_start_thread(svc);
+                }
 
                 /* Process all incoming reqs before handling any */
-                if (st.todo & PTLRPC_MAIN_IN_REQ) {
+                if (ptlrpc_server_request_waiting(svc)) {
                         ptlrpc_server_handle_req_in(svc);
                         /* but limit ourselves in case of flood */
-                        if (counter++ < 1000)
+                        if (counter++ < 100)
                                 continue;
                         counter = 0;
                 }
-                if (st.todo & PTLRPC_MAIN_CHECK_TIMED) {
+
+                if (ptlrpc_at_check(svc))
                         ptlrpc_at_check_timed(svc);
-                }
-                if (st.todo & PTLRPC_MAIN_ACTIVE_REQ) {
+
+                if (ptlrpc_server_request_pending(svc, 0)) {
                         lu_context_enter(&env.le_ctx);
                         ptlrpc_server_handle_request(svc, thread);
                         lu_context_exit(&env.le_ctx);
                 }
-                if ((st.todo & PTLRPC_MAIN_REPOST) &&
+
+                if (ptlrpc_rqbd_pending(svc) &&
                     ptlrpc_server_post_idle_rqbds(svc) < 0) {
                         /* I just failed to repost request buffers.
                          * Wait for a timeout (unless something else
@@ -2204,10 +2241,20 @@ out:
                thread, thread->t_pid, thread->t_id, rc);
 
         cfs_spin_lock(&svc->srv_lock);
-        if (st.running)
+        if ((thread->t_flags & SVC_STARTING) != 0) {
+                svc->srv_threads_starting--;
+                thread->t_flags &= ~SVC_STARTING;
+        }
+
+        if ((thread->t_flags & SVC_RUNNING) != 0) {
+                /* must know immediately */
                 svc->srv_threads_running--;
-        thread->t_id = rc;
-        thread->t_flags = SVC_STOPPED;
+                thread->t_flags &= ~SVC_RUNNING;
+        }
+
+        thread->t_id    = rc;
+        thread->t_flags |= SVC_STOPPED;
+
         cfs_waitq_signal(&thread->t_ctl_waitq);
         cfs_spin_unlock(&svc->srv_lock);
 
@@ -2433,19 +2480,19 @@ int ptlrpc_start_thread(struct ptlrpc_service *svc)
         struct ptlrpc_svc_data d;
         struct ptlrpc_thread *thread;
         char name[32];
-        int id, rc;
+        int rc;
         ENTRY;
 
         CDEBUG(D_RPCTRACE, "%s started %d min %d max %d running %d\n",
-               svc->srv_name, svc->srv_threads_started, svc->srv_threads_min,
+               svc->srv_name, svc->srv_threads_running, svc->srv_threads_min,
                svc->srv_threads_max, svc->srv_threads_running);
 
         if (unlikely(svc->srv_is_stopping))
                 RETURN(-ESRCH);
 
-        if (unlikely(svc->srv_threads_started >= svc->srv_threads_max) ||
+        if (!ptlrpc_threads_increasable(svc) ||
             (OBD_FAIL_CHECK(OBD_FAIL_TGT_TOOMANY_THREADS) &&
-             svc->srv_threads_started == svc->srv_threads_min - 1))
+             svc->srv_threads_running == svc->srv_threads_min - 1))
                 RETURN(-EMFILE);
 
         OBD_ALLOC_PTR(thread);
@@ -2454,18 +2501,21 @@ int ptlrpc_start_thread(struct ptlrpc_service *svc)
         cfs_waitq_init(&thread->t_ctl_waitq);
 
         cfs_spin_lock(&svc->srv_lock);
-        if (svc->srv_threads_started >= svc->srv_threads_max) {
+        if (!ptlrpc_threads_increasable(svc)) {
                 cfs_spin_unlock(&svc->srv_lock);
                 OBD_FREE_PTR(thread);
                 RETURN(-EMFILE);
         }
+
+        svc->srv_threads_starting++;
+        thread->t_id    = svc->srv_threads_next_id++;
+        thread->t_flags |= SVC_STARTING;
+        thread->t_svc   = svc;
+
         cfs_list_add(&thread->t_link, &svc->srv_threads);
-        id = svc->srv_threads_started++;
         cfs_spin_unlock(&svc->srv_lock);
 
-        thread->t_svc = svc;
-        thread->t_id = id;
-        sprintf(name, "%s_%02d", svc->srv_thread_name, id);
+        sprintf(name, "%s_%02d", svc->srv_thread_name, thread->t_id);
         d.svc = svc;
         d.name = name;
         d.thread = thread;
@@ -2481,7 +2531,7 @@ int ptlrpc_start_thread(struct ptlrpc_service *svc)
 
                 cfs_spin_lock(&svc->srv_lock);
                 cfs_list_del(&thread->t_link);
-                --svc->srv_threads_started;
+                --svc->srv_threads_starting;
                 cfs_spin_unlock(&svc->srv_lock);
 
                 OBD_FREE(thread, sizeof(*thread));
@@ -2623,7 +2673,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         }
 
         /* schedule all outstanding replies to terminate them */
-        cfs_spin_lock(&service->srv_lock);
+        cfs_spin_lock(&service->srv_rs_lock);
         while (!cfs_list_empty(&service->srv_active_replies)) {
                 struct ptlrpc_reply_state *rs =
                         cfs_list_entry(service->srv_active_replies.next,
@@ -2632,7 +2682,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 ptlrpc_schedule_difficult_reply(rs);
                 cfs_spin_unlock(&rs->rs_lock);
         }
-        cfs_spin_unlock(&service->srv_lock);
+        cfs_spin_unlock(&service->srv_rs_lock);
 
         /* purge the request queue.  NB No new replies (rqbds all unlinked)
          * and no service threads, so I'm the only thread noodling the
@@ -2646,7 +2696,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 cfs_list_del(&req->rq_list);
                 service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;
-                ptlrpc_server_finish_request(req);
+                ptlrpc_server_finish_request(service, req);
         }
         while (ptlrpc_server_request_pending(service, 1)) {
                 struct ptlrpc_request *req;
@@ -2656,7 +2706,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;
                 ptlrpc_hpreq_fini(req);
-                ptlrpc_server_finish_request(req);
+                ptlrpc_server_finish_request(service, req);
         }
         LASSERT(service->srv_n_queued_reqs == 0);
         LASSERT(service->srv_n_active_reqs == 0);
@@ -2718,9 +2768,9 @@ int ptlrpc_service_health_check(struct ptlrpc_service *svc)
 
         cfs_gettimeofday(&right_now);
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
         if (!ptlrpc_server_request_pending(svc, 1)) {
-                cfs_spin_unlock(&svc->srv_lock);
+                cfs_spin_unlock(&svc->srv_rq_lock);
                 return 0;
         }
 
@@ -2732,7 +2782,7 @@ int ptlrpc_service_health_check(struct ptlrpc_service *svc)
                 request = cfs_list_entry(svc->srv_request_queue.next,
                                          struct ptlrpc_request, rq_list);
         timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL);
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
 
         if ((timediff / ONE_MILLION) > (AT_OFF ? obd_timeout * 3/2 :
                                         at_max)) {