Whamcloud - gitweb
b=6427
[fs/lustre-release.git] / lustre / mds / mds_lmv.c
index 6c76ff3..ce81bc0 100644 (file)
 #define DEBUG_SUBSYSTEM S_MDS
 
 #include <linux/module.h>
+#include <linux/dcache.h>
+#include <linux/namei.h>
+#include <linux/obd_support.h>
+#include <linux/obd_class.h>
+#include <linux/obd.h>
+#include <linux/lustre_lib.h>
 #include <linux/lustre_mds.h>
 #include <linux/lustre_idl.h>
 #include <linux/obd_class.h>
 #include <linux/obd_lov.h>
 #include <linux/lustre_lib.h>
 #include <linux/lustre_fsfilt.h>
+#include <linux/lustre_lite.h>
+#include <asm/div64.h>
 
 #include "mds_internal.h"
 
-
 /*
  * TODO:
  *   - magic in mea struct
- *   - error handling is totally missed
  */
-
-int mds_lmv_connect(struct obd_device *obd, char *lmv_name)
+int mds_md_connect(struct obd_device *obd, char *md_name)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct lustre_handle conn = {0};
-        int rc, valsize, value;
+        int rc, value;
+        __u32 valsize;
         ENTRY;
 
-        if (IS_ERR(mds->mds_lmv_obd))
-                RETURN(PTR_ERR(mds->mds_lmv_obd));
+        if (IS_ERR(mds->mds_md_obd))
+                RETURN(PTR_ERR(mds->mds_md_obd));
 
-        if (mds->mds_lmv_connected)
+        if (mds->mds_md_connected)
                 RETURN(0);
 
-        down(&mds->mds_lmv_sem);
-        if (mds->mds_lmv_connected) {
-                up(&mds->mds_lmv_sem);
+        down(&mds->mds_md_sem);
+        if (mds->mds_md_connected) {
+                up(&mds->mds_md_sem);
                 RETURN(0);
         }
 
-        mds->mds_lmv_obd = class_name2obd(lmv_name);
-        if (!mds->mds_lmv_obd) {
-                CERROR("MDS cannot locate LMV %s\n",
-                       lmv_name);
-                mds->mds_lmv_obd = ERR_PTR(-ENOTCONN);
+        mds->mds_md_obd = class_name2obd(md_name);
+        if (!mds->mds_md_obd) {
+                CERROR("MDS cannot locate MD(LMV) %s\n",
+                       md_name);
+                mds->mds_md_obd = ERR_PTR(-ENOTCONN);
                 GOTO(err_last, rc = -ENOTCONN);
         }
 
-        rc = obd_connect(&conn, mds->mds_lmv_obd,
-                         &obd->obd_uuid, OBD_OPT_MDS_CONNECTION);
+        rc = obd_connect(&conn, mds->mds_md_obd, &obd->obd_uuid, NULL,
+                         OBD_OPT_MDS_CONNECTION);
         if (rc) {
-                CERROR("MDS cannot connect to LMV %s (%d)\n",
-                       lmv_name, rc);
-                mds->mds_lmv_obd = ERR_PTR(rc);
+                CERROR("MDS cannot connect to MD(LMV) %s (%d)\n",
+                       md_name, rc);
+                mds->mds_md_obd = ERR_PTR(rc);
                 GOTO(err_last, rc);
         }
-        mds->mds_lmv_exp = class_conn2export(&conn);
-        if (mds->mds_lmv_exp == NULL)
+        mds->mds_md_exp = class_conn2export(&conn);
+        if (mds->mds_md_exp == NULL)
                 CERROR("can't get export!\n");
 
-        rc = obd_register_observer(mds->mds_lmv_obd, obd);
+        rc = obd_register_observer(mds->mds_md_obd, obd);
         if (rc) {
-                CERROR("MDS cannot register as observer of LMV %s, "
-                       "rc = %d\n", lmv_name, rc);
+                CERROR("MDS cannot register as observer of MD(LMV) %s, "
+                       "rc = %d\n", md_name, rc);
                 GOTO(err_discon, rc);
         }
 
         /* retrieve size of EA */
-        rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsize"),
+        rc = obd_get_info(mds->mds_md_exp, strlen("mdsize"),
                           "mdsize", &valsize, &value);
         if (rc) 
                 GOTO(err_reg, rc);
@@ -100,41 +106,48 @@ int mds_lmv_connect(struct obd_device *obd, char *lmv_name)
                 mds->mds_max_mdsize = value;
 
         /* find our number in LMV cluster */
-        rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsnum"),
+        rc = obd_get_info(mds->mds_md_exp, strlen("mdsnum"),
                           "mdsnum", &valsize, &value);
         if (rc) 
                 GOTO(err_reg, rc);
         
         mds->mds_num = value;
 
-        rc = obd_set_info(mds->mds_lmv_exp, strlen("inter_mds"),
+        rc = obd_set_info(mds->mds_md_exp, strlen("inter_mds"),
                           "inter_mds", 0, NULL);
         if (rc)
                 GOTO(err_reg, rc);
 
-        mds->mds_lmv_connected = 1;
-        up(&mds->mds_lmv_sem);
+        if (mds->mds_mds_sec) {
+                rc = obd_set_info(mds->mds_md_exp, strlen("sec"), "sec",
+                                  strlen(mds->mds_mds_sec), mds->mds_mds_sec);
+                if (rc)
+                        GOTO(err_reg, rc);
+        }
+
+        mds->mds_md_connected = 1;
+        up(&mds->mds_md_sem);
        RETURN(0);
 
 err_reg:
-        obd_register_observer(mds->mds_lmv_obd, NULL);
+        obd_register_observer(mds->mds_md_obd, NULL);
 err_discon:
-        obd_disconnect(mds->mds_lmv_exp, 0);
-        mds->mds_lmv_exp = NULL;
-        mds->mds_lmv_obd = ERR_PTR(rc);
+        obd_disconnect(mds->mds_md_exp, 0);
+        mds->mds_md_exp = NULL;
+        mds->mds_md_obd = ERR_PTR(rc);
 err_last:
-        up(&mds->mds_lmv_sem);
+        up(&mds->mds_md_sem);
         return rc;
 }
 
-int mds_lmv_postsetup(struct obd_device *obd)
+int mds_md_postsetup(struct obd_device *obd)
 {
         struct mds_obd *mds = &obd->u.mds;
         int rc = 0;
         ENTRY;
 
-        if (mds->mds_lmv_exp) {
-                rc = obd_init_ea_size(mds->mds_lmv_exp,
+        if (mds->mds_md_exp) {
+                rc = obd_init_ea_size(mds->mds_md_exp,
                                       mds->mds_max_mdsize,
                                       mds->mds_max_cookiesize);
         }
@@ -142,26 +155,26 @@ int mds_lmv_postsetup(struct obd_device *obd)
         RETURN(rc);
 }
 
-int mds_lmv_disconnect(struct obd_device *obd, int flags)
+int mds_md_disconnect(struct obd_device *obd, int flags)
 {
         struct mds_obd *mds = &obd->u.mds;
         int rc = 0;
         ENTRY;
 
-        if (!mds->mds_lmv_connected)
+        if (!mds->mds_md_connected)
                 RETURN(0);
 
-        down(&mds->mds_lmv_sem);
-        if (!IS_ERR(mds->mds_lmv_obd) && mds->mds_lmv_exp != NULL) {
-                LASSERT(mds->mds_lmv_connected);
+        down(&mds->mds_md_sem);
+        if (!IS_ERR(mds->mds_md_obd) && mds->mds_md_exp != NULL) {
+                LASSERT(mds->mds_md_connected);
                 
-                obd_register_observer(mds->mds_lmv_obd, NULL);
+                obd_register_observer(mds->mds_md_obd, NULL);
 
                 if (flags & OBD_OPT_FORCE) {
                         struct obd_device *lmv_obd;
                         struct obd_ioctl_data ioc_data = { 0 };
                         
-                        lmv_obd = class_exp2obd(mds->mds_lmv_exp);
+                        lmv_obd = class_exp2obd(mds->mds_md_exp);
                         if (lmv_obd == NULL)
                                 GOTO(out, rc = 0);
 
@@ -175,7 +188,7 @@ int mds_lmv_disconnect(struct obd_device *obd, int flags)
                          * it here. --umka.
                          */
                         lmv_obd->obd_no_recov = 1;
-                        obd_iocontrol(IOC_OSC_SET_ACTIVE, mds->mds_lmv_exp,
+                        obd_iocontrol(IOC_OSC_SET_ACTIVE, mds->mds_md_exp,
                                       sizeof(ioc_data), &ioc_data, NULL);
                 }
 
@@ -184,38 +197,60 @@ int mds_lmv_disconnect(struct obd_device *obd, int flags)
                  * disconnected by class_disconnect_exports()) then we just need
                  * to drop our ref.
                  */
-                mds->mds_lmv_connected = 0;
-                rc = obd_disconnect(mds->mds_lmv_exp, flags);
+                mds->mds_md_connected = 0;
+                rc = obd_disconnect(mds->mds_md_exp, flags);
                 if (rc)
-                        class_export_put(mds->mds_lmv_exp);
+                        class_export_put(mds->mds_md_exp);
 
         out:
-                mds->mds_lmv_exp = NULL;
-                mds->mds_lmv_obd = NULL;
+                mds->mds_md_exp = NULL;
+                mds->mds_md_obd = NULL;
         }
-        up(&mds->mds_lmv_sem);
+        up(&mds->mds_md_sem);
         RETURN(rc);
 }
 
-int mds_get_lmv_attr(struct obd_device *obd, struct inode *inode,
-                     struct mea **mea, int *mea_size)
+int mds_md_reconnect(struct obd_device *obd)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct obd_statfs osfs;
+        int err;
+        ENTRY;
+
+        /* We don't know state of connections to another MDSes
+         * before the failure. If MDS we were connected to before
+         * the failure gets failed, then it will wait for us to
+         * reconnect and will timed recovery out. bug 4920 */
+        if (mds->mds_md_connected == 0)
+                RETURN(0);
+        if (mds->mds_md_obd == NULL)
+                RETURN(0);
+
+        err = obd_statfs(mds->mds_md_obd, &osfs, jiffies - HZ);
+        if (err)
+                CERROR("can't reconnect to MDSes after recovery: %d\n", err);
+        RETURN(0);
+}
+
+int mds_md_get_attr(struct obd_device *obd, struct inode *inode,
+                    struct mea **mea, int *mea_size)
 {
         struct mds_obd *mds = &obd->u.mds;
        int rc;
         ENTRY;
 
-       if (!mds->mds_lmv_obd)
+       if (!mds->mds_md_obd)
                RETURN(0);
         if (!S_ISDIR(inode->i_mode))
                 RETURN(0);
 
        /* first calculate mea size */
-        *mea_size = obd_alloc_diskmd(mds->mds_lmv_exp,
+        *mea_size = obd_alloc_diskmd(mds->mds_md_exp,
                                      (struct lov_mds_md **)mea);
         if (*mea_size < 0 || *mea == NULL)
                 return *mea_size < 0 ? *mea_size : -EINVAL;
 
-        rc = mds_get_md(obd, inode, *mea, mea_size, 1);
+        rc = mds_get_md(obd, inode, *mea, mea_size, 1, 1);
 
        if (rc <= 0) {
                OBD_FREE(*mea, *mea_size);
@@ -360,7 +395,7 @@ static int flush_buffer_onto_mds(struct dirsplit_control *dc, int mdsnum)
                 ca->brwc.count = PAGE_SIZE;
                 ca->brwc.flag = 0;
                 ca->oa.o_mds = mdsnum;
-                rc = obd_brw(OBD_BRW_WRITE, mds->mds_lmv_exp, &ca->oa,
+                rc = obd_brw(OBD_BRW_WRITE, mds->mds_md_exp, &ca->oa,
                              (struct lov_stripe_md *) dc->mea,
                              1, &ca->brwc, NULL);
                 if (rc)
@@ -551,7 +586,7 @@ int mds_splitting_expected(struct obd_device *obd, struct dentry *dentry)
         int rc, size;
 
        /* clustered MD ? */
-       if (!mds->mds_lmv_obd)
+       if (!mds->mds_md_obd)
                return MDS_NO_SPLITTABLE;
 
         /* inode exist? */
@@ -574,7 +609,7 @@ int mds_splitting_expected(struct obd_device *obd, struct dentry *dentry)
         if (dentry->d_inode->i_size < MAX_DIR_SIZE)
                 return MDS_NO_SPLIT_EXPECTED;
 
-        mds_get_lmv_attr(obd, dentry->d_inode, &mea, &size);
+        mds_md_get_attr(obd, dentry->d_inode, &mea, &size);
         if (mea) {
                 /* already splitted or slave object: shouldn't be splitted */
                 rc = MDS_NO_SPLITTABLE;
@@ -624,13 +659,13 @@ int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry,
 
         if (mea == NULL)
                 mea = &tmea;
-        mea_size = obd_size_diskmd(mds->mds_lmv_exp, NULL);
+        mea_size = obd_size_diskmd(mds->mds_md_exp, NULL);
 
         /* FIXME: Actually we may only want to allocate enough space for
          * necessary amount of stripes, but on the other hand with this
          * approach of allocating maximal possible amount of MDS slots,
          * it would be easier to split the dir over more MDSes */
-        rc = obd_alloc_diskmd(mds->mds_lmv_exp, (void *)mea);
+        rc = obd_alloc_diskmd(mds->mds_md_exp, (void *)mea);
         if (rc < 0) {
                 CERROR("obd_alloc_diskmd() failed, error %d.\n", rc);
                 RETURN(rc);
@@ -679,7 +714,7 @@ int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry,
         CDEBUG(D_OTHER, "%s: create subdirs with mode %o, uid %u, gid %u\n",
                obd->obd_name, dir->i_mode, dir->i_uid, dir->i_gid);
                         
-        rc = obd_create(mds->mds_lmv_exp, oa,
+        rc = obd_create(mds->mds_md_exp, oa, NULL, 0,
                         (struct lov_stripe_md **)mea, NULL);
         if (rc) {
                 CERROR("Can't create remote inode, rc = %d\n", rc);
@@ -699,7 +734,7 @@ int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry,
                 GOTO(err_oa, rc = PTR_ERR(handle));
         }
 
-       rc = fsfilt_set_md(obd, dir, handle, *mea, mea_size);
+       rc = fsfilt_set_md(obd, dir, handle, *mea, mea_size, EA_MEA);
         if (rc) {
                 up(&dir->i_sem);
                 CERROR("fsfilt_set_md() failed, error %d.\n", rc);
@@ -719,7 +754,7 @@ int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry,
        /* 3) read through the dir and distribute it over objects */
         rc = scan_and_distribute(obd, dentry, *mea);
        if (mea == &tmea)
-                obd_free_diskmd(mds->mds_lmv_exp, (struct lov_mds_md **)mea);
+                obd_free_diskmd(mds->mds_md_exp, (struct lov_mds_md **)mea);
         if (rc) {
                 CERROR("scan_and_distribute() failed, error %d.\n", rc);
                 RETURN(rc);
@@ -882,16 +917,30 @@ int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
         RETURN(rc);
 }
 
-int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len, int flags)
+int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len, int flags,
+                        struct ptlrpc_peer *peer, struct inode *parent)
 {
-        struct lmv_obd *lmv;
         struct mds_obd *mds = &obd->u.mds;
+        struct lmv_obd *lmv;
         int i = mds->mds_num;
-
+        char peer_str[PTL_NALFMT_SIZE];
         if (flags & REC_REINT_CREATE) { 
                 i = mds->mds_num;
-        } else if (mds->mds_lmv_exp) {
-                lmv = &mds->mds_lmv_exp->exp_obd->u.lmv;
+        } else if (mds->mds_md_exp != NULL && peer != NULL) {
+                LASSERT(parent != NULL);
+                /* distribute only at root level */
+                lmv = &mds->mds_md_exp->exp_obd->u.lmv;
+                if (parent->i_ino != id_ino(&mds->mds_rootid)) {
+                        i = mds->mds_num;
+                } else {
+                        __u64 nid = peer->peer_id.nid;
+                        __u64 count = lmv->desc.ld_tgt_count;
+                        i = do_div(nid, count);
+                        CWARN("client from %s creates top dir %*s on mds #%d\n",
+                              ptlrpc_peernid2str(peer,peer_str), len, name, i+1);
+                }
+        } else if (mds->mds_md_exp) {
+                lmv = &mds->mds_md_exp->exp_obd->u.lmv;
                 i = raw_name2idx(MEA_MAGIC_LAST_CHAR, lmv->desc.ld_tgt_count, name, len);
         }
         RETURN(i);
@@ -901,7 +950,7 @@ int mds_lock_slave_objs(struct obd_device *obd, struct dentry *dentry,
                         struct lustre_handle **rlockh)
 {
         struct mds_obd *mds = &obd->u.mds;
-        struct mdc_op_data op_data;
+        struct mdc_op_data *op_data;
         struct lookup_intent it;
         struct mea *mea = NULL;
         int mea_size, rc;
@@ -913,15 +962,15 @@ int mds_lock_slave_objs(struct obd_device *obd, struct dentry *dentry,
         LASSERT(dentry->d_inode != NULL);
 
        /* clustered MD ? */
-       if (!mds->mds_lmv_obd)
+       if (!mds->mds_md_obd)
                RETURN(0);
 
         /* a dir can be splitted only */
         if (!S_ISDIR(dentry->d_inode->i_mode))
                 RETURN(0);
 
-        rc = mds_get_lmv_attr(obd, dentry->d_inode,
-                              &mea, &mea_size);
+        rc = mds_md_get_attr(obd, dentry->d_inode,
+                             &mea, &mea_size);
         if (rc)
                 RETURN(rc);
 
@@ -944,35 +993,49 @@ int mds_lock_slave_objs(struct obd_device *obd, struct dentry *dentry,
                 GOTO(cleanup, rc = -ENOMEM);
 
         memset(*rlockh, 0, handle_size);
-        memset(&op_data, 0, sizeof(op_data));
+        OBD_ALLOC(op_data, sizeof(*op_data));
+        if (op_data == NULL) {
+                OBD_FREE(*rlockh, handle_size);
+                RETURN(-ENOMEM);
+        }
+        memset(op_data, 0, sizeof(*op_data));
 
-        op_data.mea1 = mea;
+        op_data->mea1 = mea;
         it.it_op = IT_UNLINK;
 
-        rc = md_enqueue(mds->mds_lmv_exp, LDLM_IBITS, &it, LCK_EX,
-                        &op_data, *rlockh, NULL, 0, ldlm_completion_ast,
+        OBD_ALLOC(it.d.fs_data, sizeof(struct lustre_intent_data));
+
+        rc = md_enqueue(mds->mds_md_exp, LDLM_IBITS, &it, LCK_EX,
+                        op_data, *rlockh, NULL, 0, ldlm_completion_ast,
                         mds_blocking_ast, NULL);
+        OBD_FREE(op_data, sizeof(*op_data));
+        OBD_FREE(it.d.fs_data, sizeof(struct lustre_intent_data));
+        EXIT;
 cleanup:
         OBD_FREE(mea, mea_size);
-        RETURN(rc);
+        return rc;
 }
 
 void mds_unlock_slave_objs(struct obd_device *obd, struct dentry *dentry,
-                        struct lustre_handle *lockh)
+                           struct lustre_handle *lockh)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct mea *mea = NULL;
         int mea_size, rc, i;
+        ENTRY;
 
-        if (lockh == NULL)
+        if (lockh == NULL) {
+                EXIT;
                 return;
+        }
 
-       LASSERT(mds->mds_lmv_obd != NULL);
+       LASSERT(mds->mds_md_obd != NULL);
         LASSERT(S_ISDIR(dentry->d_inode->i_mode));
 
-        rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
+        rc = mds_md_get_attr(obd, dentry->d_inode, &mea, &mea_size);
         if (rc) {
                 CERROR("locks are leaked\n");
+                EXIT;
                 return;
         }
         LASSERT(mea_size != 0);
@@ -990,26 +1053,27 @@ void mds_unlock_slave_objs(struct obd_device *obd, struct dentry *dentry,
 
         OBD_FREE(lockh, sizeof(struct lustre_handle) * mea->mea_count);
         OBD_FREE(mea, mea_size);
+        EXIT;
 }
 
 int mds_unlink_slave_objs(struct obd_device *obd, struct dentry *dentry)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct ptlrpc_request *req = NULL;
-        struct mdc_op_data op_data;
+        struct mdc_op_data *op_data;
         struct mea *mea = NULL;
         int mea_size, rc;
         ENTRY;
 
        /* clustered MD ? */
-       if (!mds->mds_lmv_obd)
+       if (!mds->mds_md_obd)
                RETURN(0);
 
         /* a dir can be splitted only */
         if (!S_ISDIR(dentry->d_inode->i_mode))
                 RETURN(0);
 
-        rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
+        rc = mds_md_get_attr(obd, dentry->d_inode, &mea, &mea_size);
         if (rc)
                 RETURN(rc);
 
@@ -1020,12 +1084,17 @@ int mds_unlink_slave_objs(struct obd_device *obd, struct dentry *dentry)
                 GOTO(cleanup, rc = 0);
 
         CDEBUG(D_OTHER, "%s: unlink slaves for %lu/%lu\n", obd->obd_name,
-               (unsigned long) dentry->d_inode->i_ino,
-               (unsigned long) dentry->d_inode->i_generation);
+               (unsigned long)dentry->d_inode->i_ino,
+               (unsigned long)dentry->d_inode->i_generation);
 
-        memset(&op_data, 0, sizeof(op_data));
-        op_data.mea1 = mea;
-        rc = md_unlink(mds->mds_lmv_exp, &op_data, &req);
+        OBD_ALLOC(op_data, sizeof(*op_data));
+        if (op_data == NULL)
+                RETURN(-ENOMEM);
+        
+        memset(op_data, 0, sizeof(*op_data));
+        op_data->mea1 = mea;
+        rc = md_unlink(mds->mds_md_exp, op_data, &req);
+        OBD_FREE(op_data, sizeof(*op_data));
         LASSERT(req == NULL);
         EXIT;
 cleanup:
@@ -1118,7 +1187,6 @@ int mds_lock_and_check_slave(int offset, struct ptlrpc_request *req,
                 CERROR("Can't unpack security desc\n");
                 GOTO(cleanup, rc = -EFAULT);
         }
-        mds_squash_root(&obd->u.mds, rsd, &req->rq_peer.peer_id.nid); 
 
         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
                                   lustre_swab_mds_body);
@@ -1138,20 +1206,25 @@ int mds_lock_and_check_slave(int offset, struct ptlrpc_request *req,
         }
         cleanup_phase = 1;
 
-        LASSERT(S_ISDIR(dentry->d_inode->i_mode));
+       /* 
+        * handling the case when remote MDS checks if dir is empty 
+        * before rename. But it also does it for all entries, because
+        * inode is stored here and remote MDS does not know if rename
+        * point to dir or to reg file. So we check it here. 
+        */
+       if (!S_ISDIR(dentry->d_inode->i_mode))
+               GOTO(cleanup, rc = 0);
 
-        rc = mds_init_ucred(&uc, rsd);
+        rc = mds_init_ucred(&uc, req, rsd);
         if (rc) {
-                CERROR("can't init ucred\n");
                 GOTO(cleanup, rc);
         }
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
+       rc = mds_is_dir_empty(obd, dentry) ? 0 : -ENOTEMPTY;
+        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
 
-        rc = 0;
-        if (!mds_is_dir_empty(obd, dentry))
-                rc = -ENOTEMPTY;
-
+        mds_exit_ucred(&uc);
         EXIT;
 cleanup:
         switch(cleanup_phase) {
@@ -1159,8 +1232,6 @@ cleanup:
                 if (rc)
                         ldlm_lock_decref(lockh, LCK_EX);
                 l_dput(dentry);
-                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
-                mds_exit_ucred(&uc);
         default:
                 break;
         }
@@ -1168,49 +1239,41 @@ cleanup:
 }
 
 int mds_convert_mea_ea(struct obd_device *obd, struct inode *inode,
-                       struct lov_mds_md *lmm, int lmmsize)
+                       struct lov_mds_md *lmm, int lmm_size)
 {
-        int i, rc, err, size;
+        struct lov_stripe_md *lsm = NULL;
         struct mea_old *old;
         struct mea *mea;
-        struct mea *new;
         void *handle;
+        int rc, err;
         ENTRY;
 
-        mea = (struct mea *) lmm;
+        mea = (struct mea *)lmm;
+        old = (struct mea_old *)lmm;
+
         if (mea->mea_magic == MEA_MAGIC_LAST_CHAR ||
-                mea->mea_magic == MEA_MAGIC_ALL_CHARS)
+            mea->mea_magic == MEA_MAGIC_ALL_CHARS)
                 RETURN(0);
 
-        old = (struct mea_old *) lmm;
-        
-        rc = sizeof(struct lustre_id) * old->mea_count + 
-                sizeof(struct mea_old);
-        
-        if (old->mea_count > 256 || old->mea_master > 256 || lmmsize < rc
-                        || old->mea_master > old->mea_count) {
-                CWARN("unknown MEA format, dont convert it\n");
-                CWARN("  count %u, master %u, size %u\n",
-                      old->mea_count, old->mea_master, rc);
-                RETURN(0);
-        }
-                
-        CWARN("converting MEA EA on %lu/%u from V0 to V1 (%u/%u)\n",
-              inode->i_ino, inode->i_generation, old->mea_count, 
-              old->mea_master);
+        /*
+         * making MDS try LOV EA converting in the non-LMV configuration
+         * cases.
+         */
+        if (!obd->u.mds.mds_md_exp)
+                RETURN(-EINVAL);
 
-        size = sizeof(struct lustre_id) * old->mea_count + 
-                sizeof(struct mea);
-        
-        OBD_ALLOC(new, size);
-        if (new == NULL)
-                RETURN(-ENOMEM);
+        CDEBUG(D_INODE, "converting MEA EA on %lu/%u from V0 to V1 (%u/%u)\n",
+               inode->i_ino, inode->i_generation, old->mea_count, 
+               old->mea_master);
+
+        rc = obd_unpackmd(obd->u.mds.mds_md_exp, &lsm, lmm, lmm_size);
+        if (rc < 0)
+                GOTO(conv_end, rc);
 
-        new->mea_magic = MEA_MAGIC_LAST_CHAR;
-        new->mea_count = old->mea_count;
-        new->mea_master = old->mea_master;
-        for (i = 0; i < new->mea_count; i++)
-                new->mea_ids[i] = old->mea_ids[i];
+        rc = obd_packmd(obd->u.mds.mds_md_exp, &lmm, lsm);
+        if (rc < 0)
+                GOTO(conv_free, rc);
+        lmm_size = rc;
 
         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
         if (IS_ERR(handle)) {
@@ -1218,17 +1281,14 @@ int mds_convert_mea_ea(struct obd_device *obd, struct inode *inode,
                 GOTO(conv_free, rc);
         }
 
-        rc = fsfilt_set_md(obd, inode, handle, (struct lov_mds_md *) new, size);
-        if (rc > lmmsize)
-                size = lmmsize;
-        memcpy(lmm, new, size);
-
+        rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, EA_MEA);
         err = fsfilt_commit(obd, obd->u.mds.mds_sb, inode, handle, 0);
         if (!rc)
-                rc = err ? err : size;
-        EXIT;
+                rc = err ? err : lmm_size;
+        GOTO(conv_free, rc);
 conv_free:
-        OBD_FREE(new, size);
+        obd_free_memmd(obd->u.mds.mds_md_exp, &lsm);
+conv_end:
         return rc;
 }