return target_handle_disconnect(mdt_info_req(info));
}
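+/*
+ * Transfer the directory pages described by @rdpg to the client with a bulk
+ * PUT on MDS_BULK_PORTAL, waiting a bounded time for completion and evicting
+ * the client on timeout or a short transfer.
+ */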
+static int mdt_sendpage(struct mdt_thread_info *info,
+ struct mdt_object *object,
+ struct lu_rdpg *rdpg)
+{
+ int rc = 0, npages, i, tmpcount, tmpsize = 0;
+ struct ptlrpc_bulk_desc *desc;
+ struct ptlrpc_request *req;
+ struct l_wait_info lwi;
+ ENTRY;
+
+ LASSERT((rdpg->rp_offset & (PAGE_SIZE - 1)) == 0);
+
+ req = info->mti_pill.rc_req;
+ npages = (rdpg->rp_count + PAGE_SIZE - 1) >> PAGE_SHIFT;
+ desc = ptlrpc_prep_bulk_exp(req, npages, BULK_PUT_SOURCE,
+ MDS_BULK_PORTAL);
+ if (desc == NULL)
+ RETURN(-ENOMEM);
+
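+ /* attach each page of the reply to the bulk descriptor, trimming the
+  * last page if rp_count is not page-aligned */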
+ for (i = 0, tmpcount = rdpg->rp_count;
+ i < npages; i++, tmpcount -= tmpsize) {
+ tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount;
+ ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize);
+ }
+
+ LASSERT(desc->bd_nob == rdpg->rp_count);
+ rc = ptlrpc_start_bulk_transfer(desc);
+ if (rc)
+ GOTO(free_desc, rc);
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
+ CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
+ OBD_FAIL_MDS_SENDPAGE, rc);
+ GOTO(abort_bulk, rc);
+ }
+
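+ /* wait for the bulk transfer to complete, but no longer than a quarter
+  * of obd_timeout */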
+ lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
+ rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi);
+ LASSERT(rc == 0 || rc == -ETIMEDOUT);
+
+ if (rc == 0) {
+ if (desc->bd_success &&
+ desc->bd_nob_transferred == rdpg->rp_count)
+ GOTO(free_desc, rc);
+
+ rc = -ETIMEDOUT; /* XXX should this be a different errno? */
+ }
+
+ DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
+ (rc == -ETIMEDOUT) ? "timeout" : "network error",
+ desc->bd_nob_transferred, rdpg->rp_count,
+ req->rq_export->exp_client_uuid.uuid,
+ req->rq_export->exp_connection->c_remote_uuid.uuid);
+
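+ /* the bulk failed or timed out; evict the client to clean up the
+  * stalled transfer */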
+ class_fail_export(req->rq_export);
+
+ EXIT;
+abort_bulk:
+ ptlrpc_abort_bulk(desc);
+free_desc:
+ ptlrpc_free_bulk(desc);
+ return rc;
+}
+
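+/*
+ * MDS_READPAGE handler: allocate pages, ask the lower layers to fill them
+ * with directory data for the object named in the request body, and send
+ * them back to the client via mdt_sendpage().
+ */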
static int mdt_readpage(struct mdt_thread_info *info)
{
- return -EOPNOTSUPP;
+ struct mdt_body *reqbody, *repbody;
+ int rc, i;
+ struct lu_rdpg rdpg = { 0 };
+ struct mdt_object *child;
+ ENTRY;
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
+ RETURN(-ENOMEM);
+
+ req_capsule_set(&info->mti_pill, &RQF_MDS_READPAGE);
+ rc = req_capsule_pack(&info->mti_pill);
+ if (rc)
+ RETURN(rc);
+
+ reqbody = req_capsule_client_get(&info->mti_pill,
+ &RMF_MDT_BODY);
+ if (reqbody == NULL)
+ RETURN(-EFAULT);
+
+ child = mdt_object_find(info->mti_ctxt,
+ info->mti_mdt, &reqbody->fid1);
+ if (IS_ERR(child))
+ RETURN(PTR_ERR(child));
+
+ repbody = req_capsule_server_get(&info->mti_pill,
+ &RMF_MDT_BODY);
+
+ if (repbody == NULL)
+ GOTO(out_child, rc = -EFAULT);
+
+ /* prepare @rdpg before calling lower layers and the transfer itself. The
+  * readdir offset and the requested byte count arrive in the size and
+  * nlink fields of the request body. */
+ rdpg.rp_offset = reqbody->size;
+ rdpg.rp_count = reqbody->nlink;
+ rdpg.rp_npages = (rdpg.rp_count + PAGE_SIZE - 1) >> PAGE_SHIFT;
+
+ /* allocate the page array (assumed here to be dynamically sized) and the
+  * pages that will carry the directory entries */
+ OBD_ALLOC(rdpg.rp_pages, rdpg.rp_npages * sizeof(rdpg.rp_pages[0]));
+ if (rdpg.rp_pages == NULL)
+ GOTO(out_child, rc = -ENOMEM);
+
+ for (i = 0; i < rdpg.rp_npages; i++) {
+ rdpg.rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
+ if (rdpg.rp_pages[i] == NULL)
+ GOTO(free_rdpg, rc = -ENOMEM);
+ }
+
+ /* call lower layers to fill allocated pages with directory data */
+ rc = mo_readpage(info->mti_ctxt,
+ mdt_object_child(child), &rdpg);
+ if (rc)
+ GOTO(free_rdpg, rc);
+
+ repbody->size = rdpg.rp_size;
+ repbody->valid = OBD_MD_FLSIZE;
+
+ /* send pages to client */
+ rc = mdt_sendpage(info, child, &rdpg);
+
+ EXIT;
+free_rdpg:
+ for (i = 0; i < rdpg.rp_npages; i++)
+ if (rdpg.rp_pages[i])
+ __free_pages(rdpg.rp_pages[i], 0);
+ OBD_FREE(rdpg.rp_pages, rdpg.rp_npages * sizeof(rdpg.rp_pages[0]));
+out_child:
+ mdt_object_put(info->mti_ctxt, child);
+ return rc;
}
static int mdt_reint_internal(struct mdt_thread_info *info, __u32 op)
result = mdt_unpack_req_pack_rep(info, flags);
}
-
if (result == 0 && flags & MUTABOR &&
req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
result = -EROFS;
mdt_thread_info_init(req, info);
- result = mdt_handle0(req, info, mdt_handlers);
+ result = mdt_handle0(req, info, supported);
mdt_thread_info_fini(info);
RETURN(result);
.mos_opc_start = MDS_READPAGE,
.mos_opc_end = MDS_READPAGE + 1,
.mos_hs = mdt_mds_readpage_ops
+ },
+ {
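+ /* terminating entry so the handler slice array can be scanned to
+  * its end */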
+ .mos_hs = NULL
}
};
return 0;
}
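+/*
+ * ->do_readpage() method of the OSD: validate that the requested offset and
+ * byte count are block-aligned and report the directory size; filling the
+ * pages with ext3 directory data is still a stub (see the XXX below).
+ */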
+static int osd_readpage(const struct lu_context *ctxt,
+ struct dt_object *dt, struct lu_rdpg *rdpg)
+{
+ struct osd_object *obj = osd_dt_obj(dt);
+
+ LASSERT(lu_object_exists(ctxt, &dt->do_lu));
+ LASSERT(osd_invariant(obj));
+
+ LASSERT(rdpg->rp_pages != NULL);
+
+ /* check input params */
+ if ((rdpg->rp_offset & (obj->oo_inode->i_blksize - 1)) != 0) {
+ CERROR("offset "LPU64" not on a block boundary of %lu\n",
+ rdpg->rp_offset, obj->oo_inode->i_blksize);
+ return -EFAULT;
+ }
+
+ if (rdpg->rp_count & (obj->oo_inode->i_blksize - 1)) {
+ CERROR("size %u is not multiple of blocksize %lu\n",
+ rdpg->rp_count, obj->oo_inode->i_blksize);
+ return -EFAULT;
+ }
+
+ /* prepare output */
+ rdpg->rp_size = obj->oo_inode->i_size;
+
+ /*
+ * XXX: here should be filling pages by dir data in ext3 format and
+ * ->rp_pages already should be allocated.
+ */
+ return 0;
+}
+
static struct dt_object_operations osd_obj_ops = {
.do_lock = osd_object_lock,
.do_unlock = osd_object_unlock,
.do_ref_add = osd_object_ref_add,
.do_ref_del = osd_object_ref_del,
.do_xattr_get = osd_xattr_get,
- .do_xattr_set = osd_xattr_set
+ .do_xattr_set = osd_xattr_set,
+ .do_readpage = osd_readpage
};
static struct dt_body_operations osd_body_ops = {