Whamcloud - gitweb
- added basic readdir stuff;
authoryury <yury>
Sun, 23 Jul 2006 10:02:00 +0000 (10:02 +0000)
committeryury <yury>
Sun, 23 Jul 2006 10:02:00 +0000 (10:02 +0000)
- fixed bug in mdt_handle_common()

lustre/include/dt_object.h
lustre/include/lu_object.h
lustre/include/md_object.h
lustre/llite/dir.c
lustre/mdd/mdd_handler.c
lustre/mdt/mdt_handler.c
lustre/osd/osd_handler.c

index db80aea..98d3589 100644 (file)
@@ -212,6 +212,9 @@ struct dt_object_operations {
          */
         void  (*do_ref_del)(const struct lu_context *ctxt,
                             struct dt_object *dt, struct thandle *th);
+
+        int (*do_readpage)(const struct lu_context *ctxt,
+                           struct dt_object *dt, struct lu_rdpg *rdpg);
 };
 
 /*
index 5779c8e..f6c0e2d 100644 (file)
@@ -771,6 +771,17 @@ static inline int lu_object_assert_not_exists(const struct lu_context *ctx,
         return lu_object_exists(ctx, o) <= 0;
 }
 
+struct lu_rdpg {
+        /* input params, should be filled out by mdt */
+        loff_t                  rp_offset;
+        int                     rp_count;
+        int                     rp_npages;
+        struct page           **rp_pages;
+
+        /* output params, filled by osd */
+        __u64                   rp_size;
+};
+
 /*
  * lu_context. Execution context for lu_object methods. Currently associated
  * with thread.
index 1bbfc87..4a1eacc 100644 (file)
@@ -83,6 +83,9 @@ struct md_object_operations {
                            struct md_attr *);
         int (*moo_open)(const struct lu_context *, struct md_object *);
         int (*moo_close)(const struct lu_context *, struct md_object *);
+        
+        int (*moo_readpage)(const struct lu_context *, struct md_object *,
+                            struct lu_rdpg *);
 };
 
 /*
@@ -247,6 +250,13 @@ static inline int mo_close(const struct lu_context *cx, struct md_object *m)
         return m->mo_ops->moo_close(cx, m);
 }
 
+static inline int mo_readpage(const struct lu_context *cx, struct md_object *m,
+                              struct lu_rdpg *rdpg)
+{
+        LASSERT(m->mo_ops->moo_readpage);
+        return m->mo_ops->moo_readpage(cx, m, rdpg);
+}
+
 static inline int mo_object_create(const struct lu_context *cx,
                                    struct md_object *m, struct md_attr *at)
 {
index 1da3a5d..7e38503 100644 (file)
@@ -75,6 +75,7 @@ static int ll_dir_readpage(struct file *file, struct page *page)
                 LASSERT (body != NULL);         /* checked by md_readpage() */
                 LASSERT_REPSWABBED (request, 0); /* swabbed by md_readpage() */
 
+                LASSERT((body->valid & OBD_MD_FLSIZE) != 0);
                 inode->i_size = body->size;
                 SetPageUptodate(page);
         }
index d588915..8c807f5 100644 (file)
@@ -1159,6 +1159,18 @@ static int mdd_close(const struct lu_context *ctxt, struct md_object *obj)
         return 0;
 }
 
+static int mdd_readpage(const struct lu_context *ctxt, struct md_object *obj,
+                        struct md_rdpg *rdpg)
+{
+        struct dt_object *next;
+        int rc;
+
+        LASSERT(lu_object_exists(ctxt, mdd2lu_obj(obj)));
+        next = mdd_object_child(obj);
+        rc = next->do_ops->do_readpage(ctxt, next, rdpg);
+        return rc;
+}
+
 struct md_device_operations mdd_ops = {
         .mdo_root_get       = mdd_root_get,
         .mdo_statfs         = mdd_statfs,
@@ -1186,7 +1198,8 @@ static struct md_object_operations mdd_obj_ops = {
         .moo_ref_add       = mdd_ref_add,
         .moo_ref_del       = mdd_ref_del,
         .moo_open          = mdd_open,
-        .moo_close         = mdd_close
+        .moo_close         = mdd_close,
+        .moo_readpage      = mdd_readpage
 };
 
 static struct obd_ops mdd_obd_device_ops = {
index 46208ef..5871426 100644 (file)
@@ -517,9 +517,133 @@ static int mdt_disconnect(struct mdt_thread_info *info)
         return target_handle_disconnect(mdt_info_req(info));
 }
 
+static int mdt_sendpage(struct mdt_thread_info *info,
+                        struct mdt_object *object,
+                        struct lu_rdpg *rdpg)
+{
+        int rc = 0, npages, i, tmpcount, tmpsize = 0;
+        struct ptlrpc_bulk_desc *desc;
+        struct ptlrpc_request *req;
+        struct l_wait_info lwi;
+        ENTRY;
+
+        LASSERT((rdpg->rp_offset & (PAGE_SIZE - 1)) == 0);
+
+        req = info->mti_pill.rc_req;
+        npages = (rdpg->rp_count + PAGE_SIZE - 1) >> PAGE_SHIFT;
+        desc = ptlrpc_prep_bulk_exp(req, npages, BULK_PUT_SOURCE,
+                                    MDS_BULK_PORTAL);
+        if (desc == NULL)
+                GOTO(free_desc, rc = -ENOMEM);
+
+        for (i = 0, tmpcount = rdpg->rp_count; i < npages; i++, tmpcount -= tmpsize) {
+                tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount;
+                ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize);
+        }
+
+        LASSERT(desc->bd_nob == rdpg->rp_count);
+        rc = ptlrpc_start_bulk_transfer(desc);
+        if (rc)
+                GOTO(free_desc, rc);
+
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
+                CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
+                       OBD_FAIL_MDS_SENDPAGE, rc);
+                GOTO(abort_bulk, rc);
+        }
+
+        lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
+        rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi);
+        LASSERT (rc == 0 || rc == -ETIMEDOUT);
+
+        if (rc == 0) {
+                if (desc->bd_success &&
+                    desc->bd_nob_transferred == rdpg->rp_count)
+                        GOTO(free_desc, rc);
+
+                rc = -ETIMEDOUT; /* XXX should this be a different errno? */
+        }
+
+        DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
+                  (rc == -ETIMEDOUT) ? "timeout" : "network error",
+                  desc->bd_nob_transferred, rdpg->rp_count,
+                  req->rq_export->exp_client_uuid.uuid,
+                  req->rq_export->exp_connection->c_remote_uuid.uuid);
+
+        class_fail_export(req->rq_export);
+
+        EXIT;
+abort_bulk:
+        ptlrpc_abort_bulk(desc);
+free_desc:
+        ptlrpc_free_bulk(desc);
+        return rc;
+}
+
 static int mdt_readpage(struct mdt_thread_info *info)
 {
-        return -EOPNOTSUPP;
+        struct mdt_body *reqbody, *repbody;
+        int rc, i, tmpcount, tmpsize = 0;
+        struct lu_rdpg rdpg = { 0 };
+        struct mdt_object *child;
+        ENTRY;
+
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
+                RETURN(-ENOMEM);
+
+        req_capsule_set(&info->mti_pill, &RQF_MDS_READPAGE);
+        rc = req_capsule_pack(&info->mti_pill);
+        if (rc)
+                RETURN(rc);
+
+        reqbody = req_capsule_client_get(&info->mti_pill,
+                                         &RMF_MDT_BODY);
+        if (reqbody == NULL)
+                RETURN(-EFAULT);
+
+        child = mdt_object_find(info->mti_ctxt,
+                                info->mti_mdt, &reqbody->fid1);
+        if (IS_ERR(child))
+                RETURN(PTR_ERR(child));
+
+        repbody = req_capsule_server_get(&info->mti_pill,
+                                         &RMF_MDT_BODY);
+
+        if (repbody == NULL)
+                GOTO(out_child, -EFAULT);
+
+        /* prepare @rdpg before calling lower layers and transfer itself. */
+        rdpg.rp_offset = reqbody->size;
+        rdpg.rp_count = reqbody->nlink;
+        rdpg.rp_npages = (rdpg.rp_count + PAGE_SIZE - 1) >> PAGE_SHIFT;
+        
+        for (i = 0, tmpcount = rdpg.rp_count;
+             i < rdpg.rp_npages; i++, tmpcount -= tmpsize) {
+                rdpg.rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
+                if (rdpg.rp_pages[i] == NULL)
+                        GOTO(free_rdpg, rc = -ENOMEM);
+        }
+
+        /* call lower layers to fill allocated pages with directory data */
+        rc = mo_readpage(info->mti_ctxt,
+                         mdt_object_child(child), &rdpg);
+        if (rc)
+                GOTO(free_rdpg, rc);
+        
+        repbody->size = rdpg.rp_size;
+        repbody->valid = OBD_MD_FLSIZE;
+
+        /* send pages to client */
+        rc = mdt_sendpage(info, child, &rdpg);
+        
+        EXIT;
+free_rdpg:
+        for (i = 0; i < rdpg.rp_npages; i++)
+                if (rdpg.rp_pages[i])
+                        __free_pages(rdpg.rp_pages[i], 0);
+out_child:
+        mdt_object_put(info->mti_ctxt, child);
+        return rc;
 }
 
 static int mdt_reint_internal(struct mdt_thread_info *info, __u32 op)
@@ -1059,7 +1183,6 @@ static int mdt_req_handle(struct mdt_thread_info *info,
                 result = mdt_unpack_req_pack_rep(info, flags);
         }
 
-
         if (result == 0 && flags & MUTABOR &&
             req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
                 result = -EROFS;
@@ -1309,7 +1432,7 @@ static int mdt_handle_common(struct ptlrpc_request *req,
 
         mdt_thread_info_init(req, info);
 
-        result = mdt_handle0(req, info, mdt_handlers);
+        result = mdt_handle0(req, info, supported);
 
         mdt_thread_info_fini(info);
         RETURN(result);
@@ -2795,6 +2918,9 @@ static struct mdt_opc_slice mdt_readpage_handlers[] = {
                 .mos_opc_start = MDS_READPAGE,
                 .mos_opc_end   = MDS_READPAGE + 1,
                 .mos_hs        = mdt_mds_readpage_ops
+        },
+        {
+                .mos_hs        = NULL
         }
 };
 
index 3bec91d..ae5404b 100644 (file)
@@ -820,6 +820,38 @@ int osd_xattr_set(const struct lu_context *ctxt, struct dt_object *dt,
         return 0;
 }
 
+int osd_readpage(const struct lu_context *ctxt,
+                 struct dt_object *dt, struct lu_rdpg *rdpg)
+{
+        struct osd_object *obj = osd_dt_obj(dt);
+        LASSERT(lu_object_exists(ctxt, &dt->do_lu));
+        LASSERT(osd_invariant(obj));
+
+        LASSERT(rdpg->rp_pages != NULL);
+        
+        /* check input params */
+        if ((rdpg->rp_offset & (obj->oo_inode->i_blksize - 1)) != 0) {
+                CERROR("offset "LPU64" not on a block boundary of %lu\n",
+                       rdpg->rp_offset, obj->oo_inode->i_blksize);
+                return -EFAULT;
+        }
+
+        if (rdpg->rp_count & (obj->oo_inode->i_blksize - 1)) {
+                CERROR("size %u is not multiple of blocksize %lu\n",
+                       rdpg->rp_count, obj->oo_inode->i_blksize);
+                return -EFAULT;
+        }
+
+        /* prepare output */
+        rdpg->rp_size = obj->oo_inode->i_size;
+
+        /*
+         * XXX: here should be filling pages by dir data in ext3 format and
+         * ->rp_pages already should be allocated.
+         */
+        return 0;
+}
+
 static struct dt_object_operations osd_obj_ops = {
         .do_lock      = osd_object_lock,
         .do_unlock    = osd_object_unlock,
@@ -830,7 +862,8 @@ static struct dt_object_operations osd_obj_ops = {
         .do_ref_add   = osd_object_ref_add,
         .do_ref_del   = osd_object_ref_del,
         .do_xattr_get = osd_xattr_get,
-        .do_xattr_set = osd_xattr_set
+        .do_xattr_set = osd_xattr_set,
+        .do_readpage  = osd_readpage
 };
 
 static struct dt_body_operations osd_body_ops = {