Whamcloud - gitweb
b=16774
authorzhanghc <zhanghc>
Tue, 7 Jul 2009 16:09:41 +0000 (16:09 +0000)
committerzhanghc <zhanghc>
Tue, 7 Jul 2009 16:09:41 +0000 (16:09 +0000)
cache the extent lock gotten before doing I/O in "ll_file_aio_read"
and "ll_file_aio_write", then "ll_read_page", "ll_read_ahead_page"
and "ll_writepage" will save much time spent on "ldlm_lock_match"
(in cache_add_extent) to find the appropriate extent lock

1, using current->journal_info to store the locks(struct ll_thread_data)
2, replace o_reget_short_lock, o_release_short_lock with o_get_lock in obd_ops

i=johann@sun.com
i=oleg.drokin@sun.com

18 files changed:
lustre/include/lustre_dlm.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/liblustre/rw.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/llite_mmap.c
lustre/llite/rw.c
lustre/lov/lov_obd.c
lustre/lov/lov_request.c
lustre/mgc/mgc_request.c
lustre/obdclass/lprocfs_status.c
lustre/obdecho/echo_client.c
lustre/osc/cache.c
lustre/osc/osc_request.c

index cb635f9..938539b 100644 (file)
@@ -771,8 +771,8 @@ void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode);
 void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode);
 void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode);
 void ldlm_lock_allow_match(struct ldlm_lock *lock);
-int ldlm_lock_fast_match(struct ldlm_lock *, int, obd_off, obd_off, void **);
-void ldlm_lock_fast_release(void *, int);
+int ldlm_lock_fast_match(struct ldlm_lock *, int, obd_off, obd_off,
+                         struct lustre_handle *);
 ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
                             struct ldlm_res_id *, ldlm_type_t type,
                             ldlm_policy_data_t *, ldlm_mode_t mode,
index 928c5f8..c365c13 100644 (file)
@@ -1144,16 +1144,11 @@ struct obd_ops {
                                  struct lov_oinfo *loi,
                                  cfs_page_t *page, obd_off offset,
                                  struct obd_async_page_ops *ops, void *data,
-                                 void **res, int nocache,
+                                 void **res, int flags,
                                  struct lustre_handle *lockh);
-        int (*o_reget_short_lock)(struct obd_export *exp,
-                                 struct lov_stripe_md *lsm,
-                                 void **res, int rw,
-                                 obd_off start, obd_off end,
-                                 void **cookie);
-        int (*o_release_short_lock)(struct obd_export *exp,
-                                    struct lov_stripe_md *lsm, obd_off end,
-                                    void *cookie, int rw);
+        int (*o_get_lock)(struct obd_export *exp, struct lov_stripe_md *lsm,
+                          void **res, int rw, obd_off start, obd_off end,
+                          struct lustre_handle *lockh, int flags);
         int (*o_queue_async_io)(struct obd_export *exp,
                                 struct lov_stripe_md *lsm,
                                 struct lov_oinfo *loi, void *cookie,
@@ -1216,7 +1211,8 @@ struct obd_ops {
         int (*o_change_cbdata)(struct obd_export *, struct lov_stripe_md *,
                                ldlm_iterator_t it, void *data);
         int (*o_cancel)(struct obd_export *, struct lov_stripe_md *md,
-                        __u32 mode, struct lustre_handle *);
+                        __u32 mode, struct lustre_handle *, int flags,
+                        obd_off end);
         int (*o_cancel_unused)(struct obd_export *, struct lov_stripe_md *,
                                int flags, void *opaque);
         int (*o_join_lru)(struct obd_export *, struct lov_stripe_md *,
index d2a4aa4..d2b9202 100644 (file)
@@ -1124,12 +1124,16 @@ static inline int obd_brw_rqset(int cmd, struct obd_export *exp,
         RETURN(rc);
 }
 
+/* flags used by obd_prep_async_page */
+#define OBD_PAGE_NO_CACHE  0x00000001 /* don't add to cache */
+#define OBD_FAST_LOCK 0x00000002 /* lockh refers to a "fast lock" */
+
 static inline  int obd_prep_async_page(struct obd_export *exp,
                                        struct lov_stripe_md *lsm,
                                        struct lov_oinfo *loi,
                                        cfs_page_t *page, obd_off offset,
                                        struct obd_async_page_ops *ops,
-                                       void *data, void **res, int nocache,
+                                       void *data, void **res, int flags,
                                        struct lustre_handle *lockh)
 {
         int ret;
@@ -1139,37 +1143,23 @@ static inline  int obd_prep_async_page(struct obd_export *exp,
         EXP_COUNTER_INCREMENT(exp, prep_async_page);
 
         ret = OBP(exp->exp_obd, prep_async_page)(exp, lsm, loi, page, offset,
-                                                 ops, data, res, nocache,
+                                                 ops, data, res, flags,
                                                  lockh);
         RETURN(ret);
 }
 
-static inline int obd_reget_short_lock(struct obd_export *exp,
-                                       struct lov_stripe_md *lsm,
-                                       void **res, int rw,
-                                       obd_off start, obd_off end,
-                                       void **cookie)
-{
-        ENTRY;
-
-        OBD_CHECK_OP(exp->exp_obd, reget_short_lock, -EOPNOTSUPP);
-        EXP_COUNTER_INCREMENT(exp, reget_short_lock);
-
-        RETURN(OBP(exp->exp_obd, reget_short_lock)(exp, lsm, res, rw,
-                                                   start, end, cookie));
-}
-
-static inline int obd_release_short_lock(struct obd_export *exp,
-                                         struct lov_stripe_md *lsm, obd_off end,
-                                         void *cookie, int rw)
+static inline int obd_get_lock(struct obd_export *exp,
+                               struct lov_stripe_md *lsm, void **res, int rw,
+                               obd_off start, obd_off end,
+                               struct lustre_handle *lockh, int flags)
 {
         ENTRY;
 
-        OBD_CHECK_OP(exp->exp_obd, release_short_lock, -EOPNOTSUPP);
-        EXP_COUNTER_INCREMENT(exp, release_short_lock);
+        OBD_CHECK_OP(exp->exp_obd, get_lock, -EOPNOTSUPP);
+        EXP_COUNTER_INCREMENT(exp, get_lock);
 
-        RETURN(OBP(exp->exp_obd, release_short_lock)(exp, lsm, end,
-                                                     cookie, rw));
+        RETURN(OBP(exp->exp_obd, get_lock)(exp, lsm, res, rw, start, end,
+                                           lockh, flags));
 }
 
 static inline int obd_queue_async_io(struct obd_export *exp,
@@ -1411,9 +1401,9 @@ static inline int obd_change_cbdata(struct obd_export *exp,
         RETURN(rc);
 }
 
-static inline int obd_cancel(struct obd_export *exp,
-                             struct lov_stripe_md *ea, __u32 mode,
-                             struct lustre_handle *lockh)
+static inline int obd_cancel(struct obd_export *exp, struct lov_stripe_md *ea,
+                             __u32 mode, struct lustre_handle *lockh, int flags,
+                             obd_off end)
 {
         int rc;
         ENTRY;
@@ -1421,7 +1411,7 @@ static inline int obd_cancel(struct obd_export *exp,
         EXP_CHECK_OP(exp, cancel);
         EXP_COUNTER_INCREMENT(exp, cancel);
 
-        rc = OBP(exp->exp_obd, cancel)(exp, ea, mode, lockh);
+        rc = OBP(exp->exp_obd, cancel)(exp, ea, mode, lockh, flags, end);
         RETURN(rc);
 }
 
index 667fc50..9a97126 100644 (file)
@@ -950,9 +950,10 @@ void ldlm_lock_allow_match(struct ldlm_lock *lock)
 
 int ldlm_lock_fast_match(struct ldlm_lock *lock, int rw,
                          obd_off start, obd_off end,
-                         void **cookie)
+                         struct lustre_handle *lockh)
 {
         LASSERT(rw == OBD_BRW_READ || rw == OBD_BRW_WRITE);
+        LASSERT(lockh != NULL);
 
         if (!lock)
                 return 0;
@@ -974,9 +975,10 @@ int ldlm_lock_fast_match(struct ldlm_lock *lock, int rw,
             !lock->l_writers && !lock->l_readers)
                 goto no_match;
 
-        ldlm_lock_addref_internal_nolock(lock, rw == OBD_BRW_WRITE ? LCK_PW : LCK_PR);
+        ldlm_lock_addref_internal_nolock(lock,
+                                         rw == OBD_BRW_WRITE ? LCK_PW : LCK_PR);
         unlock_res_and_lock(lock);
-        *cookie = (void *)lock;
+        ldlm_lock2handle(lock, lockh);
         return 1; /* avoid using rc for stack relief */
 
 no_match:
@@ -984,16 +986,6 @@ no_match:
         return 0;
 }
 
-void ldlm_lock_fast_release(void *cookie, int rw)
-{
-        struct ldlm_lock *lock = (struct ldlm_lock *)cookie;
-
-        LASSERT(lock != NULL);
-        LASSERT(rw == OBD_BRW_READ || rw == OBD_BRW_WRITE);
-        LASSERT(rw == OBD_BRW_READ || (lock->l_granted_mode & (LCK_PW | LCK_GROUP)));
-        ldlm_lock_decref_internal(lock, rw == OBD_BRW_WRITE ? LCK_PW : LCK_PR);
-}
-
 /* Can be called in two ways:
  *
  * If 'ns' is NULL, then lockh describes an existing lock that we want to look
index 95e37fb..75019be 100644 (file)
@@ -2357,7 +2357,6 @@ EXPORT_SYMBOL(__ldlm_handle2lock);
 EXPORT_SYMBOL(ldlm_lock_get);
 EXPORT_SYMBOL(ldlm_lock_put);
 EXPORT_SYMBOL(ldlm_lock_fast_match);
-EXPORT_SYMBOL(ldlm_lock_fast_release);
 EXPORT_SYMBOL(ldlm_lock_match);
 EXPORT_SYMBOL(ldlm_lock_cancel);
 EXPORT_SYMBOL(ldlm_lock_addref);
index 746af2b..a6af277 100644 (file)
@@ -366,7 +366,7 @@ int llu_extent_unlock(struct ll_file_data *fd, struct inode *inode,
             (sbi->ll_flags & LL_SBI_NOLCK) || mode == LCK_NL)
                 RETURN(0);
 
-        rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
+        rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh, 0, 0);
 
         RETURN(rc);
 }
@@ -508,7 +508,8 @@ static int llu_queue_pio(int cmd, struct llu_io_group *group,
                                          (obd_off)page->index << CFS_PAGE_SHIFT,
                                          &llu_async_page_ops,
                                          llap, &llap->llap_cookie,
-                                         1 /* no cache in liblustre at all */,
+                                         /* no cache in liblustre at all */
+                                         OBD_PAGE_NO_CACHE,
                                          NULL);
                 if (rc) {
                         LASSERT(rc < 0);
index e3a5c94..f8718c3 100644 (file)
@@ -1111,7 +1111,7 @@ int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
             (sbi->ll_flags & LL_SBI_NOLCK))
                 RETURN(0);
 
-        rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
+        rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh, 0, 0);
 
         RETURN(rc);
 }
@@ -1277,9 +1277,8 @@ static int iov_copy_update(unsigned long *nr_segs, const struct iovec **iov_out,
         return 0;
 }
 
-static int ll_reget_short_lock(struct page *page, int rw,
-                               obd_off start, obd_off end,
-                               void **cookie)
+static int ll_get_short_lock(struct page *page, int rw, obd_off start,
+                             obd_off end, struct lustre_handle *lockh)
 {
         struct ll_async_page *llap;
         struct obd_export *exp;
@@ -1295,13 +1294,13 @@ static int ll_reget_short_lock(struct page *page, int rw,
         if (llap == NULL)
                 RETURN(0);
 
-        RETURN(obd_reget_short_lock(exp, ll_i2info(inode)->lli_smd,
-                                    &llap->llap_cookie, rw, start, end,
-                                    cookie));
+        RETURN(obd_get_lock(exp, ll_i2info(inode)->lli_smd,
+                            &llap->llap_cookie, rw, start, end, lockh,
+                            OBD_FAST_LOCK));
 }
 
 static void ll_release_short_lock(struct inode *inode, obd_off end,
-                                  void *cookie, int rw)
+                                  struct lustre_handle *lockh, int rw)
 {
         struct obd_export *exp;
         int rc;
@@ -1310,8 +1309,9 @@ static void ll_release_short_lock(struct inode *inode, obd_off end,
         if (exp == NULL)
                 return;
 
-        rc = obd_release_short_lock(exp, ll_i2info(inode)->lli_smd, end,
-                                    cookie, rw);
+        rc = obd_cancel(exp, ll_i2info(inode)->lli_smd,
+                        rw = OBD_BRW_READ ? LCK_PR : LCK_PW, lockh,
+                        OBD_FAST_LOCK, end);
         if (rc < 0)
                 CERROR("unlock failed (%d)\n", rc);
 }
@@ -1320,7 +1320,8 @@ static inline int ll_file_get_fast_lock(struct file *file,
                                         obd_off ppos, obd_off end,
                                         const struct iovec *iov,
                                         unsigned long nr_segs,
-                                        void **cookie, int rw)
+                                        struct lustre_handle *lockh,
+                                        int rw)
 {
         int rc = 0, seg;
         struct page *page;
@@ -1337,7 +1338,7 @@ static inline int ll_file_get_fast_lock(struct file *file,
         page = find_lock_page(file->f_dentry->d_inode->i_mapping,
                               ppos >> CFS_PAGE_SHIFT);
         if (page) {
-                if (ll_reget_short_lock(page, rw, ppos, end, cookie))
+                if (ll_get_short_lock(page, rw, ppos, end, lockh))
                         rc = 1;
 
                 unlock_page(page);
@@ -1349,27 +1350,22 @@ out:
 }
 
 static inline void ll_file_put_fast_lock(struct inode *inode, obd_off end,
-                                         void *cookie, int rw)
+                                         struct lustre_handle *lockh, int rw)
 {
-        ll_release_short_lock(inode, end, cookie, rw);
+        ll_release_short_lock(inode, end, lockh, rw);
 }
 
-enum ll_lock_style {
-        LL_LOCK_STYLE_NOLOCK   = 0,
-        LL_LOCK_STYLE_FASTLOCK = 1,
-        LL_LOCK_STYLE_TREELOCK = 2
-};
-
 static inline int ll_file_get_lock(struct file *file, obd_off ppos,
                                    obd_off end, const struct iovec *iov,
-                                   unsigned long nr_segs, void **cookie,
+                                   unsigned long nr_segs,
+                                   struct lustre_handle *lockh,
                                    struct ll_lock_tree *tree, int rw)
 {
         int rc;
 
         ENTRY;
 
-        if (ll_file_get_fast_lock(file, ppos, end, iov, nr_segs, cookie, rw))
+        if (ll_file_get_fast_lock(file, ppos, end, iov, nr_segs, lockh, rw))
                 RETURN(LL_LOCK_STYLE_FASTLOCK);
 
         rc = ll_file_get_tree_lock_iov(tree, file, iov, nr_segs,
@@ -1388,8 +1384,8 @@ static inline int ll_file_get_lock(struct file *file, obd_off ppos,
 
 static inline void ll_file_put_lock(struct inode *inode, obd_off end,
                                     enum ll_lock_style lock_style,
-                                    void *cookie, struct ll_lock_tree *tree,
-                                    int rw)
+                                    struct lustre_handle *lockh,
+                                    struct ll_lock_tree *tree, int rw)
 
 {
         switch (lock_style) {
@@ -1397,7 +1393,7 @@ static inline void ll_file_put_lock(struct inode *inode, obd_off end,
                 ll_tree_unlock(tree);
                 break;
         case LL_LOCK_STYLE_FASTLOCK:
-                ll_file_put_fast_lock(inode, end, cookie, rw);
+                ll_file_put_fast_lock(inode, end, lockh, rw);
                 break;
         default:
                 CERROR("invalid locking style (%d)\n", lock_style);
@@ -1419,18 +1415,16 @@ static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
         struct ll_inode_info *lli = ll_i2info(inode);
         struct lov_stripe_md *lsm = lli->lli_smd;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
-        struct ll_lock_tree tree;
+        struct ll_thread_data ltd = { 0 };
         struct ost_lvb lvb;
         struct ll_ra_read bead;
         int ra = 0;
         obd_off end;
         ssize_t retval, chunk, sum = 0;
-        int lock_style;
         struct iovec *iov_copy = NULL;
         unsigned long nrsegs_copy, nrsegs_orig = 0;
         size_t count, iov_offset = 0;
         __u64 kms;
-        void *cookie;
         ENTRY;
 
         count = ll_file_get_iov_count(iov, &nr_segs);
@@ -1480,7 +1474,11 @@ static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
                 RETURN(sum);
         }
 
+        ltd.ltd_magic = LTD_MAGIC;
+        ll_td_set(&ltd);
 repeat:
+        memset(&ltd, 0, sizeof(ltd));
+        ltd.ltd_magic = LTD_MAGIC;
         if (sbi->ll_max_rw_chunk != 0 && !(file->f_flags & O_DIRECT)) {
                 /* first, let's know the end of the current stripe */
                 end = *ppos;
@@ -1520,13 +1518,14 @@ repeat:
 
         down_read(&lli->lli_truncate_rwsem); /* Bug 18233 */
 
-        lock_style = ll_file_get_lock(file, (obd_off)(*ppos), end,
-                                      iov_copy, nrsegs_copy, &cookie, &tree,
-                                      OBD_BRW_READ);
-        if (lock_style < 0 || lock_style == LL_LOCK_STYLE_NOLOCK)
+        ltd.lock_style = ll_file_get_lock(file, (obd_off)(*ppos), end, 
+                                          iov_copy, nrsegs_copy, 
+                                          &ltd.u.lockh, &ltd.u.tree,
+                                          OBD_BRW_READ);
+        if (ltd.lock_style < 0 || ltd.lock_style == LL_LOCK_STYLE_NOLOCK)
                 up_read(&lli->lli_truncate_rwsem);
-        if (lock_style < 0)
-                GOTO(out, retval = lock_style);
+        if (ltd.lock_style < 0)
+                GOTO(out, retval = ltd.lock_style);
 
         ll_inode_size_lock(inode, 1);
         /*
@@ -1557,9 +1556,10 @@ repeat:
                 ll_inode_size_unlock(inode, 1);
                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
                 if (retval) {
-                        if (lock_style != LL_LOCK_STYLE_NOLOCK) {
-                                ll_file_put_lock(inode, end, lock_style,
-                                                 cookie, &tree, OBD_BRW_READ);
+                        if (ltd.lock_style != LL_LOCK_STYLE_NOLOCK) {
+                                ll_file_put_lock(inode, end, ltd.lock_style,
+                                                 &ltd.u.lockh, &ltd.u.tree,
+                                                 OBD_BRW_READ);
                                 up_read(&lli->lli_truncate_rwsem);
                         }
                         goto out;
@@ -1573,9 +1573,12 @@ repeat:
 
                         if ((size == 0 && cur_index != 0) ||
                             (((size - 1) >> CFS_PAGE_SHIFT) < cur_index)) {
-                                if (lock_style != LL_LOCK_STYLE_NOLOCK) {
-                                        ll_file_put_lock(inode, end, lock_style,
-                                                         cookie, &tree,
+                                if (ltd.lock_style != LL_LOCK_STYLE_NOLOCK) {
+
+                                        ll_file_put_lock(inode, end, 
+                                                         ltd.lock_style,
+                                                         &ltd.u.lockh, 
+                                                         &ltd.u.tree,
                                                          OBD_BRW_READ);
                                         up_read(&lli->lli_truncate_rwsem);
                                 }
@@ -1599,7 +1602,7 @@ repeat:
                inode->i_ino, chunk, *ppos, i_size_read(inode));
 
         /* turn off the kernel's read-ahead */
-        if (lock_style != LL_LOCK_STYLE_NOLOCK) {
+        if (ltd.lock_style != LL_LOCK_STYLE_NOLOCK) {
                 struct ost_lvb *xtimes;
                 /* read under locks
                  *
@@ -1610,8 +1613,9 @@ repeat:
                  * ll_glimpse_size) could get correct values in lsm */
                 OBD_ALLOC_PTR(xtimes);
                 if (NULL == xtimes) {
-                        ll_file_put_lock(inode, end, lock_style, cookie,
-                                         &tree, OBD_BRW_READ);
+                        ll_file_put_lock(inode, end, ltd.lock_style, 
+                                         &ltd.u.lockh, &ltd.u.tree,
+                                         OBD_BRW_READ);
                         up_read(&lli->lli_truncate_rwsem);
                         GOTO(out, retval = -ENOMEM);
                 }
@@ -1639,8 +1643,8 @@ repeat:
                 retval = generic_file_aio_read(iocb, iov_copy, nrsegs_copy,
                                                *ppos);
 #endif
-                ll_file_put_lock(inode, end, lock_style, cookie,
-                                 &tree, OBD_BRW_READ);
+                ll_file_put_lock(inode, end, ltd.lock_style, &ltd.u.lockh,
+                                 &ltd.u.tree, OBD_BRW_READ);
                 up_read(&lli->lli_truncate_rwsem);
         } else {
                 retval = ll_direct_IO(READ, file, iov_copy, *ppos, nr_segs, 0);
@@ -1660,6 +1664,7 @@ repeat:
         }
 
  out:
+        ll_td_set(NULL);
         if (ra != 0)
                 ll_ra_read_ex(file, &bead);
         retval = (sum > 0) ? sum : retval;
@@ -1708,7 +1713,7 @@ static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
         struct inode *inode = file->f_dentry->d_inode;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
-        struct ll_lock_tree tree;
+        struct ll_thread_data ltd = { 0 };
         loff_t maxbytes = ll_file_maxbytes(inode);
         loff_t lock_start, lock_end, end;
         ssize_t retval, chunk, sum = 0;
@@ -1741,7 +1746,12 @@ static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
         if (down_interruptible(&ll_i2info(inode)->lli_write_sem))
                 RETURN(-ERESTARTSYS);
 
+        ltd.ltd_magic = LTD_MAGIC;
+        ll_td_set(&ltd);
 repeat:
+        memset(&ltd, 0, sizeof(ltd));
+        ltd.ltd_magic = LTD_MAGIC;
+
         chunk = 0; /* just to fix gcc's warning */
         end = *ppos + count - 1;
 
@@ -1789,7 +1799,7 @@ repeat:
                 nrsegs_copy = nr_segs;
         }
 
-        tree_locked = ll_file_get_tree_lock_iov(&tree, file, iov_copy,
+        tree_locked = ll_file_get_tree_lock_iov(&ltd.u.tree, file, iov_copy,
                                                 nrsegs_copy,
                                                 (obd_off)lock_start,
                                                 (obd_off)lock_end,
@@ -1841,6 +1851,8 @@ repeat:
                 lov_stripe_unlock(lsm);
                 OBD_FREE_PTR(xtimes);
 
+                ltd.lock_style = LL_LOCK_STYLE_TREELOCK;
+
 #ifdef HAVE_FILE_WRITEV
                 retval = generic_file_writev(file, iov_copy, nrsegs_copy, ppos);
 #else
@@ -1860,7 +1872,7 @@ repeat:
 
 out_unlock:
         if (tree_locked)
-                ll_tree_unlock(&tree);
+                ll_tree_unlock(&ltd.u.tree);
 
 out:
         if (retval > 0) {
@@ -1872,6 +1884,7 @@ out:
 
         up(&ll_i2info(inode)->lli_write_sem);
 
+        ll_td_set(NULL);
         if (iov_copy && iov_copy != iov)
                 OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
 
index 6d2a39d..814a83d 100644 (file)
@@ -934,6 +934,26 @@ int ll_tree_lock_iov(struct ll_lock_tree *tree,
                      int ast_flags);
 int ll_tree_unlock(struct ll_lock_tree *tree);
 
+enum ll_lock_style {
+        LL_LOCK_STYLE_NOLOCK   = 0,
+        LL_LOCK_STYLE_FASTLOCK = 1,
+        LL_LOCK_STYLE_TREELOCK = 2
+};
+
+struct ll_thread_data {
+        int ltd_magic;
+        int lock_style;
+        struct list_head *tree_list;
+        union {
+                struct ll_lock_tree tree;
+                struct lustre_handle lockh;
+        } u;
+};
+struct ll_thread_data *ll_td_get(void);
+void ll_td_set(struct ll_thread_data *ltd);
+struct lustre_handle *ltd2lockh(struct ll_thread_data *ltd, __u64 start,
+                                __u64 end);
+
 #define    ll_s2sbi(sb)        (s2lsi(sb)->lsi_llsbi)
 
 static inline __u64 ll_ts2u64(struct timespec *time)
index 320e948..d8f58d8 100644 (file)
@@ -1502,7 +1502,7 @@ static int ll_setattr_do_truncate(struct inode *inode, loff_t new_size)
                 int err;
 
                 err = (local_lock == 2) ?
-                        obd_cancel(sbi->ll_osc_exp, lsm, LCK_PW, &lockh):
+                        obd_cancel(sbi->ll_osc_exp, lsm, LCK_PW, &lockh, 0, 0):
                         ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh);
                 if (unlikely(err != 0)){
                         CERROR("extent unlock failed: err=%d,"
index a56b6de..6f21e60 100644 (file)
@@ -481,6 +481,34 @@ static void ll_put_extent_lock(struct vm_area_struct *vma, int save_flags,
         ll_extent_unlock(fd, inode, ll_i2info(inode)->lli_smd, mode, lockh);
 }
 
+struct lustre_handle *ltd2lockh(struct ll_thread_data *ltd,
+                                __u64 start, __u64 end) {
+        ENTRY;
+        if (NULL == ltd)
+                RETURN(NULL);
+        switch(ltd->lock_style) {
+                case LL_LOCK_STYLE_FASTLOCK:
+                        RETURN(&ltd->u.lockh);
+                        break;
+                case LL_LOCK_STYLE_TREELOCK: {
+                        struct ll_lock_tree_node *node;
+                        if (ltd->tree_list == NULL)
+                                ltd->tree_list = &ltd->u.tree.lt_locked_list;
+
+                        list_for_each_entry(node, ltd->tree_list, lt_locked_item) {
+                                if (node->lt_policy.l_extent.start <= start &&
+                                    node->lt_policy.l_extent.end >= end) {
+                                        ltd->tree_list = node->lt_locked_item.prev;
+                                        RETURN(&node->lt_lockh);
+                                }
+                        }
+                }
+                default:
+                        break;
+        }
+        RETURN(NULL);
+}
+
 #ifndef HAVE_VM_OP_FAULT
 /**
  * Page fault handler.
index e89e320..da675c7 100644 (file)
@@ -770,7 +770,8 @@ static inline int llap_async_cache_rebalance(struct ll_sb_info *sbi)
 
 static struct ll_async_page *llap_from_page_with_lockh(struct page *page,
                                                        unsigned origin,
-                                                       struct lustre_handle *lockh)
+                                                       struct lustre_handle *lockh,
+                                                       int flags)
 {
         struct ll_async_page *llap;
         struct obd_export *exp;
@@ -795,8 +796,28 @@ static struct ll_async_page *llap_from_page_with_lockh(struct page *page,
         LASSERT(ll_async_page_slab);
         LASSERTF(origin < LLAP__ORIGIN_MAX, "%u\n", origin);
 
+        exp = ll_i2obdexp(page->mapping->host);
+        if (exp == NULL)
+                RETURN(ERR_PTR(-EINVAL));
+
         llap = llap_cast_private(page);
         if (llap != NULL) {
+                if (origin == LLAP_ORIGIN_READAHEAD && lockh) {
+                        /* the page could belong to another lock for which
+                         * we don't hold a reference. We need to check that
+                         * a reference is taken on a lock covering this page.
+                         * For readpage origin, this is fine because
+                         * ll_file_readv() took a reference on lock(s) covering
+                         * the whole read. However, for readhead, we don't have
+                         * this guarantee, so we need to check that the lock
+                         * matched in ll_file_readv() also covers this page */
+                        __u64 offset = ((loff_t)page->index) << CFS_PAGE_SHIFT;
+                        if (!obd_get_lock(exp, ll_i2info(inode)->lli_smd, 
+                                          &llap->llap_cookie, OBD_BRW_READ,
+                                          offset, offset + CFS_PAGE_SIZE - 1,
+                                          lockh, flags))
+                                RETURN(ERR_PTR(-ENOLCK));
+                }
                 /* move to end of LRU list, except when page is just about to
                  * die */
                 if (origin != LLAP_ORIGIN_REMOVEPAGE) {
@@ -828,10 +849,6 @@ static struct ll_async_page *llap_from_page_with_lockh(struct page *page,
                 GOTO(out, llap);
         }
 
-        exp = ll_i2obdexp(page->mapping->host);
-        if (exp == NULL)
-                RETURN(ERR_PTR(-EINVAL));
-
         /* limit the number of lustre-cached pages */
         cpu = cfs_get_cpu();
         pd = LL_PGLIST_DATA(sbi);
@@ -866,7 +883,7 @@ static struct ll_async_page *llap_from_page_with_lockh(struct page *page,
         rc = obd_prep_async_page(exp, ll_i2info(inode)->lli_smd, NULL, page,
                                  (obd_off)page->index << CFS_PAGE_SHIFT,
                                  &ll_async_page_ops, llap, &llap->llap_cookie,
-                                 0, lockh);
+                                 flags, lockh);
         if (rc) {
                 OBD_SLAB_FREE(llap, ll_async_page_slab,
                               ll_async_page_slab_size);
@@ -921,7 +938,7 @@ static struct ll_async_page *llap_from_page_with_lockh(struct page *page,
 static inline struct ll_async_page *llap_from_page(struct page *page,
                                                    unsigned origin)
 {
-        return llap_from_page_with_lockh(page, origin, NULL);
+        return llap_from_page_with_lockh(page, origin, NULL, 0);
 }
 
 static int queue_or_sync_write(struct obd_export *exp, struct inode *inode,
@@ -1042,7 +1059,8 @@ int ll_commit_write(struct file *file, struct page *page, unsigned from,
         if (fd->fd_flags & LL_FILE_GROUP_LOCKED)
                 lockh = &fd->fd_cwlockh;
 
-        llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_COMMIT_WRITE, lockh);
+        llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_COMMIT_WRITE, lockh,
+                                         0);
         if (IS_ERR(llap))
                 RETURN(PTR_ERR(llap));
 
@@ -1324,6 +1342,28 @@ static int index_in_window(unsigned long index, unsigned long point,
         return start <= index && index <= end;
 }
 
+struct ll_thread_data *ll_td_get()
+{
+        struct ll_thread_data *ltd = current->journal_info;
+
+        LASSERT(ltd == NULL || ltd->ltd_magic == LTD_MAGIC);
+        return ltd;
+}
+
+void ll_td_set(struct ll_thread_data *ltd)
+{
+        if (ltd == NULL) {
+                ltd = current->journal_info;
+                LASSERT(ltd == NULL || ltd->ltd_magic == LTD_MAGIC);
+                current->journal_info = NULL;
+                return;
+        }
+
+        LASSERT(current->journal_info == NULL);
+        LASSERT(ltd->ltd_magic == LTD_MAGIC);
+        current->journal_info = ltd;
+}
+
 static struct ll_readahead_state *ll_ras_get(struct file *f)
 {
         struct ll_file_data       *fd;
@@ -1393,7 +1433,9 @@ static int ll_read_ahead_page(struct obd_export *exp, struct obd_io_group *oig,
         struct ll_async_page *llap;
         struct page *page;
         unsigned int gfp_mask = 0;
-        int rc = 0;
+        int rc = 0, flags = 0;
+        struct ll_thread_data *ltd;
+        struct lustre_handle *lockh = NULL;
 
         gfp_mask = GFP_HIGHUSER & ~__GFP_WAIT;
 #ifdef __GFP_NOWARN
@@ -1413,9 +1455,19 @@ static int ll_read_ahead_page(struct obd_export *exp, struct obd_io_group *oig,
                 GOTO(unlock_page, rc = 0);
         }
 
+        ltd = ll_td_get();
+        if (ltd && ltd->lock_style > 0) {
+                __u64 offset = ((loff_t)page->index) << CFS_PAGE_SHIFT;
+                lockh = ltd2lockh(ltd, offset,
+                                  offset + CFS_PAGE_SIZE - 1);
+                if (ltd->lock_style == LL_LOCK_STYLE_FASTLOCK)
+                        flags = OBD_FAST_LOCK;
+        }
+
         /* we do this first so that we can see the page in the /proc
          * accounting */
-        llap = llap_from_page(page, LLAP_ORIGIN_READAHEAD);
+        llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_READAHEAD, lockh,
+                                         flags);
         if (IS_ERR(llap) || llap->llap_defer_uptodate) {
                 if (PTR_ERR(llap) == -ENOLCK) {
                         ll_ra_stats_inc(mapping, RA_STAT_FAILED_MATCH);
@@ -1580,8 +1632,8 @@ static int ll_read_ahead_pages(struct obd_export *exp,
 }
 
 static int ll_readahead(struct ll_readahead_state *ras,
-                         struct obd_export *exp, struct address_space *mapping,
-                         struct obd_io_group *oig, int flags)
+                        struct obd_export *exp, struct address_space *mapping,
+                        struct obd_io_group *oig, int flags)
 {
         unsigned long start = 0, end = 0, reserved;
         unsigned long ra_end, len;
@@ -1981,6 +2033,8 @@ int ll_writepage(struct page *page)
         struct ll_inode_info *lli = ll_i2info(inode);
         struct obd_export *exp;
         struct ll_async_page *llap;
+        struct ll_thread_data *ltd;
+        struct lustre_handle *lockh = NULL;
         int rc = 0;
         ENTRY;
 
@@ -1990,7 +2044,14 @@ int ll_writepage(struct page *page)
         if (exp == NULL)
                 GOTO(out, rc = -EINVAL);
 
-        llap = llap_from_page(page, LLAP_ORIGIN_WRITEPAGE);
+        ltd = ll_td_get();
+        /* currently, no FAST lock in write path */
+        if (ltd && ltd->lock_style == LL_LOCK_STYLE_TREELOCK) {
+                __u64 offset = ((loff_t)page->index) << CFS_PAGE_SHIFT;
+                lockh = ltd2lockh(ltd, offset, offset + CFS_PAGE_SIZE - 1);
+        }
+        
+        llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_WRITEPAGE, lockh, 0);
         if (IS_ERR(llap))
                 GOTO(out, rc = PTR_ERR(llap));
 
@@ -2045,7 +2106,7 @@ int ll_readpage(struct file *filp, struct page *page)
         struct ll_async_page *llap;
         struct obd_io_group *oig = NULL;
         struct lustre_handle *lockh = NULL;
-        int rc;
+        int rc, flags = 0;
         ENTRY;
 
         LASSERT(PageLocked(page));
@@ -2076,10 +2137,22 @@ int ll_readpage(struct file *filp, struct page *page)
         if (exp == NULL)
                 GOTO(out, rc = -EINVAL);
 
-        if (fd->fd_flags & LL_FILE_GROUP_LOCKED)
+        if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
                 lockh = &fd->fd_cwlockh;
+        } else {
+                struct ll_thread_data *ltd;
+                ltd = ll_td_get();
+                if (ltd && ltd->lock_style > 0) {
+                        __u64 offset = ((loff_t)page->index) << CFS_PAGE_SHIFT;
+                        lockh = ltd2lockh(ltd, offset,
+                                          offset + CFS_PAGE_SIZE - 1);
+                        if (ltd->lock_style == LL_LOCK_STYLE_FASTLOCK)
+                                flags = OBD_FAST_LOCK;
+                }
+        }
 
-        llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_READPAGE, lockh);
+        llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_READPAGE, lockh,
+                                         flags);
         if (IS_ERR(llap)) {
                 if (PTR_ERR(llap) == -ENOLCK) {
                         CWARN("ino %lu page %lu (%llu) not covered by "
index 0d8fd7c..14097fe 100644 (file)
@@ -1893,10 +1893,10 @@ static struct obd_async_page_ops lov_async_page_ops = {
 };
 
 int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
-                           struct lov_oinfo *loi, cfs_page_t *page,
-                           obd_off offset, struct obd_async_page_ops *ops,
-                           void *data, void **res, int nocache,
-                           struct lustre_handle *lockh)
+                        struct lov_oinfo *loi, cfs_page_t *page,
+                        obd_off offset, struct obd_async_page_ops *ops,
+                        void *data, void **res, int flags,
+                        struct lustre_handle *lockh)
 {
         struct lov_obd *lov = &exp->exp_obd->u.lov;
         struct lov_async_page *lap;
@@ -1939,7 +1939,7 @@ int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
 
         lap->lap_sub_cookie = (void *)lap + size_round(sizeof(*lap));
 
-        if (lockh) {
+        if (lockh && !(flags & OBD_FAST_LOCK)) {
                 lov_lockh = lov_handle2llh(lockh);
                 if (lov_lockh) {
                         lockh = lov_lockh->llh_handles + lap->lap_stripe;
@@ -1949,7 +1949,7 @@ int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
         rc = obd_prep_async_page(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
                                  lsm, loi, page, lap->lap_sub_offset,
                                  &lov_async_page_ops, lap,
-                                 &lap->lap_sub_cookie, nocache, lockh);
+                                 &lap->lap_sub_cookie, flags, lockh);
         if (lov_lockh)
                 lov_llh_put(lov_lockh);
         if (rc)
@@ -2223,7 +2223,8 @@ static int lov_change_cbdata(struct obd_export *exp,
 }
 
 static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
-                      __u32 mode, struct lustre_handle *lockh)
+                      __u32 mode, struct lustre_handle *lockh, int flags,
+                      obd_off end)
 {
         struct lov_request_set *set;
         struct obd_info oinfo;
@@ -2242,6 +2243,13 @@ static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
 
         LASSERT(lockh);
         lov = &exp->exp_obd->u.lov;
+        if (flags & OBD_FAST_LOCK) {
+                int stripe = lov_stripe_number(lsm, end);
+                RETURN(obd_cancel(lov->lov_tgts[lsm->lsm_oinfo[stripe]->
+                                  loi_ost_idx]->ltd_exp, NULL, mode, lockh,
+                                  flags, end));
+        }
+
         rc = lov_prep_cancel_set(exp, &oinfo, lsm, mode, lockh, &set);
         if (rc)
                 RETURN(rc);
@@ -2260,7 +2268,8 @@ static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
                         this_mode = mode;
 
                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
-                                req->rq_oi.oi_md, this_mode, lov_lockhp);
+                                req->rq_oi.oi_md, this_mode, lov_lockhp, flags,
+                                end);
                 rc = lov_update_common_set(set, req, rc);
                 if (rc) {
                         CERROR("error: cancel objid "LPX64" subobj "
@@ -3208,45 +3217,44 @@ void lov_stripe_unlock(struct lov_stripe_md *md)
 }
 EXPORT_SYMBOL(lov_stripe_unlock);
 
-static int lov_reget_short_lock(struct obd_export *exp,
-                                struct lov_stripe_md *lsm,
-                                void **res, int rw,
-                                obd_off start, obd_off end,
-                                void **cookie)
+static int lov_get_lock(struct obd_export *exp, struct lov_stripe_md *lsm,
+                        void **res, int rw, obd_off start, obd_off end,
+                        struct lustre_handle *lockh, int flags)
 {
         struct lov_async_page *l = *res;
         obd_off stripe_start, stripe_end = start;
+        struct lov_lock_handles *lov_lockh = NULL;
+        int rc;
 
         ENTRY;
 
+        if (lockh && lustre_handle_is_used(lockh) &&
+            !(flags & OBD_FAST_LOCK)) {
+                lov_lockh = lov_handle2llh(lockh);
+                if (lov_lockh == NULL) {
+                        CERROR("LOV: invalid lov lock handle %p\n", lockh);
+                        RETURN(-EINVAL);
+                }
+                lockh = lov_lockh->llh_handles + l->lap_stripe;
+        }
+
         /* ensure we don't cross stripe boundaries */
         lov_extent_calc(exp, lsm, OBD_CALC_STRIPE_END, &stripe_end);
         if (stripe_end <= end)
-                RETURN(0);
+                GOTO(out, rc = 0);
 
         /* map the region limits to the object limits */
         lov_stripe_offset(lsm, start, l->lap_stripe, &stripe_start);
         lov_stripe_offset(lsm, end, l->lap_stripe, &stripe_end);
 
-        RETURN(obd_reget_short_lock(exp->exp_obd->u.lov.lov_tgts[lsm->
-                                    lsm_oinfo[l->lap_stripe]->loi_ost_idx]->
-                                    ltd_exp, NULL, &l->lap_sub_cookie,
-                                    rw, stripe_start, stripe_end, cookie));
-}
-
-static int lov_release_short_lock(struct obd_export *exp,
-                                  struct lov_stripe_md *lsm, obd_off end,
-                                  void *cookie, int rw)
-{
-        int stripe;
-
-        ENTRY;
-
-        stripe = lov_stripe_number(lsm, end);
-
-        RETURN(obd_release_short_lock(exp->exp_obd->u.lov.lov_tgts[lsm->
-                                      lsm_oinfo[stripe]->loi_ost_idx]->
-                                      ltd_exp, NULL, end, cookie, rw));
+        rc = obd_get_lock(exp->exp_obd->u.lov.lov_tgts[lsm->
+                            lsm_oinfo[l->lap_stripe]->loi_ost_idx]->
+                            ltd_exp, NULL, &l->lap_sub_cookie,
+                            rw, stripe_start, stripe_end, lockh, flags);
+out:
+        if (lov_lockh != NULL)
+                lov_llh_put(lov_lockh);
+        RETURN(rc);
 }
 
 struct obd_ops lov_obd_ops = {
@@ -3271,8 +3279,7 @@ struct obd_ops lov_obd_ops = {
         .o_brw                 = lov_brw,
         .o_brw_async           = lov_brw_async,
         .o_prep_async_page     = lov_prep_async_page,
-        .o_reget_short_lock    = lov_reget_short_lock,
-        .o_release_short_lock  = lov_release_short_lock,
+        .o_get_lock            = lov_get_lock,
         .o_queue_async_io      = lov_queue_async_io,
         .o_set_async_flags     = lov_set_async_flags,
         .o_queue_group_io      = lov_queue_group_io,
index ad56f04..f7522b9 100644 (file)
@@ -249,7 +249,7 @@ static int enqueue_done(struct lov_request_set *set, __u32 mode)
                         continue;
 
                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
-                                req->rq_oi.oi_md, mode, lov_lockhp);
+                                req->rq_oi.oi_md, mode, lov_lockhp, 0, 0);
                 if (rc && lov->lov_tgts[req->rq_idx] &&
                     lov->lov_tgts[req->rq_idx]->ltd_active)
                         CERROR("cancelling obdjid "LPX64" on OST "
index fe251bf..0c5a0af 100644 (file)
@@ -673,7 +673,8 @@ static int mgc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
 }
 
 static int mgc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
-                      __u32 mode, struct lustre_handle *lockh)
+                      __u32 mode, struct lustre_handle *lockh, int flags,
+                      obd_off end)
 {
         ENTRY;
 
@@ -1187,7 +1188,7 @@ out_pop:
         /* Now drop the lock so MGS can revoke it */
         if (!rcl) {
                 rcl = mgc_cancel(mgc->u.cli.cl_mgc_mgsexp, NULL,
-                                 LCK_CR, &lockh);
+                                 LCK_CR, &lockh, 0, 0);
                 if (rcl)
                         CERROR("Can't drop cfg lock: %d\n", rcl);
         }
index 17afa31..1abdb1b 100644 (file)
@@ -1368,8 +1368,7 @@ void lprocfs_init_ops_stats(int num_private_stats, struct lprocfs_stats *stats)
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, brw);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, brw_async);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, prep_async_page);
-        LPROCFS_OBD_OP_INIT(num_private_stats, stats, reget_short_lock);
-        LPROCFS_OBD_OP_INIT(num_private_stats, stats, release_short_lock);
+        LPROCFS_OBD_OP_INIT(num_private_stats, stats, get_lock);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, queue_async_io);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, queue_group_io);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, trigger_group_io);
index 6cd7ef4..c66be77 100644 (file)
@@ -804,7 +804,8 @@ static int echo_client_async_page(struct obd_export *exp, int rw,
 
                 rc = obd_prep_async_page(exp, lsm, NULL, eap->eap_page,
                                          eap->eap_off, &ec_async_page_ops,
-                                         eap, &eap->eap_cookie, 1, NULL);
+                                         eap, &eap->eap_cookie,
+                                         OBD_PAGE_NO_CACHE, NULL);
                 if (rc) {
                         spin_lock(&eas->eas_lock);
                         eas->eas_rc = rc;
@@ -1139,7 +1140,7 @@ echo_client_cancel(struct obd_export *exp, struct obdo *oa)
                 return (-ENOENT);
 
         rc = obd_cancel(ec->ec_exp, ecl->ecl_object->eco_lsm, ecl->ecl_mode,
-                        &ecl->ecl_lock_handle);
+                        &ecl->ecl_lock_handle, 0, 0);
 
         echo_put_object (ecl->ecl_object);
         OBD_FREE (ecl, sizeof (*ecl));
@@ -1436,7 +1437,7 @@ static int echo_client_disconnect(struct obd_export *exp)
                 list_del (&ecl->ecl_exp_chain);
 
                 rc = obd_cancel(ec->ec_exp, ecl->ecl_object->eco_lsm,
-                                 ecl->ecl_mode, &ecl->ecl_lock_handle);
+                                 ecl->ecl_mode, &ecl->ecl_lock_handle, 0, 0);
 
                 CDEBUG (D_INFO, "Cancel lock on object "LPX64" on disconnect "
                         "(%d)\n", ecl->ecl_object->eco_id, rc);
index f0a8bce..f8a89a4 100644 (file)
@@ -100,14 +100,16 @@ int cache_add_extent(struct lustre_cache *cache, struct ldlm_res_id *res,
                 if (!lock)
                         RETURN(-ENOLCK);
 
-                LASSERTF(lock->l_policy_data.l_extent.start <=
-                         extent->oap_obj_off &&
-                         extent->oap_obj_off + CFS_PAGE_SIZE - 1 <=
-                         lock->l_policy_data.l_extent.end,
-                         "Got wrong lock [" LPU64 "," LPU64 "] for page with "
-                         "offset " LPU64 "\n",
-                         lock->l_policy_data.l_extent.start,
-                         lock->l_policy_data.l_extent.end, extent->oap_obj_off);
+                if(lock->l_policy_data.l_extent.start > extent->oap_obj_off ||
+                   extent->oap_obj_off + CFS_PAGE_SIZE - 1 >
+                   lock->l_policy_data.l_extent.end) {
+                         CDEBUG(D_CACHE, "Got wrong lock [" LPU64 "," LPU64 "] "
+                                         "for page with offset " LPU64 "\n",
+                                         lock->l_policy_data.l_extent.start,
+                                         lock->l_policy_data.l_extent.end,
+                                         extent->oap_obj_off);
+                         RETURN(-ENOLCK);
+                }
         } else {
                 /* Real extent width calculation here once we have real
                  * extents
index 3a8bb3f..b1767dd 100644 (file)
@@ -2780,39 +2780,43 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
         RETURN(-EDQUOT);
 }
 
-static int osc_reget_short_lock(struct obd_export *exp,
-                                struct lov_stripe_md *lsm,
-                                void **res, int rw,
-                                obd_off start, obd_off end,
-                                void **cookie)
+static int osc_get_lock(struct obd_export *exp, struct lov_stripe_md *lsm,
+                        void **res, int rw, obd_off start, obd_off end,
+                        struct lustre_handle *lockh, int flags)
 {
-        struct osc_async_page *oap = *res;
-        int rc;
+        struct ldlm_lock *lock = NULL;
+        int rc, release = 0;
 
         ENTRY;
 
-        spin_lock(&oap->oap_lock);
-        rc = ldlm_lock_fast_match(oap->oap_ldlm_lock, rw,
-                                  start, end, cookie);
-        spin_unlock(&oap->oap_lock);
+        if (lockh && lustre_handle_is_used(lockh)) {
+                /* if a valid lockh is passed, just check that the corresponding
+                 * lock covers the extent */
+                lock = ldlm_handle2lock(lockh);
+                release = 1;
+        } else {
+                struct osc_async_page *oap = *res;
+                spin_lock(&oap->oap_lock);
+                lock = oap->oap_ldlm_lock;
+                LDLM_LOCK_GET(lock);
+                spin_unlock(&oap->oap_lock);
+        }
 
+        rc = ldlm_lock_fast_match(lock, rw, start, end, lockh);
+        if (release == 1 && rc == 1)
+                /* if a valid lockh was passed, we just need to check
+                 * that the lock covers the page, no reference should be
+                 * taken*/
+                ldlm_lock_decref(lockh,
+                                 rw == OBD_BRW_WRITE ? LCK_PW : LCK_PR);
+        LDLM_LOCK_PUT(lock);
         RETURN(rc);
 }
 
-static int osc_release_short_lock(struct obd_export *exp,
-                                  struct lov_stripe_md *lsm, obd_off end,
-                                  void *cookie, int rw)
-{
-        ENTRY;
-        ldlm_lock_fast_release(cookie, rw);
-        /* no error could have happened at this layer */
-        RETURN(0);
-}
-
 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
                         struct lov_oinfo *loi, cfs_page_t *page,
                         obd_off offset, struct obd_async_page_ops *ops,
-                        void *data, void **res, int nocache,
+                        void *data, void **res, int flags,
                         struct lustre_handle *lockh)
 {
         struct osc_async_page *oap;
@@ -2845,7 +2849,7 @@ int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
         spin_lock_init(&oap->oap_lock);
 
         /* If the page was marked as notcacheable - don't add to any locks */
-        if (!nocache) {
+        if (!(flags & OBD_PAGE_NO_CACHE)) {
                 osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
                 /* This is the only place where we can call cache_add_extent
                    without oap_lock, because this page is locked now, and
@@ -3479,7 +3483,8 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
 }
 
 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
-                      __u32 mode, struct lustre_handle *lockh)
+                      __u32 mode, struct lustre_handle *lockh, int flags,
+                      obd_off end)
 {
         ENTRY;
 
@@ -4461,8 +4466,7 @@ struct obd_ops osc_obd_ops = {
         .o_brw                  = osc_brw,
         .o_brw_async            = osc_brw_async,
         .o_prep_async_page      = osc_prep_async_page,
-        .o_reget_short_lock     = osc_reget_short_lock,
-        .o_release_short_lock   = osc_release_short_lock,
+        .o_get_lock             = osc_get_lock,
         .o_queue_async_io       = osc_queue_async_io,
         .o_set_async_flags      = osc_set_async_flags,
         .o_queue_group_io       = osc_queue_group_io,