ldlm_blocking_callback cb_blocking,
__u64 extra_lock_flags)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct ptlrpc_request *req = NULL;
- struct lustre_handle plock;
- struct md_op_data *op_data;
- struct lmv_tgt_desc *tgt;
- struct mdt_body *body;
- int pmode;
- int rc = 0;
- ENTRY;
-
- body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
- if (body == NULL)
- RETURN(-EPROTO);
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct ptlrpc_request *req = NULL;
+ struct lustre_handle plock;
+ struct md_op_data *op_data;
+ struct lmv_tgt_desc *tgt;
+ struct mdt_body *body;
+ int pmode;
+ int rc = 0;
+ ENTRY;
+
+ body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
+ if (body == NULL)
+ RETURN(-EPROTO);
+
+ LASSERT((body->valid & OBD_MD_MDS));
+
+ /*
+ * Unfortunately, we have to lie to MDC/MDS to retrieve
+ * attributes llite needs and provide proper locking.
+ */
+ if (it->it_op & IT_LOOKUP)
+ it->it_op = IT_GETATTR;
+
+ /*
+ * We got LOOKUP lock, but we really need attrs.
+ */
+ pmode = it->d.lustre.it_lock_mode;
+ if (pmode) {
+ plock.cookie = it->d.lustre.it_lock_handle;
+ it->d.lustre.it_lock_mode = 0;
+ it->d.lustre.it_data = NULL;
+ }
- /*
- * Not cross-ref case, just get out of here.
- */
- if (!(body->valid & OBD_MD_MDS))
- RETURN(0);
+ LASSERT(fid_is_sane(&body->fid1));
- /*
- * Unfortunately, we have to lie to MDC/MDS to retrieve
- * attributes llite needs and provideproper locking.
- */
- if (it->it_op & IT_LOOKUP)
- it->it_op = IT_GETATTR;
-
- /*
- * We got LOOKUP lock, but we really need attrs.
- */
- pmode = it->d.lustre.it_lock_mode;
- if (pmode) {
- plock.cookie = it->d.lustre.it_lock_handle;
- it->d.lustre.it_lock_mode = 0;
- it->d.lustre.it_data = NULL;
- }
+ tgt = lmv_find_target(lmv, &body->fid1);
+ if (IS_ERR(tgt))
+ GOTO(out, rc = PTR_ERR(tgt));
- LASSERT(fid_is_sane(&body->fid1));
+ OBD_ALLOC_PTR(op_data);
+ if (op_data == NULL)
+ GOTO(out, rc = -ENOMEM);
- tgt = lmv_find_target(lmv, &body->fid1);
- if (IS_ERR(tgt))
- GOTO(out, rc = PTR_ERR(tgt));
+ op_data->op_fid1 = body->fid1;
+ op_data->op_bias = MDS_CROSS_REF;
- OBD_ALLOC_PTR(op_data);
- if (op_data == NULL)
- GOTO(out, rc = -ENOMEM);
-
- op_data->op_fid1 = body->fid1;
- op_data->op_bias = MDS_CROSS_REF;
-
- CDEBUG(D_INODE,
- "REMOTE_INTENT with fid="DFID" -> mds #%d\n",
- PFID(&body->fid1), tgt->ltd_idx);
+ CDEBUG(D_INODE, "REMOTE_INTENT with fid="DFID" -> mds #%d\n",
+ PFID(&body->fid1), tgt->ltd_idx);
it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
rc = md_intent_lock(tgt->ltd_exp, op_data, lmm, lmmsize, it,
ldlm_blocking_callback cb_blocking,
__u64 extra_lock_flags)
{
- struct obd_device *obd = exp->exp_obd;
- struct lu_fid rpid = op_data->op_fid1;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct md_op_data *sop_data;
- struct lmv_stripe_md *mea;
- struct lmv_tgt_desc *tgt;
- struct mdt_body *body;
- struct lmv_object *obj;
- int rc;
- int loop = 0;
- int sidx;
- ENTRY;
-
- OBD_ALLOC_PTR(sop_data);
- if (sop_data == NULL)
- RETURN(-ENOMEM);
-
- /* save op_data fro repeat case */
- *sop_data = *op_data;
-
-repeat:
-
- ++loop;
- LASSERT(loop <= 2);
- obj = lmv_object_find(obd, &rpid);
- if (obj) {
- /*
- * Directory is already split, so we have to forward request to
- * the right MDS.
- */
- sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
- (char *)op_data->op_name,
- op_data->op_namelen);
-
- rpid = obj->lo_stripes[sidx].ls_fid;
-
- sop_data->op_mds = obj->lo_stripes[sidx].ls_mds;
- tgt = lmv_get_target(lmv, sop_data->op_mds);
- sop_data->op_bias &= ~MDS_CHECK_SPLIT;
- lmv_object_put(obj);
-
- CDEBUG(D_INODE,
- "Choose slave dir ("DFID") -> mds #%d\n",
- PFID(&rpid), tgt->ltd_idx);
- } else {
- sop_data->op_bias |= MDS_CHECK_SPLIT;
- tgt = lmv_find_target(lmv, &rpid);
- sop_data->op_mds = tgt->ltd_idx;
- }
- if (IS_ERR(tgt))
- GOTO(out_free_sop_data, rc = PTR_ERR(tgt));
-
- sop_data->op_fid1 = rpid;
-
- if (it->it_op & IT_CREAT) {
- /*
- * For open with IT_CREATE and for IT_CREATE cases allocate new
- * fid and setup FLD for it.
- */
- sop_data->op_fid3 = sop_data->op_fid2;
- rc = lmv_fid_alloc(exp, &sop_data->op_fid2, sop_data);
- if (rc)
- GOTO(out_free_sop_data, rc);
-
- if (rc == -ERESTART)
- goto repeat;
- else if (rc)
- GOTO(out_free_sop_data, rc);
- }
-
- CDEBUG(D_INODE,
- "OPEN_INTENT with fid1="DFID", fid2="DFID", name='%s' -> mds #%d\n",
- PFID(&sop_data->op_fid1), PFID(&sop_data->op_fid2),
- sop_data->op_name, tgt->ltd_idx);
-
- rc = md_intent_lock(tgt->ltd_exp, sop_data, lmm, lmmsize, it, flags,
- reqp, cb_blocking, extra_lock_flags);
-
- if (rc == -ERESTART) {
- LASSERT(*reqp != NULL);
- DEBUG_REQ(D_WARNING|D_RPCTRACE, *reqp,
- "Got -ERESTART during open!\n");
- ptlrpc_req_finished(*reqp);
- *reqp = NULL;
- it->d.lustre.it_data = NULL;
-
- /*
- * Directory got split. Time to update local object and repeat
- * the request with proper MDS.
- */
- LASSERT(lu_fid_eq(&op_data->op_fid1, &rpid));
- rc = lmv_handle_split(exp, &rpid);
- if (rc == 0) {
- /* We should reallocate child FID. */
- rc = lmv_allocate_slaves(obd, &rpid, op_data,
- &sop_data->op_fid2);
- if (rc == 0)
- goto repeat;
- }
- }
-
- if (rc != 0)
- GOTO(out_free_sop_data, rc);
-
- /*
- * Nothing is found, do not access body->fid1 as it is zero and thus
- * pointless.
- */
- if ((it->d.lustre.it_disposition & DISP_LOOKUP_NEG) &&
- !(it->d.lustre.it_disposition & DISP_OPEN_CREATE) &&
- !(it->d.lustre.it_disposition & DISP_OPEN_OPEN))
- GOTO(out_free_sop_data, rc = 0);
-
- /*
- * Okay, MDS has returned success. Probably name has been resolved in
- * remote inode.
- */
- rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp,
- cb_blocking, extra_lock_flags);
- if (rc != 0) {
- LASSERT(rc < 0);
- /*
- * This is possible, that some userspace application will try to
- * open file as directory and we will have -ENOTDIR here. As
- * this is normal situation, we should not print error here,
- * only debug info.
- */
- CDEBUG(D_INODE, "Can't handle remote %s: dir "DFID"("DFID"):"
- "%*s: %d\n", LL_IT2STR(it), PFID(&op_data->op_fid2),
- PFID(&rpid), op_data->op_namelen, op_data->op_name, rc);
- GOTO(out_free_sop_data, rc);
- }
-
- /*
- * Caller may use attrs MDS returns on IT_OPEN lock request so, we have
- * to update them for split dir.
- */
- body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
- LASSERT(body != NULL);
-
- /*
- * Could not find object, FID is not present in response.
- */
- if (!(body->valid & OBD_MD_FLID))
- GOTO(out_free_sop_data, rc = 0);
-
- obj = lmv_object_find(obd, &body->fid1);
- if (obj == NULL) {
- /*
- * XXX: Capability for remote call!
- */
- mea = lmv_get_mea(*reqp);
- if (mea != NULL) {
- obj = lmv_object_create(exp, &body->fid1, mea);
- if (IS_ERR(obj))
- GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj));
- }
- }
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ struct mdt_body *body;
+ int rc;
+ ENTRY;
+
+ tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
+
+ if (it->it_op & IT_CREAT) {
+ /*
+ * For open with IT_CREATE and for IT_CREATE cases allocate new
+ * fid and setup FLD for it.
+ */
+ op_data->op_fid3 = op_data->op_fid2;
+ rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data);
+ if (rc != 0)
+ RETURN(rc);
+ }
- if (obj) {
- /*
- * This is split dir and we'd want to get attrs.
- */
- CDEBUG(D_INODE, "Slave attributes for "DFID"\n",
- PFID(&body->fid1));
+ CDEBUG(D_INODE, "OPEN_INTENT with fid1="DFID", fid2="DFID","
+ " name='%s' -> mds #%d\n", PFID(&op_data->op_fid1),
+ PFID(&op_data->op_fid2), op_data->op_name, tgt->ltd_idx);
+
+ rc = md_intent_lock(tgt->ltd_exp, op_data, lmm, lmmsize, it, flags,
+ reqp, cb_blocking, extra_lock_flags);
+ if (rc != 0)
+ RETURN(rc);
+ /*
+ * Nothing is found, do not access body->fid1 as it is zero and thus
+ * pointless.
+ */
+ if ((it->d.lustre.it_disposition & DISP_LOOKUP_NEG) &&
+ !(it->d.lustre.it_disposition & DISP_OPEN_CREATE) &&
+ !(it->d.lustre.it_disposition & DISP_OPEN_OPEN))
+ RETURN(rc);
+
+ body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
+ if (body == NULL)
+ RETURN(-EPROTO);
+ /*
+ * Not cross-ref case, just get out of here.
+ */
+ if (likely(!(body->valid & OBD_MD_MDS)))
+ RETURN(0);
+
+ /*
+ * Okay, MDS has returned success. Probably name has been resolved in
+ * remote inode.
+ */
+ rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp,
+ cb_blocking, extra_lock_flags);
+ if (rc != 0) {
+ LASSERT(rc < 0);
+ /*
+ * This is possible, that some userspace application will try to
+ * open file as directory and we will have -ENOTDIR here. As
+ * this is normal situation, we should not print error here,
+ * only debug info.
+ */
+ CDEBUG(D_INODE, "Can't handle remote %s: dir "DFID"("DFID"):"
+ "%*s: %d\n", LL_IT2STR(it), PFID(&op_data->op_fid2),
+ PFID(&op_data->op_fid1), op_data->op_namelen,
+ op_data->op_name, rc);
+ RETURN(rc);
+ }
- rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1,
- cb_blocking, extra_lock_flags);
- lmv_object_put(obj);
- }
- EXIT;
-out_free_sop_data:
- OBD_FREE_PTR(sop_data);
- return rc;
+ RETURN(rc);
}
/*
ldlm_blocking_callback cb_blocking,
__u64 extra_lock_flags)
{
- struct obd_device *obd = exp->exp_obd;
- struct lu_fid rpid = op_data->op_fid1;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_object *obj = NULL;
- struct md_op_data *sop_data;
- struct lmv_stripe_md *mea;
- struct lmv_tgt_desc *tgt = NULL;
- struct mdt_body *body;
- int sidx;
- int loop = 0;
- int rc = 0;
- ENTRY;
-
- OBD_ALLOC_PTR(sop_data);
- if (sop_data == NULL)
- RETURN(-ENOMEM);
-
- *sop_data = *op_data;
-
-repeat:
- ++loop;
- LASSERT(loop <= 2);
-
- obj = lmv_object_find(obd, &op_data->op_fid1);
- if (obj && op_data->op_namelen) {
- sidx = raw_name2idx(obj->lo_hashtype,
- obj->lo_objcount,
- (char *)op_data->op_name,
- op_data->op_namelen);
- rpid = obj->lo_stripes[sidx].ls_fid;
- tgt = lmv_get_target(lmv,
- obj->lo_stripes[sidx].ls_mds);
- CDEBUG(D_INODE,
- "Choose slave dir ("DFID") -> mds #%d\n",
- PFID(&rpid), tgt->ltd_idx);
- sop_data->op_bias &= ~MDS_CHECK_SPLIT;
- } else {
- tgt = lmv_find_target(lmv, &op_data->op_fid1);
- sop_data->op_bias |= MDS_CHECK_SPLIT;
- }
- if (obj)
- lmv_object_put(obj);
-
- if (IS_ERR(tgt))
- GOTO(out_free_sop_data, rc = PTR_ERR(tgt));
-
- if (!fid_is_sane(&sop_data->op_fid2))
- fid_zero(&sop_data->op_fid2);
-
- CDEBUG(D_INODE,
- "LOOKUP_INTENT with fid1="DFID", fid2="DFID
- ", name='%s' -> mds #%d\n",
- PFID(&sop_data->op_fid1), PFID(&sop_data->op_fid2),
- sop_data->op_name ? sop_data->op_name : "<NULL>",
- tgt->ltd_idx);
-
- sop_data->op_bias &= ~MDS_CROSS_REF;
- sop_data->op_fid1 = rpid;
-
- rc = md_intent_lock(tgt->ltd_exp, sop_data, lmm, lmmsize, it,
- flags, reqp, cb_blocking, extra_lock_flags);
-
- if (rc == -ERESTART) {
- LASSERT(*reqp != NULL);
- DEBUG_REQ(D_WARNING|D_RPCTRACE, *reqp,
- "Got -ERESTART during lookup!\n");
- ptlrpc_req_finished(*reqp);
- *reqp = NULL;
- it->d.lustre.it_data = 0;
-
- /*
- * Directory got split since last update. This shouldn't be
- * because splitting causes lock revocation, so revalidate had
- * to fail and lookup on dir had to return mea.
- */
- LASSERT(obj == NULL);
-
- obj = lmv_object_create(exp, &rpid, NULL);
- if (IS_ERR(obj))
- GOTO(out_free_sop_data, rc = PTR_ERR(obj));
- lmv_object_put(obj);
- goto repeat;
- }
-
- if (rc < 0)
- GOTO(out_free_sop_data, rc);
-
- if (obj && rc > 0) {
- /*
- * This is split dir. In order to optimize things a bit, we
- * consider obj valid updating missing parts.
- */
- CDEBUG(D_INODE,
- "Revalidate slaves for "DFID", rc %d\n",
- PFID(&op_data->op_fid1), rc);
-
- LASSERT(fid_is_sane(&op_data->op_fid2));
- rc = lmv_revalidate_slaves(exp, reqp, &op_data->op_fid1, it, rc,
- cb_blocking, extra_lock_flags);
- GOTO(out_free_sop_data, rc);
- }
-
- if (*reqp == NULL)
- GOTO(out_free_sop_data, rc);
-
- /*
- * MDS has returned success. Probably name has been resolved in
- * remote inode. Let's check this.
- */
- rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags,
- reqp, cb_blocking, extra_lock_flags);
- if (rc < 0)
- GOTO(out_free_sop_data, rc);
-
- /*
- * Nothing is found, do not access body->fid1 as it is zero and thus
- * pointless.
- */
- if (it->d.lustre.it_disposition & DISP_LOOKUP_NEG)
- GOTO(out_free_sop_data, rc = 0);
-
- LASSERT(*reqp != NULL);
- LASSERT((*reqp)->rq_repmsg != NULL);
- body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
- LASSERT(body != NULL);
-
- /*
- * Could not find object, FID is not present in response.
- */
- if (!(body->valid & OBD_MD_FLID))
- GOTO(out_free_sop_data, rc = 0);
-
- obj = lmv_object_find(obd, &body->fid1);
- if (obj == NULL) {
- /*
- * XXX: Remote capability is not handled.
- */
- mea = lmv_get_mea(*reqp);
- if (mea != NULL) {
- obj = lmv_object_create(exp, &body->fid1, mea);
- if (IS_ERR(obj))
- GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj));
- }
- } else {
- CDEBUG(D_INODE, "Slave attributes for "DFID", rc %d\n",
- PFID(&body->fid1), rc);
-
- rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1,
- cb_blocking, extra_lock_flags);
- lmv_object_put(obj);
- }
-
- EXIT;
-out_free_sop_data:
- OBD_FREE_PTR(sop_data);
- return rc;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt = NULL;
+ struct mdt_body *body;
+ int rc = 0;
+ ENTRY;
+
+ tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
+
+ if (!fid_is_sane(&op_data->op_fid2))
+ fid_zero(&op_data->op_fid2);
+
+ CDEBUG(D_INODE, "LOOKUP_INTENT with fid1="DFID", fid2="DFID
+ ", name='%s' -> mds #%d\n", PFID(&op_data->op_fid1),
+ PFID(&op_data->op_fid2),
+ op_data->op_name ? op_data->op_name : "<NULL>",
+ tgt->ltd_idx);
+
+ op_data->op_bias &= ~MDS_CROSS_REF;
+
+ rc = md_intent_lock(tgt->ltd_exp, op_data, lmm, lmmsize, it,
+ flags, reqp, cb_blocking, extra_lock_flags);
+
+ if (rc < 0 || *reqp == NULL)
+ RETURN(rc);
+
+ /*
+ * MDS has returned success. Probably name has been resolved in
+ * remote inode. Let's check this.
+ */
+ body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
+ if (body == NULL)
+ RETURN(-EPROTO);
+ /* Not cross-ref case, just get out of here. */
+ if (likely(!(body->valid & OBD_MD_MDS)))
+ RETURN(0);
+
+ rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp,
+ cb_blocking, extra_lock_flags);
+
+ RETURN(rc);
}
int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
LBUG();
RETURN(rc);
}
-
-int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
- const struct lu_fid *mid, struct lookup_intent *oit,
- int master_valid, ldlm_blocking_callback cb_blocking,
- __u64 extra_lock_flags)
-{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- int master_lockm = 0;
- struct lustre_handle *lockh = NULL;
- struct ptlrpc_request *mreq = *reqp;
- struct lustre_handle master_lockh = { 0 };
- struct md_op_data *op_data;
- struct ldlm_lock *lock;
- unsigned long size = 0;
- struct mdt_body *body;
- struct lmv_object *obj;
- int i;
- int rc = 0;
- struct lu_fid fid;
- struct ptlrpc_request *req;
- ldlm_blocking_callback cb;
- struct lookup_intent it;
- struct lmv_tgt_desc *tgt;
- int master;
- ENTRY;
-
- CDEBUG(D_INODE, "Revalidate master obj "DFID"\n", PFID(mid));
-
- OBD_ALLOC_PTR(op_data);
- if (op_data == NULL)
- RETURN(-ENOMEM);
-
- /*
- * We have to loop over the subobjects, check validity and update them
- * from MDS if needed. It's very useful that we need not to update all
- * the fields. Say, common fields (that are equal on all the subojects
- * need not to be update, another fields (i_size, for example) are
- * cached all the time.
- */
- obj = lmv_object_find_lock(obd, mid);
- if (obj == NULL) {
- OBD_FREE_PTR(op_data);
- RETURN(-EALREADY);
- }
-
- for (i = 0; i < obj->lo_objcount; i++) {
- fid = obj->lo_stripes[i].ls_fid;
- master = lu_fid_eq(&fid, &obj->lo_fid);
- cb = master ? cb_blocking : lmv_blocking_ast;
-
- /*
- * We need i_size and we would like to check possible cached locks,
- * so this is is IT_GETATTR intent.
- */
- memset(&it, 0, sizeof(it));
- it.it_op = IT_GETATTR;
-
- if (master && master_valid) {
- /*
- * lmv_intent_lookup() already checked
- * validness and took the lock.
- */
- if (mreq != NULL) {
- body = req_capsule_server_get(&mreq->rq_pill,
- &RMF_MDT_BODY);
- LASSERT(body != NULL);
- goto update;
- }
- /*
- * Take already cached attrs into account.
- */
- CDEBUG(D_INODE,
- "Master "DFID"is locked and cached\n",
- PFID(mid));
- goto release_lock;
- }
-
- /*
- * Prepare op_data for revalidating. Note that @fid2 shuld be
- * defined otherwise it will go to server and take new lock
- * which is what we reall not need here.
- */
- memset(op_data, 0, sizeof(*op_data));
- op_data->op_bias = MDS_CROSS_REF;
- op_data->op_fid1 = fid;
- op_data->op_fid2 = fid;
- req = NULL;
-
- tgt = lmv_get_target(lmv, obj->lo_stripes[i].ls_mds);
- if (IS_ERR(tgt))
- GOTO(cleanup, rc = PTR_ERR(tgt));
-
- CDEBUG(D_INODE, "Revalidate slave obj "DFID" -> mds #%d\n",
- PFID(&fid), tgt->ltd_idx);
-
- rc = md_intent_lock(tgt->ltd_exp, op_data, NULL, 0, &it, 0,
- &req, cb, extra_lock_flags);
-
- lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle;
- if (rc > 0 && req == NULL) {
- /*
- * Nice, this slave is valid.
- */
- CDEBUG(D_INODE, "Cached slave "DFID"\n", PFID(&fid));
- goto release_lock;
- }
-
- if (rc < 0)
- GOTO(cleanup, rc);
-
- if (master) {
- /*
- * Save lock on master to be returned to the caller.
- */
- CDEBUG(D_INODE, "No lock on master "DFID" yet\n",
- PFID(mid));
- memcpy(&master_lockh, lockh, sizeof(master_lockh));
- master_lockm = it.d.lustre.it_lock_mode;
- it.d.lustre.it_lock_mode = 0;
- } else {
- /*
- * This is slave. We want to control it.
- */
- lock = ldlm_handle2lock(lockh);
- LASSERT(lock != NULL);
- lock->l_ast_data = lmv_object_get(obj);
- LDLM_LOCK_PUT(lock);
- }
-
- if (*reqp == NULL) {
- /*
- * This is first reply, we'll use it to return updated
- * data back to the caller.
- */
- LASSERT(req != NULL);
- ptlrpc_request_addref(req);
- *reqp = req;
- }
-
- body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
- LASSERT(body != NULL);
-
-update:
- obj->lo_stripes[i].ls_size = body->size;
-
- CDEBUG(D_INODE, "Fresh size %lu from "DFID"\n",
- (unsigned long)obj->lo_stripes[i].ls_size, PFID(&fid));
-
- if (req)
- ptlrpc_req_finished(req);
-release_lock:
- size += obj->lo_stripes[i].ls_size;
-
- if (it.d.lustre.it_lock_mode && lockh) {
- ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
- it.d.lustre.it_lock_mode = 0;
- }
- }
-
- if (*reqp) {
- /*
- * Some attrs got refreshed, we have reply and it's time to put
- * fresh attrs to it.
- */
- CDEBUG(D_INODE, "Return refreshed attrs: size = %lu for "DFID"\n",
- (unsigned long)size, PFID(mid));
-
- body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
- LASSERT(body != NULL);
- body->size = size;
-
- if (mreq == NULL) {
- /*
- * Very important to maintain mds num the same because
- * of revalidation. mreq == NULL means that caller has
- * no reply and the only attr we can return is size.
- */
- body->valid = OBD_MD_FLSIZE;
- }
- if (master_valid == 0) {
- oit->d.lustre.it_lock_handle = master_lockh.cookie;
- oit->d.lustre.it_lock_mode = master_lockm;
- }
- rc = 0;
- } else {
- /*
- * It seems all the attrs are fresh and we did no request.
- */
- CDEBUG(D_INODE, "All the attrs were fresh on "DFID"\n",
- PFID(mid));
- if (master_valid == 0)
- oit->d.lustre.it_lock_mode = master_lockm;
- rc = 1;
- }
-
- EXIT;
-cleanup:
- OBD_FREE_PTR(op_data);
- lmv_object_put_unlock(obj);
- return rc;
-}
-
-int lmv_allocate_slaves(struct obd_device *obd, struct lu_fid *pid,
- struct md_op_data *op, struct lu_fid *fid)
-{
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_object *obj;
- mdsno_t mds;
- int sidx;
- int rc;
- ENTRY;
-
- obj = lmv_object_find(obd, pid);
- if (obj == NULL)
- RETURN(-EALREADY);
-
- sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
- (char *)op->op_name, op->op_namelen);
- mds = obj->lo_stripes[sidx].ls_mds;
- lmv_object_put(obj);
-
- rc = __lmv_fid_alloc(lmv, fid, mds);
- if (rc) {
- CERROR("Can't allocate fid, rc %d\n", rc);
- RETURN(rc);
- }
-
- CDEBUG(D_INODE, "Allocate new fid "DFID" for slave "
- "obj -> mds #%x\n", PFID(fid), mds);
-
- RETURN(rc);
-}
#include <lustre_fid.h>
#include "lmv_internal.h"
-/* object cache. */
-cfs_mem_cache_t *lmv_object_cache;
-cfs_atomic_t lmv_object_count = CFS_ATOMIC_INIT(0);
-
static void lmv_activate_target(struct lmv_obd *lmv,
struct lmv_tgt_desc *tgt,
int activate)
RETURN(rc);
}
+#if 0
static int lmv_all_chars_policy(int count, const char *name,
int len)
{
CERROR("Unsupported placement policy %x\n", placement);
return -EINVAL;
}
+#endif
/**
* This is _inode_ placement policy function (not name).
struct md_op_data *op_data,
mdsno_t *mds)
{
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_object *obj;
- int rc;
- ENTRY;
-
- LASSERT(mds != NULL);
-
- if (lmv->desc.ld_tgt_count == 1) {
- *mds = 0;
- RETURN(0);
- }
-
- /*
- * Allocate new fid on target according to operation type and parent
- * home mds.
- */
- obj = lmv_object_find(obd, &op_data->op_fid1);
- if (obj != NULL || op_data->op_name == NULL ||
- op_data->op_opc != LUSTRE_OPC_MKDIR) {
- /*
- * Allocate fid for non-dir or for null name or for case parent
- * dir is split.
- */
- if (obj) {
- lmv_object_put(obj);
-
- /*
- * If we have this flag turned on, and we see that
- * parent dir is split, this means, that caller did not
- * notice split yet. This is race and we would like to
- * let caller know that.
- */
- if (op_data->op_bias & MDS_CHECK_SPLIT)
- RETURN(-ERESTART);
- }
-
- /*
- * Allocate new fid on same mds where parent fid is located and
- * where operation will be sent. In case of split dir, ->op_fid1
- * and ->op_mds here will contain fid and mds of slave directory
- * object (assigned by caller).
- */
- *mds = op_data->op_mds;
- rc = 0;
- } else {
- /*
- * Parent directory is not split and we want to create a
- * directory in it. Let's calculate where to place it according
- * to operation data @op_data.
- */
- *mds = lmv_choose_mds(lmv, op_data, lmv->lmv_placement);
- rc = 0;
- }
+ LASSERT(mds != NULL);
- if (rc) {
- CERROR("Can't choose MDS, err = %d\n", rc);
- } else {
- LASSERT(*mds < lmv->desc.ld_tgt_count);
- }
+ /* Allocate new fid on target according to different
+ * QOS policy. In DNE phase I, llite should always tell
+ * which MDT the dir will be located on */
+ *mds = op_data->op_mds;
- RETURN(rc);
+ RETURN(0);
}
int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
spin_lock_init(&lmv->lmv_lock);
mutex_init(&lmv->init_mutex);
- rc = lmv_object_setup(obd);
- if (rc) {
- CERROR("Can't setup LMV object manager, error %d.\n", rc);
- GOTO(out_free_datas, rc);
- }
-
- lprocfs_lmv_init_vars(&lvars);
+ lprocfs_lmv_init_vars(&lvars);
lprocfs_obd_setup(obd, lvars.obd_vars);
#ifdef LPROCFS
{
static int lmv_cleanup(struct obd_device *obd)
{
- struct lmv_obd *lmv = &obd->u.lmv;
- ENTRY;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ ENTRY;
- fld_client_fini(&lmv->lmv_fld);
- lmv_object_cleanup(obd);
- OBD_FREE(lmv->datas, lmv->datas_size);
- OBD_FREE(lmv->tgts, lmv->tgts_size);
+ fld_client_fini(&lmv->lmv_fld);
+ OBD_FREE(lmv->datas, lmv->datas_size);
+ OBD_FREE(lmv->tgts, lmv->tgts_size);
- RETURN(0);
+ RETURN(0);
}
static int lmv_process_config(struct obd_device *obd, obd_count len, void *buf)
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
struct lmv_tgt_desc *tgt;
- struct lmv_object *obj;
int rc;
- int i;
ENTRY;
rc = lmv_check_connect(obd);
}
rc = md_getattr(tgt->ltd_exp, op_data, request);
- if (rc)
- RETURN(rc);
-
- obj = lmv_object_find_lock(obd, &op_data->op_fid1);
-
- CDEBUG(D_INODE, "GETATTR for "DFID" %s\n", PFID(&op_data->op_fid1),
- obj ? "(split)" : "");
-
- /*
- * If object is split, then we loop over all the slaves and gather size
- * attribute. In ideal world we would have to gather also mds field from
- * all slaves, as object is spread over the cluster and this is
- * definitely interesting information and it is not good to loss it,
- * but...
- */
- if (obj) {
- struct mdt_body *body;
-
- if (*request == NULL) {
- lmv_object_put(obj);
- RETURN(rc);
- }
-
- body = req_capsule_server_get(&(*request)->rq_pill,
- &RMF_MDT_BODY);
- LASSERT(body != NULL);
-
- for (i = 0; i < obj->lo_objcount; i++) {
- if (lmv->tgts[i].ltd_exp == NULL) {
- CWARN("%s: NULL export for %d\n",
- obd->obd_name, i);
- continue;
- }
-
- /*
- * Skip master object.
- */
- if (lu_fid_eq(&obj->lo_fid, &obj->lo_stripes[i].ls_fid))
- continue;
-
- body->size += obj->lo_stripes[i].ls_size;
- }
-
- lmv_object_put_unlock(obj);
- }
RETURN(rc);
}
RETURN(rc);
}
-/**
- * Called in the case MDS returns -ERESTART on create on open, what means that
- * directory is split and its LMV presentation object has to be updated.
- */
-int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid)
+/**
+ * Look up the target descriptor serving \a fid and record its index in
+ * \a op_data->op_mds.
+ *
+ * \param lmv     LMV device passed through to lmv_find_target()
+ * \param op_data operation data; op_mds is written only on success
+ * \param fid     FID to locate
+ *
+ * \retval target descriptor returned by lmv_find_target() on success
+ * \retval the ERR_PTR() value from lmv_find_target() on failure, with
+ *         \a op_data left unmodified; callers test it with IS_ERR()
+ */
+struct lmv_tgt_desc
+*lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
+ struct lu_fid *fid)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct ptlrpc_request *req = NULL;
- struct lmv_tgt_desc *tgt;
- struct lmv_object *obj;
- struct lustre_md md;
- struct md_op_data *op_data;
- int mealen;
- int rc;
- __u64 valid;
- ENTRY;
-
- md.mea = NULL;
- mealen = lmv_get_easize(lmv);
-
- valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA;
-
- tgt = lmv_find_target(lmv, fid);
- if (IS_ERR(tgt))
- RETURN(PTR_ERR(tgt));
-
- /*
- * Time to update mea of parent fid.
- */
-
- OBD_ALLOC_PTR(op_data);
- if (op_data == NULL)
- RETURN(-ENOMEM);
-
- op_data->op_fid1 = *fid;
- op_data->op_mode = mealen;
- op_data->op_valid = valid;
-
- rc = md_getattr(tgt->ltd_exp, op_data, &req);
- OBD_FREE_PTR(op_data);
- if (rc) {
- CERROR("md_getattr() failed, error %d\n", rc);
- GOTO(cleanup, rc);
- }
-
- rc = md_get_lustre_md(tgt->ltd_exp, req, NULL, exp, &md);
- if (rc) {
- CERROR("md_get_lustre_md() failed, error %d\n", rc);
- GOTO(cleanup, rc);
- }
-
- if (md.mea == NULL)
- GOTO(cleanup, rc = -ENODATA);
+ struct lmv_tgt_desc *tgt;
- obj = lmv_object_create(exp, fid, md.mea);
- if (IS_ERR(obj))
- rc = PTR_ERR(obj);
- else
- lmv_object_put(obj);
+ tgt = lmv_find_target(lmv, fid);
+ /*
+ * lmv_find_target() can hand back an ERR_PTR(); pass it straight to
+ * the caller instead of dereferencing it for ltd_idx below.
+ */
+ if (IS_ERR(tgt))
+ return tgt;
+
+ op_data->op_mds = tgt->ltd_idx;
- obd_free_memmd(exp, (void *)&md.mea);
- EXIT;
-cleanup:
- if (req)
- ptlrpc_req_finished(req);
- return rc;
+ return tgt;
}
int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
__u32 gid, cfs_cap_t cap_effective, __u64 rdev,
struct ptlrpc_request **request)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_tgt_desc *tgt;
- struct lmv_object *obj;
- int rc;
- int loop = 0;
- int sidx;
- ENTRY;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ int rc;
+ ENTRY;
- rc = lmv_check_connect(obd);
- if (rc)
- RETURN(rc);
+ rc = lmv_check_connect(obd);
+ if (rc)
+ RETURN(rc);
- if (!lmv->desc.ld_active_tgt_count)
- RETURN(-EIO);
-repeat:
- ++loop;
- LASSERT(loop <= 2);
-
- obj = lmv_object_find(obd, &op_data->op_fid1);
- if (obj) {
- sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
- op_data->op_name, op_data->op_namelen);
- op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid;
- op_data->op_bias &= ~MDS_CHECK_SPLIT;
- op_data->op_mds = obj->lo_stripes[sidx].ls_mds;
- tgt = lmv_get_target(lmv, op_data->op_mds);
- lmv_object_put(obj);
- } else {
- tgt = lmv_find_target(lmv, &op_data->op_fid1);
- op_data->op_bias |= MDS_CHECK_SPLIT;
- op_data->op_mds = tgt->ltd_idx;
- }
+ if (!lmv->desc.ld_active_tgt_count)
+ RETURN(-EIO);
- if (IS_ERR(tgt))
- RETURN(PTR_ERR(tgt));
+ tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data);
- if (rc == -ERESTART)
- goto repeat;
- else if (rc)
- RETURN(rc);
+ rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data);
+ if (rc)
+ RETURN(rc);
- CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n",
- op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
- op_data->op_mds);
+ CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n",
+ op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
+ op_data->op_mds);
- op_data->op_flags |= MF_MDC_CANCEL_FID1;
- rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid,
- cap_effective, rdev, request);
- if (rc == 0) {
- if (*request == NULL)
- RETURN(rc);
- CDEBUG(D_INODE, "Created - "DFID"\n", PFID(&op_data->op_fid2));
- } else if (rc == -ERESTART) {
- LASSERT(*request != NULL);
- DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
- "Got -ERESTART during create!\n");
- ptlrpc_req_finished(*request);
- *request = NULL;
+ op_data->op_flags |= MF_MDC_CANCEL_FID1;
+ rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid,
+ cap_effective, rdev, request);
- /*
- * Directory got split. Time to update local object and repeat
- * the request with proper MDS.
- */
- rc = lmv_handle_split(exp, &op_data->op_fid1);
- if (rc == 0) {
- rc = lmv_allocate_slaves(obd, &op_data->op_fid1,
- op_data, &op_data->op_fid2);
- if (rc)
- RETURN(rc);
- goto repeat;
- }
- }
- RETURN(rc);
+ if (rc == 0) {
+ if (*request == NULL)
+ RETURN(rc);
+ CDEBUG(D_INODE, "Created - "DFID"\n", PFID(&op_data->op_fid2));
+ }
+ RETURN(rc);
}
static int lmv_done_writing(struct obd_export *exp,
}
static int
-lmv_enqueue_slaves(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
- struct lookup_intent *it, struct md_op_data *op_data,
- struct lustre_handle *lockh, void *lmm, int lmmsize)
-{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_stripe_md *mea = op_data->op_mea1;
- struct md_op_data *op_data2;
- struct lmv_tgt_desc *tgt;
- int i;
- int rc = 0;
- ENTRY;
-
- OBD_ALLOC_PTR(op_data2);
- if (op_data2 == NULL)
- RETURN(-ENOMEM);
-
- LASSERT(mea != NULL);
- for (i = 0; i < mea->mea_count; i++) {
- memset(op_data2, 0, sizeof(*op_data2));
- op_data2->op_fid1 = mea->mea_ids[i];
- op_data2->op_bias = 0;
-
- tgt = lmv_find_target(lmv, &op_data2->op_fid1);
- if (IS_ERR(tgt))
- GOTO(cleanup, rc = PTR_ERR(tgt));
-
- if (tgt->ltd_exp == NULL)
- continue;
-
- rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data2,
- lockh + i, lmm, lmmsize, NULL, 0);
-
- CDEBUG(D_INODE, "Take lock on slave "DFID" -> %d/%d\n",
- PFID(&mea->mea_ids[i]), rc, it->d.lustre.it_status);
-
- if (rc)
- GOTO(cleanup, rc);
-
- if (it->d.lustre.it_data) {
- struct ptlrpc_request *req;
- req = (struct ptlrpc_request *)it->d.lustre.it_data;
- ptlrpc_req_finished(req);
- }
-
- if (it->d.lustre.it_status)
- GOTO(cleanup, rc = it->d.lustre.it_status);
- }
-
- EXIT;
-cleanup:
- OBD_FREE_PTR(op_data2);
-
- if (rc != 0) {
- /*
- * Drop all taken locks.
- */
- while (--i >= 0) {
- if (lockh[i].cookie)
- ldlm_lock_decref(lockh + i, einfo->ei_mode);
- lockh[i].cookie = 0;
- }
- }
- return rc;
-}
-
-static int
lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
struct lookup_intent *it, struct md_op_data *op_data,
struct lustre_handle *lockh, void *lmm, int lmmsize,
struct lustre_handle *lockh, void *lmm, int lmmsize,
struct ptlrpc_request **req, __u64 extra_lock_flags)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_tgt_desc *tgt;
- struct lmv_object *obj;
- int sidx;
- int rc;
- ENTRY;
-
- rc = lmv_check_connect(obd);
- if (rc)
- RETURN(rc);
-
- CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID"\n",
- LL_IT2STR(it), PFID(&op_data->op_fid1));
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ int rc;
+ ENTRY;
- if (op_data->op_mea1 && it && it->it_op == IT_UNLINK) {
- rc = lmv_enqueue_slaves(exp, einfo, it, op_data,
- lockh, lmm, lmmsize);
- RETURN(rc);
- }
+ rc = lmv_check_connect(obd);
+ if (rc)
+ RETURN(rc);
- obj = lmv_object_find(obd, &op_data->op_fid1);
- if (obj && op_data->op_namelen) {
- sidx = raw_name2idx(obj->lo_hashtype,
- obj->lo_objcount,
- (char *)op_data->op_name,
- op_data->op_namelen);
- op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid;
- tgt = lmv_get_target(lmv, obj->lo_stripes[sidx].ls_mds);
- } else {
- tgt = lmv_find_target(lmv, &op_data->op_fid1);
- }
- if (obj)
- lmv_object_put(obj);
+ CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID"\n",
+ LL_IT2STR(it), PFID(&op_data->op_fid1));
- if (IS_ERR(tgt))
- RETURN(PTR_ERR(tgt));
+ tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n",
- LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx);
+ CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n",
+ LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx);
- rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data, lockh,
- lmm, lmmsize, req, extra_lock_flags);
+ rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data, lockh,
+ lmm, lmmsize, req, extra_lock_flags);
- if (rc == 0 && it && it->it_op == IT_OPEN) {
- rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh,
- lmm, lmmsize, extra_lock_flags);
- }
- RETURN(rc);
+ if (rc == 0 && it && it->it_op == IT_OPEN) {
+ rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh,
+ lmm, lmmsize, extra_lock_flags);
+ }
+ RETURN(rc);
}
static int
lmv_getattr_name(struct obd_export *exp,struct md_op_data *op_data,
struct ptlrpc_request **request)
{
- struct ptlrpc_request *req = NULL;
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lu_fid rid = op_data->op_fid1;
- struct lmv_tgt_desc *tgt;
- struct mdt_body *body;
- struct lmv_object *obj;
- obd_valid valid = op_data->op_valid;
- int rc;
- int loop = 0;
- int sidx;
- ENTRY;
+ struct ptlrpc_request *req = NULL;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ struct mdt_body *body;
+ int rc;
+ ENTRY;
- rc = lmv_check_connect(obd);
- if (rc)
- RETURN(rc);
+ rc = lmv_check_connect(obd);
+ if (rc)
+ RETURN(rc);
-repeat:
- ++loop;
- LASSERT(loop <= 2);
- obj = lmv_object_find(obd, &rid);
- if (obj) {
- sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
- op_data->op_name, op_data->op_namelen);
- rid = obj->lo_stripes[sidx].ls_fid;
- tgt = lmv_get_target(lmv, obj->lo_stripes[sidx].ls_mds);
- op_data->op_mds = obj->lo_stripes[sidx].ls_mds;
- valid &= ~OBD_MD_FLCKSPLIT;
- lmv_object_put(obj);
- } else {
- tgt = lmv_find_target(lmv, &rid);
- valid |= OBD_MD_FLCKSPLIT;
- op_data->op_mds = tgt->ltd_idx;
- }
- if (IS_ERR(tgt))
- RETURN(PTR_ERR(tgt));
+ tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" - "DFID" -> mds #%d\n",
- op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
- PFID(&rid), tgt->ltd_idx);
-
- op_data->op_valid = valid;
- op_data->op_fid1 = rid;
- rc = md_getattr_name(tgt->ltd_exp, op_data, request);
- if (rc == 0) {
- body = req_capsule_server_get(&(*request)->rq_pill,
- &RMF_MDT_BODY);
- LASSERT(body != NULL);
-
- if (body->valid & OBD_MD_MDS) {
- rid = body->fid1;
- CDEBUG(D_INODE, "Request attrs for "DFID"\n",
- PFID(&rid));
-
- tgt = lmv_find_target(lmv, &rid);
- if (IS_ERR(tgt)) {
- ptlrpc_req_finished(*request);
- RETURN(PTR_ERR(tgt));
- }
+ CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" -> mds #%d\n",
+ op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
+ tgt->ltd_idx);
- op_data->op_fid1 = rid;
- op_data->op_valid |= OBD_MD_FLCROSSREF;
- op_data->op_namelen = 0;
- op_data->op_name = NULL;
- rc = md_getattr_name(tgt->ltd_exp, op_data, &req);
- ptlrpc_req_finished(*request);
- *request = req;
- }
- } else if (rc == -ERESTART) {
- LASSERT(*request != NULL);
- DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
- "Got -ERESTART during getattr!\n");
- ptlrpc_req_finished(*request);
- *request = NULL;
+ rc = md_getattr_name(tgt->ltd_exp, op_data, request);
+ if (rc != 0)
+ RETURN(rc);
- /*
- * Directory got split. Time to update local object and repeat
- * the request with proper MDS.
- */
- rc = lmv_handle_split(exp, &rid);
- if (rc == 0)
- goto repeat;
- }
- RETURN(rc);
+ body = req_capsule_server_get(&(*request)->rq_pill,
+ &RMF_MDT_BODY);
+ LASSERT(body != NULL);
+
+ if (body->valid & OBD_MD_MDS) {
+ struct lu_fid rid = body->fid1;
+ CDEBUG(D_INODE, "Request attrs for "DFID"\n",
+ PFID(&rid));
+
+ tgt = lmv_find_target(lmv, &rid);
+ if (IS_ERR(tgt)) {
+ ptlrpc_req_finished(*request);
+ RETURN(PTR_ERR(tgt));
+ }
+
+ op_data->op_fid1 = rid;
+ op_data->op_valid |= OBD_MD_FLCROSSREF;
+ op_data->op_namelen = 0;
+ op_data->op_name = NULL;
+ rc = md_getattr_name(tgt->ltd_exp, op_data, &req);
+ ptlrpc_req_finished(*request);
+ *request = req;
+ }
+
+ RETURN(rc);
}
#define md_op_data_fid(op_data, fl) \
fl == MF_MDC_CANCEL_FID4 ? &op_data->op_fid4 : \
NULL)
-static int lmv_early_cancel_slaves(struct obd_export *exp,
- struct md_op_data *op_data, int op_tgt,
- ldlm_mode_t mode, int bits, int flag)
-{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- ldlm_policy_data_t policy = {{0}};
- struct lu_fid *op_fid;
- struct lu_fid *st_fid;
- struct lmv_tgt_desc *tgt;
- struct lmv_object *obj;
- int rc = 0;
- int i;
- ENTRY;
-
- op_fid = md_op_data_fid(op_data, flag);
- if (!fid_is_sane(op_fid))
- RETURN(0);
-
- obj = lmv_object_find(obd, op_fid);
- if (obj == NULL)
- RETURN(-EALREADY);
-
- policy.l_inodebits.bits = bits;
- for (i = 0; i < obj->lo_objcount; i++) {
- tgt = lmv_get_target(lmv, obj->lo_stripes[i].ls_mds);
- st_fid = &obj->lo_stripes[i].ls_fid;
- if (op_tgt != tgt->ltd_idx) {
- CDEBUG(D_INODE, "EARLY_CANCEL slave "DFID" -> mds #%d\n",
- PFID(st_fid), tgt->ltd_idx);
- rc = md_cancel_unused(tgt->ltd_exp, st_fid, &policy,
- mode, LCF_ASYNC, NULL);
- if (rc)
- GOTO(out_put_obj, rc);
- } else {
- CDEBUG(D_INODE,
- "EARLY_CANCEL skip operation target %d on "DFID"\n",
- op_tgt, PFID(st_fid));
- /*
- * Do not cancel locks for operation target, they will
- * be handled later in underlaying layer when calling
- * function we run on behalf of.
- */
- *op_fid = *st_fid;
- op_data->op_flags |= flag;
- }
- }
- EXIT;
-out_put_obj:
- lmv_object_put(obj);
- return rc;
-}
-
static int lmv_early_cancel(struct obd_export *exp, struct md_op_data *op_data,
int op_tgt, ldlm_mode_t mode, int bits, int flag)
{
struct lmv_obd *lmv = &obd->u.lmv;
struct lmv_tgt_desc *tgt;
ldlm_policy_data_t policy = {{0}};
- struct lmv_object *obj;
int rc = 0;
ENTRY;
if (!fid_is_sane(fid))
RETURN(0);
- obj = lmv_object_find(obd, fid);
- if (obj) {
- rc = lmv_early_cancel_slaves(exp, op_data, op_tgt, mode,
- bits, flag);
- lmv_object_put(obj);
- } else {
- tgt = lmv_find_target(lmv, fid);
- if (IS_ERR(tgt))
- RETURN(PTR_ERR(tgt));
-
- if (tgt->ltd_idx != op_tgt) {
- CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid));
- policy.l_inodebits.bits = bits;
- rc = md_cancel_unused(tgt->ltd_exp, fid, &policy,
- mode, LCF_ASYNC, NULL);
- } else {
- CDEBUG(D_INODE,
- "EARLY_CANCEL skip operation target %d on "DFID"\n",
- op_tgt, PFID(fid));
- op_data->op_flags |= flag;
- rc = 0;
- }
+ tgt = lmv_find_target(lmv, fid);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
+
+ if (tgt->ltd_idx != op_tgt) {
+ CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid));
+ policy.l_inodebits.bits = bits;
+ rc = md_cancel_unused(tgt->ltd_exp, fid, &policy,
+ mode, LCF_ASYNC, NULL);
+ } else {
+ CDEBUG(D_INODE,
+ "EARLY_CANCEL skip operation target %d on "DFID"\n",
+ op_tgt, PFID(fid));
+ op_data->op_flags |= flag;
+ rc = 0;
+ }
- }
- RETURN(rc);
+ RETURN(rc);
}
/*
static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
struct ptlrpc_request **request)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_tgt_desc *tgt;
- struct lmv_object *obj;
- int rc;
- int loop = 0;
- mdsno_t mds;
- int sidx;
- ENTRY;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ int rc;
+ ENTRY;
- rc = lmv_check_connect(obd);
- if (rc)
- RETURN(rc);
+ rc = lmv_check_connect(obd);
+ if (rc)
+ RETURN(rc);
-repeat:
- ++loop;
- LASSERT(loop <= 2);
- LASSERT(op_data->op_namelen != 0);
-
- CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n",
- PFID(&op_data->op_fid2), op_data->op_namelen,
- op_data->op_name, PFID(&op_data->op_fid1));
-
- obj = lmv_object_find(obd, &op_data->op_fid2);
- if (obj) {
- sidx = raw_name2idx(obj->lo_hashtype,
- obj->lo_objcount,
- op_data->op_name,
- op_data->op_namelen);
- op_data->op_fid2 = obj->lo_stripes[sidx].ls_fid;
- mds = obj->lo_stripes[sidx].ls_mds;
- lmv_object_put(obj);
- } else {
- rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds);
- if (rc)
- RETURN(rc);
- }
+ LASSERT(op_data->op_namelen != 0);
- CDEBUG(D_INODE, "Forward to mds #%x ("DFID")\n",
- mds, PFID(&op_data->op_fid1));
+ CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n",
+ PFID(&op_data->op_fid2), op_data->op_namelen,
+ op_data->op_name, PFID(&op_data->op_fid1));
- op_data->op_fsuid = cfs_curproc_fsuid();
- op_data->op_fsgid = cfs_curproc_fsgid();
- op_data->op_cap = cfs_curproc_cap_pack();
- tgt = lmv_get_target(lmv, mds);
+ op_data->op_fsuid = cfs_curproc_fsuid();
+ op_data->op_fsgid = cfs_curproc_fsgid();
+ op_data->op_cap = cfs_curproc_cap_pack();
+ tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- /*
- * Cancel UPDATE lock on child (fid1).
- */
- op_data->op_flags |= MF_MDC_CANCEL_FID2;
- rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
- MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
- if (rc == 0)
- rc = md_link(tgt->ltd_exp, op_data, request);
- if (rc == -ERESTART) {
- LASSERT(*request != NULL);
- DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
- "Got -ERESTART during link!\n");
- ptlrpc_req_finished(*request);
- *request = NULL;
+ /*
+ * Cancel UPDATE lock on child (fid1).
+ */
+ op_data->op_flags |= MF_MDC_CANCEL_FID2;
+ rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
+ MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
+ if (rc != 0)
+ RETURN(rc);
- /*
- * Directory got split. Time to update local object and repeat
- * the request with proper MDS.
- */
- rc = lmv_handle_split(exp, &op_data->op_fid2);
- if (rc == 0)
- goto repeat;
- }
+ rc = md_link(tgt->ltd_exp, op_data, request);
- RETURN(rc);
+ RETURN(rc);
}
static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
struct lmv_tgt_desc *src_tgt;
- int rc;
- int sidx;
- int loop = 0;
- struct lmv_object *obj;
- mdsno_t mds1;
- mdsno_t mds2;
- ENTRY;
+ struct lmv_tgt_desc *tgt_tgt;
+ int rc;
+ ENTRY;
LASSERT(oldlen != 0);
if (rc)
RETURN(rc);
-repeat:
- ++loop;
- LASSERT(loop <= 2);
- obj = lmv_object_find(obd, &op_data->op_fid1);
- if (obj) {
- sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
- (char *)old, oldlen);
- op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid;
- mds1 = obj->lo_stripes[sidx].ls_mds;
- CDEBUG(D_INODE, "Parent obj "DFID"\n", PFID(&op_data->op_fid1));
- lmv_object_put(obj);
- } else {
- rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds1);
- if (rc)
- RETURN(rc);
- }
-
- obj = lmv_object_find(obd, &op_data->op_fid2);
- if (obj) {
- /*
- * Directory is already split, so we have to forward request to
- * the right MDS.
- */
- sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
- (char *)new, newlen);
-
- mds2 = obj->lo_stripes[sidx].ls_mds;
- op_data->op_fid2 = obj->lo_stripes[sidx].ls_fid;
- CDEBUG(D_INODE, "Parent obj "DFID"\n", PFID(&op_data->op_fid2));
- lmv_object_put(obj);
- } else {
- rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds2);
- if (rc)
- RETURN(rc);
- }
-
- op_data->op_fsuid = cfs_curproc_fsuid();
- op_data->op_fsgid = cfs_curproc_fsgid();
- op_data->op_cap = cfs_curproc_cap_pack();
-
- src_tgt = lmv_get_target(lmv, mds1);
-
- /*
- * LOOKUP lock on src child (fid3) should also be cancelled for
- * src_tgt in mdc_rename.
- */
- op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
-
- /*
- * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
- * own target.
- */
- rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
- LCK_EX, MDS_INODELOCK_UPDATE,
- MF_MDC_CANCEL_FID2);
-
- /*
- * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt.
- */
- if (rc == 0) {
- rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
- LCK_EX, MDS_INODELOCK_LOOKUP,
- MF_MDC_CANCEL_FID4);
- }
+ op_data->op_fsuid = cfs_curproc_fsuid();
+ op_data->op_fsgid = cfs_curproc_fsgid();
+ op_data->op_cap = cfs_curproc_cap_pack();
+ src_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
+ if (IS_ERR(src_tgt))
+ RETURN(PTR_ERR(src_tgt));
+
+ tgt_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
+ if (IS_ERR(tgt_tgt))
+ RETURN(PTR_ERR(tgt_tgt));
+ /*
+ * LOOKUP lock on src child (fid3) should also be cancelled for
+ * src_tgt in mdc_rename.
+ */
+ op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
+
+ /*
+ * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
+ * own target.
+ */
+ rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
+ LCK_EX, MDS_INODELOCK_UPDATE,
+ MF_MDC_CANCEL_FID2);
+
+ /*
+ * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt.
+ */
+ if (rc == 0) {
+ rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
+ LCK_EX, MDS_INODELOCK_LOOKUP,
+ MF_MDC_CANCEL_FID4);
+ }
- /*
- * Cancel all the locks on tgt child (fid4).
- */
- if (rc == 0)
- rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
- LCK_EX, MDS_INODELOCK_FULL,
- MF_MDC_CANCEL_FID4);
-
- if (rc == 0)
- rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen,
- new, newlen, request);
-
- if (rc == -ERESTART) {
- LASSERT(*request != NULL);
- DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
- "Got -ERESTART during rename!\n");
- ptlrpc_req_finished(*request);
- *request = NULL;
+ /*
+ * Cancel all the locks on tgt child (fid4).
+ */
+ if (rc == 0)
+ rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
+ LCK_EX, MDS_INODELOCK_FULL,
+ MF_MDC_CANCEL_FID4);
- /*
- * Directory got split. Time to update local object and repeat
- * the request with proper MDS.
- */
- rc = lmv_handle_split(exp, &op_data->op_fid1);
- if (rc == 0)
- goto repeat;
- }
- RETURN(rc);
+ if (rc == 0)
+ rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen,
+ new, newlen, request);
+ RETURN(rc);
}
static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
struct ptlrpc_request **request,
struct md_open_data **mod)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct ptlrpc_request *req;
- struct lmv_tgt_desc *tgt;
- struct lmv_object *obj;
- int rc = 0;
- int i;
- ENTRY;
-
- rc = lmv_check_connect(obd);
- if (rc)
- RETURN(rc);
-
- obj = lmv_object_find(obd, &op_data->op_fid1);
-
- CDEBUG(D_INODE, "SETATTR for "DFID", valid 0x%x%s\n",
- PFID(&op_data->op_fid1), op_data->op_attr.ia_valid,
- obj ? ", split" : "");
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ int rc = 0;
+ ENTRY;
- op_data->op_flags |= MF_MDC_CANCEL_FID1;
- if (obj) {
- for (i = 0; i < obj->lo_objcount; i++) {
- op_data->op_fid1 = obj->lo_stripes[i].ls_fid;
+ rc = lmv_check_connect(obd);
+ if (rc)
+ RETURN(rc);
- tgt = lmv_get_target(lmv, obj->lo_stripes[i].ls_mds);
- if (IS_ERR(tgt)) {
- rc = PTR_ERR(tgt);
- break;
- }
+ CDEBUG(D_INODE, "SETATTR for "DFID", valid 0x%x\n",
+ PFID(&op_data->op_fid1), op_data->op_attr.ia_valid);
- rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen,
- ea2, ea2len, &req, mod);
-
- if (lu_fid_eq(&obj->lo_fid, &obj->lo_stripes[i].ls_fid)) {
- /*
- * This is master object and this request should
- * be returned back to llite.
- */
- *request = req;
- } else {
- ptlrpc_req_finished(req);
- }
+ op_data->op_flags |= MF_MDC_CANCEL_FID1;
+ tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- if (rc)
- break;
- }
- lmv_object_put(obj);
- } else {
- tgt = lmv_find_target(lmv, &op_data->op_fid1);
- if (IS_ERR(tgt))
- RETURN(PTR_ERR(tgt));
+ rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen, ea2,
+ ea2len, request, mod);
- rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen, ea2,
- ea2len, request, mod);
- }
- RETURN(rc);
+ RETURN(rc);
}
static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
RETURN(rc);
}
-/**
- * Main purpose of LMV blocking ast is to remove split directory LMV
- * presentation object (struct lmv_object) attached to the lock being revoked.
- */
-int lmv_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
- void *data, int flag)
-{
- struct lustre_handle lockh;
- struct lmv_object *obj;
- int rc;
- ENTRY;
-
- switch (flag) {
- case LDLM_CB_BLOCKING:
- ldlm_lock2handle(lock, &lockh);
- rc = ldlm_cli_cancel(&lockh);
- if (rc < 0) {
- CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
- RETURN(rc);
- }
- break;
- case LDLM_CB_CANCELING:
- /*
- * Time to drop cached attrs for split directory object
- */
- obj = lock->l_ast_data;
- if (obj) {
- CDEBUG(D_INODE, "Cancel %s on "LPU64"/"LPU64
- ", master "DFID"\n",
- lock->l_resource->lr_name.name[3] == 1 ?
- "LOOKUP" : "UPDATE",
- lock->l_resource->lr_name.name[0],
- lock->l_resource->lr_name.name[1],
- PFID(&obj->lo_fid));
- lmv_object_put(obj);
- }
- break;
- default:
- LBUG();
- }
- RETURN(0);
-}
-
+#if 0
static void lmv_hash_adjust(__u64 *hash, __u64 hash_adj)
{
__u64 val;
return id ^ (id >> 32);
}
+#endif
static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data,
struct page **pages, struct ptlrpc_request **request)
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_object *obj;
- struct lu_fid rid = op_data->op_fid1;
__u64 offset = op_data->op_offset;
- __u64 hash_adj = 0;
- __u32 rank = 0;
- __u64 seg_size = 0;
- __u64 tgt_tmp = 0;
- int tgt_idx = 0;
- int tgt0_idx = 0;
int rc;
- int nr = 0;
int i;
/* number of pages read, in CFS_PAGE_SIZE */
int nrdpgs;
/* number of pages transferred in LU_PAGE_SIZE */
int nlupgs;
- struct lmv_stripe *los;
struct lmv_tgt_desc *tgt;
struct lu_dirpage *dp;
struct lu_dirent *ent;
if (rc)
RETURN(rc);
- CDEBUG(D_INODE, "READPAGE at "LPX64" from "DFID"\n", offset, PFID(&rid));
-
- /*
- * This case handle directory lookup in clustered metadata case (i.e.
- * split directory is located on multiple md servers.)
- * each server keeps directory entries for certain range of hashes.
- * E.g. we have N server and suppose hash range is 0 to MAX_HASH.
- * first server will keep records with hashes [ 0 ... MAX_HASH / N - 1],
- * second one with hashes [MAX_HASH / N ... 2 * MAX_HASH / N] and
- * so on....
- * readdir can simply start reading entries from 0 - N server in
- * order but that will not scale well as all client will request dir in
- * to server in same order.
- * Following algorithm does optimization:
- * Instead of doing readdir in 1, 2, ...., N order, client with a
- * rank R does readdir in R, R + 1, ..., N, 1, ... R - 1 order.
- * (every client has rank R)
- * But ll_readdir() expect offset range [0 to MAX_HASH/N) but
- * since client ask dir from MDS{R} client has pages with offsets
- * [R*MAX_HASH/N ... (R + 1)*MAX_HASH/N] there for we do hash_adj
- * on hash values that we get.
- */
- obj = lmv_object_find_lock(obd, &rid);
- if (obj) {
- nr = obj->lo_objcount;
- LASSERT(nr > 0);
- seg_size = MAX_HASH_SIZE;
- do_div(seg_size, nr);
- los = obj->lo_stripes;
- tgt = lmv_get_target(lmv, los[0].ls_mds);
- rank = lmv_node_rank(tgt->ltd_exp, &rid) % nr;
- tgt_tmp = offset;
- do_div(tgt_tmp, seg_size);
- tgt0_idx = do_div(tgt_tmp, nr);
- tgt_idx = (tgt0_idx + rank) % nr;
-
- if (tgt_idx < tgt0_idx)
- /*
- * Wrap around.
- *
- * Last segment has unusual length due to division
- * rounding.
- */
- hash_adj = MAX_HASH_SIZE - seg_size * nr;
- else
- hash_adj = 0;
-
- hash_adj += rank * seg_size;
-
- CDEBUG(D_INODE, "Readpage hash adjustment: %x "LPX64" "
- LPX64"/%x -> "LPX64"/%x\n", rank, hash_adj,
- offset, tgt0_idx, offset + hash_adj, tgt_idx);
-
- offset = (offset + hash_adj) & MAX_HASH_SIZE;
- rid = obj->lo_stripes[tgt_idx].ls_fid;
- tgt = lmv_get_target(lmv, los[tgt_idx].ls_mds);
-
- CDEBUG(D_INODE, "Forward to "DFID" with offset %lu i %d\n",
- PFID(&rid), (unsigned long)offset, tgt_idx);
- } else
- tgt = lmv_find_target(lmv, &rid);
-
- if (IS_ERR(tgt))
- GOTO(cleanup, rc = PTR_ERR(tgt));
+ CDEBUG(D_INODE, "READPAGE at "LPX64" from "DFID"\n",
+ offset, PFID(&op_data->op_fid1));
+
+ /*
+ * This case handles directory lookup in the clustered metadata case (i.e.
+ * a split directory is located on multiple MD servers).
+ * Each server keeps directory entries for a certain range of hashes.
+ * E.g. we have N servers and suppose the hash range is 0 to MAX_HASH.
+ * The first server keeps records with hashes [0 ... MAX_HASH / N - 1],
+ * the second one with hashes [MAX_HASH / N ... 2 * MAX_HASH / N] and
+ * so on....
+ * readdir can simply start reading entries from servers 0 - N in
+ * order, but that will not scale well as all clients will request the
+ * dir from the servers in the same order.
+ * The following algorithm optimizes this:
+ * Instead of doing readdir in 1, 2, ...., N order, a client with
+ * rank R does readdir in R, R + 1, ..., N, 1, ... R - 1 order
+ * (every client has a rank R).
+ * But ll_readdir() expects the offset range [0 to MAX_HASH/N), and
+ * since the client asks MDS{R} for the dir, it has pages with offsets
+ * [R*MAX_HASH/N ... (R + 1)*MAX_HASH/N]; therefore we apply hash_adj
+ * to the hash values that we get.
+ if (0) {
+ LASSERT(nr > 0);
+ seg_size = MAX_HASH_SIZE;
+ do_div(seg_size, nr);
+ los = obj->lo_stripes;
+ tgt = lmv_get_target(lmv, los[0].ls_mds);
+ rank = lmv_node_rank(tgt->ltd_exp, fid) % nr;
+ tgt_tmp = offset;
+ do_div(tgt_tmp, seg_size);
+ tgt0_idx = do_div(tgt_tmp, nr);
+ tgt_idx = (tgt0_idx + rank) % nr;
+
+ if (tgt_idx < tgt0_idx)
+ * Wrap around.
+ *
+ * Last segment has unusual length due to division
+ * rounding.
+ hash_adj = MAX_HASH_SIZE - seg_size * nr;
+ else
+ hash_adj = 0;
+
+ hash_adj += rank * seg_size;
+
+ CDEBUG(D_INODE, "Readpage hash adjustment: %x "LPX64" "
+ LPX64"/%x -> "LPX64"/%x\n", rank, hash_adj,
+ offset, tgt0_idx, offset + hash_adj, tgt_idx);
+
+ offset = (offset + hash_adj) & MAX_HASH_SIZE;
+ rid = lsm->mea_oinfo[tgt_idx].lmo_fid;
+ tgt = lmv_get_target(lmv, lsm->mea_oinfo[tgt_idx].lmo_mds);
+
+ CDEBUG(D_INODE, "Forward to "DFID" with offset %lu i %d\n",
+ PFID(&rid), (unsigned long)offset, tgt_idx);
+ }
+ */
+ tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- op_data->op_fid1 = rid;
- rc = md_readpage(tgt->ltd_exp, op_data, pages, request);
- if (rc)
- GOTO(cleanup, rc);
+ rc = md_readpage(tgt->ltd_exp, op_data, pages, request);
+ if (rc != 0)
+ RETURN(rc);
- nrdpgs = ((*request)->rq_bulk->bd_nob_transferred + CFS_PAGE_SIZE - 1)
- >> CFS_PAGE_SHIFT;
- nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT;
- LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK));
- LASSERT(nrdpgs > 0 && nrdpgs <= op_data->op_npages);
+ nrdpgs = ((*request)->rq_bulk->bd_nob_transferred + CFS_PAGE_SIZE - 1)
+ >> CFS_PAGE_SHIFT;
+ nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT;
+ LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK));
+ LASSERT(nrdpgs > 0 && nrdpgs <= op_data->op_npages);
- CDEBUG(D_INODE, "read %d(%d)/%d pages\n", nrdpgs, nlupgs,
- op_data->op_npages);
+ CDEBUG(D_INODE, "read %d(%d)/%d pages\n", nrdpgs, nlupgs,
+ op_data->op_npages);
- for (i = 0; i < nrdpgs; i++) {
+ for (i = 0; i < nrdpgs; i++) {
#if CFS_PAGE_SIZE > LU_PAGE_SIZE
- struct lu_dirpage *first;
- __u64 hash_end = 0;
- __u32 flags = 0;
+ struct lu_dirpage *first;
+ __u64 hash_end = 0;
+ __u32 flags = 0;
#endif
- struct lu_dirent *tmp = NULL;
-
- dp = cfs_kmap(pages[i]);
- if (obj) {
- lmv_hash_adjust(&dp->ldp_hash_start, hash_adj);
- lmv_hash_adjust(&dp->ldp_hash_end, hash_adj);
- LASSERT(le64_to_cpu(dp->ldp_hash_start) <=
- op_data->op_offset);
-
- if ((tgt0_idx != nr - 1) &&
- (le64_to_cpu(dp->ldp_hash_end) == MDS_DIR_END_OFF))
- {
- dp->ldp_hash_end = cpu_to_le32(seg_size *
- (tgt0_idx + 1));
- CDEBUG(D_INODE,
- ""DFID" reset end "LPX64" tgt %d\n",
- PFID(&rid),
- (__u64)le64_to_cpu(dp->ldp_hash_end),
- tgt_idx);
- }
- }
+ struct lu_dirent *tmp = NULL;
- ent = lu_dirent_start(dp);
+ dp = cfs_kmap(pages[i]);
+ ent = lu_dirent_start(dp);
#if CFS_PAGE_SIZE > LU_PAGE_SIZE
- first = dp;
- hash_end = dp->ldp_hash_end;
+ first = dp;
+ hash_end = dp->ldp_hash_end;
repeat:
#endif
- nlupgs--;
- for (tmp = ent; ent != NULL;
- tmp = ent, ent = lu_dirent_next(ent)) {
- if (obj)
- lmv_hash_adjust(&ent->lde_hash, hash_adj);
- }
+ nlupgs--;
+ for (tmp = ent; ent != NULL;
+ tmp = ent, ent = lu_dirent_next(ent));
#if CFS_PAGE_SIZE > LU_PAGE_SIZE
- dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
- if (((unsigned long)dp & ~CFS_PAGE_MASK) && nlupgs > 0) {
- ent = lu_dirent_start(dp);
-
- if (obj) {
- lmv_hash_adjust(&dp->ldp_hash_end, hash_adj);
- if ((tgt0_idx != nr - 1) &&
- (le64_to_cpu(dp->ldp_hash_end) ==
- MDS_DIR_END_OFF)) {
- hash_end = cpu_to_le32(seg_size *
- (tgt0_idx + 1));
- CDEBUG(D_INODE,
- ""DFID" reset end "LPX64" tgt %d\n",
- PFID(&rid),
- (__u64)le64_to_cpu(hash_end),
- tgt_idx);
- }
- }
- hash_end = dp->ldp_hash_end;
- flags = dp->ldp_flags;
-
- if (tmp) {
- /* enlarge the end entry lde_reclen from 0 to
- * first entry of next lu_dirpage, in this way
- * several lu_dirpages can be stored into one
- * client page on client. */
- tmp = ((void *)tmp) +
- le16_to_cpu(tmp->lde_reclen);
- tmp->lde_reclen =
- cpu_to_le16((char *)(dp->ldp_entries) -
- (char *)tmp);
- goto repeat;
- }
- }
- first->ldp_hash_end = hash_end;
- first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
- first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);
+ dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
+ if (((unsigned long)dp & ~CFS_PAGE_MASK) && nlupgs > 0) {
+ ent = lu_dirent_start(dp);
+
+ if (tmp) {
+ /* enlarge the end entry lde_reclen from 0 to
+ * first entry of next lu_dirpage, in this way
+ * several lu_dirpages can be stored into one
+ * client page on client. */
+ tmp = ((void *)tmp) +
+ le16_to_cpu(tmp->lde_reclen);
+ tmp->lde_reclen =
+ cpu_to_le16((char *)(dp->ldp_entries) -
+ (char *)tmp);
+ goto repeat;
+ }
+ }
+ first->ldp_hash_end = hash_end;
+ first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
+ first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);
#else
- SET_BUT_UNUSED(tmp);
+ SET_BUT_UNUSED(tmp);
#endif
- cfs_kunmap(pages[i]);
- }
- EXIT;
-cleanup:
- if (obj)
- lmv_object_put_unlock(obj);
- return rc;
+ cfs_kunmap(pages[i]);
+ }
+ RETURN(rc);
}
static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
struct ptlrpc_request **request)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_tgt_desc *tgt = NULL;
- struct lmv_object *obj;
- int rc;
- int sidx;
- int loop = 0;
- ENTRY;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt = NULL;
+ int rc;
+ ENTRY;
- rc = lmv_check_connect(obd);
- if (rc)
- RETURN(rc);
+ rc = lmv_check_connect(obd);
+ if (rc)
+ RETURN(rc);
-repeat:
- ++loop;
- LASSERT(loop <= 2);
- LASSERT(op_data->op_namelen != 0);
-
- obj = lmv_object_find(obd, &op_data->op_fid1);
- if (obj) {
- sidx = raw_name2idx(obj->lo_hashtype,
- obj->lo_objcount,
- op_data->op_name,
- op_data->op_namelen);
- op_data->op_bias &= ~MDS_CHECK_SPLIT;
- op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid;
- tgt = lmv_get_target(lmv,
- obj->lo_stripes[sidx].ls_mds);
- lmv_object_put(obj);
- CDEBUG(D_INODE, "UNLINK '%*s' in "DFID" -> %u\n",
- op_data->op_namelen, op_data->op_name,
- PFID(&op_data->op_fid1), sidx);
- }
-
- if (tgt == NULL) {
- tgt = lmv_find_target(lmv, &op_data->op_fid1);
- if (IS_ERR(tgt))
- RETURN(PTR_ERR(tgt));
- op_data->op_bias |= MDS_CHECK_SPLIT;
- }
-
- op_data->op_fsuid = cfs_curproc_fsuid();
- op_data->op_fsgid = cfs_curproc_fsgid();
- op_data->op_cap = cfs_curproc_cap_pack();
+ tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- /*
- * If child's fid is given, cancel unused locks for it if it is from
- * another export than parent.
- *
- * LOOKUP lock for child (fid3) should also be cancelled on parent
- * tgt_tgt in mdc_unlink().
- */
- op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
+ op_data->op_fsuid = cfs_curproc_fsuid();
+ op_data->op_fsgid = cfs_curproc_fsgid();
+ op_data->op_cap = cfs_curproc_cap_pack();
- /*
- * Cancel FULL locks on child (fid3).
- */
- rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
- MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
+ /*
+ * If child's fid is given, cancel unused locks for it if it is from
+ * another export than parent.
+ *
+ * LOOKUP lock for child (fid3) should also be cancelled on parent
+ * tgt_tgt in mdc_unlink().
+ */
+ op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
- if (rc == 0)
- rc = md_unlink(tgt->ltd_exp, op_data, request);
+ /*
+ * Cancel FULL locks on child (fid3).
+ */
+ rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
+ MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
- if (rc == -ERESTART) {
- LASSERT(*request != NULL);
- DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
- "Got -ERESTART during unlink!\n");
- ptlrpc_req_finished(*request);
- *request = NULL;
+ if (rc != 0)
+ RETURN(rc);
- /*
- * Directory got split. Time to update local object and repeat
- * the request with proper MDS.
- */
- rc = lmv_handle_split(exp, &op_data->op_fid1);
- if (rc == 0)
- goto repeat;
- }
- RETURN(rc);
+ rc = md_unlink(tgt->ltd_exp, op_data, request);
+
+ RETURN(rc);
}
static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
struct md_enqueue_info *minfo,
struct ldlm_enqueue_info *einfo)
{
- struct md_op_data *op_data = &minfo->mi_data;
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_object *obj;
- struct lmv_tgt_desc *tgt = NULL;
- int rc;
- int sidx;
- ENTRY;
-
- rc = lmv_check_connect(obd);
- if (rc)
- RETURN(rc);
-
- if (op_data->op_namelen) {
- obj = lmv_object_find(obd, &op_data->op_fid1);
- if (obj) {
- sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
- (char *)op_data->op_name,
- op_data->op_namelen);
- op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid;
- tgt = lmv_get_target(lmv, obj->lo_stripes[sidx].ls_mds);
- lmv_object_put(obj);
- }
- }
+ struct md_op_data *op_data = &minfo->mi_data;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt = NULL;
+ int rc;
+ ENTRY;
- if (tgt == NULL)
- tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ rc = lmv_check_connect(obd);
+ if (rc)
+ RETURN(rc);
- if (IS_ERR(tgt))
- RETURN(PTR_ERR(tgt));
+ tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- rc = md_intent_getattr_async(tgt->ltd_exp, minfo, einfo);
- RETURN(rc);
+ rc = md_intent_getattr_async(tgt->ltd_exp, minfo, einfo);
+ RETURN(rc);
}
int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
struct lprocfs_static_vars lvars;
int rc;
- lmv_object_cache = cfs_mem_cache_create("lmv_objects",
- sizeof(struct lmv_object),
- 0, 0);
- if (!lmv_object_cache) {
- CERROR("Error allocating lmv objects cache\n");
- return -ENOMEM;
- }
-
lprocfs_lmv_init_vars(&lvars);
rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
lvars.module_vars, LUSTRE_LMV_NAME, NULL);
- if (rc)
- cfs_mem_cache_destroy(lmv_object_cache);
-
return rc;
}
static void lmv_exit(void)
{
class_unregister_type(LUSTRE_LMV_NAME);
-
- LASSERTF(cfs_atomic_read(&lmv_object_count) == 0,
- "Can't free lmv objects cache, %d object(s) busy\n",
- cfs_atomic_read(&lmv_object_count));
- cfs_mem_cache_destroy(lmv_object_cache);
}
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
+++ /dev/null
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- *
- * Copyright (c) 2011, 2012, Intel Corporation.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- */
-
-#define DEBUG_SUBSYSTEM S_LMV
-#ifdef __KERNEL__
-#include <linux/slab.h>
-#include <linux/module.h>
-#include <linux/init.h>
-#include <linux/slab.h>
-#include <linux/pagemap.h>
-#include <asm/div64.h>
-#include <linux/seq_file.h>
-#else
-#include <liblustre.h>
-#endif
-
-#include <obd_support.h>
-#include <lustre/lustre_idl.h>
-#include <lustre_lib.h>
-#include <lustre_net.h>
-#include <lustre_dlm.h>
-#include <obd_class.h>
-#include <lprocfs_status.h>
-#include "lmv_internal.h"
-
-extern cfs_mem_cache_t *lmv_object_cache;
-extern cfs_atomic_t lmv_object_count;
-
-static CFS_LIST_HEAD(obj_list);
-static DEFINE_SPINLOCK(obj_list_lock);
-
-struct lmv_object *lmv_object_alloc(struct obd_device *obd,
- const struct lu_fid *fid,
- struct lmv_stripe_md *mea)
-{
- struct lmv_obd *lmv = &obd->u.lmv;
- unsigned int obj_size;
- struct lmv_object *obj;
- int i;
-
- LASSERT(mea->mea_magic == MEA_MAGIC_LAST_CHAR
- || mea->mea_magic == MEA_MAGIC_ALL_CHARS
- || mea->mea_magic == MEA_MAGIC_HASH_SEGMENT);
-
- OBD_SLAB_ALLOC_PTR(obj, lmv_object_cache);
- if (!obj)
- return NULL;
-
- cfs_atomic_inc(&lmv_object_count);
-
- obj->lo_fid = *fid;
- obj->lo_obd = obd;
- obj->lo_state = 0;
- obj->lo_hashtype = mea->mea_magic;
-
- mutex_init(&obj->lo_guard);
- cfs_atomic_set(&obj->lo_count, 0);
- obj->lo_objcount = mea->mea_count;
-
- obj_size = sizeof(struct lmv_stripe) *
- lmv->desc.ld_tgt_count;
-
- OBD_ALLOC_LARGE(obj->lo_stripes, obj_size);
- if (!obj->lo_stripes)
- goto err_obj;
-
- CDEBUG(D_INODE, "Allocate object for "DFID"\n",
- PFID(fid));
- for (i = 0; i < mea->mea_count; i++) {
- int rc;
-
- CDEBUG(D_INODE, "Process subobject "DFID"\n",
- PFID(&mea->mea_ids[i]));
- obj->lo_stripes[i].ls_fid = mea->mea_ids[i];
- LASSERT(fid_is_sane(&obj->lo_stripes[i].ls_fid));
-
- /*
- * Cache slave mds number to use it in all cases it is needed
- * instead of constant lookup.
- */
- rc = lmv_fld_lookup(lmv, &obj->lo_stripes[i].ls_fid,
- &obj->lo_stripes[i].ls_mds);
- if (rc)
- goto err_obj;
- }
-
- return obj;
-err_obj:
- OBD_FREE(obj, sizeof(*obj));
- return NULL;
-}
-
-void lmv_object_free(struct lmv_object *obj)
-{
- struct lmv_obd *lmv = &obj->lo_obd->u.lmv;
- unsigned int obj_size;
-
- LASSERT(!cfs_atomic_read(&obj->lo_count));
-
- obj_size = sizeof(struct lmv_stripe) *
- lmv->desc.ld_tgt_count;
-
- OBD_FREE_LARGE(obj->lo_stripes, obj_size);
- OBD_SLAB_FREE(obj, lmv_object_cache, sizeof(*obj));
- cfs_atomic_dec(&lmv_object_count);
-}
-
-static void __lmv_object_add(struct lmv_object *obj)
-{
- cfs_atomic_inc(&obj->lo_count);
- cfs_list_add(&obj->lo_list, &obj_list);
-}
-
-void lmv_object_add(struct lmv_object *obj)
-{
- spin_lock(&obj_list_lock);
- __lmv_object_add(obj);
- spin_unlock(&obj_list_lock);
-}
-
-static void __lmv_object_del(struct lmv_object *obj)
-{
- cfs_list_del(&obj->lo_list);
- lmv_object_free(obj);
-}
-
-void lmv_object_del(struct lmv_object *obj)
-{
- spin_lock(&obj_list_lock);
- __lmv_object_del(obj);
- spin_unlock(&obj_list_lock);
-}
-
-static struct lmv_object *__lmv_object_get(struct lmv_object *obj)
-{
- LASSERT(obj != NULL);
- cfs_atomic_inc(&obj->lo_count);
- return obj;
-}
-
-struct lmv_object *lmv_object_get(struct lmv_object *obj)
-{
- spin_lock(&obj_list_lock);
- __lmv_object_get(obj);
- spin_unlock(&obj_list_lock);
- return obj;
-}
-
-static void __lmv_object_put(struct lmv_object *obj)
-{
- LASSERT(obj);
-
- if (cfs_atomic_dec_and_test(&obj->lo_count)) {
- CDEBUG(D_INODE, "Last reference to "DFID" - "
- "destroying\n", PFID(&obj->lo_fid));
- __lmv_object_del(obj);
- }
-}
-
-void lmv_object_put(struct lmv_object *obj)
-{
- spin_lock(&obj_list_lock);
- __lmv_object_put(obj);
- spin_unlock(&obj_list_lock);
-}
-
-void lmv_object_put_unlock(struct lmv_object *obj)
-{
- lmv_object_unlock(obj);
- lmv_object_put(obj);
-}
-
-static struct lmv_object *__lmv_object_find(struct obd_device *obd, const struct lu_fid *fid)
-{
- struct lmv_object *obj;
- cfs_list_t *cur;
-
- cfs_list_for_each(cur, &obj_list) {
- obj = cfs_list_entry(cur, struct lmv_object, lo_list);
-
- /*
- * Check if object is in destroying phase. If so - skip
- * it.
- */
- if (obj->lo_state & O_FREEING)
- continue;
-
- /*
- * We should make sure, that we have found object belong to
- * passed obd. It is possible that, object manager will have two
- * objects with the same fid belong to different obds, if client
- * and mds runs on the same host. May be it is good idea to have
- * objects list associated with obd.
- */
- if (obj->lo_obd != obd)
- continue;
-
- /*
- * Check if this is what we're looking for.
- */
- if (lu_fid_eq(&obj->lo_fid, fid))
- return __lmv_object_get(obj);
- }
-
- return NULL;
-}
-
-struct lmv_object *lmv_object_find(struct obd_device *obd,
- const struct lu_fid *fid)
-{
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_object *obj = NULL;
- ENTRY;
-
- /* For single MDT case, lmv_object list is always empty. */
- if (lmv->desc.ld_tgt_count > 1) {
- spin_lock(&obj_list_lock);
- obj = __lmv_object_find(obd, fid);
- spin_unlock(&obj_list_lock);
- }
-
- RETURN(obj);
-}
-
-struct lmv_object *lmv_object_find_lock(struct obd_device *obd,
- const struct lu_fid *fid)
-{
- struct lmv_object *obj;
- ENTRY;
-
- obj = lmv_object_find(obd, fid);
- if (obj)
- lmv_object_lock(obj);
-
- RETURN(obj);
-}
-
-static struct lmv_object *__lmv_object_create(struct obd_device *obd,
- const struct lu_fid *fid,
- struct lmv_stripe_md *mea)
-{
- struct lmv_object *new;
- struct lmv_object *obj;
- ENTRY;
-
- obj = lmv_object_find(obd, fid);
- if (obj)
- RETURN(obj);
-
- new = lmv_object_alloc(obd, fid, mea);
- if (!new)
- RETURN(NULL);
-
- /*
- * Check if someone created it already while we were dealing with
- * allocating @obj.
- */
- spin_lock(&obj_list_lock);
- obj = __lmv_object_find(obd, fid);
- if (obj) {
- /*
- * Someone created it already - put @obj and getting out.
- */
- spin_unlock(&obj_list_lock);
- lmv_object_free(new);
- RETURN(obj);
- }
-
- __lmv_object_add(new);
- __lmv_object_get(new);
-
- spin_unlock(&obj_list_lock);
-
- CDEBUG(D_INODE, "New obj in lmv cache: "DFID"\n", PFID(fid));
-
- RETURN(new);
-}
-
-struct lmv_object *lmv_object_create(struct obd_export *exp,
- const struct lu_fid *fid,
- struct lmv_stripe_md *mea)
-{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct ptlrpc_request *req = NULL;
- struct lmv_tgt_desc *tgt;
- struct lmv_object *obj;
- struct lustre_md md;
- int mealen;
- int rc;
- ENTRY;
-
- CDEBUG(D_INODE, "Get mea for "DFID" and create lmv obj\n",
- PFID(fid));
-
- md.mea = NULL;
-
- if (mea == NULL) {
- struct md_op_data *op_data;
- __u64 valid;
-
- CDEBUG(D_INODE, "Mea isn't passed in, get it now\n");
- mealen = lmv_get_easize(lmv);
-
- /*
- * Time to update mea of parent fid.
- */
- md.mea = NULL;
- valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA;
-
- tgt = lmv_find_target(lmv, fid);
- if (IS_ERR(tgt))
- GOTO(cleanup, obj = (void *)tgt);
-
- OBD_ALLOC_PTR(op_data);
- if (op_data == NULL)
- GOTO(cleanup, obj = ERR_PTR(-ENOMEM));
-
- op_data->op_fid1 = *fid;
- op_data->op_mode = mealen;
- op_data->op_valid = valid;
- rc = md_getattr(tgt->ltd_exp, op_data, &req);
- OBD_FREE_PTR(op_data);
- if (rc) {
- CERROR("md_getattr() failed, error %d\n", rc);
- GOTO(cleanup, obj = ERR_PTR(rc));
- }
-
- rc = md_get_lustre_md(exp, req, NULL, exp, &md);
- if (rc) {
- CERROR("md_get_lustre_md() failed, error %d\n", rc);
- GOTO(cleanup, obj = ERR_PTR(rc));
- }
-
- if (md.mea == NULL)
- GOTO(cleanup, obj = ERR_PTR(-ENODATA));
-
- mea = md.mea;
- }
-
- /*
- * Got mea, now create obj for it.
- */
- obj = __lmv_object_create(obd, fid, mea);
- if (!obj) {
- CERROR("Can't create new object "DFID"\n",
- PFID(fid));
- GOTO(cleanup, obj = ERR_PTR(-ENOMEM));
- }
-
- if (md.mea != NULL)
- obd_free_memmd(exp, (void *)&md.mea);
-
- EXIT;
-cleanup:
- if (req)
- ptlrpc_req_finished(req);
- return obj;
-}
-
-int lmv_object_delete(struct obd_export *exp, const struct lu_fid *fid)
-{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_object *obj;
- int rc = 0;
- ENTRY;
-
- spin_lock(&obj_list_lock);
- obj = __lmv_object_find(obd, fid);
- if (obj) {
- obj->lo_state |= O_FREEING;
- __lmv_object_put(obj);
- __lmv_object_put(obj);
- rc = 1;
- }
- spin_unlock(&obj_list_lock);
- RETURN(rc);
-}
-
-int lmv_object_setup(struct obd_device *obd)
-{
- ENTRY;
- LASSERT(obd != NULL);
-
- CDEBUG(D_INFO, "LMV object manager setup (%s)\n",
- obd->obd_uuid.uuid);
-
- RETURN(0);
-}
-
-void lmv_object_cleanup(struct obd_device *obd)
-{
- cfs_list_t *cur;
- cfs_list_t *tmp;
- struct lmv_object *obj;
- ENTRY;
-
- CDEBUG(D_INFO, "LMV object manager cleanup (%s)\n",
- obd->obd_uuid.uuid);
-
- spin_lock(&obj_list_lock);
- cfs_list_for_each_safe(cur, tmp, &obj_list) {
- obj = cfs_list_entry(cur, struct lmv_object, lo_list);
-
- if (obj->lo_obd != obd)
- continue;
-
- obj->lo_state |= O_FREEING;
- if (cfs_atomic_read(&obj->lo_count) > 1) {
- CERROR("Object "DFID" has count (%d)\n",
- PFID(&obj->lo_fid),
- cfs_atomic_read(&obj->lo_count));
- }
- __lmv_object_put(obj);
- }
- spin_unlock(&obj_list_lock);
- EXIT;
-}