From: yury Date: Sat, 27 Sep 2008 11:20:20 +0000 (+0000) Subject: b=16727 X-Git-Tag: v1_9_80~45 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=54e6e2442374d11ba55518b552f7230a989c9b1c b=16727 r=adilger,shadow - various CMD related fixes. --- diff --git a/lustre/cmm/cmm_split.c b/lustre/cmm/cmm_split.c index d77e77a..1d105fb 100644 --- a/lustre/cmm/cmm_split.c +++ b/lustre/cmm/cmm_split.c @@ -368,8 +368,6 @@ static int cmm_split_slaves_create(const struct lu_env *env, GOTO(cleanup, rc); i++; } - - ma->ma_valid |= MA_LMV; EXIT; cleanup: return rc; @@ -395,7 +393,7 @@ static inline struct lu_name *cmm_name(const struct lu_env *env, cmi = cmm_env_info(env); lname = &cmi->cti_name; lname->ln_name = name; - /* NOT count the terminating '\0' of name for length */ + /* do NOT count the terminating '\0' of name for length */ lname->ln_namelen = buflen - 1; return lname; } @@ -410,7 +408,7 @@ static int cmm_split_remove_entry(const struct lu_env *env, { struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); struct cmm_thread_info *cmi; - struct md_attr *ma; + struct md_attr *ma; struct cmm_object *obj; int is_dir, rc; char *name; @@ -434,7 +432,7 @@ static int cmm_split_remove_entry(const struct lu_env *env, /* * XXX: These days only cross-ref dirs are possible, so for the * sake of simplicity, in split, we suppose that all cross-ref - * names pint to directory and do not do additional getattr to + * names point to directory and do not do additional getattr to * remote MDT. 
*/ is_dir = 1; @@ -616,7 +614,7 @@ static int cmm_split_process_dir(const struct lu_env *env, { struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); struct lu_rdpg *rdpg = &cmm_env_info(env)->cmi_rdpg; - __u64 hash_segement; + __u64 hash_segment; int rc = 0, i; ENTRY; @@ -631,23 +629,23 @@ static int cmm_split_process_dir(const struct lu_env *env, GOTO(cleanup, rc = -ENOMEM); } - LASSERT(ma->ma_valid & MA_LMV); - hash_segement = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1); + hash_segment = MAX_HASH_SIZE; + do_div(hash_segment, cmm->cmm_tgt_count + 1); for (i = 1; i < cmm->cmm_tgt_count + 1; i++) { struct lu_fid *lf; __u64 hash_end; lf = &ma->ma_lmv->mea_ids[i]; - rdpg->rp_hash = i * hash_segement; + rdpg->rp_hash = i * hash_segment; if (i == cmm->cmm_tgt_count) hash_end = MAX_HASH_SIZE; else - hash_end = rdpg->rp_hash + hash_segement; + hash_end = rdpg->rp_hash + hash_segment; rc = cmm_split_process_stripe(env, mo, rdpg, lf, hash_end); if (rc) { CERROR("Error (rc = %d) while splitting for %d: fid=" - DFID", %08x:%08x\n", rc, i, PFID(lf), + DFID", "LPX64":"LPX64"\n", rc, i, PFID(lf), rdpg->rp_hash, hash_end); GOTO(cleanup, rc); } @@ -718,7 +716,6 @@ int cmm_split_dir(const struct lu_env *env, struct md_object *mo) } /* Step5: Set mea to the master object. 
*/ - LASSERT(ma->ma_valid & MA_LMV); buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size); rc = mo_xattr_set(env, md_object_next(mo), buf, MDS_LMV_MD_NAME, 0); diff --git a/lustre/cmm/mdc_object.c b/lustre/cmm/mdc_object.c index edc2fb6..70a1c3e 100644 --- a/lustre/cmm/mdc_object.c +++ b/lustre/cmm/mdc_object.c @@ -161,8 +161,9 @@ static int mdc_req2attr_update(const struct lu_env *env, struct mdc_thread_info *mci; struct ptlrpc_request *req; struct mdt_body *body; - struct lov_mds_md *lov; + struct lov_mds_md *md; struct llog_cookie *cookie; + void *acl; ENTRY; mci = mdc_info_get(env); @@ -182,42 +183,68 @@ static int mdc_req2attr_update(const struct lu_env *env, *ma->ma_capa = *capa; } - if (!(body->valid & OBD_MD_FLEASIZE)) - RETURN(0); + if ((body->valid & OBD_MD_FLEASIZE) || (body->valid & OBD_MD_FLDIREA)) { + if (body->eadatasize == 0) { + CERROR("No size defined for easize field\n"); + RETURN(-EPROTO); + } + + md = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, + body->eadatasize); + if (md == NULL) + RETURN(-EPROTO); - if (body->eadatasize == 0) { - CERROR("OBD_MD_FLEASIZE is set but eadatasize is zero\n"); - RETURN(-EPROTO); + LASSERT(ma->ma_lmm != NULL); + LASSERT(ma->ma_lmm_size >= body->eadatasize); + ma->ma_lmm_size = body->eadatasize; + memcpy(ma->ma_lmm, md, ma->ma_lmm_size); + ma->ma_valid |= MA_LOV; } - lov = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, - body->eadatasize); - if (lov == NULL) - RETURN(-EPROTO); + if (body->valid & OBD_MD_FLCOOKIE) { + /* + * ACL and cookie share the same body->aclsize, we need + * to make sure that they both never come here. 
+ */ + LASSERT(!(body->valid & OBD_MD_FLACL)); - LASSERT(ma->ma_lmm != NULL); - LASSERT(ma->ma_lmm_size >= body->eadatasize); - ma->ma_lmm_size = body->eadatasize; - memcpy(ma->ma_lmm, lov, ma->ma_lmm_size); - ma->ma_valid |= MA_LOV; + if (body->aclsize == 0) { + CERROR("No size defined for cookie field\n"); + RETURN(-EPROTO); + } - if (!(body->valid & OBD_MD_FLCOOKIE)) - RETURN(0); + cookie = req_capsule_server_sized_get(&req->rq_pill, + &RMF_LOGCOOKIES, + body->aclsize); + if (cookie == NULL) + RETURN(-EPROTO); - if (body->aclsize == 0) { - CERROR("OBD_MD_FLCOOKIE is set but cookie size is zero\n"); - RETURN(-EPROTO); + LASSERT(ma->ma_cookie != NULL); + LASSERT(ma->ma_cookie_size == body->aclsize); + memcpy(ma->ma_cookie, cookie, ma->ma_cookie_size); + ma->ma_valid |= MA_COOKIE; } - cookie = req_capsule_server_sized_get(&req->rq_pill, &RMF_ACL, - body->aclsize); - if (cookie == NULL) - RETURN(-EPROTO); +#ifdef CONFIG_FS_POSIX_ACL + if (body->valid & OBD_MD_FLACL) { + if (body->aclsize == 0) { + CERROR("No size defined for acl field\n"); + RETURN(-EPROTO); + } + + acl = req_capsule_server_sized_get(&req->rq_pill, + &RMF_ACL, + body->aclsize); + if (acl == NULL) + RETURN(-EPROTO); + + LASSERT(ma->ma_acl != NULL); + LASSERT(ma->ma_acl_size == body->aclsize); + memcpy(ma->ma_acl, acl, ma->ma_acl_size); + ma->ma_valid |= MA_ACL_DEF; + } +#endif - LASSERT(ma->ma_cookie != NULL); - LASSERT(ma->ma_cookie_size == body->aclsize); - memcpy(ma->ma_cookie, cookie, ma->ma_cookie_size); - ma->ma_valid |= MA_COOKIE; RETURN(0); } diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 81cd3d8..01d8c8f 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -742,10 +742,20 @@ struct lmv_tgt_desc { struct semaphore ltd_fid_sem; }; +enum placement_policy { + PLACEMENT_CHAR_POLICY = 0, + PLACEMENT_NID_POLICY = 1, + PLACEMENT_INVAL_POLICY = 2, + PLACEMENT_MAX_POLICY +}; + +typedef enum placement_policy placement_policy_t; + struct lmv_obd { int refcount; struct 
lu_client_fld lmv_fld; spinlock_t lmv_lock; + placement_policy_t lmv_placement; struct lmv_desc desc; struct obd_uuid cluuid; struct obd_export *exp; @@ -1417,7 +1427,7 @@ enum { #define MAX_HASH_SIZE_32 0x7fffffffUL #define MAX_HASH_SIZE 0x7fffffffffffffffULL -#define MAX_HASH_HIGHEST_BIT 0x1000000000000000 +#define MAX_HASH_HIGHEST_BIT 0x1000000000000000ULL struct lustre_md { struct mdt_body *body; diff --git a/lustre/liblustre/super.c b/lustre/liblustre/super.c index d7d37cf..f5b1b4e 100644 --- a/lustre/liblustre/super.c +++ b/lustre/liblustre/super.c @@ -2023,7 +2023,6 @@ llu_fsswop_mount(const char *source, struct inode *root; struct pnode_base *rootpb; struct obd_device *obd; - struct lu_fid rootfid; struct llu_sb_info *sbi; struct obd_statfs osfs; static struct qstr noname = { NULL, 0, 0 }; @@ -2165,16 +2164,20 @@ llu_fsswop_mount(const char *source, llu_init_ea_size(sbi->ll_md_exp, sbi->ll_dt_exp); - err = md_getstatus(sbi->ll_md_exp, &rootfid, NULL); + fid_zero(&sbi->ll_root_fid); + err = md_getstatus(sbi->ll_md_exp, &sbi->ll_root_fid, NULL); if (err) { CERROR("cannot mds_connect: rc = %d\n", err); GOTO(out_lock_cn_cb, err); } - CDEBUG(D_SUPER, "rootfid "DFID"\n", PFID(&rootfid)); - sbi->ll_root_fid = rootfid; + if (!fid_is_sane(&sbi->ll_root_fid)) { + CERROR("Invalid root fid during mount\n"); + GOTO(out_lock_cn_cb, err = -EINVAL); + } + CDEBUG(D_SUPER, "rootfid "DFID"\n", PFID(&sbi->ll_root_fid)); /* fetch attr of root inode */ - err = md_getattr(sbi->ll_md_exp, &rootfid, NULL, + err = md_getattr(sbi->ll_md_exp, &sbi->ll_root_fid, NULL, OBD_MD_FLGETATTR | OBD_MD_FLBLOCKS, 0, &request); if (err) { CERROR("md_getattr failed for root: rc = %d\n", err); diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 257516f..ca9a7f8 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -281,7 +281,6 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt) struct inode *root = 0; struct ll_sb_info *sbi = 
ll_s2sbi(sb); struct obd_device *obd; - struct lu_fid rootfid; struct obd_capa *oc = NULL; struct obd_statfs osfs; struct ptlrpc_request *request = NULL; @@ -563,13 +562,17 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt) GOTO(out_lock_cn_cb, err = -ENOMEM); } - err = md_getstatus(sbi->ll_md_exp, &rootfid, &oc); + fid_zero(&sbi->ll_root_fid); + err = md_getstatus(sbi->ll_md_exp, &sbi->ll_root_fid, &oc); if (err) { CERROR("cannot mds_connect: rc = %d\n", err); GOTO(out_lock_cn_cb, err); } - CDEBUG(D_SUPER, "rootfid "DFID"\n", PFID(&rootfid)); - sbi->ll_root_fid = rootfid; + if (!fid_is_sane(&sbi->ll_root_fid)) { + CERROR("Invalid root fid during mount\n"); + GOTO(out_lock_cn_cb, err = -EINVAL); + } + CDEBUG(D_SUPER, "rootfid "DFID"\n", PFID(&sbi->ll_root_fid)); sb->s_op = &lustre_super_operations; sb->s_export_op = &lustre_export_operations; @@ -582,7 +585,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt) else if (sbi->ll_flags & LL_SBI_ACL) valid |= OBD_MD_FLACL; - err = md_getattr(sbi->ll_md_exp, &rootfid, oc, valid, 0, &request); + err = md_getattr(sbi->ll_md_exp, &sbi->ll_root_fid, oc, valid, 0, + &request); if (oc) free_capa(oc); if (err) { @@ -1875,6 +1879,7 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md) inode->i_mode = (inode->i_mode & S_IFMT)|(body->mode & ~S_IFMT); if (body->valid & OBD_MD_FLTYPE) inode->i_mode = (inode->i_mode & ~S_IFMT)|(body->mode & S_IFMT); + LASSERT(inode->i_mode != 0); if (S_ISREG(inode->i_mode)) { inode->i_blkbits = min(PTLRPC_MAX_BRW_BITS + 1, LL_MAX_BLKSIZE_BITS); } else { diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c index e3f0662..e8e5572 100644 --- a/lustre/llite/statahead.c +++ b/lustre/llite/statahead.c @@ -370,6 +370,7 @@ static int do_statahead_interpret(struct ll_statahead_info *sai) struct dentry *dentry; struct lookup_intent *it; int rc = 0; + struct mdt_body *body; ENTRY; spin_lock(&lli->lli_lock); @@ -392,6 
+393,10 @@ static int do_statahead_interpret(struct ll_statahead_info *sai) if (entry->se_stat != SA_ENTRY_STATED) GOTO(out, rc = entry->se_stat); + body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); + if (body == NULL) + GOTO(out, rc = -EFAULT); + if (dentry->d_inode == NULL) { /* * lookup. @@ -404,6 +409,13 @@ static int do_statahead_interpret(struct ll_statahead_info *sai) LASSERT(fid_is_zero(&minfo->mi_data.op_fid2)); + /* + * XXX: No fid in reply, this is probably cross-ref case. + * SA can't handle it yet. + */ + if (body->valid & OBD_MD_MDS) + GOTO(out, rc = -EAGAIN); + rc = ll_lookup_it_finish(req, it, &icbd); if (!rc) /* @@ -421,10 +433,6 @@ static int do_statahead_interpret(struct ll_statahead_info *sai) /* * revalidate. */ - struct mdt_body *body; - - body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, - sizeof(*body)); if (!lu_fid_eq(&minfo->mi_data.op_fid2, &body->fid1)) { ll_unhash_aliases(dentry->d_inode); GOTO(out, rc = -EAGAIN); diff --git a/lustre/lmv/lmv_fld.c b/lustre/lmv/lmv_fld.c index fbfe410..c2b9757 100644 --- a/lustre/lmv/lmv_fld.c +++ b/lustre/lmv/lmv_fld.c @@ -75,12 +75,13 @@ int lmv_fld_lookup(struct lmv_obd *lmv, RETURN(rc); } - CDEBUG(D_INFO, "Got mds "LPU64" for sequence: "LPU64"\n", - *mds, fid_seq(fid)); + CDEBUG(D_INODE, "FLD lookup got mds #"LPU64" for fid="DFID"\n", + *mds, PFID(fid)); if (*mds >= lmv->desc.ld_tgt_count) { - CERROR("Got invalid mds: "LPU64" (max: %d)\n", - *mds, lmv->desc.ld_tgt_count); + CERROR("FLD lookup got invalid mds #"LPU64" (max: %d) " + "for fid="DFID"\n", *mds, lmv->desc.ld_tgt_count, + PFID(fid)); rc = -EINVAL; } RETURN(rc); diff --git a/lustre/lmv/lmv_intent.c b/lustre/lmv/lmv_intent.c index 285378e..7e35465 100644 --- a/lustre/lmv/lmv_intent.c +++ b/lustre/lmv/lmv_intent.c @@ -63,64 +63,55 @@ #include #include "lmv_internal.h" -static inline void lmv_drop_intent_lock(struct lookup_intent *it) -{ - if (it->d.lustre.it_lock_mode != 0) { - ldlm_lock_decref((void 
*)&it->d.lustre.it_lock_handle, - it->d.lustre.it_lock_mode); - it->d.lustre.it_lock_mode = 0; - } -} - int lmv_intent_remote(struct obd_export *exp, void *lmm, int lmmsize, struct lookup_intent *it, int flags, struct ptlrpc_request **reqp, ldlm_blocking_callback cb_blocking, int extra_lock_flags) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct ptlrpc_request *req = NULL; - struct lustre_handle plock; - struct md_op_data *op_data; - struct obd_export *tgt_exp; - struct mdt_body *body; - int pmode, rc = 0; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct ptlrpc_request *req = NULL; + struct lustre_handle plock; + struct md_op_data *op_data; + struct lmv_tgt_desc *tgt; + struct mdt_body *body; + int pmode; + int rc = 0; ENTRY; - body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_DLM_REP); - LASSERT(body != NULL); + body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY); + if (body == NULL) + RETURN(-EPROTO); + /* + * Not cross-ref case, just get out of here. + */ if (!(body->valid & OBD_MD_MDS)) RETURN(0); /* - * oh, MDS reports that this is remote inode case i.e. we have to ask - * for real attrs on another MDS. + * Unfortunately, we have to lie to MDC/MDS to retrieve + * attributes llite needs and provide proper locking. */ - if (it->it_op & IT_LOOKUP) { - /* - * unfortunately, we have to lie to MDC/MDS to retrieve - * attributes llite needs. - */ + if (it->it_op & IT_LOOKUP) it->it_op = IT_GETATTR; - } - /* we got LOOKUP lock, but we really need attrs */ + /* + * We got LOOKUP lock, but we really need attrs. 
+ */ pmode = it->d.lustre.it_lock_mode; if (pmode) { plock.cookie = it->d.lustre.it_lock_handle; it->d.lustre.it_lock_mode = 0; - it->d.lustre.it_data = 0; + it->d.lustre.it_data = NULL; } LASSERT(fid_is_sane(&body->fid1)); - it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE; - - tgt_exp = lmv_find_export(lmv, &body->fid1); - if (IS_ERR(tgt_exp)) - GOTO(out, rc = PTR_ERR(tgt_exp)); + tgt = lmv_find_target(lmv, &body->fid1); + if (IS_ERR(tgt)) + GOTO(out, rc = PTR_ERR(tgt)); OBD_ALLOC_PTR(op_data); if (op_data == NULL) @@ -128,23 +119,33 @@ int lmv_intent_remote(struct obd_export *exp, void *lmm, op_data->op_fid1 = body->fid1; op_data->op_bias = MDS_CROSS_REF; + + CDEBUG(D_INODE, + "REMOTE_INTENT with fid="DFID" -> mds #%d\n", + PFID(&body->fid1), tgt->ltd_idx); - rc = md_intent_lock(tgt_exp, op_data, lmm, lmmsize, it, flags, - &req, cb_blocking, extra_lock_flags); + it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE; + rc = md_intent_lock(tgt->ltd_exp, op_data, lmm, lmmsize, it, + flags, &req, cb_blocking, extra_lock_flags); + if (rc) + GOTO(out_free_op_data, rc); /* - * llite needs LOOKUP lock to track dentry revocation in order to + * LLite needs LOOKUP lock to track dentry revocation in order to * maintain dcache consistency. Thus drop UPDATE lock here and put * LOOKUP in request. 
*/ - if (rc == 0) { - lmv_drop_intent_lock(it); - it->d.lustre.it_lock_handle = plock.cookie; - it->d.lustre.it_lock_mode = pmode; + if (it->d.lustre.it_lock_mode != 0) { + ldlm_lock_decref((void *)&it->d.lustre.it_lock_handle, + it->d.lustre.it_lock_mode); + it->d.lustre.it_lock_mode = 0; } + it->d.lustre.it_lock_handle = plock.cookie; + it->d.lustre.it_lock_mode = pmode; - OBD_FREE_PTR(op_data); EXIT; +out_free_op_data: + OBD_FREE_PTR(op_data); out: if (rc && pmode) ldlm_lock_decref(&plock, pmode); @@ -154,41 +155,6 @@ out: return rc; } -int lmv_alloc_slave_fids(struct obd_device *obd, struct lu_fid *pid, - struct md_op_data *op, struct lu_fid *fid) -{ - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_obj *obj; - mdsno_t mds; - int mea_idx; - int rc; - ENTRY; - - obj = lmv_obj_grab(obd, pid); - if (!obj) { - CERROR("Object "DFID" should be split\n", - PFID(pid)); - RETURN(0); - } - - mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, - (char *)op->op_name, op->op_namelen); - mds = obj->lo_inodes[mea_idx].li_mds; - lmv_obj_put(obj); - - rc = __lmv_fid_alloc(lmv, fid, mds); - if (rc) { - CERROR("Can't allocate new fid, rc %d\n", - rc); - RETURN(rc); - } - - CDEBUG(D_INFO, "Allocate new fid "DFID" for split " - "obj\n", PFID(fid)); - - RETURN(rc); -} - /* * IT_OPEN is intended to open (and create, possible) an object. Parent (pid) * may be split dir. 
@@ -199,15 +165,17 @@ int lmv_intent_open(struct obd_export *exp, struct md_op_data *op_data, ldlm_blocking_callback cb_blocking, int extra_lock_flags) { - struct obd_device *obd = exp->exp_obd; - struct lu_fid rpid = op_data->op_fid1; - struct lmv_obd *lmv = &obd->u.lmv; - struct md_op_data *sop_data; - struct obd_export *tgt_exp; - struct lmv_stripe_md *mea; - struct mdt_body *body; - struct lmv_obj *obj; - int rc, loop = 0; + struct obd_device *obd = exp->exp_obd; + struct lu_fid rpid = op_data->op_fid1; + struct lmv_obd *lmv = &obd->u.lmv; + struct md_op_data *sop_data; + struct lmv_stripe_md *mea; + struct lmv_tgt_desc *tgt; + struct mdt_body *body; + struct lmv_object *obj; + int rc; + int loop = 0; + int sidx; ENTRY; OBD_ALLOC_PTR(sop_data); @@ -221,35 +189,33 @@ repeat: ++loop; LASSERT(loop <= 2); - obj = lmv_obj_grab(obd, &rpid); + obj = lmv_object_find(obd, &rpid); if (obj) { - int mea_idx; - /* * Directory is already split, so we have to forward request to * the right MDS. */ - mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, + sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, (char *)op_data->op_name, op_data->op_namelen); - rpid = obj->lo_inodes[mea_idx].li_fid; + rpid = obj->lo_stripes[sidx].ls_fid; - sop_data->op_mds = obj->lo_inodes[mea_idx].li_mds; - tgt_exp = lmv_get_export(lmv, sop_data->op_mds); + sop_data->op_mds = obj->lo_stripes[sidx].ls_mds; + tgt = lmv_get_target(lmv, sop_data->op_mds); sop_data->op_bias &= ~MDS_CHECK_SPLIT; - lmv_obj_put(obj); - CDEBUG(D_OTHER, "Choose slave dir ("DFID")\n", PFID(&rpid)); - } else { - struct lmv_tgt_desc *tgt; + lmv_object_put(obj); + CDEBUG(D_INODE, + "Choose slave dir ("DFID") -> mds #%d\n", + PFID(&rpid), tgt->ltd_idx); + } else { sop_data->op_bias |= MDS_CHECK_SPLIT; tgt = lmv_find_target(lmv, &rpid); sop_data->op_mds = tgt->ltd_idx; - tgt_exp = tgt->ltd_exp; } - if (IS_ERR(tgt_exp)) - GOTO(out_free_sop_data, rc = PTR_ERR(tgt_exp)); + if (IS_ERR(tgt)) + GOTO(out_free_sop_data, rc = 
PTR_ERR(tgt)); sop_data->op_fid1 = rpid; @@ -258,7 +224,6 @@ repeat: * For open with IT_CREATE and for IT_CREATE cases allocate new * fid and setup FLD for it. */ - /* save old child fid for correctly check stale data*/ sop_data->op_fid3 = sop_data->op_fid2; rc = lmv_fid_alloc(exp, &sop_data->op_fid2, sop_data); if (rc) @@ -270,7 +235,12 @@ repeat: GOTO(out_free_sop_data, rc); } - rc = md_intent_lock(tgt_exp, sop_data, lmm, lmmsize, it, flags, + CDEBUG(D_INODE, + "OPEN_INTENT with fid1="DFID", fid2="DFID", name='%s' -> mds #%d\n", + PFID(&sop_data->op_fid1), PFID(&sop_data->op_fid2), + sop_data->op_name, tgt->ltd_idx); + + rc = md_intent_lock(tgt->ltd_exp, sop_data, lmm, lmmsize, it, flags, reqp, cb_blocking, extra_lock_flags); if (rc == -ERESTART) { @@ -279,7 +249,7 @@ repeat: "Got -ERESTART during open!\n"); ptlrpc_req_finished(*reqp); *reqp = NULL; - it->d.lustre.it_data = 0; + it->d.lustre.it_data = NULL; /* * Directory got split. Time to update local object and repeat @@ -289,8 +259,8 @@ repeat: rc = lmv_handle_split(exp, &rpid); if (rc == 0) { /* We should reallocate child FID. */ - rc = lmv_alloc_slave_fids(obd, &rpid, op_data, - &sop_data->op_fid2); + rc = lmv_allocate_slaves(obd, &rpid, op_data, + &sop_data->op_fid2); if (rc == 0) goto repeat; } @@ -300,6 +270,15 @@ repeat: GOTO(out_free_sop_data, rc); /* + * Nothing is found, do not access body->fid1 as it is zero and thus + * pointless. + */ + if ((it->d.lustre.it_disposition & DISP_LOOKUP_NEG) && + !(it->d.lustre.it_disposition & DISP_OPEN_CREATE) && + !(it->d.lustre.it_disposition & DISP_OPEN_OPEN)) + GOTO(out_free_sop_data, rc = 0); + + /* * Okay, MDS has returned success. Probably name has been resolved in * remote inode. */ @@ -313,147 +292,148 @@ repeat: * this is normal situation, we should not print error here, * only debug info. 
*/ - CDEBUG(D_OTHER, "can't handle remote %s: dir "DFID"("DFID"):" + CDEBUG(D_INODE, "Can't handle remote %s: dir "DFID"("DFID"):" "%*s: %d\n", LL_IT2STR(it), PFID(&op_data->op_fid2), PFID(&rpid), op_data->op_namelen, op_data->op_name, rc); GOTO(out_free_sop_data, rc); } - /* - * Nothing is found, do not access body->fid1 as it is zero and thus - * pointless. + /* + * Caller may use attrs MDS returns on IT_OPEN lock request so, we have + * to update them for split dir. */ - if ((it->d.lustre.it_disposition & DISP_LOOKUP_NEG) && - !(it->d.lustre.it_disposition & DISP_OPEN_CREATE) && - !(it->d.lustre.it_disposition & DISP_OPEN_OPEN)) - GOTO(out_free_sop_data, rc = 0); - - /* caller may use attrs MDS returns on IT_OPEN lock request so, we have - * to update them for split dir */ - body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_DLM_REP); + body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY); LASSERT(body != NULL); - - /* could not find object, FID is not present in response. */ + + /* + * Could not find object, FID is not present in response. + */ if (!(body->valid & OBD_MD_FLID)) GOTO(out_free_sop_data, rc = 0); - obj = lmv_obj_grab(obd, &body->fid1); - if (!obj && (mea = lmv_get_mea(*reqp))) { - /* FIXME: capability for remote! */ - /* wow! this is split dir, we'd like to handle it */ - obj = lmv_obj_create(exp, &body->fid1, mea); - if (IS_ERR(obj)) - GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj)); + obj = lmv_object_find(obd, &body->fid1); + if (obj == NULL) { + /* + * XXX: Capability for remote call! + */ + mea = lmv_get_mea(*reqp); + if (mea != NULL) { + obj = lmv_object_create(exp, &body->fid1, mea); + if (IS_ERR(obj)) + GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj)); + } } if (obj) { - /* This is split dir and we'd want to get attrs. */ - CDEBUG(D_OTHER, "attrs from slaves for "DFID"\n", + /* + * This is split dir and we'd want to get attrs. 
+ */ + CDEBUG(D_INODE, "Slave attributes for "DFID"\n", PFID(&body->fid1)); rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1, cb_blocking, extra_lock_flags); - } else if (S_ISDIR(body->mode)) { - CDEBUG(D_OTHER, "object "DFID" has not lmv obj?\n", - PFID(&body->fid1)); + lmv_object_put(obj); } - - if (obj) - lmv_obj_put(obj); - EXIT; out_free_sop_data: OBD_FREE_PTR(sop_data); return rc; } -int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data, - void *lmm, int lmmsize, struct lookup_intent *it, - int flags, struct ptlrpc_request **reqp, - ldlm_blocking_callback cb_blocking, - int extra_lock_flags) +/* + * Handler for: getattr, lookup and revalidate cases. + */ +int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data, + void *lmm, int lmmsize, struct lookup_intent *it, + int flags, struct ptlrpc_request **reqp, + ldlm_blocking_callback cb_blocking, + int extra_lock_flags) { - struct lmv_obj *obj = NULL, *obj2 = NULL; - struct obd_device *obd = exp->exp_obd; - struct lu_fid rpid = op_data->op_fid1; - struct lmv_obd *lmv = &obd->u.lmv; - struct md_op_data *sop_data; - struct lmv_stripe_md *mea; - struct mdt_body *body; - mdsno_t mds; - int rc = 0; + struct obd_device *obd = exp->exp_obd; + struct lu_fid rpid = op_data->op_fid1; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_object *obj = NULL; + struct md_op_data *sop_data; + struct lmv_stripe_md *mea; + struct lmv_tgt_desc *tgt = NULL; + struct mdt_body *body; + int sidx; + int loop = 0; + int rc = 0; ENTRY; OBD_ALLOC_PTR(sop_data); if (sop_data == NULL) RETURN(-ENOMEM); - /* save op_data fro repeat case */ *sop_data = *op_data; - if (fid_is_sane(&op_data->op_fid2)) { - /* - * Caller wants to revalidate attrs of obj we have to revalidate - * slaves if requested object is split directory. 
- */ - CDEBUG(D_OTHER, "revalidate attrs for "DFID"\n", - PFID(&op_data->op_fid2)); +repeat: + ++loop; + LASSERT(loop <= 2); - rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds); - if (rc) - GOTO(out_free_sop_data, rc); -#if 0 - /* - * In fact, we do not need this with current intent_lock(), but - * it may change some day. - */ - obj = lmv_obj_grab(obd, &op_data->op_fid2); - if (obj) { - if (!lu_fid_eq(&op_data->op_fid1, &op_data->op_fid2)){ - rpid = obj->lo_inodes[mds].li_fid; - mds = obj->lo_inodes[mds].li_mds; - } - lmv_obj_put(obj); - } -#endif + obj = lmv_object_find(obd, &op_data->op_fid1); + if (obj && op_data->op_namelen) { + sidx = raw_name2idx(obj->lo_hashtype, + obj->lo_objcount, + (char *)op_data->op_name, + op_data->op_namelen); + rpid = obj->lo_stripes[sidx].ls_fid; + tgt = lmv_get_target(lmv, + obj->lo_stripes[sidx].ls_mds); + CDEBUG(D_INODE, + "Choose slave dir ("DFID") -> mds #%d\n", + PFID(&rpid), tgt->ltd_idx); + sop_data->op_bias &= ~MDS_CHECK_SPLIT; } else { - CDEBUG(D_OTHER, "INTENT getattr for %*s on "DFID"\n", - op_data->op_namelen, op_data->op_name, - PFID(&op_data->op_fid1)); - - rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds); - if (rc) - GOTO(out_free_sop_data, rc); - obj = lmv_obj_grab(obd, &op_data->op_fid1); - if (obj && op_data->op_namelen) { - int mea_idx; - - /* directory is already split. 
calculate mds */ - mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, - (char *)op_data->op_name, - op_data->op_namelen); - rpid = obj->lo_inodes[mea_idx].li_fid; - mds = obj->lo_inodes[mea_idx].li_mds; - sop_data->op_bias &= ~MDS_CHECK_SPLIT; - lmv_obj_put(obj); - - CDEBUG(D_OTHER, "forward to MDS #"LPU64" (slave "DFID")\n", - mds, PFID(&rpid)); - } else { - rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds); - if (rc) - GOTO(out_free_sop_data, rc); - sop_data->op_bias |= MDS_CHECK_SPLIT; - } + tgt = lmv_find_target(lmv, &op_data->op_fid1); + sop_data->op_bias |= MDS_CHECK_SPLIT; } + if (obj) + lmv_object_put(obj); + + if (IS_ERR(tgt)) + GOTO(out_free_sop_data, rc = PTR_ERR(tgt)); + + if (!fid_is_sane(&sop_data->op_fid2)) + fid_zero(&sop_data->op_fid2); + + CDEBUG(D_INODE, + "LOOKUP_INTENT with fid1="DFID", fid2="DFID + ", name='%s' -> mds #%d\n", + PFID(&sop_data->op_fid1), PFID(&sop_data->op_fid2), + sop_data->op_name ? sop_data->op_name : "", + tgt->ltd_idx); + sop_data->op_bias &= ~MDS_CROSS_REF; sop_data->op_fid1 = rpid; - rc = md_intent_lock(lmv->tgts[mds].ltd_exp, sop_data, lmm, - lmmsize, it, flags, reqp, cb_blocking, - extra_lock_flags); + rc = md_intent_lock(tgt->ltd_exp, sop_data, lmm, lmmsize, it, + flags, reqp, cb_blocking, extra_lock_flags); - LASSERTF(rc != -ERESTART, "GETATTR: Got unhandled -ERESTART!\n"); + if (rc == -ERESTART) { + LASSERT(*reqp != NULL); + DEBUG_REQ(D_WARNING|D_RPCTRACE, *reqp, + "Got -ERESTART during lookup!\n"); + ptlrpc_req_finished(*reqp); + *reqp = NULL; + it->d.lustre.it_data = 0; + + /* + * Directory got split since last update. This shouldn't be + * because splitting causes lock revocation, so revalidate had + * to fail and lookup on dir had to return mea. 
+ */ + LASSERT(obj == NULL); + + obj = lmv_object_create(exp, &rpid, NULL); + if (IS_ERR(obj)) + GOTO(out_free_sop_data, rc = PTR_ERR(obj)); + lmv_object_put(obj); + goto repeat; + } + if (rc < 0) GOTO(out_free_sop_data, rc); @@ -461,17 +441,13 @@ int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data, /* * This is split dir. In order to optimize things a bit, we * consider obj valid updating missing parts. - - * FIXME: do we need to return any lock here? It would be fine - * if we don't. This means that nobody should use UPDATE lock to - * notify about object * removal. */ - CDEBUG(D_OTHER, - "revalidate slaves for "DFID", rc %d\n", - PFID(&op_data->op_fid2), rc); + CDEBUG(D_INODE, + "Revalidate slaves for "DFID", rc %d\n", + PFID(&op_data->op_fid1), rc); LASSERT(fid_is_sane(&op_data->op_fid2)); - rc = lmv_revalidate_slaves(exp, reqp, &op_data->op_fid2, it, rc, + rc = lmv_revalidate_slaves(exp, reqp, &op_data->op_fid1, it, rc, cb_blocking, extra_lock_flags); GOTO(out_free_sop_data, rc); } @@ -480,8 +456,8 @@ int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data, GOTO(out_free_sop_data, rc); /* - * okay, MDS has returned success. Probably name has been resolved in - * remote inode. + * MDS has returned success. Probably name has been resolved in + * remote inode. Let's check this. */ rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp, cb_blocking, extra_lock_flags); @@ -495,319 +471,35 @@ int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data, if (it->d.lustre.it_disposition & DISP_LOOKUP_NEG) GOTO(out_free_sop_data, rc = 0); - LASSERT(*reqp); - LASSERT((*reqp)->rq_repmsg); + LASSERT(*reqp != NULL); + LASSERT((*reqp)->rq_repmsg != NULL); body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY); LASSERT(body != NULL); - /* could not find object, FID is not present in response. */ + /* + * Could not find object, FID is not present in response. 
+ */ if (!(body->valid & OBD_MD_FLID)) GOTO(out_free_sop_data, rc = 0); - obj2 = lmv_obj_grab(obd, &body->fid1); - - if (!obj2 && (mea = lmv_get_mea(*reqp))) { - - /* FIXME remote capability! */ - /* wow! this is split dir, we'd like to handle it. */ - obj2 = lmv_obj_create(exp, &body->fid1, mea); - if (IS_ERR(obj2)) - GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj2)); - } - - if (obj2) { - /* this is split dir and we'd want to get attrs */ - CDEBUG(D_OTHER, "attrs from slaves for "DFID", rc %d\n", - PFID(&body->fid1), rc); - - rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1, - cb_blocking, extra_lock_flags); - lmv_obj_put(obj2); - } - - EXIT; -out_free_sop_data: - OBD_FREE_PTR(sop_data); - return rc; -} - -/* this is not used currently */ -int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) -{ - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lustre_handle *lockh; - struct md_op_data *op_data; - struct ldlm_lock *lock; - struct mdt_body *body2; - struct mdt_body *body; - struct lmv_obj *obj; - int i, rc = 0; - ENTRY; - - LASSERT(reqp); - LASSERT(*reqp); - - /* - * Master is locked. we'd like to take locks on slaves and update - * attributes to be returned from the slaves it's important that lookup - * is called in two cases: - - * - for first time (dcache has no such a resolving yet). - - * ->d_revalidate() returned false. - - * Last case possible only if all the objs (master and all slaves aren't - * valid. 
- */ - - OBD_ALLOC_PTR(op_data); - if (op_data == NULL) - RETURN(-ENOMEM); - - body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY); - LASSERT(body != NULL); - - LASSERT((body->valid & OBD_MD_FLID) != 0); - obj = lmv_obj_grab(obd, &body->fid1); - LASSERT(obj != NULL); - - CDEBUG(D_OTHER, "lookup slaves for "DFID"\n", - PFID(&body->fid1)); - - lmv_obj_lock(obj); - - for (i = 0; i < obj->lo_objcount; i++) { - struct lu_fid fid = obj->lo_inodes[i].li_fid; - struct ptlrpc_request *req = NULL; - struct obd_export *tgt_exp; - struct lookup_intent it; - - if (lu_fid_eq(&fid, &obj->lo_fid)) - /* skip master obj */ - continue; - - CDEBUG(D_OTHER, "lookup slave "DFID"\n", PFID(&fid)); - - /* is obj valid? */ - memset(&it, 0, sizeof(it)); - it.it_op = IT_GETATTR; - - memset(op_data, 0, sizeof(*op_data)); - op_data->op_fid1 = fid; - op_data->op_fid2 = fid; - op_data->op_bias = MDS_CROSS_REF; - - tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds); - if (IS_ERR(tgt_exp)) - GOTO(cleanup, rc = PTR_ERR(tgt_exp)); - - rc = md_intent_lock(tgt_exp, op_data, NULL, 0, &it, 0, - &req, lmv_blocking_ast, 0); - - lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle; - if (rc > 0 && req == NULL) { - /* nice, this slave is valid */ - LASSERT(req == NULL); - CDEBUG(D_OTHER, "cached\n"); - goto release_lock; - } - - if (rc < 0) { - /* error during lookup */ - GOTO(cleanup, rc); - } - lock = ldlm_handle2lock(lockh); - LASSERT(lock); - - lock->l_ast_data = lmv_obj_get(obj); - - body2 = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); - LASSERT(body2 != NULL); - - obj->lo_inodes[i].li_size = body2->size; - - CDEBUG(D_OTHER, "fresh: %lu\n", - (unsigned long)obj->lo_inodes[i].li_size); - - LDLM_LOCK_PUT(lock); - - if (req) - ptlrpc_req_finished(req); -release_lock: - lmv_update_body(body, obj->lo_inodes + i); - - if (it.d.lustre.it_lock_mode) { - ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode); - it.d.lustre.it_lock_mode = 0; - } - } - - EXIT; -cleanup: - 
lmv_obj_unlock(obj); - lmv_obj_put(obj); - OBD_FREE_PTR(op_data); - return rc; -} - -int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data, - void *lmm, int lmmsize, struct lookup_intent *it, - int flags, struct ptlrpc_request **reqp, - ldlm_blocking_callback cb_blocking, - int extra_lock_flags) -{ - struct obd_device *obd = exp->exp_obd; - struct lu_fid rpid = op_data->op_fid1; - struct lmv_obd *lmv = &obd->u.lmv; - struct md_op_data *sop_data; - struct lmv_stripe_md *mea; - struct mdt_body *body; - struct lmv_obj *obj; - int rc, loop = 0; - int mea_idx; - mdsno_t mds; - ENTRY; - - OBD_ALLOC_PTR(sop_data); - if (sop_data == NULL) - RETURN(-ENOMEM); - - /* save op_data fro repeat case */ - *sop_data = *op_data; - - /* - * IT_LOOKUP is intended to produce name -> fid resolving (let's call - * this lookup below) or to confirm requested resolving is still valid - * (let's call this revalidation) fid_is_sane(&sop_data->op_fid2) specifies - * revalidation. - */ - if (fid_is_sane(&op_data->op_fid2)) { - /* - * This is revalidate: we have to check is LOOKUP lock still - * valid for given fid. Very important part is that we have to - * choose right mds because namespace is per mds. - */ - rpid = op_data->op_fid1; - obj = lmv_obj_grab(obd, &rpid); - if (obj) { - mea_idx = raw_name2idx(obj->lo_hashtype, - obj->lo_objcount, - (char *)op_data->op_name, - op_data->op_namelen); - rpid = obj->lo_inodes[mea_idx].li_fid; - mds = obj->lo_inodes[mea_idx].li_mds; - sop_data->op_bias &= ~MDS_CHECK_SPLIT; - lmv_obj_put(obj); - } else { - rc = lmv_fld_lookup(lmv, &rpid, &mds); - if (rc) - GOTO(out_free_sop_data, rc); - sop_data->op_bias |= MDS_CHECK_SPLIT; - } - - CDEBUG(D_OTHER, "revalidate lookup for "DFID" to #"LPU64" MDS\n", - PFID(&op_data->op_fid2), mds); - } else { -repeat: - ++loop; - LASSERT(loop <= 2); - - /* - * This is lookup. During lookup we have to update all the - * attributes, because returned values will be put in struct - * inode. 
+ obj = lmv_object_find(obd, &body->fid1); + if (obj == NULL) { + /* + * XXX: Remote capability is not handled. */ - obj = lmv_obj_grab(obd, &op_data->op_fid1); - if (obj) { - if (op_data->op_namelen) { - /* directory is already split. calculate mds */ - mea_idx = raw_name2idx(obj->lo_hashtype, - obj->lo_objcount, - (char *)op_data->op_name, - op_data->op_namelen); - rpid = obj->lo_inodes[mea_idx].li_fid; - mds = obj->lo_inodes[mea_idx].li_mds; - } - sop_data->op_bias &= ~MDS_CHECK_SPLIT; - lmv_obj_put(obj); - } else { - rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds); - if (rc) - GOTO(out_free_sop_data, rc); - sop_data->op_bias |= MDS_CHECK_SPLIT; - } - fid_zero(&sop_data->op_fid2); - } - - sop_data->op_bias &= ~MDS_CROSS_REF; - sop_data->op_fid1 = rpid; - - rc = md_intent_lock(lmv->tgts[mds].ltd_exp, sop_data, lmm, lmmsize, - it, flags, reqp, cb_blocking, extra_lock_flags); - if (rc > 0) { - LASSERT(fid_is_sane(&op_data->op_fid2)); - /* - * Very interesting. it seems object is still valid but for some - * reason llite calls lookup, not revalidate. - */ - CDEBUG(D_OTHER, "lookup for "DFID" and data should be uptodate\n", - PFID(&rpid)); - LASSERT(*reqp == NULL); - GOTO(out_free_sop_data, rc); - } - - if (rc == 0 && *reqp == NULL) { - /* once again, we're asked for lookup, not revalidate */ - CDEBUG(D_OTHER, "lookup for "DFID" and data should be uptodate\n", - PFID(&rpid)); - GOTO(out_free_sop_data, rc); - } - - if (rc == -ERESTART) { - LASSERT(*reqp != NULL); - DEBUG_REQ(D_WARNING|D_RPCTRACE, *reqp, - "Got -ERESTART during lookup!\n"); - ptlrpc_req_finished(*reqp); - *reqp = NULL; - it->d.lustre.it_data = 0; - /* - * Directory got split since last update. This shouldn't be - * because splitting causes lock revocation, so revalidate had - * to fail and lookup on dir had to return mea. 
- */ - CWARN("we haven't knew about directory splitting!\n"); - LASSERT(obj == NULL); - - obj = lmv_obj_create(exp, &rpid, NULL); - if (IS_ERR(obj)) - GOTO(out_free_sop_data, rc = PTR_ERR(obj)); - lmv_obj_put(obj); - goto repeat; - } - - if (rc < 0) - GOTO(out_free_sop_data, rc); - - /* - * Okay, MDS has returned success. Probably name has been resolved in - * remote inode. - */ - rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp, - cb_blocking, extra_lock_flags); - - if (rc == 0 && (mea = lmv_get_mea(*reqp))) { - /* Wow! This is split dir, we'd like to handle it. */ - body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY); - LASSERT(body != NULL); - LASSERT((body->valid & OBD_MD_FLID) != 0); - - obj = lmv_obj_grab(obd, &body->fid1); - if (!obj) { - obj = lmv_obj_create(exp, &body->fid1, mea); + mea = lmv_get_mea(*reqp); + if (mea != NULL) { + obj = lmv_object_create(exp, &body->fid1, mea); if (IS_ERR(obj)) GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj)); } - lmv_obj_put(obj); + } else { + CDEBUG(D_INODE, "Slave attributes for "DFID", rc %d\n", + PFID(&body->fid1), rc); + + rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1, + cb_blocking, extra_lock_flags); + lmv_object_put(obj); } EXIT; @@ -823,13 +515,13 @@ int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data, int extra_lock_flags) { struct obd_device *obd = exp->exp_obd; - int rc; + int rc; ENTRY; LASSERT(it != NULL); LASSERT(fid_is_sane(&op_data->op_fid1)); - CDEBUG(D_OTHER, "INTENT LOCK '%s' for '%*s' on "DFID"\n", + CDEBUG(D_INODE, "INTENT LOCK '%s' for '%*s' on "DFID"\n", LL_IT2STR(it), op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1)); @@ -837,7 +529,7 @@ int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data, if (rc) RETURN(rc); - if (it->it_op & IT_LOOKUP) + if (it->it_op & (IT_LOOKUP | IT_GETATTR)) rc = lmv_intent_lookup(exp, op_data, lmm, lmmsize, it, flags, reqp, cb_blocking, extra_lock_flags); @@ -845,10 +537,6 @@ int 
lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data, rc = lmv_intent_open(exp, op_data, lmm, lmmsize, it, flags, reqp, cb_blocking, extra_lock_flags); - else if (it->it_op & IT_GETATTR) - rc = lmv_intent_getattr(exp, op_data,lmm, lmmsize, it, - flags, reqp, cb_blocking, - extra_lock_flags); else LBUG(); RETURN(rc); @@ -859,98 +547,103 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, int master_valid, ldlm_blocking_callback cb_blocking, int extra_lock_flags) { - struct obd_device *obd = exp->exp_obd; - struct ptlrpc_request *mreq = *reqp; - struct lmv_obd *lmv = &obd->u.lmv; - struct lustre_handle master_lockh; - struct obd_export *tgt_exp; - struct md_op_data *op_data; - struct ldlm_lock *lock; - unsigned long size = 0; - struct mdt_body *body; - struct lmv_obj *obj; - int master_lock_mode; - int i, rc = 0; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + int master_lockm = 0; + struct lustre_handle *lockh = NULL; + struct ptlrpc_request *mreq = *reqp; + struct lustre_handle master_lockh; + struct md_op_data *op_data; + struct ldlm_lock *lock; + unsigned long size = 0; + struct mdt_body *body; + struct lmv_object *obj; + int i; + int rc = 0; + struct lu_fid fid; + struct ptlrpc_request *req; + ldlm_blocking_callback cb; + struct lookup_intent it; + struct lmv_tgt_desc *tgt; + int master; ENTRY; + CDEBUG(D_INODE, "Revalidate master obj "DFID"\n", PFID(mid)); + OBD_ALLOC_PTR(op_data); if (op_data == NULL) RETURN(-ENOMEM); /* * We have to loop over the subobjects, check validity and update them - * from MDSs if needed. it's very useful that we need not to update all - * the fields. say, common fields (that are equal on all the subojects + * from MDS if needed. It's very useful that we need not to update all + * the fields. Say, common fields (that are equal on all the subojects * need not to be update, another fields (i_size, for example) are * cached all the time. 
*/ - obj = lmv_obj_grab(obd, mid); - LASSERT(obj != NULL); - - master_lock_mode = 0; - - lmv_obj_lock(obj); + obj = lmv_object_find_lock(obd, mid); + if (obj == NULL) + RETURN(-EALREADY); for (i = 0; i < obj->lo_objcount; i++) { - struct lu_fid fid = obj->lo_inodes[i].li_fid; - struct lustre_handle *lockh = NULL; - struct ptlrpc_request *req = NULL; - ldlm_blocking_callback cb; - struct lookup_intent it; - int master = 0; - - CDEBUG(D_OTHER, "revalidate subobj "DFID"\n", - PFID(&fid)); + fid = obj->lo_stripes[i].ls_fid; + master = lu_fid_eq(&fid, &obj->lo_fid); + cb = master ? cb_blocking : lmv_blocking_ast; - memset(op_data, 0, sizeof(*op_data)); + /* + * We need i_size and we would like to check possible cached locks, + * so this is an IT_GETATTR intent. + */ memset(&it, 0, sizeof(it)); it.it_op = IT_GETATTR; - cb = lmv_blocking_ast; - - if (lu_fid_eq(&fid, &obj->lo_fid)) { - if (master_valid) { - /* - * lmv_intent_getattr() already checked - * validness and took the lock. - */ - if (mreq) { - /* - * It even got the reply refresh attrs - * from that reply. - */ - body = req_capsule_server_get( - &mreq->rq_pill, - &RMF_MDT_BODY); - LASSERT(body != NULL); - goto update; - } - /* take already cached attrs into account */ - CDEBUG(D_OTHER, - "master is locked and cached\n"); - goto release_lock; + if (master && master_valid) { + /* + * lmv_intent_lookup() already checked + * validness and took the lock. + */ + if (mreq != NULL) { + body = req_capsule_server_get(&mreq->rq_pill, + &RMF_MDT_BODY); + LASSERT(body != NULL); + goto update; } - master = 1; - cb = cb_blocking; + /* + * Take already cached attrs into account. + */ + CDEBUG(D_INODE, + "Master "DFID"is locked and cached\n", + PFID(mid)); + goto release_lock; } + /* + * Prepare op_data for revalidating. Note that @fid2 should be + * defined otherwise it will go to server and take new lock + * which is what we really do not need here.
+ */ + memset(op_data, 0, sizeof(*op_data)); + op_data->op_bias = MDS_CROSS_REF; op_data->op_fid1 = fid; op_data->op_fid2 = fid; - op_data->op_bias = MDS_CROSS_REF; + req = NULL; - /* Is obj valid? */ - tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds); - if (IS_ERR(tgt_exp)) - GOTO(cleanup, rc = PTR_ERR(tgt_exp)); + tgt = lmv_get_target(lmv, obj->lo_stripes[i].ls_mds); + if (IS_ERR(tgt)) + GOTO(cleanup, rc = PTR_ERR(tgt)); - rc = md_intent_lock(tgt_exp, op_data, NULL, 0, &it, 0, &req, cb, - extra_lock_flags); + CDEBUG(D_INODE, "Revalidate slave obj "DFID" -> mds #%d\n", + PFID(&fid), tgt->ltd_idx); + + rc = md_intent_lock(tgt->ltd_exp, op_data, NULL, 0, &it, 0, + &req, cb, extra_lock_flags); lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle; if (rc > 0 && req == NULL) { - /* Nice, this slave is valid */ - LASSERT(req == NULL); - CDEBUG(D_OTHER, "cached\n"); + /* + * Nice, this slave is valid. + */ + CDEBUG(D_INODE, "Cached slave "DFID"\n", PFID(&fid)); goto release_lock; } @@ -958,17 +651,21 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, GOTO(cleanup, rc); if (master) { - LASSERT(master_valid == 0); - /* Save lock on master to be returned to the caller. */ - CDEBUG(D_OTHER, "no lock on master yet\n"); + /* + * Save lock on master to be returned to the caller. + */ + CDEBUG(D_INODE, "No lock on master "DFID" yet\n", + PFID(mid)); memcpy(&master_lockh, lockh, sizeof(master_lockh)); - master_lock_mode = it.d.lustre.it_lock_mode; + master_lockm = it.d.lustre.it_lock_mode; it.d.lustre.it_lock_mode = 0; } else { - /* This is slave. We want to control it. */ + /* + * This is slave. We want to control it. 
+ */ lock = ldlm_handle2lock(lockh); LASSERT(lock != NULL); - lock->l_ast_data = lmv_obj_get(obj); + lock->l_ast_data = lmv_object_get(obj); LDLM_LOCK_PUT(lock); } @@ -977,7 +674,7 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, * This is first reply, we'll use it to return updated * data back to the caller. */ - LASSERT(req); + LASSERT(req != NULL); ptlrpc_request_addref(req); *reqp = req; } @@ -986,17 +683,17 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, LASSERT(body != NULL); update: - obj->lo_inodes[i].li_size = body->size; + obj->lo_stripes[i].ls_size = body->size; - CDEBUG(D_OTHER, "fresh: %lu\n", - (unsigned long)obj->lo_inodes[i].li_size); + CDEBUG(D_INODE, "Fresh size %lu from "DFID"\n", + (unsigned long)obj->lo_stripes[i].ls_size, PFID(&fid)); if (req) ptlrpc_req_finished(req); release_lock: - size += obj->lo_inodes[i].li_size; + size += obj->lo_stripes[i].ls_size; - if (it.d.lustre.it_lock_mode) { + if (it.d.lustre.it_lock_mode && lockh) { ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode); it.d.lustre.it_lock_mode = 0; } @@ -1007,12 +704,11 @@ release_lock: * Some attrs got refreshed, we have reply and it's time to put * fresh attrs to it. */ - CDEBUG(D_OTHER, "return refreshed attrs: size = %lu\n", - (unsigned long)size); + CDEBUG(D_INODE, "Return refreshed attrs: size = %lu for "DFID"\n", + (unsigned long)size, PFID(mid)); body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY); LASSERT(body != NULL); - body->size = size; if (mreq == NULL) { @@ -1025,21 +721,54 @@ release_lock: } if (master_valid == 0) { oit->d.lustre.it_lock_handle = master_lockh.cookie; - oit->d.lustre.it_lock_mode = master_lock_mode; + oit->d.lustre.it_lock_mode = master_lockm; } rc = 0; } else { - /* It seems all the attrs are fresh and we did no request */ - CDEBUG(D_OTHER, "all the attrs were fresh\n"); + /* + * It seems all the attrs are fresh and we did no request. 
+ */ + CDEBUG(D_INODE, "All the attrs were fresh on "DFID"\n", + PFID(mid)); if (master_valid == 0) - oit->d.lustre.it_lock_mode = master_lock_mode; + oit->d.lustre.it_lock_mode = master_lockm; rc = 1; } EXIT; cleanup: OBD_FREE_PTR(op_data); - lmv_obj_unlock(obj); - lmv_obj_put(obj); + lmv_object_put_unlock(obj); return rc; } + +int lmv_allocate_slaves(struct obd_device *obd, struct lu_fid *pid, + struct md_op_data *op, struct lu_fid *fid) +{ + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_object *obj; + mdsno_t mds; + int sidx; + int rc; + ENTRY; + + obj = lmv_object_find(obd, pid); + if (obj == NULL) + RETURN(-EALREADY); + + sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, + (char *)op->op_name, op->op_namelen); + mds = obj->lo_stripes[sidx].ls_mds; + lmv_object_put(obj); + + rc = __lmv_fid_alloc(lmv, fid, mds); + if (rc) { + CERROR("Can't allocate fid, rc %d\n", rc); + RETURN(rc); + } + + CDEBUG(D_INODE, "Allocate new fid "DFID" for slave " + "obj -> mds #"LPU64"\n", PFID(fid), mds); + + RETURN(rc); +} diff --git a/lustre/lmv/lmv_internal.h b/lustre/lmv/lmv_internal.h index c898acf..818d53d 100644 --- a/lustre/lmv/lmv_internal.h +++ b/lustre/lmv/lmv_internal.h @@ -40,15 +40,6 @@ #include #include -#ifndef __KERNEL__ -/* XXX: dirty hack, needs to be fixed more clever way. */ -struct qstr { - const char *name; - size_t len; - unsigned hashval; -}; -#endif - #define LMV_MAX_TGT_COUNT 128 #define lmv_init_lock(lmv) down(&lmv->init_sem); @@ -57,66 +48,108 @@ struct qstr { #define LL_IT2STR(it) \ ((it) ? ldlm_it2str((it)->it_op) : "0") -struct lmv_inode { - struct lu_fid li_fid; /* id of dirobj */ - mdsno_t li_mds; /* cached mdsno where @li_fid lives */ - unsigned long li_size; /* slave size value */ - int li_flags; +struct lmv_stripe { + /** + * Dir stripe fid. + */ + struct lu_fid ls_fid; + /** + * Cached home mds number for @ls_fid. + */ + mdsno_t ls_mds; + /** + * Stripe object size. + */ + unsigned long ls_size; + /** + * Stripe flags.
+ */ + int ls_flags; }; -#define O_FREEING (1 << 0) - -struct lmv_obj { - struct list_head lo_list; - struct semaphore lo_guard; - int lo_state; /* object state. */ - atomic_t lo_count; /* ref counter. */ - struct lu_fid lo_fid; /* master id of dir */ - void *lo_update; /* bitmap of status (up-to-date) */ - __u32 lo_hashtype; - int lo_objcount; /* number of slaves */ - struct lmv_inode *lo_inodes; /* array of sub-objs */ - struct obd_device *lo_obd; /* pointer to LMV itself */ +#define O_FREEING (1 << 0) + +struct lmv_object { + /** + * Link to global objects list. + */ + struct list_head lo_list; + /** + * Sema for protecting fields. + */ + struct semaphore lo_guard; + /** + * Object state like O_FREEING. + */ + int lo_state; + /** + * Object ref counter. + */ + atomic_t lo_count; + /** + * Object master fid. + */ + struct lu_fid lo_fid; + /** + * Object hash type to find stripe by name. + */ + __u32 lo_hashtype; + /** + * Number of stripes. + */ + int lo_objcount; + /** + * Array of sub-objs. + */ + struct lmv_stripe *lo_stripes; + /** + * Pointer to LMV obd. 
+ */ + struct obd_device *lo_obd; }; -int lmv_obj_setup(struct obd_device *obd); -void lmv_obj_cleanup(struct obd_device *obd); +int lmv_object_setup(struct obd_device *obd); +void lmv_object_cleanup(struct obd_device *obd); static inline void -lmv_obj_lock(struct lmv_obj *obj) +lmv_object_lock(struct lmv_object *obj) { LASSERT(obj); down(&obj->lo_guard); } static inline void -lmv_obj_unlock(struct lmv_obj *obj) +lmv_object_unlock(struct lmv_object *obj) { LASSERT(obj); up(&obj->lo_guard); } -void lmv_obj_add(struct lmv_obj *obj); -void lmv_obj_del(struct lmv_obj *obj); +void lmv_object_add(struct lmv_object *obj); +void lmv_object_del(struct lmv_object *obj); + +void lmv_object_put(struct lmv_object *obj); +void lmv_object_put_unlock(struct lmv_object *obj); +void lmv_object_free(struct lmv_object *obj); -void lmv_obj_put(struct lmv_obj *obj); -void lmv_obj_free(struct lmv_obj *obj); +struct lmv_object *lmv_object_get(struct lmv_object *obj); -struct lmv_obj *lmv_obj_get(struct lmv_obj *obj); +struct lmv_object *lmv_object_find(struct obd_device *obd, + const struct lu_fid *fid); -struct lmv_obj *lmv_obj_grab(struct obd_device *obd, - const struct lu_fid *fid); +struct lmv_object *lmv_object_find_lock(struct obd_device *obd, + const struct lu_fid *fid); -struct lmv_obj *lmv_obj_alloc(struct obd_device *obd, - const struct lu_fid *fid, - struct lmv_stripe_md *mea); +struct lmv_object *lmv_object_alloc(struct obd_device *obd, + const struct lu_fid *fid, + struct lmv_stripe_md *mea); -struct lmv_obj *lmv_obj_create(struct obd_export *exp, - const struct lu_fid *fid, - struct lmv_stripe_md *mea); +struct lmv_object *lmv_object_create(struct obd_export *exp, + const struct lu_fid *fid, + struct lmv_stripe_md *mea); -int lmv_obj_delete(struct obd_export *exp, - const struct lu_fid *fid); +int lmv_object_delete(struct obd_export *exp, + const struct lu_fid *fid); int lmv_check_connect(struct obd_device *obd); @@ -138,11 +171,8 @@ int lmv_intent_open(struct obd_export 
*exp, struct md_op_data *op_data, ldlm_blocking_callback cb_blocking, int extra_lock_flags); -int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data, - void *lmm, int lmmsize, struct lookup_intent *it, - int flags, struct ptlrpc_request **reqp, - ldlm_blocking_callback cb_blocking, - int extra_lock_flags); +int lmv_allocate_slaves(struct obd_device *obd, struct lu_fid *pid, + struct md_op_data *op, struct lu_fid *fid); int lmv_revalidate_slaves(struct obd_export *, struct ptlrpc_request **, const struct lu_fid *, struct lookup_intent *, int, @@ -158,34 +188,31 @@ int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid, mdsno_t mds); int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid, struct md_op_data *op_data); -int lmv_alloc_slave_fids(struct obd_device *obd, struct lu_fid *pid, - struct md_op_data *op, struct lu_fid *fid); -static inline struct lmv_stripe_md * -lmv_get_mea(struct ptlrpc_request *req) +static inline struct lmv_stripe_md *lmv_get_mea(struct ptlrpc_request *req) { - struct mdt_body *body; - struct lmv_stripe_md *mea; + struct mdt_body *body; + struct lmv_stripe_md *mea; - LASSERT(req); + LASSERT(req != NULL); body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); - if (!body || !S_ISDIR(body->mode) || !body->eadatasize) - return NULL; + if (!body || !S_ISDIR(body->mode) || !body->eadatasize) + return NULL; mea = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, body->eadatasize); - LASSERT(mea != NULL); + LASSERT(mea != NULL); - if (mea->mea_count == 0) - return NULL; + if (mea->mea_count == 0) + return NULL; if( mea->mea_magic != MEA_MAGIC_LAST_CHAR && mea->mea_magic != MEA_MAGIC_ALL_CHARS && mea->mea_magic != MEA_MAGIC_HASH_SEGMENT) return NULL; - - return mea; + + return mea; } static inline int lmv_get_easize(struct lmv_obd *lmv) @@ -201,12 +228,6 @@ lmv_get_target(struct lmv_obd *lmv, mdsno_t mds) return &lmv->tgts[mds]; } -static inline struct obd_export * -lmv_get_export(struct lmv_obd *lmv, 
mdsno_t mds) -{ - return lmv_get_target(lmv, mds)->ltd_exp; -} - static inline struct lmv_tgt_desc * lmv_find_target(struct lmv_obd *lmv, const struct lu_fid *fid) { @@ -220,22 +241,6 @@ lmv_find_target(struct lmv_obd *lmv, const struct lu_fid *fid) return lmv_get_target(lmv, mds); } -static inline struct obd_export * -lmv_find_export(struct lmv_obd *lmv, const struct lu_fid *fid) -{ - struct lmv_tgt_desc *tgt = lmv_find_target(lmv, fid); - if (IS_ERR(tgt)) - return (struct obd_export *)tgt; - return tgt->ltd_exp; -} - -static inline void lmv_update_body(struct mdt_body *body, - struct lmv_inode *lino) -{ - /* update object size */ - body->size += lino->li_size; -} - /* lproc_lmv.c */ #ifdef LPROCFS void lprocfs_lmv_init_vars(struct lprocfs_static_vars *lvars); diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index 4a30926..5417c67 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -64,8 +64,8 @@ #include "lmv_internal.h" /* object cache. */ -cfs_mem_cache_t *obj_cache; -atomic_t obj_cache_count = ATOMIC_INIT(0); +cfs_mem_cache_t *lmv_object_cache; +atomic_t lmv_object_count = ATOMIC_INIT(0); static void lmv_activate_target(struct lmv_obd *lmv, struct lmv_tgt_desc *tgt, @@ -78,7 +78,8 @@ static void lmv_activate_target(struct lmv_obd *lmv, lmv->desc.ld_active_tgt_count += (activate ? 1 : -1); } -/* Error codes: +/** + * Error codes: * * -EINVAL : UUID can't be found in the LMV's target list * -ENOTCONN: The UUID is found, but the target connection is bad (!) 
@@ -87,9 +88,10 @@ static void lmv_activate_target(struct lmv_obd *lmv, static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid, int activate) { - struct lmv_tgt_desc *tgt; - struct obd_device *obd; - int i, rc = 0; + struct lmv_tgt_desc *tgt; + struct obd_device *obd; + int i; + int rc = 0; ENTRY; CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n", @@ -100,7 +102,7 @@ static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid, if (tgt->ltd_exp == NULL) continue; - CDEBUG(D_INFO, "lmv idx %d is %s conn "LPX64"\n", + CDEBUG(D_INFO, "Target idx %d is %s conn "LPX64"\n", i, tgt->ltd_uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie); if (obd_uuid_equals(uuid, &tgt->ltd_uuid)) @@ -125,11 +127,9 @@ static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid, GOTO(out_lmv_lock, rc); } - CDEBUG(D_INFO, "Marking OBD %p %sactive\n", - obd, activate ? "" : "in"); - + CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, + activate ? "" : "in"); lmv_activate_target(lmv, tgt, activate); - EXIT; out_lmv_lock: @@ -140,8 +140,8 @@ static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid, static int lmv_set_mdc_data(struct lmv_obd *lmv, struct obd_uuid *uuid, struct obd_connect_data *data) { - struct lmv_tgt_desc *tgt; - int i; + struct lmv_tgt_desc *tgt; + int i; ENTRY; LASSERT(data != NULL); @@ -169,9 +169,10 @@ struct obd_uuid *lmv_get_uuid(struct obd_export *exp) { static int lmv_notify(struct obd_device *obd, struct obd_device *watched, enum obd_notify_event ev, void *data) { - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_uuid *uuid; - int rc = 0; + struct obd_connect_data *conn_data; + struct lmv_obd *lmv = &obd->u.lmv; + struct obd_uuid *uuid; + int rc = 0; ENTRY; if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) { @@ -196,11 +197,11 @@ static int lmv_notify(struct obd_device *obd, struct obd_device *watched, RETURN(rc); } } else if (ev == OBD_NOTIFY_OCD) { - struct obd_connect_data *conn_data = - 
&watched->u.cli.cl_import->imp_connect_data; + conn_data = &watched->u.cli.cl_import->imp_connect_data; - /* Set connect data to desired target, update - * exp_connect_flags. */ + /* + * Set connect data to desired target, update exp_connect_flags. + */ rc = lmv_set_mdc_data(lmv, uuid, conn_data); if (rc) { CERROR("can't set connect data to target %s, rc %d\n", @@ -218,19 +219,25 @@ static int lmv_notify(struct obd_device *obd, struct obd_device *watched, } #if 0 else if (ev == OBD_NOTIFY_DISCON) { - /* For disconnect event, flush fld cache for failout MDS case. */ + /* + * For disconnect event, flush fld cache for failout MDS case. + */ fld_client_flush(&lmv->lmv_fld); } #endif - /* Pass the notification up the chain. */ + /* + * Pass the notification up the chain. + */ if (obd->obd_observer) rc = obd_notify(obd->obd_observer, watched, ev, data); RETURN(rc); } -/* this is fake connect function. Its purpose is to initialize lmv and say - * caller that everything is okay. Real connection will be performed later. */ +/** + * This is fake connect function. Its purpose is to initialize lmv and say + * caller that everything is okay. Real connection will be performed later. + */ static int lmv_connect(const struct lu_env *env, struct lustre_handle *conn, struct obd_device *obd, struct obd_uuid *cluuid, struct obd_connect_data *data, @@ -239,9 +246,9 @@ static int lmv_connect(const struct lu_env *env, #ifdef __KERNEL__ struct proc_dir_entry *lmv_proc_dir; #endif - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_export *exp; - int rc = 0; + struct lmv_obd *lmv = &obd->u.lmv; + struct obd_export *exp; + int rc = 0; ENTRY; rc = class_connect(conn, obd, cluuid); @@ -252,8 +259,10 @@ static int lmv_connect(const struct lu_env *env, exp = class_conn2export(conn); - /* we don't want to actually do the underlying connections more than - * once, so keep track. */ + /* + * We don't want to actually do the underlying connections more than + * once, so keep track. 
+ */ lmv->refcount++; if (lmv->refcount > 1) { class_export_put(exp); @@ -277,10 +286,12 @@ static int lmv_connect(const struct lu_env *env, } #endif - /* all real clients should perform actual connection right away, because + /* + * All real clients should perform actual connection right away, because * it is possible, that LMV will not have opportunity to connect targets * and MDC stuff will be called directly, for instance while reading - * ../mdc/../kbytesfree procfs file, etc. */ + * ../mdc/../kbytesfree procfs file, etc. + */ if (data->ocd_connect_flags & OBD_CONNECT_REAL) rc = lmv_check_connect(obd); @@ -296,9 +307,9 @@ static int lmv_connect(const struct lu_env *env, static void lmv_set_timeouts(struct obd_device *obd) { - struct lmv_tgt_desc *tgts; - struct lmv_obd *lmv; - int i; + struct lmv_tgt_desc *tgts; + struct lmv_obd *lmv; + int i; lmv = &obd->u.lmv; if (lmv->server_timeout == 0) @@ -319,9 +330,11 @@ static void lmv_set_timeouts(struct obd_device *obd) static int lmv_init_ea_size(struct obd_export *exp, int easize, int def_easize, int cookiesize) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - int i, rc = 0, change = 0; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + int i; + int rc = 0; + int change = 0; ENTRY; if (lmv->max_easize < easize) { @@ -363,28 +376,20 @@ static int lmv_init_ea_size(struct obd_export *exp, int easize, int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) { - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_uuid *cluuid = &lmv->cluuid; - struct obd_connect_data *mdc_data = NULL; - struct obd_uuid lmv_mdc_uuid = { "LMV_MDC_UUID" }; - struct lustre_handle conn = {0, }; - struct obd_device *mdc_obd; - struct obd_export *mdc_exp; - struct lu_fld_target target; - int rc; #ifdef __KERNEL__ - struct proc_dir_entry *lmv_proc_dir; + struct proc_dir_entry *lmv_proc_dir; #endif + struct lmv_obd *lmv = &obd->u.lmv; + struct obd_uuid *cluuid = 
&lmv->cluuid; + struct obd_connect_data *mdc_data = NULL; + struct obd_uuid lmv_mdc_uuid = { "LMV_MDC_UUID" }; + struct lustre_handle conn = {0, }; + struct obd_device *mdc_obd; + struct obd_export *mdc_exp; + struct lu_fld_target target; + int rc; ENTRY; - /* for MDS: don't connect to yourself */ - if (obd_uuid_equals(&tgt->ltd_uuid, cluuid)) { - CDEBUG(D_CONFIG, "don't connect back to %s\n", cluuid->uuid); - /* XXX - the old code didn't increment active tgt count. - * should we ? */ - RETURN(0); - } - mdc_obd = class_find_client_obd(&tgt->ltd_uuid, LUSTRE_MDC_NAME, &obd->obd_uuid); if (!mdc_obd) { @@ -411,12 +416,13 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) mdc_exp = class_conn2export(&conn); - /* Init fid sequence client for this mdc. */ + /* + * Init fid sequence client for this mdc and add new fld target. + */ rc = obd_fid_init(mdc_exp); if (rc) RETURN(rc); - /* Add new FLD target. */ target.ft_srv = NULL; target.ft_exp = mdc_exp; target.ft_idx = tgt->ltd_idx; @@ -434,7 +440,9 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) } if (obd->obd_observer) { - /* tell the mds_lmv about the new target */ + /* + * Tell the observer about the new target. + */ rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd, OBD_NOTIFY_ACTIVE, (void *)(tgt - lmv->tgts)); if (rc) { @@ -447,13 +455,15 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) tgt->ltd_exp = mdc_exp; lmv->desc.ld_active_tgt_count++; - /* copy connect data, it may be used later */ + /* + * Copy connect data, it may be used later. 
+ */ lmv->datas[tgt->ltd_idx] = *mdc_data; md_init_ea_size(tgt->ltd_exp, lmv->max_easize, lmv->max_def_easize, lmv->max_cookiesize); - CDEBUG(D_CONFIG, "connected to %s(%s) successfully (%d)\n", + CDEBUG(D_CONFIG, "Connected to %s(%s) successfully (%d)\n", mdc_obd->obd_name, mdc_obd->obd_uuid.uuid, atomic_read(&obd->obd_refcount)); @@ -472,7 +482,7 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) mdc_symlink = proc_symlink(mdc_obd->obd_name, lmv_proc_dir, name); if (mdc_symlink == NULL) { - CERROR("could not register LMV target " + CERROR("Could not register LMV target " "/proc/fs/lustre/%s/%s/target_obds/%s.", obd->obd_type->typ_name, obd->obd_name, mdc_obd->obd_name); @@ -486,18 +496,18 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) int lmv_add_target(struct obd_device *obd, struct obd_uuid *tgt_uuid) { - struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_obd *lmv = &obd->u.lmv; struct lmv_tgt_desc *tgt; - int rc = 0; + int rc = 0; ENTRY; - CDEBUG(D_CONFIG, "tgt_uuid: %s.\n", tgt_uuid->uuid); + CDEBUG(D_CONFIG, "Target uuid: %s.\n", tgt_uuid->uuid); lmv_init_lock(lmv); if (lmv->desc.ld_active_tgt_count >= LMV_MAX_TGT_COUNT) { lmv_init_unlock(lmv); - CERROR("can't add %s, LMV module compiled for %d MDCs. " + CERROR("Can't add %s, LMV module compiled for %d MDCs. " "That many MDCs already configured.\n", tgt_uuid->uuid, LMV_MAX_TGT_COUNT); RETURN(-EINVAL); @@ -543,12 +553,13 @@ int lmv_add_target(struct obd_device *obd, struct obd_uuid *tgt_uuid) RETURN(rc); } -/* performs a check if passed obd is connected. If no - connect it. 
*/ int lmv_check_connect(struct obd_device *obd) { - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_tgt_desc *tgt; - int i, rc, easize; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int i; + int rc; + int easize; ENTRY; if (lmv->connected) @@ -565,7 +576,7 @@ int lmv_check_connect(struct obd_device *obd) RETURN(-EINVAL); } - CDEBUG(D_CONFIG, "time to connect %s to %s\n", + CDEBUG(D_CONFIG, "Time to connect %s to %s\n", lmv->cluuid.uuid, obd->obd_name); LASSERT(lmv->tgts != NULL); @@ -593,7 +604,7 @@ int lmv_check_connect(struct obd_device *obd) --lmv->desc.ld_active_tgt_count; rc2 = obd_disconnect(tgt->ltd_exp); if (rc2) { - CERROR("error: LMV target %s disconnect on " + CERROR("LMV target %s disconnect on " "MDC idx %d: error %d\n", tgt->ltd_uuid.uuid, i, rc2); } @@ -607,11 +618,11 @@ int lmv_check_connect(struct obd_device *obd) static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) { #ifdef __KERNEL__ - struct proc_dir_entry *lmv_proc_dir; + struct proc_dir_entry *lmv_proc_dir; #endif - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_device *mdc_obd; - int rc; + struct lmv_obd *lmv = &obd->u.lmv; + struct obd_device *mdc_obd; + int rc; ENTRY; LASSERT(tgt != NULL); @@ -641,7 +652,7 @@ static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) if (rc) CERROR("Can't finanize fids factory\n"); - CDEBUG(D_OTHER, "Disconnected from %s(%s) successfully\n", + CDEBUG(D_INFO, "Disconnected from %s(%s) successfully\n", tgt->ltd_exp->exp_obd->obd_name, tgt->ltd_exp->exp_obd->obd_uuid.uuid); @@ -661,18 +672,21 @@ static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) static int lmv_disconnect(struct obd_export *exp) { - struct obd_device *obd = class_exp2obd(exp); + struct obd_device *obd = class_exp2obd(exp); #ifdef __KERNEL__ struct proc_dir_entry *lmv_proc_dir; #endif - struct lmv_obd *lmv = &obd->u.lmv; - int rc, i; + struct lmv_obd *lmv = &obd->u.lmv; + int rc; + int i; ENTRY; 
if (!lmv->tgts) goto out_local; - /* Only disconnect the underlying layers on the final disconnect. */ + /* + * Only disconnect the underlying layers on the final disconnect. + */ lmv->refcount--; if (lmv->refcount != 0) goto out_local; @@ -709,9 +723,11 @@ out_local: static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void *uarg) { - struct obd_device *obddev = class_exp2obd(exp); - struct lmv_obd *lmv = &obddev->u.lmv; - int i, rc = 0, set = 0; + struct obd_device *obddev = class_exp2obd(exp); + struct lmv_obd *lmv = &obddev->u.lmv; + int i; + int rc = 0; + int set = 0; ENTRY; if (lmv->desc.ld_tgt_count == 0) @@ -737,14 +753,12 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, if (!mdc_obd) RETURN(-EINVAL); - /* got statfs data */ rc = obd_statfs(mdc_obd, &stat_buf, cfs_time_current_64() - HZ, 0); if (rc) RETURN(rc); if (copy_to_user(data->ioc_pbuf1, &stat_buf, data->ioc_plen1)) RETURN(rc); - /* copy UUID */ rc = copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd), data->ioc_plen2); break; @@ -777,11 +791,6 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, RETURN(rc); } -enum MDS_POLICY { - CHAR_TYPE, - NID_TYPE -}; - static int lmv_all_chars_policy(int count, const char *name, int len) { @@ -795,51 +804,60 @@ static int lmv_all_chars_policy(int count, const char *name, static int lmv_nid_policy(struct lmv_obd *lmv) { - struct obd_import *imp = class_exp2cliimp(lmv->tgts[0].ltd_exp); - __u32 id; + struct obd_import *imp; + __u32 id; + /* - * XXX Hack: to get nid we assume that underlying obd device is mdc. + * XXX: To get nid we assume that underlying obd device is mdc. 
*/ + imp = class_exp2cliimp(lmv->tgts[0].ltd_exp); id = imp->imp_connection->c_self ^ (imp->imp_connection->c_self >> 32); return id % lmv->desc.ld_tgt_count; } static int lmv_choose_mds(struct lmv_obd *lmv, struct md_op_data *op_data, - int type) + placement_policy_t placement) { - switch (type) { - case CHAR_TYPE: + switch (placement) { + case PLACEMENT_CHAR_POLICY: return lmv_all_chars_policy(lmv->desc.ld_tgt_count, op_data->op_name, op_data->op_namelen); - case NID_TYPE: + case PLACEMENT_NID_POLICY: return lmv_nid_policy(lmv); default: break; } - CERROR("unsupport type %d \n", type); + CERROR("Unsupported placement policy %x\n", placement); return -EINVAL; } -/* This is _inode_ placement policy function (not name). */ +/** + * This is _inode_ placement policy function (not name). + */ static int lmv_placement_policy(struct obd_device *obd, struct md_op_data *op_data, mdsno_t *mds) { - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_obj *obj; - int rc; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_object *obj; + int rc; ENTRY; LASSERT(mds != NULL); + if (lmv->desc.ld_tgt_count == 1) { + *mds = 0; + RETURN(0); + } + /* * Allocate new fid on target according to operation type and parent * home mds. */ - obj = lmv_obj_grab(obd, &op_data->op_fid1); + obj = lmv_object_find(obd, &op_data->op_fid1); if (obj != NULL || op_data->op_name == NULL || op_data->op_opc != LUSTRE_OPC_MKDIR) { /* @@ -847,7 +865,7 @@ static int lmv_placement_policy(struct obd_device *obd, * dir is split. */ if (obj) { - lmv_obj_put(obj); + lmv_object_put(obj); /* * If we have this flag turned on, and we see that @@ -867,71 +885,78 @@ static int lmv_placement_policy(struct obd_device *obd, */ *mds = op_data->op_mds; rc = 0; - -#if 0 - /* XXX: This should be removed later wehn we sure it is not - * needed. */ - rc = lmv_fld_lookup(lmv, &op_data->op_fid1, mds); - if (rc) - GOTO(out, rc); -#endif } else { /* * Parent directory is not split and we want to create a * directory in it. 
Let's calculate where to place it according - * to name. + * to operation data @op_data. */ - *mds = lmv_choose_mds(lmv, op_data, NID_TYPE); + *mds = lmv_choose_mds(lmv, op_data, lmv->lmv_placement); rc = 0; } - EXIT; -#if 0 -out: -#endif + if (rc) { CERROR("Can't choose MDS, err = %d\n", rc); } else { LASSERT(*mds < lmv->desc.ld_tgt_count); } - return rc; + RETURN(rc); } int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid, mdsno_t mds) { - struct lmv_tgt_desc *tgt = &lmv->tgts[mds]; - int rc; + struct lmv_tgt_desc *tgt; + int rc; ENTRY; - /* New seq alloc and FLD setup should be atomic. */ + tgt = lmv_get_target(lmv, mds); + + /* + * New seq alloc and FLD setup should be atomic. Otherwise we may find + * on server that seq in new allocated fid is not yet known. + */ down(&tgt->ltd_fid_sem); - /* Asking underlaying tgt layer to allocate new fid. */ + if (!tgt->ltd_active) + GOTO(out, rc = -ENODEV); + + /* + * Asking underlaying tgt layer to allocate new fid. + */ rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL); if (rc > 0) { LASSERT(fid_is_sane(fid)); - /* Client switches to new sequence, setup FLD. */ + /* + * Client switches to new sequence, setup FLD. + */ rc = fld_client_create(&lmv->lmv_fld, fid_seq(fid), mds, NULL); if (rc) { + /* + * Delete just allocated fid sequence in case + * of fail back. 
+ */ CERROR("Can't create fld entry, rc %d\n", rc); - /* Delete just allocated fid sequence */ obd_fid_delete(tgt->ltd_exp, NULL); } } + + EXIT; +out: up(&tgt->ltd_fid_sem); - RETURN(rc); + return rc; } int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid, struct md_op_data *op_data) { - struct obd_device *obd = class_exp2obd(exp); - struct lmv_obd *lmv = &obd->u.lmv; - mdsno_t mds; - int rc; + struct obd_device *obd = class_exp2obd(exp); + struct lmv_obd *lmv = &obd->u.lmv; + mdsno_t mds; + int rc; ENTRY; LASSERT(op_data != NULL); @@ -956,10 +981,9 @@ int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid, static int lmv_fid_delete(struct obd_export *exp, const struct lu_fid *fid) { ENTRY; - - LASSERT(exp && fid); - if (lmv_obj_delete(exp, fid)) { - CDEBUG(D_OTHER, "lmv object "DFID" is destroyed.\n", + LASSERT(exp != NULL && fid != NULL); + if (lmv_object_delete(exp, fid)) { + CDEBUG(D_INODE, "Object "DFID" is destroyed.\n", PFID(fid)); } RETURN(0); @@ -967,10 +991,11 @@ static int lmv_fid_delete(struct obd_export *exp, const struct lu_fid *fid) static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg) { - struct lmv_obd *lmv = &obd->u.lmv; - struct lprocfs_static_vars lvars; - struct lmv_desc *desc; - int rc, i = 0; + struct lmv_obd *lmv = &obd->u.lmv; + struct lprocfs_static_vars lvars; + struct lmv_desc *desc; + int rc; + int i = 0; ENTRY; if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) { @@ -980,7 +1005,7 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg) desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1); if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) { - CERROR("descriptor size wrong: %d > %d\n", + CERROR("Lmv descriptor size wrong: %d > %d\n", (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1)); RETURN(-EINVAL); } @@ -1008,14 +1033,14 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg) lmv->max_cookiesize = 0; lmv->max_def_easize = 0; lmv->max_easize = 0; + lmv->lmv_placement = PLACEMENT_CHAR_POLICY; 
spin_lock_init(&lmv->lmv_lock); sema_init(&lmv->init_sem, 1); - rc = lmv_obj_setup(obd); + rc = lmv_object_setup(obd); if (rc) { - CERROR("Can't setup LMV object manager, " - "error %d.\n", rc); + CERROR("Can't setup LMV object manager, error %d.\n", rc); GOTO(out_free_datas, rc); } @@ -1026,14 +1051,13 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg) rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd_status", 0444, &lmv_proc_target_fops, obd); if (rc) - CWARN("Error adding the target_obd_status file\n"); + CWARN("Error adding target_obd_stats file (%d)\n", rc); } #endif rc = fld_client_init(&lmv->lmv_fld, obd->obd_name, LUSTRE_CLI_FLD_HASH_DHT); if (rc) { - CERROR("can't init FLD, err %d\n", - rc); + CERROR("Can't init FLD, err %d\n", rc); GOTO(out_free_datas, rc); } @@ -1050,12 +1074,12 @@ out_free_tgts: static int lmv_cleanup(struct obd_device *obd) { - struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_obd *lmv = &obd->u.lmv; ENTRY; fld_client_fini(&lmv->lmv_fld); lprocfs_obd_cleanup(obd); - lmv_obj_cleanup(obd); + lmv_object_cleanup(obd); OBD_FREE(lmv->datas, lmv->datas_size); OBD_FREE(lmv->tgts, lmv->tgts_size); @@ -1064,9 +1088,9 @@ static int lmv_cleanup(struct obd_device *obd) static int lmv_process_config(struct obd_device *obd, obd_count len, void *buf) { - struct lustre_cfg *lcfg = buf; - struct obd_uuid tgt_uuid; - int rc; + struct lustre_cfg *lcfg = buf; + struct obd_uuid tgt_uuid; + int rc; ENTRY; switch(lcfg->lcfg_command) { @@ -1089,9 +1113,10 @@ out: static int lmv_statfs(struct obd_device *obd, struct obd_statfs *osfs, __u64 max_age, __u32 flags) { - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_statfs *temp; - int rc = 0, i; + struct lmv_obd *lmv = &obd->u.lmv; + struct obd_statfs *temp; + int rc = 0; + int i; ENTRY; rc = lmv_check_connect(obd); @@ -1134,9 +1159,9 @@ static int lmv_getstatus(struct obd_export *exp, struct lu_fid *fid, struct obd_capa **pc) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd 
*lmv = &obd->u.lmv; - int rc; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + int rc; ENTRY; rc = lmv_check_connect(obd); @@ -1144,7 +1169,6 @@ static int lmv_getstatus(struct obd_export *exp, RETURN(rc); rc = md_getstatus(lmv->tgts[0].ltd_exp, fid, pc); - RETURN(rc); } @@ -1153,22 +1177,22 @@ static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid, const char *input, int input_size, int output_size, int flags, struct ptlrpc_request **request) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_export *tgt_exp; - int rc; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int rc; ENTRY; rc = lmv_check_connect(obd); if (rc) RETURN(rc); - tgt_exp = lmv_find_export(lmv, fid); - if (IS_ERR(tgt_exp)) - RETURN(PTR_ERR(tgt_exp)); + tgt = lmv_find_target(lmv, fid); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - rc = md_getxattr(tgt_exp, fid, oc, valid, name, input, input_size, - output_size, flags, request); + rc = md_getxattr(tgt->ltd_exp, fid, oc, valid, name, input, + input_size, output_size, flags, request); RETURN(rc); } @@ -1179,22 +1203,22 @@ static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid, int flags, __u32 suppgid, struct ptlrpc_request **request) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_export *tgt_exp; - int rc; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int rc; ENTRY; rc = lmv_check_connect(obd); if (rc) RETURN(rc); - tgt_exp = lmv_find_export(lmv, fid); - if (IS_ERR(tgt_exp)) - RETURN(PTR_ERR(tgt_exp)); + tgt = lmv_find_target(lmv, fid); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - rc = md_setxattr(tgt_exp, fid, oc, valid, name, - input, input_size, output_size, flags, suppgid, + rc = md_setxattr(tgt->ltd_exp, fid, oc, valid, name, input, + input_size, output_size, flags, suppgid, 
request); RETURN(rc); @@ -1204,28 +1228,29 @@ static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid, struct obd_capa *oc, obd_valid valid, int ea_size, struct ptlrpc_request **request) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_export *tgt_exp; - struct lmv_obj *obj; - int rc, i; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + struct lmv_object *obj; + int rc; + int i; ENTRY; rc = lmv_check_connect(obd); if (rc) RETURN(rc); - tgt_exp = lmv_find_export(lmv, fid); - if (IS_ERR(tgt_exp)) - RETURN(PTR_ERR(tgt_exp)); + tgt = lmv_find_target(lmv, fid); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - rc = md_getattr(tgt_exp, fid, oc, valid, ea_size, request); + rc = md_getattr(tgt->ltd_exp, fid, oc, valid, ea_size, request); if (rc) RETURN(rc); - obj = lmv_obj_grab(obd, fid); + obj = lmv_object_find_lock(obd, fid); - CDEBUG(D_OTHER, "GETATTR for "DFID" %s\n", PFID(fid), + CDEBUG(D_INODE, "GETATTR for "DFID" %s\n", PFID(fid), obj ? "(split)" : ""); /* @@ -1239,7 +1264,7 @@ static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid, struct mdt_body *body; if (*request == NULL) { - lmv_obj_put(obj); + lmv_object_put(obj); RETURN(rc); } @@ -1247,8 +1272,6 @@ static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid, &RMF_MDT_BODY); LASSERT(body != NULL); - lmv_obj_lock(obj); - for (i = 0; i < obj->lo_objcount; i++) { if (lmv->tgts[i].ltd_exp == NULL) { CWARN("%s: NULL export for %d\n", @@ -1256,15 +1279,16 @@ static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid, continue; } - /* skip master obj. */ - if (lu_fid_eq(&obj->lo_fid, &obj->lo_inodes[i].li_fid)) + /* + * Skip master object. 
+ */ + if (lu_fid_eq(&obj->lo_fid, &obj->lo_stripes[i].ls_fid)) continue; - lmv_update_body(body, &obj->lo_inodes[i]); + body->size += obj->lo_stripes[i].ls_size; } - lmv_obj_unlock(obj); - lmv_obj_put(obj); + lmv_object_put_unlock(obj); } RETURN(rc); @@ -1273,16 +1297,17 @@ static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid, static int lmv_change_cbdata(struct obd_export *exp, const struct lu_fid *fid, ldlm_iterator_t it, void *data) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - int i, rc; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + int i; + int rc; ENTRY; rc = lmv_check_connect(obd); if (rc) RETURN(rc); - CDEBUG(D_OTHER, "CBDATA for "DFID"\n", PFID(fid)); + CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid)); /* * With CMD every object can have two locks in different namespaces: @@ -1295,44 +1320,43 @@ static int lmv_change_cbdata(struct obd_export *exp, const struct lu_fid *fid, RETURN(0); } -static int lmv_close(struct obd_export *exp, - struct md_op_data *op_data, - struct md_open_data *mod, - struct ptlrpc_request **request) +static int lmv_close(struct obd_export *exp, struct md_op_data *op_data, + struct md_open_data *mod, struct ptlrpc_request **request) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_export *tgt_exp; - int rc; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int rc; ENTRY; rc = lmv_check_connect(obd); if (rc) RETURN(rc); - tgt_exp = lmv_find_export(lmv, &op_data->op_fid1); - if (IS_ERR(tgt_exp)) - RETURN(PTR_ERR(tgt_exp)); + tgt = lmv_find_target(lmv, &op_data->op_fid1); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - CDEBUG(D_OTHER, "CLOSE "DFID"\n", PFID(&op_data->op_fid1)); - rc = md_close(tgt_exp, op_data, mod, request); + CDEBUG(D_INODE, "CLOSE "DFID"\n", PFID(&op_data->op_fid1)); + rc = md_close(tgt->ltd_exp, op_data, mod, request); RETURN(rc); 
} -/* +/** * Called in the case MDS returns -ERESTART on create on open, what means that * directory is split and its LMV presentation object has to be updated. */ int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct ptlrpc_request *req = NULL; - struct obd_export *tgt_exp; - struct lmv_obj *obj; - struct lustre_md md; - int mealen, rc; - __u64 valid; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct ptlrpc_request *req = NULL; + struct lmv_tgt_desc *tgt; + struct lmv_object *obj; + struct lustre_md md; + int mealen; + int rc; + __u64 valid; ENTRY; md.mea = NULL; @@ -1340,35 +1364,35 @@ int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid) valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA; - tgt_exp = lmv_find_export(lmv, fid); - if (IS_ERR(tgt_exp)) - RETURN(PTR_ERR(tgt_exp)); + tgt = lmv_find_target(lmv, fid); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - /* time to update mea of parent fid */ - rc = md_getattr(tgt_exp, fid, NULL, valid, mealen, &req); + /* + * Time to update mea of parent fid. 
+ */ + rc = md_getattr(tgt->ltd_exp, fid, NULL, valid, mealen, &req); if (rc) { CERROR("md_getattr() failed, error %d\n", rc); GOTO(cleanup, rc); } - rc = md_get_lustre_md(tgt_exp, req, NULL, exp, &md); + rc = md_get_lustre_md(tgt->ltd_exp, req, NULL, exp, &md); if (rc) { - CERROR("mdc_get_lustre_md() failed, error %d\n", rc); + CERROR("md_get_lustre_md() failed, error %d\n", rc); GOTO(cleanup, rc); } if (md.mea == NULL) GOTO(cleanup, rc = -ENODATA); - obj = lmv_obj_create(exp, fid, md.mea); + obj = lmv_object_create(exp, fid, md.mea); if (IS_ERR(obj)) rc = PTR_ERR(obj); else - lmv_obj_put(obj); + lmv_object_put(obj); - /* XXX LOV STACKING */ obd_free_memmd(exp, (void *)&md.mea); - EXIT; cleanup: if (req) @@ -1381,11 +1405,13 @@ int lmv_create(struct obd_export *exp, struct md_op_data *op_data, __u32 gid, cfs_cap_t cap_effective, __u64 rdev, struct ptlrpc_request **request) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_export *tgt_exp; - struct lmv_obj *obj; - int rc, loop = 0; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + struct lmv_object *obj; + int rc; + int loop = 0; + int sidx; ENTRY; rc = lmv_check_connect(obd); @@ -1397,28 +1423,24 @@ int lmv_create(struct obd_export *exp, struct md_op_data *op_data, repeat: ++loop; LASSERT(loop <= 2); - obj = lmv_obj_grab(obd, &op_data->op_fid1); - if (obj) { - int mea_idx; - mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, + obj = lmv_object_find(obd, &op_data->op_fid1); + if (obj) { + sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, op_data->op_name, op_data->op_namelen); - op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid; + op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid; op_data->op_bias &= ~MDS_CHECK_SPLIT; - op_data->op_mds = obj->lo_inodes[mea_idx].li_mds; - tgt_exp = lmv_get_export(lmv, op_data->op_mds); - lmv_obj_put(obj); + op_data->op_mds = obj->lo_stripes[sidx].ls_mds; + tgt = 
lmv_get_target(lmv, op_data->op_mds); + lmv_object_put(obj); } else { - struct lmv_tgt_desc *tgt; - tgt = lmv_find_target(lmv, &op_data->op_fid1); op_data->op_bias |= MDS_CHECK_SPLIT; op_data->op_mds = tgt->ltd_idx; - tgt_exp = tgt->ltd_exp; } - if (IS_ERR(tgt_exp)) - RETURN(PTR_ERR(tgt_exp)); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data); if (rc == -ERESTART) @@ -1426,16 +1448,17 @@ repeat: else if (rc) RETURN(rc); - CDEBUG(D_OTHER, "CREATE '%*s' on "DFID"\n", op_data->op_namelen, - op_data->op_name, PFID(&op_data->op_fid1)); + CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #"LPU64"\n", + op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1), + op_data->op_mds); op_data->op_flags |= MF_MDC_CANCEL_FID1; - rc = md_create(tgt_exp, op_data, data, datalen, mode, uid, gid, + rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid, cap_effective, rdev, request); if (rc == 0) { if (*request == NULL) RETURN(rc); - CDEBUG(D_OTHER, "created - "DFID"\n", PFID(&op_data->op_fid1)); + CDEBUG(D_INODE, "Created - "DFID"\n", PFID(&op_data->op_fid2)); } else if (rc == -ERESTART) { LASSERT(*request != NULL); DEBUG_REQ(D_WARNING|D_RPCTRACE, *request, @@ -1449,8 +1472,8 @@ repeat: */ rc = lmv_handle_split(exp, &op_data->op_fid1); if (rc == 0) { - rc = lmv_alloc_slave_fids(obd, &op_data->op_fid1, - op_data, &op_data->op_fid2); + rc = lmv_allocate_slaves(obd, &op_data->op_fid1, + op_data, &op_data->op_fid2); if (rc) RETURN(rc); goto repeat; @@ -1463,21 +1486,21 @@ static int lmv_done_writing(struct obd_export *exp, struct md_op_data *op_data, struct md_open_data *mod) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_export *tgt_exp; - int rc; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int rc; ENTRY; rc = lmv_check_connect(obd); if (rc) RETURN(rc); - tgt_exp = lmv_find_export(lmv, &op_data->op_fid1); - if 
(IS_ERR(tgt_exp)) - RETURN(PTR_ERR(tgt_exp)); + tgt = lmv_find_target(lmv, &op_data->op_fid1); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - rc = md_done_writing(tgt_exp, op_data, mod); + rc = md_done_writing(tgt->ltd_exp, op_data, mod); RETURN(rc); } @@ -1486,12 +1509,13 @@ lmv_enqueue_slaves(struct obd_export *exp, struct ldlm_enqueue_info *einfo, struct lookup_intent *it, struct md_op_data *op_data, struct lustre_handle *lockh, void *lmm, int lmmsize) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_stripe_md *mea = op_data->op_mea1; - struct md_op_data *op_data2; - struct obd_export *tgt_exp; - int i, rc = 0; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_stripe_md *mea = op_data->op_mea1; + struct md_op_data *op_data2; + struct lmv_tgt_desc *tgt; + int i; + int rc = 0; ENTRY; OBD_ALLOC_PTR(op_data2); @@ -1504,17 +1528,17 @@ lmv_enqueue_slaves(struct obd_export *exp, struct ldlm_enqueue_info *einfo, op_data2->op_fid1 = mea->mea_ids[i]; op_data2->op_bias = 0; - tgt_exp = lmv_find_export(lmv, &op_data2->op_fid1); - if (IS_ERR(tgt_exp)) - GOTO(cleanup, rc = PTR_ERR(tgt_exp)); + tgt = lmv_find_target(lmv, &op_data2->op_fid1); + if (IS_ERR(tgt)) + GOTO(cleanup, rc = PTR_ERR(tgt)); - if (tgt_exp == NULL) + if (tgt->ltd_exp == NULL) continue; - rc = md_enqueue(tgt_exp, einfo, it, op_data2, + rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data2, lockh + i, lmm, lmmsize, NULL, 0); - CDEBUG(D_OTHER, "take lock on slave "DFID" -> %d/%d\n", + CDEBUG(D_INODE, "Take lock on slave "DFID" -> %d/%d\n", PFID(&mea->mea_ids[i]), rc, it->d.lustre.it_status); if (rc) @@ -1535,7 +1559,9 @@ cleanup: OBD_FREE_PTR(op_data2); if (rc != 0) { - /* drop all taken locks */ + /* + * Drop all taken locks. 
+ */ while (--i >= 0) { if (lockh[i].cookie) ldlm_lock_decref(lockh + i, einfo->ei_mode); @@ -1551,15 +1577,16 @@ lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo, struct lustre_handle *lockh, void *lmm, int lmmsize, int extra_lock_flags) { - struct ptlrpc_request *req = it->d.lustre.it_data; - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lustre_handle plock; - struct obd_export *tgt_exp; - struct md_op_data *rdata; - struct lu_fid fid_copy; - struct mdt_body *body; - int rc = 0, pmode; + struct ptlrpc_request *req = it->d.lustre.it_data; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lustre_handle plock; + struct lmv_tgt_desc *tgt; + struct md_op_data *rdata; + struct lu_fid fid1; + struct mdt_body *body; + int rc = 0; + int pmode; ENTRY; body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); @@ -1568,32 +1595,34 @@ lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo, if (!(body->valid & OBD_MD_MDS)) RETURN(0); - CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID" -> "DFID"\n", + CDEBUG(D_INODE, "REMOTE_ENQUEUE '%s' on "DFID" -> "DFID"\n", LL_IT2STR(it), PFID(&op_data->op_fid1), PFID(&body->fid1)); - /* We got LOOKUP lock, but we really need attrs */ + /* + * We got LOOKUP lock, but we really need attrs. 
+ */ pmode = it->d.lustre.it_lock_mode; LASSERT(pmode != 0); memcpy(&plock, lockh, sizeof(plock)); it->d.lustre.it_lock_mode = 0; it->d.lustre.it_data = NULL; - fid_copy = body->fid1; + fid1 = body->fid1; it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE; ptlrpc_req_finished(req); - tgt_exp = lmv_find_export(lmv, &fid_copy); - if (IS_ERR(tgt_exp)) - GOTO(out, rc = PTR_ERR(tgt_exp)); + tgt = lmv_find_target(lmv, &fid1); + if (IS_ERR(tgt)) + GOTO(out, rc = PTR_ERR(tgt)); OBD_ALLOC_PTR(rdata); if (rdata == NULL) GOTO(out, rc = -ENOMEM); - rdata->op_fid1 = fid_copy; + rdata->op_fid1 = fid1; rdata->op_bias = MDS_CROSS_REF; - rc = md_enqueue(tgt_exp, einfo, it, rdata, lockh, + rc = md_enqueue(tgt->ltd_exp, einfo, it, rdata, lockh, lmm, lmmsize, NULL, extra_lock_flags); OBD_FREE_PTR(rdata); EXIT; @@ -1608,70 +1637,73 @@ lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, struct lustre_handle *lockh, void *lmm, int lmmsize, struct ptlrpc_request **req, int extra_lock_flags) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_export *tgt_exp = NULL; - struct lmv_obj *obj; - int rc; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + struct lmv_object *obj; + int sidx; + int rc; ENTRY; rc = lmv_check_connect(obd); if (rc) RETURN(rc); + CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID"\n", + LL_IT2STR(it), PFID(&op_data->op_fid1)); + if (op_data->op_mea1 && it && it->it_op == IT_UNLINK) { rc = lmv_enqueue_slaves(exp, einfo, it, op_data, lockh, lmm, lmmsize); RETURN(rc); } - if (op_data->op_namelen) { - obj = lmv_obj_grab(obd, &op_data->op_fid1); - if (obj) { - int mea_idx; - - /* directory is split. 
look for right mds for this - * name */ - mea_idx = raw_name2idx(obj->lo_hashtype, - obj->lo_objcount, - (char *)op_data->op_name, - op_data->op_namelen); - op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid; - tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds); - lmv_obj_put(obj); - } + obj = lmv_object_find(obd, &op_data->op_fid1); + if (obj && op_data->op_namelen) { + sidx = raw_name2idx(obj->lo_hashtype, + obj->lo_objcount, + (char *)op_data->op_name, + op_data->op_namelen); + op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid; + tgt = lmv_get_target(lmv, obj->lo_stripes[sidx].ls_mds); + } else { + tgt = lmv_find_target(lmv, &op_data->op_fid1); } + if (obj) + lmv_object_put(obj); - if (tgt_exp == NULL) - tgt_exp = lmv_find_export(lmv, &op_data->op_fid1); - if (IS_ERR(tgt_exp)) - RETURN(PTR_ERR(tgt_exp)); - - CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID"\n", LL_IT2STR(it), - PFID(&op_data->op_fid1)); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - rc = md_enqueue(tgt_exp, einfo, it, op_data, lockh, + CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n", + LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx); + + rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data, lockh, lmm, lmmsize, req, extra_lock_flags); - if (rc == 0 && it && it->it_op == IT_OPEN) + if (rc == 0 && it && it->it_op == IT_OPEN) { rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh, lmm, lmmsize, extra_lock_flags); + } RETURN(rc); } static int lmv_getattr_name(struct obd_export *exp, const struct lu_fid *fid, - struct obd_capa *oc, const char *filename, int namelen, + struct obd_capa *oc, const char *name, int namelen, obd_valid valid, int ea_size, __u32 suppgid, struct ptlrpc_request **request) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lu_fid rid = *fid; - struct obd_export *tgt_exp; - struct mdt_body *body; - struct lmv_obj *obj; - int rc, loop = 0; + struct ptlrpc_request *req = NULL; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd 
*lmv = &obd->u.lmv; + struct lu_fid rid = *fid; + struct lmv_tgt_desc *tgt; + struct mdt_body *body; + struct lmv_object *obj; + int rc; + int loop = 0; + int sidx; ENTRY; rc = lmv_check_connect(obd); @@ -1681,28 +1713,25 @@ lmv_getattr_name(struct obd_export *exp, const struct lu_fid *fid, repeat: ++loop; LASSERT(loop <= 2); - obj = lmv_obj_grab(obd, &rid); + obj = lmv_object_find(obd, &rid); if (obj) { - int mea_idx; - - /* Directory is split. Look for right mds for this name */ - mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, - filename, namelen - 1); - rid = obj->lo_inodes[mea_idx].li_fid; - tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds); - lmv_obj_put(obj); + sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, + name, namelen - 1); + rid = obj->lo_stripes[sidx].ls_fid; + tgt = lmv_get_target(lmv, obj->lo_stripes[sidx].ls_mds); valid &= ~OBD_MD_FLCKSPLIT; + lmv_object_put(obj); } else { - tgt_exp = lmv_find_export(lmv, &rid); + tgt = lmv_find_target(lmv, &rid); valid |= OBD_MD_FLCKSPLIT; } - if (IS_ERR(tgt_exp)) - RETURN(PTR_ERR(tgt_exp)); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - CDEBUG(D_OTHER, "getattr_name for %*s on "DFID" -> "DFID"\n", - namelen, filename, PFID(fid), PFID(&rid)); + CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" - "DFID" -> mds #%d\n", + namelen, name, PFID(fid), PFID(&rid), tgt->ltd_idx); - rc = md_getattr_name(tgt_exp, &rid, oc, filename, namelen, valid, + rc = md_getattr_name(tgt->ltd_exp, &rid, oc, name, namelen, valid, ea_size, suppgid, request); if (rc == 0) { body = req_capsule_server_get(&(*request)->rq_pill, @@ -1710,20 +1739,18 @@ repeat: LASSERT(body != NULL); if (body->valid & OBD_MD_MDS) { - struct ptlrpc_request *req = NULL; - rid = body->fid1; - CDEBUG(D_OTHER, "request attrs for "DFID"\n", + CDEBUG(D_INODE, "Request attrs for "DFID"\n", PFID(&rid)); - tgt_exp = lmv_find_export(lmv, &rid); - if (IS_ERR(tgt_exp)) { + tgt = lmv_find_target(lmv, &rid); + if (IS_ERR(tgt)) { 
ptlrpc_req_finished(*request); - RETURN(PTR_ERR(tgt_exp)); + RETURN(PTR_ERR(tgt)); } - rc = md_getattr_name(tgt_exp, &rid, NULL, NULL, 1, - valid | OBD_MD_FLCROSSREF, + rc = md_getattr_name(tgt->ltd_exp, &rid, NULL, NULL, + 1, valid | OBD_MD_FLCROSSREF, ea_size, suppgid, &req); ptlrpc_req_finished(*request); *request = req; @@ -1753,90 +1780,100 @@ repeat: fl == MF_MDC_CANCEL_FID4 ? &op_data->op_fid4 : \ NULL) -/* @tgt_exp is the export the metadata request is sent. - * @fid_exp is the export the cancel should be sent for the current fid. - * if @fid_exp is NULL, the export is found for the current fid. - * @op_data keeps the current fid, which is pointed through @flag. - * @mode, @bits -- lock match parameters. */ -static int lmv_early_cancel(struct lmv_obd *lmv, struct obd_export *tgt_exp, - struct obd_export *fid_exp, - struct md_op_data *op_data, - ldlm_mode_t mode, int bits, int flag) +static int lmv_early_cancel_slaves(struct obd_export *exp, + struct md_op_data *op_data, int op_tgt, + ldlm_mode_t mode, int bits, int flag) { - struct lu_fid *fid = md_op_data_fid(op_data, flag); - ldlm_policy_data_t policy = {{0}}; - int rc = 0; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + ldlm_policy_data_t policy = {{0}}; + struct lu_fid *op_fid; + struct lu_fid *st_fid; + struct lmv_tgt_desc *tgt; + struct lmv_object *obj; + int rc = 0; + int i; ENTRY; - if (!fid_is_sane(fid)) + op_fid = md_op_data_fid(op_data, flag); + if (!fid_is_sane(op_fid)) RETURN(0); - if (fid_exp == NULL) - fid_exp = lmv_find_export(lmv, fid); - - if (tgt_exp == fid_exp) { - /* The export is the same as on the target server, cancel - * will be sent along with the main metadata operation. 
*/ - op_data->op_flags |= flag; - RETURN(0); - } - + obj = lmv_object_find(obd, op_fid); + if (obj == NULL) + RETURN(-EALREADY); + policy.l_inodebits.bits = bits; - rc = md_cancel_unused(fid_exp, fid, &policy, mode, LDLM_FL_ASYNC, NULL); - RETURN(rc); + for (i = 0; i < obj->lo_objcount; i++) { + tgt = lmv_get_target(lmv, obj->lo_stripes[i].ls_mds); + st_fid = &obj->lo_stripes[i].ls_fid; + if (op_tgt != tgt->ltd_idx) { + CDEBUG(D_INODE, "EARLY_CANCEL slave "DFID" -> mds #%d\n", + PFID(st_fid), tgt->ltd_idx); + rc = md_cancel_unused(tgt->ltd_exp, st_fid, &policy, + mode, LDLM_FL_ASYNC, NULL); + if (rc) + GOTO(out_put_obj, rc); + } else { + CDEBUG(D_INODE, + "EARLY_CANCEL skip operation target %d on "DFID"\n", + op_tgt, PFID(st_fid)); + /* + * Do not cancel locks for operation target, they will + * be handled later in underlaying layer when calling + * function we run on behalf of. + */ + *op_fid = *st_fid; + op_data->op_flags |= flag; + } + } + EXIT; +out_put_obj: + lmv_object_put(obj); + return rc; } -#ifdef EARLY_CANCEL_FOR_STRIPED_DIR_IS_READY -/* Check if the fid in @op_data pointed to by flag is of the same export(s) - * as @tgt_exp. Early cancels will be sent later by mdc code, otherwise, call - * md_cancel_unused for child export(s). 
*/ -static int lmv_early_cancel_stripes(struct obd_export *exp, - struct obd_export *tgt_exp, - struct md_op_data *op_data, - ldlm_mode_t mode, int bits, int flag) +static int lmv_early_cancel(struct obd_export *exp, struct md_op_data *op_data, + int op_tgt, ldlm_mode_t mode, int bits, int flag) { - struct lu_fid *fid = md_op_data_fid(op_data, flag); - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_export *st_exp; - struct lmv_obj *obj; - int rc = 0; + struct lu_fid *fid = md_op_data_fid(op_data, flag); + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + ldlm_policy_data_t policy = {{0}}; + struct lmv_object *obj; + int rc = 0; ENTRY; if (!fid_is_sane(fid)) RETURN(0); - obj = lmv_obj_grab(obd, fid); + obj = lmv_object_find(obd, fid); if (obj) { - ldlm_policy_data_t policy = {{0}}; - struct lu_fid *st_fid; - int i; - - policy.l_inodebits.bits = bits; - for (i = 0; i < obj->lo_objcount; i++) { - st_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds); - st_fid = &obj->lo_inodes[i].li_fid; - if (tgt_exp != st_exp) { - rc = md_cancel_unused(st_exp, st_fid, &policy, - mode, LDLM_FL_ASYNC, - NULL); - if (rc) - break; - } else { - /* Some export matches to @tgt_exp, do cancel - * for its fid in mdc */ - *fid = *st_fid; - op_data->op_flags |= flag; - } - } - lmv_obj_put(obj); + rc = lmv_early_cancel_slaves(exp, op_data, op_tgt, mode, + bits, flag); + lmv_object_put(obj); } else { - rc = lmv_early_cancel(lmv, tgt_exp, NULL, op_data, - mode, bits, flag); + tgt = lmv_find_target(lmv, fid); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); + + if (tgt->ltd_idx != op_tgt) { + CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid)); + policy.l_inodebits.bits = bits; + rc = md_cancel_unused(tgt->ltd_exp, fid, &policy, + mode, LDLM_FL_ASYNC, NULL); + } else { + CDEBUG(D_INODE, + "EARLY_CANCEL skip operation target %d on "DFID"\n", + op_tgt, PFID(fid)); + op_data->op_flags |= flag; + rc = 0; + 
} + } RETURN(rc); } -#endif /* * llite passes fid of an target inode in op_data->op_fid1 and id of directory in @@ -1845,12 +1882,14 @@ static int lmv_early_cancel_stripes(struct obd_export *exp, static int lmv_link(struct obd_export *exp, struct md_op_data *op_data, struct ptlrpc_request **request) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_export *tgt_exp; - struct lmv_obj *obj; - int rc, loop = 0; - mdsno_t mds; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + struct lmv_object *obj; + int rc; + int loop = 0; + mdsno_t mds; + int sidx; ENTRY; rc = lmv_check_connect(obd); @@ -1861,25 +1900,22 @@ repeat: ++loop; LASSERT(loop <= 2); if (op_data->op_namelen != 0) { - int mea_idx; - - /* Usual link request */ - obj = lmv_obj_grab(obd, &op_data->op_fid2); + obj = lmv_object_find(obd, &op_data->op_fid2); if (obj) { - mea_idx = raw_name2idx(obj->lo_hashtype, + sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, op_data->op_name, op_data->op_namelen); - op_data->op_fid2 = obj->lo_inodes[mea_idx].li_fid; - mds = obj->lo_inodes[mea_idx].li_mds; - lmv_obj_put(obj); + op_data->op_fid2 = obj->lo_stripes[sidx].ls_fid; + mds = obj->lo_stripes[sidx].ls_mds; + lmv_object_put(obj); } else { rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds); if (rc) RETURN(rc); } - CDEBUG(D_OTHER,"link "DFID":%*s to "DFID"\n", + CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n", PFID(&op_data->op_fid2), op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1)); } else { @@ -1887,27 +1923,31 @@ repeat: if (rc) RETURN(rc); - /* request from MDS to acquire i_links for inode by fid1 */ - CDEBUG(D_OTHER, "inc i_nlinks for "DFID"\n", + /* + * Request from MDS to acquire i_links for inode by fid1. 
+ */ + CDEBUG(D_INODE, "Inc i_nlinks for "DFID"\n", PFID(&op_data->op_fid1)); } - CDEBUG(D_OTHER, "forward to MDS #"LPU64" ("DFID")\n", + CDEBUG(D_INODE, "Forward to mds #"LPU64" ("DFID")\n", mds, PFID(&op_data->op_fid1)); op_data->op_fsuid = current->fsuid; op_data->op_fsgid = current->fsgid; op_data->op_cap = cfs_curproc_cap_pack(); + tgt = lmv_get_target(lmv, mds); - tgt_exp = lmv->tgts[mds].ltd_exp; if (op_data->op_namelen) { + /* + * Cancel UPDATE lock on child (fid1). + */ op_data->op_flags |= MF_MDC_CANCEL_FID2; - /* Cancel UPDATE lock on child (fid1). */ - rc = lmv_early_cancel(lmv, tgt_exp, NULL, op_data, LCK_EX, + rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX, MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1); } if (rc == 0) - rc = md_link(tgt_exp, op_data, request); + rc = md_link(tgt->ltd_exp, op_data, request); if (rc == -ERESTART) { LASSERT(*request != NULL); DEBUG_REQ(D_WARNING|D_RPCTRACE, *request, @@ -1931,15 +1971,19 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, const char *old, int oldlen, const char *new, int newlen, struct ptlrpc_request **request) { - struct obd_export *tgt_exp = NULL, *src_exp; - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - int rc, mea_idx, loop = 0; - struct lmv_obj *obj; - mdsno_t mds1, mds2; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *src_tgt; + struct lmv_tgt_desc *tgt_tgt; + int rc; + int sidx; + int loop = 0; + struct lmv_object *obj; + mdsno_t mds1; + mdsno_t mds2; ENTRY; - CDEBUG(D_OTHER, "rename %*s in "DFID" to %*s in "DFID"\n", + CDEBUG(D_INODE, "RENAME %*s in "DFID" to %*s in "DFID"\n", oldlen, old, PFID(&op_data->op_fid1), newlen, new, PFID(&op_data->op_fid2)); @@ -1952,8 +1996,8 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, * MDS with old dir entry is asking another MDS to create name * there. 
*/ - CDEBUG(D_OTHER, - "create %*s(%d/%d) in "DFID" pointing " + CDEBUG(D_INODE, + "Create %*s(%d/%d) in "DFID" pointing " "to "DFID"\n", newlen, new, oldlen, newlen, PFID(&op_data->op_fid2), PFID(&op_data->op_fid1)); @@ -1965,15 +2009,15 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, * Target directory can be split, sowe should forward request to * the right MDS. */ - obj = lmv_obj_grab(obd, &op_data->op_fid2); + obj = lmv_object_find(obd, &op_data->op_fid2); if (obj) { - mea_idx = raw_name2idx(obj->lo_hashtype, - obj->lo_objcount, - (char *)new, newlen); - op_data->op_fid2 = obj->lo_inodes[mea_idx].li_fid; - CDEBUG(D_OTHER, "Parent obj "DFID"\n", + sidx = raw_name2idx(obj->lo_hashtype, + obj->lo_objcount, + (char *)new, newlen); + op_data->op_fid2 = obj->lo_stripes[sidx].ls_fid; + CDEBUG(D_INODE, "Parent obj "DFID"\n", PFID(&op_data->op_fid2)); - lmv_obj_put(obj); + lmv_object_put(obj); } goto request; } @@ -1981,37 +2025,33 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, repeat: ++loop; LASSERT(loop <= 2); - obj = lmv_obj_grab(obd, &op_data->op_fid1); + obj = lmv_object_find(obd, &op_data->op_fid1); if (obj) { - /* - * directory is already split, so we have to forward request to - * the right MDS. 
- */ - mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, - (char *)old, oldlen); - op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid; - mds1 = obj->lo_inodes[mea_idx].li_mds; - CDEBUG(D_OTHER, "Parent obj "DFID"\n", PFID(&op_data->op_fid1)); - lmv_obj_put(obj); + sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, + (char *)old, oldlen); + op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid; + mds1 = obj->lo_stripes[sidx].ls_mds; + CDEBUG(D_INODE, "Parent obj "DFID"\n", PFID(&op_data->op_fid1)); + lmv_object_put(obj); } else { rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds1); if (rc) RETURN(rc); } - obj = lmv_obj_grab(obd, &op_data->op_fid2); + obj = lmv_object_find(obd, &op_data->op_fid2); if (obj) { /* * Directory is already split, so we have to forward request to * the right MDS. */ - mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, + sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, (char *)new, newlen); - mds2 = obj->lo_inodes[mea_idx].li_mds; - op_data->op_fid2 = obj->lo_inodes[mea_idx].li_fid; - CDEBUG(D_OTHER, "Parent obj "DFID"\n", PFID(&op_data->op_fid2)); - lmv_obj_put(obj); + mds2 = obj->lo_stripes[sidx].ls_mds; + op_data->op_fid2 = obj->lo_stripes[sidx].ls_fid; + CDEBUG(D_INODE, "Parent obj "DFID"\n", PFID(&op_data->op_fid2)); + lmv_object_put(obj); } else { rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds2); if (rc) @@ -2023,39 +2063,47 @@ request: op_data->op_fsgid = current->fsgid; op_data->op_cap = cfs_curproc_cap_pack(); - src_exp = lmv_get_export(lmv, mds1); - tgt_exp = lmv_get_export(lmv, mds2); + src_tgt = lmv_get_target(lmv, mds1); + tgt_tgt = lmv_get_target(lmv, mds2); if (oldlen) { - /* LOOKUP lock on src child (fid3) should also be cancelled for - * src_exp in mdc_rename. */ + /* + * LOOKUP lock on src child (fid3) should also be cancelled for + * src_tgt in mdc_rename. + */ op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3; - /* Cancel UPDATE locks on tgt parent (fid2), tgt_exp is its - * own export. 
*/ - rc = lmv_early_cancel(lmv, src_exp, tgt_exp, op_data, LCK_EX, - MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID2); + /* + * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its + * own target. + */ + rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx, + LCK_EX, MDS_INODELOCK_UPDATE, + MF_MDC_CANCEL_FID2); - /* Cancel LOOKUP locks on tgt child (fid4) for parent tgt_exp.*/ - if (rc == 0) - rc = lmv_early_cancel(lmv, src_exp, tgt_exp, op_data, + /* + * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt. + */ + if (rc == 0) { + rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx, LCK_EX, MDS_INODELOCK_LOOKUP, MF_MDC_CANCEL_FID4); + } - /* XXX: the case when child is a striped dir is not supported. - * Only the master stripe has all locks cancelled early. */ - /* Cancel all the locks on tgt child (fid4). */ + /* + * Cancel all the locks on tgt child (fid4). + */ if (rc == 0) - rc = lmv_early_cancel(lmv, src_exp, NULL, op_data, + rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx, LCK_EX, MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID4); } if (rc == 0) - rc = md_rename(src_exp, op_data, old, oldlen, + rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen, new, newlen, request); if (rc == -ERESTART) { LASSERT(*request != NULL); - DEBUG_REQ(D_WARNING|D_RPCTRACE, *request, + DEBUG_REQ(D_WARNING|D_RPCTRACE, *request, "Got -ERESTART during rename!\n"); ptlrpc_req_finished(*request); *request = NULL; @@ -2076,41 +2124,42 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data, struct ptlrpc_request **request, struct md_open_data **mod) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct ptlrpc_request *req; - struct obd_export *tgt_exp; - struct lmv_obj *obj; - int rc = 0, i; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct ptlrpc_request *req; + struct lmv_tgt_desc *tgt; + struct lmv_object *obj; + int rc = 0; + int i; ENTRY; rc = lmv_check_connect(obd); if (rc) RETURN(rc); - 
obj = lmv_obj_grab(obd, &op_data->op_fid1); + obj = lmv_object_find(obd, &op_data->op_fid1); - CDEBUG(D_OTHER, "SETATTR for "DFID", valid 0x%x%s\n", + CDEBUG(D_INODE, "SETATTR for "DFID", valid 0x%x%s\n", PFID(&op_data->op_fid1), op_data->op_attr.ia_valid, obj ? ", split" : ""); op_data->op_flags |= MF_MDC_CANCEL_FID1; if (obj) { for (i = 0; i < obj->lo_objcount; i++) { - op_data->op_fid1 = obj->lo_inodes[i].li_fid; + op_data->op_fid1 = obj->lo_stripes[i].ls_fid; - tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds); - if (IS_ERR(tgt_exp)) { - rc = PTR_ERR(tgt_exp); + tgt = lmv_get_target(lmv, obj->lo_stripes[i].ls_mds); + if (IS_ERR(tgt)) { + rc = PTR_ERR(tgt); break; } - rc = md_setattr(tgt_exp, op_data, ea, ealen, + rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen, ea2, ea2len, &req, mod); - if (lu_fid_eq(&obj->lo_fid, &obj->lo_inodes[i].li_fid)) { + if (lu_fid_eq(&obj->lo_fid, &obj->lo_stripes[i].ls_fid)) { /* - * this is master object and this request should + * This is master object and this request should * be returned back to llite. 
*/ *request = req; @@ -2121,13 +2170,13 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data, if (rc) break; } - lmv_obj_put(obj); + lmv_object_put(obj); } else { - tgt_exp = lmv_find_export(lmv, &op_data->op_fid1); - if (IS_ERR(tgt_exp)) - RETURN(PTR_ERR(tgt_exp)); + tgt = lmv_find_target(lmv, &op_data->op_fid1); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - rc = md_setattr(tgt_exp, op_data, ea, ealen, ea2, + rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen, ea2, ea2len, request, mod); } RETURN(rc); @@ -2136,33 +2185,34 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data, static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid, struct obd_capa *oc, struct ptlrpc_request **request) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_export *tgt_exp; - int rc; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int rc; ENTRY; rc = lmv_check_connect(obd); if (rc) RETURN(rc); - tgt_exp = lmv_find_export(lmv, fid); - if (IS_ERR(tgt_exp)) - RETURN(PTR_ERR(tgt_exp)); + tgt = lmv_find_target(lmv, fid); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - rc = md_sync(tgt_exp, fid, oc, request); + rc = md_sync(tgt->ltd_exp, fid, oc, request); RETURN(rc); } -/* main purpose of LMV blocking ast is to remove split directory LMV - * presentation object (struct lmv_obj) attached to the lock being revoked. */ -int lmv_blocking_ast(struct ldlm_lock *lock, - struct ldlm_lock_desc *desc, +/** + * Main purpose of LMV blocking ast is to remove split directory LMV + * presentation object (struct lmv_object) attached to the lock being revoked. 
+ */ +int lmv_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, void *data, int flag) { - struct lustre_handle lockh; - struct lmv_obj *obj; - int rc; + struct lustre_handle lockh; + struct lmv_object *obj; + int rc; ENTRY; switch (flag) { @@ -2175,17 +2225,19 @@ int lmv_blocking_ast(struct ldlm_lock *lock, } break; case LDLM_CB_CANCELING: - /* time to drop cached attrs for dirobj */ + /* + * Time to drop cached attrs for split directory object + */ obj = lock->l_ast_data; if (obj) { - CDEBUG(D_OTHER, "cancel %s on "LPU64"/"LPU64 + CDEBUG(D_INODE, "Cancel %s on "LPU64"/"LPU64 ", master "DFID"\n", lock->l_resource->lr_name.name[3] == 1 ? "LOOKUP" : "UPDATE", lock->l_resource->lr_name.name[0], lock->l_resource->lr_name.name[1], PFID(&obj->lo_fid)); - lmv_obj_put(obj); + lmv_object_put(obj); } break; default: @@ -2196,7 +2248,7 @@ int lmv_blocking_ast(struct ldlm_lock *lock, static void lmv_hash_adjust(__u64 *hash, __u64 hash_adj) { - __u64 val; + __u64 val; val = le64_to_cpu(*hash); if (val < hash_adj) @@ -2207,16 +2259,16 @@ static void lmv_hash_adjust(__u64 *hash, __u64 hash_adj) static __u32 lmv_node_rank(struct obd_export *exp, const struct lu_fid *fid) { - __u64 id; + __u64 id; struct obd_import *imp; /* - * XXX Hack: to get nid we assume that underlying obd device is mdc. + * XXX: to get nid we assume that underlying obd device is mdc. 
*/ imp = class_exp2cliimp(exp); id = imp->imp_connection->c_self + fid_flatten(fid); - CDEBUG(D_INFO, "node rank: "LPX64" "DFID" "LPX64" "LPX64"\n", + CDEBUG(D_INODE, "Readpage node rank: "LPX64" "DFID" "LPX64" "LPX64"\n", imp->imp_connection->c_self, PFID(fid), id, id ^ (id >> 32)); return id ^ (id >> 32); @@ -2226,20 +2278,23 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid, struct obd_capa *oc, __u64 offset64, struct page *page, struct ptlrpc_request **request) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_export *tgt_exp; - struct lu_fid rid = *fid; - struct lmv_obj *obj; - __u64 offset; - __u64 hash_adj = 0; - __u32 rank = 0; - __u64 seg_size = 0; - __u64 tgt_tmp = 0; - int tgt = 0; - int tgt0 = 0; - int rc; - int nr = 0; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lu_fid rid = *fid; + struct lmv_object *obj; + __u64 offset; + __u64 hash_adj = 0; + __u32 rank = 0; + __u64 seg_size = 0; + __u64 tgt_tmp = 0; + int tgt_idx = 0; + int tgt0_idx = 0; + int rc; + int nr = 0; + struct lmv_stripe *los; + struct lmv_tgt_desc *tgt; + struct lu_dirpage *dp; + struct lu_dirent *ent; ENTRY; offset = offset64; @@ -2248,10 +2303,7 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid, if (rc) RETURN(rc); - CDEBUG(D_INFO, "READPAGE at "LPX64" from "DFID"\n", offset, PFID(&rid)); - - obj = lmv_obj_grab(obd, fid); - if (obj) { + CDEBUG(D_INODE, "READPAGE at "LPX64" from "DFID"\n", offset, PFID(&rid)); /* * This case handle directory lookup in clustered metadata case (i.e. @@ -2273,24 +2325,21 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid, * [R*MAX_HASH/N ... (R + 1)*MAX_HASH/N] there for we do hash_adj * on hash values that we get. 
*/ - - struct lmv_inode *loi; - - lmv_obj_lock(obj); - + obj = lmv_object_find_lock(obd, fid); + if (obj) { nr = obj->lo_objcount; LASSERT(nr > 0); seg_size = MAX_HASH_SIZE; do_div(seg_size, nr); - loi = obj->lo_inodes; - rank = lmv_node_rank(lmv_get_export(lmv, loi[0].li_mds), - fid) % nr; - tgt_tmp = offset; + los = obj->lo_stripes; + tgt = lmv_get_target(lmv, los[0].ls_mds); + rank = lmv_node_rank(tgt->ltd_exp, fid) % nr; + tgt_tmp = offset; do_div(tgt_tmp, seg_size); - tgt0 = do_div(tgt_tmp, nr); - tgt = (tgt0 + rank) % nr; + tgt0_idx = do_div(tgt_tmp, nr); + tgt_idx = (tgt0_idx + rank) % nr; - if (tgt < tgt0) + if (tgt_idx < tgt0_idx) /* * Wrap around. * @@ -2303,28 +2352,26 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid, hash_adj += rank * seg_size; - CDEBUG(D_INFO, "hash_adj: %x "LPX64" "LPX64"/%x -> "LPX64"/%x\n", - rank, hash_adj, offset, tgt0, offset + hash_adj, tgt); + CDEBUG(D_INODE, "Readpage hash adjustment: %x "LPX64" " + LPX64"/%x -> "LPX64"/%x\n", rank, hash_adj, + offset, tgt0_idx, offset + hash_adj, tgt_idx); offset = (offset + hash_adj) & MAX_HASH_SIZE; - rid = obj->lo_inodes[tgt].li_fid; - tgt_exp = lmv_get_export(lmv, loi[tgt].li_mds); + rid = obj->lo_stripes[tgt_idx].ls_fid; + tgt = lmv_get_target(lmv, los[tgt_idx].ls_mds); - CDEBUG(D_INFO, "forward to "DFID" with offset %lu i %d\n", - PFID(&rid), (unsigned long)offset, tgt); + CDEBUG(D_INODE, "Forward to "DFID" with offset %lu i %d\n", + PFID(&rid), (unsigned long)offset, tgt_idx); } else - tgt_exp = lmv_find_export(lmv, &rid); + tgt = lmv_find_target(lmv, &rid); - if (IS_ERR(tgt_exp)) - GOTO(cleanup, rc = PTR_ERR(tgt_exp)); + if (IS_ERR(tgt)) + GOTO(cleanup, rc = PTR_ERR(tgt)); - rc = md_readpage(tgt_exp, &rid, oc, offset, page, request); + rc = md_readpage(tgt->ltd_exp, &rid, oc, offset, page, request); if (rc) GOTO(cleanup, rc); if (obj) { - struct lu_dirpage *dp; - struct lu_dirent *ent; - dp = cfs_kmap(page); lmv_hash_adjust(&dp->ldp_hash_start, hash_adj); 
@@ -2335,134 +2382,72 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid, ent = lu_dirent_next(ent)) lmv_hash_adjust(&ent->lde_hash, hash_adj); - if (tgt0 != nr - 1) { + if (tgt0_idx != nr - 1) { __u64 end; end = le64_to_cpu(dp->ldp_hash_end); if (end == DIR_END_OFF) { dp->ldp_hash_end = cpu_to_le32(seg_size * - (tgt0 + 1)); - CDEBUG(D_INFO, + (tgt0_idx + 1)); + CDEBUG(D_INODE, ""DFID" reset end "LPX64" tgt %d\n", PFID(&rid), - le64_to_cpu(dp->ldp_hash_end), tgt); + le64_to_cpu(dp->ldp_hash_end), tgt_idx); } } cfs_kunmap(page); } - /* - * Here we could remove "." and ".." from all pages which at not from - * master. But MDS has only "." and ".." for master dir. - */ EXIT; cleanup: - if (obj) { - lmv_obj_unlock(obj); - lmv_obj_put(obj); - } - return rc; -} - -static int lmv_unlink_slaves(struct obd_export *exp, - struct md_op_data *op_data, - struct ptlrpc_request **req) -{ - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_stripe_md *mea = op_data->op_mea1; - struct md_op_data *op_data2; - struct obd_export *tgt_exp; - int i, rc = 0; - ENTRY; - - OBD_ALLOC_PTR(op_data2); - if (op_data2 == NULL) - RETURN(-ENOMEM); - - op_data2->op_mode = S_IFDIR; - op_data2->op_fsuid = current->fsuid; - op_data2->op_fsgid = current->fsgid; - op_data2->op_bias = 0; - - LASSERT(mea != NULL); - for (i = 0; i < mea->mea_count; i++) { - memset(op_data2, 0, sizeof(*op_data2)); - op_data2->op_fid1 = mea->mea_ids[i]; - tgt_exp = lmv_find_export(lmv, &op_data2->op_fid1); - if (IS_ERR(tgt_exp)) - GOTO(out_free_op_data2, rc = PTR_ERR(tgt_exp)); - - if (tgt_exp == NULL) - continue; - - rc = md_unlink(tgt_exp, op_data2, req); - - CDEBUG(D_OTHER, "unlink slave "DFID" -> %d\n", - PFID(&mea->mea_ids[i]), rc); - - if (*req) { - ptlrpc_req_finished(*req); - *req = NULL; - } - if (rc) - GOTO(out_free_op_data2, rc); - } - - EXIT; -out_free_op_data2: - OBD_FREE_PTR(op_data2); + if (obj) + lmv_object_put_unlock(obj); return rc; } static int 
lmv_unlink(struct obd_export *exp, struct md_op_data *op_data, struct ptlrpc_request **request) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_export *tgt_exp = NULL; - struct lmv_obj *obj; - int rc, loop = 0; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt = NULL; + struct lmv_object *obj; + int rc; + int loop = 0; ENTRY; rc = lmv_check_connect(obd); if (rc) RETURN(rc); - if (op_data->op_namelen == 0 && op_data->op_mea1 != NULL) { - /* mds asks to remove slave objects */ - rc = lmv_unlink_slaves(exp, op_data, request); - RETURN(rc); - } - repeat: ++loop; LASSERT(loop <= 2); if (op_data->op_namelen != 0) { - int mea_idx; + int sidx; - obj = lmv_obj_grab(obd, &op_data->op_fid1); + obj = lmv_object_find(obd, &op_data->op_fid1); if (obj) { - mea_idx = raw_name2idx(obj->lo_hashtype, - obj->lo_objcount, - op_data->op_name, - op_data->op_namelen); + sidx = raw_name2idx(obj->lo_hashtype, + obj->lo_objcount, + op_data->op_name, + op_data->op_namelen); op_data->op_bias &= ~MDS_CHECK_SPLIT; - op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid; - tgt_exp = lmv_get_export(lmv, - obj->lo_inodes[mea_idx].li_mds); - lmv_obj_put(obj); - CDEBUG(D_OTHER, "unlink '%*s' in "DFID" -> %u\n", + op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid; + tgt = lmv_get_target(lmv, + obj->lo_stripes[sidx].ls_mds); + lmv_object_put(obj); + CDEBUG(D_INODE, "UNLINK '%*s' in "DFID" -> %u\n", op_data->op_namelen, op_data->op_name, - PFID(&op_data->op_fid1), mea_idx); + PFID(&op_data->op_fid1), sidx); } } else { - CDEBUG(D_OTHER, "drop i_nlink on "DFID"\n", + CDEBUG(D_INODE, "Drop i_nlink on "DFID"\n", PFID(&op_data->op_fid1)); } - if (tgt_exp == NULL) { - tgt_exp = lmv_find_export(lmv, &op_data->op_fid1); - if (IS_ERR(tgt_exp)) - RETURN(PTR_ERR(tgt_exp)); + if (tgt == NULL) { + tgt = lmv_find_target(lmv, &op_data->op_fid1); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); op_data->op_bias |= MDS_CHECK_SPLIT; } 
@@ -2470,21 +2455,25 @@ repeat: op_data->op_fsgid = current->fsgid; op_data->op_cap = cfs_curproc_cap_pack(); - /* If child's fid is given, cancel unused locks for it if it is from - * another export than parent. */ + /* + * If child's fid is given, cancel unused locks for it if it is from + * another export than parent. + */ if (op_data->op_namelen) { - /* LOOKUP lock for child (fid3) should also be cancelled on - * parent tgt_exp in mdc_unlink(). */ + /* + * LOOKUP lock for child (fid3) should also be cancelled on + * parent tgt_tgt in mdc_unlink(). + */ op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3; - /* XXX: the case when child is a striped dir is not supported. - * Only the master stripe has all locks cancelled early. */ - /* Cancel FULL locks on child (fid3). */ - rc = lmv_early_cancel(lmv, tgt_exp, NULL, op_data, LCK_EX, + /* + * Cancel FULL locks on child (fid3). + */ + rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX, MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3); } if (rc == 0) - rc = md_unlink(tgt_exp, op_data, request); + rc = md_unlink(tgt->ltd_exp, op_data, request); if (rc == -ERESTART) { LASSERT(*request != NULL); DEBUG_REQ(D_WARNING|D_RPCTRACE, *request, @@ -2503,45 +2492,9 @@ repeat: RETURN(rc); } -static int lmv_llog_init(struct obd_device *obd, struct obd_llog_group *olg, - struct obd_device *tgt, int count, - struct llog_catid *logid, struct obd_uuid *uuid) -{ -#if 0 - struct llog_ctxt *ctxt; - int rc; - ENTRY; - - LASSERT(group == OBD_LLOG_GROUP); - rc = llog_setup(obd, &obd->obd_olg, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL, - &llog_client_ops); - if (rc == 0) { - ctxt = llog_group_get_ctxt(&obd->obd_olg, LLOG_CONFIG_REPL_CTXT); - llog_initiator_connect(ctxt, tgt); - llog_ctxt_put(ctxt); - } - RETURN(rc); -#else - return 0; -#endif -} - -static int lmv_llog_finish(struct obd_device *obd, int count) -{ - struct llog_ctxt *ctxt; - int rc = 0; - ENTRY; - - ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT); - if (ctxt) - rc = 
llog_cleanup(ctxt); - - RETURN(rc); -} - static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) { - int rc = 0; + int rc = 0; switch (stage) { case OBD_CLEANUP_EARLY: @@ -2560,16 +2513,17 @@ static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) } static int lmv_get_info(struct obd_export *exp, __u32 keylen, - void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm) + void *key, __u32 *vallen, void *val, + struct lov_stripe_md *lsm) { - struct obd_device *obd; - struct lmv_obd *lmv; - int rc = 0; + struct obd_device *obd; + struct lmv_obd *lmv; + int rc = 0; ENTRY; obd = class_exp2obd(exp); if (obd == NULL) { - CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n", + CDEBUG(D_IOCTL, "Invalid client cookie "LPX64"\n", exp->exp_handle.h_cookie); RETURN(-EINVAL); } @@ -2587,7 +2541,9 @@ static int lmv_get_info(struct obd_export *exp, __u32 keylen, for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgts++) { - /* all tgts should be connected when this get called. */ + /* + * All tgts should be connected when this gets called. + */ if (!tgts || !tgts->ltd_exp) { CERROR("target not setup?\n"); continue; @@ -2603,8 +2559,10 @@ static int lmv_get_info(struct obd_export *exp, __u32 keylen, if (rc) RETURN(rc); - /* forwarding this request to first MDS, it should know LOV - * desc. */ + /* + * Forwarding this request to first MDS, it should know LOV + * desc. 
+ */ rc = obd_get_info(lmv->tgts[0].ltd_exp, keylen, key, vallen, val, NULL); if (!rc && KEY_IS(KEY_CONN_DATA)) { @@ -2614,7 +2572,7 @@ static int lmv_get_info(struct obd_export *exp, __u32 keylen, RETURN(rc); } - CDEBUG(D_IOCTL, "invalid key\n"); + CDEBUG(D_IOCTL, "Invalid key\n"); RETURN(-EINVAL); } @@ -2630,7 +2588,7 @@ int lmv_set_info_async(struct obd_export *exp, obd_count keylen, obd = class_exp2obd(exp); if (obd == NULL) { - CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n", + CDEBUG(D_IOCTL, "Invalid client cookie "LPX64"\n", exp->exp_handle.h_cookie); RETURN(-EINVAL); } @@ -2661,10 +2619,12 @@ int lmv_set_info_async(struct obd_export *exp, obd_count keylen, int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp, struct lov_stripe_md *lsm) { - struct obd_device *obd = class_exp2obd(exp); - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_stripe_md *meap, *lsmp; - int mea_size, i; + struct obd_device *obd = class_exp2obd(exp); + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_stripe_md *meap; + struct lmv_stripe_md *lsmp; + int mea_size; + int i; ENTRY; mea_size = lmv_get_easize(lmv); @@ -2708,12 +2668,13 @@ int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp, int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, struct lov_mds_md *lmm, int lmm_size) { - struct obd_device *obd = class_exp2obd(exp); - struct lmv_stripe_md **tmea = (struct lmv_stripe_md **)lsmp; - struct lmv_stripe_md *mea = (struct lmv_stripe_md *)lmm; - struct lmv_obd *lmv = &obd->u.lmv; - int mea_size, i; - __u32 magic; + struct obd_device *obd = class_exp2obd(exp); + struct lmv_stripe_md **tmea = (struct lmv_stripe_md **)lsmp; + struct lmv_stripe_md *mea = (struct lmv_stripe_md *)lmm; + struct lmv_obd *lmv = &obd->u.lmv; + int mea_size; + int i; + __u32 magic; ENTRY; mea_size = lmv_get_easize(lmv); @@ -2741,7 +2702,10 @@ int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, { magic = le32_to_cpu(mea->mea_magic); } else { - /* old mea is 
not handled here */ + /* + * Old mea is not handled here. + */ + CERROR("Old not supportable EA is found\n"); LBUG(); } @@ -2756,14 +2720,15 @@ int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, RETURN(mea_size); } -static int lmv_cancel_unused(struct obd_export *exp, - const struct lu_fid *fid, - ldlm_policy_data_t *policy, - ldlm_mode_t mode, int flags, void *opaque) +static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid, + ldlm_policy_data_t *policy, ldlm_mode_t mode, + int flags, void *opaque) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - int rc = 0, err, i; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + int rc = 0; + int err; + int i; ENTRY; LASSERT(fid != NULL); @@ -2782,11 +2747,12 @@ static int lmv_cancel_unused(struct obd_export *exp, int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + int rc; ENTRY; - RETURN(md_set_lock_data(lmv->tgts[0].ltd_exp, lockh, data)); + rc = md_set_lock_data(lmv->tgts[0].ltd_exp, lockh, data); + RETURN(rc); } ldlm_mode_t lmv_lock_match(struct obd_export *exp, int flags, @@ -2794,18 +2760,20 @@ ldlm_mode_t lmv_lock_match(struct obd_export *exp, int flags, ldlm_policy_data_t *policy, ldlm_mode_t mode, struct lustre_handle *lockh) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - ldlm_mode_t rc; - int i; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + ldlm_mode_t rc; + int i; ENTRY; - CDEBUG(D_OTHER, "lock match for "DFID"\n", PFID(fid)); + CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid)); - /* with CMD every object can have two locks in different namespaces: + /* + * With CMD every object can have two locks in different namespaces: * lookup lock in space of mds storing 
direntry and update/open lock in * space of mds storing inode. Thus we check all targets, not only that - * one fid was created in. */ + * one fid was created in. + */ for (i = 0; i < lmv->desc.ld_tgt_count; i++) { rc = md_lock_match(lmv->tgts[i].ltd_exp, flags, fid, type, policy, mode, lockh); @@ -2820,10 +2788,9 @@ int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, struct obd_export *dt_exp, struct obd_export *md_exp, struct lustre_md *md) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - int rc; - + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + int rc; ENTRY; rc = md_get_lustre_md(lmv->tgts[0].ltd_exp, req, dt_exp, md_exp, md); RETURN(rc); @@ -2831,12 +2798,10 @@ int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; ENTRY; - /* XXX LOV STACKING */ if (md->mea) obd_free_memmd(exp, (void *)&md->mea); RETURN(md_free_lustre_md(lmv->tgts[0].ltd_exp, md)); @@ -2846,32 +2811,31 @@ int lmv_set_open_replay_data(struct obd_export *exp, struct obd_client_handle *och, struct ptlrpc_request *open_req) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_export *tgt_exp; - + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; ENTRY; - tgt_exp = lmv_find_export(lmv, &och->och_fid); - if (IS_ERR(tgt_exp)) - RETURN(PTR_ERR(tgt_exp)); + tgt = lmv_find_target(lmv, &och->och_fid); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - RETURN(md_set_open_replay_data(tgt_exp, och, open_req)); + RETURN(md_set_open_replay_data(tgt->ltd_exp, och, open_req)); } int lmv_clear_open_replay_data(struct obd_export *exp, struct obd_client_handle *och) { - struct obd_device *obd = 
exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_export *tgt_exp; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; ENTRY; - tgt_exp = lmv_find_export(lmv, &och->och_fid); - if (IS_ERR(tgt_exp)) - RETURN(PTR_ERR(tgt_exp)); + tgt = lmv_find_target(lmv, &och->och_fid); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - RETURN(md_clear_open_replay_data(tgt_exp, och)); + RETURN(md_clear_open_replay_data(tgt->ltd_exp, och)); } static int lmv_get_remote_perm(struct obd_export *exp, @@ -2879,44 +2843,42 @@ static int lmv_get_remote_perm(struct obd_export *exp, struct obd_capa *oc, __u32 suppgid, struct ptlrpc_request **request) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_export *tgt_exp; - int rc; - + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int rc; ENTRY; rc = lmv_check_connect(obd); if (rc) RETURN(rc); - tgt_exp = lmv_find_export(lmv, fid); - if (IS_ERR(tgt_exp)) - RETURN(PTR_ERR(tgt_exp)); - - rc = md_get_remote_perm(tgt_exp, fid, oc, suppgid, request); + tgt = lmv_find_target(lmv, fid); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); + rc = md_get_remote_perm(tgt->ltd_exp, fid, oc, suppgid, request); RETURN(rc); } static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc, renew_capa_cb_t cb) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_export *tgt_exp; - int rc; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int rc; ENTRY; rc = lmv_check_connect(obd); if (rc) RETURN(rc); - tgt_exp = lmv_find_export(lmv, &oc->c_capa.lc_fid); - if (IS_ERR(tgt_exp)) - RETURN(PTR_ERR(tgt_exp)); + tgt = lmv_find_target(lmv, &oc->c_capa.lc_fid); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - rc = md_renew_capa(tgt_exp, oc, cb); + rc = md_renew_capa(tgt->ltd_exp, oc, cb); RETURN(rc); } @@ -2924,24 
+2886,53 @@ int lmv_intent_getattr_async(struct obd_export *exp, struct md_enqueue_info *minfo, struct ldlm_enqueue_info *einfo) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_export *tgt_exp; - int rc; + struct md_op_data *op_data = &minfo->mi_data; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_object *obj; + struct lmv_tgt_desc *tgt; + int rc; + int sidx; ENTRY; rc = lmv_check_connect(obd); if (rc) RETURN(rc); - if (fid_is_zero(&minfo->mi_data.op_fid2)) - tgt_exp = lmv_find_export(lmv, &minfo->mi_data.op_fid1); - else - tgt_exp = lmv_find_export(lmv, &minfo->mi_data.op_fid2); - if (IS_ERR(tgt_exp)) - RETURN(PTR_ERR(tgt_exp)); + if (!fid_is_sane(&op_data->op_fid2)) { + obj = lmv_object_find(obd, &op_data->op_fid1); + if (obj && op_data->op_namelen) { + sidx = raw_name2idx(obj->lo_hashtype, + obj->lo_objcount, + (char *)op_data->op_name, + op_data->op_namelen); + op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid; + tgt = lmv_get_target(lmv, + obj->lo_stripes[sidx].ls_mds); + CDEBUG(D_INODE, + "Choose slave dir ("DFID") -> mds #%d\n", + PFID(&op_data->op_fid1), tgt->ltd_idx); + } else { + tgt = lmv_find_target(lmv, &op_data->op_fid1); + } + if (obj) + lmv_object_put(obj); + } else { + op_data->op_fid1 = op_data->op_fid2; + tgt = lmv_find_target(lmv, &op_data->op_fid2); + op_data->op_bias = MDS_CROSS_REF; + /* + * Unfortunately, we have to lie to MDC/MDS to retrieve + * attributes llite needs. 
+ */ + if (minfo->mi_it.it_op & IT_LOOKUP) + minfo->mi_it.it_op = IT_GETATTR; + } + + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - rc = md_intent_getattr_async(tgt_exp, minfo, einfo); + rc = md_intent_getattr_async(tgt->ltd_exp, minfo, einfo); RETURN(rc); } @@ -2949,21 +2940,21 @@ int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, struct lu_fid *fid) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_export *tgt_exp; - int rc; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int rc; ENTRY; rc = lmv_check_connect(obd); if (rc) RETURN(rc); - tgt_exp = lmv_find_export(lmv, fid); - if (IS_ERR(tgt_exp)) - RETURN(PTR_ERR(tgt_exp)); + tgt = lmv_find_target(lmv, fid); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - rc = md_revalidate_lock(tgt_exp, it, fid); + rc = md_revalidate_lock(tgt->ltd_exp, it, fid); RETURN(rc); } @@ -2977,8 +2968,6 @@ struct obd_ops lmv_obd_ops = { .o_connect = lmv_connect, .o_disconnect = lmv_disconnect, .o_statfs = lmv_statfs, - .o_llog_init = lmv_llog_init, - .o_llog_finish = lmv_llog_finish, .o_get_info = lmv_get_info, .o_set_info_async = lmv_set_info_async, .o_packmd = lmv_packmd, @@ -3024,13 +3013,13 @@ struct md_ops lmv_md_ops = { int __init lmv_init(void) { struct lprocfs_static_vars lvars; - int rc; + int rc; - obj_cache = cfs_mem_cache_create("lmv_objects", - sizeof(struct lmv_obj), - 0, 0); - if (!obj_cache) { - CERROR("error allocating lmv objects cache\n"); + lmv_object_cache = cfs_mem_cache_create("lmv_objects", + sizeof(struct lmv_object), + 0, 0); + if (!lmv_object_cache) { + CERROR("Error allocating lmv objects cache\n"); return -ENOMEM; } @@ -3038,7 +3027,7 @@ int __init lmv_init(void) rc = class_register_type(&lmv_obd_ops, &lmv_md_ops, lvars.module_vars, LUSTRE_LMV_NAME, NULL); if (rc) - cfs_mem_cache_destroy(obj_cache); + cfs_mem_cache_destroy(lmv_object_cache); return rc; } @@ -3046,14 +3035,12 @@ int __init 
lmv_init(void) #ifdef __KERNEL__ static void lmv_exit(void) { - int rc; - class_unregister_type(LUSTRE_LMV_NAME); - rc = cfs_mem_cache_destroy(obj_cache); - LASSERTF(rc == 0, - "can't free lmv objects cache, %d object(s)" - "still in use\n", atomic_read(&obj_cache_count)); + LASSERTF(atomic_read(&lmv_object_count) == 0, + "Can't free lmv objects cache, %d object(s) busy\n", + atomic_read(&lmv_object_count)); + cfs_mem_cache_destroy(lmv_object_cache); } MODULE_AUTHOR("Sun Microsystems, Inc. "); diff --git a/lustre/lmv/lmv_object.c b/lustre/lmv/lmv_object.c index f567fc9..f10c23c 100644 --- a/lustre/lmv/lmv_object.c +++ b/lustre/lmv/lmv_object.c @@ -59,35 +59,31 @@ #include #include "lmv_internal.h" -/* objects cache. */ -extern cfs_mem_cache_t *obj_cache; -extern atomic_t obj_cache_count; +extern cfs_mem_cache_t *lmv_object_cache; +extern atomic_t lmv_object_count; -/* object list and its guard. */ static CFS_LIST_HEAD(obj_list); static spinlock_t obj_list_lock = SPIN_LOCK_UNLOCKED; -/* creates new obj on passed @fid and @mea. 
*/ -struct lmv_obj * -lmv_obj_alloc(struct obd_device *obd, - const struct lu_fid *fid, - struct lmv_stripe_md *mea) +struct lmv_object *lmv_object_alloc(struct obd_device *obd, + const struct lu_fid *fid, + struct lmv_stripe_md *mea) { - int i; - struct lmv_obj *obj; - unsigned int obj_size; - struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_obd *lmv = &obd->u.lmv; + unsigned int obj_size; + struct lmv_object *obj; + int i; LASSERT(mea->mea_magic == MEA_MAGIC_LAST_CHAR || mea->mea_magic == MEA_MAGIC_ALL_CHARS || mea->mea_magic == MEA_MAGIC_HASH_SEGMENT); - OBD_SLAB_ALLOC(obj, obj_cache, CFS_ALLOC_STD, + OBD_SLAB_ALLOC(obj, lmv_object_cache, CFS_ALLOC_STD, sizeof(*obj)); if (!obj) return NULL; - atomic_inc(&obj_cache_count); + atomic_inc(&lmv_object_count); obj->lo_fid = *fid; obj->lo_obd = obd; @@ -98,141 +94,138 @@ lmv_obj_alloc(struct obd_device *obd, atomic_set(&obj->lo_count, 0); obj->lo_objcount = mea->mea_count; - obj_size = sizeof(struct lmv_inode) * + obj_size = sizeof(struct lmv_stripe) * lmv->desc.ld_tgt_count; - OBD_ALLOC(obj->lo_inodes, obj_size); - if (!obj->lo_inodes) + OBD_ALLOC(obj->lo_stripes, obj_size); + if (!obj->lo_stripes) goto err_obj; - memset(obj->lo_inodes, 0, obj_size); + memset(obj->lo_stripes, 0, obj_size); - /* put all ids in */ + CDEBUG(D_INODE, "Allocate object for "DFID"\n", + PFID(fid)); for (i = 0; i < mea->mea_count; i++) { int rc; - CDEBUG(D_OTHER, "subobj "DFID"\n", + CDEBUG(D_INODE, "Process subobject "DFID"\n", PFID(&mea->mea_ids[i])); - obj->lo_inodes[i].li_fid = mea->mea_ids[i]; - LASSERT(fid_is_sane(&obj->lo_inodes[i].li_fid)); + obj->lo_stripes[i].ls_fid = mea->mea_ids[i]; + LASSERT(fid_is_sane(&obj->lo_stripes[i].ls_fid)); /* * Cache slave mds number to use it in all cases it is needed * instead of constant lookup. 
*/ - rc = lmv_fld_lookup(lmv, &obj->lo_inodes[i].li_fid, - &obj->lo_inodes[i].li_mds); + rc = lmv_fld_lookup(lmv, &obj->lo_stripes[i].ls_fid, + &obj->lo_stripes[i].ls_mds); if (rc) goto err_obj; } return obj; - err_obj: OBD_FREE(obj, sizeof(*obj)); return NULL; } -/* destroy passed @obj. */ -void -lmv_obj_free(struct lmv_obj *obj) +void lmv_object_free(struct lmv_object *obj) { - struct lmv_obd *lmv = &obj->lo_obd->u.lmv; - unsigned int obj_size; + struct lmv_obd *lmv = &obj->lo_obd->u.lmv; + unsigned int obj_size; LASSERT(!atomic_read(&obj->lo_count)); - obj_size = sizeof(struct lmv_inode) * + obj_size = sizeof(struct lmv_stripe) * lmv->desc.ld_tgt_count; - OBD_FREE(obj->lo_inodes, obj_size); - OBD_SLAB_FREE(obj, obj_cache, sizeof(*obj)); - atomic_dec(&obj_cache_count); + OBD_FREE(obj->lo_stripes, obj_size); + OBD_SLAB_FREE(obj, lmv_object_cache, sizeof(*obj)); + atomic_dec(&lmv_object_count); } -static void -__lmv_obj_add(struct lmv_obj *obj) +static void __lmv_object_add(struct lmv_object *obj) { atomic_inc(&obj->lo_count); list_add(&obj->lo_list, &obj_list); } -void -lmv_obj_add(struct lmv_obj *obj) +void lmv_object_add(struct lmv_object *obj) { spin_lock(&obj_list_lock); - __lmv_obj_add(obj); + __lmv_object_add(obj); spin_unlock(&obj_list_lock); } -static void -__lmv_obj_del(struct lmv_obj *obj) +static void __lmv_object_del(struct lmv_object *obj) { list_del(&obj->lo_list); - lmv_obj_free(obj); + lmv_object_free(obj); } -void -lmv_obj_del(struct lmv_obj *obj) +void lmv_object_del(struct lmv_object *obj) { spin_lock(&obj_list_lock); - __lmv_obj_del(obj); + __lmv_object_del(obj); spin_unlock(&obj_list_lock); } -static struct lmv_obj * -__lmv_obj_get(struct lmv_obj *obj) +static struct lmv_object *__lmv_object_get(struct lmv_object *obj) { LASSERT(obj != NULL); atomic_inc(&obj->lo_count); return obj; } -struct lmv_obj * -lmv_obj_get(struct lmv_obj *obj) +struct lmv_object *lmv_object_get(struct lmv_object *obj) { spin_lock(&obj_list_lock); - __lmv_obj_get(obj); 
+ __lmv_object_get(obj); spin_unlock(&obj_list_lock); return obj; } -static void -__lmv_obj_put(struct lmv_obj *obj) +static void __lmv_object_put(struct lmv_object *obj) { LASSERT(obj); if (atomic_dec_and_test(&obj->lo_count)) { - CDEBUG(D_OTHER, "last reference to "DFID" - " + CDEBUG(D_INODE, "Last reference to "DFID" - " "destroying\n", PFID(&obj->lo_fid)); - __lmv_obj_del(obj); + __lmv_object_del(obj); } } -void -lmv_obj_put(struct lmv_obj *obj) +void lmv_object_put(struct lmv_object *obj) { spin_lock(&obj_list_lock); - __lmv_obj_put(obj); + __lmv_object_put(obj); spin_unlock(&obj_list_lock); } -static struct lmv_obj * -__lmv_obj_grab(struct obd_device *obd, const struct lu_fid *fid) +void lmv_object_put_unlock(struct lmv_object *obj) +{ + lmv_object_unlock(obj); + lmv_object_put(obj); +} + +static struct lmv_object *__lmv_object_find(struct obd_device *obd, const struct lu_fid *fid) { - struct lmv_obj *obj; - struct list_head *cur; + struct lmv_object *obj; + struct list_head *cur; list_for_each(cur, &obj_list) { - obj = list_entry(cur, struct lmv_obj, lo_list); + obj = list_entry(cur, struct lmv_object, lo_list); - /* check if object is in progress of destroying. If so - skip - * it. */ + /* + * Check if object is in destroying phase. If so - skip + * it. + */ if (obj->lo_state & O_FREEING) continue; /* - * we should make sure, that we have found object belong to + * We should make sure, that we have found object belong to * passed obd. It is possible that, object manager will have two * objects with the same fid belong to different obds, if client * and mds runs on the same host. May be it is good idea to have @@ -241,84 +234,99 @@ __lmv_obj_grab(struct obd_device *obd, const struct lu_fid *fid) if (obj->lo_obd != obd) continue; - /* check if this is what we're looking for. */ + /* + * Check if this is what we're looking for. 
+ */ if (lu_fid_eq(&obj->lo_fid, fid)) - return __lmv_obj_get(obj); + return __lmv_object_get(obj); } return NULL; } -struct lmv_obj * -lmv_obj_grab(struct obd_device *obd, const struct lu_fid *fid) +struct lmv_object *lmv_object_find(struct obd_device *obd, + const struct lu_fid *fid) { - struct lmv_obj *obj; + struct lmv_object *obj; ENTRY; spin_lock(&obj_list_lock); - obj = __lmv_obj_grab(obd, fid); + obj = __lmv_object_find(obd, fid); spin_unlock(&obj_list_lock); RETURN(obj); } -/* looks in objects list for an object that matches passed @fid. If it is not - * found -- creates it using passed @mea and puts onto list. */ -static struct lmv_obj * -__lmv_obj_create(struct obd_device *obd, const struct lu_fid *fid, - struct lmv_stripe_md *mea) +struct lmv_object *lmv_object_find_lock(struct obd_device *obd, + const struct lu_fid *fid) { - struct lmv_obj *new, *obj; + struct lmv_object *obj; ENTRY; - obj = lmv_obj_grab(obd, fid); + obj = lmv_object_find(obd, fid); + if (obj) + lmv_object_lock(obj); + + RETURN(obj); +} + +static struct lmv_object *__lmv_object_create(struct obd_device *obd, + const struct lu_fid *fid, + struct lmv_stripe_md *mea) +{ + struct lmv_object *new; + struct lmv_object *obj; + ENTRY; + + obj = lmv_object_find(obd, fid); if (obj) RETURN(obj); - /* no such object yet, allocate and initialize it. */ - new = lmv_obj_alloc(obd, fid, mea); + new = lmv_object_alloc(obd, fid, mea); if (!new) RETURN(NULL); - /* check if someone create it already while we were dealing with - * allocating @obj. */ + /* + * Check if someone created it already while we were dealing with + * allocating @obj. + */ spin_lock(&obj_list_lock); - obj = __lmv_obj_grab(obd, fid); + obj = __lmv_object_find(obd, fid); if (obj) { - /* someone created it already - put @obj and getting out. */ + /* + * Someone created it already - put @obj and getting out. 
+ */ spin_unlock(&obj_list_lock); - lmv_obj_free(new); + lmv_object_free(new); RETURN(obj); } - __lmv_obj_add(new); - __lmv_obj_get(new); + __lmv_object_add(new); + __lmv_object_get(new); spin_unlock(&obj_list_lock); - CDEBUG(D_OTHER, "new obj in lmv cache: "DFID"\n", + CDEBUG(D_INODE, "New obj in lmv cache: "DFID"\n", PFID(fid)); RETURN(new); - } -/* creates object from passed @fid and @mea. If @mea is NULL, it will be - * obtained from correct MDT and used for constructing the object. */ -struct lmv_obj * -lmv_obj_create(struct obd_export *exp, const struct lu_fid *fid, - struct lmv_stripe_md *mea) +struct lmv_object *lmv_object_create(struct obd_export *exp, + const struct lu_fid *fid, + struct lmv_stripe_md *mea) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct ptlrpc_request *req = NULL; - struct obd_export *tgt_exp; - struct lmv_obj *obj; - struct lustre_md md; - int mealen, rc; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct ptlrpc_request *req = NULL; + struct lmv_tgt_desc *tgt; + struct lmv_object *obj; + struct lustre_md md; + int mealen; + int rc; ENTRY; - CDEBUG(D_OTHER, "get mea for "DFID" and create lmv obj\n", + CDEBUG(D_INODE, "Get mea for "DFID" and create lmv obj\n", PFID(fid)); md.mea = NULL; @@ -326,18 +334,20 @@ lmv_obj_create(struct obd_export *exp, const struct lu_fid *fid, if (mea == NULL) { __u64 valid; - CDEBUG(D_OTHER, "mea isn't passed in, get it now\n"); + CDEBUG(D_INODE, "Mea isn't passed in, get it now\n"); mealen = lmv_get_easize(lmv); - /* time to update mea of parent fid */ + /* + * Time to update mea of parent fid. 
+ */ md.mea = NULL; valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA; - tgt_exp = lmv_find_export(lmv, fid); - if (IS_ERR(tgt_exp)) - GOTO(cleanup, obj = (void *)tgt_exp); + tgt = lmv_find_target(lmv, fid); + if (IS_ERR(tgt)) + GOTO(cleanup, obj = (void *)tgt); - rc = md_getattr(tgt_exp, fid, NULL, valid, mealen, &req); + rc = md_getattr(tgt->ltd_exp, fid, NULL, valid, mealen, &req); if (rc) { CERROR("md_getattr() failed, error %d\n", rc); GOTO(cleanup, obj = ERR_PTR(rc)); @@ -345,7 +355,7 @@ lmv_obj_create(struct obd_export *exp, const struct lu_fid *fid, rc = md_get_lustre_md(exp, req, NULL, exp, &md); if (rc) { - CERROR("mdc_get_lustre_md() failed, error %d\n", rc); + CERROR("md_get_lustre_md() failed, error %d\n", rc); GOTO(cleanup, obj = ERR_PTR(rc)); } @@ -355,15 +365,16 @@ lmv_obj_create(struct obd_export *exp, const struct lu_fid *fid, mea = md.mea; } - /* got mea, now create obj for it. */ - obj = __lmv_obj_create(obd, fid, mea); + /* + * Got mea, now create obj for it. + */ + obj = __lmv_object_create(obd, fid, mea); if (!obj) { CERROR("Can't create new object "DFID"\n", PFID(fid)); GOTO(cleanup, obj = ERR_PTR(-ENOMEM)); } - /* XXX LOV STACKING */ if (md.mea != NULL) obd_free_memmd(exp, (void *)&md.mea); @@ -374,35 +385,26 @@ cleanup: return obj; } -/* - * looks for object with @fid and orders to destroy it. It is possible the object - * will not be destroyed right now, because it is still using by someone. In - * this case it will be marked as "freeing" and will not be accessible anymore - * for subsequent callers of lmv_obj_grab(). 
- */ -int -lmv_obj_delete(struct obd_export *exp, const struct lu_fid *fid) +int lmv_object_delete(struct obd_export *exp, const struct lu_fid *fid) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obj *obj; - int rc = 0; + struct obd_device *obd = exp->exp_obd; + struct lmv_object *obj; + int rc = 0; ENTRY; spin_lock(&obj_list_lock); - obj = __lmv_obj_grab(obd, fid); + obj = __lmv_object_find(obd, fid); if (obj) { obj->lo_state |= O_FREEING; - __lmv_obj_put(obj); - __lmv_obj_put(obj); + __lmv_object_put(obj); + __lmv_object_put(obj); rc = 1; } spin_unlock(&obj_list_lock); - RETURN(rc); } -int -lmv_obj_setup(struct obd_device *obd) +int lmv_object_setup(struct obd_device *obd) { ENTRY; LASSERT(obd != NULL); @@ -413,11 +415,11 @@ lmv_obj_setup(struct obd_device *obd) RETURN(0); } -void -lmv_obj_cleanup(struct obd_device *obd) +void lmv_object_cleanup(struct obd_device *obd) { - struct list_head *cur, *tmp; - struct lmv_obj *obj; + struct list_head *cur; + struct list_head *tmp; + struct lmv_object *obj; ENTRY; CDEBUG(D_INFO, "LMV object manager cleanup (%s)\n", @@ -425,17 +427,17 @@ lmv_obj_cleanup(struct obd_device *obd) spin_lock(&obj_list_lock); list_for_each_safe(cur, tmp, &obj_list) { - obj = list_entry(cur, struct lmv_obj, lo_list); + obj = list_entry(cur, struct lmv_object, lo_list); if (obj->lo_obd != obd) continue; obj->lo_state |= O_FREEING; if (atomic_read(&obj->lo_count) > 1) { - CERROR("obj "DFID" has count > 1 (%d)\n", + CERROR("Object "DFID" has count (%d)\n", PFID(&obj->lo_fid), atomic_read(&obj->lo_count)); } - __lmv_obj_put(obj); + __lmv_object_put(obj); } spin_unlock(&obj_list_lock); EXIT; diff --git a/lustre/lmv/lproc_lmv.c b/lustre/lmv/lproc_lmv.c index 364943c..e880d23 100644 --- a/lustre/lmv/lproc_lmv.c +++ b/lustre/lmv/lproc_lmv.c @@ -49,8 +49,8 @@ static struct lprocfs_vars lprocfs_obd_vars[] = { {0} }; static int lmv_rd_numobd(char *page, char **start, off_t off, int count, int *eof, void *data) { - struct obd_device *dev = (struct 
obd_device*)data; - struct lmv_desc *desc; + struct obd_device *dev = (struct obd_device*)data; + struct lmv_desc *desc; LASSERT(dev != NULL); desc = &dev->u.lmv.desc; @@ -59,11 +59,83 @@ static int lmv_rd_numobd(char *page, char **start, off_t off, int count, } +static const char *placement_name[] = { + [PLACEMENT_CHAR_POLICY] = "CHAR", + [PLACEMENT_NID_POLICY] = "NID" +}; + +static placement_policy_t placement_name2policy(char *name, int len) +{ + int i; + + for (i = 0; i < PLACEMENT_MAX_POLICY; i++) { + if (!strncmp(placement_name[i], name, len)) + return i; + } + return PLACEMENT_INVAL_POLICY; +} + +static const char *placement_policy2name(placement_policy_t placement) +{ + LASSERT(placement < PLACEMENT_MAX_POLICY); + return placement_name[placement]; +} + +static int lmv_rd_placement(char *page, char **start, off_t off, int count, + int *eof, void *data) +{ + struct obd_device *dev = (struct obd_device*)data; + struct lmv_obd *lmv; + + LASSERT(dev != NULL); + lmv = &dev->u.lmv; + *eof = 1; + return snprintf(page, count, "%s\n", + placement_policy2name(lmv->lmv_placement)); + +} + +#define MAX_POLICY_STRING_SIZE 64 + +static int lmv_wr_placement(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + struct obd_device *dev = (struct obd_device *)data; + char dummy[MAX_POLICY_STRING_SIZE + 1]; + int len = count; + placement_policy_t policy; + struct lmv_obd *lmv; + + if (copy_from_user(dummy, buffer, MAX_POLICY_STRING_SIZE)) + return -EFAULT; + + LASSERT(dev != NULL); + lmv = &dev->u.lmv; + + if (len > MAX_POLICY_STRING_SIZE) + len = MAX_POLICY_STRING_SIZE; + + if (dummy[len - 1] == '\n') + len--; + dummy[len] = '\0'; + + policy = placement_name2policy(dummy, len); + if (policy != PLACEMENT_INVAL_POLICY) { + spin_lock(&lmv->lmv_lock); + lmv->lmv_placement = policy; + spin_unlock(&lmv->lmv_lock); + } else { + CERROR("Invalid placement policy \"%s\"!\n", dummy); + return -EINVAL; + } + return count; +} + static int lmv_rd_activeobd(char 
*page, char **start, off_t off, int count, int *eof, void *data) { - struct obd_device* dev = (struct obd_device*)data; - struct lmv_desc *desc; + struct obd_device *dev = (struct obd_device*)data; + struct lmv_desc *desc; LASSERT(dev != NULL); desc = &dev->u.lmv.desc; @@ -74,8 +146,8 @@ static int lmv_rd_activeobd(char *page, char **start, off_t off, int count, static int lmv_rd_desc_uuid(char *page, char **start, off_t off, int count, int *eof, void *data) { - struct obd_device *dev = (struct obd_device*) data; - struct lmv_obd *lmv; + struct obd_device *dev = (struct obd_device*) data; + struct lmv_obd *lmv; LASSERT(dev != NULL); lmv = &dev->u.lmv; @@ -85,9 +157,8 @@ static int lmv_rd_desc_uuid(char *page, char **start, off_t off, int count, static void *lmv_tgt_seq_start(struct seq_file *p, loff_t *pos) { - struct obd_device *dev = p->private; - struct lmv_obd *lmv = &dev->u.lmv; - + struct obd_device *dev = p->private; + struct lmv_obd *lmv = &dev->u.lmv; return (*pos >= lmv->desc.ld_tgt_count) ? NULL : &(lmv->tgts[*pos]); } @@ -99,37 +170,37 @@ static void lmv_tgt_seq_stop(struct seq_file *p, void *v) static void *lmv_tgt_seq_next(struct seq_file *p, void *v, loff_t *pos) { - struct obd_device *dev = p->private; - struct lmv_obd *lmv = &dev->u.lmv; - + struct obd_device *dev = p->private; + struct lmv_obd *lmv = &dev->u.lmv; ++*pos; return (*pos >=lmv->desc.ld_tgt_count) ? NULL : &(lmv->tgts[*pos]); } static int lmv_tgt_seq_show(struct seq_file *p, void *v) { - struct lmv_tgt_desc *tgt = v; - struct obd_device *dev = p->private; - struct lmv_obd *lmv = &dev->u.lmv; - int idx = tgt - &(lmv->tgts[0]); + struct lmv_tgt_desc *tgt = v; + struct obd_device *dev = p->private; + struct lmv_obd *lmv = &dev->u.lmv; + int idx = tgt - &(lmv->tgts[0]); return seq_printf(p, "%d: %s %sACTIVE\n", idx, tgt->ltd_uuid.uuid, tgt->ltd_active ? 
"" : "IN"); } struct seq_operations lmv_tgt_sops = { - .start = lmv_tgt_seq_start, - .stop = lmv_tgt_seq_stop, - .next = lmv_tgt_seq_next, - .show = lmv_tgt_seq_show, + .start = lmv_tgt_seq_start, + .stop = lmv_tgt_seq_stop, + .next = lmv_tgt_seq_next, + .show = lmv_tgt_seq_show, }; static int lmv_target_seq_open(struct inode *inode, struct file *file) { - struct proc_dir_entry *dp = PDE(inode); - struct seq_file *seq; - int rc = seq_open(file, &lmv_tgt_sops); - + struct proc_dir_entry *dp = PDE(inode); + struct seq_file *seq; + int rc; + + rc = seq_open(file, &lmv_tgt_sops); if (rc) return rc; @@ -140,29 +211,30 @@ static int lmv_target_seq_open(struct inode *inode, struct file *file) } struct lprocfs_vars lprocfs_lmv_obd_vars[] = { - { "numobd", lmv_rd_numobd, 0, 0 }, - { "activeobd", lmv_rd_activeobd, 0, 0 }, - { "uuid", lprocfs_rd_uuid, 0, 0 }, - { "desc_uuid", lmv_rd_desc_uuid, 0, 0 }, + { "numobd", lmv_rd_numobd, 0, 0 }, + { "placement", lmv_rd_placement, lmv_wr_placement, 0 }, + { "activeobd", lmv_rd_activeobd, 0, 0 }, + { "uuid", lprocfs_rd_uuid, 0, 0 }, + { "desc_uuid", lmv_rd_desc_uuid, 0, 0 }, { 0 } }; static struct lprocfs_vars lprocfs_lmv_module_vars[] = { - { "num_refs", lprocfs_rd_numrefs, 0, 0 }, + { "num_refs", lprocfs_rd_numrefs, 0, 0 }, { 0 } }; struct file_operations lmv_proc_target_fops = { - .owner = THIS_MODULE, - .open = lmv_target_seq_open, - .read = seq_read, - .llseek = seq_lseek, - .release = seq_release, + .owner = THIS_MODULE, + .open = lmv_target_seq_open, + .read = seq_read, + .llseek = seq_lseek, + .release = seq_release, }; #endif /* LPROCFS */ void lprocfs_lmv_init_vars(struct lprocfs_static_vars *lvars) { - lvars->module_vars = lprocfs_lmv_module_vars; - lvars->obd_vars = lprocfs_lmv_obd_vars; + lvars->module_vars = lprocfs_lmv_module_vars; + lvars->obd_vars = lprocfs_lmv_obd_vars; } diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 545ec84..5eecc29 100644 --- a/lustre/mdt/mdt_handler.c +++ 
b/lustre/mdt/mdt_handler.c @@ -777,7 +777,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, struct md_object *next = mdt_object_child(parent); struct lu_fid *child_fid = &info->mti_tmp_fid1; struct lu_name *lname = NULL; - const char *name; + const char *name = NULL; int namelen = 0; struct mdt_lock_handle *lhp; struct ldlm_lock *lock; @@ -798,27 +798,30 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, namelen = req_capsule_get_size(info->mti_pill, &RMF_NAME, RCL_CLIENT) - 1; - LASSERT(namelen >= 0); - - /* XXX: "namelen == 0" is for getattr by fid (OBD_CONNECT_ATTRFID), - * otherwise do not allow empty name, that is the name must contain - * at least one character and the terminating '\0'*/ - if (namelen == 0) { - reqbody =req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY); - LASSERT(fid_is_sane(&reqbody->fid2)); - name = NULL; - - CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", " - "ldlm_rep = %p\n", - PFID(mdt_object_fid(parent)), PFID(&reqbody->fid2), - ldlm_rep); - } else { - lname = mdt_name(info->mti_env, (char *)name, namelen); - CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, " - "ldlm_rep = %p\n", - PFID(mdt_object_fid(parent)), name, ldlm_rep); - } + if (!info->mti_cross_ref) { + /* + * XXX: Check for "namelen == 0" is for getattr by fid + * (OBD_CONNECT_ATTRFID), otherwise do not allow empty name, + * that is the name must contain at least one character and + * the terminating '\0' + */ + if (namelen == 0) { + reqbody = req_capsule_client_get(info->mti_pill, + &RMF_MDT_BODY); + LASSERT(fid_is_sane(&reqbody->fid2)); + name = NULL; + CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", " + "ldlm_rep = %p\n", + PFID(mdt_object_fid(parent)), PFID(&reqbody->fid2), + ldlm_rep); + } else { + lname = mdt_name(info->mti_env, (char *)name, namelen); + CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, " + "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)), + name, ldlm_rep); + } + } mdt_set_disposition(info, ldlm_rep, 
DISP_LOOKUP_EXECD); rc = mdt_object_exists(parent); @@ -827,10 +830,10 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, &parent->mot_obj.mo_lu, "Parent doesn't exist!\n"); RETURN(-ESTALE); - } else + } else if (!info->mti_cross_ref) { LASSERTF(rc > 0, "Parent "DFID" is on remote server\n", PFID(mdt_object_fid(parent))); - + } if (lname) { rc = mdt_raw_lookup(info, parent, lname, ldlm_rep); if (rc != 0) { @@ -1244,7 +1247,7 @@ static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page, continue; fid_le_to_cpu(lf, &ent->lde_fid); - if (le32_to_cpu(ent->lde_hash) & MAX_HASH_HIGHEST_BIT) + if (le64_to_cpu(ent->lde_hash) & MAX_HASH_HIGHEST_BIT) ma->ma_attr.la_mode = S_IFDIR; else ma->ma_attr.la_mode = 0; @@ -1254,7 +1257,7 @@ static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page, memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen)); lname = mdt_name(info->mti_env, name, - le16_to_cpu(ent->lde_namelen) + 1); + le16_to_cpu(ent->lde_namelen)); ma->ma_attr_flags |= MDS_PERM_BYPASS; rc = mdo_name_insert(info->mti_env, md_object_next(&object->mot_obj), @@ -1392,9 +1395,9 @@ static int mdt_readpage(struct mdt_thread_info *info) * reqbody->nlink contains number bytes to read. 
*/ rdpg->rp_hash = reqbody->size; - if ((__u64)rdpg->rp_hash != reqbody->size) { - CERROR("Invalid hash: %#llx != %#llx\n", - (__u64)rdpg->rp_hash, reqbody->size); + if (rdpg->rp_hash != reqbody->size) { + CERROR("Invalid hash: "LPX64" != "LPX64"\n", + rdpg->rp_hash, reqbody->size); RETURN(-EFAULT); } rdpg->rp_count = reqbody->nlink; @@ -1800,6 +1803,17 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o, LASSERT(lh->mlh_type != MDT_PDO_LOCK); } + if (lh->mlh_type == MDT_PDO_LOCK) { + /* check for exists after object is locked */ + if (mdt_object_exists(o) == 0) { + /* Non-existent object shouldn't have PDO lock */ + RETURN(-ESTALE); + } else { + /* Non-dir object shouldn't have PDO lock */ + LASSERT(S_ISDIR(lu_object_attr(&o->mot_obj.mo_lu))); + } + } + memset(policy, 0, sizeof(*policy)); fid_build_reg_res_name(mdt_object_fid(o), res_id); @@ -1835,7 +1849,7 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o, /* * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is * going to be sent to client. If it is - mdt_intent_policy() path will - * fix it up and turns FL_LOCAL flag off. + * fix it up and turn FL_LOCAL flag off. 
*/ rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy, res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB); @@ -1843,16 +1857,6 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o, if (rc) GOTO(out, rc); - if (lh->mlh_type == MDT_PDO_LOCK) { - /* check for exists after object is locked */ - if (mdt_object_exists(o) == 0) { - /* Non-existent object shouldn't have PDO lock */ - rc = -ESTALE; - } else { - /* Non-dir object shouldn't have PDO lock */ - LASSERT(S_ISDIR(lu_object_attr(&o->mot_obj.mo_lu))); - } - } out: if (rc) mdt_object_unlock(info, o, lh, 1); diff --git a/lustre/mdt/mdt_lib.c b/lustre/mdt/mdt_lib.c index b3dc462..c2abdf1 100644 --- a/lustre/mdt/mdt_lib.c +++ b/lustre/mdt/mdt_lib.c @@ -690,7 +690,8 @@ static __u64 mdt_attr_valid_xlate(__u64 in, struct mdt_reint_record *rr, in &= ~(ATTR_MODE|ATTR_UID|ATTR_GID|ATTR_SIZE|ATTR_BLOCKS| ATTR_ATIME|ATTR_MTIME|ATTR_CTIME|ATTR_FROM_OPEN| ATTR_ATIME_SET|ATTR_CTIME_SET|ATTR_MTIME_SET| - ATTR_ATTR_FLAG|ATTR_RAW|MDS_OPEN_OWNEROVERRIDE); + ATTR_ATTR_FLAG|ATTR_RAW|MDS_OPEN_OWNEROVERRIDE| + ATTR_FORCE|ATTR_KILL_SUID); if (in != 0) CERROR("Unknown attr bits: %#llx\n", in); return out; @@ -848,9 +849,14 @@ static int mdt_create_unpack(struct mdt_thread_info *info) req_capsule_client_get(pill, &RMF_CAPA1)); mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA); - rr->rr_name = req_capsule_client_get(pill, &RMF_NAME); - rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1; - LASSERT(rr->rr_name && rr->rr_namelen > 0); + if (!info->mti_cross_ref) { + rr->rr_name = req_capsule_client_get(pill, &RMF_NAME); + rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1; + LASSERT(rr->rr_name && rr->rr_namelen > 0); + } else { + rr->rr_name = NULL; + rr->rr_namelen = 0; + } #ifdef CONFIG_FS_POSIX_ACL if (sp->sp_cr_flags & MDS_CREATE_RMT_ACL) { @@ -938,13 +944,14 @@ static int mdt_link_unpack(struct mdt_thread_info *info) mdt_set_capainfo(info, 1, rr->rr_fid2, 
req_capsule_client_get(pill, &RMF_CAPA2)); + info->mti_spec.sp_ck_split = !!(rec->lk_bias & MDS_CHECK_SPLIT); + info->mti_cross_ref = !!(rec->lk_bias & MDS_CROSS_REF); rr->rr_name = req_capsule_client_get(pill, &RMF_NAME); if (rr->rr_name == NULL) RETURN(-EFAULT); rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1; - LASSERT(rr->rr_namelen > 0); - info->mti_spec.sp_ck_split = !!(rec->lk_bias & MDS_CHECK_SPLIT); - info->mti_cross_ref = !!(rec->lk_bias & MDS_CROSS_REF); + if (!info->mti_cross_ref) + LASSERT(rr->rr_namelen > 0); rc = mdt_dlmreq_unpack(info); RETURN(rc); @@ -985,13 +992,18 @@ static int mdt_unlink_unpack(struct mdt_thread_info *info) mdt_set_capainfo(info, 0, rr->rr_fid1, req_capsule_client_get(pill, &RMF_CAPA1)); - rr->rr_name = req_capsule_client_get(pill, &RMF_NAME); - if (rr->rr_name == NULL) - RETURN(-EFAULT); - rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1; - LASSERT(rr->rr_namelen > 0); - info->mti_spec.sp_ck_split = !!(rec->ul_bias & MDS_CHECK_SPLIT); info->mti_cross_ref = !!(rec->ul_bias & MDS_CROSS_REF); + if (!info->mti_cross_ref) { + rr->rr_name = req_capsule_client_get(pill, &RMF_NAME); + rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1; + if (rr->rr_name == NULL || rr->rr_namelen == 0) + RETURN(-EFAULT); + } else { + rr->rr_name = NULL; + rr->rr_namelen = 0; + + } + info->mti_spec.sp_ck_split = !!(rec->ul_bias & MDS_CHECK_SPLIT); if (rec->ul_bias & MDS_VTX_BYPASS) ma->ma_attr_flags |= MDS_VTX_BYPASS; else @@ -1040,16 +1052,16 @@ static int mdt_rename_unpack(struct mdt_thread_info *info) mdt_set_capainfo(info, 1, rr->rr_fid2, req_capsule_client_get(pill, &RMF_CAPA2)); + info->mti_spec.sp_ck_split = !!(rec->rn_bias & MDS_CHECK_SPLIT); + info->mti_cross_ref = !!(rec->rn_bias & MDS_CROSS_REF); rr->rr_name = req_capsule_client_get(pill, &RMF_NAME); rr->rr_tgt = req_capsule_client_get(pill, &RMF_SYMTGT); if (rr->rr_name == NULL || rr->rr_tgt == NULL) RETURN(-EFAULT); 
rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1; - LASSERT(rr->rr_namelen > 0); rr->rr_tgtlen = req_capsule_get_size(pill, &RMF_SYMTGT, RCL_CLIENT) - 1; - LASSERT(rr->rr_tgtlen > 0); - info->mti_spec.sp_ck_split = !!(rec->rn_bias & MDS_CHECK_SPLIT); - info->mti_cross_ref = !!(rec->rn_bias & MDS_CROSS_REF); + if (!info->mti_cross_ref) + LASSERT(rr->rr_namelen > 0 && rr->rr_tgtlen > 0); if (rec->rn_bias & MDS_VTX_BYPASS) ma->ma_attr_flags |= MDS_VTX_BYPASS; else diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index 51f87c2..ea232c8 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -470,11 +470,21 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK)) RETURN(err_serious(-ENOENT)); - /* step 1: lock the parent */ + /* + * step 1: lock the parent. Note, this may be child in case of + * remote operation denoted by ->mti_cross_ref flag. + */ parent_lh = &info->mti_lh[MDT_LH_PARENT]; - mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name, - rr->rr_namelen); - + if (info->mti_cross_ref) { + /* + * Init reg lock for cross ref case when we need to do only + * ref del locally. 
+ */ + mdt_lock_reg_init(parent_lh, LCK_PW); + } else { + mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name, + rr->rr_namelen); + } mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh, MDS_INODELOCK_UPDATE); if (IS_ERR(mp)) { @@ -500,7 +510,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA); rc = mo_ref_del(info->mti_env, mdt_object_child(mp), ma); - mdt_handle_last_unlink(info, mp, ma); + if (rc == 0) + mdt_handle_last_unlink(info, mp, ma); } else rc = 0; GOTO(out_unlock_parent, rc); diff --git a/lustre/ptlrpc/layout.c b/lustre/ptlrpc/layout.c index e0e11b0..9fd874a 100644 --- a/lustre/ptlrpc/layout.c +++ b/lustre/ptlrpc/layout.c @@ -1373,8 +1373,11 @@ static void *__req_capsule_get(struct req_capsule *pill, [RCL_SERVER] = "server" }; + LASSERT(pill != NULL); + LASSERT(pill != LP_POISON); fmt = pill->rc_fmt; LASSERT(fmt != NULL); + LASSERT(fmt != LP_POISON); LASSERT(__req_format_is_sane(fmt)); offset = __req_capsule_offset(pill, field, loc); diff --git a/lustre/tests/acceptance-small.sh b/lustre/tests/acceptance-small.sh index d7955da..7e4a9b5 100755 --- a/lustre/tests/acceptance-small.sh +++ b/lustre/tests/acceptance-small.sh @@ -87,7 +87,7 @@ for NAME in $CONFIGS; do export CLIENTMODSONLY=true fi - assert_env mds_HOST MDS_MKFS_OPTS MDSDEV + assert_env mds_HOST MDS_MKFS_OPTS assert_env ost_HOST OST_MKFS_OPTS OSTCOUNT assert_env FSNAME MOUNT MOUNT2 diff --git a/lustre/tests/cfg/lmv.sh b/lustre/tests/cfg/lmv.sh index e964af7..10ba95f 100644 --- a/lustre/tests/cfg/lmv.sh +++ b/lustre/tests/cfg/lmv.sh @@ -47,6 +47,15 @@ DIR=${DIR:-$MOUNT} DIR1=${DIR:-$MOUNT1} DIR2=${DIR2:-$MOUNT2} +if [ $UID -ne 0 ]; then + log "running as non-root uid $UID" + RUNAS_ID="$UID" + RUNAS="" +else + RUNAS_ID=${RUNAS_ID:-500} + RUNAS=${RUNAS:-"runas -u $RUNAS_ID"} +fi + PDSH=${PDSH:-no_dsh} FAILURE_MODE=${FAILURE_MODE:-SOFT} # or HARD POWER_DOWN=${POWER_DOWN:-"powerman --off"} diff --git a/lustre/tests/cfg/local.sh 
b/lustre/tests/cfg/local.sh index 5305b2d..aa8bc2a 100644 --- a/lustre/tests/cfg/local.sh +++ b/lustre/tests/cfg/local.sh @@ -13,8 +13,12 @@ CLIENTS="" TMP=${TMP:-/tmp} DAEMONSIZE=${DAEMONSIZE:-500} -MDSDEV=${MDSDEV:-$TMP/${FSNAME}-mdt1} MDSCOUNT=${MDSCOUNT:-1} +[ $MDSCOUNT -gt 4 ] && MDSCOUNT=4 +for num in $(seq $MDSCOUNT); do + eval mds${num}_HOST=\$\{mds${num}_HOST:-$mds_HOST\} + eval mds${num}failover_HOST=\$\{mds${num}failover_HOST:-$mdsfailover_HOST\} +done MDSDEVBASE=${MDSDEVBASE:-$TMP/${FSNAME}-mdt} MDSSIZE=${MDSSIZE:-100000} MDSOPT=${MDSOPT:-"--mountfsoptions=acl"} @@ -56,7 +60,8 @@ MKFSOPT="" MDSOPT=$MDSOPT" --param lov.stripecount=$STRIPES_PER_OBJ" [ "x$L_GETIDENTITY" != "x" ] && MDSOPT=$MDSOPT" --param mdt.identity_upcall=$L_GETIDENTITY" -MDS_MKFS_OPTS="--mgs --mdt --fsname=$FSNAME --device-size=$MDSSIZE --param sys.timeout=$TIMEOUT $MKFSOPT $MDSOPT $MDS_MKFS_OPTS" +MDS_MKFS_OPTS="--mgs --mdt --fsname=$FSNAME --device-size=$MDSSIZE --param sys.timeout=$TIMEOUT $MKFSOPT $MDSOPT" +MDSn_MKFS_OPTS="--mgsnode=$MGSNID --mdt --fsname=$FSNAME --device-size=$MDSSIZE --param sys.timeout=$TIMEOUT $MKFSOPT $MDSOPT" MKFSOPT="" [ "x$OSTJOURNALSIZE" != "x" ] && diff --git a/lustre/tests/conf-sanity.sh b/lustre/tests/conf-sanity.sh index 6fec4c5..5f4ce6e 100644 --- a/lustre/tests/conf-sanity.sh +++ b/lustre/tests/conf-sanity.sh @@ -11,13 +11,11 @@ set -e ONLY=${ONLY:-"$*"} -# These tests don't apply to mountconf -MOUNTCONFSKIP="10 11 12 13 13b 14 15" # bug number for skipped test: 13739 HEAD_EXCEPT=" 32a 32b " # bug number for skipped test: -ALWAYS_EXCEPT=" $CONF_SANITY_EXCEPT $MOUNTCONFSKIP $HEAD_EXCEPT" +ALWAYS_EXCEPT=" $CONF_SANITY_EXCEPT $HEAD_EXCEPT" # UPDATE THE COMMENT ABOVE WITH BUG NUMBERS WHEN CHANGING ALWAYS_EXCEPT! 
SRCDIR=`dirname $0` @@ -27,9 +25,6 @@ PTLDEBUG=${PTLDEBUG:--1} SAVE_PWD=$PWD LUSTRE=${LUSTRE:-`dirname $0`/..} RLUSTRE=${RLUSTRE:-$LUSTRE} -MOUNTLUSTRE=${MOUNTLUSTRE:-/sbin/mount.lustre} -MKFSLUSTRE=${MKFSLUSTRE:-/usr/sbin/mkfs.lustre} -HOSTNAME=`hostname` . $LUSTRE/tests/test-framework.sh init_test_env $@ @@ -40,7 +35,7 @@ fi # use small MDS + OST size to speed formatting time MDSSIZE=40000 OSTSIZE=40000 -. ${CONFIG:=$LUSTRE/tests/cfg/local.sh} +. ${CONFIG:=$LUSTRE/tests/cfg/$NAME.sh} # [ "$SLOW" = "no" ] && EXCEPT_SLOW="0 1 2 3 6 7 15 18 24b 25 30 31 32 33 34a " @@ -52,12 +47,13 @@ reformat() { } writeconf() { - local facet=mds + local facet=$SINGLEMDS + local dev=${facet}_dev shift stop ${facet} -f rm -f ${facet}active # who knows if/where $TUNEFS is installed? Better reformat if it fails... - do_facet ${facet} "$TUNEFS --writeconf $MDSDEV" || echo "tunefs failed, reformatting instead" && reformat + do_facet ${facet} "$TUNEFS --writeconf ${!dev}" || echo "tunefs failed, reformatting instead" && reformat } gen_config() { @@ -72,14 +68,16 @@ gen_config() { } start_mds() { - echo "start mds service on `facet_active_host mds`" - start mds $MDSDEV $MDS_MOUNT_OPTS || return 94 + local facet=$SINGLEMDS + local dev=${facet}_dev + echo "start mds service on `facet_active_host $facet`" + start $facet ${!dev} $MDS_MOUNT_OPTS || return 94 } stop_mds() { - echo "stop mds service on `facet_active_host mds`" + echo "stop mds service on `facet_active_host $SINGLEMDS`" # These tests all use non-failover stop - stop mds -f || return 97 + stop $SINGLEMDS -f || return 97 } start_ost() { @@ -385,263 +383,11 @@ test_9() { run_test 9 "test ptldebug and subsystem for mkfs" -test_10() { - echo "generate configuration with the same name for node and mds" - OLDXMLCONFIG=$XMLCONFIG - XMLCONFIG="broken.xml" - [ -f "$XMLCONFIG" ] && rm -f $XMLCONFIG - facet="mds" - rm -f ${facet}active - add_facet $facet - echo "the name for node and mds is the same" - do_lmc --add mds --node ${facet}_facet 
--mds ${facet}_facet \ - --dev $MDSDEV --size $MDSSIZE || return $? - do_lmc --add lov --mds ${facet}_facet --lov lov1 --stripe_sz \ - $STRIPE_BYTES --stripe_cnt $STRIPES_PER_OBJ \ - --stripe_pattern 0 || return $? - add_ost ost --lov lov1 --dev $OSTDEV --size $OSTSIZE - facet="client" - add_facet $facet --lustre_upcall $UPCALL - do_lmc --add mtpt --node ${facet}_facet --mds mds_facet \ - --lov lov1 --path $MOUNT - - echo "mount lustre" - start_ost - start_mds - mount_client $MOUNT - check_mount || return 41 - cleanup || return $? - - echo "Success!" - XMLCONFIG=$OLDXMLCONFIG -} -run_test 10 "mount lustre with the same name for node and mds" - -test_11() { - OLDXMLCONFIG=$XMLCONFIG - XMLCONFIG="conf11.xml" - - [ -f "$XMLCONFIG" ] && rm -f $XMLCONFIG - add_mds mds --dev $MDSDEV --size $MDSSIZE - add_ost ost --dev $OSTDEV --size $OSTSIZE - add_client client mds --path $MOUNT --ost ost_svc || return $? - echo "Default lov config success!" - - [ -f "$XMLCONFIG" ] && rm -f $XMLCONFIG - add_mds mds --dev $MDSDEV --size $MDSSIZE - add_ost ost --dev $OSTDEV --size $OSTSIZE - add_client client mds --path $MOUNT && return $? - echo "--add mtpt with neither --lov nor --ost will return error" - - echo "" - echo "Success!" - XMLCONFIG=$OLDXMLCONFIG -} -run_test 11 "use default lov configuration (should return error)" - -test_12() { - OLDXMLCONFIG=$XMLCONFIG - XMLCONFIG="batch.xml" - BATCHFILE="batchfile" - - # test double quote - [ -f "$XMLCONFIG" ] && rm -f $XMLCONFIG - [ -f "$BATCHFILE" ] && rm -f $BATCHFILE - echo "--add net --node $HOSTNAME --nid $HOSTNAME --nettype tcp" > $BATCHFILE - echo "--add mds --node $HOSTNAME --mds mds1 --mkfsoptions \"-I 128\"" >> $BATCHFILE - # --mkfsoptions "-I 128" - do_lmc -m $XMLCONFIG --batch $BATCHFILE || return $? - if [ `sed -n '/>-I 128 $BATCHFILE - echo "--add mds --node $HOSTNAME --mds mds1 --mkfsoptions \"-I 128" >> $BATCHFILE - # --mkfsoptions "-I 128 - do_lmc -m $XMLCONFIG --batch $BATCHFILE && return $? 
- echo "unmatched double quote should return error" - - # test single quote - rm -f $BATCHFILE - echo "--add net --node $HOSTNAME --nid $HOSTNAME --nettype tcp" > $BATCHFILE - echo "--add mds --node $HOSTNAME --mds mds1 --mkfsoptions '-I 128'" >> $BATCHFILE - # --mkfsoptions '-I 128' - do_lmc -m $XMLCONFIG --batch $BATCHFILE || return $? - if [ `sed -n '/>-I 128 $BATCHFILE - echo "--add mds --node $HOSTNAME --mds mds1 --mkfsoptions '-I 128" >> $BATCHFILE - # --mkfsoptions '-I 128 - do_lmc -m $XMLCONFIG --batch $BATCHFILE && return $? - echo "unmatched single quote should return error" - - # test backslash - rm -f $BATCHFILE - echo "--add net --node $HOSTNAME --nid $HOSTNAME --nettype tcp" > $BATCHFILE - echo "--add mds --node $HOSTNAME --mds mds1 --mkfsoptions \-\I\ \128" >> $BATCHFILE - # --mkfsoptions \-\I\ \128 - do_lmc -m $XMLCONFIG --batch $BATCHFILE || return $? - if [ `sed -n '/>-I 128 $BATCHFILE - echo "--add mds --node $HOSTNAME --mds mds1 --mkfsoptions -I\ 128\\" >> $BATCHFILE - # --mkfsoptions -I\ 128\ - do_lmc -m $XMLCONFIG --batch $BATCHFILE && return $? - echo "backslash followed by nothing should return error" - - rm -f $BATCHFILE - XMLCONFIG=$OLDXMLCONFIG -} -run_test 12 "lmc --batch, with single/double quote, backslash in batchfile" - -test_13a() { # was test_13 - OLDXMLCONFIG=$XMLCONFIG - XMLCONFIG="conf13-1.xml" - - # check long uuid will be truncated properly and uniquely - echo "To generate XML configuration file(with long ost name): $XMLCONFIG" - [ -f "$XMLCONFIG" ] && rm -f $XMLCONFIG - do_lmc --add net --node $HOSTNAME --nid $HOSTNAME --nettype tcp - do_lmc --add mds --node $HOSTNAME --mds mds1_name_longer_than_31characters - do_lmc --add mds --node $HOSTNAME --mds mds2_name_longer_than_31characters - if [ ! -f "$XMLCONFIG" ]; then - echo "Error:no file $XMLCONFIG created!" 
- return 1 - fi - EXPECTEDMDS1UUID="e_longer_than_31characters_UUID" - EXPECTEDMDS2UUID="longer_than_31characters_UUID_2" - FOUNDMDS1UUID=`awk -F"'" '/ $SECONDXMLCONFIG || return $? - echo "Generate the second XML configuration file" - gen_config - # don't compare .xml mtime, it will always be different - if [ `sed -e "s/mtime[^ ]*//" $XMLCONFIG | diff - $SECONDXMLCONFIG | wc -l` -eq 0 ]; then - echo "Success:multiple invocations for lmc generate same XML file" - else - echo "Error: multiple invocations for lmc generate different XML file" - return 1 - fi - - rm -f $XMLCONFIG $SECONDXMLCONFIG - XMLCONFIG=$OLDXMLCONFIG -} -run_test 13b "check lmc generates consistent .xml file" - -test_14() { - rm -f $XMLCONFIG - - # create xml file with --mkfsoptions for ost - echo "create xml file with --mkfsoptions for ost" - add_mds mds --dev $MDSDEV --size $MDSSIZE - add_lov lov1 mds --stripe_sz $STRIPE_BYTES\ - --stripe_cnt $STRIPES_PER_OBJ --stripe_pattern 0 - add_ost ost --lov lov1 --dev $OSTDEV --size $OSTSIZE \ - --mkfsoptions "-Llabel_conf_14" - add_client client mds --lov lov1 --path $MOUNT - - FOUNDSTRING=`awk -F"<" '//{print $2}' $XMLCONFIG` - EXPECTEDSTRING="mkfsoptions>-Llabel_conf_14" - if [ "$EXPECTEDSTRING" != "$FOUNDSTRING" ]; then - echo "Error: expected: $EXPECTEDSTRING; found: $FOUNDSTRING" - return 1 - fi - echo "Success:mkfsoptions for ost written to xml file correctly." - - # mount lustre to test lconf mkfsoptions-parsing - echo "mount lustre" - start_ost - start_mds - mount_client $MOUNT || return $? - if [ -z "`do_facet ost1 dumpe2fs -h $OSTDEV | grep label_conf_14`" ]; then - echo "Error: the mkoptions not applied to mke2fs of ost." 
- return 1 - fi - cleanup - echo "lconf mkfsoptions for ost success" - - gen_config -} -run_test 14 "test mkfsoptions of ost for lmc and lconf" - -cleanup_15() { - trap 0 - [ -f $MOUNTLUSTRE ] && echo "remove $MOUNTLUSTRE" && rm -f $MOUNTLUSTRE - if [ -f $MOUNTLUSTRE.sav ]; then - echo "return original $MOUNTLUSTRE.sav to $MOUNTLUSTRE" - mv $MOUNTLUSTRE.sav $MOUNTLUSTRE - fi -} - -# this only tests the kernel mount command, not anything about lustre. -test_15() { - MOUNTLUSTRE=${MOUNTLUSTRE:-/sbin/mount.lustre} - start_ost - start_mds - - echo "mount lustre on ${MOUNT} without $MOUNTLUSTRE....." - if [ -f "$MOUNTLUSTRE" ]; then - echo "save $MOUNTLUSTRE to $MOUNTLUSTRE.sav" - mv $MOUNTLUSTRE $MOUNTLUSTRE.sav && trap cleanup_15 EXIT INT - if [ -f $MOUNTLUSTRE ]; then - skip "$MOUNTLUSTRE cannot be moved, skipping test" - return 0 - fi - fi - - mount_client $MOUNT && error "mount succeeded" && return 1 - echo "mount lustre on $MOUNT without $MOUNTLUSTRE failed as expected" - cleanup_15 - cleanup || return $? -} -run_test 15 "zconf-mount without /sbin/mount.lustre (should return error)" - # LOGS/PENDING do not exist anymore since CMD3 test_16() { - TMPMTPT="${TMP}/conf16" - + local TMPMTPT="${TMP}/conf16" + local dev=${SINGLEMDS}_dev + local MDSDEV=${!dev} if [ ! -e "$MDSDEV" ]; then log "no $MDSDEV existing, so mount Lustre to create one" setup @@ -652,7 +398,7 @@ test_16() { [ -f "$MDSDEV" ] && LOOPOPT="-o loop" log "change the mode of $MDSDEV/OBJECTS to 555" - do_facet mds "mkdir -p $TMPMTPT && + do_facet $SINGLEMDS "mkdir -p $TMPMTPT && mount $LOOPOPT -t $FSTYPE $MDSDEV $TMPMTPT && chmod 555 $TMPMTPT/OBJECTS && umount $TMPMTPT" || return $? @@ -663,7 +409,7 @@ test_16() { cleanup || return $? 
log "read the mode of OBJECTS and check if they has been changed properly" - EXPECTEDOBJECTSMODE=`do_facet mds "debugfs -R 'stat OBJECTS' $MDSDEV 2> /dev/null" | grep 'Mode: ' | sed -e "s/.*Mode: *//" -e "s/ *Flags:.*//"` + EXPECTEDOBJECTSMODE=`do_facet $SINGLEMDS "debugfs -R 'stat OBJECTS' $MDSDEV 2> /dev/null" | grep 'Mode: ' | sed -e "s/.*Mode: *//" -e "s/ *Flags:.*//"` if [ "$EXPECTEDOBJECTSMODE" = "0777" ]; then log "Success:Lustre change the mode of OBJECTS correctly" @@ -674,6 +420,9 @@ test_16() { run_test 16 "verify that lustre will correct the mode of OBJECTS" test_17() { + local dev=${SINGLEMDS}_dev + local MDSDEV=${!dev} + if [ ! -e "$MDSDEV" ]; then echo "no $MDSDEV existing, so mount Lustre to create one" setup @@ -682,7 +431,7 @@ test_17() { fi echo "Remove mds config log" - do_facet mds "debugfs -w -R 'unlink CONFIGS/$FSNAME-MDT0000' $MDSDEV || return \$?" || return $? + do_facet $SINGLEMDS "debugfs -w -R 'unlink CONFIGS/$FSNAME-MDT0000' $MDSDEV || return \$?" || return $? 
start_ost start_mds && return 42 @@ -693,6 +442,9 @@ run_test 17 "Verify failed mds_postsetup won't fail assertion (2936) (should ret test_18() { [ "$FSTYPE" != "ldiskfs" ] && skip "not needed for FSTYPE=$FSTYPE" && return + local dev=${SINGLEMDS}_dev + local MDSDEV=${!dev} + local MIN=2000000 local OK= @@ -843,7 +595,7 @@ run_test 22 "start a client before osts (should return errs)" test_23a() { # was test_23 setup # fail mds - stop mds + stop $SINGLEMDS # force down client so that recovering mds waits for reconnect local running=$(grep -c $MOUNT /proc/mounts) || true if [ $running -ne 0 ]; then @@ -958,7 +710,7 @@ test_24a() { umount_client $MOUNT # the MDS must remain up until last MDT stop_mds - MDS=$(do_facet $SINGLEMDS "lctl get_param -n devices" | awk '($3 ~ "mdt" && $4 ~ "MDT") { print $4 }') + MDS=$(do_facet $SINGLEMDS "lctl get_param -n devices" | awk '($3 ~ "mdt" && $4 ~ "MDT") { print $4 }' | head -1) [ -z "$MDS" ] && error "No MDT" && return 8 cleanup_24a cleanup_nocli || return 6 @@ -992,9 +744,9 @@ run_test 25 "Verify modules are referenced" test_26() { load_modules # we need modules before mount for sysctl, so make sure... - do_facet mds "lsmod | grep -q lustre || modprobe lustre" + do_facet $SINGLEMDS "lsmod | grep -q lustre || modprobe lustre" #define OBD_FAIL_MDS_FS_SETUP 0x135 - do_facet mds "lctl set_param fail_loc=0x80000135" + do_facet $SINGLEMDS "lctl set_param fail_loc=0x80000135" start_mds && echo MDS started && return 1 lctl get_param -n devices DEVS=$(lctl get_param -n devices | wc -l) @@ -1015,7 +767,7 @@ set_and_check() { FINAL=$(($ORIG + 5)) fi echo "Setting $PARAM from $ORIG to $FINAL" - do_facet mds "$LCTL conf_param $PARAM=$FINAL" || error conf_param failed + do_facet $SINGLEMDS "$LCTL conf_param $PARAM=$FINAL" || error conf_param failed local RESULT local MAX=90 local WAIT=0 @@ -1046,10 +798,13 @@ test_27a() { run_test 27a "Reacquire MGS lock if OST started first" test_27b() { + # FIXME. 
~grev setup - facet_failover mds - set_and_check mds "lctl get_param -n mdt.$FSNAME-MDT0000.identity_acquire_expire" "$FSNAME-MDT0000.mdt.identity_acquire_expire" || return 3 - set_and_check client "lctl get_param -n mdc.$FSNAME-MDT0000-mdc-*.max_rpcs_in_flight" "$FSNAME-MDT0000.mdc.max_rpcs_in_flight" || return 4 + local device=$(do_facet $SINGLEMDS "lctl get_param -n devices" | awk '($3 ~ "mdt" && $4 ~ "MDT") { print $4 }') + + facet_failover $SINGLEMDS + set_and_check $SINGLEMDS "lctl get_param -n mdt.$device.identity_acquire_expire" "$device.mdt.identity_acquire_expire" || return 3 + set_and_check client "lctl get_param -n mdc.$device-mdc-*.max_rpcs_in_flight" "$device.mdc.max_rpcs_in_flight" || return 4 check_mount cleanup } @@ -1106,7 +861,7 @@ test_29() { local WAIT=0 while [ 1 ]; do sleep 5 - RESULT=`do_facet mds " lctl get_param -n $MPROC"` + RESULT=`do_facet $SINGLEMDS " lctl get_param -n $MPROC"` [ ${PIPESTATUS[0]} = 0 ] || error "Can't read $MPROC" if [ $RESULT -eq $DEAC ]; then echo "MDT deactivated also after $WAIT sec (got $RESULT)" @@ -1313,7 +1068,7 @@ test_33a() { # bug 12333, was test_33 start fs2mds $fs2mdsdev $MDS_MOUNT_OPTS && trap cleanup_24a EXIT INT start fs2ost $fs2ostdev $OST_MOUNT_OPTS - do_facet mds "$LCTL conf_param $FSNAME2.sys.timeout=200" || rc=1 + do_facet $SINGLEMDS "$LCTL conf_param $FSNAME2.sys.timeout=200" || rc=1 mkdir -p $MOUNT2 mount -t lustre $MGSNID:/${FSNAME2} $MOUNT2 || rc=2 echo "ok." 
@@ -1395,7 +1150,8 @@ test_35() { # bug 12459 log "Set up a fake failnode for the MDS" FAKENID="127.0.0.2" - do_facet mds $LCTL conf_param ${FSNAME}-MDT0000.failover.node=$FAKENID || return 4 + local device=$(do_facet $SINGLEMDS "lctl get_param -n devices" | awk '($3 ~ "mdt" && $4 ~ "MDT") { print $4 }' | head -1) + do_facet $SINGLEMDS $LCTL conf_param ${device}.failover.node=$FAKENID || return 4 log "Wait for RECONNECT_INTERVAL seconds (10s)" sleep 10 @@ -1419,7 +1175,7 @@ test_35() { # bug 12459 # contact after the connection loss $LCTL dk $TMP/lustre-log-$TESTNAME.log NEXTCONN=`awk "/${MSG}/ {start = 1;} - /import_select_connection.*${FSNAME}-MDT0000-mdc.* using connection/ { + /import_select_connection.$device-mdc.* using connection/ { if (start) { if (\\\$NF ~ /$FAKENID/) print \\\$NF; @@ -1548,10 +1304,13 @@ test_38() { # bug 14222 stop_mds log "rename lov_objid file on MDS" rm -f $TMP/lov_objid.orig - do_facet mds "debugfs -c -R \\\"dump lov_objid $TMP/lov_objid.orig\\\" $MDSDEV" - do_facet mds "debugfs -w -R \\\"rm lov_objid\\\" $MDSDEV" - do_facet mds "od -Ax -td8 $TMP/lov_objid.orig" + local dev=${SINGLEMDS}_dev + local MDSDEV=${!dev} + do_facet $SINGLEMDS "debugfs -c -R \\\"dump lov_objid $TMP/lov_objid.orig\\\" $MDSDEV" + do_facet $SINGLEMDS "debugfs -w -R \\\"rm lov_objid\\\" $MDSDEV" + + do_facet $SINGLEMDS "od -Ax -td8 $TMP/lov_objid.orig" # check create in mds_lov_connect start_mds mount_client $MOUNT @@ -1559,17 +1318,17 @@ test_38() { # bug 14222 [ $V ] && log "verifying $DIR/$tdir/$f" diff -q $f $DIR/$tdir/$f || ERROR=y done - do_facet mds "debugfs -c -R \\\"dump lov_objid $TMP/lov_objid.new\\\" $MDSDEV" - do_facet mds "od -Ax -td8 $TMP/lov_objid.new" + do_facet $SINGLEMDS "debugfs -c -R \\\"dump lov_objid $TMP/lov_objid.new\\\" $MDSDEV" + do_facet $SINGLEMDS "od -Ax -td8 $TMP/lov_objid.new" [ "$ERROR" = "y" ] && error "old and new files are different after connect" || true # check it's updates in sync umount_client $MOUNT stop_mds - do_facet mds 
dd if=/dev/zero of=$TMP/lov_objid.clear bs=4096 count=1 - do_facet mds "debugfs -w -R \\\"rm lov_objid\\\" $MDSDEV" - do_facet mds "debugfs -w -R \\\"write $TMP/lov_objid.clear lov_objid\\\" $MDSDEV " + do_facet $SINGLEMDS dd if=/dev/zero of=$TMP/lov_objid.clear bs=4096 count=1 + do_facet $SINGLEMDS "debugfs -w -R \\\"rm lov_objid\\\" $MDSDEV" + do_facet $SINGLEMDS "debugfs -w -R \\\"write $TMP/lov_objid.clear lov_objid\\\" $MDSDEV " start_mds mount_client $MOUNT @@ -1577,8 +1336,8 @@ test_38() { # bug 14222 [ $V ] && log "verifying $DIR/$tdir/$f" diff -q $f $DIR/$tdir/$f || ERROR=y done - do_facet mds "debugfs -c -R \\\"dump lov_objid $TMP/lov_objid.new1\\\" $MDSDEV" - do_facet mds "od -Ax -td8 $TMP/lov_objid.new1" + do_facet $SINGLEMDS "debugfs -c -R \\\"dump lov_objid $TMP/lov_objid.new1\\\" $MDSDEV" + do_facet $SINGLEMDS "od -Ax -td8 $TMP/lov_objid.new1" umount_client $MOUNT stop_mds [ "$ERROR" = "y" ] && error "old and new files are different after sync" || true @@ -1600,7 +1359,7 @@ run_test 39 "leak_finder recognizes both LUSTRE and LNET malloc messages" test_40() { # bug 15759 start_ost #define OBD_FAIL_TGT_TOOMANY_THREADS 0x706 - do_facet mds "sysctl -w lustre.fail_loc=0x80000706" + do_facet $SINGLEMDS "sysctl -w lustre.fail_loc=0x80000706" start_mds cleanup } @@ -1608,9 +1367,12 @@ run_test 40 "race during service thread startup" test_41() { #bug 14134 local rc - start mds $MDSDEV $MDS_MOUNT_OPTS -o nosvc -n + local dev=${SINGLEMDS}_dev + local MDSDEV=${!dev} + + start $SINGLEMDS $MDSDEV $MDS_MOUNT_OPTS -o nosvc -n start ost1 `ostdevname 1` $OST_MOUNT_OPTS - start mds $MDSDEV $MDS_MOUNT_OPTS -o nomgs + start $SINGLEMDS $MDSDEV $MDS_MOUNT_OPTS -o nomgs mkdir -p $MOUNT mount_client $MOUNT || return 1 sleep 5 @@ -1620,8 +1382,8 @@ test_41() { #bug 14134 umount_client $MOUNT stop ost1 -f || return 201 - stop mds -f || return 202 - stop mds -f || return 203 + stop_mds -f || return 202 + stop_mds -f || return 203 unload_modules || return 204 return $rc } diff 
--git a/lustre/tests/replay-single.sh b/lustre/tests/replay-single.sh index 9d95405..3f03f38 100755 --- a/lustre/tests/replay-single.sh +++ b/lustre/tests/replay-single.sh @@ -879,11 +879,11 @@ test_41() { do_facet client dd if=/dev/zero of=$f bs=4k count=1 || return 3 cancel_lru_locks osc # fail ost2 and read from ost1 - local osc2dev=`do_facet mds "lctl get_param -n devices | grep ${ost2_svc}-osc-MDT0000" | awk '{print $1}'` + local osc2dev=`do_facet $SINGLEMDS "lctl get_param -n devices | grep ${ost2_svc}-osc-MDT0000" | awk '{print $1}'` [ -z "$osc2dev" ] && echo "OST: $ost2_svc" && lctl get_param -n devices && return 4 - do_facet mds $LCTL --device $osc2dev deactivate || return 1 + do_facet $SINGLEMDS $LCTL --device $osc2dev deactivate || return 1 do_facet client dd if=$f of=/dev/null bs=4k count=1 || return 3 - do_facet mds $LCTL --device $osc2dev activate || return 2 + do_facet $SINGLEMDS $LCTL --device $osc2dev activate || return 2 return 0 } run_test 41 "read from a valid osc while other oscs are invalid" @@ -1765,6 +1765,71 @@ test_70b () { run_test 70b "mds recovery; $CLIENTCOUNT clients" # end multi-client tests +test_80a() { + [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0 + + mkdir -p $DIR/$tdir + replay_barrier mds2 + $CHECKSTAT -t dir $DIR/$tdir || error "$CHECKSTAT -t dir $DIR/$tdir failed" + rmdir $DIR/$tdir || error "rmdir $DIR/$tdir failed" + fail mds2 + stat $DIR/$tdir +} +run_test 80a "CMD: unlink cross-node dir (fail mds with inode)" + +test_80b() { + [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0 + + mkdir -p $DIR/$tdir + replay_barrier mds1 + $CHECKSTAT -t dir $DIR/$tdir || error "$CHECKSTAT -t dir $DIR/$tdir failed" + rmdir $DIR/$tdir || error "rmdir $DIR/$tdir failed" + fail mds1 + stat $DIR/$tdir +} +run_test 80b "CMD: unlink cross-node dir (fail mds with name)" + +test_81a() { + [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0 + + mkdir -p $DIR/$tdir + createmany -o $DIR/$tdir/f 3000 || error "createmany 
failed" + sleep 10 + $CHECKSTAT -t dir $DIR/$tdir || error "$CHECKSTAT -t dir failed" + $CHECKSTAT -t file $DIR/$tdir/f1002 || error "$CHECKSTAT -t file failed" + replay_barrier mds1 + rm $DIR/$tdir/f1002 || error "rm $DIR/$tdir/f1002 failed" + fail mds1 + stat $DIR/$tdir/f1002 +} +run_test 81a "CMD: unlink cross-node file (fail mds with name)" + +test_82a() { + [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0 + + local dir=$DIR/d82a + replay_barrier mds2 + mkdir $dir || error "mkdir $dir failed" + log "FAILOVER mds2" + fail mds2 + stat $DIR + $CHECKSTAT -t dir $dir || error "$CHECKSTAT -t dir $dir failed" +} +run_test 82a "CMD: mkdir cross-node dir (fail mds with inode)" + +test_82b() { + [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0 + + local dir=$DIR/d82b + replay_barrier mds1 + mkdir $dir || error "mkdir $dir failed" + log "FAILOVER mds1" + fail mds1 + stat $DIR + $CHECKSTAT -t dir $dir || error "$CHECKSTAT -t dir $dir failed" +} +run_test 82b "CMD: mkdir cross-node dir (fail mds with name)" + equals_msg `basename $0`: test complete, cleaning up check_and_cleanup_lustre [ -f "$TESTSUITELOG" ] && cat $TESTSUITELOG || true diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index e9ec28b..aeeb8c8 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -1126,14 +1126,32 @@ test_29() { touch $DIR/d29/foo log 'first d29' ls -l $DIR/d29 - LOCKCOUNTORIG=`lctl get_param -n ldlm.namespaces.*mdc*.lock_count` - LOCKUNUSEDCOUNTORIG=`lctl get_param -n ldlm.namespaces.*mdc*.lock_unused_count` - [ -z $"LOCKCOUNTORIG" ] && echo "No mdc lock count" && return 1 + + declare -i LOCKCOUNTORIG=0 + for lock_count in $(lctl get_param -n ldlm.namespaces.*mdc*.lock_count); do + let LOCKCOUNTORIG=$LOCKCOUNTORIG+$lock_count + done + [ $LOCKCOUNTORIG -eq 0 ] && echo "No mdc lock count" && return 1 + + declare -i LOCKUNUSEDCOUNTORIG=0 + for unused_count in $(lctl get_param -n ldlm.namespaces.*mdc*.lock_unused_count); do + let 
LOCKUNUSEDCOUNTORIG=$LOCKUNUSEDCOUNTORIG+$unused_count + done + log 'second d29' ls -l $DIR/d29 log 'done' - LOCKCOUNTCURRENT=`lctl get_param -n ldlm.namespaces.*mdc*.lock_count` - LOCKUNUSEDCOUNTCURRENT=`lctl get_param -n ldlm.namespaces.*mdc*.lock_unused_count` + + declare -i LOCKCOUNTCURRENT=0 + for lock_count in $(lctl get_param -n ldlm.namespaces.*mdc*.lock_count); do + let LOCKCOUNTCURRENT=$LOCKCOUNTCURRENT+$lock_count + done + + declare -i LOCKUNUSEDCOUNTCURRENT=0 + for unused_count in $(lctl get_param -n ldlm.namespaces.*mdc*.lock_unused_count); do + let LOCKUNUSEDCOUNTCURRENT=$LOCKUNUSEDCOUNTCURRENT+$unused_count + done + if [ "$LOCKCOUNTCURRENT" -gt "$LOCKCOUNTORIG" ]; then lctl set_param -n ldlm.dump_namespaces "" error "CURRENT: $LOCKCOUNTCURRENT > $LOCKCOUNTORIG" @@ -2180,6 +2198,42 @@ test_51b() { } run_test 51b "mkdir .../t-0 --- .../t-$NUMTEST ====================" +test_51bb() { + [ -z "$CLIENTS" ] && skip "needs >= 2 CLIENTS" && return + [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return + + NUMFREE=`df -i -P $DIR | tail -n 1 | awk '{ print $4 }'` + [ $NUMFREE -lt 21000 ] && \ + skip "not enough free inodes ($NUMFREE)" && \ + return + + check_kernel_version 40 || NUMTEST=31000 + [ $NUMFREE -lt $NUMTEST ] && NUMTEST=$(($NUMFREE - 50)) + + mkdir -p $DIR/d51bb + + IUSED=$(lfs df -i $DIR | grep MDT | awk '{print $3}') + OLDUSED=($IUSED) + + do_nodes $CLIENTS "mkdir -p $DIR/\$(hostname)" + + ls $DIR + + do_nodes $CLIENTS "createmany -d $DIR/\$(hostname)/t- $NUMTEST" + IUSED=$(lfs df -i $DIR | grep MDT | awk '{print $3}') + NEWUSED=($IUSED) + + local rc=0 + for ((i=0; i<${#NEWUSED[@]}; i++)); do + echo "mds $i: inodes count OLD ${OLDUSED[$i]} NEW ${NEWUSED[$i]}" + [ ${OLDUSED[$i]} -lt ${NEWUSED[$i]} ] || rc=1 + done + + [ $rc -ne 0 ] && error "no CMD functionality!" +} +run_test 51bb "mkdir .../t-0 --- .../t-$NUMTEST (CMD) ====================" + + test_51c() { [ ! 
-d $DIR/d51b ] && skip "$DIR/51b missing" && \ return @@ -2261,7 +2315,7 @@ test_53() { for value in `lctl get_param osc.*-osc-MDT0000.prealloc_last_id` ; do param=`echo ${value[0]} | cut -d "=" -f1` ostname=`echo $param | cut -d "." -f2 | cut -d - -f 1-2` - ost_last=`lctl get_param -n obdfilter.$ostname.last_id` + ost_last=`lctl get_param -n obdfilter.$ostname.last_id | head -n 1` mds_last=`lctl get_param -n $param` echo "$ostname.last_id=$ost_last ; MDS.last_id=$mds_last" if [ $ost_last != $mds_last ]; then @@ -2639,7 +2693,7 @@ test_57b() { $GETSTRIPE $FILE1 2>&1 | grep -q "no stripe" || error "$FILE1 has an EA" $GETSTRIPE $FILEN 2>&1 | grep -q "no stripe" || error "$FILEN has an EA" - MDSFREE="`lctl get_param -n osd.*MDT*.kbytesfree 2> /dev/null`" + MDSFREE="`lctl get_param -n osd.*MDT0000.kbytesfree 2> /dev/null`" MDCFREE="`lctl get_param -n mdc.*.kbytesfree | head -n 1`" echo "opening files to create objects/EAs" for FILE in `seq -f $DIR/d57b/f%g 1 $FILECOUNT`; do @@ -2906,19 +2960,21 @@ test_65k() { # bug11679 remote_mds_nodsh && skip "remote MDS" && return echo "Check OST status: " - MDS_OSCS=`do_facet mds lctl dl | awk '/[oO][sS][cC].*md[ts]/ { print $4 }'` + MDS_OSCS=`do_facet $SINGLEMDS lctl dl | awk '/[oO][sS][cC].*md[ts]/ { print $4 }'` for OSC in $MDS_OSCS; do echo $OSC "is activate" - do_facet mds lctl --device %$OSC activate + do_facet $SINGLEMDS lctl --device %$OSC activate done do_facet client mkdir -p $DIR/$tdir for INACTIVE_OSC in $MDS_OSCS; do echo $INACTIVE_OSC "is Deactivate:" - do_facet mds lctl --device %$INACTIVE_OSC deactivate + do_facet $SINGLEMDS lctl --device %$INACTIVE_OSC deactivate for STRIPE_OSC in $MDS_OSCS; do STRIPE_OST=`osc_to_ost $STRIPE_OSC` - STRIPE_INDEX=`do_facet mds lctl get_param -n lov.*md*.target_obd | - grep $STRIPE_OST | awk -F: '{print $1}'` + STRIPE_INDEX=`do_facet $SINGLEMDS lctl get_param -n lov.*md*.target_obd | + grep $STRIPE_OST | awk -F: '{print $1}' | head -n 1` + + [ -f $DIR/$tdir/${STRIPE_INDEX} ] && 
continue echo "$SETSTRIPE $DIR/$tdir/${STRIPE_INDEX} -i ${STRIPE_INDEX} -c 1" do_facet client $SETSTRIPE $DIR/$tdir/${STRIPE_INDEX} -i ${STRIPE_INDEX} -c 1 RC=$? @@ -2926,7 +2982,7 @@ test_65k() { # bug11679 done do_facet client rm -f $DIR/$tdir/* echo $INACTIVE_OSC "is Activate." - do_facet mds lctl --device %$INACTIVE_OSC activate + do_facet $SINGLEMDS lctl --device %$INACTIVE_OSC activate done } run_test 65k "validate manual striping works properly with deactivated OSCs" @@ -5111,16 +5167,27 @@ test_124a() { } run_test 124a "lru resize =======================================" +get_max_pool_limit() +{ + local limit=`lctl get_param -n ldlm.namespaces.*-MDT0000-mdc-*.pool.limit` + local max=0 + for l in $limit; do + if test $l -gt $max; then + max=$l + fi + done + echo $max +} + test_124b() { [ -z "`lctl get_param -n mdc.*.connect_flags | grep lru_resize`" ] && \ skip "no lru resize on server" && return 0 - # even for cmd no matter what metadata namespace to use for getting - # the limit, we use appropriate. 
- LIMIT=`lctl get_param -n ldlm.namespaces.*mdc*.pool.limit` + LIMIT=`get_max_pool_limit` NR=$(($(default_lru_size)*20)) if [ $NR -gt $LIMIT ]; then + log "Limit lock number by $LIMIT locks" NR=$LIMIT fi lru_resize_disable mdc @@ -5252,42 +5319,52 @@ test_128() { # bug 15212 } run_test 128 "interactive lfs for 2 consecutive find's" +set_dir_limits () { + local mntdev + local node + + local LDPROC=/proc/fs/ldiskfs + + for node in $(mdts_nodes); do + devs=$(do_node $node "lctl get_param -n devices" | awk '($3 ~ "mdt" && $4 ~ "MDT") { print $4 }') + for dev in $devs; do + mntdev=$(do_node $node "lctl get_param -n osd.$dev.mntdev") + do_node $node "echo $1 >$LDPROC/\\\$(basename $mntdev)/max_dir_size" + done + done +} test_129() { [ "$FSTYPE" != "ldiskfs" ] && skip "not needed for FSTYPE=$FSTYPE" && return 0 - DEV=$(basename $(do_facet mds lctl get_param -n osd.*MDT*.mntdev)) - [ -z "$DEV" ] && error "can't access mds mntdev" EFBIG=27 - LDPROC=/proc/fs/ldiskfs/$DEV/max_dir_size MAX=16384 - do_facet mds "echo $MAX > $LDPROC" + set_dir_limits $MAX mkdir -p $DIR/$tdir I=0 J=0 - while [ ! $I -gt $MAX ]; do + while [ ! $I -gt $((MAX * MDSCOUNT)) ]; do multiop $DIR/$tdir/$J Oc rc=$? 
if [ $rc -eq $EFBIG ]; then - do_facet mds "echo 0 >$LDPROC" + set_dir_limits 0 echo "return code $rc received as expected" return 0 elif [ $rc -ne 0 ]; then - do_facet mds "echo 0 >$LDPROC" + set_dir_limits 0 error_exit "return code $rc received instead of expected $EFBIG" fi J=$((J+1)) I=$(stat -c%s "$DIR/$tdir") done - error "exceeded dir size limit: $I bytes" - do_facet mds "echo 0 >$LDPROC" + set_dir_limits 0 + error "exceeded dir size limit $MAX x $MDSCOUNT $((MAX * MDSCOUNT)) : $I bytes" } run_test 129 "test directory size limit ========================" - test_130a() { filefrag_op=$(filefrag -e 2>&1 | grep "invalid option") [ -n "$filefrag_op" ] && skip "filefrag does not support FIEMAP" && return diff --git a/lustre/tests/test-framework.sh b/lustre/tests/test-framework.sh index 3b289e2..9480fff 100644 --- a/lustre/tests/test-framework.sh +++ b/lustre/tests/test-framework.sh @@ -461,9 +461,9 @@ mount_facet() { # start facet device options start() { - facet=$1 + local facet=$1 shift - device=$1 + local device=$1 shift eval export ${facet}_dev=${device} eval export ${facet}_opt=\"$@\" @@ -475,7 +475,7 @@ start() { stop() { local running - facet=$1 + local facet=$1 shift HOST=`facet_active_host $facet` [ -z $HOST ] && echo stop: no host for $facet && return 0 @@ -642,14 +642,14 @@ wait_for() { } wait_mds_recovery_done () { - local timeout=`do_facet mds lctl get_param -n timeout` + local timeout=`do_facet $SINGLEMDS lctl get_param -n timeout` #define OBD_RECOVERY_TIMEOUT (obd_timeout * 5 / 2) # as we are in process of changing obd_timeout in different ways # let's set MAX longer than that MAX=$(( timeout * 4 )) WAIT=0 while [ $WAIT -lt $MAX ]; do - STATUS=`do_facet $SINGLEMDS "lctl get_param -n mdt.*-MDT*.recovery_status | grep status"` + STATUS=`do_facet $SINGLEMDS "lctl get_param -n mdt.*-MDT0000.recovery_status | grep status"` echo $STATUS | grep COMPLETE && return 0 sleep 5 WAIT=$((WAIT + 5)) @@ -1048,6 +1048,7 @@ formatall() { if [ ! 
-z $SEC ]; then MDS_MKFS_OPTS="$MDS_MKFS_OPTS --param srpc.flavor.default=$SEC" + MDSn_MKFS_OPTS="$MDSn_MKFS_OPTS --param srpc.flavor.default=$SEC" OST_MKFS_OPTS="$OST_MKFS_OPTS --param srpc.flavor.default=$SEC" fi @@ -1275,7 +1276,7 @@ absolute_path() { at_is_valid() { if [ -z "$AT_MAX_PATH" ]; then - AT_MAX_PATH=$(do_facet mds "find /sys/ -name at_max") + AT_MAX_PATH=$(do_facet $SINGLEMDS "find /sys/ -name at_max") [ -z "$AT_MAX_PATH" ] && echo "missing /sys/.../at_max " && return 1 fi return 0 @@ -1285,7 +1286,7 @@ at_is_enabled() { at_is_valid || error "invalid call" # only check mds, we assume at_max is the same on all nodes - local at_max=$(do_facet mds "cat $AT_MAX_PATH") + local at_max=$(do_facet $SINGLEMDS "cat $AT_MAX_PATH") if [ $at_max -eq 0 ]; then return 1 else @@ -1334,27 +1335,27 @@ at_max_set() { drop_request() { # OBD_FAIL_MDS_ALL_REQUEST_NET RC=0 - do_facet mds lctl set_param fail_loc=0x123 + do_facet $SINGLEMDS lctl set_param fail_loc=0x123 do_facet client "$1" || RC=$? - do_facet mds lctl set_param fail_loc=0 + do_facet $SINGLEMDS lctl set_param fail_loc=0 return $RC } drop_reply() { # OBD_FAIL_MDS_ALL_REPLY_NET RC=0 - do_facet mds lctl set_param fail_loc=0x122 + do_facet $SINGLEMDS lctl set_param fail_loc=0x122 do_facet client "$@" || RC=$? - do_facet mds lctl set_param fail_loc=0 + do_facet $SINGLEMDS lctl set_param fail_loc=0 return $RC } drop_reint_reply() { # OBD_FAIL_MDS_REINT_NET_REP RC=0 - do_facet mds lctl set_param fail_loc=0x119 + do_facet $SINGLEMDS lctl set_param fail_loc=0x119 do_facet client "$@" || RC=$? - do_facet mds lctl set_param fail_loc=0 + do_facet $SINGLEMDS lctl set_param fail_loc=0 return $RC } @@ -1389,9 +1390,9 @@ drop_bl_callback() { drop_ldlm_reply() { #define OBD_FAIL_LDLM_REPLY 0x30c RC=0 - do_facet mds lctl set_param fail_loc=0x30c + do_facet $SINGLEMDS lctl set_param fail_loc=0x30c do_facet client "$@" || RC=$? 
- do_facet mds lctl set_param fail_loc=0 + do_facet $SINGLEMDS lctl set_param fail_loc=0 return $RC } @@ -1627,8 +1628,8 @@ pass() { } check_mds() { - FFREE=`lctl get_param -n osd.*MDT*.filesfree` - FTOTAL=`lctl get_param -n osd.*MDT*.filestotal` + FFREE=$(do_node $SINGLEMDS lctl get_param -n osd.*MDT*.filesfree | awk 'BEGIN{avail=0}; {avail+=$1}; END{print avail}') + FTOTAL=$(do_node $SINGLEMDS lctl get_param -n osd.*MDT*.filestotal | awk 'BEGIN{avail=0}; {avail+=$1}; END{print avail}') [ $FFREE -ge $FTOTAL ] && error "files free $FFREE > total $FTOTAL" || true } @@ -1754,17 +1755,10 @@ remote_ost_nodsh() } mdts_nodes () { - local MDSNODES=$(facet_host $SINGLEMDS) + local MDSNODES local NODES_sort - - # FIXME: Currenly we use only $SINGLEMDS, - # should be fixed when we will start to test cmd. - echo $MDSNODES - return - for num in `seq $MDSCOUNT`; do - local myMDS=$(facet_host mds$num) - MDSNODES="$MDSNODES $myMDS" + MDSNODES="$MDSNODES $(facet_host mds$num)" done NODES_sort=$(for i in $MDSNODES; do echo $i; done | sort -u)