Whamcloud - gitweb
b=23701 uninline part of ptlrpc_main to reduce stack usage
[fs/lustre-release.git] / lustre / ptlrpc / service.c
index 8f31ef1..b13b82b 100644 (file)
@@ -26,7 +26,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  */
 /*
@@ -161,6 +161,10 @@ ptlrpc_grow_req_bufs(struct ptlrpc_service *svc)
         return (0);
 }
 
+/**
+ * Part of Rep-Ack logic.
+ * Puts a lock and its mode into reply state assotiated to request reply.
+ */
 void
 ptlrpc_save_lock(struct ptlrpc_request *req,
                  struct lustre_handle *lock, int mode, int no_ack)
@@ -230,6 +234,22 @@ static void rs_batch_init(struct rs_batch *b)
 }
 
 /**
+ * Choose an hr thread to dispatch requests to.
+ */
+static unsigned int get_hr_thread_index(struct ptlrpc_hr_service *hr)
+{
+        unsigned int idx;
+
+        /* Concurrent modification of hr_index w/o any spinlock
+           protection is harmless as long as the result fits
+           [0..(hr_n_threads-1)] range and each thread gets near equal
+           load. */
+        idx = hr->hr_index;
+        hr->hr_index = (idx >= hr->hr_n_threads - 1) ? 0 : idx + 1;
+        return idx;
+}
+
+/**
  * Dispatch all replies accumulated in the batch to one from
  * dedicated reply handling threads.
  *
@@ -241,9 +261,7 @@ static void rs_batch_dispatch(struct rs_batch *b)
                 struct ptlrpc_hr_service *hr = ptlrpc_hr;
                 int idx;
 
-                idx = hr->hr_index++;
-                if (hr->hr_index >= hr->hr_n_threads)
-                        hr->hr_index = 0;
+                idx = get_hr_thread_index(hr);
 
                 cfs_spin_lock(&hr->hr_threads[idx].hrt_lock);
                 cfs_list_splice_init(&b->rsb_replies,
@@ -268,9 +286,9 @@ static void rs_batch_add(struct rs_batch *b, struct ptlrpc_reply_state *rs)
         if (svc != b->rsb_svc || b->rsb_n_replies >= MAX_SCHEDULED) {
                 if (b->rsb_svc != NULL) {
                         rs_batch_dispatch(b);
-                        cfs_spin_unlock(&b->rsb_svc->srv_lock);
+                        cfs_spin_unlock(&b->rsb_svc->srv_rs_lock);
                 }
-                cfs_spin_lock(&svc->srv_lock);
+                cfs_spin_lock(&svc->srv_rs_lock);
                 b->rsb_svc = svc;
         }
         cfs_spin_lock(&rs->rs_lock);
@@ -295,7 +313,7 @@ static void rs_batch_fini(struct rs_batch *b)
 {
         if (b->rsb_svc != 0) {
                 rs_batch_dispatch(b);
-                cfs_spin_unlock(&b->rsb_svc->srv_lock);
+                cfs_spin_unlock(&b->rsb_svc->srv_rs_lock);
         }
 }
 
@@ -310,6 +328,10 @@ static void rs_batch_fini(struct rs_batch *b)
 
 #endif /* __KERNEL__ */
 
+/**
+ * Put reply state into a queue for processing because we received
+ * ACK from the client
+ */
 void ptlrpc_dispatch_difficult_reply(struct ptlrpc_reply_state *rs)
 {
 #ifdef __KERNEL__
@@ -319,9 +341,7 @@ void ptlrpc_dispatch_difficult_reply(struct ptlrpc_reply_state *rs)
 
         LASSERT(cfs_list_empty(&rs->rs_list));
 
-        idx = hr->hr_index++;
-        if (hr->hr_index >= hr->hr_n_threads)
-                hr->hr_index = 0;
+        idx = get_hr_thread_index(hr);
         cfs_spin_lock(&hr->hr_threads[idx].hrt_lock);
         cfs_list_add_tail(&rs->rs_list, &hr->hr_threads[idx].hrt_queue);
         cfs_spin_unlock(&hr->hr_threads[idx].hrt_lock);
@@ -337,12 +357,12 @@ ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
 {
         ENTRY;
 
-        LASSERT_SPIN_LOCKED(&rs->rs_service->srv_lock);
+        LASSERT_SPIN_LOCKED(&rs->rs_service->srv_rs_lock);
         LASSERT_SPIN_LOCKED(&rs->rs_lock);
         LASSERT (rs->rs_difficult);
-        rs->rs_scheduled_ever = 1;              /* flag any notification attempt */
+        rs->rs_scheduled_ever = 1;  /* flag any notification attempt */
 
-        if (rs->rs_scheduled) {                  /* being set up or already notified */
+        if (rs->rs_scheduled) {     /* being set up or already notified */
                 EXIT;
                 return;
         }
@@ -427,10 +447,14 @@ ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc)
         return (-1);
 }
 
+/**
+ * Start a service with parameters from struct ptlrpc_service_conf \a c
+ * as opposed to directly calling ptlrpc_init_svc with tons of arguments.
+ */
 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
                                             svc_handler_t h, char *name,
                                             struct proc_dir_entry *proc_entry,
-                                            svcreq_printfn_t prntfn,
+                                            svc_req_printfn_t prntfn,
                                             char *threadname)
 {
         return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
@@ -451,13 +475,31 @@ static void ptlrpc_at_timer(unsigned long castmeharder)
         cfs_waitq_signal(&svc->srv_waitq);
 }
 
-/* @threadname should be 11 characters or less - 3 will be added on */
+/**
+ * Initialize service on a given portal.
+ * This includes starting serving threads , allocating and posting rqbds and
+ * so on.
+ * \a nbufs is how many buffers to post
+ * \a bufsize is buffer size to post
+ * \a max_req_size - maximum request size to be accepted for this service
+ * \a max_reply_size maximum reply size this service can ever send
+ * \a req_portal - portal to listed for requests on
+ * \a rep_portal - portal of where to send replies to
+ * \a watchdog_factor soft watchdog timeout multiplifier to print stuck service traces.
+ * \a handler - function to process every new request
+ * \a name - service name
+ * \a proc_entry - entry in the /proc tree for sttistics reporting
+ * \a min_threads \a max_threads - min/max number of service threads to start.
+ * \a threadname should be 11 characters or less - 3 will be added on
+ * \a hp_handler - function to determine priority of the request, also called
+ *                 on every new request.
+ */
 struct ptlrpc_service *
 ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
                 int req_portal, int rep_portal, int watchdog_factor,
                 svc_handler_t handler, char *name,
                 cfs_proc_dir_entry_t *proc_entry,
-                svcreq_printfn_t svcreq_printfn,
+                svc_req_printfn_t svcreq_printfn,
                 int min_threads, int max_threads,
                 char *threadname, __u32 ctx_tags,
                 svc_hpreq_handler_t hp_handler)
@@ -480,6 +522,8 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
 
         service->srv_name = name;
         cfs_spin_lock_init(&service->srv_lock);
+        cfs_spin_lock_init(&service->srv_rq_lock);
+        cfs_spin_lock_init(&service->srv_rs_lock);
         CFS_INIT_LIST_HEAD(&service->srv_threads);
         cfs_waitq_init(&service->srv_waitq);
 
@@ -490,7 +534,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
         service->srv_req_portal = req_portal;
         service->srv_watchdog_factor = watchdog_factor;
         service->srv_handler = handler;
-        service->srv_request_history_print_fn = svcreq_printfn;
+        service->srv_req_printfn = svcreq_printfn;
         service->srv_request_seq = 1;           /* valid seq #s start at 1 */
         service->srv_request_max_cull_seq = 0;
         service->srv_threads_min = min_threads;
@@ -500,7 +544,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
         service->srv_hpreq_handler = hp_handler;
         service->srv_hpreq_ratio = PTLRPC_SVC_HP_RATIO;
         service->srv_hpreq_count = 0;
-        service->srv_n_hpreq = 0;
+        service->srv_n_active_hpreq = 0;
 
         rc = LNetSetLazyPortal(service->srv_req_portal);
         LASSERT (rc == 0);
@@ -599,6 +643,32 @@ static void ptlrpc_server_free_request(struct ptlrpc_request *req)
 }
 
 /**
+ * increment the number of active requests consuming service threads.
+ */
+void ptlrpc_server_active_request_inc(struct ptlrpc_request *req)
+{
+        struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
+        struct ptlrpc_service *svc = rqbd->rqbd_service;
+
+        cfs_spin_lock(&svc->srv_rq_lock);
+        svc->srv_n_active_reqs++;
+        cfs_spin_unlock(&svc->srv_rq_lock);
+}
+
+/**
+ * decrement the number of active requests consuming service threads.
+ */
+void ptlrpc_server_active_request_dec(struct ptlrpc_request *req)
+{
+        struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
+        struct ptlrpc_service *svc = rqbd->rqbd_service;
+
+        cfs_spin_lock(&svc->srv_rq_lock);
+        svc->srv_n_active_reqs--;
+        cfs_spin_unlock(&svc->srv_rq_lock);
+}
+
+/**
  * drop a reference count of the request. if it reaches 0, we either
  * put it into history list, or free it immediately.
  */
@@ -620,7 +690,9 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
 
                 LASSERT(!cfs_list_empty(&req->rq_timed_list));
                 cfs_list_del_init(&req->rq_timed_list);
+                cfs_spin_lock(&req->rq_lock);
                 req->rq_at_linked = 0;
+                cfs_spin_unlock(&req->rq_lock);
                 array->paa_reqs_count[index]--;
                 array->paa_count--;
         } else
@@ -635,7 +707,6 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
 
         cfs_spin_lock(&svc->srv_lock);
 
-        svc->srv_n_active_reqs--;
         cfs_list_add(&req->rq_list, &rqbd->rqbd_reqs);
 
         refcount = --(rqbd->rqbd_refcount);
@@ -706,14 +777,23 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
  * to finish a request: stop sending more early replies, and release
  * the request. should be called after we finished handling the request.
  */
-static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
+static void ptlrpc_server_finish_request(struct ptlrpc_service *svc,
+                                         struct ptlrpc_request *req)
 {
+        cfs_spin_lock(&svc->srv_rq_lock);
+        svc->srv_n_active_reqs--;
+        if (req->rq_hp)
+                svc->srv_n_active_hpreq--;
+        cfs_spin_unlock(&svc->srv_rq_lock);
+
         ptlrpc_server_drop_request(req);
 }
 
-/* This function makes sure dead exports are evicted in a timely manner.
-   This function is only called when some export receives a message (i.e.,
-   the network is up.) */
+/**
+ * This function makes sure dead exports are evicted in a timely manner.
+ * This function is only called when some export receives a message (i.e.,
+ * the network is up.)
+ */
 static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
 {
         struct obd_export *oldest_exp;
@@ -793,6 +873,10 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
         EXIT;
 }
 
+/**
+ * Sanity check request \a req.
+ * Return 0 if all is ok, error code otherwise.
+ */
 static int ptlrpc_check_req(struct ptlrpc_request *req)
 {
         if (unlikely(lustre_msg_get_conn_cnt(req->rq_reqmsg) <
@@ -881,7 +965,9 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
                 cfs_list_add(&req->rq_timed_list,
                              &array->paa_reqs_array[index]);
 
+        cfs_spin_lock(&req->rq_lock);
         req->rq_at_linked = 1;
+        cfs_spin_unlock(&req->rq_lock);
         req->rq_at_index = index;
         array->paa_reqs_count[index]++;
         array->paa_count++;
@@ -936,29 +1022,34 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
         if (req->rq_export &&
             lustre_msg_get_flags(req->rq_reqmsg) &
             (MSG_REPLAY | MSG_REQ_REPLAY_DONE | MSG_LOCK_REPLAY_DONE)) {
-                /**
-                 * Use at_extra as early reply period for recovery requests but
-                 * make sure it is not bigger than recovery time / 4
-                 */
-                at_add(&svc->srv_at_estimate,
-                       min(at_extra,
-                           req->rq_export->exp_obd->obd_recovery_timeout / 4));
+                /* During recovery, we don't want to send too many early
+                 * replies, but on the other hand we want to make sure the
+                 * client has enough time to resend if the rpc is lost. So
+                 * during the recovery period send at least 4 early replies,
+                 * spacing them every at_extra if we can. at_estimate should
+                 * always equal this fixed value during recovery. */
+                at_measured(&svc->srv_at_estimate, min(at_extra,
+                            req->rq_export->exp_obd->obd_recovery_timeout / 4));
         } else {
                 /* Fake our processing time into the future to ask the clients
                  * for some extra amount of time */
-                at_add(&svc->srv_at_estimate, at_extra);
+                at_measured(&svc->srv_at_estimate, at_extra +
+                            cfs_time_current_sec() -
+                            req->rq_arrival_time.tv_sec);
+
+                /* Check to see if we've actually increased the deadline -
+                 * we may be past adaptive_max */
+                if (req->rq_deadline >= req->rq_arrival_time.tv_sec +
+                    at_get(&svc->srv_at_estimate)) {
+                        DEBUG_REQ(D_WARNING, req, "Couldn't add any time "
+                                  "(%ld/%ld), not sending early reply\n",
+                                  olddl, req->rq_arrival_time.tv_sec +
+                                  at_get(&svc->srv_at_estimate) -
+                                  cfs_time_current_sec());
+                        RETURN(-ETIMEDOUT);
+                }
         }
-
         newdl = cfs_time_current_sec() + at_get(&svc->srv_at_estimate);
-        if (req->rq_deadline >= newdl) {
-                /* We're not adding any time, no need to send an early reply
-                   (e.g. maybe at adaptive_max) */
-                DEBUG_REQ(D_WARNING, req, "Couldn't add any time ("
-                          CFS_DURATION_T"/"CFS_DURATION_T"), "
-                          "not sending early reply\n", olddl,
-                          cfs_time_sub(newdl, cfs_time_current_sec()));
-                RETURN(-ETIMEDOUT);
-        }
 
         OBD_ALLOC(reqcopy, sizeof *reqcopy);
         if (reqcopy == NULL)
@@ -1087,7 +1178,9 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
                                 counter++;
                                 array->paa_reqs_count[index]--;
                                 array->paa_count--;
+                                cfs_spin_lock(&rq->rq_lock);
                                 rq->rq_at_linked = 0;
+                                cfs_spin_unlock(&rq->rq_lock);
                                 continue;
                         }
 
@@ -1152,10 +1245,10 @@ static int ptlrpc_hpreq_init(struct ptlrpc_service *svc,
                         RETURN(rc);
         }
         if (req->rq_export && req->rq_ops) {
-                cfs_spin_lock(&req->rq_export->exp_lock);
+                cfs_spin_lock_bh(&req->rq_export->exp_rpc_lock);
                 cfs_list_add(&req->rq_exp_list,
                              &req->rq_export->exp_queued_rpc);
-                cfs_spin_unlock(&req->rq_export->exp_lock);
+                cfs_spin_unlock_bh(&req->rq_export->exp_rpc_lock);
         }
 
         RETURN(0);
@@ -1166,9 +1259,9 @@ static void ptlrpc_hpreq_fini(struct ptlrpc_request *req)
 {
         ENTRY;
         if (req->rq_export && req->rq_ops) {
-                cfs_spin_lock(&req->rq_export->exp_lock);
+                cfs_spin_lock_bh(&req->rq_export->exp_rpc_lock);
                 cfs_list_del_init(&req->rq_exp_list);
-                cfs_spin_unlock(&req->rq_export->exp_lock);
+                cfs_spin_unlock_bh(&req->rq_export->exp_rpc_lock);
         }
         EXIT;
 }
@@ -1202,17 +1295,20 @@ static void ptlrpc_hpreq_reorder_nolock(struct ptlrpc_service *svc,
         EXIT;
 }
 
+/**
+ * \see ptlrpc_hpreq_reorder_nolock
+ */
 void ptlrpc_hpreq_reorder(struct ptlrpc_request *req)
 {
         struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
         ENTRY;
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
         /* It may happen that the request is already taken for the processing
          * but still in the export list, do not re-add it into the HP list. */
         if (req->rq_phase == RQ_PHASE_NEW)
                 ptlrpc_hpreq_reorder_nolock(svc, req);
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
         EXIT;
 }
 
@@ -1244,7 +1340,7 @@ static int ptlrpc_server_request_add(struct ptlrpc_service *svc,
         if (rc < 0)
                 RETURN(rc);
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
         /* Before inserting the request into the queue, check if it is not
          * inserted yet, or even already handled -- it may happen due to
          * a racing ldlm_server_blocking_ast(). */
@@ -1255,52 +1351,110 @@ static int ptlrpc_server_request_add(struct ptlrpc_service *svc,
                         cfs_list_add_tail(&req->rq_list,
                                           &svc->srv_request_queue);
         }
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
 
         RETURN(0);
 }
 
-/* Only allow normal priority requests on a service that has a high-priority
+/**
+ * Allow to handle high priority request
+ * User can call it w/o any lock but need to hold ptlrpc_service::srv_rq_lock
+ * to get reliable result
+ */
+static int ptlrpc_server_allow_high(struct ptlrpc_service *svc, int force)
+{
+        if (force)
+                return 1;
+
+        if (svc->srv_n_active_reqs >= svc->srv_threads_running - 1)
+                return 0;
+
+        return cfs_list_empty(&svc->srv_request_queue) ||
+               svc->srv_hpreq_count < svc->srv_hpreq_ratio;
+}
+
+static int ptlrpc_server_high_pending(struct ptlrpc_service *svc, int force)
+{
+        return ptlrpc_server_allow_high(svc, force) &&
+               !cfs_list_empty(&svc->srv_request_hpq);
+}
+
+/**
+ * Only allow normal priority requests on a service that has a high-priority
  * queue if forced (i.e. cleanup), if there are other high priority requests
  * already being processed (i.e. those threads can service more high-priority
  * requests), or if there are enough idle threads that a later thread can do
- * a high priority request. */
+ * a high priority request.
+ * User can call it w/o any lock but need to hold ptlrpc_service::srv_rq_lock
+ * to get reliable result
+ */
 static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
 {
-        return force || !svc->srv_hpreq_handler || svc->srv_n_hpreq > 0 ||
-               svc->srv_n_active_reqs < svc->srv_threads_running - 2;
+        if (force ||
+            svc->srv_n_active_reqs < svc->srv_threads_running - 2)
+                return 1;
+
+        if (svc->srv_n_active_reqs >= svc->srv_threads_running - 1)
+                return 0;
+
+        return svc->srv_n_active_hpreq > 0 || svc->srv_hpreq_handler == NULL;
+}
+
+static int ptlrpc_server_normal_pending(struct ptlrpc_service *svc, int force)
+{
+        return ptlrpc_server_allow_normal(svc, force) &&
+               !cfs_list_empty(&svc->srv_request_queue);
+}
+
+/**
+ * Returns true if there are requests available in incoming
+ * request queue for processing and it is allowed to fetch them.
+ * User can call it w/o any lock but need to hold ptlrpc_service::srv_rq_lock
+ * to get reliable result
+ * \see ptlrpc_server_allow_normal
+ * \see ptlrpc_server_allow high
+ */
+static inline int
+ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force)
+{
+        return ptlrpc_server_high_pending(svc, force) ||
+               ptlrpc_server_normal_pending(svc, force);
 }
 
+/**
+ * Fetch a request for processing from queue of unprocessed requests.
+ * Favors high-priority requests.
+ * Returns a pointer to fetched request.
+ */
 static struct ptlrpc_request *
 ptlrpc_server_request_get(struct ptlrpc_service *svc, int force)
 {
-        struct ptlrpc_request *req = NULL;
+        struct ptlrpc_request *req;
         ENTRY;
 
-        if (ptlrpc_server_allow_normal(svc, force) &&
-            !cfs_list_empty(&svc->srv_request_queue) &&
-            (cfs_list_empty(&svc->srv_request_hpq) ||
-             svc->srv_hpreq_count >= svc->srv_hpreq_ratio)) {
-                req = cfs_list_entry(svc->srv_request_queue.next,
-                                     struct ptlrpc_request, rq_list);
-                svc->srv_hpreq_count = 0;
-        } else if (!cfs_list_empty(&svc->srv_request_hpq)) {
+        if (ptlrpc_server_high_pending(svc, force)) {
                 req = cfs_list_entry(svc->srv_request_hpq.next,
                                      struct ptlrpc_request, rq_list);
                 svc->srv_hpreq_count++;
+                RETURN(req);
+
         }
-        RETURN(req);
-}
 
-static int ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force)
-{
-        return ((ptlrpc_server_allow_normal(svc, force) &&
-                 !cfs_list_empty(&svc->srv_request_queue)) ||
-                !cfs_list_empty(&svc->srv_request_hpq));
+        if (ptlrpc_server_normal_pending(svc, force)) {
+                req = cfs_list_entry(svc->srv_request_queue.next,
+                                     struct ptlrpc_request, rq_list);
+                svc->srv_hpreq_count = 0;
+                RETURN(req);
+        }
+        RETURN(NULL);
 }
 
-/* Handle freshly incoming reqs, add to timed early reply list,
-   pass on to regular request queue */
+/**
+ * Handle freshly incoming reqs, add to timed early reply list,
+ * pass on to regular request queue.
+ * All incoming requests pass through here before getting into
+ * ptlrpc_server_handle_req later on.
+ */
 static int
 ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
 {
@@ -1320,8 +1474,14 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
         req = cfs_list_entry(svc->srv_req_in_queue.next,
                              struct ptlrpc_request, rq_list);
         cfs_list_del_init (&req->rq_list);
+        svc->srv_n_queued_reqs--;
         /* Consider this still a "queued" request as far as stats are
            concerned */
+        /* ptlrpc_hpreq_init() inserts it to the export list and by the time
+         * of ptlrpc_server_request_add() it could be already handled and
+         * released. To not lose request in between, take an extra reference
+         * on the request. */
+        ptlrpc_request_addref(req);
         cfs_spin_unlock(&svc->srv_lock);
 
         /* go through security check/transform */
@@ -1431,18 +1591,23 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
         if (rc)
                 GOTO(err_req, rc);
         cfs_waitq_signal(&svc->srv_waitq);
+        ptlrpc_server_drop_request(req);
         RETURN(1);
 
 err_req:
-        cfs_spin_lock(&svc->srv_lock);
-        svc->srv_n_queued_reqs--;
+        ptlrpc_server_drop_request(req);
+        cfs_spin_lock(&svc->srv_rq_lock);
         svc->srv_n_active_reqs++;
-        cfs_spin_unlock(&svc->srv_lock);
-        ptlrpc_server_finish_request(req);
+        cfs_spin_unlock(&svc->srv_rq_lock);
+        ptlrpc_server_finish_request(svc, req);
 
         RETURN(1);
 }
 
+/**
+ * Main incoming request handling logic.
+ * Calls handler function from service to do actual processing.
+ */
 static int
 ptlrpc_server_handle_request(struct ptlrpc_service *svc,
                              struct ptlrpc_thread *thread)
@@ -1458,25 +1623,17 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
 
         LASSERT(svc);
 
-        cfs_spin_lock(&svc->srv_lock);
-        if (unlikely(!ptlrpc_server_request_pending(svc, 0) ||
-            (
+        cfs_spin_lock(&svc->srv_rq_lock);
 #ifndef __KERNEL__
-             /* !@%$# liblustre only has 1 thread */
-             cfs_atomic_read(&svc->srv_n_difficult_replies) != 0 &&
-#endif
-             svc->srv_n_active_reqs >= (svc->srv_threads_running - 1)))) {
-                 /* Don't handle regular requests in the last thread, in order               * re
-                  * to handle difficult replies (which might block other threads)
-                  * as well as handle any incoming reqs, early replies, etc.
-                  * That means we always need at least 2 service threads. */
-                cfs_spin_unlock(&svc->srv_lock);
+        /* !@%$# liblustre only has 1 thread */
+        if (cfs_atomic_read(&svc->srv_n_difficult_replies) != 0) {
+                cfs_spin_unlock(&svc->srv_rq_lock);
                 RETURN(0);
-             }
-
+        }
+#endif
         request = ptlrpc_server_request_get(svc, 0);
         if  (request == NULL) {
-                cfs_spin_unlock(&svc->srv_lock);
+                cfs_spin_unlock(&svc->srv_rq_lock);
                 RETURN(0);
         }
 
@@ -1488,28 +1645,26 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
 
         if (unlikely(fail_opc)) {
                 if (request->rq_export && request->rq_ops) {
-                        cfs_spin_unlock(&svc->srv_lock);
+                        cfs_spin_unlock(&svc->srv_rq_lock);
                         OBD_FAIL_TIMEOUT(fail_opc, 4);
-                        cfs_spin_lock(&svc->srv_lock);
+                        cfs_spin_lock(&svc->srv_rq_lock);
                         request = ptlrpc_server_request_get(svc, 0);
                         if  (request == NULL) {
-                                cfs_spin_unlock(&svc->srv_lock);
+                                cfs_spin_unlock(&svc->srv_rq_lock);
                                 RETURN(0);
                         }
-                        LASSERT(ptlrpc_server_request_pending(svc, 0));
                 }
         }
 
         cfs_list_del_init(&request->rq_list);
-        svc->srv_n_queued_reqs--;
         svc->srv_n_active_reqs++;
         if (request->rq_hp)
-                svc->srv_n_hpreq++;
+                svc->srv_n_active_hpreq++;
 
         /* The phase is changed under the lock here because we need to know
          * the request is under processing (see ptlrpc_hpreq_reorder()). */
         ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
 
         ptlrpc_hpreq_fini(request);
 
@@ -1603,7 +1758,7 @@ put_conn:
         timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
         CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc "
                "%s:%s+%d:%d:x"LPU64":%s:%d Request procesed in "
-               "%lds (%lds total) trans "LPU64" rc %d/%d\n",
+               "%ldus (%ldus total) trans "LPU64" rc %d/%d\n",
                 cfs_curproc_comm(),
                 (request->rq_export ?
                  (char *)request->rq_export->exp_client_uuid.uuid : "0"),
@@ -1641,11 +1796,7 @@ put_conn:
         }
 
 out_req:
-        cfs_spin_lock(&svc->srv_lock);
-        if (request->rq_hp)
-                svc->srv_n_hpreq--;
-        cfs_spin_unlock(&svc->srv_lock);
-        ptlrpc_server_finish_request(request);
+        ptlrpc_server_finish_request(svc, request);
 
         RETURN(1);
 }
@@ -1746,7 +1897,6 @@ ptlrpc_handle_rs (struct ptlrpc_reply_state *rs)
                 class_export_put (exp);
                 rs->rs_export = NULL;
                 ptlrpc_rs_decref (rs);
-                cfs_atomic_dec (&svc->srv_outstanding_replies);
                 if (cfs_atomic_dec_and_test(&svc->srv_n_difficult_replies) &&
                     svc->srv_is_stopping)
                         cfs_waitq_broadcast(&svc->srv_waitq);
@@ -1774,14 +1924,14 @@ ptlrpc_server_handle_reply(struct ptlrpc_service *svc)
         struct ptlrpc_reply_state *rs = NULL;
         ENTRY;
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rs_lock);
         if (!cfs_list_empty(&svc->srv_reply_queue)) {
                 rs = cfs_list_entry(svc->srv_reply_queue.prev,
                                     struct ptlrpc_reply_state,
                                     rs_list);
                 cfs_list_del_init(&rs->rs_list);
         }
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rs_lock);
         if (rs != NULL)
                 ptlrpc_handle_rs(rs);
         RETURN(rs != NULL);
@@ -1861,12 +2011,103 @@ ptlrpc_retry_rqbds(void *arg)
         return (-ETIMEDOUT);
 }
 
+static inline int
+ptlrpc_threads_enough(struct ptlrpc_service *svc)
+{
+        return svc->srv_n_active_reqs <
+               svc->srv_threads_running - 1 - (svc->srv_hpreq_handler != NULL);
+}
+
+/**
+ * allowed to create more threads
+ * user can call it w/o any lock but need to hold ptlrpc_service::srv_lock to
+ * get reliable result
+ */
+static inline int
+ptlrpc_threads_increasable(struct ptlrpc_service *svc)
+{
+        return svc->srv_threads_running +
+               svc->srv_threads_starting < svc->srv_threads_max;
+}
+
+/**
+ * too many requests and allowed to create more threads
+ */
+static inline int
+ptlrpc_threads_need_create(struct ptlrpc_service *svc)
+{
+        return !ptlrpc_threads_enough(svc) && ptlrpc_threads_increasable(svc);
+}
+
+static inline int
+ptlrpc_thread_stopping(struct ptlrpc_thread *thread)
+{
+        return (thread->t_flags & SVC_STOPPING) != 0 ||
+                thread->t_svc->srv_is_stopping;
+}
+
+static inline int
+ptlrpc_rqbd_pending(struct ptlrpc_service *svc)
+{
+        return !cfs_list_empty(&svc->srv_idle_rqbds) &&
+               svc->srv_rqbd_timeout == 0;
+}
+
+static inline int
+ptlrpc_at_check(struct ptlrpc_service *svc)
+{
+        return svc->srv_at_check;
+}
+
+/**
+ * requests wait on preprocessing
+ * user can call it w/o any lock but need to hold ptlrpc_service::srv_lock to
+ * get reliable result
+ */
+static inline int
+ptlrpc_server_request_waiting(struct ptlrpc_service *svc)
+{
+        return !cfs_list_empty(&svc->srv_req_in_queue);
+}
+
+static __attribute__((__noinline__)) int
+ptlrpc_wait_event(struct ptlrpc_service *svc,
+                  struct ptlrpc_thread *thread)
+{
+        /* Don't exit while there are replies to be handled */
+        struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
+                                             ptlrpc_retry_rqbds, svc);
+
+        lc_watchdog_disable(thread->t_watchdog);
+
+        cfs_cond_resched();
+
+        l_wait_event_exclusive_head(svc->srv_waitq,
+                               ptlrpc_thread_stopping(thread) ||
+                               ptlrpc_server_request_waiting(svc) ||
+                               ptlrpc_server_request_pending(svc, 0) ||
+                               ptlrpc_rqbd_pending(svc) ||
+                               ptlrpc_at_check(svc), &lwi);
+
+        if (ptlrpc_thread_stopping(thread))
+                return -EINTR;
+
+        lc_watchdog_touch(thread->t_watchdog, CFS_GET_TIMEOUT(svc));
+
+        return 0;
+}
+
+/**
+ * Main thread body for service threads.
+ * Waits in a loop waiting for new requests to process to appear.
+ * Every time an incoming requests is added to its queue, a waitq
+ * is woken up and one of the threads will handle it.
+ */
 static int ptlrpc_main(void *arg)
 {
         struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
         struct ptlrpc_service  *svc = data->svc;
         struct ptlrpc_thread   *thread = data->thread;
-        struct obd_device      *dev = data->dev;
         struct ptlrpc_reply_state *rs;
 #ifdef WITH_GROUP_INFO
         cfs_group_info_t *ginfo = NULL;
@@ -1931,11 +2172,17 @@ static int ptlrpc_main(void *arg)
         }
 
         cfs_spin_lock(&svc->srv_lock);
+
+        LASSERT((thread->t_flags & SVC_STARTING) != 0);
+        thread->t_flags &= ~SVC_STARTING;
+        svc->srv_threads_starting--;
+
         /* SVC_STOPPING may already be set here if someone else is trying
          * to stop the service while this new thread has been dynamically
          * forked. We still set SVC_RUNNING to let our creator know that
          * we are now running, however we will exit as soon as possible */
         thread->t_flags |= SVC_RUNNING;
+        svc->srv_threads_running++;
         cfs_spin_unlock(&svc->srv_lock);
 
         /*
@@ -1946,75 +2193,49 @@ static int ptlrpc_main(void *arg)
 
         thread->t_watchdog = lc_watchdog_add(CFS_GET_TIMEOUT(svc), NULL, NULL);
 
-        cfs_spin_lock(&svc->srv_lock);
-        svc->srv_threads_running++;
+        cfs_spin_lock(&svc->srv_rs_lock);
         cfs_list_add(&rs->rs_list, &svc->srv_free_rs_list);
-        cfs_spin_unlock(&svc->srv_lock);
         cfs_waitq_signal(&svc->srv_free_rs_waitq);
+        cfs_spin_unlock(&svc->srv_rs_lock);
 
         CDEBUG(D_NET, "service thread %d (#%d) started\n", thread->t_id,
                svc->srv_threads_running);
 
         /* XXX maintain a list of all managed devices: insert here */
-
-        while (!(thread->t_flags & SVC_STOPPING) && !svc->srv_is_stopping) {
-                /* Don't exit while there are replies to be handled */
-                struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
-                                                     ptlrpc_retry_rqbds, svc);
-
-                lc_watchdog_disable(thread->t_watchdog);
-
-                cfs_cond_resched();
-
-                l_wait_event_exclusive (svc->srv_waitq,
-                              thread->t_flags & SVC_STOPPING ||
-                              svc->srv_is_stopping ||
-                              (!cfs_list_empty(&svc->srv_idle_rqbds) &&
-                               svc->srv_rqbd_timeout == 0) ||
-                              !cfs_list_empty(&svc->srv_req_in_queue) ||
-                              (ptlrpc_server_request_pending(svc, 0) &&
-                               (svc->srv_n_active_reqs <
-                                (svc->srv_threads_running - 1))) ||
-                              svc->srv_at_check,
-                              &lwi);
-
-                if (thread->t_flags & SVC_STOPPING || svc->srv_is_stopping)
+        while (!ptlrpc_thread_stopping(thread)) {
+                if (ptlrpc_wait_event(svc, thread))
                         break;
 
-                lc_watchdog_touch(thread->t_watchdog, CFS_GET_TIMEOUT(svc));
-
                 ptlrpc_check_rqbd_pool(svc);
 
-                if (svc->srv_threads_started < svc->srv_threads_max &&
-                    svc->srv_n_active_reqs >= (svc->srv_threads_started - 1)) 
+                if (ptlrpc_threads_need_create(svc)) {
                         /* Ignore return code - we tried... */
-                        ptlrpc_start_thread(dev, svc);
+                        ptlrpc_start_thread(svc);
+                }
 
-                if (!cfs_list_empty(&svc->srv_req_in_queue)) {
-                        /* Process all incoming reqs before handling any */
+                /* Process all incoming reqs before handling any */
+                if (ptlrpc_server_request_waiting(svc)) {
                         ptlrpc_server_handle_req_in(svc);
                         /* but limit ourselves in case of flood */
-                        if (counter++ < 1000)
+                        if (counter++ < 100)
                                 continue;
                         counter = 0;
                 }
 
-                if (svc->srv_at_check)
+                if (ptlrpc_at_check(svc))
                         ptlrpc_at_check_timed(svc);
 
-                /* don't handle requests in the last thread */
-                if (ptlrpc_server_request_pending(svc, 0) &&
-                    (svc->srv_n_active_reqs < (svc->srv_threads_running - 1))) {
+                if (ptlrpc_server_request_pending(svc, 0)) {
                         lu_context_enter(&env.le_ctx);
                         ptlrpc_server_handle_request(svc, thread);
                         lu_context_exit(&env.le_ctx);
                 }
 
-                if (!cfs_list_empty(&svc->srv_idle_rqbds) &&
+                if (ptlrpc_rqbd_pending(svc) &&
                     ptlrpc_server_post_idle_rqbds(svc) < 0) {
-                        /* I just failed to repost request buffers.  Wait
-                         * for a timeout (unless something else happens)
-                         * before I try again */
+                        /* I just failed to repost request buffers.
+                         * Wait for a timeout (unless something else
+                         * happens) before I try again */
                         svc->srv_rqbd_timeout = cfs_time_seconds(1)/10;
                         CDEBUG(D_RPCTRACE,"Posted buffers: %d\n",
                                svc->srv_nrqbd_receiving);
@@ -2037,9 +2258,19 @@ out:
                thread, thread->t_pid, thread->t_id, rc);
 
         cfs_spin_lock(&svc->srv_lock);
-        svc->srv_threads_running--; /* must know immediately */
-        thread->t_id = rc;
-        thread->t_flags = SVC_STOPPED;
+        if ((thread->t_flags & SVC_STARTING) != 0) {
+                svc->srv_threads_starting--;
+                thread->t_flags &= ~SVC_STARTING;
+        }
+
+        if ((thread->t_flags & SVC_RUNNING) != 0) {
+                /* must know immediately */
+                svc->srv_threads_running--;
+                thread->t_flags &= ~SVC_RUNNING;
+        }
+
+        thread->t_id    = rc;
+        thread->t_flags |= SVC_STOPPED;
 
         cfs_waitq_signal(&thread->t_ctl_waitq);
         cfs_spin_unlock(&svc->srv_lock);
@@ -2066,6 +2297,10 @@ static int hrt_dont_sleep(struct ptlrpc_hr_thread *t,
         return result;
 }
 
+/**
+ * Main body of "handle reply" function.
+ * It processes acked reply states
+ */
 static int ptlrpc_hr_main(void *arg)
 {
         struct ptlrpc_hr_args * hr_args = arg;
@@ -2087,7 +2322,7 @@ static int ptlrpc_hr_main(void *arg)
 
         while (!cfs_test_bit(HRT_STOPPING, &t->hrt_flags)) {
 
-                l_cfs_wait_event(t->hrt_wait, hrt_dont_sleep(t, &replies));
+                l_wait_condition(t->hrt_wait, hrt_dont_sleep(t, &replies));
                 while (!cfs_list_empty(&replies)) {
                         struct ptlrpc_reply_state *rs;
 
@@ -2122,7 +2357,7 @@ static int ptlrpc_start_hr_thread(struct ptlrpc_hr_service *hr, int n, int cpu)
                 cfs_complete(&t->hrt_completion);
                 GOTO(out, rc);
         }
-        l_cfs_wait_event(t->hrt_wait, cfs_test_bit(HRT_RUNNING, &t->hrt_flags));
+        l_wait_condition(t->hrt_wait, cfs_test_bit(HRT_RUNNING, &t->hrt_flags));
         RETURN(0);
  out:
         return rc;
@@ -2211,6 +2446,9 @@ static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
         EXIT;
 }
 
+/**
+ * Stops all threads of a particular service \a svc
+ */
 void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
 {
         struct ptlrpc_thread *thread;
@@ -2230,7 +2468,7 @@ void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
         EXIT;
 }
 
-int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
+int ptlrpc_start_threads(struct ptlrpc_service *svc)
 {
         int i, rc = 0;
         ENTRY;
@@ -2239,10 +2477,12 @@ int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
            ptlrpc_server_handle_request */
         LASSERT(svc->srv_threads_min >= 2);
         for (i = 0; i < svc->srv_threads_min; i++) {
-                rc = ptlrpc_start_thread(dev, svc);
+                rc = ptlrpc_start_thread(svc);
                 /* We have enough threads, don't start more.  b=15759 */
-                if (rc == -EMFILE)
+                if (rc == -EMFILE) {
+                        rc = 0;
                         break;
+                }
                 if (rc) {
                         CERROR("cannot start %s thread #%d: rc %d\n",
                                svc->srv_thread_name, i, rc);
@@ -2253,25 +2493,25 @@ int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
         RETURN(rc);
 }
 
-int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc)
+int ptlrpc_start_thread(struct ptlrpc_service *svc)
 {
         struct l_wait_info lwi = { 0 };
         struct ptlrpc_svc_data d;
         struct ptlrpc_thread *thread;
         char name[32];
-        int id, rc;
+        int rc;
         ENTRY;
 
         CDEBUG(D_RPCTRACE, "%s started %d min %d max %d running %d\n",
-               svc->srv_name, svc->srv_threads_started, svc->srv_threads_min,
+               svc->srv_name, svc->srv_threads_running, svc->srv_threads_min,
                svc->srv_threads_max, svc->srv_threads_running);
 
         if (unlikely(svc->srv_is_stopping))
                 RETURN(-ESRCH);
 
-        if (unlikely(svc->srv_threads_started >= svc->srv_threads_max) ||
+        if (!ptlrpc_threads_increasable(svc) ||
             (OBD_FAIL_CHECK(OBD_FAIL_TGT_TOOMANY_THREADS) &&
-             svc->srv_threads_started == svc->srv_threads_min - 1))
+             svc->srv_threads_running == svc->srv_threads_min - 1))
                 RETURN(-EMFILE);
 
         OBD_ALLOC_PTR(thread);
@@ -2280,19 +2520,21 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc)
         cfs_waitq_init(&thread->t_ctl_waitq);
 
         cfs_spin_lock(&svc->srv_lock);
-        if (svc->srv_threads_started >= svc->srv_threads_max) {
+        if (!ptlrpc_threads_increasable(svc)) {
                 cfs_spin_unlock(&svc->srv_lock);
                 OBD_FREE_PTR(thread);
                 RETURN(-EMFILE);
         }
+
+        svc->srv_threads_starting++;
+        thread->t_id    = svc->srv_threads_next_id++;
+        thread->t_flags |= SVC_STARTING;
+        thread->t_svc   = svc;
+
         cfs_list_add(&thread->t_link, &svc->srv_threads);
-        id = svc->srv_threads_started++;
         cfs_spin_unlock(&svc->srv_lock);
 
-        thread->t_svc = svc;
-        thread->t_id = id;
-        sprintf(name, "%s_%02d", svc->srv_thread_name, id);
-        d.dev = dev;
+        sprintf(name, "%s_%02d", svc->srv_thread_name, thread->t_id);
         d.svc = svc;
         d.name = name;
         d.thread = thread;
@@ -2308,7 +2550,7 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc)
 
                 cfs_spin_lock(&svc->srv_lock);
                 cfs_list_del(&thread->t_link);
-                --svc->srv_threads_started;
+                --svc->srv_threads_starting;
                 cfs_spin_unlock(&svc->srv_lock);
 
                 OBD_FREE(thread, sizeof(*thread));
@@ -2450,7 +2692,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         }
 
         /* schedule all outstanding replies to terminate them */
-        cfs_spin_lock(&service->srv_lock);
+        cfs_spin_lock(&service->srv_rs_lock);
         while (!cfs_list_empty(&service->srv_active_replies)) {
                 struct ptlrpc_reply_state *rs =
                         cfs_list_entry(service->srv_active_replies.next,
@@ -2459,7 +2701,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 ptlrpc_schedule_difficult_reply(rs);
                 cfs_spin_unlock(&rs->rs_lock);
         }
-        cfs_spin_unlock(&service->srv_lock);
+        cfs_spin_unlock(&service->srv_rs_lock);
 
         /* purge the request queue.  NB No new replies (rqbds all unlinked)
          * and no service threads, so I'm the only thread noodling the
@@ -2473,7 +2715,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 cfs_list_del(&req->rq_list);
                 service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;
-                ptlrpc_server_finish_request(req);
+                ptlrpc_server_finish_request(service, req);
         }
         while (ptlrpc_server_request_pending(service, 1)) {
                 struct ptlrpc_request *req;
@@ -2483,7 +2725,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;
                 ptlrpc_hpreq_fini(req);
-                ptlrpc_server_finish_request(req);
+                ptlrpc_server_finish_request(service, req);
         }
         LASSERT(service->srv_n_queued_reqs == 0);
         LASSERT(service->srv_n_active_reqs == 0);
@@ -2528,7 +2770,8 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         RETURN(0);
 }
 
-/* Returns 0 if the service is healthy.
+/**
+ * Returns 0 if the service is healthy.
  *
  * Right now, it just checks to make sure that requests aren't languishing
  * in the queue.  We'll use this health check to govern whether a node needs
@@ -2544,9 +2787,9 @@ int ptlrpc_service_health_check(struct ptlrpc_service *svc)
 
         cfs_gettimeofday(&right_now);
 
-        cfs_spin_lock(&svc->srv_lock);
+        cfs_spin_lock(&svc->srv_rq_lock);
         if (!ptlrpc_server_request_pending(svc, 1)) {
-                cfs_spin_unlock(&svc->srv_lock);
+                cfs_spin_unlock(&svc->srv_rq_lock);
                 return 0;
         }
 
@@ -2558,7 +2801,7 @@ int ptlrpc_service_health_check(struct ptlrpc_service *svc)
                 request = cfs_list_entry(svc->srv_request_queue.next,
                                          struct ptlrpc_request, rq_list);
         timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL);
-        cfs_spin_unlock(&svc->srv_lock);
+        cfs_spin_unlock(&svc->srv_rq_lock);
 
         if ((timediff / ONE_MILLION) > (AT_OFF ? obd_timeout * 3/2 :
                                         at_max)) {