Whamcloud - gitweb
LU-3068 build: fix 'incorrect expression' errors
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
index c8fda0a..e55f4a0 100644 (file)
@@ -344,7 +344,6 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
 #endif
         struct lmv_obd          *lmv = &obd->u.lmv;
         struct obd_uuid         *cluuid = &lmv->cluuid;
-        struct obd_connect_data *mdc_data = NULL;
         struct obd_uuid          lmv_mdc_uuid = { "LMV_MDC_UUID" };
         struct obd_device       *mdc_obd;
         struct obd_export       *mdc_exp;
@@ -389,8 +388,6 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
 
         fld_client_add_target(&lmv->lmv_fld, &target);
 
-        mdc_data = &class_exp2cliimp(mdc_exp)->imp_connect_data;
-
         rc = obd_register_observer(mdc_obd, obd);
         if (rc) {
                 obd_disconnect(mdc_exp);
@@ -733,6 +730,88 @@ out_local:
         RETURN(rc);
 }
 
+/* Handle OBD_IOC_FID2PATH for the getinfo_fid2path in karg.  While an MDT
+ * answers -EREMOTE, retry on the target found for the FID now in the request
+ * and prepend each new path segment.  Returns 0 or a negative errno. */
+static int lmv_fid2path(struct obd_export *exp, int len, void *karg, void *uarg)
+{
+       struct obd_device       *obddev = class_exp2obd(exp);
+       struct lmv_obd          *lmv = &obddev->u.lmv;
+       struct getinfo_fid2path *ori_gf = karg;
+       struct getinfo_fid2path *gf = ori_gf;
+       struct getinfo_fid2path *remote_gf = NULL;
+       struct lmv_tgt_desc     *tgt;
+       int                     remote_gf_size = 0;
+       int                     rc;
+       ENTRY;
+
+       tgt = lmv_find_target(lmv, &gf->gf_fid);
+       if (IS_ERR(tgt))
+               RETURN(PTR_ERR(tgt));
+
+repeat_fid2path:
+       rc = obd_iocontrol(OBD_IOC_FID2PATH, tgt->ltd_exp, len, gf, uarg);
+       if (rc != 0 && rc != -EREMOTE)
+               GOTO(out_fid2path, rc);
+
+       /* If remote_gf != NULL, this pass built a path segment in the scratch
+        * request; prepend it, followed by '/', to the path held in karg. */
+       if (remote_gf != NULL) {
+               char    *ptr = ori_gf->gf_path;
+               size_t  ori_len = strlen(ptr);
+               size_t  seg_len = strlen(gf->gf_path);
+
+               /* both strings, the '/' and the NUL terminator must fit */
+               if (ori_len + seg_len + 2 > ori_gf->gf_pathlen)
+                       GOTO(out_fid2path, rc = -EOVERFLOW);
+
+               /* shift the old path right together with its terminator */
+               memmove(ptr + seg_len + 1, ptr, ori_len + 1);
+               memcpy(ptr, gf->gf_path, seg_len);
+               ptr[seg_len] = '/';
+       }
+
+       CDEBUG(D_INFO, "%s: get path %s "DFID" rec: "LPU64" ln: %u\n",
+              tgt->ltd_exp->exp_obd->obd_name,
+              gf->gf_path, PFID(&gf->gf_fid), gf->gf_recno,
+              gf->gf_linkno);
+
+       if (rc == 0)
+               GOTO(out_fid2path, rc);
+
+       /* -EREMOTE: go on with a scratch request so karg keeps its path */
+       if (remote_gf == NULL) {
+               remote_gf_size = sizeof(*remote_gf) + PATH_MAX;
+               OBD_ALLOC(remote_gf, remote_gf_size);
+               if (remote_gf == NULL)
+                       GOTO(out_fid2path, rc = -ENOMEM);
+               remote_gf->gf_pathlen = PATH_MAX;
+       }
+
+       if (!fid_is_sane(&gf->gf_fid)) {
+               CERROR("%s: invalid FID "DFID": rc = %d\n",
+                      tgt->ltd_exp->exp_obd->obd_name,
+                      PFID(&gf->gf_fid), -EINVAL);
+               GOTO(out_fid2path, rc = -EINVAL);
+       }
+
+       tgt = lmv_find_target(lmv, &gf->gf_fid);
+       if (IS_ERR(tgt))
+               GOTO(out_fid2path, rc = -EINVAL);
+
+       remote_gf->gf_fid = gf->gf_fid;
+       remote_gf->gf_recno = -1;
+       remote_gf->gf_linkno = -1;
+       memset(remote_gf->gf_path, 0, remote_gf->gf_pathlen);
+       gf = remote_gf;
+       goto repeat_fid2path;
+
+out_fid2path:
+       if (remote_gf != NULL)
+               OBD_FREE(remote_gf, remote_gf_size);
+       RETURN(rc);
+}
+
 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
                          int len, void *karg, void *uarg)
 {
@@ -853,26 +932,25 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
                break;
        }
        case OBD_IOC_FID2PATH: {
-               struct getinfo_fid2path *gf;
-               struct lmv_tgt_desc     *tgt;
-
-               gf = (struct getinfo_fid2path *)karg;
-               tgt = lmv_find_target(lmv, &gf->gf_fid);
-               if (IS_ERR(tgt))
-                       RETURN(PTR_ERR(tgt));
-               rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
+               rc = lmv_fid2path(exp, len, karg, uarg);
                break;
        }
        case LL_IOC_HSM_STATE_GET:
        case LL_IOC_HSM_STATE_SET:
-       case LL_IOC_HSM_ACTION: {
+       case LL_IOC_HSM_ACTION:
+       case LL_IOC_LOV_SWAP_LAYOUTS: {
                struct md_op_data       *op_data = karg;
-               struct lmv_tgt_desc     *tgt;
+               struct lmv_tgt_desc     *tgt1, *tgt2;
 
-               tgt = lmv_find_target(lmv, &op_data->op_fid1);
-               if (!tgt->ltd_exp)
+               tgt1 = lmv_find_target(lmv, &op_data->op_fid1);
+               tgt2 = lmv_find_target(lmv, &op_data->op_fid2);
+               if ((tgt1->ltd_exp == NULL) || (tgt2->ltd_exp == NULL))
                        RETURN(-EINVAL);
 
+               /* only files on the same MDT can have their layouts swapped */
+               if (tgt1->ltd_idx != tgt2->ltd_idx)
+                       RETURN(-EPERM);
+
                rc = obd_iocontrol(cmd, lmv->tgts[0]->ltd_exp, len, karg, uarg);
                break;
        }
@@ -962,13 +1040,41 @@ static int lmv_placement_policy(struct obd_device *obd,
                                 struct md_op_data *op_data,
                                 mdsno_t *mds)
 {
+       struct lmv_obd          *lmv = &obd->u.lmv;
+       ENTRY;
+
        LASSERT(mds != NULL);
 
-       /* Allocate new fid on target according to to different
-        * QOS policy. In DNE phase I, llite should always tell
-        * which MDT where the dir will be located */
-       *mds = op_data->op_mds;
+       if (lmv->desc.ld_tgt_count == 1) {
+               *mds = 0;
+               RETURN(0);
+       }
 
+       /**
+        * If stripe_offset is provided during setdirstripe
+        * (setdirstripe -i xx), xx MDS will be chosen.
+        */
+       if (op_data->op_cli_flags & CLI_SET_MEA) {
+               struct lmv_user_md *lum;
+
+               lum = (struct lmv_user_md *)op_data->op_data;
+               if (lum->lum_type == LMV_STRIPE_TYPE &&
+                   lum->lum_stripe_offset != -1) {
+                       if (lum->lum_stripe_offset >= lmv->desc.ld_tgt_count) {
+                               CERROR("%s: Stripe_offset %d > MDT count %d:"
+                                      " rc = %d\n", obd->obd_name,
+                                      lum->lum_stripe_offset,
+                                      lmv->desc.ld_tgt_count, -ERANGE);
+                               RETURN(-ERANGE);
+                       }
+                       *mds = lum->lum_stripe_offset;
+                       RETURN(0);
+               }
+       }
+
+       /* Allocate new fid on target according to operation type and parent
+        * home mds. */
+       *mds = op_data->op_mds;
        RETURN(0);
 }
 
@@ -1296,8 +1402,7 @@ static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data,
         RETURN(rc);
 }
 
-static int lmv_change_cbdata(struct obd_export *exp, const struct lu_fid *fid,
-                             ldlm_iterator_t it, void *data)
+static int lmv_null_inode(struct obd_export *exp, const struct lu_fid *fid)
 {
         struct obd_device   *obd = exp->exp_obd;
         struct lmv_obd      *lmv = &obd->u.lmv;
@@ -1319,7 +1424,7 @@ static int lmv_change_cbdata(struct obd_export *exp, const struct lu_fid *fid,
        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
                if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
                        continue;
-               md_change_cbdata(lmv->tgts[i]->ltd_exp, fid, it, data);
+               md_null_inode(lmv->tgts[i]->ltd_exp, fid);
        }
 
        RETURN(0);
@@ -1960,14 +2065,19 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
        struct obd_device       *obd = exp->exp_obd;
        struct lmv_obd          *lmv = &obd->u.lmv;
        struct lmv_tgt_desc     *tgt = NULL;
-       int                      rc;
+       struct mdt_body         *body;
+       int                     rc;
        ENTRY;
 
        rc = lmv_check_connect(obd);
        if (rc)
                RETURN(rc);
-
-       tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
+retry:
+       /* Send unlink requests to the MDT where the child is located */
+       if (likely(!fid_is_zero(&op_data->op_fid2)))
+               tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
+       else
+               tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
        if (IS_ERR(tgt))
                RETURN(PTR_ERR(tgt));
 
@@ -1993,9 +2103,48 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
        if (rc != 0)
                RETURN(rc);
 
+       CDEBUG(D_INODE, "unlink with fid="DFID"/"DFID" -> mds #%d\n",
+              PFID(&op_data->op_fid1), PFID(&op_data->op_fid2), tgt->ltd_idx);
+
        rc = md_unlink(tgt->ltd_exp, op_data, request);
+       if (rc != 0 && rc != -EREMOTE)
+               RETURN(rc);
 
-       RETURN(rc);
+       body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
+       if (body == NULL)
+               RETURN(-EPROTO);
+
+       /* Not cross-ref case, just get out of here. */
+       if (likely(!(body->valid & OBD_MD_MDS)))
+               RETURN(0);
+
+       CDEBUG(D_INODE, "%s: try unlink to another MDT for "DFID"\n",
+              exp->exp_obd->obd_name, PFID(&body->fid1));
+
+       /* This is a remote object, try remote MDT, Note: it may
+        * try more than 1 time here, Considering following case
+        * /mnt/lustre is root on MDT0, remote1 is on MDT1
+        * 1. Initially A does not know where remote1 is, it send
+        *    unlink RPC to MDT0, MDT0 return -EREMOTE, it will
+        *    resend unlink RPC to MDT1 (retry 1st time).
+        *
+        * 2. During the unlink RPC in flight,
+        *    client B mv /mnt/lustre/remote1 /mnt/lustre/remote2
+        *    and create new remote1, but on MDT0
+        *
+        * 3. MDT1 get unlink RPC(from A), then do remote lock on
+        *    /mnt/lustre, then lookup get fid of remote1, and find
+        *    it is remote dir again, and reply with -EREMOTE again.
+        *
+        * 4. Then A will resend unlink RPC to MDT0. (retry 2nd times).
+        *
+        * In theory, it might try unlimited time here, but it should
+        * be very rare case.  */
+       op_data->op_fid2 = body->fid1;
+       ptlrpc_req_finished(*request);
+       *request = NULL;
+
+       goto retry;
 }
 
 static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
@@ -2160,10 +2309,10 @@ int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
         meap->mea_count = cpu_to_le32(lsmp->mea_count);
         meap->mea_master = cpu_to_le32(lsmp->mea_master);
 
-        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
-                meap->mea_ids[i] = meap->mea_ids[i];
-                fid_cpu_to_le(&meap->mea_ids[i], &meap->mea_ids[i]);
-        }
+       for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+               meap->mea_ids[i] = lsmp->mea_ids[i];
+               fid_cpu_to_le(&meap->mea_ids[i], &lsmp->mea_ids[i]);
+       }
 
         RETURN(mea_size);
 }
@@ -2541,7 +2690,7 @@ struct obd_ops lmv_obd_ops = {
 
 struct md_ops lmv_md_ops = {
         .m_getstatus            = lmv_getstatus,
-        .m_change_cbdata        = lmv_change_cbdata,
+        .m_null_inode          = lmv_null_inode,
         .m_find_cbdata          = lmv_find_cbdata,
         .m_close                = lmv_close,
         .m_create               = lmv_create,