return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &cmm_lu_ops);
}
-static int cmm_root_get(const struct lu_context *ctx, struct md_device *md,
- struct lu_fid *fid)
+int cmm_root_get(const struct lu_context *ctx, struct md_device *md,
+ struct lu_fid *fid)
{
struct cmm_device *cmm_dev = md2cmm_dev(md);
/* valid only on master MDS */
const struct lu_object_header *hdr,
struct lu_device *);
+int cmm_root_get(const struct lu_context *ctx, struct md_device *md,
+ struct lu_fid *fid);
+
#ifdef HAVE_SPLIT_SUPPORT
/* cmm_split.c */
int cml_try_to_split(const struct lu_context *ctx, struct md_object *mo);
#endif
+
#endif /* __KERNEL__ */
#endif /* _CMM_INTERNAL_H */
#include "cmm_internal.h"
#include "mdc_internal.h"
-struct cmm_thread_info {
- struct md_attr cti_ma;
-};
-
-struct lu_context_key cmm_thread_key;
-struct cmm_thread_info *cmm_ctx_info(const struct lu_context *ctx)
-{
- struct cmm_thread_info *info;
-
- info = lu_context_key_get(ctx, &cmm_thread_key);
- LASSERT(info != NULL);
- return info;
-}
-
#define CMM_NO_SPLIT_EXPECTED 0
#define CMM_EXPECT_SPLIT 1
#define CMM_NO_SPLITTABLE 2
-#define SPLIT_SIZE 64*1024
+#define SPLIT_SIZE (8 * 1024)
+
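+/* Return the fid stored in the lu_object header of a cmm object. */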
+static inline struct lu_fid* cmm2_fid(struct cmm_object *obj)
+{
+ return &(obj->cmo_obj.mo_lu.lo_header->loh_fid);
+}
static int cmm_expect_splitting(const struct lu_context *ctx,
struct md_object *mo, struct md_attr *ma)
{
struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
+ struct lu_fid *fid = NULL;
+ int rc = CMM_EXPECT_SPLIT;
ENTRY;
- if (cmm->cmm_tgt_count == 1)
- RETURN(CMM_NO_SPLIT_EXPECTED);
+ if (cmm->cmm_tgt_count == 0)
+ GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
if (ma->ma_attr.la_size < SPLIT_SIZE)
- RETURN(CMM_NO_SPLIT_EXPECTED);
+ GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
if (ma->ma_lmv_size)
- RETURN(CMM_NO_SPLIT_EXPECTED);
-
- RETURN(CMM_EXPECT_SPLIT);
-}
+ GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
+
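+ /* The root directory is never split; compare this object's fid
+ * against the root fid and bail out if they match. */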
+ OBD_ALLOC_PTR(fid);
+ if (fid == NULL)
+ GOTO(cleanup, rc = -ENOMEM);
+
+ rc = cmm_root_get(ctx, &cmm->cmm_md_dev, fid);
+ if (rc)
+ GOTO(cleanup, rc);
+
+ if (lu_fid_eq(fid, cmm2_fid(md2cmm_obj(mo))))
+ GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
-static inline struct lu_fid* cmm2_fid(struct cmm_object *obj)
-{
- return &(obj->cmo_obj.mo_lu.lo_header->loh_fid);
+cleanup:
+ if (fid)
+ OBD_FREE_PTR(fid);
+ RETURN(rc);
}
#define cmm_md_size(stripes) \
spin_lock(&cmm->cmm_tgt_guard);
list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets,
mc_linkage) {
- if (cmm->cmm_local_num == mc->mc_num)
- continue;
-
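+ /* The local MDS is no longer kept on the cmm_targets list, so
+ * it must never show up while walking the remote targets. */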
+ LASSERT(cmm->cmm_local_num != mc->mc_num);
+
rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fid[i++], NULL);
if (rc < 0) {
spin_unlock(&cmm->cmm_tgt_guard);
}
}
spin_unlock(&cmm->cmm_tgt_guard);
- LASSERT(i + 1 == count);
+ LASSERT(i == count);
if (rc == 1)
rc = 0;
RETURN(rc);
struct lu_fid *lf = cmm2_fid(md2cmm_obj(mo));
ENTRY;
- lmv_size = cmm_md_size(cmm->cmm_tgt_count);
+ lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
/* This lmv will be freed once splitting finishes. */
OBD_ALLOC(lmv, lmv_size);
static int cmm_send_split_pages(const struct lu_context *ctx,
struct md_object *mo, struct lu_rdpg *rdpg,
- struct lu_fid *fid)
+ struct lu_fid *fid, __u32 hash_end)
{
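+ /* hash_end marks the first hash belonging to the next segment;
+ * mdc_send_page() stops the transfer once an entry reaches it. */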
struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
struct cmm_object *obj;
for (i = 0; i < rdpg->rp_npages; i++) {
rc = mdc_send_page(ctx, md_object_next(&obj->cmo_obj),
- rdpg->rp_pages[i]);
+ rdpg->rp_pages[i], hash_end);
if (rc)
- GOTO(cleanup, rc);
+ break;
}
-cleanup:
cmm_object_put(ctx, obj);
RETURN(rc);
}
static int cmm_split_entries(const struct lu_context *ctx, struct md_object *mo,
- struct lu_rdpg *rdpg, struct lu_fid *lf)
+ struct lu_rdpg *rdpg, struct lu_fid *lf,
+ __u32 end)
{
- struct lu_dirpage *dp;
- __u32 hash_end;
int rc, i;
ENTRY;
- /* init page with '0' */
- for (i = 0; i < rdpg->rp_npages; i++) {
- memset(kmap(rdpg->rp_pages[i]), 0, CFS_PAGE_SIZE);
- kunmap(rdpg->rp_pages[i]);
- }
-
/* Read the split pages and send them to the slave MDS */
do {
+ /* init page with '0' */
+ for (i = 0; i < rdpg->rp_npages; i++) {
+ memset(kmap(rdpg->rp_pages[i]), 0, CFS_PAGE_SIZE);
+ kunmap(rdpg->rp_pages[i]);
+ }
+
rc = mo_readpage(ctx, md_object_next(mo), rdpg);
+
+ /* -E2BIG means readpage already reached the end of the directory */
+ if (rc == -E2BIG)
+ RETURN(0);
if (rc)
RETURN(rc);
- rc = cmm_send_split_pages(ctx, mo, rdpg, lf);
- if (rc)
- RETURN(rc);
+ rc = cmm_send_split_pages(ctx, mo, rdpg, lf, end);
- dp = kmap(rdpg->rp_pages[0]);
- hash_end = dp->ldp_hash_end;
- kunmap(rdpg->rp_pages[0]);
- if (hash_end == ~0ul)
- break;
- } while (hash_end < rdpg->rp_hash_end);
-
+ } while (rc == 0);
+
+ /* -E2BIG from the send path means this segment is fully split */
+ if (rc == -E2BIG)
+ rc = 0;
RETURN(rc);
}
+#if 0
static int cmm_remove_entries(const struct lu_context *ctx,
struct md_object *mo, struct lu_rdpg *rdpg)
{
}
RETURN(rc);
}
-
+#endif
#define MAX_HASH_SIZE 0x3fffffff
#define SPLIT_PAGE_COUNT 1
static int cmm_scan_and_split(const struct lu_context *ctx,
hash_segement = MAX_HASH_SIZE / cmm->cmm_tgt_count;
for (i = 1; i < cmm->cmm_tgt_count; i++) {
struct lu_fid *lf = &ma->ma_lmv->mea_ids[i];
+ __u32 hash_end;
rdpg->rp_hash = i * hash_segement;
- rdpg->rp_hash_end = rdpg->rp_hash + hash_segement;
- rc = cmm_remove_entries(ctx, mo, rdpg);
- if (rc)
- GOTO(cleanup, rc);
-
- rc = cmm_split_entries(ctx, mo, rdpg, lf);
+ hash_end = rdpg->rp_hash + hash_segement;
+
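+ /* Send all entries with hashes in [i * hash_segement,
+ * (i + 1) * hash_segement) to the i-th remote target. */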
+ rc = cmm_split_entries(ctx, mo, rdpg, lf, hash_end);
if (rc)
GOTO(cleanup, rc);
}
struct lu_device *);
#ifdef HAVE_SPLIT_SUPPORT
int mdc_send_page(const struct lu_context *ctx, struct md_object *mo,
- struct page *page);
+ struct page *page, __u32 end);
#endif
#endif /* __KERNEL__ */
#ifdef HAVE_SPLIT_SUPPORT
int mdc_send_page(const struct lu_context *ctx, struct md_object *mo,
- struct page *page)
+ struct page *page, __u32 end)
{
struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
struct lu_dirpage *dp;
struct lu_dirent *ent;
- int rc;
+ int rc = 0, offset = 0, rc1 = 0;
ENTRY;
kmap(page);
dp = page_address(page);
for (ent = lu_dirent_start(dp); ent != NULL;
ent = lu_dirent_next(ent)) {
+ /* Stop at the first entry whose hash falls past this
+ * segment; send only the bytes preceding it and return
+ * the -E2BIG sentinel so the caller knows the segment
+ * is complete. */
+ if (ent->lde_hash >= end) {
+ offset = (int)((char *)ent - (char *)dp);
+ rc1 = -E2BIG;
+ kunmap(page);
+ goto send_page;
+ }
+
/* allocate new fid for each obj */
rc = obd_fid_alloc(mc->mc_desc.cl_exp, &ent->lde_fid, NULL);
if (rc) {
}
}
kunmap(page);
- rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
- page);
+ offset = CFS_PAGE_SIZE;
+send_page:
+ if (offset > 0) {
+ rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
+ page, offset);
+ CDEBUG(D_INFO, "send page %p offset %d fid "DFID" rc %d\n",
+ page, offset, PFID(lu_object_fid(&mo->mo_lu)), rc);
+ }
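+ /* Propagate the -E2BIG sentinel so cmm_split_entries() knows
+ * this segment is complete. */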
+ if (rc == 0)
+ rc = rc1;
RETURN(rc);
}
#endif
struct lu_rdpg {
/* input params, should be filled out by mdt */
__u32 rp_hash; /* hash */
- __u32 rp_hash_end; /* hash end, means reading the
- entry until this hash*/
int rp_count; /* count in bytes */
int rp_npages; /* number of pages */
struct page **rp_pages; /* pointers to pages */
int it_open_error(int phase, struct lookup_intent *it);
#ifdef HAVE_SPLIT_SUPPORT
int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid,
- const struct page *page);
+ const struct page *page, int offset);
#endif
#endif
extern const struct req_format RQF_MDS_CONNECT;
extern const struct req_format RQF_MDS_DISCONNECT;
extern const struct req_format RQF_MDS_READPAGE;
+extern const struct req_format RQF_MDS_WRITEPAGE;
extern const struct req_format RQF_MDS_DONE_WRITING;
#ifdef HAVE_SPLIT_SUPPORT
-extern const struct req_format RQF_MDS_WRITEPAGE;
#ifdef HAVE_SPLIT_SUPPORT
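+/* Push one directory page to a remote MDS. Only the first @offset bytes
+ * of @page are valid; they are transferred as a single bulk page. */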
int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid,
- const struct page *page)
+ const struct page *page, int offset)
{
struct obd_import *imp = class_exp2cliimp(exp);
struct ptlrpc_request *req = NULL;
if (desc == NULL)
GOTO(out, rc = -ENOMEM);
/* NB req now owns desc and will free it when it gets freed */
- ptlrpc_prep_bulk_page(desc, (struct page*)page, 0, PAGE_CACHE_SIZE);
+ ptlrpc_prep_bulk_page(desc, (struct page*)page, 0, offset);
- mdc_readdir_pack(req, REQ_REC_OFF, 0, PAGE_CACHE_SIZE, fid);
+ mdc_readdir_pack(req, REQ_REC_OFF, 0, offset, fid);
ptlrpc_req_set_repsize(req, 2, size);
rc = ptlrpc_queue_wait(req);
}
}
kunmap(page);
-
RETURN(rc);
}
* reqbody->nlink contains the number of bytes to read.
*/
rdpg->rp_hash = reqbody->size;
- rdpg->rp_hash_end = ~0ul;
if ((__u64)rdpg->rp_hash != reqbody->size) {
CERROR("Invalid hash: %#llx != %#llx\n",
(__u64)rdpg->rp_hash, reqbody->size);
static int osd_dir_page_build(const struct lu_context *ctx, int first,
void *area, int nob,
struct dt_it_ops *iops, struct dt_it *it,
- __u32 *start, __u32 *end, __u32 hash_end,
- struct lu_dirent **last)
+ __u32 *start, __u32 *end, struct lu_dirent **last)
{
int result;
struct osd_thread_info *info = lu_context_key_get(ctx, &osd_key);
recsize = (sizeof *ent + len + 3) & ~3;
hash = iops->store(ctx, it);
- if (hash > hash_end) {
- *end = hash_end;
- if (first && ent == area)
- *start = hash_end;
- break;
- }
*end = hash;
CDEBUG(D_INODE, "%p %p %d "DFID": %#8.8x (%d)\"%*.*s\"\n",
area, ent, nob, PFID(fid), hash, len, len, len, name);
rc = osd_dir_page_build(ctxt, !i, kmap(pg),
min_t(int, nob, CFS_PAGE_SIZE),
iops, it,
- &hash_start, &hash_end,
- rdpg->rp_hash_end, &last);
+ &hash_start, &hash_end, &last);
if (rc != 0 || i == rdpg->rp_npages - 1)
last->lde_reclen = 0;
kunmap(pg);
&RQF_MDS_PIN,
&RQF_MDS_READPAGE,
&RQF_MDS_DONE_WRITING,
+ &RQF_MDS_WRITEPAGE,
};
struct req_msg_field {
mdt_body_only, mdt_body_only);
EXPORT_SYMBOL(RQF_MDS_READPAGE);
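+/* MDS_WRITEPAGE carries only an mdt_body in both request and reply;
+ * the directory page itself is sent as a bulk transfer. */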
+const struct req_format RQF_MDS_WRITEPAGE =
+ DEFINE_REQ_FMT0("MDS_WRITEPAGE",
+ mdt_body_only, mdt_body_only);
+EXPORT_SYMBOL(RQF_MDS_WRITEPAGE);
+
#if !defined(__REQ_LAYOUT_USER__)
int req_layout_init(void)