return -ERANGE;
}
- spin_lock(&cli->cl_mod_rpcs_lock);
+ spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
prev = cli->cl_max_mod_rpcs_in_flight;
cli->cl_max_mod_rpcs_in_flight = max;
/* wakeup waiters if limit has been increased */
if (cli->cl_max_mod_rpcs_in_flight > prev)
- wake_up(&cli->cl_mod_rpcs_waitq);
+ wake_up_locked(&cli->cl_mod_rpcs_waitq);
- spin_unlock(&cli->cl_mod_rpcs_lock);
+ spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
return 0;
}
unsigned long mod_tot = 0, mod_cum;
int i;
- spin_lock(&cli->cl_mod_rpcs_lock);
+ spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
lprocfs_stats_header(seq, ktime_get_real(), cli->cl_mod_rpcs_init, 25,
":", true, "");
seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
break;
}
- spin_unlock(&cli->cl_mod_rpcs_lock);
+ spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
return 0;
}
* On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
* one close request is allowed above the maximum.
*/
-static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
- bool close_req)
+struct mod_waiter {
+ struct client_obd *cli;
+ bool close_req;
+ wait_queue_entry_t wqe;
+};
+static int claim_mod_rpc_function(wait_queue_entry_t *wq_entry,
+ unsigned int mode, int flags, void *key)
{
+ struct mod_waiter *w = container_of(wq_entry, struct mod_waiter, wqe);
+ struct client_obd *cli = w->cli;
+ bool close_req = w->close_req;
bool avail;
+ int ret;
+
+ /* As woken_wake_function() doesn't remove us from the wait_queue,
+ * we could get called twice for the same thread - take care.
+ */
+ if (wq_entry->flags & WQ_FLAG_WOKEN)
+ /* Already woke this thread, don't try again */
+ return 0;
/* A slot is available if
* - number of modify RPCs in flight is less than the max
*/
avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
(close_req && cli->cl_close_rpcs_in_flight == 0);
-
- return avail;
-}
-
-static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
- bool close_req)
-{
- bool avail;
-
- spin_lock(&cli->cl_mod_rpcs_lock);
- avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
- spin_unlock(&cli->cl_mod_rpcs_lock);
- return avail;
+ if (avail) {
+ cli->cl_mod_rpcs_in_flight++;
+ if (w->close_req)
+ cli->cl_close_rpcs_in_flight++;
+ ret = woken_wake_function(wq_entry, mode, flags, key);
+ } else if (cli->cl_close_rpcs_in_flight)
+ /* No other waiter could be woken */
+ ret = -1;
+ else if (key == NULL)
+ /* This was not a wakeup from a close completion, so there is no
+ * point seeing if there are close waiters to be woken
+ */
+ ret = -1;
+ else
+ /* There might be be a close we could wake, keep looking */
+ ret = 0;
+ return ret;
}
-
/* Get a modify RPC slot from the obd client @cli according
* to the kind of operation @opc that is going to be sent
* and the intent @it of the operation if it applies.
*/
__u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
{
- bool close_req = false;
+ struct mod_waiter wait = {
+ .cli = cli,
+ .close_req = (opc == MDS_CLOSE),
+ };
__u16 i, max;
- if (opc == MDS_CLOSE)
- close_req = true;
-
- do {
- spin_lock(&cli->cl_mod_rpcs_lock);
- max = cli->cl_max_mod_rpcs_in_flight;
- if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
- /* there is a slot available */
- cli->cl_mod_rpcs_in_flight++;
- if (close_req)
- cli->cl_close_rpcs_in_flight++;
- lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
- cli->cl_mod_rpcs_in_flight);
- /* find a free tag */
- i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
- max + 1);
- LASSERT(i < OBD_MAX_RIF_MAX);
- LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
- spin_unlock(&cli->cl_mod_rpcs_lock);
- /* tag 0 is reserved for non-modify RPCs */
-
- CDEBUG(D_RPCTRACE,
- "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
- cli->cl_import->imp_obd->obd_name,
- i + 1, opc, max);
-
- return i + 1;
- }
- spin_unlock(&cli->cl_mod_rpcs_lock);
-
- CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
- "opc %u, max %hu\n",
- cli->cl_import->imp_obd->obd_name, opc, max);
+ init_wait(&wait.wqe);
+ wait.wqe.func = claim_mod_rpc_function;
- wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
- obd_mod_rpc_slot_avail(cli,
- close_req));
- } while (true);
+ spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
+ __add_wait_queue(&cli->cl_mod_rpcs_waitq, &wait.wqe);
+ /* This wakeup will only succeed if the maximums haven't
+ * been reached. If that happens, WQ_FLAG_WOKEN will be cleared
+ * and there will be no need to wait.
+ */
+ wake_up_locked(&cli->cl_mod_rpcs_waitq);
+ if (!(wait.wqe.flags & WQ_FLAG_WOKEN)) {
+ spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
+ wait_woken(&wait.wqe, TASK_UNINTERRUPTIBLE,
+ MAX_SCHEDULE_TIMEOUT);
+ spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
+ }
+ __remove_wait_queue(&cli->cl_mod_rpcs_waitq, &wait.wqe);
+
+ max = cli->cl_max_mod_rpcs_in_flight;
+ lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
+ cli->cl_mod_rpcs_in_flight);
+ /* find a free tag */
+ i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
+ max + 1);
+ LASSERT(i < OBD_MAX_RIF_MAX);
+ LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
+ spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
+ /* tag 0 is reserved for non-modify RPCs */
+
+ CDEBUG(D_RPCTRACE,
+ "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
+ cli->cl_import->imp_obd->obd_name,
+ i + 1, opc, max);
+
+ return i + 1;
}
EXPORT_SYMBOL(obd_get_mod_rpc_slot);
if (opc == MDS_CLOSE)
close_req = true;
- spin_lock(&cli->cl_mod_rpcs_lock);
+ spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
cli->cl_mod_rpcs_in_flight--;
if (close_req)
cli->cl_close_rpcs_in_flight--;
/* release the tag in the bitmap */
LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
- spin_unlock(&cli->cl_mod_rpcs_lock);
- /* LU-14741 - to prevent close RPCs stuck behind normal ones */
- if (close_req)
- wake_up_all(&cli->cl_mod_rpcs_waitq);
- else
- wake_up(&cli->cl_mod_rpcs_waitq);
+ __wake_up_locked_key(&cli->cl_mod_rpcs_waitq, TASK_NORMAL,
+ (void *)close_req);
+ spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
}
EXPORT_SYMBOL(obd_put_mod_rpc_slot);
-