Whamcloud - gitweb
Branch: b_new_cmd
authorwangdi <wangdi>
Fri, 1 Sep 2006 13:01:50 +0000 (13:01 +0000)
committerwangdi <wangdi>
Fri, 1 Sep 2006 13:01:50 +0000 (13:01 +0000)
some updates and fixes for splitting dir

12 files changed:
lustre/cmm/cmm_split.c
lustre/cmm/mdc_object.c
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_req_layout.h
lustre/include/md_object.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/include/obd_support.h
lustre/llite/dir.c
lustre/mdc/mdc_request.c
lustre/mdt/mdt_handler.c
lustre/ptlrpc/layout.c

index 7ea4174..00ba740 100644 (file)
@@ -125,6 +125,12 @@ struct cmm_object *cmm_object_find(const struct lu_context *ctxt,
         RETURN(m);
 }
 
+static inline void cmm_object_put(const struct lu_context *ctxt,
+                                  struct cmm_object *o)
+{
+        lu_object_put(ctxt, &o->cmo_obj.mo_lu);
+}
+
 static int cmm_creat_remote_obj(const struct lu_context *ctx, 
                                 struct cmm_device *cmm,
                                 struct lu_fid *fid, struct md_attr *ma)
@@ -143,7 +149,8 @@ static int cmm_creat_remote_obj(const struct lu_context *ctx,
                               spec, ma);
         OBD_FREE_PTR(spec);
 
-        RETURN(0);
+        cmm_object_put(ctx, obj);
+        RETURN(rc);
 }
 
 static int cmm_create_slave_objects(const struct lu_context *ctx,
@@ -157,16 +164,17 @@ static int cmm_create_slave_objects(const struct lu_context *ctx,
 
         lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
 
+        /* This lmv will be free after finish splitting. */
         OBD_ALLOC(lmv, lmv_size);
         if (!lmv)
                 RETURN(-ENOMEM);
 
-        lmv->mea_master = -1; 
+        lmv->mea_master = -1;
         lmv->mea_magic = MEA_MAGIC_ALL_CHARS;
         lmv->mea_count = cmm->cmm_tgt_count + 1;
 
         lmv->mea_ids[0] = *lf;
-        
+
         rc = cmm_alloc_fid(ctx, cmm, &lmv->mea_ids[1], cmm->cmm_tgt_count);
         if (rc)
                 GOTO(cleanup, rc);
@@ -177,33 +185,60 @@ static int cmm_create_slave_objects(const struct lu_context *ctx,
                         GOTO(cleanup, rc);
         }
 
-        rc = mo_xattr_set(ctx, md_object_next(mo), lmv, lmv_size, 
+        rc = mo_xattr_set(ctx, md_object_next(mo), lmv, lmv_size,
                           MDS_LMV_MD_NAME, 0);
+
+        ma->ma_lmv_size = lmv_size;
+        ma->ma_lmv = lmv;
 cleanup:
-        OBD_FREE(lmv, lmv_size);
         RETURN(rc);
 }
 
 static int cmm_send_split_pages(const struct lu_context *ctx, 
-                                struct md_object *mo, struct lu_rdpg *rdpg)
+                                struct md_object *mo, struct lu_rdpg *rdpg, 
+                                struct lu_fid *fid)
 {
-        RETURN(0);
+        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
+        struct cmm_object *obj;
+        int rc = 0, i;
+        ENTRY;
+
+        obj = cmm_object_find(ctx, cmm, fid);
+        if (IS_ERR(obj))
+                RETURN(PTR_ERR(obj));
+
+        for (i = 0; i < rdpg->rp_npages; i++) {
+                rc = mo_sendpage(ctx, md_object_next(&obj->cmo_obj),
+                                  rdpg->rp_pages[i]);
+                if (rc)
+                        GOTO(cleanup, rc);
+        }
+cleanup:
+        cmm_object_put(ctx, obj);
+        RETURN(rc);
 }
 
 static int cmm_split_entries(const struct lu_context *ctx, struct md_object *mo,
-                             struct lu_rdpg *rdpg)
+                             struct lu_rdpg *rdpg, struct lu_fid *lf)
 {
         struct lu_dirpage *dp;
         __u32 hash_end;
-        int rc;
+        int rc, i;
         ENTRY;
 
+        /* init page with '0' */
+        for (i = 0; i < rdpg->rp_npages; i++) {
+                memset(kmap(rdpg->rp_pages[i]), 0, CFS_PAGE_SIZE);
+                kunmap(rdpg->rp_pages[i]);
+        }
+
+        /* Read splitted page and send them to the slave master */
         do {
                 rc = mo_readpage(ctx, md_object_next(mo), rdpg);
                 if (rc)
                         RETURN(rc);
 
-                rc = cmm_send_split_pages(ctx, mo, rdpg);
+                rc = cmm_send_split_pages(ctx, mo, rdpg, lf);
                 if (rc)
                         RETURN(rc);
 
@@ -218,13 +253,32 @@ static int cmm_split_entries(const struct lu_context *ctx, struct md_object *mo,
 }
 
 static int cmm_remove_entries(const struct lu_context *ctx, 
-                              struct md_object *mo, __u32 start, __u32 end)
+                              struct md_object *mo, struct lu_rdpg *rdpg)
 {
-        RETURN(0);
+        struct lu_dirpage *dp;
+        struct lu_dirent  *ent;
+        int rc = 0, i;
+        ENTRY;
+
+        for (i = 0; i < rdpg->rp_npages; i++) {
+                kmap(rdpg->rp_pages[i]);
+                dp = page_address(rdpg->rp_pages[i]);
+                for (ent = lu_dirent_start(dp); ent != NULL;
+                                  ent = lu_dirent_next(ent)) {
+                        rc = mdo_name_remove(ctx, md_object_next(mo),
+                                             ent->lde_name);
+                        if (rc) {
+                                kunmap(rdpg->rp_pages[i]);
+                                RETURN(rc);
+                        }
+                }
+                kunmap(rdpg->rp_pages[i]);
+        }
+        RETURN(rc);
 }
+
 #define MAX_HASH_SIZE 0x3fffffff
 #define SPLIT_PAGE_COUNT 1
-
 static int cmm_scan_and_split(const struct lu_context *ctx,
                               struct md_object *mo, struct md_attr *ma)
 {
@@ -249,16 +303,17 @@ static int cmm_scan_and_split(const struct lu_context *ctx,
                 if (rdpg->rp_pages[i] == NULL)
                         GOTO(cleanup, rc = -ENOMEM);
         }
-        
+
         hash_segement = MAX_HASH_SIZE / cmm->cmm_tgt_count;
         for (i = 1; i < cmm->cmm_tgt_count; i++) {
+                struct lu_fid *lf = &ma->ma_lmv->mea_ids[i];
+
                 rdpg->rp_hash = i * hash_segement;
                 rdpg->rp_hash_end = rdpg->rp_hash + hash_segement;
-                rc = cmm_split_entries(ctx, mo, rdpg);
+                rc = cmm_split_entries(ctx, mo, rdpg, lf);
                 if (rc)
                         GOTO(cleanup, rc);
-                rc = cmm_remove_entries(ctx, mo, rdpg->rp_hash, 
-                                        rdpg->rp_hash_end);
+                rc = cmm_remove_entries(ctx, mo, rdpg);
                 if (rc)
                         GOTO(cleanup, rc);
         }
@@ -305,7 +360,11 @@ int cml_try_to_split(const struct lu_context *ctx, struct md_object *mo)
 
         /* step3: scan and split the object */
         rc = cmm_scan_and_split(ctx, mo, ma);
+
 cleanup:
+        if (ma->ma_lmv_size && ma->ma_lmv)
+                OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
+        
         OBD_FREE_PTR(ma);
 
         RETURN(rc);
index 316f6f5..491b31e 100644 (file)
@@ -236,10 +236,28 @@ static int mdc_ref_del(const struct lu_context *ctx, struct md_object *mo,
         RETURN(rc);
 }
 
+#ifdef HAVE_SPLIT_SUPPORT
+static int mdc_sendpage(const struct lu_context *ctx, struct md_object *mo,
+                        const struct page *page)
+{
+        struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
+        int rc;
+        ENTRY;
+
+        rc = md_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
+                         page);
+
+        RETURN(rc);
+}
+#endif
+
 static struct md_object_operations mdc_mo_ops = {
         .moo_object_create  = mdc_object_create,
         .moo_ref_add        = mdc_ref_add,
         .moo_ref_del        = mdc_ref_del,
+#ifdef HAVE_SPLIT_SUPPORT
+        .moo_sendpage      = mdc_sendpage,
+#endif
 };
 
 /* md_dir_operations */
index 325811b..b6d1ddf 100644 (file)
@@ -294,6 +294,22 @@ struct lu_dirpage {
         struct lu_dirent ldp_entries[0];
 };
 
+static inline struct lu_dirent *lu_dirent_start(struct lu_dirpage *dp)
+{
+        return dp->ldp_entries;
+}
+
+static inline struct lu_dirent *lu_dirent_next(struct lu_dirent *ent)
+{
+        struct lu_dirent *next;
+
+        if (ent->lde_reclen != 0)
+                next = ((void *)ent) + le16_to_cpu(ent->lde_reclen);
+        else
+                next = NULL;
+        return next;
+}
+
 #define MEA_MAGIC_LAST_CHAR      0xb2221ca1
 #define MEA_MAGIC_ALL_CHARS      0xb222a11c
 
@@ -813,6 +829,7 @@ typedef enum {
         MDS_QUOTACTL     = 48,
         MDS_GETXATTR     = 49,
         MDS_SETXATTR     = 50,
+        MDS_WRITEPAGE     = 51,
         MDS_LAST_OPC
 } mds_cmd_t;
 
index 44b876f..e9c7fdd 100644 (file)
@@ -103,6 +103,9 @@ extern const struct req_format RQF_MDS_CONNECT;
 extern const struct req_format RQF_MDS_DISCONNECT;
 extern const struct req_format RQF_MDS_READPAGE;
 extern const struct req_format RQF_MDS_DONE_WRITING;
+#ifdef HAVE_SPLIT_SUPPORT
+extern const struct req_format RQF_MDS_WRITEPAGE;
+#endif
 
 /*
  * This is format of direct (non-intent) MDS_GETATTR_NAME request.
index 3cd8723..048919c 100644 (file)
@@ -62,7 +62,7 @@ struct md_attr {
         struct lu_attr          ma_attr;
         struct lov_mds_md      *ma_lmm;
         int                     ma_lmm_size;
-        struct lmv_mds_md      *ma_lmv;
+        struct lmv_stripe_md   *ma_lmv;
         int                     ma_lmv_size;
         struct llog_cookie     *ma_cookie;
         int                     ma_cookie_size;
@@ -111,6 +111,12 @@ struct md_object_operations {
 
         int (*moo_readpage)(const struct lu_context *, struct md_object *,
                             const struct lu_rdpg *);
+#ifdef HAVE_SPLIT_SUPPORT
+        int (*moo_writepage)(const struct lu_context *, struct md_object *,
+                            const struct page *page);
+        int (*moo_sendpage)(const struct lu_context *, struct md_object *,
+                            const struct page*);
+#endif
         int (*moo_readlink)(const struct lu_context *ctxt,
                             struct md_object *obj,
                             void *buf, int buf_len);
@@ -318,6 +324,22 @@ static inline int mo_readpage(const struct lu_context *cx, struct md_object *m,
         return m->mo_ops->moo_readpage(cx, m, rdpg);
 }
 
+#ifdef HAVE_SPLIT_SUPPORT
+static inline int mo_writepage(const struct lu_context *cx, struct md_object *m,
+                               const struct page *pg)
+{
+        LASSERT(m->mo_ops->moo_writepage);
+        return m->mo_ops->moo_writepage(cx, m, pg);
+}
+
+static inline int mo_sendpage(const struct lu_context *cx, struct md_object *m,
+                              const struct page *pg)
+{
+        LASSERT(m->mo_ops->moo_sendpage);
+        return m->mo_ops->moo_sendpage(cx, m, pg);
+}
+#endif
+
 static inline int mo_object_create(const struct lu_context *cx,
                                    struct md_object *m,
                                    const struct md_create_spec *spc,
index 6b029ed..5abc211 100644 (file)
@@ -1126,6 +1126,10 @@ struct md_ops {
                       struct ptlrpc_request **);
         int (*m_readpage)(struct obd_export *, const struct lu_fid *,
                           __u64, struct page *, struct ptlrpc_request **);
+#ifdef HAVE_SPLIT_SUPPORT
+        int (*m_sendpage)(struct obd_export *, const struct lu_fid *,
+                          const struct page *);
+#endif
         int (*m_unlink)(struct obd_export *, struct md_op_data *,
                         struct ptlrpc_request **);
 
index 7e36cbb..216a927 100644 (file)
@@ -1751,6 +1751,20 @@ static inline int md_readpage(struct obd_export *exp,
         RETURN(rc);
 }
 
+#ifdef HAVE_SPLIT_SUPPORT
+static inline int md_sendpage(struct obd_export *exp,
+                              const struct lu_fid *fid,
+                              const struct page *page)
+{
+        int rc;
+        ENTRY;
+        EXP_CHECK_MD_OP(exp, sendpage);
+        MD_COUNTER_INCREMENT(exp->exp_obd, sendpage);
+        rc = MDP(exp->exp_obd, sendpage)(exp, fid, page);
+        RETURN(rc);
+}
+#endif
+
 static inline int md_unlink(struct obd_export *exp, struct md_op_data *op_data,
                             struct ptlrpc_request **request)
 {
index 40e57cd..e38e2b1 100644 (file)
@@ -92,6 +92,8 @@ extern int obd_race_state;
 #define OBD_FAIL_MDS_SETXATTR_NET        0x132
 #define OBD_FAIL_MDS_SETXATTR            0x133
 #define OBD_FAIL_MDS_SETXATTR_WRITE      0x134
+#define OBD_FAIL_MDS_WRITEPAGE_NET       0x135
+#define OBD_FAIL_MDS_WRITEPAGE_PACK      0x136
 
 #define OBD_FAIL_OST                     0x200
 #define OBD_FAIL_OST_CONNECT_NET         0x201
index 983db99..9b361d2 100644 (file)
@@ -393,22 +393,6 @@ static loff_t ll_llseek(struct file *filp, loff_t off, int whence)
         return default_llseek(filp, off, whence);
 }
 
-static struct lu_dirent *lu_dirent_start(struct lu_dirpage *dp)
-{
-        return dp->ldp_entries;
-}
-
-static struct lu_dirent *lu_dirent_next(struct lu_dirent *ent)
-{
-        struct lu_dirent *next;
-
-        if (ent->lde_reclen != 0)
-                next = ((void *)ent) + le16_to_cpu(ent->lde_reclen);
-        else
-                next = NULL;
-        return next;
-}
-
 int ll_readdir(struct file *filp, void *cookie, filldir_t filldir)
 {
         struct inode         *inode = filp->f_dentry->d_inode;
index 7eeb499..d754d82 100644 (file)
@@ -728,6 +728,42 @@ int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data)
         RETURN(rc);
 }
 
+#ifdef HAVE_SPLIT_SUPPORT
+int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid,
+                 const struct page *page)
+{
+        struct obd_import *imp = class_exp2cliimp(exp);
+        struct ptlrpc_request *req = NULL;
+        struct ptlrpc_bulk_desc *desc = NULL;
+        struct mdt_body *body;
+        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+        ENTRY;
+
+        CDEBUG(D_INODE, "object: "DFID"\n", PFID(fid));
+
+        req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_WRITEPAGE, 2, size,
+                              NULL);
+        if (req == NULL)
+                GOTO(out, rc = -ENOMEM);
+
+        req->rq_request_portal = MDS_READPAGE_PORTAL;
+
+        desc = ptlrpc_prep_bulk_imp(req, 1, BULK_GET_SOURCE, MDS_BULK_PORTAL);
+        if (desc == NULL)
+                GOTO(out, rc = -ENOMEM);
+        /* NB req now owns desc and will free it when it gets freed */
+        ptlrpc_prep_bulk_page(desc, (struct page*)page, 0, PAGE_CACHE_SIZE);
+
+        mdc_readdir_pack(req, REQ_REC_OFF, 0, PAGE_CACHE_SIZE, fid);
+
+        ptlrpc_req_set_repsize(req, 2, size);
+        rc = ptlrpc_queue_wait(req);
+out:
+        ptlrpc_req_finished(req);
+        RETURN(rc);
+}
+#endif
+
 int mdc_readpage(struct obd_export *exp, const struct lu_fid *fid,
                  __u64 offset, struct page *page,
                  struct ptlrpc_request **request)
@@ -1374,6 +1410,9 @@ struct md_ops mdc_md_ops = {
         .m_getxattr         = mdc_getxattr,
         .m_sync             = mdc_sync,
         .m_readpage         = mdc_readpage,
+#ifdef HAVE_SPLIT_SUPPORT
+        .m_sendpage         = mdc_sendpage,
+#endif
         .m_unlink           = mdc_unlink,
         .m_cancel_unused    = mdc_cancel_unused,
         .m_init_ea_size     = mdc_init_ea_size,
index c4069a3..4be567e 100644 (file)
@@ -561,6 +561,122 @@ out:
         return rc;
 }
 
+#ifdef HAVE_SPLIT_SUPPORT
+/* 
+ * Retrieve dir entry from the page and insert it to the
+ * slave object, actually, this should be in osd layer, 
+ * but since it will not in the final product, so just do
+ * it here and do not define more moo api anymore for 
+ * this.
+ */
+static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page)
+{
+        struct mdt_object *object = info->mti_object;
+        struct lu_dirpage *dp;
+        struct lu_dirent *ent;
+        int rc = 0;
+
+        kmap(page);
+        dp = page_address(page);
+        for (ent = lu_dirent_start(dp); ent != NULL;
+                          ent = lu_dirent_next(ent)) {
+                struct lu_fid *lf = &ent->lde_fid;
+
+                /* FIXME: check isdir */
+                rc = mdo_name_insert(info->mti_ctxt, 
+                                     md_object_next(&object->mot_obj),
+                                     ent->lde_name, lf, 0);
+                /*FIXME: add cross_flags*/
+                if (rc) {
+                        kunmap(page);
+                        RETURN(rc);
+                }
+        }
+        kunmap(page);
+        
+        RETURN(rc);
+}
+
+static int mdt_bulk_timeout(void *data)
+{
+        ENTRY;
+        /* We don't fail the connection here, because having the export
+         * killed makes the (vital) call to commitrw very sad.
+         */
+        RETURN(1);
+}
+
+static int mdt_writepage(struct mdt_thread_info *info)
+{
+        struct ptlrpc_request   *req = mdt_info_req(info);
+        struct l_wait_info      *lwi;
+        struct ptlrpc_bulk_desc *desc;
+        struct page             *page;
+        int                rc;
+        ENTRY;
+
+        desc = ptlrpc_prep_bulk_exp (req, 1, BULK_GET_SINK, MDS_BULK_PORTAL);
+        if (desc)
+                RETURN(-ENOMEM);
+
+        /* allocate the page for the desc */
+        page = alloc_pages(GFP_KERNEL, 0);
+        if (!page)
+                GOTO(desc_cleanup, rc = -ENOMEM);
+        
+        ptlrpc_prep_bulk_page(desc, page, 0, CFS_PAGE_SIZE);
+
+        /* FIXME: following parts are copied from ost_brw_write */
+
+        /* Check if client was evicted while we were doing i/o before touching
+           network */
+        OBD_ALLOC_PTR(lwi);
+        if (!lwi)
+                GOTO(cleanup_page, rc = -ENOMEM);
+
+        if (desc->bd_export->exp_failed)
+                rc = -ENOTCONN;
+        else
+                rc = ptlrpc_start_bulk_transfer (desc);
+        if (rc == 0) {
+                *lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * HZ / 4, HZ,
+                                            mdt_bulk_timeout, desc);
+                rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) ||
+                                  desc->bd_export->exp_failed, lwi);
+                LASSERT(rc == 0 || rc == -ETIMEDOUT);
+                if (rc == -ETIMEDOUT) {
+                        DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
+                        ptlrpc_abort_bulk(desc);
+                } else if (desc->bd_export->exp_failed) {
+                        DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
+                        rc = -ENOTCONN;
+                        ptlrpc_abort_bulk(desc);
+                } else if (!desc->bd_success ||
+                           desc->bd_nob_transferred != desc->bd_nob) {
+                        DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
+                                  desc->bd_success ?
+                                  "truncated" : "network error on",
+                                  desc->bd_nob_transferred, desc->bd_nob);
+                        /* XXX should this be a different errno? */
+                        rc = -ETIMEDOUT;
+                }
+        } else {
+                DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
+        }
+        if (rc)
+                GOTO(cleanup_lwi, rc);
+        rc = mdt_write_dir_page(info, page);
+
+cleanup_lwi:
+        OBD_FREE_PTR(lwi);
+cleanup_page:
+        __free_pages(page, 0);
+desc_cleanup:
+        ptlrpc_free_bulk(desc);
+        RETURN(rc);
+}
+#endif
+
 static int mdt_readpage(struct mdt_thread_info *info)
 {
         struct mdt_object *object = info->mti_object;
@@ -3015,6 +3131,9 @@ static struct mdt_opc_slice mdt_regular_handlers[] = {
 
 static struct mdt_handler mdt_readpage_ops[] = {
         DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, READPAGE, mdt_readpage),
+#ifdef HAVE_SPLIT_SUPPORT
+        DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, WRITEPAGE, mdt_writepage),
+#endif
 
         /*
          * XXX: this is ugly and should be fixed one day, see mdc_close() for
index 6c872ed..5226cba 100644 (file)
@@ -611,6 +611,13 @@ const struct req_format RQF_MDS_READPAGE =
                         mdt_body_only, mdt_body_only);
 EXPORT_SYMBOL(RQF_MDS_READPAGE);
 
+#ifdef HAVE_SPLIT_SUPPORT
+const struct req_format RQF_MDS_WRITEPAGE =
+        DEFINE_REQ_FMT0("MDS_WRITEPAGE",
+                        mdt_body_only, mdt_body_only);
+EXPORT_SYMBOL(RQF_MDS_WRITEPAGE);
+#endif
+
 #if !defined(__REQ_LAYOUT_USER__)
 
 int req_layout_init(void)