* IT_OPEN is intended to open (and create, possible) an object. Parent (pid)
* may be split dir.
*/
-int lmv_intent_open(struct obd_export *exp, const struct lu_fid *pid,
- const char *name, int len, void *lmm, int lmmsize,
- const struct lu_fid *cid, struct lookup_intent *it,
+int lmv_intent_open(struct obd_export *exp, struct md_op_data *op_data,
+ void *lmm, int lmmsize, struct lookup_intent *it,
int flags, struct ptlrpc_request **reqp,
ldlm_blocking_callback cb_blocking,
int extra_lock_flags)
{
struct obd_device *obd = exp->exp_obd;
+ struct lu_fid rpid = op_data->fid1;
struct lmv_obd *lmv = &obd->u.lmv;
struct mdt_body *body = NULL;
- struct md_op_data *op_data;
+ struct md_op_data *sop_data;
struct lmv_stripe_md *mea;
- struct lu_fid rpid = *pid;
struct lmv_obj *obj;
int rc, loop = 0;
mdsno_t mds;
ENTRY;
- OBD_ALLOC_PTR(op_data);
- if (op_data == NULL)
+ OBD_ALLOC_PTR(sop_data);
+ if (sop_data == NULL)
RETURN(-ENOMEM);
+ /* save op_data fro repeat case */
+ *sop_data = *op_data;
+
repeat:
LASSERT(++loop <= 2);
rc = lmv_fld_lookup(lmv, &rpid, &mds);
if (rc)
- GOTO(out_free_op_data, rc);
+ GOTO(out_free_sop_data, rc);
obj = lmv_obj_grab(obd, &rpid);
if (obj) {
- /* directory is already split, so we have to forward request to
- * the right MDS. */
+ /*
+ * Directory is already split, so we have to forward request to
+ * the right MDS.
+ */
mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
- (char *)name, len);
+ (char *)op_data->name, op_data->namelen);
CDEBUG(D_OTHER, "forward to MDS #"LPU64" ("DFID")\n",
mds, PFID(&rpid));
lmv_obj_put(obj);
}
- op_data->fid1 = rpid;
+ sop_data->fid1 = rpid;
- if (cid)
- op_data->fid2 = *cid;
- op_data->name = name;
- op_data->namelen = len;
-
- rc = md_intent_lock(lmv->tgts[mds].ltd_exp, op_data,
+ rc = md_intent_lock(lmv->tgts[mds].ltd_exp, sop_data,
lmm, lmmsize, it, flags, reqp,
cb_blocking, extra_lock_flags);
if (rc == -ERESTART) {
- /* directory got split. time to update local object and
- * repeat the request with proper MDS */
- LASSERT(lu_fid_eq(pid, &rpid));
+ /*
+ * Directory got split. Time to update local object and repeat
+ * the request with proper MDS.
+ */
+ LASSERT(lu_fid_eq(&op_data->fid1, &rpid));
rc = lmv_handle_split(exp, &rpid);
if (rc == 0) {
ptlrpc_req_finished(*reqp);
- memset(op_data, 0, sizeof(*op_data));
goto repeat;
}
}
if (rc != 0)
- GOTO(out_free_op_data, rc);
+ GOTO(out_free_sop_data, rc);
- /* okay, MDS has returned success. Probably name has been resolved in
- * remote inode */
+ /*
+ * Okay, MDS has returned success. Probably name has been resolved in
+ * remote inode.
+ */
rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp,
cb_blocking, extra_lock_flags);
if (rc != 0) {
* only debug info.
*/
CDEBUG(D_OTHER, "can't handle remote %s: dir "DFID"("DFID"):"
- "%*s: %d\n", LL_IT2STR(it), PFID(pid), PFID(&rpid),
- len, name, rc);
- GOTO(out_free_op_data, rc);
+ "%*s: %d\n", LL_IT2STR(it), PFID(&op_data->fid2),
+ PFID(&rpid), op_data->namelen, op_data->name, rc);
+ GOTO(out_free_sop_data, rc);
}
/*
if ((it->d.lustre.it_disposition & DISP_LOOKUP_NEG) &&
!(it->d.lustre.it_disposition & DISP_OPEN_CREATE) &&
!(it->d.lustre.it_disposition & DISP_OPEN_OPEN))
- GOTO(out_free_op_data, rc = 0);
+ GOTO(out_free_sop_data, rc = 0);
/* caller may use attrs MDS returns on IT_OPEN lock request so, we have
* to update them for split dir */
- body = lustre_msg_buf((*reqp)->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
+ body = lustre_msg_buf((*reqp)->rq_repmsg,
+ DLM_REPLY_REC_OFF, sizeof(*body));
LASSERT(body != NULL);
/* could not find object, FID is not present in response. */
if (!(body->valid & OBD_MD_FLID))
- GOTO(out_free_op_data, rc = 0);
+ GOTO(out_free_sop_data, rc = 0);
- cid = &body->fid1;
- obj = lmv_obj_grab(obd, cid);
+ obj = lmv_obj_grab(obd, &body->fid1);
if (!obj && (mea = lmv_get_mea(*reqp, DLM_REPLY_REC_OFF))) {
/* wow! this is split dir, we'd like to handle it */
obj = lmv_obj_create(exp, &body->fid1, mea);
if (IS_ERR(obj))
- GOTO(out_free_op_data, rc = (int)PTR_ERR(obj));
+ GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj));
}
if (obj) {
/* this is split dir and we'd want to get attrs */
CDEBUG(D_OTHER, "attrs from slaves for "DFID"\n",
- PFID(cid));
+ PFID(&body->fid1));
- rc = lmv_revalidate_slaves(exp, reqp, cid, it, 1,
+ rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1,
cb_blocking, extra_lock_flags);
} else if (S_ISDIR(body->mode)) {
CDEBUG(D_OTHER, "object "DFID" has not lmv obj?\n",
- PFID(cid));
+ PFID(&body->fid1));
}
if (obj)
lmv_obj_put(obj);
EXIT;
-out_free_op_data:
- OBD_FREE_PTR(op_data);
+out_free_sop_data:
+ OBD_FREE_PTR(sop_data);
return rc;
}
-int lmv_intent_getattr(struct obd_export *exp, const struct lu_fid *pid,
- const char *name, int len, void *lmm, int lmmsize,
- const struct lu_fid *cid, struct lookup_intent *it,
+int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data,
+ void *lmm, int lmmsize, struct lookup_intent *it,
int flags, struct ptlrpc_request **reqp,
ldlm_blocking_callback cb_blocking,
int extra_lock_flags)
{
struct lmv_obj *obj = NULL, *obj2 = NULL;
struct obd_device *obd = exp->exp_obd;
+ struct lu_fid rpid = op_data->fid1;
struct lmv_obd *lmv = &obd->u.lmv;
struct mdt_body *body = NULL;
- struct md_op_data *op_data;
- struct lu_fid rpid = *pid;
+ struct md_op_data *sop_data;
struct lmv_stripe_md *mea;
mdsno_t mds;
int rc = 0;
ENTRY;
- OBD_ALLOC_PTR(op_data);
- if (op_data == NULL)
+ OBD_ALLOC_PTR(sop_data);
+ if (sop_data == NULL)
RETURN(-ENOMEM);
- if (cid) {
- /* caller wants to revalidate attrs of obj we have to revalidate
- * slaves if requested object is split directory */
- CDEBUG(D_OTHER, "revalidate attrs for "DFID"\n", PFID(cid));
- rc = lmv_fld_lookup(lmv, cid, &mds);
+ /* save op_data fro repeat case */
+ *sop_data = *op_data;
+
+ if (fid_is_sane(&op_data->fid2)) {
+ /*
+ * Caller wants to revalidate attrs of obj we have to revalidate
+ * slaves if requested object is split directory.
+ */
+ CDEBUG(D_OTHER, "revalidate attrs for "DFID"\n",
+ PFID(&op_data->fid2));
+
+ rc = lmv_fld_lookup(lmv, &op_data->fid2, &mds);
if (rc)
- GOTO(out_free_op_data, rc);
+ GOTO(out_free_sop_data, rc);
#if 0
- obj = lmv_obj_grab(obd, cid);
+ /*
+ * In fact, we do not need this with current intent_lock(), but
+ * it may change some day.
+ */
+ obj = lmv_obj_grab(obd, &op_data->fid2);
if (obj) {
- /* in fact, we do not need this with current
- * intent_lock(), but it may change some day */
- if (!lu_fid_eq(pid, cid)){
+ if (!lu_fid_eq(&op_data->fid1, &op_data->fid2)){
rpid = obj->lo_inodes[mds].li_fid;
rc = lmv_fld_lookup(lmv, &rpid, &mds);
if (rc) {
lmv_obj_put(obj);
- GOTO(out_free_op_data, rc);
+ GOTO(out_free_sop_data, rc);
}
}
lmv_obj_put(obj);
}
#endif
- op_data->fid2 = *cid;
} else {
CDEBUG(D_OTHER, "INTENT getattr for %*s on "DFID"\n",
- len, name, PFID(pid));
- rc = lmv_fld_lookup(lmv, pid, &mds);
+ op_data->namelen, op_data->name,
+ PFID(&op_data->fid1));
+
+ rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds);
if (rc)
- GOTO(out_free_op_data, rc);
- obj = lmv_obj_grab(obd, pid);
- if (obj && len) {
+ GOTO(out_free_sop_data, rc);
+ obj = lmv_obj_grab(obd, &op_data->fid1);
+ if (obj && op_data->namelen) {
/* directory is already split. calculate mds */
mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
- (char *)name, len);
+ (char *)op_data->name,
+ op_data->namelen);
+
rpid = obj->lo_inodes[mds].li_fid;
rc = lmv_fld_lookup(lmv, &rpid, &mds);
if (rc) {
lmv_obj_put(obj);
- GOTO(out_free_op_data, rc);
+ GOTO(out_free_sop_data, rc);
}
lmv_obj_put(obj);
}
}
- op_data->fid1 = rpid;
- op_data->name = name;
- op_data->namelen = len;
+ sop_data->fid1 = rpid;
- /* the same about fid returning. */
- rc = md_intent_lock(lmv->tgts[mds].ltd_exp, op_data, lmm,
+ rc = md_intent_lock(lmv->tgts[mds].ltd_exp, sop_data, lmm,
lmmsize, it, flags, reqp, cb_blocking,
extra_lock_flags);
if (rc < 0)
- GOTO(out_free_op_data, rc);
+ GOTO(out_free_sop_data, rc);
if (obj && rc > 0) {
/*
- * this is split dir. In order to optimize things a bit, we
+ * This is split dir. In order to optimize things a bit, we
* consider obj valid updating missing parts.
* FIXME: do we need to return any lock here? It would be fine
- * if we don't. this means that nobody should use UPDATE lock to
+ * if we don't. This means that nobody should use UPDATE lock to
* notify about object * removal.
*/
CDEBUG(D_OTHER,
"revalidate slaves for "DFID", rc %d\n",
- PFID(cid), rc);
+ PFID(&op_data->fid2), rc);
- LASSERT(cid != 0);
- rc = lmv_revalidate_slaves(exp, reqp, cid, it, rc,
+ LASSERT(fid_is_sane(&op_data->fid2));
+ rc = lmv_revalidate_slaves(exp, reqp, &op_data->fid2, it, rc,
cb_blocking, extra_lock_flags);
- GOTO(out_free_op_data, rc);
+ GOTO(out_free_sop_data, rc);
}
if (*reqp == NULL)
- GOTO(out_free_op_data, rc);
+ GOTO(out_free_sop_data, rc);
/*
- * okay, MDS has returned success. probably name has been resolved in
+ * okay, MDS has returned success. Probably name has been resolved in
* remote inode.
*/
rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags,
reqp, cb_blocking, extra_lock_flags);
if (rc < 0)
- GOTO(out_free_op_data, rc);
+ GOTO(out_free_sop_data, rc);
/*
- * nothing is found, do not access body->fid1 as it is zero and thus
+ * Nothing is found, do not access body->fid1 as it is zero and thus
* pointless.
*/
if (it->d.lustre.it_disposition & DISP_LOOKUP_NEG)
- GOTO(out_free_op_data, rc = 0);
+ GOTO(out_free_sop_data, rc = 0);
LASSERT(*reqp);
LASSERT((*reqp)->rq_repmsg);
- body = lustre_msg_buf((*reqp)->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
+ body = lustre_msg_buf((*reqp)->rq_repmsg,
+ DLM_REPLY_REC_OFF, sizeof(*body));
LASSERT(body != NULL);
/* could not find object, FID is not present in response. */
if (!(body->valid & OBD_MD_FLID))
- GOTO(out_free_op_data, rc = 0);
+ GOTO(out_free_sop_data, rc = 0);
- cid = &body->fid1;
- obj2 = lmv_obj_grab(obd, cid);
+ obj2 = lmv_obj_grab(obd, &body->fid1);
if (!obj2 && (mea = lmv_get_mea(*reqp, DLM_REPLY_REC_OFF))) {
/* wow! this is split dir, we'd like to handle it. */
- body = lustre_msg_buf((*reqp)->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
- LASSERT(body != NULL);
-
obj2 = lmv_obj_create(exp, &body->fid1, mea);
if (IS_ERR(obj2))
- GOTO(out_free_op_data, rc = (int)PTR_ERR(obj2));
+ GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj2));
}
if (obj2) {
/* this is split dir and we'd want to get attrs */
CDEBUG(D_OTHER, "attrs from slaves for "DFID", rc %d\n",
- PFID(cid), rc);
+ PFID(&body->fid1), rc);
- rc = lmv_revalidate_slaves(exp, reqp, cid, it, 1,
+ rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1,
cb_blocking, extra_lock_flags);
lmv_obj_put(obj2);
}
EXIT;
-out_free_op_data:
- OBD_FREE_PTR(op_data);
+out_free_sop_data:
+ OBD_FREE_PTR(sop_data);
return rc;
}
return rc;
}
-int lmv_intent_lookup(struct obd_export *exp, const struct lu_fid *pid,
- const char *name, int len, void *lmm, int lmmsize,
- const struct lu_fid *cid, struct lookup_intent *it,
+int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
+ void *lmm, int lmmsize, struct lookup_intent *it,
int flags, struct ptlrpc_request **reqp,
ldlm_blocking_callback cb_blocking,
int extra_lock_flags)
{
struct obd_device *obd = exp->exp_obd;
+ struct lu_fid rpid = op_data->fid1;
struct lmv_obd *lmv = &obd->u.lmv;
struct mdt_body *body = NULL;
- struct lu_fid rpid = *pid;
- struct md_op_data *op_data;
+ struct md_op_data *sop_data;
struct lmv_stripe_md *mea;
struct lmv_obj *obj;
int rc, loop = 0;
mdsno_t mds;
ENTRY;
- OBD_ALLOC_PTR(op_data);
- if (op_data == NULL)
+ OBD_ALLOC_PTR(sop_data);
+ if (sop_data == NULL)
RETURN(-ENOMEM);
+ /* save op_data fro repeat case */
+ *sop_data = *op_data;
+
/*
* IT_LOOKUP is intended to produce name -> fid resolving (let's call
* this lookup below) or to confirm requested resolving is still valid
- * (let's call this revalidation) cid != NULL specifies revalidation.
+ * (let's call this revalidation) fid_is_sane(&sop_data->fid2) specifies
+ * revalidation.
*/
- if (cid) {
+ if (fid_is_sane(&op_data->fid2)) {
/*
* This is revalidate: we have to check is LOOKUP lock still
* valid for given fid. Very important part is that we have to
* choose right mds because namespace is per mds.
*/
- rpid = *pid;
- obj = lmv_obj_grab(obd, pid);
+ rpid = op_data->fid1;
+ obj = lmv_obj_grab(obd, &rpid);
if (obj) {
mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
- (char *)name, len);
+ (char *)op_data->name,
+ op_data->namelen);
rpid = obj->lo_inodes[mds].li_fid;
lmv_obj_put(obj);
}
rc = lmv_fld_lookup(lmv, &rpid, &mds);
if (rc)
- GOTO(out_free_op_data, rc);
+ GOTO(out_free_sop_data, rc);
CDEBUG(D_OTHER, "revalidate lookup for "DFID" to #"LPU64" MDS\n",
- PFID(cid), mds);
-
- op_data->fid2 = *cid;
+ PFID(&op_data->fid2), mds);
} else {
- rc = lmv_fld_lookup(lmv, pid, &mds);
+ rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds);
if (rc)
- GOTO(out_free_op_data, rc);
+ GOTO(out_free_sop_data, rc);
repeat:
LASSERT(++loop <= 2);
* attributes, because returned values will be put in struct
* inode.
*/
- obj = lmv_obj_grab(obd, pid);
+ obj = lmv_obj_grab(obd, &op_data->fid1);
if (obj) {
- if (len) {
+ if (op_data->namelen) {
/* directory is already split. calculate mds */
- mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
- (char *)name, len);
+ mds = raw_name2idx(obj->lo_hashtype,
+ obj->lo_objcount,
+ (char *)op_data->name,
+ op_data->namelen);
rpid = obj->lo_inodes[mds].li_fid;
rc = lmv_fld_lookup(lmv, &rpid, &mds);
if (rc) {
lmv_obj_put(obj);
- GOTO(out_free_op_data, rc);
+ GOTO(out_free_sop_data, rc);
}
}
lmv_obj_put(obj);
fid_zero(&op_data->fid2);
}
- op_data->fid1 = rpid;
- op_data->name = name;
- op_data->namelen = len;
+ sop_data->fid1 = rpid;
- rc = md_intent_lock(lmv->tgts[mds].ltd_exp, op_data, lmm, lmmsize,
+ rc = md_intent_lock(lmv->tgts[mds].ltd_exp, sop_data, lmm, lmmsize,
it, flags, reqp, cb_blocking, extra_lock_flags);
if (rc > 0) {
- LASSERT(cid != 0);
- GOTO(out_free_op_data, rc);
+ LASSERT(fid_is_sane(&op_data->fid2));
+ GOTO(out_free_sop_data, rc);
}
if (rc > 0) {
/*
- * very interesting. it seems object is still valid but for some
+ * Very interesting. it seems object is still valid but for some
* reason llite calls lookup, not revalidate.
*/
CDEBUG(D_OTHER, "lookup for "DFID" and data should be uptodate\n",
PFID(&rpid));
LASSERT(*reqp == NULL);
- GOTO(out_free_op_data, rc);
+ GOTO(out_free_sop_data, rc);
}
if (rc == 0 && *reqp == NULL) {
/* once again, we're asked for lookup, not revalidate */
CDEBUG(D_OTHER, "lookup for "DFID" and data should be uptodate\n",
PFID(&rpid));
- GOTO(out_free_op_data, rc);
+ GOTO(out_free_sop_data, rc);
}
if (rc == -ERESTART) {
- /* directory got split since last update. this shouldn't be
+ /*
+ * Directory got split since last update. This shouldn't be
* becasue splitting causes lock revocation, so revalidate had
- * to fail and lookup on dir had to return mea */
+ * to fail and lookup on dir had to return mea.
+ */
CWARN("we haven't knew about directory splitting!\n");
LASSERT(obj == NULL);
obj = lmv_obj_create(exp, &rpid, NULL);
if (IS_ERR(obj))
- GOTO(out_free_op_data, rc = (int)PTR_ERR(obj));
+ RETURN((int)PTR_ERR(obj));
lmv_obj_put(obj);
- memset(op_data, 0, sizeof(*op_data));
goto repeat;
}
if (rc < 0)
- GOTO(out_free_op_data, rc);
+ GOTO(out_free_sop_data, rc);
- /* okay, MDS has returned success. Probably name has been resolved in
- * remote inode. */
+ /*
+ * Okay, MDS has returned success. Probably name has been resolved in
+ * remote inode.
+ */
rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp,
cb_blocking, extra_lock_flags);
if (!obj) {
obj = lmv_obj_create(exp, &body->fid1, mea);
if (IS_ERR(obj))
- GOTO(out_free_op_data, rc = (int)PTR_ERR(obj));
+ GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj));
}
lmv_obj_put(obj);
}
EXIT;
-out_free_op_data:
- OBD_FREE_PTR(op_data);
+out_free_sop_data:
+ OBD_FREE_PTR(sop_data);
return rc;
}
int extra_lock_flags)
{
struct obd_device *obd = exp->exp_obd;
- const char *name = op_data->name;
- int len = op_data->namelen;
- struct lu_fid *pid, *cid;
int rc;
ENTRY;
LASSERT(it != NULL);
LASSERT(fid_is_sane(&op_data->fid1));
- pid = &op_data->fid1;
-
- cid = fid_is_sane(&op_data->fid2) ? &op_data->fid2 : NULL;
-
CDEBUG(D_OTHER, "INTENT LOCK '%s' for '%*s' on "DFID"\n",
- LL_IT2STR(it), len, name, PFID(pid));
+ LL_IT2STR(it), op_data->namelen, op_data->name,
+ PFID(&op_data->fid1));
rc = lmv_check_connect(obd);
if (rc)
RETURN(rc);
if (it->it_op & IT_LOOKUP)
- rc = lmv_intent_lookup(exp, pid, name, len, lmm,
- lmmsize, cid, it, flags, reqp,
- cb_blocking, extra_lock_flags);
+ rc = lmv_intent_lookup(exp, op_data, lmm, lmmsize, it,
+ flags, reqp, cb_blocking,
+ extra_lock_flags);
else if (it->it_op & IT_OPEN)
- rc = lmv_intent_open(exp, pid, name, len, lmm,
- lmmsize, cid, it, flags, reqp,
- cb_blocking, extra_lock_flags);
+ rc = lmv_intent_open(exp, op_data, lmm, lmmsize, it,
+ flags, reqp, cb_blocking,
+ extra_lock_flags);
else if (it->it_op & IT_GETATTR)
- rc = lmv_intent_getattr(exp, pid, name, len, lmm,
- lmmsize, cid, it, flags, reqp,
- cb_blocking, extra_lock_flags);
+ rc = lmv_intent_getattr(exp, op_data,lmm, lmmsize, it,
+ flags, reqp, cb_blocking,
+ extra_lock_flags);
else
LBUG();
RETURN(rc);