Whamcloud - gitweb
b=20997 passthrough obd_force from lmv to mdc
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
index bb19245..d9b2c8c 100644 (file)
@@ -26,7 +26,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  */
 /*
@@ -52,7 +52,6 @@
 #include <liblustre.h>
 #endif
 
-#include <lustre/lustre_idl.h>
 #include <lustre_log.h>
 #include <obd_support.h>
 #include <lustre_lib.h>
@@ -65,7 +64,7 @@
 
 /* object cache. */
 cfs_mem_cache_t *lmv_object_cache;
-atomic_t lmv_object_count = ATOMIC_INIT(0);
+cfs_atomic_t lmv_object_count = CFS_ATOMIC_INIT(0);
 
 static void lmv_activate_target(struct lmv_obd *lmv,
                                 struct lmv_tgt_desc *tgt,
@@ -97,7 +96,7 @@ static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
         CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
                lmv, uuid->uuid, activate);
 
-        spin_lock(&lmv->lmv_lock);
+        cfs_spin_lock(&lmv->lmv_lock);
         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
                 if (tgt->ltd_exp == NULL)
                         continue;
@@ -127,13 +126,13 @@ static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
                 GOTO(out_lmv_lock, rc);
         }
 
-        CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, 
+        CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd,
                activate ? "" : "in");
         lmv_activate_target(lmv, tgt, activate);
         EXIT;
 
  out_lmv_lock:
-        spin_unlock(&lmv->lmv_lock);
+        cfs_spin_unlock(&lmv->lmv_lock);
         return rc;
 }
 
@@ -146,7 +145,7 @@ static int lmv_set_mdc_data(struct lmv_obd *lmv, struct obd_uuid *uuid,
 
         LASSERT(data != NULL);
 
-        spin_lock(&lmv->lmv_lock);
+        cfs_spin_lock(&lmv->lmv_lock);
         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
                 if (tgt->ltd_exp == NULL)
                         continue;
@@ -156,7 +155,7 @@ static int lmv_set_mdc_data(struct lmv_obd *lmv, struct obd_uuid *uuid,
                         break;
                 }
         }
-        spin_unlock(&lmv->lmv_lock);
+        cfs_spin_unlock(&lmv->lmv_lock);
         RETURN(0);
 }
 
@@ -199,7 +198,7 @@ static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
         } else if (ev == OBD_NOTIFY_OCD) {
                 conn_data = &watched->u.cli.cl_import->imp_connect_data;
 
-                /* 
+                /*
                  * Set connect data to desired target, update exp_connect_flags.
                  */
                 rc = lmv_set_mdc_data(lmv, uuid, conn_data);
@@ -219,14 +218,14 @@ static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
         }
 #if 0
         else if (ev == OBD_NOTIFY_DISCON) {
-                /* 
-                 * For disconnect event, flush fld cache for failout MDS case. 
+                /*
+                 * For disconnect event, flush fld cache for failout MDS case.
                  */
                 fld_client_flush(&lmv->lmv_fld);
         }
 #endif
-        /* 
-         * Pass the notification up the chain. 
+        /*
+         * Pass the notification up the chain.
          */
         if (obd->obd_observer)
                 rc = obd_notify(obd->obd_observer, watched, ev, data);
@@ -236,10 +235,10 @@ static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
 
 /**
  * This is fake connect function. Its purpose is to initialize lmv and say
- * caller that everything is okay. Real connection will be performed later. 
+ * caller that everything is okay. Real connection will be performed later.
  */
 static int lmv_connect(const struct lu_env *env,
-                       struct lustre_handle *conn, struct obd_device *obd,
+                       struct obd_export **exp, struct obd_device *obd,
                        struct obd_uuid *cluuid, struct obd_connect_data *data,
                        void *localdata)
 {
@@ -247,29 +246,30 @@ static int lmv_connect(const struct lu_env *env,
         struct proc_dir_entry *lmv_proc_dir;
 #endif
         struct lmv_obd        *lmv = &obd->u.lmv;
-        struct obd_export     *exp;
+        struct lustre_handle  conn = { 0 };
         int                    rc = 0;
         ENTRY;
 
-        rc = class_connect(conn, obd, cluuid);
-        if (rc) {
-                CERROR("class_connection() returned %d\n", rc);
-                RETURN(rc);
-        }
-
-        exp = class_conn2export(conn);
-
-        /* 
+        /*
          * We don't want to actually do the underlying connections more than
-         * once, so keep track. 
+         * once, so keep track.
          */
         lmv->refcount++;
         if (lmv->refcount > 1) {
-                class_export_put(exp);
+                *exp = NULL;
                 RETURN(0);
         }
 
-        lmv->exp = exp;
+        rc = class_connect(&conn, obd, cluuid);
+        if (rc) {
+                CERROR("class_connection() returned %d\n", rc);
+                RETURN(rc);
+        }
+
+        *exp = class_conn2export(&conn);
+        class_export_get(*exp);
+
+        lmv->exp = *exp;
         lmv->connected = 0;
         lmv->cluuid = *cluuid;
 
@@ -286,11 +286,11 @@ static int lmv_connect(const struct lu_env *env,
         }
 #endif
 
-        /* 
+        /*
          * All real clients should perform actual connection right away, because
          * it is possible, that LMV will not have opportunity to connect targets
          * and MDC stuff will be called directly, for instance while reading
-         * ../mdc/../kbytesfree procfs file, etc. 
+         * ../mdc/../kbytesfree procfs file, etc.
          */
         if (data->ocd_connect_flags & OBD_CONNECT_REAL)
                 rc = lmv_check_connect(obd);
@@ -383,7 +383,6 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
         struct obd_uuid         *cluuid = &lmv->cluuid;
         struct obd_connect_data *mdc_data = NULL;
         struct obd_uuid          lmv_mdc_uuid = { "LMV_MDC_UUID" };
-        struct lustre_handle     conn = {0, };
         struct obd_device       *mdc_obd;
         struct obd_export       *mdc_exp;
         struct lu_fld_target     target;
@@ -407,16 +406,14 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
                 RETURN(-EINVAL);
         }
 
-        rc = obd_connect(NULL, &conn, mdc_obd, &lmv_mdc_uuid,
+        rc = obd_connect(NULL, &mdc_exp, mdc_obd, &lmv_mdc_uuid,
                          &lmv->conn_data, NULL);
         if (rc) {
                 CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc);
                 RETURN(rc);
         }
 
-        mdc_exp = class_conn2export(&conn);
-
-        /* 
+        /*
          * Init fid sequence client for this mdc and add new fld target.
          */
         rc = obd_fid_init(mdc_exp);
@@ -440,7 +437,7 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
         }
 
         if (obd->obd_observer) {
-                /* 
+                /*
                  * Tell the observer about the new target.
                  */
                 rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
@@ -455,7 +452,7 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
         tgt->ltd_exp = mdc_exp;
         lmv->desc.ld_active_tgt_count++;
 
-        /* 
+        /*
          * Copy connect data, it may be used later.
          */
         lmv->datas[tgt->ltd_idx] = *mdc_data;
@@ -465,22 +462,20 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
 
         CDEBUG(D_CONFIG, "Connected to %s(%s) successfully (%d)\n",
                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
-                atomic_read(&obd->obd_refcount));
+                cfs_atomic_read(&obd->obd_refcount));
 
 #ifdef __KERNEL__
         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
         if (lmv_proc_dir) {
                 struct proc_dir_entry *mdc_symlink;
-                char name[MAX_STRING_SIZE + 1];
 
                 LASSERT(mdc_obd->obd_type != NULL);
                 LASSERT(mdc_obd->obd_type->typ_name != NULL);
-                name[MAX_STRING_SIZE] = '\0';
-                snprintf(name, MAX_STRING_SIZE, "../../../%s/%s",
-                         mdc_obd->obd_type->typ_name,
-                         mdc_obd->obd_name);
-                mdc_symlink = proc_symlink(mdc_obd->obd_name,
-                                           lmv_proc_dir, name);
+                mdc_symlink = lprocfs_add_symlink(mdc_obd->obd_name,
+                                                  lmv_proc_dir,
+                                                  "../../../%s/%s",
+                                                  mdc_obd->obd_type->typ_name,
+                                                  mdc_obd->obd_name);
                 if (mdc_symlink == NULL) {
                         CERROR("Could not register LMV target "
                                "/proc/fs/lustre/%s/%s/target_obds/%s.",
@@ -523,24 +518,24 @@ int lmv_add_target(struct obd_device *obd, struct obd_uuid *tgt_uuid)
                         RETURN(-EINVAL);
                 }
 
-                rc = obd_llog_init(obd, &obd->obd_olg, mdc_obd, 0, NULL, tgt_uuid);
+                rc = obd_llog_init(obd, &obd->obd_olg, mdc_obd, NULL);
                 if (rc) {
                         lmv_init_unlock(lmv);
                         CERROR("lmv failed to setup llogging subsystems\n");
                 }
         }
-        spin_lock(&lmv->lmv_lock);
+        cfs_spin_lock(&lmv->lmv_lock);
         tgt = lmv->tgts + lmv->desc.ld_tgt_count++;
         tgt->ltd_uuid = *tgt_uuid;
-        spin_unlock(&lmv->lmv_lock);
+        cfs_spin_unlock(&lmv->lmv_lock);
 
         if (lmv->connected) {
                 rc = lmv_connect_mdc(obd, tgt);
                 if (rc) {
-                        spin_lock(&lmv->lmv_lock);
+                        cfs_spin_lock(&lmv->lmv_lock);
                         lmv->desc.ld_tgt_count--;
                         memset(tgt, 0, sizeof(*tgt));
-                        spin_unlock(&lmv->lmv_lock);
+                        cfs_spin_unlock(&lmv->lmv_lock);
                 } else {
                         int easize = sizeof(struct lmv_stripe_md) +
                                      lmv->desc.ld_tgt_count *
@@ -630,8 +625,11 @@ static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
 
         mdc_obd = class_exp2obd(tgt->ltd_exp);
 
-        if (mdc_obd)
+        if (mdc_obd) {
+                mdc_obd->obd_force = obd->obd_force;
+                mdc_obd->obd_fail = obd->obd_fail;
                 mdc_obd->obd_no_recov = obd->obd_no_recov;
+        }
 
 #ifdef __KERNEL__
         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
@@ -684,8 +682,8 @@ static int lmv_disconnect(struct obd_export *exp)
         if (!lmv->tgts)
                 goto out_local;
 
-        /* 
-         * Only disconnect the underlying layers on the final disconnect. 
+        /*
+         * Only disconnect the underlying layers on the final disconnect.
          */
         lmv->refcount--;
         if (lmv->refcount != 0)
@@ -725,12 +723,13 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
 {
         struct obd_device    *obddev = class_exp2obd(exp);
         struct lmv_obd       *lmv = &obddev->u.lmv;
-        int                   i;
+        int                   i = 0;
         int                   rc = 0;
         int                   set = 0;
+        int                   count = lmv->desc.ld_tgt_count;
         ENTRY;
 
-        if (lmv->desc.ld_tgt_count == 0)
+        if (count == 0)
                 RETURN(-ENOTTY);
 
         switch (cmd) {
@@ -741,9 +740,7 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
                 __u32 index;
 
                 memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
-                LASSERT(data->ioc_plen1 == sizeof(struct obd_statfs));
-
-                if ((index >= lmv->desc.ld_tgt_count))
+                if ((index >= count))
                         RETURN(-ENODEV);
 
                 if (!lmv->tgts[index].ltd_active)
@@ -753,26 +750,101 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
                 if (!mdc_obd)
                         RETURN(-EINVAL);
 
+                /* copy UUID */
+                if (cfs_copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
+                                     min((int) data->ioc_plen2,
+                                         (int) sizeof(struct obd_uuid))))
+                        RETURN(-EFAULT);
+
                 rc = obd_statfs(mdc_obd, &stat_buf,
-                                cfs_time_current_64() - HZ, 0);
+                                cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
+                                0);
                 if (rc)
                         RETURN(rc);
-                if (copy_to_user(data->ioc_pbuf1, &stat_buf, data->ioc_plen1))
-                        RETURN(rc);
-                rc = copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
-                                  data->ioc_plen2);
+                if (cfs_copy_to_user(data->ioc_pbuf1, &stat_buf,
+                                     min((int) data->ioc_plen1,
+                                         (int) sizeof(stat_buf))))
+                        RETURN(-EFAULT);
                 break;
         }
+        case OBD_IOC_QUOTACTL: {
+                struct if_quotactl *qctl = karg;
+                struct lmv_tgt_desc *tgt = NULL;
+                struct obd_quotactl *oqctl;
+
+                if (qctl->qc_valid == QC_MDTIDX) {
+                        if (qctl->qc_idx < 0 || count <= qctl->qc_idx)
+                                RETURN(-EINVAL);
+
+                        tgt = &lmv->tgts[qctl->qc_idx];
+                        if (!tgt->ltd_exp)
+                                RETURN(-EINVAL);
+                } else if (qctl->qc_valid == QC_UUID) {
+                        for (i = 0; i < count; i++) {
+                                tgt = &lmv->tgts[i];
+                                if (!obd_uuid_equals(&tgt->ltd_uuid,
+                                                     &qctl->obd_uuid))
+                                        continue;
+
+                                if (tgt->ltd_exp == NULL)
+                                        RETURN(-EINVAL);
+
+                                break;
+                        }
+                } else {
+                        RETURN(-EINVAL);
+                }
+
+                if (i >= count)
+                        RETURN(-EAGAIN);
+
+                LASSERT(tgt && tgt->ltd_exp);
+                OBD_ALLOC_PTR(oqctl);
+                if (!oqctl)
+                        RETURN(-ENOMEM);
+
+                QCTL_COPY(oqctl, qctl);
+                rc = obd_quotactl(tgt->ltd_exp, oqctl);
+                if (rc == 0) {
+                        QCTL_COPY(qctl, oqctl);
+                        qctl->qc_valid = QC_MDTIDX;
+                        qctl->obd_uuid = tgt->ltd_uuid;
+                }
+                OBD_FREE_PTR(oqctl);
+                break;
+        }
+        case OBD_IOC_CHANGELOG_SEND:
+        case OBD_IOC_CHANGELOG_CLEAR: {
+                struct ioc_changelog *icc = karg;
+
+                if (icc->icc_mdtindex >= count)
+                        RETURN(-ENODEV);
+
+                rc = obd_iocontrol(cmd, lmv->tgts[icc->icc_mdtindex].ltd_exp,
+                                   sizeof(*icc), icc, NULL);
+                break;
+        }
+        case LL_IOC_GET_CONNECT_FLAGS: {
+                rc = obd_iocontrol(cmd, lmv->tgts[0].ltd_exp, len, karg, uarg);
+                break;
+        }
+
         default : {
-                for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+                for (i = 0; i < count; i++) {
                         int err;
+                        struct obd_device *mdc_obd;
 
                         if (lmv->tgts[i].ltd_exp == NULL)
                                 continue;
-
+                        /* ll_umount_begin() sets force flag but for lmv, not
+                         * mdc. Let's pass it through */
+                        mdc_obd = class_exp2obd(lmv->tgts[i].ltd_exp);
+                        mdc_obd->obd_force = obddev->obd_force;
                         err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp, len,
                                             karg, uarg);
-                        if (err) {
+                        if (err == -ENODATA && cmd == OBD_IOC_POLL_QUOTACHECK) {
+                                RETURN(err);
+                        } else if (err) {
                                 if (lmv->tgts[i].ltd_active) {
                                         CERROR("error: iocontrol MDC %s on MDT"
                                                "idx %d cmd %x: err = %d\n",
@@ -806,7 +878,7 @@ static int lmv_nid_policy(struct lmv_obd *lmv)
 {
         struct obd_import *imp;
         __u32              id;
-        
+
         /*
          * XXX: To get nid we assume that underlying obd device is mdc.
          */
@@ -835,7 +907,7 @@ static int lmv_choose_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
 }
 
 /**
- * This is _inode_ placement policy function (not name). 
+ * This is _inode_ placement policy function (not name).
  */
 static int lmv_placement_policy(struct obd_device *obd,
                                 struct md_op_data *op_data,
@@ -912,41 +984,28 @@ int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
         ENTRY;
 
         tgt = lmv_get_target(lmv, mds);
-    
-        /* 
+
+        /*
          * New seq alloc and FLD setup should be atomic. Otherwise we may find
          * on server that seq in new allocated fid is not yet known.
          */
-        down(&tgt->ltd_fid_sem);
+        cfs_down(&tgt->ltd_fid_sem);
 
         if (!tgt->ltd_active)
                 GOTO(out, rc = -ENODEV);
 
-        /* 
-         * Asking underlaying tgt layer to allocate new fid. 
+        /*
+         * Asking underlaying tgt layer to allocate new fid.
          */
         rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL);
         if (rc > 0) {
                 LASSERT(fid_is_sane(fid));
-
-                /* 
-                 * Client switches to new sequence, setup FLD. 
-                 */
-                rc = fld_client_create(&lmv->lmv_fld, fid_seq(fid),
-                                       mds, NULL);
-                if (rc) {
-                        /* 
-                         * Delete just allocated fid sequence in case
-                         * of fail back.
-                         */
-                        CERROR("Can't create fld entry, rc %d\n", rc);
-                        obd_fid_delete(tgt->ltd_exp, NULL);
-                }
+                rc = 0;
         }
 
         EXIT;
 out:
-        up(&tgt->ltd_fid_sem);
+        cfs_up(&tgt->ltd_fid_sem);
         return rc;
 }
 
@@ -1017,7 +1076,7 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                 RETURN(-ENOMEM);
 
         for (i = 0; i < LMV_MAX_TGT_COUNT; i++) {
-                sema_init(&lmv->tgts[i].ltd_fid_sem, 1);
+                cfs_sema_init(&lmv->tgts[i].ltd_fid_sem, 1);
                 lmv->tgts[i].ltd_idx = i;
         }
 
@@ -1035,8 +1094,8 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         lmv->max_easize = 0;
         lmv->lmv_placement = PLACEMENT_CHAR_POLICY;
 
-        spin_lock_init(&lmv->lmv_lock);
-        sema_init(&lmv->init_sem, 1);
+        cfs_spin_lock_init(&lmv->lmv_lock);
+        cfs_sema_init(&lmv->init_sem, 1);
 
         rc = lmv_object_setup(obd);
         if (rc) {
@@ -1217,15 +1276,14 @@ static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
         if (IS_ERR(tgt))
                 RETURN(PTR_ERR(tgt));
 
-        rc = md_setxattr(tgt->ltd_exp, fid, oc, valid, name, input, 
+        rc = md_setxattr(tgt->ltd_exp, fid, oc, valid, name, input,
                          input_size, output_size, flags, suppgid,
                          request);
 
         RETURN(rc);
 }
 
-static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid,
-                       struct obd_capa *oc, obd_valid valid, int ea_size,
+static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data,
                        struct ptlrpc_request **request)
 {
         struct obd_device       *obd = exp->exp_obd;
@@ -1240,17 +1298,22 @@ static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid,
         if (rc)
                 RETURN(rc);
 
-        tgt = lmv_find_target(lmv, fid);
+        tgt = lmv_find_target(lmv, &op_data->op_fid1);
         if (IS_ERR(tgt))
                 RETURN(PTR_ERR(tgt));
 
-        rc = md_getattr(tgt->ltd_exp, fid, oc, valid, ea_size, request);
+        if (op_data->op_valid & OBD_MD_MDTIDX) {
+                op_data->op_mds = tgt->ltd_idx;
+                RETURN(0);
+        }
+
+        rc = md_getattr(tgt->ltd_exp, op_data, request);
         if (rc)
                 RETURN(rc);
 
-        obj = lmv_object_find_lock(obd, fid);
+        obj = lmv_object_find_lock(obd, &op_data->op_fid1);
 
-        CDEBUG(D_INODE, "GETATTR for "DFID" %s\n", PFID(fid),
+        CDEBUG(D_INODE, "GETATTR for "DFID" %s\n", PFID(&op_data->op_fid1),
                obj ? "(split)" : "");
 
         /*
@@ -1279,7 +1342,7 @@ static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid,
                                 continue;
                         }
 
-                        /* 
+                        /*
                          * Skip master object.
                          */
                         if (lu_fid_eq(&obj->lo_fid, &obj->lo_stripes[i].ls_fid))
@@ -1320,6 +1383,36 @@ static int lmv_change_cbdata(struct obd_export *exp, const struct lu_fid *fid,
         RETURN(0);
 }
 
+static int lmv_find_cbdata(struct obd_export *exp, const struct lu_fid *fid,
+                           ldlm_iterator_t it, void *data)
+{
+        struct obd_device   *obd = exp->exp_obd;
+        struct lmv_obd      *lmv = &obd->u.lmv;
+        int                  i;
+        int                  rc;
+        ENTRY;
+
+        rc = lmv_check_connect(obd);
+        if (rc)
+                RETURN(rc);
+
+        CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
+
+        /*
+         * With CMD every object can have two locks in different namespaces:
+         * lookup lock in space of mds storing direntry and update/open lock in
+         * space of mds storing inode.
+         */
+        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+                rc = md_find_cbdata(lmv->tgts[i].ltd_exp, fid, it, data);
+                if (rc)
+                        RETURN(rc);
+        }
+
+        RETURN(rc);
+}
+
+
 static int lmv_close(struct obd_export *exp, struct md_op_data *op_data,
                      struct md_open_data *mod, struct ptlrpc_request **request)
 {
@@ -1354,6 +1447,7 @@ int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid)
         struct lmv_tgt_desc     *tgt;
         struct lmv_object       *obj;
         struct lustre_md         md;
+        struct md_op_data       *op_data;
         int                      mealen;
         int                      rc;
         __u64                    valid;
@@ -1368,10 +1462,20 @@ int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid)
         if (IS_ERR(tgt))
                 RETURN(PTR_ERR(tgt));
 
-        /* 
-         * Time to update mea of parent fid. 
+        /*
+         * Time to update mea of parent fid.
          */
-        rc = md_getattr(tgt->ltd_exp, fid, NULL, valid, mealen, &req);
+
+        OBD_ALLOC_PTR(op_data);
+        if (op_data == NULL) 
+                RETURN(-ENOMEM);
+
+        op_data->op_fid1 = *fid;
+        op_data->op_mode = mealen;
+        op_data->op_valid = valid;
+
+        rc = md_getattr(tgt->ltd_exp, op_data, &req);
+        OBD_FREE_PTR(op_data);
         if (rc) {
                 CERROR("md_getattr() failed, error %d\n", rc);
                 GOTO(cleanup, rc);
@@ -1448,7 +1552,7 @@ repeat:
         else if (rc)
                 RETURN(rc);
 
-        CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #"LPU64"\n", 
+        CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n",
                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
                op_data->op_mds);
 
@@ -1559,8 +1663,8 @@ cleanup:
         OBD_FREE_PTR(op_data2);
 
         if (rc != 0) {
-                /* 
-                 * Drop all taken locks. 
+                /*
+                 * Drop all taken locks.
                  */
                 while (--i >= 0) {
                         if (lockh[i].cookie)
@@ -1598,8 +1702,8 @@ lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
         CDEBUG(D_INODE, "REMOTE_ENQUEUE '%s' on "DFID" -> "DFID"\n",
                LL_IT2STR(it), PFID(&op_data->op_fid1), PFID(&body->fid1));
 
-        /* 
-         * We got LOOKUP lock, but we really need attrs. 
+        /*
+         * We got LOOKUP lock, but we really need attrs.
          */
         pmode = it->d.lustre.it_lock_mode;
         LASSERT(pmode != 0);
@@ -1677,7 +1781,7 @@ lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
 
         CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n",
                LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx);
-        
+
         rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data, lockh,
                         lmm, lmmsize, req, extra_lock_flags);
 
@@ -1689,18 +1793,17 @@ lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
 }
 
 static int
-lmv_getattr_name(struct obd_export *exp, const struct lu_fid *fid,
-                 struct obd_capa *oc, const char *name, int namelen,
-                 obd_valid valid, int ea_size, __u32 suppgid,
+lmv_getattr_name(struct obd_export *exp,struct md_op_data *op_data,
                  struct ptlrpc_request **request)
 {
         struct ptlrpc_request   *req = NULL;
         struct obd_device       *obd = exp->exp_obd;
         struct lmv_obd          *lmv = &obd->u.lmv;
-        struct lu_fid            rid = *fid;
+        struct lu_fid            rid = op_data->op_fid1;
         struct lmv_tgt_desc     *tgt;
         struct mdt_body         *body;
         struct lmv_object       *obj;
+        obd_valid                valid = op_data->op_valid;
         int                      rc;
         int                      loop = 0;
         int                      sidx;
@@ -1716,23 +1819,27 @@ repeat:
         obj = lmv_object_find(obd, &rid);
         if (obj) {
                 sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
-                                       name, namelen - 1);
+                                    op_data->op_name, op_data->op_namelen);
                 rid = obj->lo_stripes[sidx].ls_fid;
                 tgt = lmv_get_target(lmv, obj->lo_stripes[sidx].ls_mds);
+                op_data->op_mds = obj->lo_stripes[sidx].ls_mds;
                 valid &= ~OBD_MD_FLCKSPLIT;
                 lmv_object_put(obj);
         } else {
                 tgt = lmv_find_target(lmv, &rid);
                 valid |= OBD_MD_FLCKSPLIT;
+                op_data->op_mds = tgt->ltd_idx;
         }
         if (IS_ERR(tgt))
                 RETURN(PTR_ERR(tgt));
 
         CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" - "DFID" -> mds #%d\n",
-               namelen, name, PFID(fid), PFID(&rid), tgt->ltd_idx);
+               op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
+               PFID(&rid), tgt->ltd_idx);
 
-        rc = md_getattr_name(tgt->ltd_exp, &rid, oc, name, namelen, valid,
-                             ea_size, suppgid, request);
+        op_data->op_valid = valid;
+        op_data->op_fid1 = rid;
+        rc = md_getattr_name(tgt->ltd_exp, op_data, request);
         if (rc == 0) {
                 body = req_capsule_server_get(&(*request)->rq_pill,
                                               &RMF_MDT_BODY);
@@ -1749,9 +1856,11 @@ repeat:
                                 RETURN(PTR_ERR(tgt));
                         }
 
-                        rc = md_getattr_name(tgt->ltd_exp, &rid, NULL, NULL,
-                                             1, valid | OBD_MD_FLCROSSREF,
-                                             ea_size, suppgid, &req);
+                        op_data->op_fid1 = rid;
+                        op_data->op_valid |= OBD_MD_FLCROSSREF;
+                        op_data->op_namelen = 0;
+                        op_data->op_name = NULL;
+                        rc = md_getattr_name(tgt->ltd_exp, op_data, &req);
                         ptlrpc_req_finished(*request);
                         *request = req;
                 }
@@ -1802,7 +1911,7 @@ static int lmv_early_cancel_slaves(struct obd_export *exp,
         obj = lmv_object_find(obd, op_fid);
         if (obj == NULL)
                 RETURN(-EALREADY);
-                
+
         policy.l_inodebits.bits = bits;
         for (i = 0; i < obj->lo_objcount; i++) {
                 tgt = lmv_get_target(lmv, obj->lo_stripes[i].ls_mds);
@@ -1810,12 +1919,12 @@ static int lmv_early_cancel_slaves(struct obd_export *exp,
                 if (op_tgt != tgt->ltd_idx) {
                         CDEBUG(D_INODE, "EARLY_CANCEL slave "DFID" -> mds #%d\n",
                                PFID(st_fid), tgt->ltd_idx);
-                        rc = md_cancel_unused(tgt->ltd_exp, st_fid, &policy, 
-                                              mode, LDLM_FL_ASYNC, NULL);
+                        rc = md_cancel_unused(tgt->ltd_exp, st_fid, &policy,
+                                              mode, LCF_ASYNC, NULL);
                         if (rc)
                                 GOTO(out_put_obj, rc);
                 } else {
-                        CDEBUG(D_INODE, 
+                        CDEBUG(D_INODE,
                                "EARLY_CANCEL skip operation target %d on "DFID"\n",
                                op_tgt, PFID(st_fid));
                         /*
@@ -1862,9 +1971,9 @@ static int lmv_early_cancel(struct obd_export *exp, struct md_op_data *op_data,
                         CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid));
                         policy.l_inodebits.bits = bits;
                         rc = md_cancel_unused(tgt->ltd_exp, fid, &policy,
-                                              mode, LDLM_FL_ASYNC, NULL);
+                                              mode, LCF_ASYNC, NULL);
                 } else {
-                        CDEBUG(D_INODE, 
+                        CDEBUG(D_INODE,
                                "EARLY_CANCEL skip operation target %d on "DFID"\n",
                                op_tgt, PFID(fid));
                         op_data->op_flags |= flag;
@@ -1920,16 +2029,16 @@ repeat:
                         RETURN(rc);
         }
 
-        CDEBUG(D_INODE, "Forward to mds #"LPU64" ("DFID")\n",
+        CDEBUG(D_INODE, "Forward to mds #%x ("DFID")\n",
                mds, PFID(&op_data->op_fid1));
 
-        op_data->op_fsuid = current->fsuid;
-        op_data->op_fsgid = current->fsgid;
+        op_data->op_fsuid = cfs_curproc_fsuid();
+        op_data->op_fsgid = cfs_curproc_fsgid();
         op_data->op_cap = cfs_curproc_cap_pack();
         tgt = lmv_get_target(lmv, mds);
 
-        /* 
-         * Cancel UPDATE lock on child (fid1). 
+        /*
+         * Cancel UPDATE lock on child (fid1).
          */
         op_data->op_flags |= MF_MDC_CANCEL_FID2;
         rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
@@ -2017,41 +2126,41 @@ repeat:
                         RETURN(rc);
         }
 
-        op_data->op_fsuid = current->fsuid;
-        op_data->op_fsgid = current->fsgid;
+        op_data->op_fsuid = cfs_curproc_fsuid();
+        op_data->op_fsgid = cfs_curproc_fsgid();
         op_data->op_cap = cfs_curproc_cap_pack();
 
         src_tgt = lmv_get_target(lmv, mds1);
         tgt_tgt = lmv_get_target(lmv, mds2);
 
-        /* 
+        /*
          * LOOKUP lock on src child (fid3) should also be cancelled for
-         * src_tgt in mdc_rename. 
+         * src_tgt in mdc_rename.
          */
         op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
 
-        /* 
+        /*
          * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
-         * own target. 
+         * own target.
          */
-        rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx, 
-                              LCK_EX, MDS_INODELOCK_UPDATE, 
+        rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
+                              LCK_EX, MDS_INODELOCK_UPDATE,
                               MF_MDC_CANCEL_FID2);
 
-        /* 
+        /*
          * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt.
          */
         if (rc == 0) {
-                rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx, 
+                rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
                                       LCK_EX, MDS_INODELOCK_LOOKUP,
                                       MF_MDC_CANCEL_FID4);
         }
 
-        /* 
-         * Cancel all the locks on tgt child (fid4). 
+        /*
+         * Cancel all the locks on tgt child (fid4).
          */
         if (rc == 0)
-                rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx, 
+                rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
                                       LCK_EX, MDS_INODELOCK_FULL,
                                       MF_MDC_CANCEL_FID4);
 
@@ -2061,7 +2170,7 @@ repeat:
 
         if (rc == -ERESTART) {
                 LASSERT(*request != NULL);
-                DEBUG_REQ(D_WARNING|D_RPCTRACE, *request, 
+                DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
                           "Got -ERESTART during rename!\n");
                 ptlrpc_req_finished(*request);
                 *request = NULL;
@@ -2163,7 +2272,7 @@ static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
 
 /**
  * Main purpose of LMV blocking ast is to remove split directory LMV
- * presentation object (struct lmv_object) attached to the lock being revoked. 
+ * presentation object (struct lmv_object) attached to the lock being revoked.
  */
 int lmv_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                      void *data, int flag)
@@ -2183,7 +2292,7 @@ int lmv_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                 }
                 break;
         case LDLM_CB_CANCELING:
-                /* 
+                /*
                  * Time to drop cached attrs for split directory object
                  */
                 obj = lock->l_ast_data;
@@ -2311,7 +2420,7 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
                 hash_adj += rank * seg_size;
 
                 CDEBUG(D_INODE, "Readpage hash adjustment: %x "LPX64" "
-                       LPX64"/%x -> "LPX64"/%x\n", rank, hash_adj, 
+                       LPX64"/%x -> "LPX64"/%x\n", rank, hash_adj,
                        offset, tgt0_idx, offset + hash_adj, tgt_idx);
 
                 offset = (offset + hash_adj) & MAX_HASH_SIZE;
@@ -2350,7 +2459,7 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
                                 CDEBUG(D_INODE,
                                        ""DFID" reset end "LPX64" tgt %d\n",
                                        PFID(&rid),
-                                       le64_to_cpu(dp->ldp_hash_end), tgt_idx);
+                                       (__u64)le64_to_cpu(dp->ldp_hash_end), tgt_idx);
                         }
                 }
                 cfs_kunmap(page);
@@ -2406,21 +2515,21 @@ repeat:
                 op_data->op_bias |= MDS_CHECK_SPLIT;
         }
 
-        op_data->op_fsuid = current->fsuid;
-        op_data->op_fsgid = current->fsgid;
+        op_data->op_fsuid = cfs_curproc_fsuid();
+        op_data->op_fsgid = cfs_curproc_fsgid();
         op_data->op_cap = cfs_curproc_cap_pack();
 
-        /* 
+        /*
          * If child's fid is given, cancel unused locks for it if it is from
          * another export than parent.
          *
-         * LOOKUP lock for child (fid3) should also be cancelled on parent 
-         * tgt_tgt in mdc_unlink(). 
+         * LOOKUP lock for child (fid3) should also be cancelled on parent
+         * tgt_tgt in mdc_unlink().
          */
         op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
 
-        /* 
-         * Cancel FULL locks on child (fid3). 
+        /*
+         * Cancel FULL locks on child (fid3).
          */
         rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
                               MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
@@ -2467,7 +2576,7 @@ static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
 }
 
 static int lmv_get_info(struct obd_export *exp, __u32 keylen,
-                        void *key, __u32 *vallen, void *val, 
+                        void *key, __u32 *vallen, void *val,
                         struct lov_stripe_md *lsm)
 {
         struct obd_device       *obd;
@@ -2495,8 +2604,8 @@ static int lmv_get_info(struct obd_export *exp, __u32 keylen,
                 for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count;
                      i++, tgts++) {
 
-                        /* 
-                         * All tgts should be connected when this gets called. 
+                        /*
+                         * All tgts should be connected when this gets called.
                          */
                         if (!tgts || !tgts->ltd_exp) {
                                 CERROR("target not setup?\n");
@@ -2513,9 +2622,9 @@ static int lmv_get_info(struct obd_export *exp, __u32 keylen,
                 if (rc)
                         RETURN(rc);
 
-                /* 
+                /*
                  * Forwarding this request to first MDS, it should know LOV
-                 * desc. 
+                 * desc.
                  */
                 rc = obd_get_info(lmv->tgts[0].ltd_exp, keylen, key,
                                   vallen, val, NULL);
@@ -2548,8 +2657,7 @@ int lmv_set_info_async(struct obd_export *exp, obd_count keylen,
         }
         lmv = &obd->u.lmv;
 
-        if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX) ||
-            KEY_IS(KEY_INIT_RECOV_BACKUP)) {
+        if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX)) {
                 int i, err = 0;
 
                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
@@ -2656,8 +2764,8 @@ int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
         {
                 magic = le32_to_cpu(mea->mea_magic);
         } else {
-                /* 
-                 * Old mea is not handled here. 
+                /*
+                 * Old mea is not handled here.
                  */
                 CERROR("Old not supportable EA is found\n");
                 LBUG();
@@ -2675,8 +2783,8 @@ int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
 }
 
 static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
-                             ldlm_policy_data_t *policy, ldlm_mode_t mode, 
-                             int flags, void *opaque)
+                             ldlm_policy_data_t *policy, ldlm_mode_t mode,
+                             ldlm_cancel_flags_t flags, void *opaque)
 {
         struct obd_device       *obd = exp->exp_obd;
         struct lmv_obd          *lmv = &obd->u.lmv;
@@ -2699,13 +2807,15 @@ static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
         RETURN(rc);
 }
 
-int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data)
+int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
+                      __u32 *bits)
 {
         struct obd_device       *obd = exp->exp_obd;
         struct lmv_obd          *lmv = &obd->u.lmv;
         int                      rc;
         ENTRY;
-        rc =  md_set_lock_data(lmv->tgts[0].ltd_exp, lockh, data);
+
+        rc =  md_set_lock_data(lmv->tgts[0].ltd_exp, lockh, data, bits);
         RETURN(rc);
 }
 
@@ -2722,11 +2832,11 @@ ldlm_mode_t lmv_lock_match(struct obd_export *exp, int flags,
 
         CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid));
 
-        /* 
+        /*
          * With CMD every object can have two locks in different namespaces:
          * lookup lock in space of mds storing direntry and update/open lock in
          * space of mds storing inode. Thus we check all targets, not only that
-         * one fid was created in. 
+         * one fid was created in.
          */
         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
                 rc = md_lock_match(lmv->tgts[i].ltd_exp, flags, fid,
@@ -2836,6 +2946,18 @@ static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc,
         RETURN(rc);
 }
 
+int lmv_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req,
+                    const struct req_msg_field *field, struct obd_capa **oc)
+{
+        struct obd_device *obd = exp->exp_obd;
+        struct lmv_obd *lmv = &obd->u.lmv;
+        int rc;
+
+        ENTRY;
+        rc = md_unpack_capa(lmv->tgts[0].ltd_exp, req, field, oc);
+        RETURN(rc);
+}
+
 int lmv_intent_getattr_async(struct obd_export *exp,
                              struct md_enqueue_info *minfo,
                              struct ldlm_enqueue_info *einfo)
@@ -2861,10 +2983,10 @@ int lmv_intent_getattr_async(struct obd_export *exp,
                                             (char *)op_data->op_name,
                                             op_data->op_namelen);
                         op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid;
-                        tgt = lmv_get_target(lmv, 
+                        tgt = lmv_get_target(lmv,
                                              obj->lo_stripes[sidx].ls_mds);
                         CDEBUG(D_INODE,
-                               "Choose slave dir ("DFID") -> mds #%d\n", 
+                               "Choose slave dir ("DFID") -> mds #%d\n",
                                PFID(&op_data->op_fid1), tgt->ltd_idx);
                 } else {
                         tgt = lmv_find_target(lmv, &op_data->op_fid1);
@@ -2882,7 +3004,7 @@ int lmv_intent_getattr_async(struct obd_export *exp,
                 if (minfo->mi_it.it_op & IT_LOOKUP)
                         minfo->mi_it.it_op = IT_GETATTR;
         }
-        
+
         if (IS_ERR(tgt))
                 RETURN(PTR_ERR(tgt));
 
@@ -2890,9 +3012,8 @@ int lmv_intent_getattr_async(struct obd_export *exp,
         RETURN(rc);
 }
 
-int lmv_revalidate_lock(struct obd_export *exp,
-                        struct lookup_intent *it,
-                        struct lu_fid *fid)
+int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
+                        struct lu_fid *fid, __u32 *bits)
 {
         struct obd_device       *obd = exp->exp_obd;
         struct lmv_obd          *lmv = &obd->u.lmv;
@@ -2908,7 +3029,7 @@ int lmv_revalidate_lock(struct obd_export *exp,
         if (IS_ERR(tgt))
                 RETURN(PTR_ERR(tgt));
 
-        rc = md_revalidate_lock(tgt->ltd_exp, it, fid);
+        rc = md_revalidate_lock(tgt->ltd_exp, it, fid, bits);
         RETURN(rc);
 }
 
@@ -2935,6 +3056,7 @@ struct obd_ops lmv_obd_ops = {
 struct md_ops lmv_md_ops = {
         .m_getstatus            = lmv_getstatus,
         .m_change_cbdata        = lmv_change_cbdata,
+        .m_find_cbdata          = lmv_find_cbdata,
         .m_close                = lmv_close,
         .m_create               = lmv_create,
         .m_done_writing         = lmv_done_writing,
@@ -2959,11 +3081,15 @@ struct md_ops lmv_md_ops = {
         .m_set_open_replay_data = lmv_set_open_replay_data,
         .m_clear_open_replay_data = lmv_clear_open_replay_data,
         .m_renew_capa           = lmv_renew_capa,
+        .m_unpack_capa          = lmv_unpack_capa,
         .m_get_remote_perm      = lmv_get_remote_perm,
         .m_intent_getattr_async = lmv_intent_getattr_async,
         .m_revalidate_lock      = lmv_revalidate_lock
 };
 
+static quota_interface_t *quota_interface;
+extern quota_interface_t lmv_quota_interface;
+
 int __init lmv_init(void)
 {
         struct lprocfs_static_vars lvars;
@@ -2978,10 +3104,18 @@ int __init lmv_init(void)
         }
 
         lprocfs_lmv_init_vars(&lvars);
+
+        cfs_request_module("lquota");
+        quota_interface = PORTAL_SYMBOL_GET(lmv_quota_interface);
+        init_obd_quota_ops(quota_interface, &lmv_obd_ops);
+
         rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
                                  lvars.module_vars, LUSTRE_LMV_NAME, NULL);
-        if (rc)
+        if (rc) {
+                if (quota_interface)
+                        PORTAL_SYMBOL_PUT(lmv_quota_interface);
                 cfs_mem_cache_destroy(lmv_object_cache);
+        }
 
         return rc;
 }
@@ -2989,11 +3123,14 @@ int __init lmv_init(void)
 #ifdef __KERNEL__
 static void lmv_exit(void)
 {
+        if (quota_interface)
+                PORTAL_SYMBOL_PUT(lmv_quota_interface);
+
         class_unregister_type(LUSTRE_LMV_NAME);
 
-        LASSERTF(atomic_read(&lmv_object_count) == 0,
+        LASSERTF(cfs_atomic_read(&lmv_object_count) == 0,
                  "Can't free lmv objects cache, %d object(s) busy\n",
-                 atomic_read(&lmv_object_count));
+                 cfs_atomic_read(&lmv_object_count));
         cfs_mem_cache_destroy(lmv_object_cache);
 }