From: nikita Date: Thu, 27 Jul 2006 22:31:50 +0000 (+0000) Subject: new hash-order readdir. Initial implementation X-Git-Tag: v1_8_0_110~486^2~1325 X-Git-Url: https://git.whamcloud.com/gitweb?a=commitdiff_plain;h=b4bf0081d8df7898214888e14d569aed7262f6c2;p=fs%2Flustre-release.git new hash-order readdir. Initial implementation --- diff --git a/lustre/cmm/cmm_object.c b/lustre/cmm/cmm_object.c index ef5fd16..99586d1 100644 --- a/lustre/cmm/cmm_object.c +++ b/lustre/cmm/cmm_object.c @@ -308,7 +308,7 @@ static int cml_close(const struct lu_context *ctx, struct md_object *mo) } static int cml_readpage(const struct lu_context *ctxt, struct md_object *mo, - struct lu_rdpg *rdpg) + const struct lu_rdpg *rdpg) { int rc; ENTRY; @@ -585,6 +585,12 @@ static int cmr_close(const struct lu_context *ctx, struct md_object *mo) RETURN(-EFAULT); } +static int cmr_readpage(const struct lu_context *ctxt, struct md_object *mo, + const struct lu_rdpg *rdpg) +{ + RETURN(-EREMOTE); +} + static struct md_object_operations cmr_mo_ops = { .moo_attr_get = cmr_attr_get, .moo_attr_set = cmr_attr_set, @@ -596,7 +602,8 @@ static struct md_object_operations cmr_mo_ops = { .moo_ref_add = cmr_ref_add, .moo_ref_del = cmr_ref_del, .moo_open = cmr_open, - .moo_close = cmr_close + .moo_close = cmr_close, + .moo_readpage = cmr_readpage }; /* remote part of md_dir operations */ diff --git a/lustre/include/dt_object.h b/lustre/include/dt_object.h index 2ae1320..0c992cc 100644 --- a/lustre/include/dt_object.h +++ b/lustre/include/dt_object.h @@ -232,7 +232,7 @@ struct dt_object_operations { struct dt_object *dt, struct thandle *th); int (*do_readpage)(const struct lu_context *ctxt, - struct dt_object *dt, struct lu_rdpg *rdpg); + struct dt_object *dt, const struct lu_rdpg *rdpg); }; /* diff --git a/lustre/include/lu_object.h b/lustre/include/lu_object.h index ddb2947c5..611d6ce 100644 --- a/lustre/include/lu_object.h +++ b/lustre/include/lu_object.h @@ -773,13 +773,10 @@ static inline int lu_object_assert_not_exists(const struct lu_context *ctx, struct lu_rdpg { /* input params, should be filled out by mdt */ - loff_t rp_offset; /* offset, page alligned*/ + __u32 rp_hash; /* hash */ int rp_count; /* count in bytes */ int rp_npages; /* number of pages */ struct page **rp_pages; /* pointers to pages */ - - /* output params, filled by osd */ - __u64 rp_size; /* file size in total */ }; enum lu_xattr_flags { diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index c525140..e34d701 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -262,24 +262,24 @@ static inline void fid_le_to_cpu(struct lu_fid *fid) fid->f_ver = le32_to_cpu(fid_ver(fid)); } -#define LU_NAME_LEN 255 - /* - * lustre directory entry. This is used for sending directory content to the - * client, where it later gets converted to what Linux expects. + * Layout of readdir pages, as transmitted on wire. */ -struct lu_dir_entry { - struct lu_fid de_fid; /* file fid */ - __u16 de_rec_len; /* rec len */ - __u16 de_name_len; /* name len */ - char de_name[LU_NAME_LEN]; /* file name */ +struct lu_dirent { + struct lu_fid lde_fid; + __u32 lde_hash; + __u16 lde_reclen; + __u16 lde_namelen; + char lde_name[0]; }; -#define LU_DIR_PAD 4 -#define LU_DIR_ROUND (LU_DIR_PAD - 1) - -#define LU_DIR_REC_LEN(len) ((sizeof(struct lu_fid) + 2*sizeof(__u16) + \ - len + 1 + LU_DIR_ROUND) & ~LU_DIR_ROUND) +struct lu_dirpage { + __u32 ldp_hash_start; + __u32 ldp_hash_end; + __u16 ldp_pad0; + __u32 ldp_pad1; + struct lu_dirent ldp_entries[0]; +}; #define MEA_MAGIC_LAST_CHAR 0xb2221ca1 #define MEA_MAGIC_ALL_CHARS 0xb222a11c diff --git a/lustre/include/md_object.h b/lustre/include/md_object.h index e78ac87..15c7cfe 100644 --- a/lustre/include/md_object.h +++ b/lustre/include/md_object.h @@ -92,7 +92,7 @@ struct md_object_operations { int (*moo_close)(const struct lu_context *, struct md_object *); int (*moo_readpage)(const struct lu_context *, struct md_object *, - struct lu_rdpg *); + const struct lu_rdpg *); }; /* @@ -249,7 +249,7 @@ static inline int mo_close(const struct lu_context *cx, struct md_object *m) } static inline int mo_readpage(const struct lu_context *cx, struct md_object *m, - struct lu_rdpg *rdpg) + const struct lu_rdpg *rdpg) { LASSERT(m->mo_ops->moo_readpage); return m->mo_ops->moo_readpage(cx, m, rdpg); diff --git a/lustre/kernel_patches/patches/ext3-iam-separate.patch b/lustre/kernel_patches/patches/ext3-iam-separate.patch index f2a76dc..b06605a 100644 --- a/lustre/kernel_patches/patches/ext3-iam-separate.patch +++ b/lustre/kernel_patches/patches/ext3-iam-separate.patch @@ -1,7 +1,7 @@ Index: iam/fs/ext3/Makefile =================================================================== --- iam.orig/fs/ext3/Makefile 2006-05-31 20:24:32.000000000 +0400 -+++ iam/fs/ext3/Makefile 2006-07-25 16:59:51.000000000 +0400 ++++ iam/fs/ext3/Makefile 2006-07-28 01:53:16.000000000 +0400 @@ -6,7 +6,7 @@ obj-$(CONFIG_EXT3_FS) += ext3.o ext3-y := balloc.o bitmap.o dir.o file.o fsync.o ialloc.o inode.o iopen.o \ @@ -5461,7 +5461,7 @@ Index: iam/fs/ext3/namei.c Index: iam/include/linux/lustre_iam.h =================================================================== --- iam.orig/include/linux/lustre_iam.h 2006-05-31 20:24:32.000000000 +0400 -+++ iam/include/linux/lustre_iam.h 2006-07-25 16:59:51.000000000 +0400 ++++ iam/include/linux/lustre_iam.h 2006-07-28 01:53:16.000000000 +0400 @@ -1,9 +1,68 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: diff --git a/lustre/kernel_patches/patches/ext3-iam-uapi.patch b/lustre/kernel_patches/patches/ext3-iam-uapi.patch index a3db133..146d4af 100644 --- a/lustre/kernel_patches/patches/ext3-iam-uapi.patch +++ b/lustre/kernel_patches/patches/ext3-iam-uapi.patch @@ -1,7 +1,7 @@ Index: iam/fs/ext3/Makefile =================================================================== ---- iam.orig/fs/ext3/Makefile 2006-07-25 16:59:51.000000000 +0400 -+++ iam/fs/ext3/Makefile 2006-07-25 16:59:51.000000000 +0400 +--- iam.orig/fs/ext3/Makefile 2006-07-28 01:53:16.000000000 +0400 ++++ iam/fs/ext3/Makefile 2006-07-28 01:53:16.000000000 +0400 @@ -6,7 +6,7 @@ obj-$(CONFIG_EXT3_FS) += ext3.o ext3-y := balloc.o bitmap.o dir.o file.o fsync.o ialloc.o inode.o iopen.o \ @@ -13,8 +13,8 @@ Index: iam/fs/ext3/Makefile ext3-$(CONFIG_EXT3_FS_POSIX_ACL) += acl.o Index: iam/fs/ext3/dir.c =================================================================== ---- iam.orig/fs/ext3/dir.c 2006-07-25 16:59:51.000000000 +0400 -+++ iam/fs/ext3/dir.c 2006-07-25 16:59:51.000000000 +0400 +--- iam.orig/fs/ext3/dir.c 2006-07-28 01:53:16.000000000 +0400 ++++ iam/fs/ext3/dir.c 2006-07-28 01:53:16.000000000 +0400 @@ -28,6 +28,7 @@ #include #include @@ -112,8 +112,8 @@ Index: iam/fs/ext3/dir.c (filp->f_version != inode->i_version)) { Index: iam/fs/ext3/file.c =================================================================== ---- iam.orig/fs/ext3/file.c 2006-07-25 16:59:51.000000000 +0400 -+++ iam/fs/ext3/file.c 2006-07-25 16:59:51.000000000 +0400 +--- iam.orig/fs/ext3/file.c 2006-07-28 01:53:16.000000000 +0400 ++++ iam/fs/ext3/file.c 2006-07-28 01:53:16.000000000 +0400 @@ -23,6 +23,7 @@ #include #include @@ -149,7 +149,7 @@ Index: iam/fs/ext3/file.c Index: iam/fs/ext3/iam-uapi.c =================================================================== --- iam.orig/fs/ext3/iam-uapi.c 2004-04-06 17:27:52.000000000 +0400 -+++ iam/fs/ext3/iam-uapi.c 2006-07-25 16:59:51.000000000 +0400 ++++ iam/fs/ext3/iam-uapi.c 2006-07-28 01:53:16.000000000 +0400 @@ -0,0 +1,361 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: @@ -514,8 +514,8 @@ Index: iam/fs/ext3/iam-uapi.c +} Index: iam/fs/ext3/ioctl.c =================================================================== ---- iam.orig/fs/ext3/ioctl.c 2006-07-25 16:59:51.000000000 +0400 -+++ iam/fs/ext3/ioctl.c 2006-07-25 16:59:51.000000000 +0400 +--- iam.orig/fs/ext3/ioctl.c 2006-07-28 01:53:16.000000000 +0400 ++++ iam/fs/ext3/ioctl.c 2006-07-28 01:53:16.000000000 +0400 @@ -250,6 +250,6 @@ flags_err: @@ -526,8 +526,8 @@ Index: iam/fs/ext3/ioctl.c } Index: iam/include/linux/lustre_iam.h =================================================================== ---- iam.orig/include/linux/lustre_iam.h 2006-07-25 16:59:51.000000000 +0400 -+++ iam/include/linux/lustre_iam.h 2006-07-25 16:59:51.000000000 +0400 +--- iam.orig/include/linux/lustre_iam.h 2006-07-28 01:53:16.000000000 +0400 ++++ iam/include/linux/lustre_iam.h 2006-07-28 01:53:16.000000000 +0400 @@ -30,9 +30,6 @@ #ifndef __LINUX_LUSTRE_IAM_H__ #define __LINUX_LUSTRE_IAM_H__ diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index 43e828b..eb604c4 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -52,22 +52,27 @@ #define PageChecked(page) test_bit(PG_checked, &(page)->flags) #define SetPageChecked(page) set_bit(PG_checked, &(page)->flags) +static __u32 hash_x_index(__u32 value) +{ + return ((__u32)~0) - value; +} + /* returns the page unlocked, but with a reference */ static int ll_dir_readpage(struct file *file, struct page *page) { struct inode *inode = page->mapping->host; struct ptlrpc_request *request; struct mdt_body *body; - __u64 offset; - int rc = 0; + __u32 hash; + int rc; ENTRY; - offset = (__u64)page->index << PAGE_SHIFT; - CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) off "LPU64"\n", - inode->i_ino, inode->i_generation, inode, offset); + hash = hash_x_index(page->index); + CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) off %lu\n", + inode->i_ino, inode->i_generation, inode, (unsigned long)hash); rc = md_readpage(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), - offset, page, &request); + hash, page, &request); if (!rc) { body = lustre_msg_buf(request->rq_repmsg, 0, sizeof (*body)); LASSERT (body != NULL); /* checked by md_readpage() */ @@ -110,13 +115,87 @@ static inline void ll_put_page(struct page *page) page_cache_release(page); } -static struct page *ll_get_dir_page(struct inode *dir, unsigned long n) +/* + * Find, kmap and return page that contains given hash. + */ +static struct page *ll_dir_page_locate(struct inode *dir, unsigned long hash, + __u32 *start, __u32 *end) +{ + struct address_space *mapping = dir->i_mapping; + /* + * Complement of hash is used as an index so that + * radix_tree_gang_lookup() can be used to find a page with starting + * hash _smaller_ than one we are looking for. + */ + unsigned long offset = hash_x_index(hash); + struct page *page; + int found; + + spin_lock_irq(&mapping->tree_lock); + found = radix_tree_gang_lookup(&mapping->page_tree, + (void **)&page, offset, 1); + if (found > 0) { + struct lu_dirpage *dp; + + page_cache_get(page); + spin_unlock_irq(&mapping->tree_lock); + /* + * In contrast to find_lock_page() we are sure that directory + * page cannot be truncated (while DLM lock is held) and, + * hence, can avoid restart. + * + * In fact, page cannot be locked here at all, because + * ll_dir_readpage() does synchronous io. + */ + wait_on_page(page); + if (PageUptodate(page)) { + dp = kmap(page); + *start = le32_to_cpu(dp->ldp_hash_start); + *end = le32_to_cpu(dp->ldp_hash_end); + LASSERT(*start <= hash); + if (hash > *end || (*end != *start && hash == *end)) { + kunmap(page); + page_cache_release(page); + page = NULL; + } + } else { + page_cache_release(page); + page = ERR_PTR(-EIO); + } + + } else { + spin_unlock_irq(&mapping->tree_lock); + page = NULL; + } + return page; +} + +/* + * Chain of hash overflow pages. + */ +struct ll_dir_chain { + /* XXX something. Later */ +}; + +static void ll_dir_chain_init(struct ll_dir_chain *chain) +{ +} + +static void ll_dir_chain_fini(struct ll_dir_chain *chain) +{ +} + +static struct page *ll_get_dir_page(struct inode *dir, __u32 hash, int exact, + struct ll_dir_chain *chain) { ldlm_policy_data_t policy = {.l_inodebits = {MDS_INODELOCK_UPDATE} }; struct address_space *mapping = dir->i_mapping; struct lustre_handle lockh; struct page *page; + struct lu_dirpage *dp; int rc; + __u32 start; + __u32 end; rc = md_lock_match(ll_i2sbi(dir)->ll_md_exp, LDLM_FL_BLOCK_GRANTED, ll_inode2fid(dir), LDLM_IBITS, &policy, LCK_CR, &lockh); @@ -142,7 +221,27 @@ static struct page *ll_get_dir_page(struct inode *dir, unsigned long n) } ldlm_lock_dump_handle(D_OTHER, &lockh); - page = read_cache_page(mapping, n, + page = ll_dir_page_locate(dir, hash, &start, &end); + if (IS_ERR(page)) + GOTO(out_unlock, page); + + if (page != NULL) { + if (exact && hash != start) { + /* + * readdir asked for a page starting _exactly_ from + * given hash, but cache contains stale page, with + * entries with smaller hash values. Stale page should + * be invalidated, and new one fetched. + */ + CWARN("Stale readpage page: %#lx != %#lx\n", + (unsigned long)hash, (unsigned long)start); + truncate_complete_page(mapping, page); + page_cache_release(page); + } else + GOTO(hash_collision, page); + } + + page = read_cache_page(mapping, hash_x_index(hash), (filler_t*)mapping->a_ops->readpage, NULL); if (IS_ERR(page)) GOTO(out_unlock, page); @@ -155,7 +254,21 @@ static struct page *ll_get_dir_page(struct inode *dir, unsigned long n) ll_check_page(dir, page); if (PageError(page)) goto fail; - + dp = page_address(page); + + start = le32_to_cpu(dp->ldp_hash_start); + end = le32_to_cpu(dp->ldp_hash_end); +hash_collision: + if (end == start) { + LASSERT(start == hash); + CWARN("Page-wide hash collision: %#lx\n", (unsigned long)end); + /* + * Fetch whole overflow chain... + * + * XXX not yet. + */ + goto fail; + } out_unlock: ldlm_lock_decref(&lockh, LCK_CR); return page; @@ -166,78 +279,166 @@ fail: goto out_unlock; } -static inline struct lu_dir_entry *ll_next_entry(struct lu_dir_entry *p) +static loff_t ll_llseek(struct file *filp, loff_t off, int whence) { - return (struct lu_dir_entry *)((char *)p + le16_to_cpu(p->de_rec_len)); + if (off != 0 || whence != 1 /* SEEK_CUR */) { + /* + * Except when telldir() is going on, reset readdir to the + * beginning of hash collision chain. + */ + struct ll_file_data *fd = LUSTRE_FPRIVATE(filp); + + fd->fd_dir.lfd_dup = 0; + } + return default_llseek(filp, off, whence); } -int ll_readdir(struct file *filp, void *dirent, filldir_t filldir) +static struct lu_dirent *lu_dirent_start(struct lu_dirpage *dp) { - struct inode *inode = filp->f_dentry->d_inode; - loff_t pos = filp->f_pos; - unsigned offset = pos & ~PAGE_CACHE_MASK; - unsigned long n = pos >> PAGE_CACHE_SHIFT; - unsigned long npages = dir_pages(inode); - int rc = 0; - ENTRY; - - CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) pos %llu/%llu\n", - inode->i_ino, inode->i_generation, inode, pos, inode->i_size); + return dp->ldp_entries; +} - if (pos > inode->i_size - LU_DIR_REC_LEN(1)) - RETURN(0); +static struct lu_dirent *lu_dirent_next(struct lu_dirent *ent) +{ + struct lu_dirent *next; - for ( ; n < npages; n++, offset = 0) { - char *kaddr, *limit; - struct lu_dir_entry *de; - struct page *page; + if (ent->lde_reclen != 0) + next = ((void *)ent) + le16_to_cpu(ent->lde_reclen); + else + next = NULL; + return next; +} - CDEBUG(D_VFSTRACE,"read %lu of dir %lu/%u page %lu/%lu size %llu\n", - PAGE_CACHE_SIZE, inode->i_ino, inode->i_generation, - n, npages, inode->i_size); - page = ll_get_dir_page(inode, n); +int ll_readdir(struct file *filp, void *cookie, filldir_t filldir) +{ + struct inode *inode = filp->f_dentry->d_inode; + struct ll_inode_info *info = ll_i2info(inode); + struct ll_file_data *fd = LUSTRE_FPRIVATE(filp); + struct ll_sb_info *sbi = ll_i2sbi(inode); + __u32 pos = filp->f_pos; + struct page *page; + struct ll_dir_chain chain; + __u32 prevhash; + int rc; + int dup; + int done; + int shift; + ENTRY; - /* size might have been updated by md_readpage */ - npages = dir_pages(inode); + CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) pos %lu/%llu\n", + inode->i_ino, inode->i_generation, inode, + (unsigned long)pos, inode->i_size); - if (IS_ERR(page)) { - rc = PTR_ERR(page); - CERROR("error reading dir %lu/%u page %lu: rc %d\n", - inode->i_ino, inode->i_generation, n, rc); - continue; - } + if (pos == ~0) + /* + * end-of-file. + */ + RETURN(0); - kaddr = page_address(page); - - de = (struct lu_dir_entry *)(kaddr + offset); - limit = kaddr + PAGE_CACHE_SIZE - LU_DIR_REC_LEN(1); - for ( ;(char*)de <= limit; de = ll_next_entry(de)) { - if (fid_oid(&de->de_fid) && fid_seq(&de->de_fid)) { - struct ll_sb_info *sbi = ll_i2sbi(inode); - int over; - - rc = 0; /* no error if we return something */ - - offset = (char *)de - kaddr; - fid_le_to_cpu(&de->de_fid); - over = filldir(dirent, de->de_name, de->de_name_len, - (n << PAGE_CACHE_SHIFT) | offset, - ll_fid_build_ino(sbi, &de->de_fid), - 0); - if (over) { - ll_put_page(page); - GOTO(done, rc); + rc = 0; + dup = 0; + done = 0; + shift = 0; + prevhash = ~0; /* impossible hash value */ + ll_dir_chain_init(&chain); + + page = ll_get_dir_page(inode, pos, 0, &chain); + + while (rc == 0 && !done) { + struct lu_dirpage *dp; + struct lu_dirent *ent; + + if (!IS_ERR(page)) { + __u32 hash; /* no, Richard, it _is_ initialized */ + __u32 next; + + dp = page_address(page); + for (ent = lu_dirent_start(dp); ent != NULL && !done; + ent = lu_dirent_next(ent)) { + char *name; + int namelen; + struct lu_fid fid; + ino_t ino; + + /* + * XXX: implement correct swabbing here. + */ + + hash = le32_to_cpu(ent->lde_hash); + namelen = le16_to_cpu(ent->lde_namelen); + + if (hash < pos) + /* + * Skip until we find target hash + * value. + */ + continue; + + if (namelen == 0) + /* + * Skip dummy record. + */ + continue; + /* + * Keep track of how far we get into duplicate + * hash segment. + */ + if (hash == prevhash) + dup++; + prevhash = hash; + + if (hash == fd->fd_dir.lfd_duppos && + fd->fd_dir.lfd_dup > 0) { + fd->fd_dir.lfd_dup--; + continue; } + + fid = ent->lde_fid; + name = ent->lde_name; + fid_le_to_cpu(&fid); + ino = ll_fid_build_ino(sbi, &fid); + + done = filldir(cookie, name, namelen, + hash, ino, DT_UNKNOWN); } + next = le32_to_cpu(dp->ldp_hash_end); + ll_put_page(page); + if (!done) { + pos = next; + if (pos == ~0) + /* + * End of directory reached. + */ + done = 1; + else if (1 /* chain is exhausted*/) + /* + * Normal case: continue to the next + * page. + */ + page = ll_get_dir_page(inode, pos, 1, + &chain); + else { + /* + * go into overflow page. + */ + } + } else + pos = hash; + } else { + rc = PTR_ERR(page); + CERROR("error reading dir "DFID3" at %lu: rc %d\n", + PFID3(&info->lli_fid), (unsigned long)pos, rc); } - ll_put_page(page); } -done: - filp->f_pos = (n << PAGE_CACHE_SHIFT) | offset; + filp->f_pos = pos; filp->f_version = inode->i_version; + fd->fd_dir.lfd_dup = dup; + fd->fd_dir.lfd_duppos = prevhash; touch_atime(filp->f_vfsmnt, filp->f_dentry); + ll_dir_chain_fini(&chain); + RETURN(rc); } @@ -798,6 +999,7 @@ struct file_operations ll_dir_operations = { .release = ll_dir_release, .read = generic_read_dir, .readdir = ll_readdir, + .llseek = ll_llseek, .ioctl = ll_dir_ioctl }; diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 4b26a70..5da3837 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -14,7 +14,7 @@ #include #include #include /* for s2sbi */ - + /* struct lustre_intent_data { __u64 it_lock_handle[2]; @@ -217,7 +217,7 @@ struct ll_readahead_state { unsigned long ras_consecutive_pages; /* * number of read requests after the last read-ahead window reset - * As window is reset on each seek, this is effectively the number + * As window is reset on each seek, this is effectively the number * on consecutive read request and is used to trigger read-ahead. */ unsigned long ras_consecutive_requests; @@ -244,7 +244,7 @@ struct ll_readahead_state { */ unsigned long ras_requests; /* - * Page index with respect to the current request, these value + * Page index with respect to the current request, these value * will not be accurate when dealing with reads issued via mmap. */ unsigned long ras_request_index; @@ -256,6 +256,11 @@ struct ll_readahead_state { struct list_head ras_read_beads; }; +struct ll_file_dir { + int lfd_dup; + loff_t lfd_duppos; +}; + extern kmem_cache_t *ll_file_data_slab; struct lustre_handle; struct ll_file_data { @@ -264,6 +269,7 @@ struct ll_file_data { __u32 fd_flags; struct lustre_handle fd_cwlockh; unsigned long fd_gid; + struct ll_file_dir fd_dir; }; struct lov_stripe_md; @@ -444,7 +450,7 @@ int ll_iocontrol(struct inode *inode, struct file *file, unsigned int cmd, unsigned long arg); void ll_umount_begin(struct super_block *sb); int ll_remount_fs(struct super_block *sb, int *flags, char *data); -int ll_prep_inode(struct inode **inode, struct ptlrpc_request *req, +int ll_prep_inode(struct inode **inode, struct ptlrpc_request *req, int offset, struct super_block *); void lustre_dump_dentry(struct dentry *, int recur); void lustre_dump_inode(struct inode *); diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index 66969a5..518fccc 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -1768,10 +1768,14 @@ static int lmv_readpage(struct obd_export *exp, struct lu_fid *fid, rc = md_readpage(lmv->tgts[i].ltd_exp, &rid, offset, page, request); - if (rc == 0 && !lu_fid_eq(&rid, fid)) + if (0 && rc == 0 && !lu_fid_eq(&rid, fid)) /* this page isn't from master object. To avoid "." and ".." * duplication in directory, we have to remove them from all - * slave objects */ + * slave objects + * + * XXX this is not needed for cmd3 readdir, because only + * master directory has dot and dotdot. + */ lmv_remove_dots(page); RETURN(rc); diff --git a/lustre/mdd/mdd_handler.c b/lustre/mdd/mdd_handler.c index 7f2eba5..e5e14a4 100644 --- a/lustre/mdd/mdd_handler.c +++ b/lustre/mdd/mdd_handler.c @@ -621,22 +621,8 @@ static int mdd_dir_is_empty(const struct lu_context *ctx, result = iops->get(ctx, it, (const void *)""); if (result > 0) { int i; - for (result = 0, i = 0; result == 0 && i < 3; ++i) { + for (result = 0, i = 0; result == 0 && i < 3; ++i) result = iops->next(ctx, it); -#if 0 - if (result == 0) { - struct lu_fid *fid; - char *name; - int len; - - fid = (void *)iops->rec(ctx, it); - name = (void *)iops->key(ctx, it); - len = iops->key_size(ctx, it); - CERROR("entry: "DFID3": \"%*.*s\"\n", - PFID3(fid), len, len, name); - } -#endif - } iops->put(ctx, it); if (result == 0) result = -ENOTEMPTY; @@ -1223,7 +1209,7 @@ static int mdd_close(const struct lu_context *ctxt, struct md_object *obj) } static int mdd_readpage(const struct lu_context *ctxt, struct md_object *obj, - struct lu_rdpg *rdpg) + const struct lu_rdpg *rdpg) { struct dt_object *next; int rc; diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 71b2e3e..9baac52 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -355,7 +355,7 @@ skip_packing: if (rc < 0) { if (rc == -ENODATA || rc == -EOPNOTSUPP) rc = 0; - else + else CERROR("got acl size: %d\n", rc); } else { repbody->aclsize = rc; @@ -470,7 +470,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, if (lock) { res_id = &lock->l_resource->lr_name; LDLM_DEBUG(lock, "we will return this lock client\n"); - LASSERTF(fid_res_name_eq(mdt_object_fid(child), + LASSERTF(fid_res_name_eq(mdt_object_fid(child), &lock->l_resource->lr_name), "Lock res_id: %lu/%lu/%lu, Fid: "DFID3".\n", (unsigned long)res_id->name[0], @@ -557,7 +557,7 @@ static int mdt_sendpage(struct mdt_thread_info *info, for (i = 0, tmpcount = rdpg->rp_count; i < rdpg->rp_npages; i++, tmpcount -= tmpsize) { - tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount; + tmpsize = min(tmpcount, CFS_PAGE_SIZE); ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize); } @@ -607,15 +607,12 @@ static int mdt_readpage(struct mdt_thread_info *info) int rc; int i; ENTRY; - RETURN(-EOPNOTSUPP); if (MDT_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) RETURN(-ENOMEM); - reqbody = req_capsule_client_get(&info->mti_pill, - &RMF_MDT_BODY); - repbody = req_capsule_server_get(&info->mti_pill, - &RMF_MDT_BODY); + reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY); + repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY); if (reqbody == NULL || repbody == NULL) RETURN(-EFAULT); @@ -624,14 +621,17 @@ static int mdt_readpage(struct mdt_thread_info *info) * reqbody->size contains offset of where to start to read and * reqbody->nlink contains number bytes to read. */ - rdpg->rp_offset = reqbody->size; - rdpg->rp_count = reqbody->nlink; - rdpg->rp_npages = (rdpg->rp_count + PAGE_SIZE - 1) >> PAGE_SHIFT; - LASSERT((rdpg->rp_offset & (PAGE_SIZE - 1)) == 0); - + rdpg->rp_hash = reqbody->size; + if ((__u64)rdpg->rp_hash != reqbody->size) { + CERROR("Invalid hash: %#llx != %#llx\n", + (__u64)rdpg->rp_hash, reqbody->size); + RETURN(-EFAULT); + } + rdpg->rp_count = reqbody->nlink; + rdpg->rp_npages = rdpg->rp_count + CFS_PAGE_SIZE - 1 >> CFS_PAGE_SHIFT; OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]); if (rdpg->rp_pages == NULL) - GOTO(out, rc = -ENOMEM); + RETURN(-ENOMEM); for (i = 0; i < rdpg->rp_npages; ++i) { rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0); @@ -644,9 +644,6 @@ static int mdt_readpage(struct mdt_thread_info *info) if (rc) GOTO(free_rdpg, rc); - repbody->size = rdpg->rp_size; - repbody->valid = OBD_MD_FLSIZE; - /* send pages to client */ rc = mdt_sendpage(info, rdpg); @@ -655,8 +652,7 @@ free_rdpg: for (i = 0; i < rdpg->rp_npages; i++) if (rdpg->rp_pages[i] != NULL) __free_pages(rdpg->rp_pages[i], 0); - OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]); -out: + OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]); return rc; } diff --git a/lustre/osd/osd_handler.c b/lustre/osd/osd_handler.c index 8fac73b..c6dc005 100644 --- a/lustre/osd/osd_handler.c +++ b/lustre/osd/osd_handler.c @@ -823,8 +823,8 @@ static void osd_object_ref_del(const struct lu_context *ctxt, LASSERT(osd_invariant(obj)); } -int osd_xattr_get(const struct lu_context *ctxt, struct dt_object *dt, - void *buf, int size, const char *name) +static int osd_xattr_get(const struct lu_context *ctxt, struct dt_object *dt, + void *buf, int size, const char *name) { struct inode *inode = osd_dt_obj(dt)->oo_inode; struct osd_thread_info *info = lu_context_key_get(ctxt, &osd_key); @@ -836,9 +836,9 @@ int osd_xattr_get(const struct lu_context *ctxt, struct dt_object *dt, return inode->i_op->getxattr(dentry, name, buf, size); } -int osd_xattr_set(const struct lu_context *ctxt, struct dt_object *dt, - const void *buf, int size, const char *name, int fl, - struct thandle *handle) +static int osd_xattr_set(const struct lu_context *ctxt, struct dt_object *dt, + const void *buf, int size, const char *name, int fl, + struct thandle *handle) { int fs_flags; @@ -860,8 +860,8 @@ int osd_xattr_set(const struct lu_context *ctxt, struct dt_object *dt, return inode->i_op->setxattr(dentry, name, buf, size, fs_flags); } -int osd_xattr_list(const struct lu_context *ctxt, struct dt_object *dt, - void *buf, int size) +static int osd_xattr_list(const struct lu_context *ctxt, struct dt_object *dt, + void *buf, int size) { struct inode *inode = osd_dt_obj(dt)->oo_inode; struct osd_thread_info *info = lu_context_key_get(ctxt, &osd_key); @@ -873,8 +873,8 @@ int osd_xattr_list(const struct lu_context *ctxt, struct dt_object *dt, return inode->i_op->listxattr(dentry, buf, size); } -int osd_xattr_del(const struct lu_context *ctxt, struct dt_object *dt, - const char *name, struct thandle *handle) +static int osd_xattr_del(const struct lu_context *ctxt, struct dt_object *dt, + const char *name, struct thandle *handle) { struct inode *inode = osd_dt_obj(dt)->oo_inode; struct osd_thread_info *info = lu_context_key_get(ctxt, &osd_key); @@ -886,26 +886,89 @@ int osd_xattr_del(const struct lu_context *ctxt, struct dt_object *dt, return inode->i_op->removexattr(dentry, name); } +static int osd_dir_page_build(const struct lu_context *ctx, int first, + void *area, int nob, + struct dt_it_ops *iops, struct dt_it *it, + __u32 *start, __u32 *end, + struct lu_dirent **last) +{ + int result; + struct osd_thread_info *info = lu_context_key_get(ctx, &osd_key); + struct lu_fid *fid = &info->oti_fid; + struct lu_dirent *ent; + + if (first) { + area += sizeof (struct lu_dirpage); + nob -= sizeof (struct lu_dirpage); + } + + LASSERT(nob > sizeof *ent); + + ent = area; + result = 0; + do { + char *name; + int len; + int recsize; + __u32 hash; + + name = (char *)iops->key(ctx, it); + len = iops->key_size(ctx, it); + + *fid = *(struct lu_fid *)iops->rec(ctx, it); + fid_cpu_to_le(fid); + + recsize = (sizeof *ent + len + 3) & ~3; + /* + * XXX an interface is needed to obtain a hash. + * + * XXX this is horrible, most horrible hack. + */ + hash = *(__u32 *)(name - sizeof(__u16) - sizeof(__u32)); + *end = hash; + if (nob >= recsize) { + ent->lde_fid = *fid; + ent->lde_hash = hash; + ent->lde_namelen = cpu_to_le16(len); + ent->lde_reclen = cpu_to_le16(recsize); + memcpy(ent->lde_name, name, len); + if (first && ent == area) + *start = hash; + *last = ent; + ent = (void *)ent + recsize; + nob -= recsize; + result = iops->next(ctx, it); + } else { + /* + * record doesn't fit into page, enlarge previous one. + */ + LASSERT(*last != NULL); + (*last)->lde_reclen = + cpu_to_le16(le16_to_cpu((*last)->lde_reclen) + + nob); + break; + } + } while (result == 0); + return result; +} -int osd_readpage(const struct lu_context *ctxt, - struct dt_object *dt, struct lu_rdpg *rdpg) +static int osd_readpage(const struct lu_context *ctxt, + struct dt_object *dt, const struct lu_rdpg *rdpg) { + struct dt_it *it; struct osd_object *obj = osd_dt_obj(dt); - int i, rc, tmpcount, tmpsize = 0; - struct dt_it_ops *iops; - struct dt_it *it; + struct dt_it_ops *iops; + int i; + int rc; + int nob; LASSERT(lu_object_exists(ctxt, &dt->do_lu)); LASSERT(osd_invariant(obj)); LASSERT(rdpg->rp_pages != NULL); - /* check input params */ - if ((rdpg->rp_offset & (obj->oo_inode->i_blksize - 1)) != 0) { - CERROR("offset "LPU64" not on a block boundary of %lu\n", - rdpg->rp_offset, obj->oo_inode->i_blksize); + if (rdpg->rp_count <= 0) return -EFAULT; - } if (rdpg->rp_count & (obj->oo_inode->i_blksize - 1)) { CERROR("size %u is not multiple of blocksize %lu\n", @@ -913,67 +976,58 @@ int osd_readpage(const struct lu_context *ctxt, return -EFAULT; } - /* prepare output */ - rdpg->rp_size = obj->oo_inode->i_size; - /* - * iterating directory and fill pages from @rdpg + * iterating through directory and fill pages from @rdpg */ iops = &dt->do_index_ops->dio_it; it = iops->init(ctxt, dt); if (it == NULL) return -ENOMEM; - + /* + * XXX position iterator at rdpg->rp_hash + */ rc = iops->get(ctxt, it, (const void *)""); if (rc > 0) { - for (i = 0, tmpcount = rdpg->rp_count; - i < rdpg->rp_npages; i++, tmpcount -= tmpsize) { - struct lu_dir_entry *entry, *last; - int page_space = PAGE_SIZE; - - tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount; - entry = kmap(rdpg->rp_pages[i]); - last = entry; - - for (rc = 0; rc == 0; ) { - rc = iops->next(ctxt, it); - - if (rc == 0) { - struct lu_fid *fid; - char *name; - int len; - - fid = (void *)iops->rec(ctxt, it); - name = (void *)iops->key(ctxt, it); - len = iops->key_size(ctxt, it); - - entry->de_fid = *fid; - fid_cpu_to_le(&entry->de_fid); - - entry->de_name_len = cpu_to_le16(len + 1); - entry->de_rec_len = cpu_to_le16(LU_DIR_REC_LEN(len + 1)); - - strncpy(entry->de_name, name, len); - entry->de_name[len] = '\0'; - - page_space -= LU_DIR_REC_LEN(len + 1); - last = entry; - - entry = (struct lu_dir_entry *)((char *)entry + - LU_DIR_REC_LEN(len + 1)); - } - } - /* last entry fills whole space in the page */ - if (page_space < PAGE_SIZE) - last->de_rec_len += page_space; - kunmap(rdpg->rp_pages[i]); + struct page *pg; /* no, Richard, it _is_ initialized */ + struct lu_dirent *last; + __u32 hash_start; + __u32 hash_end; + + for (i = 0, rc = 0, nob = rdpg->rp_count; + rc == 0 && nob > 0; i++, nob -= CFS_PAGE_SIZE) { + + LASSERT(i < rdpg->rp_npages); + pg = rdpg->rp_pages[i]; + rc = osd_dir_page_build(ctxt, !i, kmap(pg), + min_t(int, nob, CFS_PAGE_SIZE), + iops, it, + &hash_start, &hash_end, &last); + kunmap(pg); } iops->put(ctxt, it); - - rc = 0; - } else if (rc == 0) { + if (rc > 0) { + /* + * end of directory. + */ + hash_end = ~0ul; + rc = 0; + } + if (rc == 0) { + struct lu_dirpage *dp; + + dp = kmap(rdpg->rp_pages[0]); + dp->ldp_hash_start = hash_start; + dp->ldp_hash_end = hash_end; + kunmap(rdpg->rp_pages[0]); + kmap(pg); + LASSERT(page_address(pg) <= (void *)last && + (void *)last < page_address(pg) + CFS_PAGE_SIZE); + last->lde_reclen = 0; + kunmap(pg); + } + } else if (rc == 0) rc = -EIO; - } + iops->put(ctxt, it); iops->fini(ctxt, it); return rc; @@ -990,6 +1044,7 @@ static struct dt_object_operations osd_obj_ops = { .do_ref_del = osd_object_ref_del, .do_xattr_get = osd_xattr_get, .do_xattr_set = osd_xattr_set, + .do_xattr_del = osd_xattr_del, .do_xattr_list = osd_xattr_list, .do_readpage = osd_readpage };