return "getattr";
case IT_LOOKUP:
return "lookup";
- case IT_UNLINK:
- return "unlink";
case IT_GETXATTR:
return "getxattr";
case IT_LAYOUT:
struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
+ if (ns->ns_last_pos == &lock->l_lru)
+ ns->ns_last_pos = lock->l_lru.prev;
list_del_init(&lock->l_lru);
LASSERT(ns->ns_nr_unused > 0);
ns->ns_nr_unused--;
LASSERT(list_empty(&lock->l_lru));
LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
list_add_tail(&lock->l_lru, &ns->ns_unused_list);
- ldlm_clear_skipped(lock);
LASSERT(ns->ns_nr_unused >= 0);
ns->ns_nr_unused++;
}
lu_ref_init(&lock->l_reference);
lu_ref_add(&lock->l_reference, "hash", lock);
lock->l_callback_timeout = 0;
+ lock->l_activity = 0;
#if LUSTRE_TRACKS_LOCK_EXP_REFS
INIT_LIST_HEAD(&lock->l_exp_refs_link);
} else if (ns_is_client(ns) &&
!lock->l_readers && !lock->l_writers &&
!ldlm_is_no_lru(lock) &&
- !ldlm_is_bl_ast(lock)) {
+ !ldlm_is_bl_ast(lock) &&
+ !ldlm_is_converting(lock)) {
LDLM_DEBUG(lock, "add lock into lru list");
RETURN(ERR_PTR(rc));
}
+#ifdef HAVE_SERVER_SUPPORT
+static enum ldlm_error ldlm_lock_enqueue_helper(struct ldlm_lock *lock,
+ __u64 *flags)
+{
+ struct ldlm_resource *res = lock->l_resource;
+ enum ldlm_error rc = ELDLM_OK;
+ struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+ ldlm_processing_policy policy;
+ ENTRY;
+
+ policy = ldlm_processing_policy_table[res->lr_type];
+restart:
+ policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, &rpc_list);
+ if (rc == ELDLM_OK && lock->l_granted_mode != lock->l_req_mode &&
+ res->lr_type != LDLM_FLOCK) {
+ rc = ldlm_handle_conflict_lock(lock, flags, &rpc_list);
+ if (rc == -ERESTART)
+ GOTO(restart, rc);
+ }
+
+ if (!list_empty(&rpc_list))
+ ldlm_discard_bl_list(&rpc_list);
+
+ RETURN(rc);
+}
+#endif
+
/**
* Enqueue (request) a lock.
*
struct ldlm_lock *lock = *lockp;
struct ldlm_resource *res = lock->l_resource;
int local = ns_is_client(ldlm_res_to_ns(res));
-#ifdef HAVE_SERVER_SUPPORT
- ldlm_processing_policy policy;
-#endif
enum ldlm_error rc = ELDLM_OK;
struct ldlm_interval *node = NULL;
ENTRY;
/* If no flags, fall through to normal enqueue path. */
}
- policy = ldlm_processing_policy_table[res->lr_type];
- policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, NULL);
+ rc = ldlm_lock_enqueue_helper(lock, flags);
GOTO(out, rc);
#else
} else {
__u64 flags;
int rc = LDLM_ITER_CONTINUE;
enum ldlm_error err;
+ struct list_head bl_ast_list = LIST_HEAD_INIT(bl_ast_list);
ENTRY;
check_res_locked(res);
LASSERT(intention == LDLM_PROCESS_RESCAN ||
intention == LDLM_PROCESS_RECOVERY);
+restart:
list_for_each_safe(tmp, pos, queue) {
struct ldlm_lock *pending;
+ struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
pending = list_entry(tmp, struct ldlm_lock, l_res_link);
CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
flags = 0;
- rc = policy(pending, &flags, intention, &err, work_list);
+ rc = policy(pending, &flags, intention, &err, &rpc_list);
+ if (pending->l_granted_mode == pending->l_req_mode ||
+ res->lr_type == LDLM_FLOCK) {
+ list_splice(&rpc_list, work_list);
+ } else {
+ list_splice(&rpc_list, &bl_ast_list);
+ }
/*
* When this is called from recovery done, we always want
* to scan the whole list no matter what 'rc' is returned.
break;
}
+ if (!list_empty(&bl_ast_list)) {
+ unlock_res(res);
+
+ rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &bl_ast_list,
+ LDLM_WORK_BL_AST);
+
+ lock_res(res);
+ if (rc == -ERESTART)
+ GOTO(restart, rc);
+ }
+
+ if (!list_empty(&bl_ast_list))
+ ldlm_discard_bl_list(&bl_ast_list);
+
RETURN(intention == LDLM_PROCESS_RESCAN ? rc : LDLM_ITER_CONTINUE);
}
* \param[in] lock The lock to be enqueued.
* \param[out] flags Lock flags for the lock to be enqueued.
* \param[in] rpc_list Conflicting locks list.
- * \param[in] grant_flags extra flags when granting a lock.
*
* \retval -ERESTART: Some lock was instantly canceled while sending
* blocking ASTs, caller needs to re-check conflicting
* \reval 0: Lock is successfully added in waiting list.
*/
int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
- struct list_head *rpc_list, __u64 grant_flags)
+ struct list_head *rpc_list)
{
struct ldlm_resource *res = lock->l_resource;
int rc;
RETURN(rc);
}
- *flags |= (LDLM_FL_BLOCK_GRANTED | grant_flags);
+ *flags |= LDLM_FL_BLOCK_GRANTED;
RETURN(0);
}
unlock_res_and_lock(lock);
ldlm_lock2desc(lock->l_blocking_lock, &d);
+ /* copy blocking lock ibits in cancel_bits as well,
+ * new client may use them for lock convert and it is
+ * important to use new field to convert locks from
+ * new servers only
+ */
+ d.l_policy_data.l_inodebits.cancel_bits =
+ lock->l_blocking_lock->l_policy_data.l_inodebits.bits;
rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
LDLM_LOCK_RELEASE(lock->l_blocking_lock);
* talking to me first. -phik */
if (lock->l_readers || lock->l_writers) {
LDLM_ERROR(lock, "lock still has references");
+ unlock_res_and_lock(lock);
LBUG();
}