Whamcloud - gitweb
Branch b1_6
authorvitaly <vitaly>
Thu, 25 Oct 2007 12:02:13 +0000 (12:02 +0000)
committervitaly <vitaly>
Thu, 25 Oct 2007 12:02:13 +0000 (12:02 +0000)
b=13622
i=tappro
i=green

1) do not zero cookie in class_handle_unhash_nolock() to let it be rehashed back later;
2) dlm_lock_match() returns the matched mode instead of 1; pass bitwise sets of modes into;

lustre/include/lustre_dlm.h
lustre/include/lustre_handles.h
lustre/ldlm/ldlm_lock.c
lustre/liblustre/super.c
lustre/llite/file.c
lustre/llite/llite_lib.c
lustre/lov/lov_obd.c
lustre/lov/lov_request.c
lustre/mdc/mdc_locks.c
lustre/obdclass/lustre_handles.c
lustre/osc/osc_request.c

index 0463a3f..791f7bd 100644 (file)
@@ -637,9 +637,10 @@ void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode);
 void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode);
 void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode);
 void ldlm_lock_allow_match(struct ldlm_lock *lock);
-int ldlm_lock_match(struct ldlm_namespace *ns, int flags, struct ldlm_res_id *,
-                    ldlm_type_t type, ldlm_policy_data_t *, ldlm_mode_t mode,
-                    struct lustre_handle *);
+ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
+                            struct ldlm_res_id *, ldlm_type_t type,
+                            ldlm_policy_data_t *, ldlm_mode_t mode,
+                            struct lustre_handle *);
 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                                         int *flags);
 void ldlm_lock_cancel(struct ldlm_lock *lock);
index ac56c27..9186173 100644 (file)
@@ -32,10 +32,12 @@ struct portals_handle {
 
         /* newly added fields to handle the RCU issue. -jxiong */
         spinlock_t h_lock;
-        unsigned int h_size;
         void *h_ptr;
         void (*h_free_cb)(void *, size_t);
         struct rcu_head h_rcu;
+        unsigned int h_size;
+        __u8 h_in:1;
+        __u8 h_unused[3];
 };
 #define RCU2HANDLE(rcu)    container_of(rcu, struct portals_handle, h_rcu)
 
index 4e62e68..1a51e03 100644 (file)
@@ -901,7 +901,8 @@ void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
 
 /* returns a referenced lock or NULL.  See the flag descriptions below, in the
  * comment above ldlm_lock_match */
-static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
+static struct ldlm_lock *search_queue(struct list_head *queue,
+                                      ldlm_mode_t *mode,
                                       ldlm_policy_data_t *policy,
                                       struct ldlm_lock *old_lock, int flags)
 {
@@ -909,6 +910,8 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
         struct list_head *tmp;
 
         list_for_each(tmp, queue) {
+                ldlm_mode_t match;
+
                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
 
                 if (lock == old_lock)
@@ -927,16 +930,17 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                     lock->l_readers == 0 && lock->l_writers == 0)
                         continue;
 
-                if (!(lock->l_req_mode & mode))
+                if (!(lock->l_req_mode & *mode))
                         continue;
 
+                match = lock->l_req_mode;
                 if (lock->l_resource->lr_type == LDLM_EXTENT &&
                     (lock->l_policy_data.l_extent.start >
                      policy->l_extent.start ||
                      lock->l_policy_data.l_extent.end < policy->l_extent.end))
                         continue;
 
-                if (unlikely(mode == LCK_GROUP) &&
+                if (unlikely(match == LCK_GROUP) &&
                     lock->l_resource->lr_type == LDLM_EXTENT &&
                     lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                         continue;
@@ -960,8 +964,9 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                         LDLM_LOCK_GET(lock);
                         ldlm_lock_touch_in_lru(lock);
                 } else {
-                        ldlm_lock_addref_internal_nolock(lock, mode);
+                        ldlm_lock_addref_internal_nolock(lock, match);
                 }
+                *mode = match;
                 return lock;
         }
 
@@ -996,10 +1001,10 @@ void ldlm_lock_allow_match(struct ldlm_lock *lock)
  * Returns 1 if it finds an already-existing lock that is compatible; in this
  * case, lockh is filled in with a addref()ed lock
  */
-int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
-                    struct ldlm_res_id *res_id, ldlm_type_t type,
-                    ldlm_policy_data_t *policy, ldlm_mode_t mode,
-                    struct lustre_handle *lockh)
+ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
+                            struct ldlm_res_id *res_id, ldlm_type_t type,
+                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
+                            struct lustre_handle *lockh)
 {
         struct ldlm_resource *res;
         struct ldlm_lock *lock, *old_lock = NULL;
@@ -1024,15 +1029,15 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
 
         lock_res(res);
 
-        lock = search_queue(&res->lr_granted, mode, policy, old_lock, flags);
+        lock = search_queue(&res->lr_granted, &mode, policy, old_lock, flags);
         if (lock != NULL)
                 GOTO(out, rc = 1);
         if (flags & LDLM_FL_BLOCK_GRANTED)
                 GOTO(out, rc = 0);
-        lock = search_queue(&res->lr_converting, mode, policy, old_lock, flags);
+        lock = search_queue(&res->lr_converting, &mode, policy, old_lock, flags);
         if (lock != NULL)
                 GOTO(out, rc = 1);
-        lock = search_queue(&res->lr_waiting, mode, policy, old_lock, flags);
+        lock = search_queue(&res->lr_waiting, &mode, policy, old_lock, flags);
         if (lock != NULL)
                 GOTO(out, rc = 1);
 
@@ -1088,7 +1093,7 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
         if (flags & LDLM_FL_TEST_LOCK && rc)
                 LDLM_LOCK_PUT(lock);
 
-        return rc;
+        return rc ? mode : 0;
 }
 
 /* Returns a referenced lock */
index 5425d6b..8a59666 100644 (file)
@@ -399,7 +399,7 @@ static int llu_have_md_lock(struct inode *inode, __u64 lockpart)
 
         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
         if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
-                            &policy, LCK_PW | LCK_PR, &lockh)) {
+                            &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
                 RETURN(1);
         }
         RETURN(0);
index a0ea289..595daad 100644 (file)
@@ -2490,7 +2490,7 @@ int ll_have_md_lock(struct inode *inode, __u64 bits)
 
         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
         if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
-                            &policy, LCK_CR|LCK_CW|LCK_PR, &lockh)) {
+                            &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
                 RETURN(1);
         }
 
index d13f173..9cacc16 100644 (file)
@@ -1271,7 +1271,7 @@ static int ll_setattr_do_truncate(struct inode *inode, loff_t new_size)
                 ast_flags = LDLM_FL_BLOCK_GRANTED;
                 rc = obd_match(sbi->ll_osc_exp, lsm, LDLM_EXTENT,
                                &policy, LCK_PW, &ast_flags, inode, &lockh);
-                if (rc == 1) {
+                if (rc > 0) {
                         local_lock = 2;
                         rc = 0;
                 } else if (rc == 0) {
index 0e6c873..ece42ef 100644 (file)
@@ -1842,6 +1842,7 @@ static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                        struct ldlm_enqueue_info *einfo,
                        struct ptlrpc_request_set *rqset)
 {
+        ldlm_mode_t mode = einfo->ei_mode;
         struct lov_request_set *set;
         struct lov_request *req;
         struct list_head *pos;
@@ -1851,6 +1852,7 @@ static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo,
 
         LASSERT(oinfo);
         ASSERT_LSM_MAGIC(oinfo->oi_md);
+        LASSERT(mode == (mode & -mode));
 
         /* we should never be asked to replay a lock this way. */
         LASSERT((oinfo->oi_flags & LDLM_FL_REPLAY) == 0);
@@ -1880,7 +1882,7 @@ static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                 RETURN(rc);
         }
 out:
-        rc = lov_fini_enqueue_set(set, einfo->ei_mode, rc, rqset);
+        rc = lov_fini_enqueue_set(set, mode, rc, rqset);
         RETURN(rc);
 }
 
@@ -1898,6 +1900,7 @@ static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
         ENTRY;
 
         ASSERT_LSM_MAGIC(lsm);
+        LASSERT((*flags & LDLM_FL_TEST_LOCK) || mode == (mode & -mode));
 
         if (!exp || !exp->exp_obd)
                 RETURN(-ENODEV);
@@ -1920,7 +1923,7 @@ static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
                                req->rq_oi.oi_md, type, &sub_policy,
                                mode, &lov_flags, data, lov_lockhp);
                 rc = lov_update_match_set(set, req, rc);
-                if (rc != 1)
+                if (rc <= 0)
                         break;
         }
         lov_fini_match_set(set, mode, *flags);
index a59ddd4..858894c 100644 (file)
@@ -360,7 +360,7 @@ int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
         int ret = rc;
         ENTRY;
 
-        if (rc == 1)
+        if (rc > 0)
                 ret = 0;
         else if (rc == 0)
                 ret = 1;
index 6f2da48..6f3813e 100644 (file)
@@ -595,46 +595,33 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
 }
 EXPORT_SYMBOL(mdc_enqueue);
 
-int mdc_revalidate_lock(struct obd_export *exp,
-                        struct lookup_intent *it,
+int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
                         struct ll_fid *fid)
 {
                 /* We could just return 1 immediately, but since we should only
                  * be called in revalidate_it if we already have a lock, let's
                  * verify that. */
         struct ldlm_res_id res_id = {.name ={fid->id, fid->generation}};
-                struct lustre_handle lockh;
-                ldlm_policy_data_t policy;
-                int mode = LCK_CR;
-        int rc;
-
-                /* As not all attributes are kept under update lock, e.g. 
-                   owner/group/acls are under lookup lock, we need both 
-                   ibits for GETATTR. */
-                policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
-                        MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
-                        MDS_INODELOCK_LOOKUP;
-
-        rc = ldlm_lock_match(exp->exp_obd->obd_namespace, LDLM_FL_BLOCK_GRANTED,
-                             &res_id, LDLM_IBITS, &policy, LCK_CR, &lockh);
-                if (!rc) {
-                        mode = LCK_CW;
-                        rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
-                                     LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
-                                     &policy, LCK_CW, &lockh);
-                }
-                if (!rc) {
-                        mode = LCK_PR;
-                        rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
-                                     LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
-                                     &policy, LCK_PR, &lockh);
-                }
-                if (rc) {
+        struct lustre_handle lockh;
+        ldlm_policy_data_t policy;
+        ldlm_mode_t mode;
+
+        /* As not all attributes are kept under update lock, e.g. 
+           owner/group/acls are under lookup lock, we need both 
+           ibits for GETATTR. */
+        policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
+                MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
+                MDS_INODELOCK_LOOKUP;
+
+        mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
+                               LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
+                               &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
+        if (mode) {
                 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
-                        it->d.lustre.it_lock_mode = mode;
-                }
+                it->d.lustre.it_lock_mode = mode;
+        }
 
-        return rc;
+        return !!mode;
 }
 EXPORT_SYMBOL(mdc_revalidate_lock);
 
index 89388f6..05b10c4 100644 (file)
@@ -102,6 +102,7 @@ void class_handle_hash(struct portals_handle *h, portals_handle_addref_cb cb)
         bucket = &handle_hash[h->h_cookie & HANDLE_HASH_MASK];
         spin_lock(&bucket->lock);
         list_add_rcu(&h->h_link, &bucket->head);
+        h->h_in = 1;
         spin_unlock(&bucket->lock);
 
         CDEBUG(D_INFO, "added object %p with handle "LPX64" to hash\n",
@@ -121,11 +122,11 @@ static void class_handle_unhash_nolock(struct portals_handle *h)
                h, h->h_cookie);
 
         spin_lock(&h->h_lock);
-        if (h->h_cookie == 0) {
+        if (h->h_in == 0) {
                 spin_unlock(&h->h_lock);
                 return;
         }
-        h->h_cookie = 0;
+        h->h_in = 0;
         spin_unlock(&h->h_lock);
         list_del_rcu(&h->h_link);
 }
index f145d89..04df048 100644 (file)
@@ -2713,8 +2713,7 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                 return;
         }
         lock_res_and_lock(lock);
-#ifdef __KERNEL__
-#ifdef __LINUX__
+#if defined (__KERNEL__) && defined (__LINUX__)
         /* Liang XXX: Darwin and Winnt checking should be added */
         if (lock->l_ast_data && lock->l_ast_data != data) {
                 struct inode *new_inode = data;
@@ -2729,7 +2728,6 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                          new_inode, new_inode->i_ino, new_inode->i_generation);
         }
 #endif
-#endif
         lock->l_ast_data = data;
         lock->l_flags |= (flags & LDLM_FL_NO_LRU);
         unlock_res_and_lock(lock);
@@ -2827,6 +2825,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
         struct ldlm_reply *rep;
         struct ptlrpc_request *req = NULL;
         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
+        ldlm_mode_t mode;
         int rc;
         ENTRY;
 
@@ -2840,11 +2839,29 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                 goto no_match;
 
         /* Next, search for already existing extent locks that will cover us */
-        rc = ldlm_lock_match(obd->obd_namespace,
-                             oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
-                             einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
-                             oinfo->oi_lockh);
-        if (rc == 1) {
+        /* If we're trying to read, we also search for an existing PW lock.  The
+         * VFS and page cache already protect us locally, so lots of readers/
+         * writers can share a single PW lock.
+         *
+         * There are problems with conversion deadlocks, so instead of
+         * converting a read lock to a write lock, we'll just enqueue a new
+         * one.
+         *
+         * At some point we should cancel the read lock instead of making them
+         * send us a blocking callback, but there are problems with canceling
+         * locks out from other users right now, too. */
+        mode = einfo->ei_mode;
+        if (einfo->ei_mode == LCK_PR)
+                mode |= LCK_PW;
+        mode = ldlm_lock_match(obd->obd_namespace,
+                               oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
+                               einfo->ei_type, &oinfo->oi_policy, mode,
+                               oinfo->oi_lockh);
+        if (mode) {
+                /* addref the lock only if not async requests and PW lock is
+                 * matched whereas we asked for PR. */
+                if (!rqset && einfo->ei_mode != mode)
+                        ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
                                         oinfo->oi_flags);
                 if (intent) {
@@ -2857,45 +2874,14 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
 
                 /* For async requests, decref the lock. */
-                if (rqset)
+                if (einfo->ei_mode != mode)
+                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
+                else if (rqset)
                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
 
                 RETURN(ELDLM_OK);
         }
 
-        /* If we're trying to read, we also search for an existing PW lock.  The
-         * VFS and page cache already protect us locally, so lots of readers/
-         * writers can share a single PW lock.
-         *
-         * There are problems with conversion deadlocks, so instead of
-         * converting a read lock to a write lock, we'll just enqueue a new
-         * one.
-         *
-         * At some point we should cancel the read lock instead of making them
-         * send us a blocking callback, but there are problems with canceling
-         * locks out from other users right now, too. */
-
-        if (einfo->ei_mode == LCK_PR) {
-                rc = ldlm_lock_match(obd->obd_namespace,
-                                     oinfo->oi_flags | LDLM_FL_LVB_READY,
-                                     &res_id, einfo->ei_type, &oinfo->oi_policy,
-                                     LCK_PW, oinfo->oi_lockh);
-                if (rc == 1) {
-                        /* FIXME: This is not incredibly elegant, but it might
-                         * be more elegant than adding another parameter to
-                         * lock_match.  I want a second opinion. */
-                        /* addref the lock only if not async requests. */
-                        if (!rqset)
-                                ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
-                        osc_set_data_with_check(oinfo->oi_lockh,
-                                                einfo->ei_cbdata,
-                                                oinfo->oi_flags);
-                        oinfo->oi_cb_up(oinfo, ELDLM_OK);
-                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
-                        RETURN(ELDLM_OK);
-                }
-        }
-
  no_match:
         if (intent) {
                 int size[3] = {
@@ -2952,8 +2938,8 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
 {
         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
         struct obd_device *obd = exp->exp_obd;
-        int rc;
         int lflags = *flags;
+        ldlm_mode_t rc;
         ENTRY;
 
         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
@@ -2964,29 +2950,23 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
         policy->l_extent.end |= ~CFS_PAGE_MASK;
 
         /* Next, search for already existing extent locks that will cover us */
-        rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, &res_id, type,
-                             policy, mode, lockh);
-        if (rc) {
-                //if (!(*flags & LDLM_FL_TEST_LOCK))
-                        osc_set_data_with_check(lockh, data, lflags);
-                RETURN(rc);
-        }
         /* If we're trying to read, we also search for an existing PW lock.  The
          * VFS and page cache already protect us locally, so lots of readers/
          * writers can share a single PW lock. */
-        if (mode == LCK_PR) {
-                rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, 
-                                     &res_id, type,
-                                     policy, LCK_PW, lockh);
-                if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
-                        /* FIXME: This is not incredibly elegant, but it might
-                         * be more elegant than adding another parameter to
-                         * lock_match.  I want a second opinion. */
-                        osc_set_data_with_check(lockh, data, lflags);
+        rc = mode;
+        if (mode == LCK_PR)
+                rc |= LCK_PW;
+        rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
+                             &res_id, type, policy, rc, lockh);
+        if (rc) {
+                osc_set_data_with_check(lockh, data, lflags);
+                if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
                         ldlm_lock_addref(lockh, LCK_PR);
                         ldlm_lock_decref(lockh, LCK_PW);
                 }
+                RETURN(rc);
         }
+
         RETURN(rc);
 }