#define DEBUG_SUBSYSTEM S_MDC
-#ifdef __KERNEL__
-# include <linux/module.h>
-# include <linux/pagemap.h>
-# include <linux/miscdevice.h>
-# include <linux/init.h>
-# include <linux/utsname.h>
-#else
-# include <liblustre.h>
-#endif
+#include <linux/module.h>
+#include <linux/pagemap.h>
+#include <linux/miscdevice.h>
+#include <linux/init.h>
+#include <linux/utsname.h>
#include <lustre_acl.h>
#include <lustre_ioctl.h>
RETURN(-EPROTO);
acl = posix_acl_from_xattr(&init_user_ns, buf, body->mbo_aclsize);
+ if (acl == NULL)
+ RETURN(0);
if (IS_ERR(acl)) {
rc = PTR_ERR(acl);
CERROR("convert xattr to acl: %d\n", rc);
RETURN(0);
}
-#ifdef __KERNEL__
static void mdc_release_page(struct page *page, int remove)
{
if (remove) {
/*
* upon hash collision, remove this page,
* otherwise put page reference, and
- * ll_get_dir_page() will issue RPC to fetch
- * the page we want.
+ * mdc_read_page_remote() will issue RPC to
+ * fetch the page we want.
*/
kunmap(page);
mdc_release_page(page,
kunmap(pages[i]);
}
- LASSERTF(lu_pgs == 0, "left = %d", lu_pgs);
+ LASSERTF(lu_pgs == 0, "left = %d\n", lu_pgs);
}
#else
#define mdc_adjust_dirpages(pages, cfs_pgs, lu_pgs) do {} while (0)
ENTRY;
LASSERT(max_pages > 0 && max_pages <= PTLRPC_MAX_BRW_PAGES);
- if (op_data->op_mea1 != NULL) {
- __u32 index = op_data->op_stripe_offset;
-
- inode = op_data->op_mea1->lsm_md_oinfo[index].lmo_root;
- fid = &op_data->op_mea1->lsm_md_oinfo[index].lmo_fid;
- } else {
- inode = op_data->op_data;
- fid = &op_data->op_fid1;
- }
+ inode = op_data->op_data;
+ fid = &op_data->op_fid1;
LASSERT(inode != NULL);
OBD_ALLOC(page_pool, sizeof(page_pool[0]) * max_pages);
/**
* Read dir page from cache first, if it can not find it, read it from
* server and add into the cache.
+ *
+ * \param[in] exp MDC export
+ * \param[in] op_data client MD stack parameters, transfering parameters
+ * between different layers on client MD stack.
+ * \param[in] cb_op callback required for ldlm lock enqueue during
+ * read page
+ * \param[in] hash_offset the hash offset of the page to be read
+ * \param[in] ppage the page to be read
+ *
+ * retval = 0 get the page successfully
+ * errno(<0) get the page failed
*/
static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data,
- struct md_callback *cb_op, struct page **ppage)
+ struct md_callback *cb_op, __u64 hash_offset,
+ struct page **ppage)
{
struct lookup_intent it = { .it_op = IT_READDIR };
struct page *page;
rc = 0;
mdc_set_lock_data(exp, &it.d.lustre.it_lock_handle, dir, NULL);
- rp_param.rp_off = op_data->op_hash_offset;
+ rp_param.rp_off = hash_offset;
rp_param.rp_hash64 = op_data->op_cli_flags & CLI_HASH64;
page = mdc_page_locate(mapping, &rp_param.rp_off, &start, &end,
rp_param.rp_hash64);
if (BITS_PER_LONG == 32 && rp_param.rp_hash64) {
start = le64_to_cpu(dp->ldp_hash_start) >> 32;
end = le64_to_cpu(dp->ldp_hash_end) >> 32;
- rp_param.rp_off = op_data->op_hash_offset >> 32;
+ rp_param.rp_off = hash_offset >> 32;
} else {
start = le64_to_cpu(dp->ldp_hash_start);
end = le64_to_cpu(dp->ldp_hash_end);
- rp_param.rp_off = op_data->op_hash_offset;
+ rp_param.rp_off = hash_offset;
}
if (end == start) {
LASSERT(start == rp_param.rp_off);
#if BITS_PER_LONG == 32
CWARN("Real page-wide hash collision at ["LPU64" "LPU64"] with "
"hash "LPU64"\n", le64_to_cpu(dp->ldp_hash_start),
- le64_to_cpu(dp->ldp_hash_end), op_data->op_hash_offset);
+ le64_to_cpu(dp->ldp_hash_end), hash_offset);
#endif
/*
goto out_unlock;
}
-/**
- * Read one directory entry from the cache.
- */
-int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data,
- struct md_callback *cb_op, struct lu_dirent **entp,
- struct page **ppage)
-{
- struct page *page = NULL;
- struct lu_dirpage *dp;
- struct lu_dirent *ent;
- int rc = 0;
- __u32 same_hash_count;
- __u64 hash_offset = op_data->op_hash_offset;
- ENTRY;
-
- CDEBUG(D_INFO, DFID " offset = "LPU64", flags %#x\n",
- PFID(&op_data->op_fid1), op_data->op_hash_offset,
- op_data->op_cli_flags);
-
- *ppage = NULL;
- *entp = NULL;
-
- if (op_data->op_hash_offset == MDS_DIR_END_OFF)
- RETURN(0);
-
- rc = mdc_read_page(exp, op_data, cb_op, &page);
- if (rc != 0)
- RETURN(rc);
-
- /* same_hash_count means how many entries with this
- * hash value has been read */
- same_hash_count = op_data->op_same_hash_offset + 1;
- dp = page_address(page);
- for (ent = lu_dirent_start(dp); ent != NULL;
- ent = lu_dirent_next(ent)) {
- /* Skip dummy entry */
- if (le16_to_cpu(ent->lde_namelen) == 0)
- continue;
-
- if (le64_to_cpu(ent->lde_hash) <
- op_data->op_hash_offset)
- continue;
-
- if (unlikely(le64_to_cpu(ent->lde_hash) ==
- op_data->op_hash_offset)) {
- /* If it is not for next entry, which usually from
- * ll_dir_entry_start, return this entry. */
- if (!(op_data->op_cli_flags & CLI_NEXT_ENTRY))
- break;
-
- /* Keep reading until all of entries being read are
- * skipped. */
- if (same_hash_count > 0) {
- same_hash_count--;
- continue;
- }
- }
- break;
- }
-
- /* If it can not find entry in current page, try next page. */
- if (ent == NULL) {
- if (le64_to_cpu(dp->ldp_hash_end) == MDS_DIR_END_OFF) {
- op_data->op_same_hash_offset = 0;
- mdc_release_page(page,
- le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
- RETURN(0);
- }
-
- op_data->op_hash_offset = le64_to_cpu(dp->ldp_hash_end);
- mdc_release_page(page,
- le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
- rc = mdc_read_page(exp, op_data, cb_op, &page);
- if (rc != 0)
- RETURN(rc);
-
- if (page != NULL) {
- dp = page_address(page);
- ent = lu_dirent_start(dp);
- }
- }
-
- /* If the next hash is the same as the current hash, increase
- * the op_same_hash_offset to resolve the same hash conflict */
- if (ent != NULL && op_data->op_cli_flags & CLI_NEXT_ENTRY) {
- if (unlikely(le64_to_cpu(ent->lde_hash) == hash_offset))
- op_data->op_same_hash_offset++;
- else
- op_data->op_same_hash_offset = 0;
- }
-
- *ppage = page;
- *entp = ent;
- RETURN(rc);
-}
-
-#else /* __KERNEL__ */
-
-static struct page
-*mdc_read_page_remote(struct obd_export *exp, const struct lmv_oinfo *lmo,
- const __u64 hash, struct obd_capa *oc)
-{
- struct ptlrpc_request *req = NULL;
- struct page *page;
- int rc;
-
- OBD_PAGE_ALLOC(page, 0);
- if (page == NULL)
- return ERR_PTR(-ENOMEM);
-
- rc = mdc_getpage(exp, &lmo->lmo_fid, hash, oc, &page, 1, &req);
- if (req != NULL)
- ptlrpc_req_finished(req);
-
- if (unlikely(rc)) {
- OBD_PAGE_FREE(page);
- return ERR_PTR(rc);
- }
- return page;
-}
-
-
-static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data,
- struct md_callback *cb_op,
- struct page **ppage)
-{
- struct page *page;
- struct lmv_oinfo *lmo;
- int rc = 0;
-
- /* No local cache for liblustre, always read entry remotely */
- lmo = &op_data->op_mea1->lsm_md_oinfo[op_data->op_stripe_offset];
- page = mdc_read_page_remote(exp, lmo, op_data->op_hash_offset,
- op_data->op_capa1);
- if (IS_ERR(page))
- return PTR_ERR(page);
-
- *ppage = page;
-
- return rc;
-}
-
-int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data,
- struct md_callback *cb_op, struct lu_dirent **entp,
- struct page **ppage)
-{
- struct page *page = NULL;
- struct lu_dirpage *dp;
- struct lu_dirent *ent;
- int rc;
- ENTRY;
-
- rc = mdc_read_page(exp, op_data, cb_op, &page);
- if (rc != 0)
- RETURN(rc);
-
- dp = page_address(page);
- if (le64_to_cpu(dp->ldp_hash_end) < op_data->op_hash_offset)
- GOTO(out, *entp = NULL);
-
- for (ent = lu_dirent_start(dp); ent != NULL;
- ent = lu_dirent_next(ent))
- if (le64_to_cpu(ent->lde_hash) >= op_data->op_hash_offset)
- break;
- *entp = ent;
-out:
-
- OBD_PAGE_FREE(page);
- RETURN(rc);
-}
-
-#endif
static int mdc_statfs(const struct lu_env *env,
struct obd_export *exp, struct obd_statfs *osfs,
ENTRY;
if (!try_module_get(THIS_MODULE)) {
- CERROR("Can't get module. Is it alive?");
+ CERROR("%s: cannot get module '%s'\n", obd->obd_name,
+ module_name(THIS_MODULE));
return -EINVAL;
}
switch (cmd) {
.m_setxattr = mdc_setxattr,
.m_getxattr = mdc_getxattr,
.m_fsync = mdc_fsync,
- .m_read_entry = mdc_read_entry,
+ .m_read_page = mdc_read_page,
.m_unlink = mdc_unlink,
.m_cancel_unused = mdc_cancel_unused,
.m_init_ea_size = mdc_init_ea_size,
LUSTRE_MDC_NAME, NULL);
}
-#ifdef __KERNEL__
static void /*__exit*/ mdc_exit(void)
{
class_unregister_type(LUSTRE_MDC_NAME);
module_init(mdc_init);
module_exit(mdc_exit);
-#endif