Whamcloud - gitweb
b=23994 hide EMFILE error in ptlrpc_start_threads().
[fs/lustre-release.git] / lustre / ptlrpc / service.c
index 5f67efa..79e0e37 100644 (file)
@@ -286,9 +286,9 @@ static void rs_batch_add(struct rs_batch *b, struct ptlrpc_reply_state *rs)
         if (svc != b->rsb_svc || b->rsb_n_replies >= MAX_SCHEDULED) {
                 if (b->rsb_svc != NULL) {
                         rs_batch_dispatch(b);
-                        cfs_spin_unlock(&b->rsb_svc->srv_lock);
+                        cfs_spin_unlock(&b->rsb_svc->srv_rs_lock);
                 }
-                cfs_spin_lock(&svc->srv_lock);
+                cfs_spin_lock(&svc->srv_rs_lock);
                 b->rsb_svc = svc;
         }
         cfs_spin_lock(&rs->rs_lock);
@@ -313,7 +313,7 @@ static void rs_batch_fini(struct rs_batch *b)
 {
         if (b->rsb_svc != 0) {
                 rs_batch_dispatch(b);
-                cfs_spin_unlock(&b->rsb_svc->srv_lock);
+                cfs_spin_unlock(&b->rsb_svc->srv_rs_lock);
         }
 }
 
@@ -357,12 +357,12 @@ ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
 {
         ENTRY;
 
-        LASSERT_SPIN_LOCKED(&rs->rs_service->srv_lock);
+        LASSERT_SPIN_LOCKED(&rs->rs_service->srv_rs_lock);
         LASSERT_SPIN_LOCKED(&rs->rs_lock);
         LASSERT (rs->rs_difficult);
-        rs->rs_scheduled_ever = 1;              /* flag any notification attempt */
+        rs->rs_scheduled_ever = 1;  /* flag any notification attempt */
 
-        if (rs->rs_scheduled) {                  /* being set up or already notified */
+        if (rs->rs_scheduled) {     /* being set up or already notified */
                 EXIT;
                 return;
         }
@@ -454,7 +454,7 @@ ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc)
 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
                                             svc_handler_t h, char *name,
                                             struct proc_dir_entry *proc_entry,
-                                            svcreq_printfn_t prntfn,
+                                            svc_req_printfn_t prntfn,
                                             char *threadname)
 {
         return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
@@ -499,7 +499,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
                 int req_portal, int rep_portal, int watchdog_factor,
                 svc_handler_t handler, char *name,
                 cfs_proc_dir_entry_t *proc_entry,
-                svcreq_printfn_t svcreq_printfn,
+                svc_req_printfn_t svcreq_printfn,
                 int min_threads, int max_threads,
                 char *threadname, __u32 ctx_tags,
                 svc_hpreq_handler_t hp_handler)
@@ -522,6 +522,8 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
 
         service->srv_name = name;
         cfs_spin_lock_init(&service->srv_lock);
+        cfs_spin_lock_init(&service->srv_rq_lock);
+        cfs_spin_lock_init(&service->srv_rs_lock);
         CFS_INIT_LIST_HEAD(&service->srv_threads);
         cfs_waitq_init(&service->srv_waitq);
 
@@ -532,7 +534,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
         service->srv_req_portal = req_portal;
         service->srv_watchdog_factor = watchdog_factor;
         service->srv_handler = handler;
-        service->srv_request_history_print_fn = svcreq_printfn;
+        service->srv_req_printfn = svcreq_printfn;
         service->srv_request_seq = 1;           /* valid seq #s start at 1 */
         service->srv_request_max_cull_seq = 0;
         service->srv_threads_min = min_threads;
@@ -542,7 +544,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
         service->srv_hpreq_handler = hp_handler;
         service->srv_hpreq_ratio = PTLRPC_SVC_HP_RATIO;
         service->srv_hpreq_count = 0;
-        service->srv_n_hpreq = 0;
+        service->srv_n_active_hpreq = 0;
 
         rc = LNetSetLazyPortal(service->srv_req_portal);
         LASSERT (rc == 0);
@@ -648,9 +650,9 @@ void ptlrpc_server_active_request_inc(struct ptlrpc_request *req)
         struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
         struct ptlrpc_service *svc = rqbd->rqbd_service;
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
         svc->srv_n_active_reqs++;
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
 }
 
 /**
@@ -661,9 +663,9 @@ void ptlrpc_server_active_request_dec(struct ptlrpc_request *req)
         struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
         struct ptlrpc_service *svc = rqbd->rqbd_service;
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
         svc->srv_n_active_reqs--;
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
 }
 
 /**
@@ -705,7 +707,6 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
 
         cfs_spin_lock(&svc->srv_lock);
 
-        svc->srv_n_active_reqs--;
         cfs_list_add(&req->rq_list, &rqbd->rqbd_reqs);
 
         refcount = --(rqbd->rqbd_refcount);
@@ -776,8 +777,15 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
  * to finish a request: stop sending more early replies, and release
  * the request. should be called after we finished handling the request.
  */
-static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
+static void ptlrpc_server_finish_request(struct ptlrpc_service *svc,
+                                         struct ptlrpc_request *req)
 {
+        cfs_spin_lock(&svc->srv_rq_lock);
+        svc->srv_n_active_reqs--;
+        if (req->rq_hp)
+                svc->srv_n_active_hpreq--;
+        cfs_spin_unlock(&svc->srv_rq_lock);
+
         ptlrpc_server_drop_request(req);
 }
 
@@ -1237,10 +1245,10 @@ static int ptlrpc_hpreq_init(struct ptlrpc_service *svc,
                         RETURN(rc);
         }
         if (req->rq_export && req->rq_ops) {
-                cfs_spin_lock(&req->rq_export->exp_lock);
+                cfs_spin_lock_bh(&req->rq_export->exp_rpc_lock);
                 cfs_list_add(&req->rq_exp_list,
                              &req->rq_export->exp_queued_rpc);
-                cfs_spin_unlock(&req->rq_export->exp_lock);
+                cfs_spin_unlock_bh(&req->rq_export->exp_rpc_lock);
         }
 
         RETURN(0);
@@ -1251,9 +1259,9 @@ static void ptlrpc_hpreq_fini(struct ptlrpc_request *req)
 {
         ENTRY;
         if (req->rq_export && req->rq_ops) {
-                cfs_spin_lock(&req->rq_export->exp_lock);
+                cfs_spin_lock_bh(&req->rq_export->exp_rpc_lock);
                 cfs_list_del_init(&req->rq_exp_list);
-                cfs_spin_unlock(&req->rq_export->exp_lock);
+                cfs_spin_unlock_bh(&req->rq_export->exp_rpc_lock);
         }
         EXIT;
 }
@@ -1295,12 +1303,12 @@ void ptlrpc_hpreq_reorder(struct ptlrpc_request *req)
         struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
         ENTRY;
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
         /* It may happen that the request is already taken for the processing
          * but still in the export list, do not re-add it into the HP list. */
         if (req->rq_phase == RQ_PHASE_NEW)
                 ptlrpc_hpreq_reorder_nolock(svc, req);
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
         EXIT;
 }
 
@@ -1332,7 +1340,7 @@ static int ptlrpc_server_request_add(struct ptlrpc_service *svc,
         if (rc < 0)
                 RETURN(rc);
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
         /* Before inserting the request into the queue, check if it is not
          * inserted yet, or even already handled -- it may happen due to
          * a racing ldlm_server_blocking_ast(). */
@@ -1343,22 +1351,74 @@ static int ptlrpc_server_request_add(struct ptlrpc_service *svc,
                         cfs_list_add_tail(&req->rq_list,
                                           &svc->srv_request_queue);
         }
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
 
         RETURN(0);
 }
 
 /**
+ * Allow handling of a high-priority request.
+ * Users can call it w/o any lock but need to hold ptlrpc_service::srv_rq_lock
+ * to get a reliable result.
+ */
+static int ptlrpc_server_allow_high(struct ptlrpc_service *svc, int force)
+{
+        if (force)
+                return 1;
+
+        if (svc->srv_n_active_reqs >= svc->srv_threads_running - 1)
+                return 0;
+
+        return cfs_list_empty(&svc->srv_request_queue) ||
+               svc->srv_hpreq_count < svc->srv_hpreq_ratio;
+}
+
+static int ptlrpc_server_high_pending(struct ptlrpc_service *svc, int force)
+{
+        return ptlrpc_server_allow_high(svc, force) &&
+               !cfs_list_empty(&svc->srv_request_hpq);
+}
+
+/**
  * Only allow normal priority requests on a service that has a high-priority
  * queue if forced (i.e. cleanup), if there are other high priority requests
  * already being processed (i.e. those threads can service more high-priority
  * requests), or if there are enough idle threads that a later thread can do
  * a high priority request.
+ * Users can call it w/o any lock but need to hold ptlrpc_service::srv_rq_lock
+ * to get a reliable result.
  */
 static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
 {
-        return force || !svc->srv_hpreq_handler || svc->srv_n_hpreq > 0 ||
-                svc->srv_threads_running <= svc->srv_threads_started - 2;
+        if (force ||
+            svc->srv_n_active_reqs < svc->srv_threads_running - 2)
+                return 1;
+
+        if (svc->srv_n_active_reqs >= svc->srv_threads_running - 1)
+                return 0;
+
+        return svc->srv_n_active_hpreq > 0 || svc->srv_hpreq_handler == NULL;
+}
+
+static int ptlrpc_server_normal_pending(struct ptlrpc_service *svc, int force)
+{
+        return ptlrpc_server_allow_normal(svc, force) &&
+               !cfs_list_empty(&svc->srv_request_queue);
+}
+
+/**
+ * Returns true if there are requests available in incoming
+ * request queue for processing and it is allowed to fetch them.
+ * Users can call it w/o any lock but need to hold ptlrpc_service::srv_rq_lock
+ * to get a reliable result.
+ * \see ptlrpc_server_allow_normal
+ * \see ptlrpc_server_allow_high
+ */
+static inline int
+ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force)
+{
+        return ptlrpc_server_high_pending(svc, force) ||
+               ptlrpc_server_normal_pending(svc, force);
 }
 
 /**
@@ -1369,34 +1429,24 @@ static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
 static struct ptlrpc_request *
 ptlrpc_server_request_get(struct ptlrpc_service *svc, int force)
 {
-        struct ptlrpc_request *req = NULL;
+        struct ptlrpc_request *req;
         ENTRY;
 
-        if (ptlrpc_server_allow_normal(svc, force) &&
-            !cfs_list_empty(&svc->srv_request_queue) &&
-            (cfs_list_empty(&svc->srv_request_hpq) ||
-             svc->srv_hpreq_count >= svc->srv_hpreq_ratio)) {
-                req = cfs_list_entry(svc->srv_request_queue.next,
-                                     struct ptlrpc_request, rq_list);
-                svc->srv_hpreq_count = 0;
-        } else if (!cfs_list_empty(&svc->srv_request_hpq)) {
+        if (ptlrpc_server_high_pending(svc, force)) {
                 req = cfs_list_entry(svc->srv_request_hpq.next,
                                      struct ptlrpc_request, rq_list);
                 svc->srv_hpreq_count++;
+                RETURN(req);
+
         }
-        RETURN(req);
-}
 
-/**
- * Returns true if there are requests available in incoming
- * request queue for processing and it is allowed to fetch them
- * \see ptlrpc_server_allow_normal
- */
-static int ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force)
-{
-        return ((ptlrpc_server_allow_normal(svc, force) &&
-                 !cfs_list_empty(&svc->srv_request_queue)) ||
-                !cfs_list_empty(&svc->srv_request_hpq));
+        if (ptlrpc_server_normal_pending(svc, force)) {
+                req = cfs_list_entry(svc->srv_request_queue.next,
+                                     struct ptlrpc_request, rq_list);
+                svc->srv_hpreq_count = 0;
+                RETURN(req);
+        }
+        RETURN(NULL);
 }
 
 /**
@@ -1424,6 +1474,7 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
         req = cfs_list_entry(svc->srv_req_in_queue.next,
                              struct ptlrpc_request, rq_list);
         cfs_list_del_init (&req->rq_list);
+        svc->srv_n_queued_reqs--;
         /* Consider this still a "queued" request as far as stats are
            concerned */
         cfs_spin_unlock(&svc->srv_lock);
@@ -1538,11 +1589,10 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
         RETURN(1);
 
 err_req:
-        cfs_spin_lock(&svc->srv_lock);
-        svc->srv_n_queued_reqs--;
+        cfs_spin_lock(&svc->srv_rq_lock);
         svc->srv_n_active_reqs++;
-        cfs_spin_unlock(&svc->srv_lock);
-        ptlrpc_server_finish_request(req);
+        cfs_spin_unlock(&svc->srv_rq_lock);
+        ptlrpc_server_finish_request(svc, req);
 
         RETURN(1);
 }
@@ -1566,17 +1616,17 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
 
         LASSERT(svc);
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
 #ifndef __KERNEL__
         /* !@%$# liblustre only has 1 thread */
         if (cfs_atomic_read(&svc->srv_n_difficult_replies) != 0) {
-                cfs_spin_unlock(&svc->srv_lock);
+                cfs_spin_unlock(&svc->srv_rq_lock);
                 RETURN(0);
         }
 #endif
         request = ptlrpc_server_request_get(svc, 0);
         if  (request == NULL) {
-                cfs_spin_unlock(&svc->srv_lock);
+                cfs_spin_unlock(&svc->srv_rq_lock);
                 RETURN(0);
         }
 
@@ -1588,27 +1638,26 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
 
         if (unlikely(fail_opc)) {
                 if (request->rq_export && request->rq_ops) {
-                        cfs_spin_unlock(&svc->srv_lock);
+                        cfs_spin_unlock(&svc->srv_rq_lock);
                         OBD_FAIL_TIMEOUT(fail_opc, 4);
-                        cfs_spin_lock(&svc->srv_lock);
+                        cfs_spin_lock(&svc->srv_rq_lock);
                         request = ptlrpc_server_request_get(svc, 0);
                         if  (request == NULL) {
-                                cfs_spin_unlock(&svc->srv_lock);
+                                cfs_spin_unlock(&svc->srv_rq_lock);
                                 RETURN(0);
                         }
                 }
         }
 
         cfs_list_del_init(&request->rq_list);
-        svc->srv_n_queued_reqs--;
         svc->srv_n_active_reqs++;
         if (request->rq_hp)
-                svc->srv_n_hpreq++;
+                svc->srv_n_active_hpreq++;
 
         /* The phase is changed under the lock here because we need to know
          * the request is under processing (see ptlrpc_hpreq_reorder()). */
         ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
 
         ptlrpc_hpreq_fini(request);
 
@@ -1740,11 +1789,7 @@ put_conn:
         }
 
 out_req:
-        cfs_spin_lock(&svc->srv_lock);
-        if (request->rq_hp)
-                svc->srv_n_hpreq--;
-        cfs_spin_unlock(&svc->srv_lock);
-        ptlrpc_server_finish_request(request);
+        ptlrpc_server_finish_request(svc, request);
 
         RETURN(1);
 }
@@ -1845,7 +1890,6 @@ ptlrpc_handle_rs (struct ptlrpc_reply_state *rs)
                 class_export_put (exp);
                 rs->rs_export = NULL;
                 ptlrpc_rs_decref (rs);
-                cfs_atomic_dec (&svc->srv_outstanding_replies);
                 if (cfs_atomic_dec_and_test(&svc->srv_n_difficult_replies) &&
                     svc->srv_is_stopping)
                         cfs_waitq_broadcast(&svc->srv_waitq);
@@ -1873,14 +1917,14 @@ ptlrpc_server_handle_reply(struct ptlrpc_service *svc)
         struct ptlrpc_reply_state *rs = NULL;
         ENTRY;
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rs_lock);
         if (!cfs_list_empty(&svc->srv_reply_queue)) {
                 rs = cfs_list_entry(svc->srv_reply_queue.prev,
                                     struct ptlrpc_reply_state,
                                     rs_list);
                 cfs_list_del_init(&rs->rs_list);
         }
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rs_lock);
         if (rs != NULL)
                 ptlrpc_handle_rs(rs);
         RETURN(rs != NULL);
@@ -1960,79 +2004,63 @@ ptlrpc_retry_rqbds(void *arg)
         return (-ETIMEDOUT);
 }
 
-/**
- *  Status bits to pass todo info from
- *  ptlrpc_main_check_event to ptlrpc_main.
- */
-#define PTLRPC_MAIN_STOPPING    0x01
-#define PTLRPC_MAIN_IN_REQ      0x02
-#define PTLRPC_MAIN_ACTIVE_REQ  0x04
-#define PTLRPC_MAIN_CHECK_TIMED 0x08
-#define PTLRPC_MAIN_REPOST      0x10
+static inline int
+ptlrpc_threads_enough(struct ptlrpc_service *svc)
+{
+        return svc->srv_n_active_reqs <
+               svc->srv_threads_running - 1 - (svc->srv_hpreq_handler != NULL);
+}
 
 /**
- * A container to share per-thread status variables between
- * ptlrpc_main_check_event and ptlrpc_main functions.
+ * Whether it is allowed to create more threads.
+ * Users can call it w/o any lock but need to hold ptlrpc_service::srv_lock to
+ * get a reliable result.
  */
-struct ptlrpc_main_check_s {
-        /** todo info for the ptrlrpc_main */
-        int todo;
-        /** is this thread counted as running or not? */
-        int running;
-};
+static inline int
+ptlrpc_threads_increasable(struct ptlrpc_service *svc)
+{
+        return svc->srv_threads_running +
+               svc->srv_threads_starting < svc->srv_threads_max;
+}
 
 /**
- * Check whether current service thread has work to do.
+ * too many requests and allowed to create more threads
  */
-static int ptlrpc_main_check_event(struct ptlrpc_thread *t,
-                                   struct ptlrpc_main_check_s *status)
+static inline int
+ptlrpc_threads_need_create(struct ptlrpc_service *svc)
 {
-        struct ptlrpc_service *svc = t->t_svc;
-        ENTRY;
+        return !ptlrpc_threads_enough(svc) && ptlrpc_threads_increasable(svc);
+}
 
-        status->todo = 0;
+static inline int
+ptlrpc_thread_stopping(struct ptlrpc_thread *thread)
+{
+        return (thread->t_flags & SVC_STOPPING) != 0 ||
+                thread->t_svc->srv_is_stopping;
+}
 
-        /* check the stop flags w/o any locking to make all
-         * concurrently running threads stop faster. */
-        if (unlikely((t->t_flags & SVC_STOPPING) ||
-                     svc->srv_is_stopping)) {
-                status->todo |= PTLRPC_MAIN_STOPPING;
-                goto out;
-        }
+static inline int
+ptlrpc_rqbd_pending(struct ptlrpc_service *svc)
+{
+        return !cfs_list_empty(&svc->srv_idle_rqbds) &&
+               svc->srv_rqbd_timeout == 0;
+}
 
-        cfs_spin_lock(&svc->srv_lock);
-        /* ptlrpc_server_request_pending() needs this thread to be
-         * counted as running. */
-        if (!status->running) {
-                svc->srv_threads_running++;
-                status->running = 1;
-        }
-        /* Process all incoming reqs before handling any */
-        if (!cfs_list_empty(&svc->srv_req_in_queue)) {
-                status->todo |= PTLRPC_MAIN_IN_REQ;
-        }
-        /* Don't handle regular requests in the last thread, in order
-         * to handle any incoming reqs, early replies, etc. */
-        if (ptlrpc_server_request_pending(svc, 0) &&
-            svc->srv_threads_running <= svc->srv_threads_started - 1) {
-                status->todo |= PTLRPC_MAIN_ACTIVE_REQ;
-        }
-        if (svc->srv_at_check) {
-                status->todo |= PTLRPC_MAIN_CHECK_TIMED;
-        }
-        if (!cfs_list_empty(&svc->srv_idle_rqbds) &&
-            svc->srv_rqbd_timeout == 0) {
-                status->todo |= PTLRPC_MAIN_REPOST;
-        }
-        /* count this thread as not running if it is going to sleep in
-         * the outer wait event */
-        if (!status->todo) {
-                svc->srv_threads_running--;
-                status->running = 0;
-        }
-        cfs_spin_unlock(&svc->srv_lock);
- out:
-        RETURN(status->todo);
+static inline int
+ptlrpc_at_check(struct ptlrpc_service *svc)
+{
+        return svc->srv_at_check;
+}
+
+/**
+ * Whether any requests are waiting for preprocessing.
+ * Users can call it w/o any lock but need to hold ptlrpc_service::srv_lock to
+ * get a reliable result.
+ */
+static inline int
+ptlrpc_server_request_waiting(struct ptlrpc_service *svc)
+{
+        return !cfs_list_empty(&svc->srv_req_in_queue);
 }
 
 /**
@@ -2046,9 +2074,7 @@ static int ptlrpc_main(void *arg)
         struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
         struct ptlrpc_service  *svc = data->svc;
         struct ptlrpc_thread   *thread = data->thread;
-        struct obd_device      *dev = data->dev;
         struct ptlrpc_reply_state *rs;
-        struct ptlrpc_main_check_s st;
 #ifdef WITH_GROUP_INFO
         cfs_group_info_t *ginfo = NULL;
 #endif
@@ -2112,11 +2138,17 @@ static int ptlrpc_main(void *arg)
         }
 
         cfs_spin_lock(&svc->srv_lock);
+
+        LASSERT((thread->t_flags & SVC_STARTING) != 0);
+        thread->t_flags &= ~SVC_STARTING;
+        svc->srv_threads_starting--;
+
         /* SVC_STOPPING may already be set here if someone else is trying
          * to stop the service while this new thread has been dynamically
          * forked. We still set SVC_RUNNING to let our creator know that
          * we are now running, however we will exit as soon as possible */
         thread->t_flags |= SVC_RUNNING;
+        svc->srv_threads_running++;
         cfs_spin_unlock(&svc->srv_lock);
 
         /*
@@ -2127,20 +2159,16 @@ static int ptlrpc_main(void *arg)
 
         thread->t_watchdog = lc_watchdog_add(CFS_GET_TIMEOUT(svc), NULL, NULL);
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rs_lock);
         cfs_list_add(&rs->rs_list, &svc->srv_free_rs_list);
-        cfs_spin_unlock(&svc->srv_lock);
         cfs_waitq_signal(&svc->srv_free_rs_waitq);
+        cfs_spin_unlock(&svc->srv_rs_lock);
 
         CDEBUG(D_NET, "service thread %d (#%d) started\n", thread->t_id,
                svc->srv_threads_running);
 
         /* XXX maintain a list of all managed devices: insert here */
-
-        st.running = 0;
-        st.todo = 0;
-
-        while (!(st.todo & PTLRPC_MAIN_STOPPING)) {
+        while (!ptlrpc_thread_stopping(thread)) {
                 /* Don't exit while there are replies to be handled */
                 struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
                                                      ptlrpc_retry_rqbds, svc);
@@ -2149,36 +2177,44 @@ static int ptlrpc_main(void *arg)
 
                 cfs_cond_resched();
 
-                l_wait_event_exclusive (svc->srv_waitq,
-                                        ptlrpc_main_check_event(thread, &st),
-                                        &lwi);
+                l_wait_event_exclusive(svc->srv_waitq,
+                                       ptlrpc_thread_stopping(thread) ||
+                                       ptlrpc_server_request_waiting(svc) ||
+                                       ptlrpc_server_request_pending(svc, 0) ||
+                                       ptlrpc_rqbd_pending(svc) ||
+                                       ptlrpc_at_check(svc), &lwi);
+
+                if (ptlrpc_thread_stopping(thread))
+                        break;
 
                 lc_watchdog_touch(thread->t_watchdog, CFS_GET_TIMEOUT(svc));
 
                 ptlrpc_check_rqbd_pool(svc);
 
-                if (svc->srv_threads_started < svc->srv_threads_max &&
-                    svc->srv_n_active_reqs >= (svc->srv_threads_started - 1)) 
+                if (ptlrpc_threads_need_create(svc)) {
                         /* Ignore return code - we tried... */
-                        ptlrpc_start_thread(dev, svc);
+                        ptlrpc_start_thread(svc);
+                }
 
                 /* Process all incoming reqs before handling any */
-                if (st.todo & PTLRPC_MAIN_IN_REQ) {
+                if (ptlrpc_server_request_waiting(svc)) {
                         ptlrpc_server_handle_req_in(svc);
                         /* but limit ourselves in case of flood */
-                        if (counter++ < 1000)
+                        if (counter++ < 100)
                                 continue;
                         counter = 0;
                 }
-                if (st.todo & PTLRPC_MAIN_CHECK_TIMED) {
+
+                if (ptlrpc_at_check(svc))
                         ptlrpc_at_check_timed(svc);
-                }
-                if (st.todo & PTLRPC_MAIN_ACTIVE_REQ) {
+
+                if (ptlrpc_server_request_pending(svc, 0)) {
                         lu_context_enter(&env.le_ctx);
                         ptlrpc_server_handle_request(svc, thread);
                         lu_context_exit(&env.le_ctx);
                 }
-                if ((st.todo & PTLRPC_MAIN_REPOST) &&
+
+                if (ptlrpc_rqbd_pending(svc) &&
                     ptlrpc_server_post_idle_rqbds(svc) < 0) {
                         /* I just failed to repost request buffers.
                          * Wait for a timeout (unless something else
@@ -2205,10 +2241,20 @@ out:
                thread, thread->t_pid, thread->t_id, rc);
 
         cfs_spin_lock(&svc->srv_lock);
-        if (st.running)
+        if ((thread->t_flags & SVC_STARTING) != 0) {
+                svc->srv_threads_starting--;
+                thread->t_flags &= ~SVC_STARTING;
+        }
+
+        if ((thread->t_flags & SVC_RUNNING) != 0) {
+                /* must know immediately */
                 svc->srv_threads_running--;
-        thread->t_id = rc;
-        thread->t_flags = SVC_STOPPED;
+                thread->t_flags &= ~SVC_RUNNING;
+        }
+
+        thread->t_id    = rc;
+        thread->t_flags |= SVC_STOPPED;
+
         cfs_waitq_signal(&thread->t_ctl_waitq);
         cfs_spin_unlock(&svc->srv_lock);
 
@@ -2405,7 +2451,7 @@ void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
         EXIT;
 }
 
-int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
+int ptlrpc_start_threads(struct ptlrpc_service *svc)
 {
         int i, rc = 0;
         ENTRY;
@@ -2414,10 +2460,12 @@ int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
            ptlrpc_server_handle_request */
         LASSERT(svc->srv_threads_min >= 2);
         for (i = 0; i < svc->srv_threads_min; i++) {
-                rc = ptlrpc_start_thread(dev, svc);
+                rc = ptlrpc_start_thread(svc);
                 /* We have enough threads, don't start more.  b=15759 */
-                if (rc == -EMFILE)
+                if (rc == -EMFILE) {
+                        rc = 0;
                         break;
+                }
                 if (rc) {
                         CERROR("cannot start %s thread #%d: rc %d\n",
                                svc->srv_thread_name, i, rc);
@@ -2428,25 +2476,25 @@ int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
         RETURN(rc);
 }
 
-int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc)
+int ptlrpc_start_thread(struct ptlrpc_service *svc)
 {
         struct l_wait_info lwi = { 0 };
         struct ptlrpc_svc_data d;
         struct ptlrpc_thread *thread;
         char name[32];
-        int id, rc;
+        int rc;
         ENTRY;
 
         CDEBUG(D_RPCTRACE, "%s started %d min %d max %d running %d\n",
-               svc->srv_name, svc->srv_threads_started, svc->srv_threads_min,
+               svc->srv_name, svc->srv_threads_running, svc->srv_threads_min,
                svc->srv_threads_max, svc->srv_threads_running);
 
         if (unlikely(svc->srv_is_stopping))
                 RETURN(-ESRCH);
 
-        if (unlikely(svc->srv_threads_started >= svc->srv_threads_max) ||
+        if (!ptlrpc_threads_increasable(svc) ||
             (OBD_FAIL_CHECK(OBD_FAIL_TGT_TOOMANY_THREADS) &&
-             svc->srv_threads_started == svc->srv_threads_min - 1))
+             svc->srv_threads_running == svc->srv_threads_min - 1))
                 RETURN(-EMFILE);
 
         OBD_ALLOC_PTR(thread);
@@ -2455,19 +2503,21 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc)
         cfs_waitq_init(&thread->t_ctl_waitq);
 
         cfs_spin_lock(&svc->srv_lock);
-        if (svc->srv_threads_started >= svc->srv_threads_max) {
+        if (!ptlrpc_threads_increasable(svc)) {
                 cfs_spin_unlock(&svc->srv_lock);
                 OBD_FREE_PTR(thread);
                 RETURN(-EMFILE);
         }
+
+        svc->srv_threads_starting++;
+        thread->t_id    = svc->srv_threads_next_id++;
+        thread->t_flags |= SVC_STARTING;
+        thread->t_svc   = svc;
+
         cfs_list_add(&thread->t_link, &svc->srv_threads);
-        id = svc->srv_threads_started++;
         cfs_spin_unlock(&svc->srv_lock);
 
-        thread->t_svc = svc;
-        thread->t_id = id;
-        sprintf(name, "%s_%02d", svc->srv_thread_name, id);
-        d.dev = dev;
+        sprintf(name, "%s_%02d", svc->srv_thread_name, thread->t_id);
         d.svc = svc;
         d.name = name;
         d.thread = thread;
@@ -2483,7 +2533,7 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc)
 
                 cfs_spin_lock(&svc->srv_lock);
                 cfs_list_del(&thread->t_link);
-                --svc->srv_threads_started;
+                --svc->srv_threads_starting;
                 cfs_spin_unlock(&svc->srv_lock);
 
                 OBD_FREE(thread, sizeof(*thread));
@@ -2625,7 +2675,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         }
 
         /* schedule all outstanding replies to terminate them */
-        cfs_spin_lock(&service->srv_lock);
+        cfs_spin_lock(&service->srv_rs_lock);
         while (!cfs_list_empty(&service->srv_active_replies)) {
                 struct ptlrpc_reply_state *rs =
                         cfs_list_entry(service->srv_active_replies.next,
@@ -2634,7 +2684,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 ptlrpc_schedule_difficult_reply(rs);
                 cfs_spin_unlock(&rs->rs_lock);
         }
-        cfs_spin_unlock(&service->srv_lock);
+        cfs_spin_unlock(&service->srv_rs_lock);
 
         /* purge the request queue.  NB No new replies (rqbds all unlinked)
          * and no service threads, so I'm the only thread noodling the
@@ -2648,7 +2698,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 cfs_list_del(&req->rq_list);
                 service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;
-                ptlrpc_server_finish_request(req);
+                ptlrpc_server_finish_request(service, req);
         }
         while (ptlrpc_server_request_pending(service, 1)) {
                 struct ptlrpc_request *req;
@@ -2658,7 +2708,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;
                 ptlrpc_hpreq_fini(req);
-                ptlrpc_server_finish_request(req);
+                ptlrpc_server_finish_request(service, req);
         }
         LASSERT(service->srv_n_queued_reqs == 0);
         LASSERT(service->srv_n_active_reqs == 0);
@@ -2720,9 +2770,9 @@ int ptlrpc_service_health_check(struct ptlrpc_service *svc)
 
         cfs_gettimeofday(&right_now);
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
         if (!ptlrpc_server_request_pending(svc, 1)) {
-                cfs_spin_unlock(&svc->srv_lock);
+                cfs_spin_unlock(&svc->srv_rq_lock);
                 return 0;
         }
 
@@ -2734,7 +2784,7 @@ int ptlrpc_service_health_check(struct ptlrpc_service *svc)
                 request = cfs_list_entry(svc->srv_request_queue.next,
                                          struct ptlrpc_request, rq_list);
         timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL);
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
 
         if ((timediff / ONE_MILLION) > (AT_OFF ? obd_timeout * 3/2 :
                                         at_max)) {