Whamcloud - gitweb
update after codeinsp
authortappro <tappro>
Mon, 11 Sep 2006 23:36:00 +0000 (23:36 +0000)
committertappro <tappro>
Mon, 11 Sep 2006 23:36:00 +0000 (23:36 +0000)
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_recovery.c

index 2bd74d4..39a4e88 100644 (file)
@@ -71,25 +71,18 @@ struct mdt_client_data {
 
 static inline __u64 mcd_last_transno(struct mdt_client_data *mcd)
 {
-        return (le64_to_cpu(mcd->mcd_last_transno) >
-                le64_to_cpu(mcd->mcd_last_close_transno) ?
-                le64_to_cpu(mcd->mcd_last_transno) :
-                le64_to_cpu(mcd->mcd_last_close_transno));
+        return max(mcd->mcd_last_transno, mcd->mcd_last_close_transno);
 }
 
 static inline __u64 mcd_last_xid(struct mdt_client_data *mcd)
 {
-        return (le64_to_cpu(mcd->mcd_last_xid) >
-                le64_to_cpu(mcd->mcd_last_close_xid) ?
-                le64_to_cpu(mcd->mcd_last_xid) :
-                le64_to_cpu(mcd->mcd_last_close_xid));
+        return max(mcd->mcd_last_xid, mcd->mcd_last_close_xid);
 }
 
 /* copied from lr_server_data.
  * mds data stored at the head of last_rcvd file. In le32 order. */
 struct mdt_server_data {
         __u8  msd_uuid[40];        /* server UUID */
-        __u64 msd_unused;          /* was fsd_last_objid - don't use for now */
         __u64 msd_last_transno;    /* last completed transaction ID */
         __u64 msd_mount_count;     /* incarnation number */
         __u32 msd_feature_compat;  /* compatible feature flags */
@@ -98,13 +91,13 @@ struct mdt_server_data {
         __u32 msd_server_size;     /* size of server data area */
         __u32 msd_client_start;    /* start of per-client data area */
         __u16 msd_client_size;     /* size of per-client data area */
-        __u16 msd_subdir_count;    /* number of subdirectories for objects */
-        __u64 msd_catalog_oid;     /* recovery catalog object id */
-        __u32 msd_catalog_ogen;    /* recovery catalog inode generation */
-        __u8  msd_peeruuid[40];    /* UUID of MDS associated with this OST */
-        __u32 msd_ost_index;       /* index number of OST in LOV */
-        __u32 msd_mdt_index;       /* index number of MDT in LMV */
-        __u8  msd_padding[LR_SERVER_SIZE - 148];
+        //__u16 msd_subdir_count;    /* number of subdirectories for objects */
+        //__u64 msd_catalog_oid;     /* recovery catalog object id */
+        //__u32 msd_catalog_ogen;    /* recovery catalog inode generation */
+        //__u8  msd_peeruuid[40];    /* UUID of MDS associated with this OST */
+        //__u32 msd_ost_index;       /* index number of OST in LOV */
+        //__u32 msd_mdt_index;       /* index number of MDT in LMV */
+        __u8  msd_padding[LR_SERVER_SIZE - 78];
 };
 
 struct mdt_object;
@@ -163,7 +156,9 @@ struct mdt_device {
         int                        mdt_max_cookiesize;
         __u64                      mdt_mount_count;
 
+        /* last_rcvd data */
         struct mdt_server_data     mdt_msd;
+        spinlock_t                 mdt_client_bitmap_lock;
         unsigned long              mdt_client_bitmap[(LR_MAX_CLIENTS >> 3) / sizeof(long)];
 };
 
@@ -296,6 +291,13 @@ struct mdt_thread_info {
                         struct l_wait_info mti_wait_info;
                 } rdpg;
         } mti_u;
+
+        /* server and client data buffers */
+        struct mdt_server_data     mti_msd;
+        struct mdt_client_data     mti_mcd;
+        loff_t                     mti_off;
+        struct txn_param           mti_txn_param;
+
 };
 /*
  * Info allocated per-transaction.
index b5bd3c1..5f98f7c 100644 (file)
@@ -6,7 +6,7 @@
  *
  *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
  *   Author: Huang Hua <huanghua@clusterfs.com>
- *   Author; Pershin Mike <tappro@clusterfs.com>
+ *   Author: Pershin Mike <tappro@clusterfs.com>
  *
  *   This file is part of the Lustre file system, http://www.lustre.org
  *   Lustre is a trademark of Cluster File Systems, Inc.
 
 #include "mdt_internal.h"
 
-static int mdt_update_server_data(const struct lu_context *ctx,
+static int mdt_server_data_update(const struct lu_context *ctx,
                                   struct mdt_device *mdt);
 
 /* TODO: maybe this pair should be defined in dt_object.c */
-static /*inline*/ ssize_t mdt_read_record(const struct lu_context *ctx,
-                                      struct dt_object *dt,
-                                      void *buf,
-                                      size_t count,
-                                      loff_t *pos)
+static int mdt_record_read(const struct lu_context *ctx,
+                           struct dt_object *dt, void *buf,
+                           size_t count, loff_t *pos)
 {
         int rc;
 
@@ -56,17 +54,14 @@ static /*inline*/ ssize_t mdt_read_record(const struct lu_context *ctx,
         return rc;
 }
 
-static /*inline*/ ssize_t mdt_write_record(const struct lu_context *ctx,
-                                       struct dt_object *dt,
-                                       const void *buf,
-                                       size_t count,
-                                       loff_t *pos,
-                                       struct thandle *th)
+static int mdt_record_write(const struct lu_context *ctx,
+                            struct dt_object *dt, const void *buf,
+                            size_t count, loff_t *pos, struct thandle *th)
 {
         int rc;
 
         LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
-
+        LASSERT(th != NULL);
         rc = dt->do_body_ops->dbo_write(ctx, dt, buf, count, pos, th);
         if (rc == count)
                 rc = 0;
@@ -74,53 +69,173 @@ static /*inline*/ ssize_t mdt_write_record(const struct lu_context *ctx,
                 rc = -EFAULT;
         return rc;
 }
+/* only one record write */
+
+enum {
+        MDT_TXN_WRITE_RECORD_CREDITS = 3
+};
+
+static struct thandle* mdt_trans_start(const struct lu_context *ctx,
+                                       struct mdt_device *mdt)
+{
+        struct mdt_thread_info *mti;
+        struct txn_param *p;
+
+        mti = lu_context_key_get(ctx, &mdt_thread_key);
+        p = &mti->mti_txn_param;
+        return mdt->mdt_bottom->dd_ops->dt_trans_start(ctx, mdt->mdt_bottom, p);
+}
+
+static void mdt_trans_stop(const struct lu_context *ctx,
+                           struct mdt_device *mdt, struct thandle *th)
+{
+        mdt->mdt_bottom->dd_ops->dt_trans_stop(ctx, th);
+}
 
 /* last_rcvd handling */
-static int mdt_read_last_rcvd_header(const struct lu_context *ctx,
+static inline void msd_le_to_cpu(struct mdt_server_data *buf,
+                                 struct mdt_server_data *msd)
+{
+        msd->msd_last_transno     = le64_to_cpu(buf->msd_last_transno);
+        msd->msd_mount_count      = le64_to_cpu(buf->msd_mount_count);
+        msd->msd_feature_compat   = le32_to_cpu(buf->msd_feature_compat);
+        msd->msd_feature_rocompat = le32_to_cpu(buf->msd_feature_rocompat);
+        msd->msd_feature_incompat = le32_to_cpu(buf->msd_feature_incompat);
+        msd->msd_server_size      = le32_to_cpu(buf->msd_server_size);
+        msd->msd_client_start     = le32_to_cpu(buf->msd_client_start);
+        msd->msd_client_size      = le16_to_cpu(buf->msd_client_size);
+}
+
+static inline void msd_cpu_to_le(struct mdt_server_data *msd,
+                                 struct mdt_server_data *buf)
+{
+        buf->msd_last_transno     = cpu_to_le64(msd->msd_last_transno);
+        buf->msd_mount_count      = cpu_to_le64(msd->msd_mount_count);
+        buf->msd_feature_compat   = cpu_to_le32(msd->msd_feature_compat);
+        buf->msd_feature_rocompat = cpu_to_le32(msd->msd_feature_rocompat);
+        buf->msd_feature_incompat = cpu_to_le32(msd->msd_feature_incompat);
+        buf->msd_server_size      = cpu_to_le32(msd->msd_server_size);
+        buf->msd_client_start     = cpu_to_le32(msd->msd_client_start);
+        buf->msd_client_size      = cpu_to_le16(msd->msd_client_size);
+}
+
+static inline void mcd_le_to_cpu(struct mdt_client_data *buf,
+                                 struct mdt_client_data *mcd)
+{
+        mcd->mcd_last_transno       = le64_to_cpu(buf->mcd_last_transno);
+        mcd->mcd_last_xid           = le64_to_cpu(buf->mcd_last_xid);
+        mcd->mcd_last_result        = le32_to_cpu(buf->mcd_last_result);
+        mcd->mcd_last_data          = le32_to_cpu(buf->mcd_last_data);
+        mcd->mcd_last_close_transno = le64_to_cpu(buf->mcd_last_close_transno);
+        mcd->mcd_last_close_xid     = le64_to_cpu(buf->mcd_last_close_xid);
+        mcd->mcd_last_close_result  = le32_to_cpu(buf->mcd_last_close_result);
+}
+
+static inline void mcd_cpu_to_le(struct mdt_client_data *mcd,
+                                 struct mdt_client_data *buf)
+{
+        buf->mcd_last_transno       = cpu_to_le64(mcd->mcd_last_transno);
+        buf->mcd_last_xid           = cpu_to_le64(mcd->mcd_last_xid);
+        buf->mcd_last_result        = cpu_to_le32(mcd->mcd_last_result);
+        buf->mcd_last_data          = cpu_to_le32(mcd->mcd_last_data);
+        buf->mcd_last_close_transno = cpu_to_le64(mcd->mcd_last_close_transno);
+        buf->mcd_last_close_xid     = cpu_to_le64(mcd->mcd_last_close_xid);
+        buf->mcd_last_close_result  = cpu_to_le32(mcd->mcd_last_close_result);
+}
+
+static int mdt_last_rcvd_header_read(const struct lu_context *ctx,
                                      struct mdt_device *mdt,
                                      struct mdt_server_data *msd)
 {
-        loff_t off = 0;
-        return mdt_read_record(ctx, mdt->mdt_last_rcvd, 
-                               msd, sizeof(*msd), &off);
+        struct mdt_thread_info *mti;
+        struct mdt_server_data *tmp;
+        loff_t *off;
+        int rc;
+
+        mti = lu_context_key_get(ctx, &mdt_thread_key);
+        /* temporary stuff for read */
+        tmp = &mti->mti_msd;
+        off = &mti->mti_off;
+        rc = mdt_record_read(ctx, mdt->mdt_last_rcvd, 
+                             tmp, sizeof(*tmp), off);
+        if (rc == 0) {
+                memcpy(msd->msd_uuid, tmp->msd_uuid, sizeof (msd->msd_uuid));
+                msd_le_to_cpu(tmp, msd);
+        }
+        return 0;
 }
 
-static int mdt_write_last_rcvd_header(const struct lu_context *ctx,
+static int mdt_last_rcvd_header_write(const struct lu_context *ctx,
                                       struct mdt_device *mdt,
-                                      struct mdt_server_data *msd,
-                                      struct thandle *th)
+                                      struct mdt_server_data *msd)
 {
+        struct mdt_thread_info *mti;
+        struct mdt_server_data *tmp;
+        struct thandle *th;
+        loff_t *off;
         int rc;
-        loff_t off = 0;
 
-        rc = mdt_write_record(ctx, mdt->mdt_last_rcvd, 
-                              msd, sizeof(*msd), &off, th);
-        
-        CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n"
-                       "uuid = %s\n"
-                       "last_transno = "LPU64"\n",
-                        rc,
-                        msd->msd_uuid,
-                        msd->msd_last_transno);
+        mti = lu_context_key_get(ctx, &mdt_thread_key);
+
+        mti->mti_txn_param.tp_credits = MDT_TXN_WRITE_RECORD_CREDITS;
+        th = mdt_trans_start(ctx, mdt);
+        if (IS_ERR(th))
+                RETURN(PTR_ERR(th));
+
+        /* temporary stuff for read */
+        tmp = &mti->mti_msd;
+        off = &mti->mti_off;
+
+        memcpy(tmp->msd_uuid, msd->msd_uuid, sizeof (msd->msd_uuid));
+        msd_cpu_to_le(msd, tmp);
+
+        rc = mdt_record_write(ctx, mdt->mdt_last_rcvd, 
+                              tmp, sizeof(*tmp), off, th);
+
+        CDEBUG(D_INFO, "write last_rcvd header (rc = %d):\n"
+                       "uuid = %s\nlast_transno = "LPU64"\n",
+                        rc, msd->msd_uuid, msd->msd_last_transno);
+
+        mdt_trans_stop(ctx, mdt, th);
         return rc;
 }
 
-static int mdt_read_last_rcvd(const struct lu_context *ctx,
+static int mdt_last_rcvd_read(const struct lu_context *ctx,
                               struct mdt_device *mdt,
                               struct mdt_client_data *mcd, loff_t *off)
 {
-        return mdt_read_record(ctx, mdt->mdt_last_rcvd, 
-                               mcd, sizeof(*mcd), off);
+        struct mdt_thread_info *mti;
+        struct mdt_client_data *tmp;
+        int rc;
+
+        mti = lu_context_key_get(ctx, &mdt_thread_key);
+        tmp = &mti->mti_mcd;
+        rc = mdt_record_read(ctx, mdt->mdt_last_rcvd, tmp, sizeof(*tmp), off);
+        if (rc == 0) {
+                memcpy(mcd->mcd_uuid, tmp->mcd_uuid, sizeof (mcd->mcd_uuid));
+                mcd_le_to_cpu(tmp, mcd);
+        }
+        return rc;
 }
 
-static int mdt_write_last_rcvd(const struct lu_context *ctx,
+static int mdt_last_rcvd_write(const struct lu_context *ctx,
                                struct mdt_device *mdt,
                                struct mdt_client_data *mcd,
                                loff_t *off, struct thandle *th)
 {
+        struct mdt_thread_info *mti;
+        struct mdt_client_data *tmp;
         int rc;
-        rc = mdt_write_record(ctx, mdt->mdt_last_rcvd, 
-                              mcd, sizeof(*mcd), off, th);
+
+        LASSERT(th != NULL);
+        mti = lu_context_key_get(ctx, &mdt_thread_key);
+        tmp = &mti->mti_mcd;
+
+        memcpy(mcd->mcd_uuid, tmp->mcd_uuid, sizeof (mcd->mcd_uuid));
+        mcd_cpu_to_le(mcd, tmp);
+
+        rc = mdt_record_write(ctx, mdt->mdt_last_rcvd,
+                              tmp, sizeof(*tmp), off, th);
 
         CDEBUG(D_INFO, "write mcd rc = %d:\n"
                        "uuid = %s\n"
@@ -143,14 +258,15 @@ static int mdt_write_last_rcvd(const struct lu_context *ctx,
         return rc;
 }
 
-static int mdt_init_clients_data(const struct lu_context *ctx,
-                                struct mdt_device *mdt,
-                                unsigned long last_size)
+
+static int mdt_clients_data_init(const struct lu_context *ctx,
+                                 struct mdt_device *mdt,
+                                 unsigned long last_size)
 {
         struct mdt_server_data *msd = &mdt->mdt_msd;
         struct mdt_client_data *mcd = NULL;
         struct obd_device      *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
-        loff_t off = 0;
+        loff_t off;
         int cl_idx;
         int rc = 0;
         ENTRY;
@@ -159,22 +275,22 @@ static int mdt_init_clients_data(const struct lu_context *ctx,
          * the header.  If we find clients with higher last_transno values
          * then those clients may need recovery done. */
 
-        for (cl_idx = 0, off = le32_to_cpu(msd->msd_client_start);
+        for (cl_idx = 0, off = msd->msd_client_start;
              off < last_size; cl_idx++) {
                 __u64 last_transno;
                 struct obd_export *exp;
                 struct mdt_export_data *med;
                 
                 if (!mcd) {
-                        OBD_ALLOC_WAIT(mcd, sizeof(*mcd));
+                        OBD_ALLOC_PTR(mcd);
                         if (!mcd)
                                 RETURN(-ENOMEM);
                 }
 
-                off = le32_to_cpu(msd->msd_client_start) +
-                        cl_idx * le16_to_cpu(msd->msd_client_size);
+                off = msd->msd_client_start +
+                        cl_idx * msd->msd_client_size;
 
-                rc = mdt_read_last_rcvd(ctx, mdt, mcd, &off);
+                rc = mdt_last_rcvd_read(ctx, mdt, mcd, &off);
                 if (rc) {
                         CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
                                LAST_RCVD, cl_idx, off, rc);
@@ -196,16 +312,14 @@ static int mdt_init_clients_data(const struct lu_context *ctx,
                  */
                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
                        " srv lr: "LPU64" lx: "LPU64"\n", mcd->mcd_uuid, cl_idx,
-                       last_transno, le64_to_cpu(msd->msd_last_transno),
+                       last_transno, msd->msd_last_transno,
                        mcd_last_xid(mcd));
 
                 exp = class_new_export(obd, (struct obd_uuid *)mcd->mcd_uuid);
                 if (IS_ERR(exp)) {
                         rc = 0;
                         continue;
-                        /* FIXME: Do we really want to return error?
-                        GOTO(err_client, rc = PTR_ERR(exp));
-                        */
+                        /* FIXME: Do we really want to return error? */
                 }
 
                 med = &exp->exp_mdt_data;
@@ -221,7 +335,7 @@ static int mdt_init_clients_data(const struct lu_context *ctx,
 
                 CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
                        cl_idx, last_transno);
-
+                /* protect __u64 value update */
                 spin_lock(&mdt->mdt_transno_lock);
                 if (last_transno > mdt->mdt_last_transno)
                         mdt->mdt_last_transno = last_transno;
@@ -233,7 +347,7 @@ static int mdt_init_clients_data(const struct lu_context *ctx,
         RETURN(rc);
 }
 
-static int mdt_init_server_data(const struct lu_context *ctx,
+static int mdt_server_data_init(const struct lu_context *ctx,
                                 struct mdt_device *mdt)
 {
         struct mdt_server_data *msd = &mdt->mdt_msd;
@@ -242,15 +356,15 @@ static int mdt_init_server_data(const struct lu_context *ctx,
         struct mdt_thread_info *mti;
         struct dt_object       *obj;
         struct lu_attr         *la;
-        unsigned long last_rcvd_size = 0;
+        unsigned long last_rcvd_size;
         __u64 mount_count;
-        int rc = 0;
+        int rc;
         ENTRY;
 
         /* ensure padding in the struct is the correct size */
-        LASSERT(offsetof(struct mdt_server_data, msd_padding) +
+        CLASSERT(offsetof(struct mdt_server_data, msd_padding) +
                 sizeof(msd->msd_padding) == LR_SERVER_SIZE);
-        LASSERT(offsetof(struct mdt_client_data, mcd_padding) +
+        CLASSERT(offsetof(struct mdt_client_data, mcd_padding) +
                 sizeof(mcd->mcd_padding) == LR_CLIENT_SIZE);
 
         mti = lu_context_key_get(ctx, &mdt_thread_key);
@@ -273,14 +387,14 @@ static int mdt_init_server_data(const struct lu_context *ctx,
                        sizeof(msd->msd_uuid));
                 msd->msd_last_transno = 0;
                 msd->msd_mount_count = 0;
-                msd->msd_server_size = cpu_to_le32(LR_SERVER_SIZE);
-                msd->msd_client_start = cpu_to_le32(LR_CLIENT_START);
-                msd->msd_client_size = cpu_to_le16(LR_CLIENT_SIZE);
-                msd->msd_feature_rocompat = cpu_to_le32(OBD_ROCOMPAT_LOVOBJID);
-                msd->msd_feature_incompat = cpu_to_le32(OBD_INCOMPAT_MDT |
-                                                       OBD_INCOMPAT_COMMON_LR);
+                msd->msd_server_size = LR_SERVER_SIZE;
+                msd->msd_client_start = LR_CLIENT_START;
+                msd->msd_client_size = LR_CLIENT_SIZE;
+                msd->msd_feature_rocompat = OBD_ROCOMPAT_LOVOBJID;
+                msd->msd_feature_incompat = OBD_INCOMPAT_MDT |
+                                                       OBD_INCOMPAT_COMMON_LR;
         } else {
-                rc = mdt_read_last_rcvd_header(ctx, mdt, msd);
+                rc = mdt_last_rcvd_header_read(ctx, mdt, msd);
                 if (rc) {
                         CERROR("error reading MDS %s: rc %d\n", LAST_RCVD, rc);
                         GOTO(out, rc);
@@ -293,8 +407,8 @@ static int mdt_init_server_data(const struct lu_context *ctx,
                         GOTO(out, rc = -EINVAL);
                 }
         }
-        mount_count = le64_to_cpu(msd->msd_mount_count);
-
+        mount_count = msd->msd_mount_count;
+#if 0
         if (msd->msd_feature_incompat & ~cpu_to_le32(MDT_INCOMPAT_SUPP)) {
                 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
                        obd->obd_name, le32_to_cpu(msd->msd_feature_incompat) &
@@ -317,28 +431,30 @@ static int mdt_init_server_data(const struct lu_context *ctx,
                 msd->msd_feature_incompat |= cpu_to_le32(LR_INCOMPAT_COMMON_LR);
                 */
         }
-        msd->msd_feature_compat = cpu_to_le32(OBD_COMPAT_MDT);
+#endif
+        msd->msd_feature_compat = OBD_COMPAT_MDT;
 
         spin_lock(&mdt->mdt_transno_lock);
-        mdt->mdt_last_transno = le64_to_cpu(msd->msd_last_transno);
+        mdt->mdt_last_transno = msd->msd_last_transno;
         spin_unlock(&mdt->mdt_transno_lock);
+
         CDEBUG(D_INODE, "========BEGIN DUMPING LAST_RCVD========\n");
         CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
                obd->obd_name, mdt->mdt_last_transno);
         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
                obd->obd_name, mount_count + 1);
         CDEBUG(D_INODE, "%s: server data size: %u\n",
-               obd->obd_name, le32_to_cpu(msd->msd_server_size));
+               obd->obd_name, msd->msd_server_size);
         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
-               obd->obd_name, le32_to_cpu(msd->msd_client_start));
+               obd->obd_name, msd->msd_client_start);
         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
-               obd->obd_name, le32_to_cpu(msd->msd_client_size));
+               obd->obd_name, msd->msd_client_size);
         CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
                obd->obd_name, last_rcvd_size);
         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
-               last_rcvd_size <= le32_to_cpu(msd->msd_client_start) ? 0 :
-               (last_rcvd_size - le32_to_cpu(msd->msd_client_start)) /
-                le16_to_cpu(msd->msd_client_size));
+               last_rcvd_size <= msd->msd_client_start ? 0 :
+               (last_rcvd_size - msd->msd_client_start) /
+                msd->msd_client_size);
         CDEBUG(D_INODE, "========END DUMPING LAST_RCVD========\n");
 
         if (!msd->msd_server_size || !msd->msd_client_start ||
@@ -347,7 +463,7 @@ static int mdt_init_server_data(const struct lu_context *ctx,
                 GOTO(out, rc = -EINVAL);
         }
 
-        rc = mdt_init_clients_data(ctx, mdt, last_rcvd_size);
+        rc = mdt_clients_data_init(ctx, mdt, last_rcvd_size);
         if (rc)
                 GOTO(err_client, rc);
 
@@ -370,10 +486,10 @@ static int mdt_init_server_data(const struct lu_context *ctx,
         }
 
         mdt->mdt_mount_count++;
-        msd->msd_mount_count = cpu_to_le64(mdt->mdt_mount_count);
+        msd->msd_mount_count = mdt->mdt_mount_count;
 
         /* save it, so mount count and last_transno is current */
-        rc = mdt_update_server_data(ctx, mdt);
+        rc = mdt_server_data_update(ctx, mdt);
         if (rc)
                 GOTO(err_client, rc);
 
@@ -385,24 +501,78 @@ out:
         return rc;
 }
 
-static int mdt_update_server_data(const struct lu_context *ctx,
+static int mdt_server_data_update(const struct lu_context *ctx,
                                   struct mdt_device *mdt)
 {
         struct mdt_server_data *msd = &mdt->mdt_msd;
-        int rc = 0;
+        int rc;
         ENTRY;
 
         CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
                 mdt->mdt_mount_count, mdt->mdt_last_transno);
 
         spin_lock(&mdt->mdt_transno_lock);
-        msd->msd_last_transno = cpu_to_le64(mdt->mdt_last_transno);
+        msd->msd_last_transno = mdt->mdt_last_transno;
         spin_unlock(&mdt->mdt_transno_lock);
 
-        rc = mdt_write_last_rcvd_header(ctx, mdt, msd, NULL);
+        rc = mdt_last_rcvd_header_write(ctx, mdt, msd);
         RETURN(rc);
 }
+#if 0
+int mdt_client_new(const struct lu_context *ctx,
+                   struct mdt_device *mdt,
+                   struct mdt_export_data *med)
+{
+        unsigned long *bitmap = mdt->mdt_client_bitmap;
+        struct mdt_client_data *mcd = med->med_mcd;
+        struct mdt_server_data *msd = &mdt->mdt_msd;
+        int rc = 0;
+        ENTRY;
+
+        LASSERT(bitmap != NULL);
+
+        /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
+         * there's no need for extra complication here
+         */
+        cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
+repeat:
+        if (cl_idx >= LR_MAX_CLIENTS ||
+            MDT_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
+                CERROR("no room for clients - fix LR_MAX_CLIENTS\n");
+                return -EOVERFLOW;
+        }
+        if (test_and_set_bit(cl_idx, bitmap)) {
+                cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS,
+                                cl_idx);
+                goto repeat;
+        }
 
+        CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
+               cl_idx, med->med_mcd->mcd_uuid);
+
+        med->med_lr_idx = cl_idx;
+        med->med_lr_off = msd->msd_client_start +
+                          (cl_idx * msd->msd_client_size);
+        init_mutex(&med->med_mcd_lock);
+
+        LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
+
+        if (new_client) {
+                loff_t off = med->med_lr_off;
+                struct thandle *th;
+                th = mdt_trans_start(ctx, mdt);
+                if (IS_ERR(th))
+                        RETURN(PTR_ERR(th));
+                
+                rc = mdt_last_rcvd_write(ctx, mdt, mcd, &off, th);
+                CDEBUG(D_INFO, 
+                       "wrote client mcd at idx %u off %llu (len %u)\n",
+                       cl_idx, med->med_lr_off, sizeof(*mcd));
+                mdt_trans_stop(ctx, mdt, th);
+        }
+        RETURN(rc);
+}
+#endif
 /* Add client data to the MDS.  We use a bitmap to locate a free space
  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
  * Otherwise, we just have to read the data from the last_rcvd file and
@@ -453,28 +623,35 @@ int mdt_client_add(const struct lu_context *ctx,
                cl_idx, med->med_mcd->mcd_uuid);
 
         med->med_lr_idx = cl_idx;
-        med->med_lr_off = le32_to_cpu(msd->msd_client_start) +
-                          (cl_idx * le16_to_cpu(msd->msd_client_size));
+        med->med_lr_off = msd->msd_client_start +
+                          (cl_idx * msd->msd_client_size);
         init_mutex(&med->med_mcd_lock);
 
         LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
 
         if (new_client) {
                 loff_t off = med->med_lr_off;
-                rc = mdt_write_last_rcvd(ctx, mdt, mcd, &off, NULL);
+                struct thandle *th;
+                th = mdt_trans_start(ctx, mdt);
+                if (IS_ERR(th))
+                        RETURN(PTR_ERR(th));
+                
+                rc = mdt_last_rcvd_write(ctx, mdt, mcd, &off, th);
                 CDEBUG(D_INFO, 
                        "wrote client mcd at idx %u off %llu (len %u)\n",
                        cl_idx, med->med_lr_off, sizeof(*mcd));
+                mdt_trans_stop(ctx, mdt, th);
         }
         RETURN(rc);
 }
 
 int mdt_client_free(const struct lu_context *ctx,
-                    struct mdt_device *mdt,
-                    struct mdt_export_data *med)
+                   struct mdt_device *mdt,
+                   struct mdt_export_data *med)
 {
         struct mdt_client_data *mcd  = med->med_mcd;
         struct obd_device      *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
+        struct thandle *th;
         loff_t off;
         int rc = 0;
         ENTRY;
@@ -507,10 +684,16 @@ int mdt_client_free(const struct lu_context *ctx,
                 LBUG();
         }
 
+        th = mdt_trans_start(ctx, mdt);
+        if (IS_ERR(th))
+                GOTO(free, rc = PTR_ERR(th));
+
         mutex_down(&med->med_mcd_lock);
         memset(mcd, 0, sizeof *mcd);
-        rc = mdt_write_last_rcvd(ctx, mdt, mcd, &off, NULL);
+        
+        rc = mdt_last_rcvd_write(ctx, mdt, mcd, &off, th);
         mutex_up(&med->med_mcd_lock);
+        mdt_trans_stop(ctx, mdt, th);
         
         CDEBUG(rc == 0 ? D_INFO : D_ERROR,
                         "zeroing out client idx %u in %s rc %d\n",
@@ -525,7 +708,7 @@ int mdt_client_free(const struct lu_context *ctx,
         /* Make sure the server's last_transno is up to date. Do this
          * after the client is freed so we know all the client's
          * transactions have been committed. */
-        mdt_update_server_data(ctx, mdt);
+        mdt_server_data_update(ctx, mdt);
 
         EXIT;
 free:
@@ -537,7 +720,7 @@ free:
 /*
  * last_rcvd & last_committed update callbacks
  */
-static int mdt_update_last_rcvd(struct mdt_thread_info *mti,
+static int mdt_last_rcvd_update(struct mdt_thread_info *mti,
                                 struct thandle *th)
 {
         struct mdt_device *mdt = mti->mti_mdt;
@@ -567,23 +750,23 @@ static int mdt_update_last_rcvd(struct mdt_thread_info *mti,
         off = med->med_lr_off;
         mutex_down(&med->med_mcd_lock);
         if(lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) {
-                mcd->mcd_last_close_transno = cpu_to_le64(mti->mti_transno);
-                mcd->mcd_last_close_xid = cpu_to_le64(req->rq_xid);
-                mcd->mcd_last_close_result = cpu_to_le32(rc);
+                mcd->mcd_last_close_transno = mti->mti_transno;
+                mcd->mcd_last_close_xid = req->rq_xid;
+                mcd->mcd_last_close_result = rc;
         } else {
-                mcd->mcd_last_transno = cpu_to_le64(mti->mti_transno);
-                mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
-                mcd->mcd_last_result = cpu_to_le32(rc);
+                mcd->mcd_last_transno = mti->mti_transno;
+                mcd->mcd_last_xid = req->rq_xid;
+                mcd->mcd_last_result = rc;
                 /*XXX: save intent_disposition in mdt_thread_info?
                  * also there is bug - intent_dispostion is __u64,
                  * see struct ldlm_reply->lock_policy_res1; */
-                 mcd->mcd_last_data = cpu_to_le32(mti->mti_opdata);
+                 mcd->mcd_last_data = mti->mti_opdata;
         }
         if (off <= 0) {
                 CERROR("client idx %d has offset %lld\n", med->med_lr_idx, off);
                 err = -EINVAL;
         } else {
-                err = mdt_write_last_rcvd(mti->mti_ctxt, mdt, mcd, &off, th);
+                err = mdt_last_rcvd_write(mti->mti_ctxt, mdt, mcd, &off, th);
         }
         mutex_up(&med->med_mcd_lock);
         RETURN(err);
@@ -606,7 +789,7 @@ static int mdt_txn_start_cb(const struct lu_context *ctx,
 
 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
 {
-        return le64_to_cpu(req->rq_export->exp_mdt_data.med_mcd->mcd_last_xid);
+        return req->rq_export->exp_mdt_data.med_mcd->mcd_last_xid;
 }
 
 /* Update last_rcvd records with latests transaction data */
@@ -664,11 +847,11 @@ static int mdt_txn_stop_cb(const struct lu_context *ctx,
         txi->txi_transno = mti->mti_transno;
         spin_unlock(&mdt->mdt_transno_lock);
 
-        return mdt_update_last_rcvd(mti, txn);
+        return mdt_last_rcvd_update(mti, txn);
 }
 
 /* commit callback, need to update last_commited value */
-static int mdt_txn_commit_cb(const struct lu_context *ctxt
+static int mdt_txn_commit_cb(const struct lu_context *ctx, 
                              struct thandle *txn, void *cookie)
 {
         struct mdt_device *mdt = cookie;
@@ -711,7 +894,7 @@ int mdt_fs_setup(const struct lu_context *ctx, struct mdt_device *mdt)
                              LAST_RCVD, &last_fid);
         if(!IS_ERR(last)) {
                 mdt->mdt_last_rcvd = last;
-                rc = mdt_init_server_data(ctx, mdt);
+                rc = mdt_server_data_init(ctx, mdt);
                 if (rc) {
                         lu_object_put(ctx, &last->do_lu);
                         mdt->mdt_last_rcvd = NULL;