struct ldlm_lock *lock;
};
+struct ldlm_flock_node {
+ atomic_t lfn_unlock_pending;
+ bool lfn_needs_reprocess;
+};
+
/** Whether to track references to exports by LDLM locks. */
#define LUSTRE_TRACKS_LOCK_EXP_REFS (0)
*/
struct ldlm_interval_tree *lr_itree;
struct ldlm_ibits_queues *lr_ibits_queues;
+ struct ldlm_flock_node lr_flock_node;
};
union {
enum ldlm_mode mode = req->l_req_mode;
int local = ns_is_client(ns);
int added = (mode == LCK_NL);
- int overlaps = 0;
int splitted = 0;
const struct ldlm_callback_suite null_cbs = { NULL };
#ifdef HAVE_SERVER_SUPPORT
lock->l_policy_data.l_flock.start)
break;
- ++overlaps;
+ res->lr_flock_node.lfn_needs_reprocess = true;
if (new->l_policy_data.l_flock.start <=
lock->l_policy_data.l_flock.start) {
* but only once because 'intention' won't be
* LDLM_PROCESS_ENQUEUE from ldlm_reprocess_queue.
*/
- if ((mode == LCK_NL) && overlaps) {
+ struct ldlm_flock_node *fn = &res->lr_flock_node;
+restart:
+ if (mode == LCK_NL && fn->lfn_needs_reprocess &&
+ atomic_read(&fn->lfn_unlock_pending) == 0) {
LIST_HEAD(rpc_list);
int rc;
-restart:
ldlm_reprocess_queue(res, &res->lr_waiting,
&rpc_list,
LDLM_PROCESS_RESCAN, 0);
-
+ fn->lfn_needs_reprocess = false;
unlock_res_and_lock(req);
rc = ldlm_run_ast_work(ns, &rpc_list,
LDLM_WORK_CP_AST);
lock_res_and_lock(req);
- if (rc == -ERESTART)
+ if (rc == -ERESTART) {
+ fn->lfn_needs_reprocess = true;
GOTO(restart, rc);
+ }
}
} else {
LASSERT(req->l_completion_ast);
RETURN(rc);
}
}
+
+ if (!local && lock->l_resource->lr_type == LDLM_FLOCK) {
+ struct ldlm_flock_node *fn = &lock->l_resource->lr_flock_node;
+ if (lock->l_req_mode == LCK_NL) {
+ atomic_inc(&fn->lfn_unlock_pending);
+ res = lock_res_and_lock(lock);
+ atomic_dec(&fn->lfn_unlock_pending);
+ } else {
+ res = lock_res_and_lock(lock);
+
+ while (atomic_read(&fn->lfn_unlock_pending)) {
+ unlock_res_and_lock(lock);
+ cond_resched();
+ lock_res_and_lock(lock);
+ }
+ }
+ } else
#endif
- res = lock_res_and_lock(lock);
+ {
+ res = lock_res_and_lock(lock);
+ }
if (local && ldlm_is_granted(lock)) {
/* The server returned a blocked lock, but it was granted
* before we got a chance to actually enqueue it. We don't
return true;
}
+static bool ldlm_resource_flock_new(struct ldlm_resource *res)
+{
+ res->lr_flock_node.lfn_needs_reprocess = false;
+ atomic_set(&res->lr_flock_node.lfn_unlock_pending, 0);
+
+ return true;
+}
+
/** Create and initialize new resource. */
static struct ldlm_resource *ldlm_resource_new(enum ldlm_type ldlm_type)
{
case LDLM_IBITS:
rc = ldlm_resource_inodebits_new(res);
break;
+ case LDLM_FLOCK:
+ rc = ldlm_resource_flock_new(res);
+ break;
default:
rc = true;
break;