return rc;
}
+#ifdef HAVE_SPLIT_SUPPORT
+/*
+ * Retrieve dir entry from the page and insert it to the
+ * slave object, actually, this should be in osd layer,
+ * but since it will not in the final product, so just do
+ * it here and do not define more moo api anymore for
+ * this.
+ */
+static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page)
+{
+ struct mdt_object *object = info->mti_object;
+ struct lu_dirpage *dp;
+ struct lu_dirent *ent;
+ int rc = 0;
+
+ kmap(page);
+ dp = page_address(page);
+ for (ent = lu_dirent_start(dp); ent != NULL;
+ ent = lu_dirent_next(ent)) {
+ struct lu_fid *lf = &ent->lde_fid;
+
+ /* FIXME: check isdir */
+ rc = mdo_name_insert(info->mti_ctxt,
+ md_object_next(&object->mot_obj),
+ ent->lde_name, lf, 0);
+ /*FIXME: add cross_flags*/
+ if (rc) {
+ kunmap(page);
+ RETURN(rc);
+ }
+ }
+ kunmap(page);
+
+ RETURN(rc);
+}
+
+static int mdt_bulk_timeout(void *data)
+{
+ ENTRY;
+ /* We don't fail the connection here, because having the export
+ * killed makes the (vital) call to commitrw very sad.
+ */
+ RETURN(1);
+}
+
+static int mdt_writepage(struct mdt_thread_info *info)
+{
+ struct ptlrpc_request *req = mdt_info_req(info);
+ struct l_wait_info *lwi;
+ struct ptlrpc_bulk_desc *desc;
+ struct page *page;
+ int rc;
+ ENTRY;
+
+ desc = ptlrpc_prep_bulk_exp (req, 1, BULK_GET_SINK, MDS_BULK_PORTAL);
+ if (desc)
+ RETURN(-ENOMEM);
+
+ /* allocate the page for the desc */
+ page = alloc_pages(GFP_KERNEL, 0);
+ if (!page)
+ GOTO(desc_cleanup, rc = -ENOMEM);
+
+ ptlrpc_prep_bulk_page(desc, page, 0, CFS_PAGE_SIZE);
+
+ /* FIXME: following parts are copied from ost_brw_write */
+
+ /* Check if client was evicted while we were doing i/o before touching
+ network */
+ OBD_ALLOC_PTR(lwi);
+ if (!lwi)
+ GOTO(cleanup_page, rc = -ENOMEM);
+
+ if (desc->bd_export->exp_failed)
+ rc = -ENOTCONN;
+ else
+ rc = ptlrpc_start_bulk_transfer (desc);
+ if (rc == 0) {
+ *lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * HZ / 4, HZ,
+ mdt_bulk_timeout, desc);
+ rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) ||
+ desc->bd_export->exp_failed, lwi);
+ LASSERT(rc == 0 || rc == -ETIMEDOUT);
+ if (rc == -ETIMEDOUT) {
+ DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
+ ptlrpc_abort_bulk(desc);
+ } else if (desc->bd_export->exp_failed) {
+ DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
+ rc = -ENOTCONN;
+ ptlrpc_abort_bulk(desc);
+ } else if (!desc->bd_success ||
+ desc->bd_nob_transferred != desc->bd_nob) {
+ DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
+ desc->bd_success ?
+ "truncated" : "network error on",
+ desc->bd_nob_transferred, desc->bd_nob);
+ /* XXX should this be a different errno? */
+ rc = -ETIMEDOUT;
+ }
+ } else {
+ DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
+ }
+ if (rc)
+ GOTO(cleanup_lwi, rc);
+ rc = mdt_write_dir_page(info, page);
+
+cleanup_lwi:
+ OBD_FREE_PTR(lwi);
+cleanup_page:
+ __free_pages(page, 0);
+desc_cleanup:
+ ptlrpc_free_bulk(desc);
+ RETURN(rc);
+}
+#endif
+
static int mdt_readpage(struct mdt_thread_info *info)
{
struct mdt_object *object = info->mti_object;
static struct mdt_handler mdt_readpage_ops[] = {
DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, READPAGE, mdt_readpage),
+#ifdef HAVE_SPLIT_SUPPORT
+ DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, WRITEPAGE, mdt_writepage),
+#endif
/*
* XXX: this is ugly and should be fixed one day, see mdc_close() for