Whamcloud - gitweb
Branch: b_new_cmd
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index c4069a3..4be567e 100644 (file)
@@ -561,6 +561,122 @@ out:
         return rc;
 }
 
+#ifdef HAVE_SPLIT_SUPPORT
+/* 
+ * Retrieve dir entry from the page and insert it to the
+ * slave object, actually, this should be in osd layer, 
+ * but since it will not in the final product, so just do
+ * it here and do not define more moo api anymore for 
+ * this.
+ */
+static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page)
+{
+        struct mdt_object *object = info->mti_object;
+        struct lu_dirpage *dp;
+        struct lu_dirent *ent;
+        int rc = 0;
+
+        kmap(page);
+        dp = page_address(page);
+        for (ent = lu_dirent_start(dp); ent != NULL;
+                          ent = lu_dirent_next(ent)) {
+                struct lu_fid *lf = &ent->lde_fid;
+
+                /* FIXME: check isdir */
+                rc = mdo_name_insert(info->mti_ctxt, 
+                                     md_object_next(&object->mot_obj),
+                                     ent->lde_name, lf, 0);
+                /*FIXME: add cross_flags*/
+                if (rc) {
+                        kunmap(page);
+                        RETURN(rc);
+                }
+        }
+        kunmap(page);
+        
+        RETURN(rc);
+}
+
+static int mdt_bulk_timeout(void *data)
+{
+        ENTRY;
+        /* We don't fail the connection here, because having the export
+         * killed makes the (vital) call to commitrw very sad.
+         */
+        RETURN(1);
+}
+
+static int mdt_writepage(struct mdt_thread_info *info)
+{
+        struct ptlrpc_request   *req = mdt_info_req(info);
+        struct l_wait_info      *lwi;
+        struct ptlrpc_bulk_desc *desc;
+        struct page             *page;
+        int                rc;
+        ENTRY;
+
+        desc = ptlrpc_prep_bulk_exp (req, 1, BULK_GET_SINK, MDS_BULK_PORTAL);
+        if (desc)
+                RETURN(-ENOMEM);
+
+        /* allocate the page for the desc */
+        page = alloc_pages(GFP_KERNEL, 0);
+        if (!page)
+                GOTO(desc_cleanup, rc = -ENOMEM);
+        
+        ptlrpc_prep_bulk_page(desc, page, 0, CFS_PAGE_SIZE);
+
+        /* FIXME: following parts are copied from ost_brw_write */
+
+        /* Check if client was evicted while we were doing i/o before touching
+           network */
+        OBD_ALLOC_PTR(lwi);
+        if (!lwi)
+                GOTO(cleanup_page, rc = -ENOMEM);
+
+        if (desc->bd_export->exp_failed)
+                rc = -ENOTCONN;
+        else
+                rc = ptlrpc_start_bulk_transfer (desc);
+        if (rc == 0) {
+                *lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * HZ / 4, HZ,
+                                            mdt_bulk_timeout, desc);
+                rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) ||
+                                  desc->bd_export->exp_failed, lwi);
+                LASSERT(rc == 0 || rc == -ETIMEDOUT);
+                if (rc == -ETIMEDOUT) {
+                        DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
+                        ptlrpc_abort_bulk(desc);
+                } else if (desc->bd_export->exp_failed) {
+                        DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
+                        rc = -ENOTCONN;
+                        ptlrpc_abort_bulk(desc);
+                } else if (!desc->bd_success ||
+                           desc->bd_nob_transferred != desc->bd_nob) {
+                        DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
+                                  desc->bd_success ?
+                                  "truncated" : "network error on",
+                                  desc->bd_nob_transferred, desc->bd_nob);
+                        /* XXX should this be a different errno? */
+                        rc = -ETIMEDOUT;
+                }
+        } else {
+                DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
+        }
+        if (rc)
+                GOTO(cleanup_lwi, rc);
+        rc = mdt_write_dir_page(info, page);
+
+cleanup_lwi:
+        OBD_FREE_PTR(lwi);
+cleanup_page:
+        __free_pages(page, 0);
+desc_cleanup:
+        ptlrpc_free_bulk(desc);
+        RETURN(rc);
+}
+#endif
+
 static int mdt_readpage(struct mdt_thread_info *info)
 {
         struct mdt_object *object = info->mti_object;
@@ -3015,6 +3131,9 @@ static struct mdt_opc_slice mdt_regular_handlers[] = {
 
 static struct mdt_handler mdt_readpage_ops[] = {
         DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, READPAGE, mdt_readpage),
+#ifdef HAVE_SPLIT_SUPPORT
+        DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, WRITEPAGE, mdt_writepage),
+#endif
 
         /*
          * XXX: this is ugly and should be fixed one day, see mdc_close() for