LU-14876 out: don't connect to busy MDS-MDS export
[fs/lustre-release.git] lustre/ptlrpc/service.c
index be4cd31..6a68a19 100644
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  */
 
 #define DEBUG_SUBSYSTEM S_RPC
@@ -41,6 +40,7 @@
 #include <lu_object.h>
 #include <uapi/linux/lnet/lnet-types.h>
 #include "ptlrpc_internal.h"
+#include <linux/delay.h>
 
 /* The following are visible and mutable through /sys/module/ptlrpc */
 int test_req_buffer_pressure = 0;
@@ -62,9 +62,11 @@ MODULE_PARM_DESC(at_extra, "How much extra time to give with each early reply");
 static int ptlrpc_server_post_idle_rqbds(struct ptlrpc_service_part *svcpt);
 static void ptlrpc_server_hpreq_fini(struct ptlrpc_request *req);
 static void ptlrpc_at_remove_timed(struct ptlrpc_request *req);
+static int ptlrpc_start_threads(struct ptlrpc_service *svc);
+static int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait);
 
 /** Holds a list of all PTLRPC services */
-struct list_head ptlrpc_all_services;
+LIST_HEAD(ptlrpc_all_services);
 /** Used to protect the \e ptlrpc_all_services list */
 struct mutex ptlrpc_all_services_mutex;
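
Note: the LIST_HEAD() conversion above initializes the list head at compile time, so the module no longer depends on an init function running before the first list operation. A minimal sketch of the two idioms, with hypothetical names:

#include <linux/list.h>

/* run-time form: the head is garbage until INIT_LIST_HEAD() executes */
static struct list_head demo_services;

static void demo_setup(void)
{
        INIT_LIST_HEAD(&demo_services);
}

/* compile-time form: LIST_HEAD() defines the variable with next/prev
 * already pointing at itself, valid from the moment the image loads */
static LIST_HEAD(demo_services_static);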
 
@@ -228,7 +230,7 @@ struct ptlrpc_hr_partition {
 #define HRT_STOPPING 1
 
 struct ptlrpc_hr_service {
-       /* CPU partition table, it's just cfs_cpt_table for now */
+       /* CPU partition table, it's just cfs_cpt_tab for now */
        struct cfs_cpt_table            *hr_cpt_table;
        /** controller sleep waitq */
        wait_queue_head_t               hr_waitq;
@@ -446,14 +448,13 @@ static int ptlrpc_server_post_idle_rqbds(struct ptlrpc_service_part *svcpt)
                        return posted;
                }
 
-               rqbd = list_entry(svcpt->scp_rqbd_idle.next,
-                                     struct ptlrpc_request_buffer_desc,
-                                     rqbd_list);
-               list_del(&rqbd->rqbd_list);
+               rqbd = list_first_entry(&svcpt->scp_rqbd_idle,
+                                       struct ptlrpc_request_buffer_desc,
+                                       rqbd_list);
 
                /* assume we will post successfully */
                svcpt->scp_nrqbds_posted++;
-               list_add(&rqbd->rqbd_list, &svcpt->scp_rqbd_posted);
+               list_move(&rqbd->rqbd_list, &svcpt->scp_rqbd_posted);
 
                spin_unlock(&svcpt->scp_lock);
 
@@ -467,8 +468,7 @@ static int ptlrpc_server_post_idle_rqbds(struct ptlrpc_service_part *svcpt)
        spin_lock(&svcpt->scp_lock);
 
        svcpt->scp_nrqbds_posted--;
-       list_del(&rqbd->rqbd_list);
-       list_add_tail(&rqbd->rqbd_list, &svcpt->scp_rqbd_idle);
+       list_move_tail(&rqbd->rqbd_list, &svcpt->scp_rqbd_idle);
 
        /*
         * Don't complain if no request buffers are posted right now; LNET
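
Note: several hunks in this patch, including the two above, make the same mechanical cleanup: a list_del()/list_add() or list_del()/list_add_tail() pair becomes a single list_move()/list_move_tail() call. The two spellings are equivalent; a sketch against a hypothetical buffer descriptor:

#include <linux/list.h>

struct demo_rqbd {
        struct list_head link;
};

/* old two-step form */
static void recycle_two_step(struct demo_rqbd *rqbd, struct list_head *idle)
{
        list_del(&rqbd->link);
        list_add_tail(&rqbd->link, idle);
}

/* identical effect in one call */
static void recycle_one_step(struct demo_rqbd *rqbd, struct list_head *idle)
{
        list_move_tail(&rqbd->link, idle);
}
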
@@ -571,12 +571,14 @@ static void ptlrpc_server_nthreads_check(struct ptlrpc_service *svc,
                 * have too many threads no matter how many cores/HTs
                 * there are.
                 */
+               preempt_disable();
                if (cpumask_weight
                    (topology_sibling_cpumask(smp_processor_id())) > 1) {
                        /* weight is # of HTs */
                        /* depress thread factor for hyper-thread */
                        factor = factor - (factor >> 1) + (factor >> 3);
                }
+               preempt_enable();
 
                weight = cfs_cpt_weight(svc->srv_cptable, 0);
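
Note: smp_processor_id() is only stable while the caller cannot migrate, and preemptible debug kernels warn when it is used from a preemptible section; hence the preempt_disable()/preempt_enable() bracket added above. The safe pattern in isolation (hypothetical function name):

#include <linux/preempt.h>
#include <linux/smp.h>
#include <linux/topology.h>

/* Count the HT siblings of the current CPU.  The CPU id and the
 * cpumask lookup must refer to the same CPU, so preemption is held
 * off across the pair; the result is only a sizing heuristic, so a
 * migration after preempt_enable() is harmless. */
static unsigned int demo_sibling_weight(void)
{
        unsigned int weight;

        preempt_disable();
        weight = cpumask_weight(topology_sibling_cpumask(smp_processor_id()));
        preempt_enable();

        return weight;
}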
 
@@ -683,13 +685,12 @@ static int ptlrpc_service_part_init(struct ptlrpc_service *svc,
 
  failed:
        if (array->paa_reqs_count != NULL) {
-               OBD_FREE(array->paa_reqs_count, sizeof(__u32) * size);
+               OBD_FREE_PTR_ARRAY(array->paa_reqs_count, size);
                array->paa_reqs_count = NULL;
        }
 
        if (array->paa_reqs_array != NULL) {
-               OBD_FREE(array->paa_reqs_array,
-                        sizeof(struct list_head) * array->paa_size);
+               OBD_FREE_PTR_ARRAY(array->paa_reqs_array, array->paa_size);
                array->paa_reqs_array = NULL;
        }
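
Note: OBD_FREE_PTR_ARRAY() derives the element size from the pointee type, so the byte count at the call site can no longer drift out of sync with the array's type. Assuming the helper pair is defined along these lines (a sketch, not the verbatim Lustre macros):

#define OBD_ALLOC_PTR_ARRAY(ptr, n) \
        OBD_ALLOC(ptr, (n) * sizeof(*(ptr)))
#define OBD_FREE_PTR_ARRAY(ptr, n) \
        OBD_FREE(ptr, (n) * sizeof(*(ptr)))

With that shape, OBD_FREE(array->paa_reqs_count, sizeof(__u32) * size) and its sibling collapse to the (pointer, count) form used throughout this patch.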
 
@@ -724,7 +725,7 @@ struct ptlrpc_service *ptlrpc_register_service(struct ptlrpc_service_conf *conf,
 
        cptable = cconf->cc_cptable;
        if (cptable == NULL)
-               cptable = cfs_cpt_table;
+               cptable = cfs_cpt_tab;
 
        if (conf->psc_thr.tc_cpu_bind > 1) {
                CERROR("%s: Invalid cpu bind value %d, only 1 or 0 allowed\n",
@@ -743,7 +744,7 @@ struct ptlrpc_service *ptlrpc_register_service(struct ptlrpc_service_conf *conf,
                                                 strlen(cconf->cc_pattern),
                                                 0, ncpts - 1, &el);
                        if (rc != 0) {
-                               CERROR("%s: invalid CPT pattern string: %s",
+                               CERROR("%s: invalid CPT pattern string: %s\n",
                                       conf->psc_name, cconf->cc_pattern);
                                RETURN(ERR_PTR(-EINVAL));
                        }
@@ -754,7 +755,7 @@ struct ptlrpc_service *ptlrpc_register_service(struct ptlrpc_service_conf *conf,
                                CERROR("%s: failed to parse CPT array %s: %d\n",
                                       conf->psc_name, cconf->cc_pattern, rc);
                                if (cpts != NULL)
-                                       OBD_FREE(cpts, sizeof(*cpts) * ncpts);
+                                       OBD_FREE_PTR_ARRAY(cpts, ncpts);
                                RETURN(ERR_PTR(rc < 0 ? rc : -EINVAL));
                        }
                        ncpts = rc;
@@ -764,7 +765,7 @@ struct ptlrpc_service *ptlrpc_register_service(struct ptlrpc_service_conf *conf,
        OBD_ALLOC(service, offsetof(struct ptlrpc_service, srv_parts[ncpts]));
        if (service == NULL) {
                if (cpts != NULL)
-                       OBD_FREE(cpts, sizeof(*cpts) * ncpts);
+                       OBD_FREE_PTR_ARRAY(cpts, ncpts);
                RETURN(ERR_PTR(-ENOMEM));
        }
 
@@ -905,8 +906,6 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
        struct ptlrpc_service_part        *svcpt = rqbd->rqbd_svcpt;
        struct ptlrpc_service             *svc = svcpt->scp_service;
        int                                refcount;
-       struct list_head                          *tmp;
-       struct list_head                          *nxt;
 
        if (!atomic_dec_and_test(&req->rq_refcount))
                return;
@@ -942,9 +941,7 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
        refcount = --(rqbd->rqbd_refcount);
        if (refcount == 0) {
                /* request buffer is now idle: add to history */
-               list_del(&rqbd->rqbd_list);
-
-               list_add_tail(&rqbd->rqbd_list, &svcpt->scp_hist_rqbds);
+               list_move_tail(&rqbd->rqbd_list, &svcpt->scp_hist_rqbds);
                svcpt->scp_hist_nrqbds++;
 
                /*
@@ -952,9 +949,9 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
                 * I expect only about 1 or 2 rqbds need to be recycled here
                 */
                while (svcpt->scp_hist_nrqbds > svc->srv_hist_nrqbds_cpt_max) {
-                       rqbd = list_entry(svcpt->scp_hist_rqbds.next,
-                                         struct ptlrpc_request_buffer_desc,
-                                         rqbd_list);
+                       rqbd = list_first_entry(&svcpt->scp_hist_rqbds,
+                                               struct ptlrpc_request_buffer_desc,
+                                               rqbd_list);
 
                        list_del(&rqbd->rqbd_list);
                        svcpt->scp_hist_nrqbds--;
@@ -963,9 +960,7 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
                         * remove rqbd's reqs from svc's req history while
                         * I've got the service lock
                         */
-                       list_for_each(tmp, &rqbd->rqbd_reqs) {
-                               req = list_entry(tmp, struct ptlrpc_request,
-                                                rq_list);
+                       list_for_each_entry(req, &rqbd->rqbd_reqs, rq_list) {
                                /* Track the highest culled req seq */
                                if (req->rq_history_seq >
                                    svcpt->scp_hist_seq_culled) {
@@ -977,10 +972,9 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
 
                        spin_unlock(&svcpt->scp_lock);
 
-                       list_for_each_safe(tmp, nxt, &rqbd->rqbd_reqs) {
-                               req = list_entry(rqbd->rqbd_reqs.next,
-                                                struct ptlrpc_request,
-                                                rq_list);
+                       while ((req = list_first_entry_or_null(
+                                       &rqbd->rqbd_reqs,
+                                       struct ptlrpc_request, rq_list))) {
                                list_del(&req->rq_list);
                                ptlrpc_server_free_request(req);
                        }
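
Note: the loop above previously iterated with list_for_each_safe() but ignored its own cursor, re-reading rqbd->rqbd_reqs.next by hand on every pass. The list_first_entry_or_null() form says what it means: detach the first entry until the list is empty. The drain idiom in isolation, with hypothetical types:

#include <linux/list.h>
#include <linux/slab.h>

struct demo_req {
        struct list_head link;
};

/* re-reads the head each pass, so it stays correct even if the
 * per-item teardown sleeps or briefly drops a lock */
static void demo_drain(struct list_head *head)
{
        struct demo_req *req;

        while ((req = list_first_entry_or_null(head, struct demo_req,
                                               link)) != NULL) {
                list_del(&req->link);
                kfree(req);
        }
}
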
@@ -1026,6 +1020,30 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
        }
 }
 
+static void ptlrpc_add_exp_list_nolock(struct ptlrpc_request *req,
+                                      struct obd_export *export, bool hp)
+{
+       __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+
+       if (hp)
+               list_add(&req->rq_exp_list, &export->exp_hp_rpcs);
+       else
+               list_add(&req->rq_exp_list, &export->exp_reg_rpcs);
+       if (tag && export->exp_used_slots)
+               set_bit(tag - 1, export->exp_used_slots);
+}
+
+static void ptlrpc_del_exp_list(struct ptlrpc_request *req)
+{
+       __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+
+       spin_lock(&req->rq_export->exp_rpc_lock);
+       list_del_init(&req->rq_exp_list);
+       if (tag && !req->rq_obsolete && req->rq_export->exp_used_slots)
+               clear_bit(tag - 1, req->rq_export->exp_used_slots);
+       spin_unlock(&req->rq_export->exp_rpc_lock);
+}
+
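
Note: these helpers keep the export's RPC lists and its exp_used_slots bitmap in step: tags are 1-based on the wire, so bit (tag - 1) records a request in flight, and an obsoleted request must not clear the bit because the newer resend carrying the same tag still owns the slot. The bitmap bookkeeping reduced to a standalone sketch (names hypothetical):

#include <linux/bitops.h>
#include <linux/types.h>

static void demo_slot_claim(unsigned long *slots, __u16 tag)
{
        if (tag)                        /* tag 0 means "no slot" */
                set_bit(tag - 1, slots);
}

static void demo_slot_release(unsigned long *slots, __u16 tag, bool obsolete)
{
        /* an obsolete request leaves the bit for its newer twin */
        if (tag && !obsolete)
                clear_bit(tag - 1, slots);
}
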
 /** Change request export and move hp request from old export to new */
 void ptlrpc_request_change_export(struct ptlrpc_request *req,
                                  struct obd_export *export)
@@ -1033,19 +1051,13 @@ void ptlrpc_request_change_export(struct ptlrpc_request *req,
        if (req->rq_export != NULL) {
                LASSERT(!list_empty(&req->rq_exp_list));
                /* remove rq_exp_list from last export */
-               spin_lock(&req->rq_export->exp_rpc_lock);
-               list_del_init(&req->rq_exp_list);
-               spin_unlock(&req->rq_export->exp_rpc_lock);
-               /*
-                * export has one reference already, so it`s safe to
+               ptlrpc_del_exp_list(req);
+               /* export has one reference already, so it's safe to
                 * add req to export queue here and get another
                 * reference for request later
                 */
                spin_lock(&export->exp_rpc_lock);
-               if (req->rq_ops != NULL) /* hp request */
-                       list_add(&req->rq_exp_list, &export->exp_hp_rpcs);
-               else
-                       list_add(&req->rq_exp_list, &export->exp_reg_rpcs);
+               ptlrpc_add_exp_list_nolock(req, export, req->rq_ops != NULL);
                spin_unlock(&export->exp_rpc_lock);
 
                class_export_rpc_dec(req->rq_export);
@@ -1055,8 +1067,6 @@ void ptlrpc_request_change_export(struct ptlrpc_request *req,
        /* request takes one export refcount */
        req->rq_export = class_export_get(export);
        class_export_rpc_inc(export);
-
-       return;
 }
 
 /**
@@ -1336,13 +1346,14 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
        struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
        struct ptlrpc_request *reqcopy;
        struct lustre_msg *reqmsg;
-       time64_t olddl = req->rq_deadline - ktime_get_real_seconds();
+       timeout_t olddl = req->rq_deadline - ktime_get_real_seconds();
        time64_t newdl;
        int rc;
 
        ENTRY;
 
-       if (CFS_FAIL_CHECK(OBD_FAIL_TGT_REPLAY_RECONNECT)) {
+       if (CFS_FAIL_CHECK(OBD_FAIL_TGT_REPLAY_RECONNECT) ||
+           CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND)) {
                /* don't send early reply */
                RETURN(1);
        }
@@ -1352,18 +1363,19 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
         * difference between clients' and servers' expectations
         */
        DEBUG_REQ(D_ADAPTTO, req,
-                 "%ssending early reply (deadline %+llds, margin %+llds) for %d+%d",
+                 "%ssending early reply (deadline %+ds, margin %+ds) for %d+%d",
                  AT_OFF ? "AT off - not " : "",
-                 (s64)olddl, (s64)(olddl - at_get(&svcpt->scp_at_estimate)),
+                 olddl, olddl - at_get(&svcpt->scp_at_estimate),
                  at_get(&svcpt->scp_at_estimate), at_extra);
 
        if (AT_OFF)
                RETURN(0);
 
        if (olddl < 0) {
+               /* below message is checked in replay-ost-single.sh test_9 */
                DEBUG_REQ(D_WARNING, req,
-                         "Already past deadline (%+llds), not sending early reply. Consider increasing at_early_margin (%d)?",
-                         (s64)olddl, at_early_margin);
+                         "Already past deadline (%+ds), not sending early reply. Consider increasing at_early_margin (%d)?",
+                         olddl, at_early_margin);
 
                /* Return an error so we're not re-added to the timed list. */
                RETURN(-ETIMEDOUT);
@@ -1419,8 +1431,9 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
         * we may be past adaptive_max
         */
        if (req->rq_deadline >= newdl) {
-               DEBUG_REQ(D_WARNING, req, "Couldn't add any time (%lld/%lld), not sending early reply\n",
-                         (s64)olddl, (s64)(newdl - ktime_get_real_seconds()));
+               DEBUG_REQ(D_WARNING, req,
+                         "Could not add any time (%d/%lld), not sending early reply",
+                         olddl, newdl - ktime_get_real_seconds());
                RETURN(-ETIMEDOUT);
        }
 
@@ -1451,10 +1464,10 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
                GOTO(out, rc = -ETIMEDOUT);
 
        LASSERT(atomic_read(&req->rq_refcount));
-       /** if it is last refcount then early reply isn't needed */
+       /* if it is last refcount then early reply isn't needed */
        if (atomic_read(&req->rq_refcount) == 1) {
                DEBUG_REQ(D_ADAPTTO, reqcopy,
-                         "Normal reply already sent out, abort sending early reply\n");
+                         "Normal reply already sent, abort early reply");
                GOTO(out, rc = -EINVAL);
        }
 
@@ -1481,7 +1494,7 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
                req->rq_deadline = newdl;
                req->rq_early_count++; /* number sent, server side */
        } else {
-               DEBUG_REQ(D_ERROR, req, "Early reply send failed %d", rc);
+               DEBUG_REQ(D_ERROR, req, "Early reply send failed: rc = %d", rc);
        }
 
        /*
@@ -1509,11 +1522,11 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service_part *svcpt)
 {
        struct ptlrpc_at_array *array = &svcpt->scp_at_array;
        struct ptlrpc_request *rq, *n;
-       struct list_head work_list;
+       LIST_HEAD(work_list);
        __u32 index, count;
        time64_t deadline;
        time64_t now = ktime_get_real_seconds();
-       s64 delay;
+       s64 delay_ms;
        int first, counter = 0;
 
        ENTRY;
@@ -1522,7 +1535,7 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service_part *svcpt)
                spin_unlock(&svcpt->scp_at_lock);
                RETURN(0);
        }
-       delay = ktime_ms_delta(ktime_get(), svcpt->scp_at_checktime);
+       delay_ms = ktime_ms_delta(ktime_get(), svcpt->scp_at_checktime);
        svcpt->scp_at_check = 0;
 
        if (array->paa_count == 0) {
@@ -1543,7 +1556,6 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service_part *svcpt)
         * We're close to a timeout, and we don't know how much longer the
         * server will take. Send early replies to everyone expiring soon.
         */
-       INIT_LIST_HEAD(&work_list);
        deadline = -1;
        div_u64_rem(array->paa_deadline, array->paa_size, &index);
        count = array->paa_count;
@@ -1594,19 +1606,19 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service_part *svcpt)
                 */
                LCONSOLE_WARN("%s: This server is not able to keep up with request traffic (cpu-bound).\n",
                              svcpt->scp_service->srv_name);
-               CWARN("earlyQ=%d reqQ=%d recA=%d, svcEst=%d, delay=%lld\n",
+               CWARN("earlyQ=%d reqQ=%d recA=%d, svcEst=%d, delay=%lldms\n",
                      counter, svcpt->scp_nreqs_incoming,
                      svcpt->scp_nreqs_active,
-                     at_get(&svcpt->scp_at_estimate), delay);
+                     at_get(&svcpt->scp_at_estimate), delay_ms);
        }
 
        /*
         * we took additional refcount so entries can't be deleted from list, no
         * locking is needed
         */
-       while (!list_empty(&work_list)) {
-               rq = list_entry(work_list.next, struct ptlrpc_request,
-                               rq_timed_list);
+       while ((rq = list_first_entry_or_null(&work_list,
+                                             struct ptlrpc_request,
+                                             rq_timed_list)) != NULL) {
                list_del_init(&rq->rq_timed_list);
 
                if (ptlrpc_at_send_early_reply(rq) == 0)
@@ -1632,13 +1644,6 @@ ptlrpc_server_check_resend_in_progress(struct ptlrpc_request *req)
                return NULL;
 
        /*
-        * bulk request are aborted upon reconnect, don't try to
-        * find a match
-        */
-       if (req->rq_bulk_write || req->rq_bulk_read)
-               return NULL;
-
-       /*
         * This list should not be longer than max_requests in
         * flights on the client, so it is not all that long.
         * Also we only hit this codepath in case of a resent
@@ -1664,6 +1669,47 @@ found:
        return tmp;
 }
 
+#ifdef HAVE_SERVER_SUPPORT
+static void ptlrpc_server_mark_obsolete(struct ptlrpc_request *req)
+{
+       req->rq_obsolete = 1;
+}
+
+static void
+ptlrpc_server_mark_in_progress_obsolete(struct ptlrpc_request *req)
+{
+       struct ptlrpc_request   *tmp = NULL;
+       __u16                   tag;
+
+       if (!tgt_is_increasing_xid_client(req->rq_export) ||
+           req->rq_export->exp_used_slots == NULL)
+               return;
+
+       tag = lustre_msg_get_tag(req->rq_reqmsg);
+       if (tag == 0)
+               return;
+
+       if (!test_bit(tag - 1, req->rq_export->exp_used_slots))
+               return;
+
+       /* This list should not be longer than max_requests in
+        * flight on the client, so it is not all that long.
+        * Also we only hit this codepath in case of a resent
+        * request, which makes it even more rarely hit. */
+       list_for_each_entry(tmp, &req->rq_export->exp_reg_rpcs, rq_exp_list) {
+               if (tag == lustre_msg_get_tag(tmp->rq_reqmsg) &&
+                   req->rq_xid > tmp->rq_xid)
+                       ptlrpc_server_mark_obsolete(tmp);
+       }
+       list_for_each_entry(tmp, &req->rq_export->exp_hp_rpcs, rq_exp_list) {
+               if (tag == lustre_msg_get_tag(tmp->rq_reqmsg) &&
+                   req->rq_xid > tmp->rq_xid)
+                       ptlrpc_server_mark_obsolete(tmp);
+       }
+}
+#endif
+
 /**
  * Check if a request should be assigned with a high priority.
  *
@@ -1721,9 +1767,7 @@ static void ptlrpc_server_hpreq_fini(struct ptlrpc_request *req)
                if (req->rq_ops && req->rq_ops->hpreq_fini)
                        req->rq_ops->hpreq_fini(req);
 
-               spin_lock(&req->rq_export->exp_rpc_lock);
-               list_del_init(&req->rq_exp_list);
-               spin_unlock(&req->rq_export->exp_rpc_lock);
+               ptlrpc_del_exp_list(req);
        }
        EXIT;
 }
@@ -1770,7 +1814,7 @@ static int ptlrpc_server_request_add(struct ptlrpc_service_part *svcpt,
        hp = rc > 0;
        ptlrpc_nrs_req_initialize(svcpt, req, hp);
 
-       if (req->rq_export != NULL) {
+       while (req->rq_export != NULL) {
                struct obd_export *exp = req->rq_export;
 
                /*
@@ -1778,7 +1822,18 @@ static int ptlrpc_server_request_add(struct ptlrpc_service_part *svcpt,
                 * atomically
                 */
                spin_lock_bh(&exp->exp_rpc_lock);
+#ifdef HAVE_SERVER_SUPPORT
+               ptlrpc_server_mark_in_progress_obsolete(req);
+#endif
                orig = ptlrpc_server_check_resend_in_progress(req);
+               if (orig && OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE)) {
+                       spin_unlock_bh(&exp->exp_rpc_lock);
+
+                       OBD_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
+                       msleep(4 * MSEC_PER_SEC);
+                       continue;
+               }
+
                if (orig && likely(atomic_inc_not_zero(&orig->rq_refcount))) {
                        bool linked;
 
@@ -1797,18 +1852,22 @@ static int ptlrpc_server_request_add(struct ptlrpc_service_part *svcpt,
                                ptlrpc_at_remove_timed(orig);
                        spin_unlock(&orig->rq_rqbd->rqbd_svcpt->scp_at_lock);
                        orig->rq_deadline = req->rq_deadline;
+                       orig->rq_rep_mbits = req->rq_rep_mbits;
                        if (likely(linked))
                                ptlrpc_at_add_timed(orig);
                        ptlrpc_server_drop_request(orig);
                        ptlrpc_nrs_req_finalize(req);
+
+                       /* don't mark slot unused for resend in progress */
+                       req->rq_obsolete = 1;
+
                        RETURN(-EBUSY);
                }
 
-               if (hp || req->rq_ops != NULL)
-                       list_add(&req->rq_exp_list, &exp->exp_hp_rpcs);
-               else
-                       list_add(&req->rq_exp_list, &exp->exp_reg_rpcs);
+               ptlrpc_add_exp_list_nolock(req, exp, hp || req->rq_ops != NULL);
+
                spin_unlock_bh(&exp->exp_rpc_lock);
+               break;
        }
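
Note: turning the former `if (req->rq_export != NULL)` into a `while` gives the OBD_FAIL_PTLRPC_RESEND_RACE injection path a way to drop the export lock, sleep, and rescan before committing; the normal path still runs once and leaves via `break`. The control shape as a standalone sketch (all names hypothetical):

#include <linux/delay.h>
#include <linux/spinlock.h>
#include <linux/time64.h>

static DEFINE_SPINLOCK(demo_exp_lock);

static bool demo_race_injected(void);   /* stand-in for the fail check */
static void demo_enqueue_locked(void);  /* stand-in for the list insert */

static void demo_add_with_retry(void)
{
        for (;;) {
                spin_lock_bh(&demo_exp_lock);
                if (demo_race_injected()) {
                        /* back off with the lock dropped, then rescan */
                        spin_unlock_bh(&demo_exp_lock);
                        msleep(4 * MSEC_PER_SEC);
                        continue;
                }
                demo_enqueue_locked();
                spin_unlock_bh(&demo_exp_lock);
                break;
        }
}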
 
        /*
@@ -1987,6 +2046,7 @@ static int ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt,
        struct ptlrpc_service *svc = svcpt->scp_service;
        struct ptlrpc_request *req;
        __u32 deadline;
+       __u32 opc;
        int rc;
 
        ENTRY;
@@ -1997,8 +2057,8 @@ static int ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt,
                RETURN(0);
        }
 
-       req = list_entry(svcpt->scp_req_incoming.next,
-                            struct ptlrpc_request, rq_list);
+       req = list_first_entry(&svcpt->scp_req_incoming,
+                              struct ptlrpc_request, rq_list);
        list_del_init(&req->rq_list);
        svcpt->scp_nreqs_incoming--;
        /*
@@ -2043,8 +2103,9 @@ static int ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt,
                goto err_req;
        }
 
+       opc = lustre_msg_get_opc(req->rq_reqmsg);
        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DROP_REQ_OPC) &&
-           lustre_msg_get_opc(req->rq_reqmsg) == cfs_fail_val) {
+           opc == cfs_fail_val) {
                CERROR("drop incoming rpc opc %u, x%llu\n",
                       cfs_fail_val, req->rq_xid);
                goto err_req;
@@ -2058,7 +2119,7 @@ static int ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt,
                goto err_req;
        }
 
-       switch (lustre_msg_get_opc(req->rq_reqmsg)) {
+       switch (opc) {
        case MDS_WRITEPAGE:
        case OST_WRITE:
        case OUT_UPDATE:
@@ -2081,7 +2142,7 @@ static int ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt,
                        rc = sptlrpc_target_export_check(req->rq_export, req);
                        if (rc)
                                DEBUG_REQ(D_ERROR, req,
-                                         "DROPPING req with illegal security flavor,");
+                                         "DROPPING req with illegal security flavor");
                }
 
                if (rc)
@@ -2092,8 +2153,8 @@ static int ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt,
        /* req_in handling should/must be fast */
        if (ktime_get_real_seconds() - req->rq_arrival_time.tv_sec > 5)
                DEBUG_REQ(D_WARNING, req, "Slow req_in handling %llds",
-                         (s64)(ktime_get_real_seconds() -
-                               req->rq_arrival_time.tv_sec));
+                         ktime_get_real_seconds() -
+                         req->rq_arrival_time.tv_sec);
 
        /* Set rpc server deadline and add it to the timed list */
        deadline = (lustre_msghdr_get_flags(req->rq_reqmsg) &
@@ -2129,8 +2190,20 @@ static int ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt,
                thread->t_env->le_ses = &req->rq_session;
        }
 
+       if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND) &&
+                    (opc == LDLM_ENQUEUE) &&
+                    (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
+               OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_ENQ_RESEND, 6);
+
        ptlrpc_at_add_timed(req);
 
+       if (opc != OST_CONNECT && opc != MDS_CONNECT &&
+           opc != MGS_CONNECT && req->rq_export != NULL) {
+               if (exp_connect_flags2(req->rq_export) & OBD_CONNECT2_REP_MBITS)
+                       req->rq_rep_mbits = lustre_msg_get_mbits(req->rq_reqmsg);
+       }
+
        /* Move it over to the request processing queue */
        rc = ptlrpc_server_request_add(svcpt, req);
        if (rc)
@@ -2209,7 +2282,8 @@ static int ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt,
         * The deadline is increased if we send an early reply.
         */
        if (ktime_get_real_seconds() > request->rq_deadline) {
-               DEBUG_REQ(D_ERROR, request, "Dropping timed-out request from %s: deadline %lld:%llds ago\n",
+               DEBUG_REQ(D_ERROR, request,
+                         "Dropping timed-out request from %s: deadline %lld/%llds ago",
                          libcfs_id2str(request->rq_peer),
                          request->rq_deadline -
                          request->rq_arrival_time.tv_sec,
@@ -2218,15 +2292,16 @@ static int ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt,
        }
 
        CDEBUG(D_RPCTRACE,
-              "Handling RPC pname:cluuid+ref:pid:xid:nid:opc %s:%s+%d:%d:x%llu:%s:%d\n",
-              current_comm(),
+              "Handling RPC req@%p pname:cluuid+ref:pid:xid:nid:opc:job %s:%s+%d:%d:x%llu:%s:%d:%s\n",
+              request, current->comm,
               (request->rq_export ?
                (char *)request->rq_export->exp_client_uuid.uuid : "0"),
               (request->rq_export ?
-               atomic_read(&request->rq_export->exp_refcount) : -99),
+               refcount_read(&request->rq_export->exp_handle.h_ref) : -99),
               lustre_msg_get_status(request->rq_reqmsg), request->rq_xid,
               libcfs_id2str(request->rq_peer),
-              lustre_msg_get_opc(request->rq_reqmsg));
+              lustre_msg_get_opc(request->rq_reqmsg),
+              lustre_msg_get_jobid(request->rq_reqmsg) ?: "");
 
        if (lustre_msg_get_opc(request->rq_reqmsg) != OBD_PING)
                CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, cfs_fail_val);
@@ -2247,8 +2322,7 @@ static int ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt,
 put_conn:
        if (unlikely(ktime_get_real_seconds() > request->rq_deadline)) {
                DEBUG_REQ(D_WARNING, request,
-                         "Request took longer than estimated (%lld:%llds); "
-                         "client may timeout.",
+                         "Request took longer than estimated (%lld/%llds); client may timeout",
                          request->rq_deadline -
                          request->rq_arrival_time.tv_sec,
                          ktime_get_real_seconds() - request->rq_deadline);
@@ -2258,16 +2332,17 @@ put_conn:
        timediff_usecs = ktime_us_delta(work_end, work_start);
        arrived_usecs = ktime_us_delta(work_end, arrived);
        CDEBUG(D_RPCTRACE,
-              "Handled RPC pname:cluuid+ref:pid:xid:nid:opc %s:%s+%d:%d:x%llu:%s:%d Request processed in %lldus (%lldus total) trans %llu rc %d/%d\n",
-              current_comm(),
+              "Handled RPC req@%p pname:cluuid+ref:pid:xid:nid:opc:job %s:%s+%d:%d:x%llu:%s:%d:%s Request processed in %lldus (%lldus total) trans %llu rc %d/%d\n",
+              request, current->comm,
               (request->rq_export ?
               (char *)request->rq_export->exp_client_uuid.uuid : "0"),
               (request->rq_export ?
-              atomic_read(&request->rq_export->exp_refcount) : -99),
+               refcount_read(&request->rq_export->exp_handle.h_ref) : -99),
               lustre_msg_get_status(request->rq_reqmsg),
               request->rq_xid,
               libcfs_id2str(request->rq_peer),
               lustre_msg_get_opc(request->rq_reqmsg),
+              lustre_msg_get_jobid(request->rq_reqmsg) ?: "",
               timediff_usecs,
               arrived_usecs,
               (request->rq_repmsg ?
@@ -2482,14 +2557,6 @@ static void ptlrpc_check_rqbd_pool(struct ptlrpc_service_part *svcpt)
        }
 }
 
-static int ptlrpc_retry_rqbds(void *arg)
-{
-       struct ptlrpc_service_part *svcpt = (struct ptlrpc_service_part *)arg;
-
-       svcpt->scp_rqbd_timeout = 0;
-       return -ETIMEDOUT;
-}
-
 static inline int ptlrpc_threads_enough(struct ptlrpc_service_part *svcpt)
 {
        return svcpt->scp_nreqs_active <
@@ -2582,37 +2649,40 @@ static void ptlrpc_watchdog_fire(struct work_struct *w)
        u64 ms_lapse = ktime_ms_delta(ktime_get(), thread->t_touched);
        u32 ms_frac = do_div(ms_lapse, MSEC_PER_SEC);
 
-       if (!__ratelimit(&watchdog_limit)) {
+       /* ___ratelimit() returns true if the action is NOT ratelimited */
+       if (__ratelimit(&watchdog_limit)) {
+               /* below message is checked in sanity-quota.sh test_6,18 */
                LCONSOLE_WARN("%s: service thread pid %u was inactive for %llu.%03u seconds. The thread might be hung, or it might only be slow and will resume later. Dumping the stack trace for debugging purposes:\n",
                              thread->t_task->comm, thread->t_task->pid,
                              ms_lapse, ms_frac);
 
                libcfs_debug_dumpstack(thread->t_task);
        } else {
+               /* below message is checked in sanity-quota.sh test_6,18 */
                LCONSOLE_WARN("%s: service thread pid %u was inactive for %llu.%03u seconds. Watchdog stack traces are limited to 3 per %u seconds, skipping this one.\n",
                              thread->t_task->comm, thread->t_task->pid,
                              ms_lapse, ms_frac, libcfs_watchdog_ratelimit);
        }
 }
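
Note: the fix above hinges on the ___ratelimit() convention: it returns nonzero when the event is *not* being suppressed, so the old `if (!__ratelimit(...))` dumped the stack on exactly the wrong branch. A minimal demonstration, with a hypothetical limit of 3 messages per 300 seconds:

#include <linux/kernel.h>
#include <linux/ratelimit.h>

static DEFINE_RATELIMIT_STATE(demo_limit, 300 * HZ, 3);

static void demo_report(void)
{
        if (__ratelimit(&demo_limit))           /* allowed through */
                pr_warn("full warning with stack dump\n");
        else                                    /* throttled */
                pr_warn("warning skipped by ratelimit\n");
}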
 
-static void ptlrpc_watchdog_init(struct delayed_work *work, time_t time)
+void ptlrpc_watchdog_init(struct delayed_work *work, timeout_t timeout)
 {
        INIT_DELAYED_WORK(work, ptlrpc_watchdog_fire);
-       schedule_delayed_work(work, cfs_time_seconds(time));
+       schedule_delayed_work(work, cfs_time_seconds(timeout));
 }
 
-static void ptlrpc_watchdog_disable(struct delayed_work *work)
+void ptlrpc_watchdog_disable(struct delayed_work *work)
 {
        cancel_delayed_work_sync(work);
 }
 
-static void ptlrpc_watchdog_touch(struct delayed_work *work, time_t time)
+void ptlrpc_watchdog_touch(struct delayed_work *work, timeout_t timeout)
 {
        struct ptlrpc_thread *thread = container_of(&work->work,
                                                    struct ptlrpc_thread,
                                                    t_watchdog.work);
        thread->t_touched = ktime_get();
-       mod_delayed_work(system_wq, work, cfs_time_seconds(time));
+       mod_delayed_work(system_wq, work, cfs_time_seconds(timeout));
 }
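
Note: the watchdog is a plain delayed work item: armed when the service thread starts, pushed forward on every touch, cancelled on exit, and firing only if the whole timeout elapses untouched. A self-contained sketch of that lifecycle (hypothetical names; cfs_time_seconds(t) is effectively t * HZ):

#include <linux/kernel.h>
#include <linux/workqueue.h>

static void demo_wd_fire(struct work_struct *w)
{
        pr_warn("service thread appears inactive\n");
}

static DECLARE_DELAYED_WORK(demo_wd, demo_wd_fire);

static void demo_wd_init(unsigned int timeout)
{
        schedule_delayed_work(&demo_wd, timeout * HZ);
}

/* called whenever the thread makes progress: resets the deadline */
static void demo_wd_touch(unsigned int timeout)
{
        mod_delayed_work(system_wq, &demo_wd, timeout * HZ);
}

static void demo_wd_disable(void)
{
        cancel_delayed_work_sync(&demo_wd);
}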
 
 /**
@@ -2630,20 +2700,28 @@ static __attribute__((__noinline__)) int
 ptlrpc_wait_event(struct ptlrpc_service_part *svcpt,
                  struct ptlrpc_thread *thread)
 {
-       /* Don't exit while there are replies to be handled */
-       struct l_wait_info lwi = LWI_TIMEOUT(svcpt->scp_rqbd_timeout,
-                                            ptlrpc_retry_rqbds, svcpt);
-
        ptlrpc_watchdog_disable(&thread->t_watchdog);
 
        cond_resched();
 
-       l_wait_event_exclusive_head(svcpt->scp_waitq,
-                               ptlrpc_thread_stopping(thread) ||
-                               ptlrpc_server_request_incoming(svcpt) ||
-                               ptlrpc_server_request_pending(svcpt, false) ||
-                               ptlrpc_rqbd_pending(svcpt) ||
-                               ptlrpc_at_check(svcpt), &lwi);
+       if (svcpt->scp_rqbd_timeout == 0)
+               /* Don't exit while there are replies to be handled */
+               wait_event_idle_exclusive_lifo(
+                       svcpt->scp_waitq,
+                       ptlrpc_thread_stopping(thread) ||
+                       ptlrpc_server_request_incoming(svcpt) ||
+                       ptlrpc_server_request_pending(svcpt, false) ||
+                       ptlrpc_rqbd_pending(svcpt) ||
+                       ptlrpc_at_check(svcpt));
+       else if (wait_event_idle_exclusive_lifo_timeout(
+                        svcpt->scp_waitq,
+                        ptlrpc_thread_stopping(thread) ||
+                        ptlrpc_server_request_incoming(svcpt) ||
+                        ptlrpc_server_request_pending(svcpt, false) ||
+                        ptlrpc_rqbd_pending(svcpt) ||
+                        ptlrpc_at_check(svcpt),
+                        svcpt->scp_rqbd_timeout) == 0)
+               svcpt->scp_rqbd_timeout = 0;
 
        if (ptlrpc_thread_stopping(thread))
                return -EINTR;
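
Note: the l_wait_event()/LWI_TIMEOUT machinery, including the deleted ptlrpc_retry_rqbds() callback, is replaced by upstream-style wait_event_idle*() macros; "idle" waits do not count toward the load average. (The exclusive-LIFO variants used here are carried by Lustre as compatibility wrappers on kernels that lack them.) The two-armed wait reduces to this sketch, with hypothetical names:

#include <linux/wait.h>

static DECLARE_WAIT_QUEUE_HEAD(demo_waitq);
static int demo_rqbd_timeout;           /* jiffies; 0 = wait forever */

static bool demo_work_ready(void);      /* stand-in for the pending checks */

static void demo_service_wait(void)
{
        if (demo_rqbd_timeout == 0)
                wait_event_idle(demo_waitq, demo_work_ready());
        else if (wait_event_idle_timeout(demo_waitq, demo_work_ready(),
                                         demo_rqbd_timeout) == 0)
                /* timed out: re-arm rqbd posting, which is all the old
                 * ptlrpc_retry_rqbds() callback ever did */
                demo_rqbd_timeout = 0;
}
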
@@ -2672,8 +2750,7 @@ static int ptlrpc_main(void *arg)
        ENTRY;
 
        thread->t_task = current;
-       thread->t_pid = current_pid();
-       unshare_fs_struct();
+       thread->t_pid = current->pid;
 
        if (svc->srv_cpt_bind) {
                rc = cfs_cpt_bind(svc->srv_cptable, svcpt->scp_cpt);
@@ -2774,6 +2851,9 @@ static int ptlrpc_main(void *arg)
 
                /* reset le_ses to initial state */
                env->le_ses = NULL;
+               /* Refill the context before execution to make sure
+                * all thread keys are allocated */
+               lu_env_refill(env);
                /* Process all incoming reqs before handling any */
                if (ptlrpc_server_request_incoming(svcpt)) {
                        lu_context_enter(&env->le_ctx);
@@ -2870,7 +2950,7 @@ static int ptlrpc_hr_main(void *arg)
 {
        struct ptlrpc_hr_thread *hrt = (struct ptlrpc_hr_thread *)arg;
        struct ptlrpc_hr_partition *hrp = hrt->hrt_partition;
-       struct list_head replies;
+       LIST_HEAD(replies);
        struct lu_env *env;
        int rc;
 
@@ -2878,9 +2958,6 @@ static int ptlrpc_hr_main(void *arg)
        if (env == NULL)
                RETURN(-ENOMEM);
 
-       INIT_LIST_HEAD(&replies);
-       unshare_fs_struct();
-
        rc = cfs_cpt_bind(ptlrpc_hr.hr_cpt_table, hrp->hrp_cpt);
        if (rc != 0) {
                char threadname[20];
@@ -2904,7 +2981,7 @@ static int ptlrpc_hr_main(void *arg)
        wake_up(&ptlrpc_hr.hr_waitq);
 
        while (!ptlrpc_hr.hr_stopping) {
-               l_wait_condition(hrt->hrt_waitq, hrt_dont_sleep(hrt, &replies));
+               wait_event_idle(hrt->hrt_waitq, hrt_dont_sleep(hrt, &replies));
 
                while (!list_empty(&replies)) {
                        struct ptlrpc_reply_state *rs;
@@ -2944,7 +3021,7 @@ static void ptlrpc_stop_hr_threads(void)
                if (hrp->hrp_thrs == NULL)
                        continue; /* uninitialized */
                for (j = 0; j < hrp->hrp_nthrs; j++)
-                       wake_up_all(&hrp->hrp_thrs[j].hrt_waitq);
+                       wake_up(&hrp->hrp_thrs[j].hrt_waitq);
        }
 
        cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
@@ -2998,16 +3075,14 @@ static int ptlrpc_start_hr_threads(void)
 
 static void ptlrpc_svcpt_stop_threads(struct ptlrpc_service_part *svcpt)
 {
-       struct l_wait_info lwi = { 0 };
        struct ptlrpc_thread *thread;
-       struct list_head zombie;
+       LIST_HEAD(zombie);
 
        ENTRY;
 
        CDEBUG(D_INFO, "Stopping threads for service %s\n",
               svcpt->scp_service->srv_name);
 
-       INIT_LIST_HEAD(&zombie);
        spin_lock(&svcpt->scp_lock);
        /* let the thread know that we would like it to stop asap */
        list_for_each_entry(thread, &svcpt->scp_threads, t_link)
@@ -3015,29 +3090,28 @@ static void ptlrpc_svcpt_stop_threads(struct ptlrpc_service_part *svcpt)
 
        wake_up_all(&svcpt->scp_waitq);
 
-       while (!list_empty(&svcpt->scp_threads)) {
-               thread = list_entry(svcpt->scp_threads.next,
-                                       struct ptlrpc_thread, t_link);
+       while ((thread = list_first_entry_or_null(&svcpt->scp_threads,
+                                                 struct ptlrpc_thread,
+                                                 t_link)) != NULL) {
                if (thread_is_stopped(thread)) {
-                       list_del(&thread->t_link);
-                       list_add(&thread->t_link, &zombie);
+                       list_move(&thread->t_link, &zombie);
                        continue;
                }
                spin_unlock(&svcpt->scp_lock);
 
                CDEBUG(D_INFO, "waiting for stopping-thread %s #%u\n",
                       svcpt->scp_service->srv_thread_name, thread->t_id);
-               l_wait_event(thread->t_ctl_waitq,
-                            thread_is_stopped(thread), &lwi);
+               wait_event_idle(thread->t_ctl_waitq,
+                               thread_is_stopped(thread));
 
                spin_lock(&svcpt->scp_lock);
        }
 
        spin_unlock(&svcpt->scp_lock);
 
-       while (!list_empty(&zombie)) {
-               thread = list_entry(zombie.next,
-                                       struct ptlrpc_thread, t_link);
+       while ((thread = list_first_entry_or_null(&zombie,
+                                                 struct ptlrpc_thread,
+                                                 t_link)) != NULL) {
                list_del(&thread->t_link);
                OBD_FREE_PTR(thread);
        }
@@ -3047,7 +3121,7 @@ static void ptlrpc_svcpt_stop_threads(struct ptlrpc_service_part *svcpt)
 /**
  * Stops all threads of a particular service \a svc
  */
-void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
+static void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
 {
        struct ptlrpc_service_part *svcpt;
        int i;
@@ -3062,7 +3136,7 @@ void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
        EXIT;
 }
 
-int ptlrpc_start_threads(struct ptlrpc_service *svc)
+static int ptlrpc_start_threads(struct ptlrpc_service *svc)
 {
        int rc = 0;
        int i;
@@ -3094,9 +3168,8 @@ int ptlrpc_start_threads(struct ptlrpc_service *svc)
        RETURN(rc);
 }
 
-int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
+static int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
 {
-       struct l_wait_info lwi = { 0 };
        struct ptlrpc_thread *thread;
        struct ptlrpc_service *svc;
        struct task_struct *task;
@@ -3196,9 +3269,8 @@ int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
        if (!wait)
                RETURN(0);
 
-       l_wait_event(thread->t_ctl_waitq,
-                    thread_is_running(thread) || thread_is_stopped(thread),
-                    &lwi);
+       wait_event_idle(thread->t_ctl_waitq,
+                       thread_is_running(thread) || thread_is_stopped(thread));
 
        rc = thread_is_stopped(thread) ? thread->t_id : 0;
        RETURN(rc);
@@ -3216,7 +3288,7 @@ int ptlrpc_hr_init(void)
        ENTRY;
 
        memset(&ptlrpc_hr, 0, sizeof(ptlrpc_hr));
-       ptlrpc_hr.hr_cpt_table = cfs_cpt_table;
+       ptlrpc_hr.hr_cpt_table = cfs_cpt_tab;
 
        ptlrpc_hr.hr_partitions = cfs_percpt_alloc(ptlrpc_hr.hr_cpt_table,
                                                   sizeof(*hrp));
@@ -3228,7 +3300,9 @@ int ptlrpc_hr_init(void)
 
        init_waitqueue_head(&ptlrpc_hr.hr_waitq);
 
+       preempt_disable();
        weight = cpumask_weight(topology_sibling_cpumask(smp_processor_id()));
+       preempt_enable();
 
        cfs_percpt_for_each(hrp, cpt, ptlrpc_hr.hr_partitions) {
                hrp->hrp_cpt = cpt;
@@ -3275,10 +3349,8 @@ void ptlrpc_hr_fini(void)
        ptlrpc_stop_hr_threads();
 
        cfs_percpt_for_each(hrp, cpt, ptlrpc_hr.hr_partitions) {
-               if (hrp->hrp_thrs != NULL) {
-                       OBD_FREE(hrp->hrp_thrs,
-                                hrp->hrp_nthrs * sizeof(hrp->hrp_thrs[0]));
-               }
+               if (hrp->hrp_thrs)
+                       OBD_FREE_PTR_ARRAY(hrp->hrp_thrs, hrp->hrp_nthrs);
        }
 
        cfs_percpt_free(ptlrpc_hr.hr_partitions);
@@ -3292,13 +3364,10 @@ void ptlrpc_hr_fini(void)
 static void ptlrpc_wait_replies(struct ptlrpc_service_part *svcpt)
 {
        while (1) {
-               int rc;
-               struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(10),
-                                                    NULL, NULL);
-
-               rc = l_wait_event(svcpt->scp_waitq,
-                    atomic_read(&svcpt->scp_nreps_difficult) == 0, &lwi);
-               if (rc == 0)
+               if (wait_event_idle_timeout(
+                       svcpt->scp_waitq,
+                       atomic_read(&svcpt->scp_nreps_difficult) == 0,
+                       cfs_time_seconds(10)) > 0)
                        break;
                CWARN("Unexpectedly long timeout %s %p\n",
                      svcpt->scp_service->srv_name, svcpt->scp_service);
@@ -3323,7 +3392,6 @@ ptlrpc_service_unlink_rqbd(struct ptlrpc_service *svc)
 {
        struct ptlrpc_service_part *svcpt;
        struct ptlrpc_request_buffer_desc *rqbd;
-       struct l_wait_info lwi;
        int rc;
        int i;
 
@@ -3361,18 +3429,21 @@ ptlrpc_service_unlink_rqbd(struct ptlrpc_service *svc)
                 */
                spin_lock(&svcpt->scp_lock);
                while (svcpt->scp_nrqbds_posted != 0) {
+                       int seconds = PTLRPC_REQ_LONG_UNLINK;
+
                        spin_unlock(&svcpt->scp_lock);
                        /*
                         * Network access will complete in finite time but
                         * the HUGE timeout lets us CWARN for visibility
                         * of sluggish NALs
                         */
-                       lwi = LWI_TIMEOUT_INTERVAL(
-                                       cfs_time_seconds(LONG_UNLINK),
-                                       cfs_time_seconds(1), NULL, NULL);
-                       rc = l_wait_event(svcpt->scp_waitq,
-                                         svcpt->scp_nrqbds_posted == 0, &lwi);
-                       if (rc == -ETIMEDOUT) {
+                       while (seconds > 0 &&
+                              wait_event_idle_timeout(
+                                      svcpt->scp_waitq,
+                                      svcpt->scp_nrqbds_posted == 0,
+                                      cfs_time_seconds(1)) == 0)
+                               seconds -= 1;
+                       if (seconds == 0) {
                                CWARN("Service %s waiting for request buffers\n",
                                      svcpt->scp_service->srv_name);
                        }
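
Note: instead of one huge timeout with a wakeup-interval helper, the rewritten wait loops over 1-second idle waits and decrements a budget, so the CWARN can fire once the budget is spent while LNet finishes unlinking. The slice-and-budget pattern in isolation (hypothetical names):

#include <linux/atomic.h>
#include <linux/kernel.h>
#include <linux/wait.h>

static DECLARE_WAIT_QUEUE_HEAD(demo_drain_waitq);
static atomic_t demo_nposted;   /* stand-in for scp_nrqbds_posted */

/* wait up to 'limit' seconds in 1-second slices; returns true if the
 * buffers drained, false if the whole budget elapsed first */
static bool demo_wait_drained(int limit)
{
        int seconds = limit;

        while (seconds > 0 &&
               wait_event_idle_timeout(demo_drain_waitq,
                                       atomic_read(&demo_nposted) == 0,
                                       HZ) == 0)
                seconds -= 1;

        return seconds > 0;
}
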
@@ -3396,9 +3467,9 @@ ptlrpc_service_purge_all(struct ptlrpc_service *svc)
                        break;
 
                spin_lock(&svcpt->scp_rep_lock);
-               while (!list_empty(&svcpt->scp_rep_active)) {
-                       rs = list_entry(svcpt->scp_rep_active.next,
-                                           struct ptlrpc_reply_state, rs_list);
+               while ((rs = list_first_entry_or_null(&svcpt->scp_rep_active,
+                                                     struct ptlrpc_reply_state,
+                                                     rs_list)) != NULL) {
                        spin_lock(&rs->rs_lock);
                        ptlrpc_schedule_difficult_reply(rs);
                        spin_unlock(&rs->rs_lock);
@@ -3410,10 +3481,9 @@ ptlrpc_service_purge_all(struct ptlrpc_service *svc)
                 * all unlinked) and no service threads, so I'm the only
                 * thread noodling the request queue now
                 */
-               while (!list_empty(&svcpt->scp_req_incoming)) {
-                       req = list_entry(svcpt->scp_req_incoming.next,
-                                            struct ptlrpc_request, rq_list);
-
+               while ((req = list_first_entry_or_null(&svcpt->scp_req_incoming,
+                                                      struct ptlrpc_request,
+                                                      rq_list)) != NULL) {
                        list_del(&req->rq_list);
                        svcpt->scp_nreqs_incoming--;
                        ptlrpc_server_finish_request(svcpt, req);
@@ -3424,7 +3494,23 @@ ptlrpc_service_purge_all(struct ptlrpc_service *svc)
                        ptlrpc_server_finish_active_request(svcpt, req);
                }
 
-               LASSERT(list_empty(&svcpt->scp_rqbd_posted));
+               /*
+                * The portal may be shared by several services (e.g.
+                * OUT_PORTAL), so a request could still be referenced by
+                * another target.  Wait until ptlrpc_server_drop_request()
+                * has been invoked for every posted buffer.
+                *
+                * TODO: make the req_buffer global rather than per service.
+                */
+               spin_lock(&svcpt->scp_lock);
+               while (!list_empty(&svcpt->scp_rqbd_posted)) {
+                       spin_unlock(&svcpt->scp_lock);
+                       wait_event_idle_timeout(svcpt->scp_waitq,
+                               list_empty(&svcpt->scp_rqbd_posted),
+                               cfs_time_seconds(1));
+                       spin_lock(&svcpt->scp_lock);
+               }
+               spin_unlock(&svcpt->scp_lock);
+
                LASSERT(svcpt->scp_nreqs_incoming == 0);
                LASSERT(svcpt->scp_nreqs_active == 0);
                /*
@@ -3437,19 +3523,16 @@ ptlrpc_service_purge_all(struct ptlrpc_service *svc)
                 * Now free all the request buffers since nothing
                 * references them any more...
                 */
-
-               while (!list_empty(&svcpt->scp_rqbd_idle)) {
-                       rqbd = list_entry(svcpt->scp_rqbd_idle.next,
-                                             struct ptlrpc_request_buffer_desc,
-                                             rqbd_list);
+               while ((rqbd = list_first_entry_or_null(&svcpt->scp_rqbd_idle,
+                                                       struct ptlrpc_request_buffer_desc,
+                                                       rqbd_list)) != NULL)
                        ptlrpc_free_rqbd(rqbd);
-               }
+
                ptlrpc_wait_replies(svcpt);
 
-               while (!list_empty(&svcpt->scp_rep_idle)) {
-                       rs = list_entry(svcpt->scp_rep_idle.next,
-                                           struct ptlrpc_reply_state,
-                                           rs_list);
+               while ((rs = list_first_entry_or_null(&svcpt->scp_rep_idle,
+                                                     struct ptlrpc_reply_state,
+                                                     rs_list)) != NULL) {
                        list_del(&rs->rs_list);
                        OBD_FREE_LARGE(rs, svc->srv_max_reply_size);
                }
@@ -3472,14 +3555,14 @@ ptlrpc_service_free(struct ptlrpc_service *svc)
                array = &svcpt->scp_at_array;
 
                if (array->paa_reqs_array != NULL) {
-                       OBD_FREE(array->paa_reqs_array,
-                                sizeof(struct list_head) * array->paa_size);
+                       OBD_FREE_PTR_ARRAY(array->paa_reqs_array,
+                                          array->paa_size);
                        array->paa_reqs_array = NULL;
                }
 
                if (array->paa_reqs_count != NULL) {
-                       OBD_FREE(array->paa_reqs_count,
-                                sizeof(__u32) * array->paa_size);
+                       OBD_FREE_PTR_ARRAY(array->paa_reqs_count,
+                                          array->paa_size);
                        array->paa_reqs_count = NULL;
                }
        }