Whamcloud - gitweb
LU-1347 style: removes obsolete EXPORT_SYMTAB macros
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
index 5417c67..1cd161b 100644 (file)
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
 #define DEBUG_SUBSYSTEM S_LMV
 #ifdef __KERNEL__
 #include <linux/slab.h>
@@ -52,7 +49,6 @@
 #include <liblustre.h>
 #endif
 
-#include <lustre/lustre_idl.h>
 #include <lustre_log.h>
 #include <obd_support.h>
 #include <lustre_lib.h>
@@ -65,7 +61,7 @@
 
 /* object cache. */
 cfs_mem_cache_t *lmv_object_cache;
-atomic_t lmv_object_count = ATOMIC_INIT(0);
+cfs_atomic_t lmv_object_count = CFS_ATOMIC_INIT(0);
 
 static void lmv_activate_target(struct lmv_obd *lmv,
                                 struct lmv_tgt_desc *tgt,
@@ -97,7 +93,7 @@ static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
         CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
                lmv, uuid->uuid, activate);
 
-        spin_lock(&lmv->lmv_lock);
+        cfs_spin_lock(&lmv->lmv_lock);
         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
                 if (tgt->ltd_exp == NULL)
                         continue;
@@ -127,13 +123,13 @@ static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
                 GOTO(out_lmv_lock, rc);
         }
 
-        CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, 
+        CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd,
                activate ? "" : "in");
         lmv_activate_target(lmv, tgt, activate);
         EXIT;
 
  out_lmv_lock:
-        spin_unlock(&lmv->lmv_lock);
+        cfs_spin_unlock(&lmv->lmv_lock);
         return rc;
 }
 
@@ -146,7 +142,7 @@ static int lmv_set_mdc_data(struct lmv_obd *lmv, struct obd_uuid *uuid,
 
         LASSERT(data != NULL);
 
-        spin_lock(&lmv->lmv_lock);
+        cfs_spin_lock(&lmv->lmv_lock);
         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
                 if (tgt->ltd_exp == NULL)
                         continue;
@@ -156,7 +152,7 @@ static int lmv_set_mdc_data(struct lmv_obd *lmv, struct obd_uuid *uuid,
                         break;
                 }
         }
-        spin_unlock(&lmv->lmv_lock);
+        cfs_spin_unlock(&lmv->lmv_lock);
         RETURN(0);
 }
 
@@ -199,7 +195,7 @@ static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
         } else if (ev == OBD_NOTIFY_OCD) {
                 conn_data = &watched->u.cli.cl_import->imp_connect_data;
 
-                /* 
+                /*
                  * Set connect data to desired target, update exp_connect_flags.
                  */
                 rc = lmv_set_mdc_data(lmv, uuid, conn_data);
@@ -219,14 +215,14 @@ static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
         }
 #if 0
         else if (ev == OBD_NOTIFY_DISCON) {
-                /* 
-                 * For disconnect event, flush fld cache for failout MDS case. 
+                /*
+                 * For disconnect event, flush fld cache for failout MDS case.
                  */
                 fld_client_flush(&lmv->lmv_fld);
         }
 #endif
-        /* 
-         * Pass the notification up the chain. 
+        /*
+         * Pass the notification up the chain.
          */
         if (obd->obd_observer)
                 rc = obd_notify(obd->obd_observer, watched, ev, data);
@@ -236,10 +232,10 @@ static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
 
 /**
  * This is fake connect function. Its purpose is to initialize lmv and say
- * caller that everything is okay. Real connection will be performed later. 
+ * caller that everything is okay. Real connection will be performed later.
  */
 static int lmv_connect(const struct lu_env *env,
-                       struct lustre_handle *conn, struct obd_device *obd,
+                       struct obd_export **exp, struct obd_device *obd,
                        struct obd_uuid *cluuid, struct obd_connect_data *data,
                        void *localdata)
 {
@@ -247,29 +243,30 @@ static int lmv_connect(const struct lu_env *env,
         struct proc_dir_entry *lmv_proc_dir;
 #endif
         struct lmv_obd        *lmv = &obd->u.lmv;
-        struct obd_export     *exp;
+        struct lustre_handle  conn = { 0 };
         int                    rc = 0;
         ENTRY;
 
-        rc = class_connect(conn, obd, cluuid);
-        if (rc) {
-                CERROR("class_connection() returned %d\n", rc);
-                RETURN(rc);
-        }
-
-        exp = class_conn2export(conn);
-
-        /* 
+        /*
          * We don't want to actually do the underlying connections more than
-         * once, so keep track. 
+         * once, so keep track.
          */
         lmv->refcount++;
         if (lmv->refcount > 1) {
-                class_export_put(exp);
+                *exp = NULL;
                 RETURN(0);
         }
 
-        lmv->exp = exp;
+        rc = class_connect(&conn, obd, cluuid);
+        if (rc) {
+                CERROR("class_connection() returned %d\n", rc);
+                RETURN(rc);
+        }
+
+        *exp = class_conn2export(&conn);
+        class_export_get(*exp);
+
+        lmv->exp = *exp;
         lmv->connected = 0;
         lmv->cluuid = *cluuid;
 
@@ -286,11 +283,11 @@ static int lmv_connect(const struct lu_env *env,
         }
 #endif
 
-        /* 
+        /*
          * All real clients should perform actual connection right away, because
          * it is possible, that LMV will not have opportunity to connect targets
          * and MDC stuff will be called directly, for instance while reading
-         * ../mdc/../kbytesfree procfs file, etc. 
+         * ../mdc/../kbytesfree procfs file, etc.
          */
         if (data->ocd_connect_flags & OBD_CONNECT_REAL)
                 rc = lmv_check_connect(obd);
@@ -322,7 +319,7 @@ static void lmv_set_timeouts(struct obd_device *obd)
                 if (tgts->ltd_exp == NULL)
                         continue;
 
-                obd_set_info_async(tgts->ltd_exp, sizeof(KEY_INTERMDS),
+                obd_set_info_async(NULL, tgts->ltd_exp, sizeof(KEY_INTERMDS),
                                    KEY_INTERMDS, 0, NULL, NULL);
         }
 }
@@ -383,7 +380,6 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
         struct obd_uuid         *cluuid = &lmv->cluuid;
         struct obd_connect_data *mdc_data = NULL;
         struct obd_uuid          lmv_mdc_uuid = { "LMV_MDC_UUID" };
-        struct lustre_handle     conn = {0, };
         struct obd_device       *mdc_obd;
         struct obd_export       *mdc_exp;
         struct lu_fld_target     target;
@@ -407,16 +403,14 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
                 RETURN(-EINVAL);
         }
 
-        rc = obd_connect(NULL, &conn, mdc_obd, &lmv_mdc_uuid,
+        rc = obd_connect(NULL, &mdc_exp, mdc_obd, &lmv_mdc_uuid,
                          &lmv->conn_data, NULL);
         if (rc) {
                 CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc);
                 RETURN(rc);
         }
 
-        mdc_exp = class_conn2export(&conn);
-
-        /* 
+        /*
          * Init fid sequence client for this mdc and add new fld target.
          */
         rc = obd_fid_init(mdc_exp);
@@ -440,7 +434,7 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
         }
 
         if (obd->obd_observer) {
-                /* 
+                /*
                  * Tell the observer about the new target.
                  */
                 rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
@@ -455,7 +449,7 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
         tgt->ltd_exp = mdc_exp;
         lmv->desc.ld_active_tgt_count++;
 
-        /* 
+        /*
          * Copy connect data, it may be used later.
          */
         lmv->datas[tgt->ltd_idx] = *mdc_data;
@@ -465,22 +459,20 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
 
         CDEBUG(D_CONFIG, "Connected to %s(%s) successfully (%d)\n",
                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
-                atomic_read(&obd->obd_refcount));
+                cfs_atomic_read(&obd->obd_refcount));
 
 #ifdef __KERNEL__
         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
         if (lmv_proc_dir) {
                 struct proc_dir_entry *mdc_symlink;
-                char name[MAX_STRING_SIZE + 1];
 
                 LASSERT(mdc_obd->obd_type != NULL);
                 LASSERT(mdc_obd->obd_type->typ_name != NULL);
-                name[MAX_STRING_SIZE] = '\0';
-                snprintf(name, MAX_STRING_SIZE, "../../../%s/%s",
-                         mdc_obd->obd_type->typ_name,
-                         mdc_obd->obd_name);
-                mdc_symlink = proc_symlink(mdc_obd->obd_name,
-                                           lmv_proc_dir, name);
+                mdc_symlink = lprocfs_add_symlink(mdc_obd->obd_name,
+                                                  lmv_proc_dir,
+                                                  "../../../%s/%s",
+                                                  mdc_obd->obd_type->typ_name,
+                                                  mdc_obd->obd_name);
                 if (mdc_symlink == NULL) {
                         CERROR("Could not register LMV target "
                                "/proc/fs/lustre/%s/%s/target_obds/%s.",
@@ -523,24 +515,24 @@ int lmv_add_target(struct obd_device *obd, struct obd_uuid *tgt_uuid)
                         RETURN(-EINVAL);
                 }
 
-                rc = obd_llog_init(obd, &obd->obd_olg, mdc_obd, 0, NULL, tgt_uuid);
+                rc = obd_llog_init(obd, &obd->obd_olg, mdc_obd, NULL);
                 if (rc) {
                         lmv_init_unlock(lmv);
                         CERROR("lmv failed to setup llogging subsystems\n");
                 }
         }
-        spin_lock(&lmv->lmv_lock);
+        cfs_spin_lock(&lmv->lmv_lock);
         tgt = lmv->tgts + lmv->desc.ld_tgt_count++;
         tgt->ltd_uuid = *tgt_uuid;
-        spin_unlock(&lmv->lmv_lock);
+        cfs_spin_unlock(&lmv->lmv_lock);
 
         if (lmv->connected) {
                 rc = lmv_connect_mdc(obd, tgt);
                 if (rc) {
-                        spin_lock(&lmv->lmv_lock);
+                        cfs_spin_lock(&lmv->lmv_lock);
                         lmv->desc.ld_tgt_count--;
                         memset(tgt, 0, sizeof(*tgt));
-                        spin_unlock(&lmv->lmv_lock);
+                        cfs_spin_unlock(&lmv->lmv_lock);
                 } else {
                         int easize = sizeof(struct lmv_stripe_md) +
                                      lmv->desc.ld_tgt_count *
@@ -572,6 +564,7 @@ int lmv_check_connect(struct obd_device *obd)
         }
 
         if (lmv->desc.ld_tgt_count == 0) {
+                lmv_init_unlock(lmv);
                 CERROR("%s: no targets configured.\n", obd->obd_name);
                 RETURN(-EINVAL);
         }
@@ -630,8 +623,11 @@ static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
 
         mdc_obd = class_exp2obd(tgt->ltd_exp);
 
-        if (mdc_obd)
+        if (mdc_obd) {
+                mdc_obd->obd_force = obd->obd_force;
+                mdc_obd->obd_fail = obd->obd_fail;
                 mdc_obd->obd_no_recov = obd->obd_no_recov;
+        }
 
 #ifdef __KERNEL__
         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
@@ -684,8 +680,8 @@ static int lmv_disconnect(struct obd_export *exp)
         if (!lmv->tgts)
                 goto out_local;
 
-        /* 
-         * Only disconnect the underlying layers on the final disconnect. 
+        /*
+         * Only disconnect the underlying layers on the final disconnect.
          */
         lmv->refcount--;
         if (lmv->refcount != 0)
@@ -725,12 +721,13 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
 {
         struct obd_device    *obddev = class_exp2obd(exp);
         struct lmv_obd       *lmv = &obddev->u.lmv;
-        int                   i;
+        int                   i = 0;
         int                   rc = 0;
         int                   set = 0;
+        int                   count = lmv->desc.ld_tgt_count;
         ENTRY;
 
-        if (lmv->desc.ld_tgt_count == 0)
+        if (count == 0)
                 RETURN(-ENOTTY);
 
         switch (cmd) {
@@ -741,9 +738,7 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
                 __u32 index;
 
                 memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
-                LASSERT(data->ioc_plen1 == sizeof(struct obd_statfs));
-
-                if ((index >= lmv->desc.ld_tgt_count))
+                if ((index >= count))
                         RETURN(-ENODEV);
 
                 if (!lmv->tgts[index].ltd_active)
@@ -753,26 +748,101 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
                 if (!mdc_obd)
                         RETURN(-EINVAL);
 
-                rc = obd_statfs(mdc_obd, &stat_buf,
-                                cfs_time_current_64() - HZ, 0);
+                /* copy UUID */
+                if (cfs_copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
+                                     min((int) data->ioc_plen2,
+                                         (int) sizeof(struct obd_uuid))))
+                        RETURN(-EFAULT);
+
+                rc = obd_statfs(NULL, lmv->tgts[index].ltd_exp, &stat_buf,
+                                cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
+                                0);
                 if (rc)
                         RETURN(rc);
-                if (copy_to_user(data->ioc_pbuf1, &stat_buf, data->ioc_plen1))
-                        RETURN(rc);
-                rc = copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
-                                  data->ioc_plen2);
+                if (cfs_copy_to_user(data->ioc_pbuf1, &stat_buf,
+                                     min((int) data->ioc_plen1,
+                                         (int) sizeof(stat_buf))))
+                        RETURN(-EFAULT);
                 break;
         }
+        case OBD_IOC_QUOTACTL: {
+                struct if_quotactl *qctl = karg;
+                struct lmv_tgt_desc *tgt = NULL;
+                struct obd_quotactl *oqctl;
+
+                if (qctl->qc_valid == QC_MDTIDX) {
+                        if (qctl->qc_idx < 0 || count <= qctl->qc_idx)
+                                RETURN(-EINVAL);
+
+                        tgt = &lmv->tgts[qctl->qc_idx];
+                        if (!tgt->ltd_exp)
+                                RETURN(-EINVAL);
+                } else if (qctl->qc_valid == QC_UUID) {
+                        for (i = 0; i < count; i++) {
+                                tgt = &lmv->tgts[i];
+                                if (!obd_uuid_equals(&tgt->ltd_uuid,
+                                                     &qctl->obd_uuid))
+                                        continue;
+
+                                if (tgt->ltd_exp == NULL)
+                                        RETURN(-EINVAL);
+
+                                break;
+                        }
+                } else {
+                        RETURN(-EINVAL);
+                }
+
+                if (i >= count)
+                        RETURN(-EAGAIN);
+
+                LASSERT(tgt && tgt->ltd_exp);
+                OBD_ALLOC_PTR(oqctl);
+                if (!oqctl)
+                        RETURN(-ENOMEM);
+
+                QCTL_COPY(oqctl, qctl);
+                rc = obd_quotactl(tgt->ltd_exp, oqctl);
+                if (rc == 0) {
+                        QCTL_COPY(qctl, oqctl);
+                        qctl->qc_valid = QC_MDTIDX;
+                        qctl->obd_uuid = tgt->ltd_uuid;
+                }
+                OBD_FREE_PTR(oqctl);
+                break;
+        }
+        case OBD_IOC_CHANGELOG_SEND:
+        case OBD_IOC_CHANGELOG_CLEAR: {
+                struct ioc_changelog *icc = karg;
+
+                if (icc->icc_mdtindex >= count)
+                        RETURN(-ENODEV);
+
+                rc = obd_iocontrol(cmd, lmv->tgts[icc->icc_mdtindex].ltd_exp,
+                                   sizeof(*icc), icc, NULL);
+                break;
+        }
+        case LL_IOC_GET_CONNECT_FLAGS: {
+                rc = obd_iocontrol(cmd, lmv->tgts[0].ltd_exp, len, karg, uarg);
+                break;
+        }
+
         default : {
-                for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+                for (i = 0; i < count; i++) {
                         int err;
+                        struct obd_device *mdc_obd;
 
                         if (lmv->tgts[i].ltd_exp == NULL)
                                 continue;
-
+                        /* ll_umount_begin() sets force flag but for lmv, not
+                         * mdc. Let's pass it through */
+                        mdc_obd = class_exp2obd(lmv->tgts[i].ltd_exp);
+                        mdc_obd->obd_force = obddev->obd_force;
                         err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp, len,
                                             karg, uarg);
-                        if (err) {
+                        if (err == -ENODATA && cmd == OBD_IOC_POLL_QUOTACHECK) {
+                                RETURN(err);
+                        } else if (err) {
                                 if (lmv->tgts[i].ltd_active) {
                                         CERROR("error: iocontrol MDC %s on MDT"
                                                "idx %d cmd %x: err = %d\n",
@@ -806,7 +876,7 @@ static int lmv_nid_policy(struct lmv_obd *lmv)
 {
         struct obd_import *imp;
         __u32              id;
-        
+
         /*
          * XXX: To get nid we assume that underlying obd device is mdc.
          */
@@ -835,7 +905,7 @@ static int lmv_choose_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
 }
 
 /**
- * This is _inode_ placement policy function (not name). 
+ * This is _inode_ placement policy function (not name).
  */
 static int lmv_placement_policy(struct obd_device *obd,
                                 struct md_op_data *op_data,
@@ -912,41 +982,28 @@ int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
         ENTRY;
 
         tgt = lmv_get_target(lmv, mds);
-    
-        /* 
+
+        /*
          * New seq alloc and FLD setup should be atomic. Otherwise we may find
          * on server that seq in new allocated fid is not yet known.
          */
-        down(&tgt->ltd_fid_sem);
+        cfs_mutex_lock(&tgt->ltd_fid_mutex);
 
         if (!tgt->ltd_active)
                 GOTO(out, rc = -ENODEV);
 
-        /* 
-         * Asking underlaying tgt layer to allocate new fid. 
+        /*
+         * Asking underlying tgt layer to allocate new fid.
          */
         rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL);
         if (rc > 0) {
                 LASSERT(fid_is_sane(fid));
-
-                /* 
-                 * Client switches to new sequence, setup FLD. 
-                 */
-                rc = fld_client_create(&lmv->lmv_fld, fid_seq(fid),
-                                       mds, NULL);
-                if (rc) {
-                        /* 
-                         * Delete just allocated fid sequence in case
-                         * of fail back.
-                         */
-                        CERROR("Can't create fld entry, rc %d\n", rc);
-                        obd_fid_delete(tgt->ltd_exp, NULL);
-                }
+                rc = 0;
         }
 
         EXIT;
 out:
-        up(&tgt->ltd_fid_sem);
+        cfs_mutex_unlock(&tgt->ltd_fid_mutex);
         return rc;
 }
 
@@ -955,7 +1012,7 @@ int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
 {
         struct obd_device     *obd = class_exp2obd(exp);
         struct lmv_obd        *lmv = &obd->u.lmv;
-        mdsno_t                mds;
+        mdsno_t                mds = 0;
         int                    rc;
         ENTRY;
 
@@ -1017,7 +1074,7 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                 RETURN(-ENOMEM);
 
         for (i = 0; i < LMV_MAX_TGT_COUNT; i++) {
-                sema_init(&lmv->tgts[i].ltd_fid_sem, 1);
+                cfs_mutex_init(&lmv->tgts[i].ltd_fid_mutex);
                 lmv->tgts[i].ltd_idx = i;
         }
 
@@ -1035,8 +1092,8 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         lmv->max_easize = 0;
         lmv->lmv_placement = PLACEMENT_CHAR_POLICY;
 
-        spin_lock_init(&lmv->lmv_lock);
-        sema_init(&lmv->init_sem, 1);
+        cfs_spin_lock_init(&lmv->lmv_lock);
+        cfs_mutex_init(&lmv->init_mutex);
 
         rc = lmv_object_setup(obd);
         if (rc) {
@@ -1048,10 +1105,11 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         lprocfs_obd_setup(obd, lvars.obd_vars);
 #ifdef LPROCFS
         {
-                rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd_status",
+                rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd",
                                         0444, &lmv_proc_target_fops, obd);
                 if (rc)
-                        CWARN("Error adding target_obd_stats file (%d)\n", rc);
+                        CWARN("%s: error adding LMV target_obd file: rc = %d\n",
+                               obd->obd_name, rc);
        }
 #endif
         rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
@@ -1078,7 +1136,6 @@ static int lmv_cleanup(struct obd_device *obd)
         ENTRY;
 
         fld_client_fini(&lmv->lmv_fld);
-        lprocfs_obd_cleanup(obd);
         lmv_object_cleanup(obd);
         OBD_FREE(lmv->datas, lmv->datas_size);
         OBD_FREE(lmv->tgts, lmv->tgts_size);
@@ -1110,9 +1167,10 @@ out:
         RETURN(rc);
 }
 
-static int lmv_statfs(struct obd_device *obd, struct obd_statfs *osfs,
-                      __u64 max_age, __u32 flags)
+static int lmv_statfs(const struct lu_env *env, struct obd_export *exp,
+                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
 {
+        struct obd_device     *obd = class_exp2obd(exp);
         struct lmv_obd        *lmv = &obd->u.lmv;
         struct obd_statfs     *temp;
         int                    rc = 0;
@@ -1131,7 +1189,7 @@ static int lmv_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                 if (lmv->tgts[i].ltd_exp == NULL)
                         continue;
 
-                rc = obd_statfs(lmv->tgts[i].ltd_exp->exp_obd, temp,
+                rc = obd_statfs(env, lmv->tgts[i].ltd_exp, temp,
                                 max_age, flags);
                 if (rc) {
                         CERROR("can't stat MDS #%d (%s), error %d\n", i,
@@ -1217,15 +1275,14 @@ static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
         if (IS_ERR(tgt))
                 RETURN(PTR_ERR(tgt));
 
-        rc = md_setxattr(tgt->ltd_exp, fid, oc, valid, name, input, 
+        rc = md_setxattr(tgt->ltd_exp, fid, oc, valid, name, input,
                          input_size, output_size, flags, suppgid,
                          request);
 
         RETURN(rc);
 }
 
-static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid,
-                       struct obd_capa *oc, obd_valid valid, int ea_size,
+static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data,
                        struct ptlrpc_request **request)
 {
         struct obd_device       *obd = exp->exp_obd;
@@ -1240,17 +1297,22 @@ static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid,
         if (rc)
                 RETURN(rc);
 
-        tgt = lmv_find_target(lmv, fid);
+        tgt = lmv_find_target(lmv, &op_data->op_fid1);
         if (IS_ERR(tgt))
                 RETURN(PTR_ERR(tgt));
 
-        rc = md_getattr(tgt->ltd_exp, fid, oc, valid, ea_size, request);
+        if (op_data->op_valid & OBD_MD_MDTIDX) {
+                op_data->op_mds = tgt->ltd_idx;
+                RETURN(0);
+        }
+
+        rc = md_getattr(tgt->ltd_exp, op_data, request);
         if (rc)
                 RETURN(rc);
 
-        obj = lmv_object_find_lock(obd, fid);
+        obj = lmv_object_find_lock(obd, &op_data->op_fid1);
 
-        CDEBUG(D_INODE, "GETATTR for "DFID" %s\n", PFID(fid),
+        CDEBUG(D_INODE, "GETATTR for "DFID" %s\n", PFID(&op_data->op_fid1),
                obj ? "(split)" : "");
 
         /*
@@ -1279,7 +1341,7 @@ static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid,
                                 continue;
                         }
 
-                        /* 
+                        /*
                          * Skip master object.
                          */
                         if (lu_fid_eq(&obj->lo_fid, &obj->lo_stripes[i].ls_fid))
@@ -1320,6 +1382,36 @@ static int lmv_change_cbdata(struct obd_export *exp, const struct lu_fid *fid,
         RETURN(0);
 }
 
+/**
+ * Call md_find_cbdata() for \a fid on every connected target, passing the
+ * iterator \a it and its argument \a data through unchanged.
+ *
+ * Target slots that have no export (ltd_exp == NULL) are skipped, the same
+ * way the other per-target loops in this file treat them.
+ *
+ * Returns the first non-zero value produced by lmv_check_connect() or by
+ * md_find_cbdata() (iteration stops at that target), otherwise 0.
+ */
+static int lmv_find_cbdata(struct obd_export *exp, const struct lu_fid *fid,
+                           ldlm_iterator_t it, void *data)
+{
+        struct obd_device   *obd = exp->exp_obd;
+        struct lmv_obd      *lmv = &obd->u.lmv;
+        int                  i;
+        int                  rc;
+        ENTRY;
+
+        rc = lmv_check_connect(obd);
+        if (rc)
+                RETURN(rc);
+
+        CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
+
+        /*
+         * With CMD every object can have two locks in different namespaces:
+         * lookup lock in space of mds storing direntry and update/open lock in
+         * space of mds storing inode.
+         */
+        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+                /* Nothing to query on a slot that was never connected. */
+                if (lmv->tgts[i].ltd_exp == NULL)
+                        continue;
+
+                rc = md_find_cbdata(lmv->tgts[i].ltd_exp, fid, it, data);
+                if (rc)
+                        RETURN(rc);
+        }
+
+        /* rc is still 0 here: either every call succeeded or all were skipped. */
+        RETURN(rc);
+}
+
+
 static int lmv_close(struct obd_export *exp, struct md_op_data *op_data,
                      struct md_open_data *mod, struct ptlrpc_request **request)
 {
@@ -1354,6 +1446,7 @@ int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid)
         struct lmv_tgt_desc     *tgt;
         struct lmv_object       *obj;
         struct lustre_md         md;
+        struct md_op_data       *op_data;
         int                      mealen;
         int                      rc;
         __u64                    valid;
@@ -1368,10 +1461,20 @@ int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid)
         if (IS_ERR(tgt))
                 RETURN(PTR_ERR(tgt));
 
-        /* 
-         * Time to update mea of parent fid. 
+        /*
+         * Time to update mea of parent fid.
          */
-        rc = md_getattr(tgt->ltd_exp, fid, NULL, valid, mealen, &req);
+
+        OBD_ALLOC_PTR(op_data);
+        if (op_data == NULL) 
+                RETURN(-ENOMEM);
+
+        op_data->op_fid1 = *fid;
+        op_data->op_mode = mealen;
+        op_data->op_valid = valid;
+
+        rc = md_getattr(tgt->ltd_exp, op_data, &req);
+        OBD_FREE_PTR(op_data);
         if (rc) {
                 CERROR("md_getattr() failed, error %d\n", rc);
                 GOTO(cleanup, rc);
@@ -1448,7 +1551,7 @@ repeat:
         else if (rc)
                 RETURN(rc);
 
-        CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #"LPU64"\n", 
+        CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n",
                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
                op_data->op_mds);
 
@@ -1559,8 +1662,8 @@ cleanup:
         OBD_FREE_PTR(op_data2);
 
         if (rc != 0) {
-                /* 
-                 * Drop all taken locks. 
+                /*
+                 * Drop all taken locks.
                  */
                 while (--i >= 0) {
                         if (lockh[i].cookie)
@@ -1598,8 +1701,8 @@ lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
         CDEBUG(D_INODE, "REMOTE_ENQUEUE '%s' on "DFID" -> "DFID"\n",
                LL_IT2STR(it), PFID(&op_data->op_fid1), PFID(&body->fid1));
 
-        /* 
-         * We got LOOKUP lock, but we really need attrs. 
+        /*
+         * We got LOOKUP lock, but we really need attrs.
          */
         pmode = it->d.lustre.it_lock_mode;
         LASSERT(pmode != 0);
@@ -1677,7 +1780,7 @@ lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
 
         CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n",
                LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx);
-        
+
         rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data, lockh,
                         lmm, lmmsize, req, extra_lock_flags);
 
@@ -1689,18 +1792,17 @@ lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
 }
 
 static int
-lmv_getattr_name(struct obd_export *exp, const struct lu_fid *fid,
-                 struct obd_capa *oc, const char *name, int namelen,
-                 obd_valid valid, int ea_size, __u32 suppgid,
+lmv_getattr_name(struct obd_export *exp,struct md_op_data *op_data,
                  struct ptlrpc_request **request)
 {
         struct ptlrpc_request   *req = NULL;
         struct obd_device       *obd = exp->exp_obd;
         struct lmv_obd          *lmv = &obd->u.lmv;
-        struct lu_fid            rid = *fid;
+        struct lu_fid            rid = op_data->op_fid1;
         struct lmv_tgt_desc     *tgt;
         struct mdt_body         *body;
         struct lmv_object       *obj;
+        obd_valid                valid = op_data->op_valid;
         int                      rc;
         int                      loop = 0;
         int                      sidx;
@@ -1716,23 +1818,27 @@ repeat:
         obj = lmv_object_find(obd, &rid);
         if (obj) {
                 sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
-                                       name, namelen - 1);
+                                    op_data->op_name, op_data->op_namelen);
                 rid = obj->lo_stripes[sidx].ls_fid;
                 tgt = lmv_get_target(lmv, obj->lo_stripes[sidx].ls_mds);
+                op_data->op_mds = obj->lo_stripes[sidx].ls_mds;
                 valid &= ~OBD_MD_FLCKSPLIT;
                 lmv_object_put(obj);
         } else {
                 tgt = lmv_find_target(lmv, &rid);
                 valid |= OBD_MD_FLCKSPLIT;
+                op_data->op_mds = tgt->ltd_idx;
         }
         if (IS_ERR(tgt))
                 RETURN(PTR_ERR(tgt));
 
         CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" - "DFID" -> mds #%d\n",
-               namelen, name, PFID(fid), PFID(&rid), tgt->ltd_idx);
+               op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
+               PFID(&rid), tgt->ltd_idx);
 
-        rc = md_getattr_name(tgt->ltd_exp, &rid, oc, name, namelen, valid,
-                             ea_size, suppgid, request);
+        op_data->op_valid = valid;
+        op_data->op_fid1 = rid;
+        rc = md_getattr_name(tgt->ltd_exp, op_data, request);
         if (rc == 0) {
                 body = req_capsule_server_get(&(*request)->rq_pill,
                                               &RMF_MDT_BODY);
@@ -1749,9 +1855,11 @@ repeat:
                                 RETURN(PTR_ERR(tgt));
                         }
 
-                        rc = md_getattr_name(tgt->ltd_exp, &rid, NULL, NULL,
-                                             1, valid | OBD_MD_FLCROSSREF,
-                                             ea_size, suppgid, &req);
+                        op_data->op_fid1 = rid;
+                        op_data->op_valid |= OBD_MD_FLCROSSREF;
+                        op_data->op_namelen = 0;
+                        op_data->op_name = NULL;
+                        rc = md_getattr_name(tgt->ltd_exp, op_data, &req);
                         ptlrpc_req_finished(*request);
                         *request = req;
                 }
@@ -1802,7 +1910,7 @@ static int lmv_early_cancel_slaves(struct obd_export *exp,
         obj = lmv_object_find(obd, op_fid);
         if (obj == NULL)
                 RETURN(-EALREADY);
-                
+
         policy.l_inodebits.bits = bits;
         for (i = 0; i < obj->lo_objcount; i++) {
                 tgt = lmv_get_target(lmv, obj->lo_stripes[i].ls_mds);
@@ -1810,12 +1918,12 @@ static int lmv_early_cancel_slaves(struct obd_export *exp,
                 if (op_tgt != tgt->ltd_idx) {
                         CDEBUG(D_INODE, "EARLY_CANCEL slave "DFID" -> mds #%d\n",
                                PFID(st_fid), tgt->ltd_idx);
-                        rc = md_cancel_unused(tgt->ltd_exp, st_fid, &policy, 
-                                              mode, LDLM_FL_ASYNC, NULL);
+                        rc = md_cancel_unused(tgt->ltd_exp, st_fid, &policy,
+                                              mode, LCF_ASYNC, NULL);
                         if (rc)
                                 GOTO(out_put_obj, rc);
                 } else {
-                        CDEBUG(D_INODE, 
+                        CDEBUG(D_INODE,
                                "EARLY_CANCEL skip operation target %d on "DFID"\n",
                                op_tgt, PFID(st_fid));
                         /*
@@ -1862,9 +1970,9 @@ static int lmv_early_cancel(struct obd_export *exp, struct md_op_data *op_data,
                         CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid));
                         policy.l_inodebits.bits = bits;
                         rc = md_cancel_unused(tgt->ltd_exp, fid, &policy,
-                                              mode, LDLM_FL_ASYNC, NULL);
+                                              mode, LCF_ASYNC, NULL);
                 } else {
-                        CDEBUG(D_INODE, 
+                        CDEBUG(D_INODE,
                                "EARLY_CANCEL skip operation target %d on "DFID"\n",
                                op_tgt, PFID(fid));
                         op_data->op_flags |= flag;
@@ -1899,53 +2007,41 @@ static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
 repeat:
         ++loop;
         LASSERT(loop <= 2);
-        if (op_data->op_namelen != 0) {
-                obj = lmv_object_find(obd, &op_data->op_fid2);
-                if (obj) {
-                        sidx = raw_name2idx(obj->lo_hashtype,
-                                               obj->lo_objcount,
-                                               op_data->op_name,
-                                               op_data->op_namelen);
-                        op_data->op_fid2 = obj->lo_stripes[sidx].ls_fid;
-                        mds = obj->lo_stripes[sidx].ls_mds;
-                        lmv_object_put(obj);
-                } else {
-                        rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds);
-                        if (rc)
-                                RETURN(rc);
-                }
+        LASSERT(op_data->op_namelen != 0);
+
+        CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n",
+               PFID(&op_data->op_fid2), op_data->op_namelen,
+               op_data->op_name, PFID(&op_data->op_fid1));
 
-                CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n",
-                       PFID(&op_data->op_fid2), op_data->op_namelen,
-                       op_data->op_name, PFID(&op_data->op_fid1));
+        obj = lmv_object_find(obd, &op_data->op_fid2);
+        if (obj) {
+                sidx = raw_name2idx(obj->lo_hashtype,
+                                    obj->lo_objcount,
+                                    op_data->op_name,
+                                    op_data->op_namelen);
+                op_data->op_fid2 = obj->lo_stripes[sidx].ls_fid;
+                mds = obj->lo_stripes[sidx].ls_mds;
+                lmv_object_put(obj);
         } else {
-                rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds);
+                rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds);
                 if (rc)
                         RETURN(rc);
-
-                /* 
-                 * Request from MDS to acquire i_links for inode by fid1. 
-                 */
-                CDEBUG(D_INODE, "Inc i_nlinks for "DFID"\n",
-                       PFID(&op_data->op_fid1));
         }
 
-        CDEBUG(D_INODE, "Forward to mds #"LPU64" ("DFID")\n",
+        CDEBUG(D_INODE, "Forward to mds #%x ("DFID")\n",
                mds, PFID(&op_data->op_fid1));
 
-        op_data->op_fsuid = current->fsuid;
-        op_data->op_fsgid = current->fsgid;
+        op_data->op_fsuid = cfs_curproc_fsuid();
+        op_data->op_fsgid = cfs_curproc_fsgid();
         op_data->op_cap = cfs_curproc_cap_pack();
         tgt = lmv_get_target(lmv, mds);
 
-        if (op_data->op_namelen) {
-                /* 
-                 * Cancel UPDATE lock on child (fid1). 
-                 */
-                op_data->op_flags |= MF_MDC_CANCEL_FID2;
-                rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
-                                      MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
-        }
+        /*
+         * Cancel UPDATE lock on child (fid1).
+         */
+        op_data->op_flags |= MF_MDC_CANCEL_FID2;
+        rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
+                              MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
         if (rc == 0)
                 rc = md_link(tgt->ltd_exp, op_data, request);
         if (rc == -ERESTART) {
@@ -1974,7 +2070,6 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
         struct obd_device       *obd = exp->exp_obd;
         struct lmv_obd          *lmv = &obd->u.lmv;
         struct lmv_tgt_desc     *src_tgt;
-        struct lmv_tgt_desc     *tgt_tgt;
         int                      rc;
         int                      sidx;
         int                      loop = 0;
@@ -1983,6 +2078,8 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
         mdsno_t                  mds2;
         ENTRY;
 
+        LASSERT(oldlen != 0);
+
         CDEBUG(D_INODE, "RENAME %*s in "DFID" to %*s in "DFID"\n",
                oldlen, old, PFID(&op_data->op_fid1),
                newlen, new, PFID(&op_data->op_fid2));
@@ -1991,37 +2088,6 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
         if (rc)
                 RETURN(rc);
 
-        if (oldlen == 0) {
-                /*
-                 * MDS with old dir entry is asking another MDS to create name
-                 * there.
-                 */
-                CDEBUG(D_INODE,
-                       "Create %*s(%d/%d) in "DFID" pointing "
-                       "to "DFID"\n", newlen, new, oldlen, newlen,
-                       PFID(&op_data->op_fid2), PFID(&op_data->op_fid1));
-
-                rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds1);
-                if (rc)
-                        RETURN(rc);
-
-                /*
-                 * Target directory can be split, sowe should forward request to
-                 * the right MDS.
-                 */
-                obj = lmv_object_find(obd, &op_data->op_fid2);
-                if (obj) {
-                        sidx = raw_name2idx(obj->lo_hashtype,
-                                            obj->lo_objcount,
-                                            (char *)new, newlen);
-                        op_data->op_fid2 = obj->lo_stripes[sidx].ls_fid;
-                        CDEBUG(D_INODE, "Parent obj "DFID"\n",
-                               PFID(&op_data->op_fid2));
-                        lmv_object_put(obj);
-                }
-                goto request;
-        }
-
 repeat:
         ++loop;
         LASSERT(loop <= 2);
@@ -2058,52 +2124,50 @@ repeat:
                         RETURN(rc);
         }
 
-request:
-        op_data->op_fsuid = current->fsuid;
-        op_data->op_fsgid = current->fsgid;
+        op_data->op_fsuid = cfs_curproc_fsuid();
+        op_data->op_fsgid = cfs_curproc_fsgid();
         op_data->op_cap = cfs_curproc_cap_pack();
 
         src_tgt = lmv_get_target(lmv, mds1);
-        tgt_tgt = lmv_get_target(lmv, mds2);
-        if (oldlen) {
-                /* 
-                 * LOOKUP lock on src child (fid3) should also be cancelled for
-                 * src_tgt in mdc_rename. 
-                 */
-                op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
 
-                /* 
-                 * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
-                 * own target. 
-                 */
-                rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx, 
-                                      LCK_EX, MDS_INODELOCK_UPDATE, 
-                                      MF_MDC_CANCEL_FID2);
+        /*
+         * LOOKUP lock on src child (fid3) should also be cancelled for
+         * src_tgt in mdc_rename.
+         */
+        op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
 
-                /* 
-                 * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt.
-                 */
-                if (rc == 0) {
-                        rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx, 
-                                              LCK_EX, MDS_INODELOCK_LOOKUP,
-                                              MF_MDC_CANCEL_FID4);
-                }
+        /*
+         * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
+         * own target.
+         */
+        rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
+                              LCK_EX, MDS_INODELOCK_UPDATE,
+                              MF_MDC_CANCEL_FID2);
 
-                /* 
-                 * Cancel all the locks on tgt child (fid4). 
-                 */
-                if (rc == 0)
-                        rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx, 
-                                              LCK_EX, MDS_INODELOCK_FULL,
-                                              MF_MDC_CANCEL_FID4);
+        /*
+         * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt.
+         */
+        if (rc == 0) {
+                rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
+                                      LCK_EX, MDS_INODELOCK_LOOKUP,
+                                      MF_MDC_CANCEL_FID4);
         }
 
+        /*
+         * Cancel all the locks on tgt child (fid4).
+         */
+        if (rc == 0)
+                rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
+                                      LCK_EX, MDS_INODELOCK_FULL,
+                                      MF_MDC_CANCEL_FID4);
+
         if (rc == 0)
                 rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen,
                                new, newlen, request);
+
         if (rc == -ERESTART) {
                 LASSERT(*request != NULL);
-                DEBUG_REQ(D_WARNING|D_RPCTRACE, *request, 
+                DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
                           "Got -ERESTART during rename!\n");
                 ptlrpc_req_finished(*request);
                 *request = NULL;
@@ -2205,7 +2269,7 @@ static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
 
 /**
  * Main purpose of LMV blocking ast is to remove split directory LMV
- * presentation object (struct lmv_object) attached to the lock being revoked. 
+ * presentation object (struct lmv_object) attached to the lock being revoked.
  */
 int lmv_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                      void *data, int flag)
@@ -2225,7 +2289,7 @@ int lmv_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                 }
                 break;
         case LDLM_CB_CANCELING:
-                /* 
+                /*
                  * Time to drop cached attrs for split directory object
                  */
                 obj = lock->l_ast_data;
@@ -2253,7 +2317,7 @@ static void lmv_hash_adjust(__u64 *hash, __u64 hash_adj)
         val = le64_to_cpu(*hash);
         if (val < hash_adj)
                 val += MAX_HASH_SIZE;
-        if (val != DIR_END_OFF)
+        if (val != MDS_DIR_END_OFF)
                 *hash = cpu_to_le64(val - hash_adj);
 }
 
@@ -2274,15 +2338,14 @@ static __u32 lmv_node_rank(struct obd_export *exp, const struct lu_fid *fid)
         return id ^ (id >> 32);
 }
 
-static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
-                        struct obd_capa *oc, __u64 offset64, struct page *page,
-                        struct ptlrpc_request **request)
+static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data,
+                        struct page **pages, struct ptlrpc_request **request)
 {
         struct obd_device       *obd = exp->exp_obd;
         struct lmv_obd          *lmv = &obd->u.lmv;
-        struct lu_fid            rid = *fid;
         struct lmv_object       *obj;
-        __u64                    offset;
+        struct lu_fid            rid = op_data->op_fid1;
+        __u64                    offset = op_data->op_offset;
         __u64                    hash_adj = 0;
         __u32                    rank = 0;
         __u64                    seg_size = 0;
@@ -2291,14 +2354,17 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
         int                      tgt0_idx = 0;
         int                      rc;
         int                      nr = 0;
+        int                      i;
+        /* number of pages read, in CFS_PAGE_SIZE */
+        int                      nrdpgs;
+        /* number of pages transferred in LU_PAGE_SIZE */
+        int                      nlupgs;
         struct lmv_stripe       *los;
         struct lmv_tgt_desc     *tgt;
         struct lu_dirpage       *dp;
         struct lu_dirent        *ent;
         ENTRY;
 
-        offset = offset64;
-
         rc = lmv_check_connect(obd);
         if (rc)
                 RETURN(rc);
@@ -2325,7 +2391,7 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
          * [R*MAX_HASH/N ... (R + 1)*MAX_HASH/N] there for we do hash_adj
          * on hash  values that we get.
          */
-        obj = lmv_object_find_lock(obd, fid);
+        obj = lmv_object_find_lock(obd, &rid);
         if (obj) {
                 nr       = obj->lo_objcount;
                 LASSERT(nr > 0);
@@ -2333,7 +2399,7 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
                 do_div(seg_size, nr);
                 los      = obj->lo_stripes;
                 tgt      = lmv_get_target(lmv, los[0].ls_mds);
-                rank     = lmv_node_rank(tgt->ltd_exp, fid) % nr;
+                rank     = lmv_node_rank(tgt->ltd_exp, &rid) % nr;
                 tgt_tmp  = offset;
                 do_div(tgt_tmp, seg_size);
                 tgt0_idx = do_div(tgt_tmp,  nr);
@@ -2353,7 +2419,7 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
                 hash_adj += rank * seg_size;
 
                 CDEBUG(D_INODE, "Readpage hash adjustment: %x "LPX64" "
-                       LPX64"/%x -> "LPX64"/%x\n", rank, hash_adj, 
+                       LPX64"/%x -> "LPX64"/%x\n", rank, hash_adj,
                        offset, tgt0_idx, offset + hash_adj, tgt_idx);
 
                 offset = (offset + hash_adj) & MAX_HASH_SIZE;
@@ -2368,34 +2434,103 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
         if (IS_ERR(tgt))
                 GOTO(cleanup, rc = PTR_ERR(tgt));
 
-        rc = md_readpage(tgt->ltd_exp, &rid, oc, offset, page, request);
+        op_data->op_fid1 = rid;
+        rc = md_readpage(tgt->ltd_exp, op_data, pages, request);
         if (rc)
                 GOTO(cleanup, rc);
-        if (obj) {
-                dp = cfs_kmap(page);
 
-                lmv_hash_adjust(&dp->ldp_hash_start, hash_adj);
-                lmv_hash_adjust(&dp->ldp_hash_end,   hash_adj);
-                LASSERT(le64_to_cpu(dp->ldp_hash_start) <= offset64);
+        nrdpgs = ((*request)->rq_bulk->bd_nob_transferred + CFS_PAGE_SIZE - 1)
+                 >> CFS_PAGE_SHIFT;
+        nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT;
+        LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK));
+        LASSERT(nrdpgs > 0 && nrdpgs <= op_data->op_npages);
 
-                for (ent = lu_dirent_start(dp); ent != NULL;
-                     ent = lu_dirent_next(ent))
-                        lmv_hash_adjust(&ent->lde_hash, hash_adj);
+        CDEBUG(D_INODE, "read %d(%d)/%d pages\n", nrdpgs, nlupgs,
+               op_data->op_npages);
 
-                if (tgt0_idx != nr - 1) {
-                        __u64 end;
+        for (i = 0; i < nrdpgs; i++) {
+#if CFS_PAGE_SIZE > LU_PAGE_SIZE
+                struct lu_dirpage *first;
+                __u64 hash_end = 0;
+                __u32 flags = 0;
+#endif
+                struct lu_dirent *tmp = NULL;
 
-                        end = le64_to_cpu(dp->ldp_hash_end);
-                        if (end == DIR_END_OFF) {
+                dp = cfs_kmap(pages[i]);
+                if (obj) {
+                        lmv_hash_adjust(&dp->ldp_hash_start, hash_adj);
+                        lmv_hash_adjust(&dp->ldp_hash_end,   hash_adj);
+                        LASSERT(le64_to_cpu(dp->ldp_hash_start) <=
+                                op_data->op_offset);
+
+                        if ((tgt0_idx != nr - 1) &&
+                            (le64_to_cpu(dp->ldp_hash_end) == MDS_DIR_END_OFF))
+                        {
                                 dp->ldp_hash_end = cpu_to_le32(seg_size *
                                                                (tgt0_idx + 1));
                                 CDEBUG(D_INODE,
                                        ""DFID" reset end "LPX64" tgt %d\n",
                                        PFID(&rid),
-                                       le64_to_cpu(dp->ldp_hash_end), tgt_idx);
+                                       (__u64)le64_to_cpu(dp->ldp_hash_end),
+                                       tgt_idx);
                         }
                 }
-                cfs_kunmap(page);
+
+                ent = lu_dirent_start(dp);
+#if CFS_PAGE_SIZE > LU_PAGE_SIZE
+                first = dp;
+                hash_end = dp->ldp_hash_end;
+repeat:
+#endif
+                nlupgs--;
+                for (tmp = ent; ent != NULL;
+                     tmp = ent, ent = lu_dirent_next(ent)) {
+                        if (obj)
+                                lmv_hash_adjust(&ent->lde_hash, hash_adj);
+                }
+
+#if CFS_PAGE_SIZE > LU_PAGE_SIZE
+                dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
+                if (((unsigned long)dp & ~CFS_PAGE_MASK) && nlupgs > 0) {
+                        ent = lu_dirent_start(dp);
+
+                        if (obj) {
+                                lmv_hash_adjust(&dp->ldp_hash_end, hash_adj);
+                                if ((tgt0_idx != nr - 1) &&
+                                    (le64_to_cpu(dp->ldp_hash_end) ==
+                                     MDS_DIR_END_OFF)) {
+                                        hash_end = cpu_to_le32(seg_size *
+                                                               (tgt0_idx + 1));
+                                        CDEBUG(D_INODE,
+                                            ""DFID" reset end "LPX64" tgt %d\n",
+                                            PFID(&rid),
+                                            (__u64)le64_to_cpu(hash_end),
+                                            tgt_idx);
+                                }
+                        }
+                        hash_end = dp->ldp_hash_end;
+                        flags = dp->ldp_flags;
+
+                        if (tmp) {
+                                /* enlarge the end entry lde_reclen from 0 to
+                                 * first entry of next lu_dirpage, in this way
+                                 * several lu_dirpages can be stored into one
+                                 * client page on client. */
+                                tmp = ((void *)tmp) +
+                                      le16_to_cpu(tmp->lde_reclen);
+                                tmp->lde_reclen =
+                                        cpu_to_le16((char *)(dp->ldp_entries) -
+                                                    (char *)tmp);
+                                goto repeat;
+                        }
+                }
+                first->ldp_hash_end = hash_end;
+                first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
+                first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);
+#else
+                SET_BUT_UNUSED(tmp);
+#endif
+                cfs_kunmap(pages[i]);
         }
         EXIT;
 cleanup:
@@ -2412,6 +2547,7 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
         struct lmv_tgt_desc     *tgt = NULL;
         struct lmv_object       *obj;
         int                      rc;
+        int                      sidx;
         int                      loop = 0;
         ENTRY;
 
@@ -2422,28 +2558,24 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
 repeat:
         ++loop;
         LASSERT(loop <= 2);
-        if (op_data->op_namelen != 0) {
-                int sidx;
+        LASSERT(op_data->op_namelen != 0);
 
-                obj = lmv_object_find(obd, &op_data->op_fid1);
-                if (obj) {
-                        sidx = raw_name2idx(obj->lo_hashtype,
-                                            obj->lo_objcount,
-                                            op_data->op_name,
-                                            op_data->op_namelen);
-                        op_data->op_bias &= ~MDS_CHECK_SPLIT;
-                        op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid;
-                        tgt = lmv_get_target(lmv,
-                                             obj->lo_stripes[sidx].ls_mds);
-                        lmv_object_put(obj);
-                        CDEBUG(D_INODE, "UNLINK '%*s' in "DFID" -> %u\n",
-                               op_data->op_namelen, op_data->op_name,
-                               PFID(&op_data->op_fid1), sidx);
-                }
-        } else {
-                CDEBUG(D_INODE, "Drop i_nlink on "DFID"\n",
-                       PFID(&op_data->op_fid1));
+        obj = lmv_object_find(obd, &op_data->op_fid1);
+        if (obj) {
+                sidx = raw_name2idx(obj->lo_hashtype,
+                                    obj->lo_objcount,
+                                    op_data->op_name,
+                                    op_data->op_namelen);
+                op_data->op_bias &= ~MDS_CHECK_SPLIT;
+                op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid;
+                tgt = lmv_get_target(lmv,
+                                     obj->lo_stripes[sidx].ls_mds);
+                lmv_object_put(obj);
+                CDEBUG(D_INODE, "UNLINK '%*s' in "DFID" -> %u\n",
+                       op_data->op_namelen, op_data->op_name,
+                       PFID(&op_data->op_fid1), sidx);
         }
+
         if (tgt == NULL) {
                 tgt = lmv_find_target(lmv, &op_data->op_fid1);
                 if (IS_ERR(tgt))
@@ -2451,29 +2583,28 @@ repeat:
                 op_data->op_bias |= MDS_CHECK_SPLIT;
         }
 
-        op_data->op_fsuid = current->fsuid;
-        op_data->op_fsgid = current->fsgid;
+        op_data->op_fsuid = cfs_curproc_fsuid();
+        op_data->op_fsgid = cfs_curproc_fsgid();
         op_data->op_cap = cfs_curproc_cap_pack();
 
-        /* 
+        /*
          * If child's fid is given, cancel unused locks for it if it is from
-         * another export than parent. 
+         * another export than parent.
+         *
+         * LOOKUP lock for child (fid3) should also be cancelled on parent
+         * tgt_tgt in mdc_unlink().
          */
-        if (op_data->op_namelen) {
-                /*
-                 * LOOKUP lock for child (fid3) should also be cancelled on 
-                 * parent tgt_tgt in mdc_unlink(). 
-                 */
-                op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
+        op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
+
+        /*
+         * Cancel FULL locks on child (fid3).
+         */
+        rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
+                              MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
 
-                /* 
-                 * Cancel FULL locks on child (fid3). 
-                 */
-                rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
-                                      MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
-        }
         if (rc == 0)
                 rc = md_unlink(tgt->ltd_exp, op_data, request);
+
         if (rc == -ERESTART) {
                 LASSERT(*request != NULL);
                 DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
@@ -2494,7 +2625,8 @@ repeat:
 
 static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
 {
-        int        rc = 0;
+        struct lmv_obd *lmv = &obd->u.lmv;
+        int rc = 0;
 
         switch (stage) {
         case OBD_CLEANUP_EARLY:
@@ -2502,6 +2634,8 @@ static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
                  * stack. */
                 break;
         case OBD_CLEANUP_EXPORTS:
+                fld_client_proc_fini(&lmv->lmv_fld);
+                lprocfs_obd_cleanup(obd);
                 rc = obd_llog_finish(obd, 0);
                 if (rc != 0)
                         CERROR("failed to cleanup llogging subsystems\n");
@@ -2512,8 +2646,8 @@ static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
         RETURN(rc);
 }
 
-static int lmv_get_info(struct obd_export *exp, __u32 keylen,
-                        void *key, __u32 *vallen, void *val, 
+static int lmv_get_info(const struct lu_env *env, struct obd_export *exp,
+                        __u32 keylen, void *key, __u32 *vallen, void *val,
                         struct lov_stripe_md *lsm)
 {
         struct obd_device       *obd;
@@ -2541,15 +2675,15 @@ static int lmv_get_info(struct obd_export *exp, __u32 keylen,
                 for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count;
                      i++, tgts++) {
 
-                        /* 
-                         * All tgts should be connected when this gets called. 
+                        /*
+                         * All tgts should be connected when this gets called.
                          */
                         if (!tgts || !tgts->ltd_exp) {
                                 CERROR("target not setup?\n");
                                 continue;
                         }
 
-                        if (!obd_get_info(tgts->ltd_exp, keylen, key,
+                        if (!obd_get_info(env, tgts->ltd_exp, keylen, key,
                                           vallen, val, NULL))
                                 RETURN(0);
                 }
@@ -2559,26 +2693,29 @@ static int lmv_get_info(struct obd_export *exp, __u32 keylen,
                 if (rc)
                         RETURN(rc);
 
-                /* 
+                /*
                  * Forwarding this request to first MDS, it should know LOV
-                 * desc. 
+                 * desc.
                  */
-                rc = obd_get_info(lmv->tgts[0].ltd_exp, keylen, key,
+                rc = obd_get_info(env, lmv->tgts[0].ltd_exp, keylen, key,
                                   vallen, val, NULL);
                 if (!rc && KEY_IS(KEY_CONN_DATA)) {
                         exp->exp_connect_flags =
                         ((struct obd_connect_data *)val)->ocd_connect_flags;
                 }
                 RETURN(rc);
+        } else if (KEY_IS(KEY_TGT_COUNT)) {
+                *((int *)val) = lmv->desc.ld_tgt_count;
+                RETURN(0);
         }
 
         CDEBUG(D_IOCTL, "Invalid key\n");
         RETURN(-EINVAL);
 }
 
-int lmv_set_info_async(struct obd_export *exp, obd_count keylen,
-                       void *key, obd_count vallen, void *val,
-                       struct ptlrpc_request_set *set)
+int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp,
+                       obd_count keylen, void *key, obd_count vallen,
+                       void *val, struct ptlrpc_request_set *set)
 {
         struct lmv_tgt_desc    *tgt;
         struct obd_device      *obd;
@@ -2594,8 +2731,7 @@ int lmv_set_info_async(struct obd_export *exp, obd_count keylen,
         }
         lmv = &obd->u.lmv;
 
-        if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX) ||
-            KEY_IS(KEY_INIT_RECOV_BACKUP)) {
+        if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX)) {
                 int i, err = 0;
 
                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
@@ -2604,7 +2740,7 @@ int lmv_set_info_async(struct obd_export *exp, obd_count keylen,
                         if (!tgt->ltd_exp)
                                 continue;
 
-                        err = obd_set_info_async(tgt->ltd_exp,
+                        err = obd_set_info_async(env, tgt->ltd_exp,
                                                  keylen, key, vallen, val, set);
                         if (err && rc == 0)
                                 rc = err;
@@ -2632,13 +2768,13 @@ int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                 RETURN(mea_size);
 
         if (*lmmp && !lsm) {
-                OBD_FREE(*lmmp, mea_size);
+                OBD_FREE_LARGE(*lmmp, mea_size);
                 *lmmp = NULL;
                 RETURN(0);
         }
 
         if (*lmmp == NULL) {
-                OBD_ALLOC(*lmmp, mea_size);
+                OBD_ALLOC_LARGE(*lmmp, mea_size);
                 if (*lmmp == NULL)
                         RETURN(-ENOMEM);
         }
@@ -2682,14 +2818,14 @@ int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                 return mea_size;
 
         if (*lsmp != NULL && lmm == NULL) {
-                OBD_FREE(*tmea, mea_size);
+                OBD_FREE_LARGE(*tmea, mea_size);
                 *lsmp = NULL;
                 RETURN(0);
         }
 
         LASSERT(mea_size == lmm_size);
 
-        OBD_ALLOC(*tmea, mea_size);
+        OBD_ALLOC_LARGE(*tmea, mea_size);
         if (*tmea == NULL)
                 RETURN(-ENOMEM);
 
@@ -2702,8 +2838,8 @@ int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
         {
                 magic = le32_to_cpu(mea->mea_magic);
         } else {
-                /* 
-                 * Old mea is not handled here. 
+                /*
+                 * Old mea is not handled here.
                  */
                 CERROR("Old not supportable EA is found\n");
                 LBUG();
@@ -2721,8 +2857,8 @@ int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
 }
 
 static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
-                             ldlm_policy_data_t *policy, ldlm_mode_t mode, 
-                             int flags, void *opaque)
+                             ldlm_policy_data_t *policy, ldlm_mode_t mode,
+                             ldlm_cancel_flags_t flags, void *opaque)
 {
         struct obd_device       *obd = exp->exp_obd;
         struct lmv_obd          *lmv = &obd->u.lmv;
@@ -2745,13 +2881,15 @@ static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
         RETURN(rc);
 }
 
-int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data)
+int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
+                      __u64 *bits)
 {
         struct obd_device       *obd = exp->exp_obd;
         struct lmv_obd          *lmv = &obd->u.lmv;
         int                      rc;
         ENTRY;
-        rc =  md_set_lock_data(lmv->tgts[0].ltd_exp, lockh, data);
+        /* The request is always passed to the first target's export. */
+        rc =  md_set_lock_data(lmv->tgts[0].ltd_exp, lockh, data, bits);
         RETURN(rc);
 }
 
@@ -2768,11 +2906,11 @@ ldlm_mode_t lmv_lock_match(struct obd_export *exp, int flags,
 
         CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid));
 
-        /* 
+        /*
          * With CMD every object can have two locks in different namespaces:
          * lookup lock in space of mds storing direntry and update/open lock in
          * space of mds storing inode. Thus we check all targets, not only that
-         * one fid was created in. 
+         * one fid was created in.
          */
         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
                 rc = md_lock_match(lmv->tgts[i].ltd_exp, flags, fid,
@@ -2882,6 +3020,18 @@ static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc,
         RETURN(rc);
 }
 
+int lmv_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req,
+                    const struct req_msg_field *field, struct obd_capa **oc)
+{
+        struct obd_device *obd = exp->exp_obd;
+        struct lmv_obd *lmv = &obd->u.lmv;
+        int rc;
+        /* Unpacking is always delegated to the first target's export. */
+        ENTRY;
+        rc = md_unpack_capa(lmv->tgts[0].ltd_exp, req, field, oc);
+        RETURN(rc);
+}
+
 int lmv_intent_getattr_async(struct obd_export *exp,
                              struct md_enqueue_info *minfo,
                              struct ldlm_enqueue_info *einfo)
@@ -2890,7 +3040,7 @@ int lmv_intent_getattr_async(struct obd_export *exp,
         struct obd_device       *obd = exp->exp_obd;
         struct lmv_obd          *lmv = &obd->u.lmv;
         struct lmv_object       *obj;
-        struct lmv_tgt_desc     *tgt;
+        struct lmv_tgt_desc     *tgt = NULL;
         int                      rc;
         int                      sidx;
         ENTRY;
@@ -2899,36 +3049,21 @@ int lmv_intent_getattr_async(struct obd_export *exp,
         if (rc)
                 RETURN(rc);
 
-        if (!fid_is_sane(&op_data->op_fid2)) {
+        if (op_data->op_namelen) {
                 obj = lmv_object_find(obd, &op_data->op_fid1);
-                if (obj && op_data->op_namelen) {
-                        sidx = raw_name2idx(obj->lo_hashtype,
-                                            obj->lo_objcount,
+                if (obj) {
+                        sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
                                             (char *)op_data->op_name,
                                             op_data->op_namelen);
                         op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid;
-                        tgt = lmv_get_target(lmv, 
-                                             obj->lo_stripes[sidx].ls_mds);
-                        CDEBUG(D_INODE,
-                               "Choose slave dir ("DFID") -> mds #%d\n", 
-                               PFID(&op_data->op_fid1), tgt->ltd_idx);
-                } else {
-                        tgt = lmv_find_target(lmv, &op_data->op_fid1);
-                }
-                if (obj)
+                        tgt = lmv_get_target(lmv, obj->lo_stripes[sidx].ls_mds);
                         lmv_object_put(obj);
-        } else {
-                op_data->op_fid1 = op_data->op_fid2;
-                tgt = lmv_find_target(lmv, &op_data->op_fid2);
-                op_data->op_bias = MDS_CROSS_REF;
-                /*
-                 * Unfortunately, we have to lie to MDC/MDS to retrieve
-                 * attributes llite needs.
-                */
-                if (minfo->mi_it.it_op & IT_LOOKUP)
-                        minfo->mi_it.it_op = IT_GETATTR;
+                }
         }
-        
+
+        if (tgt == NULL)
+                tgt = lmv_find_target(lmv, &op_data->op_fid1);
+
         if (IS_ERR(tgt))
                 RETURN(PTR_ERR(tgt));
 
@@ -2936,9 +3071,8 @@ int lmv_intent_getattr_async(struct obd_export *exp,
         RETURN(rc);
 }
 
-int lmv_revalidate_lock(struct obd_export *exp,
-                        struct lookup_intent *it,
-                        struct lu_fid *fid)
+int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
+                        struct lu_fid *fid, __u64 *bits)
 {
         struct obd_device       *obd = exp->exp_obd;
         struct lmv_obd          *lmv = &obd->u.lmv;
@@ -2954,10 +3088,87 @@ int lmv_revalidate_lock(struct obd_export *exp,
         if (IS_ERR(tgt))
                 RETURN(PTR_ERR(tgt));
 
-        rc = md_revalidate_lock(tgt->ltd_exp, it, fid);
+        rc = md_revalidate_lock(tgt->ltd_exp, it, fid, bits);
+        RETURN(rc);
+}
+
+/**
+ * Forward a quota request: every command except Q_GETOQUOTA goes only to the
+ * master MDT (lmv->tgts[0]), which will process it with the slave MDTs. For
+ * Q_GETOQUOTA, each connected active MDT is queried and usage is summed.
+ */
+int lmv_quotactl(struct obd_device *unused, struct obd_export *exp,
+                 struct obd_quotactl *oqctl)
+{
+        struct obd_device   *obd = class_exp2obd(exp);
+        struct lmv_obd      *lmv = &obd->u.lmv;
+        struct lmv_tgt_desc *tgt = &lmv->tgts[0];
+        int                  rc = 0, i;
+        __u64                curspace, curinodes;
+        ENTRY;
+        /* Target 0 is the master; fail with -EIO if absent or inactive. */
+        if (!lmv->desc.ld_tgt_count || !tgt->ltd_active) {
+                CERROR("master lmv inactive\n");
+                RETURN(-EIO);
+        }
+        /* Everything but Q_GETOQUOTA is handled by the master alone. */
+        if (oqctl->qc_cmd != Q_GETOQUOTA) {
+                rc = obd_quotactl(tgt->ltd_exp, oqctl);
+                RETURN(rc);
+        }
+        /* Q_GETOQUOTA: sum current usage over all usable targets. */
+        curspace = curinodes = 0;
+        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+                int err;
+                tgt = &lmv->tgts[i];
+                /* Skip targets with no export or that are inactive. */
+                if (tgt->ltd_exp == NULL)
+                        continue;
+                if (!tgt->ltd_active) {
+                        CDEBUG(D_HA, "mdt %d is inactive.\n", i);
+                        continue;
+                }
+
+                err = obd_quotactl(tgt->ltd_exp, oqctl);
+                if (err) {
+                        CERROR("getquota on mdt %d failed. %d\n", i, err);
+                        if (!rc)
+                                rc = err;
+                } else {
+                        curspace += oqctl->qc_dqblk.dqb_curspace;
+                        curinodes += oqctl->qc_dqblk.dqb_curinodes;
+                }
+        }
+        oqctl->qc_dqblk.dqb_curspace = curspace;
+        oqctl->qc_dqblk.dqb_curinodes = curinodes;
+        /* rc is the first error seen, if any; failed targets are not counted. */
         RETURN(rc);
 }
 
+int lmv_quotacheck(struct obd_device *unused, struct obd_export *exp,
+                   struct obd_quotactl *oqctl)
+{
+        struct obd_device   *obd = class_exp2obd(exp);
+        struct lmv_obd      *lmv = &obd->u.lmv;
+        struct lmv_tgt_desc *tgt;
+        int                  i, rc = 0;
+        ENTRY;
+        /* Check every target in turn; an inactive one aborts with -EIO. */
+        for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
+                int err;
+
+                if (!tgt->ltd_active) {
+                        CERROR("lmv idx %d inactive\n", i);
+                        RETURN(-EIO);
+                }
+                /* NOTE(review): no NULL check on ltd_exp - confirm safe. */
+                err = obd_quotacheck(tgt->ltd_exp, oqctl);
+                if (err && !rc)
+                        rc = err;
+        }
+        /* rc is the first obd_quotacheck() error, or 0 if none occurred. */
+        RETURN(rc);
+}
 
 struct obd_ops lmv_obd_ops = {
         .o_owner                = THIS_MODULE,
@@ -2975,12 +3186,15 @@ struct obd_ops lmv_obd_ops = {
         .o_notify               = lmv_notify,
         .o_get_uuid             = lmv_get_uuid,
         .o_iocontrol            = lmv_iocontrol,
-        .o_fid_delete           = lmv_fid_delete
+        .o_fid_delete           = lmv_fid_delete,
+        .o_quotacheck           = lmv_quotacheck,
+        .o_quotactl             = lmv_quotactl
 };
 
 struct md_ops lmv_md_ops = {
         .m_getstatus            = lmv_getstatus,
         .m_change_cbdata        = lmv_change_cbdata,
+        .m_find_cbdata          = lmv_find_cbdata,
         .m_close                = lmv_close,
         .m_create               = lmv_create,
         .m_done_writing         = lmv_done_writing,
@@ -3005,6 +3219,7 @@ struct md_ops lmv_md_ops = {
         .m_set_open_replay_data = lmv_set_open_replay_data,
         .m_clear_open_replay_data = lmv_clear_open_replay_data,
         .m_renew_capa           = lmv_renew_capa,
+        .m_unpack_capa          = lmv_unpack_capa,
         .m_get_remote_perm      = lmv_get_remote_perm,
         .m_intent_getattr_async = lmv_intent_getattr_async,
         .m_revalidate_lock      = lmv_revalidate_lock
@@ -3024,6 +3239,7 @@ int __init lmv_init(void)
         }
 
         lprocfs_lmv_init_vars(&lvars);
+
         rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
                                  lvars.module_vars, LUSTRE_LMV_NAME, NULL);
         if (rc)
@@ -3037,9 +3253,9 @@ static void lmv_exit(void)
 {
         class_unregister_type(LUSTRE_LMV_NAME);
 
-        LASSERTF(atomic_read(&lmv_object_count) == 0,
+        LASSERTF(cfs_atomic_read(&lmv_object_count) == 0,
                  "Can't free lmv objects cache, %d object(s) busy\n",
-                 atomic_read(&lmv_object_count));
+                 cfs_atomic_read(&lmv_object_count));
         cfs_mem_cache_destroy(lmv_object_cache);
 }