}
static int cml_readpage(const struct lu_context *ctxt, struct md_object *mo,
- struct lu_rdpg *rdpg)
+ const struct lu_rdpg *rdpg)
{
int rc;
ENTRY;
RETURN(-EFAULT);
}
+static int cmr_readpage(const struct lu_context *ctxt, struct md_object *mo,
+ const struct lu_rdpg *rdpg)
+{ /*
+ * moo_readpage method installed in cmr_mo_ops. None of the arguments is
+ * examined: the call always fails with -EREMOTE.
+ *
+ * NOTE(review): RETURN() is used here without a preceding ENTRY, unlike
+ * cml_readpage() above -- confirm this is intentional.
+ */
+ RETURN(-EREMOTE);
+}
+
static struct md_object_operations cmr_mo_ops = {
.moo_attr_get = cmr_attr_get,
.moo_attr_set = cmr_attr_set,
.moo_ref_add = cmr_ref_add,
.moo_ref_del = cmr_ref_del,
.moo_open = cmr_open,
- .moo_close = cmr_close
+ .moo_close = cmr_close,
+ .moo_readpage = cmr_readpage
};
/* remote part of md_dir operations */
struct dt_object *dt, struct thandle *th);
int (*do_readpage)(const struct lu_context *ctxt,
- struct dt_object *dt, struct lu_rdpg *rdpg);
+ struct dt_object *dt, const struct lu_rdpg *rdpg);
};
/*
struct lu_rdpg {
/* input params, should be filled out by mdt */
- loff_t rp_offset; /* offset, page alligned*/
+ __u32 rp_hash; /* hash */
int rp_count; /* count in bytes */
int rp_npages; /* number of pages */
struct page **rp_pages; /* pointers to pages */
-
- /* output params, filled by osd */
- __u64 rp_size; /* file size in total */
};
enum lu_xattr_flags {
fid->f_ver = le32_to_cpu(fid_ver(fid));
}
-#define LU_NAME_LEN 255
-
/*
- * lustre directory entry. This is used for sending directory content to the
- * client, where it later gets converted to what Linux expects.
+ * Layout of readdir pages, as transmitted on wire.
*/
-struct lu_dir_entry {
- struct lu_fid de_fid; /* file fid */
- __u16 de_rec_len; /* rec len */
- __u16 de_name_len; /* name len */
- char de_name[LU_NAME_LEN]; /* file name */
+struct lu_dirent {
+ struct lu_fid lde_fid;
+ __u32 lde_hash;
+ __u16 lde_reclen;
+ __u16 lde_namelen;
+ char lde_name[0];
};
-#define LU_DIR_PAD 4
-#define LU_DIR_ROUND (LU_DIR_PAD - 1)
-
-#define LU_DIR_REC_LEN(len) ((sizeof(struct lu_fid) + 2*sizeof(__u16) + \
- len + 1 + LU_DIR_ROUND) & ~LU_DIR_ROUND)
+struct lu_dirpage {
+ __u32 ldp_hash_start; /* hash of the first entry packed into the page; readers decode it with le32_to_cpu() */
+ __u32 ldp_hash_end; /* hash from which the next page continues; readers treat ~0 as end of directory */
+ __u16 ldp_pad0; /* NOTE(review): a __u16 followed by a __u32 normally leaves 2 hidden padding bytes -- confirm the intended wire layout */
+ __u32 ldp_pad1;
+ struct lu_dirent ldp_entries[0]; /* variable-length records, walked with lu_dirent_start()/lu_dirent_next() */
+};
#define MEA_MAGIC_LAST_CHAR 0xb2221ca1
#define MEA_MAGIC_ALL_CHARS 0xb222a11c
int (*moo_close)(const struct lu_context *, struct md_object *);
int (*moo_readpage)(const struct lu_context *, struct md_object *,
- struct lu_rdpg *);
+ const struct lu_rdpg *);
};
/*
}
static inline int mo_readpage(const struct lu_context *cx, struct md_object *m,
- struct lu_rdpg *rdpg)
+ const struct lu_rdpg *rdpg)
{
LASSERT(m->mo_ops->moo_readpage);
return m->mo_ops->moo_readpage(cx, m, rdpg);
Index: iam/fs/ext3/Makefile
===================================================================
--- iam.orig/fs/ext3/Makefile 2006-05-31 20:24:32.000000000 +0400
-+++ iam/fs/ext3/Makefile 2006-07-25 16:59:51.000000000 +0400
++++ iam/fs/ext3/Makefile 2006-07-28 01:53:16.000000000 +0400
@@ -6,7 +6,7 @@ obj-$(CONFIG_EXT3_FS) += ext3.o
ext3-y := balloc.o bitmap.o dir.o file.o fsync.o ialloc.o inode.o iopen.o \
Index: iam/include/linux/lustre_iam.h
===================================================================
--- iam.orig/include/linux/lustre_iam.h 2006-05-31 20:24:32.000000000 +0400
-+++ iam/include/linux/lustre_iam.h 2006-07-25 16:59:51.000000000 +0400
++++ iam/include/linux/lustre_iam.h 2006-07-28 01:53:16.000000000 +0400
@@ -1,9 +1,68 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
Index: iam/fs/ext3/Makefile
===================================================================
---- iam.orig/fs/ext3/Makefile 2006-07-25 16:59:51.000000000 +0400
-+++ iam/fs/ext3/Makefile 2006-07-25 16:59:51.000000000 +0400
+--- iam.orig/fs/ext3/Makefile 2006-07-28 01:53:16.000000000 +0400
++++ iam/fs/ext3/Makefile 2006-07-28 01:53:16.000000000 +0400
@@ -6,7 +6,7 @@ obj-$(CONFIG_EXT3_FS) += ext3.o
ext3-y := balloc.o bitmap.o dir.o file.o fsync.o ialloc.o inode.o iopen.o \
ext3-$(CONFIG_EXT3_FS_POSIX_ACL) += acl.o
Index: iam/fs/ext3/dir.c
===================================================================
---- iam.orig/fs/ext3/dir.c 2006-07-25 16:59:51.000000000 +0400
-+++ iam/fs/ext3/dir.c 2006-07-25 16:59:51.000000000 +0400
+--- iam.orig/fs/ext3/dir.c 2006-07-28 01:53:16.000000000 +0400
++++ iam/fs/ext3/dir.c 2006-07-28 01:53:16.000000000 +0400
@@ -28,6 +28,7 @@
#include <linux/smp_lock.h>
#include <linux/slab.h>
(filp->f_version != inode->i_version)) {
Index: iam/fs/ext3/file.c
===================================================================
---- iam.orig/fs/ext3/file.c 2006-07-25 16:59:51.000000000 +0400
-+++ iam/fs/ext3/file.c 2006-07-25 16:59:51.000000000 +0400
+--- iam.orig/fs/ext3/file.c 2006-07-28 01:53:16.000000000 +0400
++++ iam/fs/ext3/file.c 2006-07-28 01:53:16.000000000 +0400
@@ -23,6 +23,7 @@
#include <linux/jbd.h>
#include <linux/ext3_fs.h>
Index: iam/fs/ext3/iam-uapi.c
===================================================================
--- iam.orig/fs/ext3/iam-uapi.c 2004-04-06 17:27:52.000000000 +0400
-+++ iam/fs/ext3/iam-uapi.c 2006-07-25 16:59:51.000000000 +0400
++++ iam/fs/ext3/iam-uapi.c 2006-07-28 01:53:16.000000000 +0400
@@ -0,0 +1,361 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+}
Index: iam/fs/ext3/ioctl.c
===================================================================
---- iam.orig/fs/ext3/ioctl.c 2006-07-25 16:59:51.000000000 +0400
-+++ iam/fs/ext3/ioctl.c 2006-07-25 16:59:51.000000000 +0400
+--- iam.orig/fs/ext3/ioctl.c 2006-07-28 01:53:16.000000000 +0400
++++ iam/fs/ext3/ioctl.c 2006-07-28 01:53:16.000000000 +0400
@@ -250,6 +250,6 @@ flags_err:
}
Index: iam/include/linux/lustre_iam.h
===================================================================
---- iam.orig/include/linux/lustre_iam.h 2006-07-25 16:59:51.000000000 +0400
-+++ iam/include/linux/lustre_iam.h 2006-07-25 16:59:51.000000000 +0400
+--- iam.orig/include/linux/lustre_iam.h 2006-07-28 01:53:16.000000000 +0400
++++ iam/include/linux/lustre_iam.h 2006-07-28 01:53:16.000000000 +0400
@@ -30,9 +30,6 @@
#ifndef __LINUX_LUSTRE_IAM_H__
#define __LINUX_LUSTRE_IAM_H__
#define PageChecked(page) test_bit(PG_checked, &(page)->flags)
#define SetPageChecked(page) set_bit(PG_checked, &(page)->flags)
+static __u32 hash_x_index(__u32 value)
+{ /*
+ * Returns the 32-bit complement of @value, i.e. ((__u32)~0) - value.
+ * Callers in this file apply it in both directions (hash -> page index
+ * and page index -> hash), so a larger hash maps to a smaller index.
+ */
+ return ((__u32)~0) - value;
+}
+
/* returns the page unlocked, but with a reference */
static int ll_dir_readpage(struct file *file, struct page *page)
{
struct inode *inode = page->mapping->host;
struct ptlrpc_request *request;
struct mdt_body *body;
- __u64 offset;
- int rc = 0;
+ __u32 hash;
+ int rc;
ENTRY;
- offset = (__u64)page->index << PAGE_SHIFT;
- CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) off "LPU64"\n",
- inode->i_ino, inode->i_generation, inode, offset);
+ hash = hash_x_index(page->index);
+ CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) off %lu\n",
+ inode->i_ino, inode->i_generation, inode, (unsigned long)hash);
rc = md_readpage(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode),
- offset, page, &request);
+ hash, page, &request);
if (!rc) {
body = lustre_msg_buf(request->rq_repmsg, 0, sizeof (*body));
LASSERT (body != NULL); /* checked by md_readpage() */
page_cache_release(page);
}
-static struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
+/*
+ * Find, kmap and return page that contains given hash.
+ *
+ * Looks up at most one page in dir->i_mapping, starting the radix tree
+ * search at index hash_x_index(hash), then checks the lu_dirpage header of
+ * that page against @hash.
+ *
+ * Returns:
+ *  - the page, kmapped and with an extra reference taken here, when @hash
+ *    lies in [*start, *end) or when *start == *end == @hash;
+ *  - NULL when the lookup found nothing, or when the page found does not
+ *    cover @hash (it is kunmapped and released before returning);
+ *  - ERR_PTR(-EIO) when the page found is not up to date.
+ *
+ * *start and *end receive the cpu-order ldp_hash_start / ldp_hash_end of the
+ * page. They are written only when an up-to-date page was found, including
+ * the case where that page is subsequently rejected.
+ */
+static struct page *ll_dir_page_locate(struct inode *dir, unsigned long hash,
+ __u32 *start, __u32 *end)
+{
+ struct address_space *mapping = dir->i_mapping;
+ /*
+ * Complement of hash is used as an index so that
+ * radix_tree_gang_lookup() can be used to find a page with starting
+ * hash _smaller_ than one we are looking for.
+ */
+ unsigned long offset = hash_x_index(hash);
+ struct page *page;
+ int found;
+
+ spin_lock_irq(&mapping->tree_lock);
+ found = radix_tree_gang_lookup(&mapping->page_tree,
+ (void **)&page, offset, 1);
+ if (found > 0) {
+ struct lu_dirpage *dp;
+
+ page_cache_get(page);
+ spin_unlock_irq(&mapping->tree_lock);
+ /*
+ * In contrast to find_lock_page() we are sure that directory
+ * page cannot be truncated (while DLM lock is held) and,
+ * hence, can avoid restart.
+ *
+ * In fact, page cannot be locked here at all, because
+ * ll_dir_readpage() does synchronous io.
+ */
+ wait_on_page(page);
+ if (PageUptodate(page)) {
+ dp = kmap(page);
+ *start = le32_to_cpu(dp->ldp_hash_start);
+ *end = le32_to_cpu(dp->ldp_hash_end);
+ LASSERT(*start <= hash);
+ if (hash > *end || (*end != *start && hash == *end)) {
+ kunmap(page);
+ page_cache_release(page);
+ page = NULL;
+ }
+ } else {
+ page_cache_release(page);
+ page = ERR_PTR(-EIO);
+ }
+
+ } else {
+ spin_unlock_irq(&mapping->tree_lock);
+ page = NULL;
+ }
+ return page;
+}
+
+/*
+ * Chain of hash overflow pages.
+ *
+ * Placeholder for now: the structure has no members, and both
+ * ll_dir_chain_init() and ll_dir_chain_fini() below have empty bodies.
+ */
+struct ll_dir_chain {
+ /* XXX something. Later */
+};
+
+static void ll_dir_chain_init(struct ll_dir_chain *chain)
+{
+}
+
+static void ll_dir_chain_fini(struct ll_dir_chain *chain)
+{
+}
+
+static struct page *ll_get_dir_page(struct inode *dir, __u32 hash, int exact,
+ struct ll_dir_chain *chain)
{
ldlm_policy_data_t policy = {.l_inodebits = {MDS_INODELOCK_UPDATE} };
struct address_space *mapping = dir->i_mapping;
struct lustre_handle lockh;
struct page *page;
+ struct lu_dirpage *dp;
int rc;
+ __u32 start;
+ __u32 end;
rc = md_lock_match(ll_i2sbi(dir)->ll_md_exp, LDLM_FL_BLOCK_GRANTED,
ll_inode2fid(dir), LDLM_IBITS, &policy, LCK_CR, &lockh);
}
ldlm_lock_dump_handle(D_OTHER, &lockh);
- page = read_cache_page(mapping, n,
+ page = ll_dir_page_locate(dir, hash, &start, &end);
+ if (IS_ERR(page))
+ GOTO(out_unlock, page);
+
+ if (page != NULL) {
+ if (exact && hash != start) {
+ /*
+ * readdir asked for a page starting _exactly_ from
+ * given hash, but cache contains stale page, with
+ * entries with smaller hash values. Stale page should
+ * be invalidated, and new one fetched.
+ */
+ CWARN("Stale readpage page: %#lx != %#lx\n",
+ (unsigned long)hash, (unsigned long)start);
+ truncate_complete_page(mapping, page);
+ page_cache_release(page);
+ } else
+ GOTO(hash_collision, page);
+ }
+
+ page = read_cache_page(mapping, hash_x_index(hash),
(filler_t*)mapping->a_ops->readpage, NULL);
if (IS_ERR(page))
GOTO(out_unlock, page);
ll_check_page(dir, page);
if (PageError(page))
goto fail;
-
+ dp = page_address(page);
+
+ start = le32_to_cpu(dp->ldp_hash_start);
+ end = le32_to_cpu(dp->ldp_hash_end);
+hash_collision:
+ if (end == start) {
+ LASSERT(start == hash);
+ CWARN("Page-wide hash collision: %#lx\n", (unsigned long)end);
+ /*
+ * Fetch whole overflow chain...
+ *
+ * XXX not yet.
+ */
+ goto fail;
+ }
out_unlock:
ldlm_lock_decref(&lockh, LCK_CR);
return page;
goto out_unlock;
}
-static inline struct lu_dir_entry *ll_next_entry(struct lu_dir_entry *p)
+static loff_t ll_llseek(struct file *filp, loff_t off, int whence)
{
- return (struct lu_dir_entry *)((char *)p + le16_to_cpu(p->de_rec_len));
+ if (off != 0 || whence != 1 /* SEEK_CUR */) {
+ /*
+ * Except when telldir() is going on, reset readdir to the
+ * beginning of hash collision chain.
+ */
+ struct ll_file_data *fd = LUSTRE_FPRIVATE(filp);
+
+ fd->fd_dir.lfd_dup = 0;
+ }
+ return default_llseek(filp, off, whence);
}
-int ll_readdir(struct file *filp, void *dirent, filldir_t filldir)
+static struct lu_dirent *lu_dirent_start(struct lu_dirpage *dp)
{
- struct inode *inode = filp->f_dentry->d_inode;
- loff_t pos = filp->f_pos;
- unsigned offset = pos & ~PAGE_CACHE_MASK;
- unsigned long n = pos >> PAGE_CACHE_SHIFT;
- unsigned long npages = dir_pages(inode);
- int rc = 0;
- ENTRY;
-
- CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) pos %llu/%llu\n",
- inode->i_ino, inode->i_generation, inode, pos, inode->i_size);
+ return dp->ldp_entries;
+}
- if (pos > inode->i_size - LU_DIR_REC_LEN(1))
- RETURN(0);
+static struct lu_dirent *lu_dirent_next(struct lu_dirent *ent)
+{
+ struct lu_dirent *next;
- for ( ; n < npages; n++, offset = 0) {
- char *kaddr, *limit;
- struct lu_dir_entry *de;
- struct page *page;
+ if (ent->lde_reclen != 0)
+ next = ((void *)ent) + le16_to_cpu(ent->lde_reclen);
+ else
+ next = NULL;
+ return next;
+}
- CDEBUG(D_VFSTRACE,"read %lu of dir %lu/%u page %lu/%lu size %llu\n",
- PAGE_CACHE_SIZE, inode->i_ino, inode->i_generation,
- n, npages, inode->i_size);
- page = ll_get_dir_page(inode, n);
+int ll_readdir(struct file *filp, void *cookie, filldir_t filldir)
+{
+ struct inode *inode = filp->f_dentry->d_inode;
+ struct ll_inode_info *info = ll_i2info(inode);
+ struct ll_file_data *fd = LUSTRE_FPRIVATE(filp);
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ __u32 pos = filp->f_pos;
+ struct page *page;
+ struct ll_dir_chain chain;
+ __u32 prevhash;
+ int rc;
+ int dup;
+ int done;
+ int shift;
+ ENTRY;
- /* size might have been updated by md_readpage */
- npages = dir_pages(inode);
+ CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) pos %lu/%llu\n",
+ inode->i_ino, inode->i_generation, inode,
+ (unsigned long)pos, inode->i_size);
- if (IS_ERR(page)) {
- rc = PTR_ERR(page);
- CERROR("error reading dir %lu/%u page %lu: rc %d\n",
- inode->i_ino, inode->i_generation, n, rc);
- continue;
- }
+ if (pos == ~0)
+ /*
+ * end-of-file.
+ */
+ RETURN(0);
- kaddr = page_address(page);
-
- de = (struct lu_dir_entry *)(kaddr + offset);
- limit = kaddr + PAGE_CACHE_SIZE - LU_DIR_REC_LEN(1);
- for ( ;(char*)de <= limit; de = ll_next_entry(de)) {
- if (fid_oid(&de->de_fid) && fid_seq(&de->de_fid)) {
- struct ll_sb_info *sbi = ll_i2sbi(inode);
- int over;
-
- rc = 0; /* no error if we return something */
-
- offset = (char *)de - kaddr;
- fid_le_to_cpu(&de->de_fid);
- over = filldir(dirent, de->de_name, de->de_name_len,
- (n << PAGE_CACHE_SHIFT) | offset,
- ll_fid_build_ino(sbi, &de->de_fid),
- 0);
- if (over) {
- ll_put_page(page);
- GOTO(done, rc);
+ rc = 0;
+ dup = 0;
+ done = 0;
+ shift = 0;
+ prevhash = ~0; /* impossible hash value */
+ ll_dir_chain_init(&chain);
+
+ page = ll_get_dir_page(inode, pos, 0, &chain);
+
+ while (rc == 0 && !done) {
+ struct lu_dirpage *dp;
+ struct lu_dirent *ent;
+
+ if (!IS_ERR(page)) {
+ __u32 hash; /* no, Richard, it _is_ initialized */
+ __u32 next;
+
+ dp = page_address(page);
+ for (ent = lu_dirent_start(dp); ent != NULL && !done;
+ ent = lu_dirent_next(ent)) {
+ char *name;
+ int namelen;
+ struct lu_fid fid;
+ ino_t ino;
+
+ /*
+ * XXX: implement correct swabbing here.
+ */
+
+ hash = le32_to_cpu(ent->lde_hash);
+ namelen = le16_to_cpu(ent->lde_namelen);
+
+ if (hash < pos)
+ /*
+ * Skip until we find target hash
+ * value.
+ */
+ continue;
+
+ if (namelen == 0)
+ /*
+ * Skip dummy record.
+ */
+ continue;
+ /*
+ * Keep track of how far we get into duplicate
+ * hash segment.
+ */
+ if (hash == prevhash)
+ dup++;
+ prevhash = hash;
+
+ if (hash == fd->fd_dir.lfd_duppos &&
+ fd->fd_dir.lfd_dup > 0) {
+ fd->fd_dir.lfd_dup--;
+ continue;
}
+
+ fid = ent->lde_fid;
+ name = ent->lde_name;
+ fid_le_to_cpu(&fid);
+ ino = ll_fid_build_ino(sbi, &fid);
+
+ done = filldir(cookie, name, namelen,
+ hash, ino, DT_UNKNOWN);
}
+ next = le32_to_cpu(dp->ldp_hash_end);
+ ll_put_page(page);
+ if (!done) {
+ pos = next;
+ if (pos == ~0)
+ /*
+ * End of directory reached.
+ */
+ done = 1;
+ else if (1 /* chain is exhausted*/)
+ /*
+ * Normal case: continue to the next
+ * page.
+ */
+ page = ll_get_dir_page(inode, pos, 1,
+ &chain);
+ else {
+ /*
+ * go into overflow page.
+ */
+ }
+ } else
+ pos = hash;
+ } else {
+ rc = PTR_ERR(page);
+ CERROR("error reading dir "DFID3" at %lu: rc %d\n",
+ PFID3(&info->lli_fid), (unsigned long)pos, rc);
}
- ll_put_page(page);
}
-done:
- filp->f_pos = (n << PAGE_CACHE_SHIFT) | offset;
+ filp->f_pos = pos;
filp->f_version = inode->i_version;
+ fd->fd_dir.lfd_dup = dup;
+ fd->fd_dir.lfd_duppos = prevhash;
touch_atime(filp->f_vfsmnt, filp->f_dentry);
+ ll_dir_chain_fini(&chain);
+
RETURN(rc);
}
.release = ll_dir_release,
.read = generic_read_dir,
.readdir = ll_readdir,
+ .llseek = ll_llseek,
.ioctl = ll_dir_ioctl
};
#include <lustre_debug.h>
#include <lustre_ver.h>
#include <lustre_disk.h> /* for s2sbi */
-
+
/*
struct lustre_intent_data {
__u64 it_lock_handle[2];
unsigned long ras_consecutive_pages;
/*
* number of read requests after the last read-ahead window reset
- * As window is reset on each seek, this is effectively the number
+ * As window is reset on each seek, this is effectively the number
* on consecutive read request and is used to trigger read-ahead.
*/
unsigned long ras_consecutive_requests;
*/
unsigned long ras_requests;
/*
- * Page index with respect to the current request, these value
+ * Page index with respect to the current request, these value
* will not be accurate when dealing with reads issued via mmap.
*/
unsigned long ras_request_index;
struct list_head ras_read_beads;
};
+struct ll_file_dir {
+ int lfd_dup; /* number of entries with hash == lfd_duppos that the next ll_readdir() skips; saved when ll_readdir() returns, zeroed by ll_llseek() except for (0, SEEK_CUR) */
+ loff_t lfd_duppos; /* last hash value ll_readdir() processed (its prevhash) when it returned */
+};
+
extern kmem_cache_t *ll_file_data_slab;
struct lustre_handle;
struct ll_file_data {
__u32 fd_flags;
struct lustre_handle fd_cwlockh;
unsigned long fd_gid;
+ struct ll_file_dir fd_dir;
};
struct lov_stripe_md;
unsigned int cmd, unsigned long arg);
void ll_umount_begin(struct super_block *sb);
int ll_remount_fs(struct super_block *sb, int *flags, char *data);
-int ll_prep_inode(struct inode **inode, struct ptlrpc_request *req,
+int ll_prep_inode(struct inode **inode, struct ptlrpc_request *req,
int offset, struct super_block *);
void lustre_dump_dentry(struct dentry *, int recur);
void lustre_dump_inode(struct inode *);
rc = md_readpage(lmv->tgts[i].ltd_exp, &rid,
offset, page, request);
- if (rc == 0 && !lu_fid_eq(&rid, fid))
+ if (0 && rc == 0 && !lu_fid_eq(&rid, fid))
/* this page isn't from master object. To avoid "." and ".."
* duplication in directory, we have to remove them from all
- * slave objects */
+ * slave objects
+ *
+ * XXX this is not needed for cmd3 readdir, because only
+ * master directory has dot and dotdot.
+ */
lmv_remove_dots(page);
RETURN(rc);
result = iops->get(ctx, it, (const void *)"");
if (result > 0) {
int i;
- for (result = 0, i = 0; result == 0 && i < 3; ++i) {
+ for (result = 0, i = 0; result == 0 && i < 3; ++i)
result = iops->next(ctx, it);
-#if 0
- if (result == 0) {
- struct lu_fid *fid;
- char *name;
- int len;
-
- fid = (void *)iops->rec(ctx, it);
- name = (void *)iops->key(ctx, it);
- len = iops->key_size(ctx, it);
- CERROR("entry: "DFID3": \"%*.*s\"\n",
- PFID3(fid), len, len, name);
- }
-#endif
- }
iops->put(ctx, it);
if (result == 0)
result = -ENOTEMPTY;
}
static int mdd_readpage(const struct lu_context *ctxt, struct md_object *obj,
- struct lu_rdpg *rdpg)
+ const struct lu_rdpg *rdpg)
{
struct dt_object *next;
int rc;
if (rc < 0) {
if (rc == -ENODATA || rc == -EOPNOTSUPP)
rc = 0;
- else
+ else
CERROR("got acl size: %d\n", rc);
} else {
repbody->aclsize = rc;
if (lock) {
res_id = &lock->l_resource->lr_name;
LDLM_DEBUG(lock, "we will return this lock client\n");
- LASSERTF(fid_res_name_eq(mdt_object_fid(child),
+ LASSERTF(fid_res_name_eq(mdt_object_fid(child),
&lock->l_resource->lr_name),
"Lock res_id: %lu/%lu/%lu, Fid: "DFID3".\n",
(unsigned long)res_id->name[0],
for (i = 0, tmpcount = rdpg->rp_count;
i < rdpg->rp_npages; i++, tmpcount -= tmpsize) {
- tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount;
+ tmpsize = min(tmpcount, CFS_PAGE_SIZE);
ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize);
}
int rc;
int i;
ENTRY;
- RETURN(-EOPNOTSUPP);
if (MDT_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
RETURN(-ENOMEM);
- reqbody = req_capsule_client_get(&info->mti_pill,
- &RMF_MDT_BODY);
- repbody = req_capsule_server_get(&info->mti_pill,
- &RMF_MDT_BODY);
+ reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
+ repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
if (reqbody == NULL || repbody == NULL)
RETURN(-EFAULT);
* reqbody->size contains offset of where to start to read and
* reqbody->nlink contains number bytes to read.
*/
- rdpg->rp_offset = reqbody->size;
- rdpg->rp_count = reqbody->nlink;
- rdpg->rp_npages = (rdpg->rp_count + PAGE_SIZE - 1) >> PAGE_SHIFT;
- LASSERT((rdpg->rp_offset & (PAGE_SIZE - 1)) == 0);
-
+ rdpg->rp_hash = reqbody->size;
+ if ((__u64)rdpg->rp_hash != reqbody->size) {
+ CERROR("Invalid hash: %#llx != %#llx\n",
+ (__u64)rdpg->rp_hash, reqbody->size);
+ RETURN(-EFAULT);
+ }
+ rdpg->rp_count = reqbody->nlink;
+ rdpg->rp_npages = rdpg->rp_count + CFS_PAGE_SIZE - 1 >> CFS_PAGE_SHIFT;
OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
if (rdpg->rp_pages == NULL)
- GOTO(out, rc = -ENOMEM);
+ RETURN(-ENOMEM);
for (i = 0; i < rdpg->rp_npages; ++i) {
rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
if (rc)
GOTO(free_rdpg, rc);
- repbody->size = rdpg->rp_size;
- repbody->valid = OBD_MD_FLSIZE;
-
/* send pages to client */
rc = mdt_sendpage(info, rdpg);
for (i = 0; i < rdpg->rp_npages; i++)
if (rdpg->rp_pages[i] != NULL)
__free_pages(rdpg->rp_pages[i], 0);
- OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
-out:
+ OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
return rc;
}
LASSERT(osd_invariant(obj));
}
-int osd_xattr_get(const struct lu_context *ctxt, struct dt_object *dt,
- void *buf, int size, const char *name)
+static int osd_xattr_get(const struct lu_context *ctxt, struct dt_object *dt,
+ void *buf, int size, const char *name)
{
struct inode *inode = osd_dt_obj(dt)->oo_inode;
struct osd_thread_info *info = lu_context_key_get(ctxt, &osd_key);
return inode->i_op->getxattr(dentry, name, buf, size);
}
-int osd_xattr_set(const struct lu_context *ctxt, struct dt_object *dt,
- const void *buf, int size, const char *name, int fl,
- struct thandle *handle)
+static int osd_xattr_set(const struct lu_context *ctxt, struct dt_object *dt,
+ const void *buf, int size, const char *name, int fl,
+ struct thandle *handle)
{
int fs_flags;
return inode->i_op->setxattr(dentry, name, buf, size, fs_flags);
}
-int osd_xattr_list(const struct lu_context *ctxt, struct dt_object *dt,
- void *buf, int size)
+static int osd_xattr_list(const struct lu_context *ctxt, struct dt_object *dt,
+ void *buf, int size)
{
struct inode *inode = osd_dt_obj(dt)->oo_inode;
struct osd_thread_info *info = lu_context_key_get(ctxt, &osd_key);
return inode->i_op->listxattr(dentry, buf, size);
}
-int osd_xattr_del(const struct lu_context *ctxt, struct dt_object *dt,
- const char *name, struct thandle *handle)
+static int osd_xattr_del(const struct lu_context *ctxt, struct dt_object *dt,
+ const char *name, struct thandle *handle)
{
struct inode *inode = osd_dt_obj(dt)->oo_inode;
struct osd_thread_info *info = lu_context_key_get(ctxt, &osd_key);
return inode->i_op->removexattr(dentry, name);
}
+static int osd_dir_page_build(const struct lu_context *ctx, int first,
+ void *area, int nob,
+ struct dt_it_ops *iops, struct dt_it *it,
+ __u32 *start, __u32 *end,
+ struct lu_dirent **last)
+{ /*
+ * Packs records from iterator @it into @area (at most @nob bytes) as
+ * consecutive struct lu_dirent records, advancing the iterator after each
+ * record written.
+ *
+ * When @first is set, sizeof(struct lu_dirpage) bytes at the start of
+ * @area are skipped (left for the page header) and *start receives the
+ * hash of the first record written.
+ *
+ * *end is overwritten with the hash of every record examined, including
+ * the one that did not fit. *last is set to the most recent record
+ * written.
+ *
+ * Returns 0 when packing stopped because the next record did not fit,
+ * otherwise the first non-zero value returned by iops->next().
+ *
+ * NOTE(review): the LASSERT below compares int @nob with a size_t, so a
+ * negative @nob would be converted to a large unsigned value and pass.
+ */
+ int result;
+ struct osd_thread_info *info = lu_context_key_get(ctx, &osd_key);
+ struct lu_fid *fid = &info->oti_fid;
+ struct lu_dirent *ent;
+
+ if (first) {
+ area += sizeof (struct lu_dirpage);
+ nob -= sizeof (struct lu_dirpage);
+ }
+
+ LASSERT(nob > sizeof *ent);
+
+ ent = area;
+ result = 0;
+ do {
+ char *name;
+ int len;
+ int recsize;
+ __u32 hash;
+
+ name = (char *)iops->key(ctx, it);
+ len = iops->key_size(ctx, it);
+
+ *fid = *(struct lu_fid *)iops->rec(ctx, it);
+ fid_cpu_to_le(fid);
+
+ recsize = (sizeof *ent + len + 3) & ~3;
+ /*
+ * XXX an interface is needed to obtain a hash.
+ *
+ * XXX this is horrible, most horrible hack.
+ *
+ * The hash is read from the 4 bytes located 6 bytes before the
+ * key pointer returned by the iterator.
+ *
+ * NOTE(review): the value is stored into lde_hash, *start and
+ * *end exactly as read, with no cpu_to_le32(), while the client
+ * side decodes lde_hash with le32_to_cpu() -- confirm the byte
+ * order of the raw value.
+ *
+ * NOTE(review): only len bytes of name are copied; the bytes
+ * that round the record up to a multiple of 4 are not written
+ * here -- confirm the caller zeroes the page.
+ */
+ hash = *(__u32 *)(name - sizeof(__u16) - sizeof(__u32));
+ *end = hash;
+ if (nob >= recsize) {
+ ent->lde_fid = *fid;
+ ent->lde_hash = hash;
+ ent->lde_namelen = cpu_to_le16(len);
+ ent->lde_reclen = cpu_to_le16(recsize);
+ memcpy(ent->lde_name, name, len);
+ if (first && ent == area)
+ *start = hash;
+ *last = ent;
+ ent = (void *)ent + recsize;
+ nob -= recsize;
+ result = iops->next(ctx, it);
+ } else {
+ /*
+ * record doesn't fit into page, enlarge previous one.
+ *
+ * The remaining nob bytes are added to the reclen of
+ * *last, and the iterator is left on the record that
+ * did not fit.
+ *
+ * NOTE(review): if the very first record of a call does
+ * not fit, *last still holds whatever the caller passed
+ * in, so the caller has to initialize it.
+ */
+ LASSERT(*last != NULL);
+ (*last)->lde_reclen =
+ cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
+ nob);
+ break;
+ }
+ } while (result == 0);
+ return result;
+}
-int osd_readpage(const struct lu_context *ctxt,
- struct dt_object *dt, struct lu_rdpg *rdpg)
+static int osd_readpage(const struct lu_context *ctxt,
+ struct dt_object *dt, const struct lu_rdpg *rdpg)
{
+ struct dt_it *it;
struct osd_object *obj = osd_dt_obj(dt);
- int i, rc, tmpcount, tmpsize = 0;
- struct dt_it_ops *iops;
- struct dt_it *it;
+ struct dt_it_ops *iops;
+ int i;
+ int rc;
+ int nob;
LASSERT(lu_object_exists(ctxt, &dt->do_lu));
LASSERT(osd_invariant(obj));
LASSERT(rdpg->rp_pages != NULL);
- /* check input params */
- if ((rdpg->rp_offset & (obj->oo_inode->i_blksize - 1)) != 0) {
- CERROR("offset "LPU64" not on a block boundary of %lu\n",
- rdpg->rp_offset, obj->oo_inode->i_blksize);
+ if (rdpg->rp_count <= 0)
return -EFAULT;
- }
if (rdpg->rp_count & (obj->oo_inode->i_blksize - 1)) {
CERROR("size %u is not multiple of blocksize %lu\n",
return -EFAULT;
}
- /* prepare output */
- rdpg->rp_size = obj->oo_inode->i_size;
-
/*
- * iterating directory and fill pages from @rdpg
+ * iterating through directory and fill pages from @rdpg
*/
iops = &dt->do_index_ops->dio_it;
it = iops->init(ctxt, dt);
if (it == NULL)
return -ENOMEM;
-
+ /*
+ * XXX position iterator at rdpg->rp_hash
+ */
rc = iops->get(ctxt, it, (const void *)"");
if (rc > 0) {
- for (i = 0, tmpcount = rdpg->rp_count;
- i < rdpg->rp_npages; i++, tmpcount -= tmpsize) {
- struct lu_dir_entry *entry, *last;
- int page_space = PAGE_SIZE;
-
- tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount;
- entry = kmap(rdpg->rp_pages[i]);
- last = entry;
-
- for (rc = 0; rc == 0; ) {
- rc = iops->next(ctxt, it);
-
- if (rc == 0) {
- struct lu_fid *fid;
- char *name;
- int len;
-
- fid = (void *)iops->rec(ctxt, it);
- name = (void *)iops->key(ctxt, it);
- len = iops->key_size(ctxt, it);
-
- entry->de_fid = *fid;
- fid_cpu_to_le(&entry->de_fid);
-
- entry->de_name_len = cpu_to_le16(len + 1);
- entry->de_rec_len = cpu_to_le16(LU_DIR_REC_LEN(len + 1));
-
- strncpy(entry->de_name, name, len);
- entry->de_name[len] = '\0';
-
- page_space -= LU_DIR_REC_LEN(len + 1);
- last = entry;
-
- entry = (struct lu_dir_entry *)((char *)entry +
- LU_DIR_REC_LEN(len + 1));
- }
- }
- /* last entry fills whole space in the page */
- if (page_space < PAGE_SIZE)
- last->de_rec_len += page_space;
- kunmap(rdpg->rp_pages[i]);
+ struct page *pg; /* no, Richard, it _is_ initialized */
+ struct lu_dirent *last;
+ __u32 hash_start;
+ __u32 hash_end;
+
+ for (i = 0, rc = 0, nob = rdpg->rp_count;
+ rc == 0 && nob > 0; i++, nob -= CFS_PAGE_SIZE) {
+
+ LASSERT(i < rdpg->rp_npages);
+ pg = rdpg->rp_pages[i];
+ rc = osd_dir_page_build(ctxt, !i, kmap(pg),
+ min_t(int, nob, CFS_PAGE_SIZE),
+ iops, it,
+ &hash_start, &hash_end, &last);
+ kunmap(pg);
}
iops->put(ctxt, it);
-
- rc = 0;
- } else if (rc == 0) {
+ if (rc > 0) {
+ /*
+ * end of directory.
+ */
+ hash_end = ~0ul;
+ rc = 0;
+ }
+ if (rc == 0) {
+ struct lu_dirpage *dp;
+
+ dp = kmap(rdpg->rp_pages[0]);
+ dp->ldp_hash_start = hash_start;
+ dp->ldp_hash_end = hash_end;
+ kunmap(rdpg->rp_pages[0]);
+ kmap(pg);
+ LASSERT(page_address(pg) <= (void *)last &&
+ (void *)last < page_address(pg) + CFS_PAGE_SIZE);
+ last->lde_reclen = 0;
+ kunmap(pg);
+ }
+ } else if (rc == 0)
rc = -EIO;
- }
+ iops->put(ctxt, it);
iops->fini(ctxt, it);
return rc;
.do_ref_del = osd_object_ref_del,
.do_xattr_get = osd_xattr_get,
.do_xattr_set = osd_xattr_set,
+ .do_xattr_del = osd_xattr_del,
.do_xattr_list = osd_xattr_list,
.do_readpage = osd_readpage
};