Whamcloud - gitweb
LU-4684 migrate: migrate striped directory
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
index 5ab1cb4..29e1e76 100644 (file)
@@ -1864,156 +1864,286 @@ static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
        RETURN(rc);
 }
 
-static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
-                     const char *old, size_t oldlen,
-                     const char *new, size_t newlen,
-                     struct ptlrpc_request **request)
+static int lmv_migrate(struct obd_export *exp, struct md_op_data *op_data,
+                       const char *name, size_t namelen,
+                       struct ptlrpc_request **request)
 {
-       struct obd_device       *obd = exp->exp_obd;
-       struct lmv_obd          *lmv = &obd->u.lmv;
-       struct lmv_tgt_desc     *src_tgt;
-       struct lmv_tgt_desc     *tgt_tgt;
-       struct obd_export       *target_exp;
-       struct mdt_body         *body;
-       int                     rc;
+       struct obd_device *obd = exp->exp_obd;
+       struct lmv_obd *lmv = &obd->u.lmv;
+       struct lmv_stripe_md *lsm = op_data->op_mea1;
+       struct lmv_tgt_desc *parent_tgt;
+       struct lmv_tgt_desc *sp_tgt;
+       struct lmv_tgt_desc *tp_tgt = NULL;
+       struct lmv_tgt_desc *child_tgt;
+       struct lmv_tgt_desc *tgt;
+       struct lu_fid target_fid;
+       int rc;
+
        ENTRY;
 
-       LASSERT(oldlen != 0);
+       LASSERT(op_data->op_cli_flags & CLI_MIGRATE);
+       LASSERTF(fid_is_sane(&op_data->op_fid3), "invalid FID "DFID"\n",
+                PFID(&op_data->op_fid3));
 
-       CDEBUG(D_INODE, "RENAME %.*s in "DFID":%d to %.*s in "DFID":%d\n",
-              (int)oldlen, old, PFID(&op_data->op_fid1),
-              op_data->op_mea1 ? op_data->op_mea1->lsm_md_stripe_count : 0,
-              (int)newlen, new, PFID(&op_data->op_fid2),
-              op_data->op_mea2 ? op_data->op_mea2->lsm_md_stripe_count : 0);
+       CDEBUG(D_INODE, "MIGRATE "DFID"/%.*s\n",
+              PFID(&op_data->op_fid1), (int)namelen, name);
 
        op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
        op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
        op_data->op_cap = cfs_curproc_cap_pack();
-       if (op_data->op_cli_flags & CLI_MIGRATE) {
-               LASSERTF(fid_is_sane(&op_data->op_fid3), "invalid FID "DFID"\n",
-                        PFID(&op_data->op_fid3));
-
-               if (op_data->op_mea1 != NULL) {
-                       struct lmv_stripe_md    *lsm = op_data->op_mea1;
-                       struct lmv_tgt_desc     *tmp;
-
-                       /* Fix the parent fid for striped dir */
-                       tmp = lmv_locate_target_for_name(lmv, lsm, old,
-                                                        oldlen,
-                                                        &op_data->op_fid1,
-                                                        NULL);
-                       if (IS_ERR(tmp))
-                               RETURN(PTR_ERR(tmp));
-               }
-
-               rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
-               if (rc != 0)
-                       RETURN(rc);
 
-               src_tgt = lmv_find_target(lmv, &op_data->op_fid3);
-               if (IS_ERR(src_tgt))
-                       RETURN(PTR_ERR(src_tgt));
+       parent_tgt = lmv_find_target(lmv, &op_data->op_fid1);
+       if (IS_ERR(parent_tgt))
+               RETURN(PTR_ERR(parent_tgt));
 
-               target_exp = src_tgt->ltd_exp;
-       } else {
-               if (op_data->op_mea1 != NULL) {
-                       struct lmv_stripe_md    *lsm = op_data->op_mea1;
+       if (lsm) {
+               __u32 hash_type = lsm->lsm_md_hash_type;
+               __u32 stripe_count = lsm->lsm_md_stripe_count;
 
-                       src_tgt = lmv_locate_target_for_name(lmv, lsm, old,
-                                                            oldlen,
-                                                            &op_data->op_fid1,
-                                                            &op_data->op_mds);
-               } else {
-                       src_tgt = lmv_find_target(lmv, &op_data->op_fid1);
+               /*
+                * old stripes are appended after new stripes for migrating
+                * directory.
+                */
+               if (lsm->lsm_md_hash_type & LMV_HASH_FLAG_MIGRATION) {
+                       hash_type = lsm->lsm_md_migrate_hash;
+                       stripe_count -= lsm->lsm_md_migrate_offset;
                }
-               if (IS_ERR(src_tgt))
-                       RETURN(PTR_ERR(src_tgt));
 
+               rc = lmv_name_to_stripe_index(hash_type, stripe_count, name,
+                                             namelen);
+               if (rc < 0)
+                       RETURN(rc);
 
-               if (op_data->op_mea2 != NULL) {
-                       struct lmv_stripe_md    *lsm = op_data->op_mea2;
+               if (lsm->lsm_md_hash_type & LMV_HASH_FLAG_MIGRATION)
+                       rc += lsm->lsm_md_migrate_offset;
 
-                       tgt_tgt = lmv_locate_target_for_name(lmv, lsm, new,
-                                                            newlen,
-                                                            &op_data->op_fid2,
-                                                            &op_data->op_mds);
-               } else {
-                       tgt_tgt = lmv_find_target(lmv, &op_data->op_fid2);
+               /* save it in fid4 temporarily for early cancel */
+               op_data->op_fid4 = lsm->lsm_md_oinfo[rc].lmo_fid;
+               sp_tgt = lmv_get_target(lmv, lsm->lsm_md_oinfo[rc].lmo_mds,
+                                       NULL);
+               if (IS_ERR(sp_tgt))
+                       RETURN(PTR_ERR(sp_tgt));
 
+               /*
+                * if parent is being migrated too, fill op_fid2 with target
+                * stripe fid, otherwise the target stripe is not created yet.
+                */
+               if (lsm->lsm_md_hash_type & LMV_HASH_FLAG_MIGRATION) {
+                       hash_type = lsm->lsm_md_hash_type &
+                                   ~LMV_HASH_FLAG_MIGRATION;
+                       stripe_count = lsm->lsm_md_migrate_offset;
+
+                       rc = lmv_name_to_stripe_index(hash_type, stripe_count,
+                                                     name, namelen);
+                       if (rc < 0)
+                               RETURN(rc);
+
+                       op_data->op_fid2 = lsm->lsm_md_oinfo[rc].lmo_fid;
+                       tp_tgt = lmv_get_target(lmv,
+                                               lsm->lsm_md_oinfo[rc].lmo_mds,
+                                               NULL);
+                       if (IS_ERR(tp_tgt))
+                               RETURN(PTR_ERR(tp_tgt));
                }
-               if (IS_ERR(tgt_tgt))
-                       RETURN(PTR_ERR(tgt_tgt));
-
-               target_exp = tgt_tgt->ltd_exp;
+       } else {
+               sp_tgt = parent_tgt;
        }
 
-       /*
-        * LOOKUP lock on src child (fid3) should also be cancelled for
-        * src_tgt in mdc_rename.
-        */
-       op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
+       child_tgt = lmv_find_target(lmv, &op_data->op_fid3);
+       if (IS_ERR(child_tgt))
+               RETURN(PTR_ERR(child_tgt));
 
-       /*
-        * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
-        * own target.
-        */
-       rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
-                             LCK_EX, MDS_INODELOCK_UPDATE,
-                             MF_MDC_CANCEL_FID2);
-
-       if (rc != 0)
+       rc = lmv_fid_alloc(NULL, exp, &target_fid, op_data);
+       if (rc)
                RETURN(rc);
+
        /*
-        * Cancel LOOKUP locks on source child (fid3) for parent tgt_tgt.
+        * for directory, send migrate request to the MDT where the object will
+        * be migrated to, because we can't create a striped directory remotely.
+        *
+        * otherwise, send to the MDT where source is located because regular
+        * file may open lease.
+        *
+        * NB. if MDT doesn't support DIR_MIGRATE, send to source MDT too for
+        * backward compatibility.
         */
-       if (fid_is_sane(&op_data->op_fid3)) {
-               struct lmv_tgt_desc *tgt;
-
-               tgt = lmv_find_target(lmv, &op_data->op_fid1);
+       if (S_ISDIR(op_data->op_mode) &&
+           (exp_connect_flags2(exp) & OBD_CONNECT2_DIR_MIGRATE)) {
+               tgt = lmv_find_target(lmv, &target_fid);
                if (IS_ERR(tgt))
                        RETURN(PTR_ERR(tgt));
+       } else {
+               tgt = child_tgt;
+       }
 
-               /* Cancel LOOKUP lock on its parent */
-               rc = lmv_early_cancel(exp, tgt, op_data, src_tgt->ltd_idx,
-                                     LCK_EX, MDS_INODELOCK_LOOKUP,
-                                     MF_MDC_CANCEL_FID3);
-               if (rc != 0)
+       /* cancel UPDATE lock of parent master object */
+       rc = lmv_early_cancel(exp, parent_tgt, op_data, tgt->ltd_idx, LCK_EX,
+                             MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
+       if (rc)
+               RETURN(rc);
+
+       /* cancel UPDATE lock of source parent */
+       if (sp_tgt != parent_tgt) {
+               /*
+                * migrate RPC packs master object FID, because we can only pack
+                * two FIDs in reint RPC, but MDS needs to know both source
+                * parent and target parent, and it will obtain them from master
+                * FID and LMV, the other FID in RPC is kept for target.
+                *
+                * since this FID is not passed to MDC, cancel it anyway.
+                */
+               rc = lmv_early_cancel(exp, sp_tgt, op_data, -1, LCK_EX,
+                                     MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID4);
+               if (rc)
                        RETURN(rc);
 
-               rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
-                                     LCK_EX, MDS_INODELOCK_ELC,
+               op_data->op_flags &= ~MF_MDC_CANCEL_FID4;
+       }
+       op_data->op_fid4 = target_fid;
+
+       /* cancel UPDATE locks of target parent */
+       rc = lmv_early_cancel(exp, tp_tgt, op_data, tgt->ltd_idx, LCK_EX,
+                             MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID2);
+       if (rc)
+               RETURN(rc);
+
+       /* cancel LOOKUP lock of source if source is remote object */
+       if (child_tgt != sp_tgt) {
+               rc = lmv_early_cancel(exp, sp_tgt, op_data, tgt->ltd_idx,
+                                     LCK_EX, MDS_INODELOCK_LOOKUP,
                                      MF_MDC_CANCEL_FID3);
-               if (rc != 0)
+               if (rc)
                        RETURN(rc);
        }
 
-retry_rename:
-       /*
-        * Cancel all the locks on tgt child (fid4).
-        */
-       if (fid_is_sane(&op_data->op_fid4)) {
-               struct lmv_tgt_desc *tgt;
+       /* cancel ELC locks of source */
+       rc = lmv_early_cancel(exp, child_tgt, op_data, tgt->ltd_idx, LCK_EX,
+                             MDS_INODELOCK_ELC, MF_MDC_CANCEL_FID3);
+       if (rc)
+               RETURN(rc);
 
-               rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
-                                     LCK_EX, MDS_INODELOCK_ELC,
-                                     MF_MDC_CANCEL_FID4);
-               if (rc != 0)
-                       RETURN(rc);
+       rc = md_rename(tgt->ltd_exp, op_data, name, namelen, NULL, 0, request);
+
+       RETURN(rc);
+}
+
+static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
+                     const char *old, size_t oldlen,
+                     const char *new, size_t newlen,
+                     struct ptlrpc_request **request)
+{
+       struct obd_device *obd = exp->exp_obd;
+       struct lmv_obd *lmv = &obd->u.lmv;
+       struct lmv_stripe_md *lsm = op_data->op_mea1;
+       struct lmv_tgt_desc *sp_tgt;
+       struct lmv_tgt_desc *tp_tgt = NULL;
+       struct lmv_tgt_desc *tgt;
+       struct mdt_body *body;
+       int rc;
+
+       ENTRY;
+
+       LASSERT(oldlen != 0);
+
+       if (op_data->op_cli_flags & CLI_MIGRATE) {
+               rc = lmv_migrate(exp, op_data, old, oldlen, request);
+               RETURN(rc);
+       }
+
+       op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
+       op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
+       op_data->op_cap = cfs_curproc_cap_pack();
+
+       CDEBUG(D_INODE, "RENAME "DFID"/%.*s to "DFID"/%.*s\n",
+               PFID(&op_data->op_fid1), (int)oldlen, old,
+               PFID(&op_data->op_fid2), (int)newlen, new);
 
+       if (lsm)
+               sp_tgt = lmv_locate_target_for_name(lmv, lsm, old, oldlen,
+                                                   &op_data->op_fid1,
+                                                   &op_data->op_mds);
+       else
+               sp_tgt = lmv_find_target(lmv, &op_data->op_fid1);
+       if (IS_ERR(sp_tgt))
+               RETURN(PTR_ERR(sp_tgt));
+
+       lsm = op_data->op_mea2;
+       if (lsm)
+               tp_tgt = lmv_locate_target_for_name(lmv, lsm, new, newlen,
+                                                   &op_data->op_fid2,
+                                                   &op_data->op_mds);
+       else
+               tp_tgt = lmv_find_target(lmv, &op_data->op_fid2);
+       if (IS_ERR(tp_tgt))
+               RETURN(PTR_ERR(tp_tgt));
+
+       /* Since the target child might be destroyed, and it might become
+        * orphan, and we can only check orphan on the local MDT right now, so
+        * we send rename request to the MDT where target child is located. If
+        * target child does not exist, then it will send the request to the
+        * target parent */
+       if (fid_is_sane(&op_data->op_fid4)) {
                tgt = lmv_find_target(lmv, &op_data->op_fid4);
                if (IS_ERR(tgt))
                        RETURN(PTR_ERR(tgt));
+       } else {
+               tgt = tp_tgt;
+       }
+
+       op_data->op_flags |= MF_MDC_CANCEL_FID4;
+
+       /* cancel UPDATE locks of source parent */
+       rc = lmv_early_cancel(exp, sp_tgt, op_data, tgt->ltd_idx, LCK_EX,
+                             MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
+       if (rc != 0)
+               RETURN(rc);
+
+       /* cancel UPDATE locks of target parent */
+       rc = lmv_early_cancel(exp, tp_tgt, op_data, tgt->ltd_idx, LCK_EX,
+                             MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID2);
+       if (rc != 0)
+               RETURN(rc);
+
+       if (fid_is_sane(&op_data->op_fid3)) {
+               struct lmv_tgt_desc *src_tgt;
+
+               src_tgt = lmv_find_target(lmv, &op_data->op_fid3);
+               if (IS_ERR(src_tgt))
+                       RETURN(PTR_ERR(src_tgt));
+
+               /* cancel LOOKUP lock of source on source parent */
+               if (src_tgt != sp_tgt) {
+                       rc = lmv_early_cancel(exp, sp_tgt, op_data,
+                                             tgt->ltd_idx, LCK_EX,
+                                             MDS_INODELOCK_LOOKUP,
+                                             MF_MDC_CANCEL_FID3);
+                       if (rc != 0)
+                               RETURN(rc);
+               }
+
+               /* cancel ELC locks of source */
+               rc = lmv_early_cancel(exp, src_tgt, op_data, tgt->ltd_idx,
+                                     LCK_EX, MDS_INODELOCK_ELC,
+                                     MF_MDC_CANCEL_FID3);
+               if (rc != 0)
+                       RETURN(rc);
+       }
 
-               /* Since the target child might be destroyed, and it might
-                * become orphan, and we can only check orphan on the local
-                * MDT right now, so we send rename request to the MDT where
-                * target child is located. If target child does not exist,
-                * then it will send the request to the target parent */
-               target_exp = tgt->ltd_exp;
+retry_rename:
+       if (fid_is_sane(&op_data->op_fid4)) {
+               /* cancel LOOKUP lock of target on target parent */
+               if (tgt != tp_tgt) {
+                       rc = lmv_early_cancel(exp, tp_tgt, op_data,
+                                             tgt->ltd_idx, LCK_EX,
+                                             MDS_INODELOCK_LOOKUP,
+                                             MF_MDC_CANCEL_FID4);
+                       if (rc != 0)
+                               RETURN(rc);
+               }
        }
 
-       rc = md_rename(target_exp, op_data, old, oldlen, new, newlen,
-                      request);
+       rc = md_rename(tgt->ltd_exp, op_data, old, oldlen, new, newlen,
+                       request);
 
        if (rc != 0 && rc != -EXDEV)
                RETURN(rc);
@@ -2032,6 +2162,11 @@ retry_rename:
        op_data->op_fid4 = body->mbo_fid1;
        ptlrpc_req_finished(*request);
        *request = NULL;
+
+       tgt = lmv_find_target(lmv, &op_data->op_fid4);
+       if (IS_ERR(tgt))
+               RETURN(PTR_ERR(tgt));
+
        goto retry_rename;
 }
 
@@ -2781,13 +2916,15 @@ static int lmv_unpack_md_v1(struct obd_export *exp, struct lmv_stripe_md *lsm,
        else
                lsm->lsm_md_hash_type = le32_to_cpu(lmm1->lmv_hash_type);
        lsm->lsm_md_layout_version = le32_to_cpu(lmm1->lmv_layout_version);
+       lsm->lsm_md_migrate_offset = le32_to_cpu(lmm1->lmv_migrate_offset);
+       lsm->lsm_md_migrate_hash = le32_to_cpu(lmm1->lmv_migrate_hash);
        cplen = strlcpy(lsm->lsm_md_pool_name, lmm1->lmv_pool_name,
                        sizeof(lsm->lsm_md_pool_name));
 
        if (cplen >= sizeof(lsm->lsm_md_pool_name))
                RETURN(-E2BIG);
 
-       CDEBUG(D_INFO, "unpack lsm count %d, master %d hash_type %d"
+       CDEBUG(D_INFO, "unpack lsm count %d, master %d hash_type %#x "
               "layout_version %d\n", lsm->lsm_md_stripe_count,
               lsm->lsm_md_master_mdt_index, lsm->lsm_md_hash_type,
               lsm->lsm_md_layout_version);
@@ -2822,14 +2959,9 @@ static int lmv_unpackmd(struct obd_export *exp, struct lmv_stripe_md **lsmp,
        /* Free memmd */
        if (lsm != NULL && lmm == NULL) {
                int i;
-               for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
-                       /* For migrating inode, the master stripe and master
-                        * object will be the same, so do not need iput, see
-                        * ll_update_lsm_md */
-                       if (!(lsm->lsm_md_hash_type & LMV_HASH_FLAG_MIGRATION &&
-                             i == 0) && lsm->lsm_md_oinfo[i].lmo_root != NULL)
-                               iput(lsm->lsm_md_oinfo[i].lmo_root);
-               }
+
+               for (i = 0; i < lsm->lsm_md_stripe_count; i++)
+                       iput(lsm->lsm_md_oinfo[i].lmo_root);
                lsm_size = lmv_stripe_md_size(lsm->lsm_md_stripe_count);
                OBD_FREE(lsm, lsm_size);
                *lsmp = NULL;