Whamcloud - gitweb
LU-4687 obdclass: unified flow control interfaces 62/9562/13
authorFan Yong <fan.yong@intel.com>
Tue, 25 Mar 2014 14:16:12 +0000 (22:16 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Wed, 23 Apr 2014 17:05:13 +0000 (17:05 +0000)
Unify the flow control interfaces for MDC RPC, FLD RPC and lfsck
remote update (via OSP). We allow to adjust the maximum inflight
RPCs count via /proc interface.

Rename some variables/functions to make them easily understood.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: I096a1de17fe226bac2fe3a3c891365c2e2c51ef3
Reviewed-on: http://review.whamcloud.com/9562
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Niu Yawei <yawei.niu@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
16 files changed:
lustre/fld/fld_request.c
lustre/include/lustre_mdc.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/ldlm/ldlm_lib.c
lustre/mdc/lproc_mdc.c
lustre/mdc/mdc_internal.h
lustre/mdc/mdc_lib.c
lustre/mdc/mdc_locks.c
lustre/mdc/mdc_request.c
lustre/obdclass/genops.c
lustre/osp/lproc_osp.c
lustre/osp/osp_dev.c
lustre/osp/osp_internal.h
lustre/osp/osp_trans.c
lustre/ptlrpc/nrs_crr.c

index 2a2a927..6a4986e 100644 (file)
 #include <lustre_mdc.h>
 #include "fld_internal.h"
 
-/* TODO: these 3 functions are copies of flow-control code from mdc_lib.c
- * It should be common thing. The same about mdc RPC lock */
-static int fld_req_avail(struct client_obd *cli, struct mdc_cache_waiter *mcw)
-{
-        int rc;
-        ENTRY;
-        client_obd_list_lock(&cli->cl_loi_list_lock);
-        rc = cfs_list_empty(&mcw->mcw_entry);
-        client_obd_list_unlock(&cli->cl_loi_list_lock);
-        RETURN(rc);
-};
-
-static void fld_enter_request(struct client_obd *cli)
-{
-       struct mdc_cache_waiter mcw;
-       struct l_wait_info lwi = { 0 };
-
-       client_obd_list_lock(&cli->cl_loi_list_lock);
-       if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
-               cfs_list_add_tail(&mcw.mcw_entry, &cli->cl_cache_waiters);
-               init_waitqueue_head(&mcw.mcw_waitq);
-               client_obd_list_unlock(&cli->cl_loi_list_lock);
-               l_wait_event(mcw.mcw_waitq, fld_req_avail(cli, &mcw), &lwi);
-       } else {
-               cli->cl_r_in_flight++;
-               client_obd_list_unlock(&cli->cl_loi_list_lock);
-       }
-}
-
-static void fld_exit_request(struct client_obd *cli)
-{
-       cfs_list_t *l, *tmp;
-       struct mdc_cache_waiter *mcw;
-
-       client_obd_list_lock(&cli->cl_loi_list_lock);
-       cli->cl_r_in_flight--;
-       cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
-
-               if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
-                       /* No free request slots anymore */
-                       break;
-               }
-
-               mcw = cfs_list_entry(l, struct mdc_cache_waiter, mcw_entry);
-               cfs_list_del_init(&mcw->mcw_entry);
-               cli->cl_r_in_flight++;
-               wake_up(&mcw->mcw_waitq);
-       }
-       client_obd_list_unlock(&cli->cl_loi_list_lock);
-}
-
 static int fld_rrb_hash(struct lu_client_fld *fld,
                         seqno_t seq)
 {
@@ -477,9 +426,9 @@ again:
        req->rq_reply_portal = MDC_REPLY_PORTAL;
         ptlrpc_at_set_req_timeout(req);
 
-       fld_enter_request(&exp->exp_obd->u.cli);
+       obd_get_request_slot(&exp->exp_obd->u.cli);
        rc = ptlrpc_queue_wait(req);
-       fld_exit_request(&exp->exp_obd->u.cli);
+       obd_put_request_slot(&exp->exp_obd->u.cli);
        if (rc != 0) {
                if (rc == -EWOULDBLOCK) {
                        /* For no_delay req(see above), EWOULDBLOCK means the
index 3d343b1..9f6edd3 100644 (file)
@@ -191,11 +191,6 @@ static inline void mdc_update_max_ea_from_body(struct obd_export *exp,
 }
 
 
-struct mdc_cache_waiter {
-       cfs_list_t              mcw_entry;
-       wait_queue_head_t             mcw_waitq;
-};
-
 /* mdc/mdc_locks.c */
 int it_disposition(const struct lookup_intent *it, int flag);
 void it_clear_disposition(struct lookup_intent *it, int flag);
index ac5fa1d..d06bbfe 100644 (file)
@@ -239,12 +239,12 @@ struct timeout_item {
         cfs_list_t         ti_chain;
 };
 
-#define OSC_MAX_RIF_DEFAULT       8
-#define MDS_OSC_MAX_RIF_DEFAULT   50
-#define OSC_MAX_RIF_MAX         256
-#define OSC_MAX_DIRTY_DEFAULT  (OSC_MAX_RIF_DEFAULT * 4)
-#define OSC_MAX_DIRTY_MB_MAX   2048     /* arbitrary, but < MAX_LONG bytes */
-#define OSC_DEFAULT_RESENDS      10
+#define OBD_MAX_RIF_DEFAULT    8
+#define OBD_MAX_RIF_MAX                512
+#define OSC_MAX_RIF_MAX                256
+#define OSC_MAX_DIRTY_DEFAULT  (OBD_MAX_RIF_DEFAULT * 4)
+#define OSC_MAX_DIRTY_MB_MAX   2048     /* arbitrary, but < MAX_LONG bytes */
+#define OSC_DEFAULT_RESENDS    10
 
 /* possible values for fo_sync_lock_cancel */
 enum {
@@ -254,9 +254,6 @@ enum {
         NUM_SYNC_ON_CANCEL_STATES
 };
 
-#define MDC_MAX_RIF_DEFAULT       8
-#define MDC_MAX_RIF_MAX         512
-
 struct mdc_rpc_lock;
 struct obd_import;
 struct client_obd {
index 5de7cd7..7464435 100644 (file)
@@ -128,6 +128,10 @@ struct kuc_hdr * kuc_ptr(void *p);
 int kuc_ispayload(void *p);
 void *kuc_alloc(int payload_len, int transport, int type);
 void kuc_free(void *p, int payload_len);
+int obd_get_request_slot(struct client_obd *cli);
+void obd_put_request_slot(struct client_obd *cli);
+__u32 obd_get_max_rpcs_in_flight(struct client_obd *cli);
+int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max);
 
 struct llog_handle;
 struct llog_rec_hdr;
index aac76e9..a3769d5 100644 (file)
@@ -405,7 +405,7 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
        cli->cl_chunkbits = PAGE_CACHE_SHIFT;
 
        if (!strcmp(name, LUSTRE_MDC_NAME)) {
-               cli->cl_max_rpcs_in_flight = MDC_MAX_RIF_DEFAULT;
+               cli->cl_max_rpcs_in_flight = OBD_MAX_RIF_DEFAULT;
        } else if (totalram_pages >> (20 - PAGE_CACHE_SHIFT) <= 128 /* MB */) {
                cli->cl_max_rpcs_in_flight = 2;
        } else if (totalram_pages >> (20 - PAGE_CACHE_SHIFT) <= 256 /* MB */) {
@@ -414,9 +414,9 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
                cli->cl_max_rpcs_in_flight = 4;
        } else {
                if (osc_on_mdt(obddev->obd_name))
-                       cli->cl_max_rpcs_in_flight = MDS_OSC_MAX_RIF_DEFAULT;
+                       cli->cl_max_rpcs_in_flight = OBD_MAX_RIF_MAX;
                else
-                       cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT;
+                       cli->cl_max_rpcs_in_flight = OBD_MAX_RIF_DEFAULT;
         }
         rc = ldlm_get_ref();
         if (rc) {
index 69e377f..3a4570a 100644 (file)
 static int mdc_max_rpcs_in_flight_seq_show(struct seq_file *m, void *v)
 {
        struct obd_device *dev = m->private;
-       struct client_obd *cli = &dev->u.cli;
+       __u32 max;
        int rc;
 
-       client_obd_list_lock(&cli->cl_loi_list_lock);
-       rc = seq_printf(m, "%u\n", cli->cl_max_rpcs_in_flight);
-       client_obd_list_unlock(&cli->cl_loi_list_lock);
+       max = obd_get_max_rpcs_in_flight(&dev->u.cli);
+       rc = seq_printf(m, "%u\n", max);
+
        return rc;
 }
 
@@ -60,21 +60,17 @@ static ssize_t mdc_max_rpcs_in_flight_seq_write(struct file *file,
                                                loff_t *off)
 {
        struct obd_device *dev = ((struct seq_file *)file->private_data)->private;
-        struct client_obd *cli = &dev->u.cli;
-        int val, rc;
-
-        rc = lprocfs_write_helper(buffer, count, &val);
-        if (rc)
-                return rc;
+       int val;
+       int rc;
 
-        if (val < 1 || val > MDC_MAX_RIF_MAX)
-                return -ERANGE;
+       rc = lprocfs_write_helper(buffer, count, &val);
+       if (rc == 0)
+               rc = obd_set_max_rpcs_in_flight(&dev->u.cli, val);
 
-        client_obd_list_lock(&cli->cl_loi_list_lock);
-        cli->cl_max_rpcs_in_flight = val;
-        client_obd_list_unlock(&cli->cl_loi_list_lock);
+       if (rc != 0)
+               count = rc;
 
-        return count;
+       return count;
 }
 LPROC_SEQ_FOPS(mdc_max_rpcs_in_flight);
 
index a49e9cd..68c5cc6 100644 (file)
@@ -72,8 +72,6 @@ void mdc_link_pack(struct ptlrpc_request *req, struct md_op_data *op_data);
 void mdc_rename_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
                      const char *old, int oldlen, const char *new, int newlen);
 void mdc_close_pack(struct ptlrpc_request *req, struct md_op_data *op_data);
-int mdc_enter_request(struct client_obd *cli);
-void mdc_exit_request(struct client_obd *cli);
 
 /* mdc/mdc_locks.c */
 int mdc_set_lock_data(struct obd_export *exp,
index 2d8a09d..1e4c3dc 100644 (file)
@@ -546,65 +546,3 @@ void mdc_close_pack(struct ptlrpc_request *req, struct md_op_data *op_data)
         mdc_ioepoch_pack(epoch, op_data);
        mdc_hsm_release_pack(req, op_data);
 }
-
-static int mdc_req_avail(struct client_obd *cli, struct mdc_cache_waiter *mcw)
-{
-        int rc;
-        ENTRY;
-        client_obd_list_lock(&cli->cl_loi_list_lock);
-        rc = cfs_list_empty(&mcw->mcw_entry);
-        client_obd_list_unlock(&cli->cl_loi_list_lock);
-        RETURN(rc);
-};
-
-/* We record requests in flight in cli->cl_r_in_flight here.
- * There is only one write rpc possible in mdc anyway. If this to change
- * in the future - the code may need to be revisited. */
-int mdc_enter_request(struct client_obd *cli)
-{
-       int rc = 0;
-       struct mdc_cache_waiter mcw;
-       struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
-
-       client_obd_list_lock(&cli->cl_loi_list_lock);
-       if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
-               cfs_list_add_tail(&mcw.mcw_entry, &cli->cl_cache_waiters);
-               init_waitqueue_head(&mcw.mcw_waitq);
-               client_obd_list_unlock(&cli->cl_loi_list_lock);
-               rc = l_wait_event(mcw.mcw_waitq, mdc_req_avail(cli, &mcw), &lwi);
-               if (rc) {
-                       client_obd_list_lock(&cli->cl_loi_list_lock);
-                       if (cfs_list_empty(&mcw.mcw_entry))
-                               cli->cl_r_in_flight--;
-                       cfs_list_del_init(&mcw.mcw_entry);
-                       client_obd_list_unlock(&cli->cl_loi_list_lock);
-               }
-       } else {
-               cli->cl_r_in_flight++;
-               client_obd_list_unlock(&cli->cl_loi_list_lock);
-       }
-       return rc;
-}
-
-void mdc_exit_request(struct client_obd *cli)
-{
-       cfs_list_t *l, *tmp;
-       struct mdc_cache_waiter *mcw;
-
-       client_obd_list_lock(&cli->cl_loi_list_lock);
-       cli->cl_r_in_flight--;
-       cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
-               if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
-                       /* No free request slots anymore */
-                       break;
-               }
-
-               mcw = cfs_list_entry(l, struct mdc_cache_waiter, mcw_entry);
-               cfs_list_del_init(&mcw->mcw_entry);
-               cli->cl_r_in_flight++;
-               wake_up(&mcw->mcw_waitq);
-       }
-       /* Empty waiting list? Decrease reqs in-flight number */
-
-       client_obd_list_unlock(&cli->cl_loi_list_lock);
-}
index fa40867..ffc49c3 100644 (file)
@@ -872,7 +872,7 @@ resend:
          * rpcs in flight counter. We do not do flock request limiting, though*/
         if (it) {
                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
-                rc = mdc_enter_request(&obddev->u.cli);
+               rc = obd_get_request_slot(&obddev->u.cli);
                 if (rc != 0) {
                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
                         mdc_clear_replay_flag(req, 0);
@@ -899,7 +899,7 @@ resend:
                RETURN(rc);
        }
 
-       mdc_exit_request(&obddev->u.cli);
+       obd_put_request_slot(&obddev->u.cli);
        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
 
        if (rc < 0) {
@@ -1228,7 +1228,7 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
 
         obddev = class_exp2obd(exp);
 
-        mdc_exit_request(&obddev->u.cli);
+       obd_put_request_slot(&obddev->u.cli);
         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
                 rc = -ETIMEDOUT;
 
@@ -1290,7 +1290,7 @@ int mdc_intent_getattr_async(struct obd_export *exp,
        if (IS_ERR(req))
                RETURN(PTR_ERR(req));
 
-        rc = mdc_enter_request(&obddev->u.cli);
+       rc = obd_get_request_slot(&obddev->u.cli);
         if (rc != 0) {
                 ptlrpc_req_finished(req);
                 RETURN(rc);
@@ -1299,7 +1299,7 @@ int mdc_intent_getattr_async(struct obd_export *exp,
         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
                              0, LVB_T_NONE, &minfo->mi_lockh, 1);
         if (rc < 0) {
-                mdc_exit_request(&obddev->u.cli);
+               obd_put_request_slot(&obddev->u.cli);
                 ptlrpc_req_finished(req);
                 RETURN(rc);
         }
index d5c9701..846b659 100644 (file)
@@ -95,15 +95,15 @@ static inline int mdc_queue_wait(struct ptlrpc_request *req)
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        int rc;
 
-       /* mdc_enter_request() ensures that this client has no more
+       /* obd_get_request_slot() ensures that this client has no more
         * than cl_max_rpcs_in_flight RPCs simultaneously inf light
         * against an MDT. */
-       rc = mdc_enter_request(cli);
+       rc = obd_get_request_slot(cli);
        if (rc != 0)
                return rc;
 
        rc = ptlrpc_queue_wait(req);
-       mdc_exit_request(cli);
+       obd_put_request_slot(cli);
 
        return rc;
 }
index 98dbcce..d7c541a 100644 (file)
@@ -1934,5 +1934,132 @@ inline void kuc_free(void *p, int payload_len)
 }
 EXPORT_SYMBOL(kuc_free);
 
+struct obd_request_slot_waiter {
+       struct list_head        orsw_entry;
+       wait_queue_head_t       orsw_waitq;
+       bool                    orsw_signaled;
+};
+
+static bool obd_request_slot_avail(struct client_obd *cli,
+                                  struct obd_request_slot_waiter *orsw)
+{
+       bool avail;
+
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       avail = !!list_empty(&orsw->orsw_entry);
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+       return avail;
+};
 
+/*
+ * For network flow control, the RPC sponsor needs to acquire a credit
+ * before sending the RPC. The credits count for a connection is defined
+ * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
+ * the subsequent RPC sponsors need to wait until others released their
+ * credits, or the administrator increased the "cl_max_rpcs_in_flight".
+ */
+int obd_get_request_slot(struct client_obd *cli)
+{
+       struct obd_request_slot_waiter   orsw;
+       struct l_wait_info               lwi;
+       int                              rc;
 
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
+               cli->cl_r_in_flight++;
+               client_obd_list_unlock(&cli->cl_loi_list_lock);
+               return 0;
+       }
+
+       init_waitqueue_head(&orsw.orsw_waitq);
+       list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
+       orsw.orsw_signaled = false;
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+       lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+       rc = l_wait_event(orsw.orsw_waitq,
+                         obd_request_slot_avail(cli, &orsw) ||
+                         orsw.orsw_signaled,
+                         &lwi);
+
+       /* Here, we must take the lock to avoid the on-stack 'orsw' to be
+        * freed but other (such as obd_put_request_slot) is using it. */
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       if (rc != 0) {
+               if (!orsw.orsw_signaled) {
+                       if (list_empty(&orsw.orsw_entry))
+                               cli->cl_r_in_flight--;
+                       else
+                               list_del(&orsw.orsw_entry);
+               }
+       }
+
+       if (orsw.orsw_signaled) {
+               LASSERT(list_empty(&orsw.orsw_entry));
+
+               rc = -EINTR;
+       }
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+       return rc;
+}
+EXPORT_SYMBOL(obd_get_request_slot);
+
+void obd_put_request_slot(struct client_obd *cli)
+{
+       struct obd_request_slot_waiter *orsw;
+
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       cli->cl_r_in_flight--;
+
+       /* If there is free slot, wakeup the first waiter. */
+       if (!list_empty(&cli->cl_loi_read_list) &&
+           likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
+               orsw = list_entry(cli->cl_loi_read_list.next,
+                                 struct obd_request_slot_waiter, orsw_entry);
+               list_del_init(&orsw->orsw_entry);
+               cli->cl_r_in_flight++;
+               wake_up(&orsw->orsw_waitq);
+       }
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+}
+EXPORT_SYMBOL(obd_put_request_slot);
+
+__u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
+{
+       return cli->cl_max_rpcs_in_flight;
+}
+EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
+
+int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
+{
+       struct obd_request_slot_waiter *orsw;
+       __u32                           old;
+       int                             diff;
+       int                             i;
+
+       if (max > OBD_MAX_RIF_MAX || max < 1)
+               return -ERANGE;
+
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       old = cli->cl_max_rpcs_in_flight;
+       cli->cl_max_rpcs_in_flight = max;
+       diff = max - old;
+
+       /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
+       for (i = 0; i < diff; i++) {
+               if (list_empty(&cli->cl_loi_read_list))
+                       break;
+
+               orsw = list_entry(cli->cl_loi_read_list.next,
+                                 struct obd_request_slot_waiter, orsw_entry);
+               list_del_init(&orsw->orsw_entry);
+               cli->cl_r_in_flight++;
+               wake_up(&orsw->orsw_waitq);
+       }
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+       return 0;
+}
+EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
index 75ebeb1..118c4df 100644 (file)
@@ -429,6 +429,38 @@ static int osp_rd_old_sync_processed(char *page, char **start, off_t off,
        return rc;
 }
 
+static int osp_rd_lfsck_max_rpcs_in_flight(char *page, char **start, off_t off,
+                                          int count, int *eof, void *data)
+{
+       struct obd_device *dev = data;
+       __u32 max;
+       int rc;
+
+       *eof = 1;
+       max = obd_get_max_rpcs_in_flight(&dev->u.cli);
+       rc = snprintf(page, count, "%u\n", max);
+
+       return rc;
+}
+
+static int osp_wr_lfsck_max_rpcs_in_flight(struct file *file,
+                                          const char *buffer,
+                                          unsigned long count, void *data)
+{
+       struct obd_device *dev = data;
+       int val;
+       int rc;
+
+       rc = lprocfs_write_helper(buffer, count, &val);
+       if (rc == 0)
+               rc = obd_set_max_rpcs_in_flight(&dev->u.cli, val);
+
+       if (rc != 0)
+               count = rc;
+
+       return count;
+}
+
 static struct lprocfs_vars lprocfs_osp_obd_vars[] = {
        { "uuid",               lprocfs_rd_uuid, 0, 0 },
        { "ping",               0, lprocfs_wr_ping, 0, 0, 0222 },
@@ -461,6 +493,8 @@ static struct lprocfs_vars lprocfs_osp_obd_vars[] = {
 
        /* for compatibility reasons */
        { "destroys_in_flight", osp_rd_destroys_in_flight, 0, 0 },
+       { "lfsck_max_rpcs_in_flight", osp_rd_lfsck_max_rpcs_in_flight,
+                                     osp_wr_lfsck_max_rpcs_in_flight, 0 },
        { 0 }
 };
 
index 60bf8ef..ff0b794 100644 (file)
@@ -531,8 +531,6 @@ out:
        RETURN(rc);
 }
 
-#define OSP_MAX_AUIF_MAX       512
-
 static int osp_init0(const struct lu_env *env, struct osp_device *m,
                     struct lu_device_type *ldt, struct lustre_cfg *cfg)
 {
@@ -546,8 +544,6 @@ static int osp_init0(const struct lu_env *env, struct osp_device *m,
        ENTRY;
 
        mutex_init(&m->opd_async_requests_mutex);
-       /* We allow OSP_MAX_AUIF_MAX async updates in flight at most. */
-       sema_init(&m->opd_async_fc_sem, OSP_MAX_AUIF_MAX);
 
        obd = class_name2obd(lustre_cfg_string(cfg, 0));
        if (obd == NULL) {
index f2f2a2c..c90e029 100644 (file)
@@ -195,7 +195,6 @@ struct osp_device {
        struct dt_update_request        *opd_async_requests;
        /* Protect current operations on opd_async_requests. */
        struct mutex                     opd_async_requests_mutex;
-       struct semaphore                 opd_async_fc_sem;
 };
 
 #define opd_pre_lock                   opd_pre->osp_pre_lock
index fd137d1..8e77ecf 100644 (file)
@@ -35,7 +35,7 @@
 
 struct osp_async_update_args {
        struct dt_update_request *oaua_update;
-       unsigned int             oaua_fc:1;
+       bool                      oaua_flow_control;
 };
 
 struct osp_async_update_item {
@@ -82,13 +82,13 @@ static int osp_async_update_interpret(const struct lu_env *env,
        struct dt_update_request        *dt_update = oaua->oaua_update;
        struct osp_async_update_item    *oaui;
        struct osp_async_update_item    *next;
-       struct osp_device               *osp    = dt2osp_dev(dt_update->dur_dt);
        int                              count  = 0;
        int                              index  = 0;
        int                              rc1    = 0;
 
-       if (oaua->oaua_fc)
-               up(&osp->opd_async_fc_sem);
+       if (oaua->oaua_flow_control)
+               obd_put_request_slot(
+                               &dt2osp_dev(dt_update->dur_dt)->opd_obd->u.cli);
 
        if (rc == 0 || req->rq_repmsg != NULL) {
                reply = req_capsule_server_sized_get(&req->rq_pill,
@@ -269,7 +269,7 @@ out:
 
 static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
                             struct dt_update_request *dt_update,
-                            struct thandle *th, bool fc)
+                            struct thandle *th, bool flow_control)
 {
        struct thandle_update   *tu = th->th_update;
        int                     rc = 0;
@@ -288,7 +288,7 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
                if (rc == 0) {
                        args = ptlrpc_req_async_args(req);
                        args->oaua_update = dt_update;
-                       args->oaua_fc = !!fc;
+                       args->oaua_flow_control = flow_control;
                        req->rq_interpret_reply =
                                osp_async_update_interpret;
                        ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
@@ -366,24 +366,25 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
        if (is_only_remote_trans(th)) {
                if (th->th_result == 0) {
                        struct osp_device *osp = dt2osp_dev(th->th_dev);
+                       struct client_obd *cli = &osp->opd_obd->u.cli;
 
-                       do {
-                               if (!osp->opd_imp_active ||
-                                   osp->opd_got_disconnected) {
-                                       out_destroy_update_req(dt_update);
-                                       GOTO(put, rc = -ENOTCONN);
-                               }
+                       rc = obd_get_request_slot(cli);
+                       if (!osp->opd_imp_active || osp->opd_got_disconnected) {
+                               if (rc == 0)
+                                       obd_put_request_slot(cli);
 
-                               /* Get the semaphore to guarantee it has
-                                * free slot, which will be released via
-                                * osp_async_update_interpret(). */
-                               rc = down_timeout(&osp->opd_async_fc_sem, HZ);
-                       } while (rc != 0);
+                               rc = -ENOTCONN;
+                       }
+
+                       if (rc != 0) {
+                               out_destroy_update_req(dt_update);
+                               goto put;
+                       }
 
                        rc = osp_trans_trigger(env, dt2osp_dev(dt),
                                               dt_update, th, true);
                        if (rc != 0)
-                               up(&osp->opd_async_fc_sem);
+                               obd_put_request_slot(cli);
                } else {
                        rc = th->th_result;
                        out_destroy_update_req(dt_update);
index 53a3fbc..e0f1e8d 100644 (file)
@@ -214,7 +214,7 @@ static int nrs_crrn_start(struct ptlrpc_nrs_policy *policy, char *arg)
         * with the default max_rpcs_in_flight value, as we are scheduling over
         * NIDs, and there may be more than one mount point per client.
         */
-       net->cn_quantum = OSC_MAX_RIF_DEFAULT;
+       net->cn_quantum = OBD_MAX_RIF_DEFAULT;
        /**
         * Set to 1 so that the test inside nrs_crrn_req_add() can evaluate to
         * true.