Whamcloud - gitweb
Land b1_8_gate onto b1_8 (20081218_1708)
[fs/lustre-release.git] / lustre / ptlrpc / service.c
index 276689b..f32d283 100644 (file)
@@ -1,26 +1,37 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- *  Copyright (C) 2002 Cluster File Systems, Inc.
+ * GPL HEADER START
  *
- *   This file is part of the Lustre file system, http://www.lustre.org
- *   Lustre is a trademark of Cluster File Systems, Inc.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *   You may have signed or agreed to another license before downloading
- *   this software.  If so, you are bound by the terms and conditions
- *   of that agreement, and the following does not apply to you.  See the
- *   LICENSE file included with this distribution for more information.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   If you did not agree to a different license, then this copy of Lustre
- *   is open source software; you can redistribute it and/or modify it
- *   under the terms of version 2 of the GNU General Public License as
- *   published by the Free Software Foundation.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *   In either case, Lustre is distributed in the hope that it will be
- *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   license text for more details.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
  */
 
 #define DEBUG_SUBSYSTEM S_RPC
@@ -199,7 +210,7 @@ ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
 }
 
 void
-ptlrpc_commit_replies (struct obd_device *obd)
+ptlrpc_commit_replies (struct obd_export *exp)
 {
         struct list_head   *tmp;
         struct list_head   *nxt;
@@ -208,15 +219,16 @@ ptlrpc_commit_replies (struct obd_device *obd)
          * to attend to complete them. */
 
         /* CAVEAT EMPTOR: spinlock ordering!!! */
-        spin_lock(&obd->obd_uncommitted_replies_lock);
+        spin_lock(&exp->exp_uncommitted_replies_lock);
 
-        list_for_each_safe (tmp, nxt, &obd->obd_uncommitted_replies) {
+        list_for_each_safe(tmp, nxt, &exp->exp_uncommitted_replies) {
                 struct ptlrpc_reply_state *rs =
                         list_entry(tmp, struct ptlrpc_reply_state, rs_obd_list);
 
-                LASSERT (rs->rs_difficult);
-
-                if (rs->rs_transno <= obd->obd_last_committed) {
+                LASSERT(rs->rs_difficult);
+                /* VBR: per-export last_committed */
+                LASSERT(rs->rs_export);
+                if (rs->rs_transno <= rs->rs_export->exp_last_committed) {
                         struct ptlrpc_service *svc = rs->rs_service;
 
                         spin_lock (&svc->srv_lock);
@@ -226,7 +238,7 @@ ptlrpc_commit_replies (struct obd_device *obd)
                 }
         }
 
-        spin_unlock(&obd->obd_uncommitted_replies_lock);
+        spin_unlock(&exp->exp_uncommitted_replies_lock);
 }
 
 static int
@@ -279,9 +291,6 @@ ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc)
 static void ptlrpc_at_timer(unsigned long castmeharder)
 {
         struct ptlrpc_service *svc = (struct ptlrpc_service *)castmeharder;
-        CDEBUG(D_INFO, "at timer %s hit at %ld%s\n",
-               svc->srv_name, cfs_time_current_sec(), 
-               list_empty(&svc->srv_at_list) ? ", empty" : ""); 
         svc->srv_at_check = 1;
         svc->srv_at_checktime = cfs_time_current();
         cfs_waitq_signal(&svc->srv_waitq);
@@ -293,8 +302,9 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
                 int req_portal, int rep_portal, int watchdog_factor,
                 svc_handler_t handler, char *name,
                 cfs_proc_dir_entry_t *proc_entry,
-                svcreq_printfn_t svcreq_printfn, 
-                int min_threads, int max_threads, char *threadname)
+                svcreq_printfn_t svcreq_printfn,
+                int min_threads, int max_threads, char *threadname,
+                svc_hpreq_handler_t hp_handler)
 {
         int                    rc;
         struct ptlrpc_service *service;
@@ -302,7 +312,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
 
         LASSERT (nbufs > 0);
         LASSERT (bufsize >= max_req_size);
-        
+
         OBD_ALLOC(service, sizeof(*service));
         if (service == NULL)
                 RETURN(NULL);
@@ -327,11 +337,16 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
         service->srv_threads_min = min_threads;
         service->srv_threads_max = max_threads;
         service->srv_thread_name = threadname;
+        service->srv_hpreq_handler = hp_handler;
+        service->srv_hpreq_ratio = PTLRPC_SVC_HP_RATIO;
+        service->srv_hpreq_count = 0;
+        service->srv_n_hpreq = 0;
 
         rc = LNetSetLazyPortal(service->srv_req_portal);
         LASSERT (rc == 0);
 
         CFS_INIT_LIST_HEAD(&service->srv_request_queue);
+        CFS_INIT_LIST_HEAD(&service->srv_request_hpq);
         CFS_INIT_LIST_HEAD(&service->srv_idle_rqbds);
         CFS_INIT_LIST_HEAD(&service->srv_active_rqbds);
         CFS_INIT_LIST_HEAD(&service->srv_history_rqbds);
@@ -345,14 +360,14 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
         CFS_INIT_LIST_HEAD(&service->srv_req_in_queue);
         CFS_INIT_LIST_HEAD(&service->srv_at_list);
         cfs_timer_init(&service->srv_at_timer, ptlrpc_at_timer, service);
-        /* At SOW, service time should be quick; 10s seems generous. If client 
+        /* At SOW, service time should be quick; 10s seems generous. If client
            timeout is less than this, we'll be sending an early reply. */
         at_init(&service->srv_at_estimate, 10, 0);
 
         spin_lock (&ptlrpc_all_services_lock);
         list_add (&service->srv_list, &ptlrpc_all_services);
         spin_unlock (&ptlrpc_all_services_lock);
-        
+
         /* Now allocate the request buffers */
         rc = ptlrpc_grow_req_bufs(service);
         /* We shouldn't be under memory pressure at startup, so
@@ -378,38 +393,32 @@ failed:
         return NULL;
 }
 
-static void ptlrpc_server_req_decref(struct ptlrpc_request *req)
+/**
+ * to actually free the request, must be called without holding svc_lock.
+ * note it's caller's responsibility to unlink req->rq_list.
+ */
+static void ptlrpc_server_free_request(struct ptlrpc_request *req)
 {
-        struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
+        LASSERT(atomic_read(&req->rq_refcount) == 0);
+        LASSERT(list_empty(&req->rq_timed_list));
 
-        if (!atomic_dec_and_test(&req->rq_refcount))
-                return;
+        /* DEBUG_REQ() assumes the reply state of a request with a valid
+         * ref will not be destroyed until that reference is dropped. */
+        ptlrpc_req_drop_rs(req);
 
-        LASSERT(list_empty(&req->rq_timed_list));
-        if (req != &rqbd->rqbd_req) {
+        if (req != &req->rq_rqbd->rqbd_req) {
                 /* NB request buffers use an embedded
                  * req if the incoming req unlinked the
                  * MD; this isn't one of them! */
                 OBD_FREE(req, sizeof(*req));
-        } else {
-                struct ptlrpc_service *svc = rqbd->rqbd_service;
-                /* schedule request buffer for re-use.
-                 * NB I can only do this after I've disposed of their
-                 * reqs; particularly the embedded req */
-                spin_lock(&svc->srv_lock);
-                list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
-                spin_unlock(&svc->srv_lock);
         }
 }
 
-static void __ptlrpc_server_free_request(struct ptlrpc_request *req)
-{
-        list_del(&req->rq_list);
-        ptlrpc_req_drop_rs(req);
-        ptlrpc_server_req_decref(req);
-}
-
-static void ptlrpc_server_free_request(struct ptlrpc_request *req)
+/**
+ * drop a reference count of the request. if it reaches 0, we either
+ * put it into history list, or free it immediately.
+ */
+static void ptlrpc_server_drop_request(struct ptlrpc_request *req)
 {
         struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
         struct ptlrpc_service             *svc = rqbd->rqbd_service;
@@ -417,12 +426,8 @@ static void ptlrpc_server_free_request(struct ptlrpc_request *req)
         struct list_head                  *tmp;
         struct list_head                  *nxt;
 
-        if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
-                DEBUG_REQ(D_INFO, req, "free req");
-        spin_lock(&svc->srv_at_lock);
-        req->rq_sent_final = 1;
-        list_del_init(&req->rq_timed_list);
-        spin_unlock(&svc->srv_at_lock);
+        if (!atomic_dec_and_test(&req->rq_refcount))
+                return;
 
         spin_lock(&svc->srv_lock);
 
@@ -465,19 +470,54 @@ static void ptlrpc_server_free_request(struct ptlrpc_request *req)
                                 req = list_entry(rqbd->rqbd_reqs.next,
                                                  struct ptlrpc_request,
                                                  rq_list);
-                                __ptlrpc_server_free_request(req);
+                                list_del(&req->rq_list);
+                                ptlrpc_server_free_request(req);
                         }
 
                         spin_lock(&svc->srv_lock);
+                        /*
+                         * now all reqs including the embedded req has been
+                         * disposed, schedule request buffer for re-use.
+                         */
+                        LASSERT(atomic_read(&rqbd->rqbd_req.rq_refcount) == 0);
+                        list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
                 }
+
+                spin_unlock(&svc->srv_lock);
         } else if (req->rq_reply_state && req->rq_reply_state->rs_prealloc) {
-                 /* If we are low on memory, we are not interested in
-                    history */
-                list_del(&req->rq_history_list);
-                __ptlrpc_server_free_request(req);
+                 /* If we are low on memory, we are not interested in history */
+                list_del(&req->rq_list);
+                list_del_init(&req->rq_history_list);
+                spin_unlock(&svc->srv_lock);
+
+                ptlrpc_server_free_request(req);
+        } else {
+                spin_unlock(&svc->srv_lock);
         }
+}
 
-        spin_unlock(&svc->srv_lock);
+/**
+ * to finish a request: stop sending more early replies, and release
+ * the request. should be called after we finished handling the request.
+ */
+static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
+{
+        struct ptlrpc_service  *svc = req->rq_rqbd->rqbd_service;
+
+        if (req->rq_export) {
+                class_export_put(req->rq_export);
+                req->rq_export = NULL;
+        }
+
+        if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
+                DEBUG_REQ(D_INFO, req, "free req");
+
+        spin_lock(&svc->srv_at_lock);
+        req->rq_sent_final = 1;
+        list_del_init(&req->rq_timed_list);
+        spin_unlock(&svc->srv_at_lock);
+
+        ptlrpc_server_drop_request(req);
 }
 
 /* This function makes sure dead exports are evicted in a timely manner.
@@ -486,7 +526,7 @@ static void ptlrpc_server_free_request(struct ptlrpc_request *req)
 static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
 {
         struct obd_export *oldest_exp;
-        time_t oldest_time;
+        time_t oldest_time, new_time;
 
         ENTRY;
 
@@ -497,9 +537,13 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
            of the list, we can be really lazy here - we don't have to evict
            at the exact right moment.  Eventually, all silent exports
            will make it to the top of the list. */
-        exp->exp_last_request_time = max(exp->exp_last_request_time,
-                                         cfs_time_current_sec() + extra_delay);
 
+        /* Do not pay attention on 1sec or smaller renewals. */
+        new_time = cfs_time_current_sec() + extra_delay;
+        if (exp->exp_last_request_time + 1 /*second */ >= new_time)
+                RETURN_EXIT;
+
+        exp->exp_last_request_time = new_time;
         CDEBUG(D_INFO, "updating export %s at %ld\n",
                exp->exp_client_uuid.uuid,
                exp->exp_last_request_time);
@@ -512,8 +556,7 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
         if (list_empty(&exp->exp_obd_chain_timed)) {
                 /* this one is not timed */
                 spin_unlock(&exp->exp_obd->obd_dev_lock);
-                EXIT;
-                return;
+                RETURN_EXIT;
         }
 
         list_move_tail(&exp->exp_obd_chain_timed,
@@ -546,7 +589,7 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
                                oldest_time);
                 }
         } else {
-                if (cfs_time_current_sec() > 
+                if (cfs_time_current_sec() >
                     (exp->exp_obd->obd_eviction_timer + extra_delay)) {
                         /* The evictor won't evict anyone who we've heard from
                          * recently, so we don't have to check before we start
@@ -561,7 +604,7 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
 
 static int ptlrpc_check_req(struct ptlrpc_request *req)
 {
-        if (lustre_msg_get_conn_cnt(req->rq_reqmsg) < 
+        if (lustre_msg_get_conn_cnt(req->rq_reqmsg) <
             req->rq_export->exp_conn_cnt) {
                 DEBUG_REQ(D_ERROR, req,
                           "DROPPING req from old connection %d < %d",
@@ -594,16 +637,16 @@ static void ptlrpc_at_set_timer(struct ptlrpc_service *svc)
         }
 
         /* Set timer for closest deadline */
-        rq = list_entry(svc->srv_at_list.next, struct ptlrpc_request, 
+        rq = list_entry(svc->srv_at_list.next, struct ptlrpc_request,
                         rq_timed_list);
         next = (__s32)(rq->rq_deadline - cfs_time_current_sec() -
                        at_early_margin);
-        if (next <= 0) 
+        if (next <= 0)
                 ptlrpc_at_timer((unsigned long)svc);
         else
                 cfs_timer_arm(&svc->srv_at_timer, cfs_time_shift(next));
         spin_unlock(&svc->srv_at_lock);
-        CDEBUG(D_INFO, "armed %s at %+lds\n", svc->srv_name, next);
+        CDEBUG(D_INFO, "armed %s at %+ds\n", svc->srv_name, next);
 }
 
 /* Add rpc to early reply check list */
@@ -613,15 +656,12 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
         struct ptlrpc_request *rq;
         int found = 0;
 
-        if (AT_OFF) 
+        if (AT_OFF)
                 return(0);
 
         if ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) == 0)
                 return(-ENOSYS);
-        
-        DEBUG_REQ(D_ADAPTTO, req, "add timed %lds", 
-                  req->rq_deadline - cfs_time_current_sec());
-        
+
         spin_lock(&svc->srv_at_lock);
 
         if (unlikely(req->rq_sent_final)) {
@@ -633,7 +673,7 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
         /* Add to sorted list.  Presumably latest rpcs will have the latest
            deadlines, so search backward. */
         list_for_each_entry_reverse(rq, &svc->srv_at_list, rq_timed_list) {
-                if (req->rq_deadline > rq->rq_deadline) {
+                if (req->rq_deadline >= rq->rq_deadline) {
                         list_add(&req->rq_timed_list, &rq->rq_timed_list);
                         found++;
                         break;
@@ -652,9 +692,9 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
                 ptlrpc_at_set_timer(svc);
 
         return 0;
-}            
+}
 
-static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req, 
+static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
                                       int extra_time)
 {
         struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
@@ -664,47 +704,55 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
         time_t newdl;
         int rc;
         ENTRY;
-                            
-        /* deadline is when the client expects us to reply, margin is the 
+
+        /* deadline is when the client expects us to reply, margin is the
            difference between clients' and servers' expectations */
-        DEBUG_REQ(D_ADAPTTO, req, 
+        DEBUG_REQ(D_ADAPTTO, req,
                   "%ssending early reply (deadline %+lds, margin %+lds) for "
                   "%d+%d", AT_OFF ? "AT off - not " : "",
                   olddl, olddl - at_get(&svc->srv_at_estimate),
                   at_get(&svc->srv_at_estimate), extra_time);
 
-        if (AT_OFF) 
+        if (AT_OFF)
                 RETURN(0);
-        
+
         if (olddl < 0) {
-                CDEBUG(D_WARNING, "x"LPU64": Already past deadline (%+lds), not"
-                       " sending early reply. Increase at_early_margin (%d)?\n",
-                       req->rq_xid, olddl, at_early_margin);
+                DEBUG_REQ(D_WARNING, req, "Already past deadline (%+lds), "
+                          "not sending early reply. Consider increasing "
+                          "at_early_margin (%d)?", olddl, at_early_margin);
+
                 /* Return an error so we're not re-added to the timed list. */
                 RETURN(-ETIMEDOUT);
         }
 
         if ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) == 0){
-                CDEBUG(D_INFO, "Wanted to ask client for more time, but no AT "
-                      "support\n");
+                DEBUG_REQ(D_INFO, req, "Wanted to ask client for more time, "
+                          "but no AT support");
                 RETURN(-ENOSYS);
         }
 
-        if (extra_time) {
-                /* Fake our processing time into the future to ask the
-                   clients for some extra amount of time */
-                extra_time += cfs_time_current_sec() -
-                        req->rq_arrival_time.tv_sec;
-                at_add(&svc->srv_at_estimate, extra_time);
+        if (req->rq_export && req->rq_export->exp_in_recovery) {
+                /* don't increase server estimates during recovery, and give
+                   clients the full recovery time. */
+                newdl = cfs_time_current_sec() +
+                        req->rq_export->exp_obd->obd_recovery_timeout;
+        } else {
+                if (extra_time) {
+                        /* Fake our processing time into the future to ask the
+                           clients for some extra amount of time */
+                        extra_time += cfs_time_current_sec() -
+                                      req->rq_arrival_time.tv_sec;
+                        at_add(&svc->srv_at_estimate, extra_time);
+                }
+                newdl = req->rq_arrival_time.tv_sec +
+                        at_get(&svc->srv_at_estimate);
         }
-
-        newdl = req->rq_arrival_time.tv_sec + at_get(&svc->srv_at_estimate);
         if (req->rq_deadline >= newdl) {
                 /* We're not adding any time, no need to send an early reply
                    (e.g. maybe at adaptive_max) */
-                CDEBUG(D_ADAPTTO, "x"LPU64": Couldn't add any time (%ld/%ld), "
-                       "not sending early reply\n", req->rq_xid, olddl,
-                       newdl - cfs_time_current_sec());
+                DEBUG_REQ(D_WARNING, req, "Couldn't add any time "
+                          "(%ld/%ld), not sending early reply\n",
+                          olddl, newdl - cfs_time_current_sec());
                 RETURN(-ETIMEDOUT);
         }
 
@@ -716,7 +764,7 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
                 OBD_FREE(reqcopy, sizeof *reqcopy);
                 RETURN(-ENOMEM);
         }
-        
+
         *reqcopy = *req;
         reqcopy->rq_reply_state = NULL;
         reqcopy->rq_rep_swab_mask = 0;
@@ -725,8 +773,8 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
         memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
 
         if (req->rq_sent_final) {
-                CDEBUG(D_ADAPTTO, "x"LPU64": normal reply already sent out, "
-                       "abort sending early reply\n", req->rq_xid);
+                DEBUG_REQ(D_ADAPTTO, reqcopy, "Normal reply already sent out, "
+                          "abort sending early reply\n");
                 GOTO(out, rc = 0);
         }
 
@@ -738,12 +786,12 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
 
         /* RPC ref */
         class_export_rpc_get(reqcopy->rq_export);
-        if (reqcopy->rq_export->exp_obd && 
+        if (reqcopy->rq_export->exp_obd &&
             reqcopy->rq_export->exp_obd->obd_fail)
                 GOTO(out_put, rc = -ENODEV);
 
         rc = lustre_pack_reply_flags(reqcopy, 1, NULL, NULL, LPRFL_EARLY_REPLY);
-        if (rc) 
+        if (rc)
                 GOTO(out_put, rc);
 
         rc = ptlrpc_send_reply(reqcopy, PTLRPC_REPLY_EARLY);
@@ -756,7 +804,7 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
                 DEBUG_REQ(D_ERROR, req, "Early reply send failed %d", rc);
         }
 
-        /* Free the (early) reply state from lustre_pack_reply. 
+        /* Free the (early) reply state from lustre_pack_reply.
            (ptlrpc_send_reply takes it's own rs ref, so this is safe here) */
         ptlrpc_req_drop_rs(reqcopy);
 
@@ -787,10 +835,10 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
         }
         delay = cfs_time_sub(cfs_time_current(), svc->srv_at_checktime);
         svc->srv_at_check = 0;
-        
+
         if (list_empty(&svc->srv_at_list)) {
                 spin_unlock(&svc->srv_at_lock);
-                RETURN(0);      
+                RETURN(0);
         }
 
         /* The timer went off, but maybe the nearest rpc already completed. */
@@ -801,10 +849,10 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
                 /* We've still got plenty of time.  Reset the timer. */
                 spin_unlock(&svc->srv_at_lock);
                 ptlrpc_at_set_timer(svc);
-                RETURN(0);      
+                RETURN(0);
         }
 
-        /* We're close to a timeout, and we don't know how much longer the 
+        /* We're close to a timeout, and we don't know how much longer the
            server will take. Send early replies to everyone expiring soon. */
         CFS_INIT_LIST_HEAD(&work_list);
         list_for_each_entry_safe(rq, n, &svc->srv_at_list, rq_timed_list) {
@@ -823,9 +871,9 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
 
         CDEBUG(D_ADAPTTO, "timeout in %+ds, asking for %d secs on %d early "
                "replies\n", first, at_extra, counter);
-        
+
         if (first < 0) {
-                /* We're already past request deadlines before we even get a 
+                /* We're already past request deadlines before we even get a
                    chance to send early replies */
                 LCONSOLE_WARN("%s: This server is not able to keep up with "
                               "request traffic (cpu-bound).\n",  svc->srv_name);
@@ -835,8 +883,8 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
                       at_get(&svc->srv_at_estimate), delay);
         }
 
-        /* ptlrpc_server_free_request may delete an entry out of the work
-           list */
+        /* ptlrpc_server_finish_request may delete an entry out of
+         * the work list */
         spin_lock(&svc->srv_at_lock);
         while (!list_empty(&work_list)) {
                 rq = list_entry(work_list.next, struct ptlrpc_request,
@@ -850,7 +898,7 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
                 if (ptlrpc_at_send_early_reply(rq, at_extra) == 0)
                         ptlrpc_at_add_timed(rq);
 
-                ptlrpc_server_req_decref(rq);
+                ptlrpc_server_drop_request(rq);
                 spin_lock(&svc->srv_at_lock);
         }
         spin_unlock(&svc->srv_at_lock);
@@ -858,6 +906,167 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
         RETURN(0);
 }
 
+/**
+ * Put the request to the export list if the request may become
+ * a high priority one.
+ */
+static int ptlrpc_hpreq_init(struct ptlrpc_service *svc,
+                             struct ptlrpc_request *req)
+{
+        int rc;
+        ENTRY;
+
+        if (svc->srv_hpreq_handler) {
+                rc = svc->srv_hpreq_handler(req);
+                if (rc)
+                        RETURN(rc);
+        }
+        if (req->rq_export && req->rq_ops) {
+                spin_lock(&req->rq_export->exp_lock);
+                list_add(&req->rq_exp_list, &req->rq_export->exp_queued_rpc);
+                spin_unlock(&req->rq_export->exp_lock);
+        }
+
+        RETURN(0);
+}
+
+/** Remove the request from the export list. */
+static void ptlrpc_hpreq_fini(struct ptlrpc_request *req)
+{
+        ENTRY;
+        if (req->rq_export && req->rq_ops) {
+                spin_lock(&req->rq_export->exp_lock);
+                list_del_init(&req->rq_exp_list);
+                spin_unlock(&req->rq_export->exp_lock);
+        }
+        EXIT;
+}
+
+/**
+ * Make the request a high priority one.
+ *
+ * All the high priority requests are queued in a separate FIFO
+ * ptlrpc_service::srv_request_hpq list which is parallel to
+ * ptlrpc_service::srv_request_queue list but has a higher priority
+ * for handling.
+ *
+ * \see ptlrpc_server_handle_request().
+ */
+static void ptlrpc_hpreq_reorder_nolock(struct ptlrpc_service *svc,
+                                        struct ptlrpc_request *req)
+{
+        ENTRY;
+        LASSERT(svc != NULL);
+        spin_lock(&req->rq_lock);
+        if (req->rq_hp == 0) {
+                int opc = lustre_msg_get_opc(req->rq_reqmsg);
+
+                /* Add to the high priority queue. */
+                list_move_tail(&req->rq_list, &svc->srv_request_hpq);
+                req->rq_hp = 1;
+                if (opc != OBD_PING)
+                        DEBUG_REQ(D_NET, req, "high priority req");
+        }
+        spin_unlock(&req->rq_lock);
+        EXIT;
+}
+
+void ptlrpc_hpreq_reorder(struct ptlrpc_request *req)
+{
+        struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
+        ENTRY;
+
+        spin_lock(&svc->srv_lock);
+        /* It may happen that the request is already taken for the processing
+         * but still in the export list, do not re-add it into the HP list. */
+        if (req->rq_phase == RQ_PHASE_NEW)
+                ptlrpc_hpreq_reorder_nolock(svc, req);
+        spin_unlock(&svc->srv_lock);
+        EXIT;
+}
+
+/** Check if the request if a high priority one. */
+static int ptlrpc_server_hpreq_check(struct ptlrpc_request *req)
+{
+        int opc, rc = 0;
+        ENTRY;
+
+        /* Check by request opc. */
+        opc = lustre_msg_get_opc(req->rq_reqmsg);
+        if (opc == OBD_PING)
+                RETURN(1);
+
+        /* Perform request specific check. */
+        if (req->rq_ops && req->rq_ops->hpreq_check)
+                rc = req->rq_ops->hpreq_check(req);
+        RETURN(rc);
+}
+
+/** Check if a request is a high priority one. */
+static int ptlrpc_server_request_add(struct ptlrpc_service *svc,
+                                     struct ptlrpc_request *req)
+{
+        int rc;
+        ENTRY;
+
+        rc = ptlrpc_server_hpreq_check(req);
+        if (rc < 0)
+                RETURN(rc);
+
+        spin_lock(&svc->srv_lock);
+        /* Before inserting the request into the queue, check if it is not
+         * inserted yet, or even already handled -- it may happen due to
+         * a racing ldlm_server_blocking_ast(). */
+        if (req->rq_phase == RQ_PHASE_NEW && list_empty(&req->rq_list)) {
+                if (rc)
+                        ptlrpc_hpreq_reorder_nolock(svc, req);
+                else
+                        list_add_tail(&req->rq_list, &svc->srv_request_queue);
+        }
+        spin_unlock(&svc->srv_lock);
+
+        RETURN(0);
+}
+
+/* Only allow normal priority requests on a service that has a high-priority
+ * queue if forced (i.e. cleanup), if there are other high priority requests
+ * already being processed (i.e. those threads can service more high-priority
+ * requests), or if there are enough idle threads that a later thread can do
+ * a high priority request. */
+static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
+{
+        return force || !svc->srv_hpreq_handler || svc->srv_n_hpreq > 0 ||
+               svc->srv_n_active_reqs < svc->srv_threads_running - 2;
+}
+
+static struct ptlrpc_request *
+ptlrpc_server_request_get(struct ptlrpc_service *svc, int force)
+{
+        struct ptlrpc_request *req = NULL;
+        ENTRY;
+
+        if (ptlrpc_server_allow_normal(svc, force) &&
+            !list_empty(&svc->srv_request_queue) &&
+            (list_empty(&svc->srv_request_hpq) ||
+             svc->srv_hpreq_count >= svc->srv_hpreq_ratio)) {
+                req = list_entry(svc->srv_request_queue.next,
+                                 struct ptlrpc_request, rq_list);
+                svc->srv_hpreq_count = 0;
+        } else if (!list_empty(&svc->srv_request_hpq)) {
+                req = list_entry(svc->srv_request_hpq.next,
+                                 struct ptlrpc_request, rq_list);
+                svc->srv_hpreq_count++;
+        }
+        RETURN(req);
+}
+
+static int ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force)
+{
+        return ((ptlrpc_server_allow_normal(svc, force) &&
+                 !list_empty(&svc->srv_request_queue)) ||
+                !list_empty(&svc->srv_request_hpq));
+}
+
 /* Handle freshly incoming reqs, add to timed early reply list,
    pass on to regular request queue */
 static int
@@ -919,20 +1128,19 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
                 lustre_msg_get_handle(req->rq_reqmsg));
         if (req->rq_export) {
                 rc = ptlrpc_check_req(req);
-                class_export_put(req->rq_export);
-                req->rq_export = NULL;
-                if (rc) 
+                if (rc)
                         goto err_req;
+                ptlrpc_update_export_timer(req->rq_export, 0);
         }
 
         /* req_in handling should/must be fast */
-        if (cfs_time_current_sec() - req->rq_arrival_time.tv_sec > 5) 
+        if (cfs_time_current_sec() - req->rq_arrival_time.tv_sec > 5)
                 DEBUG_REQ(D_WARNING, req, "Slow req_in handling %lus",
                           cfs_time_current_sec() - req->rq_arrival_time.tv_sec);
 
         /* Set rpc server deadline and add it to the timed list */
         deadline = (lustre_msghdr_get_flags(req->rq_reqmsg) &
-                    MSGHDR_AT_SUPPORT) ? 
+                    MSGHDR_AT_SUPPORT) ?
                    /* The max time the client expects us to take */
                    lustre_msg_get_timeout(req->rq_reqmsg) : obd_timeout;
         req->rq_deadline = req->rq_arrival_time.tv_sec + deadline;
@@ -940,14 +1148,17 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
                 DEBUG_REQ(D_ERROR, req, "Dropping request with 0 timeout");
                 goto err_req;
         }
-        
+
         ptlrpc_at_add_timed(req);
+        rc = ptlrpc_hpreq_init(svc, req);
+        if (rc)
+                GOTO(err_req, rc);
 
         /* Move it over to the request processing queue */
-        spin_lock(&svc->srv_lock);
-        list_add_tail(&req->rq_list, &svc->srv_request_queue);
+        rc = ptlrpc_server_request_add(svc, req);
+        if (rc)
+                GOTO(err_req, rc);
         cfs_waitq_signal(&svc->srv_waitq);
-        spin_unlock(&svc->srv_lock);
         RETURN(1);
 
 err_req:
@@ -955,7 +1166,7 @@ err_req:
         svc->srv_n_queued_reqs--;
         svc->srv_n_active_reqs++;
         spin_unlock(&svc->srv_lock);
-        ptlrpc_server_free_request(req);
+        ptlrpc_server_finish_request(req);
 
         RETURN(1);
 }
@@ -969,35 +1180,71 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
         struct timeval         work_start;
         struct timeval         work_end;
         long                   timediff;
-        int                    rc;
+        int                    opc, rc;
+        int                    fail_opc = 0;
         ENTRY;
 
         LASSERT(svc);
 
         spin_lock(&svc->srv_lock);
-        if (list_empty (&svc->srv_request_queue) ||
+        if (!ptlrpc_server_request_pending(svc, 0) ||
             (
 #ifndef __KERNEL__
              /* !@%$# liblustre only has 1 thread */
              svc->srv_n_difficult_replies != 0 &&
 #endif
              svc->srv_n_active_reqs >= (svc->srv_threads_running - 1))) {
-                /* Don't handle regular requests in the last thread, in order               * remain free to handle any 'difficult' replies (that might
+                /* Don't handle regular requests in the last thread, in order
                  * to handle difficult replies (which might block other threads)
-                 * as well as handle any incoming reqs, early replies, etc. 
+                 * as well as handle any incoming reqs, early replies, etc.
                  * That means we always need at least 2 service threads. */
                 spin_unlock(&svc->srv_lock);
                 RETURN(0);
         }
 
-        request = list_entry (svc->srv_request_queue.next,
-                              struct ptlrpc_request, rq_list);
-        list_del_init (&request->rq_list);
+        request = ptlrpc_server_request_get(svc, 0);
+        if  (request == NULL) {
+                spin_unlock(&svc->srv_lock);
+                RETURN(0);
+        }
+
+        opc = lustre_msg_get_opc(request->rq_reqmsg);
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT))
+                fail_opc = OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT;
+        else if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
+                fail_opc = OBD_FAIL_PTLRPC_HPREQ_TIMEOUT;
+
+        if (unlikely(fail_opc)) {
+                if (request->rq_export && request->rq_ops) {
+                        spin_unlock(&svc->srv_lock);
+                        OBD_FAIL_TIMEOUT(fail_opc, 4);
+                        spin_lock(&svc->srv_lock);
+                        request = ptlrpc_server_request_get(svc, 0);
+                        if  (request == NULL) {
+                                spin_unlock(&svc->srv_lock);
+                                RETURN(0);
+                        }
+                        LASSERT(ptlrpc_server_request_pending(svc, 0));
+                }
+        }
+
+        list_del_init(&request->rq_list);
         svc->srv_n_queued_reqs--;
         svc->srv_n_active_reqs++;
 
+        if (request->rq_hp)
+                svc->srv_n_hpreq++;
+
+        /* The phase is changed under the lock here because we need to know
+         * the request is under processing (see ptlrpc_hpreq_reorder()). */
+        ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
         spin_unlock(&svc->srv_lock);
 
+        ptlrpc_hpreq_fini(request);
+
+        if(OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DUMP_LOG))
+                libcfs_debug_dumplog();
+
         do_gettimeofday(&work_start);
         timediff = cfs_timeval_sub(&work_start, &request->rq_arrival_time,NULL);
         if (svc->srv_stats != NULL) {
@@ -1010,13 +1257,10 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
                 lprocfs_counter_add(svc->srv_stats, PTLRPC_TIMEOUT,
                                     at_get(&svc->srv_at_estimate));
         }
-        
+
         CDEBUG(D_NET, "got req "LPD64"\n", request->rq_xid);
-        
-        request->rq_svc_thread = thread;
-        request->rq_export = class_conn2export(
-                                     lustre_msg_get_handle(request->rq_reqmsg));
 
+        request->rq_svc_thread = thread;
         if (request->rq_export) {
                 if (ptlrpc_check_req(request))
                         goto put_conn;
@@ -1024,7 +1268,7 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
                 export = class_export_rpc_get(request->rq_export);
         }
 
-        /* Discard requests queued for longer than the deadline.  
+        /* Discard requests queued for longer than the deadline.
            The deadline is increased if we send an early reply. */
         if (cfs_time_current_sec() > request->rq_deadline) {
                 DEBUG_REQ(D_ERROR, request, "Dropping timed-out request from %s"
@@ -1036,8 +1280,6 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
                 goto put_rpc_export;
         }
 
-        request->rq_phase = RQ_PHASE_INTERPRET;
-
         CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid+ref:pid:xid:nid:opc "
                "%s:%s+%d:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
                (request->rq_export ?
@@ -1051,8 +1293,8 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
         OBD_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, obd_fail_val);
 
         rc = svc->srv_handler(request);
-        
-        request->rq_phase = RQ_PHASE_COMPLETE;
+
+        ptlrpc_rqphase_move(request, RQ_PHASE_COMPLETE);
 
         CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc "
                "%s:%s+%d:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
@@ -1069,9 +1311,6 @@ put_rpc_export:
                 class_export_rpc_put(export);
 
 put_conn:
-        if (request->rq_export != NULL)
-                class_export_put(request->rq_export);
-
         if (cfs_time_current_sec() > request->rq_deadline) {
                 DEBUG_REQ(D_WARNING, request, "Request x"LPU64" took longer "
                           "than estimated (%ld%+lds); client may timeout.",
@@ -1089,7 +1328,7 @@ put_conn:
                cfs_timeval_sub(&work_end, &request->rq_arrival_time, NULL),
                request->rq_repmsg ? lustre_msg_get_transno(request->rq_repmsg) :
                request->rq_transno, request->rq_status,
-               request->rq_repmsg ? lustre_msg_get_status(request->rq_repmsg): 
+               request->rq_repmsg ? lustre_msg_get_status(request->rq_repmsg):
                -999);
         if (svc->srv_stats != NULL) {
                 __u32 op = lustre_msg_get_opc(request->rq_reqmsg);
@@ -1102,13 +1341,17 @@ put_conn:
                 }
         }
         if (request->rq_early_count) {
-                DEBUG_REQ(D_ADAPTTO, request, 
+                DEBUG_REQ(D_ADAPTTO, request,
                           "sent %d early replies before finishing in %lds",
                           request->rq_early_count,
                           work_end.tv_sec - request->rq_arrival_time.tv_sec);
         }
 
-        ptlrpc_server_free_request(request);
+        spin_lock(&svc->srv_lock);
+        if (request->rq_hp)
+                svc->srv_n_hpreq--;
+        spin_unlock(&svc->srv_lock);
+        ptlrpc_server_finish_request(request);
 
         RETURN(1);
 }
@@ -1143,10 +1386,10 @@ ptlrpc_server_handle_reply (struct ptlrpc_service *svc)
         /* Disengage from notifiers carefully (lock order - irqrestore below!)*/
         spin_unlock(&svc->srv_lock);
 
-        spin_lock (&obd->obd_uncommitted_replies_lock);
+        spin_lock (&exp->exp_uncommitted_replies_lock);
         /* Noop if removed already */
         list_del_init (&rs->rs_obd_list);
-        spin_unlock (&obd->obd_uncommitted_replies_lock);
+        spin_unlock (&exp->exp_uncommitted_replies_lock);
 
         spin_lock (&exp->exp_lock);
         /* Noop if removed already */
@@ -1165,9 +1408,7 @@ ptlrpc_server_handle_reply (struct ptlrpc_service *svc)
                 /* If we see this, we should already have seen the warning
                  * in mds_steal_ack_locks()  */
                 CWARN("All locks stolen from rs %p x"LPD64".t"LPD64
-                      " o%d NID %s\n",
-                      rs,
-                      rs->rs_xid, rs->rs_transno,
+                      " o%d NID %s\n", rs, rs->rs_xid, rs->rs_transno,
                       lustre_msg_get_opc(rs->rs_msg),
                       libcfs_nid2str(exp->exp_connection->c_peer.nid));
         }
@@ -1301,7 +1542,6 @@ static int ptlrpc_main(void *arg)
         struct ptlrpc_thread   *thread = data->thread;
         struct obd_device      *dev = data->dev;
         struct ptlrpc_reply_state *rs;
-        struct lc_watchdog     *watchdog;
 #ifdef WITH_GROUP_INFO
         struct group_info *ginfo = NULL;
 #endif
@@ -1359,9 +1599,10 @@ static int ptlrpc_main(void *arg)
          */
         cfs_waitq_signal(&thread->t_ctl_waitq);
 
-        watchdog = lc_watchdog_add(max_t(int, obd_timeout, AT_OFF ? 0 : 
-                                   at_get(&svc->srv_at_estimate)) * 
-                                   svc->srv_watchdog_factor, NULL, NULL);
+        thread->t_watchdog = lc_watchdog_add(max_t(int, obd_timeout, AT_OFF ? 0 :
+                                                   at_get(&svc->srv_at_estimate))
+                                             *  svc->srv_watchdog_factor,
+                                             NULL, NULL);
 
         spin_lock(&svc->srv_lock);
         svc->srv_threads_running++;
@@ -1380,7 +1621,7 @@ static int ptlrpc_main(void *arg)
                 struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
                                                      ptlrpc_retry_rqbds, svc);
 
-                lc_watchdog_disable(watchdog);
+                lc_watchdog_disable(thread->t_watchdog);
 
                 cond_resched();
 
@@ -1391,15 +1632,15 @@ static int ptlrpc_main(void *arg)
                                svc->srv_rqbd_timeout == 0) ||
                               !list_empty(&svc->srv_req_in_queue) ||
                               !list_empty(&svc->srv_reply_queue) ||
-                              (!list_empty(&svc->srv_request_queue) &&
+                              (ptlrpc_server_request_pending(svc, 0) &&
                                (svc->srv_n_active_reqs <
                                 (svc->srv_threads_running - 1))) ||
                               svc->srv_at_check,
                               &lwi);
 
-                lc_watchdog_touch_ms(watchdog, max_t(int, obd_timeout,
-                                     AT_OFF ? 0 : 
-                                     at_get(&svc->srv_at_estimate)) * 
+                lc_watchdog_touch_ms(thread->t_watchdog, max_t(int, obd_timeout,
+                                     AT_OFF ? 0 :
+                                     at_get(&svc->srv_at_estimate)) *
                                      svc->srv_watchdog_factor);
 
                 ptlrpc_check_rqbd_pool(svc);
@@ -1416,17 +1657,17 @@ static int ptlrpc_main(void *arg)
                 if (!list_empty(&svc->srv_req_in_queue)) {
                         /* Process all incoming reqs before handling any */
                         ptlrpc_server_handle_req_in(svc);
-                        /* but limit ourselves in case of flood */ 
+                        /* but limit ourselves in case of flood */
                         if (counter++ < 1000)
                                 continue;
                         counter = 0;
                 }
 
-                if (svc->srv_at_check) 
+                if (svc->srv_at_check)
                         ptlrpc_at_check_timed(svc);
 
                 /* don't handle requests in the last thread */
-                if (!list_empty (&svc->srv_request_queue) &&
+                if (ptlrpc_server_request_pending(svc, 0) &&
                     (svc->srv_n_active_reqs < (svc->srv_threads_running - 1)))
                         ptlrpc_server_handle_request(svc, thread);
 
@@ -1441,7 +1682,8 @@ static int ptlrpc_main(void *arg)
                 }
         }
 
-        lc_watchdog_delete(watchdog);
+        lc_watchdog_delete(thread->t_watchdog);
+        thread->t_watchdog = NULL;
 
 out_srv_init:
         /*
@@ -1508,6 +1750,7 @@ int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
 
         /* We require 2 threads min - see note in
          * ptlrpc_server_handle_request() */
+
         LASSERT(svc->srv_threads_min >= 2);
         for (i = 0; i < svc->srv_threads_min; i++) {
                 rc = ptlrpc_start_thread(dev, svc);
@@ -1563,7 +1806,7 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc)
         d.thread = thread;
 
         CDEBUG(D_RPCTRACE, "starting thread '%s'\n", name);
-        
+
         /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
          * just drop the VM and FILES in ptlrpc_daemonize() right away.
          */
@@ -1618,7 +1861,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
          * its 'unlink' flag set for each posted rqbd */
         list_for_each(tmp, &service->srv_active_rqbds) {
                 struct ptlrpc_request_buffer_desc *rqbd =
-                        list_entry(tmp, struct ptlrpc_request_buffer_desc, 
+                        list_entry(tmp, struct ptlrpc_request_buffer_desc,
                                    rqbd_list);
 
                 rc = LNetMDUnlink(rqbd->rqbd_md_h);
@@ -1637,7 +1880,8 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
 
                 /* Network access will complete in finite time but the HUGE
                  * timeout lets us CWARN for visibility of sluggish NALs */
-                lwi = LWI_TIMEOUT(cfs_time_seconds(LONG_UNLINK), NULL, NULL);
+                lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
+                                           cfs_time_seconds(1), NULL, NULL);
                 rc = l_wait_event(service->srv_waitq,
                                   service->srv_nrqbd_receiving == 0,
                                   &lwi);
@@ -1668,18 +1912,17 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 list_del(&req->rq_list);
                 service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;
-                ptlrpc_server_free_request(req);
+                ptlrpc_server_finish_request(req);
         }
-        while (!list_empty(&service->srv_request_queue)) {
-                struct ptlrpc_request *req =
-                        list_entry(service->srv_request_queue.next,
-                                   struct ptlrpc_request,
-                                   rq_list);
+        while (ptlrpc_server_request_pending(service, 1)) {
+                struct ptlrpc_request *req;
 
+                req = ptlrpc_server_request_get(service, 1);
                 list_del(&req->rq_list);
                 service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;
-                ptlrpc_server_free_request(req);
+                ptlrpc_hpreq_fini(req);
+                ptlrpc_server_finish_request(req);
         }
         LASSERT(service->srv_n_queued_reqs == 0);
         LASSERT(service->srv_n_active_reqs == 0);
@@ -1742,18 +1985,22 @@ int ptlrpc_service_health_check(struct ptlrpc_service *svc)
         do_gettimeofday(&right_now);
 
         spin_lock(&svc->srv_lock);
-        if (list_empty(&svc->srv_request_queue)) {
+        if (!ptlrpc_server_request_pending(svc, 1)) {
                 spin_unlock(&svc->srv_lock);
                 return 0;
         }
-        
+
         /* How long has the next entry been waiting? */
-        request = list_entry(svc->srv_request_queue.next,
-                             struct ptlrpc_request, rq_list);
+        if (list_empty(&svc->srv_request_queue))
+                request = list_entry(svc->srv_request_hpq.next,
+                                     struct ptlrpc_request, rq_list);
+        else
+                request = list_entry(svc->srv_request_queue.next,
+                                     struct ptlrpc_request, rq_list);
         timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL);
         spin_unlock(&svc->srv_lock);
 
-        if ((timediff / ONE_MILLION) > (AT_OFF ? obd_timeout * 3/2 : 
+        if ((timediff / ONE_MILLION) > (AT_OFF ? obd_timeout * 3/2 :
                                         at_max)) {
                 CERROR("%s: unhealthy - request has been waiting %lds\n",
                        svc->srv_name, timediff / ONE_MILLION);