Whamcloud - gitweb
Bug# 782 - address code review comments and enable Posix locking file op.
authordmilos <dmilos>
Wed, 17 Sep 2003 02:07:00 +0000 (02:07 +0000)
committerdmilos <dmilos>
Wed, 17 Sep 2003 02:07:00 +0000 (02:07 +0000)
lustre/ldlm/ldlm_flock.c
lustre/ldlm/ldlm_plain.c

index b12e69b..b0123a3 100644 (file)
 #include "ldlm_internal.h"
 
 #define l_flock_waitq   l_lru
-#define l_flock_blocker l_parent
 
 static struct list_head ldlm_flock_waitq = LIST_HEAD_INIT(ldlm_flock_waitq);
 
+/**
+ * list_for_remaining_safe - iterate over the remaining entries in a list
+ *              and safeguard against removal of a list entry.
+ * @pos:        the &struct list_head to use as a loop counter. pos MUST
+ *              have been initialized prior to using it in this macro.
+ * @n:          another &struct list_head to use as temporary storage
+ * @head:       the head for your list.
+ */
+#define list_for_remaining_safe(pos, n, head) \
+        for (n = pos->next; pos != (head); pos = n, n = pos->next)
+
 static inline int
 ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
 {
-        if ((new->l_policy_data.l_flock.pid ==
-             lock->l_policy_data.l_flock.pid) &&
-            (new->l_export == lock->l_export))
-                return 1;
-        else
-                return 0;
+        return((new->l_policy_data.l_flock.pid ==
+                lock->l_policy_data.l_flock.pid) &&
+               (new->l_export == lock->l_export));
 }
 
 static inline int
 ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
 {
-        if ((new->l_policy_data.l_flock.start <=
-             lock->l_policy_data.l_flock.end) &&
-            (new->l_policy_data.l_flock.end >=
-             lock->l_policy_data.l_flock.start))
-                return 1;
-        else
-                return 0;
+        return((new->l_policy_data.l_flock.start <=
+                lock->l_policy_data.l_flock.end) &&
+               (new->l_policy_data.l_flock.end >=
+                lock->l_policy_data.l_flock.start));
 }
 
 static inline void
-ldlm_flock_destroy(struct ldlm_lock *lock, int flags)
+ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags)
 {
         ENTRY;
 
         list_del_init(&lock->l_res_link);
         if (flags == LDLM_FL_WAIT_NOREPROC) {
-                /* client side */
-                struct lustre_handle lockh;
-
-                /* Set a flag to prevent us from sending a CANCEL */
-                lock->l_flags |= LDLM_FL_LOCAL_ONLY;
-
-                ldlm_lock2handle(lock, &lockh);
-                ldlm_lock_decref_and_cancel(&lockh, lock->l_granted_mode);
+                /* client side - set a flag to prevent sending a CANCEL */
+                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
+                ldlm_lock_decref_internal(lock, mode);
         }
 
         ldlm_lock_destroy(lock);
         EXIT;
 }
 
-#if 0
 static int
-ldlm_flock_deadlock(struct ldlm_lock *waiter, struct ldlm_lock *blocker)
+ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *blocking_lock)
 {
-       struct list_head *tmp;
-       struct ldlm_lock *lock;
-       struct obd_export *waiter_export;
-       struct obd_export *blocker_export;
-       pid_t waiter_pid;
-       pid_t blocker_pid;
-
-       waiter_export = waiter->l_export;
-       waiter_pid = waiter->l_policy_data.l_flock.pid;
-       blocker_export = blocker->l_export;
-       blocker_pid = blocker->l_policy_data.l_flock.pid;
-
-next_task:
-       if (waiter_export == blocker_export && waiter_pid == blocker_pid)
-               return 1;
-
-       list_for_each(tmp, &ldlm_flock_waitq) {
-
-               lock = list_entry(tmp, struct ldlm_lock, l_flock_waitq);
-               if ((lock->l_export == blocker_export)
-                   && (lock->l_policy_data.l_flock.pid == blocker_pid)) {
-                       lock = lock->l_flock_blocker;
-                       blocker_export = lock->l_export;
-                       blocker_pid = lock->l_policy_data.l_flock.pid;
-                       goto next_task;
-               }
-       }
-       return 0;
+        struct obd_export *req_export = req->l_export;
+        struct obd_export *blocking_export = blocking_lock->l_export;
+        pid_t req_pid = req->l_policy_data.l_flock.pid;
+        pid_t blocking_pid = blocking_lock->l_policy_data.l_flock.pid;
+        struct ldlm_lock *lock;
+
+restart:
+        list_for_each_entry(lock, &ldlm_flock_waitq, l_flock_waitq) {
+                if ((lock->l_policy_data.l_flock.pid != blocking_pid) ||
+                    (lock->l_export != blocking_export))
+                        continue;
+
+                blocking_pid = lock->l_policy_data.l_flock.blocking_pid;
+                blocking_export = lock->l_policy_data.l_flock.blocking_export;
+                if (blocking_pid == req_pid && blocking_export == req_export)
+                        return 1;
+
+                goto restart;
+        }
+
+        return 0;
 }
-#endif
 
 int
-ldlm_flock_enqueue(struct ldlm_lock *req, int *flags, int first_enq,
-                   ldlm_error_t *err)
+ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
+                        ldlm_error_t *err)
 {
-        struct ldlm_lock *new = req;
-        struct ldlm_lock *new2 = NULL;
-        struct ldlm_lock *lock = NULL;
         struct ldlm_resource *res = req->l_resource;
         struct ldlm_namespace *ns = res->lr_namespace;
         struct list_head *tmp;
-        struct list_head *ownlocks;
+        struct list_head *ownlocks = NULL;
+        struct ldlm_lock *lock = NULL;
+        struct ldlm_lock *new = req;
+        struct ldlm_lock *new2 = NULL;
         ldlm_mode_t mode = req->l_req_mode;
-        int added = 0;
+        int added = (mode == LCK_NL);
         int overlaps = 0;
         ENTRY;
 
-        CDEBUG(D_DLMTRACE, "flags: 0x%x pid: %d mode: %d start: %llu end: %llu\n",
-               *flags, new->l_policy_data.l_flock.pid, mode,
+        CDEBUG(D_DLMTRACE, "flags: 0x%x pid: %d mode: %d start: %llu end: %llu"
+               "\n", *flags, new->l_policy_data.l_flock.pid, mode,
                req->l_policy_data.l_flock.start,
                req->l_policy_data.l_flock.end);
 
         *err = ELDLM_OK;
 
-        /* No blocking ASTs are sent for record locks */
+        /* No blocking ASTs are sent for Posix file & record locks */
         req->l_blocking_ast = NULL;
 
-        ownlocks = NULL;
-       if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
-                CDEBUG(D_DLMTRACE, "starting loop1.\n");
+        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
+                /* This loop determines where this processes locks start
+                 * in the resource lr_granted list. */
                 list_for_each(tmp, &res->lr_granted) {
                         lock = list_entry(tmp, struct ldlm_lock, l_res_link);
-
-                        CDEBUG(D_DLMTRACE, "loop1 granted: %p tmp: %p\n",
-                               &res->lr_granted, tmp);
-
                         if (ldlm_same_flock_owner(lock, req)) {
                                 ownlocks = tmp;
                                 break;
                         }
                 }
-                CDEBUG(D_DLMTRACE, "loop1 end.\n");
         } else {
-                CDEBUG(D_DLMTRACE, "starting loop2.\n");
+                /* This loop determines if there are existing locks
+                 * that conflict with the new lock request. */
                 list_for_each(tmp, &res->lr_granted) {
                         lock = list_entry(tmp, struct ldlm_lock, l_res_link);
 
-                        CDEBUG(D_DLMTRACE, "loop2 granted: %p tmp: %p\n",
-                               &res->lr_granted, tmp);
-
                         if (ldlm_same_flock_owner(lock, req)) {
                                 if (!ownlocks)
                                         ownlocks = tmp;
@@ -180,98 +163,98 @@ ldlm_flock_enqueue(struct ldlm_lock *req, int *flags, int first_enq,
                         if (!ldlm_flocks_overlap(lock, req))
                                 continue;
 
-#if 0
-                        if ((*flags & LDLM_FL_BLOCK_NOWAIT) ||
-                            (first_enq && ldlm_flock_deadlock(req, lock))) {
-#else
+                        if (!first_enq)
+                                RETURN(LDLM_ITER_CONTINUE);
+
                         if (*flags & LDLM_FL_BLOCK_NOWAIT) {
-#endif
-                                ldlm_flock_destroy(req, *flags);
-                                *err = ELDLM_LOCK_ABORTED;
+                                ldlm_flock_destroy(req, mode, *flags);
+                                *err = -EAGAIN;
+                                RETURN(LDLM_ITER_STOP);
+                        }
+                        if (ldlm_flock_deadlock(req, lock)) {
+                                ldlm_flock_destroy(req, mode, *flags);
+                                *err = -EDEADLK;
                                 RETURN(LDLM_ITER_STOP);
                         }
-
                         if (*flags & LDLM_FL_TEST_LOCK) {
-                                req->l_granted_mode = lock->l_granted_mode;
+                                ldlm_flock_destroy(req, mode, *flags);
+                                req->l_req_mode = lock->l_granted_mode;
                                 req->l_policy_data.l_flock.pid =
                                         lock->l_policy_data.l_flock.pid;
                                 req->l_policy_data.l_flock.start =
                                         lock->l_policy_data.l_flock.start;
                                 req->l_policy_data.l_flock.end =
                                         lock->l_policy_data.l_flock.end;
-                                ldlm_flock_destroy(req, *flags);
+                                *flags |= LDLM_FL_LOCK_CHANGED;
                                 RETURN(LDLM_ITER_STOP);
                         }
 
-                        req->l_flock_blocker = lock;
-                        list_add_tail(&ldlm_flock_waitq, &req->l_flock_waitq);
+                        req->l_policy_data.l_flock.blocking_pid =
+                                lock->l_policy_data.l_flock.pid;
+                        req->l_policy_data.l_flock.blocking_export =
+                                lock->l_export;
+
+                        LASSERT(list_empty(&req->l_flock_waitq));
+                        list_add_tail(&req->l_flock_waitq, &ldlm_flock_waitq);
+
+                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
                         *flags |= LDLM_FL_BLOCK_GRANTED;
-                        RETURN(LDLM_ITER_CONTINUE);
+                        RETURN(LDLM_ITER_STOP);
                 }
-                CDEBUG(D_DLMTRACE, "loop2 end.\n");
         }
 
         if (*flags & LDLM_FL_TEST_LOCK) {
-                LASSERT(first_enq);
-                req->l_granted_mode = req->l_req_mode;
+                ldlm_flock_destroy(req, mode, *flags);
+                req->l_req_mode = LCK_NL;
+                *flags |= LDLM_FL_LOCK_CHANGED;
                 RETURN(LDLM_ITER_STOP);
         }
 
-        added = (mode == LCK_NL);
-
-        /* Insert the new lock into the list */
+        /* Scan the locks owned by this process that overlap this request.
+         * We may have to merge or split existing locks. */
 
         if (!ownlocks)
                 ownlocks = &res->lr_granted;
 
-        CDEBUG(D_DLMTRACE, "granted: %p ownlocks: %p\n",
-               &res->lr_granted, ownlocks);
-
-        CDEBUG(D_DLMTRACE, "starting loop3.\n");
-        for (tmp = ownlocks->next; ownlocks != &res->lr_granted;
-             ownlocks = tmp, tmp = ownlocks->next) {
-
-                CDEBUG(D_DLMTRACE, "loop3 granted: %p ownlocks: %p\n",
-                       &res->lr_granted, ownlocks);
-
+        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
                 lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);
 
                 if (!ldlm_same_flock_owner(lock, new))
                         break;
 
-               if (lock->l_granted_mode == mode) {
-                       if (lock->l_policy_data.l_flock.end <
+                if (lock->l_granted_mode == mode) {
+                        if (lock->l_policy_data.l_flock.end <
                             (new->l_policy_data.l_flock.start - 1))
-                               continue;
+                                continue;
 
-                       if (lock->l_policy_data.l_flock.start >
+                        if (lock->l_policy_data.l_flock.start >
                             (new->l_policy_data.l_flock.end + 1))
-                               break;
+                                break;
 
-                       if (lock->l_policy_data.l_flock.start >
+                        if (lock->l_policy_data.l_flock.start >
                             new->l_policy_data.l_flock.start)
-                               lock->l_policy_data.l_flock.start =
+                                lock->l_policy_data.l_flock.start =
                                         new->l_policy_data.l_flock.start;
-                       else
-                               new->l_policy_data.l_flock.start =
+                        else
+                                new->l_policy_data.l_flock.start =
                                         lock->l_policy_data.l_flock.start;
 
-                       if (lock->l_policy_data.l_flock.end <
+                        if (lock->l_policy_data.l_flock.end <
                             new->l_policy_data.l_flock.end)
-                               lock->l_policy_data.l_flock.end =
+                                lock->l_policy_data.l_flock.end =
                                         new->l_policy_data.l_flock.end;
-                       else
-                               new->l_policy_data.l_flock.end =
+                        else
+                                new->l_policy_data.l_flock.end =
                                         lock->l_policy_data.l_flock.end;
 
-                       if (added) {
-                                ldlm_flock_destroy(lock, *flags);
-                       } else {
+                        if (added) {
+                                ldlm_flock_destroy(lock, mode, *flags);
+                        } else {
                                 new = lock;
                                 added = 1;
                         }
                         continue;
-               }
+                }
 
                 if (lock->l_policy_data.l_flock.end <
                     new->l_policy_data.l_flock.start)
@@ -290,7 +273,8 @@ ldlm_flock_enqueue(struct ldlm_lock *req, int *flags, int first_enq,
                                         new->l_policy_data.l_flock.end + 1;
                                 break;
                         } else if (added) {
-                                ldlm_flock_destroy(lock, *flags);
+                                ldlm_flock_destroy(lock, lock->l_req_mode,
+                                                   *flags);
                         } else {
                                 lock->l_policy_data.l_flock.start =
                                         new->l_policy_data.l_flock.start;
@@ -314,18 +298,18 @@ ldlm_flock_enqueue(struct ldlm_lock *req, int *flags, int first_enq,
                  * allocating a new lock and use the req lock passed in
                  * with the request but this would complicate the reply
                  * processing since updates to req get reflected in the
-                 * reply. The client side must see the original lock data
-                 * so that it can process the unlock properly. */
+                 * reply. The client side replays the lock request so
+                 * it must see the original lock data in the reply. */
 
-                /* XXX - if ldlm_lock_new() can sleep we have to
-                 * release the ns_lock, allocate the new lock, and
-                 * restart processing this lock. */
+                /* XXX - if ldlm_lock_new() can sleep we should
+                 * release the ns_lock, allocate the new lock,
+                 * and restart processing this lock. */
                 new2 = ldlm_lock_create(ns, NULL, res->lr_name, LDLM_FLOCK,
                                         lock->l_granted_mode, NULL, NULL);
                 if (!new2) {
-                /* LBUG for now */
-                LASSERT(0);
-                        RETURN(ENOMEM);
+                        ldlm_flock_destroy(req, lock->l_granted_mode, *flags);
+                        *err = -ENOLCK;
+                        RETURN(LDLM_ITER_STOP);
                 }
 
                 new2->l_granted_mode = lock->l_granted_mode;
@@ -347,42 +331,40 @@ ldlm_flock_enqueue(struct ldlm_lock *req, int *flags, int first_enq,
                         ldlm_lock_addref_internal(new2, lock->l_granted_mode);
 
                 /* insert new2 at lock */
-                list_add_tail(&new2->l_res_link, ownlocks);
+                ldlm_resource_add_lock(res, ownlocks, new2);
                 LDLM_LOCK_PUT(new2);
                 break;
         }
 
-        CDEBUG(D_DLMTRACE, "loop3 end; added: %d\n", added);
-
         if (added) {
-                ldlm_flock_destroy(req, *flags);
+                ldlm_flock_destroy(req, mode, *flags);
         } else {
-                /* insert new at ownlocks */
+                /* insert new after ownlocks */
                 new->l_granted_mode = new->l_req_mode;
                 list_del_init(&new->l_res_link);
-                list_add_tail(&new->l_res_link, ownlocks);
+                ldlm_resource_add_lock(res, ownlocks, new);
         }
 
-       if (*flags != LDLM_FL_WAIT_NOREPROC) {
+        if (*flags != LDLM_FL_WAIT_NOREPROC) {
                 if (req->l_completion_ast)
                         ldlm_add_ast_work_item(req, NULL, NULL, 0);
 
                 /* The only problem with doing the reprocessing here is that
                  * the completion ASTs for newly granted locks will be sent
                  * before the unlock completion is sent. It shouldn't be an
-                 * issue. Also note that ldlm_flock_enqueue() will recurse,
-                 * but only once because there can't be unlock requests on
-                 * the wait queue. */
+                 * issue. Also note that ldlm_process_flock_lock() will
+                 * recurse, but only once because there can't be unlock
+                 * requests on the wait queue. */
                 if ((mode == LCK_NL) && overlaps)
                         ldlm_reprocess_queue(res, &res->lr_waiting);
         }
 
         ldlm_resource_dump(res);
-
-       RETURN(LDLM_ITER_CONTINUE);
+        RETURN(LDLM_ITER_CONTINUE);
 }
 
-static void interrupted_flock_completion_wait(void *data)
+static void
+interrupted_flock_completion_wait(void *data)
 {
 }
 
@@ -395,7 +377,7 @@ int
 ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 {
         struct ldlm_namespace *ns;
-        struct file_lock *getlk = data;
+        struct file_lock *getlk = lock->l_ast_data;
         struct flock_wait_data fwd;
         unsigned long irqflags;
         struct obd_device *obd;
@@ -405,6 +387,9 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         struct l_wait_info lwi;
         ENTRY;
 
+        CDEBUG(D_DLMTRACE, "flags: 0x%x data: %p getlk: %p\n",
+               flags, data, getlk);
+
         LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
 
         if (flags == 0) {
@@ -414,7 +399,7 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 
         if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                        LDLM_FL_BLOCK_CONV)))
-                goto granted;
+                goto  granted;
 
         LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                    "sleeping");
@@ -434,8 +419,7 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 spin_unlock_irqrestore(&imp->imp_lock, irqflags);
         }
 
-        lwi = LWI_TIMEOUT_INTR(0, NULL, interrupted_flock_completion_wait,
-                               &fwd);
+        lwi = LWI_TIMEOUT_INTR(0,NULL,interrupted_flock_completion_wait,&fwd);
 
         /* Go to sleep until the lock is granted. */
         rc = l_wait_event(lock->l_waitq,
@@ -457,39 +441,36 @@ granted:
         ns = lock->l_resource->lr_namespace;
         l_lock(&ns->ns_lock);
 
-        lock->l_flock_blocker = NULL;
+        /* take data off of deadlock detection waitq. */
         list_del_init(&lock->l_flock_waitq);
 
         /* ldlm_lock_enqueue() has already placed lock on the granted list. */
         list_del_init(&lock->l_res_link);
 
-        if (getlk) {
+        if (flags & LDLM_FL_TEST_LOCK) {
                 /* fcntl(F_GETLK) request */
-                if (lock->l_granted_mode == LCK_PR)
+                /* The old mode was saved in getlk->fl_type so that if the mode
+                 * in the lock changes we can decref the approprate refcount. */
+                ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
+                switch (lock->l_granted_mode) {
+                case LCK_PR:
                         getlk->fl_type = F_RDLCK;
-                else if (lock->l_granted_mode == LCK_PW)
+                        break;
+                case LCK_PW:
                         getlk->fl_type = F_WRLCK;
-                else
+                        break;
+                default:
                         getlk->fl_type = F_UNLCK;
+                }
                 getlk->fl_pid = lock->l_policy_data.l_flock.pid;
                 getlk->fl_start = lock->l_policy_data.l_flock.start;
                 getlk->fl_end = lock->l_policy_data.l_flock.end;
-                /* ldlm_flock_destroy(lock); */
         } else {
+                /* We need to reprocess the lock to do merges or splits
+                 * with existing locks owne by this process. */
                 flags = LDLM_FL_WAIT_NOREPROC;
-                /* We need to reprocess the lock to do merges or split */
-                ldlm_flock_enqueue(lock, &flags, 1, &err);
+                ldlm_process_flock_lock(lock, &flags, 1, &err);
         }
         l_unlock(&ns->ns_lock);
         RETURN(0);
 }
-
-/* This function is only called on the client when a lock is aborted. */
-int
-ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *ld,
-                        void *data, int flag)
-{
-        ENTRY;
-        ldlm_lock_destroy(lock);
-        RETURN(0);
-}
index 0efc294..af55c17 100644 (file)
@@ -39,7 +39,7 @@ ldlm_plain_compat_queue(struct list_head *queue, struct ldlm_lock *req,
 {
         struct list_head *tmp;
         struct ldlm_lock *lock;
-       ldlm_mode_t req_mode = req->l_req_mode;
+        ldlm_mode_t req_mode = req->l_req_mode;
         int compat = 1;
         ENTRY;
 
@@ -67,11 +67,11 @@ ldlm_plain_compat_queue(struct list_head *queue, struct ldlm_lock *req,
 }
 
 int
-ldlm_plain_enqueue(struct ldlm_lock *lock, int *flags, int first_enq,
-                   ldlm_error_t *err)
+ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq,
+                        ldlm_error_t *err)
 {
         struct ldlm_resource *res = lock->l_resource;
-       int compat;
+        int compat;
         ENTRY;
 
         if (first_enq) {