return 1;
}
+/**
+ * Link \a lock into its export's exp_bl_list (under exp_bl_list_lock).
+ *
+ * Locks that are not yet granted (l_granted_mode != l_req_mode) are
+ * queued at the tail of the list, granted ones at the head; a lock that
+ * is already on some list is left where it is.
+ */
+static void ldlm_add_blocked_lock(struct ldlm_lock *lock)
+{
+ spin_lock_bh(&lock->l_export->exp_bl_list_lock);
+ if (list_empty(&lock->l_exp_list)) {
+ if (lock->l_granted_mode != lock->l_req_mode)
+ list_add_tail(&lock->l_exp_list,
+ &lock->l_export->exp_bl_list);
+ else
+ list_add(&lock->l_exp_list,
+ &lock->l_export->exp_bl_list);
+ }
+ spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
+
+ /* A blocked lock is added. Adjust the position in
+ * the stale list if the export is in the list.
+ * If export is stale and not in the list - it is being
+ * processed and will be placed on the right position
+ * on obd_stale_export_put(). */
+ if (!list_empty(&lock->l_export->exp_stale_list))
+ obd_stale_export_adjust(lock->l_export);
+}
+
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
int ret;
}
spin_unlock_bh(&waiting_locks_spinlock);
- if (ret) {
- spin_lock_bh(&lock->l_export->exp_bl_list_lock);
- if (list_empty(&lock->l_exp_list))
- list_add(&lock->l_exp_list,
- &lock->l_export->exp_bl_list);
- spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
- }
+ if (ret)
+ ldlm_add_blocked_lock(lock);
LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
ret == 0 ? "not re-" : "", timeout,
req->rq_interpret_reply = ldlm_cb_interpret;
lock_res_and_lock(lock);
- if (lock->l_granted_mode != lock->l_req_mode) {
- /* this blocking AST will be communicated as part of the
- * completion AST instead */
+ if (ldlm_is_destroyed(lock)) {
+ /* What's the point? */
unlock_res_and_lock(lock);
-
ptlrpc_req_finished(req);
- LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
RETURN(0);
}
- if (ldlm_is_destroyed(lock)) {
- /* What's the point? */
+ if (lock->l_granted_mode != lock->l_req_mode) {
+ /* this blocking AST will be communicated as part of the
+ * completion AST instead */
+ ldlm_add_blocked_lock(lock);
+ ldlm_set_waited(lock);
unlock_res_and_lock(lock);
+
ptlrpc_req_finished(req);
+ LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
RETURN(0);
}
ldlm_lock2desc(lock, &dlm_rep->lock_desc);
ldlm_lock2handle(lock, &dlm_rep->lock_handle);
+ if (lock && lock->l_resource->lr_type == LDLM_EXTENT)
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 6);
+
/* We never send a blocking AST until the lock is granted, but
* we can tell it right now */
lock_res_and_lock(lock);
}
if (rc != 0) {
- lock_res_and_lock(lock);
- ldlm_resource_unlink_lock(lock);
- ldlm_lock_destroy_nolock(lock);
- unlock_res_and_lock(lock);
- }
+ if (lock->l_export) {
+ ldlm_lock_cancel(lock);
+ } else {
+ lock_res_and_lock(lock);
+ ldlm_resource_unlink_lock(lock);
+ ldlm_lock_destroy_nolock(lock);
+ unlock_res_and_lock(lock);
+
+ }
+ }
if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
ldlm_reprocess_all(lock->l_resource);
return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
}
+/**
+ * Wake up a blocking-AST ("bl") thread sleeping on the pool's wait
+ * queue (blp_waitq) so it can pick up new work.
+ *
+ * Always returns 0.
+ */
+int ldlm_bl_thread_wakeup(void)
+{
+ wake_up(&ldlm_state->ldlm_bl_pool->blp_waitq);
+ return 0;
+}
+
/* Setinfo coming from Server (eg MDT) to Client (eg MDC)! */
static int ldlm_handle_setinfo(struct ptlrpc_request *req)
{
req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
CDEBUG(D_INODE, "cancel\n");
if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_NET) ||
- CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND))
+ CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND) ||
+ CFS_FAIL_CHECK(OBD_FAIL_LDLM_BL_EVICT))
RETURN(0);
rc = ldlm_handle_cancel(req);
if (rc)
EXPORT_SYMBOL(ldlm_revoke_export_locks);
#endif /* HAVE_SERVER_SUPPORT */
-static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
+static int ldlm_bl_get_work(struct ldlm_bl_pool *blp,
+ struct ldlm_bl_work_item **p_blwi,
+ struct obd_export **p_exp)
{
struct ldlm_bl_work_item *blwi = NULL;
static unsigned int num_bl = 0;
+ static unsigned int num_stale;
+ int num_th = atomic_read(&blp->blp_num_threads);
+
+ *p_exp = obd_stale_export_get();
spin_lock(&blp->blp_lock);
+ if (*p_exp != NULL) {
+ if (num_th == 1 || ++num_stale < num_th) {
+ spin_unlock(&blp->blp_lock);
+ return 1;
+ } else {
+ num_stale = 0;
+ }
+ }
+
/* process a request from the blp_list at least every blp_num_threads */
if (!list_empty(&blp->blp_list) &&
(list_empty(&blp->blp_prio_list) || num_bl == 0))
blwi_entry);
if (blwi) {
- if (++num_bl >= atomic_read(&blp->blp_num_threads))
+ if (++num_bl >= num_th)
num_bl = 0;
list_del(&blwi->blwi_entry);
}
spin_unlock(&blp->blp_lock);
+ *p_blwi = blwi;
- return blwi;
+ if (*p_exp != NULL && *p_blwi != NULL) {
+ obd_stale_export_put(*p_exp);
+ *p_exp = NULL;
+ }
+
+ return (*p_blwi != NULL || *p_exp != NULL) ? 1 : 0;
}
/* This only contains temporary data until the thread starts */
return 0;
}
+/**
+ * Decide whether an additional bl thread should be started.
+ *
+ * Returns 1 only when every existing thread is busy, the pool is still
+ * below blp_max_threads, and the current work item (if any) is neither
+ * the cleanup marker (blwi_ns == NULL) nor flagged for memory pressure.
+ * The counters are read without the pool lock, hence the note below.
+ */
+/* Not fatal if racy and have a few too many threads */
+static int ldlm_bl_thread_need_create(struct ldlm_bl_pool *blp,
+ struct ldlm_bl_work_item *blwi)
+{
+ int busy = atomic_read(&blp->blp_busy_threads);
+
+ /* Already at the configured thread limit. */
+ if (busy >= blp->blp_max_threads)
+ return 0;
+
+ /* At least one existing thread is idle; no need for more. */
+ if (busy < atomic_read(&blp->blp_num_threads))
+ return 0;
+
+ /* Don't grow the pool for a cleanup marker or under memory
+ * pressure. */
+ if (blwi != NULL && (blwi->blwi_ns == NULL ||
+ blwi->blwi_mem_pressure))
+ return 0;
+
+ return 1;
+}
+
+/**
+ * Process one blocking-AST work item.
+ *
+ * \retval LDLM_ITER_STOP the item is the cleanup marker queued by
+ *                        ldlm_cleanup() (blwi_ns == NULL); the calling
+ *                        thread should exit.
+ * \retval 0              the item was handled.
+ *
+ * Items carrying a lock list (blwi_count != 0) are cancelled as a
+ * batch; otherwise the single blwi_lock gets a blocking callback.
+ */
+static int ldlm_bl_thread_blwi(struct ldlm_bl_pool *blp,
+ struct ldlm_bl_work_item *blwi)
+{
+ ENTRY;
+
+ if (blwi->blwi_ns == NULL)
+ /* added by ldlm_cleanup() */
+ RETURN(LDLM_ITER_STOP);
+
+ if (blwi->blwi_mem_pressure)
+ memory_pressure_set();
+
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL2, 4);
+
+ if (blwi->blwi_count) {
+ int count;
+ /* The special case when we cancel locks in lru
+ * asynchronously, we pass the list of locks here.
+ * Thus locks are marked LDLM_FL_CANCELING, but NOT
+ * canceled locally yet. */
+ count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
+ blwi->blwi_count,
+ LCF_BL_AST);
+ ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
+ blwi->blwi_flags);
+ } else {
+ ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
+ blwi->blwi_lock);
+ }
+ if (blwi->blwi_mem_pressure)
+ memory_pressure_clr();
+
+ /* Async items own their memory; synchronous submitters are
+ * blocked waiting on blwi_comp. */
+ if (blwi->blwi_flags & LCF_ASYNC)
+ OBD_FREE(blwi, sizeof(*blwi));
+ else
+ complete(&blwi->blwi_comp);
+
+ RETURN(0);
+}
+
+/**
+ * Cancel stale locks on export. Cancel blocked locks first.
+ * If the given export has blocked locks, the next in the list may have
+ * them too, thus cancel not blocked locks only if the current export has
+ * no blocked locks.
+ *
+ * Consumes the export reference taken by obd_stale_export_get() in
+ * ldlm_bl_get_work().  \a blp is currently unused.  Always returns 0.
+ **/
+static int ldlm_bl_thread_exports(struct ldlm_bl_pool *blp,
+ struct obd_export *exp)
+{
+ int num;
+ ENTRY;
+
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 4);
+
+ num = ldlm_export_cancel_blocked_locks(exp);
+ if (num == 0)
+ ldlm_export_cancel_locks(exp);
+
+ /* Drop the reference obtained via obd_stale_export_get(). */
+ obd_stale_export_put(exp);
+
+ RETURN(0);
+}
+
+
/**
* Main blocking requests processing thread.
*
while (1) {
struct l_wait_info lwi = { 0 };
struct ldlm_bl_work_item *blwi = NULL;
- int busy;
+ struct obd_export *exp = NULL;
+ int rc;
- blwi = ldlm_bl_get_work(blp);
+ rc = ldlm_bl_get_work(blp, &blwi, &exp);
- if (blwi == NULL) {
+ if (rc == 0) {
atomic_dec(&blp->blp_busy_threads);
l_wait_event_exclusive(blp->blp_waitq,
- (blwi = ldlm_bl_get_work(blp)) != NULL,
- &lwi);
- busy = atomic_inc_return(&blp->blp_busy_threads);
- } else {
- busy = atomic_read(&blp->blp_busy_threads);
+ ldlm_bl_get_work(blp, &blwi,
+ &exp),
+ &lwi);
+ atomic_inc(&blp->blp_busy_threads);
}
- if (blwi->blwi_ns == NULL)
- /* added by ldlm_cleanup() */
- break;
-
- /* Not fatal if racy and have a few too many threads */
- if (unlikely(busy < blp->blp_max_threads &&
- busy >= atomic_read(&blp->blp_num_threads) &&
- !blwi->blwi_mem_pressure))
+ if (ldlm_bl_thread_need_create(blp, blwi))
/* discard the return value, we tried */
ldlm_bl_thread_start(blp);
- if (blwi->blwi_mem_pressure)
- memory_pressure_set();
-
- OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL2, 4);
-
- if (blwi->blwi_count) {
- int count;
- /* The special case when we cancel locks in LRU
- * asynchronously, we pass the list of locks here.
- * Thus locks are marked LDLM_FL_CANCELING, but NOT
- * canceled locally yet. */
- count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
- blwi->blwi_count,
- LCF_BL_AST);
- ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
- blwi->blwi_flags);
- } else {
- ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
- blwi->blwi_lock);
- }
- if (blwi->blwi_mem_pressure)
- memory_pressure_clr();
+ if (exp)
+ rc = ldlm_bl_thread_exports(blp, exp);
+ else if (blwi)
+ rc = ldlm_bl_thread_blwi(blp, blwi);
- if (blwi->blwi_flags & LCF_ASYNC)
- OBD_FREE(blwi, sizeof(*blwi));
- else
- complete(&blwi->blwi_comp);
+ if (rc == LDLM_ITER_STOP)
+ break;
}
atomic_dec(&blp->blp_busy_threads);