From 4e0c8aeb9460e20eb7be9011c24edca35e17340d Mon Sep 17 00:00:00 2001 From: wang di Date: Thu, 21 Nov 2013 00:00:04 -0800 Subject: [PATCH] LU-3531 llite: move dir cache to MDC layer Move directory entries cache from llite to MDC, so client side dir stripe will use independent hash function(in LMV), which does not need to be tightly coupled with the backend storage dir-entry hash function. With striped directory, it will be 2-tier hash, LMV calculate hash value according to the name and hash-type in layout, then each MDT will store these entry in disk by its own hash. Signed-off-by: wang di Change-Id: I14bb6bd81aad6fd59dcc22cf4bcea9d341dca2a1 Reviewed-on: http://review.whamcloud.com/7043 Tested-by: Jenkins Reviewed-by: John L. Hammond Tested-by: Maloo Reviewed-by: Jinshan Xiong Reviewed-by: Andreas Dilger Reviewed-by: Oleg Drokin --- lustre/include/Makefile.am | 1 + lustre/include/lclient.h | 8 + lustre/include/lustre/lustre_idl.h | 78 +++- lustre/include/lustre_lite.h | 13 - lustre/include/lustre_lmv.h | 63 ++++ lustre/include/obd.h | 48 ++- lustre/include/obd_class.h | 13 + lustre/liblustre/dir.c | 3 +- lustre/liblustre/llite_lib.h | 10 + lustre/llite/dir.c | 581 +++++++---------------------- lustre/llite/llite_internal.h | 20 +- lustre/llite/llite_lib.c | 30 +- lustre/llite/llite_nfs.c | 30 +- lustre/llite/statahead.c | 487 +++++++++++------------- lustre/lmv/lmv_internal.h | 70 ++-- lustre/lmv/lmv_obd.c | 435 +++++++++++++++------ lustre/mdc/mdc_internal.h | 13 + lustre/mdc/mdc_lib.c | 3 + lustre/mdc/mdc_locks.c | 32 +- lustre/mdc/mdc_request.c | 746 +++++++++++++++++++++++++++++++++---- lustre/obdclass/Makefile.in | 2 +- lustre/obdclass/autoMakefile.am | 4 +- lustre/obdclass/lprocfs_status.c | 1 + lustre/obdclass/mea.c | 117 ------ lustre/ptlrpc/pack_generic.c | 8 - lustre/ptlrpc/wiretest.c | 30 -- lustre/utils/wirecheck.c | 15 - lustre/utils/wiretest.c | 30 -- 28 files changed, 1657 insertions(+), 1234 deletions(-) create mode 100644 lustre/include/lustre_lmv.h delete mode 100644 lustre/obdclass/mea.c diff --git a/lustre/include/Makefile.am b/lustre/include/Makefile.am index bb75665..2312a20 100644 --- a/lustre/include/Makefile.am +++ b/lustre/include/Makefile.am @@ -69,6 +69,7 @@ EXTRA_DIST = \ lustre_lib.h \ lustre_linkea.h \ lustre_lite.h \ + lustre_lmv.h \ lustre_log.h \ lustre_mdc.h \ lustre_mds.h \ diff --git a/lustre/include/lclient.h b/lustre/include/lclient.h index b21778a..f165d14 100644 --- a/lustre/include/lclient.h +++ b/lustre/include/lclient.h @@ -457,4 +457,12 @@ struct cl_client_cache { wait_queue_head_t ccc_unstable_waitq; /* Signaled on BRW commit */ }; +enum op_cli_flags { + CLI_SET_MEA = 1 << 0, + CLI_RM_ENTRY = 1 << 1, + CLI_HASH64 = 1 << 2, + CLI_API32 = 1 << 3, + CLI_READENT_END = 1 << 4, +}; + #endif /*LCLIENT_H */ diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 8b835a5..370899c 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -2645,25 +2645,69 @@ struct lmv_desc { extern void lustre_swab_lmv_desc (struct lmv_desc *ld); -/* TODO: lmv_stripe_md should contain mds capabilities for all slave fids */ -struct lmv_stripe_md { - __u32 mea_magic; - __u32 mea_count; - __u32 mea_master; - __u32 mea_padding; - char mea_pool_name[LOV_MAXPOOLNAME]; - struct lu_fid mea_ids[0]; -}; - -extern void lustre_swab_lmv_stripe_md(struct lmv_stripe_md *mea); +/* lmv structures */ +#define LMV_MAGIC_V1 0x0CD10CD0 /* normal stripe lmv magic */ +#define LMV_USER_MAGIC 0x0CD20CD0 /* default lmv magic*/ +#define LMV_MAGIC LMV_MAGIC_V1 +struct lmv_mds_md_v1 { + __u32 lmv_magic; + __u32 lmv_stripe_count; /* stripe count */ + __u32 lmv_master_mdt_index; /* master MDT index */ + __u32 lmv_hash_type; /* dir stripe policy, i.e. indicate + * which hash function to be used */ + __u32 lmv_layout_version; /* Used for directory restriping */ + __u32 lmv_padding; + char lmv_pool_name[LOV_MAXPOOLNAME]; /* pool name */ + struct lu_fid lmv_stripe_fids[0]; /* FIDs for each stripe */ +}; + +union lmv_mds_md { + __u32 lmv_magic; + struct lmv_mds_md_v1 lmv_md_v1; + struct lmv_user_md lmv_user_md; +}; + +static inline int lmv_mds_md_size(int stripe_count, unsigned int lmm_magic) +{ + switch (lmm_magic) { + case LMV_MAGIC_V1: { + struct lmv_mds_md_v1 *lmm1; + + return sizeof(*lmm1) + stripe_count * + sizeof(lmm1->lmv_stripe_fids[0]); + } + default: + return -EINVAL; + } +} -#define MEA_MAGIC_LAST_CHAR 0xb2221ca1 -#define MEA_MAGIC_ALL_CHARS 0xb222a11c -#define MEA_MAGIC_HASH_SEGMENT 0xb222a11b +static inline int lmv_mds_md_stripe_count_get(const union lmv_mds_md *lmm) +{ + switch (le32_to_cpu(lmm->lmv_magic)) { + case LMV_MAGIC_V1: + return le32_to_cpu(lmm->lmv_md_v1.lmv_stripe_count); + case LMV_USER_MAGIC: + return le32_to_cpu(lmm->lmv_user_md.lum_stripe_count); + default: + return -EINVAL; + } +} -#define MAX_HASH_SIZE_32 0x7fffffffUL -#define MAX_HASH_SIZE 0x7fffffffffffffffULL -#define MAX_HASH_HIGHEST_BIT 0x1000000000000000ULL +static inline int lmv_mds_md_stripe_count_set(union lmv_mds_md *lmm, + unsigned int stripe_count) +{ + switch (le32_to_cpu(lmm->lmv_magic)) { + case LMV_MAGIC_V1: + lmm->lmv_md_v1.lmv_stripe_count = cpu_to_le32(stripe_count); + break; + case LMV_USER_MAGIC: + lmm->lmv_user_md.lum_stripe_count = cpu_to_le32(stripe_count); + break; + default: + return -EINVAL; + } + return 0; +} enum fld_rpc_opc { FLD_QUERY = 900, diff --git a/lustre/include/lustre_lite.h b/lustre/include/lustre_lite.h index 7729bdb..573841c 100644 --- a/lustre/include/lustre_lite.h +++ b/lustre/include/lustre_lite.h @@ -143,19 +143,6 @@ static inline void ll_dir_chain_fini(struct ll_dir_chain *chain) { } -static inline unsigned long hash_x_index(__u64 hash, int hash64) -{ -#ifdef __KERNEL__ - if (BITS_PER_LONG == 32 && hash64) - hash >>= 32; -#endif - /* save hash 0 as index 0 because otherwise we'll save it at - * page index end (~0UL) and it causes truncate_inode_pages_range() - * to loop forever. - */ - return ~0UL - (hash + !hash); -} - /** @} lite */ #endif diff --git a/lustre/include/lustre_lmv.h b/lustre/include/lustre_lmv.h new file mode 100644 index 0000000..a85c6d7 --- /dev/null +++ b/lustre/include/lustre_lmv.h @@ -0,0 +1,63 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License version 2 for more details. A copy is + * included in the COPYING file that accompanied this code. + + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * GPL HEADER END + */ +/* + * Copyright (c) 2013, Intel Corporation. + */ +/* + * lustre/include/lustre_lmv.h + * + * Lustre LMV structures and functions. + * + * Author: Di Wang + */ + +#ifndef _LUSTRE_LMV_H +#define _LUSTRE_LMV_H +#include + +struct lmv_oinfo { + struct lu_fid lmo_fid; + mdsno_t lmo_mds; + struct inode *lmo_root; +}; + +struct lmv_stripe_md { + __u32 lsm_md_magic; + __u32 lsm_md_stripe_count; + __u32 lsm_md_master_mdt_index; + __u32 lsm_md_hash_type; + __u32 lsm_md_layout_version; + __u32 lsm_md_default_count; + __u32 lsm_md_default_index; + char lsm_md_pool_name[LOV_MAXPOOLNAME]; + struct lmv_oinfo lsm_md_oinfo[0]; +}; + +union lmv_mds_md; + +int lmv_pack_md(union lmv_mds_md **lmmp, const struct lmv_stripe_md *lsm, + int stripe_count); +int lmv_alloc_md(union lmv_mds_md **lmmp, int stripe_count); +void lmv_free_md(union lmv_mds_md *lmm); +int lmv_alloc_memmd(struct lmv_stripe_md **lsmp, int stripe_count); +void lmv_free_memmd(struct lmv_stripe_md *lsm); +#endif diff --git a/lustre/include/obd.h b/lustre/include/obd.h index aceab76..3b76c34 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -917,19 +917,21 @@ struct lu_context; static inline int it_to_lock_mode(struct lookup_intent *it) { - /* CREAT needs to be tested before open (both could be set) */ - if (it->it_op & IT_CREAT) - return LCK_CW; - else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP | - IT_LAYOUT)) - return LCK_CR; + /* CREAT needs to be tested before open (both could be set) */ + if (it->it_op & IT_CREAT) + return LCK_CW; + else if (it->it_op & (IT_GETATTR | IT_OPEN | IT_LOOKUP | + IT_LAYOUT)) + return LCK_CR; + else if (it->it_op & IT_READDIR) + return LCK_PR; else if (it->it_op & IT_GETXATTR) return LCK_PR; else if (it->it_op & IT_SETXATTR) return LCK_PW; - LASSERTF(0, "Invalid it_op: %d\n", it->it_op); - return -EINVAL; + LASSERTF(0, "Invalid it_op: %d\n", it->it_op); + return -EINVAL; } struct md_op_data { @@ -974,7 +976,7 @@ struct md_op_data { __u32 op_opc; /* Used by readdir */ - __u64 op_offset; + __u64 op_hash_offset; /* Used by readdir */ __u32 op_npages; @@ -988,9 +990,13 @@ struct md_op_data { struct lustre_handle op_lease_handle; }; -enum op_cli_flags { - CLI_SET_MEA = 1 << 0, - CLI_RM_ENTRY = 1 << 1, +#define op_stripe_offset op_ioepoch +#define op_max_pages op_valid + +struct md_callback { + int (*md_blocking_ast)(struct ldlm_lock *lock, + struct ldlm_lock_desc *desc, + void *data, int flag); }; struct md_enqueue_info; @@ -1201,15 +1207,16 @@ enum { #define MAX_HASH_HIGHEST_BIT 0x1000000000000000ULL struct lustre_md { - struct mdt_body *body; - struct lov_stripe_md *lsm; - struct lmv_stripe_md *mea; + struct mdt_body *body; + struct lov_stripe_md *lsm; + struct lmv_stripe_md *lmv; #ifdef CONFIG_FS_POSIX_ACL - struct posix_acl *posix_acl; + struct posix_acl *posix_acl; #endif - struct mdt_remote_perm *remote_perm; - struct obd_capa *mds_capa; - struct obd_capa *oss_capa; + struct mdt_remote_perm *remote_perm; + struct obd_capa *mds_capa; + struct obd_capa *oss_capa; + __u64 lm_flags; }; struct md_open_data { @@ -1272,6 +1279,9 @@ struct md_ops { int (*m_readpage)(struct obd_export *, struct md_op_data *, struct page **, struct ptlrpc_request **); + int (*m_read_entry)(struct obd_export *, struct md_op_data *, + struct md_callback *cb_op, struct lu_dirent **ld); + int (*m_unlink)(struct obd_export *, struct md_op_data *, struct ptlrpc_request **); diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index 7019179..a540df9 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -2015,6 +2015,19 @@ static inline int md_readpage(struct obd_export *exp, struct md_op_data *opdata, RETURN(rc); } +static inline int md_read_entry(struct obd_export *exp, + struct md_op_data *op_data, + struct md_callback *cb_op, + struct lu_dirent **ld) +{ + int rc; + ENTRY; + EXP_CHECK_MD_OP(exp, read_entry); + EXP_MD_COUNTER_INCREMENT(exp, read_entry); + rc = MDP(exp->exp_obd, read_entry)(exp, op_data, cb_op, ld); + RETURN(rc); +} + static inline int md_unlink(struct obd_export *exp, struct md_op_data *op_data, struct ptlrpc_request **request) { diff --git a/lustre/liblustre/dir.c b/lustre/liblustre/dir.c index c7560e8..00b111e 100644 --- a/lustre/liblustre/dir.c +++ b/lustre/liblustre/dir.c @@ -61,7 +61,6 @@ #include /* (new) readdir implementation overview can be found in lustre/llite/dir.c */ - static int llu_dir_do_readpage(struct inode *inode, struct page *page) { struct llu_inode_info *lli = llu_i2info(inode); @@ -101,7 +100,7 @@ static int llu_dir_do_readpage(struct inode *inode, struct page *page) } ldlm_lock_dump_handle(D_OTHER, &lockh); - op_data.op_offset = (__u64)hash_x_index(page->index, 0); + op_data.op_hash_offset = hash_x_index(page->index, 0); op_data.op_npages = 1; rc = md_readpage(sbi->ll_md_exp, &op_data, &page, &request); if (!rc) { diff --git a/lustre/liblustre/llite_lib.h b/lustre/liblustre/llite_lib.h index 2b427c3..f26c201 100644 --- a/lustre/liblustre/llite_lib.h +++ b/lustre/liblustre/llite_lib.h @@ -445,4 +445,14 @@ static inline void i_size_write(struct inode *inode, loff_t i_sz) { inode->i_stbuf.st_size = i_sz; } + +static inline __u64 hash_x_index(__u64 hash, int hash64) +{ + if (BITS_PER_LONG == 32 && hash64) + hash >>= 32; + /* save hash 0 as index 0 because otherwise we'll save it at + * page index end (~0UL) and it causes truncate_inode_pages_range() + * to loop forever. */ + return ~0ULL - (hash + !hash); +} #endif diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index 2021897..1c849b8 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -139,456 +139,132 @@ * lmv_adjust_dirpages(). * */ - -/* returns the page unlocked, but with a reference */ -static int ll_dir_filler(void *_hash, struct page *page0) +/** + * The following three APIs will be used by llite to iterate directory + * entries from MDC dir page caches. + * + * ll_dir_entry_start(next) will lookup(return) entry by op_hash_offset. + * To avoid extra memory allocation, the @entry will be pointed to + * the dir entries in MDC page directly, so these pages can not be released + * until the entry has been accessed in ll_readdir(or statahead). + * + * The iterate process will be + * + * ll_dir_entry_start: locate the page in MDC, and return the first entry. + * hold the page. + * + * ll_dir_entry_next: return the next entry in the current page, if it reaches + * to the end, release current page. + * + * ll_dir_entry_end: release the last page. + **/ +struct lu_dirent *ll_dir_entry_start(struct inode *dir, + struct md_op_data *op_data) { - struct inode *inode = page0->mapping->host; - int hash64 = ll_i2sbi(inode)->ll_flags & LL_SBI_64BIT_HASH; - struct obd_export *exp = ll_i2sbi(inode)->ll_md_exp; - struct ptlrpc_request *request; - struct mdt_body *body; - struct md_op_data *op_data; - __u64 hash = *((__u64 *)_hash); - struct page **page_pool; - struct page *page; - struct lu_dirpage *dp; - int max_pages = ll_i2sbi(inode)->ll_md_brw_size >> PAGE_CACHE_SHIFT; - int nrdpgs = 0; /* number of pages read actually */ - int npages; - int i; + struct lu_dirent *entry; + struct md_callback cb_op; int rc; - ENTRY; - - CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p) hash "LPU64"\n", - PFID(ll_inode2fid(inode)), inode, hash); - - LASSERT(max_pages > 0 && max_pages <= MD_MAX_BRW_PAGES); - - op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0, - LUSTRE_OPC_ANY, NULL); - if (IS_ERR(op_data)) - RETURN(PTR_ERR(op_data)); - - OBD_ALLOC(page_pool, sizeof(page) * max_pages); - if (page_pool != NULL) { - page_pool[0] = page0; - } else { - page_pool = &page0; - max_pages = 1; - } - for (npages = 1; npages < max_pages; npages++) { - page = page_cache_alloc_cold(inode->i_mapping); - if (!page) - break; - page_pool[npages] = page; - } - - op_data->op_npages = npages; - op_data->op_offset = hash; - rc = md_readpage(exp, op_data, page_pool, &request); - ll_finish_md_op_data(op_data); - if (rc == 0) { - body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY); - /* Checked by mdc_readpage() */ - LASSERT(body != NULL); - - if (body->valid & OBD_MD_FLSIZE) - cl_isize_write(inode, body->size); - - nrdpgs = (request->rq_bulk->bd_nob_transferred + - PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT; - SetPageUptodate(page0); - } - unlock_page(page0); - ptlrpc_req_finished(request); - - CDEBUG(D_VFSTRACE, "read %d/%d pages\n", nrdpgs, npages); - - for (i = 1; i < npages; i++) { - unsigned long offset; - int ret; - page = page_pool[i]; - - if (rc < 0 || i >= nrdpgs) { - page_cache_release(page); - continue; - } - - SetPageUptodate(page); - - dp = kmap(page); - hash = le64_to_cpu(dp->ldp_hash_start); - kunmap(page); - - offset = hash_x_index(hash, hash64); - - prefetchw(&page->flags); - ret = add_to_page_cache_lru(page, inode->i_mapping, offset, - GFP_KERNEL); - if (ret == 0) - unlock_page(page); - else - CDEBUG(D_VFSTRACE, "page %lu add to page cache failed:" - " %d\n", offset, ret); - page_cache_release(page); - } - - if (page_pool != &page0) - OBD_FREE(page_pool, sizeof(struct page *) * max_pages); - - RETURN(rc); + cb_op.md_blocking_ast = ll_md_blocking_ast; + rc = md_read_entry(ll_i2mdexp(dir), op_data, &cb_op, &entry); + if (rc != 0) + entry = ERR_PTR(rc); + return entry; } -static void ll_check_page(struct inode *dir, struct page *page) +struct lu_dirent *ll_dir_entry_next(struct inode *dir, + struct md_op_data *op_data, + struct lu_dirent *ent) { - /* XXX: check page format later */ - SetPageChecked(page); -} + struct lu_dirent *entry; + struct md_callback cb_op; + int rc; -void ll_release_page(struct page *page, int remove) -{ - kunmap(page); - if (remove) { - lock_page(page); - if (likely(page->mapping != NULL)) - truncate_complete_page(page->mapping, page); - unlock_page(page); - } - page_cache_release(page); + cb_op.md_blocking_ast = ll_md_blocking_ast; + op_data->op_hash_offset = le64_to_cpu(ent->lde_hash); + rc = md_read_entry(ll_i2mdexp(dir), op_data, &cb_op, &entry); + if (rc != 0) + entry = ERR_PTR(rc); + return entry; } -/* - * Find, kmap and return page that contains given hash. - */ -static struct page *ll_dir_page_locate(struct inode *dir, __u64 *hash, - __u64 *start, __u64 *end) +void ll_dir_entry_end(struct inode *dir, struct md_op_data *op_data, + struct lu_dirent *ent) { - int hash64 = ll_i2sbi(dir)->ll_flags & LL_SBI_64BIT_HASH; - struct address_space *mapping = dir->i_mapping; - /* - * Complement of hash is used as an index so that - * radix_tree_gang_lookup() can be used to find a page with starting - * hash _smaller_ than one we are looking for. - */ - unsigned long offset = hash_x_index(*hash, hash64); - struct page *page; - int found; - - spin_lock_irq(&mapping->tree_lock); - found = radix_tree_gang_lookup(&mapping->page_tree, - (void **)&page, offset, 1); - if (found > 0) { - struct lu_dirpage *dp; - - page_cache_get(page); - spin_unlock_irq(&mapping->tree_lock); - /* - * In contrast to find_lock_page() we are sure that directory - * page cannot be truncated (while DLM lock is held) and, - * hence, can avoid restart. - * - * In fact, page cannot be locked here at all, because - * ll_dir_filler() does synchronous io. - */ - wait_on_page_locked(page); - if (PageUptodate(page)) { - dp = kmap(page); - if (BITS_PER_LONG == 32 && hash64) { - *start = le64_to_cpu(dp->ldp_hash_start) >> 32; - *end = le64_to_cpu(dp->ldp_hash_end) >> 32; - *hash = *hash >> 32; - } else { - *start = le64_to_cpu(dp->ldp_hash_start); - *end = le64_to_cpu(dp->ldp_hash_end); - } - LASSERTF(*start <= *hash, "start = "LPX64",end = " - LPX64",hash = "LPX64"\n", *start, *end, *hash); - CDEBUG(D_VFSTRACE, "page %lu [%llu %llu], hash "LPU64"\n", - offset, *start, *end, *hash); - if (*hash > *end) { - ll_release_page(page, 0); - page = NULL; - } else if (*end != *start && *hash == *end) { - /* - * upon hash collision, remove this page, - * otherwise put page reference, and - * ll_get_dir_page() will issue RPC to fetch - * the page we want. - */ - ll_release_page(page, - le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE); - page = NULL; - } - } else { - page_cache_release(page); - page = ERR_PTR(-EIO); - } + struct lu_dirent *entry; + struct md_callback cb_op; - } else { - spin_unlock_irq(&mapping->tree_lock); - page = NULL; - } - return page; + cb_op.md_blocking_ast = ll_md_blocking_ast; + op_data->op_cli_flags = CLI_READENT_END; + md_read_entry(ll_i2mdexp(dir), op_data, &cb_op, &entry); + return; } -struct page *ll_get_dir_page(struct inode *dir, __u64 hash, - struct ll_dir_chain *chain) +int ll_dir_read(struct inode *inode, struct md_op_data *op_data, + void *cookie, filldir_t filldir) { - ldlm_policy_data_t policy = {.l_inodebits = {MDS_INODELOCK_UPDATE} }; - struct address_space *mapping = dir->i_mapping; - struct lustre_handle lockh; - struct lu_dirpage *dp; - struct page *page; - ldlm_mode_t mode; - int rc; - __u64 start = 0; - __u64 end = 0; - __u64 lhash = hash; - struct ll_inode_info *lli = ll_i2info(dir); - int hash64 = ll_i2sbi(dir)->ll_flags & LL_SBI_64BIT_HASH; - - mode = LCK_PR; - rc = md_lock_match(ll_i2sbi(dir)->ll_md_exp, LDLM_FL_BLOCK_GRANTED, - ll_inode2fid(dir), LDLM_IBITS, &policy, mode, &lockh); - if (!rc) { - struct ldlm_enqueue_info einfo = { - .ei_type = LDLM_IBITS, - .ei_mode = mode, - .ei_cb_bl = ll_md_blocking_ast, - .ei_cb_cp = ldlm_completion_ast, - }; - struct lookup_intent it = { .it_op = IT_READDIR }; - struct ptlrpc_request *request; - struct md_op_data *op_data; - - op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0, - LUSTRE_OPC_ANY, NULL); - if (IS_ERR(op_data)) - return (void *)op_data; - - rc = md_enqueue(ll_i2sbi(dir)->ll_md_exp, &einfo, &it, - op_data, &lockh, NULL, 0, NULL, 0); - - ll_finish_md_op_data(op_data); - - request = (struct ptlrpc_request *)it.d.lustre.it_data; - if (request) - ptlrpc_req_finished(request); - if (rc < 0) { - CERROR("lock enqueue: "DFID" at "LPU64": rc %d\n", - PFID(ll_inode2fid(dir)), hash, rc); - return ERR_PTR(rc); - } - - CDEBUG(D_INODE, "setting lr_lvb_inode to inode "DFID"(%p)\n", - PFID(ll_inode2fid(dir)), dir); - md_set_lock_data(ll_i2sbi(dir)->ll_md_exp, - &it.d.lustre.it_lock_handle, dir, NULL); - } else { - /* for cross-ref object, l_ast_data of the lock may not be set, - * we reset it here */ - md_set_lock_data(ll_i2sbi(dir)->ll_md_exp, &lockh.cookie, - dir, NULL); - } - ldlm_lock_dump_handle(D_OTHER, &lockh); - - mutex_lock(&lli->lli_readdir_mutex); - page = ll_dir_page_locate(dir, &lhash, &start, &end); - if (IS_ERR(page)) { - CERROR("dir page locate: "DFID" at "LPU64": rc %ld\n", - PFID(ll_inode2fid(dir)), lhash, PTR_ERR(page)); - GOTO(out_unlock, page); - } else if (page != NULL) { - /* - * XXX nikita: not entirely correct handling of a corner case: - * suppose hash chain of entries with hash value HASH crosses - * border between pages P0 and P1. First both P0 and P1 are - * cached, seekdir() is called for some entry from the P0 part - * of the chain. Later P0 goes out of cache. telldir(HASH) - * happens and finds P1, as it starts with matching hash - * value. Remaining entries from P0 part of the chain are - * skipped. (Is that really a bug?) - * - * Possible solutions: 0. don't cache P1 is such case, handle - * it as an "overflow" page. 1. invalidate all pages at - * once. 2. use HASH|1 as an index for P1. - */ - GOTO(hash_collision, page); - } + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct ll_dir_chain chain; + struct lu_dirent *ent; + int api32 = ll_need_32bit_api(sbi); + int hash64 = sbi->ll_flags & LL_SBI_64BIT_HASH; + int done = 0; + int rc = 0; + __u64 hash = MDS_DIR_END_OFF; + __u64 last_hash = MDS_DIR_END_OFF; + ENTRY; - page = read_cache_page(mapping, hash_x_index(hash, hash64), - ll_dir_filler, &lhash); - if (IS_ERR(page)) { - CERROR("read cache page: "DFID" at "LPU64": rc %ld\n", - PFID(ll_inode2fid(dir)), hash, PTR_ERR(page)); - GOTO(out_unlock, page); - } + ll_dir_chain_init(&chain); + for (ent = ll_dir_entry_start(inode, op_data); + ent != NULL && !IS_ERR(ent) && !done; + ent = ll_dir_entry_next(inode, op_data, ent)) { + __u16 type; + int namelen; + struct lu_fid fid; + __u64 lhash; + __u64 ino; + + hash = le64_to_cpu(ent->lde_hash); + if (hash < op_data->op_hash_offset) + /* + * Skip until we find target hash + * value. + */ + continue; + namelen = le16_to_cpu(ent->lde_namelen); + if (namelen == 0) + /* + * Skip dummy record. + */ + continue; - wait_on_page_locked(page); - (void)kmap(page); - if (!PageUptodate(page)) { - CERROR("page not updated: "DFID" at "LPU64": rc %d\n", - PFID(ll_inode2fid(dir)), hash, -5); - goto fail; + if (api32 && hash64) + lhash = hash >> 32; + else + lhash = hash; + fid_le_to_cpu(&fid, &ent->lde_fid); + ino = cl_fid_build_ino(&fid, api32); + type = ll_dirent_type_get(ent); + /* For 'll_nfs_get_name_filldir()', it will try + * to access the 'ent' through its 'lde_name', + * so the parameter 'name' for 'filldir()' must + * be part of the 'ent'. */ + done = filldir(cookie, ent->lde_name, namelen, lhash, + ino, type); + if (done) { + if (op_data->op_hash_offset != MDS_DIR_END_OFF) + op_data->op_hash_offset = last_hash; + break; + } else { + last_hash = hash; + } } - if (!PageChecked(page)) - ll_check_page(dir, page); - if (PageError(page)) { - CERROR("page error: "DFID" at "LPU64": rc %d\n", - PFID(ll_inode2fid(dir)), hash, -5); - goto fail; - } -hash_collision: - dp = page_address(page); - if (BITS_PER_LONG == 32 && hash64) { - start = le64_to_cpu(dp->ldp_hash_start) >> 32; - end = le64_to_cpu(dp->ldp_hash_end) >> 32; - lhash = hash >> 32; - } else { - start = le64_to_cpu(dp->ldp_hash_start); - end = le64_to_cpu(dp->ldp_hash_end); - lhash = hash; - } - if (end == start) { - LASSERT(start == lhash); - CWARN("Page-wide hash collision: "LPU64"\n", end); - if (BITS_PER_LONG == 32 && hash64) - CWARN("Real page-wide hash collision at ["LPU64" "LPU64 - "] with hash "LPU64"\n", - le64_to_cpu(dp->ldp_hash_start), - le64_to_cpu(dp->ldp_hash_end), hash); - /* - * Fetch whole overflow chain... - * - * XXX not yet. - */ - goto fail; - } -out_unlock: - mutex_unlock(&lli->lli_readdir_mutex); - ldlm_lock_decref(&lockh, mode); - return page; - -fail: - ll_release_page(page, 1); - page = ERR_PTR(-EIO); - goto out_unlock; -} -int ll_dir_read(struct inode *inode, __u64 *_pos, void *cookie, - filldir_t filldir) -{ - struct ll_inode_info *info = ll_i2info(inode); - struct ll_sb_info *sbi = ll_i2sbi(inode); - __u64 pos = *_pos; - int api32 = ll_need_32bit_api(sbi); - int hash64 = sbi->ll_flags & LL_SBI_64BIT_HASH; - struct page *page; - struct ll_dir_chain chain; - int done = 0; - int rc = 0; - ENTRY; + if (IS_ERR(ent)) + rc = PTR_ERR(ent); + else if (ent != NULL) + ll_dir_entry_end(inode, op_data, ent); - ll_dir_chain_init(&chain); - - page = ll_get_dir_page(inode, pos, &chain); - - while (rc == 0 && !done) { - struct lu_dirpage *dp; - struct lu_dirent *ent; - - if (!IS_ERR(page)) { - /* - * If page is empty (end of directory is reached), - * use this value. - */ - __u64 hash = MDS_DIR_END_OFF; - __u64 next; - - dp = page_address(page); - for (ent = lu_dirent_start(dp); ent != NULL && !done; - ent = lu_dirent_next(ent)) { - __u16 type; - int namelen; - struct lu_fid fid; - __u64 lhash; - __u64 ino; - - /* - * XXX: implement correct swabbing here. - */ - - hash = le64_to_cpu(ent->lde_hash); - if (hash < pos) - /* - * Skip until we find target hash - * value. - */ - continue; - - namelen = le16_to_cpu(ent->lde_namelen); - if (namelen == 0) - /* - * Skip dummy record. - */ - continue; - - if (api32 && hash64) - lhash = hash >> 32; - else - lhash = hash; - fid_le_to_cpu(&fid, &ent->lde_fid); - ino = cl_fid_build_ino(&fid, api32); - type = ll_dirent_type_get(ent); - /* For 'll_nfs_get_name_filldir()', it will try - * to access the 'ent' through its 'lde_name', - * so the parameter 'name' for 'filldir()' must - * be part of the 'ent'. */ - done = filldir(cookie, ent->lde_name, namelen, - lhash, ino, type); - } - next = le64_to_cpu(dp->ldp_hash_end); - if (!done) { - pos = next; - if (pos == MDS_DIR_END_OFF) { - /* - * End of directory reached. - */ - done = 1; - ll_release_page(page, 0); - } else if (1 /* chain is exhausted*/) { - /* - * Normal case: continue to the next - * page. - */ - ll_release_page(page, - le32_to_cpu(dp->ldp_flags) & - LDF_COLLIDE); - next = pos; - page = ll_get_dir_page(inode, pos, - &chain); - } else { - /* - * go into overflow page. - */ - LASSERT(le32_to_cpu(dp->ldp_flags) & - LDF_COLLIDE); - ll_release_page(page, 1); - } - } else { - pos = hash; - ll_release_page(page, 0); - } - } else { - rc = PTR_ERR(page); - CERROR("error reading dir "DFID" at %lu: rc %d\n", - PFID(&info->lli_fid), (unsigned long)pos, rc); - } - } - - *_pos = pos; ll_dir_chain_fini(&chain); RETURN(rc); } @@ -598,9 +274,10 @@ static int ll_readdir(struct file *filp, void *cookie, filldir_t filldir) struct inode *inode = filp->f_dentry->d_inode; struct ll_file_data *lfd = LUSTRE_FPRIVATE(filp); struct ll_sb_info *sbi = ll_i2sbi(inode); - __u64 pos; + __u64 pos = lfd->lfd_pos; int hash64 = sbi->ll_flags & LL_SBI_64BIT_HASH; int api32 = ll_need_32bit_api(sbi); + struct md_op_data *op_data; int rc; #ifdef HAVE_TOUCH_ATIME_1ARG struct path path; @@ -612,8 +289,8 @@ static int ll_readdir(struct file *filp, void *cookie, filldir_t filldir) else pos = 0; - CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p) pos %lu/%llu " - " 32bit_api %d\n", PFID(ll_inode2fid(inode)), + CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p) pos/size" + "%lu/%llu 32bit_api %d\n", PFID(ll_inode2fid(inode)), inode, (unsigned long)pos, i_size_read(inode), api32); if (pos == MDS_DIR_END_OFF) @@ -622,20 +299,30 @@ static int ll_readdir(struct file *filp, void *cookie, filldir_t filldir) */ GOTO(out, rc = 0); - rc = ll_dir_read(inode, &pos, cookie, filldir); + op_data = ll_prep_md_op_data(NULL, inode, inode, NULL, 0, 0, + LUSTRE_OPC_ANY, inode); + if (IS_ERR(op_data)) + GOTO(out, rc = PTR_ERR(op_data)); + + op_data->op_hash_offset = pos; + op_data->op_max_pages = sbi->ll_md_brw_size >> PAGE_CACHE_SHIFT; + rc = ll_dir_read(inode, op_data, cookie, filldir); if (lfd != NULL) - lfd->lfd_pos = pos; - if (pos == MDS_DIR_END_OFF) { - if (api32) - filp->f_pos = LL_DIR_END_OFF_32BIT; - else - filp->f_pos = LL_DIR_END_OFF; - } else { - if (api32 && hash64) - filp->f_pos = pos >> 32; - else - filp->f_pos = pos; - } + lfd->lfd_pos = op_data->op_hash_offset; + + if (pos == MDS_DIR_END_OFF) { + if (api32) + filp->f_pos = LL_DIR_END_OFF_32BIT; + else + filp->f_pos = LL_DIR_END_OFF; + } else { + if (api32 && hash64) + filp->f_pos = op_data->op_hash_offset >> 32; + else + filp->f_pos = op_data->op_hash_offset; + } + + ll_finish_md_op_data(op_data); filp->f_version = inode->i_version; #ifdef HAVE_TOUCH_ATIME_1ARG #ifdef HAVE_F_PATH_MNT diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 9da81ca..e4482f5 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -198,6 +198,8 @@ struct ll_inode_info { /* "opendir_pid" is the token when lookup/revalid * -- I am the owner of dir statahead. */ pid_t d_opendir_pid; + /* directory stripe information */ + struct lmv_stripe_md *d_lmv_md; } d; #define lli_readdir_mutex u.d.d_readdir_mutex @@ -206,6 +208,7 @@ struct ll_inode_info { #define lli_def_acl u.d.d_def_acl #define lli_sa_lock u.d.d_sa_lock #define lli_opendir_pid u.d.d_opendir_pid +#define lli_lmv_md u.d.d_lmv_md /* for non-directory */ struct { @@ -717,15 +720,20 @@ static void lprocfs_llite_init_vars(struct lprocfs_static_vars *lvars) /* llite/dir.c */ -void ll_release_page(struct page *page, int remove); extern struct file_operations ll_dir_operations; extern struct inode_operations ll_dir_inode_operations; -struct page *ll_get_dir_page(struct inode *dir, __u64 hash, - struct ll_dir_chain *chain); -int ll_dir_read(struct inode *inode, __u64 *_pos, void *cookie, - filldir_t filldir); - +int ll_dir_read(struct inode *inode, struct md_op_data *op_data, + void *cookie, filldir_t filldir); int ll_get_mdt_idx(struct inode *inode); + +struct lu_dirent *ll_dir_entry_start(struct inode *dir, + struct md_op_data *op_data); + +struct lu_dirent *ll_dir_entry_next(struct inode *dir, + struct md_op_data *op_data, + struct lu_dirent *ent); +void ll_dir_entry_end(struct inode *dir, struct md_op_data *op_data, + struct lu_dirent *ent); /* llite/namei.c */ int ll_objects_destroy(struct ptlrpc_request *request, struct inode *dir); diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 1aec027..1056a83 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -2320,17 +2320,27 @@ struct md_op_data * ll_prep_md_op_data(struct md_op_data *op_data, if (op_data == NULL) return ERR_PTR(-ENOMEM); - ll_i2gids(op_data->op_suppgids, i1, i2); - op_data->op_fid1 = *ll_inode2fid(i1); - op_data->op_capa1 = ll_mdscapa_get(i1); + ll_i2gids(op_data->op_suppgids, i1, i2); + op_data->op_fid1 = *ll_inode2fid(i1); + op_data->op_capa1 = ll_mdscapa_get(i1); + if (S_ISDIR(i1->i_mode)) + op_data->op_mea1 = ll_i2info(i1)->lli_lmv_md; + + if (i2) { + op_data->op_fid2 = *ll_inode2fid(i2); + op_data->op_capa2 = ll_mdscapa_get(i2); + if (S_ISDIR(i2->i_mode)) + op_data->op_mea2 = ll_i2info(i2)->lli_lmv_md; + } else { + fid_zero(&op_data->op_fid2); + op_data->op_capa2 = NULL; + } - if (i2) { - op_data->op_fid2 = *ll_inode2fid(i2); - op_data->op_capa2 = ll_mdscapa_get(i2); - } else { - fid_zero(&op_data->op_fid2); - op_data->op_capa2 = NULL; - } + if (ll_i2sbi(i1)->ll_flags & LL_SBI_64BIT_HASH) + op_data->op_cli_flags |= CLI_HASH64; + + if (ll_need_32bit_api(ll_i2sbi(i1))) + op_data->op_cli_flags |= CLI_API32; op_data->op_name = name; op_data->op_namelen = namelen; diff --git a/lustre/llite/llite_nfs.c b/lustre/llite/llite_nfs.c index 2a2eb19..68616e9 100644 --- a/lustre/llite/llite_nfs.c +++ b/lustre/llite/llite_nfs.c @@ -235,11 +235,11 @@ static int ll_nfs_get_name_filldir(void *cookie, const char *name, int namelen, static int ll_get_name(struct dentry *dentry, char *name, struct dentry *child) { - struct inode *dir = dentry->d_inode; - struct ll_getname_data lgd; - __u64 offset = 0; - int rc; - ENTRY; + struct inode *dir = dentry->d_inode; + struct ll_getname_data lgd; + struct md_op_data *op_data; + int rc; + ENTRY; if (!dir || !S_ISDIR(dir->i_mode)) GOTO(out, rc = -ENOTDIR); @@ -251,15 +251,23 @@ static int ll_get_name(struct dentry *dentry, char *name, lgd.lgd_fid = ll_i2info(child->d_inode)->lli_fid; lgd.lgd_found = 0; + op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0, + LUSTRE_OPC_ANY, dir); + if (IS_ERR(op_data)) + GOTO(out, rc = PTR_ERR(op_data)); + + op_data->op_hash_offset = 0; + op_data->op_max_pages = + ll_i2sbi(dir)->ll_md_brw_size >> PAGE_CACHE_SHIFT; mutex_lock(&dir->i_mutex); - rc = ll_dir_read(dir, &offset, &lgd, ll_nfs_get_name_filldir); + rc = ll_dir_read(dir, op_data, &lgd, ll_nfs_get_name_filldir); mutex_unlock(&dir->i_mutex); - if (!rc && !lgd.lgd_found) - rc = -ENOENT; - EXIT; - + ll_finish_md_op_data(op_data); + if (!rc && !lgd.lgd_found) + rc = -ENOENT; + EXIT; out: - return rc; + return rc; } static struct dentry *ll_fh_to_dentry(struct super_block *sb, struct fid *fid, diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c index ca3faaa..01521ef 100644 --- a/lustre/llite/statahead.c +++ b/lustre/llite/statahead.c @@ -1058,25 +1058,34 @@ static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai) static int ll_statahead_thread(void *arg) { - struct dentry *parent = (struct dentry *)arg; - struct inode *dir = parent->d_inode; - struct ll_inode_info *plli = ll_i2info(dir); - struct ll_inode_info *clli; - struct ll_sb_info *sbi = ll_i2sbi(dir); - struct ll_statahead_info *sai = ll_sai_get(plli->lli_sai); - struct ptlrpc_thread *thread = &sai->sai_thread; - struct ptlrpc_thread *agl_thread = &sai->sai_agl_thread; - struct page *page; - __u64 pos = 0; - int first = 0; - int rc = 0; - struct ll_dir_chain chain; - struct l_wait_info lwi = { 0 }; - ENTRY; + struct dentry *parent = (struct dentry *)arg; + struct inode *dir = parent->d_inode; + struct ll_inode_info *plli = ll_i2info(dir); + struct ll_inode_info *clli; + struct ll_sb_info *sbi = ll_i2sbi(dir); + struct ll_statahead_info *sai = ll_sai_get(plli->lli_sai); + struct ptlrpc_thread *thread = &sai->sai_thread; + struct ptlrpc_thread *agl_thread = &sai->sai_agl_thread; + int first = 0; + int rc = 0; + struct md_op_data *op_data; + struct ll_dir_chain chain; + struct l_wait_info lwi = { 0 }; + struct lu_dirent *ent; + ENTRY; CDEBUG(D_READA, "statahead thread started: [pid %d] [parent %.*s]\n", current_pid(), parent->d_name.len, parent->d_name.name); + op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0, + LUSTRE_OPC_ANY, dir); + if (IS_ERR(op_data)) + RETURN(PTR_ERR(op_data)); + + op_data->op_hash_offset = 0; + op_data->op_max_pages = + ll_i2sbi(dir)->ll_md_brw_size >> PAGE_CACHE_SHIFT; + if (sbi->ll_flags & LL_SBI_AGL_ENABLED) ll_start_agl(parent, sai); @@ -1087,177 +1096,139 @@ static int ll_statahead_thread(void *arg) wake_up(&thread->t_ctl_waitq); ll_dir_chain_init(&chain); - page = ll_get_dir_page(dir, pos, &chain); - - while (1) { - struct lu_dirpage *dp; - struct lu_dirent *ent; - - if (IS_ERR(page)) { - rc = PTR_ERR(page); - CDEBUG(D_READA, "error reading dir "DFID" at "LPU64 - "/"LPU64": [rc %d] [parent %u]\n", - PFID(ll_inode2fid(dir)), pos, sai->sai_index, - rc, plli->lli_opendir_pid); - GOTO(out, rc); - } - - dp = page_address(page); - for (ent = lu_dirent_start(dp); ent != NULL; - ent = lu_dirent_next(ent)) { - __u64 hash; - int namelen; - char *name; - - hash = le64_to_cpu(ent->lde_hash); - if (unlikely(hash < pos)) - /* - * Skip until we find target hash value. - */ - continue; - - namelen = le16_to_cpu(ent->lde_namelen); - if (unlikely(namelen == 0)) - /* - * Skip dummy record. - */ - continue; - - name = ent->lde_name; - if (name[0] == '.') { - if (namelen == 1) { - /* - * skip "." - */ - continue; - } else if (name[1] == '.' && namelen == 2) { - /* - * skip ".." - */ - continue; - } else if (!sai->sai_ls_all) { - /* - * skip hidden files. - */ - sai->sai_skip_hidden++; - continue; - } - } + for (ent = ll_dir_entry_start(dir, op_data); + ent != NULL && !IS_ERR(ent); + ent = ll_dir_entry_next(dir, op_data, ent)) { + __u64 hash; + int namelen; + char *name; + + hash = le64_to_cpu(ent->lde_hash); + if (unlikely(hash < op_data->op_hash_offset)) + /* + * Skip until we find target hash value. + */ + continue; + + namelen = le16_to_cpu(ent->lde_namelen); + if (unlikely(namelen == 0)) + /* + * Skip dummy record. + */ + continue; + + name = ent->lde_name; + if (name[0] == '.') { + if (namelen == 1) { + /* + * skip "." + */ + continue; + } else if (name[1] == '.' && namelen == 2) { + /* + * skip ".." + */ + continue; + } else if (!sai->sai_ls_all) { + /* + * skip hidden files. + */ + sai->sai_skip_hidden++; + continue; + } + } - /* - * don't stat-ahead first entry. - */ - if (unlikely(++first == 1)) - continue; + /* + * don't stat-ahead first entry. + */ + if (unlikely(++first == 1)) + continue; keep_it: - l_wait_event(thread->t_ctl_waitq, - !sa_sent_full(sai) || - !sa_received_empty(sai) || - !agl_list_empty(sai) || - !thread_is_running(thread), - &lwi); + l_wait_event(thread->t_ctl_waitq, + !sa_sent_full(sai) || + !sa_received_empty(sai) || + !agl_list_empty(sai) || + !thread_is_running(thread), + &lwi); interpret_it: - while (!sa_received_empty(sai)) - ll_post_statahead(sai); - - if (unlikely(!thread_is_running(thread))) { - ll_release_page(page, 0); - GOTO(out, rc = 0); - } - - /* If no window for metadata statahead, but there are - * some AGL entries to be triggered, then try to help - * to process the AGL entries. */ - if (sa_sent_full(sai)) { - spin_lock(&plli->lli_agl_lock); - while (!agl_list_empty(sai)) { - clli = agl_first_entry(sai); - cfs_list_del_init(&clli->lli_agl_list); - spin_unlock(&plli->lli_agl_lock); - ll_agl_trigger(&clli->lli_vfs_inode, - sai); - - if (!sa_received_empty(sai)) - goto interpret_it; - - if (unlikely( - !thread_is_running(thread))) { - ll_release_page(page, 0); - GOTO(out, rc = 0); - } - - if (!sa_sent_full(sai)) - goto do_it; - - spin_lock(&plli->lli_agl_lock); - } - spin_unlock(&plli->lli_agl_lock); - - goto keep_it; - } + while (!sa_received_empty(sai)) + ll_post_statahead(sai); -do_it: - ll_statahead_one(parent, name, namelen); - } - pos = le64_to_cpu(dp->ldp_hash_end); - if (pos == MDS_DIR_END_OFF) { - /* - * End of directory reached. - */ - ll_release_page(page, 0); - while (1) { - l_wait_event(thread->t_ctl_waitq, - !sa_received_empty(sai) || - sai->sai_sent == sai->sai_replied|| - !thread_is_running(thread), - &lwi); - - while (!sa_received_empty(sai)) - ll_post_statahead(sai); - - if (unlikely(!thread_is_running(thread))) - GOTO(out, rc = 0); - - if (sai->sai_sent == sai->sai_replied && - sa_received_empty(sai)) - break; - } + if (unlikely(!thread_is_running(thread))) + GOTO(out, rc = 0); + /* If no window for metadata statahead, but there are + * some AGL entries to be triggered, then try to help + * to process the AGL entries. */ + if (sa_sent_full(sai)) { spin_lock(&plli->lli_agl_lock); - while (!agl_list_empty(sai) && - thread_is_running(thread)) { + while (!agl_list_empty(sai)) { clli = agl_first_entry(sai); cfs_list_del_init(&clli->lli_agl_list); spin_unlock(&plli->lli_agl_lock); - ll_agl_trigger(&clli->lli_vfs_inode, sai); + ll_agl_trigger(&clli->lli_vfs_inode, + sai); + + if (!sa_received_empty(sai)) + goto interpret_it; + + if (unlikely( + !thread_is_running(thread))) + GOTO(out, rc = 0); + + if (!sa_sent_full(sai)) + goto do_it; + spin_lock(&plli->lli_agl_lock); } spin_unlock(&plli->lli_agl_lock); - GOTO(out, rc = 0); - } else if (1) { - /* - * chain is exhausted. - * Normal case: continue to the next page. - */ - ll_release_page(page, le32_to_cpu(dp->ldp_flags) & - LDF_COLLIDE); - sai->sai_in_readpage = 1; - page = ll_get_dir_page(dir, pos, &chain); - sai->sai_in_readpage = 0; - } else { - LASSERT(le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE); - ll_release_page(page, 1); - /* - * go into overflow page. - */ - } - } - EXIT; + goto keep_it; + } + +do_it: + ll_statahead_one(parent, name, namelen); + } + + if (ent != NULL && !IS_ERR(ent)) + ll_dir_entry_end(dir, op_data, ent); + + /* + * End of directory reached. + */ + while (1) { + l_wait_event(thread->t_ctl_waitq, + !sa_received_empty(sai) || + sai->sai_sent == sai->sai_replied || + !thread_is_running(thread), + &lwi); + + while (!sa_received_empty(sai)) + ll_post_statahead(sai); + if (unlikely(!thread_is_running(thread))) + GOTO(out, rc = 0); + + if (sai->sai_sent == sai->sai_replied && + sa_received_empty(sai)) + break; + } + + spin_lock(&plli->lli_agl_lock); + while (!agl_list_empty(sai) && + thread_is_running(thread)) { + clli = agl_first_entry(sai); + cfs_list_del_init(&clli->lli_agl_list); + spin_unlock(&plli->lli_agl_lock); + ll_agl_trigger(&clli->lli_vfs_inode, sai); + spin_lock(&plli->lli_agl_lock); + } + spin_unlock(&plli->lli_agl_lock); out: + EXIT; + ll_finish_md_op_data(op_data); if (sai->sai_agl_valid) { spin_lock(&plli->lli_agl_lock); thread_set_flags(agl_thread, SVC_STOPPING); @@ -1361,115 +1332,89 @@ enum { static int is_first_dirent(struct inode *dir, struct dentry *dentry) { - struct ll_dir_chain chain; - struct qstr *target = &dentry->d_name; - struct page *page; - __u64 pos = 0; - int dot_de; - int rc = LS_NONE_FIRST_DE; - ENTRY; - - ll_dir_chain_init(&chain); - page = ll_get_dir_page(dir, pos, &chain); - - while (1) { - struct lu_dirpage *dp; - struct lu_dirent *ent; - - if (IS_ERR(page)) { - struct ll_inode_info *lli = ll_i2info(dir); - - rc = PTR_ERR(page); - CERROR("error reading dir "DFID" at "LPU64": " - "[rc %d] [parent %u]\n", - PFID(ll_inode2fid(dir)), pos, - rc, lli->lli_opendir_pid); - break; - } + struct ll_dir_chain chain; + struct qstr *target = &dentry->d_name; + struct md_op_data *op_data; + int dot_de; + struct lu_dirent *ent; + int rc = LS_NONE_FIRST_DE; + ENTRY; - dp = page_address(page); - for (ent = lu_dirent_start(dp); ent != NULL; - ent = lu_dirent_next(ent)) { - __u64 hash; - int namelen; - char *name; - - hash = le64_to_cpu(ent->lde_hash); - /* The ll_get_dir_page() can return any page containing - * the given hash which may be not the start hash. */ - if (unlikely(hash < pos)) - continue; - - namelen = le16_to_cpu(ent->lde_namelen); - if (unlikely(namelen == 0)) - /* - * skip dummy record. - */ - continue; + ll_dir_chain_init(&chain); - name = ent->lde_name; - if (name[0] == '.') { - if (namelen == 1) - /* - * skip "." - */ - continue; - else if (name[1] == '.' && namelen == 2) - /* - * skip ".." - */ - continue; - else - dot_de = 1; - } else { - dot_de = 0; - } + op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0, + LUSTRE_OPC_ANY, dir); + if (IS_ERR(op_data)) + GOTO(out, rc = PTR_ERR(op_data)); + /** + *FIXME choose the start offset of the readdir + */ + op_data->op_stripe_offset = 0; + op_data->op_hash_offset = 0; + op_data->op_max_pages = + ll_i2sbi(dir)->ll_md_brw_size >> PAGE_CACHE_SHIFT; + + for (ent = ll_dir_entry_start(dir, op_data); + ent != NULL && !IS_ERR(ent); + ent = ll_dir_entry_next(dir, op_data, ent)) { + __u64 hash; + int namelen; + char *name; + + hash = le64_to_cpu(ent->lde_hash); + /* The ll_get_dir_page() can return any page containing + * the given hash which may be not the start hash. */ + if (unlikely(hash < op_data->op_hash_offset)) + continue; + + namelen = le16_to_cpu(ent->lde_namelen); + if (unlikely(namelen == 0)) + /* + * skip dummy record. + */ + continue; + + name = ent->lde_name; + if (name[0] == '.') { + if (namelen == 1) + /* + * skip "." + */ + continue; + else if (name[1] == '.' && namelen == 2) + /* + * skip ".." + */ + continue; + else + dot_de = 1; + } else { + dot_de = 0; + } - if (dot_de && target->name[0] != '.') { - CDEBUG(D_READA, "%.*s skip hidden file %.*s\n", - target->len, target->name, - namelen, name); - continue; - } + if (dot_de && target->name[0] != '.') { + CDEBUG(D_READA, "%.*s skip hidden file %.*s\n", + target->len, target->name, + namelen, name); + continue; + } - if (target->len != namelen || - memcmp(target->name, name, namelen) != 0) - rc = LS_NONE_FIRST_DE; - else if (!dot_de) - rc = LS_FIRST_DE; - else - rc = LS_FIRST_DOT_DE; + if (target->len != namelen || + memcmp(target->name, name, namelen) != 0) + rc = LS_NONE_FIRST_DE; + else if (!dot_de) + rc = LS_FIRST_DE; + else + rc = LS_FIRST_DOT_DE; - ll_release_page(page, 0); - GOTO(out, rc); - } - pos = le64_to_cpu(dp->ldp_hash_end); - if (pos == MDS_DIR_END_OFF) { - /* - * End of directory reached. - */ - ll_release_page(page, 0); - break; - } else if (1) { - /* - * chain is exhausted - * Normal case: continue to the next page. - */ - ll_release_page(page, le32_to_cpu(dp->ldp_flags) & - LDF_COLLIDE); - page = ll_get_dir_page(dir, pos, &chain); - } else { - /* - * go into overflow page. - */ - LASSERT(le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE); - ll_release_page(page, 1); - } - } + break; + } EXIT; - + if (ent != NULL && !IS_ERR(ent)) + ll_dir_entry_end(dir, op_data, ent); + ll_finish_md_op_data(op_data); out: - ll_dir_chain_fini(&chain); + ll_dir_chain_fini(&chain); return rc; } diff --git a/lustre/lmv/lmv_internal.h b/lustre/lmv/lmv_internal.h index 1d027d7..de07e16 100644 --- a/lustre/lmv/lmv_internal.h +++ b/lustre/lmv/lmv_internal.h @@ -39,6 +39,7 @@ #include #include +#include #define LMV_MAX_TGT_COUNT 128 @@ -77,39 +78,6 @@ int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid, int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid, struct md_op_data *op_data); -static inline struct lmv_stripe_md *lmv_get_mea(struct ptlrpc_request *req) -{ - struct mdt_body *body; - struct lmv_stripe_md *mea; - - LASSERT(req != NULL); - - body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); - - if (!body || !S_ISDIR(body->mode) || !body->eadatasize) - return NULL; - - mea = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, - body->eadatasize); - LASSERT(mea != NULL); - - if (mea->mea_count == 0) - return NULL; - if( mea->mea_magic != MEA_MAGIC_LAST_CHAR && - mea->mea_magic != MEA_MAGIC_ALL_CHARS && - mea->mea_magic != MEA_MAGIC_HASH_SEGMENT) - return NULL; - - return mea; -} - -static inline int lmv_get_easize(struct lmv_obd *lmv) -{ - return sizeof(struct lmv_stripe_md) + - lmv->desc.ld_tgt_count * - sizeof(struct lu_fid); -} - static inline struct lmv_tgt_desc * lmv_get_target(struct lmv_obd *lmv, mdsno_t mds) { @@ -142,6 +110,42 @@ lmv_find_target(struct lmv_obd *lmv, const struct lu_fid *fid) return lmv_get_target(lmv, mds); } +static inline unsigned int +mea_last_char_hash(unsigned int count, const char *name, int namelen) +{ + unsigned int c; + + c = name[namelen - 1]; + if (c == 0) + CWARN("invalid name %.*s\n", namelen, name); + + c = c % count; + + return c; +} + +static inline unsigned int +mea_all_chars_hash(unsigned int count, const char *name, int namelen) +{ + unsigned int c = 0; + + while (--namelen >= 0) + c += name[namelen]; + + c = c % count; + + return c; +} + +static inline int lmv_stripe_md_size(int stripe_count) +{ + struct lmv_stripe_md *lsm; + + return sizeof(*lsm) + stripe_count * sizeof(lsm->lsm_md_oinfo[0]); +} + +int raw_name2idx(int hashtype, int count, const char *name, int namelen); + struct lmv_tgt_desc *lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data, struct lu_fid *fid); diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index 888a86f..2d24c6e 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -54,11 +54,49 @@ #include #include #include +#include #include +#include +#include #include #include #include "lmv_internal.h" +int raw_name2idx(int hashtype, int count, const char *name, int namelen) +{ + unsigned int c = 0; + int idx; + + LASSERT(namelen > 0); + + if (filename_is_volatile(name, namelen, &idx)) { + if (idx >= 0 && idx < count) + return idx; + goto choose_hash; + } + + if (count <= 1) + return 0; + +choose_hash: + switch (hashtype) { + case MEA_MAGIC_LAST_CHAR: + c = mea_last_char_hash(count, name, namelen); + break; + case MEA_MAGIC_ALL_CHARS: + c = mea_all_chars_hash(count, name, namelen); + break; + case MEA_MAGIC_HASH_SEGMENT: + CERROR("Unsupported hash type MEA_MAGIC_HASH_SEGMENT\n"); + break; + default: + CERROR("Unknown hash type 0x%x\n", hashtype); + } + + LASSERT(c < count); + return c; +} + static void lmv_activate_target(struct lmv_obd *lmv, struct lmv_tgt_desc *tgt, int activate) @@ -605,13 +643,13 @@ int lmv_check_connect(struct obd_device *obd) GOTO(out_disc, rc); } - lmv_set_timeouts(obd); - class_export_put(lmv->exp); - lmv->connected = 1; - easize = lmv_get_easize(lmv); - lmv_init_ea_size(obd->obd_self_export, easize, 0, 0); - lmv_init_unlock(lmv); - RETURN(0); + lmv_set_timeouts(obd); + class_export_put(lmv->exp); + lmv->connected = 1; + easize = lmv_mds_md_size(lmv->desc.ld_tgt_count, LMV_MAGIC); + lmv_init_ea_size(obd->obd_self_export, easize, 0, 0); + lmv_init_unlock(lmv); + RETURN(0); out_disc: while (i-- > 0) { @@ -2248,43 +2286,76 @@ static void lmv_adjust_dirpages(struct page **pages, int ncfspgs, int nlupgs) #define lmv_adjust_dirpages(pages, ncfspgs, nlupgs) do {} while (0) #endif /* PAGE_CACHE_SIZE > LU_PAGE_SIZE */ -static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data, - struct page **pages, struct ptlrpc_request **request) +#define NORMAL_MAX_STRIPES 4 +int lmv_read_entry(struct obd_export *exp, struct md_op_data *op_data, + struct md_callback *cb_op, struct lu_dirent **ldp) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - __u64 offset = op_data->op_offset; + struct lmv_stripe_md *lsm = op_data->op_mea1; + struct lu_dirent *tmp_ents[NORMAL_MAX_STRIPES]; + struct lu_dirent **ents = NULL; + int stripe_count; + __u64 min_hash; + int min_idx = 0; + int i; int rc; - int ncfspgs; /* pages read in PAGE_CACHE_SIZE */ - int nlupgs; /* pages read in LU_PAGE_SIZE */ - struct lmv_tgt_desc *tgt; ENTRY; rc = lmv_check_connect(obd); if (rc) RETURN(rc); - CDEBUG(D_INODE, "READPAGE at "LPX64" from "DFID"\n", - offset, PFID(&op_data->op_fid1)); + if (lsm == NULL) + stripe_count = 1; + else + stripe_count = lsm->lsm_md_stripe_count; - tgt = lmv_find_target(lmv, &op_data->op_fid1); - if (IS_ERR(tgt)) - RETURN(PTR_ERR(tgt)); + if (stripe_count > NORMAL_MAX_STRIPES) { + OBD_ALLOC(ents, sizeof(ents[0]) * stripe_count); + if (ents == NULL) + GOTO(out, rc = -ENOMEM); + } else { + ents = tmp_ents; + memset(ents, 0, sizeof(ents[0]) * stripe_count); + } - rc = md_readpage(tgt->ltd_exp, op_data, pages, request); - if (rc != 0) - RETURN(rc); + min_hash = MDS_DIR_END_OFF; + for (i = 0; i < stripe_count; i++) { + struct lmv_tgt_desc *tgt; + + if (likely(lsm == NULL)) { + tgt = lmv_find_target(lmv, &op_data->op_fid1); + if (IS_ERR(tgt)) + GOTO(out, rc = PTR_ERR(tgt)); + LASSERT(op_data->op_data != NULL); + } else { + tgt = lmv_get_target(lmv, lsm->lsm_md_oinfo[i].lmo_mds); + if (IS_ERR(tgt)) + GOTO(out, rc = PTR_ERR(tgt)); + op_data->op_fid1 = lsm->lsm_md_oinfo[i].lmo_fid; + op_data->op_fid2 = lsm->lsm_md_oinfo[i].lmo_fid; + op_data->op_stripe_offset = i; + } - ncfspgs = ((*request)->rq_bulk->bd_nob_transferred + - PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT; - nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT; - LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK)); - LASSERT(ncfspgs > 0 && ncfspgs <= op_data->op_npages); + rc = md_read_entry(tgt->ltd_exp, op_data, cb_op, &ents[i]); + if (rc != 0) + GOTO(out, rc); - CDEBUG(D_INODE, "read %d(%d)/%d pages\n", ncfspgs, nlupgs, - op_data->op_npages); + if (ents[i] != NULL && + le64_to_cpu(ents[i]->lde_hash) <= min_hash) { + min_hash = le64_to_cpu(ents[i]->lde_hash); + min_idx = i; + } + } - lmv_adjust_dirpages(pages, ncfspgs, nlupgs); + if (min_hash != MDS_DIR_END_OFF) + *ldp = ents[min_idx]; + else + *ldp = NULL; +out: + if (stripe_count > NORMAL_MAX_STRIPES && ents != NULL) + OBD_FREE(ents, sizeof(ents[0]) * stripe_count); RETURN(rc); } @@ -2498,108 +2569,241 @@ int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp, RETURN(-EINVAL); } -int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp, - struct lov_stripe_md *lsm) +static int lmv_pack_md_v1(const struct lmv_stripe_md *lsm, + struct lmv_mds_md_v1 *lmm1) { - struct obd_device *obd = class_exp2obd(exp); - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_stripe_md *meap; - struct lmv_stripe_md *lsmp; - int mea_size; - __u32 i; - ENTRY; + int cplen; + int i; - mea_size = lmv_get_easize(lmv); - if (!lmmp) - RETURN(mea_size); + lmm1->lmv_magic = cpu_to_le32(lsm->lsm_md_magic); + lmm1->lmv_stripe_count = cpu_to_le32(lsm->lsm_md_stripe_count); + lmm1->lmv_master_mdt_index = cpu_to_le32(lsm->lsm_md_master_mdt_index); + lmm1->lmv_hash_type = cpu_to_le32(lsm->lsm_md_hash_type); + cplen = strlcpy(lmm1->lmv_pool_name, lsm->lsm_md_pool_name, + sizeof(lmm1->lmv_pool_name)); + if (cplen >= sizeof(lmm1->lmv_pool_name)) + return -E2BIG; - if (*lmmp && !lsm) { - OBD_FREE_LARGE(*lmmp, mea_size); - *lmmp = NULL; - RETURN(0); - } + for (i = 0; i < lsm->lsm_md_stripe_count; i++) + fid_cpu_to_le(&lmm1->lmv_stripe_fids[i], + &lsm->lsm_md_oinfo[i].lmo_fid); + return 0; +} - if (*lmmp == NULL) { - OBD_ALLOC_LARGE(*lmmp, mea_size); - if (*lmmp == NULL) - RETURN(-ENOMEM); - } +int lmv_pack_md(union lmv_mds_md **lmmp, const struct lmv_stripe_md *lsm, + int stripe_count) +{ + int lmm_size = 0; + bool allocated = false; + int rc = 0; + ENTRY; - if (!lsm) - RETURN(mea_size); + LASSERT(lmmp != NULL); + /* Free lmm */ + if (*lmmp != NULL && lsm == NULL) { + int stripe_count; - lsmp = (struct lmv_stripe_md *)lsm; - meap = (struct lmv_stripe_md *)*lmmp; + stripe_count = lmv_mds_md_stripe_count_get(*lmmp); + lmm_size = lmv_mds_md_size(stripe_count, + le32_to_cpu((*lmmp)->lmv_magic)); + if (lmm_size == 0) + RETURN(-EINVAL); + OBD_FREE(*lmmp, lmm_size); + *lmmp = NULL; + RETURN(0); + } - if (lsmp->mea_magic != MEA_MAGIC_LAST_CHAR && - lsmp->mea_magic != MEA_MAGIC_ALL_CHARS) - RETURN(-EINVAL); + /* Alloc lmm */ + if (*lmmp == NULL && lsm == NULL) { + lmm_size = lmv_mds_md_size(stripe_count, LMV_MAGIC); + LASSERT(lmm_size > 0); + OBD_ALLOC(*lmmp, lmm_size); + if (*lmmp == NULL) + RETURN(-ENOMEM); + lmv_mds_md_stripe_count_set(*lmmp, stripe_count); + (*lmmp)->lmv_magic = cpu_to_le32(LMV_MAGIC); + RETURN(lmm_size); + } - meap->mea_magic = cpu_to_le32(lsmp->mea_magic); - meap->mea_count = cpu_to_le32(lsmp->mea_count); - meap->mea_master = cpu_to_le32(lsmp->mea_master); + /* pack lmm */ + LASSERT(lsm != NULL); + lmm_size = lmv_mds_md_size(lsm->lsm_md_stripe_count, lsm->lsm_md_magic); + if (*lmmp == NULL) { + OBD_ALLOC(*lmmp, lmm_size); + if (*lmmp == NULL) + RETURN(-ENOMEM); + allocated = true; + } - for (i = 0; i < lmv->desc.ld_tgt_count; i++) { - meap->mea_ids[i] = lsmp->mea_ids[i]; - fid_cpu_to_le(&meap->mea_ids[i], &lsmp->mea_ids[i]); + switch (lsm->lsm_md_magic) { + case LMV_MAGIC_V1: + rc = lmv_pack_md_v1(lsm, &(*lmmp)->lmv_md_v1); + break; + default: + rc = -EINVAL; + break; } - RETURN(mea_size); + if (rc != 0 && allocated) { + OBD_FREE(*lmmp, lmm_size); + *lmmp = NULL; + } + + RETURN(lmm_size); } +EXPORT_SYMBOL(lmv_pack_md); -int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, - struct lov_mds_md *lmm, int lmm_size) +static int lmv_unpack_md_v1(struct obd_export *exp, struct lmv_stripe_md *lsm, + const struct lmv_mds_md_v1 *lmm1) { - struct obd_device *obd = class_exp2obd(exp); - struct lmv_stripe_md **tmea = (struct lmv_stripe_md **)lsmp; - struct lmv_stripe_md *mea = (struct lmv_stripe_md *)lmm; - struct lmv_obd *lmv = &obd->u.lmv; - int mea_size; - __u32 i; - __u32 magic; - ENTRY; + struct lmv_obd *lmv = &exp->exp_obd->u.lmv; + int stripe_count; + int cplen; + int i; + int rc = 0; + ENTRY; - mea_size = lmv_get_easize(lmv); - if (lsmp == NULL) - return mea_size; + lsm->lsm_md_magic = le32_to_cpu(lmm1->lmv_magic); + lsm->lsm_md_stripe_count = le32_to_cpu(lmm1->lmv_stripe_count); + lsm->lsm_md_master_mdt_index = le32_to_cpu(lmm1->lmv_master_mdt_index); + lsm->lsm_md_hash_type = le32_to_cpu(lmm1->lmv_hash_type); + lsm->lsm_md_layout_version = le32_to_cpu(lmm1->lmv_layout_version); + cplen = strlcpy(lsm->lsm_md_pool_name, lmm1->lmv_pool_name, + sizeof(lsm->lsm_md_pool_name)); + + if (cplen >= sizeof(lsm->lsm_md_pool_name)) + RETURN(-E2BIG); + + CDEBUG(D_INFO, "unpack lsm count %d, master %d hash_type %d" + "layout_version %d\n", lsm->lsm_md_stripe_count, + lsm->lsm_md_master_mdt_index, lsm->lsm_md_hash_type, + lsm->lsm_md_layout_version); + + stripe_count = le32_to_cpu(lmm1->lmv_stripe_count); + for (i = 0; i < le32_to_cpu(stripe_count); i++) { + fid_le_to_cpu(&lsm->lsm_md_oinfo[i].lmo_fid, + &lmm1->lmv_stripe_fids[i]); + rc = lmv_fld_lookup(lmv, &lsm->lsm_md_oinfo[i].lmo_fid, + &lsm->lsm_md_oinfo[i].lmo_mds); + if (rc != 0) + RETURN(rc); + CDEBUG(D_INFO, "unpack fid #%d "DFID"\n", i, + PFID(&lsm->lsm_md_oinfo[i].lmo_fid)); + } - if (*lsmp != NULL && lmm == NULL) { - OBD_FREE_LARGE(*tmea, mea_size); - *lsmp = NULL; - RETURN(0); - } + RETURN(rc); +} - LASSERT(mea_size == lmm_size); +int lmv_unpack_md(struct obd_export *exp, struct lmv_stripe_md **lsmp, + const union lmv_mds_md *lmm, int stripe_count) +{ + struct lmv_stripe_md *lsm; + int lsm_size; + int rc; + bool allocated = false; + ENTRY; - OBD_ALLOC_LARGE(*tmea, mea_size); - if (*tmea == NULL) - RETURN(-ENOMEM); + LASSERT(lsmp != NULL); - if (!lmm) - RETURN(mea_size); + lsm = *lsmp; + /* Free memmd */ + if (lsm != NULL && lmm == NULL) { +#ifdef __KERNEL__ + int i; + for (i = 1; i < lsm->lsm_md_stripe_count; i++) { + if (lsm->lsm_md_oinfo[i].lmo_root != NULL) + iput(lsm->lsm_md_oinfo[i].lmo_root); + } +#endif + lsm_size = lmv_stripe_md_size(lsm->lsm_md_stripe_count); + OBD_FREE(lsm, lsm_size); + *lsmp = NULL; + RETURN(0); + } - if (mea->mea_magic == MEA_MAGIC_LAST_CHAR || - mea->mea_magic == MEA_MAGIC_ALL_CHARS || - mea->mea_magic == MEA_MAGIC_HASH_SEGMENT) - { - magic = le32_to_cpu(mea->mea_magic); - } else { - /* - * Old mea is not handled here. - */ - CERROR("Old not supportable EA is found\n"); - LBUG(); - } + /* Alloc memmd */ + if (lsm == NULL && lmm == NULL) { + lsm_size = lmv_stripe_md_size(stripe_count); + OBD_ALLOC(lsm, lsm_size); + if (lsm == NULL) + RETURN(-ENOMEM); + lsm->lsm_md_stripe_count = stripe_count; + *lsmp = lsm; + RETURN(0); + } - (*tmea)->mea_magic = magic; - (*tmea)->mea_count = le32_to_cpu(mea->mea_count); - (*tmea)->mea_master = le32_to_cpu(mea->mea_master); + /* Unpack memmd */ + if (le32_to_cpu(lmm->lmv_magic) != LMV_MAGIC_V1) { + CERROR("%s: invalid magic %x.\n", exp->exp_obd->obd_name, + le32_to_cpu(lmm->lmv_magic)); + RETURN(-EINVAL); + } - for (i = 0; i < (*tmea)->mea_count; i++) { - (*tmea)->mea_ids[i] = mea->mea_ids[i]; - fid_le_to_cpu(&(*tmea)->mea_ids[i], &(*tmea)->mea_ids[i]); - } - RETURN(mea_size); + lsm_size = lmv_stripe_md_size(lmv_mds_md_stripe_count_get(lmm)); + if (lsm == NULL) { + OBD_ALLOC(lsm, lsm_size); + if (lsm == NULL) + RETURN(-ENOMEM); + allocated = true; + *lsmp = lsm; + } + + switch (le32_to_cpu(lmm->lmv_magic)) { + case LMV_MAGIC_V1: + rc = lmv_unpack_md_v1(exp, lsm, &lmm->lmv_md_v1); + break; + default: + CERROR("%s: unrecognized magic %x\n", exp->exp_obd->obd_name, + le32_to_cpu(lmm->lmv_magic)); + rc = -EINVAL; + break; + } + + if (rc != 0 && allocated) { + OBD_FREE(lsm, lsm_size); + *lsmp = NULL; + lsm_size = rc; + } + RETURN(lsm_size); +} + +int lmv_alloc_memmd(struct lmv_stripe_md **lsmp, int stripes) +{ + return lmv_unpack_md(NULL, lsmp, NULL, stripes); +} +EXPORT_SYMBOL(lmv_alloc_memmd); + +void lmv_free_memmd(struct lmv_stripe_md *lsm) +{ + lmv_unpack_md(NULL, &lsm, NULL, 0); +} +EXPORT_SYMBOL(lmv_free_memmd); + +int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, + struct lov_mds_md *lmm, int disk_len) +{ + return lmv_unpack_md(exp, (struct lmv_stripe_md **)lsmp, + (union lmv_mds_md *)lmm, disk_len); +} + +int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp, + struct lov_stripe_md *lsm) +{ + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv_obd = &obd->u.lmv; + const struct lmv_stripe_md *lmv = (struct lmv_stripe_md *)lsm; + int stripe_count; + + if (lmmp == NULL) { + if (lsm != NULL) + stripe_count = lmv->lsm_md_stripe_count; + else + stripe_count = lmv_obd->desc.ld_tgt_count; + + return lmv_mds_md_size(stripe_count, LMV_MAGIC_V1); + } + + return lmv_pack_md((union lmv_mds_md **)lmmp, lmv, 0); } static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid, @@ -2681,12 +2885,13 @@ int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, struct obd_export *dt_exp, struct obd_export *md_exp, struct lustre_md *md) { - struct lmv_obd *lmv = &exp->exp_obd->u.lmv; + struct lmv_obd *lmv = &exp->exp_obd->u.lmv; struct lmv_tgt_desc *tgt = lmv->tgts[0]; if (tgt == NULL || tgt->ltd_exp == NULL) RETURN(-EINVAL); - return md_get_lustre_md(tgt->ltd_exp, req, dt_exp, md_exp, md); + + return md_get_lustre_md(lmv->tgts[0]->ltd_exp, req, dt_exp, md_exp, md); } int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md) @@ -2696,11 +2901,11 @@ int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md) struct lmv_tgt_desc *tgt = lmv->tgts[0]; ENTRY; - if (md->mea) - obd_free_memmd(exp, (void *)&md->mea); + if (md->lmv != NULL) + lmv_free_memmd(md->lmv); if (tgt == NULL || tgt->ltd_exp == NULL) RETURN(-EINVAL); - RETURN(md_free_lustre_md(tgt->ltd_exp, md)); + RETURN(md_free_lustre_md(lmv->tgts[0]->ltd_exp, md)); } int lmv_set_open_replay_data(struct obd_export *exp, @@ -2949,7 +3154,7 @@ struct md_ops lmv_md_ops = { .m_setattr = lmv_setattr, .m_setxattr = lmv_setxattr, .m_fsync = lmv_fsync, - .m_readpage = lmv_readpage, + .m_read_entry = lmv_read_entry, .m_unlink = lmv_unlink, .m_init_ea_size = lmv_init_ea_size, .m_cancel_unused = lmv_cancel_unused, diff --git a/lustre/mdc/mdc_internal.h b/lustre/mdc/mdc_internal.h index c87d86f..cc2816b 100644 --- a/lustre/mdc/mdc_internal.h +++ b/lustre/mdc/mdc_internal.h @@ -173,4 +173,17 @@ static inline int mdc_prep_elc_req(struct obd_export *exp, count); } +static inline unsigned long hash_x_index(__u64 hash, int hash64) +{ +#ifdef __KERNEL__ + if (BITS_PER_LONG == 32 && hash64) + hash >>= 32; +#endif + /* save hash 0 as index 0 because otherwise we'll save it at + * page index end (~0UL) and it causes truncate_inode_pages_range() + * to loop forever. + */ + return ~0UL - (hash + !hash); +} + #endif diff --git a/lustre/mdc/mdc_lib.c b/lustre/mdc/mdc_lib.c index e195e1d..544e816 100644 --- a/lustre/mdc/mdc_lib.c +++ b/lustre/mdc/mdc_lib.c @@ -41,6 +41,9 @@ #endif #include #include +#include +#include +#include #include "mdc_internal.h" #ifndef __KERNEL__ diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 63cf4e3..a54cf07 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -971,6 +971,9 @@ static int mdc_finish_intent_lock(struct obd_export *exp, LASSERT(request != LP_POISON); LASSERT(request->rq_repmsg != LP_POISON); + if (it->it_op & IT_READDIR) + RETURN(0); + if (!it_disposition(it, DISP_IT_EXECD)) { /* The server failed before it even started executing the * intent, i.e. because it couldn't unpack the request. */ @@ -1101,6 +1104,9 @@ int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM; break; + case IT_READDIR: + policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; + break; case IT_LAYOUT: policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT; break; @@ -1176,19 +1182,19 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags); - lockh.cookie = 0; - if (fid_is_sane(&op_data->op_fid2) && - (it->it_op & (IT_LOOKUP | IT_GETATTR))) { - /* We could just return 1 immediately, but since we should only - * be called in revalidate_it if we already have a lock, let's - * verify that. */ - it->d.lustre.it_lock_handle = 0; - rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL); - /* Only return failure if it was not GETATTR by cfid - (from inode_revalidate) */ - if (rc || op_data->op_namelen != 0) - RETURN(rc); - } + lockh.cookie = 0; + if (fid_is_sane(&op_data->op_fid2) && + (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) { + /* We could just return 1 immediately, but since we should only + * be called in revalidate_it if we already have a lock, let's + * verify that. */ + it->d.lustre.it_lock_handle = 0; + rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL); + /* Only return failure if it was not GETATTR by cfid + (from inode_revalidate) */ + if (rc || op_data->op_namelen != 0) + RETURN(rc); + } /* For case if upper layer did not alloc fid, do it now. */ if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) { diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 4288f0e..fcc2217 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -48,10 +48,13 @@ #include #include +#include #include #include #include #include +#include +#include #include "mdc_internal.h" @@ -569,25 +572,25 @@ int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, "but eadatasize 0\n"); RETURN(-EPROTO); } - if (md->body->valid & OBD_MD_MEA) { - lmvsize = md->body->eadatasize; - lmv = req_capsule_server_sized_get(pill, &RMF_MDT_MD, - lmvsize); - if (!lmv) - GOTO(out, rc = -EPROTO); - - rc = obd_unpackmd(md_exp, (void *)&md->mea, lmv, - lmvsize); - if (rc < 0) - GOTO(out, rc); - - if (rc < sizeof(*md->mea)) { - CDEBUG(D_INFO, "size too small: " - "rc < sizeof(*md->mea) (%d < %d)\n", - rc, (int)sizeof(*md->mea)); - GOTO(out, rc = -EPROTO); - } - } + if (md->body->valid & OBD_MD_MEA) { + lmvsize = md->body->eadatasize; + lmv = req_capsule_server_sized_get(pill, &RMF_MDT_MD, + lmvsize); + if (!lmv) + GOTO(out, rc = -EPROTO); + + rc = obd_unpackmd(md_exp, (void *)&md->lmv, lmv, + lmvsize); + if (rc < 0) + GOTO(out, rc); + + if (rc < sizeof(*md->lmv)) { + CDEBUG(D_INFO, "size too small: " + "rc < sizeof(*md->lmv) (%d < %d)\n", + rc, (int)sizeof(*md->lmv)); + GOTO(out, rc = -EPROTO); + } + } } rc = 0; @@ -1094,8 +1097,10 @@ out: EXPORT_SYMBOL(mdc_sendpage); #endif -int mdc_readpage(struct obd_export *exp, struct md_op_data *op_data, - struct page **pages, struct ptlrpc_request **request) +static int mdc_getpage(struct obd_export *exp, const struct lu_fid *fid, + __u64 offset, struct obd_capa *oc, + struct page **pages, int npages, + struct ptlrpc_request **request) { struct ptlrpc_request *req; struct ptlrpc_bulk_desc *desc; @@ -1110,73 +1115,666 @@ int mdc_readpage(struct obd_export *exp, struct md_op_data *op_data, init_waitqueue_head(&waitq); restart_bulk: - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_READPAGE); - if (req == NULL) - RETURN(-ENOMEM); + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_READPAGE); + if (req == NULL) + RETURN(-ENOMEM); - mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); + mdc_set_capa_size(req, &RMF_CAPA1, oc); - rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_READPAGE); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_READPAGE); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } - req->rq_request_portal = MDS_READPAGE_PORTAL; - ptlrpc_at_set_req_timeout(req); + req->rq_request_portal = MDS_READPAGE_PORTAL; + ptlrpc_at_set_req_timeout(req); - desc = ptlrpc_prep_bulk_imp(req, op_data->op_npages, 1, BULK_PUT_SINK, + desc = ptlrpc_prep_bulk_imp(req, npages, 1, BULK_PUT_SINK, MDS_BULK_PORTAL); - if (desc == NULL) { - ptlrpc_request_free(req); - RETURN(-ENOMEM); - } + if (desc == NULL) { + ptlrpc_request_free(req); + RETURN(-ENOMEM); + } - /* NB req now owns desc and will free it when it gets freed */ - for (i = 0; i < op_data->op_npages; i++) + /* NB req now owns desc and will free it when it gets freed */ + for (i = 0; i < npages; i++) ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE); - mdc_readdir_pack(req, op_data->op_offset, - PAGE_CACHE_SIZE * op_data->op_npages, - &op_data->op_fid1, op_data->op_capa1); + mdc_readdir_pack(req, offset, PAGE_CACHE_SIZE * npages, fid, oc); - ptlrpc_request_set_replen(req); - rc = ptlrpc_queue_wait(req); - if (rc) { - ptlrpc_req_finished(req); - if (rc != -ETIMEDOUT) - RETURN(rc); + ptlrpc_request_set_replen(req); + rc = ptlrpc_queue_wait(req); + if (rc) { + ptlrpc_req_finished(req); + if (rc != -ETIMEDOUT) + RETURN(rc); - resends++; - if (!client_should_resend(resends, &exp->exp_obd->u.cli)) { - CERROR("too many resend retries, returning error\n"); - RETURN(-EIO); - } - lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL); - l_wait_event(waitq, 0, &lwi); + resends++; + if (!client_should_resend(resends, &exp->exp_obd->u.cli)) { + CERROR("%s: too many resend retries: rc = %d\n", + exp->exp_obd->obd_name, -EIO); + RETURN(-EIO); + } + lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, + NULL); + l_wait_event(waitq, 0, &lwi); - goto restart_bulk; - } + goto restart_bulk; + } - rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, - req->rq_bulk->bd_nob_transferred); - if (rc < 0) { - ptlrpc_req_finished(req); - RETURN(rc); - } + rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, + req->rq_bulk->bd_nob_transferred); + if (rc < 0) { + ptlrpc_req_finished(req); + RETURN(rc); + } - if (req->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK) { - CERROR("Unexpected # bytes transferred: %d (%ld expected)\n", - req->rq_bulk->bd_nob_transferred, - PAGE_CACHE_SIZE * op_data->op_npages); - ptlrpc_req_finished(req); - RETURN(-EPROTO); - } + if (req->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK) { + CERROR("%s: unexpected bytes transferred: %d (%ld expected)\n", + exp->exp_obd->obd_name, req->rq_bulk->bd_nob_transferred, + PAGE_CACHE_SIZE * npages); + ptlrpc_req_finished(req); + RETURN(-EPROTO); + } - *request = req; - RETURN(0); + *request = req; + RETURN(0); } +#ifdef __KERNEL__ +static void mdc_release_page(struct page *page, int remove) +{ + kunmap(page); + if (remove) { + lock_page(page); + if (likely(page->mapping != NULL)) + truncate_complete_page(page->mapping, page); + unlock_page(page); + } + page_cache_release(page); +} + +static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash, + __u64 *start, __u64 *end, int hash64) +{ + /* + * Complement of hash is used as an index so that + * radix_tree_gang_lookup() can be used to find a page with starting + * hash _smaller_ than one we are looking for. + */ + unsigned long offset = hash_x_index(*hash, hash64); + struct page *page; + int found; + + spin_lock_irq(&mapping->tree_lock); + found = radix_tree_gang_lookup(&mapping->page_tree, + (void **)&page, offset, 1); + if (found > 0) { + struct lu_dirpage *dp; + + page_cache_get(page); + spin_unlock_irq(&mapping->tree_lock); + /* + * In contrast to find_lock_page() we are sure that directory + * page cannot be truncated (while DLM lock is held) and, + * hence, can avoid restart. + * + * In fact, page cannot be locked here at all, because + * mdc_read_page_remote does synchronous io. + */ + wait_on_page_locked(page); + if (PageUptodate(page)) { + dp = kmap(page); + if (BITS_PER_LONG == 32 && hash64) { + *start = le64_to_cpu(dp->ldp_hash_start) >> 32; + *end = le64_to_cpu(dp->ldp_hash_end) >> 32; + *hash = *hash >> 32; + } else { + *start = le64_to_cpu(dp->ldp_hash_start); + *end = le64_to_cpu(dp->ldp_hash_end); + } + LASSERTF(*start <= *hash, "start = "LPX64",end = " + LPX64",hash = "LPX64"\n", *start, *end, *hash); + CDEBUG(D_VFSTRACE, "page%lu [%llu %llu], hash"LPU64"\n", + offset, *start, *end, *hash); + if (*hash > *end) { + mdc_release_page(page, 0); + page = NULL; + } else if (*end != *start && *hash == *end) { + /* + * upon hash collision, remove this page, + * otherwise put page reference, and + * ll_get_dir_page() will issue RPC to fetch + * the page we want. + */ + mdc_release_page(page, + le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE); + page = NULL; + } + } else { + page_cache_release(page); + page = ERR_PTR(-EIO); + } + } else { + spin_unlock_irq(&mapping->tree_lock); + page = NULL; + } + return page; +} + +/* + * Adjust a set of pages, each page containing an array of lu_dirpages, + * so that each page can be used as a single logical lu_dirpage. + * + * A lu_dirpage is laid out as follows, where s = ldp_hash_start, + * e = ldp_hash_end, f = ldp_flags, p = padding, and each "ent" is a + * struct lu_dirent. It has size up to LU_PAGE_SIZE. The ldp_hash_end + * value is used as a cookie to request the next lu_dirpage in a + * directory listing that spans multiple pages (two in this example): + * ________ + * | | + * .|--------v------- -----. + * |s|e|f|p|ent|ent| ... |ent| + * '--|-------------- -----' Each CFS_PAGE contains a single + * '------. lu_dirpage. + * .---------v------- -----. + * |s|e|f|p|ent| 0 | ... | 0 | + * '----------------- -----' + * + * However, on hosts where the native VM page size (PAGE_CACHE_SIZE) is + * larger than LU_PAGE_SIZE, a single host page may contain multiple + * lu_dirpages. After reading the lu_dirpages from the MDS, the + * ldp_hash_end of the first lu_dirpage refers to the one immediately + * after it in the same CFS_PAGE (arrows simplified for brevity, but + * in general e0==s1, e1==s2, etc.): + * + * .-------------------- -----. + * |s0|e0|f0|p|ent|ent| ... |ent| + * |---v---------------- -----| + * |s1|e1|f1|p|ent|ent| ... |ent| + * |---v---------------- -----| Here, each CFS_PAGE contains + * ... multiple lu_dirpages. + * |---v---------------- -----| + * |s'|e'|f'|p|ent|ent| ... |ent| + * '---|---------------- -----' + * v + * .----------------------------. + * | next CFS_PAGE | + * + * This structure is transformed into a single logical lu_dirpage as follows: + * + * - Replace e0 with e' so the request for the next lu_dirpage gets the page + * labeled 'next CFS_PAGE'. + * + * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether + * a hash collision with the next page exists. + * + * - Adjust the lde_reclen of the ending entry of each lu_dirpage to span + * to the first entry of the next lu_dirpage. + */ +#if PAGE_CACHE_SIZE > LU_PAGE_SIZE +static void mdc_adjust_dirpages(struct page **pages, int cfs_pgs, int lu_pgs) +{ + int i; + + for (i = 0; i < cfs_pgs; i++) { + struct lu_dirpage *dp = kmap(pages[i]); + struct lu_dirpage *first = dp; + struct lu_dirent *end_dirent = NULL; + struct lu_dirent *ent; + __u64 hash_end = dp->ldp_hash_end; + __u32 flags = dp->ldp_flags; + + while (--lu_pgs > 0) { + ent = lu_dirent_start(dp); + for (end_dirent = ent; ent != NULL; + end_dirent = ent, ent = lu_dirent_next(ent)); + + /* Advance dp to next lu_dirpage. */ + dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE); + + /* Check if we've reached the end of the CFS_PAGE. */ + if (!((unsigned long)dp & ~CFS_PAGE_MASK)) + break; + + /* Save the hash and flags of this lu_dirpage. */ + hash_end = dp->ldp_hash_end; + flags = dp->ldp_flags; + + /* Check if lu_dirpage contains no entries. */ + if (end_dirent == NULL) + break; + + /* Enlarge the end entry lde_reclen from 0 to + * first entry of next lu_dirpage. */ + LASSERT(le16_to_cpu(end_dirent->lde_reclen) == 0); + end_dirent->lde_reclen = + cpu_to_le16((char *)(dp->ldp_entries) - + (char *)end_dirent); + } + + first->ldp_hash_end = hash_end; + first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE); + first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE); + + kunmap(pages[i]); + } + LASSERTF(lu_pgs == 0, "left = %d", lu_pgs); +} +#else +#define mdc_adjust_dirpages(pages, cfs_pgs, lu_pgs) do {} while (0) +#endif /* PAGE_CACHE_SIZE > LU_PAGE_SIZE */ + +/* parameters for readdir page */ +struct readpage_param { + struct md_op_data *rp_mod; + __u64 rp_off; + int rp_hash64; + struct obd_export *rp_exp; + struct md_callback *rp_cb; +}; + +/** + * Read pages from server. + * + * Page in MDS_READPAGE RPC is packed in LU_PAGE_SIZE, and each page contains + * a header lu_dirpage which describes the start/end hash, and whether this + * page is empty (contains no dir entry) or hash collide with next page. + * After client receives reply, several pages will be integrated into dir page + * in CFS_PAGE_SIZE (if CFS_PAGE_SIZE greater than LU_PAGE_SIZE), and the + * lu_dirpage for this integrated page will be adjusted. + **/ +static int mdc_read_page_remote(void *data, struct page *page0) +{ + struct readpage_param *rp = data; + struct page **page_pool; + struct page *page; + struct lu_dirpage *dp; + int rd_pgs = 0; /* number of pages read actually */ + int npages; + struct md_op_data *op_data = rp->rp_mod; + struct ptlrpc_request *req; + int max_pages = op_data->op_max_pages; + struct inode *inode; + struct lu_fid *fid; + int i; + int rc; + ENTRY; + + LASSERT(max_pages > 0 && max_pages <= PTLRPC_MAX_BRW_PAGES); + if (op_data->op_mea1 != NULL) { + __u32 index = op_data->op_stripe_offset; + + inode = op_data->op_mea1->lsm_md_oinfo[index].lmo_root; + fid = &op_data->op_mea1->lsm_md_oinfo[index].lmo_fid; + } else { + inode = op_data->op_data; + fid = &op_data->op_fid1; + } + LASSERT(inode != NULL); + + OBD_ALLOC(page_pool, sizeof(page_pool[0]) * max_pages); + if (page_pool != NULL) { + page_pool[0] = page0; + } else { + page_pool = &page0; + max_pages = 1; + } + + for (npages = 1; npages < max_pages; npages++) { + page = page_cache_alloc_cold(inode->i_mapping); + if (page == NULL) + break; + page_pool[npages] = page; + } + + rc = mdc_getpage(rp->rp_exp, fid, rp->rp_off, op_data->op_capa1, + page_pool, npages, &req); + if (rc == 0) { + int lu_pgs; + + rd_pgs = (req->rq_bulk->bd_nob_transferred + + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT; + lu_pgs = req->rq_bulk->bd_nob_transferred >> + LU_PAGE_SHIFT; + LASSERT(!(req->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK)); + + CDEBUG(D_INODE, "read %d(%d)/%d pages\n", rd_pgs, lu_pgs, + op_data->op_npages); + + mdc_adjust_dirpages(page_pool, rd_pgs, lu_pgs); + + SetPageUptodate(page0); + } + + unlock_page(page0); + ptlrpc_req_finished(req); + CDEBUG(D_CACHE, "read %d/%d pages\n", rd_pgs, npages); + for (i = 1; i < npages; i++) { + unsigned long offset; + __u64 hash; + int ret; + + page = page_pool[i]; + + if (rc < 0 || i >= rd_pgs) { + page_cache_release(page); + continue; + } + + SetPageUptodate(page); + + dp = kmap(page); + hash = le64_to_cpu(dp->ldp_hash_start); + kunmap(page); + + offset = hash_x_index(hash, rp->rp_hash64); + + prefetchw(&page->flags); + ret = add_to_page_cache_lru(page, inode->i_mapping, offset, + GFP_KERNEL); + if (ret == 0) + unlock_page(page); + else + CDEBUG(D_VFSTRACE, "page %lu add to page cache failed:" + " rc = %d\n", offset, ret); + page_cache_release(page); + } + + if (page_pool != &page0) + OBD_FREE(page_pool, sizeof(page_pool[0]) * max_pages); + + RETURN(rc); +} + +/** + * Read dir page from cache first, if it can not find it, read it from + * server and add into the cache. + */ +static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data, + struct md_callback *cb_op, struct page **ppage) +{ + struct lookup_intent it = { .it_op = IT_READDIR }; + struct page *page; + struct inode *dir = NULL; + struct address_space *mapping; + struct lu_dirpage *dp; + __u64 start = 0; + __u64 end = 0; + struct lustre_handle lockh; + struct ptlrpc_request *enq_req = NULL; + struct readpage_param rp_param; + int rc; + + ENTRY; + + *ppage = NULL; + + if (op_data->op_mea1 != NULL) { + __u32 index = op_data->op_stripe_offset; + + dir = op_data->op_mea1->lsm_md_oinfo[index].lmo_root; + } else { + dir = op_data->op_data; + } + LASSERT(dir != NULL); + + mapping = dir->i_mapping; + + rc = mdc_intent_lock(exp, op_data, NULL, 0, &it, 0, &enq_req, + cb_op->md_blocking_ast, 0); + if (enq_req != NULL) + ptlrpc_req_finished(enq_req); + + if (rc < 0) { + CERROR("%s: "DFID" lock enqueue fails: rc = %d\n", + exp->exp_obd->obd_name, PFID(&op_data->op_fid1), rc); + RETURN(rc); + } + + rc = 0; + mdc_set_lock_data(exp, &it.d.lustre.it_lock_handle, dir, NULL); + + rp_param.rp_off = op_data->op_hash_offset; + rp_param.rp_hash64 = op_data->op_cli_flags & CLI_HASH64; + page = mdc_page_locate(mapping, &rp_param.rp_off, &start, &end, + rp_param.rp_hash64); + if (IS_ERR(page)) { + CERROR("%s: dir page locate: "DFID" at "LPU64": rc %ld\n", + exp->exp_obd->obd_name, PFID(&op_data->op_fid1), + rp_param.rp_off, PTR_ERR(page)); + GOTO(out_unlock, rc = PTR_ERR(page)); + } else if (page != NULL) { + /* + * XXX nikita: not entirely correct handling of a corner case: + * suppose hash chain of entries with hash value HASH crosses + * border between pages P0 and P1. First both P0 and P1 are + * cached, seekdir() is called for some entry from the P0 part + * of the chain. Later P0 goes out of cache. telldir(HASH) + * happens and finds P1, as it starts with matching hash + * value. Remaining entries from P0 part of the chain are + * skipped. (Is that really a bug?) + * + * Possible solutions: 0. don't cache P1 is such case, handle + * it as an "overflow" page. 1. invalidate all pages at + * once. 2. use HASH|1 as an index for P1. + */ + GOTO(hash_collision, page); + } + + rp_param.rp_exp = exp; + rp_param.rp_mod = op_data; + page = read_cache_page(mapping, + hash_x_index(rp_param.rp_off, + rp_param.rp_hash64), + mdc_read_page_remote, &rp_param); + if (IS_ERR(page)) { + CERROR("%s: read cache page: "DFID" at "LPU64": rc %ld\n", + exp->exp_obd->obd_name, PFID(&op_data->op_fid1), + rp_param.rp_off, PTR_ERR(page)); + GOTO(out_unlock, rc = PTR_ERR(page)); + } + + wait_on_page_locked(page); + (void)kmap(page); + if (!PageUptodate(page)) { + CERROR("%s: page not updated: "DFID" at "LPU64": rc %d\n", + exp->exp_obd->obd_name, PFID(&op_data->op_fid1), + rp_param.rp_off, -5); + goto fail; + } + if (!PageChecked(page)) + SetPageChecked(page); + if (PageError(page)) { + CERROR("%s: page error: "DFID" at "LPU64": rc %d\n", + exp->exp_obd->obd_name, PFID(&op_data->op_fid1), + rp_param.rp_off, -5); + goto fail; + } + +hash_collision: + dp = page_address(page); + if (BITS_PER_LONG == 32 && rp_param.rp_hash64) { + start = le64_to_cpu(dp->ldp_hash_start) >> 32; + end = le64_to_cpu(dp->ldp_hash_end) >> 32; + rp_param.rp_off = op_data->op_hash_offset >> 32; + } else { + start = le64_to_cpu(dp->ldp_hash_start); + end = le64_to_cpu(dp->ldp_hash_end); + rp_param.rp_off = op_data->op_hash_offset; + } + if (end == start) { + LASSERT(start == rp_param.rp_off); + CWARN("Page-wide hash collision: %#lx\n", (unsigned long)end); +#if BITS_PER_LONG == 32 + CWARN("Real page-wide hash collision at ["LPU64" "LPU64"] with " + "hash "LPU64"\n", le64_to_cpu(dp->ldp_hash_start), + le64_to_cpu(dp->ldp_hash_end), op_data->op_hash_offset); +#endif + + /* + * Fetch whole overflow chain... + * + * XXX not yet. + */ + goto fail; + } + *ppage = page; +out_unlock: + lockh.cookie = it.d.lustre.it_lock_handle; + ldlm_lock_decref(&lockh, it.d.lustre.it_lock_mode); + it.d.lustre.it_lock_handle = 0; + return rc; +fail: + kunmap(page); + mdc_release_page(page, 1); + rc = -EIO; + goto out_unlock; +} + +/** + * Read one directory entry from the cache. + */ +int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data, + struct md_callback *cb_op, struct lu_dirent **entp) +{ + struct page *page = NULL; + struct lu_dirpage *dp; + struct lu_dirent *ent; + int rc = 0; + int index = 0; + ENTRY; + + if (op_data->op_hash_offset == MDS_DIR_END_OFF) { + *entp = NULL; + RETURN(0); + } + + rc = mdc_read_page(exp, op_data, cb_op, &page); + if (rc != 0) + RETURN(rc); + + if (op_data->op_cli_flags & CLI_READENT_END) { + mdc_release_page(page, 0); + RETURN(0); + } + + dp = kmap(page); + for (ent = lu_dirent_start(dp); ent != NULL; + ent = lu_dirent_next(ent)) { + index++; + if (ent->lde_hash > op_data->op_hash_offset) + break; + } + kunmap(page); + + /* If it can not find entry in current page, try next page. */ + if (ent == NULL) { + __u64 orig_offset = op_data->op_hash_offset; + + if (dp->ldp_hash_end == MDS_DIR_END_OFF) { + mdc_release_page(page, 0); + RETURN(0); + } + + op_data->op_hash_offset = dp->ldp_hash_end; + mdc_release_page(page, + le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE); + rc = mdc_read_page(exp, op_data, cb_op, &page); + if (rc != 0) + RETURN(rc); + + if (page != NULL) { + dp = kmap(page); + ent = lu_dirent_start(dp); + kunmap(page); + } + + op_data->op_hash_offset = orig_offset; + } + + *entp = ent; + + RETURN(rc); +} + +#else /* __KERNEL__ */ + +static struct page +*mdc_read_page_remote(struct obd_export *exp, const struct lmv_oinfo *lmo, + const __u64 hash, struct obd_capa *oc) +{ + struct ptlrpc_request *req = NULL; + struct page *page; + int rc; + + OBD_PAGE_ALLOC(page, 0); + if (page == NULL) + return ERR_PTR(-ENOMEM); + + rc = mdc_getpage(exp, &lmo->lmo_fid, hash, oc, &page, 1, &req); + if (req != NULL) + ptlrpc_req_finished(req); + + if (unlikely(rc)) { + OBD_PAGE_FREE(page); + return ERR_PTR(rc); + } + return page; +} + + +static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data, + struct md_callback *cb_op, + struct page **ppage) +{ + struct page *page; + struct lmv_oinfo *lmo; + int rc = 0; + + /* No local cache for liblustre, always read entry remotely */ + lmo = &op_data->op_mea1->lsm_md_oinfo[op_data->op_stripe_offset]; + page = mdc_read_page_remote(exp, lmo, op_data->op_hash_offset, + op_data->op_capa1); + if (IS_ERR(page)) + return PTR_ERR(page); + + *ppage = page; + + return rc; +} + +int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data, + struct md_callback *cb_op, struct lu_dirent **entp) +{ + struct page *page = NULL; + struct lu_dirpage *dp; + struct lu_dirent *ent; + int rc; + ENTRY; + + rc = mdc_read_page(exp, op_data, cb_op, &page); + if (rc != 0) + RETURN(rc); + + dp = page_address(page); + if (dp->ldp_hash_end < op_data->op_hash_offset) + GOTO(out, *entp = NULL); + + for (ent = lu_dirent_start(dp); ent != NULL; + ent = lu_dirent_next(ent)) + if (ent->lde_hash >= op_data->op_hash_offset) + break; + *entp = ent; +out: + + OBD_PAGE_FREE(page); + RETURN(rc); +} + +#endif + static int mdc_statfs(const struct lu_env *env, struct obd_export *exp, struct obd_statfs *osfs, __u64 max_age, __u32 flags) @@ -2767,8 +3365,8 @@ struct md_ops mdc_md_ops = { .m_setattr = mdc_setattr, .m_setxattr = mdc_setxattr, .m_getxattr = mdc_getxattr, - .m_fsync = mdc_fsync, - .m_readpage = mdc_readpage, + .m_fsync = mdc_fsync, + .m_read_entry = mdc_read_entry, .m_unlink = mdc_unlink, .m_cancel_unused = mdc_cancel_unused, .m_init_ea_size = mdc_init_ea_size, diff --git a/lustre/obdclass/Makefile.in b/lustre/obdclass/Makefile.in index 5cfbfac..0276c98 100644 --- a/lustre/obdclass/Makefile.in +++ b/lustre/obdclass/Makefile.in @@ -9,7 +9,7 @@ obdclass-all-objs := llog.o llog_cat.o llog_obd.o llog_swab.o llog_osd.o obdclass-all-objs += class_obd.o debug.o genops.o uuid.o llog_ioctl.o obdclass-all-objs += lprocfs_status.o lprocfs_counters.o obdclass-all-objs += lustre_handles.o lustre_peer.o local_storage.o -obdclass-all-objs += statfs_pack.o obdo.o obd_config.o obd_mount.o mea.o +obdclass-all-objs += statfs_pack.o obdo.o obd_config.o obd_mount.o obdclass-all-objs += lu_object.o dt_object.o capa.o obdclass-all-objs += cl_object.o cl_page.o cl_lock.o cl_io.o lu_ref.o obdclass-all-objs += acl.o idmap.o diff --git a/lustre/obdclass/autoMakefile.am b/lustre/obdclass/autoMakefile.am index 9eada75..717c89a 100644 --- a/lustre/obdclass/autoMakefile.am +++ b/lustre/obdclass/autoMakefile.am @@ -6,7 +6,7 @@ DIST_SUBDIRS = linux darwin if LIBLUSTRE noinst_LIBRARIES = liblustreclass.a -liblustreclass_a_SOURCES = class_obd.c debug.c genops.c statfs_pack.c mea.c uuid.c +liblustreclass_a_SOURCES = class_obd.c debug.c genops.c statfs_pack.c uuid.c liblustreclass_a_SOURCES += lustre_handles.c lustre_peer.c lprocfs_status.c liblustreclass_a_SOURCES += obdo.c obd_config.c llog.c llog_obd.c llog_cat.c liblustreclass_a_SOURCES += llog_swab.c capa.c lprocfs_counters.c @@ -37,7 +37,7 @@ obdclass_SOURCES = \ lustre_handles.c lustre_peer.c obd_config.c \ obdo.c debug.c llog_ioctl.c uuid.c \ llog_swab.c llog_obd.c llog.c llog_cat.c \ - mea.c lu_object.c dt_object.c lu_ref.c \ + lu_object.c dt_object.c lu_ref.c \ lprocfs_counters.c obdclass_CFLAGS := $(EXTRA_KCFLAGS) diff --git a/lustre/obdclass/lprocfs_status.c b/lustre/obdclass/lprocfs_status.c index 6e9423a..cbc48ff 100644 --- a/lustre/obdclass/lprocfs_status.c +++ b/lustre/obdclass/lprocfs_status.c @@ -2382,6 +2382,7 @@ void lprocfs_init_mps_stats(int num_private_stats, struct lprocfs_stats *stats) LPROCFS_MD_OP_INIT(num_private_stats, stats, setattr); LPROCFS_MD_OP_INIT(num_private_stats, stats, fsync); LPROCFS_MD_OP_INIT(num_private_stats, stats, readpage); + LPROCFS_MD_OP_INIT(num_private_stats, stats, read_entry); LPROCFS_MD_OP_INIT(num_private_stats, stats, unlink); LPROCFS_MD_OP_INIT(num_private_stats, stats, setxattr); LPROCFS_MD_OP_INIT(num_private_stats, stats, getxattr); diff --git a/lustre/obdclass/mea.c b/lustre/obdclass/mea.c deleted file mode 100644 index 2a1ee39..0000000 --- a/lustre/obdclass/mea.c +++ /dev/null @@ -1,117 +0,0 @@ -/* - * GPL HEADER START - * - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 only, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License version 2 for more details (a copy is included - * in the LICENSE file that accompanied this code). - * - * You should have received a copy of the GNU General Public License - * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. - * - * GPL HEADER END - */ -/* - * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved. - * Use is subject to license terms. - */ -/* - * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. - */ - -#define DEBUG_SUBSYSTEM S_CLASS -#include -#ifdef __KERNEL__ -#include /* for request_module() */ -#include -#include -#include -#else -#include -#include -#endif -#include -#include - -static int mea_last_char_hash(int count, char *name, int namelen) -{ - unsigned int c; - - c = name[namelen - 1]; - if (c == 0) - CWARN("looks like wrong len is passed\n"); - c = c % count; - return c; -} - -static int mea_all_chars_hash(int count, char *name, int namelen) -{ - unsigned int c = 0; - - while (--namelen >= 0) - c += name[namelen]; - c = c % count; - return c; -} - -int raw_name2idx(int hashtype, int count, const char *name, int namelen) -{ - unsigned int c = 0; - int idx; - - LASSERT(namelen > 0); - - if (filename_is_volatile(name, namelen, &idx)) { - if ((idx >= 0) && (idx < count)) - return idx; - goto hashchoice; - } - - if (count <= 1) - return 0; - -hashchoice: - switch (hashtype) { - case MEA_MAGIC_LAST_CHAR: - c = mea_last_char_hash(count, (char *)name, namelen); - break; - case MEA_MAGIC_ALL_CHARS: - c = mea_all_chars_hash(count, (char *)name, namelen); - break; - case MEA_MAGIC_HASH_SEGMENT: - CERROR("Unsupported hash type MEA_MAGIC_HASH_SEGMENT\n"); - break; - default: - CERROR("Unknown hash type 0x%x\n", hashtype); - } - - LASSERT(c < count); - return c; -} -EXPORT_SYMBOL(raw_name2idx); - -int mea_name2idx(struct lmv_stripe_md *mea, const char *name, int namelen) -{ - unsigned int c; - - LASSERT(mea && mea->mea_count); - - c = raw_name2idx(mea->mea_magic, mea->mea_count, name, namelen); - - LASSERT(c < mea->mea_count); - return c; -} -EXPORT_SYMBOL(mea_name2idx); diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index 4c91135..2d6d7bd 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -2119,14 +2119,6 @@ void lustre_swab_lmv_desc (struct lmv_desc *ld) /* uuid endian insensitive */ } -void lustre_swab_lmv_stripe_md (struct lmv_stripe_md *mea) -{ - __swab32s(&mea->mea_magic); - __swab32s(&mea->mea_count); - __swab32s(&mea->mea_master); - CLASSERT(offsetof(typeof(*mea), mea_padding) != 0); -} - void lustre_swab_lmv_user_md(struct lmv_user_md *lum) { int i; diff --git a/lustre/ptlrpc/wiretest.c b/lustre/ptlrpc/wiretest.c index c4cf29e..a85b05c 100644 --- a/lustre/ptlrpc/wiretest.c +++ b/lustre/ptlrpc/wiretest.c @@ -57,7 +57,6 @@ void lustre_assert_wire_constants(void) * running on Linux deva 2.6.32-358.18.1.el6_lustre.gdf685d2.x86_64 #1 SMP Sat Aug 31 20:41:4 * with gcc version 4.4.4 20100726 (Red Hat 4.4.4-13) (GCC) */ - /* Constants... */ LASSERTF(PTL_RPC_MSG_REQUEST == 4711, "found %lld\n", (long long)PTL_RPC_MSG_REQUEST); @@ -2896,35 +2895,6 @@ void lustre_assert_wire_constants(void) LASSERTF((int)sizeof(((struct lmv_desc *)0)->ld_uuid) == 40, "found %lld\n", (long long)(int)sizeof(((struct lmv_desc *)0)->ld_uuid)); - /* Checks for struct lmv_stripe_md */ - LASSERTF((int)sizeof(struct lmv_stripe_md) == 32, "found %lld\n", - (long long)(int)sizeof(struct lmv_stripe_md)); - LASSERTF((int)offsetof(struct lmv_stripe_md, mea_magic) == 0, "found %lld\n", - (long long)(int)offsetof(struct lmv_stripe_md, mea_magic)); - LASSERTF((int)sizeof(((struct lmv_stripe_md *)0)->mea_magic) == 4, "found %lld\n", - (long long)(int)sizeof(((struct lmv_stripe_md *)0)->mea_magic)); - LASSERTF((int)offsetof(struct lmv_stripe_md, mea_count) == 4, "found %lld\n", - (long long)(int)offsetof(struct lmv_stripe_md, mea_count)); - LASSERTF((int)sizeof(((struct lmv_stripe_md *)0)->mea_count) == 4, "found %lld\n", - (long long)(int)sizeof(((struct lmv_stripe_md *)0)->mea_count)); - LASSERTF((int)offsetof(struct lmv_stripe_md, mea_master) == 8, "found %lld\n", - (long long)(int)offsetof(struct lmv_stripe_md, mea_master)); - LASSERTF((int)sizeof(((struct lmv_stripe_md *)0)->mea_master) == 4, "found %lld\n", - (long long)(int)sizeof(((struct lmv_stripe_md *)0)->mea_master)); - LASSERTF((int)offsetof(struct lmv_stripe_md, mea_padding) == 12, "found %lld\n", - (long long)(int)offsetof(struct lmv_stripe_md, mea_padding)); - LASSERTF((int)sizeof(((struct lmv_stripe_md *)0)->mea_padding) == 4, "found %lld\n", - (long long)(int)sizeof(((struct lmv_stripe_md *)0)->mea_padding)); - CLASSERT(LOV_MAXPOOLNAME == 16); - LASSERTF((int)offsetof(struct lmv_stripe_md, mea_pool_name[16]) == 32, "found %lld\n", - (long long)(int)offsetof(struct lmv_stripe_md, mea_pool_name[16])); - LASSERTF((int)sizeof(((struct lmv_stripe_md *)0)->mea_pool_name[16]) == 1, "found %lld\n", - (long long)(int)sizeof(((struct lmv_stripe_md *)0)->mea_pool_name[16])); - LASSERTF((int)offsetof(struct lmv_stripe_md, mea_ids[0]) == 32, "found %lld\n", - (long long)(int)offsetof(struct lmv_stripe_md, mea_ids[0])); - LASSERTF((int)sizeof(((struct lmv_stripe_md *)0)->mea_ids[0]) == 16, "found %lld\n", - (long long)(int)sizeof(((struct lmv_stripe_md *)0)->mea_ids[0])); - /* Checks for struct lov_desc */ LASSERTF((int)sizeof(struct lov_desc) == 88, "found %lld\n", (long long)(int)sizeof(struct lov_desc)); diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index 41378c4..5ad0312 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -1207,20 +1207,6 @@ check_lmv_desc(void) } static void -check_lmv_stripe_md(void) -{ - BLANK_LINE(); - CHECK_STRUCT(lmv_stripe_md); - CHECK_MEMBER(lmv_stripe_md, mea_magic); - CHECK_MEMBER(lmv_stripe_md, mea_count); - CHECK_MEMBER(lmv_stripe_md, mea_master); - CHECK_MEMBER(lmv_stripe_md, mea_padding); - CHECK_CVALUE(LOV_MAXPOOLNAME); - CHECK_MEMBER(lmv_stripe_md, mea_pool_name[LOV_MAXPOOLNAME]); - CHECK_MEMBER(lmv_stripe_md, mea_ids[0]); -} - -static void check_lov_desc(void) { BLANK_LINE(); @@ -2365,7 +2351,6 @@ main(int argc, char **argv) check_mdt_rec_setxattr(); check_mdt_rec_reint(); check_lmv_desc(); - check_lmv_stripe_md(); check_lov_desc(); check_ldlm_res_id(); check_ldlm_extent(); diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index b99b98c..def2824 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -66,7 +66,6 @@ void lustre_assert_wire_constants(void) * running on Linux testnode 2.6.32 #3 SMP Thu Sep 13 12:42:57 PDT 2012 x86_64 x86_64 x86_64 * with gcc version 4.4.6 20120305 (Red Hat 4.4.6-4) (GCC) */ - /* Constants... */ LASSERTF(PTL_RPC_MSG_REQUEST == 4711, "found %lld\n", (long long)PTL_RPC_MSG_REQUEST); @@ -2907,35 +2906,6 @@ void lustre_assert_wire_constants(void) LASSERTF((int)sizeof(((struct lmv_desc *)0)->ld_uuid) == 40, "found %lld\n", (long long)(int)sizeof(((struct lmv_desc *)0)->ld_uuid)); - /* Checks for struct lmv_stripe_md */ - LASSERTF((int)sizeof(struct lmv_stripe_md) == 32, "found %lld\n", - (long long)(int)sizeof(struct lmv_stripe_md)); - LASSERTF((int)offsetof(struct lmv_stripe_md, mea_magic) == 0, "found %lld\n", - (long long)(int)offsetof(struct lmv_stripe_md, mea_magic)); - LASSERTF((int)sizeof(((struct lmv_stripe_md *)0)->mea_magic) == 4, "found %lld\n", - (long long)(int)sizeof(((struct lmv_stripe_md *)0)->mea_magic)); - LASSERTF((int)offsetof(struct lmv_stripe_md, mea_count) == 4, "found %lld\n", - (long long)(int)offsetof(struct lmv_stripe_md, mea_count)); - LASSERTF((int)sizeof(((struct lmv_stripe_md *)0)->mea_count) == 4, "found %lld\n", - (long long)(int)sizeof(((struct lmv_stripe_md *)0)->mea_count)); - LASSERTF((int)offsetof(struct lmv_stripe_md, mea_master) == 8, "found %lld\n", - (long long)(int)offsetof(struct lmv_stripe_md, mea_master)); - LASSERTF((int)sizeof(((struct lmv_stripe_md *)0)->mea_master) == 4, "found %lld\n", - (long long)(int)sizeof(((struct lmv_stripe_md *)0)->mea_master)); - LASSERTF((int)offsetof(struct lmv_stripe_md, mea_padding) == 12, "found %lld\n", - (long long)(int)offsetof(struct lmv_stripe_md, mea_padding)); - LASSERTF((int)sizeof(((struct lmv_stripe_md *)0)->mea_padding) == 4, "found %lld\n", - (long long)(int)sizeof(((struct lmv_stripe_md *)0)->mea_padding)); - CLASSERT(LOV_MAXPOOLNAME == 16); - LASSERTF((int)offsetof(struct lmv_stripe_md, mea_pool_name[16]) == 32, "found %lld\n", - (long long)(int)offsetof(struct lmv_stripe_md, mea_pool_name[16])); - LASSERTF((int)sizeof(((struct lmv_stripe_md *)0)->mea_pool_name[16]) == 1, "found %lld\n", - (long long)(int)sizeof(((struct lmv_stripe_md *)0)->mea_pool_name[16])); - LASSERTF((int)offsetof(struct lmv_stripe_md, mea_ids[0]) == 32, "found %lld\n", - (long long)(int)offsetof(struct lmv_stripe_md, mea_ids[0])); - LASSERTF((int)sizeof(((struct lmv_stripe_md *)0)->mea_ids[0]) == 16, "found %lld\n", - (long long)(int)sizeof(((struct lmv_stripe_md *)0)->mea_ids[0])); - /* Checks for struct lov_desc */ LASSERTF((int)sizeof(struct lov_desc) == 88, "found %lld\n", (long long)(int)sizeof(struct lov_desc)); -- 1.8.3.1