+static bool obd_request_slot_avail(struct client_obd *cli,
+ struct obd_request_slot_waiter *orsw)
+{
+ bool avail;
+
+ spin_lock(&cli->cl_loi_list_lock);
+ avail = !!list_empty(&orsw->orsw_entry);
+ spin_unlock(&cli->cl_loi_list_lock);
+
+ return avail;
+};
+
+/*
+ * For network flow control, the RPC sponsor needs to acquire a credit
+ * before sending the RPC. The credits count for a connection is defined
+ * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
+ * the subsequent RPC sponsors need to wait until others released their
+ * credits, or the administrator increased the "cl_max_rpcs_in_flight".
+ */
+int obd_get_request_slot(struct client_obd *cli)
+{
+ struct obd_request_slot_waiter orsw;
+ struct l_wait_info lwi;
+ int rc;
+
+ spin_lock(&cli->cl_loi_list_lock);
+ if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
+ cli->cl_rpcs_in_flight++;
+ spin_unlock(&cli->cl_loi_list_lock);
+ return 0;
+ }
+
+ init_waitqueue_head(&orsw.orsw_waitq);
+ list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
+ orsw.orsw_signaled = false;
+ spin_unlock(&cli->cl_loi_list_lock);
+
+ lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+ rc = l_wait_event(orsw.orsw_waitq,
+ obd_request_slot_avail(cli, &orsw) ||
+ orsw.orsw_signaled,
+ &lwi);
+
+ /* Here, we must take the lock to avoid the on-stack 'orsw' to be
+ * freed but other (such as obd_put_request_slot) is using it. */
+ spin_lock(&cli->cl_loi_list_lock);
+ if (rc != 0) {
+ if (!orsw.orsw_signaled) {
+ if (list_empty(&orsw.orsw_entry))
+ cli->cl_rpcs_in_flight--;
+ else
+ list_del(&orsw.orsw_entry);
+ }
+ }
+
+ if (orsw.orsw_signaled) {
+ LASSERT(list_empty(&orsw.orsw_entry));
+
+ rc = -EINTR;
+ }
+ spin_unlock(&cli->cl_loi_list_lock);
+
+ return rc;
+}
+EXPORT_SYMBOL(obd_get_request_slot);
+
+void obd_put_request_slot(struct client_obd *cli)
+{
+ struct obd_request_slot_waiter *orsw;
+
+ spin_lock(&cli->cl_loi_list_lock);
+ cli->cl_rpcs_in_flight--;
+
+ /* If there is free slot, wakeup the first waiter. */
+ if (!list_empty(&cli->cl_flight_waiters) &&
+ likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
+ orsw = list_entry(cli->cl_flight_waiters.next,
+ struct obd_request_slot_waiter, orsw_entry);
+ list_del_init(&orsw->orsw_entry);
+ cli->cl_rpcs_in_flight++;
+ wake_up(&orsw->orsw_waitq);
+ }
+ spin_unlock(&cli->cl_loi_list_lock);
+}
+EXPORT_SYMBOL(obd_put_request_slot);
+
+__u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
+{
+ return cli->cl_max_rpcs_in_flight;
+}
+EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
+
+int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
+{
+ struct obd_request_slot_waiter *orsw;
+ __u32 old;
+ int diff;
+ int i;
+ char *typ_name;
+ int rc;
+
+ if (max > OBD_MAX_RIF_MAX || max < 1)
+ return -ERANGE;
+
+ typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
+ if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
+ /* adjust max_mod_rpcs_in_flight to ensure it is always
+ * strictly lower that max_rpcs_in_flight */
+ if (max < 2) {
+ CERROR("%s: cannot set max_rpcs_in_flight to 1 "
+ "because it must be higher than "
+ "max_mod_rpcs_in_flight value",
+ cli->cl_import->imp_obd->obd_name);
+ return -ERANGE;
+ }
+ if (max <= cli->cl_max_mod_rpcs_in_flight) {
+ rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
+ if (rc != 0)
+ return rc;
+ }
+ }
+
+ spin_lock(&cli->cl_loi_list_lock);
+ old = cli->cl_max_rpcs_in_flight;
+ cli->cl_max_rpcs_in_flight = max;
+ client_adjust_max_dirty(cli);
+
+ diff = max - old;
+
+ /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
+ for (i = 0; i < diff; i++) {
+ if (list_empty(&cli->cl_flight_waiters))
+ break;
+
+ orsw = list_entry(cli->cl_flight_waiters.next,
+ struct obd_request_slot_waiter, orsw_entry);
+ list_del_init(&orsw->orsw_entry);
+ cli->cl_rpcs_in_flight++;
+ wake_up(&orsw->orsw_waitq);
+ }
+ spin_unlock(&cli->cl_loi_list_lock);
+
+ return 0;
+}
+EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
+
+__u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
+{
+ return cli->cl_max_mod_rpcs_in_flight;
+}
+EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
+
+int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
+{
+ struct obd_connect_data *ocd;
+ __u16 maxmodrpcs;
+ __u16 prev;
+
+ if (max > OBD_MAX_RIF_MAX || max < 1)
+ return -ERANGE;
+
+ /* cannot exceed or equal max_rpcs_in_flight */
+ if (max >= cli->cl_max_rpcs_in_flight) {
+ CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
+ "higher or equal to max_rpcs_in_flight value (%u)\n",
+ cli->cl_import->imp_obd->obd_name,
+ max, cli->cl_max_rpcs_in_flight);
+ return -ERANGE;
+ }
+
+ /* cannot exceed max modify RPCs in flight supported by the server */
+ ocd = &cli->cl_import->imp_connect_data;
+ if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
+ maxmodrpcs = ocd->ocd_maxmodrpcs;
+ else
+ maxmodrpcs = 1;
+ if (max > maxmodrpcs) {
+ CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
+ "higher than max_mod_rpcs_per_client value (%hu) "
+ "returned by the server at connection\n",
+ cli->cl_import->imp_obd->obd_name,
+ max, maxmodrpcs);
+ return -ERANGE;
+ }
+
+ spin_lock(&cli->cl_mod_rpcs_lock);
+
+ prev = cli->cl_max_mod_rpcs_in_flight;
+ cli->cl_max_mod_rpcs_in_flight = max;
+
+ /* wakeup waiters if limit has been increased */
+ if (cli->cl_max_mod_rpcs_in_flight > prev)
+ wake_up(&cli->cl_mod_rpcs_waitq);
+
+ spin_unlock(&cli->cl_mod_rpcs_lock);
+
+ return 0;
+}
+EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
+
+int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
+ struct seq_file *seq)
+{
+ unsigned long mod_tot = 0, mod_cum;
+ struct timespec64 now;
+ int i;
+
+ ktime_get_real_ts64(&now);
+
+ spin_lock(&cli->cl_mod_rpcs_lock);
+
+ seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
+ (s64)now.tv_sec, now.tv_nsec);
+ seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
+ cli->cl_mod_rpcs_in_flight);
+
+ seq_printf(seq, "\n\t\t\tmodify\n");
+ seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
+
+ mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
+
+ mod_cum = 0;
+ for (i = 0; i < OBD_HIST_MAX; i++) {
+ unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
+ mod_cum += mod;
+ seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
+ i, mod, pct(mod, mod_tot),
+ pct(mod_cum, mod_tot));
+ if (mod_cum == mod_tot)
+ break;
+ }
+
+ spin_unlock(&cli->cl_mod_rpcs_lock);
+
+ return 0;
+}
+EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
+
+/* The number of modify RPCs sent in parallel is limited
+ * because the server has a finite number of slots per client to
+ * store request result and ensure reply reconstruction when needed.
+ * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
+ * that takes into account server limit and cl_max_rpcs_in_flight
+ * value.
+ * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
+ * one close request is allowed above the maximum.
+ */
+static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
+ bool close_req)
+{
+ bool avail;
+
+ /* A slot is available if
+ * - number of modify RPCs in flight is less than the max
+ * - it's a close RPC and no other close request is in flight
+ */
+ avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
+ (close_req && cli->cl_close_rpcs_in_flight == 0);
+
+ return avail;
+}
+
+static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
+ bool close_req)
+{
+ bool avail;
+
+ spin_lock(&cli->cl_mod_rpcs_lock);
+ avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
+ spin_unlock(&cli->cl_mod_rpcs_lock);
+ return avail;
+}
+
+static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
+{
+ if (it != NULL &&
+ (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
+ it->it_op == IT_READDIR ||
+ (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
+ return true;
+ return false;
+}
+
+/* Get a modify RPC slot from the obd client @cli according
+ * to the kind of operation @opc that is going to be sent
+ * and the intent @it of the operation if it applies.
+ * If the maximum number of modify RPCs in flight is reached
+ * the thread is put to sleep.
+ * Returns the tag to be set in the request message. Tag 0
+ * is reserved for non-modifying requests.
+ */
+__u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
+ struct lookup_intent *it)
+{
+ struct l_wait_info lwi = LWI_INTR(NULL, NULL);
+ bool close_req = false;
+ __u16 i, max;
+
+ /* read-only metadata RPCs don't consume a slot on MDT
+ * for reply reconstruction
+ */
+ if (obd_skip_mod_rpc_slot(it))
+ return 0;
+
+ if (opc == MDS_CLOSE)
+ close_req = true;
+
+ do {
+ spin_lock(&cli->cl_mod_rpcs_lock);
+ max = cli->cl_max_mod_rpcs_in_flight;
+ if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
+ /* there is a slot available */
+ cli->cl_mod_rpcs_in_flight++;
+ if (close_req)
+ cli->cl_close_rpcs_in_flight++;
+ lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
+ cli->cl_mod_rpcs_in_flight);
+ /* find a free tag */
+ i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
+ max + 1);
+ LASSERT(i < OBD_MAX_RIF_MAX);
+ LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
+ spin_unlock(&cli->cl_mod_rpcs_lock);
+ /* tag 0 is reserved for non-modify RPCs */
+ return i + 1;
+ }
+ spin_unlock(&cli->cl_mod_rpcs_lock);
+
+ CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
+ "opc %u, max %hu\n",
+ cli->cl_import->imp_obd->obd_name, opc, max);
+
+ l_wait_event_exclusive(cli->cl_mod_rpcs_waitq,
+ obd_mod_rpc_slot_avail(cli, close_req),
+ &lwi);
+ } while (true);
+}
+EXPORT_SYMBOL(obd_get_mod_rpc_slot);
+
+/* Put a modify RPC slot from the obd client @cli according
+ * to the kind of operation @opc that has been sent and the
+ * intent @it of the operation if it applies.
+ */
+void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
+ struct lookup_intent *it, __u16 tag)
+{
+ bool close_req = false;
+
+ if (obd_skip_mod_rpc_slot(it))
+ return;
+
+ if (opc == MDS_CLOSE)
+ close_req = true;
+
+ spin_lock(&cli->cl_mod_rpcs_lock);
+ cli->cl_mod_rpcs_in_flight--;
+ if (close_req)
+ cli->cl_close_rpcs_in_flight--;
+ /* release the tag in the bitmap */
+ LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
+ LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
+ spin_unlock(&cli->cl_mod_rpcs_lock);
+ wake_up(&cli->cl_mod_rpcs_waitq);
+}
+EXPORT_SYMBOL(obd_put_mod_rpc_slot);