Whamcloud - gitweb
LU-5319 mdc: manage number of modify RPCs in flight
[fs/lustre-release.git] / lustre / obdclass / genops.c
index 24b8caa..344bdd9 100644 (file)
@@ -2012,6 +2012,7 @@ int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
 {
        struct obd_connect_data *ocd;
        __u16 maxmodrpcs;
 {
        struct obd_connect_data *ocd;
        __u16 maxmodrpcs;
+       __u16 prev;
 
        if (max > OBD_MAX_RIF_MAX || max < 1)
                return -ERANGE;
 
        if (max > OBD_MAX_RIF_MAX || max < 1)
                return -ERANGE;
@@ -2040,11 +2041,179 @@ int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
                return -ERANGE;
        }
 
                return -ERANGE;
        }
 
+       spin_lock(&cli->cl_mod_rpcs_lock);
+
+       prev = cli->cl_max_mod_rpcs_in_flight;
        cli->cl_max_mod_rpcs_in_flight = max;
 
        cli->cl_max_mod_rpcs_in_flight = max;
 
-       /* will have to wakeup waiters if max has been increased */
+       /* wakeup waiters if limit has been increased */
+       if (cli->cl_max_mod_rpcs_in_flight > prev)
+               wake_up(&cli->cl_mod_rpcs_waitq);
+
+       spin_unlock(&cli->cl_mod_rpcs_lock);
 
        return 0;
 }
 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
 
 
        return 0;
 }
 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
 
+
+#define pct(a, b) (b ? a * 100 / b : 0)
+int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
+                              struct seq_file *seq)
+{
+       struct timeval now;
+       unsigned long mod_tot = 0, mod_cum;
+       int i;
+
+       do_gettimeofday(&now);
+
+       spin_lock(&cli->cl_mod_rpcs_lock);
+
+       seq_printf(seq, "snapshot_time:         %lu.%lu (secs.usecs)\n",
+                  now.tv_sec, now.tv_usec);
+       seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
+                  cli->cl_mod_rpcs_in_flight);
+
+       seq_printf(seq, "\n\t\t\tmodify\n");
+       seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
+
+       mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
+
+       mod_cum = 0;
+       for (i = 0; i < OBD_HIST_MAX; i++) {
+               unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
+               mod_cum += mod;
+               seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
+                                i, mod, pct(mod, mod_tot),
+                                pct(mod_cum, mod_tot));
+               if (mod_cum == mod_tot)
+                       break;
+       }
+
+       spin_unlock(&cli->cl_mod_rpcs_lock);
+
+       return 0;
+}
+EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
+#undef pct
+
+
+/* The number of modify RPCs sent in parallel is limited
+ * because the server has a finite number of slots per client to
+ * store request result and ensure reply reconstruction when needed.
+ * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
+ * that takes into account server limit and cl_max_rpcs_in_flight
+ * value.
+ * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
+ * one close request is allowed above the maximum.
+ */
+static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
+                                                bool close_req)
+{
+       bool avail;
+
+       /* A slot is available if
+        * - number of modify RPCs in flight is less than the max
+        * - it's a close RPC and no other close request is in flight
+        */
+       avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
+               (close_req && cli->cl_close_rpcs_in_flight == 0);
+
+       return avail;
+}
+
+static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
+                                        bool close_req)
+{
+       bool avail;
+
+       spin_lock(&cli->cl_mod_rpcs_lock);
+       avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
+       spin_unlock(&cli->cl_mod_rpcs_lock);
+       return avail;
+}
+
+/* Get a modify RPC slot from the obd client @cli according
+ * to the kind of operation @opc that is going to be sent
+ * and the intent @it of the operation if it applies.
+ * If the maximum number of modify RPCs in flight is reached
+ * the thread is put to sleep.
+ * Returns the tag to be set in the request message. Tag 0
+ * is reserved for non-modifying requests.
+ */
+__u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
+                          struct lookup_intent *it)
+{
+       struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
+       bool                    close_req = false;
+       __u16                   i, max;
+
+       /* read-only metadata RPCs don't consume a slot on MDT
+        * for reply reconstruction
+        */
+       if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
+                          it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
+               return 0;
+
+       if (opc == MDS_CLOSE)
+               close_req = true;
+
+       do {
+               spin_lock(&cli->cl_mod_rpcs_lock);
+               max = cli->cl_max_mod_rpcs_in_flight;
+               if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
+                       /* there is a slot available */
+                       cli->cl_mod_rpcs_in_flight++;
+                       if (close_req)
+                               cli->cl_close_rpcs_in_flight++;
+                       lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
+                                        cli->cl_mod_rpcs_in_flight);
+                       /* find a free tag */
+                       i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
+                                               max + 1);
+                       LASSERT(i < OBD_MAX_RIF_MAX);
+                       LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
+                       spin_unlock(&cli->cl_mod_rpcs_lock);
+                       /* tag 0 is reserved for non-modify RPCs */
+                       return i + 1;
+               }
+               spin_unlock(&cli->cl_mod_rpcs_lock);
+
+               CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
+                      "opc %u, max %hu\n",
+                      cli->cl_import->imp_obd->obd_name, opc, max);
+
+               l_wait_event(cli->cl_mod_rpcs_waitq,
+                            obd_mod_rpc_slot_avail(cli, close_req), &lwi);
+       } while (true);
+}
+EXPORT_SYMBOL(obd_get_mod_rpc_slot);
+
+/* Put a modify RPC slot from the obd client @cli according
+ * to the kind of operation @opc that has been sent and the
+ * intent @it of the operation if it applies.
+ */
+void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
+                         struct lookup_intent *it, __u16 tag)
+{
+       bool                    close_req = false;
+
+       if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
+                          it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
+               return;
+
+       if (opc == MDS_CLOSE)
+               close_req = true;
+
+       spin_lock(&cli->cl_mod_rpcs_lock);
+       cli->cl_mod_rpcs_in_flight--;
+       if (close_req)
+               cli->cl_close_rpcs_in_flight--;
+       /* release the tag in the bitmap */
+       LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
+       LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
+       spin_unlock(&cli->cl_mod_rpcs_lock);
+       wake_up(&cli->cl_mod_rpcs_waitq);
+}
+EXPORT_SYMBOL(obd_put_mod_rpc_slot);
+