Whamcloud - gitweb
LU-292 srv_n_queued_reqs is screwed up by ptlrpc_unregister_service
[fs/lustre-release.git] / lustre / ptlrpc / service.c
index 5f67efa..0513730 100644 (file)
@@ -68,28 +68,6 @@ static int ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc);
 static CFS_LIST_HEAD(ptlrpc_all_services);
 cfs_spinlock_t ptlrpc_all_services_lock;
 
-static char *
-ptlrpc_alloc_request_buffer (int size)
-{
-        char *ptr;
-
-        if (size > SVC_BUF_VMALLOC_THRESHOLD)
-                OBD_VMALLOC(ptr, size);
-        else
-                OBD_ALLOC(ptr, size);
-
-        return (ptr);
-}
-
-static void
-ptlrpc_free_request_buffer (char *ptr, int size)
-{
-        if (size > SVC_BUF_VMALLOC_THRESHOLD)
-                OBD_VFREE(ptr, size);
-        else
-                OBD_FREE(ptr, size);
-}
-
 struct ptlrpc_request_buffer_desc *
 ptlrpc_alloc_rqbd (struct ptlrpc_service *svc)
 {
@@ -104,7 +82,7 @@ ptlrpc_alloc_rqbd (struct ptlrpc_service *svc)
         rqbd->rqbd_cbid.cbid_fn = request_in_callback;
         rqbd->rqbd_cbid.cbid_arg = rqbd;
         CFS_INIT_LIST_HEAD(&rqbd->rqbd_reqs);
-        rqbd->rqbd_buffer = ptlrpc_alloc_request_buffer(svc->srv_buf_size);
+        OBD_ALLOC_LARGE(rqbd->rqbd_buffer, svc->srv_buf_size);
 
         if (rqbd->rqbd_buffer == NULL) {
                 OBD_FREE_PTR(rqbd);
@@ -132,7 +110,7 @@ ptlrpc_free_rqbd (struct ptlrpc_request_buffer_desc *rqbd)
         svc->srv_nbufs--;
         cfs_spin_unlock(&svc->srv_lock);
 
-        ptlrpc_free_request_buffer (rqbd->rqbd_buffer, svc->srv_buf_size);
+        OBD_FREE_LARGE(rqbd->rqbd_buffer, svc->srv_buf_size);
         OBD_FREE_PTR(rqbd);
 }
 
@@ -286,9 +264,9 @@ static void rs_batch_add(struct rs_batch *b, struct ptlrpc_reply_state *rs)
         if (svc != b->rsb_svc || b->rsb_n_replies >= MAX_SCHEDULED) {
                 if (b->rsb_svc != NULL) {
                         rs_batch_dispatch(b);
-                        cfs_spin_unlock(&b->rsb_svc->srv_lock);
+                        cfs_spin_unlock(&b->rsb_svc->srv_rs_lock);
                 }
-                cfs_spin_lock(&svc->srv_lock);
+                cfs_spin_lock(&svc->srv_rs_lock);
                 b->rsb_svc = svc;
         }
         cfs_spin_lock(&rs->rs_lock);
@@ -313,7 +291,7 @@ static void rs_batch_fini(struct rs_batch *b)
 {
         if (b->rsb_svc != 0) {
                 rs_batch_dispatch(b);
-                cfs_spin_unlock(&b->rsb_svc->srv_lock);
+                cfs_spin_unlock(&b->rsb_svc->srv_rs_lock);
         }
 }
 
@@ -357,12 +335,12 @@ ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
 {
         ENTRY;
 
-        LASSERT_SPIN_LOCKED(&rs->rs_service->srv_lock);
+        LASSERT_SPIN_LOCKED(&rs->rs_service->srv_rs_lock);
         LASSERT_SPIN_LOCKED(&rs->rs_lock);
         LASSERT (rs->rs_difficult);
-        rs->rs_scheduled_ever = 1;              /* flag any notification attempt */
+        rs->rs_scheduled_ever = 1;  /* flag any notification attempt */
 
-        if (rs->rs_scheduled) {                  /* being set up or already notified */
+        if (rs->rs_scheduled) {     /* being set up or already notified */
                 EXIT;
                 return;
         }
@@ -454,7 +432,7 @@ ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc)
 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
                                             svc_handler_t h, char *name,
                                             struct proc_dir_entry *proc_entry,
-                                            svcreq_printfn_t prntfn,
+                                            svc_req_printfn_t prntfn,
                                             char *threadname)
 {
         return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
@@ -499,7 +477,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
                 int req_portal, int rep_portal, int watchdog_factor,
                 svc_handler_t handler, char *name,
                 cfs_proc_dir_entry_t *proc_entry,
-                svcreq_printfn_t svcreq_printfn,
+                svc_req_printfn_t svcreq_printfn,
                 int min_threads, int max_threads,
                 char *threadname, __u32 ctx_tags,
                 svc_hpreq_handler_t hp_handler)
@@ -522,6 +500,8 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
 
         service->srv_name = name;
         cfs_spin_lock_init(&service->srv_lock);
+        cfs_spin_lock_init(&service->srv_rq_lock);
+        cfs_spin_lock_init(&service->srv_rs_lock);
         CFS_INIT_LIST_HEAD(&service->srv_threads);
         cfs_waitq_init(&service->srv_waitq);
 
@@ -532,7 +512,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
         service->srv_req_portal = req_portal;
         service->srv_watchdog_factor = watchdog_factor;
         service->srv_handler = handler;
-        service->srv_request_history_print_fn = svcreq_printfn;
+        service->srv_req_printfn = svcreq_printfn;
         service->srv_request_seq = 1;           /* valid seq #s start at 1 */
         service->srv_request_max_cull_seq = 0;
         service->srv_threads_min = min_threads;
@@ -542,7 +522,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
         service->srv_hpreq_handler = hp_handler;
         service->srv_hpreq_ratio = PTLRPC_SVC_HP_RATIO;
         service->srv_hpreq_count = 0;
-        service->srv_n_hpreq = 0;
+        service->srv_n_active_hpreq = 0;
 
         rc = LNetSetLazyPortal(service->srv_req_portal);
         LASSERT (rc == 0);
@@ -641,32 +621,6 @@ static void ptlrpc_server_free_request(struct ptlrpc_request *req)
 }
 
 /**
- * increment the number of active requests consuming service threads.
- */
-void ptlrpc_server_active_request_inc(struct ptlrpc_request *req)
-{
-        struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
-        struct ptlrpc_service *svc = rqbd->rqbd_service;
-
-        cfs_spin_lock(&svc->srv_lock);
-        svc->srv_n_active_reqs++;
-        cfs_spin_unlock(&svc->srv_lock);
-}
-
-/**
- * decrement the number of active requests consuming service threads.
- */
-void ptlrpc_server_active_request_dec(struct ptlrpc_request *req)
-{
-        struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
-        struct ptlrpc_service *svc = rqbd->rqbd_service;
-
-        cfs_spin_lock(&svc->srv_lock);
-        svc->srv_n_active_reqs--;
-        cfs_spin_unlock(&svc->srv_lock);
-}
-
-/**
  * drop a reference count of the request. if it reaches 0, we either
  * put it into history list, or free it immediately.
  */
@@ -705,7 +659,6 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
 
         cfs_spin_lock(&svc->srv_lock);
 
-        svc->srv_n_active_reqs--;
         cfs_list_add(&req->rq_list, &rqbd->rqbd_reqs);
 
         refcount = --(rqbd->rqbd_refcount);
@@ -776,8 +729,15 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
  * to finish a request: stop sending more early replies, and release
  * the request. should be called after we finished handling the request.
  */
-static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
+static void ptlrpc_server_finish_request(struct ptlrpc_service *svc,
+                                         struct ptlrpc_request *req)
 {
+        cfs_spin_lock(&svc->srv_rq_lock);
+        svc->srv_n_active_reqs--;
+        if (req->rq_hp)
+                svc->srv_n_active_hpreq--;
+        cfs_spin_unlock(&svc->srv_rq_lock);
+
         ptlrpc_server_drop_request(req);
 }
 
@@ -871,6 +831,8 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
  */
 static int ptlrpc_check_req(struct ptlrpc_request *req)
 {
+        int rc = 0;
+
         if (unlikely(lustre_msg_get_conn_cnt(req->rq_reqmsg) <
                      req->rq_export->exp_conn_cnt)) {
                 DEBUG_REQ(D_ERROR, req,
@@ -885,12 +847,28 @@ static int ptlrpc_check_req(struct ptlrpc_request *req)
                 error response instead. */
                 CDEBUG(D_RPCTRACE, "Dropping req %p for failed obd %s\n",
                        req, req->rq_export->exp_obd->obd_name);
-                req->rq_status = -ENODEV;
+                rc = -ENODEV;
+        } else if (lustre_msg_get_flags(req->rq_reqmsg) &
+                   (MSG_REPLAY | MSG_REQ_REPLAY_DONE) &&
+                   !(req->rq_export->exp_obd->obd_recovering)) {
+                        DEBUG_REQ(D_ERROR, req,
+                                  "Invalid replay without recovery");
+                        class_fail_export(req->rq_export);
+                        rc = -ENODEV;
+        } else if (lustre_msg_get_transno(req->rq_reqmsg) != 0 &&
+                   !(req->rq_export->exp_obd->obd_recovering)) {
+                        DEBUG_REQ(D_ERROR, req, "Invalid req with transno "
+                                  LPU64" without recovery",
+                                  lustre_msg_get_transno(req->rq_reqmsg));
+                        class_fail_export(req->rq_export);
+                        rc = -ENODEV;
+        }
+
+        if (unlikely(rc < 0)) {
+                req->rq_status = rc;
                 ptlrpc_error(req);
-                return -ENODEV;
         }
-
-        return 0;
+        return rc;
 }
 
 static void ptlrpc_at_set_timer(struct ptlrpc_service *svc)
@@ -1046,7 +1024,7 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
         OBD_ALLOC(reqcopy, sizeof *reqcopy);
         if (reqcopy == NULL)
                 RETURN(-ENOMEM);
-        OBD_ALLOC(reqmsg, req->rq_reqlen);
+        OBD_ALLOC_LARGE(reqmsg, req->rq_reqlen);
         if (!reqmsg) {
                 OBD_FREE(reqcopy, sizeof *reqcopy);
                 RETURN(-ENOMEM);
@@ -1106,7 +1084,7 @@ out_put:
         class_export_put(reqcopy->rq_export);
 out:
         sptlrpc_svc_ctx_decref(reqcopy);
-        OBD_FREE(reqmsg, req->rq_reqlen);
+        OBD_FREE_LARGE(reqmsg, req->rq_reqlen);
         OBD_FREE(reqcopy, sizeof *reqcopy);
         RETURN(rc);
 }
@@ -1237,10 +1215,10 @@ static int ptlrpc_hpreq_init(struct ptlrpc_service *svc,
                         RETURN(rc);
         }
         if (req->rq_export && req->rq_ops) {
-                cfs_spin_lock(&req->rq_export->exp_lock);
+                cfs_spin_lock_bh(&req->rq_export->exp_rpc_lock);
                 cfs_list_add(&req->rq_exp_list,
                              &req->rq_export->exp_queued_rpc);
-                cfs_spin_unlock(&req->rq_export->exp_lock);
+                cfs_spin_unlock_bh(&req->rq_export->exp_rpc_lock);
         }
 
         RETURN(0);
@@ -1251,9 +1229,9 @@ static void ptlrpc_hpreq_fini(struct ptlrpc_request *req)
 {
         ENTRY;
         if (req->rq_export && req->rq_ops) {
-                cfs_spin_lock(&req->rq_export->exp_lock);
+                cfs_spin_lock_bh(&req->rq_export->exp_rpc_lock);
                 cfs_list_del_init(&req->rq_exp_list);
-                cfs_spin_unlock(&req->rq_export->exp_lock);
+                cfs_spin_unlock_bh(&req->rq_export->exp_rpc_lock);
         }
         EXIT;
 }
@@ -1295,12 +1273,12 @@ void ptlrpc_hpreq_reorder(struct ptlrpc_request *req)
         struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
         ENTRY;
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
         /* It may happen that the request is already taken for the processing
          * but still in the export list, do not re-add it into the HP list. */
         if (req->rq_phase == RQ_PHASE_NEW)
                 ptlrpc_hpreq_reorder_nolock(svc, req);
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
         EXIT;
 }
 
@@ -1332,7 +1310,7 @@ static int ptlrpc_server_request_add(struct ptlrpc_service *svc,
         if (rc < 0)
                 RETURN(rc);
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
         /* Before inserting the request into the queue, check if it is not
          * inserted yet, or even already handled -- it may happen due to
          * a racing ldlm_server_blocking_ast(). */
@@ -1343,22 +1321,78 @@ static int ptlrpc_server_request_add(struct ptlrpc_service *svc,
                         cfs_list_add_tail(&req->rq_list,
                                           &svc->srv_request_queue);
         }
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
 
         RETURN(0);
 }
 
 /**
+ * Allow to handle high priority request
+ * User can call it w/o any lock but need to hold ptlrpc_service::srv_rq_lock
+ * to get reliable result
+ */
+static int ptlrpc_server_allow_high(struct ptlrpc_service *svc, int force)
+{
+        if (force)
+                return 1;
+
+        if (svc->srv_n_active_reqs >= svc->srv_threads_running - 1)
+                return 0;
+
+        return cfs_list_empty(&svc->srv_request_queue) ||
+               svc->srv_hpreq_count < svc->srv_hpreq_ratio;
+}
+
+static int ptlrpc_server_high_pending(struct ptlrpc_service *svc, int force)
+{
+        return ptlrpc_server_allow_high(svc, force) &&
+               !cfs_list_empty(&svc->srv_request_hpq);
+}
+
+/**
  * Only allow normal priority requests on a service that has a high-priority
  * queue if forced (i.e. cleanup), if there are other high priority requests
  * already being processed (i.e. those threads can service more high-priority
  * requests), or if there are enough idle threads that a later thread can do
  * a high priority request.
+ * User can call it w/o any lock but need to hold ptlrpc_service::srv_rq_lock
+ * to get reliable result
  */
 static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
 {
-        return force || !svc->srv_hpreq_handler || svc->srv_n_hpreq > 0 ||
-                svc->srv_threads_running <= svc->srv_threads_started - 2;
+#ifndef __KERNEL__
+        if (1) /* always allow to handle normal request for liblustre */
+                return 1;
+#endif
+        if (force ||
+            svc->srv_n_active_reqs < svc->srv_threads_running - 2)
+                return 1;
+
+        if (svc->srv_n_active_reqs >= svc->srv_threads_running - 1)
+                return 0;
+
+        return svc->srv_n_active_hpreq > 0 || svc->srv_hpreq_handler == NULL;
+}
+
+static int ptlrpc_server_normal_pending(struct ptlrpc_service *svc, int force)
+{
+        return ptlrpc_server_allow_normal(svc, force) &&
+               !cfs_list_empty(&svc->srv_request_queue);
+}
+
+/**
+ * Returns true if there are requests available in incoming
+ * request queue for processing and it is allowed to fetch them.
+ * User can call it w/o any lock but need to hold ptlrpc_service::srv_rq_lock
+ * to get reliable result
+ * \see ptlrpc_server_allow_normal
+ * \see ptlrpc_server_allow high
+ */
+static inline int
+ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force)
+{
+        return ptlrpc_server_high_pending(svc, force) ||
+               ptlrpc_server_normal_pending(svc, force);
 }
 
 /**
@@ -1369,34 +1403,24 @@ static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
 static struct ptlrpc_request *
 ptlrpc_server_request_get(struct ptlrpc_service *svc, int force)
 {
-        struct ptlrpc_request *req = NULL;
+        struct ptlrpc_request *req;
         ENTRY;
 
-        if (ptlrpc_server_allow_normal(svc, force) &&
-            !cfs_list_empty(&svc->srv_request_queue) &&
-            (cfs_list_empty(&svc->srv_request_hpq) ||
-             svc->srv_hpreq_count >= svc->srv_hpreq_ratio)) {
-                req = cfs_list_entry(svc->srv_request_queue.next,
-                                     struct ptlrpc_request, rq_list);
-                svc->srv_hpreq_count = 0;
-        } else if (!cfs_list_empty(&svc->srv_request_hpq)) {
+        if (ptlrpc_server_high_pending(svc, force)) {
                 req = cfs_list_entry(svc->srv_request_hpq.next,
                                      struct ptlrpc_request, rq_list);
                 svc->srv_hpreq_count++;
+                RETURN(req);
+
         }
-        RETURN(req);
-}
 
-/**
- * Returns true if there are requests available in incoming
- * request queue for processing and it is allowed to fetch them
- * \see ptlrpc_server_allow_normal
- */
-static int ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force)
-{
-        return ((ptlrpc_server_allow_normal(svc, force) &&
-                 !cfs_list_empty(&svc->srv_request_queue)) ||
-                !cfs_list_empty(&svc->srv_request_hpq));
+        if (ptlrpc_server_normal_pending(svc, force)) {
+                req = cfs_list_entry(svc->srv_request_queue.next,
+                                     struct ptlrpc_request, rq_list);
+                svc->srv_hpreq_count = 0;
+                RETURN(req);
+        }
+        RETURN(NULL);
 }
 
 /**
@@ -1424,8 +1448,14 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
         req = cfs_list_entry(svc->srv_req_in_queue.next,
                              struct ptlrpc_request, rq_list);
         cfs_list_del_init (&req->rq_list);
+        svc->srv_n_queued_reqs--;
         /* Consider this still a "queued" request as far as stats are
            concerned */
+        /* ptlrpc_hpreq_init() inserts it to the export list and by the time
+         * of ptlrpc_server_request_add() it could be already handled and
+         * released. To not lose request in between, take an extra reference
+         * on the request. */
+        ptlrpc_request_addref(req);
         cfs_spin_unlock(&svc->srv_lock);
 
         /* go through security check/transform */
@@ -1465,9 +1495,9 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
         }
 
         if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DROP_REQ_OPC) &&
-            lustre_msg_get_opc(req->rq_reqmsg) == obd_fail_val) {
+            lustre_msg_get_opc(req->rq_reqmsg) == cfs_fail_val) {
                 CERROR("drop incoming rpc opc %u, x"LPU64"\n",
-                       obd_fail_val, req->rq_xid);
+                       cfs_fail_val, req->rq_xid);
                 goto err_req;
         }
 
@@ -1535,14 +1565,15 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
         if (rc)
                 GOTO(err_req, rc);
         cfs_waitq_signal(&svc->srv_waitq);
+        ptlrpc_server_drop_request(req);
         RETURN(1);
 
 err_req:
-        cfs_spin_lock(&svc->srv_lock);
-        svc->srv_n_queued_reqs--;
+        ptlrpc_server_drop_request(req);
+        cfs_spin_lock(&svc->srv_rq_lock);
         svc->srv_n_active_reqs++;
-        cfs_spin_unlock(&svc->srv_lock);
-        ptlrpc_server_finish_request(req);
+        cfs_spin_unlock(&svc->srv_rq_lock);
+        ptlrpc_server_finish_request(svc, req);
 
         RETURN(1);
 }
@@ -1566,17 +1597,17 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
 
         LASSERT(svc);
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
 #ifndef __KERNEL__
         /* !@%$# liblustre only has 1 thread */
         if (cfs_atomic_read(&svc->srv_n_difficult_replies) != 0) {
-                cfs_spin_unlock(&svc->srv_lock);
+                cfs_spin_unlock(&svc->srv_rq_lock);
                 RETURN(0);
         }
 #endif
         request = ptlrpc_server_request_get(svc, 0);
         if  (request == NULL) {
-                cfs_spin_unlock(&svc->srv_lock);
+                cfs_spin_unlock(&svc->srv_rq_lock);
                 RETURN(0);
         }
 
@@ -1588,27 +1619,26 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
 
         if (unlikely(fail_opc)) {
                 if (request->rq_export && request->rq_ops) {
-                        cfs_spin_unlock(&svc->srv_lock);
+                        cfs_spin_unlock(&svc->srv_rq_lock);
                         OBD_FAIL_TIMEOUT(fail_opc, 4);
-                        cfs_spin_lock(&svc->srv_lock);
+                        cfs_spin_lock(&svc->srv_rq_lock);
                         request = ptlrpc_server_request_get(svc, 0);
                         if  (request == NULL) {
-                                cfs_spin_unlock(&svc->srv_lock);
+                                cfs_spin_unlock(&svc->srv_rq_lock);
                                 RETURN(0);
                         }
                 }
         }
 
         cfs_list_del_init(&request->rq_list);
-        svc->srv_n_queued_reqs--;
         svc->srv_n_active_reqs++;
         if (request->rq_hp)
-                svc->srv_n_hpreq++;
+                svc->srv_n_active_hpreq++;
 
         /* The phase is changed under the lock here because we need to know
          * the request is under processing (see ptlrpc_hpreq_reorder()). */
         ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
 
         ptlrpc_hpreq_fini(request);
 
@@ -1675,7 +1705,7 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
                lustre_msg_get_opc(request->rq_reqmsg));
 
         if (lustre_msg_get_opc(request->rq_reqmsg) != OBD_PING)
-                OBD_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, obd_fail_val);
+                CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, cfs_fail_val);
 
         rc = svc->srv_handler(request);
 
@@ -1740,11 +1770,7 @@ put_conn:
         }
 
 out_req:
-        cfs_spin_lock(&svc->srv_lock);
-        if (request->rq_hp)
-                svc->srv_n_hpreq--;
-        cfs_spin_unlock(&svc->srv_lock);
-        ptlrpc_server_finish_request(request);
+        ptlrpc_server_finish_request(svc, request);
 
         RETURN(1);
 }
@@ -1845,7 +1871,6 @@ ptlrpc_handle_rs (struct ptlrpc_reply_state *rs)
                 class_export_put (exp);
                 rs->rs_export = NULL;
                 ptlrpc_rs_decref (rs);
-                cfs_atomic_dec (&svc->srv_outstanding_replies);
                 if (cfs_atomic_dec_and_test(&svc->srv_n_difficult_replies) &&
                     svc->srv_is_stopping)
                         cfs_waitq_broadcast(&svc->srv_waitq);
@@ -1873,14 +1898,14 @@ ptlrpc_server_handle_reply(struct ptlrpc_service *svc)
         struct ptlrpc_reply_state *rs = NULL;
         ENTRY;
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rs_lock);
         if (!cfs_list_empty(&svc->srv_reply_queue)) {
                 rs = cfs_list_entry(svc->srv_reply_queue.prev,
                                     struct ptlrpc_reply_state,
                                     rs_list);
                 cfs_list_del_init(&rs->rs_list);
         }
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rs_lock);
         if (rs != NULL)
                 ptlrpc_handle_rs(rs);
         RETURN(rs != NULL);
@@ -1960,79 +1985,90 @@ ptlrpc_retry_rqbds(void *arg)
         return (-ETIMEDOUT);
 }
 
+static inline int
+ptlrpc_threads_enough(struct ptlrpc_service *svc)
+{
+        return svc->srv_n_active_reqs <
+               svc->srv_threads_running - 1 - (svc->srv_hpreq_handler != NULL);
+}
+
 /**
- *  Status bits to pass todo info from
- *  ptlrpc_main_check_event to ptlrpc_main.
+ * allowed to create more threads
+ * user can call it w/o any lock but need to hold ptlrpc_service::srv_lock to
+ * get reliable result
  */
-#define PTLRPC_MAIN_STOPPING    0x01
-#define PTLRPC_MAIN_IN_REQ      0x02
-#define PTLRPC_MAIN_ACTIVE_REQ  0x04
-#define PTLRPC_MAIN_CHECK_TIMED 0x08
-#define PTLRPC_MAIN_REPOST      0x10
+static inline int
+ptlrpc_threads_increasable(struct ptlrpc_service *svc)
+{
+        return svc->srv_threads_running +
+               svc->srv_threads_starting < svc->srv_threads_max;
+}
 
 /**
- * A container to share per-thread status variables between
- * ptlrpc_main_check_event and ptlrpc_main functions.
+ * too many requests and allowed to create more threads
  */
-struct ptlrpc_main_check_s {
-        /** todo info for the ptrlrpc_main */
-        int todo;
-        /** is this thread counted as running or not? */
-        int running;
-};
+static inline int
+ptlrpc_threads_need_create(struct ptlrpc_service *svc)
+{
+        return !ptlrpc_threads_enough(svc) && ptlrpc_threads_increasable(svc);
+}
+
+static inline int
+ptlrpc_thread_stopping(struct ptlrpc_thread *thread)
+{
+        return (thread->t_flags & SVC_STOPPING) != 0 ||
+                thread->t_svc->srv_is_stopping;
+}
+
+static inline int
+ptlrpc_rqbd_pending(struct ptlrpc_service *svc)
+{
+        return !cfs_list_empty(&svc->srv_idle_rqbds) &&
+               svc->srv_rqbd_timeout == 0;
+}
+
+static inline int
+ptlrpc_at_check(struct ptlrpc_service *svc)
+{
+        return svc->srv_at_check;
+}
 
 /**
- * Check whether current service thread has work to do.
+ * requests wait on preprocessing
+ * user can call it w/o any lock but need to hold ptlrpc_service::srv_lock to
+ * get reliable result
  */
-static int ptlrpc_main_check_event(struct ptlrpc_thread *t,
-                                   struct ptlrpc_main_check_s *status)
+static inline int
+ptlrpc_server_request_waiting(struct ptlrpc_service *svc)
 {
-        struct ptlrpc_service *svc = t->t_svc;
-        ENTRY;
+        return !cfs_list_empty(&svc->srv_req_in_queue);
+}
 
-        status->todo = 0;
+static __attribute__((__noinline__)) int
+ptlrpc_wait_event(struct ptlrpc_service *svc,
+                  struct ptlrpc_thread *thread)
+{
+        /* Don't exit while there are replies to be handled */
+        struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
+                                             ptlrpc_retry_rqbds, svc);
 
-        /* check the stop flags w/o any locking to make all
-         * concurrently running threads stop faster. */
-        if (unlikely((t->t_flags & SVC_STOPPING) ||
-                     svc->srv_is_stopping)) {
-                status->todo |= PTLRPC_MAIN_STOPPING;
-                goto out;
-        }
+        lc_watchdog_disable(thread->t_watchdog);
 
-        cfs_spin_lock(&svc->srv_lock);
-        /* ptlrpc_server_request_pending() needs this thread to be
-         * counted as running. */
-        if (!status->running) {
-                svc->srv_threads_running++;
-                status->running = 1;
-        }
-        /* Process all incoming reqs before handling any */
-        if (!cfs_list_empty(&svc->srv_req_in_queue)) {
-                status->todo |= PTLRPC_MAIN_IN_REQ;
-        }
-        /* Don't handle regular requests in the last thread, in order
-         * to handle any incoming reqs, early replies, etc. */
-        if (ptlrpc_server_request_pending(svc, 0) &&
-            svc->srv_threads_running <= svc->srv_threads_started - 1) {
-                status->todo |= PTLRPC_MAIN_ACTIVE_REQ;
-        }
-        if (svc->srv_at_check) {
-                status->todo |= PTLRPC_MAIN_CHECK_TIMED;
-        }
-        if (!cfs_list_empty(&svc->srv_idle_rqbds) &&
-            svc->srv_rqbd_timeout == 0) {
-                status->todo |= PTLRPC_MAIN_REPOST;
-        }
-        /* count this thread as not running if it is going to sleep in
-         * the outer wait event */
-        if (!status->todo) {
-                svc->srv_threads_running--;
-                status->running = 0;
-        }
-        cfs_spin_unlock(&svc->srv_lock);
- out:
-        RETURN(status->todo);
+        cfs_cond_resched();
+
+        l_wait_event_exclusive_head(svc->srv_waitq,
+                               ptlrpc_thread_stopping(thread) ||
+                               ptlrpc_server_request_waiting(svc) ||
+                               ptlrpc_server_request_pending(svc, 0) ||
+                               ptlrpc_rqbd_pending(svc) ||
+                               ptlrpc_at_check(svc), &lwi);
+
+        if (ptlrpc_thread_stopping(thread))
+                return -EINTR;
+
+        lc_watchdog_touch(thread->t_watchdog, CFS_GET_TIMEOUT(svc));
+
+        return 0;
 }
 
 /**
@@ -2046,9 +2082,7 @@ static int ptlrpc_main(void *arg)
         struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
         struct ptlrpc_service  *svc = data->svc;
         struct ptlrpc_thread   *thread = data->thread;
-        struct obd_device      *dev = data->dev;
         struct ptlrpc_reply_state *rs;
-        struct ptlrpc_main_check_s st;
 #ifdef WITH_GROUP_INFO
         cfs_group_info_t *ginfo = NULL;
 #endif
@@ -2105,18 +2139,24 @@ static int ptlrpc_main(void *arg)
         env.le_ctx.lc_cookie = 0x6;
 
         /* Alloc reply state structure for this one */
-        OBD_ALLOC_GFP(rs, svc->srv_max_reply_size, CFS_ALLOC_STD);
+        OBD_ALLOC_LARGE(rs, svc->srv_max_reply_size);
         if (!rs) {
                 rc = -ENOMEM;
                 goto out_srv_fini;
         }
 
         cfs_spin_lock(&svc->srv_lock);
+
+        LASSERT((thread->t_flags & SVC_STARTING) != 0);
+        thread->t_flags &= ~SVC_STARTING;
+        svc->srv_threads_starting--;
+
         /* SVC_STOPPING may already be set here if someone else is trying
          * to stop the service while this new thread has been dynamically
          * forked. We still set SVC_RUNNING to let our creator know that
          * we are now running, however we will exit as soon as possible */
         thread->t_flags |= SVC_RUNNING;
+        svc->srv_threads_running++;
         cfs_spin_unlock(&svc->srv_lock);
 
         /*
@@ -2127,58 +2167,45 @@ static int ptlrpc_main(void *arg)
 
         thread->t_watchdog = lc_watchdog_add(CFS_GET_TIMEOUT(svc), NULL, NULL);
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rs_lock);
         cfs_list_add(&rs->rs_list, &svc->srv_free_rs_list);
-        cfs_spin_unlock(&svc->srv_lock);
         cfs_waitq_signal(&svc->srv_free_rs_waitq);
+        cfs_spin_unlock(&svc->srv_rs_lock);
 
         CDEBUG(D_NET, "service thread %d (#%d) started\n", thread->t_id,
                svc->srv_threads_running);
 
         /* XXX maintain a list of all managed devices: insert here */
-
-        st.running = 0;
-        st.todo = 0;
-
-        while (!(st.todo & PTLRPC_MAIN_STOPPING)) {
-                /* Don't exit while there are replies to be handled */
-                struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
-                                                     ptlrpc_retry_rqbds, svc);
-
-                lc_watchdog_disable(thread->t_watchdog);
-
-                cfs_cond_resched();
-
-                l_wait_event_exclusive (svc->srv_waitq,
-                                        ptlrpc_main_check_event(thread, &st),
-                                        &lwi);
-
-                lc_watchdog_touch(thread->t_watchdog, CFS_GET_TIMEOUT(svc));
+        while (!ptlrpc_thread_stopping(thread)) {
+                if (ptlrpc_wait_event(svc, thread))
+                        break;
 
                 ptlrpc_check_rqbd_pool(svc);
 
-                if (svc->srv_threads_started < svc->srv_threads_max &&
-                    svc->srv_n_active_reqs >= (svc->srv_threads_started - 1)) 
+                if (ptlrpc_threads_need_create(svc)) {
                         /* Ignore return code - we tried... */
-                        ptlrpc_start_thread(dev, svc);
+                        ptlrpc_start_thread(svc);
+                }
 
                 /* Process all incoming reqs before handling any */
-                if (st.todo & PTLRPC_MAIN_IN_REQ) {
+                if (ptlrpc_server_request_waiting(svc)) {
                         ptlrpc_server_handle_req_in(svc);
                         /* but limit ourselves in case of flood */
-                        if (counter++ < 1000)
+                        if (counter++ < 100)
                                 continue;
                         counter = 0;
                 }
-                if (st.todo & PTLRPC_MAIN_CHECK_TIMED) {
+
+                if (ptlrpc_at_check(svc))
                         ptlrpc_at_check_timed(svc);
-                }
-                if (st.todo & PTLRPC_MAIN_ACTIVE_REQ) {
+
+                if (ptlrpc_server_request_pending(svc, 0)) {
                         lu_context_enter(&env.le_ctx);
                         ptlrpc_server_handle_request(svc, thread);
                         lu_context_exit(&env.le_ctx);
                 }
-                if ((st.todo & PTLRPC_MAIN_REPOST) &&
+
+                if (ptlrpc_rqbd_pending(svc) &&
                     ptlrpc_server_post_idle_rqbds(svc) < 0) {
                         /* I just failed to repost request buffers.
                          * Wait for a timeout (unless something else
@@ -2205,10 +2232,20 @@ out:
                thread, thread->t_pid, thread->t_id, rc);
 
         cfs_spin_lock(&svc->srv_lock);
-        if (st.running)
+        if ((thread->t_flags & SVC_STARTING) != 0) {
+                svc->srv_threads_starting--;
+                thread->t_flags &= ~SVC_STARTING;
+        }
+
+        if ((thread->t_flags & SVC_RUNNING) != 0) {
+                /* must know immediately */
                 svc->srv_threads_running--;
-        thread->t_id = rc;
-        thread->t_flags = SVC_STOPPED;
+                thread->t_flags &= ~SVC_RUNNING;
+        }
+
+        thread->t_id    = rc;
+        thread->t_flags |= SVC_STOPPED;
+
         cfs_waitq_signal(&thread->t_ctl_waitq);
         cfs_spin_unlock(&svc->srv_lock);
 
@@ -2259,7 +2296,7 @@ static int ptlrpc_hr_main(void *arg)
 
         while (!cfs_test_bit(HRT_STOPPING, &t->hrt_flags)) {
 
-                l_cfs_wait_event(t->hrt_wait, hrt_dont_sleep(t, &replies));
+                l_wait_condition(t->hrt_wait, hrt_dont_sleep(t, &replies));
                 while (!cfs_list_empty(&replies)) {
                         struct ptlrpc_reply_state *rs;
 
@@ -2288,13 +2325,12 @@ static int ptlrpc_start_hr_thread(struct ptlrpc_hr_service *hr, int n, int cpu)
         args.cpu_index = cpu;
         args.hrs = hr;
 
-        rc = cfs_kernel_thread(ptlrpc_hr_main, (void*)&args,
-                               CLONE_VM|CLONE_FILES);
+        rc = cfs_create_thread(ptlrpc_hr_main, (void*)&args, CFS_DAEMON_FLAGS);
         if (rc < 0) {
                 cfs_complete(&t->hrt_completion);
                 GOTO(out, rc);
         }
-        l_cfs_wait_event(t->hrt_wait, cfs_test_bit(HRT_RUNNING, &t->hrt_flags));
+        l_wait_condition(t->hrt_wait, cfs_test_bit(HRT_RUNNING, &t->hrt_flags));
         RETURN(0);
  out:
         return rc;
@@ -2405,7 +2441,7 @@ void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
         EXIT;
 }
 
-int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
+int ptlrpc_start_threads(struct ptlrpc_service *svc)
 {
         int i, rc = 0;
         ENTRY;
@@ -2414,10 +2450,12 @@ int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
            ptlrpc_server_handle_request */
         LASSERT(svc->srv_threads_min >= 2);
         for (i = 0; i < svc->srv_threads_min; i++) {
-                rc = ptlrpc_start_thread(dev, svc);
+                rc = ptlrpc_start_thread(svc);
                 /* We have enough threads, don't start more.  b=15759 */
-                if (rc == -EMFILE)
+                if (rc == -EMFILE) {
+                        rc = 0;
                         break;
+                }
                 if (rc) {
                         CERROR("cannot start %s thread #%d: rc %d\n",
                                svc->srv_thread_name, i, rc);
@@ -2428,25 +2466,25 @@ int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
         RETURN(rc);
 }
 
-int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc)
+int ptlrpc_start_thread(struct ptlrpc_service *svc)
 {
         struct l_wait_info lwi = { 0 };
         struct ptlrpc_svc_data d;
         struct ptlrpc_thread *thread;
         char name[32];
-        int id, rc;
+        int rc;
         ENTRY;
 
         CDEBUG(D_RPCTRACE, "%s started %d min %d max %d running %d\n",
-               svc->srv_name, svc->srv_threads_started, svc->srv_threads_min,
+               svc->srv_name, svc->srv_threads_running, svc->srv_threads_min,
                svc->srv_threads_max, svc->srv_threads_running);
 
         if (unlikely(svc->srv_is_stopping))
                 RETURN(-ESRCH);
 
-        if (unlikely(svc->srv_threads_started >= svc->srv_threads_max) ||
+        if (!ptlrpc_threads_increasable(svc) ||
             (OBD_FAIL_CHECK(OBD_FAIL_TGT_TOOMANY_THREADS) &&
-             svc->srv_threads_started == svc->srv_threads_min - 1))
+             svc->srv_threads_running == svc->srv_threads_min - 1))
                 RETURN(-EMFILE);
 
         OBD_ALLOC_PTR(thread);
@@ -2455,19 +2493,21 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc)
         cfs_waitq_init(&thread->t_ctl_waitq);
 
         cfs_spin_lock(&svc->srv_lock);
-        if (svc->srv_threads_started >= svc->srv_threads_max) {
+        if (!ptlrpc_threads_increasable(svc)) {
                 cfs_spin_unlock(&svc->srv_lock);
                 OBD_FREE_PTR(thread);
                 RETURN(-EMFILE);
         }
+
+        svc->srv_threads_starting++;
+        thread->t_id    = svc->srv_threads_next_id++;
+        thread->t_flags |= SVC_STARTING;
+        thread->t_svc   = svc;
+
         cfs_list_add(&thread->t_link, &svc->srv_threads);
-        id = svc->srv_threads_started++;
         cfs_spin_unlock(&svc->srv_lock);
 
-        thread->t_svc = svc;
-        thread->t_id = id;
-        sprintf(name, "%s_%02d", svc->srv_thread_name, id);
-        d.dev = dev;
+        sprintf(name, "%s_%02d", svc->srv_thread_name, thread->t_id);
         d.svc = svc;
         d.name = name;
         d.thread = thread;
@@ -2477,13 +2517,13 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc)
         /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
          * just drop the VM and FILES in cfs_daemonize_ctxt() right away.
          */
-        rc = cfs_kernel_thread(ptlrpc_main, &d, CLONE_VM | CLONE_FILES);
+        rc = cfs_create_thread(ptlrpc_main, &d, CFS_DAEMON_FLAGS);
         if (rc < 0) {
                 CERROR("cannot start thread '%s': rc %d\n", name, rc);
 
                 cfs_spin_lock(&svc->srv_lock);
                 cfs_list_del(&thread->t_link);
-                --svc->srv_threads_started;
+                --svc->srv_threads_starting;
                 cfs_spin_unlock(&svc->srv_lock);
 
                 OBD_FREE(thread, sizeof(*thread));
@@ -2625,7 +2665,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         }
 
         /* schedule all outstanding replies to terminate them */
-        cfs_spin_lock(&service->srv_lock);
+        cfs_spin_lock(&service->srv_rs_lock);
         while (!cfs_list_empty(&service->srv_active_replies)) {
                 struct ptlrpc_reply_state *rs =
                         cfs_list_entry(service->srv_active_replies.next,
@@ -2634,7 +2674,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 ptlrpc_schedule_difficult_reply(rs);
                 cfs_spin_unlock(&rs->rs_lock);
         }
-        cfs_spin_unlock(&service->srv_lock);
+        cfs_spin_unlock(&service->srv_rs_lock);
 
         /* purge the request queue.  NB No new replies (rqbds all unlinked)
          * and no service threads, so I'm the only thread noodling the
@@ -2648,17 +2688,16 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 cfs_list_del(&req->rq_list);
                 service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;
-                ptlrpc_server_finish_request(req);
+                ptlrpc_server_finish_request(service, req);
         }
         while (ptlrpc_server_request_pending(service, 1)) {
                 struct ptlrpc_request *req;
 
                 req = ptlrpc_server_request_get(service, 1);
                 cfs_list_del(&req->rq_list);
-                service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;
                 ptlrpc_hpreq_fini(req);
-                ptlrpc_server_finish_request(req);
+                ptlrpc_server_finish_request(service, req);
         }
         LASSERT(service->srv_n_queued_reqs == 0);
         LASSERT(service->srv_n_active_reqs == 0);
@@ -2681,7 +2720,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         cfs_list_for_each_entry_safe(rs, t, &service->srv_free_rs_list,
                                      rs_list) {
                 cfs_list_del(&rs->rs_list);
-                OBD_FREE(rs, service->srv_max_reply_size);
+                OBD_FREE_LARGE(rs, service->srv_max_reply_size);
         }
 
         /* In case somebody rearmed this in the meantime */
@@ -2720,9 +2759,9 @@ int ptlrpc_service_health_check(struct ptlrpc_service *svc)
 
         cfs_gettimeofday(&right_now);
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
         if (!ptlrpc_server_request_pending(svc, 1)) {
-                cfs_spin_unlock(&svc->srv_lock);
+                cfs_spin_unlock(&svc->srv_rq_lock);
                 return 0;
         }
 
@@ -2734,7 +2773,7 @@ int ptlrpc_service_health_check(struct ptlrpc_service *svc)
                 request = cfs_list_entry(svc->srv_request_queue.next,
                                          struct ptlrpc_request, rq_list);
         timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL);
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
 
         if ((timediff / ONE_MILLION) > (AT_OFF ? obd_timeout * 3/2 :
                                         at_max)) {