-reprocess:
- if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
- /* This loop determines where this processes locks start
- * in the resource lr_granted list. */
- list_for_each(tmp, &res->lr_granted) {
- lock = list_entry(tmp, struct ldlm_lock, l_res_link);
- if (ldlm_same_flock_owner(lock, req)) {
- ownlocks = tmp;
- break;
- }
- }
- } else {
- lockmode_verify(mode);
-
- /* This loop determines if there are existing locks
- * that conflict with the new lock request. */
- list_for_each(tmp, &res->lr_granted) {
- lock = list_entry(tmp, struct ldlm_lock, l_res_link);
-
- if (ldlm_same_flock_owner(lock, req)) {
- if (!ownlocks)
- ownlocks = tmp;
- continue;
- }
-
- /* locks are compatible, overlap doesn't matter */
- if (lockmode_compat(lock->l_granted_mode, mode))
- continue;
-
- if (!ldlm_flocks_overlap(lock, req))
- continue;
-
- if (!first_enq)
- RETURN(LDLM_ITER_CONTINUE);
-
- if (*flags & LDLM_FL_BLOCK_NOWAIT) {
- ldlm_flock_destroy(req, mode, *flags);
- *err = -EAGAIN;
- RETURN(LDLM_ITER_STOP);
- }
-
- if (*flags & LDLM_FL_TEST_LOCK) {
- ldlm_flock_destroy(req, mode, *flags);
- req->l_req_mode = lock->l_granted_mode;
- req->l_policy_data.l_flock.pid =
- lock->l_policy_data.l_flock.pid;
- req->l_policy_data.l_flock.start =
- lock->l_policy_data.l_flock.start;
- req->l_policy_data.l_flock.end =
- lock->l_policy_data.l_flock.end;
- *flags |= LDLM_FL_LOCK_CHANGED;
- RETURN(LDLM_ITER_STOP);
- }
-
- if (ldlm_flock_deadlock(req, lock)) {
- ldlm_flock_destroy(req, mode, *flags);
- *err = -EDEADLK;
- RETURN(LDLM_ITER_STOP);
- }
-
- req->l_policy_data.l_flock.blocking_pid =
- lock->l_policy_data.l_flock.pid;
- req->l_policy_data.l_flock.blocking_export =
- (long)(void *)lock->l_export;
-
- LASSERT(list_empty(&req->l_flock_waitq));
- spin_lock(&ldlm_flock_waitq_lock);
- list_add_tail(&req->l_flock_waitq, &ldlm_flock_waitq);
- spin_unlock(&ldlm_flock_waitq_lock);
-
- ldlm_resource_add_lock(res, &res->lr_waiting, req);
- *flags |= LDLM_FL_BLOCK_GRANTED;
- RETURN(LDLM_ITER_STOP);
- }
- }
-
- if (*flags & LDLM_FL_TEST_LOCK) {
- ldlm_flock_destroy(req, mode, *flags);
- req->l_req_mode = LCK_NL;
- *flags |= LDLM_FL_LOCK_CHANGED;
- RETURN(LDLM_ITER_STOP);
- }
-
- /* In case we had slept on this lock request take it off of the
- * deadlock detection waitq. */
- spin_lock(&ldlm_flock_waitq_lock);
- list_del_init(&req->l_flock_waitq);
- spin_unlock(&ldlm_flock_waitq_lock);
-
- /* Scan the locks owned by this process that overlap this request.
- * We may have to merge or split existing locks. */
-
- if (!ownlocks)
- ownlocks = &res->lr_granted;
-
- list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
- lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);
-
- if (!ldlm_same_flock_owner(lock, new))
- break;
-
- if (lock->l_granted_mode == mode) {
- /* If the modes are the same then we need to process
- * locks that overlap OR adjoin the new lock. The extra
- * logic condition is necessary to deal with arithmetic
- * overflow and underflow. */
- if ((new->l_policy_data.l_flock.start >
- (lock->l_policy_data.l_flock.end + 1))
- && (lock->l_policy_data.l_flock.end !=
- OBD_OBJECT_EOF))
- continue;
-
- if ((new->l_policy_data.l_flock.end <
- (lock->l_policy_data.l_flock.start - 1))
- && (lock->l_policy_data.l_flock.start != 0))
- break;
-
- if (new->l_policy_data.l_flock.start <
- lock->l_policy_data.l_flock.start) {
- lock->l_policy_data.l_flock.start =
- new->l_policy_data.l_flock.start;
- } else {
- new->l_policy_data.l_flock.start =
- lock->l_policy_data.l_flock.start;
- }
-
- if (new->l_policy_data.l_flock.end >
- lock->l_policy_data.l_flock.end) {
- lock->l_policy_data.l_flock.end =
- new->l_policy_data.l_flock.end;
- } else {
- new->l_policy_data.l_flock.end =
- lock->l_policy_data.l_flock.end;
- }
-
- if (added) {
- ldlm_flock_destroy(lock, mode, *flags);
- } else {
- new = lock;
- added = 1;
- }
- continue;
- }
-
- if (new->l_policy_data.l_flock.start >
- lock->l_policy_data.l_flock.end)
- continue;
-
- if (new->l_policy_data.l_flock.end <
- lock->l_policy_data.l_flock.start)
- break;
-
- ++overlaps;
-
- if (new->l_policy_data.l_flock.start <=
- lock->l_policy_data.l_flock.start) {
- if (new->l_policy_data.l_flock.end <
- lock->l_policy_data.l_flock.end) {
- lock->l_policy_data.l_flock.start =
- new->l_policy_data.l_flock.end + 1;
- break;
- }
- ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
- continue;
- }
- if (new->l_policy_data.l_flock.end >=
- lock->l_policy_data.l_flock.end) {
- lock->l_policy_data.l_flock.end =
- new->l_policy_data.l_flock.start - 1;
- continue;
- }
-
- /* split the existing lock into two locks */
-
- /* if this is an F_UNLCK operation then we could avoid
- * allocating a new lock and use the req lock passed in
- * with the request but this would complicate the reply
- * processing since updates to req get reflected in the
- * reply. The client side replays the lock request so
- * it must see the original lock data in the reply. */
-
- /* XXX - if ldlm_lock_new() can sleep we should
- * release the ns_lock, allocate the new lock,
- * and restart processing this lock. */
- if (!new2) {
- unlock_res_and_lock(req);
- new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
- lock->l_granted_mode, &null_cbs,
- NULL, 0);
- lock_res_and_lock(req);
- if (!new2) {
- ldlm_flock_destroy(req, lock->l_granted_mode,
- *flags);
- *err = -ENOLCK;
- RETURN(LDLM_ITER_STOP);
- }
- goto reprocess;
- }
-
- splitted = 1;
-
- new2->l_granted_mode = lock->l_granted_mode;
- new2->l_policy_data.l_flock.pid =
- new->l_policy_data.l_flock.pid;
- new2->l_policy_data.l_flock.start =
- lock->l_policy_data.l_flock.start;
- new2->l_policy_data.l_flock.end =
- new->l_policy_data.l_flock.start - 1;
- lock->l_policy_data.l_flock.start =
- new->l_policy_data.l_flock.end + 1;
- new2->l_conn_export = lock->l_conn_export;
- if (lock->l_export != NULL) {
- new2->l_export = class_export_lock_get(lock->l_export);
- if (new2->l_export->exp_lock_hash &&
- hlist_unhashed(&new2->l_exp_hash))
- lustre_hash_add(new2->l_export->exp_lock_hash,
- &new2->l_remote_handle,
- &new2->l_exp_hash);
- }
- if (*flags == LDLM_FL_WAIT_NOREPROC)
- ldlm_lock_addref_internal_nolock(new2,
- lock->l_granted_mode);
-
- /* insert new2 at lock */
- ldlm_resource_add_lock(res, ownlocks, new2);
- LDLM_LOCK_RELEASE(new2);
- break;
- }
-
- /* if new2 is created but never used, destroy it*/
- if (splitted == 0 && new2 != NULL)
- ldlm_lock_destroy_nolock(new2);
-
- /* At this point we're granting the lock request. */
- req->l_granted_mode = req->l_req_mode;
-
- /* Add req to the granted queue before calling ldlm_reprocess_all(). */
- if (!added) {
- list_del_init(&req->l_res_link);
- /* insert new lock before ownlocks in list. */
- ldlm_resource_add_lock(res, ownlocks, req);
- }
-
- if (*flags != LDLM_FL_WAIT_NOREPROC) {
- if (first_enq) {
- /* If this is an unlock, reprocess the waitq and
- * send completions ASTs for locks that can now be
- * granted. The only problem with doing this
- * reprocessing here is that the completion ASTs for
- * newly granted locks will be sent before the unlock
- * completion is sent. It shouldn't be an issue. Also
- * note that ldlm_process_flock_lock() will recurse,
- * but only once because first_enq will be false from
- * ldlm_reprocess_queue. */
- if ((mode == LCK_NL) && overlaps) {
- CFS_LIST_HEAD(rpc_list);
- int rc;
-restart:
- ldlm_reprocess_queue(res, &res->lr_waiting,
- &rpc_list);
-
- unlock_res_and_lock(req);
- rc = ldlm_run_ast_work(&rpc_list,
- LDLM_WORK_CP_AST);
- lock_res_and_lock(req);
- if (rc == -ERESTART)
- GOTO(restart, -ERESTART);
- }
- } else {
- LASSERT(req->l_completion_ast);
- ldlm_add_ast_work_item(req, NULL, work_list);
- }
- }
-
- /* In case we're reprocessing the requested lock we can't destroy
- * it until after calling ldlm_ast_work_item() above so that lawi()
- * can bump the reference count on req. Otherwise req could be freed
- * before the completion AST can be sent. */
- if (added)
- ldlm_flock_destroy(req, mode, *flags);
-
- ldlm_resource_dump(D_INFO, res);
- RETURN(LDLM_ITER_CONTINUE);