LU-1854 ptlrpc: track culled request seq
[fs/lustre-release.git] lustre/ptlrpc/service.c
index d601924..672704d 100644
@@ -65,6 +65,7 @@ CFS_MODULE_PARM(at_extra, "i", int, 0644,
 /* forward ref */
 static int ptlrpc_server_post_idle_rqbds(struct ptlrpc_service_part *svcpt);
 static void ptlrpc_hpreq_fini(struct ptlrpc_request *req);
+static void ptlrpc_at_remove_timed(struct ptlrpc_request *req);
 
 static CFS_LIST_HEAD(ptlrpc_all_services);
 cfs_spinlock_t ptlrpc_all_services_lock;
@@ -175,25 +176,48 @@ ptlrpc_save_lock(struct ptlrpc_request *req,
                 rs->rs_no_ack = !!no_ack;
         }
 }
+EXPORT_SYMBOL(ptlrpc_save_lock);
 
 #ifdef __KERNEL__
 
-#define HRT_RUNNING 0
-#define HRT_STOPPING 1
+struct ptlrpc_hr_partition;
 
 struct ptlrpc_hr_thread {
-        cfs_spinlock_t        hrt_lock;
-        unsigned long         hrt_flags;
-        cfs_waitq_t           hrt_wait;
-        cfs_list_t            hrt_queue;
-        cfs_completion_t      hrt_completion;
+       int                             hrt_id;         /* thread ID */
+       cfs_spinlock_t                  hrt_lock;
+       cfs_waitq_t                     hrt_waitq;
+       cfs_list_t                      hrt_queue;      /* RS queue */
+       struct ptlrpc_hr_partition      *hrt_partition;
 };
 
+struct ptlrpc_hr_partition {
+       /* # of started threads */
+       cfs_atomic_t                    hrp_nstarted;
+       /* # of stopped threads */
+       cfs_atomic_t                    hrp_nstopped;
+       /* cpu partition id */
+       int                             hrp_cpt;
+       /* round-robin rotor for choosing thread */
+       int                             hrp_rotor;
+       /* total number of threads on this partition */
+       int                             hrp_nthrs;
+       /* threads table */
+       struct ptlrpc_hr_thread         *hrp_thrs;
+};
+
+#define HRT_RUNNING 0
+#define HRT_STOPPING 1
+
 struct ptlrpc_hr_service {
-        int                     hr_index;
-        int                     hr_n_threads;
-        int                     hr_size;
-        struct ptlrpc_hr_thread hr_threads[0];
+       /* CPU partition table; currently just cfs_cpt_table */
+       struct cfs_cpt_table            *hr_cpt_table;
+       /** controller sleep waitq */
+       cfs_waitq_t                     hr_waitq;
+       unsigned int                    hr_stopping;
+       /** round-robin rotor for non-affinity service */
+       unsigned int                    hr_rotor;
+       /* partition data */
+       struct ptlrpc_hr_partition      **hr_partitions;
 };
 
 struct rs_batch {
@@ -202,10 +226,8 @@ struct rs_batch {
        struct ptlrpc_service_part      *rsb_svcpt;
 };
 
-/**
- *  A pointer to per-node reply handling service.
- */
-static struct ptlrpc_hr_service *ptlrpc_hr = NULL;
+/** reply handling service. */
+static struct ptlrpc_hr_service                ptlrpc_hr;
 
 /**
 * maximum number of replies scheduled in one batch
@@ -226,17 +248,26 @@ static void rs_batch_init(struct rs_batch *b)
 /**
  * Choose an hr thread to dispatch requests to.
  */
-static unsigned int get_hr_thread_index(struct ptlrpc_hr_service *hr)
+static struct ptlrpc_hr_thread *
+ptlrpc_hr_select(struct ptlrpc_service_part *svcpt)
 {
-        unsigned int idx;
+       struct ptlrpc_hr_partition      *hrp;
+       unsigned int                    rotor;
 
-        /* Concurrent modification of hr_index w/o any spinlock
-           protection is harmless as long as the result fits
-           [0..(hr_n_threads-1)] range and each thread gets near equal
-           load. */
-        idx = hr->hr_index;
-        hr->hr_index = (idx >= hr->hr_n_threads - 1) ? 0 : idx + 1;
-        return idx;
+       if (svcpt->scp_cpt >= 0 &&
+           svcpt->scp_service->srv_cptable == ptlrpc_hr.hr_cpt_table) {
+               /* directly match partition */
+               hrp = ptlrpc_hr.hr_partitions[svcpt->scp_cpt];
+
+       } else {
+               rotor = ptlrpc_hr.hr_rotor++;
+               rotor %= cfs_cpt_number(ptlrpc_hr.hr_cpt_table);
+
+               hrp = ptlrpc_hr.hr_partitions[rotor];
+       }
+
+       rotor = hrp->hrp_rotor++;
+       return &hrp->hrp_thrs[rotor % hrp->hrp_nthrs];
 }
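(Editor's sketch, not part of the patch: a minimal userspace model of the
two-level rotor in ptlrpc_hr_select() above. "nparts" and "nthrs" are
hypothetical stand-ins for cfs_cpt_number() and hrp_nthrs; as in the kernel
code, the unlocked increments may race, which is harmless because any
in-range result is a valid, near-uniform choice.)

	static unsigned int part_rotor;

	/* return a global thread index in [0, nparts * nthrs) */
	static unsigned int pick_thread(unsigned int nparts, unsigned int nthrs,
					unsigned int *thr_rotor)
	{
		unsigned int p = part_rotor++ % nparts;     /* pick partition */
		return p * nthrs + thr_rotor[p]++ % nthrs;  /* pick thread */
	}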
 
 /**
@@ -247,19 +278,18 @@ static unsigned int get_hr_thread_index(struct ptlrpc_hr_service *hr)
  */
 static void rs_batch_dispatch(struct rs_batch *b)
 {
-        if (b->rsb_n_replies != 0) {
-                struct ptlrpc_hr_service *hr = ptlrpc_hr;
-                int idx;
+       if (b->rsb_n_replies != 0) {
+               struct ptlrpc_hr_thread *hrt;
 
-                idx = get_hr_thread_index(hr);
+               hrt = ptlrpc_hr_select(b->rsb_svcpt);
 
-                cfs_spin_lock(&hr->hr_threads[idx].hrt_lock);
-                cfs_list_splice_init(&b->rsb_replies,
-                                     &hr->hr_threads[idx].hrt_queue);
-                cfs_spin_unlock(&hr->hr_threads[idx].hrt_lock);
-                cfs_waitq_signal(&hr->hr_threads[idx].hrt_wait);
-                b->rsb_n_replies = 0;
-        }
+               cfs_spin_lock(&hrt->hrt_lock);
+               cfs_list_splice_init(&b->rsb_replies, &hrt->hrt_queue);
+               cfs_spin_unlock(&hrt->hrt_lock);
+
+               cfs_waitq_signal(&hrt->hrt_waitq);
+               b->rsb_n_replies = 0;
+       }
 }
 
 /**
@@ -325,18 +355,19 @@ static void rs_batch_fini(struct rs_batch *b)
 void ptlrpc_dispatch_difficult_reply(struct ptlrpc_reply_state *rs)
 {
 #ifdef __KERNEL__
-        struct ptlrpc_hr_service *hr = ptlrpc_hr;
-        int idx;
-        ENTRY;
+       struct ptlrpc_hr_thread *hrt;
+       ENTRY;
 
-        LASSERT(cfs_list_empty(&rs->rs_list));
+       LASSERT(cfs_list_empty(&rs->rs_list));
 
-        idx = get_hr_thread_index(hr);
-        cfs_spin_lock(&hr->hr_threads[idx].hrt_lock);
-        cfs_list_add_tail(&rs->rs_list, &hr->hr_threads[idx].hrt_queue);
-        cfs_spin_unlock(&hr->hr_threads[idx].hrt_lock);
-        cfs_waitq_signal(&hr->hr_threads[idx].hrt_wait);
-        EXIT;
+       hrt = ptlrpc_hr_select(rs->rs_svcpt);
+
+       cfs_spin_lock(&hrt->hrt_lock);
+       cfs_list_add_tail(&rs->rs_list, &hrt->hrt_queue);
+       cfs_spin_unlock(&hrt->hrt_lock);
+
+       cfs_waitq_signal(&hrt->hrt_waitq);
+       EXIT;
 #else
        cfs_list_add_tail(&rs->rs_list, &rs->rs_svcpt->scp_rep_queue);
 #endif
@@ -362,6 +393,7 @@ ptlrpc_schedule_difficult_reply(struct ptlrpc_reply_state *rs)
         ptlrpc_dispatch_difficult_reply(rs);
         EXIT;
 }
+EXPORT_SYMBOL(ptlrpc_schedule_difficult_reply);
 
 void ptlrpc_commit_replies(struct obd_export *exp)
 {
@@ -389,6 +421,7 @@ void ptlrpc_commit_replies(struct obd_export *exp)
         rs_batch_fini(&batch);
         EXIT;
 }
+EXPORT_SYMBOL(ptlrpc_commit_replies);
 
 static int
 ptlrpc_server_post_idle_rqbds(struct ptlrpc_service_part *svcpt)
@@ -789,6 +822,7 @@ failed:
        ptlrpc_unregister_service(service);
        RETURN(ERR_PTR(rc));
 }
+EXPORT_SYMBOL(ptlrpc_register_service);
 
 /**
  * to actually free the request, must be called without holding svc_lock.
@@ -830,24 +864,16 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
                 return;
 
        if (req->rq_at_linked) {
-               struct ptlrpc_at_array *array = &svcpt->scp_at_array;
-                __u32 index = req->rq_at_index;
-
                cfs_spin_lock(&svcpt->scp_at_lock);
-
-                LASSERT(!cfs_list_empty(&req->rq_timed_list));
-                cfs_list_del_init(&req->rq_timed_list);
-                cfs_spin_lock(&req->rq_lock);
-                req->rq_at_linked = 0;
-                cfs_spin_unlock(&req->rq_lock);
-                array->paa_reqs_count[index]--;
-                array->paa_count--;
-
+               /* re-check under the lock, in case the request was
+                * unlinked by ptlrpc_at_check_timed() in the meantime */
+               if (likely(req->rq_at_linked))
+                       ptlrpc_at_remove_timed(req);
                cfs_spin_unlock(&svcpt->scp_at_lock);
-       } else {
-               LASSERT(cfs_list_empty(&req->rq_timed_list));
        }
 
+       LASSERT(cfs_list_empty(&req->rq_timed_list));
+
         /* finalize request */
         if (req->rq_export) {
                 class_export_put(req->rq_export);
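(Editor's note, not part of the patch: the rq_at_linked handling above is a
double-checked pattern; annotated in outline:)

	if (req->rq_at_linked) {               /* cheap unlocked hint */
		cfs_spin_lock(&svcpt->scp_at_lock);
		/* authoritative re-test: ptlrpc_at_check_timed() clears
		 * the flag under this same lock */
		if (likely(req->rq_at_linked))
			ptlrpc_at_remove_timed(req);
		cfs_spin_unlock(&svcpt->scp_at_lock);
	}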
@@ -917,6 +943,10 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
                cfs_list_del(&req->rq_list);
                cfs_list_del_init(&req->rq_history_list);
 
+               /* Track the highest culled req seq */
+               if (req->rq_history_seq > svcpt->scp_hist_seq_culled)
+                       svcpt->scp_hist_seq_culled = req->rq_history_seq;
+
                cfs_spin_unlock(&svcpt->scp_lock);
 
                ptlrpc_server_free_request(req);
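(Editor's sketch, not part of the patch: one way a history reader could
consume the new scp_hist_seq_culled high-water mark; the helper name is
hypothetical. Requests are culled from the head of the history list in
sequence order, so a single mark suffices to tell a stale position.)

	/* nonzero if history position "seq" has been culled and an
	 * iterator holding it must resynchronize */
	static int ptlrpc_hist_pos_culled(struct ptlrpc_service_part *svcpt,
					  __u64 seq)
	{
		return seq <= svcpt->scp_hist_seq_culled;
	}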
@@ -1150,6 +1180,25 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
        return 0;
 }
 
+static void
+ptlrpc_at_remove_timed(struct ptlrpc_request *req)
+{
+       struct ptlrpc_at_array *array;
+
+       array = &req->rq_rqbd->rqbd_svcpt->scp_at_array;
+
+       /* NB: must be called with svcpt::scp_at_lock held */
+       LASSERT(!cfs_list_empty(&req->rq_timed_list));
+       cfs_list_del_init(&req->rq_timed_list);
+
+       cfs_spin_lock(&req->rq_lock);
+       req->rq_at_linked = 0;
+       cfs_spin_unlock(&req->rq_lock);
+
+       array->paa_reqs_count[req->rq_at_index]--;
+       array->paa_count--;
+}
+
 static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
 {
        struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
@@ -1333,29 +1382,23 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service_part *svcpt)
                 cfs_list_for_each_entry_safe(rq, n,
                                              &array->paa_reqs_array[index],
                                              rq_timed_list) {
-                        if (rq->rq_deadline <= now + at_early_margin) {
-                                cfs_list_del_init(&rq->rq_timed_list);
-                                /**
-                                 * ptlrpc_server_drop_request() may drop
-                                 * refcount to 0 already. Let's check this and
-                                 * don't add entry to work_list
-                                 */
-                                if (likely(cfs_atomic_inc_not_zero(&rq->rq_refcount)))
-                                        cfs_list_add(&rq->rq_timed_list, &work_list);
-                                counter++;
-                                array->paa_reqs_count[index]--;
-                                array->paa_count--;
-                                cfs_spin_lock(&rq->rq_lock);
-                                rq->rq_at_linked = 0;
-                                cfs_spin_unlock(&rq->rq_lock);
-                                continue;
-                        }
-
-                        /* update the earliest deadline */
-                        if (deadline == -1 || rq->rq_deadline < deadline)
-                                deadline = rq->rq_deadline;
+                       if (rq->rq_deadline > now + at_early_margin) {
+                               /* update the earliest deadline */
+                               if (deadline == -1 ||
+                                   rq->rq_deadline < deadline)
+                                       deadline = rq->rq_deadline;
+                               break;
+                       }
 
-                        break;
+                       ptlrpc_at_remove_timed(rq);
+                       /**
+                        * ptlrpc_server_drop_request() may have already
+                        * dropped the refcount to 0; check for that and
+                        * don't add the entry to work_list in that case.
+                        */
+                       if (likely(cfs_atomic_inc_not_zero(&rq->rq_refcount)))
+                               cfs_list_add(&rq->rq_timed_list, &work_list);
+                       counter++;
                 }
 
                 if (++index >= array->paa_size)
@@ -1446,6 +1489,32 @@ static void ptlrpc_hpreq_fini(struct ptlrpc_request *req)
         EXIT;
 }
 
+static int ptlrpc_hpreq_check(struct ptlrpc_request *req)
+{
+       return 1;
+}
+
+static struct ptlrpc_hpreq_ops ptlrpc_hpreq_common = {
+       .hpreq_lock_match  = NULL,
+       .hpreq_check       = ptlrpc_hpreq_check,
+       .hpreq_fini        = NULL,
+};
+
+/* High-priority RPC check by RPC opcode. */
+int ptlrpc_hpreq_handler(struct ptlrpc_request *req)
+{
+       int opc = lustre_msg_get_opc(req->rq_reqmsg);
+
+       /* Check the export so that only pings and reconnects from
+        * not-yet-evicted exports can become HP RPCs. */
+       if ((req->rq_export != NULL) &&
+           (opc == OBD_PING || opc == MDS_CONNECT || opc == OST_CONNECT))
+               req->rq_ops = &ptlrpc_hpreq_common;
+
+       return 0;
+}
+EXPORT_SYMBOL(ptlrpc_hpreq_handler);
+
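(Editor's sketch, not part of the patch: ptlrpc_hpreq_handler() is exported
so services can plug it into their definitions. Assuming the
ptlrpc_service_ops convention from this patch series, the wiring would look
roughly as below; my_request_handler is hypothetical.)

	static struct ptlrpc_service_ops my_service_ops = {
		.so_req_handler   = my_request_handler,  /* hypothetical */
		/* queue pings/reconnects from live exports as HP */
		.so_hpreq_handler = ptlrpc_hpreq_handler,
	};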
 /**
  * Make the request a high priority one.
  *
@@ -1493,18 +1562,13 @@ void ptlrpc_hpreq_reorder(struct ptlrpc_request *req)
        cfs_spin_unlock(&svcpt->scp_req_lock);
        EXIT;
 }
+EXPORT_SYMBOL(ptlrpc_hpreq_reorder);
 
 /** Check if the request is a high priority one. */
 static int ptlrpc_server_hpreq_check(struct ptlrpc_service *svc,
                                      struct ptlrpc_request *req)
 {
-        ENTRY;
-
-        /* Check by request opc. */
-        if (OBD_PING == lustre_msg_get_opc(req->rq_reqmsg))
-                RETURN(1);
-
-        RETURN(ptlrpc_hpreq_init(svc, req));
+       return ptlrpc_hpreq_init(svc, req);
 }
 
 /** Check if a request is a high priority one. */
@@ -2461,23 +2525,18 @@ out:
        return rc;
 }
 
-struct ptlrpc_hr_args {
-        int                       thread_index;
-        int                       cpu_index;
-        struct ptlrpc_hr_service *hrs;
-};
-
-static int hrt_dont_sleep(struct ptlrpc_hr_thread *t,
-                          cfs_list_t *replies)
+static int hrt_dont_sleep(struct ptlrpc_hr_thread *hrt,
+                         cfs_list_t *replies)
 {
-        int result;
+       int result;
 
-        cfs_spin_lock(&t->hrt_lock);
-        cfs_list_splice_init(&t->hrt_queue, replies);
-        result = cfs_test_bit(HRT_STOPPING, &t->hrt_flags) ||
-                !cfs_list_empty(replies);
-        cfs_spin_unlock(&t->hrt_lock);
-        return result;
+       cfs_spin_lock(&hrt->hrt_lock);
+
+       cfs_list_splice_init(&hrt->hrt_queue, replies);
+       result = ptlrpc_hr.hr_stopping || !cfs_list_empty(replies);
+
+       cfs_spin_unlock(&hrt->hrt_lock);
+       return result;
 }
 
 /**
@@ -2486,26 +2545,28 @@ static int hrt_dont_sleep(struct ptlrpc_hr_thread *t,
  */
 static int ptlrpc_hr_main(void *arg)
 {
-        struct ptlrpc_hr_args * hr_args = arg;
-        struct ptlrpc_hr_service *hr = hr_args->hrs;
-        struct ptlrpc_hr_thread *t = &hr->hr_threads[hr_args->thread_index];
-        char threadname[20];
-        CFS_LIST_HEAD(replies);
+       struct ptlrpc_hr_thread         *hrt = (struct ptlrpc_hr_thread *)arg;
+       struct ptlrpc_hr_partition      *hrp = hrt->hrt_partition;
+       CFS_LIST_HEAD                   (replies);
+       char                            threadname[20];
+       int                             rc;
 
-        snprintf(threadname, sizeof(threadname),
-                 "ptlrpc_hr_%d", hr_args->thread_index);
+       snprintf(threadname, sizeof(threadname), "ptlrpc_hr%02d_%03d",
+                hrp->hrp_cpt, hrt->hrt_id);
+       cfs_daemonize_ctxt(threadname);
 
-        cfs_daemonize_ctxt(threadname);
-#if defined(CONFIG_NUMA) && defined(HAVE_NODE_TO_CPUMASK)
-        cfs_set_cpus_allowed(cfs_current(),
-                             node_to_cpumask(cpu_to_node(hr_args->cpu_index)));
-#endif
-        cfs_set_bit(HRT_RUNNING, &t->hrt_flags);
-        cfs_waitq_signal(&t->hrt_wait);
+       rc = cfs_cpt_bind(ptlrpc_hr.hr_cpt_table, hrp->hrp_cpt);
+       if (rc != 0) {
+               CWARN("Failed to bind %s on CPT %d of CPT table %p: rc = %d\n",
+                     threadname, hrp->hrp_cpt, ptlrpc_hr.hr_cpt_table, rc);
+       }
 
-        while (!cfs_test_bit(HRT_STOPPING, &t->hrt_flags)) {
+       cfs_atomic_inc(&hrp->hrp_nstarted);
+       cfs_waitq_signal(&ptlrpc_hr.hr_waitq);
+
+       while (!ptlrpc_hr.hr_stopping) {
+               l_wait_condition(hrt->hrt_waitq, hrt_dont_sleep(hrt, &replies));
 
-                l_wait_condition(t->hrt_wait, hrt_dont_sleep(t, &replies));
                 while (!cfs_list_empty(&replies)) {
                         struct ptlrpc_reply_state *rs;
 
@@ -2517,89 +2578,64 @@ static int ptlrpc_hr_main(void *arg)
                 }
         }
 
-        cfs_clear_bit(HRT_RUNNING, &t->hrt_flags);
-        cfs_complete(&t->hrt_completion);
+       cfs_atomic_inc(&hrp->hrp_nstopped);
+       cfs_waitq_signal(&ptlrpc_hr.hr_waitq);
 
-        return 0;
+       return 0;
 }
 
-static int ptlrpc_start_hr_thread(struct ptlrpc_hr_service *hr, int n, int cpu)
+static void ptlrpc_stop_hr_threads(void)
 {
-        struct ptlrpc_hr_thread *t = &hr->hr_threads[n];
-        struct ptlrpc_hr_args args;
-        int rc;
-        ENTRY;
+       struct ptlrpc_hr_partition      *hrp;
+       int                             i;
+       int                             j;
 
-        args.thread_index = n;
-        args.cpu_index = cpu;
-        args.hrs = hr;
+       ptlrpc_hr.hr_stopping = 1;
 
-        rc = cfs_create_thread(ptlrpc_hr_main, (void*)&args, CFS_DAEMON_FLAGS);
-        if (rc < 0) {
-                cfs_complete(&t->hrt_completion);
-                GOTO(out, rc);
-        }
-        l_wait_condition(t->hrt_wait, cfs_test_bit(HRT_RUNNING, &t->hrt_flags));
-        RETURN(0);
- out:
-        return rc;
-}
-
-static void ptlrpc_stop_hr_thread(struct ptlrpc_hr_thread *t)
-{
-        ENTRY;
-
-        cfs_set_bit(HRT_STOPPING, &t->hrt_flags);
-        cfs_waitq_signal(&t->hrt_wait);
-        cfs_wait_for_completion(&t->hrt_completion);
+       cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
+               if (hrp->hrp_thrs == NULL)
+                       continue; /* uninitialized */
+               for (j = 0; j < hrp->hrp_nthrs; j++)
+                       cfs_waitq_broadcast(&hrp->hrp_thrs[j].hrt_waitq);
+       }
 
-        EXIT;
+       cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
+               if (hrp->hrp_thrs == NULL)
+                       continue; /* uninitialized */
+               cfs_wait_event(ptlrpc_hr.hr_waitq,
+                              cfs_atomic_read(&hrp->hrp_nstopped) ==
+                              cfs_atomic_read(&hrp->hrp_nstarted));
+       }
 }
 
-static void ptlrpc_stop_hr_threads(struct ptlrpc_hr_service *hrs)
+static int ptlrpc_start_hr_threads(void)
 {
-        int n;
-        ENTRY;
-
-        for (n = 0; n < hrs->hr_n_threads; n++)
-                ptlrpc_stop_hr_thread(&hrs->hr_threads[n]);
-
-        EXIT;
-}
+       struct ptlrpc_hr_partition      *hrp;
+       int                             i;
+       int                             j;
+       ENTRY;
 
-static int ptlrpc_start_hr_threads(struct ptlrpc_hr_service *hr)
-{
-        int rc = -ENOMEM;
-        int n, cpu, threads_started = 0;
-        ENTRY;
+       cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
+               int     rc = 0;
 
-        LASSERT(hr != NULL);
-        LASSERT(hr->hr_n_threads > 0);
+               for (j = 0; j < hrp->hrp_nthrs; j++) {
+                       rc = cfs_create_thread(ptlrpc_hr_main,
+                                              &hrp->hrp_thrs[j],
+                                              CLONE_VM | CLONE_FILES);
+                       if (rc < 0)
+                               break;
+               }
+               cfs_wait_event(ptlrpc_hr.hr_waitq,
+                              cfs_atomic_read(&hrp->hrp_nstarted) == j);
+               if (rc >= 0)
+                       continue;
 
-        for (n = 0, cpu = 0; n < hr->hr_n_threads; n++) {
-#if defined(CONFIG_SMP) && defined(HAVE_NODE_TO_CPUMASK)
-                while (!cpu_online(cpu)) {
-                        cpu++;
-                        if (cpu >= cfs_num_possible_cpus())
-                                cpu = 0;
-                }
-#endif
-                rc = ptlrpc_start_hr_thread(hr, n, cpu);
-                if (rc != 0)
-                        break;
-                threads_started++;
-                cpu++;
-        }
-        if (threads_started == 0) {
-                CERROR("No reply handling threads started\n");
-                RETURN(-ESRCH);
-        }
-        if (threads_started < hr->hr_n_threads) {
-                CWARN("Started only %d reply handling threads from %d\n",
-                      threads_started, hr->hr_n_threads);
-                hr->hr_n_threads = threads_started;
-        }
-        RETURN(0);
+               CERROR("Reply handling thread %d:%d Failed on starting: "
+                      "rc = %d\n", i, j, rc);
+               ptlrpc_stop_hr_threads();
+               RETURN(rc);
+       }
+       RETURN(0);
 }
 
 static void ptlrpc_svcpt_stop_threads(struct ptlrpc_service_part *svcpt)
@@ -2668,6 +2704,7 @@ void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
 
        EXIT;
 }
+EXPORT_SYMBOL(ptlrpc_stop_all_threads);
 
 int ptlrpc_start_threads(struct ptlrpc_service *svc)
 {
@@ -2699,6 +2736,7 @@ int ptlrpc_start_threads(struct ptlrpc_service *svc)
        ptlrpc_stop_all_threads(svc);
        RETURN(rc);
 }
+EXPORT_SYMBOL(ptlrpc_start_threads);
 
 int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
 {
@@ -2800,46 +2838,75 @@ int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
 
 int ptlrpc_hr_init(void)
 {
-        int i;
-        int n_cpus = cfs_num_online_cpus();
-        struct ptlrpc_hr_service *hr;
-        int size;
-        int rc;
-        ENTRY;
+       struct ptlrpc_hr_partition      *hrp;
+       struct ptlrpc_hr_thread         *hrt;
+       int                             rc;
+       int                             i;
+       int                             j;
+       ENTRY;
 
-        LASSERT(ptlrpc_hr == NULL);
+       memset(&ptlrpc_hr, 0, sizeof(ptlrpc_hr));
+       ptlrpc_hr.hr_cpt_table = cfs_cpt_table;
 
-        size = offsetof(struct ptlrpc_hr_service, hr_threads[n_cpus]);
-        OBD_ALLOC(hr, size);
-        if (hr == NULL)
-                RETURN(-ENOMEM);
-        for (i = 0; i < n_cpus; i++) {
-                struct ptlrpc_hr_thread *t = &hr->hr_threads[i];
+       ptlrpc_hr.hr_partitions = cfs_percpt_alloc(ptlrpc_hr.hr_cpt_table,
+                                                  sizeof(*hrp));
+       if (ptlrpc_hr.hr_partitions == NULL)
+               RETURN(-ENOMEM);
 
-                cfs_spin_lock_init(&t->hrt_lock);
-                cfs_waitq_init(&t->hrt_wait);
-                CFS_INIT_LIST_HEAD(&t->hrt_queue);
-                cfs_init_completion(&t->hrt_completion);
-        }
-        hr->hr_n_threads = n_cpus;
-        hr->hr_size = size;
-        ptlrpc_hr = hr;
+       cfs_waitq_init(&ptlrpc_hr.hr_waitq);
 
-        rc = ptlrpc_start_hr_threads(hr);
-        if (rc) {
-                OBD_FREE(hr, hr->hr_size);
-                ptlrpc_hr = NULL;
-        }
-        RETURN(rc);
+       cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
+               hrp->hrp_cpt = i;
+
+               cfs_atomic_set(&hrp->hrp_nstarted, 0);
+               cfs_atomic_set(&hrp->hrp_nstopped, 0);
+
+               hrp->hrp_nthrs = cfs_cpt_weight(ptlrpc_hr.hr_cpt_table, i);
+               hrp->hrp_nthrs /= cfs_cpu_ht_nsiblings(0);
+
+               LASSERT(hrp->hrp_nthrs > 0);
+               OBD_CPT_ALLOC(hrp->hrp_thrs, ptlrpc_hr.hr_cpt_table, i,
+                             hrp->hrp_nthrs * sizeof(*hrt));
+               if (hrp->hrp_thrs == NULL)
+                       GOTO(out, rc = -ENOMEM);
+
+               for (j = 0; j < hrp->hrp_nthrs; j++) {
+                       hrt = &hrp->hrp_thrs[j];
+
+                       hrt->hrt_id = j;
+                       hrt->hrt_partition = hrp;
+                       cfs_waitq_init(&hrt->hrt_waitq);
+                       cfs_spin_lock_init(&hrt->hrt_lock);
+                       CFS_INIT_LIST_HEAD(&hrt->hrt_queue);
+               }
+       }
+
+       rc = ptlrpc_start_hr_threads();
+out:
+       if (rc != 0)
+               ptlrpc_hr_fini();
+       RETURN(rc);
 }
 
 void ptlrpc_hr_fini(void)
 {
-        if (ptlrpc_hr != NULL) {
-                ptlrpc_stop_hr_threads(ptlrpc_hr);
-                OBD_FREE(ptlrpc_hr, ptlrpc_hr->hr_size);
-                ptlrpc_hr = NULL;
-        }
+       struct ptlrpc_hr_partition      *hrp;
+       int                             i;
+
+       if (ptlrpc_hr.hr_partitions == NULL)
+               return;
+
+       ptlrpc_stop_hr_threads();
+
+       cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
+               if (hrp->hrp_thrs != NULL) {
+                       OBD_FREE(hrp->hrp_thrs,
+                                hrp->hrp_nthrs * sizeof(hrp->hrp_thrs[0]));
+               }
+       }
+
+       cfs_percpt_free(ptlrpc_hr.hr_partitions);
+       ptlrpc_hr.hr_partitions = NULL;
 }
 
 #endif /* __KERNEL__ */
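(Editor's note, not part of the patch: a worked example of the hrp_nthrs
sizing in ptlrpc_hr_init() above, assuming a hypothetical node with two
CPTs of 16 logical CPUs each and two hyperthreads per core.)

	/*
	 * cfs_cpt_weight(tbl, i)  = 16  logical CPUs in partition i
	 * cfs_cpu_ht_nsiblings(0) =  2  hyperthreads per core
	 * hrp_nthrs = 16 / 2      =  8  reply handlers per partition,
	 * i.e. one per physical core; the LASSERT then guards the case
	 * where integer division would otherwise yield zero threads.
	 */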
@@ -3066,6 +3133,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
 
        RETURN(0);
 }
+EXPORT_SYMBOL(ptlrpc_unregister_service);
 
 /**
  * Returns 0 if the service is healthy.
@@ -3126,3 +3194,4 @@ ptlrpc_service_health_check(struct ptlrpc_service *svc)
        }
        return 0;
 }
+EXPORT_SYMBOL(ptlrpc_service_health_check);