Whamcloud - gitweb
Directly associate cached pages with the lock that protects those pages,
authordeen <deen>
Thu, 5 Jun 2008 11:43:21 +0000 (11:43 +0000)
committerdeen <deen>
Thu, 5 Jun 2008 11:43:21 +0000 (11:43 +0000)
this allows us to quickly find which pages to write and remove
once the lock callback is received.

b=10718
i=green
i=johann

22 files changed:
lustre/ChangeLog
lustre/include/Makefile.am
lustre/include/lustre_dlm.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/include/obd_ost.h
lustre/ldlm/ldlm_lock.c
lustre/liblustre/llite_lib.h
lustre/liblustre/rw.c
lustre/liblustre/super.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/rw.c
lustre/lov/lov_obd.c
lustre/obdclass/lprocfs_status.c
lustre/obdecho/echo_client.c
lustre/osc/Makefile.in
lustre/osc/autoMakefile.am
lustre/osc/osc_internal.h
lustre/osc/osc_request.c
lustre/tests/sanity.sh

index a332517..04dee50 100644 (file)
@@ -1084,6 +1084,13 @@ Details    : Change LASSERTs to client eviction (i.e. abort client's recovery)
              because LASSERT on both the data supplied by a client, and the data 
             on disk is dangerous and incorrect.
 
+Severity   : enhancement
+Bugzilla   : 10718
+Description: Slow truncate/writes to huge files at high offsets.
+Details    : Directly associate cached pages with the lock that protects
+             those pages; this allows us to quickly find which pages to
+             write and remove once the lock callback is received.
+
 --------------------------------------------------------------------------------
 
 2007-08-10         Cluster File Systems, Inc. <info@clusterfs.com>
index 26cad31..b2dc0d9 100644 (file)
@@ -16,4 +16,5 @@ EXTRA_DIST = ioctl.h liblustre.h lprocfs_status.h lustre_cfg.h        \
             obd_ost.h obd_support.h lustre_ver.h lu_object.h lu_time.h  \
              md_object.h dt_object.h lustre_param.h lustre_mdt.h \
              lustre_fid.h lustre_fld.h lustre_req_layout.h lustre_capa.h \
-             lustre_idmap.h lustre_eacl.h interval_tree.h obd_cksum.h
+             lustre_idmap.h lustre_eacl.h interval_tree.h obd_cksum.h \
+            lustre_cache.h
index 5d2941c..61a4d13 100644 (file)
@@ -549,6 +549,10 @@ struct ldlm_lock {
         void                 *l_lvb_data;       /* an LVB received during */
         void                 *l_lvb_swabber;    /* an enqueue */
         void                 *l_ast_data;
+        spinlock_t            l_extents_list_lock;
+        struct list_head      l_extents_list;
+
+        struct list_head      l_cache_locks_list;
 
         /* Server-side-only members */
 
index 188a022..e69ac77 100644 (file)
@@ -255,6 +255,11 @@ struct obd_device_target {
         struct lustre_quota_ctxt  obt_qctxt;
 };
 
+typedef void (*obd_pin_extent_cb)(void *data);
+typedef int (*obd_page_removal_cb_t)(void *data, int discard);
+typedef int (*obd_lock_cancel_cb)(struct ldlm_lock *,struct ldlm_lock_desc *,
+                                   void *, int);
+
 /* llog contexts */
 enum llog_ctxt_id {
         LLOG_CONFIG_ORIG_CTXT  =  0,
@@ -379,6 +384,7 @@ struct filter_obd {
 
 struct mdc_rpc_lock;
 struct obd_import;
+struct lustre_cache;
 struct client_obd {
         struct semaphore         cl_sem;
         struct obd_uuid          cl_target_uuid;
@@ -473,6 +479,10 @@ struct client_obd {
         struct lu_client_seq    *cl_seq;
 
         atomic_t                 cl_resends; /* resend count */
+
+        /* Cache of triples */
+        struct lustre_cache     *cl_cache;
+        obd_lock_cancel_cb       cl_ext_lock_cancel_cb;
 };
 #define obd2cli_tgt(obd) ((char *)(obd)->u.cli.cl_target_uuid.uuid)
 
@@ -647,6 +657,9 @@ struct lov_obd {
         __u32                   lov_offset_idx; /* aliasing for start_idx  */
         int                     lov_start_count;/* reseed counter */
         int                     lov_connects;
+        obd_page_removal_cb_t   lov_page_removal_cb;
+        obd_pin_extent_cb       lov_page_pin_cb;
+        obd_lock_cancel_cb      lov_lock_cancel_cb;
 };
 
 struct lmv_tgt_desc {
@@ -1172,7 +1185,8 @@ struct obd_ops {
                                  struct lov_oinfo *loi,
                                  cfs_page_t *page, obd_off offset,
                                  struct obd_async_page_ops *ops, void *data,
-                                 void **res);
+                                 void **res, int nocache,
+                                 struct lustre_handle *lockh);
         int (*o_queue_async_io)(struct obd_export *exp,
                                 struct lov_stripe_md *lsm,
                                 struct lov_oinfo *loi, void *cookie,
@@ -1267,6 +1281,17 @@ struct obd_ops {
         int (*o_quotactl)(struct obd_export *, struct obd_quotactl *);
 
         int (*o_ping)(struct obd_export *exp);
+
+        int (*o_register_page_removal_cb)(struct obd_export *exp,
+                                          obd_page_removal_cb_t cb,
+                                          obd_pin_extent_cb pin_cb);
+        int (*o_unregister_page_removal_cb)(struct obd_export *exp,
+                                            obd_page_removal_cb_t cb);
+        int (*o_register_lock_cancel_cb)(struct obd_export *exp,
+                                       obd_lock_cancel_cb cb);
+        int (*o_unregister_lock_cancel_cb)(struct obd_export *exp,
+                                         obd_lock_cancel_cb cb);
+
         /*
          * NOTE: If adding ops, add another LPROCFS_OBD_OP_INIT() line
          * to lprocfs_alloc_obd_stats() in obdclass/lprocfs_status.c.
index f6ea134..c5d7f79 100644 (file)
@@ -1193,7 +1193,8 @@ static inline  int obd_prep_async_page(struct obd_export *exp,
                                        struct lov_oinfo *loi,
                                        cfs_page_t *page, obd_off offset,
                                        struct obd_async_page_ops *ops,
-                                       void *data, void **res)
+                                       void *data, void **res, int nocache,
+                                       struct lustre_handle *lockh)
 {
         int ret;
         ENTRY;
@@ -1202,7 +1203,8 @@ static inline  int obd_prep_async_page(struct obd_export *exp,
         EXP_COUNTER_INCREMENT(exp, prep_async_page);
 
         ret = OBP(exp->exp_obd, prep_async_page)(exp, lsm, loi, page, offset,
-                                                 ops, data, res);
+                                                 ops, data, res, nocache,
+                                                 lockh);
         RETURN(ret);
 }
 
@@ -1647,6 +1649,59 @@ static inline int obd_register_observer(struct obd_device *obd,
         RETURN(0);
 }
 
+static inline int obd_register_page_removal_cb(struct obd_export *exp,
+                                               obd_page_removal_cb_t cb,
+                                               obd_pin_extent_cb pin_cb)
+{
+        int rc;
+        ENTRY;
+
+        OBD_CHECK_DT_OP(exp->exp_obd, register_page_removal_cb, 0);
+        OBD_COUNTER_INCREMENT(exp->exp_obd, register_page_removal_cb);
+
+        rc = OBP(exp->exp_obd, register_page_removal_cb)(exp, cb, pin_cb);
+        RETURN(rc);
+}
+
+static inline int obd_unregister_page_removal_cb(struct obd_export *exp,
+                                                 obd_page_removal_cb_t cb)
+{
+        int rc;
+        ENTRY;
+
+        OBD_CHECK_DT_OP(exp->exp_obd, unregister_page_removal_cb, 0);
+        OBD_COUNTER_INCREMENT(exp->exp_obd, unregister_page_removal_cb);
+
+        rc = OBP(exp->exp_obd, unregister_page_removal_cb)(exp, cb);
+        RETURN(rc);
+}
+
+static inline int obd_register_lock_cancel_cb(struct obd_export *exp,
+                                              obd_lock_cancel_cb cb)
+{
+        int rc;
+        ENTRY;
+
+        OBD_CHECK_DT_OP(exp->exp_obd, register_lock_cancel_cb, 0);
+        OBD_COUNTER_INCREMENT(exp->exp_obd, register_lock_cancel_cb);
+
+        rc = OBP(exp->exp_obd, register_lock_cancel_cb)(exp, cb);
+        RETURN(rc);
+}
+
+static inline int obd_unregister_lock_cancel_cb(struct obd_export *exp,
+                                                 obd_lock_cancel_cb cb)
+{
+        int rc;
+        ENTRY;
+
+        OBD_CHECK_DT_OP(exp->exp_obd, unregister_lock_cancel_cb, 0);
+        OBD_COUNTER_INCREMENT(exp->exp_obd, unregister_lock_cancel_cb);
+
+        rc = OBP(exp->exp_obd, unregister_lock_cancel_cb)(exp, cb);
+        RETURN(rc);
+}
+
 /* metadata helpers */
 static inline int md_getstatus(struct obd_export *exp,
                                struct lu_fid *fid, struct obd_capa **pc)
index 12beb63..1cbff28 100644 (file)
@@ -34,4 +34,8 @@ struct osc_enqueue_args {
         struct ldlm_enqueue_info*oa_ei;
 };
 
+int osc_extent_blocking_cb(struct ldlm_lock *lock,
+                           struct ldlm_lock_desc *new, void *data,
+                           int flag);
+
 #endif
index 286197f..11ac6ca 100644 (file)
@@ -341,6 +341,10 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
         CFS_INIT_LIST_HEAD(&lock->l_handle.h_link);
         class_handle_hash(&lock->l_handle, lock_handle_addref);
 
+        CFS_INIT_LIST_HEAD(&lock->l_extents_list);
+        spin_lock_init(&lock->l_extents_list_lock);
+        CFS_INIT_LIST_HEAD(&lock->l_cache_locks_list);
+
         RETURN(lock);
 }
 
index d262250..c91f25c 100644 (file)
@@ -223,6 +223,9 @@ int llu_iop_write(struct inode *ino, struct ioctx *ioctxp);
 int llu_iop_iodone(struct ioctx *ioctxp);
 int llu_local_size(struct inode *inode);
 int llu_glimpse_size(struct inode *inode);
+int llu_extent_lock_cancel_cb(struct ldlm_lock *lock,
+                              struct ldlm_lock_desc *new, void *data,
+                              int flag);
 int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
                     struct lov_stripe_md *lsm, int mode,
                     ldlm_policy_data_t *policy, struct lustre_handle *lockh,
index 021a592..d3e4a9f 100644 (file)
@@ -112,9 +112,9 @@ static int llu_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock
         RETURN(stripe);
 }
 
-static int llu_extent_lock_callback(struct ldlm_lock *lock,
-                                    struct ldlm_lock_desc *new, void *data,
-                                    int flag)
+int llu_extent_lock_cancel_cb(struct ldlm_lock *lock,
+                              struct ldlm_lock_desc *new, void *data,
+                              int flag)
 {
         struct lustre_handle lockh = { 0 };
         int rc;
@@ -295,7 +295,7 @@ int llu_glimpse_size(struct inode *inode)
 
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = LCK_PR;
-        einfo.ei_cb_bl = llu_extent_lock_callback;
+        einfo.ei_cb_bl = osc_extent_blocking_cb;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = llu_glimpse_callback;
         einfo.ei_cbdata = inode;
@@ -345,7 +345,7 @@ int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
 
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = mode;
-        einfo.ei_cb_bl = llu_extent_lock_callback;
+        einfo.ei_cb_bl = osc_extent_blocking_cb;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = llu_glimpse_callback;
         einfo.ei_cbdata = inode;
@@ -537,7 +537,9 @@ static int llu_queue_pio(int cmd, struct llu_io_group *group,
                 rc = obd_prep_async_page(exp, lsm, NULL, page,
                                          (obd_off)page->index << CFS_PAGE_SHIFT,
                                          &llu_async_page_ops,
-                                         llap, &llap->llap_cookie);
+                                         llap, &llap->llap_cookie,
+                                         1 /* no cache in liblustre at all */,
+                                         NULL);
                 if (rc) {
                         LASSERT(rc < 0);
                         llap->llap_cookie = NULL;
@@ -609,7 +611,8 @@ struct llu_io_group * get_io_group(struct inode *inode, int maxpages,
         if (!llap_cookie_size)
                 llap_cookie_size = obd_prep_async_page(llu_i2obdexp(inode),
                                                        NULL, NULL, NULL, 0,
-                                                       NULL, NULL, NULL);
+                                                       NULL, NULL, NULL, 0,
+                                                       NULL);
 
         OBD_ALLOC(group, LLU_IO_GROUP_SIZE(maxpages));
         if (!group)
index 81dedf8..42d6b4e 100644 (file)
@@ -95,6 +95,8 @@ static void llu_fsop_gone(struct filesys *fs)
         ENTRY;
 
         list_del(&sbi->ll_conn_chain);
+        obd_unregister_lock_cancel_cb(sbi->ll_dt_exp,
+                                      llu_extent_lock_cancel_cb);
         obd_disconnect(sbi->ll_dt_exp);
         obd_disconnect(sbi->ll_md_exp);
 
@@ -2121,12 +2123,19 @@ llu_fsswop_mount(const char *source,
         sbi->ll_dt_exp = class_conn2export(&dt_conn);
         sbi->ll_lco.lco_flags = ocd.ocd_connect_flags;
 
+        err = obd_register_lock_cancel_cb(sbi->ll_dt_exp,
+                                          llu_extent_lock_cancel_cb);
+        if (err) {
+                CERROR("cannot register lock cancel callback: rc = %d\n", err);
+                GOTO(out_dt, err);
+        }
+
         llu_init_ea_size(sbi->ll_md_exp, sbi->ll_dt_exp);
 
         err = md_getstatus(sbi->ll_md_exp, &rootfid, NULL);
         if (err) {
                 CERROR("cannot mds_connect: rc = %d\n", err);
-                GOTO(out_dt, err);
+                GOTO(out_lock_cn_cb, err);
         }
         CDEBUG(D_SUPER, "rootfid "DFID"\n", PFID(&rootfid));
         sbi->ll_root_fid = rootfid;
@@ -2136,7 +2145,7 @@ llu_fsswop_mount(const char *source,
                          OBD_MD_FLGETATTR | OBD_MD_FLBLOCKS, 0, &request);
         if (err) {
                 CERROR("md_getattr failed for root: rc = %d\n", err);
-                GOTO(out_dt, err);
+                GOTO(out_lock_cn_cb, err);
         }
 
         err = md_get_lustre_md(sbi->ll_md_exp, request,
@@ -2180,6 +2189,9 @@ out_inode:
         _sysio_i_gone(root);
 out_request:
         ptlrpc_req_finished(request);
+out_lock_cn_cb:
+        obd_unregister_lock_cancel_cb(sbi->ll_dt_exp,
+                                      llu_extent_lock_cancel_cb);
 out_dt:
         obd_disconnect(sbi->ll_dt_exp);
 out_md:
index 7d1765a..5c384d0 100644 (file)
@@ -776,165 +776,92 @@ check:
         RETURN(stripe);
 }
 
-/* Flush the page cache for an extent as its canceled.  When we're on an LOV,
- * we get a lock cancellation for each stripe, so we have to map the obd's
- * region back onto the stripes in the file that it held.
+/* Get extra page reference to ensure it is not going away */
+void ll_pin_extent_cb(void *data)
+{
+        struct page *page = data;
+        
+        page_cache_get(page);
+
+        return;
+}
+
+/* Flush the page from the page cache for an extent as it is canceled.
+ * Page to remove is delivered as @data.
  *
- * No one can dirty the extent until we've finished our work and they can
+ * No one can dirty the extent until we've finished our work and they cannot
  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
  * but other kernel actors could have pages locked.
  *
+ * If @discard is set, there is no need to write the page if it is dirty.
+ *
  * Called with the DLM lock held. */
-void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
-                              struct ldlm_lock *lock, __u32 stripe)
+int ll_page_removal_cb(void *data, int discard)
 {
-        ldlm_policy_data_t tmpex;
-        unsigned long start, end, count, skip, i, j;
-        struct page *page;
-        int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
-        struct lustre_handle lockh;
-        struct address_space *mapping = inode->i_mapping;
-
+        int rc;
+        struct page *page = data;
+        struct address_space *mapping;
         ENTRY;
-        tmpex = lock->l_policy_data;
-        CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
-               inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
-               i_size_read(inode));
-
-        /* our locks are page granular thanks to osc_enqueue, we invalidate the
-         * whole page. */
-        if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
-            ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
-                LDLM_ERROR(lock, "lock not aligned on PAGE_SIZE %lu",
-                           CFS_PAGE_SIZE);
-        LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
-        LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);
-
-        count = ~0;
-        skip = 0;
-        start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
-        end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
-        if (lsm->lsm_stripe_count > 1) {
-                count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
-                skip = (lsm->lsm_stripe_count - 1) * count;
-                start += start/count * skip + stripe * count;
-                if (end != ~0)
-                        end += end/count * skip + stripe * count;
-        }
-        if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
-                end = ~0;
-
-        i = i_size_read(inode) ? (__u64)(i_size_read(inode) - 1) >>
-            CFS_PAGE_SHIFT : 0;
-        if (i < end)
-                end = i;
-
-        CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
-               "count: %lu skip: %lu end: %lu%s\n", start, start % count,
-               count, skip, end, discard ? " (DISCARDING)" : "");
-
-        /* walk through the vmas on the inode and tear down mmaped pages that
-         * intersect with the lock.  this stops immediately if there are no
-         * mmap()ed regions of the file.  This is not efficient at all and
-         * should be short lived. We'll associate mmap()ed pages with the lock
-         * and will be able to find them directly */
-        for (i = start; i <= end; i += (j + skip)) {
-                j = min(count - (i % count), end - i + 1);
-                LASSERT(j > 0);
-                LASSERT(mapping);
-                if (ll_teardown_mmaps(mapping,
-                                      (__u64)i << CFS_PAGE_SHIFT,
-                                      ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
-                        break;
-        }
-
-        /* this is the simplistic implementation of page eviction at
-         * cancelation.  It is careful to get races with other page
-         * lockers handled correctly.  fixes from bug 20 will make it
-         * more efficient by associating locks with pages and with
-         * batching writeback under the lock explicitly. */
-        for (i = start, j = start % count; i <= end;
-             j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
-                if (j == count) {
-                        CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
-                        i += skip;
-                        j = 0;
-                        if (i > end)
-                                break;
-                }
-                LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
-                         LPU64" >= "LPU64" start %lu i %lu end %lu\n",
-                         tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
-                         start, i, end);
 
-                if (!mapping_has_pages(mapping)) {
-                        CDEBUG(D_INODE|D_PAGE, "nothing left\n");
-                        break;
-                }
+        /* We already hold a page reference taken by ll_pin_extent_cb() */
+        lock_page(page);
 
-                cond_resched();
-
-                page = find_lock_page(mapping, i);
-                if (page == NULL)
-                        continue;
-                LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
-                               i, tmpex.l_extent.start);
-                if (!discard && PageWriteback(page))
-                        wait_on_page_writeback(page);
-
-                /* page->mapping to check with racing against teardown */
-                if (!discard && clear_page_dirty_for_io(page)) {
-                        rc = ll_call_writepage(inode, page);
-                        /* either waiting for io to complete or reacquiring
-                         * the lock that the failed writepage released */
-                        lock_page(page);
-                        wait_on_page_writeback(page);
-                        if (rc < 0) {
-                                CERROR("writepage inode %lu(%p) of page %p "
-                                       "failed: %d\n", inode->i_ino, inode,
-                                       page, rc);
-                                if (rc == -ENOSPC)
-                                        set_bit(AS_ENOSPC, &mapping->flags);
-                                else
-                                        set_bit(AS_EIO, &mapping->flags);
-                        }
-                }
-
-                tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
-                /* check to see if another DLM lock covers this page b=2765 */
-                rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
-                                      LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
-                                      LDLM_FL_TEST_LOCK,
-                                      &lock->l_resource->lr_name, LDLM_EXTENT,
-                                      &tmpex, LCK_PR | LCK_PW, &lockh);
-
-                if (rc2 <= 0 && page->mapping != NULL) {
-                        struct ll_async_page *llap = llap_cast_private(page);
-                        /* checking again to account for writeback's
-                         * lock_page() */
-                        LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
-                        if (llap)
-                                ll_ra_accounting(llap, mapping);
-                        ll_truncate_complete_page(page);
+        /* Already truncated by somebody */
+        if (!page->mapping)
+                GOTO(out, rc = 0);
+        mapping = page->mapping;
+
+        ll_teardown_mmaps(mapping,
+                          (__u64)page->index << PAGE_CACHE_SHIFT,
+                          ((__u64)page->index<<PAGE_CACHE_SHIFT)|
+                                                              ~PAGE_CACHE_MASK);        
+        LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
+
+        if (!discard && clear_page_dirty_for_io(page)) {
+                LASSERT(page->mapping);
+                rc = ll_call_writepage(page->mapping->host, page);
+                /* either waiting for io to complete or reacquiring
+                 * the lock that the failed writepage released */
+                lock_page(page);
+                wait_on_page_writeback(page);
+                if (rc != 0) {
+                        CERROR("writepage inode %lu(%p) of page %p "
+                               "failed: %d\n", mapping->host->i_ino,
+                               mapping->host, page, rc);
+                        if (rc == -ENOSPC)
+                                set_bit(AS_ENOSPC, &mapping->flags);
+                        else
+                                set_bit(AS_EIO, &mapping->flags);
                 }
-                unlock_page(page);
-                page_cache_release(page);
-        }
-        LASSERTF(tmpex.l_extent.start <=
-                 (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
-                  lock->l_policy_data.l_extent.end + 1),
-                 "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
-                 tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
-                 start, i, end);
+                set_bit(AS_EIO, &mapping->flags);
+        }
+        if (page->mapping != NULL) {
+                struct ll_async_page *llap = llap_cast_private(page);
+                /* checking again to account for writeback's lock_page() */
+                LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
+                if (llap)
+                        ll_ra_accounting(llap, page->mapping);
+                ll_truncate_complete_page(page);
+        }
         EXIT;
+out:
+        LASSERT(!PageWriteback(page));
+        unlock_page(page);
+        page_cache_release(page);
+
+        return 0;
 }
 
-static int ll_extent_lock_callback(struct ldlm_lock *lock,
-                                   struct ldlm_lock_desc *new, void *data,
-                                   int flag)
+int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
+                             void *data, int flag)
 {
-        struct lustre_handle lockh = { 0 };
-        int rc;
+        struct inode *inode;
+        struct ll_inode_info *lli;
+        struct lov_stripe_md *lsm;
+        int stripe;
+        __u64 kms;
+
         ENTRY;
 
         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
@@ -942,60 +869,37 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock,
                 LBUG();
         }
 
-        switch (flag) {
-        case LDLM_CB_BLOCKING:
-                ldlm_lock2handle(lock, &lockh);
-                rc = ldlm_cli_cancel(&lockh);
-                if (rc != ELDLM_OK)
-                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
-                break;
-        case LDLM_CB_CANCELING: {
-                struct inode *inode;
-                struct ll_inode_info *lli;
-                struct lov_stripe_md *lsm;
-                int stripe;
-                __u64 kms;
-
-                /* This lock wasn't granted, don't try to evict pages */
-                if (lock->l_req_mode != lock->l_granted_mode)
-                        RETURN(0);
-
-                inode = ll_inode_from_lock(lock);
-                if (inode == NULL)
-                        RETURN(0);
-                lli = ll_i2info(inode);
-                if (lli == NULL)
-                        goto iput;
-                if (lli->lli_smd == NULL)
-                        goto iput;
-                lsm = lli->lli_smd;
-
-                stripe = ll_lock_to_stripe_offset(inode, lock);
-                if (stripe < 0)
-                        goto iput;
-
-                ll_pgcache_remove_extent(inode, lsm, lock, stripe);
+        inode = ll_inode_from_lock(lock);
+        if (inode == NULL)
+                RETURN(0);
+        lli = ll_i2info(inode);
+        if (lli == NULL)
+                GOTO(iput, 0);
+        if (lli->lli_smd == NULL)
+                GOTO(iput, 0);
+        lsm = lli->lli_smd;
 
-                lov_stripe_lock(lsm);
-                lock_res_and_lock(lock);
-                kms = ldlm_extent_shift_kms(lock,
-                                            lsm->lsm_oinfo[stripe]->loi_kms);
+        stripe = ll_lock_to_stripe_offset(inode, lock);
+        if (stripe < 0)
+                GOTO(iput, 0);
 
-                if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
-                        LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
-                                   lsm->lsm_oinfo[stripe]->loi_kms, kms);
-                lsm->lsm_oinfo[stripe]->loi_kms = kms;
-                unlock_res_and_lock(lock);
-                lov_stripe_unlock(lsm);
-        iput:
-                iput(inode);
-                break;
-        }
-        default:
-                LBUG();
-        }
+        lov_stripe_lock(lsm);
+        lock_res_and_lock(lock);
+        kms = ldlm_extent_shift_kms(lock,
+                                    lsm->lsm_oinfo[stripe]->loi_kms);
+
+        if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
+                LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
+                           lsm->lsm_oinfo[stripe]->loi_kms, kms);
+        lsm->lsm_oinfo[stripe]->loi_kms = kms;
+        unlock_res_and_lock(lock);
+        lov_stripe_unlock(lsm);
+        ll_queue_done_writing(inode, 0);
+        EXIT;
+iput:
+        iput(inode);
 
-        RETURN(0);
+        return 0;
 }
 
 #if 0
@@ -1170,7 +1074,7 @@ int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
 
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = LCK_PR;
-        einfo.ei_cb_bl = ll_extent_lock_callback;
+        einfo.ei_cb_bl = osc_extent_blocking_cb;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = ll_glimpse_callback;
         einfo.ei_cbdata = NULL;
@@ -1233,7 +1137,7 @@ int ll_glimpse_size(struct inode *inode, int ast_flags)
          *       acquired only if there were no conflicting locks. */
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = LCK_PR;
-        einfo.ei_cb_bl = ll_extent_lock_callback;
+        einfo.ei_cb_bl = osc_extent_blocking_cb;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = ll_glimpse_callback;
         einfo.ei_cbdata = inode;
@@ -1288,7 +1192,7 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
 
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = mode;
-        einfo.ei_cb_bl = ll_extent_lock_callback;
+        einfo.ei_cb_bl = osc_extent_blocking_cb;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = ll_glimpse_callback;
         einfo.ei_cbdata = inode;
index 3b73342..e4871e8 100644 (file)
@@ -499,6 +499,7 @@ struct ll_async_page {
                          llap_origin:3,
                          llap_ra_used:1,
                          llap_ignore_quota:1,
+                         llap_nocache:1,
                          llap_lockless_io_page:1;
         void            *llap_cookie;
         struct page     *llap_page;
@@ -655,6 +656,10 @@ int ll_dir_setstripe(struct inode *inode, struct lov_user_md *lump,
                      int set_default);
 int ll_dir_getstripe(struct inode *inode, struct lov_mds_md **lmm, 
                      int *lmm_size, struct ptlrpc_request **request);
+void ll_pin_extent_cb(void *data);
+int ll_page_removal_cb(void *data, int discard);
+int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
+                             void *data, int flag);
 
 /* llite/dcache.c */
 extern struct dentry_operations ll_init_d_ops;
index 0c32ba0..04eee3e 100644 (file)
@@ -36,6 +36,7 @@
 #include <lustre_param.h>
 #include <lustre_log.h>
 #include <obd_cksum.h>
+#include <lustre_cache.h>
 #include "llite_internal.h"
 
 cfs_mem_cache_t *ll_file_data_slab;
@@ -424,15 +425,33 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt)
         sbi->ll_lco.lco_flags = data->ocd_connect_flags;
         spin_unlock(&sbi->ll_lco.lco_lock);
 
-        ll_init_ea_size(sbi->ll_md_exp, sbi->ll_dt_exp);
+        err = obd_register_page_removal_cb(sbi->ll_dt_exp,
+                                           ll_page_removal_cb, 
+                                           ll_pin_extent_cb);
+        if (err) {
+                CERROR("cannot register page removal callback: rc = %d\n",err);
+                GOTO(out_dt, err);
+        }
+        err = obd_register_lock_cancel_cb(sbi->ll_dt_exp,
+                                          ll_extent_lock_cancel_cb);
+        if (err) {
+                CERROR("cannot register lock cancel callback: rc = %d\n", err);
+                GOTO(out_page_rm_cb, err);
+        }
+
+        err = ll_init_ea_size(sbi->ll_md_exp, sbi->ll_dt_exp);;
+        if (err) {
+                CERROR("cannot set max EA and cookie sizes: rc = %d\n", err);
+                GOTO(out_lock_cn_cb, err);
+        }
 
         err = obd_prep_async_page(sbi->ll_dt_exp, NULL, NULL, NULL,
-                                  0, NULL, NULL, NULL);
+                                  0, NULL, NULL, NULL, 0, NULL);
         if (err < 0) {
                 LCONSOLE_ERROR_MSG(0x151, "There are no OST's in this "
                                    "filesystem. There must be at least one "
                                    "active OST for a client to start.\n");
-                GOTO(out_dt_fid, err);
+                GOTO(out_lock_cn_cb, err);
         }
 
         if (!ll_async_page_slab) {
@@ -442,13 +461,13 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt)
                                                           ll_async_page_slab_size,
                                                           0, 0);
                 if (!ll_async_page_slab)
-                        GOTO(out_dt_fid, err = -ENOMEM);
+                        GOTO(out_lock_cn_cb, err = -ENOMEM);
         }
 
         err = md_getstatus(sbi->ll_md_exp, &rootfid, &oc);
         if (err) {
                 CERROR("cannot mds_connect: rc = %d\n", err);
-                GOTO(out_dt_fid, err);
+                GOTO(out_lock_cn_cb, err);
         }
         CDEBUG(D_SUPER, "rootfid "DFID"\n", PFID(&rootfid));
         sbi->ll_root_fid = rootfid;
@@ -469,7 +488,7 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt)
                 free_capa(oc);
         if (err) {
                 CERROR("md_getattr failed for root: rc = %d\n", err);
-                GOTO(out_dt_fid, err);
+                GOTO(out_lock_cn_cb, err);
         }
         memset(&lmd, 0, sizeof(lmd));
         err = md_get_lustre_md(sbi->ll_md_exp, request, sbi->ll_dt_exp,
@@ -477,7 +496,7 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt)
         if (err) {
                 CERROR("failed to understand root inode md: rc = %d\n", err);
                 ptlrpc_req_finished (request);
-                GOTO(out_dt_fid, err);
+                GOTO(out_lock_cn_cb, err);
         }
 
         LASSERT(fid_is_sane(&sbi->ll_root_fid));
@@ -523,7 +542,12 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt)
 out_root:
         if (root)
                 iput(root);
-out_dt_fid:
+out_lock_cn_cb:
+        obd_unregister_lock_cancel_cb(sbi->ll_dt_exp,
+                                      ll_extent_lock_cancel_cb);
+out_page_rm_cb:
+        obd_unregister_page_removal_cb(sbi->ll_dt_exp,
+                                       ll_page_removal_cb);
         obd_fid_fini(sbi->ll_dt_exp);
 out_dt:
         obd_disconnect(sbi->ll_dt_exp);
@@ -715,6 +739,10 @@ void client_common_put_super(struct super_block *sb)
 
         list_del(&sbi->ll_conn_chain);
 
+        obd_unregister_page_removal_cb(sbi->ll_dt_exp,
+                                       ll_page_removal_cb);
+        obd_unregister_lock_cancel_cb(sbi->ll_dt_exp,ll_extent_lock_cancel_cb);
+
         obd_fid_fini(sbi->ll_dt_exp);
         obd_disconnect(sbi->ll_dt_exp);
         sbi->ll_dt_exp = NULL;
index 94ed749..3ac6fc3 100644 (file)
@@ -593,7 +593,9 @@ int llap_shrink_cache(struct ll_sb_info *sbi, int shrink_fraction)
         return count;
 }
 
-struct ll_async_page *llap_from_page(struct page *page, unsigned origin)
+static struct ll_async_page *llap_from_page_with_lockh(struct page *page,
+                                                       unsigned origin,
+                                                       struct lustre_handle *lockh)
 {
         struct ll_async_page *llap;
         struct obd_export *exp;
@@ -646,9 +648,14 @@ struct ll_async_page *llap_from_page(struct page *page, unsigned origin)
         llap->llap_magic = LLAP_MAGIC;
         llap->llap_cookie = (void *)llap + size_round(sizeof(*llap));
 
+        /* XXX: for bug 11270 - check for lockless origin here! */
+        if (origin == LLAP_ORIGIN_LOCKLESS_IO)
+                llap->llap_nocache = 1;
+
         rc = obd_prep_async_page(exp, ll_i2info(inode)->lli_smd, NULL, page,
                                  (obd_off)page->index << CFS_PAGE_SHIFT,
-                                 &ll_async_page_ops, llap, &llap->llap_cookie);
+                                 &ll_async_page_ops, llap, &llap->llap_cookie,
+                                 llap->llap_nocache, lockh);
         if (rc) {
                 OBD_SLAB_FREE(llap, ll_async_page_slab,
                               ll_async_page_slab_size);
@@ -698,6 +705,12 @@ struct ll_async_page *llap_from_page(struct page *page, unsigned origin)
         RETURN(llap);
 }
 
+/*
+ * Look up (or create) the llap for @page on behalf of @origin when the
+ * caller has no lock handle to supply: the lock handle argument is NULL.
+ */
+struct ll_async_page *llap_from_page(struct page *page,
+                                     unsigned origin)
+{
+        struct ll_async_page *llap;
+
+        llap = llap_from_page_with_lockh(page, origin, NULL);
+        return llap;
+}
+
 static int queue_or_sync_write(struct obd_export *exp, struct inode *inode,
                                struct ll_async_page *llap,
                                unsigned to, obd_flag async_flags)
@@ -799,12 +812,14 @@ out:
 int ll_commit_write(struct file *file, struct page *page, unsigned from,
                     unsigned to)
 {
+        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
         struct inode *inode = page->mapping->host;
         struct ll_inode_info *lli = ll_i2info(inode);
         struct lov_stripe_md *lsm = lli->lli_smd;
         struct obd_export *exp;
         struct ll_async_page *llap;
         loff_t size;
+        struct lustre_handle *lockh = NULL;
         int rc = 0;
         ENTRY;
 
@@ -815,7 +830,10 @@ int ll_commit_write(struct file *file, struct page *page, unsigned from,
         CDEBUG(D_INODE, "inode %p is writing page %p from %d to %d at %lu\n",
                inode, page, from, to, page->index);
 
-        llap = llap_from_page(page, LLAP_ORIGIN_COMMIT_WRITE);
+        if (fd->fd_flags & LL_FILE_GROUP_LOCKED)
+                lockh = &fd->fd_cwlockh;
+
+        llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_COMMIT_WRITE, lockh);
         if (IS_ERR(llap))
                 RETURN(PTR_ERR(llap));
 
@@ -1012,6 +1030,7 @@ static void __ll_put_llap(struct page *page)
  * here. */
 void ll_removepage(struct page *page)
 {
+        struct ll_async_page *llap = llap_cast_private(page);
         ENTRY;
 
         LASSERT(!in_interrupt());
@@ -1023,36 +1042,13 @@ void ll_removepage(struct page *page)
                 return;
         }
 
-        LASSERT(!llap_cast_private(page)->llap_lockless_io_page);
+        LASSERT(!llap->llap_lockless_io_page);
+        LASSERT(!llap->llap_nocache);
         LL_CDEBUG_PAGE(D_PAGE, page, "being evicted\n");
         __ll_put_llap(page);
         EXIT;
 }
 
-static int ll_page_matches(struct page *page, int fd_flags)
-{
-        struct lustre_handle match_lockh = {0};
-        struct inode *inode = page->mapping->host;
-        ldlm_policy_data_t page_extent;
-        int flags, matches;
-        ENTRY;
-
-        if (unlikely(fd_flags & LL_FILE_GROUP_LOCKED))
-                RETURN(1);
-
-        page_extent.l_extent.start = (__u64)page->index << CFS_PAGE_SHIFT;
-        page_extent.l_extent.end =
-                page_extent.l_extent.start + CFS_PAGE_SIZE - 1;
-        flags = LDLM_FL_TEST_LOCK | LDLM_FL_BLOCK_GRANTED;
-        if (!(fd_flags & LL_FILE_READAHEAD))
-                flags |= LDLM_FL_CBPENDING;
-        matches = obd_match(ll_i2sbi(inode)->ll_dt_exp,
-                            ll_i2info(inode)->lli_smd, LDLM_EXTENT,
-                            &page_extent, LCK_PR | LCK_PW, &flags, inode,
-                            &match_lockh);
-        RETURN(matches);
-}
-
 static int ll_issue_page_read(struct obd_export *exp,
                               struct ll_async_page *llap,
                               struct obd_io_group *oig, int defer)
@@ -1769,6 +1765,7 @@ int ll_writepage(struct page *page)
         if (IS_ERR(llap))
                 GOTO(out, rc = PTR_ERR(llap));
 
+        LASSERT(!llap->llap_nocache);
         LASSERT(!PageWriteback(page));
         set_page_writeback(page);
 
@@ -1816,6 +1813,7 @@ int ll_readpage(struct file *filp, struct page *page)
         struct obd_export *exp;
         struct ll_async_page *llap;
         struct obd_io_group *oig = NULL;
+        struct lustre_handle *lockh = NULL;
         int rc;
         ENTRY;
 
@@ -1847,9 +1845,19 @@ int ll_readpage(struct file *filp, struct page *page)
         if (exp == NULL)
                 GOTO(out, rc = -EINVAL);
 
-        llap = llap_from_page(page, LLAP_ORIGIN_READPAGE);
-        if (IS_ERR(llap))
+        if (fd->fd_flags & LL_FILE_GROUP_LOCKED)
+                lockh = &fd->fd_cwlockh;
+
+        llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_READPAGE, lockh);
+        if (IS_ERR(llap)) {
+                if (PTR_ERR(llap) == -ENOLCK) {
+                        CWARN("ino %lu page %lu (%llu) not covered by "
+                              "a lock (mmap?).  check debug logs.\n",
+                              inode->i_ino, page->index,
+                              (long long)page->index << PAGE_CACHE_SHIFT);
+                }
                 GOTO(out, rc = PTR_ERR(llap));
+        }
 
         if (ll_i2sbi(inode)->ll_ra_info.ra_max_pages)
                 ras_update(ll_i2sbi(inode), inode, &fd->fd_ras, page->index,
@@ -1870,22 +1878,6 @@ int ll_readpage(struct file *filp, struct page *page)
                 GOTO(out_oig, rc = 0);
         }
 
-        if (likely((fd->fd_flags & LL_FILE_IGNORE_LOCK) == 0)) {
-                rc = ll_page_matches(page, fd->fd_flags);
-                if (rc < 0) {
-                        LL_CDEBUG_PAGE(D_ERROR, page,
-                                       "lock match failed: rc %d\n", rc);
-                        GOTO(out, rc);
-                }
-
-                if (rc == 0) {
-                        CWARN("ino %lu page %lu (%llu) not covered by "
-                              "a lock (mmap?).  check debug logs.\n",
-                              inode->i_ino, page->index,
-                              (long long)page->index << CFS_PAGE_SHIFT);
-                }
-        }
-
         rc = ll_issue_page_read(exp, llap, oig, 0);
         if (rc)
                 GOTO(out, rc);
index 6e6844e..161800e 100644 (file)
@@ -48,6 +48,7 @@
 #include <obd_ost.h>
 #include <lprocfs_status.h>
 #include <lustre_param.h>
+#include <lustre_cache.h>
 
 #include "lov_internal.h"
 
@@ -87,6 +88,94 @@ void lov_putref(struct obd_device *obd)
         mutex_up(&lov->lov_lock);
 }
 
+/*
+ * Register the page-removal callback @func and the extent-pin callback
+ * @pin_cb on every connected target of this LOV, then record both in the
+ * lov_obd (lov_connect_obd reads the recorded values when it registers a
+ * target).
+ *
+ * Returns -EBUSY if a different removal or pin callback is already recorded.
+ * Otherwise returns 0, or the first error reported by a target.  The
+ * callbacks are recorded even when a target reports an error.
+ */
+static int lov_register_page_removal_cb(struct obd_export *exp,
+                                        obd_page_removal_cb_t func,
+                                        obd_pin_extent_cb pin_cb)
+{
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        int i, rc = 0;
+
+        if (lov->lov_page_removal_cb && lov->lov_page_removal_cb != func)
+                return -EBUSY;
+
+        if (lov->lov_page_pin_cb && lov->lov_page_pin_cb != pin_cb)
+                return -EBUSY;
+
+        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+                int err;
+
+                /* Skip unused slots and targets that are not connected. */
+                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
+                        continue;
+                err = obd_register_page_removal_cb(lov->lov_tgts[i]->ltd_exp,
+                                                   func, pin_cb);
+                /* Keep the first failure: OR-ing negative errnos from
+                 * several targets would produce an unrelated error code. */
+                if (err && !rc)
+                        rc = err;
+        }
+
+        lov->lov_page_removal_cb = func;
+        lov->lov_page_pin_cb = pin_cb;
+
+        return rc;
+}
+
+/*
+ * Forget the recorded page-removal and pin callbacks and unregister @func
+ * from every connected target of this LOV.
+ *
+ * Returns -EINVAL if a different removal callback is currently recorded.
+ * Otherwise returns 0, or the first error reported by a target; all targets
+ * are still visited after a failure.
+ */
+static int lov_unregister_page_removal_cb(struct obd_export *exp,
+                                          obd_page_removal_cb_t func)
+{
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        int i, rc = 0;
+
+        if (lov->lov_page_removal_cb && lov->lov_page_removal_cb != func)
+                return -EINVAL;
+
+        lov->lov_page_removal_cb = NULL;
+        lov->lov_page_pin_cb = NULL;
+
+        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+                int err;
+
+                /* Skip unused slots and targets that are not connected. */
+                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
+                        continue;
+                err = obd_unregister_page_removal_cb(lov->lov_tgts[i]->ltd_exp,
+                                                     func);
+                /* Keep the first failure: OR-ing negative errnos from
+                 * several targets would produce an unrelated error code. */
+                if (err && !rc)
+                        rc = err;
+        }
+
+        return rc;
+}
+
+/*
+ * Register the lock-cancel callback @func on every connected target of this
+ * LOV, then record it in the lov_obd (lov_connect_obd reads the recorded
+ * value when it registers a target).
+ *
+ * Returns -EBUSY if a different cancel callback is already recorded.
+ * Otherwise returns 0, or the first error reported by a target.  The
+ * callback is recorded even when a target reports an error.
+ */
+static int lov_register_lock_cancel_cb(struct obd_export *exp,
+                                       obd_lock_cancel_cb func)
+{
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        int i, rc = 0;
+
+        if (lov->lov_lock_cancel_cb && lov->lov_lock_cancel_cb != func)
+                return -EBUSY;
+
+        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+                int err;
+
+                /* Skip unused slots and targets that are not connected. */
+                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
+                        continue;
+                err = obd_register_lock_cancel_cb(lov->lov_tgts[i]->ltd_exp,
+                                                  func);
+                /* Keep the first failure: OR-ing negative errnos from
+                 * several targets would produce an unrelated error code. */
+                if (err && !rc)
+                        rc = err;
+        }
+
+        lov->lov_lock_cancel_cb = func;
+
+        return rc;
+}
+
+/*
+ * Unregister the lock-cancel callback @func from every connected target of
+ * this LOV and clear the callback recorded in the lov_obd.
+ *
+ * Returns -EINVAL if a different cancel callback is currently recorded.
+ * Otherwise returns 0, or the first error reported by a target; all targets
+ * are still visited after a failure.
+ */
+static int lov_unregister_lock_cancel_cb(struct obd_export *exp,
+                                         obd_lock_cancel_cb func)
+{
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        int i, rc = 0;
+
+        if (lov->lov_lock_cancel_cb && lov->lov_lock_cancel_cb != func)
+                return -EINVAL;
+
+        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+                int err;
+
+                /* Skip unused slots and targets that are not connected. */
+                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
+                        continue;
+                err = obd_unregister_lock_cancel_cb(lov->lov_tgts[i]->ltd_exp,
+                                                    func);
+                /* Keep the first failure: OR-ing negative errnos from
+                 * several targets would produce an unrelated error code. */
+                if (err && !rc)
+                        rc = err;
+        }
+        lov->lov_lock_cancel_cb = NULL;
+        return rc;
+}
+
 #define MAX_STRING_SIZE 128
 static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
                            struct obd_connect_data *data)
@@ -160,10 +249,33 @@ static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
                 RETURN(-ENODEV);
         }
 
+        rc = obd_register_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
+                                          lov->lov_page_removal_cb,
+                                          lov->lov_page_pin_cb);
+        if (rc) {
+                obd_disconnect(lov->lov_tgts[index]->ltd_exp);
+                lov->lov_tgts[index]->ltd_exp = NULL;
+                RETURN(rc);
+        }
+
+        rc = obd_register_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
+                                         lov->lov_lock_cancel_cb);
+        if (rc) {
+                obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
+                                               lov->lov_page_removal_cb);
+                obd_disconnect(lov->lov_tgts[index]->ltd_exp);
+                lov->lov_tgts[index]->ltd_exp = NULL;
+                RETURN(rc);
+        }
+
         rc = obd_register_observer(tgt_obd, obd);
         if (rc) {
                 CERROR("Target %s register_observer error %d\n",
                        obd_uuid2str(&tgt_uuid), rc);
+                obd_unregister_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
+                                              lov->lov_lock_cancel_cb);
+                obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
+                                               lov->lov_page_removal_cb);
                 obd_disconnect(lov->lov_tgts[index]->ltd_exp);
                 lov->lov_tgts[index]->ltd_exp = NULL;
                 RETURN(rc);
@@ -268,6 +380,10 @@ static int lov_disconnect_obd(struct obd_device *obd, __u32 index)
         CDEBUG(D_CONFIG, "%s: disconnecting target %s\n",
                obd->obd_name, osc_obd->obd_name);
 
+        obd_unregister_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
+                                      lov->lov_lock_cancel_cb);
+        obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
+                                       lov->lov_page_removal_cb);
 
         if (lov->lov_tgts[index]->ltd_active) {
                 lov->lov_tgts[index]->ltd_active = 0;
@@ -1684,10 +1800,12 @@ static struct obd_async_page_ops lov_async_page_ops = {
 int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
                            struct lov_oinfo *loi, cfs_page_t *page,
                            obd_off offset, struct obd_async_page_ops *ops,
-                           void *data, void **res)
+                           void *data, void **res, int nocache,
+                           struct lustre_handle *lockh)
 {
         struct lov_obd *lov = &exp->exp_obd->u.lov;
         struct lov_async_page *lap;
+        struct lov_lock_handles *lov_lockh = NULL;
         int rc = 0;
         ENTRY;
 
@@ -1704,7 +1822,8 @@ int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
                 }
                 rc = size_round(sizeof(*lap)) +
                         obd_prep_async_page(lov->lov_tgts[i]->ltd_exp, NULL,
-                                            NULL, NULL, 0, NULL, NULL, NULL);
+                                            NULL, NULL, 0, NULL, NULL, NULL, 0,
+                                            NULL);
                 RETURN(rc);
         }
         ASSERT_LSM_MAGIC(lsm);
@@ -1727,10 +1846,19 @@ int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
         
         lap->lap_sub_cookie = (void *)lap + size_round(sizeof(*lap));
 
+        if (lockh) {
+                lov_lockh = lov_handle2llh(lockh);
+                if (lov_lockh) {
+                        lockh = lov_lockh->llh_handles + lap->lap_stripe;
+                }
+        }
+
         rc = obd_prep_async_page(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
                                  lsm, loi, page, lap->lap_sub_offset,
                                  &lov_async_page_ops, lap,
-                                 &lap->lap_sub_cookie);
+                                 &lap->lap_sub_cookie, nocache, lockh);
+        if (lov_lockh)
+                lov_llh_put(lov_lockh);
         if (rc)
                 RETURN(rc);
         CDEBUG(D_CACHE, "lap %p page %p cookie %p off "LPU64"\n", lap, page,
@@ -2752,6 +2880,10 @@ struct obd_ops lov_obd_ops = {
         .o_llog_init           = lov_llog_init,
         .o_llog_finish         = lov_llog_finish,
         .o_notify              = lov_notify,
+        .o_register_page_removal_cb = lov_register_page_removal_cb,
+        .o_unregister_page_removal_cb = lov_unregister_page_removal_cb,
+        .o_register_lock_cancel_cb = lov_register_lock_cancel_cb,
+        .o_unregister_lock_cancel_cb = lov_unregister_lock_cancel_cb,
 };
 
 static quota_interface_t *quota_interface;
index f57b55c..01e0654 100644 (file)
@@ -1102,6 +1102,10 @@ void lprocfs_init_ops_stats(int num_private_stats, struct lprocfs_stats *stats)
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, quotacheck);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, quotactl);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, ping);
+        LPROCFS_OBD_OP_INIT(num_private_stats, stats, register_page_removal_cb);
+        LPROCFS_OBD_OP_INIT(num_private_stats,stats,unregister_page_removal_cb);
+        LPROCFS_OBD_OP_INIT(num_private_stats, stats, register_lock_cancel_cb);
+        LPROCFS_OBD_OP_INIT(num_private_stats, stats,unregister_lock_cancel_cb);
 }
 
 int lprocfs_alloc_obd_stats(struct obd_device *obd, unsigned num_private_stats)
index f0c6fb4..e24fe8a 100644 (file)
@@ -785,7 +785,7 @@ static int echo_client_async_page(struct obd_export *exp, int rw,
 
                 rc = obd_prep_async_page(exp, lsm, NULL, eap->eap_page,
                                          eap->eap_off, &ec_async_page_ops,
-                                         eap, &eap->eap_cookie);
+                                         eap, &eap->eap_cookie, 1, NULL);
                 if (rc) {
                         spin_lock(&eas.eas_lock);
                         eas.eas_rc = rc;
index ce9107f..2eb2eea 100644 (file)
@@ -1,4 +1,4 @@
 MODULES := osc
-osc-objs := osc_request.o lproc_osc.o osc_create.o
+osc-objs := osc_request.o lproc_osc.o osc_create.o cache.o
 
 @INCLUDE_RULES@
index 2b00785..985e473 100644 (file)
@@ -5,7 +5,7 @@
 
 if LIBLUSTRE
 noinst_LIBRARIES = libosc.a
-libosc_a_SOURCES = osc_request.c osc_create.c osc_internal.h
+libosc_a_SOURCES = osc_request.c osc_create.c osc_internal.h cache.c
 libosc_a_CPPFLAGS = $(LLCPPFLAGS)
 libosc_a_CFLAGS = $(LLCFLAGS)
 endif
index 89f341c..9b6c19b 100644 (file)
@@ -30,6 +30,9 @@ struct osc_async_page {
 
        struct obd_async_page_ops *oap_caller_ops;
         void                    *oap_caller_data;
+        struct list_head         oap_page_list;
+        struct ldlm_lock        *oap_ldlm_lock;
+        spinlock_t               oap_lock;
 };
 
 #define oap_page        oap_brw_page.pg
index 3a8b7ab..679e78f 100644 (file)
 #include <lustre_log.h>
 #include <lustre_debug.h>
 #include <lustre_param.h>
+#include <lustre_cache.h>
 #include "osc_internal.h"
 
 static quota_interface_t *quota_interface = NULL;
 extern quota_interface_t osc_quota_interface;
 
 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
+int osc_cleanup(struct obd_device *obd);
 
 /* Pack OSC object metadata for disk storage (LE byte order). */
 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
@@ -2560,9 +2562,12 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
                         struct lov_oinfo *loi, cfs_page_t *page,
                         obd_off offset, struct obd_async_page_ops *ops,
-                        void *data, void **res)
+                        void *data, void **res, int nocache,
+                        struct lustre_handle *lockh)
 {
         struct osc_async_page *oap;
+        struct ldlm_res_id oid = {{0}};
+        int rc = 0;
         ENTRY;
 
         if (!page)
@@ -2582,9 +2587,25 @@ int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
+        CFS_INIT_LIST_HEAD(&oap->oap_page_list);
 
         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
 
+        spin_lock_init(&oap->oap_lock);
+
+        /* If the page was marked as not cacheable, don't add it to any lock */
+        if (!nocache) {
+                oid.name[0] = loi->loi_id;
+                oid.name[2] = loi->loi_gr;
+                /* This is the only place where we can call cache_add_extent
+                   without oap_lock, because this page is locked now, and
+                   the lock we are adding it to is referenced, so cannot lose
+                   any pages either. */
+                rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
+                if (rc)
+                        RETURN(rc);
+        }
+
         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
         RETURN(0);
 }
@@ -2869,6 +2890,7 @@ static int osc_teardown_async_page(struct obd_export *exp,
                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
         }
         loi_list_maint(cli, loi);
+        cache_remove_extent(cli->cl_cache, oap);
 
         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
 out:
@@ -2876,6 +2898,49 @@ out:
         RETURN(rc);
 }
 
+/*
+ * Blocking/cancel callback for OSC extent locks.
+ *
+ * LDLM_CB_BLOCKING:  cancel the lock through ldlm_cli_cancel(); a failure is
+ *                    only logged.
+ * LDLM_CB_CANCELING: if the lock was granted, drop it from the client page
+ *                    cache and then invoke the externally registered cancel
+ *                    callback, if one is set.
+ * Any other @flag value is a bug.
+ *
+ * Always returns 0.
+ */
+int osc_extent_blocking_cb(struct ldlm_lock *lock,
+                           struct ldlm_lock_desc *new, void *data,
+                           int flag)
+{
+        struct lustre_handle lockh = { 0 };
+        unsigned long cookie = (unsigned long)data;
+        int rc;
+        ENTRY;
+
+        /* Non-NULL data below 0x1000 is treated as corrupt callback data. */
+        if (cookie > 0 && cookie < 0x1000) {
+                LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
+                LBUG();
+        }
+
+        if (flag == LDLM_CB_BLOCKING) {
+                ldlm_lock2handle(lock, &lockh);
+                rc = ldlm_cli_cancel(&lockh);
+                if (rc != ELDLM_OK)
+                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
+        } else if (flag == LDLM_CB_CANCELING) {
+                struct client_obd *cli;
+
+                ldlm_lock2handle(lock, &lockh);
+                /* This lock wasn't granted, don't try to do anything */
+                if (lock->l_req_mode != lock->l_granted_mode)
+                        RETURN(0);
+
+                cli = &lock->l_conn_export->exp_obd->u.cli;
+                cache_remove_lock(cli->cl_cache, &lockh);
+
+                if (cli->cl_ext_lock_cancel_cb)
+                        cli->cl_ext_lock_cancel_cb(lock, new, data, flag);
+        } else {
+                LBUG();
+        }
+
+        RETURN(0);
+}
+EXPORT_SYMBOL(osc_extent_blocking_cb);
+
 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                     int flags)
 {
@@ -2920,8 +2985,8 @@ static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
         return 0;
 }
 
-static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
-                            int intent, int rc)
+static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
+                            struct obd_info *oinfo, int intent, int rc)
 {
         ENTRY;
 
@@ -2945,6 +3010,9 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
         }
 
+        if (!rc)
+                cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
+
         /* Call the update callback. */
         rc = oinfo->oi_cb_up(oinfo, rc);
         RETURN(rc);
@@ -2971,7 +3039,7 @@ static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                    aa->oa_oi->oi_lockh, rc);
 
         /* Complete osc stuff. */
-        rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
+        rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
 
         /* Release the lock for async request. */
         if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
@@ -3101,7 +3169,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                 RETURN(rc);
         }
 
-        rc = osc_enqueue_fini(req, oinfo, intent, rc);
+        rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
         if (intent)
                 ptlrpc_req_finished(req);
 
@@ -3836,6 +3904,11 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                             OST_MAXREQSIZE,
                                             ptlrpc_add_rqs_to_pool);
+                cli->cl_cache = cache_create(obd);
+                if (!cli->cl_cache) {
+                        osc_cleanup(obd);
+                        rc = -ENOMEM;
+                }
         }
 
         RETURN(rc);
@@ -3901,12 +3974,50 @@ int osc_cleanup(struct obd_device *obd)
         /* free memory of osc quota cache */
         lquota_cleanup(quota_interface, obd);
 
+        cache_destroy(obd->u.cli.cl_cache);
         rc = client_obd_cleanup(obd);
 
         ptlrpcd_decref();
         RETURN(rc);
 }
 
+/*
+ * Hand the page-removal callback @func and the pin callback @pin_cb to this
+ * OSC's cache; the result of cache_add_extent_removal_cb() is returned.
+ */
+static int osc_register_page_removal_cb(struct obd_export *exp,
+                                        obd_page_removal_cb_t func,
+                                        obd_pin_extent_cb pin_cb)
+{
+        struct client_obd *cli = &exp->exp_obd->u.cli;
+
+        return cache_add_extent_removal_cb(cli->cl_cache, func, pin_cb);
+}
+
+/*
+ * Remove the page-removal callback @func from this OSC's cache; the result
+ * of cache_del_extent_removal_cb() is returned.
+ */
+static int osc_unregister_page_removal_cb(struct obd_export *exp,
+                                          obd_page_removal_cb_t func)
+{
+        struct client_obd *cli = &exp->exp_obd->u.cli;
+
+        return cache_del_extent_removal_cb(cli->cl_cache, func);
+}
+
+/*
+ * Install @cb as the external lock-cancel callback of this OSC.
+ * Asserts that no callback is currently installed.  Always returns 0.
+ */
+static int osc_register_lock_cancel_cb(struct obd_export *exp,
+                                       obd_lock_cancel_cb cb)
+{
+        struct client_obd *cli = &exp->exp_obd->u.cli;
+
+        LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL);
+        cli->cl_ext_lock_cancel_cb = cb;
+
+        return 0;
+}
+
+/*
+ * Remove the external lock-cancel callback of this OSC.
+ *
+ * Returns -EINVAL (and logs an error) if @cb is not the callback currently
+ * installed; otherwise clears the callback and returns 0.
+ */
+static int osc_unregister_lock_cancel_cb(struct obd_export *exp,
+                                         obd_lock_cancel_cb cb)
+{
+        struct client_obd *cli = &exp->exp_obd->u.cli;
+
+        if (cli->cl_ext_lock_cancel_cb != cb) {
+                CERROR("Unregistering cancel cb %p, while only %p was "
+                       "registered\n", cb, cli->cl_ext_lock_cancel_cb);
+                /* Plain return: this function has no ENTRY, so RETURN()
+                 * would log a "leaving" trace with no matching entry. */
+                return -EINVAL;
+        }
+
+        cli->cl_ext_lock_cancel_cb = NULL;
+        return 0;
+}
+
 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
 {
         struct lustre_cfg *lcfg = buf;
@@ -3972,6 +4083,10 @@ struct obd_ops osc_obd_ops = {
         .o_llog_init            = osc_llog_init,
         .o_llog_finish          = osc_llog_finish,
         .o_process_config       = osc_process_config,
+        .o_register_page_removal_cb = osc_register_page_removal_cb,
+        .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
+        .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
+        .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
 };
 int __init osc_init(void)
 {
index 91b1867..b7df21d 100644 (file)
@@ -2706,7 +2706,9 @@ test_62() {
         cat $f && error "cat succeeded, expect -EIO"
         lctl set_param fail_loc=0
 }
-run_test 62 "verify obd_match failure doesn't LBUG (should -EIO)"
+# This test is now irrelevant (as of bug 10718 inclusion): we no longer
+# match every page all of the time.
+#run_test 62 "verify obd_match failure doesn't LBUG (should -EIO)"
 
 # bug 2319 - oig_wait() interrupted causes crash because of invalid waitq.
 test_63a() {   # was test_63
@@ -3475,6 +3477,20 @@ test_79() { # bug 12743
 }
 run_test 79 "df report consistency check ======================="
 
+test_80() { # bug 10718
+        # Write 1MB at a 1TB offset (bs=1M, seek=1M blocks), flush it, and
+        # then time how long cancelling the OSC LRU locks takes.
+        dd if=/dev/zero of=$DIR/$tfile bs=1M count=1 seek=1M
+        sync; sleep 1; sync
+        BEFORE=$(date +%s)
+        cancel_lru_locks OSC
+        AFTER=$(date +%s)
+        DIFF=$((AFTER - BEFORE))
+        # More than one second elapsed is reported as a failure.
+        [ $DIFF -le 1 ] || error "elapsed for 1M@1T = $DIFF"
+        true
+}
+run_test 80 "Page eviction is equally fast at high offsets too  ===="
+
 # on the LLNL clusters, runas will still pick up root's $TMP settings,
 # which will not be writable for the runas user, and then you get a CVS
 # error message with a corrupt path string (CVS bug) and panic.