From 0c267d3ab93ded281a2d8acaec32fe931bae8188 Mon Sep 17 00:00:00 2001 From: wangdi Date: Wed, 13 Sep 2006 15:39:57 +0000 Subject: [PATCH] Branch: b_new_cmd a lot fixes about splitting dir --- lustre/cmm/cmm_split.c | 37 ++++++++++------- lustre/cmm/mdc_device.c | 1 + lustre/cmm/mdc_internal.h | 4 +- lustre/cmm/mdc_object.c | 12 ++++-- lustre/include/lustre/lustre_idl.h | 1 + lustre/include/lustre_req_layout.h | 1 + lustre/include/obd.h | 3 +- lustre/include/obd_class.h | 3 +- lustre/liblustre/namei.c | 5 ++- lustre/liblustre/super.c | 8 ++-- lustre/llite/llite_lib.c | 8 ++-- lustre/lmv/lmv_intent.c | 12 ++++-- lustre/lmv/lmv_obd.c | 83 ++++++++++++++++++++++++++++++++------ lustre/lmv/lmv_object.c | 6 ++- lustre/mdc/mdc_internal.h | 4 +- lustre/mdc/mdc_locks.c | 13 +++++- lustre/mdc/mdc_request.c | 27 +++++++++++-- lustre/mdd/mdd_handler.c | 18 ++++++--- lustre/mds/handler.c | 1 + lustre/mdt/mdt_handler.c | 26 +++++++++--- lustre/mdt/mdt_open.c | 8 +++- lustre/obdclass/mea.c | 17 ++++++++ lustre/ptlrpc/layout.c | 6 +++ 23 files changed, 238 insertions(+), 66 deletions(-) diff --git a/lustre/cmm/cmm_split.c b/lustre/cmm/cmm_split.c index af7f9c2..ce0bffc 100644 --- a/lustre/cmm/cmm_split.c +++ b/lustre/cmm/cmm_split.c @@ -36,6 +36,7 @@ #include #include #include +#include #include "cmm_internal.h" #include "mdc_internal.h" @@ -44,7 +45,7 @@ #define CMM_NO_SPLITTABLE 2 enum { - SPLIT_SIZE = 8*1024 + SPLIT_SIZE = 12*1024 }; static inline struct lu_fid* cmm2_fid(struct cmm_object *obj) @@ -74,6 +75,8 @@ static int cmm_expect_splitting(const struct lu_context *ctx, if (rc) GOTO(cleanup, rc); + rc = CMM_EXPECT_SPLIT; + if (lu_fid_eq(fid, cmm2_fid(md2cmm_obj(mo)))) GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED); @@ -84,7 +87,7 @@ cleanup: } #define cmm_md_size(stripes) \ - (sizeof(struct lmv_stripe_md) + stripes * sizeof(struct lu_fid)) + (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid)) static int cmm_alloc_fid(const struct lu_context *ctx, struct cmm_device *cmm, struct lu_fid *fid, int count) @@ -100,7 +103,7 @@ static int cmm_alloc_fid(const struct lu_context *ctx, struct cmm_device *cmm, mc_linkage) { LASSERT(cmm->cmm_local_num != mc->mc_num); - rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fid[i++], NULL); + rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fid[i], NULL); if (rc > 0) { struct lu_site *ls; @@ -113,6 +116,7 @@ static int cmm_alloc_fid(const struct lu_context *ctx, struct cmm_device *cmm, spin_unlock(&cmm->cmm_tgt_guard); RETURN(rc); } + i++; } spin_unlock(&cmm->cmm_tgt_guard); LASSERT(i == count); @@ -183,9 +187,9 @@ static int cmm_create_slave_objects(const struct lu_context *ctx, if (!lmv) RETURN(-ENOMEM); - lmv->mea_master = -1; - lmv->mea_magic = MEA_MAGIC_ALL_CHARS; - lmv->mea_count = cmm->cmm_tgt_count; + lmv->mea_master = cmm->cmm_local_num; + lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT; + lmv->mea_count = cmm->cmm_tgt_count + 1; lmv->mea_ids[0] = *lf; @@ -193,15 +197,12 @@ static int cmm_create_slave_objects(const struct lu_context *ctx, if (rc) GOTO(cleanup, rc); - for (i = 1; i < cmm->cmm_tgt_count; i ++) { + for (i = 1; i < cmm->cmm_tgt_count + 1; i ++) { rc = cmm_creat_remote_obj(ctx, cmm, &lmv->mea_ids[i], ma); if (rc) GOTO(cleanup, rc); } - rc = mo_xattr_set(ctx, md_object_next(mo), lmv, lmv_size, - MDS_LMV_MD_NAME, 0); - ma->ma_lmv_size = lmv_size; ma->ma_lmv = lmv; cleanup: @@ -290,7 +291,6 @@ static int cmm_remove_entries(const struct lu_context *ctx, RETURN(rc); } #endif -#define MAX_HASH_SIZE 0x3fffffff #define SPLIT_PAGE_COUNT 1 static int cmm_scan_and_split(const struct lu_context *ctx, struct md_object *mo, struct md_attr *ma) @@ -317,8 +317,8 @@ static int cmm_scan_and_split(const struct lu_context *ctx, GOTO(cleanup, rc = -ENOMEM); } - hash_segement = MAX_HASH_SIZE / cmm->cmm_tgt_count; - for (i = 1; i < cmm->cmm_tgt_count; i++) { + hash_segement = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1); + for (i = 1; i < cmm->cmm_tgt_count + 1; i++) { struct lu_fid *lf = &ma->ma_lmv->mea_ids[i]; __u32 hash_end; @@ -355,7 +355,7 @@ int cml_try_to_split(const struct lu_context *ctx, struct md_object *mo) if (ma == NULL) RETURN(-ENOMEM); - ma->ma_need = MA_INODE; + ma->ma_need = MA_INODE|MA_LMV; rc = mo_attr_get(ctx, mo, ma); if (rc) GOTO(cleanup, ma); @@ -372,7 +372,16 @@ int cml_try_to_split(const struct lu_context *ctx, struct md_object *mo) /* step3: scan and split the object */ rc = cmm_scan_and_split(ctx, mo, ma); + if (rc) + GOTO(cleanup, ma); + + /* step4: set mea to the master object */ + rc = mo_xattr_set(ctx, md_object_next(mo), ma->ma_lmv, ma->ma_lmv_size, + MDS_LMV_MD_NAME, 0); + if (rc == -ERESTART) + CWARN("Dir"DFID" has been split \n", + PFID(lu_object_fid(&mo->mo_lu))); cleanup: if (ma->ma_lmv_size && ma->ma_lmv) OBD_FREE(ma->ma_lmv, ma->ma_lmv_size); diff --git a/lustre/cmm/mdc_device.c b/lustre/cmm/mdc_device.c index a476cc2..1780677 100644 --- a/lustre/cmm/mdc_device.c +++ b/lustre/cmm/mdc_device.c @@ -35,6 +35,7 @@ #include #include #include +#include "cmm_internal.h" #include "mdc_internal.h" static struct lu_device_operations mdc_lu_ops; diff --git a/lustre/cmm/mdc_internal.h b/lustre/cmm/mdc_internal.h index f5a5d30..a86f358 100644 --- a/lustre/cmm/mdc_internal.h +++ b/lustre/cmm/mdc_internal.h @@ -96,8 +96,8 @@ struct lu_object *mdc_object_alloc(const struct lu_context *, const struct lu_object_header *, struct lu_device *); #ifdef HAVE_SPLIT_SUPPORT -int mdc_send_page(const struct lu_context *ctx, struct md_object *mo, - struct page *page, __u32 end); +int mdc_send_page(struct cmm_device *cmm, const struct lu_context *ctx, + struct md_object *mo, struct page *page, __u32 end); #endif #endif /* __KERNEL__ */ diff --git a/lustre/cmm/mdc_object.c b/lustre/cmm/mdc_object.c index 5f595be..10bf0e9 100644 --- a/lustre/cmm/mdc_object.c +++ b/lustre/cmm/mdc_object.c @@ -35,6 +35,7 @@ #include #include #include +#include "cmm_internal.h" #include "mdc_internal.h" static struct md_object_operations mdc_mo_ops; @@ -256,14 +257,19 @@ int mdc_send_page(struct cmm_device *cm, const struct lu_context *ctx, kmap(page); dp = page_address(page); + + ent = lu_dirent_start(dp); + if (ent->lde_hash > end) + RETURN(-E2BIG); + for (ent = lu_dirent_start(dp); ent != NULL; ent = lu_dirent_next(ent)) { - if (ent->lde_hash < end) { + if (ent->lde_hash > end) { offset = (int)((__u32)ent - (__u32)dp); rc1 = -E2BIG; goto send_page; } - + /* allocate new fid for each obj */ rc = obd_fid_alloc(mc->mc_desc.cl_exp, &ent->lde_fid, NULL); if (rc > 0) { @@ -274,7 +280,7 @@ int mdc_send_page(struct cmm_device *cm, const struct lu_context *ctx, fid_seq(&ent->lde_fid), mc->mc_num, ctx); } - + if (rc < 0) { kunmap(page); RETURN(rc); diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 17fd2d2..ead982e 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -298,6 +298,7 @@ static inline struct lu_dirent *lu_dirent_next(struct lu_dirent *ent) #define MEA_MAGIC_LAST_CHAR 0xb2221ca1 #define MEA_MAGIC_ALL_CHARS 0xb222a11c +#define MEA_MAGIC_HASH_SEGMENT 0xb222a11b struct lmv_stripe_md { __u32 mea_magic; diff --git a/lustre/include/lustre_req_layout.h b/lustre/include/lustre_req_layout.h index 5f33a29..46348ad 100644 --- a/lustre/include/lustre_req_layout.h +++ b/lustre/include/lustre_req_layout.h @@ -109,6 +109,7 @@ extern const struct req_format RQF_MDS_PIN; extern const struct req_format RQF_MDS_CONNECT; extern const struct req_format RQF_MDS_DISCONNECT; extern const struct req_format RQF_MDS_READPAGE; +extern const struct req_format RQF_MDS_WRITEPAGE; extern const struct req_format RQF_MDS_DONE_WRITING; /* diff --git a/lustre/include/obd.h b/lustre/include/obd.h index debc5cf..640a5f6 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -1159,7 +1159,8 @@ struct md_ops { int (*m_init_ea_size)(struct obd_export *, int, int, int); int (*m_get_lustre_md)(struct obd_export *, struct ptlrpc_request *, - int, struct obd_export *, struct lustre_md *); + int, struct obd_export *, struct obd_export *, + struct lustre_md *); int (*m_free_lustre_md)(struct obd_export *, struct lustre_md *); diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index 0d94bc0..995d2af 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -1767,13 +1767,14 @@ static inline int md_unlink(struct obd_export *exp, struct md_op_data *op_data, static inline int md_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, int offset, struct obd_export *dt_exp, + struct obd_export *md_exp, struct lustre_md *md) { ENTRY; EXP_CHECK_MD_OP(exp, get_lustre_md); MD_COUNTER_INCREMENT(exp->exp_obd, get_lustre_md); RETURN(MDP(exp->exp_obd, get_lustre_md)(exp, req, offset, - dt_exp, md)); + dt_exp, md_exp, md)); } static inline int md_free_lustre_md(struct obd_export *exp, diff --git a/lustre/liblustre/namei.c b/lustre/liblustre/namei.c index aea7407..9da93a1 100644 --- a/lustre/liblustre/namei.c +++ b/lustre/liblustre/namei.c @@ -214,7 +214,8 @@ static int pnode_revalidate_finish(struct ptlrpc_request *req, RETURN(-ENOENT); rc = md_get_lustre_md(llu_i2sbi(inode)->ll_md_exp, req, - offset, llu_i2sbi(inode)->ll_dt_exp, &md); + offset, llu_i2sbi(inode)->ll_dt_exp, + llu_i2sbi(inode)->ll_md_exp, &md); if (rc) RETURN(rc); @@ -357,7 +358,7 @@ static int lookup_it_finish(struct ptlrpc_request *request, int offset, ptlrpc_req_finished(request); rc = md_get_lustre_md(sbi->ll_md_exp, request, offset, - sbi->ll_dt_exp, &md); + sbi->ll_dt_exp, sbi->ll_md_exp, &md); if (rc) RETURN(rc); diff --git a/lustre/liblustre/super.c b/lustre/liblustre/super.c index 3676070..6dad0bc 100644 --- a/lustre/liblustre/super.c +++ b/lustre/liblustre/super.c @@ -454,7 +454,7 @@ static int llu_inode_revalidate(struct inode *inode) RETURN(-abs(rc)); } rc = md_get_lustre_md(sbi->ll_md_exp, req, REPLY_REC_OFF, - sbi->ll_dt_exp, &md); + sbi->ll_dt_exp, sbi->ll_md_exp, &md); /* XXX Too paranoid? */ if (((md.body->valid ^ valid) & OBD_MD_FLEASIZE) && @@ -701,7 +701,7 @@ int llu_setattr_raw(struct inode *inode, struct iattr *attr) } rc = md_get_lustre_md(sbi->ll_md_exp, request, REPLY_REC_OFF, - sbi->ll_dt_exp, &md); + sbi->ll_dt_exp, sbi->ll_md_exp, &md); if (rc) { ptlrpc_req_finished(request); RETURN(rc); @@ -1730,7 +1730,7 @@ static int llu_lov_setstripe_ea_info(struct inode *ino, int flags, } rc = md_get_lustre_md(sbi->ll_md_exp, req, - 1, sbi->ll_dt_exp, &md); + 1, sbi->ll_dt_exp, sbi->ll_md_exp, &md); if (rc) GOTO(out, rc); @@ -2085,7 +2085,7 @@ llu_fsswop_mount(const char *source, } err = md_get_lustre_md(sbi->ll_md_exp, request, REPLY_REC_OFF, - sbi->ll_dt_exp, &md); + sbi->ll_dt_exp, sbi->ll_md_exp, &md); if (err) { CERROR("failed to understand root inode md: rc = %d\n",err); GOTO(out_request, err); diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 58df98f..d36139f 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -342,7 +342,8 @@ int client_common_fill_super(struct super_block *sb, char *mdc, char *osc) } err = md_get_lustre_md(sbi->ll_md_exp, request, - REPLY_REC_OFF, sbi->ll_dt_exp, &md); + REPLY_REC_OFF, sbi->ll_dt_exp, sbi->ll_md_exp, + &md); if (err) { CERROR("failed to understand root inode md: rc = %d\n", err); ptlrpc_req_finished (request); @@ -1279,7 +1280,8 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) } rc = md_get_lustre_md(sbi->ll_md_exp, request, - REPLY_REC_OFF, sbi->ll_dt_exp, &md); + REPLY_REC_OFF, sbi->ll_dt_exp, + sbi->ll_md_exp, &md); if (rc) { ptlrpc_req_finished(request); RETURN(rc); @@ -1935,7 +1937,7 @@ int ll_prep_inode(struct inode **inode, struct ptlrpc_request *req, prune_deathrow(sbi, 1); rc = md_get_lustre_md(sbi->ll_md_exp, req, offset, - sbi->ll_dt_exp, &md); + sbi->ll_dt_exp, sbi->ll_md_exp, &md); if (rc) RETURN(rc); diff --git a/lustre/lmv/lmv_intent.c b/lustre/lmv/lmv_intent.c index 3d5d58b..efe7337 100644 --- a/lustre/lmv/lmv_intent.c +++ b/lustre/lmv/lmv_intent.c @@ -37,6 +37,7 @@ #endif #include +#include #include #include #include @@ -178,10 +179,14 @@ repeat: mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, (char *)op_data->name, op_data->namelen); - CDEBUG(D_OTHER, "forward to MDS #"LPU64" ("DFID")\n", - mds, PFID(&rpid)); rpid = obj->lo_inodes[mds].li_fid; + rc = lmv_fld_lookup(lmv, &rpid, &mds); lmv_obj_put(obj); + if (rc) + GOTO(out_free_sop_data, rc); + + CDEBUG(D_OTHER, "forward to MDS #"LPU64" ("DFID")\n", + mds, PFID(&rpid)); } sop_data->fid1 = rpid; @@ -878,7 +883,8 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, LASSERT(body); update: - obj->lo_inodes[i].li_size = body->size; + obj->lo_inodes[i].li_size = (MAX_HASH_SIZE/obj->lo_objcount) * + (i + 1); CDEBUG(D_OTHER, "fresh: %lu\n", (unsigned long)obj->lo_inodes[i].li_size); diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index d9342be..24644a0 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -38,6 +39,7 @@ #include #include +#include #include #include #include @@ -694,9 +696,37 @@ static int lmv_placement_policy(struct obd_device *obd, #endif } else { - /* default policy is to use parent MDS */ + struct lmv_obj *obj; LASSERT(fid_is_sane(hint->ph_pfid)); - rc = lmv_fld_lookup(lmv, hint->ph_pfid, mds); + + obj = lmv_obj_grab(obd, hint->ph_pfid); + if (obj) { + /* If the dir got split, alloc fid according + * to its hash + */ + struct lu_fid *rpid; + + *mds = raw_name2idx(obj->lo_hashtype, + obj->lo_objcount, + hint->ph_cname->name, + hint->ph_cname->len); + rpid = &obj->lo_inodes[*mds].li_fid; + rc = lmv_fld_lookup(lmv, rpid, mds); + if (rc) { + lmv_obj_put(obj); + GOTO(exit, rc); + } + CDEBUG(D_INODE, "the obj "DFID" has been" + "splitted,got MDS at "LPU64" by name %s\n", + PFID(hint->ph_pfid), *mds, + hint->ph_cname->name); + + rc = 0; + } else { + /* default policy is to use parent MDS */ + rc = lmv_fld_lookup(lmv, hint->ph_pfid, mds); + } + } } else { /* sequences among all tgts are not well balanced, allocate new @@ -705,7 +735,7 @@ static int lmv_placement_policy(struct obd_device *obd, *mds = 0; rc = -EINVAL; } - +exit: if (rc) { CERROR("cannot choose MDS, err = %d\n", rc); } else { @@ -1180,7 +1210,7 @@ int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid) GOTO(cleanup, rc); } - rc = md_get_lustre_md(tgt_exp, req, 0, NULL, &md); + rc = md_get_lustre_md(tgt_exp, req, 1, NULL, exp, &md); if (rc) { CERROR("mdc_get_lustre_md() failed, error %d\n", rc); GOTO(cleanup, rc); @@ -1824,7 +1854,7 @@ static int lmv_readpage(struct obd_export *exp, struct obd_export *tgt_exp; struct lu_fid rid = *fid; struct lmv_obj *obj; - int i, rc; + int i = 0, rc; ENTRY; rc = lmv_check_connect(obd); @@ -1847,22 +1877,47 @@ static int lmv_readpage(struct obd_export *exp, rid = obj->lo_inodes[i].li_fid; lmv_obj_unlock(obj); - lmv_obj_put(obj); CDEBUG(D_OTHER, "forward to "DFID" with offset %lu\n", PFID(&rid), (unsigned long)offset); } - + tgt_exp = lmv_get_export(lmv, &rid); if (IS_ERR(tgt_exp)) - RETURN(PTR_ERR(tgt_exp)); + GOTO(cleanup, PTR_ERR(tgt_exp)); rc = md_readpage(tgt_exp, &rid, offset, page, request); - + if (rc) + GOTO(cleanup, rc); +#ifdef __KERNEL__ + if (obj && i < obj->lo_objcount - 1) { + struct lu_dirpage *dp; + __u32 end; + /* This dirobj has been splitted, so we + * check whether reach the end of one hash_segment + * and reset ldp->ldp_hash_end + */ + kmap(page); + dp = page_address(page); + end = le32_to_cpu(dp->ldp_hash_end); + if (end == ~0ul) { + __u32 hash_segment_end = (i + 1) * + MAX_HASH_SIZE/obj->lo_objcount; + dp->ldp_hash_end = cpu_to_le32(hash_segment_end); + CDEBUG(D_INFO,"reset hash end %x for split obj "DFID"", + le32_to_cpu(dp->ldp_hash_end), PFID(&rid)); + } + kunmap(page); + + } +#endif /* * Here we could remove "." and ".." from all pages which at not from * master. But MDS has only "." and ".." for master dir. */ +cleanup: + if (obj) + lmv_obj_put(obj); RETURN(rc); } @@ -2178,7 +2233,8 @@ int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, RETURN(mea_size); if (mea->mea_magic == MEA_MAGIC_LAST_CHAR || - mea->mea_magic == MEA_MAGIC_ALL_CHARS) + mea->mea_magic == MEA_MAGIC_ALL_CHARS || + mea->mea_magic == MEA_MAGIC_HASH_SEGMENT) { magic = le32_to_cpu(mea->mea_magic); } else { @@ -2256,15 +2312,16 @@ int lmv_lock_match(struct obd_export *exp, int flags, } int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, - int offset, struct obd_export *dt_exp, - struct lustre_md *md) + int offset, struct obd_export *dt_exp, + struct obd_export *md_exp, struct lustre_md *md) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; int rc; ENTRY; - rc = md_get_lustre_md(lmv->tgts[0].ltd_exp, req, offset, dt_exp, md); + rc = md_get_lustre_md(lmv->tgts[0].ltd_exp, req, offset, dt_exp, md_exp, + md); RETURN(rc); } diff --git a/lustre/lmv/lmv_object.c b/lustre/lmv/lmv_object.c index ab48f9f..ddd126c 100644 --- a/lustre/lmv/lmv_object.c +++ b/lustre/lmv/lmv_object.c @@ -40,6 +40,7 @@ #include #include #include +#include #include #include #include "lmv_internal.h" @@ -64,7 +65,8 @@ lmv_obj_alloc(struct obd_device *obd, struct lmv_obd *lmv = &obd->u.lmv; LASSERT(mea->mea_magic == MEA_MAGIC_LAST_CHAR - || mea->mea_magic == MEA_MAGIC_ALL_CHARS); + || mea->mea_magic == MEA_MAGIC_ALL_CHARS + || mea->mea_magic == MEA_MAGIC_HASH_SEGMENT); OBD_SLAB_ALLOC(obj, obj_cache, CFS_ALLOC_STD, sizeof(*obj)); @@ -317,7 +319,7 @@ lmv_obj_create(struct obd_export *exp, const struct lu_fid *fid, GOTO(cleanup, obj = ERR_PTR(rc)); } - rc = md_get_lustre_md(exp, req, 0, NULL, &md); + rc = md_get_lustre_md(exp, req, 0, NULL, exp, &md); if (rc) { CERROR("mdc_get_lustre_md() failed, error %d\n", rc); GOTO(cleanup, obj = ERR_PTR(rc)); diff --git a/lustre/mdc/mdc_internal.h b/lustre/mdc/mdc_internal.h index e1c73a0..43dcc49 100644 --- a/lustre/mdc/mdc_internal.h +++ b/lustre/mdc/mdc_internal.h @@ -157,7 +157,9 @@ int mdc_open(struct obd_export *exp, obd_id ino, int type, int flags, struct obd_client_handle; int mdc_get_lustre_md(struct obd_export *md_exp, struct ptlrpc_request *req, - int offset, struct obd_export *dt_exp, struct lustre_md *md); + int offset, struct obd_export *dt_exp, + struct obd_export *lmv_exp, + struct lustre_md *md); int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md); diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index a6a5894..43f9dbc 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -509,7 +509,18 @@ int mdc_enqueue(struct obd_export *exp, !it_open_error(DISP_OPEN_OPEN, it)) mdc_set_open_replay_data(NULL, NULL, req); - if ((body->valid & OBD_MD_FLEASIZE) != 0) { + if ((body->valid & OBD_MD_FLDIREA) != 0) { + if (body->eadatasize) { + eadata = lustre_swab_repbuf(req, + DLM_REPLY_REC_OFF + 1, + body->eadatasize, NULL); + if (eadata == NULL) { + CERROR ("Missing/short eadata\n"); + RETURN (-EPROTO); + } + } + } + if ((body->valid & OBD_MD_FLEASIZE)) { /* The eadata is opaque; just check that it is there. * Eventually, obd_unpackmd() will check the contents */ eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1, diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 6a7edaa..dfd3599 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -379,7 +379,9 @@ int mdc_unpack_acl(struct obd_export *exp, struct ptlrpc_request *req, #endif int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, - int offset, struct obd_export *dt_exp, struct lustre_md *md) + int offset, struct obd_export *dt_exp, + struct obd_export *md_exp, + struct lustre_md *md) { int rc = 0; ENTRY; @@ -420,9 +422,28 @@ int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, offset++; } else if (md->body->valid & OBD_MD_FLDIREA) { - /* TODO: umka, please handle this case */ + int lmvsize; + struct lov_mds_md *lmv; LASSERT(S_ISDIR(md->body->mode)); - offset++; + + if (md->body->eadatasize == 0) { + RETURN(0); + } + if (md->body->valid & OBD_MD_MEA) { + lmvsize = md->body->eadatasize; + lmv = lustre_msg_buf(req->rq_repmsg, offset, lmvsize); + LASSERT (lmv != NULL); + LASSERT_REPSWABBED(req, offset); + + rc = obd_unpackmd(md_exp, (void *)&md->mea, lmv, + lmvsize); + if (rc < 0) + RETURN(rc); + + LASSERT (rc >= sizeof (*md->mea)); + } + rc = 0; + offset ++; } /* for ACL, it's possible that FLACL is set but aclsize is zero. only diff --git a/lustre/mdd/mdd_handler.c b/lustre/mdd/mdd_handler.c index 5d9772a..01099fd 100644 --- a/lustre/mdd/mdd_handler.c +++ b/lustre/mdd/mdd_handler.c @@ -281,7 +281,6 @@ static int __mdd_lmm_get(const struct lu_context *ctxt, RETURN(rc); } -#ifdef HAVE_SPLIT_SUPPORT /* get lmv EA only*/ static int __mdd_lmv_get(const struct lu_context *ctxt, struct mdd_object *mdd_obj, struct md_attr *ma) @@ -296,7 +295,6 @@ static int __mdd_lmv_get(const struct lu_context *ctxt, } RETURN(rc); } -#endif static int mdd_attr_get_internal(const struct lu_context *ctxt, struct mdd_object *mdd_obj, @@ -313,12 +311,10 @@ static int mdd_attr_get_internal(const struct lu_context *ctxt, S_ISDIR(mdd_object_type(mdd_obj))) rc = __mdd_lmm_get(ctxt, mdd_obj, ma); } -#ifdef HAVE_SPLIT_SUPPORT if (rc == 0 && ma->ma_need & MA_LMV) { if (S_ISDIR(mdd_object_type(mdd_obj))) rc = __mdd_lmv_get(ctxt, mdd_obj, ma); } -#endif CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n", rc, ma->ma_valid); RETURN(rc); @@ -586,7 +582,7 @@ static int mdd_recovery_complete(const struct lu_context *ctxt, struct lu_device *next = &mdd->mdd_child->dd_lu_dev; int rc; ENTRY; -/* TODO: + /* TODO: rc = mdd_lov_set_nextid(ctx, mdd); if (rc) { CERROR("%s: mdd_lov_set_nextid failed %d\n", @@ -730,6 +726,18 @@ static int __mdd_xattr_set(const struct lu_context *ctxt, struct mdd_object *o, if (buf && buf_len > 0) { rc = next->do_ops->do_xattr_set(ctxt, next, buf, buf_len, name, 0, handle); +#ifdef HAVE_SPLIT_SUPPORT + if (rc == 0) { + /* very ugly hack, if setting lmv, it means splitting + * sucess, we should return -ERESTART to notify the + * client, so transno for this splitting should be + * zero according to the replay rules. so return -ERESTART + * here let mdt trans stop callback know this. + */ + if (strncmp(name, MDS_LMV_MD_NAME, strlen(name)) == 0) + rc = -ERESTART; + } +#endif }else if (buf == NULL && buf_len == 0) { rc = next->do_ops->do_xattr_del(ctxt, next, name, handle); } diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 4458f76..994e3ad 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -1362,6 +1362,7 @@ int mds_msg_check_version(struct lustre_msg *msg) case MDS_GETATTR_NAME: case MDS_STATFS: case MDS_READPAGE: + case MDS_WRITEPAGE: case MDS_REINT: case MDS_CLOSE: case MDS_DONE_WRITING: diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index dc44085..265a610 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -254,10 +254,18 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, repbody->eadatasize = 0; repbody->aclsize = 0; - ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD); - ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD, RCL_SERVER); - - ma->ma_need = MA_INODE | MA_LOV; + if(reqbody->valid & OBD_MD_MEA) { + /* Assumption: MDT_MD size is enough for lmv size FIXME */ + ma->ma_lmv = req_capsule_server_get(pill, &RMF_MDT_MD); + ma->ma_lmv_size = req_capsule_get_size(pill, &RMF_MDT_MD, + RCL_SERVER); + ma->ma_need = MA_INODE | MA_LMV; + } else { + ma->ma_need = MA_INODE | MA_LOV ; + ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD); + ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD, + RCL_SERVER); + } rc = mo_attr_get(ctxt, next, ma); if (rc == -EREMOTE) { /* This object is located on remote node.*/ @@ -285,6 +293,12 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, else repbody->valid |= OBD_MD_FLEASIZE; } + if (ma->ma_valid & MA_LMV) { + LASSERT(S_ISDIR(la->la_mode)); + repbody->eadatasize = ma->ma_lmv_size; + repbody->valid |= OBD_MD_FLDIREA; + repbody->valid |= OBD_MD_MEA; + } } else if (S_ISLNK(la->la_mode) && reqbody->valid & OBD_MD_LINKNAME) { rc = mo_readlink(ctxt, next, ma->ma_lmm, ma->ma_lmm_size); @@ -589,7 +603,7 @@ static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page) rc = mdo_name_insert(info->mti_ctxt, md_object_next(&object->mot_obj), ent->lde_name, lf, 0); - /* FIXME: add cross_flags */ + CDEBUG(D_INFO, "insert name %s rc %d \n", ent->lde_name, rc); if (rc) { kunmap(page); RETURN(rc); @@ -618,7 +632,7 @@ static int mdt_writepage(struct mdt_thread_info *info) ENTRY; desc = ptlrpc_prep_bulk_exp (req, 1, BULK_GET_SINK, MDS_BULK_PORTAL); - if (desc) + if (!desc) RETURN(-ENOMEM); /* allocate the page for the desc */ diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c index 4278d8d..6ba8fed 100644 --- a/lustre/mdt/mdt_open.c +++ b/lustre/mdt/mdt_open.c @@ -693,15 +693,19 @@ int mdt_open(struct mdt_thread_info *info) if (result == -ENOENT) { /* not found and with MDS_OPEN_CREAT: let's create it */ - mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE); result = mdo_create(info->mti_ctxt, mdt_object_child(parent), rr->rr_name, mdt_object_child(child), &info->mti_spec, &info->mti_attr); - if (result != 0) + if (result == -ERESTART) GOTO(out_child, result); + else { + mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE); + if (result != 0) + GOTO(out_child, result); + } created = 1; } else { /* we have to get attr & lov ea for this object*/ diff --git a/lustre/obdclass/mea.c b/lustre/obdclass/mea.c index 2de1de6..317af8d 100644 --- a/lustre/obdclass/mea.c +++ b/lustre/obdclass/mea.c @@ -33,6 +33,7 @@ #include #endif #include +#include static int mea_last_char_hash(int count, char *name, int namelen) { @@ -55,6 +56,19 @@ static int mea_all_chars_hash(int count, char *name, int namelen) return c; } +/* This hash calculate method must be same as the lvar hash method */ +static int mea_hash_segment(int count, char *name, int namelen) +{ + __u32 result = 0; + __u32 hash_segment = MAX_HASH_SIZE / count; + + strncpy((void *)&result, name, min(namelen, (int)sizeof result)); + + result = (result << 1) & 0x7fffffff; + + return result / hash_segment; +} + int raw_name2idx(int hashtype, int count, const char *name, int namelen) { unsigned int c = 0; @@ -70,6 +84,9 @@ int raw_name2idx(int hashtype, int count, const char *name, int namelen) case MEA_MAGIC_ALL_CHARS: c = mea_all_chars_hash(count, (char *) name, namelen); break; + case MEA_MAGIC_HASH_SEGMENT: + c = mea_hash_segment(count, (char *) name, namelen); + break; default: CERROR("unknown hash type 0x%x\n", hashtype); } diff --git a/lustre/ptlrpc/layout.c b/lustre/ptlrpc/layout.c index be3d5c8..76c9ecc 100644 --- a/lustre/ptlrpc/layout.c +++ b/lustre/ptlrpc/layout.c @@ -291,6 +291,7 @@ static const struct req_format *req_formats[] = { &RQF_MDS_CLOSE, &RQF_MDS_PIN, &RQF_MDS_READPAGE, + &RQF_MDS_WRITEPAGE, &RQF_MDS_DONE_WRITING }; @@ -612,6 +613,11 @@ const struct req_format RQF_MDS_READPAGE = mdt_body_only, mdt_body_only); EXPORT_SYMBOL(RQF_MDS_READPAGE); +const struct req_format RQF_MDS_WRITEPAGE = + DEFINE_REQ_FMT0("MDS_WRITEPAGE", + mdt_body_only, mdt_body_only); +EXPORT_SYMBOL(RQF_MDS_WRITEPAGE); + #if !defined(__REQ_LAYOUT_USER__) int req_layout_init(void) -- 1.8.3.1