# define LDLM_ERROR(lock, fmt, a...) ((void)0)
#endif
+/*
+ * Three intentions can be used for the policy functions in
+ * ldlm_processing_policy.
+ *
+ * LDLM_PROCESS_RESCAN:
+ *
+ * It's used when policy functions are called from ldlm_reprocess_queue() to
+ * reprocess the wait & convert list and try to grant locks. Blocking ASTs
+ * have already been sent in this situation, so completion ASTs need to be
+ * sent for the locks being granted.
+ *
+ * LDLM_PROCESS_ENQUEUE:
+ *
+ * It's used when policy functions are called from ldlm_lock_enqueue() to
+ * process the wait & convert list for handling an enqueue request. Blocking
+ * ASTs have not been sent yet, so the list of conflicting locks is
+ * collected and ASTs are sent to them.
+ *
+ * LDLM_PROCESS_RECOVERY:
+ *
+ * It's used when policy functions are called from ldlm_reprocess_queue() to
+ * reprocess the wait & convert list once recovery is done. In case blocking
+ * ASTs were lost before recovery, it needs not only to grant locks if
+ * possible, but also to send blocking ASTs to the locks that don't have the
+ * AST-sent flag set. Completion ASTs need to be sent for the locks being
+ * granted.
+ */
+enum ldlm_process_intention {
+ LDLM_PROCESS_RESCAN = 0,
+ LDLM_PROCESS_ENQUEUE = 1,
+ LDLM_PROCESS_RECOVERY = 2,
+};
+
typedef int (*ldlm_processing_policy)(struct ldlm_lock *lock, __u64 *flags,
- int first_enq, enum ldlm_error *err,
+ enum ldlm_process_intention intention,
+ enum ldlm_error *err,
struct list_head *work_list);
/**
void ldlm_lock_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode);
void ldlm_lock_cancel(struct ldlm_lock *lock);
void ldlm_reprocess_all(struct ldlm_resource *res);
-void ldlm_reprocess_all_ns(struct ldlm_namespace *ns);
+void ldlm_reprocess_recovery_done(struct ldlm_namespace *ns);
void ldlm_lock_dump_handle(int level, const struct lustre_handle *lockh);
void ldlm_unlink_lock_skiplist(struct ldlm_lock *req);
}
EXPORT_SYMBOL(ldlm_resource_prolong);
-
-/**
- * Discard all AST work items from list.
- *
- * If for whatever reason we do not want to send ASTs to conflicting locks
- * anymore, disassemble the list with this function.
- */
-static void discard_bl_list(struct list_head *bl_list)
-{
- struct list_head *tmp, *pos;
- ENTRY;
-
- list_for_each_safe(pos, tmp, bl_list) {
- struct ldlm_lock *lock =
- list_entry(pos, struct ldlm_lock, l_bl_ast);
-
- list_del_init(&lock->l_bl_ast);
- LASSERT(ldlm_is_ast_sent(lock));
- ldlm_clear_ast_sent(lock);
- LASSERT(lock->l_bl_ast_run == 0);
- LASSERT(lock->l_blocking_lock);
- LDLM_LOCK_RELEASE(lock->l_blocking_lock);
- lock->l_blocking_lock = NULL;
- LDLM_LOCK_RELEASE(lock);
- }
- EXIT;
-}
-
/**
* Process a granting attempt for extent lock.
* Must be called with ns lock held.
* This function looks for any conflicts for \a lock in the granted or
* waiting queues. The lock is granted if no conflicts are found in
* either queue.
- *
- * If \a first_enq is 0 (ie, called from ldlm_reprocess_queue):
- * - blocking ASTs have already been sent
- *
- * If \a first_enq is 1 (ie, called from ldlm_lock_enqueue):
- * - blocking ASTs have not been sent yet, so list of conflicting locks
- * would be collected and ASTs sent.
*/
int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
- int first_enq, enum ldlm_error *err,
- struct list_head *work_list)
+ enum ldlm_process_intention intention,
+ enum ldlm_error *err, struct list_head *work_list)
{
struct ldlm_resource *res = lock->l_resource;
struct list_head rpc_list;
check_res_locked(res);
*err = ELDLM_OK;
- if (!first_enq) {
+ if (intention == LDLM_PROCESS_RESCAN) {
/* Careful observers will note that we don't handle -EWOULDBLOCK
* here, but it's ok for a non-obvious reason -- compat_queue
* can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
RETURN(LDLM_ITER_CONTINUE);
}
+ LASSERT((intention == LDLM_PROCESS_ENQUEUE && work_list == NULL) ||
+ (intention == LDLM_PROCESS_RECOVERY && work_list != NULL));
restart:
contended_locks = 0;
rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
&rpc_list, &contended_locks);
- if (rc < 0)
- GOTO(out, rc); /* lock was destroyed */
- if (rc == 2)
- goto grant;
-
- rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
- &rpc_list, &contended_locks);
- if (rc2 < 0)
- GOTO(out, rc = rc2); /* lock was destroyed */
-
- if (rc + rc2 == 2) {
- grant:
- ldlm_extent_policy(res, lock, flags);
- ldlm_resource_unlink_lock(lock);
- ldlm_grant_lock(lock, NULL);
- } else {
- /* If either of the compat_queue()s returned failure, then we
- * have ASTs to send and must go onto the waiting list.
- *
- * bug 2322: we used to unlink and re-add here, which was a
- * terrible folly -- if we goto restart, we could get
- * re-ordered! Causes deadlock, because ASTs aren't sent! */
- if (list_empty(&lock->l_res_link))
- ldlm_resource_add_lock(res, &res->lr_waiting, lock);
- unlock_res(res);
- rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
- LDLM_WORK_BL_AST);
-
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
- !ns_is_client(ldlm_res_to_ns(res)))
- class_fail_export(lock->l_export);
-
- lock_res(res);
- if (rc == -ERESTART) {
- /* 15715: The lock was granted and destroyed after
- * resource lock was dropped. Interval node was freed
- * in ldlm_lock_destroy. Anyway, this always happens
- * when a client is being evicted. So it would be
- * ok to return an error. -jay */
- if (ldlm_is_destroyed(lock)) {
- *err = -EAGAIN;
- GOTO(out, rc = -EAGAIN);
- }
-
- /* lock was granted while resource was unlocked. */
- if (lock->l_granted_mode == lock->l_req_mode) {
- /* bug 11300: if the lock has been granted,
- * break earlier because otherwise, we will go
- * to restart and ldlm_resource_unlink will be
- * called and it causes the interval node to be
- * freed. Then we will fail at
- * ldlm_extent_add_lock() */
- *flags &= ~LDLM_FL_BLOCKED_MASK;
- GOTO(out, rc = 0);
- }
+ if (rc < 0)
+ GOTO(out_rpc_list, rc);
+
+ rc2 = 0;
+ if (rc != 2) {
+ rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock,
+ flags, err, &rpc_list,
+ &contended_locks);
+ if (rc2 < 0)
+ GOTO(out_rpc_list, rc = rc2);
+ }
+ if (rc + rc2 != 2) {
+ /* Adding LDLM_FL_NO_TIMEOUT flag to granted lock to force
+ * client to wait for the lock endlessly once the lock is
+ * enqueued -bzzz */
+ rc = ldlm_handle_conflict_lock(lock, flags, &rpc_list,
+ LDLM_FL_NO_TIMEOUT);
+ if (rc == -ERESTART)
GOTO(restart, rc);
- }
-
- /* this way we force client to wait for the lock
- * endlessly once the lock is enqueued -bzzz */
- *flags |= LDLM_FL_BLOCK_GRANTED | LDLM_FL_NO_TIMEOUT;
-
+ *err = rc;
+ } else {
+ ldlm_extent_policy(res, lock, flags);
+ ldlm_resource_unlink_lock(lock);
+ ldlm_grant_lock(lock, work_list);
+ rc = 0;
}
- RETURN(0);
-out:
+
+out_rpc_list:
if (!list_empty(&rpc_list)) {
LASSERT(!ldlm_is_ast_discard_data(lock));
- discard_bl_list(&rpc_list);
+ ldlm_discard_bl_list(&rpc_list);
}
RETURN(rc);
}
* This function looks for any conflicts for \a lock in the granted or
* waiting queues. The lock is granted if no conflicts are found in
* either queue.
- *
- * It is also responsible for splitting a lock if a portion of the lock
- * is released.
- *
- * If \a first_enq is 0 (ie, called from ldlm_reprocess_queue):
- * - blocking ASTs have already been sent
- *
- * If \a first_enq is 1 (ie, called from ldlm_lock_enqueue):
- * - blocking ASTs have not been sent yet, so list of conflicting locks
- * would be collected and ASTs sent.
*/
int
-ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, int first_enq,
+ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
+ enum ldlm_process_intention intention,
enum ldlm_error *err, struct list_head *work_list)
{
struct ldlm_resource *res = req->l_resource;
if (!ldlm_flocks_overlap(lock, req))
continue;
- if (!first_enq) {
+ if (intention != LDLM_PROCESS_ENQUEUE) {
reprocess_failed = 1;
if (ldlm_flock_deadlock(req, lock)) {
ldlm_flock_cancel_on_deadlock(req,
if (*flags != LDLM_FL_WAIT_NOREPROC) {
#ifdef HAVE_SERVER_SUPPORT
- if (first_enq) {
+ if (intention == LDLM_PROCESS_ENQUEUE) {
/* If this is an unlock, reprocess the waitq and
* send completions ASTs for locks that can now be
* granted. The only problem with doing this
* newly granted locks will be sent before the unlock
* completion is sent. It shouldn't be an issue. Also
* note that ldlm_process_flock_lock() will recurse,
- * but only once because first_enq will be false from
- * ldlm_reprocess_queue. */
+ * but only once because 'intention' won't be
+ * LDLM_PROCESS_ENQUEUE from ldlm_reprocess_queue. */
if ((mode == LCK_NL) && overlaps) {
struct list_head rpc_list;
int rc;
INIT_LIST_HEAD(&rpc_list);
restart:
- ldlm_reprocess_queue(res, &res->lr_waiting,
- &rpc_list);
+ ldlm_reprocess_queue(res, &res->lr_waiting,
+ &rpc_list,
+ LDLM_PROCESS_RESCAN);
unlock_res_and_lock(req);
rc = ldlm_run_ast_work(ns, &rpc_list,
* This function looks for any conflicts for \a lock in the granted or
* waiting queues. The lock is granted if no conflicts are found in
* either queue.
- *
- * If \a first_enq is 0 (ie, called from ldlm_reprocess_queue):
- * - blocking ASTs have already been sent
- *
- * If \a first_enq is 1 (ie, called from ldlm_lock_enqueue):
- * - blocking ASTs have not been sent yet, so list of conflicting locks
- * would be collected and ASTs sent.
*/
int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
- int first_enq, enum ldlm_error *err,
+ enum ldlm_process_intention intention,
+ enum ldlm_error *err,
struct list_head *work_list)
{
struct ldlm_resource *res = lock->l_resource;
check_res_locked(res);
/* (*flags & LDLM_FL_BLOCK_NOWAIT) is for layout lock right now. */
- if (!first_enq || (*flags & LDLM_FL_BLOCK_NOWAIT)) {
+ if (intention == LDLM_PROCESS_RESCAN ||
+ (*flags & LDLM_FL_BLOCK_NOWAIT)) {
*err = ELDLM_LOCK_ABORTED;
if (*flags & LDLM_FL_BLOCK_NOWAIT)
*err = ELDLM_LOCK_WOULDBLOCK;
RETURN(LDLM_ITER_CONTINUE);
}
+ LASSERT((intention == LDLM_PROCESS_ENQUEUE && work_list == NULL) ||
+ (intention == LDLM_PROCESS_RECOVERY && work_list != NULL));
restart:
rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, &rpc_list);
rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, &rpc_list);
if (rc != 2) {
- /* If either of the compat_queue()s returned 0, then we
- * have ASTs to send and must go onto the waiting list.
- *
- * bug 2322: we used to unlink and re-add here, which was a
- * terrible folly -- if we goto restart, we could get
- * re-ordered! Causes deadlock, because ASTs aren't sent! */
- if (list_empty(&lock->l_res_link))
- ldlm_resource_add_lock(res, &res->lr_waiting, lock);
- unlock_res(res);
- rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
- LDLM_WORK_BL_AST);
- lock_res(res);
- if (rc == -ERESTART) {
- /* We were granted while waiting, nothing left to do */
- if (lock->l_granted_mode == lock->l_req_mode)
- GOTO(out, rc = 0);
- /* Lock was destroyed while we were waiting, abort */
- if (ldlm_is_destroyed(lock))
- GOTO(out, rc = -EAGAIN);
-
- /* Otherwise try again */
+ rc = ldlm_handle_conflict_lock(lock, flags, &rpc_list, 0);
+ if (rc == -ERESTART)
GOTO(restart, rc);
- }
- *flags |= LDLM_FL_BLOCK_GRANTED;
- } else {
- ldlm_resource_unlink_lock(lock);
- ldlm_grant_lock(lock, NULL);
- }
+ *err = rc;
+ } else {
+ ldlm_resource_unlink_lock(lock);
+ ldlm_grant_lock(lock, work_list);
+ rc = 0;
+ }
- rc = 0;
-out:
- *err = rc;
- LASSERT(list_empty(&rpc_list));
+ if (!list_empty(&rpc_list))
+ ldlm_discard_bl_list(&rpc_list);
RETURN(rc);
}
struct list_head *work_list);
#ifdef HAVE_SERVER_SUPPORT
int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
- struct list_head *work_list);
+ struct list_head *work_list,
+ enum ldlm_process_intention intention);
+int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
+ struct list_head *rpc_list, __u64 grant_flags);
+void ldlm_discard_bl_list(struct list_head *bl_list);
#endif
int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
ldlm_desc_ast_t ast_type);
#ifdef HAVE_SERVER_SUPPORT
/* ldlm_plain.c */
int ldlm_process_plain_lock(struct ldlm_lock *lock, __u64 *flags,
- int first_enq, enum ldlm_error *err,
- struct list_head *work_list);
+ enum ldlm_process_intention intention,
+ enum ldlm_error *err, struct list_head *work_list);
/* ldlm_inodebits.c */
int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
- int first_enq, enum ldlm_error *err,
+ enum ldlm_process_intention intention,
+ enum ldlm_error *err,
struct list_head *work_list);
-#endif
-
/* ldlm_extent.c */
-#ifdef HAVE_SERVER_SUPPORT
int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
- int first_enq, enum ldlm_error *err,
- struct list_head *work_list);
+ enum ldlm_process_intention intention,
+ enum ldlm_error *err, struct list_head *work_list);
#endif
void ldlm_extent_add_lock(struct ldlm_resource *res, struct ldlm_lock *lock);
void ldlm_extent_unlink_lock(struct ldlm_lock *lock);
/* ldlm_flock.c */
int ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
- int first_enq, enum ldlm_error *err,
- struct list_head *work_list);
+ enum ldlm_process_intention intention,
+ enum ldlm_error *err, struct list_head *work_list);
int ldlm_init_flock_export(struct obd_export *exp);
void ldlm_destroy_flock_export(struct obd_export *exp);
obd->obd_stale_clients == 1 ? "was" : "were");
}
- ldlm_reprocess_all_ns(obd->obd_namespace);
+ ldlm_reprocess_recovery_done(obd->obd_namespace);
spin_lock(&obd->obd_recovery_task_lock);
if (!list_empty(&obd->obd_req_replay_queue) ||
!list_empty(&obd->obd_lock_replay_queue) ||
}
policy = ldlm_processing_policy_table[res->lr_type];
- policy(lock, flags, 1, &rc, NULL);
+ policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, NULL);
GOTO(out, rc);
#else
} else {
* Must be called with resource lock held.
*/
int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
- struct list_head *work_list)
+ struct list_head *work_list,
+ enum ldlm_process_intention intention)
{
struct list_head *tmp, *pos;
ldlm_processing_policy policy;
policy = ldlm_processing_policy_table[res->lr_type];
LASSERT(policy);
+ LASSERT(intention == LDLM_PROCESS_RESCAN ||
+ intention == LDLM_PROCESS_RECOVERY);
list_for_each_safe(tmp, pos, queue) {
struct ldlm_lock *pending;
CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
flags = 0;
- rc = policy(pending, &flags, 0, &err, work_list);
- if (rc != LDLM_ITER_CONTINUE)
- break;
+ rc = policy(pending, &flags, intention, &err, work_list);
+ /*
+ * When this is called from recovery done, we always want
+ * to scan the whole list no matter what 'rc' is returned.
+ */
+ if (rc != LDLM_ITER_CONTINUE &&
+ intention == LDLM_PROCESS_RESCAN)
+ break;
}
- RETURN(rc);
+ RETURN(intention == LDLM_PROCESS_RESCAN ? rc : LDLM_ITER_CONTINUE);
}
+
+/**
+ * Conflicting locks are detected for a lock to be enqueued, add the lock
+ * into waiting list and send blocking ASTs to the conflicting locks.
+ *
+ * \param[in] lock The lock to be enqueued.
+ * \param[in,out] flags Lock flags for the lock to be enqueued; blocked
+ * and grant flags are updated on success.
+ * \param[in] rpc_list Conflicting locks list.
+ * \param[in] grant_flags extra flags when granting a lock.
+ *
+ * \retval -ERESTART: Some lock was instantly canceled while sending
+ * blocking ASTs, caller needs to re-check conflicting
+ * locks.
+ * \retval -EAGAIN: Lock was destroyed, caller should return error.
+ * \retval 0: Lock is successfully added in waiting list.
+ */
+int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
+ struct list_head *rpc_list, __u64 grant_flags)
+{
+ struct ldlm_resource *res = lock->l_resource;
+ int rc;
+ ENTRY;
+
+ check_res_locked(res);
+
+ /* If either of the compat_queue()s returned failure, then we
+ * have ASTs to send and must go onto the waiting list.
+ *
+ * bug 2322: we used to unlink and re-add here, which was a
+ * terrible folly -- if we goto restart, we could get
+ * re-ordered! Causes deadlock, because ASTs aren't sent! */
+ if (list_empty(&lock->l_res_link))
+ ldlm_resource_add_lock(res, &res->lr_waiting, lock);
+ unlock_res(res);
+
+ rc = ldlm_run_ast_work(ldlm_res_to_ns(res), rpc_list,
+ LDLM_WORK_BL_AST);
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
+ !ns_is_client(ldlm_res_to_ns(res)))
+ class_fail_export(lock->l_export);
+
+ lock_res(res);
+ if (rc == -ERESTART) {
+ /* 15715: The lock was granted and destroyed after
+ * resource lock was dropped. Interval node was freed
+ * in ldlm_lock_destroy. Anyway, this always happens
+ * when a client is being evicted. So it would be
+ * ok to return an error. -jay */
+ if (ldlm_is_destroyed(lock))
+ RETURN(-EAGAIN);
+
+ /* lock was granted while resource was unlocked. */
+ if (lock->l_granted_mode == lock->l_req_mode) {
+ /* bug 11300: if the lock has been granted,
+ * break earlier because otherwise, we will go
+ * to restart and ldlm_resource_unlink will be
+ * called and it causes the interval node to be
+ * freed. Then we will fail at
+ * ldlm_extent_add_lock() */
+ *flags &= ~LDLM_FL_BLOCKED_MASK;
+ RETURN(0);
+ }
+
+ RETURN(rc);
+ }
+ /* Any error other than -ERESTART from the AST work is ignored here;
+ * the lock stays on the waiting list and 0 is returned. */
+ *flags |= (LDLM_FL_BLOCK_GRANTED | grant_flags);
+
+ RETURN(0);
+}
+
+/**
+ * Discard all AST work items from list.
+ *
+ * If for whatever reason we do not want to send ASTs to conflicting locks
+ * anymore, disassemble the list with this function.
+ *
+ * \param[in] bl_list List of locks collected for blocking ASTs; each lock
+ * is unlinked, its AST-sent flag is cleared, and the
+ * references taken on it and its blocking lock for the
+ * AST work are released.
+ */
+void ldlm_discard_bl_list(struct list_head *bl_list)
+{
+ struct list_head *tmp, *pos;
+ ENTRY;
+
+ list_for_each_safe(pos, tmp, bl_list) {
+ struct ldlm_lock *lock =
+ list_entry(pos, struct ldlm_lock, l_bl_ast);
+
+ list_del_init(&lock->l_bl_ast);
+ LASSERT(ldlm_is_ast_sent(lock));
+ ldlm_clear_ast_sent(lock);
+ LASSERT(lock->l_bl_ast_run == 0);
+ LASSERT(lock->l_blocking_lock);
+ LDLM_LOCK_RELEASE(lock->l_blocking_lock);
+ lock->l_blocking_lock = NULL;
+ LDLM_LOCK_RELEASE(lock);
+ }
+ EXIT;
+}
+
#endif
/**
return rc;
}
-static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
-{
- ldlm_reprocess_all(res);
- return LDLM_ITER_CONTINUE;
-}
-
-static int ldlm_reprocess_res(struct cfs_hash *hs, struct cfs_hash_bd *bd,
- struct hlist_node *hnode, void *arg)
-{
- struct ldlm_resource *res = cfs_hash_object(hs, hnode);
- int rc;
-
- rc = reprocess_one_queue(res, arg);
-
- return rc == LDLM_ITER_STOP;
-}
-
-/**
- * Iterate through all resources on a namespace attempting to grant waiting
- * locks.
- */
-void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
-{
- ENTRY;
-
- if (ns != NULL) {
- cfs_hash_for_each_nolock(ns->ns_rs_hash,
- ldlm_reprocess_res, NULL, 0);
- }
- EXIT;
-}
-
/**
* Try to grant all waiting locks on a resource.
*
* Typically called after some resource locks are cancelled to see
* if anything could be granted as a result of the cancellation.
*/
-void ldlm_reprocess_all(struct ldlm_resource *res)
+static void __ldlm_reprocess_all(struct ldlm_resource *res,
+ enum ldlm_process_intention intention)
{
struct list_head rpc_list;
#ifdef HAVE_SERVER_SUPPORT
atomic_read(&obd->obd_req_replay_clients) == 0)
RETURN_EXIT;
restart:
- lock_res(res);
- rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
- if (rc == LDLM_ITER_CONTINUE)
- ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
- unlock_res(res);
+ lock_res(res);
+ rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list,
+ intention);
+ if (rc == LDLM_ITER_CONTINUE)
+ ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list,
+ intention);
+ unlock_res(res);
rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
LDLM_WORK_CP_AST);
#endif
EXIT;
}
+
+/* Try to grant all waiting locks on a resource, with the RESCAN intention:
+ * blocking ASTs are assumed to have been sent already. */
+void ldlm_reprocess_all(struct ldlm_resource *res)
+{
+ __ldlm_reprocess_all(res, LDLM_PROCESS_RESCAN);
+}
EXPORT_SYMBOL(ldlm_reprocess_all);
+/* cfs_hash iterator callback: reprocess one resource with the
+ * LDLM_PROCESS_RECOVERY intention. Always returns 0 so the hash
+ * iteration continues over every resource in the namespace. */
+static int ldlm_reprocess_res(struct cfs_hash *hs, struct cfs_hash_bd *bd,
+ struct hlist_node *hnode, void *arg)
+{
+ struct ldlm_resource *res = cfs_hash_object(hs, hnode);
+
+ /* This is only called once after recovery done. LU-8306. */
+ __ldlm_reprocess_all(res, LDLM_PROCESS_RECOVERY);
+ return 0;
+}
+
+/**
+ * Iterate through all resources on a namespace attempting to grant waiting
+ * locks.
+ *
+ * Called once after recovery is done. Resources are reprocessed with the
+ * LDLM_PROCESS_RECOVERY intention, so blocking ASTs that may have been
+ * lost before recovery are sent as well. A NULL \a ns is a no-op.
+ */
+void ldlm_reprocess_recovery_done(struct ldlm_namespace *ns)
+{
+ ENTRY;
+
+ if (ns != NULL) {
+ cfs_hash_for_each_nolock(ns->ns_rs_hash,
+ ldlm_reprocess_res, NULL, 0);
+ }
+ EXIT;
+}
+
static bool is_bl_done(struct ldlm_lock *lock)
{
bool bl_done = true;
ldlm_processing_policy policy;
policy = ldlm_processing_policy_table[res->lr_type];
- rc = policy(lock, &pflags, 0, &err, &rpc_list);
+ rc = policy(lock, &pflags, LDLM_PROCESS_RESCAN, &err,
+ &rpc_list);
if (rc == LDLM_ITER_STOP) {
lock->l_req_mode = old_mode;
if (res->lr_type == LDLM_EXTENT)
* This function looks for any conflicts for \a lock in the granted or
* waiting queues. The lock is granted if no conflicts are found in
* either queue.
- *
- * If \a first_enq is 0 (ie, called from ldlm_reprocess_queue):
- * - blocking ASTs have already been sent
- *
- * If \a first_enq is 1 (ie, called from ldlm_lock_enqueue):
- * - blocking ASTs have not been sent yet, so list of conflicting locks
- * would be collected and ASTs sent.
*/
int ldlm_process_plain_lock(struct ldlm_lock *lock, __u64 *flags,
- int first_enq, enum ldlm_error *err,
- struct list_head *work_list)
+ enum ldlm_process_intention intention,
+ enum ldlm_error *err, struct list_head *work_list)
{
struct ldlm_resource *res = lock->l_resource;
struct list_head rpc_list;
LASSERT(list_empty(&res->lr_converting));
INIT_LIST_HEAD(&rpc_list);
- if (!first_enq) {
+ if (intention == LDLM_PROCESS_RESCAN) {
LASSERT(work_list != NULL);
rc = ldlm_plain_compat_queue(&res->lr_granted, lock, NULL);
if (!rc)
RETURN(LDLM_ITER_CONTINUE);
}
+ LASSERT((intention == LDLM_PROCESS_ENQUEUE && work_list == NULL) ||
+ (intention == LDLM_PROCESS_RECOVERY && work_list != NULL));
restart:
rc = ldlm_plain_compat_queue(&res->lr_granted, lock, &rpc_list);
rc += ldlm_plain_compat_queue(&res->lr_waiting, lock, &rpc_list);
if (rc != 2) {
- /* If either of the compat_queue()s returned 0, then we
- * have ASTs to send and must go onto the waiting list.
- *
- * bug 2322: we used to unlink and re-add here, which was a
- * terrible folly -- if we goto restart, we could get
- * re-ordered! Causes deadlock, because ASTs aren't sent! */
- if (list_empty(&lock->l_res_link))
- ldlm_resource_add_lock(res, &res->lr_waiting, lock);
- unlock_res(res);
- rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
- LDLM_WORK_BL_AST);
- lock_res(res);
- if (rc == -ERESTART) {
- /* We were granted while waiting, nothing left to do */
- if (lock->l_granted_mode == lock->l_req_mode)
- GOTO(out, rc = 0);
- /* Lock was destroyed while we were waiting, abort */
- if (ldlm_is_destroyed(lock))
- GOTO(out, rc = -EAGAIN);
-
- /* Otherwise try again */
+ rc = ldlm_handle_conflict_lock(lock, flags, &rpc_list, 0);
+ if (rc == -ERESTART)
GOTO(restart, rc);
- }
- *flags |= LDLM_FL_BLOCK_GRANTED;
- } else {
- ldlm_resource_unlink_lock(lock);
- ldlm_grant_lock(lock, NULL);
- }
-
- rc = 0;
-out:
- *err = rc;
- LASSERT(list_empty(&rpc_list));
+ *err = rc;
+ } else {
+ ldlm_resource_unlink_lock(lock);
+ ldlm_grant_lock(lock, work_list);
+ rc = 0;
+ }
+
+ if (!list_empty(&rpc_list))
+ ldlm_discard_bl_list(&rpc_list);
RETURN(rc);
}
rc = LDLM_ITER_CONTINUE;
} else {
__u64 tmpflags = 0;
- rc = policy(lock, &tmpflags, 0, &err, NULL);
+ rc = policy(lock, &tmpflags, LDLM_PROCESS_RESCAN, &err, NULL);
check_res_locked(res);
}