From 30f882bb273d5583aae1a41c141b52b85613ce70 Mon Sep 17 00:00:00 2001 From: yury Date: Fri, 18 Jun 2004 15:48:31 +0000 Subject: [PATCH] - added proper ref counting in lmv object manager. - fixed possible race in lmv_create_obj(). - cleanups in API of lmv object manager. - removed some redundant checks for return code. - added error handling for lmv_create_obj() in various places. - coding style and indentation fixes in lmv code. --- lustre/lmv/lmv_intent.c | 310 ++++++++++++++++++++++++---------------------- lustre/lmv/lmv_internal.h | 21 +++- lustre/lmv/lmv_obd.c | 157 ++++++++++++----------- lustre/lmv/lmv_objmgr.c | 267 ++++++++++++++++++++++++++------------- 4 files changed, 440 insertions(+), 315 deletions(-) diff --git a/lustre/lmv/lmv_intent.c b/lustre/lmv/lmv_intent.c index ba8a06f..3925c8a 100644 --- a/lustre/lmv/lmv_intent.c +++ b/lustre/lmv/lmv_intent.c @@ -133,24 +133,25 @@ int lmv_intent_open(struct obd_export *exp, struct ll_uctxt *uctxt, int rc, mds; ENTRY; - /* IT_OPEN is intended to open (and create, possible) an object. - * parent (pfid) may be splitted dir */ + /* IT_OPEN is intended to open (and create, possible) an object. Parent + * (pfid) may be splitted dir */ repeat: mds = rpfid.mds; - obj = lmv_grab_obj(obd, &rpfid, 0); + obj = lmv_grab_obj(obd, &rpfid); if (obj) { /* directory is already splitted, so we have to forward * request to the right MDS */ mds = raw_name2idx(obj->objcount, (char *)name, len); - rpfid = obj->objs[mds].fid; CDEBUG(D_OTHER, "forward to MDS #%u\n", mds); + + rpfid = obj->objs[mds].fid; + lmv_put_obj(obj); } rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name, len, lmm, lmmsize, cfid, it, flags, reqp, cb_blocking); - lmv_put_obj(obj); if (rc == -ERESTART) { /* directory got splitted. time to update local object * and repeat the request with proper MDS */ @@ -164,41 +165,48 @@ repeat: if (rc != 0) RETURN(rc); - /* okay, MDS has returned success. probably name has been - * resolved in remote inode */ - rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags, - reqp, cb_blocking); + /* okay, MDS has returned success. Probably name has been resolved in + * remote inode */ + rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, + flags, reqp, cb_blocking); if (rc != 0) { LASSERT(rc < 0); RETURN(rc); } - /* caller may use attrs MDS returns on IT_OPEN lock request - * so, we have to update them for splitted dir */ + /* caller may use attrs MDS returns on IT_OPEN lock request so, we have + * to update them for splitted dir */ body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); LASSERT(body != NULL); + cfid = &body->fid1; - obj = lmv_grab_obj(obd, cfid, 0); - if (rc == 0 && !obj && (mea = is_body_of_splitted_dir(*reqp, 1))) { + obj = lmv_grab_obj(obd, cfid); + if (!obj && (mea = is_body_of_splitted_dir(*reqp, 1))) { /* wow! this is splitted dir, we'd like to handle it */ - rc = lmv_create_obj_from_attrs(exp, &body->fid1, mea); + rc = lmv_create_obj(exp, &body->fid1, mea); + if (rc) + RETURN(rc); + + obj = lmv_grab_obj(obd, cfid); } - obj = lmv_grab_obj(obd, cfid, 0); + if (obj) { /* this is splitted dir and we'd want to get attrs */ CDEBUG(D_OTHER, "attrs from slaves for %lu/%lu/%lu\n", - (unsigned long) cfid->mds, - (unsigned long) cfid->id, - (unsigned long) cfid->generation); + (unsigned long)cfid->mds, (unsigned long)cfid->id, + (unsigned long)cfid->generation); rc = lmv_revalidate_slaves(exp, reqp, cfid, - it, 1, cb_blocking); + it, 1, cb_blocking); } else if (S_ISDIR(body->mode)) { /*CWARN("hmmm, %lu/%lu/%lu has not lmv obj?!\n", (unsigned long) cfid->mds, (unsigned long) cfid->id, (unsigned long) cfid->generation);*/ } - lmv_put_obj(obj); + + if (obj) + lmv_put_obj(obj); + RETURN(rc); } @@ -219,93 +227,100 @@ int lmv_intent_getattr(struct obd_export *exp, struct ll_uctxt *uctxt, ENTRY; if (cfid) { - /* caller wants to revalidate attrs of obj - * we have to revalidate slaves if requested - * object is splitted directory */ + /* caller wants to revalidate attrs of obj we have to revalidate + * slaves if requested object is splitted directory */ CDEBUG(D_OTHER, "revalidate attrs for %lu/%lu/%lu\n", - (unsigned long) cfid->mds, - (unsigned long) cfid->id, - (unsigned long) cfid->generation); + (unsigned long)cfid->mds, (unsigned long)cfid->id, + (unsigned long)cfid->generation); mds = cfid->mds; - obj = lmv_grab_obj(obd, cfid, 0); + obj = lmv_grab_obj(obd, cfid); if (obj) { - /* in fact, we need not this with current - * _intent_lock(), but it may change some day */ + /* in fact, we need not this with current intent_lock(), + * but it may change some day */ rpfid = obj->objs[mds].fid; + lmv_put_obj(obj); } rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name, len, lmm, lmmsize, cfid, it, flags, reqp, cb_blocking); if (obj && rc >= 0) { - /* this is splitted dir. in order to optimize things - * a bit, we consider obj valid updating missing - * parts. FIXME: do we need to return any lock here? - * it would be fine if we don't. this means that - * nobody should use UPDATE lock to notify about - * object removal */ + /* this is splitted dir. In order to optimize things a + * bit, we consider obj valid updating missing parts. + + * FIXME: do we need to return any lock here? It would + * be fine if we don't. this means that nobody should + * use UPDATE lock to notify about object * removal */ CDEBUG(D_OTHER, "revalidate slaves for %lu/%lu/%lu, rc %d\n", - (unsigned long) cfid->mds, - (unsigned long) cfid->id, - (unsigned long) cfid->generation, rc); + (unsigned long)cfid->mds, (unsigned long)cfid->id, + (unsigned long)cfid->generation, rc); + rc = lmv_revalidate_slaves(exp, reqp, cfid, it, rc, cb_blocking); } + RETURN(rc); } CDEBUG(D_OTHER, "INTENT getattr for %*s on %lu/%lu/%lu\n", - len, name, (unsigned long) pfid->mds, - (unsigned long) pfid->id, - (unsigned long) pfid->generation); + len, name, (unsigned long)pfid->mds, (unsigned long)pfid->id, + (unsigned long)pfid->generation); mds = pfid->mds; - obj = lmv_grab_obj(obd, pfid, 0); + obj = lmv_grab_obj(obd, pfid); if (obj && len) { /* directory is already splitted. calculate mds */ mds = raw_name2idx(obj->objcount, (char *) name, len); rpfid = obj->objs[mds].fid; + lmv_put_obj(obj); + CDEBUG(D_OTHER, "forward to MDS #%u (slave %lu/%lu/%lu)\n", - mds, (unsigned long) rpfid.mds, - (unsigned long) rpfid.id, - (unsigned long) rpfid.generation); + mds, (unsigned long)rpfid.mds, (unsigned long)rpfid.id, + (unsigned long)rpfid.generation); } + rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name, len, lmm, lmmsize, NULL, it, flags, reqp, cb_blocking); + if (rc < 0) RETURN(rc); + LASSERT(rc == 0); /* okay, MDS has returned success. probably name has been * resolved in remote inode */ - rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags, - reqp, cb_blocking); + rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, + flags, reqp, cb_blocking); if (rc < 0) RETURN(rc); body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); LASSERT(body != NULL); + cfid = &body->fid1; - obj2 = lmv_grab_obj(obd, cfid, 0); + obj2 = lmv_grab_obj(obd, cfid); - if (rc == 0 && !obj2 && (mea = is_body_of_splitted_dir(*reqp, 1))) { - /* wow! this is splitted dir, we'd like to handle it */ + if (!obj2 && (mea = is_body_of_splitted_dir(*reqp, 1))) { + /* wow! this is splitted dir, we'd like to handle it. */ body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); LASSERT(body != NULL); - rc = lmv_create_obj_from_attrs(exp, &body->fid1, mea); - obj2 = lmv_grab_obj(obd, cfid, 0); + + rc = lmv_create_obj(exp, &body->fid1, mea); + if (rc) + RETURN(rc); + + obj2 = lmv_grab_obj(obd, cfid); } if (obj2) { /* this is splitted dir and we'd want to get attrs */ - CDEBUG(D_OTHER, - "attrs from slaves for %lu/%lu/%lu, rc %d\n", - (unsigned long) cfid->mds, - (unsigned long) cfid->id, - (unsigned long) cfid->generation, rc); - rc = lmv_revalidate_slaves(exp, reqp, cfid, - it, 1, cb_blocking); + CDEBUG(D_OTHER, "attrs from slaves for %lu/%lu/%lu, rc %d\n", + (unsigned long)cfid->mds, (unsigned long)cfid->id, + (unsigned long)cfid->generation, rc); + + rc = lmv_revalidate_slaves(exp, reqp, cfid, it, 1, cb_blocking); + lmv_put_obj(obj2); } RETURN(rc); } @@ -337,41 +352,42 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) LASSERT(reqp); LASSERT(*reqp); - /* master is locked. we'd like to take locks on slaves - * and update attributes to be returned from the slaves - * it's important that lookup is called in two cases: - * - for first time (dcache has no such a resolving yet - * - ->d_revalidate() returned false - * last case possible only if all the objs (master and - * all slaves aren't valid */ + /* master is locked. we'd like to take locks on slaves and update + * attributes to be returned from the slaves it's important that lookup + * is called in two cases: + + * - for first time (dcache has no such a resolving yet. + * - ->d_revalidate() returned false. + + * last case possible only if all the objs (master and all slaves aren't + * valid */ body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); LASSERT(body != NULL); - obj = lmv_grab_obj(obd, &body->fid1, 0); + obj = lmv_grab_obj(obd, &body->fid1); LASSERT(obj); CDEBUG(D_OTHER, "lookup slaves for %lu/%lu/%lu\n", - (unsigned long) body->fid1.mds, - (unsigned long) body->fid1.id, - (unsigned long) body->fid1.generation); + (unsigned long)body->fid1.mds, + (unsigned long)body->fid1.id, + (unsigned long)body->fid1.generation); uctxt.gid1 = 0; uctxt.gid2 = 0; + for (i = 0; i < obj->objcount; i++) { struct ll_fid fid = obj->objs[i].fid; struct ptlrpc_request *req = NULL; struct lookup_intent it; - if (fid_equal(&fid, &obj->fid)) { + if (fid_equal(&fid, &obj->fid)) /* skip master obj */ continue; - } CDEBUG(D_OTHER, "lookup slave %lu/%lu/%lu\n", - (unsigned long) fid.mds, - (unsigned long) fid.id, - (unsigned long) fid.generation); + (unsigned long)fid.mds, (unsigned long)fid.id, + (unsigned long)fid.generation); /* is obj valid? */ memset(&it, 0, sizeof(it)); @@ -387,13 +403,12 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) goto release_lock; } - if (rc < 0) { + if (rc < 0) /* error during revalidation */ GOTO(cleanup, rc); - } - /* rc == 0, this means we have no such a lock and can't - * think obj is still valid. lookup it again */ + /* rc == 0, this means we have no such a lock and can't think + * obj is still valid. lookup it again */ LASSERT(req == NULL); req = NULL; memset(&it, 0, sizeof(it)); @@ -401,24 +416,25 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid, NULL, 0, NULL, 0, NULL, &it, 0, &req, lmv_dirobj_blocking_ast); + lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle; LASSERT(rc <= 0); - if (rc < 0) { + + if (rc < 0) /* error during lookup */ GOTO(cleanup, rc); - } lock = ldlm_handle2lock(lockh); LASSERT(lock); - lock->l_ast_data = obj; - atomic_inc(&obj->count); + + lock->l_ast_data = lmv_get_obj(obj); body2 = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body2)); LASSERT(body2); obj->objs[i].size = body2->size; CDEBUG(D_OTHER, "fresh: %lu\n", - (unsigned long) obj->objs[i].size); + (unsigned long) obj->objs[i].size); LDLM_LOCK_PUT(lock); @@ -430,15 +446,17 @@ release_lock: ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode); } cleanup: + if (obj) + lmv_put_obj(obj); RETURN(rc); } int lmv_intent_lookup(struct obd_export *exp, struct ll_uctxt *uctxt, - struct ll_fid *pfid, const char *name, int len, - void *lmm, int lmmsize, struct ll_fid *cfid, - struct lookup_intent *it, int flags, - struct ptlrpc_request **reqp, - ldlm_blocking_callback cb_blocking) + struct ll_fid *pfid, const char *name, int len, + void *lmm, int lmmsize, struct ll_fid *cfid, + struct lookup_intent *it, int flags, + struct ptlrpc_request **reqp, + ldlm_blocking_callback cb_blocking) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -449,28 +467,27 @@ int lmv_intent_lookup(struct obd_export *exp, struct ll_uctxt *uctxt, int rc, mds; ENTRY; - /* IT_LOOKUP is intended to produce name -> fid resolving - * (let's call this lookup below) or to confirm requested - * resolving is still valid (let's call this revalidation) - * cfid != NULL specifies revalidation */ + /* IT_LOOKUP is intended to produce name -> fid resolving (let's call + * this lookup below) or to confirm requested resolving is still valid + * (let's call this revalidation) cfid != NULL specifies revalidation */ if (cfid) { - /* this is revalidation: we have to check is LOOKUP - * lock still valid for given fid. very important - * part is that we have to choose right mds because - * namespace is per mds */ + /* this is revalidation: we have to check is LOOKUP lock still + * valid for given fid. very important part is that we have to + * choose right mds because namespace is per mds */ rpfid = *pfid; - obj = lmv_grab_obj(obd, pfid, 0); + obj = lmv_grab_obj(obd, pfid); if (obj) { mds = raw_name2idx(obj->objcount, (char *) name, len); rpfid = obj->objs[mds].fid; lmv_put_obj(obj); } mds = rpfid.mds; + CDEBUG(D_OTHER, "revalidate lookup for %lu/%lu/%lu to %d MDS\n", - (unsigned long) cfid->mds, - (unsigned long) cfid->id, - (unsigned long) cfid->generation, mds); + (unsigned long)cfid->mds, (unsigned long)cfid->id, + (unsigned long)cfid->generation, mds); + rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, pfid, name, len, lmm, lmmsize, cfid, it, flags, reqp, cb_blocking); @@ -479,14 +496,13 @@ int lmv_intent_lookup(struct obd_export *exp, struct ll_uctxt *uctxt, mds = pfid->mds; repeat: - /* this is lookup. during lookup we have to update all the - * attributes, because returned values will be put in struct - * inode */ + /* this is lookup. during lookup we have to update all the attributes, + * because returned values will be put in struct inode */ - obj = lmv_grab_obj(obd, pfid, 0); + obj = lmv_grab_obj(obd, pfid); if (obj && len) { /* directory is already splitted. calculate mds */ - mds = raw_name2idx(obj->objcount, (char *) name, len); + mds = raw_name2idx(obj->objcount, (char *)name, len); rpfid = obj->objs[mds].fid; lmv_put_obj(obj); } @@ -495,12 +511,12 @@ repeat: len, lmm, lmmsize, NULL, it, flags, reqp, cb_blocking); if (rc > 0) { - /* very interesting. it seems object is still valid - * but for some reason llite calls lookup, not revalidate */ + /* very interesting. it seems object is still valid but for some + * reason llite calls lookup, not revalidate */ CWARN("lookup for %lu/%lu/%lu and data should be uptodate\n", - (unsigned long) rpfid.mds, - (unsigned long) rpfid.id, - (unsigned long) rpfid.generation); + (unsigned long)rpfid.mds, (unsigned long)rpfid.id, + (unsigned long)rpfid.generation); + LASSERT(*reqp == NULL); RETURN(rc); } @@ -508,9 +524,8 @@ repeat: if (rc == 0 && *reqp == NULL) { /* once again, we're asked for lookup, not revalidate */ CWARN("lookup for %lu/%lu/%lu and data should be uptodate\n", - (unsigned long) rpfid.mds, - (unsigned long) rpfid.id, - (unsigned long) rpfid.generation); + (unsigned long)rpfid.mds, (unsigned long)rpfid.id, + (unsigned long)rpfid.generation); RETURN(rc); } @@ -520,7 +535,7 @@ repeat: * had to fail and lookup on dir had to return mea */ CWARN("we haven't knew about directory splitting!\n"); LASSERT(obj == NULL); - rc = lmv_create_obj_from_attrs(exp, &rpfid, NULL); + rc = lmv_create_obj(exp, &rpfid, NULL); if (rc) RETURN(rc); goto repeat; @@ -538,21 +553,22 @@ repeat: /* wow! this is splitted dir, we'd like to handle it */ body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); LASSERT(body != NULL); - obj = lmv_grab_obj(obd, &body->fid1, 0); - if (!obj) - rc = lmv_create_obj_from_attrs(exp, &body->fid1, mea); - lmv_put_obj(obj); + + if ((obj = lmv_grab_obj(obd, &body->fid1))) + lmv_put_obj(obj); + else + rc = lmv_create_obj(exp, &body->fid1, mea); } RETURN(rc); } int lmv_intent_lock(struct obd_export *exp, struct ll_uctxt *uctxt, - struct ll_fid *pfid, const char *name, int len, - void *lmm, int lmmsize, struct ll_fid *cfid, - struct lookup_intent *it, int flags, - struct ptlrpc_request **reqp, - ldlm_blocking_callback cb_blocking) + struct ll_fid *pfid, const char *name, int len, + void *lmm, int lmmsize, struct ll_fid *cfid, + struct lookup_intent *it, int flags, + struct ptlrpc_request **reqp, + ldlm_blocking_callback cb_blocking) { struct obd_device *obd = exp->exp_obd; int rc = 0; @@ -603,17 +619,18 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, int i, rc = 0; ENTRY; - /* we have to loop over the subobjects, check validity and update - * them from MDSs if needed. it's very useful that we need not to - * update all the fields. say, common fields (that are equal on - * all the subojects need not to be update, another fields (i_size, - * for example) are cached all the time */ - obj = lmv_grab_obj(obd, mfid, 0); + /* we have to loop over the subobjects, check validity and update them + * from MDSs if needed. it's very useful that we need not to update all + * the fields. say, common fields (that are equal on all the subojects + * need not to be update, another fields (i_size, for example) are + * cached all the time */ + obj = lmv_grab_obj(obd, mfid); LASSERT(obj); - master_lock_mode = 0; uctxt.gid1 = 0; uctxt.gid2 = 0; + master_lock_mode = 0; + for (i = 0; i < obj->objcount; i++) { struct ll_fid fid = obj->objs[i].fid; struct lustre_handle *lockh = NULL; @@ -623,8 +640,7 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, int master = 0; CDEBUG(D_OTHER, "revalidate subobj %lu/%lu/%lu\n", - (unsigned long) fid.mds, - (unsigned long) fid.id, + (unsigned long)fid.mds, (unsigned long)fid.id, (unsigned long) fid.generation); memset(&it, 0, sizeof(it)); @@ -636,8 +652,8 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, /* lmv_intent_getattr() already checked * validness and took the lock */ if (mreq) { - /* it even got the reply - * refresh attrs from that reply */ + /* it even got the reply refresh attrs + * from that reply */ body = lustre_msg_buf(mreq->rq_repmsg, 1,sizeof(*body)); LASSERT(body != NULL); @@ -663,13 +679,12 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, goto release_lock; } - if (rc < 0) { + if (rc < 0) /* error during revalidation */ GOTO(cleanup, rc); - } - /* rc == 0, this means we have no such a lock and can't - * think obj is still valid. lookup it again */ + /* rc == 0, this means we have no such a lock and can't think + * obj is still valid. lookup it again */ LASSERT(req == NULL); req = NULL; memset(&it, 0, sizeof(it)); @@ -678,10 +693,9 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, NULL, 0, NULL, 0, NULL, &it, 0, &req, cb); lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle; LASSERT(rc <= 0); - if (rc < 0) { + if (rc < 0) /* error during lookup */ GOTO(cleanup, rc); - } if (master) { LASSERT(master_valid == 0); @@ -694,8 +708,7 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, /* this is slave. we want to control it */ lock = ldlm_handle2lock(lockh); LASSERT(lock); - lock->l_ast_data = obj; - atomic_inc(&obj->count); + lock->l_ast_data = lmv_get_obj(obj); LDLM_LOCK_PUT(lock); } @@ -731,13 +744,13 @@ release_lock: (unsigned long) size); body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body)); LASSERT(body); + /* FIXME: what about another attributes? */ body->size = size; if (mreq == NULL) { - /* very important to maintain lli->mds the same - * because of revalidation. mreq == NULL means - * that caller has no reply and the only attr - * we can return is size */ + /* very important to maintain lli->mds the same because + * of revalidation. mreq == NULL means that caller has + * no reply and the only attr we can return is size */ body->valid = OBD_MD_FLSIZE; body->mds = obj->fid.mds; } @@ -755,6 +768,7 @@ release_lock: rc = 1; } cleanup: + if (obj) + lmv_put_obj(obj); RETURN(rc); } - diff --git a/lustre/lmv/lmv_internal.h b/lustre/lmv/lmv_internal.h index c56cae2..a8eb88c 100644 --- a/lustre/lmv/lmv_internal.h +++ b/lustre/lmv/lmv_internal.h @@ -22,10 +22,20 @@ struct lmv_obj { }; int lmv_dirobj_blocking_ast(struct ldlm_lock *, - struct ldlm_lock_desc *, void *, int); -struct lmv_obj *lmv_grab_obj(struct obd_device *obd, - struct ll_fid *fid, int create); + struct ldlm_lock_desc *, + void *, int); + void lmv_put_obj(struct lmv_obj *obj); +struct lmv_obj *lmv_get_obj(struct lmv_obj *obj); + +int lmv_setup_mgr(struct obd_device *obd); +void lmv_cleanup_mgr(struct obd_device *obd); + +struct lmv_obj *lmv_grab_obj(struct obd_device *obd, + struct ll_fid *fid); + +int lmv_create_obj(struct obd_export *exp, struct ll_fid *fid, + struct mea *mea); int lmv_intent_lock(struct obd_export *, struct ll_uctxt *, struct ll_fid *, const char *, int, void *, int, @@ -43,13 +53,12 @@ int lmv_intent_open(struct obd_export *, struct ll_uctxt *, struct ll_fid *, const char *, int, void *, int, struct ll_fid *, struct lookup_intent *, int, struct ptlrpc_request **, ldlm_blocking_callback); -int lmv_create_obj_from_attrs(struct obd_export *, struct ll_fid *, - struct mea *); + int lmv_check_connect(struct obd_device *obd); int lmv_revalidate_slaves(struct obd_export *, struct ptlrpc_request **, struct ll_fid *, struct lookup_intent *, int, ldlm_blocking_callback cb_blocking); -void lmv_cleanup_objs(struct obd_device *obd); + int lmv_get_mea_and_update_object(struct obd_export *, struct ll_fid *); static inline struct mea * diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index b9c4825..7ef1329 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -416,13 +416,12 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, static int lmv_setup(struct obd_device *obd, obd_count len, void *buf) { - struct lustre_cfg *lcfg = buf; + int i, rc = 0; struct lmv_desc *desc; - struct lmv_obd *lmv = &obd->u.lmv; struct obd_uuid *uuids; struct lmv_tgt_desc *tgts; - int i; - int rc = 0; + struct lustre_cfg *lcfg = buf; + struct lmv_obd *lmv = &obd->u.lmv; ENTRY; if (lcfg->lcfg_inllen1 < 1) { @@ -453,7 +452,7 @@ static int lmv_setup(struct obd_device *obd, obd_count len, void *buf) OBD_ALLOC(lmv->tgts, lmv->bufsize); if (lmv->tgts == NULL) { CERROR("Out of memory\n"); - RETURN(-EINVAL); + RETURN(-ENOMEM); } lmv->desc = *desc; @@ -462,10 +461,17 @@ static int lmv_setup(struct obd_device *obd, obd_count len, void *buf) for (i = 0, tgts = lmv->tgts; i < desc->ld_tgt_count; i++, tgts++) tgts->uuid = uuids[i]; + lmv->max_cookiesize = 0; + lmv->max_easize = sizeof(struct ll_fid) * desc->ld_tgt_count + sizeof(struct mea); - lmv->max_cookiesize = 0; + rc = lmv_setup_mgr(obd); + if (rc) { + CERROR("Can't setup LMV object manager, " + "error %d.\n", rc); + OBD_FREE(lmv->tgts, lmv->bufsize); + } RETURN(rc); } @@ -505,7 +511,7 @@ static int lmv_cleanup(struct obd_device *obd, int flags) { struct lmv_obd *lmv = &obd->u.lmv; ENTRY; - lmv_cleanup_objs(obd); + lmv_cleanup_mgr(obd); OBD_FREE(lmv->tgts, lmv->bufsize); RETURN(0); } @@ -525,39 +531,42 @@ static int lmv_getstatus(struct obd_export *exp, struct ll_fid *fid) } static int lmv_getattr(struct obd_export *exp, struct ll_fid *fid, - unsigned long valid, unsigned int ea_size, - struct ptlrpc_request **request) + unsigned long valid, unsigned int ea_size, + struct ptlrpc_request **request) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; int rc, i = fid->mds; struct lmv_obj *obj; ENTRY; + rc = lmv_check_connect(obd); if (rc) RETURN(rc); - obj = lmv_grab_obj(obd, fid, 0); + + obj = lmv_grab_obj(obd, fid); + CDEBUG(D_OTHER, "GETATTR for %lu/%lu/%lu %s\n", - (unsigned long) fid->mds, - (unsigned long) fid->id, - (unsigned long) fid->generation, - obj ? "(splitted)" : ""); + (unsigned long)fid->mds, (unsigned long)fid->id, + (unsigned long)fid->generation, obj ? "(splitted)" : ""); LASSERT(fid->mds < lmv->desc.ld_tgt_count); rc = md_getattr(lmv->tgts[i].ltd_exp, fid, - valid, ea_size, request); + valid, ea_size, request); if (rc == 0 && obj) { - /* we have to loop over dirobjs here and gather attrs - * for all the slaves */ + /* we have to loop over dirobjs here and gather attrs for all + * the slaves. */ #warning "attrs gathering here" } - lmv_put_obj(obj); + + if (obj) + lmv_put_obj(obj); + RETURN(rc); } -static int lmv_change_cbdata(struct obd_export *exp, - struct ll_fid *fid, - ldlm_iterator_t it, void *data) +static int lmv_change_cbdata(struct obd_export *exp, struct ll_fid *fid, + ldlm_iterator_t it, void *data) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -567,12 +576,16 @@ static int lmv_change_cbdata(struct obd_export *exp, rc = lmv_check_connect(obd); if (rc) RETURN(rc); + CDEBUG(D_OTHER, "CBDATA for %lu/%lu/%lu\n", - (unsigned long) fid->mds, - (unsigned long) fid->id, + (unsigned long) fid->mds, (unsigned long) fid->id, (unsigned long) fid->generation); + LASSERT(fid->mds < lmv->desc.ld_tgt_count); - rc = md_change_cbdata(lmv->tgts[fid->mds].ltd_exp, fid, it, data); + + rc = md_change_cbdata(lmv->tgts[fid->mds].ltd_exp, + fid, it, data); + RETURN(rc); } @@ -598,7 +611,7 @@ static int lmv_change_cbdata_name(struct obd_export *exp, struct ll_fid *pfid, /* this is default mds for directory name belongs to */ mds = pfid->mds; - obj = lmv_grab_obj(obd, pfid, 0); + obj = lmv_grab_obj(obd, pfid); if (obj) { /* directory is splitted. look for right mds for this name */ mds = raw_name2idx(obj->objcount, name, len); @@ -673,8 +686,8 @@ int lmv_get_mea_and_update_object(struct obd_export *exp, struct ll_fid *fid) if (md.mea == NULL) GOTO(cleanup, rc = -ENODATA); - rc = lmv_create_obj_from_attrs(exp, fid, md.mea); - obd_free_memmd(exp, (struct lov_stripe_md **) &md.mea); + rc = lmv_create_obj(exp, fid, md.mea); + obd_free_memmd(exp, (struct lov_stripe_md **)&md.mea); cleanup: if (req) @@ -700,7 +713,7 @@ int lmv_create(struct obd_export *exp, struct mdc_op_data *op_data, if (!lmv->desc.ld_active_tgt_count) RETURN(-EIO); repeat: - obj = lmv_grab_obj(obd, &op_data->fid1, 0); + obj = lmv_grab_obj(obd, &op_data->fid1); if (obj) { mds = raw_name2idx(obj->objcount, op_data->name, op_data->namelen); @@ -777,13 +790,15 @@ int lmv_enqueue_slaves(struct obd_export *exp, int locktype, memset(&data2, 0, sizeof(data2)); data2.fid1 = mea->mea_fids[i]; mds = data2.fid1.mds; + rc = md_enqueue(lmv->tgts[mds].ltd_exp, locktype, it, lockmode, &data2, lockh + i, lmm, lmmsize, cb_completion, cb_blocking, cb_data); + CDEBUG(D_OTHER, "take lock on slave %lu/%lu/%lu -> %d/%d\n", - (unsigned long) mea->mea_fids[i].mds, - (unsigned long) mea->mea_fids[i].id, - (unsigned long) mea->mea_fids[i].generation, + (unsigned long)mea->mea_fids[i].mds, + (unsigned long)mea->mea_fids[i].id, + (unsigned long)mea->mea_fids[i].generation, rc, it->d.lustre.it_status); if (rc) GOTO(cleanup, rc); @@ -833,7 +848,7 @@ int lmv_enqueue(struct obd_export *exp, int lock_type, } if (data->namelen) { - obj = lmv_grab_obj(obd, &data->fid1, 0); + obj = lmv_grab_obj(obd, &data->fid1); if (obj) { /* directory is splitted. look for * right mds for this name */ @@ -868,7 +883,7 @@ int lmv_getattr_name(struct obd_export *exp, struct ll_fid *fid, if (rc) RETURN(rc); repeat: - obj = lmv_grab_obj(obd, fid, 0); + obj = lmv_grab_obj(obd, fid); if (obj) { /* directory is splitted. look for right mds for this name */ mds = raw_name2idx(obj->objcount, filename, namelen - 1); @@ -930,7 +945,7 @@ int lmv_link(struct obd_export *exp, struct mdc_op_data *data, RETURN(rc); if (data->namelen != 0) { /* usual link request */ - obj = lmv_grab_obj(obd, &data->fid1, 0); + obj = lmv_grab_obj(obd, &data->fid1); if (obj) { rc = raw_name2idx(obj->objcount, data->name, data->namelen); @@ -1000,7 +1015,7 @@ int lmv_rename(struct obd_export *exp, struct mdc_op_data *data, goto request; } - obj = lmv_grab_obj(obd, &data->fid1, 0); + obj = lmv_grab_obj(obd, &data->fid1); if (obj) { /* directory is already splitted, so we have to forward * request to the right MDS */ @@ -1010,10 +1025,10 @@ int lmv_rename(struct obd_export *exp, struct mdc_op_data *data, (unsigned long) obj->objs[mds].fid.mds, (unsigned long) obj->objs[mds].fid.id, (unsigned long) obj->objs[mds].fid.generation); + lmv_put_obj(obj); } - lmv_put_obj(obj); - obj = lmv_grab_obj(obd, &data->fid2, 0); + obj = lmv_grab_obj(obd, &data->fid2); if (obj) { /* directory is already splitted, so we have to forward * request to the right MDS */ @@ -1023,8 +1038,8 @@ int lmv_rename(struct obd_export *exp, struct mdc_op_data *data, (unsigned long) obj->objs[mds].fid.mds, (unsigned long) obj->objs[mds].fid.id, (unsigned long) obj->objs[mds].fid.generation); + lmv_put_obj(obj); } - lmv_put_obj(obj); mds = data->fid1.mds; @@ -1035,8 +1050,8 @@ request: } int lmv_setattr(struct obd_export *exp, struct mdc_op_data *data, - struct iattr *iattr, void *ea, int ealen, void *ea2, int ea2len, - struct ptlrpc_request **request) + struct iattr *iattr, void *ea, int ealen, void *ea2, + int ea2len, struct ptlrpc_request **request) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -1050,7 +1065,7 @@ int lmv_setattr(struct obd_export *exp, struct mdc_op_data *data, if (rc) RETURN(rc); - obj = lmv_grab_obj(obd, &data->fid1, 0); + obj = lmv_grab_obj(obd, &data->fid1); CDEBUG(D_OTHER, "SETATTR for %lu/%lu/%lu, valid 0x%x%s\n", (unsigned long) data->fid1.mds, (unsigned long) data->fid1.id, @@ -1101,8 +1116,8 @@ int lmv_sync(struct obd_export *exp, struct ll_fid *fid, RETURN(rc); } -int lmv_dirobj_blocking_ast(struct ldlm_lock *lock, - struct ldlm_lock_desc *desc, void *data, int flag) +int lmv_dirobj_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, + void *data, int flag) { struct lustre_handle lockh; struct lmv_obj *obj; @@ -1121,17 +1136,15 @@ int lmv_dirobj_blocking_ast(struct ldlm_lock *lock, case LDLM_CB_CANCELING: /* time to drop cached attrs for dirobj */ obj = lock->l_ast_data; - if (!obj) - break; - - CDEBUG(D_OTHER, "cancel %s on %lu/%lu, master %lu/%lu/%lu\n", - lock->l_resource->lr_name.name[3] == 1 ? - "LOOKUP" : "UPDATE", - (unsigned long) lock->l_resource->lr_name.name[0], - (unsigned long) lock->l_resource->lr_name.name[1], - (unsigned long) obj->fid.mds, - (unsigned long) obj->fid.id, - (unsigned long) obj->fid.generation); + if (obj) { + CDEBUG(D_OTHER, "cancel %s on %lu/%lu, master %lu/%lu/%lu\n", + lock->l_resource->lr_name.name[3] == 1 ? "LOOKUP" : "UPDATE", + (unsigned long)lock->l_resource->lr_name.name[0], + (unsigned long)lock->l_resource->lr_name.name[1], + (unsigned long)obj->fid.mds, (unsigned long)obj->fid.id, + (unsigned long)obj->fid.generation); + lmv_put_obj(obj); + } break; default: LBUG(); @@ -1177,7 +1190,7 @@ int lmv_readpage(struct obd_export *exp, struct ll_fid *mdc_fid, (unsigned long) rfid.id, (unsigned long) rfid.generation); - obj = lmv_grab_obj(obd, mdc_fid, 0); + obj = lmv_grab_obj(obd, mdc_fid); if (obj) { /* find dirobj containing page with requested offset */ /* FIXME: what about protecting cached attrs here? */ @@ -1187,22 +1200,21 @@ int lmv_readpage(struct obd_export *exp, struct ll_fid *mdc_fid, offset -= obj->objs[i].size; } rfid = obj->objs[i].fid; + lmv_put_obj(obj); + CDEBUG(D_OTHER, "forward to %lu/%lu/%lu with offset %lu\n", - (unsigned long) rfid.mds, - (unsigned long) rfid.id, - (unsigned long) rfid.generation, - (unsigned long) offset); + (unsigned long)rfid.mds, (unsigned long)rfid.id, + (unsigned long)rfid.generation, (unsigned long)offset); } - rc = md_readpage(lmv->tgts[rfid.mds].ltd_exp, &rfid, offset, page, request); - if (rc == 0 && !fid_equal(&rfid, mdc_fid)) { - /* this page isn't from master object. to avoid - * ./.. duplication in directory, we have to remove them - * from all slave objects */ + rc = md_readpage(lmv->tgts[rfid.mds].ltd_exp, &rfid, offset, + page, request); + + if (rc == 0 && !fid_equal(&rfid, mdc_fid)) + /* this page isn't from master object. To avoid "." and ".." + * duplication in directory, we have to remove them from all + * slave objects */ lmv_remove_dots(page); - } - - lmv_put_obj(obj); - + RETURN(rc); } @@ -1258,7 +1270,8 @@ int lmv_unlink(struct obd_export *exp, struct mdc_op_data *data, RETURN(rc); } else if (data->namelen != 0) { struct lmv_obj *obj; - obj = lmv_grab_obj(obd, &data->fid1, 0); + + obj = lmv_grab_obj(obd, &data->fid1); if (obj) { i = raw_name2idx(obj->objcount, data->name, data->namelen); @@ -1553,7 +1566,7 @@ int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp, } int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **mem_tgt, - struct lov_mds_md *disk_src, int mdsize) + struct lov_mds_md *disk_src, int mdsize) { struct obd_device *obd = class_exp2obd(exp); struct lmv_obd *lmv = &obd->u.lmv; @@ -1587,8 +1600,8 @@ int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **mem_tgt, } int lmv_brw(int rw, struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *ea, obd_count oa_bufs, - struct brw_page *pgarr, struct obd_trans_info *oti) + struct lov_stripe_md *ea, obd_count oa_bufs, + struct brw_page *pgarr, struct obd_trans_info *oti) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; diff --git a/lustre/lmv/lmv_objmgr.c b/lustre/lmv/lmv_objmgr.c index 1b1c9cd..c395748 100644 --- a/lustre/lmv/lmv_objmgr.c +++ b/lustre/lmv/lmv_objmgr.c @@ -48,106 +48,163 @@ #include #include "lmv_internal.h" - LIST_HEAD(lmv_obj_list); spinlock_t lmv_obj_list_lock = SPIN_LOCK_UNLOCKED; -struct lmv_obj *lmv_grab_obj(struct obd_device *obd, - struct ll_fid *fid, int create) +/* creates new obj on passed @fid and @mea. */ +static struct lmv_obj * +__lmv_alloc_obj(struct obd_device *obd, struct ll_fid *fid, + struct mea *mea) { + int i; + struct lmv_obj *obj; + unsigned int obj_size; struct lmv_obd *lmv = &obd->u.lmv; - struct list_head *cur; - struct lmv_obj *obj, *obj2; - - spin_lock(&lmv_obj_list_lock); - list_for_each(cur, &lmv_obj_list) { - obj = list_entry(cur, struct lmv_obj, list); - if (obj->fid.mds == fid->mds && obj->fid.id == fid->id && - obj->fid.generation == fid->generation) { - atomic_inc(&obj->count); - spin_unlock(&lmv_obj_list_lock); - RETURN(obj); - } - } - spin_unlock(&lmv_obj_list_lock); - if (!create) - RETURN(NULL); - - /* no such object yet, allocate and initialize them */ OBD_ALLOC(obj, sizeof(*obj)); if (!obj) - RETURN(NULL); - atomic_set(&obj->count, 0); - obj->fid = *fid; + return NULL; + obj->obd = obd; + obj->fid = *fid; + + atomic_set(&obj->count, 0); + obj->objcount = mea->mea_count; - OBD_ALLOC(obj->objs, sizeof(struct lmv_inode) * lmv->desc.ld_tgt_count); - if (!obj->objs) { - OBD_FREE(obj, sizeof(*obj)); - RETURN(NULL); - } - memset(obj->objs, 0, sizeof(struct lmv_inode) * lmv->desc.ld_tgt_count); + obj_size = sizeof(struct lmv_inode) * + lmv->desc.ld_tgt_count; + + OBD_ALLOC(obj->objs, obj_size); + if (!obj->objs) + goto err_obj; - spin_lock(&lmv_obj_list_lock); - list_for_each(cur, &lmv_obj_list) { - obj2 = list_entry(cur, struct lmv_obj, list); - if (obj2->fid.mds == fid->mds && obj2->fid.id == fid->id && - obj2->fid.generation == fid->generation) { - /* someone created it already */ - OBD_FREE(obj->objs, - sizeof(struct lmv_inode) * lmv->desc.ld_tgt_count); - OBD_FREE(obj, sizeof(*obj)); - - atomic_inc(&obj2->count); - spin_unlock(&lmv_obj_list_lock); - RETURN(obj2); - } + memset(obj->objs, 0, obj_size); + + /* put all fids in */ + for (i = 0; i < mea->mea_count; i++) { + CDEBUG(D_OTHER, "subobj %lu/%lu/%lu\n", + (unsigned long)mea->mea_fids[i].mds, + (unsigned long)mea->mea_fids[i].id, + (unsigned long)mea->mea_fids[i].generation); + obj->objs[i].fid = mea->mea_fids[i]; } - list_add(&obj->list, &lmv_obj_list); - CDEBUG(D_OTHER, "new obj in lmv cache: %lu/%lu/%lu\n", - (unsigned long) fid->mds, (unsigned long) fid->id, - (unsigned long) fid->generation); - spin_unlock(&lmv_obj_list_lock); - RETURN(obj); + return obj; +err_obj: + OBD_FREE(obj, sizeof(*obj)); + return NULL; } -void lmv_put_obj(struct lmv_obj *obj) +/* destroys passed @obj. */ +static void +__lmv_free_obj(struct lmv_obj *obj) { - if (!obj) - return; + unsigned int obj_size; + struct lmv_obd *lmv = &obj->obd->u.lmv; + + obj_size = sizeof(struct lmv_inode) * + lmv->desc.ld_tgt_count; + + OBD_FREE(obj->objs, obj_size); + OBD_FREE(obj, sizeof(*obj)); +} + +struct lmv_obj * +lmv_get_obj(struct lmv_obj *obj) +{ + LASSERT(obj); + atomic_inc(&obj->count); + return obj; +} + +void +lmv_put_obj(struct lmv_obj *obj) +{ + LASSERT(obj); + if (atomic_dec_and_test(&obj->count)) { - CDEBUG(D_OTHER, "last reference to %lu/%lu/%lu\n", - (unsigned long) obj->fid.mds, - (unsigned long) obj->fid.id, - (unsigned long) obj->fid.generation); + struct ll_fid *fid = &obj->fid; + CDEBUG(D_OTHER, "last reference to %lu/%lu/%lu - destroying\n", + (unsigned long)fid->mds, (unsigned long)fid->id, + (unsigned long)fid->generation); + __lmv_free_obj(obj); } } -void lmv_cleanup_objs(struct obd_device *obd) +static struct lmv_obj * +__lmv_grab_obj(struct obd_device *obd, struct ll_fid *fid) { - struct lmv_obd *lmv = &obd->u.lmv; - struct list_head *cur, *tmp; struct lmv_obj *obj; + struct list_head *cur; - spin_lock(&lmv_obj_list_lock); - list_for_each_safe(cur, tmp, &lmv_obj_list) { + list_for_each(cur, &lmv_obj_list) { obj = list_entry(cur, struct lmv_obj, list); - if (obj->obd != obd) - continue; + if (fid_equal(&obj->fid, fid)) + return lmv_get_obj(obj); + } + return NULL; +} - list_del(&obj->list); - OBD_FREE(obj->objs, - sizeof(struct lmv_inode) * lmv->desc.ld_tgt_count); - OBD_FREE(obj, sizeof(*obj)); +struct lmv_obj * +lmv_grab_obj(struct obd_device *obd, struct ll_fid *fid) +{ + struct lmv_obj *obj; + ENTRY; + + spin_lock(&lmv_obj_list_lock); + obj = __lmv_grab_obj(obd, fid); + spin_unlock(&lmv_obj_list_lock); + + RETURN(obj); +} + +/* looks in objects list for an object that matches passed @fid. If it is not + * found -- creates it using passed @mea and puts to list. */ +static struct lmv_obj * +__lmv_create_obj(struct obd_device *obd, struct ll_fid *fid, + struct mea *mea) +{ + struct lmv_obj *obj, *cobj; + ENTRY; + + obj = lmv_grab_obj(obd, fid); + if (obj) + RETURN(obj); + + /* no such object yet, allocate and initialize them. */ + obj = __lmv_alloc_obj(obd, fid, mea); + if (!obj) + RETURN(NULL); + + /* check if someone create it already while we were dealing with + * allocating @obj. */ + spin_lock(&lmv_obj_list_lock); + cobj = __lmv_grab_obj(obd, fid); + if (cobj) { + /* someone created it already - put @obj and getting out. */ + __lmv_free_obj(obj); + spin_unlock(&lmv_obj_list_lock); + RETURN(cobj); } + + /* object is referenced by list and thus should have additional + * reference counted. */ + lmv_get_obj(obj); + list_add(&obj->list, &lmv_obj_list); spin_unlock(&lmv_obj_list_lock); + + CDEBUG(D_OTHER, "new obj in lmv cache: %lu/%lu/%lu\n", + (unsigned long)fid->mds, (unsigned long)fid->id, + (unsigned long)fid->generation); + + RETURN(lmv_get_obj(obj)); + } -int lmv_create_obj_from_attrs(struct obd_export *exp, - struct ll_fid *fid, struct mea *mea) +int +lmv_create_obj(struct obd_export *exp, + struct ll_fid *fid, struct mea *mea) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -158,8 +215,8 @@ int lmv_create_obj_from_attrs(struct obd_export *exp, ENTRY; CDEBUG(D_OTHER, "get mea for %lu/%lu/%lu and create lmv obj\n", - (unsigned long) fid->mds, (unsigned long) fid->id, - (unsigned long) fid->generation); + (unsigned long)fid->mds, (unsigned long)fid->id, + (unsigned long)fid->generation); if (!mea) { unsigned long valid; @@ -175,41 +232,73 @@ int lmv_create_obj_from_attrs(struct obd_export *exp, rc = md_getattr(lmv->tgts[fid->mds].ltd_exp, fid, valid, mealen, &req); if (rc) { - CERROR("md_getattr() failed, rc = %d\n", rc); + CERROR("md_getattr() failed, error %d\n", rc); GOTO(cleanup, rc); } rc = mdc_req2lustre_md(exp, req, 0, NULL, &md); if (rc) { - CERROR("mdc_req2lustre_md() failed, rc = %d\n", rc); + CERROR("mdc_req2lustre_md() failed, error %d\n", rc); GOTO(cleanup, rc); } - if (md.mea == NULL) + if (!md.mea) GOTO(cleanup, rc = -ENODATA); mea = md.mea; } - /* got mea, now create obj for it */ - obj = lmv_grab_obj(obd, fid, 1); - if (!obj) + /* got mea, now create obj for it. */ + obj = __lmv_create_obj(obd, fid, mea); + if (!obj) { + CERROR("Can't create new object %lu/%lu/%lu\n", + (unsigned long)fid->mds, (unsigned long)fid->id, + (unsigned long)fid->generation); GOTO(cleanup, rc = -ENOMEM); - - obj->objcount = mea->mea_count; - /* put all fids in */ - for (i = 0; i < mea->mea_count; i++) { - CDEBUG(D_OTHER, "subobj %lu/%lu/%lu\n", - (unsigned long) mea->mea_fids[i].mds, - (unsigned long) mea->mea_fids[i].id, - (unsigned long) mea->mea_fids[i].generation); - obj->objs[i].fid = mea->mea_fids[i]; - } - + } else + lmv_put_obj(obj); cleanup: if (req) ptlrpc_req_finished(req); RETURN(rc); } +int +lmv_setup_mgr(struct obd_device *obd) +{ + CWARN("LMV object manager setup\n"); + return 0; +} + +void +lmv_cleanup_mgr(struct obd_device *obd) +{ + struct lmv_obj *obj; + struct list_head *cur, *tmp; + CWARN("LMV object manager cleanup\n"); + + spin_lock(&lmv_obj_list_lock); + list_for_each_safe(cur, tmp, &lmv_obj_list) { + obj = list_entry(cur, struct lmv_obj, list); + + if (obj->obd != obd) + continue; + + list_del(&obj->list); + + if (atomic_read(&obj->count) > 1) { + struct ll_fid *fid = &obj->fid; + + CERROR("Object %lu/%lu/%lu has invalid ref count %d\n", + (unsigned long)fid->mds, (unsigned long)fid->id, + (unsigned long)fid->generation, + atomic_read(&obj->count)); + } + + /* list does not use object anymore, ref counter should be + * descreased. */ + lmv_put_obj(obj); + } + spin_unlock(&lmv_obj_list_lock); +} -- 1.8.3.1