Whamcloud - gitweb
LU-4687 obdclass: unified flow control interfaces
[fs/lustre-release.git] / lustre / obdclass / genops.c
index 98dbcce..d7c541a 100644 (file)
@@ -1934,5 +1934,132 @@ inline void kuc_free(void *p, int payload_len)
 }
 EXPORT_SYMBOL(kuc_free);
 
+struct obd_request_slot_waiter {
+       struct list_head        orsw_entry;
+       wait_queue_head_t       orsw_waitq;
+       bool                    orsw_signaled;
+};
+
+static bool obd_request_slot_avail(struct client_obd *cli,
+                                  struct obd_request_slot_waiter *orsw)
+{
+       bool avail;
+
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       avail = !!list_empty(&orsw->orsw_entry);
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+       return avail;
+};
 
+/*
+ * For network flow control, the RPC sponsor needs to acquire a credit
+ * before sending the RPC. The credits count for a connection is defined
+ * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
+ * the subsequent RPC sponsors need to wait until others released their
+ * credits, or the administrator increased the "cl_max_rpcs_in_flight".
+ */
+int obd_get_request_slot(struct client_obd *cli)
+{
+       struct obd_request_slot_waiter   orsw;
+       struct l_wait_info               lwi;
+       int                              rc;
 
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
+               cli->cl_r_in_flight++;
+               client_obd_list_unlock(&cli->cl_loi_list_lock);
+               return 0;
+       }
+
+       init_waitqueue_head(&orsw.orsw_waitq);
+       list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
+       orsw.orsw_signaled = false;
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+       lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+       rc = l_wait_event(orsw.orsw_waitq,
+                         obd_request_slot_avail(cli, &orsw) ||
+                         orsw.orsw_signaled,
+                         &lwi);
+
+       /* Here, we must take the lock to avoid the on-stack 'orsw' to be
+        * freed but other (such as obd_put_request_slot) is using it. */
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       if (rc != 0) {
+               if (!orsw.orsw_signaled) {
+                       if (list_empty(&orsw.orsw_entry))
+                               cli->cl_r_in_flight--;
+                       else
+                               list_del(&orsw.orsw_entry);
+               }
+       }
+
+       if (orsw.orsw_signaled) {
+               LASSERT(list_empty(&orsw.orsw_entry));
+
+               rc = -EINTR;
+       }
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+       return rc;
+}
+EXPORT_SYMBOL(obd_get_request_slot);
+
+void obd_put_request_slot(struct client_obd *cli)
+{
+       struct obd_request_slot_waiter *orsw;
+
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       cli->cl_r_in_flight--;
+
+       /* If there is free slot, wakeup the first waiter. */
+       if (!list_empty(&cli->cl_loi_read_list) &&
+           likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
+               orsw = list_entry(cli->cl_loi_read_list.next,
+                                 struct obd_request_slot_waiter, orsw_entry);
+               list_del_init(&orsw->orsw_entry);
+               cli->cl_r_in_flight++;
+               wake_up(&orsw->orsw_waitq);
+       }
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+}
+EXPORT_SYMBOL(obd_put_request_slot);
+
+__u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
+{
+       return cli->cl_max_rpcs_in_flight;
+}
+EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
+
+int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
+{
+       struct obd_request_slot_waiter *orsw;
+       __u32                           old;
+       int                             diff;
+       int                             i;
+
+       if (max > OBD_MAX_RIF_MAX || max < 1)
+               return -ERANGE;
+
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       old = cli->cl_max_rpcs_in_flight;
+       cli->cl_max_rpcs_in_flight = max;
+       diff = max - old;
+
+       /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
+       for (i = 0; i < diff; i++) {
+               if (list_empty(&cli->cl_loi_read_list))
+                       break;
+
+               orsw = list_entry(cli->cl_loi_read_list.next,
+                                 struct obd_request_slot_waiter, orsw_entry);
+               list_del_init(&orsw->orsw_entry);
+               cli->cl_r_in_flight++;
+               wake_up(&orsw->orsw_waitq);
+       }
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+       return 0;
+}
+EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);