Whamcloud - gitweb
osd,mdd: move osd_readpage() and friends into mdd layer. Remove ->do_readpage() metho...
authornikita <nikita>
Tue, 24 Oct 2006 05:21:32 +0000 (05:21 +0000)
committernikita <nikita>
Tue, 24 Oct 2006 05:21:32 +0000 (05:21 +0000)
lustre/include/dt_object.h
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_object.c
lustre/mdd/mdd_orphans.c
lustre/osd/osd_handler.c

index 6b029ee..fc62538 100644 (file)
@@ -265,9 +265,6 @@ struct dt_object_operations {
         void  (*do_ref_del)(const struct lu_env *env,
                             struct dt_object *dt, struct thandle *th);
 
-        int   (*do_readpage)(const struct lu_env *env,
-                             struct dt_object *dt, const struct lu_rdpg *rdpg,
-                             struct lustre_capa *capa);
         struct obd_capa *(*do_capa_get)(const struct lu_env *env,
                                         struct dt_object *dt,
                                         struct lustre_capa *old, __u64 opc);
@@ -338,7 +335,8 @@ struct dt_index_operations {
                  * precondition: dt_object_exists(dt);
                  */
                 struct dt_it *(*init)(const struct lu_env *env,
-                                      struct dt_object *dt, int writable);
+                                      struct dt_object *dt, int writable,
+                                      struct lustre_capa *capa);
                 void          (*fini)(const struct lu_env *env,
                                       struct dt_it *di);
                 int            (*get)(const struct lu_env *env,
index 97a46f3..26af156 100644 (file)
@@ -484,7 +484,7 @@ static int mdd_dir_is_empty(const struct lu_env *env,
 
         obj = mdd_object_child(dir);
         iops = &obj->do_index_ops->dio_it;
-        it = iops->init(env, obj, 0);
+        it = iops->init(env, obj, 0, BYPASS_CAPA);
         if (it != NULL) {
                 result = iops->get(env, it, (const void *)"");
                 if (result > 0) {
index 029f9a6..50d6071 100644 (file)
@@ -37,6 +37,8 @@
 #include <lustre_ver.h>
 #include <obd_support.h>
 #include <lprocfs_status.h>
+/* fid_be_cpu(), fid_cpu_to_be(). */
+#include <lustre_fid.h>
 
 #include <linux/ldiskfs_fs.h>
 #include <lustre_mds.h>
@@ -256,7 +258,7 @@ void mdd_ref_del_internal(const struct lu_env *env, struct mdd_object *obj,
 }
 
 /* get only inode attributes */
-int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj, 
+int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
                   struct md_attr *ma)
 {
         int rc = 0;
@@ -994,7 +996,7 @@ static int mdd_object_create(const struct lu_env *env,
                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
                         buf->lb_len = spec->u.sp_ea.eadatalen;
                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
-                                rc = __mdd_acl_init(env, mdd_obj, buf, 
+                                rc = __mdd_acl_init(env, mdd_obj, buf,
                                                     &ma->ma_attr.la_mode,
                                                     handle);
                                 if (rc)
@@ -1208,6 +1210,162 @@ static int mdd_readpage_sanity_check(const struct lu_env *env,
         RETURN(rc);
 }
 
+static int mdd_dir_page_build(const struct lu_env *env, int first,
+                              void *area, int nob,
+                              struct dt_it_ops  *iops, struct dt_it *it,
+                              __u32 *start, __u32 *end, struct lu_dirent **last)
+{
+        int result;
+        struct mdd_thread_info *info = mdd_env_info(env);
+        struct lu_fid          *fid  = &info->mti_fid;
+        struct lu_dirent       *ent;
+
+        if (first) {
+                memset(area, 0, sizeof (struct lu_dirpage));
+                area += sizeof (struct lu_dirpage);
+                nob  -= sizeof (struct lu_dirpage);
+        }
+
+        LASSERT(nob > sizeof *ent);
+
+        ent  = area;
+        result = 0;
+        do {
+                char  *name;
+                int    len;
+                int    recsize;
+                __u32  hash;
+
+                name = (char *)iops->key(env, it);
+                len  = iops->key_size(env, it);
+
+                fid  = (struct lu_fid *)iops->rec(env, it);
+
+                recsize = (sizeof *ent + len + 3) & ~3;
+                hash = iops->store(env, it);
+                *end = hash;
+                CDEBUG(D_INFO, "%p %p %d "DFID": %#8.8x (%d) \"%*.*s\"\n",
+                       name, ent, nob, PFID(fid), hash, len, len, len, name);
+
+                if (nob >= recsize) {
+                        fid_be_to_cpu(&ent->lde_fid, fid);
+                        fid_cpu_to_le(&ent->lde_fid, &ent->lde_fid);
+                        ent->lde_hash = hash;
+                        ent->lde_namelen = cpu_to_le16(len);
+                        ent->lde_reclen  = cpu_to_le16(recsize);
+                        memcpy(ent->lde_name, name, len);
+                        if (first && ent == area)
+                                *start = hash;
+                        *last = ent;
+                        ent = (void *)ent + recsize;
+                        nob -= recsize;
+                        result = iops->next(env, it);
+                } else {
+                        /*
+                         * record doesn't fit into page, enlarge previous one.
+                         */
+                        LASSERT(*last != NULL);
+                        (*last)->lde_reclen =
+                                cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
+                                            nob);
+                        break;
+                }
+        } while (result == 0);
+
+        return result;
+}
+
+static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
+                          const struct lu_rdpg *rdpg)
+{
+        struct dt_it      *it;
+        struct dt_object  *next = mdd_object_child(obj);
+        struct dt_it_ops  *iops;
+        struct page       *pg;
+        struct lu_dirent  *last;
+        int i;
+        int rc;
+        int nob;
+        __u32 hash_start;
+        __u32 hash_end;
+
+        LASSERT(rdpg->rp_pages != NULL);
+        LASSERT(next->do_index_ops != NULL);
+
+        if (rdpg->rp_count <= 0)
+                return -EFAULT;
+
+        /*
+         * iterate through directory and fill pages from @rdpg
+         */
+        iops = &next->do_index_ops->dio_it;
+        it = iops->init(env, next, 0, mdd_object_capa(env, obj));
+        if (it == NULL)
+                return -ENOMEM;
+
+        rc = iops->load(env, it, rdpg->rp_hash);
+
+        if (rc == 0)
+                /*
+                 * Iterator didn't find record with exactly the key requested.
+                 *
+                 * It is currently either
+                 *
+                 *     - positioned above record with key less than
+                 *     requested---skip it.
+                 *
+                 *     - or not positioned at all (is in IAM_IT_SKEWED
+                 *     state)---position it on the next item.
+                 */
+                rc = iops->next(env, it);
+        else if (rc > 0)
+                rc = 0;
+
+        /*
+         * At this point and across for-loop:
+         *
+         *  rc == 0 -> ok, proceed.
+         *  rc >  0 -> end of directory.
+         *  rc <  0 -> error.
+         */
+        for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
+             i++, nob -= CFS_PAGE_SIZE) {
+                LASSERT(i < rdpg->rp_npages);
+                pg = rdpg->rp_pages[i];
+                rc = mdd_dir_page_build(env, !i, kmap(pg),
+                                        min_t(int, nob, CFS_PAGE_SIZE), iops,
+                                        it, &hash_start, &hash_end, &last);
+                if (rc != 0 || i == rdpg->rp_npages - 1)
+                        last->lde_reclen = 0;
+                kunmap(pg);
+        }
+        if (rc > 0) {
+                /*
+                 * end of directory.
+                 */
+                hash_end = ~0ul;
+                rc = 0;
+        }
+        if (rc == 0) {
+                struct lu_dirpage *dp;
+
+                dp = kmap(rdpg->rp_pages[0]);
+                dp->ldp_hash_start = rdpg->rp_hash;
+                dp->ldp_hash_end   = hash_end;
+                if (i == 0)
+                        /*
+                         * No pages were processed, mark this.
+                         */
+                        dp->ldp_flags |= LDF_EMPTY;
+                dp->ldp_flags = cpu_to_le16(dp->ldp_flags);
+                kunmap(rdpg->rp_pages[0]);
+        }
+        iops->put(env, it);
+        iops->fini(env, it);
+
+        return rc;
+}
+
 static int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                         const struct lu_rdpg *rdpg)
 {
@@ -1224,8 +1382,7 @@ static int mdd_readpage(const struct lu_env *env, struct md_object *obj,
         if (rc)
                 GOTO(out_unlock, rc);
 
-        rc = next->do_ops->do_readpage(env, next, rdpg,
-                                       mdd_object_capa(env, mdd_obj));
+        rc = __mdd_readpage(env, mdd_obj, rdpg);
 
 out_unlock:
         mdd_read_unlock(env, mdd_obj);
index 5cb4999..fe9209a 100644 (file)
@@ -142,7 +142,7 @@ static int orph_index_iterate(const struct lu_env *env,
         ENTRY;
 
         iops = &dt_obj->do_index_ops->dio_it;
-        it = iops->init(env, dt_obj, 1);
+        it = iops->init(env, dt_obj, 1, BYPASS_CAPA);
         if (it != NULL) {
                 result = iops->get(env, it, (const void *)key);
                 if (result > 0) {
index 174f207..731e932 100644 (file)
@@ -217,7 +217,8 @@ static struct inode       *osd_iget         (struct osd_thread_info *info,
                                              const struct osd_inode_id *id);
 static struct super_block *osd_sb           (const struct osd_device *dev);
 static struct dt_it       *osd_it_init      (const struct lu_env *env,
-                                             struct dt_object *dt, int wable);
+                                             struct dt_object *dt, int wable,
+                                             struct lustre_capa *capa);
 static struct dt_key      *osd_it_key       (const struct lu_env *env,
                                              const struct dt_it *di);
 static struct dt_rec      *osd_it_rec       (const struct lu_env *env,
@@ -1260,178 +1261,6 @@ static int osd_xattr_del(const struct lu_env *env,
         return inode->i_op->removexattr(dentry, name);
 }
 
-static int osd_dir_page_build(const struct lu_env *env, int first,
-                              void *area, int nob,
-                              struct dt_it_ops  *iops, struct dt_it *it,
-                              __u32 *start, __u32 *end, struct lu_dirent **last)
-{
-        int result;
-        struct osd_thread_info *info = lu_context_key_get(&env->le_ctx,
-                                                          &osd_key);
-        struct lu_fid          *fid  = &info->oti_fid;
-        struct lu_dirent       *ent;
-
-        if (first) {
-                memset(area, 0, sizeof (struct lu_dirpage));
-                area += sizeof (struct lu_dirpage);
-                nob  -= sizeof (struct lu_dirpage);
-        }
-
-        LASSERT(nob > sizeof *ent);
-
-        ent  = area;
-        result = 0;
-        do {
-                char  *name;
-                int    len;
-                int    recsize;
-                __u32  hash;
-
-                name = (char *)iops->key(env, it);
-                len  = iops->key_size(env, it);
-
-                *fid  = *(struct lu_fid *)iops->rec(env, it);
-
-                recsize = (sizeof *ent + len + 3) & ~3;
-                hash = iops->store(env, it);
-                *end = hash;
-                CDEBUG(D_INFO, "%p %p %d "DFID": %#8.8x (%d) \"%*.*s\"\n",
-                       name, ent, nob, PFID(fid), hash, len, len, len, name);
-
-                if (nob >= recsize) {
-                        fid_be_to_cpu(&ent->lde_fid, fid);
-                        fid_cpu_to_le(&ent->lde_fid, &ent->lde_fid);
-                        ent->lde_hash = hash;
-                        ent->lde_namelen = cpu_to_le16(len);
-                        ent->lde_reclen  = cpu_to_le16(recsize);
-                        memcpy(ent->lde_name, name, len);
-                        if (first && ent == area)
-                                *start = hash;
-                        *last = ent;
-                        ent = (void *)ent + recsize;
-                        nob -= recsize;
-                        result = iops->next(env, it);
-                } else {
-                        /*
-                         * record doesn't fit into page, enlarge previous one.
-                         */
-                        LASSERT(*last != NULL);
-                        (*last)->lde_reclen =
-                                cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
-                                            nob);
-                        break;
-                }
-        } while (result == 0);
-
-        return result;
-}
-
-static int osd_readpage(const struct lu_env *env,
-                        struct dt_object *dt,
-                        const struct lu_rdpg *rdpg,
-                        struct lustre_capa *capa)
-{
-        struct dt_it      *it;
-        struct osd_object *obj = osd_dt_obj(dt);
-        struct dt_it_ops  *iops;
-        struct page       *pg;
-        struct lu_dirent  *last;
-        int i;
-        int rc;
-        int nob;
-        __u32 hash_start;
-        __u32 hash_end;
-
-        LASSERT(dt_object_exists(dt));
-        LASSERT(osd_invariant(obj));
-        LASSERT(osd_has_index(obj));
-        LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
-
-        LASSERT(rdpg->rp_pages != NULL);
-
-        if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
-                return -EACCES;
-
-        if (rdpg->rp_count <= 0)
-                return -EFAULT;
-
-        if (rdpg->rp_count & (obj->oo_inode->i_blksize - 1)) {
-                CERROR("size %u is not multiple of blocksize %lu\n",
-                       rdpg->rp_count, obj->oo_inode->i_blksize);
-                return -EFAULT;
-        }
-
-        /*
-         * iterate through directory and fill pages from @rdpg
-         */
-        iops = &dt->do_index_ops->dio_it;
-        it = iops->init(env, dt, 0);
-        if (it == NULL)
-                return -ENOMEM;
-
-        rc = iops->load(env, it, rdpg->rp_hash);
-
-        if (rc == 0) {
-                /*
-                 * Iterator didn't find record with exactly the key requested.
-                 *
-                 * It is currently either
-                 *
-                 *     - positioned above record with key less than
-                 *     requested---skip it.
-                 *
-                 *     - or not positioned at all (is in IAM_IT_SKEWED
-                 *     state)---position it on the next item.
-                 */
-                rc = iops->next(env, it);
-        } else if (rc > 0)
-                rc = 0;
-
-        /*
-         * At this point and across for-loop:
-         *
-         *  rc == 0 -> ok, proceed.
-         *  rc >  0 -> end of directory.
-         *  rc <  0 -> error.
-         */
-        for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
-             i++, nob -= CFS_PAGE_SIZE) {
-                LASSERT(i < rdpg->rp_npages);
-                pg = rdpg->rp_pages[i];
-                rc = osd_dir_page_build(env, !i, kmap(pg),
-                                        min_t(int, nob, CFS_PAGE_SIZE), iops,
-                                        it, &hash_start, &hash_end, &last);
-                if (rc != 0 || i == rdpg->rp_npages - 1)
-                        last->lde_reclen = 0;
-                kunmap(pg);
-        }
-        if (rc > 0) {
-                /*
-                 * end of directory.
-                 */
-                hash_end = ~0ul;
-                rc = 0;
-        }
-        if (rc == 0) {
-                struct lu_dirpage *dp;
-
-                dp = kmap(rdpg->rp_pages[0]);
-                dp->ldp_hash_start = rdpg->rp_hash;
-                dp->ldp_hash_end   = hash_end;
-                if (i == 0)
-                        /*
-                         * No pages were processed, mark this.
-                         */
-                        dp->ldp_flags |= LDF_EMPTY;
-                dp->ldp_flags = cpu_to_le16(dp->ldp_flags);
-                kunmap(rdpg->rp_pages[0]);
-        }
-        iops->put(env, it);
-        iops->fini(env, it);
-
-        return rc;
-}
-
 static struct obd_capa *osd_capa_get(const struct lu_env *env,
                                      struct dt_object *dt,
                                      struct lustre_capa *old,
@@ -1503,7 +1332,6 @@ static struct dt_object_operations osd_obj_ops = {
         .do_xattr_set    = osd_xattr_set,
         .do_xattr_del    = osd_xattr_del,
         .do_xattr_list   = osd_xattr_list,
-        .do_readpage     = osd_readpage,
         .do_capa_get     = osd_capa_get,
 };
 
@@ -1734,7 +1562,8 @@ struct osd_it {
 };
 
 static struct dt_it *osd_it_init(const struct lu_env *env,
-                                 struct dt_object *dt, int writable)
+                                 struct dt_object *dt, int writable,
+                                 struct lustre_capa *capa)
 {
         struct osd_it     *it;
         struct osd_object *obj = osd_dt_obj(dt);
@@ -1744,6 +1573,10 @@ static struct dt_it *osd_it_init(const struct lu_env *env,
         LASSERT(lu_object_exists(lo));
         LASSERT(obj->oo_ipd != NULL);
 
+        if (osd_object_auth(env, dt, capa, writable ? CAPA_OPC_BODY_WRITE :
+                            CAPA_OPC_BODY_READ))
+                return ERR_PTR(-EACCES);
+
         flags = writable ? IAM_IT_MOVE|IAM_IT_WRITE : IAM_IT_MOVE;
         OBD_ALLOC_PTR(it);
         if (it != NULL) {