Whamcloud - gitweb
fixed a bug in mdt_intent_getattr(): use reply message before packing it.
authorhuanghua <huanghua>
Tue, 11 Jul 2006 11:06:45 +0000 (11:06 +0000)
committerhuanghua <huanghua>
Tue, 11 Jul 2006 11:06:45 +0000 (11:06 +0000)
lustre/mdt/mdt_handler.c

index 2373acc..7ca9202 100644 (file)
@@ -191,7 +191,8 @@ static inline int mdt_body_has_lov(const struct lu_attr *la,
 }
 
 static int mdt_getattr_internal(struct mdt_thread_info *info,
-                                struct mdt_object *o)
+                                struct mdt_object *o,
+                                int need_pack_reply)
 {
         struct md_object        *next = mdt_object_child(o);
         const struct mdt_body   *reqbody = info->mti_body;
@@ -211,21 +212,20 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
 
         rc = mo_attr_get(ctxt, next, la);
         if (rc == -EREMOTE) {
-                /* This object is located on remote node */
-                req_capsule_set_size(&info->mti_pill, &RMF_MDT_MD, 
-                                     RCL_SERVER, 0);
-#ifdef CONFIG_FS_POSIX_ACL
-                req_capsule_set_size(&info->mti_pill, &RMF_EADATA, 
-                                     RCL_SERVER, 0);
-#endif
-                rc = req_capsule_pack(&info->mti_pill);
-                if (rc == 0) {
-                        repbody = req_capsule_server_get(&info->mti_pill,
-                                                         &RMF_MDT_BODY);
-                        repbody->fid1 = *mdt_object_fid(o);
-                        repbody->valid |= OBD_MD_FLID;
+                /* FIXME: This object is located on remote node.
+                 * What value should we return to client? 
+                 */
+                if (need_pack_reply) {
+                        rc = req_capsule_pack(&info->mti_pill);
+                        if (rc)
+                                RETURN(rc);
                 }
-                RETURN(rc);
+
+                repbody = req_capsule_server_get(&info->mti_pill,
+                                                 &RMF_MDT_BODY);
+                repbody->fid1 = *mdt_object_fid(o);
+                repbody->valid |= OBD_MD_FLID;
+                RETURN(0);
         } 
         if (rc){
                 CERROR("getattr error for "DFID3": %d\n",
@@ -233,6 +233,9 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                 RETURN(rc);
         }
 
+        if ( !need_pack_reply)
+                goto skip_packing;
+        
         /* pre-getattr: to guess how many bytes we need. */
         if (mdt_body_has_lov(la, reqbody)) {
                 /* this should return the total length, or error */
@@ -276,6 +279,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                 RETURN(rc);
         }
 
+skip_packing:
         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
         mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
 
@@ -357,7 +361,7 @@ static int mdt_getattr(struct mdt_thread_info *info)
         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
                 result = -ENOMEM;
         } else {
-                result = mdt_getattr_internal(info, info->mti_object);
+                result = mdt_getattr_internal(info, info->mti_object, 1);
         }
         RETURN(result);
 }
@@ -370,7 +374,8 @@ static int mdt_getattr(struct mdt_thread_info *info)
  */
 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                                  struct mdt_lock_handle *lhc,
-                                 __u64 child_bits)
+                                 __u64 child_bits,
+                                 struct ldlm_reply *ldlm_rep)
 {
         struct mdt_object *parent = info->mti_object;
         struct mdt_object *child;
@@ -387,15 +392,18 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
         if (name == NULL)
                 RETURN(-EFAULT);
         
+        intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
         if (strlen(name) == 0) {
                 /* only open the child. parent is on another node. */
+                intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
                 child = parent;
                 mdt_lock_handle_init(lhc);
                 lhc->mlh_mode = LCK_CR;
                 result = mdt_object_lock(info, child, lhc, child_bits);
                 if (result != 0) {
                         /* finally, we can get attr for child. */
-                        result = mdt_getattr_internal(info, child);
+                        result = mdt_getattr_internal(info, child, 
+                                                      ldlm_rep ? 0 : 1);
                         if (result != 0)
                                 mdt_object_unlock(info, child, lhc);
                 }
@@ -411,9 +419,12 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
 
         /*step 2: lookup child's fid by name */
         result = mdo_lookup(info->mti_ctxt, next, name, child_fid);
-        if (result != 0)
+        if (result != 0) {
+                if (result == -ENOENT)
+                        intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
                 GOTO(out_parent, result);
-
+        } else
+                intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
         /*
          *step 3: find the child object by fid & lock it.
          *        regardless if it is local or remote.
@@ -425,7 +436,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                 GOTO(out_parent, result = PTR_ERR(child));
 
         /* finally, we can get attr for child. */
-        result = mdt_getattr_internal(info, child);
+        result = mdt_getattr_internal(info, child, ldlm_rep ? 0 : 1);
         if (result != 0)
                 mdt_object_unlock(info, child, lhc);
         mdt_object_put(info->mti_ctxt, child);
@@ -444,7 +455,7 @@ static int mdt_getattr_name(struct mdt_thread_info *info)
 
         ENTRY;
 
-        rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE);
+        rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
         if (lustre_handle_is_used(&lhc->mlh_lh)) {
                 ldlm_lock_decref(&lhc->mlh_lh, lhc->mlh_mode);
                 lhc->mlh_lh.cookie = 0;
@@ -1427,6 +1438,7 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
         struct ldlm_reply      *ldlm_rep;
         struct mdt_lock_handle  tmp_lock;
         struct mdt_lock_handle *lhc = &tmp_lock;
+        struct mdt_device      *mdt = info->mti_mdt;
         __u64                   child_bits;
         int                     rc;
 
@@ -1444,23 +1456,28 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
                 RETURN(-EINVAL);
         }
 
-        rc = mdt_getattr_name_lock(info, lhc, child_bits);
+        req_capsule_set_size(&info->mti_pill, &RMF_MDT_MD,
+                             RCL_SERVER, mdt->mdt_max_mdsize);
+#ifdef CONFIG_FS_POSIX_ACL
+        req_capsule_set_size(&info->mti_pill, &RMF_EADATA,
+                             RCL_SERVER, mdt->mdt_max_cookiesize);
+#endif
+        rc = req_capsule_pack(&info->mti_pill);
+        if (rc)
+                RETURN(rc);
         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
         intent_set_disposition(ldlm_rep, DISP_IT_EXECD);
-        intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
 
-        if (rc) {
-                if (rc == -EREMOTE)
-                        intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
-                else
-                        intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
-                if (ldlm_rep)
-                        ldlm_rep->lock_policy_res2 = 0;
+        ldlm_rep->lock_policy_res2 = 
+                mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
+
+        if (intent_disposition(ldlm_rep, DISP_LOOKUP_NEG))
+                ldlm_rep->lock_policy_res2 = 0;
+        if (!intent_disposition(ldlm_rep, DISP_LOOKUP_POS) ||
+                    ldlm_rep->lock_policy_res2) {
                 RETURN(ELDLM_LOCK_ABORTED);
         }
 
-        intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
-
         new_lock = ldlm_handle2lock(&lhc->mlh_lh);
         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
                 RETURN(0);