From: wangdi Date: Thu, 31 Oct 2013 06:33:38 +0000 (-0700) Subject: LU-1187 lmv: remove obsolete lmv object. X-Git-Tag: 2.3.59~18 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=5e91e5b5200e59a73c10d4c73768b6aee789c395 LU-1187 lmv: remove obsolete lmv object. The lmv object was intended to handle directory splitting, which is obsolete in the current DNE infrastructure. Signed-off-by: wang di Change-Id: I5337d497451f6869bc67474b0cd7d74bde01a172 Reviewed-on: http://review.whamcloud.com/5011 Tested-by: Hudson Reviewed-by: Alex Zhuravlev Tested-by: Maloo Reviewed-by: Andreas Dilger Reviewed-by: Oleg Drokin --- diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index fe87c92..3cc5d7a 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -2292,17 +2292,17 @@ struct md_op_data * ll_prep_md_op_data(struct md_op_data *op_data, op_data->op_capa2 = NULL; } - op_data->op_name = name; - op_data->op_namelen = namelen; - op_data->op_mode = mode; - op_data->op_mod_time = cfs_time_current_sec(); - op_data->op_fsuid = cfs_curproc_fsuid(); - op_data->op_fsgid = cfs_curproc_fsgid(); - op_data->op_cap = cfs_curproc_cap_pack(); - op_data->op_bias = MDS_CHECK_SPLIT; - op_data->op_opc = opc; - op_data->op_mds = 0; - op_data->op_data = data; + op_data->op_name = name; + op_data->op_namelen = namelen; + op_data->op_mode = mode; + op_data->op_mod_time = cfs_time_current_sec(); + op_data->op_fsuid = cfs_curproc_fsuid(); + op_data->op_fsgid = cfs_curproc_fsgid(); + op_data->op_cap = cfs_curproc_cap_pack(); + op_data->op_bias = 0; + op_data->op_opc = opc; + op_data->op_mds = 0; + op_data->op_data = data; /* If the file is being opened after mknod() (normally due to NFS) * try to use the default stripe data from parent directory for diff --git a/lustre/lmv/Makefile.in b/lustre/lmv/Makefile.in index 2f77e68..f03d419 100644 --- a/lustre/lmv/Makefile.in +++ b/lustre/lmv/Makefile.in @@ -1,4 +1,4 @@ MODULES := lmv -lmv-objs := lmv_obd.o 
lmv_intent.o lmv_fld.o lmv_object.o lproc_lmv.o +lmv-objs := lmv_obd.o lmv_intent.o lmv_fld.o lproc_lmv.o @INCLUDE_RULES@ diff --git a/lustre/lmv/autoMakefile.am b/lustre/lmv/autoMakefile.am index c1d63b1..da245d9 100644 --- a/lustre/lmv/autoMakefile.am +++ b/lustre/lmv/autoMakefile.am @@ -36,7 +36,7 @@ if LIBLUSTRE noinst_LIBRARIES = liblmv.a -liblmv_a_SOURCES = lmv_obd.c lmv_intent.c lmv_object.c lmv_fld.c +liblmv_a_SOURCES = lmv_obd.c lmv_intent.c lmv_fld.c liblmv_a_CPPFLAGS = $(LLCPPFLAGS) liblmv_a_CFLAGS = $(LLCFLAGS) endif diff --git a/lustre/lmv/lmv_intent.c b/lustre/lmv/lmv_intent.c index 2e9a8f4..b1f071d 100644 --- a/lustre/lmv/lmv_intent.c +++ b/lustre/lmv/lmv_intent.c @@ -64,60 +64,55 @@ int lmv_intent_remote(struct obd_export *exp, void *lmm, ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct ptlrpc_request *req = NULL; - struct lustre_handle plock; - struct md_op_data *op_data; - struct lmv_tgt_desc *tgt; - struct mdt_body *body; - int pmode; - int rc = 0; - ENTRY; - - body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY); - if (body == NULL) - RETURN(-EPROTO); + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct ptlrpc_request *req = NULL; + struct lustre_handle plock; + struct md_op_data *op_data; + struct lmv_tgt_desc *tgt; + struct mdt_body *body; + int pmode; + int rc = 0; + ENTRY; + + body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY); + if (body == NULL) + RETURN(-EPROTO); + + LASSERT((body->valid & OBD_MD_MDS)); + + /* + * Unfortunately, we have to lie to MDC/MDS to retrieve + * attributes llite needs and provide proper locking. + */ + if (it->it_op & IT_LOOKUP) + it->it_op = IT_GETATTR; + + /* + * We got LOOKUP lock, but we really need attrs. 
+ */ + pmode = it->d.lustre.it_lock_mode; + if (pmode) { + plock.cookie = it->d.lustre.it_lock_handle; + it->d.lustre.it_lock_mode = 0; + it->d.lustre.it_data = NULL; + } - /* - * Not cross-ref case, just get out of here. - */ - if (!(body->valid & OBD_MD_MDS)) - RETURN(0); + LASSERT(fid_is_sane(&body->fid1)); - /* - * Unfortunately, we have to lie to MDC/MDS to retrieve - * attributes llite needs and provideproper locking. - */ - if (it->it_op & IT_LOOKUP) - it->it_op = IT_GETATTR; - - /* - * We got LOOKUP lock, but we really need attrs. - */ - pmode = it->d.lustre.it_lock_mode; - if (pmode) { - plock.cookie = it->d.lustre.it_lock_handle; - it->d.lustre.it_lock_mode = 0; - it->d.lustre.it_data = NULL; - } + tgt = lmv_find_target(lmv, &body->fid1); + if (IS_ERR(tgt)) + GOTO(out, rc = PTR_ERR(tgt)); - LASSERT(fid_is_sane(&body->fid1)); + OBD_ALLOC_PTR(op_data); + if (op_data == NULL) + GOTO(out, rc = -ENOMEM); - tgt = lmv_find_target(lmv, &body->fid1); - if (IS_ERR(tgt)) - GOTO(out, rc = PTR_ERR(tgt)); + op_data->op_fid1 = body->fid1; + op_data->op_bias = MDS_CROSS_REF; - OBD_ALLOC_PTR(op_data); - if (op_data == NULL) - GOTO(out, rc = -ENOMEM); - - op_data->op_fid1 = body->fid1; - op_data->op_bias = MDS_CROSS_REF; - - CDEBUG(D_INODE, - "REMOTE_INTENT with fid="DFID" -> mds #%d\n", - PFID(&body->fid1), tgt->ltd_idx); + CDEBUG(D_INODE, "REMOTE_INTENT with fid="DFID" -> mds #%d\n", + PFID(&body->fid1), tgt->ltd_idx); it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE; rc = md_intent_lock(tgt->ltd_exp, op_data, lmm, lmmsize, it, @@ -160,180 +155,76 @@ int lmv_intent_open(struct obd_export *exp, struct md_op_data *op_data, ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags) { - struct obd_device *obd = exp->exp_obd; - struct lu_fid rpid = op_data->op_fid1; - struct lmv_obd *lmv = &obd->u.lmv; - struct md_op_data *sop_data; - struct lmv_stripe_md *mea; - struct lmv_tgt_desc *tgt; - struct mdt_body *body; - struct lmv_object *obj; - int rc; - int loop = 0; - int 
sidx; - ENTRY; - - OBD_ALLOC_PTR(sop_data); - if (sop_data == NULL) - RETURN(-ENOMEM); - - /* save op_data fro repeat case */ - *sop_data = *op_data; - -repeat: - - ++loop; - LASSERT(loop <= 2); - obj = lmv_object_find(obd, &rpid); - if (obj) { - /* - * Directory is already split, so we have to forward request to - * the right MDS. - */ - sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, - (char *)op_data->op_name, - op_data->op_namelen); - - rpid = obj->lo_stripes[sidx].ls_fid; - - sop_data->op_mds = obj->lo_stripes[sidx].ls_mds; - tgt = lmv_get_target(lmv, sop_data->op_mds); - sop_data->op_bias &= ~MDS_CHECK_SPLIT; - lmv_object_put(obj); - - CDEBUG(D_INODE, - "Choose slave dir ("DFID") -> mds #%d\n", - PFID(&rpid), tgt->ltd_idx); - } else { - sop_data->op_bias |= MDS_CHECK_SPLIT; - tgt = lmv_find_target(lmv, &rpid); - sop_data->op_mds = tgt->ltd_idx; - } - if (IS_ERR(tgt)) - GOTO(out_free_sop_data, rc = PTR_ERR(tgt)); - - sop_data->op_fid1 = rpid; - - if (it->it_op & IT_CREAT) { - /* - * For open with IT_CREATE and for IT_CREATE cases allocate new - * fid and setup FLD for it. - */ - sop_data->op_fid3 = sop_data->op_fid2; - rc = lmv_fid_alloc(exp, &sop_data->op_fid2, sop_data); - if (rc) - GOTO(out_free_sop_data, rc); - - if (rc == -ERESTART) - goto repeat; - else if (rc) - GOTO(out_free_sop_data, rc); - } - - CDEBUG(D_INODE, - "OPEN_INTENT with fid1="DFID", fid2="DFID", name='%s' -> mds #%d\n", - PFID(&sop_data->op_fid1), PFID(&sop_data->op_fid2), - sop_data->op_name, tgt->ltd_idx); - - rc = md_intent_lock(tgt->ltd_exp, sop_data, lmm, lmmsize, it, flags, - reqp, cb_blocking, extra_lock_flags); - - if (rc == -ERESTART) { - LASSERT(*reqp != NULL); - DEBUG_REQ(D_WARNING|D_RPCTRACE, *reqp, - "Got -ERESTART during open!\n"); - ptlrpc_req_finished(*reqp); - *reqp = NULL; - it->d.lustre.it_data = NULL; - - /* - * Directory got split. Time to update local object and repeat - * the request with proper MDS. 
- */ - LASSERT(lu_fid_eq(&op_data->op_fid1, &rpid)); - rc = lmv_handle_split(exp, &rpid); - if (rc == 0) { - /* We should reallocate child FID. */ - rc = lmv_allocate_slaves(obd, &rpid, op_data, - &sop_data->op_fid2); - if (rc == 0) - goto repeat; - } - } - - if (rc != 0) - GOTO(out_free_sop_data, rc); - - /* - * Nothing is found, do not access body->fid1 as it is zero and thus - * pointless. - */ - if ((it->d.lustre.it_disposition & DISP_LOOKUP_NEG) && - !(it->d.lustre.it_disposition & DISP_OPEN_CREATE) && - !(it->d.lustre.it_disposition & DISP_OPEN_OPEN)) - GOTO(out_free_sop_data, rc = 0); - - /* - * Okay, MDS has returned success. Probably name has been resolved in - * remote inode. - */ - rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp, - cb_blocking, extra_lock_flags); - if (rc != 0) { - LASSERT(rc < 0); - /* - * This is possible, that some userspace application will try to - * open file as directory and we will have -ENOTDIR here. As - * this is normal situation, we should not print error here, - * only debug info. - */ - CDEBUG(D_INODE, "Can't handle remote %s: dir "DFID"("DFID"):" - "%*s: %d\n", LL_IT2STR(it), PFID(&op_data->op_fid2), - PFID(&rpid), op_data->op_namelen, op_data->op_name, rc); - GOTO(out_free_sop_data, rc); - } - - /* - * Caller may use attrs MDS returns on IT_OPEN lock request so, we have - * to update them for split dir. - */ - body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY); - LASSERT(body != NULL); - - /* - * Could not find object, FID is not present in response. - */ - if (!(body->valid & OBD_MD_FLID)) - GOTO(out_free_sop_data, rc = 0); - - obj = lmv_object_find(obd, &body->fid1); - if (obj == NULL) { - /* - * XXX: Capability for remote call! 
- */ - mea = lmv_get_mea(*reqp); - if (mea != NULL) { - obj = lmv_object_create(exp, &body->fid1, mea); - if (IS_ERR(obj)) - GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj)); - } - } + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + struct mdt_body *body; + int rc; + ENTRY; + + tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); + + if (it->it_op & IT_CREAT) { + /* + * For open with IT_CREATE and for IT_CREATE cases allocate new + * fid and setup FLD for it. + */ + op_data->op_fid3 = op_data->op_fid2; + rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data); + if (rc != 0) + RETURN(rc); + } - if (obj) { - /* - * This is split dir and we'd want to get attrs. - */ - CDEBUG(D_INODE, "Slave attributes for "DFID"\n", - PFID(&body->fid1)); + CDEBUG(D_INODE, "OPEN_INTENT with fid1="DFID", fid2="DFID"," + " name='%s' -> mds #%d\n", PFID(&op_data->op_fid1), + PFID(&op_data->op_fid2), op_data->op_name, tgt->ltd_idx); + + rc = md_intent_lock(tgt->ltd_exp, op_data, lmm, lmmsize, it, flags, + reqp, cb_blocking, extra_lock_flags); + if (rc != 0) + RETURN(rc); + /* + * Nothing is found, do not access body->fid1 as it is zero and thus + * pointless. + */ + if ((it->d.lustre.it_disposition & DISP_LOOKUP_NEG) && + !(it->d.lustre.it_disposition & DISP_OPEN_CREATE) && + !(it->d.lustre.it_disposition & DISP_OPEN_OPEN)) + RETURN(rc); + + body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY); + if (body == NULL) + RETURN(-EPROTO); + /* + * Not cross-ref case, just get out of here. + */ + if (likely(!(body->valid & OBD_MD_MDS))) + RETURN(0); + + /* + * Okay, MDS has returned success. Probably name has been resolved in + * remote inode. 
+ */ + rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp, + cb_blocking, extra_lock_flags); + if (rc != 0) { + LASSERT(rc < 0); + /* + * This is possible, that some userspace application will try to + * open file as directory and we will have -ENOTDIR here. As + * this is normal situation, we should not print error here, + * only debug info. + */ + CDEBUG(D_INODE, "Can't handle remote %s: dir "DFID"("DFID"):" + "%*s: %d\n", LL_IT2STR(it), PFID(&op_data->op_fid2), + PFID(&op_data->op_fid1), op_data->op_namelen, + op_data->op_name, rc); + RETURN(rc); + } - rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1, - cb_blocking, extra_lock_flags); - lmv_object_put(obj); - } - EXIT; -out_free_sop_data: - OBD_FREE_PTR(sop_data); - return rc; + RETURN(rc); } /* @@ -345,162 +236,49 @@ int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data, ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags) { - struct obd_device *obd = exp->exp_obd; - struct lu_fid rpid = op_data->op_fid1; - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_object *obj = NULL; - struct md_op_data *sop_data; - struct lmv_stripe_md *mea; - struct lmv_tgt_desc *tgt = NULL; - struct mdt_body *body; - int sidx; - int loop = 0; - int rc = 0; - ENTRY; - - OBD_ALLOC_PTR(sop_data); - if (sop_data == NULL) - RETURN(-ENOMEM); - - *sop_data = *op_data; - -repeat: - ++loop; - LASSERT(loop <= 2); - - obj = lmv_object_find(obd, &op_data->op_fid1); - if (obj && op_data->op_namelen) { - sidx = raw_name2idx(obj->lo_hashtype, - obj->lo_objcount, - (char *)op_data->op_name, - op_data->op_namelen); - rpid = obj->lo_stripes[sidx].ls_fid; - tgt = lmv_get_target(lmv, - obj->lo_stripes[sidx].ls_mds); - CDEBUG(D_INODE, - "Choose slave dir ("DFID") -> mds #%d\n", - PFID(&rpid), tgt->ltd_idx); - sop_data->op_bias &= ~MDS_CHECK_SPLIT; - } else { - tgt = lmv_find_target(lmv, &op_data->op_fid1); - sop_data->op_bias |= MDS_CHECK_SPLIT; - } - if (obj) - lmv_object_put(obj); - - if (IS_ERR(tgt)) - 
GOTO(out_free_sop_data, rc = PTR_ERR(tgt)); - - if (!fid_is_sane(&sop_data->op_fid2)) - fid_zero(&sop_data->op_fid2); - - CDEBUG(D_INODE, - "LOOKUP_INTENT with fid1="DFID", fid2="DFID - ", name='%s' -> mds #%d\n", - PFID(&sop_data->op_fid1), PFID(&sop_data->op_fid2), - sop_data->op_name ? sop_data->op_name : "", - tgt->ltd_idx); - - sop_data->op_bias &= ~MDS_CROSS_REF; - sop_data->op_fid1 = rpid; - - rc = md_intent_lock(tgt->ltd_exp, sop_data, lmm, lmmsize, it, - flags, reqp, cb_blocking, extra_lock_flags); - - if (rc == -ERESTART) { - LASSERT(*reqp != NULL); - DEBUG_REQ(D_WARNING|D_RPCTRACE, *reqp, - "Got -ERESTART during lookup!\n"); - ptlrpc_req_finished(*reqp); - *reqp = NULL; - it->d.lustre.it_data = 0; - - /* - * Directory got split since last update. This shouldn't be - * because splitting causes lock revocation, so revalidate had - * to fail and lookup on dir had to return mea. - */ - LASSERT(obj == NULL); - - obj = lmv_object_create(exp, &rpid, NULL); - if (IS_ERR(obj)) - GOTO(out_free_sop_data, rc = PTR_ERR(obj)); - lmv_object_put(obj); - goto repeat; - } - - if (rc < 0) - GOTO(out_free_sop_data, rc); - - if (obj && rc > 0) { - /* - * This is split dir. In order to optimize things a bit, we - * consider obj valid updating missing parts. - */ - CDEBUG(D_INODE, - "Revalidate slaves for "DFID", rc %d\n", - PFID(&op_data->op_fid1), rc); - - LASSERT(fid_is_sane(&op_data->op_fid2)); - rc = lmv_revalidate_slaves(exp, reqp, &op_data->op_fid1, it, rc, - cb_blocking, extra_lock_flags); - GOTO(out_free_sop_data, rc); - } - - if (*reqp == NULL) - GOTO(out_free_sop_data, rc); - - /* - * MDS has returned success. Probably name has been resolved in - * remote inode. Let's check this. - */ - rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, - reqp, cb_blocking, extra_lock_flags); - if (rc < 0) - GOTO(out_free_sop_data, rc); - - /* - * Nothing is found, do not access body->fid1 as it is zero and thus - * pointless. 
- */ - if (it->d.lustre.it_disposition & DISP_LOOKUP_NEG) - GOTO(out_free_sop_data, rc = 0); - - LASSERT(*reqp != NULL); - LASSERT((*reqp)->rq_repmsg != NULL); - body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY); - LASSERT(body != NULL); - - /* - * Could not find object, FID is not present in response. - */ - if (!(body->valid & OBD_MD_FLID)) - GOTO(out_free_sop_data, rc = 0); - - obj = lmv_object_find(obd, &body->fid1); - if (obj == NULL) { - /* - * XXX: Remote capability is not handled. - */ - mea = lmv_get_mea(*reqp); - if (mea != NULL) { - obj = lmv_object_create(exp, &body->fid1, mea); - if (IS_ERR(obj)) - GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj)); - } - } else { - CDEBUG(D_INODE, "Slave attributes for "DFID", rc %d\n", - PFID(&body->fid1), rc); - - rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1, - cb_blocking, extra_lock_flags); - lmv_object_put(obj); - } - - EXIT; -out_free_sop_data: - OBD_FREE_PTR(sop_data); - return rc; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt = NULL; + struct mdt_body *body; + int rc = 0; + ENTRY; + + tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); + + if (!fid_is_sane(&op_data->op_fid2)) + fid_zero(&op_data->op_fid2); + + CDEBUG(D_INODE, "LOOKUP_INTENT with fid1="DFID", fid2="DFID + ", name='%s' -> mds #%d\n", PFID(&op_data->op_fid1), + PFID(&op_data->op_fid2), + op_data->op_name ? op_data->op_name : "", + tgt->ltd_idx); + + op_data->op_bias &= ~MDS_CROSS_REF; + + rc = md_intent_lock(tgt->ltd_exp, op_data, lmm, lmmsize, it, + flags, reqp, cb_blocking, extra_lock_flags); + + if (rc < 0 || *reqp == NULL) + RETURN(rc); + + /* + * MDS has returned success. Probably name has been resolved in + * remote inode. Let's check this. + */ + body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY); + if (body == NULL) + RETURN(-EPROTO); + /* Not cross-ref case, just get out of here. 
*/ + if (likely(!(body->valid & OBD_MD_MDS))) + RETURN(0); + + rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp, + cb_blocking, extra_lock_flags); + + RETURN(rc); } int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data, @@ -536,236 +314,3 @@ int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data, LBUG(); RETURN(rc); } - -int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, - const struct lu_fid *mid, struct lookup_intent *oit, - int master_valid, ldlm_blocking_callback cb_blocking, - __u64 extra_lock_flags) -{ - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - int master_lockm = 0; - struct lustre_handle *lockh = NULL; - struct ptlrpc_request *mreq = *reqp; - struct lustre_handle master_lockh = { 0 }; - struct md_op_data *op_data; - struct ldlm_lock *lock; - unsigned long size = 0; - struct mdt_body *body; - struct lmv_object *obj; - int i; - int rc = 0; - struct lu_fid fid; - struct ptlrpc_request *req; - ldlm_blocking_callback cb; - struct lookup_intent it; - struct lmv_tgt_desc *tgt; - int master; - ENTRY; - - CDEBUG(D_INODE, "Revalidate master obj "DFID"\n", PFID(mid)); - - OBD_ALLOC_PTR(op_data); - if (op_data == NULL) - RETURN(-ENOMEM); - - /* - * We have to loop over the subobjects, check validity and update them - * from MDS if needed. It's very useful that we need not to update all - * the fields. Say, common fields (that are equal on all the subojects - * need not to be update, another fields (i_size, for example) are - * cached all the time. - */ - obj = lmv_object_find_lock(obd, mid); - if (obj == NULL) { - OBD_FREE_PTR(op_data); - RETURN(-EALREADY); - } - - for (i = 0; i < obj->lo_objcount; i++) { - fid = obj->lo_stripes[i].ls_fid; - master = lu_fid_eq(&fid, &obj->lo_fid); - cb = master ? cb_blocking : lmv_blocking_ast; - - /* - * We need i_size and we would like to check possible cached locks, - * so this is is IT_GETATTR intent. 
- */ - memset(&it, 0, sizeof(it)); - it.it_op = IT_GETATTR; - - if (master && master_valid) { - /* - * lmv_intent_lookup() already checked - * validness and took the lock. - */ - if (mreq != NULL) { - body = req_capsule_server_get(&mreq->rq_pill, - &RMF_MDT_BODY); - LASSERT(body != NULL); - goto update; - } - /* - * Take already cached attrs into account. - */ - CDEBUG(D_INODE, - "Master "DFID"is locked and cached\n", - PFID(mid)); - goto release_lock; - } - - /* - * Prepare op_data for revalidating. Note that @fid2 shuld be - * defined otherwise it will go to server and take new lock - * which is what we reall not need here. - */ - memset(op_data, 0, sizeof(*op_data)); - op_data->op_bias = MDS_CROSS_REF; - op_data->op_fid1 = fid; - op_data->op_fid2 = fid; - req = NULL; - - tgt = lmv_get_target(lmv, obj->lo_stripes[i].ls_mds); - if (IS_ERR(tgt)) - GOTO(cleanup, rc = PTR_ERR(tgt)); - - CDEBUG(D_INODE, "Revalidate slave obj "DFID" -> mds #%d\n", - PFID(&fid), tgt->ltd_idx); - - rc = md_intent_lock(tgt->ltd_exp, op_data, NULL, 0, &it, 0, - &req, cb, extra_lock_flags); - - lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle; - if (rc > 0 && req == NULL) { - /* - * Nice, this slave is valid. - */ - CDEBUG(D_INODE, "Cached slave "DFID"\n", PFID(&fid)); - goto release_lock; - } - - if (rc < 0) - GOTO(cleanup, rc); - - if (master) { - /* - * Save lock on master to be returned to the caller. - */ - CDEBUG(D_INODE, "No lock on master "DFID" yet\n", - PFID(mid)); - memcpy(&master_lockh, lockh, sizeof(master_lockh)); - master_lockm = it.d.lustre.it_lock_mode; - it.d.lustre.it_lock_mode = 0; - } else { - /* - * This is slave. We want to control it. - */ - lock = ldlm_handle2lock(lockh); - LASSERT(lock != NULL); - lock->l_ast_data = lmv_object_get(obj); - LDLM_LOCK_PUT(lock); - } - - if (*reqp == NULL) { - /* - * This is first reply, we'll use it to return updated - * data back to the caller. 
- */ - LASSERT(req != NULL); - ptlrpc_request_addref(req); - *reqp = req; - } - - body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); - LASSERT(body != NULL); - -update: - obj->lo_stripes[i].ls_size = body->size; - - CDEBUG(D_INODE, "Fresh size %lu from "DFID"\n", - (unsigned long)obj->lo_stripes[i].ls_size, PFID(&fid)); - - if (req) - ptlrpc_req_finished(req); -release_lock: - size += obj->lo_stripes[i].ls_size; - - if (it.d.lustre.it_lock_mode && lockh) { - ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode); - it.d.lustre.it_lock_mode = 0; - } - } - - if (*reqp) { - /* - * Some attrs got refreshed, we have reply and it's time to put - * fresh attrs to it. - */ - CDEBUG(D_INODE, "Return refreshed attrs: size = %lu for "DFID"\n", - (unsigned long)size, PFID(mid)); - - body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY); - LASSERT(body != NULL); - body->size = size; - - if (mreq == NULL) { - /* - * Very important to maintain mds num the same because - * of revalidation. mreq == NULL means that caller has - * no reply and the only attr we can return is size. - */ - body->valid = OBD_MD_FLSIZE; - } - if (master_valid == 0) { - oit->d.lustre.it_lock_handle = master_lockh.cookie; - oit->d.lustre.it_lock_mode = master_lockm; - } - rc = 0; - } else { - /* - * It seems all the attrs are fresh and we did no request. 
- */ - CDEBUG(D_INODE, "All the attrs were fresh on "DFID"\n", - PFID(mid)); - if (master_valid == 0) - oit->d.lustre.it_lock_mode = master_lockm; - rc = 1; - } - - EXIT; -cleanup: - OBD_FREE_PTR(op_data); - lmv_object_put_unlock(obj); - return rc; -} - -int lmv_allocate_slaves(struct obd_device *obd, struct lu_fid *pid, - struct md_op_data *op, struct lu_fid *fid) -{ - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_object *obj; - mdsno_t mds; - int sidx; - int rc; - ENTRY; - - obj = lmv_object_find(obd, pid); - if (obj == NULL) - RETURN(-EALREADY); - - sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, - (char *)op->op_name, op->op_namelen); - mds = obj->lo_stripes[sidx].ls_mds; - lmv_object_put(obj); - - rc = __lmv_fid_alloc(lmv, fid, mds); - if (rc) { - CERROR("Can't allocate fid, rc %d\n", rc); - RETURN(rc); - } - - CDEBUG(D_INODE, "Allocate new fid "DFID" for slave " - "obj -> mds #%x\n", PFID(fid), mds); - - RETURN(rc); -} diff --git a/lustre/lmv/lmv_internal.h b/lustre/lmv/lmv_internal.h index 9b46b54..3e3b842 100644 --- a/lustre/lmv/lmv_internal.h +++ b/lustre/lmv/lmv_internal.h @@ -48,109 +48,6 @@ #define LL_IT2STR(it) \ ((it) ? ldlm_it2str((it)->it_op) : "0") -struct lmv_stripe { - /** - * Dir stripe fid. - */ - struct lu_fid ls_fid; - /** - * Cached home mds number for \a li_fid. - */ - mdsno_t ls_mds; - /** - * Stripe object size. - */ - unsigned long ls_size; - /** - * Stripe flags. - */ - int ls_flags; -}; - -#define O_FREEING (1 << 0) - -struct lmv_object { - /** - * Link to global objects list. - */ - cfs_list_t lo_list; - /** - * Sema for protecting fields. - */ - struct mutex lo_guard; - /** - * Object state like O_FREEING. - */ - int lo_state; - /** - * Object ref counter. - */ - cfs_atomic_t lo_count; - /** - * Object master fid. - */ - struct lu_fid lo_fid; - /** - * Object hash type to find stripe by name. - */ - __u32 lo_hashtype; - /** - * Number of stripes. - */ - int lo_objcount; - /** - * Array of sub-objs. 
- */ - struct lmv_stripe *lo_stripes; - /** - * Pointer to LMV obd. - */ - struct obd_device *lo_obd; -}; - -int lmv_object_setup(struct obd_device *obd); -void lmv_object_cleanup(struct obd_device *obd); - -static inline void -lmv_object_lock(struct lmv_object *obj) -{ - LASSERT(obj); - mutex_lock(&obj->lo_guard); -} - -static inline void -lmv_object_unlock(struct lmv_object *obj) -{ - LASSERT(obj); - mutex_unlock(&obj->lo_guard); -} - -void lmv_object_add(struct lmv_object *obj); -void lmv_object_del(struct lmv_object *obj); - -void lmv_object_put(struct lmv_object *obj); -void lmv_object_put_unlock(struct lmv_object *obj); -void lmv_object_free(struct lmv_object *obj); - -struct lmv_object *lmv_object_get(struct lmv_object *obj); - -struct lmv_object *lmv_object_find(struct obd_device *obd, - const struct lu_fid *fid); - -struct lmv_object *lmv_object_find_lock(struct obd_device *obd, - const struct lu_fid *fid); - -struct lmv_object *lmv_object_alloc(struct obd_device *obd, - const struct lu_fid *fid, - struct lmv_stripe_md *mea); - -struct lmv_object *lmv_object_create(struct obd_export *exp, - const struct lu_fid *fid, - struct lmv_stripe_md *mea); - -int lmv_object_delete(struct obd_export *exp, - const struct lu_fid *fid); - int lmv_check_connect(struct obd_device *obd); int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data, @@ -171,15 +68,6 @@ int lmv_intent_open(struct obd_export *exp, struct md_op_data *op_data, ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags); -int lmv_allocate_slaves(struct obd_device *obd, struct lu_fid *pid, - struct md_op_data *op, struct lu_fid *fid); - -int lmv_revalidate_slaves(struct obd_export *, struct ptlrpc_request **, - const struct lu_fid *, struct lookup_intent *, int, - ldlm_blocking_callback cb_blocking, - __u64 extra_lock_flags); - -int lmv_handle_split(struct obd_export *, const struct lu_fid *); int lmv_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *, void *, int); int 
lmv_fld_lookup(struct lmv_obd *lmv, const struct lu_fid *fid, @@ -243,6 +131,9 @@ lmv_find_target(struct lmv_obd *lmv, const struct lu_fid *fid) return lmv_get_target(lmv, mds); } +struct lmv_tgt_desc +*lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data, + struct lu_fid *fid); /* lproc_lmv.c */ #ifdef LPROCFS void lprocfs_lmv_init_vars(struct lprocfs_static_vars *lvars); diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index e8044f9..ecba415 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -59,10 +59,6 @@ #include #include "lmv_internal.h" -/* object cache. */ -cfs_mem_cache_t *lmv_object_cache; -cfs_atomic_t lmv_object_count = CFS_ATOMIC_INIT(0); - static void lmv_activate_target(struct lmv_obd *lmv, struct lmv_tgt_desc *tgt, int activate) @@ -855,6 +851,7 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, RETURN(rc); } +#if 0 static int lmv_all_chars_policy(int count, const char *name, int len) { @@ -897,6 +894,7 @@ static int lmv_choose_mds(struct lmv_obd *lmv, struct md_op_data *op_data, CERROR("Unsupported placement policy %x\n", placement); return -EINVAL; } +#endif /** * This is _inode_ placement policy function (not name). @@ -905,67 +903,14 @@ static int lmv_placement_policy(struct obd_device *obd, struct md_op_data *op_data, mdsno_t *mds) { - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_object *obj; - int rc; - ENTRY; - - LASSERT(mds != NULL); - - if (lmv->desc.ld_tgt_count == 1) { - *mds = 0; - RETURN(0); - } - - /* - * Allocate new fid on target according to operation type and parent - * home mds. - */ - obj = lmv_object_find(obd, &op_data->op_fid1); - if (obj != NULL || op_data->op_name == NULL || - op_data->op_opc != LUSTRE_OPC_MKDIR) { - /* - * Allocate fid for non-dir or for null name or for case parent - * dir is split. 
- */ - if (obj) { - lmv_object_put(obj); - - /* - * If we have this flag turned on, and we see that - * parent dir is split, this means, that caller did not - * notice split yet. This is race and we would like to - * let caller know that. - */ - if (op_data->op_bias & MDS_CHECK_SPLIT) - RETURN(-ERESTART); - } - - /* - * Allocate new fid on same mds where parent fid is located and - * where operation will be sent. In case of split dir, ->op_fid1 - * and ->op_mds here will contain fid and mds of slave directory - * object (assigned by caller). - */ - *mds = op_data->op_mds; - rc = 0; - } else { - /* - * Parent directory is not split and we want to create a - * directory in it. Let's calculate where to place it according - * to operation data @op_data. - */ - *mds = lmv_choose_mds(lmv, op_data, lmv->lmv_placement); - rc = 0; - } + LASSERT(mds != NULL); - if (rc) { - CERROR("Can't choose MDS, err = %d\n", rc); - } else { - LASSERT(*mds < lmv->desc.ld_tgt_count); - } + /* Allocate new fid on target according to different + * QOS policy. 
In DNE phase I, llite should always tell + * which MDT where the dir will be located */ + *mds = op_data->op_mds; - RETURN(rc); + RETURN(0); } int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid, @@ -1078,13 +1023,7 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg) spin_lock_init(&lmv->lmv_lock); mutex_init(&lmv->init_mutex); - rc = lmv_object_setup(obd); - if (rc) { - CERROR("Can't setup LMV object manager, error %d.\n", rc); - GOTO(out_free_datas, rc); - } - - lprocfs_lmv_init_vars(&lvars); + lprocfs_lmv_init_vars(&lvars); lprocfs_obd_setup(obd, lvars.obd_vars); #ifdef LPROCFS { @@ -1115,15 +1054,14 @@ out_free_tgts: static int lmv_cleanup(struct obd_device *obd) { - struct lmv_obd *lmv = &obd->u.lmv; - ENTRY; + struct lmv_obd *lmv = &obd->u.lmv; + ENTRY; - fld_client_fini(&lmv->lmv_fld); - lmv_object_cleanup(obd); - OBD_FREE(lmv->datas, lmv->datas_size); - OBD_FREE(lmv->tgts, lmv->tgts_size); + fld_client_fini(&lmv->lmv_fld); + OBD_FREE(lmv->datas, lmv->datas_size); + OBD_FREE(lmv->tgts, lmv->tgts_size); - RETURN(0); + RETURN(0); } static int lmv_process_config(struct obd_device *obd, obd_count len, void *buf) @@ -1271,9 +1209,7 @@ static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data, struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct lmv_tgt_desc *tgt; - struct lmv_object *obj; int rc; - int i; ENTRY; rc = lmv_check_connect(obd); @@ -1290,51 +1226,6 @@ static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data, } rc = md_getattr(tgt->ltd_exp, op_data, request); - if (rc) - RETURN(rc); - - obj = lmv_object_find_lock(obd, &op_data->op_fid1); - - CDEBUG(D_INODE, "GETATTR for "DFID" %s\n", PFID(&op_data->op_fid1), - obj ? "(split)" : ""); - - /* - * If object is split, then we loop over all the slaves and gather size - * attribute. 
In ideal world we would have to gather also mds field from - * all slaves, as object is spread over the cluster and this is - * definitely interesting information and it is not good to loss it, - * but... - */ - if (obj) { - struct mdt_body *body; - - if (*request == NULL) { - lmv_object_put(obj); - RETURN(rc); - } - - body = req_capsule_server_get(&(*request)->rq_pill, - &RMF_MDT_BODY); - LASSERT(body != NULL); - - for (i = 0; i < obj->lo_objcount; i++) { - if (lmv->tgts[i].ltd_exp == NULL) { - CWARN("%s: NULL export for %d\n", - obd->obd_name, i); - continue; - } - - /* - * Skip master object. - */ - if (lu_fid_eq(&obj->lo_fid, &obj->lo_stripes[i].ls_fid)) - continue; - - body->size += obj->lo_stripes[i].ls_size; - } - - lmv_object_put_unlock(obj); - } RETURN(rc); } @@ -1417,73 +1308,26 @@ static int lmv_close(struct obd_export *exp, struct md_op_data *op_data, RETURN(rc); } -/** - * Called in the case MDS returns -ERESTART on create on open, what means that - * directory is split and its LMV presentation object has to be updated. - */ -int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid) +/** + * Look up the target for \a fid with lmv_find_target() and record its index in + * \a op_data->op_mds. + * + * Returns the target descriptor, or the ERR_PTR() value handed back by + * lmv_find_target(), in which case \a op_data is left unmodified. + */ +struct lmv_tgt_desc +*lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data, + struct lu_fid *fid) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct ptlrpc_request *req = NULL; - struct lmv_tgt_desc *tgt; - struct lmv_object *obj; - struct lustre_md md; - struct md_op_data *op_data; - int mealen; - int rc; - __u64 valid; - ENTRY; - - md.mea = NULL; - mealen = lmv_get_easize(lmv); - - valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA; - - tgt = lmv_find_target(lmv, fid); - if (IS_ERR(tgt)) - RETURN(PTR_ERR(tgt)); - - /* - * Time to update mea of parent fid. 
- */ - - OBD_ALLOC_PTR(op_data); - if (op_data == NULL) - RETURN(-ENOMEM); - - op_data->op_fid1 = *fid; - op_data->op_mode = mealen; - op_data->op_valid = valid; - - rc = md_getattr(tgt->ltd_exp, op_data, &req); - OBD_FREE_PTR(op_data); - if (rc) { - CERROR("md_getattr() failed, error %d\n", rc); - GOTO(cleanup, rc); - } - - rc = md_get_lustre_md(tgt->ltd_exp, req, NULL, exp, &md); - if (rc) { - CERROR("md_get_lustre_md() failed, error %d\n", rc); - GOTO(cleanup, rc); - } - - if (md.mea == NULL) - GOTO(cleanup, rc = -ENODATA); + struct lmv_tgt_desc *tgt; - obj = lmv_object_create(exp, fid, md.mea); - if (IS_ERR(obj)) - rc = PTR_ERR(obj); - else - lmv_object_put(obj); + tgt = lmv_find_target(lmv, fid); + if (IS_ERR(tgt)) + return tgt; + + op_data->op_mds = tgt->ltd_idx; - obd_free_memmd(exp, (void *)&md.mea); - EXIT; -cleanup: - if (req) - ptlrpc_req_finished(req); - return rc; + return tgt; } int lmv_create(struct obd_export *exp, struct md_op_data *op_data, @@ -1491,81 +1335,41 @@ int lmv_create(struct obd_export *exp, struct md_op_data *op_data, __u32 gid, cfs_cap_t cap_effective, __u64 rdev, struct ptlrpc_request **request) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_tgt_desc *tgt; - struct lmv_object *obj; - int rc; - int loop = 0; - int sidx; - ENTRY; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int rc; + ENTRY; - rc = lmv_check_connect(obd); - if (rc) - RETURN(rc); + rc = lmv_check_connect(obd); + if (rc) + RETURN(rc); - if (!lmv->desc.ld_active_tgt_count) - RETURN(-EIO); -repeat: - ++loop; - LASSERT(loop <= 2); - - obj = lmv_object_find(obd, &op_data->op_fid1); - if (obj) { - sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, - op_data->op_name, op_data->op_namelen); - op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid; - op_data->op_bias &= ~MDS_CHECK_SPLIT; - op_data->op_mds = obj->lo_stripes[sidx].ls_mds; - tgt = lmv_get_target(lmv, 
op_data->op_mds); - lmv_object_put(obj); - }
else { - tgt = lmv_find_target(lmv, &op_data->op_fid1); - op_data->op_bias |= MDS_CHECK_SPLIT; - op_data->op_mds = tgt->ltd_idx; - } + if (!lmv->desc.ld_active_tgt_count) + RETURN(-EIO); - if (IS_ERR(tgt)) - RETURN(PTR_ERR(tgt)); + tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data); - if (rc == -ERESTART) - goto repeat; - else if (rc) - RETURN(rc); + rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data); + if (rc) + RETURN(rc); - CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n", - op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1), - op_data->op_mds); + CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n", + op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1), + op_data->op_mds); - op_data->op_flags |= MF_MDC_CANCEL_FID1; - rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid, - cap_effective, rdev, request); - if (rc == 0) { - if (*request == NULL) - RETURN(rc); - CDEBUG(D_INODE, "Created - "DFID"\n", PFID(&op_data->op_fid2)); - } else if (rc == -ERESTART) { - LASSERT(*request != NULL); - DEBUG_REQ(D_WARNING|D_RPCTRACE, *request, - "Got -ERESTART during create!\n"); - ptlrpc_req_finished(*request); - *request = NULL; + op_data->op_flags |= MF_MDC_CANCEL_FID1; + rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid, + cap_effective, rdev, request); - /* - * Directory got split. Time to update local object and repeat - * the request with proper MDS. 
- */ - rc = lmv_handle_split(exp, &op_data->op_fid1); - if (rc == 0) { - rc = lmv_allocate_slaves(obd, &op_data->op_fid1, - op_data, &op_data->op_fid2); - if (rc) - RETURN(rc); - goto repeat; - } - } - RETURN(rc); + if (rc == 0) { + if (*request == NULL) + RETURN(rc); + CDEBUG(D_INODE, "Created - "DFID"\n", PFID(&op_data->op_fid2)); + } + RETURN(rc); } static int lmv_done_writing(struct obd_export *exp, @@ -1591,73 +1385,6 @@ static int lmv_done_writing(struct obd_export *exp, } static int -lmv_enqueue_slaves(struct obd_export *exp, struct ldlm_enqueue_info *einfo, - struct lookup_intent *it, struct md_op_data *op_data, - struct lustre_handle *lockh, void *lmm, int lmmsize) -{ - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_stripe_md *mea = op_data->op_mea1; - struct md_op_data *op_data2; - struct lmv_tgt_desc *tgt; - int i; - int rc = 0; - ENTRY; - - OBD_ALLOC_PTR(op_data2); - if (op_data2 == NULL) - RETURN(-ENOMEM); - - LASSERT(mea != NULL); - for (i = 0; i < mea->mea_count; i++) { - memset(op_data2, 0, sizeof(*op_data2)); - op_data2->op_fid1 = mea->mea_ids[i]; - op_data2->op_bias = 0; - - tgt = lmv_find_target(lmv, &op_data2->op_fid1); - if (IS_ERR(tgt)) - GOTO(cleanup, rc = PTR_ERR(tgt)); - - if (tgt->ltd_exp == NULL) - continue; - - rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data2, - lockh + i, lmm, lmmsize, NULL, 0); - - CDEBUG(D_INODE, "Take lock on slave "DFID" -> %d/%d\n", - PFID(&mea->mea_ids[i]), rc, it->d.lustre.it_status); - - if (rc) - GOTO(cleanup, rc); - - if (it->d.lustre.it_data) { - struct ptlrpc_request *req; - req = (struct ptlrpc_request *)it->d.lustre.it_data; - ptlrpc_req_finished(req); - } - - if (it->d.lustre.it_status) - GOTO(cleanup, rc = it->d.lustre.it_status); - } - - EXIT; -cleanup: - OBD_FREE_PTR(op_data2); - - if (rc != 0) { - /* - * Drop all taken locks. 
- */ - while (--i >= 0) { - if (lockh[i].cookie) - ldlm_lock_decref(lockh + i, einfo->ei_mode); - lockh[i].cookie = 0; - } - } - return rc; -} - -static int lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo, struct lookup_intent *it, struct md_op_data *op_data, struct lustre_handle *lockh, void *lmm, int lmmsize, @@ -1723,145 +1450,89 @@ lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, struct lustre_handle *lockh, void *lmm, int lmmsize, struct ptlrpc_request **req, __u64 extra_lock_flags) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_tgt_desc *tgt; - struct lmv_object *obj; - int sidx; - int rc; - ENTRY; - - rc = lmv_check_connect(obd); - if (rc) - RETURN(rc); - - CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID"\n", - LL_IT2STR(it), PFID(&op_data->op_fid1)); + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int rc; + ENTRY; - if (op_data->op_mea1 && it && it->it_op == IT_UNLINK) { - rc = lmv_enqueue_slaves(exp, einfo, it, op_data, - lockh, lmm, lmmsize); - RETURN(rc); - } + rc = lmv_check_connect(obd); + if (rc) + RETURN(rc); - obj = lmv_object_find(obd, &op_data->op_fid1); - if (obj && op_data->op_namelen) { - sidx = raw_name2idx(obj->lo_hashtype, - obj->lo_objcount, - (char *)op_data->op_name, - op_data->op_namelen); - op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid; - tgt = lmv_get_target(lmv, obj->lo_stripes[sidx].ls_mds); - } else { - tgt = lmv_find_target(lmv, &op_data->op_fid1); - } - if (obj) - lmv_object_put(obj); + CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID"\n", + LL_IT2STR(it), PFID(&op_data->op_fid1)); - if (IS_ERR(tgt)) - RETURN(PTR_ERR(tgt)); + tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n", - LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx); + CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n", + 
LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx); - rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data, lockh, - lmm, lmmsize, req, extra_lock_flags); + rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data, lockh, + lmm, lmmsize, req, extra_lock_flags); - if (rc == 0 && it && it->it_op == IT_OPEN) { - rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh, - lmm, lmmsize, extra_lock_flags); - } - RETURN(rc); + if (rc == 0 && it && it->it_op == IT_OPEN) { + rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh, + lmm, lmmsize, extra_lock_flags); + } + RETURN(rc); } static int lmv_getattr_name(struct obd_export *exp,struct md_op_data *op_data, struct ptlrpc_request **request) { - struct ptlrpc_request *req = NULL; - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lu_fid rid = op_data->op_fid1; - struct lmv_tgt_desc *tgt; - struct mdt_body *body; - struct lmv_object *obj; - obd_valid valid = op_data->op_valid; - int rc; - int loop = 0; - int sidx; - ENTRY; + struct ptlrpc_request *req = NULL; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + struct mdt_body *body; + int rc; + ENTRY; - rc = lmv_check_connect(obd); - if (rc) - RETURN(rc); + rc = lmv_check_connect(obd); + if (rc) + RETURN(rc); -repeat: - ++loop; - LASSERT(loop <= 2); - obj = lmv_object_find(obd, &rid); - if (obj) { - sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, - op_data->op_name, op_data->op_namelen); - rid = obj->lo_stripes[sidx].ls_fid; - tgt = lmv_get_target(lmv, obj->lo_stripes[sidx].ls_mds); - op_data->op_mds = obj->lo_stripes[sidx].ls_mds; - valid &= ~OBD_MD_FLCKSPLIT; - lmv_object_put(obj); - } else { - tgt = lmv_find_target(lmv, &rid); - valid |= OBD_MD_FLCKSPLIT; - op_data->op_mds = tgt->ltd_idx; - } - if (IS_ERR(tgt)) - RETURN(PTR_ERR(tgt)); + tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" 
- "DFID" -> mds #%d\n", - op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1), - PFID(&rid), tgt->ltd_idx); - - op_data->op_valid = valid; - op_data->op_fid1 = rid; - rc = md_getattr_name(tgt->ltd_exp, op_data, request); - if (rc == 0) { - body = req_capsule_server_get(&(*request)->rq_pill, - &RMF_MDT_BODY); - LASSERT(body != NULL); - - if (body->valid & OBD_MD_MDS) { - rid = body->fid1; - CDEBUG(D_INODE, "Request attrs for "DFID"\n", - PFID(&rid)); - - tgt = lmv_find_target(lmv, &rid); - if (IS_ERR(tgt)) { - ptlrpc_req_finished(*request); - RETURN(PTR_ERR(tgt)); - } + CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" -> mds #%d\n", + op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1), + tgt->ltd_idx); - op_data->op_fid1 = rid; - op_data->op_valid |= OBD_MD_FLCROSSREF; - op_data->op_namelen = 0; - op_data->op_name = NULL; - rc = md_getattr_name(tgt->ltd_exp, op_data, &req); - ptlrpc_req_finished(*request); - *request = req; - } - } else if (rc == -ERESTART) { - LASSERT(*request != NULL); - DEBUG_REQ(D_WARNING|D_RPCTRACE, *request, - "Got -ERESTART during getattr!\n"); - ptlrpc_req_finished(*request); - *request = NULL; + rc = md_getattr_name(tgt->ltd_exp, op_data, request); + if (rc != 0) + RETURN(rc); - /* - * Directory got split. Time to update local object and repeat - * the request with proper MDS. 
- */ - rc = lmv_handle_split(exp, &rid); - if (rc == 0) - goto repeat; - } - RETURN(rc); + body = req_capsule_server_get(&(*request)->rq_pill, + &RMF_MDT_BODY); + LASSERT(body != NULL); + + if (body->valid & OBD_MD_MDS) { + struct lu_fid rid = body->fid1; + CDEBUG(D_INODE, "Request attrs for "DFID"\n", + PFID(&rid)); + + tgt = lmv_find_target(lmv, &rid); + if (IS_ERR(tgt)) { + ptlrpc_req_finished(*request); + RETURN(PTR_ERR(tgt)); + } + + op_data->op_fid1 = rid; + op_data->op_valid |= OBD_MD_FLCROSSREF; + op_data->op_namelen = 0; + op_data->op_name = NULL; + rc = md_getattr_name(tgt->ltd_exp, op_data, &req); + ptlrpc_req_finished(*request); + *request = req; + } + + RETURN(rc); } #define md_op_data_fid(op_data, fl) \ @@ -1871,59 +1542,6 @@ repeat: fl == MF_MDC_CANCEL_FID4 ? &op_data->op_fid4 : \ NULL) -static int lmv_early_cancel_slaves(struct obd_export *exp, - struct md_op_data *op_data, int op_tgt, - ldlm_mode_t mode, int bits, int flag) -{ - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - ldlm_policy_data_t policy = {{0}}; - struct lu_fid *op_fid; - struct lu_fid *st_fid; - struct lmv_tgt_desc *tgt; - struct lmv_object *obj; - int rc = 0; - int i; - ENTRY; - - op_fid = md_op_data_fid(op_data, flag); - if (!fid_is_sane(op_fid)) - RETURN(0); - - obj = lmv_object_find(obd, op_fid); - if (obj == NULL) - RETURN(-EALREADY); - - policy.l_inodebits.bits = bits; - for (i = 0; i < obj->lo_objcount; i++) { - tgt = lmv_get_target(lmv, obj->lo_stripes[i].ls_mds); - st_fid = &obj->lo_stripes[i].ls_fid; - if (op_tgt != tgt->ltd_idx) { - CDEBUG(D_INODE, "EARLY_CANCEL slave "DFID" -> mds #%d\n", - PFID(st_fid), tgt->ltd_idx); - rc = md_cancel_unused(tgt->ltd_exp, st_fid, &policy, - mode, LCF_ASYNC, NULL); - if (rc) - GOTO(out_put_obj, rc); - } else { - CDEBUG(D_INODE, - "EARLY_CANCEL skip operation target %d on "DFID"\n", - op_tgt, PFID(st_fid)); - /* - * Do not cancel locks for operation target, they will - * be handled later in underlaying layer 
when calling - * function we run on behalf of. - */ - *op_fid = *st_fid; - op_data->op_flags |= flag; - } - } - EXIT; -out_put_obj: - lmv_object_put(obj); - return rc; -} - static int lmv_early_cancel(struct obd_export *exp, struct md_op_data *op_data, int op_tgt, ldlm_mode_t mode, int bits, int flag) { @@ -1932,38 +1550,30 @@ static int lmv_early_cancel(struct obd_export *exp, struct md_op_data *op_data, struct lmv_obd *lmv = &obd->u.lmv; struct lmv_tgt_desc *tgt; ldlm_policy_data_t policy = {{0}}; - struct lmv_object *obj; int rc = 0; ENTRY; if (!fid_is_sane(fid)) RETURN(0); - obj = lmv_object_find(obd, fid); - if (obj) { - rc = lmv_early_cancel_slaves(exp, op_data, op_tgt, mode, - bits, flag); - lmv_object_put(obj); - } else { - tgt = lmv_find_target(lmv, fid); - if (IS_ERR(tgt)) - RETURN(PTR_ERR(tgt)); - - if (tgt->ltd_idx != op_tgt) { - CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid)); - policy.l_inodebits.bits = bits; - rc = md_cancel_unused(tgt->ltd_exp, fid, &policy, - mode, LCF_ASYNC, NULL); - } else { - CDEBUG(D_INODE, - "EARLY_CANCEL skip operation target %d on "DFID"\n", - op_tgt, PFID(fid)); - op_data->op_flags |= flag; - rc = 0; - } + tgt = lmv_find_target(lmv, fid); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); + + if (tgt->ltd_idx != op_tgt) { + CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid)); + policy.l_inodebits.bits = bits; + rc = md_cancel_unused(tgt->ltd_exp, fid, &policy, + mode, LCF_ASYNC, NULL); + } else { + CDEBUG(D_INODE, + "EARLY_CANCEL skip operation target %d on "DFID"\n", + op_tgt, PFID(fid)); + op_data->op_flags |= flag; + rc = 0; + } - } - RETURN(rc); + RETURN(rc); } /* @@ -1973,77 +1583,41 @@ static int lmv_early_cancel(struct obd_export *exp, struct md_op_data *op_data, static int lmv_link(struct obd_export *exp, struct md_op_data *op_data, struct ptlrpc_request **request) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_tgt_desc *tgt; - struct lmv_object *obj; - int rc; - int 
loop = 0; - mdsno_t mds; - int sidx; - ENTRY; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int rc; + ENTRY; - rc = lmv_check_connect(obd); - if (rc) - RETURN(rc); + rc = lmv_check_connect(obd); + if (rc) + RETURN(rc); -repeat: - ++loop; - LASSERT(loop <= 2); - LASSERT(op_data->op_namelen != 0); - - CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n", - PFID(&op_data->op_fid2), op_data->op_namelen, - op_data->op_name, PFID(&op_data->op_fid1)); - - obj = lmv_object_find(obd, &op_data->op_fid2); - if (obj) { - sidx = raw_name2idx(obj->lo_hashtype, - obj->lo_objcount, - op_data->op_name, - op_data->op_namelen); - op_data->op_fid2 = obj->lo_stripes[sidx].ls_fid; - mds = obj->lo_stripes[sidx].ls_mds; - lmv_object_put(obj); - } else { - rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds); - if (rc) - RETURN(rc); - } + LASSERT(op_data->op_namelen != 0); - CDEBUG(D_INODE, "Forward to mds #%x ("DFID")\n", - mds, PFID(&op_data->op_fid1)); + CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n", + PFID(&op_data->op_fid2), op_data->op_namelen, + op_data->op_name, PFID(&op_data->op_fid1)); - op_data->op_fsuid = cfs_curproc_fsuid(); - op_data->op_fsgid = cfs_curproc_fsgid(); - op_data->op_cap = cfs_curproc_cap_pack(); - tgt = lmv_get_target(lmv, mds); + op_data->op_fsuid = cfs_curproc_fsuid(); + op_data->op_fsgid = cfs_curproc_fsgid(); + op_data->op_cap = cfs_curproc_cap_pack(); + tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - /* - * Cancel UPDATE lock on child (fid1). 
- */ - op_data->op_flags |= MF_MDC_CANCEL_FID2; - rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX, - MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1); - if (rc == 0) - rc = md_link(tgt->ltd_exp, op_data, request); - if (rc == -ERESTART) { - LASSERT(*request != NULL); - DEBUG_REQ(D_WARNING|D_RPCTRACE, *request, - "Got -ERESTART during link!\n"); - ptlrpc_req_finished(*request); - *request = NULL; + /* + * Cancel UPDATE lock on child (fid1). + */ + op_data->op_flags |= MF_MDC_CANCEL_FID2; + rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX, + MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1); + if (rc != 0) + RETURN(rc); - /* - * Directory got split. Time to update local object and repeat - * the request with proper MDS. - */ - rc = lmv_handle_split(exp, &op_data->op_fid2); - if (rc == 0) - goto repeat; - } + rc = md_link(tgt->ltd_exp, op_data, request); - RETURN(rc); + RETURN(rc); } static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, @@ -2053,13 +1627,9 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct lmv_tgt_desc *src_tgt; - int rc; - int sidx; - int loop = 0; - struct lmv_object *obj; - mdsno_t mds1; - mdsno_t mds2; - ENTRY; + struct lmv_tgt_desc *tgt_tgt; + int rc; + ENTRY; LASSERT(oldlen != 0); @@ -2071,99 +1641,51 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, if (rc) RETURN(rc); -repeat: - ++loop; - LASSERT(loop <= 2); - obj = lmv_object_find(obd, &op_data->op_fid1); - if (obj) { - sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, - (char *)old, oldlen); - op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid; - mds1 = obj->lo_stripes[sidx].ls_mds; - CDEBUG(D_INODE, "Parent obj "DFID"\n", PFID(&op_data->op_fid1)); - lmv_object_put(obj); - } else { - rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds1); - if (rc) - RETURN(rc); - } - - obj = lmv_object_find(obd, &op_data->op_fid2); - if (obj) { - /* - * 
Directory is already split, so we have to forward request to - * the right MDS. - */ - sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, - (char *)new, newlen); - - mds2 = obj->lo_stripes[sidx].ls_mds; - op_data->op_fid2 = obj->lo_stripes[sidx].ls_fid; - CDEBUG(D_INODE, "Parent obj "DFID"\n", PFID(&op_data->op_fid2)); - lmv_object_put(obj); - } else { - rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds2); - if (rc) - RETURN(rc); - } - - op_data->op_fsuid = cfs_curproc_fsuid(); - op_data->op_fsgid = cfs_curproc_fsgid(); - op_data->op_cap = cfs_curproc_cap_pack(); - - src_tgt = lmv_get_target(lmv, mds1); - - /* - * LOOKUP lock on src child (fid3) should also be cancelled for - * src_tgt in mdc_rename. - */ - op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3; - - /* - * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its - * own target. - */ - rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx, - LCK_EX, MDS_INODELOCK_UPDATE, - MF_MDC_CANCEL_FID2); - - /* - * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt. - */ - if (rc == 0) { - rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx, - LCK_EX, MDS_INODELOCK_LOOKUP, - MF_MDC_CANCEL_FID4); - } + op_data->op_fsuid = cfs_curproc_fsuid(); + op_data->op_fsgid = cfs_curproc_fsgid(); + op_data->op_cap = cfs_curproc_cap_pack(); + src_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1); + if (IS_ERR(src_tgt)) + RETURN(PTR_ERR(src_tgt)); + + tgt_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2); + if (IS_ERR(tgt_tgt)) + RETURN(PTR_ERR(tgt_tgt)); + /* + * LOOKUP lock on src child (fid3) should also be cancelled for + * src_tgt in mdc_rename. + */ + op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3; + + /* + * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its + * own target. + */ + rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx, + LCK_EX, MDS_INODELOCK_UPDATE, + MF_MDC_CANCEL_FID2); + + /* + * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt. 
+ */ + if (rc == 0) { + rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx, + LCK_EX, MDS_INODELOCK_LOOKUP, + MF_MDC_CANCEL_FID4); + } - /* - * Cancel all the locks on tgt child (fid4). - */ - if (rc == 0) - rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx, - LCK_EX, MDS_INODELOCK_FULL, - MF_MDC_CANCEL_FID4); - - if (rc == 0) - rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen, - new, newlen, request); - - if (rc == -ERESTART) { - LASSERT(*request != NULL); - DEBUG_REQ(D_WARNING|D_RPCTRACE, *request, - "Got -ERESTART during rename!\n"); - ptlrpc_req_finished(*request); - *request = NULL; + /* + * Cancel all the locks on tgt child (fid4). + */ + if (rc == 0) + rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx, + LCK_EX, MDS_INODELOCK_FULL, + MF_MDC_CANCEL_FID4); - /* - * Directory got split. Time to update local object and repeat - * the request with proper MDS. - */ - rc = lmv_handle_split(exp, &op_data->op_fid1); - if (rc == 0) - goto repeat; - } - RETURN(rc); + if (rc == 0) + rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen, + new, newlen, request); + RETURN(rc); } static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data, @@ -2171,62 +1693,28 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data, struct ptlrpc_request **request, struct md_open_data **mod) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct ptlrpc_request *req; - struct lmv_tgt_desc *tgt; - struct lmv_object *obj; - int rc = 0; - int i; - ENTRY; - - rc = lmv_check_connect(obd); - if (rc) - RETURN(rc); - - obj = lmv_object_find(obd, &op_data->op_fid1); - - CDEBUG(D_INODE, "SETATTR for "DFID", valid 0x%x%s\n", - PFID(&op_data->op_fid1), op_data->op_attr.ia_valid, - obj ? 
", split" : ""); + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int rc = 0; + ENTRY; - op_data->op_flags |= MF_MDC_CANCEL_FID1; - if (obj) { - for (i = 0; i < obj->lo_objcount; i++) { - op_data->op_fid1 = obj->lo_stripes[i].ls_fid; + rc = lmv_check_connect(obd); + if (rc) + RETURN(rc); - tgt = lmv_get_target(lmv, obj->lo_stripes[i].ls_mds); - if (IS_ERR(tgt)) { - rc = PTR_ERR(tgt); - break; - } + CDEBUG(D_INODE, "SETATTR for "DFID", valid 0x%x\n", + PFID(&op_data->op_fid1), op_data->op_attr.ia_valid); - rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen, - ea2, ea2len, &req, mod); - - if (lu_fid_eq(&obj->lo_fid, &obj->lo_stripes[i].ls_fid)) { - /* - * This is master object and this request should - * be returned back to llite. - */ - *request = req; - } else { - ptlrpc_req_finished(req); - } + op_data->op_flags |= MF_MDC_CANCEL_FID1; + tgt = lmv_find_target(lmv, &op_data->op_fid1); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - if (rc) - break; - } - lmv_object_put(obj); - } else { - tgt = lmv_find_target(lmv, &op_data->op_fid1); - if (IS_ERR(tgt)) - RETURN(PTR_ERR(tgt)); + rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen, ea2, + ea2len, request, mod); - rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen, ea2, - ea2len, request, mod); - } - RETURN(rc); + RETURN(rc); } static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid, @@ -2250,49 +1738,7 @@ static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid, RETURN(rc); } -/** - * Main purpose of LMV blocking ast is to remove split directory LMV - * presentation object (struct lmv_object) attached to the lock being revoked. 
- */ -int lmv_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, - void *data, int flag) -{ - struct lustre_handle lockh; - struct lmv_object *obj; - int rc; - ENTRY; - - switch (flag) { - case LDLM_CB_BLOCKING: - ldlm_lock2handle(lock, &lockh); - rc = ldlm_cli_cancel(&lockh); - if (rc < 0) { - CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc); - RETURN(rc); - } - break; - case LDLM_CB_CANCELING: - /* - * Time to drop cached attrs for split directory object - */ - obj = lock->l_ast_data; - if (obj) { - CDEBUG(D_INODE, "Cancel %s on "LPU64"/"LPU64 - ", master "DFID"\n", - lock->l_resource->lr_name.name[3] == 1 ? - "LOOKUP" : "UPDATE", - lock->l_resource->lr_name.name[0], - lock->l_resource->lr_name.name[1], - PFID(&obj->lo_fid)); - lmv_object_put(obj); - } - break; - default: - LBUG(); - } - RETURN(0); -} - +#if 0 static void lmv_hash_adjust(__u64 *hash, __u64 hash_adj) { __u64 val; @@ -2320,29 +1766,20 @@ static __u32 lmv_node_rank(struct obd_export *exp, const struct lu_fid *fid) return id ^ (id >> 32); } +#endif static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data, struct page **pages, struct ptlrpc_request **request) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_object *obj; - struct lu_fid rid = op_data->op_fid1; __u64 offset = op_data->op_offset; - __u64 hash_adj = 0; - __u32 rank = 0; - __u64 seg_size = 0; - __u64 tgt_tmp = 0; - int tgt_idx = 0; - int tgt0_idx = 0; int rc; - int nr = 0; int i; /* number of pages read, in CFS_PAGE_SIZE */ int nrdpgs; /* number of pages transferred in LU_PAGE_SIZE */ int nlupgs; - struct lmv_stripe *los; struct lmv_tgt_desc *tgt; struct lu_dirpage *dp; struct lu_dirent *ent; @@ -2352,258 +1789,170 @@ static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data, if (rc) RETURN(rc); - CDEBUG(D_INODE, "READPAGE at "LPX64" from "DFID"\n", offset, PFID(&rid)); - - /* - * This case handle directory lookup in clustered metadata case (i.e. 
- * split directory is located on multiple md servers.) - * each server keeps directory entries for certain range of hashes. - * E.g. we have N server and suppose hash range is 0 to MAX_HASH. - * first server will keep records with hashes [ 0 ... MAX_HASH / N - 1], - * second one with hashes [MAX_HASH / N ... 2 * MAX_HASH / N] and - * so on.... - * readdir can simply start reading entries from 0 - N server in - * order but that will not scale well as all client will request dir in - * to server in same order. - * Following algorithm does optimization: - * Instead of doing readdir in 1, 2, ...., N order, client with a - * rank R does readdir in R, R + 1, ..., N, 1, ... R - 1 order. - * (every client has rank R) - * But ll_readdir() expect offset range [0 to MAX_HASH/N) but - * since client ask dir from MDS{R} client has pages with offsets - * [R*MAX_HASH/N ... (R + 1)*MAX_HASH/N] there for we do hash_adj - * on hash values that we get. - */ - obj = lmv_object_find_lock(obd, &rid); - if (obj) { - nr = obj->lo_objcount; - LASSERT(nr > 0); - seg_size = MAX_HASH_SIZE; - do_div(seg_size, nr); - los = obj->lo_stripes; - tgt = lmv_get_target(lmv, los[0].ls_mds); - rank = lmv_node_rank(tgt->ltd_exp, &rid) % nr; - tgt_tmp = offset; - do_div(tgt_tmp, seg_size); - tgt0_idx = do_div(tgt_tmp, nr); - tgt_idx = (tgt0_idx + rank) % nr; - - if (tgt_idx < tgt0_idx) - /* - * Wrap around. - * - * Last segment has unusual length due to division - * rounding. 
- */ - hash_adj = MAX_HASH_SIZE - seg_size * nr; - else - hash_adj = 0; - - hash_adj += rank * seg_size; - - CDEBUG(D_INODE, "Readpage hash adjustment: %x "LPX64" " - LPX64"/%x -> "LPX64"/%x\n", rank, hash_adj, - offset, tgt0_idx, offset + hash_adj, tgt_idx); - - offset = (offset + hash_adj) & MAX_HASH_SIZE; - rid = obj->lo_stripes[tgt_idx].ls_fid; - tgt = lmv_get_target(lmv, los[tgt_idx].ls_mds); - - CDEBUG(D_INODE, "Forward to "DFID" with offset %lu i %d\n", - PFID(&rid), (unsigned long)offset, tgt_idx); - } else - tgt = lmv_find_target(lmv, &rid); - - if (IS_ERR(tgt)) - GOTO(cleanup, rc = PTR_ERR(tgt)); + CDEBUG(D_INODE, "READPAGE at "LPX64" from "DFID"\n", + offset, PFID(&op_data->op_fid1)); + + /* + * This case handle directory lookup in clustered metadata case (i.e. + * split directory is located on multiple md servers.) + * each server keeps directory entries for certain range of hashes. + * E.g. we have N server and suppose hash range is 0 to MAX_HASH. + * first server will keep records with hashes [ 0 ... MAX_HASH /N - 1], + * second one with hashes [MAX_HASH / N ... 2 * MAX_HASH / N] and + * so on.... + * readdir can simply start reading entries from 0 - N server in + * order but that will not scale well as all client will request dir in + * to server in same order. + * Following algorithm does optimization: + * Instead of doing readdir in 1, 2, ...., N order, client with a + * rank R does readdir in R, R + 1, ..., N, 1, ... R - 1 order. + * (every client has rank R) + * But ll_readdir() expect offset range [0 to MAX_HASH/N) but + * since client ask dir from MDS{R} client has pages with offsets + * [R*MAX_HASH/N ... (R + 1)*MAX_HASH/N] there for we do hash_adj + * on hash values that we get. 
+ if (0) { + LASSERT(nr > 0); + seg_size = MAX_HASH_SIZE; + do_div(seg_size, nr); + los = obj->lo_stripes; + tgt = lmv_get_target(lmv, los[0].ls_mds); + rank = lmv_node_rank(tgt->ltd_exp, fid) % nr; + tgt_tmp = offset; + do_div(tgt_tmp, seg_size); + tgt0_idx = do_div(tgt_tmp, nr); + tgt_idx = (tgt0_idx + rank) % nr; + + if (tgt_idx < tgt0_idx) + * Wrap around. + * + * Last segment has unusual length due to division + * rounding. + hash_adj = MAX_HASH_SIZE - seg_size * nr; + else + hash_adj = 0; + + hash_adj += rank * seg_size; + + CDEBUG(D_INODE, "Readpage hash adjustment: %x "LPX64" " + LPX64"/%x -> "LPX64"/%x\n", rank, hash_adj, + offset, tgt0_idx, offset + hash_adj, tgt_idx); + + offset = (offset + hash_adj) & MAX_HASH_SIZE; + rid = lsm->mea_oinfo[tgt_idx].lmo_fid; + tgt = lmv_get_target(lmv, lsm->mea_oinfo[tgt_idx].lmo_mds); + + CDEBUG(D_INODE, "Forward to "DFID" with offset %lu i %d\n", + PFID(&rid), (unsigned long)offset, tgt_idx); + } + */ + tgt = lmv_find_target(lmv, &op_data->op_fid1); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - op_data->op_fid1 = rid; - rc = md_readpage(tgt->ltd_exp, op_data, pages, request); - if (rc) - GOTO(cleanup, rc); + rc = md_readpage(tgt->ltd_exp, op_data, pages, request); + if (rc != 0) + RETURN(rc); - nrdpgs = ((*request)->rq_bulk->bd_nob_transferred + CFS_PAGE_SIZE - 1) - >> CFS_PAGE_SHIFT; - nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT; - LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK)); - LASSERT(nrdpgs > 0 && nrdpgs <= op_data->op_npages); + nrdpgs = ((*request)->rq_bulk->bd_nob_transferred + CFS_PAGE_SIZE - 1) + >> CFS_PAGE_SHIFT; + nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT; + LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK)); + LASSERT(nrdpgs > 0 && nrdpgs <= op_data->op_npages); - CDEBUG(D_INODE, "read %d(%d)/%d pages\n", nrdpgs, nlupgs, - op_data->op_npages); + CDEBUG(D_INODE, "read %d(%d)/%d pages\n", nrdpgs, nlupgs, + 
op_data->op_npages); - for (i = 0; i < nrdpgs; i++) { + for (i = 0; i < nrdpgs; i++) { #if CFS_PAGE_SIZE > LU_PAGE_SIZE - struct lu_dirpage *first; - __u64 hash_end = 0; - __u32 flags = 0; + struct lu_dirpage *first; + __u64 hash_end = 0; + __u32 flags = 0; #endif - struct lu_dirent *tmp = NULL; - - dp = cfs_kmap(pages[i]); - if (obj) { - lmv_hash_adjust(&dp->ldp_hash_start, hash_adj); - lmv_hash_adjust(&dp->ldp_hash_end, hash_adj); - LASSERT(le64_to_cpu(dp->ldp_hash_start) <= - op_data->op_offset); - - if ((tgt0_idx != nr - 1) && - (le64_to_cpu(dp->ldp_hash_end) == MDS_DIR_END_OFF)) - { - dp->ldp_hash_end = cpu_to_le32(seg_size * - (tgt0_idx + 1)); - CDEBUG(D_INODE, - ""DFID" reset end "LPX64" tgt %d\n", - PFID(&rid), - (__u64)le64_to_cpu(dp->ldp_hash_end), - tgt_idx); - } - } + struct lu_dirent *tmp = NULL; - ent = lu_dirent_start(dp); + dp = cfs_kmap(pages[i]); + ent = lu_dirent_start(dp); #if CFS_PAGE_SIZE > LU_PAGE_SIZE - first = dp; - hash_end = dp->ldp_hash_end; + first = dp; + hash_end = dp->ldp_hash_end; repeat: #endif - nlupgs--; - for (tmp = ent; ent != NULL; - tmp = ent, ent = lu_dirent_next(ent)) { - if (obj) - lmv_hash_adjust(&ent->lde_hash, hash_adj); - } + nlupgs--; + for (tmp = ent; ent != NULL; + tmp = ent, ent = lu_dirent_next(ent)); #if CFS_PAGE_SIZE > LU_PAGE_SIZE - dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE); - if (((unsigned long)dp & ~CFS_PAGE_MASK) && nlupgs > 0) { - ent = lu_dirent_start(dp); - - if (obj) { - lmv_hash_adjust(&dp->ldp_hash_end, hash_adj); - if ((tgt0_idx != nr - 1) && - (le64_to_cpu(dp->ldp_hash_end) == - MDS_DIR_END_OFF)) { - hash_end = cpu_to_le32(seg_size * - (tgt0_idx + 1)); - CDEBUG(D_INODE, - ""DFID" reset end "LPX64" tgt %d\n", - PFID(&rid), - (__u64)le64_to_cpu(hash_end), - tgt_idx); - } - } - hash_end = dp->ldp_hash_end; - flags = dp->ldp_flags; - - if (tmp) { - /* enlarge the end entry lde_reclen from 0 to - * first entry of next lu_dirpage, in this way - * several lu_dirpages can be stored into one - * 
client page on client. */ - tmp = ((void *)tmp) + - le16_to_cpu(tmp->lde_reclen); - tmp->lde_reclen = - cpu_to_le16((char *)(dp->ldp_entries) - - (char *)tmp); - goto repeat; - } - } - first->ldp_hash_end = hash_end; - first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE); - first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE); + dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE); + if (((unsigned long)dp & ~CFS_PAGE_MASK) && nlupgs > 0) { + ent = lu_dirent_start(dp); + + if (tmp) { + /* enlarge the end entry lde_reclen from 0 to + * first entry of next lu_dirpage, in this way + * several lu_dirpages can be stored into one + * client page on client. */ + tmp = ((void *)tmp) + + le16_to_cpu(tmp->lde_reclen); + tmp->lde_reclen = + cpu_to_le16((char *)(dp->ldp_entries) - + (char *)tmp); + goto repeat; + } + } + first->ldp_hash_end = hash_end; + first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE); + first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE); #else - SET_BUT_UNUSED(tmp); + SET_BUT_UNUSED(tmp); #endif - cfs_kunmap(pages[i]); - } - EXIT; -cleanup: - if (obj) - lmv_object_put_unlock(obj); - return rc; + cfs_kunmap(pages[i]); + } + RETURN(rc); } static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data, struct ptlrpc_request **request) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_tgt_desc *tgt = NULL; - struct lmv_object *obj; - int rc; - int sidx; - int loop = 0; - ENTRY; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt = NULL; + int rc; + ENTRY; - rc = lmv_check_connect(obd); - if (rc) - RETURN(rc); + rc = lmv_check_connect(obd); + if (rc) + RETURN(rc); -repeat: - ++loop; - LASSERT(loop <= 2); - LASSERT(op_data->op_namelen != 0); - - obj = lmv_object_find(obd, &op_data->op_fid1); - if (obj) { - sidx = raw_name2idx(obj->lo_hashtype, - obj->lo_objcount, - op_data->op_name, - op_data->op_namelen); - op_data->op_bias &= ~MDS_CHECK_SPLIT; - op_data->op_fid1 = 
obj->lo_stripes[sidx].ls_fid; - tgt = lmv_get_target(lmv, - obj->lo_stripes[sidx].ls_mds); - lmv_object_put(obj); - CDEBUG(D_INODE, "UNLINK '%*s' in "DFID" -> %u\n", - op_data->op_namelen, op_data->op_name, - PFID(&op_data->op_fid1), sidx); - } - - if (tgt == NULL) { - tgt = lmv_find_target(lmv, &op_data->op_fid1); - if (IS_ERR(tgt)) - RETURN(PTR_ERR(tgt)); - op_data->op_bias |= MDS_CHECK_SPLIT; - } - - op_data->op_fsuid = cfs_curproc_fsuid(); - op_data->op_fsgid = cfs_curproc_fsgid(); - op_data->op_cap = cfs_curproc_cap_pack(); + tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - /* - * If child's fid is given, cancel unused locks for it if it is from - * another export than parent. - * - * LOOKUP lock for child (fid3) should also be cancelled on parent - * tgt_tgt in mdc_unlink(). - */ - op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3; + op_data->op_fsuid = cfs_curproc_fsuid(); + op_data->op_fsgid = cfs_curproc_fsgid(); + op_data->op_cap = cfs_curproc_cap_pack(); - /* - * Cancel FULL locks on child (fid3). - */ - rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX, - MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3); + /* + * If child's fid is given, cancel unused locks for it if it is from + * another export than parent. + * + * LOOKUP lock for child (fid3) should also be cancelled on parent + * tgt_tgt in mdc_unlink(). + */ + op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3; - if (rc == 0) - rc = md_unlink(tgt->ltd_exp, op_data, request); + /* + * Cancel FULL locks on child (fid3). + */ + rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX, + MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3); - if (rc == -ERESTART) { - LASSERT(*request != NULL); - DEBUG_REQ(D_WARNING|D_RPCTRACE, *request, - "Got -ERESTART during unlink!\n"); - ptlrpc_req_finished(*request); - *request = NULL; + if (rc != 0) + RETURN(rc); - /* - * Directory got split. 
Time to update local object and repeat - * the request with proper MDS. - */ - rc = lmv_handle_split(exp, &op_data->op_fid1); - if (rc == 0) - goto repeat; - } - RETURN(rc); + rc = md_unlink(tgt->ltd_exp, op_data, request); + + RETURN(rc); } static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) @@ -3016,39 +2365,23 @@ int lmv_intent_getattr_async(struct obd_export *exp, struct md_enqueue_info *minfo, struct ldlm_enqueue_info *einfo) { - struct md_op_data *op_data = &minfo->mi_data; - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_object *obj; - struct lmv_tgt_desc *tgt = NULL; - int rc; - int sidx; - ENTRY; - - rc = lmv_check_connect(obd); - if (rc) - RETURN(rc); - - if (op_data->op_namelen) { - obj = lmv_object_find(obd, &op_data->op_fid1); - if (obj) { - sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, - (char *)op_data->op_name, - op_data->op_namelen); - op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid; - tgt = lmv_get_target(lmv, obj->lo_stripes[sidx].ls_mds); - lmv_object_put(obj); - } - } + struct md_op_data *op_data = &minfo->mi_data; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt = NULL; + int rc; + ENTRY; - if (tgt == NULL) - tgt = lmv_find_target(lmv, &op_data->op_fid1); + rc = lmv_check_connect(obd); + if (rc) + RETURN(rc); - if (IS_ERR(tgt)) - RETURN(PTR_ERR(tgt)); + tgt = lmv_find_target(lmv, &op_data->op_fid1); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - rc = md_intent_getattr_async(tgt->ltd_exp, minfo, einfo); - RETURN(rc); + rc = md_intent_getattr_async(tgt->ltd_exp, minfo, einfo); + RETURN(rc); } int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, @@ -3209,21 +2542,10 @@ int __init lmv_init(void) struct lprocfs_static_vars lvars; int rc; - lmv_object_cache = cfs_mem_cache_create("lmv_objects", - sizeof(struct lmv_object), - 0, 0); - if (!lmv_object_cache) { - CERROR("Error allocating lmv objects 
cache\n"); - return -ENOMEM; - } - lprocfs_lmv_init_vars(&lvars); rc = class_register_type(&lmv_obd_ops, &lmv_md_ops, lvars.module_vars, LUSTRE_LMV_NAME, NULL); - if (rc) - cfs_mem_cache_destroy(lmv_object_cache); - return rc; } @@ -3231,11 +2553,6 @@ int __init lmv_init(void) static void lmv_exit(void) { class_unregister_type(LUSTRE_LMV_NAME); - - LASSERTF(cfs_atomic_read(&lmv_object_count) == 0, - "Can't free lmv objects cache, %d object(s) busy\n", - cfs_atomic_read(&lmv_object_count)); - cfs_mem_cache_destroy(lmv_object_cache); } MODULE_AUTHOR("Sun Microsystems, Inc. "); diff --git a/lustre/lmv/lmv_object.c b/lustre/lmv/lmv_object.c deleted file mode 100644 index 78d38cd..0000000 --- a/lustre/lmv/lmv_object.c +++ /dev/null @@ -1,451 +0,0 @@ -/* - * GPL HEADER START - * - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 only, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License version 2 for more details (a copy is included - * in the LICENSE file that accompanied this code). - * - * You should have received a copy of the GNU General Public License - * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. - * - * GPL HEADER END - */ -/* - * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. - * Use is subject to license terms. - * - * Copyright (c) 2011, 2012, Intel Corporation. 
- */ -/* - * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. - */ - -#define DEBUG_SUBSYSTEM S_LMV -#ifdef __KERNEL__ -#include -#include -#include -#include -#include -#include -#include -#else -#include -#endif - -#include -#include -#include -#include -#include -#include -#include -#include "lmv_internal.h" - -extern cfs_mem_cache_t *lmv_object_cache; -extern cfs_atomic_t lmv_object_count; - -static CFS_LIST_HEAD(obj_list); -static DEFINE_SPINLOCK(obj_list_lock); - -struct lmv_object *lmv_object_alloc(struct obd_device *obd, - const struct lu_fid *fid, - struct lmv_stripe_md *mea) -{ - struct lmv_obd *lmv = &obd->u.lmv; - unsigned int obj_size; - struct lmv_object *obj; - int i; - - LASSERT(mea->mea_magic == MEA_MAGIC_LAST_CHAR - || mea->mea_magic == MEA_MAGIC_ALL_CHARS - || mea->mea_magic == MEA_MAGIC_HASH_SEGMENT); - - OBD_SLAB_ALLOC_PTR(obj, lmv_object_cache); - if (!obj) - return NULL; - - cfs_atomic_inc(&lmv_object_count); - - obj->lo_fid = *fid; - obj->lo_obd = obd; - obj->lo_state = 0; - obj->lo_hashtype = mea->mea_magic; - - mutex_init(&obj->lo_guard); - cfs_atomic_set(&obj->lo_count, 0); - obj->lo_objcount = mea->mea_count; - - obj_size = sizeof(struct lmv_stripe) * - lmv->desc.ld_tgt_count; - - OBD_ALLOC_LARGE(obj->lo_stripes, obj_size); - if (!obj->lo_stripes) - goto err_obj; - - CDEBUG(D_INODE, "Allocate object for "DFID"\n", - PFID(fid)); - for (i = 0; i < mea->mea_count; i++) { - int rc; - - CDEBUG(D_INODE, "Process subobject "DFID"\n", - PFID(&mea->mea_ids[i])); - obj->lo_stripes[i].ls_fid = mea->mea_ids[i]; - LASSERT(fid_is_sane(&obj->lo_stripes[i].ls_fid)); - - /* - * Cache slave mds number to use it in all cases it is needed - * instead of constant lookup. 
- */ - rc = lmv_fld_lookup(lmv, &obj->lo_stripes[i].ls_fid, - &obj->lo_stripes[i].ls_mds); - if (rc) - goto err_obj; - } - - return obj; -err_obj: - OBD_FREE(obj, sizeof(*obj)); - return NULL; -} - -void lmv_object_free(struct lmv_object *obj) -{ - struct lmv_obd *lmv = &obj->lo_obd->u.lmv; - unsigned int obj_size; - - LASSERT(!cfs_atomic_read(&obj->lo_count)); - - obj_size = sizeof(struct lmv_stripe) * - lmv->desc.ld_tgt_count; - - OBD_FREE_LARGE(obj->lo_stripes, obj_size); - OBD_SLAB_FREE(obj, lmv_object_cache, sizeof(*obj)); - cfs_atomic_dec(&lmv_object_count); -} - -static void __lmv_object_add(struct lmv_object *obj) -{ - cfs_atomic_inc(&obj->lo_count); - cfs_list_add(&obj->lo_list, &obj_list); -} - -void lmv_object_add(struct lmv_object *obj) -{ - spin_lock(&obj_list_lock); - __lmv_object_add(obj); - spin_unlock(&obj_list_lock); -} - -static void __lmv_object_del(struct lmv_object *obj) -{ - cfs_list_del(&obj->lo_list); - lmv_object_free(obj); -} - -void lmv_object_del(struct lmv_object *obj) -{ - spin_lock(&obj_list_lock); - __lmv_object_del(obj); - spin_unlock(&obj_list_lock); -} - -static struct lmv_object *__lmv_object_get(struct lmv_object *obj) -{ - LASSERT(obj != NULL); - cfs_atomic_inc(&obj->lo_count); - return obj; -} - -struct lmv_object *lmv_object_get(struct lmv_object *obj) -{ - spin_lock(&obj_list_lock); - __lmv_object_get(obj); - spin_unlock(&obj_list_lock); - return obj; -} - -static void __lmv_object_put(struct lmv_object *obj) -{ - LASSERT(obj); - - if (cfs_atomic_dec_and_test(&obj->lo_count)) { - CDEBUG(D_INODE, "Last reference to "DFID" - " - "destroying\n", PFID(&obj->lo_fid)); - __lmv_object_del(obj); - } -} - -void lmv_object_put(struct lmv_object *obj) -{ - spin_lock(&obj_list_lock); - __lmv_object_put(obj); - spin_unlock(&obj_list_lock); -} - -void lmv_object_put_unlock(struct lmv_object *obj) -{ - lmv_object_unlock(obj); - lmv_object_put(obj); -} - -static struct lmv_object *__lmv_object_find(struct obd_device *obd, const struct 
lu_fid *fid) -{ - struct lmv_object *obj; - cfs_list_t *cur; - - cfs_list_for_each(cur, &obj_list) { - obj = cfs_list_entry(cur, struct lmv_object, lo_list); - - /* - * Check if object is in destroying phase. If so - skip - * it. - */ - if (obj->lo_state & O_FREEING) - continue; - - /* - * We should make sure, that we have found object belong to - * passed obd. It is possible that, object manager will have two - * objects with the same fid belong to different obds, if client - * and mds runs on the same host. May be it is good idea to have - * objects list associated with obd. - */ - if (obj->lo_obd != obd) - continue; - - /* - * Check if this is what we're looking for. - */ - if (lu_fid_eq(&obj->lo_fid, fid)) - return __lmv_object_get(obj); - } - - return NULL; -} - -struct lmv_object *lmv_object_find(struct obd_device *obd, - const struct lu_fid *fid) -{ - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_object *obj = NULL; - ENTRY; - - /* For single MDT case, lmv_object list is always empty. */ - if (lmv->desc.ld_tgt_count > 1) { - spin_lock(&obj_list_lock); - obj = __lmv_object_find(obd, fid); - spin_unlock(&obj_list_lock); - } - - RETURN(obj); -} - -struct lmv_object *lmv_object_find_lock(struct obd_device *obd, - const struct lu_fid *fid) -{ - struct lmv_object *obj; - ENTRY; - - obj = lmv_object_find(obd, fid); - if (obj) - lmv_object_lock(obj); - - RETURN(obj); -} - -static struct lmv_object *__lmv_object_create(struct obd_device *obd, - const struct lu_fid *fid, - struct lmv_stripe_md *mea) -{ - struct lmv_object *new; - struct lmv_object *obj; - ENTRY; - - obj = lmv_object_find(obd, fid); - if (obj) - RETURN(obj); - - new = lmv_object_alloc(obd, fid, mea); - if (!new) - RETURN(NULL); - - /* - * Check if someone created it already while we were dealing with - * allocating @obj. - */ - spin_lock(&obj_list_lock); - obj = __lmv_object_find(obd, fid); - if (obj) { - /* - * Someone created it already - put @obj and getting out. 
- */ - spin_unlock(&obj_list_lock); - lmv_object_free(new); - RETURN(obj); - } - - __lmv_object_add(new); - __lmv_object_get(new); - - spin_unlock(&obj_list_lock); - - CDEBUG(D_INODE, "New obj in lmv cache: "DFID"\n", PFID(fid)); - - RETURN(new); -} - -struct lmv_object *lmv_object_create(struct obd_export *exp, - const struct lu_fid *fid, - struct lmv_stripe_md *mea) -{ - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct ptlrpc_request *req = NULL; - struct lmv_tgt_desc *tgt; - struct lmv_object *obj; - struct lustre_md md; - int mealen; - int rc; - ENTRY; - - CDEBUG(D_INODE, "Get mea for "DFID" and create lmv obj\n", - PFID(fid)); - - md.mea = NULL; - - if (mea == NULL) { - struct md_op_data *op_data; - __u64 valid; - - CDEBUG(D_INODE, "Mea isn't passed in, get it now\n"); - mealen = lmv_get_easize(lmv); - - /* - * Time to update mea of parent fid. - */ - md.mea = NULL; - valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA; - - tgt = lmv_find_target(lmv, fid); - if (IS_ERR(tgt)) - GOTO(cleanup, obj = (void *)tgt); - - OBD_ALLOC_PTR(op_data); - if (op_data == NULL) - GOTO(cleanup, obj = ERR_PTR(-ENOMEM)); - - op_data->op_fid1 = *fid; - op_data->op_mode = mealen; - op_data->op_valid = valid; - rc = md_getattr(tgt->ltd_exp, op_data, &req); - OBD_FREE_PTR(op_data); - if (rc) { - CERROR("md_getattr() failed, error %d\n", rc); - GOTO(cleanup, obj = ERR_PTR(rc)); - } - - rc = md_get_lustre_md(exp, req, NULL, exp, &md); - if (rc) { - CERROR("md_get_lustre_md() failed, error %d\n", rc); - GOTO(cleanup, obj = ERR_PTR(rc)); - } - - if (md.mea == NULL) - GOTO(cleanup, obj = ERR_PTR(-ENODATA)); - - mea = md.mea; - } - - /* - * Got mea, now create obj for it. 
- */ - obj = __lmv_object_create(obd, fid, mea); - if (!obj) { - CERROR("Can't create new object "DFID"\n", - PFID(fid)); - GOTO(cleanup, obj = ERR_PTR(-ENOMEM)); - } - - if (md.mea != NULL) - obd_free_memmd(exp, (void *)&md.mea); - - EXIT; -cleanup: - if (req) - ptlrpc_req_finished(req); - return obj; -} - -int lmv_object_delete(struct obd_export *exp, const struct lu_fid *fid) -{ - struct obd_device *obd = exp->exp_obd; - struct lmv_object *obj; - int rc = 0; - ENTRY; - - spin_lock(&obj_list_lock); - obj = __lmv_object_find(obd, fid); - if (obj) { - obj->lo_state |= O_FREEING; - __lmv_object_put(obj); - __lmv_object_put(obj); - rc = 1; - } - spin_unlock(&obj_list_lock); - RETURN(rc); -} - -int lmv_object_setup(struct obd_device *obd) -{ - ENTRY; - LASSERT(obd != NULL); - - CDEBUG(D_INFO, "LMV object manager setup (%s)\n", - obd->obd_uuid.uuid); - - RETURN(0); -} - -void lmv_object_cleanup(struct obd_device *obd) -{ - cfs_list_t *cur; - cfs_list_t *tmp; - struct lmv_object *obj; - ENTRY; - - CDEBUG(D_INFO, "LMV object manager cleanup (%s)\n", - obd->obd_uuid.uuid); - - spin_lock(&obj_list_lock); - cfs_list_for_each_safe(cur, tmp, &obj_list) { - obj = cfs_list_entry(cur, struct lmv_object, lo_list); - - if (obj->lo_obd != obd) - continue; - - obj->lo_state |= O_FREEING; - if (cfs_atomic_read(&obj->lo_count) > 1) { - CERROR("Object "DFID" has count (%d)\n", - PFID(&obj->lo_fid), - cfs_atomic_read(&obj->lo_count)); - } - __lmv_object_put(obj); - } - spin_unlock(&obj_list_lock); - EXIT; -}