+
+#define pct(a, b) (b ? a * 100 / b : 0)
+int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
+ struct seq_file *seq)
+{
+ struct timeval now;
+ unsigned long mod_tot = 0, mod_cum;
+ int i;
+
+ do_gettimeofday(&now);
+
+ spin_lock(&cli->cl_mod_rpcs_lock);
+
+ seq_printf(seq, "snapshot_time: %lu.%lu (secs.usecs)\n",
+ now.tv_sec, now.tv_usec);
+ seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
+ cli->cl_mod_rpcs_in_flight);
+
+ seq_printf(seq, "\n\t\t\tmodify\n");
+ seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
+
+ mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
+
+ mod_cum = 0;
+ for (i = 0; i < OBD_HIST_MAX; i++) {
+ unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
+ mod_cum += mod;
+ seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
+ i, mod, pct(mod, mod_tot),
+ pct(mod_cum, mod_tot));
+ if (mod_cum == mod_tot)
+ break;
+ }
+
+ spin_unlock(&cli->cl_mod_rpcs_lock);
+
+ return 0;
+}
+EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
+#undef pct
+
+
+/* The number of modify RPCs sent in parallel is limited
+ * because the server has a finite number of slots per client to
+ * store request result and ensure reply reconstruction when needed.
+ * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
+ * that takes into account server limit and cl_max_rpcs_in_flight
+ * value.
+ * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
+ * one close request is allowed above the maximum.
+ */
+static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
+ bool close_req)
+{
+ bool avail;
+
+ /* A slot is available if
+ * - number of modify RPCs in flight is less than the max
+ * - it's a close RPC and no other close request is in flight
+ */
+ avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
+ (close_req && cli->cl_close_rpcs_in_flight == 0);
+
+ return avail;
+}
+
+static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
+ bool close_req)
+{
+ bool avail;
+
+ spin_lock(&cli->cl_mod_rpcs_lock);
+ avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
+ spin_unlock(&cli->cl_mod_rpcs_lock);
+ return avail;
+}
+
+/* Get a modify RPC slot from the obd client @cli according
+ * to the kind of operation @opc that is going to be sent
+ * and the intent @it of the operation if it applies.
+ * If the maximum number of modify RPCs in flight is reached
+ * the thread is put to sleep.
+ * Returns the tag to be set in the request message. Tag 0
+ * is reserved for non-modifying requests.
+ */
+__u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
+ struct lookup_intent *it)
+{
+ struct l_wait_info lwi = LWI_INTR(NULL, NULL);
+ bool close_req = false;
+ __u16 i, max;
+
+ /* read-only metadata RPCs don't consume a slot on MDT
+ * for reply reconstruction
+ */
+ if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
+ it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
+ return 0;
+
+ if (opc == MDS_CLOSE)
+ close_req = true;
+
+ do {
+ spin_lock(&cli->cl_mod_rpcs_lock);
+ max = cli->cl_max_mod_rpcs_in_flight;
+ if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
+ /* there is a slot available */
+ cli->cl_mod_rpcs_in_flight++;
+ if (close_req)
+ cli->cl_close_rpcs_in_flight++;
+ lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
+ cli->cl_mod_rpcs_in_flight);
+ /* find a free tag */
+ i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
+ max + 1);
+ LASSERT(i < OBD_MAX_RIF_MAX);
+ LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
+ spin_unlock(&cli->cl_mod_rpcs_lock);
+ /* tag 0 is reserved for non-modify RPCs */
+ return i + 1;
+ }
+ spin_unlock(&cli->cl_mod_rpcs_lock);
+
+ CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
+ "opc %u, max %hu\n",
+ cli->cl_import->imp_obd->obd_name, opc, max);
+
+ l_wait_event(cli->cl_mod_rpcs_waitq,
+ obd_mod_rpc_slot_avail(cli, close_req), &lwi);
+ } while (true);
+}
+EXPORT_SYMBOL(obd_get_mod_rpc_slot);
+
+/* Put a modify RPC slot from the obd client @cli according
+ * to the kind of operation @opc that has been sent and the
+ * intent @it of the operation if it applies.
+ */
+void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
+ struct lookup_intent *it, __u16 tag)
+{
+ bool close_req = false;
+
+ if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
+ it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
+ return;
+
+ if (opc == MDS_CLOSE)
+ close_req = true;
+
+ spin_lock(&cli->cl_mod_rpcs_lock);
+ cli->cl_mod_rpcs_in_flight--;
+ if (close_req)
+ cli->cl_close_rpcs_in_flight--;
+ /* release the tag in the bitmap */
+ LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
+ LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
+ spin_unlock(&cli->cl_mod_rpcs_lock);
+ wake_up(&cli->cl_mod_rpcs_waitq);
+}
+EXPORT_SYMBOL(obd_put_mod_rpc_slot);
+