+ * Process a granting attempt for flock lock.
+ * Must be called under ns lock held.
+ *
+ * This function looks for any conflicts for \a lock in the granted or
+ * waiting queues. The lock is granted if no conflicts are found in
+ * either queue.
+ */
+int
+ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
+ enum ldlm_process_intention intention,
+ enum ldlm_error *err, struct list_head *work_list)
+{
+ struct ldlm_resource *res = req->l_resource;
+ struct ldlm_namespace *ns = ldlm_res_to_ns(res);
+ struct list_head *tmp;
+ struct list_head *ownlocks = NULL;
+ struct ldlm_lock *lock = NULL;
+ struct ldlm_lock *new = req;
+ struct ldlm_lock *new2 = NULL;
+ enum ldlm_mode mode = req->l_req_mode;
+ int local = ns_is_client(ns);
+ int added = (mode == LCK_NL);
+ int overlaps = 0;
+ int splitted = 0;
+ const struct ldlm_callback_suite null_cbs = { NULL };
+#ifdef HAVE_SERVER_SUPPORT
+ struct list_head *grant_work = (intention == LDLM_PROCESS_ENQUEUE ?
+ NULL : work_list);
+#endif
+ ENTRY;
+
+ CDEBUG(D_DLMTRACE, "flags %#llx owner %llu pid %u mode %u start "
+ "%llu end %llu\n", *flags,
+ new->l_policy_data.l_flock.owner,
+ new->l_policy_data.l_flock.pid, mode,
+ req->l_policy_data.l_flock.start,
+ req->l_policy_data.l_flock.end);
+
+ *err = ELDLM_OK;
+
+ if (local) {
+ /* No blocking ASTs are sent to the clients for
+ * Posix file & record locks
+ */
+ req->l_blocking_ast = NULL;
+ } else {
+ /* Called on the server for lock cancels. */
+ req->l_blocking_ast = ldlm_flock_blocking_ast;
+ }
+
+reprocess:
+ if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
+ /* This loop determines where this processes locks start
+ * in the resource lr_granted list.
+ */
+ list_for_each(tmp, &res->lr_granted) {
+ lock = list_entry(tmp, struct ldlm_lock,
+ l_res_link);
+ if (ldlm_same_flock_owner(lock, req)) {
+ ownlocks = tmp;
+ break;
+ }
+ }
+ }
+#ifdef HAVE_SERVER_SUPPORT
+ else {
+ int reprocess_failed = 0;
+ lockmode_verify(mode);
+
+ /* This loop determines if there are existing locks
+ * that conflict with the new lock request.
+ */
+ list_for_each(tmp, &res->lr_granted) {
+ lock = list_entry(tmp, struct ldlm_lock,
+ l_res_link);
+
+ if (ldlm_same_flock_owner(lock, req)) {
+ if (!ownlocks)
+ ownlocks = tmp;
+ continue;
+ }
+
+ /* locks are compatible, overlap doesn't matter */
+ if (lockmode_compat(lock->l_granted_mode, mode))
+ continue;
+
+ if (!ldlm_flocks_overlap(lock, req))
+ continue;
+
+ if (intention != LDLM_PROCESS_ENQUEUE) {
+ reprocess_failed = 1;
+ if (ldlm_flock_deadlock(req, lock)) {
+ ldlm_flock_cancel_on_deadlock(
+ req, grant_work);
+ RETURN(LDLM_ITER_CONTINUE);
+ }
+ continue;
+ }
+
+ if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+ ldlm_flock_destroy(req, mode, *flags);
+ *err = -EAGAIN;
+ RETURN(LDLM_ITER_STOP);
+ }
+
+ if (*flags & LDLM_FL_TEST_LOCK) {
+ ldlm_flock_destroy(req, mode, *flags);
+ req->l_req_mode = lock->l_granted_mode;
+ req->l_policy_data.l_flock.pid =
+ lock->l_policy_data.l_flock.pid;
+ req->l_policy_data.l_flock.start =
+ lock->l_policy_data.l_flock.start;
+ req->l_policy_data.l_flock.end =
+ lock->l_policy_data.l_flock.end;
+ *flags |= LDLM_FL_LOCK_CHANGED;
+ RETURN(LDLM_ITER_STOP);
+ }
+
+ /* add lock to blocking list before deadlock
+ * check to prevent race
+ */
+ ldlm_flock_blocking_link(req, lock);
+
+ if (ldlm_flock_deadlock(req, lock)) {
+ ldlm_flock_blocking_unlink(req);
+ ldlm_flock_destroy(req, mode, *flags);
+ *err = -EDEADLK;
+ RETURN(LDLM_ITER_STOP);
+ }
+
+ ldlm_resource_add_lock(res, &res->lr_waiting, req);
+ *flags |= LDLM_FL_BLOCK_GRANTED;
+ RETURN(LDLM_ITER_STOP);
+ }
+ if (reprocess_failed)
+ RETURN(LDLM_ITER_CONTINUE);
+ }
+
+ if (*flags & LDLM_FL_TEST_LOCK) {
+ ldlm_flock_destroy(req, mode, *flags);
+ req->l_req_mode = LCK_NL;
+ *flags |= LDLM_FL_LOCK_CHANGED;
+ RETURN(LDLM_ITER_STOP);
+ }
+
+ /* In case we had slept on this lock request take it off of the
+ * deadlock detection hash list.
+ */
+ ldlm_flock_blocking_unlink(req);
+#endif /* HAVE_SERVER_SUPPORT */
+
+ /* Scan the locks owned by this process that overlap this request.
+ * We may have to merge or split existing locks.
+ */
+ if (!ownlocks)
+ ownlocks = &res->lr_granted;
+
+ list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
+ lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);
+
+ if (!ldlm_same_flock_owner(lock, new))
+ break;
+
+ if (lock->l_granted_mode == mode) {
+ /* If the modes are the same then we need to process
+ * locks that overlap OR adjoin the new lock. The extra
+ * logic condition is necessary to deal with arithmetic
+ * overflow and underflow.
+ */
+ if ((new->l_policy_data.l_flock.start >
+ (lock->l_policy_data.l_flock.end + 1))
+ && (lock->l_policy_data.l_flock.end !=
+ OBD_OBJECT_EOF))
+ continue;
+
+ if ((new->l_policy_data.l_flock.end <
+ (lock->l_policy_data.l_flock.start - 1))
+ && (lock->l_policy_data.l_flock.start != 0))
+ break;
+
+ if (new->l_policy_data.l_flock.start <
+ lock->l_policy_data.l_flock.start) {
+ lock->l_policy_data.l_flock.start =
+ new->l_policy_data.l_flock.start;
+ } else {
+ new->l_policy_data.l_flock.start =
+ lock->l_policy_data.l_flock.start;
+ }
+
+ if (new->l_policy_data.l_flock.end >
+ lock->l_policy_data.l_flock.end) {
+ lock->l_policy_data.l_flock.end =
+ new->l_policy_data.l_flock.end;
+ } else {
+ new->l_policy_data.l_flock.end =
+ lock->l_policy_data.l_flock.end;
+ }
+
+ if (added) {
+ ldlm_flock_destroy(lock, mode, *flags);
+ } else {
+ new = lock;
+ added = 1;
+ }
+ continue;
+ }
+
+ if (new->l_policy_data.l_flock.start >
+ lock->l_policy_data.l_flock.end)
+ continue;
+
+ if (new->l_policy_data.l_flock.end <
+ lock->l_policy_data.l_flock.start)
+ break;
+
+ ++overlaps;
+
+ if (new->l_policy_data.l_flock.start <=
+ lock->l_policy_data.l_flock.start) {
+ if (new->l_policy_data.l_flock.end <
+ lock->l_policy_data.l_flock.end) {
+ lock->l_policy_data.l_flock.start =
+ new->l_policy_data.l_flock.end + 1;
+ break;
+ }
+ ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
+ continue;
+ }
+ if (new->l_policy_data.l_flock.end >=
+ lock->l_policy_data.l_flock.end) {
+ lock->l_policy_data.l_flock.end =
+ new->l_policy_data.l_flock.start - 1;
+ continue;
+ }
+
+ /* split the existing lock into two locks */
+
+ /* if this is an F_UNLCK operation then we could avoid
+ * allocating a new lock and use the req lock passed in
+ * with the request but this would complicate the reply
+ * processing since updates to req get reflected in the
+ * reply. The client side replays the lock request so
+ * it must see the original lock data in the reply.
+ */
+
+ /* XXX - if ldlm_lock_new() can sleep we should
+ * release the lr_lock, allocate the new lock,
+ * and restart processing this lock.
+ */
+ if (new2 == NULL) {
+ unlock_res_and_lock(req);
+ new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
+ lock->l_granted_mode, &null_cbs,
+ NULL, 0, LVB_T_NONE);
+ lock_res_and_lock(req);
+ if (IS_ERR(new2)) {
+ ldlm_flock_destroy(req, lock->l_granted_mode,
+ *flags);
+ *err = PTR_ERR(new2);
+ RETURN(LDLM_ITER_STOP);
+ }
+ goto reprocess;
+ }
+
+ splitted = 1;
+
+ new2->l_granted_mode = lock->l_granted_mode;
+ new2->l_policy_data.l_flock.pid =
+ new->l_policy_data.l_flock.pid;
+ new2->l_policy_data.l_flock.owner =
+ new->l_policy_data.l_flock.owner;
+ new2->l_policy_data.l_flock.start =
+ lock->l_policy_data.l_flock.start;
+ new2->l_policy_data.l_flock.end =
+ new->l_policy_data.l_flock.start - 1;
+ lock->l_policy_data.l_flock.start =
+ new->l_policy_data.l_flock.end + 1;
+ new2->l_conn_export = lock->l_conn_export;
+ if (lock->l_export != NULL) {
+ new2->l_export = class_export_lock_get(lock->l_export,
+ new2);
+ if (new2->l_export->exp_lock_hash &&
+ hlist_unhashed(&new2->l_exp_hash))
+ cfs_hash_add(new2->l_export->exp_lock_hash,
+ &new2->l_remote_handle,
+ &new2->l_exp_hash);
+ }
+ if (*flags == LDLM_FL_WAIT_NOREPROC)
+ ldlm_lock_addref_internal_nolock(new2,
+ lock->l_granted_mode);
+
+ /* insert new2 at lock */
+ ldlm_resource_add_lock(res, ownlocks, new2);
+ LDLM_LOCK_RELEASE(new2);
+ break;
+ }
+
+ /* if new2 is created but never used, destroy it*/
+ if (splitted == 0 && new2 != NULL)
+ ldlm_lock_destroy_nolock(new2);
+
+ /* At this point we're granting the lock request. */
+ req->l_granted_mode = req->l_req_mode;
+
+ /* Add req to the granted queue before calling ldlm_reprocess_all(). */
+ if (!added) {
+ list_del_init(&req->l_res_link);
+ /* insert new lock before ownlocks in list. */
+ ldlm_resource_add_lock(res, ownlocks, req);
+ }
+
+ if (*flags != LDLM_FL_WAIT_NOREPROC) {
+#ifdef HAVE_SERVER_SUPPORT
+ if (intention == LDLM_PROCESS_ENQUEUE) {
+ /* If this is an unlock, reprocess the waitq and
+ * send completions ASTs for locks that can now be
+ * granted. The only problem with doing this
+ * reprocessing here is that the completion ASTs for
+ * newly granted locks will be sent before the unlock
+ * completion is sent. It shouldn't be an issue. Also
+ * note that ldlm_process_flock_lock() will recurse,
+ * but only once because 'intention' won't be
+ * LDLM_PROCESS_ENQUEUE from ldlm_reprocess_queue.
+ */
+ if ((mode == LCK_NL) && overlaps) {
+ LIST_HEAD(rpc_list);
+ int rc;
+
+restart:
+ ldlm_reprocess_queue(res, &res->lr_waiting,
+ &rpc_list,
+ LDLM_PROCESS_RESCAN, NULL);
+
+ unlock_res_and_lock(req);
+ rc = ldlm_run_ast_work(ns, &rpc_list,
+ LDLM_WORK_CP_AST);
+ lock_res_and_lock(req);
+ if (rc == -ERESTART)
+ GOTO(restart, rc);
+ }
+ } else {
+ LASSERT(req->l_completion_ast);
+ ldlm_add_ast_work_item(req, NULL, grant_work);
+ }
+#else /* !HAVE_SERVER_SUPPORT */
+ /* The only one possible case for client-side calls flock
+ * policy function is ldlm_flock_completion_ast inside which
+ * carries LDLM_FL_WAIT_NOREPROC flag.
+ */
+ CERROR("Illegal parameter for client-side-only module.\n");
+ LBUG();
+#endif /* HAVE_SERVER_SUPPORT */
+ }
+
+ /* In case we're reprocessing the requested lock we can't destroy
+ * it until after calling ldlm_add_ast_work_item() above so that laawi()
+ * can bump the reference count on \a req. Otherwise \a req
+ * could be freed before the completion AST can be sent.
+ */
+ if (added)
+ ldlm_flock_destroy(req, mode, *flags);
+
+ ldlm_resource_dump(D_INFO, res);
+ RETURN(LDLM_ITER_CONTINUE);
+}
+
+/**
+ * Flock completion callback function.