don't allow someone with root access on a client node to
manipulate files owned by root on a server node.
+Severity : normal
+Bugzilla : 10718
+Description: Slow truncate/writes to huge files at high offsets.
+Details : Directly associate cached pages with the locks that protect
+ those pages; this allows us to quickly find which pages to write out
+ and remove once a lock cancellation callback is received.
+
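In sketch form, the association this change introduces (a simplified
illustration, not patch code; the patch itself uses l_extents_list on
struct ldlm_lock and oap_page_list on struct osc_async_page, and
remove_or_write_page() below is a hypothetical stand-in for the real
writeback/removal path):

        /* Each granted lock tracks the cached pages it covers. */
        struct ldlm_lock {
                spinlock_t       l_extents_list_lock;
                struct list_head l_extents_list; /* linked osc_async_pages */
                /* ... */
        };

        /* On cancellation, visit only the pages under this lock instead of
         * probing every page index in the lock's (possibly huge) range.
         * (The real code pins each page and drops the spinlock before
         * writing/removing it; see cache_remove_extents_from_lock().) */
        static void cancel_lock_pages(struct ldlm_lock *lock, int discard)
        {
                struct osc_async_page *oap, *tmp;

                spin_lock(&lock->l_extents_list_lock);
                list_for_each_entry_safe(oap, tmp, &lock->l_extents_list,
                                         oap_page_list)
                        remove_or_write_page(oap, discard);
                spin_unlock(&lock->l_extents_list_lock);
        }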
--------------------------------------------------------------------------------
2007-10-26 Cluster File Systems, Inc. <info@clusterfs.com>
lustre_lite.h lustre_log.h lustre_mds.h lustre_net.h \
lustre_param.h lustre_quota.h lustre_ucache.h lvfs.h \
obd_cache.h obd_class.h obd_echo.h obd.h obd_lov.h \
- obd_ost.h obd_support.h lustre_ver.h
+ obd_ost.h obd_support.h lustre_cache.h lustre_ver.h
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ */
+
+#ifndef LUSTRE_CACHE_H
+#define LUSTRE_CACHE_H
+#include <obd.h>
+#include <lustre/lustre_idl.h>
+#include <lustre_dlm.h>
+
+struct lustre_cache;
+struct osc_async_page;
+struct page_removal_cb_element {
+ struct list_head prce_list;
+ obd_page_removal_cb_t prce_callback;
+};
+
+typedef int (*cache_iterate_extents_cb_t)(struct lustre_cache *,
+ struct lustre_handle *,
+ struct osc_async_page *,
+ void *);
+typedef int (*cache_iterate_locks_cb_t)(struct lustre_cache *,
+ struct ldlm_res_id *,
+ struct lustre_handle *, void *);
+
+struct lustre_cache {
+ struct list_head lc_locks_list;
+ spinlock_t lc_locks_list_lock;
+ struct list_head lc_page_removal_callback_list;
+ struct obd_device *lc_obd;
+ obd_pin_extent_cb lc_pin_extent_cb;
+};
+
+int cache_add_lock(struct lustre_cache *cache, struct lustre_handle *lockh);
+int cache_add_extent(struct lustre_cache *cache, struct ldlm_res_id *res,
+ struct osc_async_page *extent,
+ struct lustre_handle *lockh);
+void cache_remove_extent(struct lustre_cache *, struct osc_async_page *);
+int cache_add_extent_removal_cb(struct lustre_cache *cache,
+ obd_page_removal_cb_t func_cb,
+ obd_pin_extent_cb pin_cb);
+int cache_del_extent_removal_cb(struct lustre_cache *cache,
+ obd_page_removal_cb_t func_cb);
+int cache_iterate_extents(struct lustre_cache *cache, struct lustre_handle *lockh,
+ cache_iterate_extents_cb_t cb_func, void *data);
+int cache_remove_lock(struct lustre_cache *cache, struct lustre_handle *lockh);
+int cache_iterate_locks(struct lustre_cache *cache, struct ldlm_res_id *res,
+ cache_iterate_locks_cb_t cb_fun, void *data);
+struct lustre_cache *cache_create(struct obd_device *obd);
+int cache_destroy(struct lustre_cache *cache);
+
+
+#endif /* LUSTRE_CACHE_H */
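A rough usage sketch of this interface from the client side (hypothetical
caller; cli, res_id, oap and lockh stand for a client_obd, a resource id,
an async page and a lock handle already at hand; the real call sites are
in osc_request.c further down in this patch):

        /* Once a lock is granted, remember it in the cache. */
        cache_add_lock(cli->cl_cache, lockh);

        /* When a page enters the client cache, attach it to its covering
         * lock; with a NULL lockh the cache matches a lock by extent. */
        rc = cache_add_extent(cli->cl_cache, &res_id, oap, lockh);

        /* When the lock is cancelled, drop it and every page under it. */
        cache_remove_lock(cli->cl_cache, lockh);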
void *l_lvb_data; /* an LVB received during */
void *l_lvb_swabber; /* an enqueue */
void *l_ast_data;
+ spinlock_t l_extents_list_lock;
+ struct list_head l_extents_list;
+
+ struct list_head l_cache_locks_list;
/* Server-side-only members */
struct lustre_quota_ctxt obt_qctxt;
};
+typedef void (*obd_pin_extent_cb)(void *data);
+typedef int (*obd_page_removal_cb_t)(void *data, int discard);
+typedef int (*obd_lock_cancel_cb)(struct ldlm_lock *,struct ldlm_lock_desc *,
+ void *, int);
+
#define FILTER_GROUP_LLOG 1
#define FILTER_GROUP_ECHO 2
struct mdc_rpc_lock;
struct obd_import;
+struct lustre_cache;
struct client_obd {
struct semaphore cl_sem;
struct obd_uuid cl_target_uuid;
/* used by quotacheck */
int cl_qchk_stat; /* quotacheck stat of the peer */
-
atomic_t cl_resends; /* resend count */
+ /* Cache of triples - object, lock, extent */
+ struct lustre_cache *cl_cache;
+ obd_lock_cancel_cb cl_ext_lock_cancel_cb;
};
#define obd2cli_tgt(obd) ((char *)(obd)->u.cli.cl_target_uuid.uuid)
__u32 lov_offset_idx; /* aliasing for start_idx */
int lov_start_count;/* reseed counter */
int lov_connects;
+ obd_page_removal_cb_t lov_page_removal_cb;
+ obd_pin_extent_cb lov_page_pin_cb;
+ obd_lock_cancel_cb lov_lock_cancel_cb;
};
struct niobuf_local {
struct lov_oinfo *loi,
cfs_page_t *page, obd_off offset,
struct obd_async_page_ops *ops, void *data,
- void **res);
+ void **res, int nocache,
+ struct lustre_handle *lockh);
int (*o_queue_async_io)(struct obd_export *exp,
struct lov_stripe_md *lsm,
struct lov_oinfo *loi, void *cookie,
int (*o_quotactl)(struct obd_export *, struct obd_quotactl *);
int (*o_ping)(struct obd_export *exp);
+
+ int (*o_register_page_removal_cb)(struct obd_export *exp,
+ obd_page_removal_cb_t cb,
+ obd_pin_extent_cb pin_cb);
+ int (*o_unregister_page_removal_cb)(struct obd_export *exp,
+ obd_page_removal_cb_t cb);
+ int (*o_register_lock_cancel_cb)(struct obd_export *exp,
+ obd_lock_cancel_cb cb);
+ int (*o_unregister_lock_cancel_cb)(struct obd_export *exp,
+ obd_lock_cancel_cb cb);
+
/*
* NOTE: If adding ops, add another LPROCFS_OBD_OP_INIT() line
* to lprocfs_alloc_obd_stats() in obdclass/lprocfs_status.c.
struct lov_oinfo *loi,
cfs_page_t *page, obd_off offset,
struct obd_async_page_ops *ops,
- void *data, void **res)
+ void *data, void **res, int nocache,
+ struct lustre_handle *lockh)
{
int ret;
ENTRY;
EXP_COUNTER_INCREMENT(exp, prep_async_page);
ret = OBP(exp->exp_obd, prep_async_page)(exp, lsm, loi, page, offset,
- ops, data, res);
+ ops, data, res, nocache,
+ lockh);
RETURN(ret);
}
RETURN(0);
}
+static inline int obd_register_page_removal_cb(struct obd_export *exp,
+ obd_page_removal_cb_t cb,
+ obd_pin_extent_cb pin_cb)
+{
+ int rc;
+ ENTRY;
+
+ OBD_CHECK_OP(exp->exp_obd, register_page_removal_cb, 0);
+ OBD_COUNTER_INCREMENT(exp->exp_obd, register_page_removal_cb);
+
+ rc = OBP(exp->exp_obd, register_page_removal_cb)(exp, cb, pin_cb);
+ RETURN(rc);
+}
+
+static inline int obd_unregister_page_removal_cb(struct obd_export *exp,
+ obd_page_removal_cb_t cb)
+{
+ int rc;
+ ENTRY;
+
+ OBD_CHECK_OP(exp->exp_obd, unregister_page_removal_cb, 0);
+ OBD_COUNTER_INCREMENT(exp->exp_obd, unregister_page_removal_cb);
+
+ rc = OBP(exp->exp_obd, unregister_page_removal_cb)(exp, cb);
+ RETURN(rc);
+}
+
+static inline int obd_register_lock_cancel_cb(struct obd_export *exp,
+ obd_lock_cancel_cb cb)
+{
+ int rc;
+ ENTRY;
+
+ OBD_CHECK_OP(exp->exp_obd, register_lock_cancel_cb, 0);
+ OBD_COUNTER_INCREMENT(exp->exp_obd, register_lock_cancel_cb);
+
+ rc = OBP(exp->exp_obd, register_lock_cancel_cb)(exp, cb);
+ RETURN(rc);
+}
+
+static inline int obd_unregister_lock_cancel_cb(struct obd_export *exp,
+ obd_lock_cancel_cb cb)
+{
+ int rc;
+ ENTRY;
+
+ OBD_CHECK_OP(exp->exp_obd, unregister_lock_cancel_cb, 0);
+ OBD_COUNTER_INCREMENT(exp->exp_obd, unregister_lock_cancel_cb);
+
+ rc = OBP(exp->exp_obd, unregister_lock_cancel_cb)(exp, cb);
+ RETURN(rc);
+}
+
/* OBD Metadata Support */
extern int obd_init_caches(void);
struct ldlm_enqueue_info*oa_ei;
};
+int osc_extent_blocking_cb(struct ldlm_lock *lock,
+ struct ldlm_lock_desc *new, void *data,
+ int flag);
+
#endif
#define OBD_FAIL_LDLM_CANCEL_RACE 0x310
#define OBD_FAIL_LDLM_CANCEL_EVICT_RACE 0x311
#define OBD_FAIL_LDLM_PAUSE_CANCEL 0x312
+#define OBD_FAIL_LDLM_CLOSE_THREAD 0x313
#define OBD_FAIL_OSC 0x400
#define OBD_FAIL_OSC_BRW_READ_BULK 0x401
CFS_INIT_LIST_HEAD(&lock->l_handle.h_link);
class_handle_hash(&lock->l_handle, lock_handle_addref);
+ CFS_INIT_LIST_HEAD(&lock->l_extents_list);
+ spin_lock_init(&lock->l_extents_list_lock);
+ CFS_INIT_LIST_HEAD(&lock->l_cache_locks_list);
+
RETURN(lock);
}
rc = obd_prep_async_page(exp, lsm, NULL, page,
(obd_off)page->index << CFS_PAGE_SHIFT,
&llu_async_page_ops,
- llap, &llap->llap_cookie);
+ llap, &llap->llap_cookie,
+ 1 /* no cache in liblustre at all */,
+ NULL);
if (rc) {
LASSERT(rc < 0);
llap->llap_cookie = NULL;
if (!llap_cookie_size)
llap_cookie_size = obd_prep_async_page(llu_i2obdexp(inode),
NULL, NULL, NULL, 0,
- NULL, NULL, NULL);
+ NULL, NULL, NULL, 0,
+ NULL);
OBD_ALLOC(group, LLU_IO_GROUP_SIZE(maxpages));
if (!group)
RETURN(stripe);
}
-/* Flush the page cache for an extent as its canceled. When we're on an LOV,
- * we get a lock cancellation for each stripe, so we have to map the obd's
- * region back onto the stripes in the file that it held.
+/* Get an extra page reference to ensure the page is not going away */
+void ll_pin_extent_cb(void *data)
+{
+ struct page *page = data;
+
+ page_cache_get(page);
+
+ return;
+}
+/* Flush the page from the page cache for an extent as it is canceled.
+ * Page to remove is delivered as @data.
*
- * No one can dirty the extent until we've finished our work and they can
+ * No one can dirty the extent until we've finished our work and they cannot
* enqueue another lock. The DLM protects us from ll_file_read/write here,
* but other kernel actors could have pages locked.
*
+ * If @discard is set, there is no need to write the page if it is dirty.
+ *
* Called with the DLM lock held. */
-void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
- struct ldlm_lock *lock, __u32 stripe)
+int ll_page_removal_cb(void *data, int discard)
{
- ldlm_policy_data_t tmpex;
- unsigned long start, end, count, skip, i, j;
- struct page *page;
- int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
- struct lustre_handle lockh;
- struct address_space *mapping = inode->i_mapping;
+ int rc;
+ struct page *page = data;
+ struct address_space *mapping;
ENTRY;
- tmpex = lock->l_policy_data;
- CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
- inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
- i_size_read(inode));
-
- /* our locks are page granular thanks to osc_enqueue, we invalidate the
- * whole page. */
- if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
- ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
- LDLM_ERROR(lock, "lock not aligned on CFS_PAGE_SIZE %lu", CFS_PAGE_SIZE);
- LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
- LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);
-
- count = ~0;
- skip = 0;
- start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
- end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
- if (lsm->lsm_stripe_count > 1) {
- count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
- skip = (lsm->lsm_stripe_count - 1) * count;
- start += start/count * skip + stripe * count;
- if (end != ~0)
- end += end/count * skip + stripe * count;
- }
- if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
- end = ~0;
-
- i = i_size_read(inode) ? (i_size_read(inode) - 1) >> CFS_PAGE_SHIFT : 0;
- if (i < end)
- end = i;
-
- CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
- "count: %lu skip: %lu end: %lu%s\n", start, start % count,
- count, skip, end, discard ? " (DISCARDING)" : "");
-
- /* walk through the vmas on the inode and tear down mmaped pages that
- * intersect with the lock. this stops immediately if there are no
- * mmap()ed regions of the file. This is not efficient at all and
- * should be short lived. We'll associate mmap()ed pages with the lock
- * and will be able to find them directly */
- for (i = start; i <= end; i += (j + skip)) {
- j = min(count - (i % count), end - i + 1);
- LASSERT(j > 0);
- LASSERT(mapping);
- if (ll_teardown_mmaps(mapping,
- (__u64)i << CFS_PAGE_SHIFT,
- ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
- break;
- }
- /* this is the simplistic implementation of page eviction at
- * cancelation. It is careful to get races with other page
- * lockers handled correctly. fixes from bug 20 will make it
- * more efficient by associating locks with pages and with
- * batching writeback under the lock explicitly. */
- for (i = start, j = start % count; i <= end;
- j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
- if (j == count) {
- CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
- i += skip;
- j = 0;
- if (i > end)
- break;
- }
- LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
- LPU64" >= "LPU64" start %lu i %lu end %lu\n",
- tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
- start, i, end);
+ /* We already have a page reference from ll_pin_extent_cb */
+ lock_page(page);
- if (!mapping_has_pages(mapping)) {
- CDEBUG(D_INODE|D_PAGE, "nothing left\n");
- break;
- }
+ /* Already truncated by somebody */
+ if (!page->mapping)
+ GOTO(out, rc = 0);
- cond_resched();
+ mapping = page->mapping;
- page = find_get_page(mapping, i);
- if (page == NULL)
- continue;
- LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
- i, tmpex.l_extent.start);
- lock_page(page);
+ ll_teardown_mmaps(mapping,
+ (__u64)page->index << PAGE_CACHE_SHIFT,
+ ((__u64)page->index << PAGE_CACHE_SHIFT) |
+ ~PAGE_CACHE_MASK);
+ LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
- /* page->mapping to check with racing against teardown */
- if (!discard && clear_page_dirty_for_io(page)) {
- rc = ll_call_writepage(inode, page);
- /* either waiting for io to complete or reacquiring
- * the lock that the failed writepage released */
- lock_page(page);
- wait_on_page_writeback(page);
- if (rc != 0) {
- CERROR("writepage inode %lu(%p) of page %p "
- "failed: %d\n", inode->i_ino, inode,
- page, rc);
+ if (!discard && clear_page_dirty_for_io(page)) {
+ LASSERT(page->mapping);
+ rc = ll_call_writepage(page->mapping->host, page);
+ /* either waiting for io to complete or reacquiring
+ * the lock that the failed writepage released */
+ lock_page(page);
+ wait_on_page_writeback(page);
+ if (rc != 0) {
+ CERROR("writepage inode %lu(%p) of page %p "
+ "failed: %d\n", mapping->host->i_ino,
+ mapping->host, page, rc);
#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
- if (rc == -ENOSPC)
- set_bit(AS_ENOSPC, &mapping->flags);
- else
- set_bit(AS_EIO, &mapping->flags);
+ if (rc == -ENOSPC)
+ set_bit(AS_ENOSPC, &mapping->flags);
+ else
+ set_bit(AS_EIO, &mapping->flags);
#else
- mapping->gfp_mask |= AS_EIO_MASK;
+ mapping->gfp_mask |= AS_EIO_MASK;
#endif
- }
- }
-
- tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
- /* check to see if another DLM lock covers this page b=2765 */
- rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
- LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
- LDLM_FL_TEST_LOCK,
- &lock->l_resource->lr_name, LDLM_EXTENT,
- &tmpex, LCK_PR | LCK_PW, &lockh);
- if (rc2 == 0 && page->mapping != NULL) {
- struct ll_async_page *llap = llap_cast_private(page);
- // checking again to account for writeback's lock_page()
- LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
- if (llap)
- ll_ra_accounting(llap, mapping);
- ll_truncate_complete_page(page);
}
- unlock_page(page);
- page_cache_release(page);
- }
- LASSERTF(tmpex.l_extent.start <=
- (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
- lock->l_policy_data.l_extent.end + 1),
- "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
- tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
- start, i, end);
+ }
+ if (page->mapping != NULL) {
+ struct ll_async_page *llap = llap_cast_private(page);
+ // checking again to account for writeback's lock_page()
+ LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
+ if (llap)
+ ll_ra_accounting(llap, page->mapping);
+ ll_truncate_complete_page(page);
+ }
EXIT;
+out:
+ LASSERT(!PageWriteback(page));
+ unlock_page(page);
+ page_cache_release(page);
+
+ return 0;
}
-static int ll_extent_lock_callback(struct ldlm_lock *lock,
- struct ldlm_lock_desc *new, void *data,
- int flag)
+int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
+ void *data, int flag)
{
- struct lustre_handle lockh = { 0 };
- int rc;
+ struct inode *inode;
+ struct ll_inode_info *lli;
+ struct lov_stripe_md *lsm;
+ int stripe;
+ __u64 kms;
+
ENTRY;
if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
LBUG();
}
- switch (flag) {
- case LDLM_CB_BLOCKING:
- ldlm_lock2handle(lock, &lockh);
- rc = ldlm_cli_cancel(&lockh);
- if (rc != ELDLM_OK)
- CERROR("ldlm_cli_cancel failed: %d\n", rc);
- break;
- case LDLM_CB_CANCELING: {
- struct inode *inode;
- struct ll_inode_info *lli;
- struct lov_stripe_md *lsm;
- int stripe;
- __u64 kms;
-
- /* This lock wasn't granted, don't try to evict pages */
- if (lock->l_req_mode != lock->l_granted_mode)
- RETURN(0);
-
- inode = ll_inode_from_lock(lock);
- if (inode == NULL)
- RETURN(0);
- lli = ll_i2info(inode);
- if (lli == NULL)
- goto iput;
- if (lli->lli_smd == NULL)
- goto iput;
- lsm = lli->lli_smd;
-
- stripe = ll_lock_to_stripe_offset(inode, lock);
- if (stripe < 0)
- goto iput;
-
- ll_pgcache_remove_extent(inode, lsm, lock, stripe);
+ inode = ll_inode_from_lock(lock);
+ if (inode == NULL)
+ RETURN(0);
+ lli = ll_i2info(inode);
+ if (lli == NULL)
+ GOTO(iput, 0);
+ if (lli->lli_smd == NULL)
+ GOTO(iput, 0);
+ lsm = lli->lli_smd;
- lov_stripe_lock(lsm);
- lock_res_and_lock(lock);
- kms = ldlm_extent_shift_kms(lock,
- lsm->lsm_oinfo[stripe]->loi_kms);
+ stripe = ll_lock_to_stripe_offset(inode, lock);
+ if (stripe < 0)
+ GOTO(iput, 0);
- if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
- LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
- lsm->lsm_oinfo[stripe]->loi_kms, kms);
- lsm->lsm_oinfo[stripe]->loi_kms = kms;
- unlock_res_and_lock(lock);
- lov_stripe_unlock(lsm);
- ll_try_done_writing(inode);
- iput:
- iput(inode);
- break;
- }
- default:
- LBUG();
- }
+ lov_stripe_lock(lsm);
+ lock_res_and_lock(lock);
+ kms = ldlm_extent_shift_kms(lock,
+ lsm->lsm_oinfo[stripe]->loi_kms);
+
+ if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
+ LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
+ lsm->lsm_oinfo[stripe]->loi_kms, kms);
+ lsm->lsm_oinfo[stripe]->loi_kms = kms;
+ unlock_res_and_lock(lock);
+ lov_stripe_unlock(lsm);
+ ll_try_done_writing(inode);
+ EXIT;
+iput:
+ iput(inode);
- RETURN(0);
+ return 0;
}
#if 0
einfo.ei_type = LDLM_EXTENT;
einfo.ei_mode = LCK_PR;
- einfo.ei_cb_bl = ll_extent_lock_callback;
+ einfo.ei_cb_bl = osc_extent_blocking_cb;
einfo.ei_cb_cp = ldlm_completion_ast;
einfo.ei_cb_gl = ll_glimpse_callback;
einfo.ei_cbdata = NULL;
* acquired only if there were no conflicting locks. */
einfo.ei_type = LDLM_EXTENT;
einfo.ei_mode = LCK_PR;
- einfo.ei_cb_bl = ll_extent_lock_callback;
+ einfo.ei_cb_bl = osc_extent_blocking_cb;
einfo.ei_cb_cp = ldlm_completion_ast;
einfo.ei_cb_gl = ll_glimpse_callback;
einfo.ei_cbdata = inode;
einfo.ei_type = LDLM_EXTENT;
einfo.ei_mode = mode;
- einfo.ei_cb_bl = ll_extent_lock_callback;
+ einfo.ei_cb_bl = osc_extent_blocking_cb;
einfo.ei_cb_cp = ldlm_completion_ast;
einfo.ei_cb_gl = ll_glimpse_callback;
einfo.ei_cbdata = inode;
struct ll_close_queue *lcq;
pid_t pid;
+ OBD_FAIL_RETURN(OBD_FAIL_LDLM_CLOSE_THREAD, -EINTR);
OBD_ALLOC(lcq, sizeof(*lcq));
if (lcq == NULL)
return -ENOMEM;
llap_origin:3,
llap_ra_used:1,
llap_ignore_quota:1,
+ llap_nocache:1,
llap_lockless_io_page:1;
void *llap_cookie;
struct page *llap_page;
struct lookup_intent *ll_convert_intent(struct open_intent *oit,
int lookup_flags);
#endif
+void ll_pin_extent_cb(void *data);
+int ll_page_removal_cb(void *data, int discard);
+int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
+ void *data, int flag);
int lookup_it_finish(struct ptlrpc_request *request, int offset,
struct lookup_intent *it, void *data);
void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry);
#include <lprocfs_status.h>
#include <lustre_disk.h>
#include <lustre_param.h>
+#include <lustre_cache.h>
#include "llite_internal.h"
cfs_mem_cache_t *ll_file_data_slab;
obd->obd_upcall.onu_upcall = ll_ocd_update;
data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
-
err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid, data, NULL);
if (err == -EBUSY) {
LCONSOLE_ERROR_MSG(0x150, "An OST (osc %s) is performing "
sbi->ll_lco.lco_flags = data->ocd_connect_flags;
spin_unlock(&sbi->ll_lco.lco_lock);
- mdc_init_ea_size(sbi->ll_mdc_exp, sbi->ll_osc_exp);
+ err = obd_register_page_removal_cb(sbi->ll_osc_exp,
+ ll_page_removal_cb,
+ ll_pin_extent_cb);
+ if (err) {
+ CERROR("cannot register page removal callback: rc = %d\n",err);
+ GOTO(out_osc, err);
+ }
+ err = obd_register_lock_cancel_cb(sbi->ll_osc_exp,
+ ll_extent_lock_cancel_cb);
+ if (err) {
+ CERROR("cannot register lock cancel callback: rc = %d\n", err);
+ GOTO(out_page_rm_cb, err);
+ }
+
+ err = mdc_init_ea_size(sbi->ll_mdc_exp, sbi->ll_osc_exp);
+ if (err) {
+ CERROR("cannot set max EA and cookie sizes: rc = %d\n", err);
+ GOTO(out_lock_cn_cb, err);
+ }
err = obd_prep_async_page(sbi->ll_osc_exp, NULL, NULL, NULL,
- 0, NULL, NULL, NULL);
+ 0, NULL, NULL, NULL, 0, NULL);
if (err < 0) {
LCONSOLE_ERROR_MSG(0x151, "There are no OST's in this "
"filesystem. There must be at least one "
"active OST for a client to start.\n");
- GOTO(out_osc, err);
+ GOTO(out_lock_cn_cb, err);
}
if (!ll_async_page_slab) {
ll_async_page_slab_size,
0, 0);
if (!ll_async_page_slab)
- GOTO(out_osc, -ENOMEM);
+ GOTO(out_lock_cn_cb, -ENOMEM);
}
err = mdc_getstatus(sbi->ll_mdc_exp, &rootfid);
if (err) {
CERROR("cannot mds_connect: rc = %d\n", err);
- GOTO(out_osc, err);
+ GOTO(out_lock_cn_cb, err);
}
CDEBUG(D_SUPER, "rootfid "LPU64"\n", rootfid.id);
sbi->ll_rootino = rootfid.id;
0, &request);
if (err) {
CERROR("mdc_getattr failed for root: rc = %d\n", err);
- GOTO(out_osc, err);
+ GOTO(out_lock_cn_cb, err);
}
err = mdc_req2lustre_md(request, REPLY_REC_OFF, sbi->ll_osc_exp, &md);
if (err) {
CERROR("failed to understand root inode md: rc = %d\n",err);
ptlrpc_req_finished (request);
- GOTO(out_osc, err);
+ GOTO(out_lock_cn_cb, err);
}
LASSERT(sbi->ll_rootino != 0);
out_root:
if (root)
iput(root);
+out_lock_cn_cb:
+ obd_unregister_lock_cancel_cb(sbi->ll_osc_exp,
+ ll_extent_lock_cancel_cb);
+out_page_rm_cb:
+ obd_unregister_page_removal_cb(sbi->ll_osc_exp,
+ ll_page_removal_cb);
out_osc:
obd_disconnect(sbi->ll_osc_exp);
sbi->ll_osc_exp = NULL;
prune_deathrow(sbi, 0);
list_del(&sbi->ll_conn_chain);
+
+ obd_unregister_page_removal_cb(sbi->ll_osc_exp,
+ ll_page_removal_cb);
+ obd_unregister_lock_cancel_cb(sbi->ll_osc_exp,ll_extent_lock_cancel_cb);
+
obd_disconnect(sbi->ll_osc_exp);
sbi->ll_osc_exp = NULL;
return count;
}
-static struct ll_async_page *llap_from_page(struct page *page, unsigned origin)
+static struct ll_async_page *llap_from_page_with_lockh(struct page *page,
+ unsigned origin,
+ struct lustre_handle *lockh)
{
struct ll_async_page *llap;
struct obd_export *exp;
llap->llap_magic = LLAP_MAGIC;
llap->llap_cookie = (void *)llap + size_round(sizeof(*llap));
+ /* XXX: for bug 11270 - check for lockless origin here! */
+ if (origin == LLAP_ORIGIN_LOCKLESS_IO)
+ llap->llap_nocache = 1;
+
rc = obd_prep_async_page(exp, ll_i2info(inode)->lli_smd, NULL, page,
(obd_off)page->index << CFS_PAGE_SHIFT,
- &ll_async_page_ops, llap, &llap->llap_cookie);
+ &ll_async_page_ops, llap, &llap->llap_cookie,
+ llap->llap_nocache, lockh);
if (rc) {
OBD_SLAB_FREE(llap, ll_async_page_slab,
ll_async_page_slab_size);
RETURN(llap);
}
+static inline struct ll_async_page *llap_from_page(struct page *page,
+ unsigned origin)
+{
+ return llap_from_page_with_lockh(page, origin, NULL);
+}
+
static int queue_or_sync_write(struct obd_export *exp, struct inode *inode,
struct ll_async_page *llap,
unsigned to, obd_flag async_flags)
int ll_commit_write(struct file *file, struct page *page, unsigned from,
unsigned to)
{
+ struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
struct inode *inode = page->mapping->host;
struct ll_inode_info *lli = ll_i2info(inode);
struct lov_stripe_md *lsm = lli->lli_smd;
struct obd_export *exp;
struct ll_async_page *llap;
loff_t size;
+ struct lustre_handle *lockh = NULL;
int rc = 0;
ENTRY;
CDEBUG(D_INODE, "inode %p is writing page %p from %d to %d at %lu\n",
inode, page, from, to, page->index);
- llap = llap_from_page(page, LLAP_ORIGIN_COMMIT_WRITE);
+ if (fd->fd_flags & LL_FILE_GROUP_LOCKED)
+ lockh = &fd->fd_cwlockh;
+
+ llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_COMMIT_WRITE, lockh);
if (IS_ERR(llap))
RETURN(PTR_ERR(llap));
* here. */
void ll_removepage(struct page *page)
{
+ struct ll_async_page *llap = llap_cast_private(page);
ENTRY;
LASSERT(!in_interrupt());
return;
}
- LASSERT(!llap_cast_private(page)->llap_lockless_io_page);
+ LASSERT(!llap->llap_lockless_io_page);
+ LASSERT(!llap->llap_nocache);
+
LL_CDEBUG_PAGE(D_PAGE, page, "being evicted\n");
__ll_put_llap(page);
EXIT;
}
-static int ll_page_matches(struct page *page, int fd_flags)
-{
- struct lustre_handle match_lockh = {0};
- struct inode *inode = page->mapping->host;
- ldlm_policy_data_t page_extent;
- int flags, matches;
- ENTRY;
-
- if (unlikely(fd_flags & LL_FILE_GROUP_LOCKED))
- RETURN(1);
-
- page_extent.l_extent.start = (__u64)page->index << CFS_PAGE_SHIFT;
- page_extent.l_extent.end =
- page_extent.l_extent.start + CFS_PAGE_SIZE - 1;
- flags = LDLM_FL_TEST_LOCK | LDLM_FL_BLOCK_GRANTED;
- if (!(fd_flags & LL_FILE_READAHEAD))
- flags |= LDLM_FL_CBPENDING;
- matches = obd_match(ll_i2sbi(inode)->ll_osc_exp,
- ll_i2info(inode)->lli_smd, LDLM_EXTENT,
- &page_extent, LCK_PR | LCK_PW, &flags, inode,
- &match_lockh);
- RETURN(matches);
-}
-
static int ll_issue_page_read(struct obd_export *exp,
struct ll_async_page *llap,
struct obd_io_group *oig, int defer)
/* we do this first so that we can see the page in the /proc
* accounting */
llap = llap_from_page(page, LLAP_ORIGIN_READAHEAD);
- if (IS_ERR(llap) || llap->llap_defer_uptodate)
+ if (IS_ERR(llap) || llap->llap_defer_uptodate) {
+ if (PTR_ERR(llap) == -ENOLCK) {
+ ll_ra_stats_inc(mapping, RA_STAT_FAILED_MATCH);
+ match_failed = 1;
+ CDEBUG(D_READA | D_PAGE,
+ "Adding page to cache failed index "
+ "%lu\n", i);
+ }
goto next_page;
+ }
/* skip completed pages */
if (Page_Uptodate(page))
goto next_page;
/* bail when we hit the end of the lock. */
- if ((rc = ll_page_matches(page, flags|LL_FILE_READAHEAD)) <= 0){
- LL_CDEBUG_PAGE(D_READA | D_PAGE, page,
- "lock match failed: rc %d\n", rc);
- ll_ra_stats_inc(mapping, RA_STAT_FAILED_MATCH);
- match_failed = 1;
- goto next_page;
- }
-
rc = ll_issue_page_read(exp, llap, oig, 1);
if (rc == 0) {
reserved--;
if (IS_ERR(llap))
GOTO(out, rc = PTR_ERR(llap));
+ LASSERT(!llap->llap_nocache);
LASSERT(!PageWriteback(page));
set_page_writeback(page);
struct obd_export *exp;
struct ll_async_page *llap;
struct obd_io_group *oig = NULL;
+ struct lustre_handle *lockh = NULL;
int rc;
ENTRY;
if (exp == NULL)
GOTO(out, rc = -EINVAL);
- llap = llap_from_page(page, LLAP_ORIGIN_READPAGE);
- if (IS_ERR(llap))
+ if (fd->fd_flags & LL_FILE_GROUP_LOCKED)
+ lockh = &fd->fd_cwlockh;
+
+ llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_READPAGE, lockh);
+ if (IS_ERR(llap)) {
+ if (PTR_ERR(llap) == -ENOLCK) {
+ CWARN("ino %lu page %lu (%llu) not covered by "
+ "a lock (mmap?). check debug logs.\n",
+ inode->i_ino, page->index,
+ (long long)page->index << PAGE_CACHE_SHIFT);
+ }
GOTO(out, rc = PTR_ERR(llap));
+ }
if (ll_i2sbi(inode)->ll_ra_info.ra_max_pages)
ras_update(ll_i2sbi(inode), inode, &fd->fd_ras, page->index,
GOTO(out_oig, rc = 0);
}
- if (likely((fd->fd_flags & LL_FILE_IGNORE_LOCK) == 0)) {
- rc = ll_page_matches(page, fd->fd_flags);
- if (rc < 0) {
- LL_CDEBUG_PAGE(D_ERROR, page,
- "lock match failed: rc %d\n", rc);
- GOTO(out, rc);
- }
-
- if (rc == 0) {
- CWARN("ino %lu page %lu (%llu) not covered by "
- "a lock (mmap?). check debug logs.\n",
- inode->i_ino, page->index,
- (long long)page->index << CFS_PAGE_SHIFT);
- }
- }
-
rc = ll_issue_page_read(exp, llap, oig, 0);
if (rc)
GOTO(out, rc);
#include <obd_ost.h>
#include <lprocfs_status.h>
#include <lustre_param.h>
+#include <lustre_cache.h>
#include "lov_internal.h"
mutex_up(&lov->lov_lock);
}
+static int lov_register_page_removal_cb(struct obd_export *exp,
+ obd_page_removal_cb_t func,
+ obd_pin_extent_cb pin_cb)
+{
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+ int i, rc = 0;
+
+ if (lov->lov_page_removal_cb && lov->lov_page_removal_cb != func)
+ return -EBUSY;
+
+ if (lov->lov_page_pin_cb && lov->lov_page_pin_cb != pin_cb)
+ return -EBUSY;
+
+ for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+ if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
+ continue;
+ rc |= obd_register_page_removal_cb(lov->lov_tgts[i]->ltd_exp,
+ func, pin_cb);
+ }
+
+ lov->lov_page_removal_cb = func;
+ lov->lov_page_pin_cb = pin_cb;
+
+ return rc;
+}
+
+static int lov_unregister_page_removal_cb(struct obd_export *exp,
+ obd_page_removal_cb_t func)
+{
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+ int i, rc = 0;
+
+ if (lov->lov_page_removal_cb && lov->lov_page_removal_cb != func)
+ return -EINVAL;
+
+ lov->lov_page_removal_cb = NULL;
+ lov->lov_page_pin_cb = NULL;
+
+ for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+ if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
+ continue;
+ rc |= obd_unregister_page_removal_cb(lov->lov_tgts[i]->ltd_exp,
+ func);
+ }
+
+ return rc;
+}
+
+static int lov_register_lock_cancel_cb(struct obd_export *exp,
+ obd_lock_cancel_cb func)
+{
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+ int i, rc = 0;
+
+ if (lov->lov_lock_cancel_cb && lov->lov_lock_cancel_cb != func)
+ return -EBUSY;
+
+ for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+ if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
+ continue;
+ rc |= obd_register_lock_cancel_cb(lov->lov_tgts[i]->ltd_exp,
+ func);
+ }
+
+ lov->lov_lock_cancel_cb = func;
+
+ return rc;
+}
+
+static int lov_unregister_lock_cancel_cb(struct obd_export *exp,
+ obd_lock_cancel_cb func)
+{
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+ int i, rc = 0;
+
+ if (lov->lov_lock_cancel_cb && lov->lov_lock_cancel_cb != func)
+ return -EINVAL;
+
+ for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+ if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
+ continue;
+ rc |= obd_unregister_lock_cancel_cb(lov->lov_tgts[i]->ltd_exp,
+ func);
+ }
+ lov->lov_lock_cancel_cb = NULL;
+ return rc;
+}
+
#define MAX_STRING_SIZE 128
static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
struct obd_connect_data *data)
RETURN(-ENODEV);
}
+ rc = obd_register_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
+ lov->lov_page_removal_cb,
+ lov->lov_page_pin_cb);
+ if (rc) {
+ obd_disconnect(lov->lov_tgts[index]->ltd_exp);
+ lov->lov_tgts[index]->ltd_exp = NULL;
+ RETURN(rc);
+ }
+
+ rc = obd_register_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
+ lov->lov_lock_cancel_cb);
+ if (rc) {
+ obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
+ lov->lov_page_removal_cb);
+ obd_disconnect(lov->lov_tgts[index]->ltd_exp);
+ lov->lov_tgts[index]->ltd_exp = NULL;
+ RETURN(rc);
+ }
+
rc = obd_register_observer(tgt_obd, obd);
if (rc) {
CERROR("Target %s register_observer error %d\n",
obd_uuid2str(&tgt_uuid), rc);
+ obd_unregister_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
+ lov->lov_lock_cancel_cb);
+ obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
+ lov->lov_page_removal_cb);
obd_disconnect(lov->lov_tgts[index]->ltd_exp);
lov->lov_tgts[index]->ltd_exp = NULL;
RETURN(rc);
CDEBUG(D_CONFIG, "%s: disconnecting target %s\n",
obd->obd_name, osc_obd->obd_name);
+ obd_unregister_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
+ lov->lov_lock_cancel_cb);
+ obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
+ lov->lov_page_removal_cb);
if (lov->lov_tgts[index]->ltd_active) {
lov->lov_tgts[index]->ltd_active = 0;
lov->desc.ld_active_tgt_count--;
int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
struct lov_oinfo *loi, cfs_page_t *page,
obd_off offset, struct obd_async_page_ops *ops,
- void *data, void **res)
+ void *data, void **res, int nocache,
+ struct lustre_handle *lockh)
{
struct lov_obd *lov = &exp->exp_obd->u.lov;
struct lov_async_page *lap;
+ struct lov_lock_handles *lov_lockh = NULL;
int rc = 0;
ENTRY;
}
rc = size_round(sizeof(*lap)) +
obd_prep_async_page(lov->lov_tgts[i]->ltd_exp, NULL,
- NULL, NULL, 0, NULL, NULL, NULL);
+ NULL, NULL, 0, NULL, NULL, NULL, 0,
+ NULL);
RETURN(rc);
}
ASSERT_LSM_MAGIC(lsm);
lap->lap_sub_cookie = (void *)lap + size_round(sizeof(*lap));
+ if (lockh) {
+ lov_lockh = lov_handle2llh(lockh);
+ if (lov_lockh) {
+ lockh = lov_lockh->llh_handles + lap->lap_stripe;
+ }
+ }
+
rc = obd_prep_async_page(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
lsm, loi, page, lap->lap_sub_offset,
&lov_async_page_ops, lap,
- &lap->lap_sub_cookie);
+ &lap->lap_sub_cookie, nocache, lockh);
+ if (lov_lockh)
+ lov_llh_put(lov_lockh);
if (rc)
RETURN(rc);
CDEBUG(D_CACHE, "lap %p page %p cookie %p off "LPU64"\n", lap, page,
.o_llog_init = lov_llog_init,
.o_llog_finish = lov_llog_finish,
.o_notify = lov_notify,
+ .o_register_page_removal_cb = lov_register_page_removal_cb,
+ .o_unregister_page_removal_cb = lov_unregister_page_removal_cb,
+ .o_register_lock_cancel_cb = lov_register_lock_cancel_cb,
+ .o_unregister_lock_cancel_cb = lov_unregister_lock_cancel_cb,
};
static quota_interface_t *quota_interface;
LPROCFS_OBD_OP_INIT(num_private_stats, stats, quotacheck);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, quotactl);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, ping);
+ LPROCFS_OBD_OP_INIT(num_private_stats, stats, register_page_removal_cb);
+ LPROCFS_OBD_OP_INIT(num_private_stats,stats,unregister_page_removal_cb);
+ LPROCFS_OBD_OP_INIT(num_private_stats, stats, register_lock_cancel_cb);
+ LPROCFS_OBD_OP_INIT(num_private_stats, stats,unregister_lock_cancel_cb);
}
int lprocfs_alloc_obd_stats(struct obd_device *obd, unsigned num_private_stats)
rc = obd_prep_async_page(exp, lsm, NULL, eap->eap_page,
eap->eap_off, &ec_async_page_ops,
- eap, &eap->eap_cookie);
+ eap, &eap->eap_cookie, 1, NULL);
if (rc) {
spin_lock(&eas.eas_lock);
eas.eas_rc = rc;
MODULES := osc
-osc-objs := osc_request.o lproc_osc.o osc_create.o
+osc-objs := osc_request.o lproc_osc.o osc_create.o cache.o
@INCLUDE_RULES@
if LIBLUSTRE
noinst_LIBRARIES = libosc.a
-libosc_a_SOURCES = osc_request.c osc_create.c osc_internal.h
+libosc_a_SOURCES = osc_request.c osc_create.c osc_internal.h cache.c
libosc_a_CPPFLAGS = $(LLCPPFLAGS)
libosc_a_CFLAGS = $(LLCFLAGS)
endif
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2001-2003 Cluster File Systems, Inc.
+ * Author Oleg Drokin <green@clusterfs.com>
+ *
+ * This file is part of the Lustre file system, http://www.lustre.org
+ * Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ * You may have signed or agreed to another license before downloading
+ * this software. If so, you are bound by the terms and conditions
+ * of that agreement, and the following does not apply to you. See the
+ * LICENSE file included with this distribution for more information.
+ *
+ * If you did not agree to a different license, then this copy of Lustre
+ * is open source software; you can redistribute it and/or modify it
+ * under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In either case, Lustre is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * license text for more details.
+ *
+ * Cache of triples - object, lock, extent
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_OSC
+
+#ifdef __KERNEL__
+# include <linux/version.h>
+# include <linux/module.h>
+# include <linux/list.h>
+#else /* __KERNEL__ */
+# include <liblustre.h>
+#endif
+
+#include <lustre_dlm.h>
+#include <lustre_cache.h>
+#include <obd.h>
+#include <lustre_debug.h>
+
+#include "osc_internal.h"
+
+/* Adding @lock to the @cache */
+int cache_add_lock(struct lustre_cache *cache, struct lustre_handle *lockh)
+{
+ struct ldlm_lock *lock = ldlm_handle2lock(lockh);
+
+ if (!lock) // Lock disappeared under us.
+ return 0;
+
+ spin_lock(&cache->lc_locks_list_lock);
+ list_add_tail(&lock->l_cache_locks_list, &cache->lc_locks_list);
+ spin_unlock(&cache->lc_locks_list_lock);
+
+ LDLM_LOCK_PUT(lock);
+
+ return 0;
+}
+
+/* Tries to add @extent to the lock represented by @lockh if non-NULL,
+ otherwise just tries to match some suitable lock by the resource and data
+ contained in @extent */
+/* Should be called with oap->lock held (except on initial addition, see
+ the comment in osc_request.c) */
+int cache_add_extent(struct lustre_cache *cache, struct ldlm_res_id *res,
+ struct osc_async_page *extent, struct lustre_handle *lockh)
+{
+ struct lustre_handle tmplockh;
+ ldlm_policy_data_t tmpex;
+ struct ldlm_lock *lock = NULL;
+ ENTRY;
+
+ /* Don't add anything a second time */
+ if (!list_empty(&extent->oap_page_list)) {
+ LBUG();
+ RETURN(0);
+ }
+
+ if (lockh && lustre_handle_is_used(lockh)) {
+ lock = ldlm_handle2lock(lockh);
+ if (!lock)
+ RETURN(-ENOLCK);
+
+ LASSERTF(lock->l_policy_data.l_extent.start <=
+ extent->oap_obj_off &&
+ extent->oap_obj_off + CFS_PAGE_SIZE - 1 <=
+ lock->l_policy_data.l_extent.end,
+ "Got wrong lock [" LPU64 "," LPU64 "] for page with "
+ "offset " LPU64 "\n",
+ lock->l_policy_data.l_extent.start,
+ lock->l_policy_data.l_extent.end, extent->oap_obj_off);
+ } else {
+ int mode;
+ /* Real extent width calculation will go here once we have
+ * real extents
+ */
+ tmpex.l_extent.start = extent->oap_obj_off;
+ tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
+
+ /* XXX find lock from extent or something like that */
+ /* The lock mode does not matter. If this is a dirty page, then
+ * there can be only one PW lock. If the page is clean,
+ * any PR lock is good
+ */
+ mode = ldlm_lock_match(cache->lc_obd->obd_namespace,
+ LDLM_FL_BLOCK_GRANTED |
+ LDLM_FL_CBPENDING, res, LDLM_EXTENT,
+ &tmpex, LCK_PW | LCK_PR, &tmplockh);
+
+ if (mode <= 0) {
+ CDEBUG(D_CACHE, "No lock to attach " LPU64 "->" LPU64
+ " extent to!\n", tmpex.l_extent.start,
+ tmpex.l_extent.end);
+ RETURN((mode < 0) ? mode : -ENOLCK);
+ }
+
+ lock = ldlm_handle2lock(&tmplockh);
+ if (!lock) { // Race - lock disappeared under us (eviction?)
+ CDEBUG(D_CACHE, "Newly matched lock just disappeared "
+ "under us\n");
+ RETURN(-ENOLCK);
+ }
+ ldlm_lock_decref(&tmplockh, mode);
+ }
+
+ spin_lock(&lock->l_extents_list_lock);
+ list_add_tail(&extent->oap_page_list, &lock->l_extents_list);
+ spin_unlock(&lock->l_extents_list_lock);
+ extent->oap_ldlm_lock = lock;
+ LDLM_LOCK_PUT(lock);
+
+ RETURN(0);
+}
+
+static int cache_extent_removal_event(struct lustre_cache *cache,
+ void *data, int discard)
+{
+ struct page *page = data;
+ struct page_removal_cb_element *element;
+
+ list_for_each_entry(element, &cache->lc_page_removal_callback_list,
+ prce_list) {
+ element->prce_callback(page, discard);
+ }
+ return 0;
+}
+
+/* Registers a set of pin/remove callbacks for extents. The current limitation
+ is that there can be only one pin_cb per cache.
+ @pin_cb is called while we have the page locked, to pin it in memory so that
+ it does not disappear after we release the page lock (which we need to do
+ to avoid deadlocks).
+ @func_cb is the removal callback; it is called after the page lock and all
+ spinlocks are released, and is supposed to clean the page and remove it from
+ all (VFS) caches it might be in */
+int cache_add_extent_removal_cb(struct lustre_cache *cache,
+ obd_page_removal_cb_t func_cb,
+ obd_pin_extent_cb pin_cb)
+{
+ struct page_removal_cb_element *element;
+
+ if (!func_cb)
+ return 0;
+ OBD_ALLOC(element, sizeof(*element));
+ if (!element)
+ return -ENOMEM;
+ element->prce_callback = func_cb;
+ list_add_tail(&element->prce_list,
+ &cache->lc_page_removal_callback_list);
+
+ cache->lc_pin_extent_cb = pin_cb;
+ return 0;
+}
+EXPORT_SYMBOL(cache_add_extent_removal_cb);
+
+/* Unregister an extent removal callback registered earlier. If the list of
+ registered removal callbacks becomes empty, we also clear the pin callback,
+ since there can be only one */
+int cache_del_extent_removal_cb(struct lustre_cache *cache,
+ obd_page_removal_cb_t func_cb)
+{
+ int found = 0;
+ struct page_removal_cb_element *element, *t;
+
+ list_for_each_entry_safe(element, t,
+ &cache->lc_page_removal_callback_list,
+ prce_list) {
+ if (element->prce_callback == func_cb) {
+ list_del(&element->prce_list);
+ OBD_FREE(element, sizeof(*element));
+ found = 1;
+ /* We continue iterating the list in case this function
+ was registered more than once */
+ }
+ }
+
+ if (list_empty(&cache->lc_page_removal_callback_list))
+ cache->lc_pin_extent_cb = NULL;
+
+ return !found;
+}
+EXPORT_SYMBOL(cache_del_extent_removal_cb);
+
+static int cache_remove_extent_nolock(struct lustre_cache *cache,
+ struct osc_async_page *extent)
+{
+ int have_lock = !!extent->oap_ldlm_lock;
+ /* We used to check oap_ldlm_lock for non-NULL here, but it might in
+ fact be NULL due to parallel page eviction clearing it while waiting
+ on the lock's page list lock */
+ extent->oap_ldlm_lock = NULL;
+
+ if (!list_empty(&extent->oap_page_list))
+ list_del_init(&extent->oap_page_list);
+
+ return have_lock;
+}
+
+/* Request that @extent be removed from the cache and the locks it belongs to. */
+void cache_remove_extent(struct lustre_cache *cache,
+ struct osc_async_page *extent)
+{
+ struct ldlm_lock *lock;
+
+ spin_lock(&extent->oap_lock);
+ lock = extent->oap_ldlm_lock;
+
+ extent->oap_ldlm_lock = NULL;
+ spin_unlock(&extent->oap_lock);
+
+ /* No lock - means this extent is not in any list */
+ if (!lock)
+ return;
+
+ spin_lock(&lock->l_extents_list_lock);
+ if (!list_empty(&extent->oap_page_list))
+ list_del_init(&extent->oap_page_list);
+ spin_unlock(&lock->l_extents_list_lock);
+}
+
+/* Iterate through the list of extents in the lock identified by @lockh,
+ calling @cb_func for every such extent and passing @data to every call.
+ Stops iterating prematurely if @cb_func returns nonzero. */
+int cache_iterate_extents(struct lustre_cache *cache,
+ struct lustre_handle *lockh,
+ cache_iterate_extents_cb_t cb_func, void *data)
+{
+ struct ldlm_lock *lock = ldlm_handle2lock(lockh);
+ struct osc_async_page *extent, *t;
+
+ if (!lock) // Lock disappeared
+ return 0;
+ /* Parallel page removal due to memory pressure can race with us */
+ spin_lock(&lock->l_extents_list_lock);
+ list_for_each_entry_safe(extent, t, &lock->l_extents_list,
+ oap_page_list) {
+ if (cb_func(cache, lockh, extent, data))
+ break;
+ }
+ spin_unlock(&lock->l_extents_list_lock);
+ LDLM_LOCK_PUT(lock);
+
+ return 0;
+}
+
+static int cache_remove_extents_from_lock(struct lustre_cache *cache,
+ struct ldlm_lock *lock, void *data)
+{
+ struct osc_async_page *extent;
+ void *ext_data;
+
+ LASSERT(lock);
+
+ spin_lock(&lock->l_extents_list_lock);
+ while (!list_empty(&lock->l_extents_list)) {
+ extent = list_entry(lock->l_extents_list.next,
+ struct osc_async_page, oap_page_list);
+
+ spin_lock(&extent->oap_lock);
+ /* If there is no lock referenced from this oap, it means a
+ parallel page-removal process is waiting on
+ l_extents_list_lock to free that page, and it holds the
+ page lock. We need this page to go away completely, and to
+ make that happen we will just try to truncate it here too.
+ Serialisation on the page lock will achieve that goal for
+ us. */
+ /* Try to add the extent back to the cache first, but only if
+ * we are cancelling a read lock; write locks cannot have other
+ * overlapping locks. If adding is not possible (or we are
+ * cancelling a PW lock), then remove the extent from the cache */
+ if (!cache_remove_extent_nolock(cache, extent) ||
+ (lock->l_granted_mode == LCK_PW) ||
+ cache_add_extent(cache, &lock->l_resource->lr_name, extent,
+ NULL)) {
+ /* We need to remember this oap_page value now;
+ once we release the spinlocks, the extent
+ struct might be freed and we would end up
+ requesting a page with address 0x5a5a5a5a in
+ cache_extent_removal_event */
+ ext_data = extent->oap_page;
+ cache->lc_pin_extent_cb(extent->oap_page);
+ spin_unlock(&extent->oap_lock);
+ spin_unlock(&lock->l_extents_list_lock);
+ cache_extent_removal_event(cache, ext_data,
+ lock->l_flags &
+ LDLM_FL_DISCARD_DATA);
+ spin_lock(&lock->l_extents_list_lock);
+ } else {
+ spin_unlock(&extent->oap_lock);
+ }
+ }
+ spin_unlock(&lock->l_extents_list_lock);
+
+ return 0;
+}
+
+/* Removes @lock from the cache after the necessary checks. */
+int cache_remove_lock(struct lustre_cache *cache, struct lustre_handle *lockh)
+{
+ struct ldlm_lock *lock = ldlm_handle2lock(lockh);
+
+ if (!lock) // The lock was removed by somebody just now, nothing to do
+ return 0;
+
+ cache_remove_extents_from_lock(cache, lock, NULL /*data */ );
+
+ spin_lock(&cache->lc_locks_list_lock);
+ list_del_init(&lock->l_cache_locks_list);
+ spin_unlock(&cache->lc_locks_list_lock);
+
+ LDLM_LOCK_PUT(lock);
+
+ return 0;
+}
+
+/* Supposed to iterate through all locks in the cache for a given resource.
+ Not implemented at the moment. */
+int cache_iterate_locks(struct lustre_cache *cache, struct ldlm_res_id *res,
+ cache_iterate_locks_cb_t cb_fun, void *data)
+{
+ return -ENOTSUPP;
+}
+
+/* Create lustre cache and attach it to @obd */
+struct lustre_cache *cache_create(struct obd_device *obd)
+{
+ struct lustre_cache *cache;
+
+ OBD_ALLOC(cache, sizeof(*cache));
+ if (!cache)
+ GOTO(out, NULL);
+ spin_lock_init(&cache->lc_locks_list_lock);
+ CFS_INIT_LIST_HEAD(&cache->lc_locks_list);
+ CFS_INIT_LIST_HEAD(&cache->lc_page_removal_callback_list);
+ cache->lc_obd = obd;
+
+ out:
+ return cache;
+}
+
+/* Destroy @cache and free its memory */
+int cache_destroy(struct lustre_cache *cache)
+{
+ if (cache) {
+ spin_lock(&cache->lc_locks_list_lock);
+ if (!list_empty(&cache->lc_locks_list)) {
+ struct ldlm_lock *lock, *tmp;
+ CERROR("still have locks in the list on cleanup:\n");
+
+ list_for_each_entry_safe(lock, tmp,
+ &cache->lc_locks_list,
+ l_cache_locks_list) {
+ list_del_init(&lock->l_cache_locks_list);
+ /* XXX: The natural idea, of course, would be to
+ print the offending locks here, but if we use
+ e.g. LDLM_ERROR, we will likely crash here,
+ as LDLM_ERROR tries to access e.g. a
+ nonexistent namespace. Normally this kind of
+ case can only happen when somebody did not
+ release a lock reference, and we have other
+ ways to detect that. */
+ /* Make sure there are no pages left under the
+ lock */
+ LASSERT(list_empty(&lock->l_extents_list));
+ }
+ }
+ spin_unlock(&cache->lc_locks_list_lock);
+ LASSERT(list_empty(&cache->lc_page_removal_callback_list));
+ OBD_FREE(cache, sizeof(*cache));
+ }
+
+ return 0;
+}
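Pulling the pieces of this patch together, the wiring is roughly as follows
(a paraphrased sketch of call sites that appear elsewhere in this patch,
not additional code):

        /* osc_setup(): create the cache. */
        cli->cl_cache = cache_create(obd);

        /* llite mount: register callbacks so the OSC can tear down
         * llite pages when extent locks go away. */
        obd_register_page_removal_cb(sbi->ll_osc_exp, ll_page_removal_cb,
                                     ll_pin_extent_cb);
        obd_register_lock_cancel_cb(sbi->ll_osc_exp,
                                    ll_extent_lock_cancel_cb);

        /* osc_extent_blocking_cb(), LDLM_CB_CANCELING: evict the pages. */
        cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
                          &lockh);

        /* osc_cleanup(): tear the cache down. */
        cache_destroy(obd->u.cli.cl_cache);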
struct client_obd *oap_cli;
struct lov_oinfo *oap_loi;
- struct obd_async_page_ops *oap_caller_ops;
+ struct obd_async_page_ops *oap_caller_ops;
void *oap_caller_data;
+ struct list_head oap_page_list;
+ struct ldlm_lock *oap_ldlm_lock;
+ spinlock_t oap_lock;
};
#define oap_page oap_brw_page.pg
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
+#include <lustre_cache.h>
#include "osc_internal.h"
static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;
static void osc_release_ppga(struct brw_page **ppga, obd_count count);
+int osc_cleanup(struct obd_device *obd);
static quota_interface_t *quota_interface;
extern quota_interface_t osc_quota_interface;
int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
struct lov_oinfo *loi, cfs_page_t *page,
obd_off offset, struct obd_async_page_ops *ops,
- void *data, void **res)
+ void *data, void **res, int nocache,
+ struct lustre_handle *lockh)
{
struct osc_async_page *oap;
+ struct ldlm_res_id oid = {{0}};
+ int rc = 0;
+
ENTRY;
if (!page)
CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
+ INIT_LIST_HEAD(&oap->oap_page_list);
oap->oap_occ.occ_interrupted = osc_occ_interrupted;
+ spin_lock_init(&oap->oap_lock);
+
+ /* If the page was marked as not cacheable, don't add it to any locks */
+ if (!nocache) {
+ oid.name[0] = loi->loi_id;
+ /* This is the only place where we can call cache_add_extent
+ without oap_lock, because this page is locked now, and
+ the lock we are adding it to is referenced, so it cannot lose
+ any pages either. */
+ rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
+ if (rc)
+ RETURN(rc);
+ }
+
CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
RETURN(0);
}
lop_update_pending(cli, lop, oap->oap_cmd, -1);
}
loi_list_maint(cli, loi);
+ cache_remove_extent(cli->cl_cache, oap);
LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
RETURN(rc);
}
+int osc_extent_blocking_cb(struct ldlm_lock *lock,
+ struct ldlm_lock_desc *new, void *data,
+ int flag)
+{
+ struct lustre_handle lockh = { 0 };
+ int rc;
+ ENTRY;
+
+ if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
+ LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
+ LBUG();
+ }
+
+ switch (flag) {
+ case LDLM_CB_BLOCKING:
+ ldlm_lock2handle(lock, &lockh);
+ rc = ldlm_cli_cancel(&lockh);
+ if (rc != ELDLM_OK)
+ CERROR("ldlm_cli_cancel failed: %d\n", rc);
+ break;
+ case LDLM_CB_CANCELING: {
+
+ ldlm_lock2handle(lock, &lockh);
+ /* This lock wasn't granted, don't try to do anything */
+ if (lock->l_req_mode != lock->l_granted_mode)
+ RETURN(0);
+
+ cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
+ &lockh);
+
+ if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
+ lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
+ lock, new, data,flag);
+ break;
+ }
+ default:
+ LBUG();
+ }
+
+ RETURN(0);
+}
+EXPORT_SYMBOL(osc_extent_blocking_cb);
+
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
int flags)
{
return 0;
}
-static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
- int intent, int rc)
+static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
+ struct obd_info *oinfo, int intent, int rc)
{
ENTRY;
oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
}
+ if (!rc)
+ cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
+
/* Call the update callback. */
rc = oinfo->oi_cb_up(oinfo, rc);
RETURN(rc);
aa->oa_oi->oi_lockh, rc);
/* Complete osc stuff. */
- rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
+ rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
/* Release the lock for async request. */
if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
RETURN(rc);
}
- rc = osc_enqueue_fini(req, oinfo, intent, rc);
+ rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
if (intent)
ptlrpc_req_finished(req);
ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
OST_MAXREQSIZE,
ptlrpc_add_rqs_to_pool);
+ cli->cl_cache = cache_create(obd);
+ if (!cli->cl_cache) {
+ osc_cleanup(obd);
+ rc = -ENOMEM;
+ }
}
RETURN(rc);
/* free memory of osc quota cache */
lquota_cleanup(quota_interface, obd);
+ cache_destroy(obd->u.cli.cl_cache);
rc = client_obd_cleanup(obd);
ptlrpcd_decref();
RETURN(rc);
}
+static int osc_register_page_removal_cb(struct obd_export *exp,
+ obd_page_removal_cb_t func,
+ obd_pin_extent_cb pin_cb)
+{
+ return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func,
+ pin_cb);
+}
+
+static int osc_unregister_page_removal_cb(struct obd_export *exp,
+ obd_page_removal_cb_t func)
+{
+ return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func);
+}
+
+static int osc_register_lock_cancel_cb(struct obd_export *exp,
+ obd_lock_cancel_cb cb)
+{
+ LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL);
+
+ exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb;
+ return 0;
+}
+
+static int osc_unregister_lock_cancel_cb(struct obd_export *exp,
+ obd_lock_cancel_cb cb)
+{
+ if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) {
+ CERROR("Unregistering cancel cb %p, while only %p was "
+ "registered\n", cb,
+ exp->exp_obd->u.cli.cl_ext_lock_cancel_cb);
+ RETURN(-EINVAL);
+ }
+
+ exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL;
+ return 0;
+}
+
static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
{
struct lustre_cfg *lcfg = buf;
.o_llog_init = osc_llog_init,
.o_llog_finish = osc_llog_finish,
.o_process_config = osc_process_config,
+ .o_register_page_removal_cb = osc_register_page_removal_cb,
+ .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
+ .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
+ .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
};
int __init osc_init(void)
{
}
run_test 33 "Mount ost with a large index number"
-umount_client $MOUNT
+umount_client $MOUNT
cleanup_nocli
+test_23() {
+ start_ost
+ start_mds
+ # Simulate -EINTR during mount (OBD_FAIL_LDLM_CLOSE_THREAD)
+ sysctl -w lustre.fail_loc=0x80000313
+ mount_client $MOUNT
+ cleanup
+}
+run_test 23 "Simulate -EINTR during mount"
+
+equals_msg "Done"
+echo "$0: completed"
test_33a() {
setup
cat $f && error "cat succeeded, expect -EIO"
sysctl -w lustre.fail_loc=0
}
-run_test 62 "verify obd_match failure doesn't LBUG (should -EIO)"
+# This test is now irrelevant (as of the bug 10718 fix); we no longer
+# match every page all of the time.
+#run_test 62 "verify obd_match failure doesn't LBUG (should -EIO)"
# bug 2319 - oig_wait() interrupted causes crash because of invalid waitq.
test_63() {
}
run_test 79 "df report consistency check ======================="
+test_80() { # bug 10718
+ dd if=/dev/zero of=$DIR/$tfile bs=1M count=1 seek=1M
+ sync; sleep 1; sync
+ BEFORE=`date +%s`
+ cancel_lru_locks OSC
+ AFTER=`date +%s`
+ DIFF=$((AFTER-BEFORE))
+ if [ $DIFF -gt 1 ] ; then
+ error "elapsed for 1M@1T = $DIFF"
+ fi
+ true
+}
+run_test 80 "Page eviction is equally fast at high offsets too ===="
+
# on the LLNL clusters, runas will still pick up root's $TMP settings,
# which will not be writable for the runas user, and then you get a CVS
# error message with a corrupt path string (CVS bug) and panic.