Whamcloud - gitweb
LU-4098 lmv: kernel crash due to misconfigured MDT
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
index 15aa738..20fb834 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Intel Corporation.
+ * Copyright (c) 2011, 2013, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -77,17 +77,18 @@ static void lmv_activate_target(struct lmv_obd *lmv,
  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
  */
-static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
-                              int activate)
-{
-        struct lmv_tgt_desc    *tgt;
-        struct obd_device      *obd;
-        int                     i;
-        int                     rc = 0;
-        ENTRY;
+static int lmv_set_mdc_active(struct lmv_obd *lmv,
+                             const struct obd_uuid *uuid,
+                             int activate)
+{
+       struct lmv_tgt_desc     *tgt = NULL;
+       struct obd_device       *obd;
+       __u32                    i;
+       int                      rc = 0;
+       ENTRY;
 
-        CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
-               lmv, uuid->uuid, activate);
+       CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
+                       lmv, uuid->uuid, activate);
 
        spin_lock(&lmv->lmv_lock);
        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
@@ -132,9 +133,10 @@ static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
 
 struct obd_uuid *lmv_get_uuid(struct obd_export *exp)
 {
-       struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
+       struct lmv_obd          *lmv = &exp->exp_obd->u.lmv;
+       struct lmv_tgt_desc     *tgt = lmv->tgts[0];
 
-       return obd_get_uuid(lmv->tgts[0]->ltd_exp);
+       return (tgt == NULL) ? NULL : obd_get_uuid(tgt->ltd_exp);
 }
 
 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
@@ -252,7 +254,7 @@ static int lmv_connect(const struct lu_env *env,
          * and MDC stuff will be called directly, for instance while reading
          * ../mdc/../kbytesfree procfs file, etc.
          */
-        if (data->ocd_connect_flags & OBD_CONNECT_REAL)
+       if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_REAL))
                 rc = lmv_check_connect(obd);
 
 #ifdef __KERNEL__
@@ -267,9 +269,8 @@ static int lmv_connect(const struct lu_env *env,
 
 static void lmv_set_timeouts(struct obd_device *obd)
 {
-       struct lmv_tgt_desc   *tgt;
-       struct lmv_obd        *lmv;
-       int                    i;
+       struct lmv_obd          *lmv;
+       __u32                    i;
 
         lmv = &obd->u.lmv;
         if (lmv->server_timeout == 0)
@@ -279,8 +280,9 @@ static void lmv_set_timeouts(struct obd_device *obd)
                 return;
 
        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
-               tgt = lmv->tgts[i];
-               if (tgt == NULL || tgt->ltd_exp == NULL || tgt->ltd_active == 0)
+               struct lmv_tgt_desc *tgt = lmv->tgts[i];
+
+               if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active)
                        continue;
 
                obd_set_info_async(NULL, tgt->ltd_exp, sizeof(KEY_INTERMDS),
@@ -291,12 +293,12 @@ static void lmv_set_timeouts(struct obd_device *obd)
 static int lmv_init_ea_size(struct obd_export *exp, int easize,
                             int def_easize, int cookiesize)
 {
-        struct obd_device   *obd = exp->exp_obd;
-        struct lmv_obd      *lmv = &obd->u.lmv;
-        int                  i;
-        int                  rc = 0;
-        int                  change = 0;
-        ENTRY;
+       struct obd_device       *obd = exp->exp_obd;
+       struct lmv_obd          *lmv = &obd->u.lmv;
+       __u32                    i;
+       int                      rc = 0;
+       int                      change = 0;
+       ENTRY;
 
         if (lmv->max_easize < easize) {
                 lmv->max_easize = easize;
@@ -317,14 +319,14 @@ static int lmv_init_ea_size(struct obd_export *exp, int easize,
                 RETURN(0);
 
        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
-               if (lmv->tgts[i] == NULL ||
-                   lmv->tgts[i]->ltd_exp == NULL ||
-                   lmv->tgts[i]->ltd_active == 0) {
+               struct lmv_tgt_desc *tgt = lmv->tgts[i];
+
+               if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active) {
                        CWARN("%s: NULL export for %d\n", obd->obd_name, i);
                        continue;
                }
 
-               rc = md_init_ea_size(lmv->tgts[i]->ltd_exp, easize, def_easize,
+               rc = md_init_ea_size(tgt->ltd_exp, easize, def_easize,
                                     cookiesize);
                if (rc) {
                        CERROR("%s: obd_init_ea_size() failed on MDT target %d:"
@@ -555,12 +557,12 @@ static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
 
 int lmv_check_connect(struct obd_device *obd)
 {
-        struct lmv_obd       *lmv = &obd->u.lmv;
-        struct lmv_tgt_desc  *tgt;
-        int                   i;
-        int                   rc;
-        int                   easize;
-        ENTRY;
+       struct lmv_obd          *lmv = &obd->u.lmv;
+       struct lmv_tgt_desc     *tgt;
+       __u32                    i;
+       int                      rc;
+       int                      easize;
+       ENTRY;
 
         if (lmv->connected)
                 RETURN(0);
@@ -577,11 +579,18 @@ int lmv_check_connect(struct obd_device *obd)
                 RETURN(-EINVAL);
         }
 
-        CDEBUG(D_CONFIG, "Time to connect %s to %s\n",
-               lmv->cluuid.uuid, obd->obd_name);
-
        LASSERT(lmv->tgts != NULL);
 
+       if (lmv->tgts[0] == NULL) {
+               lmv_init_unlock(lmv);
+               CERROR("%s: no target configured for index 0.\n",
+                      obd->obd_name);
+               RETURN(-EINVAL);
+       }
+
+       CDEBUG(D_CONFIG, "Time to connect %s to %s\n",
+              lmv->cluuid.uuid, obd->obd_name);
+
        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
                tgt = lmv->tgts[i];
                if (tgt == NULL)
@@ -681,14 +690,14 @@ static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
 
 static int lmv_disconnect(struct obd_export *exp)
 {
-        struct obd_device     *obd = class_exp2obd(exp);
+       struct obd_device       *obd = class_exp2obd(exp);
 #ifdef __KERNEL__
-        struct proc_dir_entry *lmv_proc_dir;
+       struct proc_dir_entry   *lmv_proc_dir;
 #endif
-        struct lmv_obd        *lmv = &obd->u.lmv;
-        int                    rc;
-        int                    i;
-        ENTRY;
+       struct lmv_obd          *lmv = &obd->u.lmv;
+       int                      rc;
+       __u32                    i;
+       ENTRY;
 
         if (!lmv->tgts)
                 goto out_local;
@@ -812,16 +821,166 @@ out_fid2path:
        RETURN(rc);
 }
 
+static int lmv_hsm_req_count(struct lmv_obd *lmv,
+                            const struct hsm_user_request *hur,
+                            const struct lmv_tgt_desc *tgt_mds)
+{
+       __u32                    i;
+       int                      nr = 0;
+       struct lmv_tgt_desc     *curr_tgt;
+
+       /* count how many requests must be sent to the given target */
+       for (i = 0; i < hur->hur_request.hr_itemcount; i++) {
+               curr_tgt = lmv_find_target(lmv, &hur->hur_user_item[i].hui_fid);
+               if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid))
+                       nr++;
+       }
+       return nr;
+}
+
+static void lmv_hsm_req_build(struct lmv_obd *lmv,
+                             struct hsm_user_request *hur_in,
+                             const struct lmv_tgt_desc *tgt_mds,
+                             struct hsm_user_request *hur_out)
+{
+       __u32                    i, nr_out;
+       struct lmv_tgt_desc     *curr_tgt;
+
+       /* build the hsm_user_request for the given target */
+       hur_out->hur_request = hur_in->hur_request;
+       nr_out = 0;
+       for (i = 0; i < hur_in->hur_request.hr_itemcount; i++) {
+               curr_tgt = lmv_find_target(lmv,
+                                          &hur_in->hur_user_item[i].hui_fid);
+               if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid)) {
+                       hur_out->hur_user_item[nr_out] =
+                                               hur_in->hur_user_item[i];
+                       nr_out++;
+               }
+       }
+       hur_out->hur_request.hr_itemcount = nr_out;
+       memcpy(hur_data(hur_out), hur_data(hur_in),
+              hur_in->hur_request.hr_data_len);
+}
+
+static int lmv_hsm_ct_unregister(struct lmv_obd *lmv, unsigned int cmd, int len,
+                                struct lustre_kernelcomm *lk, void *uarg)
+{
+       __u32                    i;
+       int                      rc;
+       struct kkuc_ct_data     *kcd = NULL;
+       ENTRY;
+
+       /* unregister request (call from llapi_hsm_copytool_fini) */
+       for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+               struct lmv_tgt_desc *tgt = lmv->tgts[i];
+
+               if (tgt == NULL || tgt->ltd_exp == NULL)
+                       continue;
+               /* best effort: try to clean as much as possible
+                * (continue on error) */
+               obd_iocontrol(cmd, tgt->ltd_exp, len, lk, uarg);
+       }
+
+       /* Whatever the result, remove copytool from kuc groups.
+        * Unreached coordinators will get EPIPE on next requests
+        * and will unregister automatically.
+        */
+       rc = libcfs_kkuc_group_rem(lk->lk_uid, lk->lk_group, (void **)&kcd);
+       if (kcd != NULL)
+               OBD_FREE_PTR(kcd);
+
+       RETURN(rc);
+}
+
+static int lmv_hsm_ct_register(struct lmv_obd *lmv, unsigned int cmd, int len,
+                              struct lustre_kernelcomm *lk, void *uarg)
+{
+       struct file             *filp;
+       __u32                    i, j;
+       int                      err, rc;
+       bool                     any_set = false;
+       struct kkuc_ct_data     *kcd;
+       ENTRY;
+
+       /* All or nothing: try to register to all MDS.
+        * In case of failure, unregister from previous MDS,
+        * except if it because of inactive target. */
+       for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+               struct lmv_tgt_desc *tgt = lmv->tgts[i];
+
+               if (tgt == NULL || tgt->ltd_exp == NULL)
+                       continue;
+               err = obd_iocontrol(cmd, tgt->ltd_exp, len, lk, uarg);
+               if (err) {
+                       if (tgt->ltd_active) {
+                               /* permanent error */
+                               CERROR("%s: iocontrol MDC %s on MDT"
+                                      " idx %d cmd %x: err = %d\n",
+                                      class_exp2obd(lmv->exp)->obd_name,
+                                      tgt->ltd_uuid.uuid, i, cmd, err);
+                               rc = err;
+                               lk->lk_flags |= LK_FLG_STOP;
+                               /* unregister from previous MDS */
+                               for (j = 0; j < i; j++) {
+                                       tgt = lmv->tgts[j];
+                                       if (tgt == NULL || tgt->ltd_exp == NULL)
+                                               continue;
+                                       obd_iocontrol(cmd, tgt->ltd_exp, len,
+                                                     lk, uarg);
+                               }
+                               RETURN(rc);
+                       }
+                       /* else: transient error.
+                        * kuc will register to the missing MDT
+                        * when it is back */
+               } else {
+                       any_set = true;
+               }
+       }
+
+       if (!any_set)
+               /* no registration done: return error */
+               RETURN(-ENOTCONN);
+
+       /* at least one registration done, with no failure */
+       filp = fget(lk->lk_wfd);
+       if (filp == NULL)
+               RETURN(-EBADF);
+
+       OBD_ALLOC_PTR(kcd);
+       if (kcd == NULL) {
+               fput(filp);
+               RETURN(-ENOMEM);
+       }
+       kcd->kcd_magic = KKUC_CT_DATA_MAGIC;
+       kcd->kcd_uuid = lmv->cluuid;
+       kcd->kcd_archive = lk->lk_data;
+
+       rc = libcfs_kkuc_group_add(filp, lk->lk_uid, lk->lk_group, kcd);
+       if (rc != 0) {
+               if (filp != NULL)
+                       fput(filp);
+               OBD_FREE_PTR(kcd);
+       }
+
+       RETURN(rc);
+}
+
+
+
+
 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
                          int len, void *karg, void *uarg)
 {
-        struct obd_device    *obddev = class_exp2obd(exp);
-        struct lmv_obd       *lmv = &obddev->u.lmv;
-        int                   i = 0;
-        int                   rc = 0;
-        int                   set = 0;
-        int                   count = lmv->desc.ld_tgt_count;
-        ENTRY;
+       struct obd_device       *obddev = class_exp2obd(exp);
+       struct lmv_obd          *lmv = &obddev->u.lmv;
+       struct lmv_tgt_desc     *tgt = NULL;
+       __u32                    i = 0;
+       int                      rc = 0;
+       int                      set = 0;
+       __u32                    count = lmv->desc.ld_tgt_count;
+       ENTRY;
 
         if (count == 0)
                 RETURN(-ENOTTY);
@@ -837,39 +996,38 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
                 if ((index >= count))
                         RETURN(-ENODEV);
 
-               if (lmv->tgts[index] == NULL ||
-                   lmv->tgts[index]->ltd_active == 0)
+               tgt = lmv->tgts[index];
+               if (tgt == NULL || !tgt->ltd_active)
                        RETURN(-ENODATA);
 
-               mdc_obd = class_exp2obd(lmv->tgts[index]->ltd_exp);
+               mdc_obd = class_exp2obd(tgt->ltd_exp);
                if (!mdc_obd)
                        RETURN(-EINVAL);
 
                /* copy UUID */
-               if (cfs_copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
-                                    min((int) data->ioc_plen2,
-                                        (int) sizeof(struct obd_uuid))))
+               if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
+                                min((int) data->ioc_plen2,
+                                    (int) sizeof(struct obd_uuid))))
                        RETURN(-EFAULT);
 
-               rc = obd_statfs(NULL, lmv->tgts[index]->ltd_exp, &stat_buf,
+               rc = obd_statfs(NULL, tgt->ltd_exp, &stat_buf,
                                cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
                                0);
                if (rc)
                        RETURN(rc);
-               if (cfs_copy_to_user(data->ioc_pbuf1, &stat_buf,
-                                    min((int) data->ioc_plen1,
-                                        (int) sizeof(stat_buf))))
+               if (copy_to_user(data->ioc_pbuf1, &stat_buf,
+                                min((int) data->ioc_plen1,
+                                    (int) sizeof(stat_buf))))
                        RETURN(-EFAULT);
                break;
         }
         case OBD_IOC_QUOTACTL: {
                 struct if_quotactl *qctl = karg;
-                struct lmv_tgt_desc *tgt = NULL;
                 struct obd_quotactl *oqctl;
 
-                if (qctl->qc_valid == QC_MDTIDX) {
-                        if (qctl->qc_idx < 0 || count <= qctl->qc_idx)
-                                RETURN(-EINVAL);
+               if (qctl->qc_valid == QC_MDTIDX) {
+                       if (count <= qctl->qc_idx)
+                               RETURN(-EINVAL);
 
                        tgt = lmv->tgts[qctl->qc_idx];
                        if (tgt == NULL || tgt->ltd_exp == NULL)
@@ -895,7 +1053,7 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
                 if (i >= count)
                         RETURN(-EAGAIN);
 
-                LASSERT(tgt && tgt->ltd_exp);
+                LASSERT(tgt != NULL && tgt->ltd_exp != NULL);
                 OBD_ALLOC_PTR(oqctl);
                 if (!oqctl)
                         RETURN(-ENOMEM);
@@ -917,18 +1075,17 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
                 if (icc->icc_mdtindex >= count)
                         RETURN(-ENODEV);
 
-               if (lmv->tgts[icc->icc_mdtindex] == NULL ||
-                   lmv->tgts[icc->icc_mdtindex]->ltd_exp == NULL ||
-                   lmv->tgts[icc->icc_mdtindex]->ltd_active == 0)
+               tgt = lmv->tgts[icc->icc_mdtindex];
+               if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active)
                        RETURN(-ENODEV);
-               rc = obd_iocontrol(cmd, lmv->tgts[icc->icc_mdtindex]->ltd_exp,
-                                  sizeof(*icc), icc, NULL);
+               rc = obd_iocontrol(cmd, tgt->ltd_exp, sizeof(*icc), icc, NULL);
                break;
        }
        case LL_IOC_GET_CONNECT_FLAGS: {
-               if (lmv->tgts[0] == NULL)
+               tgt = lmv->tgts[0];
+               if (tgt == NULL || tgt->ltd_exp == NULL)
                        RETURN(-ENODATA);
-               rc = obd_iocontrol(cmd, lmv->tgts[0]->ltd_exp, len, karg, uarg);
+               rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
                break;
        }
        case OBD_IOC_FID2PATH: {
@@ -937,13 +1094,90 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
        }
        case LL_IOC_HSM_STATE_GET:
        case LL_IOC_HSM_STATE_SET:
-       case LL_IOC_HSM_ACTION:
+       case LL_IOC_HSM_ACTION: {
+               struct md_op_data       *op_data = karg;
+
+               tgt = lmv_find_target(lmv, &op_data->op_fid1);
+               if (IS_ERR(tgt))
+                       RETURN(PTR_ERR(tgt));
+
+               if (tgt->ltd_exp == NULL)
+                       RETURN(-EINVAL);
+
+               rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
+               break;
+       }
+       case LL_IOC_HSM_PROGRESS: {
+               const struct hsm_progress_kernel *hpk = karg;
+
+               tgt = lmv_find_target(lmv, &hpk->hpk_fid);
+               if (IS_ERR(tgt))
+                       RETURN(PTR_ERR(tgt));
+               rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
+               break;
+       }
+       case LL_IOC_HSM_REQUEST: {
+               struct hsm_user_request *hur = karg;
+               unsigned int reqcount = hur->hur_request.hr_itemcount;
+
+               if (reqcount == 0)
+                       RETURN(0);
+
+               /* if the request is about a single fid
+                * or if there is a single MDS, no need to split
+                * the request. */
+               if (reqcount == 1 || count == 1) {
+                       tgt = lmv_find_target(lmv,
+                                             &hur->hur_user_item[0].hui_fid);
+                       if (IS_ERR(tgt))
+                               RETURN(PTR_ERR(tgt));
+                       rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
+               } else {
+                       /* split fid list to their respective MDS */
+                       for (i = 0; i < count; i++) {
+                               unsigned int            nr, reqlen;
+                               int                     rc1;
+                               struct hsm_user_request *req;
+
+                               tgt = lmv->tgts[i];
+                               if (tgt == NULL || tgt->ltd_exp == NULL)
+                                       continue;
+
+                               nr = lmv_hsm_req_count(lmv, hur, tgt);
+                               if (nr == 0) /* nothing for this MDS */
+                                       continue;
+
+                               /* build a request with fids for this MDS */
+                               reqlen = offsetof(typeof(*hur),
+                                                 hur_user_item[nr])
+                                               + hur->hur_request.hr_data_len;
+                               OBD_ALLOC_LARGE(req, reqlen);
+                               if (req == NULL)
+                                       RETURN(-ENOMEM);
+
+                               lmv_hsm_req_build(lmv, hur, tgt, req);
+
+                               rc1 = obd_iocontrol(cmd, tgt->ltd_exp, reqlen,
+                                                   req, uarg);
+                               if (rc1 != 0 && rc == 0)
+                                       rc = rc1;
+                               OBD_FREE_LARGE(req, reqlen);
+                       }
+               }
+               break;
+       }
        case LL_IOC_LOV_SWAP_LAYOUTS: {
                struct md_op_data       *op_data = karg;
                struct lmv_tgt_desc     *tgt1, *tgt2;
 
                tgt1 = lmv_find_target(lmv, &op_data->op_fid1);
+               if (IS_ERR(tgt1))
+                       RETURN(PTR_ERR(tgt1));
+
                tgt2 = lmv_find_target(lmv, &op_data->op_fid2);
+               if (IS_ERR(tgt2))
+                       RETURN(PTR_ERR(tgt2));
+
                if ((tgt1->ltd_exp == NULL) || (tgt2->ltd_exp == NULL))
                        RETURN(-EINVAL);
 
@@ -954,28 +1188,34 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
                rc = obd_iocontrol(cmd, tgt1->ltd_exp, len, karg, uarg);
                break;
        }
+       case LL_IOC_HSM_CT_START: {
+               struct lustre_kernelcomm *lk = karg;
+               if (lk->lk_flags & LK_FLG_STOP)
+                       rc = lmv_hsm_ct_unregister(lmv, cmd, len, lk, uarg);
+               else
+                       rc = lmv_hsm_ct_register(lmv, cmd, len, lk, uarg);
+               break;
+       }
        default:
                for (i = 0; i < count; i++) {
                        struct obd_device *mdc_obd;
                        int err;
 
-                       if (lmv->tgts[i] == NULL ||
-                           lmv->tgts[i]->ltd_exp == NULL)
+                       tgt = lmv->tgts[i];
+                       if (tgt == NULL || tgt->ltd_exp == NULL)
                                continue;
                        /* ll_umount_begin() sets force flag but for lmv, not
                         * mdc. Let's pass it through */
-                       mdc_obd = class_exp2obd(lmv->tgts[i]->ltd_exp);
+                       mdc_obd = class_exp2obd(tgt->ltd_exp);
                        mdc_obd->obd_force = obddev->obd_force;
-                       err = obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp, len,
-                                           karg, uarg);
+                       err = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
                        if (err == -ENODATA && cmd == OBD_IOC_POLL_QUOTACHECK) {
                                RETURN(err);
                        } else if (err) {
-                               if (lmv->tgts[i]->ltd_active) {
+                               if (tgt->ltd_active) {
                                        CERROR("error: iocontrol MDC %s on MDT"
-                                              "idx %d cmd %x: err = %d\n",
-                                               lmv->tgts[i]->ltd_uuid.uuid,
-                                               i, cmd, err);
+                                              " idx %d cmd %x: err = %d\n",
+                                              tgt->ltd_uuid.uuid, i, cmd, err);
                                        if (!rc)
                                                rc = err;
                                }
@@ -1079,21 +1319,23 @@ static int lmv_placement_policy(struct obd_device *obd,
 }
 
 int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
-                    mdsno_t mds)
+                   mdsno_t mds)
 {
-        struct lmv_tgt_desc *tgt;
-        int                  rc;
-        ENTRY;
+       struct lmv_tgt_desc     *tgt;
+       int                      rc;
+       ENTRY;
 
-        tgt = lmv_get_target(lmv, mds);
+       tgt = lmv_get_target(lmv, mds);
+       if (IS_ERR(tgt))
+               RETURN(PTR_ERR(tgt));
 
-        /*
-         * New seq alloc and FLD setup should be atomic. Otherwise we may find
-         * on server that seq in new allocated fid is not yet known.
-         */
+       /*
+        * New seq alloc and FLD setup should be atomic. Otherwise we may find
+        * on server that seq in new allocated fid is not yet known.
+        */
        mutex_lock(&tgt->ltd_fid_mutex);
 
-       if (tgt == NULL || tgt->ltd_active == 0 || tgt->ltd_exp == NULL)
+       if (tgt->ltd_active == 0 || tgt->ltd_exp == NULL)
                GOTO(out, rc = -ENODEV);
 
         /*
@@ -1178,14 +1420,15 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
        lprocfs_lmv_init_vars(&lvars);
 
        lprocfs_obd_setup(obd, lvars.obd_vars);
+       lprocfs_alloc_md_stats(obd, 0);
 #ifdef LPROCFS
-        {
-                rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd",
-                                        0444, &lmv_proc_target_fops, obd);
-                if (rc)
-                        CWARN("%s: error adding LMV target_obd file: rc = %d\n",
-                               obd->obd_name, rc);
-       }
+       {
+               rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd",
+                                       0444, &lmv_proc_target_fops, obd);
+               if (rc)
+                       CWARN("%s: error adding LMV target_obd file: rc = %d\n",
+                             obd->obd_name, rc);
+       }
 #endif
        rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
                             LUSTRE_CLI_FLD_HASH_DHT);
@@ -1254,12 +1497,12 @@ out:
 static int lmv_statfs(const struct lu_env *env, struct obd_export *exp,
                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
 {
-        struct obd_device     *obd = class_exp2obd(exp);
-        struct lmv_obd        *lmv = &obd->u.lmv;
-        struct obd_statfs     *temp;
-        int                    rc = 0;
-        int                    i;
-        ENTRY;
+       struct obd_device       *obd = class_exp2obd(exp);
+       struct lmv_obd          *lmv = &obd->u.lmv;
+       struct obd_statfs       *temp;
+       int                      rc = 0;
+       __u32                    i;
+       ENTRY;
 
         rc = lmv_check_connect(obd);
         if (rc)
@@ -1406,7 +1649,7 @@ static int lmv_null_inode(struct obd_export *exp, const struct lu_fid *fid)
 {
         struct obd_device   *obd = exp->exp_obd;
         struct lmv_obd      *lmv = &obd->u.lmv;
-        int                  i;
+       __u32                i;
         int                  rc;
         ENTRY;
 
@@ -1435,7 +1678,7 @@ static int lmv_find_cbdata(struct obd_export *exp, const struct lu_fid *fid,
 {
         struct obd_device   *obd = exp->exp_obd;
         struct lmv_obd      *lmv = &obd->u.lmv;
-        int                  i;
+       __u32                i;
         int                  rc;
         ENTRY;
 
@@ -1491,6 +1734,9 @@ struct lmv_tgt_desc
        struct lmv_tgt_desc *tgt;
 
        tgt = lmv_find_target(lmv, fid);
+       if (IS_ERR(tgt))
+               return tgt;
+
        op_data->op_mds = tgt->ltd_idx;
 
        return tgt;
@@ -1562,9 +1808,9 @@ static int lmv_done_writing(struct obd_export *exp,
 
 static int
 lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
-                   struct lookup_intent *it, struct md_op_data *op_data,
-                   struct lustre_handle *lockh, void *lmm, int lmmsize,
-                   int extra_lock_flags)
+                  struct lookup_intent *it, struct md_op_data *op_data,
+                  struct lustre_handle *lockh, void *lmm, int lmmsize,
+                  __u64 extra_lock_flags)
 {
         struct ptlrpc_request      *req = it->d.lustre.it_data;
         struct obd_device          *obd = exp->exp_obd;
@@ -1775,8 +2021,8 @@ static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
               PFID(&op_data->op_fid2), op_data->op_namelen,
               op_data->op_name, PFID(&op_data->op_fid1));
 
-       op_data->op_fsuid = cfs_curproc_fsuid();
-       op_data->op_fsgid = cfs_curproc_fsgid();
+       op_data->op_fsuid = current_fsuid();
+       op_data->op_fsgid = current_fsgid();
        op_data->op_cap = cfs_curproc_cap_pack();
        tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
        if (IS_ERR(tgt))
@@ -1817,8 +2063,8 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
         if (rc)
                 RETURN(rc);
 
-       op_data->op_fsuid = cfs_curproc_fsuid();
-       op_data->op_fsgid = cfs_curproc_fsgid();
+       op_data->op_fsuid = current_fsuid();
+       op_data->op_fsgid = current_fsgid();
        op_data->op_cap = cfs_curproc_cap_pack();
        src_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
        if (IS_ERR(src_tgt))
@@ -1893,107 +2139,149 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
        RETURN(rc);
 }
 
-static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
-                    struct obd_capa *oc, struct ptlrpc_request **request)
+static int lmv_fsync(struct obd_export *exp, const struct lu_fid *fid,
+                    struct obd_capa *oc, struct ptlrpc_request **request)
 {
-        struct obd_device         *obd = exp->exp_obd;
-        struct lmv_obd            *lmv = &obd->u.lmv;
-        struct lmv_tgt_desc       *tgt;
-        int                        rc;
-        ENTRY;
+       struct obd_device       *obd = exp->exp_obd;
+       struct lmv_obd          *lmv = &obd->u.lmv;
+       struct lmv_tgt_desc     *tgt;
+       int                      rc;
+       ENTRY;
 
-        rc = lmv_check_connect(obd);
-        if (rc)
-                RETURN(rc);
+       rc = lmv_check_connect(obd);
+       if (rc != 0)
+               RETURN(rc);
 
-        tgt = lmv_find_target(lmv, fid);
-        if (IS_ERR(tgt))
-                RETURN(PTR_ERR(tgt));
+       tgt = lmv_find_target(lmv, fid);
+       if (IS_ERR(tgt))
+               RETURN(PTR_ERR(tgt));
 
-        rc = md_sync(tgt->ltd_exp, fid, oc, request);
-        RETURN(rc);
+       rc = md_fsync(tgt->ltd_exp, fid, oc, request);
+       RETURN(rc);
 }
 
-static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data,
-                        struct page **pages, struct ptlrpc_request **request)
+/*
+ * Adjust a set of pages, each page containing an array of lu_dirpages,
+ * so that each page can be used as a single logical lu_dirpage.
+ *
+ * A lu_dirpage is laid out as follows, where s = ldp_hash_start,
+ * e = ldp_hash_end, f = ldp_flags, p = padding, and each "ent" is a
+ * struct lu_dirent.  It has size up to LU_PAGE_SIZE. The ldp_hash_end
+ * value is used as a cookie to request the next lu_dirpage in a
+ * directory listing that spans multiple pages (two in this example):
+ *   ________
+ *  |        |
+ * .|--------v-------   -----.
+ * |s|e|f|p|ent|ent| ... |ent|
+ * '--|--------------   -----'   Each CFS_PAGE contains a single
+ *    '------.                   lu_dirpage.
+ * .---------v-------   -----.
+ * |s|e|f|p|ent| 0 | ... | 0 |
+ * '-----------------   -----'
+ *
+ * However, on hosts where the native VM page size (PAGE_CACHE_SIZE) is
+ * larger than LU_PAGE_SIZE, a single host page may contain multiple
+ * lu_dirpages. After reading the lu_dirpages from the MDS, the
+ * ldp_hash_end of the first lu_dirpage refers to the one immediately
+ * after it in the same CFS_PAGE (arrows simplified for brevity, but
+ * in general e0==s1, e1==s2, etc.):
+ *
+ * .--------------------   -----.
+ * |s0|e0|f0|p|ent|ent| ... |ent|
+ * |---v----------------   -----|
+ * |s1|e1|f1|p|ent|ent| ... |ent|
+ * |---v----------------   -----|  Here, each CFS_PAGE contains
+ *             ...                 multiple lu_dirpages.
+ * |---v----------------   -----|
+ * |s'|e'|f'|p|ent|ent| ... |ent|
+ * '---|----------------   -----'
+ *     v
+ * .----------------------------.
+ * |        next CFS_PAGE       |
+ *
+ * This structure is transformed into a single logical lu_dirpage as follows:
+ *
+ * - Replace e0 with e' so the request for the next lu_dirpage gets the page
+ *   labeled 'next CFS_PAGE'.
+ *
+ * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether
+ *   a hash collision with the next page exists.
+ *
+ * - Adjust the lde_reclen of the ending entry of each lu_dirpage to span
+ *   to the first entry of the next lu_dirpage.
+ */
+#if PAGE_CACHE_SIZE > LU_PAGE_SIZE
+static void lmv_adjust_dirpages(struct page **pages, int ncfspgs, int nlupgs)
 {
-        struct obd_device       *obd = exp->exp_obd;
-        struct lmv_obd          *lmv = &obd->u.lmv;
-        __u64                    offset = op_data->op_offset;
-        int                      rc;
-        int                      i;
-        /* number of pages read, in CFS_PAGE_SIZE */
-        int                      nrdpgs;
-        /* number of pages transferred in LU_PAGE_SIZE */
-        int                      nlupgs;
-        struct lmv_tgt_desc     *tgt;
-        struct lu_dirpage       *dp;
-        struct lu_dirent        *ent;
-        ENTRY;
+       int i;
 
-        rc = lmv_check_connect(obd);
-        if (rc)
-                RETURN(rc);
+       for (i = 0; i < ncfspgs; i++) {
+               struct lu_dirpage       *dp = kmap(pages[i]);
+               struct lu_dirpage       *first = dp;
+               struct lu_dirent        *end_dirent = NULL;
+               struct lu_dirent        *ent;
+               __u64                   hash_end = dp->ldp_hash_end;
+               __u32                   flags = dp->ldp_flags;
 
-       CDEBUG(D_INODE, "READPAGE at "LPX64" from "DFID"\n",
-              offset, PFID(&op_data->op_fid1));
+               while (--nlupgs > 0) {
+                       ent = lu_dirent_start(dp);
+                       for (end_dirent = ent; ent != NULL;
+                            end_dirent = ent, ent = lu_dirent_next(ent));
+
+                       /* Advance dp to next lu_dirpage. */
+                       dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
+
+                       /* Check if we've reached the end of the CFS_PAGE. */
+                       if (!((unsigned long)dp & ~CFS_PAGE_MASK))
+                               break;
+
+                       /* Save the hash and flags of this lu_dirpage. */
+                       hash_end = dp->ldp_hash_end;
+                       flags = dp->ldp_flags;
+
+                       /* Check if lu_dirpage contains no entries. */
+                       if (!end_dirent)
+                               break;
+
+                       /* Enlarge the end entry lde_reclen from 0 to
+                        * first entry of next lu_dirpage. */
+                       LASSERT(le16_to_cpu(end_dirent->lde_reclen) == 0);
+                       end_dirent->lde_reclen =
+                               cpu_to_le16((char *)(dp->ldp_entries) -
+                                           (char *)end_dirent);
+               }
 
-       /*
-        * This case handle directory lookup in clustered metadata case (i.e.
-        * split directory is located on multiple md servers.)
-        * each server keeps directory entries for certain range of hashes.
-        * E.g. we have N server and suppose hash range is 0 to MAX_HASH.
-        * first server will keep records with hashes [ 0 ... MAX_HASH /N  - 1],
-        * second one with hashes [MAX_HASH / N ... 2 * MAX_HASH / N] and
-        * so on....
-        *      readdir can simply start reading entries from 0 - N server in
-        * order but that will not scale well as all client will request dir in
-        * to server in same order.
-        * Following algorithm does optimization:
-        * Instead of doing readdir in 1, 2, ...., N order, client with a
-        * rank R does readdir in R, R + 1, ..., N, 1, ... R - 1 order.
-        * (every client has rank R)
-        *      But ll_readdir() expect offset range [0 to MAX_HASH/N) but
-        * since client ask dir from MDS{R} client has pages with offsets
-        * [R*MAX_HASH/N ... (R + 1)*MAX_HASH/N] there for we do hash_adj
-        * on hash  values that we get.
-        * Since these codes might be still useful for sharded directory, so
-        * Keeping this code for further reference
-       if (0) {
-               LASSERT(nr > 0);
-               seg_size = MAX_HASH_SIZE;
-               do_div(seg_size, nr);
-               los      = obj->lo_stripes;
-               tgt      = lmv_get_target(lmv, los[0].ls_mds);
-               rank     = lmv_node_rank(tgt->ltd_exp, fid) % nr;
-               tgt_tmp  = offset;
-               do_div(tgt_tmp, seg_size);
-               tgt0_idx = do_div(tgt_tmp,  nr);
-               tgt_idx  = (tgt0_idx + rank) % nr;
-
-               if (tgt_idx < tgt0_idx)
-                        * Wrap around.
-                        *
-                        * Last segment has unusual length due to division
-                        * rounding.
-                       hash_adj = MAX_HASH_SIZE - seg_size * nr;
-               else
-                       hash_adj = 0;
+               first->ldp_hash_end = hash_end;
+               first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
+               first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);
+
+               kunmap(pages[i]);
+       }
+       LASSERTF(nlupgs == 0, "left = %d", nlupgs);
+}
+#else
+#define lmv_adjust_dirpages(pages, ncfspgs, nlupgs) do {} while (0)
+#endif /* PAGE_CACHE_SIZE > LU_PAGE_SIZE */
 
-               hash_adj += rank * seg_size;
+static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data,
+                       struct page **pages, struct ptlrpc_request **request)
+{
+       struct obd_device       *obd = exp->exp_obd;
+       struct lmv_obd          *lmv = &obd->u.lmv;
+       __u64                   offset = op_data->op_offset;
+       int                     rc;
+       int                     ncfspgs; /* pages read in PAGE_CACHE_SIZE */
+       int                     nlupgs; /* pages read in LU_PAGE_SIZE */
+       struct lmv_tgt_desc     *tgt;
+       ENTRY;
 
-               CDEBUG(D_INODE, "Readpage hash adjustment: %x "LPX64" "
-                      LPX64"/%x -> "LPX64"/%x\n", rank, hash_adj,
-                      offset, tgt0_idx, offset + hash_adj, tgt_idx);
+       rc = lmv_check_connect(obd);
+       if (rc)
+               RETURN(rc);
 
-               offset = (offset + hash_adj) & MAX_HASH_SIZE;
-               rid = lsm->mea_oinfo[tgt_idx].lmo_fid;
-               tgt = lmv_get_target(lmv, lsm->mea_oinfo[tgt_idx].lmo_mds);
+       CDEBUG(D_INODE, "READPAGE at "LPX64" from "DFID"\n",
+              offset, PFID(&op_data->op_fid1));
 
-               CDEBUG(D_INODE, "Forward to "DFID" with offset %lu i %d\n",
-                      PFID(&rid), (unsigned long)offset, tgt_idx);
-       }
-       */
        tgt = lmv_find_target(lmv, &op_data->op_fid1);
        if (IS_ERR(tgt))
                RETURN(PTR_ERR(tgt));
@@ -2002,60 +2290,17 @@ static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data,
        if (rc != 0)
                RETURN(rc);
 
-       nrdpgs = ((*request)->rq_bulk->bd_nob_transferred + CFS_PAGE_SIZE - 1)
-                >> CFS_PAGE_SHIFT;
+       ncfspgs = ((*request)->rq_bulk->bd_nob_transferred +
+                  PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
        nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT;
        LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK));
-       LASSERT(nrdpgs > 0 && nrdpgs <= op_data->op_npages);
+       LASSERT(ncfspgs > 0 && ncfspgs <= op_data->op_npages);
 
-       CDEBUG(D_INODE, "read %d(%d)/%d pages\n", nrdpgs, nlupgs,
+       CDEBUG(D_INODE, "read %d(%d)/%d pages\n", ncfspgs, nlupgs,
               op_data->op_npages);
 
-       for (i = 0; i < nrdpgs; i++) {
-#if CFS_PAGE_SIZE > LU_PAGE_SIZE
-               struct lu_dirpage *first;
-               __u64 hash_end = 0;
-               __u32 flags = 0;
-#endif
-               struct lu_dirent *tmp = NULL;
-
-               dp = cfs_kmap(pages[i]);
-               ent = lu_dirent_start(dp);
-#if CFS_PAGE_SIZE > LU_PAGE_SIZE
-               first = dp;
-               hash_end = dp->ldp_hash_end;
-repeat:
-#endif
-               nlupgs--;
-
-               for (tmp = ent; ent != NULL;
-                    tmp = ent, ent = lu_dirent_next(ent));
-#if CFS_PAGE_SIZE > LU_PAGE_SIZE
-               dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
-               if (((unsigned long)dp & ~CFS_PAGE_MASK) && nlupgs > 0) {
-                       ent = lu_dirent_start(dp);
+       lmv_adjust_dirpages(pages, ncfspgs, nlupgs);
 
-                       if (tmp) {
-                               /* enlarge the end entry lde_reclen from 0 to
-                                * first entry of next lu_dirpage, in this way
-                                * several lu_dirpages can be stored into one
-                                * client page on client. */
-                               tmp = ((void *)tmp) +
-                                     le16_to_cpu(tmp->lde_reclen);
-                               tmp->lde_reclen =
-                                       cpu_to_le16((char *)(dp->ldp_entries) -
-                                                   (char *)tmp);
-                               goto repeat;
-                       }
-               }
-               first->ldp_hash_end = hash_end;
-               first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
-               first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);
-#else
-               SET_BUT_UNUSED(tmp);
-#endif
-               cfs_kunmap(pages[i]);
-       }
        RETURN(rc);
 }
 
@@ -2081,8 +2326,8 @@ retry:
        if (IS_ERR(tgt))
                RETURN(PTR_ERR(tgt));
 
-       op_data->op_fsuid = cfs_curproc_fsuid();
-       op_data->op_fsgid = cfs_curproc_fsgid();
+       op_data->op_fsuid = current_fsuid();
+       op_data->op_fsgid = current_fsgid();
        op_data->op_cap = cfs_curproc_cap_pack();
 
        /*
@@ -2160,6 +2405,7 @@ static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
         case OBD_CLEANUP_EXPORTS:
                 fld_client_proc_fini(&lmv->lmv_fld);
                 lprocfs_obd_cleanup(obd);
+               lprocfs_free_md_stats(obd);
                 break;
         default:
                 break;
@@ -2185,7 +2431,6 @@ static int lmv_get_info(const struct lu_env *env, struct obd_export *exp,
 
         lmv = &obd->u.lmv;
         if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
-               struct lmv_tgt_desc *tgt;
                 int i;
 
                 rc = lmv_check_connect(obd);
@@ -2194,7 +2439,7 @@ static int lmv_get_info(const struct lu_env *env, struct obd_export *exp,
 
                 LASSERT(*vallen == sizeof(__u32));
                for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
-                       tgt = lmv->tgts[i];
+                       struct lmv_tgt_desc *tgt = lmv->tgts[i];
                        /*
                         * All tgts should be connected when this gets called.
                         */
@@ -2276,7 +2521,7 @@ int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
         struct lmv_stripe_md      *meap;
         struct lmv_stripe_md      *lsmp;
         int                        mea_size;
-        int                        i;
+       __u32                      i;
         ENTRY;
 
         mea_size = lmv_get_easize(lmv);
@@ -2325,7 +2570,7 @@ int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
         struct lmv_stripe_md       *mea = (struct lmv_stripe_md *)lmm;
         struct lmv_obd             *lmv = &obd->u.lmv;
         int                         mea_size;
-        int                         i;
+       __u32                       i;
         __u32                       magic;
         ENTRY;
 
@@ -2380,32 +2625,36 @@ static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
         struct lmv_obd          *lmv = &obd->u.lmv;
         int                      rc = 0;
         int                      err;
-        int                      i;
+       __u32                    i;
         ENTRY;
 
         LASSERT(fid != NULL);
 
-        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
-               if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL ||
-                   lmv->tgts[i]->ltd_active == 0)
+       for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+               struct lmv_tgt_desc *tgt = lmv->tgts[i];
+
+               if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active)
                        continue;
 
-               err = md_cancel_unused(lmv->tgts[i]->ltd_exp, fid,
-                                      policy, mode, flags, opaque);
+               err = md_cancel_unused(tgt->ltd_exp, fid, policy, mode, flags,
+                                      opaque);
                if (!rc)
                        rc = err;
-        }
+       }
        RETURN(rc);
 }
 
 int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
                       __u64 *bits)
 {
-       struct lmv_obd          *lmv = &exp->exp_obd->u.lmv;
-       int                      rc;
+       struct lmv_obd          *lmv = &exp->exp_obd->u.lmv;
+       struct lmv_tgt_desc     *tgt = lmv->tgts[0];
+       int                      rc;
        ENTRY;
 
-       rc =  md_set_lock_data(lmv->tgts[0]->ltd_exp, lockh, data, bits);
+       if (tgt == NULL || tgt->ltd_exp == NULL)
+               RETURN(-EINVAL);
+       rc =  md_set_lock_data(tgt->ltd_exp, lockh, data, bits);
        RETURN(rc);
 }
 
@@ -2417,7 +2666,7 @@ ldlm_mode_t lmv_lock_match(struct obd_export *exp, __u64 flags,
         struct obd_device       *obd = exp->exp_obd;
         struct lmv_obd          *lmv = &obd->u.lmv;
         ldlm_mode_t              rc;
-        int                      i;
+       __u32                    i;
         ENTRY;
 
         CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid));
@@ -2429,13 +2678,13 @@ ldlm_mode_t lmv_lock_match(struct obd_export *exp, __u64 flags,
          * one fid was created in.
          */
         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
-               if (lmv->tgts[i] == NULL ||
-                   lmv->tgts[i]->ltd_exp == NULL ||
-                   lmv->tgts[i]->ltd_active == 0)
+               struct lmv_tgt_desc *tgt = lmv->tgts[i];
+
+               if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active)
                        continue;
 
-               rc = md_lock_match(lmv->tgts[i]->ltd_exp, flags, fid,
-                                   type, policy, mode, lockh);
+               rc = md_lock_match(tgt->ltd_exp, flags, fid, type, policy, mode,
+                                  lockh);
                 if (rc)
                         RETURN(rc);
         }
@@ -2447,20 +2696,26 @@ int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
                      struct obd_export *dt_exp, struct obd_export *md_exp,
                      struct lustre_md *md)
 {
-       struct lmv_obd          *lmv = &exp->exp_obd->u.lmv;
+       struct lmv_obd          *lmv = &exp->exp_obd->u.lmv;
+       struct lmv_tgt_desc     *tgt = lmv->tgts[0];
 
-       return md_get_lustre_md(lmv->tgts[0]->ltd_exp, req, dt_exp, md_exp, md);
+       if (tgt == NULL || tgt->ltd_exp == NULL)
+               RETURN(-EINVAL);
+       return md_get_lustre_md(tgt->ltd_exp, req, dt_exp, md_exp, md);
 }
 
 int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
 {
-       struct obd_device       *obd = exp->exp_obd;
-       struct lmv_obd          *lmv = &obd->u.lmv;
+       struct obd_device       *obd = exp->exp_obd;
+       struct lmv_obd          *lmv = &obd->u.lmv;
+       struct lmv_tgt_desc     *tgt = lmv->tgts[0];
        ENTRY;
 
        if (md->mea)
                obd_free_memmd(exp, (void *)&md->mea);
-       RETURN(md_free_lustre_md(lmv->tgts[0]->ltd_exp, md));
+       if (tgt == NULL || tgt->ltd_exp == NULL)
+               RETURN(-EINVAL);
+       RETURN(md_free_lustre_md(tgt->ltd_exp, md));
 }
 
 int lmv_set_open_replay_data(struct obd_export *exp,
@@ -2541,9 +2796,12 @@ static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc,
 int lmv_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req,
                    const struct req_msg_field *field, struct obd_capa **oc)
 {
-       struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
+       struct lmv_obd          *lmv = &exp->exp_obd->u.lmv;
+       struct lmv_tgt_desc     *tgt = lmv->tgts[0];
 
-       return md_unpack_capa(lmv->tgts[0]->ltd_exp, req, field, oc);
+       if (tgt == NULL || tgt->ltd_exp == NULL)
+               RETURN(-EINVAL);
+       return md_unpack_capa(tgt->ltd_exp, req, field, oc);
 }
 
 int lmv_intent_getattr_async(struct obd_export *exp,
@@ -2601,14 +2859,18 @@ int lmv_quotactl(struct obd_device *unused, struct obd_export *exp,
        struct obd_device   *obd = class_exp2obd(exp);
        struct lmv_obd      *lmv = &obd->u.lmv;
        struct lmv_tgt_desc *tgt = lmv->tgts[0];
-       int                  rc = 0, i;
+       int                  rc = 0;
+       __u32                i;
        __u64                curspace, curinodes;
        ENTRY;
 
-        if (!lmv->desc.ld_tgt_count || !tgt->ltd_active) {
-                CERROR("master lmv inactive\n");
-                RETURN(-EIO);
-        }
+       if (tgt == NULL ||
+           tgt->ltd_exp == NULL ||
+           !tgt->ltd_active ||
+           lmv->desc.ld_tgt_count == 0) {
+               CERROR("master lmv inactive\n");
+               RETURN(-EIO);
+       }
 
         if (oqctl->qc_cmd != Q_GETOQUOTA) {
                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
@@ -2620,12 +2882,8 @@ int lmv_quotactl(struct obd_device *unused, struct obd_export *exp,
                int err;
                tgt = lmv->tgts[i];
 
-               if (tgt == NULL || tgt->ltd_exp == NULL || tgt->ltd_active == 0)
+               if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active)
                        continue;
-                if (!tgt->ltd_active) {
-                        CDEBUG(D_HA, "mdt %d is inactive.\n", i);
-                        continue;
-                }
 
                 err = obd_quotactl(tgt->ltd_exp, oqctl);
                 if (err) {
@@ -2646,14 +2904,15 @@ int lmv_quotactl(struct obd_device *unused, struct obd_export *exp,
 int lmv_quotacheck(struct obd_device *unused, struct obd_export *exp,
                    struct obd_quotactl *oqctl)
 {
-        struct obd_device   *obd = class_exp2obd(exp);
-        struct lmv_obd      *lmv = &obd->u.lmv;
-        struct lmv_tgt_desc *tgt;
-        int                  i, rc = 0;
-        ENTRY;
+       struct obd_device       *obd = class_exp2obd(exp);
+       struct lmv_obd          *lmv = &obd->u.lmv;
+       struct lmv_tgt_desc     *tgt;
+       __u32                    i;
+       int                      rc = 0;
+       ENTRY;
 
        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
-                int err;
+               int err;
                tgt = lmv->tgts[i];
                if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active) {
                        CERROR("lmv idx %d inactive\n", i);
@@ -2704,7 +2963,7 @@ struct md_ops lmv_md_ops = {
         .m_rename               = lmv_rename,
         .m_setattr              = lmv_setattr,
         .m_setxattr             = lmv_setxattr,
-        .m_sync                 = lmv_sync,
+       .m_fsync                = lmv_fsync,
         .m_readpage             = lmv_readpage,
         .m_unlink               = lmv_unlink,
         .m_init_ea_size         = lmv_init_ea_size,