X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flmv%2Flmv_intent.c;h=c308cdf9c4acc01c8c87689c33bb18497fba7514;hb=0007277c94581e79fe0441d424a3a84e0f5c6d6a;hp=8726bdab0bb40bd139ad37f7ca5dc2084e989be6;hpb=386f2e3a877e214eecd69130a45b45d7570260a7;p=fs%2Flustre-release.git diff --git a/lustre/lmv/lmv_intent.c b/lustre/lmv/lmv_intent.c index 8726bda..c308cdf 100644 --- a/lustre/lmv/lmv_intent.c +++ b/lustre/lmv/lmv_intent.c @@ -31,6 +31,7 @@ #include #include #include +#include #else #include #endif @@ -46,117 +47,125 @@ #include #include #include +#include #include "lmv_internal.h" - static inline void lmv_drop_intent_lock(struct lookup_intent *it) { - if (it->d.lustre.it_lock_mode != 0) - ldlm_lock_decref((void *)&it->d.lustre.it_lock_handle, - it->d.lustre.it_lock_mode); + if (LUSTRE_IT(it)->it_lock_mode != 0) + ldlm_lock_decref((void *)&LUSTRE_IT(it)->it_lock_handle, + LUSTRE_IT(it)->it_lock_mode); } -int lmv_handle_remote_inode(struct obd_export *exp, struct ll_uctxt *uctxt, - void *lmm, int lmmsize, - struct lookup_intent *it, int flags, - struct ptlrpc_request **reqp, - ldlm_blocking_callback cb_blocking) +int lmv_intent_remote(struct obd_export *exp, void *lmm, + int lmmsize, struct lookup_intent *it, + int flags, struct ptlrpc_request **reqp, + ldlm_blocking_callback cb_blocking) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; + struct ptlrpc_request *req = NULL; struct mds_body *body = NULL; - int rc = 0; + struct lustre_handle plock; + struct lustre_id nid; + int pmode, rc = 0; ENTRY; body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); LASSERT(body != NULL); - if (body->valid & OBD_MD_MDS) { - /* oh, MDS reports that this is remote inode case - * i.e. we have to ask for real attrs on another MDS */ - struct ptlrpc_request *req; - struct ll_fid nfid; - struct lustre_handle plock; - int pmode; - - if (it->it_op == IT_LOOKUP) { - /* unfortunately, we have to lie to MDC/MDS to - * retrieve attributes llite needs */ - it->it_op = IT_GETATTR; - } - - /* we got LOOKUP lock, but we really need attrs */ - pmode = it->d.lustre.it_lock_mode; - if (pmode) { - memcpy(&plock, &it->d.lustre.it_lock_handle, - sizeof(plock)); - it->d.lustre.it_lock_mode = 0; - } - - nfid = body->fid1; - it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE; - rc = md_intent_lock(lmv->tgts[nfid.mds].ltd_exp, uctxt, &nfid, - NULL, 0, lmm, lmmsize, NULL, it, flags, - &req, cb_blocking); + if (!(body->valid & OBD_MD_MDS)) + RETURN(0); + + /* + * oh, MDS reports that this is remote inode case i.e. we have to ask + * for real attrs on another MDS. + */ + if (it->it_op == IT_LOOKUP || it->it_op == IT_CHDIR) { + /* + * unfortunately, we have to lie to MDC/MDS to retrieve + * attributes llite needs. + */ + it->it_op = IT_GETATTR; + } - /* llite needs LOOKUP lock to track dentry revocation in - * order to maintain dcache consistency. thus drop UPDATE - * lock here and put LOOKUP in request */ - if (rc == 0) { - lmv_drop_intent_lock(it); - memcpy(&it->d.lustre.it_lock_handle, &plock, - sizeof(plock)); - it->d.lustre.it_lock_mode = pmode; - - } else if (pmode) - ldlm_lock_decref(&plock, pmode); + /* we got LOOKUP lock, but we really need attrs */ + pmode = LUSTRE_IT(it)->it_lock_mode; + if (pmode) { + memcpy(&plock, &LUSTRE_IT(it)->it_lock_handle, + sizeof(plock)); + LUSTRE_IT(it)->it_lock_mode = 0; + LUSTRE_IT(it)->it_data = 0; + } - ptlrpc_req_finished(*reqp); - *reqp = req; + LASSERT((body->valid & OBD_MD_FID) != 0); + + nid = body->id1; + LUSTRE_IT(it)->it_disposition &= ~DISP_ENQ_COMPLETE; + rc = md_intent_lock(lmv->tgts[id_group(&nid)].ltd_exp, &nid, + NULL, 0, lmm, lmmsize, NULL, it, flags, + &req, cb_blocking); + + /* + * llite needs LOOKUP lock to track dentry revocation in order to + * maintain dcache consistency. Thus drop UPDATE lock here and put + * LOOKUP in request. + */ + if (rc == 0) { + lmv_drop_intent_lock(it); + memcpy(&LUSTRE_IT(it)->it_lock_handle, &plock, + sizeof(plock)); + LUSTRE_IT(it)->it_lock_mode = pmode; + } else if (pmode) { + ldlm_lock_decref(&plock, pmode); } + + ptlrpc_req_finished(*reqp); + *reqp = req; RETURN(rc); } -int lmv_intent_open(struct obd_export *exp, struct ll_uctxt *uctxt, - struct ll_fid *pfid, const char *name, int len, - void *lmm, int lmmsize, struct ll_fid *cfid, - struct lookup_intent *it, int flags, - struct ptlrpc_request **reqp, +int lmv_intent_open(struct obd_export *exp, struct lustre_id *pid, + const char *name, int len, void *lmm, int lmmsize, + struct lustre_id *cid, struct lookup_intent *it, + int flags, struct ptlrpc_request **reqp, ldlm_blocking_callback cb_blocking) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct mds_body *body = NULL; - struct ll_fid rpfid = *pfid; + struct lustre_id rpid = *pid; + int rc, mds, loop = 0; struct lmv_obj *obj; struct mea *mea; - int rc, mds; ENTRY; /* IT_OPEN is intended to open (and create, possible) an object. Parent - * (pfid) may be splitted dir */ + * (pid) may be splitted dir */ repeat: - mds = rpfid.mds; - obj = lmv_grab_obj(obd, &rpfid); + LASSERT(++loop <= 2); + mds = id_group(&rpid); + obj = lmv_grab_obj(obd, &rpid); if (obj) { /* directory is already splitted, so we have to forward * request to the right MDS */ - mds = raw_name2idx(obj->objcount, (char *)name, len); - CDEBUG(D_OTHER, "forward to MDS #%u\n", mds); - - rpfid = obj->objs[mds].fid; + mds = raw_name2idx(obj->hashtype, obj->objcount, + (char *)name, len); + + CDEBUG(D_OTHER, "forward to MDS #%u ("DLID4")\n", + mds, OLID4(&rpid)); + rpid = obj->objs[mds].id; lmv_put_obj(obj); } - rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name, - len, lmm, lmmsize, cfid, it, flags, reqp, - cb_blocking); + rc = md_intent_lock(lmv->tgts[id_group(&rpid)].ltd_exp, &rpid, name, + len, lmm, lmmsize, cid, it, flags, reqp, cb_blocking); if (rc == -ERESTART) { - /* directory got splitted. time to update local object - * and repeat the request with proper MDS */ - LASSERT(fid_equal(pfid, &rpfid)); - rc = lmv_get_mea_and_update_object(exp, &rpfid); + /* directory got splitted. time to update local object and + * repeat the request with proper MDS */ + LASSERT(id_equal_fid(pid, &rpid)); + rc = lmv_get_mea_and_update_object(exp, &rpid); if (rc == 0) { ptlrpc_req_finished(*reqp); goto repeat; @@ -167,39 +176,59 @@ repeat: /* okay, MDS has returned success. Probably name has been resolved in * remote inode */ - rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, - flags, reqp, cb_blocking); + rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp, cb_blocking); if (rc != 0) { LASSERT(rc < 0); + + /* + * this is possible, that some userspace application will try to + * open file as directory and we will have -ENOTDIR here. As + * this is "usual" situation, we should not print error here, + * only debug info. + */ + CDEBUG(D_OTHER, "can't handle remote %s: dir "DLID4"("DLID4"):" + "%*s: %d\n", LL_IT2STR(it), OLID4(pid), OLID4(&rpid), + len, name, rc); RETURN(rc); } + /* + * nothing is found, do not access body->id1 as it is zero and thus + * pointless. + */ + if ((LUSTRE_IT(it)->it_disposition & DISP_LOOKUP_NEG) && + !(LUSTRE_IT(it)->it_disposition & DISP_OPEN_CREATE) && + !(LUSTRE_IT(it)->it_disposition & DISP_OPEN_OPEN)) + RETURN(0); + /* caller may use attrs MDS returns on IT_OPEN lock request so, we have * to update them for splitted dir */ body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); LASSERT(body != NULL); - cfid = &body->fid1; - obj = lmv_grab_obj(obd, cfid); - if (!obj && (mea = body_of_splitted_dir(*reqp, 1))) { + /* could not find object, FID is not present in response. */ + if (!(body->valid & OBD_MD_FID)) + RETURN(0); + + cid = &body->id1; + obj = lmv_grab_obj(obd, cid); + if (!obj && (mea = lmv_splitted_dir_body(*reqp, 1))) { /* wow! this is splitted dir, we'd like to handle it */ - obj = lmv_create_obj(exp, &body->fid1, mea); + obj = lmv_create_obj(exp, &body->id1, mea); if (IS_ERR(obj)) RETURN(PTR_ERR(obj)); } if (obj) { /* this is splitted dir and we'd want to get attrs */ - CDEBUG(D_OTHER, "attrs from slaves for %lu/%lu/%lu\n", - (unsigned long)cfid->mds, (unsigned long)cfid->id, - (unsigned long)cfid->generation); - rc = lmv_revalidate_slaves(exp, reqp, cfid, - it, 1, cb_blocking); + CDEBUG(D_OTHER, "attrs from slaves for "DLID4"\n", + OLID4(cid)); + + rc = lmv_revalidate_slaves(exp, reqp, cid, it, 1, + cb_blocking); } else if (S_ISDIR(body->mode)) { - /*CWARN("hmmm, %lu/%lu/%lu has not lmv obj?!\n", - (unsigned long) cfid->mds, - (unsigned long) cfid->id, - (unsigned long) cfid->generation);*/ + CDEBUG(D_OTHER, "object "DLID4" has not lmv obj?\n", + OLID4(cid)); } if (obj) @@ -208,114 +237,123 @@ repeat: RETURN(rc); } -int lmv_intent_getattr(struct obd_export *exp, struct ll_uctxt *uctxt, - struct ll_fid *pfid, const char *name, int len, - void *lmm, int lmmsize, struct ll_fid *cfid, - struct lookup_intent *it, int flags, - struct ptlrpc_request **reqp, +int lmv_intent_getattr(struct obd_export *exp, struct lustre_id *pid, + const char *name, int len, void *lmm, int lmmsize, + struct lustre_id *cid, struct lookup_intent *it, + int flags, struct ptlrpc_request **reqp, ldlm_blocking_callback cb_blocking) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct mds_body *body = NULL; - struct ll_fid rpfid = *pfid; - struct lmv_obj *obj, *obj2; + struct lustre_id rpid = *pid; + struct lmv_obj *obj = NULL, *obj2 = NULL; struct mea *mea; int rc = 0, mds; ENTRY; - if (cfid) { + if (cid) { /* caller wants to revalidate attrs of obj we have to revalidate * slaves if requested object is splitted directory */ - CDEBUG(D_OTHER, "revalidate attrs for %lu/%lu/%lu\n", - (unsigned long)cfid->mds, (unsigned long)cfid->id, - (unsigned long)cfid->generation); - mds = cfid->mds; - obj = lmv_grab_obj(obd, cfid); + CDEBUG(D_OTHER, "revalidate attrs for "DLID4"\n", OLID4(cid)); + mds = id_group(cid); +#if 0 + obj = lmv_grab_obj(obd, cid); if (obj) { /* in fact, we need not this with current intent_lock(), * but it may change some day */ - rpfid = obj->objs[mds].fid; + if (!id_equal_fid(pid, cid)){ + rpid = obj->objs[mds].id; + mds = id_group(&rpid); + } lmv_put_obj(obj); } - rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name, - len, lmm, lmmsize, cfid, it, flags, reqp, - cb_blocking); - if (obj && rc >= 0) { - /* this is splitted dir. In order to optimize things a - * bit, we consider obj valid updating missing parts. - - * FIXME: do we need to return any lock here? It would - * be fine if we don't. this means that nobody should - * use UPDATE lock to notify about object * removal */ - CDEBUG(D_OTHER, - "revalidate slaves for %lu/%lu/%lu, rc %d\n", - (unsigned long)cfid->mds, (unsigned long)cfid->id, - (unsigned long)cfid->generation, rc); - - rc = lmv_revalidate_slaves(exp, reqp, cfid, it, rc, - cb_blocking); - } +#endif + } else { + CDEBUG(D_OTHER, "INTENT getattr for %*s on "DLID4"\n", + len, name, OLID4(pid)); + mds = id_group(pid); + obj = lmv_grab_obj(obd, pid); + if (obj && len) { + /* directory is already splitted. calculate mds */ + mds = raw_name2idx(obj->hashtype, obj->objcount, + (char *)name, len); + rpid = obj->objs[mds].id; + mds = id_group(&rpid); + lmv_put_obj(obj); - RETURN(rc); + CDEBUG(D_OTHER, "forward to MDS #%u (slave "DLID4")\n", + mds, OLID4(&rpid)); + } } - CDEBUG(D_OTHER, "INTENT getattr for %*s on %lu/%lu/%lu\n", - len, name, (unsigned long)pfid->mds, (unsigned long)pfid->id, - (unsigned long)pfid->generation); - - mds = pfid->mds; - obj = lmv_grab_obj(obd, pfid); - if (obj && len) { - /* directory is already splitted. calculate mds */ - mds = raw_name2idx(obj->objcount, (char *) name, len); - rpfid = obj->objs[mds].fid; - lmv_put_obj(obj); - - CDEBUG(D_OTHER, "forward to MDS #%u (slave %lu/%lu/%lu)\n", - mds, (unsigned long)rpfid.mds, (unsigned long)rpfid.id, - (unsigned long)rpfid.generation); - } - - rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name, - len, lmm, lmmsize, NULL, it, flags, reqp, - cb_blocking); - + /* the same about fid returning. */ + rc = md_intent_lock(lmv->tgts[mds].ltd_exp, &rpid, name, len, lmm, + lmmsize, cid, it, flags, reqp, cb_blocking); if (rc < 0) RETURN(rc); - - LASSERT(rc == 0); + + if (obj && rc > 0) { + /* this is splitted dir. In order to optimize things a + * bit, we consider obj valid updating missing parts. + + * FIXME: do we need to return any lock here? It would + * be fine if we don't. this means that nobody should + * use UPDATE lock to notify about object * removal */ + CDEBUG(D_OTHER, + "revalidate slaves for "DLID4", rc %d\n", + OLID4(cid), rc); + + LASSERT(cid != 0); + rc = lmv_revalidate_slaves(exp, reqp, cid, it, rc, + cb_blocking); + RETURN(rc); + } + if (*reqp == NULL) + RETURN(rc); + /* okay, MDS has returned success. probably name has been * resolved in remote inode */ - rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, - flags, reqp, cb_blocking); + rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, + reqp, cb_blocking); if (rc < 0) RETURN(rc); + /* + * nothing is found, do not access body->id1 as it is zero and thus + * pointless. + */ + if (LUSTRE_IT(it)->it_disposition & DISP_LOOKUP_NEG) + RETURN(0); + body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); LASSERT(body != NULL); - - cfid = &body->fid1; - obj2 = lmv_grab_obj(obd, cfid); - if (!obj2 && (mea = body_of_splitted_dir(*reqp, 1))) { + /* could not find object, FID is not present in response. */ + if (!(body->valid & OBD_MD_FID)) + RETURN(0); + + cid = &body->id1; + obj2 = lmv_grab_obj(obd, cid); + + if (!obj2 && (mea = lmv_splitted_dir_body(*reqp, 1))) { /* wow! this is splitted dir, we'd like to handle it. */ body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); LASSERT(body != NULL); - obj2 = lmv_create_obj(exp, &body->fid1, mea); + obj2 = lmv_create_obj(exp, &body->id1, mea); if (IS_ERR(obj2)) RETURN(PTR_ERR(obj2)); } if (obj2) { /* this is splitted dir and we'd want to get attrs */ - CDEBUG(D_OTHER, "attrs from slaves for %lu/%lu/%lu, rc %d\n", - (unsigned long)cfid->mds, (unsigned long)cfid->id, - (unsigned long)cfid->generation, rc); + CDEBUG(D_OTHER, "attrs from slaves for "DLID4", rc %d\n", + OLID4(cid), rc); - rc = lmv_revalidate_slaves(exp, reqp, cfid, it, 1, cb_blocking); + rc = lmv_revalidate_slaves(exp, reqp, cid, it, 1, + cb_blocking); lmv_put_obj(obj2); } RETURN(rc); @@ -325,11 +363,6 @@ void lmv_update_body_from_obj(struct mds_body *body, struct lmv_inode *obj) { /* update size */ body->size += obj->size; - - /* update atime */ - /* update ctime */ - /* update mtime */ - /* update nlink */ } int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) @@ -340,7 +373,6 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) struct lustre_handle *lockh; struct ldlm_lock *lock; struct mds_body *body2; - struct ll_uctxt uctxt; struct lmv_obj *obj; int i, rc = 0; ENTRY; @@ -360,70 +392,51 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); LASSERT(body != NULL); + LASSERT((body->valid & OBD_MD_FID) != 0); - obj = lmv_grab_obj(obd, &body->fid1); + obj = lmv_grab_obj(obd, &body->id1); LASSERT(obj != NULL); - CDEBUG(D_OTHER, "lookup slaves for %lu/%lu/%lu\n", - (unsigned long)body->fid1.mds, - (unsigned long)body->fid1.id, - (unsigned long)body->fid1.generation); - - uctxt.gid1 = 0; - uctxt.gid2 = 0; + CDEBUG(D_OTHER, "lookup slaves for "DLID4"\n", + OLID4(&body->id1)); lmv_lock_obj(obj); for (i = 0; i < obj->objcount; i++) { - struct ll_fid fid = obj->objs[i].fid; + struct lustre_id id = obj->objs[i].id; struct ptlrpc_request *req = NULL; struct lookup_intent it; - if (fid_equal(&fid, &obj->fid)) + if (id_equal_fid(&id, &obj->id)) /* skip master obj */ continue; - CDEBUG(D_OTHER, "lookup slave %lu/%lu/%lu\n", - (unsigned long)fid.mds, (unsigned long)fid.id, - (unsigned long)fid.generation); + CDEBUG(D_OTHER, "lookup slave "DLID4"\n", OLID4(&id)); /* is obj valid? */ memset(&it, 0, sizeof(it)); it.it_op = IT_GETATTR; - rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid, - NULL, 0, NULL, 0, &fid, &it, 0, &req, + OBD_ALLOC(it.d.fs_data, sizeof(struct lustre_intent_data)); + if (!it.d.fs_data) + GOTO(cleanup, rc = -ENOMEM); + + rc = md_intent_lock(lmv->tgts[id_group(&id)].ltd_exp, &id, + NULL, 0, NULL, 0, &id, &it, 0, &req, lmv_dirobj_blocking_ast); - lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle; - if (rc > 0) { + lockh = (struct lustre_handle *)&LUSTRE_IT(&it)->it_lock_handle; + if (rc > 0 && req == NULL) { /* nice, this slave is valid */ LASSERT(req == NULL); CDEBUG(D_OTHER, "cached\n"); goto release_lock; } - if (rc < 0) - /* error during revalidation */ - GOTO(cleanup, rc); - - /* rc == 0, this means we have no such a lock and can't think - * obj is still valid. lookup it again */ - LASSERT(req == NULL); - req = NULL; - - memset(&it, 0, sizeof(it)); - it.it_op = IT_GETATTR; - rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid, - NULL, 0, NULL, 0, NULL, &it, 0, &req, - lmv_dirobj_blocking_ast); - - lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle; - LASSERT(rc <= 0); - - if (rc < 0) + if (rc < 0) { + OBD_FREE(it.d.fs_data, sizeof(struct lustre_intent_data)); /* error during lookup */ GOTO(cleanup, rc); - + } lock = ldlm_handle2lock(lockh); LASSERT(lock); @@ -444,90 +457,98 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) release_lock: lmv_update_body_from_obj(body, obj->objs + i); - if (it.d.lustre.it_lock_mode) - ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode); + if (LUSTRE_IT(&it)->it_lock_mode) + ldlm_lock_decref(lockh, LUSTRE_IT(&it)->it_lock_mode); + OBD_FREE(it.d.fs_data, sizeof(struct lustre_intent_data)); } + + EXIT; cleanup: lmv_unlock_obj(obj); lmv_put_obj(obj); - RETURN(rc); + return rc; } -int lmv_intent_lookup(struct obd_export *exp, struct ll_uctxt *uctxt, - struct ll_fid *pfid, const char *name, int len, - void *lmm, int lmmsize, struct ll_fid *cfid, - struct lookup_intent *it, int flags, - struct ptlrpc_request **reqp, +int lmv_intent_lookup(struct obd_export *exp, struct lustre_id *pid, + const char *name, int len, void *lmm, int lmmsize, + struct lustre_id *cid, struct lookup_intent *it, + int flags, struct ptlrpc_request **reqp, ldlm_blocking_callback cb_blocking) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct mds_body *body = NULL; - struct ll_fid rpfid = *pfid; + struct lustre_id rpid = *pid; struct lmv_obj *obj; struct mea *mea; - int rc, mds; + int rc, mds, loop = 0; ENTRY; - /* IT_LOOKUP is intended to produce name -> fid resolving (let's call + /* + * IT_LOOKUP is intended to produce name -> id resolving (let's call * this lookup below) or to confirm requested resolving is still valid - * (let's call this revalidation) cfid != NULL specifies revalidation */ - - if (cfid) { - /* this is revalidation: we have to check is LOOKUP lock still - * valid for given fid. very important part is that we have to - * choose right mds because namespace is per mds */ - rpfid = *pfid; - obj = lmv_grab_obj(obd, pfid); + * (let's call this revalidation) cid != NULL specifies revalidation. + */ + if (cid) { + /* + * this is revalidation: we have to check is LOOKUP lock still + * valid for given id. Very important part is that we have to + * choose right mds because namespace is per mds. + */ + rpid = *pid; + obj = lmv_grab_obj(obd, pid); if (obj) { - mds = raw_name2idx(obj->objcount, (char *) name, len); - rpfid = obj->objs[mds].fid; + mds = raw_name2idx(obj->hashtype, obj->objcount, + (char *)name, len); + rpid = obj->objs[mds].id; lmv_put_obj(obj); } - mds = rpfid.mds; - - CDEBUG(D_OTHER, "revalidate lookup for %lu/%lu/%lu to %d MDS\n", - (unsigned long)cfid->mds, (unsigned long)cfid->id, - (unsigned long)cfid->generation, mds); - - rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, pfid, name, - len, lmm, lmmsize, cfid, it, flags, - reqp, cb_blocking); - RETURN(rc); - } + mds = id_group(&rpid); + + CDEBUG(D_OTHER, "revalidate lookup for "DLID4" to %d MDS\n", + OLID4(cid), mds); - mds = pfid->mds; + } else { + mds = id_group(pid); repeat: - /* this is lookup. during lookup we have to update all the attributes, - * because returned values will be put in struct inode */ - - obj = lmv_grab_obj(obd, pfid); - if (obj && len) { - /* directory is already splitted. calculate mds */ - mds = raw_name2idx(obj->objcount, (char *)name, len); - rpfid = obj->objs[mds].fid; - lmv_put_obj(obj); - } + LASSERT(++loop <= 2); + + /* this is lookup. during lookup we have to update all the + * attributes, because returned values will be put in struct + * inode */ - rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name, - len, lmm, lmmsize, NULL, it, flags, reqp, - cb_blocking); + obj = lmv_grab_obj(obd, pid); + if (obj) { + if (len) { + /* directory is already splitted. calculate mds */ + mds = raw_name2idx(obj->hashtype, obj->objcount, + (char *)name, len); + rpid = obj->objs[mds].id; + mds = id_group(&rpid); + } + lmv_put_obj(obj); + } + } + rc = md_intent_lock(lmv->tgts[mds].ltd_exp, &rpid, name, + len, lmm, lmmsize, cid, it, flags, + reqp, cb_blocking); + if (rc > 0) { + LASSERT(cid != 0); + RETURN(rc); + } if (rc > 0) { /* very interesting. it seems object is still valid but for some * reason llite calls lookup, not revalidate */ - CWARN("lookup for %lu/%lu/%lu and data should be uptodate\n", - (unsigned long)rpfid.mds, (unsigned long)rpfid.id, - (unsigned long)rpfid.generation); - + CDEBUG(D_OTHER, "lookup for "DLID4" and data should be uptodate\n", + OLID4(&rpid)); LASSERT(*reqp == NULL); RETURN(rc); } if (rc == 0 && *reqp == NULL) { /* once again, we're asked for lookup, not revalidate */ - CWARN("lookup for %lu/%lu/%lu and data should be uptodate\n", - (unsigned long)rpfid.mds, (unsigned long)rpfid.id, - (unsigned long)rpfid.generation); + CDEBUG(D_OTHER, "lookup for "DLID4" and data should be uptodate\n", + OLID4(&rpid)); RETURN(rc); } @@ -538,44 +559,42 @@ repeat: CWARN("we haven't knew about directory splitting!\n"); LASSERT(obj == NULL); - obj = lmv_create_obj(exp, &rpfid, NULL); + obj = lmv_create_obj(exp, &rpid, NULL); if (IS_ERR(obj)) RETURN(PTR_ERR(obj)); - + lmv_put_obj(obj); goto repeat; } if (rc < 0) RETURN(rc); - /* okay, MDS has returned success. probably name has been resolved in - * remote inode */ - rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags, - reqp, cb_blocking); + /* okay, MDS has returned success. Probably name has been resolved in + * remote inode. */ + rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp, cb_blocking); - if (rc == 0 && (mea = body_of_splitted_dir(*reqp, 1))) { + if (rc == 0 && (mea = lmv_splitted_dir_body(*reqp, 1))) { /* wow! this is splitted dir, we'd like to handle it */ body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); LASSERT(body != NULL); + LASSERT((body->valid & OBD_MD_FID) != 0); - obj = lmv_grab_obj(obd, &body->fid1); + obj = lmv_grab_obj(obd, &body->id1); if (!obj) { - obj = lmv_create_obj(exp, &body->fid1, mea); + obj = lmv_create_obj(exp, &body->id1, mea); if (IS_ERR(obj)) RETURN(PTR_ERR(obj)); - } else { - lmv_put_obj(obj); } + lmv_put_obj(obj); } RETURN(rc); } -int lmv_intent_lock(struct obd_export *exp, struct ll_uctxt *uctxt, - struct ll_fid *pfid, const char *name, int len, - void *lmm, int lmmsize, struct ll_fid *cfid, - struct lookup_intent *it, int flags, - struct ptlrpc_request **reqp, +int lmv_intent_lock(struct obd_export *exp, struct lustre_id *pid, + const char *name, int len, void *lmm, int lmmsize, + struct lustre_id *cid, struct lookup_intent *it, + int flags, struct ptlrpc_request **reqp, ldlm_blocking_callback cb_blocking) { struct obd_device *obd = exp->exp_obd; @@ -583,27 +602,27 @@ int lmv_intent_lock(struct obd_export *exp, struct ll_uctxt *uctxt, ENTRY; LASSERT(it); - LASSERT(pfid); + LASSERT(pid); - CDEBUG(D_OTHER, "INTENT LOCK '%s' for '%*s' on %lu/%lu -> %u\n", - LL_IT2STR(it), len, name, (unsigned long) pfid->id, - (unsigned long) pfid->generation, pfid->mds); + CDEBUG(D_OTHER, "INTENT LOCK '%s' for '%*s' on %lu/%lu -> %lu\n", + LL_IT2STR(it), len, name, (unsigned long)id_ino(pid), + (unsigned long)id_gen(pid), (unsigned long)id_group(pid)); rc = lmv_check_connect(obd); if (rc) RETURN(rc); if (it->it_op == IT_LOOKUP) - rc = lmv_intent_lookup(exp, uctxt, pfid, name, len, lmm, - lmmsize, cfid, it, flags, reqp, + rc = lmv_intent_lookup(exp, pid, name, len, lmm, + lmmsize, cid, it, flags, reqp, cb_blocking); else if (it->it_op & IT_OPEN) - rc = lmv_intent_open(exp, uctxt, pfid, name, len, lmm, - lmmsize, cfid, it, flags, reqp, + rc = lmv_intent_open(exp, pid, name, len, lmm, + lmmsize, cid, it, flags, reqp, cb_blocking); else if (it->it_op == IT_GETATTR || it->it_op == IT_CHDIR) - rc = lmv_intent_getattr(exp, uctxt, pfid, name, len, lmm, - lmmsize, cfid, it, flags, reqp, + rc = lmv_intent_getattr(exp, pid, name, len, lmm, + lmmsize, cid, it, flags, reqp, cb_blocking); else LBUG(); @@ -611,7 +630,7 @@ int lmv_intent_lock(struct obd_export *exp, struct ll_uctxt *uctxt, } int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, - struct ll_fid *mfid, struct lookup_intent *oit, + struct lustre_id *mid, struct lookup_intent *oit, int master_valid, ldlm_blocking_callback cb_blocking) { struct obd_device *obd = exp->exp_obd; @@ -621,7 +640,6 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, struct ldlm_lock *lock; unsigned long size = 0; struct mds_body *body; - struct ll_uctxt uctxt; struct lmv_obj *obj; int master_lock_mode; int i, rc = 0; @@ -632,32 +650,34 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, * the fields. say, common fields (that are equal on all the subojects * need not to be update, another fields (i_size, for example) are * cached all the time */ - obj = lmv_grab_obj(obd, mfid); + obj = lmv_grab_obj(obd, mid); LASSERT(obj != NULL); - uctxt.gid1 = 0; - uctxt.gid2 = 0; master_lock_mode = 0; lmv_lock_obj(obj); for (i = 0; i < obj->objcount; i++) { - struct ll_fid fid = obj->objs[i].fid; + struct lustre_id id = obj->objs[i].id; struct lustre_handle *lockh = NULL; struct ptlrpc_request *req = NULL; ldlm_blocking_callback cb; struct lookup_intent it; int master = 0; - CDEBUG(D_OTHER, "revalidate subobj %lu/%lu/%lu\n", - (unsigned long)fid.mds, (unsigned long)fid.id, - (unsigned long) fid.generation); + CDEBUG(D_OTHER, "revalidate subobj "DLID4"\n", + OLID4(&id)); memset(&it, 0, sizeof(it)); it.it_op = IT_GETATTR; + cb = lmv_dirobj_blocking_ast; - if (fid_equal(&fid, &obj->fid)) { + OBD_ALLOC(it.d.fs_data, sizeof(struct lustre_intent_data)); + if (!it.d.fs_data) + GOTO(cleanup, rc = -ENOMEM); + + if (id_equal_fid(&id, &obj->id)) { if (master_valid) { /* lmv_intent_getattr() already checked * validness and took the lock */ @@ -678,44 +698,31 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, cb = cb_blocking; } + /* is obj valid? */ - rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid, - NULL, 0, NULL, 0, &fid, &it, 0, &req, cb); - lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle; - if (rc > 0) { + rc = md_intent_lock(lmv->tgts[id_group(&id)].ltd_exp, + &id, NULL, 0, NULL, 0, &id, &it, 0, + &req, cb); + lockh = (struct lustre_handle *) &LUSTRE_IT(&it)->it_lock_handle; + if (rc > 0 && req == NULL) { /* nice, this slave is valid */ LASSERT(req == NULL); CDEBUG(D_OTHER, "cached\n"); goto release_lock; } - if (rc < 0) + if (rc < 0) { + OBD_FREE(it.d.fs_data, sizeof(struct lustre_intent_data)); /* error during revalidation */ GOTO(cleanup, rc); - - /* rc == 0, this means we have no such a lock and can't think - * obj is still valid. lookup it again */ - LASSERT(req == NULL); - req = NULL; - - memset(&it, 0, sizeof(it)); - it.it_op = IT_GETATTR; - rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid, - NULL, 0, NULL, 0, NULL, &it, 0, &req, cb); - lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle; - LASSERT(rc <= 0); - - if (rc < 0) - /* error during lookup */ - GOTO(cleanup, rc); - + } if (master) { LASSERT(master_valid == 0); /* save lock on master to be returned to the caller */ CDEBUG(D_OTHER, "no lock on master yet\n"); memcpy(&master_lockh, lockh, sizeof(master_lockh)); - master_lock_mode = it.d.lustre.it_lock_mode; - it.d.lustre.it_lock_mode = 0; + master_lock_mode = LUSTRE_IT(&it)->it_lock_mode; + LUSTRE_IT(&it)->it_lock_mode = 0; } else { /* this is slave. we want to control it */ lock = ldlm_handle2lock(lockh); @@ -725,8 +732,8 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, } if (*reqp == NULL) { - /* this is first reply, we'll use it to return - * updated data back to the caller */ + /* this is first reply, we'll use it to return updated + * data back to the caller */ LASSERT(req); ptlrpc_request_addref(req); *reqp = req; @@ -741,14 +748,15 @@ update: CDEBUG(D_OTHER, "fresh: %lu\n", (unsigned long)obj->objs[i].size); - + if (req) ptlrpc_req_finished(req); release_lock: size += obj->objs[i].size; - if (it.d.lustre.it_lock_mode) - ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode); + if (LUSTRE_IT(&it)->it_lock_mode) + ldlm_lock_decref(lockh, LUSTRE_IT(&it)->it_lock_mode); + OBD_FREE(it.d.fs_data, sizeof(struct lustre_intent_data)); } if (*reqp) { @@ -764,27 +772,30 @@ release_lock: body->size = size; if (mreq == NULL) { - /* very important to maintain lli->mds the same because - * of revalidation. mreq == NULL means that caller has - * no reply and the only attr we can return is size */ + /* very important to maintain id_group(lli->lli_id) the + * same because of revalidation. mreq == NULL means that + * caller has no reply and the only attr we can return + * is size */ body->valid = OBD_MD_FLSIZE; - body->mds = obj->fid.mds; +// body->mds = id_group(&obj->id); } if (master_valid == 0) { - memcpy(&oit->d.lustre.it_lock_handle, + memcpy(&LUSTRE_IT(oit)->it_lock_handle, &master_lockh, sizeof(master_lockh)); - oit->d.lustre.it_lock_mode = master_lock_mode; + LUSTRE_IT(oit)->it_lock_mode = master_lock_mode; } rc = 0; } else { /* it seems all the attrs are fresh and we did no request */ CDEBUG(D_OTHER, "all the attrs were fresh\n"); if (master_valid == 0) - oit->d.lustre.it_lock_mode = master_lock_mode; + LUSTRE_IT(oit)->it_lock_mode = master_lock_mode; rc = 1; } + + EXIT; cleanup: lmv_unlock_obj(obj); lmv_put_obj(obj); - RETURN(rc); + return rc; }