Whamcloud - gitweb
ORNL-22 general ptlrpcd threads pool support
authorFan Yong <yong.fan@whamcloud.com>
Sat, 15 Oct 2011 16:09:53 +0000 (00:09 +0800)
committerOleg Drokin <green@whamcloud.com>
Mon, 31 Oct 2011 01:06:43 +0000 (21:06 -0400)
Originally, there were two ptlrpcd threads on each node to serve all
async RPCs on the node, one ptlrpcd is for BRW, the other is for all
others. Such load mode cannot match more and more async RPCs process
on current large SMP node.

So we introduce ptlrpcd threads pool, any ptlrpcd threads in the pool
can be common shared by all async RPCs, like async I/O, async glimpse
lock, statahead, and ect. The async RPC sponsor can affect the system
load mode by specifying load policy when pushes the RPC into ptlrpcd
queue. On the other hand, it supports some flexible binding policies
to bind some ptlrpcd threads on CPU cores for reducing cross-CPU data
traffic, and also allow some ptlrpcd threads to be scheduled freely
on any CPU core to try to guarantee processing async RPCs in time.

Signed-off-by: Fan Yong <yong.fan@whamcloud.com>
Change-Id: Icc0bd689df73b6863cc9adc544c3654c046cb8bd
Reviewed-on: http://review.whamcloud.com/1184
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Johann Lombardi <johann@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
18 files changed:
lustre/include/lustre_net.h
lustre/ldlm/ldlm_request.c
lustre/lov/lov_internal.h
lustre/lov/lov_obd.c
lustre/lov/lov_qos.c
lustre/lov/lov_request.c
lustre/mdc/mdc_locks.c
lustre/mdc/mdc_request.c
lustre/obdclass/genops.c
lustre/osc/osc_create.c
lustre/osc/osc_request.c
lustre/ptlrpc/client.c
lustre/ptlrpc/import.c
lustre/ptlrpc/pinger.c
lustre/ptlrpc/ptlrpc_internal.h
lustre/ptlrpc/ptlrpcd.c
lustre/ptlrpc/recov_thread.c
lustre/quota/quota_context.c

index 82fe858..50c3077 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
@@ -258,6 +261,9 @@ typedef int (*set_interpreter_func)(struct ptlrpc_request_set *, void *, int);
  * returned.
  */
 struct ptlrpc_request_set {
  * returned.
  */
 struct ptlrpc_request_set {
+        cfs_atomic_t          set_refcount;
+        /** number of in queue requests */
+        cfs_atomic_t          set_new_count;
         /** number of uncompleted requests */
         cfs_atomic_t          set_remaining;
         /** wait queue to wait on for request events */
         /** number of uncompleted requests */
         cfs_atomic_t          set_remaining;
         /** wait queue to wait on for request events */
@@ -1237,6 +1243,22 @@ struct ptlrpcd_ctl {
          * Environment for request interpreters to run in.
          */
         struct lu_env               pc_env;
          * Environment for request interpreters to run in.
          */
         struct lu_env               pc_env;
+        /**
+         * Index of ptlrpcd thread in the array.
+         */
+        int                         pc_index;
+        /**
+         * Number of the ptlrpcd's partners.
+         */
+        int                         pc_npartners;
+        /**
+         * Pointer to the array of partners' ptlrpcd_ctl structure.
+         */
+        struct ptlrpcd_ctl        **pc_partners;
+        /**
+         * Record the partner index to be processed next.
+         */
+        int                         pc_cursor;
 #ifndef __KERNEL__
         /**
          * Async rpcs flag to make sure that ptlrpcd_check() is called only
 #ifndef __KERNEL__
         /**
          * Async rpcs flag to make sure that ptlrpcd_check() is called only
@@ -1277,7 +1299,11 @@ enum ptlrpcd_ctl_flags {
         /**
          * This is a recovery ptlrpc thread.
          */
         /**
          * This is a recovery ptlrpc thread.
          */
-        LIOD_RECOVERY    = 1 << 3
+        LIOD_RECOVERY    = 1 << 3,
+        /**
+         * The ptlrpcd is bound to some CPU core.
+         */
+        LIOD_BIND        = 1 << 4,
 };
 
 /* ptlrpc/events.c */
 };
 
 /* ptlrpc/events.c */
@@ -1391,8 +1417,8 @@ void ptlrpc_interrupted_set(void *data);
 void ptlrpc_mark_interrupted(struct ptlrpc_request *req);
 void ptlrpc_set_destroy(struct ptlrpc_request_set *);
 void ptlrpc_set_add_req(struct ptlrpc_request_set *, struct ptlrpc_request *);
 void ptlrpc_mark_interrupted(struct ptlrpc_request *req);
 void ptlrpc_set_destroy(struct ptlrpc_request_set *);
 void ptlrpc_set_add_req(struct ptlrpc_request_set *, struct ptlrpc_request *);
-int ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
-                           struct ptlrpc_request *req);
+void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
+                            struct ptlrpc_request *req);
 
 void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool);
 void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq);
 
 void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool);
 void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq);
@@ -1826,26 +1852,40 @@ void ping_evictor_stop(void);
 int ptlrpc_check_and_wait_suspend(struct ptlrpc_request *req);
 /** @} */
 
 int ptlrpc_check_and_wait_suspend(struct ptlrpc_request *req);
 /** @} */
 
-/* ptlrpc/ptlrpcd.c */
+/* ptlrpc daemon bind policy */
+typedef enum {
+        /* all ptlrpcd threads are free mode */
+        PDB_POLICY_NONE          = 1,
+        /* all ptlrpcd threads are bound mode */
+        PDB_POLICY_FULL          = 2,
+        /* <free1 bound1> <free2 bound2> ... <freeN boundN> */
+        PDB_POLICY_PAIR          = 3,
+        /* <free1 bound1> <bound1 free2> ... <freeN boundN> <boundN free1>,
+         * means each ptlrpcd[X] has two partners: thread[X-1] and thread[X+1]*/
+        PDB_POLICY_NEIGHBOR      = 4,
+} pdb_policy_t;
+
+/* ptlrpc daemon load policy
+ * It is caller's duty to specify how to push the async RPC into some ptlrpcd
+ * queue, but it is not enforced, affected by "ptlrpcd_bind_policy". If it is
+ * "PDB_POLICY_FULL", then the RPC will be processed by the selected ptlrpcd,
+ * Otherwise, the RPC may be processed by the selected ptlrpcd or its partner,
+ * depends on which is scheduled firstly, to accelerate the RPC processing. */
+typedef enum {
+        /* on the same CPU core as the caller */
+        PDL_POLICY_SAME         = 1,
+        /* within the same CPU partition, but not the same core as the caller */
+        PDL_POLICY_LOCAL        = 2,
+        /* round-robin on all CPU cores, but not the same core as the caller */
+        PDL_POLICY_ROUND        = 3,
+        /* the specified CPU core is preferred, but not enforced */
+        PDL_POLICY_PREFERRED    = 4,
+} pdl_policy_t;
 
 
-/**
- * Ptlrpcd scope is a set of two threads: ptlrpcd-foo and ptlrpcd-foo-rcv,
- * these threads are used to asynchronously send requests queued with
- * ptlrpcd_add_req(req, PCSOPE_FOO), and to handle completion call-backs for
- * such requests. Multiple scopes are needed to avoid dead-locks.
- */
-enum ptlrpcd_scope {
-        /** Scope of bulk read-write rpcs. */
-        PSCOPE_BRW,
-        /** Everything else. */
-        PSCOPE_OTHER,
-        PSCOPE_NR
-};
-
-int ptlrpcd_start(const char *name, struct ptlrpcd_ctl *pc);
+/* ptlrpc/ptlrpcd.c */
 void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force);
 void ptlrpcd_wake(struct ptlrpc_request *req);
 void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force);
 void ptlrpcd_wake(struct ptlrpc_request *req);
-int ptlrpcd_add_req(struct ptlrpc_request *req, enum ptlrpcd_scope scope);
+void ptlrpcd_add_req(struct ptlrpc_request *req, pdl_policy_t policy, int idx);
 void ptlrpcd_add_rqset(struct ptlrpc_request_set *set);
 int ptlrpcd_addref(void);
 void ptlrpcd_decref(void);
 void ptlrpcd_add_rqset(struct ptlrpc_request_set *set);
 int ptlrpcd_addref(void);
 void ptlrpcd_decref(void);
index d244a75..e73b91a 100644 (file)
@@ -1152,7 +1152,7 @@ int ldlm_cli_cancel_req(struct obd_export *exp, cfs_list_t *cancels,
 
                 ptlrpc_request_set_replen(req);
                 if (flags & LCF_ASYNC) {
 
                 ptlrpc_request_set_replen(req);
                 if (flags & LCF_ASYNC) {
-                        ptlrpcd_add_req(req, PSCOPE_OTHER);
+                        ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
                         sent = count;
                         GOTO(out, 0);
                 } else {
                         sent = count;
                         GOTO(out, 0);
                 } else {
@@ -2143,7 +2143,7 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
         aa = ptlrpc_req_async_args(req);
         aa->lock_handle = body->lock_handle[0];
         req->rq_interpret_reply = (ptlrpc_interpterer_t)replay_lock_interpret;
         aa = ptlrpc_req_async_args(req);
         aa->lock_handle = body->lock_handle[0];
         req->rq_interpret_reply = (ptlrpc_interpterer_t)replay_lock_interpret;
-        ptlrpcd_add_req(req, PSCOPE_OTHER);
+        ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
 
         RETURN(0);
 }
 
         RETURN(0);
 }
index 4aafdef..28ddbd2 100644 (file)
@@ -75,8 +75,8 @@ struct lov_request_set {
            only. */
         struct obd_device       *set_obd;
         int                      set_count;
            only. */
         struct obd_device       *set_obd;
         int                      set_count;
-        int                      set_completes;
-        int                      set_success;
+        cfs_atomic_t             set_completes;
+        cfs_atomic_t             set_success;
         struct llog_cookie      *set_cookies;
         int                      set_cookie_sent;
         struct obd_trans_info   *set_oti;
         struct llog_cookie      *set_cookies;
         int                      set_cookie_sent;
         struct obd_trans_info   *set_oti;
@@ -243,8 +243,6 @@ int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
                        ldlm_policy_data_t *policy, __u32 mode,
                        struct lustre_handle *lockh,
                        struct lov_request_set **reqset);
                        ldlm_policy_data_t *policy, __u32 mode,
                        struct lustre_handle *lockh,
                        struct lov_request_set **reqset);
-int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
-                         int rc);
 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags);
 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
                         struct lov_stripe_md *lsm,
 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags);
 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
                         struct lov_stripe_md *lsm,
index 3f1191f..0497cb2 100644 (file)
@@ -1307,7 +1307,7 @@ static int lov_getattr_interpret(struct ptlrpc_request_set *rqset,
 
         /* don't do attribute merge if this aysnc op failed */
         if (rc)
 
         /* don't do attribute merge if this aysnc op failed */
         if (rc)
-                lovset->set_completes = 0;
+                cfs_atomic_set(&lovset->set_completes, 0);
         err = lov_fini_getattr_set(lovset);
         RETURN(rc ? rc : err);
 }
         err = lov_fini_getattr_set(lovset);
         RETURN(rc ? rc : err);
 }
@@ -1364,7 +1364,7 @@ static int lov_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
         }
 out:
         if (rc)
         }
 out:
         if (rc)
-                lovset->set_completes = 0;
+                cfs_atomic_set(&lovset->set_completes, 0);
         err = lov_fini_getattr_set(lovset);
         RETURN(rc ? rc : err);
 }
         err = lov_fini_getattr_set(lovset);
         RETURN(rc ? rc : err);
 }
@@ -1427,7 +1427,7 @@ static int lov_setattr_interpret(struct ptlrpc_request_set *rqset,
         ENTRY;
 
         if (rc)
         ENTRY;
 
         if (rc)
-                lovset->set_completes = 0;
+                cfs_atomic_set(&lovset->set_completes, 0);
         err = lov_fini_setattr_set(lovset);
         RETURN(rc ? rc : err);
 }
         err = lov_fini_setattr_set(lovset);
         RETURN(rc ? rc : err);
 }
@@ -1490,7 +1490,7 @@ static int lov_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
         if (rc || !rqset || cfs_list_empty(&rqset->set_requests)) {
                 int err;
                 if (rc)
         if (rc || !rqset || cfs_list_empty(&rqset->set_requests)) {
                 int err;
                 if (rc)
-                        set->set_completes = 0;
+                        cfs_atomic_set(&set->set_completes, 0);
                 err = lov_fini_setattr_set(set);
                 RETURN(rc ? rc : err);
         }
                 err = lov_fini_setattr_set(set);
                 RETURN(rc ? rc : err);
         }
@@ -1510,7 +1510,7 @@ static int lov_punch_interpret(struct ptlrpc_request_set *rqset,
         ENTRY;
 
         if (rc)
         ENTRY;
 
         if (rc)
-                lovset->set_completes = 0;
+                cfs_atomic_set(&lovset->set_completes, 0);
         err = lov_fini_punch_set(lovset);
         RETURN(rc ? rc : err);
 }
         err = lov_fini_punch_set(lovset);
         RETURN(rc ? rc : err);
 }
@@ -1575,7 +1575,7 @@ static int lov_sync_interpret(struct ptlrpc_request_set *rqset,
         ENTRY;
 
         if (rc)
         ENTRY;
 
         if (rc)
-                lovset->set_completes = 0;
+                cfs_atomic_set(&lovset->set_completes, 0);
         err = lov_fini_sync_set(lovset);
         RETURN(rc ?: err);
 }
         err = lov_fini_sync_set(lovset);
         RETURN(rc ?: err);
 }
@@ -1948,7 +1948,7 @@ int lov_statfs_interpret(struct ptlrpc_request_set *rqset, void *data, int rc)
         ENTRY;
 
         if (rc)
         ENTRY;
 
         if (rc)
-                lovset->set_completes = 0;
+                cfs_atomic_set(&lovset->set_completes, 0);
 
         err = lov_fini_statfs_set(lovset);
         RETURN(rc ? rc : err);
 
         err = lov_fini_statfs_set(lovset);
         RETURN(rc ? rc : err);
@@ -1986,7 +1986,7 @@ static int lov_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
         if (rc || cfs_list_empty(&rqset->set_requests)) {
                 int err;
                 if (rc)
         if (rc || cfs_list_empty(&rqset->set_requests)) {
                 int err;
                 if (rc)
-                        set->set_completes = 0;
+                        cfs_atomic_set(&set->set_completes, 0);
                 err = lov_fini_statfs_set(set);
                 RETURN(rc ? rc : err);
         }
                 err = lov_fini_statfs_set(set);
                 RETURN(rc ? rc : err);
         }
index c1f8c96..9180373 100644 (file)
@@ -1132,11 +1132,12 @@ void qos_update(struct lov_obd *lov)
 
 void qos_statfs_done(struct lov_obd *lov)
 {
 
 void qos_statfs_done(struct lov_obd *lov)
 {
-        LASSERT(lov->lov_qos.lq_statfs_in_progress);
         cfs_down_write(&lov->lov_qos.lq_rw_sem);
         cfs_down_write(&lov->lov_qos.lq_rw_sem);
-        lov->lov_qos.lq_statfs_in_progress = 0;
-        /* wake up any threads waiting for the statfs rpcs to complete */
-        cfs_waitq_signal(&lov->lov_qos.lq_statfs_waitq);
+        if (lov->lov_qos.lq_statfs_in_progress) {
+                lov->lov_qos.lq_statfs_in_progress = 0;
+                /* wake up any threads waiting for the statfs rpcs to complete*/
+                cfs_waitq_signal(&lov->lov_qos.lq_statfs_waitq);
+        }
         cfs_up_write(&lov->lov_qos.lq_rw_sem);
 }
 
         cfs_up_write(&lov->lov_qos.lq_rw_sem);
 }
 
index ac52991..3e29f43 100644 (file)
@@ -57,8 +57,8 @@
 static void lov_init_set(struct lov_request_set *set)
 {
         set->set_count = 0;
 static void lov_init_set(struct lov_request_set *set)
 {
         set->set_count = 0;
-        set->set_completes = 0;
-        set->set_success = 0;
+        cfs_atomic_set(&set->set_completes, 0);
+        cfs_atomic_set(&set->set_success, 0);
         set->set_cookies = 0;
         CFS_INIT_LIST_HEAD(&set->set_list);
         cfs_atomic_set(&set->set_refcount, 1);
         set->set_cookies = 0;
         CFS_INIT_LIST_HEAD(&set->set_list);
         cfs_atomic_set(&set->set_refcount, 1);
@@ -101,9 +101,11 @@ void lov_finish_set(struct lov_request_set *set)
 
 int lov_finished_set(struct lov_request_set *set)
 {
 
 int lov_finished_set(struct lov_request_set *set)
 {
-        CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes,
+        int completes = cfs_atomic_read(&set->set_completes);
+
+        CDEBUG(D_INFO, "check set %d/%d\n", completes,
                set->set_count);
                set->set_count);
-        return set->set_completes == set->set_count;
+        return completes == set->set_count;
 }
 
 void lov_update_set(struct lov_request_set *set,
 }
 
 void lov_update_set(struct lov_request_set *set,
@@ -112,9 +114,9 @@ void lov_update_set(struct lov_request_set *set,
         req->rq_complete = 1;
         req->rq_rc = rc;
 
         req->rq_complete = 1;
         req->rq_rc = rc;
 
-        set->set_completes++;
+        cfs_atomic_inc(&set->set_completes);
         if (rc == 0)
         if (rc == 0)
-                set->set_success++;
+                cfs_atomic_inc(&set->set_success);
 
         cfs_waitq_signal(&set->set_waitq);
 }
 
         cfs_waitq_signal(&set->set_waitq);
 }
@@ -216,11 +218,12 @@ static int enqueue_done(struct lov_request_set *set, __u32 mode)
 {
         struct lov_request *req;
         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
 {
         struct lov_request *req;
         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
+        int completes = cfs_atomic_read(&set->set_completes);
         int rc = 0;
         ENTRY;
 
         /* enqueue/match success, just return */
         int rc = 0;
         ENTRY;
 
         /* enqueue/match success, just return */
-        if (set->set_completes && set->set_completes == set->set_success)
+        if (completes && completes == cfs_atomic_read(&set->set_success))
                 RETURN(0);
 
         /* cancel enqueued/matched locks */
                 RETURN(0);
 
         /* cancel enqueued/matched locks */
@@ -262,7 +265,7 @@ int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
          * succeeded. */
         if (!rqset) {
                 if (rc)
          * succeeded. */
         if (!rqset) {
                 if (rc)
-                        set->set_completes = 0;
+                        cfs_atomic_set(&set->set_completes, 0);
                 ret = enqueue_done(set, mode);
         } else if (set->set_lockh)
                 lov_llh_put(set->set_lockh);
                 ret = enqueue_done(set, mode);
         } else if (set->set_lockh)
                 lov_llh_put(set->set_lockh);
@@ -363,20 +366,6 @@ out_set:
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
-int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
-                         int rc)
-{
-        int ret = rc;
-        ENTRY;
-
-        if (rc > 0)
-                ret = 0;
-        else if (rc == 0)
-                ret = 1;
-        lov_update_set(set, req, ret);
-        RETURN(rc);
-}
-
 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
 {
         int rc = 0;
 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
 {
         int rc = 0;
@@ -386,7 +375,7 @@ int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
                 RETURN(0);
         LASSERT(set->set_exp);
         rc = enqueue_done(set, mode);
                 RETURN(0);
         LASSERT(set->set_exp);
         rc = enqueue_done(set, mode);
-        if ((set->set_count == set->set_success) &&
+        if ((set->set_count == cfs_atomic_read(&set->set_success)) &&
             (flags & LDLM_FL_TEST_LOCK))
                 lov_llh_put(set->set_lockh);
 
             (flags & LDLM_FL_TEST_LOCK))
                 lov_llh_put(set->set_lockh);
 
@@ -574,7 +563,7 @@ static int lov_update_create_set(struct lov_request_set *set,
         }
 
         cfs_spin_lock(&set->set_lock);
         }
 
         cfs_spin_lock(&set->set_lock);
-        req->rq_stripe = set->set_success;
+        req->rq_stripe = cfs_atomic_read(&set->set_success);
         loi = lsm->lsm_oinfo[req->rq_stripe];
 
 
         loi = lsm->lsm_oinfo[req->rq_stripe];
 
 
@@ -610,19 +599,19 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set,
         struct obdo *src_oa = set->set_oi->oi_oa;
         struct lov_request *req;
         struct obdo *ret_oa = NULL;
         struct obdo *src_oa = set->set_oi->oi_oa;
         struct lov_request *req;
         struct obdo *ret_oa = NULL;
-        int attrset = 0, rc = 0;
+        int success, attrset = 0, rc = 0;
         ENTRY;
 
         ENTRY;
 
-        LASSERT(set->set_completes);
+        LASSERT(cfs_atomic_read(&set->set_completes));
 
         /* try alloc objects on other osts if osc_create fails for
          * exceptions: RPC failure, ENOSPC, etc */
 
         /* try alloc objects on other osts if osc_create fails for
          * exceptions: RPC failure, ENOSPC, etc */
-        if (set->set_count != set->set_success) {
+        if (set->set_count != cfs_atomic_read(&set->set_success)) {
                 cfs_list_for_each_entry (req, &set->set_list, rq_link) {
                         if (req->rq_rc == 0)
                                 continue;
 
                 cfs_list_for_each_entry (req, &set->set_list, rq_link) {
                         if (req->rq_rc == 0)
                                 continue;
 
-                        set->set_completes--;
+                        cfs_atomic_dec(&set->set_completes);
                         req->rq_complete = 0;
 
                         rc = qos_remedy_create(set, req);
                         req->rq_complete = 0;
 
                         rc = qos_remedy_create(set, req);
@@ -630,12 +619,13 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set,
                 }
         }
 
                 }
         }
 
+        success = cfs_atomic_read(&set->set_success);
         /* no successful creates */
         /* no successful creates */
-        if (set->set_success == 0)
+        if (success == 0)
                 GOTO(cleanup, rc);
 
                 GOTO(cleanup, rc);
 
-        if (set->set_count != set->set_success) {
-                set->set_count = set->set_success;
+        if (set->set_count != success) {
+                set->set_count = success;
                 qos_shrink_lsm(set);
         }
 
                 qos_shrink_lsm(set);
         }
 
@@ -705,7 +695,7 @@ int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
-        if (set->set_completes)
+        if (cfs_atomic_read(&set->set_completes))
                 rc = create_done(set->set_exp, set, lsmp);
 
         lov_put_reqset(set);
                 rc = create_done(set->set_exp, set, lsmp);
 
         lov_put_reqset(set);
@@ -775,7 +765,7 @@ static int common_attr_done(struct lov_request_set *set)
         if (set->set_oi->oi_oa == NULL)
                 RETURN(0);
 
         if (set->set_oi->oi_oa == NULL)
                 RETURN(0);
 
-        if (!set->set_success)
+        if (!cfs_atomic_read(&set->set_success))
                 RETURN(-EIO);
 
         OBDO_ALLOC(tmp_oa);
                 RETURN(-EIO);
 
         OBDO_ALLOC(tmp_oa);
@@ -845,7 +835,7 @@ int lov_fini_brw_set(struct lov_request_set *set)
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
-        if (set->set_completes) {
+        if (cfs_atomic_read(&set->set_completes)) {
                 rc = brw_done(set);
                 /* FIXME update qos data here */
         }
                 rc = brw_done(set);
                 /* FIXME update qos data here */
         }
@@ -986,7 +976,7 @@ int lov_fini_getattr_set(struct lov_request_set *set)
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
-        if (set->set_completes)
+        if (cfs_atomic_read(&set->set_completes))
                 rc = common_attr_done(set);
 
         lov_put_reqset(set);
                 rc = common_attr_done(set);
 
         lov_put_reqset(set);
@@ -1071,7 +1061,7 @@ int lov_fini_destroy_set(struct lov_request_set *set)
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
-        if (set->set_completes) {
+        if (cfs_atomic_read(&set->set_completes)) {
                 /* FIXME update qos data here */
         }
 
                 /* FIXME update qos data here */
         }
 
@@ -1148,7 +1138,7 @@ int lov_fini_setattr_set(struct lov_request_set *set)
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
-        if (set->set_completes) {
+        if (cfs_atomic_read(&set->set_completes)) {
                 rc = common_attr_done(set);
                 /* FIXME update qos data here */
         }
                 rc = common_attr_done(set);
                 /* FIXME update qos data here */
         }
@@ -1276,10 +1266,10 @@ int lov_fini_punch_set(struct lov_request_set *set)
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
-        if (set->set_completes) {
+        if (cfs_atomic_read(&set->set_completes)) {
                 rc = -EIO;
                 /* FIXME update qos data here */
                 rc = -EIO;
                 /* FIXME update qos data here */
-                if (set->set_success)
+                if (cfs_atomic_read(&set->set_success))
                         rc = common_attr_done(set);
         }
 
                         rc = common_attr_done(set);
         }
 
@@ -1405,8 +1395,8 @@ int lov_fini_sync_set(struct lov_request_set *set)
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
-        if (set->set_completes) {
-                if (!set->set_success)
+        if (cfs_atomic_read(&set->set_completes)) {
+                if (!cfs_atomic_read(&set->set_success))
                         rc = -EIO;
                 /* FIXME update qos data here */
         }
                         rc = -EIO;
                 /* FIXME update qos data here */
         }
@@ -1530,9 +1520,9 @@ int lov_fini_statfs_set(struct lov_request_set *set)
         if (set == NULL)
                 RETURN(0);
 
         if (set == NULL)
                 RETURN(0);
 
-        if (set->set_completes) {
+        if (cfs_atomic_read(&set->set_completes)) {
                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
-                                     set->set_success);
+                                     cfs_atomic_read(&set->set_success));
         }
         lov_put_reqset(set);
         RETURN(rc);
         }
         lov_put_reqset(set);
         RETURN(rc);
@@ -1612,6 +1602,7 @@ static int cb_statfs_update(void *cookie, int rc)
 {
         struct obd_info *oinfo = cookie;
         struct lov_request *lovreq;
 {
         struct obd_info *oinfo = cookie;
         struct lov_request *lovreq;
+        struct lov_request_set *set;
         struct obd_statfs *osfs, *lov_sfs;
         struct lov_obd *lov;
         struct lov_tgt_desc *tgt;
         struct obd_statfs *osfs, *lov_sfs;
         struct lov_obd *lov;
         struct lov_tgt_desc *tgt;
@@ -1620,14 +1611,15 @@ static int cb_statfs_update(void *cookie, int rc)
         ENTRY;
 
         lovreq = container_of(oinfo, struct lov_request, rq_oi);
         ENTRY;
 
         lovreq = container_of(oinfo, struct lov_request, rq_oi);
-        lovobd = lovreq->rq_rqset->set_obd;
+        set = lovreq->rq_rqset;
+        lovobd = set->set_obd;
         lov = &lovobd->u.lov;
         lov = &lovobd->u.lov;
-        osfs = lovreq->rq_rqset->set_oi->oi_osfs;
+        osfs = set->set_oi->oi_osfs;
         lov_sfs = oinfo->oi_osfs;
         lov_sfs = oinfo->oi_osfs;
-        success = lovreq->rq_rqset->set_success;
+        success = cfs_atomic_read(&set->set_success);
         /* XXX: the same is done in lov_update_common_set, however
            lovset->set_exp is not initialized. */
         /* XXX: the same is done in lov_update_common_set, however
            lovset->set_exp is not initialized. */
-        lov_update_set(lovreq->rq_rqset, lovreq, rc);
+        lov_update_set(set, lovreq, rc);
         if (rc)
                 GOTO(out, rc);
 
         if (rc)
                 GOTO(out, rc);
 
@@ -1649,12 +1641,12 @@ out_update:
         obd_putref(lovobd);
 
 out:
         obd_putref(lovobd);
 
 out:
-        if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
-            lov_finished_set(lovreq->rq_rqset)) {
-               lov_statfs_interpret(NULL, lovreq->rq_rqset,
-                                    lovreq->rq_rqset->set_success !=
-                                                  lovreq->rq_rqset->set_count);
-               qos_statfs_done(lov);
+        if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
+            lov_finished_set(set)) {
+                lov_statfs_interpret(NULL, set, set->set_count !=
+                                     cfs_atomic_read(&set->set_success));
+                if (lov->lov_qos.lq_statfs_in_progress)
+                        qos_statfs_done(lov);
         }
 
         RETURN(0);
         }
 
         RETURN(0);
index 7ec8408..3216fb1 100644 (file)
@@ -1062,7 +1062,7 @@ int mdc_intent_getattr_async(struct obd_export *exp,
         ga->ga_einfo = einfo;
 
         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
         ga->ga_einfo = einfo;
 
         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
-        ptlrpcd_add_req(req, PSCOPE_OTHER);
+        ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
 
         RETURN(0);
 }
 
         RETURN(0);
 }
index 873d274..9b63d42 100644 (file)
@@ -2262,7 +2262,7 @@ static int mdc_renew_capa(struct obd_export *exp, struct obd_capa *oc,
         ra->ra_oc = oc;
         ra->ra_cb = cb;
         req->rq_interpret_reply = mdc_interpret_renew_capa;
         ra->ra_oc = oc;
         ra->ra_cb = cb;
         req->rq_interpret_reply = mdc_interpret_renew_capa;
-        ptlrpcd_add_req(req, PSCOPE_OTHER);
+        ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
         RETURN(0);
 }
 
         RETURN(0);
 }
 
index 6bd0869..e44da95 100644 (file)
@@ -281,6 +281,7 @@ struct obd_device *class_newdev(const char *type_name, const char *name)
         struct obd_type *type = NULL;
         int i;
         int new_obd_minor = 0;
         struct obd_type *type = NULL;
         int i;
         int new_obd_minor = 0;
+        ENTRY;
 
         if (strlen(name) >= MAX_OBD_NAME) {
                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
 
         if (strlen(name) >= MAX_OBD_NAME) {
                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
@@ -306,7 +307,8 @@ struct obd_device *class_newdev(const char *type_name, const char *name)
 
                 if (obd && obd->obd_name &&
                     (strcmp(name, obd->obd_name) == 0)) {
 
                 if (obd && obd->obd_name &&
                     (strcmp(name, obd->obd_name) == 0)) {
-                        CERROR("Device %s already exists, won't add\n", name);
+                        CERROR("Device %s already exists at %d, won't add\n",
+                               name, i);
                         if (result) {
                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
                                          "%p obd_magic %08x != %08x\n", result,
                         if (result) {
                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
                                          "%p obd_magic %08x != %08x\n", result,
@@ -336,7 +338,7 @@ struct obd_device *class_newdev(const char *type_name, const char *name)
         if (result == NULL && i >= class_devno_max()) {
                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
                        class_devno_max());
         if (result == NULL && i >= class_devno_max()) {
                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
                        class_devno_max());
-                result = ERR_PTR(-EOVERFLOW);
+                RETURN(ERR_PTR(-EOVERFLOW));
         }
 
         if (IS_ERR(result)) {
         }
 
         if (IS_ERR(result)) {
@@ -346,7 +348,7 @@ struct obd_device *class_newdev(const char *type_name, const char *name)
                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
                        result->obd_name, result);
         }
                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
                        result->obd_name, result);
         }
-        return result;
+        RETURN(result);
 }
 
 void class_release_dev(struct obd_device *obd)
 }
 
 void class_release_dev(struct obd_device *obd)
@@ -359,8 +361,8 @@ void class_release_dev(struct obd_device *obd)
                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
         LASSERT(obd_type != NULL);
 
                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
         LASSERT(obd_type != NULL);
 
-        CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
-               obd->obd_name,obd->obd_type->typ_name);
+        CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
+               obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
 
         cfs_write_lock(&obd_dev_lock);
         obd_devs[obd->obd_minor] = NULL;
 
         cfs_write_lock(&obd_dev_lock);
         obd_devs[obd->obd_minor] = NULL;
index 152e7df..b9a0fc0 100644 (file)
@@ -184,7 +184,7 @@ static int osc_interpret_create(const struct lu_env *env,
         cfs_spin_lock(&oscc->oscc_lock);
         cfs_list_for_each_entry_safe(fake_req, pos,
                                      &oscc->oscc_wait_create_list, rq_list) {
         cfs_spin_lock(&oscc->oscc_lock);
         cfs_list_for_each_entry_safe(fake_req, pos,
                                      &oscc->oscc_wait_create_list, rq_list) {
-                if (handle_async_create(fake_req, rc)  == -EAGAIN) {
+                if (handle_async_create(fake_req, rc) == -EAGAIN) {
                         oscc_internal_create(oscc);
                         /* sending request should be never fail because
                          * osc use preallocated requests pool */
                         oscc_internal_create(oscc);
                         /* sending request should be never fail because
                          * osc use preallocated requests pool */
@@ -275,7 +275,7 @@ static int oscc_internal_create(struct osc_creator *oscc)
         ptlrpc_request_set_replen(request);
 
         request->rq_interpret_reply = osc_interpret_create;
         ptlrpc_request_set_replen(request);
 
         request->rq_interpret_reply = osc_interpret_create;
-        ptlrpcd_add_req(request, PSCOPE_OTHER);
+        ptlrpcd_add_req(request, PDL_POLICY_ROUND, -1);
 
         RETURN(0);
 }
 
         RETURN(0);
 }
@@ -285,7 +285,6 @@ static int oscc_has_objects_nolock(struct osc_creator *oscc, int count)
         return ((__s64)(oscc->oscc_last_id - oscc->oscc_next_id) >= count);
 }
 
         return ((__s64)(oscc->oscc_last_id - oscc->oscc_next_id) >= count);
 }
 
-
 static int oscc_has_objects(struct osc_creator *oscc, int count)
 {
         int have_objs;
 static int oscc_has_objects(struct osc_creator *oscc, int count)
 {
         int have_objs;
@@ -512,24 +511,19 @@ int osc_create_async(struct obd_export *exp, struct obd_info *oinfo,
         /* try fast path */
         rc = handle_async_create(fake_req, 0);
         if (rc == -EAGAIN) {
         /* try fast path */
         rc = handle_async_create(fake_req, 0);
         if (rc == -EAGAIN) {
-                int is_add;
-                /* we not have objects - try wait */
-                is_add = ptlrpcd_add_req(fake_req, PSCOPE_OTHER);
-                if (!is_add)
-                        cfs_list_add(&fake_req->rq_list,
-                                     &oscc->oscc_wait_create_list);
-                else
-                        rc = is_add;
-        }
-        cfs_spin_unlock(&oscc->oscc_lock);
-
-        if (rc != -EAGAIN)
+                /* We don't have any objects, wait until we get a reply. */
+                ptlrpcd_add_req(fake_req, PDL_POLICY_ROUND, -1);
+                cfs_list_add(&fake_req->rq_list,
+                             &oscc->oscc_wait_create_list);
+                cfs_spin_unlock(&oscc->oscc_lock);
+                /* EAGAIN mean - request is delayed */
+                rc = 0;
+        } else {
+                cfs_spin_unlock(&oscc->oscc_lock);
                 /* need free request if was error hit or
                  * objects already allocated */
                 ptlrpc_req_finished(fake_req);
                 /* need free request if was error hit or
                  * objects already allocated */
                 ptlrpc_req_finished(fake_req);
-        else
-                /* EAGAIN mean - request is delayed */
-                rc = 0;
+        }
 
         RETURN(rc);
 }
 
         RETURN(rc);
 }
index 887275b..bc1243b 100644 (file)
@@ -403,7 +403,7 @@ int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
         /* do mds to ost setattr asynchronously */
         if (!rqset) {
                 /* Do not wait for response. */
         /* do mds to ost setattr asynchronously */
         if (!rqset) {
                 /* Do not wait for response. */
-                ptlrpcd_add_req(req, PSCOPE_OTHER);
+                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
         } else {
                 req->rq_interpret_reply =
                         (ptlrpc_interpterer_t)osc_setattr_interpret;
         } else {
                 req->rq_interpret_reply =
                         (ptlrpc_interpterer_t)osc_setattr_interpret;
@@ -415,7 +415,7 @@ int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                 sa->sa_cookie = cookie;
 
                 if (rqset == PTLRPCD_SET)
                 sa->sa_cookie = cookie;
 
                 if (rqset == PTLRPCD_SET)
-                        ptlrpcd_add_req(req, PSCOPE_OTHER);
+                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                 else
                         ptlrpc_set_add_req(rqset, req);
         }
                 else
                         ptlrpc_set_add_req(rqset, req);
         }
@@ -554,7 +554,7 @@ int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
         sa->sa_upcall = upcall;
         sa->sa_cookie = cookie;
         if (rqset == PTLRPCD_SET)
         sa->sa_upcall = upcall;
         sa->sa_cookie = cookie;
         if (rqset == PTLRPCD_SET)
-                ptlrpcd_add_req(req, PSCOPE_OTHER);
+                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
         else
                 ptlrpc_set_add_req(rqset, req);
 
         else
                 ptlrpc_set_add_req(rqset, req);
 
@@ -768,7 +768,7 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
         }
 
         /* Do not wait for response */
         }
 
         /* Do not wait for response */
-        ptlrpcd_add_req(req, PSCOPE_OTHER);
+        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
         RETURN(0);
 }
 
         RETURN(0);
 }
 
@@ -2671,7 +2671,19 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
 
         req->rq_interpret_reply = brw_interpret;
                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
 
         req->rq_interpret_reply = brw_interpret;
-        ptlrpcd_add_req(req, PSCOPE_BRW);
+
+        /* XXX: Maybe the caller can check the RPC bulk descriptor to see which
+         *      CPU/NUMA node the majority of pages were allocated on, and try
+         *      to assign the async RPC to the CPU core (PDL_POLICY_PREFERRED)
+         *      to reduce cross-CPU memory traffic.
+         *
+         *      But on the other hand, we expect that multiple ptlrpcd threads
+         *      and the initial write sponsor can run in parallel, especially
+         *      when data checksum is enabled, which is CPU-bound operation and
+         *      single ptlrpcd thread cannot process in time. So more ptlrpcd
+         *      threads sharing BRW load (with PDL_POLICY_ROUND) seems better.
+         */
+        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
         RETURN(1);
 }
 
         RETURN(1);
 }
 
@@ -3452,7 +3464,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                         req->rq_interpret_reply =
                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
                         if (rqset == PTLRPCD_SET)
                         req->rq_interpret_reply =
                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
                         if (rqset == PTLRPCD_SET)
-                                ptlrpcd_add_req(req, PSCOPE_OTHER);
+                                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                         else
                                 ptlrpc_set_add_req(rqset, req);
                 } else if (intent) {
                         else
                                 ptlrpc_set_add_req(rqset, req);
                 } else if (intent) {
@@ -4152,7 +4164,7 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                 ptlrpc_set_add_req(set, req);
                 ptlrpc_check_set(NULL, set);
         } else
                 ptlrpc_set_add_req(set, req);
                 ptlrpc_check_set(NULL, set);
         } else
-                ptlrpcd_add_req(req, PSCOPE_OTHER);
+                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
 
         RETURN(0);
 }
 
         RETURN(0);
 }
index bb7eed7..483ca5d 100644 (file)
@@ -856,16 +856,24 @@ struct ptlrpc_request *ptlrpc_prep_fakereq(struct obd_import *imp,
  */
 void ptlrpc_fakereq_finished(struct ptlrpc_request *req)
 {
  */
 void ptlrpc_fakereq_finished(struct ptlrpc_request *req)
 {
-        /* if we kill request before timeout - need adjust counter */
-        if (req->rq_phase == RQ_PHASE_RPC) {
-                struct ptlrpc_request_set *set = req->rq_set;
+        struct ptlrpc_request_set *set = req->rq_set;
+        int wakeup = 0;
 
 
-                if (set)
-                        cfs_atomic_dec(&set->set_remaining);
-        }
+        /* hold ref on the request to prevent others (ptlrpcd) to free it */
+        ptlrpc_request_addref(req);
+        cfs_list_del_init(&req->rq_list);
+
+        /* if we kill request before timeout - need adjust counter */
+        if (req->rq_phase == RQ_PHASE_RPC && set != NULL &&
+            cfs_atomic_dec_and_test(&set->set_remaining))
+                wakeup = 1;
 
         ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
 
         ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
-        cfs_list_del_init(&req->rq_list);
+
+        /* Only need to call wakeup once when to be empty. */
+        if (wakeup)
+                cfs_waitq_signal(&set->set_waitq);
+        ptlrpc_req_finished(req);
 }
 
 /**
 }
 
 /**
@@ -880,8 +888,10 @@ struct ptlrpc_request_set *ptlrpc_prep_set(void)
         OBD_ALLOC(set, sizeof *set);
         if (!set)
                 RETURN(NULL);
         OBD_ALLOC(set, sizeof *set);
         if (!set)
                 RETURN(NULL);
+        cfs_atomic_set(&set->set_refcount, 1);
         CFS_INIT_LIST_HEAD(&set->set_requests);
         cfs_waitq_init(&set->set_waitq);
         CFS_INIT_LIST_HEAD(&set->set_requests);
         cfs_waitq_init(&set->set_waitq);
+        cfs_atomic_set(&set->set_new_count, 0);
         cfs_atomic_set(&set->set_remaining, 0);
         cfs_spin_lock_init(&set->set_new_req_lock);
         CFS_INIT_LIST_HEAD(&set->set_new_requests);
         cfs_atomic_set(&set->set_remaining, 0);
         cfs_spin_lock_init(&set->set_new_req_lock);
         CFS_INIT_LIST_HEAD(&set->set_new_requests);
@@ -945,7 +955,7 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
 
         LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
 
 
         LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
 
-        OBD_FREE(set, sizeof(*set));
+        ptlrpc_reqset_put(set);
         EXIT;
 }
 
         EXIT;
 }
 
@@ -977,43 +987,49 @@ int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
                         struct ptlrpc_request *req)
 {
 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
                         struct ptlrpc_request *req)
 {
+        LASSERT(cfs_list_empty(&req->rq_set_chain));
+
         /* The set takes over the caller's request reference */
         cfs_list_add_tail(&req->rq_set_chain, &set->set_requests);
         req->rq_set = set;
         cfs_atomic_inc(&set->set_remaining);
         /* The set takes over the caller's request reference */
         cfs_list_add_tail(&req->rq_set_chain, &set->set_requests);
         req->rq_set = set;
         cfs_atomic_inc(&set->set_remaining);
-        req->rq_queued_time = cfs_time_current(); /* Where is the best place to set this? */
+        req->rq_queued_time = cfs_time_current();
 }
 
 /**
  * Add a request to a request with dedicated server thread
  * and wake the thread to make any necessary processing.
  * Currently only used for ptlrpcd.
 }
 
 /**
  * Add a request to a request with dedicated server thread
  * and wake the thread to make any necessary processing.
  * Currently only used for ptlrpcd.
- * Returns 0 if succesful or non zero error code on error.
- * (the only possible error for now is if the dedicated server thread
- * is shutting down)
  */
  */
-int ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
+void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
                            struct ptlrpc_request *req)
 {
         struct ptlrpc_request_set *set = pc->pc_set;
                            struct ptlrpc_request *req)
 {
         struct ptlrpc_request_set *set = pc->pc_set;
+        int count, i;
 
 
-        /*
-         * Let caller know that we stopped and will not handle this request.
-         * It needs to take care itself of request.
-         */
-        if (cfs_test_bit(LIOD_STOP, &pc->pc_flags))
-                return -EALREADY;
+        LASSERT(req->rq_set == NULL);
+        LASSERT(cfs_test_bit(LIOD_STOP, &pc->pc_flags) == 0);
 
         cfs_spin_lock(&set->set_new_req_lock);
         /*
          * The set takes over the caller's request reference.
          */
 
         cfs_spin_lock(&set->set_new_req_lock);
         /*
          * The set takes over the caller's request reference.
          */
-        cfs_list_add_tail(&req->rq_set_chain, &set->set_new_requests);
         req->rq_set = set;
         req->rq_set = set;
+        req->rq_queued_time = cfs_time_current();
+        cfs_list_add_tail(&req->rq_set_chain, &set->set_new_requests);
+        count = cfs_atomic_inc_return(&set->set_new_count);
         cfs_spin_unlock(&set->set_new_req_lock);
 
         cfs_spin_unlock(&set->set_new_req_lock);
 
-        cfs_waitq_signal(&set->set_waitq);
-        return 0;
+        /* Only need to call wakeup once for the first entry. */
+        if (count == 1) {
+                cfs_waitq_signal(&set->set_waitq);
+
+                /* XXX: It maybe unnecessary to wakeup all the partners. But to
+                 *      guarantee the async RPC can be processed ASAP, we have
+                 *      no other better choice. It maybe fixed in future. */
+                for (i = 0; i < pc->pc_npartners; i++)
+                        cfs_waitq_signal(&pc->pc_partners[i]->pc_set->set_waitq);
+        }
 }
 
 /**
 }
 
 /**
@@ -2623,7 +2639,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         cfs_atomic_inc(&req->rq_import->imp_replay_inflight);
         ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
 
         cfs_atomic_inc(&req->rq_import->imp_replay_inflight);
         ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
 
-        ptlrpcd_add_req(req, PSCOPE_OTHER);
+        ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
         RETURN(0);
 }
 
         RETURN(0);
 }
 
index 83e4937..05fa0f9 100644 (file)
@@ -705,7 +705,7 @@ int ptlrpc_connect_import(struct obd_import *imp)
 
         DEBUG_REQ(D_RPCTRACE, request, "(re)connect request (timeout %d)",
                   request->rq_timeout);
 
         DEBUG_REQ(D_RPCTRACE, request, "(re)connect request (timeout %d)",
                   request->rq_timeout);
-        ptlrpcd_add_req(request, PSCOPE_OTHER);
+        ptlrpcd_add_req(request, PDL_POLICY_ROUND, -1);
         rc = 0;
 out:
         if (rc != 0) {
         rc = 0;
 out:
         if (rc != 0) {
@@ -1191,7 +1191,7 @@ static int signal_completed_replay(struct obd_import *imp)
                 req->rq_timeout *= 3;
         req->rq_interpret_reply = completed_replay_interpret;
 
                 req->rq_timeout *= 3;
         req->rq_interpret_reply = completed_replay_interpret;
 
-        ptlrpcd_add_req(req, PSCOPE_OTHER);
+        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
         RETURN(0);
 }
 
         RETURN(0);
 }
 
index a9bc232..b95f8ca 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
@@ -100,7 +103,7 @@ int ptlrpc_ping(struct obd_import *imp)
 
         DEBUG_REQ(D_INFO, req, "pinging %s->%s",
                   imp->imp_obd->obd_uuid.uuid, obd2cli_tgt(imp->imp_obd));
 
         DEBUG_REQ(D_INFO, req, "pinging %s->%s",
                   imp->imp_obd->obd_uuid.uuid, obd2cli_tgt(imp->imp_obd));
-        ptlrpcd_add_req(req, PSCOPE_OTHER);
+        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
 
         RETURN(0);
 }
 
         RETURN(0);
 }
index 57bf5ba..8ab5019 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
@@ -47,6 +50,9 @@ struct ldlm_res_id;
 struct ptlrpc_request_set;
 extern int test_req_buffer_pressure;
 
 struct ptlrpc_request_set;
 extern int test_req_buffer_pressure;
 
+/* ptlrpcd.c */
+int ptlrpcd_start(int index, int max, const char *name, struct ptlrpcd_ctl *pc);
+
 /* client.c */
 void ptlrpc_init_xid(void);
 
 /* client.c */
 void ptlrpc_init_xid(void);
 
@@ -148,4 +154,10 @@ static inline int ll_rpc_recoverable_error(int rc)
 {
         return (rc == -ENOTCONN || rc == -ENODEV);
 }
 {
         return (rc == -ENOTCONN || rc == -ENODEV);
 }
+
+static inline void ptlrpc_reqset_put(struct ptlrpc_request_set *set)
+{
+        if (cfs_atomic_dec_and_test(&set->set_refcount))
+                OBD_FREE_PTR(set);
+}
 #endif /* PTLRPC_INTERNAL_H */
 #endif /* PTLRPC_INTERNAL_H */
index 90220ce..56a64ec 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
 #include <cl_object.h> /* cl_env_{get,put}() */
 #include <lprocfs_status.h>
 
 #include <cl_object.h> /* cl_env_{get,put}() */
 #include <lprocfs_status.h>
 
-enum pscope_thread {
-        PT_NORMAL,
-        PT_RECOVERY,
-        PT_NR
-};
+#include "ptlrpc_internal.h"
 
 
-struct ptlrpcd_scope_ctl {
-        struct ptlrpcd_thread {
-                const char        *pt_name;
-                struct ptlrpcd_ctl pt_ctl;
-        } pscope_thread[PT_NR];
+struct ptlrpcd {
+        int                pd_size;
+        int                pd_index;
+        int                pd_nthreads;
+        struct ptlrpcd_ctl pd_thread_rcv;
+        struct ptlrpcd_ctl pd_threads[0];
 };
 
 };
 
-static struct ptlrpcd_scope_ctl ptlrpcd_scopes[PSCOPE_NR] = {
-        [PSCOPE_BRW] = {
-                .pscope_thread = {
-                        [PT_NORMAL] = {
-                                .pt_name = "ptlrpcd-brw"
-                        },
-                        [PT_RECOVERY] = {
-                                .pt_name = "ptlrpcd-brw-rcv"
-                        }
-                }
-        },
-        [PSCOPE_OTHER] = {
-                .pscope_thread = {
-                        [PT_NORMAL] = {
-                                .pt_name = "ptlrpcd"
-                        },
-                        [PT_RECOVERY] = {
-                                .pt_name = "ptlrpcd-rcv"
-                        }
-                }
-        }
-};
+#ifdef __KERNEL__
+static int max_ptlrpcds;
+CFS_MODULE_PARM(max_ptlrpcds, "i", int, 0644,
+                "Max ptlrpcd thread count to be started.");
+
+static int ptlrpcd_bind_policy = PDB_POLICY_PAIR;
+CFS_MODULE_PARM(ptlrpcd_bind_policy, "i", int, 0644,
+                "Ptlrpcd threads binding mode.");
+#endif
+static struct ptlrpcd *ptlrpcds;
 
 cfs_semaphore_t ptlrpcd_sem;
 static int ptlrpcd_users = 0;
 
 cfs_semaphore_t ptlrpcd_sem;
 static int ptlrpcd_users = 0;
@@ -119,6 +107,50 @@ void ptlrpcd_wake(struct ptlrpc_request *req)
         cfs_waitq_signal(&rq_set->set_waitq);
 }
 
         cfs_waitq_signal(&rq_set->set_waitq);
 }
 
+static struct ptlrpcd_ctl *
+ptlrpcd_select_pc(struct ptlrpc_request *req, pdl_policy_t policy, int index)
+{
+        int idx = 0;
+
+        if (req != NULL && req->rq_send_state != LUSTRE_IMP_FULL)
+                return &ptlrpcds->pd_thread_rcv;
+
+#ifdef __KERNEL__
+        switch (policy) {
+        case PDL_POLICY_SAME:
+                idx = cfs_smp_processor_id() % ptlrpcds->pd_nthreads;
+                break;
+        case PDL_POLICY_LOCAL:
+                /* Before CPU partition patches available, process it the same
+                 * as "PDL_POLICY_ROUND". */
+# ifdef CFS_CPU_MODE_NUMA
+# warning "fix this code to use new CPU partition APIs"
+# endif
+                /* Fall through to PDL_POLICY_ROUND until the CPU
+                 * CPU partition patches are available. */
+                index = -1;
+        case PDL_POLICY_PREFERRED:
+                if (index >= 0 && index < cfs_num_online_cpus()) {
+                        idx = index % ptlrpcds->pd_nthreads;
+                        break;
+                }
+                /* Fall through to PDL_POLICY_ROUND for bad index. */
+        default:
+                /* Fall through to PDL_POLICY_ROUND for unknown policy. */
+        case PDL_POLICY_ROUND:
+                /* We do not care whether it is strict load balance. */
+                idx = ptlrpcds->pd_index + 1;
+                if (idx == cfs_smp_processor_id())
+                        idx++;
+                idx %= ptlrpcds->pd_nthreads;
+                ptlrpcds->pd_index = idx;
+                break;
+        }
+#endif /* __KERNEL__ */
+
+        return &ptlrpcds->pd_threads[idx];
+}
+
 /**
  * Move all request from an existing request set to the ptlrpcd queue.
  * All requests from the set must be in phase RQ_PHASE_NEW.
 /**
  * Move all request from an existing request set to the ptlrpcd queue.
  * All requests from the set must be in phase RQ_PHASE_NEW.
@@ -126,6 +158,14 @@ void ptlrpcd_wake(struct ptlrpc_request *req)
 void ptlrpcd_add_rqset(struct ptlrpc_request_set *set)
 {
         cfs_list_t *tmp, *pos;
 void ptlrpcd_add_rqset(struct ptlrpc_request_set *set)
 {
         cfs_list_t *tmp, *pos;
+#ifdef __KERNEL__
+        struct ptlrpcd_ctl *pc;
+        struct ptlrpc_request_set *new;
+        int count, i;
+
+        pc = ptlrpcd_select_pc(NULL, PDL_POLICY_LOCAL, -1);
+        new = pc->pc_set;
+#endif
 
         cfs_list_for_each_safe(pos, tmp, &set->set_requests) {
                 struct ptlrpc_request *req =
 
         cfs_list_for_each_safe(pos, tmp, &set->set_requests) {
                 struct ptlrpc_request *req =
@@ -133,37 +173,81 @@ void ptlrpcd_add_rqset(struct ptlrpc_request_set *set)
                                        rq_set_chain);
 
                 LASSERT(req->rq_phase == RQ_PHASE_NEW);
                                        rq_set_chain);
 
                 LASSERT(req->rq_phase == RQ_PHASE_NEW);
+#ifdef __KERNEL__
+                req->rq_set = new;
+                req->rq_queued_time = cfs_time_current();
+#else
                 cfs_list_del_init(&req->rq_set_chain);
                 req->rq_set = NULL;
                 cfs_list_del_init(&req->rq_set_chain);
                 req->rq_set = NULL;
-                ptlrpcd_add_req(req, PSCOPE_OTHER);
+                ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
                 cfs_atomic_dec(&set->set_remaining);
                 cfs_atomic_dec(&set->set_remaining);
+#endif
         }
         }
-        LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
+
+#ifdef __KERNEL__
+        cfs_spin_lock(&new->set_new_req_lock);
+        cfs_list_splice_init(&set->set_requests, &new->set_new_requests);
+        i = cfs_atomic_read(&set->set_remaining);
+        count = cfs_atomic_add_return(i, &new->set_new_count);
+        cfs_atomic_set(&set->set_remaining, 0);
+        cfs_spin_unlock(&new->set_new_req_lock);
+        if (count == i) {
+                cfs_waitq_signal(&new->set_waitq);
+
+                /* XXX: It maybe unnecessary to wakeup all the partners. But to
+                 *      guarantee the async RPC can be processed ASAP, we have
+                 *      no other better choice. It maybe fixed in future. */
+                for (i = 0; i < pc->pc_npartners; i++)
+                        cfs_waitq_signal(&pc->pc_partners[i]->pc_set->set_waitq);
+        }
+#endif
 }
 EXPORT_SYMBOL(ptlrpcd_add_rqset);
 
 }
 EXPORT_SYMBOL(ptlrpcd_add_rqset);
 
+#ifdef __KERNEL__
+/**
+ * Return transferred RPCs count.
+ */
+static int ptlrpcd_steal_rqset(struct ptlrpc_request_set *des,
+                               struct ptlrpc_request_set *src)
+{
+        cfs_list_t *tmp, *pos;
+        struct ptlrpc_request *req;
+        int rc = 0;
+
+        cfs_spin_lock(&src->set_new_req_lock);
+        if (likely(!cfs_list_empty(&src->set_new_requests))) {
+                cfs_list_for_each_safe(pos, tmp, &src->set_new_requests) {
+                        req = cfs_list_entry(pos, struct ptlrpc_request,
+                                             rq_set_chain);
+                        req->rq_set = des;
+                }
+                cfs_list_splice_init(&src->set_new_requests,
+                                     &des->set_requests);
+                rc = cfs_atomic_read(&src->set_new_count);
+                cfs_atomic_add(rc, &des->set_remaining);
+                cfs_atomic_set(&src->set_new_count, 0);
+        }
+        cfs_spin_unlock(&src->set_new_req_lock);
+        return rc;
+}
+#endif
+
 /**
  * Requests that are added to the ptlrpcd queue are sent via
  * ptlrpcd_check->ptlrpc_check_set().
  */
 /**
  * Requests that are added to the ptlrpcd queue are sent via
  * ptlrpcd_check->ptlrpc_check_set().
  */
-int ptlrpcd_add_req(struct ptlrpc_request *req, enum ptlrpcd_scope scope)
+void ptlrpcd_add_req(struct ptlrpc_request *req, pdl_policy_t policy, int idx)
 {
         struct ptlrpcd_ctl *pc;
 {
         struct ptlrpcd_ctl *pc;
-        enum pscope_thread  pt;
-        int rc;
 
 
-        LASSERT(scope < PSCOPE_NR);
-        
         cfs_spin_lock(&req->rq_lock);
         if (req->rq_invalid_rqset) {
         cfs_spin_lock(&req->rq_lock);
         if (req->rq_invalid_rqset) {
-                cfs_duration_t timeout;
-                struct l_wait_info lwi;
+                struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(5),
+                                                     back_to_sleep, NULL);
 
                 req->rq_invalid_rqset = 0;
                 cfs_spin_unlock(&req->rq_lock);
 
                 req->rq_invalid_rqset = 0;
                 cfs_spin_unlock(&req->rq_lock);
-
-                timeout = cfs_time_seconds(5);
-                lwi = LWI_TIMEOUT(timeout, back_to_sleep, NULL);
                 l_wait_event(req->rq_set_waitq, (req->rq_set == NULL), &lwi);
         } else if (req->rq_set) {
                 LASSERT(req->rq_phase == RQ_PHASE_NEW);
                 l_wait_event(req->rq_set_waitq, (req->rq_set == NULL), &lwi);
         } else if (req->rq_set) {
                 LASSERT(req->rq_phase == RQ_PHASE_NEW);
@@ -172,39 +256,23 @@ int ptlrpcd_add_req(struct ptlrpc_request *req, enum ptlrpcd_scope scope)
                 /* ptlrpc_check_set will decrease the count */
                 cfs_atomic_inc(&req->rq_set->set_remaining);
                 cfs_spin_unlock(&req->rq_lock);
                 /* ptlrpc_check_set will decrease the count */
                 cfs_atomic_inc(&req->rq_set->set_remaining);
                 cfs_spin_unlock(&req->rq_lock);
-
                 cfs_waitq_signal(&req->rq_set->set_waitq);
                 cfs_waitq_signal(&req->rq_set->set_waitq);
+                return;
         } else {
                 cfs_spin_unlock(&req->rq_lock);
         }
 
         } else {
                 cfs_spin_unlock(&req->rq_lock);
         }
 
-        pt = req->rq_send_state == LUSTRE_IMP_FULL ? PT_NORMAL : PT_RECOVERY;
-        pc = &ptlrpcd_scopes[scope].pscope_thread[pt].pt_ctl;
-        rc = ptlrpc_set_add_new_req(pc, req);
-        /*
-         * XXX disable this for CLIO: environment is needed for interpreter.
-         *     add debug temporary to check rc.
-         */
-        LASSERTF(rc == 0, "ptlrpcd_add_req failed (rc = %d)\n", rc);
-        if (rc && 0) {
-                /*
-                 * Thread is probably in stop now so we need to
-                 * kill this rpc as it was not added. Let's call
-                 * interpret for it to let know we're killing it
-                 * so that higher levels might free associated
-                 * resources.
-                 */
-                ptlrpc_req_interpret(NULL, req, -EBADR);
-                req->rq_set = NULL;
-                ptlrpc_req_finished(req);
-        } else if (req->rq_send_state == LUSTRE_IMP_CONNECTING) {
-                /*
-                 * The request is for recovery, should be sent ASAP.
-                 */
-                cfs_waitq_signal(&pc->pc_set->set_waitq);
-        }
+        pc = ptlrpcd_select_pc(req, policy, idx);
 
 
-        return rc;
+        DEBUG_REQ(D_INFO, req, "add req [%p] to pc [%s:%d]",
+                  req, pc->pc_name, pc->pc_index);
+
+        ptlrpc_set_add_new_req(pc, req);
+}
+
+static inline void ptlrpc_reqset_get(struct ptlrpc_request_set *set)
+{
+        cfs_atomic_inc(&set->set_refcount);
 }
 
 /**
 }
 
 /**
@@ -215,37 +283,43 @@ static int ptlrpcd_check(const struct lu_env *env, struct ptlrpcd_ctl *pc)
 {
         cfs_list_t *tmp, *pos;
         struct ptlrpc_request *req;
 {
         cfs_list_t *tmp, *pos;
         struct ptlrpc_request *req;
+        struct ptlrpc_request_set *set = pc->pc_set;
         int rc = 0;
         ENTRY;
 
         int rc = 0;
         ENTRY;
 
-        cfs_spin_lock(&pc->pc_set->set_new_req_lock);
-        cfs_list_for_each_safe(pos, tmp, &pc->pc_set->set_new_requests) {
-                req = cfs_list_entry(pos, struct ptlrpc_request, rq_set_chain);
-                cfs_list_del_init(&req->rq_set_chain);
-                ptlrpc_set_add_req(pc->pc_set, req);
-                /*
-                 * Need to calculate its timeout.
-                 */
-                rc = 1;
+        if (cfs_atomic_read(&set->set_new_count)) {
+                cfs_spin_lock(&set->set_new_req_lock);
+                if (likely(!cfs_list_empty(&set->set_new_requests))) {
+                        cfs_list_splice_init(&set->set_new_requests,
+                                             &set->set_requests);
+                        cfs_atomic_add(cfs_atomic_read(&set->set_new_count),
+                                       &set->set_remaining);
+                        cfs_atomic_set(&set->set_new_count, 0);
+                        /*
+                         * Need to calculate its timeout.
+                         */
+                        rc = 1;
+                }
+                cfs_spin_unlock(&set->set_new_req_lock);
         }
         }
-        cfs_spin_unlock(&pc->pc_set->set_new_req_lock);
 
 
-        if (cfs_atomic_read(&pc->pc_set->set_remaining)) {
-                rc = rc | ptlrpc_check_set(env, pc->pc_set);
+        if (cfs_atomic_read(&set->set_remaining))
+                rc |= ptlrpc_check_set(env, set);
 
 
+        if (!cfs_list_empty(&set->set_requests)) {
                 /*
                  * XXX: our set never completes, so we prune the completed
                  * reqs after each iteration. boy could this be smarter.
                  */
                 /*
                  * XXX: our set never completes, so we prune the completed
                  * reqs after each iteration. boy could this be smarter.
                  */
-                cfs_list_for_each_safe(pos, tmp, &pc->pc_set->set_requests) {
+                cfs_list_for_each_safe(pos, tmp, &set->set_requests) {
                         req = cfs_list_entry(pos, struct ptlrpc_request,
                         req = cfs_list_entry(pos, struct ptlrpc_request,
-                                         rq_set_chain);
+                                             rq_set_chain);
                         if (req->rq_phase != RQ_PHASE_COMPLETE)
                                 continue;
 
                         cfs_list_del_init(&req->rq_set_chain);
                         req->rq_set = NULL;
                         if (req->rq_phase != RQ_PHASE_COMPLETE)
                                 continue;
 
                         cfs_list_del_init(&req->rq_set_chain);
                         req->rq_set = NULL;
-                        ptlrpc_req_finished (req);
+                        ptlrpc_req_finished(req);
                 }
         }
 
                 }
         }
 
@@ -253,9 +327,43 @@ static int ptlrpcd_check(const struct lu_env *env, struct ptlrpcd_ctl *pc)
                 /*
                  * If new requests have been added, make sure to wake up.
                  */
                 /*
                  * If new requests have been added, make sure to wake up.
                  */
-                cfs_spin_lock(&pc->pc_set->set_new_req_lock);
-                rc = !cfs_list_empty(&pc->pc_set->set_new_requests);
-                cfs_spin_unlock(&pc->pc_set->set_new_req_lock);
+                rc = cfs_atomic_read(&set->set_new_count);
+
+#ifdef __KERNEL__
+                if (rc == 0 && pc->pc_npartners > 0) {
+                        struct ptlrpcd_ctl *partner;
+                        struct ptlrpc_request_set *ps;
+                        int first = pc->pc_cursor;
+
+                        do {
+                                partner = pc->pc_partners[pc->pc_cursor++];
+                                if (pc->pc_cursor >= pc->pc_npartners)
+                                        pc->pc_cursor = 0;
+                                if (partner == NULL)
+                                        continue;
+
+                                cfs_spin_lock(&partner->pc_lock);
+                                ps = partner->pc_set;
+                                if (ps == NULL) {
+                                        cfs_spin_unlock(&partner->pc_lock);
+                                        continue;
+                                }
+
+                                ptlrpc_reqset_get(ps);
+                                cfs_spin_unlock(&partner->pc_lock);
+
+                                if (cfs_atomic_read(&ps->set_new_count)) {
+                                        rc = ptlrpcd_steal_rqset(set, ps);
+                                        if (rc > 0)
+                                                CDEBUG(D_RPCTRACE, "transfer %d"
+                                                       " async RPCs [%d->%d]\n",
+                                                        rc, pc->pc_index,
+                                                        partner->pc_index);
+                                }
+                                ptlrpc_reqset_put(ps);
+                        } while (rc == 0 && pc->pc_cursor != first);
+                }
+#endif
         }
 
         RETURN(rc);
         }
 
         RETURN(rc);
@@ -271,26 +379,37 @@ static int ptlrpcd_check(const struct lu_env *env, struct ptlrpcd_ctl *pc)
 static int ptlrpcd(void *arg)
 {
         struct ptlrpcd_ctl *pc = arg;
 static int ptlrpcd(void *arg)
 {
         struct ptlrpcd_ctl *pc = arg;
+        struct ptlrpc_request_set *set = pc->pc_set;
         struct lu_env env = { .le_ses = NULL };
         int rc, exit = 0;
         ENTRY;
 
         struct lu_env env = { .le_ses = NULL };
         int rc, exit = 0;
         ENTRY;
 
-        rc = cfs_daemonize_ctxt(pc->pc_name);
-        if (rc == 0) {
-                /*
-                 * XXX So far only "client" ptlrpcd uses an environment. In
-                 * the future, ptlrpcd thread (or a thread-set) has to given
-                 * an argument, describing its "scope".
-                 */
-                rc = lu_context_init(&env.le_ctx,
-                                     LCT_CL_THREAD|LCT_REMEMBER|LCT_NOREF);
-        }
+        cfs_daemonize_ctxt(pc->pc_name);
+#if defined(CONFIG_SMP) && defined(HAVE_NODE_TO_CPUMASK)
+        if (cfs_test_bit(LIOD_BIND, &pc->pc_flags)) {
+                int index = pc->pc_index;
 
 
+                if (index >= 0 && index < cfs_num_possible_cpus()) {
+                        while (!cfs_cpu_online(index)) {
+                                if (++index >= cfs_num_possible_cpus())
+                                        index = 0;
+                        }
+                        cfs_set_cpus_allowed(cfs_current(),
+                                     node_to_cpumask(cpu_to_node(index)));
+                }
+        }
+#endif
+        /*
+         * XXX So far only "client" ptlrpcd uses an environment. In
+         * the future, ptlrpcd thread (or a thread-set) has to given
+         * an argument, describing its "scope".
+         */
+        rc = lu_context_init(&env.le_ctx,
+                             LCT_CL_THREAD|LCT_REMEMBER|LCT_NOREF);
         cfs_complete(&pc->pc_starting);
 
         if (rc != 0)
                 RETURN(rc);
         cfs_complete(&pc->pc_starting);
 
         if (rc != 0)
                 RETURN(rc);
-        env.le_ctx.lc_cookie = 0x7;
 
         /*
          * This mainloop strongly resembles ptlrpc_set_wait() except that our
 
         /*
          * This mainloop strongly resembles ptlrpc_set_wait() except that our
@@ -318,12 +437,12 @@ static int ptlrpcd(void *arg)
                         continue;
                 }
 
                         continue;
                 }
 
-                timeout = ptlrpc_set_next_timeout(pc->pc_set);
+                timeout = ptlrpc_set_next_timeout(set);
                 lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
                 lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
-                                  ptlrpc_expired_set, pc->pc_set);
+                                  ptlrpc_expired_set, set);
 
                 lu_context_enter(&env.le_ctx);
 
                 lu_context_enter(&env.le_ctx);
-                l_wait_event(pc->pc_set->set_waitq,
+                l_wait_event(set->set_waitq,
                              ptlrpcd_check(&env, pc), &lwi);
                 lu_context_exit(&env.le_ctx);
 
                              ptlrpcd_check(&env, pc), &lwi);
                 lu_context_exit(&env.le_ctx);
 
@@ -332,7 +451,7 @@ static int ptlrpcd(void *arg)
                  */
                 if (cfs_test_bit(LIOD_STOP, &pc->pc_flags)) {
                         if (cfs_test_bit(LIOD_FORCE, &pc->pc_flags))
                  */
                 if (cfs_test_bit(LIOD_STOP, &pc->pc_flags)) {
                         if (cfs_test_bit(LIOD_FORCE, &pc->pc_flags))
-                                ptlrpc_abort_set(pc->pc_set);
+                                ptlrpc_abort_set(set);
                         exit++;
                 }
 
                         exit++;
                 }
 
@@ -345,17 +464,123 @@ static int ptlrpcd(void *arg)
         /*
          * Wait for inflight requests to drain.
          */
         /*
          * Wait for inflight requests to drain.
          */
-        if (!cfs_list_empty(&pc->pc_set->set_requests))
-                ptlrpc_set_wait(pc->pc_set);
+        if (!cfs_list_empty(&set->set_requests))
+                ptlrpc_set_wait(set);
         lu_context_fini(&env.le_ctx);
         cfs_complete(&pc->pc_finishing);
 
         cfs_clear_bit(LIOD_START, &pc->pc_flags);
         cfs_clear_bit(LIOD_STOP, &pc->pc_flags);
         cfs_clear_bit(LIOD_FORCE, &pc->pc_flags);
         lu_context_fini(&env.le_ctx);
         cfs_complete(&pc->pc_finishing);
 
         cfs_clear_bit(LIOD_START, &pc->pc_flags);
         cfs_clear_bit(LIOD_STOP, &pc->pc_flags);
         cfs_clear_bit(LIOD_FORCE, &pc->pc_flags);
+        cfs_clear_bit(LIOD_BIND, &pc->pc_flags);
         return 0;
 }
 
         return 0;
 }
 
+/* XXX: We want multiple CPU cores to share the async RPC load. So we start many
+ *      ptlrpcd threads. We also want to reduce the ptlrpcd overhead caused by
+ *      data transfer cross-CPU cores. So we bind ptlrpcd thread to specified
+ *      CPU core. But binding all ptlrpcd threads maybe cause response delay
+ *      because of some CPU core(s) busy with other loads.
+ *
+ *      For example: "ls -l", some async RPCs for statahead are assigned to
+ *      ptlrpcd_0, and ptlrpcd_0 is bound to CPU_0, but CPU_0 may be quite busy
+ *      with other non-ptlrpcd, like "ls -l" itself (we want to the "ls -l"
+ *      thread, statahead thread, and ptlrpcd thread can run in parallel), under
+ *      such case, the statahead async RPCs can not be processed in time, it is
+ *      unexpected. If ptlrpcd_0 can be re-scheduled on other CPU core, it may
+ *      be better. But it breaks former data transfer policy.
+ *
+ *      So we shouldn't be blind for avoiding the data transfer. We make some
+ *      compromise: divide the ptlrpcd threds pool into two parts. One part is
+ *      for bound mode, each ptlrpcd thread in this part is bound to some CPU
+ *      core. The other part is for free mode, all the ptlrpcd threads in the
+ *      part can be scheduled on any CPU core. We specify some partnership
+ *      between bound mode ptlrpcd thread(s) and free mode ptlrpcd thread(s),
+ *      and the async RPC load within the partners are shared.
+ *
+ *      It can partly avoid data transfer cross-CPU (if the bound mode ptlrpcd
+ *      thread can be scheduled in time), and try to guarantee the async RPC
+ *      processed ASAP (as long as the free mode ptlrpcd thread can be scheduled
+ *      on any CPU core).
+ *
+ *      As for how to specify the partnership between bound mode ptlrpcd
+ *      thread(s) and free mode ptlrpcd thread(s), the simplest way is to use
+ *      <free bound> pair. In future, we can specify some more complex
+ *      partnership based on the patches for CPU partition. But before such
+ *      patches are available, we prefer to use the simplest one.
+ */
+# ifdef CFS_CPU_MODE_NUMA
+# warning "fix ptlrpcd_bind() to use new CPU partition APIs"
+# endif
+static int ptlrpcd_bind(int index, int max)
+{
+        struct ptlrpcd_ctl *pc;
+        int rc = 0;
+        ENTRY;
+
+        LASSERT(index <= max - 1);
+        pc = &ptlrpcds->pd_threads[index];
+        switch (ptlrpcd_bind_policy) {
+        case PDB_POLICY_NONE:
+                pc->pc_npartners = -1;
+                break;
+        case PDB_POLICY_FULL:
+                pc->pc_npartners = 0;
+                cfs_set_bit(LIOD_BIND, &pc->pc_flags);
+                break;
+        case PDB_POLICY_PAIR:
+                LASSERT(max % 2 == 0);
+                pc->pc_npartners = 1;
+                break;
+        case PDB_POLICY_NEIGHBOR:
+                LASSERT(max >= 3);
+                pc->pc_npartners = 2;
+                break;
+        default:
+                CERROR("unknown ptlrpcd bind policy %d\n", ptlrpcd_bind_policy);
+                rc = -EINVAL;
+        }
+
+        if (rc == 0 && pc->pc_npartners > 0) {
+                OBD_ALLOC(pc->pc_partners,
+                          sizeof(struct ptlrpcd_ctl *) * pc->pc_npartners);
+                if (pc->pc_partners == NULL) {
+                        pc->pc_npartners = 0;
+                        rc = -ENOMEM;
+                } else {
+                        if (index & 0x1)
+                                cfs_set_bit(LIOD_BIND, &pc->pc_flags);
+
+                        switch (ptlrpcd_bind_policy) {
+                        case PDB_POLICY_PAIR:
+                                if (index & 0x1) {
+                                        pc->pc_partners[0] = &ptlrpcds->
+                                                pd_threads[index - 1];
+                                        ptlrpcds->pd_threads[index - 1].
+                                                pc_partners[0] = pc;
+                                }
+                                break;
+                        case PDB_POLICY_NEIGHBOR:
+                                if (index > 0) {
+                                        pc->pc_partners[0] = &ptlrpcds->
+                                                pd_threads[index - 1];
+                                        ptlrpcds->pd_threads[index - 1].
+                                                pc_partners[1] = pc;
+                                        if (index == max - 1) {
+                                                pc->pc_partners[1] =
+                                                &ptlrpcds->pd_threads[0];
+                                                ptlrpcds->pd_threads[0].
+                                                pc_partners[0] = pc;
+                                        }
+                                }
+                                break;
+                        }
+                }
+        }
+
+        RETURN(rc);
+}
+
 #else /* !__KERNEL__ */
 
 /**
 #else /* !__KERNEL__ */
 
 /**
@@ -378,7 +603,6 @@ int ptlrpcd_check_async_rpcs(void *arg)
                 if (rc == 0) {
                         lu_context_enter(&pc->pc_env.le_ctx);
                         rc = ptlrpcd_check(&pc->pc_env, pc);
                 if (rc == 0) {
                         lu_context_enter(&pc->pc_env.le_ctx);
                         rc = ptlrpcd_check(&pc->pc_env, pc);
-                        lu_context_exit(&pc->pc_env.le_ctx);
                         if (!rc)
                                 ptlrpc_expired_set(pc->pc_set);
                         /*
                         if (!rc)
                                 ptlrpc_expired_set(pc->pc_set);
                         /*
@@ -386,6 +610,7 @@ int ptlrpcd_check_async_rpcs(void *arg)
                          */
                         if (cfs_test_bit(LIOD_RECOVERY, &pc->pc_flags))
                                 rc = ptlrpcd_check(&pc->pc_env, pc);
                          */
                         if (cfs_test_bit(LIOD_RECOVERY, &pc->pc_flags))
                                 rc = ptlrpcd_check(&pc->pc_env, pc);
+                        lu_context_exit(&pc->pc_env.le_ctx);
                 }
         }
 
                 }
         }
 
@@ -397,26 +622,28 @@ int ptlrpcd_idle(void *arg)
 {
         struct ptlrpcd_ctl *pc = arg;
 
 {
         struct ptlrpcd_ctl *pc = arg;
 
-        return (cfs_list_empty(&pc->pc_set->set_new_requests) &&
+        return (cfs_atomic_read(&pc->pc_set->set_new_count) == 0 &&
                 cfs_atomic_read(&pc->pc_set->set_remaining) == 0);
 }
 
 #endif
 
                 cfs_atomic_read(&pc->pc_set->set_remaining) == 0);
 }
 
 #endif
 
-int ptlrpcd_start(const char *name, struct ptlrpcd_ctl *pc)
+int ptlrpcd_start(int index, int max, const char *name, struct ptlrpcd_ctl *pc)
 {
         int rc;
 {
         int rc;
+        int env = 0;
         ENTRY;
 
         /*
          * Do not allow start second thread for one pc.
          */
         if (cfs_test_and_set_bit(LIOD_START, &pc->pc_flags)) {
         ENTRY;
 
         /*
          * Do not allow start second thread for one pc.
          */
         if (cfs_test_and_set_bit(LIOD_START, &pc->pc_flags)) {
-                CERROR("Starting second thread (%s) for same pc %p\n",
+                CWARN("Starting second thread (%s) for same pc %p\n",
                        name, pc);
                        name, pc);
-                RETURN(-EALREADY);
+                RETURN(0);
         }
 
         }
 
+        pc->pc_index = index;
         cfs_init_completion(&pc->pc_starting);
         cfs_init_completion(&pc->pc_finishing);
         cfs_spin_lock_init(&pc->pc_lock);
         cfs_init_completion(&pc->pc_starting);
         cfs_init_completion(&pc->pc_finishing);
         cfs_spin_lock_init(&pc->pc_lock);
@@ -430,18 +657,21 @@ int ptlrpcd_start(const char *name, struct ptlrpcd_ctl *pc)
          * describing its "scope".
          */
         rc = lu_context_init(&pc->pc_env.le_ctx, LCT_CL_THREAD|LCT_REMEMBER);
          * describing its "scope".
          */
         rc = lu_context_init(&pc->pc_env.le_ctx, LCT_CL_THREAD|LCT_REMEMBER);
-        if (rc != 0) {
-                ptlrpc_set_destroy(pc->pc_set);
+        if (rc != 0)
                 GOTO(out, rc);
                 GOTO(out, rc);
-        }
 
 
+        env = 1;
 #ifdef __KERNEL__
 #ifdef __KERNEL__
+        if (index >= 0) {
+                rc = ptlrpcd_bind(index, max);
+                if (rc < 0)
+                        GOTO(out, rc);
+        }
+
         rc = cfs_create_thread(ptlrpcd, pc, 0);
         rc = cfs_create_thread(ptlrpcd, pc, 0);
-        if (rc < 0)  {
-                lu_context_fini(&pc->pc_env.le_ctx);
-                ptlrpc_set_destroy(pc->pc_set);
+        if (rc < 0)
                 GOTO(out, rc);
                 GOTO(out, rc);
-        }
+
         rc = 0;
         cfs_wait_for_completion(&pc->pc_starting);
 #else
         rc = 0;
         cfs_wait_for_completion(&pc->pc_starting);
 #else
@@ -453,16 +683,33 @@ int ptlrpcd_start(const char *name, struct ptlrpcd_ctl *pc)
                                                  &ptlrpcd_idle, pc);
 #endif
 out:
                                                  &ptlrpcd_idle, pc);
 #endif
 out:
-        if (rc)
+        if (rc) {
+#ifdef __KERNEL__
+                if (pc->pc_set != NULL) {
+                        struct ptlrpc_request_set *set = pc->pc_set;
+
+                        cfs_spin_lock(&pc->pc_lock);
+                        pc->pc_set = NULL;
+                        cfs_spin_unlock(&pc->pc_lock);
+                        ptlrpc_set_destroy(set);
+                }
+                if (env != 0)
+                        lu_context_fini(&pc->pc_env.le_ctx);
+                cfs_clear_bit(LIOD_BIND, &pc->pc_flags);
+#endif
                 cfs_clear_bit(LIOD_START, &pc->pc_flags);
                 cfs_clear_bit(LIOD_START, &pc->pc_flags);
+        }
         RETURN(rc);
 }
 
 void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force)
 {
         RETURN(rc);
 }
 
 void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force)
 {
+       struct ptlrpc_request_set *set = pc->pc_set;
+        ENTRY;
+
         if (!cfs_test_bit(LIOD_START, &pc->pc_flags)) {
         if (!cfs_test_bit(LIOD_START, &pc->pc_flags)) {
-                CERROR("Thread for pc %p was not started\n", pc);
-                return;
+                CWARN("Thread for pc %p was not started\n", pc);
+                goto out;
         }
 
         cfs_set_bit(LIOD_STOP, &pc->pc_flags);
         }
 
         cfs_set_bit(LIOD_STOP, &pc->pc_flags);
@@ -476,55 +723,116 @@ void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force)
         liblustre_deregister_idle_callback(pc->pc_idle_callback);
 #endif
         lu_context_fini(&pc->pc_env.le_ctx);
         liblustre_deregister_idle_callback(pc->pc_idle_callback);
 #endif
         lu_context_fini(&pc->pc_env.le_ctx);
-        ptlrpc_set_destroy(pc->pc_set);
+
+        cfs_spin_lock(&pc->pc_lock);
+        pc->pc_set = NULL;
+        cfs_spin_unlock(&pc->pc_lock);
+        ptlrpc_set_destroy(set);
+
+out:
+#ifdef __KERNEL__
+        if (pc->pc_npartners > 0) {
+                LASSERT(pc->pc_partners != NULL);
+
+                OBD_FREE(pc->pc_partners,
+                         sizeof(struct ptlrpcd_ctl *) * pc->pc_npartners);
+                pc->pc_partners = NULL;
+        }
+        pc->pc_npartners = 0;
+#endif
+        EXIT;
 }
 
 }
 
-void ptlrpcd_fini(void)
+static void ptlrpcd_fini(void)
 {
         int i;
 {
         int i;
-        int j;
+        ENTRY;
+
+        if (ptlrpcds != NULL) {
+                for (i = 0; i < ptlrpcds->pd_nthreads; i++)
+                        ptlrpcd_stop(&ptlrpcds->pd_threads[i], 0);
+                ptlrpcd_stop(&ptlrpcds->pd_thread_rcv, 0);
+                OBD_FREE(ptlrpcds, ptlrpcds->pd_size);
+                ptlrpcds = NULL;
+        }
 
 
+        EXIT;
+}
+
+static int ptlrpcd_init(void)
+{
+        int nthreads = cfs_num_online_cpus();
+        char name[16];
+        int size, i = -1, j, rc = 0;
         ENTRY;
 
         ENTRY;
 
-        for (i = 0; i < PSCOPE_NR; ++i) {
-                for (j = 0; j < PT_NR; ++j) {
-                        struct ptlrpcd_ctl *pc;
+#ifdef __KERNEL__
+        if (max_ptlrpcds > 0 && max_ptlrpcds < nthreads)
+                nthreads = max_ptlrpcds;
+        if (nthreads < 2)
+                nthreads = 2;
+        if (nthreads < 3 && ptlrpcd_bind_policy == PDB_POLICY_NEIGHBOR)
+                ptlrpcd_bind_policy = PDB_POLICY_PAIR;
+        else if (nthreads % 2 != 0 && ptlrpcd_bind_policy == PDB_POLICY_PAIR)
+                nthreads &= ~1; /* make sure it is even */
+#else
+        nthreads = 1;
+#endif
 
 
-                        pc = &ptlrpcd_scopes[i].pscope_thread[j].pt_ctl;
+        size = offsetof(struct ptlrpcd, pd_threads[nthreads]);
+        OBD_ALLOC(ptlrpcds, size);
+        if (ptlrpcds == NULL)
+                GOTO(out, rc = -ENOMEM);
 
 
-                        if (cfs_test_bit(LIOD_START, &pc->pc_flags))
-                                ptlrpcd_stop(pc, 0);
-                }
+        snprintf(name, 15, "ptlrpcd_rcv");
+        cfs_set_bit(LIOD_RECOVERY, &ptlrpcds->pd_thread_rcv.pc_flags);
+        rc = ptlrpcd_start(-1, nthreads, name, &ptlrpcds->pd_thread_rcv);
+        if (rc < 0)
+                GOTO(out, rc);
+
+        /* XXX: We start nthreads ptlrpc daemons. Each of them can process any
+         *      non-recovery async RPC to improve overall async RPC efficiency.
+         *
+         *      But there are some issues with async I/O RPCs and async non-I/O
+         *      RPCs processed in the same set under some cases. The ptlrpcd may
+         *      be blocked by some async I/O RPC(s), then will cause other async
+         *      non-I/O RPC(s) can not be processed in time.
+         *
+         *      Maybe we should distinguish blocked async RPCs from non-blocked
+         *      async RPCs, and process them in different ptlrpcd sets to avoid
+         *      unnecessary dependency. But how to distribute async RPCs load
+         *      among all the ptlrpc daemons becomes another trouble. */
+        for (i = 0; i < nthreads; i++) {
+                snprintf(name, 15, "ptlrpcd_%d", i);
+                rc = ptlrpcd_start(i, nthreads, name, &ptlrpcds->pd_threads[i]);
+                if (rc < 0)
+                        GOTO(out, rc);
         }
         }
-        EXIT;
+
+        ptlrpcds->pd_size = size;
+        ptlrpcds->pd_index = 0;
+        ptlrpcds->pd_nthreads = nthreads;
+
+out:
+        if (rc != 0 && ptlrpcds != NULL) {
+                for (j = 0; j <= i; j++)
+                        ptlrpcd_stop(&ptlrpcds->pd_threads[j], 0);
+                ptlrpcd_stop(&ptlrpcds->pd_thread_rcv, 0);
+                OBD_FREE(ptlrpcds, size);
+                ptlrpcds = NULL;
+        }
+
+        RETURN(0);
 }
 
 int ptlrpcd_addref(void)
 {
         int rc = 0;
 }
 
 int ptlrpcd_addref(void)
 {
         int rc = 0;
-        int i;
-        int j;
         ENTRY;
 
         cfs_mutex_down(&ptlrpcd_sem);
         ENTRY;
 
         cfs_mutex_down(&ptlrpcd_sem);
-        if (++ptlrpcd_users == 1) {
-                for (i = 0; rc == 0 && i < PSCOPE_NR; ++i) {
-                        for (j = 0; rc == 0 && j < PT_NR; ++j) {
-                                struct ptlrpcd_thread *pt;
-                                struct ptlrpcd_ctl    *pc;
-
-                                pt = &ptlrpcd_scopes[i].pscope_thread[j];
-                                pc = &pt->pt_ctl;
-                                if (j == PT_RECOVERY)
-                                        cfs_set_bit(LIOD_RECOVERY, &pc->pc_flags);
-                                rc = ptlrpcd_start(pt->pt_name, pc);
-                        }
-                }
-                if (rc != 0) {
-                        --ptlrpcd_users;
-                        ptlrpcd_fini();
-                }
-        }
+        if (++ptlrpcd_users == 1)
+                rc = ptlrpcd_init();
         cfs_mutex_up(&ptlrpcd_sem);
         RETURN(rc);
 }
         cfs_mutex_up(&ptlrpcd_sem);
         RETURN(rc);
 }
index 0a5365e..d86ab0c 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
@@ -286,11 +289,7 @@ static int llcd_send(struct llog_canceld_ctxt *llcd)
          * first from replay llog, second for resended rpc */
         req->rq_no_delay = req->rq_no_resend = 1;
 
          * first from replay llog, second for resended rpc */
         req->rq_no_delay = req->rq_no_resend = 1;
 
-        rc = ptlrpc_set_add_new_req(&lcm->lcm_pc, req);
-        if (rc) {
-                ptlrpc_request_free(req);
-                GOTO(exit, rc);
-        }
+        ptlrpc_set_add_new_req(&lcm->lcm_pc, req);
         RETURN(0);
 exit:
         CDEBUG(D_RPCTRACE, "Refused llcd %p\n", llcd);
         RETURN(0);
 exit:
         CDEBUG(D_RPCTRACE, "Refused llcd %p\n", llcd);
@@ -405,7 +404,7 @@ int llog_recov_thread_start(struct llog_commit_master *lcm)
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
-        rc = ptlrpcd_start(lcm->lcm_name, &lcm->lcm_pc);
+        rc = ptlrpcd_start(-1, 1, lcm->lcm_name, &lcm->lcm_pc);
         if (rc) {
                 CERROR("Error %d while starting recovery thread %s\n",
                        rc, lcm->lcm_name);
         if (rc) {
                 CERROR("Error %d while starting recovery thread %s\n",
                        rc, lcm->lcm_name);
index 181474b..05179ec 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
@@ -1094,7 +1097,7 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         aa->aa_qunit = qunit;
 
         req->rq_interpret_reply = dqacq_interpret;
         aa->aa_qunit = qunit;
 
         req->rq_interpret_reply = dqacq_interpret;
-        ptlrpcd_add_req(req, PSCOPE_OTHER);
+        ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
 
         QDATA_DEBUG(qdata, "%s scheduled.\n",
                     opc == QUOTA_DQACQ ? "DQACQ" : "DQREL");
 
         QDATA_DEBUG(qdata, "%s scheduled.\n",
                     opc == QUOTA_DQACQ ? "DQACQ" : "DQREL");