Whamcloud - gitweb
fixed some defects according to inspection result. (not finished)
authorhuanghua <huanghua>
Sun, 9 Jul 2006 13:56:06 +0000 (13:56 +0000)
committerhuanghua <huanghua>
Sun, 9 Jul 2006 13:56:06 +0000 (13:56 +0000)
lustre/mdt/mdt_fs.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lib.c
lustre/mdt/mdt_open.c
lustre/mdt/mdt_reint.c
lustre/mdt/mdt_xattr.c

index 484fc84..cf67877 100644 (file)
@@ -129,15 +129,15 @@ int mdt_client_free(const struct lu_context *ctxt,
                     struct mdt_export_data *med)
 {
         struct mdt_client_data *mcd = med->med_mcd;
-        int rc;
+        int rc = 0;
         loff_t off;
         ENTRY;
 
         if (!mcd)
                 RETURN(0);
 
-        CDEBUG(D_INFO, "freeing client at idx %u, offset %lld with UUID '%s'\n",
-               med->med_lr_idx, med->med_lr_off, mcd->mcd_uuid);
+        CDEBUG(D_INFO, "freeing client at idx %u, offset %lld\n",
+               med->med_lr_idx, med->med_lr_off);
 
         off = med->med_lr_off;
 
@@ -157,12 +157,13 @@ int mdt_client_free(const struct lu_context *ctxt,
                 LBUG();
         }
 
-//      if (!(exp->exp_flags & OBD_OPT_FAILOVER)) {
         memset(&mcd, 0, sizeof *mcd);
+/*
         rc = mdt->mdt_last->do_body_ops->dbo_write(ctxt, 
                                                    mdt->mdt_last,
                                                    mcd,
                                                    sizeof(*mcd), &off);
+*/
         CDEBUG_EX(rc == 0 ? D_INFO : D_ERROR,
                   "zeroing out client idx %u in %s rc %d\n",
                   med->med_lr_idx, LAST_RCVD, rc);
index e9837a5..edbc482 100644 (file)
@@ -42,7 +42,7 @@
 #include <lustre_ver.h>
 /*
  * struct OBD_{ALLOC,FREE}*()
- * OBD_FAIL_CHECK
+ * MDT_FAIL_CHECK
  */
 #include <obd_support.h>
 /* struct ptlrpc_request */
  */
 unsigned long mdt_num_threads;
 
+/* ptlrpc request handler for MDT. All handlers are 
+ * grouped into several slices - struct mdt_opc_slice,
+ * and stored in an array - mdt_handlers[].
+ */
 struct mdt_handler {
+        /* The name of this handler. */
         const char *mh_name;
+        /* Fail id for this handler, checked at the beginning of this handler.*/
         int         mh_fail_id;
+        /* Operation code for this handler */
         __u32       mh_opc;
+        /* flags are listed in enum mdt_handler_flags below. */ 
         __u32       mh_flags;
+        /* The actual handler function to execute. */
         int (*mh_act)(struct mdt_thread_info *info);
-
+        /* Request format for this request. */
         const struct req_format *mh_fmt;
 };
 
@@ -111,10 +120,7 @@ static int mdt_getstatus(struct mdt_thread_info *info)
 
         ENTRY;
 
-        /* Can I remove the OBD_FAIL_CHECK here? because the packing result
-         * has already been checked.
-         */
-        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
+        if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
                 result = -ENOMEM;
         else {
                 body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
@@ -124,9 +130,6 @@ static int mdt_getstatus(struct mdt_thread_info *info)
                         body->valid |= OBD_MD_FLID;
         }
 
-        /* the last_committed and last_xid fields are filled in for all
-         * replies already - no need to do so here also.
-         */
         RETURN(result);
 }
 
@@ -138,8 +141,7 @@ static int mdt_statfs(struct mdt_thread_info *info)
 
         ENTRY;
 
-        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
-                CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
+        if (MDT_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
                 result = -ENOMEM;
         } else {
                 osfs = req_capsule_server_get(&info->mti_pill, &RMF_OBD_STATFS);
@@ -152,7 +154,9 @@ static int mdt_statfs(struct mdt_thread_info *info)
         RETURN(result);
 }
 
-void mdt_pack_attr2body(struct mdt_body *b, struct lu_attr *attr)
+void mdt_pack_attr2body(struct mdt_body *b, 
+                        struct lu_attr *attr, 
+                        const struct lu_fid *fid)
 {
         b->valid |= OBD_MD_FLCTIME | OBD_MD_FLUID |
                     OBD_MD_FLGID | OBD_MD_FLFLAGS | OBD_MD_FLTYPE |
@@ -172,56 +176,73 @@ void mdt_pack_attr2body(struct mdt_body *b, struct lu_attr *attr)
         b->gid        = attr->la_gid;
         b->flags      = attr->la_flags;
         b->nlink      = attr->la_nlink;
+
+        if (fid) {
+                b->fid1 = *fid;
+                b->valid |= OBD_MD_FLID;
+        }
 }
 
-static int mdt_getattr_pack_msg(struct mdt_thread_info *info, 
+static inline int mdt_body_has_lov(const struct lu_attr *la,
+                                   const struct mdt_body *body)
+{
+        return ((S_ISREG(la->la_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
+                (S_ISDIR(la->la_mode) && (body->valid & OBD_MD_FLDIREA )) );
+}
+
+static int mdt_getattr_internal(struct mdt_thread_info *info,
                                 struct mdt_object *o)
 {
-        const struct mdt_body *body = info->mti_body;
-        struct req_capsule *pill = &info->mti_pill;
-        struct ptlrpc_request *req = mdt_info_req(info);
-        struct md_object *next = mdt_object_child(o);
-        struct lu_attr *la = &info->mti_attr;
-        int rc;
+        struct md_object        *next = mdt_object_child(o);
+        const struct mdt_body   *reqbody = info->mti_body;
+        struct ptlrpc_request   *req = mdt_info_req(info);
+        struct lu_attr          *la = &info->mti_attr;
+        struct req_capsule      *pill = &info->mti_pill;
+        const struct lu_context *ctxt = info->mti_ctxt;
+        struct mdt_body         *repbody;
+        void                    *buffer;
+        int                     length;
+        int                     rc;
         ENTRY;
 
-        rc = mo_attr_get(info->mti_ctxt, next, la);
+        if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
+                RETURN(-ENOMEM);
+        }
+
+        rc = mo_attr_get(ctxt, next, la);
         if (rc){
+                CERROR("getattr error for "DFID3": %d\n",
+                        PFID3(mdt_object_fid(o)), rc);
                 RETURN(rc);
         }
-        if ((S_ISREG(la->la_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
-            (S_ISDIR(la->la_mode) && (body->valid & OBD_MD_FLDIREA))) {
-                rc = mo_xattr_get(info->mti_ctxt, next, NULL, 0, "lov");
 
-                CDEBUG(D_INODE, "got %d bytes MD data for object "DFID3"\n",
-                       rc, PFID3(mdt_object_fid(o)));
+        /* pre-getattr: to guess how many bytes we need. */
+        if (mdt_body_has_lov(la, reqbody)) {
+                /* this should return the total length, or error */
+                rc = mo_xattr_get(ctxt, next, NULL, 0, XATTR_NAME_LOV);
+
+                CDEBUG(D_INODE, "got %d(max=%d) bytes MD data for "DFID3"\n",
+                       rc, info->mti_mdt->mdt_max_mdsize,
+                       PFID3(mdt_object_fid(o)));
                 if (rc < 0) {
                         if (rc != -ENODATA && rc != -EOPNOTSUPP) {
-                                CERROR("error getting MD "DFID3": rc = %d\n",
-                                       PFID3(mdt_object_fid(o)),
-                                       rc);
                                 RETURN(rc);
                         }
                         rc = 0;
-                } else if (rc > MAX_MD_SIZE) {
-                        CERROR("MD size %d larger than maximum possible %u\n",
-                               rc, MAX_MD_SIZE);
+                } else if (rc > info->mti_mdt->mdt_max_mdsize) {
                         rc = 0;
                 }
-        } else if (S_ISLNK(la->la_mode) && (body->valid & OBD_MD_LINKNAME)) {
-                /* XXX:It also uses the mdt_md to hold symname.
-                 * Are there any problem? will be swabbed? hope not.
-                 */
-                rc = min_t(int, la->la_size + 1, body->eadatasize);
+        } else if (S_ISLNK(la->la_mode) && (reqbody->valid & OBD_MD_LINKNAME)) {
+                /* NB: It also uses the mdt_md to hold symname. */
+                rc = min_t(int, la->la_size + 1, reqbody->eadatasize);
         }
         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, rc);
 
 #ifdef CONFIG_FS_POSIX_ACL
         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
-            (body->valid & OBD_MD_FLACL)) {
+            (reqbody->valid & OBD_MD_FLACL)) {
 
-                rc = mo_xattr_get(info->mti_ctxt, next,
-                                  NULL, 0, XATTR_NAME_ACL_ACCESS);
+                rc = mo_xattr_get(ctxt, next, NULL, 0, XATTR_NAME_ACL_ACCESS);
                 if (rc < 0) {
                         if (rc != -ENODATA && rc != -EOPNOTSUPP) {
                                 CERROR("got acl size: %d\n", rc);
@@ -232,51 +253,25 @@ static int mdt_getattr_pack_msg(struct mdt_thread_info *info,
                 req_capsule_set_size(pill, &RMF_EADATA, RCL_SERVER, rc);
         }
 #endif
-        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
-                CERROR("failed MDT_GETATTR_PACK test\n");
-                RETURN(-ENOMEM);
-        }
-        rc = req_capsule_pack(&info->mti_pill);
+        rc = req_capsule_pack(pill);
         if (rc) {
-                CERROR("lustre_pack_reply failed: rc %d\n", rc);
-        }
-
-        RETURN(rc);
-}
-
-static int mdt_getattr_internal(struct mdt_thread_info *info,
-                                struct mdt_object *o)
-{
-        struct md_object *next = mdt_object_child(o);
-        const struct mdt_body  *reqbody = info->mti_body;
-        struct mdt_body  *repbody;
-        struct lu_attr *la = &info->mti_attr;
-        int rc;
-        void *buffer;
-        int length;
-        struct ptlrpc_request *req = mdt_info_req(info);
-        ENTRY;
-
-        rc = mo_attr_get(info->mti_ctxt, next, la);
-        if (rc){
-                CERROR("getattr error for "DFID3": %d\n",
-                        PFID3(&reqbody->fid1), rc);
+                CERROR("lustre pack reply for getattr failed: rc %d\n", rc);
                 RETURN(rc);
         }
 
-        repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
-        mdt_pack_attr2body(repbody, la);
-        repbody->fid1 = *mdt_object_fid(o);
-        repbody->valid |= OBD_MD_FLID;
+        repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
+        mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
 
-        buffer = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
-        length = req_capsule_get_size(&info->mti_pill, &RMF_MDT_MD,
-                                      RCL_SERVER);
+        buffer = req_capsule_server_get(pill, &RMF_MDT_MD);
+        length = req_capsule_get_size(pill, &RMF_MDT_MD, RCL_SERVER);
+
+        /* now, to getattr*/
+        if (mdt_body_has_lov(la, reqbody)) {
+                if (length > 0)
+                        rc = mo_xattr_get(info->mti_ctxt, next,
+                                          buffer, length, XATTR_NAME_LOV);
+                /* else rc = 0; */
 
-        if ((S_ISREG(la->la_mode) && (reqbody->valid & OBD_MD_FLEASIZE)) ||
-            (S_ISDIR(la->la_mode) && (reqbody->valid & OBD_MD_FLDIREA))) {
-                rc = mo_xattr_get(info->mti_ctxt, next,
-                                  buffer, length, "lov");
                 if (rc < 0)
                         RETURN(rc);
 
@@ -308,7 +303,6 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                 repbody->valid |= OBD_MD_FLMODEASIZE;
         }
 
-
 #ifdef CONFIG_FS_POSIX_ACL
         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
             (reqbody->valid & OBD_MD_FLACL)) {
@@ -317,15 +311,15 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                 length = req_capsule_get_size(&info->mti_pill,
                                               &RMF_EADATA,
                                               RCL_SERVER);
-                rc = mo_xattr_get(info->mti_ctxt, next,
-                                  buffer, length, XATTR_NAME_ACL_ACCESS);
+                if (length > 0)
+                        rc = mo_xattr_get(info->mti_ctxt, next, buffer, 
+                                          length, XATTR_NAME_ACL_ACCESS);
+                else 
+                        rc = 0;
 
                 if (rc < 0) {
-                        if (rc != -ENODATA && rc != -EOPNOTSUPP) {
-                                CERROR("got acl size: %d\n", rc);
-                                RETURN(rc);
-                        }
-                        rc = 0;
+                        CERROR("got acl size: %d\n", rc);
+                        RETURN(rc);
                 }
                 repbody->aclsize = rc;
                 repbody->valid |= OBD_MD_FLACL;
@@ -343,13 +337,10 @@ static int mdt_getattr(struct mdt_thread_info *info)
                                         &info->mti_object->mot_obj.mo_lu));
         ENTRY;
 
-        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
-                CERROR(LUSTRE_MDT0_NAME": getattr lustre_pack_reply failed\n");
+        if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
                 result = -ENOMEM;
         } else {
-                result = mdt_getattr_pack_msg(info, info->mti_object);
-                if (result == 0)
-                        result = mdt_getattr_internal(info, info->mti_object);
+                result = mdt_getattr_internal(info, info->mti_object);
         }
         RETURN(result);
 }
@@ -367,11 +358,10 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
         struct mdt_object *parent = info->mti_object;
         struct mdt_object *child;
         struct md_object  *next = mdt_object_child(info->mti_object);
-        const char *name;
-        int result;
+        struct lu_fid     *child_fid = &info->mti_tmp_fid1;
+        const char        *name;
+        int               result;
         struct mdt_lock_handle *lhp;
-        struct lu_fid child_fid;
-        struct ldlm_namespace *ns;
         ENTRY;
 
         LASSERT(info->mti_object != NULL);
@@ -380,16 +370,15 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
         if (name == NULL)
                 RETURN(-EFAULT);
 
-        ns = info->mti_mdt->mdt_namespace;
         /*step 1: lock parent */
         lhp = &info->mti_lh[MDT_LH_PARENT];
         lhp->mlh_mode = LCK_CR;
-        result = mdt_object_lock(ns, parent, lhp, MDS_INODELOCK_UPDATE);
+        result = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE);
         if (result != 0)
                 RETURN(result);
 
         /*step 2: lookup child's fid by name */
-        result = mdo_lookup(info->mti_ctxt, next, name, &child_fid);
+        result = mdo_lookup(info->mti_ctxt, next, name, child_fid);
         if (result == -EREMOTE) {
                 /* This object is located on remote node */
                 req_capsule_set_size(&info->mti_pill, &RMF_MDT_MD, 
@@ -403,45 +392,42 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                         struct mdt_body *repbody;
                         repbody = req_capsule_server_get(&info->mti_pill, 
                                                          &RMF_MDT_BODY);
-                        repbody->fid1 = child_fid;
+                        repbody->fid1 = *child_fid;
                 }
+                GOTO(out_parent, result);
         } 
         if (result != 0)
                 GOTO(out_parent, result);
 
         /*step 3: find the child object by fid & lock it*/
         lhc->mlh_mode = LCK_CR;
-        child = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt,
-                                     &child_fid, lhc, child_bits);
+        child = mdt_object_find_lock(info, child_fid, lhc, child_bits);
         if (IS_ERR(child))
                 GOTO(out_parent, result = PTR_ERR(child));
 
         /* finally, we can get attr for child. */
-        result = mdt_getattr_pack_msg(info, child);
-        if (result == 0) {
-                result = mdt_getattr_internal(info, child);
-        }
+        result = mdt_getattr_internal(info, child);
         if (result != 0)
-                mdt_object_unlock(ns, child, lhc);
-        EXIT;
-
+                mdt_object_unlock(info, child, lhc);
         mdt_object_put(info->mti_ctxt, child);
+
+        EXIT;
 out_parent:
-        mdt_object_unlock(ns, parent, lhp);
+        mdt_object_unlock(info, parent, lhp);
         return result;
 }
 
 /* normal handler: should release the child lock */
 static int mdt_getattr_name(struct mdt_thread_info *info)
 {
-        struct mdt_lock_handle lhc = {{0}};
+        struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
         int rc;
 
         ENTRY;
 
-        rc = mdt_getattr_name_lock(info, &lhc, MDS_INODELOCK_UPDATE);
-        if (rc == 0 && lustre_handle_is_used(&lhc.mlh_lh))
-                ldlm_lock_decref(&lhc.mlh_lh, lhc.mlh_mode);
+        rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE);
+        if (lustre_handle_is_used(&lhc->mlh_lh))
+                ldlm_lock_decref(&lhc->mlh_lh, lhc->mlh_mode);
         RETURN(rc);
 }
 
@@ -449,9 +435,6 @@ static struct lu_device_operations mdt_lu_ops;
 
 static int lu_device_is_mdt(struct lu_device *d)
 {
-        /*
-         * XXX for now. Tags in lu_device_type->ldt_something are needed.
-         */
         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
 }
 
@@ -506,7 +489,7 @@ static long mdt_reint_opcode(struct mdt_thread_info *info,
         __u32 *ptr;
         long opc;
 
-        opc = -EINVAL;
+        opc = -EFAULT;
         ptr = req_capsule_client_get(&info->mti_pill, &RMF_REINT_OPC);
         if (ptr != NULL) {
                 opc = *ptr;
@@ -563,8 +546,8 @@ static int mdt_object_sync(struct mdt_thread_info *info)
 
 static int mdt_sync(struct mdt_thread_info *info)
 {
-        struct mdt_body *body;
         struct req_capsule *pill = &info->mti_pill;
+        struct mdt_body *body;
         int rc;
         ENTRY;
 
@@ -583,23 +566,25 @@ static int mdt_sync(struct mdt_thread_info *info)
         } else {
                 /* sync an object */
                 rc = mdt_unpack_req_pack_rep(info, HABEO_CORPUS | HABEO_REFERO);
-                if (rc != 0)
-                        RETURN(rc);
-
-                rc = mdt_object_sync(info);
-                if (rc != 0)
-                        RETURN(rc);
-
-                rc = mo_attr_get(info->mti_ctxt,
-                                 mdt_object_child(info->mti_object),
-                                 &info->mti_attr);
-                if (rc != 0)
-                        RETURN(rc);
-
-                body = req_capsule_server_get(pill, &RMF_MDT_BODY);
-                mdt_pack_attr2body(body, &info->mti_attr);
-                body->fid1 = *mdt_object_fid(info->mti_object);
-                body->valid |= OBD_MD_FLID;
+                if (rc == 0) {
+                        rc = mdt_object_sync(info);
+                        if (rc == 0) {
+                                struct md_object *next;
+                                const struct lu_fid    *fid;
+                                next = mdt_object_child(info->mti_object);
+                                fid = mdt_object_fid(info->mti_object);
+                                rc = mo_attr_get(info->mti_ctxt,
+                                                 next,
+                                                 &info->mti_attr);
+                                if (rc == 0) {
+                                        body = req_capsule_server_get(pill, 
+                                                                &RMF_MDT_BODY);
+                                        mdt_pack_attr2body(body, 
+                                                           &info->mti_attr, 
+                                                           fid);
+                                }
+                        }
+                }
         }
         RETURN(rc);
 }
@@ -707,9 +692,9 @@ int fid_res_name_eq(const struct lu_fid *f, const struct ldlm_res_id *name)
 /* issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
 int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
              struct lustre_handle *lh, ldlm_mode_t mode,
-             ldlm_policy_data_t *policy)
+             ldlm_policy_data_t *policy,
+             struct ldlm_res_id *res_id)
 {
-        struct ldlm_res_id res_id;
         int flags = 0;
         int rc;
         ENTRY;
@@ -719,7 +704,7 @@ int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
         LASSERT(f != NULL);
 
         /* FIXME: is that correct to have @flags=0 here? */
-        rc = ldlm_cli_enqueue(NULL, NULL, ns, *fid_build_res_name(f, &res_id),
+        rc = ldlm_cli_enqueue(NULL, NULL, ns, *fid_build_res_name(f, res_id),
                               LDLM_IBITS, policy, mode, &flags,
                               ldlm_blocking_ast, ldlm_completion_ast, NULL,
                               NULL, NULL, 0, NULL, lh);
@@ -764,50 +749,59 @@ struct mdt_object *mdt_object_find(const struct lu_context *ctxt,
                 return mdt_obj(o);
 }
 
-int mdt_object_lock(struct ldlm_namespace *ns, struct mdt_object *o,
+int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o, 
                     struct mdt_lock_handle *lh, __u64 ibits)
 {
-        ldlm_policy_data_t p = {
-                .l_inodebits = {
-                        .bits = ibits
-                }
-        };
+        ldlm_policy_data_t *policy = &info->mti_policy;
+        struct ldlm_res_id *res_id = &info->mti_res_id;
+        struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
+
         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
         LASSERT(lh->mlh_mode != LCK_MINMODE);
 
-        return fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode, &p);
+        policy->l_inodebits.bits = ibits;
+
+        return fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode, policy, res_id);
 }
 
-void mdt_object_unlock(struct ldlm_namespace *ns, struct mdt_object *o,
+void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
                        struct mdt_lock_handle *lh)
 {
+        struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
+
         if (lustre_handle_is_used(&lh->mlh_lh)) {
                 fid_unlock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode);
                 lh->mlh_lh.cookie = 0;
         }
 }
 
-struct mdt_object *mdt_object_find_lock(const struct lu_context *ctxt,
-                                        struct mdt_device *d,
+struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
                                         const struct lu_fid *f,
                                         struct mdt_lock_handle *lh,
                                         __u64 ibits)
 {
         struct mdt_object *o;
 
-        o = mdt_object_find(ctxt, d, f);
+        o = mdt_object_find(info->mti_ctxt, info->mti_mdt, f);
         if (!IS_ERR(o)) {
                 int result;
 
-                result = mdt_object_lock(d->mdt_namespace, o, lh, ibits);
+                result = mdt_object_lock(info, o, lh, ibits);
                 if (result != 0) {
-                        mdt_object_put(ctxt, o);
+                        mdt_object_put(info->mti_ctxt, o);
                         o = ERR_PTR(result);
                 }
         }
         return o;
 }
 
+void mdt_object_unlock_put(struct mdt_thread_info * info,
+                           struct mdt_object * o,
+                           struct mdt_lock_handle *lh)
+{
+        mdt_object_unlock(info, o, lh);
+        mdt_object_put(info->mti_ctxt, o);
+}
 
 static struct mdt_handler *mdt_handler_find(__u32 opc)
 {
@@ -954,7 +948,7 @@ static int mdt_req_handle(struct mdt_thread_info *info,
 
                 dlm_req = req_capsule_client_get(&info->mti_pill, &RMF_DLM_REQ);
                 if (dlm_req != NULL) {
-                        if (info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME)
+                        if (info->mti_mdt->mdt_opts.mo_compat_resname)
                                 result = mdt_lock_resname_compat(info->mti_mdt,
                                                                  dlm_req);
                         info->mti_dlm_req = dlm_req;
@@ -981,7 +975,7 @@ static int mdt_req_handle(struct mdt_thread_info *info,
         LASSERT(current->journal_info == NULL);
 
         if (result == 0 && flags & HABEO_CLAVIS &&
-            info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME) {
+            info->mti_mdt->mdt_opts.mo_compat_resname) {
                 struct ldlm_reply *dlm_rep;
 
                 dlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
@@ -1403,7 +1397,8 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
         struct ldlm_lock *new_lock = NULL;
         struct ptlrpc_request *req = mdt_info_req(info);
         struct ldlm_reply *ldlm_rep;
-        struct mdt_lock_handle lhc = {{0}};
+        struct mdt_lock_handle tmp_lock;
+        struct mdt_lock_handle *lhc = &tmp_lock;
         int    rc;
 
         ENTRY;
@@ -1418,12 +1413,12 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
         default:
                 CERROR("Unhandled till now");
                 RETURN(-EINVAL);
-                break;
         }
 
-        rc = mdt_getattr_name_lock(info, &lhc, child_bits);
-        ldlm_rep = req_capsule_server_get(&info->mti_pill, 
-                                          &RMF_DLM_REP);
+
+        mdt_lock_handle_init(lhc);
+        rc = mdt_getattr_name_lock(info, lhc, child_bits);
+        ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
         intent_set_disposition(ldlm_rep, DISP_IT_EXECD);
         intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
 
@@ -1439,12 +1434,12 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
 
         intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
 
-        new_lock = ldlm_handle2lock(&lhc.mlh_lh);
+        new_lock = ldlm_handle2lock(&lhc->mlh_lh);
         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
                 RETURN(0);
 
         LASSERTF(new_lock != NULL, "op %d lockh "LPX64"\n",
-                 opcode, lhc.mlh_lh.cookie);
+                 opcode, lhc->mlh_lh.cookie);
 
         *lockp = new_lock;
 
@@ -1671,8 +1666,6 @@ static int mdt_seq_init(const struct lu_context *ctx,
                                              m->mdt_bottom, uuid,
                                              LUSTRE_SEQ_CTLR,
                                              ctx);
-                        if (rc)
-                                mdt_seq_fini(ctx, m);
                 } else
                         rc = -ENOMEM;
         }
@@ -1685,11 +1678,12 @@ static int mdt_seq_init(const struct lu_context *ctx,
                                      m->mdt_bottom, uuid,
                                      LUSTRE_SEQ_SRV,
                                      ctx);
-                if (rc)
-                        mdt_seq_fini(ctx, m);
         } else
                 rc = -ENOMEM;
 
+        if (rc)
+                mdt_seq_fini(ctx, m);
+
         RETURN(rc);
 }
 
@@ -1698,14 +1692,26 @@ static int mdt_seq_init_ctlr(const struct lu_context *ctx,
                              struct mdt_device *m,
                              struct lustre_cfg *cfg)
 {
-        struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
+        struct lu_site    *ls = m->mdt_md_dev.md_lu_dev.ld_site;
         struct obd_device *mdc;
-        struct obd_uuid uuid;
-        char *uuid_str;
-        int rc, index;
+        struct obd_uuid   *uuidp;
+        char              *uuid_str;
+        int               rc;
+        int               index;
+        struct mdt_thread_info *info;
+        char *p, *index_string = lustre_cfg_string(cfg, 2);
         ENTRY;
 
-        index = simple_strtol(lustre_cfg_string(cfg, 2), NULL, 10);
+        info = lu_context_key_get(ctx, &mdt_thread_key);
+        uuidp = &info->mti_u.uuid;
+
+        LASSERT(index_string);
+
+        index = simple_strtol(index_string, &p, 10);
+        if (*p) {
+                CERROR("Invalid index in lustre_cgf, offset 2\n");
+                RETURN(-EINVAL);
+        }
 
         /* check if this is first MDC add and controller is not yet
          * initialized. */
@@ -1713,8 +1719,8 @@ static int mdt_seq_init_ctlr(const struct lu_context *ctx,
                 RETURN(0);
 
         uuid_str = lustre_cfg_string(cfg, 1);
-        obd_str2uuid(&uuid, uuid_str);
-        mdc = class_find_client_obd(&uuid, LUSTRE_MDC_NAME, NULL);
+        obd_str2uuid(uuidp, uuid_str);
+        mdc = class_find_client_obd(uuidp, LUSTRE_MDC_NAME, NULL);
         if (!mdc) {
                 CERROR("can't find controller MDC by uuid %s\n",
                        uuid_str);
@@ -1880,7 +1886,7 @@ err_mdt_svc:
         ptlrpc_unregister_service(m->mdt_service);
         m->mdt_service = NULL;
 
-        return (rc);
+        return rc;
 }
 
 static void mdt_stack_fini(const struct lu_context *ctx,
@@ -2009,6 +2015,7 @@ static void mdt_fini(const struct lu_context *ctx, struct mdt_device *m)
 /*
         mdt_fs_cleanup(ctx, m);
 */
+        ping_evictor_stop();
         mdt_stop_ptlrpc_service(m);
 
         /* finish the stack */
@@ -2055,11 +2062,10 @@ static int mdt_init0(const struct lu_context *ctx, struct mdt_device *m,
         m->mdt_max_mdsize = MAX_MD_SIZE;
         m->mdt_max_cookiesize = sizeof(struct llog_cookie);
 
-        m->mdt_flags = 0;
-
         /* Temporary. should parse mount option. */
         m->mdt_opts.mo_user_xattr = 0;
         m->mdt_opts.mo_acl = 0;
+        m->mdt_opts.mo_compat_resname = 0;
 
 
         OBD_ALLOC_PTR(s);
@@ -2104,6 +2110,8 @@ static int mdt_init0(const struct lu_context *ctx, struct mdt_device *m,
         rc = mdt_start_ptlrpc_service(m);
         if (rc)
                 GOTO(err_free_ns, rc);
+
+        ping_evictor_start();
 /* TODO: wait for read & write
         rc = mdt_fs_setup(ctx, m);
         if (rc)
index ee6ada9..f94e0ef 100644 (file)
@@ -108,11 +108,6 @@ struct mdt_device {
         struct ptlrpc_client       mdt_ldlm_client;
         /* underlying device */
         struct md_device          *mdt_child;
-        /*
-         * Device flags, taken from enum mdt_flags. No locking (so far) is
-         * necessary.
-         */
-        unsigned long              mdt_flags;
         struct dt_device          *mdt_bottom;
         /*
          * Options bit-fields.
@@ -120,6 +115,7 @@ struct mdt_device {
         struct {
                 signed int         mo_user_xattr :1;
                 signed int         mo_acl        :1;
+                signed int         mo_compat_resname:1;
         } mdt_opts;
         /* Transaction related stuff here */
         spinlock_t                 mdt_transno_lock;
@@ -141,14 +137,6 @@ struct mdt_device {
 #define MDT_ROCOMPAT_SUPP       (OBD_ROCOMPAT_LOVOBJID)
 #define MDT_INCOMPAT_SUPP       (OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR)
 
-enum mdt_flags {
-        /*
-         * This mdt works with legacy clients with different resource name
-         * encoding (pre-fid, etc.).
-         */
-        MDT_CL_COMPAT_RESNAME = 1 << 0,
-};
-
 struct mdt_object {
         struct lu_object_header mot_header;
         struct md_object        mot_obj;
@@ -166,6 +154,8 @@ enum {
 enum {
         MDT_LH_PARENT,
         MDT_LH_CHILD,
+        MDT_LH_OLD,
+        MDT_LH_NEW,
         MDT_LH_NR
 };
 
@@ -184,6 +174,7 @@ struct mdt_reint_record {
 
 
 #define XATTR_NAME_ACL_ACCESS   "system.posix_acl_access"
+#define XATTR_NAME_LOV          "lov"
 
 /*
  * Common data shared by mdt-level handlers. This is allocated per-thread to
@@ -240,6 +231,15 @@ struct mdt_thread_info {
          * frequent.
          */
         struct kstatfs             mti_sfs;
+
+        /* temporary stuff used by thread */
+        struct lu_fid              mti_tmp_fid1;
+        struct lu_fid              mti_tmp_fid2;
+        ldlm_policy_data_t         mti_policy;
+        struct ldlm_res_id         mti_res_id;
+        union {
+                struct obd_uuid    uuid;
+        } mti_u;
 };
 
 static inline struct md_device_operations *mdt_child_ops(struct mdt_device * m)
@@ -275,26 +275,30 @@ static inline const struct lu_fid *mdt_object_fid(struct mdt_object *o)
         return lu_object_fid(&o->mot_obj.mo_lu);
 }
 
-int mdt_object_lock(struct ldlm_namespace *, 
+int mdt_object_lock(struct mdt_thread_info *,
                     struct mdt_object *,
                     struct mdt_lock_handle *, 
                     __u64);
 
-void mdt_object_unlock(struct ldlm_namespace *, 
+void mdt_object_unlock(struct mdt_thread_info *, 
                        struct mdt_object *,
                        struct mdt_lock_handle *);
 
 struct mdt_object *mdt_object_find(const struct lu_context *,
                                    struct mdt_device *, 
                                    const struct lu_fid *);
-struct mdt_object *mdt_object_find_lock(const struct lu_context *,
-                                        struct mdt_device *,
+struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *,
                                         const struct lu_fid *,
-                                        struct mdt_lock_handle *, __u64);
+                                        struct mdt_lock_handle *,
+                                        __u64);
+void mdt_object_unlock_put(struct mdt_thread_info *, 
+                           struct mdt_object *,
+                           struct mdt_lock_handle *);
 
 int mdt_reint_unpack(struct mdt_thread_info *info, __u32 op);
 int mdt_reint_rec(struct mdt_thread_info *);
-void mdt_pack_attr2body(struct mdt_body *b, struct lu_attr *attr);
+void mdt_pack_attr2body(struct mdt_body *b, struct lu_attr *attr, 
+                        const struct lu_fid *fid);
 
 int mdt_getxattr(struct mdt_thread_info *info);
 int mdt_setxattr(struct mdt_thread_info *info);
@@ -341,7 +345,13 @@ int mdt_close(struct mdt_thread_info *info);
 
 int mdt_done_writing(struct mdt_thread_info *info);
 
-
+/* debug issues helper starts here*/
+#define MDT_FAIL_CHECK(id)                                              \
+({                                                                      \
+        if (OBD_FAIL_CHECK(id))                                         \
+                CERROR(LUSTRE_MDT0_NAME": " #id " test failed\n");      \
+        OBD_FAIL_CHECK(id);                                             \
+})
 
 #endif /* __KERNEL__ */
 #endif /* _MDT_H */
index c574855..3838868 100644 (file)
@@ -97,13 +97,14 @@ static int mdt_create_unpack(struct mdt_thread_info *info)
                 rr->rr_fid1 = &rec->cr_fid1;
                 rr->rr_fid2 = &rec->cr_fid2;
                 attr->la_mode = rec->cr_mode;
-                rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
                 attr->la_rdev  = rec->cr_rdev;
                 attr->la_uid   = rec->cr_fsuid;
                 attr->la_gid   = rec->cr_fsgid;
                 attr->la_flags = rec->cr_flags;
                 attr->la_ctime = rec->cr_time;
                 attr->la_mtime = rec->cr_time;
+                rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
+
                 if (req_capsule_field_present(pill, &RMF_SYMTGT))
                         rr->rr_tgt = req_capsule_client_get(pill, &RMF_SYMTGT);
         } else
index 29c2afe..40df23d 100644 (file)
@@ -110,23 +110,21 @@ static int mdt_object_open(struct mdt_thread_info *info,
         if (rc != 0)
                 GOTO(out, rc);
 
-        mdt_pack_attr2body(repbody, &info->mti_attr);
-        repbody->fid1 = *mdt_object_fid(o);
-        repbody->valid |= OBD_MD_FLID;
+        mdt_pack_attr2body(repbody, &info->mti_attr, mdt_object_fid(o));
 
 /*
         rc = mo_xattr_get(info->mti_ctxt, mdt_object_child(o),
                           lmm, info->mti_mdt->mdt_max_mdsize, "lov");
         if (rc < 0)
                 GOTO(out, rc = -EINVAL);
-
+*/
+        rc = 0;
         if (S_ISDIR(info->mti_attr.la_mode))
                 repbody->valid |= OBD_MD_FLDIREA;
         else
                 repbody->valid |= OBD_MD_FLEASIZE;
         repbody->eadatasize = rc;
         rc = 0;
-*/
         mfd = mdt_mfd_new();
         if (mfd == NULL) {
                 CERROR("mds: out of memory\n");
@@ -193,14 +191,12 @@ int mdt_lock_new_child(struct mdt_thread_info *info,
 
         mdt_lock_handle_init(&lockh);
         lockh.mlh_mode = LCK_EX;
-        rc = mdt_object_lock(info->mti_mdt->mdt_namespace, 
-                             o, &lockh, MDS_INODELOCK_UPDATE);
+        rc = mdt_object_lock(info, o, &lockh, MDS_INODELOCK_UPDATE);
 
         if (rc != ELDLM_OK)
                 CERROR("can not mdt_object_lock: %d\n", rc);
         else if (child_lockh == &lockh)
-                mdt_object_unlock(info->mti_mdt->mdt_namespace, 
-                                  o, &lockh);
+                mdt_object_unlock(info, o, &lockh);
 
         RETURN(rc);
 }
@@ -230,11 +226,13 @@ int mdt_reint_open(struct mdt_thread_info *info)
 
         lh = &info->mti_lh[MDT_LH_PARENT];
         lh->mlh_mode = LCK_PW;
-        parent = mdt_object_find_lock(info->mti_ctxt, mdt, rr->rr_fid1,
+        parent = mdt_object_find_lock(info, rr->rr_fid1,
                                       lh, MDS_INODELOCK_UPDATE);
-        if (IS_ERR(parent))
+        if (IS_ERR(parent)) {
+                intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
+                intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
                 GOTO(out, result = PTR_ERR(parent));
-
+        }
         result = mdo_lookup(info->mti_ctxt, mdt_object_child(parent),
                             rr->rr_name, &child_fid);
         if (result && result != -ENOENT) {
@@ -273,11 +271,12 @@ int mdt_reint_open(struct mdt_thread_info *info)
                 if (result != 0)
                         GOTO(out_child, result);
                 created = 1;
-        } else
-                intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
+        }
 
         /* Open it now. */
         result = mdt_object_open(info, child, info->mti_attr.la_flags);
+        if (result == 0)
+                intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
         GOTO(destroy_child, result);
 
 destroy_child:
@@ -290,8 +289,7 @@ destroy_child:
 out_child:
         mdt_object_put(info->mti_ctxt, child);
 out_parent:
-        mdt_object_unlock(mdt->mdt_namespace, parent, lh);
-        mdt_object_put(info->mti_ctxt, parent);
+        mdt_object_unlock_put(info, parent, lh);
 out:
         return result;
 }
@@ -350,9 +348,7 @@ int mdt_close(struct mdt_thread_info *info)
         rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o),
                          &info->mti_attr);
         if (rc == 0) {
-                mdt_pack_attr2body(repbody, &info->mti_attr);
-                repbody->fid1 = *mdt_object_fid(o);
-                repbody->valid |= OBD_MD_FLID;
+                mdt_pack_attr2body(repbody, &info->mti_attr, mdt_object_fid(o));
 /*
                 rc = mo_xattr_get(info->mti_ctxt, mdt_object_child(o),
                                   lmm, info->mti_mdt->mdt_max_mdsize, "lov");
index fd12205..6980bb6 100644 (file)
@@ -54,7 +54,7 @@ static int mdt_md_create(struct mdt_thread_info *info)
         lh = &info->mti_lh[MDT_LH_PARENT];
         lh->mlh_mode = LCK_PW;
 
-        parent = mdt_object_find_lock(info->mti_ctxt, mdt, rr->rr_fid1,
+        parent = mdt_object_find_lock(info, rr->rr_fid1,
                                       lh, MDS_INODELOCK_UPDATE);
         if (IS_ERR(parent))
                 RETURN(PTR_ERR(parent));
@@ -71,7 +71,11 @@ static int mdt_md_create(struct mdt_thread_info *info)
                                          mdt_object_child(child), 
                                          attr);
                         if (rc == 0) {
-                                mdt_pack_attr2body(repbody, attr);
+                                /*parent and child are all local. */
+                                mdt_pack_attr2body(repbody, attr,
+                                                   mdt_object_fid(child));
+                        } else if (rc == -EREMOTE) {
+                                /* parent is local, child is remote. */
                                 repbody->fid1 = *mdt_object_fid(child);
                                 repbody->valid |= OBD_MD_FLID;
                         }
@@ -79,8 +83,7 @@ static int mdt_md_create(struct mdt_thread_info *info)
                 mdt_object_put(info->mti_ctxt, child);
         } else
                 rc = PTR_ERR(child);
-        mdt_object_unlock(mdt->mdt_namespace, parent, lh);
-        mdt_object_put(info->mti_ctxt, parent);
+        mdt_object_unlock_put(info, parent, lh);
         RETURN(rc);
 }
 
@@ -107,9 +110,8 @@ static int mdt_md_mkobj(struct mdt_thread_info *info)
                                          next, 
                                          &info->mti_attr);
                         if (rc == 0) {
-                                mdt_pack_attr2body(repbody, &info->mti_attr);
-                                repbody->fid1 = *mdt_object_fid(o);
-                                repbody->valid |= OBD_MD_FLID;
+                                mdt_pack_attr2body(repbody, &info->mti_attr,
+                                                   mdt_object_fid(o));
                         }
                 }
                 mdt_object_put(info->mti_ctxt, o);
@@ -120,17 +122,21 @@ static int mdt_md_mkobj(struct mdt_thread_info *info)
 }
 
 
+/* In the raw-setattr case, we lock the child inode.
+ * In the write-back case or if being called from open, 
+ *               the client holds a lock already.
+ * We use the ATTR_FROM_OPEN flag to tell these cases apart. */
 static int mdt_reint_setattr(struct mdt_thread_info *info)
 {
-        struct lu_attr *attr = &info->mti_attr;
+        struct lu_attr          *attr = &info->mti_attr;
         struct mdt_reint_record *rr = &info->mti_rr;
-        struct ptlrpc_request *req = mdt_info_req(info);
-        struct mdt_object *mo;
-        struct mdt_lock_handle *lh;
-        struct lu_attr tmp_attr;
-        struct mdt_body *repbody;
-        int rc;
-        int locked = 0;
+        struct ptlrpc_request   *req = mdt_info_req(info);
+        struct mdt_object       *mo;
+        struct md_object        *next;
+        struct mdt_lock_handle  *lh;
+        struct mdt_body         *repbody;
+        /*__u64                   valid = attr->la_valid;*/
+        int                     rc;
 
         ENTRY;
 
@@ -152,57 +158,54 @@ static int mdt_reint_setattr(struct mdt_thread_info *info)
                 lh = &info->mti_lh[MDT_LH_PARENT];
                 lh->mlh_mode = LCK_EX;
  
-                mo = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt, 
-                                          rr->rr_fid1, lh, lockpart);
+                mo = mdt_object_find_lock(info, rr->rr_fid1, lh, lockpart);
                 
                 if (IS_ERR(mo))
                         RETURN(rc = PTR_ERR(mo));
-                locked = 1;
         }
 
-        if (lu_object_assert_not_exists(info->mti_ctxt, &mo->mot_obj.mo_lu))
+        next = mdt_object_child(mo);
+        if (lu_object_exists(info->mti_ctxt, &mo->mot_obj.mo_lu) <= 0)
                 GOTO(out_unlock, rc = -ENOENT);
 
         if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
                 GOTO(out_unlock, rc = -EROFS);
 
-        rc = mo_attr_set(info->mti_ctxt, mdt_object_child(mo), attr);
+        rc = mo_attr_set(info->mti_ctxt, next, attr);
         if (rc != 0)
                 GOTO(out_unlock, rc);
         
-        rc = mo_attr_get(info->mti_ctxt, mdt_object_child(mo), &tmp_attr);
+        rc = mo_attr_get(info->mti_ctxt, next, attr);
         if (rc != 0)
                 GOTO(out_unlock, rc);
 
         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
-        mdt_pack_attr2body(repbody, &tmp_attr);
-        repbody->fid1 = *mdt_object_fid(mo);
-        repbody->valid |= OBD_MD_FLID;
+        mdt_pack_attr2body(repbody, attr, mdt_object_fid(mo));
        
         /* don't return OST-specific attributes if we didn't just set them. 
-        if (attr->la_valid & ATTR_SIZE)
+        if (valid & ATTR_SIZE)
                 repbody->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
-        if (attr->la_valid & (ATTR_MTIME | ATTR_MTIME_SET))
+        if (valid & (ATTR_MTIME | ATTR_MTIME_SET))
                 repbody->valid |= OBD_MD_FLMTIME;
-        if (attr->la_valid & (ATTR_ATIME | ATTR_ATIME_SET))
+        if (valid & (ATTR_ATIME | ATTR_ATIME_SET))
                 repbody->valid |= OBD_MD_FLATIME;
         */
         /* FIXME: I have to combine the attr_set & xattr_set into one single
                   transaction. How can I?
-        rc = mo_xattr_set(info->mti_ctxt, mdt_object_child(mo),
-                          rr->rr_eadata, rr->rr_eadatalen, "lov");
+         */
+
+        if (rr->rr_eadatalen > 0)
+                rc = mo_xattr_set(info->mti_ctxt, next,
+                                  rr->rr_eadata, rr->rr_eadatalen, 
+                                  XATTR_NAME_LOV);
         if (rc)
                 GOTO(out_unlock, rc);
-         */ 
 
         /* FIXME & TODO Please deal with logcookies here*/
         GOTO(out_unlock, rc);
 out_unlock:
-        if (locked) {
-                mdt_object_unlock(info->mti_mdt->mdt_namespace, mo, lh);
-        }
-        mdt_object_put(info->mti_ctxt, mo);
-        return (rc);
+        mdt_object_unlock_put(info, mo, lh);
+        return rc;
 }
 
 
@@ -238,14 +241,14 @@ static int mdt_reint_create(struct mdt_thread_info *info)
 static int mdt_reint_unlink(struct mdt_thread_info *info)
 {
         struct mdt_reint_record *rr = &info->mti_rr;
-        struct ptlrpc_request *req = mdt_info_req(info);
-        struct mdt_object *mp;
-        struct mdt_object *mc;
+        struct ptlrpc_request   *req = mdt_info_req(info);
+        struct mdt_object       *mp;
+        struct mdt_object       *mc;
         struct mdt_lock_handle *lhp;
         struct mdt_lock_handle *lhc;
-        struct mdt_body *repbody;
-        struct lu_fid child_fid;
-        int rc;
+        struct mdt_body         *repbody;
+        struct lu_fid           *child_fid = &info->mti_tmp_fid1;
+        int                     rc;
 
         ENTRY;
 
@@ -257,8 +260,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info)
         /* step 1: lock the parent */
         lhp = &info->mti_lh[MDT_LH_PARENT];
         lhp->mlh_mode = LCK_EX;
-        mp = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt,
-                                  rr->rr_fid1, lhp, MDS_INODELOCK_UPDATE);
+        mp = mdt_object_find_lock(info, rr->rr_fid1, lhp, MDS_INODELOCK_UPDATE);
         if (IS_ERR(mp))
                 RETURN(PTR_ERR(mp));
 
@@ -271,26 +273,25 @@ static int mdt_reint_unlink(struct mdt_thread_info *info)
                 GOTO(out_unlock_parent, rc);
         }
 
+        repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
         /*step 2: find & lock the child */
         lhc = &info->mti_lh[MDT_LH_CHILD];
         lhc->mlh_mode = LCK_EX;
         rc = mdo_lookup(info->mti_ctxt, mdt_object_child(mp),
-                        rr->rr_name, &child_fid);
-        if (rc) {
-                GOTO(out_unlock_parent, rc);
+                        rr->rr_name, child_fid);
+        if (rc == -EREMOTE) {
+                repbody->fid1 = *child_fid;
+                repbody->valid |= OBD_MD_FLID;
         }
+        if (rc != 0)
+                 GOTO(out_unlock_parent, rc);
 
-        mc = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt, &child_fid,
-                                  lhc, MDS_INODELOCK_FULL);
+        mc = mdt_object_find_lock(info, child_fid, lhc, MDS_INODELOCK_FULL);
         if (IS_ERR(mc))
                 GOTO(out_unlock_parent, rc = PTR_ERR(mc));
 
-        /* NB: Be aware of Bug 2029 */
         
-        /*step 3: deal with orphan */
-
-        repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
-
+        /*step 3:  do some checking*/
         if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
                 GOTO(out_unlock_child, rc = -EROFS);
 
@@ -298,23 +299,11 @@ static int mdt_reint_unlink(struct mdt_thread_info *info)
         rc = mdo_unlink(info->mti_ctxt, mdt_object_child(mp),
                         mdt_object_child(mc), rr->rr_name);
 
-        /*step 5: orphan handling & recovery issue */
-        if (rc == 0) {
-        /* If this is potentially the last reference to this inode, get the
-         * OBD EA data first so the client can destroy OST objects.  We
-         * only do the object removal later if no open files/links remain. */
-
-                /* FIXME & TODO:
-                 * 1. orphan handling here
-                 * 2. Please deal with logcookies here */ 
-        }
         GOTO(out_unlock_child, rc);
 out_unlock_child:
-        mdt_object_unlock(info->mti_mdt->mdt_namespace, mc, lhc);
-        mdt_object_put(info->mti_ctxt, mc);
+        mdt_object_unlock_put(info, mc, lhc);
 out_unlock_parent:
-        mdt_object_unlock(info->mti_mdt->mdt_namespace, mp, lhp);
-        mdt_object_put(info->mti_ctxt, mp);
+        mdt_object_unlock_put(info, mp, lhp);
         return rc;
 }
 
@@ -338,8 +327,7 @@ static int mdt_reint_link(struct mdt_thread_info *info)
         /* step 1: lock the source */
         lhs = &info->mti_lh[MDT_LH_PARENT];
         lhs->mlh_mode = LCK_EX;
-        ms = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt,
-                                  rr->rr_fid1, lhs, MDS_INODELOCK_UPDATE);
+        ms = mdt_object_find_lock(info, rr->rr_fid1, lhs, MDS_INODELOCK_UPDATE);
         if (IS_ERR(ms))
                 RETURN(PTR_ERR(ms));
 
@@ -354,8 +342,7 @@ static int mdt_reint_link(struct mdt_thread_info *info)
         /*step 2: find & lock the target */
         lht = &info->mti_lh[MDT_LH_CHILD];
         lht->mlh_mode = LCK_EX;
-        mt = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt,
-                                  rr->rr_fid2, lht, MDS_INODELOCK_UPDATE);
+        mt = mdt_object_find_lock(info, rr->rr_fid2, lht, MDS_INODELOCK_UPDATE);
         if (IS_ERR(mt))
                 GOTO(out_unlock_source, rc = PTR_ERR(mt));
 
@@ -368,11 +355,9 @@ static int mdt_reint_link(struct mdt_thread_info *info)
         GOTO(out_unlock_target, rc);
 
 out_unlock_target:
-        mdt_object_unlock(info->mti_mdt->mdt_namespace, mt, lht);
-        mdt_object_put(info->mti_ctxt, mt);
+        mdt_object_unlock_put(info, mt, lht);
 out_unlock_source:
-        mdt_object_unlock(info->mti_mdt->mdt_namespace, ms, lhs);
-        mdt_object_put(info->mti_ctxt, ms);
+        mdt_object_unlock_put(info, ms, lhs);
         return rc;
 }
 
@@ -380,14 +365,13 @@ out_unlock_source:
 static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
 {
         struct mdt_reint_record *rr = &info->mti_rr;
-        struct ptlrpc_request *req = mdt_info_req(info);
-        struct mdt_object *mtgtdir;
-        struct mdt_object *mtgt = NULL;
-        struct mdt_lock_handle *lh_tgtdir;
-        struct mdt_lock_handle *lh_tgt;
-        struct lu_fid tgt_fid;
-        struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
-        int rc;
+        struct ptlrpc_request   *req = mdt_info_req(info);
+        struct mdt_object       *mtgtdir;
+        struct mdt_object       *mtgt = NULL;
+        struct mdt_lock_handle  *lh_tgtdir;
+        struct mdt_lock_handle  *lh_tgt;
+        struct lu_fid           *tgt_fid = &info->mti_tmp_fid1;
+        int                     rc;
 
         ENTRY;
 
@@ -398,15 +382,14 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
         /* step 1: lookup & lock the tgt dir */
         lh_tgtdir = &info->mti_lh[MDT_LH_PARENT];
         lh_tgtdir->mlh_mode = LCK_PW;
-        mtgtdir = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt,
-                                       rr->rr_fid1, lh_tgtdir, 
+        mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir, 
                                        MDS_INODELOCK_UPDATE);
         if (IS_ERR(mtgtdir))
                 GOTO(out, rc = PTR_ERR(mtgtdir));
 
         /*step 2: find & lock the target object if exists*/
         rc = mdo_lookup(info->mti_ctxt, mdt_object_child(mtgtdir),
-                        rr->rr_tgt, &tgt_fid);
+                        rr->rr_tgt, tgt_fid);
         if (rc && rc != -ENOENT)
                 GOTO(out_unlock_tgtdir, rc);
 
@@ -417,8 +400,7 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
                 lh_tgt = &info->mti_lh[MDT_LH_CHILD];
                 lh_tgt->mlh_mode = LCK_EX;
  
-                mtgt = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt,
-                                            &tgt_fid, lh_tgt, 
+                mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt, 
                                             MDS_INODELOCK_LOOKUP);
                 if (IS_ERR(mtgt))
                         GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt));
@@ -436,12 +418,10 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
 
 out_unlock_tgt:
         if (mtgt) {
-                mdt_object_unlock(ns, mtgt, lh_tgt);
-                mdt_object_put(info->mti_ctxt, mtgt);
+                mdt_object_unlock_put(info, mtgt, lh_tgt);
         }
 out_unlock_tgtdir:
-        mdt_object_unlock(ns, mtgtdir, lh_tgtdir);
-        mdt_object_put(info->mti_ctxt, mtgtdir);
+        mdt_object_unlock_put(info, mtgtdir, lh_tgtdir);
 out:
         return rc;
 }
@@ -450,19 +430,18 @@ out:
 static int mdt_reint_rename(struct mdt_thread_info *info)
 {
         struct mdt_reint_record *rr = &info->mti_rr;
-        struct req_capsule *pill = &info->mti_pill;
-        struct ptlrpc_request *req = mdt_info_req(info);
-        struct mdt_object *msrcdir;
-        struct mdt_object *mtgtdir;
-        struct mdt_object *mold;
-        struct mdt_object *mnew = NULL;
+        struct req_capsule      *pill = &info->mti_pill;
+        struct ptlrpc_request   *req = mdt_info_req(info);
+        struct mdt_object       *msrcdir;
+        struct mdt_object       *mtgtdir;
+        struct mdt_object       *mold;
+        struct mdt_object       *mnew = NULL;
         struct mdt_lock_handle *lh_srcdirp;
         struct mdt_lock_handle *lh_tgtdirp;
-        struct mdt_lock_handle lh_old;
-        struct mdt_lock_handle lh_new;
-        struct lu_fid old_fid = {0};
-        struct lu_fid new_fid = {0};
-        struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
+        struct mdt_lock_handle *lh_oldp;
+        struct mdt_lock_handle *lh_newp;
+        struct lu_fid          *old_fid = &info->mti_tmp_fid1;
+        struct lu_fid          *new_fid = &info->mti_tmp_fid2;
         int rc;
 
         ENTRY;
@@ -482,8 +461,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info)
         /* step 1: lock the source dir */
         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
         lh_srcdirp->mlh_mode = LCK_EX;
-        msrcdir = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt,
-                                       rr->rr_fid1, lh_srcdirp, 
+        msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp, 
                                        MDS_INODELOCK_UPDATE);
         if (IS_ERR(msrcdir))
                 GOTO(out, rc = PTR_ERR(msrcdir));
@@ -491,21 +469,20 @@ static int mdt_reint_rename(struct mdt_thread_info *info)
         /*step 2: find & lock the target dir*/
         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
         lh_tgtdirp->mlh_mode = LCK_EX;
-        mtgtdir = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt,
-                                       rr->rr_fid2, lh_tgtdirp, 
+        mtgtdir = mdt_object_find_lock(info, rr->rr_fid2, lh_tgtdirp, 
                                        MDS_INODELOCK_UPDATE);
         if (IS_ERR(mtgtdir))
                 GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
         
         /*step 3: find & lock the old object*/
         rc = mdo_lookup(info->mti_ctxt, mdt_object_child(msrcdir),
-                        rr->rr_name, &old_fid);
+                        rr->rr_name, old_fid);
         if (rc) 
                 GOTO(out_unlock_target, rc);
  
-        lh_old.mlh_mode = LCK_EX;
-        mold = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt,
-                                    &old_fid, &lh_old
+        lh_oldp = &info->mti_lh[MDT_LH_OLD];
+        lh_oldp->mlh_mode = LCK_EX;
+        mold = mdt_object_find_lock(info, old_fid, lh_oldp
                                     MDS_INODELOCK_LOOKUP);
         if (IS_ERR(mold))
                 GOTO(out_unlock_target, rc = PTR_ERR(mold));
@@ -513,15 +490,15 @@ static int mdt_reint_rename(struct mdt_thread_info *info)
         /*step 4: find & lock the new object*/
         /* new target object may not exist now */
         rc = mdo_lookup(info->mti_ctxt, mdt_object_child(mtgtdir),
-                        rr->rr_tgt, &new_fid);
+                        rr->rr_tgt, new_fid);
         if (rc && rc != -ENOENT)
                 GOTO(out_unlock_old, rc);
 
         if (rc == 0) { 
                 /* the new_fid should have been filled at this moment*/
-                lh_new.mlh_mode = LCK_EX;
-                mnew = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt,
-                                            &new_fid, &lh_new
+                lh_newp = &info->mti_lh[MDT_LH_NEW];
+                lh_newp->mlh_mode = LCK_EX;
+                mnew = mdt_object_find_lock(info, new_fid, lh_newp
                                             MDS_INODELOCK_FULL);
                 if (IS_ERR(mnew))
                         GOTO(out_unlock_old, rc = PTR_ERR(mnew));
@@ -533,7 +510,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info)
 
         /* step 6: rename it */
         rc = mdo_rename(info->mti_ctxt, mdt_object_child(msrcdir),
-                        mdt_object_child(mtgtdir), &old_fid,
+                        mdt_object_child(mtgtdir), old_fid,
                         rr->rr_name, mnew ? mdt_object_child(mnew): NULL,
                         rr->rr_tgt);
         if (rc)
@@ -545,18 +522,14 @@ static int mdt_reint_rename(struct mdt_thread_info *info)
 
 out_unlock_new:
         if (mnew) {
-                mdt_object_unlock(ns, mnew, &lh_new);
-                mdt_object_put(info->mti_ctxt, mnew);
+                mdt_object_unlock_put(info, mnew, lh_newp);
         }
 out_unlock_old:
-        mdt_object_unlock(ns, mold, &lh_old);
-        mdt_object_put(info->mti_ctxt, mold);
+        mdt_object_unlock_put(info, mold, lh_oldp);
 out_unlock_target:
-        mdt_object_unlock(ns, mtgtdir, lh_tgtdirp);
-        mdt_object_put(info->mti_ctxt, mtgtdir);
+        mdt_object_unlock_put(info, mtgtdir, lh_tgtdirp);
 out_unlock_source:
-        mdt_object_unlock(ns, msrcdir, lh_srcdirp);
-        mdt_object_put(info->mti_ctxt, msrcdir);
+        mdt_object_unlock_put(info, msrcdir, lh_srcdirp);
 out:
         return rc;
 }
index f17339e..01b88bd 100644 (file)
 /* return EADATA length to the caller. negative value means error */
 static int mdt_getxattr_pack_reply(struct mdt_thread_info * info)
 {
-        struct req_capsule *pill = &info->mti_pill ;
-        struct ptlrpc_request *req = mdt_info_req(info);
-        char *xattr_name;
-        int rc;
+        struct req_capsule     *pill = &info->mti_pill ;
+        struct ptlrpc_request  *req = mdt_info_req(info);
+        char                   *xattr_name;
+        __u64                   valid = info->mti_body->valid;
+        const char              user_string[] = "user.";
+        int                     rc;
+
+        if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETXATTR_PACK)) {
+                return -ENOMEM;
+        }
 
+        if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_XATTR) &&
+             (strncmp(xattr_name, user_string, sizeof(user_string) - 1) == 0))
+                return -EOPNOTSUPP;
         /* Imagine how many bytes we need */
-        if (info->mti_body->valid & OBD_MD_FLXATTR) {
+        if ((valid & OBD_MD_FLXATTR) == OBD_MD_FLXATTR) {
                 xattr_name = req_capsule_client_get(pill, &RMF_NAME);
                 if (!xattr_name) {
-                        CERROR("can't extract xattr name for getxattr\n");
                         return -EFAULT;
                 }
-
-                if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_XATTR) &&
-                    (strncmp(xattr_name, "user.", 5) == 0))
-                        return -EOPNOTSUPP;
-
                 rc = mo_xattr_get(info->mti_ctxt, 
                                   mdt_object_child(info->mti_object), 
                                   NULL, 0, xattr_name);
-        } else if (info->mti_body->valid & OBD_MD_FLXATTRLS) {
+        } else if ((valid & OBD_MD_FLXATTRLS) == OBD_MD_FLXATTRLS) {
                 rc = mo_xattr_list(info->mti_ctxt, 
                                    mdt_object_child(info->mti_object),
                                    NULL, 0);
@@ -80,11 +83,6 @@ static int mdt_getxattr_pack_reply(struct mdt_thread_info * info)
         }
         req_capsule_set_size(pill, &RMF_EADATA, RCL_SERVER, rc);
 
-        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETXATTR_PACK)) {
-                CERROR("failed MDS_GETXATTR_PACK test\n");
-                return -ENOMEM;
-        }
-
         rc = req_capsule_pack(pill);
         return rc;
 }
@@ -103,18 +101,17 @@ int mdt_getxattr(struct mdt_thread_info *info)
         LASSERT(info->mti_object != NULL);
         LASSERT(lu_object_assert_exists(info->mti_ctxt,
                                  &info->mti_object->mot_obj.mo_lu));
-        ENTRY;
 
-        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETXATTR_PACK)) {
-                CERROR(LUSTRE_MDT0_NAME":getxattr pack_reply failed\n");
+        if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETXATTR_PACK)) {
                 RETURN(rc = -ENOMEM);
         }
 
         next = mdt_object_child(info->mti_object);
 
         rc = mdt_getxattr_pack_reply(info);
-        if (rc < 0)
+        if (rc != 0)
                 RETURN(rc);
+
         buf = req_capsule_server_get(&info->mti_pill, &RMF_EADATA);
         rep_body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
         buflen = req_capsule_get_size(&info->mti_pill, &RMF_EADATA, RCL_SERVER);
@@ -153,38 +150,41 @@ no_xattr:
 
 int mdt_setxattr(struct mdt_thread_info *info)
 {
-        int    rc;
-        char  *xattr_name;
-        int    xattr_len;
-        struct ptlrpc_request *req = mdt_info_req(info);
-        __u64 lockpart;
+        struct ptlrpc_request  *req = mdt_info_req(info);
+        const char              user_string[] = "user.";
+        const char              trust_string[] = "trusted.";
         struct mdt_lock_handle *lh;
+        __u64                   valid = info->mti_body->valid;
+        char                   *xattr_name;
+        int                     xattr_len;
+        __u64                   lockpart;
+        int                     rc;
         ENTRY;
 
 
         DEBUG_REQ(D_INODE, req, "setxattr "DFID3"\n",
                   PFID3(&info->mti_body->fid1));
-
-/*        MDS_CHECK_RESENT(req, mds_reconstruct_generic(req)); */
+        if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SETXATTR)) {
+                RETURN(rc = -ENOMEM);
+        }
 
         /* various sanity check for xattr name */
         xattr_name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
         if (!xattr_name) {
-                CERROR("can't extract xattr name\n");
-                GOTO(out, rc = -EPROTO);
+                GOTO(out, rc = -EFAULT);
         }
 
         CDEBUG(D_INODE, "%s xattr %s\n",
                   info->mti_body->valid & OBD_MD_FLXATTR ? "set" : "remove",
                   xattr_name);
 
-        if (strncmp(xattr_name, "trusted.", 8) == 0) {
-                if (strcmp(xattr_name + 8, "lov") == 0)
+        if (strncmp(xattr_name, trust_string, sizeof(trust_string) - 1) == 0) {
+                if (strcmp(xattr_name + 8, XATTR_NAME_LOV) == 0)
                         GOTO(out, rc = -EACCES);
         }
 
         if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_XATTR) &&
-            (strncmp(xattr_name, "user.", 5) == 0)) {
+            (strncmp(xattr_name, user_string, sizeof(user_string) - 1) == 0)) {
                 GOTO(out, rc = -EOPNOTSUPP);
         }
 
@@ -194,15 +194,14 @@ int mdt_setxattr(struct mdt_thread_info *info)
 
         lh = &info->mti_lh[MDT_LH_PARENT];
         lh->mlh_mode = LCK_EX;
-        rc = mdt_object_lock(info->mti_mdt->mdt_namespace, info->mti_object, 
-                                 lh, lockpart);
+        rc = mdt_object_lock(info, info->mti_object, lh, lockpart);
         if (rc != 0)
                 GOTO(out, rc);
 
         if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
                 GOTO(out_unlock, rc = -EROFS);
 
-        if (info->mti_body->valid & OBD_MD_FLXATTR) {
+        if ((valid & OBD_MD_FLXATTR) == OBD_MD_FLXATTR) {
                 char * xattr; 
                 if (!req_capsule_field_present(&info->mti_pill, &RMF_EADATA)) {
                         CERROR("no xattr data supplied\n");
@@ -219,7 +218,7 @@ int mdt_setxattr(struct mdt_thread_info *info)
                                           mdt_object_child(info->mti_object),
                                           xattr, xattr_len, xattr_name);
                 }
-        } else if (info->mti_body->valid & OBD_MD_FLXATTRRM) {
+        } else if ((valid & OBD_MD_FLXATTRRM) == OBD_MD_FLXATTRRM) {
                 rc = mo_xattr_del(info->mti_ctxt, 
                                   mdt_object_child(info->mti_object),
                                   xattr_name);
@@ -229,8 +228,7 @@ int mdt_setxattr(struct mdt_thread_info *info)
         }
         EXIT;
 out_unlock:
-        mdt_object_unlock(info->mti_mdt->mdt_namespace, 
-                          info->mti_object, lh);
+        mdt_object_unlock(info, info->mti_object, lh);
 out:
         return rc;
 }