Whamcloud - gitweb
after first round review of MDT.
authorhuanghua <huanghua>
Thu, 29 Jun 2006 10:20:51 +0000 (10:20 +0000)
committerhuanghua <huanghua>
Thu, 29 Jun 2006 10:20:51 +0000 (10:20 +0000)
(1) remove dependcy from lustre/mds/mds_internal.h

lustre/include/lustre_export.h
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lib.c
lustre/mdt/mdt_reint.c
lustre/ptlrpc/layout.c

index 1f80d42..95bfa74 100644 (file)
@@ -10,6 +10,7 @@
 
 /* Data stored per client in the last_rcvd file.  In le32 order. */
 struct mds_client_data;
+struct mdt_client_data;
 
 struct mds_export_data {
         struct list_head        med_open_head;
@@ -20,6 +21,14 @@ struct mds_export_data {
         int                     med_lr_idx;
 };
 
+struct mdt_export_data {
+        struct list_head        med_open_head;
+        spinlock_t              med_open_lock; /* lock med_open_head, mfd_list*/
+        struct mdt_client_data *med_mcd;
+        __u64                   med_ibits_known;
+        loff_t                  med_lr_off;
+        int                     med_lr_idx;
+};
 struct osc_creator {
         spinlock_t              oscc_lock;
         struct list_head        oscc_list;
@@ -78,12 +87,14 @@ struct obd_export {
                                   exp_libclient:1; /* liblustre client? */
         union {
                 struct mds_export_data    eu_mds_data;
+                struct mdt_export_data    eu_mdt_data;
                 struct filter_export_data eu_filter_data;
                 struct ec_export_data     eu_ec_data;
         } u;
 };
 
 #define exp_mds_data    u.eu_mds_data
+#define exp_mdt_data    u.eu_mdt_data
 #define exp_lov_data    u.eu_lov_data
 #define exp_filter_data u.eu_filter_data
 #define exp_ec_data     u.eu_ec_data
index 53d01a1..4456fec 100644 (file)
@@ -52,8 +52,6 @@
 #include <obd.h>
 /* lu2dt_dev() */
 #include <dt_object.h>
-/* struct mds_client_data */
-#include "../mds/mds_internal.h"
 #include "mdt_internal.h"
 
 /*
@@ -375,13 +373,8 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
         struct mdt_lock_handle *lhp;
         struct lu_fid child_fid;
         struct ldlm_namespace *ns;
-
         ENTRY;
 
-        lhp = &info->mti_lh[MDT_LH_PARENT];
-        lhp->mlh_mode = LCK_CR;
-        lhc->mlh_mode = LCK_CR;
-
         LASSERT(info->mti_object != NULL);
 
         name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
@@ -390,6 +383,8 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
 
         ns = info->mti_mdt->mdt_namespace;
         /*step 1: lock parent */
+        lhp = &info->mti_lh[MDT_LH_PARENT];
+        lhp->mlh_mode = LCK_CR;
         result = mdt_object_lock(ns, parent, lhp, MDS_INODELOCK_UPDATE);
         if (result != 0)
                 RETURN(result);
@@ -407,6 +402,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
         /*step 4: lock child: this lock is returned back to caller
          *                    if successfully get attr.
          */
+        lhc->mlh_mode = LCK_CR;
         result = mdt_object_lock(ns, child, lhc, child_bits);
         if (result != 0)
                 GOTO(out_child, result);
@@ -426,12 +422,13 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
         }
         if (result != 0)
                 mdt_object_unlock(ns, child, lhc);
+        EXIT;
 
 out_child:
         mdt_object_put(info->mti_ctxt, child);
 out_parent:
         mdt_object_unlock(ns, parent, lhp);
-        RETURN(result);
+        return result;
 }
 
 /* normal handler: should release the child lock */
@@ -593,13 +590,13 @@ static int mdt_pin(struct mdt_thread_info *info)
 /* TODO these two methods not available now. */
 
 /* this should sync the whole device */
-static int mdt_device_sync(struct mdt_device *mdt)
+static int mdt_device_sync(struct mdt_thread_info *info)
 {
         return 0;
 }
 
 /* this should sync this object */
-static int mdt_object_sync(struct mdt_object *m)
+static int mdt_object_sync(struct mdt_thread_info *info)
 {
         return 0;
 }
@@ -622,14 +619,14 @@ static int mdt_sync(struct mdt_thread_info *info)
                 /* sync the whole device */
                 rc = req_capsule_pack(pill);
                 if (rc == 0)
-                        rc = mdt_device_sync(info->mti_mdt);
+                        rc = mdt_device_sync(info);
         } else {
                 /* sync an object */
                 rc = mdt_unpack_req_pack_rep(info, HABEO_CORPUS | HABEO_REFERO);
                 if (rc != 0)
                         RETURN(rc);
 
-                rc = mdt_object_sync(info->mti_object);
+                rc = mdt_object_sync(info);
                 if (rc != 0)
                         RETURN(rc);
 
@@ -890,7 +887,7 @@ static struct mdt_handler *mdt_handler_find(__u32 opc)
 
 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
 {
-        return req->rq_export->exp_mds_data.med_mcd->mcd_last_xid;
+        return req->rq_export->exp_mdt_data.med_mcd->mcd_last_xid;
 }
 
 static int mdt_lock_resname_compat(struct mdt_device *m,
@@ -1004,8 +1001,7 @@ static int mdt_req_handle(struct mdt_thread_info *info,
 
         if (h->mh_fmt != NULL) {
                 req_capsule_set(&info->mti_pill, h->mh_fmt);
-                if (result == 0)
-                        result = mdt_unpack_req_pack_rep(info, flags);
+                result = mdt_unpack_req_pack_rep(info, flags);
         }
 
         if (result == 0 && flags & HABEO_CLAVIS) {
@@ -1050,15 +1046,34 @@ static int mdt_req_handle(struct mdt_thread_info *info,
                         result = mdt_lock_reply_compat(info->mti_mdt, dlm_rep);
         }
 
-        /* If we're DISCONNECTing, the mds_export_data is already freed */
+        /* If we're DISCONNECTing, the mdt_export_data is already freed */
         if (result == 0 && h->mh_opc != MDS_DISCONNECT) {
+#ifdef MDT_CODE
+                /* FIXME: fake untill journal callback & open handling is OK.*/ 
+                __u64 last_transno;
+                __u64 last_committed;
+                struct mdt_device *mdt = info->mti_mdt;
+
+                LASSERT(mdt != NULL);
+                spin_lock(&mdt->mdt_transno_lock);
+                last_transno = ++ (mdt->mdt_last_transno);
+                last_committed = ++ (mdt->mdt_last_committed);
+                spin_unlock(&mdt->mdt_transno_lock);
+                
+                req->rq_repmsg->transno = req->rq_transno = last_transno;
+                req->rq_repmsg->last_xid = req->rq_xid;
+                req->rq_repmsg->last_committed = last_committed;
+                req->rq_export->exp_obd->obd_last_committed = last_committed;
+#else
                 req->rq_repmsg->last_xid = le64_to_cpu(req_exp_last_xid(req));
                 target_committed_to_req(req);
+#endif
         }
         req_capsule_fini(&info->mti_pill);
         RETURN(result);
 }
 
+
 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
 {
         lh->mlh_lh.cookie = 0ull;
@@ -1482,9 +1497,21 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
         new_lock = ldlm_handle2lock(&lhc.mlh_lh);
         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
                 RETURN(0);
+        
+        LASSERTF(new_lock != NULL, "op %d lockh "LPX64"\n",
+                 opcode, lhc.mlh_lh.cookie);
 
         *lockp = new_lock;
 
+        /* FIXME:This only happen when I can handle RESENT */
+        if (new_lock->l_export == req->rq_export) {
+                /* Already gave this to the client, which means that we
+                 * reconstructed a reply. */
+                LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
+                        MSG_RESENT);
+                RETURN(ELDLM_LOCK_REPLACED);
+        }
+
         /* Fixup the lock to be given to the client */
         l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
         new_lock->l_readers = 0;
@@ -1589,12 +1616,10 @@ static int mdt_intent_code(long itcode)
 static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
                           struct ldlm_lock **lockp, int flags)
 {
-        int opc;
-        int rc;
-
         struct req_capsule   *pill;
         struct mdt_it_flavor *flv;
-
+        int opc;
+        int rc;
         ENTRY;
 
         opc = mdt_intent_code(itopc);
@@ -1879,7 +1904,7 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m)
                 .psc_max_reply_size   = MDS_MAXREPSIZE,
                 .psc_req_portal       = MDS_REQUEST_PORTAL,
                 .psc_rep_portal       = MDC_REPLY_PORTAL,
-                .psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT,
+                .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
                 /*
                  * We'd like to have a mechanism to set this on a per-device
                  * basis, but alas...
@@ -2075,6 +2100,13 @@ static int mdt_init0(const struct lu_context *ctx, struct mdt_device *m,
         obd = class_name2obd(dev);
         m->mdt_md_dev.md_lu_dev.ld_obd = obd;
 
+        spin_lock_init(&m->mdt_transno_lock);
+        /* FIXME: We need to load them from disk. But now fake it */
+        m->mdt_last_transno = 0;
+        m->mdt_last_committed = 0;
+        m->mdt_max_mdsize = MAX_MD_SIZE;
+        m->mdt_max_cookiesize = sizeof(struct llog_cookie);
+
         OBD_ALLOC_PTR(s);
         if (s == NULL)
                 RETURN(-ENOMEM);
@@ -2260,7 +2292,7 @@ static int mdt_connect0(struct mdt_device *mdt,
 
                 exp->exp_connect_flags = data->ocd_connect_flags;
                 data->ocd_version = LUSTRE_VERSION_CODE;
-                exp->exp_mds_data.med_ibits_known = data->ocd_ibits_known;
+                exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known;
         }
 
         if (mdt->mdt_opts.mo_acl &&
@@ -2280,8 +2312,8 @@ static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
         struct obd_export *exp;
         int rc;
         struct mdt_device *mdt;
-        struct mds_export_data *med;
-        struct mds_client_data *mcd;
+        struct mdt_export_data *med;
+        struct mdt_client_data *mcd;
         ENTRY;
 
         if (!conn || !obd || !cluuid)
@@ -2295,7 +2327,7 @@ static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
 
         exp = class_conn2export(conn);
         LASSERT(exp != NULL);
-        med = &exp->exp_mds_data;
+        med = &exp->exp_mdt_data;
 
         rc = mdt_connect0(mdt, exp, data);
         if (rc == 0) {
@@ -2316,7 +2348,7 @@ static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
 
 static int mdt_obd_disconnect(struct obd_export *exp)
 {
-        struct mds_export_data *med = &exp->exp_mds_data;
+        struct mdt_export_data *med = &exp->exp_mdt_data;
         unsigned long irqflags;
         int rc;
         ENTRY;
@@ -2403,8 +2435,8 @@ static struct lu_device *mdt_device_alloc(const struct lu_context *ctx,
                 l = &m->mdt_md_dev.md_lu_dev;
                 result = mdt_init0(ctx, m, t, cfg);
                 if (result != 0) {
-                        mdt_device_free(ctx, l);
-                        return ERR_PTR(result);
+                        OBD_FREE_PTR(m);
+                        l = ERR_PTR(result);
                 }
 
         } else
index f0099a3..84e9c7a 100644 (file)
@@ -49,6 +49,8 @@
 #include <lustre_fid.h>
 #include <lustre_fld.h>
 #include <lustre_req_layout.h>
+/* LR_CLIENT_SIZE, etc. */
+#include <lustre_disk.h>
 
 struct mdt_device {
         /* super-class */
@@ -73,7 +75,26 @@ struct mdt_device {
                 signed int         mo_user_xattr :1;
                 signed int         mo_acl        :1;
         } mdt_opts;
+        /* Transaction related stuff here */
+        spinlock_t                 mdt_transno_lock;
+        __u64                      mdt_last_transno;
+        __u64                      mdt_last_committed;
+        /* these values should be updated from lov if necessary.
+         * or should be placed somewhere else. */
+        int                        mdt_max_mdsize;
+        int                        mdt_max_cookiesize;
+
+};
+/* Data stored per client in the last_rcvd file.  In le32 order. */
+struct mdt_client_data {
+        __u8  mcd_uuid[40];     /* client UUID */
+        __u64 mcd_last_transno; /* last completed transaction ID */
+        __u64 mcd_last_xid;     /* xid for the last transaction */
+        __u32 mcd_last_result;  /* result from last RPC */
+        __u32 mcd_last_data;    /* per-op data (disposition for open &c.) */
+        __u8  mcd_padding[LR_CLIENT_SIZE - 64];
 };
+#define MDT_SERVICE_WATCHDOG_TIMEOUT (obd_timeout * 1000)
 
 static inline struct md_device_operations *mdt_child_ops(struct mdt_device * m)
 {
index d9fd941..cf022d7 100644 (file)
@@ -54,7 +54,7 @@ static int mdt_setattr_unpack(struct mdt_thread_info *info)
         rec = req_capsule_client_get(pill, &RMF_REC_SETATTR);
 
         if (rec == NULL)
-                RETURN (-EFAULT);
+                RETURN(-EFAULT);
 
         rr->rr_fid1 = &rec->sa_fid;
         attr->la_valid = rec->sa_valid;
@@ -109,7 +109,7 @@ static int mdt_create_unpack(struct mdt_thread_info *info)
                 attr->la_flags = rec->cr_flags;
                 attr->la_ctime = rec->cr_time;
                 if (req_capsule_field_present(pill, &RMF_SYMTGT))
-                        rr->rr_tgt  = req_capsule_client_get(pill, &RMF_SYMTGT);
+                        rr->rr_tgt = req_capsule_client_get(pill, &RMF_SYMTGT);
 #endif
         } else
                 result = -EFAULT;
@@ -127,7 +127,7 @@ static int mdt_link_unpack(struct mdt_thread_info *info)
 
         rec = req_capsule_client_get(pill, &RMF_REC_LINK);
         if (rec == NULL)
-                RETURN (-EFAULT);
+                RETURN(-EFAULT);
 
         attr->la_uid = rec->lk_fsuid;
         attr->la_gid = rec->lk_fsgid;
@@ -137,7 +137,7 @@ static int mdt_link_unpack(struct mdt_thread_info *info)
 
         rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
         if (rr->rr_name == NULL)
-                RETURN (-EFAULT);
+                RETURN(-EFAULT);
         RETURN(0);
 #endif
         ENTRY;
@@ -155,7 +155,7 @@ static int mdt_unlink_unpack(struct mdt_thread_info *info)
 
         rec = req_capsule_client_get(pill, &RMF_REC_UNLINK);
         if (rec == NULL)
-                RETURN (-EFAULT);
+                RETURN(-EFAULT);
 
         attr->la_uid = rec->ul_fsuid;
         attr->la_gid = rec->ul_fsgid;
@@ -166,7 +166,7 @@ static int mdt_unlink_unpack(struct mdt_thread_info *info)
 
         rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
         if (rr->rr_name == NULL)
-                RETURN (-EFAULT);
+                RETURN(-EFAULT);
         RETURN(0);
 #endif
         ENTRY;
@@ -184,7 +184,7 @@ static int mdt_rename_unpack(struct mdt_thread_info *info)
 
         rec = req_capsule_client_get(pill, &RMF_REC_RENAME);
         if (rec == NULL)
-                RETURN (-EFAULT);
+                RETURN(-EFAULT);
 
         attr->la_uid = rec->rn_fsuid;
         attr->la_gid = rec->rn_fsgid;
@@ -194,10 +194,10 @@ static int mdt_rename_unpack(struct mdt_thread_info *info)
 
         rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
         if (rr->rr_name == NULL)
-                RETURN (-EFAULT);
+                RETURN(-EFAULT);
         rr->rr_tgt = req_capsule_client_get(pill, &RMF_SYMTGT);
         if (rr->rr_tgt == NULL)
-                RETURN (-EFAULT);
+                RETURN(-EFAULT);
         RETURN(0);
 #endif
  
index 7f418a9..3358a15 100644 (file)
@@ -51,30 +51,32 @@ static int mdt_md_create(struct mdt_thread_info *info)
         struct mdt_object      *child;
         struct mdt_lock_handle *lh;
         struct mdt_body        *repbody = info->mti_reint_rep.mrr_body;
+        struct lu_attr         *attr = &info->mti_attr;
+        struct mdt_reint_record *rr = &info->mti_rr;
         int rc;
         ENTRY;
 
         lh = &info->mti_lh[MDT_LH_PARENT];
         lh->mlh_mode = LCK_PW;
 
-        parent = mdt_object_find_lock(info->mti_ctxt, mdt, info->mti_rr.rr_fid1,
+        parent = mdt_object_find_lock(info->mti_ctxt, mdt, rr->rr_fid1,
                                       lh, MDS_INODELOCK_UPDATE);
         if (IS_ERR(parent))
-                return PTR_ERR(parent);
+                RETURN(PTR_ERR(parent));
 
-        child = mdt_object_find(info->mti_ctxt, mdt, info->mti_rr.rr_fid2);
+        child = mdt_object_find(info->mti_ctxt, mdt, rr->rr_fid2);
         if (!IS_ERR(child)) {
                 struct md_object *next = mdt_object_child(parent);
 
-                rc = mdo_create(info->mti_ctxt, next, info->mti_rr.rr_name,
-                                mdt_object_child(child), &info->mti_attr);
+                rc = mdo_create(info->mti_ctxt, next, rr->rr_name,
+                                mdt_object_child(child), attr);
                 if (rc == 0) {
-                        /* return fid to client. */
+                        /* return fid to client. attr is over-written!!*/
                         rc = mo_attr_get(info->mti_ctxt, 
                                          mdt_object_child(child), 
-                                         &info->mti_attr);
+                                         attr);
                         if (rc == 0) {
-                                mdt_pack_attr2body(repbody, &info->mti_attr);
+                                mdt_pack_attr2body(repbody, attr);
                                 repbody->fid1 = *mdt_object_fid(child);
                                 repbody->valid |= OBD_MD_FLID;
                         }
@@ -87,6 +89,7 @@ static int mdt_md_create(struct mdt_thread_info *info)
         RETURN(rc);
 }
 
+#if 0
 static int mdt_md_mkdir(struct mdt_thread_info *info)
 {
         struct mdt_device      *mdt = info->mti_mdt;
@@ -117,6 +120,7 @@ static int mdt_md_mkdir(struct mdt_thread_info *info)
         mdt_object_put(info->mti_ctxt, parent);
         return result;
 }
+#endif
 
 /* partial request to create object only */
 static int mdt_md_mkobj(struct mdt_thread_info *info)
@@ -158,7 +162,6 @@ static int mdt_reint_setattr(struct mdt_thread_info *info)
 #ifdef MDT_CODE
         struct lu_attr *attr = &info->mti_attr;
         struct mdt_reint_record *rr = &info->mti_rr;
-        struct req_capsule *pill = &info->mti_pill;
         struct ptlrpc_request *req = mdt_info_req(info);
         struct mdt_object *mo;
         struct mdt_lock_handle *lh;
@@ -174,13 +177,6 @@ static int mdt_reint_setattr(struct mdt_thread_info *info)
 
         /* MDS_CHECK_RESENT */
 
-        rc = req_capsule_pack(pill);
-        if (rc)
-                RETURN(rc);        
-
-        if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
-                RETURN(-EROFS);
-
         if (attr->la_valid & ATTR_FROM_OPEN) {
                 mo = mdt_object_find(info->mti_ctxt, info->mti_mdt, rr->rr_fid1);
                 if (IS_ERR(mo))
@@ -204,6 +200,9 @@ static int mdt_reint_setattr(struct mdt_thread_info *info)
         if (lu_object_assert_not_exists(info->mti_ctxt, &mo->mot_obj.mo_lu))
                 GOTO(out_unlock, rc = -ENOENT);
 
+        if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
+                GOTO(out_unlock, rc = -EROFS);
+
         rc = mo_attr_set(info->mti_ctxt, mdt_object_child(mo), attr);
         if (rc != 0)
                 GOTO(out_unlock, rc);
@@ -224,7 +223,10 @@ static int mdt_reint_setattr(struct mdt_thread_info *info)
                 repbody->valid |= OBD_MD_FLMTIME;
         if (attr->la_valid & (ATTR_ATIME | ATTR_ATIME_SET))
                 repbody->valid |= OBD_MD_FLATIME;
-        
+
+        /* FIXME: I have to combine the attr_set & xattr_set into one single
+                  transaction. How can I?
+         */ 
         rc = mo_xattr_set(info->mti_ctxt, mdt_object_child(mo),
                           rr->rr_eadata, rr->rr_eadatalen, "lov");
         if (rc)
@@ -278,7 +280,6 @@ static int mdt_reint_unlink(struct mdt_thread_info *info)
 #ifdef MDT_CODE
         struct lu_attr *attr = &info->mti_attr;
         struct mdt_reint_record *rr = &info->mti_rr;
-        struct req_capsule *pill = &info->mti_pill;
         struct ptlrpc_request *req = mdt_info_req(info);
         struct mdt_object *mp;
         struct mdt_object *mc;
@@ -295,13 +296,6 @@ static int mdt_reint_unlink(struct mdt_thread_info *info)
 
         /* MDS_CHECK_RESENT here */
 
-        rc = req_capsule_pack(pill);
-        if (rc)
-                RETURN(rc);
-
-        if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
-                RETURN(-EROFS);
-
         /* step 1: lock the parent */
         lhp = &info->mti_lh[MDT_LH_PARENT];
         lhp->mlh_mode = LCK_EX;
@@ -311,6 +305,9 @@ static int mdt_reint_unlink(struct mdt_thread_info *info)
                 RETURN(PTR_ERR(mp));
 
         if (strlen(rr->rr_name) == 0) {
+                if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
+                        GOTO(out_unlock_parent, rc = -EROFS);
+
                 /* remote partial operation */
                 rc = mo_ref_del(info->mti_ctxt, mdt_object_child(mp));
                 GOTO(out_unlock_parent, rc);
@@ -346,7 +343,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info)
         if (rc != 0)
                 GOTO(out_unlock_child, rc);
 
-        if ((S_ISREG(attr->la_mode) && attr->la_nlink == 1)) 
+        if ((S_ISREG(attr->la_mode) && attr->la_nlink == 1)) {
                 /* && if opencount == 0*/
                 /* see mds_reint */
                 void * lmm = req_capsule_server_get(&info->mti_pill,
@@ -371,6 +368,9 @@ static int mdt_reint_unlink(struct mdt_thread_info *info)
                 rc = 0;
         }
 
+        if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
+                GOTO(out_unlock_child, rc = -EROFS);
+
         /* step 4: delete it */
         rc = mdo_unlink(info->mti_ctxt, mdt_object_child(mc),
                         mdt_object_child(mc), rr->rr_name);
@@ -398,15 +398,12 @@ out_unlock_parent:
 static int mdt_reint_link(struct mdt_thread_info *info)
 {
 #ifdef MDT_CODE
-        struct lu_attr *attr = &info->mti_attr;
         struct mdt_reint_record *rr = &info->mti_rr;
-        struct req_capsule *pill = &info->mti_pill;
         struct ptlrpc_request *req = mdt_info_req(info);
         struct mdt_object *ms;
         struct mdt_object *mt;
         struct mdt_lock_handle *lhs;
         struct mdt_lock_handle *lht;
-        struct mdt_body *repbody;
         int rc;
 
         ENTRY;
@@ -416,13 +413,6 @@ static int mdt_reint_link(struct mdt_thread_info *info)
 
         /* MDS_CHECK_RESENT here */
 
-        rc = req_capsule_pack(pill);
-        if (rc)
-                RETURN(rc);        
-
-        if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
-                RETURN(-EROFS);
-
         /* step 1: lock the source */
         lhs = &info->mti_lh[MDT_LH_PARENT];
         lhs->mlh_mode = LCK_EX;
@@ -432,6 +422,9 @@ static int mdt_reint_link(struct mdt_thread_info *info)
                 RETURN(PTR_ERR(ms));
 
         if (strlen(rr->rr_name) == 0) {
+                if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
+                        GOTO(out_unlock_source, rc = -EROFS);
+
                 /* remote partial operation */
                 rc = mo_ref_add(info->mti_ctxt, mdt_object_child(ms));
                 GOTO(out_unlock_source, rc);
@@ -449,6 +442,9 @@ static int mdt_reint_link(struct mdt_thread_info *info)
          *      return -ENOENT
          */
 
+        if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
+                GOTO(out_unlock_target, rc = -EROFS);
+
         /* step 4: link it */
         rc = mdo_link(info->mti_ctxt, mdt_object_child(mt),
                       mdt_object_child(ms), rr->rr_name);
@@ -470,15 +466,12 @@ out_unlock_source:
 /* partial operation for rename */
 static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
 {
-        struct lu_attr *attr = &info->mti_attr;
         struct mdt_reint_record *rr = &info->mti_rr;
-        struct req_capsule *pill = &info->mti_pill;
         struct ptlrpc_request *req = mdt_info_req(info);
         struct mdt_object *mtgtdir;
         struct mdt_object *mtgt = NULL;
         struct mdt_lock_handle *lh_tgtdir;
         struct mdt_lock_handle *lh_tgt;
-        struct mdt_body *repbody;
         struct lu_fid tgt_fid;
         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
         int rc;
@@ -511,18 +504,21 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
                 mtgt = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt,
                                             &tgt_fid, lh_tgt, 
                                             MDS_INODELOCK_LOOKUP);
-                if (IS_ERR(mold))
+                if (IS_ERR(mtgt))
                         GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt));
         }
         
+        if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
+                GOTO(out_unlock_tgt, rc = -EROFS);
+
         /* step 3: rename_tgt or name_insert */
         if (mtgt)
                 rc = mdo_rename_tgt(info->mti_ctxt, mdt_object_child(mtgtdir),
                                     mdt_object_child(mtgt), 
-                                    &rr->rr_fid2, rr->rr_tgt);
+                                    rr->rr_fid2, rr->rr_tgt);
         else
                 rc = mdo_name_insert(info->mti_ctxt, mdt_object_child(mtgtdir),
-                                     rr->rr_tgt, &rr->rr_fid2);
+                                     rr->rr_tgt, rr->rr_fid2);
                                      
         GOTO(out_unlock_tgt, rc);
 out_unlock_tgt:
@@ -531,7 +527,7 @@ out_unlock_tgt:
                 mdt_object_put(info->mti_ctxt, mtgt);
         }
 out_unlock_tgtdir:
-        mdt_object_unlock(ns, msrcdir, lh_tgtdir);
+        mdt_object_unlock(ns, mtgtdir, lh_tgtdir);
         mdt_object_put(info->mti_ctxt, mtgtdir);
 out:
         return rc;
@@ -542,7 +538,6 @@ out:
 static int mdt_reint_rename(struct mdt_thread_info *info)
 {
 #ifdef MDT_CODE
-        struct lu_attr *attr = &info->mti_attr;
         struct mdt_reint_record *rr = &info->mti_rr;
         struct req_capsule *pill = &info->mti_pill;
         struct ptlrpc_request *req = mdt_info_req(info);
@@ -556,7 +551,6 @@ static int mdt_reint_rename(struct mdt_thread_info *info)
         struct mdt_lock_handle lh_new;
         struct lu_fid old_fid = {0};
         struct lu_fid new_fid = {0};
-        struct mdt_body *repbody;
         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
         int rc;
 
@@ -568,13 +562,6 @@ static int mdt_reint_rename(struct mdt_thread_info *info)
 
         /* MDS_CHECK_RESENT here */
 
-        rc = req_capsule_pack(pill);
-        if (rc)
-                RETURN(rc);
-
-        if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
-                RETURN(-EROFS);
-
         rc = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
         if (rc == 1) {
         /* if (strlen(rr->rr_name) == 0) {*/
@@ -633,6 +620,8 @@ static int mdt_reint_rename(struct mdt_thread_info *info)
         /* step 5:TODO orphan handling on new object*/
         /* if isorphan(mnew) {...}
          */
+        if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
+                GOTO(out_unlock_new, rc = -EROFS);
 
         /* step 6: rename it */
         rc = mdo_rename(info->mti_ctxt, mdt_object_child(msrcdir),
index a4d2a16..0371c66 100644 (file)
@@ -394,7 +394,7 @@ const struct req_msg_field RMF_EADATA = DEFINE_MSGF("eadata", 0, 16, NULL);
 EXPORT_SYMBOL(RMF_EADATA);
 
 const struct req_msg_field RMF_LOGCOOKIES =
-        DEFINE_MSGF("logcookies", 0, 16, NULL);
+        DEFINE_MSGF("logcookies", 0, sizeof(struct llog_cookie), NULL);
 EXPORT_SYMBOL(RMF_LOGCOOKIES);
 
 const struct req_msg_field RMF_REINT_OPC =