Whamcloud - gitweb
- added mutex to lmv_obj to protect slaves attrs.
authoryury <yury>
Sat, 19 Jun 2004 17:15:26 +0000 (17:15 +0000)
committeryury <yury>
Sat, 19 Jun 2004 17:15:26 +0000 (17:15 +0000)
- changed semantic of lmv_create_obj(). By now it
returns created object, as mostly this is what all expect from it.

lustre/lmv/lmv_intent.c
lustre/lmv/lmv_internal.h
lustre/lmv/lmv_obd.c
lustre/lmv/lmv_objmgr.c

index 3925c8a..bf3f7b3 100644 (file)
@@ -181,13 +181,11 @@ repeat:
 
         cfid = &body->fid1;
         obj = lmv_grab_obj(obd, cfid);
-        if (!obj && (mea = is_body_of_splitted_dir(*reqp, 1))) {
+        if (!obj && (mea = body_of_splitted_dir(*reqp, 1))) {
                 /* wow! this is splitted dir, we'd like to handle it */
-                rc = lmv_create_obj(exp, &body->fid1, mea);
-                if (rc)
-                        RETURN(rc);
-                
-                obj = lmv_grab_obj(obd, cfid);
+                obj = lmv_create_obj(exp, &body->fid1, mea);
+                if (IS_ERR(obj))
+                        RETURN(PTR_ERR(obj));
         }
 
         if (obj) {
@@ -301,16 +299,14 @@ int lmv_intent_getattr(struct obd_export *exp, struct ll_uctxt *uctxt,
         cfid = &body->fid1;
         obj2 = lmv_grab_obj(obd, cfid);
 
-        if (!obj2 && (mea = is_body_of_splitted_dir(*reqp, 1))) {
+        if (!obj2 && (mea = body_of_splitted_dir(*reqp, 1))) {
                 /* wow! this is splitted dir, we'd like to handle it. */
                 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
                 LASSERT(body != NULL);
 
-                rc = lmv_create_obj(exp, &body->fid1, mea);
-                if (rc)
-                        RETURN(rc);
-                
-                obj2 = lmv_grab_obj(obd, cfid);
+                obj2 = lmv_create_obj(exp, &body->fid1, mea);
+                if (IS_ERR(obj2))
+                        RETURN(PTR_ERR(obj2));
         }
 
         if (obj2) {
@@ -366,7 +362,7 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
         LASSERT(body != NULL);
 
         obj = lmv_grab_obj(obd, &body->fid1);
-        LASSERT(obj);
+        LASSERT(obj != NULL);
 
         CDEBUG(D_OTHER, "lookup slaves for %lu/%lu/%lu\n",
                (unsigned long)body->fid1.mds,
@@ -375,6 +371,8 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
 
         uctxt.gid1 = 0;
         uctxt.gid2 = 0;
+
+        lmv_lock_obj(obj);
         
         for (i = 0; i < obj->objcount; i++) {
                 struct ll_fid fid = obj->objs[i].fid;
@@ -395,6 +393,7 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
                 rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid,
                                     NULL, 0, NULL, 0, &fid, &it, 0, &req,
                                     lmv_dirobj_blocking_ast);
+                
                 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
                 if (rc > 0) {
                         /* nice, this slave is valid */
@@ -411,6 +410,7 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
                  * obj is still valid. lookup it again */
                 LASSERT(req == NULL);
                 req = NULL;
+
                 memset(&it, 0, sizeof(it));
                 it.it_op = IT_GETATTR;
                 rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid,
@@ -434,7 +434,7 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
 
                 obj->objs[i].size = body2->size;
                 CDEBUG(D_OTHER, "fresh: %lu\n",
-                       (unsigned long) obj->objs[i].size);
+                       (unsigned long)obj->objs[i].size);
 
                 LDLM_LOCK_PUT(lock);
 
@@ -442,12 +442,13 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
                         ptlrpc_req_finished(req);
 release_lock:
                 lmv_update_body_from_obj(body, obj->objs + i);
+
                 if (it.d.lustre.it_lock_mode)
                         ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
         }
 cleanup:
-        if (obj)
-                lmv_put_obj(obj);
+        lmv_unlock_obj(obj);
+        lmv_put_obj(obj);
         RETURN(rc);
 }
 
@@ -530,34 +531,40 @@ repeat:
         }
        
         if (rc == -ERESTART) {
-                /* directory got splitted since last update. this shouldn't
-                 * be becasue splitting causes lock revocation, so revalidate
-                 * had to fail and lookup on dir had to return mea */
+                /* directory got splitted since last update. this shouldn't be
+                 * becasue splitting causes lock revocation, so revalidate had
+                 * to fail and lookup on dir had to return mea */
                 CWARN("we haven't knew about directory splitting!\n");
                 LASSERT(obj == NULL);
-                rc = lmv_create_obj(exp, &rpfid, NULL);
-                if (rc)
-                        RETURN(rc);
+
+                obj = lmv_create_obj(exp, &rpfid, NULL);
+                if (IS_ERR(obj))
+                        RETURN(PTR_ERR(obj));
+                
                 goto repeat;
         }
 
         if (rc < 0)
                 RETURN(rc);
 
-        /* okay, MDS has returned success. probably name has been
-         * resolved in remote inode */
+        /* okay, MDS has returned success. probably name has been resolved in
+         * remote inode */
         rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags,
                                      reqp, cb_blocking);
 
-        if (rc == 0 && (mea = is_body_of_splitted_dir(*reqp, 1))) {
+        if (rc == 0 && (mea = body_of_splitted_dir(*reqp, 1))) {
                 /* wow! this is splitted dir, we'd like to handle it */
                 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
                 LASSERT(body != NULL);
                 
-                if ((obj = lmv_grab_obj(obd, &body->fid1)))
+                obj = lmv_grab_obj(obd, &body->fid1);
+                if (!obj) {
+                        obj = lmv_create_obj(exp, &body->fid1, mea);
+                        if (IS_ERR(obj))
+                                RETURN(PTR_ERR(obj));
+                } else {
                         lmv_put_obj(obj);
-                else
-                        rc = lmv_create_obj(exp, &body->fid1, mea);
+                }
         }
 
         RETURN(rc);
@@ -625,11 +632,13 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
          * need not to be update, another fields (i_size, for example) are
          * cached all the time */
         obj = lmv_grab_obj(obd, mfid);
-        LASSERT(obj);
+        LASSERT(obj != NULL);
 
         uctxt.gid1 = 0;
         uctxt.gid2 = 0;
         master_lock_mode = 0;
+
+        lmv_lock_obj(obj);
         
         for (i = 0; i < obj->objcount; i++) {
                 struct ll_fid fid = obj->objs[i].fid;
@@ -687,12 +696,14 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
                  * obj is still valid. lookup it again */
                 LASSERT(req == NULL);
                 req = NULL;
+                
                 memset(&it, 0, sizeof(it));
                 it.it_op = IT_GETATTR;
                 rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid,
                                     NULL, 0, NULL, 0, NULL, &it, 0, &req, cb);
                 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
                 LASSERT(rc <= 0);
+
                 if (rc < 0)
                         /* error during lookup */
                         GOTO(cleanup, rc);
@@ -727,7 +738,7 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
 update:
                 obj->objs[i].size = body->size;
                 CDEBUG(D_OTHER, "fresh: %lu\n",
-                       (unsigned long) obj->objs[i].size);
+                       (unsigned long)obj->objs[i].size);
 
                 if (req)
                         ptlrpc_req_finished(req);
@@ -738,8 +749,8 @@ release_lock:
         }
 
         if (*reqp) {
-                /* some attrs got refreshed, we have reply and it's time
-                 * to put fresh attrs to it */
+                /* some attrs got refreshed, we have reply and it's time to put
+                 * fresh attrs to it */
                 CDEBUG(D_OTHER, "return refreshed attrs: size = %lu\n",
                        (unsigned long) size);
                 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
@@ -768,7 +779,7 @@ release_lock:
                 rc = 1;
         }
 cleanup:
-        if (obj)
-                lmv_put_obj(obj);
+        lmv_unlock_obj(obj);
+        lmv_put_obj(obj);
         RETURN(rc);
 }
index 382df09..d4623db 100644 (file)
@@ -1,48 +1,67 @@
 #ifndef _LMV_INTERNAL_H_
 #define _LMV_INTERNAL_H_
 
-#define LL_IT2STR(it) ((it) ? ldlm_it2str((it)->it_op) : "0")
-#define MEA_SIZE_LMV(lmv)       \
-        ((lmv)->desc.ld_tgt_count * sizeof(struct ll_fid) + sizeof(struct mea))
+#define LL_IT2STR(it)                                  \
+       ((it) ? ldlm_it2str((it)->it_op) : "0")
+
+#define MEA_SIZE_LMV(lmv)                              \
+        ((lmv)->desc.ld_tgt_count *                    \
+        sizeof(struct ll_fid) + sizeof(struct mea))
         
 struct lmv_inode {
-        struct ll_fid   fid;            /* fid of dirobj */
-        unsigned long   size;
-        int             flags;
+        struct ll_fid      fid;            /* fid of dirobj */
+        unsigned long      size;
+        int                flags;
 };
 
 struct lmv_obj {
-        struct list_head        list;
-       struct semaphore        guard;
-       int                     freeing;        /* object ig freeing. */
-        atomic_t                count;
-        struct ll_fid           fid;            /* master fid of dir */
-        void                    *update;        /* bitmap of status (uptodate) */
-        int                     objcount;
-        struct lmv_inode        *objs;          /* array of dirobjs */
-        struct obd_device       *obd;           /* pointer to LMV itself */
+        struct list_head   list;
+       struct semaphore   guard;
+       int                freeing;        /* object is freeing. */
+        atomic_t           count;
+        struct ll_fid      fid;            /* master fid of dir */
+        void               *update;        /* bitmap of status (uptodate) */
+        int                objcount;
+        struct lmv_inode   *objs;          /* array of dirobjs */
+        struct obd_device  *obd;           /* pointer to LMV itself */
 };
 
+static inline void
+lmv_lock_obj(struct lmv_obj *obj)
+{
+        LASSERT(obj);
+        down(&obj->guard);
+}
+
+static inline void
+lmv_unlock_obj(struct lmv_obj *obj)
+{
+        LASSERT(obj);
+        up(&obj->guard);
+}
+
+void lmv_add_obj(struct lmv_obj *obj);
+void lmv_del_obj(struct lmv_obj *obj);
+
+void lmv_put_obj(struct lmv_obj *obj);
+void lmv_free_obj(struct lmv_obj *obj);
+
 int lmv_setup_mgr(struct obd_device *obd);
 void lmv_cleanup_mgr(struct obd_device *obd);
 int lmv_check_connect(struct obd_device *obd);
 
-void lmv_put_obj(struct lmv_obj *obj);
 struct lmv_obj *lmv_get_obj(struct lmv_obj *obj);
 
 struct lmv_obj *lmv_grab_obj(struct obd_device *obd,
                             struct ll_fid *fid);
 
-void lmv_free_obj(struct lmv_obj *obj);
-void lmv_add_obj(struct lmv_obj *obj);
-void lmv_del_obj(struct lmv_obj *obj);
-
 struct lmv_obj *lmv_alloc_obj(struct obd_device *obd,
                              struct ll_fid *fid,
                              struct mea *mea);
 
-int lmv_create_obj(struct obd_export *exp, struct ll_fid *fid,
-                  struct mea *mea);
+struct lmv_obj *lmv_create_obj(struct obd_export *exp,
+                              struct ll_fid *fid,
+                              struct mea *mea);
 
 int lmv_destroy_obj(struct obd_export *exp, struct ll_fid *fid);
 
@@ -71,12 +90,11 @@ int lmv_revalidate_slaves(struct obd_export *, struct ptlrpc_request **,
                          ldlm_blocking_callback cb_blocking);
 
 int lmv_get_mea_and_update_object(struct obd_export *, struct ll_fid *);
-
 int lmv_dirobj_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *,
                            void *, int);
 
 static inline struct mea * 
-is_body_of_splitted_dir(struct ptlrpc_request *req, int offset)
+body_of_splitted_dir(struct ptlrpc_request *req, int offset)
 {
        struct mds_body *body;
        struct mea *mea;
@@ -88,7 +106,8 @@ is_body_of_splitted_dir(struct ptlrpc_request *req, int offset)
        if (!body || !S_ISDIR(body->mode) || !body->eadatasize)
                return NULL;
 
-        mea = lustre_msg_buf(req->rq_repmsg, offset + 1, body->eadatasize);
+        mea = lustre_msg_buf(req->rq_repmsg,
+                            offset + 1, body->eadatasize);
        LASSERT(mea);
 
        if (mea->mea_count == 0)
@@ -97,7 +116,8 @@ is_body_of_splitted_dir(struct ptlrpc_request *req, int offset)
        return mea;
 }
 
-static inline int fid_equal(struct ll_fid *fid1, struct ll_fid *fid2)
+static inline int
+fid_equal(struct ll_fid *fid1, struct ll_fid *fid2)
 {
         if (fid1->mds != fid2->mds)
                 return 0;
index 7ef1329..681893d 100644 (file)
@@ -544,23 +544,25 @@ static int lmv_getattr(struct obd_export *exp, struct ll_fid *fid,
         if (rc)
                 RETURN(rc);
 
+        LASSERT(i < lmv->desc.ld_tgt_count);
+        
+        rc = md_getattr(lmv->tgts[i].ltd_exp, fid, valid,
+                        ea_size, request);
+        if (rc)
+                RETURN(rc);
+        
         obj = lmv_grab_obj(obd, fid);
         
         CDEBUG(D_OTHER, "GETATTR for %lu/%lu/%lu %s\n",
                (unsigned long)fid->mds, (unsigned long)fid->id,
                (unsigned long)fid->generation, obj ? "(splitted)" : "");
 
-        LASSERT(fid->mds < lmv->desc.ld_tgt_count);
-        rc = md_getattr(lmv->tgts[i].ltd_exp, fid,
-                        valid, ea_size, request);
-        if (rc == 0 && obj) {
+        if (obj) {
                 /* we have to loop over dirobjs here and gather attrs for all
                  * the slaves. */
 #warning "attrs gathering here"
-        }
-
-        if (obj)
                 lmv_put_obj(obj);
+        }
         
         RETURN(rc);
 }
@@ -577,9 +579,8 @@ static int lmv_change_cbdata(struct obd_export *exp, struct ll_fid *fid,
         if (rc)
                 RETURN(rc);
         
-        CDEBUG(D_OTHER, "CBDATA for %lu/%lu/%lu\n",
-               (unsigned long) fid->mds, (unsigned long) fid->id,
-               (unsigned long) fid->generation);
+        CDEBUG(D_OTHER, "CBDATA for %lu/%lu/%lu\n", (unsigned long)fid->mds,
+               (unsigned long)fid->id, (unsigned long)fid->generation);
         
         LASSERT(fid->mds < lmv->desc.ld_tgt_count);
 
@@ -598,22 +599,25 @@ static int lmv_change_cbdata_name(struct obd_export *exp, struct ll_fid *pfid,
         struct lmv_obj *obj;
         int rc = 0, mds;
         ENTRY;
+
         rc = lmv_check_connect(obd);
         if (rc)
                 RETURN(rc);
+
         LASSERT(pfid->mds < lmv->desc.ld_tgt_count);
         LASSERT(cfid->mds < lmv->desc.ld_tgt_count);
+        
         CDEBUG(D_OTHER, "CBDATA for %lu/%lu/%lu:%*s -> %lu/%lu/%lu\n",
-               (unsigned long) pfid->mds, (unsigned long) pfid->id,
-               (unsigned long) pfid->generation, len, name,
-               (unsigned long) cfid->mds, (unsigned long) cfid->id,
-               (unsigned long) cfid->generation);
+               (unsigned long)pfid->mds, (unsigned long)pfid->id,
+               (unsigned long)pfid->generation, len, name,
+               (unsigned long)cfid->mds, (unsigned long)cfid->id,
+               (unsigned long)cfid->generation);
 
-        /* this is default mds for directory name belongs to */
+        /* this is default mds for directory name belongs to. */
         mds = pfid->mds;
         obj = lmv_grab_obj(obd, pfid);
         if (obj) {
-                /* directory is splitted. look for right mds for this name */
+                /* directory is splitted. look for right mds for this name. */
                 mds = raw_name2idx(obj->objcount, name, len);
                 lmv_put_obj(obj);
         }
@@ -660,6 +664,7 @@ int lmv_get_mea_and_update_object(struct obd_export *exp, struct ll_fid *fid)
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
         struct ptlrpc_request *req = NULL;
+        struct lmv_obj *obj;
         struct lustre_md md;
         unsigned long valid;
         int mealen, rc;
@@ -686,7 +691,10 @@ int lmv_get_mea_and_update_object(struct obd_export *exp, struct ll_fid *fid)
         if (md.mea == NULL)
                 GOTO(cleanup, rc = -ENODATA);
 
-        rc = lmv_create_obj(exp, fid, md.mea);
+        obj = lmv_create_obj(exp, fid, md.mea);
+        if (IS_ERR(obj))
+                rc = PTR_ERR(obj);
+        
         obd_free_memmd(exp, (struct lov_stripe_md **)&md.mea);
 
 cleanup:
@@ -722,27 +730,32 @@ repeat:
         }
 
         CDEBUG(D_OTHER, "CREATE '%*s' on %lu/%lu/%lu\n",
-                        op_data->namelen, op_data->name,
-                        (unsigned long) op_data->fid1.mds,
-                        (unsigned long) op_data->fid1.id,
-                        (unsigned long) op_data->fid1.generation);
+               op_data->namelen, op_data->name,
+               (unsigned long)op_data->fid1.mds,
+               (unsigned long)op_data->fid1.id,
+               (unsigned long)op_data->fid1.generation);
+        
         rc = md_create(lmv->tgts[op_data->fid1.mds].ltd_exp, op_data, data,
                        datalen, mode, uid, gid, rdev, request);
         if (rc == 0) {
+                
                 if (*request == NULL)
-                     RETURN(rc);
+                        RETURN(rc);
+
                 mds_body = lustre_msg_buf((*request)->rq_repmsg, 0,
                                           sizeof(*mds_body));
                 LASSERT(mds_body != NULL);
+                
                 CDEBUG(D_OTHER, "created. id = %lu, generation = %lu, mds = %d\n",
-                       (unsigned long) mds_body->fid1.id,
-                       (unsigned long) mds_body->fid1.generation,
+                       (unsigned long)mds_body->fid1.id,
+                       (unsigned long)mds_body->fid1.generation,
                        op_data->fid1.mds);
+                
                 LASSERT(mds_body->valid & OBD_MD_MDS ||
                         mds_body->mds == op_data->fid1.mds);
         } else if (rc == -ERESTART) {
-                /* directory got splitted. time to update local object
-                 * and repeat the request with proper MDS */
+                /* directory got splitted. time to update local object and
+                 * repeat the request with proper MDS */
                 rc = lmv_get_mea_and_update_object(exp, &op_data->fid1);
                 if (rc == 0) {
                         ptlrpc_req_finished(*request);
@@ -769,11 +782,10 @@ int lmv_done_writing(struct obd_export *exp, struct obdo *obdo)
 }
 
 int lmv_enqueue_slaves(struct obd_export *exp, int locktype,
-                         struct lookup_intent *it, int lockmode,
-                         struct mdc_op_data *data, struct lustre_handle *lockh,
-                         void *lmm, int lmmsize,
-                         ldlm_completion_callback cb_completion,
-                         ldlm_blocking_callback cb_blocking, void *cb_data)
+                       struct lookup_intent *it, int lockmode,
+                       struct mdc_op_data *data, struct lustre_handle *lockh,
+                       void *lmm, int lmmsize, ldlm_completion_callback cb_completion,
+                       ldlm_blocking_callback cb_blocking, void *cb_data)
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
@@ -826,8 +838,7 @@ cleanup:
 int lmv_enqueue(struct obd_export *exp, int lock_type,
                 struct lookup_intent *it, int lock_mode,
                 struct mdc_op_data *data, struct lustre_handle *lockh,
-                void *lmm, int lmmsize,
-                ldlm_completion_callback cb_completion,
+                void *lmm, int lmmsize, ldlm_completion_callback cb_completion,
                 ldlm_blocking_callback cb_blocking, void *cb_data)
 {
         struct obd_device *obd = exp->exp_obd;
@@ -850,17 +861,17 @@ int lmv_enqueue(struct obd_export *exp, int lock_type,
         if (data->namelen) {
                 obj = lmv_grab_obj(obd, &data->fid1);
                 if (obj) {
-                        /* directory is splitted. look for
-                         * right mds for this name */
+                        /* directory is splitted. look for right mds for this
+                         * name */
                         mds = raw_name2idx(obj->objcount, (char *)data->name,
                                            data->namelen);
                         data->fid1 = obj->objs[mds].fid;
                         lmv_put_obj(obj);
                 }
         }
-        CDEBUG(D_OTHER, "ENQUEUE '%s' on %lu/%lu\n",
-               LL_IT2STR(it), (unsigned long) data->fid1.id,
-               (unsigned long) data->fid1.generation);
+        CDEBUG(D_OTHER, "ENQUEUE '%s' on %lu/%lu\n", LL_IT2STR(it),
+               (unsigned long)data->fid1.id, (unsigned long)data->fid1.generation);
+        
         rc = md_enqueue(lmv->tgts[data->fid1.mds].ltd_exp, lock_type, it,
                         lock_mode, data, lockh, lmm, lmmsize, cb_completion,
                         cb_blocking, cb_data);
@@ -869,8 +880,8 @@ int lmv_enqueue(struct obd_export *exp, int lock_type,
 }
 
 int lmv_getattr_name(struct obd_export *exp, struct ll_fid *fid,
-                         char *filename, int namelen, unsigned long valid,
-                         unsigned int ea_size, struct ptlrpc_request **request)
+                     char *filename, int namelen, unsigned long valid,
+                     unsigned int ea_size, struct ptlrpc_request **request)
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
@@ -890,17 +901,19 @@ repeat:
                 rfid = obj->objs[mds].fid;
                 lmv_put_obj(obj);
         }
+        
         CDEBUG(D_OTHER, "getattr_name for %*s on %lu/%lu/%lu -> %lu/%lu/%lu\n",
-               namelen, filename, (unsigned long) fid->mds,
-               (unsigned long) fid->id, (unsigned long) fid->generation,
-               (unsigned long) rfid.mds, (unsigned long) rfid.id,
-               (unsigned long) rfid.generation);
-        rc = md_getattr_name(lmv->tgts[mds].ltd_exp, &rfid, filename, namelen,
-                                  valid, ea_size, request);
+               namelen, filename, (unsigned long)fid->mds,
+               (unsigned long)fid->id, (unsigned long)fid->generation,
+               (unsigned long)rfid.mds, (unsigned long)rfid.id,
+               (unsigned long)rfid.generation);
+
+        rc = md_getattr_name(lmv->tgts[mds].ltd_exp, &rfid, filename,
+                             namelen, valid, ea_size, request);
         if (rc == 0) {
-                /* this could be cross-node reference. in this case all
-                 * we have right now is mds/ino/generation triple. we'd
-                 * like to find other attributes */
+                /* this could be cross-node reference. in this case all we have
+                 * right now is mds/ino/generation triple. we'd like to find
+                 * other attributes */
                 body = lustre_msg_buf((*request)->rq_repmsg, 0, sizeof(*body));
                 LASSERT(body != NULL);
                 if (body->valid & OBD_MD_MDS) {
@@ -916,8 +929,8 @@ repeat:
                         *request = req;
                 }
         } else if (rc == -ERESTART) {
-                /* directory got splitted. time to update local object
-                 * and repeat the request with proper MDS */
+                /* directory got splitted. time to update local object and
+                 * repeat the request with proper MDS */
                 rc = lmv_get_mea_and_update_object(exp, &rfid);
                 if (rc == 0) {
                         ptlrpc_req_finished(*request);
@@ -929,8 +942,8 @@ repeat:
 
 
 /*
- * llite passes fid of an target inode in data->fid1 and
- * fid of directory in data->fid2
+ * llite passes fid of an target inode in data->fid1 and fid of directory in
+ * data->fid2
  */
 int lmv_link(struct obd_export *exp, struct mdc_op_data *data,
              struct ptlrpc_request **request)
@@ -940,9 +953,11 @@ int lmv_link(struct obd_export *exp, struct mdc_op_data *data,
         struct lmv_obj *obj;
         int rc;
         ENTRY;
+        
         rc = lmv_check_connect(obd);
        if (rc)
                RETURN(rc);
+
         if (data->namelen != 0) {
                 /* usual link request */
                 obj = lmv_grab_obj(obd, &data->fid1);
@@ -952,17 +967,22 @@ int lmv_link(struct obd_export *exp, struct mdc_op_data *data,
                         data->fid1 = obj->objs[rc].fid;
                         lmv_put_obj(obj);
                 }
-                CDEBUG(D_OTHER,"link %u/%u/%u:%*s to %u/%u/%u mds %d\n",
-                       (unsigned) data->fid2.mds, (unsigned) data->fid2.id,
-                       (unsigned) data->fid2.generation, data->namelen,
-                       data->name, (unsigned) data->fid1.mds,
-                       (unsigned) data->fid1.id,
-                       (unsigned) data->fid1.generation, data->fid1.mds);
+                
+                CDEBUG(D_OTHER,"link %lu/%lu/%lu:%*s to %lu/%lu/%lu mds %lu\n",
+                       (unsigned long)data->fid2.mds,
+                       (unsigned long)data->fid2.id,
+                       (unsigned long)data->fid2.generation,
+                       data->namelen, data->name,
+                       (unsigned long)data->fid1.mds,
+                       (unsigned long)data->fid1.id,
+                       (unsigned long)data->fid1.generation,
+                       (unsigned long)data->fid1.mds);
         } else {
                 /* request from MDS to acquire i_links for inode by fid1 */
-                CDEBUG(D_OTHER, "inc i_nlinks for %u/%u/%u\n",
-                       (unsigned) data->fid1.mds, (unsigned) data->fid1.id,
-                       (unsigned) data->fid1.generation);
+                CDEBUG(D_OTHER, "inc i_nlinks for %lu/%lu/%lu\n",
+                       (unsigned long)data->fid1.mds,
+                       (unsigned long)data->fid1.id,
+                       (unsigned long)data->fid1.generation);
         }
                         
         rc = md_link(lmv->tgts[data->fid1.mds].ltd_exp, data, request);
@@ -980,64 +1000,65 @@ int lmv_rename(struct obd_export *exp, struct mdc_op_data *data,
         ENTRY;
 
         CDEBUG(D_OTHER, "rename %*s in %lu/%lu/%lu to %*s in %lu/%lu/%lu\n",
-               oldlen, old, (unsigned long) data->fid1.mds,
-               (unsigned long) data->fid1.id,
-               (unsigned long) data->fid1.generation,
+               oldlen, old, (unsigned long)data->fid1.mds,
+               (unsigned long)data->fid1.id,
+               (unsigned long)data->fid1.generation,
                newlen, new, (unsigned long) data->fid2.mds,
                (unsigned long) data->fid2.id,
                (unsigned long) data->fid2.generation);
+        
         if (!fid_equal(&data->fid1, &data->fid2))
                 CWARN("cross-node rename %lu/%lu/%lu:%*s to %lu/%lu/%lu:%*s\n",
-                      (unsigned long) data->fid1.mds,
-                      (unsigned long) data->fid1.id,
-                      (unsigned long) data->fid1.generation, oldlen, old,
-                      (unsigned long) data->fid2.mds,
-                      (unsigned long) data->fid2.id,
-                      (unsigned long) data->fid2.generation, newlen, new);
+                      (unsigned long)data->fid1.mds,
+                      (unsigned long)data->fid1.id,
+                      (unsigned long)data->fid1.generation, oldlen, old,
+                      (unsigned long)data->fid2.mds,
+                      (unsigned long)data->fid2.id,
+                      (unsigned long)data->fid2.generation, newlen, new);
 
         rc = lmv_check_connect(obd);
        if (rc)
                RETURN(rc);
 
         if (oldlen == 0) {
-                /* MDS with old dir entry is asking another MDS
-                 * to create name there */
+                /* MDS with old dir entry is asking another MDS to create name
+                 * there */
                 CDEBUG(D_OTHER,
                        "create %*s(%d/%d) in %lu/%lu/%lu pointing to %lu/%lu/%lu\n",
                        newlen, new, oldlen, newlen,
-                       (unsigned long) data->fid2.mds,
-                       (unsigned long) data->fid2.id,
-                       (unsigned long) data->fid2.generation,
-                       (unsigned long) data->fid1.mds,
-                       (unsigned long) data->fid1.id,
-                       (unsigned long) data->fid1.generation);
+                       (unsigned long)data->fid2.mds,
+                       (unsigned long)data->fid2.id,
+                       (unsigned long)data->fid2.generation,
+                       (unsigned long)data->fid1.mds,
+                       (unsigned long)data->fid1.id,
+                       (unsigned long)data->fid1.generation);
                 mds = data->fid2.mds;
                 goto request;
         }
 
         obj = lmv_grab_obj(obd, &data->fid1);
         if (obj) {
-                /* directory is already splitted, so we have to forward
-                 * request to the right MDS */
+                /* directory is already splitted, so we have to forward request
+                 * to the right MDS */
                 mds = raw_name2idx(obj->objcount, (char *)old, oldlen);
                 data->fid1 = obj->objs[mds].fid;
                 CDEBUG(D_OTHER, "forward to MDS #%u (%lu/%lu/%lu)\n", mds,
-                       (unsigned long) obj->objs[mds].fid.mds,
-                       (unsigned long) obj->objs[mds].fid.id,
-                       (unsigned long) obj->objs[mds].fid.generation);
+                       (unsigned long)obj->objs[mds].fid.mds,
+                       (unsigned long)obj->objs[mds].fid.id,
+                       (unsigned long)obj->objs[mds].fid.generation);
                 lmv_put_obj(obj);
         }
 
         obj = lmv_grab_obj(obd, &data->fid2);
         if (obj) {
-                /* directory is already splitted, so we have to forward
-                 * request to the right MDS */
+                /* directory is already splitted, so we have to forward request
+                 * to the right MDS */
                 mds = raw_name2idx(obj->objcount, (char *)new, newlen);
                 data->fid2 = obj->objs[mds].fid;
                 CDEBUG(D_OTHER, "forward to MDS #%u (%lu/%lu/%lu)\n", mds,
-                       (unsigned long) obj->objs[mds].fid.mds,
-                       (unsigned long) obj->objs[mds].fid.id,
-                       (unsigned long) obj->objs[mds].fid.generation);
+                       (unsigned long)obj->objs[mds].fid.mds,
+                       (unsigned long)obj->objs[mds].fid.id,
+                       (unsigned long)obj->objs[mds].fid.generation);
                 lmv_put_obj(obj);
         }
         
@@ -1066,20 +1087,27 @@ int lmv_setattr(struct obd_export *exp, struct mdc_op_data *data,
                RETURN(rc);
 
         obj = lmv_grab_obj(obd, &data->fid1);
+        
         CDEBUG(D_OTHER, "SETATTR for %lu/%lu/%lu, valid 0x%x%s\n",
-               (unsigned long) data->fid1.mds,
-               (unsigned long) data->fid1.id,
-               (unsigned long) data->fid1.generation, iattr->ia_valid,
+               (unsigned long)data->fid1.mds, (unsigned long)data->fid1.id,
+               (unsigned long)data->fid1.generation, iattr->ia_valid,
                obj ? ", splitted" : "");
+        
         if (obj) {
                 for (i = 0; i < obj->objcount; i++) {
                         data->fid1 = obj->objs[i].fid;
-                        rc = md_setattr(lmv->tgts[i].ltd_exp, data, iattr, ea,
-                                        ealen, ea2, ea2len, &req);
-                        LASSERT(rc == 0);
+                        
+                        rc = md_setattr(lmv->tgts[i].ltd_exp, data, iattr,
+                                        ea, ealen, ea2, ea2len, &req);
+                        if (rc) {
+                                lmv_put_obj(obj);
+                                ptlrpc_req_finished(req);
+                                RETURN(rc);
+                        }
+
                         if (fid_equal(&obj->fid, &obj->objs[i].fid)) {
-                                /* this is master object and this request
-                                 * should be returned back to llite */
+                                /* this is master object and this request should
+                                 * be returned back to llite */
                                 *request = req;
                         } else {
                                 ptlrpc_req_finished(req);
@@ -1087,12 +1115,12 @@ int lmv_setattr(struct obd_export *exp, struct mdc_op_data *data,
                 }
                 lmv_put_obj(obj);
         } else {
-                LASSERT(data->fid1.mds < lmv->desc.ld_tgt_count);
+                LASSERT(i < lmv->desc.ld_tgt_count);
                 rc = md_setattr(lmv->tgts[i].ltd_exp, data, iattr, ea, ealen,
                                 ea2, ea2len, request); 
                 if (rc == 0) {
                         mds_body = lustre_msg_buf((*request)->rq_repmsg, 0,
-                                        sizeof(*mds_body));
+                                                  sizeof(*mds_body));
                         LASSERT(mds_body != NULL);
                         LASSERT(mds_body->mds == i);
                 }
@@ -1192,14 +1220,17 @@ int lmv_readpage(struct obd_export *exp, struct ll_fid *mdc_fid,
 
         obj = lmv_grab_obj(obd, mdc_fid);
         if (obj) {
-                /* find dirobj containing page with requested offset */
-                /* FIXME: what about protecting cached attrs here? */
+                lmv_lock_obj(obj);
+
+                /* find dirobj containing page with requested offset. */
                 for (i = 0; i < obj->objcount; i++) {
                         if (offset < obj->objs[i].size)
                                 break;
                         offset -= obj->objs[i].size;
                 }
                 rfid = obj->objs[i].fid;
+                
+                lmv_unlock_obj(obj);
                 lmv_put_obj(obj);
                 
                 CDEBUG(D_OTHER, "forward to %lu/%lu/%lu with offset %lu\n",
index 7ba5a7a..f79709d 100644 (file)
@@ -193,22 +193,22 @@ __lmv_grab_obj(struct obd_device *obd, struct ll_fid *fid)
 
                 /* check if object is in progress of destroying. If so - skip
                  * it. */
-                down(&obj->guard);
+                lmv_lock_obj(obj);
 
                 if (obj->freeing) {
-                        up(&obj->guard);
+                        lmv_unlock_obj(obj);
                         continue;
                 }
 
                 /* check if this is waht we're looking for. */
                 if (fid_equal(&obj->fid, fid)) {
                         __lmv_get_obj(obj);
-                        up(&obj->guard);
+                        lmv_unlock_obj(obj);
                         return obj;
                 }
-                
-                up(&obj->guard);
+                lmv_unlock_obj(obj);
         }
+
         return NULL;
 }
 
@@ -269,7 +269,7 @@ __lmv_create_obj(struct obd_device *obd, struct ll_fid *fid,
 
 /* creates object from passed @fid and @mea. If @mea is NULL, it will be
  * obtained from correct MDT and used for constructing the object. */
-int
+struct lmv_obj *
 lmv_create_obj(struct obd_export *exp,
                struct ll_fid *fid, struct mea *mea)
 {
@@ -278,7 +278,7 @@ lmv_create_obj(struct obd_export *exp,
         struct ptlrpc_request *req = NULL;
         struct lmv_obj *obj;
         struct lustre_md md;
-        int mealen, i, rc = 0;
+        int mealen, i, rc;
         ENTRY;
 
         CDEBUG(D_OTHER, "get mea for %lu/%lu/%lu and create lmv obj\n",
@@ -300,17 +300,17 @@ lmv_create_obj(struct obd_export *exp,
                                 valid, mealen, &req);
                 if (rc) {
                         CERROR("md_getattr() failed, error %d\n", rc);
-                        GOTO(cleanup, rc);
+                        GOTO(cleanup, obj = ERR_PTR(rc));
                 }
 
                 rc = mdc_req2lustre_md(exp, req, 0, NULL, &md);
                 if (rc) {
                         CERROR("mdc_req2lustre_md() failed, error %d\n", rc);
-                        GOTO(cleanup, rc);
+                        GOTO(cleanup, obj = ERR_PTR(rc));
                 }
 
                 if (!md.mea)
-                        GOTO(cleanup, rc = -ENODATA);
+                        GOTO(cleanup, obj = ERR_PTR(-ENODATA));
                         
                 mea = md.mea;
         }
@@ -321,14 +321,14 @@ lmv_create_obj(struct obd_export *exp,
                 CERROR("Can't create new object %lu/%lu/%lu\n",
                        (unsigned long)fid->mds, (unsigned long)fid->id,
                        (unsigned long)fid->generation);
-                GOTO(cleanup, rc = -ENOMEM);
+                GOTO(cleanup, obj = ERR_PTR(-ENOMEM));
         }
         
         lmv_put_obj(obj);
 cleanup:
         if (req)       
                 ptlrpc_req_finished(req);
-        RETURN(rc); 
+        RETURN(obj);
 }
 
 /* looks for object with @fid and orders to destroy it. It possible the object
@@ -340,25 +340,27 @@ lmv_destroy_obj(struct obd_export *exp, struct ll_fid *fid)
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obj *obj;
-        int rc = 0;
         ENTRY;
 
         spin_lock(&lmv_obj_list_lock);
         
         obj = __lmv_grab_obj(obd, fid);
         if (obj) {
-                down(&obj->guard);
+
+                /* marking object as "freeing in progress" */
+                lmv_lock_obj(obj);
                 obj->freeing = 1;
-                up(&obj->guard);
-                
+                lmv_unlock_obj(obj);
+
                 __lmv_put_obj(obj);
                 __lmv_put_obj(obj);
                 
-                rc = 1;
+                spin_unlock(&lmv_obj_list_lock);
+                RETURN(1);
         }
         
         spin_unlock(&lmv_obj_list_lock);
-        RETURN(rc);
+        RETURN(0);
 }
 
 int