Whamcloud - gitweb
LU-5319 mdc: manage number of modify RPCs in flight 74/14374/12
authorGregoire Pichon <gregoire.pichon@bull.net>
Mon, 30 Mar 2015 15:06:54 +0000 (17:06 +0200)
committerOleg Drokin <oleg.drokin@intel.com>
Wed, 1 Jul 2015 01:44:01 +0000 (01:44 +0000)
This patch is the main client part of a new feature that supports
multiple modify metadata RPCs in parallel. Its goal is to improve
metadata operations performance of a single client, while maintaining
the consistency of MDT reply reconstruction and MDT recovery
mechanisms.

It makes it possible to manage the number of modify RPCs in flight
within the client obd structure and to assign a virtual index (the tag)
to each modify RPC to help server-side cleaning of reply data.

The mdc component uses this feature to send multiple modify RPCs
in parallel.

Signed-off-by: Gregoire Pichon <gregoire.pichon@bull.net>
Change-Id: Ia707e39770e479648627611a99d0724e7070baec
Reviewed-on: http://review.whamcloud.com/14374
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
12 files changed:
lustre/fid/fid_request.c
lustre/include/lustre_mdc.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/ldlm/ldlm_lib.c
lustre/mdc/lproc_mdc.c
lustre/mdc/mdc_locks.c
lustre/mdc/mdc_reint.c
lustre/mdc/mdc_request.c
lustre/obdclass/genops.c
lustre/tests/replay-single.sh
lustre/tests/sanity.sh

index 528bed7..8cb64f1 100644 (file)
@@ -115,13 +115,8 @@ static int seq_client_rpc(struct lu_client_seq *seq,
 
        ptlrpc_at_set_req_timeout(req);
 
-       if (opc != SEQ_ALLOC_SUPER && seq->lcs_type == LUSTRE_SEQ_METADATA)
-               mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
-
        rc = ptlrpc_queue_wait(req);
 
-       if (opc != SEQ_ALLOC_SUPER && seq->lcs_type == LUSTRE_SEQ_METADATA)
-               mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
        if (rc)
                GOTO(out_req, rc);
 
index 7ca89dc..f09f327 100644 (file)
@@ -165,6 +165,31 @@ static inline void mdc_put_rpc_lock(struct mdc_rpc_lock *lck,
        EXIT;
 }
 
+/* Take a modify RPC slot for @req from the client obd behind its import
+ * and record the tag handed back by obd_get_mod_rpc_slot() in the request
+ * message. @it is passed through unchanged to obd_get_mod_rpc_slot(). */
+static inline void mdc_get_mod_rpc_slot(struct ptlrpc_request *req,
+                                       struct lookup_intent *it)
+{
+       struct obd_device *obd = req->rq_import->imp_obd;
+       __u16 slot_tag;
+
+       slot_tag = obd_get_mod_rpc_slot(&obd->u.cli,
+                                       lustre_msg_get_opc(req->rq_reqmsg),
+                                       it);
+       lustre_msg_set_tag(req->rq_reqmsg, slot_tag);
+}
+
+/* Give back the modify RPC slot held by @req: the opcode and the tag are
+ * read from the request message and handed, together with @it, to
+ * obd_put_mod_rpc_slot() for the client obd behind the request's import. */
+static inline void mdc_put_mod_rpc_slot(struct ptlrpc_request *req,
+                                       struct lookup_intent *it)
+{
+       struct obd_device *obd = req->rq_import->imp_obd;
+       __u32 req_opc = lustre_msg_get_opc(req->rq_reqmsg);
+       __u16 req_tag = lustre_msg_get_tag(req->rq_reqmsg);
+
+       obd_put_mod_rpc_slot(&obd->u.cli, req_opc, it, req_tag);
+}
+
+
 /**
  * Update the maximum possible easize.
  *
index c658de1..fad6b03 100644 (file)
@@ -260,12 +260,16 @@ struct client_obd {
        wait_queue_head_t        cl_destroy_waitq;
 
         struct mdc_rpc_lock     *cl_rpc_lock;
-        struct mdc_rpc_lock     *cl_close_lock;
 
        /* modify rpcs in flight
         * currently used for metadata only */
        spinlock_t               cl_mod_rpcs_lock;
        __u16                    cl_max_mod_rpcs_in_flight;
+       __u16                    cl_mod_rpcs_in_flight;
+       __u16                    cl_close_rpcs_in_flight;
+       wait_queue_head_t        cl_mod_rpcs_waitq;
+       unsigned long           *cl_mod_tag_bitmap;
+       struct obd_histogram     cl_mod_rpcs_hist;
 
         /* mgc datastruct */
        struct mutex              cl_mgc_mutex;
index 1c6a3ae..e1d776c 100644 (file)
@@ -119,6 +119,12 @@ __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli);
 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max);
 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli);
 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max);
+/* Print the modify RPC statistics of @cli into @seq. */
+int obd_mod_rpc_stats_seq_show(struct client_obd *cli, struct seq_file *seq);
+
+/* Reserve a modify RPC slot on @cli for an RPC of opcode @opc (with optional
+ * intent @it) and return the tag to store in the request message. */
+__u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
+                          struct lookup_intent *it);
+/* Release the modify RPC slot identified by @tag that was reserved on @cli
+ * for an RPC of opcode @opc (with optional intent @it). */
+void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
+                         struct lookup_intent *it, __u16 tag);
 
 struct llog_handle;
 struct llog_rec_hdr;
index cff32cc..448eb9b 100644 (file)
@@ -415,6 +415,23 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
                else
                        cli->cl_max_rpcs_in_flight = OBD_MAX_RIF_DEFAULT;
         }
+
+       spin_lock_init(&cli->cl_mod_rpcs_lock);
+       spin_lock_init(&cli->cl_mod_rpcs_hist.oh_lock);
+       cli->cl_max_mod_rpcs_in_flight = 0;
+       cli->cl_mod_rpcs_in_flight = 0;
+       cli->cl_close_rpcs_in_flight = 0;
+       init_waitqueue_head(&cli->cl_mod_rpcs_waitq);
+       cli->cl_mod_tag_bitmap = NULL;
+
+       if (connect_op == MDS_CONNECT) {
+               cli->cl_max_mod_rpcs_in_flight = cli->cl_max_rpcs_in_flight - 1;
+               OBD_ALLOC(cli->cl_mod_tag_bitmap,
+                         BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long));
+               if (cli->cl_mod_tag_bitmap == NULL)
+                       GOTO(err, rc = -ENOMEM);
+       }
+
         rc = ldlm_get_ref();
         if (rc) {
                 CERROR("ldlm_get_ref failed: %d\n", rc);
@@ -471,6 +488,10 @@ err_import:
 err_ldlm:
         ldlm_put_ref();
 err:
+       if (cli->cl_mod_tag_bitmap != NULL)
+               OBD_FREE(cli->cl_mod_tag_bitmap,
+                        BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long));
+       cli->cl_mod_tag_bitmap = NULL;
         RETURN(rc);
 
 }
@@ -478,6 +499,7 @@ EXPORT_SYMBOL(client_obd_setup);
 
 int client_obd_cleanup(struct obd_device *obddev)
 {
+       struct client_obd *cli = &obddev->u.cli;
        ENTRY;
 
        ldlm_namespace_free_post(obddev->obd_namespace);
@@ -487,6 +509,12 @@ int client_obd_cleanup(struct obd_device *obddev)
        LASSERT(obddev->u.cli.cl_import == NULL);
 
        ldlm_put_ref();
+
+       if (cli->cl_mod_tag_bitmap != NULL)
+               OBD_FREE(cli->cl_mod_tag_bitmap,
+                        BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long));
+       cli->cl_mod_tag_bitmap = NULL;
+
        RETURN(0);
 }
 EXPORT_SYMBOL(client_obd_cleanup);
@@ -502,6 +530,7 @@ int client_connect_import(const struct lu_env *env,
        struct obd_connect_data *ocd;
        struct lustre_handle    conn    = { 0 };
        int                     rc;
+       bool                    is_mdc = false;
        ENTRY;
 
         *exp = NULL;
@@ -526,6 +555,10 @@ int client_connect_import(const struct lu_env *env,
         ocd = &imp->imp_connect_data;
         if (data) {
                 *ocd = *data;
+               is_mdc = strncmp(imp->imp_obd->obd_type->typ_name,
+                                LUSTRE_MDC_NAME, 3) == 0;
+               if (is_mdc)
+                       data->ocd_connect_flags |= OBD_CONNECT_MULTIMODRPCS;
                 imp->imp_connect_flags_orig = data->ocd_connect_flags;
         }
 
@@ -541,6 +574,10 @@ int client_connect_import(const struct lu_env *env,
                          ocd->ocd_connect_flags, "old "LPX64", new "LPX64"\n",
                          data->ocd_connect_flags, ocd->ocd_connect_flags);
                 data->ocd_connect_flags = ocd->ocd_connect_flags;
+               /* clear the flag as it was not set and is not known
+                * by upper layers */
+               if (is_mdc)
+                       data->ocd_connect_flags &= ~OBD_CONNECT_MULTIMODRPCS;
         }
 
         ptlrpc_pinger_add_import(imp);
index c5cb9d8..934669d 100644 (file)
@@ -114,6 +114,30 @@ static ssize_t mdc_max_mod_rpcs_in_flight_seq_write(struct file *file,
 }
 LPROC_SEQ_FOPS(mdc_max_mod_rpcs_in_flight);
 
+
+/* seq_file show handler of the mdc "rpc_stats" entry: the obd device is
+ * taken from seq->private and the output is produced by
+ * obd_mod_rpc_stats_seq_show() for its client obd. @v is not used. */
+static int mdc_rpc_stats_seq_show(struct seq_file *seq, void *v)
+{
+       struct obd_device *obd = seq->private;
+       struct client_obd *cli = &obd->u.cli;
+
+       return obd_mod_rpc_stats_seq_show(cli, seq);
+}
+
+
+/* Write handler of the mdc "rpc_stats" entry: any write clears the modify
+ * RPC histogram of the client obd. The written data (@buf, @off) is not
+ * examined; @len is returned. */
+static ssize_t mdc_rpc_stats_seq_write(struct file *file,
+                                      const char __user *buf,
+                                      size_t len, loff_t *off)
+{
+       struct seq_file *m = file->private_data;
+       struct obd_device *obd = m->private;
+
+       lprocfs_oh_clear(&obd->u.cli.cl_mod_rpcs_hist);
+
+       return len;
+}
+LPROC_SEQ_FOPS(mdc_rpc_stats);
+
+
 LPROC_SEQ_FOPS_WO_TYPE(mdc, ping);
 
 LPROC_SEQ_FOPS_RO_TYPE(mdc, uuid);
@@ -182,6 +206,8 @@ struct lprocfs_vars lprocfs_mdc_obd_vars[] = {
          .fops =       &mdc_state_fops                 },
        { .name =       "pinger_recov",
          .fops =       &mdc_pinger_recov_fops          },
+       { .name =       "rpc_stats",
+         .fops =       &mdc_rpc_stats_fops             },
        { NULL }
 };
 #endif /* CONFIG_PROC_FS */
index 1866103..d93146e 100644 (file)
@@ -828,14 +828,15 @@ resend:
                 req->rq_sent = cfs_time_current_sec() + resends;
         }
 
-        /* It is important to obtain rpc_lock first (if applicable), so that
-         * threads that are serialised with rpc_lock are not polluting our
-         * rpcs in flight counter. We do not do flock request limiting, though*/
-        if (it) {
-                mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
+       /* It is important to obtain modify RPC slot first (if applicable), so
+        * that threads that are waiting for a modify RPC slot are not polluting
+        * our rpcs in flight counter.
+        * We do not do flock request limiting, though */
+       if (it) {
+               mdc_get_mod_rpc_slot(req, it);
                rc = obd_get_request_slot(&obddev->u.cli);
-                if (rc != 0) {
-                        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
+               if (rc != 0) {
+                       mdc_put_mod_rpc_slot(req, it);
                         mdc_clear_replay_flag(req, 0);
                         ptlrpc_req_finished(req);
                         RETURN(rc);
@@ -861,7 +862,7 @@ resend:
        }
 
        obd_put_request_slot(&obddev->u.cli);
-       mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
+       mdc_put_mod_rpc_slot(req, it);
 
        if (rc < 0) {
                CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
index 3e5731a..31985a2 100644 (file)
 #include <lustre_fid.h>
 
 /* mdc_setattr does its own semaphore handling */
-static int mdc_reint(struct ptlrpc_request *request,
-                     struct mdc_rpc_lock *rpc_lock,
-                     int level)
+static int mdc_reint(struct ptlrpc_request *request, int level)
 {
         int rc;
 
         request->rq_send_state = level;
 
-        mdc_get_rpc_lock(rpc_lock, NULL);
-        rc = ptlrpc_queue_wait(request);
-        mdc_put_rpc_lock(rpc_lock, NULL);
+       mdc_get_mod_rpc_slot(request, NULL);
+       rc = ptlrpc_queue_wait(request);
+       mdc_put_mod_rpc_slot(request, NULL);
         if (rc)
                 CDEBUG(D_INFO, "error in handling %d\n", rc);
         else if (!req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY)) {
@@ -106,8 +104,6 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
 {
        struct list_head cancels = LIST_HEAD_INIT(cancels);
         struct ptlrpc_request *req;
-        struct mdc_rpc_lock *rpc_lock;
-        struct obd_device *obd = exp->exp_obd;
         int count = 0, rc;
         __u64 bits;
         ENTRY;
@@ -138,8 +134,6 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
                RETURN(rc);
        }
 
-        rpc_lock = obd->u.cli.cl_rpc_lock;
-
         if (op_data->op_attr.ia_valid & (ATTR_MTIME | ATTR_CTIME))
                 CDEBUG(D_INODE, "setting mtime "CFS_TIME_T
                        ", ctime "CFS_TIME_T"\n",
@@ -149,7 +143,7 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
 
         ptlrpc_request_set_replen(req);
 
-        rc = mdc_reint(req, rpc_lock, LUSTRE_IMP_FULL);
+       rc = mdc_reint(req, LUSTRE_IMP_FULL);
        if (rc == -ERESTARTSYS)
                 rc = 0;
 
@@ -229,7 +223,7 @@ rebuild:
         }
         level = LUSTRE_IMP_FULL;
  resend:
-        rc = mdc_reint(req, exp->exp_obd->u.cli.cl_rpc_lock, level);
+       rc = mdc_reint(req, level);
 
         /* Resend if we were told to. */
         if (rc == -ERESTARTSYS) {
@@ -302,7 +296,7 @@ int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
 
         *request = req;
 
-        rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
+       rc = mdc_reint(req, LUSTRE_IMP_FULL);
         if (rc == -ERESTARTSYS)
                 rc = 0;
         RETURN(rc);
@@ -312,7 +306,6 @@ int mdc_link(struct obd_export *exp, struct md_op_data *op_data,
              struct ptlrpc_request **request)
 {
        struct list_head cancels = LIST_HEAD_INIT(cancels);
-        struct obd_device *obd = exp->exp_obd;
         struct ptlrpc_request *req;
         int count = 0, rc;
         ENTRY;
@@ -346,7 +339,7 @@ int mdc_link(struct obd_export *exp, struct md_op_data *op_data,
         mdc_link_pack(req, op_data);
         ptlrpc_request_set_replen(req);
 
-        rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
+       rc = mdc_reint(req, LUSTRE_IMP_FULL);
         *request = req;
         if (rc == -ERESTARTSYS)
                 rc = 0;
@@ -410,7 +403,7 @@ int mdc_rename(struct obd_export *exp, struct md_op_data *op_data,
                             obd->u.cli.cl_default_mds_easize);
        ptlrpc_request_set_replen(req);
 
-        rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
+       rc = mdc_reint(req, LUSTRE_IMP_FULL);
         *request = req;
         if (rc == -ERESTARTSYS)
                 rc = 0;
index fa082a2..3e41015 100644 (file)
@@ -351,12 +351,12 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
 
         /* make rpc */
         if (opcode == MDS_REINT)
-                mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+               mdc_get_mod_rpc_slot(req, NULL);
 
         rc = ptlrpc_queue_wait(req);
 
         if (opcode == MDS_REINT)
-                mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+               mdc_put_mod_rpc_slot(req, NULL);
 
         if (rc)
                 ptlrpc_req_finished(req);
@@ -790,9 +790,9 @@ static int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
 
         ptlrpc_request_set_replen(req);
 
-        mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL);
-        rc = ptlrpc_queue_wait(req);
-        mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
+       mdc_get_mod_rpc_slot(req, NULL);
+       rc = ptlrpc_queue_wait(req);
+       mdc_put_mod_rpc_slot(req, NULL);
 
         if (req->rq_repmsg == NULL) {
                 CDEBUG(D_RPCTRACE, "request failed to send: %p, %d\n", req,
@@ -1512,9 +1512,9 @@ static int mdc_ioc_hsm_progress(struct obd_export *exp,
 
        ptlrpc_request_set_replen(req);
 
-       mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+       mdc_get_mod_rpc_slot(req, NULL);
        rc = ptlrpc_queue_wait(req);
-       mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+       mdc_put_mod_rpc_slot(req, NULL);
 
        GOTO(out, rc);
 out:
@@ -1691,9 +1691,9 @@ static int mdc_ioc_hsm_state_set(struct obd_export *exp,
 
        ptlrpc_request_set_replen(req);
 
-       mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+       mdc_get_mod_rpc_slot(req, NULL);
        rc = ptlrpc_queue_wait(req);
-       mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+       mdc_put_mod_rpc_slot(req, NULL);
 
        GOTO(out, rc);
 out:
@@ -1751,9 +1751,9 @@ static int mdc_ioc_hsm_request(struct obd_export *exp,
 
        ptlrpc_request_set_replen(req);
 
-       mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+       mdc_get_mod_rpc_slot(req, NULL);
        rc = ptlrpc_queue_wait(req);
-       mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+       mdc_put_mod_rpc_slot(req, NULL);
 
        GOTO(out, rc);
 
@@ -2603,27 +2603,16 @@ static void mdc_llog_finish(struct obd_device *obd)
 
 static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
 {
-       struct client_obd               *cli = &obd->u.cli;
        int                             rc;
        ENTRY;
 
-        OBD_ALLOC(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
-        if (!cli->cl_rpc_lock)
-                RETURN(-ENOMEM);
-        mdc_init_rpc_lock(cli->cl_rpc_lock);
-
        rc = ptlrpcd_addref();
        if (rc < 0)
-               GOTO(err_rpc_lock, rc);
-
-        OBD_ALLOC(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
-        if (!cli->cl_close_lock)
-                GOTO(err_ptlrpcd_decref, rc = -ENOMEM);
-        mdc_init_rpc_lock(cli->cl_close_lock);
+               RETURN(rc);
 
         rc = client_obd_setup(obd, cfg);
         if (rc)
-                GOTO(err_close_lock, rc);
+               GOTO(err_ptlrpcd_decref, rc);
 #ifdef CONFIG_PROC_FS
        obd->obd_vars = lprocfs_mdc_obd_vars;
        lprocfs_obd_setup(obd);
@@ -2643,17 +2632,10 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
                RETURN(rc);
         }
 
-       spin_lock_init(&cli->cl_mod_rpcs_lock);
-       cli->cl_max_mod_rpcs_in_flight = OBD_MAX_RIF_DEFAULT - 1;
-
         RETURN(rc);
 
-err_close_lock:
-        OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
 err_ptlrpcd_decref:
         ptlrpcd_decref();
-err_rpc_lock:
-        OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
         RETURN(rc);
 }
 
@@ -2706,11 +2688,6 @@ static int mdc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
 
 static int mdc_cleanup(struct obd_device *obd)
 {
-        struct client_obd *cli = &obd->u.cli;
-
-        OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
-        OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
-
         ptlrpcd_decref();
 
         return client_obd_cleanup(obd);
index 24b8caa..344bdd9 100644 (file)
@@ -2012,6 +2012,7 @@ int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
 {
        struct obd_connect_data *ocd;
        __u16 maxmodrpcs;
+       __u16 prev;
 
        if (max > OBD_MAX_RIF_MAX || max < 1)
                return -ERANGE;
@@ -2040,11 +2041,179 @@ int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
                return -ERANGE;
        }
 
+       spin_lock(&cli->cl_mod_rpcs_lock);
+
+       prev = cli->cl_max_mod_rpcs_in_flight;
        cli->cl_max_mod_rpcs_in_flight = max;
 
-       /* will have to wakeup waiters if max has been increased */
+       /* wakeup waiters if limit has been increased */
+       if (cli->cl_max_mod_rpcs_in_flight > prev)
+               wake_up(&cli->cl_mod_rpcs_waitq);
+
+       spin_unlock(&cli->cl_mod_rpcs_lock);
 
        return 0;
 }
 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
 
+
+/* integer percentage of @a relative to @b; evaluates to 0 when @b is 0.
+ * Arguments and expansion are parenthesized so that the macro stays correct
+ * when it is given an expression rather than a plain identifier. */
+#define pct(a, b) ((b) ? (a) * 100 / (b) : 0)
+/* Print the modify RPC statistics of @cli into @seq: a timestamp, the
+ * current number of modify RPCs in flight and the cl_mod_rpcs_hist
+ * histogram with per-bucket and cumulative percentages.
+ * The whole output is generated with cl_mod_rpcs_lock held.
+ * Always returns 0.
+ */
+int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
+                              struct seq_file *seq)
+{
+       struct timeval now;
+       unsigned long mod_tot, mod_cum;
+       int i;
+
+       do_gettimeofday(&now);
+
+       spin_lock(&cli->cl_mod_rpcs_lock);
+
+       /* NOTE(review): the usecs field is printed without zero padding, so
+        * "secs.usecs" is two integers and not a decimal fraction; left as
+        * is because consumers of this output are not visible here. */
+       seq_printf(seq, "snapshot_time:         %lu.%lu (secs.usecs)\n",
+                  now.tv_sec, now.tv_usec);
+       seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
+                  cli->cl_mod_rpcs_in_flight);
+
+       seq_printf(seq, "\n\t\t\tmodify\n");
+       seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
+
+       mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
+
+       mod_cum = 0;
+       for (i = 0; i < OBD_HIST_MAX; i++) {
+               unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
+
+               mod_cum += mod;
+               seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
+                                i, mod, pct(mod, mod_tot),
+                                pct(mod_cum, mod_tot));
+               /* every sample has been accounted for: the remaining
+                * buckets cannot add anything, stop printing */
+               if (mod_cum == mod_tot)
+                       break;
+       }
+
+       spin_unlock(&cli->cl_mod_rpcs_lock);
+
+       return 0;
+}
+EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
+#undef pct
+
+
+/* Modify RPCs sent in parallel are limited in number because the server
+ * only has a finite number of per-client slots in which it keeps request
+ * results for reply reconstruction.
+ * The client side limit is cl_max_mod_rpcs_in_flight; it accounts for both
+ * the server limit and the cl_max_rpcs_in_flight value.
+ * To avoid a potential deadlock on the MDC client (see Bugzilla 3462), a
+ * single close request may be sent above that maximum.
+ *
+ * Returns true when a slot can be taken now, i.e. when either
+ * - fewer modify RPCs are in flight than the maximum, or
+ * - the request is a close RPC and no other close RPC is in flight.
+ * The _locked suffix suggests cl_mod_rpcs_lock is held by the caller, as
+ * the callers in this file do.
+ */
+static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
+                                                bool close_req)
+{
+       /* regular case: still below the configured maximum */
+       if (cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight)
+               return true;
+
+       /* extra slot: a close RPC while no other close is in flight */
+       return close_req && cli->cl_close_rpcs_in_flight == 0;
+}
+
+/* Same check as obd_mod_rpc_slot_avail_locked(), but cl_mod_rpcs_lock is
+ * taken around the evaluation and dropped again before returning. */
+static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
+                                        bool close_req)
+{
+       bool slot_free;
+
+       spin_lock(&cli->cl_mod_rpcs_lock);
+       slot_free = obd_mod_rpc_slot_avail_locked(cli, close_req);
+       spin_unlock(&cli->cl_mod_rpcs_lock);
+
+       return slot_free;
+}
+
+/* Get a modify RPC slot from the obd client @cli according
+ * to the kind of operation @opc that is going to be sent
+ * and the intent @it of the operation if it applies.
+ * If the maximum number of modify RPCs in flight is reached
+ * the thread is put to sleep.
+ * Returns the tag to be set in the request message. Tag 0
+ * is reserved for non-modifying requests: it is returned, without
+ * consuming a slot, for GETATTR, LOOKUP, LAYOUT and READDIR intents.
+ * Any other request gets bitmap index + 1 as its tag.
+ */
+__u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
+                          struct lookup_intent *it)
+{
+       struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
+       bool                    close_req = false;
+       bool                    tag_in_use;
+       __u16                   i, max;
+
+       /* read-only metadata RPCs don't consume a slot on MDT
+        * for reply reconstruction
+        */
+       if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
+                          it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
+               return 0;
+
+       if (opc == MDS_CLOSE)
+               close_req = true;
+
+       do {
+               spin_lock(&cli->cl_mod_rpcs_lock);
+               max = cli->cl_max_mod_rpcs_in_flight;
+               if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
+                       /* there is a slot available */
+                       cli->cl_mod_rpcs_in_flight++;
+                       if (close_req)
+                               cli->cl_close_rpcs_in_flight++;
+                       lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
+                                        cli->cl_mod_rpcs_in_flight);
+                       /* find a free tag; max + 1 bits are searched because
+                        * one close RPC is allowed above the maximum.
+                        * NOTE(review): tags above this range may still be
+                        * held after the limit has been lowered - confirm
+                        * that a free bit is always found in that case */
+                       i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
+                                               max + 1);
+                       LASSERT(i < OBD_MAX_RIF_MAX);
+                       /* reserve the tag outside of LASSERT() so that the
+                        * reservation does not depend on the assertion
+                        * expression being evaluated */
+                       tag_in_use = test_and_set_bit(i,
+                                                     cli->cl_mod_tag_bitmap);
+                       LASSERT(!tag_in_use);
+                       spin_unlock(&cli->cl_mod_rpcs_lock);
+                       /* tag 0 is reserved for non-modify RPCs */
+                       return i + 1;
+               }
+               spin_unlock(&cli->cl_mod_rpcs_lock);
+
+               CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
+                      "opc %u, max %hu\n",
+                      cli->cl_import->imp_obd->obd_name, opc, max);
+
+               /* availability is checked again under the lock at the top
+                * of the loop, as another thread may take the slot first */
+               l_wait_event(cli->cl_mod_rpcs_waitq,
+                            obd_mod_rpc_slot_avail(cli, close_req), &lwi);
+       } while (true);
+}
+EXPORT_SYMBOL(obd_get_mod_rpc_slot);
+
+/* Put a modify RPC slot from the obd client @cli according
+ * to the kind of operation @opc that has been sent and the
+ * intent @it of the operation if it applies.
+ * @tag is the value returned by obd_get_mod_rpc_slot() for the request;
+ * bit (@tag - 1) of the tag bitmap is released and one waiter is woken up.
+ * Nothing is done for GETATTR, LOOKUP, LAYOUT and READDIR intents, which
+ * do not take a slot in obd_get_mod_rpc_slot().
+ */
+void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
+                         struct lookup_intent *it, __u16 tag)
+{
+       bool                    close_req = false;
+       bool                    tag_was_set;
+
+       if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
+                          it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
+               return;
+
+       if (opc == MDS_CLOSE)
+               close_req = true;
+
+       spin_lock(&cli->cl_mod_rpcs_lock);
+       cli->cl_mod_rpcs_in_flight--;
+       if (close_req)
+               cli->cl_close_rpcs_in_flight--;
+       /* release the tag in the bitmap.
+        * tag - 1 is evaluated after integer promotion, so a zero tag may
+        * not be rejected by the range check alone; test it explicitly
+        * before it is used as a bit index. */
+       LASSERT(tag != 0 && tag - 1 < OBD_MAX_RIF_MAX);
+       /* clear the bit outside of LASSERT() so that the release does not
+        * depend on the assertion expression being evaluated */
+       tag_was_set = test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap);
+       LASSERT(tag_was_set);
+       spin_unlock(&cli->cl_mod_rpcs_lock);
+       wake_up(&cli->cl_mod_rpcs_waitq);
+}
+EXPORT_SYMBOL(obd_put_mod_rpc_slot);
+
index cb52a4b..d73c12b 100755 (executable)
@@ -21,8 +21,10 @@ GRANT_CHECK_LIST=${GRANT_CHECK_LIST:-""}
 require_dsh_mds || exit 0
 
 # Skip these tests
-# bug number for skipped tests: b=17466/LU-472
-ALWAYS_EXCEPT="                 61d    $REPLAY_SINGLE_EXCEPT"
+# bug number for skipped tests:
+# b=17466/LU-472 : 61d
+# LU-5319 : 53a 53d
+ALWAYS_EXCEPT="61d 53a 53d $REPLAY_SINGLE_EXCEPT"
 # UPDATE THE COMMENT ABOVE WITH BUG NUMBERS WHEN CHANGING ALWAYS_EXCEPT!
 
 case "$(lsb_release -sr)" in   # only disable tests for el7
index dcaeb98..679a528 100644 (file)
@@ -10963,31 +10963,32 @@ test_181() { # bug 22177
 run_test 181 "Test open-unlinked dir ========================"
 
 test_182() {
-       [ $PARALLEL == "yes" ] && skip "skip parallel run" && return
-       # disable MDC RPC lock wouldn't crash client
        local fcount=1000
-       local tcount=4
+       local tcount=10
 
        mkdir -p $DIR/$tdir || error "creating dir $DIR/$tdir"
-#define OBD_FAIL_MDC_RPCS_SEM          0x804
-       $LCTL set_param fail_loc=0x804
 
-       for (( i=0; i < $tcount; i++ )) ; do
+       $LCTL set_param mdc.*.rpc_stats=clear
+
+       for (( i = 0; i < $tcount; i++ )) ; do
                mkdir $DIR/$tdir/$i
+       done
+
+       for (( i = 0; i < $tcount; i++ )) ; do
                createmany -o $DIR/$tdir/$i/f- $fcount &
        done
        wait
 
-       for (( i=0; i < $tcount; i++ )) ; do
+       for (( i = 0; i < $tcount; i++ )) ; do
                unlinkmany $DIR/$tdir/$i/f- $fcount &
        done
        wait
 
-       rm -rf $DIR/$tdir
+       $LCTL get_param mdc.*.rpc_stats
 
-       $LCTL set_param fail_loc=0
+       rm -rf $DIR/$tdir
 }
-run_test 182 "Disable MDC RPCs semaphore wouldn't crash client ================"
+run_test 182 "Test parallel modify metadata operations ================"
 
 test_183() { # LU-2275
        remote_mds_nodsh && skip "remote MDS with nodsh" && return