CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) off "LPU64"\n",
inode->i_ino, inode->i_generation, inode, offset);
- mdc_pack_fid(&mdc_fid, inode->i_ino, inode->i_generation, S_IFDIR);
+ ll_pack_fid(&mdc_fid, inode->i_ino, inode->i_generation, S_IFDIR);
rc = mdc_readpage(ll_i2sbi(inode)->ll_mdc_exp, &mdc_fid,
offset, page, &request);
struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
{
- struct ldlm_res_id res_id =
- { .name = { dir->i_ino, (__u64)dir->i_generation} };
+ struct ldlm_res_id res_id;
struct lustre_handle lockh;
struct obd_device *obddev = class_exp2obd(ll_i2sbi(dir)->ll_mdc_exp);
struct address_space *mapping = dir->i_mapping;
ldlm_policy_data_t policy = {.l_inodebits = {MDS_INODELOCK_UPDATE} };
int rc;
+ fid_build_reg_res_name(ll_inode_lu_fid(dir), &res_id);
rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED,
&res_id, LDLM_IBITS, &policy, LCK_CR, &lockh);
if (!rc) {
struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CR,
ll_mdc_blocking_ast, ldlm_completion_ast, NULL, dir };
struct ptlrpc_request *request;
- struct mdc_op_data data;
+ struct mdc_op_data data = { { 0 } };
ll_prepare_mdc_op_data(&data, dir, NULL, NULL, 0, 0, NULL);
* 0: no live entries on this page.
*/
-int ll_readdir_page(char *addr, __u64 base, unsigned *offset,
- filldir_t filldir, void *cookie)
+static int ll_readdir_page(char *addr, __u64 base, unsigned *offset,
+ filldir_t filldir, void *cookie)
{
struct ll_dir_entry *de;
char *end;
return nr;
}
-int ll_readdir(struct file *filp, void *dirent, filldir_t filldir)
+static int ll_readdir_18(struct file *filp, void *dirent, filldir_t filldir)
{
struct inode *inode = filp->f_dentry->d_inode;
loff_t pos = filp->f_pos;
RETURN(rc);
}
+/*
+ * Chain of hash overflow pages.
+ */
+struct ll_dir_chain {
+ /* XXX something. Later */
+};
+
+/* Overflow chains are not implemented yet; init/fini are placeholders. */
+static inline void ll_dir_chain_init(struct ll_dir_chain *chain)
+{
+}
+
+static inline void ll_dir_chain_fini(struct ll_dir_chain *chain)
+{
+}
+
+/*
+ * Directory pages are indexed in the page cache by the bitwise complement
+ * of their starting hash, so that radix_tree_gang_lookup() can find the
+ * page whose start hash is smaller than or equal to the target hash
+ * (see ll_dir_page_locate()). This maps hash <-> page index in both
+ * directions, since the function is its own inverse.
+ */
+static inline __u32 hash_x_index(__u32 value)
+{
+ return ((__u32)~0) - value;
+}
+
+/*
+ * Layout of readdir pages, as transmitted on wire.
+ *
+ * All multi-byte fields are little-endian on the wire; readers go through
+ * le16_to_cpu()/le64_to_cpu() (see lu_dirent_start() and friends).
+ */
+struct lu_dirent {
+ struct lu_fid lde_fid; /* FID of the entry's inode */
+ __u64 lde_hash; /* hash of the entry name */
+ __u16 lde_reclen; /* record length; 0 marks the last entry on a page */
+ __u16 lde_namelen; /* name length; 0 marks a dummy record */
+ __u32 lde_padding;
+ char lde_name[0]; /* entry name, lde_namelen bytes (trailing array) */
+};
+
+struct lu_dirpage {
+ __u64 ldp_hash_start; /* smallest hash covered by this page */
+ __u64 ldp_hash_end; /* hash of the first entry on the next page */
+ __u16 ldp_flags; /* enum lu_dirpage_flags */
+ __u16 ldp_pad0;
+ __u32 ldp_pad1;
+ struct lu_dirent ldp_entries[0];
+};
+
+enum lu_dirpage_flags {
+ LDF_EMPTY = 1 << 0 /* page contains no live entries */
+};
+
+/*
+ * Return the first entry of a readdir page, or NULL if the page is
+ * flagged LDF_EMPTY (no live entries).
+ */
+static inline struct lu_dirent *lu_dirent_start(struct lu_dirpage *dp)
+{
+ if (le16_to_cpu(dp->ldp_flags) & LDF_EMPTY)
+ return NULL;
+ else
+ return dp->ldp_entries;
+}
+
+/*
+ * Advance to the next entry by its on-wire record length.
+ * A zero lde_reclen marks the last entry on the page; return NULL then.
+ */
+static inline struct lu_dirent *lu_dirent_next(struct lu_dirent *ent)
+{
+ struct lu_dirent *next;
+
+ if (le16_to_cpu(ent->lde_reclen) != 0)
+ next = ((void *)ent) + le16_to_cpu(ent->lde_reclen);
+ else
+ next = NULL;
+
+ return next;
+}
+
+/*
+ * Size in bytes of one directory entry record. For the last entry on a
+ * page (lde_reclen == 0) the size is derived from the header plus the
+ * name length, rounded up to a 4-byte multiple; otherwise the explicit
+ * record length is used.
+ */
+static inline int lu_dirent_size(struct lu_dirent *ent)
+{
+ if (le16_to_cpu(ent->lde_reclen) == 0) {
+ return (sizeof(*ent) +
+ le16_to_cpu(ent->lde_namelen) + 3) & ~3;
+ }
+ return le16_to_cpu(ent->lde_reclen);
+}
+
+/* Sentinel hash meaning "end of directory" (see ll_readdir_20()). */
+#define DIR_END_OFF 0xfffffffffffffffeULL
+
+/*
+ * Kernel-version compatibility: mapping->tree_lock is an rwlock on older
+ * kernels (HAVE_RW_TREE_LOCK) and a spinlock on newer ones; hide the
+ * difference behind one pair of macros.
+ */
+#ifdef HAVE_RW_TREE_LOCK
+#define TREE_READ_LOCK_IRQ(mapping) read_lock_irq(&(mapping)->tree_lock)
+#define TREE_READ_UNLOCK_IRQ(mapping) read_unlock_irq(&(mapping)->tree_lock)
+#else
+#define TREE_READ_LOCK_IRQ(mapping) spin_lock_irq(&(mapping)->tree_lock)
+#define TREE_READ_UNLOCK_IRQ(mapping) spin_unlock_irq(&(mapping)->tree_lock)
+#endif
+
+/*
+ * ->readpage()-style filler for directory pages on 2.0 (FID-capable)
+ * servers, installed via read_cache_page() in ll_get_dir_page_20().
+ * Recovers the starting hash from the page index (hash_x_index() is its
+ * own inverse) and fetches the page synchronously from the MDS.
+ *
+ * returns the page unlocked, but with a reference */
+static int ll_dir_readpage_20(struct file *file, struct page *page)
+{
+ struct inode *inode = page->mapping->host;
+ struct ptlrpc_request *request;
+ struct mdt_body *body;
+ struct ll_fid fid;
+ __u64 hash;
+ int rc;
+ ENTRY;
+
+ hash = hash_x_index(page->index);
+ CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) off %lu\n",
+ inode->i_ino, inode->i_generation, inode, (unsigned long)hash);
+
+ ll_inode2fid(&fid, inode);
+ rc = mdc_readpage(ll_i2sbi(inode)->ll_mdc_exp, &fid,
+ hash, page, &request);
+ if (!rc) {
+ body = lustre_msg_buf(request->rq_repmsg, REPLY_REC_OFF,
+ sizeof(*body));
+ /* Checked by mdc_readpage() */
+ LASSERT(body != NULL);
+
+ /* Refresh cached i_size if the server piggy-backed it. */
+ if (body->valid & OBD_MD_FLSIZE) {
+ ll_inode_size_lock(inode, 0);
+ i_size_write(inode, body->size);
+ ll_inode_size_unlock(inode, 0);
+ }
+ SetPageUptodate(page);
+ }
+ /*
+ * NOTE(review): 'request' is not initialized here; this relies on
+ * mdc_readpage() always setting it (to NULL on failure) — confirm,
+ * since ptlrpc_req_finished() runs on the error path too.
+ */
+ ptlrpc_req_finished(request);
+
+ unlock_page(page);
+ EXIT;
+ return rc;
+}
+
+
+/*
+ * Validate the on-wire format of a freshly read directory page.
+ * Currently a stub: it only marks the page as checked.
+ */
+static void ll_check_page(struct inode *dir, struct page *page)
+{
+ /* XXX: check page format later */
+ SetPageChecked(page);
+}
+
+
+/*
+ * Find, kmap and return page that contains given hash.
+ *
+ * On success *start / *end are set to the page's hash range and the page
+ * is returned kmapped with a reference held. Returns NULL when no cached
+ * page covers the hash (or a stale page was dropped), and ERR_PTR(-EIO)
+ * when a found page is not uptodate.
+ *
+ * NOTE(review): the 'hash' parameter is 'unsigned long' while the caller
+ * passes a __u64 — on 32-bit kernels this truncates; confirm intended.
+ */
+static struct page *ll_dir_page_locate(struct inode *dir, unsigned long hash,
+ __u64 *start, __u64 *end)
+{
+ struct address_space *mapping = dir->i_mapping;
+ /*
+ * Complement of hash is used as an index so that
+ * radix_tree_gang_lookup() can be used to find a page with starting
+ * hash _smaller_ than one we are looking for.
+ */
+ unsigned long offset = hash_x_index(hash);
+ struct page *page;
+ int found;
+ ENTRY;
+
+ TREE_READ_LOCK_IRQ(mapping);
+ found = radix_tree_gang_lookup(&mapping->page_tree,
+ (void **)&page, offset, 1);
+ if (found > 0) {
+ struct lu_dirpage *dp;
+
+ /* Pin the page, then drop the tree lock early. */
+ page_cache_get(page);
+ TREE_READ_UNLOCK_IRQ(mapping);
+ /*
+ * In contrast to find_lock_page() we are sure that directory
+ * page cannot be truncated (while DLM lock is held) and,
+ * hence, can avoid restart.
+ *
+ * In fact, page cannot be locked here at all, because
+ * ll_dir_readpage() does synchronous io.
+ */
+ wait_on_page(page);
+ if (PageUptodate(page)) {
+ dp = kmap(page);
+ *start = le64_to_cpu(dp->ldp_hash_start);
+ *end = le64_to_cpu(dp->ldp_hash_end);
+ LASSERT(*start <= hash);
+ /*
+ * Hash falls past this page's range: the cached page
+ * is stale — truncate it and report "not found" so
+ * the caller fetches a fresh one.
+ */
+ if (hash > *end || (*end != *start && hash == *end)) {
+ kunmap(page);
+ lock_page(page);
+ ll_truncate_complete_page(page);
+ unlock_page(page);
+ page_cache_release(page);
+ page = NULL;
+ }
+ } else {
+ page_cache_release(page);
+ page = ERR_PTR(-EIO);
+ }
+
+ } else {
+ TREE_READ_UNLOCK_IRQ(mapping);
+ page = NULL;
+ }
+ RETURN(page);
+}
+
+/*
+ * Return the directory page of 'dir' containing 'hash', protected by a
+ * PR DLM lock on the directory's UPDATE inodebit (matched if cached,
+ * enqueued otherwise). The page is looked up in the page cache first
+ * (ll_dir_page_locate()) and read from the MDS via read_cache_page() +
+ * ll_dir_readpage_20() on a miss. 'exact' requests a page starting
+ * exactly at 'hash'; a cached page starting earlier is then invalidated
+ * and re-fetched. On success the page is returned kmapped with a
+ * reference; on failure ERR_PTR() is returned. The DLM lock reference is
+ * dropped before returning in all cases (out_unlock).
+ */
+static struct page *ll_get_dir_page_20(struct inode *dir, __u64 hash, int exact,
+ struct ll_dir_chain *chain)
+{
+ struct ldlm_res_id res_id;
+ struct lustre_handle lockh;
+ struct obd_device *obddev = class_exp2obd(ll_i2sbi(dir)->ll_mdc_exp);
+ struct address_space *mapping = dir->i_mapping;
+ struct lu_dirpage *dp;
+ struct page *page;
+ ldlm_policy_data_t policy = {.l_inodebits = {MDS_INODELOCK_UPDATE} };
+ ldlm_mode_t mode;
+ int rc;
+ __u64 start = 0;
+ __u64 end = 0;
+ ENTRY;
+
+ fid_build_reg_res_name(ll_inode_lu_fid(dir), &res_id);
+ mode = LCK_PR;
+ rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED,
+ &res_id, LDLM_IBITS, &policy, mode, &lockh);
+ if (!rc) {
+ /* No cached lock: enqueue a fresh IT_READDIR lock. */
+ struct lookup_intent it = { .it_op = IT_READDIR };
+ struct ldlm_enqueue_info einfo = { LDLM_IBITS, mode,
+ ll_mdc_blocking_ast, ldlm_completion_ast, NULL, dir };
+ struct ptlrpc_request *request;
+ struct mdc_op_data op_data = { { 0 } };
+
+ ll_prepare_mdc_op_data(&op_data, dir, NULL, NULL, 0, 0, NULL);
+
+ rc = mdc_enqueue(ll_i2sbi(dir)->ll_mdc_exp, &einfo, &it,
+ &op_data, &lockh, NULL, 0, 0);
+
+ request = (struct ptlrpc_request *)it.d.lustre.it_data;
+ if (request)
+ ptlrpc_req_finished(request);
+ if (rc < 0) {
+ CERROR("lock enqueue: rc: %d\n", rc);
+ RETURN(ERR_PTR(rc));
+ }
+ }
+ ldlm_lock_dump_handle(D_OTHER, &lockh);
+
+ page = ll_dir_page_locate(dir, hash, &start, &end);
+ if (IS_ERR(page))
+ GOTO(out_unlock, page);
+
+ if (page != NULL) {
+ /*
+ * XXX nikita: not entirely correct handling of a corner case:
+ * suppose hash chain of entries with hash value HASH crosses
+ * border between pages P0 and P1. First both P0 and P1 are
+ * cached, seekdir() is called for some entry from the P0 part
+ * of the chain. Later P0 goes out of cache. telldir(HASH)
+ * happens and finds P1, as it starts with matching hash
+ * value. Remaining entries from P0 part of the chain are
+ * skipped. (Is that really a bug?)
+ *
+ * Possible solutions: 0. don't cache P1 in such case, handle
+ * it as an "overflow" page. 1. invalidate all pages at
+ * once. 2. use HASH|1 as an index for P1.
+ */
+ if (exact && hash != start) {
+ /*
+ * readdir asked for a page starting _exactly_ from
+ * given hash, but cache contains stale page, with
+ * entries with smaller hash values. Stale page should
+ * be invalidated, and new one fetched.
+ */
+ CDEBUG(D_INFO, "Stale readpage page %p: %#lx != %#lx\n",
+ page, (unsigned long)hash, (unsigned long)start);
+ lock_page(page);
+ ll_truncate_complete_page(page);
+ unlock_page(page);
+ page_cache_release(page);
+ } else {
+ /* Cached page is usable; it is already kmapped. */
+ GOTO(hash_collision, page);
+ }
+ }
+
+ page = read_cache_page(mapping, hash_x_index(hash),
+ (filler_t*)ll_dir_readpage_20, NULL);
+ if (IS_ERR(page))
+ GOTO(out_unlock, page);
+
+ wait_on_page(page);
+ (void)kmap(page);
+ if (!PageUptodate(page))
+ goto fail;
+ if (!PageChecked(page))
+ ll_check_page(dir, page);
+ if (PageError(page))
+ goto fail;
+hash_collision:
+ dp = page_address(page);
+
+ start = le64_to_cpu(dp->ldp_hash_start);
+ end = le64_to_cpu(dp->ldp_hash_end);
+ if (end == start) {
+ /* All entries on this page share one hash value. */
+ LASSERT(start == hash);
+ CWARN("Page-wide hash collision: %#lx\n", (unsigned long)end);
+ /*
+ * Fetch whole overflow chain...
+ *
+ * XXX not yet.
+ */
+ goto fail;
+ }
+out_unlock:
+ ldlm_lock_decref(&lockh, mode);
+ RETURN(page);
+
+fail:
+ ll_put_page(page);
+ page = ERR_PTR(-EIO);
+ goto out_unlock;
+}
+
+/*
+ * readdir implementation for 2.0 (FID-capable) servers: walk directory
+ * pages by name hash starting at filp->f_pos, feeding each live entry to
+ * 'filldir'. Entries with hash < pos and dummy records (namelen == 0)
+ * are skipped; inode numbers are derived from the entries' FIDs.
+ */
+static int ll_readdir_20(struct file *filp, void *cookie, filldir_t filldir)
+{
+ struct inode *inode = filp->f_dentry->d_inode;
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ __u64 pos = filp->f_pos;
+ struct page *page;
+ struct ll_dir_chain chain;
+ int rc;
+ int done;
+ /* NOTE(review): 'shift' is set but never used in this function. */
+ int shift;
+ ENTRY;
+
+ CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) pos %lu/%llu\n",
+ inode->i_ino, inode->i_generation, inode,
+ (unsigned long)pos, i_size_read(inode));
+
+ if (pos == DIR_END_OFF)
+ /*
+ * end-of-file.
+ */
+ RETURN(0);
+
+ rc = 0;
+ done = 0;
+ shift = 0;
+ ll_dir_chain_init(&chain);
+
+ page = ll_get_dir_page_20(inode, pos, 0, &chain);
+
+ while (rc == 0 && !done) {
+ struct lu_dirpage *dp;
+ struct lu_dirent *ent;
+
+ if (!IS_ERR(page)) {
+ /*
+ * If page is empty (end of directory is reached),
+ * use this value.
+ */
+ __u64 hash = DIR_END_OFF;
+ __u64 next;
+
+ dp = page_address(page);
+ for (ent = lu_dirent_start(dp); ent != NULL && !done;
+ ent = lu_dirent_next(ent)) {
+ char *name;
+ int namelen;
+ struct lu_fid fid;
+ ino_t ino;
+
+ hash = le64_to_cpu(ent->lde_hash);
+ namelen = le16_to_cpu(ent->lde_namelen);
+
+ if (hash < pos)
+ /*
+ * Skip until we find target hash
+ * value.
+ */
+ continue;
+
+ if (namelen == 0)
+ /*
+ * Skip dummy record.
+ */
+ continue;
+
+ /* Convert the on-wire FID to an inode number. */
+ fid = ent->lde_fid;
+ name = ent->lde_name;
+ fid_le_to_cpu(&fid, &fid);
+ ino = ll_fid_build_ino(sbi, (struct ll_fid*)&fid);
+
+ done = filldir(cookie, name, namelen,
+ (loff_t)hash, ino, DT_UNKNOWN);
+ }
+ next = le64_to_cpu(dp->ldp_hash_end);
+ ll_put_page(page);
+ if (!done) {
+ pos = next;
+ if (pos == DIR_END_OFF)
+ /*
+ * End of directory reached.
+ */
+ done = 1;
+ else if (1 /* chain is exhausted*/)
+ /*
+ * Normal case: continue to the next
+ * page.
+ */
+ page = ll_get_dir_page_20(inode, pos, 1,
+ &chain);
+ else {
+ /*
+ * go into overflow page.
+ */
+ }
+ } else {
+ /* filldir's buffer is full; resume here later. */
+ pos = hash;
+ }
+ } else {
+ rc = PTR_ERR(page);
+ CERROR("error reading dir "DFID" at %lu: rc %d\n",
+ PFID(ll_inode_lu_fid(inode)),
+ (unsigned long)pos, rc);
+ }
+ }
+
+ /*
+ * NOTE(review): the (__s32) cast truncates the 64-bit hash stored in
+ * f_pos — presumably deliberate for 32-bit hash compatibility with
+ * userspace telldir/seekdir; confirm.
+ */
+ filp->f_pos = (loff_t)(__s32)pos;
+ filp->f_version = inode->i_version;
+ touch_atime(filp->f_vfsmnt, filp->f_dentry);
+
+ ll_dir_chain_fini(&chain);
+
+ RETURN(rc);
+}
+
+/*
+ * Top-level readdir entry point: dispatch on the server's connect flags.
+ * FID-capable (OBD_CONNECT_FID, i.e. 2.0) servers use the hash-indexed
+ * path; older servers fall back to the 1.8 implementation.
+ */
+static int ll_readdir(struct file *filp, void *cookie, filldir_t filldir)
+{
+ struct inode *inode = filp->f_dentry->d_inode;
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+
+ if (sbi->ll_mdc_exp->exp_connect_flags & OBD_CONNECT_FID) {
+ return ll_readdir_20(filp, cookie, filldir);
+ } else {
+ return ll_readdir_18(filp, cookie, filldir);
+ }
+}
+
#define QCTL_COPY(out, in) \
do { \
Q_COPY(out, in, qc_cmd); \
Q_COPY(out, in, qc_dqblk); \
} while (0)
-int ll_send_mgc_param(struct obd_export *mgc, char *string)
+static int ll_send_mgc_param(struct obd_export *mgc, char *string)
{
struct mgs_send_param *msp;
int rc = 0;
return rc;
}
-char *ll_get_fsname(struct inode *inode)
+static char *ll_get_fsname(struct inode *inode)
{
struct lustre_sb_info *lsi = s2lsi(inode->i_sb);
char *ptr, *fsname;
int set_default)
{
struct ll_sb_info *sbi = ll_i2sbi(inode);
- struct mdc_op_data data;
+ struct mdc_op_data data = { { 0 } };
struct ptlrpc_request *req = NULL;
struct lustre_sb_info *lsi = s2lsi(inode->i_sb);
struct obd_device *mgc = lsi->lsi_mgc;