because LASSERT on both the data supplied by a client, and the data
on disk is dangerous and incorrect.
+Severity : enhancement
+Bugzilla : 10718
+Description: Slow truncate/writes to huge files at high offsets.
+Details : Directly associate cached pages to locks that protect those pages,
+ this allows us to quickly find what pages to write and remove
+ once lock callback is received.
+
--------------------------------------------------------------------------------
2007-08-10 Cluster File Systems, Inc. <info@clusterfs.com>
obd_ost.h obd_support.h lustre_ver.h lu_object.h lu_time.h \
md_object.h dt_object.h lustre_param.h lustre_mdt.h \
lustre_fid.h lustre_fld.h lustre_req_layout.h lustre_capa.h \
- lustre_idmap.h lustre_eacl.h interval_tree.h obd_cksum.h
+ lustre_idmap.h lustre_eacl.h interval_tree.h obd_cksum.h \
+ lustre_cache.h
void *l_lvb_data; /* an LVB received during */
void *l_lvb_swabber; /* an enqueue */
void *l_ast_data;
+ spinlock_t l_extents_list_lock;
+ struct list_head l_extents_list;
+
+ struct list_head l_cache_locks_list;
/* Server-side-only members */
struct lustre_quota_ctxt obt_qctxt;
};
+typedef void (*obd_pin_extent_cb)(void *data);
+typedef int (*obd_page_removal_cb_t)(void *data, int discard);
+typedef int (*obd_lock_cancel_cb)(struct ldlm_lock *,struct ldlm_lock_desc *,
+ void *, int);
+
/* llog contexts */
enum llog_ctxt_id {
LLOG_CONFIG_ORIG_CTXT = 0,
struct mdc_rpc_lock;
struct obd_import;
+struct lustre_cache;
struct client_obd {
struct semaphore cl_sem;
struct obd_uuid cl_target_uuid;
struct lu_client_seq *cl_seq;
atomic_t cl_resends; /* resend count */
+
+ /* Cache of triples */
+ struct lustre_cache *cl_cache;
+ obd_lock_cancel_cb cl_ext_lock_cancel_cb;
};
#define obd2cli_tgt(obd) ((char *)(obd)->u.cli.cl_target_uuid.uuid)
__u32 lov_offset_idx; /* aliasing for start_idx */
int lov_start_count;/* reseed counter */
int lov_connects;
+ obd_page_removal_cb_t lov_page_removal_cb;
+ obd_pin_extent_cb lov_page_pin_cb;
+ obd_lock_cancel_cb lov_lock_cancel_cb;
};
struct lmv_tgt_desc {
struct lov_oinfo *loi,
cfs_page_t *page, obd_off offset,
struct obd_async_page_ops *ops, void *data,
- void **res);
+ void **res, int nocache,
+ struct lustre_handle *lockh);
int (*o_queue_async_io)(struct obd_export *exp,
struct lov_stripe_md *lsm,
struct lov_oinfo *loi, void *cookie,
int (*o_quotactl)(struct obd_export *, struct obd_quotactl *);
int (*o_ping)(struct obd_export *exp);
+
+ int (*o_register_page_removal_cb)(struct obd_export *exp,
+ obd_page_removal_cb_t cb,
+ obd_pin_extent_cb pin_cb);
+ int (*o_unregister_page_removal_cb)(struct obd_export *exp,
+ obd_page_removal_cb_t cb);
+ int (*o_register_lock_cancel_cb)(struct obd_export *exp,
+ obd_lock_cancel_cb cb);
+ int (*o_unregister_lock_cancel_cb)(struct obd_export *exp,
+ obd_lock_cancel_cb cb);
+
/*
* NOTE: If adding ops, add another LPROCFS_OBD_OP_INIT() line
* to lprocfs_alloc_obd_stats() in obdclass/lprocfs_status.c.
struct lov_oinfo *loi,
cfs_page_t *page, obd_off offset,
struct obd_async_page_ops *ops,
- void *data, void **res)
+ void *data, void **res, int nocache,
+ struct lustre_handle *lockh)
{
int ret;
ENTRY;
EXP_COUNTER_INCREMENT(exp, prep_async_page);
ret = OBP(exp->exp_obd, prep_async_page)(exp, lsm, loi, page, offset,
- ops, data, res);
+ ops, data, res, nocache,
+ lockh);
RETURN(ret);
}
RETURN(0);
}
+/* Register a page-removal callback @cb and a page-pin callback @pin_cb
+ * with the data device behind @exp via its o_register_page_removal_cb
+ * method. If the device does not implement the method, OBD_CHECK_DT_OP
+ * presumably bails out with the supplied default (0), making this a
+ * no-op -- confirm against the macro definition. */
+static inline int obd_register_page_removal_cb(struct obd_export *exp,
+ obd_page_removal_cb_t cb,
+ obd_pin_extent_cb pin_cb)
+{
+ int rc;
+ ENTRY;
+
+ OBD_CHECK_DT_OP(exp->exp_obd, register_page_removal_cb, 0);
+ OBD_COUNTER_INCREMENT(exp->exp_obd, register_page_removal_cb);
+
+ rc = OBP(exp->exp_obd, register_page_removal_cb)(exp, cb, pin_cb);
+ RETURN(rc);
+}
+
+/* Unregister a previously registered page-removal callback @cb from the
+ * data device behind @exp. No-op (returns the macro default 0) when the
+ * device lacks o_unregister_page_removal_cb -- confirm against
+ * OBD_CHECK_DT_OP. */
+static inline int obd_unregister_page_removal_cb(struct obd_export *exp,
+ obd_page_removal_cb_t cb)
+{
+ int rc;
+ ENTRY;
+
+ OBD_CHECK_DT_OP(exp->exp_obd, unregister_page_removal_cb, 0);
+ OBD_COUNTER_INCREMENT(exp->exp_obd, unregister_page_removal_cb);
+
+ rc = OBP(exp->exp_obd, unregister_page_removal_cb)(exp, cb);
+ RETURN(rc);
+}
+
+/* Register an extent-lock cancellation callback @cb with the data
+ * device behind @exp (used by llite/liblustre to learn when a DLM
+ * extent lock covering cached pages is cancelled). No-op when the
+ * device lacks o_register_lock_cancel_cb. */
+static inline int obd_register_lock_cancel_cb(struct obd_export *exp,
+ obd_lock_cancel_cb cb)
+{
+ int rc;
+ ENTRY;
+
+ OBD_CHECK_DT_OP(exp->exp_obd, register_lock_cancel_cb, 0);
+ OBD_COUNTER_INCREMENT(exp->exp_obd, register_lock_cancel_cb);
+
+ rc = OBP(exp->exp_obd, register_lock_cancel_cb)(exp, cb);
+ RETURN(rc);
+}
+
+/* Unregister the extent-lock cancellation callback @cb from the data
+ * device behind @exp. Counterpart of obd_register_lock_cancel_cb;
+ * no-op when the device lacks o_unregister_lock_cancel_cb. */
+static inline int obd_unregister_lock_cancel_cb(struct obd_export *exp,
+ obd_lock_cancel_cb cb)
+{
+ int rc;
+ ENTRY;
+
+ OBD_CHECK_DT_OP(exp->exp_obd, unregister_lock_cancel_cb, 0);
+ OBD_COUNTER_INCREMENT(exp->exp_obd, unregister_lock_cancel_cb);
+
+ rc = OBP(exp->exp_obd, unregister_lock_cancel_cb)(exp, cb);
+ RETURN(rc);
+}
+
/* metadata helpers */
static inline int md_getstatus(struct obd_export *exp,
struct lu_fid *fid, struct obd_capa **pc)
struct ldlm_enqueue_info*oa_ei;
};
+int osc_extent_blocking_cb(struct ldlm_lock *lock,
+ struct ldlm_lock_desc *new, void *data,
+ int flag);
+
#endif
CFS_INIT_LIST_HEAD(&lock->l_handle.h_link);
class_handle_hash(&lock->l_handle, lock_handle_addref);
+ CFS_INIT_LIST_HEAD(&lock->l_extents_list);
+ spin_lock_init(&lock->l_extents_list_lock);
+ CFS_INIT_LIST_HEAD(&lock->l_cache_locks_list);
+
RETURN(lock);
}
int llu_iop_iodone(struct ioctx *ioctxp);
int llu_local_size(struct inode *inode);
int llu_glimpse_size(struct inode *inode);
+int llu_extent_lock_cancel_cb(struct ldlm_lock *lock,
+ struct ldlm_lock_desc *new, void *data,
+ int flag);
int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
struct lov_stripe_md *lsm, int mode,
ldlm_policy_data_t *policy, struct lustre_handle *lockh,
RETURN(stripe);
}
-static int llu_extent_lock_callback(struct ldlm_lock *lock,
- struct ldlm_lock_desc *new, void *data,
- int flag)
+int llu_extent_lock_cancel_cb(struct ldlm_lock *lock,
+ struct ldlm_lock_desc *new, void *data,
+ int flag)
{
struct lustre_handle lockh = { 0 };
int rc;
einfo.ei_type = LDLM_EXTENT;
einfo.ei_mode = LCK_PR;
- einfo.ei_cb_bl = llu_extent_lock_callback;
+ einfo.ei_cb_bl = osc_extent_blocking_cb;
einfo.ei_cb_cp = ldlm_completion_ast;
einfo.ei_cb_gl = llu_glimpse_callback;
einfo.ei_cbdata = inode;
einfo.ei_type = LDLM_EXTENT;
einfo.ei_mode = mode;
- einfo.ei_cb_bl = llu_extent_lock_callback;
+ einfo.ei_cb_bl = osc_extent_blocking_cb;
einfo.ei_cb_cp = ldlm_completion_ast;
einfo.ei_cb_gl = llu_glimpse_callback;
einfo.ei_cbdata = inode;
rc = obd_prep_async_page(exp, lsm, NULL, page,
(obd_off)page->index << CFS_PAGE_SHIFT,
&llu_async_page_ops,
- llap, &llap->llap_cookie);
+ llap, &llap->llap_cookie,
+ 1 /* no cache in liblustre at all */,
+ NULL);
if (rc) {
LASSERT(rc < 0);
llap->llap_cookie = NULL;
if (!llap_cookie_size)
llap_cookie_size = obd_prep_async_page(llu_i2obdexp(inode),
NULL, NULL, NULL, 0,
- NULL, NULL, NULL);
+ NULL, NULL, NULL, 0,
+ NULL);
OBD_ALLOC(group, LLU_IO_GROUP_SIZE(maxpages));
if (!group)
ENTRY;
list_del(&sbi->ll_conn_chain);
+ obd_unregister_lock_cancel_cb(sbi->ll_dt_exp,
+ llu_extent_lock_cancel_cb);
obd_disconnect(sbi->ll_dt_exp);
obd_disconnect(sbi->ll_md_exp);
sbi->ll_dt_exp = class_conn2export(&dt_conn);
sbi->ll_lco.lco_flags = ocd.ocd_connect_flags;
+ err = obd_register_lock_cancel_cb(sbi->ll_dt_exp,
+ llu_extent_lock_cancel_cb);
+ if (err) {
+ CERROR("cannot register lock cancel callback: rc = %d\n", err);
+ GOTO(out_dt, err);
+ }
+
llu_init_ea_size(sbi->ll_md_exp, sbi->ll_dt_exp);
err = md_getstatus(sbi->ll_md_exp, &rootfid, NULL);
if (err) {
CERROR("cannot mds_connect: rc = %d\n", err);
- GOTO(out_dt, err);
+ GOTO(out_lock_cn_cb, err);
}
CDEBUG(D_SUPER, "rootfid "DFID"\n", PFID(&rootfid));
sbi->ll_root_fid = rootfid;
OBD_MD_FLGETATTR | OBD_MD_FLBLOCKS, 0, &request);
if (err) {
CERROR("md_getattr failed for root: rc = %d\n", err);
- GOTO(out_dt, err);
+ GOTO(out_lock_cn_cb, err);
}
err = md_get_lustre_md(sbi->ll_md_exp, request,
_sysio_i_gone(root);
out_request:
ptlrpc_req_finished(request);
+out_lock_cn_cb:
+ obd_unregister_lock_cancel_cb(sbi->ll_dt_exp,
+ llu_extent_lock_cancel_cb);
out_dt:
obd_disconnect(sbi->ll_dt_exp);
out_md:
RETURN(stripe);
}
-/* Flush the page cache for an extent as its canceled. When we're on an LOV,
- * we get a lock cancellation for each stripe, so we have to map the obd's
- * region back onto the stripes in the file that it held.
+/* Get extra page reference to ensure it is not going away.
+ * Pin callback handed to the lower layers as obd_pin_extent_cb;
+ * @data is the struct page to pin. The reference taken here is
+ * dropped by ll_page_removal_cb via page_cache_release. */
+void ll_pin_extent_cb(void *data)
+{
+ struct page *page = data;
+
+ page_cache_get(page);
+
+ return;
+}
+
+/* Flush the page from page cache for an extent as its canceled.
+ * Page to remove is delivered as @data.
*
- * No one can dirty the extent until we've finished our work and they can
+ * No one can dirty the extent until we've finished our work and they cannot
* enqueue another lock. The DLM protects us from ll_file_read/write here,
* but other kernel actors could have pages locked.
*
+ * If @discard is set, there is no need to write the page if it is dirty.
+ *
* Called with the DLM lock held. */
-void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
- struct ldlm_lock *lock, __u32 stripe)
+int ll_page_removal_cb(void *data, int discard)
{
- ldlm_policy_data_t tmpex;
- unsigned long start, end, count, skip, i, j;
- struct page *page;
- int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
- struct lustre_handle lockh;
- struct address_space *mapping = inode->i_mapping;
-
+ int rc;
+ struct page *page = data;
+ struct address_space *mapping;
+
ENTRY;
- tmpex = lock->l_policy_data;
- CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
- inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
- i_size_read(inode));
-
- /* our locks are page granular thanks to osc_enqueue, we invalidate the
- * whole page. */
- if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
- ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
- LDLM_ERROR(lock, "lock not aligned on PAGE_SIZE %lu",
- CFS_PAGE_SIZE);
- LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
- LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);
-
- count = ~0;
- skip = 0;
- start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
- end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
- if (lsm->lsm_stripe_count > 1) {
- count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
- skip = (lsm->lsm_stripe_count - 1) * count;
- start += start/count * skip + stripe * count;
- if (end != ~0)
- end += end/count * skip + stripe * count;
- }
- if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
- end = ~0;
-
- i = i_size_read(inode) ? (__u64)(i_size_read(inode) - 1) >>
- CFS_PAGE_SHIFT : 0;
- if (i < end)
- end = i;
-
- CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
- "count: %lu skip: %lu end: %lu%s\n", start, start % count,
- count, skip, end, discard ? " (DISCARDING)" : "");
-
- /* walk through the vmas on the inode and tear down mmaped pages that
- * intersect with the lock. this stops immediately if there are no
- * mmap()ed regions of the file. This is not efficient at all and
- * should be short lived. We'll associate mmap()ed pages with the lock
- * and will be able to find them directly */
- for (i = start; i <= end; i += (j + skip)) {
- j = min(count - (i % count), end - i + 1);
- LASSERT(j > 0);
- LASSERT(mapping);
- if (ll_teardown_mmaps(mapping,
- (__u64)i << CFS_PAGE_SHIFT,
- ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
- break;
- }
-
- /* this is the simplistic implementation of page eviction at
- * cancelation. It is careful to get races with other page
- * lockers handled correctly. fixes from bug 20 will make it
- * more efficient by associating locks with pages and with
- * batching writeback under the lock explicitly. */
- for (i = start, j = start % count; i <= end;
- j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
- if (j == count) {
- CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
- i += skip;
- j = 0;
- if (i > end)
- break;
- }
- LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
- LPU64" >= "LPU64" start %lu i %lu end %lu\n",
- tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
- start, i, end);
- if (!mapping_has_pages(mapping)) {
- CDEBUG(D_INODE|D_PAGE, "nothing left\n");
- break;
- }
+ /* We have page reference already from ll_pin_page */
+ lock_page(page);
- cond_resched();
-
- page = find_lock_page(mapping, i);
- if (page == NULL)
- continue;
- LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
- i, tmpex.l_extent.start);
- if (!discard && PageWriteback(page))
- wait_on_page_writeback(page);
-
- /* page->mapping to check with racing against teardown */
- if (!discard && clear_page_dirty_for_io(page)) {
- rc = ll_call_writepage(inode, page);
- /* either waiting for io to complete or reacquiring
- * the lock that the failed writepage released */
- lock_page(page);
- wait_on_page_writeback(page);
- if (rc < 0) {
- CERROR("writepage inode %lu(%p) of page %p "
- "failed: %d\n", inode->i_ino, inode,
- page, rc);
- if (rc == -ENOSPC)
- set_bit(AS_ENOSPC, &mapping->flags);
- else
- set_bit(AS_EIO, &mapping->flags);
- }
- }
-
- tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
- /* check to see if another DLM lock covers this page b=2765 */
- rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
- LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
- LDLM_FL_TEST_LOCK,
- &lock->l_resource->lr_name, LDLM_EXTENT,
- &tmpex, LCK_PR | LCK_PW, &lockh);
-
- if (rc2 <= 0 && page->mapping != NULL) {
- struct ll_async_page *llap = llap_cast_private(page);
- /* checking again to account for writeback's
- * lock_page() */
- LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
- if (llap)
- ll_ra_accounting(llap, mapping);
- ll_truncate_complete_page(page);
+ /* Already truncated by somebody */
+ if (!page->mapping)
+ GOTO(out, rc = 0);
+ mapping = page->mapping;
+
+ ll_teardown_mmaps(mapping,
+ (__u64)page->index << PAGE_CACHE_SHIFT,
+ ((__u64)page->index<<PAGE_CACHE_SHIFT)|
+ ~PAGE_CACHE_MASK);
+ LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
+
+ if (!discard && clear_page_dirty_for_io(page)) {
+ LASSERT(page->mapping);
+ rc = ll_call_writepage(page->mapping->host, page);
+ /* either waiting for io to complete or reacquiring
+ * the lock that the failed writepage released */
+ lock_page(page);
+ wait_on_page_writeback(page);
+ if (rc != 0) {
+ CERROR("writepage inode %lu(%p) of page %p "
+ "failed: %d\n", mapping->host->i_ino,
+ mapping->host, page, rc);
+ if (rc == -ENOSPC)
+ set_bit(AS_ENOSPC, &mapping->flags);
+ else
+ set_bit(AS_EIO, &mapping->flags);
}
- unlock_page(page);
- page_cache_release(page);
- }
- LASSERTF(tmpex.l_extent.start <=
- (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
- lock->l_policy_data.l_extent.end + 1),
- "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
- tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
- start, i, end);
+ set_bit(AS_EIO, &mapping->flags);
+ }
+ if (page->mapping != NULL) {
+ struct ll_async_page *llap = llap_cast_private(page);
+ /* checking again to account for writeback's lock_page() */
+ LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
+ if (llap)
+ ll_ra_accounting(llap, page->mapping);
+ ll_truncate_complete_page(page);
+ }
EXIT;
+out:
+ LASSERT(!PageWriteback(page));
+ unlock_page(page);
+ page_cache_release(page);
+
+ return 0;
}
-static int ll_extent_lock_callback(struct ldlm_lock *lock,
- struct ldlm_lock_desc *new, void *data,
- int flag)
+int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
+ void *data, int flag)
{
- struct lustre_handle lockh = { 0 };
- int rc;
+ struct inode *inode;
+ struct ll_inode_info *lli;
+ struct lov_stripe_md *lsm;
+ int stripe;
+ __u64 kms;
+
ENTRY;
if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
LBUG();
}
- switch (flag) {
- case LDLM_CB_BLOCKING:
- ldlm_lock2handle(lock, &lockh);
- rc = ldlm_cli_cancel(&lockh);
- if (rc != ELDLM_OK)
- CERROR("ldlm_cli_cancel failed: %d\n", rc);
- break;
- case LDLM_CB_CANCELING: {
- struct inode *inode;
- struct ll_inode_info *lli;
- struct lov_stripe_md *lsm;
- int stripe;
- __u64 kms;
-
- /* This lock wasn't granted, don't try to evict pages */
- if (lock->l_req_mode != lock->l_granted_mode)
- RETURN(0);
-
- inode = ll_inode_from_lock(lock);
- if (inode == NULL)
- RETURN(0);
- lli = ll_i2info(inode);
- if (lli == NULL)
- goto iput;
- if (lli->lli_smd == NULL)
- goto iput;
- lsm = lli->lli_smd;
-
- stripe = ll_lock_to_stripe_offset(inode, lock);
- if (stripe < 0)
- goto iput;
-
- ll_pgcache_remove_extent(inode, lsm, lock, stripe);
+ inode = ll_inode_from_lock(lock);
+ if (inode == NULL)
+ RETURN(0);
+ lli = ll_i2info(inode);
+ if (lli == NULL)
+ GOTO(iput, 0);
+ if (lli->lli_smd == NULL)
+ GOTO(iput, 0);
+ lsm = lli->lli_smd;
- lov_stripe_lock(lsm);
- lock_res_and_lock(lock);
- kms = ldlm_extent_shift_kms(lock,
- lsm->lsm_oinfo[stripe]->loi_kms);
+ stripe = ll_lock_to_stripe_offset(inode, lock);
+ if (stripe < 0)
+ GOTO(iput, 0);
- if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
- LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
- lsm->lsm_oinfo[stripe]->loi_kms, kms);
- lsm->lsm_oinfo[stripe]->loi_kms = kms;
- unlock_res_and_lock(lock);
- lov_stripe_unlock(lsm);
- iput:
- iput(inode);
- break;
- }
- default:
- LBUG();
- }
+ lov_stripe_lock(lsm);
+ lock_res_and_lock(lock);
+ kms = ldlm_extent_shift_kms(lock,
+ lsm->lsm_oinfo[stripe]->loi_kms);
+
+ if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
+ LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
+ lsm->lsm_oinfo[stripe]->loi_kms, kms);
+ lsm->lsm_oinfo[stripe]->loi_kms = kms;
+ unlock_res_and_lock(lock);
+ lov_stripe_unlock(lsm);
+ ll_queue_done_writing(inode, 0);
+ EXIT;
+iput:
+ iput(inode);
- RETURN(0);
+ return 0;
}
#if 0
einfo.ei_type = LDLM_EXTENT;
einfo.ei_mode = LCK_PR;
- einfo.ei_cb_bl = ll_extent_lock_callback;
+ einfo.ei_cb_bl = osc_extent_blocking_cb;
einfo.ei_cb_cp = ldlm_completion_ast;
einfo.ei_cb_gl = ll_glimpse_callback;
einfo.ei_cbdata = NULL;
* acquired only if there were no conflicting locks. */
einfo.ei_type = LDLM_EXTENT;
einfo.ei_mode = LCK_PR;
- einfo.ei_cb_bl = ll_extent_lock_callback;
+ einfo.ei_cb_bl = osc_extent_blocking_cb;
einfo.ei_cb_cp = ldlm_completion_ast;
einfo.ei_cb_gl = ll_glimpse_callback;
einfo.ei_cbdata = inode;
einfo.ei_type = LDLM_EXTENT;
einfo.ei_mode = mode;
- einfo.ei_cb_bl = ll_extent_lock_callback;
+ einfo.ei_cb_bl = osc_extent_blocking_cb;
einfo.ei_cb_cp = ldlm_completion_ast;
einfo.ei_cb_gl = ll_glimpse_callback;
einfo.ei_cbdata = inode;
llap_origin:3,
llap_ra_used:1,
llap_ignore_quota:1,
+ llap_nocache:1,
llap_lockless_io_page:1;
void *llap_cookie;
struct page *llap_page;
int set_default);
int ll_dir_getstripe(struct inode *inode, struct lov_mds_md **lmm,
int *lmm_size, struct ptlrpc_request **request);
+void ll_pin_extent_cb(void *data);
+int ll_page_removal_cb(void *data, int discard);
+int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
+ void *data, int flag);
/* llite/dcache.c */
extern struct dentry_operations ll_init_d_ops;
#include <lustre_param.h>
#include <lustre_log.h>
#include <obd_cksum.h>
+#include <lustre_cache.h>
#include "llite_internal.h"
cfs_mem_cache_t *ll_file_data_slab;
sbi->ll_lco.lco_flags = data->ocd_connect_flags;
spin_unlock(&sbi->ll_lco.lco_lock);
- ll_init_ea_size(sbi->ll_md_exp, sbi->ll_dt_exp);
+ err = obd_register_page_removal_cb(sbi->ll_dt_exp,
+ ll_page_removal_cb,
+ ll_pin_extent_cb);
+ if (err) {
+ CERROR("cannot register page removal callback: rc = %d\n",err);
+ GOTO(out_dt, err);
+ }
+ err = obd_register_lock_cancel_cb(sbi->ll_dt_exp,
+ ll_extent_lock_cancel_cb);
+ if (err) {
+ CERROR("cannot register lock cancel callback: rc = %d\n", err);
+ GOTO(out_page_rm_cb, err);
+ }
+
+ err = ll_init_ea_size(sbi->ll_md_exp, sbi->ll_dt_exp);
+ if (err) {
+ CERROR("cannot set max EA and cookie sizes: rc = %d\n", err);
+ GOTO(out_lock_cn_cb, err);
+ }
err = obd_prep_async_page(sbi->ll_dt_exp, NULL, NULL, NULL,
- 0, NULL, NULL, NULL);
+ 0, NULL, NULL, NULL, 0, NULL);
if (err < 0) {
LCONSOLE_ERROR_MSG(0x151, "There are no OST's in this "
"filesystem. There must be at least one "
"active OST for a client to start.\n");
- GOTO(out_dt_fid, err);
+ GOTO(out_lock_cn_cb, err);
}
if (!ll_async_page_slab) {
ll_async_page_slab_size,
0, 0);
if (!ll_async_page_slab)
- GOTO(out_dt_fid, err = -ENOMEM);
+ GOTO(out_lock_cn_cb, err = -ENOMEM);
}
err = md_getstatus(sbi->ll_md_exp, &rootfid, &oc);
if (err) {
CERROR("cannot mds_connect: rc = %d\n", err);
- GOTO(out_dt_fid, err);
+ GOTO(out_lock_cn_cb, err);
}
CDEBUG(D_SUPER, "rootfid "DFID"\n", PFID(&rootfid));
sbi->ll_root_fid = rootfid;
free_capa(oc);
if (err) {
CERROR("md_getattr failed for root: rc = %d\n", err);
- GOTO(out_dt_fid, err);
+ GOTO(out_lock_cn_cb, err);
}
memset(&lmd, 0, sizeof(lmd));
err = md_get_lustre_md(sbi->ll_md_exp, request, sbi->ll_dt_exp,
if (err) {
CERROR("failed to understand root inode md: rc = %d\n", err);
ptlrpc_req_finished (request);
- GOTO(out_dt_fid, err);
+ GOTO(out_lock_cn_cb, err);
}
LASSERT(fid_is_sane(&sbi->ll_root_fid));
out_root:
if (root)
iput(root);
-out_dt_fid:
+out_lock_cn_cb:
+ obd_unregister_lock_cancel_cb(sbi->ll_dt_exp,
+ ll_extent_lock_cancel_cb);
+out_page_rm_cb:
+ obd_unregister_page_removal_cb(sbi->ll_dt_exp,
+ ll_page_removal_cb);
obd_fid_fini(sbi->ll_dt_exp);
out_dt:
obd_disconnect(sbi->ll_dt_exp);
list_del(&sbi->ll_conn_chain);
+ obd_unregister_page_removal_cb(sbi->ll_dt_exp,
+ ll_page_removal_cb);
+ obd_unregister_lock_cancel_cb(sbi->ll_dt_exp,ll_extent_lock_cancel_cb);
+
obd_fid_fini(sbi->ll_dt_exp);
obd_disconnect(sbi->ll_dt_exp);
sbi->ll_dt_exp = NULL;
return count;
}
-struct ll_async_page *llap_from_page(struct page *page, unsigned origin)
+static struct ll_async_page *llap_from_page_with_lockh(struct page *page,
+ unsigned origin,
+ struct lustre_handle *lockh)
{
struct ll_async_page *llap;
struct obd_export *exp;
llap->llap_magic = LLAP_MAGIC;
llap->llap_cookie = (void *)llap + size_round(sizeof(*llap));
+ /* XXX: for bug 11270 - check for lockless origin here! */
+ if (origin == LLAP_ORIGIN_LOCKLESS_IO)
+ llap->llap_nocache = 1;
+
rc = obd_prep_async_page(exp, ll_i2info(inode)->lli_smd, NULL, page,
(obd_off)page->index << CFS_PAGE_SHIFT,
- &ll_async_page_ops, llap, &llap->llap_cookie);
+ &ll_async_page_ops, llap, &llap->llap_cookie,
+ llap->llap_nocache, lockh);
if (rc) {
OBD_SLAB_FREE(llap, ll_async_page_slab,
ll_async_page_slab_size);
RETURN(llap);
}
+struct ll_async_page *llap_from_page(struct page *page,
+ unsigned origin)
+{
+ return llap_from_page_with_lockh(page, origin, NULL);
+}
+
static int queue_or_sync_write(struct obd_export *exp, struct inode *inode,
struct ll_async_page *llap,
unsigned to, obd_flag async_flags)
int ll_commit_write(struct file *file, struct page *page, unsigned from,
unsigned to)
{
+ struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
struct inode *inode = page->mapping->host;
struct ll_inode_info *lli = ll_i2info(inode);
struct lov_stripe_md *lsm = lli->lli_smd;
struct obd_export *exp;
struct ll_async_page *llap;
loff_t size;
+ struct lustre_handle *lockh = NULL;
int rc = 0;
ENTRY;
CDEBUG(D_INODE, "inode %p is writing page %p from %d to %d at %lu\n",
inode, page, from, to, page->index);
- llap = llap_from_page(page, LLAP_ORIGIN_COMMIT_WRITE);
+ if (fd->fd_flags & LL_FILE_GROUP_LOCKED)
+ lockh = &fd->fd_cwlockh;
+
+ llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_COMMIT_WRITE, lockh);
if (IS_ERR(llap))
RETURN(PTR_ERR(llap));
* here. */
void ll_removepage(struct page *page)
{
+ struct ll_async_page *llap = llap_cast_private(page);
ENTRY;
LASSERT(!in_interrupt());
return;
}
- LASSERT(!llap_cast_private(page)->llap_lockless_io_page);
+ LASSERT(!llap->llap_lockless_io_page);
+ LASSERT(!llap->llap_nocache);
LL_CDEBUG_PAGE(D_PAGE, page, "being evicted\n");
__ll_put_llap(page);
EXIT;
}
-static int ll_page_matches(struct page *page, int fd_flags)
-{
- struct lustre_handle match_lockh = {0};
- struct inode *inode = page->mapping->host;
- ldlm_policy_data_t page_extent;
- int flags, matches;
- ENTRY;
-
- if (unlikely(fd_flags & LL_FILE_GROUP_LOCKED))
- RETURN(1);
-
- page_extent.l_extent.start = (__u64)page->index << CFS_PAGE_SHIFT;
- page_extent.l_extent.end =
- page_extent.l_extent.start + CFS_PAGE_SIZE - 1;
- flags = LDLM_FL_TEST_LOCK | LDLM_FL_BLOCK_GRANTED;
- if (!(fd_flags & LL_FILE_READAHEAD))
- flags |= LDLM_FL_CBPENDING;
- matches = obd_match(ll_i2sbi(inode)->ll_dt_exp,
- ll_i2info(inode)->lli_smd, LDLM_EXTENT,
- &page_extent, LCK_PR | LCK_PW, &flags, inode,
- &match_lockh);
- RETURN(matches);
-}
-
static int ll_issue_page_read(struct obd_export *exp,
struct ll_async_page *llap,
struct obd_io_group *oig, int defer)
if (IS_ERR(llap))
GOTO(out, rc = PTR_ERR(llap));
+ LASSERT(!llap->llap_nocache);
LASSERT(!PageWriteback(page));
set_page_writeback(page);
struct obd_export *exp;
struct ll_async_page *llap;
struct obd_io_group *oig = NULL;
+ struct lustre_handle *lockh = NULL;
int rc;
ENTRY;
if (exp == NULL)
GOTO(out, rc = -EINVAL);
- llap = llap_from_page(page, LLAP_ORIGIN_READPAGE);
- if (IS_ERR(llap))
+ if (fd->fd_flags & LL_FILE_GROUP_LOCKED)
+ lockh = &fd->fd_cwlockh;
+
+ llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_READPAGE, lockh);
+ if (IS_ERR(llap)) {
+ if (PTR_ERR(llap) == -ENOLCK) {
+ CWARN("ino %lu page %lu (%llu) not covered by "
+ "a lock (mmap?). check debug logs.\n",
+ inode->i_ino, page->index,
+ (long long)page->index << PAGE_CACHE_SHIFT);
+ }
GOTO(out, rc = PTR_ERR(llap));
+ }
if (ll_i2sbi(inode)->ll_ra_info.ra_max_pages)
ras_update(ll_i2sbi(inode), inode, &fd->fd_ras, page->index,
GOTO(out_oig, rc = 0);
}
- if (likely((fd->fd_flags & LL_FILE_IGNORE_LOCK) == 0)) {
- rc = ll_page_matches(page, fd->fd_flags);
- if (rc < 0) {
- LL_CDEBUG_PAGE(D_ERROR, page,
- "lock match failed: rc %d\n", rc);
- GOTO(out, rc);
- }
-
- if (rc == 0) {
- CWARN("ino %lu page %lu (%llu) not covered by "
- "a lock (mmap?). check debug logs.\n",
- inode->i_ino, page->index,
- (long long)page->index << CFS_PAGE_SHIFT);
- }
- }
-
rc = ll_issue_page_read(exp, llap, oig, 0);
if (rc)
GOTO(out, rc);
#include <obd_ost.h>
#include <lprocfs_status.h>
#include <lustre_param.h>
+#include <lustre_cache.h>
#include "lov_internal.h"
mutex_up(&lov->lov_lock);
}
+/* Fan a page-removal/page-pin callback registration out to every
+ * currently connected LOV target, and cache the callbacks in the
+ * lov_obd so targets connected later (see lov_connect_obd) are
+ * registered too. Only one distinct callback pair may be active at a
+ * time: a different @func or @pin_cb than the cached one yields -EBUSY.
+ * NOTE(review): per-target failures are OR-ed into rc, yet the
+ * callbacks are cached unconditionally below -- confirm intended. */
+static int lov_register_page_removal_cb(struct obd_export *exp,
+ obd_page_removal_cb_t func,
+ obd_pin_extent_cb pin_cb)
+{
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+ int i, rc = 0;
+
+ if (lov->lov_page_removal_cb && lov->lov_page_removal_cb != func)
+ return -EBUSY;
+
+ if (lov->lov_page_pin_cb && lov->lov_page_pin_cb != pin_cb)
+ return -EBUSY;
+
+ for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+ if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
+ continue;
+ rc |= obd_register_page_removal_cb(lov->lov_tgts[i]->ltd_exp,
+ func, pin_cb);
+ }
+
+ lov->lov_page_removal_cb = func;
+ lov->lov_page_pin_cb = pin_cb;
+
+ return rc;
+}
+
+/* Undo lov_register_page_removal_cb: reject (-EINVAL) if @func is not
+ * the registered callback, otherwise forget the cached callbacks and
+ * unregister @func from every connected target. Per-target failures
+ * are OR-ed into rc. */
+static int lov_unregister_page_removal_cb(struct obd_export *exp,
+ obd_page_removal_cb_t func)
+{
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+ int i, rc = 0;
+
+ if (lov->lov_page_removal_cb && lov->lov_page_removal_cb != func)
+ return -EINVAL;
+
+ lov->lov_page_removal_cb = NULL;
+ lov->lov_page_pin_cb = NULL;
+
+ for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+ if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
+ continue;
+ rc |= obd_unregister_page_removal_cb(lov->lov_tgts[i]->ltd_exp,
+ func);
+ }
+
+ return rc;
+}
+
+/* Fan a lock-cancel callback registration out to every connected LOV
+ * target and cache @func for targets connected later. A second,
+ * different callback yields -EBUSY. Mirrors
+ * lov_register_page_removal_cb, including the unconditional caching of
+ * @func even when some per-target registrations failed (rc != 0). */
+static int lov_register_lock_cancel_cb(struct obd_export *exp,
+ obd_lock_cancel_cb func)
+{
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+ int i, rc = 0;
+
+ if (lov->lov_lock_cancel_cb && lov->lov_lock_cancel_cb != func)
+ return -EBUSY;
+
+ for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+ if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
+ continue;
+ rc |= obd_register_lock_cancel_cb(lov->lov_tgts[i]->ltd_exp,
+ func);
+ }
+
+ lov->lov_lock_cancel_cb = func;
+
+ return rc;
+}
+
+/* Undo lov_register_lock_cancel_cb: reject (-EINVAL) if @func is not
+ * the registered callback, otherwise unregister it from every
+ * connected target and clear the cached pointer. Per-target failures
+ * are OR-ed into rc. */
+static int lov_unregister_lock_cancel_cb(struct obd_export *exp,
+ obd_lock_cancel_cb func)
+{
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+ int i, rc = 0;
+
+ if (lov->lov_lock_cancel_cb && lov->lov_lock_cancel_cb != func)
+ return -EINVAL;
+
+ for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+ if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
+ continue;
+ rc |= obd_unregister_lock_cancel_cb(lov->lov_tgts[i]->ltd_exp,
+ func);
+ }
+ lov->lov_lock_cancel_cb = NULL;
+ return rc;
+}
+
#define MAX_STRING_SIZE 128
static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
struct obd_connect_data *data)
RETURN(-ENODEV);
}
+ rc = obd_register_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
+ lov->lov_page_removal_cb,
+ lov->lov_page_pin_cb);
+ if (rc) {
+ obd_disconnect(lov->lov_tgts[index]->ltd_exp);
+ lov->lov_tgts[index]->ltd_exp = NULL;
+ RETURN(rc);
+ }
+
+ rc = obd_register_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
+ lov->lov_lock_cancel_cb);
+ if (rc) {
+ obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
+ lov->lov_page_removal_cb);
+ obd_disconnect(lov->lov_tgts[index]->ltd_exp);
+ lov->lov_tgts[index]->ltd_exp = NULL;
+ RETURN(rc);
+ }
+
rc = obd_register_observer(tgt_obd, obd);
if (rc) {
CERROR("Target %s register_observer error %d\n",
obd_uuid2str(&tgt_uuid), rc);
+ obd_unregister_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
+ lov->lov_lock_cancel_cb);
+ obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
+ lov->lov_page_removal_cb);
obd_disconnect(lov->lov_tgts[index]->ltd_exp);
lov->lov_tgts[index]->ltd_exp = NULL;
RETURN(rc);
CDEBUG(D_CONFIG, "%s: disconnecting target %s\n",
obd->obd_name, osc_obd->obd_name);
+ obd_unregister_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
+ lov->lov_lock_cancel_cb);
+ obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
+ lov->lov_page_removal_cb);
if (lov->lov_tgts[index]->ltd_active) {
lov->lov_tgts[index]->ltd_active = 0;
int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
struct lov_oinfo *loi, cfs_page_t *page,
obd_off offset, struct obd_async_page_ops *ops,
- void *data, void **res)
+ void *data, void **res, int nocache,
+ struct lustre_handle *lockh)
{
struct lov_obd *lov = &exp->exp_obd->u.lov;
struct lov_async_page *lap;
+ struct lov_lock_handles *lov_lockh = NULL;
int rc = 0;
ENTRY;
}
rc = size_round(sizeof(*lap)) +
obd_prep_async_page(lov->lov_tgts[i]->ltd_exp, NULL,
- NULL, NULL, 0, NULL, NULL, NULL);
+ NULL, NULL, 0, NULL, NULL, NULL, 0,
+ NULL);
RETURN(rc);
}
ASSERT_LSM_MAGIC(lsm);
lap->lap_sub_cookie = (void *)lap + size_round(sizeof(*lap));
+ if (lockh) {
+ lov_lockh = lov_handle2llh(lockh);
+ if (lov_lockh) {
+ lockh = lov_lockh->llh_handles + lap->lap_stripe;
+ }
+ }
+
rc = obd_prep_async_page(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
lsm, loi, page, lap->lap_sub_offset,
&lov_async_page_ops, lap,
- &lap->lap_sub_cookie);
+ &lap->lap_sub_cookie, nocache, lockh);
+ if (lov_lockh)
+ lov_llh_put(lov_lockh);
if (rc)
RETURN(rc);
CDEBUG(D_CACHE, "lap %p page %p cookie %p off "LPU64"\n", lap, page,
.o_llog_init = lov_llog_init,
.o_llog_finish = lov_llog_finish,
.o_notify = lov_notify,
+ .o_register_page_removal_cb = lov_register_page_removal_cb,
+ .o_unregister_page_removal_cb = lov_unregister_page_removal_cb,
+ .o_register_lock_cancel_cb = lov_register_lock_cancel_cb,
+ .o_unregister_lock_cancel_cb = lov_unregister_lock_cancel_cb,
};
static quota_interface_t *quota_interface;
LPROCFS_OBD_OP_INIT(num_private_stats, stats, quotacheck);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, quotactl);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, ping);
+ LPROCFS_OBD_OP_INIT(num_private_stats, stats, register_page_removal_cb);
+ LPROCFS_OBD_OP_INIT(num_private_stats,stats,unregister_page_removal_cb);
+ LPROCFS_OBD_OP_INIT(num_private_stats, stats, register_lock_cancel_cb);
+ LPROCFS_OBD_OP_INIT(num_private_stats, stats,unregister_lock_cancel_cb);
}
int lprocfs_alloc_obd_stats(struct obd_device *obd, unsigned num_private_stats)
rc = obd_prep_async_page(exp, lsm, NULL, eap->eap_page,
eap->eap_off, &ec_async_page_ops,
- eap, &eap->eap_cookie);
+ eap, &eap->eap_cookie, 1, NULL);
if (rc) {
spin_lock(&eas.eas_lock);
eas.eas_rc = rc;
MODULES := osc
-osc-objs := osc_request.o lproc_osc.o osc_create.o
+osc-objs := osc_request.o lproc_osc.o osc_create.o cache.o
@INCLUDE_RULES@
if LIBLUSTRE
noinst_LIBRARIES = libosc.a
-libosc_a_SOURCES = osc_request.c osc_create.c osc_internal.h
+libosc_a_SOURCES = osc_request.c osc_create.c osc_internal.h cache.c
libosc_a_CPPFLAGS = $(LLCPPFLAGS)
libosc_a_CFLAGS = $(LLCFLAGS)
endif
struct obd_async_page_ops *oap_caller_ops;
void *oap_caller_data;
+ struct list_head oap_page_list;
+ struct ldlm_lock *oap_ldlm_lock;
+ spinlock_t oap_lock;
};
#define oap_page oap_brw_page.pg
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
+#include <lustre_cache.h>
#include "osc_internal.h"
static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;
static void osc_release_ppga(struct brw_page **ppga, obd_count count);
+int osc_cleanup(struct obd_device *obd);
/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
struct lov_oinfo *loi, cfs_page_t *page,
obd_off offset, struct obd_async_page_ops *ops,
- void *data, void **res)
+ void *data, void **res, int nocache,
+ struct lustre_handle *lockh)
{
struct osc_async_page *oap;
+ struct ldlm_res_id oid = {{0}};
+ int rc = 0;
ENTRY;
if (!page)
CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
+ CFS_INIT_LIST_HEAD(&oap->oap_page_list);
oap->oap_occ.occ_interrupted = osc_occ_interrupted;
+ spin_lock_init(&oap->oap_lock);
+
+ /* If the page was marked as not cacheable - don't add it to any locks */
+ if (!nocache) {
+ oid.name[0] = loi->loi_id;
+ oid.name[2] = loi->loi_gr;
+ /* This is the only place where we can call cache_add_extent
+ without oap_lock, because this page is locked now, and
+ the lock we are adding it to is referenced, so cannot lose
+ any pages either. */
+ rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
+ if (rc)
+ RETURN(rc);
+ }
+
CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
RETURN(0);
}
lop_update_pending(cli, lop, oap->oap_cmd, -1);
}
loi_list_maint(cli, loi);
+ cache_remove_extent(cli->cl_cache, oap);
LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
RETURN(rc);
}
+/* Blocking/cancel AST for OSC extent locks that have cached pages
+ * associated with them (bug 10718). Exported so other layers can install
+ * it as the blocking callback on enqueue.
+ * @lock: the LDLM lock this AST fires on
+ * @new: descriptor of the conflicting lock; only forwarded to the
+ * registered upper-layer cancel callback
+ * @data: opaque AST cookie, sanity-checked below
+ * @flag: LDLM_CB_BLOCKING or LDLM_CB_CANCELING
+ * Returns 0; LBUGs on any other flag value. */
+int osc_extent_blocking_cb(struct ldlm_lock *lock,
+ struct ldlm_lock_desc *new, void *data,
+ int flag)
+{
+ struct lustre_handle lockh = { 0 };
+ int rc;
+ ENTRY;
+
+ /* Catch corrupted AST cookies early: a non-NULL pointer inside the
+ * first page of the address space cannot be a valid cookie. */
+ if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
+ LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
+ LBUG();
+ }
+
+ switch (flag) {
+ case LDLM_CB_BLOCKING:
+ /* A conflicting lock wants ours: start a client-side cancel;
+ * the actual page cleanup happens on the CANCELING pass. */
+ ldlm_lock2handle(lock, &lockh);
+ rc = ldlm_cli_cancel(&lockh);
+ if (rc != ELDLM_OK)
+ CERROR("ldlm_cli_cancel failed: %d\n", rc);
+ break;
+ case LDLM_CB_CANCELING: {
+
+ ldlm_lock2handle(lock, &lockh);
+ /* This lock wasn't granted, don't try to do anything */
+ if (lock->l_req_mode != lock->l_granted_mode)
+ RETURN(0);
+
+ /* Detach every cached page that was associated with this lock
+ * from the client cache before the lock disappears. */
+ cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
+ &lockh);
+
+ /* Let the upper layer clean up too, if it registered a
+ * cancel callback (see osc_register_lock_cancel_cb). */
+ if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
+ lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
+ lock, new, data,flag);
+ break;
+ }
+ default:
+ LBUG();
+ }
+
+ RETURN(0);
+}
+EXPORT_SYMBOL(osc_extent_blocking_cb);
+
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
int flags)
{
return 0;
}
-static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
- int intent, int rc)
+static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
+ struct obd_info *oinfo, int intent, int rc)
{
ENTRY;
oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
}
+ if (!rc)
+ cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
+
/* Call the update callback. */
rc = oinfo->oi_cb_up(oinfo, rc);
RETURN(rc);
aa->oa_oi->oi_lockh, rc);
/* Complete osc stuff. */
- rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
+ rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
/* Release the lock for async request. */
if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
RETURN(rc);
}
- rc = osc_enqueue_fini(req, oinfo, intent, rc);
+ rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
if (intent)
ptlrpc_req_finished(req);
ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
OST_MAXREQSIZE,
ptlrpc_add_rqs_to_pool);
+ cli->cl_cache = cache_create(obd);
+ if (!cli->cl_cache) {
+ osc_cleanup(obd);
+ rc = -ENOMEM;
+ }
}
RETURN(rc);
/* free memory of osc quota cache */
lquota_cleanup(quota_interface, obd);
+ cache_destroy(obd->u.cli.cl_cache);
rc = client_obd_cleanup(obd);
ptlrpcd_decref();
RETURN(rc);
}
+/* Register @func to be called when a cached page is removed from this
+ * export's lock-page cache, with @pin_cb used to pin extents while the
+ * removal runs. Returns the result of cache_add_extent_removal_cb(). */
+static int osc_register_page_removal_cb(struct obd_export *exp,
+ obd_page_removal_cb_t func,
+ obd_pin_extent_cb pin_cb)
+{
+ return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func,
+ pin_cb);
+}
+
+/* Remove a previously registered page-removal callback @func from this
+ * export's cache. Returns the result of cache_del_extent_removal_cb(). */
+static int osc_unregister_page_removal_cb(struct obd_export *exp,
+ obd_page_removal_cb_t func)
+{
+ return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func);
+}
+
+/* Install @cb as the upper-layer extent lock cancel callback, invoked
+ * from osc_extent_blocking_cb() on LDLM_CB_CANCELING. Only a single
+ * callback slot exists per client obd, hence the LASSERT that it is
+ * currently empty. Always returns 0. */
+static int osc_register_lock_cancel_cb(struct obd_export *exp,
+ obd_lock_cancel_cb cb)
+{
+ LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL);
+
+ exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb;
+ return 0;
+}
+
+/* Clear the upper-layer extent lock cancel callback, but only if @cb is
+ * the callback that is actually registered; otherwise complain and fail
+ * with -EINVAL so a mismatched register/unregister pair is caught. */
+static int osc_unregister_lock_cancel_cb(struct obd_export *exp,
+ obd_lock_cancel_cb cb)
+{
+ if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) {
+ CERROR("Unregistering cancel cb %p, while only %p was "
+ "registered\n", cb,
+ exp->exp_obd->u.cli.cl_ext_lock_cancel_cb);
+ /* plain return, not RETURN(): this function never calls
+ * ENTRY, so RETURN() would unbalance the debug trace and be
+ * inconsistent with the "return 0" below */
+ return -EINVAL;
+ }
+
+ exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL;
+ return 0;
+}
+
static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
{
struct lustre_cfg *lcfg = buf;
.o_llog_init = osc_llog_init,
.o_llog_finish = osc_llog_finish,
.o_process_config = osc_process_config,
+ .o_register_page_removal_cb = osc_register_page_removal_cb,
+ .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
+ .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
+ .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
};
int __init osc_init(void)
{
cat $f && error "cat succeeded, expect -EIO"
lctl set_param fail_loc=0
}
-run_test 62 "verify obd_match failure doesn't LBUG (should -EIO)"
+# This test is now irrelevant (as of bug 10718 inclusion), we no longer
+# match every page all of the time.
+#run_test 62 "verify obd_match failure doesn't LBUG (should -EIO)"
# bug 2319 - oig_wait() interrupted causes crash because of invalid waitq.
test_63a() { # was test_63
}
run_test 79 "df report consistency check ======================="
+test_80() { # bug 10718
+ # Write 1MB at a ~1TB offset (sparse file), then time how long it
+ # takes to cancel the client's OSC locks. Before bug 10718 eviction
+ # walked every possible page under the lock, so high-offset files
+ # took minutes; with pages linked to their lock it must be ~instant.
+ dd if=/dev/zero of=$DIR/$tfile bs=1M count=1 seek=1M
+ sync; sleep 1; sync
+ BEFORE=`date +%s`
+ # NOTE(review): other callers pass lowercase "osc" to
+ # cancel_lru_locks - confirm "OSC" matches the namespace pattern
+ # on this branch.
+ cancel_lru_locks OSC
+ AFTER=`date +%s`
+ DIFF=$((AFTER-BEFORE))
+ # Allow at most 1 second of wall clock for the whole cancellation.
+ if [ $DIFF -gt 1 ] ; then
+ error "elapsed for 1M@1T = $DIFF"
+ fi
+ true
+}
+run_test 80 "Page eviction is equally fast at high offsets too ===="
+
# on the LLNL clusters, runas will still pick up root's $TMP settings,
# which will not be writable for the runas user, and then you get a CVS
# error message with a corrupt path string (CVS bug) and panic.