Whamcloud - gitweb
Fixes to pass Posix record locking tests.
authordmilos <dmilos>
Sun, 21 Sep 2003 16:31:00 +0000 (16:31 +0000)
committerdmilos <dmilos>
Sun, 21 Sep 2003 16:31:00 +0000 (16:31 +0000)
lustre/ldlm/ldlm_flock.c

index 3f8d079..fd29f47 100644 (file)
@@ -71,6 +71,9 @@ ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags)
 {
         ENTRY;
 
+        LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%x)",
+                   mode, flags);
+
         list_del_init(&lock->l_res_link);
         if (flags == LDLM_FL_WAIT_NOREPROC) {
                 /* client side - set a flag to prevent sending a CANCEL */
@@ -171,11 +174,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                                 *err = -EAGAIN;
                                 RETURN(LDLM_ITER_STOP);
                         }
-                        if (ldlm_flock_deadlock(req, lock)) {
-                                ldlm_flock_destroy(req, mode, *flags);
-                                *err = -EDEADLK;
-                                RETURN(LDLM_ITER_STOP);
-                        }
+
                         if (*flags & LDLM_FL_TEST_LOCK) {
                                 ldlm_flock_destroy(req, mode, *flags);
                                 req->l_req_mode = lock->l_granted_mode;
@@ -189,6 +188,12 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                                 RETURN(LDLM_ITER_STOP);
                         }
 
+                        if (ldlm_flock_deadlock(req, lock)) {
+                                ldlm_flock_destroy(req, mode, *flags);
+                                *err = -EDEADLK;
+                                RETURN(LDLM_ITER_STOP);
+                        }
+
                         req->l_policy_data.l_flock.blocking_pid =
                                 lock->l_policy_data.l_flock.pid;
                         req->l_policy_data.l_flock.blocking_export =
@@ -223,29 +228,37 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                         break;
 
                 if (lock->l_granted_mode == mode) {
-                        if (lock->l_policy_data.l_flock.end <
-                            (new->l_policy_data.l_flock.start - 1))
+                        /* If the modes are the same then we need to process
+                         * locks that overlap OR adjoin the new lock. The extra
+                         * logic condition is necessary to deal with arithmetic
+                         * overflow and underflow. */
+                        if ((new->l_policy_data.l_flock.start >
+                             (lock->l_policy_data.l_flock.end + 1))
+                            && (lock->l_policy_data.l_flock.end != ~0))
                                 continue;
 
-                        if (lock->l_policy_data.l_flock.start >
-                            (new->l_policy_data.l_flock.end + 1))
+                        if ((new->l_policy_data.l_flock.end <
+                             (lock->l_policy_data.l_flock.start - 1))
+                            && (lock->l_policy_data.l_flock.start != 0))
                                 break;
 
-                        if (lock->l_policy_data.l_flock.start >
-                            new->l_policy_data.l_flock.start)
+                        if (new->l_policy_data.l_flock.start <
+                            lock->l_policy_data.l_flock.start) {
                                 lock->l_policy_data.l_flock.start =
                                         new->l_policy_data.l_flock.start;
-                        else
+                        } else {
                                 new->l_policy_data.l_flock.start =
                                         lock->l_policy_data.l_flock.start;
+                        }
 
-                        if (lock->l_policy_data.l_flock.end <
-                            new->l_policy_data.l_flock.end)
+                        if (new->l_policy_data.l_flock.end >
+                            lock->l_policy_data.l_flock.end) {
                                 lock->l_policy_data.l_flock.end =
                                         new->l_policy_data.l_flock.end;
-                        else
+                        } else {
                                 new->l_policy_data.l_flock.end =
                                         lock->l_policy_data.l_flock.end;
+                        }
 
                         if (added) {
                                 ldlm_flock_destroy(lock, mode, *flags);
@@ -256,11 +269,12 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                         continue;
                 }
 
-                if (lock->l_policy_data.l_flock.end <
-                    new->l_policy_data.l_flock.start)
+                if (new->l_policy_data.l_flock.start >
+                    lock->l_policy_data.l_flock.end)
                         continue;
-                if (lock->l_policy_data.l_flock.start >
-                    new->l_policy_data.l_flock.end)
+
+                if (new->l_policy_data.l_flock.end <
+                    lock->l_policy_data.l_flock.start)
                         break;
 
                 ++overlaps;
@@ -272,17 +286,8 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                                 lock->l_policy_data.l_flock.start =
                                         new->l_policy_data.l_flock.end + 1;
                                 break;
-                        } else if (added) {
-                                ldlm_flock_destroy(lock, lock->l_req_mode,
-                                                   *flags);
-                        } else {
-                                lock->l_policy_data.l_flock.start =
-                                        new->l_policy_data.l_flock.start;
-                                lock->l_policy_data.l_flock.end =
-                                        new->l_policy_data.l_flock.end;
-                                new = lock;
-                                added = 1;
                         }
+                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
                         continue;
                 }
                 if (new->l_policy_data.l_flock.end >=
@@ -336,20 +341,19 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                 break;
         }
 
-        if (added) {
-                ldlm_flock_destroy(req, mode, *flags);
-        } else {
-                /* insert new after ownlocks */
-                new->l_granted_mode = new->l_req_mode;
-                list_del_init(&new->l_res_link);
-                ldlm_resource_add_lock(res, ownlocks, new);
+        /* Add req to the granted queue before calling ldlm_reprocess_all(). */
+        if (!added) {
+                req->l_granted_mode = req->l_req_mode;
+                list_del_init(&req->l_res_link);
+                /* insert new lock before ownlocks in list. */
+                ldlm_resource_add_lock(res, ownlocks, req);
         }
 
         if (*flags != LDLM_FL_WAIT_NOREPROC) {
                 if (first_enq) {
                         /* The only problem with doing the reprocessing here
                          * is that the completion ASTs for newly granted locks
-                         * will be sent before the unlock completion is sent.
+                         * will be sent before the unlock completion is sent.
                          * It shouldn't be an issue. Also note that
                          * ldlm_process_flock_lock() will recurse, but only
                          * once because there can't be unlock requests on the
@@ -362,26 +366,44 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                 }
         }
 
+        /* In case we're reprocessing the requested lock we can't destroy
+         * it until after calling ldlm_ast_work_item() above so that lawi()
+         * can bump the reference count on req. Otherwise req could be freed
+         * before the completion AST can be sent.  */
+        if (added)
+                ldlm_flock_destroy(req, mode, *flags);
+
         ldlm_resource_dump(res);
         RETURN(LDLM_ITER_CONTINUE);
 }
 
-static void
-interrupted_flock_completion_wait(void *data)
-{
-}
-
-struct flock_wait_data {
+struct ldlm_flock_wait_data {
         struct ldlm_lock *fwd_lock;
         int               fwd_generation;
 };
 
+static void
+ldlm_flock_interrupted_wait(void *data)
+{
+        struct ldlm_lock *lock;
+        struct lustre_handle lockh;
+        int rc;
+        ENTRY;
+
+        lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;
+        ldlm_lock_decref_internal(lock, lock->l_req_mode);
+        ldlm_lock2handle(lock, &lockh);
+        rc = ldlm_cli_cancel(&lockh);
+        CDEBUG(D_DLMTRACE, "ldlm_cli_cancel: %d\n", rc);
+        EXIT;
+}
+
 int
 ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 {
         struct ldlm_namespace *ns;
         struct file_lock *getlk = lock->l_ast_data;
-        struct flock_wait_data fwd;
+        struct ldlm_flock_wait_data fwd;
         unsigned long irqflags;
         struct obd_device *obd;
         struct obd_import *imp = NULL;
@@ -422,22 +444,21 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 spin_unlock_irqrestore(&imp->imp_lock, irqflags);
         }
 
-        lwi = LWI_TIMEOUT_INTR(0,NULL,interrupted_flock_completion_wait,&fwd);
+        lwi = LWI_TIMEOUT_INTR(0,NULL,ldlm_flock_interrupted_wait,&fwd);
 
         /* Go to sleep until the lock is granted. */
         rc = l_wait_event(lock->l_waitq,
                           ((lock->l_req_mode == lock->l_granted_mode) ||
                            lock->l_destroyed), &lwi);
 
-        LASSERT(!(lock->l_destroyed));
-
         if (rc) {
                 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                            rc);
-                /* XXX - need to cancel flock request on server */
                 RETURN(rc);
         }
 
+        LASSERT(!(lock->l_destroyed));
+
 granted:
 
         LDLM_DEBUG(lock, "client-side enqueue waking up");