X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flmv%2Flmv_intent.c;h=d860e86d4c26cc2fd825bfc0da74510a53c6fb2d;hb=8b708618d67ad546d921a58a771439a908f1222b;hp=eb1f40b3e8f23041f82a0f7c6ddd82f071042c63;hpb=2dc9c16e770415d56839e1996015fec5fab93f29;p=fs%2Flustre-release.git diff --git a/lustre/lmv/lmv_intent.c b/lustre/lmv/lmv_intent.c index eb1f40b..d860e86 100644 --- a/lustre/lmv/lmv_intent.c +++ b/lustre/lmv/lmv_intent.c @@ -49,6 +49,13 @@ #include "lmv_internal.h" +static inline void lmv_drop_intent_lock(struct lookup_intent *it) +{ + if (it->d.lustre.it_lock_mode != 0) + ldlm_lock_decref((void *)&it->d.lustre.it_lock_handle, + it->d.lustre.it_lock_mode); +} + int lmv_handle_remote_inode(struct obd_export *exp, struct ll_uctxt *uctxt, void *lmm, int lmmsize, struct lookup_intent *it, int flags, @@ -88,7 +95,7 @@ int lmv_handle_remote_inode(struct obd_export *exp, struct ll_uctxt *uctxt, nfid = body->fid1; it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE; - rc = md_intent_lock(lmv->tgts[nfid.mds].exp, uctxt, &nfid, + rc = md_intent_lock(lmv->tgts[nfid.mds].ltd_exp, uctxt, &nfid, NULL, 0, lmm, lmmsize, NULL, it, flags, &req, cb_blocking); @@ -96,9 +103,7 @@ int lmv_handle_remote_inode(struct obd_export *exp, struct ll_uctxt *uctxt, * order to maintain dcache consistency. thus drop UPDATE * lock here and put LOOKUP in request */ if (rc == 0) { - LASSERT(it->d.lustre.it_lock_mode != 0); - ldlm_lock_decref((void *)&it->d.lustre.it_lock_handle, - it->d.lustre.it_lock_mode); + lmv_drop_intent_lock(it); memcpy(&it->d.lustre.it_lock_handle, &plock, sizeof(plock)); it->d.lustre.it_lock_mode = pmode; @@ -128,24 +133,24 @@ int lmv_intent_open(struct obd_export *exp, struct ll_uctxt *uctxt, int rc, mds; ENTRY; - /* IT_OPEN is intended to open (and create, possible) an object. - * parent (pfid) may be splitted dir */ + /* IT_OPEN is intended to open (and create, possible) an object. Parent + * (pfid) may be splitted dir */ repeat: mds = rpfid.mds; - obj = lmv_grab_obj(obd, &rpfid, 0); + obj = lmv_grab_obj(obd, &rpfid); if (obj) { /* directory is already splitted, so we have to forward * request to the right MDS */ mds = raw_name2idx(obj->objcount, (char *)name, len); - rpfid = obj->objs[mds].fid; CDEBUG(D_OTHER, "forward to MDS #%u\n", mds); + rpfid = obj->objs[mds].fid; + lmv_put_obj(obj); } - rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name, + rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name, len, lmm, lmmsize, cfid, it, flags, reqp, cb_blocking); - lmv_put_obj(obj); if (rc == -ERESTART) { /* directory got splitted. time to update local object * and repeat the request with proper MDS */ @@ -159,41 +164,55 @@ repeat: if (rc != 0) RETURN(rc); - /* okay, MDS has returned success. probably name has been - * resolved in remote inode */ - rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags, - reqp, cb_blocking); + /* okay, MDS has returned success. Probably name has been resolved in + * remote inode */ + rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, + flags, reqp, cb_blocking); if (rc != 0) { LASSERT(rc < 0); + CERROR("can't handle remote %s: dir %lu/%lu/%lu(%lu/%lu/%lu):" + "%*s: %d\n", LL_IT2STR(it), + (unsigned long) pfid->mds, + (unsigned long) pfid->id, + (unsigned long) pfid->generation, + (unsigned long) rpfid.mds, + (unsigned long) rpfid.id, + (unsigned long) rpfid.generation, + len, name, rc); RETURN(rc); } - /* caller may use attrs MDS returns on IT_OPEN lock request - * so, we have to update them for splitted dir */ + /* caller may use attrs MDS returns on IT_OPEN lock request so, we have + * to update them for splitted dir */ body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); LASSERT(body != NULL); + cfid = &body->fid1; - obj = lmv_grab_obj(obd, cfid, 0); - if (rc == 0 && !obj && (mea = is_body_of_splitted_dir(*reqp, 1))) { + obj = lmv_grab_obj(obd, cfid); + if (!obj && (mea = body_of_splitted_dir(*reqp, 1))) { /* wow! this is splitted dir, we'd like to handle it */ - rc = lmv_create_obj_from_attrs(exp, &body->fid1, mea); + obj = lmv_create_obj(exp, &body->fid1, mea); + if (IS_ERR(obj)) + RETURN(PTR_ERR(obj)); } - obj = lmv_grab_obj(obd, cfid, 0); + if (obj) { /* this is splitted dir and we'd want to get attrs */ CDEBUG(D_OTHER, "attrs from slaves for %lu/%lu/%lu\n", - (unsigned long) cfid->mds, - (unsigned long) cfid->id, - (unsigned long) cfid->generation); + (unsigned long)cfid->mds, (unsigned long)cfid->id, + (unsigned long)cfid->generation); rc = lmv_revalidate_slaves(exp, reqp, cfid, - it, 1, cb_blocking); + it, 1, cb_blocking); } else if (S_ISDIR(body->mode)) { /*CWARN("hmmm, %lu/%lu/%lu has not lmv obj?!\n", (unsigned long) cfid->mds, (unsigned long) cfid->id, (unsigned long) cfid->generation);*/ } - lmv_put_obj(obj); + + if (obj) + lmv_put_obj(obj); + RETURN(rc); } @@ -214,93 +233,98 @@ int lmv_intent_getattr(struct obd_export *exp, struct ll_uctxt *uctxt, ENTRY; if (cfid) { - /* caller wants to revalidate attrs of obj - * we have to revalidate slaves if requested - * object is splitted directory */ + /* caller wants to revalidate attrs of obj we have to revalidate + * slaves if requested object is splitted directory */ CDEBUG(D_OTHER, "revalidate attrs for %lu/%lu/%lu\n", - (unsigned long) cfid->mds, - (unsigned long) cfid->id, - (unsigned long) cfid->generation); + (unsigned long)cfid->mds, (unsigned long)cfid->id, + (unsigned long)cfid->generation); mds = cfid->mds; - obj = lmv_grab_obj(obd, cfid, 0); + obj = lmv_grab_obj(obd, cfid); if (obj) { - /* in fact, we need not this with current - * _intent_lock(), but it may change some day */ + /* in fact, we need not this with current intent_lock(), + * but it may change some day */ rpfid = obj->objs[mds].fid; + lmv_put_obj(obj); } - rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name, + rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name, len, lmm, lmmsize, cfid, it, flags, reqp, cb_blocking); if (obj && rc >= 0) { - /* this is splitted dir. in order to optimize things - * a bit, we consider obj valid updating missing - * parts. FIXME: do we need to return any lock here? - * it would be fine if we don't. this means that - * nobody should use UPDATE lock to notify about - * object removal */ + /* this is splitted dir. In order to optimize things a + * bit, we consider obj valid updating missing parts. + + * FIXME: do we need to return any lock here? It would + * be fine if we don't. this means that nobody should + * use UPDATE lock to notify about object * removal */ CDEBUG(D_OTHER, "revalidate slaves for %lu/%lu/%lu, rc %d\n", - (unsigned long) cfid->mds, - (unsigned long) cfid->id, - (unsigned long) cfid->generation, rc); + (unsigned long)cfid->mds, (unsigned long)cfid->id, + (unsigned long)cfid->generation, rc); + rc = lmv_revalidate_slaves(exp, reqp, cfid, it, rc, cb_blocking); } + RETURN(rc); } CDEBUG(D_OTHER, "INTENT getattr for %*s on %lu/%lu/%lu\n", - len, name, (unsigned long) pfid->mds, - (unsigned long) pfid->id, - (unsigned long) pfid->generation); + len, name, (unsigned long)pfid->mds, (unsigned long)pfid->id, + (unsigned long)pfid->generation); mds = pfid->mds; - obj = lmv_grab_obj(obd, pfid, 0); + obj = lmv_grab_obj(obd, pfid); if (obj && len) { /* directory is already splitted. calculate mds */ mds = raw_name2idx(obj->objcount, (char *) name, len); rpfid = obj->objs[mds].fid; + lmv_put_obj(obj); + CDEBUG(D_OTHER, "forward to MDS #%u (slave %lu/%lu/%lu)\n", - mds, (unsigned long) rpfid.mds, - (unsigned long) rpfid.id, - (unsigned long) rpfid.generation); + mds, (unsigned long)rpfid.mds, (unsigned long)rpfid.id, + (unsigned long)rpfid.generation); } - rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name, + + rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name, len, lmm, lmmsize, NULL, it, flags, reqp, cb_blocking); + if (rc < 0) RETURN(rc); + LASSERT(rc == 0); /* okay, MDS has returned success. probably name has been * resolved in remote inode */ - rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags, - reqp, cb_blocking); + rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, + flags, reqp, cb_blocking); if (rc < 0) RETURN(rc); body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); LASSERT(body != NULL); + cfid = &body->fid1; - obj2 = lmv_grab_obj(obd, cfid, 0); + obj2 = lmv_grab_obj(obd, cfid); - if (rc == 0 && !obj2 && (mea = is_body_of_splitted_dir(*reqp, 1))) { - /* wow! this is splitted dir, we'd like to handle it */ + if (!obj2 && (mea = body_of_splitted_dir(*reqp, 1))) { + /* wow! this is splitted dir, we'd like to handle it. */ body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); LASSERT(body != NULL); - rc = lmv_create_obj_from_attrs(exp, &body->fid1, mea); - obj2 = lmv_grab_obj(obd, cfid, 0); + + obj2 = lmv_create_obj(exp, &body->fid1, mea); + if (IS_ERR(obj2)) + RETURN(PTR_ERR(obj2)); } if (obj2) { /* this is splitted dir and we'd want to get attrs */ - CDEBUG(D_OTHER, - "attrs from slaves for %lu/%lu/%lu, rc %d\n", - (unsigned long) cfid->mds, - (unsigned long) cfid->id, - (unsigned long) cfid->generation, rc); - rc = lmv_revalidate_slaves(exp, reqp, cfid, - it, 1, cb_blocking); + CDEBUG(D_OTHER, "attrs from slaves for %lu/%lu/%lu, rc %d\n", + (unsigned long)cfid->mds, (unsigned long)cfid->id, + (unsigned long)cfid->generation, rc); + + rc = lmv_revalidate_slaves(exp, reqp, cfid, it, 1, cb_blocking); + lmv_put_obj(obj2); } RETURN(rc); } @@ -309,11 +333,10 @@ void lmv_update_body_from_obj(struct mds_body *body, struct lmv_inode *obj) { /* update size */ body->size += obj->size; - - /* update atime */ - /* update ctime */ - /* update mtime */ - /* update nlink */ +/* body->atime = obj->atime; + body->ctime = obj->ctime; + body->mtime = obj->mtime; + body->nlink = obj->nlink;*/ } int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) @@ -332,49 +355,53 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) LASSERT(reqp); LASSERT(*reqp); - /* master is locked. we'd like to take locks on slaves - * and update attributes to be returned from the slaves - * it's important that lookup is called in two cases: - * - for first time (dcache has no such a resolving yet - * - ->d_revalidate() returned false - * last case possible only if all the objs (master and - * all slaves aren't valid */ + /* master is locked. we'd like to take locks on slaves and update + * attributes to be returned from the slaves it's important that lookup + * is called in two cases: + + * - for first time (dcache has no such a resolving yet). + * - ->d_revalidate() returned false. + + * last case possible only if all the objs (master and all slaves aren't + * valid */ body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); LASSERT(body != NULL); - obj = lmv_grab_obj(obd, &body->fid1, 0); - LASSERT(obj); + obj = lmv_grab_obj(obd, &body->fid1); + LASSERT(obj != NULL); CDEBUG(D_OTHER, "lookup slaves for %lu/%lu/%lu\n", - (unsigned long) body->fid1.mds, - (unsigned long) body->fid1.id, - (unsigned long) body->fid1.generation); + (unsigned long)body->fid1.mds, + (unsigned long)body->fid1.id, + (unsigned long)body->fid1.generation); uctxt.gid1 = 0; uctxt.gid2 = 0; + + lmv_lock_obj(obj); + for (i = 0; i < obj->objcount; i++) { struct ll_fid fid = obj->objs[i].fid; struct ptlrpc_request *req = NULL; struct lookup_intent it; - if (fid_equal(&fid, &obj->fid)) { + if (fid_equal(&fid, &obj->fid)) /* skip master obj */ continue; - } CDEBUG(D_OTHER, "lookup slave %lu/%lu/%lu\n", - (unsigned long) fid.mds, - (unsigned long) fid.id, - (unsigned long) fid.generation); + (unsigned long)fid.mds, (unsigned long)fid.id, + (unsigned long)fid.generation); /* is obj valid? */ memset(&it, 0, sizeof(it)); it.it_op = IT_GETATTR; - rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid, + rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid, NULL, 0, NULL, 0, &fid, &it, 0, &req, lmv_dirobj_blocking_ast); - lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle; + + lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle; if (rc > 0) { /* nice, this slave is valid */ LASSERT(req == NULL); @@ -382,38 +409,40 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) goto release_lock; } - if (rc < 0) { + if (rc < 0) /* error during revalidation */ GOTO(cleanup, rc); - } - /* rc == 0, this means we have no such a lock and can't - * think obj is still valid. lookup it again */ + /* rc == 0, this means we have no such a lock and can't think + * obj is still valid. lookup it again */ LASSERT(req == NULL); req = NULL; + memset(&it, 0, sizeof(it)); it.it_op = IT_GETATTR; - rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid, + rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid, NULL, 0, NULL, 0, NULL, &it, 0, &req, lmv_dirobj_blocking_ast); + lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle; LASSERT(rc <= 0); - if (rc < 0) { + + if (rc < 0) /* error during lookup */ GOTO(cleanup, rc); - } lock = ldlm_handle2lock(lockh); LASSERT(lock); - lock->l_ast_data = obj; - atomic_inc(&obj->count); + + lock->l_ast_data = lmv_get_obj(obj); body2 = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body2)); LASSERT(body2); obj->objs[i].size = body2->size; + CDEBUG(D_OTHER, "fresh: %lu\n", - (unsigned long) obj->objs[i].size); + (unsigned long)obj->objs[i].size); LDLM_LOCK_PUT(lock); @@ -421,19 +450,22 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) ptlrpc_req_finished(req); release_lock: lmv_update_body_from_obj(body, obj->objs + i); + if (it.d.lustre.it_lock_mode) ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode); } cleanup: + lmv_unlock_obj(obj); + lmv_put_obj(obj); RETURN(rc); } int lmv_intent_lookup(struct obd_export *exp, struct ll_uctxt *uctxt, - struct ll_fid *pfid, const char *name, int len, - void *lmm, int lmmsize, struct ll_fid *cfid, - struct lookup_intent *it, int flags, - struct ptlrpc_request **reqp, - ldlm_blocking_callback cb_blocking) + struct ll_fid *pfid, const char *name, int len, + void *lmm, int lmmsize, struct ll_fid *cfid, + struct lookup_intent *it, int flags, + struct ptlrpc_request **reqp, + ldlm_blocking_callback cb_blocking) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -444,29 +476,28 @@ int lmv_intent_lookup(struct obd_export *exp, struct ll_uctxt *uctxt, int rc, mds; ENTRY; - /* IT_LOOKUP is intended to produce name -> fid resolving - * (let's call this lookup below) or to confirm requested - * resolving is still valid (let's call this revalidation) - * cfid != NULL specifies revalidation */ + /* IT_LOOKUP is intended to produce name -> fid resolving (let's call + * this lookup below) or to confirm requested resolving is still valid + * (let's call this revalidation) cfid != NULL specifies revalidation */ if (cfid) { - /* this is revalidation: we have to check is LOOKUP - * lock still valid for given fid. very important - * part is that we have to choose right mds because - * namespace is per mds */ + /* this is revalidation: we have to check is LOOKUP lock still + * valid for given fid. very important part is that we have to + * choose right mds because namespace is per mds */ rpfid = *pfid; - obj = lmv_grab_obj(obd, pfid, 0); + obj = lmv_grab_obj(obd, pfid); if (obj) { mds = raw_name2idx(obj->objcount, (char *) name, len); rpfid = obj->objs[mds].fid; lmv_put_obj(obj); } mds = rpfid.mds; + CDEBUG(D_OTHER, "revalidate lookup for %lu/%lu/%lu to %d MDS\n", - (unsigned long) cfid->mds, - (unsigned long) cfid->id, - (unsigned long) cfid->generation, mds); - rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, pfid, name, + (unsigned long)cfid->mds, (unsigned long)cfid->id, + (unsigned long)cfid->generation, mds); + + rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, pfid, name, len, lmm, lmmsize, cfid, it, flags, reqp, cb_blocking); RETURN(rc); @@ -474,28 +505,28 @@ int lmv_intent_lookup(struct obd_export *exp, struct ll_uctxt *uctxt, mds = pfid->mds; repeat: - /* this is lookup. during lookup we have to update all the - * attributes, because returned values will be put in struct - * inode */ + /* this is lookup. during lookup we have to update all the attributes, + * because returned values will be put in struct inode */ - obj = lmv_grab_obj(obd, pfid, 0); - if (obj && len) { - /* directory is already splitted. calculate mds */ - mds = raw_name2idx(obj->objcount, (char *) name, len); - rpfid = obj->objs[mds].fid; + obj = lmv_grab_obj(obd, pfid); + if (obj) { + if (len) { + /* directory is already splitted. calculate mds */ + mds = raw_name2idx(obj->objcount, (char *)name, len); + rpfid = obj->objs[mds].fid; + } lmv_put_obj(obj); } - rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name, + rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name, len, lmm, lmmsize, NULL, it, flags, reqp, cb_blocking); if (rc > 0) { - /* very interesting. it seems object is still valid - * but for some reason llite calls lookup, not revalidate */ + /* very interesting. it seems object is still valid but for some + * reason llite calls lookup, not revalidate */ CWARN("lookup for %lu/%lu/%lu and data should be uptodate\n", - (unsigned long) rpfid.mds, - (unsigned long) rpfid.id, - (unsigned long) rpfid.generation); + (unsigned long)rpfid.mds, (unsigned long)rpfid.id, + (unsigned long)rpfid.generation); LASSERT(*reqp == NULL); RETURN(rc); } @@ -503,39 +534,44 @@ repeat: if (rc == 0 && *reqp == NULL) { /* once again, we're asked for lookup, not revalidate */ CWARN("lookup for %lu/%lu/%lu and data should be uptodate\n", - (unsigned long) rpfid.mds, - (unsigned long) rpfid.id, - (unsigned long) rpfid.generation); + (unsigned long)rpfid.mds, (unsigned long)rpfid.id, + (unsigned long)rpfid.generation); RETURN(rc); } if (rc == -ERESTART) { - /* directory got splitted since last update. this shouldn't - * be becasue splitting causes lock revocation, so revalidate - * had to fail and lookup on dir had to return mea */ + /* directory got splitted since last update. this shouldn't be + * becasue splitting causes lock revocation, so revalidate had + * to fail and lookup on dir had to return mea */ CWARN("we haven't knew about directory splitting!\n"); LASSERT(obj == NULL); - rc = lmv_create_obj_from_attrs(exp, &rpfid, NULL); - if (rc) - RETURN(rc); + + obj = lmv_create_obj(exp, &rpfid, NULL); + if (IS_ERR(obj)) + RETURN(PTR_ERR(obj)); + lmv_put_obj(obj); goto repeat; } if (rc < 0) RETURN(rc); - /* okay, MDS has returned success. probably name has been - * resolved in remote inode */ + /* okay, MDS has returned success. probably name has been resolved in + * remote inode */ rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags, reqp, cb_blocking); - if (rc == 0 && (mea = is_body_of_splitted_dir(*reqp, 1))) { + if (rc == 0 && (mea = body_of_splitted_dir(*reqp, 1))) { /* wow! this is splitted dir, we'd like to handle it */ body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); LASSERT(body != NULL); - obj = lmv_grab_obj(obd, &body->fid1, 0); - if (!obj) - rc = lmv_create_obj_from_attrs(exp, &body->fid1, mea); + + obj = lmv_grab_obj(obd, &body->fid1); + if (!obj) { + obj = lmv_create_obj(exp, &body->fid1, mea); + if (IS_ERR(obj)) + RETURN(PTR_ERR(obj)); + } lmv_put_obj(obj); } @@ -543,11 +579,11 @@ repeat: } int lmv_intent_lock(struct obd_export *exp, struct ll_uctxt *uctxt, - struct ll_fid *pfid, const char *name, int len, - void *lmm, int lmmsize, struct ll_fid *cfid, - struct lookup_intent *it, int flags, - struct ptlrpc_request **reqp, - ldlm_blocking_callback cb_blocking) + struct ll_fid *pfid, const char *name, int len, + void *lmm, int lmmsize, struct ll_fid *cfid, + struct lookup_intent *it, int flags, + struct ptlrpc_request **reqp, + ldlm_blocking_callback cb_blocking) { struct obd_device *obd = exp->exp_obd; int rc = 0; @@ -560,7 +596,10 @@ int lmv_intent_lock(struct obd_export *exp, struct ll_uctxt *uctxt, LL_IT2STR(it), len, name, (unsigned long) pfid->id, (unsigned long) pfid->generation, pfid->mds); - lmv_connect(obd); + rc = lmv_check_connect(obd); + if (rc) + RETURN(rc); + if (it->it_op == IT_LOOKUP) rc = lmv_intent_lookup(exp, uctxt, pfid, name, len, lmm, lmmsize, cfid, it, flags, reqp, @@ -569,7 +608,7 @@ int lmv_intent_lock(struct obd_export *exp, struct ll_uctxt *uctxt, rc = lmv_intent_open(exp, uctxt, pfid, name, len, lmm, lmmsize, cfid, it, flags, reqp, cb_blocking); - else if (it->it_op == IT_GETATTR) + else if (it->it_op == IT_GETATTR || it->it_op == IT_CHDIR) rc = lmv_intent_getattr(exp, uctxt, pfid, name, len, lmm, lmmsize, cfid, it, flags, reqp, cb_blocking); @@ -586,8 +625,8 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, struct ptlrpc_request *mreq = *reqp; struct lmv_obd *lmv = &obd->u.lmv; struct lustre_handle master_lockh; - unsigned long size = 0; struct ldlm_lock *lock; + unsigned long size = 0; struct mds_body *body; struct ll_uctxt uctxt; struct lmv_obj *obj; @@ -595,17 +634,20 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, int i, rc = 0; ENTRY; - /* we have to loop over the subobjects, check validity and update - * them from MDSs if needed. it's very useful that we need not to - * update all the fields. say, common fields (that are equal on - * all the subojects need not to be update, another fields (i_size, - * for example) are cached all the time */ - obj = lmv_grab_obj(obd, mfid, 0); - LASSERT(obj); + /* we have to loop over the subobjects, check validity and update them + * from MDSs if needed. it's very useful that we need not to update all + * the fields. say, common fields (that are equal on all the subojects + * need not to be update, another fields (i_size, for example) are + * cached all the time */ + obj = lmv_grab_obj(obd, mfid); + LASSERT(obj != NULL); - master_lock_mode = 0; uctxt.gid1 = 0; uctxt.gid2 = 0; + master_lock_mode = 0; + + lmv_lock_obj(obj); + for (i = 0; i < obj->objcount; i++) { struct ll_fid fid = obj->objs[i].fid; struct lustre_handle *lockh = NULL; @@ -615,8 +657,7 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, int master = 0; CDEBUG(D_OTHER, "revalidate subobj %lu/%lu/%lu\n", - (unsigned long) fid.mds, - (unsigned long) fid.id, + (unsigned long)fid.mds, (unsigned long)fid.id, (unsigned long) fid.generation); memset(&it, 0, sizeof(it)); @@ -628,10 +669,10 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, /* lmv_intent_getattr() already checked * validness and took the lock */ if (mreq) { - /* it even got the reply - * refresh attrs from that reply */ + /* it even got the reply refresh attrs + * from that reply */ body = lustre_msg_buf(mreq->rq_repmsg, - 1,sizeof(*body)); + 1, sizeof(*body)); LASSERT(body != NULL); goto update; } @@ -645,7 +686,7 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, } /* is obj valid? */ - rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid, + rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid, NULL, 0, NULL, 0, &fid, &it, 0, &req, cb); lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle; if (rc > 0) { @@ -655,25 +696,25 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, goto release_lock; } - if (rc < 0) { + if (rc < 0) /* error during revalidation */ GOTO(cleanup, rc); - } - /* rc == 0, this means we have no such a lock and can't - * think obj is still valid. lookup it again */ + /* rc == 0, this means we have no such a lock and can't think + * obj is still valid. lookup it again */ LASSERT(req == NULL); req = NULL; + memset(&it, 0, sizeof(it)); it.it_op = IT_GETATTR; - rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid, + rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid, NULL, 0, NULL, 0, NULL, &it, 0, &req, cb); lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle; LASSERT(rc <= 0); - if (rc < 0) { + + if (rc < 0) /* error during lookup */ GOTO(cleanup, rc); - } if (master) { LASSERT(master_valid == 0); @@ -686,8 +727,7 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, /* this is slave. we want to control it */ lock = ldlm_handle2lock(lockh); LASSERT(lock); - lock->l_ast_data = obj; - atomic_inc(&obj->count); + lock->l_ast_data = lmv_get_obj(obj); LDLM_LOCK_PUT(lock); } @@ -705,31 +745,35 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, update: obj->objs[i].size = body->size; + CDEBUG(D_OTHER, "fresh: %lu\n", - (unsigned long) obj->objs[i].size); + (unsigned long)obj->objs[i].size); if (req) ptlrpc_req_finished(req); release_lock: size += obj->objs[i].size; + if (it.d.lustre.it_lock_mode) ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode); } if (*reqp) { - /* some attrs got refreshed, we have reply and it's time - * to put fresh attrs to it */ + /* some attrs got refreshed, we have reply and it's time to put + * fresh attrs to it */ CDEBUG(D_OTHER, "return refreshed attrs: size = %lu\n", - (unsigned long) size); + (unsigned long)size); + body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); LASSERT(body); - /* FIXME: what about another attributes? */ + + /* FIXME: what about other attributes? */ body->size = size; + if (mreq == NULL) { - /* very important to maintain lli->mds the same - * because of revalidation. mreq == NULL means - * that caller has no reply and the only attr - * we can return is size */ + /* very important to maintain lli->mds the same because + * of revalidation. mreq == NULL means that caller has + * no reply and the only attr we can return is size */ body->valid = OBD_MD_FLSIZE; body->mds = obj->fid.mds; } @@ -747,6 +791,7 @@ release_lock: rc = 1; } cleanup: + lmv_unlock_obj(obj); + lmv_put_obj(obj); RETURN(rc); } -