Whamcloud - gitweb
- added proper ref counting in lmv object manager.
authoryury <yury>
Fri, 18 Jun 2004 15:48:31 +0000 (15:48 +0000)
committeryury <yury>
Fri, 18 Jun 2004 15:48:31 +0000 (15:48 +0000)
- fixed possible race in lmv_create_obj().
- cleanups in API of lmv object manager.
- removed some redundant checks for return code.
- added error handling for lmv_create_obj() in various places.
- coding style and indentation fixes in lmv code.

lustre/lmv/lmv_intent.c
lustre/lmv/lmv_internal.h
lustre/lmv/lmv_obd.c
lustre/lmv/lmv_objmgr.c

index ba8a06f..3925c8a 100644 (file)
@@ -133,24 +133,25 @@ int lmv_intent_open(struct obd_export *exp, struct ll_uctxt *uctxt,
         int rc, mds;
         ENTRY;
 
-        /* IT_OPEN is intended to open (and create, possible) an object.
-         * parent (pfid) may be splitted dir */
+        /* IT_OPEN is intended to open (and create, possible) an object. Parent
+         * (pfid) may be splitted dir */
 
 repeat:
         mds = rpfid.mds;
-        obj = lmv_grab_obj(obd, &rpfid, 0);
+        obj = lmv_grab_obj(obd, &rpfid);
         if (obj) {
                 /* directory is already splitted, so we have to forward
                  * request to the right MDS */
                 mds = raw_name2idx(obj->objcount, (char *)name, len);
-                rpfid = obj->objs[mds].fid;
                 CDEBUG(D_OTHER, "forward to MDS #%u\n", mds);
+
+                rpfid = obj->objs[mds].fid;
+                lmv_put_obj(obj);
         }
 
         rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name,
                             len, lmm, lmmsize, cfid, it, flags, reqp,
                             cb_blocking);
-        lmv_put_obj(obj);
         if (rc == -ERESTART) {
                 /* directory got splitted. time to update local object
                  * and repeat the request with proper MDS */
@@ -164,41 +165,48 @@ repeat:
         if (rc != 0)
                 RETURN(rc);
 
-        /* okay, MDS has returned success. probably name has been
-         * resolved in remote inode */
-        rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags,
-                                     reqp, cb_blocking);
+        /* okay, MDS has returned success. Probably name has been resolved in
+         * remote inode */
+        rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it,
+                                     flags, reqp, cb_blocking);
         if (rc != 0) {
                 LASSERT(rc < 0);
                 RETURN(rc);
         }
 
-        /* caller may use attrs MDS returns on IT_OPEN lock request
-         * so, we have to update them for splitted dir */
+        /* caller may use attrs MDS returns on IT_OPEN lock request so, we have
+         * to update them for splitted dir */
         body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
         LASSERT(body != NULL);
+
         cfid = &body->fid1;
-        obj = lmv_grab_obj(obd, cfid, 0);
-        if (rc == 0 && !obj && (mea = is_body_of_splitted_dir(*reqp, 1))) {
+        obj = lmv_grab_obj(obd, cfid);
+        if (!obj && (mea = is_body_of_splitted_dir(*reqp, 1))) {
                 /* wow! this is splitted dir, we'd like to handle it */
-                rc = lmv_create_obj_from_attrs(exp, &body->fid1, mea);
+                rc = lmv_create_obj(exp, &body->fid1, mea);
+                if (rc)
+                        RETURN(rc);
+                
+                obj = lmv_grab_obj(obd, cfid);
         }
-        obj = lmv_grab_obj(obd, cfid, 0);
+
         if (obj) {
                 /* this is splitted dir and we'd want to get attrs */
                 CDEBUG(D_OTHER, "attrs from slaves for %lu/%lu/%lu\n",
-                                (unsigned long) cfid->mds,
-                                (unsigned long) cfid->id,
-                                (unsigned long) cfid->generation);
+                       (unsigned long)cfid->mds, (unsigned long)cfid->id,
+                       (unsigned long)cfid->generation);
                 rc = lmv_revalidate_slaves(exp, reqp, cfid,
-                                it, 1, cb_blocking);
+                                           it, 1, cb_blocking);
         } else if (S_ISDIR(body->mode)) {
                 /*CWARN("hmmm, %lu/%lu/%lu has not lmv obj?!\n",
                                 (unsigned long) cfid->mds,
                                 (unsigned long) cfid->id,
                                 (unsigned long) cfid->generation);*/
         }
-        lmv_put_obj(obj);
+        
+        if (obj)
+                lmv_put_obj(obj);
+        
         RETURN(rc);
 }
 
@@ -219,93 +227,100 @@ int lmv_intent_getattr(struct obd_export *exp, struct ll_uctxt *uctxt,
         ENTRY;
 
         if (cfid) {
-                /* caller wants to revalidate attrs of obj
-                 * we have to revalidate slaves if requested
-                 * object is splitted directory */
+                /* caller wants to revalidate attrs of obj we have to revalidate
+                 * slaves if requested object is splitted directory */
                 CDEBUG(D_OTHER, "revalidate attrs for %lu/%lu/%lu\n",
-                       (unsigned long) cfid->mds,
-                       (unsigned long) cfid->id,
-                       (unsigned long) cfid->generation);
+                       (unsigned long)cfid->mds, (unsigned long)cfid->id,
+                       (unsigned long)cfid->generation);
                 mds = cfid->mds;
-                obj = lmv_grab_obj(obd, cfid, 0);
+                obj = lmv_grab_obj(obd, cfid);
                 if (obj) {
-                        /* in fact, we need not this with current
-                         * _intent_lock(), but it may change some day */
+                        /* in fact, we need not this with current intent_lock(),
+                         * but it may change some day */
                         rpfid = obj->objs[mds].fid;
+                        lmv_put_obj(obj);
                 }
                 rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name,
                                     len, lmm, lmmsize, cfid, it, flags, reqp,
                                     cb_blocking);
                 if (obj && rc >= 0) {
-                        /* this is splitted dir. in order to optimize things
-                         * a bit, we consider obj valid updating missing
-                         * parts. FIXME: do we need to return any lock here?
-                         * it would be fine if we don't. this means that
-                         * nobody should use UPDATE lock to notify about
-                         * object removal */
+                        /* this is splitted dir. In order to optimize things a
+                         * bit, we consider obj valid updating missing parts.
+
+                         * FIXME: do we need to return any lock here? It would
+                         * be fine if we don't. this means that nobody should
+                         * use UPDATE lock to notify about object * removal */
                         CDEBUG(D_OTHER,
                                "revalidate slaves for %lu/%lu/%lu, rc %d\n",
-                               (unsigned long) cfid->mds,
-                               (unsigned long) cfid->id,
-                               (unsigned long) cfid->generation, rc);
+                               (unsigned long)cfid->mds, (unsigned long)cfid->id,
+                               (unsigned long)cfid->generation, rc);
+                        
                         rc = lmv_revalidate_slaves(exp, reqp, cfid, it, rc,
                                                    cb_blocking);
                 }
+
                 RETURN(rc);                
         }
 
         CDEBUG(D_OTHER, "INTENT getattr for %*s on %lu/%lu/%lu\n",
-               len, name, (unsigned long) pfid->mds,
-               (unsigned long) pfid->id,
-               (unsigned long) pfid->generation);
+               len, name, (unsigned long)pfid->mds, (unsigned long)pfid->id,
+               (unsigned long)pfid->generation);
 
         mds = pfid->mds;
-        obj = lmv_grab_obj(obd, pfid, 0);
+        obj = lmv_grab_obj(obd, pfid);
         if (obj && len) {
                 /* directory is already splitted. calculate mds */
                 mds = raw_name2idx(obj->objcount, (char *) name, len);
                 rpfid = obj->objs[mds].fid;
+                lmv_put_obj(obj);
+                
                 CDEBUG(D_OTHER, "forward to MDS #%u (slave %lu/%lu/%lu)\n",
-                       mds, (unsigned long) rpfid.mds,
-                       (unsigned long) rpfid.id,
-                       (unsigned long) rpfid.generation);
+                       mds, (unsigned long)rpfid.mds, (unsigned long)rpfid.id,
+                       (unsigned long)rpfid.generation);
         }
+        
         rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name,
                             len, lmm, lmmsize, NULL, it, flags, reqp,
                             cb_blocking);
+        
         if (rc < 0)
                 RETURN(rc);
+        
         LASSERT(rc == 0);
 
         /* okay, MDS has returned success. probably name has been
          * resolved in remote inode */
-        rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags,
-                                     reqp, cb_blocking);
+        rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it,
+                                     flags, reqp, cb_blocking);
         if (rc < 0)
                 RETURN(rc);
 
         body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
         LASSERT(body != NULL);
+        
         cfid = &body->fid1;
-        obj2 = lmv_grab_obj(obd, cfid, 0);
+        obj2 = lmv_grab_obj(obd, cfid);
 
-        if (rc == 0 && !obj2 && (mea = is_body_of_splitted_dir(*reqp, 1))) {
-                /* wow! this is splitted dir, we'd like to handle it */
+        if (!obj2 && (mea = is_body_of_splitted_dir(*reqp, 1))) {
+                /* wow! this is splitted dir, we'd like to handle it. */
                 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
                 LASSERT(body != NULL);
-                rc = lmv_create_obj_from_attrs(exp, &body->fid1, mea);
-                obj2 = lmv_grab_obj(obd, cfid, 0);
+
+                rc = lmv_create_obj(exp, &body->fid1, mea);
+                if (rc)
+                        RETURN(rc);
+                
+                obj2 = lmv_grab_obj(obd, cfid);
         }
 
         if (obj2) {
                 /* this is splitted dir and we'd want to get attrs */
-                CDEBUG(D_OTHER,
-                                "attrs from slaves for %lu/%lu/%lu, rc %d\n",
-                                (unsigned long) cfid->mds,
-                                (unsigned long) cfid->id,
-                                (unsigned long) cfid->generation, rc);
-                rc = lmv_revalidate_slaves(exp, reqp, cfid,
-                                it, 1, cb_blocking);
+                CDEBUG(D_OTHER, "attrs from slaves for %lu/%lu/%lu, rc %d\n",
+                       (unsigned long)cfid->mds, (unsigned long)cfid->id,
+                       (unsigned long)cfid->generation, rc);
+                
+                rc = lmv_revalidate_slaves(exp, reqp, cfid, it, 1, cb_blocking);
+                lmv_put_obj(obj2);
         }
         RETURN(rc);
 }
@@ -337,41 +352,42 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
         LASSERT(reqp);
         LASSERT(*reqp);
 
-        /* master is locked. we'd like to take locks on slaves
-         * and update attributes to be returned from the slaves
-         * it's important that lookup is called in two cases:
-         *  - for first time (dcache has no such a resolving yet
-         *  - ->d_revalidate() returned false
-         * last case possible only if all the objs (master and
-         * all slaves aren't valid */
+        /* master is locked. we'd like to take locks on slaves and update
+         * attributes to be returned from the slaves it's important that lookup
+         * is called in two cases:
+         
+         *  - for first time (dcache has no such a resolving yet.
+         *  - ->d_revalidate() returned false.
+         
+         * last case possible only if all the objs (master and all slaves aren't
+         * valid */
 
         body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
         LASSERT(body != NULL);
 
-        obj = lmv_grab_obj(obd, &body->fid1, 0);
+        obj = lmv_grab_obj(obd, &body->fid1);
         LASSERT(obj);
 
         CDEBUG(D_OTHER, "lookup slaves for %lu/%lu/%lu\n",
-               (unsigned long) body->fid1.mds,
-               (unsigned long) body->fid1.id,
-               (unsigned long) body->fid1.generation);
+               (unsigned long)body->fid1.mds,
+               (unsigned long)body->fid1.id,
+               (unsigned long)body->fid1.generation);
 
         uctxt.gid1 = 0;
         uctxt.gid2 = 0;
+        
         for (i = 0; i < obj->objcount; i++) {
                 struct ll_fid fid = obj->objs[i].fid;
                 struct ptlrpc_request *req = NULL;
                 struct lookup_intent it;
 
-                if (fid_equal(&fid, &obj->fid)) {
+                if (fid_equal(&fid, &obj->fid))
                         /* skip master obj */
                         continue;
-                }
 
                 CDEBUG(D_OTHER, "lookup slave %lu/%lu/%lu\n",
-                       (unsigned long) fid.mds,
-                       (unsigned long) fid.id,
-                       (unsigned long) fid.generation);
+                       (unsigned long)fid.mds, (unsigned long)fid.id,
+                       (unsigned long)fid.generation);
 
                 /* is obj valid? */
                 memset(&it, 0, sizeof(it));
@@ -387,13 +403,12 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
                         goto release_lock;
                 }
 
-                if (rc < 0) {
+                if (rc < 0)
                         /* error during revalidation */
                         GOTO(cleanup, rc);
-                }
 
-                /* rc == 0, this means we have no such a lock and can't
-                 * think obj is still valid. lookup it again */
+                /* rc == 0, this means we have no such a lock and can't think
+                 * obj is still valid. lookup it again */
                 LASSERT(req == NULL);
                 req = NULL;
                 memset(&it, 0, sizeof(it));
@@ -401,24 +416,25 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
                 rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid,
                                     NULL, 0, NULL, 0, NULL, &it, 0, &req,
                                     lmv_dirobj_blocking_ast);
+
                 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
                 LASSERT(rc <= 0);
-                if (rc < 0) {
+
+                if (rc < 0)
                         /* error during lookup */
                         GOTO(cleanup, rc);
-                }
                 
                 lock = ldlm_handle2lock(lockh);
                 LASSERT(lock);
-                lock->l_ast_data = obj;
-                atomic_inc(&obj->count);
+
+                lock->l_ast_data = lmv_get_obj(obj);
 
                 body2 = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body2));
                 LASSERT(body2);
 
                 obj->objs[i].size = body2->size;
                 CDEBUG(D_OTHER, "fresh: %lu\n",
-                                (unsigned long) obj->objs[i].size);
+                       (unsigned long) obj->objs[i].size);
 
                 LDLM_LOCK_PUT(lock);
 
@@ -430,15 +446,17 @@ release_lock:
                         ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
         }
 cleanup:
+        if (obj)
+                lmv_put_obj(obj);
         RETURN(rc);
 }
 
 int lmv_intent_lookup(struct obd_export *exp, struct ll_uctxt *uctxt,
-                struct ll_fid *pfid, const char *name, int len,
-                void *lmm, int lmmsize, struct ll_fid *cfid,
-                struct lookup_intent *it, int flags,
-                struct ptlrpc_request **reqp,
-                ldlm_blocking_callback cb_blocking)
+                      struct ll_fid *pfid, const char *name, int len,
+                      void *lmm, int lmmsize, struct ll_fid *cfid,
+                      struct lookup_intent *it, int flags,
+                      struct ptlrpc_request **reqp,
+                      ldlm_blocking_callback cb_blocking)
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
@@ -449,28 +467,27 @@ int lmv_intent_lookup(struct obd_export *exp, struct ll_uctxt *uctxt,
         int rc, mds;
         ENTRY;
 
-        /* IT_LOOKUP is intended to produce name -> fid resolving
-         * (let's call this lookup below) or to confirm requested
-         * resolving is still valid (let's call this revalidation)
-         * cfid != NULL specifies revalidation */
+        /* IT_LOOKUP is intended to produce name -> fid resolving (let's call
+         * this lookup below) or to confirm requested resolving is still valid
+         * (let's call this revalidation) cfid != NULL specifies revalidation */
 
         if (cfid) {
-                /* this is revalidation: we have to check is LOOKUP
-                 * lock still valid for given fid. very important
-                 * part is that we have to choose right mds because
-                 * namespace is per mds */
+                /* this is revalidation: we have to check is LOOKUP lock still
+                 * valid for given fid. very important part is that we have to
+                 * choose right mds because namespace is per mds */
                 rpfid = *pfid;
-                obj = lmv_grab_obj(obd, pfid, 0);
+                obj = lmv_grab_obj(obd, pfid);
                 if (obj) {
                         mds = raw_name2idx(obj->objcount, (char *) name, len);
                         rpfid = obj->objs[mds].fid;
                         lmv_put_obj(obj);
                 }
                 mds = rpfid.mds;
+                
                 CDEBUG(D_OTHER, "revalidate lookup for %lu/%lu/%lu to %d MDS\n",
-                       (unsigned long) cfid->mds,
-                       (unsigned long) cfid->id,
-                       (unsigned long) cfid->generation, mds);
+                       (unsigned long)cfid->mds, (unsigned long)cfid->id,
+                       (unsigned long)cfid->generation, mds);
+                
                 rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, pfid, name,
                                     len, lmm, lmmsize, cfid, it, flags,
                                     reqp, cb_blocking);
@@ -479,14 +496,13 @@ int lmv_intent_lookup(struct obd_export *exp, struct ll_uctxt *uctxt,
 
         mds = pfid->mds;
 repeat:
-        /* this is lookup. during lookup we have to update all the
-         * attributes, because returned values will be put in struct
-         * inode */
+        /* this is lookup. during lookup we have to update all the attributes,
+         * because returned values will be put in struct inode */
 
-        obj = lmv_grab_obj(obd, pfid, 0);
+        obj = lmv_grab_obj(obd, pfid);
         if (obj && len) {
                 /* directory is already splitted. calculate mds */
-                mds = raw_name2idx(obj->objcount, (char *) name, len);
+                mds = raw_name2idx(obj->objcount, (char *)name, len);
                 rpfid = obj->objs[mds].fid;
                 lmv_put_obj(obj);
         }
@@ -495,12 +511,12 @@ repeat:
                             len, lmm, lmmsize, NULL, it, flags, reqp,
                             cb_blocking);
         if (rc > 0) {
-                /* very interesting. it seems object is still valid
-                 * but for some reason llite calls lookup, not revalidate */
+                /* very interesting. it seems object is still valid but for some
+                 * reason llite calls lookup, not revalidate */
                 CWARN("lookup for %lu/%lu/%lu and data should be uptodate\n",
-                       (unsigned long) rpfid.mds,
-                       (unsigned long) rpfid.id,
-                       (unsigned long) rpfid.generation);
+                      (unsigned long)rpfid.mds, (unsigned long)rpfid.id,
+                      (unsigned long)rpfid.generation);
+                
                 LASSERT(*reqp == NULL);
                 RETURN(rc);
         }
@@ -508,9 +524,8 @@ repeat:
         if (rc == 0 && *reqp == NULL) {
                 /* once again, we're asked for lookup, not revalidate */
                 CWARN("lookup for %lu/%lu/%lu and data should be uptodate\n",
-                       (unsigned long) rpfid.mds,
-                       (unsigned long) rpfid.id,
-                       (unsigned long) rpfid.generation);
+                       (unsigned long)rpfid.mds, (unsigned long)rpfid.id,
+                       (unsigned long)rpfid.generation);
                 RETURN(rc);
         }
        
@@ -520,7 +535,7 @@ repeat:
                  * had to fail and lookup on dir had to return mea */
                 CWARN("we haven't knew about directory splitting!\n");
                 LASSERT(obj == NULL);
-                rc = lmv_create_obj_from_attrs(exp, &rpfid, NULL);
+                rc = lmv_create_obj(exp, &rpfid, NULL);
                 if (rc)
                         RETURN(rc);
                 goto repeat;
@@ -538,21 +553,22 @@ repeat:
                 /* wow! this is splitted dir, we'd like to handle it */
                 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
                 LASSERT(body != NULL);
-                obj = lmv_grab_obj(obd, &body->fid1, 0);
-                if (!obj)
-                        rc = lmv_create_obj_from_attrs(exp, &body->fid1, mea);
-                lmv_put_obj(obj);
+                
+                if ((obj = lmv_grab_obj(obd, &body->fid1)))
+                        lmv_put_obj(obj);
+                else
+                        rc = lmv_create_obj(exp, &body->fid1, mea);
         }
 
         RETURN(rc);
 }
 
 int lmv_intent_lock(struct obd_export *exp, struct ll_uctxt *uctxt,
-                struct ll_fid *pfid, const char *name, int len,
-                void *lmm, int lmmsize, struct ll_fid *cfid,
-                struct lookup_intent *it, int flags,
-                struct ptlrpc_request **reqp,
-                ldlm_blocking_callback cb_blocking)
+                    struct ll_fid *pfid, const char *name, int len,
+                    void *lmm, int lmmsize, struct ll_fid *cfid,
+                    struct lookup_intent *it, int flags,
+                    struct ptlrpc_request **reqp,
+                    ldlm_blocking_callback cb_blocking)
 {
         struct obd_device *obd = exp->exp_obd;
         int rc = 0;
@@ -603,17 +619,18 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
         int i, rc = 0;
         ENTRY;
 
-        /* we have to loop over the subobjects, check validity and update
-         * them from MDSs if needed. it's very useful that we need not to
-         * update all the fields. say, common fields (that are equal on 
-         * all the subojects need not to be update, another fields (i_size,
-         * for example) are cached all the time */
-        obj = lmv_grab_obj(obd, mfid, 0);
+        /* we have to loop over the subobjects, check validity and update them
+         * from MDSs if needed. it's very useful that we need not to update all
+         * the fields. say, common fields (that are equal on all the subojects
+         * need not to be update, another fields (i_size, for example) are
+         * cached all the time */
+        obj = lmv_grab_obj(obd, mfid);
         LASSERT(obj);
 
-        master_lock_mode = 0;
         uctxt.gid1 = 0;
         uctxt.gid2 = 0;
+        master_lock_mode = 0;
+        
         for (i = 0; i < obj->objcount; i++) {
                 struct ll_fid fid = obj->objs[i].fid;
                 struct lustre_handle *lockh = NULL;
@@ -623,8 +640,7 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
                 int master = 0;
 
                 CDEBUG(D_OTHER, "revalidate subobj %lu/%lu/%lu\n",
-                       (unsigned long) fid.mds,
-                       (unsigned long) fid.id,
+                       (unsigned long)fid.mds, (unsigned long)fid.id,
                        (unsigned long) fid.generation);
 
                 memset(&it, 0, sizeof(it));
@@ -636,8 +652,8 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
                                 /* lmv_intent_getattr() already checked
                                  * validness and took the lock */
                                 if (mreq) {
-                                        /* it even got the reply
-                                         * refresh attrs from that reply */
+                                        /* it even got the reply refresh attrs
+                                         * from that reply */
                                         body = lustre_msg_buf(mreq->rq_repmsg,
                                                               1,sizeof(*body));
                                         LASSERT(body != NULL);
@@ -663,13 +679,12 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
                         goto release_lock;
                 }
 
-                if (rc < 0) {
+                if (rc < 0)
                         /* error during revalidation */
                         GOTO(cleanup, rc);
-                }
 
-                /* rc == 0, this means we have no such a lock and can't
-                 * think obj is still valid. lookup it again */
+                /* rc == 0, this means we have no such a lock and can't think
+                 * obj is still valid. lookup it again */
                 LASSERT(req == NULL);
                 req = NULL;
                 memset(&it, 0, sizeof(it));
@@ -678,10 +693,9 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
                                     NULL, 0, NULL, 0, NULL, &it, 0, &req, cb);
                 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
                 LASSERT(rc <= 0);
-                if (rc < 0) {
+                if (rc < 0)
                         /* error during lookup */
                         GOTO(cleanup, rc);
-                }
               
                 if (master) {
                         LASSERT(master_valid == 0);
@@ -694,8 +708,7 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
                         /* this is slave. we want to control it */
                         lock = ldlm_handle2lock(lockh);
                         LASSERT(lock);
-                        lock->l_ast_data = obj;
-                        atomic_inc(&obj->count);
+                        lock->l_ast_data = lmv_get_obj(obj);
                         LDLM_LOCK_PUT(lock);
                 }
 
@@ -731,13 +744,13 @@ release_lock:
                        (unsigned long) size);
                 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
                 LASSERT(body);
+
                 /* FIXME: what about another attributes? */
                 body->size = size;
                 if (mreq == NULL) {
-                        /* very important to maintain lli->mds the same
-                         * because of revalidation. mreq == NULL means
-                         * that caller has no reply and the only attr
-                         * we can return is size */
+                        /* very important to maintain lli->mds the same because
+                         * of revalidation. mreq == NULL means that caller has
+                         * no reply and the only attr we can return is size */
                         body->valid = OBD_MD_FLSIZE;
                         body->mds = obj->fid.mds;
                 }
@@ -755,6 +768,7 @@ release_lock:
                 rc = 1;
         }
 cleanup:
+        if (obj)
+                lmv_put_obj(obj);
         RETURN(rc);
 }
-
index c56cae2..a8eb88c 100644 (file)
@@ -22,10 +22,20 @@ struct lmv_obj {
 };
 
 int lmv_dirobj_blocking_ast(struct ldlm_lock *,
-                            struct ldlm_lock_desc *, void *, int);
-struct lmv_obj *lmv_grab_obj(struct obd_device *obd,
-                             struct ll_fid *fid, int create);
+                            struct ldlm_lock_desc *,
+                           void *, int);
+
 void lmv_put_obj(struct lmv_obj *obj);
+struct lmv_obj *lmv_get_obj(struct lmv_obj *obj);
+
+int lmv_setup_mgr(struct obd_device *obd);
+void lmv_cleanup_mgr(struct obd_device *obd);
+
+struct lmv_obj *lmv_grab_obj(struct obd_device *obd,
+                            struct ll_fid *fid);
+
+int lmv_create_obj(struct obd_export *exp, struct ll_fid *fid,
+                  struct mea *mea);
 
 int lmv_intent_lock(struct obd_export *, struct ll_uctxt *,
                     struct ll_fid *, const char *, int, void *, int,
@@ -43,13 +53,12 @@ int lmv_intent_open(struct obd_export *, struct ll_uctxt *,
                     struct ll_fid *, const char *, int, void *, int,
                    struct ll_fid *, struct lookup_intent *, int,
                    struct ptlrpc_request **, ldlm_blocking_callback);
-int lmv_create_obj_from_attrs(struct obd_export *, struct ll_fid *,
-                               struct mea *);
+
 int lmv_check_connect(struct obd_device *obd);
 int lmv_revalidate_slaves(struct obd_export *, struct ptlrpc_request **,
                           struct ll_fid *, struct lookup_intent *, int,
                          ldlm_blocking_callback cb_blocking);
-void lmv_cleanup_objs(struct obd_device *obd);
+
 int lmv_get_mea_and_update_object(struct obd_export *, struct ll_fid *);
 
 static inline struct mea * 
index b9c4825..7ef1329 100644 (file)
@@ -416,13 +416,12 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
 
 static int lmv_setup(struct obd_device *obd, obd_count len, void *buf)
 {
-        struct lustre_cfg *lcfg = buf;
+        int i, rc = 0;
         struct lmv_desc *desc;
-        struct lmv_obd *lmv = &obd->u.lmv;
         struct obd_uuid *uuids;
         struct lmv_tgt_desc *tgts;
-        int i;
-        int rc = 0;
+        struct lustre_cfg *lcfg = buf;
+        struct lmv_obd *lmv = &obd->u.lmv;
         ENTRY;
 
         if (lcfg->lcfg_inllen1 < 1) {
@@ -453,7 +452,7 @@ static int lmv_setup(struct obd_device *obd, obd_count len, void *buf)
         OBD_ALLOC(lmv->tgts, lmv->bufsize);
         if (lmv->tgts == NULL) {
                 CERROR("Out of memory\n");
-                RETURN(-EINVAL);
+                RETURN(-ENOMEM);
         }
 
         lmv->desc = *desc;
@@ -462,10 +461,17 @@ static int lmv_setup(struct obd_device *obd, obd_count len, void *buf)
         for (i = 0, tgts = lmv->tgts; i < desc->ld_tgt_count; i++, tgts++)
                 tgts->uuid = uuids[i];
         
+        lmv->max_cookiesize = 0;
+
         lmv->max_easize = sizeof(struct ll_fid) *
                 desc->ld_tgt_count + sizeof(struct mea);
         
-        lmv->max_cookiesize = 0;
+        rc = lmv_setup_mgr(obd);
+        if (rc) {
+                CERROR("Can't setup LMV object manager, "
+                       "error %d.\n", rc);
+                OBD_FREE(lmv->tgts, lmv->bufsize);
+        }
 
         RETURN(rc);
 }
@@ -505,7 +511,7 @@ static int lmv_cleanup(struct obd_device *obd, int flags)
 {
         struct lmv_obd *lmv = &obd->u.lmv;
         ENTRY;
-        lmv_cleanup_objs(obd);
+        lmv_cleanup_mgr(obd);
         OBD_FREE(lmv->tgts, lmv->bufsize);
         RETURN(0);
 }
@@ -525,39 +531,42 @@ static int lmv_getstatus(struct obd_export *exp, struct ll_fid *fid)
 }
 
 static int lmv_getattr(struct obd_export *exp, struct ll_fid *fid,
-                unsigned long valid, unsigned int ea_size,
-                struct ptlrpc_request **request)
+                       unsigned long valid, unsigned int ea_size,
+                       struct ptlrpc_request **request)
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
         int rc, i = fid->mds;
         struct lmv_obj *obj;
         ENTRY;
+
         rc = lmv_check_connect(obd);
         if (rc)
                 RETURN(rc);
-        obj = lmv_grab_obj(obd, fid, 0);
+
+        obj = lmv_grab_obj(obd, fid);
+        
         CDEBUG(D_OTHER, "GETATTR for %lu/%lu/%lu %s\n",
-               (unsigned long) fid->mds,
-               (unsigned long) fid->id,
-               (unsigned long) fid->generation,
-               obj ? "(splitted)" : "");
+               (unsigned long)fid->mds, (unsigned long)fid->id,
+               (unsigned long)fid->generation, obj ? "(splitted)" : "");
 
         LASSERT(fid->mds < lmv->desc.ld_tgt_count);
         rc = md_getattr(lmv->tgts[i].ltd_exp, fid,
-                             valid, ea_size, request);
+                        valid, ea_size, request);
         if (rc == 0 && obj) {
-                /* we have to loop over dirobjs here and gather attrs
-                 * for all the slaves */
+                /* we have to loop over dirobjs here and gather attrs for all
+                 * the slaves. */
 #warning "attrs gathering here"
         }
-        lmv_put_obj(obj);
+
+        if (obj)
+                lmv_put_obj(obj);
+        
         RETURN(rc);
 }
 
-static int lmv_change_cbdata(struct obd_export *exp,
-                                 struct ll_fid *fid, 
-                                 ldlm_iterator_t it, void *data)
+static int lmv_change_cbdata(struct obd_export *exp, struct ll_fid *fid, 
+                             ldlm_iterator_t it, void *data)
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
@@ -567,12 +576,16 @@ static int lmv_change_cbdata(struct obd_export *exp,
         rc = lmv_check_connect(obd);
         if (rc)
                 RETURN(rc);
+        
         CDEBUG(D_OTHER, "CBDATA for %lu/%lu/%lu\n",
-               (unsigned long) fid->mds,
-               (unsigned long) fid->id,
+               (unsigned long) fid->mds, (unsigned long) fid->id,
                (unsigned long) fid->generation);
+        
         LASSERT(fid->mds < lmv->desc.ld_tgt_count);
-        rc = md_change_cbdata(lmv->tgts[fid->mds].ltd_exp, fid, it, data);
+
+        rc = md_change_cbdata(lmv->tgts[fid->mds].ltd_exp,
+                              fid, it, data);
+        
         RETURN(rc);
 }
 
@@ -598,7 +611,7 @@ static int lmv_change_cbdata_name(struct obd_export *exp, struct ll_fid *pfid,
 
         /* this is default mds for directory name belongs to */
         mds = pfid->mds;
-        obj = lmv_grab_obj(obd, pfid, 0);
+        obj = lmv_grab_obj(obd, pfid);
         if (obj) {
                 /* directory is splitted. look for right mds for this name */
                 mds = raw_name2idx(obj->objcount, name, len);
@@ -673,8 +686,8 @@ int lmv_get_mea_and_update_object(struct obd_export *exp, struct ll_fid *fid)
         if (md.mea == NULL)
                 GOTO(cleanup, rc = -ENODATA);
 
-        rc = lmv_create_obj_from_attrs(exp, fid, md.mea);
-        obd_free_memmd(exp, (struct lov_stripe_md **) &md.mea);
+        rc = lmv_create_obj(exp, fid, md.mea);
+        obd_free_memmd(exp, (struct lov_stripe_md **)&md.mea);
 
 cleanup:
         if (req)
@@ -700,7 +713,7 @@ int lmv_create(struct obd_export *exp, struct mdc_op_data *op_data,
         if (!lmv->desc.ld_active_tgt_count)
                 RETURN(-EIO);
 repeat:
-        obj = lmv_grab_obj(obd, &op_data->fid1, 0);
+        obj = lmv_grab_obj(obd, &op_data->fid1);
         if (obj) {
                 mds = raw_name2idx(obj->objcount, op_data->name,
                                    op_data->namelen);
@@ -777,13 +790,15 @@ int lmv_enqueue_slaves(struct obd_export *exp, int locktype,
                 memset(&data2, 0, sizeof(data2));
                 data2.fid1 = mea->mea_fids[i];
                 mds = data2.fid1.mds;
+                
                 rc = md_enqueue(lmv->tgts[mds].ltd_exp, locktype, it, lockmode,
                                 &data2, lockh + i, lmm, lmmsize, cb_completion,
                                 cb_blocking, cb_data);
+                
                 CDEBUG(D_OTHER, "take lock on slave %lu/%lu/%lu -> %d/%d\n",
-                       (unsigned long) mea->mea_fids[i].mds,
-                       (unsigned long) mea->mea_fids[i].id,
-                       (unsigned long) mea->mea_fids[i].generation,
+                       (unsigned long)mea->mea_fids[i].mds,
+                       (unsigned long)mea->mea_fids[i].id,
+                       (unsigned long)mea->mea_fids[i].generation,
                        rc, it->d.lustre.it_status);
                 if (rc)
                         GOTO(cleanup, rc);
@@ -833,7 +848,7 @@ int lmv_enqueue(struct obd_export *exp, int lock_type,
         }
 
         if (data->namelen) {
-                obj = lmv_grab_obj(obd, &data->fid1, 0);
+                obj = lmv_grab_obj(obd, &data->fid1);
                 if (obj) {
                         /* directory is splitted. look for
                          * right mds for this name */
@@ -868,7 +883,7 @@ int lmv_getattr_name(struct obd_export *exp, struct ll_fid *fid,
        if (rc)
                RETURN(rc);
 repeat:
-        obj = lmv_grab_obj(obd, fid, 0);
+        obj = lmv_grab_obj(obd, fid);
         if (obj) {
                 /* directory is splitted. look for right mds for this name */
                 mds = raw_name2idx(obj->objcount, filename, namelen - 1);
@@ -930,7 +945,7 @@ int lmv_link(struct obd_export *exp, struct mdc_op_data *data,
                RETURN(rc);
         if (data->namelen != 0) {
                 /* usual link request */
-                obj = lmv_grab_obj(obd, &data->fid1, 0);
+                obj = lmv_grab_obj(obd, &data->fid1);
                 if (obj) {
                         rc = raw_name2idx(obj->objcount, data->name,
                                           data->namelen);
@@ -1000,7 +1015,7 @@ int lmv_rename(struct obd_export *exp, struct mdc_op_data *data,
                 goto request;
         }
 
-        obj = lmv_grab_obj(obd, &data->fid1, 0);
+        obj = lmv_grab_obj(obd, &data->fid1);
         if (obj) {
                 /* directory is already splitted, so we have to forward
                  * request to the right MDS */
@@ -1010,10 +1025,10 @@ int lmv_rename(struct obd_export *exp, struct mdc_op_data *data,
                        (unsigned long) obj->objs[mds].fid.mds,
                        (unsigned long) obj->objs[mds].fid.id,
                        (unsigned long) obj->objs[mds].fid.generation);
+                lmv_put_obj(obj);
         }
-        lmv_put_obj(obj);
 
-        obj = lmv_grab_obj(obd, &data->fid2, 0);
+        obj = lmv_grab_obj(obd, &data->fid2);
         if (obj) {
                 /* directory is already splitted, so we have to forward
                  * request to the right MDS */
@@ -1023,8 +1038,8 @@ int lmv_rename(struct obd_export *exp, struct mdc_op_data *data,
                        (unsigned long) obj->objs[mds].fid.mds,
                        (unsigned long) obj->objs[mds].fid.id,
                        (unsigned long) obj->objs[mds].fid.generation);
+                lmv_put_obj(obj);
         }
-        lmv_put_obj(obj);
         
         mds = data->fid1.mds;
 
@@ -1035,8 +1050,8 @@ request:
 }
 
 int lmv_setattr(struct obd_export *exp, struct mdc_op_data *data,
-                struct iattr *iattr, void *ea, int ealen, void *ea2, int ea2len,
-                struct ptlrpc_request **request)
+                struct iattr *iattr, void *ea, int ealen, void *ea2,
+                int ea2len, struct ptlrpc_request **request)
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
@@ -1050,7 +1065,7 @@ int lmv_setattr(struct obd_export *exp, struct mdc_op_data *data,
        if (rc)
                RETURN(rc);
 
-        obj = lmv_grab_obj(obd, &data->fid1, 0);
+        obj = lmv_grab_obj(obd, &data->fid1);
         CDEBUG(D_OTHER, "SETATTR for %lu/%lu/%lu, valid 0x%x%s\n",
                (unsigned long) data->fid1.mds,
                (unsigned long) data->fid1.id,
@@ -1101,8 +1116,8 @@ int lmv_sync(struct obd_export *exp, struct ll_fid *fid,
         RETURN(rc);
 }
 
-int lmv_dirobj_blocking_ast(struct ldlm_lock *lock,
-                            struct ldlm_lock_desc *desc, void *data, int flag)
+int lmv_dirobj_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+                            void *data, int flag)
 {
         struct lustre_handle lockh;
         struct lmv_obj *obj;
@@ -1121,17 +1136,15 @@ int lmv_dirobj_blocking_ast(struct ldlm_lock *lock,
         case LDLM_CB_CANCELING:
                 /* time to drop cached attrs for dirobj */
                 obj = lock->l_ast_data;
-                if (!obj)
-                        break;
-
-                CDEBUG(D_OTHER, "cancel %s on %lu/%lu, master %lu/%lu/%lu\n",
-                       lock->l_resource->lr_name.name[3] == 1 ?
-                                "LOOKUP" : "UPDATE",
-                       (unsigned long) lock->l_resource->lr_name.name[0],
-                       (unsigned long) lock->l_resource->lr_name.name[1],
-                       (unsigned long) obj->fid.mds,
-                       (unsigned long) obj->fid.id,
-                       (unsigned long) obj->fid.generation);
+                if (obj) {
+                        CDEBUG(D_OTHER, "cancel %s on %lu/%lu, master %lu/%lu/%lu\n",
+                               lock->l_resource->lr_name.name[3] == 1 ? "LOOKUP" : "UPDATE",
+                               (unsigned long)lock->l_resource->lr_name.name[0],
+                               (unsigned long)lock->l_resource->lr_name.name[1],
+                               (unsigned long)obj->fid.mds, (unsigned long)obj->fid.id,
+                               (unsigned long)obj->fid.generation);
+                        lmv_put_obj(obj);
+                }
                 break;
         default:
                 LBUG();
@@ -1177,7 +1190,7 @@ int lmv_readpage(struct obd_export *exp, struct ll_fid *mdc_fid,
                (unsigned long) rfid.id,
                (unsigned long) rfid.generation);
 
-        obj = lmv_grab_obj(obd, mdc_fid, 0);
+        obj = lmv_grab_obj(obd, mdc_fid);
         if (obj) {
                 /* find dirobj containing page with requested offset */
                 /* FIXME: what about protecting cached attrs here? */
@@ -1187,22 +1200,21 @@ int lmv_readpage(struct obd_export *exp, struct ll_fid *mdc_fid,
                         offset -= obj->objs[i].size;
                 }
                 rfid = obj->objs[i].fid;
+                lmv_put_obj(obj);
+                
                 CDEBUG(D_OTHER, "forward to %lu/%lu/%lu with offset %lu\n",
-                       (unsigned long) rfid.mds,
-                       (unsigned long) rfid.id,
-                       (unsigned long) rfid.generation,
-                       (unsigned long) offset);
+                       (unsigned long)rfid.mds, (unsigned long)rfid.id,
+                       (unsigned long)rfid.generation, (unsigned long)offset);
         }
-        rc = md_readpage(lmv->tgts[rfid.mds].ltd_exp, &rfid, offset, page, request);
-        if (rc == 0 && !fid_equal(&rfid, mdc_fid)) {
-                /* this page isn't from master object. to avoid
-                 * ./.. duplication in directory, we have to remove them
-                 * from all slave objects */
+        rc = md_readpage(lmv->tgts[rfid.mds].ltd_exp, &rfid, offset,
+                         page, request);
+        
+        if (rc == 0 && !fid_equal(&rfid, mdc_fid))
+                /* this page isn't from master object. To avoid "." and ".." 
+                 * duplication in directory, we have to remove them from all
+                 * slave objects */
                 lmv_remove_dots(page);
-        }
-      
-        lmv_put_obj(obj);
-
+        
         RETURN(rc);
 }
 
@@ -1258,7 +1270,8 @@ int lmv_unlink(struct obd_export *exp, struct mdc_op_data *data,
                 RETURN(rc);
         } else if (data->namelen != 0) {
                 struct lmv_obj *obj;
-                obj = lmv_grab_obj(obd, &data->fid1, 0);
+                
+                obj = lmv_grab_obj(obd, &data->fid1);
                 if (obj) {
                         i = raw_name2idx(obj->objcount, data->name,
                                          data->namelen);
@@ -1553,7 +1566,7 @@ int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
 }
 
 int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **mem_tgt,
-                        struct lov_mds_md *disk_src, int mdsize)
+                 struct lov_mds_md *disk_src, int mdsize)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct lmv_obd *lmv = &obd->u.lmv;
@@ -1587,8 +1600,8 @@ int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **mem_tgt,
 }
 
 int lmv_brw(int rw, struct obd_export *exp, struct obdo *oa,
-                struct lov_stripe_md *ea, obd_count oa_bufs,
-                struct brw_page *pgarr, struct obd_trans_info *oti)
+            struct lov_stripe_md *ea, obd_count oa_bufs,
+            struct brw_page *pgarr, struct obd_trans_info *oti)
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
index 1b1c9cd..c395748 100644 (file)
 #include <linux/obd_lmv.h>
 #include "lmv_internal.h"
 
-
 LIST_HEAD(lmv_obj_list);
 spinlock_t lmv_obj_list_lock = SPIN_LOCK_UNLOCKED;
 
-struct lmv_obj *lmv_grab_obj(struct obd_device *obd,
-                             struct ll_fid *fid, int create)
+/* creates new obj on passed @fid and @mea. */
+static struct lmv_obj *
+__lmv_alloc_obj(struct obd_device *obd, struct ll_fid *fid,
+                struct mea *mea)
 {
+        int i;
+        struct lmv_obj *obj;
+        unsigned int obj_size;
         struct lmv_obd *lmv = &obd->u.lmv;
-        struct list_head *cur;
-        struct lmv_obj *obj, *obj2;
-
-        spin_lock(&lmv_obj_list_lock);
-        list_for_each(cur, &lmv_obj_list) {
-                obj = list_entry(cur, struct lmv_obj, list);
-                if (obj->fid.mds == fid->mds && obj->fid.id == fid->id &&
-                                obj->fid.generation == fid->generation) {
-                        atomic_inc(&obj->count);
-                        spin_unlock(&lmv_obj_list_lock);
-                        RETURN(obj);
-                }
-        }
-        spin_unlock(&lmv_obj_list_lock);
 
-        if (!create)
-                RETURN(NULL);
-
-        /* no such object yet, allocate and initialize them */
         OBD_ALLOC(obj, sizeof(*obj));
         if (!obj)
-                RETURN(NULL);
-        atomic_set(&obj->count, 0);
-        obj->fid = *fid;
+                return NULL;
+
         obj->obd = obd;
+        obj->fid = *fid;
+          
+        atomic_set(&obj->count, 0);
+        obj->objcount = mea->mea_count;
 
-        OBD_ALLOC(obj->objs, sizeof(struct lmv_inode) * lmv->desc.ld_tgt_count);
-        if (!obj->objs) {
-                OBD_FREE(obj, sizeof(*obj));
-                RETURN(NULL);
-        }
-        memset(obj->objs, 0,  sizeof(struct lmv_inode) * lmv->desc.ld_tgt_count);
+        obj_size = sizeof(struct lmv_inode) *
+                lmv->desc.ld_tgt_count;
+        
+        OBD_ALLOC(obj->objs, obj_size);
+        if (!obj->objs)
+                goto err_obj;
 
-        spin_lock(&lmv_obj_list_lock);
-        list_for_each(cur, &lmv_obj_list) {
-                obj2 = list_entry(cur, struct lmv_obj, list);
-                if (obj2->fid.mds == fid->mds && obj2->fid.id == fid->id &&
-                                obj2->fid.generation == fid->generation) {
-                        /* someone created it already */
-                        OBD_FREE(obj->objs,
-                                  sizeof(struct lmv_inode) * lmv->desc.ld_tgt_count);
-                        OBD_FREE(obj, sizeof(*obj));
-
-                        atomic_inc(&obj2->count);
-                        spin_unlock(&lmv_obj_list_lock);
-                        RETURN(obj2);
-                }
+        memset(obj->objs, 0, obj_size);
+
+        /* put all fids in */
+        for (i = 0; i < mea->mea_count; i++) {
+                CDEBUG(D_OTHER, "subobj %lu/%lu/%lu\n",
+                       (unsigned long)mea->mea_fids[i].mds,
+                       (unsigned long)mea->mea_fids[i].id,
+                       (unsigned long)mea->mea_fids[i].generation);
+                obj->objs[i].fid = mea->mea_fids[i];
         }
-        list_add(&obj->list, &lmv_obj_list);
-        CDEBUG(D_OTHER, "new obj in lmv cache: %lu/%lu/%lu\n",
-               (unsigned long) fid->mds, (unsigned long) fid->id,
-               (unsigned long) fid->generation);
-        spin_unlock(&lmv_obj_list_lock);
 
-        RETURN(obj);
+        return obj;
         
+err_obj:
+        OBD_FREE(obj, sizeof(*obj));
+        return NULL;
 }
 
-void lmv_put_obj(struct lmv_obj *obj)
+/* destroys passed @obj. */
+static void
+__lmv_free_obj(struct lmv_obj *obj)
 {
-        if (!obj)
-                return;
+        unsigned int obj_size;
+        struct lmv_obd *lmv = &obj->obd->u.lmv;
+        
+        obj_size = sizeof(struct lmv_inode) *
+                lmv->desc.ld_tgt_count;
+        
+        OBD_FREE(obj->objs, obj_size);
+        OBD_FREE(obj, sizeof(*obj));
+}
+
+struct lmv_obj *
+lmv_get_obj(struct lmv_obj *obj)
+{
+        LASSERT(obj);
+        atomic_inc(&obj->count);
+        return obj;
+}
+
+void
+lmv_put_obj(struct lmv_obj *obj)
+{
+        LASSERT(obj);
+
         if (atomic_dec_and_test(&obj->count)) {
-                CDEBUG(D_OTHER, "last reference to %lu/%lu/%lu\n",
-                       (unsigned long) obj->fid.mds,
-                       (unsigned long) obj->fid.id,
-                       (unsigned long) obj->fid.generation);
+                struct ll_fid *fid = &obj->fid;
+                CDEBUG(D_OTHER, "last reference to %lu/%lu/%lu - destroying\n",
+                       (unsigned long)fid->mds, (unsigned long)fid->id,
+                       (unsigned long)fid->generation);
+                __lmv_free_obj(obj);
         }
 }
 
-void lmv_cleanup_objs(struct obd_device *obd)
+static struct lmv_obj *
+__lmv_grab_obj(struct obd_device *obd, struct ll_fid *fid)
 {
-        struct lmv_obd *lmv = &obd->u.lmv;
-        struct list_head *cur, *tmp;
         struct lmv_obj *obj;
+        struct list_head *cur;
 
-        spin_lock(&lmv_obj_list_lock);
-        list_for_each_safe(cur, tmp, &lmv_obj_list) {
+        list_for_each(cur, &lmv_obj_list) {
                 obj = list_entry(cur, struct lmv_obj, list);
-                if (obj->obd != obd)
-                        continue;
+                if (fid_equal(&obj->fid, fid))
+                        return lmv_get_obj(obj);
+        }
+        return NULL;
+}
 
-                list_del(&obj->list);
-                OBD_FREE(obj->objs,
-                         sizeof(struct lmv_inode) * lmv->desc.ld_tgt_count);
-                OBD_FREE(obj, sizeof(*obj));
+struct lmv_obj *
+lmv_grab_obj(struct obd_device *obd, struct ll_fid *fid)
+{
+        struct lmv_obj *obj;
+        ENTRY;
+        
+        spin_lock(&lmv_obj_list_lock);
+        obj = __lmv_grab_obj(obd, fid);
+        spin_unlock(&lmv_obj_list_lock);
+        
+        RETURN(obj);
+}
+
+/* looks in objects list for an object that matches passed @fid. If it is not
+ * found -- creates it using passed @mea and puts to list. */
+static struct lmv_obj *
+__lmv_create_obj(struct obd_device *obd, struct ll_fid *fid,
+                 struct mea *mea)
+{
+        struct lmv_obj *obj, *cobj;
+        ENTRY;
+
+        obj = lmv_grab_obj(obd, fid);
+        if (obj)
+                RETURN(obj);
+
+        /* no such object yet, allocate and initialize them. */
+        obj = __lmv_alloc_obj(obd, fid, mea);
+        if (!obj)
+                RETURN(NULL);
+
+        /* check if someone create it already while we were dealing with
+         * allocating @obj. */
+        spin_lock(&lmv_obj_list_lock);
+        cobj = __lmv_grab_obj(obd, fid);
+        if (cobj) {
+                /* someone created it already - put @obj and getting out. */
+                __lmv_free_obj(obj);
+                spin_unlock(&lmv_obj_list_lock);
+                RETURN(cobj);
         }
+
+        /* object is referenced by list and thus should have additional
+         * reference counted. */
+        lmv_get_obj(obj);
+        list_add(&obj->list, &lmv_obj_list);
         spin_unlock(&lmv_obj_list_lock);
+
+        CDEBUG(D_OTHER, "new obj in lmv cache: %lu/%lu/%lu\n",
+               (unsigned long)fid->mds, (unsigned long)fid->id,
+               (unsigned long)fid->generation);
+
+        RETURN(lmv_get_obj(obj));
+        
 }
 
-int lmv_create_obj_from_attrs(struct obd_export *exp,
-                              struct ll_fid *fid, struct mea *mea)
+int
+lmv_create_obj(struct obd_export *exp,
+               struct ll_fid *fid, struct mea *mea)
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
@@ -158,8 +215,8 @@ int lmv_create_obj_from_attrs(struct obd_export *exp,
         ENTRY;
 
         CDEBUG(D_OTHER, "get mea for %lu/%lu/%lu and create lmv obj\n",
-               (unsigned long) fid->mds, (unsigned long) fid->id,
-               (unsigned long) fid->generation);
+               (unsigned long)fid->mds, (unsigned long)fid->id,
+               (unsigned long)fid->generation);
 
         if (!mea) {
                 unsigned long valid;
@@ -175,41 +232,73 @@ int lmv_create_obj_from_attrs(struct obd_export *exp,
                 rc = md_getattr(lmv->tgts[fid->mds].ltd_exp, fid,
                                 valid, mealen, &req);
                 if (rc) {
-                        CERROR("md_getattr() failed, rc = %d\n", rc);
+                        CERROR("md_getattr() failed, error %d\n", rc);
                         GOTO(cleanup, rc);
                 }
 
                 rc = mdc_req2lustre_md(exp, req, 0, NULL, &md);
                 if (rc) {
-                        CERROR("mdc_req2lustre_md() failed, rc = %d\n", rc);
+                        CERROR("mdc_req2lustre_md() failed, error %d\n", rc);
                         GOTO(cleanup, rc);
                 }
 
-                if (md.mea == NULL)
+                if (!md.mea)
                         GOTO(cleanup, rc = -ENODATA);
                         
                 mea = md.mea;
         }
 
-        /* got mea, now create obj for it */
-        obj = lmv_grab_obj(obd, fid, 1);
-        if (!obj)
+        /* got mea, now create obj for it. */
+        obj = __lmv_create_obj(obd, fid, mea);
+        if (!obj) {
+                CERROR("Can't create new object %lu/%lu/%lu\n",
+                       (unsigned long)fid->mds, (unsigned long)fid->id,
+                       (unsigned long)fid->generation);
                 GOTO(cleanup, rc = -ENOMEM);
-
-        obj->objcount = mea->mea_count;
-        /* put all fids in */
-        for (i = 0; i < mea->mea_count; i++) {
-                CDEBUG(D_OTHER, "subobj %lu/%lu/%lu\n",
-                       (unsigned long) mea->mea_fids[i].mds,
-                       (unsigned long) mea->mea_fids[i].id,
-                       (unsigned long) mea->mea_fids[i].generation);
-                obj->objs[i].fid = mea->mea_fids[i];
-        }
-
+        } else
+                lmv_put_obj(obj);
 cleanup:
         if (req)       
                 ptlrpc_req_finished(req);
         RETURN(rc); 
 }
 
+int
+lmv_setup_mgr(struct obd_device *obd)
+{
+        CWARN("LMV object manager setup\n");
+        return 0;
+}
+
+void
+lmv_cleanup_mgr(struct obd_device *obd)
+{
+        struct lmv_obj *obj;
+        struct list_head *cur, *tmp;
 
+        CWARN("LMV object manager cleanup\n");
+        
+        spin_lock(&lmv_obj_list_lock);
+        list_for_each_safe(cur, tmp, &lmv_obj_list) {
+                obj = list_entry(cur, struct lmv_obj, list);
+                
+                if (obj->obd != obd)
+                        continue;
+
+                list_del(&obj->list);
+
+                if (atomic_read(&obj->count) > 1) {
+                        struct ll_fid *fid = &obj->fid;
+                        
+                        CERROR("Object %lu/%lu/%lu has invalid ref count %d\n",
+                               (unsigned long)fid->mds, (unsigned long)fid->id,
+                               (unsigned long)fid->generation,
+                               atomic_read(&obj->count));
+                }
+        
+                /* list does not use object anymore, ref counter should be
+                 * descreased. */
+                lmv_put_obj(obj);
+        }
+        spin_unlock(&lmv_obj_list_lock);
+}