if (flags == LDLM_FL_WAIT_NOREPROC) {
/* client side - set a flag to prevent sending a CANCEL */
lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
- ldlm_lock_decref_internal(lock, mode);
+
+ /* When we get here we are already under lock_res_and_lock(), so we
+ * must call the nolock version of ldlm_lock_decref_internal(). */
+ ldlm_lock_decref_internal_nolock(lock, mode);
}
ldlm_lock_destroy_nolock(lock);
int local = ns->ns_client;
int added = (mode == LCK_NL);
int overlaps = 0;
+ int splitted = 0;
ENTRY;
CDEBUG(D_DLMTRACE, "flags %#x pid %u mode %u start "LPU64" end "LPU64
req->l_blocking_ast = ldlm_flock_blocking_ast;
}
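+ /* Re-entry point: if the resource lock is dropped below to allocate
+ * a new lock, the granted/waiting queues must be re-scanned from here. */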
+reprocess:
if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
/* This loop determines where this processes locks start
* in the resource lr_granted list. */
/* XXX - if ldlm_lock_new() can sleep we should
* release the ns_lock, allocate the new lock,
* and restart processing this lock. */
- new2 = ldlm_lock_create(ns, res->lr_name, LDLM_FLOCK,
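+ /* ldlm_lock_create() may sleep, so drop the resource lock around the
+ * allocation and then re-scan the queues (goto reprocess). */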
+ if (!new2) {
+ unlock_res_and_lock(req);
+ new2 = ldlm_lock_create(ns, res->lr_name, LDLM_FLOCK,
lock->l_granted_mode, NULL, NULL, NULL,
NULL, 0);
- if (!new2) {
- ldlm_flock_destroy(req, lock->l_granted_mode, *flags);
- *err = -ENOLCK;
- RETURN(LDLM_ITER_STOP);
+ lock_res_and_lock(req);
+ if (!new2) {
+ ldlm_flock_destroy(req, lock->l_granted_mode, *flags);
+ *err = -ENOLCK;
+ RETURN(LDLM_ITER_STOP);
+ }
+ goto reprocess;
}
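+ /* The existing granted lock is being split around the new range;
+ * new2 is consumed below to hold one part of the old range. */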
+ splitted = 1;
+
new2->l_granted_mode = lock->l_granted_mode;
new2->l_policy_data.l_flock.pid =
new->l_policy_data.l_flock.pid;
&new2->l_export->exp_ldlm_data.led_held_locks);
spin_unlock(&new2->l_export->exp_ldlm_data.led_lock);
}
- if (*flags == LDLM_FL_WAIT_NOREPROC)
- ldlm_lock_addref_internal(new2, lock->l_granted_mode);
+ if (*flags == LDLM_FL_WAIT_NOREPROC)
+ ldlm_lock_addref_internal_nolock(new2, lock->l_granted_mode);
/* insert new2 at lock */
ldlm_resource_add_lock(res, ownlocks, new2);
break;
}
+ /* if new2 was created but never used, destroy it */
+ if (splitted == 0 && new2 != NULL)
+ ldlm_lock_destroy_nolock(new2);
+
/* At this point we're granting the lock request. */
req->l_granted_mode = req->l_req_mode;
ldlm_reprocess_queue(res, &res->lr_waiting,
&rpc_list);
- unlock_res(res);
- rc = ldlm_run_bl_ast_work(&rpc_list);
- lock_res(res);
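+ /* the completion ASTs can block (they may send RPCs), so run them
+ * without holding the resource/lock spinlocks, then re-take them */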
+ unlock_res_and_lock(req);
+ rc = ldlm_run_cp_ast_work(&rpc_list);
+ lock_res_and_lock(req);
if (rc == -ERESTART)
GOTO(restart, -ERESTART);
}
LDLM_DEBUG(lock, "client-side enqueue granted");
ns = lock->l_resource->lr_namespace;
- lock_res(lock->l_resource);
+ lock_res_and_lock(lock);
/* take lock off the deadlock detection waitq. */
list_del_init(&lock->l_flock_waitq);
if (flags == 0)
cfs_waitq_signal(&lock->l_waitq);
}
- unlock_res(lock->l_resource);
+ unlock_res_and_lock(lock);
RETURN(0);
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);
ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *, struct ldlm_lock **,
void *cookie, int *flags);
void ldlm_lock_addref_internal(struct ldlm_lock *, __u32 mode);
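+/* _nolock variants expect the caller to already hold lock_res_and_lock() */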
+void ldlm_lock_addref_internal_nolock(struct ldlm_lock *, __u32 mode);
void ldlm_lock_decref_internal(struct ldlm_lock *, __u32 mode);
+void ldlm_lock_decref_internal_nolock(struct ldlm_lock *, __u32 mode);
void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
struct list_head *work_list);
int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
unlock_res_and_lock(lock);
}
-void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
+/* Only called from ldlm_flock_destroy() and for local locks.
+ * For LDLM_FLOCK type locks, l_blocking_ast is NULL and
+ * ldlm_lock_remove_from_lru() does nothing, so it is safe for
+ * ldlm_flock_destroy() to use this version with that code dropped. */
+void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
- struct ldlm_namespace *ns;
- ENTRY;
-
- lock_res_and_lock(lock);
-
- ns = lock->l_resource->lr_namespace;
-
LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
LASSERT(lock->l_readers > 0);
lock->l_writers--;
}
+ LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */
+}
+void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
+{
+ struct ldlm_namespace *ns;
+ ENTRY;
+
+ lock_res_and_lock(lock);
+
+ ns = lock->l_resource->lr_namespace;
+
+ ldlm_lock_decref_internal_nolock(lock, mode);
+
if (lock->l_flags & LDLM_FL_LOCAL &&
!lock->l_readers && !lock->l_writers) {
/* If this is a local lock on a server namespace and this was
unlock_res_and_lock(lock);
}
- LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */
-
EXIT;
}