Whamcloud - gitweb
LU-3068 build: fix 'incorrect expression' errors
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
index 0b0790f..e55f4a0 100644 (file)
@@ -90,16 +90,17 @@ static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
                lmv, uuid->uuid, activate);
 
        spin_lock(&lmv->lmv_lock);
-        for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
-                if (tgt->ltd_exp == NULL)
-                        continue;
+       for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+               tgt = lmv->tgts[i];
+               if (tgt == NULL || tgt->ltd_exp == NULL)
+                       continue;
 
-                CDEBUG(D_INFO, "Target idx %d is %s conn "LPX64"\n",
-                       i, tgt->ltd_uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
+               CDEBUG(D_INFO, "Target idx %d is %s conn "LPX64"\n", i,
+                      tgt->ltd_uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
 
-                if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
-                        break;
-        }
+               if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
+                       break;
+       }
 
         if (i == lmv->desc.ld_tgt_count)
                 GOTO(out_lmv_lock, rc = -EINVAL);
@@ -129,33 +130,11 @@ static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
        return rc;
 }
 
-static int lmv_set_mdc_data(struct lmv_obd *lmv, struct obd_uuid *uuid,
-                           struct obd_connect_data *data)
+struct obd_uuid *lmv_get_uuid(struct obd_export *exp)
 {
-       struct lmv_tgt_desc    *tgt;
-       int                     i;
-       ENTRY;
-
-       LASSERT(data != NULL);
-
-       spin_lock(&lmv->lmv_lock);
-       for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
-               if (tgt->ltd_exp == NULL)
-                       continue;
+       struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
 
-               if (obd_uuid_equals(uuid, &tgt->ltd_uuid)) {
-                       lmv->datas[tgt->ltd_idx] = *data;
-                       break;
-               }
-       }
-       spin_unlock(&lmv->lmv_lock);
-       RETURN(0);
-}
-
-struct obd_uuid *lmv_get_uuid(struct obd_export *exp) {
-        struct obd_device *obd = exp->exp_obd;
-        struct lmv_obd *lmv = &obd->u.lmv;
-        return obd_get_uuid(lmv->tgts[0].ltd_exp);
+       return obd_get_uuid(lmv->tgts[0]->ltd_exp);
 }
 
 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
@@ -188,27 +167,15 @@ static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
                                uuid->uuid, rc);
                         RETURN(rc);
                 }
-        } else if (ev == OBD_NOTIFY_OCD) {
-                conn_data = &watched->u.cli.cl_import->imp_connect_data;
-
-                /*
-                 * Set connect data to desired target, update exp_connect_flags.
-                 */
-                rc = lmv_set_mdc_data(lmv, uuid, conn_data);
-                if (rc) {
-                        CERROR("can't set connect data to target %s, rc %d\n",
-                               uuid->uuid, rc);
-                        RETURN(rc);
-                }
-
-                /*
-                 * XXX: Make sure that ocd_connect_flags from all targets are
-                 * the same. Otherwise one of MDTs runs wrong version or
-                 * something like this.  --umka
-                 */
-                obd->obd_self_export->exp_connect_flags =
-                        conn_data->ocd_connect_flags;
-        }
+       } else if (ev == OBD_NOTIFY_OCD) {
+               conn_data = &watched->u.cli.cl_import->imp_connect_data;
+               /*
+                * XXX: Make sure that ocd_connect_flags from all targets are
+                * the same. Otherwise one of MDTs runs wrong version or
+                * something like this.  --umka
+                */
+               obd->obd_self_export->exp_connect_data = *conn_data;
+       }
 #if 0
         else if (ev == OBD_NOTIFY_DISCON) {
                 /*
@@ -300,9 +267,9 @@ static int lmv_connect(const struct lu_env *env,
 
 static void lmv_set_timeouts(struct obd_device *obd)
 {
-        struct lmv_tgt_desc   *tgts;
-        struct lmv_obd        *lmv;
-        int                    i;
+       struct lmv_tgt_desc   *tgt;
+       struct lmv_obd        *lmv;
+       int                    i;
 
         lmv = &obd->u.lmv;
         if (lmv->server_timeout == 0)
@@ -311,13 +278,14 @@ static void lmv_set_timeouts(struct obd_device *obd)
         if (lmv->connected == 0)
                 return;
 
-        for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgts++) {
-                if (tgts->ltd_exp == NULL)
-                        continue;
+       for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+               tgt = lmv->tgts[i];
+               if (tgt == NULL || tgt->ltd_exp == NULL || tgt->ltd_active == 0)
+                       continue;
 
-                obd_set_info_async(NULL, tgts->ltd_exp, sizeof(KEY_INTERMDS),
-                                   KEY_INTERMDS, 0, NULL, NULL);
-        }
+               obd_set_info_async(NULL, tgt->ltd_exp, sizeof(KEY_INTERMDS),
+                                  KEY_INTERMDS, 0, NULL, NULL);
+       }
 }
 
 static int lmv_init_ea_size(struct obd_export *exp, int easize,
@@ -348,21 +316,23 @@ static int lmv_init_ea_size(struct obd_export *exp, int easize,
         if (lmv->connected == 0)
                 RETURN(0);
 
-        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
-                if (lmv->tgts[i].ltd_exp == NULL) {
-                        CWARN("%s: NULL export for %d\n", obd->obd_name, i);
-                        continue;
-                }
+       for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+               if (lmv->tgts[i] == NULL ||
+                   lmv->tgts[i]->ltd_exp == NULL ||
+                   lmv->tgts[i]->ltd_active == 0) {
+                       CWARN("%s: NULL export for %d\n", obd->obd_name, i);
+                       continue;
+               }
 
-                rc = md_init_ea_size(lmv->tgts[i].ltd_exp, easize, def_easize,
-                                     cookiesize);
-                if (rc) {
-                        CERROR("obd_init_ea_size() failed on MDT target %d, "
-                               "error %d.\n", i, rc);
-                        break;
-                }
-        }
-        RETURN(rc);
+               rc = md_init_ea_size(lmv->tgts[i]->ltd_exp, easize, def_easize,
+                                    cookiesize);
+               if (rc) {
+                       CERROR("%s: obd_init_ea_size() failed on MDT target %d:"
+                              " rc = %d.\n", obd->obd_name, i, rc);
+                       break;
+               }
+       }
+       RETURN(rc);
 }
 
 #define MAX_STRING_SIZE 128
@@ -374,7 +344,6 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
 #endif
         struct lmv_obd          *lmv = &obd->u.lmv;
         struct obd_uuid         *cluuid = &lmv->cluuid;
-        struct obd_connect_data *mdc_data = NULL;
         struct obd_uuid          lmv_mdc_uuid = { "LMV_MDC_UUID" };
         struct obd_device       *mdc_obd;
         struct obd_export       *mdc_exp;
@@ -419,8 +388,6 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
 
         fld_client_add_target(&lmv->lmv_fld, &target);
 
-        mdc_data = &class_exp2cliimp(mdc_exp)->imp_connect_data;
-
         rc = obd_register_observer(mdc_obd, obd);
         if (rc) {
                 obd_disconnect(mdc_exp);
@@ -433,23 +400,19 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
                 /*
                  * Tell the observer about the new target.
                  */
-                rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
-                                OBD_NOTIFY_ACTIVE, (void *)(tgt - lmv->tgts));
-                if (rc) {
-                        obd_disconnect(mdc_exp);
-                        RETURN(rc);
-                }
+               rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
+                               OBD_NOTIFY_ACTIVE,
+                               (void *)(tgt - lmv->tgts[0]));
+               if (rc) {
+                       obd_disconnect(mdc_exp);
+                       RETURN(rc);
+               }
         }
 
         tgt->ltd_active = 1;
         tgt->ltd_exp = mdc_exp;
         lmv->desc.ld_active_tgt_count++;
 
-        /*
-         * Copy connect data, it may be used later.
-         */
-        lmv->datas[tgt->ltd_idx] = *mdc_data;
-
         md_init_ea_size(tgt->ltd_exp, lmv->max_easize,
                         lmv->max_def_easize, lmv->max_cookiesize);
 
@@ -482,39 +445,94 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
         RETURN(0);
 }
 
-int lmv_add_target(struct obd_device *obd, struct obd_uuid *tgt_uuid)
+static void lmv_del_target(struct lmv_obd *lmv, int index)
+{
+       if (lmv->tgts[index] == NULL)
+               return;
+
+       OBD_FREE_PTR(lmv->tgts[index]);
+       lmv->tgts[index] = NULL;
+       return;
+}
+
+static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
+                          __u32 index, int gen)
 {
         struct lmv_obd      *lmv = &obd->u.lmv;
         struct lmv_tgt_desc *tgt;
         int                  rc = 0;
         ENTRY;
 
-        CDEBUG(D_CONFIG, "Target uuid: %s.\n", tgt_uuid->uuid);
+       CDEBUG(D_CONFIG, "Target uuid: %s. index %d\n", uuidp->uuid, index);
 
         lmv_init_lock(lmv);
 
-        if (lmv->desc.ld_active_tgt_count >= LMV_MAX_TGT_COUNT) {
-                lmv_init_unlock(lmv);
-                CERROR("Can't add %s, LMV module compiled for %d MDCs. "
-                       "That many MDCs already configured.\n",
-                       tgt_uuid->uuid, LMV_MAX_TGT_COUNT);
-                RETURN(-EINVAL);
-        }
-        if (lmv->desc.ld_tgt_count == 0) {
-                struct obd_device *mdc_obd;
+       if (lmv->desc.ld_tgt_count == 0) {
+               struct obd_device *mdc_obd;
 
-                mdc_obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME,
-                                                &obd->obd_uuid);
-                if (!mdc_obd) {
-                        lmv_init_unlock(lmv);
-                        CERROR("Target %s not attached\n", tgt_uuid->uuid);
-                        RETURN(-EINVAL);
-                }
-        }
-       spin_lock(&lmv->lmv_lock);
-       tgt = lmv->tgts + lmv->desc.ld_tgt_count++;
-       tgt->ltd_uuid = *tgt_uuid;
-       spin_unlock(&lmv->lmv_lock);
+               mdc_obd = class_find_client_obd(uuidp, LUSTRE_MDC_NAME,
+                                               &obd->obd_uuid);
+               if (!mdc_obd) {
+                       lmv_init_unlock(lmv);
+                       CERROR("%s: Target %s not attached: rc = %d\n",
+                              obd->obd_name, uuidp->uuid, -EINVAL);
+                       RETURN(-EINVAL);
+               }
+       }
+
+       if ((index < lmv->tgts_size) && (lmv->tgts[index] != NULL)) {
+               tgt = lmv->tgts[index];
+               CERROR("%s: UUID %s already assigned at LOV target index %d:"
+                      " rc = %d\n", obd->obd_name,
+                      obd_uuid2str(&tgt->ltd_uuid), index, -EEXIST);
+               lmv_init_unlock(lmv);
+               RETURN(-EEXIST);
+       }
+
+       if (index >= lmv->tgts_size) {
+               /* We need to reallocate the lmv target array. */
+               struct lmv_tgt_desc **newtgts, **old = NULL;
+               __u32 newsize = 1;
+               __u32 oldsize = 0;
+
+               while (newsize < index + 1)
+                       newsize = newsize << 1;
+               OBD_ALLOC(newtgts, sizeof(*newtgts) * newsize);
+               if (newtgts == NULL) {
+                       lmv_init_unlock(lmv);
+                       RETURN(-ENOMEM);
+               }
+
+               if (lmv->tgts_size) {
+                       memcpy(newtgts, lmv->tgts,
+                              sizeof(*newtgts) * lmv->tgts_size);
+                       old = lmv->tgts;
+                       oldsize = lmv->tgts_size;
+               }
+
+               lmv->tgts = newtgts;
+               lmv->tgts_size = newsize;
+               smp_rmb();
+               if (old)
+                       OBD_FREE(old, sizeof(*old) * oldsize);
+
+               CDEBUG(D_CONFIG, "tgts: %p size: %d\n", lmv->tgts,
+                      lmv->tgts_size);
+       }
+
+       OBD_ALLOC_PTR(tgt);
+       if (!tgt) {
+               lmv_init_unlock(lmv);
+               RETURN(-ENOMEM);
+       }
+
+       mutex_init(&tgt->ltd_fid_mutex);
+       tgt->ltd_idx = index;
+       tgt->ltd_uuid = *uuidp;
+       tgt->ltd_active = 0;
+       lmv->tgts[index] = tgt;
+       if (index >= lmv->desc.ld_tgt_count)
+               lmv->desc.ld_tgt_count = index + 1;
 
        if (lmv->connected) {
                rc = lmv_connect_mdc(obd, tgt);
@@ -562,13 +580,16 @@ int lmv_check_connect(struct obd_device *obd)
         CDEBUG(D_CONFIG, "Time to connect %s to %s\n",
                lmv->cluuid.uuid, obd->obd_name);
 
-        LASSERT(lmv->tgts != NULL);
+       LASSERT(lmv->tgts != NULL);
 
-        for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
-                rc = lmv_connect_mdc(obd, tgt);
-                if (rc)
-                        GOTO(out_disc, rc);
-        }
+       for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+               tgt = lmv->tgts[i];
+               if (tgt == NULL)
+                       continue;
+               rc = lmv_connect_mdc(obd, tgt);
+               if (rc)
+                       GOTO(out_disc, rc);
+       }
 
         lmv_set_timeouts(obd);
         class_export_put(lmv->exp);
@@ -581,7 +602,9 @@ int lmv_check_connect(struct obd_device *obd)
  out_disc:
         while (i-- > 0) {
                 int rc2;
-                --tgt;
+               tgt = lmv->tgts[i];
+               if (tgt == NULL)
+                       continue;
                 tgt->ltd_active = 0;
                 if (tgt->ltd_exp) {
                         --lmv->desc.ld_active_tgt_count;
@@ -678,9 +701,10 @@ static int lmv_disconnect(struct obd_export *exp)
                 goto out_local;
 
         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
-                if (lmv->tgts[i].ltd_exp == NULL)
+               if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
                         continue;
-                lmv_disconnect_mdc(obd, &lmv->tgts[i]);
+
+               lmv_disconnect_mdc(obd, lmv->tgts[i]);
         }
 
 #ifdef __KERNEL__
@@ -706,6 +730,88 @@ out_local:
         RETURN(rc);
 }
 
+static int lmv_fid2path(struct obd_export *exp, int len, void *karg, void *uarg)
+{
+       struct obd_device       *obddev = class_exp2obd(exp);
+       struct lmv_obd          *lmv = &obddev->u.lmv;
+       struct getinfo_fid2path *gf;
+       struct lmv_tgt_desc     *tgt;
+       struct getinfo_fid2path *remote_gf = NULL;
+       int                     remote_gf_size = 0;
+       int                     rc;
+
+       gf = (struct getinfo_fid2path *)karg;
+       tgt = lmv_find_target(lmv, &gf->gf_fid);
+       if (IS_ERR(tgt))
+               RETURN(PTR_ERR(tgt));
+
+repeat_fid2path:
+       rc = obd_iocontrol(OBD_IOC_FID2PATH, tgt->ltd_exp, len, gf, uarg);
+       if (rc != 0 && rc != -EREMOTE)
+               GOTO(out_fid2path, rc);
+
+       /* If remote_gf != NULL, it means just building the
+        * path on the remote MDT, copy this path segement to gf */
+       if (remote_gf != NULL) {
+               struct getinfo_fid2path *ori_gf;
+               char *ptr;
+
+               ori_gf = (struct getinfo_fid2path *)karg;
+               if (strlen(ori_gf->gf_path) +
+                   strlen(gf->gf_path) > ori_gf->gf_pathlen)
+                       GOTO(out_fid2path, rc = -EOVERFLOW);
+
+               ptr = ori_gf->gf_path;
+
+               memmove(ptr + strlen(gf->gf_path) + 1, ptr,
+                       strlen(ori_gf->gf_path));
+
+               strncpy(ptr, gf->gf_path, strlen(gf->gf_path));
+               ptr += strlen(gf->gf_path);
+               *ptr = '/';
+       }
+
+       CDEBUG(D_INFO, "%s: get path %s "DFID" rec: "LPU64" ln: %u\n",
+              tgt->ltd_exp->exp_obd->obd_name,
+              gf->gf_path, PFID(&gf->gf_fid), gf->gf_recno,
+              gf->gf_linkno);
+
+       if (rc == 0)
+               GOTO(out_fid2path, rc);
+
+       /* sigh, has to go to another MDT to do path building further */
+       if (remote_gf == NULL) {
+               remote_gf_size = sizeof(*remote_gf) + PATH_MAX;
+               OBD_ALLOC(remote_gf, remote_gf_size);
+               if (remote_gf == NULL)
+                       GOTO(out_fid2path, rc = -ENOMEM);
+               remote_gf->gf_pathlen = PATH_MAX;
+       }
+
+       if (!fid_is_sane(&gf->gf_fid)) {
+               CERROR("%s: invalid FID "DFID": rc = %d\n",
+                      tgt->ltd_exp->exp_obd->obd_name,
+                      PFID(&gf->gf_fid), -EINVAL);
+               GOTO(out_fid2path, rc = -EINVAL);
+       }
+
+       tgt = lmv_find_target(lmv, &gf->gf_fid);
+       if (IS_ERR(tgt))
+               GOTO(out_fid2path, rc = -EINVAL);
+
+       remote_gf->gf_fid = gf->gf_fid;
+       remote_gf->gf_recno = -1;
+       remote_gf->gf_linkno = -1;
+       memset(remote_gf->gf_path, 0, remote_gf->gf_pathlen);
+       gf = remote_gf;
+       goto repeat_fid2path;
+
+out_fid2path:
+       if (remote_gf != NULL)
+               OBD_FREE(remote_gf, remote_gf_size);
+       RETURN(rc);
+}
+
 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
                          int len, void *karg, void *uarg)
 {
@@ -731,29 +837,30 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
                 if ((index >= count))
                         RETURN(-ENODEV);
 
-                if (!lmv->tgts[index].ltd_active)
-                        RETURN(-ENODATA);
+               if (lmv->tgts[index] == NULL ||
+                   lmv->tgts[index]->ltd_active == 0)
+                       RETURN(-ENODATA);
 
-                mdc_obd = class_exp2obd(lmv->tgts[index].ltd_exp);
-                if (!mdc_obd)
-                        RETURN(-EINVAL);
+               mdc_obd = class_exp2obd(lmv->tgts[index]->ltd_exp);
+               if (!mdc_obd)
+                       RETURN(-EINVAL);
 
-                /* copy UUID */
-                if (cfs_copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
-                                     min((int) data->ioc_plen2,
-                                         (int) sizeof(struct obd_uuid))))
-                        RETURN(-EFAULT);
+               /* copy UUID */
+               if (cfs_copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
+                                    min((int) data->ioc_plen2,
+                                        (int) sizeof(struct obd_uuid))))
+                       RETURN(-EFAULT);
 
-                rc = obd_statfs(NULL, lmv->tgts[index].ltd_exp, &stat_buf,
-                                cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
-                                0);
-                if (rc)
-                        RETURN(rc);
-                if (cfs_copy_to_user(data->ioc_pbuf1, &stat_buf,
-                                     min((int) data->ioc_plen1,
-                                         (int) sizeof(stat_buf))))
-                        RETURN(-EFAULT);
-                break;
+               rc = obd_statfs(NULL, lmv->tgts[index]->ltd_exp, &stat_buf,
+                               cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
+                               0);
+               if (rc)
+                       RETURN(rc);
+               if (cfs_copy_to_user(data->ioc_pbuf1, &stat_buf,
+                                    min((int) data->ioc_plen1,
+                                        (int) sizeof(stat_buf))))
+                       RETURN(-EFAULT);
+               break;
         }
         case OBD_IOC_QUOTACTL: {
                 struct if_quotactl *qctl = karg;
@@ -764,15 +871,17 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
                         if (qctl->qc_idx < 0 || count <= qctl->qc_idx)
                                 RETURN(-EINVAL);
 
-                        tgt = &lmv->tgts[qctl->qc_idx];
-                        if (!tgt->ltd_exp)
-                                RETURN(-EINVAL);
-                } else if (qctl->qc_valid == QC_UUID) {
-                        for (i = 0; i < count; i++) {
-                                tgt = &lmv->tgts[i];
-                                if (!obd_uuid_equals(&tgt->ltd_uuid,
-                                                     &qctl->obd_uuid))
-                                        continue;
+                       tgt = lmv->tgts[qctl->qc_idx];
+                       if (tgt == NULL || tgt->ltd_exp == NULL)
+                               RETURN(-EINVAL);
+               } else if (qctl->qc_valid == QC_UUID) {
+                       for (i = 0; i < count; i++) {
+                               tgt = lmv->tgts[i];
+                               if (tgt == NULL)
+                                       continue;
+                               if (!obd_uuid_equals(&tgt->ltd_uuid,
+                                                    &qctl->obd_uuid))
+                                       continue;
 
                                 if (tgt->ltd_exp == NULL)
                                         RETURN(-EINVAL);
@@ -808,56 +917,74 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
                 if (icc->icc_mdtindex >= count)
                         RETURN(-ENODEV);
 
-                rc = obd_iocontrol(cmd, lmv->tgts[icc->icc_mdtindex].ltd_exp,
-                                   sizeof(*icc), icc, NULL);
-                break;
-        }
-        case LL_IOC_GET_CONNECT_FLAGS: {
-                rc = obd_iocontrol(cmd, lmv->tgts[0].ltd_exp, len, karg, uarg);
-                break;
-        }
+               if (lmv->tgts[icc->icc_mdtindex] == NULL ||
+                   lmv->tgts[icc->icc_mdtindex]->ltd_exp == NULL ||
+                   lmv->tgts[icc->icc_mdtindex]->ltd_active == 0)
+                       RETURN(-ENODEV);
+               rc = obd_iocontrol(cmd, lmv->tgts[icc->icc_mdtindex]->ltd_exp,
+                                  sizeof(*icc), icc, NULL);
+               break;
+       }
+       case LL_IOC_GET_CONNECT_FLAGS: {
+               if (lmv->tgts[0] == NULL)
+                       RETURN(-ENODATA);
+               rc = obd_iocontrol(cmd, lmv->tgts[0]->ltd_exp, len, karg, uarg);
+               break;
+       }
        case OBD_IOC_FID2PATH: {
-               struct getinfo_fid2path *gf;
-               struct lmv_tgt_desc     *tgt;
-
-               gf = (struct getinfo_fid2path *)karg;
-               tgt = lmv_find_target(lmv, &gf->gf_fid);
-               if (IS_ERR(tgt))
-                       RETURN(PTR_ERR(tgt));
-               rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
+               rc = lmv_fid2path(exp, len, karg, uarg);
                break;
        }
-        default : {
-                for (i = 0; i < count; i++) {
-                        int err;
-                        struct obd_device *mdc_obd;
-
-                        if (lmv->tgts[i].ltd_exp == NULL)
-                                continue;
-                        /* ll_umount_begin() sets force flag but for lmv, not
-                         * mdc. Let's pass it through */
-                        mdc_obd = class_exp2obd(lmv->tgts[i].ltd_exp);
-                        mdc_obd->obd_force = obddev->obd_force;
-                        err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp, len,
-                                            karg, uarg);
-                        if (err == -ENODATA && cmd == OBD_IOC_POLL_QUOTACHECK) {
-                                RETURN(err);
-                        } else if (err) {
-                                if (lmv->tgts[i].ltd_active) {
-                                        CERROR("error: iocontrol MDC %s on MDT"
-                                               "idx %d cmd %x: err = %d\n",
-                                                lmv->tgts[i].ltd_uuid.uuid,
-                                                i, cmd, err);
-                                        if (!rc)
-                                                rc = err;
-                                }
-                        } else
-                                set = 1;
+       case LL_IOC_HSM_STATE_GET:
+       case LL_IOC_HSM_STATE_SET:
+       case LL_IOC_HSM_ACTION:
+       case LL_IOC_LOV_SWAP_LAYOUTS: {
+               struct md_op_data       *op_data = karg;
+               struct lmv_tgt_desc     *tgt1, *tgt2;
+
+               tgt1 = lmv_find_target(lmv, &op_data->op_fid1);
+               tgt2 = lmv_find_target(lmv, &op_data->op_fid2);
+               if ((tgt1->ltd_exp == NULL) || (tgt2->ltd_exp == NULL))
+                       RETURN(-EINVAL);
+
+               /* only files on same MDT can be have their layouts swapped */
+               if (tgt1->ltd_idx != tgt2->ltd_idx)
+                       RETURN(-EPERM);
+
+               rc = obd_iocontrol(cmd, lmv->tgts[0]->ltd_exp, len, karg, uarg);
+               break;
+       }
+       default:
+               for (i = 0; i < count; i++) {
+                       struct obd_device *mdc_obd;
+                       int err;
+
+                       if (lmv->tgts[i] == NULL ||
+                           lmv->tgts[i]->ltd_exp == NULL)
+                               continue;
+                       /* ll_umount_begin() sets force flag but for lmv, not
+                        * mdc. Let's pass it through */
+                       mdc_obd = class_exp2obd(lmv->tgts[i]->ltd_exp);
+                       mdc_obd->obd_force = obddev->obd_force;
+                       err = obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp, len,
+                                           karg, uarg);
+                       if (err == -ENODATA && cmd == OBD_IOC_POLL_QUOTACHECK) {
+                               RETURN(err);
+                       } else if (err) {
+                               if (lmv->tgts[i]->ltd_active) {
+                                       CERROR("error: iocontrol MDC %s on MDT"
+                                              "idx %d cmd %x: err = %d\n",
+                                               lmv->tgts[i]->ltd_uuid.uuid,
+                                               i, cmd, err);
+                                       if (!rc)
+                                               rc = err;
+                               }
+                       } else
+                               set = 1;
                 }
                 if (!set && !rc)
                         rc = -EIO;
         }
-        }
         RETURN(rc);
 }
 
@@ -913,13 +1040,41 @@ static int lmv_placement_policy(struct obd_device *obd,
                                 struct md_op_data *op_data,
                                 mdsno_t *mds)
 {
+       struct lmv_obd          *lmv = &obd->u.lmv;
+       ENTRY;
+
        LASSERT(mds != NULL);
 
-       /* Allocate new fid on target according to to different
-        * QOS policy. In DNE phase I, llite should always tell
-        * which MDT where the dir will be located */
-       *mds = op_data->op_mds;
+       if (lmv->desc.ld_tgt_count == 1) {
+               *mds = 0;
+               RETURN(0);
+       }
 
+       /**
+        * If stripe_offset is provided during setdirstripe
+        * (setdirstripe -i xx), xx MDS will be choosen.
+        */
+       if (op_data->op_cli_flags & CLI_SET_MEA) {
+               struct lmv_user_md *lum;
+
+               lum = (struct lmv_user_md *)op_data->op_data;
+               if (lum->lum_type == LMV_STRIPE_TYPE &&
+                   lum->lum_stripe_offset != -1) {
+                       if (lum->lum_stripe_offset >= lmv->desc.ld_tgt_count) {
+                               CERROR("%s: Stripe_offset %d > MDT count %d:"
+                                      " rc = %d\n", obd->obd_name,
+                                      lum->lum_stripe_offset,
+                                      lmv->desc.ld_tgt_count, -ERANGE);
+                               RETURN(-ERANGE);
+                       }
+                       *mds = lum->lum_stripe_offset;
+                       RETURN(0);
+               }
+       }
+
+       /* Allocate new fid on target according to operation type and parent
+        * home mds. */
+       *mds = op_data->op_mds;
        RETURN(0);
 }
 
@@ -938,8 +1093,8 @@ int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
          */
        mutex_lock(&tgt->ltd_fid_mutex);
 
-        if (!tgt->ltd_active)
-                GOTO(out, rc = -ENODEV);
+       if (tgt == NULL || tgt->ltd_active == 0 || tgt->ltd_exp == NULL)
+               GOTO(out, rc = -ENODEV);
 
         /*
          * Asking underlaying tgt layer to allocate new fid.
@@ -990,7 +1145,6 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         struct lprocfs_static_vars  lvars;
         struct lmv_desc            *desc;
         int                         rc;
-        int                         i = 0;
         ENTRY;
 
         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
@@ -1005,36 +1159,25 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                 RETURN(-EINVAL);
         }
 
-        lmv->tgts_size = LMV_MAX_TGT_COUNT * sizeof(struct lmv_tgt_desc);
-
-        OBD_ALLOC(lmv->tgts, lmv->tgts_size);
-        if (lmv->tgts == NULL)
-                RETURN(-ENOMEM);
-
-        for (i = 0; i < LMV_MAX_TGT_COUNT; i++) {
-               mutex_init(&lmv->tgts[i].ltd_fid_mutex);
-                lmv->tgts[i].ltd_idx = i;
-        }
-
-        lmv->datas_size = LMV_MAX_TGT_COUNT * sizeof(struct obd_connect_data);
+       OBD_ALLOC(lmv->tgts, sizeof(*lmv->tgts) * 32);
+       if (lmv->tgts == NULL)
+               RETURN(-ENOMEM);
+       lmv->tgts_size = 32;
 
-        OBD_ALLOC(lmv->datas, lmv->datas_size);
-        if (lmv->datas == NULL)
-                GOTO(out_free_tgts, rc = -ENOMEM);
-
-        obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
-        lmv->desc.ld_tgt_count = 0;
-        lmv->desc.ld_active_tgt_count = 0;
-        lmv->max_cookiesize = 0;
-        lmv->max_def_easize = 0;
-        lmv->max_easize = 0;
-        lmv->lmv_placement = PLACEMENT_CHAR_POLICY;
+       obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
+       lmv->desc.ld_tgt_count = 0;
+       lmv->desc.ld_active_tgt_count = 0;
+       lmv->max_cookiesize = 0;
+       lmv->max_def_easize = 0;
+       lmv->max_easize = 0;
+       lmv->lmv_placement = PLACEMENT_CHAR_POLICY;
 
        spin_lock_init(&lmv->lmv_lock);
        mutex_init(&lmv->init_mutex);
 
        lprocfs_lmv_init_vars(&lvars);
-        lprocfs_obd_setup(obd, lvars.obd_vars);
+
+       lprocfs_obd_setup(obd, lvars.obd_vars);
 #ifdef LPROCFS
         {
                 rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd",
@@ -1044,21 +1187,16 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                                obd->obd_name, rc);
        }
 #endif
-        rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
-                             LUSTRE_CLI_FLD_HASH_DHT);
-        if (rc) {
-                CERROR("Can't init FLD, err %d\n", rc);
-                GOTO(out_free_datas, rc);
-        }
+       rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
+                            LUSTRE_CLI_FLD_HASH_DHT);
+       if (rc) {
+               CERROR("Can't init FLD, err %d\n", rc);
+               GOTO(out, rc);
+       }
 
         RETURN(0);
 
-out_free_datas:
-        OBD_FREE(lmv->datas, lmv->datas_size);
-        lmv->datas = NULL;
-out_free_tgts:
-        OBD_FREE(lmv->tgts, lmv->tgts_size);
-        lmv->tgts = NULL;
+out:
         return rc;
 }
 
@@ -1068,34 +1206,49 @@ static int lmv_cleanup(struct obd_device *obd)
        ENTRY;
 
        fld_client_fini(&lmv->lmv_fld);
-       OBD_FREE(lmv->datas, lmv->datas_size);
-       OBD_FREE(lmv->tgts, lmv->tgts_size);
-
+       if (lmv->tgts != NULL) {
+               int i;
+               for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+                       if (lmv->tgts[i] == NULL)
+                               continue;
+                       lmv_del_target(lmv, i);
+               }
+               OBD_FREE(lmv->tgts, sizeof(*lmv->tgts) * lmv->tgts_size);
+               lmv->tgts_size = 0;
+       }
        RETURN(0);
 }
 
 static int lmv_process_config(struct obd_device *obd, obd_count len, void *buf)
 {
-        struct lustre_cfg     *lcfg = buf;
-        struct obd_uuid        tgt_uuid;
-        int                    rc;
-        ENTRY;
+       struct lustre_cfg       *lcfg = buf;
+       struct obd_uuid         obd_uuid;
+       int                     gen;
+       __u32                   index;
+       int                     rc;
+       ENTRY;
 
-        switch(lcfg->lcfg_command) {
-        case LCFG_ADD_MDC:
-                if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(tgt_uuid.uuid))
-                        GOTO(out, rc = -EINVAL);
-
-                obd_str2uuid(&tgt_uuid, lustre_cfg_string(lcfg, 1));
-                rc = lmv_add_target(obd, &tgt_uuid);
-                GOTO(out, rc);
-        default: {
-                CERROR("Unknown command: %d\n", lcfg->lcfg_command);
-                GOTO(out, rc = -EINVAL);
-        }
-        }
+       switch (lcfg->lcfg_command) {
+       case LCFG_ADD_MDC:
+               /* modify_mdc_tgts add 0:lustre-clilmv  1:lustre-MDT0000_UUID
+                * 2:0  3:1  4:lustre-MDT0000-mdc_UUID */
+               if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid))
+                       GOTO(out, rc = -EINVAL);
+
+               obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));
+
+               if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
+                       GOTO(out, rc = -EINVAL);
+               if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
+                       GOTO(out, rc = -EINVAL);
+               rc = lmv_add_target(obd, &obd_uuid, index, gen);
+               GOTO(out, rc);
+       default:
+               CERROR("Unknown command: %d\n", lcfg->lcfg_command);
+               GOTO(out, rc = -EINVAL);
+       }
 out:
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int lmv_statfs(const struct lu_env *env, struct obd_export *exp,
@@ -1117,17 +1270,17 @@ static int lmv_statfs(const struct lu_env *env, struct obd_export *exp,
                 RETURN(-ENOMEM);
 
         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
-                if (lmv->tgts[i].ltd_exp == NULL)
-                        continue;
+               if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
+                       continue;
 
-                rc = obd_statfs(env, lmv->tgts[i].ltd_exp, temp,
-                                max_age, flags);
-                if (rc) {
-                        CERROR("can't stat MDS #%d (%s), error %d\n", i,
-                               lmv->tgts[i].ltd_exp->exp_obd->obd_name,
-                               rc);
-                        GOTO(out_free_temp, rc);
-                }
+               rc = obd_statfs(env, lmv->tgts[i]->ltd_exp, temp,
+                               max_age, flags);
+               if (rc) {
+                       CERROR("can't stat MDS #%d (%s), error %d\n", i,
+                              lmv->tgts[i]->ltd_exp->exp_obd->obd_name,
+                              rc);
+                       GOTO(out_free_temp, rc);
+               }
 
                if (i == 0) {
                        *osfs = *temp;
@@ -1166,8 +1319,8 @@ static int lmv_getstatus(struct obd_export *exp,
         if (rc)
                 RETURN(rc);
 
-        rc = md_getstatus(lmv->tgts[0].ltd_exp, fid, pc);
-        RETURN(rc);
+       rc = md_getstatus(lmv->tgts[0]->ltd_exp, fid, pc);
+       RETURN(rc);
 }
 
 static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
@@ -1249,8 +1402,7 @@ static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data,
         RETURN(rc);
 }
 
-static int lmv_change_cbdata(struct obd_export *exp, const struct lu_fid *fid,
-                             ldlm_iterator_t it, void *data)
+static int lmv_null_inode(struct obd_export *exp, const struct lu_fid *fid)
 {
         struct obd_device   *obd = exp->exp_obd;
         struct lmv_obd      *lmv = &obd->u.lmv;
@@ -1264,15 +1416,18 @@ static int lmv_change_cbdata(struct obd_export *exp, const struct lu_fid *fid,
 
         CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
 
-        /*
-         * With CMD every object can have two locks in different namespaces:
-         * lookup lock in space of mds storing direntry and update/open lock in
-         * space of mds storing inode.
-         */
-        for (i = 0; i < lmv->desc.ld_tgt_count; i++)
-                md_change_cbdata(lmv->tgts[i].ltd_exp, fid, it, data);
+       /*
+        * With DNE every object can have two locks in different namespaces:
+        * lookup lock in space of MDT storing direntry and update/open lock in
+        * space of MDT storing inode.
+        */
+       for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+               if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
+                       continue;
+               md_null_inode(lmv->tgts[i]->ltd_exp, fid);
+       }
 
-        RETURN(0);
+       RETURN(0);
 }
 
 static int lmv_find_cbdata(struct obd_export *exp, const struct lu_fid *fid,
@@ -1290,18 +1445,20 @@ static int lmv_find_cbdata(struct obd_export *exp, const struct lu_fid *fid,
 
         CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
 
-        /*
-         * With CMD every object can have two locks in different namespaces:
-         * lookup lock in space of mds storing direntry and update/open lock in
-         * space of mds storing inode.
-         */
-        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
-                rc = md_find_cbdata(lmv->tgts[i].ltd_exp, fid, it, data);
-                if (rc)
-                        RETURN(rc);
-        }
+       /*
+        * With DNE every object can have two locks in different namespaces:
+        * lookup lock in space of MDT storing direntry and update/open lock in
+        * space of MDT storing inode.
+        */
+       for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+               if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
+                       continue;
+               rc = md_find_cbdata(lmv->tgts[i]->ltd_exp, fid, it, data);
+               if (rc)
+                       RETURN(rc);
+       }
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 
@@ -1757,36 +1914,6 @@ static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
         RETURN(rc);
 }
 
-#if 0
-static void lmv_hash_adjust(__u64 *hash, __u64 hash_adj)
-{
-        __u64         val;
-
-        val = le64_to_cpu(*hash);
-        if (val < hash_adj)
-                val += MAX_HASH_SIZE;
-        if (val != MDS_DIR_END_OFF)
-                *hash = cpu_to_le64(val - hash_adj);
-}
-
-static __u32 lmv_node_rank(struct obd_export *exp, const struct lu_fid *fid)
-{
-        __u64              id;
-        struct obd_import *imp;
-
-        /*
-         * XXX: to get nid we assume that underlying obd device is mdc.
-         */
-        imp  = class_exp2cliimp(exp);
-        id   = imp->imp_connection->c_self + fid_flatten(fid);
-
-        CDEBUG(D_INODE, "Readpage node rank: "LPX64" "DFID" "LPX64" "LPX64"\n",
-               imp->imp_connection->c_self, PFID(fid), id, id ^ (id >> 32));
-
-        return id ^ (id >> 32);
-}
-#endif
-
 static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data,
                         struct page **pages, struct ptlrpc_request **request)
 {
@@ -1830,6 +1957,8 @@ static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data,
         * since client ask dir from MDS{R} client has pages with offsets
         * [R*MAX_HASH/N ... (R + 1)*MAX_HASH/N] there for we do hash_adj
         * on hash  values that we get.
+        * Since these codes might be still useful for sharded directory, so
+        * Keeping this code for further reference
        if (0) {
                LASSERT(nr > 0);
                seg_size = MAX_HASH_SIZE;
@@ -1936,14 +2065,19 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
        struct obd_device       *obd = exp->exp_obd;
        struct lmv_obd          *lmv = &obd->u.lmv;
        struct lmv_tgt_desc     *tgt = NULL;
-       int                      rc;
+       struct mdt_body         *body;
+       int                     rc;
        ENTRY;
 
        rc = lmv_check_connect(obd);
        if (rc)
                RETURN(rc);
-
-       tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
+retry:
+       /* Send unlink requests to the MDT where the child is located */
+       if (likely(!fid_is_zero(&op_data->op_fid2)))
+               tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
+       else
+               tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
        if (IS_ERR(tgt))
                RETURN(PTR_ERR(tgt));
 
@@ -1969,9 +2103,48 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
        if (rc != 0)
                RETURN(rc);
 
+       CDEBUG(D_INODE, "unlink with fid="DFID"/"DFID" -> mds #%d\n",
+              PFID(&op_data->op_fid1), PFID(&op_data->op_fid2), tgt->ltd_idx);
+
        rc = md_unlink(tgt->ltd_exp, op_data, request);
+       if (rc != 0 && rc != -EREMOTE)
+               RETURN(rc);
 
-       RETURN(rc);
+       body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
+       if (body == NULL)
+               RETURN(-EPROTO);
+
+       /* Not cross-ref case, just get out of here. */
+       if (likely(!(body->valid & OBD_MD_MDS)))
+               RETURN(0);
+
+       CDEBUG(D_INODE, "%s: try unlink to another MDT for "DFID"\n",
+              exp->exp_obd->obd_name, PFID(&body->fid1));
+
+       /* This is a remote object, try remote MDT, Note: it may
+        * try more than 1 time here, Considering following case
+        * /mnt/lustre is root on MDT0, remote1 is on MDT1
+        * 1. Initially A does not know where remote1 is, it send
+        *    unlink RPC to MDT0, MDT0 return -EREMOTE, it will
+        *    resend unlink RPC to MDT1 (retry 1st time).
+        *
+        * 2. During the unlink RPC in flight,
+        *    client B mv /mnt/lustre/remote1 /mnt/lustre/remote2
+        *    and create new remote1, but on MDT0
+        *
+        * 3. MDT1 get unlink RPC(from A), then do remote lock on
+        *    /mnt/lustre, then lookup get fid of remote1, and find
+        *    it is remote dir again, and replay -EREMOTE again.
+        *
+        * 4. Then A will resend unlink RPC to MDT0. (retry 2nd times).
+        *
+        * In theory, it might try unlimited time here, but it should
+        * be very rare case.  */
+       op_data->op_fid2 = body->fid1;
+       ptlrpc_req_finished(*request);
+       *request = NULL;
+
+       goto retry;
 }
 
 static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
@@ -2012,7 +2185,7 @@ static int lmv_get_info(const struct lu_env *env, struct obd_export *exp,
 
         lmv = &obd->u.lmv;
         if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
-                struct lmv_tgt_desc *tgts;
+               struct lmv_tgt_desc *tgt;
                 int i;
 
                 rc = lmv_check_connect(obd);
@@ -2020,20 +2193,17 @@ static int lmv_get_info(const struct lu_env *env, struct obd_export *exp,
                         RETURN(rc);
 
                 LASSERT(*vallen == sizeof(__u32));
-                for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count;
-                     i++, tgts++) {
-
-                        /*
-                         * All tgts should be connected when this gets called.
-                         */
-                        if (!tgts || !tgts->ltd_exp) {
-                                CERROR("target not setup?\n");
-                                continue;
-                        }
-
-                        if (!obd_get_info(env, tgts->ltd_exp, keylen, key,
-                                          vallen, val, NULL))
-                                RETURN(0);
+               for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+                       tgt = lmv->tgts[i];
+                       /*
+                        * All tgts should be connected when this gets called.
+                        */
+                       if (tgt == NULL || tgt->ltd_exp == NULL)
+                               continue;
+
+                       if (!obd_get_info(env, tgt->ltd_exp, keylen, key,
+                                         vallen, val, NULL))
+                               RETURN(0);
                 }
                 RETURN(-EINVAL);
         } else if (KEY_IS(KEY_MAX_EASIZE) || KEY_IS(KEY_CONN_DATA)) {
@@ -2041,16 +2211,14 @@ static int lmv_get_info(const struct lu_env *env, struct obd_export *exp,
                 if (rc)
                         RETURN(rc);
 
-                /*
-                 * Forwarding this request to first MDS, it should know LOV
-                 * desc.
-                 */
-                rc = obd_get_info(env, lmv->tgts[0].ltd_exp, keylen, key,
-                                  vallen, val, NULL);
-                if (!rc && KEY_IS(KEY_CONN_DATA)) {
-                        exp->exp_connect_flags =
-                        ((struct obd_connect_data *)val)->ocd_connect_flags;
-                }
+               /*
+                * Forwarding this request to first MDS, it should know LOV
+                * desc.
+                */
+               rc = obd_get_info(env, lmv->tgts[0]->ltd_exp, keylen, key,
+                                 vallen, val, NULL);
+               if (!rc && KEY_IS(KEY_CONN_DATA))
+                       exp->exp_connect_data = *(struct obd_connect_data *)val;
                 RETURN(rc);
         } else if (KEY_IS(KEY_TGT_COUNT)) {
                 *((int *)val) = lmv->desc.ld_tgt_count;
@@ -2082,11 +2250,11 @@ int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp,
         if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX)) {
                 int i, err = 0;
 
-                for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
-                        tgt = &lmv->tgts[i];
+               for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+                       tgt = lmv->tgts[i];
 
-                        if (!tgt->ltd_exp)
-                                continue;
+                       if (tgt == NULL || tgt->ltd_exp == NULL)
+                               continue;
 
                         err = obd_set_info_async(env, tgt->ltd_exp,
                                                  keylen, key, vallen, val, set);
@@ -2141,10 +2309,10 @@ int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
         meap->mea_count = cpu_to_le32(lsmp->mea_count);
         meap->mea_master = cpu_to_le32(lsmp->mea_master);
 
-        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
-                meap->mea_ids[i] = meap->mea_ids[i];
-                fid_cpu_to_le(&meap->mea_ids[i], &meap->mea_ids[i]);
-        }
+       for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+               meap->mea_ids[i] = lsmp->mea_ids[i];
+               fid_cpu_to_le(&meap->mea_ids[i], &lsmp->mea_ids[i]);
+       }
 
         RETURN(mea_size);
 }
@@ -2218,27 +2386,27 @@ static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
         LASSERT(fid != NULL);
 
         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
-                if (!lmv->tgts[i].ltd_exp || !lmv->tgts[i].ltd_active)
-                        continue;
+               if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL ||
+                   lmv->tgts[i]->ltd_active == 0)
+                       continue;
 
-                err = md_cancel_unused(lmv->tgts[i].ltd_exp, fid,
-                                       policy, mode, flags, opaque);
-                if (!rc)
-                        rc = err;
+               err = md_cancel_unused(lmv->tgts[i]->ltd_exp, fid,
+                                      policy, mode, flags, opaque);
+               if (!rc)
+                       rc = err;
         }
-        RETURN(rc);
+       RETURN(rc);
 }
 
 int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
                       __u64 *bits)
 {
-        struct obd_device       *obd = exp->exp_obd;
-        struct lmv_obd          *lmv = &obd->u.lmv;
-        int                      rc;
-        ENTRY;
+       struct lmv_obd          *lmv = &exp->exp_obd->u.lmv;
+       int                      rc;
+       ENTRY;
 
-        rc =  md_set_lock_data(lmv->tgts[0].ltd_exp, lockh, data, bits);
-        RETURN(rc);
+       rc =  md_set_lock_data(lmv->tgts[0]->ltd_exp, lockh, data, bits);
+       RETURN(rc);
 }
 
 ldlm_mode_t lmv_lock_match(struct obd_export *exp, __u64 flags,
@@ -2261,7 +2429,12 @@ ldlm_mode_t lmv_lock_match(struct obd_export *exp, __u64 flags,
          * one fid was created in.
          */
         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
-                rc = md_lock_match(lmv->tgts[i].ltd_exp, flags, fid,
+               if (lmv->tgts[i] == NULL ||
+                   lmv->tgts[i]->ltd_exp == NULL ||
+                   lmv->tgts[i]->ltd_active == 0)
+                       continue;
+
+               rc = md_lock_match(lmv->tgts[i]->ltd_exp, flags, fid,
                                    type, policy, mode, lockh);
                 if (rc)
                         RETURN(rc);
@@ -2271,26 +2444,23 @@ ldlm_mode_t lmv_lock_match(struct obd_export *exp, __u64 flags,
 }
 
 int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
-                      struct obd_export *dt_exp, struct obd_export *md_exp,
-                      struct lustre_md *md)
+                     struct obd_export *dt_exp, struct obd_export *md_exp,
+                     struct lustre_md *md)
 {
-        struct obd_device       *obd = exp->exp_obd;
-        struct lmv_obd          *lmv = &obd->u.lmv;
-        int                      rc;
-        ENTRY;
-        rc = md_get_lustre_md(lmv->tgts[0].ltd_exp, req, dt_exp, md_exp, md);
-        RETURN(rc);
+       struct lmv_obd          *lmv = &exp->exp_obd->u.lmv;
+
+       return md_get_lustre_md(lmv->tgts[0]->ltd_exp, req, dt_exp, md_exp, md);
 }
 
 int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
 {
-        struct obd_device       *obd = exp->exp_obd;
-        struct lmv_obd          *lmv = &obd->u.lmv;
-        ENTRY;
+       struct obd_device       *obd = exp->exp_obd;
+       struct lmv_obd          *lmv = &obd->u.lmv;
+       ENTRY;
 
-        if (md->mea)
-                obd_free_memmd(exp, (void *)&md->mea);
-        RETURN(md_free_lustre_md(lmv->tgts[0].ltd_exp, md));
+       if (md->mea)
+               obd_free_memmd(exp, (void *)&md->mea);
+       RETURN(md_free_lustre_md(lmv->tgts[0]->ltd_exp, md));
 }
 
 int lmv_set_open_replay_data(struct obd_export *exp,
@@ -2369,15 +2539,11 @@ static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc,
 }
 
 int lmv_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req,
-                    const struct req_msg_field *field, struct obd_capa **oc)
+                   const struct req_msg_field *field, struct obd_capa **oc)
 {
-        struct obd_device *obd = exp->exp_obd;
-        struct lmv_obd *lmv = &obd->u.lmv;
-        int rc;
+       struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
 
-        ENTRY;
-        rc = md_unpack_capa(lmv->tgts[0].ltd_exp, req, field, oc);
-        RETURN(rc);
+       return md_unpack_capa(lmv->tgts[0]->ltd_exp, req, field, oc);
 }
 
 int lmv_intent_getattr_async(struct obd_export *exp,
@@ -2430,14 +2596,14 @@ int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
  * we directly fetch data from the slave MDTs.
  */
 int lmv_quotactl(struct obd_device *unused, struct obd_export *exp,
-                 struct obd_quotactl *oqctl)
+                struct obd_quotactl *oqctl)
 {
-        struct obd_device   *obd = class_exp2obd(exp);
-        struct lmv_obd      *lmv = &obd->u.lmv;
-        struct lmv_tgt_desc *tgt = &lmv->tgts[0];
-        int                  rc = 0, i;
-        __u64                curspace, curinodes;
-        ENTRY;
+       struct obd_device   *obd = class_exp2obd(exp);
+       struct lmv_obd      *lmv = &obd->u.lmv;
+       struct lmv_tgt_desc *tgt = lmv->tgts[0];
+       int                  rc = 0, i;
+       __u64                curspace, curinodes;
+       ENTRY;
 
         if (!lmv->desc.ld_tgt_count || !tgt->ltd_active) {
                 CERROR("master lmv inactive\n");
@@ -2451,11 +2617,11 @@ int lmv_quotactl(struct obd_device *unused, struct obd_export *exp,
 
         curspace = curinodes = 0;
         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
-                int err;
-                tgt = &lmv->tgts[i];
+               int err;
+               tgt = lmv->tgts[i];
 
-                if (tgt->ltd_exp == NULL)
-                        continue;
+               if (tgt == NULL || tgt->ltd_exp == NULL || tgt->ltd_active == 0)
+                       continue;
                 if (!tgt->ltd_active) {
                         CDEBUG(D_HA, "mdt %d is inactive.\n", i);
                         continue;
@@ -2486,13 +2652,13 @@ int lmv_quotacheck(struct obd_device *unused, struct obd_export *exp,
         int                  i, rc = 0;
         ENTRY;
 
-        for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
+       for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
                 int err;
-
-                if (!tgt->ltd_active) {
-                        CERROR("lmv idx %d inactive\n", i);
-                        RETURN(-EIO);
-                }
+               tgt = lmv->tgts[i];
+               if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active) {
+                       CERROR("lmv idx %d inactive\n", i);
+                       RETURN(-EIO);
+               }
 
                 err = obd_quotacheck(tgt->ltd_exp, oqctl);
                 if (err && !rc)
@@ -2524,7 +2690,7 @@ struct obd_ops lmv_obd_ops = {
 
 struct md_ops lmv_md_ops = {
         .m_getstatus            = lmv_getstatus,
-        .m_change_cbdata        = lmv_change_cbdata,
+        .m_null_inode          = lmv_null_inode,
         .m_find_cbdata          = lmv_find_cbdata,
         .m_close                = lmv_close,
         .m_create               = lmv_create,