From: huanghua
Date: Fri, 11 Jul 2008 05:22:00 +0000 (+0000)
Subject: Branch b1_8_gate
X-Git-Tag: v1_7_0_51~2^126
X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=c313d2738b27b752615119f76523fc12c431a187;p=fs%2Flustre-release.git

Branch b1_8_gate
b=11930
i=adilger
i=nikita.danilov
i=alex
---

diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c
index ee9b255..5791e41 100644
--- a/lustre/llite/dir.c
+++ b/lustre/llite/dir.c
@@ -71,7 +71,7 @@ static int ll_dir_readpage(struct file *file, struct page *page)
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) off "LPU64"\n",
                inode->i_ino, inode->i_generation, inode, offset);
 
-        mdc_pack_fid(&mdc_fid, inode->i_ino, inode->i_generation, S_IFDIR);
+        ll_pack_fid(&mdc_fid, inode->i_ino, inode->i_generation, S_IFDIR);
 
         rc = mdc_readpage(ll_i2sbi(inode)->ll_mdc_exp, &mdc_fid,
                           offset, page, &request);
@@ -200,8 +200,7 @@ static void ll_dir_check_page(struct inode *dir, struct page *page)
 
 struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
 {
-        struct ldlm_res_id res_id =
-                { .name = { dir->i_ino, (__u64)dir->i_generation} };
+        struct ldlm_res_id res_id;
         struct lustre_handle lockh;
         struct obd_device *obddev = class_exp2obd(ll_i2sbi(dir)->ll_mdc_exp);
         struct address_space *mapping = dir->i_mapping;
@@ -209,6 +208,7 @@ struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
         ldlm_policy_data_t policy = {.l_inodebits = {MDS_INODELOCK_UPDATE} };
         int rc;
 
+        fid_build_reg_res_name(ll_inode_lu_fid(dir), &res_id);
         rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED,
                              &res_id, LDLM_IBITS, &policy, LCK_CR, &lockh);
         if (!rc) {
@@ -216,7 +216,7 @@ struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
                 struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CR,
                        ll_mdc_blocking_ast, ldlm_completion_ast, NULL, dir };
                 struct ptlrpc_request *request;
-                struct mdc_op_data data;
+                struct mdc_op_data data = { { 0 } };
 
                 ll_prepare_mdc_op_data(&data, dir, NULL, NULL, 0, 0, NULL);
 
@@ -302,8 +302,8 @@ static unsigned char ll_dir_filetype_table[LL_DIR_FT_MAX] = {
  *
  * 0: no live entries on this page.
  */
-int ll_readdir_page(char *addr, __u64 base, unsigned *offset,
-                    filldir_t filldir, void *cookie)
+static int ll_readdir_page(char *addr, __u64 base, unsigned *offset,
+                           filldir_t filldir, void *cookie)
 {
         struct ll_dir_entry *de;
         char *end;
@@ -325,7 +325,7 @@ int ll_readdir_page(char *addr, __u64 base, unsigned *offset,
         return nr;
 }
 
-int ll_readdir(struct file *filp, void *dirent, filldir_t filldir)
+static int ll_readdir_18(struct file *filp, void *dirent, filldir_t filldir)
 {
         struct inode *inode = filp->f_dentry->d_inode;
         loff_t pos = filp->f_pos;
@@ -410,6 +410,438 @@ int ll_readdir(struct file *filp, void *dirent, filldir_t filldir)
         RETURN(rc);
 }
 
+/*
+ * Chain of hash overflow pages.
+ */
+struct ll_dir_chain {
+        /* XXX something. Later */
+};
+
+static inline void ll_dir_chain_init(struct ll_dir_chain *chain)
+{
+}
+
+static inline void ll_dir_chain_fini(struct ll_dir_chain *chain)
+{
+}
+
+static inline __u32 hash_x_index(__u32 value)
+{
+        return ((__u32)~0) - value;
+}
+
+/*
+ * Layout of readdir pages, as transmitted on wire.
+ */
+struct lu_dirent {
+        struct lu_fid lde_fid;
+        __u64         lde_hash;
+        __u16         lde_reclen;
+        __u16         lde_namelen;
+        __u32         lde_padding;
+        char          lde_name[0];
+};
+
+struct lu_dirpage {
+        __u64            ldp_hash_start;
+        __u64            ldp_hash_end;
+        __u16            ldp_flags;
+        __u16            ldp_pad0;
+        __u32            ldp_pad1;
+        struct lu_dirent ldp_entries[0];
+};
+
+enum lu_dirpage_flags {
+        LDF_EMPTY = 1 << 0
+};
+
+static inline struct lu_dirent *lu_dirent_start(struct lu_dirpage *dp)
+{
+        if (le16_to_cpu(dp->ldp_flags) & LDF_EMPTY)
+                return NULL;
+        else
+                return dp->ldp_entries;
+}
+
+static inline struct lu_dirent *lu_dirent_next(struct lu_dirent *ent)
+{
+        struct lu_dirent *next;
+
+        if (le16_to_cpu(ent->lde_reclen) != 0)
+                next = ((void *)ent) + le16_to_cpu(ent->lde_reclen);
+        else
+                next = NULL;
+
+        return next;
+}
+
+static inline int lu_dirent_size(struct lu_dirent *ent)
+{
+        if (le16_to_cpu(ent->lde_reclen) == 0) {
+                return (sizeof(*ent) +
+                        le16_to_cpu(ent->lde_namelen) + 3) & ~3;
+        }
+        return le16_to_cpu(ent->lde_reclen);
+}
+
+#define DIR_END_OFF 0xfffffffffffffffeULL
+
+#ifdef HAVE_RW_TREE_LOCK
+#define TREE_READ_LOCK_IRQ(mapping) read_lock_irq(&(mapping)->tree_lock)
+#define TREE_READ_UNLOCK_IRQ(mapping) read_unlock_irq(&(mapping)->tree_lock)
+#else
+#define TREE_READ_LOCK_IRQ(mapping) spin_lock_irq(&(mapping)->tree_lock)
+#define TREE_READ_UNLOCK_IRQ(mapping) spin_unlock_irq(&(mapping)->tree_lock)
+#endif
+
+/* returns the page unlocked, but with a reference */
+static int ll_dir_readpage_20(struct file *file, struct page *page)
+{
+        struct inode *inode = page->mapping->host;
+        struct ptlrpc_request *request;
+        struct mdt_body *body;
+        struct ll_fid fid;
+        __u64 hash;
+        int rc;
+        ENTRY;
+
+        hash = hash_x_index(page->index);
+        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) off %lu\n",
+               inode->i_ino, inode->i_generation, inode, (unsigned long)hash);
+
+        ll_inode2fid(&fid, inode);
+        rc = mdc_readpage(ll_i2sbi(inode)->ll_mdc_exp, &fid,
+                          hash, page, &request);
+        if (!rc) {
+                body = lustre_msg_buf(request->rq_repmsg, REPLY_REC_OFF,
+                                      sizeof(*body));
+                /* Checked by mdc_readpage() */
+                LASSERT(body != NULL);
+
+                if (body->valid & OBD_MD_FLSIZE) {
+                        ll_inode_size_lock(inode, 0);
+                        i_size_write(inode, body->size);
+                        ll_inode_size_unlock(inode, 0);
+                }
+                SetPageUptodate(page);
+        }
+        ptlrpc_req_finished(request);
+
+        unlock_page(page);
+        EXIT;
+        return rc;
+}
+
+
+static void ll_check_page(struct inode *dir, struct page *page)
+{
+        /* XXX: check page format later */
+        SetPageChecked(page);
+}
+
+
+/*
+ * Find, kmap and return page that contains given hash.
+ */
+static struct page *ll_dir_page_locate(struct inode *dir, unsigned long hash,
+                                       __u64 *start, __u64 *end)
+{
+        struct address_space *mapping = dir->i_mapping;
+        /*
+         * Complement of hash is used as an index so that
+         * radix_tree_gang_lookup() can be used to find a page with starting
+         * hash _smaller_ than one we are looking for.
+         */
+        unsigned long offset = hash_x_index(hash);
+        struct page *page;
+        int found;
+        ENTRY;
+
+        TREE_READ_LOCK_IRQ(mapping);
+        found = radix_tree_gang_lookup(&mapping->page_tree,
+                                       (void **)&page, offset, 1);
+        if (found > 0) {
+                struct lu_dirpage *dp;
+
+                page_cache_get(page);
+                TREE_READ_UNLOCK_IRQ(mapping);
+                /*
+                 * In contrast to find_lock_page() we are sure that directory
+                 * page cannot be truncated (while DLM lock is held) and,
+                 * hence, can avoid restart.
+                 *
+                 * In fact, page cannot be locked here at all, because
+                 * ll_dir_readpage() does synchronous io.
+                 */
+                wait_on_page(page);
+                if (PageUptodate(page)) {
+                        dp = kmap(page);
+                        *start = le64_to_cpu(dp->ldp_hash_start);
+                        *end = le64_to_cpu(dp->ldp_hash_end);
+                        LASSERT(*start <= hash);
+                        if (hash > *end || (*end != *start && hash == *end)) {
+                                kunmap(page);
+                                lock_page(page);
+                                ll_truncate_complete_page(page);
+                                unlock_page(page);
+                                page_cache_release(page);
+                                page = NULL;
+                        }
+                } else {
+                        page_cache_release(page);
+                        page = ERR_PTR(-EIO);
+                }
+
+        } else {
+                TREE_READ_UNLOCK_IRQ(mapping);
+                page = NULL;
+        }
+        RETURN(page);
+}
+
+static struct page *ll_get_dir_page_20(struct inode *dir, __u64 hash, int exact,
+                                       struct ll_dir_chain *chain)
+{
+        struct ldlm_res_id res_id;
+        struct lustre_handle lockh;
+        struct obd_device *obddev = class_exp2obd(ll_i2sbi(dir)->ll_mdc_exp);
+        struct address_space *mapping = dir->i_mapping;
+        struct lu_dirpage *dp;
+        struct page *page;
+        ldlm_policy_data_t policy = {.l_inodebits = {MDS_INODELOCK_UPDATE} };
+        ldlm_mode_t mode;
+        int rc;
+        __u64 start = 0;
+        __u64 end = 0;
+        ENTRY;
+
+        fid_build_reg_res_name(ll_inode_lu_fid(dir), &res_id);
+        mode = LCK_PR;
+        rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED,
+                             &res_id, LDLM_IBITS, &policy, mode, &lockh);
+        if (!rc) {
+                struct lookup_intent it = { .it_op = IT_READDIR };
+                struct ldlm_enqueue_info einfo = { LDLM_IBITS, mode,
+                       ll_mdc_blocking_ast, ldlm_completion_ast, NULL, dir };
+                struct ptlrpc_request *request;
+                struct mdc_op_data op_data = { { 0 } };
+
+                ll_prepare_mdc_op_data(&op_data, dir, NULL, NULL, 0, 0, NULL);
+
+                rc = mdc_enqueue(ll_i2sbi(dir)->ll_mdc_exp, &einfo, &it,
+                                 &op_data, &lockh, NULL, 0, 0);
+
+                request = (struct ptlrpc_request *)it.d.lustre.it_data;
+                if (request)
+                        ptlrpc_req_finished(request);
+                if (rc < 0) {
+                        CERROR("lock enqueue: rc: %d\n", rc);
+                        RETURN(ERR_PTR(rc));
+                }
+        }
+        ldlm_lock_dump_handle(D_OTHER, &lockh);
+
+        page = ll_dir_page_locate(dir, hash, &start, &end);
+        if (IS_ERR(page))
+                GOTO(out_unlock, page);
+
+        if (page != NULL) {
+                /*
+                 * XXX nikita: not entirely correct handling of a corner case:
+                 * suppose hash chain of entries with hash value HASH crosses
+                 * border between pages P0 and P1. First both P0 and P1 are
+                 * cached, seekdir() is called for some entry from the P0 part
+                 * of the chain. Later P0 goes out of cache. telldir(HASH)
+                 * happens and finds P1, as it starts with matching hash
+                 * value. Remaining entries from P0 part of the chain are
+                 * skipped. (Is that really a bug?)
+                 *
+                 * Possible solutions: 0. don't cache P1 in such case, handle
+                 * it as an "overflow" page. 1. invalidate all pages at
+                 * once. 2. use HASH|1 as an index for P1.
+                 */
+                if (exact && hash != start) {
+                        /*
+                         * readdir asked for a page starting _exactly_ from
+                         * given hash, but cache contains stale page, with
+                         * entries with smaller hash values. Stale page should
+                         * be invalidated, and new one fetched.
+                         */
+                        CDEBUG(D_INFO, "Stale readpage page %p: %#lx != %#lx\n",
+                               page, (unsigned long)hash, (unsigned long)start);
+                        lock_page(page);
+                        ll_truncate_complete_page(page);
+                        unlock_page(page);
+                        page_cache_release(page);
+                } else {
+                        GOTO(hash_collision, page);
+                }
+        }
+
+        page = read_cache_page(mapping, hash_x_index(hash),
+                               (filler_t*)ll_dir_readpage_20, NULL);
+        if (IS_ERR(page))
+                GOTO(out_unlock, page);
+
+        wait_on_page(page);
+        (void)kmap(page);
+        if (!PageUptodate(page))
+                goto fail;
+        if (!PageChecked(page))
+                ll_check_page(dir, page);
+        if (PageError(page))
+                goto fail;
+hash_collision:
+        dp = page_address(page);
+
+        start = le64_to_cpu(dp->ldp_hash_start);
+        end = le64_to_cpu(dp->ldp_hash_end);
+        if (end == start) {
+                LASSERT(start == hash);
+                CWARN("Page-wide hash collision: %#lx\n", (unsigned long)end);
+                /*
+                 * Fetch whole overflow chain...
+                 *
+                 * XXX not yet.
+                 */
+                goto fail;
+        }
+out_unlock:
+        ldlm_lock_decref(&lockh, mode);
+        RETURN(page);
+
+fail:
+        ll_put_page(page);
+        page = ERR_PTR(-EIO);
+        goto out_unlock;
+}
+
+static int ll_readdir_20(struct file *filp, void *cookie, filldir_t filldir)
+{
+        struct inode *inode = filp->f_dentry->d_inode;
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        __u64 pos = filp->f_pos;
+        struct page *page;
+        struct ll_dir_chain chain;
+        int rc;
+        int done;
+        int shift;
+        ENTRY;
+
+        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) pos %lu/%llu\n",
+               inode->i_ino, inode->i_generation, inode,
+               (unsigned long)pos, i_size_read(inode));
+
+        if (pos == DIR_END_OFF)
+                /*
+                 * end-of-file.
+                 */
+                RETURN(0);
+
+        rc = 0;
+        done = 0;
+        shift = 0;
+        ll_dir_chain_init(&chain);
+
+        page = ll_get_dir_page_20(inode, pos, 0, &chain);
+
+        while (rc == 0 && !done) {
+                struct lu_dirpage *dp;
+                struct lu_dirent *ent;
+
+                if (!IS_ERR(page)) {
+                        /*
+                         * If page is empty (end of directory is reached),
+                         * use this value.
+                         */
+                        __u64 hash = DIR_END_OFF;
+                        __u64 next;
+
+                        dp = page_address(page);
+                        for (ent = lu_dirent_start(dp); ent != NULL && !done;
+                             ent = lu_dirent_next(ent)) {
+                                char *name;
+                                int namelen;
+                                struct lu_fid fid;
+                                ino_t ino;
+
+                                hash = le64_to_cpu(ent->lde_hash);
+                                namelen = le16_to_cpu(ent->lde_namelen);
+
+                                if (hash < pos)
+                                        /*
+                                         * Skip until we find target hash
+                                         * value.
+                                         */
+                                        continue;
+
+                                if (namelen == 0)
+                                        /*
+                                         * Skip dummy record.
+                                         */
+                                        continue;
+
+                                fid = ent->lde_fid;
+                                name = ent->lde_name;
+                                fid_le_to_cpu(&fid, &fid);
+                                ino = ll_fid_build_ino(sbi, (struct ll_fid*)&fid);
+
+                                done = filldir(cookie, name, namelen,
+                                               (loff_t)hash, ino, DT_UNKNOWN);
+                        }
+                        next = le64_to_cpu(dp->ldp_hash_end);
+                        ll_put_page(page);
+                        if (!done) {
+                                pos = next;
+                                if (pos == DIR_END_OFF)
+                                        /*
+                                         * End of directory reached.
+                                         */
+                                        done = 1;
+                                else if (1 /* chain is exhausted */)
+                                        /*
+                                         * Normal case: continue to the next
+                                         * page.
+                                         */
+                                        page = ll_get_dir_page_20(inode, pos, 1,
+                                                                  &chain);
+                                else {
+                                        /*
+                                         * go into overflow page.
+                                         */
+                                }
+                        } else {
+                                pos = hash;
+                        }
+                } else {
+                        rc = PTR_ERR(page);
+                        CERROR("error reading dir "DFID" at %lu: rc %d\n",
+                               PFID(ll_inode_lu_fid(inode)),
+                               (unsigned long)pos, rc);
+                }
+        }
+
+        filp->f_pos = (loff_t)(__s32)pos;
+        filp->f_version = inode->i_version;
+        touch_atime(filp->f_vfsmnt, filp->f_dentry);
+
+        ll_dir_chain_fini(&chain);
+
+        RETURN(rc);
+}
+
+static int ll_readdir(struct file *filp, void *cookie, filldir_t filldir)
+{
+        struct inode *inode = filp->f_dentry->d_inode;
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
+
+        if (sbi->ll_mdc_exp->exp_connect_flags & OBD_CONNECT_FID) {
+                return ll_readdir_20(filp, cookie, filldir);
+        } else {
+                return ll_readdir_18(filp, cookie, filldir);
+        }
+}
+
 #define QCTL_COPY(out, in)              \
 do {                                    \
         Q_COPY(out, in, qc_cmd);        \
@@ -420,7 +852,7 @@ do {                                    \
         Q_COPY(out, in, qc_dqblk);      \
 } while (0)
 
-int ll_send_mgc_param(struct obd_export *mgc, char *string)
+static int ll_send_mgc_param(struct obd_export *mgc, char *string)
 {
         struct mgs_send_param *msp;
         int rc = 0;
@@ -439,7 +871,7 @@ int ll_send_mgc_param(struct obd_export *mgc, char *string)
         return rc;
 }
 
-char *ll_get_fsname(struct inode *inode)
+static char *ll_get_fsname(struct inode *inode)
 {
         struct lustre_sb_info *lsi = s2lsi(inode->i_sb);
         char *ptr, *fsname;
@@ -460,7 +892,7 @@ int ll_dir_setstripe(struct inode *inode, struct lov_user_md *lump,
                      int set_default)
 {
         struct ll_sb_info *sbi = ll_i2sbi(inode);
-        struct mdc_op_data data;
+        struct mdc_op_data data = { { 0 } };
         struct ptlrpc_request *req = NULL;
         struct lustre_sb_info *lsi = s2lsi(inode->i_sb);
         struct obd_device *mgc = lsi->lsi_mgc;