From e2c5fb170c69c180ff54c41f516aa4a9ab5a53d0 Mon Sep 17 00:00:00 2001 From: nikita Date: Tue, 24 Oct 2006 05:21:32 +0000 Subject: [PATCH] osd,mdd: move osd_readpage() and friends into mdd layer. Remove ->do_readpage() method from dt_object_operations. --- lustre/include/dt_object.h | 6 +- lustre/mdd/mdd_dir.c | 2 +- lustre/mdd/mdd_object.c | 165 +++++++++++++++++++++++++++++++++++++++- lustre/mdd/mdd_orphans.c | 2 +- lustre/osd/osd_handler.c | 183 ++------------------------------------------- 5 files changed, 173 insertions(+), 185 deletions(-) diff --git a/lustre/include/dt_object.h b/lustre/include/dt_object.h index 6b029ee..fc62538 100644 --- a/lustre/include/dt_object.h +++ b/lustre/include/dt_object.h @@ -265,9 +265,6 @@ struct dt_object_operations { void (*do_ref_del)(const struct lu_env *env, struct dt_object *dt, struct thandle *th); - int (*do_readpage)(const struct lu_env *env, - struct dt_object *dt, const struct lu_rdpg *rdpg, - struct lustre_capa *capa); struct obd_capa *(*do_capa_get)(const struct lu_env *env, struct dt_object *dt, struct lustre_capa *old, __u64 opc); @@ -338,7 +335,8 @@ struct dt_index_operations { * precondition: dt_object_exists(dt); */ struct dt_it *(*init)(const struct lu_env *env, - struct dt_object *dt, int writable); + struct dt_object *dt, int writable, + struct lustre_capa *capa); void (*fini)(const struct lu_env *env, struct dt_it *di); int (*get)(const struct lu_env *env, diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c index 97a46f3..26af156 100644 --- a/lustre/mdd/mdd_dir.c +++ b/lustre/mdd/mdd_dir.c @@ -484,7 +484,7 @@ static int mdd_dir_is_empty(const struct lu_env *env, obj = mdd_object_child(dir); iops = &obj->do_index_ops->dio_it; - it = iops->init(env, obj, 0); + it = iops->init(env, obj, 0, BYPASS_CAPA); if (it != NULL) { result = iops->get(env, it, (const void *)""); if (result > 0) { diff --git a/lustre/mdd/mdd_object.c b/lustre/mdd/mdd_object.c index 029f9a6..50d6071 100644 --- a/lustre/mdd/mdd_object.c +++ b/lustre/mdd/mdd_object.c @@ -37,6 +37,8 @@ #include #include #include +/* fid_be_cpu(), fid_cpu_to_be(). */ +#include #include #include @@ -256,7 +258,7 @@ void mdd_ref_del_internal(const struct lu_env *env, struct mdd_object *obj, } /* get only inode attributes */ -int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj, +int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj, struct md_attr *ma) { int rc = 0; @@ -994,7 +996,7 @@ static int mdd_object_create(const struct lu_env *env, buf->lb_buf = (void *)spec->u.sp_ea.eadata; buf->lb_len = spec->u.sp_ea.eadatalen; if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) { - rc = __mdd_acl_init(env, mdd_obj, buf, + rc = __mdd_acl_init(env, mdd_obj, buf, &ma->ma_attr.la_mode, handle); if (rc) @@ -1208,6 +1210,162 @@ static int mdd_readpage_sanity_check(const struct lu_env *env, RETURN(rc); } +static int mdd_dir_page_build(const struct lu_env *env, int first, + void *area, int nob, + struct dt_it_ops *iops, struct dt_it *it, + __u32 *start, __u32 *end, struct lu_dirent **last) +{ + int result; + struct mdd_thread_info *info = mdd_env_info(env); + struct lu_fid *fid = &info->mti_fid; + struct lu_dirent *ent; + + if (first) { + memset(area, 0, sizeof (struct lu_dirpage)); + area += sizeof (struct lu_dirpage); + nob -= sizeof (struct lu_dirpage); + } + + LASSERT(nob > sizeof *ent); + + ent = area; + result = 0; + do { + char *name; + int len; + int recsize; + __u32 hash; + + name = (char *)iops->key(env, it); + len = iops->key_size(env, it); + + fid = (struct lu_fid *)iops->rec(env, it); + + recsize = (sizeof *ent + len + 3) & ~3; + hash = iops->store(env, it); + *end = hash; + CDEBUG(D_INFO, "%p %p %d "DFID": %#8.8x (%d) \"%*.*s\"\n", + name, ent, nob, PFID(fid), hash, len, len, len, name); + + if (nob >= recsize) { + fid_be_to_cpu(&ent->lde_fid, fid); + fid_cpu_to_le(&ent->lde_fid, &ent->lde_fid); + ent->lde_hash = hash; + ent->lde_namelen = cpu_to_le16(len); + ent->lde_reclen = cpu_to_le16(recsize); + memcpy(ent->lde_name, name, len); + if (first && ent == area) + *start = hash; + *last = ent; + ent = (void *)ent + recsize; + nob -= recsize; + result = iops->next(env, it); + } else { + /* + * record doesn't fit into page, enlarge previous one. + */ + LASSERT(*last != NULL); + (*last)->lde_reclen = + cpu_to_le16(le16_to_cpu((*last)->lde_reclen) + + nob); + break; + } + } while (result == 0); + + return result; +} + +static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj, + const struct lu_rdpg *rdpg) +{ + struct dt_it *it; + struct dt_object *next = mdd_object_child(obj); + struct dt_it_ops *iops; + struct page *pg; + struct lu_dirent *last; + int i; + int rc; + int nob; + __u32 hash_start; + __u32 hash_end; + + LASSERT(rdpg->rp_pages != NULL); + LASSERT(next->do_index_ops != NULL); + + if (rdpg->rp_count <= 0) + return -EFAULT; + + /* + * iterate through directory and fill pages from @rdpg + */ + iops = &next->do_index_ops->dio_it; + it = iops->init(env, next, 0, mdd_object_capa(env, obj)); + if (it == NULL) + return -ENOMEM; + + rc = iops->load(env, it, rdpg->rp_hash); + + if (rc == 0) + /* + * Iterator didn't find record with exactly the key requested. + * + * It is currently either + * + * - positioned above record with key less than + * requested---skip it. + * + * - or not positioned at all (is in IAM_IT_SKEWED + * state)---position it on the next item. + */ + rc = iops->next(env, it); + else if (rc > 0) + rc = 0; + + /* + * At this point and across for-loop: + * + * rc == 0 -> ok, proceed. + * rc > 0 -> end of directory. + * rc < 0 -> error. + */ + for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0; + i++, nob -= CFS_PAGE_SIZE) { + LASSERT(i < rdpg->rp_npages); + pg = rdpg->rp_pages[i]; + rc = mdd_dir_page_build(env, !i, kmap(pg), + min_t(int, nob, CFS_PAGE_SIZE), iops, + it, &hash_start, &hash_end, &last); + if (rc != 0 || i == rdpg->rp_npages - 1) + last->lde_reclen = 0; + kunmap(pg); + } + if (rc > 0) { + /* + * end of directory. + */ + hash_end = ~0ul; + rc = 0; + } + if (rc == 0) { + struct lu_dirpage *dp; + + dp = kmap(rdpg->rp_pages[0]); + dp->ldp_hash_start = rdpg->rp_hash; + dp->ldp_hash_end = hash_end; + if (i == 0) + /* + * No pages were processed, mark this. + */ + dp->ldp_flags |= LDF_EMPTY; + dp->ldp_flags = cpu_to_le16(dp->ldp_flags); + kunmap(rdpg->rp_pages[0]); + } + iops->put(env, it); + iops->fini(env, it); + + return rc; +} + static int mdd_readpage(const struct lu_env *env, struct md_object *obj, const struct lu_rdpg *rdpg) { @@ -1224,8 +1382,7 @@ static int mdd_readpage(const struct lu_env *env, struct md_object *obj, if (rc) GOTO(out_unlock, rc); - rc = next->do_ops->do_readpage(env, next, rdpg, - mdd_object_capa(env, mdd_obj)); + rc = __mdd_readpage(env, mdd_obj, rdpg); out_unlock: mdd_read_unlock(env, mdd_obj); diff --git a/lustre/mdd/mdd_orphans.c b/lustre/mdd/mdd_orphans.c index 5cb4999..fe9209a 100644 --- a/lustre/mdd/mdd_orphans.c +++ b/lustre/mdd/mdd_orphans.c @@ -142,7 +142,7 @@ static int orph_index_iterate(const struct lu_env *env, ENTRY; iops = &dt_obj->do_index_ops->dio_it; - it = iops->init(env, dt_obj, 1); + it = iops->init(env, dt_obj, 1, BYPASS_CAPA); if (it != NULL) { result = iops->get(env, it, (const void *)key); if (result > 0) { diff --git a/lustre/osd/osd_handler.c b/lustre/osd/osd_handler.c index 174f207..731e932 100644 --- a/lustre/osd/osd_handler.c +++ b/lustre/osd/osd_handler.c @@ -217,7 +217,8 @@ static struct inode *osd_iget (struct osd_thread_info *info, const struct osd_inode_id *id); static struct super_block *osd_sb (const struct osd_device *dev); static struct dt_it *osd_it_init (const struct lu_env *env, - struct dt_object *dt, int wable); + struct dt_object *dt, int wable, + struct lustre_capa *capa); static struct dt_key *osd_it_key (const struct lu_env *env, const struct dt_it *di); static struct dt_rec *osd_it_rec (const struct lu_env *env, @@ -1260,178 +1261,6 @@ static int osd_xattr_del(const struct lu_env *env, return inode->i_op->removexattr(dentry, name); } -static int osd_dir_page_build(const struct lu_env *env, int first, - void *area, int nob, - struct dt_it_ops *iops, struct dt_it *it, - __u32 *start, __u32 *end, struct lu_dirent **last) -{ - int result; - struct osd_thread_info *info = lu_context_key_get(&env->le_ctx, - &osd_key); - struct lu_fid *fid = &info->oti_fid; - struct lu_dirent *ent; - - if (first) { - memset(area, 0, sizeof (struct lu_dirpage)); - area += sizeof (struct lu_dirpage); - nob -= sizeof (struct lu_dirpage); - } - - LASSERT(nob > sizeof *ent); - - ent = area; - result = 0; - do { - char *name; - int len; - int recsize; - __u32 hash; - - name = (char *)iops->key(env, it); - len = iops->key_size(env, it); - - *fid = *(struct lu_fid *)iops->rec(env, it); - - recsize = (sizeof *ent + len + 3) & ~3; - hash = iops->store(env, it); - *end = hash; - CDEBUG(D_INFO, "%p %p %d "DFID": %#8.8x (%d) \"%*.*s\"\n", - name, ent, nob, PFID(fid), hash, len, len, len, name); - - if (nob >= recsize) { - fid_be_to_cpu(&ent->lde_fid, fid); - fid_cpu_to_le(&ent->lde_fid, &ent->lde_fid); - ent->lde_hash = hash; - ent->lde_namelen = cpu_to_le16(len); - ent->lde_reclen = cpu_to_le16(recsize); - memcpy(ent->lde_name, name, len); - if (first && ent == area) - *start = hash; - *last = ent; - ent = (void *)ent + recsize; - nob -= recsize; - result = iops->next(env, it); - } else { - /* - * record doesn't fit into page, enlarge previous one. - */ - LASSERT(*last != NULL); - (*last)->lde_reclen = - cpu_to_le16(le16_to_cpu((*last)->lde_reclen) + - nob); - break; - } - } while (result == 0); - - return result; -} - -static int osd_readpage(const struct lu_env *env, - struct dt_object *dt, - const struct lu_rdpg *rdpg, - struct lustre_capa *capa) -{ - struct dt_it *it; - struct osd_object *obj = osd_dt_obj(dt); - struct dt_it_ops *iops; - struct page *pg; - struct lu_dirent *last; - int i; - int rc; - int nob; - __u32 hash_start; - __u32 hash_end; - - LASSERT(dt_object_exists(dt)); - LASSERT(osd_invariant(obj)); - LASSERT(osd_has_index(obj)); - LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj)); - - LASSERT(rdpg->rp_pages != NULL); - - if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ)) - return -EACCES; - - if (rdpg->rp_count <= 0) - return -EFAULT; - - if (rdpg->rp_count & (obj->oo_inode->i_blksize - 1)) { - CERROR("size %u is not multiple of blocksize %lu\n", - rdpg->rp_count, obj->oo_inode->i_blksize); - return -EFAULT; - } - - /* - * iterate through directory and fill pages from @rdpg - */ - iops = &dt->do_index_ops->dio_it; - it = iops->init(env, dt, 0); - if (it == NULL) - return -ENOMEM; - - rc = iops->load(env, it, rdpg->rp_hash); - - if (rc == 0) { - /* - * Iterator didn't find record with exactly the key requested. - * - * It is currently either - * - * - positioned above record with key less than - * requested---skip it. - * - * - or not positioned at all (is in IAM_IT_SKEWED - * state)---position it on the next item. - */ - rc = iops->next(env, it); - } else if (rc > 0) - rc = 0; - - /* - * At this point and across for-loop: - * - * rc == 0 -> ok, proceed. - * rc > 0 -> end of directory. - * rc < 0 -> error. - */ - for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0; - i++, nob -= CFS_PAGE_SIZE) { - LASSERT(i < rdpg->rp_npages); - pg = rdpg->rp_pages[i]; - rc = osd_dir_page_build(env, !i, kmap(pg), - min_t(int, nob, CFS_PAGE_SIZE), iops, - it, &hash_start, &hash_end, &last); - if (rc != 0 || i == rdpg->rp_npages - 1) - last->lde_reclen = 0; - kunmap(pg); - } - if (rc > 0) { - /* - * end of directory. - */ - hash_end = ~0ul; - rc = 0; - } - if (rc == 0) { - struct lu_dirpage *dp; - - dp = kmap(rdpg->rp_pages[0]); - dp->ldp_hash_start = rdpg->rp_hash; - dp->ldp_hash_end = hash_end; - if (i == 0) - /* - * No pages were processed, mark this. - */ - dp->ldp_flags |= LDF_EMPTY; - dp->ldp_flags = cpu_to_le16(dp->ldp_flags); - kunmap(rdpg->rp_pages[0]); - } - iops->put(env, it); - iops->fini(env, it); - - return rc; -} - static struct obd_capa *osd_capa_get(const struct lu_env *env, struct dt_object *dt, struct lustre_capa *old, @@ -1503,7 +1332,6 @@ static struct dt_object_operations osd_obj_ops = { .do_xattr_set = osd_xattr_set, .do_xattr_del = osd_xattr_del, .do_xattr_list = osd_xattr_list, - .do_readpage = osd_readpage, .do_capa_get = osd_capa_get, }; @@ -1734,7 +1562,8 @@ struct osd_it { }; static struct dt_it *osd_it_init(const struct lu_env *env, - struct dt_object *dt, int writable) + struct dt_object *dt, int writable, + struct lustre_capa *capa) { struct osd_it *it; struct osd_object *obj = osd_dt_obj(dt); @@ -1744,6 +1573,10 @@ static struct dt_it *osd_it_init(const struct lu_env *env, LASSERT(lu_object_exists(lo)); LASSERT(obj->oo_ipd != NULL); + if (osd_object_auth(env, dt, capa, writable ? CAPA_OPC_BODY_WRITE : + CAPA_OPC_BODY_READ)) + return ERR_PTR(-EACCES); + flags = writable ? IAM_IT_MOVE|IAM_IT_WRITE : IAM_IT_MOVE; OBD_ALLOC_PTR(it); if (it != NULL) { -- 1.8.3.1