RETURN(m);
}
+/* Drop a reference on a cmm_object obtained via cmm_object_find():
+ * releases the underlying lu_object reference. */
+static inline void cmm_object_put(const struct lu_context *ctxt,
+ struct cmm_object *o)
+{
+ lu_object_put(ctxt, &o->cmo_obj.mo_lu);
+}
+
static int cmm_creat_remote_obj(const struct lu_context *ctx,
struct cmm_device *cmm,
struct lu_fid *fid, struct md_attr *ma)
spec, ma);
OBD_FREE_PTR(spec);
- RETURN(0);
+ cmm_object_put(ctx, obj);
+ RETURN(rc);
}
static int cmm_create_slave_objects(const struct lu_context *ctx,
lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
+ /* This lmv will be free after finish splitting. */
OBD_ALLOC(lmv, lmv_size);
if (!lmv)
RETURN(-ENOMEM);
- lmv->mea_master = -1;
+ lmv->mea_master = -1;
lmv->mea_magic = MEA_MAGIC_ALL_CHARS;
lmv->mea_count = cmm->cmm_tgt_count + 1;
lmv->mea_ids[0] = *lf;
-
+
rc = cmm_alloc_fid(ctx, cmm, &lmv->mea_ids[1], cmm->cmm_tgt_count);
if (rc)
GOTO(cleanup, rc);
GOTO(cleanup, rc);
}
- rc = mo_xattr_set(ctx, md_object_next(mo), lmv, lmv_size,
+ rc = mo_xattr_set(ctx, md_object_next(mo), lmv, lmv_size,
MDS_LMV_MD_NAME, 0);
+
+ ma->ma_lmv_size = lmv_size;
+ ma->ma_lmv = lmv;
cleanup:
- OBD_FREE(lmv, lmv_size);
RETURN(rc);
}
+/*
+ * Send the directory pages in @rdpg to the split-target object named
+ * by @fid: find the object on this cmm device, mo_sendpage() each page
+ * to it, and drop the object reference on every exit path.
+ * Returns 0 on success or a negative errno from lookup/sendpage.
+ */
static int cmm_send_split_pages(const struct lu_context *ctx,
- struct md_object *mo, struct lu_rdpg *rdpg)
+ struct md_object *mo, struct lu_rdpg *rdpg,
+ struct lu_fid *fid)
{
- RETURN(0);
+ struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
+ struct cmm_object *obj;
+ int rc = 0, i;
+ ENTRY;
+
+ obj = cmm_object_find(ctx, cmm, fid);
+ if (IS_ERR(obj))
+ RETURN(PTR_ERR(obj));
+
+ /* stop at the first page that fails to transfer */
+ for (i = 0; i < rdpg->rp_npages; i++) {
+ rc = mo_sendpage(ctx, md_object_next(&obj->cmo_obj),
+ rdpg->rp_pages[i]);
+ if (rc)
+ GOTO(cleanup, rc);
+ }
+cleanup:
+ cmm_object_put(ctx, obj);
+ RETURN(rc);
}
static int cmm_split_entries(const struct lu_context *ctx, struct md_object *mo,
- struct lu_rdpg *rdpg)
+ struct lu_rdpg *rdpg, struct lu_fid *lf)
{
struct lu_dirpage *dp;
__u32 hash_end;
- int rc;
+ int rc, i;
ENTRY;
+ /* init page with '0' */
+ for (i = 0; i < rdpg->rp_npages; i++) {
+ memset(kmap(rdpg->rp_pages[i]), 0, CFS_PAGE_SIZE);
+ kunmap(rdpg->rp_pages[i]);
+ }
+
+ /* Read splitted page and send them to the slave master */
do {
rc = mo_readpage(ctx, md_object_next(mo), rdpg);
if (rc)
RETURN(rc);
- rc = cmm_send_split_pages(ctx, mo, rdpg);
+ rc = cmm_send_split_pages(ctx, mo, rdpg, lf);
if (rc)
RETURN(rc);
}
+/*
+ * Remove from directory @mo every entry found in the pages of @rdpg
+ * (the entries that were just migrated to a split target).  Each page
+ * is kmapped only while it is walked; returns the first error from
+ * mdo_name_remove() or 0.
+ */
static int cmm_remove_entries(const struct lu_context *ctx,
- struct md_object *mo, __u32 start, __u32 end)
+ struct md_object *mo, struct lu_rdpg *rdpg)
{
- RETURN(0);
+ struct lu_dirpage *dp;
+ struct lu_dirent *ent;
+ int rc = 0, i;
+ ENTRY;
+
+ for (i = 0; i < rdpg->rp_npages; i++) {
+ kmap(rdpg->rp_pages[i]);
+ dp = page_address(rdpg->rp_pages[i]);
+ for (ent = lu_dirent_start(dp); ent != NULL;
+ ent = lu_dirent_next(ent)) {
+ rc = mdo_name_remove(ctx, md_object_next(mo),
+ ent->lde_name);
+ if (rc) {
+ /* unmap before the early return */
+ kunmap(rdpg->rp_pages[i]);
+ RETURN(rc);
+ }
+ }
+ kunmap(rdpg->rp_pages[i]);
+ }
+ RETURN(rc);
}
+
#define MAX_HASH_SIZE 0x3fffffff
#define SPLIT_PAGE_COUNT 1
-
static int cmm_scan_and_split(const struct lu_context *ctx,
struct md_object *mo, struct md_attr *ma)
{
if (rdpg->rp_pages[i] == NULL)
GOTO(cleanup, rc = -ENOMEM);
}
-
+
hash_segement = MAX_HASH_SIZE / cmm->cmm_tgt_count;
for (i = 1; i < cmm->cmm_tgt_count; i++) {
+ struct lu_fid *lf = &ma->ma_lmv->mea_ids[i];
+
rdpg->rp_hash = i * hash_segement;
rdpg->rp_hash_end = rdpg->rp_hash + hash_segement;
- rc = cmm_split_entries(ctx, mo, rdpg);
+ rc = cmm_split_entries(ctx, mo, rdpg, lf);
if (rc)
GOTO(cleanup, rc);
- rc = cmm_remove_entries(ctx, mo, rdpg->rp_hash,
- rdpg->rp_hash_end);
+ rc = cmm_remove_entries(ctx, mo, rdpg);
if (rc)
GOTO(cleanup, rc);
}
/* step3: scan and split the object */
rc = cmm_scan_and_split(ctx, mo, ma);
+
cleanup:
+ if (ma->ma_lmv_size && ma->ma_lmv)
+ OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
+
OBD_FREE_PTR(ma);
RETURN(rc);
RETURN(rc);
}
+#ifdef HAVE_SPLIT_SUPPORT
+/*
+ * moo_sendpage method: forward @page for this object to the remote MDS
+ * through the client export via md_sendpage().
+ */
+static int mdc_sendpage(const struct lu_context *ctx, struct md_object *mo,
+ const struct page *page)
+{
+ struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
+ int rc;
+ ENTRY;
+
+ rc = md_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
+ page);
+
+ RETURN(rc);
+}
+#endif
+
static struct md_object_operations mdc_mo_ops = {
 .moo_object_create = mdc_object_create,
 .moo_ref_add = mdc_ref_add,
 .moo_ref_del = mdc_ref_del,
+#ifdef HAVE_SPLIT_SUPPORT
+ /* directory split: push split pages to the remote MDS */
+ .moo_sendpage = mdc_sendpage,
+#endif
};
/* md_dir_operations */
struct lu_dirent ldp_entries[0];
};
+/* First directory entry of an on-disk directory page. */
+static inline struct lu_dirent *lu_dirent_start(struct lu_dirpage *dp)
+{
+ return dp->ldp_entries;
+}
+
+/* Entry following @ent; lde_reclen == 0 marks the end of the page
+ * (lde_reclen is stored little-endian on disk). */
+static inline struct lu_dirent *lu_dirent_next(struct lu_dirent *ent)
+{
+ struct lu_dirent *next;
+
+ if (ent->lde_reclen != 0)
+ next = ((void *)ent) + le16_to_cpu(ent->lde_reclen);
+ else
+ next = NULL;
+ return next;
+}
+
#define MEA_MAGIC_LAST_CHAR 0xb2221ca1
#define MEA_MAGIC_ALL_CHARS 0xb222a11c
MDS_QUOTACTL = 48,
MDS_GETXATTR = 49,
MDS_SETXATTR = 50,
+ MDS_WRITEPAGE = 51,
MDS_LAST_OPC
} mds_cmd_t;
extern const struct req_format RQF_MDS_DISCONNECT;
extern const struct req_format RQF_MDS_READPAGE;
extern const struct req_format RQF_MDS_DONE_WRITING;
+#ifdef HAVE_SPLIT_SUPPORT
+extern const struct req_format RQF_MDS_WRITEPAGE;
+#endif
/*
* This is format of direct (non-intent) MDS_GETATTR_NAME request.
struct lu_attr ma_attr;
struct lov_mds_md *ma_lmm;
int ma_lmm_size;
- struct lmv_mds_md *ma_lmv;
+ struct lmv_stripe_md *ma_lmv;
int ma_lmv_size;
struct llog_cookie *ma_cookie;
int ma_cookie_size;
int (*moo_readpage)(const struct lu_context *, struct md_object *,
const struct lu_rdpg *);
+#ifdef HAVE_SPLIT_SUPPORT
+ int (*moo_writepage)(const struct lu_context *, struct md_object *,
+ const struct page *page);
+ int (*moo_sendpage)(const struct lu_context *, struct md_object *,
+ const struct page*);
+#endif
int (*moo_readlink)(const struct lu_context *ctxt,
struct md_object *obj,
void *buf, int buf_len);
return m->mo_ops->moo_readpage(cx, m, rdpg);
}
+#ifdef HAVE_SPLIT_SUPPORT
+/* Dispatch one directory page to the object's moo_writepage method. */
+static inline int mo_writepage(const struct lu_context *cx, struct md_object *m,
+ const struct page *pg)
+{
+ LASSERT(m->mo_ops->moo_writepage);
+ return m->mo_ops->moo_writepage(cx, m, pg);
+}
+
+/* Dispatch one directory page to the object's moo_sendpage method. */
+static inline int mo_sendpage(const struct lu_context *cx, struct md_object *m,
+ const struct page *pg)
+{
+ LASSERT(m->mo_ops->moo_sendpage);
+ return m->mo_ops->moo_sendpage(cx, m, pg);
+}
+#endif
+
static inline int mo_object_create(const struct lu_context *cx,
struct md_object *m,
const struct md_create_spec *spc,
struct ptlrpc_request **);
int (*m_readpage)(struct obd_export *, const struct lu_fid *,
__u64, struct page *, struct ptlrpc_request **);
+#ifdef HAVE_SPLIT_SUPPORT
+ int (*m_sendpage)(struct obd_export *, const struct lu_fid *,
+ const struct page *);
+#endif
int (*m_unlink)(struct obd_export *, struct md_op_data *,
struct ptlrpc_request **);
RETURN(rc);
}
+#ifdef HAVE_SPLIT_SUPPORT
+/*
+ * Send one page for the object named by @fid through the export's
+ * m_sendpage method, after checking the method exists and bumping the
+ * per-obd operation counter.
+ */
+static inline int md_sendpage(struct obd_export *exp,
+ const struct lu_fid *fid,
+ const struct page *page)
+{
+ int rc;
+ ENTRY;
+ EXP_CHECK_MD_OP(exp, sendpage);
+ MD_COUNTER_INCREMENT(exp->exp_obd, sendpage);
+ rc = MDP(exp->exp_obd, sendpage)(exp, fid, page);
+ RETURN(rc);
+}
+#endif
+
static inline int md_unlink(struct obd_export *exp, struct md_op_data *op_data,
struct ptlrpc_request **request)
{
#define OBD_FAIL_MDS_SETXATTR_NET 0x132
#define OBD_FAIL_MDS_SETXATTR 0x133
#define OBD_FAIL_MDS_SETXATTR_WRITE 0x134
+#define OBD_FAIL_MDS_WRITEPAGE_NET 0x135
+#define OBD_FAIL_MDS_WRITEPAGE_PACK 0x136
#define OBD_FAIL_OST 0x200
#define OBD_FAIL_OST_CONNECT_NET 0x201
return default_llseek(filp, off, whence);
}
-static struct lu_dirent *lu_dirent_start(struct lu_dirpage *dp)
-{
- return dp->ldp_entries;
-}
-
-static struct lu_dirent *lu_dirent_next(struct lu_dirent *ent)
-{
- struct lu_dirent *next;
-
- if (ent->lde_reclen != 0)
- next = ((void *)ent) + le16_to_cpu(ent->lde_reclen);
- else
- next = NULL;
- return next;
-}
-
int ll_readdir(struct file *filp, void *cookie, filldir_t filldir)
{
struct inode *inode = filp->f_dentry->d_inode;
RETURN(rc);
}
+#ifdef HAVE_SPLIT_SUPPORT
+/*
+ * Client side of MDS_WRITEPAGE: ship one directory page for @fid to
+ * the MDS over a bulk GET_SOURCE transfer on the readpage portal.
+ * Returns 0 on success or a negative errno.
+ */
+int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid,
+ const struct page *page)
+{
+ struct obd_import *imp = class_exp2cliimp(exp);
+ struct ptlrpc_request *req = NULL;
+ struct ptlrpc_bulk_desc *desc = NULL;
+ struct mdt_body *body;
+ int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+ ENTRY;
+
+ CDEBUG(D_INODE, "object: "DFID"\n", PFID(fid));
+
+ req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_WRITEPAGE, 2, size,
+ NULL);
+ if (req == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ req->rq_request_portal = MDS_READPAGE_PORTAL;
+
+ desc = ptlrpc_prep_bulk_imp(req, 1, BULK_GET_SOURCE, MDS_BULK_PORTAL);
+ if (desc == NULL)
+ GOTO(out, rc = -ENOMEM);
+ /* NB req now owns desc and will free it when it gets freed */
+ ptlrpc_prep_bulk_page(desc, (struct page*)page, 0, PAGE_CACHE_SIZE);
+
+ mdc_readdir_pack(req, REQ_REC_OFF, 0, PAGE_CACHE_SIZE, fid);
+
+ ptlrpc_req_set_repsize(req, 2, size);
+ rc = ptlrpc_queue_wait(req);
+out:
+ /* NOTE(review): req may be NULL here when ptlrpc_prep_req() failed -
+ * relies on ptlrpc_req_finished() accepting NULL; confirm. */
+ ptlrpc_req_finished(req);
+ RETURN(rc);
+}
+#endif
+
int mdc_readpage(struct obd_export *exp, const struct lu_fid *fid,
__u64 offset, struct page *page,
struct ptlrpc_request **request)
.m_getxattr = mdc_getxattr,
.m_sync = mdc_sync,
.m_readpage = mdc_readpage,
+#ifdef HAVE_SPLIT_SUPPORT
+ .m_sendpage = mdc_sendpage,
+#endif
.m_unlink = mdc_unlink,
.m_cancel_unused = mdc_cancel_unused,
.m_init_ea_size = mdc_init_ea_size,
return rc;
}
+#ifdef HAVE_SPLIT_SUPPORT
+/*
+ * Retrieve dir entry from the page and insert it to the
+ * slave object, actually, this should be in osd layer,
+ * but since it will not in the final product, so just do
+ * it here and do not define more moo api anymore for
+ * this.
+ */
+static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page)
+{
+ struct mdt_object *object = info->mti_object;
+ struct lu_dirpage *dp;
+ struct lu_dirent *ent;
+ int rc = 0;
+ ENTRY;
+
+ /* Walk every entry in the received page and insert it into this
+ * (split target) directory.  The page stays mapped for the whole
+ * walk; a single kunmap() below covers both success and failure. */
+ kmap(page);
+ dp = page_address(page);
+ for (ent = lu_dirent_start(dp); ent != NULL;
+ ent = lu_dirent_next(ent)) {
+ struct lu_fid *lf = &ent->lde_fid;
+
+ /* FIXME: check isdir */
+ rc = mdo_name_insert(info->mti_ctxt,
+ md_object_next(&object->mot_obj),
+ ent->lde_name, lf, 0);
+ /*FIXME: add cross_flags*/
+ if (rc)
+ break;
+ }
+ kunmap(page);
+
+ RETURN(rc);
+}
+
+/* l_wait_event() timeout callback for the bulk GET in mdt_writepage();
+ * returning 1 lets the waiter time out without killing the export. */
+static int mdt_bulk_timeout(void *data)
+{
+ ENTRY;
+ /* We don't fail the connection here, because having the export
+ * killed makes the (vital) call to commitrw very sad.
+ */
+ RETURN(1);
+}
+
+/*
+ * MDS_WRITEPAGE handler: pull one directory page from the client over
+ * a bulk GET and replay its entries into the local directory via
+ * mdt_write_dir_page().  Returns 0 or a negative errno.
+ */
+static int mdt_writepage(struct mdt_thread_info *info)
+{
+ struct ptlrpc_request *req = mdt_info_req(info);
+ struct l_wait_info *lwi;
+ struct ptlrpc_bulk_desc *desc;
+ struct page *page;
+ int rc;
+ ENTRY;
+
+ desc = ptlrpc_prep_bulk_exp(req, 1, BULK_GET_SINK, MDS_BULK_PORTAL);
+ /* BUGFIX: the check was inverted ("if (desc)"), returning -ENOMEM
+ * on success and dereferencing a NULL desc on failure. */
+ if (desc == NULL)
+ RETURN(-ENOMEM);
+
+ /* allocate the page for the desc */
+ page = alloc_pages(GFP_KERNEL, 0);
+ if (!page)
+ GOTO(desc_cleanup, rc = -ENOMEM);
+
+ ptlrpc_prep_bulk_page(desc, page, 0, CFS_PAGE_SIZE);
+
+ /* FIXME: following parts are copied from ost_brw_write */
+
+ /* Check if client was evicted while we were doing i/o before touching
+ network */
+ OBD_ALLOC_PTR(lwi);
+ if (!lwi)
+ GOTO(cleanup_page, rc = -ENOMEM);
+
+ if (desc->bd_export->exp_failed)
+ rc = -ENOTCONN;
+ else
+ rc = ptlrpc_start_bulk_transfer(desc);
+ if (rc == 0) {
+ *lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * HZ / 4, HZ,
+ mdt_bulk_timeout, desc);
+ rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) ||
+ desc->bd_export->exp_failed, lwi);
+ LASSERT(rc == 0 || rc == -ETIMEDOUT);
+ if (rc == -ETIMEDOUT) {
+ DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
+ ptlrpc_abort_bulk(desc);
+ } else if (desc->bd_export->exp_failed) {
+ DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
+ rc = -ENOTCONN;
+ ptlrpc_abort_bulk(desc);
+ } else if (!desc->bd_success ||
+ desc->bd_nob_transferred != desc->bd_nob) {
+ DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
+ desc->bd_success ?
+ "truncated" : "network error on",
+ desc->bd_nob_transferred, desc->bd_nob);
+ /* XXX should this be a different errno? */
+ rc = -ETIMEDOUT;
+ }
+ } else {
+ DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
+ }
+ if (rc)
+ GOTO(cleanup_lwi, rc);
+
+ /* bulk transfer succeeded: insert the received entries locally */
+ rc = mdt_write_dir_page(info, page);
+
+cleanup_lwi:
+ OBD_FREE_PTR(lwi);
+cleanup_page:
+ __free_pages(page, 0);
+desc_cleanup:
+ ptlrpc_free_bulk(desc);
+ RETURN(rc);
+}
+
static int mdt_readpage(struct mdt_thread_info *info)
{
struct mdt_object *object = info->mti_object;
static struct mdt_handler mdt_readpage_ops[] = {
DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, READPAGE, mdt_readpage),
+#ifdef HAVE_SPLIT_SUPPORT
+ DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, WRITEPAGE, mdt_writepage),
+#endif
/*
* XXX: this is ugly and should be fixed one day, see mdc_close() for
mdt_body_only, mdt_body_only);
EXPORT_SYMBOL(RQF_MDS_READPAGE);
+#ifdef HAVE_SPLIT_SUPPORT
+/* MDS_WRITEPAGE: mdt_body-only request and reply; the directory page
+ * itself travels via bulk, not in the request buffers. */
+const struct req_format RQF_MDS_WRITEPAGE =
+ DEFINE_REQ_FMT0("MDS_WRITEPAGE",
+ mdt_body_only, mdt_body_only);
+EXPORT_SYMBOL(RQF_MDS_WRITEPAGE);
+#endif
+
#if !defined(__REQ_LAYOUT_USER__)
int req_layout_init(void)